From af641e15c98a21f3e52c146d366fe253a14ed3a3 Mon Sep 17 00:00:00 2001 From: Sebastiaan la Fleur Date: Wed, 17 Apr 2024 14:44:02 +0200 Subject: [PATCH 1/3] 16: Update to newest SDK and use names for workflows as used from mvp.2 on. --- dev-requirements.txt | 2 +- pyproject.toml | 2 +- requirements.txt | 2 +- scripts/setup.sh | 0 src/omotes_rest/apis/api_dataclasses.py | 2 +- src/omotes_rest/main.py | 32 ++-------------------- src/omotes_rest/rest_interface.py | 15 +++++------ src/omotes_rest/workflows.py | 35 +++++++++++++++++++++++++ 8 files changed, 47 insertions(+), 43 deletions(-) mode change 100644 => 100755 scripts/setup.sh create mode 100644 src/omotes_rest/workflows.py diff --git a/dev-requirements.txt b/dev-requirements.txt index 718bfe8..699ff26 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -172,7 +172,7 @@ omotes-sdk-protocol==0.0.8 # via # -c requirements.txt # omotes-sdk-python -omotes-sdk-python==0.0.12 +omotes-sdk-python==0.0.14 # via # -c requirements.txt # omotes-rest (pyproject.toml) diff --git a/pyproject.toml b/pyproject.toml index 08994c3..150407d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,7 +37,7 @@ dependencies = [ "python-dotenv ~= 1.0.0", "structlog ~= 23.1.0", "SQLAlchemy == 2.0.28", - "omotes-sdk-python ~= 0.0.12", + "omotes-sdk-python ~= 0.0.14", ] [project.optional-dependencies] diff --git a/requirements.txt b/requirements.txt index ffea1ae..93f224a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -84,7 +84,7 @@ mypy-extensions==1.0.0 # via typing-inspect omotes-sdk-protocol==0.0.8 # via omotes-sdk-python -omotes-sdk-python==0.0.12 +omotes-sdk-python==0.0.14 # via omotes-rest (pyproject.toml) packaging==23.1 # via diff --git a/scripts/setup.sh b/scripts/setup.sh old mode 100644 new mode 100755 diff --git a/src/omotes_rest/apis/api_dataclasses.py b/src/omotes_rest/apis/api_dataclasses.py index 20e13cb..f84b6a7 100644 --- a/src/omotes_rest/apis/api_dataclasses.py +++ b/src/omotes_rest/apis/api_dataclasses.py @@ -35,7 +35,7 @@ class JobInput: Schema: ClassVar[Type[Schema]] = Schema job_name: str = "job name" - workflow_type: str = "grow_optimizer" + workflow_type: str = "Draft Design - Quickscan Validation" user_name: str = "user name" input_esdl: str = "input ESDL base64string" project_name: str = "project name" diff --git a/src/omotes_rest/main.py b/src/omotes_rest/main.py index 97fe732..55fcd31 100644 --- a/src/omotes_rest/main.py +++ b/src/omotes_rest/main.py @@ -13,8 +13,8 @@ from omotes_rest.apis.job import OmotesRestApp from omotes_rest.rest_interface import RestInterface from omotes_rest.settings import EnvSettings +from omotes_rest.workflows import WORKFLOW_TYPE_MANAGER import logging -from omotes_sdk.workflow_type import WorkflowTypeManager, WorkflowType """logger.""" logger = logging.getLogger("omotes_rest") @@ -81,40 +81,12 @@ def handle_500(e: Exception) -> tuple[str, int]: }), 500 -# TODO to be retrieved via de omotes_sdk in the future -workflow_type_manager = WorkflowTypeManager( - possible_workflows=[ - WorkflowType( - workflow_type_name="grow_optimizer", - workflow_type_description_name="Grow Optimizer" - ), - WorkflowType( - workflow_type_name="grow_simulator", - workflow_type_description_name="Grow Simulator" - ), - WorkflowType( - workflow_type_name="grow_optimizer_no_heat_losses", - workflow_type_description_name="Grow Optimizer without heat losses", - ), - WorkflowType( - workflow_type_name="grow_optimizer_no_heat_losses_discounted_capex", - workflow_type_description_name="Grow Optimizer without heat losses and a " - "discounted CAPEX", - ), - WorkflowType( - workflow_type_name="simulator", - workflow_type_description_name="High fidelity simulator", - ), - ] -) - - def post_fork(_: Arbiter, __: SyncWorker) -> None: """Called just after a worker has been forked.""" with app.app_context(): """current_app is only within the app context""" - current_app.rest_if = RestInterface(workflow_type_manager) + current_app.rest_if = RestInterface(WORKFLOW_TYPE_MANAGER) """Interface for this Omotes Rest service.""" current_app.rest_if.start() diff --git a/src/omotes_rest/rest_interface.py b/src/omotes_rest/rest_interface.py index f7d4f50..48b53af 100644 --- a/src/omotes_rest/rest_interface.py +++ b/src/omotes_rest/rest_interface.py @@ -10,13 +10,14 @@ JobProgressUpdate, JobStatusUpdate, ) -from omotes_sdk.workflow_type import WorkflowType, WorkflowTypeManager +from omotes_sdk.workflow_type import WorkflowTypeManager import logging from omotes_rest.postgres_interface import PostgresInterface from omotes_rest.config import POSTGRESConfig from omotes_rest.apis.api_dataclasses import JobInput, JobStatusResponse from omotes_rest.db_models.job_rest import JobRestStatus, JobRest +from omotes_rest.workflows import FRONTEND_NAME_TO_OMOTES_WORKFLOW_NAME logger = logging.getLogger("omotes_rest") @@ -34,9 +35,9 @@ class RestInterface: def __init__(self, workflow_type_manager: WorkflowTypeManager): """Create the omotes rest interface. - :param workflow_type_manager: All available workflow types. + :param workflow_type_manager: All available OMOTES workflow types. """ - self.omotes_if = OmotesInterface(EnvRabbitMQConfig()) + self.omotes_if = OmotesInterface(EnvRabbitMQConfig(), workflow_type_manager) self.postgres_if = PostgresInterface(POSTGRESConfig()) self.workflow_type_manager = workflow_type_manager @@ -122,8 +123,7 @@ def submit_job(self, job_input: JobInput) -> JobStatusResponse: job = self.omotes_if.submit_job( esdl=esdl_str, params_dict=job_input.input_params_dict, - workflow_type=WorkflowType(workflow_type_name=job_input.workflow_type, - workflow_type_description_name="some descr"), + workflow_type=self.workflow_type_manager.get_workflow_by_name(FRONTEND_NAME_TO_OMOTES_WORKFLOW_NAME[job_input.workflow_type]), job_timeout=timedelta(seconds=job_input.timeout_after_s), callback_on_finished=self.handle_on_job_finished, callback_on_progress_update=self.handle_on_job_progress_update, @@ -163,10 +163,7 @@ def cancel_job(self, job_id: uuid.UUID) -> bool: if job_in_db: job = Job( id=job_id, - workflow_type=WorkflowType( - workflow_type_name=job_in_db.workflow_type, - workflow_type_description_name="some descr" - ) + workflow_type=self.workflow_type_manager.get_workflow_by_name(FRONTEND_NAME_TO_OMOTES_WORKFLOW_NAME[job_in_db.workflow_type]) ) self.omotes_if.cancel_job(job) result = True diff --git a/src/omotes_rest/workflows.py b/src/omotes_rest/workflows.py new file mode 100644 index 0000000..bca111c --- /dev/null +++ b/src/omotes_rest/workflows.py @@ -0,0 +1,35 @@ +from omotes_sdk.workflow_type import WorkflowTypeManager, WorkflowType + +# TODO to be retrieved via de omotes_sdk in the future +WORKFLOW_TYPE_MANAGER = WorkflowTypeManager( + possible_workflows=[ + WorkflowType( + workflow_type_name="grow_optimizer_default", + workflow_type_description_name="Grow Optimizer" + ), + WorkflowType( + workflow_type_name="grow_simulator", + workflow_type_description_name="Grow Simulator" + ), + WorkflowType( + workflow_type_name="grow_optimizer_no_heat_losses", + workflow_type_description_name="Grow Optimizer without heat losses", + ), + WorkflowType( + workflow_type_name="grow_optimizer_with_pressure", + workflow_type_description_name="Grow Optimizer with pressure drops", + ), + WorkflowType( + workflow_type_name="simulator", + workflow_type_description_name="High fidelity simulator", + ), + ] +) + +FRONTEND_NAME_TO_OMOTES_WORKFLOW_NAME = { + 'Draft Design - Quickscan Validation': 'grow_optimizer_no_heat_losses', + 'Draft Design - Optimization': 'grow_optimizer_default', + 'Draft Design - Optimization with Pressure Drops': 'grow_optimizer_with_pressure', + 'Draft Design - Simulation with Source Merit Order': 'grow_simulator', + 'Conceptual Design - Simulation': 'simulator', +} From fe48b97800ffac51fb22cd64e161c94226111283 Mon Sep 17 00:00:00 2001 From: Sebastiaan la Fleur Date: Wed, 17 Apr 2024 14:52:03 +0200 Subject: [PATCH 2/3] 16: Run black on all source. --- src/gunicorn.conf.py | 2 +- src/omotes_rest/apis/job.py | 7 ++-- src/omotes_rest/main.py | 21 +++++----- src/omotes_rest/postgres_interface.py | 55 +++++++++++++++------------ src/omotes_rest/rest_interface.py | 30 +++++++++------ src/omotes_rest/workflows.py | 15 ++++---- 6 files changed, 72 insertions(+), 58 deletions(-) diff --git a/src/gunicorn.conf.py b/src/gunicorn.conf.py index ddf73cc..b55a792 100644 --- a/src/gunicorn.conf.py +++ b/src/gunicorn.conf.py @@ -1,6 +1,6 @@ from omotes_rest import main -bind = '0.0.0.0:9200' +bind = "0.0.0.0:9200" workers = 1 loglevel = "info" timeout = 300 diff --git a/src/omotes_rest/apis/job.py b/src/omotes_rest/apis/job.py index f9f8900..99a117d 100644 --- a/src/omotes_rest/apis/job.py +++ b/src/omotes_rest/apis/job.py @@ -80,8 +80,9 @@ class JobCancelAPI(MethodView): def get(self, job_id: str) -> JobCancelResponse: """Cancel job if queued or running.""" job_uuid = uuid.UUID(job_id) - return JobCancelResponse(job_id=job_uuid, - cancelled=current_app.rest_if.cancel_job(job_uuid)) + return JobCancelResponse( + job_id=job_uuid, cancelled=current_app.rest_if.cancel_job(job_uuid) + ) @api.route("//status") @@ -98,7 +99,7 @@ def get(self, job_id: str) -> JobStatusResponse | Response: if status: result = JobStatusResponse(job_id=job_uuid, status=status) else: - result = Response(status=404, response=f'Unknown job {job_id}.') + result = Response(status=404, response=f"Unknown job {job_id}.") return result diff --git a/src/omotes_rest/main.py b/src/omotes_rest/main.py index 55fcd31..3ac7361 100644 --- a/src/omotes_rest/main.py +++ b/src/omotes_rest/main.py @@ -33,7 +33,8 @@ def before_request() -> None: logger.debug( f"Request, timestamp '{timestamp}', remote_addr '{request.remote_addr}'," f" method '{request.method}', scheme '{request.scheme}', full_path '{request.full_path}," - f" 'payload '{request.get_data()!r}', 'headers '{request.headers}'") + f" 'payload '{request.get_data()!r}', 'headers '{request.headers}'" + ) # return response @@ -44,7 +45,8 @@ def after_request(response: FlaskResponse) -> FlaskResponse: logger.debug( f"Request, timestamp '{timestamp}', remote_addr '{request.remote_addr}'," f" method '{request.method}', scheme '{request.scheme}', full_path '{request.full_path}," - f" 'response '{response.status}'") + f" 'response '{response.status}'" + ) return response @@ -66,19 +68,20 @@ def handle_exception(e: HTTPException) -> WerkzeugResponse: } ) response.content_type = "application/json" - return WerkzeugResponse(response=data, - status=response.status_code, - headers=response.headers, mimetype=response.mimetype, - content_type=response.content_type) + return WerkzeugResponse( + response=data, + status=response.status_code, + headers=response.headers, + mimetype=response.mimetype, + content_type=response.content_type, + ) @app.errorhandler(Exception) def handle_500(e: Exception) -> tuple[str, int]: """Handle exceptions.""" logger.exception(f"Unhandled exception occurred {str(e)}") - return json.dumps({ - "message": "Internal Server Error" - }), 500 + return json.dumps({"message": "Internal Server Error"}), 500 def post_fork(_: Arbiter, __: SyncWorker) -> None: diff --git a/src/omotes_rest/postgres_interface.py b/src/omotes_rest/postgres_interface.py index f79d6d4..319c462 100644 --- a/src/omotes_rest/postgres_interface.py +++ b/src/omotes_rest/postgres_interface.py @@ -69,8 +69,11 @@ def initialize_db(application_name: str, config: POSTGRESConfig) -> Engine: :param config: Configuration on how to connect to the SQL database. """ logger.info( - "Connecting to PostgresDB at %s:%s as user %s to db %s", config.host, config.port, - config.username, config.database + "Connecting to PostgresDB at %s:%s as user %s to db %s", + config.host, + config.port, + config.username, + config.database, ) try: @@ -128,10 +131,10 @@ def stop(self) -> None: self.engine.dispose() def put_new_job( - self, - job_id: uuid.UUID, - job_input: JobInput, - esdl_input: str, + self, + job_id: uuid.UUID, + job_input: JobInput, + esdl_input: str, ) -> None: """Insert a new job into the database. @@ -154,7 +157,7 @@ def put_new_job( user_name=job_input.user_name, project_name=job_input.project_name, input_params_dict=job_input.input_params_dict, - input_esdl=esdl_input + input_esdl=esdl_input, ) session.add(new_job) logger.debug("Job %s is submitted as new job in database", job_id) @@ -170,9 +173,7 @@ def set_job_registered(self, job_id: uuid.UUID) -> None: stmnt = ( update(JobRest) .where(JobRest.job_id == job_id) - .values( - status=JobRestStatus.REGISTERED, registered_at=datetime.now() - ) + .values(status=JobRestStatus.REGISTERED, registered_at=datetime.now()) ) session.execute(stmnt) @@ -187,9 +188,7 @@ def set_job_enqueued(self, job_id: uuid.UUID) -> None: stmnt = ( update(JobRest) .where(JobRest.job_id == job_id) - .values( - status=JobRestStatus.ENQUEUED, submitted_at=datetime.now() - ) + .values(status=JobRestStatus.ENQUEUED, submitted_at=datetime.now()) ) session.execute(stmnt) @@ -204,14 +203,17 @@ def set_job_running(self, job_id: uuid.UUID) -> None: stmnt = ( update(JobRest) .where(JobRest.job_id == job_id) - .values( - status=JobRestStatus.RUNNING, running_at=datetime.now() - ) + .values(status=JobRestStatus.RUNNING, running_at=datetime.now()) ) session.execute(stmnt) - def set_job_stopped(self, job_id: uuid.UUID, new_status: JobRestStatus, logs: str | None = None, - output_esdl: str | None = None) -> None: + def set_job_stopped( + self, + job_id: uuid.UUID, + new_status: JobRestStatus, + logs: str | None = None, + output_esdl: str | None = None, + ) -> None: """Set the job to stopped with supplied status. :param job_id: Job id. @@ -231,23 +233,26 @@ def set_job_stopped(self, job_id: uuid.UUID, new_status: JobRestStatus, logs: st ) session.execute(stmnt) - def set_job_progress(self, job_id: uuid.UUID, progress_fraction: float, - progress_message: str) -> None: + def set_job_progress( + self, job_id: uuid.UUID, progress_fraction: float, progress_message: str + ) -> None: """Set the status of the job to RUNNING. :param job_id: Job id. :param progress_fraction: new progress fraction. :param progress_message: new progress message. """ - logger.debug("For job '%s' received a new progress '%s' with message '%s'", - job_id, progress_fraction, progress_message) + logger.debug( + "For job '%s' received a new progress '%s' with message '%s'", + job_id, + progress_fraction, + progress_message, + ) with session_scope() as session: stmnt = ( update(JobRest) .where(JobRest.job_id == job_id) - .values( - progress_fraction=progress_fraction, progress_message=progress_message - ) + .values(progress_fraction=progress_fraction, progress_message=progress_message) ) session.execute(stmnt) diff --git a/src/omotes_rest/rest_interface.py b/src/omotes_rest/rest_interface.py index 48b53af..954e98f 100644 --- a/src/omotes_rest/rest_interface.py +++ b/src/omotes_rest/rest_interface.py @@ -68,10 +68,7 @@ def handle_on_job_finished(self, job: Job, result: JobResult) -> None: raise NotImplementedError(f"Unknown result type '{result.result_type}'") self.postgres_if.set_job_stopped( - job_id=job.id, - new_status=final_status, - logs=result.logs, - output_esdl=result.output_esdl + job_id=job.id, new_status=final_status, logs=result.logs, output_esdl=result.output_esdl ) def handle_on_job_status_update(self, job: Job, status_update: JobStatusUpdate) -> None: @@ -117,18 +114,24 @@ def submit_job(self, job_input: JobInput) -> JobStatusResponse: :param job_input: JobInput dataclass with job input. :return: JobStatusResponse. """ - esdlstr_bytes = job_input.input_esdl.encode('utf-8') + workflow_type = self.workflow_type_manager.get_workflow_by_name( + FRONTEND_NAME_TO_OMOTES_WORKFLOW_NAME[job_input.workflow_type] + ) + if not workflow_type: + raise RuntimeError(f"Unknown workflow type {job_input.workflow_type}") + + esdlstr_bytes = job_input.input_esdl.encode("utf-8") esdlstr_base64_bytes = base64.b64decode(esdlstr_bytes) - esdl_str = esdlstr_base64_bytes.decode('utf-8') + esdl_str = esdlstr_base64_bytes.decode("utf-8") job = self.omotes_if.submit_job( esdl=esdl_str, params_dict=job_input.input_params_dict, - workflow_type=self.workflow_type_manager.get_workflow_by_name(FRONTEND_NAME_TO_OMOTES_WORKFLOW_NAME[job_input.workflow_type]), + workflow_type=workflow_type, job_timeout=timedelta(seconds=job_input.timeout_after_s), callback_on_finished=self.handle_on_job_finished, callback_on_progress_update=self.handle_on_job_progress_update, callback_on_status_update=self.handle_on_job_status_update, - auto_disconnect_on_result=True + auto_disconnect_on_result=True, ) self.postgres_if.put_new_job( job_id=job.id, @@ -160,11 +163,14 @@ def cancel_job(self, job_id: uuid.UUID) -> bool: """ job_in_db = self.get_job(job_id) + workflow_type = self.workflow_type_manager.get_workflow_by_name( + FRONTEND_NAME_TO_OMOTES_WORKFLOW_NAME[job_in_db.workflow_type] + ) + if not workflow_type: + raise RuntimeError(f"Unknown workflow type {job_in_db.workflow_type}") + if job_in_db: - job = Job( - id=job_id, - workflow_type=self.workflow_type_manager.get_workflow_by_name(FRONTEND_NAME_TO_OMOTES_WORKFLOW_NAME[job_in_db.workflow_type]) - ) + job = Job(id=job_id, workflow_type=workflow_type) self.omotes_if.cancel_job(job) result = True else: diff --git a/src/omotes_rest/workflows.py b/src/omotes_rest/workflows.py index bca111c..a47176f 100644 --- a/src/omotes_rest/workflows.py +++ b/src/omotes_rest/workflows.py @@ -5,11 +5,10 @@ possible_workflows=[ WorkflowType( workflow_type_name="grow_optimizer_default", - workflow_type_description_name="Grow Optimizer" + workflow_type_description_name="Grow Optimizer", ), WorkflowType( - workflow_type_name="grow_simulator", - workflow_type_description_name="Grow Simulator" + workflow_type_name="grow_simulator", workflow_type_description_name="Grow Simulator" ), WorkflowType( workflow_type_name="grow_optimizer_no_heat_losses", @@ -27,9 +26,9 @@ ) FRONTEND_NAME_TO_OMOTES_WORKFLOW_NAME = { - 'Draft Design - Quickscan Validation': 'grow_optimizer_no_heat_losses', - 'Draft Design - Optimization': 'grow_optimizer_default', - 'Draft Design - Optimization with Pressure Drops': 'grow_optimizer_with_pressure', - 'Draft Design - Simulation with Source Merit Order': 'grow_simulator', - 'Conceptual Design - Simulation': 'simulator', + "Draft Design - Quickscan Validation": "grow_optimizer_no_heat_losses", + "Draft Design - Optimization": "grow_optimizer_default", + "Draft Design - Optimization with Pressure Drops": "grow_optimizer_with_pressure", + "Draft Design - Simulation with Source Merit Order": "grow_simulator", + "Conceptual Design - Simulation": "simulator", } From fc5c44c915ebc368ddfeb2ab6ca5c99810c61c5c Mon Sep 17 00:00:00 2001 From: Sebastiaan la Fleur Date: Wed, 17 Apr 2024 14:54:32 +0200 Subject: [PATCH 3/3] 16: Fix typing issue. --- src/omotes_rest/rest_interface.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/omotes_rest/rest_interface.py b/src/omotes_rest/rest_interface.py index 954e98f..0c3bc59 100644 --- a/src/omotes_rest/rest_interface.py +++ b/src/omotes_rest/rest_interface.py @@ -163,13 +163,13 @@ def cancel_job(self, job_id: uuid.UUID) -> bool: """ job_in_db = self.get_job(job_id) - workflow_type = self.workflow_type_manager.get_workflow_by_name( - FRONTEND_NAME_TO_OMOTES_WORKFLOW_NAME[job_in_db.workflow_type] - ) - if not workflow_type: - raise RuntimeError(f"Unknown workflow type {job_in_db.workflow_type}") - if job_in_db: + workflow_type = self.workflow_type_manager.get_workflow_by_name( + FRONTEND_NAME_TO_OMOTES_WORKFLOW_NAME[job_in_db.workflow_type] + ) + if not workflow_type: + raise RuntimeError(f"Unknown workflow type {job_in_db.workflow_type}") + job = Job(id=job_id, workflow_type=workflow_type) self.omotes_if.cancel_job(job) result = True