Satisfy code quality checks and generalize to non-pyqg use-cases #12

Merged (9 commits, Apr 27, 2023)
217 changes: 111 additions & 106 deletions eqn_disco/hybrid_symbolic.py
@@ -1,28 +1,34 @@
"""Hybrid symbolic module."""
from typing import Optional, List, Dict, Any, Union, Tuple

from gplearn.functions import _Function as Function
from typing import Optional, List, Dict, Any, Tuple, Union

import gplearn.genetic
from gplearn.functions import _Function as Function
import numpy as np
import xarray as xr
from scipy.stats import pearsonr
from sklearn.linear_model import LinearRegression

import pyqg

from .utils import FeatureExtractor, Parameterization
from .utils import (
FeatureExtractor,
Parameterization,
ArrayLike,
ModelLike,
ensure_numpy,
)


def make_custom_gplearn_functions(data_set: xr.Dataset):
def make_custom_gplearn_functions(data_set: xr.Dataset, spatial_funcs: List[str]):
"""Define custom gplearn functions for spatial derivatives.

The functions are specific to the spatial shape of that particular dataset.

Parameters
----------
data_set : xarray.Dataset
Dataset generated from pyqg.QGModel runs
Dataset containing spatial variables.

spatial_funcs : List[str]
Names of spatial differential operators to explore during genetic
programming. Must be supported by the FeatureExtractor class, e.g.
['ddx', 'ddy', 'laplacian', 'advected'].

Returns
-------
@@ -33,35 +39,29 @@ def make_custom_gplearn_functions(data_set: xr.Dataset):
"""
extractor = FeatureExtractor(data_set)

# TODO: What are r and x?
# TODO: Add a docstring.
def apply_spatial(func, x):
r = func(x.reshape(data_set.q.shape))
if isinstance(r, xr.DataArray):
r = r.data
return r.reshape(x.shape)

funcs = {
"ddx": lambda x: apply_spatial(extractor.ddx, x),
"ddy": lambda x: apply_spatial(extractor.ddy, x),
"lap": lambda x: apply_spatial(extractor.laplacian, x),
"adv": lambda x: apply_spatial(extractor.advected, x),
}
def apply_spatial(function_name: str, flat_array: ArrayLike) -> np.ndarray:
"""Apply a `spatial_function` to an initially `flat_array`."""
spatial_func = getattr(extractor, function_name)
spatial_array = flat_array.reshape(extractor.spatial_shape)
spatial_output = ensure_numpy(spatial_func(spatial_array))
return spatial_output.reshape(flat_array.shape)

# create gplearn function objects to represent these transformations
return [
Function(function=funcs["ddx"], name="ddx", arity=1),
Function(function=funcs["ddy"], name="ddy", arity=1),
Function(function=funcs["lap"], name="laplacian", arity=1),
Function(function=funcs["adv"], name="advected", arity=1),
Function(
function=lambda x, name=function_name: apply_spatial(name, x),
name=function_name,
arity=1,
)
for function_name in spatial_funcs
]
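
# For orientation, a minimal usage sketch of the new ``spatial_funcs`` argument.
# The short pyqg run below is illustrative; any dataset that FeatureExtractor
# can wrap (with the grid metadata it needs) would work the same way.
#
#     import pyqg
#     from eqn_disco.hybrid_symbolic import make_custom_gplearn_functions
#
#     model = pyqg.QGModel(tmax=8000 * 10, dt=8000)   # illustrative, short run
#     model.run()
#     data_set = model.to_dataset()
#
#     # Restrict the genetic-programming search to first derivatives only.
#     operators = make_custom_gplearn_functions(data_set, spatial_funcs=["ddx", "ddy"])
#     print([op.name for op in operators])  # -> ['ddx', 'ddy']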


def run_gplearn_iteration(
data_set: xr.Dataset,
target: np.ndarray,
base_feats: Optional[List[str]] = None,
base_funcs: Optional[List[str]] = None,
base_features: Optional[List[str]] = None,
base_functions: Optional[List[str]] = None,
spatial_functions: Optional[List[str]] = None,
**kwargs: Dict[str, Any],
):
"""Run gplearn for one iteration using custom spatial derivatives.
@@ -74,10 +74,14 @@ def run_gplearn_iteration(
Target spatial field to be predicted from dataset attributes
base_features : List[str]
Features from the dataset to be used as the initial set of atomic
inputs to genetic programming
inputs to genetic programming. Defaults to ['q', 'u', 'v'].
base_functions : List[str]
Names of gplearn built-in functions to explore during genetic
programming (in addition to differential operators)
programming (in addition to differential operators). Defaults to
['add', 'mul'].
spatial_functions : List[str]
Names of spatial differential operators to explore during genetic
programming. Defaults to ['ddx', 'ddy', 'laplacian', 'advected'].
**kwargs : dict
Additional arguments to pass to gplearn.genetic.SymbolicRegressor

@@ -88,11 +92,20 @@ def run_gplearn_iteration(
on functions of the dataset's ``base_features``.

"""
base_feats = ["q", "u", "v"] if base_feats is None else base_feats
base_funcs = ["add", "mul"] if base_funcs is None else base_funcs
base_features = ["q", "u", "v"] if base_features is None else base_features
base_functions = ["add", "mul"] if base_functions is None else base_functions
spatial_functions = (
["ddx", "ddy", "laplacian", "advected"]
if spatial_functions is None
else spatial_functions
)
spatial_functions = make_custom_gplearn_functions(data_set, spatial_functions)
function_set = base_functions + spatial_functions # type: ignore

# Flatten the input and target data
inputs = np.array([data_set[feature].data.reshape(-1) for feature in base_feats]).T
inputs = np.array(
[data_set[feature].data.reshape(-1) for feature in base_features]
).T
targets = target.reshape(-1)

gplearn_kwargs = {
@@ -113,32 +126,26 @@ def run_gplearn_iteration(

# Configure gplearn to run with a relatively small population
# and for relatively few generations (again for performance)
# TODO: Can we rename ``sr`` to ``symbolic_regression``?
sr = gplearn.genetic.SymbolicRegressor(
feature_names=base_feats,
function_set=(
base_funcs + make_custom_gplearn_functions(data_set) # use our custom ops
),
regressor = gplearn.genetic.SymbolicRegressor(
feature_names=base_features,
function_set=function_set,
**gplearn_kwargs,
)

# Fit the model
sr.fit(inputs, targets)
regressor.fit(inputs, targets)

# Return the result
return sr
return regressor
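
# A hedged sketch of one iteration on a single layer. It assumes the dataset
# from the earlier example plus a precomputed forcing field; the variable name
# ``q_subgrid_forcing`` is an assumption, not something this module defines.
#
#     layer = data_set.isel(lev=0)
#     regressor = run_gplearn_iteration(
#         layer,
#         target=layer["q_subgrid_forcing"].data,
#         base_features=["q", "u", "v"],
#         base_functions=["add", "mul"],
#         spatial_functions=["ddx", "ddy", "laplacian", "advected"],
#     )
#     # The fitted SymbolicRegressor exposes the best program it found.
#     print(regressor._program)  # pylint: disable=protected-access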


class LinearSymbolicRegression(Parameterization):
"""Linear symbolic regress parameterization.

Parameters
----------
lr1 : sklearn.linear_model.LinearRegression
Linear model to predict the upper layer's target based on input
expressions
lr2 : sklearn.linear_model.LinearRegression
Linear model to predict the lower layer's target based on input
models : List[sklearn.linear_model.LinearRegression]
Linear models to predict each layer's target based on input
expressions
inputs : List[str]
List of string input expressions and functions that can be
@@ -151,16 +158,15 @@ class LinearSymbolicRegression(Parameterization):

def __init__(
self,
lr1: LinearRegression,
lr2: LinearRegression,
models: List[LinearRegression],
inputs: List[str],
target: str,
):
"""Build ``LinearSymbolicRegression``."""
self.lr1 = lr1
self.lr2 = lr2
self.models = models
self.inputs = inputs
self.target = target
super().__init__()

@property
def targets(self) -> List[str]:
@@ -174,21 +180,7 @@ def targets(self) -> List[str]:
"""
return [self.target]

@property
def models(self) -> List[LinearRegression]:
"""Return the models.

Returns
-------
List[LinearRegression]
The models.

"""
return [self.lr1, self.lr2]

# TODO: Are we okay with the number of arguments differing from the method
# in the base class?
def predict(self, m: Union[pyqg.QGModel, xr.Dataset]):
def predict(self, model_or_dataset: ModelLike) -> Dict[str, ArrayLike]:
"""Make a prediction for a given model or dataset.

Parameters
@@ -205,28 +197,28 @@ def predict(self, m: Union[pyqg.QGModel, xr.Dataset]):
model or dataset).

"""
extract = FeatureExtractor(m)
extract = FeatureExtractor(model_or_dataset)

x = extract(self.inputs)
inputs = extract(self.inputs)

preds = []

# Do some slightly annoying reshaping to properly apply LR coefficients
# to data that may or may not have extra batch dimensions
for idx, lr_model in enumerate(self.models):
data_indices = [slice(None) for _ in x.shape]
data_indices[-3] = idx
x_z = x[tuple(data_indices)]
coef_indices = [np.newaxis for _ in x_z.shape]
coef_indices[0] = slice(None)
c_z = lr_model.coef_[tuple(coef_indices)]
pred_z = (x_z * c_z).sum(axis=0)
preds.append(pred_z)
data_indices = [slice(None) for _ in inputs.shape]
data_indices[-3] = idx # type: ignore
layer_inputs = inputs[tuple(data_indices)]
coef_indices = [np.newaxis for _ in layer_inputs.shape]
coef_indices[0] = slice(None) # type: ignore
layer_coefs = lr_model.coef_[tuple(coef_indices)]
layer_preds = (layer_inputs * layer_coefs).sum(axis=0)
preds.append(layer_preds)

preds = np.stack(preds, axis=-3)
res = {}
res[self.target] = preds
return res
return res # type: ignore
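
# The index gymnastics above amount to selecting one vertical layer and then
# broadcasting each expression's coefficient over the remaining spatial
# dimensions. A toy numpy sketch (shapes are illustrative):
#
#     import numpy as np
#
#     inputs = np.random.rand(3, 2, 8, 8)            # (expression, lev, y, x)
#     coefs = np.array([0.5, -1.0, 2.0])             # one coefficient per expression
#
#     layer_inputs = inputs[:, 0]                    # upper layer: (expression, y, x)
#     layer_coefs = coefs[:, np.newaxis, np.newaxis]  # broadcastable to layer_inputs
#     layer_preds = (layer_inputs * layer_coefs).sum(axis=0)   # (y, x) prediction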

@classmethod
def fit(
@@ -255,26 +247,36 @@ def fit(
Resulting linear regression parameterization.

"""
lrs = []
for z in [0, 1]:
extract = FeatureExtractor(data_set.isel(lev=z))
lrs.append(
LinearRegression(fit_intercept=False).fit(
extract(inputs, flat=True), extract(target, flat=True)
)
extractors = [FeatureExtractor(layer) for layer in _each_layer(data_set)]

models = [
LinearRegression(fit_intercept=False).fit(
extract(inputs, flat=True), extract(target, flat=True)
)
# TODO: Pylint says this is bad. Can we unpack lrs before passing it?
return cls(*lrs, inputs, target)
for extract in extractors
]

return cls(models, inputs, target)
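
# Putting the pieces together, a sketch of the fit/predict round trip. The
# expression strings and the target name are illustrative, not prescribed by
# this module.
#
#     param = LinearSymbolicRegression.fit(
#         data_set,
#         inputs=["laplacian(q)", "advected(q)"],
#         target="q_subgrid_forcing",
#     )
#     predictions = param.predict(data_set)["q_subgrid_forcing"]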


def corr(spatial_data_a: xr.DataArray, spatial_data_b: xr.DataArray) -> float:
def _each_layer(
data: Union[xr.Dataset, xr.DataArray]
) -> List[Union[xr.Dataset, xr.DataArray]]:
"""Return a list representing the `data` broken out by vertical layer."""
if "lev" in data.dims:
return [data.isel(lev=z) for z in range(len(data.lev))]

return [data]
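
# Quick illustration: layered data yields one slice per layer, while data
# without a ``lev`` dimension comes back unchanged in a single-element list.
#
#     upper, lower = _each_layer(data_set)               # assumes len(data_set.lev) == 2
#     (flat_only,) = _each_layer(data_set.isel(lev=0))   # no `lev` dim after isel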


def _corr(spatial_data_a: ArrayLike, spatial_data_b: ArrayLike) -> float:
"""Return the Pearson correlation between two spatial data arrays.

Parameters
----------
a : xarray.DataArray
a : Union[xarray.DataArray, numpy.ndarray]
First spatial data array
b : xarray.DataArray
b : Union[xarray.DataArray, numpy.ndarray]
Second spatial data array

Returns
@@ -284,8 +286,8 @@ def corr(spatial_data_a: xr.DataArray, spatial_data_b: xr.DataArray) -> float:

"""
return pearsonr(
np.array(spatial_data_a.data).ravel(),
np.array(spatial_data_b.data).ravel(),
ensure_numpy(spatial_data_a).ravel(),
ensure_numpy(spatial_data_b).ravel(),
)[0]


@@ -298,15 +300,14 @@ def hybrid_symbolic_regression( # pylint: disable=too-many-locals
) -> Tuple[List[str], List[LinearRegression]]:
"""Run hybrid symbolic and linear regression.

Uses symbolic regression to find expressions correlated wit the output,
Uses symbolic regression to find expressions correlated with the output,
then fitting linear regression to get an exact expression, then running
symbolic regression again on the resulting residuals (repeating until
``max_iters``).


Parameters
----------
ds : xarray.Dataset
data_set : xarray.Dataset
Dataset of pyqg.QGModel runs.
target : str
String representing target variable to predict. Defaults to subgrid
@@ -328,23 +329,27 @@
"""
extract = FeatureExtractor(data_set)
residual = data_set[target]
terms, vals, lrs = [], [], []
terms, vals, hybrid_regressors = [], [], []

try:
for i in range(max_iters):
for lev in [0, 1]:
# TODO: Can we rename ``sr`` to ``symbolic_regression``?
sr = run_gplearn_iteration(
data_set.isel(lev=lev), target=residual.isel(lev=lev).data, **kw
for data_set_layer, residual_layer in zip(
_each_layer(data_set), _each_layer(residual) # type: ignore
):
symbolic_regressor = run_gplearn_iteration(
data_set_layer, target=residual_layer.data, **kw # type: ignore
)
new_term = str(
symbolic_regressor._program # pylint: disable=protected-access
)
new_term = str(sr._program)
new_vals = extract(new_term)
# Prevent spurious duplicates, e.g. ddx(q) and ddx(add(1,q))
if not any(corr(new_vals, v) > 0.99 for v in vals):
if not any(_corr(new_vals, v) > 0.99 for v in vals):
terms.append(new_term)
vals.append(new_vals)
lrs.append(LinearSymbolicRegression.fit(data_set, terms, target))
preds = lrs[-1].test_offline(data_set).load()
hybrid_regressor = LinearSymbolicRegression.fit(data_set, terms, target)
hybrid_regressors.append(hybrid_regressor)
preds = hybrid_regressor.test_offline(data_set).load()
residual = (data_set[target] - preds[f"{target}_predictions"]).load()
if verbose:
print(f"Iteration {i+1}")
@@ -355,4 +360,4 @@
if verbose:
print("Exiting early due to keyboard interrupt")

return terms, lrs
return terms, hybrid_regressors
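
# Finally, an end-to-end sketch of the driver. The target name and iteration
# count are illustrative; any extra keyword arguments flow through to gplearn
# via ``**kw``.
#
#     terms, regressors = hybrid_symbolic_regression(
#         data_set,
#         target="q_subgrid_forcing",
#         max_iters=3,
#         verbose=True,
#     )
#     print(terms)               # discovered expressions, e.g. ['ddx(q)', ...]
#     best = regressors[-1]      # parameterization fit on all terms found so far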