diff --git a/data/issues/emissions/historical_data_calculations.py b/data/issues/emissions/historical_data_calculations.py index 7d74f50c..00b11f47 100644 --- a/data/issues/emissions/historical_data_calculations.py +++ b/data/issues/emissions/historical_data_calculations.py @@ -1,60 +1,116 @@ -import pandas as pd -import feather -import os -from datetime import datetime import hashlib +import os + +import pandas as pd +import pyarrow.feather as feather PATH_SMHI = 'https://nationellaemissionsdatabasen.smhi.se/' + \ 'api/getexcelfile/?county=0&municipality=0&sub=CO2' -def cache_df(f: type(pd.read_excel)) -> type(pd.read_excel): +def cache_df(f: callable = None, path: str = None, freq: str = '1Y'): """ - Cache the df to an intermediate file and use it if made this year + Cache the DataFrame to an intermediate file and use it if created within the same period. Args: - f: Function to cache + f: A function to cache (e.g., a function that loads a DataFrame). + path: Path to the file to be cached. If provided, it automatically supplies the path parameter to the decorated function. + freq: Cache period, e.g. '1D', '1M', '1Y'. Defaults to '1Y'. Returns: - f with caching of result + A decorator that adds caching functionality to the intended function. + + Example usage: + + Create a test Excel file: + >>> df_test = pd.DataFrame({"A": [1], "B": [2]}) + >>> test_path = "test_data.xlsx" + >>> df_test.to_excel(test_path, index=False) + + Use the decorator to cache the DataFrame loaded from the file: + >>> @cache_df + ... def load_data(path): + ... print("Creating DataFrame from file (first call)...") + ... return pd.read_excel(path) + >>> load_data.__name__ + 'cache_df_load_data' + >>> print(load_data(test_path)) + Creating DataFrame from file (first call)... + A B + 0 1 2 + + >>> print(load_data(test_path)) # Data loaded from cache, no print output + A B + 0 1 2 + + Use the decorator with a short expiration time: + >>> @cache_df(path=test_path, freq='1ms') + ... def load_data_short_expiry(path=test_path): + ... print("Creating DataFrame from file (short expiration)...") + ... return pd.read_excel(path) + >>> print(load_data_short_expiry()) + Creating DataFrame from file (short expiration)... + A B + 0 1 2 + + >>> import time + >>> time.sleep(0.001) # Sleep for 1 millisecond + + >>> print(load_data_short_expiry()) # Data expired, loading again from file + Creating DataFrame from file (short expiration)... + A B + 0 1 2 + + Clean up the test files: + >>> file_hash = hashlib.md5(test_path.encode()).hexdigest() + >>> os.remove(test_path) + >>> os.remove(f"cache_df_load_data_{file_hash}.feather") + >>> os.remove(f"cache_df_load_data_{file_hash}.pkl") + >>> os.remove(f"cache_df_load_data_short_expiry_{file_hash}.feather") + >>> os.remove(f"cache_df_load_data_short_expiry_{file_hash}.pkl") """ + if f is None: + return lambda f: cache_df(f, path=path, freq=freq) - def caching_f(path=PATH_SMHI, *args, **kwargs): - # Create a hash of the URL for the cache file - url_hash = hashlib.md5(path.encode()).hexdigest() - cache_file = f'cached_data_{url_hash}.feather' - columns_file = f'cached_data_columns_{url_hash}.pkl' + def caching_f(*args, **kwargs): + input_path = kwargs.get('path') or (args[0] if args else path) + + if not input_path: + raise ValueError("Path parameter is required either as a decorator argument or function argument.") + + # Create a hash of the path for the cache file + url_hash = hashlib.md5(input_path.encode()).hexdigest() + cache_file = f'cache_df_{f.__name__}_{url_hash}.feather' + columns_file = f'cache_df_{f.__name__}_{url_hash}.pkl' # Check if cached file and columns file exist and their timestamps if os.path.exists(cache_file): - cache_mtime = datetime.fromtimestamp(os.path.getmtime(cache_file)) - current_year = datetime.now().year - if cache_mtime.year == current_year: + stat = os.stat(cache_file) + cache_mtime = pd.Timestamp(stat.st_mtime_ns // 1_000_000, unit='ms') + if pd.Period(pd.Timestamp.now(), freq=freq) == pd.Period(cache_mtime, freq=freq): # Load cached data df = pd.read_feather(cache_file) # Load original column names if os.path.exists(columns_file): original_columns = pd.read_pickle(columns_file) df.columns = original_columns - print(f"Loaded cached data from {cache_file}") return df # Process and cache the data - df = f(path, *args, **kwargs) + df = f(*args, **kwargs) # Save the original column names separately original_columns = df.columns - df.to_feather(cache_file) + feather.write_feather(df, cache_file) pd.to_pickle(original_columns, columns_file) - print(f"Cached data to {cache_file}") return df - caching_f.__name__ = f.__name__ + '_cached' + caching_f.__name__ = 'cache_df_' + f.__name__ return caching_f -@cache_df -def get_smhi_data(path=PATH_SMHI) -> pd.DataFrame: +@cache_df(path=PATH_SMHI) +def get_smhi_data(path=PATH_SMHI): """ Downloads data from SMHI and loads it into a pandas dataframe. @@ -158,3 +214,9 @@ def get_n_prep_data_from_smhi(df): # df_merge = df_total.merge(df_sectors, on='Kommun', how='left') return df + + +if __name__ == "__main__": + import doctest + + doctest.testmod() \ No newline at end of file