diff --git a/CHANGES/5762.feature b/CHANGES/5762.feature new file mode 100644 index 0000000000..39fc47523c --- /dev/null +++ b/CHANGES/5762.feature @@ -0,0 +1,2 @@ +Re-enable and refactor the Domain Storage metric emiter. +This is an experimental feature and can change without prior notice. diff --git a/pulpcore/app/models/domain.py b/pulpcore/app/models/domain.py index e85eb0574d..aee7ba81d0 100644 --- a/pulpcore/app/models/domain.py +++ b/pulpcore/app/models/domain.py @@ -72,14 +72,6 @@ def _cleanup_orphans_pre_delete(self): # Delete on by one to properly cleanup the storage. artifact.delete() - # Disabling Storage metrics until we find a solution to resource usage. - # https://github.com/pulp/pulpcore/issues/5468 - # @hook(AFTER_CREATE) - # def _report_domain_disk_usage(self): - # from pulpcore.app.util import DomainMetricsEmitterBuilder - # - # DomainMetricsEmitterBuilder.build(self) - class Meta: permissions = [ ("manage_roles_domain", "Can manage role assignments on domain"), diff --git a/pulpcore/app/tasks/telemetry.py b/pulpcore/app/tasks/telemetry.py new file mode 100644 index 0000000000..76f0cb6bb8 --- /dev/null +++ b/pulpcore/app/tasks/telemetry.py @@ -0,0 +1,40 @@ +import os +from django.db.models import Sum + +from pulpcore.app.models import Artifact + +from opentelemetry.sdk.metrics import MeterProvider + +from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.sdk.resources import Resource + +# This configuration is needed since the worker thread is not using the opentelemetry +# instrumentation agent to run the task code. + +exporter = OTLPMetricExporter() + +resource = Resource(attributes={"service.name": "pulp-worker"}) + +metric_reader = PeriodicExportingMetricReader(exporter) +provider = MeterProvider(metric_readers=[metric_reader], resource=resource) + + +def emit_domain_space_usage_metric(): + + meter = provider.get_meter(__name__) + space_usage_gauge = meter.create_gauge( + name="space_usage", + description="The total space usage per domain.", + unit="bytes", + ) + + space_utilization_per_domain = Artifact.objects.values("pulp_domain__name").annotate( + total_size=Sum("size", default=0) + ) + + # We're using the same gauge with different attributes for each domain space usage + for domain in space_utilization_per_domain: + space_usage_gauge.set(domain["total_size"], {"domain_name": domain["pulp_domain__name"]}) + + metric_reader.collect() diff --git a/pulpcore/app/util.py b/pulpcore/app/util.py index a999068421..2fb97224d7 100644 --- a/pulpcore/app/util.py +++ b/pulpcore/app/util.py @@ -16,9 +16,8 @@ from django.apps import apps from django.conf import settings from django.db import connection -from django.db.models import Model, Sum +from django.db.models import Model from django.urls import Resolver404, resolve -from opentelemetry import metrics from rest_framework.serializers import ValidationError from rest_framework.reverse import reverse as drf_reverse @@ -26,7 +25,6 @@ from pulpcore.app.loggers import deprecation_logger from pulpcore.app.apps import pulp_plugin_configs from pulpcore.app import models -from pulpcore.constants import STORAGE_METRICS_LOCK from pulpcore.exceptions import AdvisoryLockError from pulpcore.exceptions.validation import InvalidSignatureError @@ -463,6 +461,19 @@ def configure_cleanup(): models.TaskSchedule.objects.filter(task_name=task_name).delete() +def configure_periodic_telemetry(): + task_name = "pulpcore.app.tasks.telemetry.emit_domain_space_usage_metric" + dispatch_interval = timedelta(minutes=5) + name = "Emit Domain Space Usage metric periodically" + + if os.getenv("PULP_OTEL_ENABLED", "").lower() == "true" and settings.DOMAIN_ENABLED: + models.TaskSchedule.objects.update_or_create( + name=name, defaults={"task_name": task_name, "dispatch_interval": dispatch_interval} + ) + else: + models.TaskSchedule.objects.filter(task_name=task_name).delete() + + @lru_cache(maxsize=1) def _artifact_serving_distribution(): return models.ArtifactDistribution.objects.get() @@ -613,52 +624,6 @@ def build(cls, *args, **kwargs): return cls._NoopEmitter() -class DomainMetricsEmitter(MetricsEmitter): - """A builder class that initializes an emitter for recording domain's metrics.""" - - def __init__(self, domain): - self.domain = domain - self.meter = metrics.get_meter(f"domain.{domain.name}.meter") - self.instrument = self._init_emitting_total_size() - - def _init_emitting_total_size(self): - return self.meter.create_observable_gauge( - name="disk_usage", - description="The total disk size by domain.", - callbacks=[self._disk_usage_callback()], - unit="Bytes", - ) - - def _disk_usage_callback(self): - try: - with PGAdvisoryLock(STORAGE_METRICS_LOCK): - from pulpcore.app.models import Artifact - - options = yield # noqa - - while True: - artifacts = Artifact.objects.filter(pulp_domain=self.domain).only("size") - total_size = artifacts.aggregate(size=Sum("size", default=0))["size"] - options = yield [ # noqa - metrics.Observation( - total_size, - { - "pulp_href": get_url(self.domain), - "domain_name": self.domain.name, - }, - ) - ] - except AdvisoryLockError: - yield - - -def init_domain_metrics_exporter(): - from pulpcore.app.models.domain import Domain - - for domain in Domain.objects.all(): - DomainMetricsEmitter.build(domain) - - @lru_cache(maxsize=1) def get_worker_name(): return f"{os.getpid()}@{socket.gethostname()}" diff --git a/pulpcore/app/wsgi.py b/pulpcore/app/wsgi.py index 9edb867bf9..aaf55613e0 100644 --- a/pulpcore/app/wsgi.py +++ b/pulpcore/app/wsgi.py @@ -44,9 +44,3 @@ def export(self, metrics_data, timeout_millis=10_000, **kwargs): application = get_wsgi_application() if os.getenv("PULP_OTEL_ENABLED", "").lower() == "true": application = OpenTelemetryMiddleware(application, meter_provider=provider) - -# Disabling Storage metrics until we find a solution to resource usage. -# https://github.com/pulp/pulpcore/issues/5468 -# from pulpcore.app.util import init_domain_metrics_exporter # noqa: E402 - -# init_domain_metrics_exporter() diff --git a/pulpcore/tasking/_util.py b/pulpcore/tasking/_util.py index d46c88a114..f90683415a 100644 --- a/pulpcore/tasking/_util.py +++ b/pulpcore/tasking/_util.py @@ -25,6 +25,7 @@ set_domain, configure_analytics, configure_cleanup, + configure_periodic_telemetry, ) from pulpcore.constants import TASK_FINAL_STATES, TASK_STATES from pulpcore.tasking.tasks import dispatch, execute_task @@ -35,6 +36,7 @@ def startup_hook(): configure_analytics() configure_cleanup() + configure_periodic_telemetry() def delete_incomplete_resources(task):