Skip to content

Commit

Permalink
Factor _write_array out of Cython (#2115)
Browse files Browse the repository at this point in the history
  • Loading branch information
kounelisagis authored Dec 6, 2024
1 parent c0e126a commit 76e62f6
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 283 deletions.
169 changes: 169 additions & 0 deletions tiledb/array.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import warnings
from typing import Dict, List

import numpy as np

import tiledb
import tiledb.cc as lt

from .ctx import Config, Ctx, default_ctx
from .datatypes import DataType
from .domain_indexer import DomainIndexer
from .enumeration import Enumeration
from .metadata import Metadata
Expand Down Expand Up @@ -769,6 +771,173 @@ def domain_index(self):
def dindex(self):
return self.domain_index

def _write_array(
self,
subarray,
coordinates: List,
buffer_names: List,
values: List,
labels: Dict,
nullmaps: Dict,
issparse: bool,
):
# used for buffer conversion (local import to avoid circularity)
from .main import array_to_buffer

isfortran = False
nattr = len(buffer_names)
nlabel = len(labels)

# Create arrays to hold buffer sizes
nbuffer = nattr + nlabel
if issparse:
nbuffer += self.schema.ndim
buffer_sizes = np.zeros((nbuffer,), dtype=np.uint64)

# Create lists for data and offset buffers
output_values = list()
output_offsets = list()

# Set data and offset buffers for attributes
for i in range(nattr):
attr = self.schema.attr(i)
# if dtype is ASCII, ensure all characters are valid
if attr.isascii:
try:
values[i] = np.asarray(values[i], dtype=np.bytes_)
except Exception as exc:
raise tiledb.TileDBError(
f'dtype of attr {attr.name} is "ascii" but attr_val contains invalid ASCII characters'
)

if attr.isvar:
try:
if attr.isnullable:
if np.issubdtype(attr.dtype, np.str_) or np.issubdtype(
attr.dtype, np.bytes_
):
attr_val = np.array(
["" if v is None else v for v in values[i]]
)
else:
attr_val = np.nan_to_num(values[i])
else:
attr_val = values[i]
buffer, offsets = array_to_buffer(attr_val, True, False)
except Exception as exc:
raise type(exc)(
f"Failed to convert buffer for attribute: '{attr.name}'"
) from exc
else:
buffer, offsets = values[i], None

buffer_sizes[i] = buffer.nbytes // (attr.dtype.itemsize or 1)
output_values.append(buffer)
output_offsets.append(offsets)

# Check value layouts
if len(values) and nattr > 1:
value = output_values[0]
isfortran = value.ndim > 1 and value.flags.f_contiguous
for value in values:
if value.ndim > 1 and value.flags.f_contiguous and not isfortran:
raise ValueError("mixed C and Fortran array layouts")

# Set data and offsets buffers for dimensions (sparse arrays only)
ibuffer = nattr
if issparse:
for dim_idx, coords in enumerate(coordinates):
dim = self.schema.domain.dim(dim_idx)
if dim.isvar:
buffer, offsets = array_to_buffer(coords, True, False)
else:
buffer, offsets = coords, None
buffer_sizes[ibuffer] = buffer.nbytes // (dim.dtype.itemsize or 1)
output_values.append(buffer)
output_offsets.append(offsets)

name = dim.name
buffer_names.append(name)

ibuffer = ibuffer + 1

for label_name, label_values in labels.items():
# Append buffer name
buffer_names.append(label_name)
# Get label data buffer and offsets buffer for the labels
dim_label = self.schema.dim_label(label_name)
if dim_label.isvar:
buffer, offsets = array_to_buffer(label_values, True, False)
else:
buffer, offsets = label_values, None
buffer_sizes[ibuffer] = buffer.nbytes // (dim_label.dtype.itemsize or 1)
# Append the buffers
output_values.append(buffer)
output_offsets.append(offsets)

ibuffer = ibuffer + 1

# Allocate the query
ctx = lt.Context(self.ctx)
q = lt.Query(ctx, self.array, lt.QueryType.WRITE)

# Set the layout
q.layout = (
lt.LayoutType.UNORDERED
if issparse
else (lt.LayoutType.COL_MAJOR if isfortran else lt.LayoutType.ROW_MAJOR)
)

# Create and set the subarray for the query (dense arrays only)
if not issparse:
q.set_subarray(subarray)

# Set buffers on the query
for i, buffer_name in enumerate(buffer_names):
# Set data buffer
ncells = DataType.from_numpy(output_values[i].dtype).ncells
q.set_data_buffer(
buffer_name,
output_values[i],
buffer_sizes[i] * ncells,
)

# Set offsets buffer
if output_offsets[i] is not None:
output_offsets[i] = output_offsets[i].astype(np.uint64)
q.set_offsets_buffer(
buffer_name, output_offsets[i], output_offsets[i].size
)

# Set validity buffer
if buffer_name in nullmaps:
nulmap = nullmaps[buffer_name]
q.set_validity_buffer(buffer_name, nulmap, nulmap.size)

q._submit()
q.finalize()

fragment_info = self.last_fragment_info
if fragment_info != False:
if not isinstance(fragment_info, dict):
raise ValueError(
f"Expected fragment_info to be a dict, got {type(fragment_info)}"
)
fragment_info.clear()

result = dict()
num_fragments = q.fragment_num()

if num_fragments < 1:
return result

for fragment_idx in range(0, num_fragments):
fragment_uri = q.fragment_uri(fragment_idx)
fragment_t1, fragment_t2 = q.fragment_timestamp_range(fragment_idx)
result[fragment_uri] = (fragment_t1, fragment_t2)

fragment_info.update(result)

def label_index(self, labels):
"""Retrieve data cells with multi-range, domain-inclusive indexing by label.
Returns the cross-product of the ranges.
Expand Down
15 changes: 9 additions & 6 deletions tiledb/cc/query.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ void init_query(py::module &m) {

.def("fragment_uri", &Query::fragment_uri)

.def("fragment_timestamp_range", &Query::fragment_timestamp_range)

.def("query_status", &Query::query_status)

.def("set_condition", &Query::set_condition)
Expand All @@ -71,13 +73,14 @@ void init_query(py::module &m) {
// uint64_t))&Query::set_data_buffer);

.def("set_data_buffer",
[](Query &q, std::string name, py::array a, uint32_t buff_size) {
q.set_data_buffer(name, const_cast<void *>(a.data()), buff_size);
[](Query &q, std::string name, py::array a, uint64_t nelements) {
QueryExperimental::set_data_buffer(
q, name, const_cast<void *>(a.data()), nelements);
})

.def("set_offsets_buffer",
[](Query &q, std::string name, py::array a, uint32_t buff_size) {
q.set_offsets_buffer(name, (uint64_t *)(a.data()), buff_size);
[](Query &q, std::string name, py::array a, uint64_t nelements) {
q.set_offsets_buffer(name, (uint64_t *)(a.data()), nelements);
})

.def("set_subarray",
Expand All @@ -86,8 +89,8 @@ void init_query(py::module &m) {
})

.def("set_validity_buffer",
[](Query &q, std::string name, py::array a, uint32_t buff_size) {
q.set_validity_buffer(name, (uint8_t *)(a.data()), buff_size);
[](Query &q, std::string name, py::array a, uint64_t nelements) {
q.set_validity_buffer(name, (uint8_t *)(a.data()), nelements);
})

.def("_submit", &Query::submit, py::call_guard<py::gil_scoped_release>())
Expand Down
6 changes: 1 addition & 5 deletions tiledb/dense_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,11 +575,7 @@ def _setitem_impl(self, selection, val, nullmaps: dict):
f"validity bitmap, got {type(val)}"
)

from .libtiledb import _write_array_wrapper

_write_array_wrapper(
self, subarray, [], attributes, values, labels, nullmaps, False
)
self._write_array(subarray, [], attributes, values, labels, nullmaps, False)

def __array__(self, dtype=None, **kw):
"""Implementation of numpy __array__ protocol (internal).
Expand Down
Loading

0 comments on commit 76e62f6

Please sign in to comment.