Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process monthly batch files for rapid transit #11

Merged
merged 2 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions mbta-performance/chalicelib/historic/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Monthly Data Processing

MBTA uploads monthly data files periodically. These monthly batches take the place of performance data when available (This may change with LAMP).

## Backfill all years

This should only be done if we change the processing code or need to repopulate an empty bucket.

```sh
poetry run python -m mbta-performance.chalicelib.historic.backfill.main
```

### Upload to S3

```sh
aws s3 cp --recursive data/output/Events/ s3://tm-mbta-performance/Events/
```
Empty file.
Empty file.
39 changes: 39 additions & 0 deletions mbta-performance/chalicelib/historic/backfill/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import threading
from ..constants import ARCGIS_IDS
from ..download import download_historic_data, list_files_in_dir, prep_local_dir, unzip_historic_data
from ..process import process_events


def single_year_thread(year: str):
    """Download, extract, and process one year's worth of historic MBTA data.

    Intended to run as the target of a per-year thread; writes processed
    output under data/output.
    """
    print(f"Backfilling year {year}")
    # fetch the year's zip archive from ArcGIS
    archive_path = download_historic_data(year)
    # extract it next to the other inputs for this year
    extracted_dir = unzip_historic_data(archive_path, f"data/input/{year}")
    # process every CSV found in the extracted tree
    for csv_path in list_files_in_dir(extracted_dir):
        process_events(csv_path, "data/output")
    print(f"Finished backfilling year {year}")


def backfill_all_years():
    """Backfill all years of MBTA data we can"""

    prep_local_dir()

    # one worker thread per available year; the work is I/O-bound
    # (HTTP download + disk), so threads overlap usefully despite the GIL
    workers = [
        threading.Thread(target=single_year_thread, args=(year,), name=year)
        for year in ARCGIS_IDS
    ]
    for worker in workers:
        worker.start()
    # wait for every year to finish before returning
    for worker in workers:
        worker.join()


# Script entry point (see README: `poetry run python -m
# mbta-performance.chalicelib.historic.backfill.main`).
if __name__ == "__main__":
    backfill_all_years()
25 changes: 25 additions & 0 deletions mbta-performance/chalicelib/historic/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Year -> ArcGIS content item ID. Each ID is interpolated into
# https://www.arcgis.com/sharing/rest/content/items/<id>/data by
# download.download_historic_data to fetch that year's zip archive.
ARCGIS_IDS = {
    "2016": "3e892be850fe4cc4a15d6450de4bd318",
    "2017": "cde60045db904ad299922f4f8759dcad",
    "2018": "25c3086e9826407e9f59dd9844f6c975",
    "2019": "11bbb87f8fb245c2b87ed3c8a099b95f",
    "2020": "cb4cf52bafb1402b9b978a424ed4dd78",
    "2021": "611b8c77f30245a0af0c62e2859e8b49",
    "2022": "99094a0c59e443cdbdaefa071c6df609",
    "2023": "9a7f5634db72459ab731b6a9b274a1d4",
    "2024": "4adbec39db40498a8530496d8c63a924",
}


# Columns read from the historic CSV files (passed as `usecols` in
# process.process_events). `event_time_sec` is seconds after the service
# date's midnight and is converted to an absolute `event_time` timestamp
# during processing.
HISTORIC_COLUMNS = [
    "service_date",
    "route_id",
    "trip_id",
    "direction_id",
    "stop_id",
    "stop_sequence",
    "vehicle_id",
    "vehicle_label",
    "event_type",
    "event_time_sec",
]
52 changes: 52 additions & 0 deletions mbta-performance/chalicelib/historic/download.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import pathlib
import requests
import os
from zipfile import ZipFile
import subprocess
from .constants import ARCGIS_IDS


def prep_local_dir():
    """Create the local data/, data/input/, data/output/ skeleton if missing."""
    # parent first, then children, so each plain mkdir succeeds
    for directory in ("data", "data/input", "data/output"):
        pathlib.Path(directory).mkdir(exist_ok=True)


def download_historic_data(year: str):
    """Download one year's historic data zip from ArcGIS into data/input/.

    Returns the absolute path of the downloaded zip file.
    Raises ValueError if the year is unsupported or the HTTP request fails.
    """
    if year not in ARCGIS_IDS:
        raise ValueError(f"Year {year} dataset is not available. Supported years are {list(ARCGIS_IDS.keys())}")

    url = f"https://www.arcgis.com/sharing/rest/content/items/{ARCGIS_IDS[year]}/data"
    # (connect, read) timeout so a stalled ArcGIS endpoint can't hang a
    # backfill thread forever; yearly archives are large, so allow a long read
    response = requests.get(url, timeout=(10, 900))
    if response.status_code != 200:
        raise ValueError(f"Failed to fetch historic data from {url}. Status code: {response.status_code}")

    with open(f"data/input/{year}.zip", "wb") as f:
        f.write(response.content)
    return os.path.abspath(f"data/input/{year}.zip")


def unzip_historic_data(zip_file: str, output_dir: str):
    """Extract zip_file into output_dir and return output_dir.

    Falls back to the system `unzip` tool when the archive uses a
    compression method the stdlib doesn't support (e.g. Deflate64).
    """
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    try:
        with ZipFile(zip_file, "r") as zip_ref:
            # Extract all the contents of zip file in different directory
            zip_ref.extractall(output_dir)
    except NotImplementedError:
        print("Zip file extraction failed. Likely due to unsupported compression method.")
        print("Attempting to extract using unzip")
        # run (not Popen): block until extraction finishes so callers never
        # see a half-extracted directory, and raise if unzip fails
        subprocess.run(["unzip", "-o", "-d", output_dir, zip_file], check=True)

    return output_dir


def list_files_in_dir(dir: str):
    """Recursively collect the paths of every regular file under dir.

    Despite the local naming in the original, no extension filtering is
    applied — all files are returned.
    """
    collected = []
    for entry in os.listdir(dir):
        full_path = os.path.join(dir, entry)
        if os.path.isfile(full_path):
            collected.append(full_path)
        elif os.path.isdir(full_path):
            collected.extend(list_files_in_dir(full_path))
    return collected
41 changes: 41 additions & 0 deletions mbta-performance/chalicelib/historic/process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import pandas as pd
import pathlib
from .constants import HISTORIC_COLUMNS


def process_events(input_csv: str, outdir: str, nozip: bool = False):
    """Process one historic events CSV into per-stop, per-month output files.

    Reads the columns in HISTORIC_COLUMNS, converts `event_time_sec`
    (seconds after the service date's midnight) into an absolute
    `event_time`, then writes one file per (month, stop_id) group under
    <outdir>/Events/monthly-data/<stop_id>/Year=Y/Month=M/events.csv.gz.

    nozip=True skips gzip compression (the .csv.gz filename is kept either
    way for path stability).
    """
    df = pd.read_csv(
        input_csv,
        usecols=HISTORIC_COLUMNS,
        parse_dates=["service_date"],
        dtype={
            "route_id": "str",
            "trip_id": "str",
            "stop_id": "str",
            "vehicle_id": "str",
            "vehicle_label": "str",
            # BUG FIX: this key was previously "event_time", which doesn't
            # exist in the CSV — pandas silently ignored it, so the intended
            # integer dtype was never applied to event_time_sec
            "event_time_sec": "int",
        },
    )

    # absolute event timestamp = midnight of the service date + offset seconds
    df["event_time"] = df["service_date"] + pd.to_timedelta(df["event_time_sec"], unit="s")
    df.drop(columns=["event_time_sec"], inplace=True)

    # bucket rows by calendar month (month-end frequency) and stop
    service_date_month = pd.Grouper(key="service_date", freq="1M")
    grouped = df.groupby([service_date_month, "stop_id"])

    for name, events in grouped:
        service_date, stop_id = name

        fname = pathlib.Path(
            outdir,
            "Events",
            "monthly-data",
            str(stop_id),
            f"Year={service_date.year}",
            f"Month={service_date.month}",
            "events.csv.gz",
        )
        fname.parent.mkdir(parents=True, exist_ok=True)
        # set mtime to 0 in gzip header for determinism (so we can re-gen old routes, and rsync to s3 will ignore)
        events.to_csv(fname, index=False, compression={"method": "gzip", "mtime": 0} if not nozip else None)
Empty file.
2 changes: 1 addition & 1 deletion mbta-performance/chalicelib/lamp/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import requests
import pandas as pd

from .date import format_dateint, get_current_service_date
from ..date import format_dateint, get_current_service_date
from .. import parallel
from .. import s3

Expand Down
Empty file.