Process monthly batch files for rapid transit (#11)
* Process monthly batch files

* Moved columns definition to constants
devinmatte authored Apr 26, 2024
1 parent ecba3f5 commit 328aa4b
Showing 12 changed files with 175 additions and 1 deletion.
File renamed without changes.
17 changes: 17 additions & 0 deletions mbta-performance/chalicelib/historic/README.md
@@ -0,0 +1,17 @@
# Monthly Data Processing

The MBTA periodically uploads monthly batch data files. When available, these monthly batches take the place of performance data (this may change with LAMP).

## Backfill all years

This should only be done if we change the processing code or need to repopulate an empty bucket.

```sh
poetry run python -m mbta-performance.chalicelib.historic.backfill.main
```

### Upload to S3

```sh
aws s3 cp --recursive data/output/Events/ s3://tm-mbta-performance/Events/
```
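
A sketch, not part of this commit: because `process.py` zeroes the gzip mtime, re-generated months whose contents haven't changed come out byte-identical, so a size-only sync can skip them (caveat: changed files that happen to keep the same size are also skipped).

```sh
# Only re-upload files whose size changed; byte-identical re-generated
# months are skipped.
aws s3 sync --size-only data/output/Events/ s3://tm-mbta-performance/Events/
```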
Empty file.
Empty file.
39 changes: 39 additions & 0 deletions mbta-performance/chalicelib/historic/backfill/main.py
@@ -0,0 +1,39 @@
import threading
from ..constants import ARCGIS_IDS
from ..download import download_historic_data, list_files_in_dir, prep_local_dir, unzip_historic_data
from ..process import process_events


def single_year_thread(year: str):
    print(f"Backfilling year {year}")
    # download the data
    zip_file = download_historic_data(year)
    # unzip the data
    input_dir = unzip_historic_data(zip_file, f"data/input/{year}")
    # process the data
    for file in list_files_in_dir(input_dir):
        process_events(file, "data/output")
    print(f"Finished backfilling year {year}")


def backfill_all_years():
    """Backfill all years of MBTA data we can"""

    prep_local_dir()

    year_threads: list[threading.Thread] = []
    for year in ARCGIS_IDS.keys():
        year_thread = threading.Thread(
            target=single_year_thread,
            args=(year,),
            name=year,
        )
        year_threads.append(year_thread)
        year_thread.start()

    for year_thread in year_threads:
        year_thread.join()


if __name__ == "__main__":
    backfill_all_years()
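
For a one-off re-run (say, after fixing a parsing bug in a single year's files), the thread target can also be called directly; a minimal sketch using the functions above, with "2023" as an example year:

```python
# Hypothetical single-year run, assuming this module's imports are in scope
prep_local_dir()            # ensure data/, data/input/, data/output/ exist
single_year_thread("2023")  # downloads, unzips, and processes just 2023
```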
25 changes: 25 additions & 0 deletions mbta-performance/chalicelib/historic/constants.py
@@ -0,0 +1,25 @@
ARCGIS_IDS = {
    "2016": "3e892be850fe4cc4a15d6450de4bd318",
    "2017": "cde60045db904ad299922f4f8759dcad",
    "2018": "25c3086e9826407e9f59dd9844f6c975",
    "2019": "11bbb87f8fb245c2b87ed3c8a099b95f",
    "2020": "cb4cf52bafb1402b9b978a424ed4dd78",
    "2021": "611b8c77f30245a0af0c62e2859e8b49",
    "2022": "99094a0c59e443cdbdaefa071c6df609",
    "2023": "9a7f5634db72459ab731b6a9b274a1d4",
    "2024": "4adbec39db40498a8530496d8c63a924",
}


HISTORIC_COLUMNS = [
    "service_date",
    "route_id",
    "trip_id",
    "direction_id",
    "stop_id",
    "stop_sequence",
    "vehicle_id",
    "vehicle_label",
    "event_type",
    "event_time_sec",
]
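
Supporting a new year once the MBTA publishes its archive is a one-line addition to this mapping; a sketch with a placeholder (the real ArcGIS item ID has to be looked up on the MBTA's ArcGIS page):

```python
# Hypothetical: register the 2025 archive (item ID is a placeholder, not real)
ARCGIS_IDS["2025"] = "<arcgis-item-id-for-2025>"
```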
52 changes: 52 additions & 0 deletions mbta-performance/chalicelib/historic/download.py
@@ -0,0 +1,52 @@
import pathlib
import requests
import os
from zipfile import ZipFile
import subprocess
from .constants import ARCGIS_IDS


def prep_local_dir():
    pathlib.Path("data").mkdir(exist_ok=True)
    pathlib.Path("data/input").mkdir(exist_ok=True)
    pathlib.Path("data/output").mkdir(exist_ok=True)


def download_historic_data(year: str):
    if year not in ARCGIS_IDS:
        raise ValueError(f"Year {year} dataset is not available. Supported years are {list(ARCGIS_IDS.keys())}")

    url = f"https://www.arcgis.com/sharing/rest/content/items/{ARCGIS_IDS[year]}/data"
    response = requests.get(url)
    if response.status_code != 200:
        raise ValueError(f"Failed to fetch historic data from {url}. Status code: {response.status_code}")

    with open(f"data/input/{year}.zip", "wb") as f:
        f.write(response.content)
    return os.path.abspath(f"data/input/{year}.zip")


def unzip_historic_data(zip_file: str, output_dir: str):
    pathlib.Path(output_dir).mkdir(exist_ok=True)

    try:
        with ZipFile(zip_file, "r") as zip_ref:
            # Extract the full archive into the year's input directory
            zip_ref.extractall(output_dir)
    except NotImplementedError:
        # zipfile can't handle some compression methods, so fall back to the
        # system unzip binary and block until it finishes
        print("Zip file extraction failed. Likely due to unsupported compression method.")
        print("Attempting to extract using unzip")
        subprocess.run(["unzip", "-o", "-d", output_dir, zip_file], check=True)

    return output_dir


def list_files_in_dir(dir: str):
    # Recursively collect every file path beneath dir
    csv_files = []
    files = os.listdir(dir)
    for file in files:
        path = os.path.join(dir, file)
        if os.path.isfile(path):
            csv_files.append(path)
        elif os.path.isdir(path):
            csv_files += list_files_in_dir(path)
    return csv_files
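
Taken together, these helpers form the per-year fetch pipeline that `backfill/main.py` drives; a minimal sketch for one year, assuming it runs from the repo root like the README commands:

```python
# Hypothetical one-year walk-through of this module's helpers
prep_local_dir()
zip_path = download_historic_data("2019")                     # -> data/input/2019.zip
input_dir = unzip_historic_data(zip_path, "data/input/2019")  # extracted CSVs
for csv_path in list_files_in_dir(input_dir):                 # recursive listing
    print(csv_path)
```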
41 changes: 41 additions & 0 deletions mbta-performance/chalicelib/historic/process.py
@@ -0,0 +1,41 @@
import pandas as pd
import pathlib
from .constants import HISTORIC_COLUMNS


def process_events(input_csv: str, outdir: str, nozip: bool = False):
    df = pd.read_csv(
        input_csv,
        usecols=HISTORIC_COLUMNS,
        parse_dates=["service_date"],
        dtype={
            "route_id": "str",
            "trip_id": "str",
            "stop_id": "str",
            "vehicle_id": "str",
            "vehicle_label": "str",
            "event_time_sec": "int",
        },
    )

    # event_time_sec counts seconds after the start of the service date
    df["event_time"] = df["service_date"] + pd.to_timedelta(df["event_time_sec"], unit="s")
    df.drop(columns=["event_time_sec"], inplace=True)

    # partition into one group per (month, stop)
    service_date_month = pd.Grouper(key="service_date", freq="1M")
    grouped = df.groupby([service_date_month, "stop_id"])

    for name, events in grouped:
        service_date, stop_id = name

        fname = pathlib.Path(
            outdir,
            "Events",
            "monthly-data",
            str(stop_id),
            f"Year={service_date.year}",
            f"Month={service_date.month}",
            "events.csv.gz",
        )
        fname.parent.mkdir(parents=True, exist_ok=True)
        # set mtime to 0 in gzip header for determinism (so we can re-gen old routes, and rsync to s3 will ignore)
        events.to_csv(fname, index=False, compression={"method": "gzip", "mtime": 0} if not nozip else None)
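
The effect of `process_events` is to fan a single input CSV out into one gzipped file per (month, stop) pair; a hypothetical call and the kind of paths it produces (the input path and stop IDs here are examples, not from this commit):

```python
process_events("data/input/2023/2023_Q1.csv", "data/output")  # hypothetical input file
# writes e.g.:
#   data/output/Events/monthly-data/70061/Year=2023/Month=1/events.csv.gz
#   data/output/Events/monthly-data/70061/Year=2023/Month=2/events.csv.gz
#   data/output/Events/monthly-data/70063/Year=2023/Month=1/events.csv.gz
```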
Empty file.
2 changes: 1 addition & 1 deletion mbta-performance/chalicelib/lamp/ingest.py
@@ -4,7 +4,7 @@
import requests
import pandas as pd

-from .date import format_dateint, get_current_service_date
+from ..date import format_dateint, get_current_service_date
from .. import parallel
from .. import s3

Empty file.
