Skip to content

Commit

Permalink
Merge branch 'main' into 32-update-to-newest-optimzer-workflow-names
Browse files Browse the repository at this point in the history
  • Loading branch information
lfse-slafleur committed Apr 17, 2024
2 parents 42e9d5b + b22551c commit d1111ad
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 9 deletions.
11 changes: 9 additions & 2 deletions integration_test/run.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
#!/bin/bash

. .venv/bin/activate
python3 job_submitter.py
#!/bin/bash

if [[ "$OSTYPE" != "win32" && "$OSTYPE" != "msys" ]]; then # Linux
. .venv/bin/activate
python3 job_submitter.py
else
source venv/Scripts/activate
python job_submitter.py
fi
34 changes: 28 additions & 6 deletions src/omotes_orchestrator/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ class BarrierTimeoutException(BaseException):
pass


class MissingBarrierException(BaseException):
"""Exception which is thrown if a barrier is waited on or set while the barrier is missing."""

pass


class LifeCycleBarrierManager:
"""Maintain a (processing) barrier per job until a lifecycle is finished.
Expand Down Expand Up @@ -66,7 +72,7 @@ def ensure_barrier(self, job_id: uuid.UUID) -> threading.Event:
:param: job_id: The id of the job to ensure that a barrier is available.
:return: The barrier that is either created or was already available.
"""
# First check if the barrier doesn't before waiting on the modification lock
# First check if the barrier doesn't exist before waiting on the modification lock
if job_id not in self._barriers:
# Barrier doesn't exist yet, queue for the lock to add the barrier.
with self._barrier_modification_lock:
Expand All @@ -78,15 +84,22 @@ def ensure_barrier(self, job_id: uuid.UUID) -> threading.Event:

return self._barriers[job_id]

def _get_barrier(self, job_id: uuid.UUID) -> threading.Event:
"""Retrieve the barrier and throw exception if the barrier is not available."""
if job_id not in self._barriers:
raise MissingBarrierException(f"Lifecycle barrier is missing for job {job_id}")
return self._barriers[job_id]

def set_barrier(self, job_id: uuid.UUID) -> None:
"""Set the barrier for the job to ready.
Any threads that were waiting are notified they can continue and future threads will not
wait.
:param job_id: The id of the job for which the barrier may be set to ready.
:raises MissingBarrierException: Thrown if the barrier is not ensured or already cleaned up.
"""
barrier = self.ensure_barrier(job_id)
barrier = self._get_barrier(job_id)
barrier.set()

def wait_for_barrier(self, job_id: uuid.UUID) -> None:
Expand All @@ -97,8 +110,9 @@ def wait_for_barrier(self, job_id: uuid.UUID) -> None:
:param job_id: The id of the job for which to wait until the barrier is ready.
:raises BarrierTimeoutException: If the barrier is not ready within BARRIER_WAIT_TIMEOUT,
this exception is thrown.
:raises MissingBarrierException: Thrown if the barrier is not ensured or already cleaned up.
"""
barrier = self.ensure_barrier(job_id)
barrier = self._get_barrier(job_id)
result = barrier.wait(LifeCycleBarrierManager.BARRIER_WAIT_TIMEOUT)
if not result:
raise BarrierTimeoutException(f"Barrier for job {job_id} was not ready on time.")
Expand Down Expand Up @@ -158,8 +172,13 @@ def __init__(
self._init_barriers = LifeCycleBarrierManager()

def _resume_init_barriers(self, all_jobs: list[JobDB]) -> None:
"""Resume the INIT lifecycle barriers for all jobs while starting the orchestrator.
:param all_jobs: All jobs that are known while the orchestrator is starting up.
"""
for job in all_jobs:
if job.status != JobStatusDB.REGISTERED:
self._init_barriers.ensure_barrier(job.job_id)
self._init_barriers.set_barrier(job.job_id)

def start(self) -> None:
Expand Down Expand Up @@ -225,6 +244,7 @@ def new_job_submitted_handler(self, job_submission: JobSubmission, job: Job) ->
submit = True

if submit:
self._init_barriers.ensure_barrier(submitted_job_id)
celery_task_id = self.celery_if.start_workflow(
job.workflow_type,
job.id,
Expand Down Expand Up @@ -297,7 +317,7 @@ def job_cancellation_handler(self, job_cancellation: JobCancel) -> None:
logs="",
),
)
self.postgresql_if.delete_job(job.id)
self._cleanup_job(job_id)

def _cleanup_job(self, job_id: uuid.UUID) -> None:
"""Cleanup any references to job with id `job_id`.
Expand Down Expand Up @@ -349,9 +369,11 @@ def task_result_received(self, serialized_message: bytes) -> None:
logger.info("Ignoring result as job %s was already cancelled or completed.", job.id)
elif job_db.celery_id != task_result.celery_task_id:
logger.warning(
"Job %s has a result but was not successfully submitted yet."
"Ignoring result.",
"Job %s has a result but was is not the celery task that was expected."
"Ignoring result. Expected celery task id %s but received celery task id %s",
job.id,
job_db.celery_id,
task_result.celery_task_id,
)

elif task_result.result_type == TaskResult.ResultType.SUCCEEDED:
Expand Down
24 changes: 23 additions & 1 deletion unit_test/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from uuid import UUID

from omotes_orchestrator.config import OrchestratorConfig
from omotes_orchestrator.main import LifeCycleBarrierManager, BarrierTimeoutException
from omotes_orchestrator.main import LifeCycleBarrierManager, BarrierTimeoutException, \
MissingBarrierException


class LifeCycleBarrierManagerTest(unittest.TestCase):
Expand Down Expand Up @@ -67,6 +68,7 @@ def test__set_barrier__first_set_the_barrier_then_wait(self) -> None:
# Arrange
barrier_manager = LifeCycleBarrierManager()
job_id = UUID("ab995599-a117-47b6-8da5-5d9488900858")
barrier_manager.ensure_barrier(job_id)

# Act / Assert
barrier_manager.set_barrier(job_id)
Expand All @@ -76,6 +78,7 @@ def test__set_barrier__wait_and_set_by_different_threads(self) -> None:
# Arrange
barrier_manager = LifeCycleBarrierManager()
job_id = UUID("ab995599-a117-47b6-8da5-5d9488900858")
barrier_manager.ensure_barrier(job_id)

def set_barrier() -> None:
time.sleep(0.5)
Expand All @@ -91,6 +94,7 @@ def test__wait_barrier__takes_too_long(self) -> None:
# Arrange
barrier_manager = LifeCycleBarrierManager()
job_id = UUID("ab995599-a117-47b6-8da5-5d9488900858")
barrier_manager.ensure_barrier(job_id)

# Overwrite the timeout temporarily so the test doesn't take too long.
previous_timeout = LifeCycleBarrierManager.BARRIER_WAIT_TIMEOUT
Expand All @@ -103,6 +107,24 @@ def test__wait_barrier__takes_too_long(self) -> None:
# Cleanup
LifeCycleBarrierManager.BARRIER_WAIT_TIMEOUT = previous_timeout

def test__wait_barrier__barrier_is_not_created(self) -> None:
# Arrange
barrier_manager = LifeCycleBarrierManager()
job_id = UUID("ab995599-a117-47b6-8da5-5d9488900858")

# Act / Assert
with self.assertRaises(MissingBarrierException):
barrier_manager.wait_for_barrier(job_id)

def test__set_barrier__barrier_is_not_created(self) -> None:
# Arrange
barrier_manager = LifeCycleBarrierManager()
job_id = UUID("ab995599-a117-47b6-8da5-5d9488900858")

# Act / Assert
with self.assertRaises(MissingBarrierException):
barrier_manager.set_barrier(job_id)

def test__cleanup_barrier__cleaning_non_existing_barrier(self) -> None:
# Arrange
barrier_manager = LifeCycleBarrierManager()
Expand Down

0 comments on commit d1111ad

Please sign in to comment.