diff --git a/message_ix_models/model/water/cli.py b/message_ix_models/model/water/cli.py index 63fbe8b68a..5c60207423 100644 --- a/message_ix_models/model/water/cli.py +++ b/message_ix_models/model/water/cli.py @@ -263,7 +263,7 @@ def cooling(context, regions, rcps, rels): help="Default running legacy and water (full) otherwise only water, if specified", ) @common_params("output_model") -def report_cli(context, output_model, sdgs, water=False): +def report_cli(context: "Context", output_model, sdgs, water=False): """function to run the water report_full from cli to the scenario defined by the user with --url @@ -276,7 +276,7 @@ def report_cli(context, output_model, sdgs, water=False): SDG : Str Defines if and what water SDG measures are activated """ - reg = context.regions + reg = context.model.regions sc = context.get_scenario() if water: from message_ix_models.model.water.reporting import report diff --git a/message_ix_models/model/water/data/infrastructure.py b/message_ix_models/model/water/data/infrastructure.py index 68e87752a7..debf65ed01 100644 --- a/message_ix_models/model/water/data/infrastructure.py +++ b/message_ix_models/model/water/data/infrastructure.py @@ -20,66 +20,17 @@ from message_ix_models import Context -def add_infrastructure_techs(context: "Context"): - """Process water distribution data for a scenario instance. - Parameters - ---------- - context : .Context - Returns - ------- - data : dict of (str -> pandas.DataFrame) - Keys are MESSAGE parameter names such as 'input', 'fix_cost'. - Values are data frames ready for :meth:`~.Scenario.add_par`. - Years in the data include the model horizon indicated by - ``context["water build info"]``, plus the additional year 2010. - """ - # TODO reduce complexity of this function from 18 to 15 or less - # Reference to the water configuration - info = context["water build info"] - - # define an empty dictionary - results = {} - sub_time = context.time - # load the scenario from context - scen = context.get_scenario() - - year_wat = (2010, 2015, *info.Y) - - # first activity year for all water technologies is 2020 - first_year = scen.firstmodelyear - - # reading basin_delineation - FILE2 = f"basins_by_region_simpl_{context.regions}.csv" - PATH = package_data_path("water", "delineation", FILE2) - - df_node = pd.read_csv(PATH) - # Assigning proper nomenclature - df_node["node"] = "B" + df_node["BCU_name"].astype(str) - df_node["mode"] = "M" + df_node["BCU_name"].astype(str) - if context.type_reg == "country": - df_node["region"] = context.map_ISO_c[context.regions] - else: - df_node["region"] = f"{context.regions}_" + df_node["REGION"].astype(str) - - # Reading water distribution mapping from csv - path = package_data_path("water", "infrastructure", "water_distribution.xlsx") - df = pd.read_excel(path) - - techs = [ - "urban_t_d", - "urban_unconnected", - "industry_unconnected", - "rural_t_d", - "rural_unconnected", - ] - - df_non_elec = df[df["incmd"] != "electr"].reset_index() - df_dist = df_non_elec[df_non_elec["tec"].isin(techs)] - df_non_elec = df_non_elec[~df_non_elec["tec"].isin(techs)] - df_elec = df[df["incmd"] == "electr"].reset_index() - +def start_creating_input_dataframe( + sdg: str, + df_node: pd.DataFrame, + df_non_elec: pd.DataFrame, + df_dist: pd.DataFrame, + year_wat: tuple, + first_year: int, + sub_time, +) -> pd.DataFrame: + """Creates an input pd.DataFrame and adds some data to it.""" inp_df = pd.DataFrame([]) - # Input Dataframe for non elec commodities for index, rows in df_non_elec.iterrows(): inp_df = pd.concat( @@ -108,10 +59,9 @@ def add_infrastructure_techs(context: "Context"): ), ] ) - - if context.SDG != "baseline": + if sdg != "baseline": for index, rows in df_dist.iterrows(): - inp_df = pd.concat( + return pd.concat( [ inp_df, ( @@ -166,7 +116,7 @@ def add_infrastructure_techs(context: "Context"): ] ) - inp_df = inp_df.append( + return inp_df.append( ( make_df( "input", @@ -189,6 +139,77 @@ def add_infrastructure_techs(context: "Context"): .pipe(same_time) ) ) + + +def add_infrastructure_techs(context: "Context"): + """Process water distribution data for a scenario instance. + Parameters + ---------- + context : .Context + Returns + ------- + data : dict of (str -> pandas.DataFrame) + Keys are MESSAGE parameter names such as 'input', 'fix_cost'. + Values are data frames ready for :meth:`~.Scenario.add_par`. + Years in the data include the model horizon indicated by + ``context["water build info"]``, plus the additional year 2010. + """ + # TODO reduce complexity of this function from 18 to 15 or less + # Reference to the water configuration + info = context["water build info"] + + # define an empty dictionary + results = {} + sub_time = context.time + # load the scenario from context + scen = context.get_scenario() + + year_wat = (2010, 2015, *info.Y) + + # first activity year for all water technologies is 2020 + first_year = scen.firstmodelyear + + # reading basin_delineation + FILE2 = f"basins_by_region_simpl_{context.regions}.csv" + PATH = package_data_path("water", "delineation", FILE2) + + df_node = pd.read_csv(PATH) + # Assigning proper nomenclature + df_node["node"] = "B" + df_node["BCU_name"].astype(str) + df_node["mode"] = "M" + df_node["BCU_name"].astype(str) + df_node["region"] = ( + context.map_ISO_c[context.regions] + if context.type_reg == "country" + else f"{context.regions}_" + df_node["REGION"].astype(str) + ) + + # Reading water distribution mapping from csv + path = package_data_path("water", "infrastructure", "water_distribution.xlsx") + df = pd.read_excel(path) + + techs = [ + "urban_t_d", + "urban_unconnected", + "industry_unconnected", + "rural_t_d", + "rural_unconnected", + ] + + df_non_elec = df[df["incmd"] != "electr"].reset_index() + df_dist = df_non_elec[df_non_elec["tec"].isin(techs)] + df_non_elec = df_non_elec[~df_non_elec["tec"].isin(techs)] + df_elec = df[df["incmd"] == "electr"].reset_index() + + inp_df = start_creating_input_dataframe( + sdg=context.SDG, + df_node=df_node, + df_non_elec=df_non_elec, + df_dist=df_dist, + year_wat=year_wat, + first_year=first_year, + sub_time=sub_time, + ) + result_dc = defaultdict(list) for index, rows in df_elec.iterrows(): diff --git a/message_ix_models/model/water/data/irrigation.py b/message_ix_models/model/water/data/irrigation.py index d8cf67259b..92f6442d9a 100644 --- a/message_ix_models/model/water/data/irrigation.py +++ b/message_ix_models/model/water/data/irrigation.py @@ -36,10 +36,11 @@ def add_irr_structure(context: "Context"): # Assigning proper nomenclature df_node["node"] = "B" + df_node["BCU_name"].astype(str) df_node["mode"] = "M" + df_node["BCU_name"].astype(str) - if context.type_reg == "country": - df_node["region"] = context.map_ISO_c[context.regions] - else: - df_node["region"] = f"{context.regions}_" + df_node["REGION"].astype(str) + df_node["region"] = ( + context.map_ISO_c[context.regions] + if context.type_reg == "country" + else f"{context.regions}_" + df_node["REGION"].astype(str) + ) # Reference to the water configuration info = context["water build info"] @@ -62,6 +63,7 @@ def add_irr_structure(context: "Context"): node_loc=df_node["region"], ).pipe(broadcast, year_vtg=info.Y) + # FIXME pd.DataFrames don't have append(), please choose another way! inp = inp.append( make_df( "input", diff --git a/message_ix_models/model/water/data/water_for_ppl.py b/message_ix_models/model/water/data/water_for_ppl.py index e1350f2f7b..2436182211 100644 --- a/message_ix_models/model/water/data/water_for_ppl.py +++ b/message_ix_models/model/water/data/water_for_ppl.py @@ -18,6 +18,151 @@ from message_ix_models import Context +def missing_tech(x: pd.Series) -> pd.Series: + """Assign values to missing data. + It goes through the input data frame and extract the technologies which + don't have input values and then assign manual values to those technologies + along with assigning them an arbitrary level i.e dummy supply + """ + data_dic = { + "geo_hpl": 1 / 0.850, + "geo_ppl": 1 / 0.385, + "nuc_hc": 1 / 0.326, + "nuc_lc": 1 / 0.326, + "solar_th_ppl": 1 / 0.385, + } + + if data_dic.get(x["technology"]): + if x["level"] == "cooling": + return pd.Series((data_dic.get(x["technology"]), "dummy_supply")) + else: + return pd.Series((data_dic.get(x["technology"]), x["level"])) + else: + return pd.Series((x["value"], x["level"])) + + +def cooling_fr(x: pd.Series) -> float: + """Calculate cooling fraction + Returns the calculated cooling fraction after for two categories; + 1. Technologies that produce heat as an output + cooling_fraction(h_cool) = input value(hi) - 1 + Simply subtract 1 from the heating value since the rest of the part is already + accounted in the heating value + 2. Rest of technologies + h_cool = hi -Hi* h_fg - 1, + where: + h_fg (flue gasses losses) = 0.1 (10% assumed losses) + """ + if "hpl" in x["index"]: + return x["value"] - 1 + else: + return x["value"] - (x["value"] * 0.1) - 1 + + +def shares( + x: pd.Series, + context: "Context", + search_cols_cooling_fraction: list, + hold_df: pd.DataFrame, + search_cols: list, +) -> pd.Series: + """Process share and cooling fraction. + Returns + ------- + Product of value of shares of cooling technology types of regions with + corresponding cooling fraction + """ + for col in search_cols_cooling_fraction: + # MAPPING ISOCODE to region name, assume one country only + col2 = context.map_ISO_c[col] if context.type_reg == "country" else col + cooling_fraction = hold_df[ + (hold_df["node_loc"] == col2) + & (hold_df["technology_name"] == x["technology"]) + ]["cooling_fraction"] + x[col] = x[col] * cooling_fraction + + results: list[Any] = [] + for i in x: + if isinstance(i, str): + results.append(i) + else: + if not len(i): + return pd.Series( + [i for i in range(len(search_cols) - 1)] + ["delme"], + index=search_cols, + ) + else: + results.append(float(i)) + return pd.Series(results, index=search_cols) + + +def hist_act(x: pd.Series, context: "Context", hold_cost: pd.DataFrame) -> list: + """Calculate historical activity of cooling technology. + The data for shares is read from ``cooltech_cost_and_shares_ssp_msg.csv`` + Returns + ------- + hist_activity(cooling_tech) = hist_activitiy(parent_technology) * share + *cooling_fraction + """ + tech_df = hold_cost[ + hold_cost["technology"].str.startswith(x.technology) + ] # [x.node_loc] + + node_search = context.regions if context.type_reg == "country" else x["node_loc"] + + node_loc = x["node_loc"] + technology = x["technology"] + cooling_technologies = list(tech_df["technology"]) + new_values = tech_df[node_search] * x.value + + return [ + [ + node_loc, + technology, + cooling_technology, + x.year_act, + x.value, + new_value, + x.unit, + ] + for new_value, cooling_technology in zip(new_values, cooling_technologies) + ] + + +def hist_cap(x: pd.Series, context: "Context", hold_cost: pd.DataFrame) -> list: + """Calculate historical capacity of cooling technology. + The data for shares is read from ``cooltech_cost_and_shares_ssp_msg.csv`` + Returns + ------- + hist_new_capacity(cooling_tech) = historical_new_capacity(parent_technology)* + share * cooling_fraction + """ + tech_df = hold_cost[ + hold_cost["technology"].str.startswith(x.technology) + ] # [x.node_loc] + if context.type_reg == "country": + node_search = context.regions + else: + node_search = x["node_loc"] # R11_EEU + node_loc = x["node_loc"] + technology = x["technology"] + cooling_technologies = list(tech_df["technology"]) + new_values = tech_df[node_search] * x.value + + return [ + [ + node_loc, + technology, + cooling_technology, + x.year_vtg, + x.value, + new_value, + x.unit, + ] + for new_value, cooling_technology in zip(new_values, cooling_technologies) + ] + + # water & electricity for cooling technologies def cool_tech(context: "Context"): """Process cooling technology data for a scenario instance. @@ -27,9 +172,11 @@ def cool_tech(context: "Context"): It adds cooling technologies as addons to the parent technologies.The nomenclature for cooling technology is __. E.g: `coal_ppl__ot_fresh` + Parameters ---------- context : .Context + Returns ------- data : dict of (str -> pandas.DataFrame) @@ -38,7 +185,6 @@ def cool_tech(context: "Context"): Years in the data include the model horizon indicated by ``context["water build info"]``, plus the additional year 2010. """ - # TODO reduce complexity of this function from 18 to 15 or less #: Name of the input file. # The input file mentions water withdrawals and emission heating fractions for # cooling technologies alongwith parent technologies: @@ -66,10 +212,11 @@ def cool_tech(context: "Context"): # Assigning proper nomenclature df_node["node"] = "B" + df_node["BCU_name"].astype(str) df_node["mode"] = "M" + df_node["BCU_name"].astype(str) - if context.type_reg == "country": - df_node["region"] = context.map_ISO_c[context.regions] - else: - df_node["region"] = f"{context.regions}_" + df_node["REGION"].astype(str) + df_node["region"] = ( + context.map_ISO_c[context.regions] + if context.type_reg == "country" + else f"{context.regions}_" + df_node["REGION"].astype(str) + ) node_region = df_node["region"].unique() # reading ppl cooling tech dataframe @@ -88,41 +235,20 @@ def cool_tech(context: "Context"): # Extracting input database from scenario for parent technologies # Extracting input values from scenario - ref_input = scen.par("input", {"technology": cooling_df["parent_tech"]}) + ref_input: pd.DataFrame = scen.par( + "input", {"technology": cooling_df["parent_tech"]} + ) # Extracting historical activity from scenario - ref_hist_act = scen.par( + ref_hist_act: pd.DataFrame = scen.par( "historical_activity", {"technology": cooling_df["parent_tech"]} ) # Extracting historical capacity from scenario - ref_hist_cap = scen.par( + ref_hist_cap: pd.DataFrame = scen.par( "historical_new_capacity", {"technology": cooling_df["parent_tech"]} ) # cooling fraction = H_cool = Hi - 1 - Hi*(h_fg) # where h_fg (flue gasses losses) = 0.1 ref_input["cooling_fraction"] = ref_input["value"] * 0.9 - 1 - - def missing_tech(x): - """Assign values to missing data. - It goes through the input data frame and extract the technologies which - don't have input values and then assign manual values to those technologies - along with assigning them an arbitrary level i.e dummy supply - """ - data_dic = { - "geo_hpl": 1 / 0.850, - "geo_ppl": 1 / 0.385, - "nuc_hc": 1 / 0.326, - "nuc_lc": 1 / 0.326, - "solar_th_ppl": 1 / 0.385, - } - - if data_dic.get(x["technology"]): - if x["level"] == "cooling": - return pd.Series((data_dic.get(x["technology"]), "dummy_supply")) - else: - return pd.Series((data_dic.get(x["technology"]), x["level"])) - else: - return pd.Series((x["value"], x["level"])) - ref_input[["value", "level"]] = ref_input[["technology", "value", "level"]].apply( missing_tech, axis=1 ) @@ -153,23 +279,6 @@ def missing_tech(x): & (input_cool["node_origin"] != f"{context.regions}_GLB") ] - def cooling_fr(x): - """Calculate cooling fraction - Returns the calculated cooling fraction after for two categories; - 1. Technologies that produce heat as an output - cooling_fraction(h_cool) = input value(hi) - 1 - Simply subtract 1 from the heating value since the rest of the part is already - accounted in the heating value - 2. Rest of technologies - h_cool = hi -Hi* h_fg - 1, - where: - h_fg (flue gasses losses) = 0.1 (10% assumed losses) - """ - if "hpl" in x["index"]: - return x["value"] - 1 - else: - return x["value"] - (x["value"] * 0.1) - 1 - input_cool["cooling_fraction"] = input_cool.apply(cooling_fr, axis=1) # Converting water withdrawal units to Km3/GWa @@ -368,75 +477,20 @@ def cooling_fr(x): ].drop_duplicates() search_cols_cooling_fraction = [col for col in search_cols if col != "technology"] - def shares(x, context: "Context"): - """Process share and cooling fraction. - Returns - ------- - Product of value of shares of cooling technology types of regions with - corresponding cooling fraction - """ - for col in search_cols_cooling_fraction: - # MAPPING ISOCODE to region name, assume one country only - col2 = context.map_ISO_c[col] if context.type_reg == "country" else col - cooling_fraction = hold_df[ - (hold_df["node_loc"] == col2) - & (hold_df["technology_name"] == x["technology"]) - ]["cooling_fraction"] - x[col] = x[col] * cooling_fraction - - results: list[Any] = [] - for i in x: - if isinstance(i, str): - results.append(i) - else: - if not len(i): - return pd.Series( - [i for i in range(len(search_cols) - 1)] + ["delme"], - index=search_cols, - ) - else: - results.append(float(i)) - return pd.Series(results, index=search_cols) - # Apply function to the - hold_cost = cost[search_cols].apply(shares, axis=1, context=context) + hold_cost = cost[search_cols].apply( + shares, + axis=1, + context=context, + search_cols_cooling_fraction=search_cols_cooling_fraction, + hold_df=hold_df, + search_cols=search_cols, + ) hold_cost = hold_cost[hold_cost["technology"] != "delme"] - def hist_act(x, context: "Context"): - """Calculate historical activity of cooling technology. - The data for shares is read from ``cooltech_cost_and_shares_ssp_msg.csv`` - Returns - ------- - hist_activity(cooling_tech) = hist_activitiy(parent_technology) * share - *cooling_fraction - """ - tech_df = hold_cost[ - hold_cost["technology"].str.startswith(x.technology) - ] # [x.node_loc] - - node_search = ( - context.regions if context.type_reg == "country" else x["node_loc"] - ) - - node_loc = x["node_loc"] - technology = x["technology"] - cooling_technologies = list(tech_df["technology"]) - new_values = tech_df[node_search] * x.value - - return [ - [ - node_loc, - technology, - cooling_technology, - x.year_act, - x.value, - new_value, - x.unit, - ] - for new_value, cooling_technology in zip(new_values, cooling_technologies) - ] - - changed_value_series = ref_hist_act.apply(hist_act, axis=1, context=context) + changed_value_series = ref_hist_act.apply( + hist_act, axis=1, context=context, hold_cost=hold_cost + ) changed_value_series_flat = [ row for series in changed_value_series for row in series ] @@ -452,40 +506,9 @@ def hist_act(x, context: "Context"): # dataframe for historical activities of cooling techs act_value_df = pd.DataFrame(changed_value_series_flat, columns=columns) - def hist_cap(x, context: "Context"): - """Calculate historical capacity of cooling technology. - The data for shares is read from ``cooltech_cost_and_shares_ssp_msg.csv`` - Returns - ------- - hist_new_capacity(cooling_tech) = historical_new_capacity(parent_technology)* - share * cooling_fraction - """ - tech_df = hold_cost[ - hold_cost["technology"].str.startswith(x.technology) - ] # [x.node_loc] - if context.type_reg == "country": - node_search = context.regions - else: - node_search = x["node_loc"] # R11_EEU - node_loc = x["node_loc"] - technology = x["technology"] - cooling_technologies = list(tech_df["technology"]) - new_values = tech_df[node_search] * x.value - - return [ - [ - node_loc, - technology, - cooling_technology, - x.year_vtg, - x.value, - new_value, - x.unit, - ] - for new_value, cooling_technology in zip(new_values, cooling_technologies) - ] - - changed_value_series = ref_hist_cap.apply(hist_cap, axis=1, context=context) + changed_value_series = ref_hist_cap.apply( + hist_cap, axis=1, context=context, hold_cost=hold_cost + ) changed_value_series_flat = [ row for series in changed_value_series for row in series ] @@ -766,7 +789,8 @@ def non_cooling_tec(context: "Context"): FILE = "tech_water_performance_ssp_msg.csv" path = package_data_path("water", "ppl_cooling_tech", FILE) df = pd.read_csv(path) - cooling_df = df.loc[df["technology_group"] == "cooling"] + cooling_df = df.copy() + cooling_df = cooling_df.loc[cooling_df["technology_group"] == "cooling"] # Separate a column for parent technologies of respective cooling # techs cooling_df["parent_tech"] = ( diff --git a/message_ix_models/model/water/data/water_supply.py b/message_ix_models/model/water/data/water_supply.py index d4a2a25e8b..1807686071 100644 --- a/message_ix_models/model/water/data/water_supply.py +++ b/message_ix_models/model/water/data/water_supply.py @@ -16,7 +16,7 @@ def map_basin_region_wat(context: "Context"): """ - Calculate share of water avaialbility of basins per each parent region. + Calculate share of water availability of basins per each parent region. The parent region could be global message regions or country @@ -49,11 +49,11 @@ def map_basin_region_wat(context: "Context"): # Reading data, the data is spatially and temporally aggregated from GHMs df_sw["BCU_name"] = df_x["BCU_name"] - - if context.type_reg == "country": - df_sw["MSGREG"] = context.map_ISO_c[context.regions] - else: - df_sw["MSGREG"] = f"{context.regions}_" + df_sw["BCU_name"].str[-3:] + df_sw["MSGREG"] = ( + context.map_ISO_c[context.regions] + if context.type_reg == "country" + else f"{context.regions}_" + df_sw["BCU_name"].str[-3:] + ) df_sw = df_sw.set_index(["MSGREG", "BCU_name"]) @@ -139,6 +139,7 @@ def add_water_supply(context: "Context"): info = context["water build info"] # load the scenario from context scen = context.get_scenario() + # scen = Scenario(context.get_platform(), **context.core.scenario_info) # year_wat = (2010, 2015) fut_year = info.Y @@ -159,10 +160,11 @@ def add_water_supply(context: "Context"): # Assigning proper nomenclature df_node["node"] = "B" + df_node["BCU_name"].astype(str) df_node["mode"] = "M" + df_node["BCU_name"].astype(str) - if context.type_reg == "country": - df_node["region"] = context.map_ISO_c[context.regions] - else: - df_node["region"] = f"{context.regions}_" + df_node["REGION"].astype(str) + df_node["region"] = ( + context.map_ISO_c[context.regions] + if context.type_reg == "country" + else f"{context.regions}_" + df_node["REGION"].astype(str) + ) # Storing the energy MESSAGE region names node_region = df_node["region"].unique() @@ -171,10 +173,11 @@ def add_water_supply(context: "Context"): FILE1 = f"gw_energy_intensity_depth_{context.regions}.csv" PATH1 = package_data_path("water", "availability", FILE1) df_gwt = pd.read_csv(PATH1) - if context.type_reg == "country": - df_gwt["region"] = context.map_ISO_c[context.regions] - else: - df_gwt["REGION"] = f"{context.regions}_" + df_gwt["REGION"].astype(str) + df_gwt["region"] = ( + context.map_ISO_c[context.regions] + if context.type_reg == "country" + else f"{context.regions}_" + df_gwt["REGION"].astype(str) + ) # reading groundwater energy intensity data FILE2 = f"historical_new_cap_gw_sw_km3_year_{context.regions}.csv" @@ -266,6 +269,7 @@ def add_water_supply(context: "Context"): .pipe(same_time) ) + # FIXME pd.DataFrames don't have append(), please choose another way! # input data frame for slack technology balancing equality with demands inp = inp.append( make_df( diff --git a/message_ix_models/model/water/reporting.py b/message_ix_models/model/water/reporting.py index ce67f35975..b263b68b57 100644 --- a/message_ix_models/model/water/reporting.py +++ b/message_ix_models/model/water/reporting.py @@ -3,10 +3,14 @@ import numpy as np import pandas as pd import pyam -from message_ix import Reporter +from message_ix import Reporter, Scenario from message_ix_models.util import package_data_path +# FIXME This is not how things are supposed to work! Your code always requires +# message_data to be present, which is not allowed in message-ix-models! If +# legacy_reporting truly remained None, your code would fail, as mypy already points +# out! try: from message_data.tools.post_processing.iamc_report_hackathon import ( report as legacy_reporting, @@ -260,9 +264,9 @@ def multiply_electricity_output_of_hydro(elec_hydro_var, report_iam): # TODO -def report(sc=False, reg="", sdgs=False): +def report(sc: Scenario, reg: str, sdgs: bool = False): """Report nexus module results""" - + log.info(f"Regions given as {reg}; no warranty of it's not in ['R11','R12']") # Generating reporter rep = Reporter.from_scenario(sc) report = rep.get( @@ -1084,15 +1088,6 @@ def report(sc=False, reg="", sdgs=False): # add population with sanitation or drinking water access mp2 = sc.platform - map_node = sc.set("map_node") - # this might not be the best way to get the region, better from context - if not reg: - if "R11" in map_node.node.to_list()[1]: - reg = "R11" - elif "R12" in map_node.node.to_list()[1]: - reg = "R12" - else: - print("Check the region of the model is consistent with R11,R12") # load data on water and sanitation access load_path = package_data_path("water", "demands", "harmonized", reg) @@ -1100,65 +1095,62 @@ def report(sc=False, reg="", sdgs=False): pop_check = sc.timeseries(variable="Population") pop_check = pop_check[pop_check.year >= 2020] - if pop_check.empty: - print("The Population data does not exist or timeseries() has no future values") - else: - pop_drink_tot = pd.DataFrame() - pop_sani_tot = pd.DataFrame() - pop_sdg6 = pd.DataFrame() - for ur in ["urban", "rural"]: - # CHANGE TO URBAN AND RURAL POP - pop_tot = sc.timeseries(variable=("Population|" + ur.capitalize())) - pop_tot = pop_tot[-(pop_tot.region == "GLB region (R11)")] - pop_reg = np.unique(pop_tot["region"]) - # need to change names - reg_map = mp2.regions() - reg_map = reg_map[reg_map.mapped_to.isin(pop_reg)].drop( - columns=["parent", "hierarchy"] - ) - reg_map["region"] = [x.split("_")[1] for x in reg_map.region] + assert ( + not pop_check.empty + ), "The Population data does not exist or timeseries() has no future values" + + pop_drink_tot = pd.DataFrame() + pop_sani_tot = pd.DataFrame() + pop_sdg6 = pd.DataFrame() + for ur in ["urban", "rural"]: + # CHANGE TO URBAN AND RURAL POP + pop_tot = sc.timeseries(variable=("Population|" + ur.capitalize())) + pop_tot = pop_tot[-(pop_tot.region == "GLB region (R11)")] + pop_reg = np.unique(pop_tot["region"]) + # need to change names + reg_map = mp2.regions() + reg_map = reg_map[reg_map.mapped_to.isin(pop_reg)].drop( + columns=["parent", "hierarchy"] + ) + reg_map["region"] = [x.split("_")[1] for x in reg_map.region] - df_rate = all_rates[all_rates.variable.str.contains(ur)] + df_rate = all_rates[all_rates.variable.str.contains(ur)] - df_rate = df_rate[ - df_rate.variable.str.contains("sdg" if sdgs else "baseline") - ] + df_rate = df_rate[df_rate.variable.str.contains("sdg" if sdgs else "baseline")] - df_rate["region"] = [x.split("|")[1] for x in df_rate.node] - df_rate = df_rate.drop(columns=["node"]) - # make region mean (no weighted average) - df_rate = ( - df_rate.groupby(["year", "variable", "region"])["value"] - .mean() - .reset_index() - ) - # convert region name - df_rate = df_rate.merge(reg_map, how="left") - df_rate = df_rate.drop(columns=["region"]) - df_rate = df_rate.rename( - columns={"mapped_to": "region", "variable": "new_var", "value": "rate"} - ) + df_rate["region"] = [x.split("|")[1] for x in df_rate.node] + df_rate = df_rate.drop(columns=["node"]) + # make region mean (no weighted average) + df_rate = ( + df_rate.groupby(["year", "variable", "region"])["value"] + .mean() + .reset_index() + ) + # convert region name + df_rate = df_rate.merge(reg_map, how="left") + df_rate = df_rate.drop(columns=["region"]) + df_rate = df_rate.rename( + columns={"mapped_to": "region", "variable": "new_var", "value": "rate"} + ) - # Population|Drinking Water Access - df_drink = df_rate[df_rate.new_var.str.contains("connection")] - pop_drink = pop_tot.merge(df_drink, how="left") - pop_drink["variable"] = ( - "Population|Drinking Water Access|" + ur.capitalize() - ) - pop_drink["value"] = pop_drink.value * pop_drink.rate - cols = pop_tot.columns - pop_drink = pop_drink[cols] - pop_drink_tot = pop_drink_tot.append(pop_drink) - pop_sdg6 = pop_sdg6.append(pop_drink) - - # Population|Sanitation Acces - df_sani = df_rate[df_rate.new_var.str.contains("treatment")] - pop_sani = pop_tot.merge(df_sani, how="left") - pop_sani["variable"] = "Population|Sanitation Access|" + ur.capitalize() - pop_sani["value"] = pop_sani.value * pop_sani.rate - pop_sani = pop_sani[cols] - pop_sani_tot = pop_sani_tot.append(pop_drink) - pop_sdg6 = pop_sdg6.append(pop_sani) + # Population|Drinking Water Access + df_drink = df_rate[df_rate.new_var.str.contains("connection")] + pop_drink = pop_tot.merge(df_drink, how="left") + pop_drink["variable"] = "Population|Drinking Water Access|" + ur.capitalize() + pop_drink["value"] = pop_drink.value * pop_drink.rate + cols = pop_tot.columns + pop_drink = pop_drink[cols] + pop_drink_tot = pop_drink_tot.append(pop_drink) + pop_sdg6 = pop_sdg6.append(pop_drink) + + # Population|Sanitation Acces + df_sani = df_rate[df_rate.new_var.str.contains("treatment")] + pop_sani = pop_tot.merge(df_sani, how="left") + pop_sani["variable"] = "Population|Sanitation Access|" + ur.capitalize() + pop_sani["value"] = pop_sani.value * pop_sani.rate + pop_sani = pop_sani[cols] + pop_sani_tot = pop_sani_tot.append(pop_drink) + pop_sdg6 = pop_sdg6.append(pop_sani) # total values pop_drink_tot = ( @@ -1190,7 +1182,7 @@ def report(sc=False, reg="", sdgs=False): pop_sdg6_glb = pop_sdg6_glb[cols] pop_sdg6 = pop_sdg6.append(pop_sdg6_glb) - print("Population|Drinking Water Access") + log.info("Population|Drinking Water Access") # Add water prices, ad-hoc procedure wp = sc.var( @@ -1282,7 +1274,7 @@ def report(sc=False, reg="", sdgs=False): map_node_dict = map_node.groupby("node_parent")["node"].apply(list).to_dict() for index, row in map_agg_pd.iterrows(): - print(row["names"]) + log.info(f"Processing {row["names"]}") # Aggregates variables as per standard reporting report_iam.aggregate(row["names"], components=row["list_cat"], append=True) @@ -1334,10 +1326,7 @@ def report(sc=False, reg="", sdgs=False): report_pd = report_pd[-report_pd.variable.isin(water_hydro_var)] # add water population - if pop_check.empty: - print("The Population data does not exist or timeseries() has no future values") - else: - report_pd = report_pd.append(pop_sdg6) + report_pd = report_pd.append(pop_sdg6) # add units for index, row in map_agg_pd.iterrows(): @@ -1384,39 +1373,37 @@ def report(sc=False, reg="", sdgs=False): if reg not in ["R11", " R12"]: # temp for leap- re out_path = package_data_path().parents[0] / "reporting_output/" - - if not out_path.exists(): - out_path.mkdir() + out_path.mkdir(exist_ok=True) out_file = out_path / f"{sc.model}_{sc.scenario}_nexus.csv" report_pd.to_csv(out_file, index=False) sc.check_out(timeseries_only=True) - print("Starting to upload timeseries") - print(report_pd.head()) + log.info("Starting to upload timeseries") + log.info(report_pd.head()) sc.add_timeseries(report_pd) - print("Finished uploading timeseries") + log.info("Finished uploading timeseries") sc.commit("Reporting uploaded as timeseries") -def report_full(sc=False, reg="", sdgs=False): +def report_full(sc: Scenario, reg: str, sdgs=False): """Combine old and new reporting workflows""" a = sc.timeseries() # keep historical part, if present a = a[a.year >= 2020] sc.check_out(timeseries_only=True) - print("Remove any previous timeseries") + log.info("Remove any previous timeseries") sc.remove_timeseries(a) - print("Finished removing timeseries, now commit..") + log.info("Finished removing timeseries, now commit..") sc.commit("Remove existing timeseries") run_old_reporting(sc) - print("First part of reporting completed, now procede with the water variables") + log.info("First part of reporting completed, now procede with the water variables") report(sc, reg, sdgs) - print("overall NAVIGATE reporting completed") + log.info("overall NAVIGATE reporting completed") # add ad-hoc caplculated variables with a function ts = sc.timeseries() @@ -1432,4 +1419,4 @@ def report_full(sc=False, reg="", sdgs=False): ts_long = pyam.IamDataFrame(ts) ts_long.to_csv(out_file) - print(f"Saving csv to {out_file}") + log.info(f"Saving csv to {out_file}") diff --git a/message_ix_models/model/water/utils.py b/message_ix_models/model/water/utils.py index 72043498ec..c06f004034 100644 --- a/message_ix_models/model/water/utils.py +++ b/message_ix_models/model/water/utils.py @@ -27,7 +27,7 @@ ] -def read_config(context: Context | None = None): +def read_config(context: Optional[Context] = None): """Read the water model configuration / metadata from file. Numerical values are converted to computation-ready data structures. @@ -38,7 +38,7 @@ def read_config(context: Context | None = None): The current Context, with the loaded configuration. """ - context = context or Context.get_instance(0) + context = context or Context.get_instance(-1) # if context.nexus_set == 'nexus': if "water set" in context: @@ -74,8 +74,8 @@ def map_add_on(rtype=Code): # Create a new code by combining two result["code"].append( Code( - id="".join(c.id for c in indices), - name=", ".join(c.name for c in indices), + id="".join(str(c.id) for c in indices), + name=", ".join(str(c.name) for c in indices), ) ) @@ -100,27 +100,27 @@ def map_add_on(rtype=Code): raise ValueError(rtype) -def add_commodity_and_level(df, default_level=None): +def add_commodity_and_level(df: pd.DataFrame, default_level=None): # Add input commodity and level - t_info = Context.get_instance()["water set"]["technology"]["add"] - c_info = get_codes("commodity") + t_info: list = Context.get_instance()["water set"]["technology"]["add"] + c_info: list = get_codes("commodity") @lru_cache() def t_cl(t): - input = t_info[t_info.index(t)].anno["input"] + input = t_info[t_info.index(t)].annotations["input"] # Commodity must be specified commodity = input["commodity"] # Use the default level for the commodity in the RES (per # commodity.yaml) level = ( input.get("level", "water_supply") - or c_info[c_info.index(commodity)].anno.get("level", None) + or c_info[c_info.index(commodity)].annotations.get("level", None) or default_level ) return commodity, level - def func(row): + def func(row: pd.Series): row[["commodity", "level"]] = t_cl(row["technology"]) return row @@ -160,4 +160,6 @@ def map_yv_ya_lt( ) # Select values using the `ya` and `lt` parameters - return df[(ya <= df.year_act) & (df.year_act - df.year_vtg <= lt)] + return df.loc[(ya <= df.year_act) & (df.year_act - df.year_vtg <= lt)].reset_index( + drop=True + ) diff --git a/message_ix_models/tests/model/water/test_irrigation.py b/message_ix_models/tests/model/water/test_irrigation.py index 43188cae29..c7b45befe4 100644 --- a/message_ix_models/tests/model/water/test_irrigation.py +++ b/message_ix_models/tests/model/water/test_irrigation.py @@ -1,13 +1,23 @@ -from message_ix import Scenario from unittest.mock import patch import pandas as pd +from message_ix import Scenario +from message_ix_models import ScenarioInfo +from message_ix_models.model.structure import get_codes from message_ix_models.model.water.data.irrigation import add_irr_structure def test_add_irr_structure(test_context): context = test_context + + # FIXME You probably want this to be part of a common setup rather than writing + # something like this for every test + context.type_reg = "country" + nodes = get_codes(f"node/{context.regions}") + nodes = list(map(str, nodes[nodes.index("World")].child)) + context.map_ISO_c = {context.regions: nodes[0]} + mp = context.get_platform() scenario_info = { "mp": mp, @@ -21,6 +31,9 @@ def test_add_irr_structure(test_context): s.add_set("node", ["loc1", "loc2"]) s.add_set("year", [2020, 2030, 2040]) + # FIXME same as above + context["water build info"] = ScenarioInfo(s) + # Mock the DataFrame read from CSV df_node = pd.DataFrame({"BCU_name": ["1", "2"], "REGION": ["region1", "region2"]}) diff --git a/message_ix_models/tests/model/water/test_utils.py b/message_ix_models/tests/model/water/test_utils.py index c479ad8adf..b6c40773cf 100644 --- a/message_ix_models/tests/model/water/test_utils.py +++ b/message_ix_models/tests/model/water/test_utils.py @@ -2,7 +2,7 @@ import pandas as pd import xarray as xr -from sdmx.model.common import Code +from sdmx.model.common import Annotation, Code from message_ix_models import Context from message_ix_models.model.water.utils import ( @@ -11,25 +11,25 @@ map_yv_ya_lt, read_config, ) +from message_ix_models.util import load_private_data def test_read_config(test_context): # Mock the context context = test_context - # Mock the data returned by load_private_data - mock_data = {"test_key": "test_value"} + # Call the function to be tested + result = read_config(context) - # Mock the load_private_data function to return mock_data - with patch("message_ix_models.util.load_private_data", return_value=mock_data): - # Call the function to be tested - result = read_config(context) + config_parts = ["water", "config.yaml"] + set_parts = ["water", "set.yaml"] + technology_parts = ["water", "technology.yaml"] # Assert the results assert isinstance(result, Context) - assert result["water config"] == mock_data - assert result["water set"] == mock_data - assert result["water technology"] == mock_data + assert result["water config"] == load_private_data(*config_parts) + assert result["water set"] == load_private_data(*set_parts) + assert result["water technology"] == load_private_data(*technology_parts) def test_map_add_on(): @@ -42,7 +42,9 @@ def test_map_add_on(): } # Mock the read_config function to return mock_data - with patch("your_module.read_config", return_value=mock_data): + with patch( + "message_ix_models.model.water.utils.read_config", return_value=mock_data + ): # Call the function to be tested result = map_add_on() @@ -51,7 +53,9 @@ def test_map_add_on(): assert result == expected # Testing with rtype = 'indexers' - with patch("your_module.read_config", return_value=mock_data): + with patch( + "message_ix_models.model.water.utils.read_config", return_value=mock_data + ): result = map_add_on(rtype="indexers") expected = { @@ -67,35 +71,40 @@ def test_add_commodity_and_level(): # Mock the dataframe df = pd.DataFrame({"technology": ["tech1", "tech2"]}) + # FIXME Something here is seriously broken. Annotations need rework and + # please clarify what and how the annotations will be accessed and how the + # resulting data will be used! # Mock the data returned by Context.get_instance and get_codes mock_context_data = { "water set": { "technology": { - "add": pd.Series( - data=[ - Code( - id="tech1", - annotations=["input", "commodity", "com1", "level", "lev1"], - ), - Code(id="tech2", annotations=["input", "commodity", "com2"]), - ], - name="tech", - ) + "add": [ + Code( + id="tech1", + annotations=[ + Annotation("input", "commodity", "com1", "level", "lev1") + ], + ), + Code( + id="tech2", + annotations=[Annotation("input", "commodity", "com2")], + ), + ], } } } - mock_codes_data = pd.Series( - data=[ - Code(id="com1", annotations=["level", "lev1"]), - Code(id="com2", annotations=["level", "lev2"]), - ], - name="com", - ) + mock_codes_data = [ + Code(id="com1", annotations=[Annotation("level", "lev1")]), + Code(id="com2", annotations=[Annotation("level", "lev2")]), + ] # Mock the Context.get_instance and get_codes functions to return mock_data with patch( - "your_module.Context.get_instance", return_value=mock_context_data - ), patch("your_module.get_codes", return_value=mock_codes_data): + "message_ix_models.util.context.Context.get_instance", + return_value=mock_context_data, + ), patch( + "message_ix_models.model.structure.get_codes", return_value=mock_codes_data + ): # Call the function to be tested result = add_commodity_and_level(df) @@ -115,19 +124,26 @@ def test_map_yv_ya_lt(): lt = 20 ya = 2020 + # TODO this is what we should expect given the formula you use in map_yv_ya_lt, + # but is this what you want to see? If not, you should consider changing the formula expected = pd.DataFrame( - {"year_vtg": [2010, 2020, 2020, 2030], "year_act": [2020, 2020, 2030, 2040]} + { + "year_vtg": [2010, 2010, 2020, 2020, 2020, 2030, 2030, 2040], + "year_act": [2020, 2030, 2020, 2030, 2040, 2030, 2040, 2040], + } ) result = map_yv_ya_lt(periods, lt, ya) + print(result) pd.testing.assert_frame_equal(result, expected) + # TODO same as above # test with no active year specified expected_no_ya = pd.DataFrame( { - "year_vtg": [2020, 2020, 2020, 2020], - "year_act": [2020, 2030, 2040, 2050], + "year_vtg": [2010, 2010, 2010, 2020, 2020, 2020, 2030, 2030, 2040], + "year_act": [2010, 2020, 2030, 2020, 2030, 2040, 2030, 2040, 2040], } ) diff --git a/message_ix_models/tests/model/water/test_water_supply.py b/message_ix_models/tests/model/water/test_water_supply.py index 50f2defd8f..1daa1808a7 100644 --- a/message_ix_models/tests/model/water/test_water_supply.py +++ b/message_ix_models/tests/model/water/test_water_supply.py @@ -1,8 +1,9 @@ -from message_ix import Scenario from unittest.mock import patch import pandas as pd +from message_ix import Scenario +from message_ix_models import ScenarioInfo from message_ix_models.model.water.data.water_supply import ( add_e_flow, add_water_supply, @@ -10,25 +11,20 @@ ) -def test_map_basin_region_wat(): - # Mock the context - context = { - "water build info": {"Y": [2020, 2030, 2040]}, - "type_reg": "country", - "regions": "test_region", - "map_ISO_c": {"test_region": "test_ISO"}, - "RCP": "test_RCP", - "REL": "test_REL", - "time": "year", - } +def test_map_basin_region_wat(test_context): + # FIXME You probably want this to be part of a common setup rather than writing + # something like this for every test + # Personalize the context + context = test_context + context["water build info"] = {"Y": [2020, 2030, 2040]} + context.type_reg = "country" + context.regions = "test_region" + context.map_ISO_c = {"test_region": "test_ISO"} + context.RCP = "test_RCP" + context.REL = "test_REL" + context.time = "year" # Mock the DataFrames read from CSV - pd.DataFrame( - { - "BCU_name": ["test_BCU"], - } - ) - df_sw = pd.DataFrame( { "Unnamed: 0": [0], @@ -42,6 +38,9 @@ def test_map_basin_region_wat(): "message_ix_models.util.private_data_path", return_value="path/to/file" ), patch("pandas.read_csv", return_value=df_sw): context["time"] = "year" + # FIXME This is not working with context.type_reg == "country". Have you ever + # confirmed that the code works in this case? If not, maybe this test is not + # needed. result = map_basin_region_wat(context) # Assert the results @@ -53,19 +52,19 @@ def test_map_basin_region_wat(): def test_add_water_supply(test_context): - # Mock the context - context = { - "water build info": {"Y": [2020, 2030, 2040]}, - "type_reg": "country", - "regions": "test_region", - "map_ISO_c": {"test_region": "test_ISO"}, - "RCP": "test_RCP", - "REL": "test_REL", - "time": "year", - "nexus_set": "nexus", - "get_scenario": lambda: {"firstmodelyear": 2020}, - } + # FIXME You probably want this to be part of a common setup rather than writing + # something like this for every test + # Personalize the context context = test_context + context["water build info"] = {"Y": [2020, 2030, 2040]} + context.type_reg = "country" + context.regions = "test_region" + context.map_ISO_c = {"test_region": "test_ISO"} + context.RCP = "test_RCP" + context.REL = "test_REL" + context.time = "year" + context.nexus_set = "nexus" + mp = context.get_platform() scenario_info = { "mp": mp, @@ -78,6 +77,12 @@ def test_add_water_supply(test_context): s.add_set("technology", ["tech1", "tech2"]) s.add_set("node", ["loc1", "loc2"]) s.add_set("year", [2020, 2030, 2040]) + + # FIXME You probably want this to be part of a common setup rather than writing + # something like this for every test + context.set_scenario(s) + context["water build info"] = ScenarioInfo(s) + # Mock the DataFrames read from CSV df_node = pd.DataFrame({"BCU_name": ["test_BCU"], "REGION": ["test_REGION"]}) @@ -110,9 +115,9 @@ def test_add_water_supply(test_context): with patch( "message_ix_models.util.private_data_path", return_value="path/to/file" ), patch("pandas.read_csv", return_value=df_node), patch( - "message_ix_models.model.water.water_supply.map_basin_region_wat", + "message_ix_models.model.water.data.water_supply.map_basin_region_wat", return_value=df_sw, # Adjust this import - ): + ), patch("message_ix_models.util.context.Context.get_scenario", return_value=s): # Call the function to be tested result = add_water_supply(context) @@ -128,32 +133,41 @@ def test_add_water_supply(test_context): assert isinstance(df, pd.DataFrame) -def test_add_e_flow(): - # Mock the context - context = { - "water build info": {"Y": [2020, 2030, 2040]}, - "regions": "test_region", - "RCP": "test_RCP", - "time": "year", - "SDG": True, - } +def test_add_e_flow(test_context): + # FIXME You probably want this to be part of a common setup rather than writing + # something like this for every test + # Personalize the context + context = test_context + context["water build info"] = {"Y": [2020, 2030, 2040]} + context.regions = "test_region" + context.RCP = "test_RCP" + context.REL = "test_REL" + context.time = "year" + context.SDG = True # Mock the DataFrames read from CSV df_sw = pd.DataFrame( - {"Region": ["test_Region"], "value": [1], "year": [2020], "time": ["year"]} - ) - - pd.DataFrame( - {"Region": ["test_Region"], "value": [1], "year": [2020], "time": ["year"]} + { + "Region": ["test_Region"], + "value": [1], + "year": [2020], + "time": ["year"], + "Unnamed: 0": [0], + "BCU_name": ["test_BCU"], + } ) # Mock the function 'read_water_availability' to return the mocked DataFrame with patch( - "message_ix_models.model.water.demands.read_water_availability", + "message_ix_models.model.water.data.demands.read_water_availability", return_value=(df_sw, df_sw), ), patch( "message_ix_models.util.private_data_path", return_value="path/to/file" ), patch("pandas.read_csv", return_value=df_sw): + # FIXME This doesn't work because read_water_availability() in line 749 of + # water/data/demands expects the second column of df_sw to be "years", but it + # contains the names of the columns at that point starting with df_sw here, not + # something that pandas can convert to DateTimes! # Call the function to be tested result = add_e_flow(context)