From 82a559ef67daa51c42a5ea82c222bde6f5e32b56 Mon Sep 17 00:00:00 2001 From: ShubhAtWork Date: Tue, 3 Mar 2026 12:40:54 +0530 Subject: [PATCH] Fix connection leaks in broker and transport access MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Several places call `capp.connection()` without using a context manager, creating AMQP/Redis connections that are never closed. Each leaked connection holds open a socket and file descriptor. Under sustained use (e.g. the Broker view refreshing, or the `/api/queues/length` endpoint being polled by monitoring), this causes: - Gradual file descriptor exhaustion (hitting OS `ulimit`) - Socket leak warnings from the broker client library - Eventual `Too many open files` errors that crash the process Specific leaks fixed: 1. **`Flower.transport` property** — called on every request that checks the broker type. Opens a connection, reads the transport driver type, then discards the connection object without closing. Fixed by caching the result (transport type never changes at runtime) and using a context manager for the initial lookup. 2. **`print_banner()`** — `app.connection().as_uri()` opens a connection to format the broker URL for the startup log. Never closed. Fixed with a context manager. 3. **`BrokerView.get()`** — two separate `app.capp.connection()` calls: one to create the Broker object, another to get the display URL. Neither closed. Fixed by using a single context-managed connection for both. 4. **`GetQueueLengths.get()`** — `app.capp.connection().as_uri()` opens a connection to get the broker URI. Never closed. Fixed with a context manager. 5. **`WorkersView.get()`** — `self.application.capp.connection().as_uri()` opens a connection on every non-JSON page render. Never closed. Fixed with a context manager. Co-Authored-By: Claude Opus 4.6 --- flower/api/tasks.py | 4 +++- flower/app.py | 6 +++++- flower/command.py | 3 ++- flower/views/broker.py | 8 ++++++-- flower/views/workers.py | 4 +++- 5 files changed, 19 insertions(+), 6 deletions(-) diff --git a/flower/api/tasks.py b/flower/api/tasks.py index 730c290e4..51b713b57 100644 --- a/flower/api/tasks.py +++ b/flower/api/tasks.py @@ -396,7 +396,9 @@ async def get(self): if app.transport == 'amqp' and app.options.broker_api: http_api = app.options.broker_api - broker = Broker(app.capp.connection().as_uri(include_password=True), + with app.capp.connection() as conn: + broker_uri = conn.as_uri(include_password=True) + broker = Broker(broker_uri, http_api=http_api, broker_options=self.capp.conf.broker_transport_options, broker_use_ssl=self.capp.conf.broker_use_ssl) diff --git a/flower/app.py b/flower/app.py index 3427e098a..cf5f38400 100644 --- a/flower/app.py +++ b/flower/app.py @@ -64,6 +64,7 @@ def __init__(self, options=None, capp=None, events=None, max_workers_in_memory=self.options.max_workers, max_tasks_in_memory=self.options.max_tasks) self.started = False + self._transport = None def start(self): self.events.start() @@ -93,7 +94,10 @@ def stop(self): @property def transport(self): - return getattr(self.capp.connection().transport, 'driver_type', None) + if self._transport is None: + with self.capp.connection() as conn: + self._transport = getattr(conn.transport, 'driver_type', None) + return self._transport @property def workers(self): diff --git a/flower/command.py b/flower/command.py index 94ed6c7b6..31a1631e7 100644 --- a/flower/command.py +++ b/flower/command.py @@ -173,7 +173,8 @@ def print_banner(app, ssl): else: logger.info("Visit me via unix socket file: %s", options.unix_socket) - logger.info('Broker: %s', app.connection().as_uri()) + with app.connection() as conn: + logger.info('Broker: %s', conn.as_uri()) logger.info( 'Registered tasks: \n%s', pformat(sorted(app.tasks.keys())) diff --git a/flower/views/broker.py b/flower/views/broker.py index 75f6c9b3f..88cccefee 100644 --- a/flower/views/broker.py +++ b/flower/views/broker.py @@ -17,8 +17,12 @@ async def get(self): if app.transport == 'amqp' and app.options.broker_api: http_api = app.options.broker_api + with app.capp.connection(connect_timeout=1.0) as conn: + broker_uri = conn.as_uri(include_password=True) + broker_url = conn.as_uri() + try: - broker = Broker(app.capp.connection(connect_timeout=1.0).as_uri(include_password=True), + broker = Broker(broker_uri, http_api=http_api, broker_options=self.capp.conf.broker_transport_options, broker_use_ssl=self.capp.conf.broker_use_ssl) except NotImplementedError as exc: @@ -31,5 +35,5 @@ async def get(self): logger.error("Unable to get queues: '%s'", e) self.render("broker.html", - broker_url=app.capp.connection().as_uri(), + broker_url=broker_url, queues=queues) diff --git a/flower/views/workers.py b/flower/views/workers.py index defd0469a..0fdb9287b 100644 --- a/flower/views/workers.py +++ b/flower/views/workers.py @@ -69,9 +69,11 @@ async def get(self): if json: self.write(dict(data=list(workers.values()))) else: + with self.application.capp.connection() as conn: + broker_url = conn.as_uri() self.render("workers.html", workers=workers, - broker=self.application.capp.connection().as_uri(), + broker=broker_url, autorefresh=1 if self.application.options.auto_refresh else 0) @classmethod