From e890c8a994ae417b93461f25f9d78200b0e93d32 Mon Sep 17 00:00:00 2001
From: kshitijrajsharma
Date: Tue, 16 Jan 2024 17:11:21 +0545
Subject: [PATCH] Add async helpers and a streaming response for listing and
 fetching large AWS S3 files

---
 API/s3.py | 109 ++++++++++++++++++++++++++++---------------------------
 1 file changed, 54 insertions(+), 55 deletions(-)

diff --git a/API/s3.py b/API/s3.py
index dc811fb6..39a4d187 100644
--- a/API/s3.py
+++ b/API/s3.py
@@ -3,10 +3,11 @@
 
 import boto3
 import humanize
+from boto3.session import Session
 from botocore.exceptions import NoCredentialsError
 from fastapi import APIRouter, HTTPException, Path, Query, Request
 from fastapi.encoders import jsonable_encoder
-from fastapi.responses import JSONResponse, RedirectResponse
+from fastapi.responses import JSONResponse, RedirectResponse, StreamingResponse
 from fastapi_versioning import version
 
 from src.config import AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, BUCKET_NAME
@@ -15,10 +16,9 @@
 
 router = APIRouter(prefix="/s3", tags=["S3"])
 
-
 AWS_REGION = "us-east-1"
-
-s3 = boto3.client(
+session = Session()
+s3 = session.client(
     "s3",
     aws_access_key_id=AWS_ACCESS_KEY_ID,
     aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
@@ -33,8 +33,6 @@
 async def list_s3_files(
     request: Request,
     folder: str = Query(default="/HDX"),
-    page: int = Query(default=1, ge=1, description="Page number"),
-    page_size: int = Query(default=10, le=100, description="Items per page"),
     prettify: bool = Query(
         default=False, description="Display size & date in human-readable format"
     ),
@@ -43,50 +41,65 @@
     folder = folder.strip("/")
     prefix = f"{folder}/"
 
-    result = []
-
     try:
-        # Create a paginator for list_objects_v2
+        # Use list_objects_v2 directly for pagination
         page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix)
 
-        # Paginate the results
-        start_index = (page - 1) * page_size
-        end_index = start_index + page_size
+        async def generate():
+            first_item = True
+            yield "["
 
-        for response in page_iterator:
-            contents = response.get("Contents", [])
+            for response in page_iterator:
+                contents = response.get("Contents", [])
 
-            for item in contents:
-                size = item["Size"]
-                last_modified = item["LastModified"].strftime("%Y-%m-%dT%H:%M:%SZ")
-                if prettify:
-                    last_modified = humanize.naturaldate(item["LastModified"])
-                    size = humanize.naturalsize(size)
+                for item in contents:
+                    size = item["Size"]
+                    last_modified = item["LastModified"].strftime("%Y-%m-%dT%H:%M:%SZ")
+                    if prettify:
+                        last_modified = humanize.naturaldate(item["LastModified"])
+                        size = humanize.naturalsize(size)
 
-                item_dict = {
-                    "Key": item["Key"],
-                    "LastModified": last_modified,
-                    "Size": size,
-                }
-                result.append(item_dict)
+                    item_dict = {
+                        "Key": item["Key"],
+                        "LastModified": last_modified,
+                        "Size": size,
+                    }
+                    if not first_item:
+                        yield ","
+                    else:
+                        first_item = False
+                    yield json.dumps(item_dict, default=str)
 
-        paginated_result = result[start_index:end_index]
+            yield "]"
 
-        # Calculate total number of pages
-        total_pages = -(-len(result) // page_size)  # Ceiling division
+        return StreamingResponse(content=generate(), media_type="application/json")
 
-        # Include pagination information in the response
-        response_data = {
-            "current_page": page,
-            "total_pages": total_pages,
-            "items_per_page": page_size,
-            "items": paginated_result,
-        }
+    except NoCredentialsError:
+        raise HTTPException(status_code=500, detail="AWS credentials not available")
 
-        return JSONResponse(content=jsonable_encoder(response_data))
 
+async def check_object_existence(bucket_name, file_path):
+    """Async function to check object existence"""
+    try:
+        s3.head_object(Bucket=bucket_name, Key=file_path)
     except NoCredentialsError:
         raise HTTPException(status_code=500, detail="AWS credentials not available")
+    except Exception as e:
+        raise HTTPException(
+            status_code=404, detail=f"File or folder not found: {file_path}"
+        )
+
+
+async def read_meta_json(bucket_name, file_path):
+    """Async function to read from meta json"""
+    try:
+        response = s3.get_object(Bucket=bucket_name, Key=file_path)
+        content = json.loads(response["Body"].read())
+        return content
+    except Exception as e:
+        raise HTTPException(
+            status_code=500, detail=f"Error reading meta.json: {str(e)}"
+        )
 
 
 @router.get("/get/{file_path:path}")
@@ -110,31 +123,17 @@ async def get_s3_file(
     file_path = file_path.strip("/")
     encoded_file_path = quote(file_path)
 
-    try:
-        # Check if the file or folder exists
-        s3.head_object(Bucket=bucket_name, Key=encoded_file_path)
-    except NoCredentialsError:
-        raise HTTPException(status_code=500, detail="AWS credentials not available")
-    except Exception as e:
-        raise HTTPException(
-            status_code=404, detail=f"File or folder not found: {file_path}"
-        )
+    await check_object_existence(bucket_name, encoded_file_path)
 
     if read_meta and file_path.lower().endswith(".json"):
         # Read and deliver the content of meta.json
-        try:
-            response = s3.get_object(Bucket=bucket_name, Key=file_path)
-            content = json.loads(response["Body"].read())
-            return JSONResponse(content=jsonable_encoder(content))
-        except Exception as e:
-            raise HTTPException(
-                status_code=500, detail=f"Error reading meta.json: {str(e)}"
-            )
+        content = await read_meta_json(bucket_name, file_path)
+        return JSONResponse(content=jsonable_encoder(content))
 
     # If not reading meta.json, generate a presigned URL
     presigned_url = s3.generate_presigned_url(
         "get_object",
-        Params={"Bucket": bucket_name, "Key": file_path},
+        Params={"Bucket": bucket_name, "Key": encoded_file_path},
         ExpiresIn=expiry,
     )
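
A note on the async conversion above: check_object_existence() and read_meta_json() are declared "async def", but every call they make (s3.head_object, s3.get_object, Body.read) is a blocking boto3 call, so the event loop still stalls while S3 responds; the same applies to the paginator loop inside generate(). A minimal sketch of one way to offload that work, assuming Starlette's run_in_threadpool (which ships with FastAPI) and a module-level boto3 client like the one configured in this patch:

import json

import boto3
from fastapi import HTTPException
from starlette.concurrency import run_in_threadpool

# Stand-in for the module-level client configured in the patch
s3 = boto3.client("s3")


async def check_object_existence(bucket_name, file_path):
    """Check that an S3 key exists without blocking the event loop."""
    try:
        # head_object is synchronous; a worker thread keeps the loop free
        await run_in_threadpool(s3.head_object, Bucket=bucket_name, Key=file_path)
    except Exception:
        raise HTTPException(
            status_code=404, detail=f"File or folder not found: {file_path}"
        )


async def read_meta_json(bucket_name, file_path):
    """Fetch and parse a small JSON object via the thread pool."""
    try:
        response = await run_in_threadpool(
            s3.get_object, Bucket=bucket_name, Key=file_path
        )
        body = await run_in_threadpool(response["Body"].read)
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Error reading meta.json: {str(e)}"
        )
    return json.loads(body)

An async S3 client such as aiobotocore would avoid the thread hops entirely, but run_in_threadpool is the smaller change relative to the code as written.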
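
With the page/page_size parameters gone, the listing endpoint now returns a StreamingResponse whose body is a single JSON array emitted element by element (an empty folder streams just "[]"). A hypothetical client-side sketch of consuming it incrementally with httpx; the host, version prefix, and /s3/files route are placeholder assumptions, since the route decorator sits outside this diff:

import asyncio
import json

import httpx

# Placeholder URL; adjust host and route to the deployed API
LISTING_URL = "http://localhost:8000/v1/s3/files/"


async def fetch_listing(folder="/HDX"):
    """Read the streamed JSON array chunk by chunk, then parse it."""
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("GET", LISTING_URL, params={"folder": folder}) as resp:
            resp.raise_for_status()
            # Chunks arrive as the server-side generator yields them
            body = "".join([chunk async for chunk in resp.aiter_text()])
    return json.loads(body)


if __name__ == "__main__":
    print(asyncio.run(fetch_listing()))

Each element carries the same Key / LastModified / Size fields that generate() serializes with json.dumps.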