
Commit

Black reformatting
romainsacchi committed Apr 30, 2024
1 parent 3f24f1d commit ab9ce28
Showing 6 changed files with 101 additions and 69 deletions.
60 changes: 38 additions & 22 deletions pathways/lca.py
@@ -14,7 +14,9 @@
import bw_processing as bwp
import numpy as np
import pyprind
-from bw2calc.monte_carlo import MonteCarloLCA ### Dev version coming: removed `MonteCarloLCA` (normal LCA class can do Monte Carlo) and added `IterativeLCA` (different solving strategy)
+from bw2calc.monte_carlo import (  # ## Dev version coming: removed `MonteCarloLCA` (normal LCA class can do Monte Carlo) and added `IterativeLCA` (different solving strategy)
+    MonteCarloLCA,
+)
from bw2calc.utils import get_datapackage
from bw_processing import Datapackage
from numpy import dtype, ndarray
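Editor's note: the inline comment above flags an upcoming bw2calc change in which `MonteCarloLCA` is removed (the plain `LCA` class then handles Monte Carlo itself) and an `IterativeLCA` class is added. A forward-compatible import could be hedged as follows — a sketch assuming the dev-version API described in that comment:

# --- illustrative sketch, not part of this commit ---
try:
    from bw2calc.monte_carlo import MonteCarloLCA
except ImportError:
    # Newer bw2calc: the plain LCA class samples distributions itself
    # (instantiate it with use_distributions=True instead).
    from bw2calc import LCA as MonteCarloLCA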
@@ -24,6 +26,13 @@

from .filesystem_constants import DIR_CACHED_DB
from .lcia import fill_characterization_factors_matrices
+from .stats import (
+    create_mapping_sheet,
+    log_intensities_to_excel,
+    log_results_to_excel,
+    log_subshares_to_excel,
+    run_stats_analysis,
+)
from .subshares import (
adjust_matrix_based_on_shares,
find_technology_indices,
@@ -37,14 +46,6 @@
read_indices_csv,
)

-from .stats import (
-    log_subshares_to_excel,
-    log_intensities_to_excel,
-    log_results_to_excel,
-    create_mapping_sheet,
-    run_stats_analysis,
-)

# disable warnings
warnings.filterwarnings("ignore")

@@ -172,14 +173,16 @@ def select_filepath(keyword: str, fps):
return dp, technosphere_inds, biosphere_inds, uncertain_parameters


-def find_uncertain_parameters(distributions_array: np.ndarray, indices_array: np.ndarray) -> list[tuple[int, int]]:
+def find_uncertain_parameters(
+    distributions_array: np.ndarray, indices_array: np.ndarray
+) -> list[tuple[int, int]]:
"""
Find the uncertain parameters in the distributions array. They will be used for the stats report
:param distributions_array:
:param indices_array:
:return:
"""
-    uncertain_indices = np.where(distributions_array['uncertainty_type'] != 0)[0]
+    uncertain_indices = np.where(distributions_array["uncertainty_type"] != 0)[0]
uncertain_parameters = [tuple(indices_array[idx]) for idx in uncertain_indices]

return uncertain_parameters
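For readers unfamiliar with the bw_processing data model: the distributions and indices arrays are parallel structured arrays. A standalone toy run of `find_uncertain_parameters`, with field layouts assumed to mirror bw_processing's conventions:

# --- illustrative sketch, not part of this commit ---
import numpy as np

distributions = np.array(
    [(0,), (2,), (0,), (5,)], dtype=[("uncertainty_type", "u1")]
)  # 0 = no uncertainty
indices = np.array(
    [(0, 1), (3, 7), (2, 2), (4, 9)], dtype=[("row", "i4"), ("col", "i4")]
)
print(find_uncertain_parameters(distributions, indices))
# -> [(3, 7), (4, 9)]: only exchanges with a non-zero uncertainty type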
@@ -295,12 +298,16 @@ def process_region(data: Tuple) -> dict[str, ndarray[Any, dtype[Any]] | list[int
matrix_result = (characterization_matrix @ lca.inventory).toarray()
temp_results.append(matrix_result)
for i in range(len(uncertain_parameters)):
-    param_key = f'{uncertain_parameters[i][0]}_to_{uncertain_parameters[i][1]}'
-    param_keys.add(param_key)
-    if param_key not in params:
-        params[param_key] = []
-    value = - lca.technosphere_matrix[uncertain_parameters[i][0], uncertain_parameters[i][1]]
-    params[param_key].append(value)
+    param_key = (
+        f"{uncertain_parameters[i][0]}_to_{uncertain_parameters[i][1]}"
+    )
+    param_keys.add(param_key)
+    if param_key not in params:
+        params[param_key] = []
+    value = -lca.technosphere_matrix[
+        uncertain_parameters[i][0], uncertain_parameters[i][1]
+    ]
+    params[param_key].append(value)
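The block above keys every uncertain exchange on a "{row}_to_{col}" string and stores the sign-flipped matrix entry — Brightway records technosphere inputs as negative coefficients, so the minus sign recovers the positive exchange amount. A standalone miniature of the same bookkeeping:

# --- illustrative sketch, not part of this commit ---
import numpy as np

technosphere = np.array([[1.0, -0.3], [-0.5, 1.0]])  # toy 2x2 matrix
uncertain_parameters = [(0, 1), (1, 0)]  # hypothetical (row, col) pairs

params: dict[str, list[float]] = {}
for row, col in uncertain_parameters:
    params.setdefault(f"{row}_to_{col}", []).append(float(-technosphere[row, col]))

print(params)  # {'0_to_1': [0.3], '1_to_0': [0.5]}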

results = np.array(temp_results)
for idx, method in enumerate(methods):
@@ -313,7 +320,6 @@

log_intensities_to_excel(model, scenario, year, params)


d.append(characterized_inventory)

if debug:
@@ -387,9 +393,12 @@ def _calculate_year(args: tuple):

# Try to load LCA matrices for the given model, scenario, and year
try:
-    bw_datapackage, technosphere_indices, biosphere_indices, uncertain_parameters = get_lca_matrices(
-        filepaths, model, scenario, year
-    )
+    (
+        bw_datapackage,
+        technosphere_indices,
+        biosphere_indices,
+        uncertain_parameters,
+    ) = get_lca_matrices(filepaths, model, scenario, year)

except FileNotFoundError:
# If LCA matrices can't be loaded, skip to the next iteration
@@ -461,7 +470,14 @@ def _calculate_year(args: tuple):
logging.info("Calculating LCA results with subshares.")
shares_indices = find_technology_indices(regions, technosphere_indices, geo)
correlated_arrays = adjust_matrix_based_on_shares(
-        filepaths, lca, shares_indices, shares, use_distributions, model, scenario, year
+        filepaths,
+        lca,
+        shares_indices,
+        shares,
+        use_distributions,
+        model,
+        scenario,
+        year,
)
bw_correlated = get_subshares_matrix(correlated_arrays)

10 changes: 5 additions & 5 deletions pathways/lcia.py
@@ -24,11 +24,11 @@ def get_lcia_method_names():

def format_lcia_method_exchanges(method):
"""
-Format LCIA method data to fit such structure:
-(name, unit, type, category, subcategory, amount, uncertainty type, uncertainty amount)
--
-:param method: LCIA method
-:return: list of tuples
+    Format LCIA method data to fit such structure:
+    (name, unit, type, category, subcategory, amount, uncertainty type, uncertainty amount)
+    -
+    :param method: LCIA method
+    :return: list of tuples
"""

return {
1 change: 0 additions & 1 deletion pathways/pathways.py
@@ -431,7 +431,6 @@ def calculate(

self._fill_in_result_array(results)


def _fill_in_result_array(self, results: dict):

# Assuming DIR_CACHED_DB, results, and self.lca_results are already defined
94 changes: 55 additions & 39 deletions pathways/stats.py
@@ -1,7 +1,8 @@
-import pandas as pd
import re
from pathlib import Path

+import pandas as pd
import statsmodels.api as sm
-import re
from openpyxl import load_workbook


@@ -26,24 +27,32 @@ def log_subshares_to_excel(model, scenario, year, shares):
sample_tech = next(iter(shares), None)
if sample_tech and year in shares[sample_tech]:
# Create data for each iteration
-        num_iterations = len(shares[sample_tech][year][next(iter(shares[sample_tech][year]))])
+        num_iterations = len(
+            shares[sample_tech][year][next(iter(shares[sample_tech][year]))]
+        )
for i in range(num_iterations):
-            iteration_data = {'Iteration': i + 1, 'Year': year}
+            iteration_data = {"Iteration": i + 1, "Year": year}
for tech, years_data in shares.items():
if year in years_data:
for subtype, values in years_data[year].items():
-                        iteration_data[f'{tech}_{subtype}'] = values[i]
+                        iteration_data[f"{tech}_{subtype}"] = values[i]
data.append(iteration_data)

new_df = pd.DataFrame(data)
try:
# Try to load the existing Excel file
-        with pd.ExcelWriter(filename, mode='a', engine='openpyxl', if_sheet_exists='overlay') as writer:
+        with pd.ExcelWriter(
+            filename, mode="a", engine="openpyxl", if_sheet_exists="overlay"
+        ) as writer:
existing_df = pd.read_excel(filename)
# Combine the old data with the new data, aligning on 'Iteration' and 'Year'
-            final_df = pd.merge(existing_df, new_df, on=['Iteration', 'Year'], how='outer')
+            final_df = pd.merge(
+                existing_df, new_df, on=["Iteration", "Year"], how="outer"
+            )
# Reorder columns to ensure 'Iteration' and 'Year' are first, followed by any new columns
-            column_order = ['Iteration', 'Year'] + [c for c in new_df.columns if c not in ['Iteration', 'Year']]
+            column_order = ["Iteration", "Year"] + [
+                c for c in new_df.columns if c not in ["Iteration", "Year"]
+            ]
final_df = final_df[column_order]
final_df.to_excel(writer, index=False)
except FileNotFoundError:
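The pattern above overlays new iteration columns onto an existing report, merging on `Iteration` and `Year`, and falls back to creating the file when it does not exist (the `except FileNotFoundError` branch continues past the fold). A minimal standalone version, using a hypothetical `demo.xlsx`:

# --- illustrative sketch, not part of this commit ---
import pandas as pd

new_df = pd.DataFrame(
    {"Iteration": [1, 2], "Year": [2030, 2030], "tech_share": [0.4, 0.6]}
)
try:
    # mode="a" with if_sheet_exists="overlay" needs pandas >= 1.4 and openpyxl
    with pd.ExcelWriter(
        "demo.xlsx", mode="a", engine="openpyxl", if_sheet_exists="overlay"
    ) as writer:
        existing = pd.read_excel("demo.xlsx")
        merged = pd.merge(existing, new_df, on=["Iteration", "Year"], how="outer")
        merged.to_excel(writer, index=False)
except FileNotFoundError:
    new_df.to_excel("demo.xlsx", index=False)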
@@ -62,37 +71,39 @@ def log_intensities_to_excel(model: str, scenario: str, year: int, new_data: dict):
:param year: The year for which the data is logged.
:param new_data: Dictionary where keys are the new column names and values are lists of data for each column.
"""
-    filename = f'stats_report_{model}_{scenario}_{year}.xlsx'
+    filename = f"stats_report_{model}_{scenario}_{year}.xlsx"

try:
df = pd.read_excel(filename)
except FileNotFoundError:
df = pd.DataFrame()

-    if 'Iteration' not in df.columns or df.empty:
+    if "Iteration" not in df.columns or df.empty:
max_length = max(len(data) for data in new_data.values())
-        df['Iteration'] = range(1, max_length + 1)
-        df['Year'] = [year] * max_length
+        df["Iteration"] = range(1, max_length + 1)
+        df["Year"] = [year] * max_length

if not df.empty and len(df) != len(new_data[next(iter(new_data))]):
-        df = df.iloc[:len(new_data[next(iter(new_data))])]
+        df = df.iloc[: len(new_data[next(iter(new_data))])]

for column_name, data in new_data.items():
if len(data) != len(df):
-            raise ValueError(f"Length of data for '{column_name}' ({len(data)}) does not match DataFrame length ({len(df)}).")
+            raise ValueError(
+                f"Length of data for '{column_name}' ({len(data)}) does not match DataFrame length ({len(df)})."
+            )
df[column_name] = data

df.to_excel(filename, index=False)



def log_results_to_excel(
-        model: str,
-        scenario: str,
-        year: int,
-        total_impacts_by_method: dict,
-        methods: list,
-        filepath=None):
+    model: str,
+    scenario: str,
+    year: int,
+    total_impacts_by_method: dict,
+    methods: list,
+    filepath=None,
+):
"""
Log the characterized inventory results for each LCIA method into separate columns in an Excel file.
@@ -117,14 +128,16 @@
for method, impacts in total_impacts_by_method.items():
df[method] = pd.Series(impacts)

-    base_cols = ['Iteration', 'Year'] if 'Iteration' in df.columns else []
+    base_cols = ["Iteration", "Year"] if "Iteration" in df.columns else []
other_cols = [col for col in df.columns if col not in base_cols + methods]
df = df[base_cols + methods + other_cols]

df.to_excel(filepath, index=False)


-def create_mapping_sheet(filepaths: list, model: str, scenario: str, year: int, parameter_keys: list):
+def create_mapping_sheet(
+    filepaths: list, model: str, scenario: str, year: int, parameter_keys: list
+):
"""
Create a mapping sheet for the activities with uncertainties.
:param filepaths: List of paths to data files.
@@ -139,12 +152,12 @@ def filter_filepaths(suffix: str, contains: list):
Path(fp)
for fp in filepaths
if all(kw in fp for kw in contains)
-            and Path(fp).suffix == suffix
-            and Path(fp).exists()
+        and Path(fp).suffix == suffix
+        and Path(fp).exists()
]

# Convert parameter keys into a set of unique indices
-    unique_indices = {int(idx) for key in parameter_keys for idx in key.split('_to_')}
+    unique_indices = {int(idx) for key in parameter_keys for idx in key.split("_to_")}
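The comprehension above unpacks each "row_to_col" key back into the individual matrix indices it references, for example:

# --- illustrative sketch, not part of this commit ---
parameter_keys = ["3_to_7", "4_to_9"]  # hypothetical keys
print({int(idx) for key in parameter_keys for idx in key.split("_to_")})
# -> {3, 4, 7, 9}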

fps = filter_filepaths(".csv", [model, scenario, str(year)])
if len(fps) < 1:
@@ -163,13 +176,17 @@ def filter_filepaths(suffix: str, contains: list):
technosphere_inds.columns = ["Activity", "Product", "Unit", "Location", "Index"]

# Filter the DataFrame using unique indices
-    mapping_df = technosphere_inds[technosphere_inds['Index'].isin(unique_indices)]
-    mapping_df = mapping_df[["Activity", "Product", "Location", "Unit", "Index"]]  # Restrict columns if necessary
+    mapping_df = technosphere_inds[technosphere_inds["Index"].isin(unique_indices)]
+    mapping_df = mapping_df[
+        ["Activity", "Product", "Location", "Unit", "Index"]
+    ]  # Restrict columns if necessary

excel_path = f"stats_report_{model}_{scenario}_{year}.xlsx"

try:
-        with pd.ExcelWriter(excel_path, mode='a', engine='openpyxl', if_sheet_exists='replace') as writer:
+        with pd.ExcelWriter(
+            excel_path, mode="a", engine="openpyxl", if_sheet_exists="replace"
+        ) as writer:
mapping_df.to_excel(writer, index=False, sheet_name="Mapping")
except Exception as e:
print(f"Error writing mapping sheet to {excel_path}: {str(e)}")
@@ -183,7 +200,7 @@ def escape_formula(text: str):
:param text: The string to be adjusted.
:return: The adjusted string.
"""
-    return "'" + text if text.startswith(('=', '-', '+')) else text
+    return "'" + text if text.startswith(("=", "-", "+")) else text
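A quick illustration of the escaping behaviour — strings Excel would otherwise parse as formulas get a leading apostrophe:

# --- illustrative sketch, not part of this commit ---
print(escape_formula("=SUM(A1:A2)"))  # '=SUM(A1:A2)
print(escape_formula("-0.42"))        # '-0.42
print(escape_formula("R-squared"))    # R-squared (unchanged)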


def run_stats_analysis(model: str, scenario: str, year: int, methods: list):
@@ -199,25 +216,27 @@ def run_stats_analysis(model: str, scenario: str, year: int, methods: list):
:param methods: Methods corresponding to dataset columns.
"""

-    filename = f'stats_report_{model}_{scenario}_{year}.xlsx'
+    filename = f"stats_report_{model}_{scenario}_{year}.xlsx"

# Attempt to load the existing workbook
try:
book = load_workbook(filename)
except FileNotFoundError:
-        book = pd.ExcelWriter(filename, engine='openpyxl')  # Create a new workbook if not found
+        book = pd.ExcelWriter(
+            filename, engine="openpyxl"
+        )  # Create a new workbook if not found
book.close()
book = load_workbook(filename)

-    data = pd.read_excel(filename, sheet_name='Sheet1')
+    data = pd.read_excel(filename, sheet_name="Sheet1")

for idx, method in enumerate(methods):
if method not in data.columns:
print(f"Data for {method} not found in the file.")
continue

Y = data[method]
-        X = data.drop(columns=['Iteration', 'Year'] + methods)
+        X = data.drop(columns=["Iteration", "Year"] + methods)
X = sm.add_constant(X)

model_results = sm.OLS(Y, X).fit()
@@ -236,7 +255,7 @@ def run_stats_analysis(model: str, scenario: str, year: int, methods: list):
ws = book.create_sheet(sheet_name)

# Split summary into lines and write upper part to the sheet
-        summary_lines = summary.split('\n')
+        summary_lines = summary.split("\n")
upper_part = summary_lines[:10]
lower_part = summary_lines[10:]

@@ -249,11 +268,8 @@ def run_stats_analysis(model: str, scenario: str, year: int, methods: list):
for line in summary_lines:
line = escape_formula(line)
# Split line based on consecutive spaces for proper column separation
-            columns = re.split(r'\s{2,}', line)
+            columns = re.split(r"\s{2,}", line)
ws.append(columns)

book.save(filename)
print("Analysis complete and results saved.")
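The loop above regresses each LCIA method's impacts (Y) on the sampled exchange values (X) and writes one OLS summary sheet per method. A self-contained miniature of that fit, with made-up numbers:

# --- illustrative sketch, not part of this commit ---
import pandas as pd
import statsmodels.api as sm

data = pd.DataFrame(
    {
        "3_to_7": [0.9, 1.1, 1.0, 1.2],  # hypothetical sampled exchanges
        "4_to_9": [2.0, 2.2, 1.9, 2.1],
        "GWP": [5.1, 5.6, 5.0, 5.8],     # hypothetical impact scores
    }
)
X = sm.add_constant(data[["3_to_7", "4_to_9"]])
fit = sm.OLS(data["GWP"], X).fit()
print(fit.summary())  # the same text the code above splits into sheet rows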



3 changes: 1 addition & 2 deletions pathways/subshares.py
@@ -4,8 +4,8 @@
import bw2calc
import bw_processing
import bw_processing as bwp
-import pandas as pd
import numpy as np
+import pandas as pd
import yaml
from bw_processing import Datapackage
from premise.geomap import Geomap
@@ -15,7 +15,6 @@
from pathways.filesystem_constants import DATA_DIR
from pathways.utils import get_activity_indices


SUBSHARES = DATA_DIR / "technologies_shares.yaml"

logging.basicConfig(
Expand Down
