From 6a0754e29e4b45278f15ba0dd42dbb2d9ff279e0 Mon Sep 17 00:00:00 2001 From: voetberg Date: Mon, 16 Mar 2026 11:25:45 -0500 Subject: [PATCH 1/5] Common: Update to python3 * Change hashbang * Remove future_print Issue: probes#127 --- common/check_transfer_queues_status | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/common/check_transfer_queues_status b/common/check_transfer_queues_status index 15f94b9b..c6278ac6 100755 --- a/common/check_transfer_queues_status +++ b/common/check_transfer_queues_status @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,8 +14,6 @@ """ Probe to check the queues of the transfer service """ -from __future__ import print_function - import sys from prometheus_client import CollectorRegistry, Gauge, push_to_gateway From f0b57846319cc2d1d17219c598870803fe501eb7 Mon Sep 17 00:00:00 2001 From: voetberg Date: Mon, 16 Mar 2026 11:26:41 -0500 Subject: [PATCH 2/5] Common: Update copyright statement Issue: probes#127 --- common/check_transfer_queues_status | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/common/check_transfer_queues_status b/common/check_transfer_queues_status index c6278ac6..5a725099 100755 --- a/common/check_transfer_queues_status +++ b/common/check_transfer_queues_status @@ -1,15 +1,17 @@ #!/usr/bin/env python3 -# Copyright European Organization for Nuclear Research (CERN) 2013 +# Copyright European Organization for Nuclear Research (CERN) since 2012 # # Licensed under the Apache License, Version 2.0 (the "License"); -# You may not use this file except in compliance with the License. -# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at # -# Authors: -# - Mario Lassnig, , 2013-2021 -# - Cedric Serfon, , 2014 -# - Wen Guan, , 2015 -# - Thomas Beermann, , 2019 +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. """ Probe to check the queues of the transfer service From dac6da805aa1452c30882cb072639b5851240089 Mon Sep 17 00:00:00 2001 From: voetberg Date: Mon, 16 Mar 2026 11:53:23 -0500 Subject: [PATCH 3/5] Common: Replace query with SQLA2.0 Issue: probes#127 --- common/check_transfer_queues_status | 61 +++++++++++------------------ 1 file changed, 23 insertions(+), 38 deletions(-) diff --git a/common/check_transfer_queues_status b/common/check_transfer_queues_status index 5a725099..f2b86f7a 100755 --- a/common/check_transfer_queues_status +++ b/common/check_transfer_queues_status @@ -17,46 +17,19 @@ Probe to check the queues of the transfer service """ import sys +from urllib.parse import urlparse from prometheus_client import CollectorRegistry, Gauge, push_to_gateway from rucio.common.config import config_get -from rucio.db.sqla.session import BASE, get_session +from rucio.db.sqla import models +from rucio.db.sqla.session import get_session +from sqlalchemy import func, select from utils.common import probe_metrics # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 -if BASE.metadata.schema: - schema = BASE.metadata.schema + '.' -else: - schema = '' - -active_queue = """SELECT -CASE - WHEN state = 'S' THEN 'queues.requests.submitted.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host') - WHEN state = 'Q' THEN 'queues.requests.queued.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host') - WHEN state = 'F' THEN 'queues.requests.failed.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host') - WHEN state = 'D' THEN 'queues.requests.done.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host') - WHEN state = 'L' THEN 'queues.requests.lost.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host') - WHEN state = 'W' THEN 'queues.requests.waiting.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host') - WHEN state = 'M' THEN 'queues.requests.mismatchscheme.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host') - WHEN state = 'G' THEN 'queues.requests.submitting.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host') - WHEN state = 'N' THEN 'queues.requests.nosources.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host') - WHEN state = 'O' THEN 'queues.requests.onlytapesources.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host') - WHEN state = 'A' THEN 'queues.requests.submissionfailed.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host') - WHEN state = 'U' THEN 'queues.requests.suspend.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host') - WHEN state = 'P' THEN 'queues.requests.preparing.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host') - ELSE state -END state_desc, -num_rows -FROM -( -select state, count(*) num_rows, activity, external_host -FROM {schema}requests -GROUP BY state, activity, external_host -)""".format(schema=schema) - PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') if PROM_SERVERS != '': PROM_SERVERS = PROM_SERVERS.split(',') @@ -66,14 +39,26 @@ if __name__ == "__main__": registry = CollectorRegistry() g = Gauge('conveyor_queues_requests', '', labelnames=('state', 'activity', 'external_host'), registry=registry) session = get_session() + active_queue = select( + models.Request.state, + models.Request.activity, + models.Request.external_host, + func.count() + ).group_by( + models.Request.state, + models.Request.activity, + models.Request.external_host + ) for k in session.execute(active_queue).fetchall(): - print(k[0], k[1], end=" ") - probe_metrics.gauge(name=k[0].replace('-', '_')).set(k[1]) - items = k[0].split('.') - state = items[2] - activity = items[3] - external_host = items[4].replace('-', '_') - g.labels(**{'activity': activity, 'state': state, 'external_host': external_host}).set(k[1]) + state = k.state.name.lower() + activity = k.activity.replace(" ", "-") + if k.external_host is not None: + external_host = urlparse(k.external_host).hostname.replace(".", "_") + else: + external_host = 'no_fts_host' + print(f"{state}.{activity}.{external_host}", k.count) + probe_metrics.gauge(name=state).set(k.count) + g.labels(**{'activity': activity, 'state': state, 'external_host': external_host}).set(k.count) if len(PROM_SERVERS): for server in PROM_SERVERS: try: From 94b24544842e43a132e56f3f881e4f47be026a11 Mon Sep 17 00:00:00 2001 From: voetberg Date: Mon, 16 Mar 2026 12:01:09 -0500 Subject: [PATCH 4/5] Common: Replace old promethesus methods Closes: probes #127 --- common/check_transfer_queues_status | 44 +++++++++++++---------------- 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/common/check_transfer_queues_status b/common/check_transfer_queues_status index f2b86f7a..ec6addc5 100755 --- a/common/check_transfer_queues_status +++ b/common/check_transfer_queues_status @@ -19,25 +19,18 @@ Probe to check the queues of the transfer service import sys from urllib.parse import urlparse -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway -from rucio.common.config import config_get from rucio.db.sqla import models from rucio.db.sqla.session import get_session from sqlalchemy import func, select -from utils.common import probe_metrics +from utils.common import PrometheusPusher # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') if __name__ == "__main__": try: - registry = CollectorRegistry() - g = Gauge('conveyor_queues_requests', '', labelnames=('state', 'activity', 'external_host'), registry=registry) session = get_session() active_queue = select( models.Request.state, @@ -49,22 +42,25 @@ if __name__ == "__main__": models.Request.activity, models.Request.external_host ) - for k in session.execute(active_queue).fetchall(): - state = k.state.name.lower() - activity = k.activity.replace(" ", "-") - if k.external_host is not None: - external_host = urlparse(k.external_host).hostname.replace(".", "_") - else: - external_host = 'no_fts_host' - print(f"{state}.{activity}.{external_host}", k.count) - probe_metrics.gauge(name=state).set(k.count) - g.labels(**{'activity': activity, 'state': state, 'external_host': external_host}).set(k.count) - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_transfer_queues_status', registry=registry) - except: - continue + with PrometheusPusher() as manager: + for k in session.execute(active_queue).fetchall(): + state = k.state.name.lower() + activity = k.activity.replace(" ", "-") + if k.external_host is not None: + external_host = urlparse(k.external_host).hostname.replace(".", "_") + else: + external_host = 'no_fts_host' + print(f"{state}.{activity}.{external_host}", k.count) + + manager.gauge( + name = 'conveyor_queues_requests.{activity}.{state}.{external_host}', + documentation='' + ).labels( + activity=activity, + state=state, + external_host=external_host + ).set(k.count) + except: sys.exit(UNKNOWN) sys.exit(OK) From e954080c4d924e2fa469f24865c360914ad8bc92 Mon Sep 17 00:00:00 2001 From: voetberg Date: Mon, 16 Mar 2026 12:02:14 -0500 Subject: [PATCH 5/5] Common: Add documentation for probe Issue: probes#127 --- common/check_transfer_queues_status | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/check_transfer_queues_status b/common/check_transfer_queues_status index ec6addc5..3d43c97d 100755 --- a/common/check_transfer_queues_status +++ b/common/check_transfer_queues_status @@ -54,7 +54,7 @@ if __name__ == "__main__": manager.gauge( name = 'conveyor_queues_requests.{activity}.{state}.{external_host}', - documentation='' + documentation='Queued request for conveyor daemons by activity, state, and host.' ).labels( activity=activity, state=state,