Linkmod updates #317

Merged · 3 commits · Feb 22, 2024
203 changes: 95 additions & 108 deletions Pipfile.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion hsds/datanode_lib.py
@@ -878,7 +878,7 @@ async def get_chunk_bytes(
         log.warn(msg)
         raise HTTPBadRequest(reason=msg)

-    # number of bytes in the hd5 chunk
+    # number of bytes in the hdf5 chunk
     h5_size = np.prod(hyper_dims) * item_size
     log.debug(f"h5 chunk size: {h5_size}")
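For reference, the computation on the touched line is just element count times element size. A minimal sketch with hypothetical values:

import numpy as np

# a 10 x 20 chunk of 8-byte elements (e.g. float64) occupies 1600 bytes
hyper_dims = (10, 20)  # hypothetical chunk dimensions
item_size = 8          # hypothetical bytes per element
h5_size = np.prod(hyper_dims) * item_size
assert h5_size == 1600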
15 changes: 15 additions & 0 deletions hsds/link_sn.py
@@ -593,6 +593,21 @@ async def POST_Links(request):
         log.warn(msg)
         raise HTTPBadRequest(reason=msg)

+    if follow_links and titles:
+        msg = "titles list can not be used with follow_links"
+        log.warn(msg)
+        raise HTTPBadRequest(reason=msg)
+
+    if limit and titles:
+        msg = "Limit parameter can not be used with titles list"
+        log.warn(msg)
+        raise HTTPBadRequest(reason=msg)
+
+    if create_order and titles:
+        msg = "CreateOrder parameter can not be used with titles"
+        log.warn(msg)
+        raise HTTPBadRequest(reason=msg)
+
     # construct an item list from titles and group_ids
     items = {}
     if group_ids is None:
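To illustrate the new checks, here is a client-side sketch; the endpoint URL, group id, and titles are hypothetical, and the same 400 applies when a titles list is combined with the Limit or CreateOrder parameters:

import json
import requests

req = "http://localhost:5101/groups/g-1234/links"  # hypothetical endpoint
payload = {"grp_ids": ["g-1234"], "titles": ["dset1", "dset2"]}
params = {"follow_links": 1}

# a titles list can only be used for a plain lookup, so this is rejected
rsp = requests.post(req, data=json.dumps(payload), params=params)
assert rsp.status_code == 400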
2 changes: 1 addition & 1 deletion hsds/util/s3Client.py
@@ -145,7 +145,7 @@ def _get_client_kwargs(self):
         kwargs["endpoint_url"] = self._s3_gateway
         kwargs["use_ssl"] = self._use_ssl
         kwargs["config"] = self._aio_config
-        log.debug(f"s3 kwargs: {kwargs}")
+        # log.debug(f"s3 kwargs: {kwargs}")
         return kwargs

     def _renewToken(self):
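The kwargs dict assembled here can include S3 credentials, so the debug line was presumably commented out to keep secrets out of the logs.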
2 changes: 2 additions & 0 deletions tests/integ/helper.py
@@ -271,4 +271,6 @@ def getLink(domain, grp_id, title):
         raise KeyError(f"expected link key in {rspJson}")
     link_json = rspJson["link"]

+    session.close()
+
     return link_json
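A minimal usage sketch; the domain and link title are hypothetical, and getRootUUID is assumed to be the existing helper in the same module. With this change the helper closes its HTTP session before returning, so callers no longer leak connections:

from tests.integ import helper

domain = "/home/test_user1/sample.h5"  # hypothetical domain
root_id = helper.getRootUUID(domain)
link_json = helper.getLink(domain, root_id, "dset1")
print(link_json["class"])  # e.g. "H5L_TYPE_HARD"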
8 changes: 8 additions & 0 deletions tests/integ/link_test.py
@@ -1354,6 +1354,14 @@ def testPostLinkMultiple(self):
         else:
             self.assertTrue(False)  # unexpected

+        # try follow_links with titles. Should return a 400
+        params = {"follow_links": 1}
+        grp_ids = [root_id, ]
+        titles = ["dset1.1.1", "dset1.1.2", "dset2.1", "dset2.2"]
+        payload = {"grp_ids": grp_ids, "titles": titles}
+        rsp = self.session.post(req, data=json.dumps(payload), params=params, headers=headers)
+        self.assertEqual(rsp.status_code, 400)  # titles not allowed with follow_links
+
     def testPostLinksGroupList(self):
         domain = self.base_domain + "/testPostLinksGroupList.h5"
         helper.setupDomain(domain)
137 changes: 79 additions & 58 deletions tools/link_mod.py
@@ -44,52 +44,80 @@ def printUsage():
     sys.exit()


+def getFileUri(dset_json):
+    # older format used "layout" key to store file_uri.
+    # newer ones use "creationProperties"
+    # return whichever one has a file_uri or None if neither does
+
+    if "creationProperties" in dset_json:
+        cpl = dset_json["creationProperties"]
+        if "layout" in cpl:
+            cpl_layout = cpl["layout"]
+            if "file_uri" in cpl_layout:
+                return cpl_layout["file_uri"]
+    # not found under cpl
+    if "layout" in dset_json:
+        layout = dset_json["layout"]
+        if "file_uri" in layout:
+            return layout["file_uri"]
+
+    # not found in either
+    return None
+
+
+def setFileUri(dset_json, file_uri):
+    if "creationProperties" in dset_json:
+        cpl = dset_json["creationProperties"]
+        if "layout" in cpl:
+            cpl_layout = cpl["layout"]
+            if "file_uri" in cpl_layout:
+                cpl_layout["file_uri"] = file_uri
+                log.info(f"updated creationProperties layout with {file_uri}")
+                return
+    # not found under cpl
+    if "layout" in dset_json:
+        layout = dset_json["layout"]
+        if "file_uri" in layout:
+            layout["file_uri"] = file_uri
+            log.info(f"updated dset layout with {file_uri}")
+            return
+
+    # not found in either
+    log.warning("expected to find file_uri for update")
+
+
 async def checkDataset(app, dset_key):
     log.info(f"checkDataset for key: {dset_key}")
     dset_json = await getStorJSONObj(app, dset_key)
     log.debug(f"get dset_json: {dset_json}")
     dset_id = dset_json["id"]
     prefix_old = app["prefix_old"]
     prefix_new = app["prefix_new"]
     do_update = app["do_update"]
-    indirect_dataset_keys = app["indirect_dataset_keys"]
     app["dataset_count"] += 1
     log.info(f"checkDataset for id: {dset_id}")
-    if "layout" not in dset_json:
-        log.info("no layout found")
-        return
-    layout_json = dset_json["layout"]
-    if "class" not in layout_json:
-        log.warn(f"no class found in layout for id: {dset_id}")
+    file_uri = getFileUri(dset_json)
+
+    if not file_uri:
+        log.debug(f"no file_uri for {dset_key}")
         return
-    layout_class = layout_json["class"]
-    log.info(f"got layout_class: {layout_class}")
-    if layout_class in ("H5D_CONTIGUOUS_REF", "H5D_CHUNKED_REF"):
-        if "file_uri" not in layout_json:
-            log.warn(
-                f"Expected to find key 'file_uri' in layout_json for id: {dset_id}"
-            )
-            return
-        file_uri = layout_json["file_uri"]
-        if file_uri.startswith(prefix_old):
-            prefix_len = len(prefix_old)
-            new_file_uri = prefix_new + file_uri[prefix_len:]
-            log.info(f"replacing uri: {file_uri} with {new_file_uri}")
-            app["matched_dset_uri"] += 1
-            if do_update:
-                # update the dataset json
-                layout_json["file_uri"] = new_file_uri
-                dset_json["layout"] = layout_json
-                # write back to storage
-                try:
-                    await putStorJSONObj(app, dset_key, dset_json)
-                    log.info(f"dataset {dset_id} updated")
-                except Exception as e:
-                    log.error(f"get exception writing dataset json: {e}")
-    elif layout_class == "H5D_CHUNKED_REF_INDIRECT":
-        # add to list to be scanned later
-        indirect_dataset_keys += dset_key[: -len(".dataset.json")]
-    else:
-        log.info(f"skipping check for layout_class: {layout_class}")
+
+    log.debug(f"got file_uri: {file_uri}")
+
+    if file_uri.startswith(prefix_old):
+        prefix_len = len(prefix_old)
+        new_file_uri = prefix_new + file_uri[prefix_len:]
+        log.info(f"replacing uri: {file_uri} with {new_file_uri}")
+        app["matched_dset_uri"] += 1
+        if do_update:
+            setFileUri(dset_json, new_file_uri)
+
+            # write back to storage
+            try:
+                await putStorJSONObj(app, dset_key, dset_json)
+                log.info(f"dataset {dset_id} updated")
+            except Exception as e:
+                log.error(f"get exception writing dataset json: {e}")

@@ -104,16 +132,11 @@ async def getKeysCallback(app, s3keys):
         raise ValueError("Invalid getKeysCallback")

     prefix = app["root_prefix"]
-    prefix_len = len(prefix)
     for s3key in s3keys:
         if not s3key.startswith(prefix):
             log.error(f"Unexpected key {s3key} for prefix: {prefix}")
             raise ValueError("invalid s3key for getKeysCallback")
-        if not s3key.endswith(".dataset.json"):
-            log.info(f"got unexpected key {s3key}, ignoring")
-            continue
-        dset_key = prefix + s3key[prefix_len:]
-        log.info(f"getKeys - :{dset_key}")
+        dset_key = s3key + ".dataset.json"
         await checkDataset(app, dset_key)

     log.info("getKeysCallback complete")
@@ -126,36 +149,30 @@ async def run_scan(app, rootid, update=False):
     if not root_key.endswith("/.group.json"):
         raise ValueError("unexpected root key")
     root_prefix = root_key[: -(len(".group.json"))]
-    root_prefix += "d/"
     log.info(f"getting s3 keys with prefix: {root_prefix}")
     app["root_prefix"] = root_prefix

     try:
         await getStorKeys(
             app,
             prefix=root_prefix,
+            suffix=".dataset.json",
+            deliminator="/",
             include_stats=False,
             callback=getKeysCallback,
         )
     except ClientError as ce:
         log.error(f"removeKeys - getS3Keys faiiled: {ce}")
     except HTTPNotFound:
-        log.warn(
-            f"getStorKeys - HTTPNotFound error for getStorKeys with prefix: {root_prefix}"
-        )
+        msg = f"getStorKeys - HTTPNotFound error for getStorKeys with prefix: {root_prefix}"
+        log.warn(msg)
     except HTTPInternalServerError:
-        log.error(
-            f"getStorKeys - HTTPInternalServerError for getStorKeys with prefix: {root_prefix}"
-        )
+        msg = f"getStorKeys - HTTPInternalServerError for getStorKeys with prefix: {root_prefix}"
+        log.error(msg)
     except Exception as e:
-        log.error(
-            f"getStorKeys - Unexpected Exception for getStorKeys with prefix: {root_prefix}: {e}"
-        )
-
-    # update all chunks for datasets with H5D_CHUNKED_REF_INDIRECT layout
-    indirect_dataset_keys = app["indirect_dataset_keys"]
-    for prefix in indirect_dataset_keys:
-        log.info(f"got inidirect prefix: {prefix}")
-        # TBD...
+        msg = "getStorKeys - Unexpected Exception for getStorKeys with prefix: "
+        msg += f"{root_prefix}: {e}"
+        log.error(msg)

     await releaseStorageClient(app)

@@ -185,6 +202,10 @@ def main():
         print("prefix_old and prefix_new or the same")
         sys.exit(1)

+    # setup log config
+    log_level = "WARN"  # ERROR, WARN, INFO, or DEBUG
+    log.setLogConfig(log_level)
+
     # we need to setup a asyncio loop to query s3
     loop = asyncio.get_event_loop()
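To make the two dataset-JSON layouts handled by getFileUri/setFileUri concrete, here is a self-contained sketch of the lookup and the prefix rewrite applied in checkDataset; bucket names and values are hypothetical:

# newer format: file_uri stored under creationProperties -> layout
dset_new = {"creationProperties": {"layout": {"file_uri": "s3://old-bucket/data.h5"}}}

# older format: file_uri stored directly under layout
dset_old = {"layout": {"class": "H5D_CHUNKED_REF", "file_uri": "s3://old-bucket/data.h5"}}

prefix_old = "s3://old-bucket/"
prefix_new = "s3://new-bucket/"

for dset_json in (dset_new, dset_old):
    # check creationProperties first, then fall back to the top-level layout
    layout = dset_json.get("creationProperties", {}).get("layout") or dset_json.get("layout", {})
    file_uri = layout.get("file_uri")
    if file_uri and file_uri.startswith(prefix_old):
        layout["file_uri"] = prefix_new + file_uri[len(prefix_old):]

assert dset_new["creationProperties"]["layout"]["file_uri"] == "s3://new-bucket/data.h5"
assert dset_old["layout"]["file_uri"] == "s3://new-bucket/data.h5"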