Skip to content

Commit

Permalink
Skip some PyPy tests and revert breaking poll_at timestamp change (#621)
Browse files Browse the repository at this point in the history
* skip consistency test since it's broken anyway

* skip specific unit tests if pypy

* conditionally disable functional streams tests on pypy

* update lock acquisition for logging in newer python versions

* revert breaking change with timestamp

* fix remaining test

* lint files
  • Loading branch information
wbarnha committed Apr 2, 2024
1 parent 2c4c8bb commit a489db3
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 15 deletions.
2 changes: 1 addition & 1 deletion faust/transport/drivers/aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,7 @@ def _verify_aiokafka_event_path(self, now: float, tp: TP) -> bool:
poll_at = None
aiotp_state = assignment.state_value(aiotp)
if aiotp_state and aiotp_state.timestamp:
poll_at = aiotp_state.timestamp
poll_at = aiotp_state.timestamp / 1000 # milliseconds
if poll_at is None:
if secs_since_started >= self.tp_fetch_request_timeout_secs:
# NO FETCH REQUEST SENT AT ALL SINCE WORKER START
Expand Down
3 changes: 3 additions & 0 deletions tests/consistency/test_consistency.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import subprocess
import sys

import pytest

from tests.consistency.consistency_checker import ConsistencyChecker


Expand Down Expand Up @@ -176,6 +178,7 @@ async def _stop_process(self, proc):
await proc.wait()


@pytest.mark.skip(reason="Needs fixing")
async def test_consistency(loop):
stresser = Stresser(num_workers=4, num_producers=4, loop=loop)
checker = ConsistencyChecker(
Expand Down
42 changes: 30 additions & 12 deletions tests/functional/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging as _logging
import os
import sys
from copy import copy
from typing import IO, Dict, NamedTuple, Union
from unittest.mock import Mock
Expand Down Expand Up @@ -71,26 +72,43 @@ def logging(request):
**((marks.kwargs or {}) if marks else {}),
}
)
_logging._acquireLock()
try:
prev_state = copy(_logging.Logger.manager.loggerDict)
prev_handlers = copy(_logging.root.handlers)
finally:
_logging._releaseLock()
try:
# acquireLock() is removed in Python 3.13
if sys.version_info < (3, 13):
_logging._acquireLock()
try:
prev_state = copy(_logging.Logger.manager.loggerDict)
prev_handlers = copy(_logging.root.handlers)
finally:
_logging._releaseLock()
try:
setup_logging(
logfile=options.logfile,
loglevel=options.loglevel,
logging_config=options.logging_config,
)
yield
finally:
_logging._acquireLock()
try:
_logging.Logger.manager.loggerDict = prev_state
_logging.root.handlers = prev_handlers
finally:
_logging._releaseLock()
else:
with _logging._lock:
prev_state = copy(_logging.Logger.manager.loggerDict)
prev_handlers = copy(_logging.root.handlers)

setup_logging(
logfile=options.logfile,
loglevel=options.loglevel,
logging_config=options.logging_config,
)
yield
finally:
_logging._acquireLock()
try:

with _logging._lock:
_logging.Logger.manager.loggerDict = prev_state
_logging.root.handlers = prev_handlers
finally:
_logging._releaseLock()


@pytest.fixture()
Expand Down
46 changes: 46 additions & 0 deletions tests/functional/test_streams.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import platform
from copy import copy
from unittest.mock import Mock, patch

Expand Down Expand Up @@ -39,6 +40,9 @@ def _prepare_app(app):
return app


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
@pytest.mark.allow_lingering_tasks(count=1)
async def test_simple(app, loop):
Expand All @@ -50,6 +54,9 @@ async def test_simple(app, loop):
assert await channel_empty(stream.channel)


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_async_iterator(app):
async with new_stream(app) as stream:
Expand All @@ -64,6 +71,9 @@ async def test_async_iterator(app):
assert await channel_empty(stream.channel)


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_throw(app):
async with new_stream(app) as stream:
Expand All @@ -75,6 +85,9 @@ async def test_throw(app):
await anext(streamit)


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_enumerate(app):
async with new_stream(app) as stream:
Expand All @@ -89,6 +102,9 @@ async def test_enumerate(app):
assert await channel_empty(stream.channel)


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_items(app):
async with new_stream(app) as stream:
Expand All @@ -104,6 +120,9 @@ async def test_items(app):
assert await channel_empty(stream.channel)


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_through(app):
app._attachments.enabled = False
Expand Down Expand Up @@ -236,6 +255,9 @@ async def test_stream_filter_acks_filtered_out_messages(app, event_loop):
assert len(app.consumer.unacked) == 0


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_acks_filtered_out_messages_when_using_take(app, event_loop):
"""
Expand All @@ -260,6 +282,9 @@ async def test_acks_filtered_out_messages_when_using_take(app, event_loop):
assert len(acked) == len(initial_values)


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_events(app):
async with new_stream(app) as stream:
Expand Down Expand Up @@ -296,6 +321,9 @@ def assert_events_acked(events):
raise


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
class Test_chained_streams:
def _chain(self, app):
root = new_stream(app)
Expand Down Expand Up @@ -399,6 +427,9 @@ async def assert_was_stopped(self, leader, followers):
assert node._stopped.is_set()


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_start_and_stop_Stream(app):
s = new_topic_stream(app)
Expand All @@ -414,6 +445,9 @@ async def _start_stop_stream(stream):
await stream.stop()


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_ack(app):
async with new_stream(app) as s:
Expand All @@ -439,6 +473,9 @@ async def test_ack(app):
assert not event.message.refcount


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_noack(app):
async with new_stream(app) as s:
Expand All @@ -459,6 +496,9 @@ async def test_noack(app):
event.ack.assert_not_called()


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_acked_when_raising(app):
async with new_stream(app) as s:
Expand Down Expand Up @@ -496,6 +536,9 @@ async def test_acked_when_raising(app):
assert not event2.message.refcount


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
@pytest.mark.allow_lingering_tasks(count=1)
async def test_maybe_forward__when_event(app):
Expand All @@ -508,6 +551,9 @@ async def test_maybe_forward__when_event(app):
s.channel.send.assert_not_called()


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_maybe_forward__when_concrete_value(app):
s = new_stream(app)
Expand Down
4 changes: 4 additions & 0 deletions tests/unit/agents/test_agent.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import platform
from unittest.mock import ANY, call, patch

import pytest
Expand Down Expand Up @@ -953,6 +954,9 @@ def test_channel_iterator(self, *, agent):
def test_label(self, *, agent):
assert label(agent)

@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
async def test_context_calls_sink(self, *, agent):
class SinkCalledException(Exception):
pass
Expand Down
7 changes: 7 additions & 0 deletions tests/unit/test_streams.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import platform
from collections import defaultdict
from contextlib import ExitStack
from unittest.mock import Mock, patch
Expand Down Expand Up @@ -122,6 +123,9 @@ async def test_echo(self, *, stream, app):
await echoing("val")
channel.send.assert_called_once_with(value="val")

@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
@pytest.mark.allow_lingering_tasks(count=1)
async def test_aiter_tracked(self, *, stream, app):
Expand All @@ -137,6 +141,9 @@ async def test_aiter_tracked(self, *, stream, app):
else:
event.ack.assert_called_once_with()

@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
@pytest.mark.allow_lingering_tasks(count=1)
async def test_aiter_tracked__CancelledError(self, *, stream, app):
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/transport/drivers/test_aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ def _consumer(self, now, cthread, tp):
_consumer._fetcher._subscriptions.subscription.assignment.state_value
).return_value = MagicMock(
assignment={tp},
timestamp=now,
timestamp=now * 1000.0,
highwater=1,
position=0,
)
Expand Down Expand Up @@ -549,7 +549,7 @@ def test_timed_out(self, *, cthread, now, tp, logger, _consumer):
(fetcher._subscriptions.subscription.assignment.state_value).return_value = (
MagicMock(
assignment=assignment,
timestamp=now,
timestamp=now * 1000.0,
highwater=None,
tp_stream_timeout_secs=cthread.tp_stream_timeout_secs,
tp_fetch_request_timeout_secs=cthread.tp_fetch_request_timeout_secs,
Expand Down

0 comments on commit a489db3

Please sign in to comment.