Skip to content

Commit

Permalink
adapted methods, created module files
Browse files Browse the repository at this point in the history
  • Loading branch information
cafriedb committed Aug 6, 2024
1 parent 99e77f6 commit 4a2242d
Show file tree
Hide file tree
Showing 7 changed files with 1,363 additions and 715 deletions.
151 changes: 151 additions & 0 deletions dev/activity_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# Imports
# -------

import yaml

# Sector filter functions from premise
# ---------------------------------------------------

def act_fltr(
database: list,
fltr = None,
mask = None,
):
"""Filter `database` for activities_list matching field contents given by `fltr` excluding strings in `mask`.
`fltr`: string, list of strings or dictionary.
If a string is provided, it is used to match the name field from the start (*startswith*).
If a list is provided, all strings in the lists are used and dataframes_dict are joined (*or*).
A dict can be given in the form <fieldname>: <str> to filter for <str> in <fieldname>.
`mask`: used in the same way as `fltr`, but filters add up with each other (*and*).
`filter_exact` and `mask_exact`: boolean, set `True` to only allow for exact matches.
:param database: A lice cycle inventory database
:type database: brightway2 database object
:param fltr: value(s) to filter with.
:type fltr: Union[str, lst, dict]
:param mask: value(s) to filter with.
:type mask: Union[str, lst, dict]
:return: list of activity data set names
:rtype: list
"""
if fltr is None:
fltr = {}
if mask is None:
mask = {}

# default field is name
if isinstance(fltr, (list, str)):
fltr = {"name": fltr}
if isinstance(mask, (list, str)):
mask = {"name": mask}

assert len(fltr) > 0, "Filter dict must not be empty."

# find `act` in `database` that match `fltr`
# and do not match `mask`
filters = database
for field, value in fltr.items():
if isinstance(value, list):
for val in value:
filters = [a for a in filters if val in a[field]]

#filters.extend([ws.either(*[ws.contains(field, v) for v in value])])
else:
filters = [
a for a in filters if value in a[field]
]

#filters.append(ws.contains(field, value))


if mask:
for field, value in mask.items():
if isinstance(value, list):
for val in value:
filters = [f for f in filters if val not in f[field]]
#filters.extend([ws.exclude(ws.contains(field, v)) for v in value])
else:
filters = [f for f in filters if value not in f[field]]
#filters.append(ws.exclude(ws.contains(field, value)))

return filters


def generate_sets_from_filters(yaml_filepath, database=None) -> dict:
"""
Generate a dictionary with sets of activity names for
technologies from the filter specifications.
:param filtr:
:func:`activity_maps.InventorySet.act_fltr`.
:return: dictionary with the same keys as provided in filter
and a set of activity data set names as values.
:rtype: dict
"""

filtr=get_mapping(yaml_filepath, var='ecoinvent_aliases')

names = []

for entry in filtr.values():
if "fltr" in entry:
if isinstance(entry["fltr"], dict):
if "name" in entry["fltr"]:
names.extend(entry["fltr"]["name"])
elif isinstance(entry["fltr"], list):
names.extend(entry["fltr"])
else:
names.append(entry["fltr"])

#subset = list(
# ws.get_many(
# database,
# ws.either(*[ws.contains("name", name) for name in names]),
# )
#)

subset=[
a for a in database if any(


x in a["name"] for x in names
)
]


techs = {
tech: act_fltr(subset, fltr.get("fltr"), fltr.get("mask"))
for tech, fltr in filtr.items()
}

mapping = {
tech: {act for act in actlst} for tech, actlst in techs.items()
}


return mapping

def get_mapping(filepath, var):
"""
Loa a YAML file and return a dictionary given a variable.
:param filepath: YAML file path
:param var: variable to return the dictionary for.
:param model: if provided, only return the dictionary for this model.
:return: a dictionary
"""

with open(filepath, "r", encoding="utf-8") as stream:
techs = yaml.full_load(stream)

mapping = {}
for key, val in techs.items():
if var in val:
mapping[key] = val[var]

return mapping


# Example on how to call the functions to create a set of filtered activities_list
#set_from_fltrs = generate_sets_from_filters(yaml_filepath, database=ei39SSP)

110 changes: 110 additions & 0 deletions dev/cpc_inputs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@

# Imports
# --------

#brightway
import brightway2 as bw
import bw2analyzer as ba

#common
import pandas as pd
import numpy as np

#to be completed
import ast

# Function to generate dataframes containing inputs in cpc format not characterized from an activity list
# Level 2.3 plot dependency
# ------------------------------------------------------------------------------------------------------------------------------------

def get_cpc_inputs_of_activities(activities_list, input_type='list'):

'''
for param description see function lvl23_plot_input_comparison_plot_no_method
NOTE: could adapt this function to get the outputs, or create another one. At the moment only inputs are considered.
'''

def activity_list_inputs_cpc(activities_list, input_type):
all_inputs = []

if input_type == 'list':
activity_iterator = activities_list
elif input_type == 'dict':
activity_iterator = activities_list.values()
else:
raise ValueError("input_type must be either 'list' or 'dict'")

for activity in activity_iterator:
inputs_keys = pd.Series({bw.get_activity(exc.input).key: exc.amount for exc in activity.technosphere()},
name=activity['name'] + ', ' + activity['location'])

# Adjust the way the key is presented
inputs_keys = inputs_keys.reset_index()
inputs_keys['full_key'] = inputs_keys.apply(lambda row: f"('{row['level_0']}', '{row['level_1']}')", axis=1)
inputs_keys = inputs_keys.drop(['level_0', 'level_1'], axis=1).set_index('full_key')

# Add empty cpc column and activity information
inputs_keys.insert(0, 'identifier', activity['name'] + ', ' + activity['location'])
inputs_keys.insert(1, 'activity', activity['name'])
inputs_keys.insert(2, 'location', activity['location'])
inputs_keys.insert(3, 'unit', activity['unit'])
inputs_keys.insert(4, 'cpc', None)

all_inputs.append(inputs_keys)

# Combine all inputs into a single DataFrame
combined_inputs = pd.concat(all_inputs, axis=0)

return combined_inputs

def update_cpc_information(combined_inputs):
for index, row in combined_inputs.iterrows():
# Transform each key to tuple
tuple_key = ast.literal_eval(index)

# Get input activity for the key
input_activity = bw.get_activity(tuple_key)

# Get cpc name for activity
cpc_name = ba.comparisons.get_cpc(input_activity)

# Store cpc_name in the 'cpc' column of the combined_inputs dataframe
combined_inputs.at[index, 'cpc'] = cpc_name

return combined_inputs

def transform_dataframe(combined_inputs):
# Set 'identifier' as the new index and drop the 'full_key' index
combined_inputs = combined_inputs.reset_index().set_index('identifier').drop('full_key', axis=1)

# Determine the index of the 'unit' column
unit_index = combined_inputs.columns.get_loc('unit')

# Split the dataframe into two parts
combined_inputs_identifier = combined_inputs.iloc[:, :unit_index+1]
combined_inputs_cpc = combined_inputs.iloc[:, unit_index+1:]
#set index of to 'cpc' in combined_input_cpc
combined_inputs_cpc = combined_inputs_cpc.set_index('cpc')

# Combine rows with the same index value in combined_inputs_cpc
combined_inputs_cpc = combined_inputs_cpc.groupby(level=0).agg(lambda x: np.sum(x) if x.dtype.kind in 'biufc' else x.iloc[0])
# Transpose combined_inputs_cpc
combined_inputs_cpc_trans = combined_inputs_cpc.T

# Merge combined_inputs_identifier and combined_inputs_cpc_trans
result = combined_inputs_identifier.join(combined_inputs_cpc_trans)
result = result.drop_duplicates()

# Sort dataframe by activity and location aplphabetically and reset the index
result = result.sort_values(by=['activity', 'location'])
result = result.reset_index(drop=True)
return result

# Execute the workflow
combined_inputs = activity_list_inputs_cpc(activities_list, input_type)
combined_inputs_with_cpc = update_cpc_information(combined_inputs)
final_result = transform_dataframe(combined_inputs_with_cpc)

return final_result
103 changes: 47 additions & 56 deletions dev/functions_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,63 +187,54 @@ def get_mapping(filepath, var):
# METHODS
# -----------------------------------------------------------------------------

# Setting up the methods for outlier detection
# ---------------------------------------------------------------------

def find_and_create_method(criteria, exclude=None):
"""
Find a method based on given criteria and create a Brightway Method object. This will choose the first method.
Thus, filter criteria need to be defined precisely to pick the right method.
:param criteria: List of strings that should be in the method name
:param exclude: List of strings that should not be in the method name (optional)
:return: Brightway Method object
"""
methods = bw.methods

# Start with all methods
filtered_methods = methods

# Apply inclusion criteria
for criterion in criteria:
filtered_methods = [m for m in filtered_methods if criterion in str(m)]

# Apply exclusion criteria if provided
if exclude:
for exclusion in exclude:
filtered_methods = [m for m in filtered_methods if exclusion not in str(m)]

# Check if we found exactly one method
if len(filtered_methods) == 0:
raise ValueError("No methods found matching the given criteria.")
elif len(filtered_methods) > 1:
raise ValueError(f"Multiple methods found: {filtered_methods}. Please provide more specific criteria.")

# Get the first (and only) method
selected_method = filtered_methods[0]

# Create and return the Brightway Method object storing it in a defined variable outside of the funciton.
return bw.Method(selected_method)

#NOTE: Would a yaml filter make it easier? OR Could have predefined methods?"""

# Function for creating method dictionaries which holds method name and unit for later tracking of methods.
# ---------------------------------------------------------------------------------------------------------

def create_method_dict(selected_methods_list):
'''
:selected_methods_list: a list of variables which contain the selected methods
'''
method_dict = {}
for method in selected_methods_list:
method_dict[method] = {
'short name': str(method.name[2]),
'method name': str(method.name),
'method unit': str(method.metadata['unit'])
# Class for generating method dictionary
# --------------------------------------
class MethodFinder:
def __init__(self):
self.all_methods = {}
self.method_counter = 0

def find_and_create_method(self, criteria, exclude=None, custom_key=None):
methods = bw.methods
# Start with all methods
filtered_methods = methods
# Apply inclusion criteria
for criterion in criteria:
filtered_methods = [m for m in filtered_methods if criterion in str(m)]
# Apply exclusion criteria if provided
if exclude:
for exclusion in exclude:
filtered_methods = [m for m in filtered_methods if exclusion not in str(m)]
# Check if we found exactly one method
if len(filtered_methods) == 0:
raise ValueError("No methods found matching the given criteria.")
elif len(filtered_methods) > 1:
raise ValueError(f"Multiple methods found: {filtered_methods}. Please provide more specific criteria.")
# Get the first (and only) method
selected_method = filtered_methods[0]
# Create the Brightway Method object
method_object = bw.Method(selected_method)

# Generate a key for storing the method
if custom_key is None:
self.method_counter += 1
key = f"method_{self.method_counter}"
else:
key = custom_key

# Store the method object and additional information in the dictionary
self.all_methods[key] = {
'object': method_object,
'method name': method_object.name,
'short name' : method_object.name[2],
'unit': method_object.metadata.get('unit', 'Unknown')
}

return method_dict

# Return both the method object and its key
return {key: self.all_methods[key]}

def get_all_methods(self):
return self.all_methods

# ------------------------------------------------------------------------------------------------------------------------------
# CALCULATIONS
Expand Down
Loading

0 comments on commit 4a2242d

Please sign in to comment.