- fix bugs in arrays backed by shared memory
- better synchronization / signalling between optimization loop and training loop
- refactor:
  - improve shared array semantics
  - encapsulation: reuse more allocation / cleanup code
  - defensive: extra checks
- other minor fixes / improvements
Bogdan Budescu committed Dec 12, 2024
1 parent 65edb8d commit 818ffd0
Showing 5 changed files with 118 additions and 49 deletions.
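For orientation before the per-file diffs: the core pattern these changes harden is one process publishing NumPy arrays into named shared-memory blocks and another process attaching to them by name. A minimal, self-contained sketch follows; the block names, helper names, and sizes are illustrative only and are not the repository's API.

import numpy as np
from multiprocessing import shared_memory

def publish(X: np.ndarray, y: np.ndarray, shm_id: int):
    # create named blocks large enough to hold the data, then copy it in
    shm_X = shared_memory.SharedMemory(name=f'demo_X_{shm_id}', create=True, size=X.nbytes)
    shm_y = shared_memory.SharedMemory(name=f'demo_y_{shm_id}', create=True, size=y.nbytes)
    np.ndarray(X.shape, dtype=X.dtype, buffer=shm_X.buf)[...] = X
    np.ndarray(y.shape, dtype=y.dtype, buffer=shm_y.buf)[...] = y
    return shm_X, shm_y  # keep these alive; call close() and unlink() once no reader needs them

def read(shm_id: int, size: int, row_size: int):
    # attach to the existing blocks by name, copy the data out, and detach quickly
    shm_X = shared_memory.SharedMemory(name=f'demo_X_{shm_id}')
    shm_y = shared_memory.SharedMemory(name=f'demo_y_{shm_id}')
    X = np.array(np.ndarray((size, row_size), dtype=np.float64, buffer=shm_X.buf))
    y = np.array(np.ndarray((size,), dtype=np.float64, buffer=shm_y.buf))
    shm_X.close()
    shm_y.close()
    return X, y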
80 changes: 50 additions & 30 deletions smac/model/random_forest/multiproc_util/GrowingSharedArray.py
@@ -23,55 +23,68 @@ def __init__(self, lock: Lock):
self.shm_id: Optional[int] = None
self.shm_X: Optional[shared_memory.SharedMemory] = None
self.shm_y: Optional[shared_memory.SharedMemory] = None
self.size: Optional[int] = None

def open(self, shm_id: int):
def open(self, shm_id: int, size: int):
if shm_id != self.shm_id:
self.close()
self.shm_X = shared_memory.SharedMemory(f'{self.basename_X}_{shm_id}')
self.shm_y = shared_memory.SharedMemory(f'{self.basename_y}_{shm_id}')
self.shm_id = shm_id
self.size = size

def close(self):
def close_impl(self, unlink=False):
if self.shm_X is not None:
self.shm_X.close()
if unlink:
self.shm_X.unlink()
del self.shm_X
self.shm_X = None
if self.shm_y is not None:
self.shm_y.close()
if unlink:
self.shm_y.unlink()
del self.shm_y
self.shm_y = None
self.shm_id = None
self.size = None

def close(self):
self.close_impl()

def __del__(self):
self.close()

@property
def capacity(self) -> Optional[int]:
def capacity(self) -> int:
if self.shm_y is None:
return None
assert self.shm_y.size % np.float64.itemsize == 0
return self.shm_y.size / np.float64.itemsize
return 0
assert self.shm_y.size % np.dtype(np.float64).itemsize == 0
return self.shm_y.size // np.dtype(np.float64).itemsize

@property
def row_size(self) -> Optional[int]:
if self.shm_X is None:
return None
if self.shm_X.size == 0:
assert self.shm_y.size == 0
return 0
return None
assert self.shm_X.size % self.shm_y.size == 0
return self.shm_X.size // self.shm_y.size

def np_view(self, size: int) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]:
@property
def X(self):
X = np.ndarray(shape=(self.capacity, self.row_size), dtype=np.float64, buffer=self.shm_X.buf)
return X[:self.size]

@property
def y(self):
y = np.ndarray(shape=(self.capacity,), dtype=np.float64, buffer=self.shm_y.buf)
return X[:size], y[:size]
return y[:self.size]

def get_data(self, shm_id: int, size: int) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]:
with self.lock:
self.open(shm_id)
shared_X, shared_y = self.np_view(size)
X, y = np.array(shared_X), np.array(shared_y) # make copies
self.open(shm_id, size)
X, y = np.array(self.X), np.array(self.y) # make copies and release lock to minimize critical section

return X, y

@@ -81,6 +94,12 @@ def __init__(self):
self.growth_rate = 1.5
super().__init__(lock=Lock())

def close(self):
self.close_impl(unlink=True)

def __del__(self):
self.close()

def set_data(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> None:
assert len(X) == len(y)
assert X.ndim == 2
@@ -96,33 +115,34 @@ def set_data(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> None:
if self.capacity:
n_growth = math.ceil(math.log(size / self.capacity, self.growth_rate))
capacity = int(math.ceil(self.capacity * self.growth_rate ** n_growth))
self.shm_id += 1
shm_id = self.shm_id + 1
else:
assert self.shm_X is None
assert self.shm_y is None
capacity = size
self.shm_id = 0
shm_id = 0

row_size = X.shape[1]
if self.row_size is not None:
assert X.shape[1] == self.row_size

shm_X = shared_memory.SharedMemory(f'{self.basename_X}_{self.shm_id}', create=True,
size=capacity * self.row_size * X.dtype.itemsize)
shm_y = shared_memory.SharedMemory(f'{self.basename_y}_{self.shm_id}', create=True,
assert row_size == self.row_size
shm_X = shared_memory.SharedMemory(f'{self.basename_X}_{shm_id}', create=True,
size=capacity * row_size * X.dtype.itemsize)
shm_y = shared_memory.SharedMemory(f'{self.basename_y}_{shm_id}', create=True,
size=capacity * y.dtype.itemsize)

with self.lock:
if grow:
if self.capacity:
assert self.shm_X is not None
self.shm_X.close()
self.shm_X.unlink()
assert self.shm_y is not None
self.shm_y.close()
self.shm_y.unlink()
# TODO: here, before reallocating, we unlink the underlying shared memory without making sure that the
# training loop process has had a chance to close it first, so this might lead to some warnings
# references:
# - https://stackoverflow.com/a/63004750/2447427
# - https://github.com/python/cpython/issues/84140
# - https://github.com/python/cpython/issues/82300 - provides a fix that turns off tracking
self.close()
self.shm_X = shm_X
self.shm_y = shm_y
X_buf, y_buf = self.np_view(size)
X_buf[...] = X
y_buf[...] = y

self.shm_id = shm_id
self.size = size
self.X[...] = X
self.y[...] = y
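To make the intended call pattern of the class above concrete, here is a hedged usage sketch of the writer side, paired with the (shm_id, size) message that the training loop in RFTrainer.py below expects. The import path comes from this diff; the sample data, the queue wiring, and the reader-side variable name are assumptions.

import numpy as np
from multiprocessing import Queue

from smac.model.random_forest.multiproc_util.GrowingSharedArray import GrowingSharedArray

# optimization-loop (writer) side
X = np.random.rand(100, 5)
y = np.random.rand(100)
data_queue: Queue = Queue(maxsize=1)

shared_arrs = GrowingSharedArray()
shared_arrs.set_data(X, y)                    # grows / reallocates the shared memory when capacity is exceeded
data_queue.put((shared_arrs.shm_id, len(y)))  # tell the training loop which generation of shared memory to attach to

# training-loop (reader) side, inside the child process:
#   shm_id, size = data_queue.get()
#   X_copy, y_copy = reader_view.get_data(shm_id, size)  # attaches by name, copies, then releases the lock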
80 changes: 62 additions & 18 deletions smac/model/random_forest/multiproc_util/RFTrainer.py
@@ -45,18 +45,24 @@ def rf_training_loop(

while True:
msg = data_queue.get() # if queue is empty, wait for training data or shutdown signal
must_shutdown = msg == SHUTDOWN

# discard all but the last msg in the queue
while True:
try:
msg = data_queue.get(block=False)
except queue.Empty:
break
else:
if msg == SHUTDOWN:
return
must_shutdown |= msg == SHUTDOWN
if must_shutdown:
shared_arrs.close()
model_queue.put(SHUTDOWN)
break

shm_id, size = msg
X, y = shared_arrs.get_data(shm_id, size)
# TODO: when shm_id changes, notify the main process that it can unlink the old shared memory, because we have called close() on it
data = init_data_container(X, y, bounds)

if n_points_per_tree <= 0:
@@ -76,19 +82,34 @@


class RFTrainer:
def __init__(
# init rf train
self, bounds: Iterable[tuple[float, float]], seed: int,
# rf opts
n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int,
max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int,
# process synchronization
sync: bool = False
) -> None:
self._model: Optional[BinaryForest] = None
self.shared_arrs = GrowingSharedArray()
def __init__(self,
# init rf train
bounds: Iterable[tuple[float, float]], seed: int,
# rf opts
n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int,
max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int,
# process synchronization
sync: bool = False) -> None:
self.sync = sync

self._model: Optional[BinaryForest] = None
self.shared_arrs: Optional[GrowingSharedArray] = None
self.model_queue: Optional[Queue] = None
self.data_queue: Optional[Queue] = None
self.training_loop_proc: Optional[Process] = None

self.open(bounds, seed, n_trees, bootstrapping, max_features, min_samples_split, min_samples_leaf, max_depth,
eps_purity, max_nodes, n_points_per_tree)

super().__init__()

def open(self,
# init rf train
bounds: Iterable[tuple[float, float]], seed: int,
# rf opts
n_trees: int, bootstrapping: bool, max_features: int, min_samples_split: int, min_samples_leaf: int,
max_depth: int, eps_purity: float, max_nodes: int, n_points_per_tree: int) -> None:
self.shared_arrs = GrowingSharedArray()
self.model_queue = Queue(maxsize=1)
self.data_queue = Queue(maxsize=1)
self.training_loop_proc = Process(daemon=True, target=rf_training_loop, name='rf_trainer', args=(
@@ -97,8 +118,6 @@ def __init__(
))
self.training_loop_proc.start()

super().__init__()

def close(self):
# I think this might be redundant, since according to the official docs, close and join_thread are called
# anyway when garbage-collecting queues, and we don't use JoinableQueues
@@ -117,12 +136,28 @@ def close(self):
self.training_loop_proc = None

if self.model_queue is not None:
_ = self.model # try to flush the model queue, and store the latest model
# flush the model queue, and store the latest model
while True:
msg = self.model_queue.get()
# wait for SHUTDOWN message, because that guarantees that shared_arrs.close() has been called within
# the training process; this way we make sure we call unlink only after close has had the chance to be
# called within the child process
if msg == SHUTDOWN:
break
else:
self._model = msg
self.model_queue.close()
self.model_queue.join_thread()
del self.model_queue
self.model_queue = None

# make sure this is called after SHUTDOWN was received because we want the trainer process to call
# shared_arrs.close() before we call unlink
if self.shared_arrs is not None:
self.shared_arrs.close()
del self.shared_arrs
self.shared_arrs = None

def __del__(self):
self.close()

@@ -132,15 +167,24 @@ def model(self) -> BinaryForest:
if self.model_queue is None:
raise RuntimeError('rf training loop process has been stopped before being able to train a model')
# wait until the first training is done
self._model = self.model_queue.get()
msg = self.model_queue.get()
if msg == SHUTDOWN:
raise RuntimeError("the shutdown message wasn't supposed to end up here")
else:
self._model = msg

if self.model_queue is not None:
# discard all but the last model in the queue
while True:
try:
self._model = self.model_queue.get(block=False)
msg = self.model_queue.get(block=False)
except queue.Empty:
break
else:
if msg == SHUTDOWN:
raise RuntimeError("the shutdown message wasn't supposed to end up here")
else:
self._model = msg
return self._model

def submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]):
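Both the data queue and the model queue above rely on the same drain-to-latest idiom: block for the first message, then consume everything else without blocking and keep only the newest item, checking for the SHUTDOWN sentinel along the way. As a standalone sketch of the idiom (the helper name is illustrative and not part of the codebase):

import queue

def drain_latest(q, current):
    # return the most recent item in q, falling back to `current` if the queue is already empty
    while True:
        try:
            current = q.get(block=False)
        except queue.Empty:
            return current

# usage sketch: model = drain_latest(model_queue, model)
# (the real code additionally checks each message for SHUTDOWN)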
3 changes: 3 additions & 0 deletions smac/model/random_forest/random_forest.py
@@ -105,6 +105,9 @@ def __init__(
# self._seed,
# ]

def __del__(self):
self._rf.close()

@property
def meta(self) -> dict[str, Any]: # noqa: D102
meta = super().meta
2 changes: 2 additions & 0 deletions smac/model/random_forest/util.py
@@ -1,6 +1,8 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from __future__ import annotations

from typing import Iterable, TYPE_CHECKING

import numpy as np
2 changes: 1 addition & 1 deletion tests/test_model/test_rf.py
@@ -19,7 +19,7 @@
def _get_cs(n_dimensions):
configspace = ConfigurationSpace(seed=0)
for i in range(n_dimensions):
configspace.add_hyperparameter(UniformFloatHyperparameter("x%d" % i, 0, 1))
configspace.add(UniformFloatHyperparameter("x%d" % i, 0, 1))

return configspace

