From acdd32467b5f36e64a1690e151a107375503743d Mon Sep 17 00:00:00 2001 From: kshitijrajsharma Date: Tue, 26 Dec 2023 14:09:54 +0545 Subject: [PATCH] Enhances how parameter being passed to hdx commands , and tasks endpoints --- API/api_worker.py | 4 +++- API/hdx.py | 13 ++++++---- API/s3.py | 23 +++++++++++++++--- API/tasks.py | 52 ++++++++++++++++++++++++---------------- src/app.py | 19 +++++++-------- src/validation/models.py | 12 ++++++++-- 6 files changed, 82 insertions(+), 41 deletions(-) diff --git a/API/api_worker.py b/API/api_worker.py index 2f5374f2..005f7778 100644 --- a/API/api_worker.py +++ b/API/api_worker.py @@ -33,6 +33,7 @@ celery.conf.result_serializer = "pickle" celery.conf.accept_content = ["application/json", "application/x-python-serialize"] celery.conf.task_track_started = True +celery.conf.update(result_extended=True) @celery.task(bind=True, name="process_raw_data") @@ -194,7 +195,8 @@ def process_raw_data(self, params): @celery.task(bind=True, name="process_hdx_request") def process_hdx_request(self, params): - # params = DynamicCategoriesModel(**params) + params = DynamicCategoriesModel(**params) + if not params.dataset: params.dataset = DatasetConfig() hdx_object = HDX(params) diff --git a/API/hdx.py b/API/hdx.py index 05fa3748..7169fe78 100644 --- a/API/hdx.py +++ b/API/hdx.py @@ -1,4 +1,4 @@ -from fastapi import APIRouter, Body, Depends, Request +from fastapi import APIRouter, Body, Depends, HTTPException, Request from fastapi.responses import JSONResponse from fastapi_versioning import version @@ -7,7 +7,7 @@ from src.validation.models import DynamicCategoriesModel from .api_worker import process_hdx_request -from .auth import AuthUser, staff_required +from .auth import AuthUser, UserRole, staff_required router = APIRouter(prefix="/hdx", tags=["HDX"]) @@ -691,8 +691,13 @@ async def process_hdx_requests( Returns: dict: Result message. """ - queue_name = "raw_special" + queue_name = params.queue + if params.queue != "raw_special" and user.role != UserRole.ADMIN.value: + raise HTTPException( + status_code=403, + detail=[{"msg": "Insufficient Permission to choose queue"}], + ) task = process_hdx_request.apply_async( - args=(params,), queue=queue_name, track_started=True + args=(params.model_dump(),), queue=queue_name, track_started=True ) return JSONResponse({"task_id": task.id, "track_link": f"/tasks/status/{task.id}/"}) diff --git a/API/s3.py b/API/s3.py index 1c4b8834..bb537b87 100644 --- a/API/s3.py +++ b/API/s3.py @@ -1,3 +1,4 @@ +import json from urllib.parse import quote import boto3 @@ -10,7 +11,7 @@ from src.config import AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, BUCKET_NAME from src.config import LIMITER as limiter -from src.config import POLYGON_STATISTICS_API_RATE_LIMIT +from src.config import RATE_LIMIT_PER_MIN router = APIRouter(prefix="/s3", tags=["S3"]) @@ -27,7 +28,7 @@ @router.get("/files/") -@limiter.limit(f"{POLYGON_STATISTICS_API_RATE_LIMIT}/minute") +@limiter.limit(f"{RATE_LIMIT_PER_MIN}/minute") @version(1) def list_s3_files( request: Request, @@ -89,7 +90,7 @@ def list_s3_files( @router.get("/get/{file_path:path}") -@limiter.limit(f"{POLYGON_STATISTICS_API_RATE_LIMIT}/minute") +@limiter.limit(f"{RATE_LIMIT_PER_MIN}/minute") @version(1) def get_s3_file( request: Request, @@ -100,6 +101,10 @@ def get_s3_file( gt=60 * 10, le=3600 * 12 * 7, ), + read_meta: bool = Query( + default=True, + description="Whether to read and deliver the content of .json file", + ), ): bucket_name = BUCKET_NAME file_path = file_path.strip("/") @@ -116,6 +121,18 @@ def get_s3_file( status_code=404, detail=f"File or folder not found: {file_path}" ) + if read_meta and file_path.lower().endswith(".json"): + # Read and deliver the content of meta.json + try: + response = s3.get_object(Bucket=bucket_name, Key=file_path) + content = json.loads(response["Body"].read()) + return JSONResponse(content=jsonable_encoder(content)) + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Error reading meta.json: {str(e)}" + ) + + # If not reading meta.json, generate a presigned URL presigned_url = s3.generate_presigned_url( "get_object", Params={"Bucket": bucket_name, "Key": file_path}, diff --git a/API/tasks.py b/API/tasks.py index f66f6f3c..bc9de65a 100644 --- a/API/tasks.py +++ b/API/tasks.py @@ -1,7 +1,5 @@ +import html import json -import re -from datetime import datetime -from typing import List import redis from celery.result import AsyncResult @@ -24,7 +22,13 @@ @router.get("/status/{task_id}/", response_model=SnapshotTaskResponse) @version(1) -def get_task_status(task_id): +def get_task_status( + task_id, + args: bool = Query( + default=False, + description="Displays argument of task as well", + ), +): """Tracks the request from the task id provided by Raw Data API for the request Args: @@ -41,11 +45,19 @@ def get_task_status(task_id): """ task_result = AsyncResult(task_id, app=celery) + task_response_result = None + if task_result.status == "SUCCESS": + task_response_result = task_result.result + if task_result.status == "FAILED": + task_response_result = html.escape(task_result.traceback) + result = { "id": task_id, "status": task_result.state, - "result": task_result.result if task_result.status == "SUCCESS" else None, + "result": task_response_result, } + if args: + result["args"] = task_result.args return JSONResponse(result) @@ -66,7 +78,7 @@ def revoke_task(task_id, user: AuthUser = Depends(staff_required)): @router.get("/inspect/") @version(1) -def inspect_workers( +async def inspect_workers( request: Request, summary: bool = Query( default=True, @@ -83,19 +95,20 @@ def inspect_workers( active_tasks_summary = [] if summary: - for worker, tasks in active_tasks.items(): - for task in tasks: - temp_task = {} - temp_task["id"] = task["id"] - temp_task["name"] = task["name"] - temp_task["time_start"] = ( - datetime.fromtimestamp(task["time_start"]).strftime( - "%Y-%m-%d %H:%M:%S" + if active_tasks: + for worker, tasks in active_tasks.items(): + for task in tasks: + temp_task = {} + temp_task["id"] = task["id"] + temp_task["name"] = task["name"] + temp_task["time_start"] = ( + datetime.fromtimestamp(task["time_start"]).strftime( + "%Y-%m-%d %H:%M:%S" + ) + if task["time_start"] + else None ) - if task["time_start"] - else None - ) - active_tasks_summary.append(temp_task) + active_tasks_summary.append(temp_task) response_data = { "active": active_tasks_summary if summary else active_tasks, @@ -146,7 +159,7 @@ def get_queue_info(): @router.get("/queue/details/{queue_name}/") @version(1) -def get_list_details(queue_name: str): +async def get_list_details(queue_name: str): if queue_name not in queues: raise HTTPException(status_code=404, detail=f"Queue '{queue_name}' not found") @@ -155,7 +168,6 @@ def get_list_details(queue_name: str): # Convert bytes to strings list_items = [item.decode("utf-8") for item in list_items] - # Create a list of dictionaries with item details items_details = [ { "index": index, diff --git a/src/app.py b/src/app.py index 5ea4e72b..efd8f5ee 100644 --- a/src/app.py +++ b/src/app.py @@ -74,7 +74,7 @@ postgres2duckdb_query, raw_currentdata_extraction_query, ) -from src.validation.models import RawDataOutputType +from src.validation.models import EXPORT_TYPE_MAPPING, RawDataOutputType # import instance for pooling if use_connection_pooling: @@ -1202,7 +1202,7 @@ def __init__(self, params): self.default_export_path = os.path.join( export_path, self.uuid, - "HDX", + self.params.dataset.dataset_folder, self.iso3.upper() if self.iso3 else self.params.dataset.dataset_prefix, ) if os.path.exists(self.default_export_path): @@ -1350,6 +1350,7 @@ def query_to_file(self, query, category_name, feature_type, export_formats): start_export_formats_time = time.time() def process_export_format(export_format): + export_format = EXPORT_TYPE_MAPPING.get(export_format) export_format_path = os.path.join(file_export_path, export_format.suffix) os.makedirs(export_format_path, exist_ok=True) logging.info( @@ -1578,7 +1579,7 @@ def process_hdx_tags(self): geometry=self.params.geometry, single_category_where=where_0_category, ) - print(create_table) + logging.debug(create_table) start = time.time() logging.info("Transfer-> Postgres Data to DuckDB Started : %s", table) self.duck_db_instance.run_query(create_table.strip(), attach_pgsql=True) @@ -1653,9 +1654,7 @@ def process_hdx_tags(self): ) result["started_at"] = started_at - meta_last_run_dump_path = os.path.join( - self.default_export_path, "meta_last_run.json" - ) + meta_last_run_dump_path = os.path.join(self.default_export_path, "meta.json") with open(meta_last_run_dump_path, "w") as json_file: json.dump(result, json_file, indent=4) self.upload_to_s3(resource_path=meta_last_run_dump_path) @@ -1756,11 +1755,9 @@ def upload_dataset(self, dump_config_to_s3=False): if self.dataset: dataset_info = {} dt_config_path = os.path.join( - self.category_path, f"{self.dataset['name']}.json" - ) - self.dataset.save_to_json( - os.path.join(self.category_path, f"{self.dataset['name']}.json") + self.category_path, f"{self.dataset['name']}_config.json" ) + self.dataset.save_to_json(dt_config_path) if dump_config_to_s3: s3_upload_name = os.path.relpath( dt_config_path, os.path.join(export_path, self.uuid) @@ -1799,7 +1796,7 @@ def init_dataset(self): "methodology": "Other", "methodology_other": "Volunteered geographic information", "license_id": "hdx-odc-odbl", - "updated_by_script": f'Hotosm OSM Exports ({datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}', + "updated_by_script": f'Hotosm OSM Exports ({datetime.now().strftime("%Y-%m-%dT%H:%M:%S")})', "caveats": self.category_data.hdx.caveats, "private": self.hdx.private, "notes": self.add_notes(), diff --git a/src/validation/models.py b/src/validation/models.py index 0ea0219b..c4df904e 100644 --- a/src/validation/models.py +++ b/src/validation/models.py @@ -410,7 +410,7 @@ def validate_export_types(cls, value): for export_type in value: if export_type not in EXPORT_TYPE_MAPPING: raise ValueError(f"Unsupported export type: {export_type}") - return [EXPORT_TYPE_MAPPING[export_type] for export_type in value] + return value class ExportTypeInfo: @@ -487,6 +487,9 @@ class DatasetConfig(BaseModel): description="Valid dataset locations iso3", example="['npl']", ) + dataset_folder: str = Field( + default="HDX", description="Default base folder for the exports", example="HDX" + ) @validator("update_frequency") def validate_frequency(cls, value): @@ -531,12 +534,17 @@ class DynamicCategoriesModel(BaseModel): dataset: Optional[DatasetConfig] = Field( default=None, description="Dataset Configurations for HDX Upload" ) + queue: Optional[str] = Field( + default="raw_special", + description="Lets you decide which queue you wanna place your task, Requires admin access", + ) meta: bool = Field( default=False, description="Dumps Meta db in parquet format & hdx config json to s3", ) hdx_upload: bool = Field( - default=True, description="Enable/Disable uploading dataset to hdx" + default=False, + description="Enable/Disable uploading dataset to hdx, False by default", ) categories: List[Dict[str, CategoryModel]] = Field(