diff --git a/cognify/optimizer/core/flow.py b/cognify/optimizer/core/flow.py
index dda2234..a5f91c2 100644
--- a/cognify/optimizer/core/flow.py
+++ b/cognify/optimizer/core/flow.py
@@ -121,6 +121,27 @@ def eq_transform_path(self, other: dict[str, str]) -> bool:
 
 @dataclass
 class OptConfig:
+    """Configuration for the optimization of each layer.
+
+    Attributes:
+        n_trials (int): number of search iterations to run.
+
+        throughput (int, optional): number of trials to run in parallel. Defaults to 2.
+
+        log_dir (str): directory in which to save logs.
+
+        evolve_interval (int): number of trials between evolutions of the dynamic cogs.
+
+        opt_log_path (str): path at which to save optimization logs.
+
+        param_save_path (str): path at which to save optimized parameters.
+
+        frugal_eval_cost (bool): whether to favor cheaper evaluations in the early stage.
+
+        use_SH_allocation (bool): whether to use the Successive Halving allocation strategy.
+
+        patience (tuple[float,float,int], optional): (quality_min_delta, cost_min_delta, n_iterations); the search stops early if neither quality improves by quality_min_delta nor cost drops by cost_min_delta within n_iterations consecutive trials. Defaults to (0.01, 0.01, 5).
+    """
     n_trials: int
     throughput: int = field(default=2)
     log_dir: str = field(default=None)
@@ -129,6 +150,12 @@ class OptConfig:
     param_save_path: str = field(default=None)
     frugal_eval_cost: bool = field(default=True)
     use_SH_allocation: bool = field(default=False)
+    patience: tuple[float, float, int] = field(default=(0.01, 0.01, 5))
+
+    def __post_init__(self):
+        quality_delta, cost_delta, n_iteration = self.patience
+        if quality_delta < 0 or cost_delta < 0 or n_iteration < 0:
+            raise ValueError("all patience values must be non-negative")
 
     def finalize(self):
         if not os.path.exists(self.log_dir):
@@ -143,6 +170,18 @@ def update(self, other: "OptConfig"):
         for key, value in other.__dict__.items():
             if value is not None:
                 setattr(self, key, value)
+
+    @property
+    def _early_stop_quality_delta(self):
+        return self.patience[0]
+
+    @property
+    def _early_stop_cost_delta(self):
+        return self.patience[1]
+
+    @property
+    def _early_stop_n_iteration(self):
+        return self.patience[2]
 
 
 @dataclass
diff --git a/cognify/optimizer/core/unified_layer_opt.py b/cognify/optimizer/core/unified_layer_opt.py
index ba77f23..753fe6b 100644
--- a/cognify/optimizer/core/unified_layer_opt.py
+++ b/cognify/optimizer/core/unified_layer_opt.py
@@ -149,6 +149,8 @@ def __init__(
         self.base_quality = base_quality
         self.base_cost = base_cost
         self.hierarchy_level = hierarchy_level
+        self._should_stop = False  # flag to stop early once the search converges
+        self._patience_budget = None  # remaining iterations to wait for an improvement before stopping early
 
     def prepare_opt_env(self):
         self.params = defaultdict(list)
@@ -385,7 +387,35 @@ def add_constraint(self, score, trial: optuna.trial.Trial):
             trial.set_user_attr(qc_identifier, constraint_result)
             # NOTE: add system attr at loading time
             # trial.set_system_attr(_base._CONSTRAINTS_KEY, constraint_result)
-
+
+    def _update_best_trial(self, eval_result: EvaluationResult):
+        with self._study_lock:
+            current_score, current_cost = self.get_eval_feedback(eval_result)
+            if not self._should_stop and self._patience_budget is not None:
+                impv = False
+                score_threshold = self.top_down_info.opt_config._early_stop_quality_delta
+                cost_threshold = self.top_down_info.opt_config._early_stop_cost_delta
+                # reset the budget if either score or cost improves enough
+                if current_score is not None and (self._best_score is None or current_score >= self._best_score * (1 + score_threshold)):
+                    self._patience_budget = self.top_down_info.opt_config._early_stop_n_iteration
+                    impv = True
+                if current_cost is not None and (self._lowest_cost is None or current_cost <= self._lowest_cost * (1 - cost_threshold)):
+                    self._patience_budget = self.top_down_info.opt_config._early_stop_n_iteration
+                    impv = True
+                if not impv:
+                    self._patience_budget -= 1
+                    # early stop if patience budget is used up
+                    if self._patience_budget <= 0:
+                        self._should_stop = True
+
+            if current_score is not None and current_cost is not None:
+                self._best_score = (
+                    current_score if self._best_score is None else max(self._best_score, current_score)
+                )
+                self._lowest_cost = (
+                    current_cost if self._lowest_cost is None else min(self._lowest_cost, current_cost)
+                )
+
     def update(
         self,
         trial: optuna.trial.Trial,
@@ -405,6 +435,8 @@ def update(
             f"- {self.name} - Trial {trial.number} result: score= {score:.2f}, cost@1000= {price*1000:.3f}"
         )
         self.opt_cost += eval_result.total_eval_cost
+
+        self._update_best_trial(eval_result)
 
         # update study if any dynamic params can evolve
         with self._study_lock:
@@ -529,18 +561,10 @@ def _optimize(self, base_program: list[Module]):
         num_current_trials = len(self.opt_logs)
         pbar_position = ask_for_position()
 
-        def _update_pbar(pbar, eval_result: EvaluationResult):
-            score, cost = self.get_eval_feedback(eval_result)
-            if score is not None and cost is not None:
-                self._best_score = (
-                    score if self._best_score is None else max(self._best_score, score)
-                )
-                self._lowest_cost = (
-                    cost if self._lowest_cost is None else min(self._lowest_cost, cost)
-                )
-            pbar.set_description(
-                self._gen_opt_bar_desc(self._best_score, self._lowest_cost, self.opt_cost)
-            )
+        def _update_pbar(pbar):
+            pbar.set_description(
+                self._gen_opt_bar_desc(self._best_score, self._lowest_cost, self.opt_cost)
+            )
             pbar.update(1)
 
         initial_score = self._best_score if self._best_score is not None else 0.0
@@ -555,7 +579,7 @@ def _update_pbar(pbar, eval_result: EvaluationResult):
             counter = 0
             if opt_config.throughput == 1:
                 for _ in range(opt_config.n_trials):
-                    if _should_exit():
+                    if _should_exit() or self._should_stop:
                         break
                     result = self._optimize_iteration(base_program)
                     if result is None or not result.complete:
@@ -568,7 +592,7 @@ def _update_pbar(pbar, eval_result: EvaluationResult):
                         self.save_ckpt(
                             opt_config.opt_log_path, opt_config.param_save_path
                         )
-                    _update_pbar(pbar, result)
+                    _update_pbar(pbar)
             else:
                 with ThreadPoolExecutor(max_workers=opt_config.throughput) as executor:
                     futures = [
@@ -588,8 +612,8 @@ def _update_pbar(pbar, eval_result: EvaluationResult):
                                     opt_config.opt_log_path,
                                     opt_config.param_save_path,
                                 )
-                            _update_pbar(pbar, result)
-                            if _should_exit():
+                            _update_pbar(pbar)
+                            if _should_exit() or self._should_stop:
                                 executor.shutdown(wait=False, cancel_futures=True)
                                 break
                         except Exception as e:
@@ -710,6 +734,8 @@ def optimize(
         # prepare optimization environment
         current_tdi.initialize()
         self.top_down_info = current_tdi
+        if current_tdi.opt_config.patience[2] > 0:
+            self._patience_budget = current_tdi.opt_config.patience[2]
         self.prepare_opt_env()
 
         # load previous optimization logs if exists
@@ -817,7 +843,8 @@ def update(
             f"- {self.name} - Trial {trial.number} result: score= {score:.2f}, cost@1000= {price*1000:.3f}"
         )
         self.opt_cost += eval_result.total_eval_cost
-
+
+        self._update_best_trial(eval_result)
         # update study if any dynamic params can evolve
         with self._study_lock:
             self.add_constraint(score, trial)
diff --git a/cognify/optimizer/core/upper_layer.py b/cognify/optimizer/core/upper_layer.py
index 9ed4eee..07a30ff 100644
--- a/cognify/optimizer/core/upper_layer.py
+++ b/cognify/optimizer/core/upper_layer.py
@@ -136,6 +136,7 @@ def run_and_prune(self):
             key=lambda i: (-outer_indicators[i][0], outer_indicators[i][1]),
         )
         runs_left_to_run = sorted_indicator_indices[: len(self.ready_to_run) // 2]
+        # TODO: consider removing early-stopped runs from the ready list
         self.ready_to_run = [self.ready_to_run[i] for i in runs_left_to_run]
 
     def execute(self):
@@ -268,7 +269,7 @@ def _optimize(self, base_program):
         opt_config = self.top_down_info.opt_config
         n_iters = opt_config.n_trials // opt_config.throughput
         for i in range(n_iters):
-            if _should_exit():
+            if _should_exit() or self._should_stop:
                 break
             self._optimize_SH(base_program)
             if _should_exit():
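
Usage note (not part of the patch): a minimal sketch of how the new patience knob is configured. Only n_trials is required; the log_dir value below is a hypothetical placeholder.

    from cognify.optimizer.core.flow import OptConfig

    # Stop a layer early once 5 consecutive trials bring neither a >=1%
    # quality gain nor a >=1% cost reduction (the defaults added here).
    config = OptConfig(
        n_trials=100,
        throughput=2,
        log_dir="/tmp/opt_logs",  # hypothetical path
        patience=(0.01, 0.01, 5),
    )

    # Negative entries are rejected by __post_init__:
    # OptConfig(n_trials=100, patience=(0.01, 0.01, -1))  raises ValueError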
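
The bookkeeping in _update_best_trial reduces to the self-contained sketch below; step and state are illustrative names, not Cognify APIs. The budget resets whenever either axis improves by its relative threshold, and the stop flag trips once the budget is exhausted.

    def step(state, score, cost, q_delta=0.01, c_delta=0.01, n_iter=5):
        # One trial's worth of patience bookkeeping, mirroring _update_best_trial.
        improved = False
        if state["best_score"] is None or score >= state["best_score"] * (1 + q_delta):
            state["budget"] = n_iter  # quality improved enough: reset the budget
            improved = True
        if state["lowest_cost"] is None or cost <= state["lowest_cost"] * (1 - c_delta):
            state["budget"] = n_iter  # cost dropped enough: reset the budget
            improved = True
        if not improved:
            state["budget"] -= 1
            if state["budget"] <= 0:
                state["should_stop"] = True
        state["best_score"] = score if state["best_score"] is None else max(state["best_score"], score)
        state["lowest_cost"] = cost if state["lowest_cost"] is None else min(state["lowest_cost"], cost)

    state = {"best_score": None, "lowest_cost": None, "budget": 5, "should_stop": False}
    for s, c in [(0.70, 1.0), (0.705, 1.0), (0.705, 1.0), (0.706, 1.0), (0.70, 1.0), (0.70, 1.0)]:
        step(state, s, c)
    print(state["should_stop"])  # True: five straight trials without a 1% gain on either axis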