Skip to content

Commit

Permalink
Fix manifest having too many files and not substituting
Browse files Browse the repository at this point in the history
Also makes substitution of <IENS> and <ITER> for
all parameters and responses
  • Loading branch information
eivindjahren committed Nov 18, 2024
1 parent 7612535 commit 456919e
Show file tree
Hide file tree
Showing 19 changed files with 377 additions and 107 deletions.
6 changes: 4 additions & 2 deletions src/ert/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
async def _read_parameters(
run_path: str,
realization: int,
iteration: int,
ensemble: Ensemble,
) -> LoadResult:
result = LoadResult(LoadStatus.LOAD_SUCCESSFUL, "")
Expand All @@ -28,7 +29,7 @@ async def _read_parameters(
try:
start_time = time.perf_counter()
logger.debug(f"Starting to load parameter: {config.name}")
ds = config.read_from_runpath(Path(run_path), realization)
ds = config.read_from_runpath(Path(run_path), realization, iteration)
await asyncio.sleep(0)
logger.debug(
f"Loaded {config.name}",
Expand Down Expand Up @@ -60,7 +61,7 @@ async def _write_responses_to_storage(
start_time = time.perf_counter()
logger.debug(f"Starting to load response: {config.response_type}")
try:
ds = config.read_from_file(run_path, realization)
ds = config.read_from_file(run_path, realization, ensemble.iteration)
except (FileNotFoundError, InvalidResponseFile) as err:
errors.append(str(err))
logger.warning(f"Failed to write: {realization}: {err}")
Expand Down Expand Up @@ -105,6 +106,7 @@ async def forward_model_ok(
parameters_result = await _read_parameters(
run_path,
realization,
iter,
ensemble,
)

Expand Down
10 changes: 8 additions & 2 deletions src/ert/config/ext_param_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import numpy as np
import xarray as xr

from ert.substitutions import substitute_runpath_name

from .parameter_config import ParameterConfig

if TYPE_CHECKING:
Expand Down Expand Up @@ -65,13 +67,17 @@ def __post_init__(self) -> None:
f"Duplicate keys for key '{self.name}' - keys: {self.input_keys}"
)

def read_from_runpath(self, run_path: Path, real_nr: int) -> xr.Dataset:
def read_from_runpath(
self, run_path: Path, real_nr: int, iteration: int
) -> xr.Dataset:
raise NotImplementedError()

def write_to_runpath(
self, run_path: Path, real_nr: int, ensemble: Ensemble
) -> None:
file_path = run_path / self.output_file
file_path = run_path / substitute_runpath_name(
self.output_file, real_nr, ensemble.iteration
)
Path.mkdir(file_path.parent, exist_ok=True, parents=True)

data: MutableDataType = {}
Expand Down
13 changes: 8 additions & 5 deletions src/ert/config/field.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from typing_extensions import Self

from ert.field_utils import FieldFileFormat, Shape, read_field, read_mask, save_field
from ert.substitutions import substitute_runpath_name

from ._option_dict import option_dict
from ._str_to_bool import str_to_bool
Expand Down Expand Up @@ -138,11 +139,11 @@ def __len__(self) -> int:

return np.size(self.mask) - np.count_nonzero(self.mask)

def read_from_runpath(self, run_path: Path, real_nr: int) -> xr.Dataset:
def read_from_runpath(
self, run_path: Path, real_nr: int, iteration: int
) -> xr.Dataset:
t = time.perf_counter()
file_name = self.forward_init_file
if "%d" in file_name:
file_name = file_name % real_nr # noqa
file_name = substitute_runpath_name(self.forward_init_file, real_nr, iteration)
ds = xr.Dataset(
{
"values": (
Expand All @@ -166,7 +167,9 @@ def write_to_runpath(
self, run_path: Path, real_nr: int, ensemble: Ensemble
) -> None:
t = time.perf_counter()
file_out = run_path.joinpath(self.output_file)
file_out = run_path.joinpath(
substitute_runpath_name(str(self.output_file), real_nr, ensemble.iteration)
)
if os.path.islink(file_out):
os.unlink(file_out)

Expand Down
12 changes: 7 additions & 5 deletions src/ert/config/gen_data_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import polars
from typing_extensions import Self

from ert.substitutions import substitute_runpath_name
from ert.validation import rangestring_to_list

from ._option_dict import option_dict
Expand Down Expand Up @@ -118,7 +119,7 @@ def from_config_dict(cls, config_dict: ConfigDict) -> Optional[Self]:
report_steps_list=report_steps,
)

def read_from_file(self, run_path: str, _: int) -> polars.DataFrame:
def read_from_file(self, run_path: str, iens: int, iter: int) -> polars.DataFrame:
def _read_file(filename: Path, report_step: int) -> polars.DataFrame:
try:
data = np.loadtxt(_run_path / filename, ndmin=1)
Expand Down Expand Up @@ -152,14 +153,15 @@ def _read_file(filename: Path, report_step: int) -> polars.DataFrame:
datasets_per_report_step = []
if report_steps is None:
try:
datasets_per_report_step.append(
_read_file(_run_path / input_file, 0)
)
filename = substitute_runpath_name(input_file, iens, iter)
datasets_per_report_step.append(_read_file(_run_path / filename, 0))
except (InvalidResponseFile, FileNotFoundError) as err:
errors.append(err)
else:
for report_step in report_steps:
filename = input_file % report_step
filename = substitute_runpath_name(
input_file % report_step, iens, iter
)
try:
datasets_per_report_step.append(
_read_file(_run_path / filename, report_step)
Expand Down
17 changes: 9 additions & 8 deletions src/ert/config/gen_kw_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
from scipy.stats import norm
from typing_extensions import Self

from ert.substitutions import substitute_runpath_name

from ._str_to_bool import str_to_bool
from .parameter_config import ParameterConfig, parse_config
from .parsing import ConfigValidationError, ConfigWarning, ErrorInfo
Expand Down Expand Up @@ -237,7 +239,7 @@ def sample_or_load(
self, real_nr: int, random_seed: int, ensemble_size: int
) -> xr.Dataset:
if self.forward_init_file:
return self.read_from_runpath(Path(), real_nr)
return self.read_from_runpath(Path(), real_nr, 0)

_logger.info(f"Sampling parameter {self.name} for realization {real_nr}")
keys = [e.name for e in self.transform_functions]
Expand All @@ -260,14 +262,14 @@ def read_from_runpath(
self,
run_path: Path,
real_nr: int,
iteration: int,
) -> xr.Dataset:
keys = [e.name for e in self.transform_functions]
if not self.forward_init_file:
raise ValueError("loading gen_kw values requires forward_init_file")

parameter_value = self._values_from_file(
real_nr,
self.forward_init_file,
substitute_runpath_name(self.forward_init_file, real_nr, iteration),
keys,
)

Expand Down Expand Up @@ -304,7 +306,9 @@ def write_to_runpath(
}

if self.template_file is not None and self.output_file is not None:
target_file = self.output_file
target_file = substitute_runpath_name(
self.output_file, real_nr, ensemble.iteration
)
if target_file.startswith("/"):
target_file = target_file[1:]
(run_path / target_file).parent.mkdir(exist_ok=True, parents=True)
Expand Down Expand Up @@ -382,10 +386,7 @@ def transform(self, array: npt.ArrayLike) -> npt.NDArray[np.float64]:
return array

@staticmethod
def _values_from_file(
realization: int, name_format: str, keys: List[str]
) -> npt.NDArray[np.double]:
file_name = name_format % realization
def _values_from_file(file_name: str, keys: List[str]) -> npt.NDArray[np.double]:
df = pd.read_csv(file_name, sep=r"\s+", header=None)
# This means we have a key: value mapping in the
# file otherwise it is just a list of values
Expand Down
3 changes: 2 additions & 1 deletion src/ert/config/parameter_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def sample_or_load(
random_seed: int,
ensemble_size: int,
) -> xr.Dataset:
return self.read_from_runpath(Path(), real_nr)
return self.read_from_runpath(Path(), real_nr, 0)

@abstractmethod
def __len__(self) -> int:
Expand All @@ -69,6 +69,7 @@ def read_from_runpath(
self,
run_path: Path,
real_nr: int,
iteration: int,
) -> xr.Dataset:
"""
This function is responsible for converting the parameter
Expand Down
2 changes: 1 addition & 1 deletion src/ert/config/response_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class ResponseConfig(ABC):
has_finalized_keys: bool = False

@abstractmethod
def read_from_file(self, run_path: str, iens: int) -> polars.DataFrame:
def read_from_file(self, run_path: str, iens: int, iter: int) -> polars.DataFrame:
"""Reads the data for the response from run_path.
Raises:
Expand Down
6 changes: 4 additions & 2 deletions src/ert/config/summary_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from datetime import datetime
from typing import TYPE_CHECKING, Any, Optional, Set, Union, no_type_check

from ert.substitutions import substitute_runpath_name

from ._read_summary import read_summary
from .ensemble_config import Refcase
from .parsing import ConfigDict, ConfigKeys
Expand Down Expand Up @@ -37,8 +39,8 @@ def expected_input_files(self) -> List[str]:
base = self.input_files[0]
return [f"{base}.UNSMRY", f"{base}.SMSPEC"]

def read_from_file(self, run_path: str, iens: int) -> polars.DataFrame:
filename = self.input_files[0].replace("<IENS>", str(iens))
def read_from_file(self, run_path: str, iens: int, iter: int) -> polars.DataFrame:
filename = substitute_runpath_name(self.input_files[0], iens, iter)
_, keys, time_map, data = read_summary(f"{run_path}/{filename}", self.keys)
if len(data) == 0 or len(keys) == 0:
# https://github.com/equinor/ert/issues/6974
Expand Down
14 changes: 9 additions & 5 deletions src/ert/config/surface_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import xtgeo
from typing_extensions import Self

from ert.substitutions import substitute_runpath_name

from ._option_dict import option_dict
from ._str_to_bool import str_to_bool
from .parameter_config import ParameterConfig
Expand Down Expand Up @@ -97,10 +99,10 @@ def from_config_list(cls, surface: List[str]) -> Self:
def __len__(self) -> int:
return self.ncol * self.nrow

def read_from_runpath(self, run_path: Path, real_nr: int) -> xr.Dataset:
file_name = self.forward_init_file
if "%d" in file_name:
file_name = file_name % real_nr # noqa
def read_from_runpath(
self, run_path: Path, real_nr: int, iteration: int
) -> xr.Dataset:
file_name = substitute_runpath_name(self.forward_init_file, real_nr, iteration)
file_path = run_path / file_name
if not file_path.exists():
raise ValueError(
Expand Down Expand Up @@ -137,7 +139,9 @@ def write_to_runpath(
values=data.values,
)

file_path = run_path / self.output_file
file_path = run_path / substitute_runpath_name(
str(self.output_file), real_nr, ensemble.iteration
)
file_path.parent.mkdir(exist_ok=True, parents=True)
surf.to_file(file_path, fformat="irap_ascii")

Expand Down
19 changes: 11 additions & 8 deletions src/ert/enkf_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from ert.config.ert_config import forward_model_data_to_json
from ert.config.forward_model_step import ForwardModelStep
from ert.config.model_config import ModelConfig
from ert.substitutions import Substitutions
from ert.substitutions import Substitutions, substitute_runpath_name

from .config import (
ExtParamConfig,
Expand Down Expand Up @@ -126,23 +126,26 @@ def _generate_parameter_files(
_value_export_json(run_path, export_base_name, exports)


def _manifest_to_json(ensemble: Ensemble, iens: int = 0) -> Dict[str, Any]:
def _manifest_to_json(ensemble: Ensemble, iens: int, iter: int) -> Dict[str, Any]:
manifest = {}
# Add expected parameter files to manifest
for param_config in ensemble.experiment.parameter_configuration.values():
assert isinstance(
param_config,
(ExtParamConfig, GenKwConfig, Field, SurfaceConfig),
)
if param_config.forward_init and param_config.forward_init_file is not None:
file_path = param_config.forward_init_file.replace("%d", str(iens))
if param_config.forward_init and ensemble.iteration == 0:
assert param_config.forward_init_file is not None
file_path = substitute_runpath_name(
param_config.forward_init_file, iens, iter
)
manifest[param_config.name] = file_path
elif param_config.output_file is not None and not param_config.forward_init:
manifest[param_config.name] = str(param_config.output_file)
# Add expected response files to manifest
for respons_config in ensemble.experiment.response_configuration.values():
for input_file in respons_config.expected_input_files:
manifest[f"{respons_config.response_type}_{input_file}"] = input_file
manifest[f"{respons_config.response_type}_{input_file}"] = (
substitute_runpath_name(input_file, iens, iter)
)

return manifest

Expand Down Expand Up @@ -265,7 +268,7 @@ def create_run_path(
orjson.dumps(forward_model_output, option=orjson.OPT_NON_STR_KEYS)
)
# Write MANIFEST file to runpath use to avoid NFS sync issues
data = _manifest_to_json(ensemble, run_arg.iens)
data = _manifest_to_json(ensemble, run_arg.iens, run_arg.itr)
with open(run_path / "manifest.json", mode="wb") as fptr:
fptr.write(orjson.dumps(data, option=orjson.OPT_NON_STR_KEYS))

Expand Down
Loading

0 comments on commit 456919e

Please sign in to comment.