Add apply_consequence_scaling method for the loss model.
ioannis-vm committed May 19, 2024
1 parent 0586289 commit 715df6f
Showing 4 changed files with 379 additions and 22 deletions.
71 changes: 71 additions & 0 deletions pelicun/base.py
@@ -465,6 +465,19 @@ def split_file_name(file_path: str):
    """
    Separates a file name from the extension, accounting for the
    case where the file name itself contains periods.

    Parameters
    ----------
    file_path: str
        Original file path.

    Returns
    -------
    tuple
        name: str
            Name of the file.
        extension: str
            File extension.

    """
path = Path(file_path)
name = path.stem
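
A hedged usage sketch of this helper follows; the rest of the body is truncated in this diff, so it assumes the function returns `(path.stem, path.suffix)` as the docstring describes:

    # illustrative only; assumes the elided remainder of the body
    # returns (path.stem, path.suffix)
    name, extension = split_file_name('response.v2.csv')
    # name == 'response.v2', extension == '.csv'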
@@ -835,6 +848,64 @@ def show_matrix(data, use_describe=False):
pp.pprint(pd.DataFrame(data))


def multiply_factor_multiple_levels(
df, conditions, factor, axis=0, raise_missing=True
):
"""
Multiply a value to selected rows of a DataFrame that is indexed
with a hierarchical index (pd.MultiIndex). The change is done in
place.
Parameters
----------
df: pd.DataFrame
The DataFrame to be modified.
conditions: dict
A dictionary mapping level names with a single value. Only the
rows where the index levels have the provided values will be
affected. The dictionary can be empty, in which case all rows
will be affected, or contain only some levels and values, in
which case only the matching rows will be affected.
factor: float
Scaling factor to use.
axis: int
With 0 the condition is checked against the DataFrame's index,
otherwise with 1 it is checked against the DataFrame's
columns.
Raises
------
ValueError
If the provided `axis` values is not either 0 or 1.
ValueError
If there are no rows matching the conditions and raise_missing
is True.
"""

if axis == 0:
idx_to_use = df.index
elif axis == 1:
idx_to_use = df.columns
else:
raise ValueError(f'Invalid axis: `{axis}`')

mask = pd.Series(True, index=idx_to_use)

# Apply each condition to update the mask
for level, value in conditions.items():
mask &= idx_to_use.get_level_values(level) == value

    if (not mask.any()) and raise_missing:
        raise ValueError(f'No rows found matching the conditions: `{conditions}`')

if axis == 0:
df.iloc[mask.values] *= factor
else:
df.iloc[:, mask.values] *= factor
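
A minimal usage sketch of the helper above (the frame and level names are illustrative):

    import pandas as pd

    idx = pd.MultiIndex.from_tuples(
        [('A', 'x'), ('A', 'y'), ('B', 'x')], names=['grp', 'sub']
    )
    df = pd.DataFrame({'val': [1.00, 1.00, 1.00]}, index=idx)

    # double every row whose `grp` level is 'A' (in place)
    multiply_factor_multiple_levels(df, {'grp': 'A'}, 2.00)
    # df['val'] is now [2.00, 2.00, 1.00]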


def _warning(message, category, filename, lineno, file=None, line=None):
"""
Custom warning function to format and print warnings more
70 changes: 70 additions & 0 deletions pelicun/model/loss_model.py
@@ -405,6 +405,76 @@ def calculate(self):

self.log.msg("Loss calculation successful.")

def apply_consequence_scaling(self, scaling_conditions, scaling_factor):
"""
Applies a scaling factor to selected columns of the loss
samples.
The scaling conditiones are passed as a dictionary mapping
level names with their required value for the condition to be
met. It has to contain `dv` as one of its keys, defining the
decision variable where the factors should be applied. Other
valid levels include:
- `dmg`: containing a source component name,
- `loc`: containing a location,
- `dir`: containing a direction,
- `uid`: containing a Unique Component ID (UID).
If any of the keys is missing, it is assumed that the scaling
factor should be applied to all relevant consequences (those
matching the remaining values of the hierarchical index).
Parameters
----------
scaling_conditions: dict
A dictionary mapping level names with a single value. Only the
rows where the index levels have the provided values will be
affected. The dictionary can be empty, in which case all rows
will be affected, or contain only some levels and values, in
which case only the matching rows will be affected.
scaling_factor: float
Scaling factor to use.
Raises
------
ValueError
If the scaling_conditions dictionary does not contain a
`dv` key.
"""

        # make sure we won't apply the same factor to all DVs at
        # once; it is highly unlikely anyone would actually want to
        # do this.
        if 'dv' not in scaling_conditions:
            raise ValueError(
                'The index of the `scaling_conditions` dictionary '
                'should contain a level named `dv` listing the '
                'relevant decision variable.'
            )

        for model in self._loss_models:

            # skip loss models that don't have a sample yet
            if model.sample is None:
                continue

            # ensure the levels exist (but don't check whether the
            # specified values exist yet)
            for name in scaling_conditions:
                if name not in model.sample.columns.names:
                    raise ValueError(
                        f'`scaling_conditions` contains an unknown level: `{name}`.'
                    )

# apply scaling factors
base.multiply_factor_multiple_levels(
model.sample,
scaling_conditions,
scaling_factor,
axis=1,
raise_missing=True,
)
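
A short usage sketch of the method above (the `assessment` object and the level values are illustrative, and a loss sample is assumed to have been calculated already):

    # scale every `dv1` consequence at location `loc1` by 20%
    assessment.loss.apply_consequence_scaling(
        scaling_conditions={'dv': 'dv1', 'loc': 'loc1'},
        scaling_factor=1.20,
    )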

def save_sample(self, filepath=None, save_units=False):
"""
Saves the loss sample to a CSV file or returns it as a
100 changes: 100 additions & 0 deletions pelicun/tests/model/test_loss_model.py
@@ -42,6 +42,7 @@
These are unit and integration tests on the loss model of pelicun.
"""

from itertools import product
from copy import deepcopy
import pytest
import numpy as np
@@ -91,6 +92,75 @@ def asset_model_A(self, asset_model_empty):
asset.generate_cmp_sample(sample_size=10)
return asset

@pytest.fixture
def loss_model_with_ones(self, assessment_instance):
loss_model = assessment_instance.loss

# add artificial values to the samples
data_ds = {}
for (
decision_variable,
consequence,
component,
damage_state,
location,
direction,
uid,
) in product(
('dv1', 'dv2'),
('cons1', 'cons2'),
('cmp1', 'cmp2'),
('ds1', 'ds2'),
('loc1', 'loc2'),
('dir1', 'dir2'),
('uid1', 'uid2'),
):
data_ds[
(
decision_variable,
consequence,
component,
damage_state,
location,
direction,
uid,
)
] = [1.00, 1.00, 1.00]
loss_model.ds_model.sample = pd.DataFrame(data_ds).rename_axis(
columns=['dv', 'loss', 'dmg', 'ds', 'loc', 'dir', 'uid']
)
data_lf = {}
for (
decision_variable,
consequence,
component,
location,
direction,
uid,
) in product(
('dv1', 'dv2'),
('cons1', 'cons2'),
('cmp1', 'cmp2'),
('loc1', 'loc2'),
('dir1', 'dir2'),
('uid1', 'uid2'),
):
data_lf[
(
decision_variable,
consequence,
component,
location,
direction,
uid,
)
] = [1.00, 1.00, 1.00]
loss_model.lf_model.sample = pd.DataFrame(data_lf).rename_axis(
columns=['dv', 'loss', 'dmg', 'loc', 'dir', 'uid']
)

return loss_model
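
For orientation, a sketch of the column structure this fixture builds, equivalent to the `product` loops above:

    import pandas as pd

    cols = pd.MultiIndex.from_product(
        [('dv1', 'dv2'), ('cons1', 'cons2'), ('cmp1', 'cmp2'),
         ('ds1', 'ds2'), ('loc1', 'loc2'), ('dir1', 'dir2'),
         ('uid1', 'uid2')],
        names=['dv', 'loss', 'dmg', 'ds', 'loc', 'dir', 'uid'],
    )
    # three realizations by 2**7 = 128 columns, all ones
    ds_sample = pd.DataFrame(1.00, index=range(3), columns=cols)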

def test___init__(self, loss_model):
assert loss_model.log
assert loss_model.ds_model
@@ -274,6 +344,36 @@ def test_aggregate_losses_when_no_loss(self, assessment_instance):
),
)

def test_consequence_scaling(self, loss_model_with_ones):

# When only `dv` is provided
scaling_conditions = {'dv': 'dv1'}
scaling_factor = 2.00

loss_model_with_ones.apply_consequence_scaling(
scaling_conditions, scaling_factor
)

for loss_model in loss_model_with_ones._loss_models:
mask = (loss_model.sample.columns.get_level_values('dv') == 'dv1')
assert np.all(loss_model.sample.iloc[:, mask] == 2.00)
assert np.all(loss_model.sample.iloc[:, ~mask] == 1.00)
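            # reset the sample so the second scenario starts from ones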
loss_model.sample.iloc[:, :] = 1.00

scaling_conditions = {'dv': 'dv2', 'loc': 'loc1', 'uid': 'uid2'}
scaling_factor = 2.00
loss_model_with_ones.apply_consequence_scaling(
scaling_conditions, scaling_factor
)

for loss_model in loss_model_with_ones._loss_models:
mask = np.full(len(loss_model.sample.columns), True)
mask &= (loss_model.sample.columns.get_level_values('dv') == 'dv2')
mask &= (loss_model.sample.columns.get_level_values('loc') == 'loc1')
mask &= (loss_model.sample.columns.get_level_values('uid') == 'uid2')
assert np.all(loss_model.sample.iloc[:, mask] == 2.00)
assert np.all(loss_model.sample.iloc[:, ~mask] == 1.00)


class TestRepairModel_Base(TestPelicunModel):
def test___init__(self, assessment_instance):