Multilink #296

Merged
merged 18 commits on Jan 24, 2024
Changes from 4 commits
94 changes: 73 additions & 21 deletions hsds/attr_dn.py
@@ -15,11 +15,11 @@
import time
from bisect import bisect_left

from aiohttp.web_exceptions import HTTPBadRequest, HTTPConflict, HTTPNotFound
from aiohttp.web_exceptions import HTTPBadRequest, HTTPConflict, HTTPNotFound, HTTPGone
from aiohttp.web_exceptions import HTTPInternalServerError
from aiohttp.web import json_response

from .util.attrUtil import validateAttributeName
from .util.attrUtil import validateAttributeName, isEqualAttr
from .util.hdf5dtype import getItemSize, createDataType
from .util.dsetUtil import getShapeDims
from .util.arrayUtil import arrayToBytes, jsonToArray, decodeData
@@ -270,21 +270,31 @@ async def POST_Attributes(request):
if encoding:
kwargs["encoding"] = encoding

missing_names = set()

for attr_name in titles:
if attr_name not in attr_dict:
missing_names.add(attr_name)
continue
des_attr = _getAttribute(attr_name, obj_json, **kwargs)
attr_list.append(des_attr)

resp_json = {"attributes": attr_list}
if not attr_list:
msg = f"POST attributes - requested {len(titles)} but none were found"
log.warn(msg)
raise HTTPNotFound()
if len(attr_list) != len(titles):

if missing_names:
msg = f"POST attributes - requested {len(titles)} attributes but only "
msg += f"{len(attr_list)} were found"
log.warn(msg)
# one or more attributes not found, check to see if any
# had been previously deleted
deleted_attrs = app["deleted_attrs"]
if obj_id in deleted_attrs:
attr_delete_set = deleted_attrs[obj_id]
for attr_name in missing_names:
if attr_name in attr_delete_set:
log.info(f"attribute: {attr_name} was previously deleted, returning 410")
raise HTTPGone()
log.info("one or mores attributes not found, returning 404")
raise HTTPNotFound()
log.debug(f"POST attributes returning: {resp_json}")
resp = json_response(resp_json)
@@ -392,18 +402,28 @@ async def PUT_Attributes(request):

attributes = obj_json["attributes"]

# check for conflicts, also set timestamp
create_time = time.time()
new_attribute = False # set this if we have any new attributes
# check for conflicts
new_attributes = set() # attribute names that are new or replacements
for attr_name in items:
attribute = items[attr_name]
if attr_name in attributes:
log.debug(f"attribute {attr_name} exists")
if replace:
old_item = attributes[attr_name]
try:
is_dup = isEqualAttr(attribute, old_item)
except TypeError:
log.error(f"isEqualAttr TypeError - new: {attribute} old: {old_item}")
raise HTTPInternalServerError()
if is_dup:
log.debug(f"duplicate attribute: {attr_name}")
continue
elif replace:
# don't change the create timestamp
log.debug(f"attribute {attr_name} exists, but will be updated")
old_item = attributes[attr_name]
attribute["created"] = old_item["created"]
new_attributes.add(attr_name)
else:
# Attribute already exists, return a 409
msg = f"Attempt to overwrite attribute: {attr_name} "
@@ -414,18 +434,30 @@
# set the timestamp
log.debug(f"new attribute {attr_name}")
attribute["created"] = create_time
new_attribute = True
new_attributes.add(attr_name)

# ok - all set, create the attributes
for attr_name in items:
# if any of the attribute names were previously deleted,
# remove them from the deleted set
deleted_attrs = app["deleted_attrs"]
if obj_id in deleted_attrs:
attr_delete_set = deleted_attrs[obj_id]
else:
attr_delete_set = set()

# ok - all set, add the attributes
for attr_name in new_attributes:
log.debug(f"adding attribute {attr_name}")
attr_json = items[attr_name]
attributes[attr_name] = attr_json

# write back to S3, save to metadata cache
await save_metadata_obj(app, obj_id, obj_json, bucket=bucket)

if new_attribute:
if attr_name in attr_delete_set:
attr_delete_set.remove(attr_name)

if new_attributes:
# update the obj lastModified
now = time.time()
obj_json["lastModified"] = now
# write back to S3, save to metadata cache
await save_metadata_obj(app, obj_id, obj_json, bucket=bucket)
status = 201
else:
status = 200
@@ -490,15 +522,35 @@ async def DELETE_Attributes(request):
# return a list of attributes based on sorted dictionary keys
attributes = obj_json["attributes"]

# add attribute names to deleted set, so we can return a 410 if they
# are requested in the future
deleted_attrs = app["deleted_attrs"]
if obj_id in deleted_attrs:
attr_delete_set = deleted_attrs[obj_id]
else:
attr_delete_set = set()
deleted_attrs[obj_id] = attr_delete_set

save_obj = False # set to True if anything is actually modified
for attr_name in attr_names:
if attr_name in attr_delete_set:
log.warn(f"attribute {attr_name} already deleted")
continue

if attr_name not in attributes:
msg = f"Attribute {attr_name} not found in objid: {obj_id}"
msg = f"Attribute {attr_name} not found in obj id: {obj_id}"
log.warn(msg)
raise HTTPNotFound()

del attributes[attr_name]

await save_metadata_obj(app, obj_id, obj_json, bucket=bucket)
attr_delete_set.add(attr_name)
save_obj = True

if save_obj:
# update the object lastModified
now = time.time()
obj_json["lastModified"] = now
await save_metadata_obj(app, obj_id, obj_json, bucket=bucket)

resp_json = {}
resp = json_response(resp_json)
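
The new 404-versus-410 behavior above is the core of this change: the datanode now remembers which attribute names were deleted per object. A minimal standalone sketch of that decision logic, with plain dicts in place of the aiohttp app state (names mirror the diff, but this is not the HSDS code itself):

```python
# Minimal sketch of the deleted-attribute tracking added in attr_dn.py.
# deleted_attrs maps an object id to the set of attribute names deleted on
# that object, letting later reads distinguish "never existed" (404) from
# "previously deleted" (410).
deleted_attrs = {}

def delete_attribute(obj_id, attr_name, attributes):
    del attributes[attr_name]
    deleted_attrs.setdefault(obj_id, set()).add(attr_name)

def attribute_status(obj_id, attr_name, attributes):
    if attr_name in attributes:
        return 200
    if attr_name in deleted_attrs.get(obj_id, set()):
        return 410  # deleted earlier, report Gone rather than Not Found
    return 404

attrs = {"units": {"type": "H5T_STD_I32LE"}}
delete_attribute("g-123", "units", attrs)
assert attribute_status("g-123", "units", attrs) == 410
assert attribute_status("g-123", "scale", attrs) == 404
```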
28 changes: 3 additions & 25 deletions hsds/attr_sn.py
@@ -20,7 +20,7 @@

from .util.httpUtil import getHref
from .util.httpUtil import getAcceptType, jsonResponse
from .util.idUtil import isValidUuid, getCollectionForId, getRootObjId
from .util.idUtil import isValidUuid, getRootObjId
from .util.authUtil import getUserPasswordFromRequest, validateUserPassword
from .util.domainUtil import getDomainFromRequest, isValidDomain
from .util.domainUtil import getBucketForDomain, verifyRoot
@@ -1106,7 +1106,7 @@ async def PUT_AttributeValue(request):


async def POST_Attributes(request):
"""HTTP method to get multiple attribute values"""
"""HTTP method to get multiple attributes """
log.request(request)
app = request.app
log.info("POST_Attributes")
@@ -1247,7 +1247,6 @@ async def POST_Attributes(request):
elif len(items) == 1:
# just make a request to the datanode
obj_id = list(items.keys())[0]
collection = getCollectionForId(obj_id)
attr_names = items[obj_id]
kwargs = {"attr_names": attr_names, "bucket": bucket}
if not include_data:
@@ -1259,12 +1258,6 @@

attributes = await getAttributes(app, obj_id, **kwargs)

# mixin hrefs
for attribute in attributes:
attr_name = attribute["name"]
attr_href = f"/{collection}/{obj_id}/attributes/{attr_name}"
attribute["href"] = getHref(request, attr_href)

resp_json["attributes"] = attributes
else:
# get multi obj
@@ -1288,31 +1281,16 @@
msg = f"DomainCrawler returned: {len(crawler._obj_dict)} objects"
log.info(msg)
attributes = crawler._obj_dict
# mixin hrefs
# log attributes returned for each obj_id
for obj_id in attributes:
obj_attributes = attributes[obj_id]
msg = f"POST_Attributes, obj_id {obj_id} "
msg += f"returned {len(obj_attributes)}"
log.debug(msg)

collection = getCollectionForId(obj_id)
for attribute in obj_attributes:
log.debug(f"attribute: {attribute}")
attr_name = attribute["name"]
attr_href = f"/{collection}/{obj_id}/attributes/{attr_name}"
attribute["href"] = getHref(request, attr_href)
log.debug(f"got {len(attributes)} attributes")
resp_json["attributes"] = attributes

hrefs = []
collection = getCollectionForId(req_id)
obj_uri = "/" + collection + "/" + req_id
href = getHref(request, obj_uri + "/attributes")
hrefs.append({"rel": "self", "href": href})
hrefs.append({"rel": "home", "href": getHref(request, "/")})
hrefs.append({"rel": "owner", "href": getHref(request, obj_uri)})
resp_json["hrefs"] = hrefs

resp = await jsonResponse(request, resp_json, ignore_nan=ignore_nan)
log.response(request, resp=resp)
return resp
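
For context, a hypothetical client-side call against the bulk attribute read handler above. The `attr_names` body field mirrors the kwargs in this diff, but the exact request shape, port, and status propagation are assumptions:

```python
import requests

endpoint = "http://localhost:5101"  # assumed service node address
group_id = "g-0123"                 # hypothetical object id

rsp = requests.post(f"{endpoint}/groups/{group_id}/attributes",
                    json={"attr_names": ["units", "scale"]})
if rsp.status_code == 410:
    print("one or more attributes were previously deleted")
else:
    rsp.raise_for_status()
    # per this diff, per-attribute hrefs are no longer mixed in
    print(rsp.json()["attributes"])
```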
11 changes: 0 additions & 11 deletions hsds/chunk_crawl.py
@@ -81,9 +81,6 @@ async def write_chunk_hyperslab(
np_arr: numpy array of data to be written
"""

if not bucket:
bucket = config.get("bucket_name")

msg = f"write_chunk_hyperslab, chunk_id: {chunk_id}, slices: {slices}, "
msg += f"bucket: {bucket}"
log.info(msg)
@@ -181,8 +178,6 @@ async def read_chunk_hyperslab(
entire object)
bucket: s3 bucket to read from
"""
if not bucket:
bucket = config.get("bucket_name")

if chunk_map is None:
log.error("expected chunk_map to be set")
@@ -444,9 +439,6 @@ async def read_point_sel(
arr: numpy array to store read bytes
"""

if not bucket:
bucket = config.get("bucket_name")

msg = f"read_point_sel, chunk_id: {chunk_id}, bucket: {bucket}"
log.info(msg)

@@ -549,9 +541,6 @@ async def write_point_sel(
point_data: index of arr element to update for a given point
"""

if not bucket:
bucket = config.get("bucket_name")

msg = f"write_point_sel, chunk_id: {chunk_id}, points: {point_list}, "
msg += f"data: {point_data}"
log.info(msg)
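
All four hunks above delete the same per-helper fallback to `config.get("bucket_name")`; the bucket is now expected to be resolved once by the caller and passed down explicitly. An illustrative sketch of that pattern (names are assumptions, not HSDS code):

```python
def resolve_bucket(params, config):
    # single fallback at the request boundary instead of inside every
    # chunk read/write helper
    bucket = params.get("bucket") or config.get("bucket_name")
    if not bucket:
        raise ValueError("no bucket specified")
    return bucket
```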
17 changes: 4 additions & 13 deletions hsds/ctype_sn.py
@@ -16,7 +16,7 @@

from aiohttp.web_exceptions import HTTPBadRequest, HTTPGone
from json import JSONDecodeError
from .util.httpUtil import http_post, http_put, http_delete, getHref, respJsonAssemble
from .util.httpUtil import http_post, http_delete, getHref, respJsonAssemble
from .util.httpUtil import jsonResponse
from .util.idUtil import isValidUuid, getDataNodeUrl, createObjId
from .util.authUtil import getUserPasswordFromRequest, aclCheck
@@ -25,7 +25,7 @@
from .util.domainUtil import getBucketForDomain, verifyRoot
from .util.hdf5dtype import validateTypeItem, getBaseTypeJson
from .servicenode_lib import getDomainJson, getObjectJson, validateAction
from .servicenode_lib import getObjectIdByPath, getPathForObjectId
from .servicenode_lib import getObjectIdByPath, getPathForObjectId, putHardLink
from . import hsds_logger as log


@@ -223,22 +223,13 @@ async def POST_Datatype(request):
ctype_json = {"id": ctype_id, "root": root_id, "type": datatype}
log.debug(f"create named type, body: {ctype_json}")
req = getDataNodeUrl(app, ctype_id) + "/datatypes"
params = {}
if bucket:
params["bucket"] = bucket
params = {"bucket": bucket}

type_json = await http_post(app, req, data=ctype_json, params=params)

# create link if requested
if link_id and link_title:
link_json = {}
link_json["id"] = ctype_id
link_json["class"] = "H5L_TYPE_HARD"
link_req = getDataNodeUrl(app, link_id)
link_req += "/groups/" + link_id + "/links/" + link_title
log.debug("PUT link - : " + link_req)
put_rsp = await http_put(app, link_req, data=link_json, params=params)
log.debug(f"PUT Link resp: {put_rsp}")
await putHardLink(app, link_id, link_title, tgt_id=ctype_id, bucket=bucket)

# datatype creation successful
resp = await jsonResponse(request, type_json, status=201)
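
The inline link-creation block is replaced by a shared `putHardLink` helper from `servicenode_lib`. A rough guess at its shape, reconstructed only from the removed inline code; the actual implementation is not shown in this diff, and the datanode link routes are themselves moving to bulk form in this PR:

```python
# Assumed sketch, not the servicenode_lib implementation.
from .util.httpUtil import http_put
from .util.idUtil import getDataNodeUrl
from . import hsds_logger as log

async def putHardLink(app, group_id, title, tgt_id=None, bucket=None):
    # wrap the PUT-link request that POST_Datatype previously issued inline
    link_json = {"id": tgt_id, "class": "H5L_TYPE_HARD"}
    req = getDataNodeUrl(app, group_id) + f"/groups/{group_id}/links/{title}"
    params = {"bucket": bucket} if bucket else {}
    rsp = await http_put(app, req, data=link_json, params=params)
    log.debug(f"putHardLink resp: {rsp}")
    return rsp
```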
10 changes: 6 additions & 4 deletions hsds/datanode.py
@@ -30,7 +30,7 @@
from .domain_dn import GET_Domain, PUT_Domain, DELETE_Domain, PUT_ACL
from .group_dn import GET_Group, POST_Group, DELETE_Group, PUT_Group
from .group_dn import POST_Root
from .link_dn import GET_Links, GET_Link, PUT_Link, DELETE_Link
from .link_dn import GET_Links, POST_Links, PUT_Links, DELETE_Links
from .attr_dn import GET_Attributes, POST_Attributes
from .attr_dn import PUT_Attributes, DELETE_Attributes
from .ctype_dn import GET_Datatype, POST_Datatype, DELETE_Datatype
@@ -59,9 +59,9 @@ async def init():
app.router.add_route("PUT", "/groups/{id}", PUT_Group)
app.router.add_route("POST", "/groups", POST_Group)
app.router.add_route("GET", "/groups/{id}/links", GET_Links)
app.router.add_route("GET", "/groups/{id}/links/{title}", GET_Link)
app.router.add_route("DELETE", "/groups/{id}/links/{title}", DELETE_Link)
app.router.add_route("PUT", "/groups/{id}/links/{title}", PUT_Link)
app.router.add_route("POST", "/groups/{id}/links", POST_Links)
app.router.add_route("DELETE", "/groups/{id}/links", DELETE_Links)
app.router.add_route("PUT", "/groups/{id}/links", PUT_Links)
app.router.add_route("GET", "/groups/{id}/attributes", GET_Attributes)
app.router.add_route("POST", "/groups/{id}/attributes", POST_Attributes)
app.router.add_route("DELETE", "/groups/{id}/attributes", DELETE_Attributes)
@@ -299,6 +299,8 @@ def create_app():
}
app["chunk_cache"] = LruCache(**kwargs)
app["deleted_ids"] = set()
app["deleted_attrs"] = {} # map of objectid to set of deleted attribute names
app["deleted_links"] = {} # map of objecctid to set of deleted link names
# map of objids to timestamp and bucket of which they were last updated
app["dirty_ids"] = {}
# map of dataset ids to deflate levels (if compressed)
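
The link routes now follow the same bulk pattern as the attribute routes: one collection-level endpoint per verb instead of per-title routes. A hypothetical request sketch; the `titles` body field is an assumption patterned on the bulk attribute API, not confirmed by this diff:

```python
from aiohttp import ClientSession

async def post_links(dn_url, group_id, titles):
    # bulk read: one POST to /groups/{id}/links rather than one
    # GET per link title
    async with ClientSession() as session:
        req = f"{dn_url}/groups/{group_id}/links"
        async with session.post(req, json={"titles": titles}) as rsp:
            if rsp.status == 410:
                return None  # link(s) previously deleted
            rsp.raise_for_status()
            return (await rsp.json())["links"]
```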