Commit

Feature/improve export (comsysto#49)
* extract class to update station table for import and improve console output

* set default log level to info, enable debug logging via -v cli option

* update log statements for station export

* [Breaking] use argparse module for cli option parsing

* revert root log level to warn

* restore "online" as default behaviour for import

* add additional command line options for export task

* add charging station capacity to output

* rename output variable

* add alembic version files to gitignore

* add gitkeep to alembic versions folder, so it is kept in git

* update readme for troubleshooting psycopg2 dependency on mac
mjmader authored Nov 9, 2023
1 parent 155e8f9 commit 048472f
Showing 5 changed files with 174 additions and 68 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -7,4 +7,5 @@ data/
stations_*.csv
stations_*.geo.json
testdata_merge.csv
token_deepatlas.json
token_deepatlas.json
alembic/versions/*.py
58 changes: 34 additions & 24 deletions README.md
@@ -65,23 +65,17 @@ source venv/bin/activate
pip install -r requirements.txt
```

**Trouble Shooting:**
##### Trouble Shooting

If pip install is failing during psycopg2 install:

A) Search for openssl lib path (`which openssl` may help)
1. Make sure you have the following packages installed on your system
- Ubuntu users: `build-essential`, `libgdal-dev`,`libpq5`
- Mac users: A postgres version needs to be installed, e.g. via `brew install postgresql`
2. If there are still errors, search for openssl lib path (`which openssl` may help)
and identify which one is needed for linking the compiler for psycopg2 correctly.
Then export it as the corresponding LDFLAGS environment variable before pip install, for instance:

export LDFLAGS="-L/usr/local/Cellar/openssl@3/3.1.3/lib"

B) Ubuntu Users:

Make sure you have the following packages installed:

- build-essential
- libgdal-dev
- libpq5
Then export it as the corresponding LDFLAGS environment variable before pip install, for instance,
`export LDFLAGS="-L/usr/local/Cellar/openssl@3/3.1.3/lib"`


#### Set environment variables
@@ -134,21 +128,37 @@ Then run migration

alembic upgrade head

### Run your import/merge/export
### Running eCharm
eCharm can be run similar to a command line tool.
Run `python main.py -h` to see the full list of command line options.

Here are a few example commands for running tasks:

#### Import and merge stations for all available countries:
```bash
python main.py import merge export --countries de it
python main.py import merge --delete_data
```
Note that you have to configure API-keys for certain data sources, as explained in the
[set environment variables section](#set-environment-variables).

Run `python main.py -h` to see the full list of command line options.
We also recommend to use the `--delete_data` flag to remove old data from the database before running import or merge
tasks, since eCharm is not (yet) clever with updating data from consecutive imports or merges.

Feel free to adjust the command line options to your needs:
* main tasks
* `import` fetches and stores the data from the original sources, i.e. OSM, OCM and potential government data sources
* `merge` searches for duplicates and merges attributes of duplicate stations
* `export` create a data export for the specified countries in `csv` or `geo-json` format
* `countries` Currently we support `de`,`gb`,`fr`, `it`, `nor` and `swe`
* `offline` if not present (default), fetch data online from original data sources. If present, use files cached on disk
* `delete_data` if present: For import, delete all existing stations data before import. For merge, it delete only merged station data and reset merge status of original stations
#### Import and merge stations for Germany and Italy only:
```bash
python main.py import merge --countries de it --delete_data
```
Currently, we support `de`,`gb`,`fr`, `it`, `nor` and `swe` as country codes.

#### Export all original (un-merged) station data for Germany in csv format:
```bash
python main.py export --countries de
```

#### Export merged stations in a 10km radius around the Munich city center in GeoJSON format:
```bash
python main.py export --export_format GeoJSON --export_merged_stations --export_file_descriptor Munich --export_area 11.574774 48.1375526 10000.0
```


## Contributing
Empty file added alembic/versions/.gitkeep
Empty file.
95 changes: 69 additions & 26 deletions charging_stations_pipelines/stations_data_export.py
@@ -1,45 +1,88 @@
from dataclasses import dataclass
from charging_stations_pipelines import settings
import logging

import geopandas as gpd

from charging_stations_pipelines import settings

logger = logging.getLogger(__name__)


def stations_data_export(db_connection, country_code: str, is_merged: bool = False, all_countries: bool = False,
csv: bool = False):
select_by_country = f"country_code='{country_code}' AND "
if all_countries:
select_by_country = ""
country_code = "europe"
@dataclass
class ExportArea:
lon: float
lat: float
radius_meters: float


def stations_data_export(db_connection,
country_code: str,
export_merged: bool = False,
export_charging_attributes: bool = False,
export_all_countries: bool = False,
export_to_csv: bool = False,
export_area: ExportArea = None,
file_descriptor: str = ""):
country_filter = f"country_code='{country_code}' AND " if country_code != "" and not export_all_countries else ""
merged_filter = "s.is_merged" if export_merged else "NOT s.is_merged"
export_area_filter = (f" AND ST_Dwithin("
f"point, "
f"ST_MakePoint({export_area.lon}, {export_area.lat}, 4326)::geography, "
f"{export_area.radius_meters}"
f")") \
if export_area else ""

select_merged = ""
file_suffix_merged = "merged"
if not is_merged:
select_merged = "NOT "
file_suffix_merged = "w_duplicates"
get_stations_filter = f"{country_filter}{merged_filter}{export_area_filter}"

prefix = settings.db_table_prefix
get_stations_list_sql = f"""
SELECT s.id as station_id, point, data_source, operator, a.street, a.town FROM {prefix}stations s
if not export_charging_attributes:
get_stations_list_sql = f"""
SELECT s.id as station_id, point, data_source, operator, a.street, a.town
FROM {prefix}stations s
LEFT JOIN {prefix}address a ON s.id = a.station_id
WHERE {select_by_country}{select_merged}s.is_merged
"""
WHERE {get_stations_filter}
"""
else:
get_stations_list_sql = f"""
SELECT s.id as station_id, point, data_source, operator,
a.street, a.town,
c.capacity, c.socket_type_list, c.dc_support, c.total_kw, c.max_kw
FROM {prefix}stations s
LEFT JOIN {prefix}address a ON s.id = a.station_id
LEFT JOIN {prefix}charging c ON s.id = c.station_id
WHERE {get_stations_filter}
"""

gdf: gpd.GeoDataFrame = gpd.read_postgis(get_stations_list_sql,
con=db_connection, geom_col="point")
logger.debug(f"Running postgis query {get_stations_list_sql}")
gdf: gpd.GeoDataFrame = gpd.read_postgis(get_stations_list_sql, con=db_connection, geom_col="point")

if csv:
if export_to_csv:
suffix = "csv"
gdf['latitude'] = gdf['point'].apply(lambda point: point.y)
gdf['longitude'] = gdf['point'].apply(lambda point: point.x)
json_data = gdf.to_csv()
export_data = gdf.to_csv()
else:
suffix = "geo.json"
json_data = gdf.to_json()
export_data = gdf.to_json()

file_country = "europe" if export_all_countries else country_code
file_description = get_file_description(file_descriptor, file_country, export_area)
file_suffix_merged = "merged" if export_merged else "w_duplicates"
file_suffix_charging = "_w_charging" if export_charging_attributes else ""

filename = f"stations_{country_code}_{file_suffix_merged}.{suffix}"
logger.info(f"writing {len(gdf)} stations to {filename}")
filename = f"stations_{file_description}_{file_suffix_merged}{file_suffix_charging}.{suffix}"
logger.info(f"Writing {len(gdf)} stations to {filename}")
with open(filename, "w") as outfile:
outfile.write(json_data)
outfile.write(export_data)
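
Review note on the area filter above: in PostGIS, the third argument of `ST_MakePoint` is a Z coordinate, not an SRID, so `ST_MakePoint(lon, lat, 4326)` builds a 3D point with z = 4326. The filter and the country code are also interpolated directly into the SQL string. The following is a minimal sketch of one possible alternative, assuming `ST_SetSRID(ST_MakePoint(lon, lat), 4326)` and named bind parameters in SQLAlchemy's `:name` style. The helper `build_stations_filter` is hypothetical and not part of this commit.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ExportArea:
    lon: float
    lat: float
    radius_meters: float


def build_stations_filter(country_code: str,
                          export_merged: bool,
                          export_all_countries: bool,
                          export_area: Optional[ExportArea] = None):
    """Hypothetical helper: return (where_clause, params) for the stations query.

    Values are kept out of the SQL text and returned as bind parameters instead.
    """
    conditions = []
    params = {}

    # Same rule as in the commit: filter by country unless exporting everything.
    if country_code != "" and not export_all_countries:
        conditions.append("country_code = :country_code")
        params["country_code"] = country_code

    conditions.append("s.is_merged" if export_merged else "NOT s.is_merged")

    if export_area is not None:
        # Assumption: set the SRID explicitly instead of passing 4326 as a
        # third (Z) coordinate to ST_MakePoint.
        conditions.append(
            "ST_DWithin(point, "
            "ST_SetSRID(ST_MakePoint(:lon, :lat), 4326)::geography, "
            ":radius_meters)"
        )
        params.update(lon=export_area.lon,
                      lat=export_area.lat,
                      radius_meters=export_area.radius_meters)

    return " AND ".join(conditions), params
```

The returned clause and dictionary could then presumably be passed to the query via `sqlalchemy.text(...)` and the `params` argument of `geopandas.read_postgis`; whether that fits the project's database setup is untested here.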


def get_file_description(file_descriptor: str, file_country: str, export_circle: ExportArea):
is_export_circle_specified = export_circle is not None
if file_descriptor == "":
if is_export_circle_specified:
return f"{export_circle.lon}_{export_circle.lat}_{export_circle.radius_meters}"
else:
return file_country
else:
if is_export_circle_specified:
return file_descriptor
else:
return f"{file_descriptor}_{file_country}"
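
Review note: the file names produced by the new export code depend on four inputs (descriptor, country, area, flags). The sketch below replays that naming logic in isolation so the resulting names can be checked. `get_file_description` is copied from the diff above; `build_export_filename` is a hypothetical wrapper around the inline f-string in `stations_data_export`, and the upper-case country codes are an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ExportArea:
    lon: float
    lat: float
    radius_meters: float


def get_file_description(file_descriptor: str, file_country: str,
                         export_circle: Optional[ExportArea]) -> str:
    # Same branching as in the commit, written out flat.
    if file_descriptor == "":
        if export_circle is not None:
            return f"{export_circle.lon}_{export_circle.lat}_{export_circle.radius_meters}"
        return file_country
    if export_circle is not None:
        return file_descriptor
    return f"{file_descriptor}_{file_country}"


def build_export_filename(file_descriptor: str, file_country: str,
                          export_area: Optional[ExportArea],
                          export_merged: bool, export_charging: bool,
                          export_to_csv: bool) -> str:
    """Hypothetical helper mirroring the file name assembly in the commit."""
    description = get_file_description(file_descriptor, file_country, export_area)
    suffix_merged = "merged" if export_merged else "w_duplicates"
    suffix_charging = "_w_charging" if export_charging else ""
    extension = "csv" if export_to_csv else "geo.json"
    return f"stations_{description}_{suffix_merged}{suffix_charging}.{extension}"


munich = ExportArea(lon=11.574774, lat=48.1375526, radius_meters=10000.0)

# Plain per-country CSV export of un-merged stations.
print(build_export_filename("", "DE", None, False, False, True))
# -> stations_DE_w_duplicates.csv

# The Munich example from the README: descriptor plus area, merged, GeoJSON.
print(build_export_filename("Munich", "europe", munich, True, False, False))
# -> stations_Munich_merged.geo.json
```

Note that when both a descriptor and an area are given, the country is dropped from the name, and without a descriptor the area coordinates themselves become part of the file name.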
86 changes: 69 additions & 17 deletions main.py
@@ -13,7 +13,7 @@
from charging_stations_pipelines.pipelines.pipeline_factory import pipeline_factory
from charging_stations_pipelines.settings import db_uri
from charging_stations_pipelines.shared import config
from charging_stations_pipelines.stations_data_export import stations_data_export
from charging_stations_pipelines.stations_data_export import stations_data_export, ExportArea
from testing import testdata

logger = logging.getLogger("charging_stations_pipelines.main")
@@ -22,12 +22,13 @@
def parse_args(args):
valid_task_options = ["import", "merge", "export", "testdata"]
valid_country_options = ["DE", "FR", "GB", "IT", "NOR", "SWE"]
valid_export_format_options = ["csv", "GeoJSON"]

parser = argparse.ArgumentParser(
description='eCharm can best be described as an electronic vehicle charging stations data integrator. '
'It imports data from different publicly available sources, converts it into a common format, '
'searches for duplicates in the different sources and merges the data (e.g. the attributes) '
'and exports the original or merged data to csv or geo-json.',
'and exports the original or merged data to csv or GeoJSON.',
epilog='Example: python main.py import merge --countries de it -v ',
)

@@ -42,17 +43,36 @@ def parse_args(args):
help='specifies the countries for which to perform the given tasks. '
'The country-codes must be one or several of %(choices)s (case-insensitive). '
'If not specified, the given tasks are run for all available countries')
parser.add_argument('-o', '--offline', action='store_true',
help='if set, use data for import that is already present on disk, '
'i.e. from previous runs of the import task. '
'If not set, the data will be retrieved online from the different data sources.')
parser.add_argument('-d', '--delete_data', action='store_true',
help='for the import task, delete all station data before importing. '
'For the merge task, delete only merged station data and '
'reset merge status of original stations.')
parser.add_argument('-v', '--verbose', action='store_true',
help='makes eCharm verbose during operations. Useful for debugging.')

group_import_merge = parser.add_argument_group('import and merge options')
group_import_merge.add_argument('-o', '--offline', action='store_true',
help='use data for import that is already present on disk, '
'i.e. from previous runs of the import task. '
'The default is to retrieve data online from the different data sources.')
group_import_merge.add_argument('-d', '--delete_data', action='store_true',
help='for the import task, delete all station data before importing. '
'For the merge task, delete only merged station data and '
'reset merge status of original stations.')
group_export = parser.add_argument_group('export options')
group_export.add_argument('--export_file_descriptor', action='store', metavar='<file descriptor>',
help='custom descriptor to be used for export file name. Default is to use the country '
'code of the corresponding export.')
group_export.add_argument('--export_format', choices=valid_export_format_options, default='csv',
help='specifies the format of exported data. Default is csv format.')
group_export.add_argument('--export_charging', action='store_true',
help='export additional charging attributes for stations. Default is to export only '
'geo-location, address and operator.')
group_export.add_argument('--export_merged_stations', action='store_true',
help='export only the stations merged by the eCharm merge step. Default is to '
'export only the stations that were imported from original datasources.')
group_export.add_argument('--export_all_countries', action='store_true',
help='ignore the countries option for the export, and export all countries '
'station data into one single file. Default is to export to one file per country.')
group_export.add_argument('--export_area', nargs=3, type=float, metavar=('<lon>', '<lat>', '<radius in m>'),
help='export data in a circular area around the given lon/lat coordinates.')

return parser.parse_args(args)


@@ -61,8 +81,9 @@ def get_db_engine(**kwargs):
return create_engine(name_or_url=db_uri, connect_args=connect_args, **kwargs)


def run_import(countries, online: bool, delete_data: bool):
def run_import(countries: list[str], online: bool, delete_data: bool):
if delete_data:
logger.debug("deleting all data ...")
db_utils.delete_all_data(sessionmaker(bind=get_db_engine())())

for country in countries:
@@ -77,21 +98,39 @@ def run_import(countries, online: bool, delete_data: bool):
ocm.run()


def run_merge(countries, delete_data: bool):
def run_merge(countries: list[str], delete_data: bool):
engine = get_db_engine(pool_pre_ping=True)

if delete_data:
print("deleting merged data ...")
logger.debug("deleting merged data ...")
db_utils.delete_all_merged_data(sessionmaker(bind=engine)())

for country in countries:
merger: StationMerger = StationMerger(country_code=country, config=config, db_engine=engine)
merger.run()


def run_export(countries):
for country in countries:
stations_data_export(get_db_engine(), country, is_merged=False, csv=True, all_countries=False)
def run_export(countries: list[str], export_merged: bool, export_charging: bool, export_csv: bool,
export_all_countries: bool, export_area: ExportArea, export_file_descriptor: str):
if export_all_countries or export_area:
stations_data_export(get_db_engine(),
country_code="",
export_merged=export_merged,
export_charging_attributes=export_charging,
export_to_csv=export_csv,
export_all_countries=True,
export_area=export_area,
file_descriptor=export_file_descriptor)
else:
for country in countries:
stations_data_export(get_db_engine(),
country_code=country,
export_merged=export_merged,
export_charging_attributes=export_charging,
export_to_csv=export_csv,
export_all_countries=False,
export_area=None,
file_descriptor=export_file_descriptor)


if __name__ == "__main__":
@@ -115,4 +154,17 @@ def run_export(countries):
testdata.run()

if task == "export":
run_export(cli_args.countries)
args_file_descriptor = cli_args.export_file_descriptor if cli_args.export_file_descriptor else ""
args_export_area = ExportArea(
lon=cli_args.export_area[0],
lat=cli_args.export_area[1],
radius_meters=cli_args.export_area[2],
) if cli_args.export_area else None
run_export(countries=cli_args.countries,
export_merged=cli_args.export_merged_stations,
export_charging=cli_args.export_charging,
export_csv=cli_args.export_format == "csv",
export_all_countries=cli_args.export_all_countries,
export_area=args_export_area,
export_file_descriptor=args_file_descriptor,
)
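
Review note: the `--export_area` handling relies on `argparse` collecting exactly three floats (`nargs=3, type=float`) and on the order longitude, latitude, radius. A minimal standalone sketch of that parsing step follows, assuming the same option definition as in the diff. The function `parse_export_area` is hypothetical and only isolates the conversion to `ExportArea`.

```python
import argparse
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ExportArea:
    lon: float
    lat: float
    radius_meters: float


def parse_export_area(argv: List[str]) -> Optional[ExportArea]:
    """Hypothetical helper: parse only --export_area and build an ExportArea."""
    parser = argparse.ArgumentParser()
    # nargs=3 with type=float yields a list of three floats,
    # or None when the option is absent.
    parser.add_argument('--export_area', nargs=3, type=float,
                        metavar=('<lon>', '<lat>', '<radius in m>'),
                        help='export data in a circular area around the given lon/lat coordinates.')
    args = parser.parse_args(argv)

    if not args.export_area:
        return None

    # Order matters: longitude first, then latitude, then the radius in meters.
    lon, lat, radius_meters = args.export_area
    return ExportArea(lon=lon, lat=lat, radius_meters=radius_meters)


# The Munich example from the README.
area = parse_export_area(['--export_area', '11.574774', '48.1375526', '10000.0'])
print(area)
```

Since the values are positional within the option, swapping longitude and latitude on the command line is not detected by the parser; a range check on the two coordinates would be one way to catch that.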
