diff --git a/pathways/data/technologies_shares.yaml b/pathways/data/technologies_shares.yaml
index 34de628..b7c4626 100644
--- a/pathways/data/technologies_shares.yaml
+++ b/pathways/data/technologies_shares.yaml
@@ -25,8 +25,8 @@ Wind: ### Check if IAMs make the distinction between onshore and offshore
         uncertainty_type: uniform
 PV:
   c-Si:
-    name: electricity production, photovoltaic, at 93 kWp slanted-roof, multi-Si, laminimumated, integrated
-    reference product: electricity production, photovoltaic, at 93 kWp slanted-roof, multi-Si, laminimumated, integrated
+    name: electricity production, photovoltaic, at 93 kWp slanted-roof, multi-Si, laminated, integrated
+    reference product: electricity production, photovoltaic, at 93 kWp slanted-roof, multi-Si, laminated, integrated
     unit: kilowatt hour
     share:
       2020:
@@ -47,7 +47,7 @@ PV:
         maximum: 0.2500
         uncertainty_type: uniform
   CIGS:
-    name: electricity production, photovoltaic, 3kWp slanted-roof installation, CdTe, laminimumated, integrated
+    name: electricity production, photovoltaic, 3kWp slanted-roof installation, CdTe, laminated, integrated
     reference product: electricity, low voltage
     unit: kilowatt hour
     share:
@@ -58,7 +58,7 @@ PV:
         maximum: 0.1250
         uncertainty_type: uniform
   a-Si:
-    name: electricity production, photovoltaic, 3kWp slanted-roof installation, a-Si, laminimumated, integrated
+    name: electricity production, photovoltaic, 3kWp slanted-roof installation, a-Si, laminated, integrated
     reference product: electricity, low voltage
     unit: kilowatt hour
     share:
@@ -69,8 +69,8 @@ PV:
         maximum: 0.0090
         uncertainty_type: uniform
   Perovskite:
-    name: null
-    reference product: null
+    name: electricity production, photovoltaic, perovskite
+    reference product: electricity, low voltage
     unit: kilowatt hour
     share:
       2020:
@@ -80,8 +80,8 @@ PV:
         maximum: 0.3000
         uncertainty_type: uniform
  GaAs:
-    name: electricity production, photovoltaic, GaAs
-    reference product: null
+    name: electricity production, photovoltaic, GaAs
+    reference product: electricity, low voltage
    unit: kilowatt hour
    share:
      2020:
@@ -259,7 +259,6 @@ Battery-Mobile:
        minimum: 0.0
        maximum: 0.1
        uncertainty_type: uniform
-#### ON BATTERIES, WE WILL BE WORKING WITH KILOGRAMS... FORGETTING ABOUT ENERGY DENSITIES :(
 Battery-Stationary: ### Home storage uses NMC811 (w/o hyphen) in the current premise version
   NMC111:
     name: market for battery cell, Li-ion, NMC111
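Aside, for context (not part of the patch): each sub-technology above pairs a point estimate for 2020 with a uniform minimum/maximum range for 2050. A sampler consistent with that layout would draw one share per Monte Carlo iteration; a minimal sketch with invented numbers (the package's own routine is `generate_samples`, used later in `pathways.py`):

```python
import numpy as np

# Hypothetical 2050 entry mirroring the YAML structure (numbers invented)
share_2050 = {"minimum": 0.10, "maximum": 0.25, "uncertainty_type": "uniform"}

rng = np.random.default_rng(seed=42)
samples = rng.uniform(share_2050["minimum"], share_2050["maximum"], size=500)
print(round(samples.mean(), 3))  # close to 0.175, the midpoint of [0.10, 0.25]
```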
diff --git a/pathways/lca.py b/pathways/lca.py
index 8821c26..d8037cd 100644
--- a/pathways/lca.py
+++ b/pathways/lca.py
@@ -14,7 +14,9 @@ import bw_processing as bwp
 import numpy as np
 import pyprind
-from bw2calc.monte_carlo import MonteCarloLCA
+from bw2calc.monte_carlo import (  # Dev version coming: `MonteCarloLCA` will be removed (the regular LCA class can run Monte Carlo) and `IterativeLCA` added (a different solving strategy)
+    MonteCarloLCA,
+)
 from bw2calc.utils import get_datapackage
 from bw_processing import Datapackage
 from numpy import dtype, ndarray
@@ -24,6 +26,13 @@
 from .filesystem_constants import DIR_CACHED_DB
 from .lcia import fill_characterization_factors_matrices
+from .stats import (
+    create_mapping_sheet,
+    log_intensities_to_excel,
+    log_results_to_excel,
+    log_subshares_to_excel,
+    run_stats_analysis,
+)
 from .subshares import (
     adjust_matrix_based_on_shares,
     find_technology_indices,
@@ -34,6 +43,7 @@
     check_unclassified_activities,
     fetch_indices,
     get_unit_conversion_factors,
+    read_indices_csv,
 )

 # disable warnings
@@ -48,35 +58,6 @@
 )


-def read_indices_csv(file_path: Path) -> dict[tuple[str, str, str, str], int]:
-    """
-    Reads a CSV file and returns its contents as a dictionary.
-
-    Each row of the CSV file is expected to contain four string values followed by an index.
-    These are stored in the dictionary as a tuple of the four strings mapped to the index.
-
-    :param file_path: The path to the CSV file.
-    :type file_path: Path
-
-    :return: A dictionary mapping tuples of four strings to indices.
-        For technosphere indices, the four strings are the activity name, product name, location, and unit.
-        For biosphere indices, the four strings are the flow name, category, subcategory, and unit.
-    :rtype: Dict[Tuple[str, str, str, str], str]
-    """
-    indices = dict()
-    with open(file_path) as read_obj:
-        csv_reader = csv.reader(read_obj, delimiter=";")
-        for row in csv_reader:
-            try:
-                indices[(row[0], row[1], row[2], row[3])] = int(row[4])
-            except IndexError as err:
-                logging.error(
-                    f"Error reading row {row} from {file_path}: {err}. "
-                    f"Could it be that the file uses commas instead of semicolons?"
-                )
-    return indices
-
-
 def load_matrix_and_index(
     file_path: Path,
 ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
@@ -85,8 +66,8 @@ def load_matrix_and_index(
     :param file_path: The path to the CSV file.
     :type file_path: Path

-    :return: A tuple containing the data, indices, and sign of the data.
-    :rtype: Tuple[np.ndarray, np.ndarray, np.ndarray]
+    :return: A tuple containing the data, indices, and sign of the data, as well as the exchanges with distributions.
+    :rtype: Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]
     """
     # Load the data from the CSV file
     array = np.genfromtxt(file_path, delimiter=";", skip_header=1)
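Aside (illustrative, not from the diff): `load_matrix_and_index` reads the exported matrix CSVs with `np.genfromtxt(..., delimiter=";", skip_header=1)`. A toy file with a made-up column layout shows the mechanics; the real exports define their own schema:

```python
import io

import numpy as np

# Hypothetical semicolon-separated matrix export: row;col;value
csv_text = "row;col;value\n0;1;0.5\n2;1;1.25\n"
array = np.genfromtxt(io.StringIO(csv_text), delimiter=";", skip_header=1)

rows = array[:, 0].astype(int)  # matrix row indices
cols = array[:, 1].astype(int)  # matrix column indices
data = array[:, 2]              # exchange amounts
print(rows, cols, data)         # [0 2] [1 1] [0.5 1.25]
```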
@@ -125,7 +106,7 @@ def get_lca_matrices(
     model: str,
     scenario: str,
     year: int,
-) -> Tuple[Datapackage, Dict, Dict]:
+) -> Tuple[Datapackage, Dict, Dict, list[tuple[int, int]]]:
     """
     Retrieve Life Cycle Assessment (LCA) matrices from disk.

@@ -137,7 +118,7 @@ def get_lca_matrices(
     :type scenario: str
     :param year: The year of the scenario.
     :type year: int
-    :rtype: Tuple[sparse.csr_matrix, sparse.csr_matrix, Dict, Dict]
+    :rtype: Tuple[Datapackage, Dict, Dict, List[Tuple[int, int]]]
     """

     # find the correct filepaths in filepaths
@@ -177,6 +158,10 @@ def select_filepath(keyword: str, fps):
     # Load matrices and add them to the datapackage
     for matrix_name, fp in [("technosphere_matrix", fp_A), ("biosphere_matrix", fp_B)]:
         data, indices, sign, distributions = load_matrix_and_index(fp)
+
+        if matrix_name == "technosphere_matrix":
+            uncertain_parameters = find_uncertain_parameters(distributions, indices)
+
         dp.add_persistent_vector(
             matrix=matrix_name,
             indices_array=indices,
@@ -185,7 +170,22 @@ def select_filepath(keyword: str, fps):
             distributions_array=distributions,
         )

-    return dp, technosphere_inds, biosphere_inds
+    return dp, technosphere_inds, biosphere_inds, uncertain_parameters
+
+
+def find_uncertain_parameters(
+    distributions_array: np.ndarray, indices_array: np.ndarray
+) -> list[tuple[int, int]]:
+    """
+    Find the uncertain parameters in the distributions array. They will be used for the stats report.
+    :param distributions_array: Structured array of distribution information for the exchanges.
+    :param indices_array: Structured array of (row, col) indices for the exchanges.
+    :return: List of (row, col) index tuples for exchanges with a non-zero uncertainty type.
+    """
+    uncertain_indices = np.where(distributions_array["uncertainty_type"] != 0)[0]
+    uncertain_parameters = [tuple(indices_array[idx]) for idx in uncertain_indices]
+
+    return uncertain_parameters


 def remove_double_counting(A: csr_matrix, vars_info: dict) -> csr_matrix:
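Quick sanity check (not part of the patch) of what `find_uncertain_parameters` returns, using hand-rolled structured arrays in place of the `bw_processing` ones:

```python
import numpy as np

# Structured arrays standing in for bw_processing's indices/distributions arrays
indices = np.array(
    [(0, 1), (2, 3), (4, 5)], dtype=[("row", np.int64), ("col", np.int64)]
)
distributions = np.zeros(3, dtype=[("uncertainty_type", np.uint8), ("loc", np.float32)])
distributions["uncertainty_type"] = [0, 4, 0]  # only the middle exchange is uncertain

uncertain_indices = np.where(distributions["uncertainty_type"] != 0)[0]
print([tuple(indices[idx]) for idx in uncertain_indices])  # [(2, 3)]: the uncertain (row, col)
```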
""" ( @@ -232,12 +232,15 @@ def process_region(data: Tuple) -> dict[str, ndarray[Any, dtype[Any]] | list[int demand_cutoff, lca, characterization_matrix, + methods, debug, use_distributions, + uncertain_parameters, ) = data variables_demand = {} d = [] + impacts_by_method = {method: [] for method in methods} for v, variable in enumerate(variables): idx, dataset = vars_idx[variable]["idx"], vars_idx[variable]["dataset"] @@ -288,16 +291,35 @@ def process_region(data: Tuple) -> dict[str, ndarray[Any, dtype[Any]] | list[int else: # Use distributions for LCA calculations # next(lca) is a generator that yields the inventory matrix - results = np.array( - [ - (characterization_matrix @ lca.inventory).toarray() - for _ in zip(range(use_distributions), lca) - ] - ) + temp_results = [] + params = {} + param_keys = set() + for _ in zip(range(use_distributions), lca): + matrix_result = (characterization_matrix @ lca.inventory).toarray() + temp_results.append(matrix_result) + for i in range(len(uncertain_parameters)): + param_key = ( + f"{uncertain_parameters[i][0]}_to_{uncertain_parameters[i][1]}" + ) + param_keys.add(param_key) + if param_key not in params: + params[param_key] = [] + value = -lca.technosphere_matrix[ + uncertain_parameters[i][0], uncertain_parameters[i][1] + ] + params[param_key].append(value) + + results = np.array(temp_results) + for idx, method in enumerate(methods): + # Sum over items for each stochastic result, not across all results + method_impacts = results[:, idx, :].sum(axis=1) + impacts_by_method[method] = method_impacts.tolist() # calculate quantiles along the first dimension characterized_inventory = np.quantile(results, [0.05, 0.5, 0.95], axis=0) + log_intensities_to_excel(model, scenario, year, params) + d.append(characterized_inventory) if debug: @@ -321,6 +343,8 @@ def process_region(data: Tuple) -> dict[str, ndarray[Any, dtype[Any]] | list[int return { "id_array": id_array, "variables": {k: v["demand"] for k, v in variables_demand.items()}, + "impact_by_method": impacts_by_method, + "param_keys": param_keys, } @@ -346,7 +370,8 @@ def _calculate_year(args: tuple): reverse_classifications, debug, use_distributions, - subshares, + shares, + uncertain_parameters, ) = args print(f"------ Calculating LCA results for {year}...") @@ -368,9 +393,12 @@ def _calculate_year(args: tuple): # Try to load LCA matrices for the given model, scenario, and year try: - bw_datapackage, technosphere_indices, biosphere_indices = get_lca_matrices( - filepaths, model, scenario, year - ) + ( + bw_datapackage, + technosphere_indices, + biosphere_indices, + uncertain_parameters, + ) = get_lca_matrices(filepaths, model, scenario, year) except FileNotFoundError: # If LCA matrices can't be loaded, skip to the next iteration @@ -438,16 +466,26 @@ def _calculate_year(args: tuple): ) lca.lci() - if subshares: + if shares: logging.info("Calculating LCA results with subshares.") shares_indices = find_technology_indices(regions, technosphere_indices, geo) correlated_arrays = adjust_matrix_based_on_shares( - lca, shares_indices, subshares, use_distributions, year + filepaths, + lca, + shares_indices, + shares, + use_distributions, + model, + scenario, + year, ) bw_correlated = get_subshares_matrix(correlated_arrays) lca.packages.append(get_datapackage(bw_correlated)) lca.use_arrays = True + # + # # Log + # log_subshares_to_excel(model, scenario, year, shares) characterization_matrix = fill_characterization_factors_matrices( methods=methods, @@ -461,6 +499,8 @@ def _calculate_year(args: tuple): 
f"Characterization matrix created. Shape: {characterization_matrix.shape}" ) + total_impacts_by_method = {method: [] for method in methods} + all_param_keys = set() bar = pyprind.ProgBar(len(regions)) for region in regions: bar.update() @@ -478,9 +518,22 @@ def _calculate_year(args: tuple): demand_cutoff, lca, characterization_matrix, + methods, debug, use_distributions, + uncertain_parameters, ) ) + if use_distributions != 0: + for method, impacts in results[region]["impact_by_method"].items(): + total_impacts_by_method[method].extend(impacts) + all_param_keys.update(results[region]["param_keys"]) + + if use_distributions != 0: + if shares: + log_subshares_to_excel(model, scenario, year, shares) + log_results_to_excel(model, scenario, year, total_impacts_by_method, methods) + create_mapping_sheet(filepaths, model, scenario, year, all_param_keys) + run_stats_analysis(model, scenario, year, methods) return results diff --git a/pathways/lcia.py b/pathways/lcia.py index ec76178..7e02a6e 100644 --- a/pathways/lcia.py +++ b/pathways/lcia.py @@ -24,11 +24,11 @@ def get_lcia_method_names(): def format_lcia_method_exchanges(method): """ - Format LCIA method data to fit such structure: - (name, unit, type, category, subcategory, amount, uncertainty type, uncertainty amount) - - :param method: LCIA method - :return: list of tuples + Format LCIA method data to fit such structure: + (name, unit, type, category, subcategory, amount, uncertainty type, uncertainty amount) + - + :param method: LCIA method + :return: list of tuples """ return { diff --git a/pathways/pathways.py b/pathways/pathways.py index f488ae0..6df6db0 100644 --- a/pathways/pathways.py +++ b/pathways/pathways.py @@ -324,11 +324,16 @@ def calculate( if k in self.scenarios.coords["variables"].values } - # Create xarray for storing LCA results if not already present - if self.lca_results is None: - _, technosphere_index, _ = get_lca_matrices( + try: + _, technosphere_index, _, uncertain_parameters = get_lca_matrices( self.filepaths, models[0], scenarios[0], years[0] ) + except Exception as e: + logging.error(f"Error retrieving LCA matrices: {str(e)}") + return + + # Create xarray for storing LCA results if not already present + if self.lca_results is None: locations = fetch_inventories_locations(technosphere_index) self.lca_results = create_lca_results_array( @@ -344,13 +349,12 @@ def calculate( ) # generate share of sub-technologies + shares = None if subshares: shares = generate_samples( years=self.scenarios.coords["year"].values.tolist(), iterations=use_distributions, ) - else: - shares = None # Iterate over each combination of model, scenario, and year results = {} @@ -377,6 +381,7 @@ def calculate( self.reverse_classifications, self.debug, use_distributions, + uncertain_parameters, shares, ) for year in years @@ -414,6 +419,7 @@ def calculate( self.debug, use_distributions, shares or None, + uncertain_parameters, ) ) for year in years diff --git a/pathways/stats.py b/pathways/stats.py new file mode 100644 index 0000000..ee01539 --- /dev/null +++ b/pathways/stats.py @@ -0,0 +1,284 @@ +import os +import re +from pathlib import Path + +import pandas as pd +import statsmodels.api as sm +from openpyxl import load_workbook + + +def log_subshares_to_excel(model: str, scenario: str, year: int, shares: dict): + """ + Logs results to an Excel file named according to model, scenario, and year, specifically for the given year. + This method assumes that each entry in the shares defaultdict is structured to be directly usable in a DataFrame. 
diff --git a/pathways/stats.py b/pathways/stats.py
new file mode 100644
index 0000000..ee01539
--- /dev/null
+++ b/pathways/stats.py
@@ -0,0 +1,284 @@
+import os
+import re
+from pathlib import Path
+
+import pandas as pd
+import statsmodels.api as sm
+from openpyxl import Workbook, load_workbook
+
+
+def log_subshares_to_excel(model: str, scenario: str, year: int, shares: dict):
+    """
+    Log sub-technology shares for the given year to an Excel file named after the model, scenario, and year.
+    This assumes that each entry in the shares defaultdict is structured to be directly usable in a DataFrame.
+
+    :param model: The model name.
+    :param scenario: The scenario name.
+    :param year: The specific year for which data is being logged.
+    :param shares: A nested defaultdict containing shares data for multiple years and types.
+    """
+    filename = f"stats_report_{model}_{scenario}_{year}.xlsx"
+    data = []
+
+    first_tech = next(iter(shares), None)
+    if not first_tech or year not in shares[first_tech]:
+        print(f"No data found for year {year} in any technology.")
+        return
+
+    num_iterations = len(shares[first_tech][year][next(iter(shares[first_tech][year]))])
+    for i in range(num_iterations):
+        iteration_data = {"Iteration": i + 1, "Year": year}
+        for tech, years_data in shares.items():
+            if year in years_data:
+                for subtype, values in years_data[year].items():
+                    iteration_data[f"{tech}_{subtype}"] = (
+                        values[i] if i < len(values) else None
+                    )
+        data.append(iteration_data)
+
+    new_df = pd.DataFrame(data)
+
+    try:
+        if os.path.exists(filename):
+            df_existing = pd.read_excel(filename)
+            # Merge new data into existing data, selectively updating share columns
+            combined_df = (
+                df_existing.set_index(["Iteration", "Year"])
+                .combine_first(new_df.set_index(["Iteration", "Year"]))
+                .reset_index()
+            )
+            # Ensure the columns are in a meaningful order
+            new_columns = [
+                col for col in new_df.columns if col not in ["Iteration", "Year"]
+            ]
+            existing_columns = [
+                col for col in df_existing.columns if col not in new_df.columns
+            ]
+            combined_df = combined_df[
+                ["Iteration", "Year"] + new_columns + existing_columns
+            ]
+
+            combined_df.to_excel(filename, index=False)
+        else:
+            new_df.to_excel(filename, index=False)
+    except Exception as e:
+        print(f"Error updating Excel file: {str(e)}")
+
+
+def log_intensities_to_excel(model: str, scenario: str, year: int, new_data: dict):
+    """
+    Update or create an Excel file with new columns of data, based on model, scenario, and year.
+
+    :param model: The model name.
+    :param scenario: The scenario name.
+    :param year: The year for which the data is logged.
+    :param new_data: Dictionary where keys are the new column names and values are lists of data for each column.
+    """
+    filename = f"stats_report_{model}_{scenario}_{year}.xlsx"
+
+    if not new_data:
+        print("Warning: No new data provided to log.")
+        return
+
+    try:
+        if os.path.exists(filename):
+            df_existing = pd.read_excel(filename)
+            max_length = max(len(v) for v in new_data.values())
+
+            df_new = pd.DataFrame(index=range(max_length), columns=new_data.keys())
+            for key, values in new_data.items():
+                # assign via .loc to avoid chained-assignment pitfalls
+                df_new.loc[: len(values) - 1, key] = values
+
+            df_new["Iteration"] = range(1, max_length + 1)
+            df_new["Year"] = [year] * max_length
+
+            combined_df = pd.merge(
+                df_existing,
+                df_new,
+                on=["Iteration", "Year"],
+                how="outer",
+                suffixes=("", "_new"),
+            )
+
+            for col in df_new.columns:
+                if col + "_new" in combined_df:
+                    combined_df[col].update(combined_df.pop(col + "_new"))
+
+            # Remove any '_new' columns if they still exist after updates
+            combined_df = combined_df.loc[:, ~combined_df.columns.str.endswith("_new")]
+
+            df = combined_df
+        else:
+            max_length = max(len(v) for v in new_data.values())
+            df = pd.DataFrame(new_data, index=range(max_length))
+            df["Iteration"] = range(1, max_length + 1)
+            df["Year"] = [year] * max_length
+
+        df.to_excel(filename, index=False)
+    except Exception as e:
+        print(f"Failed to update the Excel file: {e}")
+
+
+def log_results_to_excel(
+    model: str,
+    scenario: str,
+    year: int,
+    total_impacts_by_method: dict,
+    methods: list,
+    filepath=None,
+):
+    """
+    Log the characterized inventory results for each LCIA method into separate columns in an Excel file.
+
+    :param model: The model name.
+    :param scenario: The scenario name.
+    :param year: The year for which the data is being logged.
+    :param total_impacts_by_method: Dictionary where keys are method names and values are lists of impacts
+        from all regions and distributions.
+    :param methods: List of method names.
+    :param filepath: Optional. File path for the Excel file to save the results.
+    """
+
+    if filepath is None:
+        filepath = f"stats_report_{model}_{scenario}_{year}.xlsx"
+
+    try:
+        df = pd.read_excel(filepath)
+    except FileNotFoundError:
+        df = pd.DataFrame()
+
+    for method, impacts in total_impacts_by_method.items():
+        df[method] = pd.Series(impacts)
+
+    base_cols = ["Iteration", "Year"] if "Iteration" in df.columns else []
+    other_cols = [col for col in df.columns if col not in base_cols + methods]
+    df = df[base_cols + methods + other_cols]
+
+    df.to_excel(filepath, index=False)
+
+
+def create_mapping_sheet(
+    filepaths: list, model: str, scenario: str, year: int, parameter_keys: list
+):
+    """
+    Create a mapping sheet for the activities with uncertainties.
+
+    :param filepaths: List of paths to data files.
+    :param model: Model name as a string.
+    :param scenario: Scenario name as a string.
+    :param year: Year as an integer.
+    :param parameter_keys: List of parameter keys used in intensity iterations.
+    """
+
+    def filter_filepaths(suffix: str, contains: list):
+        return [
+            Path(fp)
+            for fp in filepaths
+            if all(kw in fp for kw in contains)
+            and Path(fp).suffix == suffix
+            and Path(fp).exists()
+        ]
+
+    unique_indices = {int(idx) for key in parameter_keys for idx in key.split("_to_")}
+
+    fps = filter_filepaths(".csv", [model, scenario, str(year)])
+    if not fps:
+        raise ValueError(f"No relevant files found for {model}, {scenario}, {year}")
+
+    technosphere_indices_path = None
+    for fp in fps:
+        if "A_matrix_index" in fp.name:
+            technosphere_indices_path = fp
+            break
+
+    if not technosphere_indices_path:
+        raise FileNotFoundError("Technosphere indices file not found.")
+
+    technosphere_inds = pd.read_csv(technosphere_indices_path, sep=";", header=None)
+    # Column order follows read_indices_csv: name, product, location, unit, index
+    technosphere_inds.columns = ["Activity", "Product", "Location", "Unit", "Index"]
+
+    mapping_df = technosphere_inds[technosphere_inds["Index"].isin(unique_indices)]
+    mapping_df = mapping_df[
+        ["Activity", "Product", "Location", "Unit", "Index"]
+    ]  # Restrict columns if necessary
+
+    excel_path = f"stats_report_{model}_{scenario}_{year}.xlsx"
+
+    try:
+        with pd.ExcelWriter(
+            excel_path, mode="a", engine="openpyxl", if_sheet_exists="replace"
+        ) as writer:
+            mapping_df.to_excel(writer, index=False, sheet_name="Mapping")
+    except Exception as e:
+        print(f"Error writing mapping sheet to {excel_path}: {str(e)}")
+
+
+def escape_formula(text: str):
+    """
+    Prevent a string from being interpreted as a formula in Excel.
+    Strings starting with '=', '-', or '+' are prefixed with an apostrophe.
+
+    :param text: The string to be adjusted.
+    :return: The adjusted string.
+    """
+    return "'" + text if text.startswith(("=", "-", "+")) else text
+
+
+def run_stats_analysis(model: str, scenario: str, year: int, methods: list):
+    """
+    Runs OLS regression for specified methods and writes summaries to an Excel file.
+
+    Each method's OLS summary is placed on a new sheet in the file named
+    'stats_report_{model}_{scenario}_{year}.xlsx'.
+
+    :param model: Model identifier.
+    :param scenario: Scenario name.
+    :param year: Year of interest.
+    :param methods: Methods corresponding to dataset columns.
+    """
+
+    filename = f"stats_report_{model}_{scenario}_{year}.xlsx"
+
+    try:
+        book = load_workbook(filename)
+    except FileNotFoundError:
+        # Create a new workbook if none exists yet
+        book = Workbook()
+        book.save(filename)
+
+    data = pd.read_excel(filename, sheet_name="Sheet1")
+
+    for idx, method in enumerate(methods):
+        if method not in data.columns:
+            print(f"Data for {method} not found in the file.")
+            continue
+
+        Y = data[method]
+        X = data.drop(columns=["Iteration", "Year"] + methods, errors="ignore")
+        X = sm.add_constant(X)
+
+        model_results = sm.OLS(Y, X).fit()
+        summary = model_results.summary().as_text()
+
+        # Pick a sheet name that is not taken yet (Excel caps names at 31 characters)
+        sheet_name_base = f"{method[:20]} OLS"
+        sheet_name = f"{sheet_name_base} {idx + 1}"
+        while sheet_name in book.sheetnames:
+            idx += 1
+            sheet_name = f"{sheet_name_base} {idx + 1}"
+        ws = book.create_sheet(sheet_name)
+
+        summary_lines = summary.split("\n")
+
+        for line in summary_lines:
+            line = escape_formula(line)
+            columns = re.split(r"\s{2,}", line)
+            ws.append(columns)
+
+    book.save(filename)
+    print("Analysis complete and results saved.")
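To make the regression concrete (toy data, invented column name): `run_stats_analysis` regresses each method's impact samples on the logged exchange values, which boils down to:

```python
import numpy as np
import pandas as pd
import statsmodels.api as sm

rng = np.random.default_rng(7)
X = pd.DataFrame({"1032_to_2047": rng.uniform(0.1, 0.3, 200)})  # sampled exchange values
Y = 2.0 * X["1032_to_2047"] + rng.normal(0.0, 0.01, 200)        # toy impact samples

ols = sm.OLS(Y, sm.add_constant(X)).fit()
print(ols.summary().as_text())  # the coefficient on "1032_to_2047" recovers ~2.0
```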
diff --git a/pathways/subshares.py b/pathways/subshares.py
index a40dd54..0a50cef 100644
--- a/pathways/subshares.py
+++ b/pathways/subshares.py
@@ -5,6 +5,7 @@
 import bw_processing
 import bw_processing as bwp
 import numpy as np
+import pandas as pd
 import yaml
 from bw_processing import Datapackage
 from premise.geomap import Geomap
@@ -186,10 +187,13 @@ def get_subshares_matrix(


 def adjust_matrix_based_on_shares(
+    filepaths: list,
     lca: bw2calc.LCA,
     shares_dict: dict,
     subshares: dict,
     use_distributions: int,
+    model: str,
+    scenario: str,
     year: int,
 ):
     """
diff --git a/pathways/utils.py b/pathways/utils.py
index d2133ee..f46a3ea 100644
--- a/pathways/utils.py
+++ b/pathways/utils.py
@@ -103,6 +103,35 @@ def load_units_conversion() -> dict:
     return data


+def read_indices_csv(file_path: Path) -> dict[tuple[str, str, str, str], int]:
+    """
+    Reads a CSV file and returns its contents as a dictionary.
+
+    Each row of the CSV file is expected to contain four string values followed by an index.
+    These are stored in the dictionary as a tuple of the four strings mapped to the index.
+
+    :param file_path: The path to the CSV file.
+    :type file_path: Path
+
+    :return: A dictionary mapping tuples of four strings to indices.
+        For technosphere indices, the four strings are the activity name, product name, location, and unit.
+        For biosphere indices, the four strings are the flow name, category, subcategory, and unit.
+    :rtype: Dict[Tuple[str, str, str, str], int]
+    """
+    indices = dict()
+    with open(file_path) as read_obj:
+        csv_reader = csv.reader(read_obj, delimiter=";")
+        for row in csv_reader:
+            try:
+                indices[(row[0], row[1], row[2], row[3])] = int(row[4])
+            except IndexError as err:
+                logging.error(
+                    f"Error reading row {row} from {file_path}: {err}. "
+                    f"Could it be that the file uses commas instead of semicolons?"
+                )
+    return indices
+
+
 def create_lca_results_array(
     methods: [List[str], None],
     years: List[int],
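To make the expected input concrete (file contents invented): the relocated `read_indices_csv` parses semicolon-separated rows of four labels plus an integer index, so a miniature `A_matrix_index` file and its parse look like:

```python
import csv
from pathlib import Path

path = Path("A_matrix_index_example.csv")
path.write_text(
    "electricity production, wind;electricity;DE;kilowatt hour;0\n"
    "market for steel;steel;GLO;kilogram;1\n"
)

indices = {}
with open(path) as read_obj:
    for row in csv.reader(read_obj, delimiter=";"):
        indices[(row[0], row[1], row[2], row[3])] = int(row[4])
print(indices[("market for steel", "steel", "GLO", "kilogram")])  # 1
```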
@@ -253,6 +282,54 @@ def clean_cache_directory():
             file.unlink()


+# def resize_scenario_data(
+#     scenario_data: xr.DataArray,
+#     model: List[str],
+#     scenario: List[str],
+#     region: List[str],
+#     year: List[int],
+#     variables: List[str],
+# ) -> xr.DataArray:
+#     """
+#     Resize the scenario data to the given scenario, year, region, and variables.
+#     :param scenario_data: xarray DataArray with scenario data.
+#     :param model: List of models.
+#     :param scenario: List of scenarios.
+#     :param year: List of years.
+#     :param region: List of regions.
+#     :param variables: List of variables.
+#     :return: Resized scenario data.
+#     """
+#     def append_missing(data_array, dimension, new_values, dim_name):
+#         for value in new_values:
+#             if value not in data_array.coords[dim_name].values:
+#                 new_entry = xr.full_like(data_array.isel(**{dim_name: 0}), fill_value=0)  # Placeholder value
+#                 new_entry.coords[dim_name] = value
+#                 data_array = xr.concat([data_array, new_entry], dim=dim_name)
+#         return data_array
+#
+#     # Check and append missing model, scenario, region, year, and variables
+#     scenario_data = append_missing(scenario_data, 'model', model, 'model')
+#     scenario_data = append_missing(scenario_data, 'pathway', scenario, 'pathway')
+#     scenario_data = append_missing(scenario_data, 'region', region, 'region')
+#     scenario_data = append_missing(scenario_data, 'year', year, 'year')
+#     scenario_data = append_missing(scenario_data, 'variables', variables, 'variables')
+#
+#     # Collect indices for all dimensions now that all entries are guaranteed to exist
+#     indices = {
+#         'model': [scenario_data.coords['model'].values.tolist().index(x) for x in model],
+#         'pathway': [scenario_data.coords['pathway'].values.tolist().index(x) for x in scenario],
+#         'region': [scenario_data.coords['region'].values.tolist().index(x) for x in region],
+#         'year': [scenario_data.coords['year'].values.tolist().index(x) for x in year],
+#         'variables': [scenario_data.coords['variables'].values.tolist().index(x) for x in variables],
+#     }
+#
+#     # Use multi-dimensional indexing to select data
+#     scenario_data = scenario_data.isel(**indices)
+#
+#     return scenario_data
+
+
 def resize_scenario_data(
     scenario_data: xr.DataArray,
     model: List[str],