From 20c7be6488b8f15695fbbd1c67b34b6ee22cef8a Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Mon, 18 Nov 2024 14:44:37 +0100 Subject: [PATCH 1/3] add esdl messages --- src/omotes_orchestrator/main.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/omotes_orchestrator/main.py b/src/omotes_orchestrator/main.py index 430e81e..45663e3 100644 --- a/src/omotes_orchestrator/main.py +++ b/src/omotes_orchestrator/main.py @@ -13,6 +13,7 @@ from omotes_sdk.internal.orchestrator_worker_events.messages.task_pb2 import ( TaskResult, TaskProgressUpdate, + TaskEsdlMessage, ) from omotes_sdk_protocol.job_pb2 import ( JobSubmission, @@ -20,6 +21,7 @@ JobStatusUpdate, JobProgressUpdate, JobCancel, + EsdlMessage, ) from omotes_sdk_protocol.workflow_pb2 import RequestAvailableWorkflows from omotes_sdk.workflow_type import WorkflowTypeManager @@ -453,6 +455,16 @@ def task_result_received(self, task_result: TaskResult) -> None: self._init_barriers.wait_for_barrier(job.id) job_db = self.postgresql_if.get_job(job.id) + esdl_messages_job = [ + EsdlMessage( + technical_message=esdl_message_task.technical_message, + severity=EsdlMessage.Severity.Value( + TaskEsdlMessage.Severity.Name(esdl_message_task.severity) + ), + esdl_object_id=esdl_message_task.esdl_object_id, + ) + for esdl_message_task in task_result.esdl_messages + ] # Confirm the job is still relevant. if job_db is None: logger.info("Ignoring result as job %s was already cancelled or completed.", job.id) @@ -477,6 +489,7 @@ def task_result_received(self, task_result: TaskResult) -> None: result_type=JobResult.ResultType.SUCCEEDED, output_esdl=task_result.output_esdl, logs=task_result.logs, + esdl_messages=esdl_messages_job, ), ) self._cleanup_job(job.id) @@ -493,6 +506,7 @@ def task_result_received(self, task_result: TaskResult) -> None: result_type=JobResult.ResultType.ERROR, output_esdl=task_result.output_esdl, logs=task_result.logs, + esdl_messages=esdl_messages_job, ), ) self._cleanup_job(job.id) From c5f638dba366f0da49ef9fe4e3b65a829d833744 Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Wed, 20 Nov 2024 12:14:21 +0100 Subject: [PATCH 2/3] move task.proto to omotes-sdk-protocol --- src/omotes_orchestrator/main.py | 18 +++--------------- src/omotes_orchestrator/worker_interface.py | 2 +- unit_test/test_main.py | 2 +- 3 files changed, 5 insertions(+), 17 deletions(-) diff --git a/src/omotes_orchestrator/main.py b/src/omotes_orchestrator/main.py index 45663e3..c850c5b 100644 --- a/src/omotes_orchestrator/main.py +++ b/src/omotes_orchestrator/main.py @@ -10,10 +10,9 @@ from omotes_orchestrator.postgres_interface import PostgresInterface from omotes_orchestrator.postgres_job_manager import PostgresJobManager -from omotes_sdk.internal.orchestrator_worker_events.messages.task_pb2 import ( +from omotes_sdk_protocol.internal.task_pb2 import ( TaskResult, TaskProgressUpdate, - TaskEsdlMessage, ) from omotes_sdk_protocol.job_pb2 import ( JobSubmission, @@ -21,7 +20,6 @@ JobStatusUpdate, JobProgressUpdate, JobCancel, - EsdlMessage, ) from omotes_sdk_protocol.workflow_pb2 import RequestAvailableWorkflows from omotes_sdk.workflow_type import WorkflowTypeManager @@ -455,16 +453,6 @@ def task_result_received(self, task_result: TaskResult) -> None: self._init_barriers.wait_for_barrier(job.id) job_db = self.postgresql_if.get_job(job.id) - esdl_messages_job = [ - EsdlMessage( - technical_message=esdl_message_task.technical_message, - severity=EsdlMessage.Severity.Value( - TaskEsdlMessage.Severity.Name(esdl_message_task.severity) - ), - esdl_object_id=esdl_message_task.esdl_object_id, - ) - for esdl_message_task in task_result.esdl_messages - ] # Confirm the job is still relevant. if job_db is None: logger.info("Ignoring result as job %s was already cancelled or completed.", job.id) @@ -489,7 +477,7 @@ def task_result_received(self, task_result: TaskResult) -> None: result_type=JobResult.ResultType.SUCCEEDED, output_esdl=task_result.output_esdl, logs=task_result.logs, - esdl_messages=esdl_messages_job, + esdl_messages=task_result.esdl_messages, ), ) self._cleanup_job(job.id) @@ -506,7 +494,7 @@ def task_result_received(self, task_result: TaskResult) -> None: result_type=JobResult.ResultType.ERROR, output_esdl=task_result.output_esdl, logs=task_result.logs, - esdl_messages=esdl_messages_job, + esdl_messages=task_result.esdl_messages, ), ) self._cleanup_job(job.id) diff --git a/src/omotes_orchestrator/worker_interface.py b/src/omotes_orchestrator/worker_interface.py index b329c5f..1d14e01 100644 --- a/src/omotes_orchestrator/worker_interface.py +++ b/src/omotes_orchestrator/worker_interface.py @@ -3,7 +3,7 @@ from typing import Callable from omotes_sdk.internal.common.broker_interface import BrokerInterface, AMQPQueueType -from omotes_sdk.internal.orchestrator_worker_events.messages.task_pb2 import ( +from omotes_sdk_protocol.internal.task_pb2 import ( TaskResult, TaskProgressUpdate, ) diff --git a/unit_test/test_main.py b/unit_test/test_main.py index 2e94208..572f1cf 100644 --- a/unit_test/test_main.py +++ b/unit_test/test_main.py @@ -11,7 +11,7 @@ from omotes_sdk.job import Job from omotes_sdk.workflow_type import WorkflowType -from omotes_sdk.internal.orchestrator_worker_events.messages.task_pb2 import TaskProgressUpdate +from omotes_sdk_protocol.internal.task_pb2 import TaskProgressUpdate from omotes_sdk_protocol.job_pb2 import JobSubmission, JobProgressUpdate, JobStatusUpdate, JobResult from google.protobuf import json_format from google.protobuf.struct_pb2 import Struct From e7c6d1f205a0ba4099ff005a325be99af9074d1d Mon Sep 17 00:00:00 2001 From: Mark Vrijlandt Date: Thu, 28 Nov 2024 09:37:35 +0100 Subject: [PATCH 3/3] to omotes-python-sdk 3.2.5, add depends_on for integration tests --- dev-requirements.txt | 4 ++-- integration_test/docker-compose.override.yml | 3 +++ pyproject.toml | 2 +- requirements.txt | 4 ++-- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index 208baf2..484c0c0 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -115,11 +115,11 @@ mypy-extensions==1.0.0 # via # black # mypy -omotes-sdk-protocol==0.1.6 +omotes-sdk-protocol==0.1.13 # via # -c requirements.txt # omotes-sdk-python -omotes-sdk-python==3.2.4 +omotes-sdk-python==3.2.5 # via # -c requirements.txt # orchestrator (pyproject.toml) diff --git a/integration_test/docker-compose.override.yml b/integration_test/docker-compose.override.yml index 35eebdb..8a74edf 100644 --- a/integration_test/docker-compose.override.yml +++ b/integration_test/docker-compose.override.yml @@ -23,6 +23,9 @@ services: RABBITMQ_VIRTUALHOST: omotes_celery LOG_LEVEL: ${LOG_LEVEL} WORKER_TYPE: NO_FAULT + depends_on: + rabbitmq: + condition: service_healthy test_hard_crash_worker: <<: *test_worker diff --git a/pyproject.toml b/pyproject.toml index c472b48..143fb25 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,7 @@ dependencies = [ "sqlalchemy ~= 2.0.27", "psycopg2-binary ~= 2.9.9", "celery ~= 5.3.6", - "omotes-sdk-python ~= 3.2.4", + "omotes-sdk-python ~= 3.2.5", "alembic ~= 1.13.1", ] diff --git a/requirements.txt b/requirements.txt index f9b01b9..5d6ad2d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -48,9 +48,9 @@ markupsafe==2.1.5 # via mako multidict==6.0.5 # via yarl -omotes-sdk-protocol==0.1.6 +omotes-sdk-protocol==0.1.13 # via omotes-sdk-python -omotes-sdk-python==3.2.4 +omotes-sdk-python==3.2.5 # via orchestrator (pyproject.toml) ordered-set==4.1.0 # via pyecore