
Adds size in exports and enhanced query for fastapi
kshitijrajsharma committed Jan 12, 2024
1 parent b561e33 commit 80c91a7
Showing 5 changed files with 70 additions and 57 deletions.
29 changes: 18 additions & 11 deletions API/tasks.py
@@ -20,9 +20,9 @@
@version(1)
def get_task_status(
task_id,
args: bool = Query(
only_args: bool = Query(
default=False,
description="Displays argument of task as well",
description="Fetches arguments of task",
),
):
"""Tracks the request from the task id provided by Raw Data API for the request
@@ -62,6 +62,8 @@ def get_task_status(
"""
task_result = AsyncResult(task_id, app=celery)
if only_args:
return JSONResponse(task_result.args)
task_response_result = None
if task_result.status == "SUCCESS":
task_response_result = task_result.result
@@ -73,8 +75,6 @@ def get_task_status(
"status": task_result.state,
"result": task_response_result,
}
if args:
result["args"] = task_result.args
return JSONResponse(result)


@@ -114,18 +114,19 @@ def inspect_workers(
if summary:
if active_tasks:
for worker, tasks in active_tasks.items():
worker_tasks = {worker: {}}

for task in tasks:
temp_task = {}
temp_task["id"] = task["id"]
temp_task["name"] = task["name"]
temp_task["time_start"] = (
worker_tasks[worker]["id"] = task["id"]
worker_tasks[worker]["task"] = task["name"]
worker_tasks[worker]["time_start"] = (
datetime.fromtimestamp(task["time_start"]).strftime(
"%Y-%m-%d %H:%M:%S"
)
if task["time_start"]
else None
)
active_tasks_summary.append(temp_task)
active_tasks_summary.append(worker_tasks)

response_data = {
"active": active_tasks_summary if summary else active_tasks,
@@ -177,7 +178,13 @@ def get_queue_info():

@router.get("/queue/details/{queue_name}/")
@version(1)
def get_list_details(queue_name: str):
def get_list_details(
queue_name: str,
args: bool = Query(
default=False,
description="Includes arguments of task",
),
):
if queue_name not in queues:
raise HTTPException(status_code=404, detail=f"Queue '{queue_name}' not found")
redis_client = redis.StrictRedis.from_url(CELERY_BROKER_URL)
@@ -191,7 +198,7 @@ def get_list_details(queue_name: str):
{
"index": index,
"id": json.loads(item)["headers"]["id"],
"args": json.loads(item)["headers"]["argsrepr"],
**({"args": json.loads(item)["headers"]["argsrepr"]} if args else {}),
}
for index, item in enumerate(list_items)
]
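
A minimal client sketch (not part of the commit) for the two new query parameters above; the base URL, route prefix, and queue name are assumptions:

```python
# Sketch only: exercising the new `only_args` and `args` query parameters.
# Base URL, route prefix, and queue name are assumptions, not taken from the commit.
import requests

BASE_URL = "http://localhost:8000/v1"  # assumed local Raw Data API deployment

# Return only the stored arguments of a task instead of the full status payload.
task_args = requests.get(
    f"{BASE_URL}/tasks/status/0a1b2c3d-example-task-id/",
    params={"only_args": True},
).json()

# List items in a queue; per-item "args" are included only when requested.
queue_details = requests.get(
    f"{BASE_URL}/queue/details/raw_ondemand/",  # hypothetical queue name
    params={"args": True},
).json()

print(task_args)
print(queue_details)
```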
2 changes: 2 additions & 0 deletions docs/src/installation/configurations.md
@@ -79,6 +79,7 @@ The following are the different configuration options that are accepted.
| `HDX_SOFT_TASK_LIMIT` | `HDX_SOFT_TASK_LIMIT` | `[HDX]` | `18000` | Soft task time limit signal for Celery workers, in seconds. It gently reminds Celery to finish the task and terminate. Defaults to 5 hours. | OPTIONAL |
| `HDX_HARD_TASK_LIMIT` | `HDX_HARD_TASK_LIMIT` | `[HDX]` | `21600` | Hard task time limit signal for Celery workers, in seconds. It immediately kills the Celery task. Defaults to 6 hours. | OPTIONAL |
| `PROCESS_SINGLE_CATEGORY_IN_POSTGRES` | `PROCESS_SINGLE_CATEGORY_IN_POSTGRES` | `[HDX]` | False | Recommended for workers with low memory or CPU. Processes single-category requests (e.g. buildings only, roads only) in Postgres itself, avoiding extraction from DuckDB. | OPTIONAL |
| `PARALLEL_PROCESSING_CATEGORIES` | `PARALLEL_PROCESSING_CATEGORIES` | `[HDX]` | True | Enables parallel processing for multiple categories and export formats. Disable this if you have a single CPU and limited RAM. Enabled by default. | OPTIONAL |

## Which Service uses which settings?

@@ -127,6 +128,7 @@ The following are the different configuration options that are accepted.
| `HDX_SOFT_TASK_LIMIT` | `[HDX]` | No | Yes |
| `HDX_HARD_TASK_LIMIT` | `[HDX]` | No | Yes |
| `PROCESS_SINGLE_CATEGORY_IN_POSTGRES` | `[HDX]` | No | Yes |
| `PARALLEL_PROCESSING_CATEGORIES` | `[HDX]` | No | Yes |
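
Illustrative sketch only: on a single-CPU, low-RAM worker the new option can be switched off in the `[HDX]` section of the service configuration file (or via the environment variable of the same name), for example:

```ini
[HDX]
; sketch: disable parallel processing of categories and export formats
PARALLEL_PROCESSING_CATEGORIES=false
```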



83 changes: 42 additions & 41 deletions src/app.py
@@ -55,7 +55,11 @@
)
from src.config import EXPORT_PATH as export_path
from src.config import INDEX_THRESHOLD as index_threshold
from src.config import POLYGON_STATISTICS_API_URL, PROCESS_SINGLE_CATEGORY_IN_POSTGRES
from src.config import (
PARALLEL_PROCESSING_CATEGORIES,
POLYGON_STATISTICS_API_URL,
PROCESS_SINGLE_CATEGORY_IN_POSTGRES,
)
from src.config import USE_CONNECTION_POOLING as use_connection_pooling
from src.config import USE_S3_TO_UPLOAD, get_db_connection_params, level
from src.config import logger as logging
@@ -1302,10 +1306,9 @@ def zip_to_s3(self, resources):
- List of resource dictionaries with added download URLs.
"""
for resource in resources:
resource["download_url"] = self.upload_to_s3(
resource_path=resource["zip_path"]
)
os.remove(resource["zip_path"])
temp_zip_path = resource["url"]
resource["url"] = self.upload_to_s3(resource_path=temp_zip_path)
os.remove(temp_zip_path)
return resources

def file_to_zip(self, working_dir, zip_path):
@@ -1325,6 +1328,7 @@ def file_to_zip(self, working_dir, zip_path):
compression=zipfile.ZIP_DEFLATED,
chunk_size=zipfile.SOZIP_DEFAULT_CHUNK_SIZE,
)

for file_path in pathlib.Path(working_dir).iterdir():
zf.write(file_path, arcname=file_path.name)
utc_now = datetime.now(timezone.utc)
@@ -1334,6 +1338,8 @@ def file_to_zip(self, working_dir, zip_path):
readme_content += "Exported through Raw-data-api (https://github.com/hotosm/raw-data-api) using OpenStreetMap data.\n"
readme_content += "Learn more about OpenStreetMap and its data usage policy : https://www.openstreetmap.org/about \n"
zf.writestr("Readme.txt", readme_content)
if self.params.geometry:
zf.writestr("clipping_boundary.geojson", self.params.geometry.json())
zf.close()
shutil.rmtree(working_dir)
return zip_path
@@ -1362,6 +1368,7 @@ def process_export_format(export_format):
export_format = EXPORT_TYPE_MAPPING.get(export_format)
export_format_path = os.path.join(file_export_path, export_format.suffix)
os.makedirs(export_format_path, exist_ok=True)
start = time.time()
logging.info(
"Processing %s:%s", category_name.lower(), export_format.suffix
)
@@ -1385,15 +1392,27 @@ def process_export_format(export_format):
self.duck_db_instance.run_query(executable_query.strip(), load_spatial=True)
zip_file_path = os.path.join(file_export_path, f"{export_filename}.zip")
zip_path = self.file_to_zip(export_format_path, zip_file_path)

resource = {}
resource["filename"] = f"{export_filename}.zip"
resource["zip_path"] = zip_path
resource["format_suffix"] = export_format.suffix
resource["format_description"] = export_format.driver_name
logging.info("Done %s:%s", category_name.lower(), export_format.suffix)
resource["name"] = f"{export_filename}.zip"
resource["url"] = zip_path
resource["format"] = export_format.suffix
resource["description"] = export_format.driver_name
resource["size"] = os.path.getsize(zip_path)
resource["last_modifed"] = datetime.now().isoformat()
logging.info(
"Done %s:%s in %s",
category_name.lower(),
export_format.suffix,
humanize.naturaldelta(timedelta(seconds=(time.time() - start))),
)
return resource

if self.parallel_process_state is False and len(export_formats) > 1:
if (
self.parallel_process_state is False
and len(export_formats) > 1
and PARALLEL_PROCESSING_CATEGORIES is True
):
logging.info(
"Using Parallel Processing for %s Export formats", category_name.lower()
)
@@ -1465,7 +1484,10 @@ def process_category(self, category):
geometry=self.params.geometry if self.params.geometry else None,
)
resources = self.query_to_file(
extract_query, category_name, feature_type, category_data.formats
extract_query,
category_name,
feature_type,
list(set(category_data.formats)),
)
uploaded_resources = self.zip_to_s3(resources)
all_uploaded_resources.extend(uploaded_resources)
@@ -1490,20 +1512,7 @@ def resource_to_response(self, uploaded_resources, category):
- Dictionary containing the response information.
"""
category_name, category_data = list(category.items())[0]

dataset_info = {}
resources = []
for resource in uploaded_resources:
resource_meta = {
"name": resource["filename"],
"format": resource["format_suffix"],
"description": resource["format_description"],
"url": resource["download_url"],
}
resource_meta["uploaded_to_hdx"]: False
resources.append(resource_meta)
dataset_info["resources"] = resources
return {category_name: dataset_info}
return {category_name: {"resources": uploaded_resources}}

def resource_to_hdx(self, uploaded_resources, dataset_config, category):
"""
@@ -1518,8 +1527,7 @@ def resource_to_hdx(self, uploaded_resources, dataset_config, category):
- Dictionary containing the HDX upload information.
"""
if any(
item["format_suffix"] in self.HDX_SUPPORTED_FORMATS
for item in uploaded_resources
item["format"] in self.HDX_SUPPORTED_FORMATS for item in uploaded_resources
):
uploader = HDXUploader(
hdx=dataset_config,
@@ -1541,18 +1549,11 @@ def resource_to_hdx(self, uploaded_resources, dataset_config, category):
uploader.init_dataset()
non_hdx_resources = []
for resource in uploaded_resources:
resource_meta = {
"name": resource["filename"],
"format": resource["format_suffix"],
"description": resource["format_description"],
"url": resource["download_url"],
"last_modifed": datetime.now().isoformat(),
}
if resource["format_suffix"] in self.HDX_SUPPORTED_FORMATS:
uploader.add_resource(resource_meta)
if resource["format"] in self.HDX_SUPPORTED_FORMATS:
uploader.add_resource(resource)
resource["uploaded_to_hdx"] = True
else:
resource_meta["uploaded_to_hdx"]: False
non_hdx_resources.append(resource_meta)
non_hdx_resources.append(resource)
category_name, hdx_dataset_info = uploader.upload_dataset(self.params.meta)
hdx_dataset_info["resources"].extend(non_hdx_resources)
return {category_name: hdx_dataset_info}
@@ -1606,7 +1607,7 @@ def process_hdx_tags(self):
logging.info("Transfer-> Postgres Data to DuckDB Started : %s", table)
self.duck_db_instance.run_query(create_table.strip(), attach_pgsql=True)
logging.info(
"Transfer-> Postgres Data to DuckDB : %s Done in %s s",
"Transfer-> Postgres Data to DuckDB : %s Done in %s",
table,
humanize.naturaldelta(timedelta(seconds=(time.time() - start))),
)
@@ -1617,7 +1618,7 @@ def process_hdx_tags(self):

tag_process_results = []
dataset_results = []
if len(self.params.categories) > 1:
if len(self.params.categories) > 1 and PARALLEL_PROCESSING_CATEGORIES is True:
self.parallel_process_state = True
logging.info("Starting to Use Parallel Processes")
with concurrent.futures.ThreadPoolExecutor(
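
For orientation, a hedged sketch (mirroring the diff above, not a verbatim excerpt) of the per-export resource dictionary this commit now builds, including the new `size` field, before `zip_to_s3()` replaces the local path stored under `url` with the uploaded download URL:

```python
# Sketch only: shape of the resource metadata produced per export format.
import os
from datetime import datetime


def build_resource(zip_path: str, export_filename: str, suffix: str, driver_name: str) -> dict:
    return {
        "name": f"{export_filename}.zip",
        "url": zip_path,  # local zip path; rewritten to the S3 URL after upload
        "format": suffix,  # e.g. "gpkg", "shp"
        "description": driver_name,
        "size": os.path.getsize(zip_path),  # new in this commit: zip size in bytes
        "last_modifed": datetime.now().isoformat(),  # key name as it appears in the commit
    }
```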
5 changes: 5 additions & 0 deletions src/config.py
@@ -206,6 +206,11 @@ def not_raises(func, *args, **kwargs):
"PROCESS_SINGLE_CATEGORY_IN_POSTGRES"
) or config.getboolean("HDX", "PROCESS_SINGLE_CATEGORY_IN_POSTGRES", fallback=False)

PARALLEL_PROCESSING_CATEGORIES = os.environ.get(
"PARALLEL_PROCESSING_CATEGORIES"
) or config.getboolean("HDX", "PARALLEL_PROCESSING_CATEGORIES", fallback=True)


if ENABLE_HDX_EXPORTS:
HDX_SITE = os.environ.get("HDX_SITE") or config.getboolean(
"HDX", "HDX_SITE", fallback="demo"
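
A small sketch of how the resolved flag is consumed (per the src/app.py hunks above): parallel execution is attempted only when more than one category or export format is requested and the flag resolved to `True`. The worker count and helper function below are illustrative assumptions.

```python
# Sketch only: gating logic for parallel category/format processing.
from concurrent.futures import ThreadPoolExecutor

PARALLEL_PROCESSING_CATEGORIES = True  # value as resolved by src/config.py


def run_categories(categories, process_one):
    if len(categories) > 1 and PARALLEL_PROCESSING_CATEGORIES is True:
        with ThreadPoolExecutor() as executor:  # worker count: an assumption here
            return list(executor.map(process_one, categories))
    # Single category, or parallelism disabled: process sequentially.
    return [process_one(category) for category in categories]


print(run_categories(["buildings", "roads"], str.upper))
```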
8 changes: 3 additions & 5 deletions src/query_builder/builder.py
@@ -20,7 +20,7 @@
import re
from json import dumps, loads

from geomet import wkt
from geomet import wkb, wkt

from src.config import logger as logging
from src.validation.models import SupportedFilters, SupportedGeometryFilters
@@ -901,7 +901,7 @@ def convert_tags_pattern(query_string):
if cid
else f"""ST_within(geom,(select ST_SetSRID(ST_Extent(ST_makeValid(ST_GeomFromText('{wkt.dumps(loads(geometry.json()),decimals=6)}',4326))),4326)))"""
)

postgres_query = f"""select {select_query} from (select * , tableoid::regclass as osm_type from {table} where {row_filter_condition}) as sub_query"""
if single_category_where:
postgres_query += f" where {convert_tags_pattern(single_category_where)}"
@@ -952,9 +952,7 @@ def extract_features_duckdb(base_table_name, select, feature_type, where, geometry):
for table in from_query:
where_query = map_tables[feature_type]["where"][table]
if geometry:
where_query += (
f" and (ST_Within(geometry,ST_GeomFromGeoJSON('{geometry.json()}')))"
)
where_query += f" and (ST_Intersects_Extent(geometry,ST_GeomFromGeoJSON('{geometry.json()}')))"
query = f"""select {select_query} from {f"{base_table_name}_{table}"} where {where_query}"""
base_query.append(query)
return " UNION ALL ".join(base_query)
