Skip to content

Commit 17cba33

Browse files
add command to check for celery health (#290)
1 parent 25f0995 commit 17cba33

3 files changed

Lines changed: 70 additions & 0 deletions

File tree

django_project/health/management/__init__.py

Whitespace-only changes.

django_project/health/management/commands/__init__.py

Whitespace-only changes.
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# coding=utf-8
2+
"""Command for celery health check."""
3+
4+
from django.core.management.base import BaseCommand
5+
from celery import current_app
6+
import sys
7+
8+
9+
class Command(BaseCommand):
10+
"""Django management command to check Celery worker health."""
11+
12+
help = 'Check Celery worker health and readiness'
13+
14+
def add_arguments(self, parser):
15+
"""Add command line arguments for health check type and timeout."""
16+
parser.add_argument(
17+
'--check',
18+
type=str,
19+
choices=['liveness', 'readiness'],
20+
required=True,
21+
help='Type of health check to perform'
22+
)
23+
parser.add_argument(
24+
'--timeout',
25+
type=int,
26+
default=5,
27+
help='Timeout in seconds for the health check'
28+
)
29+
30+
def handle(self, *args, **options):
31+
"""Handle the health check command."""
32+
check_type = options['check']
33+
timeout = options['timeout']
34+
35+
try:
36+
if check_type == 'liveness':
37+
self.check_liveness(timeout)
38+
elif check_type == 'readiness':
39+
self.check_readiness(timeout)
40+
41+
self.stdout.write(self.style.SUCCESS(f'{check_type} check passed'))
42+
sys.exit(0)
43+
except Exception as e:
44+
self.stderr.write(
45+
self.style.ERROR(f'{check_type} check failed: {str(e)}')
46+
)
47+
sys.exit(1)
48+
49+
def check_liveness(self, timeout):
50+
"""Check if worker is alive and responding."""
51+
inspect = current_app.control.inspect(timeout=timeout)
52+
pong = inspect.ping()
53+
54+
if not pong:
55+
raise Exception("No active workers found")
56+
57+
# pong returns: {'worker@hostname': {'ok': 'pong'}}
58+
if not any('ok' in response for response in pong.values()):
59+
raise Exception("Workers not responding properly")
60+
61+
def check_readiness(self, timeout):
62+
"""Check if worker can connect to broker."""
63+
conn = current_app.connection()
64+
conn.ensure_connection(
65+
max_retries=3,
66+
interval_start=0,
67+
interval_step=1,
68+
timeout=timeout
69+
)
70+
conn.release()

0 commit comments

Comments
 (0)