Skip to content

Commit

Permalink
double_accounting
Browse files Browse the repository at this point in the history
  • Loading branch information
alvarojhahn committed May 19, 2024
1 parent 03196e2 commit 32804d8
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 22 deletions.
9 changes: 8 additions & 1 deletion pathways/data/smart_categories.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -286,4 +286,11 @@ Transport:
- energy use, electric bicycle
- transport, passenger, bicycle
- transport, passenger electric bicycle

Exceptions:
### The input to these activities will not be zeroed.
ecoinvent_aliases:
name_fltr:
- electricity production, photovoltaic, commercial
- electricity production, photovoltaic, residential
product_fltr:
- electricity, low voltage
42 changes: 29 additions & 13 deletions pathways/lca.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ def remove_double_accounting(
lca: bc.LCA,
demand: Dict,
activities_to_exclude: List[int],
exceptions: List[int],
):
"""
Remove double counting from a technosphere matrix.
Expand All @@ -206,15 +207,16 @@ def remove_double_accounting(
row_idx = np.where(tm_modified.col == act)[0]

for idx in row_idx:
if tm_modified.row[idx] != act: # skip the diagonal
# Skip the diagonal and exceptions
if tm_modified.row[idx] != act and (exceptions is None or tm_modified.col[idx] not in exceptions):
tm_modified.data[idx] = 0

tm_modified = tm_modified.tocsr()
tm_modified.eliminate_zeros()

# Remove double accounting
lca.technosphere_matrix = tm_modified
lca.lci(demand=demand)
# lca.lci(demand=demand)


def process_region(data: Tuple) -> dict[str, ndarray[Any, dtype[Any]] | list[int]]:
Expand All @@ -237,7 +239,7 @@ def process_region(data: Tuple) -> dict[str, ndarray[Any, dtype[Any]] | list[int
lca,
characterization_matrix,
methods,
activities_to_exclude,
# activities_to_exclude,
debug,
use_distributions,
uncertain_parameters,
Expand All @@ -246,6 +248,7 @@ def process_region(data: Tuple) -> dict[str, ndarray[Any, dtype[Any]] | list[int
variables_demand = {}
d = []
impacts_by_method = {method: [] for method in methods}
param_keys = set()

for v, variable in enumerate(variables):
idx, dataset = vars_idx[variable]["idx"], vars_idx[variable]["dataset"]
Expand Down Expand Up @@ -286,10 +289,11 @@ def process_region(data: Tuple) -> dict[str, ndarray[Any, dtype[Any]] | list[int
}

demand = {idx: demand.values * float(unit_vector)}
if activities_to_exclude is not None:
remove_double_accounting(
lca=lca, demand=demand, activities_to_exclude=activities_to_exclude
)
lca.lci(demand=demand)
# if activities_to_exclude is not None:
# remove_double_accounting(
# lca=lca, demand=demand, activities_to_exclude=activities_to_exclude, exceptions=exception_activities
# )

if use_distributions == 0:
# Regular LCA
Expand All @@ -302,7 +306,7 @@ def process_region(data: Tuple) -> dict[str, ndarray[Any, dtype[Any]] | list[int
# next(lca) is a generator that yields the inventory matrix
temp_results = []
params = {}
param_keys = set()

for _ in zip(range(use_distributions), lca):
matrix_result = (characterization_matrix @ lca.inventory).toarray()
temp_results.append(matrix_result)
Expand Down Expand Up @@ -418,8 +422,8 @@ def _calculate_year(args: tuple):

if double_accounting is not None:
categories = read_categories_from_yaml(DATA_DIR / "smart_categories.yaml")
selected_filters = get_combined_filters(categories, double_accounting)
activities_to_exclude = apply_filters(technosphere_indices, selected_filters)
combined_filters, exception_filters = get_combined_filters(categories, double_accounting)
activities_to_exclude, exceptions = apply_filters(technosphere_indices, combined_filters, exception_filters)
else:
activities_to_exclude = None

Expand Down Expand Up @@ -467,7 +471,13 @@ def _calculate_year(args: tuple):
bw_datapackage,
],
)
lca.lci(factorize=True)
# lca.lci(factorize=True)
if activities_to_exclude is not None:
remove_double_accounting(
lca=lca, demand={0: 1}, activities_to_exclude=activities_to_exclude, exceptions=exceptions
)
else:
lca.lci(factorize=True)

else:
logging.info("Calculating LCA results with distributions.")
Expand All @@ -478,7 +488,13 @@ def _calculate_year(args: tuple):
],
use_distributions=True,
)
lca.lci()
# lca.lci()
if activities_to_exclude is not None:
remove_double_accounting(
lca=lca, demand={0: 1}, activities_to_exclude=activities_to_exclude, exceptions=exceptions
)
else:
lca.lci()

if shares:
logging.info("Calculating LCA results with subshares.")
Expand Down Expand Up @@ -533,7 +549,7 @@ def _calculate_year(args: tuple):
lca,
characterization_matrix,
methods,
activities_to_exclude,
# activities_to_exclude,
debug,
use_distributions,
uncertain_parameters,
Expand Down
56 changes: 48 additions & 8 deletions pathways/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ def harmonize_units(scenario: xr.DataArray, variables: list) -> xr.DataArray:
:return: xr.DataArray
"""

missing_vars = [var for var in variables if var not in scenario.attrs["units"]]
if missing_vars:
raise KeyError(f"The following variables are missing in 'scenario.attrs[\"units\"]': {missing_vars}")

units = [scenario.attrs["units"][var] for var in variables]

if len(variables) == 0:
Expand Down Expand Up @@ -548,13 +552,16 @@ def gather_filters(current_level: Dict, combined_filters: Dict[str, Set[str]]) -
gather_filters(value, combined_filters)


def get_combined_filters(filters: Dict, paths: List[List[str]]) -> Dict[str, List[str]]:
def get_combined_filters(
filters: Dict,
paths: List[List[str]]
) -> Tuple[Dict[str, List[str]], Dict[str, List[str]]]:
"""
Traverse the filters dictionary to get combined filter criteria based on multiple paths.
:param filters: The filters dictionary loaded from YAML.
:param paths: A list of lists, where each inner list represents a path in the hierarchy.
:return: The combined filter criteria dictionary.
:return: A tuple with combined filter criteria dictionary and exceptions dictionary.
"""
combined_filters = {
"name_fltr": set(),
Expand All @@ -563,6 +570,11 @@ def get_combined_filters(filters: Dict, paths: List[List[str]]) -> Dict[str, Lis
"product_mask": set(),
}

exceptions_filters = {
"name_fltr": set(),
"product_fltr": set(),
}

for path in paths:
current_level = filters
for key in path:
Expand All @@ -571,37 +583,51 @@ def get_combined_filters(filters: Dict, paths: List[List[str]]) -> Dict[str, Lis
break
gather_filters(current_level, combined_filters)

# Convert sets back to lists
# Gather exceptions from the "Exceptions" path
exceptions = filters.get("Exceptions", {}).get("ecoinvent_aliases", {})
for k in exceptions_filters.keys():
exceptions_filters[k].update(exceptions.get(k, []))

for k in combined_filters.keys():
combined_filters[k] = list(combined_filters[k])
for k in exceptions_filters.keys():
exceptions_filters[k] = list(exceptions_filters[k])

return combined_filters
return combined_filters, exceptions_filters


def apply_filters(
technosphere_inds: Dict[Tuple[str, str, str, str], int],
filters: Dict[str, List[str]],
) -> List[int]:
exceptions: Dict[str, List[str]]
) -> Tuple[List[int], List[int]]:
"""
Apply the filters to the database and return a list of indices.
Apply the filters to the database and return a list of indices and exceptions.
:param technosphere_inds: Dictionary where keys are tuples of four strings (activity name, product name, location, unit)
and values are integers (indices).
:param filters: Dictionary containing the filter criteria.
:return: List of indices of filtered activities.
:param exceptions: Dictionary containing the exceptions criteria.
:return: Tuple containing a list of indices of filtered activities and a list of indices of exceptions.
"""
name_fltr = filters.get("name_fltr", [])
name_mask = filters.get("name_mask", [])
product_fltr = filters.get("product_fltr", [])
product_mask = filters.get("product_mask", [])

exception_name_fltr = exceptions.get("name_fltr", [])
exception_product_fltr = exceptions.get("product_fltr", [])
exception_name_mask = exceptions.get("name_mask", [])
exception_product_mask = exceptions.get("product_mask", [])

def match_filter(item, filter_values):
return any(fltr in item for fltr in filter_values)

def match_mask(item, mask_values):
return any(msk in item for msk in mask_values)

filtered_indices = []
exception_indices = []

for key, value in technosphere_inds.items():
name, product, location, unit = key
Expand All @@ -617,4 +643,18 @@ def match_mask(item, mask_values):

filtered_indices.append(value)

return filtered_indices
for key, value in technosphere_inds.items():
name, product, location, unit = key

if exception_name_fltr and not match_filter(name, exception_name_fltr):
continue
if exception_product_fltr and not match_filter(product, exception_product_fltr):
continue
if exception_name_mask and match_mask(name, exception_name_mask):
continue
if exception_product_mask and match_mask(product, exception_product_mask):
continue

exception_indices.append(value)

return filtered_indices, exception_indices

0 comments on commit 32804d8

Please sign in to comment.