Skip to content

Commit

Permalink
Fix disabled aiokafka unit tests (#610)
Browse files Browse the repository at this point in the history
* mess around this unit tests for aiokafka statuses

* apparently highwater had a bug???

* fix formatting

* wow has this been broken this entire time???

* fix more tests

* lint

* fix yet another test

* fix yet another test

* remove unneeded import

* re-enable another test

* fix linting

* fix another test

* Update aiokafka.py

* revert changes
  • Loading branch information
wbarnha committed Apr 1, 2024
1 parent 3d737c7 commit 13e7ef1
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 32 deletions.
6 changes: 3 additions & 3 deletions faust/transport/drivers/aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,14 +840,14 @@ def _verify_aiokafka_event_path(self, now: float, tp: TP) -> bool:
poll_at = None
aiotp_state = assignment.state_value(aiotp)
if aiotp_state and aiotp_state.timestamp:
poll_at = aiotp_state.timestamp / 1000
poll_at = aiotp_state.timestamp
if poll_at is None:
if secs_since_started >= self.tp_fetch_request_timeout_secs:
# NO FETCH REQUEST SENT AT ALL SINCE WORKER START
self.log.error(
SLOW_PROCESSING_NO_FETCH_SINCE_START,
tp,
secs_since_started,
humanize_seconds_ago(secs_since_started),
)
return True

Expand All @@ -857,7 +857,7 @@ def _verify_aiokafka_event_path(self, now: float, tp: TP) -> bool:
self.log.error(
SLOW_PROCESSING_NO_RECENT_FETCH,
tp,
secs_since_request,
humanize_seconds_ago(secs_since_request),
)
return True

Expand Down
97 changes: 68 additions & 29 deletions tests/unit/transport/drivers/test_aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from aiokafka.structs import OffsetAndMetadata, TopicPartition
from mode.utils import text
from mode.utils.futures import done_future
from mode.utils.times import humanize_seconds_ago
from opentracing.ext import tags

import faust
Expand Down Expand Up @@ -234,7 +235,7 @@ def mock_record(
serialized_value_size=40,
**kwargs,
):
return Mock(
return MagicMock(
name="record",
topic=topic,
partition=partition,
Expand Down Expand Up @@ -281,8 +282,8 @@ def start_span(operation_name=None, **kwargs):
return tracer

@pytest.fixture()
def _consumer(self):
return Mock(
def _consumer(self, now, cthread, tp):
_consumer = Mock(
name="AIOKafkaConsumer",
autospec=aiokafka.AIOKafkaConsumer,
start=AsyncMock(),
Expand All @@ -293,6 +294,17 @@ def _consumer(self):
_client=Mock(name="Client", close=AsyncMock()),
_coordinator=Mock(name="Coordinator", close=AsyncMock()),
)
_consumer.assignment.return_value = {tp}

(
_consumer._fetcher._subscriptions.subscription.assignment.state_value
).return_value = MagicMock(
assignment={tp},
timestamp=now,
highwater=1,
position=0,
)
return _consumer

@pytest.fixture()
def now(self):
Expand Down Expand Up @@ -465,18 +477,22 @@ def test_timed_out(self, *, cthread, _consumer, now, tp, logger):
)


@pytest.mark.skip("Needs fixing")
class Test_VEP_no_recent_fetch(Test_verify_event_path_base):
def test_recent_fetch(self, *, cthread, now, tp, logger):
self._set_last_response(now - 30.0)
self._set_last_request(now - 2.0)
assert cthread.verify_event_path(now, tp) is None
logger.error.assert_not_called()

def test_timed_out(self, *, cthread, now, tp, logger):
def test_timed_out(self, *, cthread, now, tp, logger, _consumer):
self._set_last_response(now - 30.0)
self._set_last_request(now - cthread.tp_fetch_request_timeout_secs * 2)
assert cthread.verify_event_path(now, tp) is None
assert (
cthread.verify_event_path(
now + cthread.tp_fetch_request_timeout_secs * 2, tp
)
is None
)
logger.error.assert_called_with(
mod.SLOW_PROCESSING_NO_RECENT_FETCH,
ANY,
Expand All @@ -503,29 +519,46 @@ def test_timed_out(self, *, cthread, now, tp, logger):
)


@pytest.mark.skip("Needs fixing")
class Test_VEP_no_highwater_since_start(Test_verify_event_path_base):
highwater = None

def test_no_monitor(self, *, app, cthread, now, tp, logger):
def test_no_monitor(self, *, app, cthread, now, tp, logger, _consumer):
self._set_last_request(now - 10.0)
self._set_last_response(now - 5.0)
self._set_started(now)
app.monitor = None
assert cthread.verify_event_path(now, tp) is None
logger.error.assert_not_called()

def test_just_started(self, *, cthread, now, tp, logger):
def test_just_started(self, *, cthread, now, tp, logger, _consumer):
self._set_last_request(now - 10.0)
self._set_last_response(now - 5.0)
self._set_started(now)
assert cthread.verify_event_path(now, tp) is None
logger.error.assert_not_called()

def test_timed_out(self, *, cthread, now, tp, logger):
def test_timed_out(self, *, cthread, now, tp, logger, _consumer):
self._set_last_request(now - 10.0)
self._set_last_response(now - 5.0)
self._set_started(now - cthread.tp_stream_timeout_secs * 2)
_consumer.assignment.return_value = {tp}
assignment = cthread.assignment()

assert assignment == {tp}
fetcher = _consumer._fetcher
(fetcher._subscriptions.subscription.assignment.state_value).return_value = (
MagicMock(
assignment=assignment,
timestamp=now,
highwater=None,
tp_stream_timeout_secs=cthread.tp_stream_timeout_secs,
tp_fetch_request_timeout_secs=cthread.tp_fetch_request_timeout_secs,
)
)
(
fetcher._subscriptions.subscription.assignment.state_value.timestamp
).return_value = now

assert cthread.verify_event_path(now, tp) is None
logger.error.assert_called_with(
mod.SLOW_PROCESSING_NO_HIGHWATER_SINCE_START,
Expand All @@ -534,7 +567,6 @@ def test_timed_out(self, *, cthread, now, tp, logger):
)


@pytest.mark.skip("Needs fixing")
class Test_VEP_stream_idle_no_highwater(Test_verify_event_path_base):
highwater = 10
committed_offset = 10
Expand All @@ -547,7 +579,6 @@ def test_highwater_same_as_offset(self, *, cthread, now, tp, logger):
logger.error.assert_not_called()


@pytest.mark.skip("Needs fixing")
class Test_VEP_stream_idle_highwater_no_acks(Test_verify_event_path_base):
acks_enabled = False

Expand All @@ -559,7 +590,6 @@ def test_no_acks(self, *, cthread, now, tp, logger):
logger.error.assert_not_called()


@pytest.mark.skip("Needs fixing")
class Test_VEP_stream_idle_highwater_same_has_acks_everything_OK(
Test_verify_event_path_base
):
Expand All @@ -572,6 +602,7 @@ def test_main(self, *, cthread, now, tp, logger):
self._set_last_request(now - 10.0)
self._set_last_response(now - 5.0)
self._set_started(now)

assert cthread.verify_event_path(now, tp) is None
logger.error.assert_not_called()

Expand Down Expand Up @@ -636,7 +667,6 @@ def test_inbound_timed_out(self, *, app, cthread, now, tp, logger):
)


@pytest.mark.skip("Needs fixing")
class Test_VEP_no_commit(Test_verify_event_path_base):
highwater = 20
committed_offset = 10
Expand Down Expand Up @@ -664,13 +694,13 @@ def test_timed_out_since_start(self, *, app, cthread, now, tp, logger):
expected_message = cthread._make_slow_processing_error(
mod.SLOW_PROCESSING_NO_COMMIT_SINCE_START,
[mod.SLOW_PROCESSING_CAUSE_COMMIT],
setting="broker_commit_livelock_soft_timeout",
current_value=app.conf.broker_commit_livelock_soft_timeout,
)
logger.error.assert_called_once_with(
expected_message,
tp,
ANY,
setting="broker_commit_livelock_soft_timeout",
current_value=app.conf.broker_commit_livelock_soft_timeout,
humanize_seconds_ago(cthread.tp_commit_timeout_secs * 2),
)

def test_timed_out_since_last(self, *, app, cthread, now, tp, logger):
Expand All @@ -681,13 +711,13 @@ def test_timed_out_since_last(self, *, app, cthread, now, tp, logger):
expected_message = cthread._make_slow_processing_error(
mod.SLOW_PROCESSING_NO_RECENT_COMMIT,
[mod.SLOW_PROCESSING_CAUSE_COMMIT],
setting="broker_commit_livelock_soft_timeout",
current_value=app.conf.broker_commit_livelock_soft_timeout,
)
logger.error.assert_called_once_with(
expected_message,
tp,
ANY,
setting="broker_commit_livelock_soft_timeout",
current_value=app.conf.broker_commit_livelock_soft_timeout,
humanize_seconds_ago(now - cthread.tp_commit_timeout_secs * 4),
)

def test_committing_fine(self, *, app, cthread, now, tp, logger):
Expand Down Expand Up @@ -1344,26 +1374,30 @@ def assert_new_producer(
max_batch_size=16384,
max_request_size=1000000,
request_timeout_ms=1200000,
metadata_max_age_ms=300000,
connections_max_idle_ms=540000,
security_protocol="PLAINTEXT",
**kwargs,
):
with patch("aiokafka.AIOKafkaProducer") as AIOKafkaProducer:
p = producer._new_producer()
assert p is AIOKafkaProducer.return_value
AIOKafkaProducer.assert_called_once_with(
acks=acks,
api_version=api_version,
bootstrap_servers=bootstrap_servers,
client_id=client_id,
compression_type=compression_type,
acks=acks,
linger_ms=linger_ms,
max_batch_size=max_batch_size,
max_request_size=max_request_size,
request_timeout_ms=request_timeout_ms,
compression_type=compression_type,
security_protocol=security_protocol,
loop=producer.loop,
partitioner=producer.partitioner,
transactional_id=None,
api_version=api_version,
metadata_max_age_ms=metadata_max_age_ms,
connections_max_idle_ms=connections_max_idle_ms,
request_timeout_ms=request_timeout_ms,
**kwargs,
)

Expand Down Expand Up @@ -1433,7 +1467,6 @@ def test__settings_extra(self, *, producer, app):
app.in_transaction = False
assert producer._settings_extra() == {}

@pytest.mark.skip("fix me")
def test__new_producer(self, *, app):
producer = Producer(app.transport)
self.assert_new_producer(producer)
Expand All @@ -1442,8 +1475,8 @@ def test__new_producer(self, *, app):
"expected_args",
[
pytest.param(
{"api_version": "0.10"},
marks=pytest.mark.conf(producer_api_version="0.10"),
{"api_version": "auto"},
marks=pytest.mark.conf(producer_api_version="auto"),
),
pytest.param({"acks": -1}, marks=pytest.mark.conf(producer_acks="all")),
pytest.param(
Expand Down Expand Up @@ -1474,6 +1507,14 @@ def test__new_producer(self, *, app):
{"request_timeout_ms": 1234134000},
marks=pytest.mark.conf(producer_request_timeout=1234134),
),
pytest.param(
{"metadata_max_age_ms": 300000},
marks=pytest.mark.conf(metadata_max_age_ms=300000),
),
pytest.param(
{"connections_max_idle_ms": 540000},
marks=pytest.mark.conf(connections_max_idle_ms=540000),
),
pytest.param(
{
"security_protocol": "SASL_PLAINTEXT",
Expand All @@ -1490,7 +1531,6 @@ def test__new_producer(self, *, app):
),
],
)
@pytest.mark.skip("fix me")
def test__new_producer__using_settings(self, expected_args, *, app):
producer = Producer(app.transport)
self.assert_new_producer(producer, **expected_args)
Expand Down Expand Up @@ -1802,7 +1842,6 @@ async def test_on_start(
await threaded_producer.start()
await threaded_producer.stop()

@pytest.mark.skip("Needs fixing")
@pytest.mark.asyncio
async def test_on_thread_stop(
self,
Expand Down

0 comments on commit 13e7ef1

Please sign in to comment.