diff --git a/README.md b/README.md
index bee00f1..83e33e6 100644
--- a/README.md
+++ b/README.md
@@ -22,9 +22,9 @@ DJANGO_KAFKA = {
 }
 ```
-### Define Topic:
+### Define a Topic:
-Topic. Defines how to handle incoming message and how to produce outgoing message.
+Topics define how to handle incoming messages and how to produce outgoing messages.
 ```python
 from confluent_kafka.serialization import MessageField
 from django_kafka.topic import Topic
@@ -39,9 +39,9 @@ class Topic1(Topic):
         # ... process values
 ```
-### Define consumer:
+### Define a Consumer:
-Consumer. Defines which topics it takes care of. Usually you want one consumer per project. If 2 consumers are defined, then they will be started in parallel.
+Consumers define which topics they take care of. Usually you want one consumer per project. If 2 consumers are defined, then they will be started in parallel.
 Consumers are auto-discovered and are expected to be located under the `consumers.py`.
@@ -54,10 +54,10 @@ from django_kafka.consumer import Consumer, Topics
 from my_app.topics import Topic1
-# register your consumer using `DjangoKafka` clas API decorator
+# register your consumer using `DjangoKafka` class API decorator
 @kafka.consumers()
 class MyAppConsumer(Consumer):
-    # tell to the consumers which topics to process using `django_kafka.consumer.Topics` interface.
+    # tell the consumer which topics to process using the `django_kafka.consumer.Topics` interface.
     topics = Topics(
         Topic1(),
     )
@@ -70,7 +70,7 @@ class MyAppConsumer(Consumer):
 ```
-### Start consumer(s):
+### Start the Consumers:
 You can use django management command to start defined consumers.
 ```bash
 ./manage.py consume
 ```
@@ -86,11 +86,11 @@ Check [Confluent Python Consumer](https://docs.confluent.io/platform/current/cli
 ### Produce:
-Message are produced using topic instance.
+Messages are produced using a Topic instance.
 ```python
 from my_app.topics import Topic1
-# this will send a message to kafka serializing it using defined serializer
+# this will send a message to kafka, serializing it using the defined serializer
 Topic1().produce("some message")
 ```
 Check [Confluent Python Producer](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#producer) for API documentation.
@@ -98,9 +98,9 @@ Check [Confluent Python Producer](https://docs.confluent.io/platform/current/cli
 ### Define schema registry:
-The library is using [Confluent SchemaRegistryClient](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#schemaregistryclient). In order to use it there is `SCHEMA_REGISTRY` setting.
+The library uses [Confluent's SchemaRegistryClient](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#schemaregistryclient). In order to use it, define the `SCHEMA_REGISTRY` setting.
-Find available configs at [SchemaRegistryClient docs](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#schemaregistryclient).
+Find available configs in the [SchemaRegistryClient docs](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#schemaregistryclient).
 ```python
 DJANGO_KAFKA = {
     "SCHEMA_REGISTRY": {
@@ -111,7 +111,7 @@ DJANGO_KAFKA = {
 **Note:** take [django_kafka.topic.AvroTopic](./django_kafka/topic.py) as an example if you want to implement a custom Topic with your schema.
-## Settings.
+## Settings:
 **Defaults:**
 ```python
@@ -131,7 +131,7 @@ Default: `f"{socket.gethostname()}-python"`
 An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.
-**Note:** This parameter is included in config of both consumer and producer unless `client.id` is overwritten within `PRODUCER_CONFIG` or `CONSUMER_CONFIG`.
+**Note:** This parameter is included in the config of both the consumer and producer unless `client.id` is overwritten within `PRODUCER_CONFIG` or `CONSUMER_CONFIG`.
 #### `GLOBAL_CONFIG`
 Default: `{}`
@@ -146,7 +146,7 @@ Defines configurations of the producer. See [configs marked with `P`](https://gi
 #### `CONSUMER_CONFIG`
 Default: `{}`
-Defines configurations of the producer. See [configs marked with `C`](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
+Defines configurations of the consumer. See [configs marked with `C`](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
 #### `POLLING_FREQUENCY`
 Default: 1 # second
@@ -165,9 +165,9 @@ This is an `error_cb` hook (see [Kafka Client Configuration](https://docs.conflu
 It is triggered for client global errors and in case of fatal error it raises `DjangoKafkaException`.
-## Bidirectional data sync with no infinite events loop.
+## Bidirectional data sync with no infinite event loop.
-**For example, you want to keep User table in sync in multiple systems.**
+**For example, you want to keep a User table in sync in multiple systems.**
 The idea is to send events from all systems to the same topic, and also consume events from the same topic, marking the record with `kafka_skip=True` at the consumption time.
 - Producer should respect `kafka_skip=True` and do not produce new events when `True`.
@@ -218,9 +218,9 @@ class User(KafkaSkipMixin, PermissionsMixin, AbstractBaseUser):
 ## Making a new release
 - [bump-my-version](https://github.com/callowayproject/bump-my-version) is used to manage releases.
-- [Ruff](https://github.com/astral-sh/ruff) linter is used to validate the code style. Make sure your code complies the defined rules. You may use `ruff check --fix` for that. Ruf is executed by GitHub actions and the workflow will fail if Ruff validation fails.
+- [Ruff](https://github.com/astral-sh/ruff) linter is used to validate the code style. Make sure your code complies with the defined rules. You may use `ruff check --fix` for that. Ruff is executed by GitHub Actions and the workflow will fail if Ruff validation fails.
-- Add your changes to the [CHANGELOG](CHANGELOG.md), run
+- Add your changes to the [CHANGELOG](CHANGELOG.md), then run
 ```bash
 docker compose run --rm app bump-my-version bump patch
 ```
diff --git a/django_kafka/__init__.py b/django_kafka/__init__.py
index 170494d..19a8947 100644
--- a/django_kafka/__init__.py
+++ b/django_kafka/__init__.py
@@ -54,6 +54,9 @@ def start_consumers(self, consumers: Optional[list[str]] = None):
         try:
             pool.map(self.start_consumer, consumers)
         except KeyboardInterrupt:
+            # STEFAN: Maybe too many code comments for me, this is kinda standard
+            # stuff for the multiprocessing library; if anyone's confused they can
+            # look up the docs for terminate/close/join.
             # Stops the worker processes immediately without completing
             # outstanding work.
             pool.terminate()
diff --git a/django_kafka/consumer.py b/django_kafka/consumer.py
index f82a875..e2784e4 100644
--- a/django_kafka/consumer.py
+++ b/django_kafka/consumer.py
@@ -52,6 +52,8 @@ def __init__(self, config: Optional[dict] = None, **kwargs):
     def __getattr__(self, name):
         """proxy consumer methods."""
+        # STEFAN: I believe the check against {"config"} is not necessary.
+        # Shouldn't this method just be getattr(self._consumer, name) and nothing else?
         if name not in {"config"}:
             return getattr(self._consumer, name)
         raise AttributeError(f"'{self.__class__.__name__}' has no attribute 'name'")
diff --git a/django_kafka/models.py b/django_kafka/models.py
index d3470ba..0fd870c 100644
--- a/django_kafka/models.py
+++ b/django_kafka/models.py
@@ -13,7 +13,7 @@ class KafkaSkipMixin(models.Model):
     For models (tables) which are synced with other database(s) in both directions.
     Every update which happens from within the system should set `kafka_skip=False`,
-    global producer (kafka connect, django post_save signal, etc.) suppose to create
+    the global producer (kafka connect, django post_save signal, etc.) will then create
     a new event.
     When db update comes from the consumed event, then the row should be manually
@@ -27,7 +27,7 @@ class KafkaSkipMixin(models.Model):
             "Wont generate an event if `True`."
             "\nThis field is used to filter out the events to break the infinite loop"
             " of message generation when synchronizing 2+ databases."
-            "\nGets reset to True on .save() method call.",
+            "\nGets reset to True on .save() method call.",  # STEFAN: This last line isn't true?
         ),
         default=False,
     )
diff --git a/django_kafka/producer.py b/django_kafka/producer.py
index 90c19f1..96ae348 100644
--- a/django_kafka/producer.py
+++ b/django_kafka/producer.py
@@ -45,6 +45,7 @@ def __getattr__(self, name):
         """
         proxy producer methods.
         """
+        # STEFAN: Like consumer.py, I believe the check against {"config"} is not necessary.
         if name not in {"config"}:
             return getattr(self._producer, name)
         raise AttributeError(f"'{self.__class__.__name__}' has no attribute 'name'")
diff --git a/django_kafka/registry.py b/django_kafka/registry.py
index 659c161..f435ad4 100644
--- a/django_kafka/registry.py
+++ b/django_kafka/registry.py
@@ -28,5 +28,6 @@ def __getitem__(self, key: str):
     def __iter__(self):
         yield from self.__consumers.keys()
+    # STEFAN: Maybe call it get_key? "make_key" sounds like a write operation.
     def make_key(self, consumer_cls: Type["Consumer"]) -> str:
         return f"{consumer_cls.__module__}.{consumer_cls.__name__}"
diff --git a/django_kafka/tests/test_django_kafka_interface.py b/django_kafka/tests/test_django_kafka_interface.py
index 22728f4..8544bef 100644
--- a/django_kafka/tests/test_django_kafka_interface.py
+++ b/django_kafka/tests/test_django_kafka_interface.py
@@ -29,6 +29,7 @@ def test_producer(self):
     def test_schema_client(self, mock_SchemaRegistryClient):
         kafka = DjangoKafka()
+        # STEFAN: Separate test for the exception?
         # exception when config is not provided
         with self.assertRaisesMessage(
             DjangoKafkaError,
diff --git a/django_kafka/tests/test_settings.py b/django_kafka/tests/test_settings.py
index 458ef7c..28a6ee2 100644
--- a/django_kafka/tests/test_settings.py
+++ b/django_kafka/tests/test_settings.py
@@ -21,6 +21,7 @@ def test_defaults(self, mock_consumer_client):
         for key in settings_keys:
             self.assertEqual(getattr(settings, key), DEFAULTS[key])
+        # STEFAN: Separate test? It's not related to defaults anymore
         # make sure settings defined by user pulled up
         user_settings = {
             "CLIENT_ID": "client-id",
diff --git a/django_kafka/tests/test_topic.py b/django_kafka/tests/test_topic.py
index a63b646..f1b5e64 100644
--- a/django_kafka/tests/test_topic.py
+++ b/django_kafka/tests/test_topic.py
@@ -211,6 +211,8 @@ def test_value_schema(self, mock_kafka_schema_client):
             f"{self.topic.name}-value",
         )
+    # STEFAN: The mock should be from "confluent_kafka.schema_registry.avro", no?
+    # Then we are testing that we are still using the confluent_kafka serializer.
     @patch("django_kafka.topic.AvroSerializer")
     def test_key_serializer(self, mock_AvroSerializer, mock_kafka_schema_client):
         key_serializer = self.topic.key_serializer
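For reference, a minimal sketch of the `__getattr__` simplification raised in the comments on `consumer.py` and `producer.py` above. The `ConsumerProxy` and `FakeConfluentConsumer` names are made up for illustration and are not the library's classes; the sketch only demonstrates the Python attribute-lookup rule the suggestion relies on, assuming the wrapper always sets `config` in `__init__` before anything is proxied.

```python
# Illustrative only: ConsumerProxy / FakeConfluentConsumer are hypothetical names,
# not django_kafka classes.


class ConsumerProxy:
    """Mimics the proxying behaviour discussed in consumer.py / producer.py."""

    def __init__(self, consumer, config=None):
        self.config = config or {}
        self._consumer = consumer  # the wrapped client, e.g. a confluent_kafka.Consumer

    def __getattr__(self, name):
        # __getattr__ is only invoked when normal attribute lookup fails, so
        # attributes set in __init__ (config, _consumer) never reach this point.
        # Under that assumption the `if name not in {"config"}` guard is redundant.
        return getattr(self._consumer, name)


class FakeConfluentConsumer:
    def poll(self, timeout=1.0):
        return None


proxy = ConsumerProxy(FakeConfluentConsumer())
print(proxy.poll())    # resolved via __getattr__ on the wrapped consumer -> None
print(proxy.config)    # found on the proxy itself, __getattr__ never runs -> {}
```

If the real `Consumer`/`Producer` expose `config` some other way (for example lazily, after the proxy can already be hit), the `{"config"}` guard may still be doing useful work, so this is a question for the PR author rather than a definitive change.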