From 4f8992e955cacd1ccf23227cc7c9c3e538ddfcea Mon Sep 17 00:00:00 2001 From: voetberg Date: Fri, 1 Mar 2024 09:45:13 -0600 Subject: [PATCH] Common: Replace string formatter with f-strings, format context message uniformly #129 Changes to metric names include: * undertaker_expired_dids -> expired_dids.total * fts3.{hostname}.submitted -> fts_backlog.submitted.{hostname} * hermes_queues_messages.queues.messages -> messages_to_submit.queues.messages * transmogrifier_new_dids -> new_dids * judge_stuck_rules_without_missing_source_replica -> stuck_rules.{source_status} (source_status = [without_missing_source_replica, with_missing_source_replica]) * check_transfer_queues_status -> transfer_queues_status * judge.waiting_dids -> unevaluated_dids * reaper.unlocked_replicas -> unlocked_replicas.{replica_status} (replica_status = [expired, unlocked]) * judge.updated_dids -> updated_dids --- common/check_expired_dids | 45 +++----- common/check_fts_backlog | 171 +++++++++++++--------------- common/check_messages_to_submit | 43 +++---- common/check_new_dids | 41 +++---- common/check_stuck_rules | 47 ++++---- common/check_transfer_queues_status | 48 ++++---- common/check_unevaluated_dids | 34 ++---- common/check_unlocked_replicas | 51 +++------ common/check_updated_dids | 41 ++----- 9 files changed, 203 insertions(+), 318 deletions(-) diff --git a/common/check_expired_dids b/common/check_expired_dids index dd8229bc..5a655c9b 100755 --- a/common/check_expired_dids +++ b/common/check_expired_dids @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -8,51 +8,42 @@ # Authors: # - Vincent Garonne, , 2013 # - Thomas Beermann, , 2019 -# - Eric Vaandering , 2020-2021 +# - Eric Vaandering , 2020-2022 +# - Maggie Voetberg , 2024 """ Probe to check the backlog of expired dids. 
""" from __future__ import print_function - import sys import traceback from datetime import datetime +from sqlalchemy import func -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway -from rucio.common.config import config_get from rucio.db.sqla import models from rucio.db.sqla.session import get_session -from rucio.db.sqla.util import get_count -from utils.common import probe_metrics +from utils.common import PrometheusPusher + # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: - registry = CollectorRegistry() session = get_session() - query = session.query(models.DataIdentifier.scope).filter(models.DataIdentifier.expired_at.isnot(None), - models.DataIdentifier.expired_at < datetime.utcnow()) - result = get_count(query) - # Possible check against a threshold. If result > max_value then sys.exit(CRITICAL) - probe_metrics.gauge(name='undertaker.expired_dids').set(result) - Gauge('undertaker_expired_dids', '', registry=registry).set(result) - - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_expired_dids', registry=registry) - except: - continue - - print(result) + with PrometheusPusher() as manager: + + query = (session.query(func.count(models.DataIdentifier.scope)) + .filter(models.DataIdentifier.expired_at.isnot(None), + models.DataIdentifier.expired_at < datetime.utcnow())) + result = query.scalar() or 0 + # Possible check against a threshold. 
If result > max_value then sys.exit(CRITICAL) + + manager.gauge('expired_dids.total', + documentation="All expired dids" + ).set(result) + except: print(traceback.format_exc()) sys.exit(UNKNOWN) diff --git a/common/check_fts_backlog b/common/check_fts_backlog index 5c85cd97..473df9f0 100755 --- a/common/check_fts_backlog +++ b/common/check_fts_backlog @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 """ Copyright European Organization for Nuclear Research (CERN) 2013 @@ -9,27 +9,23 @@ Authors: - Cedric Serfon, , 2014-2018 - Mario Lassnig, , 2015 - - Eric Vaandering, , 2019-2021 + - Eric Vaandering, , 2019-2022 - Thomas Beermann, , 2019 + - Maggie Voetberg, , 2024 """ -from __future__ import print_function import os import sys +from urllib.parse import urlparse import requests import urllib3 -try: - from urlparse import urlparse -except ImportError: - from urllib.parse import urlparse - -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway from rucio.common.config import config_get, config_get_bool from rucio.core.distance import update_distances from rucio.db.sqla.session import BASE, get_session -from utils.common import probe_metrics +from utils.common import PrometheusPusher + OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 @@ -40,10 +36,6 @@ if BASE.metadata.schema: else: schema = '' -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": se_matrix = {} @@ -82,87 +74,81 @@ if __name__ == "__main__": except Exception as error: UPDATE_DIST = True - registry = CollectorRegistry() - g = Gauge('fts_submitted', '', labelnames=('hostname',), registry=registry) - errmsg = '' - for ftshost in FTSHOSTS.split(','): - print("=== %s ===" % ftshost) - parsed_url = urlparse(ftshost) - scheme, hostname, port = parsed_url.scheme, parsed_url.hostname, parsed_url.port - retvalue = CRITICAL - url = 
'%s/fts3/ftsmon/overview?dest_se=&source_se=&time_window=1&vo=%s' % (ftshost, VO) - busy_channels = [] - busylimit = 5000 - for attempt in range(0, 5): - result = None - try: - result = requests.get(url, verify=False, cert=(PROXY, PROXY)) - res = result.json() - for channel in res['overview']['items']: - src = channel['source_se'] - dst = channel['dest_se'] - if (src, dst) not in se_matrix: - se_matrix[(src, dst)] = {'active': 0, 'submitted': 0, 'finished': 0, 'failed': 0, - 'transfer_speed': 0, 'mbps_link': 0} - for state in ['submitted', 'active', 'finished', 'failed']: + with PrometheusPusher() as manager: + + errmsg = '' + for ftshost in FTSHOSTS.split(','): + print(f"=== {ftshost} ===") + parsed_url = urlparse(ftshost) + scheme, hostname, port = parsed_url.scheme, parsed_url.hostname, parsed_url.port + retvalue = CRITICAL + url = f'{ftshost}/fts3/ftsmon/overview?dest_se=&source_se=&time_window=1&vo={VO}' + busy_channels = [] + busylimit = 5000 + for attempt in range(0, 5): + result = None + try: + result = requests.get(url, verify=False, cert=(PROXY, PROXY)) + res = result.json() + for channel in res['overview']['items']: + src = channel['source_se'] + dst = channel['dest_se'] + if (src, dst) not in se_matrix: + se_matrix[(src, dst)] = {'active': 0, 'submitted': 0, 'finished': 0, 'failed': 0, + 'transfer_speed': 0, 'mbps_link': 0} + for state in ['submitted', 'active', 'finished', 'failed']: + try: + se_matrix[(src, dst)][state] += channel[state] + except Exception: + pass try: - se_matrix[(src, dst)][state] += channel[state] + se_matrix[(src, dst)]['transfer_speed'] += channel['current'] + se_matrix[(src, dst)]['mbps_link'] += channel['current'] except Exception: pass - try: - se_matrix[(src, dst)]['transfer_speed'] += channel['current'] - se_matrix[(src, dst)]['mbps_link'] += channel['current'] - except Exception: - pass - if CHECK_BUSY and 'submitted' in channel and channel['submitted'] >= busylimit: - url_activities = 
'%s/fts3/ftsmon/config/activities/%s?source_se=%s&dest_se=%s' % (ftshost, VO, - src, dst) - activities = {} - try: - s = requests.get(url_activities, verify=False, cert=(PROXY, PROXY)) - for key, val in s.json().items(): - activities[key] = val['SUBMITTED'] - except Exception as error: - pass - busy_channels.append({'src': src, 'dst': dst, 'submitted': channel['submitted'], - 'activities': activities}) - summary = res['summary'] - hostname = hostname.replace('.', '_') - print('%s : Submitted : %s' % (hostname, summary['submitted'])) - print('%s : Active : %s' % (hostname, summary['active'])) - print('%s : Staging : %s' % (hostname, summary['staging'])) - print('%s : Started : %s' % (hostname, summary['started'])) - if busy_channels != []: - print('Busy channels (>%s submitted):' % busylimit) - for bc in busy_channels: - activities_str = ", ".join([("%s: %s" % (key, val)) for key, val in bc['activities'].items()]) - print(' %s to %s : %s submitted jobs (%s)' % (bc['src'], bc['dst'], bc['submitted'], - str(activities_str))) - probe_metrics.gauge('fts3.{hostname}.submitted').labels(hostname=hostname).set(summary['submitted'] - + summary['active'] - + summary['staging'] - + summary['started']) - - g.labels(**{'hostname': hostname}).set((summary['submitted'] + summary['active'] + summary['staging'] + summary['started'])) - retvalue = OK - break - except Exception as error: - retvalue = CRITICAL - if result and result.status_code: - errmsg = 'Error when trying to get info from %s : HTTP status code %s. [%s]' % ( - ftshost, str(result.status_code), str(error)) - else: - errmsg = 'Error when trying to get info from %s. %s' % (ftshost, str(error)) - if retvalue == CRITICAL: - print("All attempts failed. 
%s" % errmsg)
-        WORST_RETVALUE = max(retvalue, WORST_RETVALUE)
-
-    if len(PROM_SERVERS):
-        for server in PROM_SERVERS:
-            try:
-                push_to_gateway(server.strip(), job='check_fts_backlog', registry=registry)
-            except:
-                continue
+
+                        if CHECK_BUSY and 'submitted' in channel and channel['submitted'] >= busylimit:
+                            url_activities = f'{ftshost}/fts3/ftsmon/config/activities/{VO}?source_se={src}&dest_se={dst}'
+                            activities = {}
+                            try:
+                                s = requests.get(url_activities, verify=False, cert=(PROXY, PROXY))
+                                for key, val in s.json().items():
+                                    activities[key] = val['SUBMITTED']
+                            except Exception as error:
+                                pass
+                            busy_channels.append({'src': src, 'dst': dst, 'submitted': channel['submitted'],
+                                                  'activities': activities})
+                summary = res['summary']
+                hostname = hostname.replace('.', '_')
+
+                for state in ['submitted', 'active', 'staging', 'started']:
+                    print(f'{hostname} : {state.capitalize()} : {summary[state]}')
+
+
+                if busy_channels != []:
+                    print(f'Busy channels (>{busylimit} submitted):')
+                    for bc in busy_channels:
+                        activities_str = ", ".join([(f"{key}: {val}") for key, val in bc['activities'].items()])
+                        print(f"{bc['src']} to {bc['dst']} : {bc['submitted']} submitted jobs ({activities_str})")
+
+                # Add to metrics
+                backlog_count = summary['submitted'] + summary['active'] + summary['staging'] + summary['started']
+                manager.gauge("fts_backlog.submitted.{hostname}",
+                              documentation="All submitted, active, staging, or started in FTS queue"
+                              ).labels(hostname=hostname).set(backlog_count)
+
+                retvalue = OK
+                break
+            except Exception as error:
+                retvalue = CRITICAL
+                if result and result.status_code:
+                    errmsg = f'Error when trying to get info from {ftshost} : HTTP status code {result.status_code}. {error}'
+                else:
+                    errmsg = f'Error when trying to get info from {ftshost}. {error}'
+        if retvalue == CRITICAL:
+            print(f"All attempts failed. 
{errmsg}") + WORST_RETVALUE = max(retvalue, WORST_RETVALUE) + if not UPDATE_DIST: sys.exit(WORST_RETVALUE) @@ -193,7 +179,6 @@ if __name__ == "__main__": for source_rse, dest_rse in se_matrix: for source_rse_id in se_map[source_rse]: for dest_rse_id in se_map[dest_rse]: - # print source_rse_id, dest_rse_id, se_matrix[(source_rse, dest_rse)] update_distances(src_rse_id=source_rse_id, dest_rse_id=dest_rse_id, parameters=se_matrix[(source_rse, dest_rse)], session=None) sys.exit(WORST_RETVALUE) diff --git a/common/check_messages_to_submit b/common/check_messages_to_submit index 4400531e..b7b9ed69 100755 --- a/common/check_messages_to_submit +++ b/common/check_messages_to_submit @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -8,19 +8,18 @@ # Authors: # - Mario Lassnig, , 2013-2014 # - Thomas Beermann, , 2019 +# - Eric Vaandering, , 2022 +# - Maggie Voetberg, , 2024 """ Probe to check the queues of messages to submit by Hermes to the broker """ -from __future__ import print_function import sys +from sqlalchemy.sql import text as sql_text -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway -from rucio.common.config import config_get from rucio.db.sqla.session import BASE, get_session - -from utils.common import probe_metrics +from utils.common import PrometheusPusher # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 @@ -32,31 +31,25 @@ else: queue_sql = """SELECT COUNT(*) FROM {schema}messages""".format(schema=schema) -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: - registry = CollectorRegistry() session = get_session() - result = session.execute(queue_sql).fetchall() - print('queues.messages %s' % result[0][0]) - 
probe_metrics.gauge(name='queues.messages').set(result[0][0])
-        Gauge('hermes_queues_messages', '', registry=registry).set(result[0][0])
-
-        if len(PROM_SERVERS):
-            for server in PROM_SERVERS:
-                try:
-                    push_to_gateway(server.strip(), job='check_messages_to_submit', registry=registry)
-                except:
-                    continue
-
-        if result[0][0] > 100000:
-            sys.exit(WARNING)
-        elif result[0][0] > 1000000:
-            sys.exit(CRITICAL)
+        with PrometheusPusher() as manager:
+            result = session.execute(sql_text(queue_sql)).fetchall()
+            message_count = result[0][0]
+            print(f"Messages in queue: {message_count}")
+
+            manager.gauge(
+                "messages_to_submit.queues.messages",
+                documentation="Messages in queue, to submit"
+            ).set(message_count)
+
+            if message_count > 1000000:
+                sys.exit(CRITICAL)
+            elif message_count > 100000:
+                sys.exit(WARNING)
     except Exception as e:
+        print(f"Error: {e}")
         sys.exit(UNKNOWN)
     sys.exit(OK)
diff --git a/common/check_new_dids b/common/check_new_dids
index 6f290cba..62376873 100755
--- a/common/check_new_dids
+++ b/common/check_new_dids
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 # Copyright European Organization for Nuclear Research (CERN) 2013
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -8,50 +8,37 @@
 # Authors:
 # - Vincent Garonne, , 2013
 # - Thomas Beermann, , 2019
-# - Eric Vaandering , 2020
+# - Eric Vaandering, , 2020-2022
+# - Maggie Voetberg, , 2024
 """ Probe to check the backlog of new dids. 
""" -from __future__ import print_function import sys import traceback -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway -from rucio.common.config import config_get +from sqlalchemy import func from rucio.db.sqla import models from rucio.db.sqla.session import get_session -from rucio.db.sqla.util import get_count -from utils.common import probe_metrics +from utils.common import PrometheusPusher + # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: - registry = CollectorRegistry() session = get_session() - query = (session.query(models.DataIdentifier.scope) - .with_hint(models.DataIdentifier, "INDEX_FFS(DIDS DIDS_IS_NEW_IDX)", 'oracle') - .filter(models.DataIdentifier.is_new.isnot(None))) - result = get_count(query) - probe_metrics.gauge(name='transmogrifier.new_dids').set(result) - Gauge('transmogrifier_new_dids', '', registry=registry).set(result) - - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_new_dids', registry=registry) - except: - continue - - print(result) + with PrometheusPusher() as manager: + query = (session.query(func.count(models.DataIdentifier.scope)) + .with_hint(models.DataIdentifier, "INDEX_FFS(DIDS DIDS_IS_NEW_IDX)", 'oracle') + .filter(models.DataIdentifier.is_new.isnot(None))) + result = query.scalar() or 0 + + manager.gauge("new_dids").set(result) + except: print(traceback.format_exc()) sys.exit(UNKNOWN) diff --git a/common/check_stuck_rules b/common/check_stuck_rules index 84633ba6..420f3f4b 100755 --- a/common/check_stuck_rules +++ b/common/check_stuck_rules @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -7,22 
+7,20 @@ # # Authors: # - Martin Barisits, , 2014 -# - Eric Vaandering, , 2019-2021 +# - Eric Vaandering, , 2019-2022 # - Thomas Beermann, , 2019 +# - Maggie Voetberg, , 2024 """ Probe to check the backlog of stuck rules. """ -from __future__ import print_function import sys import traceback +from sqlalchemy.sql import text as sql_text -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway -from rucio.common.config import config_get from rucio.db.sqla.session import BASE, get_session - -from utils.common import probe_metrics +from utils.common import PrometheusPusher # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 @@ -32,33 +30,26 @@ if BASE.metadata.schema: else: schema = '' -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: - registry = CollectorRegistry() session = get_session() - sql = 'SELECT COUNT(1) FROM {schema}RULES where state=\'S\' and (error !=\'MissingSourceReplica\' or error IS NULL)'.format( - schema=schema) - result = session.execute(sql).fetchone()[0] - probe_metrics.gauge(name='judge.stuck_rules_without_missing_source_replica').set(result) - Gauge('judge_stuck_rules_without_missing_source_replica', '', registry=registry).set(result) - sql = 'SELECT COUNT(1) FROM {schema}RULES where state=\'S\' and error =\'MissingSourceReplica\''.format( - schema=schema) - result = session.execute(sql).fetchone()[0] - probe_metrics.gauge(name='judge.stuck_rules_with_missing_source_replica').set(result) - Gauge('judge_stuck_rules_with_missing_source_replica', '', registry=registry).set(result) + without_missing = 'SELECT COUNT(1) FROM {schema}RULES where state=\'S\' and (error !=\'MissingSourceReplica\' or error IS NULL)'.format(schema=schema) + with_missing = 'SELECT COUNT(1) FROM {schema}RULES where state=\'S\' and error =\'MissingSourceReplica\''.format(schema=schema) + queries = { + 
"without_missing_source_replica": without_missing, + "with_missing_source_replica": with_missing + } + + with PrometheusPusher() as manager: + for source_status, query in queries.items(): + result = session.execute(sql_text(query)).fetchone()[0] + manager.gauge("stuck_rules.{source_status}", + documentation="Backlog of stuck rules" + ).labels(source_status=source_status).set(result) - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_stuck_rules', registry=registry) - except: - continue except: print(traceback.format_exc()) sys.exit(UNKNOWN) + sys.exit(OK) diff --git a/common/check_transfer_queues_status b/common/check_transfer_queues_status index 15f94b9b..1ff8e996 100755 --- a/common/check_transfer_queues_status +++ b/common/check_transfer_queues_status @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -10,19 +10,18 @@ # - Cedric Serfon, , 2014 # - Wen Guan, , 2015 # - Thomas Beermann, , 2019 +# - Eric Vaandering, , 2022 +# - Maggie Voetberg, , 2024 """ Probe to check the queues of the transfer service """ -from __future__ import print_function import sys +from sqlalchemy.sql import text as sql_text -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway -from rucio.common.config import config_get from rucio.db.sqla.session import BASE, get_session - -from utils.common import probe_metrics +from utils.common import PrometheusPusher # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 @@ -57,29 +56,24 @@ FROM {schema}requests GROUP BY state, activity, external_host )""".format(schema=schema) -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: - registry = CollectorRegistry() - g = 
Gauge('conveyor_queues_requests', '', labelnames=('state', 'activity', 'external_host'), registry=registry) session = get_session() - for k in session.execute(active_queue).fetchall(): - print(k[0], k[1], end=" ") - probe_metrics.gauge(name=k[0].replace('-', '_')).set(k[1]) - items = k[0].split('.') - state = items[2] - activity = items[3] - external_host = items[4].replace('-', '_') - g.labels(**{'activity': activity, 'state': state, 'external_host': external_host}).set(k[1]) - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_transfer_queues_status', registry=registry) - except: - continue - except: + with PrometheusPusher() as manager: + query = session.execute(sql_text(active_queue)).fetchall() + for state_desc, count in query: + print(state_desc, count, end=" ") + + items = state_desc.split('.') + state = items[2] + activity = items[3] + external_host = items[4] + + manager.gauge( + "transfer_queues_status.{activity}.{state}.{external_host}" + ).labels(activity=activity, state=state, external_host=external_host).set(count) + + except Exception as e: + print(f"Error: {e}") sys.exit(UNKNOWN) sys.exit(OK) diff --git a/common/check_unevaluated_dids b/common/check_unevaluated_dids index d3428854..bebb6755 100755 --- a/common/check_unevaluated_dids +++ b/common/check_unevaluated_dids @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -8,19 +8,17 @@ # Authors: # - Vincent Garonne, , 2013 # - Thomas Beermann, , 2019 +# - Eric Vaandering, , 2022 """ Probe to check the backlog of dids waiting for rule evaluation. 
""" -from __future__ import print_function import sys +from sqlalchemy.sql import text as sql_text -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway -from rucio.common.config import config_get from rucio.db.sqla.session import BASE, get_session - -from utils.common import probe_metrics +from utils.common import PrometheusPusher # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 @@ -32,27 +30,15 @@ else: count_sql = 'SELECT COUNT(*) FROM {schema}updated_dids'.format(schema=schema) -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: session = get_session() - result = session.execute(count_sql).fetchone()[0] - probe_metrics.gauge(name='judge.waiting_dids').set(result) - - registry = CollectorRegistry() - Gauge('judge_waiting_dids', '', registry=registry).set(result) - - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_unevaluated_dids', registry=registry) - except: - continue + result = session.execute(sql_text(count_sql)).fetchone()[0] + + with PrometheusPusher() as manager: + manager.gauge("unevaluated_dids").set(result) - print(result) - except: + except Exception as e: + print(f"Error: {e}") sys.exit(UNKNOWN) sys.exit(OK) diff --git a/common/check_unlocked_replicas b/common/check_unlocked_replicas index 80276a91..3a90fca8 100755 --- a/common/check_unlocked_replicas +++ b/common/check_unlocked_replicas @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -8,62 +8,43 @@ # Authors: # - Vincent Garonne, , 2013 # - Thomas Beermann, , 2019 +# - Eric Vaandering, , 2022 +# - Maggie Voetberg, , 2024 """ Probe to check the backlog of unlocked replicas. 
""" -from __future__ import print_function import sys +from sqlalchemy.sql import text as sql_text -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway -from rucio.common.config import config_get from rucio.db.sqla.session import BASE, get_session - -from utils.common import probe_metrics +from utils.common import PrometheusPusher # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 -# select (select rse from "ATLAS_RUCIO".rses where id = rse_id), -# n -# from -# (SELECT /*+ index_FFS(replicas REPLICAS_TOMBSTONE_IDX) */ -# CASE WHEN ("ATLAS_RUCIO".replicas.tombstone IS NOT NULL) THEN "ATLAS_RUCIO".replicas.rse_id END as rse_id, -# count(*) as n -# FROM "ATLAS_RUCIO".replicas -# WHERE "ATLAS_RUCIO".replicas.tombstone is not null -# GROUP BY CASE WHEN ("ATLAS_RUCIO".replicas.tombstone IS NOT NULL) THEN "ATLAS_RUCIO".replicas.rse_id END) - if BASE.metadata.schema: schema = BASE.metadata.schema + '.' else: schema = '' -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: - registry = CollectorRegistry() session = get_session() - unlocked_sql = 'select /*+ index_ffs(replicas REPLICAS_TOMBSTONE_IDX) */ count(1) from {schema}replicas where tombstone is not null'.format(schema=schema) - result = session.execute(unlocked_sql).fetchone()[0] - probe_metrics.gauge(name='reaper.unlocked_replicas').set(result) - Gauge('reaper_unlocked_replicas', '', registry=registry).set(result) - print(result) + expired_sql = 'select /*+ index_ffs(replicas REPLICAS_TOMBSTONE_IDX) */ count(1) from {schema}replicas where tombstone is not null and tombstone < sysdate - 2/24'.format(schema=schema) - result = session.execute(expired_sql).fetchone()[0] - probe_metrics.gauge(name='reaper.unlocked_replicas').set(result) - Gauge('reaper_expired_replicas', '', registry=registry).set(result) + unlocked_sql = 'select /*+ index_ffs(replicas 
REPLICAS_TOMBSTONE_IDX) */ count(1) from {schema}replicas where tombstone is not null'.format(schema=schema)
+        queries = {
+            "expired": expired_sql,
+            "unlocked": unlocked_sql
+        }
+
+        with PrometheusPusher() as manager:
+            for replica_status, query in queries.items():
+                result = session.execute(sql_text(query)).fetchone()[0]
+                manager.gauge("unlocked_replicas.{replica_status}").labels(replica_status=replica_status).set(result)
 
-        if len(PROM_SERVERS):
-            for server in PROM_SERVERS:
-                try:
-                    push_to_gateway(server.strip(), job='check_unlocked_replicas', registry=registry)
-                except:
-                    continue
+
     except:
         sys.exit(UNKNOWN)
     sys.exit(OK)
diff --git a/common/check_updated_dids b/common/check_updated_dids
index 756aad8a..82e25f3b 100755
--- a/common/check_updated_dids
+++ b/common/check_updated_dids
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 # Copyright European Organization for Nuclear Research (CERN) 2013
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -8,57 +8,34 @@
 # Authors:
 # - Vincent Garonne, , 2013
 # - Thomas Beermann, , 2019
-# - Eric Vaandering , 2020-2021
+# - Eric Vaandering, , 2020-2022
+# - Maggie Voetberg, , 2024
 """ Probe to check the backlog of updated dids. 
""" -from __future__ import print_function import sys import traceback +from sqlalchemy import func -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway -from rucio.common.config import config_get from rucio.db.sqla import models from rucio.db.sqla.session import get_session -from rucio.db.sqla.util import get_count -from utils.common import probe_metrics +from utils.common import PrometheusPusher # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: - registry = CollectorRegistry() session = get_session() - query = session.query(models.UpdatedDID) - result = get_count(query) - probe_metrics.gauge(name='judge.updated_dids').set(result) - Gauge('judge_updated_dids', '', registry=registry).set(result) + with PrometheusPusher() as manager: + query = session.query(func.count(models.UpdatedDID.id)) + result = query.scalar() or 0 - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_updated_dids', registry=registry) - except: - continue + manager.gauge('updated_dids').set(result) - # created_at, count, max, min, avg, stdev = 0.0, 0.0, 0.0, 0.0, 0.0, 0.0 - # result = session.execute('select * from atlas_rucio.concurency_stats where created_at > sysdate - 1/1440') - # for row in result: - # created_at, count, max, min, avg, stdev = row - # monitor.record_gauge(stat='judge.updated_dids_per_min.count', value=count or 0) - # monitor.record_gauge(stat='judge.updated_dids_per_min.max', value=max or 0) - # monitor.record_gauge(stat='judge.updated_dids_per_min.min', value=min or 0) - # monitor.record_gauge(stat='judge.updated_dids_per_min.avg', value=avg or 0) - # monitor.record_gauge(stat='judge.updated_dids_per_min.stdev', value=stdev or 0) - # print created_at, count, max, min, avg, stdev except: 
print(traceback.format_exc()) sys.exit(UNKNOWN)