Skip to content

Commit

Permalink
ENH: Add FLUID column and simplify column names in inplace_volumes
Browse files Browse the repository at this point in the history
  • Loading branch information
tnatt committed Dec 12, 2024
1 parent aaf3b7d commit 94da8ad
Show file tree
Hide file tree
Showing 4 changed files with 541 additions and 50 deletions.
88 changes: 72 additions & 16 deletions src/fmu/dataio/export/rms/inplace_volumes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import warnings
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any, Final

Expand All @@ -24,7 +25,17 @@
_logger: Final = null_logger(__name__)


_TABLE_INDEX_COLUMNS: Final = ("ZONE", "REGION", "FACIES", "LICENCE")
_TABLE_INDEX_COLUMNS: Final = ("ZONE", "REGION", "FACIES", "LICENSE")
_FLUID_COLUMN: Final = "FLUID"


class _Fluid(str, Enum):
"""Fluid types"""

OIL = "OIL"
GAS = "GAS"
WATER = "WATER"


# rename columns to FMU standard
_RENAME_COLUMNS_FROM_RMS: Final = {
Expand Down Expand Up @@ -58,11 +69,11 @@ class _ExportVolumetricsRMS:
volume_job_name: str

def __post_init__(self) -> None:
_logger.debug("Process data, estiblish state prior to export.")
_logger.debug("Process data, establish state prior to export.")
self._config = load_global_config()
self._volume_job = self._get_rms_volume_job_settings()
self._volume_table_name = self._read_volume_table_name_from_job()
self._dataframe = self._voltable_as_dataframe()
self._dataframe = self._get_table_with_volumes()
_logger.debug("Process data... DONE")

@property
Expand All @@ -72,8 +83,13 @@ def _classification(self) -> Classification:

@property
def _table_index(self) -> list[str]:
"""Get index columns present in the dataframe."""
return [col for col in _TABLE_INDEX_COLUMNS if col in self._dataframe]
"""Get default classification."""
return self._get_table_index_columns(self._dataframe)

@staticmethod
def _get_table_index_columns(table: pd.DataFrame) -> list[str]:
"""Get index columns present in a table."""
return [col for col in _TABLE_INDEX_COLUMNS if col in table]

def _get_rms_volume_job_settings(self) -> dict:
"""Get information out from the RMS job API."""
Expand Down Expand Up @@ -101,21 +117,61 @@ def _read_volume_table_name_from_job(self) -> str:
_logger.debug("The volume table name is %s", volume_table_name)
return volume_table_name

def _voltable_as_dataframe(self) -> pd.DataFrame:
"""Convert table to pandas dataframe"""
def _get_table_with_volumes(self) -> pd.DataFrame:
"""
Get a volumetric table from RMS converted into a pandas
dataframe on standard format for the inplace_volumes product.
"""
table = self._get_table_from_rms()
table = self._convert_table_from_rms_to_legacy_format(table)
table_index = self._get_table_index_columns(table)
return self._convert_table_from_legacy_to_standard_format(table, table_index)

def _get_table_from_rms(self) -> pd.DataFrame:
"""Fetch volumetric table from RMS and convert to pandas dataframe"""
_logger.debug("Read values and convert to pandas dataframe...")

dict_values = (
self.project.volumetric_tables[self._volume_table_name]
.get_data_table()
.to_dict()
return pd.DataFrame.from_dict(
(
self.project.volumetric_tables[self._volume_table_name]
.get_data_table()
.to_dict()
)
)
return (
pd.DataFrame.from_dict(dict_values)
.rename(columns=_RENAME_COLUMNS_FROM_RMS)
.drop("REAL", axis=1, errors="ignore")

@staticmethod
def _convert_table_from_rms_to_legacy_format(table: pd.DataFrame) -> pd.DataFrame:
"""Rename columns to legacy naming standard and drop REAL column if present."""
_logger.debug("Converting dataframe from RMS to legacy format...")
return table.rename(columns=_RENAME_COLUMNS_FROM_RMS).drop(
columns="REAL", errors="ignore"
)

@staticmethod
def _convert_table_from_legacy_to_standard_format(
table: pd.DataFrame, table_index: list[str]
) -> pd.DataFrame:
"""
Transformation of a dataframe containing fluid-specific column data into a
standardized format with unified column names, e.g. 'BULK_OIL' and 'PORV_OIL'
are renamed into 'BULK' and 'PORV' columns. To separate the data an additional
FLUID column is added that indicates the type of fluid the row represents.
"""

tables = []
for fluid in [_Fluid.GAS.value, _Fluid.OIL.value]:
fluid_columns = [col for col in table.columns if col.endswith(f"_{fluid}")]
if fluid_columns:
fluid_table = table[table_index + fluid_columns].copy()

# drop fluid suffix from columns to get standard names
fluid_table.columns = fluid_table.columns.str.replace(f"_{fluid}", "")
# add the fluid as column entry instead
fluid_table[_FLUID_COLUMN] = fluid.lower()

tables.append(fluid_table)

return pd.concat(tables, ignore_index=True) if tables else pd.DataFrame()

def _export_volume_table(self) -> ExportResult:
"""Do the actual volume table export using dataio setup."""

Expand Down
Loading

0 comments on commit 94da8ad

Please sign in to comment.