diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a3ab40..a79aed6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,11 +6,15 @@ - Parameters in ParamSpace can also be indexed by name - Parameters now have search_space property, to modify the optimizer search space from the full space - Continuous parameters have search_min/search_max; Discete parameteres have search_categories +- Constraints are now defined by Constraint class +- Input constraints can now be included in ParamSpace, and serialized from there +- Output constraints can now be included in Campaign, and serialized from there +- New interface class IParamSpace to address circular import issues between ParamSpace and Constraint ### Modified - Optimizer and Campaign X_space attributes are now assigned using setter -### Remvoed +### Removed - Torch device references and options (GPU compatibility may be re-added) ## [0.8.4] diff --git a/obsidian/campaign/campaign.py b/obsidian/campaign/campaign.py index 8ed10c6..90d324c 100644 --- a/obsidian/campaign/campaign.py +++ b/obsidian/campaign/campaign.py @@ -4,7 +4,9 @@ from obsidian.optimizer import Optimizer, BayesianOptimizer from obsidian.experiment import ExpDesigner from obsidian.objectives import Objective, Objective_Sequence, obj_class_dict +from obsidian.constraints import Output_Constraint, const_class_dict from obsidian.exceptions import IncompatibleObjectiveError +from obsidian.utils import tensordict_to_dict import obsidian import pandas as pd @@ -42,6 +44,7 @@ class Campaign(): def __init__(self, X_space: ParamSpace, target: Target | list[Target], + constraints: Output_Constraint | list[Output_Constraint] | None = None, optimizer: Optimizer | None = None, designer: ExpDesigner | None = None, objective: Objective | None = None, @@ -59,6 +62,9 @@ def __init__(self, self.set_target(target) self.set_objective(objective) + self.output_constraints = None + self.constrain_outputs(constraints) + # Non-object attributes self.iter = 0 self.seed = seed @@ -269,59 +275,6 @@ def X(self) -> pd.DataFrame: """ return self.data[list(self.X_space.X_names)] - def save_state(self) -> dict: - """ - Saves the state of the Campaign object as a dictionary. - - Returns: - dict: A dictionary containing the saved state of the Campaign object. - """ - obj_dict = {} - obj_dict['X_space'] = self.X_space.save_state() - obj_dict['optimizer'] = self.optimizer.save_state() - obj_dict['data'] = self.data.to_dict() - obj_dict['target'] = [t.save_state() for t in self.target] - if self.objective: - obj_dict['objective'] = self.objective.save_state() - obj_dict['seed'] = self.seed - - return obj_dict - - @classmethod - def load_state(cls, - obj_dict: dict): - """ - Loads the state of the campaign from a dictionary. - - Args: - cls (Campaign): The class object. - obj_dict (dict): A dictionary containing the campaign state. - - Returns: - Campaign: A new campaign object with the loaded state. 
- """ - - if 'objective' in obj_dict: - if obj_dict['objective']['name'] == 'Objective_Sequence': - new_objective = Objective_Sequence.load_state(obj_dict['objective']) - else: - obj_class = obj_class_dict[obj_dict['objective']['name']] - new_objective = obj_class.load_state(obj_dict['objective']) - else: - new_objective = None - - new_campaign = cls(X_space=ParamSpace.load_state(obj_dict['X_space']), - target=[Target.load_state(t_dict) for t_dict in obj_dict['target']], - optimizer=BayesianOptimizer.load_state(obj_dict['optimizer']), - objective=new_objective, - seed=obj_dict['seed']) - new_campaign.data = pd.DataFrame(obj_dict['data']) - new_campaign.data.index = new_campaign.data.index.astype('int') - - new_campaign.iter = new_campaign.data['Iteration'].astype('int').max() - - return new_campaign - def __repr__(self): """String representation of object""" return f"obsidian Campaign for {getattr(self,'y_names', None)}; {getattr(self,'m_exp', 0)} observations" @@ -353,7 +306,9 @@ def suggest(self, **optim_kwargs): try: # In case X_space has changed, re-set the optimizer X_space self.optimizer.set_X_space(self.X_space) - X, eval = self.optimizer.suggest(objective=self.objective, **optim_kwargs) + X, eval = self.optimizer.suggest(objective=self.objective, + out_constraints=self.output_constraints, + **optim_kwargs) return (X, eval) except Exception: warnings.warn('Optimization failed') @@ -430,3 +385,82 @@ def _analyze(self): ) return + + def constrain_outputs(self, + constraints: Output_Constraint | list[Output_Constraint] | None) -> None: + """ + Sets optional output constraints for the campaign. + """ + if constraints is not None: + if isinstance(constraints, Output_Constraint): + constraints = [constraints] + self.output_constraints = constraints + + return + + def clear_output_constraints(self): + """Clears output constraints""" + self.output_constraints = None + + def save_state(self) -> dict: + """ + Saves the state of the Campaign object as a dictionary. + + Returns: + dict: A dictionary containing the saved state of the Campaign object. + """ + obj_dict = {} + obj_dict['X_space'] = self.X_space.save_state() + obj_dict['optimizer'] = self.optimizer.save_state() + obj_dict['data'] = self.data.to_dict() + obj_dict['target'] = [t.save_state() for t in self.target] + if self.objective: + obj_dict['objective'] = self.objective.save_state() + obj_dict['seed'] = self.seed + + if getattr(self, 'output_constraints', None): + obj_dict['output_constraints'] = [{'class': const.__class__.__name__, + 'state': tensordict_to_dict(const.state_dict())} + for const in self.output_constraints] + + return obj_dict + + @classmethod + def load_state(cls, + obj_dict: dict): + """ + Loads the state of the campaign from a dictionary. + + Args: + cls (Campaign): The class object. + obj_dict (dict): A dictionary containing the campaign state. + + Returns: + Campaign: A new campaign object with the loaded state. 
+ """ + + if 'objective' in obj_dict: + if obj_dict['objective']['name'] == 'Objective_Sequence': + new_objective = Objective_Sequence.load_state(obj_dict['objective']) + else: + obj_class = obj_class_dict[obj_dict['objective']['name']] + new_objective = obj_class.load_state(obj_dict['objective']) + else: + new_objective = None + + new_campaign = cls(X_space=ParamSpace.load_state(obj_dict['X_space']), + target=[Target.load_state(t_dict) for t_dict in obj_dict['target']], + optimizer=BayesianOptimizer.load_state(obj_dict['optimizer']), + objective=new_objective, + seed=obj_dict['seed']) + new_campaign.data = pd.DataFrame(obj_dict['data']) + new_campaign.data.index = new_campaign.data.index.astype('int') + + new_campaign.iter = new_campaign.data['Iteration'].astype('int').max() + + if 'output_constraints' in obj_dict: + for const_dict in obj_dict['output_constraints']: + const = const_class_dict[const_dict['class']](new_campaign.target, **const_dict['state']) + new_campaign.constrain_outputs(const) + + return new_campaign diff --git a/obsidian/constraints/__init__.py b/obsidian/constraints/__init__.py index 8c8f863..c9211ea 100644 --- a/obsidian/constraints/__init__.py +++ b/obsidian/constraints/__init__.py @@ -1,4 +1,6 @@ """Constraints: Restrict the recommended space during optimization""" +from .base import * from .input import * from .output import * +from .config import * diff --git a/obsidian/constraints/base.py b/obsidian/constraints/base.py new file mode 100644 index 0000000..0c18f4c --- /dev/null +++ b/obsidian/constraints/base.py @@ -0,0 +1,20 @@ +"""Base class for obsidian constraints""" + + +from abc import abstractmethod, ABC +from torch.nn import Module + + +class Constraint(ABC, Module): + """ + Base class for constraints, which restrict the input or output space + of a model or optimization problem + """ + + def __init__(self) -> None: + super().__init__() + return + + @abstractmethod + def forward(self): + pass # pragma: no cover diff --git a/obsidian/constraints/config.py b/obsidian/constraints/config.py new file mode 100644 index 0000000..50be2d3 --- /dev/null +++ b/obsidian/constraints/config.py @@ -0,0 +1,17 @@ +"""Method pointers and config for constraints""" + +from .input import ( + Linear_Constraint, + BatchVariance_Constraint +) + +from .output import ( + Blank_Constraint, + L1_Constraint +) + +const_class_dict = {'Linear_Constraint': Linear_Constraint, + 'BatchVariance_Constraint': BatchVariance_Constraint, + 'Blank_Constraint': Blank_Constraint, + 'L1_Constraint': L1_Constraint + } diff --git a/obsidian/constraints/input.py b/obsidian/constraints/input.py index 3c36557..1b07235 100644 --- a/obsidian/constraints/input.py +++ b/obsidian/constraints/input.py @@ -1,135 +1,180 @@ """Constraints on the input features of a parameter space""" -from obsidian.parameters import ParamSpace, Param_Continuous +from obsidian.parameters import IParamSpace, Param_Continuous +from obsidian.constraints import Constraint from obsidian.config import TORCH_DTYPE import torch from torch import Tensor +from abc import abstractmethod # See https://botorch.org/api/optim.html, optimize_acqf "equality_constraints", "inequality_constraints", # and "nonlinear_inequality_constraints" -# Equality = list(tuple) / tuple = {indices = Tensor, coefficients = Tensor, rhs = float} / sum_i(X[indices[i]] * coefficients[i]) = rhs -# Inquality = list(tuple) / tuple = (indices = Tensor, coefficients = Tensor, rhs = float) / sum_i(X[indices[i]] * coefficients[i]) >= rhs -# Use 1-d tensor of indices for intra 
point constraint -# Use 2-d tensor of indices for inter point constraint where indices[i] = (k_i, l_i, ...) +class Input_Constraint(Constraint): + """ + Input constraint for a given parameter space. -# Nonlinear inequality = list(tuple) / tuple = (constraint(x) = callable, intra-point = bool) -# Intra piont: callable takes X in (d) and return scalar -# Intra point: when q = 1, or when applying the same constraint to each candidate in the batch -# Inter point: callable takes X in (q x d) and returns scalar -# Inter point: When q > 1 and the constraints are applied to an entire batch + Note: Saving and loading input constraints is managed by ParamSpace. + The interface class IParamSpace is used here to avoid circular imports + with constraints that depend on ParamSpace, which saves/loads constraints. + """ + def __init__(self, + X_space: IParamSpace): + super().__init__() + self.X_space = X_space -def InConstraint_Generic(X_space: ParamSpace, - indices: list[float | int] = [0], - coeff: list[float | int | list] = [0], - rhs: int | float = -1) -> tuple[Tensor, Tensor, Tensor]: +class Linear_Constraint(Input_Constraint): """ - Creates an input constraint for a given parameter space. + Input constraint for a given parameter space. + + Note: SUM(LHS) <= RHS equivalent to -SUM(LHS) >= -RHS - Args: + Linear constraints must return: + tuple = (indices = Tensor, coefficients = Tensor, rhs = float) + where sum_i(X[indices[i]] * coefficients[i]) = rhs (equality) or >= rhs (inequality) + + Attributes: X_space (ParamSpace): The parameter space object. - indices (list[float | int], optional): The indices of the parameters to be constrained. Defaults to ``[0]``. - coeff (list[float | int | list], optional): The coefficients for the parameters in the constraint equation. Defaults to ``[0]``. - rhs (int | float, optional): The right-hand side value of the constraint equation. Defaults to ``-1``. - - Returns: - tuple[Tensor, Tensor, Tensor]: A tuple containing the indices, coefficients, and right-hand side value of the constraint. - - Raises: - TypeError: If X_space is not an obsidian ParamSpace - TypeError: If indices or coeff are not a list - TypeError: If rhs is not numeric - TypeError: If indices are not continuous parameters - - Notes: - - The constraint equation is of the form: coeff_X1 * X1 + coeff_X2 * X2 + ... = rhs - - The indices and coefficients should correspond to continuous parameters in the parameter space. - - The constraint equation is transformed to the encoded space before returning. - - Example: - X_space = ParamSpace(...) - indices = [0, 1] - coeff = [1, -2] - rhs = 10 - constraint = InConstraint_Generic(X_space, indices, coeff, rhs) - """ - - if not isinstance(X_space, ParamSpace): - raise TypeError('X_space must be a ParamSpace object') - if not isinstance(indices, list): - raise TypeError('Indices must be a list') - if not isinstance(coeff, list): - raise TypeError('Coefficients must be a list') - if not isinstance(rhs, (int, float)): - raise TypeError('RHS must be a scalar') - - # Determine which parameters in the decoded space are indicated - i_t = [X_space.tinv_map[i] for i in indices] - - for i in i_t: - if not isinstance(X_space.params[i], Param_Continuous): - raise TypeError('Indeces for input constraints must be \ - for continuous parameters only') + ind (list[float | int], optional): The indices of the parameters to be constrained. + Defaults to ``[0]``. + weights (list[float | int | list], optional): The coefficients for the parameters in the + constraint equation. 
Defaults to ``[1]``. + rhs (int | float, optional): The right-hand side value of the constraint equation. + Defaults to ``-1``. + equality (bool, optional): Whether the constraint is an equality (=) or inequality (>=) + constraint. Defaults to ``False`` for inequality constraint. - # We neeed to write the constraints on the encoded space - # SUM(LHS) <= RHS equivalent to -SUM(LHS) >= -RHS + """ + def __init__(self, + X_space: IParamSpace, + ind: list[float | int] = [0], + weights: list[float | int] = [1], + rhs: int | float = -1, + equality: bool = False) -> None: + super().__init__(X_space) + + self.register_buffer('ind', torch.tensor(ind, dtype=torch.int64)) + self.register_buffer('weights', torch.tensor(weights, dtype=TORCH_DTYPE)) + self.register_buffer('rhs', torch.tensor(rhs, dtype=TORCH_DTYPE)) + self.register_buffer('equality', torch.tensor(equality, dtype=torch.bool)) + + # Determine which parameters in the decoded space are indicated + self.ind_t = torch.tensor([X_space.t_map[i] for i in ind], + dtype=torch.int64) + for i in ind: + if not isinstance(X_space.params[i], Param_Continuous): + raise TypeError('Indeces for input constraints must be \ + for continuous parameters only') + + # We neeed to write the constraints on the encoded space + # To convert (raw -> encoded) for a continuous param with min-max scaling: + # w_t = w * range + # rhs_t = rhs - sum(w_t * min) + weights_t = [] + rhs_t = rhs + for w, i in zip(weights, ind): + param_i = X_space.params[i] + rhs_t -= param_i.min*w + weights_t.append(w * param_i.range) + self.weights_t = torch.tensor(weights_t, dtype=TORCH_DTYPE) + self.rhs_t = torch.tensor(rhs_t, dtype=TORCH_DTYPE) + + def forward(self) -> tuple[Tensor, Tensor, Tensor]: + """ + Forward method for the input constraint. + + Forms the linear constraint in the tuple that can be handled by BoTorch optimizer. + + Returns: + tuple[Tensor, Tensor, Tensor]: A tuple containing the indices, coefficients, + and right-hand side value of the constraint in the encoded space. + + """ + return self.ind_t, self.weights_t, self.rhs_t + + def __repr__(self) -> str: + """String representation of object""" + return (f'{self.__class__.__name__}(ind={self.ind},' + f'weights={self.weights}, rhs={self.rhs}, equality={self.equality})') + + +class Nonlinear_Constraint(Input_Constraint): + """ + Abstract class for nonlinear constraints + + Nonlinear inequality constraints must return: + tuple = (constraint(x) = callable, intra-point = bool) + + where: + Intra point: callable takes X in (d) and return scalar + Intra point: when q = 1, or when applying the same constraint to each + candidate in the batch + Inter point: callable takes X in (q x d) and returns scalar + Inter point: When q > 1 and the constraints are applied to an entire + batch of candidates + + Use 1-d tensor of indices for intra point constraint + Use 2-d tensor of indices for inter point constraint where indices[i] = (k_i, l_i, ...) - c_t = [] - rhs_t = rhs - for c, i in zip(coeff, i_t): - param_i = X_space.params[i] - rhs_t -= param_i.min*c - c_t.append(c * param_i.range) + """ + @abstractmethod + def forward(self) -> tuple[callable, bool]: + """ + Forward method for the input constraint. - linear_constraint = (torch.tensor(indices), - torch.tensor([float(c) for c in c_t], dtype=TORCH_DTYPE), - torch.tensor(rhs_t, dtype=TORCH_DTYPE)) + Returns: + tuple[callable, bool]: A tuple containing the callable function and a boolean + indicating whether the constraint is an intra-point constraint. 
- return linear_constraint + """ + pass # pragma: no cover -def InConstraint_ConstantDim(X_space: ParamSpace, - dim: int, - tol: int | float = 0.01) -> tuple[callable, bool]: +class BatchVariance_Constraint(Nonlinear_Constraint): """ - Constraint which maintains one parameter at a relatively constant (but still freely optimized) - value within a set of suggestions. Useful for simplifying operational complexity of large- - scale experimental optimization (e.g. temperature on a well plate) - - Args: + Constraint how much one parameter can vary over a batch of optimized points + + Attributes: X_space (ParamSpace): The parameter space object. - dim (int): The dimension of the constant parameter. - tol (int | float, optional): The tolerance value. Defaults to ``0.01``. + indices (list[float | int], optional): The indices of the parameters to be constrained. + Defaults to ``[0]``. + coeff (list[float | int | list], optional): The coefficients for the parameters in the + constraint equation. Defaults to ``[1]``. + rhs (int | float, optional): The right-hand side value of the constraint equation. + Defaults to ``-1``. - Returns: - tuple[callable, bool]: A tuple containing the constraint function and a boolean indicating if it is an inter-point constraint. - - Raises: - TypeError: If X_space is not an obsidian ParamSpace - TypeError: If dim is not an integer - TypeError: If tol is not numeric - TypeError: If the dimension is not a continuous parameter - """ + def __init__(self, + X_space: IParamSpace, + ind: int, + tol: int | float = 0.01) -> None: + + super().__init__(X_space) - if not isinstance(X_space, ParamSpace): - raise TypeError('X_space must be a ParamSpace object') - if not isinstance(dim, int): - raise TypeError('Dimension must be an integer') - if not isinstance(tol, (int, float)): - raise TypeError('Tolerance must be a scalar') - if not isinstance(X_space.params[dim], Param_Continuous): - raise TypeError('Constant dimension must be a continuous parameter') + self.register_buffer('ind', torch.tensor(ind, dtype=torch.int64)) + self.register_buffer('tol', torch.tensor(tol, dtype=TORCH_DTYPE)) - t_dim = X_space.t_map[dim] + # Determine which parameters in the decoded space are indicated + self.ind_t = torch.tensor(X_space.t_map[ind], dtype=torch.int64) + if not isinstance(X_space.params[ind], Param_Continuous): + raise TypeError('Indeces for input constraints must be \ + for continuous parameters only') - def nl_func(X: Tensor) -> Tensor: - X_range = X.max(dim=0).values - X.min(dim=0).values - target_range = X_range[t_dim] - return tol - target_range + def forward(self): - nonlinear_constraint = (nl_func, False) # False for inter-point - return nonlinear_constraint + def nl_func(X: Tensor) -> Tensor: + X_range = X.max(dim=0).values - X.min(dim=0).values + target_range = X_range[self.ind_t] + return self.tol - target_range + + nonlinear_constraint = (nl_func, False) # False for inter-point + + return nonlinear_constraint + + def __repr__(self) -> str: + """String representation of object""" + return (f'{self.__class__.__name__}(ind={self.ind},' + f'tol={self.tol})') diff --git a/obsidian/constraints/output.py b/obsidian/constraints/output.py index dfc624c..a28f05c 100644 --- a/obsidian/constraints/output.py +++ b/obsidian/constraints/output.py @@ -1,49 +1,85 @@ """Constraints on the output responses of a model""" +from .base import Constraint + from obsidian.parameters import Target from obsidian.utils import unscale_samples +from obsidian.config import TORCH_DTYPE import torch from torch 
import Tensor from typing import Callable -# Negative values imply feasibility! -# Note that these are OUTPUT constraints - -def OutConstraint_Blank(target: Target | list[Target]) -> Callable: +class Output_Constraint(Constraint): """ - Dummy constraint function that proposes all samples as feasible. + Output constraint for a given set of targets. - Args: - target (Target or list[Target]): The target or list of targets. - - Returns: - callable: callable constraint function + Must return a callable function that computes feasibility, where + negative values imply feasible space. + Note: Saving and loading input constraints is managed by Campaign """ - def constraint(samples: Tensor) -> Tensor: - samples = unscale_samples(samples, target) - feasibility = -1*torch.ones(size=samples.shape).max(dim=-1).values - return feasibility - return constraint + def __init__(self, + target: Target | list[Target]): + super().__init__() + self.target = self._validate_target(target) + def _validate_target(self, target: Target | list[Target]): -def OutConstraint_L1(target: Target | list[Target], - offset: int | float = 1) -> Callable: - """ - Calculates the L1 (absolute-value penalized) constraint + if not isinstance(target, (Target, list)): + raise TypeError('Target must be a Target object or a list of Target objects') + if isinstance(target, list): + for t in target: + if not isinstance(t, Target): + raise TypeError('Target must be a Target object or a list of Target objects') + if isinstance(target, Target): + target = [target] + + return target - Args: - target (Target | list[Target]): The target value or a list of target values. - offset (int | float, optional): The offset value for the constraint. Defaults to 1. - Returns: - callable: callable constraint function +class Blank_Constraint(Output_Constraint): + """ + Dummy constraint function that proposes all samples as feasible. 
+ """ + def __init__(self, + target: Target | list[Target]): + super().__init__(target) + def forward(self, + scale: bool = True) -> Callable: + def constraint(samples: Tensor) -> Tensor: + if scale: + samples = unscale_samples(samples, self.target) + feasibility = -1*torch.ones(size=samples.shape).max(dim=-1).values + return feasibility + return constraint + + def __repr__(self): + """String representation of object""" + return f'{self.__class__.__name__}' + + +class L1_Constraint(Output_Constraint): """ - def constraint(samples: Tensor) -> Tensor: - samples = unscale_samples(samples, target) - feasibility = (samples.sum(dim=-1) - offset) - return feasibility - return constraint + Calculates the L1 (absolute-value penalized) constraint + """ + def __init__(self, + target: Target | list[Target], + offset: int | float = 1): + super().__init__(target) + self.register_buffer('offset', torch.tensor(offset, dtype=TORCH_DTYPE)) + + def forward(self, + scale: bool = True) -> Callable: + def constraint(samples: Tensor) -> Tensor: + if scale: + samples = unscale_samples(samples, self.target) + feasibility = (samples.sum(dim=-1) - self.offset) + return feasibility + return constraint + + def __repr__(self): + """String representation of object""" + return f'{self.__class__.__name__}(offset={self.offset})' diff --git a/obsidian/optimizer/bayesian.py b/obsidian/optimizer/bayesian.py index e835177..f43c22e 100644 --- a/obsidian/optimizer/bayesian.py +++ b/obsidian/optimizer/bayesian.py @@ -7,6 +7,7 @@ from obsidian.acquisition import aq_class_dict, aq_defaults, aq_hp_defaults, valid_aqs from obsidian.surrogates import model_class_dict from obsidian.objectives import Index_Objective, Objective_Sequence +from obsidian.constraints import Linear_Constraint, Nonlinear_Constraint, Output_Constraint from obsidian.exceptions import IncompatibleObjectiveError, UnsupportedError, UnfitError, DataWarning from obsidian.config import TORCH_DTYPE @@ -25,7 +26,6 @@ from torch import Tensor import pandas as pd import numpy as np -from typing import Callable import warnings @@ -580,10 +580,10 @@ def suggest(self, optim_samples: int = 512, optim_restarts: int = 10, objective: MCAcquisitionObjective | None = None, - out_constraints: list[Callable] | None = None, - eq_constraints: tuple[Tensor, Tensor, float] | None = None, - ineq_constraints: tuple[Tensor, Tensor, float] | None = None, - nleq_constraints: tuple[Callable, bool] | None = None, + out_constraints: Output_Constraint | list[Output_Constraint] | None = None, + eq_constraints: Linear_Constraint | list[Linear_Constraint] | None = None, + ineq_constraints: Linear_Constraint | list[Linear_Constraint] | None = None, + nleq_constraints: Nonlinear_Constraint | list[Nonlinear_Constraint] | None = None, task_index: int = 0, fixed_var: dict[str: float | str] | None = None, X_pending: pd.DataFrame | None = None, @@ -632,15 +632,14 @@ def suggest(self, of the acquisition function. The default value is ``10``. objective (MCAcquisitionObjective, optional): The objective function to be used for optimization. The default is ``None``. - out_constraints (list of Callable, optional): A list of constraints to be applied to the output space. - The default is ``None``. - eq_constraints (tuple of Tensor, Tensor, float, optional): A tuple of tensors representing the equality - constraints, the target values, and the tolerance. The default is ``None``. 
- ineq_constraints (tuple of Tensor, Tensor, float, optional): A tuple of tensors representing the inequality - constraints, the target values, and the tolerance. The default is ``None``. - nleq_constraints (tuple of Callable, bool, optional): A tuple of functions representing the nonlinear - inequality constraints and a boolean indicating whether the constraints are active. - The default is ``None``. + out_constraints (Output_Constraint | list[Output_Constraint], optional): An output constraint, or a list + thereof, restricting the search space by outcomes. The default is ``None``. + eq_constraints (Linear_Constraint | list[Linear_Constraint], optional): A linear constraint, or a list + thereof, restricting the search space by equality (=). The default is ``None``. + ineq_constraints (Linear_Constraint | list[Linear_Constraint], optional): A linear constraint, or a list + thereof, restricting the search space by inequality (>=). The default is ``None``. + nleq_constraints (Nonlinear_Constraint | list[Nonlinear_Constraint], optional): A nonlinear constraint, + or a list thereof, restricting the search space by nonlinear feasibility. The default is ``None``. task_index (int, optional): The index of the task to optimize for multi-task models. The default is ``0``. fixed_var (dict(str:float), optional): Name of a variable and setting, over which the suggestion should be fixed. Default values is ``None`` @@ -661,7 +660,6 @@ def suggest(self, IncorrectObjectiveError: If the objective does not successfully execute on a sample. TypeError: If the acquisition is not a list of strings or dictionaries. UnsupportedError: If the provided acquisition function does not support output constraints. - UnsupportedError: If nonlinear constraints are provided with discrete features. 
""" @@ -760,29 +758,53 @@ def suggest(self, aq_kwargs = {'model': model, 'sampler': sampler, 'X_pending': X_t_pending} aq_kwargs.update(self._parse_aq_kwargs(aq_str, aq_hps, m_batch, target_locs, X_t_pending, objective)) - - # Type check for constraints - for constraint_type in eq_constraints, ineq_constraints, nleq_constraints, out_constraints: - if constraint_type: - if not isinstance(constraint_type, list): - raise TypeError('Constraints must be passed as lists of callables') - + + # Raise errors related to certain constraints if aq_str in ['UCB', 'Mean', 'TS', 'SF', 'SR', 'NIPV']: if out_constraints is not None: raise UnsupportedError('Provided aquisition function does not support output constraints') else: - aq_kwargs['constraints'] = out_constraints + if out_constraints and not isinstance(out_constraints, list): + out_constraints = [out_constraints] + aq_kwargs['constraints'] = [c.forward(scale=objective is None) + for c in out_constraints] if out_constraints else None + + # If NoneType, coerce to list + if not eq_constraints: + eq_constraints = [] + if not ineq_constraints: + ineq_constraints = [] + if not nleq_constraints: + nleq_constraints = [] + + # Coerce input constraints to lists + if not isinstance(eq_constraints, list): + eq_constraints = [eq_constraints] + if not isinstance(ineq_constraints, list): + ineq_constraints = [ineq_constraints] + if not isinstance(nleq_constraints, list): + nleq_constraints = [nleq_constraints] + + # Append X_space constraints + if getattr(self.X_space, 'linear_constraints', []): + for c in self.X_space.linear_constraints: + if c.equality: + eq_constraints.append(c) + else: + ineq_constraints.append(c) + if getattr(self.X_space, 'nonlinear_constraints', []): + nleq_constraints += self.X_space.nonlinear_constraints # Input constraints are used by optim_acqf and friends - optim_kwargs = {'equality_constraints': eq_constraints, - 'inequality_constraints': ineq_constraints, - 'nonlinear_inequality_constraints': nleq_constraints} + optim_kwargs = {'equality_constraints': [c() for c in eq_constraints] if eq_constraints else None, + 'inequality_constraints': [c() for c in ineq_constraints] if ineq_constraints else None, + 'nonlinear_inequality_constraints': [c() for c in nleq_constraints] if nleq_constraints else None} optim_options = {} # Can optionally specify batch_limit or max_iter # If nonlinear constraints are used, BoTorch doesn't provide an ic_generator # Must provide manual samples = just use random initialization - if nleq_constraints is not None: + if nleq_constraints: X_ic = torch.ones((optim_samples, 1 if fixed_features_list else m_batch, self.X_space.n_tdim))*torch.rand(1) optim_kwargs['batch_initial_conditions'] = X_ic if fixed_features_list: diff --git a/obsidian/parameters/__init__.py b/obsidian/parameters/__init__.py index 2cf3c9c..48bd78b 100644 --- a/obsidian/parameters/__init__.py +++ b/obsidian/parameters/__init__.py @@ -1,9 +1,9 @@ """Parameters: Define the classification of input features""" +from .targets import * from .base import * from .continuous import * from .discrete import * from .param_space import * -from .targets import * from .transforms import * from .utils import * diff --git a/obsidian/parameters/base.py b/obsidian/parameters/base.py index 25d1222..f6dff7c 100644 --- a/obsidian/parameters/base.py +++ b/obsidian/parameters/base.py @@ -60,3 +60,30 @@ def load_state(cls, Parameter: A new instance of the Parameter class with the loaded state. 
""" return cls(**obj_dict) + + +class IParamSpace(ABC): + """ + Interface for parameter space classes. + """ + + def __init__(self, params: list[Parameter]): + self.params = tuple(params) + + def __iter__(self): + """Iterate over the parameters in the parameter space""" + return iter(self.params) + + def __len__(self): + """Number of parameters in the parameter space""" + return len(self.params) + + def __repr__(self): + """String representation of object""" + return f"{self.__class__.__name__}(params={[p.name for p in self]})" + + def __getitem__(self, index: int | str) -> Parameter: + """Retrieve a parameter by index""" + if isinstance(index, str): + index = self.X_names.index(index) + return self.params[index] diff --git a/obsidian/parameters/config.py b/obsidian/parameters/config.py new file mode 100644 index 0000000..780268a --- /dev/null +++ b/obsidian/parameters/config.py @@ -0,0 +1,19 @@ +"""Method pointers and config for parameters""" + +from .continuous import ( + Param_Continuous, + Param_Observational +) +from .discrete import ( + Param_Categorical, + Param_Ordinal, + Param_Discrete_Numeric, + Task, +) + +param_class_dict = {'Param_Continuous': Param_Continuous, + 'Param_Categorical': Param_Categorical, + 'Param_Ordinal': Param_Ordinal, + 'Param_Discrete_Numeric': Param_Discrete_Numeric, + 'Param_Observational': Param_Observational, + 'Task': Task} diff --git a/obsidian/parameters/param_space.py b/obsidian/parameters/param_space.py index bc8a2b5..4d9bb78 100644 --- a/obsidian/parameters/param_space.py +++ b/obsidian/parameters/param_space.py @@ -1,21 +1,39 @@ """A collection of parameters jointly defining an operating or optimization space""" -from .continuous import Parameter, Param_Continuous, Param_Observational +from .base import ( + IParamSpace, + Parameter +) +from .continuous import ( + Param_Continuous, + Param_Observational +) from .discrete import ( - Param_Categorical, Param_Ordinal, - Param_Discrete, Param_Discrete_Numeric, - Task, CAT_SEP + Param_Categorical, + Param_Ordinal, + Param_Discrete, + Param_Discrete_Numeric, + Task, + CAT_SEP ) -from abc import ABC -import torch -import numpy as np -import pandas as pd +from .config import param_class_dict +from obsidian.constraints import ( + Input_Constraint, + Linear_Constraint, + Nonlinear_Constraint, + const_class_dict +) from obsidian.exceptions import UnsupportedError +from obsidian.utils import tensordict_to_dict + +import torch +import numpy as np +import pandas as pd -class ParamSpace(ABC): +class ParamSpace(IParamSpace): """ Class designed to define the parameter space in which an optimization can be conducted. 
@@ -104,26 +122,12 @@ def __init__(self, self.X_t_task_idx = next(i_t for i_t, i in self.tinv_map.items() if self[i] in self.X_task) else: self.X_t_task_idx = None - - return - - def __iter__(self): - """Iterate over the parameters in the parameter space""" - return iter(self.params) - - def __len__(self): - """Number of parameters in the parameter space""" - return len(self.params) - def __repr__(self): - """String representation of object""" - return f"{self.__class__.__name__}(params={[p.name for p in self]})" + # Set up storage for constraints + self.linear_constraints = [] + self.nonlinear_constraints = [] - def __getitem__(self, index: int | str) -> Parameter: - """Retrieve a parameter by index""" - if isinstance(index, str): - index = self.X_names.index(index) - return self.params[index] + return def map_transform(self) -> dict: """ @@ -247,10 +251,49 @@ def search_space(self) -> pd.DataFrame: return X_search_t - def open_search(self): + def open_search(self) -> None: """Set the search space to the parameter space""" for param in self: param.open_search() + + def mean(self) -> pd.DataFrame: + """ + Calculates the mean values for each parameter in the parameter space. + + Returns: + pd.DataFrame: A DataFrame containing the mean values for each parameter. + """ + row = {} + for param_i in self: + if isinstance(param_i, Param_Continuous): # Mean of continuous + row[param_i.name] = param_i.unit_demap([0.5])[0] + elif isinstance(param_i, Param_Discrete): # First of discrete + row[param_i.name] = param_i.categories[0] + + df_mean = pd.DataFrame([row]) + + return df_mean + + def constrain_inputs(self, + constraint: Input_Constraint) -> None: + """ + Constrains the input space based on the specified equality, inequality, or nonlinear constraint. + + Args: + constraint (Input_Constraint): The constraint to be applied to the input space. + """ + if isinstance(constraint, Linear_Constraint): + self.linear_constraints.append(constraint) + elif isinstance(constraint, Nonlinear_Constraint): + self.nonlinear_constraints.append(constraint) + + return + + def clear_constraints(self) -> None: + """Clears all constraints from the input space.""" + self.linear_constraints = [] + self.nonlinear_constraints = [] + return def save_state(self) -> dict: """ @@ -261,7 +304,18 @@ def save_state(self) -> dict: """ obj_dict = {} for param in self: - obj_dict[param.name] = {'state': param.save_state(), 'class': param.__class__.__name__} + obj_dict[param.name] = {'class': param.__class__.__name__, + 'state': param.save_state()} + + if getattr(self, 'linear_constraints', []): + obj_dict['linear_constraints'] = [{'class': const.__class__.__name__, + 'state': tensordict_to_dict(const.state_dict())} + for const in self.linear_constraints] + if getattr(self, 'nonlinear_constraints', []): + obj_dict['nonlinear_constraints'] = [{'class': const.__class__.__name__, + 'state': tensordict_to_dict(const.state_dict())} + for const in self.nonlinear_constraints] + return obj_dict @classmethod @@ -277,32 +331,22 @@ def load_state(cls, ParamSpace: A new ParamSpace object with the loaded state. 
""" - param_type_dict = {'Param_Continuous': Param_Continuous, - 'Param_Categorical': Param_Categorical, - 'Param_Ordinal': Param_Ordinal, - 'Param_Discrete_Numeric': Param_Discrete_Numeric, - 'Param_Observational': Param_Observational, - 'Task': Task} + params = [] for param, param_dict in obj_dict.items(): - param = param_type_dict[param_dict['class']].load_state(param_dict['state']) - params.append(param) - return cls(params=params) - - def mean(self) -> pd.DataFrame: - """ - Calculates the mean values for each parameter in the parameter space. - - Returns: - pd.DataFrame: A DataFrame containing the mean values for each parameter. - """ - row = {} - for param_i in self: - if isinstance(param_i, Param_Continuous): # Mean of continuous - row[param_i.name] = param_i.unit_demap([0.5])[0] - elif isinstance(param_i, Param_Discrete): # First of discrete - row[param_i.name] = param_i.categories[0] - - df_mean = pd.DataFrame([row]) - - return df_mean + if 'constraint' not in param: + param = param_class_dict[param_dict['class']].load_state(param_dict['state']) + params.append(param) + + new_X_space = cls(params=params) + + if 'linear_constraints' in obj_dict: + for const_dict in obj_dict['linear_constraints']: + const = const_class_dict[const_dict['class']](new_X_space, **const_dict['state']) + new_X_space.constrain_inputs(const) + if 'nonlinear_constraints' in obj_dict: + for const_dict in obj_dict['nonlinear_constraints']: + const = const_class_dict[const_dict['class']](new_X_space, **const_dict['state']) + new_X_space.constrain_inputs(const) + + return new_X_space diff --git a/obsidian/tests/test_constraints.py b/obsidian/tests/test_constraints.py index 31b5049..66d52b9 100644 --- a/obsidian/tests/test_constraints.py +++ b/obsidian/tests/test_constraints.py @@ -2,10 +2,10 @@ from obsidian.campaign import Campaign from obsidian.constraints import ( - OutConstraint_Blank, - InConstraint_Generic, - InConstraint_ConstantDim, - OutConstraint_L1 + Linear_Constraint, + BatchVariance_Constraint, + Blank_Constraint, + L1_Constraint ) from obsidian.tests.utils import DEFAULT_MOO_PATH @@ -22,42 +22,52 @@ X_space = campaign.X_space target = campaign.target -test_ineq = [[InConstraint_Generic(X_space, indices=[0, 1], coeff=[1, 1], rhs=5)]] -test_nleq = [[InConstraint_ConstantDim(X_space, dim=0, tol=0.1)]] -test_out = [[OutConstraint_Blank(target)], [OutConstraint_L1(target, offset=1)]] +test_linear = [ + Linear_Constraint(X_space, ind=[0], weights=[1], rhs=5, equality=True), + Linear_Constraint(X_space, ind=[0, 1], weights=[1, 1], rhs=5) +] +test_nonlinear = [BatchVariance_Constraint(X_space, ind=0, tol=0.1)] +test_out = [Blank_Constraint(target), L1_Constraint(target, offset=1)] # Run very short optimizations for testing test_config = {'optim_samples': 2, 'optim_restarts': 2} -@pytest.mark.parametrize('ineq_constraints', test_ineq) -def test_ineq_constraints(ineq_constraints): - X_suggest, eval_suggest = optimizer.suggest(ineq_constraints=ineq_constraints, - **test_config) +@pytest.mark.parametrize('out_const', test_out) +def test_out_constraints(out_const): + out_const.__repr__() + campaign.constrain_outputs(out_const) + X_suggest, eval_suggest = campaign.optimizer.suggest(**test_config) df_suggest = pd.concat([X_suggest, eval_suggest], axis=1) - - -@pytest.mark.parametrize('nleq_constraints', test_nleq) -def test_nleq_constraints(nleq_constraints): - X_suggest, eval_suggest = optimizer.suggest(nleq_constraints=nleq_constraints, - **test_config) + campaign.clear_output_constraints() + + 
+@pytest.mark.parametrize('lin_const', test_linear) +def test_ineq_constraints(lin_const): + lin_const.__repr__() + optimizer.X_space.constrain_inputs(lin_const) + X_suggest, eval_suggest = optimizer.suggest(**test_config) df_suggest = pd.concat([X_suggest, eval_suggest], axis=1) + optimizer.X_space.clear_constraints() -@pytest.mark.parametrize('out_constraints', test_out) -def test_out_constraints(out_constraints): - X_suggest, eval_suggest = optimizer.suggest(out_constraints=out_constraints, - **test_config) +@pytest.mark.parametrize('nl_const', test_nonlinear) +def test_nleq_constraints(nl_const): + nl_const.__repr__() + optimizer.X_space.constrain_inputs(nl_const) + X_suggest, eval_suggest = optimizer.suggest(**test_config) df_suggest = pd.concat([X_suggest, eval_suggest], axis=1) - + optimizer.X_space.clear_constraints() + @pytest.mark.slow def test_combo_constraints(): - X_suggest, eval_suggest = optimizer.suggest(ineq_constraints=test_ineq[0], - nleq_constraints=test_nleq[0], + X_suggest, eval_suggest = optimizer.suggest(ineq_constraints=test_linear[1], + nleq_constraints=test_nonlinear[0], out_constraints=test_out[0], **test_config) df_suggest = pd.concat([X_suggest, eval_suggest], axis=1) + optimizer.X_space.clear_constraints() if __name__ == '__main__': diff --git a/obsidian/utils.py b/obsidian/utils.py index 6c018bf..bbe4939 100644 --- a/obsidian/utils.py +++ b/obsidian/utils.py @@ -6,7 +6,7 @@ def unscale_samples(samples: Tensor, - target: Target | list[Target]) -> Tensor: + target: list[Target]) -> Tensor: """ Unscale the scaled samples based on the given target(s). @@ -23,17 +23,9 @@ def unscale_samples(samples: Tensor, ValueError: If the number of constraint targets does not match the number of output dimensions. """ - if not isinstance(target, (Target, list)): - raise TypeError('Target must be a Target object or a list of Target objects') - if isinstance(target, list): - for t in target: - if not isinstance(t, Target): - raise TypeError('Target must be a Target object or a list of Target objects') - + # samples = sample_shape x batch_shape (x q) x m shape = samples.shape - if isinstance(target, Target): - target = [target] if shape[-1] != len(target): raise ValueError('Number of constraint targets must match number of output dimensions') for i, t in enumerate(target):
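Taken together, these changes move constraints from standalone callables to stateful Constraint modules owned by the objects that serialize them: input constraints live on ParamSpace, output constraints on Campaign. A minimal end-to-end sketch of the new workflow; the Param_Continuous and Target constructor arguments are assumed here (they are not part of this diff), while everything else uses the APIs added above:

    from obsidian.parameters import ParamSpace, Param_Continuous, Target
    from obsidian.constraints import (Linear_Constraint, BatchVariance_Constraint,
                                      L1_Constraint)
    from obsidian.campaign import Campaign

    X_space = ParamSpace(params=[Param_Continuous('Temp', 20, 100),
                                 Param_Continuous('Time', 0, 24)])

    # Input constraints are attached to (and serialized with) the parameter space
    X_space.constrain_inputs(Linear_Constraint(X_space, ind=[0, 1], weights=[1, 1], rhs=50))
    X_space.constrain_inputs(BatchVariance_Constraint(X_space, ind=0, tol=0.1))
    X_space_2 = ParamSpace.load_state(X_space.save_state())   # constraints are rebuilt

    # Output constraints are attached to (and serialized with) the campaign
    targets = [Target('Yield'), Target('Purity')]              # assumed Target signature
    campaign = Campaign(X_space, targets,
                        constraints=L1_Constraint(targets, offset=1))

    # campaign.suggest() now forwards campaign.output_constraints to the optimizer,
    # which also picks up X_space.linear_constraints / nonlinear_constraints itself.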