Skip to content

Commit

Permalink
refactor how we keep final columns, dynamically rename columns
Browse files Browse the repository at this point in the history
  • Loading branch information
charlie-costanzo committed Nov 18, 2024
1 parent 398d0b5 commit 4365a6c
Showing 1 changed file with 18 additions and 22 deletions.
40 changes: 18 additions & 22 deletions airflow/plugins/operators/scrape_state_geoportal.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,34 +193,30 @@ def execute(self, **kwargs):
df = pd.json_normalize(api_content)

if self.product == "state_highway_network":
# Select and rename columns
columns = {
"properties.Route": "Route",
"properties.County": "County",
"properties.District": "District",
"properties.RouteType": "RouteType",
"properties.Direction": "Direction",
"geometry.type": "type",
"geometry.coordinates": "coordinates",
}
df = df[list(columns.keys())].rename(columns=columns)
# Select columns to keep
df = df[
[
"properties.Route",
"properties.County",
"properties.District",
"properties.RouteType",
"properties.Direction",
"geometry.type",
"geometry.coordinates",
]
]

# Dynamically create a mapping by removing known prefixes
columns = {col: col.split(".")[-1] for col in df.columns}

# Rename columns using the dynamically created mapping
df = df.rename(columns=columns)

# Create new column with WKT format
df["wkt_coordinates"] = df.apply(
lambda row: to_wkt(row["type"], row["coordinates"]), axis=1
)

# Select final columns for output
final_columns = [
"Route",
"County",
"District",
"RouteType",
"Direction",
"wkt_coordinates",
]
df = df[final_columns]

# Compress the DataFrame content and save it
self.gzipped_content = gzip.compress(
df.to_json(orient="records", lines=True).encode()
Expand Down

0 comments on commit 4365a6c

Please sign in to comment.