Skip to content

Commit

Permalink
Merge pull request #17 from Project-OMOTES/16-update-to-sdk-0014-and-…
Browse files Browse the repository at this point in the history
…to-mvp2-release

16: Update to newest SDK and use names for workflows as used from mvp…
  • Loading branch information
lfse-slafleur authored Apr 22, 2024
2 parents 3f2d45d + 45de7d3 commit 762fd3c
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 76 deletions.
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ omotes-sdk-protocol==0.0.8
# via
# -c requirements.txt
# omotes-sdk-python
omotes-sdk-python==0.0.12
omotes-sdk-python==0.0.14
# via
# -c requirements.txt
# omotes-rest (pyproject.toml)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ dependencies = [
"python-dotenv ~= 1.0.0",
"structlog ~= 23.1.0",
"SQLAlchemy == 2.0.28",
"omotes-sdk-python ~= 0.0.12",
"omotes-sdk-python ~= 0.0.14",
]

[project.optional-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ mypy-extensions==1.0.0
# via typing-inspect
omotes-sdk-protocol==0.0.8
# via omotes-sdk-python
omotes-sdk-python==0.0.12
omotes-sdk-python==0.0.14
# via omotes-rest (pyproject.toml)
packaging==23.1
# via
Expand Down
2 changes: 1 addition & 1 deletion src/gunicorn.conf.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from omotes_rest import main

bind = '0.0.0.0:9200'
bind = "0.0.0.0:9200"
workers = 1
loglevel = "info"
timeout = 300
Expand Down
2 changes: 1 addition & 1 deletion src/omotes_rest/apis/api_dataclasses.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class JobInput:
Schema: ClassVar[Type[Schema]] = Schema

job_name: str = "job name"
workflow_type: str = "grow_optimizer"
workflow_type: str = "Draft Design - Quickscan Validation"
user_name: str = "user name"
input_esdl: str = "input ESDL base64string"
project_name: str = "project name"
Expand Down
31 changes: 3 additions & 28 deletions src/omotes_rest/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
from werkzeug.wrappers.response import Response as WerkzeugResponse
from gunicorn.arbiter import Arbiter
from gunicorn.workers.sync import SyncWorker
from omotes_sdk.workflow_type import WorkflowTypeManager, WorkflowType

from omotes_rest import create_app
from omotes_rest.rest_interface import RestInterface
from omotes_rest.settings import EnvSettings
from omotes_rest.workflows import WORKFLOW_TYPE_MANAGER
from omotes_rest.typed_app import current_app


"""logger."""
logger = logging.getLogger("omotes_rest")

Expand Down Expand Up @@ -81,38 +82,12 @@ def handle_500(e: Exception) -> tuple[str, int]:
return json.dumps({"message": "Internal Server Error"}), 500


# TODO to be retrieved via de omotes_sdk in the future
workflow_type_manager = WorkflowTypeManager(
possible_workflows=[
WorkflowType(
workflow_type_name="grow_optimizer", workflow_type_description_name="Grow Optimizer"
),
WorkflowType(
workflow_type_name="grow_simulator", workflow_type_description_name="Grow Simulator"
),
WorkflowType(
workflow_type_name="grow_optimizer_no_heat_losses",
workflow_type_description_name="Grow Optimizer without heat losses",
),
WorkflowType(
workflow_type_name="grow_optimizer_no_heat_losses_discounted_capex",
workflow_type_description_name="Grow Optimizer without heat losses and a "
"discounted CAPEX",
),
WorkflowType(
workflow_type_name="simulator",
workflow_type_description_name="High fidelity simulator",
),
]
)


def post_fork(_: Arbiter, __: SyncWorker) -> None:
"""Called just after a worker has been forked."""
with app.app_context():
"""current_app is only within the app context"""

current_app.rest_if = RestInterface(workflow_type_manager)
current_app.rest_if = RestInterface(WORKFLOW_TYPE_MANAGER)
"""Interface for this Omotes Rest service."""

current_app.rest_if.start()
Expand Down
55 changes: 30 additions & 25 deletions src/omotes_rest/postgres_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,11 @@ def initialize_db(application_name: str, config: POSTGRESConfig) -> Engine:
:param config: Configuration on how to connect to the SQL database.
"""
logger.info(
"Connecting to PostgresDB at %s:%s as user %s to db %s", config.host, config.port,
config.username, config.database
"Connecting to PostgresDB at %s:%s as user %s to db %s",
config.host,
config.port,
config.username,
config.database,
)

try:
Expand Down Expand Up @@ -128,10 +131,10 @@ def stop(self) -> None:
self.engine.dispose()

def put_new_job(
self,
job_id: uuid.UUID,
job_input: JobInput,
esdl_input: str,
self,
job_id: uuid.UUID,
job_input: JobInput,
esdl_input: str,
) -> None:
"""Insert a new job into the database.
Expand All @@ -154,7 +157,7 @@ def put_new_job(
user_name=job_input.user_name,
project_name=job_input.project_name,
input_params_dict=job_input.input_params_dict,
input_esdl=esdl_input
input_esdl=esdl_input,
)
session.add(new_job)
logger.debug("Job %s is submitted as new job in database", job_id)
Expand All @@ -170,9 +173,7 @@ def set_job_registered(self, job_id: uuid.UUID) -> None:
stmnt = (
update(JobRest)
.where(JobRest.job_id == job_id)
.values(
status=JobRestStatus.REGISTERED, registered_at=datetime.now()
)
.values(status=JobRestStatus.REGISTERED, registered_at=datetime.now())
)
session.execute(stmnt)

Expand All @@ -187,9 +188,7 @@ def set_job_enqueued(self, job_id: uuid.UUID) -> None:
stmnt = (
update(JobRest)
.where(JobRest.job_id == job_id)
.values(
status=JobRestStatus.ENQUEUED, submitted_at=datetime.now()
)
.values(status=JobRestStatus.ENQUEUED, submitted_at=datetime.now())
)
session.execute(stmnt)

Expand All @@ -204,14 +203,17 @@ def set_job_running(self, job_id: uuid.UUID) -> None:
stmnt = (
update(JobRest)
.where(JobRest.job_id == job_id)
.values(
status=JobRestStatus.RUNNING, running_at=datetime.now()
)
.values(status=JobRestStatus.RUNNING, running_at=datetime.now())
)
session.execute(stmnt)

def set_job_stopped(self, job_id: uuid.UUID, new_status: JobRestStatus, logs: str | None = None,
output_esdl: str | None = None) -> None:
def set_job_stopped(
self,
job_id: uuid.UUID,
new_status: JobRestStatus,
logs: str | None = None,
output_esdl: str | None = None,
) -> None:
"""Set the job to stopped with supplied status.
:param job_id: Job id.
Expand All @@ -231,23 +233,26 @@ def set_job_stopped(self, job_id: uuid.UUID, new_status: JobRestStatus, logs: st
)
session.execute(stmnt)

def set_job_progress(self, job_id: uuid.UUID, progress_fraction: float,
progress_message: str) -> None:
def set_job_progress(
self, job_id: uuid.UUID, progress_fraction: float, progress_message: str
) -> None:
"""Set the status of the job to RUNNING.
:param job_id: Job id.
:param progress_fraction: new progress fraction.
:param progress_message: new progress message.
"""
logger.debug("For job '%s' received a new progress '%s' with message '%s'",
job_id, progress_fraction, progress_message)
logger.debug(
"For job '%s' received a new progress '%s' with message '%s'",
job_id,
progress_fraction,
progress_message,
)
with session_scope() as session:
stmnt = (
update(JobRest)
.where(JobRest.job_id == job_id)
.values(
progress_fraction=progress_fraction, progress_message=progress_message
)
.values(progress_fraction=progress_fraction, progress_message=progress_message)
)
session.execute(stmnt)

Expand Down
39 changes: 21 additions & 18 deletions src/omotes_rest/rest_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@
JobProgressUpdate,
JobStatusUpdate,
)
from omotes_sdk.workflow_type import WorkflowType, WorkflowTypeManager
from omotes_sdk.workflow_type import WorkflowTypeManager

import logging
from omotes_rest.postgres_interface import PostgresInterface
from omotes_rest.config import POSTGRESConfig
from omotes_rest.apis.api_dataclasses import JobInput, JobStatusResponse
from omotes_rest.db_models.job_rest import JobRestStatus, JobRest
from omotes_rest.workflows import FRONTEND_NAME_TO_OMOTES_WORKFLOW_NAME

logger = logging.getLogger("omotes_rest")

Expand All @@ -34,9 +35,9 @@ class RestInterface:
def __init__(self, workflow_type_manager: WorkflowTypeManager):
"""Create the omotes rest interface.
:param workflow_type_manager: All available workflow types.
:param workflow_type_manager: All available OMOTES workflow types.
"""
self.omotes_if = OmotesInterface(EnvRabbitMQConfig())
self.omotes_if = OmotesInterface(EnvRabbitMQConfig(), workflow_type_manager)
self.postgres_if = PostgresInterface(POSTGRESConfig())
self.workflow_type_manager = workflow_type_manager

Expand Down Expand Up @@ -67,10 +68,7 @@ def handle_on_job_finished(self, job: Job, result: JobResult) -> None:
raise NotImplementedError(f"Unknown result type '{result.result_type}'")

self.postgres_if.set_job_stopped(
job_id=job.id,
new_status=final_status,
logs=result.logs,
output_esdl=result.output_esdl
job_id=job.id, new_status=final_status, logs=result.logs, output_esdl=result.output_esdl
)

def handle_on_job_status_update(self, job: Job, status_update: JobStatusUpdate) -> None:
Expand Down Expand Up @@ -116,19 +114,24 @@ def submit_job(self, job_input: JobInput) -> JobStatusResponse:
:param job_input: JobInput dataclass with job input.
:return: JobStatusResponse.
"""
esdlstr_bytes = job_input.input_esdl.encode('utf-8')
workflow_type = self.workflow_type_manager.get_workflow_by_name(
FRONTEND_NAME_TO_OMOTES_WORKFLOW_NAME[job_input.workflow_type]
)
if not workflow_type:
raise RuntimeError(f"Unknown workflow type {job_input.workflow_type}")

esdlstr_bytes = job_input.input_esdl.encode("utf-8")
esdlstr_base64_bytes = base64.b64decode(esdlstr_bytes)
esdl_str = esdlstr_base64_bytes.decode('utf-8')
esdl_str = esdlstr_base64_bytes.decode("utf-8")
job = self.omotes_if.submit_job(
esdl=esdl_str,
params_dict=job_input.input_params_dict,
workflow_type=WorkflowType(workflow_type_name=job_input.workflow_type,
workflow_type_description_name="some descr"),
workflow_type=workflow_type,
job_timeout=timedelta(seconds=job_input.timeout_after_s),
callback_on_finished=self.handle_on_job_finished,
callback_on_progress_update=self.handle_on_job_progress_update,
callback_on_status_update=self.handle_on_job_status_update,
auto_disconnect_on_result=True
auto_disconnect_on_result=True,
)
self.postgres_if.put_new_job(
job_id=job.id,
Expand Down Expand Up @@ -161,13 +164,13 @@ def cancel_job(self, job_id: uuid.UUID) -> bool:
job_in_db = self.get_job(job_id)

if job_in_db:
job = Job(
id=job_id,
workflow_type=WorkflowType(
workflow_type_name=job_in_db.workflow_type,
workflow_type_description_name="some descr"
)
workflow_type = self.workflow_type_manager.get_workflow_by_name(
FRONTEND_NAME_TO_OMOTES_WORKFLOW_NAME[job_in_db.workflow_type]
)
if not workflow_type:
raise RuntimeError(f"Unknown workflow type {job_in_db.workflow_type}")

job = Job(id=job_id, workflow_type=workflow_type)
self.omotes_if.cancel_job(job)
result = True
else:
Expand Down
34 changes: 34 additions & 0 deletions src/omotes_rest/workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from omotes_sdk.workflow_type import WorkflowTypeManager, WorkflowType

# TODO to be retrieved via de omotes_sdk in the future
WORKFLOW_TYPE_MANAGER = WorkflowTypeManager(
possible_workflows=[
WorkflowType(
workflow_type_name="grow_optimizer_default",
workflow_type_description_name="Grow Optimizer",
),
WorkflowType(
workflow_type_name="grow_simulator", workflow_type_description_name="Grow Simulator"
),
WorkflowType(
workflow_type_name="grow_optimizer_no_heat_losses",
workflow_type_description_name="Grow Optimizer without heat losses",
),
WorkflowType(
workflow_type_name="grow_optimizer_with_pressure",
workflow_type_description_name="Grow Optimizer with pressure drops",
),
WorkflowType(
workflow_type_name="simulator",
workflow_type_description_name="High fidelity simulator",
),
]
)

FRONTEND_NAME_TO_OMOTES_WORKFLOW_NAME = {
"Draft Design - Quickscan Validation": "grow_optimizer_no_heat_losses",
"Draft Design - Optimization": "grow_optimizer_default",
"Draft Design - Optimization with Pressure Drops": "grow_optimizer_with_pressure",
"Draft Design - Simulation with Source Merit Order": "grow_simulator",
"Conceptual Design - Simulation": "simulator",
}

0 comments on commit 762fd3c

Please sign in to comment.