Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(celery): close celery.apply spans even without after_task_publish, when using apply_async #10676

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
7baa331
fix(celery) close apply_async spans even when the after_task_publish_…
wantsui Sep 16, 2024
3d21563
Update format.
wantsui Sep 16, 2024
b067390
Merge branch 'main' into close-apply-async-celery-spans
wantsui Sep 17, 2024
64abe8f
Add celery.app.task.Task to the Celery Base Test Case
wantsui Sep 18, 2024
f0c2892
Remove pin from celery.app.task.Task and update patch and tests accor…
wantsui Sep 18, 2024
e5d124f
Comment out new closing span changes and add a test showing that if t…
wantsui Sep 19, 2024
a9a234e
Add new change to close celery.apply spans back, with debug log messages
wantsui Sep 19, 2024
c0d387d
Simplify conditions in utils and test.
wantsui Sep 19, 2024
f4400ee
Merge branch 'main' into close-apply-async-celery-spans
wantsui Sep 19, 2024
18af2fa
Remove unused sys import.
wantsui Sep 19, 2024
5783790
Fix typo.
wantsui Sep 19, 2024
16e4424
Apply formatting suggestions.
wantsui Sep 19, 2024
444b074
Fix more formatting.
wantsui Sep 20, 2024
40293ec
Add release note describing the fix to celery.apply.
wantsui Sep 20, 2024
7f54148
Add additional assertions for span metadata.
wantsui Sep 20, 2024
0606be9
Merge branch 'main' into close-apply-async-celery-spans
wantsui Sep 20, 2024
2c2db93
Fix typo.
wantsui Sep 20, 2024
deadb75
Add clarifying comments
wantsui Sep 20, 2024
7988da8
Fix typo.
wantsui Sep 20, 2024
17e521a
Update releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188…
wantsui Sep 23, 2024
b4556d2
Fix bug in retrieve_task_id that fails to check the body for the task…
wantsui Sep 26, 2024
9bd7ed7
Revert "Fix bug in retrieve_task_id that fails to check the body for …
wantsui Sep 27, 2024
5b96ae7
Merge branch 'main' into close-apply-async-celery-spans
wantsui Sep 27, 2024
9548914
Merge branch 'main' into close-apply-async-celery-spans
wantsui Sep 30, 2024
c289c4a
Update apply_async to capture errors on the span and report them.
wantsui Sep 30, 2024
dba09d0
Merge branch 'main' into close-apply-async-celery-spans
wantsui Sep 30, 2024
ab37114
Update release note.
wantsui Sep 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions ddtrace/contrib/internal/celery/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@
from ddtrace.contrib.internal.celery.signals import trace_retry
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
from ddtrace.internal import core
from ddtrace.internal.logger import get_logger
from ddtrace.pin import _DD_PIN_NAME


log = get_logger(__name__)


def patch_app(app, pin=None):
"""Attach the Pin class to the application and connect
our handlers to Celery signals.
Expand All @@ -41,6 +46,9 @@ def patch_app(app, pin=None):
trace_utils.wrap("celery.beat", "Scheduler.tick", _traced_beat_function(config.celery, "tick"))
pin.onto(celery.beat.Scheduler)

# Patch apply_async
trace_utils.wrap("celery.app.task", "Task.apply_async", _traced_apply_async_function(config.celery, "apply_async"))

# connect to the Signal framework
signals.task_prerun.connect(trace_prerun, weak=False)
signals.task_postrun.connect(trace_postrun, weak=False)
Expand All @@ -65,6 +73,7 @@ def unpatch_app(app):

trace_utils.unwrap(celery.beat.Scheduler, "apply_entry")
trace_utils.unwrap(celery.beat.Scheduler, "tick")
trace_utils.unwrap(celery.app.task.Task, "apply_async")

signals.task_prerun.disconnect(trace_prerun)
signals.task_postrun.disconnect(trace_postrun)
Expand Down Expand Up @@ -96,3 +105,39 @@ def _traced_beat_inner(func, instance, args, kwargs):
return func(*args, **kwargs)

return _traced_beat_inner


def _traced_apply_async_function(integration_config, fn_name, resource_fn=None):
tabgok marked this conversation as resolved.
Show resolved Hide resolved
"""
When apply_async is called, it calls various Celery signals in order, which gets used
to start and close the span.
Example: before_task_publish starts the span while after_task_publish closes the span.
If an exception occurs anywhere inside Celery or its dependencies, this can interrupt the
closing signals.
The purpose of _traced_apply_async_function is to close the spans even if one of the closing
signals don't get called over the course of the apply_task lifecycle.
This is done by fetching the stored span and closing it if it hasn't already been closed by a
closing signal.
"""

def _traced_apply_async_inner(func, instance, args, kwargs):
with core.context_with_data("task_context"):
try:
return func(*args, **kwargs)
finally:
task_span = core.get_item("task_span")
if task_span:
log.debug(
"The after_task_publish signal was not called, so manually closing span: %s",
task_span._pprint(),
)
task_span.finish()

prerun_span = core.get_item("prerun_span")
if prerun_span:
log.debug(
"The task_postrun signal was not called, so manually closing span: %s", prerun_span._pprint()
)
prerun_span.finish()

return _traced_apply_async_inner
7 changes: 7 additions & 0 deletions ddtrace/contrib/internal/celery/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
from ddtrace.ext import net
from ddtrace.internal import core
from ddtrace.internal.constants import COMPONENT
from ddtrace.internal.logger import get_logger
from ddtrace.propagation.http import HTTPPropagator
Expand Down Expand Up @@ -48,6 +49,9 @@ def trace_prerun(*args, **kwargs):
service = config.celery["worker_service_name"]
span = pin.tracer.trace(c.WORKER_ROOT_SPAN, service=service, resource=task.name, span_type=SpanTypes.WORKER)

# Store an item called "prerun span" in case task_postrun doesn't get called
core.set_item("prerun_span", span)

# set span.kind to the type of request being performed
span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)

Expand Down Expand Up @@ -111,6 +115,9 @@ def trace_before_publish(*args, **kwargs):
service = config.celery["producer_service_name"]
span = pin.tracer.trace(c.PRODUCER_ROOT_SPAN, service=service, resource=task_name)

# Store an item called "task span" in case after_task_publish doesn't get called
core.set_item("task_span", span)

span.set_tag_str(COMPONENT, config.celery.integration_name)

# set span.kind to the type of request being performed
Expand Down
7 changes: 5 additions & 2 deletions ddtrace/contrib/internal/celery/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,12 @@ def retrieve_task_id(context):
"""
headers = context.get("headers")
body = context.get("body")
if headers:
# Check if the id is in the headers, then check the body for it.
# If we don't check the id first, we could wrongly assume no task_id
# when the task_id is in the body.
if headers and 'id' in headers:
# Protocol Version 2 (default from Celery 4.0)
return headers.get("id")
else:
elif body and 'id' in body:
wantsui marked this conversation as resolved.
Show resolved Hide resolved
# Protocol Version 1
return body.get("id")
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
tracing(celery): Fixes an issue where ``celery.apply`` spans didn't close if the ``after_task_publish`` or ``task_postrun`` signals didn't get sent when using ``apply_async``, which can happen if there is an internal exception during the handling of the task.
23 changes: 23 additions & 0 deletions tests/contrib/celery/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import celery
from celery.exceptions import Retry
import mock
import pytest

from ddtrace import Pin
Expand Down Expand Up @@ -441,6 +442,28 @@ def run(self):
assert span.get_tag("span.kind") == "consumer"
assert span.error == 0

@mock.patch("kombu.messaging.Producer.publish", mock.Mock(side_effect=ValueError))
def test_fn_task_apply_async_soft_exception(self):
# If the underlying library runs into an exception that doesn't crash the app,
# we should still close the span even if the closing signals didn't get
# called

@self.app.task
def fn_task_parameters(user, force_logout=False):
return (user, force_logout)

try:
t = fn_task_parameters.apply_async(args=["user"], kwargs={"force_logout": True})
t.get(timeout=self.ASYNC_GET_TIMEOUT)
except ValueError:
traces = self.pop_traces()
assert 1 == len(traces)
assert traces[0][0].name == "celery.apply"
assert traces[0][0].resource == "tests.contrib.celery.test_integration.fn_task_parameters"
assert traces[0][0].get_tag("celery.action") == "apply_async"
assert traces[0][0].get_tag("component") == "celery"
assert traces[0][0].get_tag("span.kind") == "producer"

def test_shared_task(self):
# Ensure Django Shared Task are supported
@celery.shared_task
Expand Down
Loading