Commit

Adds async function and streaming response for large AWS S3 files listing and get files
kshitijrajsharma committed Jan 16, 2024
1 parent 94911c0 commit e890c8a
Showing 1 changed file with 55 additions and 55 deletions.
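The core idea of the change: instead of collecting every S3 key into an in-memory list and slicing it into pages, the endpoint yields a JSON array element by element, so the response starts immediately and memory use stays flat regardless of listing size. Below is a minimal, self-contained sketch of that pattern, not the project's actual code; the bucket name, route path, and app object are illustrative assumptions.

    import json

    import boto3
    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse

    app = FastAPI()
    s3 = boto3.client("s3")  # assumes credentials via the default chain


    @app.get("/files")
    def list_files(prefix: str = ""):
        # list_objects_v2 returns at most 1000 keys per call; the paginator
        # transparently walks every page of results.
        paginator = s3.get_paginator("list_objects_v2")
        page_iterator = paginator.paginate(Bucket="my-example-bucket", Prefix=prefix)

        def generate():
            yield "["
            first = True
            for page in page_iterator:
                for item in page.get("Contents", []):
                    if not first:
                        yield ","
                    first = False
                    # default=str serializes the datetime in LastModified
                    yield json.dumps(
                        {"Key": item["Key"], "Size": item["Size"]}, default=str
                    )
            yield "]"

        # StreamingResponse accepts sync or async generators; Starlette runs
        # sync ones in a threadpool, which suits boto3's blocking calls.
        return StreamingResponse(generate(), media_type="application/json")

Hand-assembling the brackets and commas keeps the whole payload a single valid JSON array even though no complete document ever exists in memory at once.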
110 changes: 55 additions & 55 deletions API/s3.py
@@ -3,10 +3,11 @@

 import boto3
 import humanize
+from boto3.session import Session
 from botocore.exceptions import NoCredentialsError
 from fastapi import APIRouter, HTTPException, Path, Query, Request
 from fastapi.encoders import jsonable_encoder
-from fastapi.responses import JSONResponse, RedirectResponse
+from fastapi.responses import JSONResponse, RedirectResponse, StreamingResponse
 from fastapi_versioning import version
 
 from src.config import AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, BUCKET_NAME
@@ -15,10 +16,10 @@

 router = APIRouter(prefix="/s3", tags=["S3"])
 
-
 BUCKET_NAME = "production-raw-data-api"
 AWS_REGION = "us-east-1"
 
-s3 = boto3.client(
+session = Session()
+s3 = session.client(
     "s3",
     aws_access_key_id=AWS_ACCESS_KEY_ID,
     aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
@@ -33,8 +34,6 @@
 async def list_s3_files(
     request: Request,
     folder: str = Query(default="/HDX"),
-    page: int = Query(default=1, ge=1, description="Page number"),
-    page_size: int = Query(default=10, le=100, description="Items per page"),
     prettify: bool = Query(
         default=False, description="Display size & date in human-readable format"
     ),
@@ -43,50 +42,65 @@ async def list_s3_files(
     folder = folder.strip("/")
     prefix = f"{folder}/"
 
-    result = []
-
     try:
-        # Create a paginator for list_objects_v2
+        # Use list_objects_v2 directly for pagination
         page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix)
 
-        # Paginate the results
-        start_index = (page - 1) * page_size
-        end_index = start_index + page_size
+        async def generate():
+            first_item = True
+            yield "["
 
-        for response in page_iterator:
-            contents = response.get("Contents", [])
+            for response in page_iterator:
+                contents = response.get("Contents", [])
 
-            for item in contents:
-                size = item["Size"]
-                last_modified = item["LastModified"].strftime("%Y-%m-%dT%H:%M:%SZ")
-                if prettify:
-                    last_modified = humanize.naturaldate(item["LastModified"])
-                    size = humanize.naturalsize(size)
+                for item in contents:
+                    size = item["Size"]
+                    last_modified = item["LastModified"].strftime("%Y-%m-%dT%H:%M:%SZ")
+                    if prettify:
+                        last_modified = humanize.naturaldate(item["LastModified"])
+                        size = humanize.naturalsize(size)
 
-                item_dict = {
-                    "Key": item["Key"],
-                    "LastModified": last_modified,
-                    "Size": size,
-                }
-                result.append(item_dict)
+                    item_dict = {
+                        "Key": item["Key"],
+                        "LastModified": last_modified,
+                        "Size": size,
+                    }
+                    if not first_item:
+                        yield ","
+                    else:
+                        first_item = False
+                    yield json.dumps(item_dict, default=str)
 
-        paginated_result = result[start_index:end_index]
+            yield "]"
 
-        # Calculate total number of pages
-        total_pages = -(-len(result) // page_size)  # Ceiling division
+        return StreamingResponse(content=generate(), media_type="application/json")
 
-        # Include pagination information in the response
-        response_data = {
-            "current_page": page,
-            "total_pages": total_pages,
-            "items_per_page": page_size,
-            "items": paginated_result,
-        }
     except NoCredentialsError:
         raise HTTPException(status_code=500, detail="AWS credentials not available")
 
-    return JSONResponse(content=jsonable_encoder(response_data))
+
+async def check_object_existence(bucket_name, file_path):
+    """Async function to check object existence"""
+    try:
+        s3.head_object(Bucket=bucket_name, Key=file_path)
+    except NoCredentialsError:
+        raise HTTPException(status_code=500, detail="AWS credentials not available")
+    except Exception as e:
+        raise HTTPException(
+            status_code=404, detail=f"File or folder not found: {file_path}"
+        )
+
+
+async def read_meta_json(bucket_name, file_path):
+    """Async function to read from meta json"""
+    try:
+        response = s3.get_object(Bucket=bucket_name, Key=file_path)
+        content = json.loads(response["Body"].read())
+        return content
+    except Exception as e:
+        raise HTTPException(
+            status_code=500, detail=f"Error reading meta.json: {str(e)}"
+        )
 
 
 @router.get("/get/{file_path:path}")
@@ -110,31 +124,17 @@ async def get_s3_file(
     file_path = file_path.strip("/")
     encoded_file_path = quote(file_path)
 
-    try:
-        # Check if the file or folder exists
-        s3.head_object(Bucket=bucket_name, Key=encoded_file_path)
-    except NoCredentialsError:
-        raise HTTPException(status_code=500, detail="AWS credentials not available")
-    except Exception as e:
-        raise HTTPException(
-            status_code=404, detail=f"File or folder not found: {file_path}"
-        )
+    await check_object_existence(bucket_name, encoded_file_path)
 
     if read_meta and file_path.lower().endswith(".json"):
         # Read and deliver the content of meta.json
-        try:
-            response = s3.get_object(Bucket=bucket_name, Key=file_path)
-            content = json.loads(response["Body"].read())
-            return JSONResponse(content=jsonable_encoder(content))
-        except Exception as e:
-            raise HTTPException(
-                status_code=500, detail=f"Error reading meta.json: {str(e)}"
-            )
+        content = await read_meta_json(bucket_name, file_path)
+        return JSONResponse(content=jsonable_encoder(content))
 
     # If not reading meta.json, generate a presigned URL
     presigned_url = s3.generate_presigned_url(
         "get_object",
-        Params={"Bucket": bucket_name, "Key": file_path},
+        Params={"Bucket": bucket_name, "Key": encoded_file_path},
         ExpiresIn=expiry,
     )
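A client can take advantage of the streamed listing by parsing array elements as they arrive rather than buffering the whole body. A hedged sketch using the ijson library; the URL and query parameter are illustrative, since the listing route's full path is not visible in this hunk.

    import ijson
    import requests

    with requests.get(
        "http://localhost:8000/v1/s3/files/",  # hypothetical deployment URL
        params={"folder": "/HDX"},
        stream=True,
    ) as resp:
        resp.raise_for_status()
        # ijson.items reads array elements ("item") one at a time from the
        # raw byte stream, so memory use is bounded on the client too.
        for obj in ijson.items(resp.raw, "item"):
            print(obj["Key"], obj["Size"])

For small listings a plain resp.json() at the end would work just as well; incremental parsing only pays off when the array is large enough that buffering it matters.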
