diff --git a/eqn_disco/hybrid_symbolic.py b/eqn_disco/hybrid_symbolic.py index 0bc176d..d4a4b67 100644 --- a/eqn_disco/hybrid_symbolic.py +++ b/eqn_disco/hybrid_symbolic.py @@ -1,28 +1,34 @@ """Hybrid symbolic module.""" -from typing import Optional, List, Dict, Any, Union, Tuple - -from gplearn.functions import _Function as Function +from typing import Optional, List, Dict, Any, Tuple, Union import gplearn.genetic +from gplearn.functions import _Function as Function import numpy as np import xarray as xr from scipy.stats import pearsonr from sklearn.linear_model import LinearRegression -import pyqg - -from .utils import FeatureExtractor, Parameterization +from .utils import ( + FeatureExtractor, + Parameterization, + ArrayLike, + ModelLike, + ensure_numpy, +) -def make_custom_gplearn_functions(data_set: xr.Dataset): +def make_custom_gplearn_functions(data_set: xr.Dataset, spatial_funcs: List[str]): """Define custom gplearn functions for spatial derivatives. - The functions are specific to the spatial shape of that particular dataset. - Parameters ---------- data_set : xarray.Dataset - Dataset generated from pyqg.QGModel runs + Dataset containing spatial variables. + + spatial_funcs : List[str] + Names of spatial differential operators to explore during genetic + programming. Must be supported by the FeatureExtractor class, e.g. + ['ddx', 'ddy', 'laplacian', 'advected']. Returns ------- @@ -33,35 +39,29 @@ def make_custom_gplearn_functions(data_set: xr.Dataset): """ extractor = FeatureExtractor(data_set) - # TODO: What are r and x? - # TODO: Add a docstring. - def apply_spatial(func, x): - r = func(x.reshape(data_set.q.shape)) - if isinstance(r, xr.DataArray): - r = r.data - return r.reshape(x.shape) - - funcs = { - "ddx": lambda x: apply_spatial(extractor.ddx, x), - "ddy": lambda x: apply_spatial(extractor.ddy, x), - "lap": lambda x: apply_spatial(extractor.laplacian, x), - "adv": lambda x: apply_spatial(extractor.advected, x), - } + def apply_spatial(function_name: str, flat_array: ArrayLike) -> np.ndarray: + """Apply the spatial function named ``function_name`` to ``flat_array``.""" + spatial_func = getattr(extractor, function_name) + spatial_array = flat_array.reshape(extractor.spatial_shape) + spatial_output = ensure_numpy(spatial_func(spatial_array)) + return spatial_output.reshape(flat_array.shape) - # create gplearn function objects to represent these transformations return [ - Function(function=funcs["ddx"], name="ddx", arity=1), - Function(function=funcs["ddy"], name="ddy", arity=1), - Function(function=funcs["lap"], name="laplacian", arity=1), - Function(function=funcs["adv"], name="advected", arity=1), + Function( + function=lambda x, name=function_name: apply_spatial(name, x), + name=function_name, + arity=1, + ) + for function_name in spatial_funcs ] def run_gplearn_iteration( data_set: xr.Dataset, target: np.ndarray, - base_feats: Optional[List[str]] = None, - base_funcs: Optional[List[str]] = None, + base_features: Optional[List[str]] = None, + base_functions: Optional[List[str]] = None, + spatial_functions: Optional[List[str]] = None, **kwargs: Dict[str, Any], ): """Run gplearn for one iteration using custom spatial derivatives. @@ -74,10 +74,14 @@ Target spatial field to be predicted from dataset attributes base_features : List[str] Features from the dataset to be used as the initial set of atomic - inputs to genetic programming + inputs to genetic programming. Defaults to ['q', 'u', 'v'].
base_functions : List[str] Names of gplearn built-in functions to explore during genetic - programming (in addition to differential operators) + programming (in addition to differential operators). Defaults to + ['add', 'mul']. + spatial_functions : List[str] + Names of spatial differential operators to explore during genetic + programming. Defaults to ['ddx', 'ddy', 'laplacian', 'advected']. **kwargs : dict Additional arguments to pass to gplearn.genetic.SymbolicRegressor @@ -88,11 +92,20 @@ def run_gplearn_iteration( on functions of the dataset's ``base_features``. """ - base_feats = ["q", "u", "v"] if base_feats is None else base_feats - base_funcs = ["add", "mul"] if base_funcs is None else base_funcs + base_features = ["q", "u", "v"] if base_features is None else base_features + base_functions = ["add", "mul"] if base_functions is None else base_functions + spatial_functions = ( + ["ddx", "ddy", "laplacian", "advected"] + if spatial_functions is None + else spatial_functions + ) + spatial_functions = make_custom_gplearn_functions(data_set, spatial_functions) + function_set = base_functions + spatial_functions # type: ignore # Flatten the input and target data - inputs = np.array([data_set[feature].data.reshape(-1) for feature in base_feats]).T + inputs = np.array( + [data_set[feature].data.reshape(-1) for feature in base_features] + ).T targets = target.reshape(-1) gplearn_kwargs = { @@ -113,20 +126,17 @@ def run_gplearn_iteration( # Configure gplearn to run with a relatively small population # and for relatively few generations (again for performance) - # TODO: Can we rename ``sr`` to ``symbolic_regression``? - sr = gplearn.genetic.SymbolicRegressor( - feature_names=base_feats, - function_set=( - base_funcs + make_custom_gplearn_functions(data_set) # use our custom ops - ), + regressor = gplearn.genetic.SymbolicRegressor( + feature_names=base_features, + function_set=function_set, **gplearn_kwargs, ) # Fit the model - sr.fit(inputs, targets) + regressor.fit(inputs, targets) # Return the result - return sr + return regressor class LinearSymbolicRegression(Parameterization): @@ -134,11 +144,8 @@ class LinearSymbolicRegression(Parameterization): Parameters ---------- - lr1 : sklearn.linear_model.LinearRegression - Linear model to predict the upper layer's target based on input - expressions - lr2 : sklearn.linear_model.LinearRegression - Linear model to predict the lower layer's target based on input + models : List[sklearn.linear_model.LinearRegression] + Linear models to predict each layer's target based on input expressions inputs : List[str] List of string input expressions and functions that can be @@ -151,16 +158,15 @@ class LinearSymbolicRegression(Parameterization): def __init__( self, - lr1: LinearRegression, - lr2: LinearRegression, + models: List[LinearRegression], inputs: List[str], target: str, ): """Build ``LinearSymbolicRegression``.""" - self.lr1 = lr1 - self.lr2 = lr2 + self.models = models self.inputs = inputs self.target = target + super().__init__() @property def targets(self) -> List[str]: @@ -174,21 +180,7 @@ def targets(self) -> List[str]: """ return [self.target] - @property - def models(self) -> List[LinearRegression]: - """Return the models. - - Returns - ------- - List[LinearRegression] - The models. - - """ - return [self.lr1, self.lr2] - - # TODO: Are we okay with the number of arguments differing from the method - # in the base class? 
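For reference, here is a minimal usage sketch of the refactored entry point (it mirrors `test_run_gplearn_iteration` added later in this diff; every keyword value below is taken from that test rather than being a recommended default):

```python
# Recover a known expression, ddx(inputs), with one gplearn iteration
# on the pyqg-free example dataset defined in eqn_disco/utils.py below.
from eqn_disco.hybrid_symbolic import run_gplearn_iteration
from eqn_disco.utils import FeatureExtractor, example_non_pyqg_data_set

data_set = example_non_pyqg_data_set()
extractor = FeatureExtractor(data_set, example_realspace_input="inputs")
target = extractor.extract_feature("ddx(inputs)").data  # known ground truth

regressor = run_gplearn_iteration(
    data_set,
    target,
    base_features=["inputs"],   # atomic inputs for genetic programming
    base_functions=[],          # no built-in gplearn operators
    spatial_functions=["ddx"],  # only explore the x-derivative operator
    population_size=100,
    generations=10,
    metric="mse",
    random_state=42,
)
print(regressor._program)  # ideally prints: ddx(inputs)
```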
- def predict(self, m: Union[pyqg.QGModel, xr.Dataset]): + def predict(self, model_or_dataset: ModelLike) -> Dict[str, ArrayLike]: """Make a prediction for a given model or dataset. Parameters ---------- @@ -205,28 +197,28 @@ model or dataset). """ - extract = FeatureExtractor(m) + extract = FeatureExtractor(model_or_dataset) - x = extract(self.inputs) + inputs = extract(self.inputs) preds = [] # Do some slightly annoying reshaping to properly apply LR coefficients # to data that may or may not have extra batch dimensions for idx, lr_model in enumerate(self.models): - data_indices = [slice(None) for _ in x.shape] - data_indices[-3] = idx - x_z = x[tuple(data_indices)] - coef_indices = [np.newaxis for _ in x_z.shape] - coef_indices[0] = slice(None) - c_z = lr_model.coef_[tuple(coef_indices)] - pred_z = (x_z * c_z).sum(axis=0) - preds.append(pred_z) + data_indices = [slice(None) for _ in inputs.shape] + data_indices[-3] = idx  # type: ignore + layer_inputs = inputs[tuple(data_indices)] + coef_indices = [np.newaxis for _ in layer_inputs.shape] + coef_indices[0] = slice(None)  # type: ignore + layer_coefs = lr_model.coef_[tuple(coef_indices)] + layer_preds = (layer_inputs * layer_coefs).sum(axis=0) + preds.append(layer_preds) preds = np.stack(preds, axis=-3) res = {} res[self.target] = preds - return res + return res  # type: ignore @classmethod def fit( @@ -255,26 +247,36 @@ Resulting linear regression parameterization. """ - lrs = [] - for z in [0, 1]: - extract = FeatureExtractor(data_set.isel(lev=z)) - lrs.append( - LinearRegression(fit_intercept=False).fit( - extract(inputs, flat=True), extract(target, flat=True) - ) + extractors = [FeatureExtractor(layer) for layer in _each_layer(data_set)] + + models = [ + LinearRegression(fit_intercept=False).fit( + extract(inputs, flat=True), extract(target, flat=True) ) - # TODO: Pylint says this is bad. Can we unpack lrs before passing it? - return cls(*lrs, inputs, target) + for extract in extractors + ] + + return cls(models, inputs, target) -def corr(spatial_data_a: xr.DataArray, spatial_data_b: xr.DataArray) -> float: +def _each_layer( + data: Union[xr.Dataset, xr.DataArray] +) -> List[Union[xr.Dataset, xr.DataArray]]: + """Return a list representing the `data` broken out by vertical layer.""" + if "lev" in data.dims: + return [data.isel(lev=z) for z in range(len(data.lev))] + + return [data] + + +def _corr(spatial_data_a: ArrayLike, spatial_data_b: ArrayLike) -> float: """Return the Pearson correlation between two spatial data arrays. Parameters ---------- - a : xarray.DataArray + spatial_data_a : Union[xarray.DataArray, numpy.ndarray] First spatial data array - b : xarray.DataArray + spatial_data_b : Union[xarray.DataArray, numpy.ndarray] Second spatial data array Returns ------- @@ -284,8 +286,8 @@ def corr(spatial_data_a: xr.DataArray, spatial_data_b: xr.DataArray) -> float: """ return pearsonr( - np.array(spatial_data_a.data).ravel(), - np.array(spatial_data_b.data).ravel(), + ensure_numpy(spatial_data_a).ravel(), + ensure_numpy(spatial_data_b).ravel(), )[0] @@ -298,15 +300,14 @@ def hybrid_symbolic_regression( # pylint: disable=too-many-locals ) -> Tuple[List[str], List[LinearRegression]]: """Run hybrid symbolic and linear regression.
- Uses symbolic regression to find expressions correlated wit the output, + Uses symbolic regression to find expressions correlated with the output, then fitting linear regression to get an exact expression, then running symbolic regression again on the resulting residuals (repeating until ``max_iters``). - Parameters ---------- - ds : xarray.Dataset + data_set : xarray.Dataset Dataset of pyqg.QGModel runs. target : str String representing target variable to predict. Defaults to subgrid @@ -328,23 +329,27 @@ def hybrid_symbolic_regression( # pylint: disable=too-many-locals """ extract = FeatureExtractor(data_set) residual = data_set[target] - terms, vals, lrs = [], [], [] + terms, vals, hybrid_regressors = [], [], [] try: for i in range(max_iters): - for lev in [0, 1]: - # TODO: Can we rename ``sr`` to ``symbolic_regression``? - sr = run_gplearn_iteration( - data_set.isel(lev=lev), target=residual.isel(lev=lev).data, **kw + for data_set_layer, residual_layer in zip( + _each_layer(data_set), _each_layer(residual) # type: ignore + ): + symbolic_regressor = run_gplearn_iteration( + data_set_layer, target=residual_layer.data, **kw # type: ignore + ) + new_term = str( + symbolic_regressor._program # pylint: disable=protected-access ) - new_term = str(sr._program) new_vals = extract(new_term) # Prevent spurious duplicates, e.g. ddx(q) and ddx(add(1,q)) - if not any(corr(new_vals, v) > 0.99 for v in vals): + if not any(_corr(new_vals, v) > 0.99 for v in vals): terms.append(new_term) vals.append(new_vals) - lrs.append(LinearSymbolicRegression.fit(data_set, terms, target)) - preds = lrs[-1].test_offline(data_set).load() + hybrid_regressor = LinearSymbolicRegression.fit(data_set, terms, target) + hybrid_regressors.append(hybrid_regressor) + preds = hybrid_regressor.test_offline(data_set).load() residual = (data_set[target] - preds[f"{target}_predictions"]).load() if verbose: print(f"Iteration {i+1}") @@ -355,4 +360,4 @@ def hybrid_symbolic_regression( # pylint: disable=too-many-locals if verbose: print("Exiting early due to keyboard interrupt") - return terms, lrs + return terms, hybrid_regressors diff --git a/eqn_disco/utils.py b/eqn_disco/utils.py index 6170fcf..08e9e43 100644 --- a/eqn_disco/utils.py +++ b/eqn_disco/utils.py @@ -1,24 +1,49 @@ """Utility functions.""" import operator -from typing import List, Dict, Union, Any +from typing import List, Dict, Union, Any, Tuple, Optional import re -import pyqg + +try: + import pyqg + + IMPORTED_PYQG = True +except ImportError: + IMPORTED_PYQG = False import numpy as np import xarray as xr import matplotlib.pyplot as plt -ModelLike = Union[pyqg.Model, xr.Dataset] +ModelLike = Union[pyqg.Model, xr.Dataset] if IMPORTED_PYQG else xr.Dataset ArrayLike = Union[np.ndarray, xr.DataArray] Numeric = Union[ArrayLike, int, float] StringOrNumeric = Union[str, Numeric] -class Parameterization(pyqg.Parameterization): +def ensure_numpy(array: ArrayLike) -> np.ndarray: + """Ensure a given array-like input is converted to numpy. + + Parameters + ---------- + array : Union[numpy.ndarray, xarray.DataArray] + Input array + + Returns + ------- + numpy.ndarray + Numpy representation of input array + """ + if isinstance(array, xr.DataArray): + return array.data + return array + + +class Parameterization(pyqg.Parameterization if IMPORTED_PYQG else object): # type: ignore """Helper class for defining parameterizations. 
- This extends the normal pyqg parameterization framework to handle + This extends the normal pyqg.Parameterization framework to handle prediction of either subgrid forcings or fluxes, as well as to apply to - either pyqg.Models orxarray.Datasets. + either pyqg.Models or xarray.Datasets. Can also be used without pyqg, though + in a more limited fashion. """ @@ -30,23 +55,24 @@ def targets(self) -> List[str]: ------- List[str] List of parameterization targets returned by this parameterization. - Valid options are "q_forcing_total", "q_subgrid_forcing", - "u_subgrid_forcing", "v_subgrid_forcing", "uq_subgrid_flux", - "vq_subgrid_flux", "uu_subgrid_flux", "vv_subgrid_flux", and - "uv_subgrid_flux". See the dataset description notebook or the - paper for more details on the meanings of these target fields and - how they're used. + If using within pyqg, valid options are "q_forcing_total", + "q_subgrid_forcing", "u_subgrid_forcing", "v_subgrid_forcing", + "uq_subgrid_flux", "vq_subgrid_flux", "uu_subgrid_flux", + "vv_subgrid_flux", and "uv_subgrid_flux". See the dataset + description notebook or the paper for more details on the meanings + of these target fields and how they're used. """ raise NotImplementedError - def predict(self) -> Dict[str, ArrayLike]: + def predict(self, model_or_dataset: ModelLike) -> Dict[str, ArrayLike]: """Subgrid forcing predictions, as a dictionary of target => array. Parameters ---------- - model : Union[pyqg.QGModel, xarray.Dataset] - Model for which we are making subgrid forcing predictions. + model_or_dataset : Union[pyqg.QGModel, xarray.Dataset] + Model or dataset for which we are making subgrid forcing + predictions. Returns ------- @@ -58,20 +84,6 @@ def predict(self) -> Dict[str, ArrayLike]: """ raise NotImplementedError - @property - def spatial_res(self) -> int: - """Spatial res of pyqg.QGModel to which this parameterization applies. - - Currently only supports 64 to replicate the paper, but could be easily - extended. - - Returns - ------- - int - Spatial resolution, i.e. pyqg.QGModel.nx - """ - return 64 # Future work should generalize this. - @property def parameterization_type(self) -> str: """Return the parameterization type. @@ -87,14 +99,14 @@ def parameterization_type(self) -> str: Indication of whether the parameterization targets PV or velocity. """ + assert IMPORTED_PYQG, "pyqg must be installed to use this method" + if any(q in self.targets[0] for q in ["q_forcing", "q_subgrid"]): return "q_parameterization" return "uv_parameterization" - def __call__( - self, model: ModelLike - ) -> Union[np.ndarray, tuple[np.ndarray, np.ndarray]]: + def __call__(self, model: ModelLike) -> Union[np.ndarray, Tuple[np.ndarray, ...]]: """Invoke the parameterization in the format required by pyqg. Parameters @@ -112,9 +124,12 @@ def __call__( type as the model's PV variable. 
""" - ensure_array = lambda x: (x.data if isinstance(x, xr.DataArray) else x).astype( - model.q.dtype - ) + assert IMPORTED_PYQG, "pyqg must be installed to use this method" + + def _ensure_array(array: ArrayLike) -> np.ndarray: + """Convert an array-like to numpy with model-compatible dtype.""" + return ensure_numpy(array).astype(model.q.dtype) + preds = self.predict(model) keys = list(sorted(preds.keys())) # these are the same as our targets assert keys == self.targets @@ -124,33 +139,37 @@ def __call__( if len(keys) == 1: # if there's only one target, it's a PV parameterization, and we can # just return the array - return ensure_array(preds[keys[0]]) - elif keys == ["uq_subgrid_flux", "vq_subgrid_flux"]: + return _ensure_array(preds[keys[0]]) + if keys == ["uq_subgrid_flux", "vq_subgrid_flux"]: # these are PV subgrid fluxes; we need to take their divergence extractor = FeatureExtractor(model) - return ensure_array( + return _ensure_array( extractor.ddx(preds["uq_subgrid_flux"]) + extractor.ddy(preds["vq_subgrid_flux"]) ) - elif "uu_subgrid_flux" in keys and len(keys) == 3: + if "uu_subgrid_flux" in keys and len(keys) == 3: # these are velocity subgrid fluxes; we need to take two sets of # divergences and return a tuple extractor = FeatureExtractor(model) return ( - ensure_array( + _ensure_array( extractor.ddx(preds["uu_subgrid_flux"]) + extractor.ddy(preds["uv_subgrid_flux"]) ), - ensure_array( + _ensure_array( extractor.ddx(preds["uv_subgrid_flux"]) + extractor.ddy(preds["vv_subgrid_flux"]) ), ) - else: - # this is a "simple" velocity parameterization; return a tuple - return tuple(ensure_array(preds[k]) for k in keys) - - def run_online(self, sampling_freq: int = 1000, **kwargs) -> xr.Dataset: + # Otherwise, this is a "simple" velocity parameterization; return a tuple + return tuple(_ensure_array(preds[k]) for k in keys) + + def run_online( + self, + sampling_freq: int = 1000, + nx: int = 64, # pylint: disable=invalid-name + **kwargs, + ) -> xr.Dataset: """Initialize and run a parameterized pyqg.QGModel. Saves snapshots periodically. @@ -158,9 +177,12 @@ def run_online(self, sampling_freq: int = 1000, **kwargs) -> xr.Dataset: Parameters ---------- sampling_freq : int - Number of timesteps (hours) between saving snapshots. + Number of timesteps (hours) between saving snapshots. Defaults to + 1000. + spatial_res : int + Number of horizontal grid points for the model. Defaults to 64. **kwargs : dict - Simulation parameters to pass to pyqg.QGModel. + Other simulation parameters to pass to pyqg.QGModel. 
Returns ------- @@ -168,27 +190,29 @@ def run_online(self, sampling_freq: int = 1000, **kwargs) -> xr.Dataset: Dataset of parameterized model run snapshots """ + assert IMPORTED_PYQG, "pyqg must be installed to run this method" + # Initialize a pyqg model with this parameterization params = dict(kwargs) params[self.parameterization_type] = self - params["nx"] = self.spatial_res + params["nx"] = nx model = pyqg.QGModel(**params) # Run it, saving snapshots snapshots = [] - while model.t < m.tmax: + while model.t < model.tmax: if model.tc % sampling_freq == 0: snapshots.append(model.to_dataset().copy(deep=True)) - model._step_forward() + model._step_forward() # pylint: disable=protected-access data_set = xr.concat(snapshots, dim="time") # Diagnostics get dropped by this procedure since they're only present for # part of the timeseries; resolve this by saving the most recent # diagnostics (they're already time-averaged so this is ok) - for k, v in snapshots[-1].variables.items(): - if k not in data_set: - data_set[k] = v.isel(time=-1) + for key, value in snapshots[-1].variables.items(): + if key not in data_set: + data_set[key] = value.isel(time=-1) # Drop complex variables since they're redundant and can't be saved complex_vars = [k for k, v in data_set.variables.items() if np.iscomplexobj(v)] @@ -220,12 +244,9 @@ def test_offline(self, data_set: xr.Dataset) -> xr.Dataset: test[f"{key}_predictions"] = truth * 0 + val preds = test[f"{key}_predictions"] error = (truth - preds) ** 2 + true_var = (truth - truth.mean()) ** 2 - true_centered = truth - truth.mean() - pred_centered = preds - preds.mean() - true_var = true_centered**2 - - def dims_except(*dims): + def dims_except(*dims, key=key): return [d for d in test[key].dims if d not in dims] time = dims_except("x", "y", "lev") @@ -269,7 +290,9 @@ class FeatureExtractor: """ - def __call__(self, feature_or_features: Union[str, List[str]], flat: bool = False): + def __call__( + self, feature_or_features: Union[str, List[str]], flat: bool = False + ) -> np.ndarray: """Extract the given feature/features from underlying dataset/ model. Parameters @@ -286,38 +309,65 @@ def __call__(self, feature_or_features: Union[str, List[str]], flat: bool = Fals Array of values of corresponding features. 
""" - arr = lambda x: x.data if isinstance(x, xr.DataArray) else x if isinstance(feature_or_features, str): - res = arr(self.extract_feature(feature_or_features)) + res = ensure_numpy(self.extract_feature(feature_or_features)) # type: ignore if flat: res = res.reshape(-1) else: - res = np.array([arr(self.extract_feature(f)) for f in feature_or_features]) + res = np.array( + [ensure_numpy(self.extract_feature(f)) for f in feature_or_features] # type: ignore + ) if flat: res = res.reshape(len(feature_or_features), -1).T return res - def __init__(self, model_or_dataset: ModelLike): + def __init__( + self, model_or_dataset: ModelLike, example_realspace_input: Optional[str] = None + ): """Build ``FeatureExtractor``.""" - self.m = model_or_dataset + self.model = model_or_dataset self.cache = {} - if hasattr(self.m, "_ik"): - self.ik, self.il = np.meshgrid(self.m._ik, self.m._il) - elif hasattr(self.m, "fft"): - self.ik = 1j * self.m.k - self.il = 1j * self.m.l + assert hasattr( + self.model, "x" + ), "dataset must have horizontal realspace dimension" + assert hasattr( + self.model, "k" + ), "dataset must have horizontal spectral dimension" + assert hasattr( + self.model, "y" + ), "dataset must have vertical realspace dimension" + assert hasattr(self.model, "l"), "dataset must have vertical spectral dimension" + + if example_realspace_input is None: + if hasattr(self.model, "q"): + example_realspace_input = "q" + elif isinstance(self.model, xr.Dataset): + example_realspace_input = next( + key for key, val in self.model.items() if "x" in val.dims + ) + self.example_realspace_input = getattr(self.model, example_realspace_input) + + if hasattr(self.model, "_ik"): + self.ik, self.il = np.meshgrid( # pylint: disable=invalid-name + self.model._ik, self.model._il + ) + elif hasattr(self.model, "fft"): + self.ik = 1j * self.model.k # pylint: disable=invalid-name + self.il = 1j * self.model.l # pylint: disable=invalid-name else: - k, l = np.meshgrid(self.m.k, self.m.l) - self.ik = 1j * k - self.il = 1j * l + k, l = np.meshgrid( # pylint: disable=invalid-name + self.model.k, self.model.l + ) + self.ik = 1j * k # pylint: disable=invalid-name + self.il = 1j * l # pylint: disable=invalid-name - self.nx = self.ik.shape[0] + self.nx = self.ik.shape[0] # pylint: disable=invalid-name self.wv2 = self.ik**2 + self.il**2 # Helpers for taking FFTs / deciding if we need to - def fft(self, x: ArrayLike) -> ArrayLike: + def fft(self, real_array: ArrayLike) -> ArrayLike: """Compute the FFT of ``x``. Parameters @@ -333,17 +383,33 @@ def fft(self, x: ArrayLike) -> ArrayLike: """ try: # pyqg.Models will respond to `fft` (which might be pyFFTW, which is fast) - return self.m.fft(x) + return self.model.fft(real_array) except AttributeError: # if we got an attribute error, that means we have an xarray.Dataset. # use numpy FFTs and return a data array instead. 
- dims = [dict(y="l", x="k").get(d, d) for d in self["q"].dims] - coords = dict([(d, self[d]) for d in dims]) + dims = self.spectral_dims return xr.DataArray( - np.fft.rfftn(x, axes=(-2, -1)), dims=dims, coords=coords + np.fft.rfftn(real_array, axes=(-2, -1)), + dims=dims, + coords={d: self[d] for d in dims}, ) - def ifft(self, x: ArrayLike) -> ArrayLike: + @property + def spatial_shape(self) -> Tuple[int, ...]: + """Spatial shape of variables in real space.""" + return self.example_realspace_input.shape + + @property + def spatial_dims(self) -> Tuple[str, ...]: + """Names of spatial dimensions in real space.""" + return self.example_realspace_input.dims + + @property + def spectral_dims(self) -> List[str]: + """Names of spatial dimensions in spectral space.""" + return [{"y": "l", "x": "k"}.get(d, d) for d in self.spatial_dims] + + def ifft(self, spectral_array: ArrayLike) -> ArrayLike: """Compute the inverse FFT of ``x``. Parameters @@ -358,67 +424,87 @@ def ifft(self, x: ArrayLike) -> ArrayLike: """ try: - return self.m.ifft(x) + return self.model.ifft(spectral_array) except AttributeError: - return self["q"] * 0 + np.fft.irfftn(x, axes=(-2, -1)) + # Convert numpy to xarray by adding 0 with the right dimensions + realspace_array = np.fft.irfftn(spectral_array, axes=(-2, -1)) + zero_as_data_array = self.example_realspace_input * 0 + return zero_as_data_array + realspace_array - def is_real(self, arr: ArrayLike) -> bool: + def _is_real(self, arr: ArrayLike) -> bool: + """Check if a given array is in real space.""" return len(set(arr.shape[-2:])) == 1 - def real(self, feature: StringOrNumeric) -> ArrayLike: + def _real(self, feature: StringOrNumeric) -> ArrayLike: + """Load and convert a feature to real space, if necessary.""" arr = self[feature] - if isinstance(arr, float): - return arr - if self.is_real(arr): + if self._is_real(arr): return arr return self.ifft(arr) - def compl(self, feature: StringOrNumeric) -> ArrayLike: + def _compl(self, feature: StringOrNumeric) -> ArrayLike: + """Load and convert a feature to spectral space, if necessary.""" arr = self[feature] - if isinstance(arr, float): - return arr - if self.is_real(arr): + if self._is_real(arr): return self.fft(arr) return arr # Spectral derivatrives - def ddxh(self, f: StringOrNumeric) -> ArrayLike: - return self.ik * self.compl(f) - - def ddyh(self, f: StringOrNumeric) -> ArrayLike: - return self.il * self.compl(f) - - def divh(self, x: StringOrNumeric, y: StringOrNumeric) -> ArrayLike: - return self.ddxh(x) + self.ddyh(y) - - def curlh(self, x: StringOrNumeric, y: StringOrNumeric) -> ArrayLike: - return self.ddxh(y) - self.ddyh(x) - - def laplacianh(self, x: StringOrNumeric) -> ArrayLike: - return self.wv2 * self.compl(x) - - def advectedh(self, x_: StringOrNumeric) -> ArrayLike: - x = self.real(x_) - return self.ddxh(x * self.m.ufull) + self.ddyh(x * self.m.vfull) + def ddxh(self, field: StringOrNumeric) -> ArrayLike: + """Compute the horizontal derivative of ``field`` in spectral space.""" + return self.ik * self._compl(field) + + def ddyh(self, field: StringOrNumeric) -> ArrayLike: + """Compute the vertical derivative of ``field`` in spectral space.""" + return self.il * self._compl(field) + + def divh(self, field_x: StringOrNumeric, field_y: StringOrNumeric) -> ArrayLike: + """Compute the divergence of a vector field in spectral space.""" + return self.ddxh(field_x) + self.ddyh(field_y) + + def curlh(self, field_x: StringOrNumeric, field_y: StringOrNumeric) -> ArrayLike: + """Compute the curl of a vector field in 
spectral space.""" + return self.ddxh(field_y) - self.ddyh(field_x) + + def laplacianh(self, field: StringOrNumeric) -> ArrayLike: + """Compute the Laplacian of a field in spectral space.""" + return self.wv2 * self._compl(field) + + def advectedh(self, possibly_spectral_field: StringOrNumeric) -> ArrayLike: + """Advect a field in spectral space.""" + assert hasattr( + self.model, "ufull" + ), "Model must have `ufull` and `vfull` to advect" + assert hasattr( + self.model, "vfull" + ), "Model must have `ufull` and `vfull` to advect" + field = self._real(possibly_spectral_field) + return self.ddxh(field * self.model.ufull) + self.ddyh(field * self.model.vfull) # Real counterparts - def ddx(self, f: StringOrNumeric) -> ArrayLike: - return self.real(self.ddxh(f)) + def ddx(self, field: StringOrNumeric) -> ArrayLike: + """Compute the horizontal derivative of a field.""" + return self._real(self.ddxh(field)) - def ddy(self, f: StringOrNumeric) -> ArrayLike: - return self.real(self.ddyh(f)) + def ddy(self, field: StringOrNumeric) -> ArrayLike: + """Compute the vertical derivative of a field.""" + return self._real(self.ddyh(field)) - def laplacian(self, x: StringOrNumeric) -> ArrayLike: - return self.real(self.laplacianh(x)) + def laplacian(self, field: StringOrNumeric) -> ArrayLike: + """Compute the Laplacian of a field.""" + return self._real(self.laplacianh(field)) - def advected(self, x: StringOrNumeric) -> ArrayLike: - return self.real(self.advectedh(x)) + def advected(self, field: StringOrNumeric) -> ArrayLike: + """Advect a field.""" + return self._real(self.advectedh(field)) - def curl(self, x: StringOrNumeric, y: StringOrNumeric) -> ArrayLike: - return self.real(self.curlh(x, y)) + def curl(self, field_x: StringOrNumeric, field_y: StringOrNumeric) -> ArrayLike: + """Compute the curl of two fields.""" + return self._real(self.curlh(field_x, field_y)) - def div(self, x: StringOrNumeric, y: StringOrNumeric) -> ArrayLike: - return self.real(self.divh(x, y)) + def div(self, field_x: StringOrNumeric, field_y: StringOrNumeric) -> ArrayLike: + """Compute the divergence of two fields.""" + return self._real(self.divh(field_x, field_y)) # Main function: interpreting a string as a feature def extract_feature(self, feature: str) -> Numeric: @@ -447,7 +533,7 @@ def extract_feature(self, feature: str) -> Numeric: binary_operator "mul" | "add" | "sub" | "pow" | "div" | "curl" number - [\-\d\.]+ + "-"? [0-9]+ "."? [0-9]* variable_name .* @@ -466,8 +552,10 @@ def extract_feature(self, feature: str) -> Numeric: - ``add``, which adds two expressions - ``sub``, which subtracts the second expression from the first - ``pow``, which takes the first expression to the power of the second - - ``div``, which takes the divergence of the vector field whose x and y components are given by the two expressions, respectively - - ``curl``, which takes the curl of the vector field whose x and y components are given by the two expressions, respectively + - ``div``, which takes the divergence of the vector field whose x + and y components are given by the two expressions, respectively + - ``curl``, which takes the curl of the vector field whose x and y + components are given by the two expressions, respectively Parameters ---------- @@ -481,76 +569,84 @@ def extract_feature(self, feature: str) -> Numeric: Numeric or array-like expression representing the value of the feature. 
""" - # Helper to recurse on each side of an arity-2 expression - def extract_pair(s): + + def extract_pair(string: str) -> Tuple[Numeric, Numeric]: + """Extract two features from a comma-separated pair.""" depth = 0 - for i, char in enumerate(s): + for i, char in enumerate(string): if char == "(": depth += 1 elif char == ")": depth -= 1 elif char == "," and depth == 0: - return self.extract_feature(s[:i].strip()), self.extract_feature( - s[i + 1 :].strip() + return ( + self.extract_feature(string[:i].strip()), + self.extract_feature(string[i + 1 :].strip()), ) - raise ValueError(f"string {s} is not a comma-separated pair") + raise ValueError(f"string {string} is not a comma-separated pair") - real_or_spectral = lambda arr: arr + [a + "h" for a in arr] + def real_or_spectral(arr: List[str]) -> List[str]: + """Convert a list of strings to a list of real/spectral versions.""" + return arr + [a + "h" for a in arr] - if not self.extracted(feature): + if not self._extracted(feature): # Check if the feature looks like "function(expr1, expr2)" # (better would be to write a grammar + use a parser, # but this is a very simple DSL) match = re.search(r"^([a-z]+)\((.*)\)$", feature) if match: - op, inner = match.group(1), match.group(2) - if op in ["mul", "add", "sub", "pow"]: - self.cache[feature] = getattr(operator, op)(*extract_pair(inner)) - elif op in ["neg", "abs"]: - self.cache[feature] = getattr(operator, op)( + op_name, inner = match.group(1), match.group(2) + if op_name in ["mul", "add", "sub", "pow"]: + self.cache[feature] = getattr(operator, op_name)( + *extract_pair(inner) + ) + elif op_name in ["neg", "abs"]: + self.cache[feature] = getattr(operator, op_name)( + self.extract_feature(inner) + ) + elif op_name in real_or_spectral(["div", "curl"]): + self.cache[feature] = getattr(self, op_name)(*extract_pair(inner)) + elif op_name in real_or_spectral( + ["ddx", "ddy", "advected", "laplacian"] + ): + self.cache[feature] = getattr(self, op_name)( self.extract_feature(inner) ) - elif op in real_or_spectral(["div", "curl"]): - self.cache[feature] = getattr(self, op)(*extract_pair(inner)) - elif op in real_or_spectral(["ddx", "ddy", "advected", "laplacian"]): - self.cache[feature] = getattr(self, op)(self.extract_feature(inner)) else: raise ValueError(f"could not interpret {feature}") - elif re.search(f"^[\-\d\.]+$", feature): - # ensure numbers still work - return float(feature) elif feature == "streamfunction": # hack to make streamfunctions work in both datasets & pyqg.Models self.cache[feature] = self.ifft(self["ph"]) + elif re.search(r"^\-?\d+\.?\d*$", feature): + # ensure numbers still work + return float(feature) else: raise ValueError(f"could not interpret {feature}") return self[feature] - def extracted(self, key: str) -> bool: - return key in self.cache or hasattr(self.m, key) + def _extracted(self, key: str) -> bool: + """Check if a feature has already been extracted.""" + return key in self.cache or hasattr(self.model, key) # A bit of additional hackery to allow for the reading of features or properties def __getitem__(self, attribute: StringOrNumeric) -> Any: + """Read an attribute from the model or cache, or echo back if numeric.""" if isinstance(attribute, str): if attribute in self.cache: return self.cache[attribute] - elif re.search(r"^[\-\d\.]+$", attribute): + if re.search(r"^[\-\d\.]+$", attribute): return float(attribute) - else: - return getattr(self.m, attribute) - elif any( - [ - isinstance(attribute, kls) - for kls in [xr.DataArray, np.ndarray, int, float] - ] + return 
getattr(self.model, attribute) + if any( + isinstance(attribute, kls) for kls in [xr.DataArray, np.ndarray, int, float] ): return attribute - else: - raise KeyError(attribute) + raise KeyError(attribute) def energy_budget_term(model, term): + """Compute an energy budget term, handling parameterization contributions if present.""" val = model[term] if "paramspec_" + term in model: val += model["paramspec_" + term] @@ -558,6 +654,7 @@ def energy_budget_term(model, term): def energy_budget_figure(models, skip=0): + """Plot the energy budget for a set of models.""" fig = plt.figure(figsize=(12, 5)) vmax = 0 for i, term in enumerate(["KEflux", "APEflux", "APEgenspec", "KEfrictionspec"]): @@ -586,3 +683,49 @@ def energy_budget_figure(models, skip=0): axis.set_ylim(-vmax, vmax) plt.tight_layout() return fig + + +def example_non_pyqg_data_set( + grid_length: int = 8, num_samples: int = 20 +) -> xr.Dataset: + """Create a simple xarray dataset for testing the library without `pyqg`. + + This dataset has a single variable called `inputs` with `x`, `y`, and + `batch` coordinates. It also has spectral coordinates `k` and `l` defined. + It can be used in various methods of the library without needing to invoke + `pyqg`. + + Parameters + ---------- + grid_length : int + The length of the grid in each dimension. + num_samples : int + The number of samples in the dataset. + + Returns + ------- + xr.Dataset + The dataset. + + """ + grid = np.linspace(0, 1, grid_length) + inputs = np.random.normal(size=(num_samples, grid_length, grid_length)) + vertical_wavenumbers = ( + 2 + * np.pi + * np.append(np.arange(0.0, grid_length / 2), np.arange(-grid_length / 2, 0.0)) + ) + horizontal_wavenumbers = 2 * np.pi * np.arange(0.0, grid_length / 2 + 1) + + return xr.Dataset( + data_vars={ + "inputs": (("batch", "y", "x"), inputs), + }, + coords={ + "x": grid, + "y": grid, + "l": vertical_wavenumbers, + "k": horizontal_wavenumbers, + "batch": np.arange(num_samples), + }, + ) diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..f2e4e72 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,2 @@ +[mypy] +disable_error_code = attr-defined, import, valid-type, var-annotated \ No newline at end of file diff --git a/tests/test_hybrid_symbolic.py b/tests/test_hybrid_symbolic.py index ac89bd9..fe95a5b 100644 --- a/tests/test_hybrid_symbolic.py +++ b/tests/test_hybrid_symbolic.py @@ -1,8 +1,8 @@ import pyqg import numpy as np import xarray as xr -from eqn_disco.hybrid_symbolic import LinearSymbolicRegression -from eqn_disco.utils import FeatureExtractor +from eqn_disco.hybrid_symbolic import LinearSymbolicRegression, run_gplearn_iteration, hybrid_symbolic_regression +from eqn_disco.utils import FeatureExtractor, example_non_pyqg_data_set def test_linear_symbolic(): model = pyqg.QGModel() @@ -27,3 +27,57 @@ def test_linear_symbolic(): model2 = pyqg.QGModel(parameterization=parameterization) model2._step_forward() + + +def test_run_gplearn_iteration(): + data_set = example_non_pyqg_data_set() + + extractor = FeatureExtractor(data_set, example_realspace_input="inputs") + + target = extractor.extract_feature('ddx(inputs)').data + + regressor = run_gplearn_iteration( + data_set, + target, + base_features=['inputs'], + base_functions=[], + spatial_functions=['ddx'], + population_size=100, + generations=10, + metric='mse', + random_state=42 + ) + + result = str(regressor._program) + + assert result == 'ddx(inputs)' + + +def test_hybrid_symbolic_regression(): + data_set = example_non_pyqg_data_set() + + extractor = 
FeatureExtractor(data_set, example_realspace_input="inputs") + + data_set['target'] = extractor.extract_feature('ddx(inputs)') + + terms, hybrid_regressors = hybrid_symbolic_regression( + data_set, + target='target', + max_iters=2, + verbose=False, + base_features=['inputs'], + base_functions=[], + spatial_functions=['ddx'], + population_size=100, + generations=10, + metric='mse', + random_state=42 + ) + + assert terms == ['ddx(inputs)'] + + regressor = hybrid_regressors[-1] + + assert len(regressor.models) == 1 + + np.testing.assert_allclose(regressor.models[0].coef_[0], 1.0) \ No newline at end of file diff --git a/tests/test_utils.py b/tests/test_utils.py index 0a2eb4a..f2aba39 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -13,3 +13,54 @@ def test_feature_extractor(): lap_adv_q2 = extractor2.extract_feature('laplacian(advected(q))') np.testing.assert_allclose(lap_adv_q1, lap_adv_q2.data[0]) + + +def test_parameterization_with_pyqg(): + class MyParameterization(utils.Parameterization): + @property + def targets(self): + return ['q_subgrid_forcing'] + + def predict(self, model_or_dataset: utils.ModelLike): + return dict(q_subgrid_forcing=model_or_dataset.q * 0.0 + 1e-8) + + parameterization = MyParameterization() + + # Test that the parameterization can be used within pyqg + model = pyqg.QGModel(parameterization=parameterization) + model._step_forward() + + # Test that pyqg-invoking methods work + data_set = parameterization.run_online(dt=7200.0, tmax=7200.0, tavestart=0.0) + assert len(data_set.time) == 1 + assert len(data_set.lev) == 2 + assert len(data_set.x) == len(data_set.y) == 64 + + +def test_parameterization_without_pyqg(): + class MyParameterization(utils.Parameterization): + @property + def targets(self): + return ['target'] + + def predict(self, model_or_dataset: utils.ModelLike): + return dict( + target=getattr(model_or_dataset, 'inputs') ** 2 + ) + + data_set = utils.example_non_pyqg_data_set() + + extractor = utils.FeatureExtractor(data_set) + + data_set['target'] = extractor.extract_feature('inputs') ** 2 + + parameterization = MyParameterization() + result = parameterization.test_offline(data_set) + + np.testing.assert_allclose(result.mse, 0.0) + np.testing.assert_allclose(result.skill, 1.0) + np.testing.assert_allclose(result.correlation, 1.0) + + np.testing.assert_allclose(result.target_spatial_mse, np.zeros((8, 8))) + np.testing.assert_allclose(result.target_spatial_skill, np.ones((8, 8))) + np.testing.assert_allclose(result.target_spatial_correlation, np.ones((8, 8))) \ No newline at end of file
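To make the `__call__` dispatch above concrete: a parameterization whose `targets` are the two PV subgrid fluxes registers with pyqg as a `q_parameterization`, and `__call__` hands pyqg the divergence of its predicted fluxes rather than the fluxes themselves. A hedged sketch (the `FluxParam` class and its zero-valued predictions are hypothetical; the constructor pattern follows `test_parameterization_with_pyqg`):

```python
# Hypothetical flux-predicting parameterization: __call__ returns
# ddx(uq_subgrid_flux) + ddy(vq_subgrid_flux) as a single PV forcing array.
import pyqg

from eqn_disco.utils import Parameterization


class FluxParam(Parameterization):
    @property
    def targets(self):
        return ["uq_subgrid_flux", "vq_subgrid_flux"]

    def predict(self, model_or_dataset):
        zero = model_or_dataset.q * 0.0  # placeholder prediction
        return {"uq_subgrid_flux": zero, "vq_subgrid_flux": zero}


model = pyqg.QGModel(parameterization=FluxParam())  # q_parameterization
model._step_forward()  # pyqg receives the flux divergence at each step
```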
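Finally, a short sketch of the string DSL that `FeatureExtractor.extract_feature` parses, run against the new pyqg-free example dataset. The composed feature strings are illustrative, built from the grammar documented in the docstring; `advected` is omitted because it now asserts the presence of `ufull`/`vfull`, which `example_non_pyqg_data_set` does not provide:

```python
from eqn_disco.utils import FeatureExtractor, example_non_pyqg_data_set

data_set = example_non_pyqg_data_set()
extractor = FeatureExtractor(data_set)  # picks "inputs" as its realspace example

# Unary spatial operators compose with the binary operators from the grammar
lap_inputs = extractor.extract_feature("laplacian(inputs)")
mixed_term = extractor.extract_feature("add(mul(2.0, ddx(inputs)), ddy(inputs))")

# Parsed features are cached under their string representation
assert "laplacian(inputs)" in extractor.cache

# __call__ stacks several features; flat=True yields a (samples, features) matrix
design_matrix = extractor(["inputs", "ddx(inputs)"], flat=True)
assert design_matrix.shape[1] == 2
```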