From 94644d1d951c252a3af6e1eb504b1bfd1c61ce8f Mon Sep 17 00:00:00 2001 From: Tomek Miodek Date: Tue, 9 Jul 2024 18:43:11 +0200 Subject: [PATCH] fix: aiokafka `0.11.0` error when creating topics --- .coveragerc | 1 - docs/conf.py | 1 - extra/bandit/baseline.json | 48 ----------- faust/transport/drivers/aiokafka.py | 2 +- faust/utils/kafka/__init__.py | 1 - faust/utils/kafka/protocol/__init__.py | 1 - faust/utils/kafka/protocol/admin.py | 106 ------------------------- faust/utils/kafka/protocol/api.py | 55 ------------- tests/unit/tables/test_wrappers.py | 2 +- 9 files changed, 2 insertions(+), 215 deletions(-) delete mode 100644 faust/utils/kafka/__init__.py delete mode 100644 faust/utils/kafka/protocol/__init__.py delete mode 100644 faust/utils/kafka/protocol/admin.py delete mode 100644 faust/utils/kafka/protocol/api.py diff --git a/.coveragerc b/.coveragerc index 79a68ebf3..31cb9f69d 100644 --- a/.coveragerc +++ b/.coveragerc @@ -28,7 +28,6 @@ omit = # not needed */faust/utils/functional.py - */faust/utils/kafka/* */faust/utils/iso8601.py */faust/utils/platforms.py */faust/utils/tracing.py diff --git a/docs/conf.py b/docs/conf.py index badf67f8a..92518bae0 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -58,7 +58,6 @@ 'faust.types._env', 'faust.utils', 'faust.utils._iso8601_python', - r'faust.utils.kafka.*', 'faust.web', r'faust.web.apps.*', 'faust.web.apps.stats.app', diff --git a/extra/bandit/baseline.json b/extra/bandit/baseline.json index e56de74bb..4c9cc6b2d 100644 --- a/extra/bandit/baseline.json +++ b/extra/bandit/baseline.json @@ -1490,54 +1490,6 @@ "loc": 153, "nosec": 0 }, - "faust/utils/kafka/__init__.py": { - "CONFIDENCE.HIGH": 0.0, - "CONFIDENCE.LOW": 0.0, - "CONFIDENCE.MEDIUM": 0.0, - "CONFIDENCE.UNDEFINED": 0.0, - "SEVERITY.HIGH": 0.0, - "SEVERITY.LOW": 0.0, - "SEVERITY.MEDIUM": 0.0, - "SEVERITY.UNDEFINED": 0.0, - "loc": 1, - "nosec": 0 - }, - "faust/utils/kafka/protocol/__init__.py": { - "CONFIDENCE.HIGH": 0.0, - "CONFIDENCE.LOW": 0.0, - "CONFIDENCE.MEDIUM": 0.0, - "CONFIDENCE.UNDEFINED": 0.0, - "SEVERITY.HIGH": 0.0, - "SEVERITY.LOW": 0.0, - "SEVERITY.MEDIUM": 0.0, - "SEVERITY.UNDEFINED": 0.0, - "loc": 1, - "nosec": 0 - }, - "faust/utils/kafka/protocol/admin.py": { - "CONFIDENCE.HIGH": 0.0, - "CONFIDENCE.LOW": 0.0, - "CONFIDENCE.MEDIUM": 0.0, - "CONFIDENCE.UNDEFINED": 0.0, - "SEVERITY.HIGH": 0.0, - "SEVERITY.LOW": 0.0, - "SEVERITY.MEDIUM": 0.0, - "SEVERITY.UNDEFINED": 0.0, - "loc": 61, - "nosec": 0 - }, - "faust/utils/kafka/protocol/api.py": { - "CONFIDENCE.HIGH": 0.0, - "CONFIDENCE.LOW": 0.0, - "CONFIDENCE.MEDIUM": 0.0, - "CONFIDENCE.UNDEFINED": 0.0, - "SEVERITY.HIGH": 0.0, - "SEVERITY.LOW": 0.0, - "SEVERITY.MEDIUM": 0.0, - "SEVERITY.UNDEFINED": 0.0, - "loc": 47, - "nosec": 0 - }, "faust/utils/platforms.py": { "CONFIDENCE.HIGH": 3.0, "CONFIDENCE.LOW": 0.0, diff --git a/faust/transport/drivers/aiokafka.py b/faust/transport/drivers/aiokafka.py index b52be7b16..6bd66ef13 100644 --- a/faust/transport/drivers/aiokafka.py +++ b/faust/transport/drivers/aiokafka.py @@ -41,6 +41,7 @@ for_code, ) from aiokafka.partitioner import DefaultPartitioner, murmur2 +from aiokafka.protocol.admin import CreateTopicsRequest from aiokafka.protocol.metadata import MetadataRequest_v1 from aiokafka.structs import OffsetAndMetadata, TopicPartition as _TopicPartition from aiokafka.util import parse_kafka_version @@ -82,7 +83,6 @@ ) from faust.types.auth import CredentialsT from faust.types.transports import ConsumerT, PartitionerT, ProducerT -from faust.utils.kafka.protocol.admin import CreateTopicsRequest from faust.utils.tracing import noop_span, set_current_span, traced_from_parent_span __all__ = ["Consumer", "Producer", "Transport"] diff --git a/faust/utils/kafka/__init__.py b/faust/utils/kafka/__init__.py deleted file mode 100644 index 6d83e4241..000000000 --- a/faust/utils/kafka/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Kafka utilities.""" diff --git a/faust/utils/kafka/protocol/__init__.py b/faust/utils/kafka/protocol/__init__.py deleted file mode 100644 index 560567092..000000000 --- a/faust/utils/kafka/protocol/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Kafka protocol definitions.""" diff --git a/faust/utils/kafka/protocol/admin.py b/faust/utils/kafka/protocol/admin.py deleted file mode 100644 index f2be80d9c..000000000 --- a/faust/utils/kafka/protocol/admin.py +++ /dev/null @@ -1,106 +0,0 @@ -"""Admin related Kafka protocol extensions.""" - -from aiokafka.protocol import types - -from .api import Request, Response - - -class CreateTopicsResponse_v0(Response): - """Response from Create Topic request (version 0).""" - - API_KEY = 19 - API_VERSION = 0 - SCHEMA = types.Schema( - ( - "topic_error_codes", - types.Array(("topic", types.String("utf-8")), ("error_code", types.Int16)), - ), - ) - - -class CreateTopicsResponse_v1(Response): - """Response from Create Topic request (version 1).""" - - API_KEY = 19 - API_VERSION = 1 - SCHEMA = types.Schema( - ( - "topic_error_codes", - types.Array( - ("topic", types.String("utf-8")), - ("error_code", types.Int16), - ("error_message", types.String("utf-8")), - ), - ), - ) - - -class CreateTopicsRequest_v0(Request): - """Request to create topic (version 0).""" - - API_KEY = 19 - API_VERSION = 0 - RESPONSE_TYPE = CreateTopicsResponse_v0 - SCHEMA = types.Schema( - ( - "create_topic_requests", - types.Array( - ("topic", types.String("utf-8")), - ("num_partitions", types.Int32), - ("replication_factor", types.Int16), - ( - "replica_assignment", - types.Array( - ("partition_id", types.Int32), - ("replicas", types.Array(types.Int32)), - ), - ), - ( - "configs", - types.Array( - ("config_key", types.String("utf-8")), - ("config_value", types.String("utf-8")), - ), - ), - ), - ), - ("timeout", types.Int32), - ) - - -class CreateTopicsRequest_v1(Request): - """Request to create topic (version 1).""" - - API_KEY = 19 - API_VERSION = 1 - RESPONSE_TYPE = CreateTopicsResponse_v1 - SCHEMA = types.Schema( - ( - "create_topic_requests", - types.Array( - ("topic", types.String("utf-8")), - ("num_partitions", types.Int32), - ("replication_factor", types.Int16), - ( - "replica_assignment", - types.Array( - ("partition_id", types.Int32), - ("replicas", types.Array(types.Int32)), - ), - ), - ( - "configs", - types.Array( - ("config_key", types.String("utf-8")), - ("config_value", types.String("utf-8")), - ), - ), - ), - ), - ("timeout", types.Int32), - ("validate_only", types.Boolean), - ) - - -CreateTopicsRequest = [CreateTopicsRequest_v0, CreateTopicsRequest_v1] -CreateTopicsResponse = [CreateTopicsResponse_v0, CreateTopicsRequest_v1] diff --git a/faust/utils/kafka/protocol/api.py b/faust/utils/kafka/protocol/api.py deleted file mode 100644 index 74af0f466..000000000 --- a/faust/utils/kafka/protocol/api.py +++ /dev/null @@ -1,55 +0,0 @@ -"""Kafka protocol extensions.""" - -# pragma: no cover -import abc -from typing import Type - -from aiokafka.protocol.struct import Struct -from aiokafka.protocol.types import Schema - - -class Response(Struct, metaclass=abc.ABCMeta): # type: ignore - """API Response.""" - - @property - @abc.abstractmethod - def API_KEY(self) -> int: - """Integer identifier for api request/response.""" - - @property - @abc.abstractmethod - def API_VERSION(self) -> int: - """Integer of api request/response version.""" - - @property - @abc.abstractmethod - def SCHEMA(self) -> Schema: - """Return instance of Schema() representing the response structure.""" - - -class Request(Struct, metaclass=abc.ABCMeta): # type: ignore - """API Request.""" - - @property - @abc.abstractmethod - def API_KEY(self) -> int: - """Integer identifier for api request.""" - - @property - @abc.abstractmethod - def API_VERSION(self) -> int: - """Integer of api request version.""" - - @property - @abc.abstractmethod - def SCHEMA(self) -> Schema: - """Return instance of Schema() representing the request structure.""" - - @property - @abc.abstractmethod - def RESPONSE_TYPE(self) -> Type[Response]: - """Response class associated with the api request.""" - - def expect_response(self) -> bool: - """Return True if request type does not always return response.""" - return True diff --git a/tests/unit/tables/test_wrappers.py b/tests/unit/tables/test_wrappers.py index 18c233f6d..0aa11d519 100644 --- a/tests/unit/tables/test_wrappers.py +++ b/tests/unit/tables/test_wrappers.py @@ -374,7 +374,7 @@ def wset(self, *, iwtable, event): @pytest.fixture() def data(self, *, freeze_time, iwtable): - iwtable.key_index_table = {k: 1 for k in self.TABLE_DATA} + iwtable.key_index_table = dict.fromkeys(self.TABLE_DATA, 1) iwtable.table._data = {} for w in iwtable.table._window_ranges(freeze_time.time): iwtable.table._data.update({(k, w): v for k, v in self.TABLE_DATA.items()})