From 7baa3314bcb05f534f4497056440cc1476e48242 Mon Sep 17 00:00:00 2001 From: wan Date: Mon, 16 Sep 2024 16:58:30 -0400 Subject: [PATCH 01/21] fix(celery) close apply_async spans even when the after_task_publish_signal does not get called --- ddtrace/contrib/internal/celery/app.py | 18 +++++++++++++++++- ddtrace/contrib/internal/celery/signals.py | 4 ++++ ddtrace/contrib/internal/celery/utils.py | 5 ++++- 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/ddtrace/contrib/internal/celery/app.py b/ddtrace/contrib/internal/celery/app.py index 58dde801ea4..9c2ba90aa21 100644 --- a/ddtrace/contrib/internal/celery/app.py +++ b/ddtrace/contrib/internal/celery/app.py @@ -15,9 +15,9 @@ from ddtrace.contrib.internal.celery.signals import trace_retry from ddtrace.ext import SpanKind from ddtrace.ext import SpanTypes +from ddtrace.internal import core from ddtrace.pin import _DD_PIN_NAME - def patch_app(app, pin=None): """Attach the Pin class to the application and connect our handlers to Celery signals. @@ -41,6 +41,10 @@ def patch_app(app, pin=None): trace_utils.wrap("celery.beat", "Scheduler.tick", _traced_beat_function(config.celery, "tick")) pin.onto(celery.beat.Scheduler) + # Patch apply_async + trace_utils.wrap("celery.app.task", "Task.apply_async", _traced_apply_async_function(config.celery, "apply_async")) + pin.onto(celery.app.task.Task) + # connect to the Signal framework signals.task_prerun.connect(trace_prerun, weak=False) signals.task_postrun.connect(trace_postrun, weak=False) @@ -96,3 +100,15 @@ def _traced_beat_inner(func, instance, args, kwargs): return func(*args, **kwargs) return _traced_beat_inner + +def _traced_apply_async_function(integration_config, fn_name, resource_fn=None): + def _traced_apply_async_inner(func, instance, args, kwargs): + with core.context_with_data('task_context'): + try: + return func(*args, **kwargs) + finally: + task_span = core.get_item('task_span') + if task_span: + task_span.finish() + + return _traced_apply_async_inner diff --git a/ddtrace/contrib/internal/celery/signals.py b/ddtrace/contrib/internal/celery/signals.py index 50e9b6e6a7c..e6f4e4c8c30 100644 --- a/ddtrace/contrib/internal/celery/signals.py +++ b/ddtrace/contrib/internal/celery/signals.py @@ -16,6 +16,7 @@ from ddtrace.ext import SpanKind from ddtrace.ext import SpanTypes from ddtrace.ext import net +from ddtrace.internal import core from ddtrace.internal.constants import COMPONENT from ddtrace.internal.logger import get_logger from ddtrace.propagation.http import HTTPPropagator @@ -111,6 +112,9 @@ def trace_before_publish(*args, **kwargs): service = config.celery["producer_service_name"] span = pin.tracer.trace(c.PRODUCER_ROOT_SPAN, service=service, resource=task_name) + # Store an item called "task span" in case AFTER_TASK_PUBLISH doesn't get called + core.set_item("task_span", span) + span.set_tag_str(COMPONENT, config.celery.integration_name) # set span.kind to the type of request being performed diff --git a/ddtrace/contrib/internal/celery/utils.py b/ddtrace/contrib/internal/celery/utils.py index 80a4046f191..25d3ac7de97 100644 --- a/ddtrace/contrib/internal/celery/utils.py +++ b/ddtrace/contrib/internal/celery/utils.py @@ -131,6 +131,9 @@ def retrieve_task_id(context): if headers: # Protocol Version 2 (default from Celery 4.0) return headers.get("id") - else: + elif body: # Protocol Version 1 return body.get("id") + else: + # Do not run body.get("id") if the body is None, instead, return a None value + return None From 3d21563bf22de22fcd9454db39d437457c418e73 Mon Sep 17 00:00:00 2001 From: wan Date: Mon, 16 Sep 2024 17:34:34 -0400 Subject: [PATCH 02/21] Update format. --- ddtrace/contrib/internal/celery/app.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ddtrace/contrib/internal/celery/app.py b/ddtrace/contrib/internal/celery/app.py index 9c2ba90aa21..41cfcb89a99 100644 --- a/ddtrace/contrib/internal/celery/app.py +++ b/ddtrace/contrib/internal/celery/app.py @@ -18,6 +18,7 @@ from ddtrace.internal import core from ddtrace.pin import _DD_PIN_NAME + def patch_app(app, pin=None): """Attach the Pin class to the application and connect our handlers to Celery signals. @@ -101,13 +102,14 @@ def _traced_beat_inner(func, instance, args, kwargs): return _traced_beat_inner + def _traced_apply_async_function(integration_config, fn_name, resource_fn=None): def _traced_apply_async_inner(func, instance, args, kwargs): - with core.context_with_data('task_context'): + with core.context_with_data("task_context"): try: return func(*args, **kwargs) finally: - task_span = core.get_item('task_span') + task_span = core.get_item("task_span") if task_span: task_span.finish() From 64abe8f837a775ad0f8d3567a0fb69954c406cee Mon Sep 17 00:00:00 2001 From: wan Date: Wed, 18 Sep 2024 13:53:50 -0400 Subject: [PATCH 03/21] Add celery.app.task.Task to the Celery Base Test Case --- tests/contrib/celery/base.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/contrib/celery/base.py b/tests/contrib/celery/base.py index 65c1348ac49..f46f0686dfb 100644 --- a/tests/contrib/celery/base.py +++ b/tests/contrib/celery/base.py @@ -85,6 +85,7 @@ def setUp(self): # override pins to use our Dummy Tracer Pin.override(self.app, tracer=self.tracer) Pin.override(celery.beat.Scheduler, tracer=self.tracer) + Pin.override(celery.app.task.Task, tracer=self.tracer) def tearDown(self): self.app = None From f0c289294b26a146cf9e865a74d85aa72fd63209 Mon Sep 17 00:00:00 2001 From: wan Date: Wed, 18 Sep 2024 14:47:29 -0400 Subject: [PATCH 04/21] Remove pin from celery.app.task.Task and update patch and tests accordingly. --- ddtrace/contrib/internal/celery/app.py | 7 ++++++- ddtrace/contrib/internal/celery/signals.py | 5 ++++- tests/contrib/celery/base.py | 1 - 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/ddtrace/contrib/internal/celery/app.py b/ddtrace/contrib/internal/celery/app.py index 41cfcb89a99..c8a13155081 100644 --- a/ddtrace/contrib/internal/celery/app.py +++ b/ddtrace/contrib/internal/celery/app.py @@ -44,7 +44,6 @@ def patch_app(app, pin=None): # Patch apply_async trace_utils.wrap("celery.app.task", "Task.apply_async", _traced_apply_async_function(config.celery, "apply_async")) - pin.onto(celery.app.task.Task) # connect to the Signal framework signals.task_prerun.connect(trace_prerun, weak=False) @@ -70,6 +69,7 @@ def unpatch_app(app): trace_utils.unwrap(celery.beat.Scheduler, "apply_entry") trace_utils.unwrap(celery.beat.Scheduler, "tick") + trace_utils.unwrap(celery.app.task.Task, "apply_async") signals.task_prerun.disconnect(trace_prerun) signals.task_postrun.disconnect(trace_postrun) @@ -105,6 +105,7 @@ def _traced_beat_inner(func, instance, args, kwargs): def _traced_apply_async_function(integration_config, fn_name, resource_fn=None): def _traced_apply_async_inner(func, instance, args, kwargs): + with core.context_with_data("task_context"): try: return func(*args, **kwargs) @@ -113,4 +114,8 @@ def _traced_apply_async_inner(func, instance, args, kwargs): if task_span: task_span.finish() + prerun_span = core.get_item("prerun_span") + if prerun_span: + prerun_span.finish() + return _traced_apply_async_inner diff --git a/ddtrace/contrib/internal/celery/signals.py b/ddtrace/contrib/internal/celery/signals.py index e6f4e4c8c30..6341bed9bbf 100644 --- a/ddtrace/contrib/internal/celery/signals.py +++ b/ddtrace/contrib/internal/celery/signals.py @@ -49,6 +49,9 @@ def trace_prerun(*args, **kwargs): service = config.celery["worker_service_name"] span = pin.tracer.trace(c.WORKER_ROOT_SPAN, service=service, resource=task.name, span_type=SpanTypes.WORKER) + # Store an item called "prerun span" in case task_postrun doesn't get called + core.set_item("prerun_span", span) + # set span.kind to the type of request being performed span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER) @@ -112,7 +115,7 @@ def trace_before_publish(*args, **kwargs): service = config.celery["producer_service_name"] span = pin.tracer.trace(c.PRODUCER_ROOT_SPAN, service=service, resource=task_name) - # Store an item called "task span" in case AFTER_TASK_PUBLISH doesn't get called + # Store an item called "task span" in case after_task_publish doesn't get called core.set_item("task_span", span) span.set_tag_str(COMPONENT, config.celery.integration_name) diff --git a/tests/contrib/celery/base.py b/tests/contrib/celery/base.py index f46f0686dfb..65c1348ac49 100644 --- a/tests/contrib/celery/base.py +++ b/tests/contrib/celery/base.py @@ -85,7 +85,6 @@ def setUp(self): # override pins to use our Dummy Tracer Pin.override(self.app, tracer=self.tracer) Pin.override(celery.beat.Scheduler, tracer=self.tracer) - Pin.override(celery.app.task.Task, tracer=self.tracer) def tearDown(self): self.app = None From e5d124f021513611c591c142acca65f5e85837ea Mon Sep 17 00:00:00 2001 From: wan Date: Thu, 19 Sep 2024 15:47:24 -0400 Subject: [PATCH 05/21] Comment out new closing span changes and add a test showing that if the after task publish signal is not called, there are no spans at the moment so the test fails. --- ddtrace/contrib/internal/celery/app.py | 32 +++++++++++++----------- tests/contrib/celery/test_integration.py | 24 ++++++++++++++++++ 2 files changed, 41 insertions(+), 15 deletions(-) diff --git a/ddtrace/contrib/internal/celery/app.py b/ddtrace/contrib/internal/celery/app.py index c8a13155081..bdc6fe9269b 100644 --- a/ddtrace/contrib/internal/celery/app.py +++ b/ddtrace/contrib/internal/celery/app.py @@ -43,7 +43,7 @@ def patch_app(app, pin=None): pin.onto(celery.beat.Scheduler) # Patch apply_async - trace_utils.wrap("celery.app.task", "Task.apply_async", _traced_apply_async_function(config.celery, "apply_async")) + # trace_utils.wrap("celery.app.task", "Task.apply_async", _traced_apply_async_function(config.celery, "apply_async")) # connect to the Signal framework signals.task_prerun.connect(trace_prerun, weak=False) @@ -69,7 +69,7 @@ def unpatch_app(app): trace_utils.unwrap(celery.beat.Scheduler, "apply_entry") trace_utils.unwrap(celery.beat.Scheduler, "tick") - trace_utils.unwrap(celery.app.task.Task, "apply_async") + # trace_utils.unwrap(celery.app.task.Task, "apply_async") signals.task_prerun.disconnect(trace_prerun) signals.task_postrun.disconnect(trace_postrun) @@ -103,19 +103,21 @@ def _traced_beat_inner(func, instance, args, kwargs): return _traced_beat_inner -def _traced_apply_async_function(integration_config, fn_name, resource_fn=None): - def _traced_apply_async_inner(func, instance, args, kwargs): +# def _traced_apply_async_function(integration_config, fn_name, resource_fn=None): +# def _traced_apply_async_inner(func, instance, args, kwargs): - with core.context_with_data("task_context"): - try: - return func(*args, **kwargs) - finally: - task_span = core.get_item("task_span") - if task_span: - task_span.finish() +# with core.context_with_data("task_context"): +# try: +# return func(*args, **kwargs) +# finally: +# task_span = core.get_item("task_span") +# if task_span: +# print("Post Task Publish signal was not called") +# task_span.finish() - prerun_span = core.get_item("prerun_span") - if prerun_span: - prerun_span.finish() +# prerun_span = core.get_item("prerun_span") +# if prerun_span: +# print("Post Run signal was not called") +# prerun_span.finish() - return _traced_apply_async_inner +# return _traced_apply_async_inner diff --git a/tests/contrib/celery/test_integration.py b/tests/contrib/celery/test_integration.py index 9c89c90e920..5d56bdaff07 100644 --- a/tests/contrib/celery/test_integration.py +++ b/tests/contrib/celery/test_integration.py @@ -1,5 +1,6 @@ from collections import Counter import os +import mock import socket import subprocess from time import sleep @@ -441,6 +442,29 @@ def run(self): assert span.get_tag("span.kind") == "consumer" assert span.error == 0 + @mock.patch("kombu.messaging.Producer.publish", mock.Mock(side_effect=ValueError)) + def test_fn_task_apply_async_soft_exception(self): + # If the underlying library runs into an exception that doesn't crash the app, + # we should still close the span even if the after_task_publish signal didn't get + # called + + @self.app.task + def fn_task_parameters(user, force_logout=False): + return (user, force_logout) + + try: + t = fn_task_parameters.apply_async(args=["user"], kwargs={"force_logout": True}) + t.get(timeout=self.ASYNC_GET_TIMEOUT) + except ValueError: + traces = self.pop_traces() + if self.ASYNC_USE_CELERY_FIXTURES: + assert 1 == len(traces) + assert traces[0][0].name == 'celery.apply' + else: + assert 1 == len(traces) + assert traces[0][0].name == 'celery.apply' + + def test_shared_task(self): # Ensure Django Shared Task are supported @celery.shared_task From a9a234e25433844d176f6af76b4eadeb8b1b691a Mon Sep 17 00:00:00 2001 From: wan Date: Thu, 19 Sep 2024 17:13:39 -0400 Subject: [PATCH 06/21] Add new change to close celery.apply spans back, with debug log messages --- ddtrace/contrib/internal/celery/app.py | 37 ++++++++++++++------------ 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/ddtrace/contrib/internal/celery/app.py b/ddtrace/contrib/internal/celery/app.py index bdc6fe9269b..c8c1f6d1779 100644 --- a/ddtrace/contrib/internal/celery/app.py +++ b/ddtrace/contrib/internal/celery/app.py @@ -1,4 +1,5 @@ import celery +import sys from celery import signals from ddtrace import Pin @@ -16,8 +17,10 @@ from ddtrace.ext import SpanKind from ddtrace.ext import SpanTypes from ddtrace.internal import core +from ddtrace.internal.logger import get_logger from ddtrace.pin import _DD_PIN_NAME +log = get_logger(__name__) def patch_app(app, pin=None): """Attach the Pin class to the application and connect @@ -43,7 +46,7 @@ def patch_app(app, pin=None): pin.onto(celery.beat.Scheduler) # Patch apply_async - # trace_utils.wrap("celery.app.task", "Task.apply_async", _traced_apply_async_function(config.celery, "apply_async")) + trace_utils.wrap("celery.app.task", "Task.apply_async", _traced_apply_async_function(config.celery, "apply_async")) # connect to the Signal framework signals.task_prerun.connect(trace_prerun, weak=False) @@ -69,7 +72,7 @@ def unpatch_app(app): trace_utils.unwrap(celery.beat.Scheduler, "apply_entry") trace_utils.unwrap(celery.beat.Scheduler, "tick") - # trace_utils.unwrap(celery.app.task.Task, "apply_async") + trace_utils.unwrap(celery.app.task.Task, "apply_async") signals.task_prerun.disconnect(trace_prerun) signals.task_postrun.disconnect(trace_postrun) @@ -103,21 +106,21 @@ def _traced_beat_inner(func, instance, args, kwargs): return _traced_beat_inner -# def _traced_apply_async_function(integration_config, fn_name, resource_fn=None): -# def _traced_apply_async_inner(func, instance, args, kwargs): +def _traced_apply_async_function(integration_config, fn_name, resource_fn=None): + def _traced_apply_async_inner(func, instance, args, kwargs): -# with core.context_with_data("task_context"): -# try: -# return func(*args, **kwargs) -# finally: -# task_span = core.get_item("task_span") -# if task_span: -# print("Post Task Publish signal was not called") -# task_span.finish() + with core.context_with_data("task_context"): + try: + return func(*args, **kwargs) + finally: + task_span = core.get_item("task_span") + if task_span: + log.debug("The after_task_publish signal was not called, so manually closing span: %s", task_span._pprint()) + task_span.finish() -# prerun_span = core.get_item("prerun_span") -# if prerun_span: -# print("Post Run signal was not called") -# prerun_span.finish() + prerun_span = core.get_item("prerun_span") + if prerun_span: + log.debug("The task_postrun signal was not calle, so manually closing span: %s", prerun_span._pprint()) + prerun_span.finish() -# return _traced_apply_async_inner + return _traced_apply_async_inner From c0d387d14e67f627f259fc6ab2a90898f373077d Mon Sep 17 00:00:00 2001 From: wan Date: Thu, 19 Sep 2024 17:18:47 -0400 Subject: [PATCH 07/21] Simplify conditions in utils and test. --- ddtrace/contrib/internal/celery/utils.py | 5 +---- tests/contrib/celery/test_integration.py | 8 ++------ 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/ddtrace/contrib/internal/celery/utils.py b/ddtrace/contrib/internal/celery/utils.py index 25d3ac7de97..b2498852c7b 100644 --- a/ddtrace/contrib/internal/celery/utils.py +++ b/ddtrace/contrib/internal/celery/utils.py @@ -133,7 +133,4 @@ def retrieve_task_id(context): return headers.get("id") elif body: # Protocol Version 1 - return body.get("id") - else: - # Do not run body.get("id") if the body is None, instead, return a None value - return None + return body.get("id") \ No newline at end of file diff --git a/tests/contrib/celery/test_integration.py b/tests/contrib/celery/test_integration.py index 5d56bdaff07..68bb49edb88 100644 --- a/tests/contrib/celery/test_integration.py +++ b/tests/contrib/celery/test_integration.py @@ -457,12 +457,8 @@ def fn_task_parameters(user, force_logout=False): t.get(timeout=self.ASYNC_GET_TIMEOUT) except ValueError: traces = self.pop_traces() - if self.ASYNC_USE_CELERY_FIXTURES: - assert 1 == len(traces) - assert traces[0][0].name == 'celery.apply' - else: - assert 1 == len(traces) - assert traces[0][0].name == 'celery.apply' + assert 1 == len(traces) + assert traces[0][0].name == 'celery.apply' def test_shared_task(self): From 18af2fad864f623883f1cc27ba0e0b7d9eda6573 Mon Sep 17 00:00:00 2001 From: wan Date: Thu, 19 Sep 2024 17:33:21 -0400 Subject: [PATCH 08/21] Remove unused sys import. --- ddtrace/contrib/internal/celery/app.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ddtrace/contrib/internal/celery/app.py b/ddtrace/contrib/internal/celery/app.py index c8c1f6d1779..ff19b9010d2 100644 --- a/ddtrace/contrib/internal/celery/app.py +++ b/ddtrace/contrib/internal/celery/app.py @@ -1,5 +1,4 @@ import celery -import sys from celery import signals from ddtrace import Pin From 57837903647a0c1e49e51c502e19fefe8cdbb41d Mon Sep 17 00:00:00 2001 From: wan Date: Thu, 19 Sep 2024 17:48:37 -0400 Subject: [PATCH 09/21] Fix typo. --- ddtrace/contrib/internal/celery/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddtrace/contrib/internal/celery/app.py b/ddtrace/contrib/internal/celery/app.py index ff19b9010d2..6507a232da9 100644 --- a/ddtrace/contrib/internal/celery/app.py +++ b/ddtrace/contrib/internal/celery/app.py @@ -119,7 +119,7 @@ def _traced_apply_async_inner(func, instance, args, kwargs): prerun_span = core.get_item("prerun_span") if prerun_span: - log.debug("The task_postrun signal was not calle, so manually closing span: %s", prerun_span._pprint()) + log.debug("The task_postrun signal was not called, so manually closing span: %s", prerun_span._pprint()) prerun_span.finish() return _traced_apply_async_inner From 16e442446e2013f83df840f44a1d8e28b4c5bcb7 Mon Sep 17 00:00:00 2001 From: wan Date: Thu, 19 Sep 2024 17:58:18 -0400 Subject: [PATCH 10/21] Apply formatting suggestions. --- ddtrace/contrib/internal/celery/app.py | 10 ++++++++-- ddtrace/contrib/internal/celery/utils.py | 2 +- tests/contrib/celery/test_integration.py | 3 +-- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/ddtrace/contrib/internal/celery/app.py b/ddtrace/contrib/internal/celery/app.py index 6507a232da9..3062c9d27b4 100644 --- a/ddtrace/contrib/internal/celery/app.py +++ b/ddtrace/contrib/internal/celery/app.py @@ -21,6 +21,7 @@ log = get_logger(__name__) + def patch_app(app, pin=None): """Attach the Pin class to the application and connect our handlers to Celery signals. @@ -114,12 +115,17 @@ def _traced_apply_async_inner(func, instance, args, kwargs): finally: task_span = core.get_item("task_span") if task_span: - log.debug("The after_task_publish signal was not called, so manually closing span: %s", task_span._pprint()) + log.debug( + "The after_task_publish signal was not called, so manually closing span: %s", + task_span._pprint(), + ) task_span.finish() prerun_span = core.get_item("prerun_span") if prerun_span: - log.debug("The task_postrun signal was not called, so manually closing span: %s", prerun_span._pprint()) + log.debug( + "The task_postrun signal was not called, so manually closing span: %s", prerun_span._pprint() + ) prerun_span.finish() return _traced_apply_async_inner diff --git a/ddtrace/contrib/internal/celery/utils.py b/ddtrace/contrib/internal/celery/utils.py index b2498852c7b..2cbb8fa31ee 100644 --- a/ddtrace/contrib/internal/celery/utils.py +++ b/ddtrace/contrib/internal/celery/utils.py @@ -133,4 +133,4 @@ def retrieve_task_id(context): return headers.get("id") elif body: # Protocol Version 1 - return body.get("id") \ No newline at end of file + return body.get("id") diff --git a/tests/contrib/celery/test_integration.py b/tests/contrib/celery/test_integration.py index 68bb49edb88..ddbb0d2d519 100644 --- a/tests/contrib/celery/test_integration.py +++ b/tests/contrib/celery/test_integration.py @@ -458,8 +458,7 @@ def fn_task_parameters(user, force_logout=False): except ValueError: traces = self.pop_traces() assert 1 == len(traces) - assert traces[0][0].name == 'celery.apply' - + assert traces[0][0].name == "celery.apply" def test_shared_task(self): # Ensure Django Shared Task are supported From 444b074bfa5bc4ca9a94576a1a9c20a0600d8ce1 Mon Sep 17 00:00:00 2001 From: wan Date: Fri, 20 Sep 2024 10:17:45 -0400 Subject: [PATCH 11/21] Fix more formatting. --- ddtrace/contrib/internal/celery/app.py | 2 +- tests/contrib/celery/test_integration.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ddtrace/contrib/internal/celery/app.py b/ddtrace/contrib/internal/celery/app.py index 3062c9d27b4..d145be3605f 100644 --- a/ddtrace/contrib/internal/celery/app.py +++ b/ddtrace/contrib/internal/celery/app.py @@ -19,6 +19,7 @@ from ddtrace.internal.logger import get_logger from ddtrace.pin import _DD_PIN_NAME + log = get_logger(__name__) @@ -108,7 +109,6 @@ def _traced_beat_inner(func, instance, args, kwargs): def _traced_apply_async_function(integration_config, fn_name, resource_fn=None): def _traced_apply_async_inner(func, instance, args, kwargs): - with core.context_with_data("task_context"): try: return func(*args, **kwargs) diff --git a/tests/contrib/celery/test_integration.py b/tests/contrib/celery/test_integration.py index ddbb0d2d519..574db5df330 100644 --- a/tests/contrib/celery/test_integration.py +++ b/tests/contrib/celery/test_integration.py @@ -1,12 +1,12 @@ from collections import Counter import os -import mock import socket import subprocess from time import sleep import celery from celery.exceptions import Retry +import mock import pytest from ddtrace import Pin From 40293ec33f94d8fb918b8e27fd6f6746f53a3b88 Mon Sep 17 00:00:00 2001 From: wan Date: Fri, 20 Sep 2024 10:46:49 -0400 Subject: [PATCH 12/21] Add release note describing the fix to celery.apply. --- .../fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml diff --git a/releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml b/releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml new file mode 100644 index 00000000000..a0c7f5382a7 --- /dev/null +++ b/releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + tracing(celery): Fixes an issue where `celery.apply` spans didn't close if the after_task_publish or task_postrun signals didn't get called. \ No newline at end of file From 7f54148ba74e9805ee89b1bc22cdaebc9b8ac8a8 Mon Sep 17 00:00:00 2001 From: wan Date: Fri, 20 Sep 2024 11:05:36 -0400 Subject: [PATCH 13/21] Add additional assertions for span metadata. --- tests/contrib/celery/test_integration.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/contrib/celery/test_integration.py b/tests/contrib/celery/test_integration.py index 574db5df330..e662fe875a8 100644 --- a/tests/contrib/celery/test_integration.py +++ b/tests/contrib/celery/test_integration.py @@ -445,7 +445,7 @@ def run(self): @mock.patch("kombu.messaging.Producer.publish", mock.Mock(side_effect=ValueError)) def test_fn_task_apply_async_soft_exception(self): # If the underlying library runs into an exception that doesn't crash the app, - # we should still close the span even if the after_task_publish signal didn't get + # we should still close the span even if the closing signals didn't get # called @self.app.task @@ -459,6 +459,10 @@ def fn_task_parameters(user, force_logout=False): traces = self.pop_traces() assert 1 == len(traces) assert traces[0][0].name == "celery.apply" + assert traces[0][0].resource == "tests.contrib.celery.test_integration.fn_task_parameters" + assert traces[0][0].get_tag("celery.action") == "apply_async" + assert span.get_tag("component") == "celery" + assert span.get_tag("span.kind") == "producer" def test_shared_task(self): # Ensure Django Shared Task are supported From 2c2db931c7fc3f52f71828354c2aa3325231caf1 Mon Sep 17 00:00:00 2001 From: wan Date: Fri, 20 Sep 2024 11:15:41 -0400 Subject: [PATCH 14/21] Fix typo. --- tests/contrib/celery/test_integration.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/contrib/celery/test_integration.py b/tests/contrib/celery/test_integration.py index e662fe875a8..41e80cb8cf2 100644 --- a/tests/contrib/celery/test_integration.py +++ b/tests/contrib/celery/test_integration.py @@ -461,8 +461,8 @@ def fn_task_parameters(user, force_logout=False): assert traces[0][0].name == "celery.apply" assert traces[0][0].resource == "tests.contrib.celery.test_integration.fn_task_parameters" assert traces[0][0].get_tag("celery.action") == "apply_async" - assert span.get_tag("component") == "celery" - assert span.get_tag("span.kind") == "producer" + assert traces[0][0].get_tag("component") == "celery" + assert traces[0][0].get_tag("span.kind") == "producer" def test_shared_task(self): # Ensure Django Shared Task are supported From deadb757d7a3b996078aabf1aa24b86c992a7cf3 Mon Sep 17 00:00:00 2001 From: wan Date: Fri, 20 Sep 2024 12:32:23 -0400 Subject: [PATCH 15/21] Add clarifying comments --- ddtrace/contrib/internal/celery/app.py | 12 ++++++++++++ ...lery-apply-async-span-close-b7a8db188459f5b5.yaml | 2 +- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/ddtrace/contrib/internal/celery/app.py b/ddtrace/contrib/internal/celery/app.py index d145be3605f..12249ab617e 100644 --- a/ddtrace/contrib/internal/celery/app.py +++ b/ddtrace/contrib/internal/celery/app.py @@ -108,6 +108,18 @@ def _traced_beat_inner(func, instance, args, kwargs): def _traced_apply_async_function(integration_config, fn_name, resource_fn=None): + """ + When apply_async is called, it calls various Celery signals in order, which gets used + to start and close the span. + Example: before_task_publish starts the span while after_task_publish closes the span. + If an exception occurs anywhere inside Celery or its dependencies, this can interrupt the + closing signals. + The purpse of _traced_apply_async_function is to close the spans even if one of the closing + signals don't get called over the course of the apply_task lifecycle. + This is done by fetching the stored span and closing it if it hasn't already been closed by a + closing signal. + """ + def _traced_apply_async_inner(func, instance, args, kwargs): with core.context_with_data("task_context"): try: diff --git a/releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml b/releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml index a0c7f5382a7..3d93be06569 100644 --- a/releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml +++ b/releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml @@ -1,4 +1,4 @@ --- fixes: - | - tracing(celery): Fixes an issue where `celery.apply` spans didn't close if the after_task_publish or task_postrun signals didn't get called. \ No newline at end of file + tracing(celery): Fixes an issue where `celery.apply` spans didn't close if the after_task_publish or task_postrun signals didn't get called when using `apply_async`, which can happen if there is an internal exception during the handling of the task. \ No newline at end of file From 7988da883bf3cae305d453500e5b5093ce21d7d4 Mon Sep 17 00:00:00 2001 From: wan Date: Fri, 20 Sep 2024 13:30:07 -0400 Subject: [PATCH 16/21] Fix typo. --- ddtrace/contrib/internal/celery/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddtrace/contrib/internal/celery/app.py b/ddtrace/contrib/internal/celery/app.py index 12249ab617e..4a4c59e918b 100644 --- a/ddtrace/contrib/internal/celery/app.py +++ b/ddtrace/contrib/internal/celery/app.py @@ -114,7 +114,7 @@ def _traced_apply_async_function(integration_config, fn_name, resource_fn=None): Example: before_task_publish starts the span while after_task_publish closes the span. If an exception occurs anywhere inside Celery or its dependencies, this can interrupt the closing signals. - The purpse of _traced_apply_async_function is to close the spans even if one of the closing + The purpose of _traced_apply_async_function is to close the spans even if one of the closing signals don't get called over the course of the apply_task lifecycle. This is done by fetching the stored span and closing it if it hasn't already been closed by a closing signal. From 17e521acb173bcd93d222ce7c0e75ee03d7e4e53 Mon Sep 17 00:00:00 2001 From: wantsui Date: Mon, 23 Sep 2024 11:06:57 -0400 Subject: [PATCH 17/21] Update releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml Co-authored-by: Emmett Butler <723615+emmettbutler@users.noreply.github.com> --- .../fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml b/releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml index 3d93be06569..554d1efc18d 100644 --- a/releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml +++ b/releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml @@ -1,4 +1,4 @@ --- fixes: - | - tracing(celery): Fixes an issue where `celery.apply` spans didn't close if the after_task_publish or task_postrun signals didn't get called when using `apply_async`, which can happen if there is an internal exception during the handling of the task. \ No newline at end of file + tracing(celery): Fixes an issue where ``celery.apply`` spans didn't close if the ``after_task_publish`` or ``task_postrun`` signals didn't get sent when using ``apply_async``, which can happen if there is an internal exception during the handling of the task. \ No newline at end of file From b4556d21e099030c8344bcd68b43e96df11f2346 Mon Sep 17 00:00:00 2001 From: wan Date: Thu, 26 Sep 2024 17:03:18 -0400 Subject: [PATCH 18/21] Fix bug in retrieve_task_id that fails to check the body for the task id when headers has data (but no task id), which wrongly kept spans open. --- ddtrace/contrib/internal/celery/utils.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/ddtrace/contrib/internal/celery/utils.py b/ddtrace/contrib/internal/celery/utils.py index 2cbb8fa31ee..ee803439629 100644 --- a/ddtrace/contrib/internal/celery/utils.py +++ b/ddtrace/contrib/internal/celery/utils.py @@ -128,9 +128,12 @@ def retrieve_task_id(context): """ headers = context.get("headers") body = context.get("body") - if headers: + # Check if the id is in the headers, then check the body for it. + # If we don't check the id first, we could wrongly assume no task_id + # when the task_id is in the body. + if headers and 'id' in headers: # Protocol Version 2 (default from Celery 4.0) return headers.get("id") - elif body: + elif body and 'id' in body: # Protocol Version 1 return body.get("id") From 9bd7ed75f196e8bbe47506e5239a3dd4bc5ec11b Mon Sep 17 00:00:00 2001 From: wan Date: Fri, 27 Sep 2024 09:53:20 -0400 Subject: [PATCH 19/21] Revert "Fix bug in retrieve_task_id that fails to check the body for the task id when headers has data (but no task id), which wrongly kept spans open." This reverts commit b4556d21e099030c8344bcd68b43e96df11f2346. --- ddtrace/contrib/internal/celery/utils.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/ddtrace/contrib/internal/celery/utils.py b/ddtrace/contrib/internal/celery/utils.py index ee803439629..2cbb8fa31ee 100644 --- a/ddtrace/contrib/internal/celery/utils.py +++ b/ddtrace/contrib/internal/celery/utils.py @@ -128,12 +128,9 @@ def retrieve_task_id(context): """ headers = context.get("headers") body = context.get("body") - # Check if the id is in the headers, then check the body for it. - # If we don't check the id first, we could wrongly assume no task_id - # when the task_id is in the body. - if headers and 'id' in headers: + if headers: # Protocol Version 2 (default from Celery 4.0) return headers.get("id") - elif body and 'id' in body: + elif body: # Protocol Version 1 return body.get("id") From c289c4a9f4f547a0638b77199c58c0e14b151202 Mon Sep 17 00:00:00 2001 From: wan Date: Mon, 30 Sep 2024 15:17:00 -0400 Subject: [PATCH 20/21] Update apply_async to capture errors on the span and report them. --- ddtrace/contrib/internal/celery/app.py | 14 ++++++++++++++ tests/contrib/celery/test_integration.py | 14 ++++++++++---- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/ddtrace/contrib/internal/celery/app.py b/ddtrace/contrib/internal/celery/app.py index 4a4c59e918b..b61585097a7 100644 --- a/ddtrace/contrib/internal/celery/app.py +++ b/ddtrace/contrib/internal/celery/app.py @@ -1,3 +1,5 @@ +import sys + import celery from celery import signals @@ -124,6 +126,18 @@ def _traced_apply_async_inner(func, instance, args, kwargs): with core.context_with_data("task_context"): try: return func(*args, **kwargs) + except Exception: + # If an internal exception occurs, record the exception in the span, + # then raise the Celery error as usual + task_span = core.get_item("task_span") + if task_span: + task_span.set_exc_info(*sys.exc_info()) + + prerun_span = core.get_item("prerun_span") + if prerun_span: + prerun_span.set_exc_info(*sys.exc_info()) + + raise finally: task_span = core.get_item("task_span") if task_span: diff --git a/tests/contrib/celery/test_integration.py b/tests/contrib/celery/test_integration.py index 41e80cb8cf2..5486587311c 100644 --- a/tests/contrib/celery/test_integration.py +++ b/tests/contrib/celery/test_integration.py @@ -444,17 +444,17 @@ def run(self): @mock.patch("kombu.messaging.Producer.publish", mock.Mock(side_effect=ValueError)) def test_fn_task_apply_async_soft_exception(self): - # If the underlying library runs into an exception that doesn't crash the app, - # we should still close the span even if the closing signals didn't get - # called + # If the underlying library runs into an exception that doesn't crash the app + # while calling apply_async, we should still close the span even + # if the closing signals didn't get called and mark the span as an error @self.app.task def fn_task_parameters(user, force_logout=False): return (user, force_logout) + t = None try: t = fn_task_parameters.apply_async(args=["user"], kwargs={"force_logout": True}) - t.get(timeout=self.ASYNC_GET_TIMEOUT) except ValueError: traces = self.pop_traces() assert 1 == len(traces) @@ -463,6 +463,12 @@ def fn_task_parameters(user, force_logout=False): assert traces[0][0].get_tag("celery.action") == "apply_async" assert traces[0][0].get_tag("component") == "celery" assert traces[0][0].get_tag("span.kind") == "producer" + # Internal library errors get recorded on the span + assert traces[0][0].error == 1 + assert traces[0][0].get_tag("error.type") == "builtins.ValueError" + assert "ValueError" in traces[0][0].get_tag("error.stack") + # apply_async runs into an internal error (ValueError) so nothing is returned to t + assert t is None def test_shared_task(self): # Ensure Django Shared Task are supported From ab37114b6ad5a6f7ad18a6ea37b4e2b56462f59c Mon Sep 17 00:00:00 2001 From: wan Date: Mon, 30 Sep 2024 16:17:10 -0400 Subject: [PATCH 21/21] Update release note. --- .../fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml b/releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml index 554d1efc18d..4ca112a2cfb 100644 --- a/releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml +++ b/releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml @@ -1,4 +1,4 @@ --- fixes: - | - tracing(celery): Fixes an issue where ``celery.apply`` spans didn't close if the ``after_task_publish`` or ``task_postrun`` signals didn't get sent when using ``apply_async``, which can happen if there is an internal exception during the handling of the task. \ No newline at end of file + tracing(celery): Fixes an issue where ``celery.apply`` spans didn't close if the ``after_task_publish`` or ``task_postrun`` signals didn't get sent when using ``apply_async``, which can happen if there is an internal exception during the handling of the task. This update also marks the span as an error if an exception occurs. \ No newline at end of file