Skip to content

Commit

Permalink
feat: add ModelTopicConsumer and DbzModelTopicConsumer, refs #10
Browse files Browse the repository at this point in the history
  • Loading branch information
stefan-cardnell-rh committed Sep 17, 2024
1 parent 1d95646 commit d1084e6
Show file tree
Hide file tree
Showing 9 changed files with 382 additions and 5 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.4.0 (2024-09-17)
* Add ModelTopicConsumer and DbModelTopicConsumer

## 0.3.0 (2024-09-05)
* Add decorator for topic retry and dead letter topic, see `README.md`
* Separate `Topic` class in to `TopicProducer` and `TopicConsumer` classes.
Expand Down
61 changes: 61 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,67 @@ DJANGO_KAFKA = {

**Note:** take [django_kafka.topic.AvroTopic](./django_kafka/topic.py) as an example if you want to implement a custom Topic with your schema.

## Specialized Topics:

### ModelTopicConsumer:

`ModelTopicConsumer` can be used to sync django model instances from abstract kafka events. Simply inherit the class, set the model, the topic to consume from and define a few abstract methods.

```py

from django_kafka.topic.model import ModelTopicConsumer

from my_app.models import MyModel

class MyModelConsumer(ModelTopicConsumer):
name = "topic"
model = MyModel

def check_deletion(self, model, key, value) -> bool:
"""returns if the message represents a deletion"""
return value.pop('__deleted', False)

def get_lookup_kwargs(self, model, key, value) -> dict:
"""returns the lookup kwargs used for filtering the model instance"""
return {"id": key}
```

Model instances will have their attributes synced from the message value. If you need to alter a message key or value before it is assigned, define a `transform_{attr}` method:

```python

class MyModelConsumer(ModelTopicConsumer):
...
def transform_name(model, key, value):
return 'first_name', value["name"].upper()
```

### DbzModelTopicConsumer:

`DbzModelTopicConsumer` helps sync model instances from [debezium source connector](https://debezium.io/documentation/reference/stable/architecture.html) topics. It inherits from `ModelTopicConsumer` and defines default implementations for `check_deletion` and `get_lookup_kwargs` methods.

In Debezium it is possible to [reroute records](https://debezium.io/documentation/reference/stable/transformations/topic-routing.html) from multiple sources to the same topic. In doing so Debezium [inserts a table identifier](https://debezium.io/documentation/reference/stable/transformations/topic-routing.html#_ensure_unique_key) to the key to ensure uniqueness. When this key is inserted, you **must instead** define a `reroute_model_map` to map the table identifier to the model class to be created.

```py

from django_kafka.topic.debezium import DbzModelTopicConsumer

from my_app.models import MyModel, MyOtherModel

class MyModelConsumer(DbzModelTopicConsumer):
name = "debezium_topic"
reroute_model_map = {
'public.my_model': MyModel,
'public.my_other_model': MyOtherModel,
}
```

A few notes:

1. The connector must be using the [event flattening SMT](https://debezium.io/documentation/reference/stable/transformations/event-flattening.html) to simplify the message structure.
2. [Deletions](https://debezium.io/documentation/reference/stable/transformations/event-flattening.html#extract-new-record-state-delete-tombstone-handling-mode) are detected automatically based on a null message value or the presence of a `__deleted` field.
3. The message key is assumed to contain the model PK as a field, [which is the default behaviour](https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-message-key-columns) for Debezium source connectors. If you need more complicated lookup behaviour, override `get_lookup_kwargs`.

## Non-Blocking Retries:

Add non-blocking retry behaviour to a topic by using the `retry` decorator:
Expand Down
1 change: 0 additions & 1 deletion django_kafka/tests/retry/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ def test_build(self):
def test_build__no_retry_topics(self):
class TestConsumer(Consumer):
topics = Topics()
config = {}

self.assertIsNone(RetryConsumer.build(TestConsumer))

Expand Down
4 changes: 0 additions & 4 deletions django_kafka/tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ class SomeConsumer(Consumer):
def test_run(self, mock_consumer_client, mock_process_message):
class SomeConsumer(Consumer):
topics = MagicMock()
config = {}
log_error = Mock()

consumer = SomeConsumer()
Expand Down Expand Up @@ -65,7 +64,6 @@ class SomeConsumer(Consumer):
def test_process_message_success(self, mock_consumer_client, mock_commit_offset):
class SomeConsumer(Consumer):
topics = MagicMock()
config = {}

msg = Mock(error=Mock(return_value=False))

Expand All @@ -91,7 +89,6 @@ def test_process_message_msg_error_logged(
):
class SomeConsumer(Consumer):
topics = MagicMock()
config = {}

msg = Mock(error=Mock(return_value=True))

Expand Down Expand Up @@ -130,7 +127,6 @@ def test_process_message_exception(

class SomeConsumer(Consumer):
topics = Topics(topic_consumer)
config = {}

consumer = SomeConsumer()

Expand Down
Empty file.
89 changes: 89 additions & 0 deletions django_kafka/tests/topic/test_debezium.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from unittest import mock

from django.db.models import Model
from django.test import TestCase

from django_kafka.exceptions import DjangoKafkaError
from django_kafka.topic.debezium import DbzModelTopicConsumer


class TestDbzModelTopicConsumer(TestCase):
def _get_model_topic_consumer(self) -> DbzModelTopicConsumer:
class SomeModelTopicConsumer(DbzModelTopicConsumer):
name = "name"
model = Model

return SomeModelTopicConsumer()

def test_get_model__uses_reroute_model_map(self):
mock_model = mock.Mock()
topic_consumer = self._get_model_topic_consumer()
topic_consumer.reroute_model_map = {"topic": mock_model}
topic_consumer.reroute_key_field = "table_identifier_key"

self.assertEqual(
topic_consumer.get_model({"table_identifier_key": "topic"}, {}),
mock_model,
)

def test_get_model__uses_direct_model(self):
mock_model = mock.Mock()
topic_consumer = self._get_model_topic_consumer()
topic_consumer.model = mock_model
topic_consumer.reroute_model_map = None

self.assertEqual(topic_consumer.get_model({}, {}), mock_model)

def test_get_model__raises_when_nothing_set(self):
topic_consumer = self._get_model_topic_consumer()
topic_consumer.model = None
topic_consumer.reroute_model_map = None

with self.assertRaises(DjangoKafkaError):
topic_consumer.get_model({}, {})

def test_get_model__raises_when_reroute_key_present(self):
topic_consumer = self._get_model_topic_consumer()
topic_consumer.model = Model
topic_consumer.reroute_model_map = None
topic_consumer.reroute_key_field = "table_identifier_key"

with self.assertRaises(DjangoKafkaError):
topic_consumer.get_model({"table_identifier_key": "table"}, {})

def test_check_deletion(self):
topic_consumer = self._get_model_topic_consumer()

self.assertEqual(
topic_consumer.check_deletion(Model, {}, {}),
False,
)
self.assertEqual(
topic_consumer.check_deletion(Model, {}, {"__deleted": False}),
False,
)
self.assertEqual(
topic_consumer.check_deletion(
Model,
{},
{"name": "name", "__deleted": True},
),
True,
)
self.assertEqual(topic_consumer.check_deletion(Model, {}, None), True)

def test_get_lookup_kwargs(self):
topic_consumer = self._get_model_topic_consumer()
mock_model = mock.Mock(**{"_meta.pk.name": "identity_key"})

self.assertEqual(
topic_consumer.get_lookup_kwargs(mock_model, {"identity_key": 1}, {}),
{"identity_key": 1},
)

def test_get_lookup_kwargs__raises_when_pk_not_present(self):
topic_consumer = self._get_model_topic_consumer()
mock_model = mock.Mock(**{"_meta.pk.name": "identity_key"})

with self.assertRaises(KeyError):
topic_consumer.get_lookup_kwargs(mock_model, {"something": 1}, {})
127 changes: 127 additions & 0 deletions django_kafka/tests/topic/test_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
from unittest import mock

from django.db.models import Model
from django.test import TestCase

from django_kafka.exceptions import DjangoKafkaError
from django_kafka.models import KafkaSkipMixin
from django_kafka.topic.model import ModelTopicConsumer


class TestModelTopicConsumer(TestCase):
def _get_model_topic_consumer(self):
class SomeModelTopicConsumer(ModelTopicConsumer):
name = "name"
model = Model

def get_lookup_kwargs(self, model, key, value) -> dict:
return {}

def check_deletion(self, *args, **kwargs):
return False

return SomeModelTopicConsumer()

def test_get_defaults(self):
topic_consumer = self._get_model_topic_consumer()

defaults = topic_consumer.get_defaults(model=Model, value={"name": 1})

self.assertEqual(defaults, {"name": 1})

def test_get_defaults__adds_kafka_skip(self):
topic_consumer = self._get_model_topic_consumer()

class KafkaSkip(KafkaSkipMixin):
pass

defaults = topic_consumer.get_defaults(model=KafkaSkip, value={"name": 1})

self.assertEqual(defaults, {"name": 1, "kafka_skip": True})

def test_get_defaults__calls_transform_attr(self):
topic_consumer = self._get_model_topic_consumer()
topic_consumer.transform_name = mock.Mock(return_value=("name_new", 2))

defaults = topic_consumer.get_defaults(model=Model, value={"name": 1})

topic_consumer.transform_name.assert_called_once_with(
topic_consumer.model,
"name",
1,
)
self.assertEqual(defaults, {"name_new": 2})

def test_sync(self):
topic_consumer = self._get_model_topic_consumer()
topic_consumer.get_lookup_kwargs = mock.Mock(return_value={"id": "id"})
topic_consumer.get_defaults = mock.Mock(return_value={"name": "name"})
model = mock.Mock()

topic_consumer.sync(model, {"key": "key"}, {"value": "value"})

topic_consumer.get_lookup_kwargs.assert_called_once_with(
model,
{"key": "key"},
{"value": "value"},
)
topic_consumer.get_defaults.assert_called_once_with(
model,
{"value": "value"},
)
model.objects.update_or_create.assert_called_once_with(
id="id",
defaults={"name": "name"},
)

def test_sync__deleted(self):
topic_consumer = self._get_model_topic_consumer()
topic_consumer.get_lookup_kwargs = mock.Mock(return_value={"id": "id"})
topic_consumer.get_defaults = mock.Mock()
topic_consumer.check_deletion = mock.Mock(return_value=True)
model = mock.Mock()

topic_consumer.sync(model, {"key": "key"}, {"value": "value"})

topic_consumer.get_lookup_kwargs.assert_called_once_with(
model,
{"key": "key"},
{"value": "value"},
)
topic_consumer.get_defaults.assert_not_called()
model.objects.get.assert_called_once_with(id="id")
model.objects.get.return_value.delete.assert_called_once()
model.objects.update_or_create.assert_not_called()

def test_get_model(self):
topic_consumer = self._get_model_topic_consumer()
topic_consumer.model = mock.Mock()

self.assertEqual(
topic_consumer.get_model({}, {}),
topic_consumer.model,
)

def test_get_model__raises_when_model_not_set(self):
topic_consumer = self._get_model_topic_consumer()
topic_consumer.model = None

with self.assertRaises(DjangoKafkaError):
topic_consumer.get_model({}, {})

def test_consume(self):
topic_consumer = self._get_model_topic_consumer()
topic_consumer.get_model = mock.Mock()
msg_key = {"key": "key"}
msg_value = {"value": "value"}
topic_consumer.deserialize = mock.Mock(side_effect=[msg_key, msg_value])
topic_consumer.sync = mock.Mock()

topic_consumer.consume(mock.Mock())

topic_consumer.get_model.assert_called_once_with(msg_key, msg_value)
topic_consumer.sync.assert_called_once_with(
topic_consumer.get_model.return_value,
msg_key,
msg_value,
)
34 changes: 34 additions & 0 deletions django_kafka/topic/debezium.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from abc import ABC
from typing import Optional, Type

from django.db.models import Model

from django_kafka.exceptions import DjangoKafkaError
from django_kafka.topic.model import ModelTopicConsumer


class DbzModelTopicConsumer(ModelTopicConsumer, ABC):
"""Syncs a debezium source connector topic directly in to Django model instances"""

reroute_key_field = "__dbz__physicalTableIdentifier"
reroute_model_map: Optional[dict[str, Type[Model]]] = None

def get_model(self, key, value) -> Type[Model]:
if self.reroute_key_field in key:
if not self.reroute_model_map:
raise DjangoKafkaError(
f"To obtain the correct model, reroute_model_map must be set when "
f"`{self.reroute_key_field}` is present in the message key",
)
table = key[self.reroute_key_field]
if table not in self.reroute_model_map:
raise DjangoKafkaError(f"Unrecognised rerouted topic `{table}`")
return self.reroute_model_map[table]
return super().get_model(key, value)

def check_deletion(self, model, key, value) -> bool:
return value is None or value.pop("__deleted", False)

def get_lookup_kwargs(self, model, key, value) -> dict:
pk_field = model._meta.pk.name
return {pk_field: key[pk_field]}
Loading

0 comments on commit d1084e6

Please sign in to comment.