Skip to content

Commit

Permalink
Make default excel file path and df cache period configurable Klimatb…
Browse files Browse the repository at this point in the history
…yran#682

Generalize the df cache decorator.
  • Loading branch information
joakimbits committed Sep 21, 2024
1 parent 53eeb3f commit 34ab729
Showing 1 changed file with 85 additions and 23 deletions.
108 changes: 85 additions & 23 deletions data/issues/emissions/historical_data_calculations.py
Original file line number Diff line number Diff line change
@@ -1,60 +1,116 @@
import pandas as pd
import feather
import os
from datetime import datetime
import hashlib
import os

import pandas as pd
import pyarrow.feather as feather

PATH_SMHI = 'https://nationellaemissionsdatabasen.smhi.se/' + \
'api/getexcelfile/?county=0&municipality=0&sub=CO2'


def cache_df(f: type(pd.read_excel)) -> type(pd.read_excel):
def cache_df(f: callable = None, path: str = None, freq: str = '1Y'):
"""
Cache the df to an intermediate file and use it if made this year
Cache the DataFrame to an intermediate file and use it if created within the same period.
Args:
f: Function to cache
f: A function to cache (e.g., a function that loads a DataFrame).
path: Path to the file to be cached. If provided, it automatically supplies the path parameter to the decorated function.
freq: Cache period, e.g. '1D', '1M', '1Y'. Defaults to '1Y'.
Returns:
f with caching of result
A decorator that adds caching functionality to the intended function.
Example usage:
Create a test Excel file:
>>> df_test = pd.DataFrame({"A": [1], "B": [2]})
>>> test_path = "test_data.xlsx"
>>> df_test.to_excel(test_path, index=False)
Use the decorator to cache the DataFrame loaded from the file:
>>> @cache_df
... def load_data(path):
... print("Creating DataFrame from file (first call)...")
... return pd.read_excel(path)
>>> load_data.__name__
'cache_df_load_data'
>>> print(load_data(test_path))
Creating DataFrame from file (first call)...
A B
0 1 2
>>> print(load_data(test_path)) # Data loaded from cache, no print output
A B
0 1 2
Use the decorator with a short expiration time:
>>> @cache_df(path=test_path, freq='1ms')
... def load_data_short_expiry(path=test_path):
... print("Creating DataFrame from file (short expiration)...")
... return pd.read_excel(path)
>>> print(load_data_short_expiry())
Creating DataFrame from file (short expiration)...
A B
0 1 2
>>> import time
>>> time.sleep(0.001) # Sleep for 1 millisecond
>>> print(load_data_short_expiry()) # Data expired, loading again from file
Creating DataFrame from file (short expiration)...
A B
0 1 2
Clean up the test files:
>>> file_hash = hashlib.md5(test_path.encode()).hexdigest()
>>> os.remove(test_path)
>>> os.remove(f"cache_df_load_data_{file_hash}.feather")
>>> os.remove(f"cache_df_load_data_{file_hash}.pkl")
>>> os.remove(f"cache_df_load_data_short_expiry_{file_hash}.feather")
>>> os.remove(f"cache_df_load_data_short_expiry_{file_hash}.pkl")
"""
if f is None:
return lambda f: cache_df(f, path=path, freq=freq)

def caching_f(path=PATH_SMHI, *args, **kwargs):
# Create a hash of the URL for the cache file
url_hash = hashlib.md5(path.encode()).hexdigest()
cache_file = f'cached_data_{url_hash}.feather'
columns_file = f'cached_data_columns_{url_hash}.pkl'
def caching_f(*args, **kwargs):
input_path = kwargs.get('path') or (args[0] if args else path)

if not input_path:
raise ValueError("Path parameter is required either as a decorator argument or function argument.")

# Create a hash of the path for the cache file
url_hash = hashlib.md5(input_path.encode()).hexdigest()
cache_file = f'cache_df_{f.__name__}_{url_hash}.feather'
columns_file = f'cache_df_{f.__name__}_{url_hash}.pkl'

# Check if cached file and columns file exist and their timestamps
if os.path.exists(cache_file):
cache_mtime = datetime.fromtimestamp(os.path.getmtime(cache_file))
current_year = datetime.now().year
if cache_mtime.year == current_year:
stat = os.stat(cache_file)
cache_mtime = pd.Timestamp(stat.st_mtime_ns // 1_000_000, unit='ms')
if pd.Period(pd.Timestamp.now(), freq=freq) == pd.Period(cache_mtime, freq=freq):
# Load cached data
df = pd.read_feather(cache_file)
# Load original column names
if os.path.exists(columns_file):
original_columns = pd.read_pickle(columns_file)
df.columns = original_columns
print(f"Loaded cached data from {cache_file}")
return df

# Process and cache the data
df = f(path, *args, **kwargs)
df = f(*args, **kwargs)
# Save the original column names separately
original_columns = df.columns
df.to_feather(cache_file)
feather.write_feather(df, cache_file)
pd.to_pickle(original_columns, columns_file)

print(f"Cached data to {cache_file}")
return df

caching_f.__name__ = f.__name__ + '_cached'
caching_f.__name__ = 'cache_df_' + f.__name__
return caching_f


@cache_df
def get_smhi_data(path=PATH_SMHI) -> pd.DataFrame:
@cache_df(path=PATH_SMHI)
def get_smhi_data(path=PATH_SMHI):
"""
Downloads data from SMHI and loads it into a pandas dataframe.
Expand Down Expand Up @@ -158,3 +214,9 @@ def get_n_prep_data_from_smhi(df):
# df_merge = df_total.merge(df_sectors, on='Kommun', how='left')

return df


if __name__ == "__main__":
import doctest

doctest.testmod()

0 comments on commit 34ab729

Please sign in to comment.