Merge pull request #218 from thodkatz/fix-eof
Proper test cleanup and resolve EOF Error
thodkatz authored Sep 12, 2024
2 parents cdb2c4a + a0d5695 commit 3eb9148
Showing 7 changed files with 86 additions and 81 deletions.
30 changes: 13 additions & 17 deletions .circleci/config.yml
@@ -2,7 +2,7 @@ version: 2
 jobs:
   checkout_code:
     docker:
-      - image: condaforge/mambaforge
+      - image: condaforge/miniforge3
     working_directory: ~/repo
     steps:
      - checkout
@@ -18,41 +18,37 @@ jobs:
       TIKTORCH_ENV_NAME: tiktorch-server-env
       TIKTORCH_ENV_PREFIX: /opt/conda/envs/tiktorch-server-env
     docker:
-      - image: condaforge/mambaforge
+      - image: condaforge/miniforge3
     working_directory: ~/repo
     steps:
      - restore_cache:
          keys:
            - v1-repo-{{ .Environment.CIRCLE_SHA1 }}
      - restore_cache:
          keys:
-           - v11-dependencies-{{ checksum "environment.yml" }}
-
+           - v11-dependencies-{{ .Environment.CIRCLE_SHA1 }}
      - run: conda config --set channel_priority strict
-     - run: mamba update -n base -c conda-forge --update-all
-     - run: mamba install -c conda-forge conda-build make boa
+     - run: conda update -n base -c conda-forge --update-all
+     - run: conda install -c conda-forge conda-build make boa
      - run: |
          if [ ! -d ${TIKTORCH_ENV_PREFIX} ]; then
            echo "Creating new environment ${TIKTORCH_ENV_NAME}"
            make devenv
          fi
      - save_cache:
          paths:
            - /opt/conda/envs
-         key: v11-dependencies-{{ checksum "environment.yml" }}
+         key: v11-dependencies-{{ .Environment.CIRCLE_SHA1 }}
 
  pre_commit_check:
    docker:
-      - image: condaforge/mambaforge
+      - image: condaforge/miniforge3
    working_directory: ~/repo
    steps:
      - restore_cache:
          keys:
            - v1-repo-{{ .Environment.CIRCLE_SHA1 }}
      - restore_cache:
          keys:
-           - v11-dependencies-{{ checksum "environment.yml" }}
+           - v11-dependencies-{{ .Environment.CIRCLE_SHA1 }}
 
      - run:
          name: run pre-commit
@@ -63,15 +59,15 @@ jobs:
  tests:
    docker:
-      - image: condaforge/mambaforge
+      - image: condaforge/miniforge3
    working_directory: ~/repo
    steps:
      - restore_cache:
          keys:
            - v1-repo-{{ .Environment.CIRCLE_SHA1 }}
      - restore_cache:
          keys:
-           - v11-dependencies-{{ checksum "environment.yml" }}
+           - v11-dependencies-{{ .Environment.CIRCLE_SHA1 }}
 
      - run:
          name: run tests
@@ -83,15 +79,15 @@ jobs:
  build_conda_packages:
    docker:
-      - image: condaforge/mambaforge
+      - image: condaforge/miniforge3
    working_directory: ~/repo
    steps:
      - restore_cache:
          keys:
            - v1-repo-{{ .Environment.CIRCLE_SHA1 }}
 
-     - run: mamba config --set channel_priority strict
-     - run: mamba install -c conda-forge conda-build anaconda-client boa
+     - run: conda config --set channel_priority strict
+     - run: conda install -c conda-forge conda-build anaconda-client boa
      - run:
          name: build packages
          command: |
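Note on the cache change: {{ checksum "environment.yml" }} only varies when environment.yml itself changes, while {{ .Environment.CIRCLE_SHA1 }} varies with every commit, so the v11-dependencies cache is now rebuilt per commit. Presumably this is because the environment also receives editable installs of the vendored submodules (see the Makefile change below), so its contents can change without environment.yml changing.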
24 changes: 9 additions & 15 deletions Makefile
@@ -1,18 +1,7 @@
 SHELL=/bin/bash
 ROOT_DIR:=$(shell dirname $(realpath $(firstword $(MAKEFILE_LIST))))
 TIKTORCH_ENV_NAME ?= tiktorch-server-env
 
-sample_model:
-	cd tests/data/dummy && zip -r $(ROOT_DIR)/dummy.tmodel ./*
-
-unet2d:
-	cd tests/data/unet2d && zip -r $(ROOT_DIR)/unet2d.tmodel ./*
-
-unet2d_onnx:
-	cd tests/data/unet2d_onnx && zip -r $(ROOT_DIR)/onnx.tmodel ./*
-
-dummy_tf:
-	cd tests/data/dummy_tensorflow && zip -r $(ROOT_DIR)/dummy_tf.tmodel ./*
+SUBMODULES = ./vendor/core-bioimage-io-python ./vendor/spec-bioimage-io
 
 protos:
 	python -m grpc_tools.protoc -I./proto --python_out=tiktorch/proto/ --grpc_python_out=tiktorch/proto/ ./proto/*.proto
@@ -21,18 +10,23 @@ protos:
 version:
 	python -c "import sys; print(sys.version)"
 
-
 devenv:
 	. $$(conda info --base)/etc/profile.d/conda.sh
 	mamba env create --file environment.yml --name $(TIKTORCH_ENV_NAME)
-	conda run -n $(TIKTORCH_ENV_NAME) pip install . ./vendor/core-bioimage-io-python ./vendor/spec-bioimage-io
+	make install_submodules
 
 run_server:
 	. $$(conda info --base)/etc/profile.d/conda.sh; conda activate $(TIKTORCH_ENV_NAME); python -m tiktorch.server
 
+install_submodules:
+	@echo "Installing submodules $(SUBMODULES)"
+	@for package in $(SUBMODULES) ; do \
+		echo $$package ; \
+		conda run -n $(TIKTORCH_ENV_NAME) pip install -e $$package ; \
+	done
+
 remove_devenv:
 	conda env remove --yes --name $(TIKTORCH_ENV_NAME)
 
-
-.PHONY: protos version sample_model devenv remove_devenv dummy_tf
+.PHONY: *
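For reference, a rough Python equivalent of the new install_submodules target; this is a sketch only, useful where make is unavailable, and the environment name and submodule paths simply mirror the Makefile variables above.

import subprocess

SUBMODULES = ["./vendor/core-bioimage-io-python", "./vendor/spec-bioimage-io"]
TIKTORCH_ENV_NAME = "tiktorch-server-env"

for package in SUBMODULES:
    print(package)
    # 'conda run -n <env> <cmd>' executes the command inside the named conda
    # environment without activating it; 'pip install -e' installs the
    # vendored submodule editable, so checkout changes take effect directly.
    subprocess.run(
        ["conda", "run", "-n", TIKTORCH_ENV_NAME, "pip", "install", "-e", package],
        check=True,  # fail loudly, like a failing recipe line would
    )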
33 changes: 10 additions & 23 deletions tests/test_server/test_grpc/test_inference_servicer.py
@@ -27,6 +27,12 @@ def grpc_servicer(data_store):
     return inference_servicer.InferenceServicer(TorchDevicePool(), SessionManager(), data_store)
 
 
+@pytest.fixture(autouse=True)
+def clean(grpc_servicer):
+    yield
+    grpc_servicer.close_all_sessions()
+
+
 @pytest.fixture(scope="module")
 def grpc_stub_cls(grpc_channel):
     return inference_pb2_grpc.InferenceStub
@@ -47,15 +53,13 @@ def method_requiring_session(self, request, grpc_stub):
     def test_model_session_creation(self, grpc_stub, bioimageio_model_bytes):
         model = grpc_stub.CreateModelSession(valid_model_request(bioimageio_model_bytes))
         assert model.id
-        grpc_stub.CloseModelSession(model)
 
     def test_model_session_creation_using_upload_id(self, grpc_stub, data_store, bioimageio_dummy_explicit_model_bytes):
         id_ = data_store.put(bioimageio_dummy_explicit_model_bytes.getvalue())
 
         rq = inference_pb2.CreateModelSessionRequest(model_uri=f"upload://{id_}", deviceIds=["cpu"])
         model = grpc_stub.CreateModelSession(rq)
         assert model.id
-        grpc_stub.CloseModelSession(model)
 
     def test_model_session_creation_using_random_uri(self, grpc_stub):
         rq = inference_pb2.CreateModelSessionRequest(model_uri="randomSchema://", deviceIds=["cpu"])
@@ -92,36 +96,28 @@ def test_if_model_create_fails_devices_are_released(self, grpc_stub):
             model_blob=inference_pb2.Blob(content=b""), deviceIds=["cpu"]
         )
 
-        model = None
         with pytest.raises(Exception):
-            model = grpc_stub.CreateModelSession(model_req)
+            grpc_stub.CreateModelSession(model_req)
 
         device_by_id = self._query_devices(grpc_stub)
         assert "cpu" in device_by_id
         assert inference_pb2.Device.Status.AVAILABLE == device_by_id["cpu"].status
 
-        if model:
-            grpc_stub.CloseModelSession(model)
-
     def test_use_device(self, grpc_stub, bioimageio_model_bytes):
         device_by_id = self._query_devices(grpc_stub)
         assert "cpu" in device_by_id
         assert inference_pb2.Device.Status.AVAILABLE == device_by_id["cpu"].status
 
-        model = grpc_stub.CreateModelSession(valid_model_request(bioimageio_model_bytes, device_ids=["cpu"]))
+        grpc_stub.CreateModelSession(valid_model_request(bioimageio_model_bytes, device_ids=["cpu"]))
 
         device_by_id = self._query_devices(grpc_stub)
         assert "cpu" in device_by_id
         assert inference_pb2.Device.Status.IN_USE == device_by_id["cpu"].status
 
-        grpc_stub.CloseModelSession(model)
-
     def test_using_same_device_fails(self, grpc_stub, bioimageio_model_bytes):
-        model = grpc_stub.CreateModelSession(valid_model_request(bioimageio_model_bytes, device_ids=["cpu"]))
+        grpc_stub.CreateModelSession(valid_model_request(bioimageio_model_bytes, device_ids=["cpu"]))
         with pytest.raises(grpc.RpcError):
-            model = grpc_stub.CreateModelSession(valid_model_request(bioimageio_model_bytes, device_ids=["cpu"]))
-
-        grpc_stub.CloseModelSession(model)
+            grpc_stub.CreateModelSession(valid_model_request(bioimageio_model_bytes, device_ids=["cpu"]))
 
     def test_closing_session_releases_devices(self, grpc_stub, bioimageio_model_bytes):
         model = grpc_stub.CreateModelSession(valid_model_request(bioimageio_model_bytes, device_ids=["cpu"]))
@@ -163,8 +159,6 @@ def test_call_predict_valid_explicit(self, grpc_stub, bioimageio_dummy_explicit_model_bytes):
         input_tensors = [converters.xarray_to_pb_tensor(input_tensor_id, arr)]
         res = grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors))
 
-        grpc_stub.CloseModelSession(model)
-
         assert len(res.tensors) == 1
         assert res.tensors[0].tensorId == output_tensor_id
         assert_array_equal(expected, converters.pb_tensor_to_numpy(res.tensors[0]))
@@ -175,7 +169,6 @@ def test_call_predict_invalid_shape_explicit(self, grpc_stub, bioimageio_dummy_explicit_model_bytes):
         input_tensors = [converters.xarray_to_pb_tensor("input", arr)]
         with pytest.raises(grpc.RpcError):
             grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors))
-        grpc_stub.CloseModelSession(model)
 
     @pytest.mark.parametrize(
         "shape",
@@ -187,7 +180,6 @@ def test_call_predict_invalid_shape_parameterized(self, grpc_stub, shape, bioimageio_dummy_param_model_bytes):
         input_tensors = [converters.xarray_to_pb_tensor("param", arr)]
         with pytest.raises(grpc.RpcError):
             grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors))
-        grpc_stub.CloseModelSession(model)
 
     def test_call_predict_invalid_tensor_ids(self, grpc_stub, bioimageio_dummy_model):
         model_bytes, _ = bioimageio_dummy_model
@@ -197,7 +189,6 @@ def test_call_predict_invalid_tensor_ids(self, grpc_stub, bioimageio_dummy_model):
         with pytest.raises(grpc.RpcError) as error:
             grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors))
         assert error.value.details().startswith("Exception calling application: Spec invalidTensorName doesn't exist")
-        grpc_stub.CloseModelSession(model)
 
     def test_call_predict_invalid_axes(self, grpc_stub, bioimageio_dummy_model):
         model_bytes, tensor_id = bioimageio_dummy_model
@@ -207,15 +198,13 @@ def test_call_predict_invalid_axes(self, grpc_stub, bioimageio_dummy_model):
         with pytest.raises(grpc.RpcError) as error:
             grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors))
         assert error.value.details().startswith("Exception calling application: Incompatible axes")
-        grpc_stub.CloseModelSession(model)
 
     @pytest.mark.parametrize("shape", [(1, 1, 64, 64), (1, 1, 66, 65), (1, 1, 68, 66), (1, 1, 70, 67)])
     def test_call_predict_valid_shape_parameterized(self, grpc_stub, shape, bioimageio_dummy_param_model_bytes):
         model = grpc_stub.CreateModelSession(valid_model_request(bioimageio_dummy_param_model_bytes))
         arr = xr.DataArray(np.arange(np.prod(shape)).reshape(*shape), dims=("b", "c", "x", "y"))
         input_tensors = [converters.xarray_to_pb_tensor("param", arr)]
         grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors))
-        grpc_stub.CloseModelSession(model)
 
     @pytest.mark.skip
     def test_call_predict_tf(self, grpc_stub, bioimageio_dummy_tensorflow_model_bytes):
@@ -227,8 +216,6 @@ def test_call_predict_tf(self, grpc_stub, bioimageio_dummy_tensorflow_model_bytes):
         input_tensors = [converters.xarray_to_pb_tensor(input_tensor_id, arr)]
         res = grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors))
 
-        grpc_stub.CloseModelSession(model)
-
         assert len(res.tensors) == 1
         assert res.tensors[0].tensorId == output_tensor_id
         assert_array_equal(expected, converters.pb_tensor_to_numpy(res.tensors[0]))
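The deleted CloseModelSession calls are safe to drop because of the new autouse fixture at the top of this file: pytest runs everything after a fixture's yield as teardown, even when the test body raises, and autouse=True attaches the fixture to every test in scope. A minimal self-contained sketch of that idiom (the Manager class is illustrative, not tiktorch's servicer):

import pytest

class Manager:
    """Illustrative stand-in for the servicer's session bookkeeping."""
    def __init__(self):
        self.sessions = []

    def open_session(self):
        self.sessions.append(object())

    def close_all_sessions(self):
        self.sessions.clear()

manager = Manager()

@pytest.fixture(autouse=True)
def clean():
    yield                         # the test body runs at this point
    manager.close_all_sessions()  # teardown runs even if the test failed

def test_session_is_cleaned_up():
    manager.open_session()
    assert len(manager.sessions) == 1  # 'clean' empties it after the test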
34 changes: 18 additions & 16 deletions tiktorch/rpc/mp.py
@@ -270,7 +270,8 @@ def __init__(self, api, conn: Connection):
         self._logger = None
         self._conn = conn
         self._results_queue = queue.Queue()
-        self._start_result_sender(conn)
+        self._start_result_sender = threading.Thread(target=self._sender, name="MPResultSender")
+        self._start_result_sender.start()
 
     @property
     def logger(self):
@@ -281,20 +282,17 @@ def logger(self):
     def _send(self, msg):
         self._results_queue.put(msg)
 
-    def _start_result_sender(self, conn):
-        def _sender():
-            while True:
-                try:
-                    result = self._results_queue.get()
-                    if result is self._sentinel:
-                        break
-
-                    conn.send(result)
-                except Exception:
-                    self.logger.exception("Error in result sender")
+    def _sender(self):
+        while True:
+            try:
+                result = self._results_queue.get()
+                if result is self._sentinel:
+                    self.logger.debug("[MPServer] result sender thread stopped")
+                    break
 
-        t = threading.Thread(target=_sender, name="MPResultSender")
-        t.start()
+                self._conn.send(result)
+            except Exception:
+                self.logger.exception("Error in result sender")
 
     def _send_result(self, fut):
         if fut.cancelled():
Expand All @@ -317,7 +315,7 @@ def _make_future(self):
return f

def _call_method(self, call: MethodCall):
self.logger.debug("[id: %s] Recieved '%s' method call", call.id, call.method_name)
self.logger.debug("[id: %s] Received '%s' method call", call.id, call.method_name)
fut = self._make_future()
self._futures.put(call.id, fut)

@@ -341,7 +339,7 @@ def _call_method(self, call: MethodCall):
         return fut
 
     def _cancel_request(self, cancel: Cancellation):
-        self.logger.debug("[id: %s] Recieved cancel request", cancel.id)
+        self.logger.debug("[id: %s] Received cancel request", cancel.id)
         fut = self._futures.pop_id(cancel.id)
         if fut:
             fut.cancel()
@@ -373,3 +371,7 @@ def listen(self):
                 break
             except Exception:
                 self.logger.error("Error in main loop", exc_info=1)
+
+        self._start_result_sender.join()
+        self._conn.close()
+        self.logger.debug("Closing connection")
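Taken together, the mp.py changes make shutdown deterministic: the sender thread is kept as an attribute, listen() joins it once the main loop exits so queued results are flushed, and only then is the connection closed; this ordering is presumably what resolves the EOF error named in the title. A minimal self-contained sketch of the pattern (names are illustrative, not the tiktorch API):

import queue
import threading
from multiprocessing import Pipe

_SENTINEL = object()  # unique marker; never equal to a real result

class Sender:
    def __init__(self, conn):
        self._conn = conn
        self._queue = queue.Queue()
        self._thread = threading.Thread(target=self._run, name="ResultSender")
        self._thread.start()

    def send(self, result):
        self._queue.put(result)

    def _run(self):
        while True:
            result = self._queue.get()
            if result is _SENTINEL:
                break  # everything queued before shutdown is already sent
            self._conn.send(result)

    def shutdown(self):
        self._queue.put(_SENTINEL)  # ask the worker to drain, then stop
        self._thread.join()         # wait for the flush to complete
        self._conn.close()          # close only after the last send

# Usage: the receiving end sees every message, then a clean end-of-stream.
parent, child = Pipe()
s = Sender(child)
s.send("a")
s.send("b")
s.shutdown()
print(parent.recv(), parent.recv())  # -> a b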
[Diffs for the remaining 3 changed files did not load.]
