Skip to content

Commit

Permalink
Flushfix (#289)
Browse files Browse the repository at this point in the history
* added config for flush timeout

* update Dockerfile to use python 3.10
  • Loading branch information
jreadey authored Nov 28, 2023
1 parent f6e49d2 commit c8c51c3
Show file tree
Hide file tree
Showing 12 changed files with 47 additions and 54 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#FROM python:3.10 AS hsds-base
FROM hdfgroup/hdf5lib:1.14.0 as hsds-base
FROM python:3.10 AS hsds-base
# FROM hdfgroup/hdf5lib:1.14.0 as hsds-base

# Install Curl
RUN apt-get update; apt-get -y install curl
Expand Down
1 change: 1 addition & 0 deletions admin/config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ store_read_timeout: 1 # time to cancel storage read request if no response
store_read_sleep_interval: 0.1 # time to sleep between checking on read request
max_pending_write_requests: 20 # maximum number of inflight write requests
flush_sleep_interval: 1 # time to wait between checking on dirty objects
flush_timeout: 10 # max time to wait on all I/O operations to complete for a flush
min_chunk_size: 1m # 1 MB
max_chunk_size: 4m # 4 MB
max_request_size: 100m # 100 MB - should be no smaller than client_max_body_size in nginx tmpl (if using nginx)
Expand Down
4 changes: 2 additions & 2 deletions hsds/chunk_sn.py
Original file line number Diff line number Diff line change
Expand Up @@ -1040,7 +1040,7 @@ async def GET_Value(request):
resp_body = await jsonResponse(
resp, resp_json, ignore_nan=ignore_nan, body_only=True
)
log.debug(f"jsonResponse returned: {resp_body}")
log.debug(f"jsonResponse returned: {len(resp_body)} items")
resp_body = resp_body.encode("utf-8")
await resp.write(resp_body)
await resp.write_eof()
Expand Down Expand Up @@ -1296,7 +1296,7 @@ async def POST_Value(request):
resp_body = await jsonResponse(
resp, resp_json, ignore_nan=ignore_nan, body_only=True
)
log.debug(f"jsonResponse returned: {resp_body}")
log.debug(f"jsonResponse returned: {len(resp_body)} items")
resp_body = resp_body.encode("utf-8")
await resp.write(resp_body)
except Exception as e:
Expand Down
3 changes: 2 additions & 1 deletion hsds/datanode_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,7 @@ async def get_chunk(
# also note - get deflate and shuffle will update the deflate and
# shuffle map so that the s3sync will do the right thing
filter_ops = getFilterOps(app, dset_json, item_size)
log.debug(f"filter_ops: {filter_ops}")

if s3path:
try:
Expand Down Expand Up @@ -1035,7 +1036,7 @@ async def get_chunk(
break
if chunk_arr is None:
msg = f"s3 read for chunk {chunk_id} timed-out, "
msg += "initiaiting a new read"
msg += "initiating a new read"
log.warn(msg)

if chunk_arr is None:
Expand Down
6 changes: 3 additions & 3 deletions hsds/group_dn.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ async def PUT_Group(request):
Used to flush all objects under a root group to S3
"""

flush_time_out = config.get("s3_sync_interval") * 2
flush_timeout = config.get("flush_timeout", default=10)
flush_sleep_interval = config.get("flush_sleep_interval")
log.request(request)
app = request.app
Expand Down Expand Up @@ -186,7 +186,7 @@ async def PUT_Group(request):
log.debug(f"flushop - waiting on {len(flush_set)} items")

if len(flush_set) > 0:
while time.time() - flush_start < flush_time_out:
while time.time() - flush_start < flush_timeout:
await asyncio.sleep(flush_sleep_interval) # wait a bit
# check to see if the items in our flush set are still there
remaining_set = set()
Expand All @@ -210,7 +210,7 @@ async def PUT_Group(request):

if len(flush_set) > 0:
msg = f"flushop - {len(flush_set)} items not updated after "
msg += f"{flush_time_out} seconds"
msg += f"{flush_timeout} seconds"
log.warn(msg)
log.debug(f"flush set: {flush_set}")
raise HTTPServiceUnavailable()
Expand Down
6 changes: 6 additions & 0 deletions hsds/util/dsetUtil.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,16 @@ def getShuffleFilter(dset_json):
filters = getFilters(dset_json)
FILTER_CLASSES = ("H5Z_FILTER_SHUFFLE", "H5Z_FILTER_BITSHUFFLE")
for filter in filters:
log.debug(f"filter: {filter}")
if "class" not in filter:
log.warn(f"filter option: {filter} with no class key")
continue
filter_class = filter["class"]
if filter_class in FILTER_CLASSES:
log.debug(f"found filter: {filter}")
return filter
else:
log.warn(f"unexpected filter class: {filter_class}")

log.debug("Shuffle filter not used")
return None
Expand Down Expand Up @@ -177,9 +181,11 @@ def getFilterOps(app, dset_json, item_size):
else:
filter_ops["level"] = int(compressionFilter["level"])

if filter_ops:
filter_ops["item_size"] = item_size
log.debug(f"save filter ops: {filter_ops} for {dset_id}")
filter_map[dset_id] = filter_ops # save

return filter_ops
else:
return None
Expand Down
21 changes: 14 additions & 7 deletions hsds/util/storUtil.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def _shuffle(codec, data, item_size=4):
shuffler = codecs.Shuffle(item_size)
arr = shuffler.encode(data)
elif codec == 2:
# bit shuffle, use bitshuffle packge
# bit shuffle, use bitshuffle package
if isinstance(data, bytes):
# bitshuffle is expecting numpy array
data = np.frombuffer(data, dtype=np.dtype("uint8"))
Expand Down Expand Up @@ -353,15 +353,22 @@ async def getStorBytes(app,
compressor = None
if filter_ops:
log.debug(f"getStorBytes for {key} with filter_ops: {filter_ops}")
if filter_ops.get("shuffle") == "shuffle":
shuffle = filter_ops["item_size"]
log.debug("using shuffle filter")
elif filter_ops.get("shuffle") == "bitshuffle":
shuffle = 2
log.debug("using bitshuffle filter")
if "shuffle" in filter_ops:
shuffle = filter_ops["shuffle"]
if shuffle == 1:
log.debug("using shuffle filter")
elif shuffle == 2:
log.debug("using bitshuffle filter")
else:
log.debug("no shuffle filter")
else:
log.debug("shuffle filter not set in filter_ops")

if "compressor" in filter_ops:
compressor = filter_ops["compressor"]
log.debug(f"using compressor: {compressor}")
else:
log.debug("compressor not set in filter ops")
item_size = filter_ops["item_size"]

kwargs = {"bucket": bucket, "key": key, "offset": offset, "length": length}
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ dependencies = [
"bitshuffle",
"botocore",
"cryptography",
"h5py >= 3.6.0",
"numcodecs",
"numpy",
"psutil",
Expand All @@ -50,6 +51,7 @@ dependencies = [
"pyyaml",
"requests-unixsocket",
"simplejson",
"s3fs",
]

[project.optional-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion tests/integ/acl_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ def testGetAcls(self):
rsp = self.session.get(req, headers=headers)
self.assertEqual(rsp.status_code, 403) # forbidden
else:
print("user2name not set")
print("user2_name not set")

def testPutAcl(self):
print("testPutAcl", self.base_domain)
Expand Down
43 changes: 7 additions & 36 deletions tests/integ/attr_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,6 @@ def testPutFixedUTF8String(self):
rsp = self.session.get(req, headers=headers)
self.assertEqual(rsp.status_code, 200)
rspJson = json.loads(rsp.text)
print(rspJson)
self.assertTrue("hrefs" in rspJson)
self.assertTrue("value" in rspJson)
self.assertEqual(rspJson["value"], text)
Expand Down Expand Up @@ -663,10 +662,8 @@ def testPutFixedUTF8StringBinary(self):
rsp = self.session.get(req, headers=headers)
self.assertEqual(rsp.status_code, 200)
rspJson = json.loads(rsp.text)
print(rspJson)
self.assertTrue("hrefs" in rspJson)
self.assertTrue("value" in rspJson)
print(f"Retrieved UTF8 string: {rspJson['value']}")
self.assertEqual(rspJson["value"], character_text)
self.assertTrue("type" in rspJson)
type_json = rspJson["type"]
Expand Down Expand Up @@ -741,9 +738,7 @@ def testPutVLenInt(self):
"base": {"class": "H5T_INTEGER", "base": "H5T_STD_I32LE"},
}
value = [
[
1,
],
[1, ],
[1, 2],
[1, 2, 3],
[1, 2, 3, 4],
Expand Down Expand Up @@ -776,12 +771,7 @@ def testPutVLenInt(self):
self.assertTrue("class" in shape_json)
self.assertEqual(shape_json["class"], "H5S_SIMPLE")
self.assertTrue("dims" in shape_json)
self.assertEqual(
shape_json["dims"],
[
4,
],
)
self.assertEqual(shape_json["dims"], [4, ])

def testPutInvalid(self):
print("testPutInvalid", self.base_domain)
Expand Down Expand Up @@ -1019,9 +1009,7 @@ def testPutVlenObjReference(self):
vlen_type = {"class": "H5T_VLEN", "base": ref_type}
attr_name = "obj_ref"
value = [
[
g1_1_id,
],
[g1_1_id, ],
[g1_1_id, g1_2_id],
[g1_1_id, g1_2_id, g1_3_id],
]
Expand Down Expand Up @@ -1050,12 +1038,7 @@ def testPutVlenObjReference(self):
self.assertTrue("class" in rsp_shape)
self.assertEqual(rsp_shape["class"], "H5S_SIMPLE")
self.assertTrue("dims" in rsp_shape)
self.assertEqual(
rsp_shape["dims"],
[
3,
],
)
self.assertEqual(rsp_shape["dims"], [3, ])
self.assertTrue("value" in rspJson)
vlen_values = rspJson["value"]
self.assertEqual(len(vlen_values), 3)
Expand Down Expand Up @@ -1123,9 +1106,7 @@ def testPutCompoundObjReference(self):
]
data = {
"type": compound_type,
"shape": [
1,
],
"shape": [1, ],
"value": value,
}
req = self.endpoint + "/groups/" + g1_id + "/attributes/" + attr_name
Expand Down Expand Up @@ -1173,19 +1154,9 @@ def testPutCompoundObjReference(self):
self.assertTrue("class" in rsp_shape)
self.assertEqual(rsp_shape["class"], "H5S_SIMPLE")
self.assertTrue("dims" in rsp_shape)
self.assertEqual(
rsp_shape["dims"],
[
1,
],
)
self.assertEqual(rsp_shape["dims"], [1, ])
self.assertTrue("value" in rspJson)
self.assertEqual(
rspJson["value"],
[
[dset_id, 0],
],
)
self.assertEqual(rspJson["value"], [[dset_id, 0], ])

def testPutNoData(self):
# Test PUT value for 1d attribute without any data provided
Expand Down
1 change: 0 additions & 1 deletion tests/integ/domain_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,6 @@ def testCreateDomain(self):
self.assertTrue(k in rspJson)

root_id = rspJson["root"]
print("root_id:", root_id)

# verify that putting the same domain again fails with a 409 error
rsp = self.session.put(req, headers=headers)
Expand Down
8 changes: 7 additions & 1 deletion tests/integ/value_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ def testPut1DDatasetBinary(self):
rsp = self.session.put(req, data=data, headers=headers_bin_req)
self.assertEqual(rsp.status_code, 200)

# Try writing less than the expected number of bytes
# should return a 400 status
partial_data = data[4:]
rsp = self.session.put(req, data=partial_data, headers=headers_bin_req)
self.assertEqual(rsp.status_code, 400)

# read back the data
rsp = self.session.get(req, headers=headers_bin_rsp)
self.assertEqual(rsp.status_code, 200)
Expand Down Expand Up @@ -3564,7 +3570,7 @@ def testPutFixedUTF8StringDatasetBinary(self):
# read value back as binary
rsp = self.session.get(req, headers=headers_bin_rsp)
self.assertEqual(rsp.status_code, 200)
self.assertEqual(rsp.text, text)
self.assertEqual(rsp.content, binary_text)

# write different utf8 binary string of same overall byte length
text = "this is the chinese character for the number eight: 888"
Expand Down

0 comments on commit c8c51c3

Please sign in to comment.