From 715df6ff9895e8f4c793a771db62baf44c880e40 Mon Sep 17 00:00:00 2001 From: John Vouvakis Manousakis Date: Sun, 19 May 2024 03:41:45 -0700 Subject: [PATCH] Add `apply_consequence_scaling` method for the loss model. --- pelicun/base.py | 71 +++++++++++ pelicun/model/loss_model.py | 70 +++++++++++ pelicun/tests/model/test_loss_model.py | 100 ++++++++++++++++ pelicun/tests/test_base.py | 160 +++++++++++++++++++++---- 4 files changed, 379 insertions(+), 22 deletions(-) diff --git a/pelicun/base.py b/pelicun/base.py index 6843448e7..7cce08ad9 100644 --- a/pelicun/base.py +++ b/pelicun/base.py @@ -465,6 +465,19 @@ def split_file_name(file_path: str): Separates a file name from the extension accounting for the case where the file name itself contains periods. + Parameters + ---------- + file_path: str + Original file path. + + Returns + ------- + tuple + name: str + Name of the file. + extension: str + File extension. + """ path = Path(file_path) name = path.stem @@ -835,6 +848,64 @@ def show_matrix(data, use_describe=False): pp.pprint(pd.DataFrame(data)) +def multiply_factor_multiple_levels( + df, conditions, factor, axis=0, raise_missing=True ): + """ + Multiply a value to selected rows of a DataFrame that is indexed + with a hierarchical index (pd.MultiIndex). The change is done in + place. + + Parameters + ---------- + df: pd.DataFrame + The DataFrame to be modified. + conditions: dict + A dictionary mapping level names to a single value. Only the + rows where the index levels have the provided values will be + affected. The dictionary can be empty, in which case all rows + will be affected, or contain only some levels and values, in + which case only the matching rows will be affected. + factor: float + Scaling factor to use. + axis: int + With 0 the condition is checked against the DataFrame's index, + otherwise with 1 it is checked against the DataFrame's + columns. + + Raises + ------ + ValueError + If the provided `axis` value is not either 0 or 1. 
+ ValueError
 If there are no rows matching the conditions and raise_missing
 is True.

 """

 if axis == 0:
 idx_to_use = df.index
 elif axis == 1:
 idx_to_use = df.columns
 else:
 raise ValueError(f'Invalid axis: `{axis}`')

 mask = pd.Series(True, index=idx_to_use)

 # Apply each condition to update the mask
 for level, value in conditions.items():
 mask &= idx_to_use.get_level_values(level) == value

 # pylint: disable=singleton-comparison
 if np.all(mask == False) and raise_missing: # noqa
 raise ValueError(f'No rows found matching the conditions: `{conditions}`')

 if axis == 0:
 df.iloc[mask.values] *= factor
 else:
 df.iloc[:, mask.values] *= factor

 def _warning(message, category, filename, lineno, file=None, line=None): """ Custom warning function to format and print warnings more diff --git a/pelicun/model/loss_model.py b/pelicun/model/loss_model.py index 188fe6efc..f3cee3147 100644 --- a/pelicun/model/loss_model.py +++ b/pelicun/model/loss_model.py @@ -405,6 +405,76 @@ def calculate(self): self.log.msg("Loss calculation successful.") + def apply_consequence_scaling(self, scaling_conditions, scaling_factor): + """ + Applies a scaling factor to selected columns of the loss + samples. + + The scaling conditions are passed as a dictionary mapping + level names to their required value for the condition to be + met. It has to contain `dv` as one of its keys, defining the + decision variable where the factors should be applied. Other + valid levels include: + - `dmg`: containing a source component name, + - `loc`: containing a location, + - `dir`: containing a direction, + - `uid`: containing a Unique Component ID (UID). + + If any of the keys is missing, it is assumed that the scaling + factor should be applied to all relevant consequences (those + matching the remaining values of the hierarchical index). + + Parameters + ---------- + scaling_conditions: dict + A dictionary mapping level names to a single value. 
Only the +
 rows where the index levels have the provided values will be
 affected. The dictionary can be empty, in which case all rows
 will be affected, or contain only some levels and values, in
 which case only the matching rows will be affected.
 scaling_factor: float
 Scaling factor to use.

 Raises
 ------
 ValueError
 If the scaling_conditions dictionary does not contain a
 `dv` key.

 """

 # make sure we won't apply the same factor to all DVs at once,
 # highly unlikely anyone would actually want to do this.
 if 'dv' not in scaling_conditions:
 raise ValueError(
 'The index of the `scaling_conditions` dictionary '
 'should contain a level named `dv` listing the '
 'relevant decision variable.'
 )

 for model in self._loss_models:

 # check if it's empty
 if model.sample is None:
 continue

 # ensure the levels exist (but don't check if specified
 # values exist yet)
 for name in scaling_conditions:
 if name not in model.sample.columns.names:
 raise ValueError(
 f'`scaling_conditions` contains an unknown level: `{name}`.'
 )

 # apply scaling factors
 base.multiply_factor_multiple_levels(
 model.sample,
 scaling_conditions,
 scaling_factor,
 axis=1,
 raise_missing=True,
 )
 def save_sample(self, filepath=None, save_units=False): """ Saves the loss sample to a CSV file or returns it as a diff --git a/pelicun/tests/model/test_loss_model.py b/pelicun/tests/model/test_loss_model.py index 068cf3427..37ba17e33 100644 --- a/pelicun/tests/model/test_loss_model.py +++ b/pelicun/tests/model/test_loss_model.py @@ -42,6 +42,7 @@ These are unit and integration tests on the loss model of pelicun. 
""" +from itertools import product from copy import deepcopy import pytest import numpy as np @@ -91,6 +92,75 @@ def asset_model_A(self, asset_model_empty): asset.generate_cmp_sample(sample_size=10) return asset + @pytest.fixture + def loss_model_with_ones(self, assessment_instance): + loss_model = assessment_instance.loss + + # add artificial values to the samples + data_ds = {} + for ( + decision_variable, + consequence, + component, + damage_state, + location, + direction, + uid, + ) in product( + ('dv1', 'dv2'), + ('cons1', 'cons2'), + ('cmp1', 'cmp2'), + ('ds1', 'ds2'), + ('loc1', 'loc2'), + ('dir1', 'dir2'), + ('uid1', 'uid2'), + ): + data_ds[ + ( + decision_variable, + consequence, + component, + damage_state, + location, + direction, + uid, + ) + ] = [1.00, 1.00, 1.00] + loss_model.ds_model.sample = pd.DataFrame(data_ds).rename_axis( + columns=['dv', 'loss', 'dmg', 'ds', 'loc', 'dir', 'uid'] + ) + data_lf = {} + for ( + decision_variable, + consequence, + component, + location, + direction, + uid, + ) in product( + ('dv1', 'dv2'), + ('cons1', 'cons2'), + ('cmp1', 'cmp2'), + ('loc1', 'loc2'), + ('dir1', 'dir2'), + ('uid1', 'uid2'), + ): + data_lf[ + ( + decision_variable, + consequence, + component, + location, + direction, + uid, + ) + ] = [1.00, 1.00, 1.00] + loss_model.lf_model.sample = pd.DataFrame(data_lf).rename_axis( + columns=['dv', 'loss', 'dmg', 'loc', 'dir', 'uid'] + ) + + return loss_model + def test___init__(self, loss_model): assert loss_model.log assert loss_model.ds_model @@ -274,6 +344,36 @@ def test_aggregate_losses_when_no_loss(self, assessment_instance): ), ) + def test_consequence_scaling(self, loss_model_with_ones): + + # When only `dv` is provided + scaling_conditions = {'dv': 'dv1'} + scaling_factor = 2.00 + + loss_model_with_ones.apply_consequence_scaling( + scaling_conditions, scaling_factor + ) + + for loss_model in loss_model_with_ones._loss_models: + mask = (loss_model.sample.columns.get_level_values('dv') == 'dv1') + assert 
np.all(loss_model.sample.iloc[:, mask] == 2.00) + assert np.all(loss_model.sample.iloc[:, ~mask] == 1.00) + loss_model.sample.iloc[:, :] = 1.00 + + scaling_conditions = {'dv': 'dv2', 'loc': 'loc1', 'uid': 'uid2'} + scaling_factor = 2.00 + loss_model_with_ones.apply_consequence_scaling( + scaling_conditions, scaling_factor + ) + + for loss_model in loss_model_with_ones._loss_models: + mask = np.full(len(loss_model.sample.columns), True) + mask &= (loss_model.sample.columns.get_level_values('dv') == 'dv2') + mask &= (loss_model.sample.columns.get_level_values('loc') == 'loc1') + mask &= (loss_model.sample.columns.get_level_values('uid') == 'uid2') + assert np.all(loss_model.sample.iloc[:, mask] == 2.00) + assert np.all(loss_model.sample.iloc[:, ~mask] == 1.00) + class TestRepairModel_Base(TestPelicunModel): def test___init__(self, assessment_instance): diff --git a/pelicun/tests/test_base.py b/pelicun/tests/test_base.py index 113866fad..ff517c863 100644 --- a/pelicun/tests/test_base.py +++ b/pelicun/tests/test_base.py @@ -455,28 +455,144 @@ def test_show_matrix(): assert True # if no AssertionError is thrown, then the test passes -# TODO: uncomment this block -# def test__warning(capsys): -# msg = 'This is a test.' 
-# category = 'undefined' -# base._warning(msg, category, '{path to a file}', '{line number}') -# captured = capsys.readouterr() -# assert ( -# captured.out -# == 'WARNING in {path to a file} at line {line number}\nThis is a test.\n\n' -# ) -# base._warning(msg, category, 'some\\file', '{line number}') -# captured = capsys.readouterr() -# assert ( -# captured.out -# == 'WARNING in some/file at line {line number}\nThis is a test.\n\n' -# ) -# base._warning(msg, category, 'some/file', '{line number}') -# captured = capsys.readouterr() -# assert ( -# captured.out -# == 'WARNING in some/file at line {line number}\nThis is a test.\n\n' -# ) +def test_multiply_factor_multiple_levels(): + # Original DataFrame definition + df = pd.DataFrame( + np.full((5, 3), 1.00), + index=pd.MultiIndex.from_tuples( + [ + ('A', 'X', 'K'), + ('A', 'X', 'L'), + ('A', 'Y', 'M'), + ('B', 'X', 'K'), + ('B', 'Y', 'M'), + ], + names=['lv1', 'lv2', 'lv3'], + ), + columns=['col1', 'col2', 'col3'], + ) + + # Test 1: Basic multiplication on rows + result_df = pd.DataFrame( + np.array( + [ + [2.0, 2.0, 2.0], + [2.0, 2.0, 2.0], + [1.0, 1.0, 1.0], + [1.0, 1.0, 1.0], + [1.0, 1.0, 1.0], + ] + ), + index=pd.MultiIndex.from_tuples( + [ + ('A', 'X', 'K'), + ('A', 'X', 'L'), + ('A', 'Y', 'M'), + ('B', 'X', 'K'), + ('B', 'Y', 'M'), + ], + names=['lv1', 'lv2', 'lv3'], + ), + columns=['col1', 'col2', 'col3'], + ) + pd.testing.assert_frame_equal( + base.multiply_factor_multiple_levels(df.copy(), {'lv1': 'A', 'lv2': 'X'}, 2), + result_df, + ) + + # Test 2: Multiplication on all rows + result_df_all = pd.DataFrame( + np.full((5, 3), 3.00), + index=pd.MultiIndex.from_tuples( + [ + ('A', 'X', 'K'), + ('A', 'X', 'L'), + ('A', 'Y', 'M'), + ('B', 'X', 'K'), + ('B', 'Y', 'M'), + ], + names=['lv1', 'lv2', 'lv3'], + ), + columns=['col1', 'col2', 'col3'], + ) + pd.testing.assert_frame_equal( + base.multiply_factor_multiple_levels(df.copy(), {}, 3), result_df_all + ) + + # Original DataFrame definition for columns test + 
df_columns = pd.DataFrame( + np.ones((3, 5)), + index=['row1', 'row2', 'row3'], + columns=pd.MultiIndex.from_tuples( + [ + ('A', 'X', 'K'), + ('A', 'X', 'L'), + ('A', 'Y', 'M'), + ('B', 'X', 'K'), + ('B', 'Y', 'M'), + ], + names=['lv1', 'lv2', 'lv3'], + ), + ) + + # Test 3: Multiplication on columns + result_df_columns = pd.DataFrame( + np.array( + [ + [2.0, 2.0, 1.0, 2.0, 1.0], + [2.0, 2.0, 1.0, 2.0, 1.0], + [2.0, 2.0, 1.0, 2.0, 1.0], + ] + ), + index=['row1', 'row2', 'row3'], + columns=pd.MultiIndex.from_tuples( + [ + ('A', 'X', 'K'), + ('A', 'X', 'L'), + ('A', 'Y', 'M'), + ('B', 'X', 'K'), + ('B', 'Y', 'M'), + ], + names=['lv1', 'lv2', 'lv3'], + ), + ) + pd.testing.assert_frame_equal( + base.multiply_factor_multiple_levels( + df_columns.copy(), {'lv2': 'X'}, 2, axis=1 + ), + result_df_columns, + ) + + # Test 4: Multiplication with no matching conditions + with pytest.raises(ValueError) as excinfo: + base.multiply_factor_multiple_levels(df.copy(), {'lv1': 'C'}, 2) + assert ( + str(excinfo.value) == "No rows found matching the conditions: `{'lv1': 'C'}`" + ) + + # Test 5: Invalid axis + with pytest.raises(ValueError) as excinfo: + base.multiply_factor_multiple_levels(df.copy(), {'lv1': 'A'}, 2, axis=2) + assert str(excinfo.value) == "Invalid axis: `2`" + + # Test 6: Empty conditions affecting all rows + result_df_empty = pd.DataFrame( + np.full((5, 3), 4.00), + index=pd.MultiIndex.from_tuples( + [ + ('A', 'X', 'K'), + ('A', 'X', 'L'), + ('A', 'Y', 'M'), + ('B', 'X', 'K'), + ('B', 'Y', 'M'), + ], + names=['lv1', 'lv2', 'lv3'], + ), + columns=['col1', 'col2', 'col3'], + ) + pd.testing.assert_frame_equal( + base.multiply_factor_multiple_levels(df.copy(), {}, 4), result_df_empty + ) def test_describe():