Skip to content

Commit

Permalink
Remove connecting and subscribing to the job result dead letter queue.
Browse files Browse the repository at this point in the history
  • Loading branch information
cwang39403 committed Oct 22, 2024
1 parent 4b08a72 commit e92402f
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 78 deletions.
46 changes: 1 addition & 45 deletions src/omotes_orchestrator/main.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import os
import logging
import signal
import sys
import threading
import pprint
import uuid
from datetime import timedelta, datetime
from datetime import timedelta
from types import FrameType
from typing import Any, Union

Expand Down Expand Up @@ -158,9 +157,6 @@ class Orchestrator:
"""Cancel and delete the job when it is timed out."""
_init_barriers: LifeCycleBarrierManager

LOGS_LOCAL_DIR: str = "../logs"
"""The local directory path to keep log files."""

def __init__(
self,
config: OrchestratorConfig,
Expand Down Expand Up @@ -236,10 +232,6 @@ def start(self) -> None:
)
self.omotes_sdk_if.send_available_workflows()

self.omotes_sdk_if.connect_to_job_result_dead_letter_queue(
callback_on_dead_lettered_job_result=self.dead_lettered_job_result_handler
)

self.postgres_job_manager.start()
self.timeout_job_manager.start()

Expand Down Expand Up @@ -422,42 +414,6 @@ def job_cancellation_handler(self, job_cancellation: JobCancel) -> None:
)
self._cleanup_job(job_id)

def dead_lettered_job_result_handler(self, job_result: JobResult) -> None:
"""Handle the received dead lettered job result.
When the log level is set at the DEBUG level or below,
the dead lettered job result will be written to a local file.
:param job_result: Job result message.
"""
logger.info(
"Received a dead lettered job_%s_result with result type as: %s",
job_result.uuid,
job_result.result_type,
)

log_level = logger.getEffectiveLevel()
if log_level <= logging.DEBUG:
try:
log_dir = self.LOGS_LOCAL_DIR
if not os.path.exists(log_dir):
os.makedirs(log_dir)

cur_time = datetime.now().strftime("%Y%m%d-%H%M%S")
file_name = f"{cur_time}_dead_lettered_job_{job_result.uuid}_result.txt"
file_path = os.path.join(log_dir, file_name)
with open(file_path, "a") as f:
f.write("--------------Logs:\n")
f.write(job_result.logs + "\n")
f.write("--------------Status:\n")
f.write(str(job_result.result_type) + "\n")
f.write("--------------ESDL:\n")
f.write(job_result.output_esdl + "\n")

logger.info("The job result is logged to: %s", file_path)
except Exception as e:
logger.warning("An error occurred while logging the job result: %s", e)

def _cleanup_job(self, job_id: uuid.UUID) -> None:
"""Cleanup any references to job with id `job_id`.
Expand Down
33 changes: 0 additions & 33 deletions src/omotes_orchestrator/sdk_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,24 +78,6 @@ def callback_on_request_workflows_wrapped(self, message: bytes) -> None:
self.callback_on_request_workflows(request_available_workflows)


@dataclass
class DeadLetteredJobResultHandler:
"""Handler to set up callback when receiving a dead lettered job result."""

callback_on_dead_lettered_job_result: Callable[[JobResult], None]
"""Callback to call when a dead lettered job result is received."""

def callback_on_dead_lettered_job_result_wrapped(self, message: bytes) -> None:
"""Prepare the dead lettered `JobResult` message before passing them to the callback.
:param message: Serialized AMQP message containing a dead lettered job result.
"""
dead_lettered_job = JobResult()
dead_lettered_job.ParseFromString(message)

self.callback_on_dead_lettered_job_result(dead_lettered_job)


class SDKInterface:
"""RabbitMQ interface specifically for the orchestrator."""

Expand Down Expand Up @@ -169,21 +151,6 @@ def connect_to_request_available_workflows(
exchange_name=OmotesQueueNames.omotes_exchange_name(),
)

def connect_to_job_result_dead_letter_queue(
self, callback_on_dead_lettered_job_result: Callable[[JobResult], None]
) -> None:
"""Connect to the job result dead letter queue.
:param callback_on_dead_lettered_job_result: Callback to handle a dead lettered job result.
"""
callback_handler = DeadLetteredJobResultHandler(callback_on_dead_lettered_job_result)
self.broker_if.declare_queue_and_add_subscription(
queue_name=OmotesQueueNames.job_result_dead_letter_queue_name(),
callback_on_message=callback_handler.callback_on_dead_lettered_job_result_wrapped,
queue_type=AMQPQueueType.DURABLE,
exchange_name=OmotesQueueNames.omotes_exchange_name(),
)

def send_job_progress_update(self, job: Job, progress_update: JobProgressUpdate) -> None:
"""Send a job progress update to the SDK.
Expand Down

0 comments on commit e92402f

Please sign in to comment.