Skip to content

Commit

Permalink
Move fragment list consolidation API to pybind
Browse files Browse the repository at this point in the history
  • Loading branch information
kounelisagis committed Apr 29, 2024
1 parent eea0ffe commit c95de36
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 229 deletions.
2 changes: 1 addition & 1 deletion tiledb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
from .highlevel import (
array_exists,
array_fragments,
consolidate,
empty_like,
from_numpy,
open,
Expand All @@ -90,7 +91,6 @@
)
from .libtiledb import (
Array,
consolidate,
ls,
move,
object_type,
Expand Down
83 changes: 75 additions & 8 deletions tiledb/cc/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,49 @@ namespace libtiledbcpp {
using namespace tiledb;
namespace py = pybind11;

void _consolidate_timestamp(const std::string &uri, const std::optional<std::string> &key, tiledb::Config *config, const Context &ctx,
const std::optional<std::tuple<uint64_t, uint64_t>> &timestamp = std::nullopt) {
if (timestamp.has_value()) {
if (std::get<0>(*timestamp) == 0 || std::get<1>(*timestamp) == 0) {
throw std::invalid_argument("'timestamp' argument expects tuple(start: int, end: int) with non-zero values");
}

config->set("sm.consolidation.timestamp_start", std::to_string(std::get<0>(*timestamp)));
config->set("sm.consolidation.timestamp_end", std::to_string(std::get<1>(*timestamp)));
}

tiledb_encryption_type_t key_type = TILEDB_NO_ENCRYPTION;
std::string key_str;

if (key.has_value()) {
key_str = key.value();
if (!key_str.empty()) {
key_type = TILEDB_AES_256_GCM;
}
}

ctx.handle_error(tiledb_array_consolidate_with_key(
ctx.ptr().get(), uri.c_str(), key_type, key_str.c_str(), key_str.size(), config->ptr().get()));
}

// Consolidates only the listed fragments of the array at `uri`.
//
// uri:           URI of the array to consolidate.
// key:           optional encryption key; when present and non-empty it is
//                written into `config` together with the encryption type.
// config:        consolidation config. Must not be null. It is modified when
//                a key is supplied.
// ctx:           context used for the C API call and for error reporting.
// fragment_uris: fragment names/URIs handed to the C API unchanged.
//
// Throws std::invalid_argument for a null config. Failures of the C API call
// are reported through ctx.handle_error().
void _consolidate_uris(const std::string &uri, const std::optional<std::string> &key, Config *config, const Context &ctx,
                       const std::vector<std::string> &fragment_uris) {
  // A Python `None` reaches a pointer parameter as nullptr; fail with an
  // exception instead of dereferencing it.
  if (config == nullptr) {
    throw std::invalid_argument("'config' argument must not be None");
  }

  // The C API wants an array of C strings. The pointers stay valid because
  // `fragment_uris` outlives the call below.
  std::vector<const char *> c_strings;
  c_strings.reserve(fragment_uris.size());
  for (const auto &fragment : fragment_uris) {
    c_strings.push_back(fragment.c_str());
  }

  if (key.has_value() && !key->empty()) {
    // Select the cipher explicitly so the key is not paired with the default
    // "NO_ENCRYPTION" type; this mirrors _consolidate_timestamp, which uses
    // TILEDB_AES_256_GCM whenever a non-empty key is supplied.
    config->set("sm.encryption_type", "AES_256_GCM");
    config->set("sm.encryption_key", *key);
  }

  ctx.handle_error(tiledb_array_consolidate_fragments(
      ctx.ptr().get(), uri.c_str(), c_strings.data(),
      fragment_uris.size(), config->ptr().get()));
}


void init_array(py::module &m) {
py::class_<tiledb::Array>(m, "Array")
//.def(py::init<py::object, py::object, py::iterable, py::object,
Expand Down Expand Up @@ -55,14 +98,38 @@ void init_array(py::module &m) {
.def("config", &Array::config)
.def("close", &Array::close)
.def("consolidate",
py::overload_cast<const Context &, const std::string &,
Config *const>(&Array::consolidate),
py::call_guard<py::gil_scoped_release>())
.def("consolidate",
py::overload_cast<const Context &, const std::string &,
tiledb_encryption_type_t, const std::string &,
Config *const>(&Array::consolidate),
py::call_guard<py::gil_scoped_release>())
[](Array &self, Config *config, const std::string &key,
const std::vector<std::string> &fragment_uris,
std::tuple<uint64_t, uint64_t> timestamp) {
if (self.query_type() == TILEDB_READ) {
throw TileDBError(
"cannot consolidate array opened in readonly mode (mode='r')");
}

tiledb::Context ctx(*config);

if (fragment_uris.size() > 0) {
if (timestamp != std::make_tuple<uint64_t, uint64_t>(0, 0)) {
PyErr_WarnEx(PyExc_DeprecationWarning,
"The `timestamp` argument is deprecated; pass a list of "
"fragment URIs to consolidate with `fragment_uris`", 1);
}
_consolidate_uris(self.uri(), key, config, ctx, fragment_uris);
} else {
_consolidate_timestamp(self.uri(), key, config, ctx, timestamp);
}
})
.def_static("_consolidate_timestamp", &_consolidate_timestamp)
.def_static("_consolidate_uris", &_consolidate_uris)
// .def("consolidate",
// py::overload_cast<const Context &, const std::string &,
// Config *const>(&Array::consolidate),
// py::call_guard<py::gil_scoped_release>())
// .def("consolidate",
// py::overload_cast<const Context &, const std::string &,
// tiledb_encryption_type_t, const std::string &,
// Config *const>(&Array::consolidate),
// py::call_guard<py::gil_scoped_release>())
//(void (Array::*)(const Context&, const std::string&,
// tiledb_encryption_type_t, const std::string&,
// Config* const)&Array::consolidate)&Array::consolidate)
Expand Down
66 changes: 66 additions & 0 deletions tiledb/highlevel.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import warnings

import numpy as np

import tiledb
import tiledb.cc as lt

from .dataframe_ import create_dim

Expand Down Expand Up @@ -195,6 +198,69 @@ def array_fragments(uri, include_mbrs=False, ctx=None):
return tiledb.FragmentInfoList(uri, include_mbrs, ctx)


def consolidate(
    uri, key=None, config=None, ctx=None, fragment_uris=None, timestamp=None
):
    """Consolidates TileDB array fragments for improved read performance

    :param str uri: URI to the TileDB Array
    :param str key: (default None) Key to decrypt array if the array is encrypted
    :param tiledb.Config config: The TileDB Config with consolidation parameters set.
        When None, a Config built from the context's config is used. The
        timestamp range / encryption key may be written into this object.
    :param tiledb.Ctx ctx: (default None) The TileDB Context
    :param fragment_uris: (default None) Consolidate the array using a list of fragment file names
    :param timestamp: (default None) If not None, consolidate the array using the given
        tuple(int, int) UNIX seconds range (inclusive). Both values must be non-zero.
        This argument will be ignored if `fragment_uris` is passed.
    :rtype: str or bytes
    :return: path (URI) to the consolidated TileDB Array
    :raises TypeError: if an argument cannot be converted for the native call
    :raises ValueError: if `timestamp` contains a zero value
    :raises: :py:exc:`tiledb.TileDBError`

    Rather than passing the timestamp into this function, it may be set with
    the config parameters `"sm.consolidation.timestamp_start"` and
    `"sm.consolidation.timestamp_end"` which take in a time in UNIX seconds.
    If both are set then this function's `timestamp` argument will be used.

    **Example:**

    >>> import tiledb, tempfile, numpy as np, os
    >>> path = tempfile.mkdtemp()

    >>> with tiledb.from_numpy(path, np.zeros(4), timestamp=1) as A:
    ...     pass
    >>> with tiledb.open(path, 'w', timestamp=2) as A:
    ...     A[:] = np.ones(4, dtype=np.int64)
    >>> with tiledb.open(path, 'w', timestamp=3) as A:
    ...     A[:] = np.ones(4, dtype=np.int64)
    >>> with tiledb.open(path, 'w', timestamp=4) as A:
    ...     A[:] = np.ones(4, dtype=np.int64)
    >>> len(tiledb.array_fragments(path))
    4

    >>> fragment_names = [
    ...     os.path.basename(f) for f in tiledb.array_fragments(path).uri
    ... ]
    >>> array_uri = tiledb.consolidate(
    ...    path, fragment_uris=[fragment_names[1], fragment_names[3]]
    ... )
    >>> len(tiledb.array_fragments(path))
    3

    """
    ctx = _get_ctx(ctx)
    if config is None:
        config = tiledb.Config(ctx.config())

    if fragment_uris is not None:
        if timestamp is not None:
            warnings.warn(
                "The `timestamp` argument will be ignored and only fragments "
                "passed to `fragment_uris` will be consolidated",
                DeprecationWarning,
                # Attribute the warning to the caller, not to this module.
                stacklevel=2,
            )
        lt.Array._consolidate_uris(uri, key, config, ctx, fragment_uris)
    else:
        lt.Array._consolidate_timestamp(uri, key, config, ctx, timestamp)

    # The native helpers return nothing; hand back the URI so the documented
    # return value (and the `array_uri = ...` usage in the example) holds.
    return uri


def schema_like(*args, shape=None, dtype=None, ctx=None, **kwargs):
"""
Return an ArraySchema corresponding to a NumPy-like object or
Expand Down
48 changes: 0 additions & 48 deletions tiledb/libmetadata.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -385,54 +385,6 @@ cdef class Metadata:

return bool(has_key)

def consolidate(self):
"""
Consolidate array metadata. Array must be closed.

Passes the owning array's URI, the config of its context and, when the
array has one, its encryption key to
``tiledb_array_consolidate_with_key``. The call runs with the GIL
released.

NOTE(review): the call below is the general array consolidation entry
point; that only metadata gets consolidated presumably depends on the
consolidation mode set in the context's config -- confirm.

:return: None
:raises: whatever ``_raise_ctx_err`` raises when the call does not
    return ``TILEDB_OK``
"""
# TODO: ensure that the array is not x-locked?
ctx = (<Array?> self.array).ctx
config = ctx.config()
cdef:
uint32_t rc = 0
# Raw context handle taken out of the Python-level capsule.
tiledb_ctx_t* ctx_ptr = <tiledb_ctx_t*>PyCapsule_GetPointer(
ctx.__capsule__(), "ctx")
tiledb_config_t* config_ptr = NULL
# Defaults describe an unencrypted array; overwritten below if a key is set.
tiledb_encryption_type_t key_type = TILEDB_NO_ENCRYPTION
void* key_ptr = NULL
uint32_t key_len = 0
bytes bkey
bytes buri = unicode_path(self.array.uri)
str key = (<Array?>self.array).key

# config_ptr stays NULL when the config object is falsy.
if config:
config_ptr = <tiledb_config_t*>PyCapsule_GetPointer(
config.__capsule__(), "config")

if key is not None:
# NOTE(review): `key` is declared `str` above, so the `else` branch looks
# unreachable -- confirm before relying on bytes keys here.
if isinstance(key, str):
bkey = key.encode('ascii')
else:
bkey = bytes(self.array.key)
key_type = TILEDB_AES_256_GCM
# key_ptr borrows bkey's buffer, so bkey must stay alive across the call.
key_ptr = <void *> PyBytes_AS_STRING(bkey)
#TODO: unsafe cast here ssize_t -> uint64_t
# NOTE(review): the cast below actually narrows to uint32_t, not uint64_t.
key_len = <uint32_t> PyBytes_GET_SIZE(bkey)

cdef const char* buri_ptr = <const char*>buri

# Only C-level values are used inside the nogil section.
with nogil:
rc = tiledb_array_consolidate_with_key(
ctx_ptr,
buri_ptr,
key_type,
key_ptr,
key_len,
config_ptr)
if rc != TILEDB_OK:
_raise_ctx_err(ctx_ptr, rc)

get = MutableMapping.get
update = MutableMapping.update

Expand Down
Loading

0 comments on commit c95de36

Please sign in to comment.