Refactor process_wildcards and add support for TFM_MIG (#166)
Fixes #83 

TFM_UPD now adds rows to the table instead of updating it in place
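
Not part of the commit message, but as a minimal pandas sketch of the change (table, columns, and values below are made up): a TFM_UPD expression such as "*1.1" is now materialised as extra rows appended to ~FI_T rather than overwriting the matched rows in place.

import pandas as pd

# Toy stand-in for the ~FI_T table; columns and values are illustrative only.
fi_t = pd.DataFrame(
    {"process": ["P1", "P2"], "attribute": ["COST", "COST"], "value": [10.0, 20.0]}
)

# Previous behaviour: the matched rows were overwritten in place.
inplace = fi_t.copy()
inplace.loc[inplace["process"] == "P1", "value"] *= 1.1

# Behaviour after this commit: an updated copy of the matched rows is appended,
# so the original rows are kept and the update lands in new rows.
matched = fi_t[fi_t["process"] == "P1"].copy()
matched["value"] *= 1.1
appended = pd.concat([fi_t, matched], ignore_index=True)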
siddharth-krishna authored Jan 31, 2024
1 parent dbddd67 commit 6a9aa98
Showing 2 changed files with 150 additions and 134 deletions.
2 changes: 1 addition & 1 deletion xl2times/__main__.py
@@ -70,7 +70,7 @@ def convert_xl_to_times(
transforms.generate_dummy_processes,
transforms.process_time_slices,
transforms.process_transform_insert_variants,
transforms.process_transform_insert,
transforms.process_transform_tables,
transforms.process_processes,
transforms.process_topology,
transforms.process_flexible_import_tables, # slow
282 changes: 149 additions & 133 deletions xl2times/transforms.py
@@ -744,7 +744,6 @@ def process_units(
tables: Dict[str, DataFrame],
model: datatypes.TimesModel,
) -> Dict[str, DataFrame]:

units_map = {
"activity": model.processes["tact"].unique(),
"capacity": model.processes["tcap"].unique(),
@@ -764,7 +763,6 @@ def process_time_periods(
tables: List[datatypes.EmbeddedXlTable],
model: datatypes.TimesModel,
) -> List[datatypes.EmbeddedXlTable]:

model.start_year = utils.get_scalar(datatypes.Tag.start_year, tables)
active_pdef = utils.get_scalar(datatypes.Tag.active_p_def, tables)
df = utils.single_table(tables, datatypes.Tag.time_periods).dataframe.copy()
@@ -819,20 +817,19 @@ def complete_dictionary(
tables: Dict[str, DataFrame],
model: datatypes.TimesModel,
) -> Dict[str, DataFrame]:

for k, v in {
"AllRegions": model.all_regions,
"Regions": model.internal_regions,
"DataYears": model.data_years,
"PastYears": model.past_years,
"ModelYears": model.model_years,
}.items():
for k, v in [
("AllRegions", model.all_regions),
("Regions", model.internal_regions),
("DataYears", model.data_years),
("PastYears", model.past_years),
("ModelYears", model.model_years),
]:
if "region" in k.lower():
column_list = ["region"]
else:
column_list = ["year"]

tables[k] = pd.DataFrame(v, columns=column_list)
tables[k] = pd.DataFrame(sorted(v), columns=column_list)

# Dataframes
for k, v in {
Expand Down Expand Up @@ -1565,8 +1562,7 @@ def is_year(col_name):
return result


# TODO: should we rename this to something more general, since it takes care of more than tfm_ins?
def process_transform_insert(
def process_transform_tables(
config: datatypes.Config,
tables: List[datatypes.EmbeddedXlTable],
model: datatypes.TimesModel,
@@ -1578,6 +1574,7 @@ def process_transform_insert(
datatypes.Tag.tfm_dins,
datatypes.Tag.tfm_topins,
datatypes.Tag.tfm_upd,
datatypes.Tag.tfm_mig,
datatypes.Tag.tfm_comgrp,
]

@@ -1591,12 +1588,18 @@
datatypes.Tag.tfm_ins,
datatypes.Tag.tfm_ins_txt,
datatypes.Tag.tfm_upd,
datatypes.Tag.tfm_mig,
datatypes.Tag.tfm_comgrp,
]:
df = table.dataframe.copy()

# Standardize column names
known_columns = config.known_columns[datatypes.Tag.tfm_ins] | query_columns
if table.tag == datatypes.Tag.tfm_mig:
# Also allow attribute2, year2 etc for TFM_MIG tables
known_columns.update(
(c + "2" for c in config.known_columns[datatypes.Tag.tfm_ins])
)
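# Illustrative note (not part of the commit): this widens the accepted column
# set with "2"-suffixed variants, so that if known_columns contains e.g.
# "attribute" and "year", a TFM_MIG table may also use "attribute2" and "year2".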

# Handle Regions:
if set(df.columns).isdisjoint(
@@ -1662,7 +1665,7 @@
]
for key, group in by_tag:
print(
f"WARNING: Dropped {len(group)} transform insert tables ({key})"
f"WARNING: Dropped {len(group)} transform tables ({key})"
f" rather than processing them"
)

@@ -1860,126 +1863,139 @@ def process_wildcards(
tables: Dict[str, DataFrame],
model: datatypes.TimesModel,
) -> Dict[str, DataFrame]:
dictionary = generate_topology_dictionary(tables, model)

# TODO separate this code into expanding wildcards and updating/inserting data!
for tag in [
datatypes.Tag.tfm_upd,
datatypes.Tag.tfm_ins,
datatypes.Tag.tfm_ins_txt,
]:
if tag in tables:
start_time = time.time()
upd = tables[tag]
new_rows = []
# reset index to make sure there are no duplicates
tables[datatypes.Tag.fi_t].reset_index(drop=True, inplace=True)
if tag == datatypes.Tag.tfm_upd:
# copy old index to new column 'index'
tables[datatypes.Tag.fi_t].reset_index(inplace=True)
for i in range(0, len(upd)):
row = upd.iloc[i]
debug = False
if debug:
print(row)
matching_processes = get_matching_processes(row, dictionary)
if matching_processes is not None and len(matching_processes) == 0:
print(f"WARNING: {tag} row matched no processes")
continue
matching_commodities = get_matching_commodities(row, dictionary)
if matching_commodities is not None and len(matching_commodities) == 0:
print(f"WARNING: {tag} row matched no commodities")
continue
df = tables[datatypes.Tag.fi_t]
if any(df.index.duplicated()):
raise ValueError("~FI_T table has duplicated indices")
if tag == datatypes.Tag.tfm_upd:
# construct query into ~FI_T to get indices of matching rows
if matching_processes is not None:
df = df.merge(matching_processes, on="process")
if debug:
print(f"{len(df)} rows after processes")
if any(df["index"].duplicated()):
raise ValueError("~FI_T table has duplicated indices")
if matching_commodities is not None:
df = df.merge(matching_commodities)
if debug:
print(f"{len(df)} rows after commodities")
if any(df["index"].duplicated()):
raise ValueError("~FI_T table has duplicated indices")
attribute = row.attribute
if attribute is not None:
df = df.query("attribute == @attribute")
if debug:
print(f"{len(df)} rows after Attribute")
if any(df["index"].duplicated()):
raise ValueError("~FI_T table has duplicated indices")
region = row.region
if region is not None:
df = df.query("region == @region")
if debug:
print(f"{len(df)} rows after Region")
if any(df["index"].duplicated()):
raise ValueError("~FI_T table has duplicated indices")
# so that we can update the original table, copy original index back that was lost when merging
df = df.set_index("index")
# for speed, extract just the VALUE column as that is the only one being updated
df = df[["value"]]
if debug:
if any(df.index.duplicated()):
raise ValueError("~FI_T table has duplicated indices")
if isinstance(row.value, str) and row.value[0] in {
"*",
"+",
"-",
"/",
}:
df = df.astype({"value": float}).eval("value=value" + row.value)
else:
df["value"] = [row.value] * len(df)
if len(df) == 0:
print(f"WARNING: {tag} row matched nothing")
tables[datatypes.Tag.fi_t].update(df)
elif tag == datatypes.Tag.tfm_ins_txt:
# This row matches either a commodity or a process
assert not (
matching_commodities is not None
and matching_processes is not None
)
if matching_commodities is not None:
df = model.commodities
query_str = f"commodity in [{','.join(map(repr, matching_commodities['commodity']))}] and region == '{row['region']}'"
elif matching_processes is not None:
df = model.processes
query_str = f"process in [{','.join(map(repr, matching_processes['process']))}] and region == '{row['region']}'"
else:
print(
f"WARNING: {tag} row matched neither commodity nor process"
)
continue
# Query for rows with matching process/commodity and region
rows_to_update = df.query(query_str).index
# Overwrite (inplace) the column given by the attribute (translated by attr_prop)
# with the value from row
# E.g. if row['attribute'] == 'PRC_TSL' then we overwrite 'tslvl'
df.loc[rows_to_update, attr_prop[row["attribute"]]] = row["value"]
else:
# Construct 1-row data frame for data
# Cross merge with processes and commodities (if they exist)
row = row.filter(df.columns)
row = pd.DataFrame([row])
if matching_processes is not None:
row = matching_processes.merge(row, how="cross")
if matching_commodities is not None:
row = matching_commodities.merge(row, how="cross")
new_rows.append(row)
if tag == datatypes.Tag.tfm_ins:
new_rows.append(df) # pyright: ignore
tables[datatypes.Tag.fi_t] = pd.concat(new_rows, ignore_index=True)

print(
f" process_wildcards: {tag} took {time.time() - start_time:.2f} seconds for {len(upd)} rows"
topology = generate_topology_dictionary(tables, model)

# TODO add type annots to below fns

def match_wildcards(
row: pd.Series,
) -> tuple[DataFrame | None, DataFrame | None] | None:
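# Expand the row's process/commodity wildcard columns against the topology
# dictionary. Returns (matching_processes, matching_commodities), either of
# which may be None, or None overall if the row matched nothing.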
matching_processes = get_matching_processes(row, topology)
matching_commodities = get_matching_commodities(row, topology)
if (matching_processes is None or len(matching_processes) == 0) and (
matching_commodities is None or len(matching_commodities) == 0
): # TODO is this necessary? Try without?
# TODO debug these
print(f"WARNING: a row matched no processes or commodities:\n{row}")
return None
return matching_processes, matching_commodities

def query(table, processes, commodities, attribute, region):
qs = []
if processes is not None and not processes.empty:
qs.append(f"process in [{','.join(map(repr, processes['process']))}]")
if commodities is not None and not commodities.empty:
qs.append(f"commodity in [{','.join(map(repr, commodities['commodity']))}]")
if attribute is not None:
qs.append(f"attribute == '{attribute}'")
if region is not None:
qs.append(f"region == '{region}'")
return table.query(" and ".join(qs)).index
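# Example (illustrative only): with matched processes ["P1", "P2"], no
# commodities, attribute "COST" and region "REG1", the assembled query is
# "process in ['P1','P2'] and attribute == 'COST' and region == 'REG1'",
# and the returned value is the index of the FI_T rows satisfying it.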

def eval_and_update(table, rows_to_update, new_value):
if isinstance(new_value, str) and new_value[0] in {"*", "+", "-", "/"}:
old_values = table.loc[rows_to_update, "value"]
updated = old_values.astype(float).map(lambda x: eval("x" + new_value))
table.loc[rows_to_update, "value"] = updated
else:
table.loc[rows_to_update, "value"] = new_value
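# Example (illustrative only): new_value "*1.05" evaluates "x*1.05" against
# each existing value, i.e. the matched values are scaled by 1.05; a plain
# value such as 42 overwrites them instead.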

def do_an_ins_row(row):
table = tables[datatypes.Tag.fi_t]
match = match_wildcards(row)
# TODO perf: add matched procs/comms into column and use explode?
new_rows = pd.DataFrame([row.filter(table.columns)])
if match is not None:
processes, commodities = match
if processes is not None:
new_rows = processes.merge(new_rows, how="cross")
if commodities is not None:
new_rows = commodities.merge(new_rows, how="cross")
return new_rows
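# Illustrative note (not part of the commit): a TFM_INS row whose wildcards
# match processes ["P1", "P2"] and one commodity ["C1"] is cross-merged into
# two new FI_T rows, one per (process, commodity) pair, each carrying the
# row's remaining column values.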

def do_an_ins_txt_row(row):
match = match_wildcards(row)
if match is None:
print(f"WARNING: TFM_INS-TXT row matched neither commodity nor process")
return
processes, commodities = match
if commodities is not None:
table = model.commodities
elif processes is not None:
table = model.processes
else:
assert False # All rows match either a commodity or a process

# Query for rows with matching process/commodity and region
rows_to_update = query(table, processes, commodities, None, row["region"])
# Overwrite (inplace) the column given by the attribute (translated by attr_prop)
# with the value from row
# E.g. if row['attribute'] == 'PRC_TSL' then we overwrite 'tslvl'
table.loc[rows_to_update, attr_prop[row["attribute"]]] = row["value"]
# return rows_to_update

if datatypes.Tag.tfm_upd in tables:
updates = tables[datatypes.Tag.tfm_upd]
table = tables[datatypes.Tag.fi_t]
new_tables = [table]
# Reset FI_T index so that queries can determine unique rows to update
tables[datatypes.Tag.fi_t].reset_index(inplace=True)

# TODO perf: collect all updates and go through FI_T only once?
for _, row in updates.iterrows():
if row["value"] is None: # TODO is this really needed?
continue
match = match_wildcards(row)
if match is None:
continue
processes, commodities = match
rows_to_update = query(
table, processes, commodities, row["attribute"], row["region"]
)
new_rows = table.loc[rows_to_update].copy()
eval_and_update(new_rows, rows_to_update, row["value"])
new_tables.append(new_rows)

# Add new rows to table
tables[datatypes.Tag.fi_t] = pd.concat(new_tables, ignore_index=True)

if datatypes.Tag.tfm_ins in tables:
updates = tables[datatypes.Tag.tfm_ins]
new_rows = []
for _, row in updates.iterrows():
new_rows.append(do_an_ins_row(row))
new_rows.append(tables[datatypes.Tag.fi_t])
tables[datatypes.Tag.fi_t] = pd.concat(new_rows, ignore_index=True)

if datatypes.Tag.tfm_ins_txt in tables:
updates = tables[datatypes.Tag.tfm_ins_txt]
for _, row in updates.iterrows():
do_an_ins_txt_row(row)

if datatypes.Tag.tfm_mig in tables:
updates = tables[datatypes.Tag.tfm_mig]
table = tables[datatypes.Tag.fi_t]
new_tables = []

for _, row in updates.iterrows():
match = match_wildcards(row)
processes, commodities = match if match is not None else (None, None)
# TODO should we also query on limtype?
rows_to_update = query(
table, processes, commodities, row["attribute"], row["region"]
)
new_rows = table.loc[rows_to_update].copy()
# Modify values in all '*2' columns
for c, v in row.items():
if c.endswith("2") and v is not None:
new_rows.loc[:, c[:-1]] = v
# Evaluate 'value' column based on existing values
eval_and_update(new_rows, rows_to_update, row["value"])
new_tables.append(new_rows)

# Add new rows to table
new_tables.append(tables[datatypes.Tag.fi_t])
tables[datatypes.Tag.fi_t] = pd.concat(new_tables, ignore_index=True)
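# Illustrative note (not part of the commit): a TFM_MIG row with attribute
# "COST", year2 = 2030 and value "*1.2" copies the matching FI_T rows,
# rewrites their 'year' column to 2030, scales their 'value' by 1.2, and
# appends the copies, leaving the original rows untouched.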

return tables
