diff --git a/services/api-server/openapi.json b/services/api-server/openapi.json index 87a31578289..84ba7fb362f 100644 --- a/services/api-server/openapi.json +++ b/services/api-server/openapi.json @@ -3069,11 +3069,20 @@ ], "responses": { "200": { - "description": "Successful Response", + "description": "Returns a JobLog or an ErrorGet", "content": { "application/x-ndjson": { "schema": { - "type": "string" + "anyOf": [ + { + "$ref": "#/components/schemas/JobLog" + }, + { + "$ref": "#/components/schemas/ErrorGet" + } + ], + "type": "string", + "title": "Response 200 Get Log Stream V0 Solvers Solver Key Releases Version Jobs Job Id Logstream Get" } } } @@ -3081,6 +3090,7 @@ "409": { "description": "Conflict: Logs are already being streamed", "content": { + "application/json": {}, "application/x-ndjson": { "schema": { "$ref": "#/components/schemas/ErrorGet" @@ -4371,6 +4381,46 @@ } } }, + "JobLog": { + "properties": { + "job_id": { + "type": "string", + "format": "uuid", + "title": "Job Id" + }, + "node_id": { + "type": "string", + "format": "uuid", + "title": "Node Id" + }, + "log_level": { + "type": "integer", + "title": "Log Level" + }, + "messages": { + "items": { + "type": "string" + }, + "type": "array", + "title": "Messages" + } + }, + "type": "object", + "required": [ + "job_id", + "log_level", + "messages" + ], + "title": "JobLog", + "example": { + "job_id": "145beae4-a3a8-4fde-adbb-4e8257c2c083", + "node_id": "3742215e-6756-48d2-8b73-4d043065309f", + "log_level": 10, + "messages": [ + "PROGRESS: 5/10" + ] + } + }, "JobLogsMap": { "properties": { "log_links": { diff --git a/services/api-server/src/simcore_service_api_server/_constants.py b/services/api-server/src/simcore_service_api_server/_constants.py index 5f8975fc285..7bfbfd43907 100644 --- a/services/api-server/src/simcore_service_api_server/_constants.py +++ b/services/api-server/src/simcore_service_api_server/_constants.py @@ -3,3 +3,7 @@ MSG_BACKEND_SERVICE_UNAVAILABLE: Final[ str ] = "backend service is disabled or unreachable" + +MSG_INTERNAL_ERROR_USER_FRIENDLY_TEMPLATE: Final[ + str +] = "Oops! Something went wrong, but we've noted it down and we'll sort it out ASAP. Thanks for your patience!" diff --git a/services/api-server/src/simcore_service_api_server/api/routes/files.py b/services/api-server/src/simcore_service_api_server/api/routes/files.py index eccdda0cba9..7ee8ed1c65d 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/files.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/files.py @@ -106,7 +106,9 @@ async def list_files( SEE get_files_page for a paginated version of this function """ - stored_files: list[StorageFileMetaData] = await storage_client.list_files(user_id) + stored_files: list[StorageFileMetaData] = await storage_client.list_files( + user_id=user_id + ) # Adapts storage API model to API model all_files: list[File] = [] @@ -355,7 +357,7 @@ async def abort_multipart_upload( e_tag=None, ) abort_link: URL = await storage_client.create_abort_upload_link( - file, query={"user_id": str(user_id)} + file=file, query={"user_id": str(user_id)} ) await abort_upload(abort_upload_link=parse_obj_as(AnyUrl, str(abort_link))) @@ -384,7 +386,7 @@ async def complete_multipart_upload( e_tag=None, ) complete_link: URL = await storage_client.create_complete_upload_link( - file, {"user_id": str(user_id)} + file=file, query={"user_id": str(user_id)} ) e_tag: ETag = await complete_file_upload( diff --git a/services/api-server/src/simcore_service_api_server/api/routes/solvers.py b/services/api-server/src/simcore_service_api_server/api/routes/solvers.py index 219184352ca..6b5c9716912 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/solvers.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/solvers.py @@ -139,7 +139,7 @@ async def get_solver( # otherwise, {solver_key:path} will override and consume any of the paths that follow. try: solver = await catalog_client.get_latest_release( - user_id, solver_key, product_name=product_name + user_id=user_id, solver_key=solver_key, product_name=product_name ) solver.url = url_for( "get_solver_release", solver_key=solver.id, version=solver.version @@ -277,4 +277,6 @@ async def get_solver_pricing_plan( ): assert user_id assert product_name - return await webserver_api.get_service_pricing_plan(solver_key, version) + return await webserver_api.get_service_pricing_plan( + solver_key=solver_key, version=version + ) diff --git a/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs.py b/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs.py index baab5a40fef..4479efc2129 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs.py @@ -262,7 +262,7 @@ async def inspect_job( job_name = _compose_job_resource_name(solver_key, version, job_id) _logger.debug("Inspecting Job '%s'", job_name) - task = await director2_api.get_computation(job_id, user_id) + task = await director2_api.get_computation(project_id=job_id, user_id=user_id) job_status: JobStatus = create_jobstatus_from_task(task) return job_status diff --git a/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py b/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py index 73da939cf83..10be3a91e95 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py @@ -3,7 +3,7 @@ import logging from collections import deque from collections.abc import Callable -from typing import Annotated, Any +from typing import Annotated, Any, Union from uuid import UUID from fastapi import APIRouter, Depends, Request, status @@ -20,7 +20,6 @@ from pydantic.types import PositiveInt from servicelib.fastapi.requests_decorators import cancel_on_disconnect from servicelib.logging_utils import log_context -from starlette.background import BackgroundTask from ...exceptions.custom_errors import InsufficientCreditsError, MissingWalletError from ...exceptions.service_errors_utils import DEFAULT_BACKEND_SERVICE_STATUS_CODES @@ -28,7 +27,14 @@ from ...models.pagination import Page, PaginationParams from ...models.schemas.errors import ErrorGet from ...models.schemas.files import File -from ...models.schemas.jobs import ArgumentTypes, Job, JobID, JobMetadata, JobOutputs +from ...models.schemas.jobs import ( + ArgumentTypes, + Job, + JobID, + JobLog, + JobMetadata, + JobOutputs, +) from ...models.schemas.solvers import SolverKeyId from ...services.catalog import CatalogApi from ...services.director_v2 import DirectorV2Api @@ -89,10 +95,14 @@ } | DEFAULT_BACKEND_SERVICE_STATUS_CODES _LOGSTREAM_STATUS_CODES: dict[int | str, dict[str, Any]] = { + status.HTTP_200_OK: { + "description": "Returns a JobLog or an ErrorGet", + "model": Union[JobLog, ErrorGet], + }, status.HTTP_409_CONFLICT: { "description": "Conflict: Logs are already being streamed", "model": ErrorGet, - } + }, } | DEFAULT_BACKEND_SERVICE_STATUS_CODES router = APIRouter() @@ -126,7 +136,7 @@ async def list_jobs( _logger.debug("Listing Jobs in Solver '%s'", solver.name) projects_page = await webserver_api.get_projects_w_solver_page( - solver.name, limit=20, offset=0 + solver_name=solver.name, limit=20, offset=0 ) jobs: deque[Job] = deque() @@ -170,7 +180,7 @@ async def get_jobs_page( _logger.debug("Listing Jobs in Solver '%s'", solver.name) projects_page = await webserver_api.get_projects_w_solver_page( - solver.name, limit=page_params.limit, offset=page_params.offset + solver_name=solver.name, limit=page_params.limit, offset=page_params.offset ) jobs: list[Job] = [ @@ -262,7 +272,7 @@ async def get_job_outputs( results[name] = to_file_api_model(found[0]) else: api_file: File = await storage_client.create_soft_link( - user_id, value.path, file_id + user_id=user_id, target_s3_path=value.path, as_file_id=file_id ) results[name] = api_file else: @@ -440,8 +450,6 @@ async def get_log_stream( log_distributor=log_distributor, log_check_timeout=log_check_timeout, ) - await log_streamer.setup() return LogStreamingResponse( log_streamer.log_generator(), - background=BackgroundTask(log_streamer.teardown), ) diff --git a/services/api-server/src/simcore_service_api_server/api/routes/studies_jobs.py b/services/api-server/src/simcore_service_api_server/api/routes/studies_jobs.py index 8b735009cf5..f078b093413 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/studies_jobs.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/studies_jobs.py @@ -143,11 +143,15 @@ async def create_study_job( node_id = file_param_nodes[node_label] await webserver_api.update_node_outputs( - project.uuid, node_id, NodeOutputs(outputs={"outFile": file_link}) + project_id=project.uuid, + node_id=node_id, + new_node_outputs=NodeOutputs(outputs={"outFile": file_link}), ) if len(new_project_inputs) > 0: - await webserver_api.update_project_inputs(project.uuid, new_project_inputs) + await webserver_api.update_project_inputs( + project_id=project.uuid, new_inputs=new_project_inputs + ) assert job.name == _compose_job_resource_name(study_id, job.id) @@ -275,7 +279,7 @@ async def inspect_study_job( job_name = _compose_job_resource_name(study_id, job_id) _logger.debug("Inspecting Job '%s'", job_name) - task = await director2_api.get_computation(job_id, user_id) + task = await director2_api.get_computation(project_id=job_id, user_id=user_id) job_status: JobStatus = create_jobstatus_from_task(task) return job_status @@ -294,7 +298,7 @@ async def get_study_job_outputs( job_name = _compose_job_resource_name(study_id, job_id) _logger.debug("Getting Job Outputs for '%s'", job_name) - project_outputs = await webserver_api.get_project_outputs(job_id) + project_outputs = await webserver_api.get_project_outputs(project_id=job_id) job_outputs: JobOutputs = await create_job_outputs_from_project_outputs( job_id, project_outputs, user_id, storage_client ) diff --git a/services/api-server/src/simcore_service_api_server/api/routes/users.py b/services/api-server/src/simcore_service_api_server/api/routes/users.py index 67d907b2492..7e298413775 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/users.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/users.py @@ -37,5 +37,5 @@ async def update_my_profile( AuthSession, Security(get_webserver_session, scopes=["write"]) ], ) -> Profile: - profile: Profile = await webserver_session.update_me(profile_update) + profile: Profile = await webserver_session.update_me(profile_update=profile_update) return profile diff --git a/services/api-server/src/simcore_service_api_server/core/application.py b/services/api-server/src/simcore_service_api_server/core/application.py index da20e874169..1adbb9043cf 100644 --- a/services/api-server/src/simcore_service_api_server/core/application.py +++ b/services/api-server/src/simcore_service_api_server/core/application.py @@ -59,7 +59,7 @@ def init_app(settings: ApplicationSettings | None = None) -> FastAPI: version=version, openapi_url=f"/api/{API_VTAG}/openapi.json", docs_url="/dev/doc", - redoc_url=None, # default disabled, see below + redoc_url="/doc", ) override_openapi_method(app) add_pagination(app) diff --git a/services/api-server/src/simcore_service_api_server/exceptions/handlers/__init__.py b/services/api-server/src/simcore_service_api_server/exceptions/handlers/__init__.py index 17b2bb54644..d2779338980 100644 --- a/services/api-server/src/simcore_service_api_server/exceptions/handlers/__init__.py +++ b/services/api-server/src/simcore_service_api_server/exceptions/handlers/__init__.py @@ -5,6 +5,7 @@ from starlette import status from starlette.exceptions import HTTPException +from ..._constants import MSG_INTERNAL_ERROR_USER_FRIENDLY_TEMPLATE from ..custom_errors import CustomBaseError from ..log_streaming_errors import LogStreamingBaseError from ._custom_errors import custom_error_handler @@ -15,8 +16,6 @@ from ._log_streaming_errors import log_handling_error_handler from ._validation_errors import http422_error_handler -MSG_INTERNAL_ERROR_USER_FRIENDLY_TEMPLATE = "Oops! Something went wrong, but we've noted it down and we'll sort it out ASAP. Thanks for your patience!" - def setup(app: FastAPI, *, is_debug: bool = False): app.add_exception_handler(HTTPException, http_exception_handler) diff --git a/services/api-server/src/simcore_service_api_server/services/catalog.py b/services/api-server/src/simcore_service_api_server/services/catalog.py index 3f3e0111eec..56a7d648790 100644 --- a/services/api-server/src/simcore_service_api_server/services/catalog.py +++ b/services/api-server/src/simcore_service_api_server/services/catalog.py @@ -196,7 +196,7 @@ def _this_solver(solver: Solver) -> bool: return releases async def get_latest_release( - self, user_id: int, solver_key: SolverKeyId, *, product_name: str + self, *, user_id: int, solver_key: SolverKeyId, product_name: str ) -> Solver: releases = await self.list_solver_releases( user_id=user_id, solver_key=solver_key, product_name=product_name diff --git a/services/api-server/src/simcore_service_api_server/services/director_v2.py b/services/api-server/src/simcore_service_api_server/services/director_v2.py index 4628e617099..d0f7869c15f 100644 --- a/services/api-server/src/simcore_service_api_server/services/director_v2.py +++ b/services/api-server/src/simcore_service_api_server/services/director_v2.py @@ -70,6 +70,7 @@ class DirectorV2Api(BaseServiceClientApi): @_exception_mapper({}) async def create_computation( self, + *, project_id: UUID, user_id: PositiveInt, product_name: str, @@ -90,6 +91,7 @@ async def create_computation( @_exception_mapper({}) async def start_computation( self, + *, project_id: UUID, user_id: PositiveInt, product_name: str, @@ -124,7 +126,7 @@ async def start_computation( @_exception_mapper({status.HTTP_404_NOT_FOUND: JobNotFoundError}) async def get_computation( - self, project_id: UUID, user_id: PositiveInt + self, *, project_id: UUID, user_id: PositiveInt ) -> ComputationTaskGet: response = await self.client.get( f"/v2/computations/{project_id}", @@ -138,7 +140,7 @@ async def get_computation( @_exception_mapper({status.HTTP_404_NOT_FOUND: JobNotFoundError}) async def stop_computation( - self, project_id: UUID, user_id: PositiveInt + self, *, project_id: UUID, user_id: PositiveInt ) -> ComputationTaskGet: response = await self.client.post( f"/v2/computations/{project_id}:stop", @@ -151,7 +153,7 @@ async def stop_computation( return task @_exception_mapper({status.HTTP_404_NOT_FOUND: JobNotFoundError}) - async def delete_computation(self, project_id: UUID, user_id: PositiveInt): + async def delete_computation(self, *, project_id: UUID, user_id: PositiveInt): response = await self.client.request( "DELETE", f"/v2/computations/{project_id}", @@ -164,7 +166,7 @@ async def delete_computation(self, project_id: UUID, user_id: PositiveInt): @_exception_mapper({status.HTTP_404_NOT_FOUND: LogFileNotFoundError}) async def get_computation_logs( - self, user_id: PositiveInt, project_id: UUID + self, *, user_id: PositiveInt, project_id: UUID ) -> JobLogsMap: response = await self.client.get( f"/v2/computations/{project_id}/tasks/-/logfile", diff --git a/services/api-server/src/simcore_service_api_server/services/jobs.py b/services/api-server/src/simcore_service_api_server/services/jobs.py index e1e1b63a88f..7bc46d5ed1e 100644 --- a/services/api-server/src/simcore_service_api_server/services/jobs.py +++ b/services/api-server/src/simcore_service_api_server/services/jobs.py @@ -65,9 +65,9 @@ async def stop_project( user_id: Annotated[PositiveInt, Depends(get_current_user_id)], director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))], ) -> JobStatus: - await director2_api.stop_computation(job_id, user_id) + await director2_api.stop_computation(project_id=job_id, user_id=user_id) - task = await director2_api.get_computation(job_id, user_id) + task = await director2_api.get_computation(project_id=job_id, user_id=user_id) job_status: JobStatus = create_jobstatus_from_task(task) return job_status diff --git a/services/api-server/src/simcore_service_api_server/services/log_streaming.py b/services/api-server/src/simcore_service_api_server/services/log_streaming.py index 7ca6b1b7c68..faefc5c905b 100644 --- a/services/api-server/src/simcore_service_api_server/services/log_streaming.py +++ b/services/api-server/src/simcore_service_api_server/services/log_streaming.py @@ -7,9 +7,13 @@ from models_library.rabbitmq_messages import LoggerRabbitMessage from models_library.users import UserID from pydantic import NonNegativeInt +from servicelib.error_codes import create_error_code from servicelib.logging_utils import log_catch from servicelib.rabbitmq import RabbitMQClient +from simcore_service_api_server.exceptions.backend_errors import BaseBackEndError +from simcore_service_api_server.models.schemas.errors import ErrorGet +from .._constants import MSG_INTERNAL_ERROR_USER_FRIENDLY_TEMPLATE from ..exceptions.log_streaming_errors import ( LogStreamerNotRegisteredError, LogStreamerRegistionConflictError, @@ -100,38 +104,41 @@ def __init__( self._queue: Queue[JobLog] = Queue() self._job_id: JobID = job_id self._log_distributor: LogDistributor = log_distributor - self._is_registered: bool = False self._log_check_timeout: NonNegativeInt = log_check_timeout - async def setup(self): - await self._log_distributor.register(self._job_id, self._queue) - self._is_registered = True - - async def teardown(self): - await self._log_distributor.deregister(self._job_id) - self._is_registered = False - - async def __aenter__(self): - await self.setup() - return self - - async def __aexit__(self, exc_type, exc, tb): - await self.teardown() - async def _project_done(self) -> bool: - task = await self._director2_api.get_computation(self._job_id, self._user_id) + task = await self._director2_api.get_computation( + project_id=self._job_id, user_id=self._user_id + ) return task.stopped is not None async def log_generator(self) -> AsyncIterable[str]: - if not self._is_registered: - msg = f"LogStreamer for job_id={self._job_id} is not correctly registered" - raise LogStreamerNotRegisteredError(msg=msg) - done: bool = False - while not done: - try: - log: JobLog = await asyncio.wait_for( - self._queue.get(), timeout=self._log_check_timeout - ) - yield log.json() + _NEW_LINE - except asyncio.TimeoutError: - done = await self._project_done() + try: + await self._log_distributor.register(self._job_id, self._queue) + done: bool = False + while not done: + try: + log: JobLog = await asyncio.wait_for( + self._queue.get(), timeout=self._log_check_timeout + ) + yield log.json() + _NEW_LINE + except asyncio.TimeoutError: + done = await self._project_done() + except BaseBackEndError as exc: + _logger.info("%s", f"{exc}") + yield ErrorGet(errors=[f"{exc}"]).json() + _NEW_LINE + except Exception as exc: # pylint: disable=W0718 + error_code = create_error_code(exc) + _logger.exception( + "Unexpected %s: %s", + exc.__class__.__name__, + f"{exc}", + extra={"error_code": error_code}, + ) + yield ErrorGet( + errors=[ + MSG_INTERNAL_ERROR_USER_FRIENDLY_TEMPLATE + f" (OEC: {error_code})" + ] + ).json() + _NEW_LINE + finally: + await self._log_distributor.deregister(self._job_id) diff --git a/services/api-server/src/simcore_service_api_server/services/storage.py b/services/api-server/src/simcore_service_api_server/services/storage.py index 93d593625ed..13920d8a931 100644 --- a/services/api-server/src/simcore_service_api_server/services/storage.py +++ b/services/api-server/src/simcore_service_api_server/services/storage.py @@ -56,6 +56,7 @@ class StorageApi(BaseServiceClientApi): @_exception_mapper({}) async def list_files( self, + *, user_id: int, ) -> list[StorageFileMetaData]: """Lists metadata of all s3 objects name as api/* from a given user""" @@ -115,7 +116,7 @@ async def search_owned_files( @_exception_mapper({}) async def get_download_link( - self, user_id: int, file_id: UUID, file_name: str + self, *, user_id: int, file_id: UUID, file_name: str ) -> AnyUrl: object_path = urllib.parse.quote_plus(f"api/{file_id}/{file_name}") @@ -133,7 +134,7 @@ async def get_download_link( return link @_exception_mapper({}) - async def delete_file(self, user_id: int, quoted_storage_file_id: str) -> None: + async def delete_file(self, *, user_id: int, quoted_storage_file_id: str) -> None: response = await self.client.delete( f"/locations/{self.SIMCORE_S3_ID}/files/{quoted_storage_file_id}", params={"user_id": user_id}, @@ -142,7 +143,7 @@ async def delete_file(self, user_id: int, quoted_storage_file_id: str) -> None: @_exception_mapper({}) async def get_upload_links( - self, user_id: int, file_id: UUID, file_name: str + self, *, user_id: int, file_id: UUID, file_name: str ) -> FileUploadSchema: object_path = urllib.parse.quote_plus(f"api/{file_id}/{file_name}") @@ -158,7 +159,7 @@ async def get_upload_links( return enveloped_data.data async def create_complete_upload_link( - self, file: File, query: dict[str, str] | None = None + self, *, file: File, query: dict[str, str] | None = None ) -> URL: url = URL( f"{self.client.base_url}locations/{self.SIMCORE_S3_ID}/files/{file.quoted_storage_file_id}:complete" @@ -168,7 +169,7 @@ async def create_complete_upload_link( return url async def create_abort_upload_link( - self, file: File, query: dict[str, str] | None = None + self, *, file: File, query: dict[str, str] | None = None ) -> URL: url = URL( f"{self.client.base_url}locations/{self.SIMCORE_S3_ID}/files/{file.quoted_storage_file_id}:abort" @@ -179,7 +180,7 @@ async def create_abort_upload_link( @_exception_mapper({}) async def create_soft_link( - self, user_id: int, target_s3_path: str, as_file_id: UUID + self, *, user_id: int, target_s3_path: str, as_file_id: UUID ) -> File: assert len(target_s3_path.split("/")) == 3 # nosec diff --git a/services/api-server/src/simcore_service_api_server/services/webserver.py b/services/api-server/src/simcore_service_api_server/services/webserver.py index 2626a22d740..61595d81130 100644 --- a/services/api-server/src/simcore_service_api_server/services/webserver.py +++ b/services/api-server/src/simcore_service_api_server/services/webserver.py @@ -244,7 +244,7 @@ async def get_me(self) -> Profile: return profile @_exception_mapper(_PROFILE_STATUS_MAP) - async def update_me(self, profile_update: ProfileUpdate) -> Profile: + async def update_me(self, *, profile_update: ProfileUpdate) -> Profile: response = await self.client.put( "/me", json=profile_update.dict(exclude_none=True), @@ -307,7 +307,7 @@ async def clone_project( return ProjectGet.parse_obj(result) @_exception_mapper(_JOB_STATUS_MAP) - async def get_project(self, project_id: UUID) -> ProjectGet: + async def get_project(self, *, project_id: UUID) -> ProjectGet: response = await self.client.get( f"/projects/{project_id}", cookies=self.session_cookies, @@ -318,7 +318,7 @@ async def get_project(self, project_id: UUID) -> ProjectGet: return data async def get_projects_w_solver_page( - self, solver_name: str, limit: int, offset: int + self, *, solver_name: str, limit: int, offset: int ) -> Page[ProjectGet]: return await self._page_projects( limit=limit, @@ -329,7 +329,7 @@ async def get_projects_w_solver_page( search=urllib.parse.quote(solver_name, safe=""), ) - async def get_projects_page(self, limit: int, offset: int): + async def get_projects_page(self, *, limit: int, offset: int): return await self._page_projects( limit=limit, offset=offset, @@ -337,7 +337,7 @@ async def get_projects_page(self, limit: int, offset: int): ) @_exception_mapper(_JOB_STATUS_MAP) - async def delete_project(self, project_id: ProjectID) -> None: + async def delete_project(self, *, project_id: ProjectID) -> None: response = await self.client.delete( f"/projects/{project_id}", cookies=self.session_cookies, @@ -346,7 +346,7 @@ async def delete_project(self, project_id: ProjectID) -> None: @_exception_mapper({status.HTTP_404_NOT_FOUND: ProjectPortsNotFoundError}) async def get_project_metadata_ports( - self, project_id: ProjectID + self, *, project_id: ProjectID ) -> list[StudyPort]: """ maps GET "/projects/{study_id}/metadata/ports", unenvelopes @@ -363,7 +363,9 @@ async def get_project_metadata_ports( return data @_exception_mapper({status.HTTP_404_NOT_FOUND: ProjectMetadataNotFoundError}) - async def get_project_metadata(self, project_id: ProjectID) -> ProjectMetadataGet: + async def get_project_metadata( + self, *, project_id: ProjectID + ) -> ProjectMetadataGet: response = await self.client.get( f"/projects/{project_id}/metadata", cookies=self.session_cookies, @@ -384,7 +386,7 @@ async def patch_project(self, *, project_id: UUID, patch_params: ProjectPatch): @_exception_mapper({status.HTTP_404_NOT_FOUND: ProjectMetadataNotFoundError}) async def update_project_metadata( - self, project_id: ProjectID, metadata: dict[str, MetaValueType] + self, *, project_id: ProjectID, metadata: dict[str, MetaValueType] ) -> ProjectMetadataGet: response = await self.client.patch( f"/projects/{project_id}/metadata", @@ -398,7 +400,7 @@ async def update_project_metadata( @_exception_mapper({status.HTTP_404_NOT_FOUND: PricingUnitNotFoundError}) async def get_project_node_pricing_unit( - self, project_id: UUID, node_id: UUID + self, *, project_id: UUID, node_id: UUID ) -> PricingUnitGet | None: response = await self.client.get( f"/projects/{project_id}/nodes/{node_id}/pricing-unit", @@ -413,6 +415,7 @@ async def get_project_node_pricing_unit( @_exception_mapper({status.HTTP_404_NOT_FOUND: PricingUnitNotFoundError}) async def connect_pricing_unit_to_project_node( self, + *, project_id: UUID, node_id: UUID, pricing_plan: PositiveInt, @@ -433,7 +436,7 @@ async def connect_pricing_unit_to_project_node( } ) async def start_project( - self, project_id: UUID, cluster_id: ClusterID | None = None + self, *, project_id: UUID, cluster_id: ClusterID | None = None ) -> None: body_input: dict[str, Any] = {} if cluster_id: @@ -449,6 +452,7 @@ async def start_project( @_exception_mapper({}) async def update_project_inputs( self, + *, project_id: ProjectID, new_inputs: list[ProjectInputUpdate], ) -> dict[NodeID, ProjectInputGet]: @@ -466,7 +470,7 @@ async def update_project_inputs( @_exception_mapper({}) async def get_project_inputs( - self, project_id: ProjectID + self, *, project_id: ProjectID ) -> dict[NodeID, ProjectInputGet]: response = await self.client.get( f"/projects/{project_id}/inputs", @@ -483,7 +487,7 @@ async def get_project_inputs( @_exception_mapper({status.HTTP_404_NOT_FOUND: SolverOutputNotFoundError}) async def get_project_outputs( - self, project_id: ProjectID + self, *, project_id: ProjectID ) -> dict[NodeID, dict[str, Any]]: response = await self.client.get( f"/projects/{project_id}/outputs", @@ -500,7 +504,7 @@ async def get_project_outputs( @_exception_mapper({}) async def update_node_outputs( - self, project_id: UUID, node_id: UUID, new_node_outputs: NodeOutputs + self, *, project_id: UUID, node_id: UUID, new_node_outputs: NodeOutputs ) -> None: response = await self.client.patch( f"/projects/{project_id}/nodes/{node_id}/outputs", @@ -523,7 +527,7 @@ async def get_default_wallet(self) -> WalletGetWithAvailableCredits: return data @_exception_mapper(_WALLET_STATUS_MAP) - async def get_wallet(self, wallet_id: int) -> WalletGetWithAvailableCredits: + async def get_wallet(self, *, wallet_id: int) -> WalletGetWithAvailableCredits: response = await self.client.get( f"/wallets/{wallet_id}", cookies=self.session_cookies, @@ -534,7 +538,7 @@ async def get_wallet(self, wallet_id: int) -> WalletGetWithAvailableCredits: return data @_exception_mapper(_WALLET_STATUS_MAP) - async def get_project_wallet(self, project_id: ProjectID) -> WalletGet | None: + async def get_project_wallet(self, *, project_id: ProjectID) -> WalletGet | None: response = await self.client.get( f"/projects/{project_id}/wallet", cookies=self.session_cookies, @@ -561,7 +565,7 @@ async def get_product_price(self) -> GetCreditPrice: @_exception_mapper({status.HTTP_404_NOT_FOUND: PricingPlanNotFoundError}) async def get_service_pricing_plan( - self, solver_key: SolverKeyId, version: VersionStr + self, *, solver_key: SolverKeyId, version: VersionStr ) -> ServicePricingPlanGet | None: service_key = urllib.parse.quote_plus(solver_key) diff --git a/services/api-server/tests/unit/api_solvers/test_api_routers_solvers_jobs_logs.py b/services/api-server/tests/unit/api_solvers/test_api_routers_solvers_jobs_logs.py index 6a9a066948c..f7624cb1df9 100644 --- a/services/api-server/tests/unit/api_solvers/test_api_routers_solvers_jobs_logs.py +++ b/services/api-server/tests/unit/api_solvers/test_api_routers_solvers_jobs_logs.py @@ -7,21 +7,23 @@ import asyncio import logging -from collections.abc import Awaitable, Callable, Iterable +from collections.abc import Iterable from pprint import pprint -from typing import Final +from typing import AsyncIterable, Final import httpx import pytest from attr import dataclass from faker import Faker -from fastapi import FastAPI +from fastapi import FastAPI, status from models_library.api_schemas_webserver.projects import ProjectGet +from pydantic import ValidationError from pytest_mock import MockFixture from pytest_simcore.simcore_webserver_projects_rest_api import GET_PROJECT from respx import MockRouter from simcore_service_api_server._meta import API_VTAG from simcore_service_api_server.api.dependencies.rabbitmq import get_log_distributor +from simcore_service_api_server.models.schemas.errors import ErrorGet from simcore_service_api_server.models.schemas.jobs import JobID, JobLog _logger = logging.getLogger(__name__) @@ -38,9 +40,7 @@ class FakeLogDistributor: _produced_logs: list[str] = [] deregister_is_called: bool = False - async def register( - self, job_id: JobID, callback: Callable[[JobLog], Awaitable[None]] - ): + async def register(self, job_id: JobID, callback: asyncio.Queue[JobLog]): self._job_id = job_id async def produce_log(): @@ -53,7 +53,7 @@ async def produce_log(): log_level=logging.INFO, messages=[txt], ) - await callback(msg) + await callback.put(msg) await asyncio.sleep(0.1) asyncio.create_task(produce_log()) @@ -123,3 +123,49 @@ async def test_log_streaming( collected_messages == fake_log_distributor._produced_logs[: len(collected_messages)] ) + + +@pytest.fixture +async def mock_job_not_found( + mocked_directorv2_service_api_base: MockRouter, +) -> AsyncIterable[MockRouter]: + def _get_computation(request: httpx.Request, **kwargs) -> httpx.Response: + return httpx.Response(status_code=status.HTTP_404_NOT_FOUND) + + mocked_directorv2_service_api_base.get( + path__regex=r"/v2/computations/(?P[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})" + ).mock(side_effect=_get_computation) + yield mocked_directorv2_service_api_base + + +async def test_logstreaming_job_not_found_exception( + app: FastAPI, + auth: httpx.BasicAuth, + client: httpx.AsyncClient, + solver_key: str, + solver_version: str, + fake_log_distributor, + fake_project_for_streaming: ProjectGet, + mock_job_not_found: MockRouter, +): + + job_id: JobID = fake_project_for_streaming.uuid + _received_error = False + + async with client.stream( + "GET", + f"/{API_VTAG}/solvers/{solver_key}/releases/{solver_version}/jobs/{job_id}/logstream", + auth=auth, + ) as response: + response.raise_for_status() + async for line in response.aiter_lines(): + try: + job_log = JobLog.parse_raw(line) + pprint(job_log.json()) + except ValidationError: + error = ErrorGet.parse_raw(line) + _received_error = True + print(error.json()) + + assert fake_log_distributor.deregister_is_called + assert _received_error diff --git a/services/api-server/tests/unit/conftest.py b/services/api-server/tests/unit/conftest.py index d4ed6f04229..e8324bcc0b7 100644 --- a/services/api-server/tests/unit/conftest.py +++ b/services/api-server/tests/unit/conftest.py @@ -66,6 +66,7 @@ def app_environment( "SC_BOOT_MODE": "production", "API_SERVER_HEALTH_CHECK_TASK_PERIOD_SECONDS": "3", "API_SERVER_HEALTH_CHECK_TASK_TIMEOUT_SECONDS": "1", + "API_SERVER_LOG_CHECK_TIMEOUT_SECONDS": "1", **backend_env_vars_overrides, }, ) diff --git a/services/api-server/tests/unit/test_services_rabbitmq.py b/services/api-server/tests/unit/test_services_rabbitmq.py index 6f67b3386bd..ee68615c8f1 100644 --- a/services/api-server/tests/unit/test_services_rabbitmq.py +++ b/services/api-server/tests/unit/test_services_rabbitmq.py @@ -13,6 +13,7 @@ from datetime import datetime, timedelta from typing import Final, Literal from unittest.mock import AsyncMock +from uuid import UUID import httpx import pytest @@ -44,7 +45,6 @@ from simcore_service_api_server.services.log_streaming import ( LogDistributor, LogStreamer, - LogStreamerNotRegisteredError, LogStreamerRegistionConflictError, ) from tenacity import AsyncRetrying, retry_if_not_exception_type, stop_after_delay @@ -347,14 +347,13 @@ def _get_computation(request: httpx.Request, **kwargs) -> httpx.Response: ) assert isinstance(d2_client := DirectorV2Api.get_instance(app), DirectorV2Api) - async with LogStreamer( + yield LogStreamer( user_id=user_id, director2_api=d2_client, job_id=project_id, log_distributor=log_distributor, log_check_timeout=1, - ) as log_streamer: - yield log_streamer + ) assert len(log_distributor._log_streamers.keys()) == 0 @@ -432,13 +431,20 @@ def routing_key(self) -> str: assert ii == 0 +class _MockLogDistributor: + async def register(self, job_id: UUID, queue: asyncio.Queue): + return None + + async def deregister(self, job_id: None): + return None + + async def test_log_generator(mocker: MockFixture, faker: Faker): mocker.patch( "simcore_service_api_server.services.log_streaming.LogStreamer._project_done", return_value=True, ) - log_streamer = LogStreamer(user_id=3, director2_api=None, job_id=None, log_distributor=None, log_check_timeout=1) # type: ignore - log_streamer._is_registered = True + log_streamer = LogStreamer(user_id=3, director2_api=None, job_id=None, log_distributor=_MockLogDistributor(), log_check_timeout=1) # type: ignore published_logs: list[str] = [] for _ in range(10): @@ -457,13 +463,6 @@ async def test_log_generator(mocker: MockFixture, faker: Faker): assert published_logs == collected_logs -async def test_log_generator_context(mocker: MockFixture, faker: Faker): - log_streamer = LogStreamer(user_id=3, director2_api=None, job_id=None, log_distributor=None, log_check_timeout=1) # type: ignore - with pytest.raises(LogStreamerNotRegisteredError): - async for log in log_streamer.log_generator(): - print(log) - - @pytest.mark.parametrize("is_healthy", [True, False]) async def test_logstreaming_health_checker( mocker: MockFixture, client: httpx.AsyncClient, app: FastAPI, is_healthy: bool