Commit

work in progress: keep handle to shared memory until change instead of re-opening every time
Bogdan Budescu committed Dec 11, 2024
1 parent 642d8fc commit 7492c48
Showing 1 changed file with 21 additions and 25 deletions.
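For context, a minimal sketch of the handle-caching pattern this commit moves toward. The names (CachedShmReader, get) are illustrative and not part of the committed code; the point is that the reader keeps the last SharedMemory handle open and only re-attaches when the writer signals a reallocation through a new shm_id, instead of opening and closing the segment on every read.

from multiprocessing import shared_memory
from typing import Optional


class CachedShmReader:
    def __init__(self, basename: str):
        self.basename = basename
        self.shm_id: Optional[int] = None   # id of the segment currently held
        self.shm: Optional[shared_memory.SharedMemory] = None

    def get(self, shm_id: int) -> shared_memory.SharedMemory:
        if shm_id != self.shm_id:           # writer reallocated, so the handle is stale
            if self.shm is not None:
                self.shm.close()            # drop the old mapping
            # re-attach to the segment published under the new id
            self.shm = shared_memory.SharedMemory(f'{self.basename}_{shm_id}')
            self.shm_id = shm_id
        return self.shm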
46 changes: 21 additions & 25 deletions smac/model/random_forest/random_forest.py
@@ -5,7 +5,6 @@

import math
from multiprocessing import Process, Queue, Lock, shared_memory
from contextlib import contextmanager

import numpy as np
import numpy.typing as npt
@@ -36,21 +35,13 @@ def dtypes_are_equal(dtype1: np.dtype, dtype2: np.dtype) -> bool:
return np.issubdtype(dtype2, dtype1) and np.issubdtype(dtype1, dtype2)


@contextmanager
def single_read_shared_mem(name: str):
shm = shared_memory.SharedMemory(name)
try:
yield shm
finally:
shm.close()


class GrowingSharedArrayReaderView:
basename_X: str = 'X'
basename_y: str = 'y'

def __init__(self, lock: Lock):
self.lock = lock
self.shm_id: Optional[int] = None
self.shm_X: Optional[shared_memory.SharedMemory] = None
self.shm_y: Optional[shared_memory.SharedMemory] = None

@@ -83,22 +74,28 @@ def np_view(self, size: int) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]:
return X[:size], y[:size]

def get_data(self, shm_id: int, size: int) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]:
# TODO: measure perf: currently, we opted for releasing the memory immediately after read, which might make
# things a bit slower because we need to create the shared mem object every time even if it wasn't changed.
# This has the advantage that it requires less memory when reallocation occurs, and it simplifies memory
# management (i.e., I don't know if `unlink` blocks until all handles are `close`d)
# [parentheses in 'with' only work starting with python 3.10]
with self.lock, single_read_shared_mem(f'{self.basename_X}_{shm_id}') as shm_X, single_read_shared_mem(f'{self.basename_y}_{shm_id}') as shm_y:
self.shm_X, self.shm_y = shm_X, shm_y
with self.lock:
# single_read_shared_mem() as shm_X, single_read_shared_mem(f'{self.basename_y}_{shm_id}') as shm_y:
if shm_id != self.shm_id:
self.shm_X.close()
del self.shm_X
self.shm_X = None

self.shm_y.close()
del self.shm_y
self.shm_y = None

self.shm_X = shared_memory.SharedMemory(f'{self.basename_X}_{shm_id}')
self.shm_y = shared_memory.SharedMemory(f'{self.basename_y}_{shm_id}')

shared_X, shared_y = self.np_view(size)
X, y = np.array(shared_X), np.array(shared_y) # make copies
self.shm_X = self.shm_y = None

return X, y


class GrowingSharedArray(GrowingSharedArrayReaderView):
def __init__(self):
self.shm_id: int = 0
self.growth_rate = 1.5
super().__init__(lock=Lock())

@@ -122,6 +119,7 @@ def set_data(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> None:
assert self.shm_X is None
assert self.shm_y is None
capacity = size
self.shm_id = 0

if self.row_size is not None:
assert X.shape[1] == self.row_size
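The writer side presumably pairs with this by renaming the segments whenever it has to grow them: bumping shm_id gives the new, larger segments fresh names, so readers can detect a stale handle just by comparing ids. A rough sketch under that assumption (grow() and its parameters are hypothetical, not taken from this file):

from multiprocessing import shared_memory


def grow(shm_id: int, old: shared_memory.SharedMemory, new_nbytes: int,
         basename: str = 'X') -> tuple[int, shared_memory.SharedMemory]:
    # Assumed reallocation step: create a larger segment under a new name,
    # copy the old contents over, then retire the old segment.
    shm_id += 1
    new = shared_memory.SharedMemory(f'{basename}_{shm_id}', create=True, size=new_nbytes)
    new.buf[:old.size] = old.buf    # the old bytes fit in the larger segment
    old.close()
    old.unlink()                    # the writer owns the segments, so it unlinks the old one
    return shm_id, new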
@@ -153,8 +151,6 @@ def __init__(self):
self.model_lock = Lock()
self.model_queue = Queue(maxsize=1)

self.X = None
self.y = None
self.opts = None
self.data_queue = Queue(maxsize=1)

@@ -176,7 +172,7 @@ def model(self):
self._model = model
return self._model

def submit_for_training(self, data: DataContainer, opts: ForestOpts):
def submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64], opts: ForestOpts):
# use condition variable to wake up the trainer thread if it's sleeping
with self.data_cv:
assert data is not None
@@ -198,6 +194,8 @@ def run(self) -> None:
# wait for training to finish before receiving a new configuration to try, depending on CPU load; we might
# have to replace the Event by a Condition

data = self._init_data_container(X, y)

_rf = regression.binary_rss_forest()
_rf.options = self.opts

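The hand-off around data_cv follows the usual notify/wait shape: submit_for_training publishes the newest data under the condition variable and notifies, and the background training loop sleeps until something arrives. A simplified, threading-based sketch of just that pattern (the committed class uses multiprocessing primitives and bounded queues; submit() and trainer_loop() below are illustrative):

import threading

data_cv = threading.Condition()
pending: list = []                 # holds at most the latest submission


def submit(data, opts):
    with data_cv:
        pending.clear()            # keep only the most recent data
        pending.append((data, opts))
        data_cv.notify()           # wake the trainer if it is sleeping


def trainer_loop():
    while True:
        with data_cv:
            while not pending:
                data_cv.wait()     # lock is released while sleeping
            data, opts = pending.pop()
        # ... fit the forest on data with opts outside the lock ...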
@@ -337,14 +335,12 @@ def _train(self, X: np.ndarray, y: np.ndarray) -> RandomForest:
# self.X = X
# self.y = y.flatten()

data = self._init_data_container(X, y)

if self._n_points_per_tree <= 0:
self._rf_opts.num_data_points_per_tree = X.shape[0]
else:
self._rf_opts.num_data_points_per_tree = self._n_points_per_tree

self._rf.submit_for_training(data, self._rf_opts)
self._rf.submit_for_training(X, y, self._rf_opts)

# call this to make sure that there exists a trained model before returning (actually, not sure this is
# required, since we check within predict() anyway)

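Putting the pieces together, the intended flow appears to be: the main process copies each new (X, y) into the growing shared array, and the trainer process pulls a private copy before fitting. A hypothetical driver, assuming the classes in this file are importable as shown and that the lock, the current shm_id and the row count reach the reader through the queue:

import numpy as np

from smac.model.random_forest.random_forest import GrowingSharedArray

# Writer (main) process: publish the training data through shared memory.
shared = GrowingSharedArray()
X = np.random.rand(100, 5)
y = np.random.rand(100)
shared.set_data(X, y)   # copies X and y into the shared segments

# Reader (trainer) process, sketched as comments because it runs elsewhere:
# reader = GrowingSharedArrayReaderView(lock)      # same lock as the writer
# X_copy, y_copy = reader.get_data(shm_id, size)   # private numpy copies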