Skip to content

Commit

Permalink
Expose queue length as a prometheus metric
Browse files Browse the repository at this point in the history
Implements #1173
  • Loading branch information
HTRafal committed Apr 17, 2023
1 parent a4fc7c9 commit 933db84
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 30 deletions.
22 changes: 1 addition & 21 deletions flower/api/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,27 +398,7 @@ def get(self):
:statuscode 401: unauthorized request
:statuscode 503: result backend is not configured
"""
app = self.application
broker_options = self.capp.conf.BROKER_TRANSPORT_OPTIONS

http_api = None
if app.transport == 'amqp' and app.options.broker_api:
http_api = app.options.broker_api

broker_use_ssl = None
if self.capp.conf.BROKER_USE_SSL:
broker_use_ssl = self.capp.conf.BROKER_USE_SSL

broker = Broker(app.capp.connection().as_uri(include_password=True),
http_api=http_api, broker_options=broker_options, broker_use_ssl=broker_use_ssl)

queue_names = self.get_active_queue_names()

if not queue_names:
queue_names = set([self.capp.conf.CELERY_DEFAULT_QUEUE]) |\
set([q.name for q in self.capp.conf.CELERY_QUEUES or [] if q.name])

queues = yield broker.queues(sorted(queue_names))
queues = yield self.get_active_queue_lengths()
self.write({'active_queues': queues})


Expand Down
23 changes: 20 additions & 3 deletions flower/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,24 @@

import celery
import tornado.web
import tornado.gen

from tornado import ioloop
from tornado.concurrent import run_on_executor
from tornado.httpserver import HTTPServer
from tornado.ioloop import PeriodicCallback, IOLoop
from tornado.web import url

from .api import control
from .urls import handlers as default_handlers
from .events import Events
from .events import Events, get_prometheus_metrics
from .inspector import Inspector
from .options import default_options

from .utils.broker import get_active_queue_lengths

logger = logging.getLogger(__name__)

# TODO: does BROKER_METRICS_UPDATE_INTERVAL need to be configuration from options?
BROKER_METRICS_UPDATE_INTERVAL_SECONDS = 10

if sys.version_info[0]==3 and sys.version_info[1] >= 8 and sys.platform.startswith('win'):
import asyncio
Expand Down Expand Up @@ -69,6 +72,7 @@ def __init__(self, options=None, capp=None, events=None,
max_workers_in_memory=self.options.max_workers,
max_tasks_in_memory=self.options.max_tasks)
self.started = False
self.io_loop.spawn_callback(self.update_broker_metrics)

def start(self):
self.events.start()
Expand Down Expand Up @@ -106,3 +110,16 @@ def workers(self):

def update_workers(self, workername=None):
return self.inspector.inspect(workername)

@tornado.gen.coroutine
def update_broker_metrics(self):
while True:
next_call = tornado.gen.sleep(BROKER_METRICS_UPDATE_INTERVAL_SECONDS)
try:
active_queues = yield get_active_queue_lengths(self)
metrics = get_prometheus_metrics()
for queue_entry in active_queues:
metrics.queue_length.labels(queue_entry["name"]).set(queue_entry["messages"])
except Exception as e:
logger.warning("Updating broker metrics failed with %s", repr(e))
yield next_call
1 change: 1 addition & 0 deletions flower/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def __init__(self):
"Number of tasks currently executing at a worker",
['worker']
)
self.queue_length = Gauge('flower_broker_queue_length', "Broker queue length", ['name'])


class EventsState(State):
Expand Down
37 changes: 36 additions & 1 deletion flower/utils/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def queues(self, names):
'name': name,
'messages': sum([self.redis.llen(x) for x in priority_names])
})
raise gen.Return(queue_stats)
return queue_stats


class Redis(RedisBase):
Expand Down Expand Up @@ -256,6 +256,41 @@ def queues(self, names):
raise NotImplementedError


def get_active_queue_names(application):
queues = set([])
for _, info in application.workers.items():
for q in info.get('active_queues', []):
queues.add(q['name'])
return queues


@gen.coroutine
def get_active_queue_lengths(application):
app = application
capp = application.capp
broker_options = capp.conf.BROKER_TRANSPORT_OPTIONS

http_api = None
if app.transport == 'amqp' and app.options.broker_api:
http_api = app.options.broker_api

broker_use_ssl = None
if capp.conf.BROKER_USE_SSL:
broker_use_ssl = capp.conf.BROKER_USE_SSL

broker = Broker(app.capp.connection().as_uri(include_password=True),
http_api=http_api, broker_options=broker_options, broker_use_ssl=broker_use_ssl)

queue_names = get_active_queue_names(application)

if not queue_names:
queue_names = set([capp.conf.CELERY_DEFAULT_QUEUE]) | \
set([q.name for q in capp.conf.CELERY_QUEUES or [] if q.name])

queues = yield broker.queues(sorted(queue_names))
raise gen.Return(queues)


@gen.coroutine
def main():
broker_url = sys.argv[1] if len(sys.argv) > 1 else 'amqp://'
Expand Down
14 changes: 9 additions & 5 deletions flower/views/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
from base64 import b64decode

import tornado
import tornado.gen
from tornado import gen

from ..utils import template, bugreport, prepend_url
from ..utils.broker import get_active_queue_names, get_active_queue_lengths

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -122,8 +125,9 @@ def format_task(self, task):
return task

def get_active_queue_names(self):
queues = set([])
for _, info in self.application.workers.items():
for q in info.get('active_queues', []):
queues.add(q['name'])
return queues
return get_active_queue_names(self.application)

@tornado.gen.coroutine
def get_active_queue_lengths(self):
queues = yield get_active_queue_lengths(self.application)
raise gen.Return(queues)

0 comments on commit 933db84

Please sign in to comment.