Commit f465909

Store each response key as a column for responses

yngve-sk committed Nov 27, 2024
1 parent 99381ae commit f465909
Showing 13 changed files with 70 additions and 49 deletions.
11 changes: 10 additions & 1 deletion src/ert/analysis/_es_update.py
@@ -165,10 +165,19 @@ def _get_observations_and_responses(
             response_type, realizations=tuple(iens_active_index)
         )

+        # Make realizations into columns,
+        # and add response_key column
+        unpivoted = responses_for_type.unpivot(
+            on=response_cls.keys,
+            variable_name="response_key",
+            value_name="values",
+            index=["realization", *response_cls.primary_key],
+        )
+
         # Note that if there are duplicate entries for one
         # response at one index, they are aggregated together
         # with "mean" by default
-        pivoted = responses_for_type.pivot(
+        pivoted = unpivoted.pivot(
            on="realization",
            index=["response_key", *response_cls.primary_key],
            aggregate_function="mean",
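For orientation, here is a minimal, self-contained sketch of the round-trip this hunk performs. The key names are invented and `["time"]` stands in for `response_cls.primary_key`: responses are now stored wide (one column per response key), unpivoted back to long form, then pivoted so realizations become columns.

```python
import polars

# Hypothetical wide frame as now stored: one column per response key.
responses_for_type = polars.DataFrame(
    {
        "realization": [0, 0, 1, 1],
        "time": ["2000-01-01", "2000-02-01", "2000-01-01", "2000-02-01"],
        "FOPR": [1.0, 2.0, 3.0, 4.0],
        "WOPR": [5.0, 6.0, 7.0, 8.0],
    }
)

# Back to long format: one (response_key, values) pair per row.
unpivoted = responses_for_type.unpivot(
    on=["FOPR", "WOPR"],
    index=["realization", "time"],
    variable_name="response_key",
    value_name="values",
)

# Realizations become columns "0" and "1"; duplicate entries at the
# same (response_key, time) are aggregated with "mean".
pivoted = unpivoted.pivot(
    on="realization",
    index=["response_key", "time"],
    aggregate_function="mean",
)
```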
1 change: 1 addition & 0 deletions src/ert/config/gen_data_config.py
@@ -187,6 +187,7 @@ def _read_file(filename: Path, report_step: int) -> polars.DataFrame:
         )

         combined = polars.concat(datasets_per_name)
+        combined = combined.pivot(on="response_key", index=self.primary_key)
         return combined

     def get_args_for_key(self, key: str) -> Tuple[Optional[str], Optional[List[int]]]:
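A sketch of what the added pivot does, with invented keys; gen_data's `primary_key` is `["report_step", "index"]`:

```python
import polars

# Long frame as previously returned: one row per (key, report_step, index).
combined = polars.DataFrame(
    {
        "response_key": ["one", "one", "two", "two"],
        "report_step": [0, 0, 0, 0],
        "index": [0, 1, 0, 1],
        "values": [1.0, 2.0, 3.0, 4.0],
    }
)

# After the pivot, each response key is its own column:
# report_step, index, one, two
combined = combined.pivot(on="response_key", index=["report_step", "index"])
```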
1 change: 1 addition & 0 deletions src/ert/config/summary_config.py
@@ -59,6 +59,7 @@ def read_from_file(self, run_path: str, iens: int) -> polars.DataFrame:
             }
         )
         df = df.explode("values", "time")
+        df = df.pivot(on="response_key", index=self.primary_key)
         return df

     @property
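The same pattern for summary data, where `primary_key` is `["time"]`. A hedged sketch with invented keys and dates:

```python
import polars
from datetime import datetime

# List-typed columns as built just above the diffed lines.
df = polars.DataFrame(
    {
        "response_key": ["FOPR", "FGPR"],
        "time": [[datetime(2000, 1, 1), datetime(2000, 2, 1)]] * 2,
        "values": [[1.0, 2.0], [3.0, 4.0]],
    }
)
df = df.explode("values", "time")
# After the pivot the keys become columns: time, FOPR, FGPR.
df = df.pivot(on="response_key", index="time")
```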
20 changes: 9 additions & 11 deletions src/ert/dark_storage/common.py
@@ -130,17 +130,15 @@ def data_for_key(
     if summary_data.is_empty():
         return pd.DataFrame()

-    df = (
-        summary_data.rename({"time": "Date", "realization": "Realization"})
-        .drop("response_key")
+    data = (
+        summary_data.pivot(
+            on="time", values=response_key, aggregate_function="mean"
+        )
+        .rename({"realization": "Realization"})
         .to_pandas()
     )
-    df = df.set_index(["Date", "Realization"])
-    # This performs the same aggragation by mean of duplicate values
-    # as in ert/analysis/_es_update.py
-    df = df.groupby(["Date", "Realization"]).mean()
-    data = df.unstack(level="Date")
-    data.columns = data.columns.droplevel(0)
+    data.set_index("Realization", inplace=True)
+    data.columns = data.columns.astype("datetime64[ms]")
     try:
         return data.astype(float)
     except ValueError:
@@ -162,8 +160,8 @@ def data_for_key(

     try:
         vals = data.filter(polars.col("report_step").eq(report_step))
-        pivoted = vals.drop("response_key", "report_step").pivot(
-            on="index", values="values"
+        pivoted = vals.drop(["report_step"]).pivot(
+            on=["index"], values=response_key, aggregate_function="mean"
         )
         data = pivoted.to_pandas().set_index("realization")
         data.columns = data.columns.astype(int)
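Sketched end to end for the summary branch (invented key and dates), the new pivot-on-time replaces the old rename/groupby/unstack chain:

```python
import polars
from datetime import datetime

response_key = "FOPR"  # invented key
summary_data = polars.DataFrame(
    {
        "realization": [0, 0, 1, 1],
        "time": [datetime(2000, 1, 1), datetime(2000, 2, 1)] * 2,
        response_key: [1.0, 2.0, 1.5, 2.5],
    }
)

# One row per realization, one column per report time; duplicates are
# still aggregated by mean, matching _es_update.py.
data = (
    summary_data.pivot(on="time", values=response_key, aggregate_function="mean")
    .rename({"realization": "Realization"})
    .to_pandas()
)
data.set_index("Realization", inplace=True)
# As in the diffed code, the stringified dates become datetime columns.
data.columns = data.columns.astype("datetime64[ms]")
```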
2 changes: 1 addition & 1 deletion src/ert/simulator/batch_simulator_context.py
@@ -280,7 +280,7 @@ def results(self) -> List[Optional[Dict[str, "npt.NDArray[np.float64]"]]]:
             d = {}
             for key in self.result_keys:
                 data = self.ensemble.load_responses(key, (sim_id,))
-                d[key] = data["values"].to_numpy()
+                d[key] = data[key].to_numpy()
             res.append(d)

         return res
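Since `load_responses` now returns a wide frame, the per-key vector is read from the column named after the key itself. A stand-in sketch with invented key and data:

```python
import polars

key = "WOPR"
# Stand-in for self.ensemble.load_responses(key, (sim_id,)).
data = polars.DataFrame({"realization": [0, 0], "index": [0, 1], key: [1.0, 2.0]})

d = {key: data[key].to_numpy()}  # previously data["values"].to_numpy()
```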
26 changes: 17 additions & 9 deletions src/ert/storage/local_ensemble.py
@@ -661,15 +661,21 @@ def load_responses(self, key: str, realizations: Tuple[int]) -> polars.DataFrame
         response_type = self.experiment.response_key_to_response_type[key]
         select_key = True

+        response_config = self.experiment.response_configuration[response_type]
+
         loaded = []
         for realization in realizations:
             input_path = self._realization_dir(realization) / f"{response_type}.parquet"
             if not input_path.exists():
                 raise KeyError(f"No response for key {key}, realization: {realization}")
-            df = polars.read_parquet(input_path)
+            df = polars.scan_parquet(input_path)

             if select_key:
-                df = df.filter(polars.col("response_key") == key)
+                df = df.select(
+                    ["realization", *response_config.primary_key, key]
+                ).collect()
+            else:
+                df = df.collect()

             loaded.append(df)

[CI: GitHub Actions / type-checking (3.12) flags lines 674 and 678: Incompatible types in assignment (expression has type "DataFrame", variable has type "LazyFrame")]
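The type-checking failures noted above come from reusing `df` for both a `LazyFrame` and a `DataFrame`. The lazy read itself looks roughly like this runnable sketch (file name, key, and primary key invented):

```python
import polars

# Write a small wide response file to scan (invented contents).
polars.DataFrame(
    {
        "realization": [0, 0],
        "time": ["t0", "t1"],
        "FOPR": [1.0, 2.0],
        "WOPR": [3.0, 4.0],
    }
).write_parquet("summary.parquet")

primary_key = ["time"]
key = "FOPR"

# scan_parquet defers I/O; selecting before collect() avoids
# materializing every response column of the wide file.
df = (
    polars.scan_parquet("summary.parquet")
    .select(["realization", *primary_key, key])
    .collect()
)
# df columns: realization, time, FOPR
```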
@@ -707,9 +713,6 @@ def load_all_summary_data(

         except (ValueError, KeyError):
             return pd.DataFrame()
-        df_pl = df_pl.pivot(
-            on="response_key", index=["realization", "time"], sort_columns=True
-        )
         df_pl = df_pl.rename({"time": "Date", "realization": "Realization"})

         df_pandas = (
@@ -845,11 +848,16 @@ def save_response(
         data : polars DataFrame
             polars DataFrame to save.
         """
+        response_config = self.experiment.response_configuration[response_type]

-        if "values" not in data.columns:
+        num_response_columns = (
+            len(data.columns)
+            - len(response_config.primary_key)
+            - (1 if "realization" in data.columns else 0)
+        )
+        if num_response_columns <= 0:
             raise ValueError(
-                f"Dataset for response group '{response_type}' "
-                f"must contain a 'values' variable"
+                f"Dataset for response type '{response_type}' must contain values for at least one response key"
             )

         if len(data) == 0:
@@ -873,7 +881,7 @@ def save_response(
         )

         if not self.experiment._has_finalized_response_keys(response_type):
-            response_keys = data["response_key"].unique().to_list()
+            response_keys = data.columns[(len(response_config.primary_key) + 1) :]
             self.experiment._update_response_keys(response_type, response_keys)

     def calculate_std_dev_for_parameter(self, parameter_group: str) -> xr.Dataset:
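A hedged sketch of the new validation on a wide gen_data frame (primary key `["report_step", "index"]`, invented key name):

```python
import polars

primary_key = ["report_step", "index"]  # gen_data's primary key
data = polars.DataFrame({"report_step": [0], "index": [0], "one": [1.0]})

# At least one column must remain beyond the primary key and the
# optional "realization" column.
num_response_columns = (
    len(data.columns)
    - len(primary_key)
    - (1 if "realization" in data.columns else 0)
)
assert num_response_columns == 1  # the "one" response column
```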
2 changes: 1 addition & 1 deletion tests/ert/performance_tests/test_analysis.py
@@ -139,7 +139,7 @@ def g(X):
                     "index": range(len(Y[:, iens])),
                     "values": Y[:, iens],
                 }
-            ),
+            ).pivot(on="response_key", index=["report_step", "index"]),
             iens,
         )
4 changes: 2 additions & 2 deletions tests/ert/performance_tests/test_dark_storage_performance.py
@@ -141,7 +141,7 @@ def test_direct_dark_performance(
         if ensemble_json.userdata["name"] == "default":
             ensemble_id_default = ensemble_id

-    benchmark(function, storage, ensemble_id_default, key, template_config)
+    function(storage, ensemble_id_default, key, template_config)


@pytest.mark.parametrize(
@@ -177,4 +177,4 @@ def test_direct_dark_performance_with_storage(
         if ensemble_json.userdata["name"] == "default":
             ensemble_id_default = ensemble_id

-    benchmark(function, storage, ensemble_id_default, key, template_config)
+    function(storage, ensemble_id_default, key, template_config)
21 changes: 14 additions & 7 deletions tests/ert/performance_tests/test_memory_usage.py
@@ -89,16 +89,21 @@ def fill_storage_with_data(poly_template: Path, ert_config: ErtConfig) -> None:
         gendatas = []
         gen_obs = ert_config.observations["gen_data"]
         for response_key, df in gen_obs.group_by("response_key"):
-            gendata_df = make_gen_data(df["index"].max() + 1)
+            gendata_df = make_gen_data(response_key[0], df["index"].max() + 1)
             gendata_df = gendata_df.insert_column(
                 0,
                 polars.Series(np.full(len(gendata_df), response_key)).alias(
                     "response_key"
                 ),
             )
-            gendatas.append(gendata_df)
+            gendatas.append((response_key, gendata_df))

-        source.save_response("gen_data", polars.concat(gendatas), real)
+        gendatas.sort(key=lambda info: info[0])
+
+        wide_gendatas = polars.concat([df for _, df in gendatas]).pivot(
+            on="response_key", index=["report_step", "index"]
+        )
+        source.save_response("gen_data", wide_gendatas, real)

         obs_time_list = ens_config.refcase.all_dates

@@ -123,13 +128,15 @@ def fill_storage_with_data(poly_template: Path, ert_config: ErtConfig) -> None:
         )


-def make_gen_data(obs: int, min_val: float = 0, max_val: float = 5) -> polars.DataFrame:
+def make_gen_data(
+    response_key: str, obs: int, min_val: float = 0, max_val: float = 5
+) -> polars.DataFrame:
     data = np.random.default_rng().uniform(min_val, max_val, obs)
     return polars.DataFrame(
         {
             "report_step": polars.Series(np.full(len(data), 0), dtype=polars.UInt16),
             "index": polars.Series(range(len(data)), dtype=polars.UInt16),
-            "values": data,
+            "values": polars.Series(data, dtype=polars.Float32),
         }
     )

@@ -148,9 +155,9 @@ def make_summary_data(
             "time": polars.Series(
                 np.tile(dates, len(obs_keys)).tolist()
             ).dt.cast_time_unit("ms"),
-            "values": data,
+            "values": polars.Series(data, dtype=polars.Float32),
         }
-    )
+    ).pivot(on="response_key", index="time")
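A runnable sketch of the updated `make_summary_data` shape (invented keys and dates): keys are tiled over dates in a long frame, then pivoted so each key becomes a Float32 column.

```python
from datetime import datetime

import numpy as np
import polars

obs_keys = ["FOPR", "FGPR"]
dates = [datetime(2010, 1, 1), datetime(2010, 2, 1)]
data = np.random.default_rng(0).uniform(0, 5, len(obs_keys) * len(dates))

wide = polars.DataFrame(
    {
        "response_key": np.repeat(obs_keys, len(dates)).tolist(),
        "time": polars.Series(
            np.tile(dates, len(obs_keys)).tolist()
        ).dt.cast_time_unit("ms"),
        "values": polars.Series(data, dtype=polars.Float32),
    }
).pivot(on="response_key", index="time")
# wide columns: time, FOPR, FGPR
```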
2 changes: 1 addition & 1 deletion tests/ert/unit_tests/analysis/test_es_update.py
@@ -415,7 +415,7 @@ def test_smoother_snapshot_alpha(
                     "index": polars.Series(range(len(data)), dtype=polars.UInt16),
                     "values": data,
                 }
-            ),
+            ).pivot(on="response_key", index=["report_step", "index"]),
             iens,
         )
         posterior_storage = storage.create_ensemble(
2 changes: 1 addition & 1 deletion tests/ert/unit_tests/gui/tools/plot/test_plot_api.py
@@ -250,7 +250,7 @@ def test_plot_api_big_summary_memory_usage(
             "time": dates_df,
             "values": values_df,
         }
-    )
+    ).pivot(on=["response_key"], index="time")

     experiment = storage.create_experiment(
         parameters=[],
25 changes: 11 additions & 14 deletions tests/ert/unit_tests/storage/test_local_storage.py
@@ -85,33 +85,30 @@ def test_that_loading_non_existing_ensemble_throws(tmp_path):

def test_that_saving_empty_responses_fails_nicely(tmp_path):
    with open_storage(tmp_path, mode="w") as storage:
-        experiment = storage.create_experiment()
+        experiment = storage.create_experiment(
+            responses=[SummaryConfig(keys=["*"]), GenDataConfig(keys=["one", "two"])]
+        )
         ensemble = storage.create_ensemble(
             experiment, ensemble_size=1, iteration=0, name="prior"
         )

         # Test for entirely empty dataset
         with pytest.raises(
             ValueError,
-            match="Dataset for response group 'RESPONSE' must contain a 'values' variable",
+            match="Dataset for response type 'summary' must contain values for at least one response key",
         ):
-            ensemble.save_response("RESPONSE", polars.DataFrame(), 0)
+            ensemble.save_response("summary", polars.DataFrame(), 0)

-        # Test for dataset with 'values' but no actual data
+        # Test for dataset with response value columns but no actual data
         empty_data = polars.DataFrame(
-            {
-                "response_key": [],
-                "report_step": [],
-                "index": [],
-                "values": [],
-            }
+            {"report_step": [], "index": [], "one": [], "two": []}
         )

         with pytest.raises(
             ValueError,
-            match="Responses RESPONSE are empty. Cannot proceed with saving to storage.",
+            match="Responses gen_data are empty. Cannot proceed with saving to storage.",
         ):
-            ensemble.save_response("RESPONSE", empty_data, 0)
+            ensemble.save_response("gen_data", empty_data, 0)


def test_that_saving_response_updates_configs(tmp_path):
@@ -133,7 +130,7 @@ def test_that_saving_response_updates_configs(tmp_path):
                 [0.0, 1.0, 2.0, 3.0, 4.0], dtype=polars.Float32
             ),
         }
-    )
+    ).pivot(on="response_key", index="time")

    mapping_before = experiment.response_key_to_response_type
    smry_config_before = experiment.response_configuration["summary"]
@@ -314,7 +311,7 @@ def test_that_reader_storage_reads_most_recent_response_configs(tmp_path):
                 [0.2, 0.2, 1.0, 1.1, 3.3, 3.3], dtype=polars.Float32
             ),
         }
-    )
+    ).pivot(on="response_key", index="time")

    ens.save_response("summary", smry_data, 0)
    assert read_smry_config.keys == ["*", "FOPR"]
2 changes: 1 addition & 1 deletion tests/everest/test_api_snapshots.py
@@ -127,7 +127,7 @@ def test_api_summary_snapshot(
                 [0.2, 0.2, 1.0, 1.1, 3.3, 3.3], dtype=polars.Float32
             ),
         }
-    )
+    ).pivot(on="response_key", index="time", sort_columns=True)
     for ens in experiment.ensembles:
         for real in range(ens.ensemble_size):
             ens.save_response("summary", smry_data.clone(), real)
