Add specialised topics ModelTopicConsumer and DbzModelTopicConsumer #12

Merged
merged 3 commits into from
Sep 17, 2024
2 changes: 1 addition & 1 deletion .bumpversion.toml
@@ -1,5 +1,5 @@
[tool.bumpversion]
-current_version = "0.3.0"
+current_version = "0.4.0"
parse = "(?P<major>\\d+)\\.(?P<minor>\\d+)\\.(?P<patch>\\d+)"
serialize = ["{major}.{minor}.{patch}"]
search = "{current_version}"
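The `parse` and `serialize` settings above describe how a version string is split into parts and written back. A minimal sketch of that round trip, assuming a hypothetical `bump_minor` helper written only for illustration (it is not the bump tool's own implementation). Note that the TOML file doubles the backslashes, while a Python raw string needs only one:

```python
import re

# Same pattern as the `parse` setting, written as a Python raw string.
PARSE = r"(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)"
# Same template as the single entry in the `serialize` setting.
SERIALIZE = "{major}.{minor}.{patch}"


def bump_minor(version: str) -> str:
    """Hypothetical helper: parse a version, bump the minor part, reset patch."""
    match = re.fullmatch(PARSE, version)
    if match is None:
        raise ValueError(f"unparseable version: {version!r}")
    parts = {name: int(number) for name, number in match.groupdict().items()}
    parts["minor"] += 1
    parts["patch"] = 0
    return SERIALIZE.format(**parts)


new_version = bump_minor("0.3.0")
```

Under these assumptions `new_version` matches the `0.4.0` value introduced by this change.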
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

## 0.4.0 (2024-09-17)
* Add ModelTopicConsumer and DbzModelTopicConsumer

## 0.3.0 (2024-09-05)
* Add decorator for topic retry and dead letter topic, see `README.md`
* Separate `Topic` class in to `TopicProducer` and `TopicConsumer` classes.
61 changes: 61 additions & 0 deletions README.md
@@ -117,6 +117,67 @@ DJANGO_KAFKA = {

**Note:** take [django_kafka.topic.AvroTopic](./django_kafka/topic.py) as an example if you want to implement a custom Topic with your schema.

## Specialized Topics:

### ModelTopicConsumer:

`ModelTopicConsumer` can be used to sync Django model instances from abstract Kafka events. Inherit from the class, set the model and the topic to consume from, and define a few abstract methods.

```py

from django_kafka.topic.model import ModelTopicConsumer

from my_app.models import MyModel

class MyModelConsumer(ModelTopicConsumer):
    name = "topic"
    model = MyModel

    def is_deletion(self, model, key, value) -> bool:
        """returns if the message represents a deletion"""
        return value.pop('__deleted', False)

    def get_lookup_kwargs(self, model, key, value) -> dict:
        """returns the lookup kwargs used for filtering the model instance"""
        return {"id": key}
```
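
To make the roles of `is_deletion` and `get_lookup_kwargs` concrete, here is a rough sketch of the sync flow they imply. `InMemorySync` and its `sync` method are hypothetical and exist only for illustration; a plain dict stands in for the Django ORM, and the real consumer may order or implement these steps differently.

```python
class InMemorySync:
    """Hypothetical stand-in for the consume flow; a dict replaces the Django ORM."""

    def __init__(self):
        self.rows = {}  # maps primary key -> attribute dict

    def is_deletion(self, key, value) -> bool:
        # Same convention as the README example: a `__deleted` flag in the value.
        return value.pop("__deleted", False)

    def get_lookup_kwargs(self, key, value) -> dict:
        return {"id": key}

    def sync(self, key, value) -> None:
        lookup = self.get_lookup_kwargs(key, value)
        if self.is_deletion(key, value):
            # Deletion events remove the matching row, if any.
            self.rows.pop(lookup["id"], None)
        else:
            # Otherwise create the row or update it in place.
            self.rows.setdefault(lookup["id"], {}).update(value)


store = InMemorySync()
store.sync(1, {"name": "ada"})
store.sync(1, {"name": "grace"})    # updates row 1
store.sync(2, {"name": "alan"})
store.sync(2, {"__deleted": True})  # removes row 2
```

After these four events only row 1 remains, holding the most recent value.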

Model instances will have their attributes synced from the message value. If you need to rename an attribute or alter its value before it is assigned, define a `transform_{attr}` method that returns the new attribute name and value:

```python

class MyModelConsumer(ModelTopicConsumer):
    ...
    def transform_name(self, model, key, value):
        return 'first_name', value["name"].upper()
```
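
One way such `transform_{attr}` hooks could be dispatched is sketched below. `SketchConsumer` and `build_attrs` are hypothetical names and the lookup via `getattr` is an assumption, not the library's confirmed implementation.

```python
from typing import Any


class SketchConsumer:
    """Hypothetical stand-in for a ModelTopicConsumer subclass (no Django needed)."""

    def transform_name(self, model, key, value) -> tuple[str, Any]:
        # Rename "name" to "first_name" and upper-case it, as in the README example.
        return "first_name", value["name"].upper()

    def build_attrs(self, model, key, value: dict) -> dict:
        """Assumed dispatch: use transform_<attr> when defined, else copy the attribute."""
        attrs = {}
        for attr, attr_value in value.items():
            transform = getattr(self, f"transform_{attr}", None)
            if transform is not None:
                new_attr, new_value = transform(model, key, value)
                attrs[new_attr] = new_value
            else:
                attrs[attr] = attr_value
        return attrs


attrs = SketchConsumer().build_attrs(model=None, key=1, value={"name": "ada", "age": 36})
```

Here `name` is routed through the hook while `age`, which has no hook, is copied unchanged.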

### DbzModelTopicConsumer:

`DbzModelTopicConsumer` helps sync model instances from [Debezium source connector](https://debezium.io/documentation/reference/stable/architecture.html) topics. It inherits from `ModelTopicConsumer` and defines default implementations for the `is_deletion` and `get_lookup_kwargs` methods.

In Debezium it is possible to [reroute records](https://debezium.io/documentation/reference/stable/transformations/topic-routing.html) from multiple sources to the same topic. In doing so, Debezium [inserts a table identifier](https://debezium.io/documentation/reference/stable/transformations/topic-routing.html#_ensure_unique_key) into the key to ensure uniqueness. When this identifier is present in the key, you **must** define a `reroute_model_map` instead of `model`, mapping each table identifier to the model class to be created.

```py

from django_kafka.topic.debezium import DbzModelTopicConsumer

from my_app.models import MyModel, MyOtherModel

class MyModelConsumer(DbzModelTopicConsumer):
    name = "debezium_topic"
    reroute_model_map = {
        'public.my_model': MyModel,
        'public.my_other_model': MyOtherModel,
    }
```

A few notes:

1. The connector must be using the [event flattening SMT](https://debezium.io/documentation/reference/stable/transformations/event-flattening.html) to simplify the message structure.
2. [Deletions](https://debezium.io/documentation/reference/stable/transformations/event-flattening.html#extract-new-record-state-delete-tombstone-handling-mode) are detected automatically based on a null message value or the presence of a `__deleted` field.
3. The message key is assumed to contain the model PK as a field, [which is the default behaviour](https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-message-key-columns) for Debezium source connectors. If you need more complicated lookup behaviour, override `get_lookup_kwargs`.
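
The notes above can be sketched as three small functions. All names here are hypothetical; the key field constant is Debezium's documented default for the topic routing SMT (`key.field.name`), and accepting both string and boolean `__deleted` markers is an assumption about the payload rather than confirmed library behaviour.

```python
from typing import Optional

# Default key field added by Debezium's topic routing SMT (configurable via `key.field.name`).
REROUTE_KEY_FIELD = "__dbz__physicalTableIdentifier"


def is_deletion(value: Optional[dict]) -> bool:
    """A tombstone (null value) or a set `__deleted` marker counts as a deletion."""
    if value is None:
        return True
    deleted = value.get("__deleted", False)
    # Assumption: the marker may be the string "true"/"false" or a boolean.
    if isinstance(deleted, str):
        return deleted.lower() == "true"
    return bool(deleted)


def resolve_model(key: dict, reroute_model_map: dict):
    """Pick the model registered for the table identifier found in the key."""
    return reroute_model_map[key[REROUTE_KEY_FIELD]]


def get_lookup_kwargs(key: dict, pk_field: str = "id") -> dict:
    """Assume the key carries the primary key under `pk_field`."""
    return {pk_field: key[pk_field]}


key = {"id": 5, REROUTE_KEY_FIELD: "public.my_model"}
model = resolve_model(key, {"public.my_model": "MyModel"})  # strings stand in for model classes
lookup = get_lookup_kwargs(key)
```

A composite or differently named primary key would need a custom `get_lookup_kwargs`, which is the override path the third note describes.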

## Non-Blocking Retries:

Add non-blocking retry behaviour to a topic by using the `retry` decorator:
4 changes: 2 additions & 2 deletions django_kafka/__init__.py
@@ -10,11 +10,11 @@
from django_kafka.exceptions import DjangoKafkaError
from django_kafka.producer import Producer
from django_kafka.registry import ConsumersRegistry
-from django_kafka.retry import RetrySettings
+from django_kafka.retry.settings import RetrySettings

logger = logging.getLogger(__name__)

-__version__ = "0.3.0"
+__version__ = "0.4.0"

__all__ = [
"autodiscover",
File renamed without changes.
4 changes: 2 additions & 2 deletions django_kafka/dead_letter/topic.py
@@ -1,8 +1,8 @@
import re
from typing import TYPE_CHECKING

-from django_kafka import settings
-from django_kafka.dead_letter.headers import DeadLetterHeader
+from django_kafka.conf import settings
+from django_kafka.dead_letter.header import DeadLetterHeader
from django_kafka.retry.topic import RetryTopicProducer
from django_kafka.serialization import NoOpSerializer
from django_kafka.topic import TopicProducer
55 changes: 0 additions & 55 deletions django_kafka/retry/__init__.py
@@ -1,55 +0,0 @@
from typing import TYPE_CHECKING, Optional, Type

from django.utils import timezone

if TYPE_CHECKING:
    from django_kafka.topic import TopicConsumer


class RetrySettings:
    def __init__(
        self,
        max_retries: int,
        delay: int,
        backoff: bool = False,
        include: Optional[list[Type[Exception]]] = None,
        exclude: Optional[list[Type[Exception]]] = None,
    ):
        """
        :param max_retries: maximum number of retry attempts
        :param delay: delay (seconds)
        :param backoff: exponential backoff
        :param include: exception types to retry for
        :param exclude: exception types to exclude from retry
        """
        if max_retries <= 0:
            raise ValueError("max_retries must be greater than zero")
        if delay <= 0:
            raise ValueError("delay must be greater than zero")
        if include is not None and exclude is not None:
            raise ValueError("cannot specify both include and exclude")

        self.max_retries = max_retries
        self.delay = delay
        self.backoff = backoff
        self.include = include
        self.exclude = exclude

    def __call__(self, topic_cls: Type["TopicConsumer"]):
        topic_cls.retry_settings = self
        return topic_cls

    def attempts_exceeded(self, attempt):
        return attempt > self.max_retries

    def should_retry(self, exc: Exception) -> bool:
        if self.include is not None:
            return any(isinstance(exc, e) for e in self.include)
        if self.exclude is not None:
            return not any(isinstance(exc, e) for e in self.exclude)

        return True

    def get_retry_timestamp(self, attempt: int) -> str:
        delay = self.delay * 2 ** (attempt - 1) if self.backoff else self.delay
        return str(timezone.now().timestamp() + delay)
2 changes: 1 addition & 1 deletion django_kafka/retry/consumer.py
@@ -8,7 +8,7 @@
from django_kafka.conf import settings
from django_kafka.consumer import Consumer, Topics
from django_kafka.dead_letter.topic import DeadLetterTopicProducer
-from django_kafka.retry.headers import RetryHeader
+from django_kafka.retry.header import RetryHeader
from django_kafka.retry.topic import RetryTopicConsumer

if TYPE_CHECKING:
File renamed without changes.
55 changes: 55 additions & 0 deletions django_kafka/retry/settings.py
@@ -0,0 +1,55 @@
from typing import TYPE_CHECKING, Optional, Type

from django.utils import timezone

if TYPE_CHECKING:
    from django_kafka.topic import TopicConsumer


class RetrySettings:
    def __init__(
        self,
        max_retries: int,
        delay: int,
        backoff: bool = False,
        include: Optional[list[Type[Exception]]] = None,
        exclude: Optional[list[Type[Exception]]] = None,
    ):
        """
        :param max_retries: maximum number of retry attempts
        :param delay: delay (seconds)
        :param backoff: exponential backoff
        :param include: exception types to retry for
        :param exclude: exception types to exclude from retry
        """
        if max_retries <= 0:
            raise ValueError("max_retries must be greater than zero")
        if delay <= 0:
            raise ValueError("delay must be greater than zero")
        if include is not None and exclude is not None:
            raise ValueError("cannot specify both include and exclude")

        self.max_retries = max_retries
        self.delay = delay
        self.backoff = backoff
        self.include = include
        self.exclude = exclude

    def __call__(self, topic_cls: Type["TopicConsumer"]):
        topic_cls.retry_settings = self
        return topic_cls

    def attempts_exceeded(self, attempt):
        return attempt > self.max_retries

    def should_retry(self, exc: Exception) -> bool:
        if self.include is not None:
            return any(isinstance(exc, e) for e in self.include)
        if self.exclude is not None:
            return not any(isinstance(exc, e) for e in self.exclude)

        return True

    def get_retry_timestamp(self, attempt: int) -> str:
        delay = self.delay * 2 ** (attempt - 1) if self.backoff else self.delay
        return str(timezone.now().timestamp() + delay)
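
For reviewers, the delay and filtering rules in `RetrySettings` can be checked without Django. The standalone functions below are a sketch that mirrors the expressions in `get_retry_timestamp` and `should_retry`; `retry_delay` and the module-level `should_retry` are illustrative names, not part of the package.

```python
from typing import Optional, Type


def retry_delay(delay: int, attempt: int, backoff: bool = False) -> int:
    """Seconds to wait before the given 1-based attempt (same formula as get_retry_timestamp)."""
    return delay * 2 ** (attempt - 1) if backoff else delay


def should_retry(
    exc: Exception,
    include: Optional[list[Type[Exception]]] = None,
    exclude: Optional[list[Type[Exception]]] = None,
) -> bool:
    """Same decision order as RetrySettings.should_retry: include first, then exclude."""
    if include is not None:
        return any(isinstance(exc, e) for e in include)
    if exclude is not None:
        return not any(isinstance(exc, e) for e in exclude)
    return True


# With delay=60 and backoff enabled, the wait doubles on every attempt.
backoff_delays = [retry_delay(60, attempt, backoff=True) for attempt in range(1, 5)]
fixed_delays = [retry_delay(60, attempt) for attempt in range(1, 5)]
```

Because matching uses `isinstance`, listing a base class in `include` or `exclude` also covers its subclasses, for example `LookupError` covers `KeyError`.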
4 changes: 2 additions & 2 deletions django_kafka/retry/topic.py
@@ -3,14 +3,14 @@

from django_kafka.conf import settings
from django_kafka.exceptions import DjangoKafkaError
-from django_kafka.retry.headers import RetryHeader
+from django_kafka.retry.header import RetryHeader
from django_kafka.serialization import NoOpSerializer
from django_kafka.topic import TopicConsumer, TopicProducer

if TYPE_CHECKING:
    from confluent_kafka import cimpl

-    from django_kafka.retry import RetrySettings
+    from django_kafka.retry.settings import RetrySettings


class RetryTopicProducer(TopicProducer):
2 changes: 1 addition & 1 deletion django_kafka/tests/dead_letter/test_topic.py
@@ -3,7 +3,7 @@
from django.test import TestCase, override_settings

from django_kafka.conf import SETTINGS_KEY
-from django_kafka.dead_letter.headers import DeadLetterHeader
+from django_kafka.dead_letter.header import DeadLetterHeader
from django_kafka.dead_letter.topic import DeadLetterTopicProducer


5 changes: 2 additions & 3 deletions django_kafka/tests/retry/test_consumer.py
@@ -9,9 +9,9 @@

from django_kafka.conf import SETTINGS_KEY
from django_kafka.consumer import Consumer, Topics
-from django_kafka.retry import RetrySettings
from django_kafka.retry.consumer import RetryConsumer, RetryTopics
-from django_kafka.retry.headers import RetryHeader
+from django_kafka.retry.header import RetryHeader
+from django_kafka.retry.settings import RetrySettings
from django_kafka.topic import TopicConsumer


@@ -116,7 +116,6 @@ def test_build(self):
    def test_build__no_retry_topics(self):
        class TestConsumer(Consumer):
            topics = Topics()
-            config = {}

        self.assertIsNone(RetryConsumer.build(TestConsumer))

2 changes: 1 addition & 1 deletion django_kafka/tests/retry/test_headers.py
@@ -1,7 +1,7 @@
from django.test import TestCase
from django.utils import timezone

-from django_kafka.retry.headers import RetryHeader
+from django_kafka.retry.header import RetryHeader


class RetryHeaderTestCase(TestCase):
2 changes: 1 addition & 1 deletion django_kafka/tests/retry/test_retry.py
@@ -2,7 +2,7 @@

from django.test import TestCase

-from django_kafka.retry import RetrySettings
+from django_kafka.retry.settings import RetrySettings
from django_kafka.topic import Topic


8 changes: 4 additions & 4 deletions django_kafka/tests/retry/test_topic.py
@@ -4,8 +4,8 @@

from django_kafka.conf import SETTINGS_KEY
from django_kafka.exceptions import DjangoKafkaError
-from django_kafka.retry import RetrySettings
-from django_kafka.retry.headers import RetryHeader
+from django_kafka.retry.header import RetryHeader
+from django_kafka.retry.settings import RetrySettings
from django_kafka.retry.topic import (
    RetryTopicConsumer,
    RetryTopicProducer,
@@ -90,7 +90,7 @@ def test_name__uses_settings(self):

        self.assertEqual(rt_producer.name, "group.id.topic.name.test-retry.1")

-    @mock.patch("django_kafka.retry.RetrySettings.get_retry_timestamp")
+    @mock.patch("django_kafka.retry.settings.RetrySettings.get_retry_timestamp")
    def test_retry__first_retry(self, mock_get_retry_timestamp: mock.Mock):
        mock_msg = mock.Mock(**{"topic.return_value": "msg_topic"})
        retry_settings = RetrySettings(max_retries=5, delay=60)
@@ -114,7 +114,7 @@ def test_retry__first_retry(self, mock_get_retry_timestamp: mock.Mock):
        )
        mock_get_retry_timestamp.assert_called_once_with(1)

-    @mock.patch("django_kafka.retry.RetrySettings.get_retry_timestamp")
+    @mock.patch("django_kafka.retry.settings.RetrySettings.get_retry_timestamp")
    def test_retry__last_retry(self, mock_get_retry_timestamp):
        mock_msg = mock.Mock(
            **{"topic.return_value": "group.id.msg_topic.test-retry.4"},
4 changes: 0 additions & 4 deletions django_kafka/tests/test_consumer.py
@@ -36,7 +36,6 @@ class SomeConsumer(Consumer):
    def test_run(self, mock_consumer_client, mock_process_message):
        class SomeConsumer(Consumer):
            topics = MagicMock()
-            config = {}
            log_error = Mock()

        consumer = SomeConsumer()
@@ -65,7 +64,6 @@ class SomeConsumer(Consumer):
    def test_process_message_success(self, mock_consumer_client, mock_commit_offset):
        class SomeConsumer(Consumer):
            topics = MagicMock()
-            config = {}

        msg = Mock(error=Mock(return_value=False))

@@ -91,7 +89,6 @@ def test_process_message_msg_error_logged(
    ):
        class SomeConsumer(Consumer):
            topics = MagicMock()
-            config = {}

        msg = Mock(error=Mock(return_value=True))

@@ -130,7 +127,6 @@ def test_process_message_exception(

        class SomeConsumer(Consumer):
            topics = Topics(topic_consumer)
-            config = {}

        consumer = SomeConsumer()

Empty file.