diff --git a/xl2times/__main__.py b/xl2times/__main__.py
index 64e55c79..bef22721 100644
--- a/xl2times/__main__.py
+++ b/xl2times/__main__.py
@@ -70,7 +70,7 @@ def convert_xl_to_times(
         transforms.generate_dummy_processes,
         transforms.process_time_slices,
         transforms.process_transform_insert_variants,
-        transforms.process_transform_insert,
+        transforms.process_transform_tables,
         transforms.process_processes,
         transforms.process_topology,
         transforms.process_flexible_import_tables,  # slow
diff --git a/xl2times/transforms.py b/xl2times/transforms.py
index 0ebe02f1..cacb1100 100644
--- a/xl2times/transforms.py
+++ b/xl2times/transforms.py
@@ -744,7 +744,6 @@ def process_units(
     tables: Dict[str, DataFrame],
     model: datatypes.TimesModel,
 ) -> Dict[str, DataFrame]:
-
     units_map = {
         "activity": model.processes["tact"].unique(),
         "capacity": model.processes["tcap"].unique(),
@@ -764,7 +763,6 @@ def process_time_periods(
     tables: List[datatypes.EmbeddedXlTable],
     model: datatypes.TimesModel,
 ) -> List[datatypes.EmbeddedXlTable]:
-
     model.start_year = utils.get_scalar(datatypes.Tag.start_year, tables)
     active_pdef = utils.get_scalar(datatypes.Tag.active_p_def, tables)
     df = utils.single_table(tables, datatypes.Tag.time_periods).dataframe.copy()
@@ -819,20 +817,19 @@ def complete_dictionary(
     tables: Dict[str, DataFrame],
     model: datatypes.TimesModel,
 ) -> Dict[str, DataFrame]:
-
-    for k, v in {
-        "AllRegions": model.all_regions,
-        "Regions": model.internal_regions,
-        "DataYears": model.data_years,
-        "PastYears": model.past_years,
-        "ModelYears": model.model_years,
-    }.items():
+    for k, v in [
+        ("AllRegions", model.all_regions),
+        ("Regions", model.internal_regions),
+        ("DataYears", model.data_years),
+        ("PastYears", model.past_years),
+        ("ModelYears", model.model_years),
+    ]:
         if "region" in k.lower():
             column_list = ["region"]
         else:
             column_list = ["year"]
-        tables[k] = pd.DataFrame(v, columns=column_list)
+        tables[k] = pd.DataFrame(sorted(v), columns=column_list)
 
     # Dataframes
     for k, v in {
@@ -1565,8 +1562,7 @@ def is_year(col_name):
     return result
 
 
-# TODO: should we rename this to something more general, since it takes care of more than tfm_ins?
-def process_transform_insert(
+def process_transform_tables(
     config: datatypes.Config,
     tables: List[datatypes.EmbeddedXlTable],
     model: datatypes.TimesModel,
@@ -1578,6 +1574,7 @@ def process_transform_insert(
         datatypes.Tag.tfm_dins,
         datatypes.Tag.tfm_topins,
         datatypes.Tag.tfm_upd,
+        datatypes.Tag.tfm_mig,
         datatypes.Tag.tfm_comgrp,
     ]
 
@@ -1591,12 +1588,18 @@ def process_transform_insert(
             datatypes.Tag.tfm_ins,
             datatypes.Tag.tfm_ins_txt,
             datatypes.Tag.tfm_upd,
+            datatypes.Tag.tfm_mig,
             datatypes.Tag.tfm_comgrp,
         ]:
             df = table.dataframe.copy()
 
             # Standardize column names
             known_columns = config.known_columns[datatypes.Tag.tfm_ins] | query_columns
+            if table.tag == datatypes.Tag.tfm_mig:
+                # Also allow attribute2, year2 etc for TFM_MIG tables
+                known_columns.update(
+                    (c + "2" for c in config.known_columns[datatypes.Tag.tfm_ins])
+                )
 
             # Handle Regions:
             if set(df.columns).isdisjoint(
@@ -1662,7 +1665,7 @@ def process_transform_insert(
         ]
         for key, group in by_tag:
             print(
-                f"WARNING: Dropped {len(group)} transform insert tables ({key})"
+                f"WARNING: Dropped {len(group)} transform tables ({key})"
                 f" rather than processing them"
             )
 
@@ -1860,126 +1863,139 @@ def process_wildcards(
     tables: Dict[str, DataFrame],
     model: datatypes.TimesModel,
 ) -> Dict[str, DataFrame]:
-    dictionary = generate_topology_dictionary(tables, model)
-
-    # TODO separate this code into expading wildcards and updating/inserting data!
-    for tag in [
-        datatypes.Tag.tfm_upd,
-        datatypes.Tag.tfm_ins,
-        datatypes.Tag.tfm_ins_txt,
-    ]:
-        if tag in tables:
-            start_time = time.time()
-            upd = tables[tag]
-            new_rows = []
-            # reset index to make sure there are no duplicates
-            tables[datatypes.Tag.fi_t].reset_index(drop=True, inplace=True)
-            if tag == datatypes.Tag.tfm_upd:
-                # copy old index to new column 'index'
-                tables[datatypes.Tag.fi_t].reset_index(inplace=True)
-            for i in range(0, len(upd)):
-                row = upd.iloc[i]
-                debug = False
-                if debug:
-                    print(row)
-                matching_processes = get_matching_processes(row, dictionary)
-                if matching_processes is not None and len(matching_processes) == 0:
-                    print(f"WARNING: {tag} row matched no processes")
-                    continue
-                matching_commodities = get_matching_commodities(row, dictionary)
-                if matching_commodities is not None and len(matching_commodities) == 0:
-                    print(f"WARNING: {tag} row matched no commodities")
-                    continue
-                df = tables[datatypes.Tag.fi_t]
-                if any(df.index.duplicated()):
-                    raise ValueError("~FI_T table has duplicated indices")
-                if tag == datatypes.Tag.tfm_upd:
-                    # construct query into ~FI_T to get indices of matching rows
-                    if matching_processes is not None:
-                        df = df.merge(matching_processes, on="process")
-                        if debug:
-                            print(f"{len(df)} rows after processes")
-                            if any(df["index"].duplicated()):
-                                raise ValueError("~FI_T table has duplicated indices")
-                    if matching_commodities is not None:
-                        df = df.merge(matching_commodities)
-                        if debug:
-                            print(f"{len(df)} rows after commodities")
-                            if any(df["index"].duplicated()):
-                                raise ValueError("~FI_T table has duplicated indices")
-                    attribute = row.attribute
-                    if attribute is not None:
-                        df = df.query("attribute == @attribute")
-                        if debug:
-                            print(f"{len(df)} rows after Attribute")
-                            if any(df["index"].duplicated()):
-                                raise ValueError("~FI_T table has duplicated indices")
-                    region = row.region
-                    if region is not None:
-                        df = df.query("region == @region")
-                        if debug:
-                            print(f"{len(df)} rows after Region")
-                            if any(df["index"].duplicated()):
-                                raise ValueError("~FI_T table has duplicated indices")
-                    # so that we can update the original table, copy
-                    # original index back that was lost when merging
-                    df = df.set_index("index")
-                    # for speed, extract just the VALUE column as that is the only one being updated
-                    df = df[["value"]]
-                    if debug:
-                        if any(df.index.duplicated()):
-                            raise ValueError("~FI_T table has duplicated indices")
-                    if isinstance(row.value, str) and row.value[0] in {
-                        "*",
-                        "+",
-                        "-",
-                        "/",
-                    }:
-                        df = df.astype({"value": float}).eval("value=value" + row.value)
-                    else:
-                        df["value"] = [row.value] * len(df)
-                    if len(df) == 0:
-                        print(f"WARNING: {tag} row matched nothing")
-                    tables[datatypes.Tag.fi_t].update(df)
-                elif tag == datatypes.Tag.tfm_ins_txt:
-                    # This row matches either a commodity or a process
-                    assert not (
-                        matching_commodities is not None
-                        and matching_processes is not None
-                    )
-                    if matching_commodities is not None:
-                        df = model.commodities
-                        query_str = f"commodity in [{','.join(map(repr, matching_commodities['commodity']))}] and region == '{row['region']}'"
-                    elif matching_processes is not None:
-                        df = model.processes
-                        query_str = f"process in [{','.join(map(repr, matching_processes['process']))}] and region == '{row['region']}'"
-                    else:
-                        print(
-                            f"WARNING: {tag} row matched neither commodity nor process"
-                        )
-                        continue
-                    # Query for rows with matching process/commodity and region
-                    rows_to_update = df.query(query_str).index
-                    # Overwrite (inplace) the column given by the attribute (translated by attr_prop)
-                    # with the value from row
-                    # E.g. if row['attribute'] == 'PRC_TSL' then we overwrite 'tslvl'
-                    df.loc[rows_to_update, attr_prop[row["attribute"]]] = row["value"]
-                else:
-                    # Construct 1-row data frame for data
-                    # Cross merge with processes and commodities (if they exist)
-                    row = row.filter(df.columns)
-                    row = pd.DataFrame([row])
-                    if matching_processes is not None:
-                        row = matching_processes.merge(row, how="cross")
-                    if matching_commodities is not None:
-                        row = matching_commodities.merge(row, how="cross")
-                    new_rows.append(row)
-            if tag == datatypes.Tag.tfm_ins:
-                new_rows.append(df)  # pyright: ignore
-                tables[datatypes.Tag.fi_t] = pd.concat(new_rows, ignore_index=True)
-
-            print(
-                f"  process_wildcards: {tag} took {time.time() - start_time:.2f} seconds for {len(upd)} rows"
-            )
+    topology = generate_topology_dictionary(tables, model)
+
+    # TODO: add type annotations to the functions below
+
+    def match_wildcards(
+        row: pd.Series,
+    ) -> tuple[DataFrame | None, DataFrame | None] | None:
+        matching_processes = get_matching_processes(row, topology)
+        matching_commodities = get_matching_commodities(row, topology)
+        if (matching_processes is None or len(matching_processes) == 0) and (
+            matching_commodities is None or len(matching_commodities) == 0
+        ):  # TODO is this necessary? Try without?
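+            # get_matching_processes/get_matching_commodities return None when the
+            # row has no process/commodity filter columns, and an empty DataFrame
+            # when its filters matched nothing.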
+            # TODO debug these
+            print(f"WARNING: a row matched no processes or commodities:\n{row}")
+            return None
+        return matching_processes, matching_commodities
+
+    def query(table, processes, commodities, attribute, region):
+        qs = []
+        if processes is not None and not processes.empty:
+            qs.append(f"process in [{','.join(map(repr, processes['process']))}]")
+        if commodities is not None and not commodities.empty:
+            qs.append(f"commodity in [{','.join(map(repr, commodities['commodity']))}]")
+        if attribute is not None:
+            qs.append(f"attribute == '{attribute}'")
+        if region is not None:
+            qs.append(f"region == '{region}'")
+        return table.query(" and ".join(qs)).index
+
+    def eval_and_update(table, rows_to_update, new_value):
+        if isinstance(new_value, str) and new_value[0] in {"*", "+", "-", "/"}:
+            old_values = table.loc[rows_to_update, "value"]
+            updated = old_values.astype(float).map(lambda x: eval("x" + new_value))
+            table.loc[rows_to_update, "value"] = updated
+        else:
+            table.loc[rows_to_update, "value"] = new_value
+
+    def do_an_ins_row(row):
+        table = tables[datatypes.Tag.fi_t]
+        match = match_wildcards(row)
+        # TODO perf: add matched procs/comms into column and use explode?
+        new_rows = pd.DataFrame([row.filter(table.columns)])
+        if match is not None:
+            processes, commodities = match
+            if processes is not None:
+                new_rows = processes.merge(new_rows, how="cross")
+            if commodities is not None:
+                new_rows = commodities.merge(new_rows, how="cross")
+        return new_rows
+
+    def do_an_ins_txt_row(row):
+        match = match_wildcards(row)
+        if match is None:
+            print("WARNING: TFM_INS-TXT row matched neither commodity nor process")
+            return
+        processes, commodities = match
+        if commodities is not None:
+            table = model.commodities
+        elif processes is not None:
+            table = model.processes
+        else:
+            assert False  # All rows match either a commodity or a process
+
+        # Query for rows with matching process/commodity and region
+        rows_to_update = query(table, processes, commodities, None, row["region"])
+        # Overwrite (inplace) the column given by the attribute (translated by attr_prop)
+        # with the value from row
+        # E.g. if row['attribute'] == 'PRC_TSL' then we overwrite 'tslvl'
+        table.loc[rows_to_update, attr_prop[row["attribute"]]] = row["value"]
+        # return rows_to_update
+
+    if datatypes.Tag.tfm_upd in tables:
+        updates = tables[datatypes.Tag.tfm_upd]
+        table = tables[datatypes.Tag.fi_t]
+        new_tables = [table]
+        # Reset FI_T index so that queries can determine unique rows to update
+        tables[datatypes.Tag.fi_t].reset_index(inplace=True)
+
+        # TODO perf: collect all updates and go through FI_T only once?
+        for _, row in updates.iterrows():
+            if row["value"] is None:  # TODO is this really needed?
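+                # Nothing to apply if the update row has no VALUE.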
+                continue
+            match = match_wildcards(row)
+            if match is None:
+                continue
+            processes, commodities = match
+            rows_to_update = query(
+                table, processes, commodities, row["attribute"], row["region"]
+            )
+            new_rows = table.loc[rows_to_update].copy()
+            eval_and_update(new_rows, rows_to_update, row["value"])
+            new_tables.append(new_rows)
+
+        # Add new rows to table
+        tables[datatypes.Tag.fi_t] = pd.concat(new_tables, ignore_index=True)
+
+    if datatypes.Tag.tfm_ins in tables:
+        updates = tables[datatypes.Tag.tfm_ins]
+        new_rows = []
+        for _, row in updates.iterrows():
+            new_rows.append(do_an_ins_row(row))
+        new_rows.append(tables[datatypes.Tag.fi_t])
+        tables[datatypes.Tag.fi_t] = pd.concat(new_rows, ignore_index=True)
+
+    if datatypes.Tag.tfm_ins_txt in tables:
+        updates = tables[datatypes.Tag.tfm_ins_txt]
+        for _, row in updates.iterrows():
+            do_an_ins_txt_row(row)
+
+    if datatypes.Tag.tfm_mig in tables:
+        updates = tables[datatypes.Tag.tfm_mig]
+        table = tables[datatypes.Tag.fi_t]
+        new_tables = []
+
+        for _, row in updates.iterrows():
+            match = match_wildcards(row)
+            processes, commodities = match if match is not None else (None, None)
+            # TODO should we also query on limtype?
+            rows_to_update = query(
+                table, processes, commodities, row["attribute"], row["region"]
+            )
+            new_rows = table.loc[rows_to_update].copy()
+            # Modify values in all '*2' columns
+            for c, v in row.items():
+                if c.endswith("2") and v is not None:
+                    new_rows.loc[:, c[:-1]] = v
+            # Evaluate 'value' column based on existing values
+            eval_and_update(new_rows, rows_to_update, row["value"])
+            new_tables.append(new_rows)
+
+        # Add new rows to table
+        new_tables.append(tables[datatypes.Tag.fi_t])
+        tables[datatypes.Tag.fi_t] = pd.concat(new_tables, ignore_index=True)
 
     return tables
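Note on the update mini-language: eval_and_update (added above) treats a string VALUE beginning with *, +, - or / as an arithmetic adjustment of each matched row's existing value, and any other VALUE as an overwrite. A minimal runnable sketch with toy data (the NCAP_COST rows and column set are illustrative only, not the real ~FI_T layout):

    import pandas as pd

    def eval_and_update(table, rows_to_update, new_value):
        # Strings like "*1.05" or "+2" are applied to the existing values;
        # any other value simply replaces them.
        if isinstance(new_value, str) and new_value[0] in {"*", "+", "-", "/"}:
            old_values = table.loc[rows_to_update, "value"]
            table.loc[rows_to_update, "value"] = old_values.astype(float).map(
                lambda x: eval("x" + new_value)
            )
        else:
            table.loc[rows_to_update, "value"] = new_value

    fi_t = pd.DataFrame({"attribute": ["NCAP_COST", "NCAP_COST"], "value": [10.0, 20.0]})
    eval_and_update(fi_t, fi_t.index, "*1.05")  # values become 10.5, 21.0
    eval_and_update(fi_t, fi_t.index, 42)       # values become 42, 42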