diff --git a/.bumpversion.toml b/.bumpversion.toml new file mode 100644 index 0000000..f1c931f --- /dev/null +++ b/.bumpversion.toml @@ -0,0 +1,19 @@ +[tool.bumpversion] +current_version = "0.0.0" +parse = "(?P\\d+)\\.(?P\\d+)\\.(?P\\d+)" +serialize = ["{major}.{minor}.{patch}"] +search = "{current_version}" +replace = "{new_version}" +regex = false +ignore_missing_version = false +ignore_missing_files = false +tag = true +sign_tags = false +tag_name = "v{new_version}" +tag_message = "Bump version: {current_version} → {new_version}" +allow_dirty = false +commit = true +message = "Bump version: {current_version} → {new_version}" +commit_args = "" +[[tool.bumpversion.files ]] +filename = "./django_kafka/__init__.py" diff --git a/.github/workflows/workflow.yaml b/.github/workflows/workflow.yaml new file mode 100644 index 0000000..3723168 --- /dev/null +++ b/.github/workflows/workflow.yaml @@ -0,0 +1,72 @@ +name: Test + +on: + - push + - pull_request + +jobs: + test: + runs-on: ubuntu-latest + + strategy: + fail-fast: false + matrix: + python-version: + - "3.11" + - "3.12" + django: + - "4.0" + - "4.1" + - "4.2" + - "5.0" + exclude: + - python-version: "3.11" + django: "4.0" + - python-version: "3.12" + django: "4.0" + - python-version: "3.12" + django: "4.1" + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Update pip + run: python -m pip install --upgrade pip + + - name: Install Django ${{ matrix.django }} + run: pip install "Django~=${{ matrix.django }}" + + - name: Install package + run: pip install -e . + + - name: Run tests + run: python ./example/manage.py test + + publish: + name: Build and publish Python 🐍 distributions 📦 to PyPI + needs: test + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up Python 3.12 + uses: actions/setup-python@v5 + with: + python-version: 3.12 + + - name: Install req packages + run: python -m pip install -U setuptools build + + - name: Build a binary wheel and a source tarball + run: python -m build --sdist --wheel + + - name: Publish Package on PyPI + if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags') + uses: pypa/gh-action-pypi-publish@release/v1.8 + with: + user: __token__ + password: ${{ secrets.PYPI_API_TOKEN }} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d73cb22 --- /dev/null +++ b/.gitignore @@ -0,0 +1,11 @@ +*.pyc +.idea/ +.pycharm_helpers/ +.ipython/ +dist/ +*.egg-info/ +build/ +*.sw* +.coverage +.bash_history +db.sqlite3 diff --git a/.ruff.toml b/.ruff.toml new file mode 100644 index 0000000..356dbc1 --- /dev/null +++ b/.ruff.toml @@ -0,0 +1,72 @@ +exclude = [ + ".git", + "__pycache", + "migrations", + "src", + "docs", + "rh_django_shared", + "example/conf/settings.py", +] + +[lint] +dummy-variable-rgx = "_|dummy" +# See https://github.com/astral-sh/ruff?tab=readme-ov-file#rules for all supported rules +select = [ + "A", + "B", + "BLE", + "C", + "C4", + "C90", + "COM", + "DJ", + "DTZ", + "E", + "ERA", + "F", + "G", + "I", + "ICN", + "INP", + "N", + "PIE", + "PGH", + "PL", + "PTH", + "RET", + "RSE", + "RUF", + "S", + "SIM", + "T20", + "UP", + "W", + "YTT", +] +ignore = [ +# "N802", # Function name `{name}` should be lowercase + "N803", # Argument name `{name}` should be lowercase +# "N806", # Variable `{name}` in function should be lowercase +# "N815", # Variable `{name}` in class scope should not be mixedCase +# "N818", # Exception 
name `{name}` should be named with an Error suffix +# "A003", # Class attribute `{name}` is shadowing a python builtin +# "S101", # Use of `assert` detected +# "UP007", # Use `X | Y` for type annotations + "S105", # Possible hardcoded password: "{}" +# "PLR0913", # Too many arguments to function call ({c_args}/{max_args}) + "RUF012", # Mutable class attributes should be annotated with `typing.ClassVar` +] + +[lint.mccabe] +max-complexity = 16 + +[lint.pycodestyle] +max-line-length = 88 + +[lint.pylint] +max-branches = 16 +max-args = 10 +max-public-methods = 50 + +[format] +skip-magic-trailing-comma = false diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..e69de29 diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..ae05f23 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,20 @@ +FROM python:3.12-slim + +ENV PYTHONUNBUFFERED 1 +ENV LC_ALL=C.UTF-8 + +RUN useradd -m app + +USER app +WORKDIR /app + +ADD ./example/requirements.txt /app/example/ + +ENV PATH /home/app/venv/bin:$PATH + +RUN python3 -m venv ~/venv && \ + pip install -r ./example/requirements.txt + +ADD . /app/ + +ENV DJANGO_SETTINGS_MODULE conf.settings diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..07d6ab1 --- /dev/null +++ b/LICENSE @@ -0,0 +1,7 @@ +Copyright 2024 RegioHelden GmbH + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..8070beb --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,2 @@ +include LICENSE README.md +recursive-include django_kafka/ * diff --git a/README.md b/README.md new file mode 100644 index 0000000..a98a49c --- /dev/null +++ b/README.md @@ -0,0 +1,236 @@ +# django-kafka +This library is using [confluent-kafka-python](https://github.com/confluentinc/confluent-kafka-python) which is a wrapper around the [librdkafka](https://github.com/confluentinc/librdkafka) (Apache Kafka C/C++ client library). + +It helps to integrate kafka with Django. + +## Quick start + +```bash +pip install django-kafka +``` + +### Configure: +Considering you have locally setup kafka instance with no authentication. All you need is to define the bootstrap servers. +```python +# ./settings.py + +INSTALLED_APPS = [ + # ... + "django_kafka", +] + +DJANGO_KAFKA = { + "GLOBAL_CONFIG": { + "bootstrap.servers": "kafka1:9092", + }, +} +``` + +### Define Topic: + +Topic. Defines how to handle incoming message and how to produce outgoing message. 
+```python
+from confluent_kafka.serialization import MessageField
+from django_kafka.topic import Topic
+
+
+class Topic1(Topic):
+    name = "topic1"
+
+    def consume(self, msg):
+        key = self.deserialize(msg.key(), MessageField.KEY, msg.headers())
+        value = self.deserialize(msg.value(), MessageField.VALUE, msg.headers())
+        # ... process values
+```
+
+### Define consumer:
+
+A `Consumer` defines which topics it takes care of. Usually you want one consumer per project. If two consumers are defined, they will be started in parallel.
+
+Consumers are auto-discovered and are expected to be located in a `consumers.py` module.
+
+```python
+# ./consumers.py
+
+from django_kafka import kafka
+from django_kafka.consumer import Consumer, Topics
+
+from my_app.topics import Topic1
+
+
+# register your consumer using the `DjangoKafka` class API decorator
+@kafka.consumers()
+class MyAppConsumer(Consumer):
+    # tell the consumer which topics to process using the `django_kafka.consumer.Topics` interface.
+    topics = Topics(
+        Topic1(),
+    )
+
+    config = {
+        "group.id": "my-app-consumer",
+        "auto.offset.reset": "latest",
+        "enable.auto.offset.store": False,
+    }
+```
+
+
+### Start consumer(s):
+You can use the Django management command to start the defined consumers.
+```bash
+./manage.py kafka_consume
+```
+Or you can use the `DjangoKafka` class API.
+```python
+from django_kafka import kafka
+
+kafka.start_consumers()
+```
+Check [Confluent Python Consumer](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#consumer) for API documentation.
+
+
+### Produce:
+Messages are produced using a topic instance.
+```python
+from my_app.topics import Topic1
+
+# this will send a message to kafka, serializing it using the defined serializer
+Topic1().produce("some message")
+```
+Check [Confluent Python Producer](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#producer) for API documentation.
+
+
+### Define schema registry:
+
+The library uses [Confluent SchemaRegistryClient](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#schemaregistryclient). To use it, define the `SCHEMA_REGISTRY` setting.
+
+Find available configs at [SchemaRegistryClient docs](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#schemaregistryclient).
+```python
+DJANGO_KAFKA = {
+    "SCHEMA_REGISTRY": {
+        "url": "http://schema-registry",
+    },
+}
+```
+
+**Note:** take [django_kafka.topic.AvroTopic](./django_kafka/topic.py) as an example if you want to implement a custom Topic with your schema.
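For illustration, a custom Avro-backed topic could be a small subclass of `AvroTopic`. A minimal sketch follows; the topic name `users` and the processing logic are made-up placeholders, not part of the library:

```python
from confluent_kafka.serialization import MessageField

from django_kafka.topic import AvroTopic


class UserTopic(AvroTopic):
    name = "users"  # hypothetical topic name

    def consume(self, msg):
        # the value deserializer uses the Avro schema fetched from the schema registry
        value = self.deserialize(msg.value(), MessageField.VALUE, msg.headers())
        # ... process the decoded value
```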
+## Settings.
+
+**Defaults:**
+```python
+DJANGO_KAFKA = {
+    "CLIENT_ID": f"{socket.gethostname()}-python",
+    "GLOBAL_CONFIG": {},
+    "PRODUCER_CONFIG": {},
+    "CONSUMER_CONFIG": {},
+    "POLLING_FREQUENCY": 1,  # seconds
+    "SCHEMA_REGISTRY": {},
+    "ERROR_HANDLER": "django_kafka.error_handlers.ClientErrorHandler",
+}
+```
+
+#### `CLIENT_ID`
+Default: `f"{socket.gethostname()}-python"`
+
+An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.
+
+**Note:** This parameter is included in the config of both the consumer and the producer unless `client.id` is overwritten within `PRODUCER_CONFIG` or `CONSUMER_CONFIG`.
+
+#### `GLOBAL_CONFIG`
+Default: `{}`
+
+Defines configurations applied to both consumer and producer. See [configs marked with `*`](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
+
+#### `PRODUCER_CONFIG`
+Default: `{}`
+
+Defines configurations of the producer. See [configs marked with `P`](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
+
+#### `CONSUMER_CONFIG`
+Default: `{}`
+
+Defines configurations of the consumer. See [configs marked with `C`](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
+
+#### `POLLING_FREQUENCY`
+Default: `1` # seconds
+
+How often the client polls for events.
+
+#### `SCHEMA_REGISTRY`
+Default: `{}`
+
+Configuration for [confluent_kafka.schema_registry.SchemaRegistryClient](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#schemaregistryclient).
+
+#### `ERROR_HANDLER`
+Default: `django_kafka.error_handlers.ClientErrorHandler`
+
+This is an `error_cb` hook (see [Kafka Client Configuration](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#kafka-client-configuration) for reference).
+It is triggered for client global errors and, in case of a fatal error, it raises `DjangoKafkaError`.
+
+
+## Bidirectional data sync with no infinite events loop.
+
+**For example, you want to keep the User table in sync in multiple systems.**
+
+The idea is to send events from all systems to the same topic, and also consume events from the same topic, marking the record with `kafka_skip=True` at consumption time.
+- The producer should respect `kafka_skip=True` and not produce new events when it is `True`.
+- Any updates to the User table which happen outside the consumer should set `kafka_skip=False`, which will allow the producer to create an event again.
+
+This way the chronology is strictly kept and the infinite events loop is avoided.
+
+The disadvantage is that each system will still consume its own message. A minimal sketch of such a producer is shown below.
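For illustration only, a producer hooked into Django's `post_save` signal could respect the flag roughly like this. This is a sketch under assumptions: `User` and `Topic1` stand in for your own model and topic, and the payload format is up to you:

```python
from django.db.models.signals import post_save
from django.dispatch import receiver

from my_app.models import User
from my_app.topics import Topic1


@receiver(post_save, sender=User)
def produce_user_event(sender, instance, **kwargs):
    # rows just written by the consumer carry kafka_skip=True, so skip them
    if instance.kafka_skip:
        return
    # produce a string payload; key makes related events land on the same partition
    Topic1().produce(str(instance.pk), key=str(instance.pk))
```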
+#### There are two mixins: one for the Django Model and one for the QuerySet.
+
+#### KafkaSkipMixin
+It adds a new `kafka_skip` boolean field, which defaults to `False`, and overrides the `Model.save` method to set `kafka_skip=False`.
+
+Usage:
+```python
+from django.contrib.auth.base_user import AbstractBaseUser
+from django.contrib.auth.models import PermissionsMixin
+from django_kafka.models import KafkaSkipMixin
+
+class User(KafkaSkipMixin, PermissionsMixin, AbstractBaseUser):
+    # ...
+```
+
+
+#### KafkaSkipQueryset
+If you have defined a custom manager on your model, then you should create it from `KafkaSkipQueryset`. It adds `kafka_skip=False` when using the `update` method.
+
+**Note:** `kafka_skip=False` is only set when it's not provided to the `update` kwargs. E.g. `User.objects.update(first_name="John", kafka_skip=True)` will not be changed to `kafka_skip=False`.
+
+Usage:
+```python
+from django.contrib.auth.base_user import AbstractBaseUser
+from django.contrib.auth.base_user import BaseUserManager
+from django.contrib.auth.models import PermissionsMixin
+from django_kafka.models import KafkaSkipMixin, KafkaSkipQueryset
+
+
+class UserManager(BaseUserManager.from_queryset(KafkaSkipQueryset)):
+    # ...
+
+
+class User(KafkaSkipMixin, PermissionsMixin, AbstractBaseUser):
+    # ...
+    objects = UserManager()
+```
+
+
+## Making a new release
+- [bump-my-version](https://github.com/callowayproject/bump-my-version) is used to manage releases.
+- [Ruff](https://github.com/astral-sh/ruff) linter is used to validate the code style. Make sure your code complies with the defined rules. You may use `ruff check --fix` for that. Ruff is executed by GitHub Actions and the workflow will fail if Ruff validation fails.
+
+- Add your changes to the [CHANGELOG](CHANGELOG.md), run
+```bash
+docker compose run --rm app bump-my-version bump patch
+```
+This will bump the major/minor/patch version respectively and add a tag for the release.
+
+- Push, including the new tag, to publish the release to PyPI.
+```bash
+git push origin tag
+```
diff --git a/django_kafka/__init__.py b/django_kafka/__init__.py
new file mode 100644
index 0000000..170494d
--- /dev/null
+++ b/django_kafka/__init__.py
@@ -0,0 +1,73 @@
+import logging
+from multiprocessing.pool import Pool
+from typing import Optional
+
+from confluent_kafka.schema_registry import SchemaRegistryClient
+from django.utils.functional import cached_property
+from django.utils.module_loading import autodiscover_modules
+
+from django_kafka.conf import settings
+from django_kafka.exceptions import DjangoKafkaError
+from django_kafka.producer import Producer
+from django_kafka.registry import ConsumersRegistry
+
+logger = logging.getLogger(__name__)
+
+__version__ = "0.0.0"
+
+__all__ = [
+    "autodiscover",
+    "DjangoKafka",
+    "kafka",
+]
+
+
+def autodiscover():
+    autodiscover_modules("consumers")
+
+
+class DjangoKafka:
+    consumers = ConsumersRegistry()
+
+    @cached_property
+    def producer(self) -> Producer:
+        return Producer()
+
+    @cached_property
+    def schema_client(self) -> SchemaRegistryClient:
+        """
+        https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#schemaregistryclient
+        """
+        if not settings.SCHEMA_REGISTRY:
+            raise DjangoKafkaError(
+                "`SCHEMA_REGISTRY` configuration is not defined.",
+            )
+
+        return SchemaRegistryClient(settings.SCHEMA_REGISTRY)
+
+    def start_consumer(self, consumer: str):
+        self.consumers[consumer]().start()
+
+    def start_consumers(self, consumers: Optional[list[str]] = None):
+        consumers = consumers or list(self.consumers)
+        with Pool(processes=len(consumers)) as pool:
+            try:
+                pool.map(self.start_consumer, consumers)
+            except KeyboardInterrupt:
+                # Stops the worker processes immediately without completing
+                # outstanding work.
+                pool.terminate()
+                # Wait for the worker processes to exit.
+                # Should be called after close() or terminate().
+                pool.join()
+                logger.debug("KeyboardInterrupt. Pool workers terminated.")
+            else:
+                # Prevents any more tasks from being submitted to the pool.
+                # Once all the tasks have been completed the worker processes will exit.
+                pool.close()
+                # Wait for the worker processes to exit.
+                # Should be called after close() or terminate().
+ pool.join() + + +kafka = DjangoKafka() diff --git a/django_kafka/apps.py b/django_kafka/apps.py new file mode 100644 index 0000000..1daef6a --- /dev/null +++ b/django_kafka/apps.py @@ -0,0 +1,10 @@ +from django.apps import AppConfig + + +class DjangoKafkaConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "django_kafka" + + def ready(self): + super().ready() + self.module.autodiscover() diff --git a/django_kafka/conf.py b/django_kafka/conf.py new file mode 100644 index 0000000..ab76af2 --- /dev/null +++ b/django_kafka/conf.py @@ -0,0 +1,32 @@ +import socket + +from django.conf import settings as django_settings + +SETTINGS_KEY = "DJANGO_KAFKA" +DEFAULTS = { + "CLIENT_ID": f"{socket.gethostname()}-python", + "ERROR_HANDLER": "django_kafka.error_handlers.ClientErrorHandler", + "GLOBAL_CONFIG": {}, + "PRODUCER_CONFIG": {}, + "CONSUMER_CONFIG": {}, + "POLLING_FREQUENCY": 1, # seconds + "SCHEMA_REGISTRY": {}, +} + + +class Settings: + @property + def _settings(self): + return getattr(django_settings, SETTINGS_KEY, {}) + + def __getattr__(self, attr): + if attr in self._settings: + return self._settings[attr] + + if attr in DEFAULTS: + return DEFAULTS[attr] + + raise AttributeError(f"Invalid setting: '{attr}'") + + +settings = Settings() diff --git a/django_kafka/consumer.py b/django_kafka/consumer.py new file mode 100644 index 0000000..f82a875 --- /dev/null +++ b/django_kafka/consumer.py @@ -0,0 +1,92 @@ +import logging +from pydoc import locate +from typing import Optional + +from confluent_kafka import Consumer as ConfluentConsumer +from confluent_kafka import cimpl + +from django_kafka.conf import settings +from django_kafka.topic import Topic + +logger = logging.getLogger(__name__) + + +class Topics(dict): + def __init__(self, *topics: Topic): + for topic in topics: + self[topic.name] = topic + + +class Consumer: + """ + Available settings of the producers (P) and consumers (C) + https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md + Consumer configs + https://kafka.apache.org/documentation/#consumerconfigs + Kafka Client Configuration + https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#kafka-client-configuration + confluent_kafka.Consumer API + https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-consumer + """ + + topics: Topics[str, Topic] + config: dict + + polling_freq = settings.POLLING_FREQUENCY + default_logger = logger + default_error_handler = settings.ERROR_HANDLER + + def __init__(self, config: Optional[dict] = None, **kwargs): + kwargs.setdefault("logger", self.default_logger) + kwargs.setdefault("error_cb", locate(self.default_error_handler)()) + + self.config = { + "client.id": settings.CLIENT_ID, + **settings.GLOBAL_CONFIG, + **settings.CONSUMER_CONFIG, + **getattr(self, "config", {}), + **(config or {}), + } + + self._consumer = ConfluentConsumer(self.config, **kwargs) + + def __getattr__(self, name): + """proxy consumer methods.""" + if name not in {"config"}: + return getattr(self._consumer, name) + raise AttributeError(f"'{self.__class__.__name__}' has no attribute 'name'") + + def start(self): + # define topics + self.subscribe(topics=list(self.topics)) + while True: + # poll every self.polling_freq seconds + if msg := self.poll(timeout=self.polling_freq): + self.process_message(msg) + + def stop(self): + self.close() + + def process_message(self, msg: cimpl.Message): + if msg_error := msg.error(): + self.handle_error(msg_error) + 
return + + try: + self.topics[msg.topic()].consume(msg) + # ruff: noqa: BLE001 (we do not want consumer to stop if message processing is failing in any circumstances) + except Exception as error: + self.handle_error(error) + else: + self.commit_offset(msg) + + def commit_offset(self, msg: cimpl.Message): + if not self.config.get("enable.auto.offset.store"): + # Store the offset associated with msg to a local cache. + # Stored offsets are committed to Kafka by a background + # thread every 'auto.commit.interval.ms'. + # Explicitly storing offsets after processing gives at-least once semantics. + self.store_offsets(msg) + + def handle_error(self, error): + logger.error(error) diff --git a/django_kafka/error_handlers.py b/django_kafka/error_handlers.py new file mode 100644 index 0000000..a26ba41 --- /dev/null +++ b/django_kafka/error_handlers.py @@ -0,0 +1,9 @@ +from confluent_kafka import KafkaError + +from django_kafka.exceptions import DjangoKafkaError + + +class ClientErrorHandler: + def __call__(self, error: KafkaError): + if error.fatal(): + raise DjangoKafkaError(error) diff --git a/django_kafka/exceptions.py b/django_kafka/exceptions.py new file mode 100644 index 0000000..d18f69a --- /dev/null +++ b/django_kafka/exceptions.py @@ -0,0 +1,2 @@ +class DjangoKafkaError(Exception): + pass diff --git a/django_kafka/management/__init__.py b/django_kafka/management/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/django_kafka/management/commands/__init__.py b/django_kafka/management/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/django_kafka/management/commands/kafka_consume.py b/django_kafka/management/commands/kafka_consume.py new file mode 100644 index 0000000..36a1da0 --- /dev/null +++ b/django_kafka/management/commands/kafka_consume.py @@ -0,0 +1,24 @@ +import logging +from typing import Optional + +from django.core.management.base import BaseCommand + +from django_kafka import kafka + +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + help = "Start python consumers in parallel." + + def add_arguments(self, parser): + parser.add_argument( + "consumers", + nargs="*", + type=str, + default=None, + help="Python path to the consumer class(es). Starts all if not provided.", + ) + + def handle(self, consumers: Optional[list[str]] = None, *args, **options): + kafka.start_consumers(consumers) diff --git a/django_kafka/models.py b/django_kafka/models.py new file mode 100644 index 0000000..d3470ba --- /dev/null +++ b/django_kafka/models.py @@ -0,0 +1,48 @@ +from django.db import models +from django.utils.translation import gettext_lazy as _ + + +class KafkaSkipQueryset(models.QuerySet): + def update(self, **kwargs) -> int: + kwargs.setdefault("kafka_skip", False) + return super().update(**kwargs) + + +class KafkaSkipMixin(models.Model): + """ + For models (tables) which are synced with other database(s) in both directions. + + Every update which happens from within the system should set `kafka_skip=False`, + global producer (kafka connect, django post_save signal, etc.) suppose to create + a new event. + + When db update comes from the consumed event, then the row should be manually + marked for skip `kafka_skip=True`, and kafka connector or global python producer + should not generate a new one by filtering it out based on `kafka_skip` field. + """ + + kafka_skip = models.BooleanField( + _("Kafka skip"), + help_text=_( + "Wont generate an event if `True`." 
+ "\nThis field is used to filter out the events to break the infinite loop" + " of message generation when synchronizing 2+ databases." + "\nGets reset to True on .save() method call.", + ), + default=False, + ) + objects = KafkaSkipQueryset.as_manager() + + class Meta: + abstract = True + base_manager_name = "objects" + + def save(self, **kwargs): + if "update_fields" not in kwargs: + self.kafka_skip = False + + elif "kafka_skip" not in kwargs["update_fields"]: + self.kafka_skip = False + kwargs["update_fields"] = ["kafka_skip", *kwargs["update_fields"]] + + super().save(**kwargs) diff --git a/django_kafka/producer.py b/django_kafka/producer.py new file mode 100644 index 0000000..90c19f1 --- /dev/null +++ b/django_kafka/producer.py @@ -0,0 +1,50 @@ +import logging +from pydoc import locate +from typing import Optional + +from confluent_kafka import Producer as ConfluentProducer + +from django_kafka.conf import settings + +logger = logging.getLogger(__name__) + + +class Producer: + """ + Available settings of the producers (P) and consumers (C): + https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md + Producer configs + https://kafka.apache.org/documentation/#producerconfigs + Kafka Client Configuration + https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#kafka-client-configuration + confluent_kafka.Producer API + https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-producer + """ + + config: dict + + default_logger = logger + default_error_handler = settings.ERROR_HANDLER + + def __init__(self, config: Optional[dict] = None, **kwargs): + kwargs.setdefault("logger", self.default_logger) + kwargs.setdefault("error_cb", locate(self.default_error_handler)()) + + self._producer = ConfluentProducer( + { + "client.id": settings.CLIENT_ID, + **settings.GLOBAL_CONFIG, + **settings.PRODUCER_CONFIG, + **getattr(self, "config", {}), + **(config or {}), + }, + **kwargs, + ) + + def __getattr__(self, name): + """ + proxy producer methods. 
+ """ + if name not in {"config"}: + return getattr(self._producer, name) + raise AttributeError(f"'{self.__class__.__name__}' has no attribute 'name'") diff --git a/django_kafka/registry.py b/django_kafka/registry.py new file mode 100644 index 0000000..659c161 --- /dev/null +++ b/django_kafka/registry.py @@ -0,0 +1,32 @@ +from functools import wraps +from typing import TYPE_CHECKING, Type + +from django_kafka.exceptions import DjangoKafkaError + +if TYPE_CHECKING: + from django_kafka.consumer import Consumer + + +class ConsumersRegistry: + def __init__(self): + self.__consumers: dict[str, Type[Consumer]] = {} + + def __call__(self): + @wraps(self) + def add_to_registry(consumer_cls: Type["Consumer"]) -> Type["Consumer"]: + self.__consumers[self.make_key(consumer_cls)] = consumer_cls + return consumer_cls + + return add_to_registry + + def __getitem__(self, key: str): + try: + return self.__consumers[key] + except KeyError as error: + raise DjangoKafkaError(f"Consumer `{key}` is not registered.") from error + + def __iter__(self): + yield from self.__consumers.keys() + + def make_key(self, consumer_cls: Type["Consumer"]) -> str: + return f"{consumer_cls.__module__}.{consumer_cls.__name__}" diff --git a/django_kafka/tests/__init__.py b/django_kafka/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/django_kafka/tests/models.py b/django_kafka/tests/models.py new file mode 100644 index 0000000..0f7c928 --- /dev/null +++ b/django_kafka/tests/models.py @@ -0,0 +1,31 @@ +from django.db import connection +from django.db.models import Model +from django.test import TestCase + + +class AbstractModelTestCase(TestCase): + abstract_model: type[Model] + model: type[Model] + + @classmethod + def setUpClass(cls): + class TestModel(cls.abstract_model): + class Meta: + app_label = "django_kafka" + + cls.model = TestModel + + with connection.schema_editor() as editor: + editor.create_model(cls.model) + + # super has to be at the end, otherwise model won't initialize + super().setUpClass() + + @classmethod + def tearDownClass(cls): + super().tearDownClass() + + with connection.schema_editor() as editor: + editor.delete_model(cls.model) + + connection.close() diff --git a/django_kafka/tests/test_consumer.py b/django_kafka/tests/test_consumer.py new file mode 100644 index 0000000..89959e7 --- /dev/null +++ b/django_kafka/tests/test_consumer.py @@ -0,0 +1,220 @@ +from contextlib import suppress +from unittest.mock import MagicMock, Mock, call, patch + +from django.test import TestCase, override_settings + +from django_kafka.conf import SETTINGS_KEY, settings +from django_kafka.consumer import Consumer, Topics + + +class StopWhileTrueException(Exception): + pass + + +class ConsumerTestCase(TestCase): + @patch( + "django_kafka.consumer.Consumer.process_message", + side_effect=StopWhileTrueException(), + ) + @patch( + "django_kafka.consumer.ConfluentConsumer", + **{ + # first iteration - no message, + # second iteration - 1 message represented as boolean value for testing + "return_value.poll.side_effect": [None, True], + }, + ) + def test_start(self, mock_consumer_client, mock_process_message): + class SomeConsumer(Consumer): + topics = MagicMock() + config = {} + + consumer = SomeConsumer() + + # hack to break infinite loop + # `consumer.start` is using `while True:` loop which never ends + # in order to test it we need to break it which is achievable with side_effect + with suppress(StopWhileTrueException): + consumer.start() + + # subscribed to the topics defined by consumer class + 
mock_consumer_client.return_value.subscribe.assert_called_once_with( + topics=list(consumer.topics), + ) + # poll for message with the frequency defined by consumer class + self.assertEqual( + mock_consumer_client.return_value.poll.call_args_list, + # 2 calls expected as we setup 2 iterations in the test + [call(timeout=consumer.polling_freq)] * 2, + ) + # process_message called once as there was no message on the first iteration + mock_process_message.assert_called_once_with(True) + + @patch("django_kafka.consumer.Consumer.commit_offset") + @patch("django_kafka.consumer.ConfluentConsumer") + def test_process_message_success(self, mock_consumer_client, mock_commit_offset): + class SomeConsumer(Consumer): + topics = MagicMock() + config = {} + + msg = Mock(error=Mock(return_value=False)) + + consumer = SomeConsumer() + + consumer.process_message(msg) + + # check if msg has error before processing + msg.error.assert_called_once_with() + # Topic.consume called + consumer.topics[msg.topic()].consume.assert_called_once_with(msg) + # commit_offset triggered + mock_commit_offset.assert_called_once_with(msg) + + @patch("django_kafka.consumer.Consumer.handle_error") + @patch("django_kafka.consumer.Consumer.commit_offset") + @patch("django_kafka.consumer.ConfluentConsumer") + def test_process_message_msg_error_logged( + self, + mock_consumer_client, + mock_commit_offset, + handle_error, + ): + class SomeConsumer(Consumer): + topics = MagicMock() + config = {} + + msg = Mock(error=Mock(return_value=True)) + + consumer = SomeConsumer() + + consumer.process_message(msg) + + # check if msg has error before processing + msg.error.assert_called_once_with() + # error handler was triggered + handle_error.assert_called_once_with(msg.error.return_value) + # Topic.consume is not called + consumer.topics[msg.topic()].consume.assert_not_called() + # Consumer.commit_offset is not called + mock_commit_offset.assert_not_called() + + @patch("django_kafka.consumer.Consumer.handle_error") + @patch("django_kafka.consumer.Consumer.commit_offset") + @patch("django_kafka.consumer.ConfluentConsumer") + def test_process_message_exception( + self, + mock_consumer_client, + mock_commit_offset, + handle_error, + ): + topic_consume_side_effect = TypeError("test") + topic = Mock( + **{ + "name": "topic", + "consume.side_effect": topic_consume_side_effect, + }, + ) + msg = Mock(**{"topic.return_value": topic.name, "error.return_value": False}) + + class SomeConsumer(Consumer): + topics = Topics( + topic, + ) + config = {} + + consumer = SomeConsumer() + + consumer.process_message(msg) + + # check if msg has error before processing + msg.error.assert_called_once_with() + # Topic.consume was triggered + consumer.topics[msg.topic()].consume.assert_called_once_with(msg) + # error handler was triggered on exception + handle_error.assert_called_once_with(topic_consume_side_effect) + # Consumer.commit_offset is not called + mock_commit_offset.assert_not_called() + + @patch("django_kafka.consumer.ConfluentConsumer") + def test_auto_offset_false(self, mock_consumer_client): + class SomeConsumer(Consumer): + config = {} + + consumer = SomeConsumer({"enable.auto.offset.store": False}) + + msg = Mock() + consumer.commit_offset(msg) + + mock_consumer_client.return_value.store_offsets.assert_called_once_with(msg) + + @patch("django_kafka.consumer.ConfluentConsumer") + def test_auto_offset_true(self, mock_consumer_client): + class SomeConsumer(Consumer): + config = {} + + consumer = SomeConsumer({"enable.auto.offset.store": True}) + + 
consumer.commit_offset(Mock()) + + mock_consumer_client.return_value.store_offsets.assert_not_called() + + def test_settings_are_correctly_assigned(self): + self.assertEqual(Consumer.polling_freq, settings.POLLING_FREQUENCY) + self.assertEqual(Consumer.default_error_handler, settings.ERROR_HANDLER) + + @override_settings( + **{ + SETTINGS_KEY: { + "CLIENT_ID": "client.id-initial-definition", + "GLOBAL_CONFIG": { + "client.id": "client-id-overridden-by-global-config", + "bootstrap.servers": "defined-in-global-config", + }, + "CONSUMER_CONFIG": { + "bootstrap.servers": "bootstrap.servers-overridden-by-consumer", + "group.id": "group.id-defined-by-consumer-config", + }, + }, + }, + ) + @patch("django_kafka.consumer.ConfluentConsumer") + def test_init_config_merge_override(self, mock_consumer_client): + """ + 1. CLIENT_ID is added to the consumers config + 2. GLOBAL_CONFIG overrides CLIENT_ID (client.id) if it contains one + 3. CONSUMER_CONFIG merges with GLOBAL_CONFIG and overrides keys if any + 4. Consumer.config is merged with CONSUMER_CONFIG and overrides keys if any + 4. Lastly `config` provided to Consumer.__init__ merges and overrides everything + """ + + class SomeConsumer(Consumer): + config = { + "group.id": "group.id.overridden-by-consumer-class", + "enable.auto.offset.store": True, + } + + # Consumer.config is properly merged + consumer = SomeConsumer() + + self.assertDictEqual( + consumer.config, + { + "client.id": "client-id-overridden-by-global-config", + "bootstrap.servers": "bootstrap.servers-overridden-by-consumer", + "group.id": "group.id.overridden-by-consumer-class", + "enable.auto.offset.store": True, + }, + ) + + # config provided to the Consumer.__init__ is properly merged + consumer = SomeConsumer({"enable.auto.offset.store": False}) + + self.assertDictEqual( + consumer.config, + { + "client.id": "client-id-overridden-by-global-config", + "bootstrap.servers": "bootstrap.servers-overridden-by-consumer", + "group.id": "group.id.overridden-by-consumer-class", + "enable.auto.offset.store": False, + }, + ) diff --git a/django_kafka/tests/test_django_kafka_interface.py b/django_kafka/tests/test_django_kafka_interface.py new file mode 100644 index 0000000..22728f4 --- /dev/null +++ b/django_kafka/tests/test_django_kafka_interface.py @@ -0,0 +1,60 @@ +from unittest.mock import patch + +from django.test import TestCase, override_settings + +from django_kafka import DjangoKafka +from django_kafka.conf import SETTINGS_KEY +from django_kafka.exceptions import DjangoKafkaError +from django_kafka.producer import Producer +from django_kafka.registry import ConsumersRegistry + + +class DjangoKafkaTestCase(TestCase): + def test_registry(self): + kafka = DjangoKafka() + + # registry is there (registry itself is tested in a separate testcase) + self.assertIsInstance(kafka.consumers, ConsumersRegistry) + + def test_producer(self): + kafka = DjangoKafka() + + producer = kafka.producer + # producer is there + self.assertIsInstance(producer, Producer) + # producer instance is the same (cached) + self.assertIs(kafka.producer, producer) + + @patch("django_kafka.SchemaRegistryClient") + def test_schema_client(self, mock_SchemaRegistryClient): + kafka = DjangoKafka() + + # exception when config is not provided + with self.assertRaisesMessage( + DjangoKafkaError, + "`SCHEMA_REGISTRY` configuration is not defined.", + ): + # ruff: noqa: B018 + kafka.schema_client + + schema_registry_config = {"url": "http://schema-registry"} + with override_settings( + **{SETTINGS_KEY: {"SCHEMA_REGISTRY": 
schema_registry_config}}, + ): + schema_client = kafka.schema_client + # porper instance returned + self.assertIs(schema_client, mock_SchemaRegistryClient.return_value) + # instantiated only once (cached) + self.assertIs(kafka.schema_client, mock_SchemaRegistryClient.return_value) + # called with right args + mock_SchemaRegistryClient.assert_called_once_with(schema_registry_config) + + @patch("django_kafka.DjangoKafka.consumers") + def test_start_consumer(self, mock_DjangoKafka_consumers): + kafka = DjangoKafka() + + kafka.start_consumer("path.to.Consumer") + + mock_DjangoKafka_consumers[ + "path.to.Consumer" + ].return_value.start.assert_called_once_with() diff --git a/django_kafka/tests/test_models.py b/django_kafka/tests/test_models.py new file mode 100644 index 0000000..718d401 --- /dev/null +++ b/django_kafka/tests/test_models.py @@ -0,0 +1,99 @@ +from unittest.mock import patch + +from django_kafka.models import KafkaSkipMixin +from django_kafka.tests.models import AbstractModelTestCase + + +class KafkaSkipModelTestCase(AbstractModelTestCase): + abstract_model = KafkaSkipMixin + + @patch("django_kafka.models.super") + def test_save_update_fields_not_in_kwargs(self, super_mock): + """ + kafka_skip should be set to False when update_fields is not provided + """ + save_kwargs = {} + + instance = self.model(pk=1, kafka_skip=True) + + instance.save(**save_kwargs) + + # save sets kafka_skip value to False + self.assertFalse(instance.kafka_skip) + # didn't forget to call super + super_mock.assert_called_once_with() + # save kwargs are not changed + super_mock.return_value.save.assert_called_once_with(**save_kwargs) + + @patch("django_kafka.models.super") + def test_save_update_fields_in_kwargs_without_kafka_skip(self, super_mock): + """ + kafka_skip should be set to False when update_fields does not contain kafka_skip + """ + save_kwargs = { + "update_fields": ["some_field"], + } + instance = self.model(pk=1, kafka_skip=True) + + instance.save(**save_kwargs) + + # save sets kafka_skip value to False + self.assertFalse(instance.kafka_skip) + # didn't forget to call super + super_mock.assert_called_once_with() + # kafka_skip added to the update fields + super_mock.return_value.save.assert_called_once_with( + **{ + **save_kwargs, + "update_fields": ["kafka_skip", *save_kwargs["update_fields"]], + }, + ) + + @patch("django_kafka.models.super") + def test_save_update_fields_in_kwargs_with_kafka_skip(self, super_mock): + """ + kafka_skip should not be changed if provided in update_fields + """ + save_kwargs = { + "update_fields": ["some_field", "kafka_skip"], + } + instance = self.model(pk=1, kafka_skip=True) + + instance.save(**save_kwargs) + + # save does not change kafka_skip value + self.assertTrue(instance.kafka_skip) + # didn't forget to call super + super_mock.assert_called_once_with() + # save kwargs are not changed + super_mock.return_value.save.assert_called_once_with(**save_kwargs) + + @patch("django_kafka.models.super") + def test_queryset_update_sets_kafka_skip(self, super_mock): + """ + `update` method should automatically set `kafka_skip=False` + if `kafka_skip` is not provided in kwargs. 
+ """ + # update_kwargs does not contain kafka_skip + update_kwargs = {"some_field": "some_value"} + + self.model.objects.update(**update_kwargs) + + # kafka_skip=False was added to the update_kwargs + super_mock.return_value.update.assert_called_once_with( + **{"kafka_skip": False, **update_kwargs}, + ) + + @patch("django_kafka.models.super") + def test_queryset_update_does_not_override_kafka_skip(self, super_mock): + """ + `update` method should not change `kafka_skip` + if `kafka_skip` is provided in kwargs + """ + # update_kwargs contains kafka_skip + update_kwargs = {"kafka_skip": True, "some_field": "some_value"} + + self.model.objects.update(**update_kwargs) + + # kafka_skip is not changed, update_kwargs are not changed + super_mock.return_value.update.assert_called_once_with(**update_kwargs) diff --git a/django_kafka/tests/test_registry.py b/django_kafka/tests/test_registry.py new file mode 100644 index 0000000..2e34b7b --- /dev/null +++ b/django_kafka/tests/test_registry.py @@ -0,0 +1,56 @@ +from django.test import TestCase + +from django_kafka.consumer import Consumer +from django_kafka.registry import ConsumersRegistry + + +class ConsumersRegistryTestCase(TestCase): + def test_decorator_adds_to_registry(self): + class ConsumerA(Consumer): + pass + + class ConsumerB(Consumer): + pass + + registry = ConsumersRegistry() + + self.assertIs(registry()(ConsumerA), ConsumerA) + self.assertIs(registry()(ConsumerB), ConsumerB) + + key_a = registry.make_key(ConsumerA) + self.assertIs(registry[key_a], ConsumerA) + + key_b = registry.make_key(ConsumerB) + self.assertIs(registry[key_b], ConsumerB) + + def test_iter_returns_keys(self): + class ConsumerA(Consumer): + pass + + class ConsumerB(Consumer): + pass + + registry = ConsumersRegistry() + + registry()(ConsumerA) + registry()(ConsumerB) + + self.assertListEqual( + list(registry), + [registry.make_key(ConsumerA), registry.make_key(ConsumerB)], + ) + + def test_make_key(self): + class ConsumerA(Consumer): + pass + + class ConsumerB(Consumer): + pass + + registry = ConsumersRegistry() + + key_a = registry.make_key(ConsumerA) + self.assertEqual(key_a, f"{ConsumerA.__module__}.{ConsumerA.__name__}") + + key_b = registry.make_key(ConsumerB) + self.assertEqual(key_b, f"{ConsumerB.__module__}.{ConsumerB.__name__}") diff --git a/django_kafka/tests/test_settings.py b/django_kafka/tests/test_settings.py new file mode 100644 index 0000000..458ef7c --- /dev/null +++ b/django_kafka/tests/test_settings.py @@ -0,0 +1,42 @@ +from unittest.mock import patch + +from django.test import TestCase, override_settings + +from django_kafka.conf import DEFAULTS, SETTINGS_KEY, settings + + +class SettingsTestCase(TestCase): + @patch("django_kafka.consumer.ConfluentConsumer") + def test_defaults(self, mock_consumer_client): + settings_keys = ( + "CLIENT_ID", + "ERROR_HANDLER", + "GLOBAL_CONFIG", + "PRODUCER_CONFIG", + "CONSUMER_CONFIG", + "POLLING_FREQUENCY", + "SCHEMA_REGISTRY", + ) + # make sure defaults re assigned + for key in settings_keys: + self.assertEqual(getattr(settings, key), DEFAULTS[key]) + + # make sure settings defined by user pulled up + user_settings = { + "CLIENT_ID": "client-id", + "ERROR_HANDLER": "error.handler.class", + "GLOBAL_CONFIG": {"bootstrap.servers": "kafka1"}, + "PRODUCER_CONFIG": { + "enable.idempotence": True, + }, + "CONSUMER_CONFIG": { + "group.id": "group-1", + }, + "POLLING_FREQUENCY": 0.5, + "SCHEMA_REGISTRY": { + "url": "https://schema-registry", + }, + } + with override_settings(**{SETTINGS_KEY: user_settings}): + for key in 
settings_keys: + self.assertEqual(getattr(settings, key), user_settings[key]) diff --git a/django_kafka/tests/test_topic.py b/django_kafka/tests/test_topic.py new file mode 100644 index 0000000..a63b646 --- /dev/null +++ b/django_kafka/tests/test_topic.py @@ -0,0 +1,260 @@ +from unittest.mock import call, patch + +from confluent_kafka.serialization import MessageField +from django.test import TestCase, override_settings + +from django_kafka.conf import SETTINGS_KEY +from django_kafka.exceptions import DjangoKafkaError +from django_kafka.topic import AvroTopic, Topic + + +class SomeTopic(Topic): + name = "some-topic" + + def consume(self, msg): + pass + + +class TopicTestCase(TestCase): + def setUp(self): + self.topic = SomeTopic() + + @patch("django_kafka.topic.Topic.serialize") + @patch("django_kafka.DjangoKafka.producer") + def test_produce_only_value(self, mock_kafka_producer, mock_topic_serialize): + value = "message value" + headers = None # default is None when not provided + + self.topic.produce(value) + + mock_topic_serialize.assert_called_once_with(value, MessageField.VALUE, headers) + mock_kafka_producer.produce.assert_called_once_with( + self.topic.name, + mock_topic_serialize.return_value, + ) + + @patch("django_kafka.topic.Topic.serialize") + @patch("django_kafka.DjangoKafka.producer") + def test_produce_key_is_serialized(self, mock_kafka_producer, mock_topic_serialize): + value = "message value" + key = "some key" + headers = {"header-1": "header-1-value"} + + self.topic.produce(value, key=key, headers=headers) + + self.assertEqual( + mock_topic_serialize.call_args_list, + [ + call(key, MessageField.KEY, headers), + call(value, MessageField.VALUE, headers), + ], + ) + + mock_kafka_producer.produce.assert_called_once_with( + self.topic.name, + mock_topic_serialize.return_value, + key=mock_topic_serialize.return_value, + headers=headers, + ) + + @patch("django_kafka.topic.Topic.key_deserializer") + @patch("django_kafka.topic.Topic.context") + def test_deserialize_key(self, mock_topic_context, mock_key_deserializer): + value = b"some key" + field = MessageField.KEY + + self.topic.deserialize(value, field) + + mock_topic_context.assert_called_once_with(field, None) + mock_key_deserializer.assert_called_once_with( + value, + mock_topic_context.return_value, + ) + + @patch("django_kafka.topic.Topic.value_deserializer") + @patch("django_kafka.topic.Topic.context") + def test_deserialize_value(self, mock_topic_context, mock_value_deserializer): + value = b"some value" + field = MessageField.VALUE + + self.topic.deserialize(value, field) + + mock_topic_context.assert_called_once_with(field, None) + mock_value_deserializer.assert_called_once_with( + value, + mock_topic_context.return_value, + ) + + @patch("django_kafka.topic.Topic.key_deserializer") + @patch("django_kafka.topic.Topic.value_deserializer") + @patch("django_kafka.topic.Topic.context") + def test_deserialize_unknown_field( + self, + mock_topic_context, + mock_value_deserializer, + mock_key_deserializer, + ): + field = "something_unknown" + + with self.assertRaisesMessage( + DjangoKafkaError, + f"Unsupported deserialization field {field}.", + ): + self.topic.deserialize("some value", field) + + mock_topic_context.assert_not_called() + mock_value_deserializer.assert_not_called() + mock_key_deserializer.assert_not_called() + + @patch("django_kafka.topic.Topic.key_serializer") + @patch("django_kafka.topic.Topic.context") + def test_serialize_key(self, mock_topic_context, mock_key_serializer): + value = "some key" + field = 
MessageField.KEY + + self.topic.serialize(value, field) + + mock_topic_context.assert_called_once_with(field, None) + mock_key_serializer.assert_called_once_with( + value, + mock_topic_context.return_value, + ) + + @patch("django_kafka.topic.Topic.value_serializer") + @patch("django_kafka.topic.Topic.context") + def test_serialize_value(self, mock_topic_context, mock_value_serializer): + value = "some value" + field = MessageField.VALUE + + self.topic.serialize(value, field) + + mock_topic_context.assert_called_once_with(field, None) + mock_value_serializer.assert_called_once_with( + value, + mock_topic_context.return_value, + ) + + @patch("django_kafka.topic.Topic.key_serializer") + @patch("django_kafka.topic.Topic.value_serializer") + @patch("django_kafka.topic.Topic.context") + def test_serialize_unknown_field( + self, + mock_topic_context, + mock_value_serializer, + mock_key_serializer, + ): + field = "something_unknown" + + with self.assertRaisesMessage( + DjangoKafkaError, + f"Unsupported serialization field {field}.", + ): + self.topic.serialize("some value", field) + + mock_topic_context.assert_not_called() + mock_value_serializer.assert_not_called() + mock_key_serializer.assert_not_called() + + def test_context(self): + fields = [MessageField.VALUE, MessageField.KEY] + headers = {"header-1": "header-1-value"} + + for field in fields: + with patch( + "django_kafka.topic.SerializationContext", + ) as mock_serialization_context: + self.topic.context(field, headers) + mock_serialization_context.assert_called_once_with( + self.topic.name, + field, + headers=headers, + ) + + +class ATopic(AvroTopic): + name = "some_topic_with_avro_serialization" + + def consume(self, msg): + pass + + +@override_settings( + **{SETTINGS_KEY: {"SCHEMA_REGISTRY": {"url": "http://schema-registy"}}}, +) +@patch("django_kafka.topic.kafka.schema_client") +class AvroTopicTestCase(TestCase): + def setUp(self): + self.topic = ATopic() + + def test_key_schema(self, mock_kafka_schema_client): + schema = self.topic.key_schema + + # return value of the schema.get_latest_version method call + self.assertEqual( + schema, + mock_kafka_schema_client.get_latest_version.return_value, + ) + # get_latest_version called with right arguments + mock_kafka_schema_client.get_latest_version.assert_called_once_with( + f"{self.topic.name}-key", + ) + + def test_value_schema(self, mock_kafka_schema_client): + schema = self.topic.value_schema + # return value of the schema.get_latest_version method call + self.assertEqual( + schema, + mock_kafka_schema_client.get_latest_version.return_value, + ) + # called with right arguments + mock_kafka_schema_client.get_latest_version.assert_called_once_with( + f"{self.topic.name}-value", + ) + + @patch("django_kafka.topic.AvroSerializer") + def test_key_serializer(self, mock_AvroSerializer, mock_kafka_schema_client): + key_serializer = self.topic.key_serializer + + # returns AvroSerializer instance + self.assertEqual(key_serializer, mock_AvroSerializer.return_value) + # instance was initialized with right arguments + mock_AvroSerializer.assert_called_once_with( + mock_kafka_schema_client, + schema_str=self.topic.key_schema.schema.schema_str, + ) + + @patch("django_kafka.topic.AvroDeserializer") + def test_key_deserializer(self, mock_AvroDeserializer, mock_kafka_schema_client): + key_deserializer = self.topic.key_deserializer + + # returns mock_AvroDeserializer instance + self.assertEqual(key_deserializer, mock_AvroDeserializer.return_value) + # instance was initialized with right arguments + 
mock_AvroDeserializer.assert_called_once_with( + mock_kafka_schema_client, + schema_str=self.topic.key_schema.schema.schema_str, + ) + + @patch("django_kafka.topic.AvroSerializer") + def test_value_serializer(self, mock_AvroSerializer, mock_kafka_schema_client): + value_serializer = self.topic.value_serializer + + # returns AvroSerializer instance + self.assertEqual(value_serializer, mock_AvroSerializer.return_value) + # instance was initialized with right arguments + mock_AvroSerializer.assert_called_once_with( + mock_kafka_schema_client, + schema_str=self.topic.key_schema.schema.schema_str, + ) + + @patch("django_kafka.topic.AvroDeserializer") + def test_value_deserializer(self, mock_AvroDeserializer, mock_kafka_schema_client): + value_deserializer = self.topic.value_deserializer + + # returns mock_AvroDeserializer instance + self.assertEqual(value_deserializer, mock_AvroDeserializer.return_value) + # instance was initialized with right arguments + mock_AvroDeserializer.assert_called_once_with( + mock_kafka_schema_client, + schema_str=self.topic.key_schema.schema.schema_str, + ) diff --git a/django_kafka/topic.py b/django_kafka/topic.py new file mode 100644 index 0000000..1fd43e9 --- /dev/null +++ b/django_kafka/topic.py @@ -0,0 +1,121 @@ +import logging +from abc import ABC, abstractmethod +from typing import Optional + +from confluent_kafka import cimpl +from confluent_kafka.schema_registry.avro import AvroDeserializer, AvroSerializer +from confluent_kafka.serialization import ( + Deserializer, + MessageField, + SerializationContext, + Serializer, + StringDeserializer, + StringSerializer, +) + +from django_kafka import kafka +from django_kafka.exceptions import DjangoKafkaError + +logger = logging.getLogger(__name__) + + +class Topic(ABC): + key_serializer: Serializer = StringSerializer() + key_deserializer: Deserializer = StringDeserializer() + + value_serializer: Serializer = StringSerializer() + value_deserializer: Deserializer = StringDeserializer() + + @property + @abstractmethod + def name(self) -> str: + """Define Kafka topic name""" + + @abstractmethod + def consume(self, msg: cimpl.Message): + """Implement message processing""" + + def produce(self, value: any, **kwargs): + headers = kwargs.get("headers") + + if "key" in kwargs: + kwargs["key"] = self.serialize(kwargs["key"], MessageField.KEY, headers) + + kafka.producer.produce( + self.name, + self.serialize(value, MessageField.VALUE, headers), + **kwargs, + ) + + def deserialize( + self, value, field: MessageField, headers: Optional[dict | list] = None, + ): + if field == MessageField.VALUE: + return self.value_deserializer( + value, + self.context(MessageField.VALUE, headers), + ) + + if field == MessageField.KEY: + return self.key_deserializer(value, self.context(MessageField.KEY, headers)) + + raise DjangoKafkaError(f"Unsupported deserialization field {field}.") + + def serialize( + self, value, field: MessageField, headers: Optional[dict | list] = None, + ): + if field == MessageField.VALUE: + return self.value_serializer( + value, + self.context(MessageField.VALUE, headers), + ) + + if field == MessageField.KEY: + return self.key_serializer(value, self.context(MessageField.KEY, headers)) + + raise DjangoKafkaError(f"Unsupported serialization field {field}.") + + def context( + self, + field: MessageField, + headers: Optional[dict | list] = None, + ) -> SerializationContext: + return SerializationContext(self.name, field, headers=headers) + + +class AvroTopic(Topic): + @property + def key_schema(self): + return 
kafka.schema_client.get_latest_version(f"{self.name}-key") + + @property + def value_schema(self): + return kafka.schema_client.get_latest_version(f"{self.name}-value") + + @property + def key_serializer(self): + return AvroSerializer( + kafka.schema_client, + schema_str=self.key_schema.schema.schema_str, + ) + + @property + def key_deserializer(self): + return AvroDeserializer( + kafka.schema_client, + schema_str=self.key_schema.schema.schema_str, + ) + + @property + def value_serializer(self): + return AvroSerializer( + kafka.schema_client, + schema_str=self.value_schema.schema.schema_str, + ) + + @property + def value_deserializer(self): + return AvroDeserializer( + kafka.schema_client, + schema_str=self.value_schema.schema.schema_str, + ) diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..fbd56e0 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,13 @@ +services: + app: + build: . + user: app + command: /app/example/manage.py runserver 0:8000 + volumes: + - .:/app:cached + environment: + SHELL: /bin/bash + IPYTHONDIR: /app/.ipython + HISTFILE: /app/.bash_history + PYTHONPATH: /app # make django_kafka available without installation + restart: "no" diff --git a/example/__init__.py b/example/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/example/conf/__init__.py b/example/conf/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/example/conf/asgi.py b/example/conf/asgi.py new file mode 100644 index 0000000..8bbbbfe --- /dev/null +++ b/example/conf/asgi.py @@ -0,0 +1,16 @@ +""" +ASGI config for example project. + +It exposes the ASGI callable as a module-level variable named ``application``. + +For more information on this file, see +https://docs.djangoproject.com/en/5.0/howto/deployment/asgi/ +""" + +import os + +from django.core.asgi import get_asgi_application + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "conf.settings") + +application = get_asgi_application() diff --git a/example/conf/settings.py b/example/conf/settings.py new file mode 100644 index 0000000..3ea555b --- /dev/null +++ b/example/conf/settings.py @@ -0,0 +1,124 @@ +""" +Django settings for example project. + +Generated by 'django-admin startproject' using Django 5.0.6. + +For more information on this file, see +https://docs.djangoproject.com/en/5.0/topics/settings/ + +For the full list of settings and their values, see +https://docs.djangoproject.com/en/5.0/ref/settings/ +""" + +from pathlib import Path + +# Build paths inside the project like this: BASE_DIR / 'subdir'. +BASE_DIR = Path(__file__).resolve().parent.parent + + +# Quick-start development settings - unsuitable for production +# See https://docs.djangoproject.com/en/5.0/howto/deployment/checklist/ + +# SECURITY WARNING: keep the secret key used in production secret! +SECRET_KEY = "django-insecure-qnl)7o%dg^q_-xt80d5%r94jx!ct4x3qnfelc1qtrwy7h9-a&$" + +# SECURITY WARNING: don't run with debug turned on in production! 
+DEBUG = True + +ALLOWED_HOSTS = [ + "0.0.0.0", +] + +# Application definition +INSTALLED_APPS = [ + "django.contrib.admin", + "django.contrib.auth", + "django.contrib.contenttypes", + "django.contrib.sessions", + "django.contrib.messages", + "django.contrib.staticfiles", + "django_kafka", +] + +MIDDLEWARE = [ + "django.middleware.security.SecurityMiddleware", + "django.contrib.sessions.middleware.SessionMiddleware", + "django.middleware.common.CommonMiddleware", + "django.middleware.csrf.CsrfViewMiddleware", + "django.contrib.auth.middleware.AuthenticationMiddleware", + "django.contrib.messages.middleware.MessageMiddleware", + "django.middleware.clickjacking.XFrameOptionsMiddleware", +] + +ROOT_URLCONF = "conf.urls" + +TEMPLATES = [ + { + "BACKEND": "django.template.backends.django.DjangoTemplates", + "DIRS": [], + "APP_DIRS": True, + "OPTIONS": { + "context_processors": [ + "django.template.context_processors.debug", + "django.template.context_processors.request", + "django.contrib.auth.context_processors.auth", + "django.contrib.messages.context_processors.messages", + ], + }, + }, +] + +WSGI_APPLICATION = "conf.wsgi.application" + + +# Database +# https://docs.djangoproject.com/en/5.0/ref/settings/#databases + +DATABASES = { + "default": { + "ENGINE": "django.db.backends.sqlite3", + "NAME": BASE_DIR / "db.sqlite3", + }, +} + + +# Password validation +# https://docs.djangoproject.com/en/5.0/ref/settings/#auth-password-validators + +AUTH_PASSWORD_VALIDATORS = [ + { + "NAME": "django.contrib.auth.password_validation.UserAttributeSimilarityValidator", + }, + { + "NAME": "django.contrib.auth.password_validation.MinimumLengthValidator", + }, + { + "NAME": "django.contrib.auth.password_validation.CommonPasswordValidator", + }, + { + "NAME": "django.contrib.auth.password_validation.NumericPasswordValidator", + }, +] + + +# Internationalization +# https://docs.djangoproject.com/en/5.0/topics/i18n/ + +LANGUAGE_CODE = "en-us" + +TIME_ZONE = "UTC" + +USE_I18N = True + +USE_TZ = True + + +# Static files (CSS, JavaScript, Images) +# https://docs.djangoproject.com/en/5.0/howto/static-files/ + +STATIC_URL = "static/" + +# Default primary key field type +# https://docs.djangoproject.com/en/5.0/ref/settings/#default-auto-field + +DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField" diff --git a/example/conf/urls.py b/example/conf/urls.py new file mode 100644 index 0000000..b844c7d --- /dev/null +++ b/example/conf/urls.py @@ -0,0 +1,23 @@ +""" +URL configuration for example project. + +The `urlpatterns` list routes URLs to views. For more information please see: + https://docs.djangoproject.com/en/5.0/topics/http/urls/ +Examples: +Function views + 1. Add an import: from my_app import views + 2. Add a URL to urlpatterns: path('', views.home, name='home') +Class-based views + 1. Add an import: from other_app.views import Home + 2. Add a URL to urlpatterns: path('', Home.as_view(), name='home') +Including another URLconf + 1. Import the include() function: from django.urls import include, path + 2. Add a URL to urlpatterns: path('blog/', include('blog.urls')) +""" + +from django.contrib import admin +from django.urls import path + +urlpatterns = [ + path("admin/", admin.site.urls), +] diff --git a/example/conf/wsgi.py b/example/conf/wsgi.py new file mode 100644 index 0000000..356fdeb --- /dev/null +++ b/example/conf/wsgi.py @@ -0,0 +1,16 @@ +""" +WSGI config for example project. + +It exposes the WSGI callable as a module-level variable named ``application``. 
+ +For more information on this file, see +https://docs.djangoproject.com/en/5.0/howto/deployment/wsgi/ +""" + +import os + +from django.core.wsgi import get_wsgi_application + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "conf.settings") + +application = get_wsgi_application() diff --git a/example/manage.py b/example/manage.py new file mode 100755 index 0000000..224087e --- /dev/null +++ b/example/manage.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python +"""Django's command-line utility for administrative tasks.""" + +import os +import sys + + +def main(): + """Run administrative tasks.""" + os.environ.setdefault("DJANGO_SETTINGS_MODULE", "conf.settings") + try: + from django.core.management import execute_from_command_line + except ImportError as exc: + raise ImportError( + "Couldn't import Django. Are you sure it's installed and " + "available on your PYTHONPATH environment variable? Did you " + "forget to activate a virtual environment?", + ) from exc + execute_from_command_line(sys.argv) + + +if __name__ == "__main__": + main() diff --git a/example/requirements.txt b/example/requirements.txt new file mode 100644 index 0000000..01bab22 --- /dev/null +++ b/example/requirements.txt @@ -0,0 +1,8 @@ +django==5.0.6 # famous web framework +confluent-kafka==2.4.0 +avro==1.11.3 +fastavro==1.9.4 +requests==2.32 +bump-my-version==0.21.1 # for making releases +ruff==0.4.8 # speed of light linter +setuptools==70.0.0 # without it PyCharm fails to index packages inside the Docker container diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..22436ad --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,36 @@ +[build-system] +requires = ["setuptools==70.0.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "django-kafka" +version = "0.0.1" +dependencies = [ + "django>=4.0,<6.0", + "confluent-kafka==2.4.0", + "avro==1.11.3", + "fastavro==1.9.4", + "requests==2.32.2" +] +requires-python = ">=3.11" +authors = [ + { name = "RegioHelden GmbH", email = "opensource@regiohelden.de" }, +] +description = "Confluent's Kafka Python Client combined with Django" +readme = { file = "README.md", content-type = "text/markdown" } +license = { file = "LICENSE" } +keywords = ["django", "kafka"] +classifiers = [ + "Development Status :: 3 - Alpha", + "Environment :: Web Environment", + "Framework :: Django", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3.11", +] + +[project.urls] +Repository = "https://github.com/RegioHelden/django-kafka" +Issues = "https://github.com/RegioHelden/django-kafka/issues" +Changelog = "https://github.com/RegioHelden/django-kafka/blob/main/CHANGELOG.md"