From 53d7b66217e887ea6aefce0896e6c476390589db Mon Sep 17 00:00:00 2001 From: Maximilian Jugl Date: Fri, 17 May 2024 13:09:21 +0200 Subject: [PATCH 1/5] feat: rename `upload` endpoint to `final` and `scratch` to `intermediate` --- project/routers/{upload.py => final.py} | 16 ++++------ .../routers/{scratch.py => intermediate.py} | 32 +++++++------------ project/server.py | 14 ++++---- tests/test_auth.py | 6 ++-- tests/{test_upload.py => test_final.py} | 2 +- .../{test_scratch.py => test_intermediate.py} | 14 +++----- 6 files changed, 33 insertions(+), 51 deletions(-) rename project/routers/{upload.py => final.py} (83%) rename project/routers/{scratch.py => intermediate.py} (79%) rename tests/{test_upload.py => test_final.py} (97%) rename tests/{test_scratch.py => test_intermediate.py} (78%) diff --git a/project/routers/upload.py b/project/routers/final.py similarity index 83% rename from project/routers/upload.py rename to project/routers/final.py index dca2fa4..eeac652 100644 --- a/project/routers/upload.py +++ b/project/routers/final.py @@ -66,10 +66,10 @@ def __bg_upload_to_remote( @router.put( "/", status_code=status.HTTP_204_NO_CONTENT, - summary="Upload file to submit to Hub", - operation_id="putResultFile", + summary="Upload file as final result to Hub", + operation_id="putFinalResult", ) -async def upload_to_remote( +async def submit_final_result_to_hub( client_id: Annotated[str, Depends(get_client_id)], file: UploadFile, background_tasks: BackgroundTasks, @@ -77,13 +77,9 @@ async def upload_to_remote( local_minio: Annotated[Minio, Depends(get_local_minio)], api_client: Annotated[FlameHubClient, Depends(get_api_client)], ): - """Upload a file to the local S3 instance and send it to FLAME Hub in the background. - The request is successful if the file was uploaded to the local S3 instance. - Responds with a 204 on success. - - This endpoint is to be used for submitting final results of a federated analysis. 
- - Currently, there is no way of determining the status or progress of the upload to the FLAME Hub.""" + """Upload a file as a final result to the FLAME Hub. + Returns a 204 on success. + This endpoint returns immediately and submits the file in the background.""" object_id = uuid.uuid4() object_name = f"upload/{client_id}/{object_id}" diff --git a/project/routers/scratch.py b/project/routers/intermediate.py similarity index 79% rename from project/routers/scratch.py rename to project/routers/intermediate.py index 4eb138b..8521d3b 100644 --- a/project/routers/scratch.py +++ b/project/routers/intermediate.py @@ -68,11 +68,12 @@ def __bg_upload_to_remote( @router.put( "/", + status_code=status.HTTP_202_ACCEPTED, response_model=ScratchUploadResponse, - summary="Upload file to local object storage", - operation_id="putIntermediateFile", + summary="Upload file as intermediate result to Hub", + operation_id="putIntermediateResult", ) -async def upload_to_scratch( +async def submit_intermediate_result_to_hub( client_id: Annotated[str, Depends(get_client_id)], file: UploadFile, settings: Annotated[Settings, Depends(get_settings)], @@ -81,12 +82,9 @@ async def upload_to_scratch( api_client: Annotated[FlameHubClient, Depends(get_api_client)], background_tasks: BackgroundTasks, ): - """Upload a file to the local S3 instance. - The file is not forwarded to the FLAME hub. - Responds with a 200 on success and a link to the endpoint for fetching the uploaded file. - - This endpoint is to be used for submitting intermediate results of a federated analysis. - """ + """Upload a file as an intermediate result to the FLAME Hub. + Returns a 202 on success. 
+ This endpoint returns immediately and submits the file in the background.""" object_id = str(uuid.uuid4()) object_name = f"temp/{client_id}/{object_id}" @@ -113,7 +111,7 @@ async def upload_to_scratch( return ScratchUploadResponse( url=str( request.url_for( - "read_from_scratch", + "retrieve_intermediate_result_from_hub", object_id=object_id, ) ), @@ -122,21 +120,15 @@ async def upload_to_scratch( @router.get( "/{object_id}", - summary="Get file from local object storage", - operation_id="getIntermediateFile", + summary="Get intermediate result as file to Hub", + operation_id="getIntermediateResult", ) -async def read_from_scratch( +async def retrieve_intermediate_result_from_hub( client_id: Annotated[str, Depends(get_client_id)], object_id: uuid.UUID, - settings: Annotated[Settings, Depends(get_settings)], api_client: Annotated[FlameHubClient, Depends(get_api_client)], ): - """Get a file from the local S3 instance. - The file must have previously been uploaded using the PUT method of this endpoint. - Responds with a 200 on success and the requested file in the response body. - - This endpoint is to be used for retrieving intermediate results of a federated analysis. 
- """ + """Get an intermediate result as file from the FLAME Hub.""" object_id_str = str(object_id) if ( diff --git a/project/server.py b/project/server.py index cb3cdfb..17eef0c 100644 --- a/project/server.py +++ b/project/server.py @@ -7,7 +7,7 @@ import tomli from fastapi import FastAPI -from project.routers import upload, scratch +from project.routers import final, intermediate @asynccontextmanager @@ -65,13 +65,13 @@ async def do_healthcheck(): app.include_router( - upload.router, - prefix="/upload", - tags=["upload"], + final.router, + prefix="/final", + tags=["final"], ) app.include_router( - scratch.router, - prefix="/scratch", - tags=["scratch"], + intermediate.router, + prefix="/intermediate", + tags=["intermediate"], ) diff --git a/tests/test_auth.py b/tests/test_auth.py index 5681770..a34fcfe 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -8,9 +8,9 @@ from tests.common.rest import detail_of endpoints = [ - ("GET", f"/scratch/{uuid.uuid4()}"), # UUID can be arbitrary for auth checks - ("PUT", "/scratch"), - ("PUT", "/upload"), + ("GET", f"/intermediate/{uuid.uuid4()}"), # UUID can be arbitrary for auth checks + ("PUT", "/intermediate"), + ("PUT", "/final"), ] diff --git a/tests/test_upload.py b/tests/test_final.py similarity index 97% rename from tests/test_upload.py rename to tests/test_final.py index cf393a0..1d1b356 100644 --- a/tests/test_upload.py +++ b/tests/test_final.py @@ -13,7 +13,7 @@ def test_200_submit_to_upload(test_client, rng, api_client, analysis_id): blob = next_random_bytes(rng) r = test_client.put( - "/upload", + "/final", auth=BearerAuth(issue_client_access_token(analysis_id)), files=wrap_bytes_for_request(blob), ) diff --git a/tests/test_scratch.py b/tests/test_intermediate.py similarity index 78% rename from tests/test_scratch.py rename to tests/test_intermediate.py index 2a2b258..554768e 100644 --- a/tests/test_scratch.py +++ b/tests/test_intermediate.py @@ -1,10 +1,9 @@ -import re import uuid import pytest from 
starlette import status -from project.routers.scratch import ScratchUploadResponse +from project.routers.intermediate import ScratchUploadResponse from tests.common.auth import BearerAuth, issue_client_access_token from tests.common.helpers import next_random_bytes, eventually from tests.common.rest import wrap_bytes_for_request, detail_of @@ -15,20 +14,15 @@ def test_200_submit_receive_from_scratch(test_client, rng, analysis_id): blob = next_random_bytes(rng) r = test_client.put( - "/scratch", + "/intermediate", auth=BearerAuth(issue_client_access_token(analysis_id)), files=wrap_bytes_for_request(blob), ) - assert r.status_code == status.HTTP_200_OK + assert r.status_code == status.HTTP_202_ACCEPTED # check that the response contains a path to a valid resource model = ScratchUploadResponse(**r.json()) - path_regex = ( - r"/scratch/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}" - ) - - assert re.fullmatch(path_regex, model.url.path) is not None def _check_response_from_hub(): r = test_client.get( @@ -48,7 +42,7 @@ def _check_response_from_hub(): def test_whatever(test_client): rand_uuid = str(uuid.uuid4()) r = test_client.get( - f"/scratch/{rand_uuid}", + f"/intermediate/{rand_uuid}", auth=BearerAuth(issue_client_access_token()), ) From 7e47ca5167ea1a324640b6d86b7980dc93535e86 Mon Sep 17 00:00:00 2001 From: Maximilian Jugl Date: Fri, 17 May 2024 14:37:22 +0200 Subject: [PATCH 2/5] feat: rename response model for intermediate route --- project/routers/intermediate.py | 6 +++--- tests/test_intermediate.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/project/routers/intermediate.py b/project/routers/intermediate.py index 8521d3b..ce611fa 100644 --- a/project/routers/intermediate.py +++ b/project/routers/intermediate.py @@ -26,7 +26,7 @@ object_id_to_hub_bucket_dict: dict[str, Optional[str]] = {} -class ScratchUploadResponse(BaseModel): +class IntermediateUploadResponse(BaseModel): url: HttpUrl @@ -69,7 +69,7 @@ def 
__bg_upload_to_remote( @router.put( "/", status_code=status.HTTP_202_ACCEPTED, - response_model=ScratchUploadResponse, + response_model=IntermediateUploadResponse, summary="Upload file as intermediate result to Hub", operation_id="putIntermediateResult", ) @@ -108,7 +108,7 @@ async def submit_intermediate_result_to_hub( object_id, ) - return ScratchUploadResponse( + return IntermediateUploadResponse( url=str( request.url_for( "retrieve_intermediate_result_from_hub", diff --git a/tests/test_intermediate.py b/tests/test_intermediate.py index 554768e..3eb55b6 100644 --- a/tests/test_intermediate.py +++ b/tests/test_intermediate.py @@ -3,7 +3,7 @@ import pytest from starlette import status -from project.routers.intermediate import ScratchUploadResponse +from project.routers.intermediate import IntermediateUploadResponse from tests.common.auth import BearerAuth, issue_client_access_token from tests.common.helpers import next_random_bytes, eventually from tests.common.rest import wrap_bytes_for_request, detail_of @@ -22,7 +22,7 @@ def test_200_submit_receive_from_scratch(test_client, rng, analysis_id): assert r.status_code == status.HTTP_202_ACCEPTED # check that the response contains a path to a valid resource - model = ScratchUploadResponse(**r.json()) + model = IntermediateUploadResponse(**r.json()) def _check_response_from_hub(): r = test_client.get( From 3cba0a1e9cf96f89bb590d5f67f2752e7b689888 Mon Sep 17 00:00:00 2001 From: Maximilian Jugl Date: Fri, 17 May 2024 14:38:19 +0200 Subject: [PATCH 3/5] feat: add route for local result upload --- project/routers/local.py | 90 ++++++++++++++++++++++++++++++++++++++++ project/server.py | 20 ++++++--- tests/test_local.py | 39 +++++++++++++++++ 3 files changed, 144 insertions(+), 5 deletions(-) create mode 100644 project/routers/local.py create mode 100644 tests/test_local.py diff --git a/project/routers/local.py b/project/routers/local.py new file mode 100644 index 0000000..b00e0ac --- /dev/null +++ 
b/project/routers/local.py @@ -0,0 +1,90 @@ +import logging +import uuid +from typing import Annotated + +from fastapi import Depends, UploadFile, APIRouter, HTTPException +from minio import Minio, S3Error +from pydantic import BaseModel, HttpUrl +from starlette import status +from starlette.requests import Request +from starlette.responses import StreamingResponse + +from project.config import Settings +from project.dependencies import get_client_id, get_settings, get_local_minio + +router = APIRouter() +logger = logging.getLogger(__name__) + + +class LocalUploadResponse(BaseModel): + url: HttpUrl + + +@router.put( + "/", + response_model=LocalUploadResponse, + summary="Upload file as intermediate result to local storage", + operation_id="putLocalResult", +) +async def submit_intermediate_result_to_local( + client_id: Annotated[str, Depends(get_client_id)], + file: UploadFile, + settings: Annotated[Settings, Depends(get_settings)], + minio: Annotated[Minio, Depends(get_local_minio)], + request: Request, +): + object_id = uuid.uuid4() + object_name = f"local/{client_id}/{object_id}" + + minio.put_object( + settings.minio.bucket, + object_name, + data=file.file, + length=file.size, + content_type=file.content_type or "application/octet-stream", + ) + + return LocalUploadResponse( + url=str( + request.url_for( + "retrieve_intermediate_result_from_local", + object_id=object_id, + ) + ) + ) + + +@router.get( + "/{object_id}", + summary="Get intermediate result as file from local storage", + operation_id="getLocalResult", +) +async def retrieve_intermediate_result_from_local( + client_id: Annotated[str, Depends(get_client_id)], + object_id: uuid.UUID, + settings: Annotated[Settings, Depends(get_settings)], + minio: Annotated[Minio, Depends(get_local_minio)], +): + try: + response = minio.get_object( + settings.minio.bucket, + f"local/{client_id}/{object_id}", + ) + except S3Error as e: + logger.exception(f"Could not get object `{object_id}` for client `{client_id}`") + 
+ if e.code == "NoSuchKey": + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Object with ID {object_id} does not exist", + ) + + raise HTTPException( + status_code=status.HTTP_502_BAD_GATEWAY, + detail="Unexpected error from object store", + ) + + return StreamingResponse( + response, + media_type=response.headers.get("Content-Type", "application/octet-stream"), + ) diff --git a/project/server.py b/project/server.py index 17eef0c..1b2aa95 100644 --- a/project/server.py +++ b/project/server.py @@ -7,7 +7,7 @@ import tomli from fastapi import FastAPI -from project.routers import final, intermediate +from project.routers import final, intermediate, local @asynccontextmanager @@ -47,12 +47,16 @@ async def lifespan(app: FastAPI): }, openapi_tags=[ { - "name": "upload", - "description": "Upload files for submission to FLAME hub", + "name": "final", + "description": "Upload final results to FLAME Hub", }, { - "name": "scratch", - "description": "Upload files to local object storage", + "name": "intermediate", + "description": "Upload intermediate results to FLAME Hub", + }, + { + "name": "local", + "description": "Upload intermediate results to local storage", }, ], ) @@ -75,3 +79,9 @@ async def do_healthcheck(): prefix="/intermediate", tags=["intermediate"], ) + +app.include_router( + local.router, + prefix="/local", + tags=["local"], +) diff --git a/tests/test_local.py b/tests/test_local.py new file mode 100644 index 0000000..ef243fd --- /dev/null +++ b/tests/test_local.py @@ -0,0 +1,39 @@ +import uuid + +from starlette import status + +from project.routers.local import LocalUploadResponse +from tests.common.auth import BearerAuth, issue_client_access_token +from tests.common.helpers import next_random_bytes +from tests.common.rest import wrap_bytes_for_request, detail_of + + +def test_200_submit_receive_from_local(test_client, rng): + blob = next_random_bytes(rng) + r = test_client.put( + "/local", + 
auth=BearerAuth(issue_client_access_token()), + files=wrap_bytes_for_request(blob), + ) + + assert r.status_code == status.HTTP_200_OK + model = LocalUploadResponse(**r.json()) + + r = test_client.get( + model.url.path, + auth=BearerAuth(issue_client_access_token()), + ) + + assert r.status_code == status.HTTP_200_OK + assert r.read() == blob + + +def test_404_unknown_oid(test_client): + oid = uuid.uuid4() + r = test_client.get( + f"/local/{oid}", + auth=BearerAuth(issue_client_access_token()), + ) + + assert r.status_code == status.HTTP_404_NOT_FOUND + assert detail_of(r) == f"Object with ID {oid} does not exist" From 54429287cf7d9a72c942dcceea9f1ffabe1695f0 Mon Sep 17 00:00:00 2001 From: Maximilian Jugl Date: Fri, 17 May 2024 14:41:36 +0200 Subject: [PATCH 4/5] feat: add local endpoint to auth test --- tests/test_auth.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_auth.py b/tests/test_auth.py index a34fcfe..54cde83 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -10,6 +10,8 @@ endpoints = [ ("GET", f"/intermediate/{uuid.uuid4()}"), # UUID can be arbitrary for auth checks ("PUT", "/intermediate"), + ("GET", f"/local/{uuid.uuid4()}"), + ("PUT", "/local"), ("PUT", "/final"), ] From cb03fa6d94139b4a78d06e9680f9168b50a75470 Mon Sep 17 00:00:00 2001 From: Maximilian Jugl Date: Fri, 17 May 2024 14:44:35 +0200 Subject: [PATCH 5/5] feat: move client ID retrieval in GET `intermediate` to path definition --- project/routers/intermediate.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/project/routers/intermediate.py b/project/routers/intermediate.py index ce611fa..00bb906 100644 --- a/project/routers/intermediate.py +++ b/project/routers/intermediate.py @@ -122,9 +122,11 @@ async def submit_intermediate_result_to_hub( "/{object_id}", summary="Get intermediate result as file to Hub", operation_id="getIntermediateResult", + # client id is not actually used here but required for auth. 
Having this + # as a path dependency makes PyCharm stop complaining about unused params. + dependencies=[Depends(get_client_id)], ) async def retrieve_intermediate_result_from_hub( - client_id: Annotated[str, Depends(get_client_id)], object_id: uuid.UUID, api_client: Annotated[FlameHubClient, Depends(get_api_client)], ):