diff --git a/.gitignore b/.gitignore
index 0f55d1bc..934cdf40 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,6 +8,7 @@ data.env
 data/src/tmp
 .DS_Store
 /data/src/local_outputs/
+/data/notebooks/
 
 ## App
 
diff --git a/data/src/classes/slack_error_reporter.py b/data/src/classes/slack_error_reporter.py
new file mode 100644
index 00000000..1c443d4b
--- /dev/null
+++ b/data/src/classes/slack_error_reporter.py
@@ -0,0 +1,16 @@
+import os
+from slack_sdk import WebClient
+
+
+def send_error_to_slack(error_message: str) -> None:
+    """Send error message to Slack."""
+    token: str | None = os.getenv("CAGP_SLACK_API_TOKEN")  # token can be None
+    if token:
+        client = WebClient(token=token)
+        client.chat_postMessage(
+            channel="clean-and-green-philly-back-end",  # Replace with actual Slack channel ID
+            text=error_message,
+            username="Backend Error Reporter",
+        )
+    else:
+        raise ValueError("Slack API token not found in environment variables.")
diff --git a/data/src/script.py b/data/src/script.py
index 78c5f90d..0b7bcc3e 100644
--- a/data/src/script.py
+++ b/data/src/script.py
@@ -30,102 +30,118 @@ from data_utils.unsafe_buildings import unsafe_buildings
 from data_utils.vacant_properties import vacant_properties
 
+import traceback
+
+from classes.slack_error_reporter import send_error_to_slack
+
 # Ensure the directory containing awkde is in the Python path
 awkde_path = "/usr/src/app"
 if awkde_path not in sys.path:
     sys.path.append(awkde_path)
 
-services = [
-    city_owned_properties,
-    phs_properties,
-    l_and_i,
-    rco_geoms,
-    tree_canopy,
-    nbhoods,
-    gun_crimes,
-    drug_crimes,
-    deliquencies,
-    opa_properties,
-    unsafe_buildings,
-    imm_dang_buildings,
-    tactical_urbanism,
-    conservatorship,
-    owner_type,
-    community_gardens,
-    park_priority,
-    ppr_properties,
-    contig_neighbors,
-    dev_probability,
-    negligent_devs,
-]
-
-# backup sql schema if we are reloading data
-backup: BackupArchiveDatabase = None
-if FORCE_RELOAD:
-    # first archive any remaining backup that may exist from a previous run that errored
-    backup = BackupArchiveDatabase()
-    if backup.is_backup_schema_exists():
-        backup.archive_backup_schema()
+try:
+    services = [
+        city_owned_properties,
+        phs_properties,
+        l_and_i,
+        rco_geoms,
+        tree_canopy,
+        nbhoods,
+        gun_crimes,
+        drug_crimes,
+        deliquencies,
+        opa_properties,
+        unsafe_buildings,
+        imm_dang_buildings,
+        tactical_urbanism,
+        conservatorship,
+        owner_type,
+        community_gardens,
+        park_priority,
+        ppr_properties,
+        contig_neighbors,
+        dev_probability,
+        negligent_devs,
+    ]
+
+    # backup sql schema if we are reloading data
+    backup: BackupArchiveDatabase = None
+    if FORCE_RELOAD:
+        # first archive any remaining backup that may exist from a previous run that errored
+        backup = BackupArchiveDatabase()
+        if backup.is_backup_schema_exists():
+            backup.archive_backup_schema()
+            conn.commit()
+            time.sleep(1)  # make sure we get a different timestamp
+            backup = (
+                BackupArchiveDatabase()
+            )  # create a new one so we get a new timestamp
+
+        backup.backup_schema()
         conn.commit()
-        time.sleep(1)  # make sure we get a different timestamp
-        backup = BackupArchiveDatabase()  # create a new one so we get a new timestamp
-    backup.backup_schema()
-    conn.commit()
+    # Load Vacant Property Data
+    dataset = vacant_properties()
+
+    # Load and join other datasets
+    for service in services:
+        dataset = service(dataset)
+
+    before_drop = dataset.gdf.shape[0]
+    dataset.gdf = dataset.gdf.drop_duplicates(subset="opa_id")
+    after_drop = dataset.gdf.shape[0]
+    print(
+        f"Duplicate dataset rows dropped after initial services: {before_drop - after_drop}"
+    )
+
+    # Add Priority Level
+    dataset = priority_level(dataset)
+
+    # Print the distribution of "priority_level"
+    distribution = dataset.gdf["priority_level"].value_counts()
+    print("Distribution of priority level:")
+    print(distribution)
+
+    # Add Access Process
+    dataset = access_process(dataset)
+
+    # Print the distribution of "access_process"
+    distribution = dataset.gdf["access_process"].value_counts()
+    print("Distribution of access process:")
+    print(distribution)
+
+    before_drop = dataset.gdf.shape[0]
+    dataset.gdf = dataset.gdf.drop_duplicates(subset="opa_id")
+    after_drop = dataset.gdf.shape[0]
+    print(f"Duplicate final dataset rows droppeds: {before_drop - after_drop}")
+
+    # back up old tiles file whether we are reloading data or not
+    if backup is None:
+        backup = BackupArchiveDatabase()
+    backup.backup_tiles_file()
+
+    # Finalize in Postgres
+    dataset.gdf.to_postgis(
+        "vacant_properties_end", conn, if_exists="replace", index=False
+    )
 
-# Load Vacant Property Data
-dataset = vacant_properties()
-
-# Load and join other datasets
-for service in services:
-    dataset = service(dataset)
-
-before_drop = dataset.gdf.shape[0]
-dataset.gdf = dataset.gdf.drop_duplicates(subset="opa_id")
-after_drop = dataset.gdf.shape[0]
-print(f"Duplicate dataset rows dropped after initial services: {before_drop - after_drop}")
-
-# Add Priority Level
-dataset = priority_level(dataset)
-
-# Print the distribution of "priority_level"
-distribution = dataset.gdf["priority_level"].value_counts()
-print("Distribution of priority level:")
-print(distribution)
-
-# Add Access Process
-dataset = access_process(dataset)
-
-# Print the distribution of "access_process"
-distribution = dataset.gdf["access_process"].value_counts()
-print("Distribution of access process:")
-print(distribution)
-
-before_drop = dataset.gdf.shape[0]
-dataset.gdf = dataset.gdf.drop_duplicates(subset="opa_id")
-after_drop = dataset.gdf.shape[0]
-print(f"Duplicate final dataset rows droppeds: {before_drop - after_drop}")
-
-# back up old tiles file whether we are reloading data or not
-if backup is None:
-    backup = BackupArchiveDatabase()
-backup.backup_tiles_file()
-
-# Finalize in Postgres
-dataset.gdf.to_postgis("vacant_properties_end", conn, if_exists="replace", index=False)
-
-conn.commit()
-
-# Post to Mapbox
-dataset.build_and_publish(tiles_file_id_prefix)
-
-# if we are reloading, run the diff report, then archive the backup and finally prune old archives
-if FORCE_RELOAD:
-    diff_report = DiffReport(timestamp_string=backup.timestamp_string)
-    diff_report.run()
-    backup.archive_backup_schema()
-    conn.commit()
-    backup.prune_old_archives()
     conn.commit()
-conn.close()
+    # Post to Mapbox
+    dataset.build_and_publish(tiles_file_id_prefix)
+
+    # if we are reloading, run the diff report, then archive the backup and finally prune old archives
+    if FORCE_RELOAD:
+        diff_report = DiffReport(timestamp_string=backup.timestamp_string)
+        diff_report.run()
+        backup.archive_backup_schema()
+        conn.commit()
+        backup.prune_old_archives()
+        conn.commit()
+
+    conn.close()
+
+except Exception as e:
+    error_message = f"Error in backend job: {str(e)}\n\n{traceback.format_exc()}"
+    send_error_to_slack(error_message)
+    raise  # Optionally re-raise the exception
diff --git a/data/src/test/test_slack_error_reporter.py b/data/src/test/test_slack_error_reporter.py
new file mode 100644
index 00000000..6bb86d11
--- /dev/null
+++ b/data/src/test/test_slack_error_reporter.py
@@ -0,0 +1,54 @@
+import unittest
+from unittest.mock import patch
+
+import sys
+import os
+
+sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/..")
+
+from classes.slack_error_reporter import (
+    send_error_to_slack,
+)  # Ensure correct file import
+
+
+class TestSlackNotifier(unittest.TestCase):
+    @patch(
+        "classes.slack_error_reporter.WebClient.chat_postMessage"
+    )  # Correct patching
+    @patch(
+        "classes.slack_error_reporter.os.getenv", return_value="mock_slack_token"
+    )  # Correct patching
+    def test_send_error_to_slack(self, mock_getenv, mock_slack_post):
+        """Test that Slack error reporting is triggered correctly."""
+
+        error_message = "Test error message"
+
+        # Call the Slack notification function
+        send_error_to_slack(error_message)
+
+        # Verify the Slack API call was made with the correct parameters
+        mock_slack_post.assert_called_once_with(
+            channel="clean-and-green-philly-back-end",  # Use actual channel ID
+            text=error_message,
+            username="Backend Error Reporter",
+        )
+
+    @patch(
+        "classes.slack_error_reporter.WebClient.chat_postMessage"
+    )  # Correct patching
+    @patch(
+        "classes.slack_error_reporter.os.getenv", return_value=None
+    )  # Simulate missing Slack token
+    def test_no_error_no_slack_message(self, mock_getenv, mock_slack_post):
+        """Test that Slack notification is not triggered if there's no error."""
+
+        # Call the Slack notification function (with no valid token)
+        with self.assertRaises(ValueError):
+            send_error_to_slack("Test error message")
+
+        # Ensure Slack's chat_postMessage was not called due to missing token
+        mock_slack_post.assert_not_called()
+
+
+if __name__ == "__main__":
+    unittest.main()