Flushfix #289

Merged · 2 commits · Nov 28, 2023
4 changes: 2 additions & 2 deletions Dockerfile
@@ -1,5 +1,5 @@
-#FROM python:3.10 AS hsds-base
-FROM hdfgroup/hdf5lib:1.14.0 as hsds-base
+FROM python:3.10 AS hsds-base
+# FROM hdfgroup/hdf5lib:1.14.0 as hsds-base
 
 # Install Curl
 RUN apt-get update; apt-get -y install curl
1 change: 1 addition & 0 deletions admin/config/config.yml
@@ -40,6 +40,7 @@ store_read_timeout: 1 # time to cancel storage read request if no response
 store_read_sleep_interval: 0.1 # time to sleep between checking on read request
 max_pending_write_requests: 20 # maxium number of inflight write requests
 flush_sleep_interval: 1 # time to wait between checking on dirty objects
+flush_timeout: 10 # max time to wait on all I/O operations to complete for a flush
 min_chunk_size: 1m # 1 MB
 max_chunk_size: 4m # 4 MB
 max_request_size: 100m # 100 MB - should be no smaller than client_max_body_size in nginx tmpl (if using nginx)
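The new key is read with a default on the server side, so existing config files that lack flush_timeout keep working. A minimal sketch of how a handler picks it up, mirroring the config.get call in the group_dn.py change below (the import path is an assumption for illustration):

    from hsds import config  # assumed import path, for illustration only

    flush_timeout = config.get("flush_timeout", default=10)  # seconds
    flush_sleep_interval = config.get("flush_sleep_interval")  # poll interval, seconds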
4 changes: 2 additions & 2 deletions hsds/chunk_sn.py
@@ -1040,7 +1040,7 @@ async def GET_Value(request):
         resp_body = await jsonResponse(
             resp, resp_json, ignore_nan=ignore_nan, body_only=True
         )
-        log.debug(f"jsonResponse returned: {resp_body}")
+        log.debug(f"jsonResponse returned: {len(resp_body)} items")
         resp_body = resp_body.encode("utf-8")
         await resp.write(resp_body)
         await resp.write_eof()
@@ -1296,7 +1296,7 @@ async def POST_Value(request):
         resp_body = await jsonResponse(
             resp, resp_json, ignore_nan=ignore_nan, body_only=True
         )
-        log.debug(f"jsonResponse returned: {resp_body}")
+        log.debug(f"jsonResponse returned: {len(resp_body)} items")
         resp_body = resp_body.encode("utf-8")
         await resp.write(resp_body)
     except Exception as e:
3 changes: 2 additions & 1 deletion hsds/datanode_lib.py
@@ -992,6 +992,7 @@ async def get_chunk(
     # also note - get deflate and shuffle will update the deflate and
     # shuffle map so that the s3sync will do the right thing
     filter_ops = getFilterOps(app, dset_json, item_size)
+    log.debug(f"filter_ops: {filter_ops}")
 
     if s3path:
         try:
@@ -1035,7 +1036,7 @@ async def get_chunk(
                 break
         if chunk_arr is None:
             msg = f"s3 read for chunk {chunk_id} timed-out, "
-            msg += "initiaiting a new read"
+            msg += "initiating a new read"
             log.warn(msg)
 
     if chunk_arr is None:
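The second hunk fixes a typo in the message logged when a pending S3 read times out and a fresh read is issued. The surrounding logic waits briefly for another task's in-flight read of the same chunk before falling back to a new request; roughly this pattern (a sketch with hypothetical names, not the module's actual API):

    import asyncio
    import time

    async def get_chunk_with_retry(pending, fetch_chunk, chunk_id, timeout=1.0):
        """Wait for an in-flight read of chunk_id; start a new read on timeout."""
        start = time.time()
        chunk_arr = None
        while time.time() - start < timeout:
            await asyncio.sleep(0.1)           # another task may be fetching it
            chunk_arr = pending.get(chunk_id)  # pending: chunk_id -> array or None
            if chunk_arr is not None:
                break
        if chunk_arr is None:
            # s3 read timed-out, initiating a new read
            chunk_arr = await fetch_chunk(chunk_id)
        return chunk_arr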
6 changes: 3 additions & 3 deletions hsds/group_dn.py
@@ -145,7 +145,7 @@ async def PUT_Group(request):
     Used to flush all objects under a root group to S3
     """
 
-    flush_time_out = config.get("s3_sync_interval") * 2
+    flush_timeout = config.get("flush_timeout", default=10)
     flush_sleep_interval = config.get("flush_sleep_interval")
     log.request(request)
     app = request.app
@@ -186,7 +186,7 @@ async def PUT_Group(request):
log.debug(f"flushop - waiting on {len(flush_set)} items")

if len(flush_set) > 0:
while time.time() - flush_start < flush_time_out:
while time.time() - flush_start < flush_timeout:
await asyncio.sleep(flush_sleep_interval) # wait a bit
# check to see if the items in our flush set are still there
remaining_set = set()
@@ -210,7 +210,7 @@ async def PUT_Group(request):

     if len(flush_set) > 0:
         msg = f"flushop - {len(flush_set)} items not updated after "
-        msg += f"{flush_time_out} seconds"
+        msg += f"{flush_timeout} seconds"
         log.warn(msg)
         log.debug(f"flush set: {flush_set}")
         raise HTTPServiceUnavailable()
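Net effect: the flush wait is now bounded by its own flush_timeout setting (default 10 seconds) instead of being derived from s3_sync_interval, and the loop condition and warning message use the new name consistently. Condensed, the wait amounts to the following sketch, where still_dirty is a hypothetical callable standing in for the dirty-object check:

    import asyncio
    import time

    async def wait_for_flush(flush_set, still_dirty, flush_timeout=10, sleep_interval=1):
        """Poll until nothing in flush_set is dirty, or flush_timeout elapses.

        Returns the ids that never flushed; an empty set means success.
        """
        flush_start = time.time()
        while flush_set and time.time() - flush_start < flush_timeout:
            await asyncio.sleep(sleep_interval)  # wait a bit between checks
            flush_set = {obj_id for obj_id in flush_set if still_dirty(obj_id)}
        return flush_set

If anything is still dirty at the deadline, PUT_Group raises HTTPServiceUnavailable, so clients see a 503 and can retry the flush.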
6 changes: 6 additions & 0 deletions hsds/util/dsetUtil.py
@@ -127,12 +127,16 @@ def getShuffleFilter(dset_json):
     filters = getFilters(dset_json)
     FILTER_CLASSES = ("H5Z_FILTER_SHUFFLE", "H5Z_FILTER_BITSHUFFLE")
     for filter in filters:
+        log.debug(f"filter: {filter}")
         if "class" not in filter:
             log.warn(f"filter option: {filter} with no class key")
             continue
         filter_class = filter["class"]
         if filter_class in FILTER_CLASSES:
+            log.debug(f"found filter: {filter}")
             return filter
+        else:
+            log.warn(f"unexpected filter class: {filter_class}")
 
     log.debug("Shuffle filter not used")
     return None
@@ -177,9 +181,11 @@ def getFilterOps(app, dset_json, item_size):
         else:
             filter_ops["level"] = int(compressionFilter["level"])
 
+    if filter_ops:
         filter_ops["item_size"] = item_size
         log.debug(f"save filter ops: {filter_ops} for {dset_id}")
         filter_map[dset_id] = filter_ops  # save
 
         return filter_ops
+    else:
         return None
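For reference, the filter list these helpers scan comes from the dataset's creation properties. A sketch of the JSON shape getShuffleFilter() iterates over (the ids and level are illustrative values, not taken from this PR):

    # illustrative filter list, as getFilters() would return it
    filters = [
        {"class": "H5Z_FILTER_SHUFFLE", "id": 2, "name": "shuffle"},
        {"class": "H5Z_FILTER_DEFLATE", "id": 1, "level": 5, "name": "deflate"},
    ]

With the second hunk, getFilterOps() returns None when no filter options were built rather than falling through with an empty map, which is what the new filter_ops debug line in datanode_lib.py makes visible.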
21 changes: 14 additions & 7 deletions hsds/util/storUtil.py
@@ -98,7 +98,7 @@ def _shuffle(codec, data, item_size=4):
         shuffler = codecs.Shuffle(item_size)
         arr = shuffler.encode(data)
     elif codec == 2:
-        # bit shuffle, use bitshuffle packge
+        # bit shuffle, use bitshuffle package
         if isinstance(data, bytes):
             # bitshufle is expecting numpy array
             data = np.frombuffer(data, dtype=np.dtype("uint8"))
@@ -353,15 +353,22 @@ async def getStorBytes(app,
     compressor = None
     if filter_ops:
         log.debug(f"getStorBytes for {key} with filter_ops: {filter_ops}")
-        if filter_ops.get("shuffle") == "shuffle":
-            shuffle = filter_ops["item_size"]
-            log.debug("using shuffle filter")
-        elif filter_ops.get("shuffle") == "bitshuffle":
-            shuffle = 2
-            log.debug("using bitshuffle filter")
+        if "shuffle" in filter_ops:
+            shuffle = filter_ops["shuffle"]
+            if shuffle == 1:
+                log.debug("using shuffle filter")
+            elif shuffle == 2:
+                log.debug("using bitshuffle filter")
+            else:
+                log.debug("no shuffle filter")
+        else:
+            log.debug("shuffle filter not set in filter_ops")
 
         if "compressor" in filter_ops:
             compressor = filter_ops["compressor"]
+            log.debug(f"using compressor: {compressor}")
+        else:
+            log.debug("compressor not set in filter ops")
         item_size = filter_ops["item_size"]
 
     kwargs = {"bucket": bucket, "key": key, "offset": offset, "length": length}
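getStorBytes now reads the shuffle setting as an integer code rather than matching strings: 1 means byte shuffle and 2 means bit shuffle, in line with the codec numbers the _shuffle() helper above dispatches on. A sketch of that mapping under those assumptions (the helper name is illustrative):

    import bitshuffle
    import numcodecs as codecs
    import numpy as np

    def apply_shuffle(data: bytes, shuffle: int, item_size: int = 4) -> bytes:
        """Dispatch on the integer shuffle codes (0/unset = no shuffle)."""
        if shuffle == 1:
            out = codecs.Shuffle(item_size).encode(data)  # byte shuffle
            return np.asarray(out).tobytes()
        if shuffle == 2:
            # bitshuffle expects a numpy array, not bytes
            arr = np.frombuffer(data, dtype=np.dtype("uint8"))
            return bitshuffle.bitshuffle(arr).tobytes()
        return data

Previously the byte-shuffle branch stored the dataset's item_size in shuffle while bitshuffle used the literal code 2; the integer codes make the two branches uniform.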
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -42,6 +42,7 @@ dependencies = [
"bitshuffle",
"botocore",
"cryptography",
"h5py >= 3.6.0",
"numcodecs",
"numpy",
"psutil",
@@ -50,6 +51,7 @@ dependencies = [
"pyyaml",
"requests-unixsocket",
"simplejson",
"s3fs",
]

[project.optional-dependencies]
2 changes: 1 addition & 1 deletion tests/integ/acl_test.py
@@ -232,7 +232,7 @@ def testGetAcls(self):
             rsp = self.session.get(req, headers=headers)
             self.assertEqual(rsp.status_code, 403)  # forbidden
         else:
-            print("user2name not set")
+            print("user2_name not set")
 
     def testPutAcl(self):
         print("testPutAcl", self.base_domain)
43 changes: 7 additions & 36 deletions tests/integ/attr_test.py
@@ -600,7 +600,6 @@ def testPutFixedUTF8String(self):
         rsp = self.session.get(req, headers=headers)
         self.assertEqual(rsp.status_code, 200)
         rspJson = json.loads(rsp.text)
-        print(rspJson)
         self.assertTrue("hrefs" in rspJson)
         self.assertTrue("value" in rspJson)
         self.assertEqual(rspJson["value"], text)
@@ -663,10 +662,8 @@ def testPutFixedUTF8StringBinary(self):
         rsp = self.session.get(req, headers=headers)
         self.assertEqual(rsp.status_code, 200)
         rspJson = json.loads(rsp.text)
-        print(rspJson)
         self.assertTrue("hrefs" in rspJson)
         self.assertTrue("value" in rspJson)
-        print(f"Retrieved UTF8 string: {rspJson['value']}")
         self.assertEqual(rspJson["value"], character_text)
         self.assertTrue("type" in rspJson)
         type_json = rspJson["type"]
@@ -741,9 +738,7 @@ def testPutVLenInt(self):
"base": {"class": "H5T_INTEGER", "base": "H5T_STD_I32LE"},
}
value = [
[
1,
],
[1, ],
[1, 2],
[1, 2, 3],
[1, 2, 3, 4],
@@ -776,12 +771,7 @@ def testPutVLenInt(self):
self.assertTrue("class" in shape_json)
self.assertEqual(shape_json["class"], "H5S_SIMPLE")
self.assertTrue("dims" in shape_json)
self.assertEqual(
shape_json["dims"],
[
4,
],
)
self.assertEqual(shape_json["dims"], [4, ])

def testPutInvalid(self):
print("testPutInvalid", self.base_domain)
@@ -1019,9 +1009,7 @@ def testPutVlenObjReference(self):
         vlen_type = {"class": "H5T_VLEN", "base": ref_type}
         attr_name = "obj_ref"
         value = [
-            [
-                g1_1_id,
-            ],
+            [g1_1_id, ],
             [g1_1_id, g1_2_id],
             [g1_1_id, g1_2_id, g1_3_id],
         ]
@@ -1050,12 +1038,7 @@ def testPutVlenObjReference(self):
self.assertTrue("class" in rsp_shape)
self.assertEqual(rsp_shape["class"], "H5S_SIMPLE")
self.assertTrue("dims" in rsp_shape)
self.assertEqual(
rsp_shape["dims"],
[
3,
],
)
self.assertEqual(rsp_shape["dims"], [3, ])
self.assertTrue("value" in rspJson)
vlen_values = rspJson["value"]
self.assertEqual(len(vlen_values), 3)
@@ -1123,9 +1106,7 @@ def testPutCompoundObjReference(self):
         ]
         data = {
             "type": compound_type,
-            "shape": [
-                1,
-            ],
+            "shape": [1, ],
             "value": value,
         }
         req = self.endpoint + "/groups/" + g1_id + "/attributes/" + attr_name
@@ -1173,19 +1154,9 @@ def testPutCompoundObjReference(self):
self.assertTrue("class" in rsp_shape)
self.assertEqual(rsp_shape["class"], "H5S_SIMPLE")
self.assertTrue("dims" in rsp_shape)
self.assertEqual(
rsp_shape["dims"],
[
1,
],
)
self.assertEqual(rsp_shape["dims"], [1, ])
self.assertTrue("value" in rspJson)
self.assertEqual(
rspJson["value"],
[
[dset_id, 0],
],
)
self.assertEqual(rspJson["value"], [[dset_id, 0], ])

def testPutNoData(self):
# Test PUT value for 1d attribute without any data provided
1 change: 0 additions & 1 deletion tests/integ/domain_test.py
@@ -321,7 +321,6 @@ def testCreateDomain(self):
             self.assertTrue(k in rspJson)
 
         root_id = rspJson["root"]
-        print("root_id:", root_id)
 
         # verify that putting the same domain again fails with a 409 error
         rsp = self.session.put(req, headers=headers)
8 changes: 7 additions & 1 deletion tests/integ/value_test.py
@@ -219,6 +219,12 @@ def testPut1DDatasetBinary(self):
         rsp = self.session.put(req, data=data, headers=headers_bin_req)
         self.assertEqual(rsp.status_code, 200)
 
+        # Try writing less than the expected number of bytes
+        # should return a 400 status
+        partial_data = data[4:]
+        rsp = self.session.put(req, data=partial_data, headers=headers_bin_req)
+        self.assertEqual(rsp.status_code, 400)
+
         # read back the data
         rsp = self.session.get(req, headers=headers_bin_rsp)
         self.assertEqual(rsp.status_code, 200)
@@ -3564,7 +3570,7 @@ def testPutFixedUTF8StringDatasetBinary(self):
         # read value back as binary
         rsp = self.session.get(req, headers=headers_bin_rsp)
         self.assertEqual(rsp.status_code, 200)
-        self.assertEqual(rsp.text, text)
+        self.assertEqual(rsp.content, binary_text)
 
         # write different utf8 binary string of same overall byte length
         text = "this is the chinese character for the number eight: 八八八"
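The new negative test pins down the binary PUT contract: the body must contain exactly element-count × item-size bytes. A standalone sketch of the same check, where the endpoint, domain header, and dataset id are placeholders and requests is used directly instead of the test session:

    import requests

    endpoint = "http://127.0.0.1:5101"            # placeholder HSDS endpoint
    req = endpoint + "/datasets/<dset_id>/value"  # placeholder dataset id
    headers = {
        "X-Hdf-domain": "/home/test_user1/value_test.h5",  # placeholder domain
        "Content-Type": "application/octet-stream",
    }

    data = bytes(10 * 4)  # ten 4-byte ints -> 40 bytes expected
    rsp = requests.put(req, data=data, headers=headers)
    assert rsp.status_code == 200  # full payload accepted

    rsp = requests.put(req, data=data[4:], headers=headers)
    assert rsp.status_code == 400  # 36 of 40 bytes -> rejected, not partially written

The second hunk also corrects the UTF-8 read-back assertion to compare bytes (rsp.content) against the binary payload rather than decoded text.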