diff --git a/ddtrace/contrib/internal/celery/app.py b/ddtrace/contrib/internal/celery/app.py index b61585097a7..42eed2cb468 100644 --- a/ddtrace/contrib/internal/celery/app.py +++ b/ddtrace/contrib/internal/celery/app.py @@ -133,10 +133,6 @@ def _traced_apply_async_inner(func, instance, args, kwargs): if task_span: task_span.set_exc_info(*sys.exc_info()) - prerun_span = core.get_item("prerun_span") - if prerun_span: - prerun_span.set_exc_info(*sys.exc_info()) - raise finally: task_span = core.get_item("task_span") @@ -147,11 +143,4 @@ def _traced_apply_async_inner(func, instance, args, kwargs): ) task_span.finish() - prerun_span = core.get_item("prerun_span") - if prerun_span: - log.debug( - "The task_postrun signal was not called, so manually closing span: %s", prerun_span._pprint() - ) - prerun_span.finish() - return _traced_apply_async_inner diff --git a/ddtrace/contrib/internal/celery/signals.py b/ddtrace/contrib/internal/celery/signals.py index 6341bed9bbf..13469440e73 100644 --- a/ddtrace/contrib/internal/celery/signals.py +++ b/ddtrace/contrib/internal/celery/signals.py @@ -49,9 +49,6 @@ def trace_prerun(*args, **kwargs): service = config.celery["worker_service_name"] span = pin.tracer.trace(c.WORKER_ROOT_SPAN, service=service, resource=task.name, span_type=SpanTypes.WORKER) - # Store an item called "prerun span" in case task_postrun doesn't get called - core.set_item("prerun_span", span) - # set span.kind to the type of request being performed span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER) diff --git a/releasenotes/notes/fix-celery-closed-spans-34ff43868c1e33b8.yaml b/releasenotes/notes/fix-celery-closed-spans-34ff43868c1e33b8.yaml new file mode 100644 index 00000000000..f16f7b36fed --- /dev/null +++ b/releasenotes/notes/fix-celery-closed-spans-34ff43868c1e33b8.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + tracing(celery): Fixes an issue where ``celery.apply`` spans from Celery prerun got closed too soon leading to span tags being missing. \ No newline at end of file diff --git a/tests/contrib/celery/run_tasks.py b/tests/contrib/celery/run_tasks.py new file mode 100644 index 00000000000..e91454ab5bb --- /dev/null +++ b/tests/contrib/celery/run_tasks.py @@ -0,0 +1,5 @@ +from tasks import fn_a +from tasks import fn_b + + +(fn_a.si() | fn_b.si()).delay() diff --git a/tests/contrib/celery/tasks.py b/tests/contrib/celery/tasks.py new file mode 100644 index 00000000000..a9dfc936ae4 --- /dev/null +++ b/tests/contrib/celery/tasks.py @@ -0,0 +1,14 @@ +from celery import Celery + + +app = Celery("tasks") + + +@app.task(name="tests.contrib.celery.tasks.fn_a") +def fn_a(): + return "a" + + +@app.task(name="tests.contrib.celery.tasks.fn_b") +def fn_b(): + return "b" diff --git a/tests/contrib/celery/test_chained_task.py b/tests/contrib/celery/test_chained_task.py new file mode 100644 index 00000000000..5fd0c543e72 --- /dev/null +++ b/tests/contrib/celery/test_chained_task.py @@ -0,0 +1,62 @@ +import os +import re +import subprocess +import time + +from celery import Celery + + +# Ensure that when we call Celery chains, the root span has celery specific span tags +# The test_integration.py setup doesn't perfectly mimic the condition of a worker process running. +# This test runs the worker as a side so we can check the tracer logs afterwards to ensure expected span results. +# See https://github.com/DataDog/dd-trace-py/issues/11479 +def test_task_chain_task_call_task(): + app = Celery("tasks") + + celery_worker_cmd = "ddtrace-run celery -A tasks worker -c 1 -l DEBUG -n uniquename1 -P solo" + celery_task_runner_cmd = "ddtrace-run python run_tasks.py" + + # The commands need to run from the directory where this test file lives + current_directory = str(os.path.dirname(__file__)) + + worker_process = subprocess.Popen( + celery_worker_cmd.split(), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + preexec_fn=os.setsid, + close_fds=True, + cwd=current_directory, + ) + + max_wait_time = 10 + waited_so_far = 0 + # {app.control.inspect().active() returns {'celery@uniquename1': []} when the worker is running} + while app.control.inspect().active() is None and waited_so_far < max_wait_time: + time.sleep(1) + waited_so_far += 1 + + # The task should only run after the Celery worker has sufficient time to start up + task_runner_process = subprocess.Popen( + celery_task_runner_cmd.split(), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + preexec_fn=os.setsid, + close_fds=True, + cwd=current_directory, + ) + + task_runner_process.wait() + # Kill the process so it starts to send traces to the Trace Agent + worker_process.kill() + worker_logs = worker_process.stderr.read() + + # Check that the root span was created with one of the Celery specific tags, such as celery.correlation_id + # Some versions of python seem to require escaping when using `re.search`: + old_pattern_match = r"resource=\\'tests.contrib.celery.tasks.fn_a\\' type=\\'worker\\' .* tags=.*correlation_id.*" + new_pattern_match = r"resource=\'tests.contrib.celery.tasks.fn_a\' type=\'worker\' .* tags=.*correlation_id.*" + + pattern_exists = ( + re.search(old_pattern_match, str(worker_logs)) is not None + or re.search(new_pattern_match, str(worker_logs)) is not None + ) + assert pattern_exists is not None