Skip to content

Commit

Permalink
feat: ⚡️ spain weather data
Browse files Browse the repository at this point in the history
  • Loading branch information
davidgasquez committed Apr 2, 2024
1 parent 6e0bd58 commit 312e027
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 69 deletions.
7 changes: 4 additions & 3 deletions datadex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,22 @@
from dagster_dbt import DbtCliResource, load_assets_from_dbt_project
from dagster_duckdb_pandas import DuckDBPandasIOManager

from .assets import others, indicators, huggingface
from .resources import IUCNRedListAPI, HuggingFaceResource
from .assets import others, indicators, huggingface, spain
from .resources import AEMETAPI, IUCNRedListAPI, HuggingFaceResource

DBT_PROJECT_DIR = os.path.dirname(os.path.abspath(__file__)) + "/../dbt/"
DATABASE_PATH = os.getenv("DATABASE_PATH", "data/database.duckdb")

dbt = DbtCliResource(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROJECT_DIR)

dbt_assets = load_assets_from_dbt_project(DBT_PROJECT_DIR, DBT_PROJECT_DIR)
all_assets = load_assets_from_modules([indicators, huggingface, others])
all_assets = load_assets_from_modules([indicators, huggingface, others, spain])

resources = {
"hf": HuggingFaceResource(token=EnvVar("HUGGINGFACE_TOKEN")),
"dbt": dbt,
"iucn_redlist_api": IUCNRedListAPI(token=EnvVar("IUCN_REDLIST_TOKEN")),
"aemet_api": AEMETAPI(token=EnvVar("AEMET_API_TOKEN")),
"io_manager": DuckDBPandasIOManager(database=DATABASE_PATH, schema="main"),
}

Expand Down
2 changes: 2 additions & 0 deletions datadex/assets/huggingface.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ def hf_asset(data: pd.DataFrame, hf: HuggingFaceResource) -> None:
"threatened_animal_species",
"country_year_indicators",
"spain_ipc",
"spain_aemet_stations",
"spain_aemet_weather_data",
]

assets = []
Expand Down
67 changes: 1 addition & 66 deletions datadex/assets/others.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

import pandas as pd
import requests
from dagster import AssetExecutionContext, asset
from slugify import slugify
from dagster import asset

from ..resources import IUCNRedListAPI

Expand Down Expand Up @@ -53,67 +52,3 @@ def wikidata_asteroids() -> pd.DataFrame:
)

return pd.read_csv(io.StringIO(response.content.decode("utf-8")))


@asset
def spain_energy_demand(context: AssetExecutionContext) -> pd.DataFrame:
"""
Spain energy demand data.
"""
df = pd.DataFrame()

FIRST_DAY = pd.to_datetime("2014-01-01")
ENDPOINT = "https://apidatos.ree.es/en/datos/demanda/demanda-tiempo-real"

start_date = pd.to_datetime(FIRST_DAY)
start_date_str = start_date.strftime("%Y-%m-%d")
end_date = start_date + pd.DateOffset(days=15)
end_date_str = end_date.strftime("%Y-%m-%d")

yesterday = pd.to_datetime("today") - pd.DateOffset(days=1)

while start_date < yesterday:
url = f"{ENDPOINT}?start_date={start_date_str}T00:00&end_date={end_date_str}T00:00&time_trunc=hour"
response = requests.get(url)

context.log.info(
f"Start date: {start_date_str} status code: {response.status_code}"
)

local_df = pd.json_normalize(
response.json()["included"][0]["attributes"]["values"]
)
local_df["datetime"] = pd.to_datetime(local_df["datetime"], utc=True)

df = pd.concat([df, local_df[["value", "datetime"]]])

start_date = start_date + pd.DateOffset(days=15)
start_date_str = start_date.strftime("%Y-%m-%d")
end_date = start_date + pd.DateOffset(days=15)
end_date_str = end_date.strftime("%Y-%m-%d")

return df


@asset
def spain_ipc() -> pd.DataFrame:
"""
Spain IPC data from INE. Downloaded from datos.gob.es (https://datos.gob.es/es/apidata).
"""

df = pd.read_csv("https://www.ine.es/jaxiT3/files/t/csv_bdsc/50904.csv", sep=";")

# Clean data
df["Total"] = pd.to_numeric(df["Total"].str.replace(",", "."), errors="coerce")
df["Periodo"] = pd.to_datetime(df["Periodo"].str.replace("M", "-"), format="%Y-%m")

df = df.pivot_table(
index=["Periodo", "Clases"],
columns=["Tipo de dato"],
values="Total",
aggfunc="sum",
).reset_index()

df.columns = [slugify(col, separator="_") for col in df.columns]

return df
112 changes: 112 additions & 0 deletions datadex/assets/spain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from datetime import datetime, timedelta

import pandas as pd
import requests
from dagster import AssetExecutionContext, asset
from slugify import slugify
from pandas.tseries.offsets import MonthEnd

from ..resources import AEMETAPI


@asset
def spain_energy_demand(context: AssetExecutionContext) -> pd.DataFrame:
"""
Spain energy demand data.
"""
df = pd.DataFrame()

FIRST_DAY = pd.to_datetime("2014-01-01")
ENDPOINT = "https://apidatos.ree.es/en/datos/demanda/demanda-tiempo-real"

start_date = pd.to_datetime(FIRST_DAY)
start_date_str = start_date.strftime("%Y-%m-%d")
end_date = start_date + pd.DateOffset(days=15)
end_date_str = end_date.strftime("%Y-%m-%d")

yesterday = pd.to_datetime("today") - pd.DateOffset(days=1)

while start_date < yesterday:
url = f"{ENDPOINT}?start_date={start_date_str}T00:00&end_date={end_date_str}T00:00&time_trunc=hour"
response = requests.get(url)

context.log.info(
f"Start date: {start_date_str} status code: {response.status_code}"
)

local_df = pd.json_normalize(
response.json()["included"][0]["attributes"]["values"]
)
local_df["datetime"] = pd.to_datetime(local_df["datetime"], utc=True)

df = pd.concat([df, local_df[["value", "datetime"]]])

start_date = start_date + pd.DateOffset(days=15)
start_date_str = start_date.strftime("%Y-%m-%d")
end_date = start_date + pd.DateOffset(days=15)
end_date_str = end_date.strftime("%Y-%m-%d")

return df


@asset
def spain_ipc() -> pd.DataFrame:
"""
Spain IPC data from INE. Downloaded from datos.gob.es (https://datos.gob.es/es/apidata).
"""

df = pd.read_csv("https://www.ine.es/jaxiT3/files/t/csv_bdsc/50904.csv", sep=";")

# Clean data
df["Total"] = pd.to_numeric(df["Total"].str.replace(",", "."), errors="coerce")
df["Periodo"] = pd.to_datetime(df["Periodo"].str.replace("M", "-"), format="%Y-%m")

df = df.pivot_table(
index=["Periodo", "Clases"],
columns=["Tipo de dato"],
values="Total",
aggfunc="sum",
).reset_index()

df.columns = [slugify(col, separator="_") for col in df.columns]

return df


@asset
def spain_aemet_stations(aemet_api: AEMETAPI) -> pd.DataFrame:
"""
Spain AEMET stations data.
"""

df = pd.DataFrame(aemet_api.get_all_stations())

return df


@asset
def spain_aemet_weather_data(
context: AssetExecutionContext, aemet_api: AEMETAPI
) -> pd.DataFrame:
"""
Spain weather data since 1920.
"""

start_date = pd.to_datetime("1920-01-01")

end_date = datetime.now() - timedelta(days=1)

df = pd.DataFrame()

for i in pd.date_range(start_date, end_date, freq="M"):
first_day = i.strftime("%Y-%m-01") + "T00:00:00UTC"
last_day = (i + MonthEnd(0)).strftime("%Y-%m-%d") + "T23:59:59UTC"
context.log.info(f"Getting data from {first_day} to {last_day}")

mdf = pd.DataFrame(aemet_api.get_weather_data(first_day, last_day))

df = pd.concat([df, mdf], ignore_index=True)

# df["fecha"] = pd.to_datetime(df["fecha"], format="%Y-%m-%d")

return df
48 changes: 48 additions & 0 deletions datadex/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import huggingface_hub as hf_hub
from dagster import ConfigurableResource
from datasets import Dataset, NamedSplit
from tenacity import retry, wait_exponential, stop_after_attempt


class HuggingFaceResource(ConfigurableResource):
Expand Down Expand Up @@ -56,3 +57,50 @@ def get_market_prices(self, start_date: str, end_date: str, time_trunc="hour"):
category = "mercados"
widget = "precios-mercados-tiempo-real"
return self.query(category, widget, start_date, end_date, time_trunc)


class AEMETAPI(ConfigurableResource):
endpoint: str = "https://opendata.aemet.es/opendata/api"
token: str

@retry(
stop=stop_after_attempt(10),
wait=wait_exponential(multiplier=1, min=4, max=20),
)
def query(self, url):
query = {
"api_key": self.token,
}

headers = {"cache-control": "no-cache"}
r = requests.get(url, params=query, headers=headers)
r.raise_for_status()

return r.json()

@retry(
stop=stop_after_attempt(10),
wait=wait_exponential(multiplier=1, min=4, max=20),
)
def get_query_data(self, query_response):
data_url = query_response.get("datos")
r = requests.get(data_url)
r.raise_for_status()

return r.json()

def get_all_stations(self):
url = f"{self.endpoint}/valores/climatologicos/inventarioestaciones/todasestaciones"

query_response = self.query(url)
data = self.get_query_data(query_response)

return data

def get_weather_data(self, start_date: str, end_date: str):
url = f"{self.endpoint}/valores/climatologicos/diarios/datos/fechaini/{start_date}/fechafin/{end_date}/todasestaciones"

query_response = self.query(url)
data = self.get_query_data(query_response)

return data
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies = [
"pyarrow",
"python-slugify",
"regex",
"tenacity",
]

requires-python = ">=3.11, <=3.12"
Expand Down

0 comments on commit 312e027

Please sign in to comment.