From 7d6dad8f017f8806e8727d14a652f1f7758a06f0 Mon Sep 17 00:00:00 2001 From: Asif Saif Uddin Date: Tue, 21 Jun 2022 11:59:25 +0600 Subject: [PATCH 1/2] start porting django celery monitoring codes --- django_celery_results/camera.py | 139 ++++++++++++++++++++++++++++++ django_celery_results/humanize.py | 84 ++++++++++++++++++ 2 files changed, 223 insertions(+) create mode 100644 django_celery_results/camera.py create mode 100644 django_celery_results/humanize.py diff --git a/django_celery_results/camera.py b/django_celery_results/camera.py new file mode 100644 index 00000000..08f3a6c9 --- /dev/null +++ b/django_celery_results/camera.py @@ -0,0 +1,139 @@ +"""The Celery events camera.""" +from __future__ import absolute_import, unicode_literals + +from datetime import timedelta + +from celery import states +from celery.events.snapshot import Polaroid +from celery.utils.imports import symbol_by_name +from celery.utils.log import get_logger +from celery.utils.time import maybe_iso8601 + +from .utils import fromtimestamp, correct_awareness + +WORKER_UPDATE_FREQ = 60 # limit worker timestamp write freq. +SUCCESS_STATES = frozenset([states.SUCCESS]) + +NOT_SAVED_ATTRIBUTES = frozenset(['name', 'args', 'kwargs', 'eta']) + +logger = get_logger(__name__) +debug = logger.debug + + +class Camera(Polaroid): + """The Celery events Polaroid snapshot camera.""" + + clear_after = True + worker_update_freq = WORKER_UPDATE_FREQ + + def __init__(self, *args, **kwargs): + super(Camera, self).__init__(*args, **kwargs) + # Expiry can be timedelta or None for never expire. + self.app.add_defaults({ + 'monitors_expire_success': timedelta(days=1), + 'monitors_expire_error': timedelta(days=3), + 'monitors_expire_pending': timedelta(days=5), + }) + + @property + def TaskState(self): + """Return the data model to store task state in.""" + return symbol_by_name('django_celery_monitor.models.TaskState') + + @property + def WorkerState(self): + """Return the data model to store worker state in.""" + return symbol_by_name('django_celery_monitor.models.WorkerState') + + def django_setup(self): + import django + django.setup() + + def install(self): + super(Camera, self).install() + self.django_setup() + + @property + def expire_task_states(self): + """Return a twople of Celery task states and expiration timedeltas.""" + return ( + (SUCCESS_STATES, self.app.conf.monitors_expire_success), + (states.EXCEPTION_STATES, self.app.conf.monitors_expire_error), + (states.UNREADY_STATES, self.app.conf.monitors_expire_pending), + ) + + def get_heartbeat(self, worker): + try: + heartbeat = worker.heartbeats[-1] + except IndexError: + return + return fromtimestamp(heartbeat) + + def handle_worker(self, hostname_worker): + hostname, worker = hostname_worker + return self.WorkerState.objects.update_heartbeat( + hostname, + heartbeat=self.get_heartbeat(worker), + update_freq=self.worker_update_freq, + ) + + def handle_task(self, uuid_task, worker=None): + """Handle snapshotted event.""" + uuid, task = uuid_task + if task.worker and task.worker.hostname: + worker = self.handle_worker( + (task.worker.hostname, task.worker), + ) + + defaults = { + 'name': task.name, + 'args': task.args, + 'kwargs': task.kwargs, + 'eta': correct_awareness(maybe_iso8601(task.eta)), + 'expires': correct_awareness(maybe_iso8601(task.expires)), + 'state': task.state, + 'tstamp': fromtimestamp(task.timestamp), + 'result': task.result or task.exception, + 'traceback': task.traceback, + 'runtime': task.runtime, + 'worker': worker + } + # Some fields are only stored in the RECEIVED event, + # so we should remove these from default values, + # so that they are not overwritten by subsequent states. + [defaults.pop(attr, None) for attr in NOT_SAVED_ATTRIBUTES + if defaults[attr] is None] + return self.update_task(task.state, task_id=uuid, defaults=defaults) + + def update_task(self, state, task_id, defaults=None): + defaults = defaults or {} + if not defaults.get('name'): + return + return self.TaskState.objects.update_state( + state=state, + task_id=task_id, + defaults=defaults, + ) + + def on_shutter(self, state): + + def _handle_tasks(): + for i, task in enumerate(state.tasks.items()): + self.handle_task(task) + + for worker in state.workers.items(): + self.handle_worker(worker) + _handle_tasks() + + def on_cleanup(self): + expired = ( + self.TaskState.objects.expire_by_states(states, expires) + for states, expires in self.expire_task_states + ) + dirty = sum(item for item in expired if item is not None) + if dirty: + debug('Cleanup: Marked %s objects as dirty.', dirty) + self.TaskState.objects.purge() + debug('Cleanup: %s objects purged.', dirty) + return dirty + return 0 diff --git a/django_celery_results/humanize.py b/django_celery_results/humanize.py new file mode 100644 index 00000000..1969c70e --- /dev/null +++ b/django_celery_results/humanize.py @@ -0,0 +1,84 @@ +"""Some helpers to humanize values.""" +from __future__ import absolute_import, unicode_literals + +from datetime import datetime + +from django.utils.translation import ungettext, ugettext as _ +from django.utils.timezone import now + + +def pluralize_year(n): + """Return a string with the number of yeargs ago.""" + return ungettext(_('{num} year ago'), _('{num} years ago'), n) + + +def pluralize_month(n): + """Return a string with the number of months ago.""" + return ungettext(_('{num} month ago'), _('{num} months ago'), n) + + +def pluralize_week(n): + """Return a string with the number of weeks ago.""" + return ungettext(_('{num} week ago'), _('{num} weeks ago'), n) + + +def pluralize_day(n): + """Return a string with the number of days ago.""" + return ungettext(_('{num} day ago'), _('{num} days ago'), n) + + +OLDER_CHUNKS = ( + (365.0, pluralize_year), + (30.0, pluralize_month), + (7.0, pluralize_week), + (1.0, pluralize_day), +) + + +def naturaldate(date, include_seconds=False): + """Convert datetime into a human natural date string.""" + if not date: + return '' + + right_now = now() + today = datetime(right_now.year, right_now.month, + right_now.day, tzinfo=right_now.tzinfo) + delta = right_now - date + delta_midnight = today - date + + days = delta.days + hours = delta.seconds // 3600 + minutes = delta.seconds // 60 + seconds = delta.seconds + + if days < 0: + return _('just now') + + if days == 0: + if hours == 0: + if minutes > 0: + return ungettext( + _('{minutes} minute ago'), + _('{minutes} minutes ago'), minutes + ).format(minutes=minutes) + else: + if include_seconds and seconds: + return ungettext( + _('{seconds} second ago'), + _('{seconds} seconds ago'), seconds + ).format(seconds=seconds) + return _('just now') + else: + return ungettext( + _('{hours} hour ago'), _('{hours} hours ago'), hours + ).format(hours=hours) + + if delta_midnight.days == 0: + return _('yesterday at {time}').format(time=date.strftime('%H:%M')) + + count = 0 + for chunk, pluralizefun in OLDER_CHUNKS: + if days >= chunk: + count = int(round((delta_midnight.days + 1) / chunk, 0)) + fmt = pluralizefun(count) + return fmt.format(num=count) From 15801d319ef4e0e33a8b1e6e27f335785fd207b2 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 21 Jun 2022 06:00:56 +0000 Subject: [PATCH 2/2] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- django_celery_results/camera.py | 7 +++---- django_celery_results/humanize.py | 4 ++-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/django_celery_results/camera.py b/django_celery_results/camera.py index 08f3a6c9..9fc0f9f0 100644 --- a/django_celery_results/camera.py +++ b/django_celery_results/camera.py @@ -1,5 +1,4 @@ """The Celery events camera.""" -from __future__ import absolute_import, unicode_literals from datetime import timedelta @@ -9,7 +8,7 @@ from celery.utils.log import get_logger from celery.utils.time import maybe_iso8601 -from .utils import fromtimestamp, correct_awareness +from .utils import correct_awareness, fromtimestamp WORKER_UPDATE_FREQ = 60 # limit worker timestamp write freq. SUCCESS_STATES = frozenset([states.SUCCESS]) @@ -27,7 +26,7 @@ class Camera(Polaroid): worker_update_freq = WORKER_UPDATE_FREQ def __init__(self, *args, **kwargs): - super(Camera, self).__init__(*args, **kwargs) + super().__init__(*args, **kwargs) # Expiry can be timedelta or None for never expire. self.app.add_defaults({ 'monitors_expire_success': timedelta(days=1), @@ -50,7 +49,7 @@ def django_setup(self): django.setup() def install(self): - super(Camera, self).install() + super().install() self.django_setup() @property diff --git a/django_celery_results/humanize.py b/django_celery_results/humanize.py index 1969c70e..fd053571 100644 --- a/django_celery_results/humanize.py +++ b/django_celery_results/humanize.py @@ -1,10 +1,10 @@ """Some helpers to humanize values.""" -from __future__ import absolute_import, unicode_literals from datetime import datetime -from django.utils.translation import ungettext, ugettext as _ from django.utils.timezone import now +from django.utils.translation import ugettext as _ +from django.utils.translation import ungettext def pluralize_year(n):