Skip to content

Commit 2d192ca

Browse files
committed
Additional metrics exported from Celery workers
1 parent 4a1e0ce commit 2d192ca

File tree

6 files changed

+522
-108
lines changed

6 files changed

+522
-108
lines changed

CHANGELOG.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111
1212
## Unreleased
1313

14+
### Breaking changes
15+
16+
- `opentelemetry-instrumentation-celery` Rename `flower.task.runtime.seconds` metric to `messaging.process.duration` according to semconv
17+
([#3463](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3463))
18+
19+
### Added
20+
21+
- `opentelemetry-instrumentation-celery` Add three additional worker metrics to count active and prefetched tasks, as well as prefetch duration
22+
([#3463](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3463))
23+
24+
### Fixed
25+
26+
- `opentelemetry-instrumentation-celery` Fix a memory leak where a reference to a task identifier is kept indefinitely
27+
([#3463](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3463))
28+
1429
## Version 1.33.0/0.54b0 (2025-05-09)
1530

1631
### Added

instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py

Lines changed: 106 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def add(x, y):
6060
"""
6161

6262
import logging
63-
from timeit import default_timer
63+
import time
6464
from typing import Collection, Iterable
6565

6666
from billiard import VERSION
@@ -76,6 +76,7 @@ def add(x, y):
7676
from opentelemetry.metrics import get_meter
7777
from opentelemetry.propagate import extract, inject
7878
from opentelemetry.propagators.textmap import Getter
79+
from opentelemetry.semconv._incubating.metrics import messaging_metrics
7980
from opentelemetry.semconv.trace import SpanAttributes
8081
from opentelemetry.trace.status import Status, StatusCode
8182

@@ -96,6 +97,12 @@ def add(x, y):
9697
_TASK_REVOKED_TERMINATED_SIGNAL_KEY = "celery.terminated.signal"
9798
_TASK_NAME_KEY = "celery.task_name"
9899

100+
# Metric names
101+
_TASK_COUNT_ACTIVE = "messaging.client.active_tasks"
102+
_TASK_COUNT_PREFETCHED = "messaging.client.prefetched_tasks"
103+
_TASK_PROCESSING_TIME = messaging_metrics.MESSAGING_PROCESS_DURATION
104+
_TASK_PREFETCH_TIME = "messaging.prefetch.duration"
105+
99106

100107
class CeleryGetter(Getter):
101108
def get(self, carrier, key):
@@ -113,10 +120,36 @@ def keys(self, carrier):
113120
celery_getter = CeleryGetter()
114121

115122

116-
class CeleryInstrumentor(BaseInstrumentor):
117-
metrics = None
118-
task_id_to_start_time = {}
123+
class TaskDurationTracker:
    """Time the steps a task goes through and record them as metrics.

    Start times are stored per task key and per step name. The step name
    given to ``record_start`` must be the same string later passed as
    ``metric_name`` to ``record_finish``: it is used both to find the start
    time and to select the instrument in ``metrics``.
    """

    def __init__(self, metrics):
        # Mapping of metric name -> instrument exposing ``record()``.
        self.metrics = metrics
        # {task key: {step name: time.perf_counter() value at start}}
        self.tracker = {}

    def record_start(self, key, step):
        """Store the current time as the start of ``step`` for ``key``.

        Calling it again for the same ``key``/``step`` overwrites the
        previously stored start time.
        """
        self.tracker.setdefault(key, {})[step] = time.perf_counter()

    def record_finish(self, key, metric_name, attributes):
        """Record the time elapsed since the matching ``record_start``.

        Best effort: when no start time was stored for ``key`` and
        ``metric_name``, or ``metric_name`` is not a known instrument, a
        warning is logged and nothing is recorded.

        Args:
            key: identifier the start time was stored under.
            metric_name: step name, also the key of the instrument in
                ``self.metrics``.
            attributes: attributes forwarded to the instrument.
        """
        # Keep the ``try`` limited to the two lookups so that a KeyError
        # raised inside the instrument itself is not silently reported as
        # missing timing data.
        try:
            time_elapsed = self._time_elapsed(key, metric_name)
            instrument = self.metrics[metric_name]
        except KeyError:
            logger.warning("Failed to record %s for task %s", metric_name, key)
            return

        # Clamp defensively so a negative duration is never recorded.
        instrument.record(max(0, time_elapsed), attributes=attributes)

    def _time_elapsed(self, key, step):
        """Pop the start time of ``step`` for ``key`` and return the elapsed time.

        Returns:
            Seconds (float) between the stored start time and now.

        Raises:
            KeyError: if no start time is stored for ``key``/``step``.
        """
        end_time = time.perf_counter()
        try:
            start_time = self.tracker.get(key, {}).pop(step)
            return end_time - start_time
        finally:
            # Drop the per-key dict once its last step has been consumed so
            # finished keys do not accumulate in the tracker.
            if key in self.tracker and not self.tracker[key]:
                del self.tracker[key]
150+
119151

152+
class CeleryInstrumentor(BaseInstrumentor):
120153
def instrumentation_dependencies(self) -> Collection[str]:
121154
return _instruments
122155

@@ -139,8 +172,10 @@ def _instrument(self, **kwargs):
139172
schema_url="https://opentelemetry.io/schemas/1.11.0",
140173
)
141174

142-
self.create_celery_metrics(meter)
175+
self.metrics = _create_celery_worker_metrics(meter)
176+
self.time_tracker = TaskDurationTracker(self.metrics)
143177

178+
signals.task_received.connect(self._trace_received, weak=False)
144179
signals.task_prerun.connect(self._trace_prerun, weak=False)
145180
signals.task_postrun.connect(self._trace_postrun, weak=False)
146181
signals.before_task_publish.connect(
@@ -153,27 +188,52 @@ def _instrument(self, **kwargs):
153188
signals.task_retry.connect(self._trace_retry, weak=False)
154189

155190
def _uninstrument(self, **kwargs):
    """Disconnect this instrumentor's handlers from their signals."""
    # (signal, receiver) pairs, disconnected in this order.
    connections = (
        (signals.task_received, self._trace_received),
        (signals.task_prerun, self._trace_prerun),
        (signals.task_postrun, self._trace_postrun),
        (signals.before_task_publish, self._trace_before_publish),
        (signals.after_task_publish, self._trace_after_publish),
        (signals.task_failure, self._trace_failure),
        (signals.task_retry, self._trace_retry),
    )
    for sig, receiver in connections:
        sig.disconnect(receiver)
162198

199+
def _trace_received(self, *args, **kwargs):
    """
    On received signal, task is prefetched and prefetch timer starts
    """

    request = utils.retrieve_request(kwargs)

    # retrieve_request() returns None when the signal carries no request;
    # without one there is no task to attribute the metrics to.
    if request is None:
        return

    metrics_attributes = utils.get_metrics_attributes_from_request(request)
    self.metrics[_TASK_COUNT_PREFETCHED].add(
        1, attributes=metrics_attributes
    )
    # The prefetch timer is stopped by record_finish() with the same
    # task id and step name.
    self.time_tracker.record_start(request.task_id, _TASK_PREFETCH_TIME)
211+
163212
def _trace_prerun(self, *args, **kwargs):
213+
"""
214+
On prerun signal, task is no longer prefetched, and execution timer
215+
starts along with the task span
216+
"""
217+
164218
task = utils.retrieve_task(kwargs)
165219
task_id = utils.retrieve_task_id(kwargs)
166220

167221
if task is None or task_id is None:
168222
return
169223

170-
self.update_task_duration_time(task_id)
224+
metrics_attributes = utils.get_metrics_attributes_from_task(task)
225+
self.metrics[_TASK_COUNT_PREFETCHED].add(
226+
-1, attributes=metrics_attributes
227+
)
228+
self.time_tracker.record_finish(
229+
task_id, _TASK_PREFETCH_TIME, metrics_attributes
230+
)
231+
self.time_tracker.record_start(task_id, _TASK_PROCESSING_TIME)
232+
171233
request = task.request
172234
tracectx = extract(request, getter=celery_getter) or None
173235
token = context_api.attach(tracectx) if tracectx is not None else None
174236

175-
logger.debug("prerun signal start task_id=%s", task_id)
176-
177237
operation_name = f"{_TASK_RUN}/{task.name}"
178238
span = self._tracer.start_span(
179239
operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER
@@ -183,14 +243,24 @@ def _trace_prerun(self, *args, **kwargs):
183243
activation.__enter__() # pylint: disable=E1101
184244
utils.attach_context(task, task_id, span, activation, token)
185245

246+
self.metrics[_TASK_COUNT_ACTIVE].add(1, attributes=metrics_attributes)
247+
186248
def _trace_postrun(self, *args, **kwargs):
249+
"""
250+
On postrun signal, task is no longer being executed
251+
"""
252+
187253
task = utils.retrieve_task(kwargs)
188254
task_id = utils.retrieve_task_id(kwargs)
189255

190256
if task is None or task_id is None:
191257
return
192258

193-
logger.debug("postrun signal task_id=%s", task_id)
259+
metrics_attributes = utils.get_metrics_attributes_from_task(task)
260+
self.metrics[_TASK_COUNT_ACTIVE].add(-1, attributes=metrics_attributes)
261+
self.time_tracker.record_finish(
262+
task_id, _TASK_PROCESSING_TIME, metrics_attributes
263+
)
194264

195265
# retrieve and finish the Span
196266
ctx = utils.retrieve_context(task, task_id)
@@ -210,10 +280,8 @@ def _trace_postrun(self, *args, **kwargs):
210280

211281
activation.__exit__(None, None, None)
212282
utils.detach_context(task, task_id)
213-
self.update_task_duration_time(task_id)
214-
labels = {"task": task.name, "worker": task.request.hostname}
215-
self._record_histograms(task_id, labels)
216-
# if the process sending the task is not instrumented
283+
284+
# If the process sending the task is not instrumented,
217285
# there's no incoming context and no token to detach
218286
if token is not None:
219287
context_api.detach(token)
@@ -345,29 +413,29 @@ def _trace_retry(*args, **kwargs):
345413
# something that isn't an `Exception`
346414
span.set_attribute(_TASK_RETRY_REASON_KEY, str(reason))
347415

348-
def update_task_duration_time(self, task_id):
349-
cur_time = default_timer()
350-
task_duration_time_until_now = (
351-
cur_time - self.task_id_to_start_time[task_id]
352-
if task_id in self.task_id_to_start_time
353-
else cur_time
354-
)
355-
self.task_id_to_start_time[task_id] = task_duration_time_until_now
356-
357-
def _record_histograms(self, task_id, metric_attributes):
358-
if task_id is None:
359-
return
360416

361-
self.metrics["flower.task.runtime.seconds"].record(
362-
self.task_id_to_start_time.get(task_id),
363-
attributes=metric_attributes,
364-
)
365-
366-
def create_celery_metrics(self, meter) -> None:
367-
self.metrics = {
368-
"flower.task.runtime.seconds": meter.create_histogram(
369-
name="flower.task.runtime.seconds",
370-
unit="seconds",
371-
description="The time it took to run the task.",
372-
)
373-
}
417+
def _create_celery_worker_metrics(meter) -> dict:
    """Create the worker metric instruments.

    Args:
        meter: object providing ``create_up_down_counter`` and
            ``create_histogram``.

    Returns:
        A dict mapping each metric name to the instrument created for it:
        two up-down counters (active and prefetched tasks) and two
        histograms (prefetch and processing duration, unit ``s``).
    """
    return {
        _TASK_COUNT_ACTIVE: meter.create_up_down_counter(
            name=_TASK_COUNT_ACTIVE,
            unit="{message}",
            description="Number of tasks currently being executed by the worker",
        ),
        _TASK_COUNT_PREFETCHED: meter.create_up_down_counter(
            name=_TASK_COUNT_PREFETCHED,
            unit="{message}",
            description="Number of tasks prefetched by the worker",
        ),
        _TASK_PREFETCH_TIME: meter.create_histogram(
            name=_TASK_PREFETCH_TIME,
            unit="s",
            description="The time the task spent in prefetch mode",
        ),
        _TASK_PROCESSING_TIME: meter.create_histogram(
            name=_TASK_PROCESSING_TIME,
            unit="s",
            description="The time it took to run the task.",
        ),
    }

instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
from celery import registry # pylint: disable=no-name-in-module
2121
from celery.app.task import Task
2222

23+
from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
24+
MESSAGING_CLIENT_ID,
25+
MESSAGING_OPERATION_NAME,
26+
)
2327
from opentelemetry.semconv.trace import SpanAttributes
2428
from opentelemetry.trace import Span
2529

@@ -217,6 +221,14 @@ def retrieve_task_id(kwargs):
217221
return task_id
218222

219223

224+
def retrieve_request(kwargs):
    """Return the ``request`` entry of the signal ``kwargs``.

    Returns ``None``, after logging a debug message, when the entry is
    missing or is ``None``.
    """
    found = kwargs.get("request")
    if found is not None:
        return found

    logger.debug("Unable to retrieve the request from signal arguments")
    return None
230+
231+
220232
def retrieve_task_id_from_request(kwargs):
221233
# retry signal does not include task_id as argument so use request argument
222234
request = kwargs.get("request")
@@ -250,3 +262,17 @@ def retrieve_reason(kwargs):
250262
if not reason:
251263
logger.debug("Unable to retrieve the retry reason")
252264
return reason
265+
266+
267+
def get_metrics_attributes_from_request(request):
    """Build the metric attributes for ``request``.

    The operation name is taken from ``request.task.name`` and the client
    id from ``request.hostname``.
    """
    attributes = {}
    attributes[MESSAGING_OPERATION_NAME] = request.task.name
    attributes[MESSAGING_CLIENT_ID] = request.hostname
    return attributes
272+
273+
274+
def get_metrics_attributes_from_task(task):
    """Build the metric attributes for ``task``.

    The operation name is taken from ``task.name`` and the client id from
    ``task.request.hostname``.
    """
    attributes = {}
    attributes[MESSAGING_OPERATION_NAME] = task.name
    attributes[MESSAGING_CLIENT_ID] = task.request.hostname
    return attributes

instrumentation/opentelemetry-instrumentation-celery/tests/celery_test_tasks.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,18 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import time
16+
1517
from celery import Celery
1618

1719
from opentelemetry import baggage
1820

1921

2022
class Config:
21-
result_backend = "rpc"
22-
broker_backend = "memory"
23+
result_backend = "rpc://"
24+
without_gossip = True
25+
without_heartbeat = True
26+
without_mingle = True
2327

2428

2529
app = Celery(broker="memory:///")
@@ -31,8 +35,14 @@ class CustomError(Exception):
3135

3236

3337
@app.task
34-
def task_add(num_a, num_b):
35-
return num_a + num_b
38+
def task_add(x=1, y=2):
39+
return x + y
40+
41+
42+
@app.task
43+
def task_sleep(sleep_time):
    """Sleep for ``sleep_time`` seconds, then return ``1``."""
    result = 1
    time.sleep(sleep_time)
    return result
3646

3747

3848
@app.task

instrumentation/opentelemetry-instrumentation-celery/tests/test_duplicate.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,3 @@ def test_duplicate_instrumentaion(self):
2626
CeleryInstrumentor().uninstrument()
2727
self.assertIsNotNone(first.metrics)
2828
self.assertIsNotNone(second.metrics)
29-
self.assertEqual(first.task_id_to_start_time, {})
30-
self.assertEqual(second.task_id_to_start_time, {})

0 commit comments

Comments
 (0)