Skip to content

Commit

Permalink
Check if handshake finished before term zmq ctx
Browse files Browse the repository at this point in the history
  • Loading branch information
pziecina-nv committed Nov 10, 2023
1 parent 0bc9ea5 commit 36c4bec
Showing 1 changed file with 24 additions and 12 deletions.
36 changes: 24 additions & 12 deletions pytriton/models/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ def __init__(
ipc_socket_path = self._workspace.path / f"ipc_proxy_backend_{model_name}"
self._shared_memory_socket = f"ipc://{ipc_socket_path.as_posix()}"
self._data_store_socket = self._workspace.path / "data_store.sock"
self._handshake_thread: Optional[threading.Thread] = None
self._shutdown_event = threading.Event()
self._triton_model_config: Optional[TritonModelConfig] = None
self._model_events_observers: typing.List[ModelEventsHandler] = []

Expand Down Expand Up @@ -204,11 +206,16 @@ def setup(self) -> None:
inference_handler.on_proxy_backend_event(self._on_proxy_backend_event)
inference_handler.start()
self._inference_handlers.append(inference_handler)
handshake_th = threading.Thread(target=self._model_proxy_handshake, daemon=True)
handshake_th.start()
self._handshake_thread = threading.Thread(target=self._model_proxy_handshake, daemon=True)
self._handshake_thread.start()

def clean(self) -> None:
"""Post unload actions to perform on model."""
self._shutdown_event.set()
if self._handshake_thread is not None:
self._handshake_thread.join()
self._handshake_thread = None

with self._observers_lock:
LOGGER.debug("Clearing model events observers")
self._model_events_observers.clear()
Expand Down Expand Up @@ -284,18 +291,23 @@ def _get_triton_model_config(self) -> TritonModelConfig:
def _model_proxy_handshake(self) -> None:
socket = self.zmq_context.socket(zmq.REP)
socket.bind(self._shared_memory_socket)

try:
for i in range(len(self.infer_functions)):
socket.recv()
authkey = multiprocessing.current_process().authkey
instance_data = {
"shared-memory-socket": f"{self._shared_memory_socket}_{i}",
"data-store-socket": self._data_store_socket.as_posix(),
"auth-key": base64.b64encode(authkey).decode("utf-8"),
}
json_payload = json.dumps(instance_data)
socket.send_string(json_payload)
while not self._shutdown_event.is_set():
ready_to_read, _, _ = zmq.select([socket], [], [], 0.1)
if not ready_to_read:
continue

socket.recv()
authkey = multiprocessing.current_process().authkey
instance_data = {
"shared-memory-socket": f"{self._shared_memory_socket}_{i}",
"data-store-socket": self._data_store_socket.as_posix(),
"auth-key": base64.b64encode(authkey).decode("utf-8"),
}
json_payload = json.dumps(instance_data)
socket.send_string(json_payload)
break
except Exception as exception:
LOGGER.error("Internal proxy backend error. It will be closed.")
LOGGER.exception(exception)
Expand Down

0 comments on commit 36c4bec

Please sign in to comment.