From 9fa9e7a45639a0e7e93ac79cc65b8101a430bae6 Mon Sep 17 00:00:00 2001 From: nlebovits Date: Fri, 18 Oct 2024 12:05:50 -0400 Subject: [PATCH 1/9] add google cloud project to docker-compose environment --- data/docker-compose.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/data/docker-compose.yml b/data/docker-compose.yml index 6592a03e..5d797d9e 100644 --- a/data/docker-compose.yml +++ b/data/docker-compose.yml @@ -11,6 +11,7 @@ services: - CLEAN_GREEN_GOOGLE_KEY - PYTHONUNBUFFERED=1 - GOOGLE_CLOUD_BUCKET_NAME + - GOOGLE_CLOUD_PROJECT - CAGP_SLACK_API_TOKEN volumes: - ./src:/usr/src/app From 58b94cb6e47ea28bcc3ab1c265434ecc1bdc96dd Mon Sep 17 00:00:00 2001 From: nlebovits Date: Fri, 18 Oct 2024 12:06:14 -0400 Subject: [PATCH 2/9] add backup path for june 2024 vacant lots data --- data/src/data_utils/vacant_properties.py | 106 ++++++++++++++++++++++- 1 file changed, 104 insertions(+), 2 deletions(-) diff --git a/data/src/data_utils/vacant_properties.py b/data/src/data_utils/vacant_properties.py index b84f2f44..a636840a 100644 --- a/data/src/data_utils/vacant_properties.py +++ b/data/src/data_utils/vacant_properties.py @@ -1,8 +1,65 @@ -from classes.featurelayer import FeatureLayer +from classes.featurelayer import FeatureLayer, google_cloud_bucket from constants.services import VACANT_PROPS_LAYERS_TO_LOAD +import geopandas as gpd +from io import BytesIO + +import pandas as pd + + +def load_backup_data_from_gcs(file_name: str) -> gpd.GeoDataFrame: + """ + Load backup data from Google Cloud Storage (GCS) and return it as a GeoDataFrame. + + Args: + file_name (str): The name of the file in GCS to load. + + Returns: + gpd.GeoDataFrame: The loaded GeoDataFrame from the backup. + """ + # Get the bucket using the google_cloud_bucket function + bucket = google_cloud_bucket() + + # Ensure the file name is correctly passed to the blob + blob = bucket.blob(file_name) + if not blob.exists(): + raise FileNotFoundError(f"File {file_name} not found in the GCS bucket.") + + # Download the file from GCS + file_bytes = blob.download_as_bytes() + + # Load the contents into a GeoDataFrame + gdf = gpd.read_file(BytesIO(file_bytes)) + print(f"Loaded backup data from GCS: {len(gdf)} rows.") + + # Rename the backup data columns to match the original dataset's format + gdf = gdf.rename( + columns={ + "ADDRESS": "address", + "OWNER1": "owner_1", + "OWNER2": "owner_2", + "BLDG_DESC": "building_description", + "CouncilDistrict": "council_district", + "ZoningBaseDistrict": "zoning_base_district", + "ZipCode": "zipcode", + "OPA_ID": "opa_id", + } + ) + + return gdf + + +def vacant_properties() -> FeatureLayer: + """ + Load vacant properties data from Esri or backup from Google Cloud Storage (GCS) if the row count for vacant land is below 20,000. + + The function initializes a `FeatureLayer` for vacant properties, checks if the vacant land data from Esri contains fewer than 20,000 rows, + and if so, loads backup data from GCS. The function also renames certain columns for consistency. + + Returns: + FeatureLayer: The `FeatureLayer` object containing the vacant properties data. 
+ """ -def vacant_properties(): vacant_properties = FeatureLayer( name="Vacant Properties", esri_rest_urls=VACANT_PROPS_LAYERS_TO_LOAD, @@ -16,11 +73,56 @@ def vacant_properties(): "ZIPCODE", "OPA_ID", "parcel_type", + ], + ) + + # Check for vacant land specifically + vacant_land_gdf = vacant_properties.gdf[ + vacant_properties.gdf["parcel_type"] == "Land" + ] + + # Print the size of the vacant land dataset + print(f"Vacant land data size: {len(vacant_land_gdf)} rows.") + + # If the number of rows is less than 20,000, load backup data from GCS + if len(vacant_land_gdf) < 20000: + print("Vacant land data is below the threshold. Loading backup data from GCS.") + backup_gdf = load_backup_data_from_gcs("vacant_indicators_land_06_2024.geojson") + + # Ensure columns match between the backup and the original dataset + common_columns = vacant_properties.gdf.columns.intersection(backup_gdf.columns) + + # Align the backup data with the original dataset's columns + backup_gdf = backup_gdf[common_columns] + + # Add the parcel_type column to the backup data and set it to "Land" + backup_gdf["parcel_type"] = "Land" + + # Drop the old land data from the main dataframe + vacant_properties.gdf = vacant_properties.gdf[ + vacant_properties.gdf["parcel_type"] != "Land" ] + + # Concatenate the backup data with the existing data + print(f"Appending backup data ({len(backup_gdf)} rows) to the existing data.") + vacant_properties.gdf = pd.concat( + [vacant_properties.gdf, backup_gdf], ignore_index=True + ) + + # Print the size of the dataset before dropping NAs + print( + f"Vacant properties data size before dropping NAs: {len(vacant_properties.gdf)} rows." ) + # Drop rows with missing 'opa_id' values vacant_properties.gdf.dropna(subset=["opa_id"], inplace=True) + # Print the size of the dataset after dropping NAs + print( + f"Vacant properties data size after dropping NAs: {len(vacant_properties.gdf)} rows." 
+ ) + + # Rename columns for consistency (this ensures the existing data also matches) vacant_properties.gdf = vacant_properties.gdf.rename( columns={ "owner1": "owner_1", From 54c09131f452db4edecde16b43dc71bfd41e8535 Mon Sep 17 00:00:00 2001 From: nlebovits Date: Fri, 18 Oct 2024 12:07:05 -0400 Subject: [PATCH 3/9] formatting and adding project name explicitly --- data/src/classes/featurelayer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/data/src/classes/featurelayer.py b/data/src/classes/featurelayer.py index a30bcde9..35251706 100644 --- a/data/src/classes/featurelayer.py +++ b/data/src/classes/featurelayer.py @@ -36,8 +36,9 @@ def google_cloud_bucket() -> Bucket: os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path bucket_name = os.getenv("GOOGLE_CLOUD_BUCKET_NAME", "cleanandgreenphl") + project_name = os.getenv("GOOGLE_CLOUD_PROJECT", "clean-and-green-philly") - storage_client = storage.Client(project="clean-and-green-philly") + storage_client = storage.Client(project=project_name) return storage_client.bucket(bucket_name) From 7e5da24eaf982c6bd07f579e8181dceaffa32d14 Mon Sep 17 00:00:00 2001 From: nlebovits Date: Fri, 18 Oct 2024 15:07:03 -0400 Subject: [PATCH 4/9] remove column name check --- data/src/script.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/data/src/script.py b/data/src/script.py index c5f3a8aa..46e1db3b 100644 --- a/data/src/script.py +++ b/data/src/script.py @@ -76,8 +76,6 @@ # Load Vacant Property Data dataset = vacant_properties() -print("Initial vacant properties dataset columns:", dataset.gdf.columns) - # Load and join other datasets for service in services: dataset = service(dataset) From 82038206281c11b25f968f169e30567d524cc715 Mon Sep 17 00:00:00 2001 From: nlebovits Date: Fri, 18 Oct 2024 16:07:37 -0400 Subject: [PATCH 5/9] update formatting --- data/src/data_utils/access_process.py | 1 + 1 file changed, 1 insertion(+) diff --git a/data/src/data_utils/access_process.py b/data/src/data_utils/access_process.py index c888f5cc..039843f1 100644 --- a/data/src/data_utils/access_process.py +++ b/data/src/data_utils/access_process.py @@ -1,5 +1,6 @@ from typing import Any + def access_process(dataset: Any) -> Any: """ Process a dataset to determine the access process for each property based on From 54280c26ab817782ee664c3d6c16d963d2bbf532 Mon Sep 17 00:00:00 2001 From: nlebovits Date: Fri, 18 Oct 2024 16:08:21 -0400 Subject: [PATCH 6/9] abstract kde function to generic; add parallel processing; refine resolution --- data/src/data_utils/drug_crimes.py | 123 +--------------------- data/src/data_utils/gun_crimes.py | 121 +--------------------- data/src/data_utils/kde.py | 157 +++++++++++++++++++++++++++++ 3 files changed, 164 insertions(+), 237 deletions(-) create mode 100644 data/src/data_utils/kde.py diff --git a/data/src/data_utils/drug_crimes.py b/data/src/data_utils/drug_crimes.py index 1602c290..61a4a43c 100644 --- a/data/src/data_utils/drug_crimes.py +++ b/data/src/data_utils/drug_crimes.py @@ -1,125 +1,10 @@ -import mapclassify -import matplotlib.pyplot as plt -import numpy as np -import rasterio -from awkde.awkde import GaussianKDE -from classes.featurelayer import FeatureLayer -from config.config import USE_CRS from constants.services import DRUGCRIME_SQL_QUERY -from rasterio.transform import Affine -def drug_crimes(primary_featurelayer): - # Initialize gun_crimes object - drug_crimes = FeatureLayer( - name="Drug Crimes", carto_sql_queries=DRUGCRIME_SQL_QUERY - ) - - # Extract x, y coordinates from geometry - x = 
np.array([]) - y = np.array([]) - - for geom in drug_crimes.gdf.geometry: - coords = np.array(geom.xy) - x = np.concatenate([x, coords[0]]) - y = np.concatenate([y, coords[1]]) - - # Prepare data for KDE - X = np.array(list(zip(x, y))) - - # Generate grid for plotting - grid_length = 2500 - - x_grid, y_grid = ( - np.linspace(x.min(), x.max(), grid_length), - np.linspace(y.min(), y.max(), grid_length), - ) - xx, yy = np.meshgrid(x_grid, y_grid) - grid_points = np.array([xx.ravel(), yy.ravel()]).T - - # Compute adaptive KDE values - print("fitting KDE for drug crime data") - kde = GaussianKDE(glob_bw=0.1, alpha=0.999, diag_cov=True) - kde.fit(X) - - z = kde.predict(grid_points) - zz = z.reshape(xx.shape) - - # Calculate resolutions and min values - x_res = (x.max() - x.min()) / (len(x_grid) - 1) - y_res = (y.max() - y.min()) / (len(y_grid) - 1) - min_x, min_y = x.min(), y.min() - - # Save the plot in tmp folder - plt.pcolormesh(xx, yy, zz) - plt.scatter(x, y, c="red", s=0.005) - plt.colorbar() - plt.savefig("tmp/kde.png") - - # Define the affine transform - transform = Affine.translation(min_x, min_y) * Affine.scale(x_res, y_res) - - # Export as raster - with rasterio.open( - "tmp/drug_crimes.tif", - "w", - driver="GTiff", - height=zz.shape[0], - width=zz.shape[1], - count=1, - dtype=zz.dtype, - crs=USE_CRS, - transform=transform, - ) as dst: - dst.write(zz, 1) +from data_utils.kde import apply_kde_to_primary - primary_featurelayer.gdf["centroid"] = primary_featurelayer.gdf.geometry.centroid - coord_list = [ - (x, y) - for x, y in zip( - primary_featurelayer.gdf["centroid"].x, - primary_featurelayer.gdf["centroid"].y, - ) - ] - - primary_featurelayer.gdf = primary_featurelayer.gdf.drop(columns=["centroid"]) - - src = rasterio.open("tmp/drug_crimes.tif") - sampled_values = [x[0] for x in src.sample(coord_list)] - - primary_featurelayer.gdf["drugcrime_density"] = sampled_values - - percentile_breaks = list(range(101)) # [0, 1, 2, ..., 100] - - drugcrime_classifier = mapclassify.Percentiles( - primary_featurelayer.gdf["drugcrime_density"], pct=percentile_breaks - ) - - primary_featurelayer.gdf["drugcrime_density_percentile"] = primary_featurelayer.gdf[ - "drugcrime_density" - ].apply(drugcrime_classifier) - - def label_percentile(value): - if value == 1: - return "1st Percentile" - elif value == 2: - return "2nd Percentile" - elif value == 3: - return "3rd Percentile" - else: - return f"{value}th Percentile" - - primary_featurelayer.gdf["drugcrime_density_label"] = primary_featurelayer.gdf[ - "drugcrime_density_percentile" - ].apply(label_percentile) - - primary_featurelayer.gdf["drugcrime_density_percentile"] = primary_featurelayer.gdf[ - "drugcrime_density_percentile" - ].astype(float) - - primary_featurelayer.gdf = primary_featurelayer.gdf.drop( - columns=["drugcrime_density"] +def drug_crimes(primary_featurelayer): + return apply_kde_to_primary( + primary_featurelayer, "Drug Crimes", DRUGCRIME_SQL_QUERY ) - - return primary_featurelayer diff --git a/data/src/data_utils/gun_crimes.py b/data/src/data_utils/gun_crimes.py index 1180cdc1..27155546 100644 --- a/data/src/data_utils/gun_crimes.py +++ b/data/src/data_utils/gun_crimes.py @@ -1,123 +1,8 @@ -import mapclassify -import matplotlib.pyplot as plt -import numpy as np -import rasterio -from awkde.awkde import GaussianKDE -from classes.featurelayer import FeatureLayer -from config.config import USE_CRS from constants.services import GUNCRIME_SQL_QUERY -from rasterio.transform import Affine -def gun_crimes(primary_featurelayer): - # Initialize 
gun_crimes object - gun_crimes = FeatureLayer(name="Gun Crimes", carto_sql_queries=GUNCRIME_SQL_QUERY) - - # Extract x, y coordinates from geometry - x = np.array([]) - y = np.array([]) - - for geom in gun_crimes.gdf.geometry: - coords = np.array(geom.xy) - x = np.concatenate([x, coords[0]]) - y = np.concatenate([y, coords[1]]) - - # Prepare data for KDE - X = np.array(list(zip(x, y))) - - # Generate grid for plotting - grid_length = 2500 - - x_grid, y_grid = ( - np.linspace(x.min(), x.max(), grid_length), - np.linspace(y.min(), y.max(), grid_length), - ) - xx, yy = np.meshgrid(x_grid, y_grid) - grid_points = np.array([xx.ravel(), yy.ravel()]).T - - # Compute adaptive KDE values - print("fitting KDE for gun crime data") - kde = GaussianKDE(glob_bw=0.1, alpha=0.999, diag_cov=True) - kde.fit(X) - - z = kde.predict(grid_points) - zz = z.reshape(xx.shape) - - # Calculate resolutions and min values - x_res = (x.max() - x.min()) / (len(x_grid) - 1) - y_res = (y.max() - y.min()) / (len(y_grid) - 1) - min_x, min_y = x.min(), y.min() - - # Save the plot in tmp folder - plt.pcolormesh(xx, yy, zz) - plt.scatter(x, y, c="red", s=0.005) - plt.colorbar() - plt.savefig("tmp/kde.png") - - # Define the affine transform - transform = Affine.translation(min_x, min_y) * Affine.scale(x_res, y_res) - - # Export as raster - with rasterio.open( - "tmp/gun_crimes.tif", - "w", - driver="GTiff", - height=zz.shape[0], - width=zz.shape[1], - count=1, - dtype=zz.dtype, - crs=USE_CRS, - transform=transform, - ) as dst: - dst.write(zz, 1) +from data_utils.kde import apply_kde_to_primary - primary_featurelayer.gdf["centroid"] = primary_featurelayer.gdf.geometry.centroid - coord_list = [ - (x, y) - for x, y in zip( - primary_featurelayer.gdf["centroid"].x, - primary_featurelayer.gdf["centroid"].y, - ) - ] - - primary_featurelayer.gdf = primary_featurelayer.gdf.drop(columns=["centroid"]) - - src = rasterio.open("tmp/gun_crimes.tif") - sampled_values = [x[0] for x in src.sample(coord_list)] - - primary_featurelayer.gdf["guncrime_density"] = sampled_values - - percentile_breaks = list(range(101)) # [0, 1, 2, ..., 100] - - guncrime_classifier = mapclassify.Percentiles( - primary_featurelayer.gdf["guncrime_density"], pct=percentile_breaks - ) - - primary_featurelayer.gdf["guncrime_density_percentile"] = primary_featurelayer.gdf[ - "guncrime_density" - ].apply(guncrime_classifier) - - def label_percentile(value): - if value == 1: - return "1st Percentile" - elif value == 2: - return "2nd Percentile" - elif value == 3: - return "3rd Percentile" - else: - return f"{value}th Percentile" - - primary_featurelayer.gdf["guncrime_density_label"] = primary_featurelayer.gdf[ - "guncrime_density_percentile" - ].apply(label_percentile) - - primary_featurelayer.gdf["guncrime_density_percentile"] = primary_featurelayer.gdf[ - "guncrime_density_percentile" - ].astype(float) - - primary_featurelayer.gdf = primary_featurelayer.gdf.drop( - columns=["guncrime_density"] - ) - - return primary_featurelayer +def gun_crimes(primary_featurelayer): + return apply_kde_to_primary(primary_featurelayer, "Gun Crimes", GUNCRIME_SQL_QUERY) diff --git a/data/src/data_utils/kde.py b/data/src/data_utils/kde.py new file mode 100644 index 00000000..8741dacc --- /dev/null +++ b/data/src/data_utils/kde.py @@ -0,0 +1,157 @@ +import numpy as np +import rasterio +from awkde.awkde import GaussianKDE +from classes.featurelayer import FeatureLayer +from config.config import USE_CRS +from rasterio.transform import Affine +from tqdm import tqdm +from concurrent.futures 
import ProcessPoolExecutor, as_completed + +import mapclassify + +resolution = 1320 # 0.25 miles (in feet, bc the CRS is 2272) +batch_size = 100000 + + +def kde_predict_chunk(kde, chunk): + """Helper function to predict KDE for a chunk of grid points.""" + return kde.predict(chunk) + + +def generic_kde(name, query, resolution=resolution, batch_size=batch_size): + print(f"Initializing FeatureLayer for {name}") + + feature_layer = FeatureLayer(name=name, carto_sql_queries=query) + + coords = np.array([geom.xy for geom in feature_layer.gdf.geometry]) + x, y = coords[:, 0, :].flatten(), coords[:, 1, :].flatten() + + X = np.column_stack((x, y)) + + x_grid, y_grid = ( + np.linspace(x.min(), x.max(), resolution), + np.linspace(y.min(), y.max(), resolution), + ) + xx, yy = np.meshgrid(x_grid, y_grid) + grid_points = np.column_stack((xx.ravel(), yy.ravel())) + + print(f"Fitting KDE for {name} data") + kde = GaussianKDE(glob_bw=0.1, alpha=0.999, diag_cov=True) + kde.fit(X) + + print(f"Predicting KDE values for grid of size {grid_points.shape}") + + # Split grid points into chunks + chunks = [ + grid_points[i : i + batch_size] for i in range(0, len(grid_points), batch_size) + ] + + # Run predictions in parallel + z = np.zeros(len(grid_points)) # Placeholder for predicted values + + with ProcessPoolExecutor() as executor: + # Submit the tasks first, wrapped with tqdm to monitor as they're submitted + futures = { + executor.submit(kde_predict_chunk, kde, chunk): i + for i, chunk in enumerate(tqdm(chunks, desc="Submitting tasks")) + } + + # Now wrap the as_completed with tqdm for progress tracking + for future in tqdm( + as_completed(futures), total=len(futures), desc="Processing tasks" + ): + i = futures[future] + z[i * batch_size : (i + 1) * batch_size] = future.result() + + zz = z.reshape(xx.shape) + + x_res, y_res = ( + (x.max() - x.min()) / (resolution - 1), + (y.max() - y.min()) / (resolution - 1), + ) + min_x, min_y = x.min(), y.min() + + transform = Affine.translation(min_x, min_y) * Affine.scale(x_res, y_res) + + raster_filename = f"tmp/{name.lower().replace(' ', '_')}.tif" + print(f"Saving raster to {raster_filename}") + + with rasterio.open( + raster_filename, + "w", + driver="GTiff", + height=zz.shape[0], + width=zz.shape[1], + count=1, + dtype=zz.dtype, + crs=USE_CRS, + transform=transform, + ) as dst: + dst.write(zz, 1) + + return raster_filename, X + + +def apply_kde_to_primary(primary_featurelayer, name, query, resolution=resolution): + # Generate KDE and raster file + raster_filename, crime_coords = generic_kde(name, query, resolution) + + # Add centroid column temporarily + primary_featurelayer.gdf["centroid"] = primary_featurelayer.gdf.geometry.centroid + + # Create list of (x, y) coordinates for centroids + coord_list = [ + (x, y) + for x, y in zip( + primary_featurelayer.gdf["centroid"].x, + primary_featurelayer.gdf["centroid"].y, + ) + ] + + # Remove the temporary centroid column + primary_featurelayer.gdf = primary_featurelayer.gdf.drop(columns=["centroid"]) + + # Open the generated raster file and sample the KDE density values at the centroids + with rasterio.open(raster_filename) as src: + sampled_values = [x[0] for x in src.sample(coord_list)] + + # Create a column for the density values + density_column = f"{name.lower().replace(' ', '_')}_density" + primary_featurelayer.gdf[density_column] = sampled_values + + # Calculate percentiles using mapclassify.Percentiles + percentile_breaks = list(range(101)) # Percentile breaks from 0 to 100 + classifier = mapclassify.Percentiles( + 
primary_featurelayer.gdf[density_column], pct=percentile_breaks + ) + + # Assign the percentile bins to the density values + primary_featurelayer.gdf[density_column + "_percentile"] = ( + classifier.yb + ) # yb gives the bin index + + # Apply percentile labels (e.g., 1st Percentile, 2nd Percentile, etc.) + primary_featurelayer.gdf[density_column + "_label"] = primary_featurelayer.gdf[ + density_column + "_percentile" + ].apply(label_percentile) + + # Convert the percentile column to float and drop the density column + primary_featurelayer.gdf[density_column + "_percentile"] = primary_featurelayer.gdf[ + density_column + "_percentile" + ].astype(float) + + primary_featurelayer.gdf = primary_featurelayer.gdf.drop(columns=[density_column]) + + print(f"Finished processing {name}") + return primary_featurelayer + + +def label_percentile(value): + if value == 1: + return "1st Percentile" + elif value == 2: + return "2nd Percentile" + elif value == 3: + return "3rd Percentile" + else: + return f"{value}th Percentile" From 9c582b1063c9f3096574d9a012ce2dbe53fbf514 Mon Sep 17 00:00:00 2001 From: nlebovits Date: Fri, 18 Oct 2024 16:08:40 -0400 Subject: [PATCH 7/9] update to reflect new syntax --- data/src/data_utils/priority_level.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data/src/data_utils/priority_level.py b/data/src/data_utils/priority_level.py index 4a3ad0ad..e3ade816 100644 --- a/data/src/data_utils/priority_level.py +++ b/data/src/data_utils/priority_level.py @@ -4,7 +4,7 @@ def priority_level(dataset): priority_level = "" # Decision Points - guncrime_density_percentile = row["guncrime_density_percentile"] + guncrime_density_percentile = row["gun_crimes_density_percentile"] in_phs_landcare = row["phs_partner_agency"] == "PHS" has_li_complaint_or_violation = ( row["li_complaints"] is not None From fefa220751752b43b5f1587867a13ecbb86ca71e Mon Sep 17 00:00:00 2001 From: nlebovits Date: Fri, 18 Oct 2024 16:09:06 -0400 Subject: [PATCH 8/9] fix warning message related to data downcasting --- data/src/data_utils/rco_geoms.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/data/src/data_utils/rco_geoms.py b/data/src/data_utils/rco_geoms.py index 8ad3b36f..6aa3dca6 100644 --- a/data/src/data_utils/rco_geoms.py +++ b/data/src/data_utils/rco_geoms.py @@ -1,5 +1,8 @@ from classes.featurelayer import FeatureLayer from constants.services import RCOS_LAYERS_TO_LOAD +import pandas as pd + +pd.set_option("future.no_silent_downcasting", True) def rco_geoms(primary_featurelayer): From 5c42627687e44ea2355ae266d7896c293f402441 Mon Sep 17 00:00:00 2001 From: nlebovits Date: Fri, 18 Oct 2024 16:09:49 -0400 Subject: [PATCH 9/9] small logging changes --- data/src/data_utils/vacant_properties.py | 141 +++++++++++++++-------- 1 file changed, 96 insertions(+), 45 deletions(-) diff --git a/data/src/data_utils/vacant_properties.py b/data/src/data_utils/vacant_properties.py index a636840a..87a8b6f7 100644 --- a/data/src/data_utils/vacant_properties.py +++ b/data/src/data_utils/vacant_properties.py @@ -1,38 +1,27 @@ from classes.featurelayer import FeatureLayer, google_cloud_bucket from constants.services import VACANT_PROPS_LAYERS_TO_LOAD import geopandas as gpd - +from config.config import USE_CRS from io import BytesIO import pandas as pd def load_backup_data_from_gcs(file_name: str) -> gpd.GeoDataFrame: - """ - Load backup data from Google Cloud Storage (GCS) and return it as a GeoDataFrame. - - Args: - file_name (str): The name of the file in GCS to load. 
- - Returns: - gpd.GeoDataFrame: The loaded GeoDataFrame from the backup. - """ - # Get the bucket using the google_cloud_bucket function bucket = google_cloud_bucket() - - # Ensure the file name is correctly passed to the blob blob = bucket.blob(file_name) if not blob.exists(): raise FileNotFoundError(f"File {file_name} not found in the GCS bucket.") - # Download the file from GCS file_bytes = blob.download_as_bytes() + try: + gdf = gpd.read_file(BytesIO(file_bytes)) + except Exception as e: + raise ValueError(f"Error reading GeoJSON file: {e}") - # Load the contents into a GeoDataFrame - gdf = gpd.read_file(BytesIO(file_bytes)) - print(f"Loaded backup data from GCS: {len(gdf)} rows.") + print("Loaded backup data from GCS.") - # Rename the backup data columns to match the original dataset's format + # Ensure column names are consistent gdf = gdf.rename( columns={ "ADDRESS": "address", @@ -49,17 +38,17 @@ def load_backup_data_from_gcs(file_name: str) -> gpd.GeoDataFrame: return gdf -def vacant_properties() -> FeatureLayer: - """ - Load vacant properties data from Esri or backup from Google Cloud Storage (GCS) if the row count for vacant land is below 20,000. +def check_null_percentage(df: pd.DataFrame, threshold: float = 0.05): + """Checks if any column in the dataframe has more than the given threshold of null values.""" + null_percentages = df.isnull().mean() + for col, pct in null_percentages.items(): + if col not in ["owner1", "owner2"] and pct > threshold: + raise ValueError( + f"Column '{col}' has more than {threshold * 100}% null values ({pct * 100}%)." + ) - The function initializes a `FeatureLayer` for vacant properties, checks if the vacant land data from Esri contains fewer than 20,000 rows, - and if so, loads backup data from GCS. The function also renames certain columns for consistency. - - Returns: - FeatureLayer: The `FeatureLayer` object containing the vacant properties data. - """ +def vacant_properties() -> FeatureLayer: vacant_properties = FeatureLayer( name="Vacant Properties", esri_rest_urls=VACANT_PROPS_LAYERS_TO_LOAD, @@ -76,29 +65,71 @@ def vacant_properties() -> FeatureLayer: ], ) - # Check for vacant land specifically + # Rename columns for consistency in the original data + vacant_properties.gdf = vacant_properties.gdf.rename( + columns={ + "ADDRESS": "address", + "OWNER1": "owner_1", + "OWNER2": "owner_2", + "BLDG_DESC": "building_description", + "COUNCILDISTRICT": "council_district", + "ZONINGBASEDISTRICT": "zoning_base_district", + "ZIPCODE": "zipcode", + "OPA_ID": "opa_id", + } + ) + vacant_land_gdf = vacant_properties.gdf[ vacant_properties.gdf["parcel_type"] == "Land" ] - - # Print the size of the vacant land dataset print(f"Vacant land data size: {len(vacant_land_gdf)} rows.") - # If the number of rows is less than 20,000, load backup data from GCS if len(vacant_land_gdf) < 20000: print("Vacant land data is below the threshold. 
Loading backup data from GCS.") backup_gdf = load_backup_data_from_gcs("vacant_indicators_land_06_2024.geojson") - # Ensure columns match between the backup and the original dataset - common_columns = vacant_properties.gdf.columns.intersection(backup_gdf.columns) - - # Align the backup data with the original dataset's columns - backup_gdf = backup_gdf[common_columns] + # Ensure CRS is consistent with project-wide CRS (USE_CRS) + if backup_gdf.crs != USE_CRS: + print(f"Reprojecting backup data from {backup_gdf.crs} to {USE_CRS}") + backup_gdf = backup_gdf.to_crs(USE_CRS) + + # Ensure CRS is the same + if backup_gdf.crs != vacant_properties.gdf.crs: + backup_gdf = backup_gdf.to_crs(vacant_properties.gdf.crs) + + # Map backup dataset column names to match the original dataset + backup_gdf = backup_gdf.rename( + columns={ + "owner_1": "owner1", + "owner_2": "owner2", + "building_description": "bldg_desc", + "council_district": "councildistrict", + "zoning_base_district": "zoningbasedistrict", + } + ) - # Add the parcel_type column to the backup data and set it to "Land" + # Set parcel_type to "Land" for backup data backup_gdf["parcel_type"] = "Land" - # Drop the old land data from the main dataframe + # Select only the columns present in the original dataset + backup_gdf = backup_gdf[vacant_properties.gdf.columns] + + # Ensure all necessary columns are present in backup data + for col in vacant_properties.gdf.columns: + if col not in backup_gdf.columns: + backup_gdf[col] = None + + # Check for column mismatches between original and backup datasets + for col in vacant_properties.gdf.columns: + if vacant_properties.gdf[col].dtype != backup_gdf[col].dtype: + print( + f"Warning: Data type mismatch in column '{col}'. Original: {vacant_properties.gdf[col].dtype}, Backup: {backup_gdf[col].dtype}" + ) + + # Verify if backup data contains more than expected null values + check_null_percentage(backup_gdf) + + # Remove existing Land data vacant_properties.gdf = vacant_properties.gdf[ vacant_properties.gdf["parcel_type"] != "Land" ] @@ -109,28 +140,48 @@ def vacant_properties() -> FeatureLayer: [vacant_properties.gdf, backup_gdf], ignore_index=True ) - # Print the size of the dataset before dropping NAs + # Ensure concatenated data is still a GeoDataFrame + vacant_properties.gdf = gpd.GeoDataFrame( + vacant_properties.gdf, geometry="geometry" + ) + print( f"Vacant properties data size before dropping NAs: {len(vacant_properties.gdf)} rows." ) - - # Drop rows with missing 'opa_id' values vacant_properties.gdf.dropna(subset=["opa_id"], inplace=True) - - # Print the size of the dataset after dropping NAs print( f"Vacant properties data size after dropping NAs: {len(vacant_properties.gdf)} rows." 
) - # Rename columns for consistency (this ensures the existing data also matches) + # Final null value check before returning + check_null_percentage(vacant_properties.gdf) + + # Final column renaming and selection vacant_properties.gdf = vacant_properties.gdf.rename( columns={ "owner1": "owner_1", "owner2": "owner_2", - "bldg_desc": "building_description", "councildistrict": "council_district", "zoningbasedistrict": "zoning_base_district", } ) + # Select only the final columns needed + final_columns = [ + "address", + "owner_1", + "owner_2", + "council_district", + "zoning_base_district", + "zipcode", + "opa_id", + "parcel_type", + "geometry", + ] + + vacant_properties.gdf = vacant_properties.gdf[final_columns] + + # Ensure concatenated data is still a GeoDataFrame + vacant_properties.gdf = gpd.GeoDataFrame(vacant_properties.gdf, geometry="geometry") + return vacant_properties
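
A note on reuse: with the shared helper introduced in PATCH 6/9, wiring a new point dataset into the same density pipeline only takes a thin wrapper module. The sketch below is illustrative only and is not part of the patch series; VANDALISM_SQL_QUERY is a hypothetical constant standing in for whatever Carto query the new layer would use, and vandalism_crimes simply follows the same pattern as the refactored gun_crimes and drug_crimes modules.

    # data/src/data_utils/vandalism_crimes.py -- illustrative sketch, not part of this patch series
    from constants.services import VANDALISM_SQL_QUERY  # hypothetical: a Carto SQL query for the new point layer

    from data_utils.kde import apply_kde_to_primary


    def vandalism_crimes(primary_featurelayer):
        # apply_kde_to_primary fits the adaptive KDE on the queried points, writes the
        # density raster to tmp/, samples it at each parcel centroid, and adds
        # "vandalism_crimes_density_percentile" and "vandalism_crimes_density_label"
        # columns to the primary FeatureLayer before returning it.
        return apply_kde_to_primary(
            primary_featurelayer, "Vandalism Crimes", VANDALISM_SQL_QUERY
        )

Because apply_kde_to_primary derives its column names from the layer name ("Gun Crimes" becomes gun_crimes_density_percentile), any wrapper added this way automatically matches the naming convention that PATCH 7/9 now expects in priority_level.py.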