From 213490e9b88dcfdf0618cae99e3d71009370e239 Mon Sep 17 00:00:00 2001
From: Agisilaos Kounelis
Date: Wed, 4 Dec 2024 15:15:28 +0200
Subject: [PATCH] Move _write_array to Python Array class

---
 tiledb/array.py        | 192 ++++++++++++++++++++++++++
 tiledb/dense_array.py  |   6 +-
 tiledb/libtiledb.pyx   | 266 -----------------------------------------
 tiledb/sparse_array.py |   8 +-
 4 files changed, 195 insertions(+), 277 deletions(-)

diff --git a/tiledb/array.py b/tiledb/array.py
index 02fbf08aaf..452dde67a7 100644
--- a/tiledb/array.py
+++ b/tiledb/array.py
@@ -769,6 +769,198 @@ def domain_index(self):
     def dindex(self):
         return self.domain_index
 
+    def _write_array(
+        tiledb_array,
+        subarray,
+        coordinates: list,
+        buffer_names: list,
+        values: list,
+        labels: dict,
+        nullmaps: dict,
+        issparse: bool,
+    ):
+        """Write attribute, dimension and label buffers to the array.
+
+        Builds a single write query from the given buffers, submits and
+        finalizes it, and records the written fragments in
+        ``last_fragment_info`` unless that attribute is ``False``.
+
+        :param tiledb_array: the array being written (this method's ``self``)
+        :param subarray: subarray set on the query; used for dense writes only
+        :param coordinates: per-dimension coordinate buffers (sparse writes)
+        :param buffer_names: attribute names; dimension and label names are
+            appended to this list in place
+        :param values: attribute data, indexed like the schema attributes
+        :param labels: mapping of dimension label name to label data
+        :param nullmaps: mapping of buffer name to validity buffer
+        :param issparse: True for a sparse (unordered layout) write
+        :raises tiledb.TileDBError: if data for an ASCII attribute cannot be
+            converted to bytes
+        :raises ValueError: on mixed C and Fortran layouts, or if
+            ``last_fragment_info`` is neither ``False`` nor a dict
+        """
+        # used for buffer conversion (local import to avoid circularity)
+        from .main import array_to_buffer
+
+        isfortran = False
+        nattr = len(buffer_names)
+        nlabel = len(labels)
+
+        # Create arrays to hold buffer sizes
+        nbuffer = nattr + nlabel
+        if issparse:
+            nbuffer += tiledb_array.schema.ndim
+        buffer_sizes = np.zeros((nbuffer,), dtype=np.uint64)
+        buffer_offsets_sizes = np.zeros((nbuffer,), dtype=np.uint64)
+
+        # Create lists for data and offset buffers
+        output_values = list()
+        output_offsets = list()
+
+        # Set data and offset buffers for attributes
+        for i in range(nattr):
+            # if dtype is ASCII, ensure all characters are valid
+            if tiledb_array.schema.attr(i).isascii:
+                try:
+                    values[i] = np.asarray(values[i], dtype=np.bytes_)
+                except Exception as exc:
+                    raise tiledb.TileDBError(
+                        f'dtype of attr {tiledb_array.schema.attr(i).name} is "ascii" but attr_val contains invalid ASCII characters'
+                    ) from exc
+
+            attr = tiledb_array.schema.attr(i)
+
+            if attr.isvar:
+                try:
+                    if attr.isnullable:
+                        if np.issubdtype(attr.dtype, np.str_) or np.issubdtype(
+                            attr.dtype, np.bytes_
+                        ):
+                            attr_val = np.array(
+                                ["" if v is None else v for v in values[i]]
+                            )
+                        else:
+                            attr_val = np.nan_to_num(values[i])
+                    else:
+                        attr_val = values[i]
+                    buffer, offsets = array_to_buffer(attr_val, True, False)
+                except Exception as exc:
+                    raise type(exc)(
+                        f"Failed to convert buffer for attribute: '{attr.name}'"
+                    ) from exc
+                buffer_offsets_sizes[i] = offsets.nbytes
+            else:
+                buffer, offsets = values[i], None
+
+            buffer_sizes[i] = buffer.nbytes
+            output_values.append(buffer)
+            output_offsets.append(offsets)
+
+        # Check value layouts
+        if len(values) and nattr > 1:
+            value = output_values[0]
+            isfortran = value.ndim > 1 and value.flags.f_contiguous
+            for value in values:
+                if value.ndim > 1 and value.flags.f_contiguous and not isfortran:
+                    raise ValueError("mixed C and Fortran array layouts")
+
+        # Set data and offsets buffers for dimensions (sparse arrays only)
+        ibuffer = nattr
+        if issparse:
+            for dim_idx, coords in enumerate(coordinates):
+                if tiledb_array.schema.domain.dim(dim_idx).isvar:
+                    buffer, offsets = array_to_buffer(coords, True, False)
+                    buffer_sizes[ibuffer] = buffer.nbytes
+                    buffer_offsets_sizes[ibuffer] = offsets.nbytes
+                else:
+                    buffer, offsets = coords, None
+                    buffer_sizes[ibuffer] = buffer.nbytes
+                output_values.append(buffer)
+                output_offsets.append(offsets)
+
+                name = tiledb_array.schema.domain.dim(dim_idx).name
+                buffer_names.append(name)
+
+                ibuffer = ibuffer + 1
+
+        for label_name, label_values in labels.items():
+            # Append buffer name
+            buffer_names.append(label_name)
+            # Get label data buffer and offsets buffer for the labels
+            dim_label = tiledb_array.schema.dim_label(label_name)
+            if dim_label.isvar:
+                buffer, offsets = array_to_buffer(label_values, True, False)
+                buffer_sizes[ibuffer] = buffer.nbytes
+                buffer_offsets_sizes[ibuffer] = offsets.nbytes
+            else:
+                buffer, offsets = label_values, None
+                buffer_sizes[ibuffer] = buffer.nbytes
+            # Append the buffers
+            output_values.append(buffer)
+            output_offsets.append(offsets)
+
+            ibuffer = ibuffer + 1
+
+        # Allocate the query
+        ctx = lt.Context(tiledb_array.ctx)
+        q = lt.Query(ctx, tiledb_array.array, lt.QueryType.WRITE)
+
+        # Set the layout
+        layout = (
+            lt.LayoutType.UNORDERED
+            if issparse
+            else (lt.LayoutType.COL_MAJOR if isfortran else lt.LayoutType.ROW_MAJOR)
+        )
+        q.layout = layout
+
+        # Create and set the subarray for the query (dense arrays only)
+        if not issparse:
+            q.set_subarray(subarray)
+
+        # Set buffers on the query
+        for i, buffer_name in enumerate(buffer_names):
+            # Set data buffer
+            q.set_data_buffer(
+                buffer_name, output_values[i], buffer_sizes[i], ctx
+            )
+
+            # Set offsets buffer
+            if output_offsets[i] is not None:
+                # output_offsets[i] = output_offsets[i].astype(np.uint64)
+                q.set_offsets_buffer(
+                    buffer_name, output_offsets[i], len(output_offsets[i]), ctx
+                )
+
+            # Set validity buffer
+            if buffer_name in nullmaps:
+                nulmap = nullmaps[buffer_name]
+                # nulmap = nulmap.astype(np.uint8)
+                q.set_validity_buffer(buffer_name, nulmap, len(nulmap), ctx)
+
+        q._submit()
+        q.finalize()
+
+        fragment_info = tiledb_array.last_fragment_info
+        if fragment_info is not False:
+            if not isinstance(fragment_info, dict):
+                raise ValueError(
+                    f"Expected fragment_info to be a dict, got {type(fragment_info)}"
+                )
+            fragment_info.clear()
+
+            result = dict()
+            num_fragments = q.fragment_num()
+
+            if num_fragments < 1:
+                return result
+
+            for fragment_idx in range(0, num_fragments):
+                fragment_uri = q.fragment_uri(fragment_idx)
+                fragment_t1, fragment_t2 = q.fragment_timestamp_range(fragment_idx)
+                result[fragment_uri] = (fragment_t1, fragment_t2)
+
+            fragment_info.update(result)
+
     def label_index(self, labels):
         """Retrieve data cells with multi-range, domain-inclusive indexing by label.
         Returns the cross-product of the ranges.
diff --git a/tiledb/dense_array.py b/tiledb/dense_array.py index 8423e9457e..b1e12b1a95 100644 --- a/tiledb/dense_array.py +++ b/tiledb/dense_array.py @@ -575,11 +575,7 @@ def _setitem_impl(self, selection, val, nullmaps: dict): f"validity bitmap, got {type(val)}" ) - from .libtiledb import _write_array_wrapper - - _write_array_wrapper( - self, subarray, [], attributes, values, labels, nullmaps, False - ) + self._write_array(subarray, [], attributes, values, labels, nullmaps, False) def __array__(self, dtype=None, **kw): """Implementation of numpy __array__ protocol (internal). diff --git a/tiledb/libtiledb.pyx b/tiledb/libtiledb.pyx index de540896d0..687a5dc668 100644 --- a/tiledb/libtiledb.pyx +++ b/tiledb/libtiledb.pyx @@ -45,272 +45,6 @@ def version(): tiledb_version(&major, &minor, &rev) return major, minor, rev - -# note: this function is cdef, so it must return a python object in order to -# properly forward python exceptions raised within the function. See: -# https://cython.readthedocs.io/en/latest/src/userguide/language_basics.html#error-return-values -cdef dict get_query_fragment_info(tiledb_ctx_t* ctx_ptr, - tiledb_query_t* query_ptr): - - cdef int rc = TILEDB_OK - cdef uint32_t num_fragments - cdef Py_ssize_t fragment_idx - cdef const char* fragment_uri_ptr - cdef unicode fragment_uri - cdef uint64_t fragment_t1, fragment_t2 - cdef dict result = dict() - - rc = tiledb_query_get_fragment_num(ctx_ptr, query_ptr, &num_fragments) - if rc != TILEDB_OK: - _raise_ctx_err(ctx_ptr, rc) - - if (num_fragments < 1): - return result - - for fragment_idx in range(0, num_fragments): - - rc = tiledb_query_get_fragment_uri(ctx_ptr, query_ptr, fragment_idx, &fragment_uri_ptr) - if rc != TILEDB_OK: - _raise_ctx_err(ctx_ptr, rc) - - rc = tiledb_query_get_fragment_timestamp_range( - ctx_ptr, query_ptr, fragment_idx, &fragment_t1, &fragment_t2) - if rc != TILEDB_OK: - _raise_ctx_err(ctx_ptr, rc) - - fragment_uri = fragment_uri_ptr.decode('UTF-8') - result[fragment_uri] = 
(fragment_t1, fragment_t2) - - return result - -def _write_array_wrapper( - object tiledb_array, - object subarray, - list coordinates, - list buffer_names, - list values, - dict labels, - dict nullmaps, - bint issparse, - ): - - cdef tiledb_ctx_t* ctx_ptr = safe_ctx_ptr(tiledb_array.ctx) - cdef tiledb_array_t* array_ptr = PyCapsule_GetPointer(tiledb_array.array.__capsule__(), "array") - cdef dict fragment_info = tiledb_array.last_fragment_info - _write_array(ctx_ptr, array_ptr, tiledb_array, subarray, coordinates, buffer_names, values, labels, nullmaps, fragment_info, issparse) - -cdef _write_array( - tiledb_ctx_t* ctx_ptr, - tiledb_array_t* array_ptr, - object tiledb_array, - object subarray, - list coordinates, - list buffer_names, - list values, - dict labels, - dict nullmaps, - dict fragment_info, - bint issparse, - ): - - # used for buffer conversion (local import to avoid circularity) - from .main import array_to_buffer - - cdef bint isfortran = False - cdef Py_ssize_t nattr = len(buffer_names) - cdef Py_ssize_t nlabel = len(labels) - - # Create arrays to hold buffer sizes - cdef Py_ssize_t nbuffer = nattr + nlabel - if issparse: - nbuffer += tiledb_array.schema.ndim - cdef np.ndarray buffer_sizes = np.zeros((nbuffer,), dtype=np.uint64) - cdef np.ndarray buffer_offsets_sizes = np.zeros((nbuffer,), dtype=np.uint64) - cdef np.ndarray nullmaps_sizes = np.zeros((nbuffer,), dtype=np.uint64) - - # Create lists for data and offset buffers - output_values = list() - output_offsets = list() - - # Set data and offset buffers for attributes - for i in range(nattr): - # if dtype is ASCII, ensure all characters are valid - if tiledb_array.schema.attr(i).isascii: - try: - values[i] = np.asarray(values[i], dtype=np.bytes_) - except Exception as exc: - raise TileDBError(f'dtype of attr {tiledb_array.schema.attr(i).name} is "ascii" but attr_val contains invalid ASCII characters') - - attr = tiledb_array.schema.attr(i) - - if attr.isvar: - try: - if attr.isnullable: - 
if(np.issubdtype(attr.dtype, np.str_) - or np.issubdtype(attr.dtype, np.bytes_)): - attr_val = np.array(["" if v is None else v for v in values[i]]) - else: - attr_val = np.nan_to_num(values[i]) - else: - attr_val = values[i] - buffer, offsets = array_to_buffer(attr_val, True, False) - except Exception as exc: - raise type(exc)(f"Failed to convert buffer for attribute: '{attr.name}'") from exc - buffer_offsets_sizes[i] = offsets.nbytes - else: - buffer, offsets = values[i], None - - buffer_sizes[i] = buffer.nbytes - output_values.append(buffer) - output_offsets.append(offsets) - - # Check value layouts - if len(values) and nattr > 1: - value = output_values[0] - isfortran = value.ndim > 1 and value.flags.f_contiguous - for value in values: - if value.ndim > 1 and value.flags.f_contiguous and not isfortran: - raise ValueError("mixed C and Fortran array layouts") - - # Set data and offsets buffers for dimensions (sparse arrays only) - ibuffer = nattr - if issparse: - for dim_idx, coords in enumerate(coordinates): - if tiledb_array.schema.domain.dim(dim_idx).isvar: - buffer, offsets = array_to_buffer(coords, True, False) - buffer_sizes[ibuffer] = buffer.nbytes - buffer_offsets_sizes[ibuffer] = offsets.nbytes - else: - buffer, offsets = coords, None - buffer_sizes[ibuffer] = buffer.nbytes - output_values.append(buffer) - output_offsets.append(offsets) - - name = tiledb_array.schema.domain.dim(dim_idx).name - buffer_names.append(name) - - ibuffer = ibuffer + 1 - - for label_name, label_values in labels.items(): - # Append buffer name - buffer_names.append(label_name) - # Get label data buffer and offsets buffer for the labels - dim_label = tiledb_array.schema.dim_label(label_name) - if dim_label.isvar: - buffer, offsets = array_to_buffer(label_values, True, False) - buffer_sizes[ibuffer] = buffer.nbytes - buffer_offsets_sizes[ibuffer] = offsets.nbytes - else: - buffer, offsets = label_values, None - buffer_sizes[ibuffer] = buffer.nbytes - # Append the buffers - 
output_values.append(buffer) - output_offsets.append(offsets) - - ibuffer = ibuffer + 1 - - - # Allocate the query - cdef int rc = TILEDB_OK - cdef tiledb_query_t* query_ptr = NULL - rc = tiledb_query_alloc(ctx_ptr, array_ptr, TILEDB_WRITE, &query_ptr) - if rc != TILEDB_OK: - _raise_ctx_err(ctx_ptr, rc) - - # Set layout - cdef tiledb_layout_t layout = ( - TILEDB_UNORDERED - if issparse - else (TILEDB_COL_MAJOR if isfortran else TILEDB_ROW_MAJOR) - ) - rc = tiledb_query_set_layout(ctx_ptr, query_ptr, layout) - if rc != TILEDB_OK: - tiledb_query_free(&query_ptr) - _raise_ctx_err(ctx_ptr, rc) - - # Create and set the subarray for the query (dense arrays only) - cdef np.ndarray s_start - cdef np.ndarray s_end - cdef np.dtype dim_dtype = None - cdef void* s_start_ptr = NULL - cdef void* s_end_ptr = NULL - cdef tiledb_subarray_t* subarray_ptr = NULL - if not issparse: - subarray_ptr = PyCapsule_GetPointer( - subarray.__capsule__(), "subarray") - # Set the subarray on the query - rc = tiledb_query_set_subarray_t(ctx_ptr, query_ptr, subarray_ptr) - if rc != TILEDB_OK: - tiledb_query_free(&query_ptr) - _raise_ctx_err(ctx_ptr, rc) - - # Set buffers on the query - cdef bytes bname - cdef void* buffer_ptr = NULL - cdef uint64_t* offsets_buffer_ptr = NULL - cdef uint8_t* nulmap_buffer_ptr = NULL - cdef uint64_t* buffer_sizes_ptr = np.PyArray_DATA(buffer_sizes) - cdef uint64_t* offsets_buffer_sizes_ptr = np.PyArray_DATA(buffer_offsets_sizes) - cdef uint64_t* nullmaps_sizes_ptr = np.PyArray_DATA(nullmaps_sizes) - try: - for i, buffer_name in enumerate(buffer_names): - # Get utf-8 version of the name for C-API calls - bname = buffer_name.encode('UTF-8') - - # Set data buffer - buffer_ptr = np.PyArray_DATA(output_values[i]) - rc = tiledb_query_set_data_buffer( - ctx_ptr, query_ptr, bname, buffer_ptr, &(buffer_sizes_ptr[i])) - if rc != TILEDB_OK: - _raise_ctx_err(ctx_ptr, rc) - - # Set offsets buffer - if output_offsets[i] is not None: - offsets_buffer_ptr = 
np.PyArray_DATA(output_offsets[i]) - rc = tiledb_query_set_offsets_buffer( - ctx_ptr, - query_ptr, - bname, - offsets_buffer_ptr, - &(offsets_buffer_sizes_ptr[i]) - ) - if rc != TILEDB_OK: - _raise_ctx_err(ctx_ptr, rc) - - # Set validity buffer - if buffer_name in nullmaps: - # NOTE: validity map is owned *by the caller* - nulmap = nullmaps[buffer_name] - nullmaps_sizes[i] = len(nulmap) - nulmap_buffer_ptr = np.PyArray_DATA(nulmap) - rc = tiledb_query_set_validity_buffer( - ctx_ptr, - query_ptr, - bname, - nulmap_buffer_ptr, - &(nullmaps_sizes_ptr[i]) - ) - if rc != TILEDB_OK: - _raise_ctx_err(ctx_ptr, rc) - - with nogil: - rc = tiledb_query_submit(ctx_ptr, query_ptr) - if rc != TILEDB_OK: - _raise_ctx_err(ctx_ptr, rc) - - rc = tiledb_query_finalize(ctx_ptr, query_ptr) - if rc != TILEDB_OK: - _raise_ctx_err(ctx_ptr, rc) - - if fragment_info is not False: - assert(type(fragment_info) is dict) - fragment_info.clear() - fragment_info.update(get_query_fragment_info(ctx_ptr, query_ptr)) - - finally: - tiledb_query_free(&query_ptr) - return - cdef _raise_tiledb_error(tiledb_error_t* err_ptr): cdef const char* err_msg_ptr = NULL ret = tiledb_error_message(err_ptr, &err_msg_ptr) diff --git a/tiledb/sparse_array.py b/tiledb/sparse_array.py index 47bbd5a909..978f6affa1 100644 --- a/tiledb/sparse_array.py +++ b/tiledb/sparse_array.py @@ -84,11 +84,8 @@ def _setitem_impl_sparse(self, selection, val, nullmaps: dict): index_domain_coords(self.schema.domain, idx, not set_dims_only) ) - from .libtiledb import _write_array_wrapper - if set_dims_only: - _write_array_wrapper( - self, + self._write_array( None, sparse_coords, sparse_attributes, @@ -191,8 +188,7 @@ def _setitem_impl_sparse(self, selection, val, nullmaps: dict): "Sparse write input data count does not match number of attributes" ) - _write_array_wrapper( - self, + self._write_array( None, sparse_coords, sparse_attributes,