Skip to content

Commit

Permalink
fix(profiling): enable endpoint profiling using libdatadog exporter […
Browse files Browse the repository at this point in the history
…backport-2.12] (#10787)

Manual backport of #10649 to 2.12

[Endpoint
profiling](https://docs.datadoghq.com/profiler/connect_traces_and_profiles/?code-lang=python#break-down-code-performance-by-api-endpoints)
allows you to scope flamegraphs by any endpoint in your service as in
the screenshot. stack v2 needs more work.

<img width="1510" alt="Screenshot 2024-09-19 at 5 18 57 PM"
src="https://github.com/user-attachments/assets/4e0b5cc3-a0e2-4208-9a86-91508d362248">

## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance

policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

(cherry picked from commit 6912987)
  • Loading branch information
taegyunkim committed Sep 25, 2024
1 parent 630aaf2 commit 8c029f3
Show file tree
Hide file tree
Showing 16 changed files with 109 additions and 53 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <map>
#include <stddef.h>
#include <stdint.h>
#include <string_view>
Expand Down Expand Up @@ -31,6 +32,8 @@ extern "C"
bool ddup_is_initialized();
void ddup_start();
void ddup_set_runtime_id(std::string_view runtime_id);
void ddup_profile_set_endpoints(std::map<int64_t, std::string_view> span_ids_to_endpoints);
void ddup_profile_add_endpoint_counts(std::map<std::string_view, int64_t> trace_endpoints_to_counts);
bool ddup_upload();

// Proxy functions to the underlying sample
Expand All @@ -48,10 +51,9 @@ extern "C"
std::string_view thread_name);
void ddup_push_task_id(Datadog::Sample* sample, int64_t task_id);
void ddup_push_task_name(Datadog::Sample* sample, std::string_view task_name);
void ddup_push_span_id(Datadog::Sample* sample, int64_t span_id);
void ddup_push_local_root_span_id(Datadog::Sample* sample, int64_t local_root_span_id);
void ddup_push_span_id(Datadog::Sample* sample, uint64_t span_id);
void ddup_push_local_root_span_id(Datadog::Sample* sample, uint64_t local_root_span_id);
void ddup_push_trace_type(Datadog::Sample* sample, std::string_view trace_type);
void ddup_push_trace_resource_container(Datadog::Sample* sample, std::string_view trace_resource_container);
void ddup_push_exceptioninfo(Datadog::Sample* sample, std::string_view exception_type, int64_t count);
void ddup_push_class_name(Datadog::Sample* sample, std::string_view class_name);
void ddup_push_frame(Datadog::Sample* sample,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ namespace Datadog {
X(span_id, "span id") \
X(local_root_span_id, "local root span id") \
X(trace_type, "trace type") \
X(trace_resource_container, "trace resource container") \
X(trace_endpoint, "trace endpoint") \
X(class_name, "class name") \
X(lock_name, "lock name")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ class Sample
bool push_span_id(uint64_t span_id);
bool push_local_root_span_id(uint64_t local_root_span_id);
bool push_trace_type(std::string_view trace_type);
bool push_trace_resource_container(std::string_view trace_resource_container);
bool push_exceptioninfo(std::string_view exception_type, int64_t count);
bool push_class_name(std::string_view class_name);
bool push_monotonic_ns(int64_t monotonic_ns);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,13 @@ ddup_push_task_name(Datadog::Sample* sample, std::string_view task_name) // cppc
}

void
ddup_push_span_id(Datadog::Sample* sample, int64_t span_id) // cppcheck-suppress unusedFunction
ddup_push_span_id(Datadog::Sample* sample, uint64_t span_id) // cppcheck-suppress unusedFunction
{
sample->push_span_id(span_id);
}

void
ddup_push_local_root_span_id(Datadog::Sample* sample, int64_t local_root_span_id) // cppcheck-suppress unusedFunction
ddup_push_local_root_span_id(Datadog::Sample* sample, uint64_t local_root_span_id) // cppcheck-suppress unusedFunction
{
sample->push_local_root_span_id(local_root_span_id);
}
Expand All @@ -235,13 +235,6 @@ ddup_push_trace_type(Datadog::Sample* sample, std::string_view trace_type) // cp
sample->push_trace_type(trace_type);
}

void
ddup_push_trace_resource_container(Datadog::Sample* sample, // cppcheck-suppress unusedFunction
std::string_view trace_resource_container)
{
sample->push_trace_resource_container(trace_resource_container);
}

void
ddup_push_exceptioninfo(Datadog::Sample* sample, // cppcheck-suppress unusedFunction
std::string_view exception_type,
Expand Down Expand Up @@ -314,3 +307,38 @@ ddup_upload() // cppcheck-suppress unusedFunction
}
return success;
}

void
ddup_profile_set_endpoints(
std::map<int64_t, std::string_view> span_ids_to_endpoints) // cppcheck-suppress unusedFunction
{
ddog_prof_Profile& profile = Datadog::Sample::profile_borrow();
for (const auto& [span_id, trace_endpoint] : span_ids_to_endpoints) {
ddog_CharSlice trace_endpoint_slice = Datadog::to_slice(trace_endpoint);
auto res = ddog_prof_Profile_set_endpoint(&profile, span_id, trace_endpoint_slice);
if (!res.ok) {
auto err = res.err;
const std::string errmsg = Datadog::err_to_msg(&err, "Error setting endpoint");
std::cerr << errmsg << std::endl;
ddog_Error_drop(&err);
}
}
Datadog::Sample::profile_release();
}

void
ddup_profile_add_endpoint_counts(std::map<std::string_view, int64_t> trace_endpoints_to_counts)
{
ddog_prof_Profile& profile = Datadog::Sample::profile_borrow();
for (const auto& [trace_endpoint, count] : trace_endpoints_to_counts) {
ddog_CharSlice trace_endpoint_slice = Datadog::to_slice(trace_endpoint);
auto res = ddog_prof_Profile_add_endpoint_count(&profile, trace_endpoint_slice, count);
if (!res.ok) {
auto err = res.err;
const std::string errmsg = Datadog::err_to_msg(&err, "Error adding endpoint count");
std::cerr << errmsg << std::endl;
ddog_Error_drop(&err);
}
}
Datadog::Sample::profile_release();
}
10 changes: 0 additions & 10 deletions ddtrace/internal/datadog/profiling/dd_wrapper/src/sample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,16 +297,6 @@ Datadog::Sample::push_trace_type(std::string_view trace_type)
return true;
}

bool
Datadog::Sample::push_trace_resource_container(std::string_view trace_resource_container)
{
if (!push_label(ExportLabelKey::trace_resource_container, trace_resource_container)) {
std::cout << "bad push" << std::endl;
return false;
}
return true;
}

bool
Datadog::Sample::push_class_name(std::string_view class_name)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ Datadog::Uploader::upload(ddog_prof_Profile& profile)
ddog_prof_Exporter_Slice_File_empty(),
{ .ptr = &file, .len = 1 },
nullptr,
nullptr,
encoded->endpoints_stats,
nullptr,
nullptr);
ddog_prof_EncodedProfile_drop(encoded);
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/internal/datadog/profiling/ddup/_ddup.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ class SampleHandle:
def push_task_name(self, task_name: StringType) -> None: ...
def push_exceptioninfo(self, exc_type: Union[None, bytes, str, type], count: int) -> None: ...
def push_class_name(self, class_name: StringType) -> None: ...
def push_span(self, span: Optional[Span], endpoint_collection_enabled: bool) -> None: ...
def push_span(self, span: Optional[Span]) -> None: ...
def push_monotonic_ns(self, monotonic_ns: int) -> None: ...
def flush_sample(self) -> None: ...
43 changes: 33 additions & 10 deletions ddtrace/internal/datadog/profiling/ddup/_ddup.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ from typing import Dict
from typing import Optional
from typing import Union

from libcpp.map cimport map
from libcpp.utility cimport pair

import ddtrace
import platform
from .._types import StringType
Expand Down Expand Up @@ -46,6 +49,10 @@ cdef extern from "ddup_interface.hpp":
void ddup_config_sample_type(unsigned int type)

void ddup_start()
void ddup_set_runtime_id(string_view _id)
void ddup_profile_set_endpoints(map[int64_t, string_view] span_ids_to_endpoints)
void ddup_profile_add_endpoint_counts(map[string_view, int64_t] trace_endpoints_to_counts)
bint ddup_upload() nogil

Sample *ddup_start_sample()
void ddup_push_walltime(Sample *sample, int64_t walltime, int64_t count)
Expand All @@ -61,15 +68,12 @@ cdef extern from "ddup_interface.hpp":
void ddup_push_span_id(Sample *sample, uint64_t span_id)
void ddup_push_local_root_span_id(Sample *sample, uint64_t local_root_span_id)
void ddup_push_trace_type(Sample *sample, string_view trace_type)
void ddup_push_trace_resource_container(Sample *sample, string_view trace_resource_container)
void ddup_push_exceptioninfo(Sample *sample, string_view exception_type, int64_t count)
void ddup_push_class_name(Sample *sample, string_view class_name)
void ddup_push_frame(Sample *sample, string_view _name, string_view _filename, uint64_t address, int64_t line)
void ddup_push_monotonic_ns(Sample *sample, int64_t monotonic_ns)
void ddup_flush_sample(Sample *sample)
void ddup_drop_sample(Sample *sample)
void ddup_set_runtime_id(string_view _id)
bint ddup_upload() nogil

# Create wrappers for cython
cdef call_ddup_config_service(bytes service):
Expand Down Expand Up @@ -171,6 +175,31 @@ def start() -> None:
def upload() -> None:
runtime_id = ensure_binary_or_empty(get_runtime_id())
ddup_set_runtime_id(string_view(<const char*>runtime_id, len(runtime_id)))

processor = ddtrace.tracer._endpoint_call_counter_span_processor
endpoint_counts, endpoint_to_span_ids = processor.reset()

cdef map[int64_t, string_view] span_ids_to_endpoints = map[int64_t, string_view]()
for endpoint, span_ids in endpoint_to_span_ids.items():
endpoint_bytes = ensure_binary_or_empty(endpoint)
for span_id in span_ids:
span_ids_to_endpoints.insert(
pair[int64_t, string_view](
clamp_to_uint64_unsigned(span_id),
string_view(<const char*>endpoint_bytes, len(endpoint_bytes))
)
)
ddup_profile_set_endpoints(span_ids_to_endpoints)

cdef map[string_view, int64_t] trace_endpoints_to_counts = map[string_view, int64_t]()
for endpoint, cnt in endpoint_counts.items():
endpoint_bytes = ensure_binary_or_empty(endpoint)
trace_endpoints_to_counts.insert(pair[string_view, int64_t](
string_view(<const char*>endpoint_bytes, len(endpoint_bytes)),
clamp_to_int64_unsigned(cnt)
))
ddup_profile_add_endpoint_counts(trace_endpoints_to_counts)

with nogil:
ddup_upload()

Expand Down Expand Up @@ -269,7 +298,7 @@ cdef class SampleHandle:
class_name_bytes = ensure_binary_or_empty(class_name)
ddup_push_class_name(self.ptr, string_view(<const char*>class_name_bytes, len(class_name_bytes)))

def push_span(self, span: Optional[Span], endpoint_collection_enabled: bool) -> None:
def push_span(self, span: Optional[Span]) -> None:
if self.ptr is NULL:
return
if not span:
Expand All @@ -283,12 +312,6 @@ cdef class SampleHandle:
if span._local_root.span_type:
span_type_bytes = ensure_binary_or_empty(span._local_root.span_type)
ddup_push_trace_type(self.ptr, string_view(<const char*>span_type_bytes, len(span_type_bytes)))
if endpoint_collection_enabled:
root_resource_bytes = ensure_binary_or_empty(span._local_root.resource)
ddup_push_trace_resource_container(
self.ptr,
string_view(<const char*>root_resource_bytes, len(root_resource_bytes))
)

def push_monotonic_ns(self, monotonic_ns: int) -> None:
if self.ptr is not NULL:
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/internal/datadog/profiling/ddup/test/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def test():
h.push_task_id(value)
h.push_task_name(name)
h.push_exceptioninfo(exc_type, value)
h.push_span(span, endpoint)
h.push_span(span)
h.push_frame(name, name, value, lineno)
h.flush_sample()
except Exception as e:
Expand Down
17 changes: 14 additions & 3 deletions ddtrace/internal/processor/endpoint_call_counter.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
@dataclass(eq=False)
class EndpointCallCounterProcessor(SpanProcessor):
endpoint_counts: EndpointCountsType = field(default_factory=dict, init=False, repr=False, compare=False)
# Mapping from endpoint to list of span IDs, here we use mapping from
# endpoint to span_ids instead of mapping from span_id to endpoint to
# avoid creating a new string object for each span id.
endpoint_to_span_ids: typing.Dict[str, typing.List[int]] = field(
default_factory=dict, init=False, repr=False, compare=False
)
_endpoint_counts_lock: typing.ContextManager = field(
default_factory=forksafe.Lock, init=False, repr=False, compare=False
)
Expand All @@ -34,12 +40,17 @@ def on_span_finish(self, span):
return
if span._local_root == span and span.span_type == SpanTypes.WEB:
resource = ensure_text(span.resource, errors="backslashreplace")
span_id = span.span_id
with self._endpoint_counts_lock:
self.endpoint_counts[resource] = self.endpoint_counts.get(resource, 0) + 1
if resource not in self.endpoint_to_span_ids:
self.endpoint_to_span_ids[resource] = []
self.endpoint_to_span_ids[resource].append(span_id)

def reset(self):
# type: () -> EndpointCountsType
def reset(self) -> typing.Tuple[EndpointCountsType, typing.Dict[str, typing.List[int]]]:
with self._endpoint_counts_lock:
counts = self.endpoint_counts
self.endpoint_counts = {}
return counts
span_ids = self.endpoint_to_span_ids
self.endpoint_to_span_ids = {}
return counts, span_ids
3 changes: 2 additions & 1 deletion ddtrace/profiling/collector/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ def periodic(self):
# type: (...) -> None
"""Collect events and push them into the recorder."""
for events in self.collect():
self.recorder.push_events(events)
if self.recorder:
self.recorder.push_events(events)

def collect(self):
# type: (...) -> typing.Iterable[typing.Iterable[event.Event]]
Expand Down
6 changes: 2 additions & 4 deletions ddtrace/profiling/collector/_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def _acquire(self, inner_func, *args, **kwargs):
handle.push_task_name(task_name)

if self._self_tracer is not None:
handle.push_span(self._self_tracer.current_span(), self._self_endpoint_collection_enabled)
handle.push_span(self._self_tracer.current_span())
for frame in frames:
handle.push_frame(frame.function_name, frame.file_name, 0, frame.lineno)
handle.flush_sample()
Expand Down Expand Up @@ -214,9 +214,7 @@ def _release(self, inner_func, *args, **kwargs):
handle.push_task_name(task_name)

if self._self_tracer is not None:
handle.push_span(
self._self_tracer.current_span(), self._self_endpoint_collection_enabled
)
handle.push_span(self._self_tracer.current_span())
for frame in frames:
handle.push_frame(frame.function_name, frame.file_name, 0, frame.lineno)
handle.flush_sample()
Expand Down
4 changes: 2 additions & 2 deletions ddtrace/profiling/collector/stack.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ cdef stack_collect(ignore_profiler, thread_time, max_nframes, interval, wall_tim
handle.push_class_name(frames[0].class_name)
for frame in frames:
handle.push_frame(frame.function_name, frame.file_name, 0, frame.lineno)
handle.push_span(span, collect_endpoint)
handle.push_span(span)
handle.flush_sample()
else:
event = stack_event.StackSampleEvent(
Expand Down Expand Up @@ -399,7 +399,7 @@ cdef stack_collect(ignore_profiler, thread_time, max_nframes, interval, wall_tim
handle.push_class_name(frames[0].class_name)
for frame in frames:
handle.push_frame(frame.function_name, frame.file_name, 0, frame.lineno)
handle.push_span(span, collect_endpoint)
handle.push_span(span)
handle.flush_sample()
else:
exc_event = stack_event.StackExceptionSampleEvent(
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/profiling/exporter/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def export(
} # type: Dict[str, Any]

if self.endpoint_call_counter_span_processor is not None:
event["endpoint_counts"] = self.endpoint_call_counter_span_processor.reset()
event["endpoint_counts"] = self.endpoint_call_counter_span_processor.reset()[0]

content_type, body = self._encode_multipart_formdata(
event=json.dumps(event).encode("utf-8"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
fixes:
- |
Fixes endpoint profiling when using libdatadog exporter, either with
``DD_PROFILING_EXPORT_LIBDD_ENABLED`` or ``DD_PROFILING_TIMELINE_ENABLED``.
8 changes: 4 additions & 4 deletions tests/tracer/test_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,9 +613,9 @@ def test_endpoint_call_counter_processor():
processor.on_span_finish(spanNonWeb)
processor.on_span_finish(spanNonLocalRoot)

assert processor.reset() == {"a": 2, "b": 1}
assert processor.reset()[0] == {"a": 2, "b": 1}
# Make sure data has been cleared
assert processor.reset() == {}
assert processor.reset()[0] == {}


def test_endpoint_call_counter_processor_disabled():
Expand All @@ -627,7 +627,7 @@ def test_endpoint_call_counter_processor_disabled():

processor.on_span_finish(spanA)

assert processor.reset() == {}
assert processor.reset()[0] == {}


def test_endpoint_call_counter_processor_real_tracer():
Expand All @@ -651,7 +651,7 @@ def test_endpoint_call_counter_processor_real_tracer():
with tracer.trace("parent", service="top_level_test_service", resource="ignored", span_type=SpanTypes.HTTP):
pass

assert tracer._endpoint_call_counter_span_processor.reset() == {"a": 2, "b": 1}
assert tracer._endpoint_call_counter_span_processor.reset()[0] == {"a": 2, "b": 1}


def test_trace_tag_processor_adds_chunk_root_tags():
Expand Down

0 comments on commit 8c029f3

Please sign in to comment.