Merge pull request #202 from hotosm/enhance/custom_exports
Enhance : Custom Exports : Support Postgres
kshitijrajsharma authored Jan 14, 2024
2 parents 1837436 + 49c3ab2 commit a20ea0f
Showing 6 changed files with 203 additions and 108 deletions.
4 changes: 2 additions & 2 deletions API/hdx.py
@@ -397,7 +397,7 @@ async def process_custom_requests(
"addr:city",
"source",
],
"where": "tags['amenity'] IN ('kindergarten', 'school', 'college', 'university') OR building IN ('kindergarten', 'school', 'college', 'university')",
"where": "tags['amenity'] IN ('kindergarten', 'school', 'college', 'university') OR tags['building'] IN ('kindergarten', 'school', 'college', 'university')",
"formats": ["geojson"],
}
},
@@ -681,7 +681,7 @@ async def process_custom_requests(
"addr:city",
"source",
],
"where": "tags['amenity'] IN ('kindergarten', 'school', 'college', 'university') OR building IN ('kindergarten', 'school', 'college', 'university')",
"where": "tags['amenity'] IN ('kindergarten', 'school', 'college', 'university') OR tags['building'] IN ('kindergarten', 'school', 'college', 'university')",
"formats": ["geojson", "shp", "kml"],
}
},
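
Both hunks make the same fix: in the export schema the OSM keys live inside the `tags` map column, so the bare `building` reference pointed at a column that does not exist there, while `tags['building']` reads the intended key. A minimal sketch of the difference, using a hypothetical table name:

```sql
-- Hypothetical table name, shown only to illustrate the where-clause fix.
-- Broken: `building` is treated as a top-level column.
SELECT osm_id FROM example_polygons
WHERE tags['amenity'] IN ('school') OR building IN ('school');

-- Fixed: both keys are read from the tags map column.
SELECT osm_id FROM example_polygons
WHERE tags['amenity'] IN ('school') OR tags['building'] IN ('school');
```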
2 changes: 2 additions & 0 deletions docs/src/installation/configurations.md
@@ -61,6 +61,7 @@ The following are the different configuration options that are accepted.
| `POLYGON_STATISTICS_API_RATE_LIMIT` | `POLYGON_STATISTICS_API_RATE_LIMIT` | `[API_CONFIG]` | `5` | Rate limit applied to the statistics endpoint, per minute. Defaults to 5 requests per minute | OPTIONAL |
| `DEFAULT_SOFT_TASK_LIMIT` | `DEFAULT_SOFT_TASK_LIMIT` | `[API_CONFIG]` | `7200` | Soft task time limit signal for Celery workers, in seconds. It gently reminds Celery to finish the task and terminate. Defaults to 2 hours | OPTIONAL |
| `DEFAULT_HARD_TASK_LIMIT` | `DEFAULT_HARD_TASK_LIMIT` | `[API_CONFIG]` | `10800` | Hard task time limit signal for Celery workers, in seconds. It immediately kills the Celery task. Defaults to 3 hours | OPTIONAL |
| `USE_DUCK_DB_FOR_CUSTOM_EXPORTS` | `USE_DUCK_DB_FOR_CUSTOM_EXPORTS` | `[API_CONFIG]` | `False` | Enable this setting to use DuckDB for custom exports. By default DuckDB is disabled and Postgres is used | OPTIONAL |
| `CELERY_BROKER_URL` | `CELERY_BROKER_URL` | `[CELERY]` | `redis://localhost:6379/0` | Redis connection string for the broker | OPTIONAL |
| `CELERY_RESULT_BACKEND` | `CELERY_RESULT_BACKEND` | `[CELERY]` | `redis://localhost:6379/0` | Redis/PostgreSQL connection string for the result backend, e.g. db+postgresql://username:password@localhost:5432/db_name | OPTIONAL |
| `FILE_UPLOAD_METHOD` | `FILE_UPLOAD_METHOD` | `[EXPORT_UPLOAD]` | `disk` | File upload method; Allowed values - disk, s3 | OPTIONAL |
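
For reference, a minimal way to switch custom exports to DuckDB, shown as an illustrative snippet of the ini-style config file these tables describe (the same name also works as an environment variable):

```ini
[API_CONFIG]
# Illustrative snippet: enables the DuckDB pipeline for custom exports.
# Leave unset (or false) to keep the default Postgres + ogr2ogr path.
USE_DUCK_DB_FOR_CUSTOM_EXPORTS=true
```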
@@ -110,6 +111,7 @@ The following are the different configuration options that are accepted.
| `POLYGON_STATISTICS_API_RATE_LIMIT` | `[API_CONFIG]` | Yes | No |
| `DEFAULT_SOFT_TASK_LIMIT` | `[API_CONFIG]` | No | Yes |
| `DEFAULT_HARD_TASK_LIMIT` | `[API_CONFIG]` | No | Yes |
| `USE_DUCK_DB_FOR_CUSTOM_EXPORTS` | `[API_CONFIG]` | Yes | Yes |
| `CELERY_BROKER_URL` | `[CELERY]` | Yes | Yes |
| `CELERY_RESULT_BACKEND` | `[CELERY]` | Yes | Yes |
| `FILE_UPLOAD_METHOD` | `[EXPORT_UPLOAD]` | Yes | Yes |
183 changes: 120 additions & 63 deletions src/app.py
@@ -61,14 +61,19 @@
PROCESS_SINGLE_CATEGORY_IN_POSTGRES,
)
from src.config import USE_CONNECTION_POOLING as use_connection_pooling
from src.config import USE_S3_TO_UPLOAD, get_db_connection_params, level
from src.config import (
USE_DUCK_DB_FOR_CUSTOM_EXPORTS,
USE_S3_TO_UPLOAD,
get_db_connection_params,
level,
)
from src.config import logger as logging
from src.query_builder.builder import (
HDX_FILTER_CRITERIA,
HDX_MARKDOWN,
check_exisiting_country,
check_last_updated_rawdata,
extract_features_duckdb,
extract_features_custom_exports,
extract_geometry_type_query,
generate_polygon_stats_graphql_query,
get_countries_query,
@@ -88,17 +93,17 @@
import logging as log

if ENABLE_HDX_EXPORTS:
import duckdb
if USE_DUCK_DB_FOR_CUSTOM_EXPORTS is True:
import duckdb
from src.config import (
DUCK_DB_MEMORY_LIMIT,
DUCK_DB_THREAD_LIMIT,
)

from hdx.data.dataset import Dataset
from hdx.data.resource import Resource

from src.config import (
DUCK_DB_MEMORY_LIMIT,
DUCK_DB_THREAD_LIMIT,
HDX_MAINTAINER,
HDX_OWNER_ORG,
HDX_URL_PREFIX,
)
from src.config import HDX_MAINTAINER, HDX_OWNER_ORG, HDX_URL_PREFIX


global LOCAL_CON_POOL
@@ -154,6 +159,37 @@ def dict_none_clean(to_clean):
return result


def generate_ogr2ogr_cmd_from_psql(
export_file_path,
export_file_format_driver,
postgres_query,
layer_creation_options,
query_dump_path,
):
"""
Generates ogr2ogr command for postgresql queries
"""
db_items = get_db_connection_params()
os.makedirs(query_dump_path, exist_ok=True)
query_path = os.path.join(query_dump_path, "query.sql")
with open(query_path, "w", encoding="UTF-8") as file:
file.write(postgres_query)
ogr2ogr_cmd = """ogr2ogr -overwrite -f "{export_format}" {export_path} PG:"host={host} port={port} user={username} dbname={db} password={password}" -sql @"{pg_sql_select}" {layer_creation_options_str} -progress""".format(
export_format=export_file_format_driver,
export_path=export_file_path,
host=db_items.get("host"),
port=db_items.get("port"),
username=db_items.get("user"),
db=db_items.get("dbname"),
password=db_items.get("password"),
pg_sql_select=query_path,
layer_creation_options_str=f"-lco {layer_creation_options}"
if layer_creation_options
else "",
)
return ogr2ogr_cmd
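
To make the command shape concrete, a hedged usage sketch follows; the paths and query are hypothetical, and the real connection values come from `get_db_connection_params()`:

```python
# Illustrative only: paths, query, and connection values are hypothetical.
cmd = generate_ogr2ogr_cmd_from_psql(
    export_file_path="/tmp/exports/buildings.geojson",
    export_file_format_driver="GeoJSON",
    postgres_query="SELECT osm_id, tags, geom FROM example_polygons",
    layer_creation_options=None,
    query_dump_path="/tmp/exports",
)
# The query is first dumped to /tmp/exports/query.sql, and the returned
# command resembles:
# ogr2ogr -overwrite -f "GeoJSON" /tmp/exports/buildings.geojson \
#   PG:"host=... port=... user=... dbname=... password=..." \
#   -sql @"/tmp/exports/query.sql"  -progress
```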


def run_ogr2ogr_cmd(cmd):
"""Runs command and monitors the file size until the process runs
@@ -1222,11 +1258,13 @@ def __init__(self, params):
if os.path.exists(self.default_export_path):
shutil.rmtree(self.default_export_path, ignore_errors=True)
os.makedirs(self.default_export_path)
self.duck_db_db_path = os.path.join(
self.default_export_path,
f"{self.iso3 if self.iso3 else self.params.dataset.dataset_prefix}.db",
)
self.duck_db_instance = DuckDB(self.duck_db_db_path)

if USE_DUCK_DB_FOR_CUSTOM_EXPORTS is True:
self.duck_db_db_path = os.path.join(
self.default_export_path,
f"{self.iso3 if self.iso3 else self.params.dataset.dataset_prefix}.db",
)
self.duck_db_instance = DuckDB(self.duck_db_db_path)

def types_to_tables(self, type_list: list):
"""
@@ -1252,7 +1290,7 @@ def types_to_tables(self, type_list: list):

return list(table_set)

def format_where_clause(self, where_clause):
def format_where_clause_duckdb(self, where_clause):
"""
Formats the where_clause by replacing the first occurrence of the pattern.
@@ -1362,7 +1400,6 @@ def query_to_file(self, query, category_name, feature_type, export_formats):
self.default_export_path, category_name, feature_type
)
resources = []
start_export_formats_time = time.time()

def process_export_format(export_format):
export_format = EXPORT_TYPE_MAPPING.get(export_format)
@@ -1388,8 +1425,21 @@ def process_export_format(export_format):
if export_format.layer_creation_options
else ""
)
executable_query = f"""COPY ({query.strip()}) TO '{export_file_path}' WITH (FORMAT {export_format.format_option}{f", DRIVER '{export_format.driver_name}'{f', LAYER_CREATION_OPTIONS {layer_creation_options_str}' if layer_creation_options_str else ''}" if export_format.format_option == 'GDAL' else ''})"""
self.duck_db_instance.run_query(executable_query.strip(), load_spatial=True)
if USE_DUCK_DB_FOR_CUSTOM_EXPORTS is True:
executable_query = f"""COPY ({query.strip()}) TO '{export_file_path}' WITH (FORMAT {export_format.format_option}{f", DRIVER '{export_format.driver_name}'{f', LAYER_CREATION_OPTIONS {layer_creation_options_str}' if layer_creation_options_str else ''}" if export_format.format_option == 'GDAL' else ''})"""
self.duck_db_instance.run_query(
executable_query.strip(), load_spatial=True
)
else:
ogr2ogr_cmd = generate_ogr2ogr_cmd_from_psql(
export_file_path=export_file_path,
export_file_format_driver=export_format.driver_name,
postgres_query=query.strip(),
layer_creation_options=layer_creation_options_str,
query_dump_path=export_format_path,
)
run_ogr2ogr_cmd(ogr2ogr_cmd)

zip_file_path = os.path.join(file_export_path, f"{export_filename}.zip")
zip_path = self.file_to_zip(export_format_path, zip_file_path)
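
For orientation: with DuckDB enabled the export is a single spatial COPY run in-process, while the Postgres path dumps the same query to `query.sql` and shells out to ogr2ogr via the helper above. A sketch of the DuckDB statement for a GDAL-backed format, with hypothetical names:

```sql
-- Sketch of the statement the f-string above assembles for a GDAL-backed
-- format such as GeoJSON; query and output path are hypothetical.
COPY (SELECT osm_id, tags, geom FROM example_polygons)
TO 'education/polygons/education_polygons.geojson'
WITH (FORMAT GDAL, DRIVER 'GeoJSON');
```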

@@ -1469,12 +1519,15 @@ def process_category(self, category):
logging.info("Started Processing %s", category_name)
all_uploaded_resources = []
for feature_type in category_data.types:
extract_query = extract_features_duckdb(
extract_query = extract_features_custom_exports(
self.iso3 if self.iso3 else self.params.dataset.dataset_prefix,
category_data.select,
feature_type,
self.format_where_clause(category_data.where),
self.format_where_clause_duckdb(category_data.where)
if USE_DUCK_DB_FOR_CUSTOM_EXPORTS is True
else category_data.where,
geometry=self.params.geometry if self.params.geometry else None,
cid=self.cid,
)
resources = self.query_to_file(
extract_query,
@@ -1574,36 +1627,39 @@ def process_hdx_tags(self):
self.params.categories = [
category for category in self.params.categories if category
]
table_type = [
cat_type
for category in self.params.categories
if category
for cat_type in list(category.values())[0].types
]
where_0_category = None
if USE_DUCK_DB_FOR_CUSTOM_EXPORTS is True:
table_type = [
cat_type
for category in self.params.categories
if category
for cat_type in list(category.values())[0].types
]
where_0_category = None

if len(self.params.categories) == 1 and PROCESS_SINGLE_CATEGORY_IN_POSTGRES:
where_0_category = list(self.params.categories[0].values())[0].where
if len(self.params.categories) == 1 and PROCESS_SINGLE_CATEGORY_IN_POSTGRES:
where_0_category = list(self.params.categories[0].values())[0].where

table_names = self.types_to_tables(list(set(table_type)))
base_table_name = self.iso3 if self.iso3 else self.params.dataset.dataset_prefix
for table in table_names:
create_table = postgres2duckdb_query(
base_table_name=base_table_name,
table=table,
cid=self.cid,
geometry=self.params.geometry,
single_category_where=where_0_category,
)
logging.debug(create_table)
start = time.time()
logging.info("Transfer-> Postgres Data to DuckDB Started : %s", table)
self.duck_db_instance.run_query(create_table.strip(), attach_pgsql=True)
logging.info(
"Transfer-> Postgres Data to DuckDB : %s Done in %s",
table,
humanize.naturaldelta(timedelta(seconds=(time.time() - start))),
table_names = self.types_to_tables(list(set(table_type)))
base_table_name = (
self.iso3 if self.iso3 else self.params.dataset.dataset_prefix
)
for table in table_names:
create_table = postgres2duckdb_query(
base_table_name=base_table_name,
table=table,
cid=self.cid,
geometry=self.params.geometry,
single_category_where=where_0_category,
)
logging.debug(create_table)
start = time.time()
logging.info("Transfer-> Postgres Data to DuckDB Started : %s", table)
self.duck_db_instance.run_query(create_table.strip(), attach_pgsql=True)
logging.info(
"Transfer-> Postgres Data to DuckDB : %s Done in %s",
table,
humanize.naturaldelta(timedelta(seconds=(time.time() - start))),
)
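
`postgres2duckdb_query` itself is not shown in this diff; conceptually, the transfer it drives amounts to attaching Postgres from DuckDB and materializing the filtered rows locally, roughly as in this sketch (all names are hypothetical):

```sql
-- Sketch only: the actual SQL comes from postgres2duckdb_query(), which
-- this diff does not show. Relies on DuckDB's postgres extension.
ATTACH 'host=localhost dbname=raw user=postgres' AS pg (TYPE POSTGRES);
CREATE TABLE iso3_example_polygons AS
    SELECT * FROM pg.public.example_polygons
    WHERE cid = 123;  -- clip/country filter supplied by the caller
```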

CategoryResult = namedtuple(
"CategoryResult", ["category", "uploaded_resources"]
@@ -1649,29 +1705,30 @@ def process_hdx_tags(self):

result = {"datasets": dataset_results}
if self.params.meta:
logging.info("Dumping Duck DB to Parquet")
db_dump_path = os.path.join(
self.default_export_path,
"DB_DUMP",
)
os.makedirs(db_dump_path, exist_ok=True)
export_db = f"""EXPORT DATABASE '{db_dump_path}' (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 100000);"""
self.duck_db_instance.run_query(export_db, load_spatial=True)
db_zip_download_url = self.upload_to_s3(
self.file_to_zip(
working_dir=db_dump_path,
zip_path=os.path.join(self.default_export_path, "dbdump.zip"),
if USE_DUCK_DB_FOR_CUSTOM_EXPORTS is True:
logging.info("Dumping Duck DB to Parquet")
db_dump_path = os.path.join(
self.default_export_path,
"DB_DUMP",
)
)
result["db_dump"] = db_zip_download_url
os.makedirs(db_dump_path, exist_ok=True)
export_db = f"""EXPORT DATABASE '{db_dump_path}' (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 100000);"""
self.duck_db_instance.run_query(export_db, load_spatial=True)
db_zip_download_url = self.upload_to_s3(
self.file_to_zip(
working_dir=db_dump_path,
zip_path=os.path.join(self.default_export_path, "dbdump.zip"),
)
)
result["db_dump"] = db_zip_download_url
processing_time_close = time.time()
result["elapsed_time"] = humanize.naturaldelta(
timedelta(seconds=(processing_time_close - processing_time_start))
)
result["started_at"] = started_at

meta_last_run_dump_path = os.path.join(self.default_export_path, "meta.json")
with open(meta_last_run_dump_path, "w") as json_file:
with open(meta_last_run_dump_path, "w", encoding="UTF-8") as json_file:
json.dump(result, json_file, indent=4)
self.upload_to_s3(resource_path=meta_last_run_dump_path)
self.clean_resources()
19 changes: 12 additions & 7 deletions src/config.py
@@ -191,6 +191,11 @@ def not_raises(func, *args, **kwargs):
"API_CONFIG", "DEFAULT_HARD_TASK_LIMIT", fallback=3 * 60 * 60
)

USE_DUCK_DB_FOR_CUSTOM_EXPORTS = os.environ.get(
"USE_DUCK_DB_FOR_CUSTOM_EXPORTS"
) or config.getboolean("API_CONFIG", "USE_DUCK_DB_FOR_CUSTOM_EXPORTS", fallback=False)
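
One detail worth noting when setting this through the environment: `os.environ.get` returns a string, so any non-empty value (including the string "False") short-circuits the `or` and enables the flag; the `getboolean` parsing applies only to the config file. A minimal demonstration:

```python
import os

# Any non-empty environment string is truthy, so even "False" enables it.
os.environ["USE_DUCK_DB_FOR_CUSTOM_EXPORTS"] = "False"
flag = os.environ.get("USE_DUCK_DB_FOR_CUSTOM_EXPORTS") or False
print(bool(flag))  # True
```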


HDX_SOFT_TASK_LIMIT = os.environ.get("HDX_SOFT_TASK_LIMIT") or config.get(
"HDX", "HDX_SOFT_TASK_LIMIT", fallback=5 * 60 * 60
)
@@ -265,13 +270,13 @@ def not_raises(func, *args, **kwargs):
else None
)
)

DUCK_DB_MEMORY_LIMIT = os.environ.get("DUCK_DB_MEMORY_LIMIT") or config.get(
"HDX", "DUCK_DB_MEMORY_LIMIT", fallback=None
)
DUCK_DB_THREAD_LIMIT = os.environ.get("DUCK_DB_THREAD_LIMIT") or config.get(
"HDX", "DUCK_DB_THREAD_LIMIT", fallback=None
)
if USE_DUCK_DB_FOR_CUSTOM_EXPORTS:
DUCK_DB_MEMORY_LIMIT = os.environ.get("DUCK_DB_MEMORY_LIMIT") or config.get(
"HDX", "DUCK_DB_MEMORY_LIMIT", fallback=None
)
DUCK_DB_THREAD_LIMIT = os.environ.get("DUCK_DB_THREAD_LIMIT") or config.get(
"HDX", "DUCK_DB_THREAD_LIMIT", fallback=None
)


def get_db_connection_params() -> dict: