
Commit

add flag to switch off (concurrent) background training entirely (default to old behavior)
Bogdan Budescu committed Dec 13, 2024
1 parent cb76f8c commit 6198d8f
Showing 4 changed files with 125 additions and 65 deletions.
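
In short, the commit replaces RFTrainer's boolean `sync` parameter with `background_training`, which accepts a member of the new `Concurrency` enum or `None`; passing `None` trains in the calling thread with no background process, the old pre-concurrency behavior that `RandomForest` now requests explicitly. A minimal usage sketch, assuming the constructor signature shown in the RFTrainer.py hunks below (the hyperparameter values are illustrative only):

from smac.model.random_forest.multiproc_util.RFTrainer import Concurrency, RFTrainer

bounds = ((0.0, 1.0), (0.0, 1.0))  # one (lower, upper) pair per feature

# train synchronously in the calling thread, i.e., background training switched off
trainer = RFTrainer(bounds, seed=0, n_trees=10, bootstrapping=True, max_features=2,
                    min_samples_split=3, min_samples_leaf=3, max_depth=20,
                    eps_purity=1e-8, max_nodes=2 ** 20, n_points_per_tree=-1,
                    background_training=None)

# alternatively, train in a background process and block until each model is ready:
# trainer = RFTrainer(..., background_training=Concurrency.MULTIPROC_SYNCED)

In every mode, `submit_for_training(X, y)` hands over the data and the fitted forest is read back through the `model` property.
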
120 changes: 66 additions & 54 deletions smac/model/random_forest/multiproc_util/RFTrainer.py
@@ -9,11 +9,21 @@

from numpy import typing as npt
import numpy as np
from pyrfr import regression
from pyrfr.regression import binary_rss_forest as BinaryForest
from pyrfr.regression import (binary_rss_forest as BinaryForest, default_random_engine as DefaultRandomEngine,
forest_opts as ForestOpts)

from .GrowingSharedArray import GrowingSharedArrayReaderView, GrowingSharedArray
from ..util import init_data_container
from ..util import get_rf_opts, train

from enum import Enum, auto, unique


@unique
class Concurrency(Enum):
THREADING = auto()
THREADING_SYNCED = auto()
MULTIPROC = auto()
MULTIPROC_SYNCED = auto()


SHUTDOWN = None
@@ -27,10 +37,9 @@ def debug_print(*args, file=sys.stdout, **kwargs):
print(*args, **kwargs, flush=True, file=file)
file.flush()


# TODO: the type of the value passed for the 'bounds' param below is a tuple of tuples. Might this add some memory
# dependency between the processes which might mess up the cleanup process?


def rf_training_loop(
model_queue: Queue, data_queue: Queue, data_lock: Lock,
# init rf train
@@ -39,21 +48,11 @@ def rf_training_loop(
n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int,
max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int
) -> None:
rf_opts = regression.forest_opts()
rf_opts.num_trees = n_trees
rf_opts.do_bootstrapping = bootstrapping
rf_opts.tree_opts.max_features = max_features
rf_opts.tree_opts.min_samples_to_split = min_samples_split
rf_opts.tree_opts.min_samples_in_leaf = min_samples_leaf
rf_opts.tree_opts.max_depth = max_depth
rf_opts.tree_opts.epsilon_purity = eps_purity
rf_opts.tree_opts.max_num_nodes = max_nodes
rf_opts.compute_law_of_total_variance = False
if n_points_per_tree > 0:
rf_opts.num_data_points_per_tree = n_points_per_tree

# Case to `int` incase we get an `np.integer` type
rng = regression.default_random_engine(int(seed))
rf_opts = get_rf_opts(n_trees, bootstrapping, max_features, min_samples_split, min_samples_leaf, max_depth,
eps_purity, max_nodes, n_points_per_tree)

# Cast to `int` in case we get an `np.integer` type
rng = DefaultRandomEngine(int(seed))
shared_arrs = GrowingSharedArrayReaderView(data_lock)

def send_to_optimization_loop_process(msg: Union[BinaryForest, type(SHUTDOWN)]):
@@ -98,16 +97,9 @@ def send_to_optimization_loop_process(msg: Union[BinaryForest, type(SHUTDOWN)]):
# when shm_id changes, we should notify the main thread that it can unlink the shared memory, because we have
# already called close() on it
# UPDATE: we avoided the warnings by disabling tracking for shared memory
data = init_data_container(X, y, bounds)

if n_points_per_tree <= 0:
rf_opts.num_data_points_per_tree = len(X)
rf = train(rng, rf_opts, n_points_per_tree, bounds, X, y)

rf = BinaryForest()
rf.options = rf_opts
debug_print(f'TRAINER STARTS TRAINING', file=sys.stderr)
rf.fit(data, rng)
debug_print(f'TRAINER FINISHED TRAINING', file=sys.stderr)
send_to_optimization_loop_process(rf)
debug_print(f'TRAINER BYE BYE', file=sys.stderr)

@@ -120,38 +112,52 @@ def __init__(self,
n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int,
max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int,
# process synchronization
sync: bool = False) -> None:
self.sync = sync
background_training: Optional[Concurrency] = Concurrency.MULTIPROC) -> None:
self.background_training = background_training

self._model: Optional[BinaryForest] = None
self.shared_arrs: Optional[GrowingSharedArray] = None
self.model_queue: Optional[Queue] = None
self.data_queue: Optional[Queue] = None
self.training_loop_proc: Optional[Process] = None

self.open(bounds, seed, n_trees, bootstrapping, max_features, min_samples_split, min_samples_leaf, max_depth,
eps_purity, max_nodes, n_points_per_tree)
# in case we disable training in the background, we need these objects in the main thread
self.opts: ForestOpts = get_rf_opts(n_trees, bootstrapping, max_features, min_samples_split, min_samples_leaf,
max_depth, eps_purity, max_nodes, n_points_per_tree)
self.n_points_per_tree: int = n_points_per_tree
self.bounds = tuple(bounds)

# this is NOT used when training in the background
# Cast to `int` in case we get an `np.integer` type
self.rng = DefaultRandomEngine(int(seed))

self.open(seed)

super().__init__()

def open(self,
# init rf train
bounds: Iterable[tuple[float, float]], seed: int,
# rf opts
n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int,
max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int) -> None:
self.shared_arrs = GrowingSharedArray()
self.model_queue = Queue(maxsize=1)
self.data_queue = Queue(maxsize=1)
self.training_loop_proc = Process(
target=rf_training_loop,
daemon=True,
# name='rf_trainer',
args=(self.model_queue, self.data_queue, self.shared_arrs.lock, tuple(bounds), seed, n_trees, bootstrapping,
max_features, min_samples_split, min_samples_leaf, max_depth, eps_purity, max_nodes,
n_points_per_tree)
)
self.training_loop_proc.start()
def open(self, seed: int) -> None:
assert self.background_training is None or self.background_training in Concurrency
if self.background_training is None:
pass
elif self.background_training is Concurrency.THREADING:
raise NotImplementedError
elif self.background_training is Concurrency.THREADING_SYNCED:
raise NotImplementedError
else:
self.shared_arrs = GrowingSharedArray()
self.model_queue = Queue(maxsize=1)
self.data_queue = Queue(maxsize=1)
self.training_loop_proc = Process(
target=rf_training_loop,
daemon=True,
name='rf_trainer',
args=(self.model_queue, self.data_queue, self.shared_arrs.lock, self.bounds, seed, self.opts.num_trees,
self.opts.do_bootstrapping, self.opts.tree_opts.max_features,
self.opts.tree_opts.min_samples_to_split, self.opts.tree_opts.min_samples_in_leaf,
self.opts.tree_opts.max_depth, self.opts.tree_opts.epsilon_purity,
self.opts.tree_opts.max_num_nodes, self.n_points_per_tree)
)
self.training_loop_proc.start()

def close(self):
# send kill signal to training process
@@ -227,6 +233,7 @@ def model(self) -> BinaryForest:
raise RuntimeError("the shutdown message wasn't supposed to end up here")
else:
self._model = msg

return self._model

def send_to_training_loop_proc(self, data_info: Union[tuple[int, int], type[SHUTDOWN]]):
@@ -244,7 +251,12 @@ def submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]):
self.data_queue.put(data_info)

def submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]):
self.shared_arrs.set_data(X, y)
self.send_to_training_loop_proc((self.shared_arrs.shm_id, len(X)))
if self.sync:
self._model = self.model_queue.get()
if self.background_training is None:
self._model = train(self.rng, self.opts, self.n_points_per_tree, self.bounds, X, y)
else:
if self.background_training in (Concurrency.THREADING, Concurrency.THREADING_SYNCED):
raise NotImplementedError
self.shared_arrs.set_data(X, y)
self.send_to_training_loop_proc((self.shared_arrs.shm_id, len(X)))
if self.background_training is Concurrency.MULTIPROC_SYNCED:
self._model = self.model_queue.get()
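
The control flow above leans on a handshake that is easy to miss in the diff: the parent process writes `X` and `y` into shared memory, sends only `(shm_id, len(X))` through `data_queue`, and the training loop hands the fitted forest back through `model_queue`, with `SHUTDOWN = None` as the sentinel that ends the loop. A self-contained sketch of that pattern (illustrative only; a trivial stand-in replaces the real shared-memory transfer and `rf.fit`):

from multiprocessing import Process, Queue

SHUTDOWN = None  # sentinel, mirroring RFTrainer.py


def training_loop(data_queue: Queue, model_queue: Queue) -> None:
    while True:
        msg = data_queue.get()  # blocks until data info (or SHUTDOWN) arrives
        if msg is SHUTDOWN:
            break
        model = sum(msg)  # trivial stand-in for the expensive rf.fit(...)
        model_queue.put(model)  # hand the trained "model" back to the optimizer


if __name__ == '__main__':
    data_queue, model_queue = Queue(maxsize=1), Queue(maxsize=1)
    proc = Process(target=training_loop, args=(data_queue, model_queue), daemon=True)
    proc.start()
    data_queue.put((1, 2, 3))  # analogous to sending (shm_id, len(X))
    print(model_queue.get())  # the MULTIPROC_SYNCED case: block until trained
    data_queue.put(SHUTDOWN)  # ask the trainer to exit, as close() does
    proc.join()
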
7 changes: 4 additions & 3 deletions smac/model/random_forest/random_forest.py
@@ -4,7 +4,6 @@

import numpy as np
from ConfigSpace import ConfigurationSpace
from pyrfr import regression

from smac.constants import N_TREES, VERY_SMALL_NUMBER
from . import AbstractRandomForest
@@ -76,10 +75,12 @@ def __init__(
max_features = 0 if ratio_features > 1.0 else max(1, int(len(self._types) * ratio_features))

self._rf_trainer = RFTrainer(self._bounds, seed, n_trees, bootstrapping, max_features, min_samples_split,
min_samples_leaf, max_depth, eps_purity, max_nodes, n_points_per_tree)
min_samples_leaf, max_depth, eps_purity, max_nodes, n_points_per_tree,
background_training=None)
self._log_y = log_y

self._rng = regression.default_random_engine(int(seed))
# this is NOT used when training in the background
self._rng = self._rf_trainer.rng

self._n_trees = n_trees
self._n_points_per_tree = n_points_per_tree
45 changes: 39 additions & 6 deletions smac/model/random_forest/util.py
@@ -6,16 +6,33 @@
from typing import Iterable, TYPE_CHECKING

import numpy as np
from pyrfr import regression
from pyrfr.regression import (default_data_container as DataContainer, forest_opts as ForestOpts,
binary_rss_forest as BinaryForest, default_random_engine as DefaultRandomEngine)

if TYPE_CHECKING:
from pyrfr.regression import default_data_container as DataContainer
from numpy import typing as npt


def init_data_container(
X: npt.NDArray[np.float64], y: npt.NDArray[np.float64], bounds: Iterable[tuple[float, float]]
) -> DataContainer:
def get_rf_opts(n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int,
max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int) -> ForestOpts:
rf_opts = ForestOpts()
rf_opts.num_trees = n_trees
rf_opts.do_bootstrapping = bootstrapping
rf_opts.tree_opts.max_features = max_features
rf_opts.tree_opts.min_samples_to_split = min_samples_split
rf_opts.tree_opts.min_samples_in_leaf = min_samples_leaf
rf_opts.tree_opts.max_depth = max_depth
rf_opts.tree_opts.epsilon_purity = eps_purity
rf_opts.tree_opts.max_num_nodes = max_nodes
rf_opts.compute_law_of_total_variance = False
if n_points_per_tree > 0:
rf_opts.num_data_points_per_tree = n_points_per_tree

return rf_opts


def init_data_container(X: npt.NDArray[np.float64], y: npt.NDArray[np.float64],
bounds: Iterable[tuple[float, float]]) -> DataContainer:
"""Fills a pyrfr default data container s.t. the forest knows categoricals and bounds for continous data.
Parameters
@@ -31,7 +48,7 @@ def init_data_container(
The filled data container that pyrfr can interpret.
"""
# Retrieve the types and the bounds from the ConfigSpace
data = regression.default_data_container(X.shape[1])
data = DataContainer(X.shape[1])

for i, (mn, mx) in enumerate(bounds):
if np.isnan(mx):
@@ -43,3 +60,19 @@ def init_data_container(
data.add_data_point(row_X, row_y)

return data


def train(rng: DefaultRandomEngine, rf_opts: ForestOpts, n_points_per_tree: int, bounds: Iterable[tuple[float, float]],
X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> BinaryForest:
data = init_data_container(X, y, bounds)

if n_points_per_tree <= 0:
rf_opts.num_data_points_per_tree = len(X)

rf = BinaryForest()
rf.options = rf_opts

rf.fit(data, rng)

return rf
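
A minimal sketch of driving the two new helpers directly, which is essentially what RFTrainer does in the main thread when `background_training is None` (the toy data and option values are illustrative; `y` is kept one-dimensional so `add_data_point` receives a scalar per row):

import numpy as np
from pyrfr.regression import default_random_engine

from smac.model.random_forest.util import get_rf_opts, train

rng = default_random_engine(42)
opts = get_rf_opts(n_trees=10, bootstrapping=True, max_features=2,
                   min_samples_split=3, min_samples_leaf=3, max_depth=20,
                   eps_purity=1e-8, max_nodes=2 ** 20, n_points_per_tree=-1)

X = np.random.rand(50, 2)  # 50 points, 2 continuous features in [0, 1]
y = np.random.rand(50)     # one scalar target per point
bounds = ((0.0, 1.0), (0.0, 1.0))

# with n_points_per_tree <= 0, train() sets num_data_points_per_tree = len(X)
rf = train(rng, opts, n_points_per_tree=-1, bounds=bounds, X=X, y=y)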

18 changes: 16 additions & 2 deletions tests/test_acquisition/test_functions.py
@@ -36,6 +36,9 @@ def predict_marginalized(self, X):
[np.mean(X, axis=1).reshape((1, -1))] * self.num_targets
).reshape((-1, 1))

def close(self):
pass


class MockModelDual:
def __init__(self, num_targets=1):
@@ -46,6 +49,9 @@ def predict_marginalized(self, X):
[np.mean(X, axis=1).reshape((1, -1))] * self.num_targets
).reshape((-1, 2))

def close(self):
pass


class MockPrior:
def __init__(self, pdf, max_density):
@@ -116,6 +122,9 @@ def predict_marginalized(self, X):
def update_prior(self, hyperparameter_dict):
self._configspace.get_hyperparameters_dict.return_value = hyperparameter_dict

def close(self):
pass


class MockModelRNG(MockModel):
def __init__(self, num_targets=1, seed=0):
@@ -154,17 +163,22 @@ def acquisition_function(model):
# Test AbstractAcquisitionFunction
# --------------------------------------------------------------

class CloseableString(str):
def close(self):
pass


def test_update_model_and_eta(model, acquisition_function):
model = "abc"
model = CloseableString("abc")
assert acquisition_function._eta is None
acquisition_function.update(model=model, eta=0.1)
assert acquisition_function.model == model
assert acquisition_function._eta == 0.1


def test_update_with_kwargs(acquisition_function):
acquisition_function.update(model="abc", eta=0.0, other="hi there:)")
model = CloseableString("abc")
acquisition_function.update(model=model, eta=0.0, other="hi there:)")
assert acquisition_function.model == "abc"
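
The mocks gain a `close()` method and the plain string is wrapped in `CloseableString`, presumably because the updated code now calls `close()` on the model it holds (the trainer owns a background process that needs explicit cleanup). A `str` subclass keeps equality with "abc" intact while tolerating that call, a sketch of the behavior the tests rely on:

m = CloseableString("abc")
assert m == "abc"  # str equality is preserved by subclassing
m.close()          # a bare "abc" would raise AttributeError here
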


