Skip to content

Commit

Permalink
Merge pull request #75 from troykelly/74-failing-to-generate-when-wea…
Browse files Browse the repository at this point in the history
…ther-fails

74 failing to generate when weather fails
  • Loading branch information
troykelly authored Jun 16, 2024
2 parents 7aa942a + 87ff374 commit 694a89a
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 19 deletions.
6 changes: 3 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ croniter==2.0.5
distro==1.9.0
feedparser==6.0.11
gitdb==4.0.11
GitPython==3.1.43
GitPython==3.1.41
h11==0.14.0
httpcore==1.0.5
httpx==0.27.0
Expand All @@ -22,13 +22,13 @@ mutagen==1.47.0
openai==1.34.0
pyacoustid==1.3.0
pydantic==2.7.4
pydantic_core==2.19.0
pydantic_core==2.18.4
pydub==0.25.1
python-dateutil==2.9.0.post0
pytz==2024.1
requests==2.32.3
s3transfer==0.10.1
setuptools==70.0.0
setuptools==69.0.3
sgmllib3k==1.0.0
six==1.16.0
smmap==5.0.1
Expand Down
100 changes: 84 additions & 16 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from s3 import S3Client
from replaygain import process_replaygain
from templating import TemplateHandlers, render_template
from typing import Dict, Any
from typing import Dict, Any, Union, Optional
from logger import setup_logging

setup_logging()
Expand Down Expand Up @@ -637,17 +637,87 @@ def format_datetime(dt):
formatted_datetime = dt.strftime(f"%A the {day}{{suffix}} of %B %Y at %H:%M (%Z)")
return formatted_datetime.replace("{suffix}", suffix)

def validate_and_get(dct: Dict[str, Any], keys: List[Union[str, int]]) -> Optional[Any]:
"""Validate and retrieve a value from a nested dictionary.
Args:
dct: The dictionary to retrieve the value from.
keys: A list of keys representing the path to the desired value.
Can include both string keys for dictionaries and integer keys for list-like structures.
Returns:
The value if found; None if any key is missing or invalid.
Example:
>>> data = {'a': {'b': {'c': 1}}}
>>> validate_and_get(data, ['a', 'b', 'c'])
1
>>> validate_and_get(data, ['a', 'x', 'c'])
None
"""
current_level = dct

for key in keys:
if isinstance(current_level, dict) and key in current_level:
current_level = current_level[key]
elif isinstance(current_level, list) and isinstance(key, int) and 0 <= key < len(current_level):
current_level = current_level[key]
else:
logging.warning(f"Missing or invalid key: {key}")
return None

return current_level

def generate_openweather_weather_report(data, units="metric"):
"""Generate a weather report from JSON data."""
"""Generate a weather report from JSON data.
Args:
data: The weather data from OpenWeatherMap API.
units: Units of measurement ("metric" or "imperial").
Returns:
str: A formatted weather report, or a message if data is missing.
"""
# Required keys at the top level of the data.
top_level_keys = ["timezone_offset", "current", "minutely", "daily"]

# Check for missing top-level keys.
for key in top_level_keys:
if key not in data:
logging.warning(f"Missing key in weather data: {key}")
return "Weather report generation skipped due to missing data."

# Validate current weather data keys.
current_weather_keys = ["dt", "temp", "feels_like", "humidity", "weather", "uvi", "wind_speed", "wind_deg", "sunrise", "sunset"]
for key in current_weather_keys:
if isinstance(data["current"], list) and key not in data["current"][0]:
logging.warning(f"Missing key in current weather data: {key}")
return "Weather report generation skipped due to missing data."

# Validate daily weather data keys.
daily_weather_keys = ["dt", "temp", "feels_like", "humidity", "weather", "pop", "wind_speed", "wind_deg"]
for idx in range(2):
for key in daily_weather_keys:
if isinstance(data["daily"], list) and key not in data["daily"][idx]:
logging.warning(f"Missing key in daily weather data: {key} for index {idx}")
return "Weather report generation skipped due to missing data."

# Validate minutely weather data keys.
minutely_weather_keys = ["precipitation"]
for minute_data in data["minutely"]:
for key in minutely_weather_keys:
if key not in minute_data:
logging.warning(f"Missing key in minutely weather data: {key}")
return "Weather report generation skipped due to missing data."

# Extract the validated data.
timezone_offset = data["timezone_offset"]
current_weather = data["current"]
minutely_weather = data["minutely"]
daily_weather = data["daily"][0]
next_day_weather = data["daily"][1]

# Convert timestamps to readable formats with timezone
# Convert timestamps to readable formats with timezone.
current_time = datetime.fromtimestamp(
current_weather["dt"], timezone(timedelta(seconds=timezone_offset))
)
Expand All @@ -658,15 +728,15 @@ def generate_openweather_weather_report(data, units="metric"):
current_weather["sunset"], timezone(timedelta(seconds=timezone_offset))
)

# Unit labels
# Unit labels.
temp_unit = "°C" if units == "metric" else "°F"
wind_speed_unit = "km/h" if units == "metric" else "mph"

# Convert wind speed if necessary
# Convert wind speed if necessary.
wind_speed = convert_wind_speed(current_weather["wind_speed"], units)
wind_bearing = wind_direction(current_weather["wind_deg"])

# Current weather report
# Current weather report.
current_report = (
f"Current Weather Update as of {format_datetime(current_time)}:\n"
f"Temperature: {current_weather['temp']}{temp_unit} (Feels like: {current_weather['feels_like']}{temp_unit})\n"
Expand All @@ -677,10 +747,8 @@ def generate_openweather_weather_report(data, units="metric"):
f"Sunrise at: {format_datetime(sunrise)}, Sunset at: {format_datetime(sunset)}\n"
)

# Immediate future weather
future_rain = [
minute["precipitation"] for minute in minutely_weather[:60]
] # Next hour
# Immediate future weather.
future_rain = [minute["precipitation"] for minute in minutely_weather[:60]]
future_total_rain = sum(future_rain)
if future_total_rain > 0:
immediate_future_report = (
Expand All @@ -692,27 +760,26 @@ def generate_openweather_weather_report(data, units="metric"):
"No significant precipitation expected in the immediate future."
)

# Convert next day wind speed
# Convert next day wind speed.
next_day_wind_speed = convert_wind_speed(next_day_weather["wind_speed"], units)
next_day_wind_bearing = wind_direction(next_day_weather["wind_deg"])

# Next day weather report
# Next day weather report.
next_day_report = (
f"Weather Forecast for Tomorrow ({format_datetime(datetime.fromtimestamp(next_day_weather['dt'], timezone(timedelta(seconds=timezone_offset))))}):\n"
f"Day Temperature: {next_day_weather['temp']['day']}{temp_unit} (Feels like: {next_day_weather['feels_like']['day']}{temp_unit})\n"
f"Night Temperature: {next_day_weather['temp']['night']}{temp_unit} (Feels like: {next_day_weather['feels_like']['night']}{temp_unit})\n"
f"Condition: {next_day_weather['summary']}\n"
f"Condition: {next_day_weather['weather'][0]['description'].capitalize()}\n"
f"Humidity: {next_day_weather['humidity']}%\n"
f"Wind: {next_day_wind_speed:.2f} {wind_speed_unit} from the {next_day_wind_bearing}\n"
f"Chance of Rain: {next_day_weather['pop']*100}%"
f"Chance of Rain: {next_day_weather['pop'] * 100}%"
)

# Combine all reports
# Combine all reports.
full_report = f"{current_report}\n{immediate_future_report}\n\n{next_day_report}"

return full_report


def clean_text(input_string):
"""Remove HTML tags, non-human-readable content, and excess whitespace from the text."""
cleanr = re.compile("<.*?>")
Expand Down Expand Up @@ -1469,6 +1536,7 @@ def generate_news_audio():
weather_info
and "No data" not in weather_info
and "No description" not in weather_info
and "missing data" not in weather_info
):
news_items.insert(
0,
Expand Down

0 comments on commit 694a89a

Please sign in to comment.