-
Notifications
You must be signed in to change notification settings - Fork 34
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #186 from hotosm/feature/s3_api
Feature s3 List and Download API
- Loading branch information
Showing
4 changed files
with
165 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
from urllib.parse import quote | ||
|
||
import boto3 | ||
import humanize | ||
from botocore.exceptions import NoCredentialsError | ||
from fastapi import APIRouter, HTTPException, Path, Query, Request | ||
from fastapi.encoders import jsonable_encoder | ||
from fastapi.responses import JSONResponse | ||
from fastapi_versioning import version | ||
|
||
from src.config import AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, BUCKET_NAME | ||
from src.config import LIMITER as limiter | ||
from src.config import POLYGON_STATISTICS_API_RATE_LIMIT | ||
|
||
router = APIRouter(prefix="/s3", tags=["S3"]) | ||
|
||
|
||
AWS_REGION = "us-east-1" | ||
|
||
s3 = boto3.client( | ||
"s3", | ||
aws_access_key_id=AWS_ACCESS_KEY_ID, | ||
aws_secret_access_key=AWS_SECRET_ACCESS_KEY, | ||
region_name=AWS_REGION, | ||
) | ||
paginator = s3.get_paginator("list_objects_v2") | ||
|
||
|
||
@router.get("/files/") | ||
@limiter.limit(f"{POLYGON_STATISTICS_API_RATE_LIMIT}/minute") | ||
@version(1) | ||
def list_s3_files( | ||
request: Request, | ||
folder: str = Query(default="/HDX"), | ||
page: int = Query(default=1, ge=1, description="Page number"), | ||
page_size: int = Query(default=10, le=100, description="Items per page"), | ||
prettify: bool = Query( | ||
default=False, description="Display size & date in human-readable format" | ||
), | ||
): | ||
bucket_name = BUCKET_NAME | ||
folder = folder.strip("/") | ||
prefix = f"{folder}/" | ||
|
||
result = [] | ||
|
||
try: | ||
# Create a paginator for list_objects_v2 | ||
page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix) | ||
|
||
# Paginate the results | ||
start_index = (page - 1) * page_size | ||
end_index = start_index + page_size | ||
|
||
for response in page_iterator: | ||
contents = response.get("Contents", []) | ||
|
||
for item in contents: | ||
size = item["Size"] | ||
last_modified = item["LastModified"].strftime("%Y-%m-%dT%H:%M:%SZ") | ||
if prettify: | ||
last_modified = humanize.naturaldate(item["LastModified"]) | ||
size = humanize.naturalsize(size) | ||
|
||
item_dict = { | ||
"Key": item["Key"], | ||
"LastModified": last_modified, | ||
"Size": size, | ||
} | ||
result.append(item_dict) | ||
|
||
paginated_result = result[start_index:end_index] | ||
|
||
# Calculate total number of pages | ||
total_pages = -(-len(result) // page_size) # Ceiling division | ||
|
||
# Include pagination information in the response | ||
response_data = { | ||
"current_page": page, | ||
"total_pages": total_pages, | ||
"items_per_page": page_size, | ||
"items": paginated_result, | ||
} | ||
|
||
return JSONResponse(content=jsonable_encoder(response_data)) | ||
|
||
except NoCredentialsError: | ||
raise HTTPException(status_code=500, detail="AWS credentials not available") | ||
|
||
|
||
@router.get("/s3-files/{file_path:path}") | ||
@limiter.limit(f"{POLYGON_STATISTICS_API_RATE_LIMIT}/minute") | ||
@version(1) | ||
def get_s3_file( | ||
request: Request, | ||
file_path: str = Path(..., description="The path to the file or folder in S3"), | ||
): | ||
bucket_name = BUCKET_NAME | ||
file_path = file_path.strip("/") | ||
encoded_file_path = quote(file_path) | ||
try: | ||
# Check if the file or folder exists | ||
s3.head_object(Bucket=bucket_name, Key=encoded_file_path) | ||
except NoCredentialsError: | ||
raise HTTPException(status_code=500, detail="AWS credentials not available") | ||
except Exception as e: | ||
print(e) | ||
raise HTTPException( | ||
status_code=404, detail=f"File or folder not found: {file_path}" | ||
) | ||
|
||
# If it's a folder, list its contents | ||
if file_path.endswith("/"): | ||
contents = list_objects(bucket_name, file_path) | ||
result = [] | ||
for item in contents: | ||
item_dict = { | ||
"name": item["Key"].split("/")[-1], | ||
"last_modified": item["LastModified"], | ||
"is_folder": item["Key"].endswith("/"), | ||
} | ||
result.append(item_dict) | ||
return JSONResponse(content=jsonable_encoder(result)) | ||
|
||
# If it's a file, return the S3 download link | ||
else: | ||
presigned_url = s3.generate_presigned_url( | ||
"get_object", | ||
Params={"Bucket": bucket_name, "Key": file_path}, | ||
ExpiresIn=3600, # URL expires in 1 hour | ||
) | ||
return JSONResponse(content=jsonable_encoder({"download_link": presigned_url})) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
# <[email protected]> | ||
"""Page contains Main core logic of app""" | ||
import concurrent.futures | ||
import json | ||
import os | ||
import pathlib | ||
import re | ||
|
@@ -27,9 +28,7 @@ | |
import time | ||
import uuid | ||
from collections import namedtuple | ||
from datetime import datetime | ||
from datetime import datetime as dt | ||
from datetime import timezone | ||
from datetime import datetime, timedelta, timezone | ||
from json import dumps | ||
from json import loads as json_loads | ||
|
||
|
@@ -1307,7 +1306,7 @@ def file_to_zip(self, working_dir, zip_path): | |
) | ||
for file_path in pathlib.Path(working_dir).iterdir(): | ||
zf.write(file_path, arcname=file_path.name) | ||
utc_now = dt.now(timezone.utc) | ||
utc_now = datetime.now(timezone.utc) | ||
utc_offset = utc_now.strftime("%z") | ||
# Adding metadata readme.txt | ||
readme_content = f"Exported Timestamp (UTC{utc_offset}): {utc_now.strftime('%Y-%m-%d %H:%M:%S')}\n" | ||
|
@@ -1419,6 +1418,7 @@ def process_category(self, category): | |
- List of resource dictionaries containing export information. | ||
""" | ||
category_name, category_data = list(category.items())[0] | ||
logging.info(f"Started Processing {category_name}") | ||
all_uploaded_resources = [] | ||
for feature_type in category_data.types: | ||
extract_query = extract_features_duckdb( | ||
|
@@ -1432,6 +1432,7 @@ def process_category(self, category): | |
) | ||
uploaded_resources = self.zip_to_s3(resources) | ||
all_uploaded_resources.extend(uploaded_resources) | ||
logging.info(f"Done Processing {category_name}") | ||
return all_uploaded_resources | ||
|
||
def resource_to_response(self, uploaded_resources, category): | ||
|
@@ -1522,6 +1523,8 @@ def process_hdx_tags(self): | |
Returns: | ||
- Dictionary containing the processed dataset information. | ||
""" | ||
started_at = datetime.now().isoformat() | ||
processing_time_start = time.time() | ||
table_type = [ | ||
cat_type | ||
for category in self.params.categories | ||
|
@@ -1536,7 +1539,13 @@ def process_hdx_tags(self): | |
self.cid, | ||
self.params.geometry, | ||
) | ||
start = time.time() | ||
logging.info("Transfer-> Postgres Data to DuckDB Started") | ||
self.duck_db_instance.run_query(create_table.strip(), attach_pgsql=True) | ||
logging.info( | ||
f"Transfer-> Postgres Data to DuckDB Done in {time.time()-start}s" | ||
) | ||
|
||
CategoryResult = namedtuple( | ||
"CategoryResult", ["category", "uploaded_resources"] | ||
) | ||
|
@@ -1566,7 +1575,7 @@ def process_hdx_tags(self): | |
category=self.params.categories[0], uploaded_resources=resources | ||
) | ||
tag_process_results.append(category_result) | ||
|
||
logging.info("Export generation is done, Moving forward to process result") | ||
with concurrent.futures.ThreadPoolExecutor() as executor: | ||
futures = { | ||
executor.submit(self.process_category_result, result): result | ||
|
@@ -1580,6 +1589,7 @@ def process_hdx_tags(self): | |
|
||
result = {"datasets": dataset_results} | ||
if self.params.meta: | ||
logging.info("Dumping Duck DB to Parquet") | ||
db_dump_path = os.path.join( | ||
self.default_export_path, | ||
"DB_DUMP", | ||
|
@@ -1594,6 +1604,18 @@ def process_hdx_tags(self): | |
) | ||
) | ||
result["db_dump"] = db_zip_download_url | ||
processing_time_close = time.time() | ||
result["elapsed_time"] = humanize.naturaldelta( | ||
timedelta(seconds=(processing_time_close - processing_time_start)) | ||
) | ||
result["started_at"] = started_at | ||
|
||
meta_last_run_dump_path = os.path.join( | ||
self.default_export_path, "meta_last_run.json" | ||
) | ||
with open(meta_last_run_dump_path, "w") as json_file: | ||
json.dump(result, json_file, indent=4) | ||
self.upload_to_s3(resource_path=meta_last_run_dump_path) | ||
self.clean_resources() | ||
return result | ||
|
||
|