Skip to content

Commit

Permalink
Merge pull request #15 from Project-OMOTES/sdk-0.0.3
Browse files Browse the repository at this point in the history
Sdk 0.0.3
  • Loading branch information
lfse-slafleur authored Feb 12, 2024
2 parents 1296a8f + 77da62c commit 83d875c
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 17 deletions.
8 changes: 5 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
[project]
name = "orchestrator"
dynamic = ["version"]
authors = [{ name = "Sebastiaan la Fleur", email = "[email protected]" }]
authors = [
{ name = "Sebastiaan la Fleur", email = "[email protected]" },
{ name = "Mark Vrijlandt", email = "[email protected]" },
]
description = "Orchestrator component of OMOTES project which monitors workflows and starts the various steps of each workflow."
classifiers = [
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
Expand All @@ -23,8 +26,7 @@ dependencies = [
"sqlalchemy ~= 2.0.23",
"psycopg2 ~= 2.9.9",
"celery[sqlalchemy] ~= 5.3.6",
"omotes-sdk-python ~= 0.0.1",
"omotes-job-tools ~= 0.0.1",
"omotes-sdk-python ~= 0.0.3",
"jsonpickle ~= 3.0.2",
]

Expand Down
2 changes: 1 addition & 1 deletion src/omotes_orchestrator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@
__init__.py file containing the defaults
"""
import os
from omotes_sdk.app_logging import setup_logging, LogLevel
from omotes_sdk.internal.common.app_logging import setup_logging, LogLevel

Check failure on line 20 in src/omotes_orchestrator/__init__.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11)

error: Skipping analyzing "omotes_sdk.internal.common.app_logging": module is installed, but missing library stubs or py.typed marker [import]

Check failure on line 20 in src/omotes_orchestrator/__init__.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11)

error: Skipping analyzing "omotes_sdk.internal.common.app_logging": module is installed, but missing library stubs or py.typed marker [import]

setup_logging(LogLevel.parse(os.environ.get("LOG_LEVEL", "DEBUG")), "omotes_orchestrator")
49 changes: 36 additions & 13 deletions src/omotes_orchestrator/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@
import threading

import jsonpickle

Check failure on line 6 in src/omotes_orchestrator/main.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11)

error: Skipping analyzing "jsonpickle": module is installed, but missing library stubs or py.typed marker [import]

Check failure on line 6 in src/omotes_orchestrator/main.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11)

error: Skipping analyzing "jsonpickle": module is installed, but missing library stubs or py.typed marker [import]
from omotes_job_tools.messages import StatusUpdateMessage, TaskStatus, CalculationResult
from omotes_sdk.internal.orchestrator_worker_events.messages import (

Check failure on line 7 in src/omotes_orchestrator/main.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11)

error: Skipping analyzing "omotes_sdk.internal.orchestrator_worker_events.messages": module is installed, but missing library stubs or py.typed marker [import]

Check failure on line 7 in src/omotes_orchestrator/main.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11)

error: Skipping analyzing "omotes_sdk.internal.orchestrator_worker_events.messages": module is installed, but missing library stubs or py.typed marker [import]
StatusUpdateMessage,
TaskStatus,
CalculationResult,
)
from omotes_sdk.internal.orchestrator.orchestrator_interface import OrchestratorInterface

Check failure on line 12 in src/omotes_orchestrator/main.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11)

error: Skipping analyzing "omotes_sdk.internal.orchestrator.orchestrator_interface": module is installed, but missing library stubs or py.typed marker [import]

Check failure on line 12 in src/omotes_orchestrator/main.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11)

error: Skipping analyzing "omotes_sdk.internal.orchestrator.orchestrator_interface": module is installed, but missing library stubs or py.typed marker [import]
from omotes_sdk.internal.common.broker_interface import BrokerInterface as JobBrokerInterface

Check failure on line 13 in src/omotes_orchestrator/main.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11)

error: Skipping analyzing "omotes_sdk.internal.common.broker_interface": module is installed, but missing library stubs or py.typed marker [import]

Check failure on line 13 in src/omotes_orchestrator/main.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11)

error: Skipping analyzing "omotes_sdk.internal.common.broker_interface": module is installed, but missing library stubs or py.typed marker [import]
from omotes_sdk_protocol.job_pb2 import JobSubmission, JobResult
from omotes_sdk.config import RabbitMQConfig
from omotes_sdk.job import Job
from omotes_sdk.orchestrator_interface import OrchestratorInterface
from omotes_sdk.workflow_type import WorkflowTypeManager, WorkflowType
from omotes_job_tools.broker_interface import BrokerInterface as JobBrokerInterface

from omotes_orchestrator.celery_interface import CeleryInterface, PostgreSQLConfig


logger = logging.getLogger("omotes_orchestrator")


Expand All @@ -36,7 +39,9 @@ def __init__(
def start(self):
self.celery_if.start()
self.omotes_if.start()
self.omotes_if.connect_to_job_submissions(callback_on_new_job=self.new_job_submitted_handler)
self.omotes_if.connect_to_job_submissions(
callback_on_new_job=self.new_job_submitted_handler
)
self.jobs_broker_if.start()
self.jobs_broker_if.add_queue_subscription("omotes_task_events", self.task_status_update)

Expand All @@ -46,7 +51,9 @@ def stop(self):
self.jobs_broker_if.stop()

def new_job_submitted_handler(self, job_submission: JobSubmission, job: Job) -> None:
logger.info("Received new job %s for workflow type %s", job.id, job_submission.workflow_type)
logger.info(
"Received new job %s for workflow type %s", job.id, job_submission.workflow_type
)
self.celery_if.start_workflow(job.workflow_type, job.id, job_submission.esdl)

def task_status_update(self, serialized_message: bytes) -> None:
Expand All @@ -59,11 +66,17 @@ def task_status_update(self, serialized_message: bytes) -> None:
)
if status_update.status == TaskStatus.SUCCEEDED:
job = Job(
id=status_update.omotes_job_id, workflow_type=WorkflowType(status_update.task_type, "")
id=status_update.omotes_job_id,
workflow_type=WorkflowType(status_update.task_type, ""),
) # TODO Get workflow from WorkflowManager
result: CalculationResult = jsonpickle.decode(self.celery_if.retrieve_result(status_update.celery_task_id))
result: CalculationResult = jsonpickle.decode(
self.celery_if.retrieve_result(status_update.celery_task_id),
classes=CalculationResult,
)
logger.info(
"Received result for job %s through task %s", status_update.omotes_job_id, status_update.celery_task_id
"Received succeeded result for job %s through task %s",
status_update.omotes_job_id,
status_update.celery_task_id,
)
job_result_msg = JobResult(
uuid=str(job.id),
Expand All @@ -74,15 +87,25 @@ def task_status_update(self, serialized_message: bytes) -> None:


def main():
omotes_rabbitmq_config = RabbitMQConfig(username="omotes", password="somepass1", virtual_host="omotes")
celery_rabbitmq_config = RabbitMQConfig(username="celery", password="somepass2", virtual_host="omotes_celery")
omotes_rabbitmq_config = RabbitMQConfig(
username="omotes", password="somepass1", virtual_host="omotes"
)
celery_rabbitmq_config = RabbitMQConfig(
username="celery", password="somepass2", virtual_host="omotes_celery"
)
celery_postgresql_config = PostgreSQLConfig(
username="celery", password="somepass3", database="omotes_celery", host="localhost", port=5432
username="celery",
password="somepass3",
database="omotes_celery",
host="localhost",
port=5432,
)

workflow_type_manager = WorkflowTypeManager(
possible_workflows=[
WorkflowType(workflow_type_name="grow_optimizer", workflow_type_description_name="Grow Optimizer")
WorkflowType(
workflow_type_name="grow_optimizer", workflow_type_description_name="Grow Optimizer"
)
]
)
orchestrator_if = OrchestratorInterface(omotes_rabbitmq_config, workflow_type_manager)
Expand Down

0 comments on commit 83d875c

Please sign in to comment.