Multilink (#296)

* refactor link dn

* post method for links

* updates based on PR feedback

* multiop put for links

* remove unreachable code

* update version string

* added multi get obj test

* cleanup TBD comments

* fix flake error

* fix parent_id for POST domain, add pattern for links

* update per PR comments

* added max_data_size for attr, pattern for link

* added follow_links for get attrs

* DomainCrawler refactor

* add pattern matching to post links

* added crawler support for limit, encoding, pattern and create_order

* added getBooleanParam util (see the sketch after this list)

* fix flake8 errors
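Two of the utilities mentioned above are easy to picture. Below is a minimal sketch of the ideas: a boolean query-param helper in the spirit of getBooleanParam, and glob-style name filtering as used for the new pattern parameter. The helper body here is an assumption, and Python's standard fnmatch stands in for HSDS's own globparser.globmatch; treat both as illustrative, not the actual HSDS code.

```python
from fnmatch import fnmatchcase  # stand-in for hsds .util.globparser.globmatch

def get_boolean_param(params, key, default=False):
    # Hypothetical helper in the spirit of getBooleanParam: accept common
    # string spellings of a boolean query parameter ("1", "true", ...).
    value = params.get(key, default)
    if isinstance(value, bool):
        return value
    return str(value).lower() in ("1", "t", "true")

# Filter attribute/link names the way a glob pattern param would,
# e.g. "dset_*" keeps only names starting with "dset_".
names = ["dset_1", "dset_2", "g1", "attr_a"]
print([n for n in names if fnmatchcase(n, "dset_*")])  # ['dset_1', 'dset_2']
print(get_boolean_param({"IncludeData": "1"}, "IncludeData"))  # True
```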
jreadey authored Jan 24, 2024
1 parent add4033 commit 9fa156c
Showing 26 changed files with 3,265 additions and 777 deletions.
README.md: 2 changes (1 addition & 1 deletion)
@@ -20,7 +20,7 @@ Make sure you have Python 3 and Pip installed, then:
- Set user_name: `$ export USER_NAME=$USER`
- Set user_password: `$ export USER_PASSWORD=$USER`
- Set admin name: `$ export ADMIN_USERNAME=$USER`
-- Set admin password: `$ $export ADMIN_PASSWORD=$USER`
+- Set admin password: `$ export ADMIN_PASSWORD=$USER`
- Run test suite: `$ python testall.py --skip_unit`
5. (Optional) Install the h5pyd package for an h5py compatible api and tool suite: https://github.com/HDFGroup/h5pyd
6. (Optional) Post install setup (test data, home folders, cli tools, etc): [docs/post_install.md](docs/post_install.md)
admin/config/config.yml: 2 changes (1 addition & 1 deletion)
@@ -76,7 +76,6 @@ http_streaming: true # enable HTTP streaming
k8s_dn_label_selector: app=hsds # Selector for getting data node pods from a k8s deployment (https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors)
k8s_namespace: null # Specifies if the client should be limited to a specific namespace. Useful for some RBAC configurations.
restart_policy: on-failure # Docker restart policy
-domain_req_max_objects_limit: 500 # maximum number of objects to return in GET domain request with use_cache
# the following two values will give backoff times of approx: 0.2, 0.4, 0.8, 1.6, 3.2, 6.4, 12.8
dn_max_retries: 7 # number of times to retry DN requests
dn_retry_backoff_exp: 0.1 # backoff factor for retries
@@ -99,3 +98,4 @@ data_cache_max_req_size: 128k # max size for rangeget fetches
data_cache_expire_time: 3600 # expire cache items after one hour
data_cache_page_size: 4m # page size for range get cache, set to zero to disable proxy
data_cache_max_concurrent_read: 16 # maximum number of inflight storage read requests
+domain_req_max_objects_limit: 500 # maximum number of objects to return in GET domain request with use_cache
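For reference, the retry comment in the config above can be checked directly: with a backoff factor of 0.1 and 7 retries, doubling the delay on each attempt reproduces the quoted series. This is a sketch of the arithmetic implied by the comment, not the actual DN retry code.

```python
# Values from the config above: dn_retry_backoff_exp and dn_max_retries.
factor = 0.1
retries = 7

# Assumed rule: delay for attempt n is factor * 2**n (inferred from the
# comment's series; the real implementation may differ in detail).
delays = [factor * 2 ** n for n in range(1, retries + 1)]
print(delays)  # [0.2, 0.4, 0.8, 1.6, 3.2, 6.4, 12.8]
```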
hsds/attr_dn.py: 140 changes (116 additions & 24 deletions)
@@ -15,15 +15,16 @@
import time
from bisect import bisect_left

-from aiohttp.web_exceptions import HTTPBadRequest, HTTPConflict, HTTPNotFound
+from aiohttp.web_exceptions import HTTPBadRequest, HTTPConflict, HTTPNotFound, HTTPGone
from aiohttp.web_exceptions import HTTPInternalServerError
from aiohttp.web import json_response

-from .util.attrUtil import validateAttributeName
+from .util.attrUtil import validateAttributeName, isEqualAttr
from .util.hdf5dtype import getItemSize, createDataType
+from .util.globparser import globmatch
from .util.dsetUtil import getShapeDims
from .util.arrayUtil import arrayToBytes, jsonToArray, decodeData
-from .util.arrayUtil import bytesToArray, bytesArrayToList
+from .util.arrayUtil import bytesToArray, bytesArrayToList, getNumElements
from .datanode_lib import get_obj_id, get_metadata_obj, save_metadata_obj
from . import hsds_logger as log

@@ -43,7 +44,7 @@ def _index(items, marker, create_order=False):
return -1


-def _getAttribute(attr_name, obj_json, include_data=True, encoding=None):
+def _getAttribute(attr_name, obj_json, include_data=True, max_data_size=0, encoding=None):
""" copy relevant fields from src to target """

if not isinstance(obj_json, dict):
@@ -89,6 +90,26 @@ def _getAttribute(attr_name, obj_json, include_data=True, encoding=None):
encoding = None
log.debug("base64 encoding requested")

if include_data and max_data_size > 0:
# check that the size of the data is not greater than the limit
item_size = getItemSize(type_json)
if item_size == "H5T_VARIABLE":
# could be anything, just guess as 512 bytes per element
# TBD: determine exact size
item_size = 512
dims = getShapeDims(shape_json)
num_elements = getNumElements(dims)
attr_size = item_size * num_elements
if attr_size > max_data_size:
msg = f"{attr_name} size of {attr_size} is "
msg += "larger than max_data_size, excluding data"
log.debug(msg)
include_data = False
else:
msg = f"{attr_name} size of {attr_size} is "
msg += "not larger than max_data_size, including data"
log.debug(msg)

if include_data:
value_json = src_attr["value"]
if "encoding" in src_attr:
@@ -143,11 +164,18 @@ async def GET_Attributes(request):
if params.get("IncludeData"):
include_data = True

max_data_size = 0
if params.get("max_data_size"):
max_data_size = int(params["max_data_size"])
pattern = None
if params.get("pattern"):
pattern = params["pattern"]

limit = None
if "Limit" in params:
try:
limit = int(params["Limit"])
log.info("GET_Links - using Limit: {}".format(limit))
log.info(f"GET_Attributes - using Limit: {limit}")
except ValueError:
msg = "Bad Request: Expected int type for limit"
log.error(msg) # should be validated by SN
@@ -204,7 +232,14 @@ async def GET_Attributes(request):
attr_list = []
for i in range(start_index, end_index):
attr_name = titles[i]
if pattern:
if not globmatch(attr_name, pattern):
log.debug(f"attr_name: {attr_name} did not match pattern: {pattern}")
continue

kwargs = {"include_data": include_data, "encoding": encoding}
if include_data:
kwargs["max_data_size"] = max_data_size
log.debug(f"_getAttribute kwargs: {kwargs}")
des_attr = _getAttribute(attr_name, obj_json, **kwargs)
attr_list.append(des_attr)
@@ -249,6 +284,9 @@ async def POST_Attributes(request):
if "IncludeData" in params and params["IncludeData"]:
include_data = True
log.debug("include attr data")
max_data_size = 0
if params.get("max_data_size"):
max_data_size = int(params["max_data_size"])
if params.get("encoding"):
encoding = params["encoding"]
log.debug("POST_Attributes requested base64 encoding")
@@ -269,22 +307,34 @@
kwargs = {"include_data": include_data}
if encoding:
kwargs["encoding"] = encoding
if max_data_size > 0:
kwargs["max_data_size"] = max_data_size

missing_names = set()

for attr_name in titles:
if attr_name not in attr_dict:
missing_names.add(attr_name)
continue
des_attr = _getAttribute(attr_name, obj_json, **kwargs)
attr_list.append(des_attr)

resp_json = {"attributes": attr_list}
if not attr_list:
msg = f"POST attributes - requested {len(titles)} but none were found"
log.warn(msg)
raise HTTPNotFound()
-if len(attr_list) != len(titles):

+if missing_names:
msg = f"POST attributes - requested {len(titles)} attributes but only "
msg += f"{len(attr_list)} were found"
log.warn(msg)
# one or more attributes not found, check to see if any
# had been previously deleted
deleted_attrs = app["deleted_attrs"]
if obj_id in deleted_attrs:
attr_delete_set = deleted_attrs[obj_id]
for attr_name in missing_names:
if attr_name in attr_delete_set:
log.info(f"attribute: {attr_name} was previously deleted, returning 410")
raise HTTPGone()
log.info("one or mores attributes not found, returning 404")
raise HTTPNotFound()
log.debug(f"POST attributes returning: {resp_json}")
resp = json_response(resp_json)
@@ -392,18 +442,28 @@ async def PUT_Attributes(request):

attributes = obj_json["attributes"]

# check for conflicts, also set timestamp
create_time = time.time()
new_attribute = False # set this if we have any new attributes
# check for conflicts
new_attributes = set() # attribute names that are new or replacements
for attr_name in items:
attribute = items[attr_name]
if attr_name in attributes:
log.debug(f"attribute {attr_name} exists")
if replace:
old_item = attributes[attr_name]
try:
is_dup = isEqualAttr(attribute, old_item)
except TypeError:
log.error(f"isEqualAttr TypeError - new: {attribute} old: {old_item}")
raise HTTPInternalServerError()
if is_dup:
log.debug(f"duplicate attribute: {attr_name}")
continue
elif replace:
# don't change the create timestamp
log.debug(f"attribute {attr_name} exists, but will be updated")
old_item = attributes[attr_name]
attribute["created"] = old_item["created"]
new_attributes.add(attr_name)
else:
# Attribute already exists, return a 409
msg = f"Attempt to overwrite attribute: {attr_name} "
@@ -414,18 +474,30 @@
# set the timestamp
log.debug(f"new attribute {attr_name}")
attribute["created"] = create_time
-new_attribute = True
+new_attributes.add(attr_name)

# ok - all set, create the attributes
for attr_name in items:
# if any of the attribute names was previously deleted,
# remove from the deleted set
deleted_attrs = app["deleted_attrs"]
if obj_id in deleted_attrs:
attr_delete_set = deleted_attrs[obj_id]
else:
attr_delete_set = set()

# ok - all set, add the attributes
for attr_name in new_attributes:
log.debug(f"adding attribute {attr_name}")
attr_json = items[attr_name]
attributes[attr_name] = attr_json

# write back to S3, save to metadata cache
await save_metadata_obj(app, obj_id, obj_json, bucket=bucket)

if new_attribute:
if attr_name in attr_delete_set:
attr_delete_set.remove(attr_name)

if new_attributes:
# update the obj lastModified
now = time.time()
obj_json["lastModified"] = now
# write back to S3, save to metadata cache
await save_metadata_obj(app, obj_id, obj_json, bucket=bucket)
status = 201
else:
status = 200
@@ -490,15 +562,35 @@ async def DELETE_Attributes(request):
# return a list of attributes based on sorted dictionary keys
attributes = obj_json["attributes"]

# add attribute names to deleted set, so we can return a 410 if they
# are requested in the future
deleted_attrs = app["deleted_attrs"]
if obj_id in deleted_attrs:
attr_delete_set = deleted_attrs[obj_id]
else:
attr_delete_set = set()
deleted_attrs[obj_id] = attr_delete_set

save_obj = False # set to True if anything is actually modified
for attr_name in attr_names:
if attr_name in attr_delete_set:
log.warn(f"attribute {attr_name} already deleted")
continue

if attr_name not in attributes:
msg = f"Attribute {attr_name} not found in objid: {obj_id}"
msg = f"Attribute {attr_name} not found in obj id: {obj_id}"
log.warn(msg)
raise HTTPNotFound()

del attributes[attr_name]

-await save_metadata_obj(app, obj_id, obj_json, bucket=bucket)
attr_delete_set.add(attr_name)
save_obj = True

if save_obj:
# update the object lastModified
now = time.time()
obj_json["lastModified"] = now
await save_metadata_obj(app, obj_id, obj_json, bucket=bucket)

resp_json = {}
resp = json_response(resp_json)
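A design note on the attr_dn.py changes above: the deleted_attrs dictionary implements a tombstone pattern. DELETE records each removed name in a per-object set, a later read of a missing name can then distinguish "never existed" (404) from "previously deleted" (410 Gone), and PUT clears the tombstone when a name is recreated. A minimal self-contained sketch of that state machine follows; the function names and structure are illustrative, not the HSDS internals.

```python
# Tombstone bookkeeping: 404 for unknown names, 410 for deleted ones.
deleted_attrs = {}  # obj_id -> set of attribute names deleted from that object

def delete_attr(attrs, obj_id, name):
    if name in attrs:
        del attrs[name]
        deleted_attrs.setdefault(obj_id, set()).add(name)  # leave a tombstone

def get_attr_status(attrs, obj_id, name):
    if name in attrs:
        return 200
    if name in deleted_attrs.get(obj_id, set()):
        return 410  # Gone: existed once, then was deleted
    return 404      # Not Found: never existed

def put_attr(attrs, obj_id, name, value):
    attrs[name] = value
    deleted_attrs.get(obj_id, set()).discard(name)  # recreate clears tombstone

attrs = {"units": "meters"}
delete_attr(attrs, "g-123", "units")
print(get_attr_status(attrs, "g-123", "units"))    # 410
print(get_attr_status(attrs, "g-123", "missing"))  # 404
put_attr(attrs, "g-123", "units", "seconds")
print(get_attr_status(attrs, "g-123", "units"))    # 200
```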
(diffs for the remaining 23 changed files not shown)
