Code cleanup
romainsacchi authored and romainsacchi committed Jul 22, 2024
1 parent bfce1bf commit 2fcbac7
Showing 3 changed files with 97 additions and 99 deletions.
1 change: 1 addition & 0 deletions dev/timing.py
@@ -6,6 +6,7 @@
p.calculate(
methods=[
'EF v3.1 EN15804 - climate change - global warming potential (GWP100)',
'EF v3.1 EN15804 - ecotoxicity: freshwater - comparative toxic unit for ecosystems (CTUe)',
],
regions=["CH",],
scenarios=p.scenarios.pathway.values.tolist(),
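For context, dev/timing.py benchmarks a full pathways calculation; this commit simply adds a second impact method (freshwater ecotoxicity) to the scoring list. A minimal sketch of the calling pattern, assuming `p` is a `pathways.Pathways` instance built from a scenario datapackage (the datapackage path below is hypothetical):

from pathways import Pathways

p = Pathways(datapackage="datapackage.json")  # hypothetical path
p.calculate(
    methods=[
        "EF v3.1 EN15804 - climate change - global warming potential (GWP100)",
        "EF v3.1 EN15804 - ecotoxicity: freshwater - comparative toxic unit for ecosystems (CTUe)",
    ],
    regions=["CH"],
    scenarios=p.scenarios.pathway.values.tolist(),  # all scenario pathways
)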
141 changes: 71 additions & 70 deletions pathways/lca.py
@@ -7,6 +7,7 @@
import uuid
from pathlib import Path
from typing import Any, Dict, List, Tuple
from collections import defaultdict

import bw2calc as bc
import bw_processing as bwp
@@ -270,9 +271,7 @@ def process_region(data: Tuple) -> dict[str, ndarray[Any, dtype[Any]] | list[int

variables_demand = {}
d = []
impacts_by_method = {method: [] for method in methods}
param_keys = set()
params_container = []


total_demand = (
scenarios.sel(
@@ -292,7 +291,6 @@
)
return d

variable_count = 0
for v, variable in enumerate(variables):
idx, dataset = vars_idx[variable]["idx"], vars_idx[variable]["dataset"]
# Compute the unit conversion vector for the given activities
@@ -335,71 +333,75 @@ def process_region(data: Tuple) -> dict[str, ndarray[Any, dtype[Any]] | list[int
variables_demand[variable] = {
"id": idx,
"demand": demand,
"fu": {idx: demand},
"dataset": dataset,
"unit vector": unit_vector,
}

functional_unit = {idx: demand}
param_keys = set()


if use_distributions == 0:
# Regular LCA calculations
with CustomFilter("(almost) singular matrix"):
lca.lci(demand=functional_unit)
for key, value in variables_demand.items():
lca.lci(demand=value["fu"])

characterized_inventory = (
characterization_matrix @ lca.inventory
).toarray()
d.append(characterized_inventory)

if debug:
logging.info(
f"var.: {key}, name: {value['dataset'][0][:50]}, "
f"ref.: {value['dataset'][1]}, unit: {value['dataset'][2][:50]}, idx: {value['idx']},"
f"loc.: {value['dataset'][3]}, demand: {value['demand']}, "
f"unit conv.: {value['unit vector']}, "
f"impact: {np.round(characterized_inventory.sum(axis=-1) / value['demand'], 3)}. "
)

else:
# Use distributions for LCA calculations
# iterating over lca yields a new Monte Carlo sample of the inventory matrix
results = np.zeros(
(use_distributions, len(variables_demand), len(methods))
)
params = defaultdict(list)

if use_distributions == 0:
# Regular LCA
characterized_inventory = (
characterization_matrix @ lca.inventory
).toarray()
params_container = defaultdict(list)
with CustomFilter("(almost) singular matrix"):
for iteration in zip(range(use_distributions), lca):
for key, value in enumerate(variables_demand.values()):
lca.lci(demand=value["fu"])

else:
# Use distributions for LCA calculations
# iterating over lca yields a new Monte Carlo sample of the inventory matrix
temp_results = []
params = {}

for _ in zip(range(use_distributions), lca):

matrix_result = (characterization_matrix @ lca.inventory).toarray()
temp_results.append(matrix_result)
for i in range(len(uncertain_parameters)):
param_key = (
f"{uncertain_parameters[i][0]}_to_{uncertain_parameters[i][1]}"
)
param_keys.add(param_key)
if param_key not in params:
params[param_key] = []
value = -lca.technosphere_matrix[
uncertain_parameters[i][0], uncertain_parameters[i][1]
]
params[param_key].append(value)
matrix_result = (characterization_matrix @ lca.inventory).toarray()
results[iteration[0], key] = matrix_result.sum(axis=-1)

params_container.append(params)
for i in range(len(uncertain_parameters)):
param_key = (
f"{uncertain_parameters[i][0]}_to_{uncertain_parameters[i][1]}"
)
param_keys.add(param_key)
value = -lca.technosphere_matrix[
uncertain_parameters[i][0], uncertain_parameters[i][1]
]
params[param_key].append(value)

results = np.array(temp_results)
for idx, method in enumerate(methods):
# Sum over items for each stochastic result, not across all results
method_impacts = results[:, idx, :].sum(axis=1)
impacts_by_method[method] = method_impacts.tolist()
params_container[key].append(params)

# calculate quantiles along the first dimension
characterized_inventory = np.quantile(results, [0.05, 0.5, 0.95], axis=0)
# calculate quantiles along the first dimension
characterized_inventory = np.quantile(results, [0.05, 0.5, 0.95], axis=0)

d.append(characterized_inventory)
variable_count += 1 # Increment the counter

if debug:
logging.info(
f"var.: {variable}, name: {dataset[0][:50]}, "
f"ref.: {dataset[1]}, unit: {dataset[2][:50]}, idx: {idx},"
f"loc.: {dataset[3]}, demand: {variables_demand[variable]['demand']}, "
f"unit conv.: {unit_vector}, "
f"impact: {np.round(characterized_inventory.sum(axis=-1) / variables_demand[variable]['demand'], 3)}. "
if len(params_container) > 0:
log_intensities_to_excel(
year=year,
params=params_container,
export_path=STATS_DIR / f"{model}_{scenario}_{year}.xlsx",
)

if len(params_container) > 0:
log_intensities_to_excel(
year=year,
params=params_container,
export_path=STATS_DIR / f"{model}_{scenario}_{year}.xlsx",
)

# Save the characterization vectors to disk
id_array = uuid.uuid4()
np.save(file=DIR_CACHED_DB / f"{id_array}.npy", arr=np.stack(d))
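Net effect of the deterministic path after this refactor: each variable's functional unit is stored once under the "fu" key of variables_demand, and all LCI solves happen in a single pass after the demand loop. A condensed sketch of the new flow (names mirror the diff; CustomFilter suppresses the "(almost) singular matrix" warning):

with CustomFilter("(almost) singular matrix"):
    for key, value in variables_demand.items():
        # one LCI solve per stored functional unit, then characterization
        lca.lci(demand=value["fu"])
        characterized_inventory = (characterization_matrix @ lca.inventory).toarray()
        d.append(characterized_inventory)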
@@ -412,7 +414,6 @@ def process_region(data: Tuple) -> dict[str, ndarray[Any, dtype[Any]] | list[int
return {
"id_array": id_array,
"variables": {k: v["demand"] for k, v in variables_demand.items()},
"impact_by_method": impacts_by_method,
"param_keys": param_keys,
}

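The stochastic path is reorganized the same way: rather than stacking temp_results per variable and keeping a separate impacts_by_method dict, it pre-allocates one (iterations x variables x methods) array, fills it inside the Monte Carlo loop, and takes quantiles at the end. A condensed sketch, assuming lca was created with distributions enabled so that each iteration resamples the matrices (the uncertain-parameter logging is omitted here):

import numpy as np
from collections import defaultdict

results = np.zeros((use_distributions, len(variables_demand), len(methods)))
params_container = defaultdict(list)

with CustomFilter("(almost) singular matrix"):
    for iteration, _ in zip(range(use_distributions), lca):
        for key, value in enumerate(variables_demand.values()):
            lca.lci(demand=value["fu"])
            matrix_result = (characterization_matrix @ lca.inventory).toarray()
            results[iteration, key] = matrix_result.sum(axis=-1)  # one score per method

# 5th, 50th and 95th percentiles across Monte Carlo iterations
characterized_inventory = np.quantile(results, [0.05, 0.5, 0.95], axis=0)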
@@ -588,8 +589,8 @@ def _calculate_year(args: tuple):
# mc_lca,
)
)
for method, impacts in results[region]["impact_by_method"].items():
total_impacts_by_method[method].extend(impacts)
#for method, impacts in results[region]["impact_by_method"].items():
# total_impacts_by_method[method].extend(impacts)
all_param_keys.update(results[region]["param_keys"])
else:
results[region] = process_region(
@@ -621,11 +622,11 @@ def _calculate_year(args: tuple):
shares=shares,
export_path=export_path,
)
log_results_to_excel(
total_impacts_by_method=total_impacts_by_method,
methods=methods,
filepath=export_path,
)
#log_results_to_excel(
# total_impacts_by_method=total_impacts_by_method,
# methods=methods,
# filepath=export_path,
#)
create_mapping_sheet(
filepaths=filepaths,
model=model,
@@ -634,13 +635,13 @@ def _calculate_year(args: tuple):
parameter_keys=all_param_keys,
export_path=export_path,
)
run_GSA_OLS(
methods=methods,
export_path=export_path,
)
print(
f"OLS summaries have been saved to the 'OLS' sheets in {export_path.resolve()}"
)
#run_GSA_OLS(
# methods=methods,
# export_path=export_path,
#)
#print(
# f"OLS summaries have been saved to the 'OLS' sheets in {export_path.resolve()}"
#)

run_GSA_delta(
methods=methods,
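With impact_by_method gone and the OLS-based GSA commented out, run_GSA_delta is the remaining sensitivity step in _calculate_year. As a rough sketch of what a delta moment-independent analysis looks like with SALib (the DataFrame layout, column names, and helper below are assumptions for illustration, not the module's actual internals):

import pandas as pd
from SALib.analyze import delta

def delta_indices(df: pd.DataFrame, param_cols: list, method: str) -> dict:
    # X: sampled technosphere exchanges; Y: impact scores for one method
    X = df[param_cols].to_numpy()
    Y = df[method].to_numpy()
    problem = {
        "num_vars": X.shape[1],
        "names": param_cols,
        "bounds": [[X[:, i].min(), X[:, i].max()] for i in range(X.shape[1])],
    }
    # returns delta and first-order Sobol indices per parameter
    return delta.analyze(problem, X, Y, print_to_console=False)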
54 changes: 25 additions & 29 deletions pathways/stats.py
@@ -7,7 +7,7 @@
import numpy as np
import pandas as pd
import statsmodels.api as sm
from openpyxl import load_workbook
from openpyxl import load_workbook, Workbook
from SALib.analyze import delta


@@ -313,58 +313,54 @@ def run_GSA_OLS(methods: list, export_path: Path):
try:
book = load_workbook(export_path)
except FileNotFoundError:
book = pd.ExcelWriter(export_path, engine="openpyxl")
book.close()
book = Workbook()
book.save(export_path)
book = load_workbook(export_path)

data = pd.read_excel(export_path, sheet_name="Sheet1")

if "OLS" not in book.sheetnames:
ws = book.create_sheet("OLS")
else:
if "OLS" in book.sheetnames:
ws = book["OLS"]
book.remove(ws)
ws = book.create_sheet("OLS")

for idx, method in enumerate(methods):
ws = book.create_sheet("OLS")

X_base = data.drop(columns=["Iteration", "Year"] + methods)
X_base = sm.add_constant(X_base)
corr_matrix = X_base.corr().abs()
upper_triangle = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))
high_correlation = [column for column in upper_triangle.columns if any(upper_triangle[column] > 0.95)]

if high_correlation:
print(f"OLS: High multicollinearity detected in columns: {high_correlation}")
X_base = X_base.drop(columns=high_correlation)

results = []
for method in methods:
if method not in data.columns:
print(f"Data for {method} not found in the file.")
continue

Y = data[method]
X = data.drop(columns=["Iteration", "Year"] + methods)
X = sm.add_constant(X)

# Check for multicollinearity
corr_matrix = X.corr().abs()
upper_triangle = corr_matrix.where(
np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
)
high_correlation = [
column
for column in upper_triangle.columns
if any(upper_triangle[column] > 0.95)
]
if high_correlation:
print(
f"OLS: High multicollinearity detected in columns: {high_correlation}"
)
X = X.drop(columns=high_correlation)
X = X_base.copy()

try:
model_results = sm.OLS(Y, X).fit()
summary = model_results.summary().as_text()
summary_lines = summary.split("\n")

ws.append([f"OLS Summary for {method}"])
results.append([f"OLS Summary for {method}"])
for line in summary_lines:
line = escape_formula(line)
columns = re.split(r"\s{2,}", line)
ws.append(columns)
ws.append([])
results.append(columns)
results.append([])
except Exception as e:
print(f"Error running OLS for method {method}: {e}")

for result in results:
ws.append(result)

book.save(export_path)



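The run_GSA_OLS rewrite hoists the multicollinearity screen out of the per-method loop: the regressor matrix X_base is built and pruned once, then copied for each method, and summary rows are buffered in a results list before being appended to the worksheet. The screening step itself reduces to a small helper, sketched here under the same 0.95 correlation cutoff used in the diff:

import numpy as np
import pandas as pd

def drop_collinear(X: pd.DataFrame, threshold: float = 0.95) -> pd.DataFrame:
    # inspect only the upper triangle so each column pair is checked once
    corr = X.corr().abs()
    upper = corr.where(np.triu(np.ones(corr.shape), k=1).astype(bool))
    to_drop = [col for col in upper.columns if (upper[col] > threshold).any()]
    return X.drop(columns=to_drop)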