diff --git a/flower/api/tasks.py b/flower/api/tasks.py index 85c3f358c..f163fc58a 100644 --- a/flower/api/tasks.py +++ b/flower/api/tasks.py @@ -398,27 +398,7 @@ def get(self): :statuscode 401: unauthorized request :statuscode 503: result backend is not configured """ - app = self.application - broker_options = self.capp.conf.BROKER_TRANSPORT_OPTIONS - - http_api = None - if app.transport == 'amqp' and app.options.broker_api: - http_api = app.options.broker_api - - broker_use_ssl = None - if self.capp.conf.BROKER_USE_SSL: - broker_use_ssl = self.capp.conf.BROKER_USE_SSL - - broker = Broker(app.capp.connection().as_uri(include_password=True), - http_api=http_api, broker_options=broker_options, broker_use_ssl=broker_use_ssl) - - queue_names = self.get_active_queue_names() - - if not queue_names: - queue_names = set([self.capp.conf.CELERY_DEFAULT_QUEUE]) |\ - set([q.name for q in self.capp.conf.CELERY_QUEUES or [] if q.name]) - - queues = yield broker.queues(sorted(queue_names)) + queues = yield self.get_active_queue_lengths() self.write({'active_queues': queues}) diff --git a/flower/app.py b/flower/app.py index 17284a3ca..05fd7a727 100644 --- a/flower/app.py +++ b/flower/app.py @@ -8,21 +8,24 @@ import celery import tornado.web +import tornado.gen from tornado import ioloop from tornado.concurrent import run_on_executor from tornado.httpserver import HTTPServer +from tornado.ioloop import PeriodicCallback, IOLoop from tornado.web import url from .api import control from .urls import handlers as default_handlers -from .events import Events +from .events import Events, get_prometheus_metrics from .inspector import Inspector from .options import default_options - +from .utils.broker import get_active_queue_lengths logger = logging.getLogger(__name__) - +# TODO: does BROKER_METRICS_UPDATE_INTERVAL need to be configuration from options? +BROKER_METRICS_UPDATE_INTERVAL_SECONDS = 10 if sys.version_info[0]==3 and sys.version_info[1] >= 8 and sys.platform.startswith('win'): import asyncio @@ -69,6 +72,7 @@ def __init__(self, options=None, capp=None, events=None, max_workers_in_memory=self.options.max_workers, max_tasks_in_memory=self.options.max_tasks) self.started = False + self.io_loop.spawn_callback(self.update_broker_metrics) def start(self): self.events.start() @@ -106,3 +110,16 @@ def workers(self): def update_workers(self, workername=None): return self.inspector.inspect(workername) + + @tornado.gen.coroutine + def update_broker_metrics(self): + while True: + next_call = tornado.gen.sleep(BROKER_METRICS_UPDATE_INTERVAL_SECONDS) + try: + active_queues = yield get_active_queue_lengths(self) + metrics = get_prometheus_metrics() + for queue_entry in active_queues: + metrics.queue_length.labels(queue_entry["name"]).set(queue_entry["messages"]) + except Exception as e: + logger.warning("Updating broker metrics failed with %s", repr(e)) + yield next_call diff --git a/flower/events.py b/flower/events.py index 2268f2fd2..af23db33c 100644 --- a/flower/events.py +++ b/flower/events.py @@ -60,6 +60,7 @@ def __init__(self): "Number of tasks currently executing at a worker", ['worker'] ) + self.queue_length = Gauge('flower_broker_queue_length', "Broker queue length", ['name']) class EventsState(State): diff --git a/flower/utils/broker.py b/flower/utils/broker.py index 0818e12d6..20245a92a 100644 --- a/flower/utils/broker.py +++ b/flower/utils/broker.py @@ -126,7 +126,7 @@ def queues(self, names): 'name': name, 'messages': sum([self.redis.llen(x) for x in priority_names]) }) - raise gen.Return(queue_stats) + return queue_stats class Redis(RedisBase): @@ -256,6 +256,41 @@ def queues(self, names): raise NotImplementedError +def get_active_queue_names(application): + queues = set([]) + for _, info in application.workers.items(): + for q in info.get('active_queues', []): + queues.add(q['name']) + return queues + + +@gen.coroutine +def get_active_queue_lengths(application): + app = application + capp = application.capp + broker_options = capp.conf.BROKER_TRANSPORT_OPTIONS + + http_api = None + if app.transport == 'amqp' and app.options.broker_api: + http_api = app.options.broker_api + + broker_use_ssl = None + if capp.conf.BROKER_USE_SSL: + broker_use_ssl = capp.conf.BROKER_USE_SSL + + broker = Broker(app.capp.connection().as_uri(include_password=True), + http_api=http_api, broker_options=broker_options, broker_use_ssl=broker_use_ssl) + + queue_names = get_active_queue_names(application) + + if not queue_names: + queue_names = set([capp.conf.CELERY_DEFAULT_QUEUE]) | \ + set([q.name for q in capp.conf.CELERY_QUEUES or [] if q.name]) + + queues = yield broker.queues(sorted(queue_names)) + raise gen.Return(queues) + + @gen.coroutine def main(): broker_url = sys.argv[1] if len(sys.argv) > 1 else 'amqp://' diff --git a/flower/views/__init__.py b/flower/views/__init__.py index 50946d25e..c5459e941 100644 --- a/flower/views/__init__.py +++ b/flower/views/__init__.py @@ -8,8 +8,11 @@ from base64 import b64decode import tornado +import tornado.gen +from tornado import gen from ..utils import template, bugreport, prepend_url +from ..utils.broker import get_active_queue_names, get_active_queue_lengths logger = logging.getLogger(__name__) @@ -122,8 +125,9 @@ def format_task(self, task): return task def get_active_queue_names(self): - queues = set([]) - for _, info in self.application.workers.items(): - for q in info.get('active_queues', []): - queues.add(q['name']) - return queues + return get_active_queue_names(self.application) + + @tornado.gen.coroutine + def get_active_queue_lengths(self): + queues = yield get_active_queue_lengths(self.application) + raise gen.Return(queues)