From a57b045bf2238b9430dfc4234f55a82328fb7d15 Mon Sep 17 00:00:00 2001 From: Olexandr Balyk Date: Fri, 15 Nov 2024 06:36:50 -0600 Subject: [PATCH] Refactor apply_final_fixup (#243) Mostly get rid of the for loops to speed things up. --- xl2times/transforms.py | 151 ++++++++++++++++++++++++----------------- 1 file changed, 87 insertions(+), 64 deletions(-) diff --git a/xl2times/transforms.py b/xl2times/transforms.py index f9e1328..2a42788 100644 --- a/xl2times/transforms.py +++ b/xl2times/transforms.py @@ -2877,86 +2877,109 @@ def apply_final_fixup( model: TimesModel, ) -> dict[str, DataFrame]: - veda_process_sets = tables["VedaProcessSets"] + veda_process_sets = tables["VedaProcessSets"][["sets", "process"]] reg_com_flows = tables["ProcessTopology"].drop(columns="io") + reg_com_flows.drop_duplicates(inplace=True, ignore_index=True) df = tables[Tag.fi_t] # Fill other_indexes for COST cost_mapping = {"MIN": "IMP", "EXP": "EXP", "IMP": "IMP"} - i = (df["attribute"] == "COST") & df["process"].notna() - if any(i): - for process in df[i]["process"].unique(): - veda_process_set = ( - veda_process_sets["sets"] - .loc[veda_process_sets["process"] == process] - .unique() + cost_index = (df["attribute"] == "COST") & df["process"].notna() + + if any(cost_index): + processes = set(df[cost_index]["process"].unique()) + # Index of IRE processes and their IRE sets specification + sets_index = veda_process_sets["process"].isin(processes) & veda_process_sets[ + "sets" + ].isin(cost_mapping.keys()) + + ire_processes = set(veda_process_sets["process"][sets_index].unique()) + other_processes = processes - ire_processes + + if other_processes: + logger.warning( + f"COST won't be processed as IRE_PRICE for {other_processes}, because they are not in IMP/EXP/MIN" + ) + + if any(ire_processes): + # Ensure only one IRE set is specified per process + subst_df = veda_process_sets[sets_index].drop_duplicates( + subset="process", keep="last" + ) + index = cost_index & df["process"].isin(ire_processes) + df.loc[index, "other_indexes"] = df.loc[index, "process"].replace( + subst_df.set_index("process")["sets"].replace(cost_mapping).to_dict() ) - if veda_process_set.shape[0]: - df.loc[i & (df["process"] == process), "other_indexes"] = cost_mapping[ - veda_process_set[0] - ] - else: - logger.warning( - f"COST won't be processed as IRE_PRICE for {process}, because it is not in IMP/EXP/MIN" - ) # Use CommName to store the active commodity for EXP / IMP - i = df["attribute"].isin({"COST", "IRE_PRICE"}) - if any(i): - i_exp = i & (df["other_indexes"] == "EXP") + index = df["attribute"].isin({"COST", "IRE_PRICE"}) + if any(index): + i_exp = index & (df["other_indexes"] == "EXP") df.loc[i_exp, "commodity"] = df.loc[i_exp, "commodity-in"] - i_imp = i & (df["other_indexes"] == "IMP") + i_imp = index & (df["other_indexes"] == "IMP") df.loc[i_imp, "commodity"] = df.loc[i_imp, "commodity-out"] # Fill CommName for COST (alias of IRE_PRICE) if missing - i = (df["attribute"] == "COST") & df["commodity"].isna() - if any(i): - df.loc[i, "commodity"] = df[i].apply( - lambda row: ",".join( - reg_com_flows.loc[ - (reg_com_flows["region"] == row["region"]) - & (reg_com_flows["process"] == row["process"]), - "commodity", - ].unique() - ), - axis=1, + i_com_na = (df["attribute"] == "COST") & df["commodity"].isna() + if any(i_com_na): + comm_rp = reg_com_flows.groupby(["region", "process"]).agg(set) + comm_rp["commodity"] = comm_rp["commodity"].str.join(",") + df.set_index(["region", "process"], inplace=True) + i_cost = df["attribute"] == "COST" + df.loc[i_cost, "commodity"] = df["commodity"][i_cost].fillna( + comm_rp["commodity"].to_dict() ) + df.reset_index(inplace=True) # Handle STOCK specified for a single year - i = (df["attribute"] == "STOCK") & df["process"].notna() - # Temporary solution to include only processes defined in BASE - i_vt = i & (df["source_filename"].str.contains("VT_", case=False)) - if any(i): - extra_rows = [] - for region in df[i]["region"].unique(): - i_reg = i & (df["region"] == region) - for process in df[(i_reg & i_vt)]["process"].unique(): - i_reg_prc = i_reg & (df["process"] == process) - if any(i_reg_prc): - extra_rows.append(["NCAP_BND", region, process, "UP", 0, 2]) - # TODO: TIMES already handles this. Drop? - if len(df[i_reg_prc]["year"].unique()) == 1: - year = df[i_reg_prc]["year"].unique()[0] - i_attr = ( - df["attribute"].isin({"NCAP_TLIFE", "LIFE"}) - & (df["region"] == region) - & (df["process"] == process) - ) - if any(i_attr): - lifetime = df[i_attr]["value"].unique()[-1] - else: - lifetime = 30 - extra_rows.append( - ["STOCK", region, process, "", year + lifetime, 0] - ) - if len(extra_rows) > 0: - cols = ["attribute", "region", "process", "limtype", "year", "value"] - df = pd.concat( - [ - df, - pd.DataFrame(extra_rows, columns=cols), - ] + stock_index = (df["attribute"] == "STOCK") & df["process"].notna() + if any(stock_index): + # Temporary solution to include only processes defined in BASE + i_vt = stock_index & (df["source_filename"].str.contains("VT_", case=False)) + # Create (region, process) index for data defined in vt + i_df_rp_vt = df[i_vt].set_index(["region", "process"]).index.drop_duplicates() + # Create extra rows with NCAP_BND + ncap_bnd_data = { + "attribute": "NCAP_BND", + "limtype": "UP", + "year": 0, + "value": 2, + } + ncap_bnd_rows = pd.DataFrame(ncap_bnd_data, index=i_df_rp_vt).reset_index() + # Create df list to concatenate later on + df_list = [df, ncap_bnd_rows] + # Stock indexed by process/region + cols = ["region", "process", "year"] + df_rp = ( + df[stock_index] + .drop_duplicates(subset=cols, keep="last") + .set_index(["region", "process"]) + ) + # Index of region/process with STOCK specified only once + i_single_stock = ~df_rp.index.duplicated(keep=False) + + # TODO: TIMES already handles this. Drop? + if any(i_single_stock): + default_life = 30 + life_rp = ( + df[df["attribute"].isin({"NCAP_TLIFE", "LIFE"})] + .drop_duplicates(subset=["region", "process"], keep="last") + .set_index(["region", "process"])["value"] + ) + stock_rows = df_rp[["attribute", "year"]][i_single_stock].copy() + stock_rows = stock_rows.merge( + life_rp, how="left", left_index=True, right_index=True ) + # Use default if lifetime not specified + stock_rows.loc[stock_rows["value"].isna(), "value"] = default_life + # Calculate the year in which STOCK is zero + stock_rows["year"] = stock_rows["year"] + stock_rows["value"] + # Specify stock value zero + stock_rows["value"] = 0 + stock_rows.reset_index(inplace=True) + df_list.append(stock_rows) + + df = pd.concat(df_list) tables[Tag.fi_t] = df