Merge pull request #277 from HDFGroup/reduce_extent
Reduce extent
derobins authored Oct 27, 2023
2 parents bbee0e3 + 8eec439 commit 50b2077
Showing 20 changed files with 2,168 additions and 863 deletions.
4 changes: 2 additions & 2 deletions hsds/async_lib.py
@@ -20,9 +20,9 @@
 from .util.idUtil import getObjId, isValidChunkId, getCollectionForId
 from .util.chunkUtil import getDatasetId, getNumChunks, ChunkIterator
 from .util.hdf5dtype import getItemSize, createDataType
-from .util.arrayUtil import getShapeDims, getNumElements, bytesToArray
+from .util.arrayUtil import getNumElements, bytesToArray
 from .util.dsetUtil import getHyperslabSelection, getFilterOps, getChunkDims
-from .util.dsetUtil import getDatasetLayoutClass, getDatasetLayout
+from .util.dsetUtil import getDatasetLayoutClass, getDatasetLayout, getShapeDims
 
 from .util.storUtil import getStorKeys, putStorJSONObj, getStorJSONObj
 from .util.storUtil import deleteStorObj, getStorBytes, isStorObj
7 changes: 4 additions & 3 deletions hsds/attr_sn.py
@@ -27,8 +27,9 @@
 from .util.attrUtil import validateAttributeName, getRequestCollectionName
 from .util.hdf5dtype import validateTypeItem, getBaseTypeJson
 from .util.hdf5dtype import createDataType, getItemSize
-from .util.arrayUtil import jsonToArray, getShapeDims, getNumElements
+from .util.arrayUtil import jsonToArray, getNumElements
 from .util.arrayUtil import bytesArrayToList
+from .util.dsetUtil import getShapeDims
 from .servicenode_lib import getDomainJson, getObjectJson, validateAction
 from . import hsds_logger as log
 from . import config
@@ -377,7 +378,7 @@ async def PUT_Attribute(request):
             msg = "Bad Request: input data doesn't match selection"
             log.warn(msg)
             raise HTTPBadRequest(reason=msg)
-        log.info(f"Got: {arr.size} array elements")
+        log.debug(f"Got: {arr.size} array elements")
     else:
         value = None
 
@@ -717,7 +718,7 @@ async def PUT_AttributeValue(request):
         msg = "Bad Request: input data doesn't match selection"
         log.warn(msg)
         raise HTTPBadRequest(reason=msg)
-    log.info(f"Got: {arr.size} array elements")
+    log.debug(f"Got: {arr.size} array elements")
 
     # ready to add attribute now
     attr_json = {}
116 changes: 73 additions & 43 deletions hsds/chunk_crawl.py
@@ -16,6 +16,7 @@
 
 import asyncio
 import time
+import traceback
 import random
 from asyncio import CancelledError
 import numpy as np
@@ -27,12 +28,13 @@
 from .util.httpUtil import isUnixDomainUrl
 from .util.idUtil import getDataNodeUrl, getNodeCount
 from .util.hdf5dtype import createDataType
-from .util.dsetUtil import getSliceQueryParam
+from .util.dsetUtil import getSliceQueryParam, getShapeDims
 from .util.dsetUtil import getSelectionShape, getChunkLayout
 from .util.chunkUtil import getChunkCoverage, getDataCoverage
 from .util.chunkUtil import getChunkIdForPartition, getQueryDtype
-from .util.arrayUtil import jsonToArray, getShapeDims, getNumpyValue
+from .util.arrayUtil import jsonToArray, getNumpyValue
 from .util.arrayUtil import getNumElements, arrayToBytes, bytesToArray
+
 from . import config
 from . import hsds_logger as log

Expand All @@ -43,6 +45,33 @@
)


def getFillValue(dset_json):
""" Return the fill value of the given dataset as a numpy array.
If no fill value is defined, return an zero array of given type """

# NOTE - this is copy of the function in dset_lib, but needed to put
# here to avoid a circular dependency

fill_value = None
type_json = dset_json["type"]
dt = createDataType(type_json)

if "creationProperties" in dset_json:
cprops = dset_json["creationProperties"]
if "fillValue" in cprops:
fill_value_prop = cprops["fillValue"]
log.debug(f"got fill_value_prop: {fill_value_prop}")
encoding = cprops.get("fillValue_encoding")
fill_value = getNumpyValue(fill_value_prop, dt=dt, encoding=encoding)
if fill_value:
arr = np.empty((1,), dtype=dt, order="C")
arr[...] = fill_value
else:
arr = np.zeros([1,], dtype=dt, order="C")

return arr


async def write_chunk_hyperslab(
app, chunk_id, dset_json, slices, arr, bucket=None, client=None
):
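
For reference, a standalone sketch of what the new getFillValue helper produces. It is not part of the diff; the dataset JSON and the numpy stand-ins for createDataType/getNumpyValue are illustrative assumptions:

import numpy as np

def fill_value_sketch(dset_json):
    # mirrors getFillValue above: a 1-element array holding the fill value,
    # or zeros when no fillValue creation property is present
    dt = np.dtype(dset_json["type"])  # stand-in for createDataType
    cprops = dset_json.get("creationProperties", {})
    fill_value = cprops.get("fillValue")  # stand-in for getNumpyValue
    arr = np.zeros((1,), dtype=dt)
    if fill_value:
        arr[...] = fill_value
    return arr

print(fill_value_sketch({"type": "i4", "creationProperties": {"fillValue": 42}}))  # [42]
print(fill_value_sketch({"type": "i4"}))  # [0]
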
@@ -70,20 +99,45 @@ async def write_chunk_hyperslab(
         log.error(f"No type found in dset_json: {dset_json}")
         raise HTTPInternalServerError()
 
+    params = {}
     layout = getChunkLayout(dset_json)
     log.debug(f"getChunkCoverage({chunk_id}, {slices}, {layout})")
     chunk_sel = getChunkCoverage(chunk_id, slices, layout)
+    if chunk_sel is None:
+        log.warn(f"getChunkCoverage returned None for: {chunk_id}, {slices}, {layout}")
+        return
     log.debug(f"chunk_sel: {chunk_sel}")
     data_sel = getDataCoverage(chunk_id, slices, layout)
     log.debug(f"data_sel: {data_sel}")
     log.debug(f"arr.shape: {arr.shape}")
-    arr_chunk = arr[data_sel]
+
+    # broadcast data if arr has one element and no stride is set
+    do_broadcast = True
+    if np.prod(arr.shape) != 1:
+        do_broadcast = False
+    else:
+        for s in slices:
+            if s.step is None:
+                continue
+            if s.step > 1:
+                do_broadcast = False
+
+    if do_broadcast:
+        log.debug(f"broadcasting {arr}")
+        # just broadcast data value across selection
+        params["element_count"] = 1
+        arr_chunk = arr
+    else:
+        arr_chunk = arr[data_sel]
 
     req = getDataNodeUrl(app, chunk_id)
     req += "/chunks/" + chunk_id
 
-    log.debug(f"PUT chunk req: {req}")
     data = arrayToBytes(arr_chunk)
 
+    log.debug(f"PUT chunk req: {req}, {len(data)} bytes")
+
     # pass itemsize, type, dimensions, and selection as query params
-    params = {}
     select = getSliceQueryParam(chunk_sel)
     params["select"] = select
     if bucket:
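
The hunk above only broadcasts when the source array holds exactly one element and no slice in the selection is strided. A minimal sketch of that predicate in isolation (the arrays and slices are illustrative):

import numpy as np

def should_broadcast(arr, slices):
    # one element, and no slice with step > 1, per the check above
    if np.prod(arr.shape) != 1:
        return False
    return all(s.step is None or s.step <= 1 for s in slices)

one = np.array([3.14])
print(should_broadcast(one, (slice(0, 100), slice(0, 50))))  # True
print(should_broadcast(one, (slice(0, 100, 2),)))  # False: strided selection
print(should_broadcast(np.arange(4), (slice(0, 4),)))  # False: four elements
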
@@ -125,18 +179,6 @@ async def read_chunk_hyperslab(
         return
 
     msg = f"read_chunk_hyperslab, chunk_id: {chunk_id},"
-    """
-    msg += " slices: ["
-    for s in slices:
-        if isinstance(s, slice):
-            msg += f"{s},"
-        else:
-            if len(s) > 5:
-                # avoid large output lines
-                msg += f"[{s[0]}, {s[1]}, ..., {s[-2]}, {s[-1]}],"
-            else:
-                msg += f"{s},"
-    """
     msg += f" bucket: {bucket}"
     if query is not None:
         msg += f" query: {query} limit: {limit}"
@@ -309,16 +351,13 @@ async def read_chunk_hyperslab(
             # TBD: this needs to be fixed up for variable length dtypes
             nrows = len(array_data) // query_dtype.itemsize
             try:
-                chunk_arr = bytesToArray(
-                    array_data,
-                    query_dtype,
-                    [
-                        nrows,
-                    ],
-                )
+                chunk_arr = bytesToArray(array_data, query_dtype, (nrows,))
             except ValueError as ve:
                 log.warn(f"bytesToArray ValueError: {ve}")
                 raise HTTPBadRequest()
+            if chunk_arr.shape[0] != nrows:
+                log.error(f"expected chunk shape to be ({nrows},), but got {chunk_arr.shape[0]}")
+                raise HTTPInternalServerError()
             # save result to chunk_info
             # chunk results will be merged later
             chunk_info["query_rsp"] = chunk_arr
@@ -404,14 +443,8 @@ async def read_point_sel(
     np_arr_rsp = None
     dt = np_arr.dtype
 
-    fill_value = None
     # initialize to fill_value if specified
-    if "creationProperties" in dset_json:
-        cprops = dset_json["creationProperties"]
-        if "fillValue" in cprops:
-            fill_value_prop = cprops["fillValue"]
-            encoding = cprops.get("fillValue_encoding")
-            fill_value = getNumpyValue(fill_value_prop, dt=dt, encoding=encoding)
+    fill_value = getFillValue(dset_json)
 
     def defaultArray():
         # no data, return zero array
@@ -817,26 +850,23 @@ async def do_work(self, chunk_id, client=None):
                 )
             except HTTPServiceUnavailable as sue:
                 status_code = 503
-                log.warn(
-                    f"HTTPServiceUnavailable for {self._action}({chunk_id}): {sue}"
-                )
+                msg = f"HTTPServiceUnavailable for {self._action}({chunk_id}): {sue}"
+                log.warn(msg)
             except Exception as e:
                 status_code = 500
-                log.error(
-                    f"Unexpected exception {type(e)} for {self._action}({chunk_id}): {e} "
-                )
+                msg = f"Unexpected exception {type(e)} for {self._action}({chunk_id}): {e} "
+                log.error(msg)
+                tb = traceback.format_exc()
+                print("traceback:", tb)
             retry += 1
             if status_code == 200:
                 break
             if retry == max_retries:
-                log.error(
-                    f"ChunkCrawler action: {self._action} failed after: {retry} retries"
-                )
+                msg = f"ChunkCrawler action: {self._action} failed after: {retry} retries"
+                log.error(msg)
             else:
                 sleep_time = retry_exp * 2 ** retry + random.uniform(0, 0.1)
-                log.warn(
-                    f"ChunkCrawler.doWork - retry: {retry}, sleeping for {sleep_time:.2f}"
-                )
+                msg = f"ChunkCrawler.doWork - retry: {retry}, sleeping for {sleep_time:.2f}"
                 await asyncio.sleep(sleep_time)
 
         # save status_code
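
The reworked retry loop above backs off exponentially with jitter: sleep_time = retry_exp * 2 ** retry + random.uniform(0, 0.1). A quick sketch of the resulting schedule, with retry_exp = 0.1 as an assumed value (HSDS reads the real base from config):

import random

retry_exp = 0.1  # assumed for illustration
for retry in range(1, 6):
    sleep_time = retry_exp * 2 ** retry + random.uniform(0, 0.1)
    print(f"retry {retry}: sleep ~{sleep_time:.2f}s")  # ~0.2s, 0.4s, 0.8s, 1.6s, 3.2s
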
48 changes: 39 additions & 9 deletions hsds/chunk_dn.py
@@ -20,11 +20,11 @@
 from aiohttp.web import json_response, StreamResponse
 
 from .util.httpUtil import request_read, getContentType
-from .util.arrayUtil import bytesToArray, arrayToBytes
+from .util.arrayUtil import bytesToArray, arrayToBytes, getBroadcastShape
 from .util.idUtil import getS3Key, validateInPartition, isValidUuid
 from .util.storUtil import isStorObj, deleteStorObj
 from .util.hdf5dtype import createDataType
-from .util.dsetUtil import getSelectionList, getChunkLayout
+from .util.dsetUtil import getSelectionList, getChunkLayout, getShapeDims
 from .util.dsetUtil import getSelectionShape, getChunkInitializer
 from .util.chunkUtil import getChunkIndex, getDatasetId, chunkQuery
 from .util.chunkUtil import chunkWriteSelection, chunkReadSelection
@@ -48,6 +48,7 @@ async def PUT_Chunk(request):
     limit = 0
     bucket = None
     input_arr = None
+    element_count = None
 
     if "query" in params:
         query = params["query"]
@@ -77,6 +78,15 @@
         log.warn(msg)
         raise HTTPInternalServerError(reason=msg)
 
+    if "element_count" in params:
+        try:
+            element_count = int(params["element_count"])
+        except ValueError:
+            msg = "invalid element_count"
+            log.warn(msg)
+            raise HTTPBadRequest(reason=msg)
+        log.debug(f"element_count param: {element_count}")
+
     try:
         validateInPartition(app, chunk_id)
     except KeyError:
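
With element_count accepted as a query parameter, a service-node write can send a single element and let the data node fan it out. A hypothetical request shape (the ids, bucket, and select syntax shown are illustrative, not taken from this diff):

PUT /chunks/c-<uuid>_0_0?select=[0:100,0:50]&element_count=1&bucket=mybucket
Content-Length: 8  # one float64 value, broadcast across the 100x50 selection
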
@@ -130,14 +140,21 @@
     log.debug(f"PUT_Chunk slices: {selection}")
 
     mshape = getSelectionShape(selection)
-    num_elements = 1
-    for extent in mshape:
-        num_elements *= extent
+    if element_count is not None:
+        bcshape = getBroadcastShape(mshape, element_count)
+        log.debug(f"using bcshape: {bcshape}")
+    else:
+        bcshape = None
+
+    if bcshape:
+        num_elements = np.prod(bcshape)
+    else:
+        num_elements = np.prod(mshape)
 
     if getChunkInitializer(dset_json):
         chunk_init = True
     elif query:
-        chunk_init = False  # don't initalize new chunks on query update
+        chunk_init = False  # don't initialize new chunks on query update
     else:
         chunk_init = True
 
@@ -220,7 +237,6 @@
         return
     else:
         # regular chunk update
-
         # check that the content_length is what we expect
         if itemsize != "H5T_VARIABLE":
             log.debug(f"expect content_length: {num_elements*itemsize}")
@@ -229,7 +245,7 @@
         actual = request.content_length
         if itemsize != "H5T_VARIABLE":
             expected = num_elements * itemsize
-            if expected != actual:
+            if expected % actual != 0:
                 msg = f"Expected content_length of: {expected}, but got: {actual}"
                 log.error(msg)
                 raise HTTPBadRequest(reason=msg)
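
Note the relaxed check: a payload is now accepted whenever the expected byte count is a whole multiple of the actual one, which is exactly what a broadcast payload produces. A small sketch with illustrative numbers:

itemsize = 4
num_elements = 20  # selection holds 20 int32 values
expected = num_elements * itemsize  # 80 bytes for the full selection
for actual in (80, 4, 8, 12):
    verdict = "accepted" if expected % actual == 0 else "rejected"
    print(f"content_length {actual}: {verdict}")  # 80, 4, and 8 pass; 12 fails
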
@@ -243,7 +259,16 @@
             log.error(msg)
             raise HTTPInternalServerError()
 
-        input_arr = bytesToArray(input_bytes, dt, mshape)
+        input_arr = bytesToArray(input_bytes, dt, [num_elements, ])
+        if bcshape:
+            input_arr = input_arr.reshape(bcshape)
+            log.debug(f"broadcasting {bcshape} to mshape {mshape}")
+            arr_tmp = np.zeros(mshape, dtype=dt)
+            arr_tmp[...] = input_arr
+            input_arr = arr_tmp
+        else:
+            input_arr = input_arr.reshape(mshape)
 
         kwargs = {"chunk_arr": chunk_arr, "slices": selection, "data": input_arr}
         is_dirty = chunkWriteSelection(**kwargs)
 
@@ -375,6 +400,8 @@ async def GET_Chunk(request):
     dset_id = getDatasetId(chunk_id)
 
     dset_json = await get_metadata_obj(app, dset_id, bucket=bucket)
+    shape_dims = getShapeDims(dset_json["shape"])
+    log.debug(f"shape_dims: {shape_dims}")
     dims = getChunkLayout(dset_json)
     log.debug(f"GET_Chunk - got dims: {dims}")

@@ -385,6 +412,9 @@
         select = None  # get slices for entire datashape
     if select is not None:
         log.debug(f"GET_Chunk - using select string: {select}")
+    else:
+        log.debug("GET_Chunk - no selection string")
+
     try:
         selection = getSelectionList(select, dims)
     except ValueError as ve:
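
Putting the PUT_Chunk pieces together: the payload is parsed as a flat array of element_count items, reshaped to the broadcast shape, and assigned across the full selection. A self-contained sketch of that path (the shapes and the bcshape value are assumptions; getBroadcastShape's exact contract is not shown in this diff):

import numpy as np

dt = np.dtype("int32")
mshape = (4, 5)  # selection shape from getSelectionShape
payload = np.array([7], dtype=dt).tobytes()  # element_count == 1

input_arr = np.frombuffer(payload, dtype=dt)  # flat, shape (1,)
bcshape = (1, 1)  # assumed result of getBroadcastShape(mshape, 1)
arr_tmp = np.zeros(mshape, dtype=dt)
arr_tmp[...] = input_arr.reshape(bcshape)  # numpy broadcasts 7 into all 20 cells
print(arr_tmp.sum())  # 140
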