Store each response key as a column for responses

yngve-sk committed Nov 27, 2024
1 parent 99381ae commit 5414b3a
Showing 22 changed files with 231 additions and 109 deletions.
11 changes: 10 additions & 1 deletion src/ert/analysis/_es_update.py
@@ -165,10 +165,19 @@ def _get_observations_and_responses(
             response_type, realizations=tuple(iens_active_index)
         )

+        # Make realizations into columns,
+        # and add response_key column
+        unpivoted = responses_for_type.unpivot(
+            on=response_cls.keys,
+            variable_name="response_key",
+            value_name="values",
+            index=["realization", *response_cls.primary_key],
+        )
+
         # Note that if there are duplicate entries for one
         # response at one index, they are aggregated together
         # with "mean" by default
-        pivoted = responses_for_type.pivot(
+        pivoted = unpivoted.pivot(
            on="realization",
            index=["response_key", *response_cls.primary_key],
            aggregate_function="mean",
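
For orientation, a minimal runnable sketch (not from the commit; the frame, response keys, and "time" primary key are made up) of what this unpivot/pivot pair does: the new one-column-per-key layout is melted back to long format, then realizations become columns.

import polars

# Hypothetical wide storage format after this commit: one column per
# response key, plus "realization" and the primary key ("time" here).
responses_for_type = polars.DataFrame(
    {
        "realization": [0, 0, 1, 1],
        "time": [1, 2, 1, 2],
        "FOPR": [1.0, 2.0, 3.0, 4.0],
        "WOPR": [5.0, 6.0, 7.0, 8.0],
    }
)

# Back to long format: one row per (realization, time, response_key).
unpivoted = responses_for_type.unpivot(
    on=["FOPR", "WOPR"],  # response_cls.keys in the commit
    variable_name="response_key",
    value_name="values",
    index=["realization", "time"],  # ["realization", *primary_key]
)

# Realizations become columns; duplicate (response_key, time) rows,
# if any, are averaged.
pivoted = unpivoted.pivot(
    on="realization",
    index=["response_key", "time"],
    aggregate_function="mean",
)
print(pivoted)  # columns: response_key, time, "0", "1"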
1 change: 1 addition & 0 deletions src/ert/config/gen_data_config.py
@@ -187,6 +187,7 @@ def _read_file(filename: Path, report_step: int) -> polars.DataFrame:
        )

        combined = polars.concat(datasets_per_name)
+       combined = combined.pivot(on="response_key", index=self.primary_key)
        return combined

    def get_args_for_key(self, key: str) -> Tuple[Optional[str], Optional[List[int]]]:
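
A small sketch of the added pivot, with hypothetical response keys: each response key becomes its own column, keyed by the gen_data primary key ("report_step", "index").

import polars

# Hypothetical long-format gen_data as produced before the added pivot.
combined = polars.DataFrame(
    {
        "response_key": ["RES_A", "RES_A", "RES_B", "RES_B"],
        "report_step": polars.Series([0, 0, 0, 0], dtype=polars.UInt16),
        "index": polars.Series([0, 1, 0, 1], dtype=polars.UInt16),
        "values": [1.0, 2.0, 3.0, 4.0],
    }
)

# One column per response key, keyed by the primary key.
wide = combined.pivot(on="response_key", index=["report_step", "index"])
# wide.columns == ["report_step", "index", "RES_A", "RES_B"]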
3 changes: 3 additions & 0 deletions src/ert/config/summary_config.py
@@ -59,6 +59,9 @@ def read_from_file(self, run_path: str, iens: int) -> polars.DataFrame:
            }
        )
        df = df.explode("values", "time")
+       df = df.pivot(
+           on="response_key", index=self.primary_key, aggregate_function="mean"
+       )
        return df

    @property
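
To illustrate the aggregate_function="mean" argument here: if the same (response_key, time) pair occurs twice (the key and timestamps below are made up), the duplicates collapse to their mean instead of raising.

import datetime

import polars

df = polars.DataFrame(
    {
        "response_key": ["FOPR", "FOPR", "FOPR"],
        "time": [
            datetime.datetime(2024, 1, 1),  # duplicated timestamp
            datetime.datetime(2024, 1, 1),
            datetime.datetime(2024, 1, 2),
        ],
        "values": [1.0, 3.0, 5.0],
    }
)

wide = df.pivot(on="response_key", index="time", aggregate_function="mean")
# time 2024-01-01 -> FOPR == 2.0 (mean of 1.0 and 3.0); 2024-01-02 -> 5.0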
20 changes: 9 additions & 11 deletions src/ert/dark_storage/common.py
@@ -130,17 +130,15 @@ def data_for_key(
        if summary_data.is_empty():
            return pd.DataFrame()

-       df = (
-           summary_data.rename({"time": "Date", "realization": "Realization"})
-           .drop("response_key")
+       data = (
+           summary_data.pivot(
+               on="time", values=response_key, aggregate_function="mean"
+           )
+           .rename({"realization": "Realization"})
            .to_pandas()
        )
-       df = df.set_index(["Date", "Realization"])
-       # This performs the same aggragation by mean of duplicate values
-       # as in ert/analysis/_es_update.py
-       df = df.groupby(["Date", "Realization"]).mean()
-       data = df.unstack(level="Date")
-       data.columns = data.columns.droplevel(0)
+       data.set_index("Realization", inplace=True)
        data.columns = data.columns.astype("datetime64[ms]")
        try:
            return data.astype(float)
        except ValueError:
@@ -162,8 +160,8 @@

        try:
            vals = data.filter(polars.col("report_step").eq(report_step))
-           pivoted = vals.drop("response_key", "report_step").pivot(
-               on="index", values="values"
+           pivoted = vals.drop(["report_step"]).pivot(
+               on=["index"], values=response_key, aggregate_function="mean"
            )
            data = pivoted.to_pandas().set_index("realization")
            data.columns = data.columns.astype(int)
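
The rewritten summary path can be tried in isolation; a sketch with made-up data (the response key "FOPR" and the dates are hypothetical) showing how pivoting on time reproduces the realization-by-date frame that the old groupby/unstack chain built.

import datetime

import polars

summary_data = polars.DataFrame(
    {
        "realization": [0, 0, 1, 1],
        "time": [datetime.datetime(2024, 1, 1), datetime.datetime(2024, 1, 2)] * 2,
        "FOPR": [1.0, 2.0, 3.0, 4.0],
    }
)

data = (
    summary_data.pivot(on="time", values="FOPR", aggregate_function="mean")
    .rename({"realization": "Realization"})
    .to_pandas()
)
data.set_index("Realization", inplace=True)
data.columns = data.columns.astype("datetime64[ms]")
# One row per realization, one datetime column per report time.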
11 changes: 10 additions & 1 deletion src/ert/data/_measured_data.py
@@ -120,10 +120,19 @@ def _get_data(
                f"No response loaded for observation type: {response_type}"
            )

+       # Make realizations into columns,
+       # and add response_key column
+       unpivoted = responses_for_type.unpivot(
+           on=response_cls.keys,
+           variable_name="response_key",
+           value_name="values",
+           index=["realization", *response_cls.primary_key],
+       )
+
        # Note that if there are duplicate entries for one
        # response at one index, they are aggregated together
        # with "mean" by default
-       pivoted = responses_for_type.pivot(
+       pivoted = unpivoted.pivot(
            on="realization",
            index=["response_key", *response_cls.primary_key],
            aggregate_function="mean",
2 changes: 1 addition & 1 deletion src/ert/simulator/batch_simulator_context.py
@@ -280,7 +280,7 @@ def results(self) -> List[Optional[Dict[str, "npt.NDArray[np.float64]"]]]:
            d = {}
            for key in self.result_keys:
                data = self.ensemble.load_responses(key, (sim_id,))
-               d[key] = data["values"].to_numpy()
+               d[key] = data[key].to_numpy()
            res.append(d)

        return res
26 changes: 17 additions & 9 deletions src/ert/storage/local_ensemble.py
@@ -661,15 +661,21 @@ def load_responses(self, key: str, realizations: Tuple[int]) -> polars.DataFrame
        response_type = self.experiment.response_key_to_response_type[key]
        select_key = True

+       response_config = self.experiment.response_configuration[response_type]
+
        loaded = []
        for realization in realizations:
            input_path = self._realization_dir(realization) / f"{response_type}.parquet"
            if not input_path.exists():
                raise KeyError(f"No response for key {key}, realization: {realization}")
-           df = polars.read_parquet(input_path)
+           df = polars.scan_parquet(input_path)

            if select_key:
-               df = df.filter(polars.col("response_key") == key)
+               df = df.select(
+                   ["realization", *response_config.primary_key, key]
+               ).collect()
+           else:
+               df = df.collect()

            loaded.append(df)
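
The move from read_parquet to scan_parquet plus a select is where the one-column-per-key layout pays off: only the requested response column is materialized. A sketch under assumed file layout and key names:

import polars

# Hypothetical per-realization file written by save_response:
# columns: realization, time, FOPR, WOPR, ...
lazy = polars.scan_parquet("realization-0/summary.parquet")

# Only "realization", the primary key, and the single requested response
# key are read; the other response columns are never loaded from disk.
df = lazy.select(["realization", "time", "FOPR"]).collect()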

@@ -707,9 +713,6 @@ def load_all_summary_data(

        except (ValueError, KeyError):
            return pd.DataFrame()
-       df_pl = df_pl.pivot(
-           on="response_key", index=["realization", "time"], sort_columns=True
-       )
        df_pl = df_pl.rename({"time": "Date", "realization": "Realization"})

        df_pandas = (
@@ -845,11 +848,16 @@ def save_response(
        data : polars DataFrame
            polars DataFrame to save.
        """
+       response_config = self.experiment.response_configuration[response_type]

-       if "values" not in data.columns:
+       num_response_columns = (
+           len(data.columns)
+           - len(response_config.primary_key)
+           - (1 if "realization" in data.columns else 0)
+       )
+       if num_response_columns <= 0:
            raise ValueError(
-               f"Dataset for response group '{response_type}' "
-               f"must contain a 'values' variable"
+               f"Dataset for response type '{response_type}' must contain values for at least one response key"
            )

        if len(data) == 0:
@@ -873,7 +881,7 @@
        )

        if not self.experiment._has_finalized_response_keys(response_type):
-           response_keys = data["response_key"].unique().to_list()
+           response_keys = data.columns[(len(response_config.primary_key) + 1) :]
            self.experiment._update_response_keys(response_type, response_keys)

    def calculate_std_dev_for_parameter(self, parameter_group: str) -> xr.Dataset:
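
Both save_response hunks rely on the assumed column order ["realization", *primary_key, *response_keys]; a sketch (hypothetical frame) of the new column count check and of how the response keys are sliced out of the columns:

import polars

primary_key = ["report_step", "index"]  # gen_data primary key
data = polars.DataFrame(
    {
        "realization": [0, 0],
        "report_step": [0, 0],
        "index": [0, 1],
        "RES_A": [1.0, 2.0],
        "RES_B": [3.0, 4.0],
    }
)

# Everything that is not "realization" or the primary key is a response column.
num_response_columns = (
    len(data.columns)
    - len(primary_key)
    - (1 if "realization" in data.columns else 0)
)
assert num_response_columns == 2

# Response keys are the columns after "realization" and the primary key.
response_keys = data.columns[(len(primary_key) + 1) :]
assert response_keys == ["RES_A", "RES_B"]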
5 changes: 3 additions & 2 deletions src/ert/storage/local_storage.py
@@ -46,7 +46,7 @@

 logger = logging.getLogger(__name__)

-_LOCAL_STORAGE_VERSION = 8
+_LOCAL_STORAGE_VERSION = 9


 class _Migrations(BaseModel):
@@ -472,6 +472,7 @@ def _migrate(self, version: int) -> None:
            to6,
            to7,
            to8,
+           to9,
        )

        try:
@@ -516,7 +517,7 @@

        elif version < _LOCAL_STORAGE_VERSION:
            migrations = list(
-               enumerate([to2, to3, to4, to5, to6, to7, to8], start=1)
+               enumerate([to2, to3, to4, to5, to6, to7, to8, to9], start=1)
            )
            for from_version, migration in migrations[version - 1 :]:
                print(f"* Updating storage to version: {from_version+1}")
5 changes: 4 additions & 1 deletion src/ert/storage/migration/to8.py
@@ -78,7 +78,10 @@ def _migrate_responses_from_netcdf_to_parquet(path: Path) -> None:
    )

    pandas_df = gen_data_ds.to_dataframe().dropna()
-   polars_df = polars.from_pandas(pandas_df.reset_index())
+   polars_df = polars.from_pandas(
+       pandas_df.reset_index(),
+       schema_overrides={"values": polars.Float32},
+   )
    polars_df = polars_df.rename({"name": "response_key"})

    if "time" in polars_df:
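
A minimal check (toy frame) of the added schema_overrides argument: without it, the pandas float64 column would arrive in polars as Float64.

import pandas as pd
import polars

pandas_df = pd.DataFrame({"name": ["RES_A"], "index": [0], "values": [1.0]})

polars_df = polars.from_pandas(
    pandas_df, schema_overrides={"values": polars.Float32}
)
assert polars_df.schema["values"] == polars.Float32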
89 changes: 89 additions & 0 deletions src/ert/storage/migration/to9.py
@@ -0,0 +1,89 @@
import dataclasses
import json
import os
from pathlib import Path

import polars
import xarray as xr

info = "Make responses columnar"


@dataclasses.dataclass
class ObservationDatasetInfo:
    polars_df: polars.DataFrame
    response_type: str
    original_ds_path: Path

    @classmethod
    def from_path(cls, path: Path) -> "ObservationDatasetInfo":
        observation_key = os.path.basename(path)
        ds = xr.open_dataset(path, engine="scipy")
        response_key = ds.attrs["response"]
        response_type = "summary" if response_key == "summary" else "gen_data"

        df = polars.from_pandas(ds.to_dataframe().dropna().reset_index())
        df = df.with_columns(observation_key=polars.lit(observation_key))

        primary_key = (
            ["time"] if response_type == "summary" else ["report_step", "index"]
        )
        if response_type == "summary":
            df = df.rename({"name": "response_key"})
            df = df.with_columns(polars.col("time").dt.cast_time_unit("ms"))

        if response_type == "gen_data":
            df = df.with_columns(
                polars.col("report_step").cast(polars.UInt16),
                polars.col("index").cast(polars.UInt16),
                response_key=polars.lit(response_key),
            )

        df = df.with_columns(
            [
                polars.col("std").cast(polars.Float32),
                polars.col("observations").cast(polars.Float32),
            ]
        )

        df = df[
            ["response_key", "observation_key", *primary_key, "observations", "std"]
        ]

        return cls(df, response_type, path)


def _migrate_responses_to_one_col_per_response(path: Path) -> None:
    for experiment in path.glob("experiments/*"):
        ensembles = path.glob("ensembles/*")

        experiment_id = None
        with open(experiment / "index.json", encoding="utf-8") as f:
            exp_index = json.load(f)
            experiment_id = exp_index["id"]

        for ens in ensembles:
            with open(ens / "index.json", encoding="utf-8") as f:
                ens_file = json.load(f)
            if ens_file["experiment_id"] != experiment_id:
                continue

            real_dirs = [*ens.glob("realization-*")]

            for real_dir in real_dirs:
                for df_name, columns in [
                    ("gen_data", ["report_step", "index"]),
                    ("summary", ["time"]),
                ]:
                    if (real_dir / f"{df_name}.parquet").exists():
                        df = polars.read_parquet(real_dir / f"{df_name}.parquet")
                        pivoted = df.pivot(
                            on="response_key", index=["realization", *columns]
                        )

                        os.remove(real_dir / f"{df_name}.parquet")
                        pivoted.write_parquet(real_dir / f"{df_name}.parquet")


def migrate(path: Path) -> None:
    _migrate_responses_to_one_col_per_response(path)
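
A toy run of the core of this migration (file name and response key are made up): a long-format gen_data.parquet written by storage version 8 is rewritten in place with one column per response key.

import polars

# Long format, as written by storage version 8.
polars.DataFrame(
    {
        "realization": [0, 0],
        "report_step": polars.Series([0, 0], dtype=polars.UInt16),
        "index": polars.Series([0, 1], dtype=polars.UInt16),
        "response_key": ["RES_A", "RES_A"],
        "values": [1.0, 2.0],
    }
).write_parquet("gen_data.parquet")

df = polars.read_parquet("gen_data.parquet")
pivoted = df.pivot(on="response_key", index=["realization", "report_step", "index"])
pivoted.write_parquet("gen_data.parquet")  # rewritten in place, as in migrate()
# pivoted.columns == ["realization", "report_step", "index", "RES_A"]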
2 changes: 1 addition & 1 deletion tests/ert/performance_tests/test_analysis.py
@@ -139,7 +139,7 @@ def g(X):
                    "index": range(len(Y[:, iens])),
                    "values": Y[:, iens],
                }
-           ),
+           ).pivot(on="response_key", index=["report_step", "index"]),
            iens,
        )

4 changes: 2 additions & 2 deletions tests/ert/performance_tests/test_dark_storage_performance.py
@@ -141,7 +141,7 @@ def test_direct_dark_performance(
        if ensemble_json.userdata["name"] == "default":
            ensemble_id_default = ensemble_id

-   benchmark(function, storage, ensemble_id_default, key, template_config)
+   function(storage, ensemble_id_default, key, template_config)


@pytest.mark.parametrize(
@@ -177,4 +177,4 @@ def test_direct_dark_performance_with_storage(
        if ensemble_json.userdata["name"] == "default":
            ensemble_id_default = ensemble_id

-   benchmark(function, storage, ensemble_id_default, key, template_config)
+   function(storage, ensemble_id_default, key, template_config)
21 changes: 14 additions & 7 deletions tests/ert/performance_tests/test_memory_usage.py
@@ -89,16 +89,21 @@ def fill_storage_with_data(poly_template: Path, ert_config: ErtConfig) -> None:
    gendatas = []
    gen_obs = ert_config.observations["gen_data"]
    for response_key, df in gen_obs.group_by("response_key"):
-       gendata_df = make_gen_data(df["index"].max() + 1)
+       gendata_df = make_gen_data(response_key[0], df["index"].max() + 1)
        gendata_df = gendata_df.insert_column(
            0,
            polars.Series(np.full(len(gendata_df), response_key)).alias(
                "response_key"
            ),
        )
-       gendatas.append(gendata_df)
+       gendatas.append((response_key, gendata_df))

-   source.save_response("gen_data", polars.concat(gendatas), real)
+   gendatas.sort(key=lambda info: info[0])
+
+   wide_gendatas = polars.concat([df for _, df in gendatas]).pivot(
+       on="response_key", index=["report_step", "index"]
+   )
+   source.save_response("gen_data", wide_gendatas, real)

    obs_time_list = ens_config.refcase.all_dates

@@ -123,13 +128,15 @@ def fill_storage_with_data(poly_template: Path, ert_config: ErtConfig) -> None:
    )


-def make_gen_data(obs: int, min_val: float = 0, max_val: float = 5) -> polars.DataFrame:
+def make_gen_data(
+    response_key: str, obs: int, min_val: float = 0, max_val: float = 5
+) -> polars.DataFrame:
    data = np.random.default_rng().uniform(min_val, max_val, obs)
    return polars.DataFrame(
        {
            "report_step": polars.Series(np.full(len(data), 0), dtype=polars.UInt16),
            "index": polars.Series(range(len(data)), dtype=polars.UInt16),
-           "values": data,
+           "values": polars.Series(data, dtype=polars.Float32),
        }
    )

@@ -148,9 +155,9 @@ def make_summary_data(
            "time": polars.Series(
                np.tile(dates, len(obs_keys)).tolist()
            ).dt.cast_time_unit("ms"),
-           "values": data,
+           "values": polars.Series(data, dtype=polars.Float32),
        }
-   )
+   ).pivot(on="response_key", index="time")


@pytest.mark.limit_memory("130 MB")