diff --git a/mbta-performance/chalicelib/lamp/date.py b/mbta-performance/chalicelib/date.py similarity index 100% rename from mbta-performance/chalicelib/lamp/date.py rename to mbta-performance/chalicelib/date.py diff --git a/mbta-performance/chalicelib/historic/README.md b/mbta-performance/chalicelib/historic/README.md new file mode 100644 index 0000000..87797b3 --- /dev/null +++ b/mbta-performance/chalicelib/historic/README.md @@ -0,0 +1,17 @@ +# Monthly Data Processing + +MBTA uploads monthly data files periodically. These monthly batches take the place of performance data when available (This may change with LAMP). + +## Backfill all years + +This should only be done if we change the processing code or need to repopulate an empty bucket + +```sh +poetry run python -m mbta-performance.chalicelib.historic.backfill.main +``` + +### Upload to S3 + +```sh +aws s3 cp --recursive data/output/Events/ s3://tm-mbta-performance/Events/ +``` diff --git a/mbta-performance/chalicelib/historic/__init__.py b/mbta-performance/chalicelib/historic/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mbta-performance/chalicelib/historic/backfill/__init__.py b/mbta-performance/chalicelib/historic/backfill/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mbta-performance/chalicelib/historic/backfill/main.py b/mbta-performance/chalicelib/historic/backfill/main.py new file mode 100644 index 0000000..d4e910a --- /dev/null +++ b/mbta-performance/chalicelib/historic/backfill/main.py @@ -0,0 +1,39 @@ +import threading +from ..constants import ARCGIS_IDS +from ..download import download_historic_data, list_files_in_dir, prep_local_dir, unzip_historic_data +from ..process import process_events + + +def single_year_thread(year: str): + print(f"Backfilling year {year}") + # download the data + zip_file = download_historic_data(year) + # unzip the data + input_dir = unzip_historic_data(zip_file, f"data/input/{year}") + # process the data + for file in 
list_files_in_dir(input_dir): + process_events(file, "data/output") + print(f"Finished backfilling year {year}") + + +def backfill_all_years(): + """Backfill all years of MBTA data we can""" + + prep_local_dir() + + year_threads: list[threading.Thread] = [] + for year in ARCGIS_IDS.keys(): + year_thread = threading.Thread( + target=single_year_thread, + args=(year,), + name=year, + ) + year_threads.append(year_thread) + year_thread.start() + + for year_thread in year_threads: + year_thread.join() + + +if __name__ == "__main__": + backfill_all_years() diff --git a/mbta-performance/chalicelib/historic/constants.py b/mbta-performance/chalicelib/historic/constants.py new file mode 100644 index 0000000..0c869b7 --- /dev/null +++ b/mbta-performance/chalicelib/historic/constants.py @@ -0,0 +1,25 @@ +ARCGIS_IDS = { + "2016": "3e892be850fe4cc4a15d6450de4bd318", + "2017": "cde60045db904ad299922f4f8759dcad", + "2018": "25c3086e9826407e9f59dd9844f6c975", + "2019": "11bbb87f8fb245c2b87ed3c8a099b95f", + "2020": "cb4cf52bafb1402b9b978a424ed4dd78", + "2021": "611b8c77f30245a0af0c62e2859e8b49", + "2022": "99094a0c59e443cdbdaefa071c6df609", + "2023": "9a7f5634db72459ab731b6a9b274a1d4", + "2024": "4adbec39db40498a8530496d8c63a924", +} + + +HISTORIC_COLUMNS = [ + "service_date", + "route_id", + "trip_id", + "direction_id", + "stop_id", + "stop_sequence", + "vehicle_id", + "vehicle_label", + "event_type", + "event_time_sec", +] diff --git a/mbta-performance/chalicelib/historic/download.py b/mbta-performance/chalicelib/historic/download.py new file mode 100644 index 0000000..7725b3f --- /dev/null +++ b/mbta-performance/chalicelib/historic/download.py @@ -0,0 +1,52 @@ +import pathlib +import requests +import os +from zipfile import ZipFile +import subprocess +from .constants import ARCGIS_IDS + + +def prep_local_dir(): + pathlib.Path("data").mkdir(exist_ok=True) + pathlib.Path("data/input").mkdir(exist_ok=True) + pathlib.Path("data/output").mkdir(exist_ok=True) + + +def 
download_historic_data(year: str): +    if year not in ARCGIS_IDS.keys(): +        raise ValueError(f"Year {year} dataset is not available. Supported years are {list(ARCGIS_IDS.keys())}") + +    url = f"https://www.arcgis.com/sharing/rest/content/items/{ARCGIS_IDS[year]}/data" +    response = requests.get(url) +    if response.status_code != 200: +        raise ValueError(f"Failed to fetch historic data from {url}. Status code: {response.status_code}") + +    with open(f"data/input/{year}.zip", "wb") as f: +        f.write(response.content) +    return os.path.abspath(f"data/input/{year}.zip") + + +def unzip_historic_data(zip_file: str, output_dir: str): +    pathlib.Path(output_dir).mkdir(exist_ok=True) + +    try: +        with ZipFile(zip_file, "r") as zip_ref: +            # Extract all the contents of zip file in different directory +            zip_ref.extractall(output_dir) +    except NotImplementedError: +        print("Zip file extraction failed. Likely due to unsupported compression method.") +        print("Attempting to extract using unzip") +        subprocess.run(["unzip", "-o", "-d", output_dir, zip_file], check=True) + +    return output_dir + + +def list_files_in_dir(dir: str): +    csv_files = [] +    files = os.listdir(dir) +    for file in files: +        if os.path.isfile(os.path.join(dir, file)): +            csv_files.append(os.path.join(dir, file)) +        elif os.path.isdir(os.path.join(dir, file)): +            csv_files += list_files_in_dir(os.path.join(dir, file)) +    return csv_files diff --git a/mbta-performance/chalicelib/historic/process.py b/mbta-performance/chalicelib/historic/process.py new file mode 100644 index 0000000..23efb59 --- /dev/null +++ b/mbta-performance/chalicelib/historic/process.py @@ -0,0 +1,41 @@ +import pandas as pd +import pathlib +from .constants import HISTORIC_COLUMNS + + +def process_events(input_csv: str, outdir: str, nozip: bool = False): +    df = pd.read_csv( +        input_csv, +        usecols=HISTORIC_COLUMNS, +        parse_dates=["service_date"], +        dtype={ +            "route_id": "str", +            "trip_id": "str", +            "stop_id": "str", +            "vehicle_id": "str", +            "vehicle_label": "str", + 
"event_time_sec": "int", +        }, +    ) + +    df["event_time"] = df["service_date"] + pd.to_timedelta(df["event_time_sec"], unit="s") +    df.drop(columns=["event_time_sec"], inplace=True) + +    service_date_month = pd.Grouper(key="service_date", freq="1M") +    grouped = df.groupby([service_date_month, "stop_id"]) + +    for name, events in grouped: +        service_date, stop_id = name + +        fname = pathlib.Path( +            outdir, +            "Events", +            "monthly-data", +            str(stop_id), +            f"Year={service_date.year}", +            f"Month={service_date.month}", +            "events.csv.gz", +        ) +        fname.parent.mkdir(parents=True, exist_ok=True) +        # set mtime to 0 in gzip header for determinism (so we can re-gen old routes, and rsync to s3 will ignore) +        events.to_csv(fname, index=False, compression={"method": "gzip", "mtime": 0} if not nozip else None) diff --git a/mbta-performance/chalicelib/historic/tests/__init__.py b/mbta-performance/chalicelib/historic/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mbta-performance/chalicelib/lamp/ingest.py b/mbta-performance/chalicelib/lamp/ingest.py index 49d98b2..c1ce14e 100644 --- a/mbta-performance/chalicelib/lamp/ingest.py +++ b/mbta-performance/chalicelib/lamp/ingest.py @@ -4,7 +4,7 @@ import requests import pandas as pd -from .date import format_dateint, get_current_service_date +from ..date import format_dateint, get_current_service_date from .. import parallel from .. import s3 diff --git a/mbta-performance/chalicelib/tests/__init__.py b/mbta-performance/chalicelib/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mbta-performance/chalicelib/lamp/tests/test_date.py b/mbta-performance/chalicelib/tests/test_date.py similarity index 100% rename from mbta-performance/chalicelib/lamp/tests/test_date.py rename to mbta-performance/chalicelib/tests/test_date.py