Commit

WIP
mdellweg committed Sep 25, 2024
1 parent 361392f commit 09bde15
Showing 2 changed files with 101 additions and 94 deletions.
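
In short: this WIP commit moves the Kafka producer setup and the task-status notification out of pulpcore/tasking/tasks.py into pulpcore/tasking/kafka.py, and gates the whole feature on KAFKA_BOOTSTRAP_SERVERS — when that setting is unset, kafka.py exports a no-op send_task_notification and never imports confluent_kafka. A hypothetical settings fragment that would enable the producer; the setting names are taken from the diff below, the values are illustrative only:

    # e.g. in Pulp's settings file (values are examples, not defaults)
    KAFKA_BOOTSTRAP_SERVERS = "broker-1:9092,broker-2:9092"  # unset => notifications disabled
    KAFKA_SECURITY_PROTOCOL = "SASL_SSL"
    KAFKA_SASL_MECHANISM = "SCRAM-SHA-512"
    KAFKA_SASL_USERNAME = "pulp"
    KAFKA_SASL_PASSWORD = "changeme"  # placeholder
    KAFKA_PRODUCER_POLL_TIMEOUT = 0.1  # illustrative
    KAFKA_TASKS_STATUS_TOPIC = "pulpcore.tasks.status"
    KAFKA_TASKS_STATUS_PRODUCER_SYNC_ENABLED = False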
153 changes: 98 additions & 55 deletions pulpcore/tasking/kafka.py
@@ -4,62 +4,105 @@
 from threading import Thread
 from typing import Optional
 
-from confluent_kafka import Producer
-
 from django.conf import settings
 
-_logger = logging.getLogger(__name__)
-_kafka_producer = None
-
 _bootstrap_servers = settings.get("KAFKA_BOOTSTRAP_SERVERS")
-_producer_poll_timeout = settings.get("KAFKA_PRODUCER_POLL_TIMEOUT")
-_security_protocol = settings.get("KAFKA_SECURITY_PROTOCOL")
-_ssl_ca_pem = settings.get("KAFKA_SSL_CA_PEM")
-_sasl_mechanism = settings.get("KAFKA_SASL_MECHANISM")
-_sasl_username = settings.get("KAFKA_SASL_USERNAME")
-_sasl_password = settings.get("KAFKA_SASL_PASSWORD")
 
 
-class KafkaProducerPollingWorker:
-    def __init__(self, kafka_producer):
-        self._kafka_producer = kafka_producer
-        self._running = False
-        self._thread = None
-
-    def start(self):
-        self._running = True
-        self._thread = Thread(target=self._run)
-        self._thread.start()
-
-    def _run(self):
-        while self._running:
-            self._kafka_producer.poll(_producer_poll_timeout)
-        self._kafka_producer.flush()
-
-    def stop(self):
-        self._running = False
-        self._thread.join()
-
-
-def get_kafka_producer() -> Optional[Producer]:
-    global _kafka_producer
-    if _bootstrap_servers is None:
-        return None
-    if _kafka_producer is None:
-        conf = {
-            "bootstrap.servers": _bootstrap_servers,
-            "security.protocol": _security_protocol,
-            "client.id": socket.gethostname(),
-        }
-        optional_conf = {
-            "ssl.ca.pem": _ssl_ca_pem,
-            "sasl.mechanisms": _sasl_mechanism,
-            "sasl.username": _sasl_username,
-            "sasl.password": _sasl_password,
-        }
-        for key, value in optional_conf.items():
-            if value:
-                conf[key] = value
-        _kafka_producer = Producer(conf, logger=_logger)
-        polling_worker = KafkaProducerPollingWorker(_kafka_producer)
-        polling_worker.start()
-        atexit.register(polling_worker.stop)
-    return _kafka_producer
+if _bootstrap_servers is None:
+
+    def send_task_notification(task):
+        pass
+
+else:
+    from confluent_kafka import Producer
+
+    # NOTE: in spite of the name, cloudevents.http.CloudEvent is appropriate for other protocols
+    from cloudevents.http import CloudEvent
+    from cloudevents.kafka import to_structured
+    from pulpcore.app.serializers.task import TaskStatusMessageSerializer
+
+    _logger = logging.getLogger(__name__)
+    _kafka_producer = None
+    _producer_poll_timeout = settings.get("KAFKA_PRODUCER_POLL_TIMEOUT")
+    _security_protocol = settings.get("KAFKA_SECURITY_PROTOCOL")
+    _ssl_ca_pem = settings.get("KAFKA_SSL_CA_PEM")
+    _sasl_mechanism = settings.get("KAFKA_SASL_MECHANISM")
+    _sasl_username = settings.get("KAFKA_SASL_USERNAME")
+    _sasl_password = settings.get("KAFKA_SASL_PASSWORD")
+
+    _kafka_tasks_status_topic = settings.get("KAFKA_TASKS_STATUS_TOPIC")
+    _kafka_tasks_status_producer_sync_enabled = settings.get(
+        "KAFKA_TASKS_STATUS_PRODUCER_SYNC_ENABLED"
+    )
+
+    class KafkaProducerPollingWorker:
+        def __init__(self, kafka_producer):
+            self._kafka_producer = kafka_producer
+            self._running = False
+            self._thread = None
+
+        def start(self):
+            self._running = True
+            self._thread = Thread(target=self._run)
+            self._thread.start()
+
+        def _run(self):
+            while self._running:
+                self._kafka_producer.poll(_producer_poll_timeout)
+            self._kafka_producer.flush()
+
+        def stop(self):
+            self._running = False
+            self._thread.join()
+
+    def _get_kafka_producer() -> Optional[Producer]:
+        global _kafka_producer
+        if _kafka_producer is None:
+            conf = {
+                "bootstrap.servers": _bootstrap_servers,
+                "security.protocol": _security_protocol,
+                "client.id": socket.gethostname(),
+            }
+            optional_conf = {
+                "ssl.ca.pem": _ssl_ca_pem,
+                "sasl.mechanisms": _sasl_mechanism,
+                "sasl.username": _sasl_username,
+                "sasl.password": _sasl_password,
+            }
+            for key, value in optional_conf.items():
+                if value:
+                    conf[key] = value
+            _kafka_producer = Producer(conf, logger=_logger)
+            polling_worker = KafkaProducerPollingWorker(_kafka_producer)
+            polling_worker.start()
+            atexit.register(polling_worker.stop)
+        return _kafka_producer
+
+    def _report_message_delivery(error, message):
+        if error is not None:
+            _logger.error(error)
+        elif _logger.isEnabledFor(logging.DEBUG):
+            _logger.debug(f"Message delivered successfully with contents {message.value}")
+
+    def send_task_notification(task):
+        kafka_producer = _get_kafka_producer()
+        attributes = {
+            "type": "pulpcore.tasking.status",
+            "source": "pulpcore.tasking",
+            "datacontenttype": "application/json",
+            "dataref": "https://github.com/pulp/pulpcore/blob/main/docs/static/task-status-v1.yaml",
+        }
+        data = TaskStatusMessageSerializer(task, context={"request": None}).data
+        task_message = to_structured(CloudEvent(attributes, data))
+        kafka_producer.produce(
+            topic=_kafka_tasks_status_topic,
+            value=task_message.value,
+            key=task_message.key,
+            headers=task_message.headers,
+            on_delivery=_report_message_delivery,
+        )
+        if _kafka_tasks_status_producer_sync_enabled:
+            kafka_producer.flush()
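
Since send_task_notification publishes each finished task as a structured CloudEvent — a single JSON body carrying the event attributes alongside the serialized task — a quick way to inspect the stream is a plain confluent-kafka consumer. A minimal sketch, assuming the topic below matches your KAFKA_TASKS_STATUS_TOPIC (broker address, group id, and topic name are illustrative):

    import json

    from confluent_kafka import Consumer

    consumer = Consumer(
        {
            "bootstrap.servers": "localhost:9092",  # illustrative
            "group.id": "task-status-watcher",  # illustrative
            "auto.offset.reset": "earliest",
        }
    )
    consumer.subscribe(["pulpcore.tasks.status"])  # use your KAFKA_TASKS_STATUS_TOPIC value

    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print(msg.error())
                continue
            # Structured content mode: attributes and data share one JSON envelope.
            event = json.loads(msg.value())
            print(event["type"], event["source"], event["data"])
    finally:
        consumer.close()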
42 changes: 3 additions & 39 deletions pulpcore/tasking/tasks.py
@@ -8,30 +8,23 @@
 from datetime import timedelta
 from gettext import gettext as _
 
-# NOTE: in spite of the name, cloudevents.http.CloudEvent is appropriate for other protocols
-from cloudevents.http import CloudEvent
-from cloudevents.kafka import to_structured
 from django.conf import settings
 from django.db import connection, transaction
 from django.db.models import Model, Max
 from django_guid import get_guid
 from pulpcore.app.apps import MODULE_PLUGIN_VERSIONS
 from pulpcore.app.models import Task, TaskGroup
-from pulpcore.app.serializers.task import TaskStatusMessageSerializer
 from pulpcore.app.util import current_task, get_domain, get_prn
 from pulpcore.constants import (
     TASK_FINAL_STATES,
     TASK_INCOMPLETE_STATES,
     TASK_STATES,
     TASK_DISPATCH_LOCK,
 )
-from pulpcore.tasking.kafka import get_kafka_producer
+from pulpcore.tasking.kafka import send_task_notification
 
 _logger = logging.getLogger(__name__)
 
-_kafka_tasks_status_topic = settings.get("KAFKA_TASKS_STATUS_TOPIC")
-_kafka_tasks_status_producer_sync_enabled = settings.get("KAFKA_TASKS_STATUS_PRODUCER_SYNC_ENABLED")
-
 
 def _validate_and_get_resources(resources):
     resource_set = set()
@@ -84,11 +77,11 @@ def _execute_task(task):
         task.set_failed(exc, tb)
         _logger.info(_("Task %s failed (%s) in domain: %s"), task.pk, exc, domain.name)
         _logger.info("\n".join(traceback.format_list(traceback.extract_tb(tb))))
-        _send_task_notification(task)
+        send_task_notification(task)
     else:
         task.set_completed()
         _logger.info(_("Task completed %s in domain: %s"), task.pk, domain.name)
-        _send_task_notification(task)
+        send_task_notification(task)
 
 
 def dispatch(
@@ -297,32 +290,3 @@ def cancel_task_group(task_group_id):
     except RuntimeError:
         pass
     return task_group
-
-
-def _send_task_notification(task):
-    kafka_producer = get_kafka_producer()
-    if kafka_producer is not None:
-        attributes = {
-            "type": "pulpcore.tasking.status",
-            "source": "pulpcore.tasking",
-            "datacontenttype": "application/json",
-            "dataref": "https://github.com/pulp/pulpcore/blob/main/docs/static/task-status-v1.yaml",
-        }
-        data = TaskStatusMessageSerializer(task, context={"request": None}).data
-        task_message = to_structured(CloudEvent(attributes, data))
-        kafka_producer.produce(
-            topic=_kafka_tasks_status_topic,
-            value=task_message.value,
-            key=task_message.key,
-            headers=task_message.headers,
-            on_delivery=_report_message_delivery,
-        )
-        if _kafka_tasks_status_producer_sync_enabled:
-            kafka_producer.flush()
-
-
-def _report_message_delivery(error, message):
-    if error is not None:
-        _logger.error(error)
-    elif _logger.isEnabledFor(logging.DEBUG):
-        _logger.debug(f"Message delivery successfully with contents {message.value}")
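
The net effect on tasks.py is that callers stop branching on whether Kafka is configured: kafka.py now decides once, at import time, whether send_task_notification is the real producer path or a no-op. A standalone sketch of that import-time dispatch pattern (module and environment-variable names here are made up for illustration):

    # notify.py -- hypothetical module illustrating the pattern used by kafka.py
    import os

    _broker = os.environ.get("BROKER_URL")

    if _broker is None:
        # Feature disabled: export a no-op with the same signature.
        def notify(event):
            pass

    else:
        # Feature enabled: heavier imports and setup live only on this branch.
        import json

        def notify(event):
            print(f"would publish to {_broker}: {json.dumps(event)}")

    # Callers import and call unconditionally:
    #     from notify import notify
    #     notify({"task": "123", "state": "completed"})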
