Skip to content

Commit

Permalink
Implement functionality to manage configs of the Kafka-Connect connec…
Browse files Browse the repository at this point in the history
…tors.

- implement `kafka_connect` management command to manage connectors configurations.
- implement `Connector` interface to define and publish configs.
- implement `KafkaConnectClient` with several methods to talk to Kafka-Connect REST API.

refs #16
  • Loading branch information
Bogdan Radko committed Sep 27, 2024
1 parent e666e8f commit 7823853
Show file tree
Hide file tree
Showing 10 changed files with 614 additions and 20 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ build/
.bash_history
db.sqlite3
.ruff_cache
docker-compose.override.yaml
62 changes: 62 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,68 @@ When the consumption of a message in a retryable topic fails, the message is re-

When consumers are started using [start commands](#start-the-Consumers), an additional retry consumer will be started in parallel for any consumer containing a retryable topic. This retry consumer will be assigned to a consumer group whose id is a combination of the original group id and a `.retry` suffix. This consumer is subscribed to the retry topics, and manages the message retry and delay behaviour. Please note that messages are retried directly by the retry consumer and are not sent back to the original topic.

## Connectors

Connectors are defined as python classes decorated with `@kafka.connectors()` which adds the class to the global registry.

`django_kafka.connect.connector.Connector` implements submission, validation and deletion of the connector configuration.

### Define connector:
```python
# Connectors are discovered automatically when placed under the connectors module
# e.g. ./connectors.py

from django_kafka import kafka
from django_kafka.connect.connector import Connector


@kafka.connectors()
class MyConnector(Connector):
config = {
# configuration for the connector
}
```

### Mark a connector for deletion:

```python
from django_kafka import kafka
from django_kafka.connect.connector import Connector


@kafka.connectors()
class MyConnector(Connector):
mark_for_removal = True
config = {
# configuration for the connector
}
```

### Manage connectors:

django-kafka provides `./manage.py kafka_connect` management command to manage your connectors.


#### Manage a single connector
```bash
./manage.py kafka_connect path.to.my.SpecialConnector --validate --publish --check-status --ignore-failures
```

#### Manage all connectors
```bash
./manage.py kafka_connect --validate --publish --check-status --ignore-failures
```

- `--validate` - validates the config over the Connect REST API.
- `--publish` - creates or updates the connector, or deletes it when `mark_for_removal = True`.
- `--check-status` - checks that the status of the connector is `RUNNING`.
- `--ignore-failures` - the command won't fail if any of the connectors fail to validate or publish.

See `--help`.

## Settings:
**Defaults:**
Expand Down
11 changes: 8 additions & 3 deletions django_kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from multiprocessing.pool import Pool
from typing import Optional
from typing import Optional, TYPE_CHECKING

from confluent_kafka.schema_registry import SchemaRegistryClient
from django.utils.functional import cached_property
Expand All @@ -9,9 +9,13 @@
from django_kafka.conf import settings
from django_kafka.exceptions import DjangoKafkaError
from django_kafka.producer import Producer
from django_kafka.registry import ConsumersRegistry
from django_kafka.registry import ConsumersRegistry, Registry
from django_kafka.retry.settings import RetrySettings

if TYPE_CHECKING:
from django_kafka.connect.connector import Connector
from django_kafka.consumer import Consumer

logger = logging.getLogger(__name__)

__version__ = "0.4.2"
Expand All @@ -24,10 +28,11 @@


def autodiscover():
    """Discover the ``consumers`` and ``connectors`` modules.

    A single ``autodiscover_modules`` call covers both module names, so the
    consumers are not requested twice.
    """
    autodiscover_modules("consumers", "connectors")


class DjangoKafka:
connectors = Registry["Connector"]()
consumers = ConsumersRegistry()
retry = RetrySettings

Expand Down
12 changes: 12 additions & 0 deletions django_kafka/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@
"DEAD_LETTER_TOPIC_SUFFIX": "dlt",
"POLLING_FREQUENCY": 1, # seconds
"SCHEMA_REGISTRY": {},
"CONNECT": {
"HOST": "",
"AUTH": None,
"RETRY": dict(
connect=5,
read=5,
status=5,
backoff_factor=0.5,
status_forcelist=[502, 503, 504],
),
"REQUESTS_TIMEOUT": 30, # seconds
},
}


Expand Down
Empty file.
62 changes: 62 additions & 0 deletions django_kafka/connect/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from typing import TypedDict
from urllib.parse import quote

import requests
from requests.adapters import HTTPAdapter
from requests.auth import AuthBase
from urllib3.util import Retry

from django_kafka.exceptions import DjangoKafkaError


class RetryKwargs(TypedDict):
    """Shape of the retry settings that are unpacked into ``urllib3.util.Retry``."""

    # NOTE(review): each key presumably mirrors the ``Retry`` constructor
    # parameter of the same name — confirm against the urllib3 documentation.
    connect: int
    read: int
    status: int
    backoff_factor: float
    status_forcelist: list[int]


class KafkaConnectSession(requests.Session):
    """``requests`` session bound to a single Kafka Connect host.

    Every request URL is treated as a path and prefixed with ``host``. A retry
    adapter built from ``retry`` is mounted for that host, and ``timeout`` is
    applied to any request that does not pass its own ``timeout``.
    """

    def __init__(
        self,
        host: str,
        auth: tuple | AuthBase | None,
        retry: RetryKwargs,
        timeout: int | None = None,
    ):
        """
        :param host: base URL of the Kafka Connect REST API.
        :param auth: credentials stored on the session; may be ``None``.
        :param retry: keyword arguments for ``urllib3.util.Retry``.
        :param timeout: default per-request timeout; ``None`` sets no default.
        """
        super().__init__()
        self.auth = auth
        # Paths always start with "/", so a trailing slash on the configured
        # host would otherwise produce "//" in every request URL.
        self.host = host.rstrip("/")
        self.timeout = timeout
        self.mount(self.host, HTTPAdapter(max_retries=Retry(**retry)))

    def request(self, method, url, *args, **kwargs) -> requests.Response:
        """Send ``method`` to ``host + url``, defaulting ``timeout`` to the session's."""
        kwargs.setdefault("timeout", self.timeout)
        return super().request(method, f"{self.host}{url}", *args, **kwargs)


class KafkaConnectClient:
    """
    Thin client for the Kafka Connect REST API.

    Each method returns the response object produced by the underlying
    session; interpreting the status code and body is left to the caller.

    https://kafka.apache.org/documentation/#connect_rest
    https://docs.confluent.io/platform/current/connect/references/restapi.html
    """

    def __init__(
        self,
        host: str,
        auth: tuple | AuthBase | None,
        retry: RetryKwargs,
        timeout: int | None = None,
    ):
        self._requests = KafkaConnectSession(host, auth, retry, timeout)

    def update_or_create(self, connector_name: str, config: dict):
        """PUT ``config`` to ``/connectors/<name>/config``."""
        # Percent-encode the name so characters such as "/" or spaces cannot
        # alter the request path.
        name = quote(connector_name, safe="")
        return self._requests.put(f"/connectors/{name}/config", json=config)

    def delete(self, connector_name: str):
        """DELETE ``/connectors/<name>``."""
        name = quote(connector_name, safe="")
        return self._requests.delete(f"/connectors/{name}")

    def validate(self, config: dict):
        """PUT ``config`` to the validate endpoint of its connector plugin.

        The plugin is addressed by the last dotted segment of the
        ``connector.class`` value.

        :raises DjangoKafkaError: when ``connector.class`` is missing or empty.
        """
        connector_class = config.get("connector.class")
        if not connector_class:
            raise DjangoKafkaError(
                "'connector.class' config is required for validation.",
            )

        plugin_name = quote(connector_class.rsplit(".", 1)[-1], safe="")
        return self._requests.put(
            f"/connector-plugins/{plugin_name}/config/validate",
            json=config,
        )

    def connector_status(self, connector_name: str):
        """
        GET ``/connectors/<name>/status``.

        https://docs.confluent.io/platform/current/connect/references/restapi.html#get--connectors-(string-name)-status
        """
        name = quote(connector_name, safe="")
        return self._requests.get(f"/connectors/{name}/status")
86 changes: 86 additions & 0 deletions django_kafka/connect/connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from abc import ABC, abstractmethod
from enum import StrEnum

from django_kafka.conf import settings
from django_kafka.exceptions import DjangoKafkaError
from django_kafka.connect.client import KafkaConnectClient

# Names exported by ``from ... import *`` for this module.
__all__ = [
"Connector",
"ConnectorStatus",
]


class ConnectorStatus(StrEnum):
"""
https://docs.confluent.io/platform/current/connect/monitoring.html#connector-and-task-status
UNASSIGNED: The connector/task has not yet been assigned to a worker.
RUNNING: The connector/task is running.
PAUSED: The connector/task has been administratively paused.
FAILED: The connector/task has failed (usually by raising an exception, which is reported in the status output).
"""
UNASSIGNED = "UNASSIGNED"
RUNNING = "RUNNING"
PAUSED = "PAUSED"


class Connector(ABC):
mark_for_removal = False

@property
def name(self) -> str:
"""Name of the connector."""
return f"{self.__class__.__module__}.{self.__class__.__name__}"

@property
@abstractmethod
def config(self) -> dict:
"""Configurations for the connector."""

def __init__(self):
self.client = KafkaConnectClient(
host=settings.CONNECT["HOST"],
auth=settings.CONNECT["AUTH"],
retry=settings.CONNECT["RETRY"],
timeout=settings.CONNECT["REQUESTS_TIMEOUT"],
)

def publish(self):
if self.mark_for_removal:
return self.delete()
return self.submit()

def delete(self) -> bool:
response = self.client.delete(self.name)

if response.status_code == 404:
return False

if not response.ok:
raise DjangoKafkaError(response.text)

return True

def submit(self):
response = self.client.update_or_create(self.name, self.config)

if not response.ok:
raise DjangoKafkaError(response.text)

return response.json()

def is_valid(self, raise_exception=False) -> bool | None:
response = self.client.validate(self.config)

if raise_exception and not response.ok:
raise DjangoKafkaError(response.text)

return response.ok

def status(self) -> ConnectorStatus:
response = self.client.connector_status(self.name)

if not response.ok:
raise DjangoKafkaError(response.text)

return response.json()["connector"]["state"]
Loading

0 comments on commit 7823853

Please sign in to comment.