fix(runtime): filelock issues #161

Status: Open. Wants to merge 56 commits into base: master.

Commits (56), all authored by eddiebergman:
a961144  fix: Local variable doesn't exist (Nov 28, 2024)
2482552  optim: Recompute cost usage at sample, not in report (Nov 28, 2024)
9df0351  fix(runtime): Allow multiple retries of creating/loading NePSState. (Nov 28, 2024)
115dc9b  style: Fix pre-commit (Nov 28, 2024)
8b274e6  ux(state): Improve error contents for VersionMistmatchError (Nov 28, 2024)
b3f76bd  doc: Add comment on why we do not retry reporting (Nov 28, 2024)
8c59d04  fix: Template string in error message (Nov 28, 2024)
f4ef73b  fix: Don't reload configurations after checking `pending()` state (Nov 28, 2024)
2b1e69b  doc: Type fix (Nov 28, 2024)
8186289  optim: Reduce IO calls (Nov 28, 2024)
b2c9eac  ux: Improve logging (Nov 28, 2024)
cf8469f  fix: Bump retries on getting next task to 10 (Nov 29, 2024)
801f764  fix: Add time delays before retries (Nov 29, 2024)
2b5fc11  refactor: Fixup name of error (Nov 29, 2024)
906fd4f  fix: Add retry to checking if worker should stop (Nov 29, 2024)
9c88668  optim: Reduce object creation overhead (Nov 29, 2024)
f0f3f64  optim: Remove use of `set`, cheaper string op (Nov 29, 2024)
8bf5051  feat: Allow setting of retries from ENV vars (Nov 29, 2024)
62e130b  doc: Extra logging on issue w.r.t. trial already existing (Nov 29, 2024)
5b54ce5  test: Fixup expectation on output of `get_all_trials()` (Nov 29, 2024)
ee8e90e  fix: Add retry counter to checking if should stop (Nov 29, 2024)
d075b19  doc: Remove spammy lock acquisition debug logs (Nov 29, 2024)
c0b5b77  fix: Better check to not overdo sampling (Nov 29, 2024)
c7592c1  fix: Add timeout for locking `post_run_csv` (Nov 29, 2024)
0ff3d41  fix: Add highly generous timeout for post_run_csv (Nov 29, 2024)
20b492a  fix: Switch to `lockf` for file-locking on linux (Dec 2, 2024)
9d9651d  refactor: Favour larger, longer locks with lockf (Dec 2, 2024)
3a286b3  fix: Use context manager for sample_trial (Dec 2, 2024)
4b873f0  fix: Passing loaded trials to sample (Dec 2, 2024)
a44f551  fix: Passing loaded trials to sample (2) (Dec 2, 2024)
bb18db7  optim: reading/loading of items (Dec 2, 2024)
363c94d  optim: save torch tensors as numpy (Dec 2, 2024)
e0527e4  fix: Sample successful => evaluating (Dec 2, 2024)
dac1f34  fix: grace period for os sync (Dec 2, 2024)
209912d  refactor: Rename to (Dec 9, 2024)
b5a01f5  refactor: Use pickle cache (Dec 9, 2024)
65b2bf9  refactor: Move GRACE usage to worker (Dec 9, 2024)
b146d05  fix: Load in from disk if cache missing (Dec 9, 2024)
56d8acc  debug: Add log file (Dec 9, 2024)
925d912  debug: more... (Dec 9, 2024)
981eb2d  debug: MORE... (Dec 9, 2024)
cc60738  debug: last ditch effort (Dec 9, 2024)
f3d23d1  debug: More info on lock method (Dec 9, 2024)
c440b7e  debug: MOREEEEEEEE (Dec 9, 2024)
129bf32  debug: "...and more" (Dec 9, 2024)
0603b43  debug: "you know the drill" (Dec 9, 2024)
2339ce7  debug: "..." (Dec 9, 2024)
7d52f3d  fix: Delay config dir creation until config written (Dec 9, 2024)
5093b73  fix: Yay, remove debugging statements (Dec 9, 2024)
a51a286  optim: Speed optimizations (Dec 9, 2024)
c6a5003  optim: Bunch of micro-optimizations (Dec 10, 2024)
93c8d31  fix: Remove dummy check (Dec 10, 2024)
a3610bc  optim: More aggresive consolidation of cache (Dec 10, 2024)
7a749b1  feat: Pass in batch size (Dec 11, 2024)
722ef9d  fix: Initial design size (Dec 11, 2024)
f629096  ux: Improve log message for lock timeout (Dec 11, 2024)
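Commits 20b492a and 9d9651d above move Linux file-locking to `lockf` and favour fewer, longer-held locks. As a rough sketch only, not this PR's actual code, a polling, timeout-bounded `lockf` lock in the spirit of those commits could look like this (Unix-only, mirroring the poll/timeout knobs the env.py diff below configures):

```python
# Hypothetical sketch of lockf-style advisory locking, not the PR's implementation.
import fcntl
import time
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def file_lock(path: Path, poll: float = 0.05, timeout: float | None = 120.0) -> Iterator[None]:
    """Hold an exclusive lockf lock on `path`, polling until acquired or timed out."""
    path.touch(exist_ok=True)
    with path.open("a") as fh:
        start = time.monotonic()
        while True:
            try:
                # Non-blocking exclusive lock; raises OSError while held elsewhere.
                fcntl.lockf(fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
                break
            except OSError:
                if timeout is not None and time.monotonic() - start > timeout:
                    raise TimeoutError(f"Could not lock {path} within {timeout}s")
                time.sleep(poll)
        try:
            yield
        finally:
            fcntl.lockf(fh, fcntl.LOCK_UN)
```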
7 changes: 6 additions & 1 deletion neps/api.py
@@ -44,6 +44,7 @@ def run(
loss_value_on_error: None | float = Default(None),
cost_value_on_error: None | float = Default(None),
pre_load_hooks: Iterable | None = Default(None),
sample_batch_size: int | None = Default(None),
searcher: (
Literal[
"default",
@@ -98,6 +99,8 @@ def run(
cost_value_on_error: Setting this and loss_value_on_error to any float will
supress any error and will use given cost value instead. default: None
pre_load_hooks: List of functions that will be called before load_results().
sample_batch_size: The number of samples to ask for in a single call to the
optimizer.
searcher: Which optimizer to use. Can be a string identifier, an
instance of BaseOptimizer, or a Path to a custom optimizer.
**searcher_kwargs: Will be passed to the searcher. This is usually only needed by
@@ -236,6 +239,7 @@ def run(
ignore_errors=settings.ignore_errors,
overwrite_optimization_dir=settings.overwrite_working_directory,
pre_load_hooks=settings.pre_load_hooks,
sample_batch_size=settings.sample_batch_size,
)

if settings.post_run_summary:
@@ -278,7 +282,8 @@ def _run_args(
"mobster",
"asha",
]
- | BaseOptimizer | dict
+ | BaseOptimizer
+ | dict
) = "default",
**searcher_kwargs,
) -> tuple[BaseOptimizer, dict]:
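A hedged usage sketch of the new `sample_batch_size` argument; the pipeline function and search-space spelling below are placeholders, not defined by this diff:

```python
import neps


def run_pipeline(lr: float) -> float:
    # Hypothetical objective: pretend distance from 0.01 is the loss.
    return (lr - 0.01) ** 2


neps.run(
    run_pipeline=run_pipeline,  # assumed argument name, not shown in this diff
    pipeline_space={"lr": neps.FloatParameter(lower=1e-4, upper=1e-1, log=True)},  # assumed API
    root_directory="results/batch_example",
    max_evaluations_total=50,
    sample_batch_size=4,  # new in this PR: ask the optimizer for 4 configs per call
)
```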
83 changes: 58 additions & 25 deletions neps/env.py
@@ -4,7 +4,7 @@

import os
from collections.abc import Callable
- from typing import Any, TypeVar
+ from typing import Any, Literal, TypeVar

T = TypeVar("T")
V = TypeVar("V")
@@ -28,6 +28,38 @@ def is_nullable(e: str) -> bool:
return e.lower() in ("none", "n", "null")


def yaml_or_json(e: str) -> Literal["yaml", "json"]:
"""Check if an environment variable is either yaml or json."""
if e.lower() in ("yaml", "json"):
return e.lower() # type: ignore
raise ValueError(f"Expected 'yaml' or 'json', got '{e}'.")


LINUX_FILELOCK_FUNCTION = get_env(
"NEPS_LINUX_FILELOCK_FUNCTION",
parse=str,
default="lockf",
)
MAX_RETRIES_GET_NEXT_TRIAL = get_env(
"NEPS_MAX_RETRIES_GET_NEXT_TRIAL",
parse=int,
default=10,
)
MAX_RETRIES_SET_EVALUATING = get_env(
"NEPS_MAX_RETRIES_SET_EVALUATING",
parse=int,
default=10,
)
MAX_RETRIES_CREATE_LOAD_STATE = get_env(
"NEPS_MAX_RETRIES_CREATE_LOAD_STATE",
parse=int,
default=10,
)
MAX_RETRIES_WORKER_CHECK_SHOULD_STOP = get_env(
"NEPS_MAX_RETRIES_WORKER_CHECK_SHOULD_STOP",
parse=int,
default=3,
)
TRIAL_FILELOCK_POLL = get_env(
"NEPS_TRIAL_FILELOCK_POLL",
parse=float,
@@ -38,40 +70,31 @@ def is_nullable(e: str) -> bool:
parse=lambda e: None if is_nullable(e) else float(e),
default=120,
)

- SEED_SNAPSHOT_FILELOCK_POLL = get_env(
- "NEPS_SEED_SNAPSHOT_FILELOCK_POLL",
+ FS_SYNC_GRACE_BASE = get_env(
+ "NEPS_FS_SYNC_GRACE_BASE",
parse=float,
- default=0.05,
+ default=0.00, # Keep it low initially to not punish synced os
)
- SEED_SNAPSHOT_FILELOCK_TIMEOUT = get_env(
- "NEPS_SEED_SNAPSHOT_FILELOCK_TIMEOUT",
- parse=lambda e: None if is_nullable(e) else float(e),
- default=120,
- )
-
- OPTIMIZER_INFO_FILELOCK_POLL = get_env(
- "NEPS_OPTIMIZER_INFO_FILELOCK_POLL",
+ FS_SYNC_GRACE_INC = get_env(
+ "NEPS_FS_SYNC_GRACE_INC",
parse=float,
- default=0.05,
- )
- OPTIMIZER_INFO_FILELOCK_TIMEOUT = get_env(
- "NEPS_OPTIMIZER_INFO_FILELOCK_TIMEOUT",
- parse=lambda e: None if is_nullable(e) else float(e),
- default=120,
+ default=0.1,
)

- OPTIMIZER_STATE_FILELOCK_POLL = get_env(
- "NEPS_OPTIMIZER_STATE_FILELOCK_POLL",
+ # NOTE: We want this to be greater than the trials filelock, so that
+ # anything requesting to just update the trials is more likely to obtain it
+ # as those operations tend to be faster than something that requires optimizer
+ # state.
+ STATE_FILELOCK_POLL = get_env(
+ "NEPS_STATE_FILELOCK_POLL",
parse=float,
- default=0.05,
+ default=0.20,
)
- OPTIMIZER_STATE_FILELOCK_TIMEOUT = get_env(
- "NEPS_OPTIMIZER_STATE_FILELOCK_TIMEOUT",
+ STATE_FILELOCK_TIMEOUT = get_env(
+ "NEPS_STATE_FILELOCK_TIMEOUT",
parse=lambda e: None if is_nullable(e) else float(e),
default=120,
)

GLOBAL_ERR_FILELOCK_POLL = get_env(
"NEPS_GLOBAL_ERR_FILELOCK_POLL",
parse=float,
@@ -82,3 +105,13 @@
parse=lambda e: None if is_nullable(e) else float(e),
default=120,
)
TRIAL_CACHE_MAX_UPDATES_BEFORE_CONSOLIDATION = get_env(
"NEPS_TRIAL_CACHE_MAX_UPDATES_BEFORE_CONSOLIDATION",
parse=int,
default=10,
)
CONFIG_SERIALIZE_FORMAT: Literal["yaml", "json"] = get_env( # type: ignore
"NEPS_CONFIG_SERIALIZE_FORMAT",
parse=yaml_or_json,
default="yaml",
)
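All of these knobs share the `get_env(name, parse, default)` pattern. A minimal sketch of such a helper, assuming values are read once at module import (the real helper in this file may differ in details):

```python
import os
from collections.abc import Callable
from typing import TypeVar

T = TypeVar("T")


def get_env(name: str, parse: Callable[[str], T], default: T) -> T:
    """Return the parsed env var if set, otherwise the default."""
    raw = os.environ.get(name)
    return default if raw is None else parse(raw)


# Usage: because these module-level values are read at import time, export
# overrides before importing neps, e.g.
#   NEPS_MAX_RETRIES_GET_NEXT_TRIAL=20 python my_run.py
```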
33 changes: 11 additions & 22 deletions neps/exceptions.py
@@ -2,6 +2,8 @@

from __future__ import annotations

from typing import Any


class NePSError(Exception):
"""Base class for all NePS exceptions.
@@ -11,35 +13,22 @@ class NePSError:
"""


- class VersionMismatchError(NePSError):
- """Raised when the version of a resource does not match the expected version."""
-
-
- class VersionedResourceAlreadyExistsError(NePSError):
- """Raised when a version already exists when trying to create a new versioned
- data.
- """
-
-
- class VersionedResourceRemovedError(NePSError):
- """Raised when a version already exists when trying to create a new versioned
- data.
- """
-
-
- class VersionedResourceDoesNotExistsError(NePSError):
- """Raised when a versioned resource does not exist at a location."""
-
-
class LockFailedError(NePSError):
"""Raised when a lock cannot be acquired."""


- class TrialAlreadyExistsError(VersionedResourceAlreadyExistsError):
+ class TrialAlreadyExistsError(NePSError):
"""Raised when a trial already exists in the store."""

+ def __init__(self, trial_id: str, *args: Any) -> None:
+ super().__init__(trial_id, *args)
+ self.trial_id = trial_id
+
+ def __str__(self) -> str:
+ return f"Trial with id {self.trial_id} already exists!"


- class TrialNotFoundError(VersionedResourceDoesNotExistsError):
+ class TrialNotFoundError(NePSError):
"""Raised when a trial already exists in the store."""


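Since `TrialAlreadyExistsError` now carries `trial_id`, callers can branch on the colliding id. A hedged sketch of the retry pattern this enables; `store.put_new` and `store.resample` are hypothetical stand-ins, not NePS API:

```python
from neps.exceptions import TrialAlreadyExistsError


def save_with_retry(store, trial, retries: int = 10):
    """Retry saving a trial when another worker raced us to its id."""
    for _ in range(retries):
        try:
            store.put_new(trial)  # hypothetical store call
            return trial
        except TrialAlreadyExistsError as err:
            # Another worker claimed this id first; log it and draw a fresh trial.
            print(f"trial {err.trial_id} already taken, resampling")
            trial = store.resample(trial)  # hypothetical store call
    raise RuntimeError(f"could not save a trial after {retries} attempts")
```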
21 changes: 19 additions & 2 deletions neps/optimizers/base_optimizer.py
@@ -4,7 +4,7 @@
from abc import abstractmethod
from collections.abc import Mapping
from dataclasses import dataclass
- from typing import TYPE_CHECKING, Any
+ from typing import TYPE_CHECKING, Any, overload

from neps.state.trial import Report, Trial

@@ -106,12 +106,29 @@ def __init__(
self.learning_curve_on_error = learning_curve_on_error
self.ignore_errors = ignore_errors

@overload
def ask(
self,
trials: Mapping[str, Trial],
budget_info: BudgetInfo | None,
n: int,
) -> list[SampledConfig]: ...

@overload
def ask(
self,
trials: Mapping[str, Trial],
budget_info: BudgetInfo | None,
n: None = None,
) -> SampledConfig: ...

@abstractmethod
def ask(
self,
trials: Mapping[str, Trial],
budget_info: BudgetInfo | None,
- ) -> SampledConfig:
+ n: int | None = None,
+ ) -> SampledConfig | list[SampledConfig]:
"""Sample a new configuration.

Args:
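The `@overload` stubs encode the calling convention for type checkers: calling `ask` without `n` returns a single `SampledConfig`, while passing an integer returns a list. A small illustration, with `trials` and `budget` as placeholders:

```python
from neps.optimizers.base_optimizer import BaseOptimizer  # module shown in this diff


def demo(opt: BaseOptimizer, trials, budget) -> None:
    one = opt.ask(trials, budget)        # n omitted: a single SampledConfig
    few = opt.ask(trials, budget, n=5)   # n given: a list of five SampledConfig
    assert not isinstance(one, list)
    assert isinstance(few, list) and len(few) == 5
```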
74 changes: 57 additions & 17 deletions neps/optimizers/bayesian_optimization/optimizer.py
@@ -127,35 +127,64 @@ def __init__(
self.cost_on_log_scale = cost_on_log_scale
self.device = device
self.sample_default_first = sample_default_first
- self.n_initial_design = initial_design_size
- self.init_design: list[dict[str, Any]] | None = None

+ if initial_design_size is not None:
+ self.n_initial_design = initial_design_size
+ else:
+ self.n_initial_design = len(pipeline_space.numerical) + len(
+ pipeline_space.categoricals
+ )

@override
def ask(
self,
trials: Mapping[str, Trial],
budget_info: BudgetInfo | None = None,
- ) -> SampledConfig:
+ n: int | None = None,
+ ) -> SampledConfig | list[SampledConfig]:
+ _n = 1 if n is None else n
n_sampled = len(trials)
- config_id = str(n_sampled + 1)
+ config_ids = iter(str(i + 1) for i in range(n_sampled, n_sampled + _n))
space = self.pipeline_space

- # If we havn't passed the intial design phase
- if self.init_design is None:
- self.init_design = make_initial_design(
+ sampled_configs: list[SampledConfig] = []
+
+ # If the amount of configs evaluated is less than the initial design
+ # requirement, keep drawing from initial design
+ n_evaluated = sum(
+ 1
+ for trial in trials.values()
+ if trial.report is not None and trial.report.loss is not None
+ )
+ if n_evaluated < self.n_initial_design:
+ design_samples = make_initial_design(
space=space,
encoder=self.encoder,
- sample_default_first=self.sample_default_first,
- sampler=self.prior if self.prior is not None else "sobol",
- seed=None, # TODO: Seeding
- sample_size=(
- "ndim" if self.n_initial_design is None else self.n_initial_design
+ sample_default_first=(
+ self.sample_default_first if n_sampled == 0 else False
),
+ sampler=self.prior if self.prior is not None else "uniform",
+ seed=None, # TODO: Seeding
+ sample_size=_n,
sample_fidelity="max",
)

- if n_sampled < len(self.init_design):
- return SampledConfig(id=config_id, config=self.init_design[n_sampled])
+ sampled_configs.extend(
+ [
+ SampledConfig(id=config_id, config=config)
+ for config_id, config in zip(
+ config_ids,
+ design_samples,
+ strict=False,
+ )
+ ]
+ )
+ if len(sampled_configs) == _n:
+ if n is None:
+ return sampled_configs[0]
+
+ return sampled_configs

# Otherwise, we encode trials and setup to fit and acquire from a GP
data, encoder = encode_trials_for_gp(
@@ -179,13 +208,13 @@ def ask(
prior = None
if self.prior:
pibo_exp_term = _pibo_exp_term(
- n_sampled, encoder.ncols, len(self.init_design)
+ n_sampled, encoder.ncols, self.n_initial_design
)
# If the exp term is insignificant, skip prior acq. weighting
prior = None if pibo_exp_term < 1e-4 else self.prior

gp = make_default_single_obj_gp(x=data.x, y=data.y, encoder=encoder)
- candidate = fit_and_acquire_from_gp(
+ candidates = fit_and_acquire_from_gp(
gp=gp,
x_train=data.x,
encoder=encoder,
@@ -200,11 +229,22 @@ def ask(
prune_baseline=True,
),
prior=prior,
n_candidates_required=_n,
pibo_exp_term=pibo_exp_term,
costs=data.cost if self.use_cost else None,
cost_percentage_used=cost_percent,
costs_on_log_scale=self.cost_on_log_scale,
)

- config = encoder.decode(candidate)[0]
- return SampledConfig(id=config_id, config=config)
+ configs = encoder.decode(candidates)
+ sampled_configs.extend(
+ [
+ SampledConfig(id=config_id, config=config)
+ for config_id, config in zip(config_ids, configs, strict=True)
+ ]
+ )
+
+ if n is None:
+ return sampled_configs[0]
+
+ return sampled_configs
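Note that config ids come from one shared iterator, so ids consumed by the initial-design branch are never reused by the GP branch. The pattern in isolation, with illustrative values:

```python
n_sampled, _n = 7, 3  # 7 trials already exist, 3 new configs requested
config_ids = iter(str(i + 1) for i in range(n_sampled, n_sampled + _n))

first = next(config_ids)  # '8', e.g. taken by the initial-design branch
rest = list(config_ids)   # ['9', '10'], left over for the GP branch
print(first, rest)
```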
7 changes: 1 addition & 6 deletions neps/optimizers/initial_design.py
@@ -100,12 +100,7 @@ def make_initial_design(  # noqa: PLR0912, C901
)

if sample_default_first:
- # TODO: No way to pass a seed to the sampler
- default = {
- name: hp.default if hp.default is not None else hp.sample_value()
- for name, hp in space.hyperparameters.items()
- }
- configs.append({**default, **fids()})
+ configs.append({**space.default_config, **fids()})

ndims = len(space.numerical) + len(space.categoricals)
if sample_size == "ndim":
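The removed inline dict is replaced by `space.default_config`. That property is not part of this diff; a hypothetical implementation consistent with the deleted code might be:

```python
from typing import Any


class SearchSpace:  # sketch of only the relevant part, names assumed
    hyperparameters: dict[str, Any]

    @property
    def default_config(self) -> dict[str, Any]:
        # Prefer a hyperparameter's declared default, otherwise sample one,
        # mirroring the dict comprehension this PR removed.
        return {
            name: hp.default if hp.default is not None else hp.sample_value()
            for name, hp in self.hyperparameters.items()
        }
```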