diff --git a/flower/events.py b/flower/events.py index cd15d7a2..68b9ad0c 100644 --- a/flower/events.py +++ b/flower/events.py @@ -85,16 +85,23 @@ def event(self, event): task_started = task.started task_received = task.received + task_revoked = task.revoked - if event_type == 'task-received' and not task.eta and task_received: - self.metrics.number_of_prefetched_tasks.labels(worker_name, task_name).inc() + if event_type == 'task-received' and not task.eta and task_received: + self.metrics.number_of_prefetched_tasks.labels(worker_name, task_name).inc() - if event_type == 'task-started' and not task.eta and task_started and task_received: - self.metrics.prefetch_time.labels(worker_name, task_name).set(task_started - task_received) - self.metrics.number_of_prefetched_tasks.labels(worker_name, task_name).dec() + if event_type == 'task-started' and not task.eta and task_started and task_received: + self.metrics.prefetch_time.labels(worker_name, task_name).set(task_started - task_received) + self.metrics.number_of_prefetched_tasks.labels(worker_name, task_name).dec() + elif event_type == 'task-revoked' and not task.eta and task_revoked and task_received: + self.metrics.prefetch_time.labels(worker_name, task_name).set(task_revoked - task_received) + self.metrics.number_of_prefetched_tasks.labels(worker_name, task_name).dec() + + if event_type in ['task-succeeded', 'task-failed'] and not task.eta and task_started and task_received: + self.metrics.prefetch_time.labels(worker_name, task_name).set(0) + elif event_type == 'task-revoked' and not task.eta and task_revoked and task_received: + self.metrics.prefetch_time.labels(worker_name, task_name).set(0) - if event_type in ['task-succeeded', 'task-failed'] and not task.eta and task_started and task_received: - self.metrics.prefetch_time.labels(worker_name, task_name).set(0) if event_type == 'worker-online': self.metrics.worker_online.labels(worker_name).set(1) diff --git a/tests/unit/utils/__init__.py b/tests/unit/utils/__init__.py index 29084345..9dbcd9d5 100644 --- a/tests/unit/utils/__init__.py +++ b/tests/unit/utils/__init__.py @@ -68,3 +68,11 @@ def task_failed_events(worker, id=None, name=None): Event('task-started', uuid=id, hostname=worker), Event('task-failed', uuid=id, exception="KeyError('foo')", traceback='line 1 at main', hostname=worker)] + +def task_revoked_events(worker, id=None, name=None): + id = id or uuid() + name = name or 'sometask' + return [Event('task-received', uuid=id, name=name, + args='(2, 2)', kwargs="{'foo': 'bar'}", + retries=0, eta=None, hostname=worker), + Event('task-revoked', uuid=id, hostname=worker)] diff --git a/tests/unit/views/test_monitor.py b/tests/unit/views/test_monitor.py index b7d00bd2..11bc9098 100644 --- a/tests/unit/views/test_monitor.py +++ b/tests/unit/views/test_monitor.py @@ -7,7 +7,7 @@ from flower.events import EventsState from tests.unit import AsyncHTTPTestCase -from tests.unit.utils import task_failed_events, task_succeeded_events +from tests.unit.utils import task_failed_events, task_succeeded_events, task_revoked_events class PrometheusTests(AsyncHTTPTestCase): @@ -119,6 +119,31 @@ def test_task_prefetch_time_metric_failed_task_resets_metric_to_zero(self): f'flower_task_prefetch_time_seconds{{task="{task_name}",worker="{worker_name}"}} 0.0' in metrics ) + def test_task_prefetch_time_metric_revoked_task_resets_metric_to_zero(self): + state = EventsState() + worker_name = 'worker1' + task_name = 'task1' + state.get_or_create_worker(worker_name) + events = task_revoked_events(worker=worker_name, name=task_name, id='123') + + task_received = time.time() + task_revoked = task_received + 3 + for i, e in enumerate(events): + e['clock'] = i + e['local_received'] = time.time() + if e['type'] == 'task-received': + e['timestamp'] = task_received + if e['type'] == 'task-revoked': + e['timestamp'] = task_revoked + state.event(e) + self.app.events.state = state + + metrics = self.get('/metrics').body.decode('utf-8') + + self.assertTrue( + f'flower_task_prefetch_time_seconds{{task="{task_name}",worker="{worker_name}"}} 0.0' in metrics + ) + def test_task_prefetch_time_metric_does_not_compute_prefetch_time_if_task_has_eta(self): state = EventsState() worker_name = 'worker2' @@ -197,6 +222,55 @@ def test_worker_prefetched_tasks_metric(self): f'flower_worker_prefetched_tasks{{task="{task_name}",worker="{worker_name}"}} 1.0' in metrics ) + def test_worker_revoked_prefetched_tasks_metric(self): + + state = EventsState() + worker_name = 'worker2' + task_name = 'task3' + task_id = uuid() + state.get_or_create_worker(worker_name) + events = [ + Event( + 'task-received', + uuid=task_id, + name=task_name, + args='(2, 2)', + kwargs="{'foo': 'bar'}", + retries=1, + eta=None, + hostname=worker_name + ), + Event( + 'task-received', + uuid=uuid(), + name=task_name, + args='(2, 2)', + kwargs="{'foo': 'bar'}", + retries=1, + eta=None, + hostname=worker_name + ), + Event('task-revoked', eta=None, uuid=task_id, hostname=worker_name), + ] + + task_received = time.time() + task_revoked = task_received + 3 + for i, e in enumerate(events): + e['clock'] = i + e['local_received'] = time.time() + if e['type'] == 'task-received': + e['timestamp'] = task_received + if e['type'] == 'task-revoked': + e['timestamp'] = task_revoked + state.event(e) + self.app.events.state = state + + metrics = self.get('/metrics').body.decode('utf-8') + + self.assertTrue( + f'flower_worker_prefetched_tasks{{task="{task_name}",worker="{worker_name}"}} 1.0' in metrics + ) + class HealthcheckTests(AsyncHTTPTestCase): def setUp(self):