NOTE(review): this patch was reconstructed from a copy whose newlines and
indentation had been collapsed. Indentation (4 spaces), the placement of blank
context lines and the split between hunk-header text and first context line are
best-effort; they match every hunk's line counts but should be confirmed
against the repository before applying.
NOTE(review): the added summary of GET /intermediate/{object_id} was corrected
from "... to Hub" to "... from Hub", so the recorded post-image blob id
(00bb906) presumably no longer matches that file's contents.
NOTE(review): retrieve_intermediate_result_from_local hands the MinIO response
to StreamingResponse and does not appear to close it or release its
connection afterwards - confirm and follow up separately.

diff --git a/project/routers/upload.py b/project/routers/final.py
similarity index 83%
rename from project/routers/upload.py
rename to project/routers/final.py
index dca2fa4..eeac652 100644
--- a/project/routers/upload.py
+++ b/project/routers/final.py
@@ -66,10 +66,10 @@ def __bg_upload_to_remote(
 @router.put(
     "/",
     status_code=status.HTTP_204_NO_CONTENT,
-    summary="Upload file to submit to Hub",
-    operation_id="putResultFile",
+    summary="Upload file as final result to Hub",
+    operation_id="putFinalResult",
 )
-async def upload_to_remote(
+async def submit_final_result_to_hub(
     client_id: Annotated[str, Depends(get_client_id)],
     file: UploadFile,
     background_tasks: BackgroundTasks,
@@ -77,13 +77,9 @@ async def upload_to_remote(
     local_minio: Annotated[Minio, Depends(get_local_minio)],
     api_client: Annotated[FlameHubClient, Depends(get_api_client)],
 ):
-    """Upload a file to the local S3 instance and send it to FLAME Hub in the background.
-    The request is successful if the file was uploaded to the local S3 instance.
-    Responds with a 204 on success.
-
-    This endpoint is to be used for submitting final results of a federated analysis.
-
-    Currently, there is no way of determining the status or progress of the upload to the FLAME Hub."""
+    """Upload a file as a final result to the FLAME Hub.
+    Returns a 204 on success.
+    This endpoint returns immediately and submits the file in the background."""
     object_id = uuid.uuid4()
     object_name = f"upload/{client_id}/{object_id}"
 
diff --git a/project/routers/scratch.py b/project/routers/intermediate.py
similarity index 75%
rename from project/routers/scratch.py
rename to project/routers/intermediate.py
index 4eb138b..00bb906 100644
--- a/project/routers/scratch.py
+++ b/project/routers/intermediate.py
@@ -26,7 +26,7 @@
 object_id_to_hub_bucket_dict: dict[str, Optional[str]] = {}
 
 
-class ScratchUploadResponse(BaseModel):
+class IntermediateUploadResponse(BaseModel):
     url: HttpUrl
 
 
@@ -68,11 +68,12 @@ def __bg_upload_to_remote(
 
 @router.put(
     "/",
-    response_model=ScratchUploadResponse,
-    summary="Upload file to local object storage",
-    operation_id="putIntermediateFile",
+    status_code=status.HTTP_202_ACCEPTED,
+    response_model=IntermediateUploadResponse,
+    summary="Upload file as intermediate result to Hub",
+    operation_id="putIntermediateResult",
 )
-async def upload_to_scratch(
+async def submit_intermediate_result_to_hub(
     client_id: Annotated[str, Depends(get_client_id)],
     file: UploadFile,
     settings: Annotated[Settings, Depends(get_settings)],
@@ -81,12 +82,9 @@ async def upload_to_scratch(
     api_client: Annotated[FlameHubClient, Depends(get_api_client)],
     background_tasks: BackgroundTasks,
 ):
-    """Upload a file to the local S3 instance.
-    The file is not forwarded to the FLAME hub.
-    Responds with a 200 on success and a link to the endpoint for fetching the uploaded file.
-
-    This endpoint is to be used for submitting intermediate results of a federated analysis.
-    """
+    """Upload a file as an intermediate result to the FLAME Hub.
+    Returns a 202 on success.
+    This endpoint returns immediately and submits the file in the background."""
     object_id = str(uuid.uuid4())
     object_name = f"temp/{client_id}/{object_id}"
 
@@ -110,10 +108,10 @@ async def upload_to_scratch(
         object_id,
     )
 
-    return ScratchUploadResponse(
+    return IntermediateUploadResponse(
         url=str(
             request.url_for(
-                "read_from_scratch",
+                "retrieve_intermediate_result_from_hub",
                 object_id=object_id,
             )
         ),
@@ -122,21 +120,17 @@ async def upload_to_scratch(
 
 @router.get(
     "/{object_id}",
-    summary="Get file from local object storage",
-    operation_id="getIntermediateFile",
+    summary="Get intermediate result as file from Hub",
+    operation_id="getIntermediateResult",
+    # client id is not actually used here but required for auth. having this
+    # as a path dependency makes pycharm stop complaining about unused params.
+    dependencies=[Depends(get_client_id)],
 )
-async def read_from_scratch(
-    client_id: Annotated[str, Depends(get_client_id)],
+async def retrieve_intermediate_result_from_hub(
     object_id: uuid.UUID,
-    settings: Annotated[Settings, Depends(get_settings)],
     api_client: Annotated[FlameHubClient, Depends(get_api_client)],
 ):
-    """Get a file from the local S3 instance.
-    The file must have previously been uploaded using the PUT method of this endpoint.
-    Responds with a 200 on success and the requested file in the response body.
-
-    This endpoint is to be used for retrieving intermediate results of a federated analysis.
-    """
+    """Get an intermediate result as file from the FLAME Hub."""
     object_id_str = str(object_id)
 
     if (
diff --git a/project/routers/local.py b/project/routers/local.py
new file mode 100644
index 0000000..b00e0ac
--- /dev/null
+++ b/project/routers/local.py
@@ -0,0 +1,90 @@
+import logging
+import uuid
+from typing import Annotated
+
+from fastapi import Depends, UploadFile, APIRouter, HTTPException
+from minio import Minio, S3Error
+from pydantic import BaseModel, HttpUrl
+from starlette import status
+from starlette.requests import Request
+from starlette.responses import StreamingResponse
+
+from project.config import Settings
+from project.dependencies import get_client_id, get_settings, get_local_minio
+
+router = APIRouter()
+logger = logging.getLogger(__name__)
+
+
+class LocalUploadResponse(BaseModel):
+    url: HttpUrl
+
+
+@router.put(
+    "/",
+    response_model=LocalUploadResponse,
+    summary="Upload file as intermediate result to local storage",
+    operation_id="putLocalResult",
+)
+async def submit_intermediate_result_to_local(
+    client_id: Annotated[str, Depends(get_client_id)],
+    file: UploadFile,
+    settings: Annotated[Settings, Depends(get_settings)],
+    minio: Annotated[Minio, Depends(get_local_minio)],
+    request: Request,
+):
+    object_id = uuid.uuid4()
+    object_name = f"local/{client_id}/{object_id}"
+
+    minio.put_object(
+        settings.minio.bucket,
+        object_name,
+        data=file.file,
+        length=file.size,
+        content_type=file.content_type or "application/octet-stream",
+    )
+
+    return LocalUploadResponse(
+        url=str(
+            request.url_for(
+                "retrieve_intermediate_result_from_local",
+                object_id=object_id,
+            )
+        )
+    )
+
+
+@router.get(
+    "/{object_id}",
+    summary="Get intermediate result as file from local storage",
+    operation_id="getLocalResult",
+)
+async def retrieve_intermediate_result_from_local(
+    client_id: Annotated[str, Depends(get_client_id)],
+    object_id: uuid.UUID,
+    settings: Annotated[Settings, Depends(get_settings)],
+    minio: Annotated[Minio, Depends(get_local_minio)],
+):
+    try:
+        response = minio.get_object(
+            settings.minio.bucket,
+            f"local/{client_id}/{object_id}",
+        )
+    except S3Error as e:
+        logger.exception(f"Could not get object `{object_id}` for client `{client_id}`")
+
+        if e.code == "NoSuchKey":
+            raise HTTPException(
+                status_code=status.HTTP_404_NOT_FOUND,
+                detail=f"Object with ID {object_id} does not exist",
+            )
+
+        raise HTTPException(
+            status_code=status.HTTP_502_BAD_GATEWAY,
+            detail="Unexpected error from object store",
+        )
+
+    return StreamingResponse(
+        response,
+        media_type=response.headers.get("Content-Type", "application/octet-stream"),
+    )
diff --git a/project/server.py b/project/server.py
index cb3cdfb..1b2aa95 100644
--- a/project/server.py
+++ b/project/server.py
@@ -7,7 +7,7 @@
 import tomli
 from fastapi import FastAPI
 
-from project.routers import upload, scratch
+from project.routers import final, intermediate, local
 
 
 @asynccontextmanager
@@ -47,12 +47,16 @@ async def lifespan(app: FastAPI):
     },
     openapi_tags=[
         {
-            "name": "upload",
-            "description": "Upload files for submission to FLAME hub",
+            "name": "final",
+            "description": "Upload final results to FLAME Hub",
         },
         {
-            "name": "scratch",
-            "description": "Upload files to local object storage",
+            "name": "intermediate",
+            "description": "Upload intermediate results to FLAME Hub",
+        },
+        {
+            "name": "local",
+            "description": "Upload intermediate results to local storage",
         },
     ],
 )
@@ -65,13 +69,19 @@ async def do_healthcheck():
 
 
 app.include_router(
-    upload.router,
-    prefix="/upload",
-    tags=["upload"],
+    final.router,
+    prefix="/final",
+    tags=["final"],
+)
+
+app.include_router(
+    intermediate.router,
+    prefix="/intermediate",
+    tags=["intermediate"],
 )
 
 app.include_router(
-    scratch.router,
-    prefix="/scratch",
-    tags=["scratch"],
+    local.router,
+    prefix="/local",
+    tags=["local"],
 )
diff --git a/tests/test_auth.py b/tests/test_auth.py
index 5681770..54cde83 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -8,9 +8,11 @@
 from tests.common.rest import detail_of
 
 endpoints = [
-    ("GET", f"/scratch/{uuid.uuid4()}"),  # UUID can be arbitrary for auth checks
-    ("PUT", "/scratch"),
-    ("PUT", "/upload"),
+    ("GET", f"/intermediate/{uuid.uuid4()}"),  # UUID can be arbitrary for auth checks
+    ("PUT", "/intermediate"),
+    ("GET", f"/local/{uuid.uuid4()}"),
+    ("PUT", "/local"),
+    ("PUT", "/final"),
 ]
 
 
diff --git a/tests/test_upload.py b/tests/test_final.py
similarity index 97%
rename from tests/test_upload.py
rename to tests/test_final.py
index cf393a0..1d1b356 100644
--- a/tests/test_upload.py
+++ b/tests/test_final.py
@@ -13,7 +13,7 @@
 def test_200_submit_to_upload(test_client, rng, api_client, analysis_id):
     blob = next_random_bytes(rng)
     r = test_client.put(
-        "/upload",
+        "/final",
         auth=BearerAuth(issue_client_access_token(analysis_id)),
         files=wrap_bytes_for_request(blob),
     )
diff --git a/tests/test_scratch.py b/tests/test_intermediate.py
similarity index 75%
rename from tests/test_scratch.py
rename to tests/test_intermediate.py
index 2a2b258..3eb55b6 100644
--- a/tests/test_scratch.py
+++ b/tests/test_intermediate.py
@@ -1,10 +1,9 @@
-import re
 import uuid
 
 import pytest
 from starlette import status
 
-from project.routers.scratch import ScratchUploadResponse
+from project.routers.intermediate import IntermediateUploadResponse
 from tests.common.auth import BearerAuth, issue_client_access_token
 from tests.common.helpers import next_random_bytes, eventually
 from tests.common.rest import wrap_bytes_for_request, detail_of
@@ -15,20 +14,15 @@
 def test_200_submit_receive_from_scratch(test_client, rng, analysis_id):
     blob = next_random_bytes(rng)
     r = test_client.put(
-        "/scratch",
+        "/intermediate",
         auth=BearerAuth(issue_client_access_token(analysis_id)),
         files=wrap_bytes_for_request(blob),
     )
 
-    assert r.status_code == status.HTTP_200_OK
+    assert r.status_code == status.HTTP_202_ACCEPTED
 
     # check that the response contains a path to a valid resource
-    model = ScratchUploadResponse(**r.json())
-    path_regex = (
-        r"/scratch/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"
-    )
-
-    assert re.fullmatch(path_regex, model.url.path) is not None
+    model = IntermediateUploadResponse(**r.json())
 
     def _check_response_from_hub():
         r = test_client.get(
@@ -48,7 +42,7 @@ def _check_response_from_hub():
 def test_whatever(test_client):
     rand_uuid = str(uuid.uuid4())
     r = test_client.get(
-        f"/scratch/{rand_uuid}",
+        f"/intermediate/{rand_uuid}",
         auth=BearerAuth(issue_client_access_token()),
     )
 
diff --git a/tests/test_local.py b/tests/test_local.py
new file mode 100644
index 0000000..ef243fd
--- /dev/null
+++ b/tests/test_local.py
@@ -0,0 +1,39 @@
+import uuid
+
+from starlette import status
+
+from project.routers.local import LocalUploadResponse
+from tests.common.auth import BearerAuth, issue_client_access_token
+from tests.common.helpers import next_random_bytes
+from tests.common.rest import wrap_bytes_for_request, detail_of
+
+
+def test_200_submit_receive_from_local(test_client, rng):
+    blob = next_random_bytes(rng)
+    r = test_client.put(
+        "/local",
+        auth=BearerAuth(issue_client_access_token()),
+        files=wrap_bytes_for_request(blob),
+    )
+
+    assert r.status_code == status.HTTP_200_OK
+    model = LocalUploadResponse(**r.json())
+
+    r = test_client.get(
+        model.url.path,
+        auth=BearerAuth(issue_client_access_token()),
+    )
+
+    assert r.status_code == status.HTTP_200_OK
+    assert r.read() == blob
+
+
+def test_404_unknown_oid(test_client):
+    oid = uuid.uuid4()
+    r = test_client.get(
+        f"/local/{oid}",
+        auth=BearerAuth(issue_client_access_token()),
+    )
+
+    assert r.status_code == status.HTTP_404_NOT_FOUND
+    assert detail_of(r) == f"Object with ID {oid} does not exist"