Skip to content

Commit

Permalink
Merge pull request rucio#125 from haozturk/CMSRucio-688-fix
Browse files Browse the repository at this point in the history
CMS: modernize the check_rule_counts probe
  • Loading branch information
ericvaandering authored Jan 29, 2024
2 parents 5bdb2cd + dc9b8c3 commit 86f8be2
Showing 1 changed file with 75 additions and 257 deletions.
332 changes: 75 additions & 257 deletions cms/check_rule_counts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python3
# Copyright 2012-2020 CERN
# Copyright 2012-2024 CERN
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
# - Donata Mielaikaite, <[email protected]>, 2020
# - Eric Vaandering, <[email protected]>, 2021
# - Fernando Garzon, [email protected], 2022
# - Hasan Ozturk, haozturk AT cern DOT ch, 2024


"""
Expand All @@ -38,273 +39,90 @@ from sqlalchemy import func

from utils import common

PrometheusPusher = common.PrometheusPusher
probe_metrics = common.probe_metrics

PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='')
if PROM_SERVERS != '':
PROM_SERVERS = PROM_SERVERS.split(',')
else:
PROM_SERVERS = None

prom_labels_config = config_get('monitor', 'prometheus_labels', raise_exception=False, default='{}')
extra_prom_labels = json.loads(prom_labels_config)

# Exit statuses
OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3

if __name__ == '__main__':
registry = CollectorRegistry()

rule_count_labels = ['state']
not_ok_labels = ['Not_ok_rules']
stuck_cnt_labels = ['Stuck_cnt']
replicating_cnt_labels = ['Replicating_cnt']
rules_by_activity_labels = ['state', 'activity']
locks_by_activity_labels = ['state', 'activity']

rule_count_labels.extend(extra_prom_labels.keys())
not_ok_labels.extend(extra_prom_labels.keys())
stuck_cnt_labels.extend(extra_prom_labels.keys())
replicating_cnt_labels.extend(extra_prom_labels.keys())
rules_by_activity_labels.extend(extra_prom_labels.keys())
locks_by_activity_labels.extend(extra_prom_labels.keys())

rules_count_gauge = Gauge('rucio_rules_count', 'Number of rules in a given state',
labelnames=rule_count_labels, registry=registry)
not_ok_rules_gauge = Gauge('rucio_not_ok_rules', 'Number of not OK rules',
labelnames=not_ok_labels, registry=registry)
stuck_cnt_gauge = Gauge('rucio_stuck_cnt', 'Number of stuck files',
labelnames=stuck_cnt_labels, registry=registry)
replicating_cnt_gauge = Gauge('rucio_replicating_cnt', 'Number of replicating files',
labelnames=replicating_cnt_labels, registry=registry)
rule_count_by_activity_gauge = Gauge('rucio_rules_states_by_ativity_cnt', 'Number of S/R/U rules by activity',
labelnames=rules_by_activity_labels, registry=registry)
locks_count_by_activity_gauge = Gauge('rucio_locks_states_by_ativity_cnt', 'Number of S/R locks by activity',
labelnames=locks_by_activity_labels, registry=registry)

try:
session = get_session()

# check rules
state_map = {'REPLICATING': 'rules_replicating',
'OK': 'rules_ok',
'INJECT': 'rules_injecting',
'STUCK': 'rules_stuck',
'SUSPENDED': 'rules_suspend',
'WAITING_APPROVAL': 'rules_waiting_approval', }

ages = {
'created_24hours_ago': datetime.timedelta(days=1),
'created_1week_ago': datetime.timedelta(days=7),
'created_3weeks_ago': datetime.timedelta(days=21),
'created_3months_ago': datetime.timedelta(days=90),
'created_6months_ago': datetime.timedelta(days=180),
'created_12months_ago': datetime.timedelta(days=365),
}

result = (session.query(models.ReplicationRule.state, func.count(models.ReplicationRule.state))
.group_by(models.ReplicationRule.state)
.with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_PK)', 'oracle')
.all())

for state, num in result:
# Count Rules in x state
gauge_state = state_map.get(str(state.name), 'rules_' + str(state.name).lower())
print('rules.count.%s %s' % (gauge_state, num))

prom_labels = {'state': gauge_state}
prom_labels.update(extra_prom_labels)
rules_count_gauge.labels(**prom_labels).set(num)

probe_metrics.gauge(name='rules.count.{state}',
documentation='Number of rules in a given state').labels(state=gauge_state).set(num)

# Count Rules in x state ordered by activity
results = (session.query(models.ReplicationRule.state, models.ReplicationRule.activity, func.count(models.ReplicationRule.state))
.filter(models.ReplicationRule.state != RuleState.OK)
.group_by(models.ReplicationRule.activity, models.ReplicationRule.state)
.with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_PK)', 'oracle')
.all())

for result in results:
print(result[0], result[1], result[2])
prom_labels = {'state': result[0], 'activity': result[1]}
prom_labels.update(extra_prom_labels)
rule_count_by_activity_gauge.labels(**prom_labels).set(result[2])

# Count Locks in S/R state ordered by activity
print('Count Locks in S/R state ordered by activity')
results = (session.query(models.ReplicationRule.activity,
func.sum(models.ReplicationRule.locks_stuck_cnt),
func.sum(models.ReplicationRule.locks_replicating_cnt))
.group_by(models.ReplicationRule.activity)
.with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_PK)', 'oracle')
.all())

for result in results:
prom_labels = {'state': 'stuck', 'activity': result[0]}
prom_labels.update(extra_prom_labels)
locks_count_by_activity_gauge.labels(**prom_labels).set(result[1])
prom_labels = {'state': 'replicating', 'activity': result[0]}
prom_labels.update(extra_prom_labels)
locks_count_by_activity_gauge.labels(**prom_labels).set(result[2])

# Not Ok rules
query = session.query(models.ReplicationRule.scope).filter(models.ReplicationRule.state != RuleState.OK)
result = get_count(query)

prom_labels = {'Not_ok_rules': 'Not_ok_rules'}
prom_labels.update(extra_prom_labels)
not_ok_rules_gauge.labels(**prom_labels).set(result)

probe_metrics.gauge(name='judge.total_not_OK_rules', documentation='Number of not OK rules').set(result)

# Stuck cnt
query = (session.query(func.sum(models.ReplicationRule.locks_stuck_cnt))
.filter(models.ReplicationRule.state == RuleState.STUCK))
result = query.scalar() or 0

print('rules.no_of_files.total.sum_locks_stuck_cnt %s' % (result))

prom_labels = {'Stuck_cnt': 'Stuck_cnt'}
prom_labels.update(extra_prom_labels)
stuck_cnt_gauge.labels(**prom_labels).set(result)

probe_metrics.gauge(name='rules.no_of_files.total.sum_locks_stuck_cnt',
documentation='Number of stuck files').set(result)

# check left replicating files
query = (session.query(func.sum(models.ReplicationRule.locks_replicating_cnt))
.filter(models.ReplicationRule.state.in_([RuleState.STUCK, RuleState.REPLICATING])))
result = query.scalar() or 0

print('rules.no_of_files.total.sum_locks_replicating_cnt %s' % (result))

prom_labels = {'Replicating_cnt': 'Replicating_cnt'}
prom_labels.update(extra_prom_labels)
replicating_cnt_gauge.labels(**prom_labels).set(result)

probe_metrics.gauge(name='rules.no_of_files.total.sum_locks_replicating_cnt',
documentation='Number of replicating files').set(result)

# check stuck and replicating files which are more than X old
suspended_rules_older_than_X_labels = ['age']
stuck_rules_older_than_X_labels = ['age']
replicating_rules_older_than_X_labels = ['age']

suspended_rules_older_than_X_labels.extend(extra_prom_labels.keys())
stuck_rules_older_than_X_labels.extend(extra_prom_labels.keys())
replicating_rules_older_than_X_labels.extend(extra_prom_labels.keys())

stuck_locks_older_than_X_labels = ['age']
replicating_locks_older_than_X_labels = ['age']

stuck_locks_older_than_X_labels.extend(extra_prom_labels.keys())
replicating_locks_older_than_X_labels.extend(extra_prom_labels.keys())

suspended_rules_older_than_gauge = Gauge('rucio_suspended_rules_cnt',
'Number of suspended rules older than X',
labelnames=suspended_rules_older_than_X_labels,
registry=registry)
print('suspended_rules_older_than_X_labels', suspended_rules_older_than_X_labels)
stuck_rules_older_than_gauge = Gauge('rucio_stuck_rules_cnt', 'Number of stuck rules older than X',
labelnames=stuck_rules_older_than_X_labels,
registry=registry)

replicating_rules_older_than_gauge = Gauge('rucio_replicating_rules_cnt',
'Number of replicating rules older than X',
labelnames=replicating_rules_older_than_X_labels,
registry=registry)

stuck_locks_older_than_gauge = Gauge('rucio_stuck_locks_cnt', 'Number of stuck files older than X',
labelnames=stuck_locks_older_than_X_labels,
registry=registry)
replicating_locks_older_than_gauge = Gauge('rucio_replicating_locks_cnt',
'Number of replicating files older than X',
labelnames=replicating_locks_older_than_X_labels,
registry=registry)

for a_name, a_delta in ages.items():
timeLimit = datetime.datetime.utcnow() - a_delta

# Number of Suspended rules older than x

query = (session.query(func.count(models.ReplicationRule.id))
.filter(models.ReplicationRule.state == RuleState.SUSPENDED)
.filter(models.ReplicationRule.created_at <= timeLimit))
result = query.scalar() or 0
print('rules.no_of_rules.suspended.%s.suspended_rules_cnt %s' % (a_name, result))

prom_labels = {'age': a_name.split('_')[1]}
prom_labels.update(extra_prom_labels)

suspended_rules_older_than_gauge.labels(**prom_labels).set(result)
print(prom_labels, result)

probe_metrics.gauge(name='rules.no_of_rules.suspended.{name}.sum_locks_stuck_cnt').labels(name=a_name).set(result)

# Number of Stuck rules older than x

query = (session.query(func.count(models.ReplicationRule.id))
.filter(models.ReplicationRule.state == RuleState.STUCK)
.filter(models.ReplicationRule.created_at <= timeLimit))
result = query.scalar() or 0
print('rules.no_of_rules.stuck.%s.stuck_rules_cnt %s' % (a_name, result))

stuck_rules_older_than_gauge.labels(**prom_labels).set(result)
print(prom_labels, result)

probe_metrics.gauge(name='rules.no_of_rules.stuck.{name}.sum_locks_stuck_cnt').labels(name=a_name).set(result)

# Number of replicating rules older than x

query = (session.query(func.count(models.ReplicationRule.id))
.filter(models.ReplicationRule.state == RuleState.REPLICATING)
.filter(models.ReplicationRule.created_at <= timeLimit))
result = query.scalar() or 0
print('rules.no_of_rules.replicating.%s.replicating_rules_cnt %s' % (a_name, result))

replicating_rules_older_than_gauge.labels(**prom_labels).set(result)
print(prom_labels, result)

probe_metrics.gauge(name='rules.no_of_rules.replicating.{name}.sum_locks_stuck_cnt').labels(name=a_name).set(result)

# Number of Stuck files

query = (session.query(func.sum(models.ReplicationRule.locks_stuck_cnt))
.filter(models.ReplicationRule.state == RuleState.STUCK)
.filter(models.ReplicationRule.created_at <= timeLimit))
result = query.scalar() or 0
print('rules.no_of_files.stuck.%s.sum_locks_stuck_cnt %s' % (a_name, result))

stuck_locks_older_than_gauge.labels(**prom_labels).set(result)
print(prom_labels, result)

probe_metrics.gauge(name='rules.no_of_files.stuck.{name}.sum_locks_stuck_cnt',
documentation='Number of stuck files older than X').labels(name=a_name).set(result)

# Number of Replicating files

query = (session.query(func.sum(models.ReplicationRule.locks_replicating_cnt))
.filter(models.ReplicationRule.state.in_([RuleState.STUCK, RuleState.REPLICATING]))
.filter(models.ReplicationRule.created_at <= timeLimit))
result = query.scalar() or 0
print('rules.no_of_files.replicating.%s.sum_locks_replicating_cnt %s' % (a_name, result))

replicating_locks_older_than_gauge.labels(**prom_labels).set(result)
print(prom_labels, result)

(probe_metrics.gauge(name='rules.no_of_files.replicating.{name}.sum_locks_replicating_cnt',
documentation='Number of replicating files older than X')
.labels(name=a_name)
.set(result))
states = {'REPLICATING': RuleState.REPLICATING,
'OK': RuleState.OK,
'INJECT': RuleState.INJECT,
'STUCK': RuleState.STUCK,
'SUSPENDED': RuleState.SUSPENDED,
'WAITING_APPROVAL': RuleState.WAITING_APPROVAL }

# Number of days which will be used to aggregate rules/locks
older_than_n_days = [1, 7, 21, 90, 180, 365]

with PrometheusPusher() as manager:

# Rule count per state
result = (session.query(models.ReplicationRule.state, func.count(models.ReplicationRule.state))
.group_by(models.ReplicationRule.state)
.with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_PK)', 'oracle')
.all())

for state, num in result:
manager.gauge(name='rule_count_per_state.{state}',
documentation='Number of rules in a given state').labels(state=str(state.name)).set(num)


# Rule count per state and activity
results = (session.query(models.ReplicationRule.state, models.ReplicationRule.activity, func.count(models.ReplicationRule.state))
.filter(models.ReplicationRule.state != RuleState.OK)
.group_by(models.ReplicationRule.activity, models.ReplicationRule.state)
.with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_PK)', 'oracle')
.all())

for result in results:
manager.gauge(name='rule_count_per_state_and_activity.{state}.{activity}',
documentation='Number of rules in a given state and activity').labels(state=result[0], activity=result[1]).set(result[2])


# Lock count per state (STUCK and REPLICATING) and activity
results = (session.query(models.ReplicationRule.activity,
func.sum(models.ReplicationRule.locks_stuck_cnt),
func.sum(models.ReplicationRule.locks_replicating_cnt))
.group_by(models.ReplicationRule.activity)
.with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_PK)', 'oracle')
.all())

for result in results:
manager.gauge(name='lock_count_per_state_and_activity.{state}.{activity}',
documentation='Number of S/R locks by activity and state').labels(state='stuck', activity=result[0]).set(result[1])
manager.gauge(name='lock_count_per_state_and_activity.{state}.{activity}',
documentation='Number of S/R locks by activity and state').labels(state='replicating', activity=result[0]).set(result[2])

for nDays in older_than_n_days:
age = datetime.datetime.utcnow() - datetime.timedelta(days=nDays)

for stateName, stateDB in states.items():

# Rule count per state and date
query = (session.query(func.count(models.ReplicationRule.id))
.filter(models.ReplicationRule.state == stateDB)
.filter(models.ReplicationRule.created_at <= age))
result = query.scalar() or 0

manager.gauge(name='rule_count_per_state_and_date.{state}.{older_than_days}',
documentation='Rule count per state and date').labels(state=stateName, older_than_days= nDays ).set(result)


# File count per state and date
query = (session.query(func.sum(models.ReplicationRule.locks_stuck_cnt))
.filter(models.ReplicationRule.state == stateDB)
.filter(models.ReplicationRule.created_at <= age))
result = query.scalar() or 0

manager.gauge(name='file_count_per_state_and_date.{state}.{older_than_days}',
documentation='File count per state and date').labels(state=stateName, older_than_days=nDays).set(result)

if PROM_SERVERS:
for server in PROM_SERVERS:
try:
push_to_gateway(server.strip(), job='check_rules_count_by_state_by_account', registry=registry)
except:
continue
except:
print(traceback.format_exc())
sys.exit(UNKNOWN)
Expand Down

0 comments on commit 86f8be2

Please sign in to comment.