Skip to content

Commit

Permalink
Refactor stale_job_cleaner into a separate class component
Browse files Browse the repository at this point in the history
  • Loading branch information
cwang39403 committed Jun 20, 2024
1 parent d92d3ee commit e72948d
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 56 deletions.
19 changes: 17 additions & 2 deletions src/omotes_orchestrator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ class PostgreSQLConfig:
database: str
username: Optional[str]
password: Optional[str]
job_retention_sec: int

def __init__(self, prefix: str = ""):
"""Create the PostgreSQL configuration and retrieve values from env vars.
Expand All @@ -40,7 +39,20 @@ def __init__(self, prefix: str = ""):
self.database = os.environ.get(f"{prefix}POSTGRESQL_DATABASE", "public")
self.username = os.environ.get(f"{prefix}POSTGRESQL_USERNAME")
self.password = os.environ.get(f"{prefix}POSTGRESQL_PASSWORD")
"""Default database job row retention period to be 48 hours."""


class PostgresJobManagerConfig:
"""Retrieve PostgresJobManager configuration from environment variables."""

job_retention_sec: int
"""The allowed retention time in seconds of a database job row"""

def __init__(self, prefix: str = ""):
"""Create the PostgresJobManager configuration and retrieve values from env vars.
:param prefix: Prefix to the name environment variables.
"""
"""Default database job row retention duration to be 48 hours."""
self.job_retention_sec = int(os.environ.get(f"{prefix}JOB_RETENTION_SEC", "172800"))


Expand All @@ -52,6 +64,8 @@ class OrchestratorConfig:
"""Configuration for Celery app."""
postgres_config: PostgreSQLConfig
"""Configuration for PostgreSQL database for job persistence."""
postgres_job_manager_config: PostgresJobManagerConfig
"""Configuration for PostgresJobManager component."""
rabbitmq_omotes: RabbitMQConfig
"""Configuration to connect to RabbitMQ on the OMOTES SDK side."""
rabbitmq_worker_events: RabbitMQConfig
Expand All @@ -69,6 +83,7 @@ def __init__(self) -> None:
"""Construct the orchestrator configuration using environment variables."""
self.celery_config = CeleryConfig()
self.postgres_config = PostgreSQLConfig()
self.postgres_job_manager_config = PostgresJobManagerConfig()
self.rabbitmq_omotes = EnvRabbitMQConfig("SDK_")
self.rabbitmq_worker_events = EnvRabbitMQConfig("TASK_")

Expand Down
30 changes: 18 additions & 12 deletions src/omotes_orchestrator/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
import threading
import pprint
import uuid
from datetime import datetime, timezone, timedelta
from datetime import timedelta
from types import FrameType
from typing import Any, Union

from omotes_orchestrator.postgres_interface import PostgresInterface
from omotes_orchestrator.postgres_job_manager import PostgresJobManager
from omotes_sdk.internal.orchestrator_worker_events.messages.task_pb2 import (
TaskResult,
TaskProgressUpdate,
Expand Down Expand Up @@ -145,9 +146,9 @@ class Orchestrator:
"""Interface to PostgreSQL."""
workflow_manager: WorkflowTypeManager
"""Store for all available workflow types."""
postgres_job_manager: PostgresJobManager
"""Manage postgres job row"""
_init_barriers: LifeCycleBarrierManager
"""Orchestrator instance start datetime (UTC)"""
_start_datetime_utc: datetime

def __init__(
self,
Expand All @@ -156,6 +157,7 @@ def __init__(
celery_if: CeleryInterface,
postgresql_if: PostgresInterface,
workflow_manager: WorkflowTypeManager,
postgres_job_manager: PostgresJobManager,
):
"""Construct the orchestrator.
Expand All @@ -165,14 +167,15 @@ def __init__(
:param celery_if: Interface to the Celery app.
:param postgresql_if: Interface to PostgreSQL to persist job information.
:param workflow_manager: Store for all available workflow types.
:param postgres_job_manager: Manage postgres job row
"""
self.omotes_if = omotes_orchestrator_if
self.jobs_broker_if = jobs_broker_if
self.celery_if = celery_if
self.postgresql_if = postgresql_if
self.workflow_manager = workflow_manager
self.postgres_job_manager = postgres_job_manager
self._init_barriers = LifeCycleBarrierManager()
self._start_datetime_utc = datetime.now(timezone.utc)

def _resume_init_barriers(self, all_jobs: list[JobDB]) -> None:
"""Resume the INIT lifecycle barriers for all jobs while starting the orchestrator.
Expand All @@ -188,12 +191,7 @@ def start(self) -> None:
"""Start the orchestrator."""
self.postgresql_if.start()
self._resume_init_barriers(self.postgresql_if.get_all_jobs())

"""Start a thread to run database stale jobs cleaner."""
stale_jobs_cleaner = threading.Thread(target=self.postgresql_if.start_stale_jobs_cleaner,
args=[self._start_datetime_utc],
daemon=True)
stale_jobs_cleaner.start()
self.postgres_job_manager.start()

self.celery_if.start()
self.omotes_if.start()
Expand All @@ -215,7 +213,7 @@ def stop(self) -> None:
self.jobs_broker_if.stop()
self.celery_if.stop()
self.postgresql_if.stop()
# TODO: stop db stale_jobs_cleaner?
self.postgres_job_manager.stop()

def new_job_submitted_handler(self, job_submission: JobSubmission, job: Job) -> None:
"""When a new job is submitted through OMOTES SDK.
Expand Down Expand Up @@ -561,8 +559,16 @@ def main() -> None:
celery_if = CeleryInterface(config.celery_config)
jobs_broker_if = JobBrokerInterface(config.rabbitmq_worker_events)
postgresql_if = PostgresInterface(config.postgres_config)
postgres_job_manager = PostgresJobManager(postgresql_if,
config.postgres_job_manager_config)

orchestrator = Orchestrator(
orchestrator_if, jobs_broker_if, celery_if, postgresql_if, workflow_type_manager
orchestrator_if,
jobs_broker_if,
celery_if,
postgresql_if,
workflow_type_manager,
postgres_job_manager
)

stop_event = threading.Event()
Expand Down
43 changes: 1 addition & 42 deletions src/omotes_orchestrator/postgres_interface.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import uuid
from contextlib import contextmanager
from datetime import datetime, timezone, timedelta
import time
from datetime import datetime, timedelta
import logging
from typing import Generator, Optional

Expand Down Expand Up @@ -212,46 +211,6 @@ def delete_job(self, job_id: uuid.UUID) -> bool:

return result

def start_stale_jobs_cleaner(self, orchestrator_start_datetime_utc: datetime) -> None:
"""Start a background process to clean up stale jobs longer than the retention time.
The function acts as a daemon process, which periodically checks
if there are any stale jobs/rows in the database longer than the configured retention time.
If the orchestrator is checked to be active longer than the configured retention time,
the job/row will be deleted outright.
:param orchestrator_start_datetime_utc: Orchestrator instance start datetime (UTC).
"""
LOGGER.info("Start a database stale jobs cleaner as a daemon process")

CHECK_INTERVAL_SEC = 30
job_retention_sec = self.db_config.job_retention_sec

while True:
cur_time = datetime.now(timezone.utc)
orchestrator_active_sec = (cur_time - orchestrator_start_datetime_utc).total_seconds()

if orchestrator_active_sec > job_retention_sec:
jobs = self.get_all_jobs()
for job in jobs:
if job.running_at:
job_duration_sec = (cur_time - job.running_at).total_seconds()
elif job.submitted_at:
job_duration_sec = (cur_time - job.submitted_at).total_seconds()
else:
job_duration_sec = (cur_time - job.registered_at).total_seconds()

if job_duration_sec > job_retention_sec:
orchestrator_up_min = round(orchestrator_active_sec / 60, 1)
job_duration_min = round(job_duration_sec / 60, 1)

self.delete_job(job.job_id)
LOGGER.info("Orchestrator is up %s mins and found a job lasts %s mins. "
+ "Deleted the stale job %s",
orchestrator_up_min, job_duration_min, job.job_id)

time.sleep(CHECK_INTERVAL_SEC)

def job_exists(self, job_id: uuid.UUID) -> bool:
"""Check if the job exists in the database.
Expand Down
102 changes: 102 additions & 0 deletions src/omotes_orchestrator/postgres_job_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import logging
from datetime import datetime, timezone
import time
import threading
from omotes_orchestrator.config import PostgresJobManagerConfig
from omotes_orchestrator.postgres_interface import PostgresInterface
from omotes_orchestrator.db_models.job import JobDB


LOGGER = logging.getLogger("omotes_orchestrator")


class PostgresJobManager:
"""Periodically checks the job row in the database and cleans up the stale one."""

postgresql_if: PostgresInterface
"""Interface to PostgreSQL."""
postgres_job_manager_config: PostgresJobManagerConfig
"""PostgresJobManager configuration"""
_init_time: datetime
"""Instantiated datetime (UTC), is typically instantiated when the orchestrator is started"""
_active_threshold_sec: int
"""Only when the instance is initialized longer than the threshold period
can it start cleaning the stale jobs."""
_job_retention_sec: int
"""The allowed retention time in seconds of a postgres job row"""
_rerun_sec: int
"""The period in seconds to rerun the stale job cleaning task"""

def __init__(self,
postgresql_if: PostgresInterface,
postgres_job_manager_config: PostgresJobManagerConfig) -> None:
"""Construct the postgres job manager."""
self.postgresql_if = postgresql_if
self.postgres_job_manager_config = postgres_job_manager_config
self._init_time = datetime.now(timezone.utc)
self._active_threshold_sec = self.postgres_job_manager_config.job_retention_sec
self._job_retention_sec = self.postgres_job_manager_config.job_retention_sec
self._rerun_sec = 30

self._stale_jobs_cleaner_thread = threading.Thread(target=self.stale_jobs_cleaner,
daemon=True)
self._stale_jobs_cleaner_thread_active = False

def start(self) -> None:
"""Start the postgres job manager activities as a daemon process."""
if not self._stale_jobs_cleaner_thread_active:
self._stale_jobs_cleaner_thread.start()
self._stale_jobs_cleaner_thread_active = True

LOGGER.info("Starting the postgres job manager")

def stop(self) -> None:
"""Stop the postgres job manager activities."""
if self._stale_jobs_cleaner_thread_active:
self._stale_jobs_cleaner_thread.join(timeout=0.0)
self._stale_jobs_cleaner_thread_active = False

LOGGER.info("Stopped the postgres job manager")

def stale_jobs_cleaner(self) -> None:
"""Start a background process to clean up stale jobs longer than the retention time.
The function periodically checks if there are any stale jobs/rows in the database
longer than the configured retention time. Meanwhile if the PostgresJobManager
is instantiated (usually when the orchestrator is started) longer than
the configured time, the job/row will be deleted outright.
"""
while True:
cur_time = datetime.now(timezone.utc)
active_sec = (cur_time - self._init_time).total_seconds()

if active_sec > self._active_threshold_sec:
jobs = self.postgresql_if.get_all_jobs()
for job in jobs:
if self.job_row_is_stale(job, cur_time):
self.postgresql_if.delete_job(job.job_id)

job_manager_up_mins = round(active_sec / 60, 1)
LOGGER.info("PostgresJobManager is up %s mins. "
+ "Founded and deleted a stale job %s",
job_manager_up_mins, job.job_id)

time.sleep(self._rerun_sec)

def job_row_is_stale(self, job: JobDB, ref_time: datetime) -> bool:
"""Check if the job row is stale and can be safely deleted subsequently.
:param job: Database job row
:param ref_time: Reference datetime used for determining if the job row is stale
:return: True if the job is stale and can be safely deleted subsequently
"""
if job.running_at:
job_duration_sec = (ref_time - job.running_at).total_seconds()
elif job.submitted_at:
job_duration_sec = (ref_time - job.submitted_at).total_seconds()
else:
job_duration_sec = (ref_time - job.registered_at).total_seconds()

job_row_is_stale = job_duration_sec > self._job_retention_sec
return job_row_is_stale

2 changes: 2 additions & 0 deletions unit_test/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ def __init__(self) -> None:
self.postgresql_if = Mock()

self.workflow_manager = Mock()
self.postgres_job_manager = Mock()

with patch(
"omotes_orchestrator.main.LifeCycleBarrierManager"
Expand All @@ -181,6 +182,7 @@ def __init__(self) -> None:
celery_if=self.celery_if,
postgresql_if=self.postgresql_if,
workflow_manager=self.workflow_manager,
postgres_job_manager=self.postgres_job_manager
)

self.life_cycle_barrier_manager_obj_mock = (
Expand Down

0 comments on commit e72948d

Please sign in to comment.