
Commit

Enhances how parameters are passed to hdx commands and tasks endpoints
kshitijrajsharma committed Dec 26, 2023
1 parent 88c4c23 commit acdd324
Showing 6 changed files with 82 additions and 41 deletions.
4 changes: 3 additions & 1 deletion API/api_worker.py
@@ -33,6 +33,7 @@
celery.conf.result_serializer = "pickle"
celery.conf.accept_content = ["application/json", "application/x-python-serialize"]
celery.conf.task_track_started = True
celery.conf.update(result_extended=True)


@celery.task(bind=True, name="process_raw_data")
@@ -194,7 +195,8 @@ def process_raw_data(self, params):

@celery.task(bind=True, name="process_hdx_request")
def process_hdx_request(self, params):
# params = DynamicCategoriesModel(**params)
params = DynamicCategoriesModel(**params)

if not params.dataset:
params.dataset = DatasetConfig()
hdx_object = HDX(params)
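The two worker-side changes above fit together: `result_extended=True` makes Celery keep the task's call arguments in the result backend (surfaced later by the `/tasks/status/` endpoint), and `process_hdx_request` now re-validates the plain dict it receives back into `DynamicCategoriesModel`, since the API layer sends `params.model_dump()` rather than the Pydantic object. A minimal sketch of that round trip, with hypothetical broker/backend URLs and a trimmed stand-in model:

```python
from typing import Optional

from celery import Celery
from pydantic import BaseModel


class DynamicCategoriesModel(BaseModel):
    """Trimmed stand-in for src.validation.models.DynamicCategoriesModel."""

    iso3: Optional[str] = None
    queue: str = "raw_special"
    hdx_upload: bool = False


celery = Celery("app", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")
celery.conf.update(result_extended=True)  # keep args/kwargs alongside the task result


@celery.task(bind=True, name="process_hdx_request")
def process_hdx_request(self, params):
    params = DynamicCategoriesModel(**params)  # re-validate the plain dict sent by the API
    return {"iso3": params.iso3, "queue": params.queue}


# Caller side (what API/hdx.py now does): serialize to a dict before queuing.
# task = process_hdx_request.apply_async(args=(model.model_dump(),), queue="raw_special")
```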
13 changes: 9 additions & 4 deletions API/hdx.py
@@ -1,4 +1,4 @@
from fastapi import APIRouter, Body, Depends, Request
from fastapi import APIRouter, Body, Depends, HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi_versioning import version

@@ -7,7 +7,7 @@
from src.validation.models import DynamicCategoriesModel

from .api_worker import process_hdx_request
from .auth import AuthUser, staff_required
from .auth import AuthUser, UserRole, staff_required

router = APIRouter(prefix="/hdx", tags=["HDX"])

@@ -691,8 +691,13 @@ async def process_hdx_requests(
Returns:
dict: Result message.
"""
queue_name = "raw_special"
queue_name = params.queue
if params.queue != "raw_special" and user.role != UserRole.ADMIN.value:
raise HTTPException(
status_code=403,
detail=[{"msg": "Insufficient Permission to choose queue"}],
)
task = process_hdx_request.apply_async(
args=(params,), queue=queue_name, track_started=True
args=(params.model_dump(),), queue=queue_name, track_started=True
)
return JSONResponse({"task_id": task.id, "track_link": f"/tasks/status/{task.id}/"})
23 changes: 20 additions & 3 deletions API/s3.py
@@ -1,3 +1,4 @@
import json
from urllib.parse import quote

import boto3
@@ -10,7 +11,7 @@

from src.config import AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, BUCKET_NAME
from src.config import LIMITER as limiter
from src.config import POLYGON_STATISTICS_API_RATE_LIMIT
from src.config import RATE_LIMIT_PER_MIN

router = APIRouter(prefix="/s3", tags=["S3"])

@@ -27,7 +28,7 @@


@router.get("/files/")
@limiter.limit(f"{POLYGON_STATISTICS_API_RATE_LIMIT}/minute")
@limiter.limit(f"{RATE_LIMIT_PER_MIN}/minute")
@version(1)
def list_s3_files(
request: Request,
@@ -89,7 +90,7 @@ def list_s3_files(


@router.get("/get/{file_path:path}")
@limiter.limit(f"{POLYGON_STATISTICS_API_RATE_LIMIT}/minute")
@limiter.limit(f"{RATE_LIMIT_PER_MIN}/minute")
@version(1)
def get_s3_file(
request: Request,
@@ -100,6 +101,10 @@ def get_s3_file(
gt=60 * 10,
le=3600 * 12 * 7,
),
read_meta: bool = Query(
default=True,
description="Whether to read and deliver the content of .json file",
),
):
bucket_name = BUCKET_NAME
file_path = file_path.strip("/")
@@ -116,6 +121,18 @@
status_code=404, detail=f"File or folder not found: {file_path}"
)

if read_meta and file_path.lower().endswith(".json"):
# Read and deliver the content of meta.json
try:
response = s3.get_object(Bucket=bucket_name, Key=file_path)
content = json.loads(response["Body"].read())
return JSONResponse(content=jsonable_encoder(content))
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Error reading meta.json: {str(e)}"
)

# If not reading meta.json, generate a presigned URL
presigned_url = s3.generate_presigned_url(
"get_object",
Params={"Bucket": bucket_name, "Key": file_path},
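`get_s3_file` now branches on the new `read_meta` flag: `.json` objects (such as the `meta.json` written by the HDX export, see `src/app.py` below) are fetched and returned inline, while everything else still gets a presigned URL. The same control flow in isolation, as a sketch against boto3 with placeholder bucket and key names:

```python
import json

import boto3
from botocore.exceptions import ClientError

s3 = boto3.client("s3")


def fetch_or_presign(bucket: str, key: str, read_meta: bool = True, expiry: int = 3600):
    """Return parsed JSON for .json metadata objects, otherwise a presigned URL."""
    if read_meta and key.lower().endswith(".json"):
        try:
            body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
            return json.loads(body)  # delivered inline by the API as a JSONResponse
        except ClientError as err:
            raise RuntimeError(f"Error reading {key}: {err}") from err
    return s3.generate_presigned_url(
        "get_object", Params={"Bucket": bucket, "Key": key}, ExpiresIn=expiry
    )


# fetch_or_presign("my-exports-bucket", "HDX/NPL/meta.json")       -> dict
# fetch_or_presign("my-exports-bucket", "HDX/NPL/roads.gpkg.zip")  -> signed URL string
```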
52 changes: 32 additions & 20 deletions API/tasks.py
@@ -1,7 +1,5 @@
import html
import json
import re
from datetime import datetime
from typing import List

import redis
from celery.result import AsyncResult
@@ -24,7 +22,13 @@

@router.get("/status/{task_id}/", response_model=SnapshotTaskResponse)
@version(1)
def get_task_status(task_id):
def get_task_status(
task_id,
args: bool = Query(
default=False,
description="Displays argument of task as well",
),
):
"""Tracks the request from the task id provided by Raw Data API for the request
Args:
@@ -41,11 +45,19 @@ def get_task_status(task_id):
"""
task_result = AsyncResult(task_id, app=celery)
task_response_result = None
if task_result.status == "SUCCESS":
task_response_result = task_result.result
if task_result.status == "FAILED":
task_response_result = html.escape(task_result.traceback)

result = {
"id": task_id,
"status": task_result.state,
"result": task_result.result if task_result.status == "SUCCESS" else None,
"result": task_response_result,
}
if args:
result["args"] = task_result.args
return JSONResponse(result)
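Because the worker now runs with `result_extended=True`, the result backend also keeps the original call arguments, which is what the new `args` query flag surfaces; on failure the response is meant to carry an HTML-escaped traceback instead of `None`. A small polling sketch, with a placeholder base URL and task id:

```python
import time

import requests

BASE = "http://localhost:8000/v1"          # illustrative deployment URL
task_id = "replace-with-your-task-id"

while True:
    status = requests.get(f"{BASE}/tasks/status/{task_id}/", params={"args": "true"}).json()
    print(status["status"], status.get("args"))   # args echoed only when ?args=true
    if status["status"] in ("SUCCESS", "FAILURE", "REVOKED"):
        print(status["result"])                   # result payload once the task is terminal
        break
    time.sleep(5)
```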


@@ -66,7 +78,7 @@ def revoke_task(task_id, user: AuthUser = Depends(staff_required)):

@router.get("/inspect/")
@version(1)
def inspect_workers(
async def inspect_workers(
request: Request,
summary: bool = Query(
default=True,
@@ -83,19 +95,20 @@ def inspect_workers(
active_tasks_summary = []

if summary:
for worker, tasks in active_tasks.items():
for task in tasks:
temp_task = {}
temp_task["id"] = task["id"]
temp_task["name"] = task["name"]
temp_task["time_start"] = (
datetime.fromtimestamp(task["time_start"]).strftime(
"%Y-%m-%d %H:%M:%S"
if active_tasks:
for worker, tasks in active_tasks.items():
for task in tasks:
temp_task = {}
temp_task["id"] = task["id"]
temp_task["name"] = task["name"]
temp_task["time_start"] = (
datetime.fromtimestamp(task["time_start"]).strftime(
"%Y-%m-%d %H:%M:%S"
)
if task["time_start"]
else None
)
if task["time_start"]
else None
)
active_tasks_summary.append(temp_task)
active_tasks_summary.append(temp_task)

response_data = {
"active": active_tasks_summary if summary else active_tasks,
@@ -146,7 +159,7 @@ def get_queue_info():

@router.get("/queue/details/{queue_name}/")
@version(1)
def get_list_details(queue_name: str):
async def get_list_details(queue_name: str):
if queue_name not in queues:
raise HTTPException(status_code=404, detail=f"Queue '{queue_name}' not found")

@@ -155,7 +168,6 @@ def get_list_details(queue_name: str):
# Convert bytes to strings
list_items = [item.decode("utf-8") for item in list_items]

# Create a list of dictionaries with item details
items_details = [
{
"index": index,
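The new `if active_tasks:` guard in `inspect_workers` matters because Celery's remote inspection returns `None` when no worker answers, so iterating over it unconditionally would raise. A standalone sketch of that defensive pattern, assuming a Redis broker URL:

```python
from datetime import datetime

from celery import Celery

celery = Celery("app", broker="redis://localhost:6379/0")

active_tasks = celery.control.inspect(timeout=2).active()  # dict of worker -> tasks, or None
summary = []
if active_tasks:
    for worker, tasks in active_tasks.items():
        for task in tasks:
            summary.append(
                {
                    "id": task["id"],
                    "name": task["name"],
                    "time_start": datetime.fromtimestamp(task["time_start"]).strftime(
                        "%Y-%m-%d %H:%M:%S"
                    )
                    if task["time_start"]
                    else None,
                }
            )
print(summary)  # [] when no workers are online, instead of an AttributeError
```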
19 changes: 8 additions & 11 deletions src/app.py
@@ -74,7 +74,7 @@
postgres2duckdb_query,
raw_currentdata_extraction_query,
)
from src.validation.models import RawDataOutputType
from src.validation.models import EXPORT_TYPE_MAPPING, RawDataOutputType

# import instance for pooling
if use_connection_pooling:
@@ -1202,7 +1202,7 @@ def __init__(self, params):
self.default_export_path = os.path.join(
export_path,
self.uuid,
"HDX",
self.params.dataset.dataset_folder,
self.iso3.upper() if self.iso3 else self.params.dataset.dataset_prefix,
)
if os.path.exists(self.default_export_path):
@@ -1350,6 +1350,7 @@ def query_to_file(self, query, category_name, feature_type, export_formats):
start_export_formats_time = time.time()

def process_export_format(export_format):
export_format = EXPORT_TYPE_MAPPING.get(export_format)
export_format_path = os.path.join(file_export_path, export_format.suffix)
os.makedirs(export_format_path, exist_ok=True)
logging.info(
@@ -1578,7 +1579,7 @@ def process_hdx_tags(self):
geometry=self.params.geometry,
single_category_where=where_0_category,
)
print(create_table)
logging.debug(create_table)
start = time.time()
logging.info("Transfer-> Postgres Data to DuckDB Started : %s", table)
self.duck_db_instance.run_query(create_table.strip(), attach_pgsql=True)
@@ -1653,9 +1654,7 @@ def process_hdx_tags(self):
)
result["started_at"] = started_at

meta_last_run_dump_path = os.path.join(
self.default_export_path, "meta_last_run.json"
)
meta_last_run_dump_path = os.path.join(self.default_export_path, "meta.json")
with open(meta_last_run_dump_path, "w") as json_file:
json.dump(result, json_file, indent=4)
self.upload_to_s3(resource_path=meta_last_run_dump_path)
@@ -1756,11 +1755,9 @@ def upload_dataset(self, dump_config_to_s3=False):
if self.dataset:
dataset_info = {}
dt_config_path = os.path.join(
self.category_path, f"{self.dataset['name']}.json"
)
self.dataset.save_to_json(
os.path.join(self.category_path, f"{self.dataset['name']}.json")
self.category_path, f"{self.dataset['name']}_config.json"
)
self.dataset.save_to_json(dt_config_path)
if dump_config_to_s3:
s3_upload_name = os.path.relpath(
dt_config_path, os.path.join(export_path, self.uuid)
@@ -1799,7 +1796,7 @@ def init_dataset(self):
"methodology": "Other",
"methodology_other": "Volunteered geographic information",
"license_id": "hdx-odc-odbl",
"updated_by_script": f'Hotosm OSM Exports ({datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}',
"updated_by_script": f'Hotosm OSM Exports ({datetime.now().strftime("%Y-%m-%dT%H:%M:%S")})',
"caveats": self.category_data.hdx.caveats,
"private": self.hdx.private,
"notes": self.add_notes(),
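Most of the `src/app.py` edits follow from the serialization change: the validator in `src/validation/models.py` now keeps export types as plain strings (so the model survives `model_dump()` on its way through Celery), and the lookup into `EXPORT_TYPE_MAPPING` happens only inside `process_export_format`. A condensed sketch of that validate-early, resolve-late pattern, assuming Pydantic v2 and stand-in mapping entries:

```python
from typing import List, NamedTuple

from pydantic import BaseModel, field_validator


class ExportTypeInfo(NamedTuple):  # stand-in for the project's ExportTypeInfo
    suffix: str
    driver_name: str


EXPORT_TYPE_MAPPING = {
    "geojson": ExportTypeInfo("geojson", "GeoJSON"),
    "gpkg": ExportTypeInfo("gpkg", "GPKG"),
}


class CategoryModel(BaseModel):
    formats: List[str]

    @field_validator("formats")
    @classmethod
    def validate_export_types(cls, value):
        # Validate only; keep plain strings so model_dump()/pickling stay trivial.
        for export_type in value:
            if export_type not in EXPORT_TYPE_MAPPING:
                raise ValueError(f"Unsupported export type: {export_type}")
        return value


def process_export_format(export_format: str) -> str:
    export_format = EXPORT_TYPE_MAPPING.get(export_format)  # resolve at processing time
    return export_format.suffix


params = CategoryModel(formats=["geojson", "gpkg"])
print([process_export_format(fmt) for fmt in params.model_dump()["formats"]])
```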
12 changes: 10 additions & 2 deletions src/validation/models.py
@@ -410,7 +410,7 @@ def validate_export_types(cls, value):
for export_type in value:
if export_type not in EXPORT_TYPE_MAPPING:
raise ValueError(f"Unsupported export type: {export_type}")
return [EXPORT_TYPE_MAPPING[export_type] for export_type in value]
return value


class ExportTypeInfo:
@@ -487,6 +487,9 @@ class DatasetConfig(BaseModel):
description="Valid dataset locations iso3",
example="['npl']",
)
dataset_folder: str = Field(
default="HDX", description="Default base folder for the exports", example="HDX"
)

@validator("update_frequency")
def validate_frequency(cls, value):
@@ -531,12 +534,17 @@ class DynamicCategoriesModel(BaseModel):
dataset: Optional[DatasetConfig] = Field(
default=None, description="Dataset Configurations for HDX Upload"
)
queue: Optional[str] = Field(
default="raw_special",
description="Lets you decide which queue you wanna place your task, Requires admin access",
)
meta: bool = Field(
default=False,
description="Dumps Meta db in parquet format & hdx config json to s3",
)
hdx_upload: bool = Field(
default=True, description="Enable/Disable uploading dataset to hdx"
default=False,
description="Enable/Disable uploading dataset to hdx, False by default",
)

categories: List[Dict[str, CategoryModel]] = Field(
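Putting the model changes together: `DynamicCategoriesModel` gains an admin-gated `queue` field, `hdx_upload` now defaults to `False`, and `DatasetConfig` gets a `dataset_folder` that replaces the hard-coded "HDX" base folder in the export path. An illustrative request body reflecting the new defaults; field values are examples only, and other required fields (geometry, category definitions) are elided for brevity:

```python
# Illustrative payload for the /hdx/ endpoint after this commit (values are examples).
payload = {
    "iso3": "npl",
    "queue": "raw_special",        # new: non-default queues require an admin account
    "hdx_upload": False,           # new default: nothing is pushed to HDX unless enabled
    "meta": True,                  # dump meta db + hdx config json to S3
    "dataset": {
        "dataset_prefix": "hotosm_npl",
        "dataset_folder": "HDX",   # new: base folder for exports, defaults to "HDX"
    },
    "categories": [],              # category definitions omitted for brevity
}
```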
