diff --git a/API/tasks.py b/API/tasks.py index 29826800..a2de4133 100644 --- a/API/tasks.py +++ b/API/tasks.py @@ -20,9 +20,9 @@ @version(1) def get_task_status( task_id, - args: bool = Query( + only_args: bool = Query( default=False, - description="Displays argument of task as well", + description="Fetches arguments of task", ), ): """Tracks the request from the task id provided by Raw Data API for the request @@ -62,6 +62,8 @@ def get_task_status( """ task_result = AsyncResult(task_id, app=celery) + if only_args: + return JSONResponse(task_result.args) task_response_result = None if task_result.status == "SUCCESS": task_response_result = task_result.result @@ -73,8 +75,6 @@ def get_task_status( "status": task_result.state, "result": task_response_result, } - if args: - result["args"] = task_result.args return JSONResponse(result) @@ -114,18 +114,19 @@ def inspect_workers( if summary: if active_tasks: for worker, tasks in active_tasks.items(): + worker_tasks = {worker: {}} + for task in tasks: - temp_task = {} - temp_task["id"] = task["id"] - temp_task["name"] = task["name"] - temp_task["time_start"] = ( + worker_tasks[worker]["id"] = task["id"] + worker_tasks[worker]["task"] = task["name"] + worker_tasks[worker]["time_start"] = ( datetime.fromtimestamp(task["time_start"]).strftime( "%Y-%m-%d %H:%M:%S" ) if task["time_start"] else None ) - active_tasks_summary.append(temp_task) + active_tasks_summary.append(worker_tasks) response_data = { "active": active_tasks_summary if summary else active_tasks, @@ -177,7 +178,13 @@ def get_queue_info(): @router.get("/queue/details/{queue_name}/") @version(1) -def get_list_details(queue_name: str): +def get_list_details( + queue_name: str, + args: bool = Query( + default=False, + description="Includes arguments of task", + ), +): if queue_name not in queues: raise HTTPException(status_code=404, detail=f"Queue '{queue_name}' not found") redis_client = redis.StrictRedis.from_url(CELERY_BROKER_URL) @@ -191,7 +198,7 @@ def get_list_details(queue_name: str): { "index": index, "id": json.loads(item)["headers"]["id"], - "args": json.loads(item)["headers"]["argsrepr"], + **({"args": json.loads(item)["headers"]["argsrepr"]} if args else {}), } for index, item in enumerate(list_items) ] diff --git a/docs/src/installation/configurations.md b/docs/src/installation/configurations.md index 2e93b1c5..bbc2317f 100644 --- a/docs/src/installation/configurations.md +++ b/docs/src/installation/configurations.md @@ -79,6 +79,7 @@ The following are the different configuration options that are accepted. | `HDX_SOFT_TASK_LIMIT` | `HDX_SOFT_TASK_LIMIT` | `[HDX]` | `18000` | Soft task time limit signal for celery workers in seconds.It will gently remind celery to finish up the task and terminate, Defaults to 5 Hour| OPTIONAL | | `HDX_HARD_TASK_LIMIT` | `HDX_HARD_TASK_LIMIT` | `[HDX]` | `21600` | Hard task time limit signal for celery workers in seconds. It will immediately kill the celery task.Defaults to 6 Hour| OPTIONAL | | `PROCESS_SINGLE_CATEGORY_IN_POSTGRES` | `PROCESS_SINGLE_CATEGORY_IN_POSTGRES` | `[HDX]` | False | Recommended for workers with low memery or CPU usage , This will process single category request like buildings only , Roads only in postgres itself and avoid extraction from duckdb| OPTIONAL | +| `PARALLEL_PROCESSING_CATEGORIES` | `PARALLEL_PROCESSING_CATEGORIES` | `[HDX]` | True | Enable parallel processing for mulitple categories and export formats , Disable this if you have single cpu and limited RAM , Enabled by default| OPTIONAL | ## Which Service uses which settings? @@ -127,6 +128,7 @@ The following are the different configuration options that are accepted. | `HDX_SOFT_TASK_LIMIT` | `[HDX]` | No | Yes | | `HDX_HARD_TASK_LIMIT` | `[HDX]` | No | Yes | | `PROCESS_SINGLE_CATEGORY_IN_POSTGRES` | `[HDX]` | No | Yes | +| `PARALLEL_PROCESSING_CATEGORIES` | `[HDX]` | No | Yes | diff --git a/src/app.py b/src/app.py index 1c193b79..da73c9f5 100644 --- a/src/app.py +++ b/src/app.py @@ -55,7 +55,11 @@ ) from src.config import EXPORT_PATH as export_path from src.config import INDEX_THRESHOLD as index_threshold -from src.config import POLYGON_STATISTICS_API_URL, PROCESS_SINGLE_CATEGORY_IN_POSTGRES +from src.config import ( + PARALLEL_PROCESSING_CATEGORIES, + POLYGON_STATISTICS_API_URL, + PROCESS_SINGLE_CATEGORY_IN_POSTGRES, +) from src.config import USE_CONNECTION_POOLING as use_connection_pooling from src.config import USE_S3_TO_UPLOAD, get_db_connection_params, level from src.config import logger as logging @@ -1302,10 +1306,9 @@ def zip_to_s3(self, resources): - List of resource dictionaries with added download URLs. """ for resource in resources: - resource["download_url"] = self.upload_to_s3( - resource_path=resource["zip_path"] - ) - os.remove(resource["zip_path"]) + temp_zip_path = resource["url"] + resource["url"] = self.upload_to_s3(resource_path=temp_zip_path) + os.remove(temp_zip_path) return resources def file_to_zip(self, working_dir, zip_path): @@ -1325,6 +1328,7 @@ def file_to_zip(self, working_dir, zip_path): compression=zipfile.ZIP_DEFLATED, chunk_size=zipfile.SOZIP_DEFAULT_CHUNK_SIZE, ) + for file_path in pathlib.Path(working_dir).iterdir(): zf.write(file_path, arcname=file_path.name) utc_now = datetime.now(timezone.utc) @@ -1334,6 +1338,8 @@ def file_to_zip(self, working_dir, zip_path): readme_content += "Exported through Raw-data-api (https://github.com/hotosm/raw-data-api) using OpenStreetMap data.\n" readme_content += "Learn more about OpenStreetMap and its data usage policy : https://www.openstreetmap.org/about \n" zf.writestr("Readme.txt", readme_content) + if self.params.geometry: + zf.writestr("clipping_boundary.geojson", self.params.geometry.json()) zf.close() shutil.rmtree(working_dir) return zip_path @@ -1362,6 +1368,7 @@ def process_export_format(export_format): export_format = EXPORT_TYPE_MAPPING.get(export_format) export_format_path = os.path.join(file_export_path, export_format.suffix) os.makedirs(export_format_path, exist_ok=True) + start = time.time() logging.info( "Processing %s:%s", category_name.lower(), export_format.suffix ) @@ -1385,15 +1392,27 @@ def process_export_format(export_format): self.duck_db_instance.run_query(executable_query.strip(), load_spatial=True) zip_file_path = os.path.join(file_export_path, f"{export_filename}.zip") zip_path = self.file_to_zip(export_format_path, zip_file_path) + resource = {} - resource["filename"] = f"{export_filename}.zip" - resource["zip_path"] = zip_path - resource["format_suffix"] = export_format.suffix - resource["format_description"] = export_format.driver_name - logging.info("Done %s:%s", category_name.lower(), export_format.suffix) + resource["name"] = f"{export_filename}.zip" + resource["url"] = zip_path + resource["format"] = export_format.suffix + resource["description"] = export_format.driver_name + resource["size"] = os.path.getsize(zip_path) + resource["last_modifed"] = datetime.now().isoformat() + logging.info( + "Done %s:%s in %s", + category_name.lower(), + export_format.suffix, + humanize.naturaldelta(timedelta(seconds=(time.time() - start))), + ) return resource - if self.parallel_process_state is False and len(export_formats) > 1: + if ( + self.parallel_process_state is False + and len(export_formats) > 1 + and PARALLEL_PROCESSING_CATEGORIES is True + ): logging.info( "Using Parallel Processing for %s Export formats", category_name.lower() ) @@ -1465,7 +1484,10 @@ def process_category(self, category): geometry=self.params.geometry if self.params.geometry else None, ) resources = self.query_to_file( - extract_query, category_name, feature_type, category_data.formats + extract_query, + category_name, + feature_type, + list(set(category_data.formats)), ) uploaded_resources = self.zip_to_s3(resources) all_uploaded_resources.extend(uploaded_resources) @@ -1490,20 +1512,7 @@ def resource_to_response(self, uploaded_resources, category): - Dictionary containing the response information. """ category_name, category_data = list(category.items())[0] - - dataset_info = {} - resources = [] - for resource in uploaded_resources: - resource_meta = { - "name": resource["filename"], - "format": resource["format_suffix"], - "description": resource["format_description"], - "url": resource["download_url"], - } - resource_meta["uploaded_to_hdx"]: False - resources.append(resource_meta) - dataset_info["resources"] = resources - return {category_name: dataset_info} + return {category_name: {"resources": uploaded_resources}} def resource_to_hdx(self, uploaded_resources, dataset_config, category): """ @@ -1518,8 +1527,7 @@ def resource_to_hdx(self, uploaded_resources, dataset_config, category): - Dictionary containing the HDX upload information. """ if any( - item["format_suffix"] in self.HDX_SUPPORTED_FORMATS - for item in uploaded_resources + item["format"] in self.HDX_SUPPORTED_FORMATS for item in uploaded_resources ): uploader = HDXUploader( hdx=dataset_config, @@ -1541,18 +1549,11 @@ def resource_to_hdx(self, uploaded_resources, dataset_config, category): uploader.init_dataset() non_hdx_resources = [] for resource in uploaded_resources: - resource_meta = { - "name": resource["filename"], - "format": resource["format_suffix"], - "description": resource["format_description"], - "url": resource["download_url"], - "last_modifed": datetime.now().isoformat(), - } - if resource["format_suffix"] in self.HDX_SUPPORTED_FORMATS: - uploader.add_resource(resource_meta) + if resource["format"] in self.HDX_SUPPORTED_FORMATS: + uploader.add_resource(resource) + resource["uploaded_to_hdx"] = True else: - resource_meta["uploaded_to_hdx"]: False - non_hdx_resources.append(resource_meta) + non_hdx_resources.append(resource) category_name, hdx_dataset_info = uploader.upload_dataset(self.params.meta) hdx_dataset_info["resources"].extend(non_hdx_resources) return {category_name: hdx_dataset_info} @@ -1606,7 +1607,7 @@ def process_hdx_tags(self): logging.info("Transfer-> Postgres Data to DuckDB Started : %s", table) self.duck_db_instance.run_query(create_table.strip(), attach_pgsql=True) logging.info( - "Transfer-> Postgres Data to DuckDB : %s Done in %s s", + "Transfer-> Postgres Data to DuckDB : %s Done in %s", table, humanize.naturaldelta(timedelta(seconds=(time.time() - start))), ) @@ -1617,7 +1618,7 @@ def process_hdx_tags(self): tag_process_results = [] dataset_results = [] - if len(self.params.categories) > 1: + if len(self.params.categories) > 1 and PARALLEL_PROCESSING_CATEGORIES is True: self.parallel_process_state = True logging.info("Starting to Use Parallel Processes") with concurrent.futures.ThreadPoolExecutor( diff --git a/src/config.py b/src/config.py index b9488683..89fda680 100644 --- a/src/config.py +++ b/src/config.py @@ -206,6 +206,11 @@ def not_raises(func, *args, **kwargs): "PROCESS_SINGLE_CATEGORY_IN_POSTGRES" ) or config.getboolean("HDX", "PROCESS_SINGLE_CATEGORY_IN_POSTGRES", fallback=False) +PARALLEL_PROCESSING_CATEGORIES = os.environ.get( + "PARALLEL_PROCESSING_CATEGORIES" +) or config.getboolean("HDX", "PARALLEL_PROCESSING_CATEGORIES", fallback=True) + + if ENABLE_HDX_EXPORTS: HDX_SITE = os.environ.get("HDX_SITE") or config.getboolean( "HDX", "HDX_SITE", fallback="demo" diff --git a/src/query_builder/builder.py b/src/query_builder/builder.py index 4625296d..e6f4d830 100644 --- a/src/query_builder/builder.py +++ b/src/query_builder/builder.py @@ -20,7 +20,7 @@ import re from json import dumps, loads -from geomet import wkt +from geomet import wkb, wkt from src.config import logger as logging from src.validation.models import SupportedFilters, SupportedGeometryFilters @@ -901,7 +901,7 @@ def convert_tags_pattern(query_string): if cid else f"""ST_within(geom,(select ST_SetSRID(ST_Extent(ST_makeValid(ST_GeomFromText('{wkt.dumps(loads(geometry.json()),decimals=6)}',4326))),4326)))""" ) - + postgres_query = f"""select {select_query} from (select * , tableoid::regclass as osm_type from {table} where {row_filter_condition}) as sub_query""" if single_category_where: postgres_query += f" where {convert_tags_pattern(single_category_where)}" @@ -952,9 +952,7 @@ def extract_features_duckdb(base_table_name, select, feature_type, where, geomet for table in from_query: where_query = map_tables[feature_type]["where"][table] if geometry: - where_query += ( - f" and (ST_Within(geometry,ST_GeomFromGeoJSON('{geometry.json()}')))" - ) + where_query += f" and (ST_Intersects_Extent(geometry,ST_GeomFromGeoJSON('{geometry.json()}')))" query = f"""select {select_query} from {f"{base_table_name}_{table}"} where {where_query}""" base_query.append(query) return " UNION ALL ".join(base_query)