diff --git a/datadex/__init__.py b/datadex/__init__.py index f8c2334..4456a50 100644 --- a/datadex/__init__.py +++ b/datadex/__init__.py @@ -4,8 +4,8 @@ from dagster_dbt import DbtCliResource, load_assets_from_dbt_project from dagster_duckdb_pandas import DuckDBPandasIOManager -from .assets import others, indicators, huggingface -from .resources import IUCNRedListAPI, HuggingFaceResource +from .assets import others, indicators, huggingface, spain +from .resources import AEMETAPI, IUCNRedListAPI, HuggingFaceResource DBT_PROJECT_DIR = os.path.dirname(os.path.abspath(__file__)) + "/../dbt/" DATABASE_PATH = os.getenv("DATABASE_PATH", "data/database.duckdb") @@ -13,12 +13,13 @@ dbt = DbtCliResource(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROJECT_DIR) dbt_assets = load_assets_from_dbt_project(DBT_PROJECT_DIR, DBT_PROJECT_DIR) -all_assets = load_assets_from_modules([indicators, huggingface, others]) +all_assets = load_assets_from_modules([indicators, huggingface, others, spain]) resources = { "hf": HuggingFaceResource(token=EnvVar("HUGGINGFACE_TOKEN")), "dbt": dbt, "iucn_redlist_api": IUCNRedListAPI(token=EnvVar("IUCN_REDLIST_TOKEN")), + "aemet_api": AEMETAPI(token=EnvVar("AEMET_API_TOKEN")), "io_manager": DuckDBPandasIOManager(database=DATABASE_PATH, schema="main"), } diff --git a/datadex/assets/huggingface.py b/datadex/assets/huggingface.py index 0f226b0..9d6fc37 100644 --- a/datadex/assets/huggingface.py +++ b/datadex/assets/huggingface.py @@ -21,6 +21,8 @@ def hf_asset(data: pd.DataFrame, hf: HuggingFaceResource) -> None: "threatened_animal_species", "country_year_indicators", "spain_ipc", + "spain_aemet_stations", + "spain_aemet_weather_data", ] assets = [] diff --git a/datadex/assets/others.py b/datadex/assets/others.py index 090e4db..27311d5 100644 --- a/datadex/assets/others.py +++ b/datadex/assets/others.py @@ -2,8 +2,7 @@ import pandas as pd import requests -from dagster import AssetExecutionContext, asset -from slugify import slugify +from dagster import asset from ..resources import IUCNRedListAPI @@ -53,67 +52,3 @@ def wikidata_asteroids() -> pd.DataFrame: ) return pd.read_csv(io.StringIO(response.content.decode("utf-8"))) - - -@asset -def spain_energy_demand(context: AssetExecutionContext) -> pd.DataFrame: - """ - Spain energy demand data. - """ - df = pd.DataFrame() - - FIRST_DAY = pd.to_datetime("2014-01-01") - ENDPOINT = "https://apidatos.ree.es/en/datos/demanda/demanda-tiempo-real" - - start_date = pd.to_datetime(FIRST_DAY) - start_date_str = start_date.strftime("%Y-%m-%d") - end_date = start_date + pd.DateOffset(days=15) - end_date_str = end_date.strftime("%Y-%m-%d") - - yesterday = pd.to_datetime("today") - pd.DateOffset(days=1) - - while start_date < yesterday: - url = f"{ENDPOINT}?start_date={start_date_str}T00:00&end_date={end_date_str}T00:00&time_trunc=hour" - response = requests.get(url) - - context.log.info( - f"Start date: {start_date_str} status code: {response.status_code}" - ) - - local_df = pd.json_normalize( - response.json()["included"][0]["attributes"]["values"] - ) - local_df["datetime"] = pd.to_datetime(local_df["datetime"], utc=True) - - df = pd.concat([df, local_df[["value", "datetime"]]]) - - start_date = start_date + pd.DateOffset(days=15) - start_date_str = start_date.strftime("%Y-%m-%d") - end_date = start_date + pd.DateOffset(days=15) - end_date_str = end_date.strftime("%Y-%m-%d") - - return df - - -@asset -def spain_ipc() -> pd.DataFrame: - """ - Spain IPC data from INE. Downloaded from datos.gob.es (https://datos.gob.es/es/apidata). - """ - - df = pd.read_csv("https://www.ine.es/jaxiT3/files/t/csv_bdsc/50904.csv", sep=";") - - # Clean data - df["Total"] = pd.to_numeric(df["Total"].str.replace(",", "."), errors="coerce") - df["Periodo"] = pd.to_datetime(df["Periodo"].str.replace("M", "-"), format="%Y-%m") - - df = df.pivot_table( - index=["Periodo", "Clases"], - columns=["Tipo de dato"], - values="Total", - aggfunc="sum", - ).reset_index() - - df.columns = [slugify(col, separator="_") for col in df.columns] - - return df diff --git a/datadex/assets/spain.py b/datadex/assets/spain.py new file mode 100644 index 0000000..e8837ba --- /dev/null +++ b/datadex/assets/spain.py @@ -0,0 +1,112 @@ +from datetime import datetime, timedelta + +import pandas as pd +import requests +from dagster import AssetExecutionContext, asset +from slugify import slugify +from pandas.tseries.offsets import MonthEnd + +from ..resources import AEMETAPI + + +@asset +def spain_energy_demand(context: AssetExecutionContext) -> pd.DataFrame: + """ + Spain energy demand data. + """ + df = pd.DataFrame() + + FIRST_DAY = pd.to_datetime("2014-01-01") + ENDPOINT = "https://apidatos.ree.es/en/datos/demanda/demanda-tiempo-real" + + start_date = pd.to_datetime(FIRST_DAY) + start_date_str = start_date.strftime("%Y-%m-%d") + end_date = start_date + pd.DateOffset(days=15) + end_date_str = end_date.strftime("%Y-%m-%d") + + yesterday = pd.to_datetime("today") - pd.DateOffset(days=1) + + while start_date < yesterday: + url = f"{ENDPOINT}?start_date={start_date_str}T00:00&end_date={end_date_str}T00:00&time_trunc=hour" + response = requests.get(url) + + context.log.info( + f"Start date: {start_date_str} status code: {response.status_code}" + ) + + local_df = pd.json_normalize( + response.json()["included"][0]["attributes"]["values"] + ) + local_df["datetime"] = pd.to_datetime(local_df["datetime"], utc=True) + + df = pd.concat([df, local_df[["value", "datetime"]]]) + + start_date = start_date + pd.DateOffset(days=15) + start_date_str = start_date.strftime("%Y-%m-%d") + end_date = start_date + pd.DateOffset(days=15) + end_date_str = end_date.strftime("%Y-%m-%d") + + return df + + +@asset +def spain_ipc() -> pd.DataFrame: + """ + Spain IPC data from INE. Downloaded from datos.gob.es (https://datos.gob.es/es/apidata). + """ + + df = pd.read_csv("https://www.ine.es/jaxiT3/files/t/csv_bdsc/50904.csv", sep=";") + + # Clean data + df["Total"] = pd.to_numeric(df["Total"].str.replace(",", "."), errors="coerce") + df["Periodo"] = pd.to_datetime(df["Periodo"].str.replace("M", "-"), format="%Y-%m") + + df = df.pivot_table( + index=["Periodo", "Clases"], + columns=["Tipo de dato"], + values="Total", + aggfunc="sum", + ).reset_index() + + df.columns = [slugify(col, separator="_") for col in df.columns] + + return df + + +@asset +def spain_aemet_stations(aemet_api: AEMETAPI) -> pd.DataFrame: + """ + Spain AEMET stations data. + """ + + df = pd.DataFrame(aemet_api.get_all_stations()) + + return df + + +@asset +def spain_aemet_weather_data( + context: AssetExecutionContext, aemet_api: AEMETAPI +) -> pd.DataFrame: + """ + Spain weather data since 1920. + """ + + start_date = pd.to_datetime("1920-01-01") + + end_date = datetime.now() - timedelta(days=1) + + df = pd.DataFrame() + + for i in pd.date_range(start_date, end_date, freq="M"): + first_day = i.strftime("%Y-%m-01") + "T00:00:00UTC" + last_day = (i + MonthEnd(0)).strftime("%Y-%m-%d") + "T23:59:59UTC" + context.log.info(f"Getting data from {first_day} to {last_day}") + + mdf = pd.DataFrame(aemet_api.get_weather_data(first_day, last_day)) + + df = pd.concat([df, mdf], ignore_index=True) + + # df["fecha"] = pd.to_datetime(df["fecha"], format="%Y-%m-%d") + + return df diff --git a/datadex/resources.py b/datadex/resources.py index 2d15d0f..9dbc1e9 100644 --- a/datadex/resources.py +++ b/datadex/resources.py @@ -2,6 +2,7 @@ import huggingface_hub as hf_hub from dagster import ConfigurableResource from datasets import Dataset, NamedSplit +from tenacity import retry, wait_exponential, stop_after_attempt class HuggingFaceResource(ConfigurableResource): @@ -56,3 +57,50 @@ def get_market_prices(self, start_date: str, end_date: str, time_trunc="hour"): category = "mercados" widget = "precios-mercados-tiempo-real" return self.query(category, widget, start_date, end_date, time_trunc) + + +class AEMETAPI(ConfigurableResource): + endpoint: str = "https://opendata.aemet.es/opendata/api" + token: str + + @retry( + stop=stop_after_attempt(10), + wait=wait_exponential(multiplier=1, min=4, max=20), + ) + def query(self, url): + query = { + "api_key": self.token, + } + + headers = {"cache-control": "no-cache"} + r = requests.get(url, params=query, headers=headers) + r.raise_for_status() + + return r.json() + + @retry( + stop=stop_after_attempt(10), + wait=wait_exponential(multiplier=1, min=4, max=20), + ) + def get_query_data(self, query_response): + data_url = query_response.get("datos") + r = requests.get(data_url) + r.raise_for_status() + + return r.json() + + def get_all_stations(self): + url = f"{self.endpoint}/valores/climatologicos/inventarioestaciones/todasestaciones" + + query_response = self.query(url) + data = self.get_query_data(query_response) + + return data + + def get_weather_data(self, start_date: str, end_date: str): + url = f"{self.endpoint}/valores/climatologicos/diarios/datos/fechaini/{start_date}/fechafin/{end_date}/todasestaciones" + + query_response = self.query(url) + data = self.get_query_data(query_response) + + return data diff --git a/pyproject.toml b/pyproject.toml index d87c565..dcb5475 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ dependencies = [ "pyarrow", "python-slugify", "regex", + "tenacity", ] requires-python = ">=3.11, <=3.12"