Commit

multiop put for links
jreadey committed Jan 12, 2024
1 parent db68988 commit f9df229
Showing 13 changed files with 1,122 additions and 117 deletions.
94 changes: 73 additions & 21 deletions hsds/attr_dn.py
@@ -15,11 +15,11 @@
import time
from bisect import bisect_left

from aiohttp.web_exceptions import HTTPBadRequest, HTTPConflict, HTTPNotFound
from aiohttp.web_exceptions import HTTPBadRequest, HTTPConflict, HTTPNotFound, HTTPGone
from aiohttp.web_exceptions import HTTPInternalServerError
from aiohttp.web import json_response

from .util.attrUtil import validateAttributeName
from .util.attrUtil import validateAttributeName, isEqualAttr
from .util.hdf5dtype import getItemSize, createDataType
from .util.dsetUtil import getShapeDims
from .util.arrayUtil import arrayToBytes, jsonToArray, decodeData
@@ -270,21 +270,31 @@ async def POST_Attributes(request):
if encoding:
kwargs["encoding"] = encoding

missing_names = set()

for attr_name in titles:
if attr_name not in attr_dict:
missing_names.add(attr_name)
continue
des_attr = _getAttribute(attr_name, obj_json, **kwargs)
attr_list.append(des_attr)

resp_json = {"attributes": attr_list}
if not attr_list:
msg = f"POST attributes - requested {len(titles)} but none were found"
log.warn(msg)
raise HTTPNotFound()
if len(attr_list) != len(titles):

if missing_names:
msg = f"POST attributes - requested {len(titles)} attributes but only "
msg += f"{len(attr_list)} were found"
log.warn(msg)
# one or more attributes not found, check to see if any
# had been previously deleted
deleted_attrs = app["deleted_attrs"]
if obj_id in deleted_attrs:
attr_delete_set = deleted_attrs[obj_id]
for attr_name in missing_names:
if attr_name in attr_delete_set:
log.info(f"attribute: {attr_name} was previously deleted, returning 410")
raise HTTPGone()
log.info("one or mores attributes not found, returning 404")
raise HTTPNotFound()
log.debug(f"POST attributes returning: {resp_json}")
resp = json_response(resp_json)
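The not-found handling above reduces to a small decision: if any requested attribute was previously deleted on this object, the whole request answers 410 Gone; otherwise it answers 404 Not Found. A minimal sketch of that check (not the exact DN code):

def not_found_status(missing_names, attr_delete_set):
    # missing_names: requested attribute names not present on the object
    # attr_delete_set: names previously deleted on this object (from app["deleted_attrs"])
    for attr_name in missing_names:
        if attr_name in attr_delete_set:
            return 410  # Gone - the attribute existed once and was deleted
    return 404  # Not Found - the attribute was never created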
@@ -392,18 +402,28 @@ async def PUT_Attributes(request):

attributes = obj_json["attributes"]

# check for conflicts, also set timestamp
create_time = time.time()
new_attribute = False # set this if we have any new attributes
# check for conflicts
new_attributes = set() # attribute names that are new or replacements
for attr_name in items:
attribute = items[attr_name]
if attr_name in attributes:
log.debug(f"attribute {attr_name} exists")
if replace:
old_item = attributes[attr_name]
try:
is_dup = isEqualAttr(attribute, old_item)
except TypeError:
log.error(f"isEqualAttr TypeError - new: {attribute} old: {old_item}")
raise HTTPInternalServerError()
if is_dup:
log.debug(f"duplicate attribute: {attr_name}")
continue
elif replace:
# don't change the create timestamp
log.debug(f"attribute {attr_name} exists, but will be updated")
old_item = attributes[attr_name]
attribute["created"] = old_item["created"]
new_attributes.add(attr_name)
else:
# Attribute already exists, return a 409
msg = f"Attempt to overwrite attribute: {attr_name} "
@@ -414,18 +434,30 @@
# set the timestamp
log.debug(f"new attribute {attr_name}")
attribute["created"] = create_time
new_attribute = True
new_attributes.add(attr_name)

# ok - all set, create the attributes
for attr_name in items:
# if any of the attribute names was previously deleted,
# remove from the deleted set
deleted_attrs = app["deleted_attrs"]
if obj_id in deleted_attrs:
attr_delete_set = deleted_attrs[obj_id]
else:
attr_delete_set = set()

# ok - all set, add the attributes
for attr_name in new_attributes:
log.debug(f"adding attribute {attr_name}")
attr_json = items[attr_name]
attributes[attr_name] = attr_json

# write back to S3, save to metadata cache
await save_metadata_obj(app, obj_id, obj_json, bucket=bucket)

if new_attribute:
if attr_name in attr_delete_set:
attr_delete_set.remove(attr_name)

if new_attributes:
# update the obj lastModified
now = time.time()
obj_json["lastModified"] = now
# write back to S3, save to metadata cache
await save_metadata_obj(app, obj_id, obj_json, bucket=bucket)
status = 201
else:
status = 200
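Per attribute, the PUT path now ends in one of three outcomes, which drive the 201-vs-200 status above. A simplified sketch of the decision (isEqualAttr, HTTPConflict, and time come from the module imports shown at the top of attr_dn.py):

def classify_attr_put(attr_name, attr_item, attributes, replace):
    # returns "skip", "replace", or "create" for one incoming attribute (sketch)
    if attr_name in attributes:
        old_item = attributes[attr_name]
        if isEqualAttr(attr_item, old_item):
            return "skip"  # identical duplicate - no write, lastModified unchanged
        if replace:
            attr_item["created"] = old_item["created"]  # keep the original create time
            return "replace"
        raise HTTPConflict()  # exists and replace was not requested - 409
    attr_item["created"] = time.time()
    return "create"

Only when at least one attribute is created or replaced does the handler bump lastModified, save the object, and return 201.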
@@ -490,15 +522,35 @@ async def DELETE_Attributes(request):
# return a list of attributes based on sorted dictionary keys
attributes = obj_json["attributes"]

# add attribute names to deleted set, so we can return a 410 if they
# are requested in the future
deleted_attrs = app["deleted_attrs"]
if obj_id in deleted_attrs:
attr_delete_set = deleted_attrs[obj_id]
else:
attr_delete_set = set()
deleted_attrs[obj_id] = attr_delete_set

save_obj = False # set to True if anything is actually modified
for attr_name in attr_names:
if attr_name in attr_delete_set:
log.warn(f"attribute {attr_name} already deleted")
continue

if attr_name not in attributes:
msg = f"Attribute {attr_name} not found in objid: {obj_id}"
msg = f"Attribute {attr_name} not found in obj id: {obj_id}"
log.warn(msg)
raise HTTPNotFound()

del attributes[attr_name]

await save_metadata_obj(app, obj_id, obj_json, bucket=bucket)
attr_delete_set.add(attr_name)
save_obj = True

if save_obj:
# update the object lastModified
now = time.time()
obj_json["lastModified"] = now
await save_metadata_obj(app, obj_id, obj_json, bucket=bucket)

resp_json = {}
resp = json_response(resp_json)
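The delete path now does two extra pieces of bookkeeping: it remembers deleted names per object (so later reads can answer 410), and it rewrites the object only when something actually changed. A simplified sketch:

def delete_attrs(attr_names, attributes, attr_delete_set):
    # attributes: the object's attribute dict; attr_delete_set: per-object set of deleted names
    modified = False
    for attr_name in attr_names:
        if attr_name in attr_delete_set:
            continue  # already deleted earlier - repeat deletes are a no-op
        if attr_name not in attributes:
            raise HTTPNotFound()  # never existed on this object
        del attributes[attr_name]
        attr_delete_set.add(attr_name)
        modified = True
    return modified  # caller bumps lastModified and saves only when True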
2 changes: 2 additions & 0 deletions hsds/datanode.py
@@ -299,6 +299,8 @@ def create_app():
}
app["chunk_cache"] = LruCache(**kwargs)
app["deleted_ids"] = set()
app["deleted_attrs"] = {} # map of objectid to set of deleted attribute names
app["deleted_links"] = {} # map of objecctid to set of deleted link names
# map of objids to timestamp and bucket of which they were last updated
app["dirty_ids"] = {}
# map of dataset ids to deflate levels (if compressed)
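Both new maps share the same shape - object id mapped to a set of names - so the lookup-or-create used in DELETE_Attributes above is equivalent to this one-liner (sketch only):

attr_delete_set = app["deleted_attrs"].setdefault(obj_id, set())  # same effect as the if/else in the handler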
35 changes: 34 additions & 1 deletion hsds/domain_crawl.py
@@ -19,7 +19,7 @@
from aiohttp.web_exceptions import HTTPInternalServerError, HTTPNotFound, HTTPGone

from .util.idUtil import getCollectionForId, getDataNodeUrl
from .servicenode_lib import getObjectJson, getAttributes, putAttributes, getLinks
from .servicenode_lib import getObjectJson, getAttributes, putAttributes, getLinks, putLinks
from . import hsds_logger as log


@@ -295,6 +295,30 @@ async def get_links(self, grp_id, titles=None):
else:
log.debug(f"link: {link_id} already in object dict")

async def put_links(self, grp_id, link_items):
# write the given links for the obj_id
log.debug(f"put_links for {grp_id}, {len(link_items)} links")
req = getDataNodeUrl(self._app, grp_id)
req += f"/groups/{grp_id}/links"
kwargs = {}
if "bucket" in self._params:
kwargs["bucket"] = self._params["bucket"]
status = None
try:
status = await putLinks(self._app, grp_id, link_items, **kwargs)
except HTTPConflict:
log.warn("DomainCrawler - got HTTPConflict from http_put")
status = 409
except HTTPServiceUnavailable:
status = 503
except HTTPInternalServerError:
status = 500
except Exception as e:
log.error(f"unexpected exception {e}")

log.debug(f"DomainCrawler fetch for {grp_id} - returning status: {status}")
self._obj_dict[grp_id] = {"status": status}

def get_status(self):
""" return the highest status of any of the returned objects """
status = None
@@ -419,7 +443,16 @@ async def fetch(self, obj_id):

log.debug(f"DomainCrawler - get link titles: {link_titles}")
await self.get_links(obj_id, link_titles)
elif self._action == "put_link":
log.debug("DomainCrawlwer - put links")
# write links
if self._objs and obj_id not in self._objs:
log.error(f"couldn't find {obj_id} in self._objs")
return
link_items = self._objs[obj_id]
log.debug(f"got {len(link_items)} link items for {obj_id}")

await self.put_links(obj_id, link_items)
else:
msg = f"DomainCrawler: unexpected action: {self._action}"
log.error(msg)
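Each group written by put_links gets an HTTP status recorded in the crawler's result dict, and the crawl's overall result is the highest (worst) status seen. A simplified sketch of that aggregation (the real get_status is shown truncated above):

def highest_status(obj_dict):
    # obj_dict: map of object id to {"status": <int>} entries recorded by the crawler
    status = None
    for item in obj_dict.values():
        if "status" in item and (status is None or item["status"] > status):
            status = item["status"]
    return status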
125 changes: 125 additions & 0 deletions hsds/domain_sn.py
@@ -624,6 +624,131 @@ async def getScanTime(app, root_id, bucket=None):
return root_scan


async def POST_Domain(request):
""" return object defined by h5path list """

log.request(request)
app = request.app
params = request.rel_url.query
log.debug(f"POST_Domain query params: {params}")

include_links = False
include_attrs = False
follow_soft_links = False
follow_external_links = False

if "include_links" in params and params["include_links"]:
include_links = True
if "include_attrs" in params and params["include_attrs"]:
include_attrs = True
if "follow_soft_links" in params and params["follow_soft_links"]:
follow_soft_links = True
if "follow_external_links" in params and params["follow_external_links"]:
follow_external_links = True

if not request.has_body:
msg = "POST Domain with no body"
log.warn(msg)
raise HTTPBadRequest(reason=msg)

try:
body = await request.json()
except json.JSONDecodeError:
msg = "Unable to load JSON body"
log.warn(msg)
raise HTTPBadRequest(reason=msg)

if "h5paths" in body:
h5paths = body["h5paths"]
if not isinstance(h5paths, list):
msg = f"expected list for h5paths but got: {type(h5paths)}"
log.warn(msg)
raise HTTPBadRequest(reason=msg)
else:
msg = "expected h5paths key in body"
log.warn(msg)
raise HTTPBadRequest(reason=msg)

(username, pswd) = getUserPasswordFromRequest(request)
if username is None and app["allow_noauth"]:
username = "default"
else:
await validateUserPassword(app, username, pswd)

domain = None
try:
domain = getDomainFromRequest(request)
except ValueError:
log.warn(f"Invalid domain: {domain}")
raise HTTPBadRequest(reason="Invalid domain name")

bucket = getBucketForDomain(domain)
log.debug(f"GET_Domain domain: {domain} bucket: {bucket}")

if not bucket:
# no bucket defined, raise 400
msg = "Bucket not provided"
log.warn(msg)
raise HTTPBadRequest(reason=msg)
if bucket:
checkBucketAccess(app, bucket)

if not domain:
msg = "no domain given"
log.warn(msg)
raise HTTPBadRequest(reason=msg)

log.info(f"got domain: {domain}")

domain_json = await getDomainJson(app, domain, reload=True)

if domain_json is None:
log.warn(f"domain: {domain} not found")
raise HTTPNotFound()

if "acls" not in domain_json:
log.error("No acls key found in domain")
raise HTTPInternalServerError()

log.debug(f"got domain_json: {domain_json}")
# validate that the requesting user has permission to read this domain
# aclCheck throws exception if not authorized
aclCheck(app, domain_json, "read", username)

json_objs = {}

for h5path in h5paths:
root_id = domain_json["root"]

# getObjectIdByPath throws 404 if not found
obj_id, domain, _ = await getObjectIdByPath(
app, root_id, h5path, bucket=bucket, domain=domain,
follow_soft_links=follow_soft_links,
follow_external_links=follow_external_links)
log.info(f"get obj_id: {obj_id} from h5path: {h5path}")
# get authoritative state for object from DN (even if
# it's in the meta_cache).
kwargs = {"refresh": True, "bucket": bucket,
"include_attrs": include_attrs, "include_links": include_links}
log.debug(f"kwargs for getObjectJson: {kwargs}")

obj_json = await getObjectJson(app, obj_id, **kwargs)

obj_json = respJsonAssemble(obj_json, params, obj_id)

obj_json["domain"] = getPathForDomain(domain)

# client may not know class of object retrieved via path
obj_json["class"] = getObjectClass(obj_id)

json_objs[h5path] = obj_json

jsonRsp = {"h5paths": json_objs}
resp = await jsonResponse(request, jsonRsp)
log.response(request, resp=resp)
return resp
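A hypothetical client call against this handler, assuming it is routed at the service root like the other domain operations (endpoint, domain, credentials, and paths below are placeholders; the body carries an h5paths list and the response maps each path to its object JSON):

import requests

endpoint = "http://hsds.example.com"  # placeholder HSDS service endpoint
params = {"domain": "/home/test_user1/example.h5", "include_attrs": 1}
body = {"h5paths": ["/g1/dset1", "/g2"]}
rsp = requests.post(endpoint + "/", params=params, json=body, auth=("user", "passwd"))
rsp.raise_for_status()
for h5path, obj_json in rsp.json()["h5paths"].items():
    # each entry carries the object's class and owning domain alongside its JSON
    print(h5path, obj_json["class"], obj_json["domain"])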


async def PUT_Domain(request):
"""HTTP method to create a new domain"""
log.request(request)
Expand Down
