Skip to content

Commit

Permalink
Re-enable Storage Metrics emitter and refactor it.
Browse files Browse the repository at this point in the history
Closes #5762
  • Loading branch information
decko committed Oct 7, 2024
1 parent f5c439c commit 687e7ee
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 63 deletions.
2 changes: 2 additions & 0 deletions CHANGES/5762.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Re-enable the Domain Storage metric emitter and add a feature flag to it.
This is an experimental feature and can change without prior notice.
8 changes: 0 additions & 8 deletions pulpcore/app/models/domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,6 @@ def _cleanup_orphans_pre_delete(self):
# Delete one by one to properly cleanup the storage.
artifact.delete()

# Disabling Storage metrics until we find a solution to resource usage.
# https://github.com/pulp/pulpcore/issues/5468
# @hook(AFTER_CREATE)
# def _report_domain_disk_usage(self):
# from pulpcore.app.util import DomainMetricsEmitterBuilder
#
# DomainMetricsEmitterBuilder.build(self)

class Meta:
permissions = [
("manage_roles_domain", "Can manage role assignments on domain"),
Expand Down
44 changes: 44 additions & 0 deletions pulpcore/app/tasks/telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import os
from django.db.models import Sum

from pulpcore.app.models import Artifact

from opentelemetry.sdk.metrics import MeterProvider

from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource

# This configuration is needed since the worker thread is not using the opentelemetry
# instrumentation agent to run the task code.

# Endpoint taken from the OTEL_EXPORTER_OTLP_ENDPOINT environment variable, falling back
# to a localhost URL.
# NOTE(review): this constant is not passed to the exporter below and is not referenced
# anywhere else in this module; presumably the exporter resolves the same environment
# variable on its own -- confirm, then either wire this in or drop it.
OTLP_EXPORTER_ENDPOINT = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318/")

# Exporter constructed with no explicit arguments.
exporter = OTLPMetricExporter()

# Resource attached to the meter provider below; it carries the service name "pulp-worker".
resource = Resource(attributes={"service.name": "pulp-worker"})

# Reader wrapping the exporter, configured with a 3000 ms export interval and a 3000 ms
# export timeout.
metric_reader = PeriodicExportingMetricReader(
exporter, export_interval_millis=3000, export_timeout_millis=3000
)
# Meter provider wired to the reader and resource above; the task in this module obtains
# its meter from this provider.
provider = MeterProvider(metric_readers=[metric_reader], resource=resource)


def emit_domain_space_usage_metric():
    """Record each domain's summed artifact size on the ``space_usage`` gauge.

    Artifacts are grouped by the name of their domain and the ``size`` of each
    group is summed (with a default of 0). Every total is set on one shared
    gauge, distinguished by a ``domain_name`` attribute. Once all values are
    set, ``metric_reader.collect()`` is called -- presumably so the values are
    exported without waiting for the reader's periodic interval (confirm).
    """
    gauge = provider.get_meter(__name__).create_gauge(
        name="space_usage",
        description="The total space usage per domain.",
        unit="bytes",
    )

    totals_by_domain = Artifact.objects.values("pulp_domain__name").annotate(
        total_size=Sum("size", default=0)
    )

    # A single gauge is reused; the attribute set tells the domains apart.
    for row in totals_by_domain:
        attributes = {"domain_name": row["pulp_domain__name"]}
        gauge.set(row["total_size"], attributes)

    metric_reader.collect()
63 changes: 14 additions & 49 deletions pulpcore/app/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,15 @@
from django.apps import apps
from django.conf import settings
from django.db import connection
from django.db.models import Model, Sum
from django.db.models import Model
from django.urls import Resolver404, resolve
from opentelemetry import metrics

from rest_framework.serializers import ValidationError
from rest_framework.reverse import reverse as drf_reverse

from pulpcore.app.loggers import deprecation_logger
from pulpcore.app.apps import pulp_plugin_configs
from pulpcore.app import models
from pulpcore.constants import STORAGE_METRICS_LOCK
from pulpcore.exceptions import AdvisoryLockError
from pulpcore.exceptions.validation import InvalidSignatureError

Expand Down Expand Up @@ -463,6 +461,19 @@ def configure_cleanup():
models.TaskSchedule.objects.filter(task_name=task_name).delete()


def configure_periodic_telemetry():
    """Create, refresh or remove the schedule of the domain space usage telemetry task.

    When the ``PULP_OTEL_ENABLED`` environment variable equals "true"
    (compared case-insensitively) and ``settings.DOMAIN_ENABLED`` is truthy, a
    ``TaskSchedule`` with a fixed name is created or updated so that it points
    at the telemetry task with a five minute dispatch interval. In every other
    case all ``TaskSchedule`` rows carrying that task name are deleted.
    """
    task_name = "pulpcore.app.tasks.telemetry.emit_domain_space_usage_metric"
    schedules = models.TaskSchedule.objects

    otel_enabled = os.getenv("PULP_OTEL_ENABLED", "").lower() == "true"
    if not (otel_enabled and settings.DOMAIN_ENABLED):
        # Feature switched off: make sure no stale schedule keeps dispatching the task.
        schedules.filter(task_name=task_name).delete()
        return

    schedules.update_or_create(
        name="Emit Domain Space Usage metric periodically",
        defaults={"task_name": task_name, "dispatch_interval": timedelta(minutes=5)},
    )


@lru_cache(maxsize=1)
def _artifact_serving_distribution():
    """Fetch the ``ArtifactDistribution`` row via ``objects.get()`` and cache the result."""
    distribution = models.ArtifactDistribution.objects.get()
    return distribution
Expand Down Expand Up @@ -613,52 +624,6 @@ def build(cls, *args, **kwargs):
return cls._NoopEmitter()


# NOTE(review): leading indentation is missing from these lines as rendered; the nesting
# described in the comments below is presumed from the statement order -- confirm.
class DomainMetricsEmitter(MetricsEmitter):
"""A builder class that initializes an emitter for recording domain's metrics."""

# Keeps the domain, obtains a meter named "domain.<domain.name>.meter" and creates the
# gauge instrument through _init_emitting_total_size().
def __init__(self, domain):
self.domain = domain
self.meter = metrics.get_meter(f"domain.{domain.name}.meter")
self.instrument = self._init_emitting_total_size()

# Creates the "disk_usage" observable gauge. The callbacks list holds the generator
# object returned by calling _disk_usage_callback(), not the method itself.
def _init_emitting_total_size(self):
return self.meter.create_observable_gauge(
name="disk_usage",
description="The total disk size by domain.",
callbacks=[self._disk_usage_callback()],
unit="Bytes",
)

# Generator used as the gauge callback. It enters PGAdvisoryLock(STORAGE_METRICS_LOCK),
# performs one bare yield, then loops forever yielding a one-element list with an
# Observation of the domain's total artifact size.
def _disk_usage_callback(self):
try:
with PGAdvisoryLock(STORAGE_METRICS_LOCK):
# Function-scope import; presumably avoids a circular import -- confirm.
from pulpcore.app.models import Artifact

# First yield produces no value; whatever is sent in is bound to `options`,
# which is not read afterwards (hence the noqa).
options = yield # noqa

while True:
# Sum of `size` over this domain's artifacts, using default=0.
artifacts = Artifact.objects.filter(pulp_domain=self.domain).only("size")
total_size = artifacts.aggregate(size=Sum("size", default=0))["size"]
# The observation is labelled with the domain's URL and name.
options = yield [ # noqa
metrics.Observation(
total_size,
{
"pulp_href": get_url(self.domain),
"domain_name": self.domain.name,
},
)
]
# When AdvisoryLockError is raised, the generator yields once with no value.
except AdvisoryLockError:
yield


def init_domain_metrics_exporter():
    """Call ``DomainMetricsEmitter.build`` once for every ``Domain`` in the database."""
    from pulpcore.app.models.domain import Domain

    all_domains = Domain.objects.all()
    for current_domain in all_domains:
        DomainMetricsEmitter.build(current_domain)


@lru_cache(maxsize=1)
def get_worker_name():
    """Return ``"<pid>@<hostname>"`` for the current process; the result is cached."""
    pid = os.getpid()
    hostname = socket.gethostname()
    return f"{pid}@{hostname}"
Expand Down
6 changes: 0 additions & 6 deletions pulpcore/app/wsgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,3 @@ def export(self, metrics_data, timeout_millis=10_000, **kwargs):
application = get_wsgi_application()
if os.getenv("PULP_OTEL_ENABLED", "").lower() == "true":
application = OpenTelemetryMiddleware(application, meter_provider=provider)

# Disabling Storage metrics until we find a solution to resource usage.
# https://github.com/pulp/pulpcore/issues/5468
# from pulpcore.app.util import init_domain_metrics_exporter # noqa: E402

# init_domain_metrics_exporter()
2 changes: 2 additions & 0 deletions pulpcore/tasking/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
set_domain,
configure_analytics,
configure_cleanup,
configure_periodic_telemetry,
)
from pulpcore.constants import TASK_FINAL_STATES, TASK_STATES
from pulpcore.tasking.tasks import dispatch, execute_task
Expand All @@ -35,6 +36,7 @@
def startup_hook():
    """Run the configuration steps in order: analytics, cleanup, periodic telemetry."""
    configuration_steps = (
        configure_analytics,
        configure_cleanup,
        configure_periodic_telemetry,
    )
    for configure in configuration_steps:
        configure()


def delete_incomplete_resources(task):
Expand Down

0 comments on commit 687e7ee

Please sign in to comment.