WIP Feature/process wildcard speedup #210

Closed
wants to merge 3 commits

1 change: 1 addition & 0 deletions .gitignore
@@ -23,3 +23,4 @@ docs/api/
/out.txt
*.log
/profile.*
.cache/
52 changes: 50 additions & 2 deletions tests/test_transforms.py
@@ -22,7 +22,7 @@
"display.max_columns",
20,
"display.width",
300,
150,
"display.max_colwidth",
75,
"display.precision",
@@ -69,6 +69,53 @@ def make_str(df):


class TestTransforms:
def test_process_wildcards(self):

with open("tests/data/process_wildcards_test_data.pkl", "rb") as f:
table = pd.read_pickle(f)
with open("tests/data/process_wildcards_test_model.pkl", "rb") as f:
model = pd.read_pickle(f)
t0 = datetime.now()
result = transforms.process_wildcards(None, table, model) # pyright: ignore
logger.info(f"process_wildcards() took {datetime.now() - t0} seconds")

def test_merge_duplicate_columns(self):
"""
Tests that this:
```
df
     a    a    a  b    b  c
0  NaN  4.0  7.0  1  4.0  1
1  2.0  NaN  8.0  2  5.0  2
2  3.0  6.0  NaN  3  NaN  3
```

Gets transformed into this:
```
df2 = transforms._merge_duplicate_named_columns(df.copy())
df2
     a    b  c
0  7.0  4.0  1
1  8.0  5.0  2
2  6.0  3.0  3
```
"""
df = pd.DataFrame(
{
"a": [None, 2, 3],
"a2": [4, None, 6],
"a3": [7, 8, None],
"b": [1, 2, 3],
"b2": [4, 5, None],
"c": [1, 2, 3],
}
).rename(columns={"a2": "a", "a3": "a", "b2": "b"})
df2 = transforms._merge_duplicate_named_columns(df.copy())
assert df2.columns.tolist() == ["a", "b", "c"]
assert df2["a"].tolist() == [7, 8, 6]
assert df2["b"].tolist() == [4, 5, 3]
assert df2["c"].tolist() == [1, 2, 3]

def test_uc_wildcards(self):
"""
Tests logic that matches wildcards in the process_uc_wildcards transform.
@@ -165,4 +212,5 @@ def test_default_pcg_vectorised(self):

if __name__ == "__main__":
# TestTransforms().test_default_pcg_vectorised()
TestTransforms().test_uc_wildcards()
# TestTransforms().test_uc_wildcards()
TestTransforms().test_process_wildcards()
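For reference, the merge rule that `_merge_duplicate_named_columns` applies (forward-fill left-to-right within each set of duplicate-named columns, then keep only the right-most column) can be reproduced with plain pandas. A standalone sketch using the same toy data as the test above; it is illustrative only, not the xl2times implementation:

```python
import numpy as np
import pandas as pd

# Toy frame with duplicate column names, mirroring test_merge_duplicate_columns.
df = pd.DataFrame(
    [[None, 4, 7, 1, 4, 1], [2, None, 8, 2, 5, 2], [3, 6, None, 3, None, 3]],
    columns=["a", "a", "a", "b", "b", "c"],
)

merged = df.copy()
for col in pd.unique(merged.columns[merged.columns.duplicated()]):
    locs = np.flatnonzero(merged.columns == col)
    # Forward-fill left-to-right across the duplicate-named columns.
    merged.iloc[:, locs] = merged.iloc[:, locs].ffill(axis=1).to_numpy()

# Keep only the right-most column of each duplicate set.
merged = merged.loc[:, ~merged.columns.duplicated(keep="last")]
print(merged)  # columns a, b, c with values [7, 8, 6], [4, 5, 3], [1, 2, 3]
```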
13 changes: 13 additions & 0 deletions utils/run_benchmarks.py
@@ -23,6 +23,19 @@

logger = utils.get_logger()

pd.set_option(
"display.max_rows",
20,
"display.max_columns",
20,
"display.width",
150,
"display.max_colwidth",
75,
"display.precision",
3,
)


def parse_result(output: str) -> Tuple[float, int, int]:
# find pattern in multiline string
133 changes: 106 additions & 27 deletions xl2times/transforms.py
@@ -1,8 +1,10 @@
import pickle
import re
import time
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor
from dataclasses import replace
from datetime import datetime
from functools import reduce
from itertools import groupby
from pathlib import Path
@@ -205,23 +207,52 @@ def discard(table):

result = []
for table in tables:

if not datatypes.Tag.has_tag(table.tag.split(":")[0]):
logger.warning(f"Dropping table with unrecognized tag {table.tag}")
continue

if discard(table):
continue

# Check for duplicate columns:
seen = set()
dupes = [x for x in table.dataframe.columns if x in seen or seen.add(x)]
df = table.dataframe
dupes = df.columns[table.dataframe.columns.duplicated()]

if len(dupes) > 0:
logger.warning(
f"Duplicate columns in {table.range}, {table.sheetname},"
f" {table.filename}: {','.join(dupes)}"
f"Merging duplicate columns in {table.range}, {table.sheetname},"
f" {table.filename}: {dupes.to_list()}"
)
table.dataframe = _merge_duplicate_named_columns(df)

result.append(table)
return result


def _merge_duplicate_named_columns(df_in: DataFrame) -> DataFrame:
"""Merges values in duplicate columns into a single column.
This is implemented as a forward-fill of missing values in the left-to-right direction, to match VEDA's behaviour.
So any missing values in the right-most column of each set of duplicate-named columns are filled with the first non-missing value to the left.

Parameters
df_in : DataFrame to be processed (not modified)
Returns
DataFrame with duplicate columns merged
"""
if not df_in.columns.duplicated().any():
return df_in

df = df_in.copy()
dupes = pd.unique(df.columns[df.columns.duplicated(keep="first")])
for dup_col in dupes:
df[dup_col] = df[dup_col].ffill(axis=1)

# only keep the right-most duplicate column from each duplicate set
df = df.iloc[:, ~df.columns.duplicated(keep="last")]
return df


def normalize_tags_columns(
config: datatypes.Config,
tables: List[datatypes.EmbeddedXlTable],
@@ -588,7 +619,8 @@ def process_user_constraint_table(
# TODO: apply table.uc_sets

# Fill in UC_N blank cells with value from above
df["uc_n"] = df["uc_n"].ffill()
if "uc_n" in df.columns:
df["uc_n"] = df["uc_n"].ffill()

data_columns = [
x for x in df.columns if x not in config.known_columns[datatypes.Tag.uc_t]
@@ -2157,24 +2189,49 @@ def process_uc_wildcards(

def _match_uc_wildcards(
df: pd.DataFrame,
process_map: dict[str, str],
col_name_map: dict[str, str],
dictionary: dict[str, pd.DataFrame],
matcher: Callable,
result_col: str,
explode: bool = True,
) -> pd.DataFrame:
"""
Match wildcards in the given table using the given process map and dictionary.

Args:
df: Table to match wildcards in.
process_map: Mapping of column names to process sets.
col_name_map: Mapping of column names to process sets.
dictionary: Dictionary of process sets to match against.
matcher: Matching function to use, e.g. get_matching_processes or get_matching_commodities.
result_col: Name of the column to store the matched results in.
explode: If True, explode multi-value matches into separate rows and drop the wildcard columns.

Returns:
The table with the wildcard matches added as a column named `result_col`; if `explode` is True, the wildcard columns are dropped and multi-value matches are exploded into separate rows.
"""
proc_cols = list(col_name_map.keys())
filter_matches = _wildcard_match(df, col_name_map, dictionary, matcher, result_col)

# Finally we merge the matches back into the original table. This join re-duplicates the duplicate filters that _wildcard_match dropped for speed.
# If `explode` is set, matches to multiple names are then exploded to give a long-format table.
if result_col in df.columns:
df = df.drop(columns=[result_col])
df = df.merge(filter_matches, left_on=proc_cols, right_on=proc_cols, how="left")
if explode:
df = df.explode(result_col).reset_index(drop=True).drop(columns=proc_cols)

# replace NaNs in result_col with None for consistency with older logic
df[result_col] = df[result_col].where(df[result_col].notna(), None)

return df


def _wildcard_match(
df: pd.DataFrame,
process_map: dict[str, str],
dictionary: dict[str, pd.DataFrame],
matcher: Callable,
result_col: str,
):
proc_cols = list(process_map.keys())

# drop duplicate sets of wildcard columns to save repeated (slow) regex matching. This makes things much faster.
@@ -2199,20 +2256,7 @@ def _match_uc_wildcards(
filter_matches = unique_filters.reset_index(drop=True).merge(
matches, left_index=True, right_index=True
)

# Finally we merge the matches back into the original table. This join re-duplicates the duplicate filters dropped above for speed.
# And we explode any matches to multiple names to give a long-format table.
df = (
df.merge(filter_matches, left_on=proc_cols, right_on=proc_cols, how="left")
.explode(result_col)
.reset_index(drop=True)
.drop(columns=proc_cols)
)

# replace NaNs in results_col with None for consistency with older logic
df[result_col] = df[result_col].where(df[result_col].notna(), None)

return df
return filter_matches


def process_wildcards(
@@ -2223,7 +2267,6 @@
"""
Process wildcards specified in TFM tables.
"""

topology = generate_topology_dictionary(tables, model)

def match_wildcards(
@@ -2280,23 +2323,53 @@ def eval_and_update(
# Reset FI_T index so that queries can determine unique rows to update
tables[datatypes.Tag.fi_t].reset_index(inplace=True)

# pre-build lookup tables of matching process and commodity wildcards to actual processes and commodities, for faster lookup below
# proc_matches = _wildcard_match(updates.copy(), process_map, topology, get_matching_processes, "process").fillna('').set_index(list(process_map.keys()))
# comm_matches = _wildcard_match(updates.copy(), commodity_map, topology, get_matching_commodities, "commodity").fillna('').set_index(list(commodity_map.keys()))

updates = _match_uc_wildcards(
updates,
process_map,
topology,
get_matching_processes,
"process",
explode=False,
)
updates = _match_uc_wildcards(
updates,
commodity_map,
topology,
get_matching_commodities,
"commodity",
explode=False,
)

# TFM_UPD: expand wildcards in each row, query FI_T to find matching rows,
# evaluate the update formula, and add new rows to FI_T
# TODO perf: collect all updates and go through FI_T only once?
for _, row in tqdm(
updates.iterrows(),
updates.copy().iterrows(),
total=len(updates),
desc=f"Processing wildcard for {datatypes.Tag.tfm_upd}",
):
if row["value"] is None: # TODO is this really needed?
continue
match = match_wildcards(row)
if match is None:
continue
processes, commodities = match
# match = match_wildcards(row)
# if match is None:
# continue
# processes, commodities = match

processes = pd.DataFrame(
pd.Series(row["process"]), columns=["process"]
).dropna()
commodities = pd.DataFrame(
pd.Series(row["commodity"]), columns=["commodity"]
).dropna()

rows_to_update = query(
table, processes, commodities, row["attribute"], row["region"]
)

new_rows = table.loc[rows_to_update].copy()
eval_and_update(new_rows, rows_to_update, row["value"])
new_tables.append(new_rows)
@@ -2309,13 +2382,19 @@
table = tables[datatypes.Tag.fi_t]
new_tables = []

# updates = _match_uc_wildcards(updates, process_map, topology, get_matching_processes, "process", explode=False)
# updates = _match_uc_wildcards(updates, commodity_map, topology, get_matching_commodities, "commodity", explode=False)

# TFM_INS: expand each row by wildcards, then add to FI_T
for _, row in tqdm(
updates.iterrows(),
total=len(updates),
desc=f"Processing wildcard for {datatypes.Tag.tfm_ins}",
):
match = match_wildcards(row)
# processes = pd.DataFrame(row["process"], columns=["process"])
# commodities = pd.DataFrame(row["commodity"], columns=["commodity"])

# TODO perf: add matched procs/comms into column and use explode?
new_rows = pd.DataFrame([row.filter(table.columns)])
if match is not None:
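The speedup itself comes from the pattern used by `_wildcard_match` and `_match_uc_wildcards` above: run the slow regex matching only once per unique combination of wildcard filter columns, then merge the matches back onto the full table and explode them into long format. A minimal, self-contained sketch of that pattern (the matcher and names below are illustrative stand-ins, not taken from the codebase):

```python
import re
import pandas as pd

# Stand-in for the expensive per-row matching (in xl2times this scans the model's
# processes/commodities via get_matching_processes / get_matching_commodities).
ALL_NAMES = ["COALPLT", "COALMINE", "GASPLT", "WINDFARM"]

def match_names(pattern):
    regex = re.compile(pattern.replace("*", ".*") + "$")
    return [name for name in ALL_NAMES if regex.match(name)]

table = pd.DataFrame(
    {"pset_pn": ["COAL*", "COAL*", "GAS*", "*"], "value": [1, 2, 3, 4]}
)
filter_cols = ["pset_pn"]

# 1. Evaluate the matcher only once per unique filter combination.
unique_filters = table[filter_cols].drop_duplicates().reset_index(drop=True)
unique_filters["process"] = unique_filters["pset_pn"].map(match_names)

# 2. Merge the matches back (re-duplicating the dropped rows) and explode
#    the lists of matched names into a long-format table.
result = (
    table.merge(unique_filters, on=filter_cols, how="left")
    .explode("process")
    .reset_index(drop=True)
    .drop(columns=filter_cols)
)
print(result)  # one row per (value, matched process) pair
```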
6 changes: 6 additions & 0 deletions xl2times/utils.py
@@ -60,6 +60,11 @@ def explode(df, data_columns):
:return: Tuple with the exploded dataframe and a Series of the original
column name for each value in each new row.
"""
if df.columns.duplicated().any():
raise ValueError(
f"Dataframe has duplicated columns: {df.columns[df.columns.duplicated()]}"
)

data = df[data_columns].values.tolist()
other_columns = [
colname for colname in df.columns.values if colname not in data_columns
@@ -75,6 +80,7 @@
index = df[value_column].notna()
df = df[index]
names = names[index]

return df, names


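The new duplicate-column guard in `explode` exists because label-based selection silently widens when column names repeat, so the value/column-name pairing built from `data_columns` would misalign. A small illustration of the failure mode (illustrative only, not from the codebase):

```python
import pandas as pd

df = pd.DataFrame([[1, 2, 3]], columns=["x", "x", "y"])
# Selecting by label returns *both* 'x' columns, so there are more value
# columns than names in data_columns and the reshape would misalign.
print(df[["x", "y"]].shape)  # (1, 3), not (1, 2)
```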