Skip to content

Commit

Permalink
- add cleanup semantics in hierarchy of objects containing random for…
Browse files Browse the repository at this point in the history
…ests to terminate the training loop process gracefully (to as great an extent as possible)

- add (and then disable) some code that prints to console to help debug inter-process synchronization
- refactor: renames for improved legibility
- refactor: encapsulate and reuse
- add option to run testing code without pytest for debug
- modify some testing code to avoid deprecation warnings
  • Loading branch information
Bogdan Budescu committed Dec 13, 2024
1 parent ae9b821 commit f05d0bf
Show file tree
Hide file tree
Showing 11 changed files with 228 additions and 40 deletions.
7 changes: 7 additions & 0 deletions smac/acquisition/function/abstract_acquisition_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ class AbstractAcquisitionFunction:
def __init__(self) -> None:
self._model: AbstractModel | None = None

def close(self) -> None:
    """Release resources held by the attached model, if any.

    Part of the explicit cleanup chain that lets dependent resources
    (e.g. a background training process) shut down gracefully.
    """
    # Explicit `is not None`: a model that overrides __bool__/__len__ and
    # happens to be falsy must still be closed.
    if self._model is not None:
        self._model.close()

def __del__(self) -> None:
    """Best-effort cleanup when the object is garbage-collected."""
    # A finalizer can run during interpreter shutdown or after a partially
    # failed __init__; exceptions raised here are ignored by CPython anyway
    # (with an unraisable warning), so swallow them deliberately.
    try:
        self.close()
    except Exception:
        pass

@property
def name(self) -> str:
"""Returns the full name of the acquisition function."""
Expand Down
7 changes: 7 additions & 0 deletions smac/acquisition/maximizer/abstract_acqusition_maximizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ def __init__(
self._seed = seed
self._rng = np.random.RandomState(seed=seed)

def close(self) -> None:
    """Close the attached acquisition function (and, transitively, its model), if any.

    Part of the explicit cleanup chain used to shut down dependent
    resources gracefully.
    """
    # Explicit `is not None` instead of truthiness: an acquisition function
    # that evaluates falsy must still be closed.
    if self.acquisition_function is not None:
        self.acquisition_function.close()

def __del__(self) -> None:
    """Best-effort cleanup when the object is garbage-collected."""
    # Finalizers may fire during interpreter shutdown or after a partially
    # failed __init__; never let cleanup errors escape a finalizer.
    try:
        self.close()
    except Exception:
        pass

@property
def acquisition_function(self) -> AbstractAcquisitionFunction | None:
"""The acquisition function used for maximization."""
Expand Down
13 changes: 13 additions & 0 deletions smac/facade/abstract_facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,19 @@ def __init__(
# every time new information are available
self._optimizer.register_callback(self._intensifier.get_callback(), index=0)

def close(self) -> None:
    """Close all owned components that hold releasable resources.

    Closes the model, acquisition function, acquisition maximizer and
    config selector (in that order), skipping any that are unset. Part of
    the explicit cleanup chain used to terminate background resources
    (e.g. a training loop process) gracefully.
    """
    # Explicit `is not None` checks: a component overriding __bool__ or
    # __len__ must still be closed even if it evaluates falsy.
    if self._model is not None:
        self._model.close()
    if self._acquisition_function is not None:
        self._acquisition_function.close()
    if self._acquisition_maximizer is not None:
        self._acquisition_maximizer.close()
    if self._config_selector is not None:
        self._config_selector.close()

def __del__(self) -> None:
    """Best-effort cleanup when the facade is garbage-collected."""
    # Finalizers may run during interpreter shutdown, when attributes or
    # module globals may already be gone; exceptions in __del__ are ignored
    # by CPython anyway, so suppress them deliberately.
    try:
        self.close()
    except Exception:
        pass

@property
def scenario(self) -> Scenario:
"""The scenario object which holds all environment information."""
Expand Down
7 changes: 7 additions & 0 deletions smac/intensifier/abstract_intensifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ def __init__(
# Reset everything
self.reset()

def close(self) -> None:
    """Close the attached config selector, if any.

    Part of the explicit cleanup chain used to shut down dependent
    resources gracefully.
    """
    # Explicit `is not None`: a falsy-but-present selector must still be closed.
    if self._config_selector is not None:
        self._config_selector.close()

def __del__(self) -> None:
    """Best-effort cleanup when the intensifier is garbage-collected."""
    # Never propagate errors out of a finalizer: it may run during
    # interpreter shutdown or after a partially failed __init__.
    try:
        self.close()
    except Exception:
        pass

def reset(self) -> None:
"""Reset the internal variables of the intensifier."""
self._tf_seeds: list[int] = []
Expand Down
11 changes: 11 additions & 0 deletions smac/main/config_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,17 @@ def __init__(
# Processed configurations should be stored here; this is important to not return the same configuration twice
self._processed_configs: list[Configuration] = []

def close(self) -> None:
    """Close the owned model, acquisition maximizer and acquisition function.

    Skips components that are unset. Part of the explicit cleanup chain
    used to shut down dependent resources gracefully.
    """
    # Explicit `is not None` checks: components overriding __bool__/__len__
    # must still be closed even when falsy.
    if self._model is not None:
        self._model.close()
    if self._acquisition_maximizer is not None:
        self._acquisition_maximizer.close()
    if self._acquisition_function is not None:
        self._acquisition_function.close()

def __del__(self) -> None:
    """Best-effort cleanup when the config selector is garbage-collected."""
    # Guard: a finalizer can run during interpreter shutdown or after a
    # partially failed __init__; exceptions here must not escape.
    try:
        self.close()
    except Exception:
        pass

def _set_components(
self,
initial_design: AbstractInitialDesign,
Expand Down
6 changes: 6 additions & 0 deletions smac/model/abstract_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ def __init__(
# Initial types array which is used to reset the type array at every call to `self.train()`
self._initial_types = copy.deepcopy(self._types)

def close(self) -> None:
    """Release any resources held by the model.

    Base implementation is a deliberate no-op; subclasses that own external
    resources (e.g. a background training process) are expected to override
    this.
    """
    pass

def __del__(self) -> None:
    """Best-effort cleanup when the model is garbage-collected."""
    # Finalizers may fire during interpreter shutdown or after a partially
    # failed __init__; never let cleanup errors escape a finalizer.
    try:
        self.close()
    except Exception:
        pass

@property
def meta(self) -> dict[str, Any]:
"""Returns the meta data of the created object."""
Expand Down
6 changes: 3 additions & 3 deletions smac/model/random_forest/multiproc_util/GrowingSharedArray.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
from multiprocessing import Lock

# from multiprocessing.shared_memory import SharedMemory
from .SharedMemory import SharedMemory as TrackableSharedMemory
def SharedMemory(*args, **kwargs) -> TrackableSharedMemory:
return TrackableSharedMemory(*args, track=False, **kwargs)
from .SharedMemory import SharedMemory as UntrackableSharedMemory
def SharedMemory(*args, **kwargs) -> UntrackableSharedMemory:
    # Factory that forces track=False on every shared-memory segment.
    # NOTE(review): presumably this keeps the resource tracker from
    # registering the segment so it is not unlinked prematurely when a
    # child process exits — confirm against the project-local
    # .SharedMemory implementation, which must accept a `track` keyword.
    return UntrackableSharedMemory(*args, track=False, **kwargs)


import numpy as np
Expand Down
88 changes: 61 additions & 27 deletions smac/model/random_forest/multiproc_util/RFTrainer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from typing import Iterable, Optional
from typing import Iterable, Optional, Union

from multiprocessing import Lock, Queue, Process
import queue
Expand Down Expand Up @@ -44,8 +44,12 @@ def rf_training_loop(
shared_arrs = GrowingSharedArrayReaderView(data_lock)

while True:
# print('TRAINER WAIT MSG', flush=True)
msg = data_queue.get() # if queue is empty, wait for training data or shutdown signal
# print(f'TRAINER GOT MSG: {msg}', flush=True)
must_shutdown = msg == SHUTDOWN
# if must_shutdown:
# print(f'TRAINER GOT SHUTDOWN 1', flush=True)

# discard all but the last msg in the queue
while True:
Expand All @@ -54,10 +58,18 @@ def rf_training_loop(
except queue.Empty:
break
else:
# if msg == SHUTDOWN:
# print(f'TRAINER GOT SHUTDOWN 2', flush=True)
must_shutdown = must_shutdown or msg == SHUTDOWN
if must_shutdown:
shared_arrs.close()
# TODO: empty queue before pushing SHUTDOWN
# print(f'TRAINER SENDS SHUTDOWN CONFIRMATION', flush=True)
model_queue.put(SHUTDOWN)
# print(f'TRAINER FINISHED SEND SHUTDOWN CONFIRMATION', flush=True)
model_queue.close()
# model_queue.join_thread() # TODO: enable this again
# print(f'TRAINER BYE BYE', flush=True)
break

shm_id, size = msg
Expand All @@ -74,13 +86,17 @@ def rf_training_loop(
rf.options = rf_opts
rf.fit(data, rng)

# print(f'TRAINER FINISHED TRAINING', flush=True)

# remove previous models from queue, if any, before pushing the latest model
while True:
try:
_ = model_queue.get(block=False)
except queue.Empty:
break
# print(f'TRAINER SENDING MODEL', flush=True)
model_queue.put(rf)
# print(f'TRAINER SENDING MODEL DONE', flush=True)


class RFTrainer:
Expand Down Expand Up @@ -114,42 +130,57 @@ def open(self,
self.shared_arrs = GrowingSharedArray()
self.model_queue = Queue(maxsize=1)
self.data_queue = Queue(maxsize=1)
self.training_loop_proc = Process(daemon=True, target=rf_training_loop, name='rf_trainer', args=(
self.model_queue, self.data_queue, self.shared_arrs.lock, tuple(bounds), seed, n_trees, bootstrapping,
max_features, min_samples_split, min_samples_leaf, max_depth, eps_purity, max_nodes, n_points_per_tree
))
self.training_loop_proc = Process(
target=rf_training_loop, daemon=True, name='rf_trainer',
args=(self.model_queue, self.data_queue, self.shared_arrs.lock, tuple(bounds), seed, n_trees, bootstrapping,
max_features, min_samples_split, min_samples_leaf, max_depth, eps_purity, max_nodes,
n_points_per_tree)
)
self.training_loop_proc.start()

def close(self):
# I think this might be redundant, since according to the official docs, close and join_thread are called
# anyway when garbage-collecting queues, and we don't use JoinableQueues
# send kill signal to training process
if self.data_queue is not None:
if self.training_loop_proc is not None:
self.data_queue.put(SHUTDOWN)
# print('MAIN SEND SHUTDOWN', flush=True)
self.send_to_training_loop_proc(SHUTDOWN)
# print('MAIN FINISHED SEND SHUTDOWN', flush=True)
# make sure the shutdown message is flush before moving on
self.data_queue.close()
self.data_queue.join_thread()
del self.data_queue
self.data_queue = None

if self.training_loop_proc is not None:
# wait for training to finish
self.training_loop_proc.join() # TODO: fix: this happens to hang
del self.training_loop_proc
self.training_loop_proc = None

if self.model_queue is not None:
# wait till the training process died
if self.model_queue is not None and self.training_loop_proc is not None and self.training_loop_proc.is_alive():
# flush the model queue, and store the latest model
while True:
# print('MAIN WAIT SHUTDOWN CONFIRM', flush=True)
msg = self.model_queue.get()
# print(f'MAIN RECEIVED {"SHUTDOWN CONFIRMATION" if msg == SHUTDOWN else msg} '
# f'AFTER WAITING FOR SHUTDOWN CONFIRMATION', flush=True)
# wait for SHUTDOWN message, because that guarantees that shared_arrs.close() has been called within
# the training process; this way we make sure we call unlink only after close has had the chance to be
# called within the child process
if msg == SHUTDOWN:
break
else:
self._model = msg
self.model_queue.close()
self.model_queue.join_thread()

if self.training_loop_proc is not None:
# wait for training to finish
if self.training_loop_proc.is_alive():
self.training_loop_proc.join()
del self.training_loop_proc
self.training_loop_proc = None

# I think this might be redundant, since according to the official docs, close and join_thread are called
# anyway when garbage-collecting queues, and we don't use JoinableQueues
if self.data_queue is not None:
del self.data_queue
self.data_queue = None

if self.model_queue is not None:
# self.model_queue.close()
# self.model_queue.join_thread()
del self.model_queue
self.model_queue = None

Expand Down Expand Up @@ -189,21 +220,24 @@ def model(self) -> BinaryForest:
self._model = msg
return self._model

def submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]):
self.shared_arrs.set_data(X, y)

if self.data_queue is None:
raise RuntimeError('rf training loop process has been stopped, so we cannot submit new training data')

# flush queue before pushing new data onto it
def send_to_training_loop_proc(self, data_info: Union[tuple[int, int], type[SHUTDOWN]]):
    """Drain any stale message from the data queue, then enqueue `data_info`.

    Only the most recent message matters to the training loop, so anything
    still sitting in the queue is discarded before pushing the new one.
    """
    draining = True
    while draining:
        try:
            stale = self.data_queue.get_nowait()  # same as get(block=False)
        except queue.Empty:
            draining = False
        else:
            # A SHUTDOWN message must never be discarded here: it is only
            # sent once, during close().
            assert stale != SHUTDOWN
    self.data_queue.put(data_info)

def submit_for_training(self, X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]):
    """Publish new training data for the background training loop process.

    Copies `X`/`y` into the shared arrays, notifies the training process of
    the new shared-memory segment and data length, and — in synchronous
    mode — blocks until the freshly trained model is available.

    Raises
    ------
    RuntimeError
        If the training loop process has already been stopped.
    """
    # Fail fast BEFORE mutating the shared arrays: once the training loop
    # has been shut down there is no consumer for the data, so writing it
    # would be pointless work.
    if self.data_queue is None:
        raise RuntimeError('rf training loop process has been stopped, so we cannot submit new training data')

    self.shared_arrs.set_data(X, y)
    self.send_to_training_loop_proc((self.shared_arrs.shm_id, len(X)))

    if self.sync:
        # Synchronous mode: wait for the trained model before returning.
        self._model = self.model_queue.get()
5 changes: 4 additions & 1 deletion smac/model/random_forest/random_forest.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,12 @@ def __init__(
# self._seed,
# ]

def __del__(self):
def close(self) -> None:
    """Shut down the random-forest trainer, releasing its resources.

    Delegates to the trainer's close(), which is responsible for stopping
    any background training machinery it owns.
    """
    self._rf_trainer.close()

def __del__(self) -> None:
    """Best-effort cleanup when the random forest is garbage-collected."""
    # Finalizers may run during interpreter shutdown or after a partially
    # failed __init__ (when _rf_trainer may not exist); exceptions in
    # __del__ are ignored by CPython anyway, so suppress them deliberately.
    try:
        self.close()
    except Exception:
        pass

@property
def meta(self) -> dict[str, Any]: # noqa: D102
meta = super().meta
Expand Down
Loading

0 comments on commit f05d0bf

Please sign in to comment.