Skip to content

start porting django celery monitoring codes #322

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions django_celery_results/camera.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""The Celery events camera."""

from datetime import timedelta

from celery import states
from celery.events.snapshot import Polaroid
from celery.utils.imports import symbol_by_name
from celery.utils.log import get_logger
from celery.utils.time import maybe_iso8601

from .utils import correct_awareness, fromtimestamp

WORKER_UPDATE_FREQ = 60 # limit worker timestamp write freq.
SUCCESS_STATES = frozenset([states.SUCCESS])

NOT_SAVED_ATTRIBUTES = frozenset(['name', 'args', 'kwargs', 'eta'])

logger = get_logger(__name__)
debug = logger.debug


class Camera(Polaroid):
"""The Celery events Polaroid snapshot camera."""

clear_after = True
worker_update_freq = WORKER_UPDATE_FREQ

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Expiry can be timedelta or None for never expire.
self.app.add_defaults({
'monitors_expire_success': timedelta(days=1),
'monitors_expire_error': timedelta(days=3),
'monitors_expire_pending': timedelta(days=5),
})

@property
def TaskState(self):
"""Return the data model to store task state in."""
return symbol_by_name('django_celery_monitor.models.TaskState')

@property
def WorkerState(self):
"""Return the data model to store worker state in."""
return symbol_by_name('django_celery_monitor.models.WorkerState')

def django_setup(self):
import django
django.setup()

def install(self):
super().install()
self.django_setup()

@property
def expire_task_states(self):
"""Return a twople of Celery task states and expiration timedeltas."""
return (
(SUCCESS_STATES, self.app.conf.monitors_expire_success),
(states.EXCEPTION_STATES, self.app.conf.monitors_expire_error),
(states.UNREADY_STATES, self.app.conf.monitors_expire_pending),
)

def get_heartbeat(self, worker):
try:
heartbeat = worker.heartbeats[-1]
except IndexError:
return
return fromtimestamp(heartbeat)

def handle_worker(self, hostname_worker):
hostname, worker = hostname_worker
return self.WorkerState.objects.update_heartbeat(
hostname,
heartbeat=self.get_heartbeat(worker),
update_freq=self.worker_update_freq,
)

def handle_task(self, uuid_task, worker=None):
"""Handle snapshotted event."""
uuid, task = uuid_task
if task.worker and task.worker.hostname:
worker = self.handle_worker(
(task.worker.hostname, task.worker),
)

defaults = {
'name': task.name,
'args': task.args,
'kwargs': task.kwargs,
'eta': correct_awareness(maybe_iso8601(task.eta)),
'expires': correct_awareness(maybe_iso8601(task.expires)),
'state': task.state,
'tstamp': fromtimestamp(task.timestamp),
'result': task.result or task.exception,
'traceback': task.traceback,
'runtime': task.runtime,
'worker': worker
}
# Some fields are only stored in the RECEIVED event,
# so we should remove these from default values,
# so that they are not overwritten by subsequent states.
[defaults.pop(attr, None) for attr in NOT_SAVED_ATTRIBUTES
if defaults[attr] is None]
return self.update_task(task.state, task_id=uuid, defaults=defaults)

def update_task(self, state, task_id, defaults=None):
defaults = defaults or {}
if not defaults.get('name'):
return
return self.TaskState.objects.update_state(
state=state,
task_id=task_id,
defaults=defaults,
)

def on_shutter(self, state):

def _handle_tasks():
for i, task in enumerate(state.tasks.items()):
self.handle_task(task)

for worker in state.workers.items():
self.handle_worker(worker)
_handle_tasks()

def on_cleanup(self):
expired = (
self.TaskState.objects.expire_by_states(states, expires)
for states, expires in self.expire_task_states
)
dirty = sum(item for item in expired if item is not None)
if dirty:
debug('Cleanup: Marked %s objects as dirty.', dirty)
self.TaskState.objects.purge()
debug('Cleanup: %s objects purged.', dirty)
return dirty
return 0
84 changes: 84 additions & 0 deletions django_celery_results/humanize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""Some helpers to humanize values."""

from datetime import datetime

from django.utils.timezone import now
from django.utils.translation import ugettext as _
from django.utils.translation import ungettext


def pluralize_year(n):
"""Return a string with the number of yeargs ago."""
return ungettext(_('{num} year ago'), _('{num} years ago'), n)


def pluralize_month(n):
"""Return a string with the number of months ago."""
return ungettext(_('{num} month ago'), _('{num} months ago'), n)


def pluralize_week(n):
"""Return a string with the number of weeks ago."""
return ungettext(_('{num} week ago'), _('{num} weeks ago'), n)


def pluralize_day(n):
"""Return a string with the number of days ago."""
return ungettext(_('{num} day ago'), _('{num} days ago'), n)


OLDER_CHUNKS = (
(365.0, pluralize_year),
(30.0, pluralize_month),
(7.0, pluralize_week),
(1.0, pluralize_day),
)


def naturaldate(date, include_seconds=False):
"""Convert datetime into a human natural date string."""
if not date:
return ''

right_now = now()
today = datetime(right_now.year, right_now.month,
right_now.day, tzinfo=right_now.tzinfo)
delta = right_now - date
delta_midnight = today - date

days = delta.days
hours = delta.seconds // 3600
minutes = delta.seconds // 60
seconds = delta.seconds

if days < 0:
return _('just now')

if days == 0:
if hours == 0:
if minutes > 0:
return ungettext(
_('{minutes} minute ago'),
_('{minutes} minutes ago'), minutes
).format(minutes=minutes)
else:
if include_seconds and seconds:
return ungettext(
_('{seconds} second ago'),
_('{seconds} seconds ago'), seconds
).format(seconds=seconds)
return _('just now')
else:
return ungettext(
_('{hours} hour ago'), _('{hours} hours ago'), hours
).format(hours=hours)

if delta_midnight.days == 0:
return _('yesterday at {time}').format(time=date.strftime('%H:%M'))

count = 0
for chunk, pluralizefun in OLDER_CHUNKS:
if days >= chunk:
count = int(round((delta_midnight.days + 1) / chunk, 0))
fmt = pluralizefun(count)
return fmt.format(num=count)