Skip to content

Commit

Permalink
Move fragment list consolidation API to pybind (#1999)
Browse files Browse the repository at this point in the history
  • Loading branch information
kounelisagis authored Jul 8, 2024
1 parent 01661c5 commit 7353729
Show file tree
Hide file tree
Showing 5 changed files with 267 additions and 216 deletions.
2 changes: 1 addition & 1 deletion tiledb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
array_exists,
array_fragments,
as_built,
consolidate,
empty_like,
from_numpy,
open,
Expand All @@ -93,7 +94,6 @@
Array,
DenseArrayImpl,
SparseArrayImpl,
consolidate,
ls,
move,
object_type,
Expand Down
48 changes: 38 additions & 10 deletions tiledb/cc/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,45 @@ void init_array(py::module &m) {
.def("config", &Array::config)
.def("close", &Array::close)
.def("consolidate",
py::overload_cast<const Context &, const std::string &,
Config *const>(&Array::consolidate),
py::call_guard<py::gil_scoped_release>())
[](Array &self, const Context &ctx, Config *config) {
if (self.query_type() == TILEDB_READ) {
throw TileDBError("cannot consolidate array opened in readonly "
"mode (mode='r')");
}
Array::consolidate(ctx, self.uri(), config);
})
.def("consolidate",
py::overload_cast<const Context &, const std::string &,
tiledb_encryption_type_t, const std::string &,
Config *const>(&Array::consolidate),
py::call_guard<py::gil_scoped_release>())
//(void (Array::*)(const Context&, const std::string&,
// tiledb_encryption_type_t, const std::string&,
// Config* const)&Array::consolidate)&Array::consolidate)
[](Array &self, const Context &ctx,
const std::vector<std::string> &fragment_uris, Config *config) {
if (self.query_type() == TILEDB_READ) {
throw TileDBError("cannot consolidate array opened in readonly "
"mode (mode='r')");
}
std::vector<const char *> c_strings;
c_strings.reserve(fragment_uris.size());
for (const auto &str : fragment_uris) {
c_strings.push_back(str.c_str());
}

Array::consolidate(ctx, self.uri(), c_strings.data(),
fragment_uris.size(), config);
})
.def("consolidate",
[](Array &self, const Context &ctx,
const std::tuple<int, int> &timestamp, Config *config) {
if (self.query_type() == TILEDB_READ) {
throw TileDBError("cannot consolidate array opened in readonly "
"mode (mode='r')");
}
int start, end;
std::tie(start, end) = timestamp;

config->set("sm.consolidation.timestamp_start",
std::to_string(start));
config->set("sm.consolidation.timestamp_end", std::to_string(end));

Array::consolidate(ctx, self.uri(), config);
})
.def("vacuum", &Array::vacuum)
.def("create",
py::overload_cast<const std::string &, const ArraySchema &,
Expand Down
67 changes: 67 additions & 0 deletions tiledb/highlevel.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import json
import warnings

import numpy as np

import tiledb
import tiledb.cc as lt

from .dataframe_ import create_dim

Expand Down Expand Up @@ -197,6 +199,71 @@ def array_fragments(uri, include_mbrs=False, ctx=None):
return tiledb.FragmentInfoList(uri, include_mbrs, ctx)


def consolidate(uri, config=None, ctx=None, fragment_uris=None, timestamp=None):
"""Consolidates TileDB array fragments for improved read performance
:param str uri: URI to the TileDB Array
:param str key: (default None) Key to decrypt array if the array is encrypted
:param tiledb.Config config: The TileDB Config with consolidation parameters set
:param tiledb.Ctx ctx: (default None) The TileDB Context
:param fragment_uris: (default None) Consolidate the array using a list of fragment file names
:param timestamp: (default None) If not None, consolidate the array using the given tuple(int, int) UNIX seconds range (inclusive). This argument will be ignored if `fragment_uris` is passed.
:rtype: str or bytes
:return: path (URI) to the consolidated TileDB Array
:raises TypeError: cannot convert path to unicode string
:raises: :py:exc:`tiledb.TileDBError`
Rather than passing the timestamp into this function, it may be set with
the config parameters `"sm.vacuum.timestamp_start"`and
`"sm.vacuum.timestamp_end"` which takes in a time in UNIX seconds. If both
are set then this function's `timestamp` argument will be used.
**Example:**
>>> import tiledb, tempfile, numpy as np, os
>>> path = tempfile.mkdtemp()
>>> with tiledb.from_numpy(path, np.zeros(4), timestamp=1) as A:
... pass
>>> with tiledb.open(path, 'w', timestamp=2) as A:
... A[:] = np.ones(4, dtype=np.int64)
>>> with tiledb.open(path, 'w', timestamp=3) as A:
... A[:] = np.ones(4, dtype=np.int64)
>>> with tiledb.open(path, 'w', timestamp=4) as A:
... A[:] = np.ones(4, dtype=np.int64)
>>> len(tiledb.array_fragments(path))
4
>>> fragment_names = [
... os.path.basename(f) for f in tiledb.array_fragments(path).uri
... ]
>>> array_uri = tiledb.consolidate(
... path, fragment_uris=[fragment_names[1], fragment_names[3]]
... )
>>> len(tiledb.array_fragments(path))
3
"""
ctx = _get_ctx(ctx)
if config is None:
config = lt.Config()

arr = lt.Array(ctx, uri, lt.QueryType.WRITE)

if fragment_uris is not None:
if timestamp is not None:
warnings.warn(
"The `timestamp` argument will be ignored and only fragments "
"passed to `fragment_uris` will be consolidated",
DeprecationWarning,
)
return arr.consolidate(ctx, fragment_uris, config)
elif timestamp is not None:
return arr.consolidate(ctx, timestamp, config)
else:
return arr.consolidate(ctx, config)


def schema_like(*args, shape=None, dtype=None, ctx=None, **kwargs):
"""
Return an ArraySchema corresponding to a NumPy-like object or
Expand Down
Loading

0 comments on commit 7353729

Please sign in to comment.