From aac676845ccb09a919f73a37ffb12a75231c39c6 Mon Sep 17 00:00:00 2001 From: Theodoros Katzalis Date: Mon, 9 Sep 2024 14:38:53 +0200 Subject: [PATCH 1/3] Close all sessions and free resources when a test completes Currently pytest hangs if a test fails, because resources are not cleaned up. Use a fixture to auto-clean during the teardown of the test. --- .../test_grpc/test_inference_servicer.py | 33 ++++++------------- tiktorch/server/device_pool.py | 32 ++++++++++++------ tiktorch/server/grpc/inference_servicer.py | 9 +++++ tiktorch/server/session_manager.py | 5 +++ 4 files changed, 46 insertions(+), 33 deletions(-) diff --git a/tests/test_server/test_grpc/test_inference_servicer.py b/tests/test_server/test_grpc/test_inference_servicer.py index 864c4d6a..c61f0710 100644 --- a/tests/test_server/test_grpc/test_inference_servicer.py +++ b/tests/test_server/test_grpc/test_inference_servicer.py @@ -27,6 +27,12 @@ def grpc_servicer(data_store): return inference_servicer.InferenceServicer(TorchDevicePool(), SessionManager(), data_store) +@pytest.fixture(autouse=True) +def clean(grpc_servicer): + yield + grpc_servicer.close_all_sessions() + + @pytest.fixture(scope="module") def grpc_stub_cls(grpc_channel): return inference_pb2_grpc.InferenceStub @@ -47,7 +53,6 @@ def method_requiring_session(self, request, grpc_stub): def test_model_session_creation(self, grpc_stub, bioimageio_model_bytes): model = grpc_stub.CreateModelSession(valid_model_request(bioimageio_model_bytes)) assert model.id - grpc_stub.CloseModelSession(model) def test_model_session_creation_using_upload_id(self, grpc_stub, data_store, bioimageio_dummy_explicit_model_bytes): id_ = data_store.put(bioimageio_dummy_explicit_model_bytes.getvalue()) @@ -55,7 +60,6 @@ def test_model_session_creation_using_upload_id(self, grpc_stub, data_store, bio rq = inference_pb2.CreateModelSessionRequest(model_uri=f"upload://{id_}", deviceIds=["cpu"]) model = grpc_stub.CreateModelSession(rq) assert model.id - grpc_stub.CloseModelSession(model) def test_model_session_creation_using_random_uri(self, grpc_stub): rq = inference_pb2.CreateModelSessionRequest(model_uri="randomSchema://", deviceIds=["cpu"]) @@ -92,36 +96,28 @@ def test_if_model_create_fails_devices_are_released(self, grpc_stub): model_blob=inference_pb2.Blob(content=b""), deviceIds=["cpu"] ) - model = None with pytest.raises(Exception): - model = grpc_stub.CreateModelSession(model_req) + grpc_stub.CreateModelSession(model_req) device_by_id = self._query_devices(grpc_stub) assert "cpu" in device_by_id assert inference_pb2.Device.Status.AVAILABLE == device_by_id["cpu"].status - if model: - grpc_stub.CloseModelSession(model) - def test_use_device(self, grpc_stub, bioimageio_model_bytes): device_by_id = self._query_devices(grpc_stub) assert "cpu" in device_by_id assert inference_pb2.Device.Status.AVAILABLE == device_by_id["cpu"].status - model = grpc_stub.CreateModelSession(valid_model_request(bioimageio_model_bytes, device_ids=["cpu"])) + grpc_stub.CreateModelSession(valid_model_request(bioimageio_model_bytes, device_ids=["cpu"])) device_by_id = self._query_devices(grpc_stub) assert "cpu" in device_by_id assert inference_pb2.Device.Status.IN_USE == device_by_id["cpu"].status - grpc_stub.CloseModelSession(model) - def test_using_same_device_fails(self, grpc_stub, bioimageio_model_bytes): - model = grpc_stub.CreateModelSession(valid_model_request(bioimageio_model_bytes, device_ids=["cpu"])) + grpc_stub.CreateModelSession(valid_model_request(bioimageio_model_bytes, device_ids=["cpu"])) with 
pytest.raises(grpc.RpcError): - model = grpc_stub.CreateModelSession(valid_model_request(bioimageio_model_bytes, device_ids=["cpu"])) - - grpc_stub.CloseModelSession(model) + grpc_stub.CreateModelSession(valid_model_request(bioimageio_model_bytes, device_ids=["cpu"])) def test_closing_session_releases_devices(self, grpc_stub, bioimageio_model_bytes): model = grpc_stub.CreateModelSession(valid_model_request(bioimageio_model_bytes, device_ids=["cpu"])) @@ -163,8 +159,6 @@ def test_call_predict_valid_explicit(self, grpc_stub, bioimageio_dummy_explicit_ input_tensors = [converters.xarray_to_pb_tensor(input_tensor_id, arr)] res = grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors)) - grpc_stub.CloseModelSession(model) - assert len(res.tensors) == 1 assert res.tensors[0].tensorId == output_tensor_id assert_array_equal(expected, converters.pb_tensor_to_numpy(res.tensors[0])) @@ -175,7 +169,6 @@ def test_call_predict_invalid_shape_explicit(self, grpc_stub, bioimageio_dummy_e input_tensors = [converters.xarray_to_pb_tensor("input", arr)] with pytest.raises(grpc.RpcError): grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors)) - grpc_stub.CloseModelSession(model) @pytest.mark.parametrize( "shape", @@ -187,7 +180,6 @@ def test_call_predict_invalid_shape_parameterized(self, grpc_stub, shape, bioima input_tensors = [converters.xarray_to_pb_tensor("param", arr)] with pytest.raises(grpc.RpcError): grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors)) - grpc_stub.CloseModelSession(model) def test_call_predict_invalid_tensor_ids(self, grpc_stub, bioimageio_dummy_model): model_bytes, _ = bioimageio_dummy_model @@ -197,7 +189,6 @@ def test_call_predict_invalid_tensor_ids(self, grpc_stub, bioimageio_dummy_model with pytest.raises(grpc.RpcError) as error: grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors)) assert error.value.details().startswith("Exception calling application: Spec invalidTensorName doesn't exist") - grpc_stub.CloseModelSession(model) def test_call_predict_invalid_axes(self, grpc_stub, bioimageio_dummy_model): model_bytes, tensor_id = bioimageio_dummy_model @@ -207,7 +198,6 @@ def test_call_predict_invalid_axes(self, grpc_stub, bioimageio_dummy_model): with pytest.raises(grpc.RpcError) as error: grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors)) assert error.value.details().startswith("Exception calling application: Incompatible axes") - grpc_stub.CloseModelSession(model) @pytest.mark.parametrize("shape", [(1, 1, 64, 64), (1, 1, 66, 65), (1, 1, 68, 66), (1, 1, 70, 67)]) def test_call_predict_valid_shape_parameterized(self, grpc_stub, shape, bioimageio_dummy_param_model_bytes): @@ -215,7 +205,6 @@ def test_call_predict_valid_shape_parameterized(self, grpc_stub, shape, bioimage arr = xr.DataArray(np.arange(np.prod(shape)).reshape(*shape), dims=("b", "c", "x", "y")) input_tensors = [converters.xarray_to_pb_tensor("param", arr)] grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors)) - grpc_stub.CloseModelSession(model) @pytest.mark.skip def test_call_predict_tf(self, grpc_stub, bioimageio_dummy_tensorflow_model_bytes): @@ -227,8 +216,6 @@ def test_call_predict_tf(self, grpc_stub, bioimageio_dummy_tensorflow_model_byte input_tensors = [converters.xarray_to_pb_tensor(input_tensor_id, arr)] res = 
grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors)) - grpc_stub.CloseModelSession(model) - assert len(res.tensors) == 1 assert res.tensors[0].tensorId == output_tensor_id assert_array_equal(expected, converters.pb_tensor_to_numpy(res.tensors[0])) diff --git a/tiktorch/server/device_pool.py b/tiktorch/server/device_pool.py index 4fee295e..610ef8d0 100644 --- a/tiktorch/server/device_pool.py +++ b/tiktorch/server/device_pool.py @@ -71,10 +71,22 @@ def cuda_version(self) -> Optional[str]: @abc.abstractmethod def list_devices(self) -> List[IDevice]: """ - List devices available on server + List devices on server """ ... + def list_available_devices(self) -> List[IDevice]: + """ + List available devices on server + """ + return [device for device in self.list_devices() if device.status == DeviceStatus.AVAILABLE] + + def list_reserved_devices(self) -> List[IDevice]: + """ + List reserved devices on server + """ + return [device for device in self.list_devices() if device.status == DeviceStatus.IN_USE] + @abc.abstractmethod def lease(self, device_ids: List[str]) -> ILease: """ @@ -116,8 +128,8 @@ def terminate(self) -> None: class TorchDevicePool(IDevicePool): def __init__(self): - self.__lease_id_by_device_id = {} - self.__device_ids_by_lease_id = defaultdict(list) + self.__device_id_to_lease_id = {} + self.__lease_id_to_device_ids = defaultdict(list) self.__lock = threading.Lock() @property @@ -142,7 +154,7 @@ def list_devices(self) -> List[IDevice]: devices: List[IDevice] = [] for id_ in ids: status = DeviceStatus.AVAILABLE - if id_ in self.__lease_id_by_device_id: + if id_ in self.__device_id_to_lease_id: status = DeviceStatus.IN_USE devices.append(_Device(id_=id_, status=status)) @@ -156,21 +168,21 @@ def lease(self, device_ids: List[str]) -> ILease: with self.__lock: lease_id = uuid.uuid4().hex for dev_id in device_ids: - if dev_id in self.__lease_id_by_device_id: + if dev_id in self.__device_id_to_lease_id: raise Exception(f"Device {dev_id} is already in use") for dev_id in device_ids: - self.__lease_id_by_device_id[dev_id] = lease_id - self.__device_ids_by_lease_id[lease_id].append(dev_id) + self.__device_id_to_lease_id[dev_id] = lease_id + self.__lease_id_to_device_ids[lease_id].append(dev_id) return _Lease(self, id_=lease_id) def _get_lease_devices(self, lease_id: str) -> List[IDevice]: - return [_Device(id_=dev_id, status=DeviceStatus.IN_USE) for dev_id in self.__device_ids_by_lease_id[lease_id]] + return [_Device(id_=dev_id, status=DeviceStatus.IN_USE) for dev_id in self.__lease_id_to_device_ids[lease_id]] def _release_devices(self, lease_id: str) -> None: with self.__lock: - dev_ids = self.__device_ids_by_lease_id.pop(lease_id, []) + dev_ids = self.__lease_id_to_device_ids.pop(lease_id, []) for id_ in dev_ids: - del self.__lease_id_by_device_id[id_] + del self.__device_id_to_lease_id[id_] diff --git a/tiktorch/server/grpc/inference_servicer.py b/tiktorch/server/grpc/inference_servicer.py index f09e0bae..987bb257 100644 --- a/tiktorch/server/grpc/inference_servicer.py +++ b/tiktorch/server/grpc/inference_servicer.py @@ -53,6 +53,15 @@ def CloseModelSession(self, request: inference_pb2.ModelSession, context) -> inf self.__session_manager.close_session(request.id) return inference_pb2.Empty() + def close_all_sessions(self): + """ + Not exposed by the API + + Close all sessions ensuring that all devices are not leased + """ + self.__session_manager.close_all_sessions() + assert len(self.__device_pool.list_reserved_devices()) == 0 + def 
GetLogs(self, request: inference_pb2.Empty, context): yield inference_pb2.LogEntry( timestamp=int(time.time()), level=inference_pb2.LogEntry.Level.INFO, content="Sending model logs" diff --git a/tiktorch/server/session_manager.py b/tiktorch/server/session_manager.py index 3807e130..6def4a4a 100644 --- a/tiktorch/server/session_manager.py +++ b/tiktorch/server/session_manager.py @@ -82,6 +82,11 @@ def close_session(self, session_id: str) -> None: logger.debug("Closed session %s", session_id) + def close_all_sessions(self): + all_ids = tuple(self.__session_by_id.keys()) + for session_id in all_ids: + self.close_session(session_id) + def __init__(self) -> None: self.__lock = threading.Lock() self.__session_by_id: Dict[str, Session] = {} From 5e75d2227133eadc13915d3d97719719273e0170 Mon Sep 17 00:00:00 2001 From: Theodoros Katzalis Date: Mon, 9 Sep 2024 14:42:53 +0200 Subject: [PATCH 2/3] Close connection and thread when model session exits Currently when creating a pipe, the server doesn't close the connection nor the created thread when the listening loop exits. This results to EOFError as we can see in the output when running the tests. --- tiktorch/rpc/mp.py | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/tiktorch/rpc/mp.py b/tiktorch/rpc/mp.py index 0cd69bf4..992ed761 100644 --- a/tiktorch/rpc/mp.py +++ b/tiktorch/rpc/mp.py @@ -270,7 +270,8 @@ def __init__(self, api, conn: Connection): self._logger = None self._conn = conn self._results_queue = queue.Queue() - self._start_result_sender(conn) + self._start_result_sender = threading.Thread(target=self._sender, name="MPResultSender") + self._start_result_sender.start() @property def logger(self): @@ -281,20 +282,17 @@ def logger(self): def _send(self, msg): self._results_queue.put(msg) - def _start_result_sender(self, conn): - def _sender(): - while True: - try: - result = self._results_queue.get() - if result is self._sentinel: - break - - conn.send(result) - except Exception: - self.logger.exception("Error in result sender") + def _sender(self): + while True: + try: + result = self._results_queue.get() + if result is self._sentinel: + self.logger.debug("[MPServer] result sender thread stopped") + break - t = threading.Thread(target=_sender, name="MPResultSender") - t.start() + self._conn.send(result) + except Exception: + self.logger.exception("Error in result sender") def _send_result(self, fut): if fut.cancelled(): @@ -317,7 +315,7 @@ def _make_future(self): return f def _call_method(self, call: MethodCall): - self.logger.debug("[id: %s] Recieved '%s' method call", call.id, call.method_name) + self.logger.debug("[id: %s] Received '%s' method call", call.id, call.method_name) fut = self._make_future() self._futures.put(call.id, fut) @@ -341,7 +339,7 @@ def _call_method(self, call: MethodCall): return fut def _cancel_request(self, cancel: Cancellation): - self.logger.debug("[id: %s] Recieved cancel request", cancel.id) + self.logger.debug("[id: %s] Received cancel request", cancel.id) fut = self._futures.pop_id(cancel.id) if fut: fut.cancel() @@ -373,3 +371,7 @@ def listen(self): break except Exception: self.logger.error("Error in main loop", exc_info=1) + + self._start_result_sender.join() + self._conn.close() + self.logger.debug("Closing connection") From a0d5695967ac9845629372945c1417815424f18a Mon Sep 17 00:00:00 2001 From: Theodoros Katzalis Date: Thu, 5 Sep 2024 10:03:30 +0200 Subject: [PATCH 3/3] Fix Makefile - remove cleaning build files, `pip install -e` should be used - `pip 
install -e` doesn't work with multiple files, use a for loop to install the submodules --- .circleci/config.yml | 30 +++++++++++++----------------- Makefile | 24 +++++++++--------------- 2 files changed, 22 insertions(+), 32 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 75a82c6e..cbba3f39 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -2,7 +2,7 @@ version: 2 jobs: checkout_code: docker: - - image: condaforge/mambaforge + - image: condaforge/miniforge3 working_directory: ~/repo steps: - checkout @@ -18,7 +18,7 @@ jobs: TIKTORCH_ENV_NAME: tiktorch-server-env TIKTORCH_ENV_PREFIX: /opt/conda/envs/tiktorch-server-env docker: - - image: condaforge/mambaforge + - image: condaforge/miniforge3 working_directory: ~/repo steps: - restore_cache: @@ -26,25 +26,21 @@ jobs: - v1-repo-{{ .Environment.CIRCLE_SHA1 }} - restore_cache: keys: - - v11-dependencies-{{ checksum "environment.yml" }} - + - v11-dependencies-{{ .Environment.CIRCLE_SHA1 }} - run: conda config --set channel_priority strict - - run: mamba update -n base -c conda-forge --update-all - - run: mamba install -c conda-forge conda-build make boa + - run: conda update -n base -c conda-forge --update-all + - run: conda install -c conda-forge conda-build make boa - run: | - if [ ! -d ${TIKTORCH_ENV_PREFIX} ]; then echo "Creating new environment ${TIKTORCH_ENV_NAME}" make devenv - fi - - save_cache: paths: - /opt/conda/envs - key: v11-dependencies-{{ checksum "environment.yml" }} + key: v11-dependencies-{{ .Environment.CIRCLE_SHA1 }} pre_commit_check: docker: - - image: condaforge/mambaforge + - image: condaforge/miniforge3 working_directory: ~/repo steps: - restore_cache: @@ -52,7 +48,7 @@ jobs: - v1-repo-{{ .Environment.CIRCLE_SHA1 }} - restore_cache: keys: - - v11-dependencies-{{ checksum "environment.yml" }} + - v11-dependencies-{{ .Environment.CIRCLE_SHA1 }} - run: name: run pre-commit @@ -63,7 +59,7 @@ jobs: tests: docker: - - image: condaforge/mambaforge + - image: condaforge/miniforge3 working_directory: ~/repo steps: - restore_cache: @@ -71,7 +67,7 @@ jobs: - v1-repo-{{ .Environment.CIRCLE_SHA1 }} - restore_cache: keys: - - v11-dependencies-{{ checksum "environment.yml" }} + - v11-dependencies-{{ .Environment.CIRCLE_SHA1 }} - run: name: run tests @@ -83,15 +79,15 @@ jobs: build_conda_packages: docker: - - image: condaforge/mambaforge + - image: condaforge/miniforge3 working_directory: ~/repo steps: - restore_cache: keys: - v1-repo-{{ .Environment.CIRCLE_SHA1 }} - - run: mamba config --set channel_priority strict - - run: mamba install -c conda-forge conda-build anaconda-client boa + - run: conda config --set channel_priority strict + - run: conda install -c conda-forge conda-build anaconda-client boa - run: name: build packages command: | diff --git a/Makefile b/Makefile index f35813ed..d7e516d7 100644 --- a/Makefile +++ b/Makefile @@ -1,18 +1,7 @@ SHELL=/bin/bash ROOT_DIR:=$(shell dirname $(realpath $(firstword $(MAKEFILE_LIST)))) TIKTORCH_ENV_NAME ?= tiktorch-server-env - -sample_model: - cd tests/data/dummy && zip -r $(ROOT_DIR)/dummy.tmodel ./* - -unet2d: - cd tests/data/unet2d && zip -r $(ROOT_DIR)/unet2d.tmodel ./* - -unet2d_onnx: - cd tests/data/unet2d_onnx && zip -r $(ROOT_DIR)/onnx.tmodel ./* - -dummy_tf: - cd tests/data/dummy_tensorflow && zip -r $(ROOT_DIR)/dummy_tf.tmodel ./* +SUBMODULES = ./vendor/core-bioimage-io-python ./vendor/spec-bioimage-io protos: python -m grpc_tools.protoc -I./proto --python_out=tiktorch/proto/ --grpc_python_out=tiktorch/proto/ ./proto/*.proto @@ -21,18 
+10,23 @@ protos: version: python -c "import sys; print(sys.version)" - devenv: . $$(conda info --base)/etc/profile.d/conda.sh mamba env create --file environment.yml --name $(TIKTORCH_ENV_NAME) - conda run -n $(TIKTORCH_ENV_NAME) pip install . ./vendor/core-bioimage-io-python ./vendor/spec-bioimage-io + make install_submodules run_server: . $$(conda info --base)/etc/profile.d/conda.sh; conda activate $(TIKTORCH_ENV_NAME); python -m tiktorch.server +install_submodules: + @echo "Installing submodules $(SUBMODULES)" + @for package in $(SUBMODULES) ; do \ + echo $$package ; \ + conda run -n $(TIKTORCH_ENV_NAME) pip install -e $$package ; \ + done remove_devenv: conda env remove --yes --name $(TIKTORCH_ENV_NAME) -.PHONY: protos version sample_model devenv remove_devenv dummy_tf +.PHONY: *
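
The cleanup introduced in PATCH 1/3 follows the standard pytest teardown pattern: an autouse fixture yields control to the test, and everything after the yield runs as teardown even when the test body fails, so leased resources are always released. A minimal, self-contained sketch of that pattern, using a hypothetical Servicer stand-in (illustrative names, not the real tiktorch servicer):

import pytest


class Servicer:
    """Stand-in for a gRPC servicer that leases resources per session (illustrative only)."""

    def __init__(self):
        self.sessions = []

    def create_session(self):
        self.sessions.append(object())

    def close_all_sessions(self):
        self.sessions.clear()


@pytest.fixture
def servicer():
    return Servicer()


@pytest.fixture(autouse=True)
def clean(servicer):
    # Everything before `yield` is setup, everything after it is teardown;
    # the teardown runs even if the test body raises.
    yield
    servicer.close_all_sessions()


def test_sessions_released_even_if_test_fails(servicer):
    servicer.create_session()
    assert servicer.sessions  # `clean` closes the session during teardown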
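
The shutdown added in PATCH 2/3 is the usual sentinel pattern for a queue-fed worker thread: the owner enqueues a sentinel, the worker breaks out of its loop, and the owner then joins the thread and closes the connection so nothing is left dangling when the listening loop exits. A minimal sketch of that pattern with toy names (Sender here is illustrative, not the tiktorch MPServer API):

import queue
import threading
from multiprocessing import Pipe

_SENTINEL = object()


class Sender:
    """Drains a queue onto a connection until a sentinel arrives (illustrative only)."""

    def __init__(self, conn):
        self._conn = conn
        self._queue = queue.Queue()
        self._thread = threading.Thread(target=self._drain, name="ResultSender")
        self._thread.start()

    def send(self, item):
        self._queue.put(item)

    def _drain(self):
        while True:
            item = self._queue.get()
            if item is _SENTINEL:
                break  # stop cleanly instead of leaving the thread alive
            self._conn.send(item)

    def close(self):
        # Unblock the worker, wait for it, then release the connection.
        self._queue.put(_SENTINEL)
        self._thread.join()
        self._conn.close()


parent, child = Pipe()
sender = Sender(child)
sender.send("result")
print(parent.recv())  # "result"
sender.close()  # joins the thread and closes the child end of the pipe;
                # a further parent.recv() now raises EOFError instead of hanging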