Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature : Tasking Manager Download Support #155

Merged
merged 14 commits into from
Nov 1, 2023
Merged
39 changes: 35 additions & 4 deletions API/api_worker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import pathlib
import re
import shutil
import time
import zipfile
Expand All @@ -12,6 +13,7 @@
from src.config import ALLOW_BIND_ZIP_FILTER
from src.config import CELERY_BROKER_URL as celery_broker_uri
from src.config import CELERY_RESULT_BACKEND as celery_backend
from src.config import ENABLE_TILES
from src.config import USE_S3_TO_UPLOAD as use_s3_to_upload
from src.config import logger as logging
from src.query_builder.builder import format_file_name_str
Expand All @@ -36,6 +38,13 @@ def process_raw_data(self, params):
if params.output_type
else RawDataOutputType.GEOJSON.value
)
if ENABLE_TILES:
if (
params.output_type == RawDataOutputType.PMTILES.value
or params.output_type == RawDataOutputType.MBTILES.value
):
logging.debug("Using STwithin Logic")
params.use_st_within = True
params.file_name = (
format_file_name_str(params.file_name) if params.file_name else "Export"
)
Expand Down Expand Up @@ -65,15 +74,37 @@ def process_raw_data(self, params):
logging.debug("Zip Binding Done !")
else:
for file_path in pathlib.Path(working_dir).iterdir():
upload_file_path = file_path
inside_file_size += os.path.getsize(file_path)
break # only take one file inside dir , if contains many it should be inside zip
if file_path.is_file() and file_path.name.endswith(
params.output_type.lower()
):
upload_file_path = file_path
inside_file_size += os.path.getsize(file_path)
break # only take one file inside dir , if contains many it should be inside zip
# check if download url will be generated from s3 or not from config
if use_s3_to_upload:
file_transfer_obj = S3FileTransfer()
upload_name = exportname if params.uuid else f"Recurring/{exportname}"
if exportname.startswith("hotosm_project"): # TM
if not params.uuid:
pattern = r"(hotosm_project_)(\d+)"
match = re.match(pattern, exportname)
if match:
prefix = match.group(1)
project_number = match.group(2)
if project_number:
upload_name = f"TM/{project_number}/{exportname}"
elif exportname.startswith("hotosm_"): # HDX
if not params.uuid:
pattern = r"hotosm_([A-Z]{3})_(\w+)"
match = re.match(pattern, exportname)
if match:
iso3countrycode = match.group(1)
if iso3countrycode:
upload_name = f"HDX/{iso3countrycode}/{exportname}"

download_url = file_transfer_obj.upload(
upload_file_path,
exportname if params.uuid else f"Recurring/{exportname}",
upload_name,
file_suffix="zip" if bind_zip else params.output_type.lower(),
)
else:
Expand Down
25 changes: 24 additions & 1 deletion API/raw_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@
from src.config import LIMITER as limiter
from src.config import RATE_LIMIT_PER_MIN as export_rate_limit
from src.config import logger as logging
from src.validation.models import RawDataCurrentParams, SnapshotResponse, StatusResponse
from src.validation.models import (
RawDataCurrentParams,
RawDataCurrentParamsBase,
SnapshotResponse,
StatusResponse,
)

from .api_worker import process_raw_data

Expand Down Expand Up @@ -434,6 +439,24 @@ def get_osm_current_snapshot_as_file(
return JSONResponse({"task_id": task.id, "track_link": f"/tasks/status/{task.id}/"})


@router.get("/snapshot/plain/", response_model=FeatureCollection)
@version(1)
def get_osm_current_snapshot_as_plain_geojson(
    request: Request, params: RawDataCurrentParamsBase
):
    """Generates the plain GeoJSON for a polygon within 100 sq. km and returns the result right away.

    Args:
        request (Request): Incoming HTTP request (required by the route signature).
        params (RawDataCurrentParamsBase): Same as /snapshot except without the
            multiple output-format options and extra configuration.

    Returns:
        FeatureCollection: Extracted features as GeoJSON.
    """
    # Synchronous extraction — no Celery task is queued, so this is only
    # suitable for the small areas this endpoint is limited to.
    return RawData(params).extract_plain_geojson()


@router.get("/countries/", response_model=FeatureCollection)
@version(1)
def get_countries(q: str = ""):
Expand Down
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
| GeoPackage | :heavy_check_mark: |
| PGDUMP | :heavy_check_mark: |
| GeoJSON | :heavy_check_mark: |
| Pmtiles | :heavy_check_mark: |
| Mbtiles | :heavy_check_mark: |

## Installation

Expand Down Expand Up @@ -90,6 +92,9 @@ Setup the necessary configurations for Raw Data API from [configurations](./docs

Setup config.txt in project root.

## Optional : For Tiles Output
If you opt for tiles output, set ```ENABLE_TILES : True``` in the environment variables. Make sure you install [Tippecanoe](https://github.com/mapbox/tippecanoe).

### Start the Server

```
Expand Down
7 changes: 7 additions & 0 deletions backend/sql/null_index.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Partial GIN indexes (intarray gin__int_ops) for rows whose country array is
-- contained by {0}, i.e. features not assigned to any real country code.
-- The WHERE clause keeps each index small by covering only that subset.
-- CONCURRENTLY builds the index without blocking writes; NOTE(review): it
-- must be run outside a transaction block — confirm the migration runner
-- executes these statements in autocommit mode.
CREATE INDEX CONCURRENTLY ways_poly_country_idx_null ON public.ways_poly USING gin (country gin__int_ops) WHERE country <@ ARRAY[0];

CREATE INDEX CONCURRENTLY ways_line_country_idx_null ON public.ways_line USING gin (country gin__int_ops) WHERE country <@ ARRAY[0];

CREATE INDEX CONCURRENTLY nodes_country_idx_null ON public.nodes USING gin (country gin__int_ops) WHERE country <@ ARRAY[0];

CREATE INDEX CONCURRENTLY relations_country_idx_null ON public.relations USING gin (country gin__int_ops) WHERE country <@ ARRAY[0];
2 changes: 2 additions & 0 deletions docs/src/installation/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ The following are the different configuration options that are accepted.
| `EXPORT_MAX_AREA_SQKM` | `EXPORT_MAX_AREA_SQKM` | `[API_CONFIG]` | `100000` | max area in sq. km. to support for rawdata input | OPTIONAL |
| `USE_CONNECTION_POOLING` | `USE_CONNECTION_POOLING` | `[API_CONFIG]` | `false` | Enable psycopg2 connection pooling | OPTIONAL |
| `ALLOW_BIND_ZIP_FILTER` | `ALLOW_BIND_ZIP_FILTER` | `[API_CONFIG]` | `true` | Enable zip compression for exports | OPTIONAL |
| `ENABLE_TILES` | `ENABLE_TILES` | `[API_CONFIG]` | `false` | Enable Tile Output (Pmtiles and Mbtiles) | OPTIONAL |
| `INDEX_THRESHOLD` | `INDEX_THRESHOLD` | `[API_CONFIG]` | `5000` | Area in sqkm to apply grid/country index filter | OPTIONAL |
| `CELERY_BROKER_URL` | `CELERY_BROKER_URL` | `[CELERY]` | `redis://localhost:6379/0` | Redis connection string for the broker | OPTIONAL |
| `CELERY_RESULT_BACKEND` | `CELERY_RESULT_BACKEND` | `[CELERY]` | `redis://localhost:6379/0` | Redis connection string for the the result backend | OPTIONAL |
Expand Down Expand Up @@ -72,6 +73,7 @@ The following are the different configuration options that are accepted.
| `EXPORT_PATH` | `[API_CONFIG]` | Yes | Yes |
| `EXPORT_MAX_AREA_SQKM` | `[API_CONFIG]` | Yes | No |
| `USE_CONNECTION_POOLING` | `[API_CONFIG]` | Yes | Yes |
| `ENABLE_TILES` | `[API_CONFIG]` | Yes | Yes |
| `ALLOW_BIND_ZIP_FILTER` | `[API_CONFIG]` | Yes | Yes |
| `INDEX_THRESHOLD` | `[API_CONFIG]` | No | Yes |
| `CELERY_BROKER_URL` | TBD | Yes | Yes |
Expand Down
119 changes: 77 additions & 42 deletions src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@
from psycopg2 import OperationalError, connect
from psycopg2.extras import DictCursor

from src.config import AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, BUCKET_NAME
from src.config import (
AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY,
BUCKET_NAME,
ENABLE_TILES,
)
from src.config import EXPORT_PATH as export_path
from src.config import INDEX_THRESHOLD as index_threshold
from src.config import USE_CONNECTION_POOLING as use_connection_pooling
Expand Down Expand Up @@ -321,18 +326,29 @@ def ogr_export(query, outputtype, working_dir, dump_temp_path, params):
with open(query_path, "w", encoding="UTF-8") as file:
file.write(query)
# for mbtiles we need additional input as well i.e. minzoom and maxzoom , setting default at max=22 and min=10
if outputtype == RawDataOutputType.MBTILES.value:
cmd = """ogr2ogr -overwrite -f MBTILES -dsco MINZOOM={min_zoom} -dsco MAXZOOM={max_zoom} {export_path} PG:"host={host} user={username} dbname={db} password={password}" -sql @"{pg_sql_select}" -lco ENCODING=UTF-8 -progress""".format(
min_zoom=params.min_zoom,
max_zoom=params.max_zoom,
export_path=dump_temp_path,
host=db_items.get("host"),
username=db_items.get("user"),
db=db_items.get("dbname"),
password=db_items.get("password"),
pg_sql_select=query_path,
)
run_ogr2ogr_cmd(cmd)
if ENABLE_TILES:
if outputtype == RawDataOutputType.MBTILES.value:
if params.min_zoom and params.max_zoom:
cmd = """ogr2ogr -overwrite -f MBTILES -dsco MINZOOM={min_zoom} -dsco MAXZOOM={max_zoom} {export_path} PG:"host={host} user={username} dbname={db} password={password}" -sql @"{pg_sql_select}" -lco ENCODING=UTF-8 -progress""".format(
min_zoom=params.min_zoom,
max_zoom=params.max_zoom,
export_path=dump_temp_path,
host=db_items.get("host"),
username=db_items.get("user"),
db=db_items.get("dbname"),
password=db_items.get("password"),
pg_sql_select=query_path,
)
else:
cmd = """ogr2ogr -overwrite -f MBTILES -dsco ZOOM_LEVEL_AUTO=YES {export_path} PG:"host={host} user={username} dbname={db} password={password}" -sql @"{pg_sql_select}" -lco ENCODING=UTF-8 -progress""".format(
export_path=dump_temp_path,
host=db_items.get("host"),
username=db_items.get("user"),
db=db_items.get("dbname"),
password=db_items.get("password"),
pg_sql_select=query_path,
)
run_ogr2ogr_cmd(cmd)

if outputtype == RawDataOutputType.FLATGEOBUF.value:
cmd = """ogr2ogr -overwrite -f FLATGEOBUF {export_path} PG:"host={host} port={port} user={username} dbname={db} password={password}" -sql @"{pg_sql_select}" -lco ENCODING=UTF-8 -progress VERIFY_BUFFERS=NO""".format(
Expand Down Expand Up @@ -469,12 +485,14 @@ def get_grid_id(geom, cur):
)

@staticmethod
def to_geojson_raw(results):
"""Responsible for geojson writing"""
features = [orjson.loads(row[0]) for row in results]
feature_collection = FeatureCollection(features=features)

return feature_collection
def geojson2tiles(geojson_path, tile_path, tile_layer_name):
"""Responsible for geojson to tiles"""
cmd = """tippecanoe -zg --projection=EPSG:4326 -o {tile_output_path} -l {tile_layer_name} {geojson_input_path} --force""".format(
tile_output_path=tile_path,
tile_layer_name=tile_layer_name,
geojson_input_path=geojson_path,
)
run_ogr2ogr_cmd(cmd)

def extract_current_data(self, exportname):
"""Responsible for Extracting rawdata current snapshot, Initially it creates a geojson file , Generates query , run it with 1000 chunk size and writes it directly to the geojson file and closes the file after dump
Expand All @@ -500,25 +518,59 @@ def extract_current_data(self, exportname):
# Create a exports directory because it does not exist
os.makedirs(working_dir)
# create file path with respect to of output type

dump_temp_file_path = os.path.join(
working_dir,
f"{self.params.file_name if self.params.file_name else 'Export'}.{output_type.lower()}",
)
try:
# currently we have only geojson binding function written other than that we have depend on ogr
if ENABLE_TILES:
if output_type == RawDataOutputType.PMTILES.value:
geojson_path = os.path.join(
working_dir,
f"{self.params.file_name if self.params.file_name else 'Export'}.geojson",
)
RawData.query2geojson(
self.con,
raw_currentdata_extraction_query(
self.params,
g_id=grid_id,
c_id=country,
country_export=country_export,
),
geojson_path,
)
RawData.geojson2tiles(
geojson_path, dump_temp_file_path, self.params.file_name
)
if output_type == RawDataOutputType.MBTILES.value:
RawData.ogr_export(
query=raw_currentdata_extraction_query(
self.params,
grid_id,
country,
ogr_export=True,
country_export=country_export,
),
outputtype=output_type,
dump_temp_path=dump_temp_file_path,
working_dir=working_dir,
params=self.params,
) # uses ogr export to export

if output_type == RawDataOutputType.GEOJSON.value:
RawData.query2geojson(
self.con,
raw_currentdata_extraction_query(
self.params,
g_id=grid_id,
c_id=country,
geometry_dump=geometry_dump,
country_export=country_export,
),
dump_temp_file_path,
) # uses own conversion class
elif output_type == RawDataOutputType.SHAPEFILE.value:
if output_type == RawDataOutputType.SHAPEFILE.value:
(
point_query,
line_query,
Expand All @@ -542,13 +594,12 @@ def extract_current_data(self, exportname):
if self.params.file_name
else "Export",
) # using ogr2ogr
else:
if output_type in ["fgb", "gpkg", "sql", "csv"]:
RawData.ogr_export(
query=raw_currentdata_extraction_query(
self.params,
grid_id,
country,
geometry_dump,
ogr_export=True,
country_export=country_export,
),
Expand Down Expand Up @@ -613,24 +664,7 @@ def get_osm_feature(self, osm_id):

def extract_plain_geojson(self):
"""Gets geojson for small area : Performs direct query with/without geometry"""
query = raw_extract_plain_geojson(self.params, inspect_only=True)
self.cur.execute(query)
analyze_fetched = self.cur.fetchall()
rows = list(
filter(lambda x: x.startswith("rows"), analyze_fetched[0][0].split())
)
approx_returned_rows = rows[0].split("=")[1]
logging.debug("Approximated query output : %s", approx_returned_rows)

if int(approx_returned_rows) > 500:
self.cur.close()
RawData.close_con(self.con)
raise HTTPException(
status_code=500,
detail=f"Query returned {approx_returned_rows} rows (This endpoint supports upto 1000) , Use /current-snapshot/ for larger extraction",
)

extraction_query = raw_extract_plain_geojson(self.params)
extraction_query = raw_currentdata_extraction_query(self.params)
features = []

with self.con.cursor(
Expand Down Expand Up @@ -686,11 +720,12 @@ def upload(self, file_path, file_name, file_suffix="zip"):
sample function call :
S3FileTransfer.transfer(file_path="exports",file_prefix="upload_test")"""
file_name = f"{file_name}.{file_suffix}"
logging.debug("Started Uploading %s from %s", file_name, file_path)
# instantiate upload
start_time = time.time()

try:
self.s_3.upload_file(file_path, BUCKET_NAME, file_name)
self.s_3.upload_file(str(file_path), BUCKET_NAME, str(file_name))
except Exception as ex:
logging.error(ex)
raise ex
Expand Down
4 changes: 4 additions & 0 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@
"API_CONFIG", "ALLOW_BIND_ZIP_FILTER", fallback=None
)

# Feature flag for tile outputs (PMTiles / MBTiles). The environment variable
# takes precedence; otherwise fall back to the [API_CONFIG] section of the
# config file, defaulting to None when unset.
# NOTE(review): the value stays a raw string (or None) — an env value of
# "False" is still truthy under `if ENABLE_TILES:`; confirm how callers
# interpret it.
ENABLE_TILES = os.environ.get("ENABLE_TILES") or config.get(
    "API_CONFIG", "ENABLE_TILES", fallback=None
)

####################

### EXPORT_UPLOAD CONFIG BLOCK
Expand Down
5 changes: 2 additions & 3 deletions src/query_builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,8 @@ def get_country_geojson(c_id):

def raw_currentdata_extraction_query(
params,
g_id,
c_id,
geometry_dump,
g_id=None,
c_id=None,
ogr_export=False,
select_all=False,
country_export=False,
Expand Down
Loading
Loading