diff --git a/Dockerfile b/Dockerfile index 9a75164b..d2228a4f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ -#FROM python:3.10 AS hsds-base -FROM hdfgroup/hdf5lib:1.14.0 as hsds-base +FROM python:3.10 AS hsds-base +# FROM hdfgroup/hdf5lib:1.14.0 as hsds-base # Install Curl RUN apt-get update; apt-get -y install curl diff --git a/admin/config/config.yml b/admin/config/config.yml index 2d680467..4b1c6231 100755 --- a/admin/config/config.yml +++ b/admin/config/config.yml @@ -40,6 +40,7 @@ store_read_timeout: 1 # time to cancel storage read request if no response store_read_sleep_interval: 0.1 # time to sleep between checking on read request max_pending_write_requests: 20 # maxium number of inflight write requests flush_sleep_interval: 1 # time to wait between checking on dirty objects +flush_timeout: 10 # max time to wait on all I/O operations to complete for a flush min_chunk_size: 1m # 1 MB max_chunk_size: 4m # 4 MB max_request_size: 100m # 100 MB - should be no smaller than client_max_body_size in nginx tmpl (if using nginx) diff --git a/hsds/chunk_sn.py b/hsds/chunk_sn.py index cd102e46..14588d91 100755 --- a/hsds/chunk_sn.py +++ b/hsds/chunk_sn.py @@ -1040,7 +1040,7 @@ async def GET_Value(request): resp_body = await jsonResponse( resp, resp_json, ignore_nan=ignore_nan, body_only=True ) - log.debug(f"jsonResponse returned: {resp_body}") + log.debug(f"jsonResponse returned: {len(resp_body)} items") resp_body = resp_body.encode("utf-8") await resp.write(resp_body) await resp.write_eof() @@ -1296,7 +1296,7 @@ async def POST_Value(request): resp_body = await jsonResponse( resp, resp_json, ignore_nan=ignore_nan, body_only=True ) - log.debug(f"jsonResponse returned: {resp_body}") + log.debug(f"jsonResponse returned: {len(resp_body)} items") resp_body = resp_body.encode("utf-8") await resp.write(resp_body) except Exception as e: diff --git a/hsds/datanode_lib.py b/hsds/datanode_lib.py index d365ce2a..8cddd76c 100644 --- a/hsds/datanode_lib.py +++ 
b/hsds/datanode_lib.py @@ -992,6 +992,7 @@ async def get_chunk( # also note - get deflate and shuffle will update the deflate and # shuffle map so that the s3sync will do the right thing filter_ops = getFilterOps(app, dset_json, item_size) + log.debug(f"filter_ops: {filter_ops}") if s3path: try: @@ -1035,7 +1036,7 @@ async def get_chunk( break if chunk_arr is None: msg = f"s3 read for chunk {chunk_id} timed-out, " - msg += "initiaiting a new read" + msg += "initiating a new read" log.warn(msg) if chunk_arr is None: diff --git a/hsds/group_dn.py b/hsds/group_dn.py index a68eedea..a30107fb 100755 --- a/hsds/group_dn.py +++ b/hsds/group_dn.py @@ -145,7 +145,7 @@ async def PUT_Group(request): Used to flush all objects under a root group to S3 """ - flush_time_out = config.get("s3_sync_interval") * 2 + flush_timeout = config.get("flush_timeout", default=10) flush_sleep_interval = config.get("flush_sleep_interval") log.request(request) app = request.app @@ -186,7 +186,7 @@ async def PUT_Group(request): log.debug(f"flushop - waiting on {len(flush_set)} items") if len(flush_set) > 0: - while time.time() - flush_start < flush_time_out: + while time.time() - flush_start < flush_timeout: await asyncio.sleep(flush_sleep_interval) # wait a bit # check to see if the items in our flush set are still there remaining_set = set() @@ -210,7 +210,7 @@ async def PUT_Group(request): if len(flush_set) > 0: msg = f"flushop - {len(flush_set)} items not updated after " - msg += f"{flush_time_out} seconds" + msg += f"{flush_timeout} seconds" log.warn(msg) log.debug(f"flush set: {flush_set}") raise HTTPServiceUnavailable() diff --git a/hsds/util/dsetUtil.py b/hsds/util/dsetUtil.py index 363407bc..dd9a8500 100644 --- a/hsds/util/dsetUtil.py +++ b/hsds/util/dsetUtil.py @@ -127,12 +127,16 @@ def getShuffleFilter(dset_json): filters = getFilters(dset_json) FILTER_CLASSES = ("H5Z_FILTER_SHUFFLE", "H5Z_FILTER_BITSHUFFLE") for filter in filters: + log.debug(f"filter: {filter}") if "class" not in 
filter: log.warn(f"filter option: {filter} with no class key") continue filter_class = filter["class"] if filter_class in FILTER_CLASSES: + log.debug(f"found filter: {filter}") return filter + else: + log.debug(f"ignoring non-shuffle filter class: {filter_class}") log.debug("Shuffle filter not used") return None @@ -177,9 +181,11 @@ else: filter_ops["level"] = int(compressionFilter["level"]) + if filter_ops: filter_ops["item_size"] = item_size log.debug(f"save filter ops: {filter_ops} for {dset_id}") filter_map[dset_id] = filter_ops # save + return filter_ops else: return None diff --git a/hsds/util/storUtil.py b/hsds/util/storUtil.py index 79f631d5..9226c6a0 100644 --- a/hsds/util/storUtil.py +++ b/hsds/util/storUtil.py @@ -98,7 +98,7 @@ def _shuffle(codec, data, item_size=4): shuffler = codecs.Shuffle(item_size) arr = shuffler.encode(data) elif codec == 2: - # bit shuffle, use bitshuffle packge + # bit shuffle, use bitshuffle package if isinstance(data, bytes): # bitshufle is expecting numpy array data = np.frombuffer(data, dtype=np.dtype("uint8")) @@ -353,15 +353,22 @@ async def getStorBytes(app, compressor = None if filter_ops: log.debug(f"getStorBytes for {key} with filter_ops: {filter_ops}") - if filter_ops.get("shuffle") == "shuffle": - shuffle = filter_ops["item_size"] - log.debug("using shuffle filter") - elif filter_ops.get("shuffle") == "bitshuffle": - shuffle = 2 - log.debug("using bitshuffle filter") + if "shuffle" in filter_ops: + shuffle = filter_ops["shuffle"] + if shuffle == 1: + log.debug("using shuffle filter") + elif shuffle == 2: + log.debug("using bitshuffle filter") + else: + log.debug("no shuffle filter") + else: + log.debug("shuffle filter not set in filter_ops") + if "compressor" in filter_ops: compressor = filter_ops["compressor"] log.debug(f"using compressor: {compressor}") + else: + log.debug("compressor not set in filter_ops") item_size = filter_ops["item_size"] kwargs = {"bucket": bucket, "key": key,
"offset": offset, "length": length} diff --git a/pyproject.toml b/pyproject.toml index cc362a8f..e693e13c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,6 +42,7 @@ dependencies = [ "bitshuffle", "botocore", "cryptography", + "h5py >= 3.6.0", "numcodecs", "numpy", "psutil", @@ -50,6 +51,7 @@ dependencies = [ "pyyaml", "requests-unixsocket", "simplejson", + "s3fs", ] [project.optional-dependencies] diff --git a/tests/integ/acl_test.py b/tests/integ/acl_test.py index 22833a3b..a75c8c0b 100644 --- a/tests/integ/acl_test.py +++ b/tests/integ/acl_test.py @@ -232,7 +232,7 @@ def testGetAcls(self): rsp = self.session.get(req, headers=headers) self.assertEqual(rsp.status_code, 403) # forbidden else: - print("user2name not set") + print("user2_name not set") def testPutAcl(self): print("testPutAcl", self.base_domain) diff --git a/tests/integ/attr_test.py b/tests/integ/attr_test.py index b4aa5f83..e6fabeda 100644 --- a/tests/integ/attr_test.py +++ b/tests/integ/attr_test.py @@ -600,7 +600,6 @@ def testPutFixedUTF8String(self): rsp = self.session.get(req, headers=headers) self.assertEqual(rsp.status_code, 200) rspJson = json.loads(rsp.text) - print(rspJson) self.assertTrue("hrefs" in rspJson) self.assertTrue("value" in rspJson) self.assertEqual(rspJson["value"], text) @@ -663,10 +662,8 @@ def testPutFixedUTF8StringBinary(self): rsp = self.session.get(req, headers=headers) self.assertEqual(rsp.status_code, 200) rspJson = json.loads(rsp.text) - print(rspJson) self.assertTrue("hrefs" in rspJson) self.assertTrue("value" in rspJson) - print(f"Retrieved UTF8 string: {rspJson['value']}") self.assertEqual(rspJson["value"], character_text) self.assertTrue("type" in rspJson) type_json = rspJson["type"] @@ -741,9 +738,7 @@ def testPutVLenInt(self): "base": {"class": "H5T_INTEGER", "base": "H5T_STD_I32LE"}, } value = [ - [ - 1, - ], + [1, ], [1, 2], [1, 2, 3], [1, 2, 3, 4], @@ -776,12 +771,7 @@ def testPutVLenInt(self): self.assertTrue("class" in shape_json) 
self.assertEqual(shape_json["class"], "H5S_SIMPLE") self.assertTrue("dims" in shape_json) - self.assertEqual( - shape_json["dims"], - [ - 4, - ], - ) + self.assertEqual(shape_json["dims"], [4, ]) def testPutInvalid(self): print("testPutInvalid", self.base_domain) @@ -1019,9 +1009,7 @@ def testPutVlenObjReference(self): vlen_type = {"class": "H5T_VLEN", "base": ref_type} attr_name = "obj_ref" value = [ - [ - g1_1_id, - ], + [g1_1_id, ], [g1_1_id, g1_2_id], [g1_1_id, g1_2_id, g1_3_id], ] @@ -1050,12 +1038,7 @@ def testPutVlenObjReference(self): self.assertTrue("class" in rsp_shape) self.assertEqual(rsp_shape["class"], "H5S_SIMPLE") self.assertTrue("dims" in rsp_shape) - self.assertEqual( - rsp_shape["dims"], - [ - 3, - ], - ) + self.assertEqual(rsp_shape["dims"], [3, ]) self.assertTrue("value" in rspJson) vlen_values = rspJson["value"] self.assertEqual(len(vlen_values), 3) @@ -1123,9 +1106,7 @@ def testPutCompoundObjReference(self): ] data = { "type": compound_type, - "shape": [ - 1, - ], + "shape": [1, ], "value": value, } req = self.endpoint + "/groups/" + g1_id + "/attributes/" + attr_name @@ -1173,19 +1154,9 @@ def testPutCompoundObjReference(self): self.assertTrue("class" in rsp_shape) self.assertEqual(rsp_shape["class"], "H5S_SIMPLE") self.assertTrue("dims" in rsp_shape) - self.assertEqual( - rsp_shape["dims"], - [ - 1, - ], - ) + self.assertEqual(rsp_shape["dims"], [1, ]) self.assertTrue("value" in rspJson) - self.assertEqual( - rspJson["value"], - [ - [dset_id, 0], - ], - ) + self.assertEqual(rspJson["value"], [[dset_id, 0], ]) def testPutNoData(self): # Test PUT value for 1d attribute without any data provided diff --git a/tests/integ/domain_test.py b/tests/integ/domain_test.py index 85ca3f89..670a02c3 100755 --- a/tests/integ/domain_test.py +++ b/tests/integ/domain_test.py @@ -321,7 +321,6 @@ def testCreateDomain(self): self.assertTrue(k in rspJson) root_id = rspJson["root"] - print("root_id:", root_id) # verify that putting the same domain again fails with 
a 409 error rsp = self.session.put(req, headers=headers) diff --git a/tests/integ/value_test.py b/tests/integ/value_test.py index b25bd4c8..f68d1771 100755 --- a/tests/integ/value_test.py +++ b/tests/integ/value_test.py @@ -219,6 +219,12 @@ def testPut1DDatasetBinary(self): rsp = self.session.put(req, data=data, headers=headers_bin_req) self.assertEqual(rsp.status_code, 200) + # Try writing less than the expected number of bytes + # should return a 400 status + partial_data = data[4:] + rsp = self.session.put(req, data=partial_data, headers=headers_bin_req) + self.assertEqual(rsp.status_code, 400) + # read back the data rsp = self.session.get(req, headers=headers_bin_rsp) self.assertEqual(rsp.status_code, 200) @@ -3564,7 +3570,7 @@ def testPutFixedUTF8StringDatasetBinary(self): # read value back as binary rsp = self.session.get(req, headers=headers_bin_rsp) self.assertEqual(rsp.status_code, 200) - self.assertEqual(rsp.text, text) + self.assertEqual(rsp.content, binary_text) # write different utf8 binary string of same overall byte length text = "this is the chinese character for the number eight: 888"