Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -540,3 +540,14 @@ Default: None
Sets the URI to which an OAuth 2.0 server redirects the user after successful authentication and authorization.

`oauth2_redirect_uri` option should be used with :ref:`auth`, :ref:`auth_provider`, :ref:`oauth2_key` and :ref:`oauth2_secret` options.

.. _queue_cache_ttl:

queue_cache_ttl
~~~~~~~~~~~~~~~

Default: 5.0

TTL in seconds for caching broker queue stats. Set to 0 to disable caching.
When many queues are configured (e.g. 10,000+), caching avoids re-fetching
queue lengths from the broker on every page load or API call.
37 changes: 32 additions & 5 deletions flower/api/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,17 +391,44 @@ async def get(self):
:statuscode 503: result backend is not configured
"""
app = self.application
limit = self.get_argument('limit', default=None, type=int)
offset = self.get_argument('offset', default=0, type=int)
offset = max(offset, 0)

http_api = None
if app.transport == 'amqp' and app.options.broker_api:
http_api = app.options.broker_api

broker = Broker(app.capp.connection().as_uri(include_password=True),
http_api=http_api, broker_options=self.capp.conf.broker_transport_options,
broker_use_ssl=self.capp.conf.broker_use_ssl)
queue_names = self.get_active_queue_names()
names_key = frozenset(queue_names)

queues = await broker.queues(self.get_active_queue_names())
self.write({'active_queues': queues})
# Check cache first
queues = app.get_cached_queue_stats(names_key)
if queues is None:
with app.capp.connection() as conn:
broker_uri = conn.as_uri(include_password=True)
broker = Broker(broker_uri,
http_api=http_api, broker_options=self.capp.conf.broker_transport_options,
broker_use_ssl=self.capp.conf.broker_use_ssl)

try:
queues = await broker.queues(queue_names)
app.set_queue_cache(names_key, queues)
finally:
if hasattr(broker, 'close'):
broker.close()

total = len(queues)

# Apply pagination
if offset:
queues = queues[offset:]
if limit is not None:
if limit < 0:
raise HTTPError(400, "Query argument 'limit' must be a non-negative integer")
queues = queues[:limit]

self.write({'active_queues': queues, 'total': total})


class ListTasks(BaseTaskHandler):
Expand Down
87 changes: 85 additions & 2 deletions flower/app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import sys
import logging
import time

from concurrent.futures import ThreadPoolExecutor

Expand All @@ -8,6 +9,7 @@

from tornado import ioloop
from tornado.httpserver import HTTPServer
from tornado.ioloop import PeriodicCallback
from tornado.web import url

from .urls import handlers as default_handlers
Expand Down Expand Up @@ -64,6 +66,10 @@ def __init__(self, options=None, capp=None, events=None,
max_workers_in_memory=self.options.max_workers,
max_tasks_in_memory=self.options.max_tasks)
self.started = False
self._transport = None
self._purge_timer = None
self._queue_cache = None # (timestamp, frozenset(names), result)
self._queue_cache_ttl = getattr(self.options, 'queue_cache_ttl', 5.0)

def start(self):
self.events.start()
Expand All @@ -80,11 +86,26 @@ def start(self):

self.started = True
self.update_workers()

if self.options.purge_offline_workers is not None:
interval_ms = max(self.options.purge_offline_workers * 1000, 10000)
self._purge_timer = PeriodicCallback(self._purge_offline_workers,
interval_ms)
self._purge_timer.start()

self.io_loop.start()

def stop(self):
if self.started:
self.events.stop()
try:
self.events.stop()
except Exception:
logger.debug("Error stopping events", exc_info=True)
if self._purge_timer:
try:
self._purge_timer.stop()
except Exception:
logger.debug("Error stopping purge timer", exc_info=True)
logging.debug("Stopping executors...")
self.executor.shutdown(wait=False)
logging.debug("Stopping event loop...")
Expand All @@ -93,11 +114,73 @@ def stop(self):

@property
def transport(self):
    """Broker transport driver type (e.g. 'amqp', 'redis'), or None.

    Resolved lazily on first access and memoized in ``self._transport``
    so the broker connection is opened at most once for this lookup.
    """
    if self._transport is None:
        connection = self.capp.connection()
        with connection as conn:
            self._transport = getattr(conn.transport, 'driver_type', None)
    return self._transport

@property
def workers(self):
    """Per-worker inspection data as currently cached by the inspector."""
    return self.inspector.workers

def update_workers(self, workername=None):
    """Kick off a worker inspection (all workers, or just *workername*)."""
    return self.inspector.inspect(workername)

def get_cached_queue_stats(self, names_key):
    """Return cached queue stats if still valid, else None.

    Args:
        names_key: frozenset of queue names the stats were fetched for;
            used as the cache key so a different queue set never hits.

    Returns:
        A deep copy of the cached stats list, or None on a miss.

    A deep copy is returned because the cached list holds mutable
    per-queue dicts: copying only the outer list (``list(result)``)
    would still let callers mutate the shared cached entries.
    """
    from copy import deepcopy

    if self._queue_cache_ttl <= 0 or self._queue_cache is None:
        return None
    ts, cached_key, result = self._queue_cache
    if cached_key == names_key and (time.time() - ts) < self._queue_cache_ttl:
        return deepcopy(result)
    return None
Comment on lines +129 to +138
Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_cached_queue_stats() claims it returns a copy to prevent callers from mutating the cache, but list(result) only copies the list container—mutating any dict elements in the returned list will still mutate the cached objects. If the intent is immutability, return a deep copy (e.g., copy.deepcopy) or store/return an immutable structure (e.g., tuples / MappingProxyType).

Copilot uses AI. Check for mistakes.

def set_queue_cache(self, names_key, result):
    """Cache *result* under *names_key*; no-op when caching is disabled."""
    if self._queue_cache_ttl <= 0:
        return
    self._queue_cache = (time.time(), names_key, result)

def _purge_offline_workers(self):
    """Drop workers that have been offline longer than the threshold.

    Two kinds of entries are handled:
    - workers tracked in ``state.workers``: purged only when not alive
      and their newest heartbeat is older than the threshold
    - orphaned names (present in the counter or the inspector but absent
      from ``state.workers``): purged unconditionally
    """
    threshold = self.options.purge_offline_workers
    if threshold is None:
        return

    now = time.time()
    state = self.events.state
    # Union of every worker name known to either the event counter or
    # the inspector cache.
    candidates = set(state.counter) | set(self.inspector.workers)

    for name in candidates:
        tracked = state.workers.get(name)
        if tracked is not None:
            if tracked.alive:
                # Still alive — keep it.
                continue
            beats = getattr(tracked, 'heartbeats', [])
            if beats and now - max(beats) <= threshold:
                # Offline, but not yet past the grace period.
                continue

        # Either offline past the threshold, or an orphaned entry:
        # remove it from every bookkeeping structure.
        state.counter.pop(name, None)
        state.metrics.remove_worker_metrics(name)
        self.inspector.purge_worker(name)
        logger.debug("Purged offline worker: %s", name)
3 changes: 2 additions & 1 deletion flower/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ def print_banner(app, ssl):
else:
logger.info("Visit me via unix socket file: %s", options.unix_socket)

logger.info('Broker: %s', app.connection().as_uri())
with app.connection() as conn:
logger.info('Broker: %s', conn.as_uri())
logger.info(
'Registered tasks: \n%s',
pformat(sorted(app.tasks.keys()))
Expand Down
118 changes: 101 additions & 17 deletions flower/events.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import collections
import logging
import queue
import shelve
import threading
import time
Expand All @@ -17,6 +18,8 @@

PROMETHEUS_METRICS = None

MAX_RETRY_INTERVAL = 60


def get_prometheus_metrics():
global PROMETHEUS_METRICS # pylint: disable=global-statement
Expand Down Expand Up @@ -53,6 +56,30 @@ def __init__(self):
['worker']
)

def remove_worker_metrics(self, worker_name):
    """Drop every Prometheus label series belonging to *worker_name*.

    Walks the internal ``_metrics`` mapping (label tuple -> child) of
    each collector. Access is guarded since ``_metrics`` is a private
    attribute whose layout may differ across prometheus_client versions,
    and the worker label is assumed to be the first label value.
    """
    collectors = (
        self.events,
        self.runtime,
        self.prefetch_time,
        self.number_of_prefetched_tasks,
        self.worker_online,
        self.worker_number_of_currently_executing_tasks,
    )
    for collector in collectors:
        children = getattr(collector, '_metrics', None)
        if children is None:
            continue
        try:
            stale = [labels for labels in children
                     if labels and labels[0] == worker_name]
            for labels in stale:
                collector.remove(*labels)
        except Exception:
            logger.debug("Failed to remove metrics for worker %s from %s",
                         worker_name, collector, exc_info=True)


class EventsState(State):
# EventsState object is created and accessed only from ioloop thread
Expand Down Expand Up @@ -112,6 +139,9 @@ def event(self, event):

class Events(threading.Thread):
events_enable_interval = 5000
_BACKPRESSURE_MAXSIZE = 10000
_DRAIN_INTERVAL_MS = 100
_DRAIN_BATCH_SIZE = 500

# pylint: disable=too-many-arguments
def __init__(self, capp, io_loop, db=None, persistent=False,
Expand All @@ -128,13 +158,21 @@ def __init__(self, capp, io_loop, db=None, persistent=False,
self.enable_events = enable_events
self.state = None
self.state_save_timer = None
self._drain_timer = None
self._event_queue = queue.Queue(maxsize=self._BACKPRESSURE_MAXSIZE)
self._drop_count = 0
self._last_drop_log_time = 0.0

if self.persistent:
logger.debug("Loading state from '%s'...", self.db)
state = shelve.open(self.db)
if state:
self.state = state['events']
state.close()
try:
with shelve.open(self.db) as state:
if state:
self.state = state['events']
except KeyError:
logger.debug("No existing state found in '%s'", self.db)
except Exception:
logger.error("Failed to load state from '%s'", self.db, exc_info=True)

if state_save_interval:
self.state_save_timer = PeriodicCallback(self.save_state,
Expand All @@ -156,23 +194,42 @@ def start(self):
logger.debug("Starting state save timer...")
self.state_save_timer.start()

self._drain_timer = PeriodicCallback(self._drain_events,
self._DRAIN_INTERVAL_MS)
self._drain_timer.start()

def stop(self):
    """Shut down the event machinery.

    Each periodic timer is stopped best-effort (failures are logged at
    debug level, never raised), and if persistence is enabled the state
    is saved even when a timer fails to stop.
    """
    def _stop_timer(timer, err_msg):
        # Never let a misbehaving timer abort shutdown.
        try:
            timer.stop()
        except Exception:
            logger.debug(err_msg, exc_info=True)

    try:
        if self.enable_events:
            logger.debug("Stopping enable events timer...")
            _stop_timer(self.timer, "Error stopping enable events timer")

        if self.state_save_timer:
            logger.debug("Stopping state save timer...")
            _stop_timer(self.state_save_timer,
                        "Error stopping state save timer")

        if self._drain_timer:
            _stop_timer(self._drain_timer, "Error stopping drain timer")
    finally:
        if self.persistent:
            self.save_state()

def run(self):
try_interval = 1
while True:
try:
try_interval *= 2
if try_interval > MAX_RETRY_INTERVAL:
try_interval = MAX_RETRY_INTERVAL

with self.capp.connection() as conn:
recv = EventReceiver(conn,
Expand All @@ -196,15 +253,42 @@ def run(self):

def save_state(self):
    """Persist the current events state to the shelve db at ``self.db``.

    Opens with ``flag='n'`` so a fresh file replaces any previous
    snapshot; failures are logged rather than raised so shutdown and
    periodic saves never crash the process.
    """
    logger.debug("Saving state to '%s'...", self.db)
    try:
        with shelve.open(self.db, flag='n') as snapshot:
            snapshot['events'] = self.state
    except Exception:
        logger.error("Failed to save state to '%s'", self.db, exc_info=True)

def on_enable_events(self):
    """Periodically (re-)enable events so workers launched after flower emit them."""
    enable = self.capp.control.enable_events
    self.io_loop.run_in_executor(None, enable)

def on_event(self, event):
    """Enqueue *event* for the ioloop-side drainer, dropping under pressure.

    When the bounded queue is full the event is dropped; a warning that
    summarizes the drops is emitted at most once every five seconds so
    sustained overload cannot flood the logs.
    """
    try:
        self._event_queue.put_nowait(event)
        return
    except queue.Full:
        pass

    self._drop_count += 1
    now = time.monotonic()
    if now - self._last_drop_log_time < 5.0:
        return
    window_start = self._last_drop_log_time or now
    logger.warning(
        "Event queue full (%d), dropped %d event(s) in last %.0fs",
        self._BACKPRESSURE_MAXSIZE, self._drop_count, now - window_start)
    self._drop_count = 0
    self._last_drop_log_time = now

def _drain_events(self):
"""Process up to _DRAIN_BATCH_SIZE events from the backpressure queue."""
for _ in range(self._DRAIN_BATCH_SIZE):
try:
event = self._event_queue.get_nowait()
except queue.Empty:
break
try:
self.state.event(event)
except Exception:
logger.error("Error processing event", exc_info=True)
4 changes: 4 additions & 0 deletions flower/inspector.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ def __init__(self, io_loop, capp, timeout):
self.timeout = timeout
self.workers = collections.defaultdict(dict)

def purge_worker(self, worker_name):
    """Forget cached inspection data for *worker_name* (no-op if unknown)."""
    self.workers.pop(worker_name, None)

def inspect(self, workername=None):
feutures = []
for method in self.methods:
Expand Down
Loading