diff --git a/smac/model/random_forest/multiproc_util/RFTrainer.py b/smac/model/random_forest/multiproc_util/RFTrainer.py
index 383e66fb2..41b6b975c 100644
--- a/smac/model/random_forest/multiproc_util/RFTrainer.py
+++ b/smac/model/random_forest/multiproc_util/RFTrainer.py
@@ -9,11 +9,21 @@
 from numpy import typing as npt
 import numpy as np
 
-from pyrfr import regression
-from pyrfr.regression import binary_rss_forest as BinaryForest
+from pyrfr.regression import (binary_rss_forest as BinaryForest, default_random_engine as DefaultRandomEngine,
+                              forest_opts as ForestOpts)
 
 from .GrowingSharedArray import GrowingSharedArrayReaderView, GrowingSharedArray
-from ..util import init_data_container
+from ..util import get_rf_opts, train
+
+from enum import Enum, auto, unique
+
+
+@unique
+class Concurrency(Enum):
+    THREADING = auto()
+    THREADING_SYNCED = auto()
+    MULTIPROC = auto()
+    MULTIPROC_SYNCED = auto()
 
 
 SHUTDOWN = None
@@ -27,10 +37,9 @@ def debug_print(*args, file=sys.stdout, **kwargs):
     print(*args, **kwargs, flush=True, file=file)
     file.flush()
 
+
 # TODO: the type of the value passed for the 'bounds' param below is a tuple of tuples. Might this add some memory
 # dependency between the processes which might mess up the cleanup process?
-
-
 def rf_training_loop(
         model_queue: Queue, data_queue: Queue, data_lock: Lock,
         # init rf train
@@ -39,21 +48,11 @@ def rf_training_loop(
         n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int,
         max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int
 ) -> None:
-    rf_opts = regression.forest_opts()
-    rf_opts.num_trees = n_trees
-    rf_opts.do_bootstrapping = bootstrapping
-    rf_opts.tree_opts.max_features = max_features
-    rf_opts.tree_opts.min_samples_to_split = min_samples_split
-    rf_opts.tree_opts.min_samples_in_leaf = min_samples_leaf
-    rf_opts.tree_opts.max_depth = max_depth
-    rf_opts.tree_opts.epsilon_purity = eps_purity
-    rf_opts.tree_opts.max_num_nodes = max_nodes
-    rf_opts.compute_law_of_total_variance = False
-    if n_points_per_tree > 0:
-        rf_opts.num_data_points_per_tree = n_points_per_tree
-
-    # Case to `int` incase we get an `np.integer` type
-    rng = regression.default_random_engine(int(seed))
+    rf_opts = get_rf_opts(n_trees, bootstrapping, max_features, min_samples_split, min_samples_leaf, max_depth,
+                          eps_purity, max_nodes, n_points_per_tree)
+
+    # Cast to `int` in case we get an `np.integer` type
+    rng = DefaultRandomEngine(int(seed))
 
     shared_arrs = GrowingSharedArrayReaderView(data_lock)
 
     def send_to_optimization_loop_process(msg: Union[BinaryForest, type(SHUTDOWN)]):
@@ -98,16 +97,9 @@ def send_to_optimization_loop_process(msg: Union[BinaryForest, type(SHUTDOWN)]):
             # when shm_id changes, here we should notify main thread it can call unlink the shared memory bc we called
             # close() on it
             # UPDATE: we avoided the warnings by disabling tracking for shared memory
-            data = init_data_container(X, y, bounds)
-            if n_points_per_tree <= 0:
-                rf_opts.num_data_points_per_tree = len(X)
+            rf = train(rng, rf_opts, n_points_per_tree, bounds, X, y)
 
-            rf = BinaryForest()
-            rf.options = rf_opts
-            debug_print(f'TRAINER STARTS TRAINING', file=sys.stderr)
-            rf.fit(data, rng)
-            debug_print(f'TRAINER FINISHED TRAINING', file=sys.stderr)
             send_to_optimization_loop_process(rf)
 
     debug_print(f'TRAINER BYE BYE', file=sys.stderr)
@@ -120,8 +112,8 @@ def __init__(self,
                  n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int,
                  max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int,
                  # process synchronization
-                 sync: bool = False) -> None:
-        self.sync = sync
+                 background_training: Optional[Concurrency] = Concurrency.MULTIPROC) -> None:
+        self.background_training = background_training
 
         self._model: Optional[BinaryForest] = None
         self.shared_arrs: Optional[GrowingSharedArray] = None
@@ -129,29 +121,43 @@ def __init__(self,
         self.data_queue: Optional[Queue] = None
         self.training_loop_proc: Optional[Process] = None
 
-        self.open(bounds, seed, n_trees, bootstrapping, max_features, min_samples_split, min_samples_leaf, max_depth,
-                  eps_purity, max_nodes, n_points_per_tree)
+        # in case we disable training in the background, we need these objects in the main thread
+        self.opts: ForestOpts = get_rf_opts(n_trees, bootstrapping, max_features, min_samples_split, min_samples_leaf,
+                                            max_depth, eps_purity, max_nodes, n_points_per_tree)
+        self.n_points_per_tree: int = n_points_per_tree
+        self.bounds = tuple(bounds)
+
+        # this is NOT used when training in the background
+        # Cast to `int` in case we get an `np.integer` type
+        self.rng = DefaultRandomEngine(int(seed))
+
+        self.open(seed)
 
         super().__init__()
 
-    def open(self,
-             # init rf train
-             bounds: Iterable[tuple[float, float]], seed: int,
-             # rf opts
-             n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int,
-             max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int) -> None:
-        self.shared_arrs = GrowingSharedArray()
-        self.model_queue = Queue(maxsize=1)
-        self.data_queue = Queue(maxsize=1)
-        self.training_loop_proc = Process(
-            target=rf_training_loop,
-            daemon=True,
-            # name='rf_trainer',
-            args=(self.model_queue, self.data_queue, self.shared_arrs.lock, tuple(bounds), seed, n_trees, bootstrapping,
-                  max_features, min_samples_split, min_samples_leaf, max_depth, eps_purity, max_nodes,
-                  n_points_per_tree)
-        )
-        self.training_loop_proc.start()
+    def open(self, seed: int) -> None:
+        assert self.background_training is None or self.background_training in Concurrency
+        if self.background_training is None:
+            pass
+        elif self.background_training is Concurrency.THREADING:
+            raise NotImplementedError
+        elif self.background_training is Concurrency.THREADING_SYNCED:
+            raise NotImplementedError
+        else:
+            self.shared_arrs = GrowingSharedArray()
+            self.model_queue = Queue(maxsize=1)
+            self.data_queue = Queue(maxsize=1)
+            self.training_loop_proc = Process(
+                target=rf_training_loop,
+                daemon=True,
+                name='rf_trainer',
+                args=(self.model_queue, self.data_queue, self.shared_arrs.lock, self.bounds, seed,
+                      self.opts.num_trees, self.opts.do_bootstrapping, self.opts.tree_opts.max_features,
+                      self.opts.tree_opts.min_samples_to_split, self.opts.tree_opts.min_samples_in_leaf,
+                      self.opts.tree_opts.max_depth, self.opts.tree_opts.epsilon_purity,
+                      self.opts.tree_opts.max_num_nodes, self.n_points_per_tree)
+            )
+            self.training_loop_proc.start()
 
     def close(self):
         # send kill signal to training process
@@ -227,6 +233,7 @@ def model(self) -> BinaryForest:
                 raise RuntimeError("the shutdown message wasn't supposed to end up here")
             else:
                 self._model = msg
+        return self._model
 
     def send_to_training_loop_proc(self, data_info: Union[tuple[int, int], type[SHUTDOWN]]):
@@ -244,7 +251,12 @@ def send_to_training_loop_proc(self, data_info: Union[tuple[int, int], type[SHUTDOWN]]):
         self.data_queue.put(data_info)
 
     def submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]):
-        self.shared_arrs.set_data(X, y)
-        self.send_to_training_loop_proc((self.shared_arrs.shm_id, len(X)))
-        if self.sync:
-            self._model = self.model_queue.get()
+        if self.background_training is None:
+            self._model = train(self.rng, self.opts, self.n_points_per_tree, self.bounds, X, y)
+        else:
+            if self.background_training in (Concurrency.THREADING, Concurrency.THREADING_SYNCED):
+                raise NotImplementedError
+            self.shared_arrs.set_data(X, y)
+            self.send_to_training_loop_proc((self.shared_arrs.shm_id, len(X)))
+            if self.background_training is Concurrency.MULTIPROC_SYNCED:
+                self._model = self.model_queue.get()
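A minimal usage sketch of the reworked trainer API above; the hyperparameter values are placeholders, and of the new modes only the multiprocessing ones are functional in this patch (the threading variants still raise NotImplementedError):

```python
import numpy as np

from smac.model.random_forest.multiproc_util.RFTrainer import RFTrainer, Concurrency

# toy data: 100 points, 2 continuous features with (lower, upper) bounds
bounds = ((0.0, 1.0), (0.0, 1.0))
X = np.random.rand(100, 2)
y = np.random.rand(100)

# MULTIPROC_SYNCED trains in a background process but blocks until the forest is
# back; background_training=None would instead train right here in the caller
trainer = RFTrainer(bounds, 42,
                    n_trees=10, bootstrapping=True, max_features=2, min_samples_split=3,
                    min_samples_leaf=3, max_depth=2**20, eps_purity=1e-8, max_nodes=2**20,
                    n_points_per_tree=-1,
                    background_training=Concurrency.MULTIPROC_SYNCED)
trainer.submit_for_training(X, y)
forest = trainer.model  # the freshly trained pyrfr binary_rss_forest
trainer.close()         # shut down the worker process
```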
diff --git a/smac/model/random_forest/random_forest.py b/smac/model/random_forest/random_forest.py
index 4049ffd1a..9afcda9b5 100644
--- a/smac/model/random_forest/random_forest.py
+++ b/smac/model/random_forest/random_forest.py
@@ -4,7 +4,6 @@
 import numpy as np
 from ConfigSpace import ConfigurationSpace
-from pyrfr import regression
 
 from smac.constants import N_TREES, VERY_SMALL_NUMBER
 
 from . import AbstractRandomForest
@@ -76,10 +75,12 @@ def __init__(
         max_features = 0 if ratio_features > 1.0 else max(1, int(len(self._types) * ratio_features))
 
         self._rf_trainer = RFTrainer(self._bounds, seed, n_trees, bootstrapping, max_features, min_samples_split,
-                                     min_samples_leaf, max_depth, eps_purity, max_nodes, n_points_per_tree)
+                                     min_samples_leaf, max_depth, eps_purity, max_nodes, n_points_per_tree,
+                                     background_training=None)
         self._log_y = log_y
 
-        self._rng = regression.default_random_engine(int(seed))
+        # this is NOT used when training in the background
+        self._rng = self._rf_trainer.rng
 
         self._n_trees = n_trees
         self._n_points_per_tree = n_points_per_tree
diff --git a/smac/model/random_forest/util.py b/smac/model/random_forest/util.py
index 973445a3b..d520eb1d3 100644
--- a/smac/model/random_forest/util.py
+++ b/smac/model/random_forest/util.py
@@ -6,16 +6,33 @@
 from typing import Iterable, TYPE_CHECKING
 
 import numpy as np
-from pyrfr import regression
+from pyrfr.regression import (default_data_container as DataContainer, forest_opts as ForestOpts,
+                              binary_rss_forest as BinaryForest, default_random_engine as DefaultRandomEngine)
 
 if TYPE_CHECKING:
-    from pyrfr.regression import default_data_container as DataContainer
     from numpy import typing as npt
 
 
-def init_data_container(
-    X: npt.NDArray[np.float64], y: npt.NDArray[np.float64], bounds: Iterable[tuple[float, float]]
-) -> DataContainer:
+def get_rf_opts(n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int,
+                max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int) -> ForestOpts:
+    rf_opts = ForestOpts()
+    rf_opts.num_trees = n_trees
+    rf_opts.do_bootstrapping = bootstrapping
+    rf_opts.tree_opts.max_features = max_features
+    rf_opts.tree_opts.min_samples_to_split = min_samples_split
+    rf_opts.tree_opts.min_samples_in_leaf = min_samples_leaf
+    rf_opts.tree_opts.max_depth = max_depth
+    rf_opts.tree_opts.epsilon_purity = eps_purity
+    rf_opts.tree_opts.max_num_nodes = max_nodes
+    rf_opts.compute_law_of_total_variance = False
+    if n_points_per_tree > 0:
+        rf_opts.num_data_points_per_tree = n_points_per_tree
+
+    return rf_opts
+
+
+def init_data_container(X: npt.NDArray[np.float64], y: npt.NDArray[np.float64],
+                        bounds: Iterable[tuple[float, float]]) -> DataContainer:
     """Fills a pyrfr default data container s.t. the forest knows categoricals and bounds for continous data.
 
     Parameters
@@ -31,7 +48,7 @@
         The filled data container that pyrfr can interpret.
     """
     # Retrieve the types and the bounds from the ConfigSpace
-    data = regression.default_data_container(X.shape[1])
+    data = DataContainer(X.shape[1])
 
     for i, (mn, mx) in enumerate(bounds):
         if np.isnan(mx):
@@ -43,3 +60,19 @@
         data.add_data_point(row_X, row_y)
 
     return data
+
+
+def train(rng: DefaultRandomEngine, rf_opts: ForestOpts, n_points_per_tree: int, bounds: Iterable[tuple[float, float]],
+          X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> BinaryForest:
+    data = init_data_container(X, y, bounds)
+
+    if n_points_per_tree <= 0:
+        rf_opts.num_data_points_per_tree = len(X)
+
+    rf = BinaryForest()
+    rf.options = rf_opts
+
+    rf.fit(data, rng)
+
+    return rf
+
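A quick foreground-training sketch of the two helpers factored out above; the data and option values are made up for illustration, and the final query uses pyrfr's standard predict_mean_var:

```python
import numpy as np
from pyrfr.regression import default_random_engine

from smac.model.random_forest.util import get_rf_opts, train

rng = default_random_engine(0)
opts = get_rf_opts(n_trees=10, bootstrapping=True, max_features=2, min_samples_split=3,
                   min_samples_leaf=3, max_depth=2**20, eps_purity=1e-8, max_nodes=2**20,
                   n_points_per_tree=-1)

bounds = ((0.0, 1.0), (0.0, 1.0))  # finite (mn, mx) means continuous; a NaN mx marks a categorical
X = np.random.rand(50, 2)
y = np.random.rand(50)

# n_points_per_tree <= 0, so train() falls back to len(X) points per tree
forest = train(rng, opts, -1, bounds, X, y)
mean, var = forest.predict_mean_var(X[0])
```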
""" # Retrieve the types and the bounds from the ConfigSpace - data = regression.default_data_container(X.shape[1]) + data = DataContainer(X.shape[1]) for i, (mn, mx) in enumerate(bounds): if np.isnan(mx): @@ -43,3 +60,19 @@ def init_data_container( data.add_data_point(row_X, row_y) return data + + +def train(rng: DefaultRandomEngine, rf_opts: ForestOpts, n_points_per_tree: int, bounds: Iterable[tuple[float, float]], + X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> BinaryForest: + data = init_data_container(X, y, bounds) + + if n_points_per_tree <= 0: + rf_opts.num_data_points_per_tree = len(X) + + rf = BinaryForest() + rf.options = rf_opts + + rf.fit(data, rng) + + return rf + diff --git a/tests/test_acquisition/test_functions.py b/tests/test_acquisition/test_functions.py index 53c8b5f3a..f4e093a72 100644 --- a/tests/test_acquisition/test_functions.py +++ b/tests/test_acquisition/test_functions.py @@ -36,6 +36,9 @@ def predict_marginalized(self, X): [np.mean(X, axis=1).reshape((1, -1))] * self.num_targets ).reshape((-1, 1)) + def close(self): + pass + class MockModelDual: def __init__(self, num_targets=1): @@ -46,6 +49,9 @@ def predict_marginalized(self, X): [np.mean(X, axis=1).reshape((1, -1))] * self.num_targets ).reshape((-1, 2)) + def close(self): + pass + class MockPrior: def __init__(self, pdf, max_density): @@ -116,6 +122,9 @@ def predict_marginalized(self, X): def update_prior(self, hyperparameter_dict): self._configspace.get_hyperparameters_dict.return_value = hyperparameter_dict + def close(self): + pass + class MockModelRNG(MockModel): def __init__(self, num_targets=1, seed=0): @@ -154,9 +163,13 @@ def acquisition_function(model): # Test AbstractAcquisitionFunction # -------------------------------------------------------------- +class CloseableString(str): + def close(self): + pass + def test_update_model_and_eta(model, acquisition_function): - model = "abc" + model = CloseableString("abc") assert acquisition_function._eta is None acquisition_function.update(model=model, eta=0.1) assert acquisition_function.model == model @@ -164,7 +177,8 @@ def test_update_model_and_eta(model, acquisition_function): def test_update_with_kwargs(acquisition_function): - acquisition_function.update(model="abc", eta=0.0, other="hi there:)") + model = CloseableString("abc") + acquisition_function.update(model=model, eta=0.0, other="hi there:)") assert acquisition_function.model == "abc"