Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -540,3 +540,14 @@ Default: None
Sets the URI to which an OAuth 2.0 server redirects the user after successful authentication and authorization.

`oauth2_redirect_uri` option should be used with :ref:`auth`, :ref:`auth_provider`, :ref:`oauth2_key` and :ref:`oauth2_secret` options.

.. _queue_cache_ttl:

queue_cache_ttl
~~~~~~~~~~~~~~~

Default: 5.0

TTL in seconds for caching broker queue stats. Set to 0 to disable caching.
When many queues are configured (e.g. 10,000+), caching avoids re-fetching
queue lengths from the broker on every page load or API call.
38 changes: 33 additions & 5 deletions flower/api/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,17 +391,45 @@ async def get(self):
:statuscode 503: result backend is not configured
"""
app = self.application
limit = self.get_argument('limit', default=None, type=int)
offset = self.get_argument('offset', default=0, type=int)
offset = max(offset, 0)

if limit is not None and limit < 0:
raise HTTPError(400, "Query argument 'limit' must be a non-negative integer")

http_api = None
if app.transport == 'amqp' and app.options.broker_api:
http_api = app.options.broker_api

broker = Broker(app.capp.connection().as_uri(include_password=True),
http_api=http_api, broker_options=self.capp.conf.broker_transport_options,
broker_use_ssl=self.capp.conf.broker_use_ssl)
queue_names = self.get_active_queue_names()
names_key = frozenset(queue_names)

# Check cache first
queues = app.get_cached_queue_stats(names_key)
if queues is None:
with app.capp.connection() as conn:
broker_uri = conn.as_uri(include_password=True)
broker = Broker(broker_uri,
http_api=http_api, broker_options=self.capp.conf.broker_transport_options,
broker_use_ssl=self.capp.conf.broker_use_ssl)

try:
queues = await broker.queues(queue_names)
app.set_queue_cache(names_key, queues)
finally:
if hasattr(broker, 'close'):
broker.close()

total = len(queues)

# Apply pagination
if offset:
queues = queues[offset:]
if limit is not None:
queues = queues[:limit]

queues = await broker.queues(self.get_active_queue_names())
self.write({'active_queues': queues})
self.write({'active_queues': queues, 'total': total})


class ListTasks(BaseTaskHandler):
Expand Down
20 changes: 20 additions & 0 deletions flower/app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import copy
import sys
import logging
import time

from concurrent.futures import ThreadPoolExecutor

Expand Down Expand Up @@ -64,6 +66,8 @@ def __init__(self, options=None, capp=None, events=None,
max_workers_in_memory=self.options.max_workers,
max_tasks_in_memory=self.options.max_tasks)
self.started = False
self._queue_cache = None # (timestamp, frozenset(names), result)
self._queue_cache_ttl = getattr(self.options, 'queue_cache_ttl', 5.0)

def start(self):
self.events.start()
Expand Down Expand Up @@ -101,3 +105,19 @@ def workers(self):

def update_workers(self, workername=None):
return self.inspector.inspect(workername)

def get_cached_queue_stats(self, names_key):
    """Return a deep copy of the cached queue stats for *names_key*.

    A hit requires caching to be enabled (TTL > 0), an entry to exist,
    the cached key to match exactly, and the entry to still be fresh.
    Returns None on any miss.  The deep copy prevents callers from
    mutating the cached data in place.
    """
    if self._queue_cache_ttl <= 0:
        return None
    entry = self._queue_cache
    if entry is None:
        return None
    stored_at, stored_key, stats = entry
    is_fresh = (time.time() - stored_at) < self._queue_cache_ttl
    if stored_key != names_key or not is_fresh:
        return None
    return copy.deepcopy(stats)

def set_queue_cache(self, names_key, result):
    """Remember *result* for *names_key*, timestamped now.

    No-op when caching is disabled (TTL <= 0).  The result is stored
    by reference; get_cached_queue_stats() copies on the way out.
    """
    if self._queue_cache_ttl <= 0:
        return
    self._queue_cache = (time.time(), names_key, result)
2 changes: 2 additions & 0 deletions flower/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
define("url_prefix", type=str, help="base url prefix")
define("task_runtime_metric_buckets", type=float, default=Histogram.DEFAULT_BUCKETS,
multiple=True, help="histogram latency bucket value")
define("queue_cache_ttl", type=float, default=5.0,
help="TTL in seconds for caching broker queue stats (0 to disable)")


default_options = options
54 changes: 43 additions & 11 deletions flower/utils/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,18 @@ async def queues(self, names):
try:
response = await http_client.fetch(
url, auth_username=username, auth_password=password,
connect_timeout=1.0, request_timeout=2.0,
connect_timeout=5.0, request_timeout=30.0,
validate_cert=False)
except (socket.error, httpclient.HTTPError) as e:
logger.error("RabbitMQ management API call failed: %s", e)
return []
finally:
http_client.close()

if response.code == 200:
info = json.loads(response.body.decode())
return [x for x in info if x['name'] in names]
names_set = frozenset(names)
return [x for x in info if x['name'] in names_set]
response.rethrow()
return []

@classmethod
def validate_http_api(cls, http_api):
Expand All @@ -102,21 +102,53 @@ def __init__(self, broker_url, *_, **kwargs):
self.sep = broker_options.get('sep', self.DEFAULT_SEP)
self.broker_prefix = broker_options.get('global_keyprefix', '')

def close(self):
    """Release the Redis client and its resources; failures are logged, not raised."""
    client = self.redis
    if client is None:
        return
    try:
        # Newer redis-py clients expose close(); fall back to draining
        # the connection pool for older versions.
        if hasattr(client, 'close'):
            client.close()
        elif hasattr(client, 'connection_pool'):
            client.connection_pool.disconnect()
    except Exception:
        # Best-effort cleanup: never let a close error propagate.
        logger.debug("Error closing Redis connection", exc_info=True)
    self.redis = None

def _q_for_pri(self, queue, pri):
if pri not in self.priority_steps:
raise ValueError('Priority not in priority steps')
# pylint: disable=consider-using-f-string
return '{0}{1}{2}'.format(*((queue, self.sep, pri) if pri else (queue, '', '')))

_PIPELINE_CHUNK_SIZE = 5000

async def queues(self, names):
    """Return per-queue message counts for the Redis broker.

    :param names: iterable of queue names to inspect.
    :returns: list of ``{'name': <queue>, 'messages': <int>}`` dicts,
        one per name, in the same order as *names*.

    Each queue may be split across one key per priority step, so LLEN
    is issued for every (name, priority) pair.  The calls are batched
    through non-transactional pipelines in fixed-size chunks so that a
    very large queue count does not build one enormous pipeline.
    """
    if not names:
        return []

    steps = len(self.priority_steps)

    # Build all Redis key names upfront: steps keys per queue, grouped
    # so results for queue i occupy the slice [i*steps, (i+1)*steps).
    all_keys = []
    for name in names:
        for pri in self.priority_steps:
            all_keys.append(self.broker_prefix + self._q_for_pri(name, pri))

    # Execute pipelined LLEN in chunks to avoid overwhelming Redis
    # with a single huge pipeline for very large queue counts.
    all_results = []
    chunk_size = self._PIPELINE_CHUNK_SIZE
    for start in range(0, len(all_keys), chunk_size):
        pipe = self.redis.pipeline(transaction=False)
        for key in all_keys[start:start + chunk_size]:
            pipe.llen(key)
        all_results.extend(pipe.execute())

    # Sum the per-priority lengths back into one count per queue.
    queue_stats = []
    for i, name in enumerate(names):
        offset = i * steps
        total = sum(all_results[offset:offset + steps])
        queue_stats.append({'name': name, 'messages': total})
    return queue_stats


Expand Down
42 changes: 29 additions & 13 deletions flower/views/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,35 @@ async def get(self):
if app.transport == 'amqp' and app.options.broker_api:
http_api = app.options.broker_api

try:
broker = Broker(app.capp.connection(connect_timeout=1.0).as_uri(include_password=True),
http_api=http_api, broker_options=self.capp.conf.broker_transport_options,
broker_use_ssl=self.capp.conf.broker_use_ssl)
except NotImplementedError as exc:
raise web.HTTPError(
404, f"'{app.transport}' broker is not supported") from exc

try:
queues = await broker.queues(self.get_active_queue_names())
except Exception as e:
logger.error("Unable to get queues: '%s'", e)
queue_names = self.get_active_queue_names()
names_key = frozenset(queue_names)

# Get broker URI once — reuse for both Broker creation and display
with app.capp.connection(connect_timeout=1.0) as conn:
broker_uri = conn.as_uri(include_password=True)
broker_url = conn.as_uri()

# Check cache first
queues = app.get_cached_queue_stats(names_key)
if queues is None:
try:
broker = Broker(broker_uri,
http_api=http_api, broker_options=self.capp.conf.broker_transport_options,
broker_use_ssl=self.capp.conf.broker_use_ssl)
except NotImplementedError as exc:
raise web.HTTPError(
404, f"'{app.transport}' broker is not supported") from exc

queues = []
try:
queues = await broker.queues(queue_names)
app.set_queue_cache(names_key, queues)
except Exception as e:
logger.error("Unable to get queues: '%s'", e)
finally:
if hasattr(broker, 'close'):
broker.close()

self.render("broker.html",
broker_url=app.capp.connection().as_uri(),
broker_url=broker_url,
queues=queues)
75 changes: 75 additions & 0 deletions tests/unit/test_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import time
import unittest

import celery
from tornado.ioloop import IOLoop
from tornado.options import options

from flower import command # noqa: F401 side effect - define options
from flower.app import Flower
from flower.events import Events
from flower.urls import handlers, settings


class TestQueueCache(unittest.TestCase):
    """Unit tests for Flower's broker-queue stats cache.

    Exercises Flower.get_cached_queue_stats / Flower.set_queue_cache:
    hit/miss behavior, deep-copy isolation, TTL expiry, key matching,
    and the TTL=0 "caching disabled" mode.
    """

    def setUp(self):
        # Minimal Flower app around a throwaway Celery instance; these
        # tests never touch a real broker.
        capp = celery.Celery()
        events = Events(capp, IOLoop.current())
        self.app = Flower(capp=capp, events=events,
                          options=options, handlers=handlers, **settings)
        # Pin the TTL so tests don't depend on configured defaults.
        self.app._queue_cache_ttl = 5.0

    def test_cache_miss_returns_none(self):
        # Nothing has been cached yet -> miss.
        result = self.app.get_cached_queue_stats(frozenset(['q1', 'q2']))
        self.assertIsNone(result)

    def test_cache_hit_returns_copy(self):
        # A fresh entry for the same key compares equal but is not the
        # same object: the cache hands out a copy, not its internal list.
        names_key = frozenset(['q1', 'q2'])
        data = [{'name': 'q1', 'messages': 5}, {'name': 'q2', 'messages': 10}]
        self.app.set_queue_cache(names_key, data)

        result = self.app.get_cached_queue_stats(names_key)
        self.assertEqual(result, data)
        self.assertIsNot(result, data)

    def test_cache_returns_deep_copy_to_prevent_mutation(self):
        """Mutating dict elements in the returned list must not affect the cache."""
        names_key = frozenset(['q1'])
        data = [{'name': 'q1', 'messages': 5}]
        self.app.set_queue_cache(names_key, data)

        result = self.app.get_cached_queue_stats(names_key)
        result[0]['messages'] = 999

        result2 = self.app.get_cached_queue_stats(names_key)
        self.assertEqual(result2[0]['messages'], 5)

    def test_cache_expires_after_ttl(self):
        names_key = frozenset(['q1'])
        data = [{'name': 'q1', 'messages': 5}]
        self.app.set_queue_cache(names_key, data)

        # Rewind the stored timestamp past the 5s TTL instead of sleeping.
        ts, key, result = self.app._queue_cache
        self.app._queue_cache = (ts - 10.0, key, result)

        self.assertIsNone(self.app.get_cached_queue_stats(names_key))

    def test_cache_miss_on_different_names(self):
        # The cache is keyed by the exact frozenset of queue names.
        names_key = frozenset(['q1'])
        data = [{'name': 'q1', 'messages': 5}]
        self.app.set_queue_cache(names_key, data)

        different_key = frozenset(['q1', 'q2'])
        self.assertIsNone(self.app.get_cached_queue_stats(different_key))

    def test_cache_disabled_when_ttl_zero(self):
        # TTL of 0 disables caching entirely: set_queue_cache becomes a no-op.
        self.app._queue_cache_ttl = 0
        names_key = frozenset(['q1'])
        data = [{'name': 'q1', 'messages': 5}]
        self.app.set_queue_cache(names_key, data)

        self.assertIsNone(self.app.get_cached_queue_stats(names_key))


if __name__ == '__main__':
unittest.main()
Loading