
Commit

post method for links
jreadey committed Jan 8, 2024
1 parent 634e6b1 commit acec4be
Showing 12 changed files with 643 additions and 136 deletions.
28 changes: 3 additions & 25 deletions hsds/attr_sn.py
@@ -20,7 +20,7 @@

from .util.httpUtil import getHref
from .util.httpUtil import getAcceptType, jsonResponse
from .util.idUtil import isValidUuid, getCollectionForId, getRootObjId
from .util.idUtil import isValidUuid, getRootObjId
from .util.authUtil import getUserPasswordFromRequest, validateUserPassword
from .util.domainUtil import getDomainFromRequest, isValidDomain
from .util.domainUtil import getBucketForDomain, verifyRoot
@@ -1106,7 +1106,7 @@ async def PUT_AttributeValue(request):


async def POST_Attributes(request):
"""HTTP method to get multiple attribute values"""
"""HTTP method to get multiple attributes """
log.request(request)
app = request.app
log.info("POST_Attributes")
@@ -1247,7 +1246,6 @@ async def POST_Attributes(request):
elif len(items) == 1:
# just make a request to the datanode
obj_id = list(items.keys())[0]
collection = getCollectionForId(obj_id)
attr_names = items[obj_id]
kwargs = {"attr_names": attr_names, "bucket": bucket}
if not include_data:
@@ -1259,12 +1258,6 @@

attributes = await getAttributes(app, obj_id, **kwargs)

# mixin hrefs
for attribute in attributes:
attr_name = attribute["name"]
attr_href = f"/{collection}/{obj_id}/attributes/{attr_name}"
attribute["href"] = getHref(request, attr_href)

resp_json["attributes"] = attributes
else:
# get multi obj
@@ -1288,31 +1281,16 @@
msg = f"DomainCrawler returned: {len(crawler._obj_dict)} objects"
log.info(msg)
attributes = crawler._obj_dict
# mixin hrefs
# log attributes returned for each obj_id
for obj_id in attributes:
obj_attributes = attributes[obj_id]
msg = f"POST_Attributes, obj_id {obj_id} "
msg += f"returned {len(obj_attributes)}"
log.debug(msg)

collection = getCollectionForId(obj_id)
for attribute in obj_attributes:
log.debug(f"attribute: {attribute}")
attr_name = attribute["name"]
attr_href = f"/{collection}/{obj_id}/attributes/{attr_name}"
attribute["href"] = getHref(request, attr_href)
log.debug(f"got {len(attributes)} attributes")
resp_json["attributes"] = attributes

hrefs = []
collection = getCollectionForId(req_id)
obj_uri = "/" + collection + "/" + req_id
href = getHref(request, obj_uri + "/attributes")
hrefs.append({"rel": "self", "href": href})
hrefs.append({"rel": "home", "href": getHref(request, "/")})
hrefs.append({"rel": "owner", "href": getHref(request, obj_uri)})
resp_json["hrefs"] = hrefs

resp = await jsonResponse(request, resp_json, ignore_nan=ignore_nan)
log.response(request, resp=resp)
return resp
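With the per-attribute href mix-in removed, POST_Attributes now returns the attribute objects as produced by getAttributes (single-object request) or collected by the DomainCrawler (multi-object request). The sketch below only illustrates the two response shapes implied by the hunks above; the object id and attribute fields are placeholders, not actual server output.

```python
# Illustrative shapes only, inferred from the diff; fields abbreviated.

# Single-object request: "attributes" is a list for that one object.
single_obj_resp = {
    "attributes": [
        {"name": "attr1"},
        {"name": "attr2"},
    ]
}

# Multi-object request: "attributes" is a dict keyed by object id,
# taken from crawler._obj_dict.
multi_obj_resp = {
    "attributes": {
        "g-1234abcd-0000-0000-0000-000000000000": [
            {"name": "attr1"},
        ],
    }
}
```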
11 changes: 0 additions & 11 deletions hsds/chunk_crawl.py
@@ -81,9 +81,6 @@ async def write_chunk_hyperslab(
np_arr: numpy array of data to be written
"""

if not bucket:
bucket = config.get("bucket_name")

msg = f"write_chunk_hyperslab, chunk_id: {chunk_id}, slices: {slices}, "
msg += f"bucket: {bucket}"
log.info(msg)
@@ -181,8 +178,6 @@ async def read_chunk_hyperslab(
entire object)
bucket: s3 bucket to read from
"""
if not bucket:
bucket = config.get("bucket_name")

if chunk_map is None:
log.error("expected chunk_map to be set")
@@ -444,9 +439,6 @@ async def read_point_sel(
arr: numpy array to store read bytes
"""

if not bucket:
bucket = config.get("bucket_name")

msg = f"read_point_sel, chunk_id: {chunk_id}, bucket: {bucket}"
log.info(msg)

@@ -549,9 +541,6 @@ async def write_point_sel(
point_data: index of arr element to update for a given point
"""

if not bucket:
bucket = config.get("bucket_name")

msg = f"write_point_sel, chunk_id: {chunk_id}, points: {point_list}, "
msg += f"data: {point_data}"
log.info(msg)
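Since these helpers no longer fall back to config.get("bucket_name"), the bucket has to be resolved by the caller before any chunk crawl is started. The sketch below shows one way that caller-side resolution can look, using getDomainFromRequest and getBucketForDomain, which appear elsewhere in this diff; resolve_bucket itself is a hypothetical helper name, not part of the change.

```python
from aiohttp.web_exceptions import HTTPBadRequest

from .util.domainUtil import getDomainFromRequest, getBucketForDomain
from . import hsds_logger as log


def resolve_bucket(request):
    """Hypothetical helper: resolve the bucket from the request domain,
    raising 400 when none is given (as GET_Domain does in this commit)."""
    domain = getDomainFromRequest(request)
    bucket = getBucketForDomain(domain)
    if not bucket:
        msg = "Bucket not provided"
        log.warn(msg)
        raise HTTPBadRequest(reason=msg)
    return bucket

# The resolved bucket is then passed explicitly to the crawl helpers,
# e.g. read_chunk_hyperslab(..., bucket=bucket).
```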
112 changes: 108 additions & 4 deletions hsds/domain_crawl.py
@@ -18,10 +18,8 @@
from aiohttp.web_exceptions import HTTPServiceUnavailable, HTTPConflict, HTTPBadRequest
from aiohttp.web_exceptions import HTTPInternalServerError, HTTPNotFound, HTTPGone


from .util.idUtil import getCollectionForId, getDataNodeUrl

from .servicenode_lib import getObjectJson, getAttributes, putAttributes
from .servicenode_lib import getObjectJson, getAttributes, putAttributes, getLinks
from . import hsds_logger as log


@@ -210,6 +208,93 @@ async def get_obj_json(self, obj_id):
self._obj_dict[link_id] = {} # placeholder for obj id
self._q.put_nowait(link_id)

async def get_links(self, grp_id, titles=None):
""" if titles is set, get all the links in grp_id that
have a title in the list. Otherwise, return all links for the object. """
log.debug(f"get_links: {grp_id}, titles; {titles}")
collection = getCollectionForId(grp_id)
if collection != "groups":
log.warn(f"get_links, expected groups id but got: {grp_id}")
return
kwargs = {}
if titles:
kwargs["titles"] = titles
if self._params.get("bucket"):
kwargs["bucket"] = self._params["bucket"]

if self._params.get("follow_links"):
follow_links = True
else:
follow_links = False

log.debug(f"follow_links: {follow_links}")
log.debug(f"getLinks kwargs: {kwargs}")

links = None
status = 200
try:
links = await getLinks(self._app, grp_id, **kwargs)
except HTTPNotFound:
status = 404
except HTTPServiceUnavailable:
status = 503
except HTTPInternalServerError:
status = 500
except Exception as e:
log.error(f"unexpected exception {e}")
status = 500
log.debug(f"getObjectJson status: {status}")

if links is None:
msg = f"DomainCrawler - get_links for {grp_id} "
if status >= 500:
msg += f"failed, status: {status}"
log.error(msg)
else:
msg += f"returned status: {status}"
log.warn(msg)
return

log.debug(f"DomainCrawler - got links for {grp_id}")
log.debug(f"save to obj_dict: {links}")

self._obj_dict[grp_id] = links # store the links

# if follow_links, add any group links to the lookup ids set
if follow_links:
log.debug(f"follow links for {grp_id}")
for link_obj in links:
log.debug(f"follow links for: {link_obj}")
if 'title' not in link_obj:
log.warn(f"expected to find title in link_json: {link_obj}")
continue
title = link_obj["title"]
log.debug(f"DomainCrawler - got link: {title}")
num_objects = len(self._obj_dict)
if self._params.get("max_objects_limit") is not None:
max_objects_limit = self._params["max_objects_limit"]
if num_objects >= max_objects_limit:
msg = "DomainCrawler reached limit of "
msg += f"{max_objects_limit}"
log.info(msg)
break
if link_obj["class"] != "H5L_TYPE_HARD":
# just follow hardlinks
log.debug("not hard link,continue")
continue
link_id = link_obj["id"]
if getCollectionForId(link_id) != "groups":
# only groups can have links
log.debug(f"link id: {link_id} is not for a group, continue")
continue
if link_id not in self._obj_dict:
# haven't seen this object yet, get obj json
log.debug(f"DomainCrawler - adding link_id: {link_id} to queue")
self._obj_dict[link_id] = {} # placeholder for obj id
self._q.put_nowait(link_id)
else:
log.debug(f"link: {link_id} already in object dict")

def get_status(self):
""" return the highest status of any of the returned objects """
status = None
@@ -304,7 +389,7 @@ async def fetch(self, obj_id):
log.warn("expected at least one name in attr_names list")
return

log.debug(f"DomainCrawler - got attribute names: {attr_names}")
log.debug(f"DomainCrawler - get attribute names: {attr_names}")
await self.get_attributes(obj_id, attr_names)
elif self._action == "put_attr":
log.debug("DomainCrawler - put attributes")
@@ -316,6 +401,25 @@ async def fetch(self, obj_id):
log.debug(f"got {len(attr_items)} attr_items")

await self.put_attributes(obj_id, attr_items)
elif self._action == "get_link":
log.debug("DomainCrawlwer - get links")
if obj_id not in self._objs:
link_titles = None # fetch all links for this object
else:
link_titles = self._objs[obj_id]
if link_titles is None:
log.debug(f"fetch all links for {obj_id}")
else:
if not isinstance(link_titles, list):
log.error("expected list for link titles")
return
if len(link_titles) == 0:
log.warn("expected at least one name in link titles list")
return

log.debug(f"DomainCrawler - get link titles: {link_titles}")
await self.get_links(obj_id, link_titles)

else:
msg = f"DomainCrawler: unexpected action: {self._action}"
log.error(msg)
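The follow-links behavior added in get_links above comes down to the per-link check sketched below. This is a stand-alone illustration using the same names as the diff (getCollectionForId is the helper imported at the top of the file), not the crawler method itself; q is assumed to be an asyncio.Queue like the crawler's _q.

```python
from .util.idUtil import getCollectionForId


def queue_hard_group_links(links, obj_dict, q, max_objects_limit=None):
    """Queue the target ids of hard links to groups that have not been seen yet."""
    for link_obj in links:
        if max_objects_limit is not None and len(obj_dict) >= max_objects_limit:
            break  # crawler reached its object limit
        if link_obj.get("class") != "H5L_TYPE_HARD":
            continue  # only hard links are followed
        link_id = link_obj["id"]
        if getCollectionForId(link_id) != "groups":
            continue  # only group objects can have links
        if link_id not in obj_dict:
            obj_dict[link_id] = {}   # placeholder marks the id as visited
            q.put_nowait(link_id)    # schedule the group for a later fetch
```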
14 changes: 1 addition & 13 deletions hsds/domain_sn.py
@@ -453,7 +453,7 @@ async def GET_Domain(request):
bucket = getBucketForDomain(domain)
log.debug(f"GET_Domain domain: {domain} bucket: {bucket}")

if not bucket and not config.get("bucket_name"):
if not bucket:
# no bucket defined, raise 400
msg = "Bucket not provided"
log.warn(msg)
@@ -1354,10 +1354,6 @@ async def GET_Datasets(request):
raise HTTPBadRequest(reason=msg)

bucket = getBucketForDomain(domain)
if not bucket:
bucket = config.get("bucket_name")
else:
checkBucketAccess(app, bucket)

# verify the domain
try:
@@ -1448,10 +1444,6 @@ async def GET_Groups(request):
raise HTTPBadRequest(reason=msg)

bucket = getBucketForDomain(domain)
if not bucket:
bucket = config.get("bucket_name")
else:
checkBucketAccess(app, bucket)

# use reload to get authoritative domain json
try:
@@ -1537,10 +1529,6 @@ async def GET_Datatypes(request):
raise HTTPBadRequest(reason=msg)

bucket = getBucketForDomain(domain)
if not bucket:
bucket = config.get("bucket_name")
else:
checkBucketAccess(app, bucket)

# use reload to get authoritative domain json
try:
51 changes: 38 additions & 13 deletions hsds/link_dn.py
@@ -21,7 +21,6 @@
from aiohttp.web_exceptions import HTTPInternalServerError
from aiohttp.web import json_response

from .util.arrayUtil import decodeData
from .util.idUtil import isValidUuid
from .util.linkUtil import validateLinkName
from .datanode_lib import get_obj_id, get_metadata_obj, save_metadata_obj
@@ -185,7 +184,44 @@ async def POST_Links(request):
log.info(f"Link name {title} not found in group: {group_id}")
raise HTTPNotFound()
link_json = links[title]
link_list.append(link_json)
item = {}
if "class" not in link_json:
log.warn(f"expected to find class key for link: {title}")
continue
link_class = link_json["class"]
item["class"] = link_class
if "created" not in link_json:
log.warn(f"expected to find created time for link: {title}")
link_created = 0
else:
link_created = link_json["created"]
item["created"] = link_created
if link_class == "H5L_TYPE_HARD":
if "id" not in link_json:
log.warn(f"expected to id for hard linK: {title}")
continue
item["id"] = link_json["id"]
elif link_class == "H5L_TYPE_SOFT":
if "h5path" not in link_json:
log.warn(f"expected to find h5path for soft link: {title}")
continue
item["h5path"] = link_json["h5path"]
elif link_class == "H5L_TYPE_EXTERNAL":
if "h5path" not in link_json:
log.warn(f"expected to find h5path for external link: {title}")
continue
item["h5path"] = link_json["h5path"]
if "h5domain" not in link_json:
log.warn(f"expted to find h5domain for external link: {title}")
continue
item["h5domain"] = link_json["h5domain"]
else:
log.warn(f"unexpected to link class {link_class} for link: {title}")
continue

item["title"] = title

link_list.append(item)

if not link_list:
msg = f"POST link - requested {len(titles)} but none were found"
@@ -328,15 +364,6 @@ async def DELETE_Links(request):
log.warn(msg)
raise HTTPBadRequest(reason=msg)

if "encoding" in params:
encoding = params["encoding"]
if encoding != "base64":
msg = "only base64 encoding is supported"
log.warn(msg)
raise HTTPBadRequest(reason=msg)
else:
encoding = None

if "separator" in params:
separator = params["separator"]
else:
@@ -358,8 +385,6 @@
raise HTTPBadRequest(reason=msg)

titles_param = params["titles"]
if encoding:
titles_param = decodeData(titles_param).decode("utf-8")

titles = titles_param.split(separator)

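The per-class handling added to POST_Links above reduces to a fixed set of required keys per link class. The helper below is a condensed, hypothetical restatement of that logic for readers skimming the diff; the class names and required fields are taken directly from the hunks above, and malformed links are skipped just as the handler's warn-and-continue branches do.

```python
# Hypothetical summary helper, not part of the commit.
REQUIRED_LINK_KEYS = {
    "H5L_TYPE_HARD": ("id",),
    "H5L_TYPE_SOFT": ("h5path",),
    "H5L_TYPE_EXTERNAL": ("h5path", "h5domain"),
}


def make_link_item(title, link_json):
    """Return a trimmed link item dict, or None if the link is malformed."""
    link_class = link_json.get("class")
    required = REQUIRED_LINK_KEYS.get(link_class)
    if required is None:
        return None  # missing or unexpected link class
    item = {
        "title": title,
        "class": link_class,
        "created": link_json.get("created", 0),  # default 0 as in the handler
    }
    for key in required:
        if key not in link_json:
            return None  # required field absent for this class
        item[key] = link_json[key]
    return item
```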
