Merge pull request #2910 from activeloopai/inverted_index
Deeplake Inverted Index.
nvoxland-al authored Aug 27, 2024
2 parents eeef7cb + 093a457 · commit 379897e
Showing 9 changed files with 879 additions and 121 deletions.
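The diffs below are driven by the dataset's index_params: a "threshold" for full index builds, a "distance_metric" for the HNSW embedding index, and "additional_params" whose vocabulary this PR extends with inverted-index knobs. A minimal sketch of that configuration (key names are lifted from the diff of index_maintenance.py; the values, and the exact route by which index_params reaches the dataset, are assumptions):

    # Sketch only: key names come from this PR; values are invented.
    index_params = {
        "threshold": 10_000,        # rows before a full index build is triggered
        "distance_metric": "COS",   # HNSW metric, embedding tensors only
        "additional_params": {
            "efconstruction": 600,  # normalized to "efConstruction" (HNSW)
            "bloom": 1_000_000,     # normalized to "bloom_filter_size" (inverted index)
            "seg_size": 524_288,    # normalized to "segment_size" (inverted index)
        },
    }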
1 change: 1 addition & 0 deletions deeplake/api/tests/test_access_method.py
@@ -81,6 +81,7 @@ def test_access_method(s3_ds_generator):


 @pytest.mark.slow
+@pytest.mark.flaky(reruns=3)
 def test_access_method_with_creds(
     hub_cloud_ds_generator, hub_cloud_dev_managed_creds_key
 ):
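(The @pytest.mark.flaky(reruns=3) marker comes from the pytest-rerunfailures plugin: a failure of this cloud-credentials test is retried up to three times before being reported, mitigating transient network flakiness rather than changing test behavior.)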
7 changes: 7 additions & 0 deletions deeplake/core/dataset/dataset.py
@@ -569,6 +569,7 @@ def __getitem__(
                 enabled_tensors=self.enabled_tensors,
                 view_base=self._view_base or self,
                 libdeeplake_dataset=self.libdeeplake_dataset,
+                index_params=self.index_params,
             )
         elif "/" in item:
             splt = posixpath.split(item)
@@ -619,6 +620,7 @@ def __getitem__(
                 enabled_tensors=enabled_tensors,
                 view_base=self._view_base or self,
                 libdeeplake_dataset=self.libdeeplake_dataset,
+                index_params=self.index_params,
             )
         elif isinstance(item, tuple) and len(item) and isinstance(item[0], str):
             ret = self
@@ -648,6 +650,7 @@ def __getitem__(
                 enabled_tensors=self.enabled_tensors,
                 view_base=self._view_base or self,
                 libdeeplake_dataset=self.libdeeplake_dataset,
+                index_params=self.index_params,
             )
         else:
             raise InvalidKeyTypeError(item)
@@ -2925,6 +2928,7 @@ def parent(self):
             path=self.path,
             link_creds=self.link_creds,
             libdeeplake_dataset=self.libdeeplake_dataset,
+            index_params=self.index_params,
         )
         self.storage.autoflush = autoflush
         return ds
@@ -2948,6 +2952,7 @@ def root(self):
             link_creds=self.link_creds,
             view_base=self._view_base,
             libdeeplake_dataset=self.libdeeplake_dataset,
+            index_params=self.index_params,
         )
         self.storage.autoflush = autoflush
         return ds
@@ -2971,6 +2976,7 @@ def no_view_dataset(self):
             pad_tensors=self._pad_tensors,
             enabled_tensors=self.enabled_tensors,
             libdeeplake_dataset=self.libdeeplake_dataset,
+            index_params=self.index_params,
         )
 
     def _create_group(self, name: str) -> "Dataset":
@@ -4927,6 +4933,7 @@ def max_view(self):
             pad_tensors=True,
             enabled_tensors=self.enabled_tensors,
             libdeeplake_dataset=self.libdeeplake_dataset,
+            index_params=self.index_params,
         )
 
     def random_split(self, lengths: Sequence[Union[int, float]]):
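(All seven additions thread index_params into the Dataset constructor at the places where derived datasets are built: the three sub-view branches of __getitem__ plus the parent, root, no_view_dataset, and max_view properties. A view therefore inherits the index configuration of the dataset it was derived from instead of silently dropping it.)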
244 changes: 151 additions & 93 deletions deeplake/core/index_maintenance.py
@@ -17,10 +17,10 @@ class INDEX_OP_TYPE(Enum):
     INCREMENTAL_INDEX = 3
 
 
-def is_embedding_tensor(tensor):
+def validate_embedding_tensor(tensor):
     """Check if a tensor is an embedding tensor."""
 
-    valid_names = ["embedding", "embeddings"]
+    valid_names = ["embedding"]
 
     return (
         tensor.htype == "embedding"
@@ -29,16 +29,9 @@ def is_embedding_tensor(tensor):
     )
 
 
-def validate_embedding_tensor(tensor):
-    """Check if a tensor is an embedding tensor."""
-
-    valid_names = ["embedding"]
-
-    return (
-        tensor.htype == "embedding"
-        or tensor.meta.name in valid_names
-        or tensor.key in valid_names
-    )
+def validate_text_tensor(tensor):
+    """Check if a tensor is a text tensor."""
+    return tensor.htype == "text"
 
 
 def fetch_embedding_tensor(dataset):
@@ -49,23 +42,34 @@ def fetch_embedding_tensor(dataset):
         return None
 
 
-def index_exists(dataset):
+def index_exists_emb(emb_tensor):
     """Check if the Index already exists."""
-    emb_tensor = fetch_embedding_tensor(dataset)
-    if emb_tensor is not None:
+    emb = validate_embedding_tensor(emb_tensor)
+    if emb:
         vdb_indexes = emb_tensor.fetch_vdb_indexes()
         if len(vdb_indexes) == 0:
             return False
         else:
             return True
-    else:
-        return False
+    return False
+
+
+def index_exists_txt(txt_tensor):
+    """Check if the Index already exists."""
+    txt = validate_text_tensor(txt_tensor)
+    if txt:
+        vdb_indexes = txt_tensor.fetch_vdb_indexes()
+        if len(vdb_indexes) == 0:
+            return False
+        else:
+            return True
+    return False
 
 
-def index_partition_count(dataset):
-    emb_tensor = fetch_embedding_tensor(dataset)
-    if emb_tensor is not None:
-        vdb_indexes = emb_tensor.fetch_vdb_indexes()
+def index_partition_count(tensor):
+    is_emb_tensor = validate_embedding_tensor(tensor)
+    if is_emb_tensor:
+        vdb_indexes = tensor.fetch_vdb_indexes()
         if len(vdb_indexes) == 0:
             return 1
         else:
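(The dataset-level index_exists(dataset) helper is split into per-tensor checks: index_exists_emb() guards the HNSW index on embedding tensors, and the new index_exists_txt() mirrors it for the inverted index on text tensors. index_partition_count() likewise now takes the tensor directly instead of re-fetching the embedding tensor from the dataset.)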
@@ -133,19 +137,28 @@ def check_index_params(self):
     return False
 
 
-def index_operation_type_dataset(self, num_rows, changed_data_len):
-    if not index_exists(self):
-        if self.index_params is None:
-            return INDEX_OP_TYPE.NOOP
-        threshold = self.index_params.get("threshold", -1)
-        below_threshold = threshold <= 0 or num_rows < threshold
-        if not below_threshold:
-            return INDEX_OP_TYPE.CREATE_INDEX
-
-    if not check_vdb_indexes(self) or changed_data_len == 0:
-        return INDEX_OP_TYPE.NOOP
-
-    return INDEX_OP_TYPE.INCREMENTAL_INDEX
+def index_operation_type_dataset(
+    tensor, dataset, num_rows, changed_data_len, is_embedding=False
+):
+    if is_embedding:
+        vdb_index_exists = index_exists_emb(tensor)
+        if not vdb_index_exists:
+            if dataset.index_params is None:
+                return INDEX_OP_TYPE.NOOP
+            if changed_data_len == 0:
+                return INDEX_OP_TYPE.NOOP
+            threshold = dataset.index_params.get("threshold", -1)
+            below_threshold = threshold <= 0 or num_rows < threshold
+            if not below_threshold:
+                return INDEX_OP_TYPE.CREATE_INDEX
+
+        return INDEX_OP_TYPE.INCREMENTAL_INDEX
+    else:
+        # for Text tensor i.e. inverted index,
+        vdb_index_exists = index_exists_txt(tensor)
+        if not vdb_index_exists or changed_data_len == 0:
+            return INDEX_OP_TYPE.NOOP
+        return INDEX_OP_TYPE.REGENERATE_INDEX
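(Net effect of the reworked dispatch: embedding tensors keep the old threshold logic, CREATE_INDEX once num_rows crosses the threshold and INCREMENTAL_INDEX otherwise, while text tensors have no incremental path: a missing index or an empty change set is a NOOP, and an existing inverted index is always answered with REGENERATE_INDEX, i.e. a full rebuild.)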


 def get_index_metric(metric):
@@ -158,9 +171,23 @@ def get_index_metric(metric):


 def normalize_additional_params(params: dict) -> dict:
-    mapping = {"efconstruction": "efConstruction", "m": "M", "partitions": "partitions"}
+    mapping = {
+        "efconstruction": "efConstruction",
+        "m": "M",
+        "partitions": "partitions",
+        "bloom": "bloom_filter_size",
+        "bloom_size": "bloom_filter_size",
+        "Segment_Size": "segment_size",
+        "seg_size": "segment_size",
+    }
 
-    allowed_keys = ["efConstruction", "m", "partitions"]
+    allowed_keys = [
+        "efConstruction",
+        "m",
+        "partitions",
+        "bloom_filter_size",
+        "segment_size",
+    ]
 
     # New dictionary to store the result with desired key format
     result_dict = {}
@@ -187,20 +214,16 @@ def normalize_additional_params(params: dict) -> dict:
     return result_dict
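A quick illustration of the widened normalization, using only key spellings that appear verbatim in the mapping above (values invented; the loop body that applies the mapping is collapsed in this hunk, so treat the exact matching rules as an assumption):

    # Hypothetical input mixing HNSW and inverted-index knobs:
    params = {"efconstruction": 600, "bloom": 1_000_000, "seg_size": 524_288}
    normalize_additional_params(params)
    # expected, per the mapping: {"efConstruction": 600,
    #                             "bloom_filter_size": 1_000_000,
    #                             "segment_size": 524_288}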


-def check_vdb_indexes(dataset):
-    tensors = dataset.tensors
-
-    vdb_index_present = False
-    for _, tensor in tensors.items():
-        is_embedding = is_embedding_tensor(tensor)
-        has_vdb_indexes = hasattr(tensor.meta, "vdb_indexes")
-        try:
-            vdb_index_ids_present = len(tensor.meta.vdb_indexes) > 0
-        except AttributeError:
-            vdb_index_ids_present = False
+def check_embedding_vdb_indexes(tensor):
+    is_embedding = validate_embedding_tensor(tensor)
+    has_vdb_indexes = hasattr(tensor.meta, "vdb_indexes")
+    try:
+        vdb_index_ids_present = len(tensor.meta.vdb_indexes) > 0
+    except AttributeError:
+        vdb_index_ids_present = False
 
-        if is_embedding and has_vdb_indexes and vdb_index_ids_present:
-            return True
+    if is_embedding and has_vdb_indexes and vdb_index_ids_present:
+        return True
     return False


@@ -210,12 +233,18 @@ def _incr_maintenance_vdb_indexes(
     try:
         is_embedding = tensor.htype == "embedding"
         has_vdb_indexes = hasattr(tensor.meta, "vdb_indexes")
+
+        is_text = tensor.htype == "text"
+        if is_text:
+            raise Exception(
+                "Inverted index does not support incremental index maintenance."
+            )
         try:
             vdb_index_ids_present = len(tensor.meta.vdb_indexes) > 0
         except AttributeError:
             vdb_index_ids_present = False
 
-        if is_embedding and has_vdb_indexes and vdb_index_ids_present:
+        if (is_embedding or is_text) and has_vdb_indexes and vdb_index_ids_present:
             for vdb_index in tensor.meta.vdb_indexes:
                 tensor.update_vdb_index(
                     operation_kind=index_operation,
@@ -226,14 +255,15 @@ def _incr_maintenance_vdb_indexes(
         raise Exception(f"An error occurred while regenerating VDB indexes: {e}")
 
 
+# Routine to identify the index Operation.
 def index_operation_vectorstore(self):
     if not index_used(self.exec_option):
         return None
 
     emb_tensor = fetch_embedding_tensor(self.dataset)
     if emb_tensor is None:
         return None
 
-    if index_exists(self.dataset) and check_index_params(self):
+    if index_exists_emb(emb_tensor) and check_index_params(self):
         return emb_tensor.get_vdb_indexes()[0]["distance"]
 
     threshold = self.index_params.get("threshold", -1)
@@ -262,48 +292,76 @@ def index_operation_vectorstore(self):
 
 
 def index_operation_dataset(self, dml_type, rowids):
-    emb_tensor = fetch_embedding_tensor(self)
-    if emb_tensor is None:
-        return
-
-    index_operation_type = index_operation_type_dataset(
-        self,
-        emb_tensor.chunk_engine.num_samples,
-        len(rowids),
-    )
-
-    if index_operation_type == INDEX_OP_TYPE.NOOP:
-        return
-    if (
-        index_operation_type == INDEX_OP_TYPE.CREATE_INDEX
-        or index_operation_type == INDEX_OP_TYPE.REGENERATE_INDEX
-    ):
-        if index_operation_type == INDEX_OP_TYPE.REGENERATE_INDEX:
-            try:
-                vdb_indexes = emb_tensor.get_vdb_indexes()
-                for vdb_index in vdb_indexes:
-                    emb_tensor.delete_vdb_index(vdb_index["id"])
-            except Exception as e:
-                raise Exception(
-                    f"An error occurred while regenerating VDB indexes: {e}"
-                )
-        distance_str = self.index_params.get("distance_metric", "COS")
-        additional_params_dict = self.index_params.get("additional_params", None)
-        distance = get_index_metric(distance_str.upper())
-        if additional_params_dict and len(additional_params_dict) > 0:
-            param_dict = normalize_additional_params(additional_params_dict)
-            emb_tensor.create_vdb_index(
-                "hnsw_1", distance=distance, additional_params=param_dict
-            )
-        else:
-            emb_tensor.create_vdb_index("hnsw_1", distance=distance)
-    elif index_operation_type == INDEX_OP_TYPE.INCREMENTAL_INDEX:
-        partition_count = index_partition_count(self)
-        if partition_count > 1:
-            _incr_maintenance_vdb_indexes(
-                emb_tensor, rowids, dml_type, is_partitioned=True
-            )
-        else:
-            _incr_maintenance_vdb_indexes(emb_tensor, rowids, dml_type)
-    else:
-        raise Exception("Unknown index operation")
+    tensors = self.tensors
+    for _, tensor in tensors.items():
+        is_embedding_tensor = validate_embedding_tensor(tensor)
+        is_text_tensor = validate_text_tensor(tensor)
+        index_operation_type = INDEX_OP_TYPE.NOOP
+
+        if is_embedding_tensor or is_text_tensor:
+            index_operation_type = index_operation_type_dataset(
+                tensor,
+                self,
+                tensor.chunk_engine.num_samples,
+                len(rowids),
+                is_embedding_tensor,
+            )
+        else:
+            continue
+
+        if index_operation_type == INDEX_OP_TYPE.NOOP:
+            continue
+        if (
+            index_operation_type == INDEX_OP_TYPE.CREATE_INDEX
+            or index_operation_type == INDEX_OP_TYPE.REGENERATE_INDEX
+        ):
+            saved_vdb_indexes = []
+            if index_operation_type == INDEX_OP_TYPE.REGENERATE_INDEX:
+                try:
+                    vdb_indexes = tensor.get_vdb_indexes()
+                    for vdb_index in vdb_indexes:
+                        saved_vdb_indexes.append(vdb_index)
+                        tensor.delete_vdb_index(vdb_index["id"])
+                except Exception as e:
+                    raise Exception(
+                        f"An error occurred while regenerating VDB indexes: {e}"
+                    )
+            if is_embedding_tensor:
+                distance_str = self.index_params.get("distance_metric", "COS")
+                additional_params_dict = self.index_params.get(
+                    "additional_params", None
+                )
+                distance = get_index_metric(distance_str.upper())
+                if additional_params_dict and len(additional_params_dict) > 0:
+                    param_dict = normalize_additional_params(additional_params_dict)
+                    tensor.create_vdb_index(
+                        "hnsw_1", distance=distance, additional_params=param_dict
+                    )
+                else:
+                    tensor.create_vdb_index("hnsw_1", distance=distance)
+            elif is_text_tensor:
+                if len(saved_vdb_indexes) > 0:
+                    for vdb_index in saved_vdb_indexes:
+                        id = vdb_index["id"]
+                        additional_params_dict = vdb_index.get(
+                            "additional_params", None
+                        )
+                        if additional_params_dict and len(additional_params_dict) > 0:
+                            param_dict = normalize_additional_params(
+                                additional_params_dict
+                            )
+                            tensor.create_vdb_index(id, additional_params=param_dict)
+                        else:
+                            tensor.create_vdb_index(id)
+            continue
+        elif index_operation_type == INDEX_OP_TYPE.INCREMENTAL_INDEX:
+            partition_count = index_partition_count(tensor)
+            if partition_count > 1:
+                _incr_maintenance_vdb_indexes(
+                    tensor, rowids, dml_type, is_partitioned=True
+                )
+            else:
+                _incr_maintenance_vdb_indexes(tensor, rowids, dml_type)
+            continue
+        else:
+            raise Exception("Unknown index operation")
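(index_operation_dataset() now loops over all tensors instead of fetching a single embedding tensor: embedding tensors get "hnsw_1" created or rebuilt with the configured distance metric, text tensors get their inverted indexes dropped and recreated under their saved ids and additional_params, which matches _incr_maintenance_vdb_indexes() refusing incremental updates for them, and every other tensor is skipped.)

A hedged end-to-end sketch of the setup this enables; the create_tensor/append calls are the standard deeplake API, but treating an append as what funnels into index_operation_dataset() is an inference from its dml_type/rowids signature:

    import deeplake

    ds = deeplake.empty("mem://inverted_index_demo")
    ds.create_tensor("text", htype="text")            # validate_text_tensor() -> True
    ds.create_tensor("embedding", htype="embedding")  # validate_embedding_tensor() -> True

    # Writes are what the new per-tensor loop reacts to, choosing
    # NOOP / CREATE / REGENERATE / INCREMENTAL separately for each tensor.
    ds.text.append("the quick brown fox")
    ds.embedding.append([0.1, 0.2, 0.3])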