Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(profiling): enable endpoint profiling using libdatadog exporter [backport-2.12] #10787

Merged
merged 1 commit into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <map>
#include <stddef.h>
#include <stdint.h>
#include <string_view>
Expand Down Expand Up @@ -31,6 +32,8 @@ extern "C"
bool ddup_is_initialized();
void ddup_start();
void ddup_set_runtime_id(std::string_view runtime_id);
void ddup_profile_set_endpoints(std::map<int64_t, std::string_view> span_ids_to_endpoints);
void ddup_profile_add_endpoint_counts(std::map<std::string_view, int64_t> trace_endpoints_to_counts);
bool ddup_upload();

// Proxy functions to the underlying sample
Expand All @@ -48,10 +51,9 @@ extern "C"
std::string_view thread_name);
void ddup_push_task_id(Datadog::Sample* sample, int64_t task_id);
void ddup_push_task_name(Datadog::Sample* sample, std::string_view task_name);
void ddup_push_span_id(Datadog::Sample* sample, int64_t span_id);
void ddup_push_local_root_span_id(Datadog::Sample* sample, int64_t local_root_span_id);
void ddup_push_span_id(Datadog::Sample* sample, uint64_t span_id);
void ddup_push_local_root_span_id(Datadog::Sample* sample, uint64_t local_root_span_id);
void ddup_push_trace_type(Datadog::Sample* sample, std::string_view trace_type);
void ddup_push_trace_resource_container(Datadog::Sample* sample, std::string_view trace_resource_container);
void ddup_push_exceptioninfo(Datadog::Sample* sample, std::string_view exception_type, int64_t count);
void ddup_push_class_name(Datadog::Sample* sample, std::string_view class_name);
void ddup_push_frame(Datadog::Sample* sample,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ namespace Datadog {
X(span_id, "span id") \
X(local_root_span_id, "local root span id") \
X(trace_type, "trace type") \
X(trace_resource_container, "trace resource container") \
X(trace_endpoint, "trace endpoint") \
X(class_name, "class name") \
X(lock_name, "lock name")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ class Sample
bool push_span_id(uint64_t span_id);
bool push_local_root_span_id(uint64_t local_root_span_id);
bool push_trace_type(std::string_view trace_type);
bool push_trace_resource_container(std::string_view trace_resource_container);
bool push_exceptioninfo(std::string_view exception_type, int64_t count);
bool push_class_name(std::string_view class_name);
bool push_monotonic_ns(int64_t monotonic_ns);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,13 @@ ddup_push_task_name(Datadog::Sample* sample, std::string_view task_name) // cppc
}

void
ddup_push_span_id(Datadog::Sample* sample, int64_t span_id) // cppcheck-suppress unusedFunction
ddup_push_span_id(Datadog::Sample* sample, uint64_t span_id) // cppcheck-suppress unusedFunction
{
sample->push_span_id(span_id);
}

void
ddup_push_local_root_span_id(Datadog::Sample* sample, int64_t local_root_span_id) // cppcheck-suppress unusedFunction
ddup_push_local_root_span_id(Datadog::Sample* sample, uint64_t local_root_span_id) // cppcheck-suppress unusedFunction
{
sample->push_local_root_span_id(local_root_span_id);
}
Expand All @@ -235,13 +235,6 @@ ddup_push_trace_type(Datadog::Sample* sample, std::string_view trace_type) // cp
sample->push_trace_type(trace_type);
}

// Proxy function: forwards `trace_resource_container` to the given sample's
// push_trace_resource_container(). `sample` is dereferenced without a null
// check, and the value returned by the sample method is discarded.
void
ddup_push_trace_resource_container(Datadog::Sample* sample, // cppcheck-suppress unusedFunction
std::string_view trace_resource_container)
{
sample->push_trace_resource_container(trace_resource_container);
}

void
ddup_push_exceptioninfo(Datadog::Sample* sample, // cppcheck-suppress unusedFunction
std::string_view exception_type,
Expand Down Expand Up @@ -314,3 +307,38 @@ ddup_upload() // cppcheck-suppress unusedFunction
}
return success;
}

void
ddup_profile_set_endpoints(
std::map<int64_t, std::string_view> span_ids_to_endpoints) // cppcheck-suppress unusedFunction
{
ddog_prof_Profile& profile = Datadog::Sample::profile_borrow();
for (const auto& [span_id, trace_endpoint] : span_ids_to_endpoints) {
ddog_CharSlice trace_endpoint_slice = Datadog::to_slice(trace_endpoint);
auto res = ddog_prof_Profile_set_endpoint(&profile, span_id, trace_endpoint_slice);
if (!res.ok) {
auto err = res.err;
const std::string errmsg = Datadog::err_to_msg(&err, "Error setting endpoint");
std::cerr << errmsg << std::endl;
ddog_Error_drop(&err);
}
}
Datadog::Sample::profile_release();
}

void
ddup_profile_add_endpoint_counts(std::map<std::string_view, int64_t> trace_endpoints_to_counts)
{
ddog_prof_Profile& profile = Datadog::Sample::profile_borrow();
for (const auto& [trace_endpoint, count] : trace_endpoints_to_counts) {
ddog_CharSlice trace_endpoint_slice = Datadog::to_slice(trace_endpoint);
auto res = ddog_prof_Profile_add_endpoint_count(&profile, trace_endpoint_slice, count);
if (!res.ok) {
auto err = res.err;
const std::string errmsg = Datadog::err_to_msg(&err, "Error adding endpoint count");
std::cerr << errmsg << std::endl;
ddog_Error_drop(&err);
}
}
Datadog::Sample::profile_release();
}
10 changes: 0 additions & 10 deletions ddtrace/internal/datadog/profiling/dd_wrapper/src/sample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,16 +297,6 @@ Datadog::Sample::push_trace_type(std::string_view trace_type)
return true;
}

// Attaches `trace_resource_container` to this sample under the
// trace_resource_container label key. Returns true when push_label() accepts
// the label; otherwise prints "bad push" to stdout and returns false.
bool
Datadog::Sample::push_trace_resource_container(std::string_view trace_resource_container)
{
    if (push_label(ExportLabelKey::trace_resource_container, trace_resource_container)) {
        return true;
    }

    std::cout << "bad push" << std::endl;
    return false;
}

bool
Datadog::Sample::push_class_name(std::string_view class_name)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ Datadog::Uploader::upload(ddog_prof_Profile& profile)
ddog_prof_Exporter_Slice_File_empty(),
{ .ptr = &file, .len = 1 },
nullptr,
nullptr,
encoded->endpoints_stats,
nullptr,
nullptr);
ddog_prof_EncodedProfile_drop(encoded);
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/internal/datadog/profiling/ddup/_ddup.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ class SampleHandle:
def push_task_name(self, task_name: StringType) -> None: ...
def push_exceptioninfo(self, exc_type: Union[None, bytes, str, type], count: int) -> None: ...
def push_class_name(self, class_name: StringType) -> None: ...
def push_span(self, span: Optional[Span], endpoint_collection_enabled: bool) -> None: ...
def push_span(self, span: Optional[Span]) -> None: ...
def push_monotonic_ns(self, monotonic_ns: int) -> None: ...
def flush_sample(self) -> None: ...
43 changes: 33 additions & 10 deletions ddtrace/internal/datadog/profiling/ddup/_ddup.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ from typing import Dict
from typing import Optional
from typing import Union

from libcpp.map cimport map
from libcpp.utility cimport pair

import ddtrace
import platform
from .._types import StringType
Expand Down Expand Up @@ -46,6 +49,10 @@ cdef extern from "ddup_interface.hpp":
void ddup_config_sample_type(unsigned int type)

void ddup_start()
void ddup_set_runtime_id(string_view _id)
void ddup_profile_set_endpoints(map[int64_t, string_view] span_ids_to_endpoints)
void ddup_profile_add_endpoint_counts(map[string_view, int64_t] trace_endpoints_to_counts)
bint ddup_upload() nogil

Sample *ddup_start_sample()
void ddup_push_walltime(Sample *sample, int64_t walltime, int64_t count)
Expand All @@ -61,15 +68,12 @@ cdef extern from "ddup_interface.hpp":
void ddup_push_span_id(Sample *sample, uint64_t span_id)
void ddup_push_local_root_span_id(Sample *sample, uint64_t local_root_span_id)
void ddup_push_trace_type(Sample *sample, string_view trace_type)
void ddup_push_trace_resource_container(Sample *sample, string_view trace_resource_container)
void ddup_push_exceptioninfo(Sample *sample, string_view exception_type, int64_t count)
void ddup_push_class_name(Sample *sample, string_view class_name)
void ddup_push_frame(Sample *sample, string_view _name, string_view _filename, uint64_t address, int64_t line)
void ddup_push_monotonic_ns(Sample *sample, int64_t monotonic_ns)
void ddup_flush_sample(Sample *sample)
void ddup_drop_sample(Sample *sample)
void ddup_set_runtime_id(string_view _id)
bint ddup_upload() nogil

# Create wrappers for cython
cdef call_ddup_config_service(bytes service):
Expand Down Expand Up @@ -171,6 +175,31 @@ def start() -> None:
def upload() -> None:
    """Hand the pending endpoint data to the native profile, then upload it.

    Sets the runtime ID, drains the tracer's endpoint call counter processor
    (``reset()`` returns the per-endpoint counts and the endpoint -> span IDs
    mapping), forwards both to the native layer, and finally calls
    ``ddup_upload()`` with the GIL released.
    """
    runtime_id = ensure_binary_or_empty(get_runtime_id())
    ddup_set_runtime_id(string_view(<const char*>runtime_id, len(runtime_id)))

    processor = ddtrace.tracer._endpoint_call_counter_span_processor
    endpoint_counts, endpoint_to_span_ids = processor.reset()

    # The string_views stored in the maps below only borrow the buffer of a
    # Python bytes object. ensure_binary_or_empty() may hand back a newly
    # created object each time, and rebinding `endpoint_bytes` on the next
    # iteration would drop the only reference to the previous one, leaving a
    # dangling view in the map. Every bytes object is therefore kept referenced
    # in this list until the function returns.
    endpoint_keepalive = []

    cdef map[int64_t, string_view] span_ids_to_endpoints = map[int64_t, string_view]()
    for endpoint, span_ids in endpoint_to_span_ids.items():
        endpoint_bytes = ensure_binary_or_empty(endpoint)
        endpoint_keepalive.append(endpoint_bytes)
        for span_id in span_ids:
            span_ids_to_endpoints.insert(
                pair[int64_t, string_view](
                    clamp_to_uint64_unsigned(span_id),
                    string_view(<const char*>endpoint_bytes, len(endpoint_bytes))
                )
            )
    ddup_profile_set_endpoints(span_ids_to_endpoints)

    cdef map[string_view, int64_t] trace_endpoints_to_counts = map[string_view, int64_t]()
    for endpoint, cnt in endpoint_counts.items():
        endpoint_bytes = ensure_binary_or_empty(endpoint)
        endpoint_keepalive.append(endpoint_bytes)
        trace_endpoints_to_counts.insert(pair[string_view, int64_t](
            string_view(<const char*>endpoint_bytes, len(endpoint_bytes)),
            clamp_to_int64_unsigned(cnt)
        ))
    ddup_profile_add_endpoint_counts(trace_endpoints_to_counts)

    with nogil:
        ddup_upload()

Expand Down Expand Up @@ -269,7 +298,7 @@ cdef class SampleHandle:
class_name_bytes = ensure_binary_or_empty(class_name)
ddup_push_class_name(self.ptr, string_view(<const char*>class_name_bytes, len(class_name_bytes)))

def push_span(self, span: Optional[Span], endpoint_collection_enabled: bool) -> None:
def push_span(self, span: Optional[Span]) -> None:
if self.ptr is NULL:
return
if not span:
Expand All @@ -283,12 +312,6 @@ cdef class SampleHandle:
if span._local_root.span_type:
span_type_bytes = ensure_binary_or_empty(span._local_root.span_type)
ddup_push_trace_type(self.ptr, string_view(<const char*>span_type_bytes, len(span_type_bytes)))
if endpoint_collection_enabled:
root_resource_bytes = ensure_binary_or_empty(span._local_root.resource)
ddup_push_trace_resource_container(
self.ptr,
string_view(<const char*>root_resource_bytes, len(root_resource_bytes))
)

def push_monotonic_ns(self, monotonic_ns: int) -> None:
if self.ptr is not NULL:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def test():
h.push_task_id(value)
h.push_task_name(name)
h.push_exceptioninfo(exc_type, value)
h.push_span(span, endpoint)
h.push_span(span)
h.push_frame(name, name, value, lineno)
h.flush_sample()
except Exception as e:
Expand Down
17 changes: 14 additions & 3 deletions ddtrace/internal/processor/endpoint_call_counter.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
@dataclass(eq=False)
class EndpointCallCounterProcessor(SpanProcessor):
endpoint_counts: EndpointCountsType = field(default_factory=dict, init=False, repr=False, compare=False)
# Mapping from endpoint to list of span IDs. We map each endpoint to its
# span_ids, rather than each span_id to its endpoint, to avoid creating a
# new string object for each span id.
endpoint_to_span_ids: typing.Dict[str, typing.List[int]] = field(
default_factory=dict, init=False, repr=False, compare=False
)
_endpoint_counts_lock: typing.ContextManager = field(
default_factory=forksafe.Lock, init=False, repr=False, compare=False
)
Expand All @@ -34,12 +40,17 @@ def on_span_finish(self, span):
return
if span._local_root == span and span.span_type == SpanTypes.WEB:
resource = ensure_text(span.resource, errors="backslashreplace")
span_id = span.span_id
with self._endpoint_counts_lock:
self.endpoint_counts[resource] = self.endpoint_counts.get(resource, 0) + 1
if resource not in self.endpoint_to_span_ids:
self.endpoint_to_span_ids[resource] = []
self.endpoint_to_span_ids[resource].append(span_id)

def reset(self):
# type: () -> EndpointCountsType
def reset(self) -> typing.Tuple[EndpointCountsType, typing.Dict[str, typing.List[int]]]:
with self._endpoint_counts_lock:
counts = self.endpoint_counts
self.endpoint_counts = {}
return counts
span_ids = self.endpoint_to_span_ids
self.endpoint_to_span_ids = {}
return counts, span_ids
3 changes: 2 additions & 1 deletion ddtrace/profiling/collector/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ def periodic(self):
# type: (...) -> None
"""Collect events and push them into the recorder."""
for events in self.collect():
self.recorder.push_events(events)
if self.recorder:
self.recorder.push_events(events)

def collect(self):
# type: (...) -> typing.Iterable[typing.Iterable[event.Event]]
Expand Down
6 changes: 2 additions & 4 deletions ddtrace/profiling/collector/_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def _acquire(self, inner_func, *args, **kwargs):
handle.push_task_name(task_name)

if self._self_tracer is not None:
handle.push_span(self._self_tracer.current_span(), self._self_endpoint_collection_enabled)
handle.push_span(self._self_tracer.current_span())
for frame in frames:
handle.push_frame(frame.function_name, frame.file_name, 0, frame.lineno)
handle.flush_sample()
Expand Down Expand Up @@ -214,9 +214,7 @@ def _release(self, inner_func, *args, **kwargs):
handle.push_task_name(task_name)

if self._self_tracer is not None:
handle.push_span(
self._self_tracer.current_span(), self._self_endpoint_collection_enabled
)
handle.push_span(self._self_tracer.current_span())
for frame in frames:
handle.push_frame(frame.function_name, frame.file_name, 0, frame.lineno)
handle.flush_sample()
Expand Down
4 changes: 2 additions & 2 deletions ddtrace/profiling/collector/stack.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ cdef stack_collect(ignore_profiler, thread_time, max_nframes, interval, wall_tim
handle.push_class_name(frames[0].class_name)
for frame in frames:
handle.push_frame(frame.function_name, frame.file_name, 0, frame.lineno)
handle.push_span(span, collect_endpoint)
handle.push_span(span)
handle.flush_sample()
else:
event = stack_event.StackSampleEvent(
Expand Down Expand Up @@ -399,7 +399,7 @@ cdef stack_collect(ignore_profiler, thread_time, max_nframes, interval, wall_tim
handle.push_class_name(frames[0].class_name)
for frame in frames:
handle.push_frame(frame.function_name, frame.file_name, 0, frame.lineno)
handle.push_span(span, collect_endpoint)
handle.push_span(span)
handle.flush_sample()
else:
exc_event = stack_event.StackExceptionSampleEvent(
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/profiling/exporter/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def export(
} # type: Dict[str, Any]

if self.endpoint_call_counter_span_processor is not None:
event["endpoint_counts"] = self.endpoint_call_counter_span_processor.reset()
event["endpoint_counts"] = self.endpoint_call_counter_span_processor.reset()[0]

content_type, body = self._encode_multipart_formdata(
event=json.dumps(event).encode("utf-8"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
fixes:
- |
Fixes endpoint profiling when using the libdatadog exporter, either with
``DD_PROFILING_EXPORT_LIBDD_ENABLED`` or ``DD_PROFILING_TIMELINE_ENABLED``.

8 changes: 4 additions & 4 deletions tests/tracer/test_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,9 +613,9 @@ def test_endpoint_call_counter_processor():
processor.on_span_finish(spanNonWeb)
processor.on_span_finish(spanNonLocalRoot)

assert processor.reset() == {"a": 2, "b": 1}
assert processor.reset()[0] == {"a": 2, "b": 1}
# Make sure data has been cleared
assert processor.reset() == {}
assert processor.reset()[0] == {}


def test_endpoint_call_counter_processor_disabled():
Expand All @@ -627,7 +627,7 @@ def test_endpoint_call_counter_processor_disabled():

processor.on_span_finish(spanA)

assert processor.reset() == {}
assert processor.reset()[0] == {}


def test_endpoint_call_counter_processor_real_tracer():
Expand All @@ -651,7 +651,7 @@ def test_endpoint_call_counter_processor_real_tracer():
with tracer.trace("parent", service="top_level_test_service", resource="ignored", span_type=SpanTypes.HTTP):
pass

assert tracer._endpoint_call_counter_span_processor.reset() == {"a": 2, "b": 1}
assert tracer._endpoint_call_counter_span_processor.reset()[0] == {"a": 2, "b": 1}


def test_trace_tag_processor_adds_chunk_root_tags():
Expand Down
Loading