Multilink #296

Merged on Jan 24, 2024 (18 commits)
Changes from 1 commit
2 changes: 1 addition & 1 deletion admin/config/config.yml
@@ -76,7 +76,6 @@ http_streaming: true # enable HTTP streaming
k8s_dn_label_selector: app=hsds # Selector for getting data node pods from a k8s deployment (https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors)
k8s_namespace: null # Specifies if the client should be limited to a specific namespace. Useful for some RBAC configurations.
restart_policy: on-failure # Docker restart policy
domain_req_max_objects_limit: 500 # maximum number of objects to return in GET domain request with use_cache
# the following two values will give backoff times of approx: 0.2, 0.4, 0.8, 1.6, 3.2, 6.4, 12.8
dn_max_retries: 7 # number of times to retry DN requests
dn_retry_backoff_exp: 0.1 # backoff factor for retries
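For reference, a minimal sketch (illustration only, not HSDS code) of how these two settings produce the backoff sequence noted in the comment above, assuming each retry waits roughly dn_retry_backoff_exp * 2**attempt seconds:

```python
# Illustration only: reproduce the approximate backoff times from the config comment,
# assuming delay = dn_retry_backoff_exp * 2**attempt for each retry attempt.
dn_max_retries = 7
dn_retry_backoff_exp = 0.1

delays = [dn_retry_backoff_exp * 2 ** attempt for attempt in range(1, dn_max_retries + 1)]
print([round(d, 1) for d in delays])  # [0.2, 0.4, 0.8, 1.6, 3.2, 6.4, 12.8]
```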
@@ -99,3 +98,4 @@ data_cache_max_req_size: 128k # max size for rangeget fetches
data_cache_expire_time: 3600 # expire cache items after one hour
data_cache_page_size: 4m # page size for range get cache, set to zero to disable proxy
data_cache_max_concurrent_read: 16 # maximum number of inflight storage read requests
domain_req_max_objects_limit: 500 # maximum number of objects to return in GET domain request with use_cache
3 changes: 1 addition & 2 deletions hsds/attr_sn.py
@@ -18,8 +18,7 @@
from aiohttp.web import StreamResponse
from json import JSONDecodeError

from .util.httpUtil import getHref
from .util.httpUtil import getAcceptType, jsonResponse
from .util.httpUtil import getAcceptType, jsonResponse, getHref
from .util.idUtil import isValidUuid, getRootObjId
from .util.authUtil import getUserPasswordFromRequest, validateUserPassword
from .util.domainUtil import getDomainFromRequest, isValidDomain
13 changes: 3 additions & 10 deletions hsds/ctype_sn.py
@@ -16,15 +16,15 @@

from aiohttp.web_exceptions import HTTPBadRequest, HTTPGone
from json import JSONDecodeError
from .util.httpUtil import http_post, http_delete, getHref, respJsonAssemble
from .util.httpUtil import http_post, getHref, respJsonAssemble
from .util.httpUtil import jsonResponse
from .util.idUtil import isValidUuid, getDataNodeUrl, createObjId
from .util.authUtil import getUserPasswordFromRequest, aclCheck
from .util.authUtil import validateUserPassword
from .util.domainUtil import getDomainFromRequest, getPathForDomain, isValidDomain
from .util.domainUtil import getBucketForDomain, verifyRoot
from .util.hdf5dtype import validateTypeItem, getBaseTypeJson
from .servicenode_lib import getDomainJson, getObjectJson, validateAction
from .servicenode_lib import getDomainJson, getObjectJson, validateAction, deleteObj
from .servicenode_lib import getObjectIdByPath, getPathForObjectId, putHardLink
from . import hsds_logger as log

@@ -242,8 +242,6 @@ async def DELETE_Datatype(request):
"""HTTP method to delete a committed type resource"""
log.request(request)
app = request.app
meta_cache = app["meta_cache"]

ctype_id = request.match_info.get("id")
if not ctype_id:
msg = "Missing committed type id"
@@ -273,12 +271,7 @@ async def DELETE_Datatype(request):

await validateAction(app, domain, ctype_id, username, "delete")

req = getDataNodeUrl(app, ctype_id) + "/datatypes/" + ctype_id

await http_delete(app, req, params=params)

if ctype_id in meta_cache:
del meta_cache[ctype_id] # remove from cache
await deleteObj(app, ctype_id, bucket=bucket)

resp = await jsonResponse(request, {})
log.response(request, resp=resp)
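The DELETE_Datatype change above replaces the inline DN delete and meta_cache eviction with a shared deleteObj helper from servicenode_lib. A rough sketch of what such a helper plausibly does, based only on the code removed in this hunk (the real implementation in hsds/servicenode_lib.py may differ):

```python
# Sketch only -- approximates the deleteObj helper this diff switches to; the real
# implementation lives in hsds/servicenode_lib.py and may differ in details.
from .util.httpUtil import http_delete
from .util.idUtil import getCollectionForId, getDataNodeUrl
from . import hsds_logger as log


async def deleteObj(app, obj_id, bucket=None):
    """Forward an object delete to its data node and drop any cached copy."""
    collection = getCollectionForId(obj_id)  # e.g. "datatypes" for a committed type id
    req = getDataNodeUrl(app, obj_id) + f"/{collection}/{obj_id}"
    params = {"bucket": bucket} if bucket else {}
    log.debug(f"deleteObj: {req}")
    await http_delete(app, req, params=params)

    meta_cache = app["meta_cache"]
    if obj_id in meta_cache:
        del meta_cache[obj_id]  # remove the now-stale cached object
```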
17 changes: 11 additions & 6 deletions hsds/domain_crawl.py
@@ -34,7 +34,7 @@ def __init__(
max_objects_limit=0,
raise_error=False
):
log.info(f"DomainCrawler.__init__ root_id: {len(objs)} objs")
log.info(f"DomainCrawler.__init__ action: {action} root_id: {len(objs)} objs")
log.debug(f"params: {params}")
self._app = app
self._action = action
@@ -127,7 +127,7 @@ async def put_attributes(self, obj_id, attr_items):

async def get_obj_json(self, obj_id):
""" get the given obj_json for the obj_id.
for each group found, search the links if include_links is set """
for each group found, search the links if follow_links is set """
log.debug(f"get_obj_json: {obj_id}")
collection = getCollectionForId(obj_id)
kwargs = {}
@@ -207,11 +207,16 @@ async def get_obj_json(self, obj_id):
log.debug(f"DomainCrawler - adding link_id: {link_id}")
self._obj_dict[link_id] = {} # placeholder for obj id
self._q.put_nowait(link_id)
if not self._params.get("include_links"):
# don't keep the links
del obj_json["links"]

async def get_links(self, grp_id, titles=None):
""" if titles is set, get all the links in grp_id that
have a title in the list. Otherwise, return all links for the object. """
log.debug(f"get_links: {grp_id}, titles; {titles}")
log.debug(f"get_links: {grp_id}")
if titles:
log.debug(f"titles; {titles}")
collection = getCollectionForId(grp_id)
if collection != "groups":
log.warn(f"get_links, expected groups id but got: {grp_id}")
@@ -221,7 +226,6 @@ async def get_links(self, grp_id, titles=None):
kwargs["titles"] = titles
if self._params.get("bucket"):
kwargs["bucket"] = self._params["bucket"]

if self._params.get("follow_links"):
follow_links = True
else:
@@ -388,7 +392,6 @@ async def work(self):

async def fetch(self, obj_id):
log.debug(f"DomainCrawler fetch for id: {obj_id}")
log.debug(f"action: {self._action}")
if self._action == "get_obj":
log.debug("DomainCrawler - get obj")
# just get the obj json
Expand Down Expand Up @@ -427,7 +430,9 @@ async def fetch(self, obj_id):
await self.put_attributes(obj_id, attr_items)
elif self._action == "get_link":
log.debug("DomainCrawlwer - get links")
if obj_id not in self._objs:
log.debug(f"self._objs: {self._objs}, type: {type(self._objs)}")

if self._objs is None or obj_id not in self._objs:
link_titles = None # fetch all links for this object
else:
link_titles = self._objs[obj_id]
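The fetch() change above lets self._objs be None (or not contain the id), in which case all links for the object are returned; otherwise only the listed titles are fetched. A hypothetical usage sketch (the group id, titles, and bucket are invented; the constructor call mirrors the DomainCrawler(app, objs, action=..., params=...) form used in domain_sn.py below):

```python
# Hypothetical usage sketch inside an async coroutine; the group id, titles, and
# bucket name are invented for illustration.
objs = {
    "g-00000000-0000-0000-0000-000000000001": ["dset1", "grp_a"],  # only these link titles
}
params = {"bucket": "mybucket", "follow_links": False}
crawler = DomainCrawler(app, objs, action="get_link", params=params)
await crawler.crawl()
# crawler._obj_dict now maps each requested group id to the links that were fetched
```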
135 changes: 71 additions & 64 deletions hsds/domain_sn.py
@@ -23,9 +23,8 @@
from aiohttp.web_exceptions import HTTPConflict, HTTPServiceUnavailable
from aiohttp import ClientResponseError
from aiohttp.web import json_response
from requests.sessions import merge_setting

from .util.httpUtil import getObjectClass, http_post, http_put, http_get, http_delete
from .util.httpUtil import getObjectClass, http_post, http_put, http_delete
from .util.httpUtil import getHref, respJsonAssemble
from .util.httpUtil import jsonResponse
from .util.idUtil import getDataNodeUrl, createObjId, getCollectionForId
@@ -47,60 +46,54 @@
from . import config


async def get_collections(app, root_id, bucket=None):
async def get_collections(app, root_id, bucket=None, max_objects_limit=None):
"""Return the object ids for given root."""

log.info(f"get_collections for {root_id}")
groups = {}
datasets = {}
datatypes = {}
lookup_ids = set()
lookup_ids.add(root_id)
params = {"bucket": bucket}

while lookup_ids:
grp_id = lookup_ids.pop()
req = getDataNodeUrl(app, grp_id)
req += "/groups/" + grp_id + "/links"
log.debug("collection get LINKS: " + req)
try:
# throws 404 if doesn't exist
links_json = await http_get(app, req, params=params)
except HTTPNotFound:
log.warn(f"get_collection, group {grp_id} not found")
continue

log.debug(f"got links json from dn for group_id: {grp_id}")
links = links_json["links"]
log.debug(f"get_collection: got links: {links}")
for link in links:
if link["class"] != "H5L_TYPE_HARD":
continue
link_id = link["id"]
obj_type = getCollectionForId(link_id)
if obj_type == "groups":
if link_id in groups:
continue # been here before
groups[link_id] = {}
lookup_ids.add(link_id)
elif obj_type == "datasets":
if link_id in datasets:
continue
datasets[link_id] = {}
elif obj_type == "datatypes":
if link_id in datatypes:
continue
datatypes[link_id] = {}
else:
msg = "get_collection: unexpected link object type: "
msg += f"{obj_type}"
log.error(merge_setting)
HTTPInternalServerError()
crawler_params = {
"include_attrs": False,
"include_links": False,
"bucket": bucket,
"follow_links": True,
}

if max_objects_limit:
crawler_params["max_objects_limit"] = max_objects_limit

crawler = DomainCrawler(app, [root_id, ], action="get_obj", params=crawler_params)
await crawler.crawl()
if max_objects_limit and len(crawler._obj_dict) >= max_objects_limit:
msg = "get_collections - too many objects: "
msg += f"{len(crawler._obj_dict)}, returning None"
log.info(msg)
return None
else:
msg = f"DomainCrawler returned: {len(crawler._obj_dict)} object ids"
log.info(msg)

group_ids = set()
dataset_ids = set()
datatype_ids = set()

for obj_id in crawler._obj_dict:
obj_type = getCollectionForId(obj_id)
if obj_type == "groups":
group_ids.add(obj_id)
elif obj_type == "datasets":
dataset_ids.add(obj_id)
elif obj_type == "datatypes":
datatype_ids.add(obj_id)
else:
log.warn(f"get_collections - unexpected id type: {obj_id}")
if root_id in group_ids:
group_ids.remove(root_id) # don't include the root id
print(f"get_collections - group_ids: {group_ids}")

result = {}
result["groups"] = groups
result["datasets"] = datasets
result["datatypes"] = datatypes
result["groups"] = group_ids
result["datasets"] = dataset_ids
result["datatypes"] = datatype_ids
return result
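A short usage sketch of the reworked get_collections (the bucket name is invented), showing the None return when the crawl exceeds max_objects_limit:

```python
# Hypothetical call site inside an async coroutine; the bucket name is invented.
result = await get_collections(app, root_id, bucket="mybucket", max_objects_limit=500)
if result is None:
    # more than max_objects_limit objects were found under root_id;
    # callers are expected to handle this over-limit case themselves
    pass
else:
    group_ids = result["groups"]        # set of group ids (root id excluded)
    dataset_ids = result["datasets"]    # set of dataset ids
    datatype_ids = result["datatypes"]  # set of committed type ids
```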


Expand All @@ -114,9 +107,10 @@ async def getDomainObjects(app, root_id, include_attrs=False, bucket=None):

crawler_params = {
"include_attrs": include_attrs,
"bucket": bucket,
"include_links": True,
"follow_links": True,
"max_objects_limit": max_objects_limit,
"bucket": bucket,
}

crawler = DomainCrawler(app, [root_id, ], action="get_obj", params=crawler_params)
@@ -263,15 +257,13 @@ async def get_domains(request):
if pattern:
# do a pattern match on the basename
basename = op.basename(domain)
log.debug(
f"get_domains: checking {basename} against pattern: {pattern}"
)
msg = f"get_domains: checking {basename} against pattern: {pattern}"
log.debug(msg)
try:
got_match = globmatch(basename, pattern)
except ValueError as ve:
log.warn(
f"get_domains, invalid query pattern {pattern}, ValueError: {ve}"
)
msg = f"get_domains, invalid query pattern {pattern}, ValueError: {ve}"
log.warn(msg)
raise HTTPBadRequest(reason="invalid query pattern")
if got_match:
log.debug("get_domains - got_match")
@@ -502,14 +494,14 @@ async def GET_Domain(request):
h5path = params["h5path"]

# select which object to perform path search under
root_id = parent_id if parent_id else domain_json["root"]
base_id = parent_id if parent_id else domain_json["root"]

# getObjectIdByPath throws 404 if not found
obj_id, domain, _ = await getObjectIdByPath(
app, root_id, h5path, bucket=bucket, domain=domain,
app, base_id, h5path, bucket=bucket, domain=domain,
follow_soft_links=follow_soft_links,
follow_external_links=follow_external_links)
log.info(f"get obj_id: {obj_id} from h5path: {h5path}")
log.info(f"got obj_id: {obj_id} from h5path: {h5path}")
# get authoritative state for object from DN (even if
# it's in the meta_cache).
kwargs = {"refresh": True, "bucket": bucket,
@@ -632,11 +624,14 @@ async def POST_Domain(request):
params = request.rel_url.query
log.debug(f"POST_Domain query params: {params}")

parent_id = None
include_links = False
include_attrs = False
follow_soft_links = False
follow_external_links = False

if "parent_id" in params and params["parent_id"]:
parent_id = params["parent_id"]
if "include_links" in params and params["include_links"]:
include_links = True
if "include_attrs" in params and params["include_attrs"]:
Expand Down Expand Up @@ -710,22 +705,34 @@ async def POST_Domain(request):
log.error("No acls key found in domain")
raise HTTPInternalServerError()

log.debug(f"got domain_json: {domain_json}")
if "root" not in domain_json:
msg = f"{domain} is a folder, not a domain"
log.warn(msg)
raise HTTPBadRequest(reason=msg)

root_id = domain_json["root"]

# select which object to perform path search under
base_id = parent_id if parent_id else root_id

log.debug(f"POST_Domain with h5paths: {h5paths} from: {base_id}")
# validate that the requesting user has permission to read this domain
# aclCheck throws exception if not authorized
aclCheck(app, domain_json, "read", username)

json_objs = {}

# TBD: the following could be made more efficient for
# cases where a large number of h5paths are given...
for h5path in h5paths:
root_id = domain_json["root"]

# getObjectIdByPath throws 404 if not found
obj_id, domain, _ = await getObjectIdByPath(
app, root_id, h5path, bucket=bucket, domain=domain,
app, base_id, h5path, bucket=bucket, domain=domain,
follow_soft_links=follow_soft_links,
follow_external_links=follow_external_links)
log.info(f"get obj_id: {obj_id} from h5path: {h5path}")

log.info(f"got obj_id: {obj_id} from h5path: {h5path}")
# get authoritative state for object from DN (even if
# it's in the meta_cache).
kwargs = {"refresh": True, "bucket": bucket,
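To illustrate the new parent_id handling in POST_Domain, a hypothetical client-side sketch; the endpoint URL, query parameter names, and body shape are assumptions inferred from the handler code above rather than taken from the HSDS documentation:

```python
# Hypothetical client sketch using aiohttp; the domain, group id, h5paths, and
# service endpoint are all invented for illustration.
import aiohttp


async def lookup_objects():
    params = {
        "domain": "/home/test_user1/mydomain.h5",
        # resolve the h5paths relative to this group instead of the domain root:
        "parent_id": "g-00000000-0000-0000-0000-000000000002",
        "include_attrs": "1",
    }
    body = {"h5paths": ["grp_a/dset1", "grp_a/grp_b"]}
    async with aiohttp.ClientSession() as session:
        async with session.post("http://localhost:5101/", params=params, json=body) as rsp:
            rsp.raise_for_status()
            return await rsp.json()
```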
19 changes: 19 additions & 0 deletions hsds/dset_lib.py
@@ -904,3 +904,22 @@ async def reduceShape(app, dset_json, shape_update, bucket=None):
await removeChunks(app, delete_ids, bucket=bucket)
else:
log.info("no chunks need deletion for shape reduction")


async def deleteAllChunks(app, dset_id, bucket=None):
""" Delete any allocated chunks for the given dataset """

log.info(f"deleteAllChunks for {dset_id}")

# get all chunk ids for chunks that have been allocated
chunk_ids = await getAllocatedChunkIds(app, dset_id, bucket=bucket)
chunk_ids.sort()
Contributor review comment: "Double sort" (chunk_ids is sorted here and again inside the if block below).
if chunk_ids:
chunk_ids = list(chunk_ids)
chunk_ids.sort()
msg = f"deleteAllChunks for {dset_id} - these chunks will need to be deleted: {chunk_ids}"
log.debug(msg)
await removeChunks(app, chunk_ids, bucket=bucket)
else:
log.info(f"deleteAllChunks for {dset_id} - no chunks need deletion")