
Commit

remove data fetcher, fix park priority glitch, save pipeline output as parquet file in GCS
nlebovits committed Oct 3, 2024
1 parent 6e3f8b4 commit d6cb55f
Showing 6 changed files with 125 additions and 142 deletions.
14 changes: 0 additions & 14 deletions data/Dockerfile-fetch-data

This file was deleted.

14 changes: 0 additions & 14 deletions data/docker-compose.yml
@@ -74,19 +74,5 @@ services:
extra_hosts:
- host.docker.internal:host-gateway

data-fetcher:
build:
context: .
dockerfile: Dockerfile-fetch-data
depends_on:
- vacant-lots-proj
environment:
- VACANT_LOTS_DB
volumes:
- ./src:/usr/src/app
- /etc/timezone:/etc/timezone:ro
- /etc/localtime:/etc/localtime:ro
network_mode: 'host'

volumes:
database_volume:
77 changes: 59 additions & 18 deletions data/src/classes/featurelayer.py
@@ -284,15 +284,25 @@ def create_centroid_gdf(self):
self.centroid_gdf = self.gdf.copy()
self.centroid_gdf.loc[:, "geometry"] = self.gdf["geometry"].centroid

def build_and_publish_pmtiles(self, tileset_id):
zoom_threshold = 13
def build_and_publish(self, tiles_file_id_prefix: str) -> None:
"""
Builds PMTiles and a Parquet file from a GeoDataFrame and publishes them to Google Cloud Storage.
Args:
tiles_file_id_prefix (str): The ID prefix used for naming the PMTiles and Parquet files, coming from config.
Raises:
ValueError: Raised if the generated PMTiles file is smaller than the minimum allowed size, indicating a potentially corrupt or incomplete file.
"""
zoom_threshold: int = 13

# Export the GeoDataFrame to a temporary GeoJSON file
temp_geojson_points = f"tmp/temp_{tileset_id}_points.geojson"
temp_geojson_polygons = f"tmp/temp_{tileset_id}_polygons.geojson"
temp_pmtiles_points = f"tmp/temp_{tileset_id}_points.pmtiles"
temp_pmtiles_polygons = f"tmp/temp_{tileset_id}_polygons.pmtiles"
temp_merged_pmtiles = f"tmp/temp_{tileset_id}_merged.pmtiles"
temp_geojson_points: str = f"tmp/temp_{tiles_file_id_prefix}_points.geojson"
temp_geojson_polygons: str = f"tmp/temp_{tiles_file_id_prefix}_polygons.geojson"
temp_pmtiles_points: str = f"tmp/temp_{tiles_file_id_prefix}_points.pmtiles"
temp_pmtiles_polygons: str = f"tmp/temp_{tiles_file_id_prefix}_polygons.pmtiles"
temp_merged_pmtiles: str = f"tmp/temp_{tiles_file_id_prefix}_merged.pmtiles"
temp_parquet: str = f"tmp/{tiles_file_id_prefix}.parquet"

# Reproject
gdf_wm = self.gdf.to_crs(epsg=4326)
@@ -304,8 +314,36 @@ def build_and_publish_pmtiles(self, tileset_id):
self.centroid_gdf = self.centroid_gdf.to_crs(epsg=4326)
self.centroid_gdf.to_file(temp_geojson_points, driver="GeoJSON")

# Load the GeoJSON from the polygons, drop geometry, and save as Parquet
gdf_polygons = gpd.read_file(temp_geojson_polygons)
df_no_geom = gdf_polygons.drop(columns=["geometry"])

# Check if the DataFrame has fewer than 25,000 rows
num_rows, num_cols = df_no_geom.shape
if num_rows < 25000:
print(
f"Parquet file has {num_rows} rows, which is fewer than 25,000. Skipping upload."
)
return

# Save the DataFrame as Parquet
df_no_geom.to_parquet(temp_parquet)

# Upload Parquet to Google Cloud Storage
blob_parquet = bucket.blob(f"{tiles_file_id_prefix}.parquet")
try:
blob_parquet.upload_from_filename(temp_parquet)
parquet_size = os.stat(temp_parquet).st_size
parquet_size_mb = parquet_size / (1024 * 1024)
print(
f"Parquet upload successful! Size: {parquet_size} bytes ({parquet_size_mb:.2f} MB), Dimensions: {num_rows} rows, {num_cols} columns."
)
except Exception as e:
print(f"Parquet upload failed: {e}")
return

# Command for generating PMTiles for points up to zoom level zoom_threshold
points_command = [
points_command: list[str] = [
"tippecanoe",
f"--output={temp_pmtiles_points}",
f"--maximum-zoom={zoom_threshold}",
@@ -320,7 +358,7 @@ def build_and_publish_pmtiles(self, tileset_id):
]

# Command for generating PMTiles for polygons from zoom level zoom_threshold
polygons_command = [
polygons_command: list[str] = [
"tippecanoe",
f"--output={temp_pmtiles_polygons}",
f"--minimum-zoom={zoom_threshold}",
@@ -334,7 +372,7 @@ def build_and_publish_pmtiles(self, tileset_id):
]

# Command for merging the two PMTiles files into a single output file
merge_command = [
merge_command: list[str] = [
"tile-join",
f"--output={temp_merged_pmtiles}",
"--no-tile-size-limit",
@@ -347,20 +385,23 @@ def build_and_publish_pmtiles(self, tileset_id):
for command in [points_command, polygons_command, merge_command]:
subprocess.run(command)

write_files = [f"{tileset_id}_staging.pmtiles"]
write_files: list[str] = [f"{tiles_file_id_prefix}_staging.pmtiles"]

if write_production_tiles_file:
write_files.append(f"{tileset_id}.pmtiles")
write_files.append(f"{tiles_file_id_prefix}.pmtiles")

# check whether the temp saved tiles files is big enough.
# If not then it might be corrupted so log error and don't upload to gcp.
file_size = os.stat(temp_merged_pmtiles).st_size
# Check whether the saved temp tiles file is big enough. If not, it may be corrupt or incomplete, so raise instead of uploading to GCP.
file_size: int = os.stat(temp_merged_pmtiles).st_size
if file_size < min_tiles_file_size_in_bytes:
raise ValueError(
f"{temp_merged_pmtiles} is {file_size} bytes in size but should be at least {min_tiles_file_size_in_bytes}. Therefore, we are not uploading any files to the GCP bucket. The file may be corrupt or incomplete."
f"{temp_merged_pmtiles} is {file_size} bytes in size but should be at least {min_tiles_file_size_in_bytes}. Therefore, we are not uploading any files to the GCP bucket. The file may be corrupt or incomplete."
)

# Upload to Google Cloud Storage
# Upload PMTiles to Google Cloud Storage
for file in write_files:
blob = bucket.blob(file)
blob.upload_from_filename(temp_merged_pmtiles)
try:
blob.upload_from_filename(temp_merged_pmtiles)
print(f"PMTiles upload successful for {file}!")
except Exception as e:
print(f"PMTiles upload failed for {file}: {e}")
110 changes: 65 additions & 45 deletions data/src/data_utils/park_priority.py
@@ -9,6 +9,7 @@
from classes.featurelayer import FeatureLayer
from config.config import USE_CRS
from tqdm import tqdm
import pyogrio


def get_latest_shapefile_url() -> str:
@@ -32,17 +33,61 @@ def get_latest_shapefile_url() -> str:
raise ValueError("Shapefile link not found on the page")


def download_and_process_shapefile(
geojson_path: str, park_url: str, target_files: List[str], file_name_prefix: str
) -> gpd.GeoDataFrame:
"""
Downloads and processes the shapefile to create a GeoDataFrame for Philadelphia parks.
Args:
geojson_path (str): Path to save the GeoJSON file.
park_url (str): URL to download the shapefile.
target_files (List[str]): List of files to extract from the shapefile.
file_name_prefix (str): Prefix for the file names to be extracted.
Returns:
gpd.GeoDataFrame: GeoDataFrame containing the processed park data.
"""
print("Downloading and processing park priority data...")
response: requests.Response = requests.get(park_url, stream=True)
total_size: int = int(response.headers.get("content-length", 0))

with tqdm(
total=total_size, unit="iB", unit_scale=True, desc="Downloading"
) as progress_bar:
buffer: BytesIO = BytesIO()
for data in response.iter_content(1024):
size: int = buffer.write(data)
progress_bar.update(size)

with zipfile.ZipFile(buffer) as zip_ref:
for file_name in tqdm(target_files, desc="Extracting"):
zip_ref.extract(file_name, "tmp/")

print("Processing shapefile...")
pa_parks: gpd.GeoDataFrame = gpd.read_file(
"tmp/" + file_name_prefix + "_ParkPriorityAreas.shp"
)
pa_parks = pa_parks.to_crs(USE_CRS)

phl_parks: gpd.GeoDataFrame = pa_parks[pa_parks["ID"].str.startswith("42101")]
phl_parks = phl_parks.loc[:, ["ParkNeed", "geometry"]]

if isinstance(phl_parks, gpd.GeoDataFrame):
phl_parks.rename(columns={"ParkNeed": "park_priority"}, inplace=True)
else:
raise TypeError("Expected a GeoDataFrame, got Series or another type instead")

print(f"Writing filtered data to GeoJSON: {geojson_path}")
phl_parks.to_file(geojson_path, driver="GeoJSON")

return phl_parks


def park_priority(primary_featurelayer: FeatureLayer) -> FeatureLayer:
"""
Downloads and processes park priority data, then joins it with the primary feature layer.
This function performs the following steps:
1. Downloads the latest shapefile from TPL.
2. Extracts relevant files from the downloaded zip.
3. Processes the shapefile, filtering for Philadelphia County.
4. Creates a GeoJSON file with the processed data.
5. Joins the processed data with the primary feature layer.
Args:
primary_featurelayer (FeatureLayer): The primary feature layer to join with park priority data.
@@ -66,47 +111,22 @@ def park_priority(primary_featurelayer: FeatureLayer) -> FeatureLayer:

os.makedirs("tmp/", exist_ok=True)

if not os.path.exists(geojson_path):
print("Downloading and processing park priority data...")
response: requests.Response = requests.get(park_url, stream=True)
total_size: int = int(response.headers.get("content-length", 0))

with tqdm(
total=total_size, unit="iB", unit_scale=True, desc="Downloading"
) as progress_bar:
buffer: BytesIO = BytesIO()
for data in response.iter_content(1024):
size: int = buffer.write(data)
progress_bar.update(size)

with zipfile.ZipFile(buffer) as zip_ref:
for file_name in tqdm(target_files, desc="Extracting"):
zip_ref.extract(file_name, "tmp/")

print("Processing shapefile...")
pa_parks: gpd.GeoDataFrame = gpd.read_file(
"tmp/" + file_name_prefix + "_ParkPriorityAreas.shp"
)
pa_parks = pa_parks.to_crs(USE_CRS)

phl_parks: gpd.GeoDataFrame = pa_parks[pa_parks["ID"].str.startswith("42101")]
phl_parks = phl_parks.loc[:, ["ParkNeed", "geometry"]]

# Print the type of phl_parks for debugging
print(f"Type of phl_parks: {type(phl_parks)}")

if isinstance(phl_parks, gpd.GeoDataFrame):
phl_parks.rename(columns={"ParkNeed": "park_priority"}, inplace=True)
try:
if os.path.exists(geojson_path):
print(f"GeoJSON file already exists, loading from {geojson_path}")
phl_parks: gpd.GeoDataFrame = gpd.read_file(geojson_path)
else:
raise TypeError(
"Expected a GeoDataFrame, got Series or another type instead"
raise pyogrio.errors.DataSourceError(
"GeoJSON file missing, forcing download."
)

print(f"Writing filtered data to GeoJSON: {geojson_path}")
phl_parks.to_file(geojson_path, driver="GeoJSON")
else:
print(f"GeoJSON file already exists, loading from {geojson_path}")
phl_parks: gpd.GeoDataFrame = gpd.read_file(geojson_path)
except (pyogrio.errors.DataSourceError, ValueError) as e:
print(f"Error loading GeoJSON: {e}. Re-downloading and processing shapefile.")
if os.path.exists(geojson_path):
os.remove(geojson_path) # Delete the corrupted GeoJSON if it exists
phl_parks = download_and_process_shapefile(
geojson_path, park_url, target_files, file_name_prefix
)

park_priority_layer: FeatureLayer = FeatureLayer("Park Priority")
park_priority_layer.gdf = phl_parks
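
The park priority fix boils down to a cache-or-redownload pattern: load the cached GeoJSON if it is present and readable, otherwise delete it and rebuild it from the shapefile. A minimal self-contained sketch of that pattern, assuming geopandas and pyogrio are installed; download_geojson() is a hypothetical stand-in for download_and_process_shapefile().

import os

import geopandas as gpd
import pyogrio


def download_geojson(geojson_path: str) -> gpd.GeoDataFrame:
    # Hypothetical placeholder for download_and_process_shapefile()
    raise NotImplementedError


def load_or_rebuild(geojson_path: str) -> gpd.GeoDataFrame:
    try:
        if not os.path.exists(geojson_path):
            # Treat a missing cache the same way as a corrupt one
            raise pyogrio.errors.DataSourceError("GeoJSON file missing, forcing download.")
        return gpd.read_file(geojson_path)
    except (pyogrio.errors.DataSourceError, ValueError):
        # Corrupt or unreadable cache: remove it and rebuild from the source shapefile
        if os.path.exists(geojson_path):
            os.remove(geojson_path)
        return download_geojson(geojson_path)
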
50 changes: 0 additions & 50 deletions data/src/fetch_and_save_data.py

This file was deleted.

2 changes: 1 addition & 1 deletion data/src/script.py
@@ -99,7 +99,7 @@
conn.commit()

# Post to Mapbox
dataset.build_and_publish_pmtiles(tiles_file_id_prefix)
dataset.build_and_publish(tiles_file_id_prefix)

# if we are reloading, run the diff report, then archive the backup and finally prune old archives
if FORCE_RELOAD:
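
For reference, a sketch of the renamed call site, not the project's actual script.py: FeatureLayer comes from the module changed above, while the layer name and prefix below are hypothetical placeholders for values that script.py builds from config.

from classes.featurelayer import FeatureLayer

dataset = FeatureLayer("Vacant Properties")  # hypothetical layer name
dataset.gdf = ...  # in the real pipeline, the data_utils services populate this first
tiles_file_id_prefix = "vacant_properties"  # hypothetical prefix
dataset.build_and_publish(tiles_file_id_prefix)  # formerly build_and_publish_pmtiles()
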
