Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

16 connectors registry #22

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .bumpversion.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.4.3"
current_version = "0.4.4"
parse = "(?P<major>\\d+)\\.(?P<minor>\\d+)\\.(?P<patch>\\d+)"
serialize = ["{major}.{minor}.{patch}"]
search = "{current_version}"
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class Topic1(Topic):

Consumers define which topics they take care of. Usually you want one consumer per project. If 2 consumers are defined, then they will be started in parallel.

Consumers are auto-discovered and are expected to be located under the `consumers.py`.
Consumers are auto-discovered and are expected to be located under the `some_django_app/kafka/consumers.py` or `some_django_app/consumers.py`.

```python
# ./consumers.py
Expand Down Expand Up @@ -205,6 +205,8 @@ When consumers are started using [start commands](#start-the-Consumers), an addi

## Connectors

Connectors are auto-discovered and are expected to be located under the `some_django_app/kafka/connectors.py` or `some_django_app/connectors.py`.

Connectors are defined as python classes decorated with `@kafka.connectors()` which adds the class to the global registry.

`django_kafka.connect.connector.Connector` implements submission, validation and deletion of the connector configuration.
Expand Down
13 changes: 9 additions & 4 deletions django_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from django_kafka.conf import settings
from django_kafka.exceptions import DjangoKafkaError
from django_kafka.producer import Producer
from django_kafka.registry import ConsumersRegistry, Registry
from django_kafka.registry import ConnectorsRegistry, ConsumersRegistry
from django_kafka.retry.settings import RetrySettings

if TYPE_CHECKING:
Expand All @@ -18,7 +18,7 @@

logger = logging.getLogger(__name__)

__version__ = "0.4.3"
__version__ = "0.4.4"

__all__ = [
"autodiscover",
Expand All @@ -28,11 +28,16 @@


def autodiscover():
autodiscover_modules("consumers", "connectors")
autodiscover_modules(
"consumers",
"connectors",
"kafka.consumers",
"kafka.connectors",
)


class DjangoKafka:
connectors = Registry["Connector"]()
connectors = ConnectorsRegistry()
consumers = ConsumersRegistry()
retry = RetrySettings

Expand Down
2 changes: 1 addition & 1 deletion django_kafka/connect/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Connector(ABC):
@property
def name(self) -> str:
"""Name of the connector."""
return f"{settings.CLIENT_ID}.{self.__class__.__module__}.{self.__class__.__name__}"
return f"{settings.CLIENT_ID}.{self.__class__.__name__}"

@property
@abstractmethod
Expand Down
8 changes: 8 additions & 0 deletions django_kafka/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

if TYPE_CHECKING:
from django_kafka.consumer import Consumer
from django_kafka.connect.connector import Connector


T = TypeVar('T')
Expand Down Expand Up @@ -34,9 +35,16 @@ def get_key(self, cls: Type[T]) -> str:

def register(self, cls: Type[T]):
key = self.get_key(cls)
if key in self._classes:
raise DjangoKafkaError(f"`{key}` is already registered.")
self._classes[key] = cls


class ConnectorsRegistry(Registry["Connector"]):
def get_key(self, cls) -> str:
return cls().name


class ConsumersRegistry(Registry["Consumer"]):

def register(self, cls):
Expand Down
123 changes: 86 additions & 37 deletions django_kafka/tests/test_registry.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,84 @@
from typing import Type
from unittest import mock

from django.test import TestCase
from django.test import SimpleTestCase

from django_kafka.connect.connector import Connector
from django_kafka.consumer import Consumer, Topics
from django_kafka.registry import ConsumersRegistry
from django_kafka.exceptions import DjangoKafkaError
from django_kafka.registry import ConsumersRegistry, Registry, ConnectorsRegistry


class ConsumersRegistryTestCase(TestCase):
def _get_consumer_cls(self, name, group_id) -> Type[Consumer]:
return type[Consumer](
name,
(Consumer,),
{"config": {"group.id": group_id}, "topics": Topics()},
class RegistryTestCase(SimpleTestCase):
def _gen_cls(self, name):
return type(name, (object, ), {})

def test_registering_same_key_not_allowed(self):
registry = Registry()

cls = self._gen_cls("SomeClass")
key = registry.get_key(cls)
error_msg = f"`{key}` is already registered."

registry()(cls)

with self.assertRaisesMessage(DjangoKafkaError, error_msg):
registry()(cls)

def test_get_key(self):
cls_a = self._gen_cls("ClassA")
cls_b = self._gen_cls("ClassB")
registry = Registry()

key_a = registry.get_key(cls_a)
self.assertEqual(
key_a,
f"{cls_a.__module__}.{cls_a.__name__}",
)

key_b = registry.get_key(cls_b)
self.assertEqual(
key_b,
f"{cls_b.__module__}.{cls_b.__name__}",
)

def test_decorator_adds_to_registry(self):
consumer_cls_a = self._get_consumer_cls("ConsumerA", "group_a")
consumer_cls_b = self._get_consumer_cls("ConsumerB", "group_b")
cls_a = self._gen_cls("ClassA")
cls_b = self._gen_cls("classB")

registry = ConsumersRegistry()
registry = Registry()

self.assertIs(registry()(consumer_cls_a), consumer_cls_a)
self.assertIs(registry()(consumer_cls_b), consumer_cls_b)
self.assertIs(registry()(cls_a), cls_a)
self.assertIs(registry()(cls_b), cls_b)

key_a = registry.get_key(consumer_cls_a)
self.assertIs(registry[key_a], consumer_cls_a)
key_a = registry.get_key(cls_a)
self.assertIs(registry[key_a], cls_a)

key_b = registry.get_key(consumer_cls_b)
self.assertIs(registry[key_b], consumer_cls_b)
key_b = registry.get_key(cls_b)
self.assertIs(registry[key_b], cls_b)

def test_iter_returns_expected_keys(self):
consumer_cls_a = self._get_consumer_cls("ConsumerA", "group_a")
consumer_cls_b = self._get_consumer_cls("ConsumerB", "group_b")
registry = ConsumersRegistry()
cls_a = self._gen_cls("ClassA")
cls_b = self._gen_cls("ClassB")
registry = Registry()

registry()(consumer_cls_a)
registry()(consumer_cls_b)
registry()(cls_a)
registry()(cls_b)

key_a = registry.get_key(consumer_cls_a)
key_b = registry.get_key(consumer_cls_b)
key_a = registry.get_key(cls_a)
key_b = registry.get_key(cls_b)

self.assertCountEqual(list(registry), [key_a, key_b])


class ConsumersRegistryTestCase(SimpleTestCase):
def _get_consumer_cls(self, name, group_id) -> Type[Consumer]:
return type[Consumer](
name,
(Consumer,),
{"config": {"group.id": group_id}, "topics": Topics()},
)

@mock.patch("django_kafka.retry.consumer.RetryConsumer.build")
def test_retry_consumer_registered(self, mock_retry_consumer_build):
consumer_cls_a = self._get_consumer_cls("ConsumerA", "group_a")
Expand All @@ -61,19 +97,32 @@ def test_retry_consumer_registered(self, mock_retry_consumer_build):
self.assertCountEqual(list(registry), [key_a, retry_key_a, key_b])
self.assertIs(registry[retry_key_a], retry_consumer_mock)

def test_get_key(self):
consumer_cls_a = self._get_consumer_cls("ConsumerA", "group_a")
consumer_cls_b = self._get_consumer_cls("ConsumerB", "group_b")
registry = ConsumersRegistry()

key_a = registry.get_key(consumer_cls_a)
self.assertEqual(
key_a,
f"{consumer_cls_a.__module__}.{consumer_cls_a.__name__}",
class ConnectorsRegistryTestCase(SimpleTestCase):
def _gen_cls(self, name) -> Type[Connector]:
return type[Connector](
name,
(Connector,),
{"config": {}},
)

key_b = registry.get_key(consumer_cls_b)
self.assertEqual(
key_b,
f"{consumer_cls_b.__module__}.{consumer_cls_b.__name__}",
)
def test_get_key(self):
connector_cls = self._gen_cls("ConnectorCls")
registry = ConnectorsRegistry()

key = registry.get_key(connector_cls)
self.assertEqual(key, connector_cls().name)

def test_registering_same_key_not_allowed(self):
registry = ConnectorsRegistry()
connector_a = self._gen_cls("ConnectorA")
# give the same class name to the connector to generate same key
connector_b = self._gen_cls("ConnectorA")

key = registry.get_key(connector_a)
error_msg = f"`{key}` is already registered."

registry()(connector_a)

with self.assertRaisesMessage(DjangoKafkaError, error_msg):
registry()(connector_b)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "django-kafka"
version = "0.4.3"
version = "0.4.4"
dependencies = [
"django>=4.0,<6.0",
"confluent-kafka[avro, schema-registry]==2.4.0"
Expand Down