Merge pull request #1198 from cal-itp/pems-detectors
aggregate PeMS detectors to station-weekday-hour and station-weekday-peak/offpeak
tiffanychu90 authored Aug 8, 2024
2 parents ff9d67f + 6485122 commit 5420e3f
Showing 4 changed files with 149 additions and 52 deletions.
118 changes: 77 additions & 41 deletions traffic_ops/aggregate.py
@@ -13,49 +13,10 @@

import utils
from utils import RAW_GCS, PROCESSED_GCS
from crosswalks import station_id_cols

fs = gcsfs.GCSFileSystem()


station_id_cols = [
'station_id',
'freeway_id',
'freeway_dir',
'city_id',
'county_id',
'district_id',
'station_type',
'param_set',
#'length',
#'abs_postmile',
#'physical_lanes'
]

def create_station_crosswalk(df: dd.DataFrame) -> pd.DataFrame:
"""
Put in a set of columns that identify the
station + freeway + postmile position.
Create a uuid that we can use to get back all
the columns we may want later.
"""
crosswalk = df[
station_id_cols
].drop_duplicates().reset_index(drop=True)


crosswalk = crosswalk.compute()

crosswalk["station_uuid"] = crosswalk.apply(
lambda _: str(uuid.uuid4()), axis=1,
)

crosswalk.to_parquet(
f"{PROCESSED_GCS}station_crosswalk.parquet"
)

return


def read_filepart_merge_crosswalk(
filename: str,
filepart: str,
@@ -192,6 +153,60 @@ def compute_and_export(
)

return


def import_detector_status(
filename: str = "hov_portion_detector_status_time_window",
) -> pd.DataFrame:
"""
Import the partitioned detector df,
parse time_id, and merge in the crosswalk to get station_uuid.
"""
df = pd.read_parquet(
f"{RAW_GCS}{filename}/",
columns = ["time_id", "station_id", "all_samples"],
).pipe(
utils.parse_for_time_components
).pipe(
utils.add_peak_offpeak_column, "hour"
)

# Merge in station_uuid
crosswalk = pd.read_parquet(
f"{PROCESSED_GCS}station_crosswalk.parquet",
columns = ["station_id", "station_uuid"]
)

df2 = pd.merge(
df,
crosswalk,
on = "station_id",
how = "inner"
).drop(columns = "station_id")

return df2
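
# Editor's sketch, not part of the commit: utils.parse_for_time_components and
# utils.add_peak_offpeak_column live in traffic_ops/utils.py, which this diff
# doesn't show. A hedged guess at the latter's shape, with placeholder peak
# windows (the real hour cutoffs may differ):
def add_peak_offpeak_column(df: pd.DataFrame, hour_col: str) -> pd.DataFrame:
    # Assumed AM (6-9) and PM (3-7) peaks; illustrative cutoffs only.
    peak_hours = set(range(6, 10)) | set(range(15, 20))
    df["peak_offpeak"] = df[hour_col].map(
        lambda h: "peak" if h in peak_hours else "offpeak"
    )
    return df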


def aggregate_detector_samples(
df: pd.DataFrame,
group_cols: list
) -> pd.DataFrame:
"""
Right now, all_samples is the only column that's understood;
all_samples equals diag_samples, so we'll keep just one column.
Other metrics we want to know about:
status (0/1) - which value is good and which is bad?
suspected_err (integers, 0-9 inclusive)
specific errors like high_flow, zero_occ, etc., though it's not yet clear how to interpret them
"""
df2 = (df.groupby(group_cols, group_keys=False)
.agg({
"all_samples": "sum",
}).reset_index()
)

return df2


if __name__ == "__main__":
@@ -209,7 +224,7 @@ def compute_and_export(
station_cols = ["station_uuid"]
weekday_hour_cols = ["year", "month", "weekday", "hour"]
weekday_peak_cols = ["year", "month", "weekday", "peak_offpeak"]

for metric in metric_list:

time0 = datetime.datetime.now()
@@ -247,5 +262,26 @@ def compute_and_export(
print(f"{metric} peak/offpeak aggregation: {time2 - time1}")
print(f"{metric} aggregation: {time2 - time0}")


detector_df = import_detector_status()

detector_station_hour = aggregate_detector_samples(
detector_df,
station_cols + weekday_hour_cols
)

detector_station_hour.to_parquet(
f"{PROCESSED_GCS}station_weekday_hour_detectors.parquet"
)

detector_station_weekday = aggregate_detector_samples(
detector_df,
station_cols + weekday_peak_cols
)

detector_station_weekday.to_parquet(
f"{PROCESSED_GCS}station_weekday_peak_detectors.parquet"
)

end = datetime.datetime.now()
print(f"execution time: {end - start}")
59 changes: 59 additions & 0 deletions traffic_ops/crosswalks.py
@@ -0,0 +1,59 @@
import pandas as pd
import uuid

from utils import RAW_GCS, PROCESSED_GCS

station_id_cols = [
'station_id',
'freeway_id',
'freeway_dir',
'city_id',
'county_id',
'district_id',
'station_type',
'param_set',
#'length',
#'abs_postmile',
#'physical_lanes'
]

def create_station_crosswalk(filename: str = "hov_portion") -> None:
"""
Subset to the set of columns that identify the
station + freeway + postmile position.
Create a uuid that we can use to get back all
the columns we may want later.
"""
df = pd.read_parquet(
f"{RAW_GCS}{filename}",
columns = station_id_cols
).drop_duplicates().reset_index(drop=True)

df["station_uuid"] = df.apply(
lambda _: str(uuid.uuid4()), axis=1,
)

df.to_parquet(
f"{PROCESSED_GCS}station_crosswalk.parquet"
)

return


def create_station_detector_crosswalk(
filename: str = "hov_portion_detector_status_time_window"
):
"""
Save a station_id <-> detector_id crosswalk
from the detector status file.
"""
df = pd.read_parquet(
f"{RAW_GCS}{filename}",
columns = ["station_id", "detector_id"]
).drop_duplicates().reset_index(drop=True)

df.to_parquet(
f"{PROCESSED_GCS}station_detector_crosswalk.parquet"
)

return

if __name__ == "__main__":
create_station_crosswalk()
create_station_detector_crosswalk()
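
A short sketch (not part of the commit) of how the two crosswalks could be joined to tag each detector with its station_uuid:

import pandas as pd

from utils import PROCESSED_GCS

# station_id <-> station_uuid (plus the identifying columns above)
stations = pd.read_parquet(
    f"{PROCESSED_GCS}station_crosswalk.parquet",
    columns=["station_id", "station_uuid"],
)

# station_id <-> detector_id
detectors = pd.read_parquet(
    f"{PROCESSED_GCS}station_detector_crosswalk.parquet"
)

# One row per detector, now carrying its station's uuid.
detector_with_uuid = detectors.merge(stations, on="station_id", how="inner")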
16 changes: 6 additions & 10 deletions traffic_ops/import_data.py
@@ -12,11 +12,7 @@
from typing import Union

from calitp_data_analysis.sql import to_snakecase

GCS_FILE_PATH = (
"gs://calitp-analytics-data/data-analyses/"
"traffic_ops_raw_data/hov_pems/"
)
from utils import RAW_GCS

fs = gcsfs.GCSFileSystem()

@@ -45,7 +41,7 @@ def import_csv_export_parquet(file_name: Union[str, Path]):
).pipe(to_snakecase)

df.to_parquet(
f"{GCS_FILE_PATH}staging/{Path(file_name).stem}.parquet"
f"{RAW_GCS}staging/{Path(file_name).stem}.parquet"
)

return
@@ -57,7 +53,7 @@ def concatenate_files_in_folder(file_name_pattern: str):
parquets, then concatenate together into a
partitioned parquet.
"""
files = fs.glob(f"{GCS_FILE_PATH}{file_name_pattern}.csv")
files = fs.glob(f"{RAW_GCS}{file_name_pattern}.csv")
files = [f"gs://{f}" for f in files]

file_name = file_name_pattern.replace("_*_", "_")
Expand All @@ -74,7 +70,7 @@ def concatenate_files_in_folder(file_name_pattern: str):
# Read in all the parquets in staging/ and
# repartition to go from 100+ -> fewer partitions
df = dd.read_parquet(
f"{GCS_FILE_PATH}staging/{Path(file_name_pattern).stem}.parquet",
f"{RAW_GCS}staging/{Path(file_name_pattern).stem}.parquet",
dtype = {"time_id": "datetime64"}
)

@@ -84,7 +80,7 @@ def concatenate_files_in_folder(file_name_pattern: str):
df = df.repartition(npartitions=25)

df.to_parquet(
f"{GCS_FILE_PATH}{Path(file_name).stem}",
f"{RAW_GCS}{Path(file_name).stem}",
engine="pyarrow"
)

@@ -108,6 +104,6 @@ def concatenate_files_in_folder(file_name_pattern: str):
print(f"save {f} as parquet: {end - start}")

# Delete staging folder
fs.rm(f"{GCS_FILE_PATH}staging/", recursive=True)
fs.rm(f"{RAW_GCS}staging/", recursive=True)


8 changes: 7 additions & 1 deletion traffic_ops/research_agenda.md
@@ -35,4 +35,10 @@ Gather a list of typical questions that are asked around this dataset. Let's see
* adding a lane where there's traffic really is not a good idea, because traffic does occur at certain bottlenecks
* express lanes run from 18.5 to 47.5 (look along this length): how many people do you move, how many vehicles, and at what speed, across congested vs non-congested times (peak vs offpeak)
* 4 GP lanes move Y1 people vs 2 HOT lanes move Y2 people (see the worked sketch below)
* the 10 has 1 lane on some parts, 2 lanes on other parts
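
A worked sketch of that people-moved comparison (all numbers and column names are hypothetical placeholders, not PeMS fields):

import pandas as pd

# Hypothetical hourly throughput for a GP vs HOT lane group.
lanes = pd.DataFrame({
    "lane_group": ["GP", "HOT"],
    "num_lanes": [4, 2],
    "vehicles_per_hour": [7200, 3000],
    "persons_per_vehicle": [1.1, 2.4],  # assumed average occupancy
})

# Y1 vs Y2: total people moved, then normalized per lane.
lanes["persons_per_hour"] = lanes.vehicles_per_hour * lanes.persons_per_vehicle
lanes["persons_per_lane_per_hour"] = lanes.persons_per_hour / lanes.num_lanes

print(lanes[["lane_group", "persons_per_hour", "persons_per_lane_per_hour"]])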

## References
* [detector status report](https://github.com/cagov/caldata-mdsa-caltrans-pems/issues/269)
* [high flows GH issue](https://github.com/cagov/caldata-mdsa-caltrans-pems/issues/278)
* [dbt models sources.yml](https://github.com/cagov/caldata-mdsa-caltrans-pems/blob/main/transform/models/_sources.yml)
* [grains naming conventions](https://github.com/cagov/caldata-mdsa-caltrans-pems/issues/241)
