Merge pull request #969 from CodeForPhilly/staging
Weekly PR from Staging to Main
nlebovits authored Oct 25, 2024
2 parents 293e3e5 + f621c75 commit 0da3b21
Showing 4 changed files with 176 additions and 89 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -8,6 +8,7 @@ data.env
data/src/tmp
.DS_Store
/data/src/local_outputs/
/data/notebooks/

## App

16 changes: 16 additions & 0 deletions data/src/classes/slack_error_reporter.py
@@ -0,0 +1,16 @@
import os
from slack_sdk import WebClient


def send_error_to_slack(error_message: str) -> None:
"""Send error message to Slack."""
token: str | None = os.getenv("CAGP_SLACK_API_TOKEN") # token can be None
if token:
client = WebClient(token=token)
client.chat_postMessage(
channel="clean-and-green-philly-back-end", # Replace with actual Slack channel ID
text=error_message,
username="Backend Error Reporter",
)
else:
raise ValueError("Slack API token not found in environment variables.")
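
To sanity-check the reporter locally, you can set the same environment variable the function reads and send a test message. A minimal sketch, assuming it is run from data/src so the classes package is importable; the token value is a placeholder, not a working credential:

import os

# assumption: in real use the token comes from the deployment environment, not hard-coded
os.environ["CAGP_SLACK_API_TOKEN"] = "xoxb-placeholder"

from classes.slack_error_reporter import send_error_to_slack

send_error_to_slack("Smoke test: error reporter is wired up")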
194 changes: 105 additions & 89 deletions data/src/script.py
@@ -30,102 +30,118 @@
from data_utils.unsafe_buildings import unsafe_buildings
from data_utils.vacant_properties import vacant_properties

import traceback

from classes.slack_error_reporter import send_error_to_slack

# Ensure the directory containing awkde is in the Python path
awkde_path = "/usr/src/app"
if awkde_path not in sys.path:
sys.path.append(awkde_path)

services = [
city_owned_properties,
phs_properties,
l_and_i,
rco_geoms,
tree_canopy,
nbhoods,
gun_crimes,
drug_crimes,
deliquencies,
opa_properties,
unsafe_buildings,
imm_dang_buildings,
tactical_urbanism,
conservatorship,
owner_type,
community_gardens,
park_priority,
ppr_properties,
contig_neighbors,
dev_probability,
negligent_devs,
]

# back up the SQL schema if we are reloading data
backup: BackupArchiveDatabase | None = None
if FORCE_RELOAD:
# first archive any remaining backup that may exist from a previous run that errored
backup = BackupArchiveDatabase()
if backup.is_backup_schema_exists():
backup.archive_backup_schema()
try:
services = [
city_owned_properties,
phs_properties,
l_and_i,
rco_geoms,
tree_canopy,
nbhoods,
gun_crimes,
drug_crimes,
deliquencies,
opa_properties,
unsafe_buildings,
imm_dang_buildings,
tactical_urbanism,
conservatorship,
owner_type,
community_gardens,
park_priority,
ppr_properties,
contig_neighbors,
dev_probability,
negligent_devs,
]

    # back up the SQL schema if we are reloading data
    backup: BackupArchiveDatabase | None = None
if FORCE_RELOAD:
# first archive any remaining backup that may exist from a previous run that errored
backup = BackupArchiveDatabase()
if backup.is_backup_schema_exists():
backup.archive_backup_schema()
conn.commit()
time.sleep(1) # make sure we get a different timestamp
backup = (
BackupArchiveDatabase()
) # create a new one so we get a new timestamp

backup.backup_schema()
conn.commit()
time.sleep(1) # make sure we get a different timestamp
backup = BackupArchiveDatabase() # create a new one so we get a new timestamp

backup.backup_schema()
conn.commit()
# Load Vacant Property Data
dataset = vacant_properties()

# Load and join other datasets
for service in services:
dataset = service(dataset)

before_drop = dataset.gdf.shape[0]
dataset.gdf = dataset.gdf.drop_duplicates(subset="opa_id")
after_drop = dataset.gdf.shape[0]
print(
f"Duplicate dataset rows dropped after initial services: {before_drop - after_drop}"
)

# Add Priority Level
dataset = priority_level(dataset)

# Print the distribution of "priority_level"
distribution = dataset.gdf["priority_level"].value_counts()
print("Distribution of priority level:")
print(distribution)

# Add Access Process
dataset = access_process(dataset)

# Print the distribution of "access_process"
distribution = dataset.gdf["access_process"].value_counts()
print("Distribution of access process:")
print(distribution)

before_drop = dataset.gdf.shape[0]
dataset.gdf = dataset.gdf.drop_duplicates(subset="opa_id")
after_drop = dataset.gdf.shape[0]
print(f"Duplicate final dataset rows droppeds: {before_drop - after_drop}")

# back up old tiles file whether we are reloading data or not
if backup is None:
backup = BackupArchiveDatabase()
backup.backup_tiles_file()

# Finalize in Postgres
dataset.gdf.to_postgis(
"vacant_properties_end", conn, if_exists="replace", index=False
)

# Load Vacant Property Data
dataset = vacant_properties()

# Load and join other datasets
for service in services:
dataset = service(dataset)

before_drop = dataset.gdf.shape[0]
dataset.gdf = dataset.gdf.drop_duplicates(subset="opa_id")
after_drop = dataset.gdf.shape[0]
print(f"Duplicate dataset rows dropped after initial services: {before_drop - after_drop}")

# Add Priority Level
dataset = priority_level(dataset)

# Print the distribution of "priority_level"
distribution = dataset.gdf["priority_level"].value_counts()
print("Distribution of priority level:")
print(distribution)

# Add Access Process
dataset = access_process(dataset)

# Print the distribution of "access_process"
distribution = dataset.gdf["access_process"].value_counts()
print("Distribution of access process:")
print(distribution)

before_drop = dataset.gdf.shape[0]
dataset.gdf = dataset.gdf.drop_duplicates(subset="opa_id")
after_drop = dataset.gdf.shape[0]
print(f"Duplicate final dataset rows droppeds: {before_drop - after_drop}")

# back up old tiles file whether we are reloading data or not
if backup is None:
backup = BackupArchiveDatabase()
backup.backup_tiles_file()

# Finalize in Postgres
dataset.gdf.to_postgis("vacant_properties_end", conn, if_exists="replace", index=False)

conn.commit()

# Post to Mapbox
dataset.build_and_publish(tiles_file_id_prefix)

# if we are reloading, run the diff report, then archive the backup and finally prune old archives
if FORCE_RELOAD:
diff_report = DiffReport(timestamp_string=backup.timestamp_string)
diff_report.run()
backup.archive_backup_schema()
conn.commit()
backup.prune_old_archives()
conn.commit()

conn.close()
# Post to Mapbox
dataset.build_and_publish(tiles_file_id_prefix)

# if we are reloading, run the diff report, then archive the backup and finally prune old archives
if FORCE_RELOAD:
diff_report = DiffReport(timestamp_string=backup.timestamp_string)
diff_report.run()
backup.archive_backup_schema()
conn.commit()
backup.prune_old_archives()
conn.commit()

conn.close()

except Exception as e:
error_message = f"Error in backend job: {str(e)}\n\n{traceback.format_exc()}"
send_error_to_slack(error_message)
    raise  # re-raise so the job still exits non-zero after reporting
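
Stripped of the pipeline specifics, the error-handling pattern the script adopts reduces to one try/except around the whole job. A minimal sketch, with a hypothetical run_pipeline() standing in for the ETL steps above:

import traceback

from classes.slack_error_reporter import send_error_to_slack


def run_pipeline() -> None:
    """Hypothetical stand-in for the ETL steps in script.py."""
    ...


def main() -> None:
    try:
        run_pipeline()
    except Exception as e:
        # report the full traceback to Slack, then re-raise so the process still exits non-zero
        send_error_to_slack(f"Error in backend job: {e}\n\n{traceback.format_exc()}")
        raise


if __name__ == "__main__":
    main()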
54 changes: 54 additions & 0 deletions data/src/test/test_slack_error_reporter.py
@@ -0,0 +1,54 @@
import unittest
from unittest.mock import patch

import sys
import os

sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/..")

from classes.slack_error_reporter import send_error_to_slack


class TestSlackNotifier(unittest.TestCase):
    @patch(
        "classes.slack_error_reporter.WebClient.chat_postMessage"
    )  # patch the Slack API call so no real message is sent
    @patch(
        "classes.slack_error_reporter.os.getenv", return_value="mock_slack_token"
    )  # simulate a configured Slack token
def test_send_error_to_slack(self, mock_getenv, mock_slack_post):
"""Test that Slack error reporting is triggered correctly."""

error_message = "Test error message"

# Call the Slack notification function
send_error_to_slack(error_message)

# Verify the Slack API call was made with the correct parameters
        mock_slack_post.assert_called_once_with(
            channel="clean-and-green-philly-back-end",  # must match the channel used in send_error_to_slack
            text=error_message,
            username="Backend Error Reporter",
        )

    @patch(
        "classes.slack_error_reporter.WebClient.chat_postMessage"
    )  # patch the Slack API call so no real message is sent
@patch(
"classes.slack_error_reporter.os.getenv", return_value=None
) # Simulate missing Slack token
    def test_no_error_no_slack_message(self, mock_getenv, mock_slack_post):
        """Test that a missing Slack token raises ValueError and sends no message."""

# Call the Slack notification function (with no valid token)
with self.assertRaises(ValueError):
send_error_to_slack("Test error message")

# Ensure Slack's chat_postMessage was not called due to missing token
mock_slack_post.assert_not_called()


if __name__ == "__main__":
unittest.main()
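
Because the module prepends its parent directory (data/src) to sys.path before the import, the file can presumably be run directly from the repository root, e.g. python data/src/test/test_slack_error_reporter.py, provided slack_sdk is installed; pytest should also collect it with python -m pytest data/src/test/test_slack_error_reporter.py.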
