Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add convergence early stop #16

Merged
merged 5 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions cognify/optimizer/core/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,27 @@ def eq_transform_path(self, other: dict[str, str]) -> bool:

@dataclass
class OptConfig:
"""Configuration for optimization of each layer

Attributes:
n_trials (int): number of iterations of search.

throughput (int, optional): number of trials to run in parallel. Defaults to 2.

log_dir (str): directory to save logs.

evolve_interval (int): interval to evolve the dynamic cogs.

opt_log_path (str): path to save optimization logs.

param_save_path (str): path to save optimized parameters.

frugal_eval_cost (bool): whether to favor cheaper evaluations in early stage.

use_SH_allocation (bool): whether to use Successive Halving strategy.

patience (tuple[float, float, int], optional): early-stop setting as (quality_min_delta, cost_min_delta, n_iteration) — the minimum relative quality improvement, the minimum relative cost reduction, and the number of consecutive non-improving iterations tolerated before stopping.
"""
n_trials: int
throughput: int = field(default=2)
log_dir: str = field(default=None)
Expand All @@ -129,6 +150,12 @@ class OptConfig:
param_save_path: str = field(default=None)
frugal_eval_cost: bool = field(default=True)
use_SH_allocation: bool = field(default=False)
patience: tuple[float,float,int] = field(default=(0.01,0.01,5))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add more description of what each element in the tuple means. Better with a macro; at the least, add a comment here and at each use site.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added doc string for this class

jiange91 marked this conversation as resolved.
Show resolved Hide resolved

def __post_init__(self):
quality_delta, cost_delta, n_iteration = self.patience
if quality_delta < 0 or cost_delta < 0 or n_iteration < 0:
raise ValueError("patience values should be non-negative")

def finalize(self):
if not os.path.exists(self.log_dir):
Expand All @@ -143,6 +170,18 @@ def update(self, other: "OptConfig"):
for key, value in other.__dict__.items():
if value is not None:
setattr(self, key, value)

@property
def _early_stop_quality_delta(self):
return self.patience[0]

@property
def _early_stop_cost_delta(self):
return self.patience[1]

@property
def _early_stop_n_iteration(self):
return self.patience[2]


@dataclass
Expand Down
63 changes: 45 additions & 18 deletions cognify/optimizer/core/unified_layer_opt.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ def __init__(
self.base_quality = base_quality
self.base_cost = base_cost
self.hierarchy_level = hierarchy_level
self._should_stop = False # flag to early stop when convergence
self._patience_budget = None # number of iterations to wait for improvement before early stop

def prepare_opt_env(self):
self.params = defaultdict(list)
Expand Down Expand Up @@ -385,7 +387,35 @@ def add_constraint(self, score, trial: optuna.trial.Trial):
trial.set_user_attr(qc_identifier, constraint_result)
# NOTE: add system attr at loading time
# trial.set_system_attr(_base._CONSTRAINTS_KEY, constraint_result)


def _update_best_trial(self, eval_result: EvaluationResult):
    """Fold one evaluation result into the best-score/lowest-cost trackers
    and advance the early-stop patience counter.

    An iteration counts as an improvement when the score rises by at least
    the configured relative quality delta, or the cost drops by at least
    the configured relative cost delta; any improvement refills the
    patience budget. After `_early_stop_n_iteration` consecutive calls
    without improvement, ``self._should_stop`` is set so the optimization
    loops exit early.

    Args:
        eval_result: result of one evaluation trial; converted to a
            (score, cost) pair by ``self.get_eval_feedback``.
    """
    with self._study_lock:
        current_score, current_cost = self.get_eval_feedback(eval_result)
        # Patience accounting is only active when a budget was configured
        # (patience[2] > 0 in optimize()) and we have not already stopped.
        if not self._should_stop and self._patience_budget is not None:
            opt_config = self.top_down_info.opt_config
            score_threshold = opt_config._early_stop_quality_delta
            cost_threshold = opt_config._early_stop_cost_delta
            improved = False
            # The first observed score/cost counts as an improvement. This
            # also fixes a TypeError in the original code, which compared
            # against self._best_score / self._lowest_cost while they could
            # still be None on the first completed trial.
            # NOTE(review): the relative threshold assumes non-negative
            # scores/costs; for a negative best score the bar is looser —
            # confirm the score range with the evaluator.
            if current_score is not None and (
                self._best_score is None
                or current_score >= self._best_score * (1 + score_threshold)
            ):
                self._patience_budget = opt_config._early_stop_n_iteration
                improved = True
            if current_cost is not None and (
                self._lowest_cost is None
                or current_cost <= self._lowest_cost * (1 - cost_threshold)
            ):
                self._patience_budget = opt_config._early_stop_n_iteration
                improved = True
            if not improved:
                self._patience_budget -= 1
                # Budget exhausted -> signal the optimization loop to stop.
                if self._patience_budget <= 0:
                    self._should_stop = True

        # Track the global best score / lowest cost (used for progress
        # reporting); only updated when the trial produced both values.
        if current_score is not None and current_cost is not None:
            self._best_score = (
                current_score
                if self._best_score is None
                else max(self._best_score, current_score)
            )
            self._lowest_cost = (
                current_cost
                if self._lowest_cost is None
                else min(self._lowest_cost, current_cost)
            )

def update(
self,
trial: optuna.trial.Trial,
Expand All @@ -405,6 +435,8 @@ def update(
f"- {self.name} - Trial {trial.number} result: score= {score:.2f}, cost@1000= {price*1000:.3f}"
)
self.opt_cost += eval_result.total_eval_cost

self._update_best_trial(eval_result)

# update study if any dynamic params can evolve
with self._study_lock:
Expand Down Expand Up @@ -529,18 +561,10 @@ def _optimize(self, base_program: list[Module]):
num_current_trials = len(self.opt_logs)
pbar_position = ask_for_position()

def _update_pbar(pbar, eval_result: EvaluationResult):
score, cost = self.get_eval_feedback(eval_result)
if score is not None and cost is not None:
self._best_score = (
score if self._best_score is None else max(self._best_score, score)
)
self._lowest_cost = (
cost if self._lowest_cost is None else min(self._lowest_cost, cost)
)
pbar.set_description(
self._gen_opt_bar_desc(self._best_score, self._lowest_cost, self.opt_cost)
)
def _update_pbar(pbar):
pbar.set_description(
self._gen_opt_bar_desc(self._best_score, self._lowest_cost, self.opt_cost)
)
pbar.update(1)

initial_score = self._best_score if self._best_score is not None else 0.0
Expand All @@ -555,7 +579,7 @@ def _update_pbar(pbar, eval_result: EvaluationResult):
counter = 0
if opt_config.throughput == 1:
for _ in range(opt_config.n_trials):
if _should_exit():
if _should_exit() or self._should_stop:
break
result = self._optimize_iteration(base_program)
if result is None or not result.complete:
Expand All @@ -568,7 +592,7 @@ def _update_pbar(pbar, eval_result: EvaluationResult):
self.save_ckpt(
opt_config.opt_log_path, opt_config.param_save_path
)
_update_pbar(pbar, result)
_update_pbar(pbar)
else:
with ThreadPoolExecutor(max_workers=opt_config.throughput) as executor:
futures = [
Expand All @@ -588,8 +612,8 @@ def _update_pbar(pbar, eval_result: EvaluationResult):
opt_config.opt_log_path,
opt_config.param_save_path,
)
_update_pbar(pbar, result)
if _should_exit():
_update_pbar(pbar)
if _should_exit() or self._should_stop:
executor.shutdown(wait=False, cancel_futures=True)
break
except Exception as e:
Expand Down Expand Up @@ -710,6 +734,8 @@ def optimize(
# prepare optimization environment
current_tdi.initialize()
self.top_down_info = current_tdi
if current_tdi.opt_config.patience[2] > 0:
self._patience_budget = current_tdi.opt_config.patience[2]
self.prepare_opt_env()

# load previous optimization logs if exists
Expand Down Expand Up @@ -817,7 +843,8 @@ def update(
f"- {self.name} - Trial {trial.number} result: score= {score:.2f}, cost@1000= {price*1000:.3f}"
)
self.opt_cost += eval_result.total_eval_cost


self._update_best_trial(eval_result)
# update study if any dynamic params can evolve
with self._study_lock:
self.add_constraint(score, trial)
Expand Down
3 changes: 2 additions & 1 deletion cognify/optimizer/core/upper_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ def run_and_prune(self):
key=lambda i: (-outer_indicators[i][0], outer_indicators[i][1]),
)
runs_left_to_run = sorted_indicator_indices[: len(self.ready_to_run) // 2]
# TODO: consider remove early stopped runs from ready list
self.ready_to_run = [self.ready_to_run[i] for i in runs_left_to_run]

def execute(self):
Expand Down Expand Up @@ -268,7 +269,7 @@ def _optimize(self, base_program):
opt_config = self.top_down_info.opt_config
n_iters = opt_config.n_trials // opt_config.throughput
for i in range(n_iters):
if _should_exit():
if _should_exit() or self._should_stop:
break
self._optimize_SH(base_program)
if _should_exit():
Expand Down