From d6cb55f938a2b6a2770b2f1fccc55350bb59d567 Mon Sep 17 00:00:00 2001
From: nlebovits
Date: Wed, 2 Oct 2024 21:54:28 -0400
Subject: [PATCH] remove data fetcher, fix park priority glitch, save pipeline output as parquet file in GCS

---
 data/Dockerfile-fetch-data           |  14 ----
 data/docker-compose.yml              |  14 ----
 data/src/classes/featurelayer.py     |  77 ++++++++++++++-----
 data/src/data_utils/park_priority.py | 110 ++++++++++++++++-----------
 data/src/fetch_and_save_data.py      |  50 ------------
 data/src/script.py                   |   2 +-
 6 files changed, 125 insertions(+), 142 deletions(-)
 delete mode 100644 data/Dockerfile-fetch-data
 delete mode 100644 data/src/fetch_and_save_data.py

diff --git a/data/Dockerfile-fetch-data b/data/Dockerfile-fetch-data
deleted file mode 100644
index ba0e1b75..00000000
--- a/data/Dockerfile-fetch-data
+++ /dev/null
@@ -1,14 +0,0 @@
-# Use the existing main container image
-FROM vacant-lots-proj:latest
-
-# Set the working directory in the container
-WORKDIR /usr/src/app
-
-# Copy the fetch_and_save_data.py script to the container
-COPY src/fetch_and_save_data.py ./
-
-# Ensure Pipenv and dependencies are installed
-RUN pipenv install --ignore-pipfile --dev
-
-# Set the default command to run the script with pipenv and python
-ENTRYPOINT ["sh", "-c", "pipenv run python ./fetch_and_save_data.py \"$@\"", "--"]
diff --git a/data/docker-compose.yml b/data/docker-compose.yml
index 562478d1..6592a03e 100644
--- a/data/docker-compose.yml
+++ b/data/docker-compose.yml
@@ -74,19 +74,5 @@ services:
     extra_hosts:
       - host.docker.internal:host-gateway
 
-  data-fetcher:
-    build:
-      context: .
-      dockerfile: Dockerfile-fetch-data
-    depends_on:
-      - vacant-lots-proj
-    environment:
-      - VACANT_LOTS_DB
-    volumes:
-      - ./src:/usr/src/app
-      - /etc/timezone:/etc/timezone:ro
-      - /etc/localtime:/etc/localtime:ro
-    network_mode: 'host'
-
 volumes:
   database_volume:
diff --git a/data/src/classes/featurelayer.py b/data/src/classes/featurelayer.py
index 6409ddee..a30bcde9 100644
--- a/data/src/classes/featurelayer.py
+++ b/data/src/classes/featurelayer.py
@@ -284,15 +284,25 @@ def create_centroid_gdf(self):
         self.centroid_gdf = self.gdf.copy()
         self.centroid_gdf.loc[:, "geometry"] = self.gdf["geometry"].centroid
 
-    def build_and_publish_pmtiles(self, tileset_id):
-        zoom_threshold = 13
+    def build_and_publish(self, tiles_file_id_prefix: str) -> None:
+        """
+        Builds PMTiles and a Parquet file from a GeoDataFrame and publishes them to Google Cloud Storage.
+
+        Args:
+            tiles_file_id_prefix (str): The ID prefix used for naming the PMTiles and Parquet files, coming from config.
+
+        Raises:
+            ValueError: Raised if the generated PMTiles file is smaller than the minimum allowed size, indicating potential corruption or an incomplete file.
+        """
+        zoom_threshold: int = 13
 
         # Export the GeoDataFrame to a temporary GeoJSON file
-        temp_geojson_points = f"tmp/temp_{tileset_id}_points.geojson"
-        temp_geojson_polygons = f"tmp/temp_{tileset_id}_polygons.geojson"
-        temp_pmtiles_points = f"tmp/temp_{tileset_id}_points.pmtiles"
-        temp_pmtiles_polygons = f"tmp/temp_{tileset_id}_polygons.pmtiles"
-        temp_merged_pmtiles = f"tmp/temp_{tileset_id}_merged.pmtiles"
+        temp_geojson_points: str = f"tmp/temp_{tiles_file_id_prefix}_points.geojson"
+        temp_geojson_polygons: str = f"tmp/temp_{tiles_file_id_prefix}_polygons.geojson"
+        temp_pmtiles_points: str = f"tmp/temp_{tiles_file_id_prefix}_points.pmtiles"
+        temp_pmtiles_polygons: str = f"tmp/temp_{tiles_file_id_prefix}_polygons.pmtiles"
+        temp_merged_pmtiles: str = f"tmp/temp_{tiles_file_id_prefix}_merged.pmtiles"
+        temp_parquet: str = f"tmp/{tiles_file_id_prefix}.parquet"
 
         # Reproject
         gdf_wm = self.gdf.to_crs(epsg=4326)
@@ -304,8 +314,36 @@ def build_and_publish_pmtiles(self, tileset_id):
         self.centroid_gdf = self.centroid_gdf.to_crs(epsg=4326)
         self.centroid_gdf.to_file(temp_geojson_points, driver="GeoJSON")
 
+        # Load the polygon GeoJSON, drop the geometry column, and save the attributes as Parquet
+        gdf_polygons = gpd.read_file(temp_geojson_polygons)
+        df_no_geom = gdf_polygons.drop(columns=["geometry"])
+
+        # Check if the DataFrame has fewer than 25,000 rows
+        num_rows, num_cols = df_no_geom.shape
+        if num_rows < 25000:
+            print(
+                f"Parquet file has {num_rows} rows, which is fewer than 25,000. Skipping upload."
+            )
+            return
+
+        # Save the DataFrame as Parquet
+        df_no_geom.to_parquet(temp_parquet)
+
+        # Upload Parquet to Google Cloud Storage
+        blob_parquet = bucket.blob(f"{tiles_file_id_prefix}.parquet")
+        try:
+            blob_parquet.upload_from_filename(temp_parquet)
+            parquet_size = os.stat(temp_parquet).st_size
+            parquet_size_mb = parquet_size / (1024 * 1024)
+            print(
+                f"Parquet upload successful! Size: {parquet_size} bytes ({parquet_size_mb:.2f} MB), Dimensions: {num_rows} rows, {num_cols} columns."
+            )
+        except Exception as e:
+            print(f"Parquet upload failed: {e}")
+            return
+
         # Command for generating PMTiles for points up to zoom level zoom_threshold
-        points_command = [
+        points_command: list[str] = [
             "tippecanoe",
             f"--output={temp_pmtiles_points}",
             f"--maximum-zoom={zoom_threshold}",
@@ -320,7 +358,7 @@ def build_and_publish_pmtiles(self, tileset_id):
         ]
 
         # Command for generating PMTiles for polygons from zoom level zoom_threshold
-        polygons_command = [
+        polygons_command: list[str] = [
             "tippecanoe",
             f"--output={temp_pmtiles_polygons}",
             f"--minimum-zoom={zoom_threshold}",
@@ -334,7 +372,7 @@ def build_and_publish_pmtiles(self, tileset_id):
         ]
 
         # Command for merging the two PMTiles files into a single output file
-        merge_command = [
+        merge_command: list[str] = [
             "tile-join",
             f"--output={temp_merged_pmtiles}",
             "--no-tile-size-limit",
@@ -347,20 +385,23 @@ def build_and_publish_pmtiles(self, tileset_id):
         for command in [points_command, polygons_command, merge_command]:
             subprocess.run(command)
 
-        write_files = [f"{tileset_id}_staging.pmtiles"]
+        write_files: list[str] = [f"{tiles_file_id_prefix}_staging.pmtiles"]
 
         if write_production_tiles_file:
-            write_files.append(f"{tileset_id}.pmtiles")
+            write_files.append(f"{tiles_file_id_prefix}.pmtiles")
 
-        # check whether the temp saved tiles files is big enough.
-        # If not then it might be corrupted so log error and don't upload to gcp.
-        file_size = os.stat(temp_merged_pmtiles).st_size
+        # Check that the saved temp tiles file is big enough; if not, it may be corrupt, so raise an error instead of uploading to GCP.
+        file_size: int = os.stat(temp_merged_pmtiles).st_size
         if file_size < min_tiles_file_size_in_bytes:
             raise ValueError(
-                f"{temp_merged_pmtiles} is {file_size} bytes in size but should be at least {min_tiles_file_size_in_bytes}. Therefore, we are not uploading any files to the GCP bucket. The file may be corrupt or incomplete."
+                f"{temp_merged_pmtiles} is {file_size} bytes in size but should be at least {min_tiles_file_size_in_bytes}. Therefore, we are not uploading any files to the GCP bucket. The file may be corrupt or incomplete."
             )
 
-        # Upload to Google Cloud Storage
+        # Upload PMTiles to Google Cloud Storage
         for file in write_files:
             blob = bucket.blob(file)
-            blob.upload_from_filename(temp_merged_pmtiles)
+            try:
+                blob.upload_from_filename(temp_merged_pmtiles)
+                print(f"PMTiles upload successful for {file}!")
+            except Exception as e:
+                print(f"PMTiles upload failed for {file}: {e}")
diff --git a/data/src/data_utils/park_priority.py b/data/src/data_utils/park_priority.py
index 953ed5b1..7a97fb3b 100644
--- a/data/src/data_utils/park_priority.py
+++ b/data/src/data_utils/park_priority.py
@@ -9,6 +9,7 @@
 from classes.featurelayer import FeatureLayer
 from config.config import USE_CRS
 from tqdm import tqdm
+import pyogrio
 
 
 def get_latest_shapefile_url() -> str:
@@ -32,17 +33,61 @@ def get_latest_shapefile_url() -> str:
     raise ValueError("Shapefile link not found on the page")
 
 
+def download_and_process_shapefile(
+    geojson_path: str, park_url: str, target_files: List[str], file_name_prefix: str
+) -> gpd.GeoDataFrame:
+    """
+    Downloads and processes the shapefile to create a GeoDataFrame for Philadelphia parks.
+
+    Args:
+        geojson_path (str): Path to save the GeoJSON file.
+        park_url (str): URL to download the shapefile.
+        target_files (List[str]): List of files to extract from the shapefile.
+        file_name_prefix (str): Prefix for the file names to be extracted.
+
+    Returns:
+        gpd.GeoDataFrame: GeoDataFrame containing the processed park data.
+    """
+    print("Downloading and processing park priority data...")
+    response: requests.Response = requests.get(park_url, stream=True)
+    total_size: int = int(response.headers.get("content-length", 0))
+
+    with tqdm(
+        total=total_size, unit="iB", unit_scale=True, desc="Downloading"
+    ) as progress_bar:
+        buffer: BytesIO = BytesIO()
+        for data in response.iter_content(1024):
+            size: int = buffer.write(data)
+            progress_bar.update(size)
+
+    with zipfile.ZipFile(buffer) as zip_ref:
+        for file_name in tqdm(target_files, desc="Extracting"):
+            zip_ref.extract(file_name, "tmp/")
+
+    print("Processing shapefile...")
+    pa_parks: gpd.GeoDataFrame = gpd.read_file(
+        "tmp/" + file_name_prefix + "_ParkPriorityAreas.shp"
+    )
+    pa_parks = pa_parks.to_crs(USE_CRS)
+
+    phl_parks: gpd.GeoDataFrame = pa_parks[pa_parks["ID"].str.startswith("42101")]
+    phl_parks = phl_parks.loc[:, ["ParkNeed", "geometry"]]
+
+    if isinstance(phl_parks, gpd.GeoDataFrame):
+        phl_parks.rename(columns={"ParkNeed": "park_priority"}, inplace=True)
+    else:
+        raise TypeError("Expected a GeoDataFrame, got Series or another type instead")
+
+    print(f"Writing filtered data to GeoJSON: {geojson_path}")
+    phl_parks.to_file(geojson_path, driver="GeoJSON")
+
+    return phl_parks
+
+
 def park_priority(primary_featurelayer: FeatureLayer) -> FeatureLayer:
     """
     Downloads and processes park priority data, then joins it with the primary feature layer.
 
-    This function performs the following steps:
-    1. Downloads the latest shapefile from TPL.
-    2. Extracts relevant files from the downloaded zip.
-    3. Processes the shapefile, filtering for Philadelphia County.
-    4. Creates a GeoJSON file with the processed data.
-    5. Joins the processed data with the primary feature layer.
-
     Args:
         primary_featurelayer (FeatureLayer): The primary feature layer to join with park priority data.
 
@@ -66,47 +111,22 @@ def park_priority(primary_featurelayer: FeatureLayer) -> FeatureLayer:
 
     os.makedirs("tmp/", exist_ok=True)
 
-    if not os.path.exists(geojson_path):
-        print("Downloading and processing park priority data...")
-        response: requests.Response = requests.get(park_url, stream=True)
-        total_size: int = int(response.headers.get("content-length", 0))
-
-        with tqdm(
-            total=total_size, unit="iB", unit_scale=True, desc="Downloading"
-        ) as progress_bar:
-            buffer: BytesIO = BytesIO()
-            for data in response.iter_content(1024):
-                size: int = buffer.write(data)
-                progress_bar.update(size)
-
-        with zipfile.ZipFile(buffer) as zip_ref:
-            for file_name in tqdm(target_files, desc="Extracting"):
-                zip_ref.extract(file_name, "tmp/")
-
-        print("Processing shapefile...")
-        pa_parks: gpd.GeoDataFrame = gpd.read_file(
-            "tmp/" + file_name_prefix + "_ParkPriorityAreas.shp"
-        )
-        pa_parks = pa_parks.to_crs(USE_CRS)
-
-        phl_parks: gpd.GeoDataFrame = pa_parks[pa_parks["ID"].str.startswith("42101")]
-        phl_parks = phl_parks.loc[:, ["ParkNeed", "geometry"]]
-
-        # Print the type of phl_parks for debugging
-        print(f"Type of phl_parks: {type(phl_parks)}")
-
-        if isinstance(phl_parks, gpd.GeoDataFrame):
-            phl_parks.rename(columns={"ParkNeed": "park_priority"}, inplace=True)
+    try:
+        if os.path.exists(geojson_path):
+            print(f"GeoJSON file already exists, loading from {geojson_path}")
+            phl_parks: gpd.GeoDataFrame = gpd.read_file(geojson_path)
         else:
-            raise TypeError(
-                "Expected a GeoDataFrame, got Series or another type instead"
+            raise pyogrio.errors.DataSourceError(
+                "GeoJSON file missing, forcing download."
             )
 
-        print(f"Writing filtered data to GeoJSON: {geojson_path}")
-        phl_parks.to_file(geojson_path, driver="GeoJSON")
-    else:
-        print(f"GeoJSON file already exists, loading from {geojson_path}")
-        phl_parks: gpd.GeoDataFrame = gpd.read_file(geojson_path)
+    except (pyogrio.errors.DataSourceError, ValueError) as e:
+        print(f"Error loading GeoJSON: {e}. Re-downloading and processing shapefile.")
+        if os.path.exists(geojson_path):
+            os.remove(geojson_path)  # Delete the corrupted GeoJSON if it exists
+        phl_parks = download_and_process_shapefile(
+            geojson_path, park_url, target_files, file_name_prefix
+        )
 
     park_priority_layer: FeatureLayer = FeatureLayer("Park Priority")
     park_priority_layer.gdf = phl_parks
diff --git a/data/src/fetch_and_save_data.py b/data/src/fetch_and_save_data.py
deleted file mode 100644
index be77c7d1..00000000
--- a/data/src/fetch_and_save_data.py
+++ /dev/null
@@ -1,50 +0,0 @@
-import argparse
-import os
-import zipfile
-
-import geopandas as gpd
-from sqlalchemy import create_engine
-
-
-def fetch_data(connection_string, sql_query, geom_col):
-    engine = create_engine(connection_string)
-    gdf = gpd.read_postgis(sql=sql_query, con=engine, geom_col=geom_col)
-    return gdf
-
-
-def save_file(gdf, filename, zipped):
-    os.makedirs(os.path.dirname(filename), exist_ok=True)
-
-    if filename.endswith(".parquet"):
-        gdf.to_parquet(filename)
-    else:
-        temp_filename = filename
-        gdf.to_file(temp_filename)
-
-        if zipped:
-            zip_filename = f"{os.path.splitext(filename)[0]}.zip"
-            with zipfile.ZipFile(zip_filename, "w", zipfile.ZIP_DEFLATED) as zipf:
-                zipf.write(temp_filename, os.path.basename(temp_filename))
-            os.remove(temp_filename)
-
-
-def main(output_filename, zipped):
-    connection_string = os.getenv("VACANT_LOTS_DB")
-    sql_query = "SELECT * FROM vacant_properties_end;"
-    geom_col = "geometry"
-    gdf = fetch_data(connection_string, sql_query, geom_col)
-    save_file(gdf, output_filename, zipped)
-
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser(
-        description="Fetch data from PostGIS and save as a file."
-    )
-    parser.add_argument(
-        "output_filename", type=str, help="The output filename for the data."
-    )
-    parser.add_argument(
-        "--zipped", action="store_true", help="Whether to zip the output file."
-    )
-    args = parser.parse_args()
-    main(args.output_filename, args.zipped)
diff --git a/data/src/script.py b/data/src/script.py
index 32b74b89..763de925 100644
--- a/data/src/script.py
+++ b/data/src/script.py
@@ -99,7 +99,7 @@
 conn.commit()
 
 # Post to Mapbox
-dataset.build_and_publish_pmtiles(tiles_file_id_prefix)
+dataset.build_and_publish(tiles_file_id_prefix)
 
 # if we are reloading, run the diff report, then archive the backup and finally prune old archives
 if FORCE_RELOAD:
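
Reviewer note: a quick way to sanity-check the new Parquet output is to read it back from the bucket after a pipeline run. The sketch below is illustrative only: the bucket name and file prefix are placeholders (the real values come from the pipeline config), and it assumes pandas and gcsfs are available in the environment.

    import pandas as pd  # reading gs:// paths requires the gcsfs package

    # Placeholder names; the real bucket and prefix come from config.
    bucket_name = "example-gcs-bucket"
    tiles_file_id_prefix = "vacant_properties"

    # build_and_publish() drops the geometry column before writing, so this is a plain attribute table.
    df = pd.read_parquet(f"gs://{bucket_name}/{tiles_file_id_prefix}.parquet")

    # Uploads are skipped below 25,000 rows, so any file that landed in the bucket should clear that bar.
    assert len(df) >= 25000
    print(df.shape)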