From 8361a18ec76e506f22f638e27a111bc4d60da70d Mon Sep 17 00:00:00 2001
From: kshitijrajsharma
Date: Sat, 23 Dec 2023 11:55:05 +0545
Subject: [PATCH] Adds s3 api link for listing dir and downloading files

---
 API/main.py  |   6 +++
 API/s3.py    | 132 +++++++++++++++++++++++++++++++++++++++++++++++++++
 API/tasks.py |   3 --
 src/app.py   |  32 +++++++++++--
 4 files changed, 165 insertions(+), 8 deletions(-)
 create mode 100644 API/s3.py

diff --git a/API/main.py b/API/main.py
index b1323244..3da1400d 100644
--- a/API/main.py
+++ b/API/main.py
@@ -44,6 +44,9 @@
 from .raw_data import router as raw_data_router
 from .tasks import router as tasks_router
 
+if USE_S3_TO_UPLOAD:
+    from .s3 import router as s3_router
+
 if ENABLE_POLYGON_STATISTICS_ENDPOINTS:
     from .stats import router as stats_router
 
@@ -76,6 +79,9 @@
 if ENABLE_POLYGON_STATISTICS_ENDPOINTS:
     app.include_router(stats_router)
 
+if USE_S3_TO_UPLOAD:
+    app.include_router(s3_router)
+
 app.openapi = {
     "info": {
         "title": "Raw Data API",
diff --git a/API/s3.py b/API/s3.py
new file mode 100644
index 00000000..cc84fae6
--- /dev/null
+++ b/API/s3.py
@@ -0,0 +1,132 @@
+from urllib.parse import quote
+
+import boto3
+import humanize
+from botocore.exceptions import NoCredentialsError
+from fastapi import APIRouter, HTTPException, Path, Query, Request
+from fastapi.encoders import jsonable_encoder
+from fastapi.responses import JSONResponse
+from fastapi_versioning import version
+
+from src.config import AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, BUCKET_NAME
+from src.config import LIMITER as limiter
+from src.config import POLYGON_STATISTICS_API_RATE_LIMIT
+
+router = APIRouter(prefix="/s3", tags=["S3"])
+
+
+AWS_REGION = "us-east-1"
+
+s3 = boto3.client(
+    "s3",
+    aws_access_key_id=AWS_ACCESS_KEY_ID,
+    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
+    region_name=AWS_REGION,
+)
+paginator = s3.get_paginator("list_objects_v2")
+
+
+@router.get("/files/")
+@limiter.limit(f"{POLYGON_STATISTICS_API_RATE_LIMIT}/minute")
+@version(1)
+def list_s3_files(
+    request: Request,
+    folder: str = Query(default="/HDX"),
+    page: int = Query(default=1, ge=1, description="Page number"),
+    page_size: int = Query(default=10, le=100, description="Items per page"),
+    prettify: bool = Query(
+        default=False, description="Display size & date in human-readable format"
+    ),
+):
+    bucket_name = BUCKET_NAME
+    folder = folder.strip("/")
+    prefix = f"{folder}/"
+
+    result = []
+
+    try:
+        # Create a paginator for list_objects_v2
+        page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix)
+
+        # Paginate the results
+        start_index = (page - 1) * page_size
+        end_index = start_index + page_size
+
+        for response in page_iterator:
+            contents = response.get("Contents", [])
+
+            for item in contents:
+                size = item["Size"]
+                last_modified = item["LastModified"].strftime("%Y-%m-%dT%H:%M:%SZ")
+                if prettify:
+                    last_modified = humanize.naturaldate(item["LastModified"])
+                    size = humanize.naturalsize(size)
+
+                item_dict = {
+                    "Key": item["Key"],
+                    "LastModified": last_modified,
+                    "Size": size,
+                }
+                result.append(item_dict)
+
+        paginated_result = result[start_index:end_index]
+
+        # Calculate total number of pages
+        total_pages = -(-len(result) // page_size)  # Ceiling division
+
+        # Include pagination information in the response
+        response_data = {
+            "current_page": page,
+            "total_pages": total_pages,
+            "items_per_page": page_size,
+            "items": paginated_result,
+        }
+
+        return JSONResponse(content=jsonable_encoder(response_data))
+
+    except NoCredentialsError:
+        raise HTTPException(status_code=500, detail="AWS credentials not available")
+
+
+@router.get("/s3-files/{file_path:path}")
+@limiter.limit(f"{POLYGON_STATISTICS_API_RATE_LIMIT}/minute")
+@version(1)
+def get_s3_file(
+    request: Request,
+    file_path: str = Path(..., description="The path to the file or folder in S3"),
+):
+    bucket_name = BUCKET_NAME
+    file_path = file_path.strip("/")
+    encoded_file_path = quote(file_path)
+    try:
+        # Check if the file or folder exists
+        s3.head_object(Bucket=bucket_name, Key=encoded_file_path)
+    except NoCredentialsError:
+        raise HTTPException(status_code=500, detail="AWS credentials not available")
+    except Exception as e:
+        print(e)
+        raise HTTPException(
+            status_code=404, detail=f"File or folder not found: {file_path}"
+        )
+
+    # If it's a folder, list its contents
+    if file_path.endswith("/"):
+        contents = list_objects(bucket_name, file_path)
+        result = []
+        for item in contents:
+            item_dict = {
+                "name": item["Key"].split("/")[-1],
+                "last_modified": item["LastModified"],
+                "is_folder": item["Key"].endswith("/"),
+            }
+            result.append(item_dict)
+        return JSONResponse(content=jsonable_encoder(result))
+
+    # If it's a file, return the S3 download link
+    else:
+        presigned_url = s3.generate_presigned_url(
+            "get_object",
+            Params={"Bucket": bucket_name, "Key": file_path},
+            ExpiresIn=3600,  # URL expires in 1 hour
+        )
+        return JSONResponse(content=jsonable_encoder({"download_link": presigned_url}))
diff --git a/API/tasks.py b/API/tasks.py
index f78f828d..a93b0dc6 100644
--- a/API/tasks.py
+++ b/API/tasks.py
@@ -37,9 +37,6 @@ def get_task_status(task_id):
         "id": task_id,
         "status": task_result.state,
         "result": task_result.result if task_result.status == "SUCCESS" else None,
-        "finished_at": task_result.date_done.strftime("%Y-%m-%dT%H:%M:%S")
-        if task_result.date_done
-        else None,
     }
     return JSONResponse(result)
diff --git a/src/app.py b/src/app.py
index c0e27cd2..61c02401 100644
--- a/src/app.py
+++ b/src/app.py
@@ -18,6 +18,7 @@ #
 """Page contains Main core logic of app"""
 import concurrent.futures
+import json
 import os
 import pathlib
 import re
@@ -27,9 +28,7 @@
 import time
 import uuid
 from collections import namedtuple
-from datetime import datetime
-from datetime import datetime as dt
-from datetime import timezone
+from datetime import datetime, timedelta, timezone
 from json import dumps
 from json import loads as json_loads
@@ -1307,7 +1306,7 @@ def file_to_zip(self, working_dir, zip_path):
         )
         for file_path in pathlib.Path(working_dir).iterdir():
             zf.write(file_path, arcname=file_path.name)
-        utc_now = dt.now(timezone.utc)
+        utc_now = datetime.now(timezone.utc)
         utc_offset = utc_now.strftime("%z")
         # Adding metadata readme.txt
         readme_content = f"Exported Timestamp (UTC{utc_offset}): {utc_now.strftime('%Y-%m-%d %H:%M:%S')}\n"
@@ -1419,6 +1418,7 @@ def process_category(self, category):
         - List of resource dictionaries containing export information.
         """
         category_name, category_data = list(category.items())[0]
+        logging.info(f"Started Processing {category_name}")
         all_uploaded_resources = []
         for feature_type in category_data.types:
             extract_query = extract_features_duckdb(
@@ -1432,6 +1432,7 @@
             )
             uploaded_resources = self.zip_to_s3(resources)
             all_uploaded_resources.extend(uploaded_resources)
+        logging.info(f"Done Processing {category_name}")
         return all_uploaded_resources
 
     def resource_to_response(self, uploaded_resources, category):
@@ -1522,6 +1523,8 @@ def process_hdx_tags(self):
         Returns:
         - Dictionary containing the processed dataset information.
         """
+        started_at = datetime.now().isoformat()
+        processing_time_start = time.time()
         table_type = [
             cat_type
             for category in self.params.categories
@@ -1536,7 +1539,13 @@
             self.cid,
             self.params.geometry,
         )
+        start = time.time()
+        logging.info("Transfer-> Postgres Data to DuckDB Started")
         self.duck_db_instance.run_query(create_table.strip(), attach_pgsql=True)
+        logging.info(
+            f"Transfer-> Postgres Data to DuckDB Done in {time.time()-start}s"
+        )
+
         CategoryResult = namedtuple(
             "CategoryResult", ["category", "uploaded_resources"]
         )
@@ -1566,7 +1575,7 @@
                 category=self.params.categories[0], uploaded_resources=resources
             )
             tag_process_results.append(category_result)
-
+        logging.info("Export generation is done, Moving forward to process result")
         with concurrent.futures.ThreadPoolExecutor() as executor:
             futures = {
                 executor.submit(self.process_category_result, result): result
@@ -1580,6 +1589,7 @@
         result = {"datasets": dataset_results}
 
         if self.params.meta:
+            logging.info("Dumping Duck DB to Parquet")
             db_dump_path = os.path.join(
                 self.default_export_path,
                 "DB_DUMP",
@@ -1594,6 +1604,18 @@
                 )
             )
             result["db_dump"] = db_zip_download_url
+        processing_time_close = time.time()
+        result["elapsed_time"] = humanize.naturaldelta(
+            timedelta(seconds=(processing_time_close - processing_time_start))
+        )
+        result["started_at"] = started_at
+
+        meta_last_run_dump_path = os.path.join(
+            self.default_export_path, "meta_last_run.json"
+        )
+        with open(meta_last_run_dump_path, "w") as json_file:
+            json.dump(result, json_file, indent=4)
+        self.upload_to_s3(resource_path=meta_last_run_dump_path)
         self.clean_resources()
         return result