diff --git a/transform/models/intermediate/performance/int_performance__detector_metrics_agg_daily.sql b/transform/models/intermediate/performance/int_performance__detector_metrics_agg_daily.sql index 45c08ef4..b551ac5c 100644 --- a/transform/models/intermediate/performance/int_performance__detector_metrics_agg_daily.sql +++ b/transform/models/intermediate/performance/int_performance__detector_metrics_agg_daily.sql @@ -1,9 +1,14 @@ -{{ config(materialized='table') }} +{{ config( + materialized="incremental", + unique_key=["detector_id", "sample_date"], + on_schema_change="sync_all_columns", +) }} -- read the station hourly data with station_hourly_data as ( select * from {{ ref('int_performance__detector_metrics_agg_hourly') }} + where {{ make_model_incremental('sample_date') }} ), -- now aggregate hourly volume, occupancy and speed to daily level diff --git a/transform/models/marts/performance/performance__station_metric_agg_recent_one_week.sql b/transform/models/marts/performance/performance__station_metric_agg_recent_one_week.sql index dae6b63b..05acf0d3 100644 --- a/transform/models/marts/performance/performance__station_metric_agg_recent_one_week.sql +++ b/transform/models/marts/performance/performance__station_metric_agg_recent_one_week.sql @@ -66,31 +66,30 @@ hourly_station_volume as ( where sample_date >= dateadd('day', -8, current_date()) ), +/* + * Join station metadata pairs with hourly observations. This could be + * expressed more naturally with 2 joins, 1 for ML stations and 1 for HOV, + * but doing so incurs a significant performance cost. Instead, we have a + * more permissive join and an aggregation. + */ station_with_ml_hov_metrics as ( select - ax.*, - ml.sample_date, - ml.sample_hour, - ml.hourly_volume as ml_hourly_volume, - ml.hourly_vmt as ml_hourly_vmt, - ml.hourly_vht as ml_hourly_vht, - hov.hourly_volume as hov_hourly_volume, - hov.hourly_vmt as hov_hourly_vmt, - hov.hourly_vht as hov_hourly_vht - from closest_station_with_selection as ax - inner join hourly_station_volume as ml + css.*, + hourly.sample_date, + hourly.sample_hour, + max(case when hourly.station_type = 'ML' then hourly.hourly_volume end) as ml_hourly_volume, + max(case when hourly.station_type = 'ML' then hourly.hourly_vmt end) as ml_hourly_vmt, + max(case when hourly.station_type = 'ML' then hourly.hourly_vht end) as ml_hourly_vht, + max(case when hourly.station_type = 'HV' then hourly.hourly_volume end) as hov_hourly_volume, + max(case when hourly.station_type = 'HV' then hourly.hourly_vmt end) as hov_hourly_vmt, + max(case when hourly.station_type = 'HV' then hourly.hourly_vht end) as hov_hourly_vht + from closest_station_with_selection as css + inner join hourly_station_volume as hourly on - ax.ml_station_id = ml.station_id - and ax.direction = ml.direction - and ax.active_date = ml.sample_date - and ml.station_type = 'ML' - inner join hourly_station_volume as hov - on - ax.hov_station_id = hov.station_id - and ax.direction = hov.direction - and ml.sample_date = hov.sample_date - and ml.sample_hour = hov.sample_hour - and hov.station_type = 'HV' + (css.ml_station_id = hourly.station_id or css.hov_station_id = hourly.station_id) + and css.direction = hourly.direction + and css.active_date = hourly.sample_date + group by all ), station_metric_agg as ( diff --git a/transform/models/marts/quality/quality__row_count_summary.sql b/transform/models/marts/quality/quality__row_count_summary.sql index 7c99eef7..3409ebc9 100644 --- a/transform/models/marts/quality/quality__row_count_summary.sql +++ b/transform/models/marts/quality/quality__row_count_summary.sql @@ -1,3 +1,6 @@ +{{ config( + snowflake_warehouse = get_snowflake_warehouse(size="XL") +) }} with ML_HV_DETECTOR_STATUS_DAILY_COUNT as (