Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 18 additions & 27 deletions common/check_expired_dids
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# Copyright European Organization for Nuclear Research (CERN) 2013
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -8,51 +8,42 @@
# Authors:
# - Vincent Garonne, <vincent.garonne@cern.ch>, 2013
# - Thomas Beermann, <thomas.beermann@cern.ch>, 2019
# - Eric Vaandering <ewv@fnal.gov>, 2020-2021
# - Eric Vaandering <ewv@fnal.gov>, 2020-2022
# - Maggie Voetberg <maggiev@fnal.gov>, 2024

"""
Probe to check the backlog of expired dids.
"""
from __future__ import print_function

import sys
import traceback
from datetime import datetime
from sqlalchemy import func

from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
from rucio.common.config import config_get
from rucio.db.sqla import models
from rucio.db.sqla.session import get_session
from rucio.db.sqla.util import get_count

from utils.common import probe_metrics
from utils.common import PrometheusPusher


# Exit statuses
OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3

PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='')
if PROM_SERVERS != '':
PROM_SERVERS = PROM_SERVERS.split(',')

if __name__ == "__main__":
try:
registry = CollectorRegistry()
session = get_session()
query = session.query(models.DataIdentifier.scope).filter(models.DataIdentifier.expired_at.isnot(None),
models.DataIdentifier.expired_at < datetime.utcnow())
result = get_count(query)
# Possible check against a threshold. If result > max_value then sys.exit(CRITICAL)
probe_metrics.gauge(name='undertaker.expired_dids').set(result)
Gauge('undertaker_expired_dids', '', registry=registry).set(result)

if len(PROM_SERVERS):
for server in PROM_SERVERS:
try:
push_to_gateway(server.strip(), job='check_expired_dids', registry=registry)
except:
continue

print(result)
with PrometheusPusher() as manager:

query = (session.query(func.count(models.DataIdentifier.scope))
.filter(models.DataIdentifier.expired_at.isnot(None),
models.DataIdentifier.expired_at < datetime.utcnow()))
result = query.scalar() or 0
# Possible check against a threshold. If result > max_value then sys.exit(CRITICAL)

manager.gauge('expired_dids.total',
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does change the metric value. IIRC, ATLAS is ok with this in theory as we wanted to make things more sensical. But I'd like @dchristidis thoughts before we start using this for CMS.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that should be generally acceptable. But please add a comprehensive list of such changes in the commit message.

documentation="All expired dids"
).set(result)

except:
print(traceback.format_exc())
sys.exit(UNKNOWN)
Expand Down
171 changes: 78 additions & 93 deletions common/check_fts_backlog
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
"""
Copyright European Organization for Nuclear Research (CERN) 2013

Expand All @@ -9,27 +9,23 @@
Authors:
- Cedric Serfon, <cedric.serfon@cern.ch>, 2014-2018
- Mario Lassnig, <mario.lassnig@cern.ch>, 2015
- Eric Vaandering, <ewv@fnal.gov>, 2019-2021
- Eric Vaandering, <ewv@fnal.gov>, 2019-2022
- Thomas Beermann, <thomas.beermann@cern.ch>, 2019
- Maggie Voetberg, <maggiev@fnal.gov>, 2024
"""
from __future__ import print_function

import os
import sys
from urllib.parse import urlparse

import requests
import urllib3
try:
from urlparse import urlparse
except ImportError:
from urllib.parse import urlparse

from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
from rucio.common.config import config_get, config_get_bool
from rucio.core.distance import update_distances
from rucio.db.sqla.session import BASE, get_session

from utils.common import probe_metrics
from utils.common import PrometheusPusher


OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3

Expand All @@ -40,10 +36,6 @@ if BASE.metadata.schema:
else:
schema = ''

PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='')
if PROM_SERVERS != '':
PROM_SERVERS = PROM_SERVERS.split(',')

if __name__ == "__main__":

se_matrix = {}
Expand Down Expand Up @@ -82,87 +74,81 @@ if __name__ == "__main__":
except Exception as error:
UPDATE_DIST = True

registry = CollectorRegistry()
g = Gauge('fts_submitted', '', labelnames=('hostname',), registry=registry)
errmsg = ''
for ftshost in FTSHOSTS.split(','):
print("=== %s ===" % ftshost)
parsed_url = urlparse(ftshost)
scheme, hostname, port = parsed_url.scheme, parsed_url.hostname, parsed_url.port
retvalue = CRITICAL
url = '%s/fts3/ftsmon/overview?dest_se=&source_se=&time_window=1&vo=%s' % (ftshost, VO)
busy_channels = []
busylimit = 5000
for attempt in range(0, 5):
result = None
try:
result = requests.get(url, verify=False, cert=(PROXY, PROXY))
res = result.json()
for channel in res['overview']['items']:
src = channel['source_se']
dst = channel['dest_se']
if (src, dst) not in se_matrix:
se_matrix[(src, dst)] = {'active': 0, 'submitted': 0, 'finished': 0, 'failed': 0,
'transfer_speed': 0, 'mbps_link': 0}
for state in ['submitted', 'active', 'finished', 'failed']:
with PrometheusPusher() as manager:

errmsg = ''
for ftshost in FTSHOSTS.split(','):
print(f"=== {ftshost} ===")
parsed_url = urlparse(ftshost)
scheme, hostname, port = parsed_url.scheme, parsed_url.hostname, parsed_url.port
retvalue = CRITICAL
url = f'{ftshost}/fts3/ftsmon/overview?dest_se=&source_se=&time_window=1&vo={VO}'
busy_channels = []
busylimit = 5000
for attempt in range(0, 5):
result = None
try:
result = requests.get(url, verify=False, cert=(PROXY, PROXY))
res = result.json()
for channel in res['overview']['items']:
src = channel['source_se']
dst = channel['dest_se']
if (src, dst) not in se_matrix:
se_matrix[(src, dst)] = {'active': 0, 'submitted': 0, 'finished': 0, 'failed': 0,
'transfer_speed': 0, 'mbps_link': 0}
for state in ['submitted', 'active', 'finished', 'failed']:
try:
se_matrix[(src, dst)][state] += channel[state]
except Exception:
pass
try:
se_matrix[(src, dst)][state] += channel[state]
se_matrix[(src, dst)]['transfer_speed'] += channel['current']
se_matrix[(src, dst)]['mbps_link'] += channel['current']
except Exception:
pass
try:
se_matrix[(src, dst)]['transfer_speed'] += channel['current']
se_matrix[(src, dst)]['mbps_link'] += channel['current']
except Exception:
pass
if CHECK_BUSY and 'submitted' in channel and channel['submitted'] >= busylimit:
url_activities = '%s/fts3/ftsmon/config/activities/%s?source_se=%s&dest_se=%s' % (ftshost, VO,
src, dst)
activities = {}
try:
s = requests.get(url_activities, verify=False, cert=(PROXY, PROXY))
for key, val in s.json().items():
activities[key] = val['SUBMITTED']
except Exception as error:
pass
busy_channels.append({'src': src, 'dst': dst, 'submitted': channel['submitted'],
'activities': activities})
summary = res['summary']
hostname = hostname.replace('.', '_')
print('%s : Submitted : %s' % (hostname, summary['submitted']))
print('%s : Active : %s' % (hostname, summary['active']))
print('%s : Staging : %s' % (hostname, summary['staging']))
print('%s : Started : %s' % (hostname, summary['started']))
if busy_channels != []:
print('Busy channels (>%s submitted):' % busylimit)
for bc in busy_channels:
activities_str = ", ".join([("%s: %s" % (key, val)) for key, val in bc['activities'].items()])
print(' %s to %s : %s submitted jobs (%s)' % (bc['src'], bc['dst'], bc['submitted'],
str(activities_str)))
probe_metrics.gauge('fts3.{hostname}.submitted').labels(hostname=hostname).set(summary['submitted']
+ summary['active']
+ summary['staging']
+ summary['started'])

g.labels(**{'hostname': hostname}).set((summary['submitted'] + summary['active'] + summary['staging'] + summary['started']))
retvalue = OK
break
except Exception as error:
retvalue = CRITICAL
if result and result.status_code:
errmsg = 'Error when trying to get info from %s : HTTP status code %s. [%s]' % (
ftshost, str(result.status_code), str(error))
else:
errmsg = 'Error when trying to get info from %s. %s' % (ftshost, str(error))
if retvalue == CRITICAL:
print("All attempts failed. %s" % errmsg)
WORST_RETVALUE = max(retvalue, WORST_RETVALUE)

if len(PROM_SERVERS):
for server in PROM_SERVERS:
try:
push_to_gateway(server.strip(), job='check_fts_backlog', registry=registry)
except:
continue

if CHECK_BUSY and 'submitted' in channel and channel['submitted'] >= busylimit:
url_activities = f'{ftshost}/fts3/ftsmon/config/activities/{VO}?source_se={src}&dest_se={dst}'
activities = {}
try:
s = requests.get(url_activities, verify=False, cert=(PROXY, PROXY))
for key, val in s.json().items():
activities[key] = val['SUBMITTED']
except Exception as error:
pass
busy_channels.append({'src': src, 'dst': dst, 'submitted': channel['submitted'],
'activities': activities})
summary = res['summary']
hostname = hostname.replace('.', '_')

for state in ['submitted', 'active', 'staging', 'started']:
print(f'{hostname} : {state.capitalize()} : {summary[state]}')


if busy_channels != []:
print(f'Busy channels (>{busylimit} submitted):')
for bc in busy_channels:
activities_str = ", ".join([(f"{key}: {val}") for key, val in bc['activities'].items()])
print(f'{bc['src']} to {bc['dst']} : {bc['submitted']} submitted jobs {activities_str}')
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will not compile. Did you mean to use double quotes here?
Also at other places for fstrings.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a linter got a little excited and undid my work - thanks for catching it


# Add to metrics
backlog_count = summary['submitted'] + summary['active'] + summary['staging'] + summary['started']
manager.gauge("fts_backlog.submitted.{hostname}",
documentation="All submitted, active, staged, or stated in FTS queue"
).labels(hostname=hostname).set(backlog_count)

retvalue = OK
break
except Exception as error:
retvalue = CRITICAL
if result and result.status_code:
errmsg = f'Error when trying to get info from {ftshost} : HTTP status code {result.status_code}. {error}'
else:
errmsg = f'Error when trying to get info from {ftshost}. {error}'
if retvalue == CRITICAL:
print(f"All attempts failed. {errmsg}")
WORST_RETVALUE = max(retvalue, WORST_RETVALUE)


if not UPDATE_DIST:
sys.exit(WORST_RETVALUE)
Expand Down Expand Up @@ -193,7 +179,6 @@ if __name__ == "__main__":
for source_rse, dest_rse in se_matrix:
for source_rse_id in se_map[source_rse]:
for dest_rse_id in se_map[dest_rse]:
# print source_rse_id, dest_rse_id, se_matrix[(source_rse, dest_rse)]
update_distances(src_rse_id=source_rse_id, dest_rse_id=dest_rse_id,
parameters=se_matrix[(source_rse, dest_rse)], session=None)
sys.exit(WORST_RETVALUE)
43 changes: 18 additions & 25 deletions common/check_messages_to_submit
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# Copyright European Organization for Nuclear Research (CERN) 2013
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -8,19 +8,18 @@
# Authors:
# - Mario Lassnig, <mario.lassnig@cern.ch>, 2013-2014
# - Thomas Beermann, <thomas.beermann@cern.ch>, 2019
# - Eric Vaandering, <ewv@fnal.gov>, 2022
# - Maggie Voetberg, <maggiev@fnal.gov>, 2024

"""
Probe to check the queues of messages to submit by Hermes to the broker
"""
from __future__ import print_function

import sys
from sqlalchemy.sql import text as sql_text

from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
from rucio.common.config import config_get
from rucio.db.sqla.session import BASE, get_session

from utils.common import probe_metrics
from utils.common import PrometheusPusher

# Exit statuses
OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3
Expand All @@ -32,31 +31,25 @@ else:

queue_sql = """SELECT COUNT(*) FROM {schema}messages""".format(schema=schema)

PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='')
if PROM_SERVERS != '':
PROM_SERVERS = PROM_SERVERS.split(',')

if __name__ == "__main__":
try:
registry = CollectorRegistry()
session = get_session()
result = session.execute(queue_sql).fetchall()
print('queues.messages %s' % result[0][0])
probe_metrics.gauge(name='queues.messages').set(result[0][0])
Gauge('hermes_queues_messages', '', registry=registry).set(result[0][0])

if len(PROM_SERVERS):
for server in PROM_SERVERS:
try:
push_to_gateway(server.strip(), job='check_messages_to_submit', registry=registry)
except:
continue

if result[0][0] > 100000:
with PrometheusPusher() as manager:
result = session.execute(sql_text(queue_sql)).fetchall()
message_count = result[0][0]
print(f"Messages in queue: {message_count}")

manager.gauge(
"messages_to_submit.queues.messages",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again this changes the metric value. The job label in prometheus will say where this comes from, but it still might be a good idea.

documentation="Messages in queue, to submit"
).set(message_count)

if message_count > 100000:
sys.exit(WARNING)
elif result[0][0] > 1000000:
elif message_count > 1000000:
sys.exit(CRITICAL)

except Exception as e:
print(f"Error: {e}")
sys.exit(UNKNOWN)
sys.exit(OK)
Loading