diff --git a/.github/workflows/pr-ci.yml b/.github/workflows/pr-ci.yml index a2aecaa..d26a9f3 100755 --- a/.github/workflows/pr-ci.yml +++ b/.github/workflows/pr-ci.yml @@ -16,7 +16,7 @@ jobs: test: strategy: matrix: - os: [windows-latest, ubuntu-latest, macos-latest] + os: [ubuntu-latest, macos-latest] py3version: ["9", "11"] fail-fast: false uses: arup-group/actions-city-modelling-lab/.github/workflows/python-install-lint-test.yml@main diff --git a/.gitignore b/.gitignore index 06fd923..56c75cc 100755 --- a/.gitignore +++ b/.gitignore @@ -37,4 +37,8 @@ reports/ mike-*.yml # Jupyter notebooks -.ipynb_checkpoints \ No newline at end of file +.ipynb_checkpoints + +sandbox.py +tests/test_data/outputs/ +tests/test_data/outputs/*log \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e2a5dc..7e5d2c2 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,7 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Removed -## [v0.1.0] - 2023-11-28 +## [v0.1.0] - 2023-12-13 Initial release. diff --git a/README.md b/README.md index db9a868..87cfe61 100755 --- a/README.md +++ b/README.md @@ -2,11 +2,15 @@ ![gtfs_skims](resources/logos/title.png) -# gtfs-skims (gtfs_skims) +# Argo (gtfs_skims) [![Daily CI Build](https://github.com/arup-group/gtfs_skims/actions/workflows/daily-scheduled-ci.yml/badge.svg)](https://github.com/arup-group/gtfs_skims/actions/workflows/daily-scheduled-ci.yml) [![Documentation](https://github.com/arup-group/gtfs_skims/actions/workflows/pages/pages-build-deployment/badge.svg?branch=gh-pages)](https://arup-group.github.io/gtfs_skims) +Argo is a library aimed at the fast calculation of generalised time matrices from GTFS files. +By applying appropriate simplifications to the GTFS dataset, the library is able to calculate such matrices at scale. +For example, it was possible to calculate an MSOA-to-MSOA matrix for England and Wales in ~1 hour (with a relatively large machine). + ## Documentation diff --git a/docs/installation.md b/docs/installation.md index 3c1d127..876e1f6 100755 --- a/docs/installation.md +++ b/docs/installation.md @@ -1,6 +1,8 @@ # Installation +Note: this library only supports Unix-based systems (i.e. Ubuntu/macOS). If you wish to use it on Windows, please use the Windows Subsystem for Linux. + ## Setting up a user environment As a `gtfs_skims` user, it is easiest to install using the [mamba](https://mamba.readthedocs.io/en/latest/index.html) package manager, as follows: diff --git a/docs/methodology.md b/docs/methodology.md new file mode 100644 index 0000000..d8261e7 --- /dev/null +++ b/docs/methodology.md @@ -0,0 +1,37 @@ +# Methodology + +Argo calculates generalised time matrices between a set of origin and destination points. + +Generalised time is defined as follows: + +$$ +gc = ivt + \beta_{wait} \cdot wait\_time + \beta_{walk} \cdot walk\_time + \beta_{interchange\_penalty} \cdot n\_transfers +$$ + +Some example values for the leg component weights are: + +$$ +\beta_{wait} = \beta_{walk} = 2 \text{ to } 3 +$$ + +and + +$$ +\beta_{\text{interchange\_penalty}} = 5 \text{ to } 10 \text{ minutes} +$$ + +Walk distance is calculated as the crow's fly distance between two points, multiplied by a factor specified in the config file (typically ~1.3). + +The library creates a graph representation of the GTFS dataset, where the edges represent vehicle movements or connections (access/egress/transfer legs). It then applies a shortest-paths algorithm, using generalised time as edge weights. 
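To make the weighting concrete, here is a small illustrative calculation (the weights are the example values above; the leg times are invented for demonstration):

```
# Illustrative generalised-time calculation, using the example weights above.
ivt = 600          # 10 min in-vehicle, in seconds
walk_time = 240    # 4 min walking
wait_time = 180    # 3 min waiting
n_transfers = 1

weight_walk = weight_wait = 2
penalty_interchange = 300  # seconds per interchange (5 minutes)

gc = ivt + weight_wait * wait_time + weight_walk * walk_time + penalty_interchange * n_transfers
print(gc)  # 600 + 360 + 480 + 300 = 1740 seconds (29 minutes)
```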
+ +To achieve high performance, the user can limit the search space by: +* selecting a time scope and maximum travel time +* selecting a specific day +* selecting a maximum walk, wait and transfer time for legs +* applying a spatial bounding box + +We further improve performance by: +* using K-dimensional trees to organise spatial data +* using the efficient graph-tool library to calculate shortest distances +* parallelising the shortest distances calculation, and vectorising data transformation tasks +* saving files to a compressed parquet format \ No newline at end of file diff --git a/docs/run.md b/docs/run.md new file mode 100644 index 0000000..bfe1eb6 --- /dev/null +++ b/docs/run.md @@ -0,0 +1,48 @@ +# Running Argo + +To run Argo, simply type this command on the command line: +``` +argo run <config_path> +``` +where `<config_path>` is the path to the config YAML file. + +An example config file is shown below: +``` +paths: + path_gtfs: ./tests/test_data/iow-bus-gtfs.zip + path_outputs: ./tests/test_data/outputs + path_origins: ./tests/test_data/centroids.csv # path to the origin points + path_destinations: ./tests/test_data/centroids.csv # path to the destination points + +settings: + calendar_date : 20190515 # yyyymmdd | Date for filtering the GTFS file. + start_s : 32400 # sec | Start time of the journey. + end_s : 41400 # sec | Max end time of a journey. + walk_distance_threshold : 2000 # m | Max walk distance in a leg + walk_speed : 4.5 # kph | Walking speed + crows_fly_factor : 1.3 # Conversion factor from euclidean to routed distances + max_transfer_time : 1800 # Max combined time of walking and waiting (sec) of a transfer + max_wait : 1800 # sec | Max wait time at a stop / leg + bounding_box : null + epsg_centroids: 27700 # coordinate system of the centroids file. Needs to be Cartesian and in meters. + weight_walk: 2 # value of walk time, ratio to in-vehicle time + weight_wait: 2 # value of wait time, ratio to in-vehicle time + penalty_interchange: 300 # seconds added to generalised cost for each interchange + +steps: + - preprocessing + - connectors + - graph +``` + +To run the example provided by the repo, use: +``` +argo run ./tests/test_data/config_demo.yaml +``` + +The time matrices will be saved in the `path_outputs` directory defined in the config file, in the `skims.parquet.gzip` file. Unreachable origin-destination pairs are reported as infinity, and intra-zonal values as NaN. 
An easy way to read the file is with pandas: +``` +import pandas as pd +df = pd.read_parquet('<path_outputs>/skims.parquet.gzip') +df +``` \ No newline at end of file diff --git a/gtfs_skims/__init__.py b/gtfs_skims/__init__.py index 24d1f27..36ba3aa 100755 --- a/gtfs_skims/__init__.py +++ b/gtfs_skims/__init__.py @@ -1,5 +1,8 @@ """Top-level module for gtfs_skims.""" +import pyproj __author__ = """Theodore-Chatziioannou""" __email__ = "Theodore.Chatziioannou@arup.com" __version__ = "0.1.0" + +pyproj.network.set_network_enabled(False) diff --git a/gtfs_skims/cli.py b/gtfs_skims/cli.py index 0707407..e170fb0 100755 --- a/gtfs_skims/cli.py +++ b/gtfs_skims/cli.py @@ -1,14 +1,39 @@ """Console script for gtfs_skims.""" +from typing import Optional + import click +from gtfs_skims.connectors import main as main_connectors +from gtfs_skims.graph import main as main_graph +from gtfs_skims.preprocessing import main as main_preprocessing +from gtfs_skims.utils import Config + @click.version_option(package_name="gtfs_skims") -@click.command() +@click.group def cli(args=None): - """Console script for gtfs_skims.""" - click.echo( - "Replace this message by putting your code into gtfs_skims.cli.cli" - ) - click.echo("See click documentation at https://click.palletsprojects.com/") + """Console script for Argo (gtfs_skims).""" return 0 + + +@cli.command() +@click.argument("config_path") +@click.option("--output_directory_override", default=None, help="override output directory") +def run(config_path: str, output_directory_override: Optional[str] = None): + config = Config.from_yaml(config_path) + if output_directory_override is not None: + config.path_outputs = output_directory_override + steps = config.steps + + gtfs_data = None + connectors_data = None + + if "preprocessing" in steps: + gtfs_data = main_preprocessing(config=config) + + if "connectors" in steps: + connectors_data = main_connectors(config=config, data=gtfs_data) + + if "graph" in steps: + main_graph(config=config, gtfs_data=gtfs_data, connectors_data=connectors_data) diff --git a/gtfs_skims/connectors.py b/gtfs_skims/connectors.py new file mode 100644 index 0000000..77d574c --- /dev/null +++ b/gtfs_skims/connectors.py @@ -0,0 +1,414 @@ +from __future__ import annotations + +import os +from functools import cached_property +from typing import Optional + +import numpy as np +import pandas as pd +from scipy.spatial import KDTree + +from gtfs_skims.utils import Config, ConnectorsData, GTFSData, get_logger +from gtfs_skims.variables import DATA_TYPE + + +def query_pairs(coords: np.ndarray, radius: float) -> np.array: + """Get origin-destination pairs between points, within a radius. + The connections are forward-looking in z: i.e. the destination point + always has a greater z coordinate than the origin point. + + Args: + coords (np.ndarray): Point coordinates (x, y, z) + radius (float): Maximum distance between points + + Returns: + np.array: Feasible connections between points. + """ + ids = coords[:, 2].argsort() + + dtree = KDTree(coords[ids]) + connectors = dtree.query_pairs(r=radius, output_type="ndarray", p=2) + + return ids[connectors] + + +class TransferConnectors: + def __init__(self, coords: np.ndarray, max_transfer_distance: float) -> None: + """Manages transfer connectors. 
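+ Stop-time events are points in (x, y, z) space, with z encoding departure time, so candidate transfers are forward-looking in z (see query_pairs).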
+ + Args: + coords (np.ndarray): Point coordinates (x, y, z) + max_transfer_distance (float): Maximum distance between points + """ + self.coords = coords + radius = max_transfer_distance * (2**0.5) + self.ods = query_pairs(coords, radius=radius) + + @cached_property + def ocoords(self) -> np.array: + """Origin coordinates. + + Returns: + np.array: x, y, z + """ + return self.coords[self.ods[:, 0]] + + @cached_property + def dcoords(self) -> np.array: + """Destination coordinates. + + Returns: + np.array: x, y, z + """ + return self.coords[self.ods[:, 1]] + + @cached_property + def walk(self) -> np.array: + """Walk distance (euclidean). + + Returns: + np.array: Distance from origin to destination point (on the xy axis). + """ + walk = ((self.dcoords[:, :2] - self.ocoords[:, :2]) ** 2).sum(1) ** 0.5 + return walk + + @cached_property + def wait(self) -> np.array: + """Wait distance. It is calculated as the difference in z between the two points (dz), + minus the distance required to walk to the destination. + + Returns: + np.array: Wait distance. + """ + wait = self.dcoords[:, 2] - self.ocoords[:, 2] - self.walk + return wait + + def filter(self, cond: np.ndarray[bool]) -> TransferConnectors: + """Filter (in-place) the connectors' origin-destination data based on a set of conditions. + + Args: + cond (np.ndarray[bool]): The boolean condition filter to use. + + Returns: + TransferConnectors: The filtered object (self). + """ + ods = self.ods + ocoords = self.ocoords + dcoords = self.dcoords + walk = self.walk + wait = self.wait + + self.ods = ods[cond] + self.ocoords = ocoords[cond] + self.dcoords = dcoords[cond] + self.walk = walk[cond] + self.wait = wait[cond] + + return self + + def filter_feasible_transfer(self, maxdist: float) -> None: + """Remove any connections with insufficient transfer time. + + Args: + maxdist (float): Maximum transfer distance (walk+wait) + """ + is_feasible = (self.wait > 0) & ((self.walk + self.wait) <= maxdist) + self.filter(is_feasible) + + def filter_max_walk(self, max_walk: float) -> None: + """Remove any connections beyond a walk-distance threshold. + + Args: + max_walk (float): Max walk distance + """ + cond = self.walk <= max_walk + self.filter(cond) + + def filter_max_wait(self, max_wait: float) -> None: + """Remove any connections beyond a wait-distance threshold. + + Args: + max_wait (float): Maximum stop (leg) wait time. + """ + self.filter(self.wait <= max_wait) + + def filter_same_route(self, routes: np.ndarray) -> None: + """Remove connections between services of the same route. + + Args: + routes (np.array): Route IDs array. Its indexing matches the self.coords table. + """ + self.filter(routes[self.ods[:, 0]] != routes[self.ods[:, 1]]) + + def filter_nearest_service(self, services: np.ndarray) -> None: + """If a service can be accessed from an origin through multiple stops, + then only keep the most efficient transfer for that connection. + + Args: + services (np.array): Service IDs array. Its indexing must match the self.coords table. 
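+ + Connections are ranked by combined transfer distance (walk + wait), and only the most efficient connection per origin-service combination is kept.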
+ """ + services_d = services[self.ods[:, 1]] # destination service + + # sort by trasfer distance + transfer = self.wait + self.walk + idx_sorted = transfer.argsort() + + # create origin-service combinations + order_o = int(np.floor(np.log10(services.max())) + 1) + comb = (self.ods[:, 0] + 1) * 10**order_o + services_d + + # get first instance of each origin-service combination + # (which corresponds to the most efficient transfer) + keep = idx_sorted[np.unique(comb[idx_sorted], return_index=True)[1]] + cond = np.isin(np.arange(len(comb)), keep) + + self.filter(cond) + + +def query_pairs_od( + coords_origins: np.ndarray, coords_destinations: np.ndarray, radius: float +) -> np.array: + """Get origin-destination pairs between points, within a radius. + + Args: + coords_origins (np.array): Coordinates of origin points + coords_destinations (np.array): Coordinates of destination points + radius (float): Maximum distance between points + + Returns: + np.array: Feasible connections between points. + """ + tree_origins = KDTree(coords_origins) + tree_destinations = KDTree(coords_destinations) + + ods = tree_origins.query_ball_tree(tree_destinations, r=radius) + + # flatten + ods = np.column_stack( + [np.repeat(range(len(coords_origins)), list(map(len, ods))), np.concatenate(ods)] + ).astype(DATA_TYPE) + + return ods + + +class AccessEgressConnectors(TransferConnectors): + """Connections between zones/endpoints and stops""" + + def __init__( + self, + coords_origins: np.ndarray, + coords_destinations: np.ndarray, + max_transfer_distance: float, + ) -> None: + self.coords_origins = coords_origins + self.coords_destinations = coords_destinations + + radius = max_transfer_distance + if coords_origins.shape[1] == 3: + radius += max_transfer_distance * (2**0.5) + + self.ods = query_pairs_od(coords_origins, coords_destinations, radius=radius) + + @cached_property + def ocoords(self) -> np.array: + """Origin coordinates. + + Returns: + np.array: x, y (, z) + """ + return self.coords_origins[self.ods[:, 0]] + + @cached_property + def dcoords(self) -> np.array: + """Destination coordinates. + + Returns: + np.array: x, y (,z) + """ + return self.coords_destinations[self.ods[:, 1]] + + +def get_transfer_connectors(data: GTFSData, config: Config) -> np.array: + """Get all transfer connectors (between stops). + + Args: + data (GTFSData): GTFS data object. + config (Config): Config object. 
+ + Returns: + np.ndarray: [origin id, destination id, walk time, wait time] + """ + time_to_distance = config.walk_speed / 3.6 # kph to m/s (multiply seconds by this to get metres walked) + max_transfer_distance = config.max_transfer_time * time_to_distance + max_wait_distance = config.max_wait * time_to_distance + + # get candidate connectors + coords = data.stop_times[["x", "y", "departure_s"]].values + coords[:, :2] = coords[:, :2] * config.crows_fly_factor # crow's fly transformation + tc = TransferConnectors(coords, max_transfer_distance) + + # apply narrower filters: + # enough time to make transfer + tc.filter_feasible_transfer(max_transfer_distance) + + # maximum walk + if config.walk_distance_threshold < max_transfer_distance: + tc.filter_max_walk(config.walk_distance_threshold) + + # maximum wait + if max_wait_distance < max_transfer_distance: + tc.filter_max_wait(max_wait_distance) + + # not same route + routes = data.stop_times["trip_id"].map(data.trips.set_index("trip_id")["route_id"]).values + tc.filter_same_route(routes) + + # most efficient transfer to service + services = data.stop_times["trip_id"].map(data.trips.set_index("trip_id")["service_id"]).values + tc.filter_nearest_service(services) + + # construct array + arr = ( + np.concatenate( + [ + tc.ods, # origin and destination index + (tc.walk / time_to_distance).reshape(-1, 1), # walk time (seconds) + (tc.wait / time_to_distance).reshape(-1, 1), # wait time (seconds) + ], + axis=1, + ) + .round(1) + .astype(DATA_TYPE) + ) + + return arr + + +def get_access_connectors(data: GTFSData, config: Config, origins: pd.DataFrame) -> np.ndarray: + """Get all access connectors (between origins and stops). + + Args: + data (GTFSData): GTFS data object. + config (Config): Config object. + origins (pd.DataFrame): Origin coordinates dataframe. + Must include 'x' and 'y' columns, providing the cartesian coordinates of the trip start points. + + Returns: + np.ndarray: [origin id, destination id, walk time, wait time] + """ + time_to_distance = config.walk_speed / 3.6 # kph to m/s (multiply seconds by this to get metres walked) + max_transfer_distance = config.max_transfer_time * time_to_distance + max_wait_distance = config.max_wait * time_to_distance + + # get candidate connectors + coords_stops = data.stop_times[["x", "y", "departure_s"]].values + coords_stops[:, :2] = coords_stops[:, :2] * config.crows_fly_factor # crow's fly transformation + coords_origins = (origins[["x", "y"]] * config.crows_fly_factor).assign(z=config.start_s).values + + ac = AccessEgressConnectors(coords_origins, coords_stops, max_transfer_distance) + + # narrower filtering + ac.filter_feasible_transfer(max_transfer_distance) + if config.walk_distance_threshold < max_transfer_distance: + ac.filter_max_walk(config.walk_distance_threshold) + if max_wait_distance < max_transfer_distance: + ac.filter_max_wait(max_wait_distance) + + arr = ( + np.concatenate( + [ + ac.ods, # origin and destination index + (ac.walk / time_to_distance).reshape(-1, 1), # walk time (seconds) + (ac.wait / time_to_distance).reshape(-1, 1), # wait time (seconds) + ], + axis=1, + ) + .round(1) + .astype(DATA_TYPE) + ) + + return arr + + +def get_egress_connectors(data: GTFSData, config: Config, destinations: pd.DataFrame) -> np.ndarray: + """Get all egress connectors (between stops and destinations). + + Args: + data (GTFSData): GTFS data object. + config (Config): Config object. + destinations (pd.DataFrame): Destination coordinates dataframe. + Must include 'x' and 'y' columns, providing the cartesian coordinates of the trip ends. 
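+ Egress connectors are assigned a zero wait time; only the walk component contributes to their cost.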
+ + Returns: + np.ndarray: [origin id, destination id, walk time, wait time] + """ + time_to_distance = config.walk_speed / 3.6 # kph to m/s (multiply seconds by this to get metres walked) + + # get candidate connectors + coords_stops = data.stop_times[["x", "y"]].values + coords_stops[:, :2] = coords_stops[:, :2] * config.crows_fly_factor # crow's fly transformation + coords_destinations = (destinations[["x", "y"]] * config.crows_fly_factor).values + + ec = AccessEgressConnectors(coords_stops, coords_destinations, config.walk_distance_threshold) + + arr = ( + np.concatenate( + [ + ec.ods, # origin and destination index + (ec.walk / time_to_distance).reshape(-1, 1), # walk time (seconds) + np.array([0] * len(ec.ods)).reshape(-1, 1), # wait time = 0 + ], + axis=1, + ) + .round(1) + .astype(DATA_TYPE) + ) + + return arr + + +def main(config: Config, data: Optional[GTFSData] = None) -> ConnectorsData: + """Get feasible connections (transfers, access, egress). + + Args: + config (Config): Config object. + data (Optional[GTFSData], optional): GTFS data object. + If not provided, reads the stored parquet files from the outputs directory. + Defaults to None. + + Returns: + ConnectorsData: Connectors object, holding the three output tables. + """ + logger = get_logger(os.path.join(config.path_outputs, "log_connectors.log")) + + if data is None: + data = GTFSData.from_parquet(config.path_outputs) + origins = pd.read_csv(config.path_origins, index_col=0) + destinations = pd.read_csv(config.path_destinations, index_col=0) + + # get feasible connections + logger.info("Getting transfer connectors...") + connectors_transfer = get_transfer_connectors(data, config) + logger.info("Getting access connectors...") + connectors_access = get_access_connectors(data, config, origins) + logger.info("Getting egress connectors...") + connectors_egress = get_egress_connectors(data, config, destinations) + + # convert to dataframe + colnames = ["onode", "dnode", "walk", "wait"] + connectors_transfer = pd.DataFrame(connectors_transfer, columns=colnames) + connectors_access = pd.DataFrame(connectors_access, columns=colnames) + connectors_egress = pd.DataFrame(connectors_egress, columns=colnames) + + # offset IDs for endpoints + connectors_access["onode"] += len(data.stop_times) + connectors_egress["dnode"] += len(data.stop_times) + len(origins) + + # save + logger.info(f"Saving connectors to {config.path_outputs}...") + connectors = ConnectorsData( + connectors_transfer=connectors_transfer, + connectors_access=connectors_access, + connectors_egress=connectors_egress, + ) + connectors.save(config.path_outputs) + + return connectors diff --git a/gtfs_skims/graph.py b/gtfs_skims/graph.py new file mode 100644 index 0000000..3ce8af8 --- /dev/null +++ b/gtfs_skims/graph.py @@ -0,0 +1,234 @@ +import multiprocessing +import os +from functools import partial +from typing import Optional + +import numpy as np +import pandas as pd +from graph_tool import Graph +from graph_tool.topology import shortest_distance + +from gtfs_skims.utils import Config, ConnectorsData, GTFSData, get_logger + + +def get_ivt_edges(stop_times: pd.DataFrame) -> pd.DataFrame: + """Get in-vehicle times between stops. + + Args: + stop_times (pd.DataFrame): The stop_times GTFS table. 
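+ Edges connect consecutive stop-time events of the same trip, so the final stop of each trip has no outgoing in-vehicle edge.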
+ + Returns: + pd.DataFrame: ['onode', 'dnode', 'ivt'] + """ + edges_ivt = pd.Series(range(len(stop_times))) + trip_id = stop_times.reset_index()["trip_id"] + departures = stop_times.reset_index()["departure_s"] + + edges_ivt = ( + pd.concat( + [ + edges_ivt, + edges_ivt.groupby(trip_id).shift(-1), + departures.groupby(trip_id).shift(-1) - departures, + ], + axis=1, + ) + .dropna() + .map(int) + ) + edges_ivt.columns = ["onode", "dnode", "ivt"] + + return edges_ivt + + +def get_all_edges(gtfs_data: GTFSData, connectors_data: ConnectorsData) -> pd.DataFrame: + """Get all edges for the accessibility graph. + + Args: + gtfs_data (GTFSData): GTFS data object. + connectors_data (ConnectorsData): Connectors data object. + + Returns: + pd.DataFrame: ['onode', 'dnode', 'ivt', 'walk', 'wait', 'transfer'] + """ + edges = ( + pd.concat( + [ + get_ivt_edges(gtfs_data.stop_times), + connectors_data.connectors_transfer.assign(transfer=1), + connectors_data.connectors_access, + connectors_data.connectors_egress, + ], + axis=0, + ) + .fillna(0) + .map(int) + ) + + return edges + + +def add_gc(edges: pd.DataFrame, config: Config) -> pd.DataFrame: + """Calculate generalised time and add it as a column to the 'edges' table. + + Args: + edges (pd.DataFrame): Edges dataframe. Should include these columns: + ['ivt', 'walk', 'wait', 'transfer'] + config (Config): Config object. + + Returns: + pd.DataFrame: Edges dataframe, with the generalised time ("gc") column included. + """ + edges["gc"] = ( + edges["ivt"] + + edges["walk"] * config.weight_walk + + edges["wait"] * config.weight_wait + + edges["transfer"] * config.penalty_interchange + ) + + # adding unweighted time as well + edges["time"] = edges[["ivt", "walk", "wait"]].sum(1) + + return edges + + +def build_graph(edges: pd.DataFrame, vars=["ivt", "walk", "wait", "time", "gc"]) -> Graph: + """Build a network graph from the edges table. + + Args: + edges (pd.DataFrame): Edges table. Should include the 'gc' and 'time' columns from the 'add_gc' function. + vars (list): list of variables to include in the graph as edge properties. + + Returns: + Graph: Connected GTFS graph + """ + eprops = [(x, "int") for x in vars] + g = Graph(edges[["onode", "dnode"] + vars].values, hashed=False, eprops=eprops) + return g + + +def get_shortest_distances_single( + graph: Graph, + onode: int, + dnodes: list[int], + max_dist: Optional[float] = None, + attribute: str = "gc", +) -> np.ndarray: + """Get shortest distances from a single origin. + + Args: + graph (Graph): GTFS graph. + onode (int): Source node. + dnodes (list[int]): Destination nodes. + max_dist (Optional[float], optional): Maximum search distance. Defaults to None. + attribute (str, optional): Edge weights attribute. Defaults to 'gc'. + + Returns: + np.ndarray: Shortest distances. The first value is the source node. + """ + d = shortest_distance( + graph, + onode, + dnodes, + weights=graph.edge_properties[attribute], + dense=False, + max_dist=max_dist, + directed=True, + ) + d = np.concatenate([np.array([onode]), d]) + + return d + + +def get_shortest_distances( + graph: Graph, + onodes: list[int], + dnodes: list[int], + max_dist: Optional[float] = None, + attribute: str = "gc", +) -> pd.DataFrame: + """Get shortest distances from a set of origins to a set of destinations. + + Args: + graph (Graph): GTFS graph. + onodes (list[int]): Source nodes. + dnodes (list[int]): Destination nodes. + max_dist (Optional[float], optional): Maximum search distance. Defaults to None. 
+ attribute (str, optional): Edge weights attribute. Defaults to 'gc'. + + Returns: + pd.DataFrame: Shortest distances matrix. + The dataframe indices are the origin nodes, + and the column indices are the destination nodes. + """ + n_cpus = multiprocessing.cpu_count() - 1 + dist_wrapper = partial( + get_shortest_distances_single, graph, dnodes=dnodes, max_dist=max_dist, attribute=attribute + ) + with multiprocessing.Pool(n_cpus) as pool_obj: + dists = pool_obj.map(dist_wrapper, onodes) + + dists = np.array(dists) + dists = dists[dists[:, 0].argsort()] # sort by source node + + # convert to dataframe and reindex + dists = pd.DataFrame(dists[:, 1:], index=dists[:, 0], columns=dnodes) + dists = dists.loc[onodes] + + return dists + + +def main( + config: Config, + gtfs_data: Optional[GTFSData] = None, + connectors_data: Optional[ConnectorsData] = None, +) -> pd.DataFrame: + # read + logger = get_logger(os.path.join(config.path_outputs, "log_graph.log")) + + logger.info("Reading files...") + if gtfs_data is None: + gtfs_data = GTFSData.from_parquet(path=config.path_outputs) + if connectors_data is None: + connectors_data = ConnectorsData.from_parquet(path=config.path_outputs) + origins = pd.read_csv(config.path_origins, index_col=0) + destinations = pd.read_csv(config.path_destinations, index_col=0) + + # graph + logger.info("Building graph...") + edges = get_all_edges(gtfs_data, connectors_data) + edges = add_gc(edges=edges, config=config) + g = build_graph(edges=edges) + + # shortest paths + logger.info("Calculating shortest distances...") + origins["idx"] = range(len(origins)) + origins["idx"] += len(gtfs_data.stop_times) + destinations["idx"] = range(len(destinations)) + destinations["idx"] += len(gtfs_data.stop_times) + len(origins) + + onodes_scope = list(origins[origins["idx"].isin(edges["onode"])]["idx"]) + dnodes_scope = list(destinations[destinations["idx"].isin(edges["dnode"])]["idx"]) + maxdist = config.end_s - config.start_s + distmat = get_shortest_distances(g, onodes=onodes_scope, dnodes=dnodes_scope, max_dist=maxdist) + + # expand to the full OD space + distmat_full = pd.DataFrame(np.inf, index=origins["idx"], columns=destinations["idx"]) + distmat_full.loc[distmat.index, distmat.columns] = distmat.values + + # map labels + distmat_full.index = distmat_full.index.map(origins.reset_index().set_index("idx")["name"]) + distmat_full.columns = distmat_full.columns.map( + destinations.reset_index().set_index("idx")["name"] + ) + + # infill intra_zonal + distmat_full = distmat_full.apply(lambda x: np.where(x.name == x.index, np.nan, x), axis=0) + distmat_full = distmat_full.map(lambda x: np.where(x >= maxdist, np.inf, x)) + + # save + path = os.path.join(config.path_outputs, "skims.parquet.gzip") + logger.info(f"Saving results to {path}...") + distmat_full.to_parquet(path, compression="gzip", index=True) + + return distmat_full diff --git a/gtfs_skims/preprocessing.py b/gtfs_skims/preprocessing.py new file mode 100644 index 0000000..0299da6 --- /dev/null +++ b/gtfs_skims/preprocessing.py @@ -0,0 +1,135 @@ +import os + +import pyproj + +from gtfs_skims.utils import Config, GTFSData, get_logger, get_weekday, ts_to_sec + + +def filter_day(data: GTFSData, date: int) -> None: + """Filter the GTFS for a specific date in the calendar. 
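+ The trips, routes, stop_times and stops tables are then filtered down to the services running on that date.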
+ + Args: + data (GTFSData): GTFS data object + date (int): Date as yyyymmdd + """ + weekday = get_weekday(date) + data.calendar = data.calendar[ + (data.calendar["start_date"] <= date) + & (data.calendar["end_date"] >= date) + & (data.calendar[weekday] == 1) + ] + + data.trips = data.trips[data.trips["service_id"].isin(set(data.calendar["service_id"]))] + + data.routes = data.routes[data.routes["route_id"].isin(set(data.trips["route_id"]))] + + data.stop_times = data.stop_times[data.stop_times["trip_id"].isin(set(data.trips["trip_id"]))] + + data.stops = data.stops[data.stops["stop_id"].isin(set(data.stop_times["stop_id"]))] + + +def filter_time(data: GTFSData, start_time: int, end_time: int) -> None: + """Filter the GTFS for a specified time window. + + Args: + data (GTFSData): GTFS data object + start_time (int): Start of the time window (seconds from midnight) + end_time (int): End of the time window (seconds from midnight) + """ + # filter stop times + data.stop_times["departure_s"] = data.stop_times["departure_time"].apply(ts_to_sec) + data.stop_times["arrival_s"] = data.stop_times["arrival_time"].apply(ts_to_sec) + data.stop_times = data.stop_times[ + (data.stop_times["arrival_s"] >= start_time) & (data.stop_times["departure_s"] <= end_time) + ] + + # filter stops + data.stops = data.stops[data.stops["stop_id"].isin(set(data.stop_times["stop_id"]))] + + # filter trips + data.trips = data.trips[data.trips["trip_id"].isin(set(data.stop_times["trip_id"]))] + + # filter routes + data.routes = data.routes[data.routes["route_id"].isin(set(data.trips["route_id"]))] + + +def add_coordinates(data: GTFSData, epsg: int = 27700) -> None: + """Add projected coordinates (BNG by default) to the stops and stop_times tables. + + Args: + data (GTFSData): Data object. + epsg (int): The target coordinate system (default 27700, British National Grid). + """ + transformer = pyproj.Transformer.from_crs( + pyproj.transformer.CRS("epsg:4326"), pyproj.transformer.CRS(f"epsg:{epsg}"), always_xy=True + ) + + data.stops["x"], data.stops["y"] = transformer.transform( + data.stops["stop_lon"], data.stops["stop_lat"] + ) + + data.stops["x"] = data.stops["x"].round().map(int) + data.stops["y"] = data.stops["y"].round().map(int) + + data.stop_times["x"] = data.stop_times["stop_id"].map(data.stops.set_index("stop_id")["x"]) + data.stop_times["y"] = data.stop_times["stop_id"].map(data.stops.set_index("stop_id")["y"]) + + +def filter_bounding_box(data: GTFSData, xmin: int, xmax: int, ymin: int, ymax: int) -> None: + """Filter a GTFS with a bounding box. Coordinates use the same projection as the stop coordinates (BNG by default). + + Args: + data (GTFSData): Data object. + xmin (int): Min Easting. + xmax (int): Max Easting. + ymin (int): Min Northing. + ymax (int): Max Northing. + """ + data.stops = data.stops[ + (data.stops["x"] >= xmin) + & (data.stops["x"] <= xmax) + & (data.stops["y"] >= ymin) + & (data.stops["y"] <= ymax) + ] + + # filter stop times + data.stop_times = data.stop_times[ + data.stop_times["stop_id"].isin(set(list(data.stops["stop_id"]))) + ] + + # filter trips + data.trips = data.trips[data.trips["trip_id"].isin(set(data.stop_times["trip_id"]))] + + # filter routes + data.routes = data.routes[data.routes["route_id"].isin(set(data.trips["route_id"]))] + + +def main(config: Config) -> GTFSData: + """Run the preprocessing pipeline and save resulting tables to disk. + + Args: + config (Config): Config object. + + Returns: + GTFSData: Pre-processed GTFS data object. 
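+ + The pipeline filters the feed to the requested date and time window, projects the stop coordinates, and optionally crops to a bounding box.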
+ """ + logger = get_logger(os.path.join(config.path_outputs, "log_preprocessing.log")) + + logger.info("Reading files...") + data = GTFSData.from_gtfs(path_gtfs=config.path_gtfs) + + logger.info("Time filtering..") + filter_day(data, config.calendar_date) + filter_time(data, config.start_s, config.end_s) + add_coordinates(data, epsg=config.epsg_centroids) + + if config.bounding_box is not None: + logger.info("Cropping to bounding box..") + filter_bounding_box(data, **config.bounding_box) + + logger.info(f"Saving outputs at {config.path_outputs}") + data.save(config.path_outputs) + + logger.info("Preprocessing complete.") + + return data diff --git a/gtfs_skims/utils.py b/gtfs_skims/utils.py new file mode 100644 index 0000000..b673141 --- /dev/null +++ b/gtfs_skims/utils.py @@ -0,0 +1,210 @@ +from __future__ import annotations + +import logging +import os +from abc import ABC +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Optional +from zipfile import ZipFile + +import pandas as pd +import yaml + + +def ts_to_sec(x: str) -> int: + """Convert a hh:mm:ss timestamp to seconds from midnight. + + Args: + x (str): Timestamp + + Returns: + int: Seconds from midnight + """ + s = [int(i) for i in x.split(":")] + return 3600 * s[0] + 60 * s[1] + s[2] + + +def get_weekday(date: int) -> str: + """Get the weekday of a date + + Args: + date (int): Date as yyyymmdd + + Returns: + str: Day name + """ + weekday = datetime.strptime(str(date), "%Y%m%d") + weekday = datetime.strftime(weekday, "%A").lower() + return weekday + + +def get_logger(path_output: Optional[str] = None) -> logging.Logger: + """Get the library logger. + + Args: + path_output (Optional[str], optional): Path to save the logs. Defaults to None. + + Returns: + logging.Logger: Logger. + """ + logger = logging.getLogger(__name__) + logger.setLevel(logging.DEBUG) + handler = logging.StreamHandler() + formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + handler.setFormatter(formatter) + if len(logger.handlers) == 0: + logger.addHandler(handler) + else: + logger.handlers[0] = handler + + if path_output is not None: + parent_dir = Path(path_output).parent.absolute() + if not os.path.exists(parent_dir): + os.makedirs(parent_dir) + + file_handler = logging.FileHandler(path_output, mode="w") + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + + return logger + + +@dataclass +class Config: + """Config file + + Example config file: + + ``` + paths: + path_gtfs: ./iow-bus-gtfs.zip + path_outputs: /mnt/efs/otp/gtfs_transfers/skims_iow + path_origins: ./centroids.csv # path to the origin points + path_destinations: ./centroids.csv # path to the destination points + + settings: + calendar_date : 20190515 # yyyymmdd | Date for filtering the GTFS file. + start_s : 32400 # sec | Start time of the journey. + end_s : 41400 # sec | Max end time of a journey. + walk_distance_threshold : 2000 # m | Max walk distance in a leg + walk_speed : 4.5 # kph | Walking speed + crows_fly_factor : 1.3 # Conversion factor from euclidean to routed distances + max_transfer_time : 1800 # Max combined time of walking and waiting (sec) + k : 500 # max nearest neighbours when calculating distances + max_wait : 1800 # sec | Max wait time at a stop + bounding_box : null + epsg_centroids: 27700 # coordinate system of the centroids file. Needs to be Cartesian and in meters. 
+ + + steps: + - preprocessing + - connectors + - graph + ``` + + """ + + path_gtfs: str + path_outputs: str + path_origins: str + path_destinations: str + calendar_date: int + crows_fly_factor: float + max_transfer_time: int + end_s: int + bounding_box: dict + epsg_centroids: int + max_wait: int + start_s: int + walk_distance_threshold: int + walk_speed: float + weight_walk: float + weight_wait: float + penalty_interchange: float + steps: list + + @classmethod + def from_yaml(cls, path: str) -> Config: + """Construct class from a config yaml file. + + Args: + path (str): Path to the yaml config. + + Returns: + Config: Config object + """ + with open(path, "r") as f: + config = yaml.safe_load(f) + config_flat = {**config["paths"], **config["settings"], "steps": config["steps"]} + return cls(**config_flat) + + def __repr__(self) -> str: + s = "Config file\n" + s += "-" * 50 + "\n" + s += yaml.dump(self.__dict__) + return s + + +@dataclass +class Data(ABC): + @classmethod + def from_gtfs(cls, path_gtfs: str) -> Data: + """Load GTFS tables from a standard zipped GTFS file. + + Args: + path_gtfs (str): Path to a zipped GTFS dataset. + + Returns: + Data: Data object. + """ + data = {} + with ZipFile(path_gtfs, "r") as zf: + for name in cls.__annotations__.keys(): + with zf.open(f"{name}.txt") as f: + data[name] = pd.read_csv(f, low_memory=False) + return cls(**data) + + @classmethod + def from_parquet(cls, path: str) -> Data: + """Construct class from pre-processed GTFS tables in Parquet format. + + Args: + path (str): Path to tables. + + Returns: + Data: Data object. + """ + data = {} + for name in cls.__annotations__.keys(): + data[name] = pd.read_parquet(os.path.join(path, f"{name}.parquet.gzip")) + return cls(**data) + + def save(self, path_outputs: str) -> None: + """Export all tables in gzip-compressed parquet format. + + Args: + path_outputs (str): Directory to save outputs. + """ + if not os.path.exists(path_outputs): + os.makedirs(path_outputs) + + for k, v in self.__dict__.items(): + v.to_parquet(os.path.join(path_outputs, f"{k}.parquet.gzip"), compression="gzip") + + +@dataclass +class GTFSData(Data): + calendar: pd.DataFrame + routes: pd.DataFrame + stops: pd.DataFrame + stop_times: pd.DataFrame + trips: pd.DataFrame + + +@dataclass +class ConnectorsData(Data): + connectors_transfer: pd.DataFrame + connectors_access: pd.DataFrame + connectors_egress: pd.DataFrame diff --git a/gtfs_skims/variables.py b/gtfs_skims/variables.py new file mode 100644 index 0000000..fd70251 --- /dev/null +++ b/gtfs_skims/variables.py @@ -0,0 +1,22 @@ +import numpy as np + +DATA_TYPE = np.uint32 + +# route types lookup +# source: https://developers.google.com/transit/gtfs/reference#routestxt +# and https://developers.google.com/transit/gtfs/reference/extended-route-types +ROUTE_TYPES = { + 0: "tram", # Tram, Streetcar, Light rail. + 1: "underground", # Subway, Metro. + 2: "rail", # Rail. Used for intercity or long-distance travel. + 3: "bus", # Bus. Used for short- and long-distance bus routes. + 4: "ferry", # Ferry. Used for short- and long-distance boat service. + 5: "cable", + 6: "cable aerial", + 7: "funicular", # Funicular. Any rail system designed for steep inclines. + 11: "trolley", # Trolleybus. + 12: "monorail", # Monorail. 
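+ # extended route types, from the extended-route-types reference above: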
+ 200: "coach", # Coach Service + 401: "undergound", # Metro Service + 402: "underground", # Underground Service +} diff --git a/mkdocs.yml b/mkdocs.yml index 1064385..235e7ca 100755 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -2,6 +2,8 @@ site_name: gtfs-skims nav: - Home: index.md - Installation: installation.md + - Running the tool: run.md + - Methodology: methodology.md - Contributing: contributing.md - Changelog: CHANGELOG.md - Reference: diff --git a/requirements/base.txt b/requirements/base.txt index 9c9350d..4c3de05 100755 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -1,3 +1,9 @@ -# this dependency exists so that the base file is not empty -# it was chosen since it is a dependency that is included in any python environment already +click +fastparquet +graph-tool +numpy +pandas +pyproj +pyyaml +yaml zipp \ No newline at end of file diff --git a/resources/logos/title.png b/resources/logos/title.png new file mode 100644 index 0000000..24ad2cf Binary files /dev/null and b/resources/logos/title.png differ diff --git a/tests/conftest.py b/tests/conftest.py index a6de8b3..b2486e7 100755 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,9 +7,15 @@ def test_content(response): assert response.content ``` """ +import os +from pathlib import Path import pytest +from gtfs_skims.utils import Config, ConnectorsData, GTFSData + +TEST_DATA_DIR = os.path.join(Path(__file__).parent, "test_data") + @pytest.fixture def response(): @@ -19,3 +25,23 @@ def response(): """ # import requests # return requests.get('https://github.com/arup-group/cookiecutter-pypackage') + + +@pytest.fixture +def config(): + return Config.from_yaml(os.path.join(TEST_DATA_DIR, "config_demo.yaml")) + + +@pytest.fixture +def gtfs_data(): + return GTFSData.from_gtfs(os.path.join(TEST_DATA_DIR, "iow-bus-gtfs.zip")) + + +@pytest.fixture +def gtfs_data_preprocessed(): + return GTFSData.from_parquet(os.path.join(TEST_DATA_DIR, "outputs")) + + +@pytest.fixture +def connectors_data(): + return ConnectorsData.from_parquet(os.path.join(TEST_DATA_DIR, "outputs")) diff --git a/tests/test_cli.py b/tests/test_cli.py index cb08971..4f60664 100755 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,21 +1,46 @@ """Tests for `gtfs_skims` CLI.""" +import os +from pathlib import Path from click.testing import CliRunner from gtfs_skims import cli +TEST_DATA_DIR = os.path.join(Path(__file__).parent, "test_data") + def test_command_line_interface(): """Test the CLI.""" runner = CliRunner() result = runner.invoke(cli.cli) assert result.exit_code == 0 - assert "gtfs_skims.cli.cli" in result.output help_result = runner.invoke(cli.cli, ["--help"]) assert help_result.exit_code == 0 assert ( - "Console script for gtfs_skims.\n\nOptions:\n " + "Console script for Argo (gtfs_skims).\n\nOptions:\n " "--version Show the version and exit.\n " - "--help Show this message and exit.\n" - in help_result.output + "--help Show this message and exit.\n" in help_result.output + ) + + +def test_run_steps_saves_outputs(tmpdir): + runner = CliRunner() + result = runner.invoke( + cli.cli, + [ + "run", + os.path.join(TEST_DATA_DIR, "config_demo.yaml"), + "--output_directory_override", + tmpdir, + ], ) + + assert result.exit_code == 0 + + for x in ["calendar", "routes", "stops", "stop_times", "trips"]: + assert os.path.exists(os.path.join(tmpdir, f"{x}.parquet.gzip")) + + for x in ["transfer", "access", "egress"]: + assert os.path.exists(os.path.join(tmpdir, f"connectors_{x}.parquet.gzip")) + + assert os.path.exists(os.path.join(tmpdir, "skims.parquet.gzip")) 
diff --git a/tests/test_connectors.py b/tests/test_connectors.py new file mode 100644 index 0000000..e18b27f --- /dev/null +++ b/tests/test_connectors.py @@ -0,0 +1,184 @@ +import itertools +import os +from collections import defaultdict + +import numpy as np +import pytest + +from gtfs_skims import connectors + + +@pytest.fixture() +def points(): + p = np.arange(-20, 20, 2.5) + coords = np.array([(x, y, z) for x, y, z in itertools.product(p, p, p)]) + return coords + + +@pytest.fixture() +def transfer_connectors(points): + return connectors.TransferConnectors(points, 10) + + +def find_index(coords, x, y, z): + idx = np.where(np.all(coords == np.array([x, y, z]), axis=1))[0][0] + return idx + + +def get_valid_points(coords, source_idx, max_transfer_dist): + dcoords = coords - coords[source_idx] + walk = (dcoords[:, :2] ** 2).sum(1) ** 0.5 # euclidean distance on xy + wait = dcoords[:, 2] - walk + + is_valid = (wait > 0) & ((walk + wait) <= max_transfer_dist) + + return is_valid + + +@pytest.mark.parametrize("source", [(0, 0, 0), (2.5, 2.5, 2.5), (-2.5, 0, 2.5)]) +def test_query_all_valid_included(points, source): + """All valid points are included in the query results""" + source_idx = find_index(points, *source) + maxdist = 10 + is_valid = get_valid_points(points, source_idx, maxdist) + + radius = maxdist * (2**0.5) + ods = connectors.query_pairs(points, radius) + + dest = ods[ods[:, 0] == source_idx, 1] + assert is_valid[dest].sum() == is_valid.sum() + + +@pytest.mark.parametrize("source", [(0, 0, 0), (2.5, 2.5, 2.5), (-2.5, 0, 2.5)]) +def test_query_all_included_valid(points, source): + """All results from the query are valid""" + source_idx = find_index(points, *source) + maxdist = 10 + is_valid = get_valid_points(points, source_idx, maxdist) + + tc = connectors.TransferConnectors(points, maxdist) + tc.filter_feasible_transfer(maxdist) + dest = tc.ods[tc.ods[:, 0] == source_idx, 1] + + assert is_valid[dest].sum() == is_valid.sum() + assert len(is_valid[dest]) > 0 and all(is_valid[dest]) + + +def test_filter_transfer_walk(transfer_connectors): + max_walk = 5 + assert transfer_connectors.walk.max() > max_walk + transfer_connectors.filter_max_walk(max_walk) + assert transfer_connectors.walk.max() <= max_walk + + +def test_filter_transfer_wait(transfer_connectors): + max_wait = 5 + assert transfer_connectors.wait.max() > max_wait + transfer_connectors.filter_max_wait(max_wait) + assert transfer_connectors.wait.max() <= max_wait + + +def test_filter_same_route(transfer_connectors): + # assume all odd point IDs are in the same route + routes = np.arange(len(transfer_connectors.coords)) + routes = np.where(routes % 2, -1, routes) + transfer_connectors.filter_same_route(routes) + assert (transfer_connectors.ods % 2).prod(1).sum() == 0 + + +def get_o_service_transfers(conn, services_d): + transfer_times = conn.wait + conn.walk + d = defaultdict(list) + for i in range(len(services_d)): + d[(conn.ods[i, 0], services_d[i])].append(transfer_times[i]) + return d + + +def test_filter_nearest_service(transfer_connectors): + np.random.seed(0) + services = np.random.randint(0, 2, size=transfer_connectors.coords.shape[0]) + services_d = services[transfer_connectors.ods[:, 1]] + + # for every origin-service pair there are multiple connections + d_before = get_o_service_transfers(transfer_connectors, services_d) + + assert max(map(len, d_before.values())) > 1 + + # after filtering, there is only one and it is the + # one with the minimum transfer time. 
+ transfer_connectors.filter_nearest_service(services) + services_d = services[transfer_connectors.ods[:, 1]] + + d_after = get_o_service_transfers(transfer_connectors, services_d) + + # didn't lose any origin-service pairs + assert len(d_before) == len(d_after) + # single connection per origin-service + assert max(map(len, d_after.values())) == 1 + + for o, service in d_before.keys(): + assert d_after[(o, service)][0] == min(d_before[(o, service)]) + + +def test_get_transfer_array(gtfs_data_preprocessed, config): + arr = connectors.get_transfer_connectors(gtfs_data_preprocessed, config) + assert len(arr) > 0 + assert isinstance(arr, np.ndarray) + + +def test_get_od_pairs(): + ods = connectors.query_pairs_od( + np.array([[0, 0], [1, 1]]), np.array([[0.5, 0.5], [2, 1], [2, 2]]), radius=1 + ) + expected = np.array([[0, 0], [1, 0], [1, 1]]) + np.testing.assert_equal(ods, expected) + + +def test_get_od_walk(): + egress = connectors.AccessEgressConnectors( + np.array([[0, 0], [1, 1]]), np.array([[0.5, 0.5], [2, 1], [2, 2]]), max_transfer_distance=1 + ) + walk = egress.walk + expected = np.array([(2 * 0.5**2) ** 0.5, (2 * 0.5**2) ** 0.5, 1]) + np.testing.assert_almost_equal(walk, expected) + + +def test_convert_distance_3d(): + access = connectors.AccessEgressConnectors( + np.array([[0, 0, 0]]), np.array([[1, 1, 1]]), max_transfer_distance=1 + ) + assert len(access.ods) == 1 # radius has been adjusted to 3D space + + +def test_apply_crow_fly_factoring(gtfs_data_preprocessed, config): + arr = connectors.get_transfer_connectors(gtfs_data_preprocessed, config) + assert len(arr) == 2 + max_walk = arr[:, 3].max() + + config.walk_distance_threshold = max_walk + config.crows_fly_factor = 1 + arr = connectors.get_transfer_connectors(gtfs_data_preprocessed, config) + assert len(arr) == 2 + + # after adding the crow's fly factor, the destination is further than the max distance + config.crows_fly_factor = 1.05 + arr = connectors.get_transfer_connectors(gtfs_data_preprocessed, config) + assert len(arr) < 2 + + +def test_indices_are_offset(config, gtfs_data_preprocessed, tmpdir): + config.path_outputs = tmpdir + conn = connectors.main(config=config, data=gtfs_data_preprocessed) + stop_time_ids = list(range(len(gtfs_data_preprocessed.stop_times))) + assert all(np.isin(conn.connectors_access["dnode"], stop_time_ids)) + assert all(np.isin(conn.connectors_egress["onode"], stop_time_ids)) + assert np.isin(conn.connectors_access["onode"], stop_time_ids).sum() == 0 + assert np.isin(conn.connectors_egress["dnode"], stop_time_ids).sum() == 0 + assert conn.connectors_access["onode"].max() < conn.connectors_egress["dnode"].min() + + +def test_main_saves_outputs(config, gtfs_data_preprocessed, tmpdir): + config.path_outputs = tmpdir + connectors.main(config=config, data=gtfs_data_preprocessed) + for x in ["transfer", "access", "egress"]: + assert os.path.exists(os.path.join(tmpdir, f"connectors_{x}.parquet.gzip")) diff --git a/tests/test_data/centroids.csv b/tests/test_data/centroids.csv new file mode 100644 index 0000000..3e3b3ac --- /dev/null +++ b/tests/test_data/centroids.csv @@ -0,0 +1,19 @@ +name,x,y +E02003587,459682,91699 +E02003586,458411,91813 +E02003585,454864,92122 +E02003584,459488,92462 +E02003583,450783,94940 +E02003582,448766,95094 +E02003581,449648,95518 +E02003589,449364,89458 +E02003588,450639,89535 +E02003597,450770,79798 +E02003596,458382,81965 +E02003595,458050,83041 +E02003594,459825,84754 +E02003593,443438,87068 +E02003592,433558,87253 +E02003591,449461,88507 +E02003590,463905,88678 
+E02003598,455952,77862 diff --git a/tests/test_data/config_demo.yaml b/tests/test_data/config_demo.yaml new file mode 100644 index 0000000..fb049f7 --- /dev/null +++ b/tests/test_data/config_demo.yaml @@ -0,0 +1,25 @@ +paths: + path_gtfs: ./tests/test_data/iow-bus-gtfs.zip + path_outputs: ./tests/test_data/outputs + path_origins: ./tests/test_data/centroids.csv # path to the origin points + path_destinations: ./tests/test_data/centroids.csv # path to the destination points + +settings: + calendar_date : 20190515 # yyyymmdd | Date for filtering the GTFS file. + start_s : 32400 # sec | Start time of the journey. + end_s : 41400 # sec | Max end time of a journey. + walk_distance_threshold : 2000 # m | Max walk distance in a leg + walk_speed : 4.5 # kph | Walking speed + crows_fly_factor : 1.3 # Conversion factor from euclidean to routed distances + max_transfer_time : 1800 # Max combined time of walking and waiting (sec) of a transfer + max_wait : 1800 # sec | Max wait time at a stop / leg + bounding_box : null + epsg_centroids: 27700 # coordinate system of the centroids file. Needs to be Cartesian and in meters. + weight_walk: 2 # value of walk time, ratio to in-vehicle time + weight_wait: 2 # value of wait time, ratio to in-vehicle time + penalty_interchange: 300 # seconds added to generalised cost for each interchange + +steps: + - preprocessing + - connectors + - graph \ No newline at end of file diff --git a/tests/test_data/iow-bus-gtfs.zip b/tests/test_data/iow-bus-gtfs.zip new file mode 100755 index 0000000..1bcb7ae Binary files /dev/null and b/tests/test_data/iow-bus-gtfs.zip differ diff --git a/tests/test_data/outputs/calendar.parquet.gzip b/tests/test_data/outputs/calendar.parquet.gzip new file mode 100644 index 0000000..f760856 Binary files /dev/null and b/tests/test_data/outputs/calendar.parquet.gzip differ diff --git a/tests/test_data/outputs/connectors_access.parquet.gzip b/tests/test_data/outputs/connectors_access.parquet.gzip new file mode 100644 index 0000000..73e66e9 Binary files /dev/null and b/tests/test_data/outputs/connectors_access.parquet.gzip differ diff --git a/tests/test_data/outputs/connectors_egress.parquet.gzip b/tests/test_data/outputs/connectors_egress.parquet.gzip new file mode 100644 index 0000000..e6e6a19 Binary files /dev/null and b/tests/test_data/outputs/connectors_egress.parquet.gzip differ diff --git a/tests/test_data/outputs/connectors_transfer.parquet.gzip b/tests/test_data/outputs/connectors_transfer.parquet.gzip new file mode 100644 index 0000000..0d02413 Binary files /dev/null and b/tests/test_data/outputs/connectors_transfer.parquet.gzip differ diff --git a/tests/test_data/outputs/routes.parquet.gzip b/tests/test_data/outputs/routes.parquet.gzip new file mode 100644 index 0000000..a5b796d Binary files /dev/null and b/tests/test_data/outputs/routes.parquet.gzip differ diff --git a/tests/test_data/outputs/stop_times.parquet.gzip b/tests/test_data/outputs/stop_times.parquet.gzip new file mode 100644 index 0000000..2dd4231 Binary files /dev/null and b/tests/test_data/outputs/stop_times.parquet.gzip differ diff --git a/tests/test_data/outputs/stops.parquet.gzip b/tests/test_data/outputs/stops.parquet.gzip new file mode 100644 index 0000000..f375508 Binary files /dev/null and b/tests/test_data/outputs/stops.parquet.gzip differ diff --git a/tests/test_data/outputs/trips.parquet.gzip b/tests/test_data/outputs/trips.parquet.gzip new file mode 100644 index 0000000..0bd4afa Binary files /dev/null and b/tests/test_data/outputs/trips.parquet.gzip differ diff 
--git a/tests/test_graph.py b/tests/test_graph.py new file mode 100644 index 0000000..009daea --- /dev/null +++ b/tests/test_graph.py @@ -0,0 +1,93 @@ +from unittest.mock import Mock + +import numpy as np +import pandas as pd +import pytest +from graph_tool import Graph + +from gtfs_skims import graph + + +@pytest.fixture() +def mock_config(): + mock = Mock() + mock.weight_wait = 3 + mock.weight_walk = 2 + mock.penalty_interchange = 600 + return mock + + +@pytest.fixture() +def small_graph() -> Graph: + edges = pd.DataFrame({"onode": [0, 0, 1, 2], "dnode": [1, 2, 3, 3], "gc": [10, 20, 15, 4]}) + return graph.build_graph(edges, vars=["gc"]) + + +@pytest.fixture() +def small_graph_bidirectional() -> Graph: + edges = pd.DataFrame( + { + "onode": [0, 0, 1, 2, 1, 2, 3, 3], + "dnode": [1, 2, 3, 3, 0, 0, 1, 2], + "gc": [10, 20, 15, 4, 10, 20, 15, 4], + } + ) + return graph.build_graph(edges, vars=["gc"]) + + +def test_get_ivt_times(): + stop_times = pd.DataFrame({"trip_id": [0, 1, 0, 1], "departure_s": [100, 105, 120, 150]}) + ivt_edges = graph.get_ivt_edges(stop_times) + expected = np.array([[0, 2, 20], [1, 3, 45]]) + np.testing.assert_equal(ivt_edges.values, expected) + + +def test_get_all_edges(gtfs_data_preprocessed, connectors_data): + edges = graph.get_all_edges(gtfs_data_preprocessed, connectors_data) + + # all connections are included + len_expected = ( + len(gtfs_data_preprocessed.stop_times) + - gtfs_data_preprocessed.stop_times["trip_id"].nunique() + ) + len_expected += len(connectors_data.connectors_transfer) + len_expected += len(connectors_data.connectors_access) + len_expected += len(connectors_data.connectors_egress) + assert len(edges) == len_expected + + # all variables are included + assert list(edges.columns) == ["onode", "dnode", "ivt", "walk", "wait", "transfer"] + + +def test_calculate_gc(mock_config): + edges = pd.DataFrame({"ivt": [100, 200], "walk": [30, 10], "wait": [10, 5], "transfer": [0, 1]}) + graph.add_gc(edges, mock_config) + assert list(edges["gc"]) == [190, 835] + + +def test_get_shortest_distance_single(small_graph): + dists = graph.get_shortest_distances_single(small_graph, 0, [3, 2, 1, 0]) + expected = np.array([24, 20, 10, 0]) + assert dists[0] == 0 # the first value is the source + np.testing.assert_equal(dists[1:], expected) + + +def test_get_distance_matrix(small_graph_bidirectional): + distmat = graph.get_shortest_distances(small_graph_bidirectional, [0, 1, 2], [1, 2]) + expected = np.array([[10, 20], [0, 19], [19, 0]]) + assert list(distmat.index) == [0, 1, 2] + assert list(distmat.columns) == [1, 2] + + np.testing.assert_equal(distmat.values, expected) + + +def test_correct_labels(config, gtfs_data_preprocessed, connectors_data, tmpdir): + origins = pd.read_csv(config.path_origins, index_col=0) + destinations = pd.read_csv(config.path_destinations, index_col=0) + config.path_outputs = tmpdir + distmat = graph.main( + config=config, gtfs_data=gtfs_data_preprocessed, connectors_data=connectors_data + ) + + assert list(distmat.index) == list(origins.index) + assert list(distmat.columns) == list(destinations.index) diff --git a/tests/test_preprocessing.py b/tests/test_preprocessing.py new file mode 100644 index 0000000..c3b7a9d --- /dev/null +++ b/tests/test_preprocessing.py @@ -0,0 +1,53 @@ +import os + +from gtfs_skims import preprocessing + + +def test_filter_date(gtfs_data): + assert 14 in gtfs_data.calendar.service_id.values + preprocessing.filter_day(gtfs_data, 20180507) + assert list(gtfs_data.calendar.service_id) == [14] + assert 
set(gtfs_data.trips["service_id"]) == set([14]) + + +def test_filter_time(gtfs_data): + start_time = 9 * 3600 + end_time = 10 * 3600 + preprocessing.filter_time(gtfs_data, start_time, end_time) + assert gtfs_data.stop_times["arrival_s"].min() >= start_time + assert gtfs_data.stop_times["departure_s"].max() <= end_time + + +def test_projected_coords_within_bounds(gtfs_data): + preprocessing.add_coordinates(gtfs_data) + # check that the BNG coordinates fall within an Isle-of-Wight bounding box + xmin, ymin = 423104, 69171 + xmax, ymax = 471370, 101154 + + assert gtfs_data.stops["x"].min() > xmin + assert gtfs_data.stops["x"].max() < xmax + assert gtfs_data.stops["y"].min() > ymin + assert gtfs_data.stops["y"].max() < ymax + + +def test_within_bounding_box(gtfs_data): + preprocessing.add_coordinates(gtfs_data) + + # filter for Cowes + xmin, ymin = 447477, 92592 + xmax, ymax = 451870, 96909 + assert gtfs_data.stops["x"].min() < xmin + preprocessing.filter_bounding_box(gtfs_data, xmin=xmin, xmax=xmax, ymin=ymin, ymax=ymax) + + assert gtfs_data.stops["x"].min() > xmin + assert gtfs_data.stops["x"].max() < xmax + assert gtfs_data.stops["y"].min() > ymin + assert gtfs_data.stops["y"].max() < ymax + + +def test_run_preprocessing_demo(config, tmpdir): + path_outputs = os.path.join(tmpdir, "outputs") + config.path_outputs = path_outputs + preprocessing.main(config) + for x in ["calendar", "routes", "stops", "stop_times", "trips"]: + assert os.path.exists(os.path.join(path_outputs, f"{x}.parquet.gzip")) diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..eba5713 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,35 @@ +import os + +import pandas as pd + +from gtfs_skims import utils + + +def test_parse_timestamp(): + assert utils.ts_to_sec("00:00:00") == 0 + assert utils.ts_to_sec("10:01:01") == 36061 + + +def test_get_logger(tmpdir): + logger = utils.get_logger(os.path.join(tmpdir, "logs", "log.log")) + logger.info("test") + + +def test_weekday(): + assert utils.get_weekday(20231201) == "friday" + + +def test_load_config(config): + assert "path_gtfs" in config.__dict__ + + +def test_load_gtfs(gtfs_data): + for x in ["calendar", "routes", "stops", "stop_times", "trips"]: + assert isinstance(getattr(gtfs_data, x), pd.DataFrame) + + +def test_cache_gtfs(gtfs_data, tmpdir): + gtfs_data.save(tmpdir) + gtfs_cached = utils.GTFSData.from_parquet(tmpdir) + for x in ["calendar", "routes", "stops", "stop_times", "trips"]: + pd.testing.assert_frame_equal(getattr(gtfs_data, x), getattr(gtfs_cached, x))
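Aside (illustrative): to see the geometry behind the connector queries, here is a minimal, self-contained sketch of `connectors.query_pairs` with toy coordinates (not part of the test suite):

```
# Toy demonstration of connectors.query_pairs: it returns point pairs within
# the radius, oriented forward in z (the origin has the smaller z value).
import numpy as np
from gtfs_skims.connectors import query_pairs

coords = np.array(
    [
        [0.0, 0.0, 0.0],   # stop-time event A
        [3.0, 0.0, 4.0],   # B: distance 5 from A, later in z
        [0.0, 0.0, 20.0],  # C: too far in z from A and B
    ]
)
print(query_pairs(coords, radius=10.0))  # [[0 1]] -> only A -> B is feasible
```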