From 36c4bec96a643d6fa0b616848071198ff62b2a68 Mon Sep 17 00:00:00 2001 From: Pawel Ziecina Date: Thu, 9 Nov 2023 23:56:20 +0000 Subject: [PATCH] Check if handshake finished before term zmq ctx --- pytriton/models/model.py | 36 ++++++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/pytriton/models/model.py b/pytriton/models/model.py index a8f000e..365200d 100644 --- a/pytriton/models/model.py +++ b/pytriton/models/model.py @@ -121,6 +121,8 @@ def __init__( ipc_socket_path = self._workspace.path / f"ipc_proxy_backend_{model_name}" self._shared_memory_socket = f"ipc://{ipc_socket_path.as_posix()}" self._data_store_socket = self._workspace.path / "data_store.sock" + self._handshake_thread: Optional[threading.Thread] = None + self._shutdown_event = threading.Event() self._triton_model_config: Optional[TritonModelConfig] = None self._model_events_observers: typing.List[ModelEventsHandler] = [] @@ -204,11 +206,16 @@ def setup(self) -> None: inference_handler.on_proxy_backend_event(self._on_proxy_backend_event) inference_handler.start() self._inference_handlers.append(inference_handler) - handshake_th = threading.Thread(target=self._model_proxy_handshake, daemon=True) - handshake_th.start() + self._handshake_thread = threading.Thread(target=self._model_proxy_handshake, daemon=True) + self._handshake_thread.start() def clean(self) -> None: """Post unload actions to perform on model.""" + self._shutdown_event.set() + if self._handshake_thread is not None: + self._handshake_thread.join() + self._handshake_thread = None + with self._observers_lock: LOGGER.debug("Clearing model events observers") self._model_events_observers.clear() @@ -284,18 +291,23 @@ def _get_triton_model_config(self) -> TritonModelConfig: def _model_proxy_handshake(self) -> None: socket = self.zmq_context.socket(zmq.REP) socket.bind(self._shared_memory_socket) - try: for i in range(len(self.infer_functions)): - socket.recv() - authkey = multiprocessing.current_process().authkey - instance_data = { - "shared-memory-socket": f"{self._shared_memory_socket}_{i}", - "data-store-socket": self._data_store_socket.as_posix(), - "auth-key": base64.b64encode(authkey).decode("utf-8"), - } - json_payload = json.dumps(instance_data) - socket.send_string(json_payload) + while not self._shutdown_event.is_set(): + ready_to_read, _, _ = zmq.select([socket], [], [], 0.1) + if not ready_to_read: + continue + + socket.recv() + authkey = multiprocessing.current_process().authkey + instance_data = { + "shared-memory-socket": f"{self._shared_memory_socket}_{i}", + "data-store-socket": self._data_store_socket.as_posix(), + "auth-key": base64.b64encode(authkey).decode("utf-8"), + } + json_payload = json.dumps(instance_data) + socket.send_string(json_payload) + break except Exception as exception: LOGGER.error("Internal proxy backend error. It will be closed.") LOGGER.exception(exception)