Skip to content

Commit

Permalink
Pass multiple metrics into the optimizer (microsoft#723)
Browse files Browse the repository at this point in the history
We will need this for multi-objective optimization as well as for
training the optimizers that can take muti-dimensional input

* [x] Make `Storage.Experiment.load()` return multiple scores
* [x] Fix unit tests to check for loading multi-dimensional scores from
the DB
* [x] Make `Optimizer.register()` and `.bulk_register()` take
multi-dimensional trial scores
* [x] Fix the Optimizer unit tests to check for registering
multi-dimensional scores
* [x] Check the Scheduler and optimization loop unit tests to see if we
need to adjust the types etc.

**NOTE:** In this PR, we _do not_ change mlos_core: we will still pass a
single scalar into it and do not change the API on mlos_core side. We
will change mlos_core in the subsequent PR to minimize the diff.

Part of microsoft#692
  • Loading branch information
motus authored Apr 29, 2024
1 parent a7a28ef commit 58464b9
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 77 deletions.
9 changes: 6 additions & 3 deletions mlos_bench/mlos_bench/optimizers/base_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from mlos_bench.config.schemas import ConfigSchema
from mlos_bench.services.base_service import Service
from mlos_bench.environments.status import Status
from mlos_bench.tunables.tunable import TunableValue
from mlos_bench.tunables.tunable_groups import TunableGroups
from mlos_bench.optimizers.convert_configspace import tunable_groups_to_configspace

Expand Down Expand Up @@ -223,7 +224,9 @@ def supports_preload(self) -> bool:
return True

@abstractmethod
def bulk_register(self, configs: Sequence[dict], scores: Sequence[Optional[float]],
def bulk_register(self,
configs: Sequence[dict],
scores: Sequence[Optional[Dict[str, TunableValue]]],
status: Optional[Sequence[Status]] = None) -> bool:
"""
Pre-load the optimizer with the bulk data from previous experiments.
Expand All @@ -232,9 +235,9 @@ def bulk_register(self, configs: Sequence[dict], scores: Sequence[Optional[float
----------
configs : Sequence[dict]
Records of tunable values from other experiments.
scores : Sequence[float]
scores : Sequence[Optional[Dict[str, TunableValue]]]
Benchmark results from experiments that correspond to `configs`.
status : Optional[Sequence[float]]
status : Optional[Sequence[Status]]
Status of the experiments that correspond to `configs`.
Returns
Expand Down
11 changes: 6 additions & 5 deletions mlos_bench/mlos_bench/optimizers/grid_search_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from mlos_bench.optimizers.track_best_optimizer import TrackBestOptimizer
from mlos_bench.optimizers.convert_configspace import configspace_data_to_tunable_values
from mlos_bench.services.base_service import Service
from mlos_bench.util import nullable

_LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -108,18 +107,20 @@ def suggested_configs(self) -> Iterable[Dict[str, TunableValue]]:
# See NOTEs above.
return (dict(zip(self._config_keys, config)) for config in self._suggested_configs)

def bulk_register(self, configs: Sequence[dict], scores: Sequence[Optional[float]],
def bulk_register(self,
configs: Sequence[dict],
scores: Sequence[Optional[Dict[str, TunableValue]]],
status: Optional[Sequence[Status]] = None) -> bool:
if not super().bulk_register(configs, scores, status):
return False
if status is None:
status = [Status.SUCCEEDED] * len(configs)
for (params, score, trial_status) in zip(configs, scores, status):
tunables = self._tunables.copy().assign(params)
self.register(tunables, trial_status, nullable(float, score))
self.register(tunables, trial_status, score)
if _LOG.isEnabledFor(logging.DEBUG):
(score, _) = self.get_best_observation()
_LOG.debug("Update end: %s = %s", self.target, score)
(best_score, _) = self.get_best_observation()
_LOG.debug("Update end: %s = %s", self.target, best_score)
return True

def suggest(self) -> TunableGroups:
Expand Down
11 changes: 9 additions & 2 deletions mlos_bench/mlos_bench/optimizers/mlos_core_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,16 @@ def __exit__(self, ex_type: Optional[Type[BaseException]],
def name(self) -> str:
return f"{self.__class__.__name__}:{self._opt.__class__.__name__}"

def bulk_register(self, configs: Sequence[dict], scores: Sequence[Optional[float]],
def bulk_register(self,
configs: Sequence[dict],
scores: Sequence[Optional[Dict[str, TunableValue]]],
status: Optional[Sequence[Status]] = None) -> bool:
if not super().bulk_register(configs, scores, status):
return False
df_configs = self._to_df(configs) # Impute missing values, if necessary
df_scores = pd.Series(scores, dtype=float) * self._opt_sign
df_scores = pd.Series(
[self._extract_target(score) for score in scores],
dtype=float) * self._opt_sign
if status is not None:
df_status = pd.Series(status)
df_scores[df_status != Status.SUCCEEDED] = float("inf")
Expand All @@ -108,6 +112,9 @@ def bulk_register(self, configs: Sequence[dict], scores: Sequence[Optional[float
_LOG.debug("Warm-up end: %s = %s", self.target, score)
return True

def _extract_target(self, scores: Optional[Dict[str, TunableValue]]) -> Optional[TunableValue]:
return None if scores is None else scores[self._opt_target]

def _to_df(self, configs: Sequence[Dict[str, TunableValue]]) -> pd.DataFrame:
"""
Select from past trials only the columns required in this experiment and
Expand Down
11 changes: 6 additions & 5 deletions mlos_bench/mlos_bench/optimizers/mock_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

from mlos_bench.optimizers.track_best_optimizer import TrackBestOptimizer
from mlos_bench.services.base_service import Service
from mlos_bench.util import nullable

_LOG = logging.getLogger(__name__)

Expand All @@ -40,18 +39,20 @@ def __init__(self,
"int": lambda tunable: rnd.randint(*tunable.range),
}

def bulk_register(self, configs: Sequence[dict], scores: Sequence[Optional[float]],
def bulk_register(self,
configs: Sequence[dict],
scores: Sequence[Optional[Dict[str, TunableValue]]],
status: Optional[Sequence[Status]] = None) -> bool:
if not super().bulk_register(configs, scores, status):
return False
if status is None:
status = [Status.SUCCEEDED] * len(configs)
for (params, score, trial_status) in zip(configs, scores, status):
tunables = self._tunables.copy().assign(params)
self.register(tunables, trial_status, nullable(float, score))
self.register(tunables, trial_status, score)
if _LOG.isEnabledFor(logging.DEBUG):
(score, _) = self.get_best_observation()
_LOG.debug("Bulk register end: %s = %s", self.target, score)
(best_score, _) = self.get_best_observation()
_LOG.debug("Bulk register end: %s = %s", self.target, best_score)
return True

def suggest(self) -> TunableGroups:
Expand Down
10 changes: 3 additions & 7 deletions mlos_bench/mlos_bench/storage/base_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,8 @@ def load_telemetry(self, trial_id: int) -> List[Tuple[datetime, str, Any]]:
"""

@abstractmethod
def load(self,
last_trial_id: int = -1,
opt_target: Optional[str] = None
) -> Tuple[List[int], List[dict], List[Optional[float]], List[Status]]:
def load(self, last_trial_id: int = -1,
) -> Tuple[List[int], List[dict], List[Optional[Dict[str, Any]]], List[Status]]:
"""
Load (tunable values, benchmark scores, status) to warm-up the optimizer.
Expand All @@ -271,12 +269,10 @@ def load(self,
----------
last_trial_id : int
(Optional) Trial ID to start from.
opt_target : Optional[str]
Name of the optimization target.
Returns
-------
(trial_ids, configs, scores, status) : ([dict], [Optional[float]], [Status])
(trial_ids, configs, scores, status) : ([int], [dict], [Optional[dict]], [Status])
Trial ids, Tunable values, benchmark scores, and status of the trials.
"""

Expand Down
72 changes: 41 additions & 31 deletions mlos_bench/mlos_bench/storage/sql/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from pytz import UTC

from sqlalchemy import Engine, Connection, Table, column, func
from sqlalchemy import Engine, Connection, CursorResult, Table, column, func, select

from mlos_bench.environments.status import Status
from mlos_bench.tunables.tunable_groups import TunableGroups
Expand Down Expand Up @@ -109,7 +109,7 @@ def merge(self, experiment_ids: List[str]) -> None:

def load_tunable_config(self, config_id: int) -> Dict[str, Any]:
with self._engine.connect() as conn:
return self._get_params(conn, self._schema.config_param, config_id=config_id)
return self._get_key_val(conn, self._schema.config_param, "param", config_id=config_id)

def load_telemetry(self, trial_id: int) -> List[Tuple[datetime, str, Any]]:
with self._engine.connect() as conn:
Expand All @@ -127,50 +127,60 @@ def load_telemetry(self, trial_id: int) -> List[Tuple[datetime, str, Any]]:
return [(utcify_timestamp(row.ts, origin="utc"), row.metric_id, row.metric_value)
for row in cur_telemetry.fetchall()]

def load(self,
last_trial_id: int = -1,
opt_target: Optional[str] = None
) -> Tuple[List[int], List[dict], List[Optional[float]], List[Status]]:
opt_target = opt_target or self._opt_target
(trial_ids, configs, scores, status) = ([], [], [], [])
def load(self, last_trial_id: int = -1,
) -> Tuple[List[int], List[dict], List[Optional[Dict[str, Any]]], List[Status]]:

with self._engine.connect() as conn:
cur_trials = conn.execute(
self._schema.trial.select().with_only_columns(
self._schema.trial.c.trial_id,
self._schema.trial.c.config_id,
self._schema.trial.c.status,
self._schema.trial_result.c.metric_value,
).join(
self._schema.trial_result, (
(self._schema.trial.c.exp_id == self._schema.trial_result.c.exp_id) &
(self._schema.trial.c.trial_id == self._schema.trial_result.c.trial_id)
), isouter=True
).where(
self._schema.trial.c.exp_id == self._experiment_id,
self._schema.trial.c.trial_id > last_trial_id,
self._schema.trial.c.status.in_(['SUCCEEDED', 'FAILED', 'TIMED_OUT']),
(self._schema.trial_result.c.metric_id.is_(None) |
(self._schema.trial_result.c.metric_id == opt_target)),
).order_by(
self._schema.trial.c.trial_id.asc(),
)
)
# Note: this iterative approach is somewhat expensive.
# TODO: Look into a better bulk fetch option.

trial_ids: List[int] = []
configs: List[Dict[str, Any]] = []
scores: List[Optional[Dict[str, Any]]] = []
status: List[Status] = []

for trial in cur_trials.fetchall():
tunables = self._get_params(
conn, self._schema.config_param, config_id=trial.config_id)
stat = Status[trial.status]
status.append(stat)
trial_ids.append(trial.trial_id)
configs.append(tunables)
scores.append(nullable(float, trial.metric_value))
status.append(Status[trial.status])
configs.append(self._get_key_val(
conn, self._schema.config_param, "param", config_id=trial.config_id))
if stat.is_succeeded():
scores.append(self._get_key_val(
conn, self._schema.trial_result, "metric",
exp_id=self._experiment_id, trial_id=trial.trial_id))
else:
scores.append(None)

return (trial_ids, configs, scores, status)

@staticmethod
def _get_params(conn: Connection, table: Table, **kwargs: Any) -> Dict[str, Any]:
cur_params = conn.execute(table.select().where(*[
column(key) == val for (key, val) in kwargs.items()]))
return {row.param_id: row.param_value for row in cur_params.fetchall()}
def _get_key_val(conn: Connection, table: Table, field: str, **kwargs: Any) -> Dict[str, Any]:
"""
Helper method to retrieve key-value pairs from the database.
(E.g., configurations, results, and telemetry).
"""
cur_result: CursorResult[Tuple[str, Any]] = conn.execute(
select(
column(f"{field}_id"),
column(f"{field}_value"),
).select_from(table).where(
*[column(key) == val for (key, val) in kwargs.items()]
)
)
# NOTE: `Row._tuple()` is NOT a protected member; the class uses `_` to avoid naming conflicts.
return dict(row._tuple() for row in cur_result.fetchall()) # pylint: disable=protected-access

@staticmethod
def _save_params(conn: Connection, table: Table,
Expand Down Expand Up @@ -202,11 +212,11 @@ def pending_trials(self, timestamp: datetime, *, running: bool) -> Iterator[Stor
self._schema.trial.c.status.in_(pending_status),
))
for trial in cur_trials.fetchall():
tunables = self._get_params(
conn, self._schema.config_param,
tunables = self._get_key_val(
conn, self._schema.config_param, "param",
config_id=trial.config_id)
config = self._get_params(
conn, self._schema.trial_param,
config = self._get_key_val(
conn, self._schema.trial_param, "param",
exp_id=self._experiment_id, trial_id=trial.trial_id)
yield Trial(
engine=self._engine,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def test_grid_search_async_order(grid_search_opt: GridSearchOptimizer) -> None:
assert all(suggestion.get_param_values() not in suggested_shuffled for suggestion in suggested)

grid_search_opt.bulk_register([suggestion.get_param_values() for suggestion in suggested],
[score] * len(suggested),
[{"score": score}] * len(suggested),
[status] * len(suggested))

assert all(suggestion.get_param_values() not in grid_search_opt.pending_configs for suggestion in suggested)
Expand Down
Loading

0 comments on commit 58464b9

Please sign in to comment.