Skip to content

Commit

Permalink
Move _write_array to Python Array class
Browse files Browse the repository at this point in the history
  • Loading branch information
kounelisagis committed Dec 4, 2024
1 parent 46419a7 commit 213490e
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 277 deletions.
172 changes: 172 additions & 0 deletions tiledb/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,178 @@ def domain_index(self):
def dindex(self):
    """Return ``self.domain_index``; a shorter alias for that attribute."""
    return self.domain_index

def _write_array(
    tiledb_array,
    subarray,
    coordinates: list,
    buffer_names: list,
    values: list,
    labels: dict,
    nullmaps: dict,
    issparse: bool,
):
    """Submit a WRITE query for attribute, coordinate and label buffers.

    Builds one data buffer (plus an offsets buffer for variable-length
    fields) per attribute, per dimension (sparse writes only) and per
    dimension label, attaches them to a write query, submits and finalizes
    it, and then refreshes ``tiledb_array.last_fragment_info`` when that
    attribute is not ``False``.

    :param tiledb_array: array object being written; its ``schema``,
        ``ctx``, ``array`` and ``last_fragment_info`` attributes are read.
    :param subarray: passed to ``Query.set_subarray`` for dense writes;
        ignored when ``issparse`` is true.
    :param coordinates: one coordinate buffer per dimension; only used when
        ``issparse`` is true.
    :param buffer_names: attribute names, positionally matching ``values``
        and indexed against ``schema.attr(i)``. The caller's list is not
        modified.
    :param values: one value buffer per attribute. The caller's list is not
        modified.
    :param labels: mapping of dimension-label name to label values.
    :param nullmaps: mapping of buffer name to validity buffer.
    :param issparse: selects the unordered sparse path when true, otherwise
        a dense write with row- or column-major layout.
    :returns: ``None``; an empty ``dict`` only when fragment tracking is
        enabled and the query reports no written fragments.
    :raises tiledb.TileDBError: if an ASCII attribute's values cannot be
        converted to bytes.
    :raises ValueError: on mixed C/Fortran layouts, or when
        ``last_fragment_info`` is neither ``False`` nor a ``dict``.
    """
    # used for buffer conversion (local import to avoid circularity)
    from .main import array_to_buffer

    # Work on shallow copies: dimension/label names are appended to
    # buffer_names and ASCII values are replaced in values below, and the
    # caller's lists must not change under them.
    buffer_names = list(buffer_names)
    values = list(values)

    isfortran = False
    nattr = len(buffer_names)
    nlabel = len(labels)

    # One size slot per attribute, per label, and (sparse only) per dimension
    nbuffer = nattr + nlabel
    if issparse:
        nbuffer += tiledb_array.schema.ndim
    buffer_sizes = np.zeros((nbuffer,), dtype=np.uint64)

    # Data and offsets buffers, in the same order as buffer_names
    output_values = []
    output_offsets = []

    # Set data and offset buffers for attributes
    for i in range(nattr):
        attr = tiledb_array.schema.attr(i)

        # if dtype is ASCII, ensure all characters are valid
        if attr.isascii:
            try:
                values[i] = np.asarray(values[i], dtype=np.bytes_)
            except Exception as exc:
                raise tiledb.TileDBError(
                    f'dtype of attr {attr.name} is "ascii" but attr_val contains invalid ASCII characters'
                ) from exc

        if attr.isvar:
            try:
                if attr.isnullable:
                    if np.issubdtype(attr.dtype, np.str_) or np.issubdtype(
                        attr.dtype, np.bytes_
                    ):
                        # None cannot be packed into a string buffer; the
                        # validity buffer carries the null information.
                        attr_val = np.array(
                            ["" if v is None else v for v in values[i]]
                        )
                    else:
                        attr_val = np.nan_to_num(values[i])
                else:
                    attr_val = values[i]
                buffer, offsets = array_to_buffer(attr_val, True, False)
            except Exception as exc:
                # Re-raise as the same exception type, naming the attribute
                raise type(exc)(
                    f"Failed to convert buffer for attribute: '{attr.name}'"
                ) from exc
        else:
            buffer, offsets = values[i], None

        buffer_sizes[i] = buffer.nbytes
        output_values.append(buffer)
        output_offsets.append(offsets)

    # Check value layouts (only evaluated when more than one attribute is
    # written; a single attribute always keeps isfortran False)
    if len(values) and nattr > 1:
        value = output_values[0]
        isfortran = value.ndim > 1 and value.flags.f_contiguous
        for value in values:
            if value.ndim > 1 and value.flags.f_contiguous and not isfortran:
                raise ValueError("mixed C and Fortran array layouts")

    # Set data and offsets buffers for dimensions (sparse arrays only)
    ibuffer = nattr
    if issparse:
        for dim_idx, coords in enumerate(coordinates):
            dim = tiledb_array.schema.domain.dim(dim_idx)
            if dim.isvar:
                buffer, offsets = array_to_buffer(coords, True, False)
            else:
                buffer, offsets = coords, None
            buffer_sizes[ibuffer] = buffer.nbytes
            output_values.append(buffer)
            output_offsets.append(offsets)
            buffer_names.append(dim.name)

            ibuffer += 1

    # Set data and offsets buffers for dimension labels
    for label_name, label_values in labels.items():
        buffer_names.append(label_name)
        dim_label = tiledb_array.schema.dim_label(label_name)
        if dim_label.isvar:
            buffer, offsets = array_to_buffer(label_values, True, False)
        else:
            buffer, offsets = label_values, None
        buffer_sizes[ibuffer] = buffer.nbytes
        output_values.append(buffer)
        output_offsets.append(offsets)

        ibuffer += 1

    # Allocate the query
    ctx = lt.Context(tiledb_array.ctx)
    q = lt.Query(ctx, tiledb_array.array, lt.QueryType.WRITE)

    # Set the layout
    if issparse:
        q.layout = lt.LayoutType.UNORDERED
    elif isfortran:
        q.layout = lt.LayoutType.COL_MAJOR
    else:
        q.layout = lt.LayoutType.ROW_MAJOR

    # Create and set the subarray for the query (dense arrays only)
    if not issparse:
        q.set_subarray(subarray)

    # Set buffers on the query
    # NOTE(review): offsets and validity buffers are passed through without
    # a dtype cast; presumably they are already uint64 / uint8 - confirm.
    for i, buffer_name in enumerate(buffer_names):
        # Set data buffer
        q.set_data_buffer(buffer_name, output_values[i], buffer_sizes[i], ctx)

        # Set offsets buffer (element count, then the context)
        offsets = output_offsets[i]
        if offsets is not None:
            q.set_offsets_buffer(buffer_name, offsets, len(offsets), ctx)

        # Set validity buffer
        if buffer_name in nullmaps:
            nulmap = nullmaps[buffer_name]
            q.set_validity_buffer(buffer_name, nulmap, len(nulmap), ctx)

    q._submit()
    q.finalize()

    # Record the URIs and timestamp ranges of the fragments just written,
    # unless fragment tracking is disabled (last_fragment_info is False).
    fragment_info = tiledb_array.last_fragment_info
    if fragment_info is not False:
        if not isinstance(fragment_info, dict):
            raise ValueError(
                f"Expected fragment_info to be a dict, got {type(fragment_info)}"
            )
        fragment_info.clear()

        result = {}
        num_fragments = q.fragment_num()

        if num_fragments < 1:
            return result

        for fragment_idx in range(num_fragments):
            fragment_uri = q.fragment_uri(fragment_idx)
            fragment_t1, fragment_t2 = q.fragment_timestamp_range(fragment_idx)
            result[fragment_uri] = (fragment_t1, fragment_t2)

        fragment_info.update(result)

def label_index(self, labels):
"""Retrieve data cells with multi-range, domain-inclusive indexing by label.
Returns the cross-product of the ranges.
Expand Down
6 changes: 1 addition & 5 deletions tiledb/dense_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,11 +575,7 @@ def _setitem_impl(self, selection, val, nullmaps: dict):
f"validity bitmap, got {type(val)}"
)

from .libtiledb import _write_array_wrapper

_write_array_wrapper(
self, subarray, [], attributes, values, labels, nullmaps, False
)
self._write_array(subarray, [], attributes, values, labels, nullmaps, False)

def __array__(self, dtype=None, **kw):
"""Implementation of numpy __array__ protocol (internal).
Expand Down
Loading

0 comments on commit 213490e

Please sign in to comment.