From cb76f8c5512e4d93d7e2099a8b738cae93ccb3fc Mon Sep 17 00:00:00 2001
From: Bogdan Budescu
Date: Fri, 13 Dec 2024 12:16:36 +0200
Subject: [PATCH] - improve sync between optimization and training loops
 - refactor:
   - renames: improve legibility
 - easier debug printing for sync between opt and train loop processes

---
 .../multiproc_util/GrowingSharedArray.py      |   6 +-
 .../random_forest/multiproc_util/RFTrainer.py | 107 ++++++++++--------
 tests/test_acquisition/test_maximizers.py     |   8 +-
 3 files changed, 65 insertions(+), 56 deletions(-)

diff --git a/smac/model/random_forest/multiproc_util/GrowingSharedArray.py b/smac/model/random_forest/multiproc_util/GrowingSharedArray.py
index e060c64f6..9eda8c406 100644
--- a/smac/model/random_forest/multiproc_util/GrowingSharedArray.py
+++ b/smac/model/random_forest/multiproc_util/GrowingSharedArray.py
@@ -39,7 +39,7 @@ def open(self, shm_id: int, size: int):
         self.shm_id = shm_id
         self.size = size
 
-    def close_impl(self, unlink=False):
+    def close_shm(self, unlink=False):
         if self.shm_X is not None:
             self.shm_X.close()
             if unlink:
@@ -56,7 +56,7 @@ def close_impl(self, unlink=False):
             self.size = None
 
     def close(self):
-        self.close_impl()
+        self.close_shm()
 
     def __del__(self):
         self.close()
@@ -101,7 +101,7 @@ def __init__(self):
         super().__init__(lock=Lock())
 
     def close(self):
-        self.close_impl(unlink=True)
+        self.close_shm(unlink=True)
 
     def __del__(self):
         self.close()

diff --git a/smac/model/random_forest/multiproc_util/RFTrainer.py b/smac/model/random_forest/multiproc_util/RFTrainer.py
index 76f355802..383e66fb2 100644
--- a/smac/model/random_forest/multiproc_util/RFTrainer.py
+++ b/smac/model/random_forest/multiproc_util/RFTrainer.py
@@ -5,6 +5,7 @@
 from multiprocessing import Lock, Queue, Process
 import queue
+import sys
 
 from numpy import typing as npt
 import numpy as np
@@ -18,6 +19,18 @@
 
 SHUTDOWN = None
+ENABLE_DBG_PRINT = False
+
+
+def debug_print(*args, file=sys.stdout, **kwargs):
+    if ENABLE_DBG_PRINT:
+        print(*args, **kwargs, flush=True, file=file)
+        file.flush()
+
+# TODO: the value passed for the 'bounds' param below is a tuple of tuples. Might this create a memory dependency
+#  between the processes that could interfere with the cleanup process?
+
+
 def rf_training_loop(
     model_queue: Queue, data_queue: Queue, data_lock: Lock,
     # init rf train
@@ -43,36 +56,44 @@ def rf_training_loop(
     rng = regression.default_random_engine(int(seed))
     shared_arrs = GrowingSharedArrayReaderView(data_lock)
 
+    def send_to_optimization_loop_process(msg: Union[BinaryForest, type(SHUTDOWN)]):
+        # remove previous models from the queue, if any, before pushing the latest model
+        while True:
+            try:
+                _ = model_queue.get(block=False)
+            except queue.Empty:
+                break
+        debug_print(f'TRAINER SENDING {"SHUTDOWN CONFIRM" if msg == SHUTDOWN else "MODEL"}', file=sys.stderr)
+        model_queue.put(msg)
+        debug_print(f'TRAINER SENDING {"SHUTDOWN CONFIRM" if msg == SHUTDOWN else "MODEL"} DONE', file=sys.stderr)
+
     while True:
-        # print('TRAINER WAIT MSG', flush=True)
-        msg = data_queue.get()  # if queue is empty, wait for training data or shutdown signal
-        # print(f'TRAINER GOT MSG: {msg}', flush=True)
-        must_shutdown = msg == SHUTDOWN
-        # if must_shutdown:
-        #     print(f'TRAINER GOT SHUTDOWN 1', flush=True)
-
-        # discard all but the last msg in the queue
+        debug_print('TRAINER WAIT MSG', file=sys.stderr)
+        data_msg = data_queue.get()  # if the queue is empty, wait for training data or a shutdown signal
+        debug_print(f'TRAINER GOT MSG: {data_msg}', file=sys.stderr)
+        must_shutdown = data_msg == SHUTDOWN
+        if must_shutdown:
+            debug_print('TRAINER GOT SHUTDOWN 1', file=sys.stderr)
+
+        # discard all but the last data_msg in the queue
         while True:
             try:
-                msg = data_queue.get(block=False)
+                data_msg = data_queue.get(block=False)
             except queue.Empty:
                 break
             else:
-                # if msg == SHUTDOWN:
-                #     print(f'TRAINER GOT SHUTDOWN 2', flush=True)
-                must_shutdown = must_shutdown or msg == SHUTDOWN
+                if data_msg == SHUTDOWN:
+                    debug_print('TRAINER GOT SHUTDOWN 2', file=sys.stderr)
+                must_shutdown = must_shutdown or data_msg == SHUTDOWN
 
         if must_shutdown:
             shared_arrs.close()
-            # TODO: empty queue before pushing SHUTDOWN
-            # print(f'TRAINER SENDS SHUTDOWN CONFIRMATION', flush=True)
-            model_queue.put(SHUTDOWN)
-            # print(f'TRAINER FINISHED SEND SHUTDOWN CONFIRMATION', flush=True)
+            send_to_optimization_loop_process(SHUTDOWN)
+            # don't let the current process die until we make sure the queue's underlying pipe has been flushed
             model_queue.close()
-            # model_queue.join_thread()  # TODO: enable this again
-            # print(f'TRAINER BYE BYE', flush=True)
+            model_queue.join_thread()
             break
 
-        shm_id, size = msg
+        shm_id, size = data_msg
         X, y = shared_arrs.get_data(shm_id, size)
         # when shm_id changes, we should notify the main thread here that it can unlink the shared memory,
         # because we have already called close() on it
@@ -84,19 +105,11 @@
         rf = BinaryForest()
         rf.options = rf_opts
 
+        debug_print('TRAINER STARTS TRAINING', file=sys.stderr)
         rf.fit(data, rng)
-
-        # print(f'TRAINER FINISHED TRAINING', flush=True)
-
-        # remove previous models from queue, if any, before pushing the latest model
-        while True:
-            try:
-                _ = model_queue.get(block=False)
-            except queue.Empty:
-                break
-        # print(f'TRAINER SENDING MODEL', flush=True)
-        model_queue.put(rf)
-        # print(f'TRAINER SENDING MODEL DONE', flush=True)
+        debug_print('TRAINER FINISHED TRAINING', file=sys.stderr)
+        send_to_optimization_loop_process(rf)
+    debug_print('TRAINER BYE BYE', file=sys.stderr)
 
 
 class RFTrainer:
@@ -131,7 +144,9 @@ def open(self,
         self.model_queue = Queue(maxsize=1)
         self.data_queue = Queue(maxsize=1)
         self.training_loop_proc = Process(
-            target=rf_training_loop, daemon=True, name='rf_trainer',
+            target=rf_training_loop,
+            daemon=True,
+            # name='rf_trainer',
             args=(self.model_queue,
                   self.data_queue, self.shared_arrs.lock, tuple(bounds), seed, n_trees, bootstrapping, max_features,
                   min_samples_split, min_samples_leaf, max_depth, eps_purity, max_nodes, n_points_per_tree)
@@ -142,21 +157,23 @@ def close(self):
         # send kill signal to training process
         if self.data_queue is not None:
             if self.training_loop_proc is not None:
-                # print('MAIN SEND SHUTDOWN', flush=True)
+                debug_print('MAIN SEND SHUTDOWN')
                 self.send_to_training_loop_proc(SHUTDOWN)
-                # print('MAIN FINISHED SEND SHUTDOWN', flush=True)
+                debug_print('MAIN FINISHED SEND SHUTDOWN')
                 # make sure the shutdown message has been flushed before moving on
                 self.data_queue.close()
                 self.data_queue.join_thread()
+            del self.data_queue
+            self.data_queue = None
 
         # wait until the training process has died
         if self.model_queue is not None and self.training_loop_proc is not None and self.training_loop_proc.is_alive():
             # flush the model queue, and store the latest model
             while True:
-                # print('MAIN WAIT SHUTDOWN CONFIRM', flush=True)
+                debug_print('MAIN WAIT SHUTDOWN CONFIRM')
                 msg = self.model_queue.get()
-                # print(f'MAIN RECEIVED {"SHUTDOWN CONFIRMATION" if msg == SHUTDOWN else msg} '
-                #       f'AFTER WAITING FOR SHUTDOWN CONFIRMATION', flush=True)
+                debug_print(f'MAIN RECEIVED {"SHUTDOWN CONFIRMATION" if msg == SHUTDOWN else "MODEL"}'
+                            f' AFTER WAITING FOR SHUTDOWN CONFIRMATION')
                 # wait for SHUTDOWN message, because that guarantees that shared_arrs.close() has been called within
                 # the training process; this way we make sure we call unlink only after close has had the chance to be
                 # called within the child process
@@ -172,15 +189,7 @@
             del self.training_loop_proc
             self.training_loop_proc = None
 
-        # I think this might be redundant, since according to the official docs, close and join_thread are called
-        # anyway when garbage-collecting queues, and we don't use JoinableQueues
-        if self.data_queue is not None:
-            del self.data_queue
-            self.data_queue = None
-
         if self.model_queue is not None:
-            # self.model_queue.close()
-            # self.model_queue.join_thread()
             del self.model_queue
             self.model_queue = None
@@ -221,6 +230,9 @@ def model(self) -> BinaryForest:
         return self._model
 
     def send_to_training_loop_proc(self, data_info: Union[tuple[int, int], type[SHUTDOWN]]):
+        if self.data_queue is None:
+            raise RuntimeError('rf training loop process has been stopped, so we cannot submit new training data')
+
         # empty queue before pushing new data onto it
         while True:
             try:
@@ -233,11 +245,6 @@ def submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]):
         self.shared_arrs.set_data(X, y)
-
-        if self.data_queue is None:
-            raise RuntimeError('rf training loop process has been stopped, so we cannot submit new training data')
-
         self.send_to_training_loop_proc((self.shared_arrs.shm_id, len(X)))
-
         if self.sync:
             self._model = self.model_queue.get()

diff --git a/tests/test_acquisition/test_maximizers.py b/tests/test_acquisition/test_maximizers.py
index 9fbd89892..fdac06619 100644
--- a/tests/test_acquisition/test_maximizers.py
+++ b/tests/test_acquisition/test_maximizers.py
@@ -480,9 +480,11 @@ def random_search():
 
 def main():
-    # TODO: running all these three IN THIS ORDER causes a hang, probably because of the dependency graph growing too
-    #  complex for the garbage collector to handle, so RFTrainer.close() is never called. In order to avoid hangs while
-    #  running tests, we explicitly call RFTrainer.close() during model fixture teardown
+    from smac.model.random_forest.multiproc_util import RFTrainer
+    RFTrainer.ENABLE_DBG_PRINT = True
+    # TODO: running ALL three of these IN THIS ORDER causes a hang, probably because the dependency graph grows too
+    #  complex and circular for the garbage collector to handle, so RFTrainer.close() is never called. To avoid hangs
+    #  while running tests, we explicitly call RFTrainer.close() in the model fixture teardown
     print('differential_evolution:')
     differential_evolution()
    print('\nmin_repro_differential_evolution_bug:')
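
For reference, the drain-then-put pattern that send_to_optimization_loop_process and
send_to_training_loop_proc both rely on (empty the single-slot queue, then publish the
newest message, with SHUTDOWN as a sentinel that the child confirms back to the parent
before exiting) can be exercised in isolation. The following is a minimal standalone
sketch, not part of the patch; the names (worker, inbox, outbox, drain_then_put) are
illustrative, and it assumes each queue has exactly one producer, as in RFTrainer:

    import queue
    from multiprocessing import Process, Queue

    SHUTDOWN = None  # sentinel, mirroring the patch


    def drain_then_put(q: Queue, msg):
        # Discard any stale message so the consumer only ever sees the newest one.
        # Safe here because each queue has a single producer.
        while True:
            try:
                q.get(block=False)
            except queue.Empty:
                break
        q.put(msg)


    def worker(inbox: Queue, outbox: Queue):
        while True:
            data_msg = inbox.get()  # block until data or a shutdown request arrives
            if data_msg == SHUTDOWN:
                drain_then_put(outbox, SHUTDOWN)  # confirm shutdown to the parent
                outbox.close()
                outbox.join_thread()  # flush the queue's pipe before exiting
                break
            drain_then_put(outbox, data_msg * 2)  # stand-in for fitting a model


    if __name__ == '__main__':
        inbox, outbox = Queue(maxsize=1), Queue(maxsize=1)
        proc = Process(target=worker, args=(inbox, outbox), daemon=True)
        proc.start()
        drain_then_put(inbox, 21)
        print(outbox.get())  # prints 42
        drain_then_put(inbox, SHUTDOWN)
        assert outbox.get() == SHUTDOWN  # confirmation guarantees worker-side cleanup ran
        proc.join()

One caveat worth noting: since debug_print reads ENABLE_DBG_PRINT from the module globals
at call time, setting the flag in the parent (as main() does above) reaches the training
process under the 'fork' start method, but not under 'spawn', where the child re-imports
the module and sees the default False.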