diff --git a/.gitignore b/.gitignore
index d140c86..aad53df 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,4 +7,5 @@ data/
 stations_*.csv
 stations_*.geo.json
 testdata_merge.csv
-token_deepatlas.json
\ No newline at end of file
+token_deepatlas.json
+alembic/versions/*.py
\ No newline at end of file
diff --git a/README.md b/README.md
index c676051..6be9bb3 100644
--- a/README.md
+++ b/README.md
@@ -65,23 +65,17 @@ source venv/bin/activate
 pip install -r requirements.txt
 ```
 
-**Trouble Shooting:**
+##### Troubleshooting
 
 If pip install is failing during psycopg2 install:
 
-A) Search for openssl lib path (`which openssl` may help)
+1. Make sure you have the following packages installed on your system:
+   - Ubuntu users: `build-essential`, `libgdal-dev`, `libpq5`
+   - Mac users: a PostgreSQL version needs to be installed, e.g. via `brew install postgresql`
+2. If there are still errors, search for the openssl lib path (`which openssl` may help)
 and identify which one is needed for linking the compiler for psycopg2 correctly.
-Then export it as the corresponding LDFLAGS environment variable before pip install, for instance:
-
-    export LDFLAGS="-L/usr/local/Cellar/openssl@3/3.1.3/lib"
-
-B) Ubuntu Users:
-
-Make sure you have the following packages installed:
-
-- build-essential
-- libgdal-dev
-- libpq5
+Then export it as the corresponding `LDFLAGS` environment variable before running pip install, for instance:
+`export LDFLAGS="-L/usr/local/Cellar/openssl@3/3.1.3/lib"`
 
 #### Set environment variables
@@ -134,21 +128,37 @@ Then run migration
 
     alembic upgrade head
 
-### Run your import/merge/export
+### Running eCharm
+eCharm is run as a command line tool.
+Run `python main.py -h` to see the full list of command line options.
+
+Here are a few example commands for running tasks:
+
+#### Import and merge stations for all available countries:
 ```bash
-python main.py import merge export --countries de it
+python main.py import merge --delete_data
 ```
+Note that you have to configure API keys for certain data sources, as explained in the
+[set environment variables section](#set-environment-variables).
 
-Run `python main.py -h` to see the full list of command line options.
+We also recommend using the `--delete_data` flag to remove old data from the database before running import or merge
+tasks, since eCharm is not (yet) clever about updating data from consecutive imports or merges.
 
-Feel free to adjust the command line options to your needs:
-* main tasks
-  * `import` fetches and stores the data from the original sources, i.e. OSM, OCM and potential government data sources
-  * `merge` searches for duplicates and merges attributes of duplicate stations
-  * `export` create a data export for the specified countries in `csv` or `geo-json` format
-* `countries` Currently we support `de`,`gb`,`fr`, `it`, `nor` and `swe`
-* `offline` if not present (default), fetch data online from original data sources. If present, use files cached on disk
-* `delete_data` if present: For import, delete all existing stations data before import. For merge, it delete only merged station data and reset merge status of original stations
+#### Import and merge stations for Germany and Italy only:
+```bash
+python main.py import merge --countries de it --delete_data
+```
+Currently, we support `de`, `gb`, `fr`, `it`, `nor` and `swe` as country codes.
+
+#### Export all original (un-merged) station data for Germany in csv format:
+```bash
+python main.py export --countries de
+```
+
+#### Export merged stations in a 10 km radius around the Munich city center in GeoJSON format:
+```bash
+python main.py export --export_format GeoJSON --export_merged_stations --export_file_descriptor Munich --export_area 11.574774 48.1375526 10000.0
+```
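+
+#### Export merged stations for all supported countries into a single file:
+```bash
+python main.py export --export_all_countries --export_merged_stations
+```
+This example only combines options documented in `python main.py -h`; since `--export_format` defaults to csv, the result is one csv file covering all countries.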
 
 ## Contributing
diff --git a/alembic/versions/.gitkeep b/alembic/versions/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/charging_stations_pipelines/stations_data_export.py b/charging_stations_pipelines/stations_data_export.py
index b84a910..655a49c 100644
--- a/charging_stations_pipelines/stations_data_export.py
+++ b/charging_stations_pipelines/stations_data_export.py
@@ -1,45 +1,88 @@
+from dataclasses import dataclass
+from typing import Optional
+
+from charging_stations_pipelines import settings
 import logging
-
 import geopandas as gpd
 
-from charging_stations_pipelines import settings
-
 logger = logging.getLogger(__name__)
 
 
-def stations_data_export(db_connection, country_code: str, is_merged: bool = False, all_countries: bool = False,
-                         csv: bool = False):
-    select_by_country = f"country_code='{country_code}' AND "
-    if all_countries:
-        select_by_country = ""
-        country_code = "europe"
+@dataclass
+class ExportArea:
+    lon: float
+    lat: float
+    radius_meters: float
+
+
+def stations_data_export(db_connection,
+                         country_code: str,
+                         export_merged: bool = False,
+                         export_charging_attributes: bool = False,
+                         export_all_countries: bool = False,
+                         export_to_csv: bool = False,
+                         export_area: Optional[ExportArea] = None,
+                         file_descriptor: str = ""):
+    country_filter = f"country_code='{country_code}' AND " if country_code != "" and not export_all_countries else ""
+    merged_filter = "s.is_merged" if export_merged else "NOT s.is_merged"
+    export_area_filter = (f" AND ST_DWithin("
+                          f"point, "
+                          f"ST_SetSRID(ST_MakePoint({export_area.lon}, {export_area.lat}), 4326)::geography, "
+                          f"{export_area.radius_meters}"
+                          f")") \
+        if export_area else ""
 
-    select_merged = ""
-    file_suffix_merged = "merged"
-    if not is_merged:
-        select_merged = "NOT "
-        file_suffix_merged = "w_duplicates"
+    get_stations_filter = f"{country_filter}{merged_filter}{export_area_filter}"
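+    # For the README's Munich example (merged stations, no country filter, 10 km radius)
+    # this assembles the WHERE clause:
+    #   s.is_merged AND ST_DWithin(point,
+    #       ST_SetSRID(ST_MakePoint(11.574774, 48.1375526), 4326)::geography, 10000.0)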
 
     prefix = settings.db_table_prefix
-    get_stations_list_sql = f"""
-        SELECT s.id as station_id, point, data_source, operator, a.street, a.town FROM {prefix}stations s
+    if not export_charging_attributes:
+        get_stations_list_sql = f"""
+        SELECT s.id as station_id, point, data_source, operator, a.street, a.town
+        FROM {prefix}stations s
         LEFT JOIN {prefix}address a ON s.id = a.station_id
-        WHERE {select_by_country}{select_merged}s.is_merged
-    """
+        WHERE {get_stations_filter}
+        """
+    else:
+        get_stations_list_sql = f"""
+        SELECT s.id as station_id, point, data_source, operator,
+            a.street, a.town,
+            c.capacity, c.socket_type_list, c.dc_support, c.total_kw, c.max_kw
+        FROM {prefix}stations s
+        LEFT JOIN {prefix}address a ON s.id = a.station_id
+        LEFT JOIN {prefix}charging c ON s.id = c.station_id
+        WHERE {get_stations_filter}
+        """
 
-    gdf: gpd.GeoDataFrame = gpd.read_postgis(get_stations_list_sql,
-                                             con=db_connection, geom_col="point")
+    logger.debug(f"Running PostGIS query {get_stations_list_sql}")
+    gdf: gpd.GeoDataFrame = gpd.read_postgis(get_stations_list_sql, con=db_connection, geom_col="point")
 
-    if csv:
+    if export_to_csv:
         suffix = "csv"
         gdf['latitude'] = gdf['point'].apply(lambda point: point.y)
         gdf['longitude'] = gdf['point'].apply(lambda point: point.x)
-        json_data = gdf.to_csv()
+        export_data = gdf.to_csv()
     else:
         suffix = "geo.json"
-        json_data = gdf.to_json()
+        export_data = gdf.to_json()
+
+    file_country = "europe" if export_all_countries else country_code
+    file_description = get_file_description(file_descriptor, file_country, export_area)
+    file_suffix_merged = "merged" if export_merged else "w_duplicates"
+    file_suffix_charging = "_w_charging" if export_charging_attributes else ""
 
-    filename = f"stations_{country_code}_{file_suffix_merged}.{suffix}"
-    logger.info(f"writing {len(gdf)} stations to {filename}")
+    filename = f"stations_{file_description}_{file_suffix_merged}{file_suffix_charging}.{suffix}"
+    logger.info(f"Writing {len(gdf)} stations to {filename}")
     with open(filename, "w") as outfile:
-        outfile.write(json_data)
+        outfile.write(export_data)
+
+
+def get_file_description(file_descriptor: str, file_country: str, export_circle: Optional[ExportArea]):
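+    """Build the descriptor part of the export file name.
+
+    Examples, derived from the branches below: ("", "de", None) -> "de";
+    ("", "europe", area) -> "<lon>_<lat>_<radius>"; ("Munich", "de", area) -> "Munich";
+    ("Munich", "de", None) -> "Munich_de".
+    """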
+    is_export_circle_specified = export_circle is not None
+    if file_descriptor == "":
+        if is_export_circle_specified:
+            return f"{export_circle.lon}_{export_circle.lat}_{export_circle.radius_meters}"
+        else:
+            return file_country
+    else:
+        if is_export_circle_specified:
+            return file_descriptor
+        else:
+            return f"{file_descriptor}_{file_country}"
diff --git a/main.py b/main.py
index 49de710..2c2a28b 100644
--- a/main.py
+++ b/main.py
@@ -13,7 +13,7 @@ from charging_stations_pipelines.pipelines.pipeline_factory import pipeline_factory
 from charging_stations_pipelines.settings import db_uri
 from charging_stations_pipelines.shared import config
-from charging_stations_pipelines.stations_data_export import stations_data_export
+from charging_stations_pipelines.stations_data_export import stations_data_export, ExportArea
 from testing import testdata
 
 logger = logging.getLogger("charging_stations_pipelines.main")
@@ -22,12 +22,13 @@ def parse_args(args):
     valid_task_options = ["import", "merge", "export", "testdata"]
     valid_country_options = ["DE", "FR", "GB", "IT", "NOR", "SWE"]
+    valid_export_format_options = ["csv", "GeoJSON"]
 
     parser = argparse.ArgumentParser(
         description='eCharm can best be described as an electronic vehicle charging stations data integrator. '
                     'It imports data from different publicly available sources, converts it into a common format, '
                     'searches for duplicates in the different sources and merges the data (e.g. the attributes) '
-                    'and exports the original or merged data to csv or geo-json.',
+                    'and exports the original or merged data to csv or GeoJSON.',
         epilog='Example: python main.py import merge --countries de it -v ',
     )
@@ -42,17 +43,36 @@ def parse_args(args):
                         help='specifies the countries for which to perform the given tasks. '
                              'The country-codes must be one or several of %(choices)s (case-insensitive). '
                              'If not specified, the given tasks are run for all available countries')
-    parser.add_argument('-o', '--offline', action='store_true',
-                        help='if set, use data for import that is already present on disk, '
-                             'i.e. from previous runs of the import task. '
-                             'If not set, the data will be retrieved online from the different data sources.')
-    parser.add_argument('-d', '--delete_data', action='store_true',
-                        help='for the import task, delete all station data before importing. '
-                             'For the merge task, delete only merged station data and '
-                             'reset merge status of original stations.')
     parser.add_argument('-v', '--verbose', action='store_true',
                         help='makes eCharm verbose during operations. Useful for debugging.')
 
+    group_import_merge = parser.add_argument_group('import and merge options')
+    group_import_merge.add_argument('-o', '--offline', action='store_true',
+                                    help='use data for import that is already present on disk, '
+                                         'i.e. from previous runs of the import task. '
+                                         'The default is to retrieve data online from the different data sources.')
+    group_import_merge.add_argument('-d', '--delete_data', action='store_true',
+                                    help='for the import task, delete all station data before importing. '
+                                         'For the merge task, delete only merged station data and '
+                                         'reset merge status of original stations.')
+    group_export = parser.add_argument_group('export options')
+    group_export.add_argument('--export_file_descriptor', action='store', metavar='FILE_DESCRIPTOR',
+                              help='custom descriptor to be used for the export file name. Default is to use the '
+                                   'country code of the corresponding export.')
+    group_export.add_argument('--export_format', choices=valid_export_format_options, default='csv',
+                              help='specifies the format of the exported data. Default is csv format.')
+    group_export.add_argument('--export_charging', action='store_true',
+                              help='export additional charging attributes for stations. Default is to export only '
+                                   'geo-location, address and operator.')
+    group_export.add_argument('--export_merged_stations', action='store_true',
+                              help='export only the stations merged by the eCharm merge step. Default is to '
+                                   'export only the stations that were imported from the original data sources.')
+    group_export.add_argument('--export_all_countries', action='store_true',
+                              help='ignore the countries option for the export, and export station data of all '
+                                   'countries into one single file. Default is to export to one file per country.')
+    group_export.add_argument('--export_area', nargs=3, type=float, metavar=('LON', 'LAT', 'RADIUS_METERS'),
+                              help='export data in a circular area around the given lon/lat coordinates, '
+                                   'with the radius given in meters.')
+
     return parser.parse_args(args)
 
 
@@ -61,8 +81,9 @@ def get_db_engine(**kwargs):
     return create_engine(name_or_url=db_uri, connect_args=connect_args, **kwargs)
 
 
-def run_import(countries, online: bool, delete_data: bool):
+def run_import(countries: list[str], online: bool, delete_data: bool):
     if delete_data:
+        logger.debug("deleting all data ...")
         db_utils.delete_all_data(sessionmaker(bind=get_db_engine())())
 
     for country in countries:
@@ -77,11 +98,11 @@ def run_import(countries, online: bool, delete_data: bool):
         ocm.run()
 
 
-def run_merge(countries, delete_data: bool):
+def run_merge(countries: list[str], delete_data: bool):
     engine = get_db_engine(pool_pre_ping=True)
 
     if delete_data:
-        print("deleting merged data ...")
+        logger.debug("deleting merged data ...")
         db_utils.delete_all_merged_data(sessionmaker(bind=engine)())
 
     for country in countries:
@@ -89,9 +110,27 @@ def run_merge(countries, delete_data: bool):
         merger.run()
 
 
-def run_export(countries):
-    for country in countries:
-        stations_data_export(get_db_engine(), country, is_merged=False, csv=True, all_countries=False)
+def run_export(countries: list[str], export_merged: bool, export_charging: bool, export_csv: bool,
+               export_all_countries: bool, export_area: ExportArea, export_file_descriptor: str):
+    if export_all_countries or export_area:
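+        # --export_all_countries and --export_area both bypass the per-country loop
+        # below: the country filter is dropped and a single export file is written.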
+        stations_data_export(get_db_engine(),
+                             country_code="",
+                             export_merged=export_merged,
+                             export_charging_attributes=export_charging,
+                             export_to_csv=export_csv,
+                             export_all_countries=True,
+                             export_area=export_area,
+                             file_descriptor=export_file_descriptor)
+    else:
+        for country in countries:
+            stations_data_export(get_db_engine(),
+                                 country_code=country,
+                                 export_merged=export_merged,
+                                 export_charging_attributes=export_charging,
+                                 export_to_csv=export_csv,
+                                 export_all_countries=False,
+                                 export_area=None,
+                                 file_descriptor=export_file_descriptor)
 
 
 if __name__ == "__main__":
@@ -115,4 +154,17 @@ def run_export(countries):
         testdata.run()
 
     if task == "export":
-        run_export(cli_args.countries)
+        args_file_descriptor = cli_args.export_file_descriptor if cli_args.export_file_descriptor else ""
+        args_export_area = ExportArea(
+            lon=cli_args.export_area[0],
+            lat=cli_args.export_area[1],
+            radius_meters=cli_args.export_area[2],
+        ) if cli_args.export_area else None
+        run_export(countries=cli_args.countries,
+                   export_merged=cli_args.export_merged_stations,
+                   export_charging=cli_args.export_charging,
+                   export_csv=cli_args.export_format == "csv",
+                   export_all_countries=cli_args.export_all_countries,
+                   export_area=args_export_area,
+                   export_file_descriptor=args_file_descriptor,
+                   )