diff --git a/.gitignore b/.gitignore index ef39714..8e52b87 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ build/ .bash_history db.sqlite3 .ruff_cache +docker-compose.override.yaml diff --git a/README.md b/README.md index 660b57c..19ae48f 100644 --- a/README.md +++ b/README.md @@ -203,6 +203,68 @@ When the consumption of a message in a retryable topic fails, the message is re- When consumers are started using [start commands](#start-the-Consumers), an additional retry consumer will be started in parallel for any consumer containing a retryable topic. This retry consumer will be assigned to a consumer group whose id is a combination of the original group id and a `.retry` suffix. This consumer is subscribed to the retry topics, and manages the message retry and delay behaviour. Please note that messages are retried directly by the retry consumer and are not sent back to the original topic. +## Connectors + +Connectors are defined as python classes decorated with `@kafka.connectors()` which adds the class to the global registry. + +`django_kafka.connect.connector.Connector` implements submission, validation and deletion of the connector configuration. + +### Define connector: +```python +# Connectors are discovered automatically when placed under the connectors module +# e.g. ./connectors.py + +from django_kafka import kafka +from django_kafka.connect.connector import Connector + + +@kafka.connectors() +class MyConnector(Connector): + config = { + # configuration for the connector + } +``` + +### Mark a connector for deletion: + +```python +from django_kafka import kafka +from django_kafka.connect.connector import Connector + + +@kafka.connectors() +class MyConnector(Connector): + mark_for_removal = True + config = { + # configuration for the connector + } +``` + +### Manage connectors: + +django-kafka provides `./manage.py kafka_connect` management command to manage your connectors. + + +#### Manage a single connector +```bash +./manage.py kafka_connect path.to.my.SpecialConnector --validate --publish --check-status --ignore-failures +```` + +#### Manage all connectors +```bash +./manage.py kafka_connect --validate --publish --check-status --ignore-failures +```` + +`--validate` - validates the config over the connect REST API + +`--publish` - create or update the connector or delete when `mark_for_removal = True` + +`--check-status` - check the status of the connector is `RUNNING`. + +`--ignore-failures` - command wont fail if any of the connectors fail to validate or publish. + +See `--help`. + ## Settings: **Defaults:** diff --git a/django_kafka/__init__.py b/django_kafka/__init__.py index 0463745..014d7e1 100644 --- a/django_kafka/__init__.py +++ b/django_kafka/__init__.py @@ -1,6 +1,6 @@ import logging from multiprocessing.pool import Pool -from typing import Optional +from typing import Optional, TYPE_CHECKING from confluent_kafka.schema_registry import SchemaRegistryClient from django.utils.functional import cached_property @@ -9,9 +9,13 @@ from django_kafka.conf import settings from django_kafka.exceptions import DjangoKafkaError from django_kafka.producer import Producer -from django_kafka.registry import ConsumersRegistry +from django_kafka.registry import ConsumersRegistry, Registry from django_kafka.retry.settings import RetrySettings +if TYPE_CHECKING: + from django_kafka.connect.connector import Connector + from django_kafka.consumer import Consumer + logger = logging.getLogger(__name__) __version__ = "0.4.2" @@ -24,10 +28,11 @@ def autodiscover(): - autodiscover_modules("consumers") + autodiscover_modules("consumers", "connectors") class DjangoKafka: + connectors = Registry["Connector"]() consumers = ConsumersRegistry() retry = RetrySettings diff --git a/django_kafka/conf.py b/django_kafka/conf.py index 2aaa3de..f977432 100644 --- a/django_kafka/conf.py +++ b/django_kafka/conf.py @@ -18,6 +18,18 @@ "DEAD_LETTER_TOPIC_SUFFIX": "dlt", "POLLING_FREQUENCY": 1, # seconds "SCHEMA_REGISTRY": {}, + "CONNECT": { + "HOST": "", + "AUTH": None, + "RETRY": dict( + connect=5, + read=5, + status=5, + backoff_factor=0.5, + status_forcelist=[502, 503, 504], + ), + "REQUESTS_TIMEOUT": 30, # seconds + }, } diff --git a/django_kafka/connect/__init__.py b/django_kafka/connect/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/django_kafka/connect/client.py b/django_kafka/connect/client.py new file mode 100644 index 0000000..73fcfbc --- /dev/null +++ b/django_kafka/connect/client.py @@ -0,0 +1,62 @@ +import requests +from requests.auth import AuthBase + +from django_kafka.exceptions import DjangoKafkaError + + +from urllib3.util import Retry +from requests.adapters import HTTPAdapter + + +from typing import TypedDict + + +class RetryKwargs(TypedDict): + connect: int + read: int + status: int + backoff_factor: float + status_forcelist: list[int] + + +class KafkaConnectSession(requests.Session): + def __init__(self, host: str, auth: tuple | AuthBase, retry: RetryKwargs, timeout: int = None): + super().__init__() + self.auth = auth + self.host = host + self.timeout = timeout + self.mount(host, HTTPAdapter(max_retries=Retry(**retry))) + + def request(self, method, url, *args, **kwargs) -> requests.Response: + kwargs.setdefault("timeout", self.timeout) + return super().request(method, f"{self.host}{url}", *args, **kwargs) + + +class KafkaConnectClient: + """ + https://kafka.apache.org/documentation/#connect_rest + https://docs.confluent.io/platform/current/connect/references/restapi.html + """ + + def __init__(self, host: str, auth: tuple | AuthBase, retry: RetryKwargs, timeout: int = None): + self._requests = KafkaConnectSession(host, auth, retry, timeout) + + def update_or_create(self, connector_name: str, config: dict): + return self._requests.put(f"/connectors/{connector_name}/config", json=config) + + def delete(self, connector_name: str): + return self._requests.delete(f"/connectors/{connector_name}") + + def validate(self, config: dict): + if not config.get('connector.class'): + raise DjangoKafkaError("'connector.class' config is required for validation.") + + connector_class_name = config.get("connector.class").rsplit(".", 1)[-1] + response = self._requests.put(f"/connector-plugins/{connector_class_name}/config/validate", json=config) + return response + + def connector_status(self, connector_name: str): + """ + https://docs.confluent.io/platform/current/connect/references/restapi.html#get--connectors-(string-name)-status + """ + return self._requests.get(f"/connectors/{connector_name}/status") diff --git a/django_kafka/connect/connector.py b/django_kafka/connect/connector.py new file mode 100644 index 0000000..1e07042 --- /dev/null +++ b/django_kafka/connect/connector.py @@ -0,0 +1,81 @@ +from abc import ABC, abstractmethod +from enum import StrEnum + +from django_kafka.conf import settings +from django_kafka.exceptions import DjangoKafkaError +from django_kafka.connect.client import KafkaConnectClient + +__all__ = [ + "Connector", + "ConnectorStatus", +] + + +class ConnectorStatus(StrEnum): + """ + https://docs.confluent.io/platform/current/connect/monitoring.html#connector-and-task-status + UNASSIGNED: The connector/task has not yet been assigned to a worker. + RUNNING: The connector/task is running. + PAUSED: The connector/task has been administratively paused. + FAILED: The connector/task has failed (usually by raising an exception, which is reported in the status output). + """ + UNASSIGNED = "UNASSIGNED" + RUNNING = "RUNNING" + PAUSED = "PAUSED" + + +class Connector(ABC): + mark_for_removal = False + + @property + def name(self) -> str: + """Name of the connector.""" + return f"{self.__class__.__module__}.{self.__class__.__name__}" + + @property + @abstractmethod + def config(self) -> dict: + """Configurations for the connector.""" + + def __init__(self): + self.client = KafkaConnectClient( + host=settings.CONNECT["HOST"], + auth=settings.CONNECT["AUTH"], + retry=settings.CONNECT["RETRY"], + timeout=settings.CONNECT["REQUESTS_TIMEOUT"], + ) + + def delete(self) -> bool: + response = self.client.delete(self.name) + + if response.status_code == 404: + return False + + if not response.ok: + raise DjangoKafkaError(response.text) + + return True + + def submit(self): + response = self.client.update_or_create(self.name, self.config) + + if not response.ok: + raise DjangoKafkaError(response.text) + + return response.json() + + def is_valid(self, raise_exception=False) -> bool | None: + response = self.client.validate(self.config) + + if raise_exception and not response.ok: + raise DjangoKafkaError(response.text) + + return response.ok + + def status(self) -> ConnectorStatus: + response = self.client.connector_status(self.name) + + if not response.ok: + raise DjangoKafkaError(response.text) + + return response.json()["connector"]["state"] diff --git a/django_kafka/management/commands/kafka_connect.py b/django_kafka/management/commands/kafka_connect.py new file mode 100644 index 0000000..8420dea --- /dev/null +++ b/django_kafka/management/commands/kafka_connect.py @@ -0,0 +1,175 @@ +import logging + +from django.core.management import CommandError +from django.core.management.base import BaseCommand +from requests.exceptions import RetryError + +from django_kafka import kafka +from django_kafka.exceptions import DjangoKafkaError +from django_kafka.connect.connector import Connector, ConnectorStatus + +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + help = "Publish connectors." + + def add_arguments(self, parser): + parser.add_argument( + "connector", + type=str, + default=None, + nargs="?", + help="Python path to the connector class(es). Processes all if not provided.", + ) + parser.add_argument( + "--list", + action="store_true", + default=False, + help="List all connectors.", + ) + parser.add_argument( + "--validate", + action="store_true", + default=False, + help="Validate connectors.", + ) + parser.add_argument( + "--publish", + action="store_true", + default=False, + help="Create/Update/Delete connectors.", + ) + parser.add_argument( + "--check-status", + action="store_true", + default=False, + help="Check status of connectors. Currently RUNNING status is considered as success.", + ) + parser.add_argument( + "--ignore-failures", + action="store_true", + default=False, + help="The command wont fail if failures were detected. " + "By default if any failures were detected the command exist with error status.", + ) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.connectors: list[str] = [] + self.has_failures = False + + def handle(self, connector, **options): + if options["list"]: + self.list_connectors() + return + + if not any((connector, options["validate"], options["publish"], options["check_status"])): + self.print_help("manage.py", "kafka_connect") + return + + if connector: + self.connectors = [connector] + else: + self.connectors = kafka.connectors + + if options["validate"]: + self.handle_validate() + + if options["publish"]: + self.handle_publish() + + if options["check_status"]: + self.handle_status() + + if self.has_failures and not options["ignore_failures"]: + raise CommandError("Command does not succeed.") + + def list_connectors(self): + self.stdout.write(self.style.SUCCESS("Available connectors:")) + for connector_path in kafka.connectors: + if kafka.connectors[connector_path].mark_for_removal: + self.stdout.write(f"- {connector_path} (marked for removal)") + else: + self.stdout.write(f"- {connector_path}") + + def handle_validate(self): + self.stdout.write(self.style.SUCCESS("Validating connectors...")) + + for connector_path in self.connectors: + self.stdout.write(f"{connector_path}: ", ending="") + + connector = kafka.connectors[connector_path]() + + if connector.mark_for_removal: + self.stdout.write(self.style.WARNING("skip (REASON: marked for removal)")) + continue + + try: + connector.is_valid(raise_exception=True) + except (DjangoKafkaError, RetryError) as error: + self.has_failures = True + self.stdout.write(self.style.ERROR("invalid")) + self.stdout.write(self.style.ERROR(error)) + else: + self.stdout.write(self.style.SUCCESS("valid")) + + def handle_publish(self): + self.stdout.write(self.style.SUCCESS("Publishing connectors...")) + + for connector_path in self.connectors: + self.stdout.write(f"{connector_path}: ", ending="") + + connector = kafka.connectors[connector_path]() + + if connector.mark_for_removal: + self.handle_delete(connector) + else: + self.handle_submit(connector) + + def handle_status(self): + self.stdout.write(self.style.SUCCESS("Checking status...")) + + for connector_path in self.connectors: + self.stdout.write(f"{connector_path}: ", ending="") + + connector = kafka.connectors[connector_path]() + if connector.mark_for_removal: + self.stdout.write(self.style.WARNING("skip (REASON: marked for removal)")) + continue + + try: + status = connector.status() + except (DjangoKafkaError, RetryError) as error: + self.has_failures = True + self.stdout.write(self.style.ERROR("failed to retrieve")) + self.stdout.write(self.style.ERROR(error)) + else: + if status == ConnectorStatus.RUNNING: + self.stdout.write(self.style.SUCCESS(status)) + else: + self.has_failures = True + self.stdout.write(self.style.ERROR(status)) + + def handle_delete(self, connector: Connector): + try: + deleted = connector.delete() + except (DjangoKafkaError, RetryError) as error: + self.has_failures = True + self.stdout.write(self.style.ERROR("failed")) + self.stdout.write(self.style.ERROR(error)) + else: + if deleted: + self.stdout.write(self.style.SUCCESS("deleted")) + else: + self.stdout.write(self.style.WARNING("does not exist (already deleted)")) + + def handle_submit(self, connector: Connector): + try: + connector.submit() + except (DjangoKafkaError, RetryError) as error: + self.has_failures = True + self.stdout.write(self.style.ERROR("failed")) + self.stdout.write(self.style.ERROR(error)) + else: + self.stdout.write(self.style.SUCCESS("submitted")) diff --git a/django_kafka/registry.py b/django_kafka/registry.py index 23e2e91..e1bd328 100644 --- a/django_kafka/registry.py +++ b/django_kafka/registry.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, Type +from typing import Generic, TYPE_CHECKING, Type, TypeVar from django_kafka.exceptions import DjangoKafkaError @@ -6,33 +6,43 @@ from django_kafka.consumer import Consumer -class ConsumersRegistry: +T = TypeVar('T') + + +class Registry(Generic[T]): def __init__(self): - self.__consumers: dict[str, Type[Consumer]] = {} + self._classes: dict[str, Type[T]] = {} def __call__(self): - def add_to_registry(consumer_cls: Type["Consumer"]) -> Type["Consumer"]: - self.register(consumer_cls) - return consumer_cls + def add_to_registry(cls: Type[T]) -> Type[T]: + self.register(cls) + return cls return add_to_registry def __getitem__(self, key: str): try: - return self.__consumers[key] + return self._classes[key] except KeyError as error: - raise DjangoKafkaError(f"Consumer `{key}` is not registered.") from error + raise DjangoKafkaError(f"`{key}` is not registered.") from error def __iter__(self): - yield from self.__consumers.keys() + yield from self._classes.keys() - def get_key(self, consumer_cls: Type["Consumer"]) -> str: - return f"{consumer_cls.__module__}.{consumer_cls.__name__}" + def get_key(self, cls: Type[T]) -> str: + return f"{cls.__module__}.{cls.__name__}" - def register(self, consumer_cls: Type["Consumer"]): - from django_kafka.retry.consumer import RetryConsumer + def register(self, cls: Type[T]): + key = self.get_key(cls) + self._classes[key] = cls - key = self.get_key(consumer_cls) - self.__consumers[key] = consumer_cls - if retry_consumer_cls := RetryConsumer.build(consumer_cls): - self.__consumers[f"{key}.retry"] = retry_consumer_cls + +class ConsumersRegistry(Registry["Consumer"]): + + def register(self, cls): + from django_kafka.retry.consumer import RetryConsumer + + super().register(cls) + + if retry_consumer_cls := RetryConsumer.build(cls): + self._classes[f"{self.get_key(cls)}.retry"] = retry_consumer_cls diff --git a/django_kafka/tests/test_kafka_connect.py b/django_kafka/tests/test_kafka_connect.py new file mode 100644 index 0000000..39cb601 --- /dev/null +++ b/django_kafka/tests/test_kafka_connect.py @@ -0,0 +1,178 @@ +from contextlib import contextmanager +from io import StringIO +from unittest.mock import patch, Mock, DEFAULT, MagicMock, call + +from django.core.management import call_command +from django.test import SimpleTestCase +from requests.exceptions import RetryError + +from django_kafka.exceptions import DjangoKafkaError +from django_kafka.connect.connector import Connector, ConnectorStatus +from django_kafka.management.commands.kafka_connect import Command + + +@contextmanager +def patch_kafka_connectors(**attrs): + attrs.setdefault('mark_for_removal', False) + + connector_instance = Mock(spec_set=Connector, **attrs) + connector_class = Mock(spec_set=Connector, return_value=connector_instance) + + with patch("django_kafka.kafka.connectors", {'connector.fake.path': connector_class}): + yield connector_instance + + +class KafkaConnectTestCase(SimpleTestCase): + def setUp(self): + self.connector_path = 'connector.fake.path' + self.command_stdout = StringIO() + self.command = Command(stdout=self.command_stdout) + self.command.connectors = [self.connector_path] + + @patch.multiple( + "django_kafka.management.commands.kafka_connect.Command", + handle_validate=DEFAULT, + handle_publish=DEFAULT, + handle_status=DEFAULT, + ) + def test_execute_order(self, handle_validate, handle_publish, handle_status): + # [Tracking order of calls](https://docs.python.org/3/library/unittest.mock-examples.html#tracking-order-of-calls-and-less-verbose-call-assertions) + manager = MagicMock() + manager.attach_mock(handle_validate, "handle_validate") + manager.attach_mock(handle_publish, "handle_publish") + manager.attach_mock(handle_status, "handle_status") + + call_command("kafka_connect", validate=True, publish=True, check_status=True) + + handle_validate.assert_called_once_with() + handle_publish.assert_called_once_with() + handle_status.assert_called_once_with() + + manager.assert_has_calls([ + call.handle_validate(), + call.handle_publish(), + call.handle_status(), + ], any_order=False) + + def test_handle_validate(self): + with patch_kafka_connectors() as connector: + self.command.handle_validate() + + connector.is_valid.assert_called_once_with(raise_exception=True) + + self.assertFalse(self.command.has_failures) + + def test_handle_validate_when_marked_for_removal(self): + with patch_kafka_connectors(mark_for_removal=True) as connector: + self.command.handle_validate() + + connector.is_valid.assert_not_called() + self.assertFalse(self.command.has_failures) + + def test_handle_validate__exceptions(self): + with patch_kafka_connectors(**{"is_valid.side_effect": DjangoKafkaError}) as connector: + self.command.handle_validate() + + connector.is_valid.assert_called_once_with(raise_exception=True) + self.assertTrue(self.command.has_failures) + + @patch("django_kafka.management.commands.kafka_connect.Command.handle_delete") + @patch("django_kafka.management.commands.kafka_connect.Command.handle_submit") + def test_handle_publish_marked_for_removal(self, mock_command_handle_submit, mock_command_handle_delete): + with patch_kafka_connectors(mark_for_removal=True) as connector: + self.command.handle_publish() + + mock_command_handle_submit.assert_not_called() + mock_command_handle_delete.assert_called_once_with(connector) + + @patch("django_kafka.management.commands.kafka_connect.Command.handle_delete") + @patch("django_kafka.management.commands.kafka_connect.Command.handle_submit") + def test_handle_publish_not_marked_for_removal(self, mock_command_handle_submit, mock_command_handle_delete): + with patch_kafka_connectors(mark_for_removal=False) as connector: + self.command.handle_publish() + + mock_command_handle_submit.assert_called_once_with(connector) + mock_command_handle_delete.assert_not_called() + + def test_handle_delete(self): + with patch_kafka_connectors() as connector: + self.command.handle_delete(connector) + + connector.delete.assert_called_once_with() + self.assertFalse(self.command.has_failures) + + def test_handle_delete__django_kafka_error(self): + with patch_kafka_connectors(**{"delete.side_effect": DjangoKafkaError}) as connector: + self.command.handle_delete(connector) + + connector.delete.assert_called_once_with() + self.assertTrue(self.command.has_failures) + + def test_handle_delete__retry_error_error(self): + with patch_kafka_connectors(**{"delete.side_effect": RetryError}) as connector: + self.command.handle_delete(connector) + + connector.delete.assert_called_once_with() + self.assertTrue(self.command.has_failures) + + def test_handle_submit(self): + with patch_kafka_connectors() as connector: + self.command.handle_submit(connector) + + self.assertFalse(self.command.has_failures) + connector.submit.assert_called_once_with() + + def test_handle_submit__django_kafka_error(self): + with patch_kafka_connectors(**{"submit.side_effect": DjangoKafkaError}) as connector: + self.command.handle_submit(connector) + + connector.submit.assert_called_once_with() + self.assertTrue(self.command.has_failures) + + def test_handle_submit__retry_error_error(self): + with patch_kafka_connectors(**{"submit.side_effect": RetryError}) as connector: + self.command.handle_submit(connector) + + connector.submit.assert_called_once_with() + self.assertTrue(self.command.has_failures) + + def test_handle_status__marked_for_removal(self): + with patch_kafka_connectors(mark_for_removal=True) as connector: + self.command.handle_status() + + connector.status.assert_not_called() + + def test_handle_status__running(self): + with patch_kafka_connectors(**{"status.return_value": ConnectorStatus.RUNNING}) as connector: + self.command.handle_status() + + connector.status.assert_called_once_with() + self.assertFalse(self.command.has_failures) + + def test_handle_status__paused(self): + with patch_kafka_connectors(**{"status.return_value": ConnectorStatus.PAUSED}) as connector: + self.command.handle_status() + + connector.status.assert_called_once_with() + self.assertTrue(self.command.has_failures) + + def test_handle_status__unassigned(self): + with patch_kafka_connectors(**{"status.return_value": ConnectorStatus.UNASSIGNED}) as connector: + self.command.handle_status() + + connector.status.assert_called_once_with() + self.assertTrue(self.command.has_failures) + + def test_handle_status__django_kafka_error(self): + with patch_kafka_connectors(**{"status.side_effect": DjangoKafkaError}) as connector: + self.command.handle_status() + + connector.status.assert_called_once_with() + self.assertTrue(self.command.has_failures) + + def test_handle_status__retry_error(self): + with patch_kafka_connectors(**{"status.side_effect": RetryError}) as connector: + self.command.handle_status() + + connector.status.assert_called_once_with() + self.assertTrue(self.command.has_failures)