diff --git a/lib/rucio/core/heartbeat.py b/lib/rucio/core/heartbeat.py index f18c344f39..0de16d34ef 100644 --- a/lib/rucio/core/heartbeat.py +++ b/lib/rucio/core/heartbeat.py @@ -20,7 +20,7 @@ from rucio.common.exception import DatabaseException from rucio.common.utils import pid_exists -from rucio.db.sqla.models import Heartbeats +from rucio.db.sqla.models import Heartbeat from rucio.db.sqla.session import read_session, transactional_session if TYPE_CHECKING: @@ -93,36 +93,36 @@ def _sanity_check( :param session: The database session in use. """ base_stmt = select( - Heartbeats.pid + Heartbeat.pid ).distinct( ).where( - Heartbeats.hostname == hostname + Heartbeat.hostname == hostname ) if executable: if not hash_executable: hash_executable = calc_hash(executable) stmt = base_stmt.where( - Heartbeats.executable == hash_executable + Heartbeat.executable == hash_executable ) for pid in session.execute(stmt).scalars().all(): if not pid_exists(pid): stmt = delete( - Heartbeats + Heartbeat ).where( - and_(Heartbeats.executable == hash_executable, - Heartbeats.hostname == hostname, - Heartbeats.pid == pid) + and_(Heartbeat.executable == hash_executable, + Heartbeat.hostname == hostname, + Heartbeat.pid == pid) ) session.execute(stmt) else: for pid in session.execute(base_stmt).scalars().all(): if not pid_exists(pid): stmt = delete( - Heartbeats + Heartbeat ).where( - and_(Heartbeats.hostname == hostname, - Heartbeats.pid == pid) + and_(Heartbeat.hostname == hostname, + Heartbeat.pid == pid) ) session.execute(stmt) @@ -172,45 +172,45 @@ def live( # upsert the heartbeat stmt = update( - Heartbeats + Heartbeat ).where( - and_(Heartbeats.executable == hash_executable, - Heartbeats.hostname == hostname, - Heartbeats.pid == pid, - Heartbeats.thread_id == thread_id) + and_(Heartbeat.executable == hash_executable, + Heartbeat.hostname == hostname, + Heartbeat.pid == pid, + Heartbeat.thread_id == thread_id) ).values({ - Heartbeats.updated_at: datetime.datetime.utcnow(), - Heartbeats.payload: payload + Heartbeat.updated_at: datetime.datetime.utcnow(), + Heartbeat.payload: payload }) if not session.execute(stmt).rowcount: - Heartbeats(executable=hash_executable, - readable=executable[:Heartbeats.readable.property.columns[0].type.length], - hostname=hostname, - pid=pid, - thread_id=thread_id, - thread_name=thread_name, - payload=payload).save(session=session) + Heartbeat(executable=hash_executable, + readable=executable[:Heartbeat.readable.property.columns[0].type.length], + hostname=hostname, + pid=pid, + thread_id=thread_id, + thread_name=thread_name, + payload=payload).save(session=session) # assign thread identifier stmt = select( - Heartbeats.hostname, - Heartbeats.pid, - Heartbeats.thread_id + Heartbeat.hostname, + Heartbeat.pid, + Heartbeat.thread_id ).with_hint( - Heartbeats, + Heartbeat, 'INDEX(HEARTBEATS HEARTBEATS_PK)', 'oracle' ).where( - and_(Heartbeats.executable == hash_executable, - Heartbeats.updated_at >= datetime.datetime.utcnow() - datetime.timedelta(seconds=older_than)) + and_(Heartbeat.executable == hash_executable, + Heartbeat.updated_at >= datetime.datetime.utcnow() - datetime.timedelta(seconds=older_than)) ).group_by( - Heartbeats.hostname, - Heartbeats.pid, - Heartbeats.thread_id + Heartbeat.hostname, + Heartbeat.pid, + Heartbeat.thread_id ).order_by( - Heartbeats.hostname, - Heartbeats.pid, - Heartbeats.thread_id + Heartbeat.hostname, + Heartbeat.pid, + Heartbeat.thread_id ) result = session.execute(stmt).all() @@ -252,17 +252,17 @@ def die( hash_executable = calc_hash(executable) stmt = delete( - Heartbeats + Heartbeat ).where( - and_(Heartbeats.executable == hash_executable, - Heartbeats.hostname == hostname, - Heartbeats.pid == pid, - Heartbeats.thread_id == thread.ident) + and_(Heartbeat.executable == hash_executable, + Heartbeat.hostname == hostname, + Heartbeat.pid == pid, + Heartbeat.thread_id == thread.ident) ) if older_than: stmt = stmt.where( - Heartbeats.updated_at < datetime.datetime.utcnow() - datetime.timedelta(seconds=older_than) + Heartbeat.updated_at < datetime.datetime.utcnow() - datetime.timedelta(seconds=older_than) ) session.execute(stmt) @@ -277,12 +277,12 @@ def cardiac_arrest(older_than: Optional[int] = None, *, session: "Session") -> N """ stmt = delete( - Heartbeats + Heartbeat ) if older_than: stmt = stmt.where( - Heartbeats.updated_at < datetime.datetime.utcnow() - datetime.timedelta(seconds=older_than) + Heartbeat.updated_at < datetime.datetime.utcnow() - datetime.timedelta(seconds=older_than) ) session.execute(stmt) @@ -298,17 +298,17 @@ def list_heartbeats(*, session: "Session") -> list["HeartbeatDict"]: """ stmt = select( - Heartbeats.readable, - Heartbeats.hostname, - Heartbeats.pid, - Heartbeats.thread_name, - Heartbeats.updated_at, - Heartbeats.created_at, - Heartbeats.payload + Heartbeat.readable, + Heartbeat.hostname, + Heartbeat.pid, + Heartbeat.thread_name, + Heartbeat.updated_at, + Heartbeat.created_at, + Heartbeat.payload ).order_by( - Heartbeats.readable, - Heartbeats.hostname, - Heartbeats.thread_name + Heartbeat.readable, + Heartbeat.hostname, + Heartbeat.thread_name ) result = session.execute(stmt).all() @@ -340,15 +340,15 @@ def list_payload_counts( if not hash_executable: hash_executable = calc_hash(executable) stmt = select( - Heartbeats.payload, - func.count(Heartbeats.payload) + Heartbeat.payload, + func.count(Heartbeat.payload) ).where( - and_(Heartbeats.executable == hash_executable, - Heartbeats.updated_at >= datetime.datetime.utcnow() - datetime.timedelta(seconds=older_than)) + and_(Heartbeat.executable == hash_executable, + Heartbeat.updated_at >= datetime.datetime.utcnow() - datetime.timedelta(seconds=older_than)) ).group_by( - Heartbeats.payload + Heartbeat.payload ).order_by( - Heartbeats.payload + Heartbeat.payload ) return dict((payload, count) for payload, count in session.execute(stmt).all() if payload) diff --git a/lib/rucio/db/sqla/models.py b/lib/rucio/db/sqla/models.py index 408c927b80..c3135fd467 100644 --- a/lib/rucio/db/sqla/models.py +++ b/lib/rucio/db/sqla/models.py @@ -1621,7 +1621,7 @@ class ConfigHistory(BASE, ModelBase): _table_args = () -class Heartbeats(BASE, ModelBase): +class Heartbeat(BASE, ModelBase): """Represents the status and heartbeat of the running daemons and services""" __tablename__ = 'heartbeats' executable: Mapped[str] = mapped_column(String(64)) # SHA-2 diff --git a/tests/test_heartbeat.py b/tests/test_heartbeat.py index 232332ae97..01abe1569c 100644 --- a/tests/test_heartbeat.py +++ b/tests/test_heartbeat.py @@ -20,7 +20,7 @@ from sqlalchemy import delete, update from rucio.core.heartbeat import cardiac_arrest, die, list_heartbeats, list_payload_counts, live, sanity_check -from rucio.db.sqla.models import Heartbeats +from rucio.db.sqla.models import Heartbeat from rucio.db.sqla.session import transactional_session @@ -36,9 +36,9 @@ def _create_executable(): yield _create_executable stmt = delete( - Heartbeats + Heartbeat ).where( - Heartbeats.executable.in_(executables) + Heartbeat.executable.in_(executables) ) db_session.execute(stmt) @@ -165,19 +165,19 @@ def __forge_updated_at(*, session=None): two_days_ago = datetime.utcnow() - timedelta(days=2) a_dozen_hours_ago = datetime.utcnow() - timedelta(hours=12) stmt = update( - Heartbeats + Heartbeat ).where( - Heartbeats.hostname == 'host1' + Heartbeat.hostname == 'host1' ).values({ - Heartbeats.updated_at: two_days_ago + Heartbeat.updated_at: two_days_ago }) session.execute(stmt) stmt = update( - Heartbeats + Heartbeat ).where( - Heartbeats.hostname == 'host2' + Heartbeat.hostname == 'host2' ).values({ - Heartbeats.updated_at: a_dozen_hours_ago + Heartbeat.updated_at: a_dozen_hours_ago }) session.execute(stmt)