Skip to content

Commit

Permalink
Database: Standardize sqla model name HeartBeats to singular rucio#6717
Browse files Browse the repository at this point in the history
  • Loading branch information
ericbanzuzi authored and bari12 committed Aug 23, 2024
1 parent a6d87ab commit dd837d2
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 71 deletions.
122 changes: 61 additions & 61 deletions lib/rucio/core/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from rucio.common.exception import DatabaseException
from rucio.common.utils import pid_exists
from rucio.db.sqla.models import Heartbeats
from rucio.db.sqla.models import Heartbeat
from rucio.db.sqla.session import read_session, transactional_session

if TYPE_CHECKING:
Expand Down Expand Up @@ -93,36 +93,36 @@ def _sanity_check(
:param session: The database session in use.
"""
base_stmt = select(
Heartbeats.pid
Heartbeat.pid
).distinct(
).where(
Heartbeats.hostname == hostname
Heartbeat.hostname == hostname
)
if executable:
if not hash_executable:
hash_executable = calc_hash(executable)

stmt = base_stmt.where(
Heartbeats.executable == hash_executable
Heartbeat.executable == hash_executable
)
for pid in session.execute(stmt).scalars().all():
if not pid_exists(pid):
stmt = delete(
Heartbeats
Heartbeat
).where(
and_(Heartbeats.executable == hash_executable,
Heartbeats.hostname == hostname,
Heartbeats.pid == pid)
and_(Heartbeat.executable == hash_executable,
Heartbeat.hostname == hostname,
Heartbeat.pid == pid)
)
session.execute(stmt)
else:
for pid in session.execute(base_stmt).scalars().all():
if not pid_exists(pid):
stmt = delete(
Heartbeats
Heartbeat
).where(
and_(Heartbeats.hostname == hostname,
Heartbeats.pid == pid)
and_(Heartbeat.hostname == hostname,
Heartbeat.pid == pid)
)
session.execute(stmt)

Expand Down Expand Up @@ -172,45 +172,45 @@ def live(

# upsert the heartbeat
stmt = update(
Heartbeats
Heartbeat
).where(
and_(Heartbeats.executable == hash_executable,
Heartbeats.hostname == hostname,
Heartbeats.pid == pid,
Heartbeats.thread_id == thread_id)
and_(Heartbeat.executable == hash_executable,
Heartbeat.hostname == hostname,
Heartbeat.pid == pid,
Heartbeat.thread_id == thread_id)
).values({
Heartbeats.updated_at: datetime.datetime.utcnow(),
Heartbeats.payload: payload
Heartbeat.updated_at: datetime.datetime.utcnow(),
Heartbeat.payload: payload
})
if not session.execute(stmt).rowcount:
Heartbeats(executable=hash_executable,
readable=executable[:Heartbeats.readable.property.columns[0].type.length],
hostname=hostname,
pid=pid,
thread_id=thread_id,
thread_name=thread_name,
payload=payload).save(session=session)
Heartbeat(executable=hash_executable,
readable=executable[:Heartbeat.readable.property.columns[0].type.length],
hostname=hostname,
pid=pid,
thread_id=thread_id,
thread_name=thread_name,
payload=payload).save(session=session)

# assign thread identifier
stmt = select(
Heartbeats.hostname,
Heartbeats.pid,
Heartbeats.thread_id
Heartbeat.hostname,
Heartbeat.pid,
Heartbeat.thread_id
).with_hint(
Heartbeats,
Heartbeat,
'INDEX(HEARTBEATS HEARTBEATS_PK)',
'oracle'
).where(
and_(Heartbeats.executable == hash_executable,
Heartbeats.updated_at >= datetime.datetime.utcnow() - datetime.timedelta(seconds=older_than))
and_(Heartbeat.executable == hash_executable,
Heartbeat.updated_at >= datetime.datetime.utcnow() - datetime.timedelta(seconds=older_than))
).group_by(
Heartbeats.hostname,
Heartbeats.pid,
Heartbeats.thread_id
Heartbeat.hostname,
Heartbeat.pid,
Heartbeat.thread_id
).order_by(
Heartbeats.hostname,
Heartbeats.pid,
Heartbeats.thread_id
Heartbeat.hostname,
Heartbeat.pid,
Heartbeat.thread_id
)
result = session.execute(stmt).all()

Expand Down Expand Up @@ -252,17 +252,17 @@ def die(
hash_executable = calc_hash(executable)

stmt = delete(
Heartbeats
Heartbeat
).where(
and_(Heartbeats.executable == hash_executable,
Heartbeats.hostname == hostname,
Heartbeats.pid == pid,
Heartbeats.thread_id == thread.ident)
and_(Heartbeat.executable == hash_executable,
Heartbeat.hostname == hostname,
Heartbeat.pid == pid,
Heartbeat.thread_id == thread.ident)
)

if older_than:
stmt = stmt.where(
Heartbeats.updated_at < datetime.datetime.utcnow() - datetime.timedelta(seconds=older_than)
Heartbeat.updated_at < datetime.datetime.utcnow() - datetime.timedelta(seconds=older_than)
)
session.execute(stmt)

Expand All @@ -277,12 +277,12 @@ def cardiac_arrest(older_than: Optional[int] = None, *, session: "Session") -> N
"""

stmt = delete(
Heartbeats
Heartbeat
)

if older_than:
stmt = stmt.where(
Heartbeats.updated_at < datetime.datetime.utcnow() - datetime.timedelta(seconds=older_than)
Heartbeat.updated_at < datetime.datetime.utcnow() - datetime.timedelta(seconds=older_than)
)
session.execute(stmt)

Expand All @@ -298,17 +298,17 @@ def list_heartbeats(*, session: "Session") -> list["HeartbeatDict"]:
"""

stmt = select(
Heartbeats.readable,
Heartbeats.hostname,
Heartbeats.pid,
Heartbeats.thread_name,
Heartbeats.updated_at,
Heartbeats.created_at,
Heartbeats.payload
Heartbeat.readable,
Heartbeat.hostname,
Heartbeat.pid,
Heartbeat.thread_name,
Heartbeat.updated_at,
Heartbeat.created_at,
Heartbeat.payload
).order_by(
Heartbeats.readable,
Heartbeats.hostname,
Heartbeats.thread_name
Heartbeat.readable,
Heartbeat.hostname,
Heartbeat.thread_name
)

result = session.execute(stmt).all()
Expand Down Expand Up @@ -340,15 +340,15 @@ def list_payload_counts(
if not hash_executable:
hash_executable = calc_hash(executable)
stmt = select(
Heartbeats.payload,
func.count(Heartbeats.payload)
Heartbeat.payload,
func.count(Heartbeat.payload)
).where(
and_(Heartbeats.executable == hash_executable,
Heartbeats.updated_at >= datetime.datetime.utcnow() - datetime.timedelta(seconds=older_than))
and_(Heartbeat.executable == hash_executable,
Heartbeat.updated_at >= datetime.datetime.utcnow() - datetime.timedelta(seconds=older_than))
).group_by(
Heartbeats.payload
Heartbeat.payload
).order_by(
Heartbeats.payload
Heartbeat.payload
)

return dict((payload, count) for payload, count in session.execute(stmt).all() if payload)
Expand Down
2 changes: 1 addition & 1 deletion lib/rucio/db/sqla/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1621,7 +1621,7 @@ class ConfigHistory(BASE, ModelBase):
_table_args = ()


class Heartbeats(BASE, ModelBase):
class Heartbeat(BASE, ModelBase):
"""Represents the status and heartbeat of the running daemons and services"""
__tablename__ = 'heartbeats'
executable: Mapped[str] = mapped_column(String(64)) # SHA-2
Expand Down
18 changes: 9 additions & 9 deletions tests/test_heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from sqlalchemy import delete, update

from rucio.core.heartbeat import cardiac_arrest, die, list_heartbeats, list_payload_counts, live, sanity_check
from rucio.db.sqla.models import Heartbeats
from rucio.db.sqla.models import Heartbeat
from rucio.db.sqla.session import transactional_session


Expand All @@ -36,9 +36,9 @@ def _create_executable():
yield _create_executable

stmt = delete(
Heartbeats
Heartbeat
).where(
Heartbeats.executable.in_(executables)
Heartbeat.executable.in_(executables)
)
db_session.execute(stmt)

Expand Down Expand Up @@ -165,19 +165,19 @@ def __forge_updated_at(*, session=None):
two_days_ago = datetime.utcnow() - timedelta(days=2)
a_dozen_hours_ago = datetime.utcnow() - timedelta(hours=12)
stmt = update(
Heartbeats
Heartbeat
).where(
Heartbeats.hostname == 'host1'
Heartbeat.hostname == 'host1'
).values({
Heartbeats.updated_at: two_days_ago
Heartbeat.updated_at: two_days_ago
})
session.execute(stmt)
stmt = update(
Heartbeats
Heartbeat
).where(
Heartbeats.hostname == 'host2'
Heartbeat.hostname == 'host2'
).values({
Heartbeats.updated_at: a_dozen_hours_ago
Heartbeat.updated_at: a_dozen_hours_ago
})
session.execute(stmt)

Expand Down

0 comments on commit dd837d2

Please sign in to comment.