From 7492c48fd88529a7f43da5d8dd63b9445ed97c08 Mon Sep 17 00:00:00 2001
From: Bogdan Budescu
Date: Wed, 11 Dec 2024 10:46:26 +0200
Subject: [PATCH] work in progress: keep handle to shared memory until change
 instead of re-opening every time

---
 smac/model/random_forest/random_forest.py | 46 +++++++++++------------
 1 file changed, 21 insertions(+), 25 deletions(-)

diff --git a/smac/model/random_forest/random_forest.py b/smac/model/random_forest/random_forest.py
index b1c6d403f..db789845c 100644
--- a/smac/model/random_forest/random_forest.py
+++ b/smac/model/random_forest/random_forest.py
@@ -5,7 +5,6 @@
 import math
 
 from multiprocessing import Process, Queue, Lock, shared_memory
-from contextlib import contextmanager
 
 import numpy as np
 import numpy.typing as npt
@@ -36,21 +35,13 @@ def dtypes_are_equal(dtype1: np.dtype, dtype2: np.dtype) -> bool:
     return np.issubdtype(dtype2, dtype1) and np.issubdtype(dtype1, dtype2)
 
 
-@contextmanager
-def single_read_shared_mem(name: str):
-    shm = shared_memory.SharedMemory(name)
-    try:
-        yield shm
-    finally:
-        shm.close()
-
-
 class GrowingSharedArrayReaderView:
     basename_X: str = 'X'
     basename_y: str = 'y'
 
     def __init__(self, lock: Lock):
         self.lock = lock
+        self.shm_id: Optional[int] = None
         self.shm_X: Optional[shared_memory.SharedMemory] = None
         self.shm_y: Optional[shared_memory.SharedMemory] = None
 
@@ -83,22 +74,28 @@ def np_view(self, size: int) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.fl
         return X[:size], y[:size]
 
     def get_data(self, shm_id: int, size: int) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]:
-        # TODO: measure perf: currently, we opted for releasing the memory immediately after read, which might make
-        # things a bit slower because we need to create the shared mem object every time even if it wasn't changed.
-        # This has the advantage that it requires less memory when reallocation occurs, and it simplifies memory
-        # management (i.e., I don't know if `unlink` blocks until all handles are `close`d)
-        # [parentheses in 'with' only work starting with python 3.10]
-        with self.lock, single_read_shared_mem(f'{self.basename_X}_{shm_id}') as shm_X, single_read_shared_mem(f'{self.basename_y}_{shm_id}') as shm_y:
-            self.shm_X, self.shm_y = shm_X, shm_y
+        with self.lock:
+            # single_read_shared_mem() as shm_X, single_read_shared_mem(f'{self.basename_y}_{shm_id}') as shm_y:
+            if shm_id != self.shm_id:
+                self.shm_X.close()
+                del self.shm_X
+                self.shm_X = None
+
+                self.shm_y.close()
+                del self.shm_y
+                self.shm_y = None
+
+                self.shm_X = shared_memory.SharedMemory(f'{self.basename_X}_{shm_id}')
+                self.shm_y = shared_memory.SharedMemory(f'{self.basename_y}_{shm_id}')
+
             shared_X, shared_y = self.np_view(size)
             X, y = np.array(shared_X), np.array(shared_y)  # make copies
-            self.shm_X = self.shm_y = None
+
         return X, y
 
 
 class GrowingSharedArray(GrowingSharedArrayReaderView):
     def __init__(self):
-        self.shm_id: int = 0
         self.growth_rate = 1.5
         super().__init__(lock=Lock())
 
@@ -122,6 +119,7 @@ def set_data(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> No
             assert self.shm_X is None
             assert self.shm_y is None
             capacity = size
+            self.shm_id = 0
 
         if self.row_size is not None:
             assert X.shape[1] == self.row_size
@@ -153,8 +151,6 @@ def __init__(self):
         self.model_lock = Lock()
         self.model_queue = Queue(maxsize=1)
 
-        self.X = None
-        self.y = None
         self.opts = None
 
         self.data_queue = Queue(maxsize=1)
@@ -176,7 +172,7 @@ def model(self):
             self._model = model
         return self._model
 
-    def submit_for_training(self, data: DataContainer, opts: ForestOpts):
+    def submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64], opts: ForestOpts):
         # use condition variable to wake up the trainer thread if it's sleeping
         with self.data_cv:
             assert data is not None
@@ -198,6 +194,8 @@ def run(self) -> None:
             # wait for training to finish before receiving a new configuration to try, depending on CPU load; we might
             # have to replace the Event by a Condition
 
+            data = self._init_data_container(X, y)
+
             _rf = regression.binary_rss_forest()
             _rf.options = self.opts
 
@@ -337,14 +335,12 @@ def _train(self, X: np.ndarray, y: np.ndarray) -> RandomForest:
         # self.X = X
         # self.y = y.flatten()
 
-        data = self._init_data_container(X, y)
-
         if self._n_points_per_tree <= 0:
             self._rf_opts.num_data_points_per_tree = X.shape[0]
         else:
             self._rf_opts.num_data_points_per_tree = self._n_points_per_tree
 
-        self._rf.submit_for_training(data, self._rf_opts)
+        self._rf.submit_for_training(X, y, self._rf_opts)
 
         # call this to make sure that there exists a trained model before returning (actually, not sure this is
         # required, since we check within predict() anyway)
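
Note (not part of the commit): below is a minimal, self-contained sketch of the cached-handle read pattern this patch moves toward, assuming the `{basename}_{shm_id}` segment-naming scheme used above. The class name `CachedShmReader` is illustrative only, and the sketch includes the None guard and the `shm_id` bookkeeping that the work-in-progress `get_data` hunk does not add yet.

    from multiprocessing import Lock, shared_memory
    from typing import Optional

    import numpy as np
    import numpy.typing as npt


    class CachedShmReader:
        # Keeps a SharedMemory handle open across reads and reopens it only
        # when the writer publishes a new segment id.
        def __init__(self, basename: str, lock: Lock):
            self.basename = basename
            self.lock = lock
            self.shm_id: Optional[int] = None
            self.shm: Optional[shared_memory.SharedMemory] = None

        def read(self, shm_id: int, size: int) -> npt.NDArray[np.float64]:
            with self.lock:
                if shm_id != self.shm_id:
                    # The writer reallocated: drop the stale handle before attaching to the new segment.
                    if self.shm is not None:
                        self.shm.close()
                        self.shm = None
                    self.shm = shared_memory.SharedMemory(name=f'{self.basename}_{shm_id}')
                    self.shm_id = shm_id
                # View the first `size` float64 entries and copy them out while the lock is held.
                view = np.ndarray((size,), dtype=np.float64, buffer=self.shm.buf)
                return np.array(view)

        def close(self) -> None:
            # Release the cached handle; only the creating (writer) side should unlink() the segment.
            with self.lock:
                if self.shm is not None:
                    self.shm.close()
                    self.shm = None

On the writer side, the counterpart would create each reallocated segment under a new id (shared_memory.SharedMemory(create=True, ...)), bump the id under the same lock, and unlink() the old segment once readers have moved on. Keeping the reader's handle open between reads trades a slightly longer segment lifetime for skipping an open/close pair on every lookup, which is the trade-off described in the TODO comment this patch removes.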