Commit
- improve sync between optimization and training loops
- refactor:
  - renames for improved legibility
  - easier debug printing of the sync between the optimization and training loop processes
Bogdan Budescu committed Dec 13, 2024
1 parent f05d0bf commit cb76f8c
Showing 3 changed files with 65 additions and 56 deletions.
6 changes: 3 additions & 3 deletions smac/model/random_forest/multiproc_util/GrowingSharedArray.py
@@ -39,7 +39,7 @@ def open(self, shm_id: int, size: int):
self.shm_id = shm_id
self.size = size

def close_impl(self, unlink=False):
def close_shm(self, unlink=False):
if self.shm_X is not None:
self.shm_X.close()
if unlink:
@@ -56,7 +56,7 @@ def close_impl(self, unlink=False):
self.size = None

def close(self):
self.close_impl()
self.close_shm()

def __del__(self):
self.close()
@@ -101,7 +101,7 @@ def __init__(self):
super().__init__(lock=Lock())

def close(self):
self.close_impl(unlink=True)
self.close_shm(unlink=True)

def __del__(self):
self.close()
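
For context on the close_shm(unlink=...) split: assuming GrowingSharedArray wraps multiprocessing.shared_memory.SharedMemory (as the shm_X close/unlink calls suggest), the pattern is that every process closes its own mapping, while only the owning writer additionally unlinks the segment. A minimal, self-contained sketch of that pattern, with illustrative names rather than the ones used in GrowingSharedArray.py:

import numpy as np
from multiprocessing import shared_memory

# writer: creates the segment, so it is also responsible for unlinking it later
shm = shared_memory.SharedMemory(create=True, size=1024 * 8)
X = np.ndarray((1024,), dtype=np.float64, buffer=shm.buf)
X[:] = 0.0

# reader (normally another process): attaches by name and only closes its own mapping
reader = shared_memory.SharedMemory(name=shm.name)
X_view = np.ndarray((1024,), dtype=np.float64, buffer=reader.buf)
del X_view       # drop the numpy view first (close() can fail with BufferError while views are exported)
reader.close()   # reader-side teardown, analogous to close_shm()

del X            # same on the writer side
shm.close()      # the writer closes its own mapping ...
shm.unlink()     # ... and additionally frees the segment, analogous to close_shm(unlink=True)
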
107 changes: 57 additions & 50 deletions smac/model/random_forest/multiproc_util/RFTrainer.py
@@ -5,6 +5,7 @@

from multiprocessing import Lock, Queue, Process
import queue
import sys

from numpy import typing as npt
import numpy as np
@@ -18,6 +19,18 @@
SHUTDOWN = None


ENABLE_DBG_PRINT = False


def debug_print(*args, file=sys.stdout, **kwargs):
if ENABLE_DBG_PRINT:
print(*args, **kwargs, flush=True, file=file)
file.flush()

# TODO: the value passed for the 'bounds' param below is a tuple of tuples. Might this add a memory
#  dependency between the processes that could interfere with the cleanup process?


def rf_training_loop(
model_queue: Queue, data_queue: Queue, data_lock: Lock,
# init rf train
@@ -43,36 +56,44 @@ def rf_training_loop(
rng = regression.default_random_engine(int(seed))
shared_arrs = GrowingSharedArrayReaderView(data_lock)

def send_to_optimization_loop_process(msg: Union[BinaryForest, type(SHUTDOWN)]):
# remove previous models from queue, if any, before pushing the latest model
while True:
try:
_ = model_queue.get(block=False)
except queue.Empty:
break
debug_print(f'TRAINER SENDING {"SHUTDOWN CONFIRM" if msg == SHUTDOWN else "MODEL"}', file=sys.stderr)
model_queue.put(msg)
debug_print(f'TRAINER SENDING {"SHUTDOWN CONFIRM" if msg == SHUTDOWN else "MODEL"} DONE', file=sys.stderr)

while True:
# print('TRAINER WAIT MSG', flush=True)
msg = data_queue.get() # if queue is empty, wait for training data or shutdown signal
# print(f'TRAINER GOT MSG: {msg}', flush=True)
must_shutdown = msg == SHUTDOWN
# if must_shutdown:
# print(f'TRAINER GOT SHUTDOWN 1', flush=True)

# discard all but the last msg in the queue
debug_print('TRAINER WAIT MSG', file=sys.stderr)
data_msg = data_queue.get() # if queue is empty, wait for training data or shutdown signal
debug_print(f'TRAINER GOT MSG: {data_msg}', file=sys.stderr)
must_shutdown = data_msg == SHUTDOWN
if must_shutdown:
debug_print(f'TRAINER GOT SHUTDOWN 1', file=sys.stderr)

# discard all but the last data_msg in the queue
while True:
try:
msg = data_queue.get(block=False)
data_msg = data_queue.get(block=False)
except queue.Empty:
break
else:
# if msg == SHUTDOWN:
# print(f'TRAINER GOT SHUTDOWN 2', flush=True)
must_shutdown = must_shutdown or msg == SHUTDOWN
if data_msg == SHUTDOWN:
debug_print(f'TRAINER GOT SHUTDOWN 2', file=sys.stderr)
must_shutdown = must_shutdown or data_msg == SHUTDOWN
if must_shutdown:
shared_arrs.close()
# TODO: empty queue before pushing SHUTDOWN
# print(f'TRAINER SENDS SHUTDOWN CONFIRMATION', flush=True)
model_queue.put(SHUTDOWN)
# print(f'TRAINER FINISHED SEND SHUTDOWN CONFIRMATION', flush=True)
send_to_optimization_loop_process(SHUTDOWN)
# don't kill current process until we make sure the queue's underlying pipe is flushed
model_queue.close()
# model_queue.join_thread() # TODO: enable this again
# print(f'TRAINER BYE BYE', flush=True)
model_queue.join_thread()
break

shm_id, size = msg
shm_id, size = data_msg
X, y = shared_arrs.get_data(shm_id, size)
        # when shm_id changes, we should notify the main thread that it can unlink the old shared memory, because we
        # have already called close() on it
@@ -84,19 +105,11 @@ def rf_training_loop(

rf = BinaryForest()
rf.options = rf_opts
debug_print(f'TRAINER STARTS TRAINING', file=sys.stderr)
rf.fit(data, rng)

# print(f'TRAINER FINISHED TRAINING', flush=True)

# remove previous models from queue, if any, before pushing the latest model
while True:
try:
_ = model_queue.get(block=False)
except queue.Empty:
break
# print(f'TRAINER SENDING MODEL', flush=True)
model_queue.put(rf)
# print(f'TRAINER SENDING MODEL DONE', flush=True)
debug_print(f'TRAINER FINISHED TRAINING', file=sys.stderr)
send_to_optimization_loop_process(rf)
debug_print(f'TRAINER BYE BYE', file=sys.stderr)


class RFTrainer:
@@ -131,7 +144,9 @@ def open(self,
self.model_queue = Queue(maxsize=1)
self.data_queue = Queue(maxsize=1)
self.training_loop_proc = Process(
target=rf_training_loop, daemon=True, name='rf_trainer',
target=rf_training_loop,
daemon=True,
# name='rf_trainer',
args=(self.model_queue, self.data_queue, self.shared_arrs.lock, tuple(bounds), seed, n_trees, bootstrapping,
max_features, min_samples_split, min_samples_leaf, max_depth, eps_purity, max_nodes,
n_points_per_tree)
@@ -142,21 +157,23 @@ def close(self):
# send kill signal to training process
if self.data_queue is not None:
if self.training_loop_proc is not None:
# print('MAIN SEND SHUTDOWN', flush=True)
debug_print('MAIN SEND SHUTDOWN')
self.send_to_training_loop_proc(SHUTDOWN)
# print('MAIN FINISHED SEND SHUTDOWN', flush=True)
debug_print('MAIN FINISHED SEND SHUTDOWN')
            # make sure the shutdown message is flushed before moving on
self.data_queue.close()
self.data_queue.join_thread()
del self.data_queue
self.data_queue = None

        # wait until the training process has died
if self.model_queue is not None and self.training_loop_proc is not None and self.training_loop_proc.is_alive():
# flush the model queue, and store the latest model
while True:
# print('MAIN WAIT SHUTDOWN CONFIRM', flush=True)
debug_print('MAIN WAIT SHUTDOWN CONFIRM')
msg = self.model_queue.get()
# print(f'MAIN RECEIVED {"SHUTDOWN CONFIRMATION" if msg == SHUTDOWN else msg} '
# f'AFTER WAITING FOR SHUTDOWN CONFIRMATION', flush=True)
debug_print(f'MAIN RECEIVED {"SHUTDOWN CONFIRMATION" if msg == SHUTDOWN else "MODEL"}'
f' AFTER WAITING FOR SHUTDOWN CONFIRMATION')
# wait for SHUTDOWN message, because that guarantees that shared_arrs.close() has been called within
# the training process; this way we make sure we call unlink only after close has had the chance to be
# called within the child process
@@ -172,15 +189,7 @@ def close(self):
del self.training_loop_proc
self.training_loop_proc = None

# I think this might be redundant, since according to the official docs, close and join_thread are called
# anyway when garbage-collecting queues, and we don't use JoinableQueues
if self.data_queue is not None:
del self.data_queue
self.data_queue = None

if self.model_queue is not None:
# self.model_queue.close()
# self.model_queue.join_thread()
del self.model_queue
self.model_queue = None

@@ -221,6 +230,9 @@ def model(self) -> BinaryForest:
return self._model

def send_to_training_loop_proc(self, data_info: Union[tuple[int, int], type[SHUTDOWN]]):
if self.data_queue is None:
raise RuntimeError('rf training loop process has been stopped, so we cannot submit new training data')

# empty queue before pushing new data onto it
while True:
try:
@@ -233,11 +245,6 @@ def send_to_training_loop_proc(self, data_info: Union[tuple[int, int], type[SHUT

def submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]):
self.shared_arrs.set_data(X, y)

if self.data_queue is None:
raise RuntimeError('rf training loop process has been stopped, so we cannot submit new training data')

self.send_to_training_loop_proc((self.shared_arrs.shm_id, len(X)))

if self.sync:
self._model = self.model_queue.get()
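
Taken together, the sync protocol this commit tightens is "latest message wins" on both queues: each sender (the send_to_* helpers above) drains any stale, unconsumed message before putting a new one, and the SHUTDOWN handshake guarantees the trainer has closed its shared-memory view before the main process unlinks it. A standalone sketch of just the drain-then-put part, with illustrative payloads rather than the real shm_id/size tuples:

import queue
import time
from multiprocessing import Queue


def put_latest(q, item):
    """Drop any stale, unconsumed messages, then enqueue the newest one ("latest message wins")."""
    while True:
        try:
            q.get(block=False)
        except queue.Empty:
            break
    q.put(item)


if __name__ == '__main__':
    q = Queue()
    put_latest(q, ('shm_id_0', 128))
    time.sleep(0.1)  # give the queue's feeder thread time to flush, so the demo is deterministic
    put_latest(q, ('shm_id_1', 256))  # the stale payload is drained before the new one is enqueued
    print(q.get())  # ('shm_id_1', 256)
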
8 changes: 5 additions & 3 deletions tests/test_acquisition/test_maximizers.py
@@ -480,9 +480,11 @@ def random_search():


def main():
# TODO: running all these three IN THIS ORDER causes a hang, probably because of the dependency graph growing too
# complex for the garbage collector to handle, so RFTrainer.close() is never called. In order to avoid hangs while
# running tests, we explicitly call RFTrainer.close() during model fixture teardown
from smac.model.random_forest.multiproc_util import RFTrainer
RFTrainer.ENABLE_DBG_PRINT = True
# TODO: running ALL these three IN THIS ORDER causes a hang, probably because of the dependency graph growing too
# complex and circular for the garbage collector to handle, so RFTrainer.close() is never called. In order to avoid
# hangs while running tests, we explicitly call RFTrainer.close() in model fixture teardown
print('differential_evolution:')
differential_evolution()
print('\nmin_repro_differential_evolution_bug:')
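
The explicit teardown mentioned in the new comment can be sketched roughly as below; DummyTrainer stands in for the real RFTrainer-backed model, whose construction is not shown in this diff:

import pytest


class DummyTrainer:
    """Stand-in for RFTrainer: owns a background resource that must be shut down explicitly."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


@pytest.fixture
def model():
    trainer = DummyTrainer()
    yield trainer
    # explicit teardown: don't rely on __del__ / garbage collection to call close(),
    # since a complex or circular reference graph can delay collection indefinitely
    trainer.close()


def test_trainer_open_during_test(model):
    assert not model.closed
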
