fix: suggested improvements
stefan-cardnell-rh committed Jun 13, 2024
1 parent 5bd25da commit eaee3ed
Showing 9 changed files with 31 additions and 20 deletions.
36 changes: 18 additions & 18 deletions README.md
@@ -22,9 +22,9 @@ DJANGO_KAFKA = {
}
```

### Define Topic:
### Define a Topic:

Topic. Defines how to handle incoming message and how to produce outgoing message.
Topics define how to handle incoming messages and how to produce outgoing messages.
```python
from confluent_kafka.serialization import MessageField
from django_kafka.topic import Topic
@@ -39,9 +39,9 @@ class Topic1(Topic):
# ... process values
```
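For orientation, a filled-out version of the fragment above might look like the sketch below. The diff collapses most of the class, so the `name` attribute and the `consume` signature are assumptions rather than confirmed library API.

```python
from django_kafka.topic import Topic


class Topic1(Topic):
    name = "topic1"  # assumed attribute: the Kafka topic this class handles

    def consume(self, msg):
        # assumed signature; msg.value() is the standard confluent_kafka
        # Message payload accessor
        value = msg.value()
        # ... process values
```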

### Define consumer:
### Define a Consumer:

Consumer. Defines which topics it takes care of. Usually you want one consumer per project. If 2 consumers are defined, then they will be started in parallel.
Consumers define which topics they take care of. Usually you want one consumer per project. If two consumers are defined, they will be started in parallel.

Consumers are auto-discovered and are expected to be located in a `consumers.py` file.

@@ -54,10 +54,10 @@ from django_kafka.consumer import Consumer, Topics
from my_app.topics import Topic1


# register your consumer using `DjangoKafka` clas API decorator
# register your consumer using the `DjangoKafka` class API decorator
@kafka.consumers()
class MyAppConsumer(Consumer):
# tell to the consumers which topics to process using `django_kafka.consumer.Topics` interface.
# tell the consumers which topics to process using the `django_kafka.consumer.Topics` interface.
topics = Topics(
Topic1(),
)
@@ -70,7 +70,7 @@ class MyAppConsumer(Consumer):
```


### Start consumer(s):
### Start the Consumers:
You can use the Django management command to start the defined consumers.
```bash
./manage.py consume
@@ -86,21 +86,21 @@ Check [Confluent Python Consumer](https://docs.confluent.io/platform/current/cli


### Produce:
Message are produced using topic instance.
Messages are produced using a Topic instance.
```python
from my_app.topics import Topic1

# this will send a message to kafka serializing it using defined serializer
# this will send a message to kafka, serializing it using the defined serializer
Topic1().produce("some message")
```
Check [Confluent Python Producer](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#producer) for API documentation.
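If `Topic.produce` forwards keyword arguments to confluent-kafka's `Producer.produce` (an assumption; `key=` is the standard kwarg there), producing a keyed message would look something like:

```python
from my_app.topics import Topic1

# key= is confluent-kafka's standard Producer.produce kwarg; this assumes
# Topic.produce passes it through to the underlying producer unchanged.
Topic1().produce("some message", key="user-42")
```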


### Define schema registry:

The library is using [Confluent SchemaRegistryClient](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#schemaregistryclient). In order to use it there is `SCHEMA_REGISTRY` setting.
The library is using [Confluent's SchemaRegistryClient](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#schemaregistryclient). In order to use it, define a `SCHEMA_REGISTRY` setting.

Find available configs at [SchemaRegistryClient docs](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#schemaregistryclient).
Find available configs in the [SchemaRegistryClient docs](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#schemaregistryclient).
```python
DJANGO_KAFKA = {
"SCHEMA_REGISTRY": {
@@ -111,7 +111,7 @@ DJANGO_KAFKA = {

**Note:** take [django_kafka.topic.AvroTopic](./django_kafka/topic.py) as an example if you want to implement a custom Topic with your schema.
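For illustration, a minimal complete `SCHEMA_REGISTRY` entry might look like this; `url` is the standard `SchemaRegistryClient` config key, and the host below is made up:

```python
DJANGO_KAFKA = {
    "SCHEMA_REGISTRY": {
        "url": "http://schema-registry:8081",  # illustrative address
    },
}
```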

## Settings.
## Settings:

**Defaults:**
```python
@@ -131,7 +131,7 @@ Default: `f"{socket.gethostname()}-python"`

An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.

**Note:** This parameter is included in config of both consumer and producer unless `client.id` is overwritten within `PRODUCER_CONFIG` or `CONSUMER_CONFIG`.
**Note:** This parameter is included in the config of both the consumer and producer unless `client.id` is overwritten within `PRODUCER_CONFIG` or `CONSUMER_CONFIG`.

#### `GLOBAL_CONFIG`
Default: `{}`
@@ -146,7 +146,7 @@ Defines configurations of the producer. See [configs marked with `P`](https://gi
#### `CONSUMER_CONFIG`
Default: `{}`

Defines configurations of the producer. See [configs marked with `C`](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
Defines configurations of the consumer. See [configs marked with `C`](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
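Putting the three layers together, a settings block might be filled in as below; the values are illustrative, but every key is a standard librdkafka option:

```python
DJANGO_KAFKA = {
    "GLOBAL_CONFIG": {
        "bootstrap.servers": "kafka-1:9092,kafka-2:9092",  # shared by both clients
    },
    "PRODUCER_CONFIG": {
        "enable.idempotence": True,  # a producer-only ("P") option
    },
    "CONSUMER_CONFIG": {
        "group.id": "my-project",  # a consumer-only ("C") option
        "auto.offset.reset": "earliest",
    },
}
```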

#### `POLLING_FREQUENCY`
Default: 1 # second
@@ -165,9 +165,9 @@ This is an `error_cb` hook (see [Kafka Client Configuration](https://docs.conflu
It is triggered on global client errors and, in case of a fatal error, raises `DjangoKafkaException`.


## Bidirectional data sync with no infinite events loop.
## Bidirectional data sync with no infinite event loop.

**For example, you want to keep User table in sync in multiple systems.**
**For example, you want to keep a User table in sync in multiple systems.**

The idea is to send events from all systems to the same topic, and also consume events from the same topic, marking the record with `kafka_skip=True` at consumption time.
- The producer should respect `kafka_skip=True` and not produce new events when it is `True` (a consumption-side sketch follows the model excerpt below).
@@ -218,9 +218,9 @@ class User(KafkaSkipMixin, PermissionsMixin, AbstractBaseUser):
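To make the loop-breaking concrete, the consuming side might apply events roughly like this sketch. The model import and field names are hypothetical; the essential part is writing `kafka_skip=True` so the producing hook stays silent:

```python
from my_app.models import User  # assumes User mixes in KafkaSkipMixin


def apply_user_event(event: dict) -> None:
    """Apply a consumed user event without emitting a new one."""
    User.objects.update_or_create(
        id=event["id"],
        defaults={
            "email": event["email"],  # illustrative synced field
            "kafka_skip": True,  # consumer-side write: do not re-produce
        },
    )
```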

## Making a new release
- [bump-my-version](https://github.com/callowayproject/bump-my-version) is used to manage releases.
- [Ruff](https://github.com/astral-sh/ruff) linter is used to validate the code style. Make sure your code complies the defined rules. You may use `ruff check --fix` for that. Ruf is executed by GitHub actions and the workflow will fail if Ruff validation fails.
- [Ruff](https://github.com/astral-sh/ruff) linter is used to validate the code style. Make sure your code complies with the defined rules. You may use `ruff check --fix` for that. Ruff is executed by GitHub actions and the workflow will fail if Ruff validation fails.

- Add your changes to the [CHANGELOG](CHANGELOG.md), run
- Add your changes to the [CHANGELOG](CHANGELOG.md), then run
```bash
docker compose run --rm app bump-my-version bump patch <major|minor|patch>
```
3 changes: 3 additions & 0 deletions django_kafka/__init__.py
@@ -54,6 +54,9 @@ def start_consumers(self, consumers: Optional[list[str]] = None):
        try:
            pool.map(self.start_consumer, consumers)
        except KeyboardInterrupt:
            # STEFAN: Maybe too many code comments for me, this is kinda standard
            # stuff for the multiprocessing library; if anyone's confused they can
            # look up the docs for terminate/close/join.
            # Stops the worker processes immediately without completing
            # outstanding work.
            pool.terminate()
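For reference, invoking this API directly might look like the sketch below. It assumes the collapsed lines default `consumers` to every registered key, and that keys are the dotted paths built by `registry.make_key`:

```python
from django_kafka import kafka  # assumed module-level DjangoKafka instance

# start every registered consumer, each in its own pool worker
kafka.start_consumers()

# or start a subset by registry key ("<module>.<ClassName>")
kafka.start_consumers(["my_app.consumers.MyAppConsumer"])
```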
2 changes: 2 additions & 0 deletions django_kafka/consumer.py
@@ -52,6 +52,8 @@ def __init__(self, config: Optional[dict] = None, **kwargs):

    def __getattr__(self, name):
        """proxy consumer methods."""
        # STEFAN: I believe the check against {"config"} is not necessary.
        # shouldn't this method just be getattr(self._consumer, name) and nothing else?
        if name not in {"config"}:
            return getattr(self._consumer, name)
        raise AttributeError(f"'{self.__class__.__name__}' has no attribute '{name}'")
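A sketch of the simplification Stefan suggests; since `__getattr__` only runs after normal attribute lookup fails, a real `config` attribute would never reach this method anyway:

```python
def __getattr__(self, name):
    """Proxy unresolved attribute access to the wrapped confluent_kafka consumer."""
    return getattr(self._consumer, name)
```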
4 changes: 2 additions & 2 deletions django_kafka/models.py
@@ -13,7 +13,7 @@ class KafkaSkipMixin(models.Model):
For models (tables) which are synced with other database(s) in both directions.
Every update which happens from within the system should set `kafka_skip=False`,
global producer (kafka connect, django post_save signal, etc.) suppose to create
global producer (kafka connect, django post_save signal, etc.) will then create
a new event.
When a db update comes from the consumed event, then the row should be manually
@@ -27,7 +27,7 @@ class KafkaSkipMixin(models.Model):
"Wont generate an event if `True`."
"\nThis field is used to filter out the events to break the infinite loop"
" of message generation when synchronizing 2+ databases."
"\nGets reset to True on .save() method call.",
"\nGets reset to True on .save() method call.", # STEFAN: This last line isn't true?
),
default=False,
)
1 change: 1 addition & 0 deletions django_kafka/producer.py
@@ -45,6 +45,7 @@ def __getattr__(self, name):
"""
proxy producer methods.
"""
# STEFAN: Like consumer.py, I believe the check against {"config"} is not necessary.
if name not in {"config"}:
return getattr(self._producer, name)
raise AttributeError(f"'{self.__class__.__name__}' has no attribute 'name'")
1 change: 1 addition & 0 deletions django_kafka/registry.py
@@ -28,5 +28,6 @@ def __getitem__(self, key: str):
    def __iter__(self):
        yield from self.__consumers.keys()

    # STEFAN: Maybe call it get_key? "make_key" sounds like a write operation.
    def make_key(self, consumer_cls: Type["Consumer"]) -> str:
        return f"{consumer_cls.__module__}.{consumer_cls.__name__}"
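The rename Stefan floats would be mechanical; a sketch with the hypothetical name `get_key` (callers and tests would need updating too):

```python
def get_key(self, consumer_cls: Type["Consumer"]) -> str:
    """Compute the registry key for a consumer class; a pure read, no state change."""
    return f"{consumer_cls.__module__}.{consumer_cls.__name__}"
```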
1 change: 1 addition & 0 deletions django_kafka/tests/test_django_kafka_interface.py
@@ -29,6 +29,7 @@ def test_producer(self):
    def test_schema_client(self, mock_SchemaRegistryClient):
        kafka = DjangoKafka()

        # STEFAN: Separate test for the exception?
        # exception when config is not provided
        with self.assertRaisesMessage(
            DjangoKafkaError,
1 change: 1 addition & 0 deletions django_kafka/tests/test_settings.py
@@ -21,6 +21,7 @@ def test_defaults(self, mock_consumer_client):
        for key in settings_keys:
            self.assertEqual(getattr(settings, key), DEFAULTS[key])

        # STEFAN: Separate test? It's not related to defaults anymore
        # make sure settings defined by the user are picked up
        user_settings = {
            "CLIENT_ID": "client-id",
2 changes: 2 additions & 0 deletions django_kafka/tests/test_topic.py
@@ -211,6 +211,8 @@ def test_value_schema(self, mock_kafka_schema_client):
f"{self.topic.name}-value",
)

# STEFAN: The mock should be from "confluent_kafka.schema_registry.avro" no?
# Then we are testing we are still using the confluent_kafka serializer
@patch("django_kafka.topic.AvroSerializer")
def test_key_serializer(self, mock_AvroSerializer, mock_kafka_schema_client):
key_serializer = self.topic.key_serializer
Expand Down
