diff --git a/pyproject.toml b/pyproject.toml index 7e2d436..9b09803 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -68,7 +68,7 @@ enabled = true [tool.pytest.ini_options] addopts = """--cov=omotes_orchestrator --cov-report html --cov-report term-missing \ ---cov-fail-under 58""" +--cov-fail-under 57""" [tool.coverage.run] source = ["src"] diff --git a/src/omotes_orchestrator/main.py b/src/omotes_orchestrator/main.py index fdf1d12..407e402 100644 --- a/src/omotes_orchestrator/main.py +++ b/src/omotes_orchestrator/main.py @@ -22,8 +22,16 @@ JobProgressUpdate, JobCancel, ) +from omotes_sdk_protocol.work_flow_pb2 import RequestAvailableWorkflows from omotes_sdk.job import Job -from omotes_sdk.workflow_type import WorkflowTypeManager, WorkflowType +from omotes_sdk.workflow_type import ( + WorkflowTypeManager, + WorkflowType, + WorkflowParameter, + ParameterType, + ParameterSchema, + ParameterStringFormat, +) from google.protobuf import json_format from omotes_orchestrator.celery_interface import CeleryInterface @@ -187,11 +195,7 @@ def start(self) -> None: self._resume_init_barriers(self.postgresql_if.get_all_jobs()) self.celery_if.start() - self.omotes_if.start() - self.omotes_if.connect_to_job_submissions( - callback_on_new_job=self.new_job_submitted_handler - ) - self.omotes_if.connect_to_job_cancellations(self.job_cancellation_handler) + self.jobs_broker_if.start() self.jobs_broker_if.add_queue_subscription( "omotes_task_result_events", self.task_result_received @@ -200,6 +204,19 @@ def start(self) -> None: "omotes_task_progress_events", self.task_progress_update ) + self.omotes_if.start() + self.omotes_if.connect_to_request_available_workflows( + callback_on_request_workflows=self.request_workflows_handler + ) + self.omotes_if.connect_to_job_submissions( + callback_on_new_job=self.new_job_submitted_handler + ) + self.omotes_if.connect_to_job_cancellations( + callback_on_job_cancel=self.job_cancellation_handler + ) + + self.omotes_if.send_available_workflows(self.workflow_manager) + def stop(self) -> None: """Stop the orchestrator.""" self.omotes_if.stop() @@ -319,6 +336,14 @@ def job_cancellation_handler(self, job_cancellation: JobCancel) -> None: ) self._cleanup_job(job_id) + def request_workflows_handler(self, request_workflows: RequestAvailableWorkflows) -> None: + """When a available work flows request is received from the SDK. + + :param request_workflows: Request available work flows. + """ + logger.info("Received an available workflows request") + self.omotes_if.send_available_workflows(self.workflow_manager) + def _cleanup_job(self, job_id: uuid.UUID) -> None: """Cleanup any references to job with id `job_id`. @@ -516,7 +541,8 @@ def main() -> None: workflow_type_description_name="Grow Optimizer default workflow", ), WorkflowType( - workflow_type_name="grow_simulator", workflow_type_description_name="Grow Simulator" + workflow_type_name="grow_simulator", + workflow_type_description_name="Grow Simulator", ), WorkflowType( workflow_type_name="grow_optimizer_no_heat_losses", @@ -529,6 +555,31 @@ def main() -> None: WorkflowType( workflow_type_name="simulator", workflow_type_description_name="High fidelity simulator", + workflow_parameters=[ + WorkflowParameter( + key_name="start_time", + schema=ParameterSchema( + type=ParameterType.STRING, + format=ParameterStringFormat.DATETIME, + ), + ), + WorkflowParameter( + key_name="step_size_in_seconds", + schema=ParameterSchema( + type=ParameterType.INTEGER, + default="60", + minimum=0, + ), + ), + WorkflowParameter( + key_name="number_of_steps", + schema=ParameterSchema( + type=ParameterType.INTEGER, + minimum=0, + maximum=1, + ), + ), + ], ), WorkflowType( workflow_type_name="test_worker",