Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scrape geoportal for shn, add SHN flag on stops table #3529

Open
wants to merge 20 commits into
base: main
Choose a base branch
from

Conversation

charlie-costanzo
Copy link
Member

@charlie-costanzo charlie-costanzo commented Nov 5, 2024

Description

As requested in #3397, this PR creates:

  • An airflow operator to scrape the state geoportal for the state highway network geometry, clean the data and save it to a GCS bucket
  • A dag task for that operator
  • A dag task for creating a new external table for the SHN geometry
  • New source, staging, and docs in dbt for this data
    • Geometry operations in the dim_stops_latest table and a new flag column on the table to indicate whether it exists on the state highway network

Resolves #3397

Type of change

  • New feature

How has this been tested?

locally using airflow/dbt
Screenshot 2024-11-18 at 4 56 20 PM
Screenshot 2024-11-18 at 4 58 32 PM
Screenshot 2024-11-18 at 5 06 16 PM
Screenshot 2024-11-18 at 5 07 34 PM

Post-merge follow-ups

  • Actions required (specified below)
    GCS bucket needs to be added to Composer variables, new airflow operator needs to be turned on

Copy link

github-actions bot commented Nov 12, 2024

Warehouse report 📦

Checks/potential follow-ups

Checks indicate the following action items may be necessary.

  • For new models, do they all have a surrogate primary key that is tested to be not-null and unique?

New models 🌱

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__breakdowns

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__breakdowns_by_agency

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__capital_expenses_by_capital_use

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__capital_expenses_by_mode

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__capital_expenses_for_existing_service

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__capital_expenses_for_expansion_of_service

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__employees_by_agency

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__employees_by_mode

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__employees_by_mode_and_employee_type

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__fuel_and_energy

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__fuel_and_energy_by_agency

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__funding_sources_by_expense_type

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__funding_sources_directly_generated

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__funding_sources_federal

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__funding_sources_local

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__funding_sources_state

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__funding_sources_taxes_levied_by_agency

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__maintenance_facilities

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__maintenance_facilities_by_agency

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__metrics

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__operating_expenses_by_function

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__operating_expenses_by_function_and_agency

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__operating_expenses_by_type

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__operating_expenses_by_type_and_agency

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__service_by_agency

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__service_by_mode

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__service_by_mode_and_time_period

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__stations_and_facilities_by_agency_and_facility_type

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__stations_by_mode_and_age

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__track_and_roadway_by_agency

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__track_and_roadway_by_mode

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__track_and_roadway_guideway_age_distribution

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__vehicles_age_distribution

calitp_warehouse.mart.ntd_fct_annual.fct_ntd_annual_data__vehicles_type_count_by_agency

calitp_warehouse.staging.state_geoportal.stg_state_geoportal__state_highway_network_stops

DAG

Legend (in order of precedence)

Resource type Indicator Resolution
Large table-materialized model Orange Make the model incremental
Large model without partitioning or clustering Orange Add partitioning and/or clustering
View with more than one child Yellow Materialize as a table or incremental
Incremental Light green
Table Green
View White

Copy link
Contributor

@mjumbewu mjumbewu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I know it looks like a lot of comments, but most of it pertains to the options on the geoportal operator. Overall, I really dig this work. Also note that I did not run any of the suggested changes, so 🙏 please make sure they make sense. Happy to hop on a call tomorrow to discuss.

product: str
where: str
outFields: str
f: str
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: We probably always want the format that comes from the API to be the same (e.g. GeoJSON) if this operator is going to convert it to JSON-L, so maybe we don't include an f option.

airflow/plugins/operators/scrape_state_geoportal.py Outdated Show resolved Hide resolved
airflow/plugins/operators/scrape_state_geoportal.py Outdated Show resolved Hide resolved
airflow/plugins/operators/scrape_state_geoportal.py Outdated Show resolved Hide resolved
Comment on lines 173 to 206
if self.product == "state_highway_network":
# Select and rename columns
columns = {
"properties.Route": "Route",
"properties.County": "County",
"properties.District": "District",
"properties.RouteType": "RouteType",
"properties.Direction": "Direction",
"geometry.type": "type",
"geometry.coordinates": "coordinates",
}
df = df[list(columns.keys())].rename(columns=columns)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (non-blocking): I don't think this is something that needs to be addressed right now, but it seems like we could do this without hard-coding the product here. We're really just cleaning up column names -- removing properties. and geometry. prefixes that come from the GeoJSON.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes very good point, refactored


stg_state_geoportal__state_highway_network_stops AS (
SELECT *
-- FROM `cal-itp-data-infra-staging.external_state_geoportal.stg_state_geoportal__state_highway_network_stops`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
-- FROM `cal-itp-data-infra-staging.external_state_geoportal.stg_state_geoportal__state_highway_network_stops`

@charlie-costanzo charlie-costanzo force-pushed the shn-stops-attribute branch 2 times, most recently from fd5e5d4 to 4365a6c Compare November 18, 2024 20:45
@charlie-costanzo charlie-costanzo marked this pull request as ready for review November 18, 2024 20:45
@charlie-costanzo charlie-costanzo added the data-pipeline-ingestion-and-modeling Ingesting, parsing and modeling data. Evan Siroky is product owner. label Nov 18, 2024
Comment on lines 43 to 49
IF(stops_on_shn.stop_id IS NOT NULL, TRUE, FALSE) AS exists_in_dim_stops_latest
FROM
dim_stops_latest
LEFT JOIN
stops_on_shn
ON
dim_stops_latest.stop_id = stops_on_shn.stop_id
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is joining on stop_id going to potentially cause some conflicts here? There may be two feeds that have the same stop_id that this could match on. Would a join on dim_stops_latest._gtfs_key be more appropriate?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @evansiroky – you're completely right. I must have left this in here after some comparative testing, apologies! I just made the swap and am re-requesting your review

Copy link
Contributor

@mjumbewu mjumbewu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, just two more small notes from me, but otherwise I'm ready to approve! Lemme know if you have any questions.

Comment on lines 7 to 9
where: "1=1" # You can change this to filter data
outFields: "*" # Specify the fields to return
f: "geojson" # Format of the response
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: These are no longer necessary with the new defaults, right? Should probably remove them (but verify that it still works with them removed).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed and still ran successfully!

params["resultOffset"] = offset

# Make the request
response = requests.get(validated_url, params=params)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: What should happen if we get a non-200 response from the server? I would think we should raise an exception so that the task fails and has to retry. Does that seem right? You could add .raise_for_status() at the end of the line if so. See https://3.python-requests.org/user/quickstart/#response-status-codes

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added! great suggestion

Copy link
Member

@evansiroky evansiroky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approved pending your discussion with @mjumbewu.

@charlie-costanzo charlie-costanzo force-pushed the shn-stops-attribute branch 2 times, most recently from 65689f8 to ac1cd87 Compare November 22, 2024 20:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
data-pipeline-ingestion-and-modeling Ingesting, parsing and modeling data. Evan Siroky is product owner.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Mart table request: bus stops on the state highway network
3 participants