Multilink #296

Merged: 18 commits, Jan 24, 2024
Changes from 3 commits
28 changes: 3 additions & 25 deletions hsds/attr_sn.py
@@ -20,7 +20,7 @@

from .util.httpUtil import getHref
from .util.httpUtil import getAcceptType, jsonResponse
from .util.idUtil import isValidUuid, getCollectionForId, getRootObjId
from .util.idUtil import isValidUuid, getRootObjId
from .util.authUtil import getUserPasswordFromRequest, validateUserPassword
from .util.domainUtil import getDomainFromRequest, isValidDomain
from .util.domainUtil import getBucketForDomain, verifyRoot
@@ -1106,7 +1106,7 @@ async def PUT_AttributeValue(request):


async def POST_Attributes(request):
"""HTTP method to get multiple attribute values"""
"""HTTP method to get multiple attributes """
log.request(request)
app = request.app
log.info("POST_Attributes")
@@ -1247,7 +1246,6 @@ async def POST_Attributes(request):
elif len(items) == 1:
# just make a request to the datanode
obj_id = list(items.keys())[0]
collection = getCollectionForId(obj_id)
attr_names = items[obj_id]
kwargs = {"attr_names": attr_names, "bucket": bucket}
if not include_data:
@@ -1259,12 +1258,6 @@

attributes = await getAttributes(app, obj_id, **kwargs)

# mixin hrefs
for attribute in attributes:
attr_name = attribute["name"]
attr_href = f"/{collection}/{obj_id}/attributes/{attr_name}"
attribute["href"] = getHref(request, attr_href)

resp_json["attributes"] = attributes
else:
# get multi obj
@@ -1288,31 +1281,16 @@
msg = f"DomainCrawler returned: {len(crawler._obj_dict)} objects"
log.info(msg)
attributes = crawler._obj_dict
# mixin hrefs
# log attributes returned for each obj_id
for obj_id in attributes:
obj_attributes = attributes[obj_id]
msg = f"POST_Attributes, obj_id {obj_id} "
msg += f"returned {len(obj_attributes)}"
log.debug(msg)

collection = getCollectionForId(obj_id)
for attribute in obj_attributes:
log.debug(f"attribute: {attribute}")
attr_name = attribute["name"]
attr_href = f"/{collection}/{obj_id}/attributes/{attr_name}"
attribute["href"] = getHref(request, attr_href)
log.debug(f"got {len(attributes)} attributes")
resp_json["attributes"] = attributes

hrefs = []
collection = getCollectionForId(req_id)
obj_uri = "/" + collection + "/" + req_id
href = getHref(request, obj_uri + "/attributes")
hrefs.append({"rel": "self", "href": href})
hrefs.append({"rel": "home", "href": getHref(request, "/")})
hrefs.append({"rel": "owner", "href": getHref(request, obj_uri)})
resp_json["hrefs"] = hrefs

resp = await jsonResponse(request, resp_json, ignore_nan=ignore_nan)
log.response(request, resp=resp)
return resp
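Note: with this change the POST_Attributes response no longer mixes a per-attribute "href" into each attribute, and the top-level hrefs list is dropped. A client that still wants those links can rebuild them from the object id; a minimal sketch, with a hypothetical helper name that is not part of this PR:

from .util.httpUtil import getHref
from .util.idUtil import getCollectionForId


def attr_href(request, obj_id, attr_name):
    # hypothetical helper mirroring the href pattern removed above
    collection = getCollectionForId(obj_id)  # "groups", "datasets", or "datatypes"
    return getHref(request, f"/{collection}/{obj_id}/attributes/{attr_name}")
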
11 changes: 0 additions & 11 deletions hsds/chunk_crawl.py
@@ -81,9 +81,6 @@ async def write_chunk_hyperslab(
np_arr: numpy array of data to be written
"""

if not bucket:
bucket = config.get("bucket_name")

msg = f"write_chunk_hyperslab, chunk_id: {chunk_id}, slices: {slices}, "
msg += f"bucket: {bucket}"
log.info(msg)
@@ -181,8 +178,6 @@ async def read_chunk_hyperslab(
entire object)
bucket: s3 bucket to read from
"""
if not bucket:
bucket = config.get("bucket_name")

if chunk_map is None:
log.error("expected chunk_map to be set")
@@ -444,9 +439,6 @@ async def read_point_sel(
arr: numpy array to store read bytes
"""

if not bucket:
bucket = config.get("bucket_name")

msg = f"read_point_sel, chunk_id: {chunk_id}, bucket: {bucket}"
log.info(msg)

@@ -549,9 +541,6 @@ async def write_point_sel(
point_data: index of arr element to update for a given point
"""

if not bucket:
bucket = config.get("bucket_name")

msg = f"write_point_sel, chunk_id: {chunk_id}, points: {point_list}, "
msg += f"data: {point_data}"
log.info(msg)
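Note: the config.get("bucket_name") fallbacks are removed from the chunk_crawl helpers, so the bucket is expected to be resolved once at the service-node layer and passed down. A minimal sketch of that pattern, matching the GET_Domain change later in this diff (the helper name is hypothetical):

from aiohttp.web_exceptions import HTTPBadRequest

from .util.domainUtil import getBucketForDomain
from . import hsds_logger as log


def resolve_bucket(domain):
    # resolve the bucket up front rather than defaulting inside chunk_crawl
    bucket = getBucketForDomain(domain)
    if not bucket:
        msg = "Bucket not provided"
        log.warn(msg)
        raise HTTPBadRequest(reason=msg)
    return bucket
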
17 changes: 4 additions & 13 deletions hsds/ctype_sn.py
@@ -16,7 +16,7 @@

from aiohttp.web_exceptions import HTTPBadRequest, HTTPGone
from json import JSONDecodeError
from .util.httpUtil import http_post, http_put, http_delete, getHref, respJsonAssemble
from .util.httpUtil import http_post, http_delete, getHref, respJsonAssemble
from .util.httpUtil import jsonResponse
from .util.idUtil import isValidUuid, getDataNodeUrl, createObjId
from .util.authUtil import getUserPasswordFromRequest, aclCheck
@@ -25,7 +25,7 @@
from .util.domainUtil import getBucketForDomain, verifyRoot
from .util.hdf5dtype import validateTypeItem, getBaseTypeJson
from .servicenode_lib import getDomainJson, getObjectJson, validateAction
from .servicenode_lib import getObjectIdByPath, getPathForObjectId
from .servicenode_lib import getObjectIdByPath, getPathForObjectId, putHardLink
from . import hsds_logger as log


@@ -223,22 +223,13 @@ async def POST_Datatype(request):
ctype_json = {"id": ctype_id, "root": root_id, "type": datatype}
log.debug(f"create named type, body: {ctype_json}")
req = getDataNodeUrl(app, ctype_id) + "/datatypes"
params = {}
if bucket:
params["bucket"] = bucket
params = {"bucket": bucket}

type_json = await http_post(app, req, data=ctype_json, params=params)

# create link if requested
if link_id and link_title:
link_json = {}
link_json["id"] = ctype_id
link_json["class"] = "H5L_TYPE_HARD"
link_req = getDataNodeUrl(app, link_id)
link_req += "/groups/" + link_id + "/links/" + link_title
log.debug("PUT link - : " + link_req)
put_rsp = await http_put(app, link_req, data=link_json, params=params)
log.debug(f"PUT Link resp: {put_rsp}")
await putHardLink(app, link_id, link_title, tgt_id=ctype_id, bucket=bucket)

# datatype creation successful
resp = await jsonResponse(request, type_json, status=201)
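Note: putHardLink is a new servicenode_lib helper that replaces the inline link PUT shown above; its implementation is not part of this hunk. A rough sketch of what it presumably does, inferred from the code it replaces (an assumption — in this PR the datanode link routes become bulk routes, so the real helper may target PUT /groups/{id}/links instead):

from .util.httpUtil import http_put
from .util.idUtil import getDataNodeUrl
from . import hsds_logger as log


async def putHardLink(app, group_id, title, tgt_id=None, bucket=None):
    # sketch only: create a hard link group_id/title -> tgt_id
    link_json = {"id": tgt_id, "class": "H5L_TYPE_HARD"}
    req = getDataNodeUrl(app, group_id) + "/groups/" + group_id + "/links/" + title
    params = {"bucket": bucket} if bucket else {}
    log.debug(f"putHardLink - PUT: {req}")
    rsp = await http_put(app, req, data=link_json, params=params)
    log.debug(f"putHardLink - PUT link resp: {rsp}")
    return rsp
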
8 changes: 4 additions & 4 deletions hsds/datanode.py
@@ -30,7 +30,7 @@
from .domain_dn import GET_Domain, PUT_Domain, DELETE_Domain, PUT_ACL
from .group_dn import GET_Group, POST_Group, DELETE_Group, PUT_Group
from .group_dn import POST_Root
from .link_dn import GET_Links, GET_Link, PUT_Link, DELETE_Link
from .link_dn import GET_Links, POST_Links, PUT_Links, DELETE_Links
from .attr_dn import GET_Attributes, POST_Attributes
from .attr_dn import PUT_Attributes, DELETE_Attributes
from .ctype_dn import GET_Datatype, POST_Datatype, DELETE_Datatype
@@ -59,9 +59,9 @@ async def init():
app.router.add_route("PUT", "/groups/{id}", PUT_Group)
app.router.add_route("POST", "/groups", POST_Group)
app.router.add_route("GET", "/groups/{id}/links", GET_Links)
app.router.add_route("GET", "/groups/{id}/links/{title}", GET_Link)
app.router.add_route("DELETE", "/groups/{id}/links/{title}", DELETE_Link)
app.router.add_route("PUT", "/groups/{id}/links/{title}", PUT_Link)
app.router.add_route("POST", "/groups/{id}/links", POST_Links)
app.router.add_route("DELETE", "/groups/{id}/links", DELETE_Links)
app.router.add_route("PUT", "/groups/{id}/links", PUT_Links)
app.router.add_route("GET", "/groups/{id}/attributes", GET_Attributes)
app.router.add_route("POST", "/groups/{id}/attributes", POST_Attributes)
app.router.add_route("DELETE", "/groups/{id}/attributes", DELETE_Attributes)
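Note: the per-link datanode routes (GET/PUT/DELETE on /groups/{id}/links/{title}) are replaced by bulk handlers on /groups/{id}/links, matching the bulk attribute routes below them. A hypothetical illustration of how a service node might batch a link read against the new POST route — the body schema here is an assumption inferred from the getLinks(..., titles=...) usage in domain_crawl.py, not confirmed by this diff:

req = getDataNodeUrl(app, grp_id) + "/groups/" + grp_id + "/links"
body = {"titles": ["dset1", "dset2", "g1.1"]}  # fetch several links in one request
params = {"bucket": bucket}
links_json = await http_post(app, req, data=body, params=params)
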
112 changes: 108 additions & 4 deletions hsds/domain_crawl.py
@@ -18,10 +18,8 @@
from aiohttp.web_exceptions import HTTPServiceUnavailable, HTTPConflict, HTTPBadRequest
from aiohttp.web_exceptions import HTTPInternalServerError, HTTPNotFound, HTTPGone


from .util.idUtil import getCollectionForId, getDataNodeUrl

from .servicenode_lib import getObjectJson, getAttributes, putAttributes
from .servicenode_lib import getObjectJson, getAttributes, putAttributes, getLinks
from . import hsds_logger as log


@@ -210,6 +208,93 @@ async def get_obj_json(self, obj_id):
self._obj_dict[link_id] = {} # placeholder for obj id
self._q.put_nowait(link_id)

async def get_links(self, grp_id, titles=None):
""" if titles is set, get all the links in grp_id that
have a title in the list. Otherwise, return all links for the object. """
log.debug(f"get_links: {grp_id}, titles; {titles}")
collection = getCollectionForId(grp_id)
if collection != "groups":
log.warn(f"get_links, expected groups id but got: {grp_id}")
return
kwargs = {}
if titles:
kwargs["titles"] = titles
if self._params.get("bucket"):
kwargs["bucket"] = self._params["bucket"]

if self._params.get("follow_links"):
follow_links = True
else:
follow_links = False

log.debug(f"follow_links: {follow_links}")
log.debug(f"getLinks kwargs: {kwargs}")

links = None
status = 200
try:
links = await getLinks(self._app, grp_id, **kwargs)
except HTTPNotFound:
status = 404
except HTTPServiceUnavailable:
status = 503
except HTTPInternalServerError:
status = 500
except Exception as e:
log.error(f"unexpected exception {e}")
status = 500
log.debug(f"getObjectJson status: {status}")

if links is None:
msg = f"DomainCrawler - get_links for {grp_id} "
if status >= 500:
msg += f"failed, status: {status}"
log.error(msg)
else:
msg += f"returned status: {status}"
log.warn(msg)
return

log.debug(f"DomainCrawler - got links for {grp_id}")
log.debug(f"save to obj_dict: {links}")

self._obj_dict[grp_id] = links # store the links

# if follow_links, add any group links to the lookup ids set
if follow_links:
log.debug(f"follow links for {grp_id}")
for link_obj in links:
log.debug(f"follow links for: {link_obj}")
if 'title' not in link_obj:
log.warn(f"expected to find title in link_json: {link_obj}")
continue
title = link_obj["title"]
log.debug(f"DomainCrawler - got link: {title}")
num_objects = len(self._obj_dict)
if self._params.get("max_objects_limit") is not None:
max_objects_limit = self._params["max_objects_limit"]
if num_objects >= max_objects_limit:
msg = "DomainCrawler reached limit of "
msg += f"{max_objects_limit}"
log.info(msg)
break
if link_obj["class"] != "H5L_TYPE_HARD":
# just follow hardlinks
log.debug("not hard link,continue")
continue
link_id = link_obj["id"]
if getCollectionForId(link_id) != "groups":
# only groups can have links
log.debug(f"link id: {link_id} is not for a group, continue")
continue
if link_id not in self._obj_dict:
# haven't seen this object yet, get obj json
log.debug(f"DomainCrawler - adding link_id: {link_id} to queue")
self._obj_dict[link_id] = {} # placeholder for obj id
self._q.put_nowait(link_id)
else:
log.debug(f"link: {link_id} already in object dict")

def get_status(self):
""" return the highest status of any of the returned objects """
status = None
@@ -304,7 +389,7 @@ async def fetch(self, obj_id):
log.warn("expected at least one name in attr_names list")
return

log.debug(f"DomainCrawler - got attribute names: {attr_names}")
log.debug(f"DomainCrawler - get attribute names: {attr_names}")
await self.get_attributes(obj_id, attr_names)
elif self._action == "put_attr":
log.debug("DomainCrawler - put attributes")
@@ -316,6 +401,25 @@
log.debug(f"got {len(attr_items)} attr_items")

await self.put_attributes(obj_id, attr_items)
elif self._action == "get_link":
log.debug("DomainCrawlwer - get links")
if obj_id not in self._objs:
link_titles = None # fetch all links for this object
else:
link_titles = self._objs[obj_id]
if link_titles is None:
log.debug(f"fetch all links for {obj_id}")
else:
if not isinstance(link_titles, list):
log.error("expected list for link titles")
return
if len(link_titles) == 0:
log.warn("expected at least one name in link titles list")
return

log.debug(f"DomainCrawler - get link titles: {link_titles}")
await self.get_links(obj_id, link_titles)

else:
msg = f"DomainCrawler: unexpected action: {self._action}"
log.error(msg)
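Note: with the new "get_link" action the crawler can gather links across a domain the same way POST_Attributes uses it for attributes. A hedged usage sketch; the constructor and crawl() call mirror the attribute crawl and are assumptions, not lifted from this diff:

objs = {root_id: None}  # None -> fetch all link titles for this group
params = {"bucket": bucket, "follow_links": True}
crawler = DomainCrawler(app, objs, action="get_link", params=params)
await crawler.crawl()
links_by_group = crawler._obj_dict  # group id -> list of link json objects
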
14 changes: 1 addition & 13 deletions hsds/domain_sn.py
@@ -453,7 +453,7 @@ async def GET_Domain(request):
bucket = getBucketForDomain(domain)
log.debug(f"GET_Domain domain: {domain} bucket: {bucket}")

if not bucket and not config.get("bucket_name"):
if not bucket:
# no bucket defined, raise 400
msg = "Bucket not provided"
log.warn(msg)
@@ -1354,10 +1354,6 @@ async def GET_Datasets(request):
raise HTTPBadRequest(reason=msg)

bucket = getBucketForDomain(domain)
if not bucket:
bucket = config.get("bucket_name")
else:
checkBucketAccess(app, bucket)

# verify the domain
try:
@@ -1448,10 +1444,6 @@ async def GET_Groups(request):
raise HTTPBadRequest(reason=msg)

bucket = getBucketForDomain(domain)
if not bucket:
bucket = config.get("bucket_name")
else:
checkBucketAccess(app, bucket)

# use reload to get authoritative domain json
try:
@@ -1537,10 +1529,6 @@ async def GET_Datatypes(request):
raise HTTPBadRequest(reason=msg)

bucket = getBucketForDomain(domain)
if not bucket:
bucket = config.get("bucket_name")
else:
checkBucketAccess(app, bucket)

# use reload to get authoritative domain json
try:
15 changes: 3 additions & 12 deletions hsds/dset_sn.py
@@ -34,7 +34,7 @@
from .util.hdf5dtype import validateTypeItem, createDataType, getBaseTypeJson
from .util.hdf5dtype import getItemSize
from .servicenode_lib import getDomainJson, getObjectJson, getDsetJson, getPathForObjectId
from .servicenode_lib import getObjectIdByPath, validateAction, getRootInfo, doFlush
from .servicenode_lib import getObjectIdByPath, validateAction, getRootInfo, doFlush, putHardLink
from .dset_lib import reduceShape
from . import config
from . import hsds_logger as log
@@ -1170,22 +1170,13 @@ async def POST_Dataset(request):

log.debug(f"create dataset: {dataset_json}")
req = getDataNodeUrl(app, dset_id) + "/datasets"
params = {}
if bucket:
params["bucket"] = bucket
params = {"bucket": bucket}

post_json = await http_post(app, req, data=dataset_json, params=params)

# create link if requested
if link_id and link_title:
link_json = {}
link_json["id"] = dset_id
link_json["class"] = "H5L_TYPE_HARD"
link_req = getDataNodeUrl(app, link_id)
link_req += "/groups/" + link_id + "/links/" + link_title
log.info("PUT link - : " + link_req)
put_rsp = await http_put(app, link_req, data=link_json, params=params)
log.debug(f"PUT Link resp: {put_rsp}")
await putHardLink(app, link_id, link_title, tgt_id=dset_id, bucket=bucket)

# dataset creation successful
resp = await jsonResponse(request, post_json, status=201)