From 1cafcce8dfe6a3f48a7dbd94befba26d16f2a50d Mon Sep 17 00:00:00 2001 From: bodja Date: Fri, 4 Oct 2024 17:18:18 +0200 Subject: [PATCH 1/2] Change `Connector.name` generation logic. Do not allow adding to the registry the same key. Add `kafka` module to autodiscover connectors and consumers. - Use class name only instead of the module and class name together, to avoid issues when the connector is moved to a different module - Registry will now fail when attempting to register the same key. refs #16 --- README.md | 4 +- django_kafka/__init__.py | 11 ++- django_kafka/connect/connector.py | 2 +- django_kafka/registry.py | 8 ++ django_kafka/tests/test_registry.py | 123 +++++++++++++++++++--------- 5 files changed, 106 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index 576952a..4e616ca 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ class Topic1(Topic): Consumers define which topics they take care of. Usually you want one consumer per project. If 2 consumers are defined, then they will be started in parallel. -Consumers are auto-discovered and are expected to be located under the `consumers.py`. +Consumers are auto-discovered and are expected to be located under the `some_django_app/kafka/consumers.py` or `some_django_app/consumers.py`. ```python # ./consumers.py @@ -205,6 +205,8 @@ When consumers are started using [start commands](#start-the-Consumers), an addi ## Connectors +Connectors are auto-discovered and are expected to be located under the `some_django_app/kafka/connectors.py` or `some_django_app/connectors.py`. + Connectors are defined as python classes decorated with `@kafka.connectors()` which adds the class to the global registry. `django_kafka.connect.connector.Connector` implements submission, validation and deletion of the connector configuration. diff --git a/django_kafka/__init__.py b/django_kafka/__init__.py index 75935bf..f465940 100644 --- a/django_kafka/__init__.py +++ b/django_kafka/__init__.py @@ -9,7 +9,7 @@ from django_kafka.conf import settings from django_kafka.exceptions import DjangoKafkaError from django_kafka.producer import Producer -from django_kafka.registry import ConsumersRegistry, Registry +from django_kafka.registry import ConnectorsRegistry, ConsumersRegistry from django_kafka.retry.settings import RetrySettings if TYPE_CHECKING: @@ -28,11 +28,16 @@ def autodiscover(): - autodiscover_modules("consumers", "connectors") + autodiscover_modules( + "consumers", + "connectors", + "kafka.consumers", + "kafka.connectors", + ) class DjangoKafka: - connectors = Registry["Connector"]() + connectors = ConnectorsRegistry() consumers = ConsumersRegistry() retry = RetrySettings diff --git a/django_kafka/connect/connector.py b/django_kafka/connect/connector.py index 6ba2569..de168b5 100644 --- a/django_kafka/connect/connector.py +++ b/django_kafka/connect/connector.py @@ -30,7 +30,7 @@ class Connector(ABC): @property def name(self) -> str: """Name of the connector.""" - return f"{settings.CLIENT_ID}.{self.__class__.__module__}.{self.__class__.__name__}" + return f"{settings.CLIENT_ID}.{self.__class__.__name__}" @property @abstractmethod diff --git a/django_kafka/registry.py b/django_kafka/registry.py index e1bd328..3128241 100644 --- a/django_kafka/registry.py +++ b/django_kafka/registry.py @@ -4,6 +4,7 @@ if TYPE_CHECKING: from django_kafka.consumer import Consumer + from django_kafka.connect.connector import Connector T = TypeVar('T') @@ -34,9 +35,16 @@ def get_key(self, cls: Type[T]) -> str: def register(self, cls: Type[T]): key = self.get_key(cls) + if key in self._classes: + raise DjangoKafkaError(f"`{key}` is already registered.") self._classes[key] = cls +class ConnectorsRegistry(Registry["Connector"]): + def get_key(self, cls) -> str: + return cls().name + + class ConsumersRegistry(Registry["Consumer"]): def register(self, cls): diff --git a/django_kafka/tests/test_registry.py b/django_kafka/tests/test_registry.py index cd1d831..9089a4f 100644 --- a/django_kafka/tests/test_registry.py +++ b/django_kafka/tests/test_registry.py @@ -1,48 +1,84 @@ from typing import Type from unittest import mock -from django.test import TestCase +from django.test import SimpleTestCase +from django_kafka.connect.connector import Connector from django_kafka.consumer import Consumer, Topics -from django_kafka.registry import ConsumersRegistry +from django_kafka.exceptions import DjangoKafkaError +from django_kafka.registry import ConsumersRegistry, Registry, ConnectorsRegistry -class ConsumersRegistryTestCase(TestCase): - def _get_consumer_cls(self, name, group_id) -> Type[Consumer]: - return type[Consumer]( - name, - (Consumer,), - {"config": {"group.id": group_id}, "topics": Topics()}, +class RegistryTestCase(SimpleTestCase): + def _gen_cls(self, name): + return type(name, (object, ), {}) + + def test_registering_same_key_not_allowed(self): + registry = Registry() + + cls = self._gen_cls("SomeClass") + key = registry.get_key(cls) + error_msg = f"`{key}` is already registered." + + registry()(cls) + + with self.assertRaisesMessage(DjangoKafkaError, error_msg): + registry()(cls) + + def test_get_key(self): + cls_a = self._gen_cls("ClassA") + cls_b = self._gen_cls("ClassB") + registry = Registry() + + key_a = registry.get_key(cls_a) + self.assertEqual( + key_a, + f"{cls_a.__module__}.{cls_a.__name__}", + ) + + key_b = registry.get_key(cls_b) + self.assertEqual( + key_b, + f"{cls_b.__module__}.{cls_b.__name__}", ) def test_decorator_adds_to_registry(self): - consumer_cls_a = self._get_consumer_cls("ConsumerA", "group_a") - consumer_cls_b = self._get_consumer_cls("ConsumerB", "group_b") + cls_a = self._gen_cls("ClassA") + cls_b = self._gen_cls("classB") - registry = ConsumersRegistry() + registry = Registry() - self.assertIs(registry()(consumer_cls_a), consumer_cls_a) - self.assertIs(registry()(consumer_cls_b), consumer_cls_b) + self.assertIs(registry()(cls_a), cls_a) + self.assertIs(registry()(cls_b), cls_b) - key_a = registry.get_key(consumer_cls_a) - self.assertIs(registry[key_a], consumer_cls_a) + key_a = registry.get_key(cls_a) + self.assertIs(registry[key_a], cls_a) - key_b = registry.get_key(consumer_cls_b) - self.assertIs(registry[key_b], consumer_cls_b) + key_b = registry.get_key(cls_b) + self.assertIs(registry[key_b], cls_b) def test_iter_returns_expected_keys(self): - consumer_cls_a = self._get_consumer_cls("ConsumerA", "group_a") - consumer_cls_b = self._get_consumer_cls("ConsumerB", "group_b") - registry = ConsumersRegistry() + cls_a = self._gen_cls("ClassA") + cls_b = self._gen_cls("ClassB") + registry = Registry() - registry()(consumer_cls_a) - registry()(consumer_cls_b) + registry()(cls_a) + registry()(cls_b) - key_a = registry.get_key(consumer_cls_a) - key_b = registry.get_key(consumer_cls_b) + key_a = registry.get_key(cls_a) + key_b = registry.get_key(cls_b) self.assertCountEqual(list(registry), [key_a, key_b]) + +class ConsumersRegistryTestCase(SimpleTestCase): + def _get_consumer_cls(self, name, group_id) -> Type[Consumer]: + return type[Consumer]( + name, + (Consumer,), + {"config": {"group.id": group_id}, "topics": Topics()}, + ) + @mock.patch("django_kafka.retry.consumer.RetryConsumer.build") def test_retry_consumer_registered(self, mock_retry_consumer_build): consumer_cls_a = self._get_consumer_cls("ConsumerA", "group_a") @@ -61,19 +97,32 @@ def test_retry_consumer_registered(self, mock_retry_consumer_build): self.assertCountEqual(list(registry), [key_a, retry_key_a, key_b]) self.assertIs(registry[retry_key_a], retry_consumer_mock) - def test_get_key(self): - consumer_cls_a = self._get_consumer_cls("ConsumerA", "group_a") - consumer_cls_b = self._get_consumer_cls("ConsumerB", "group_b") - registry = ConsumersRegistry() - key_a = registry.get_key(consumer_cls_a) - self.assertEqual( - key_a, - f"{consumer_cls_a.__module__}.{consumer_cls_a.__name__}", +class ConnectorsRegistryTestCase(SimpleTestCase): + def _gen_cls(self, name) -> Type[Connector]: + return type[Connector]( + name, + (Connector,), + {"config": {}}, ) - key_b = registry.get_key(consumer_cls_b) - self.assertEqual( - key_b, - f"{consumer_cls_b.__module__}.{consumer_cls_b.__name__}", - ) + def test_get_key(self): + connector_cls = self._gen_cls("ConnectorCls") + registry = ConnectorsRegistry() + + key = registry.get_key(connector_cls) + self.assertEqual(key, connector_cls().name) + + def test_registering_same_key_not_allowed(self): + registry = ConnectorsRegistry() + connector_a = self._gen_cls("ConnectorA") + # give the same class name to the connector to generate same key + connector_b = self._gen_cls("ConnectorA") + + key = registry.get_key(connector_a) + error_msg = f"`{key}` is already registered." + + registry()(connector_a) + + with self.assertRaisesMessage(DjangoKafkaError, error_msg): + registry()(connector_b) From 5994a3f3c11a025c52e9b1765d4c01e337864681 Mon Sep 17 00:00:00 2001 From: bodja Date: Fri, 4 Oct 2024 17:18:21 +0200 Subject: [PATCH 2/2] =?UTF-8?q?Bump=20version:=200.4.3=20=E2=86=92=200.4.4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .bumpversion.toml | 2 +- django_kafka/__init__.py | 2 +- pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.bumpversion.toml b/.bumpversion.toml index c7e0357..e6c93f2 100644 --- a/.bumpversion.toml +++ b/.bumpversion.toml @@ -1,5 +1,5 @@ [tool.bumpversion] -current_version = "0.4.3" +current_version = "0.4.4" parse = "(?P\\d+)\\.(?P\\d+)\\.(?P\\d+)" serialize = ["{major}.{minor}.{patch}"] search = "{current_version}" diff --git a/django_kafka/__init__.py b/django_kafka/__init__.py index f465940..5476be3 100644 --- a/django_kafka/__init__.py +++ b/django_kafka/__init__.py @@ -18,7 +18,7 @@ logger = logging.getLogger(__name__) -__version__ = "0.4.3" +__version__ = "0.4.4" __all__ = [ "autodiscover", diff --git a/pyproject.toml b/pyproject.toml index 922905b..52700e1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "django-kafka" -version = "0.4.3" +version = "0.4.4" dependencies = [ "django>=4.0,<6.0", "confluent-kafka[avro, schema-registry]==2.4.0"