Skip to content

Commit

Permalink
Merge pull request #42 from PrivateAIM/41-refactor-endpoints
Browse files Browse the repository at this point in the history
Reintroduce local endpoint and rename existing endpoints
  • Loading branch information
mjugl authored May 17, 2024
2 parents 8b128f0 + cb03fa6 commit fdd580b
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 60 deletions.
16 changes: 6 additions & 10 deletions project/routers/upload.py → project/routers/final.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,24 +66,20 @@ def __bg_upload_to_remote(
@router.put(
"/",
status_code=status.HTTP_204_NO_CONTENT,
summary="Upload file to submit to Hub",
operation_id="putResultFile",
summary="Upload file as final result to Hub",
operation_id="putFinalResult",
)
async def upload_to_remote(
async def submit_final_result_to_hub(
client_id: Annotated[str, Depends(get_client_id)],
file: UploadFile,
background_tasks: BackgroundTasks,
settings: Annotated[Settings, Depends(get_settings)],
local_minio: Annotated[Minio, Depends(get_local_minio)],
api_client: Annotated[FlameHubClient, Depends(get_api_client)],
):
"""Upload a file to the local S3 instance and send it to FLAME Hub in the background.
The request is successful if the file was uploaded to the local S3 instance.
Responds with a 204 on success.
This endpoint is to be used for submitting final results of a federated analysis.
Currently, there is no way of determining the status or progress of the upload to the FLAME Hub."""
"""Upload a file as a final result to the FLAME Hub.
Returns a 204 on success.
This endpoint returns immediately and submits the file in the background."""
object_id = uuid.uuid4()
object_name = f"upload/{client_id}/{object_id}"

Expand Down
42 changes: 18 additions & 24 deletions project/routers/scratch.py → project/routers/intermediate.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
object_id_to_hub_bucket_dict: dict[str, Optional[str]] = {}


class ScratchUploadResponse(BaseModel):
class IntermediateUploadResponse(BaseModel):
url: HttpUrl


Expand Down Expand Up @@ -68,11 +68,12 @@ def __bg_upload_to_remote(

@router.put(
"/",
response_model=ScratchUploadResponse,
summary="Upload file to local object storage",
operation_id="putIntermediateFile",
status_code=status.HTTP_202_ACCEPTED,
response_model=IntermediateUploadResponse,
summary="Upload file as intermediate result to Hub",
operation_id="putIntermediateResult",
)
async def upload_to_scratch(
async def submit_intermediate_result_to_hub(
client_id: Annotated[str, Depends(get_client_id)],
file: UploadFile,
settings: Annotated[Settings, Depends(get_settings)],
Expand All @@ -81,12 +82,9 @@ async def upload_to_scratch(
api_client: Annotated[FlameHubClient, Depends(get_api_client)],
background_tasks: BackgroundTasks,
):
"""Upload a file to the local S3 instance.
The file is not forwarded to the FLAME hub.
Responds with a 200 on success and a link to the endpoint for fetching the uploaded file.
This endpoint is to be used for submitting intermediate results of a federated analysis.
"""
"""Upload a file as an intermediate result to the FLAME Hub.
Returns a 202 on success.
This endpoint returns immediately and submits the file in the background."""
object_id = str(uuid.uuid4())
object_name = f"temp/{client_id}/{object_id}"

Expand All @@ -110,10 +108,10 @@ async def upload_to_scratch(
object_id,
)

return ScratchUploadResponse(
return IntermediateUploadResponse(
url=str(
request.url_for(
"read_from_scratch",
"retrieve_intermediate_result_from_hub",
object_id=object_id,
)
),
Expand All @@ -122,21 +120,17 @@ async def upload_to_scratch(

@router.get(
"/{object_id}",
summary="Get file from local object storage",
operation_id="getIntermediateFile",
summary="Get intermediate result as file to Hub",
operation_id="getIntermediateResult",
# client id is not actually used here but required for auth. having this
# as a path dependency makes pycharm stop complaining about unused params.
dependencies=[Depends(get_client_id)],
)
async def read_from_scratch(
client_id: Annotated[str, Depends(get_client_id)],
async def retrieve_intermediate_result_from_hub(
object_id: uuid.UUID,
settings: Annotated[Settings, Depends(get_settings)],
api_client: Annotated[FlameHubClient, Depends(get_api_client)],
):
"""Get a file from the local S3 instance.
The file must have previously been uploaded using the PUT method of this endpoint.
Responds with a 200 on success and the requested file in the response body.
This endpoint is to be used for retrieving intermediate results of a federated analysis.
"""
"""Get an intermediate result as file from the FLAME Hub."""
object_id_str = str(object_id)

if (
Expand Down
90 changes: 90 additions & 0 deletions project/routers/local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import logging
import uuid
from typing import Annotated

from fastapi import Depends, UploadFile, APIRouter, HTTPException
from minio import Minio, S3Error
from pydantic import BaseModel, HttpUrl
from starlette import status
from starlette.requests import Request
from starlette.responses import StreamingResponse

from project.config import Settings
from project.dependencies import get_client_id, get_settings, get_local_minio

router = APIRouter()
logger = logging.getLogger(__name__)


class LocalUploadResponse(BaseModel):
url: HttpUrl


@router.put(
"/",
response_model=LocalUploadResponse,
summary="Upload file as intermediate result to local storage",
operation_id="putLocalResult",
)
async def submit_intermediate_result_to_local(
client_id: Annotated[str, Depends(get_client_id)],
file: UploadFile,
settings: Annotated[Settings, Depends(get_settings)],
minio: Annotated[Minio, Depends(get_local_minio)],
request: Request,
):
object_id = uuid.uuid4()
object_name = f"local/{client_id}/{object_id}"

minio.put_object(
settings.minio.bucket,
object_name,
data=file.file,
length=file.size,
content_type=file.content_type or "application/octet-stream",
)

return LocalUploadResponse(
url=str(
request.url_for(
"retrieve_intermediate_result_from_local",
object_id=object_id,
)
)
)


@router.get(
"/{object_id}",
summary="Get intermediate result as file from local storage",
operation_id="getLocalResult",
)
async def retrieve_intermediate_result_from_local(
client_id: Annotated[str, Depends(get_client_id)],
object_id: uuid.UUID,
settings: Annotated[Settings, Depends(get_settings)],
minio: Annotated[Minio, Depends(get_local_minio)],
):
try:
response = minio.get_object(
settings.minio.bucket,
f"local/{client_id}/{object_id}",
)
except S3Error as e:
logger.exception(f"Could not get object `{object_id}` for client `{client_id}`")

if e.code == "NoSuchKey":
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Object with ID {object_id} does not exist",
)

raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail="Unexpected error from object store",
)

return StreamingResponse(
response,
media_type=response.headers.get("Content-Type", "application/octet-stream"),
)
32 changes: 21 additions & 11 deletions project/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import tomli
from fastapi import FastAPI

from project.routers import upload, scratch
from project.routers import final, intermediate, local


@asynccontextmanager
Expand Down Expand Up @@ -47,12 +47,16 @@ async def lifespan(app: FastAPI):
},
openapi_tags=[
{
"name": "upload",
"description": "Upload files for submission to FLAME hub",
"name": "final",
"description": "Upload final results to FLAME Hub",
},
{
"name": "scratch",
"description": "Upload files to local object storage",
"name": "intermediate",
"description": "Upload intermediate results to FLAME Hub",
},
{
"name": "local",
"description": "Upload intermediate results to local storage",
},
],
)
Expand All @@ -65,13 +69,19 @@ async def do_healthcheck():


app.include_router(
upload.router,
prefix="/upload",
tags=["upload"],
final.router,
prefix="/final",
tags=["final"],
)

app.include_router(
intermediate.router,
prefix="/intermediate",
tags=["intermediate"],
)

app.include_router(
scratch.router,
prefix="/scratch",
tags=["scratch"],
local.router,
prefix="/local",
tags=["local"],
)
8 changes: 5 additions & 3 deletions tests/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
from tests.common.rest import detail_of

endpoints = [
("GET", f"/scratch/{uuid.uuid4()}"), # UUID can be arbitrary for auth checks
("PUT", "/scratch"),
("PUT", "/upload"),
("GET", f"/intermediate/{uuid.uuid4()}"), # UUID can be arbitrary for auth checks
("PUT", "/intermediate"),
("GET", f"/local/{uuid.uuid4()}"),
("PUT", "/local"),
("PUT", "/final"),
]


Expand Down
2 changes: 1 addition & 1 deletion tests/test_upload.py → tests/test_final.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def test_200_submit_to_upload(test_client, rng, api_client, analysis_id):

blob = next_random_bytes(rng)
r = test_client.put(
"/upload",
"/final",
auth=BearerAuth(issue_client_access_token(analysis_id)),
files=wrap_bytes_for_request(blob),
)
Expand Down
16 changes: 5 additions & 11 deletions tests/test_scratch.py → tests/test_intermediate.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import re
import uuid

import pytest
from starlette import status

from project.routers.scratch import ScratchUploadResponse
from project.routers.intermediate import IntermediateUploadResponse
from tests.common.auth import BearerAuth, issue_client_access_token
from tests.common.helpers import next_random_bytes, eventually
from tests.common.rest import wrap_bytes_for_request, detail_of
Expand All @@ -15,20 +14,15 @@
def test_200_submit_receive_from_scratch(test_client, rng, analysis_id):
blob = next_random_bytes(rng)
r = test_client.put(
"/scratch",
"/intermediate",
auth=BearerAuth(issue_client_access_token(analysis_id)),
files=wrap_bytes_for_request(blob),
)

assert r.status_code == status.HTTP_200_OK
assert r.status_code == status.HTTP_202_ACCEPTED

# check that the response contains a path to a valid resource
model = ScratchUploadResponse(**r.json())
path_regex = (
r"/scratch/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"
)

assert re.fullmatch(path_regex, model.url.path) is not None
model = IntermediateUploadResponse(**r.json())

def _check_response_from_hub():
r = test_client.get(
Expand All @@ -48,7 +42,7 @@ def _check_response_from_hub():
def test_whatever(test_client):
rand_uuid = str(uuid.uuid4())
r = test_client.get(
f"/scratch/{rand_uuid}",
f"/intermediate/{rand_uuid}",
auth=BearerAuth(issue_client_access_token()),
)

Expand Down
39 changes: 39 additions & 0 deletions tests/test_local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import uuid

from starlette import status

from project.routers.local import LocalUploadResponse
from tests.common.auth import BearerAuth, issue_client_access_token
from tests.common.helpers import next_random_bytes
from tests.common.rest import wrap_bytes_for_request, detail_of


def test_200_submit_receive_from_local(test_client, rng):
blob = next_random_bytes(rng)
r = test_client.put(
"/local",
auth=BearerAuth(issue_client_access_token()),
files=wrap_bytes_for_request(blob),
)

assert r.status_code == status.HTTP_200_OK
model = LocalUploadResponse(**r.json())

r = test_client.get(
model.url.path,
auth=BearerAuth(issue_client_access_token()),
)

assert r.status_code == status.HTTP_200_OK
assert r.read() == blob


def test_404_unknown_oid(test_client):
oid = uuid.uuid4()
r = test_client.get(
f"/local/{oid}",
auth=BearerAuth(issue_client_access_token()),
)

assert r.status_code == status.HTTP_404_NOT_FOUND
assert detail_of(r) == f"Object with ID {oid} does not exist"

0 comments on commit fdd580b

Please sign in to comment.