From 58464b926d7405e88c18fee6271b6dc395eabeef Mon Sep 17 00:00:00 2001
From: Sergiy Matusevych
Date: Mon, 29 Apr 2024 10:27:53 -0700
Subject: [PATCH] Pass multiple metrics into the optimizer (#723)

We will need this for multi-objective optimization as well as for training the optimizers that can take multi-dimensional input.

* [x] Make `Storage.Experiment.load()` return multiple scores
* [x] Fix unit tests to check for loading multi-dimensional scores from the DB
* [x] Make `Optimizer.register()` and `.bulk_register()` take multi-dimensional trial scores
* [x] Fix the Optimizer unit tests to check for registering multi-dimensional scores
* [x] Check the Scheduler and optimization loop unit tests to see if we need to adjust the types etc.

**NOTE:** In this PR, we _do not_ change mlos_core: we still pass a single scalar into it and do not change the API on the mlos_core side. We will change mlos_core in a subsequent PR to minimize the diff.

Part of #692
---
 .../mlos_bench/optimizers/base_optimizer.py   |  9 ++-
 .../optimizers/grid_search_optimizer.py       | 11 +--
 .../optimizers/mlos_core_optimizer.py         | 11 ++-
 .../mlos_bench/optimizers/mock_optimizer.py   | 11 +--
 mlos_bench/mlos_bench/storage/base_storage.py | 10 +--
 .../mlos_bench/storage/sql/experiment.py      | 72 +++++++++++--------
 .../optimizers/grid_search_optimizer_test.py  |  2 +-
 .../optimizers/opt_bulk_register_test.py      | 66 +++++++++++------
 .../mlos_bench/tests/storage/exp_load_test.py |  4 +-
 9 files changed, 119 insertions(+), 77 deletions(-)

diff --git a/mlos_bench/mlos_bench/optimizers/base_optimizer.py b/mlos_bench/mlos_bench/optimizers/base_optimizer.py index 21818695190..5825c3467c9 100644 --- a/mlos_bench/mlos_bench/optimizers/base_optimizer.py +++ b/mlos_bench/mlos_bench/optimizers/base_optimizer.py @@ -20,6 +20,7 @@ from mlos_bench.config.schemas import ConfigSchema from mlos_bench.services.base_service import Service from mlos_bench.environments.status import Status +from mlos_bench.tunables.tunable import TunableValue from mlos_bench.tunables.tunable_groups import TunableGroups from mlos_bench.optimizers.convert_configspace import tunable_groups_to_configspace @@ -223,7 +224,9 @@ def supports_preload(self) -> bool: return True @abstractmethod - def bulk_register(self, configs: Sequence[dict], scores: Sequence[Optional[float]], + def bulk_register(self, + configs: Sequence[dict], + scores: Sequence[Optional[Dict[str, TunableValue]]], status: Optional[Sequence[Status]] = None) -> bool: """ Pre-load the optimizer with the bulk data from previous experiments. @@ -232,9 +235,9 @@ def bulk_register(self, configs: Sequence[dict], scores: Sequence[Optional[float ---------- configs : Sequence[dict] Records of tunable values from other experiments. - scores : Sequence[float] + scores : Sequence[Optional[Dict[str, TunableValue]]] Benchmark results from experiments that correspond to `configs`. - status : Optional[Sequence[float]] + status : Optional[Sequence[Status]] Status of the experiments that correspond to `configs`. 
Returns diff --git a/mlos_bench/mlos_bench/optimizers/grid_search_optimizer.py b/mlos_bench/mlos_bench/optimizers/grid_search_optimizer.py index 72385d75040..52785ae7438 100644 --- a/mlos_bench/mlos_bench/optimizers/grid_search_optimizer.py +++ b/mlos_bench/mlos_bench/optimizers/grid_search_optimizer.py @@ -20,7 +20,6 @@ from mlos_bench.optimizers.track_best_optimizer import TrackBestOptimizer from mlos_bench.optimizers.convert_configspace import configspace_data_to_tunable_values from mlos_bench.services.base_service import Service -from mlos_bench.util import nullable _LOG = logging.getLogger(__name__) @@ -108,7 +107,9 @@ def suggested_configs(self) -> Iterable[Dict[str, TunableValue]]: # See NOTEs above. return (dict(zip(self._config_keys, config)) for config in self._suggested_configs) - def bulk_register(self, configs: Sequence[dict], scores: Sequence[Optional[float]], + def bulk_register(self, + configs: Sequence[dict], + scores: Sequence[Optional[Dict[str, TunableValue]]], status: Optional[Sequence[Status]] = None) -> bool: if not super().bulk_register(configs, scores, status): return False @@ -116,10 +117,10 @@ def bulk_register(self, configs: Sequence[dict], scores: Sequence[Optional[float status = [Status.SUCCEEDED] * len(configs) for (params, score, trial_status) in zip(configs, scores, status): tunables = self._tunables.copy().assign(params) - self.register(tunables, trial_status, nullable(float, score)) + self.register(tunables, trial_status, score) if _LOG.isEnabledFor(logging.DEBUG): - (score, _) = self.get_best_observation() - _LOG.debug("Update end: %s = %s", self.target, score) + (best_score, _) = self.get_best_observation() + _LOG.debug("Update end: %s = %s", self.target, best_score) return True def suggest(self) -> TunableGroups: diff --git a/mlos_bench/mlos_bench/optimizers/mlos_core_optimizer.py b/mlos_bench/mlos_bench/optimizers/mlos_core_optimizer.py index dfc51d44321..2db7c04bd12 100644 --- a/mlos_bench/mlos_bench/optimizers/mlos_core_optimizer.py +++ b/mlos_bench/mlos_bench/optimizers/mlos_core_optimizer.py @@ -90,12 +90,16 @@ def __exit__(self, ex_type: Optional[Type[BaseException]], def name(self) -> str: return f"{self.__class__.__name__}:{self._opt.__class__.__name__}" - def bulk_register(self, configs: Sequence[dict], scores: Sequence[Optional[float]], + def bulk_register(self, + configs: Sequence[dict], + scores: Sequence[Optional[Dict[str, TunableValue]]], status: Optional[Sequence[Status]] = None) -> bool: if not super().bulk_register(configs, scores, status): return False df_configs = self._to_df(configs) # Impute missing values, if necessary - df_scores = pd.Series(scores, dtype=float) * self._opt_sign + df_scores = pd.Series( + [self._extract_target(score) for score in scores], + dtype=float) * self._opt_sign if status is not None: df_status = pd.Series(status) df_scores[df_status != Status.SUCCEEDED] = float("inf") @@ -108,6 +112,9 @@ def bulk_register(self, configs: Sequence[dict], scores: Sequence[Optional[float _LOG.debug("Warm-up end: %s = %s", self.target, score) return True + def _extract_target(self, scores: Optional[Dict[str, TunableValue]]) -> Optional[TunableValue]: + return None if scores is None else scores[self._opt_target] + def _to_df(self, configs: Sequence[Dict[str, TunableValue]]) -> pd.DataFrame: """ Select from past trials only the columns required in this experiment and diff --git a/mlos_bench/mlos_bench/optimizers/mock_optimizer.py b/mlos_bench/mlos_bench/optimizers/mock_optimizer.py index 1f73877b1b2..b8ff69aafba 100644 --- 
a/mlos_bench/mlos_bench/optimizers/mock_optimizer.py +++ b/mlos_bench/mlos_bench/optimizers/mock_optimizer.py @@ -17,7 +17,6 @@ from mlos_bench.optimizers.track_best_optimizer import TrackBestOptimizer from mlos_bench.services.base_service import Service -from mlos_bench.util import nullable _LOG = logging.getLogger(__name__) @@ -40,7 +39,9 @@ def __init__(self, "int": lambda tunable: rnd.randint(*tunable.range), } - def bulk_register(self, configs: Sequence[dict], scores: Sequence[Optional[float]], + def bulk_register(self, + configs: Sequence[dict], + scores: Sequence[Optional[Dict[str, TunableValue]]], status: Optional[Sequence[Status]] = None) -> bool: if not super().bulk_register(configs, scores, status): return False @@ -48,10 +49,10 @@ def bulk_register(self, configs: Sequence[dict], scores: Sequence[Optional[float status = [Status.SUCCEEDED] * len(configs) for (params, score, trial_status) in zip(configs, scores, status): tunables = self._tunables.copy().assign(params) - self.register(tunables, trial_status, nullable(float, score)) + self.register(tunables, trial_status, score) if _LOG.isEnabledFor(logging.DEBUG): - (score, _) = self.get_best_observation() - _LOG.debug("Bulk register end: %s = %s", self.target, score) + (best_score, _) = self.get_best_observation() + _LOG.debug("Bulk register end: %s = %s", self.target, best_score) return True def suggest(self) -> TunableGroups: diff --git a/mlos_bench/mlos_bench/storage/base_storage.py b/mlos_bench/mlos_bench/storage/base_storage.py index e8bc9cdcac8..5d3574ac83f 100644 --- a/mlos_bench/mlos_bench/storage/base_storage.py +++ b/mlos_bench/mlos_bench/storage/base_storage.py @@ -256,10 +256,8 @@ def load_telemetry(self, trial_id: int) -> List[Tuple[datetime, str, Any]]: """ @abstractmethod - def load(self, - last_trial_id: int = -1, - opt_target: Optional[str] = None - ) -> Tuple[List[int], List[dict], List[Optional[float]], List[Status]]: + def load(self, last_trial_id: int = -1, + ) -> Tuple[List[int], List[dict], List[Optional[Dict[str, Any]]], List[Status]]: """ Load (tunable values, benchmark scores, status) to warm-up the optimizer. @@ -271,12 +269,10 @@ def load(self, ---------- last_trial_id : int (Optional) Trial ID to start from. - opt_target : Optional[str] - Name of the optimization target. Returns ------- - (trial_ids, configs, scores, status) : ([dict], [Optional[float]], [Status]) + (trial_ids, configs, scores, status) : ([int], [dict], [Optional[dict]], [Status]) Trial ids, Tunable values, benchmark scores, and status of the trials. 
""" diff --git a/mlos_bench/mlos_bench/storage/sql/experiment.py b/mlos_bench/mlos_bench/storage/sql/experiment.py index f5737b40ed3..e73e4865146 100644 --- a/mlos_bench/mlos_bench/storage/sql/experiment.py +++ b/mlos_bench/mlos_bench/storage/sql/experiment.py @@ -13,7 +13,7 @@ from pytz import UTC -from sqlalchemy import Engine, Connection, Table, column, func +from sqlalchemy import Engine, Connection, CursorResult, Table, column, func, select from mlos_bench.environments.status import Status from mlos_bench.tunables.tunable_groups import TunableGroups @@ -109,7 +109,7 @@ def merge(self, experiment_ids: List[str]) -> None: def load_tunable_config(self, config_id: int) -> Dict[str, Any]: with self._engine.connect() as conn: - return self._get_params(conn, self._schema.config_param, config_id=config_id) + return self._get_key_val(conn, self._schema.config_param, "param", config_id=config_id) def load_telemetry(self, trial_id: int) -> List[Tuple[datetime, str, Any]]: with self._engine.connect() as conn: @@ -127,50 +127,60 @@ def load_telemetry(self, trial_id: int) -> List[Tuple[datetime, str, Any]]: return [(utcify_timestamp(row.ts, origin="utc"), row.metric_id, row.metric_value) for row in cur_telemetry.fetchall()] - def load(self, - last_trial_id: int = -1, - opt_target: Optional[str] = None - ) -> Tuple[List[int], List[dict], List[Optional[float]], List[Status]]: - opt_target = opt_target or self._opt_target - (trial_ids, configs, scores, status) = ([], [], [], []) + def load(self, last_trial_id: int = -1, + ) -> Tuple[List[int], List[dict], List[Optional[Dict[str, Any]]], List[Status]]: + with self._engine.connect() as conn: cur_trials = conn.execute( self._schema.trial.select().with_only_columns( self._schema.trial.c.trial_id, self._schema.trial.c.config_id, self._schema.trial.c.status, - self._schema.trial_result.c.metric_value, - ).join( - self._schema.trial_result, ( - (self._schema.trial.c.exp_id == self._schema.trial_result.c.exp_id) & - (self._schema.trial.c.trial_id == self._schema.trial_result.c.trial_id) - ), isouter=True ).where( self._schema.trial.c.exp_id == self._experiment_id, self._schema.trial.c.trial_id > last_trial_id, self._schema.trial.c.status.in_(['SUCCEEDED', 'FAILED', 'TIMED_OUT']), - (self._schema.trial_result.c.metric_id.is_(None) | - (self._schema.trial_result.c.metric_id == opt_target)), ).order_by( self._schema.trial.c.trial_id.asc(), ) ) - # Note: this iterative approach is somewhat expensive. - # TODO: Look into a better bulk fetch option. 
+ + trial_ids: List[int] = [] + configs: List[Dict[str, Any]] = [] + scores: List[Optional[Dict[str, Any]]] = [] + status: List[Status] = [] + for trial in cur_trials.fetchall(): - tunables = self._get_params( - conn, self._schema.config_param, config_id=trial.config_id) + stat = Status[trial.status] + status.append(stat) trial_ids.append(trial.trial_id) - configs.append(tunables) - scores.append(nullable(float, trial.metric_value)) - status.append(Status[trial.status]) + configs.append(self._get_key_val( + conn, self._schema.config_param, "param", config_id=trial.config_id)) + if stat.is_succeeded(): + scores.append(self._get_key_val( + conn, self._schema.trial_result, "metric", + exp_id=self._experiment_id, trial_id=trial.trial_id)) + else: + scores.append(None) + return (trial_ids, configs, scores, status) @staticmethod - def _get_params(conn: Connection, table: Table, **kwargs: Any) -> Dict[str, Any]: - cur_params = conn.execute(table.select().where(*[ - column(key) == val for (key, val) in kwargs.items()])) - return {row.param_id: row.param_value for row in cur_params.fetchall()} + def _get_key_val(conn: Connection, table: Table, field: str, **kwargs: Any) -> Dict[str, Any]: + """ + Helper method to retrieve key-value pairs from the database. + (E.g., configurations, results, and telemetry). + """ + cur_result: CursorResult[Tuple[str, Any]] = conn.execute( + select( + column(f"{field}_id"), + column(f"{field}_value"), + ).select_from(table).where( + *[column(key) == val for (key, val) in kwargs.items()] + ) + ) + # NOTE: `Row._tuple()` is NOT a protected member; the class uses `_` to avoid naming conflicts. + return dict(row._tuple() for row in cur_result.fetchall()) # pylint: disable=protected-access @staticmethod def _save_params(conn: Connection, table: Table, @@ -202,11 +212,11 @@ def pending_trials(self, timestamp: datetime, *, running: bool) -> Iterator[Stor self._schema.trial.c.status.in_(pending_status), )) for trial in cur_trials.fetchall(): - tunables = self._get_params( - conn, self._schema.config_param, + tunables = self._get_key_val( + conn, self._schema.config_param, "param", config_id=trial.config_id) - config = self._get_params( - conn, self._schema.trial_param, + config = self._get_key_val( + conn, self._schema.trial_param, "param", exp_id=self._experiment_id, trial_id=trial.trial_id) yield Trial( engine=self._engine, diff --git a/mlos_bench/mlos_bench/tests/optimizers/grid_search_optimizer_test.py b/mlos_bench/mlos_bench/tests/optimizers/grid_search_optimizer_test.py index 878e6f6c0d8..c56b66b9e1c 100644 --- a/mlos_bench/mlos_bench/tests/optimizers/grid_search_optimizer_test.py +++ b/mlos_bench/mlos_bench/tests/optimizers/grid_search_optimizer_test.py @@ -239,7 +239,7 @@ def test_grid_search_async_order(grid_search_opt: GridSearchOptimizer) -> None: assert all(suggestion.get_param_values() not in suggested_shuffled for suggestion in suggested) grid_search_opt.bulk_register([suggestion.get_param_values() for suggestion in suggested], - [score] * len(suggested), + [{"score": score}] * len(suggested), [status] * len(suggested)) assert all(suggestion.get_param_values() not in grid_search_opt.pending_configs for suggestion in suggested) diff --git a/mlos_bench/mlos_bench/tests/optimizers/opt_bulk_register_test.py b/mlos_bench/mlos_bench/tests/optimizers/opt_bulk_register_test.py index 1128a97c85b..b287194411c 100644 --- a/mlos_bench/mlos_bench/tests/optimizers/opt_bulk_register_test.py +++ b/mlos_bench/mlos_bench/tests/optimizers/opt_bulk_register_test.py @@ -6,7 +6,7 @@ 
Unit tests for mock mlos_bench optimizer. """ -from typing import Optional, List +from typing import Dict, List, Optional import pytest @@ -14,6 +14,7 @@ from mlos_bench.optimizers.base_optimizer import Optimizer from mlos_bench.optimizers.mock_optimizer import MockOptimizer from mlos_bench.optimizers.mlos_core_optimizer import MlosCoreOptimizer +from mlos_bench.tunables.tunable import TunableValue # pylint: disable=redefined-outer-name @@ -31,11 +32,16 @@ def mock_configs_str(mock_configs: List[dict]) -> List[dict]: @pytest.fixture -def mock_scores() -> List[Optional[float]]: +def mock_scores() -> List[Optional[Dict[str, TunableValue]]]: """ Mock benchmark results from earlier experiments. """ - return [None, 88.88, 66.66, 99.99] + return [ + None, + {"score": 88.88}, + {"score": 66.66}, + {"score": 99.99}, + ] @pytest.fixture @@ -46,8 +52,10 @@ def mock_status() -> List[Status]: return [Status.FAILED, Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED] -def _test_opt_update_min(opt: Optimizer, configs: List[dict], - scores: List[float], status: Optional[List[Status]] = None) -> None: +def _test_opt_update_min(opt: Optimizer, + configs: List[dict], + scores: List[Optional[Dict[str, TunableValue]]], + status: Optional[List[Status]] = None) -> None: """ Test the bulk update of the optimizer on the minimization problem. """ @@ -63,8 +71,10 @@ def _test_opt_update_min(opt: Optimizer, configs: List[dict], } -def _test_opt_update_max(opt: Optimizer, configs: List[dict], - scores: List[float], status: Optional[List[Status]] = None) -> None: +def _test_opt_update_max(opt: Optimizer, + configs: List[dict], + scores: List[Optional[Dict[str, TunableValue]]], + status: Optional[List[Status]] = None) -> None: """ Test the bulk update of the optimizer on the maximization problem. """ @@ -80,8 +90,10 @@ def _test_opt_update_max(opt: Optimizer, configs: List[dict], } -def test_update_mock_min(mock_opt: MockOptimizer, mock_configs: List[dict], - mock_scores: List[float], mock_status: List[Status]) -> None: +def test_update_mock_min(mock_opt: MockOptimizer, + mock_configs: List[dict], + mock_scores: List[Optional[Dict[str, TunableValue]]], + mock_status: List[Status]) -> None: """ Test the bulk update of the mock optimizer on the minimization problem. """ @@ -95,48 +107,60 @@ def test_update_mock_min(mock_opt: MockOptimizer, mock_configs: List[dict], } -def test_update_mock_min_str(mock_opt: MockOptimizer, mock_configs_str: List[dict], - mock_scores: List[float], mock_status: List[Status]) -> None: +def test_update_mock_min_str(mock_opt: MockOptimizer, + mock_configs_str: List[dict], + mock_scores: List[Optional[Dict[str, TunableValue]]], + mock_status: List[Status]) -> None: """ Test the bulk update of the mock optimizer with all-strings data. """ _test_opt_update_min(mock_opt, mock_configs_str, mock_scores, mock_status) -def test_update_mock_max(mock_opt_max: MockOptimizer, mock_configs: List[dict], - mock_scores: List[float], mock_status: List[Status]) -> None: +def test_update_mock_max(mock_opt_max: MockOptimizer, + mock_configs: List[dict], + mock_scores: List[Optional[Dict[str, TunableValue]]], + mock_status: List[Status]) -> None: """ Test the bulk update of the mock optimizer on the maximization problem. 
""" _test_opt_update_max(mock_opt_max, mock_configs, mock_scores, mock_status) -def test_update_flaml(flaml_opt: MlosCoreOptimizer, mock_configs: List[dict], - mock_scores: List[float], mock_status: List[Status]) -> None: +def test_update_flaml(flaml_opt: MlosCoreOptimizer, + mock_configs: List[dict], + mock_scores: List[Optional[Dict[str, TunableValue]]], + mock_status: List[Status]) -> None: """ Test the bulk update of the FLAML optimizer. """ _test_opt_update_min(flaml_opt, mock_configs, mock_scores, mock_status) -def test_update_flaml_max(flaml_opt_max: MlosCoreOptimizer, mock_configs: List[dict], - mock_scores: List[float], mock_status: List[Status]) -> None: +def test_update_flaml_max(flaml_opt_max: MlosCoreOptimizer, + mock_configs: List[dict], + mock_scores: List[Optional[Dict[str, TunableValue]]], + mock_status: List[Status]) -> None: """ Test the bulk update of the FLAML optimizer. """ _test_opt_update_max(flaml_opt_max, mock_configs, mock_scores, mock_status) -def test_update_smac(smac_opt: MlosCoreOptimizer, mock_configs: List[dict], - mock_scores: List[float], mock_status: List[Status]) -> None: +def test_update_smac(smac_opt: MlosCoreOptimizer, + mock_configs: List[dict], + mock_scores: List[Optional[Dict[str, TunableValue]]], + mock_status: List[Status]) -> None: """ Test the bulk update of the SMAC optimizer. """ _test_opt_update_min(smac_opt, mock_configs, mock_scores, mock_status) -def test_update_smac_max(smac_opt_max: MlosCoreOptimizer, mock_configs: List[dict], - mock_scores: List[float], mock_status: List[Status]) -> None: +def test_update_smac_max(smac_opt_max: MlosCoreOptimizer, + mock_configs: List[dict], + mock_scores: List[Optional[Dict[str, TunableValue]]], + mock_status: List[Status]) -> None: """ Test the bulk update of the SMAC optimizer. """ diff --git a/mlos_bench/mlos_bench/tests/storage/exp_load_test.py b/mlos_bench/mlos_bench/tests/storage/exp_load_test.py index bdab29a22e0..3552f0b30a1 100644 --- a/mlos_bench/mlos_bench/tests/storage/exp_load_test.py +++ b/mlos_bench/mlos_bench/tests/storage/exp_load_test.py @@ -115,7 +115,7 @@ def test_exp_trial_update_categ(exp_storage: Storage.Experiment, 'kernel_sched_migration_cost_ns': '-1', 'vmSize': 'Standard_B4ms' }], - [99.9], + [{"score": "99.9", "benchmark": "test"}], [Status.SUCCEEDED] ) @@ -156,7 +156,7 @@ def test_exp_trial_pending_3(exp_storage: Storage.Experiment, (trial_ids, configs, scores, status) = exp_storage.load() assert trial_ids == [trial_fail.trial_id, trial_succ.trial_id] assert len(configs) == 2 - assert scores == [None, score] + assert scores == [None, {"score": f"{score}"}] assert status == [Status.FAILED, Status.SUCCEEDED] assert tunable_groups.copy().assign(configs[0]).reset() == trial_fail.tunables assert tunable_groups.copy().assign(configs[1]).reset() == trial_succ.tunables