
Commit

Merge pull request CodeForPhilly#939 from CodeForPhilly/lebovits/save-output-parquet-gcs

remove data fetcher, fix park priority glitch, save pipeline output a…
nlebovits authored Oct 3, 2024
2 parents 6e3f8b4 + d6cb55f commit 21bb586
Showing 6 changed files with 125 additions and 142 deletions.
14 changes: 0 additions & 14 deletions data/Dockerfile-fetch-data

This file was deleted.

14 changes: 0 additions & 14 deletions data/docker-compose.yml
@@ -74,19 +74,5 @@ services:
     extra_hosts:
       - host.docker.internal:host-gateway
 
-  data-fetcher:
-    build:
-      context: .
-      dockerfile: Dockerfile-fetch-data
-    depends_on:
-      - vacant-lots-proj
-    environment:
-      - VACANT_LOTS_DB
-    volumes:
-      - ./src:/usr/src/app
-      - /etc/timezone:/etc/timezone:ro
-      - /etc/localtime:/etc/localtime:ro
-    network_mode: 'host'
 
 volumes:
   database_volume:
77 changes: 59 additions & 18 deletions data/src/classes/featurelayer.py
@@ -284,15 +284,25 @@ def create_centroid_gdf(self):
         self.centroid_gdf = self.gdf.copy()
         self.centroid_gdf.loc[:, "geometry"] = self.gdf["geometry"].centroid
 
-    def build_and_publish_pmtiles(self, tileset_id):
-        zoom_threshold = 13
+    def build_and_publish(self, tiles_file_id_prefix: str) -> None:
+        """
+        Builds PMTiles and a Parquet file from a GeoDataFrame and publishes them to Google Cloud Storage.
+
+        Args:
+            tiles_file_id_prefix (str): The ID prefix used for naming the PMTiles and Parquet files, coming from config.
+
+        Raises:
+            ValueError: Raised if the generated PMTiles file is smaller than the minimum allowed size, indicating a potentially corrupt or incomplete file.
+        """
+        zoom_threshold: int = 13
 
         # Export the GeoDataFrame to a temporary GeoJSON file
-        temp_geojson_points = f"tmp/temp_{tileset_id}_points.geojson"
-        temp_geojson_polygons = f"tmp/temp_{tileset_id}_polygons.geojson"
-        temp_pmtiles_points = f"tmp/temp_{tileset_id}_points.pmtiles"
-        temp_pmtiles_polygons = f"tmp/temp_{tileset_id}_polygons.pmtiles"
-        temp_merged_pmtiles = f"tmp/temp_{tileset_id}_merged.pmtiles"
+        temp_geojson_points: str = f"tmp/temp_{tiles_file_id_prefix}_points.geojson"
+        temp_geojson_polygons: str = f"tmp/temp_{tiles_file_id_prefix}_polygons.geojson"
+        temp_pmtiles_points: str = f"tmp/temp_{tiles_file_id_prefix}_points.pmtiles"
+        temp_pmtiles_polygons: str = f"tmp/temp_{tiles_file_id_prefix}_polygons.pmtiles"
+        temp_merged_pmtiles: str = f"tmp/temp_{tiles_file_id_prefix}_merged.pmtiles"
+        temp_parquet: str = f"tmp/{tiles_file_id_prefix}.parquet"
 
         # Reproject
         gdf_wm = self.gdf.to_crs(epsg=4326)
@@ -304,8 +314,36 @@ def build_and_publish_pmtiles(self, tileset_id):
         self.centroid_gdf = self.centroid_gdf.to_crs(epsg=4326)
         self.centroid_gdf.to_file(temp_geojson_points, driver="GeoJSON")
 
+        # Load the GeoJSON from the polygons, drop geometry, and save as Parquet
+        gdf_polygons = gpd.read_file(temp_geojson_polygons)
+        df_no_geom = gdf_polygons.drop(columns=["geometry"])
+
+        # Check if the DataFrame has fewer than 25,000 rows
+        num_rows, num_cols = df_no_geom.shape
+        if num_rows < 25000:
+            print(
+                f"Parquet file has {num_rows} rows, which is fewer than 25,000. Skipping upload."
+            )
+            return
+
+        # Save the DataFrame as Parquet
+        df_no_geom.to_parquet(temp_parquet)
+
+        # Upload Parquet to Google Cloud Storage
+        blob_parquet = bucket.blob(f"{tiles_file_id_prefix}.parquet")
+        try:
+            blob_parquet.upload_from_filename(temp_parquet)
+            parquet_size = os.stat(temp_parquet).st_size
+            parquet_size_mb = parquet_size / (1024 * 1024)
+            print(
+                f"Parquet upload successful! Size: {parquet_size} bytes ({parquet_size_mb:.2f} MB), Dimensions: {num_rows} rows, {num_cols} columns."
+            )
+        except Exception as e:
+            print(f"Parquet upload failed: {e}")
+            return
+
         # Command for generating PMTiles for points up to zoom level zoom_threshold
-        points_command = [
+        points_command: list[str] = [
             "tippecanoe",
             f"--output={temp_pmtiles_points}",
             f"--maximum-zoom={zoom_threshold}",
@@ -320,7 +358,7 @@ def build_and_publish_pmtiles(self, tileset_id):
         ]
 
         # Command for generating PMTiles for polygons from zoom level zoom_threshold
-        polygons_command = [
+        polygons_command: list[str] = [
             "tippecanoe",
             f"--output={temp_pmtiles_polygons}",
             f"--minimum-zoom={zoom_threshold}",
@@ -334,7 +372,7 @@ def build_and_publish_pmtiles(self, tileset_id):
         ]
 
         # Command for merging the two PMTiles files into a single output file
-        merge_command = [
+        merge_command: list[str] = [
             "tile-join",
             f"--output={temp_merged_pmtiles}",
             "--no-tile-size-limit",
@@ -347,20 +385,23 @@ def build_and_publish_pmtiles(self, tileset_id):
         for command in [points_command, polygons_command, merge_command]:
             subprocess.run(command)
 
-        write_files = [f"{tileset_id}_staging.pmtiles"]
+        write_files: list[str] = [f"{tiles_file_id_prefix}_staging.pmtiles"]
 
         if write_production_tiles_file:
-            write_files.append(f"{tileset_id}.pmtiles")
+            write_files.append(f"{tiles_file_id_prefix}.pmtiles")
 
-        # check whether the temp saved tiles files is big enough.
-        # If not then it might be corrupted so log error and don't upload to gcp.
-        file_size = os.stat(temp_merged_pmtiles).st_size
+        # Check whether the temp saved tiles file is big enough.
+        file_size: int = os.stat(temp_merged_pmtiles).st_size
         if file_size < min_tiles_file_size_in_bytes:
             raise ValueError(
                 f"{temp_merged_pmtiles} is {file_size} bytes in size but should be at least {min_tiles_file_size_in_bytes}. Therefore, we are not uploading any files to the GCP bucket. The file may be corrupt or incomplete."
             )
 
-        # Upload to Google Cloud Storage
+        # Upload PMTiles to Google Cloud Storage
         for file in write_files:
             blob = bucket.blob(file)
-            blob.upload_from_filename(temp_merged_pmtiles)
+            try:
+                blob.upload_from_filename(temp_merged_pmtiles)
+                print(f"PMTiles upload successful for {file}!")
+            except Exception as e:
+                print(f"PMTiles upload failed for {file}: {e}")
110 changes: 65 additions & 45 deletions data/src/data_utils/park_priority.py
@@ -9,6 +9,7 @@
 from classes.featurelayer import FeatureLayer
 from config.config import USE_CRS
 from tqdm import tqdm
+import pyogrio
 
 
 def get_latest_shapefile_url() -> str:
@@ -32,17 +33,61 @@ def get_latest_shapefile_url() -> str:
raise ValueError("Shapefile link not found on the page")


def download_and_process_shapefile(
geojson_path: str, park_url: str, target_files: List[str], file_name_prefix: str
) -> gpd.GeoDataFrame:
"""
Downloads and processes the shapefile to create a GeoDataFrame for Philadelphia parks.
Args:
geojson_path (str): Path to save the GeoJSON file.
park_url (str): URL to download the shapefile.
target_files (List[str]): List of files to extract from the shapefile.
file_name_prefix (str): Prefix for the file names to be extracted.
Returns:
gpd.GeoDataFrame: GeoDataFrame containing the processed park data.
"""
print("Downloading and processing park priority data...")
response: requests.Response = requests.get(park_url, stream=True)
total_size: int = int(response.headers.get("content-length", 0))

with tqdm(
total=total_size, unit="iB", unit_scale=True, desc="Downloading"
) as progress_bar:
buffer: BytesIO = BytesIO()
for data in response.iter_content(1024):
size: int = buffer.write(data)
progress_bar.update(size)

with zipfile.ZipFile(buffer) as zip_ref:
for file_name in tqdm(target_files, desc="Extracting"):
zip_ref.extract(file_name, "tmp/")

print("Processing shapefile...")
pa_parks: gpd.GeoDataFrame = gpd.read_file(
"tmp/" + file_name_prefix + "_ParkPriorityAreas.shp"
)
pa_parks = pa_parks.to_crs(USE_CRS)

phl_parks: gpd.GeoDataFrame = pa_parks[pa_parks["ID"].str.startswith("42101")]
phl_parks = phl_parks.loc[:, ["ParkNeed", "geometry"]]

if isinstance(phl_parks, gpd.GeoDataFrame):
phl_parks.rename(columns={"ParkNeed": "park_priority"}, inplace=True)
else:
raise TypeError("Expected a GeoDataFrame, got Series or another type instead")

print(f"Writing filtered data to GeoJSON: {geojson_path}")
phl_parks.to_file(geojson_path, driver="GeoJSON")

return phl_parks


def park_priority(primary_featurelayer: FeatureLayer) -> FeatureLayer:
"""
Downloads and processes park priority data, then joins it with the primary feature layer.
This function performs the following steps:
1. Downloads the latest shapefile from TPL.
2. Extracts relevant files from the downloaded zip.
3. Processes the shapefile, filtering for Philadelphia County.
4. Creates a GeoJSON file with the processed data.
5. Joins the processed data with the primary feature layer.
Args:
primary_featurelayer (FeatureLayer): The primary feature layer to join with park priority data.
@@ -66,47 +111,22 @@ def park_priority(primary_featurelayer: FeatureLayer) -> FeatureLayer:

     os.makedirs("tmp/", exist_ok=True)
 
-    if not os.path.exists(geojson_path):
-        print("Downloading and processing park priority data...")
-        response: requests.Response = requests.get(park_url, stream=True)
-        total_size: int = int(response.headers.get("content-length", 0))
-
-        with tqdm(
-            total=total_size, unit="iB", unit_scale=True, desc="Downloading"
-        ) as progress_bar:
-            buffer: BytesIO = BytesIO()
-            for data in response.iter_content(1024):
-                size: int = buffer.write(data)
-                progress_bar.update(size)
-
-        with zipfile.ZipFile(buffer) as zip_ref:
-            for file_name in tqdm(target_files, desc="Extracting"):
-                zip_ref.extract(file_name, "tmp/")
-
-        print("Processing shapefile...")
-        pa_parks: gpd.GeoDataFrame = gpd.read_file(
-            "tmp/" + file_name_prefix + "_ParkPriorityAreas.shp"
-        )
-        pa_parks = pa_parks.to_crs(USE_CRS)
-
-        phl_parks: gpd.GeoDataFrame = pa_parks[pa_parks["ID"].str.startswith("42101")]
-        phl_parks = phl_parks.loc[:, ["ParkNeed", "geometry"]]
-
-        # Print the type of phl_parks for debugging
-        print(f"Type of phl_parks: {type(phl_parks)}")
-
-        if isinstance(phl_parks, gpd.GeoDataFrame):
-            phl_parks.rename(columns={"ParkNeed": "park_priority"}, inplace=True)
+    try:
+        if os.path.exists(geojson_path):
+            print(f"GeoJSON file already exists, loading from {geojson_path}")
+            phl_parks: gpd.GeoDataFrame = gpd.read_file(geojson_path)
         else:
-            raise TypeError(
-                "Expected a GeoDataFrame, got Series or another type instead"
+            raise pyogrio.errors.DataSourceError(
+                "GeoJSON file missing, forcing download."
             )
-
-        print(f"Writing filtered data to GeoJSON: {geojson_path}")
-        phl_parks.to_file(geojson_path, driver="GeoJSON")
-    else:
-        print(f"GeoJSON file already exists, loading from {geojson_path}")
-        phl_parks: gpd.GeoDataFrame = gpd.read_file(geojson_path)
+    except (pyogrio.errors.DataSourceError, ValueError) as e:
+        print(f"Error loading GeoJSON: {e}. Re-downloading and processing shapefile.")
+        if os.path.exists(geojson_path):
+            os.remove(geojson_path)  # Delete the corrupted GeoJSON if it exists
+        phl_parks = download_and_process_shapefile(
+            geojson_path, park_url, target_files, file_name_prefix
+        )
 
     park_priority_layer: FeatureLayer = FeatureLayer("Park Priority")
     park_priority_layer.gdf = phl_parks
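The reworked `park_priority` treats the cached GeoJSON as disposable: read it if it parses, otherwise delete it and rebuild from the source shapefile. Raising `pyogrio.errors.DataSourceError` for the missing-file case lets a single `except` branch handle both absence and corruption. A condensed sketch of that pattern, assuming geopandas reads through the pyogrio engine and with a `rebuild` callable standing in for `download_and_process_shapefile`:

```python
import os
from typing import Callable

import geopandas as gpd
import pyogrio


def load_or_rebuild(
    geojson_path: str, rebuild: Callable[[str], gpd.GeoDataFrame]
) -> gpd.GeoDataFrame:
    """Return the cached layer, rebuilding it if the cache is missing or unreadable."""
    try:
        if not os.path.exists(geojson_path):
            # Route the missing-file case through the same handler as corruption.
            raise pyogrio.errors.DataSourceError("GeoJSON file missing, forcing download.")
        return gpd.read_file(geojson_path)
    except (pyogrio.errors.DataSourceError, ValueError) as e:
        print(f"Error loading GeoJSON: {e}. Rebuilding cache.")
        if os.path.exists(geojson_path):
            os.remove(geojson_path)  # drop the corrupt file before rewriting it
        return rebuild(geojson_path)
```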
50 changes: 0 additions & 50 deletions data/src/fetch_and_save_data.py

This file was deleted.

2 changes: 1 addition & 1 deletion data/src/script.py
@@ -99,7 +99,7 @@
     conn.commit()
 
     # Post to Mapbox
-    dataset.build_and_publish_pmtiles(tiles_file_id_prefix)
+    dataset.build_and_publish(tiles_file_id_prefix)
 
     # if we are reloading, run the diff report, then archive the backup and finally prune old archives
     if FORCE_RELOAD:
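The rename above is the only change script.py needs. For context, a hedged sketch of the surrounding call site, with the ETL steps elided and the `vacant_properties` prefix assumed for illustration:

```python
from classes.featurelayer import FeatureLayer

# Hypothetical driver: how the pipeline populates the layer is elided here.
dataset = FeatureLayer("Vacant Properties")
# ... ETL steps fill dataset.gdf ...
dataset.create_centroid_gdf()  # points layer used for the low-zoom PMTiles
dataset.build_and_publish("vacant_properties")  # assumed prefix; one call now writes PMTiles and Parquet
```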
