From 09ae7a69d70f1a483acecac88b3a7915594b8968 Mon Sep 17 00:00:00 2001 From: Sounak Chakraborty Date: Thu, 18 Jul 2024 16:01:30 +0400 Subject: [PATCH 01/17] Inverted Index Phase 1. Initial commit. --- deeplake/constants.py | 2 + deeplake/core/dataset/dataset.py | 7 ++ deeplake/core/index_maintenance.py | 157 +++++++++++++++++++++-------- deeplake/core/meta/tensor_meta.py | 3 + deeplake/core/tensor.py | 85 ++++++++++++++-- deeplake/htype.py | 2 +- 6 files changed, 202 insertions(+), 54 deletions(-) diff --git a/deeplake/constants.py b/deeplake/constants.py index 4446f7836e..604686b26f 100644 --- a/deeplake/constants.py +++ b/deeplake/constants.py @@ -326,10 +326,12 @@ DEFAULT_VECTORSTORE_DISTANCE_METRIC = "COS" DEFAULT_DEEPMEMORY_DISTANCE_METRIC = "deepmemory_distance" +DEFAULT_VECTORSTORE_INVERTED_INDEX_TENSOR = "" DEFAULT_VECTORSTORE_INDEX_PARAMS = { "threshold": -1, "distance_metric": DEFAULT_VECTORSTORE_DISTANCE_METRIC, + "inverted_index_tensor": DEFAULT_VECTORSTORE_INVERTED_INDEX_TENSOR, "additional_params": { "efConstruction": 600, "M": 32, diff --git a/deeplake/core/dataset/dataset.py b/deeplake/core/dataset/dataset.py index a207894a1d..4c01dfaa67 100644 --- a/deeplake/core/dataset/dataset.py +++ b/deeplake/core/dataset/dataset.py @@ -569,6 +569,7 @@ def __getitem__( enabled_tensors=self.enabled_tensors, view_base=self._view_base or self, libdeeplake_dataset=self.libdeeplake_dataset, + index_params=self.index_params, ) elif "/" in item: splt = posixpath.split(item) @@ -619,6 +620,7 @@ def __getitem__( enabled_tensors=enabled_tensors, view_base=self._view_base or self, libdeeplake_dataset=self.libdeeplake_dataset, + index_params=self.index_params, ) elif isinstance(item, tuple) and len(item) and isinstance(item[0], str): ret = self @@ -648,6 +650,7 @@ def __getitem__( enabled_tensors=self.enabled_tensors, view_base=self._view_base or self, libdeeplake_dataset=self.libdeeplake_dataset, + index_params=self.index_params, ) else: raise InvalidKeyTypeError(item) @@ -2925,6 +2928,7 @@ def parent(self): path=self.path, link_creds=self.link_creds, libdeeplake_dataset=self.libdeeplake_dataset, + index_params=self.index_params, ) self.storage.autoflush = autoflush return ds @@ -2948,6 +2952,7 @@ def root(self): link_creds=self.link_creds, view_base=self._view_base, libdeeplake_dataset=self.libdeeplake_dataset, + index_params=self.index_params, ) self.storage.autoflush = autoflush return ds @@ -2971,6 +2976,7 @@ def no_view_dataset(self): pad_tensors=self._pad_tensors, enabled_tensors=self.enabled_tensors, libdeeplake_dataset=self.libdeeplake_dataset, + index_params=self.index_params, ) def _create_group(self, name: str) -> "Dataset": @@ -4909,6 +4915,7 @@ def max_view(self): pad_tensors=True, enabled_tensors=self.enabled_tensors, libdeeplake_dataset=self.libdeeplake_dataset, + index_params=self.index_params, ) def random_split(self, lengths: Sequence[Union[int, float]]): diff --git a/deeplake/core/index_maintenance.py b/deeplake/core/index_maintenance.py index 58e294de09..0881e5ec2a 100644 --- a/deeplake/core/index_maintenance.py +++ b/deeplake/core/index_maintenance.py @@ -40,6 +40,17 @@ def validate_embedding_tensor(tensor): or tensor.key in valid_names ) +def validate_text_tensor(tensor): + """Check if a tensor is an embedding tensor.""" + + valid_names = ["text", "id", "metadata"] + + return ( + tensor.htype == "str" + or tensor.meta.name in valid_names + or tensor.key in valid_names + ) + def fetch_embedding_tensor(dataset): tensors = dataset.tensors @@ -48,18 +59,32 @@ def 
fetch_embedding_tensor(dataset): return tensor return None +def fetch_text_tensor(dataset): + tensors = dataset.tensors + for _, tensor in tensors.items(): + if validate_text_tensor(tensor): + return tensor + return None + def index_exists(dataset): """Check if the Index already exists.""" emb_tensor = fetch_embedding_tensor(dataset) + txt_tensor = fetch_text_tensor(dataset) if emb_tensor is not None: vdb_indexes = emb_tensor.fetch_vdb_indexes() if len(vdb_indexes) == 0: return False else: return True - else: - return False + elif txt_tensor is not None: + vdb_indexes = txt_tensor.fetch_vdb_indexes() + if len(vdb_indexes) == 0: + return False + else: + return True + + return False def index_partition_count(dataset): @@ -132,6 +157,14 @@ def check_index_params(self): return False +def check_index_params_text(self): + txt_tensor = fetch_text_tensor(self.dataset) + indexes = txt_tensor.get_vdb_indexes() + if len(indexes) == 0: + return False + else: + return True + def index_operation_type_dataset(self, num_rows, changed_data_len): if not index_exists(self): @@ -210,12 +243,14 @@ def _incr_maintenance_vdb_indexes( try: is_embedding = tensor.htype == "embedding" has_vdb_indexes = hasattr(tensor.meta, "vdb_indexes") + + is_text = tensor.htype == "str" try: vdb_index_ids_present = len(tensor.meta.vdb_indexes) > 0 except AttributeError: vdb_index_ids_present = False - if is_embedding and has_vdb_indexes and vdb_index_ids_present: + if (is_embedding or is_text) and has_vdb_indexes and vdb_index_ids_present: for vdb_index in tensor.meta.vdb_indexes: tensor.update_vdb_index( operation_kind=index_operation, @@ -232,45 +267,70 @@ def index_operation_vectorstore(self): return None emb_tensor = fetch_embedding_tensor(self.dataset) + txt_tensor = fetch_text_tensor(self.dataset) + + index_ext = index_exists(self.dataset) - if index_exists(self.dataset) and check_index_params(self): + if index_ext and check_index_params(self): return emb_tensor.get_vdb_indexes()[0]["distance"] + elif index_ext and check_index_params_text(self): + return txt_tensor.get_vdb_indexes()[0]["distance"] threshold = self.index_params.get("threshold", -1) below_threshold = threshold < 0 or len(self.dataset) < threshold if below_threshold: return None - if not check_index_params(self): + if emb_tensor is not None and not check_index_params(self): try: vdb_indexes = emb_tensor.get_vdb_indexes() for vdb_index in vdb_indexes: emb_tensor.delete_vdb_index(vdb_index["id"]) except Exception as e: raise Exception(f"An error occurred while removing VDB indexes: {e}") - distance_str = self.index_params.get("distance_metric", "COS") - additional_params_dict = self.index_params.get("additional_params", None) - distance = get_index_metric(distance_str.upper()) - if additional_params_dict and len(additional_params_dict) > 0: - param_dict = normalize_additional_params(additional_params_dict) - emb_tensor.create_vdb_index( - "hnsw_1", distance=distance, additional_params=param_dict - ) - else: - emb_tensor.create_vdb_index("hnsw_1", distance=distance) - return distance + + distance_str = self.index_params.get("distance_metric", "COS") + additional_params_dict = self.index_params.get("additional_params", None) + distance = get_index_metric(distance_str.upper()) + if additional_params_dict and len(additional_params_dict) > 0: + param_dict = normalize_additional_params(additional_params_dict) + emb_tensor.create_vdb_index( + "hnsw_1", distance=distance, additional_params=param_dict + ) + else: + emb_tensor.create_vdb_index("hnsw_1", distance=distance) 
+ return distance + + if txt_tensor is not None and not check_index_params_text(self): + try: + vdb_indexes = txt_tensor.get_vdb_indexes() + for vdb_index in vdb_indexes: + txt_tensor.delete_vdb_index(vdb_index["id"]) + except Exception as e: + raise Exception(f"An error occurred while removing VDB indexes: {e}") + + txt_tensor.create_vdb_index("inverted_index1") + return None def index_operation_dataset(self, dml_type, rowids): emb_tensor = fetch_embedding_tensor(self) - if emb_tensor is None: + txt_tensor = fetch_text_tensor(self) + if emb_tensor is None and txt_tensor is None: return - index_operation_type = index_operation_type_dataset( - self, - emb_tensor.chunk_engine.num_samples, - len(rowids), - ) + if emb_tensor is not None: + index_operation_type = index_operation_type_dataset( + self, + emb_tensor.chunk_engine.num_samples, + len(rowids), + ) + elif txt_tensor is not None: + index_operation_type = index_operation_type_dataset( + self, + txt_tensor.chunk_engine.num_samples, + len(rowids), + ) if index_operation_type == INDEX_OP_TYPE.NOOP: return @@ -280,30 +340,43 @@ def index_operation_dataset(self, dml_type, rowids): ): if index_operation_type == INDEX_OP_TYPE.REGENERATE_INDEX: try: - vdb_indexes = emb_tensor.get_vdb_indexes() - for vdb_index in vdb_indexes: - emb_tensor.delete_vdb_index(vdb_index["id"]) + if emb_tensor is not None: + vdb_indexes = emb_tensor.get_vdb_indexes() + for vdb_index in vdb_indexes: + emb_tensor.delete_vdb_index(vdb_index["id"]) + elif txt_tensor is not None: + vdb_indexes = txt_tensor.get_vdb_indexes() + for vdb_index in vdb_indexes: + txt_tensor.delete_vdb_index(vdb_index["id"]) except Exception as e: raise Exception( f"An error occurred while regenerating VDB indexes: {e}" ) - distance_str = self.index_params.get("distance_metric", "COS") - additional_params_dict = self.index_params.get("additional_params", None) - distance = get_index_metric(distance_str.upper()) - if additional_params_dict and len(additional_params_dict) > 0: - param_dict = normalize_additional_params(additional_params_dict) - emb_tensor.create_vdb_index( - "hnsw_1", distance=distance, additional_params=param_dict - ) - else: - emb_tensor.create_vdb_index("hnsw_1", distance=distance) + + if emb_tensor is not None: + distance_str = self.index_params.get("distance_metric", "COS") + additional_params_dict = self.index_params.get("additional_params", None) + distance = get_index_metric(distance_str.upper()) + if additional_params_dict and len(additional_params_dict) > 0: + param_dict = normalize_additional_params(additional_params_dict) + emb_tensor.create_vdb_index( + "hnsw_1", distance=distance, additional_params=param_dict + ) + else: + emb_tensor.create_vdb_index("hnsw_1", distance=distance) + elif txt_tensor is not None: + txt_tensor.create_vdb_index("inverted_index1") + elif index_operation_type == INDEX_OP_TYPE.INCREMENTAL_INDEX: - partition_count = index_partition_count(self) - if partition_count > 1: - _incr_maintenance_vdb_indexes( - emb_tensor, rowids, dml_type, is_partitioned=True - ) - else: - _incr_maintenance_vdb_indexes(emb_tensor, rowids, dml_type) + if emb_tensor is not None: + partition_count = index_partition_count(self) + if partition_count > 1: + _incr_maintenance_vdb_indexes( + emb_tensor, rowids, dml_type, is_partitioned=True + ) + else: + _incr_maintenance_vdb_indexes(emb_tensor, rowids, dml_type) + elif txt_tensor is not None: + _incr_maintenance_vdb_indexes(txt_tensor, rowids, dml_type) else: raise Exception("Unknown index operation") diff --git 
a/deeplake/core/meta/tensor_meta.py b/deeplake/core/meta/tensor_meta.py index 7ec4e5371a..259fce06b7 100644 --- a/deeplake/core/meta/tensor_meta.py +++ b/deeplake/core/meta/tensor_meta.py @@ -241,6 +241,9 @@ def __setstate__(self, state: Dict[str, Any]): if self.htype == "embedding" and not hasattr(self, "vdb_indexes"): self.vdb_indexes = [] self._required_meta_keys += ("vdb_indexes",) + if self.htype == "text" and not hasattr(self, "vdb_indexes"): + self.vdb_indexes = [] + self._required_meta_keys += ("vdb_indexes",) @property def nbytes(self): diff --git a/deeplake/core/tensor.py b/deeplake/core/tensor.py index 011733af52..c2c9a628b3 100644 --- a/deeplake/core/tensor.py +++ b/deeplake/core/tensor.py @@ -81,6 +81,8 @@ ) import warnings import webbrowser +import pathlib +import struct def create_tensor( @@ -1530,6 +1532,32 @@ def invalidate_libdeeplake_dataset(self): """Invalidates the libdeeplake dataset object.""" self.dataset.libdeeplake_dataset = None + def deserialize_inverted_index(self, serialized_data): + from io import BytesIO + + stream = BytesIO(serialized_data) + + # Read number of partitions + metadataSize = int.from_bytes( + stream.read(8), "little" + ) # Assuming size_t is 8 bytes + + metadata_bytes = stream.read(metadataSize) + metadata = json.loads(metadata_bytes.decode("utf-8")) + + temp_paths_size = int.from_bytes(stream.read(8), 'little') + + + + paths_string = stream.read().decode('utf-8') + temp_serialized_paths = paths_string.strip().split('\n') + + # Check if the declared number of paths match the actual number + if temp_paths_size != len(temp_serialized_paths): + raise ValueError("Mismatch between declared count and actual number of paths") + + return metadata, temp_serialized_paths + def deserialize_partitions(self, serialized_data, incremental_dml=False): from io import BytesIO @@ -1597,8 +1625,6 @@ def update_vdb_index( is_partitioned: bool = False, ): self.storage.check_readonly() - if self.meta.htype != "embedding": - raise Exception(f"Only supported for embedding tensors.") self.invalidate_libdeeplake_dataset() self.dataset.flush() from deeplake.enterprise.convert_to_libdeeplake import ( @@ -1734,8 +1760,6 @@ def create_vdb_index( additional_params: Optional[Dict[str, int]] = None, ): self.storage.check_readonly() - if self.meta.htype != "embedding": - raise Exception(f"Only supported for embedding tensors.") if not self.dataset.libdeeplake_dataset is None: ds = self.dataset.libdeeplake_dataset else: @@ -1747,6 +1771,51 @@ def create_vdb_index( ts = getattr(ds, self.meta.name) from indra import api # type: ignore + if self.meta.htype == "text": + self.meta.add_vdb_index( + id=id, type="inverted_index", distance=None + ) + try: + if additional_params is None: + index = api.vdb.generate_index( + ts, index_type="inverted_index" + ) + else: + index = api.vdb.generate_index( + ts, + index_type="inverted_index", + param=additional_params, + ) + b = index.serialize() + commit_id = self.version_state["commit_id"] + metadata, temp_serialized_paths = self.deserialize_inverted_index(b) + inverted_meta_key = get_tensor_vdb_index_key( + self.key, commit_id, f"{id}_inverted_metadata" + ) + metadata_json = json.dumps(metadata) + metadata_bytes = metadata_json.encode("utf-8") + self.storage[inverted_meta_key] = metadata_bytes + temp_serialized_paths_count = len(temp_serialized_paths) + temp_serialized_paths = [str(path) for path in temp_serialized_paths] + # Pull the file the location specified in the path and store it in the storage + for i, path in 
enumerate(temp_serialized_paths):
+                # extract the file name from the path, i.e. the component after the last "/"
+                file_name = pathlib.Path(path).name
+
+                # read file and store it in the storage
+                with open(path, "rb") as f:
+                    inv_key = get_tensor_vdb_index_key(self.key, commit_id, f"{id}_{file_name}")
+                    self.storage[inv_key] = f.read()
+                # close the file
+                f.close()
+
+            self.invalidate_libdeeplake_dataset()
+            #self.storage.flush()
+        except:
+            self.meta.remove_vdb_index(id=id)
+            raise
+        return index
+
         if type(distance) == DistanceType:
             distance = distance.value
         self.meta.add_vdb_index(
@@ -1795,8 +1864,6 @@ def create_vdb_index(

     def delete_vdb_index(self, id: str):
         self.storage.check_readonly()
-        if self.meta.htype != "embedding":
-            raise Exception(f"Only supported for embedding tensors.")
         commit_id = self.version_state["commit_id"]
         self.unload_vdb_index_cache()
         if self.is_partitioned_vdb_index():
@@ -1844,8 +1911,6 @@ def _verify_and_delete_vdb_indexes(self):
             raise Exception(f"An error occurred while deleting VDB indexes: {e}")

     def load_vdb_index(self, id: str):
-        if self.meta.htype != "embedding":
-            raise Exception(f"Only supported for embedding tensors.")
         if not self.meta.contains_vdb_index(id):
             raise ValueError(f"Tensor meta has no vdb index with name '{id}'.")
         if not self.dataset.libdeeplake_dataset is None:
@@ -1866,8 +1931,6 @@ def load_vdb_index(self, id: str):
             raise ValueError(f"An error occurred while loading the VDB index {id}: {e}")

     def unload_vdb_index_cache(self):
-        if self.meta.htype != "embedding":
-            raise Exception(f"Only supported for embedding tensors.")
         if not self.dataset.libdeeplake_dataset is None:
             ds = self.dataset.libdeeplake_dataset
         else:
@@ -1886,7 +1949,7 @@ def unload_vdb_index_cache(self):
             raise Exception(f"An error occurred while cleaning VDB Cache: {e}")

     def get_vdb_indexes(self) -> List[Dict[str, str]]:
-        if self.meta.htype != "embedding":
+        if self.meta.htype != "embedding" and self.meta.htype != "str":
             raise Exception(f"Only supported for embedding tensors.")
         return self.meta.vdb_indexes

diff --git a/deeplake/htype.py b/deeplake/htype.py
index 2933f20932..515ae45f06 100644
--- a/deeplake/htype.py
+++ b/deeplake/htype.py
@@ -95,7 +95,7 @@ class htype:
         "dtype": "Any",
     },
     htype.LIST: {"dtype": "List"},
-    htype.TEXT: {"dtype": "str"},
+    htype.TEXT: {"dtype": "str", "vdb_indexes": []},
     htype.TAG: {"dtype": "List"},
     htype.DICOM: {"sample_compression": "dcm"},
     htype.NIFTI: {},

From 8e41b7e8e55e4528098ad4bca9ae3914b26acbf4 Mon Sep 17 00:00:00 2001
From: Sounak Chakraborty
Date: Tue, 13 Aug 2024 20:16:05 +0530
Subject: [PATCH 02/17] Inverted Index Phase 2. Drop inverted index.
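
An inverted index on a text tensor is persisted as a "{id}_inverted_metadata"
JSON blob plus one "{id}_inv_<segment>" key per segment named in that
metadata. Teach delete_vdb_index() to recognize such indexes through the new
is_inverted_index() helper and pop all of those keys from storage when the
index is dropped.

Minimal usage sketch (the dataset path and tensor name are illustrative):

    import deeplake

    ds = deeplake.load("./my_text_ds")   # hypothetical dataset with a "text" tensor
    ds.text.create_vdb_index("inv_1")    # build the inverted index
    assert len(ds.text.get_vdb_indexes()) == 1
    ds.text.delete_vdb_index("inv_1")    # removes the index and its storage keys
    assert len(ds.text.get_vdb_indexes()) == 0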
--- deeplake/core/tensor.py | 42 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/deeplake/core/tensor.py b/deeplake/core/tensor.py index c2c9a628b3..6f490dc11a 100644 --- a/deeplake/core/tensor.py +++ b/deeplake/core/tensor.py @@ -1547,8 +1547,6 @@ def deserialize_inverted_index(self, serialized_data): temp_paths_size = int.from_bytes(stream.read(8), 'little') - - paths_string = stream.read().decode('utf-8') temp_serialized_paths = paths_string.strip().split('\n') @@ -1606,6 +1604,8 @@ def deserialize_partitions(self, serialized_data, incremental_dml=False): return partition_info, partitions_data, incr_info def is_partitioned_vdb_index(self): + if self.htype != "embedding": + return False vdb_indexes = self.get_vdb_indexes() if len(vdb_indexes) == 0: return False @@ -1618,6 +1618,16 @@ def is_partitioned_vdb_index(self): return True return False + def is_inverted_index(self): + if self.htype == "text": + vdb_indexes = self.get_vdb_indexes() + if len(vdb_indexes) == 0: + return False + for vdb_index in vdb_indexes: + if vdb_index["type"] == "inverted_index": + return True + return False + def update_vdb_index( self, operation_kind: int, @@ -1887,6 +1897,30 @@ def delete_vdb_index(self, id: str): f"{id}_partition_metadata", ) ) + elif self.is_inverted_index(): + metadata_file = self.storage[ + get_tensor_vdb_index_key( + self.key, + self.version_state["commit_id"], + f"{id}_inverted_metadata", + ) + ] + metadata = json.loads(metadata_file.decode("utf-8")) + segment_names = list(metadata.keys()) + # print("Parsed metadata type:", type(metadata)) + # print("Parsed metadata content:", metadata) + for name in segment_names: + partition_key = get_tensor_vdb_index_key( + self.key, self.version_state["commit_id"], f"{id}_inv_{name}" + ) + self.storage.pop(partition_key) + self.storage.pop( + get_tensor_vdb_index_key( + self.key, + self.version_state["commit_id"], + f"{id}_inverted_metadata", + ) + ) else: self.storage.pop(get_tensor_vdb_index_key(self.key, commit_id, id)) @@ -1949,8 +1983,8 @@ def unload_vdb_index_cache(self): raise Exception(f"An error occurred while cleaning VDB Cache: {e}") def get_vdb_indexes(self) -> List[Dict[str, str]]: - if self.meta.htype != "embedding" and self.meta.htype != "str": - raise Exception(f"Only supported for embedding tensors.") + if self.meta.htype != "embedding" and self.meta.htype != "text": + raise Exception(f"Only supported for embedding and text tensors.") return self.meta.vdb_indexes def fetch_vdb_indexes(self) -> List[Dict[str, str]]: From 9ce77d9d96cc1ceca23934bd126019151f037ee8 Mon Sep 17 00:00:00 2001 From: Sounak Chakraborty Date: Tue, 13 Aug 2024 23:52:31 +0530 Subject: [PATCH 03/17] Inverted Index Phase 3. Run black. 
--- deeplake/core/index_maintenance.py | 3 +++ deeplake/core/tensor.py | 24 ++++++++++++------------ 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/deeplake/core/index_maintenance.py b/deeplake/core/index_maintenance.py index 0881e5ec2a..32de9babc7 100644 --- a/deeplake/core/index_maintenance.py +++ b/deeplake/core/index_maintenance.py @@ -40,6 +40,7 @@ def validate_embedding_tensor(tensor): or tensor.key in valid_names ) + def validate_text_tensor(tensor): """Check if a tensor is an embedding tensor.""" @@ -59,6 +60,7 @@ def fetch_embedding_tensor(dataset): return tensor return None + def fetch_text_tensor(dataset): tensors = dataset.tensors for _, tensor in tensors.items(): @@ -157,6 +159,7 @@ def check_index_params(self): return False + def check_index_params_text(self): txt_tensor = fetch_text_tensor(self.dataset) indexes = txt_tensor.get_vdb_indexes() diff --git a/deeplake/core/tensor.py b/deeplake/core/tensor.py index 6f490dc11a..85c68c9a32 100644 --- a/deeplake/core/tensor.py +++ b/deeplake/core/tensor.py @@ -1545,14 +1545,16 @@ def deserialize_inverted_index(self, serialized_data): metadata_bytes = stream.read(metadataSize) metadata = json.loads(metadata_bytes.decode("utf-8")) - temp_paths_size = int.from_bytes(stream.read(8), 'little') + temp_paths_size = int.from_bytes(stream.read(8), "little") - paths_string = stream.read().decode('utf-8') - temp_serialized_paths = paths_string.strip().split('\n') + paths_string = stream.read().decode("utf-8") + temp_serialized_paths = paths_string.strip().split("\n") # Check if the declared number of paths match the actual number if temp_paths_size != len(temp_serialized_paths): - raise ValueError("Mismatch between declared count and actual number of paths") + raise ValueError( + "Mismatch between declared count and actual number of paths" + ) return metadata, temp_serialized_paths @@ -1782,14 +1784,10 @@ def create_vdb_index( from indra import api # type: ignore if self.meta.htype == "text": - self.meta.add_vdb_index( - id=id, type="inverted_index", distance=None - ) + self.meta.add_vdb_index(id=id, type="inverted_index", distance=None) try: if additional_params is None: - index = api.vdb.generate_index( - ts, index_type="inverted_index" - ) + index = api.vdb.generate_index(ts, index_type="inverted_index") else: index = api.vdb.generate_index( ts, @@ -1814,13 +1812,15 @@ def create_vdb_index( # read file and store it in the storage with open(path, "rb") as f: - inv_key = get_tensor_vdb_index_key(self.key, commit_id, f"{id}_{file_name}") + inv_key = get_tensor_vdb_index_key( + self.key, commit_id, f"{id}_{file_name}" + ) self.storage[inv_key] = f.read() # close the file f.close() self.invalidate_libdeeplake_dataset() - #self.storage.flush() + # self.storage.flush() except: self.meta.remove_vdb_index(id=id) raise From 3936301c1812a760c57e6a2b1eb1bf669b314e9c Mon Sep 17 00:00:00 2001 From: Sounak Chakraborty Date: Fri, 23 Aug 2024 19:46:50 +0530 Subject: [PATCH 04/17] Inverted Index Phase 4. Add test Cases. 
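
The tests drive the inverted index through TQL: CONTAINS() filters on one or
more text columns, AND/OR combinations, case-insensitive matching, and a
CONTAINS() filter mixed with HNSW-backed ordering on an embedding column.
A condensed sketch of the query shapes covered (tensor names follow the
tests):

    # keyword lookup served by the inverted index
    res = ds.query("select * where CONTAINS(text, 'mountain')")

    # boolean combination across indexed text columns
    res = ds.query(
        "select * where CONTAINS(text, 'coding') and CONTAINS(year, '2019')"
    )

    # CONTAINS() filter plus HNSW nearest-neighbour ordering
    v = ds.embedding[5].numpy(fetch_chunks=True)
    arr = ",".join(str(c) for c in v)
    res = ds.query(
        f"select * where CONTAINS(text, 'sun') "
        f"order by l2_norm(embedding - ARRAY[{arr}]) limit 1"
    )

Plain string equality (text == 'sun') is also asserted to return no rows,
since exact equality is not served by the inverted index.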
--- deeplake/core/index_maintenance.py | 51 +- deeplake/core/tensor.py | 24 +- deeplake/core/tests/test_inverted_indexes.py | 484 +++++++++++++++++++ 3 files changed, 545 insertions(+), 14 deletions(-) create mode 100644 deeplake/core/tests/test_inverted_indexes.py diff --git a/deeplake/core/index_maintenance.py b/deeplake/core/index_maintenance.py index 32de9babc7..e5440457d3 100644 --- a/deeplake/core/index_maintenance.py +++ b/deeplake/core/index_maintenance.py @@ -170,6 +170,9 @@ def check_index_params_text(self): def index_operation_type_dataset(self, num_rows, changed_data_len): + if validate_text_tensor(self): + return INDEX_OP_TYPE.REGENERATE_INDEX + if not index_exists(self): if self.index_params is None: return INDEX_OP_TYPE.NOOP @@ -194,9 +197,23 @@ def get_index_metric(metric): def normalize_additional_params(params: dict) -> dict: - mapping = {"efconstruction": "efConstruction", "m": "M", "partitions": "partitions"} - - allowed_keys = ["efConstruction", "m", "partitions"] + mapping = { + "efconstruction": "efConstruction", + "m": "M", + "partitions": "partitions", + "bloom": "bloom_filter_size", + "bloom_size": "bloom_filter_size", + "Segment_Size": "segment_size", + "seg_size": "segment_size", + } + + allowed_keys = [ + "efConstruction", + "m", + "partitions", + "bloom_filter_size", + "segment_size", + ] # New dictionary to store the result with desired key format result_dict = {} @@ -337,19 +354,23 @@ def index_operation_dataset(self, dml_type, rowids): if index_operation_type == INDEX_OP_TYPE.NOOP: return - if ( - index_operation_type == INDEX_OP_TYPE.CREATE_INDEX - or index_operation_type == INDEX_OP_TYPE.REGENERATE_INDEX - ): + if index_operation_type in [ + INDEX_OP_TYPE.CREATE_INDEX, + INDEX_OP_TYPE.REGENERATE_INDEX, + ]: + saved_vdb_indexes = [] + if index_operation_type == INDEX_OP_TYPE.REGENERATE_INDEX: try: if emb_tensor is not None: vdb_indexes = emb_tensor.get_vdb_indexes() for vdb_index in vdb_indexes: + saved_vdb_indexes.append(vdb_index) emb_tensor.delete_vdb_index(vdb_index["id"]) elif txt_tensor is not None: vdb_indexes = txt_tensor.get_vdb_indexes() for vdb_index in vdb_indexes: + saved_vdb_indexes.append(vdb_index) txt_tensor.delete_vdb_index(vdb_index["id"]) except Exception as e: raise Exception( @@ -368,7 +389,21 @@ def index_operation_dataset(self, dml_type, rowids): else: emb_tensor.create_vdb_index("hnsw_1", distance=distance) elif txt_tensor is not None: - txt_tensor.create_vdb_index("inverted_index1") + if len(saved_vdb_indexes) > 0: + id = saved_vdb_indexes[0]["id"] + additional_params_dict = saved_vdb_indexes[0].index_params.get( + "additional_params", None + ) + else: + id = "inverted_index1" + additional_params_dict = self.index_params.get( + "additional_params", None + ) + if additional_params_dict and len(additional_params_dict) > 0: + param_dict = normalize_additional_params(additional_params_dict) + txt_tensor.create_vdb_index(id, additional_params=param_dict) + else: + txt_tensor.create_vdb_index(id) elif index_operation_type == INDEX_OP_TYPE.INCREMENTAL_INDEX: if emb_tensor is not None: diff --git a/deeplake/core/tensor.py b/deeplake/core/tensor.py index d309325b16..bb24870b42 100644 --- a/deeplake/core/tensor.py +++ b/deeplake/core/tensor.py @@ -1772,16 +1772,19 @@ def create_vdb_index( additional_params: Optional[Dict[str, int]] = None, ): """ - Create similarity search index for embedding tensor. + Create similarity search index for embedding tensor or inverted index for text tensor. Args: - id (str): Unique identifier for the index. 
Defaults to ``hnsw_1`` for embedding tensors; inverted indexes on text tensors conventionally use ``inverted_index1``.
            distance (DistanceType, str): Distance metric to be used for similarity search. Possible values are "l2_norm", "cosine_similarity". Defaults to ``DistanceType.COSINE_SIMILARITY``.
            additional_params (Optional[Dict[str, int]]): Additional parameters for the index.

                - Structure of additional params for an HNSW index:
                    :"M": Increasing this value will increase the index build time and memory usage but will improve the search accuracy. Defaults to ``16``.
                    :"efConstruction": Defaults to ``200``.
                    :"partitions": If tensors contain more than 45M samples, it is recommended to use partitions to create the index. Defaults to ``1``.
+                - Structure of additional params for an inverted index:
+                    :"bloom_filter_size": Size of the bloom filter. Defaults to ``100000``.
+                    :"segment_size": Size of each segment in MB. Defaults to ``25``.

        Example:
            >>> ds = deeplake.load("./test/my_embedding_ds")
            >>> # create cosine_similarity index on embedding tensor
            >>> ds.embedding.create_vdb_index(id="hnsw_1", distance=DistanceType.COSINE_SIMILARITY)
            >>> # create cosine_similarity index on embedding tensor with additional params
            >>> ds.embedding.create_vdb_index(id="hnsw_1", distance=DistanceType.COSINE_SIMILARITY, additional_params={"M": 32, "partitions": 1, 'efConstruction': 200})
+            >>> # create inverted index on text tensor
+            >>> ds.text.create_vdb_index(id="inverted_index1")
+            >>> # create inverted index on text tensor with additional params
+            >>> ds.text.create_vdb_index(id="inverted_index1", additional_params={"bloom_filter_size": 1000000, "segment_size": 50})

        Notes:
            Index creation is supported only for embedding and text tensors.

        Raises:
            Exception: If the tensor is neither an embedding nor a text tensor.

        Returns:
            Index: Returns the index object.
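
For reference, the blob returned by index.serialize() for an inverted index
follows the layout consumed by deserialize_inverted_index() earlier in this
series: an 8-byte little-endian metadata size, the metadata JSON, an 8-byte
little-endian path count, then newline-separated temp-file paths. A minimal
standalone reader mirroring that logic (the helper name is illustrative, not
part of the patch):

    import json
    from io import BytesIO

    def read_inverted_index_blob(blob: bytes):
        stream = BytesIO(blob)
        # size of the metadata JSON, stored as a 64-bit little-endian integer
        meta_size = int.from_bytes(stream.read(8), "little")
        metadata = json.loads(stream.read(meta_size).decode("utf-8"))
        # declared number of serialized segment file paths
        n_paths = int.from_bytes(stream.read(8), "little")
        paths = stream.read().decode("utf-8").strip().split("\n")
        if n_paths != len(paths):
            raise ValueError("declared path count does not match payload")
        return metadata, paths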
@@ -1812,7 +1819,12 @@ def create_vdb_index( from indra import api # type: ignore if self.meta.htype == "text": - self.meta.add_vdb_index(id=id, type="inverted_index", distance=None) + self.meta.add_vdb_index( + id=id, + type="inverted_index", + distance=None, + additional_params=additional_params, + ) try: if additional_params is None: index = api.vdb.generate_index(ts, index_type="inverted_index") diff --git a/deeplake/core/tests/test_inverted_indexes.py b/deeplake/core/tests/test_inverted_indexes.py new file mode 100644 index 0000000000..d539597cd8 --- /dev/null +++ b/deeplake/core/tests/test_inverted_indexes.py @@ -0,0 +1,484 @@ +import deeplake +import numpy as np + +from deeplake.core.distance_type import DistanceType +from deeplake.tests.common import requires_libdeeplake +from deeplake.tests.dataset_fixtures import local_auth_ds_generator +from deeplake.util.exceptions import ReadOnlyModeError, EmbeddingTensorPopError +import pytest +import warnings + +statements = [ + "The apple fell from the tree and rolled into the river.", + "In the jungle, the giraffe munched on the leaves of a tall tree.", + "A rainbow appeared in the sky after the rain stopped.", + "The computer screen flickered as the storm intensified outside.", + "She found a book about coding with Python on the dusty shelf.", + "As the sun set, the mountain peaks were bathed in orange light.", + "The cat jumped onto the window sill to watch the birds outside.", + "He poured himself a cup of coffee and stared out at the ocean.", + "The children played under the table, laughing and giggling.", + "With a splash, the dog jumped into the river to fetch the stick.", +] + +new_statements = [ + "The quick brown fox jumps over the lazy dog.", + "Python is a widely used high-level programming language.", + "The sun shines bright over the tall mountains.", + "A cup of tea is a good companion while coding.", + "The river flowed swiftly under the wooden bridge.", + "In autumn, the leaves fall gently to the ground.", + "The stars twinkled brightly in the night sky.", + "She prefers coffee over tea during the early hours.", + "The code compiled successfully without any errors.", + "Birds fly south for the winter every year.", +] + +statements1 = [ + "The apple fell from the tree and rolled into the river.", + "In the jungle, the giraffe munched on the leaves of a tall tree.", +] + + +@requires_libdeeplake +def test_inv_index_(tmp_datasets_dir): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + for statement in statements: + ds.text.append(statement) + + ds.summary() + + # create inverted index. + ds.text.create_vdb_index("inv_1") + ts = ds.text.get_vdb_indexes() + assert len(ts) == 1 + assert ts[0]["id"] == "inv_1" + + # drop the inverted index. + ds.text.delete_vdb_index("inv_1") + ts = ds.text.get_vdb_indexes() + assert len(ts) == 0 + + +@requires_libdeeplake +def test_inv_index_query(tmp_datasets_dir): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + for statement in statements: + ds.text.append(statement) + + # create inverted index. + ds.text.create_vdb_index("inv_1") + + # query the inverted index this should fail as equalities are not supported. + res = ds.query(f"select * where text == 'apple'") + assert len(res) == 0 + + # query the inverted index. + res = ds.query(f"select * where CONTAINS(text, 'flickered')") + assert len(res) == 1 + assert res.index[0].values[0].value == 3 + + # query the inverted index. 
+ res = ds.query(f"select * where CONTAINS(text, 'mountain')") + assert len(res) == 1 + assert res.index[0].values[0].value == 5 + + # query the inverted index. + res = ds.query(f"select * where CONTAINS(text, 'mountain')") + assert len(res) == 1 + assert res.index[0].values[0].value == 5 + + # query the inverted index. + res = ds.query(f"select * where CONTAINS(text, 'jumped')") + assert len(res) == 2 + assert res.index[1].values[0].value == 9 + + ds.text.unload_vdb_index_cache() + + +@requires_libdeeplake +def test_inv_index_query_with_hnsw(tmp_datasets_dir): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + ds.create_tensor("embedding", htype="embedding", dtype=np.float32) + for statement in statements: + random_embedding = np.random.random_sample(384).astype(np.float32) + ds.append({"text": statement, "embedding": random_embedding}) + + print(ds.text[2].numpy()) + # create inverted index. + ds.text.create_vdb_index("inv_1") + ds.embedding.create_vdb_index("hnsw_1") + + # query the inverted index along with hnsw index. + v2 = ds.embedding[0].numpy(fetch_chunks=True) + s2 = ",".join(str(c) for c in v2) + res = ds.query( + f"select * where CONTAINS(text, 'apple') order by l2_norm(embedding - ARRAY[{s2}]) limit 1" + ) + assert len(res) == 1 + assert res.index[0].values[0].value == 0 + + # query the inverted index along with hnsw index. + v2 = ds.embedding[5].numpy(fetch_chunks=True) + s2 = ",".join(str(c) for c in v2) + res = ds.query( + f"select * where CONTAINS(text, 'mountain') order by l2_norm(embedding - ARRAY[{s2}]) limit 1" + ) + assert len(res) == 1 + assert res.index[0].values[0].value == 5 + + # query the inverted index along with hnsw index. + v2 = ds.embedding[9].numpy(fetch_chunks=True) + s2 = ",".join(str(c) for c in v2) + res = ds.query( + f"select * where CONTAINS(text, 'jumped') order by l2_norm(embedding - ARRAY[{s2}]) limit 1" + ) + assert len(res) == 1 + assert res.index[0].values[0].value == 9 + + ds.text.unload_vdb_index_cache() + ds.embedding.unload_vdb_index_cache() + + +@requires_libdeeplake +def test_inv_index_multiple_where_or_and(tmp_datasets_dir): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + for statement in statements + new_statements: + ds.text.append(statement) + + # create inverted index. + ds.text.create_vdb_index("inv_1") + + # query with multiple WHERE clauses using OR and AND + res = ds.query( + f"select * where CONTAINS(text, 'mountains.') and CONTAINS(text, 'bright')" + ) + print(res) + assert len(res) == 1 + assert ( + res.index[0].values[0].value == 12 + ) # "The sun shines bright over the tall mountains." + + ds.text.unload_vdb_index_cache() + + +@requires_libdeeplake +def test_inv_index_multiple_keywords(tmp_datasets_dir): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + for statement in statements + new_statements: + ds.text.append(statement) + + # create inverted index. + ds.text.create_vdb_index("inv_1") + + # query with multiple keywords in WHERE clause + res = ds.query( + f"select * where CONTAINS(text, 'sun') and CONTAINS(text, 'bright')" + ) + assert len(res) == 1 + assert ( + res.index[0].values[0].value == 12 + ) # "The sun shines bright over the tall mountains." + + res = ds.query( + f"select * where CONTAINS(text, 'quick') and CONTAINS(text, 'fox')" + ) + assert len(res) == 1 + assert ( + res.index[0].values[0].value == 10 + ) # "The quick brown fox jumps over the lazy dog." 
+ + ds.text.unload_vdb_index_cache() + + +@requires_libdeeplake +def test_inv_index_case_insensitivity(tmp_datasets_dir): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + for statement in statements + new_statements: + ds.text.append(statement) + + # create inverted index. + ds.text.create_vdb_index("inv_1") + + # query with case insensitivity + res = ds.query( + f"select * where CONTAINS(text, 'SUN')" + ) # Case insensitive match + assert len(res) == 2 + assert ( + res.index[0].values[0].value == 5 + ) # "The sun shines bright over the tall mountains." + + res = ds.query( + f"select * where CONTAINS(text, 'PYTHON')" + ) # Case insensitive match + assert len(res) == 2 + assert ( + res.index[0].values[0].value == 4 + ) # "Python is a widely used high-level programming language." + + ds.text.unload_vdb_index_cache() + + +@requires_libdeeplake +def test_multiple_where_clauses_and_filters(tmp_datasets_dir): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + ds.create_tensor("text1", htype="text") + ds.create_tensor("embedding", htype="embedding", dtype=np.float32) + ds.create_tensor("year", htype="text") # Changed to text + + for i, statement in enumerate(statements): + random_embedding = np.random.random_sample(384).astype(np.float32) + ds.append( + { + "text": statement, + "text1": " ".join( + statement.split()[-3:] + ), # last 3 words as a separate column + "embedding": random_embedding, + "year": str( + 2015 + (i % 7) + ), # cycles between 2015 and 2021 as strings + } + ) + + # Create inverted index on text and year + ds.text.create_vdb_index("inv_1") + ds.year.create_vdb_index("inv_year") + + # Test 1: Multiple WHERE clauses with OR and AND filters + res = ds.query( + f"select * where CONTAINS(text, 'river.') and CONTAINS(year, '2015')" + ) + assert len(res) == 1 + assert res.index[0].values[0].value == 0 # 'apple' is in the first statement + + res = ds.query( + f"select * where CONTAINS(text, 'rainbow') or CONTAINS(text1, 'rain') and CONTAINS(year, '2017')" + ) + assert len(res) == 1 + assert ( + res.index[0].values[0].value == 2 + ) # 'rainbow' matches in the third statement + + # Test 2: Multiple keywords in WHERE clause + res = ds.query( + f"select * where CONTAINS( text, 'apple') and CONTAINS( text, 'tree')" + ) + assert len(res) == 1 + assert res.index[0].values[0].value == 0 + + res = ds.query( + f"select * where CONTAINS( text, 'apple') or CONTAINS(text, 'river.')" + ) + assert len(res) == 1 + assert res.index[0].values[0].value == 0 + + res = ds.query( + f"select * where CONTAINS(text, 'coding') and CONTAINS(year, '2019')" + ) + assert len(res) == 1 + assert res.index[0].values[0].value == 4 + + # Test 3: Case insensitivity + res = ds.query(f"select * where CONTAINS(text, 'Apple')") + assert len(res) == 1 + assert res.index[0].values[0].value == 0 + + res = ds.query(f"select * where CONTAINS(text, 'Tree')") + assert len(res) == 1 + + ds.text.unload_vdb_index_cache() + ds.year.unload_vdb_index_cache() + + +@requires_libdeeplake +def test_hnsw_order_by_clause(tmp_datasets_dir): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + ds.create_tensor("embedding", htype="embedding", dtype=np.float32) + + for statement in statements: + random_embedding = np.random.random_sample(384).astype(np.float32) + ds.append({"text": statement, "embedding": random_embedding}) + + # Create inverted index and HNSW index + ds.text.create_vdb_index("inv_1") + 
ds.embedding.create_vdb_index("hnsw_1") + + # Test 4: ORDER BY clause with HNSW + v2 = ds.embedding[5].numpy(fetch_chunks=True) + s2 = ",".join(str(c) for c in v2) + res = ds.query( + f"select * where CONTAINS(text, 'sun') order by l2_norm(embedding - ARRAY[{s2}]) limit 1" + ) + assert len(res) == 1 + assert res.index[0].values[0].value == 5 + + # failure case. + v2 = ds.embedding[5].numpy(fetch_chunks=True) + s2 = ",".join(str(c) for c in v2) + res = ds.query( + f"select * where text == 'sun' order by l2_norm(embedding - ARRAY[{s2}]) limit 1" + ) + assert len(res) == 0 + + ds.text.unload_vdb_index_cache() + ds.embedding.unload_vdb_index_cache() + + +@requires_libdeeplake +def test_where_condition_on_column_without_inverted_index(tmp_datasets_dir): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + ds.create_tensor("text1", htype="text") + ds.create_tensor("embedding", htype="embedding", dtype=np.float32) + + for i, statement in enumerate(statements): + random_embedding = np.random.random_sample(384).astype(np.float32) + ds.append( + { + "text": statement, + "text1": " ".join( + statement.split()[-3:] + ), # last 3 words as a separate column + "embedding": random_embedding, + } + ) + + # Create inverted index on text only + ds.text.create_vdb_index("inv_1") + ds.embedding.create_vdb_index("hnsw_1") + + # res = ds.query(f"select * where CONTAINS(text, 'sun') and CONTAINS(text, 'bright')") + + # Test 5: WHERE condition on a column without an inverted index + v2 = ds.embedding[0].numpy(fetch_chunks=True) + s2 = ",".join(str(c) for c in v2) + res = ds.query( + f"select * where CONTAINS(text, 'fell') or CONTAINS(text, 'river.') or CONTAINS(text, 'rolled') order by l2_norm(embedding - ARRAY[{s2}]) limit 1" + ) + assert len(res) == 1 + assert res.index[0].values[0].value == 0 + + ds.text.unload_vdb_index_cache() + ds.embedding.unload_vdb_index_cache() + + +@requires_libdeeplake +def test_multiple_where_clauses_and_filters_with_year_text(tmp_datasets_dir): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + ds.create_tensor("text1", htype="text") + ds.create_tensor("embedding", htype="embedding", dtype=np.float32) + ds.create_tensor("year", htype="text") # Changed to text type + + years = ["2015", "2016", "2017", "2018", "2019", "2020", "2021"] + + for i, statement in enumerate(statements): + random_embedding = np.random.random_sample(384).astype(np.float32) + ds.append( + { + "text": statement, + "text1": " ".join( + statement.split()[-5:] + ), # last 3 words as a separate column + "embedding": random_embedding, + "year": years[i % len(years)], # cycles between 2015 and 2021 + } + ) + + # Create inverted index on text and year + ds.text.create_vdb_index("inv_1") + ds.year.create_vdb_index("inv_year") + + # Test 1: Multiple WHERE clauses with OR and AND filters + res = ds.query( + f"select * where CONTAINS(text, 'apple') or CONTAINS(text1, 'river.') and CONTAINS(year, '2016')" + ) + assert len(res) == 1 + assert res.index[0].values[0].value == 0 # 'apple' is in the first statement + + res = ds.query( + f"select * where CONTAINS(text, 'rainbow') or CONTAINS(text1, 'dusty')" + ) + assert len(res) == 2 + assert res.index[0].values[0].value == 2 + assert res.index[1].values[0].value == 4 + + # Test 2: Multiple keywords in WHERE clause + res = ds.query( + f"select * where CONTAINS(text, 'apple') or CONTAINS(text1, 'river')" + ) + assert len(res) == 2 + assert res.index[0].values[0].value == 0 + assert res.index[1].values[0].value == 
9 + + res = ds.query( + f"select * where CONTAINS(text, 'coding') or CONTAINS(year, '2020') or CONTAINS(year, '2021') or CONTAINS(year, '2019')" + ) + assert len(res) == 3 + + res = ds.query( + f"select * where CONTAINS(text, 'coding') or CONTAINS(year, '2020') or CONTAINS(year, '2021') or CONTAINS(year, '2018')" + ) + assert len(res) == 4 + + ds.text.unload_vdb_index_cache() + ds.year.unload_vdb_index_cache() + + +@requires_libdeeplake +def test_inverted_index_on_year_column_with_text(tmp_datasets_dir): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + ds.create_tensor("year", htype="text") + ds.create_tensor("embedding", htype="embedding", dtype=np.float32) + + years = ["2015", "2016", "2017", "2018", "2019", "2020", "2021"] + + for i, statement in enumerate(statements): + random_embedding = np.random.random_sample(384).astype(np.float32) + ds.append( + { + "text": statement, + "year": years[ + i % len(years) + ], # cycles between 2015 and 2021 as strings + "embedding": random_embedding, + } + ) + + # Create inverted index on year only + ds.year.create_vdb_index("inv_year") + ds.embedding.create_vdb_index("hnsw_1") + + # Test: Multiple OR conditions on year and embedding column + v2 = ds.embedding[5].numpy(fetch_chunks=True) + s2 = ",".join(str(c) for c in v2) + res = ds.query( + f"select * where CONTAINS(year, '2019') or CONTAINS(year, '2020') or CONTAINS(year, '2021') order by l2_norm(embedding - ARRAY[{s2}]) limit 1" + ) + assert len(res) == 1 + assert res.index[0].values[0].value == 5 + + ds.year.unload_vdb_index_cache() + ds.embedding.unload_vdb_index_cache() From cbfcef32b74c70fa25a1f7466c7279a087f5c3f4 Mon Sep 17 00:00:00 2001 From: activesoull Date: Fri, 23 Aug 2024 20:42:50 +0400 Subject: [PATCH 05/17] bump libdeeplake version to 0.0.139 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 10edc475b7..796f19a37d 100644 --- a/setup.py +++ b/setup.py @@ -70,7 +70,7 @@ def libdeeplake_available(): extras_require["all"] = [req_map[r] for r in all_extras] if libdeeplake_available(): - libdeeplake = "libdeeplake==0.0.138" + libdeeplake = "libdeeplake==0.0.139" extras_require["enterprise"] = [libdeeplake, "pyjwt"] extras_require["all"].append(libdeeplake) install_requires.append(libdeeplake) From 7b013da9f5267dccb5c331fb5b5a6ee95ce7a493 Mon Sep 17 00:00:00 2001 From: Sounak Chakraborty Date: Sat, 24 Aug 2024 00:05:41 +0530 Subject: [PATCH 06/17] Inverted Index Phase 5. Rectify Test Case. --- deeplake/core/index_maintenance.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deeplake/core/index_maintenance.py b/deeplake/core/index_maintenance.py index e5440457d3..130d3cbf71 100644 --- a/deeplake/core/index_maintenance.py +++ b/deeplake/core/index_maintenance.py @@ -170,7 +170,8 @@ def check_index_params_text(self): def index_operation_type_dataset(self, num_rows, changed_data_len): - if validate_text_tensor(self): + txt_tensor = fetch_text_tensor(self) + if txt_tensor is not None: return INDEX_OP_TYPE.REGENERATE_INDEX if not index_exists(self): From ee9e1441c23768ca8879cb71f74673e2e7f9b950 Mon Sep 17 00:00:00 2001 From: Sounak Chakraborty Date: Sat, 24 Aug 2024 10:16:38 +0530 Subject: [PATCH 07/17] Inverted Index Phase 6. 
Reformat Code --- deeplake/core/index_maintenance.py | 305 ++++++++----------- deeplake/core/tensor.py | 20 +- deeplake/core/tests/test_inverted_indexes.py | 116 ++++++- 3 files changed, 239 insertions(+), 202 deletions(-) diff --git a/deeplake/core/index_maintenance.py b/deeplake/core/index_maintenance.py index 130d3cbf71..bf4bfe9419 100644 --- a/deeplake/core/index_maintenance.py +++ b/deeplake/core/index_maintenance.py @@ -17,22 +17,10 @@ class INDEX_OP_TYPE(Enum): INCREMENTAL_INDEX = 3 -def is_embedding_tensor(tensor): - """Check if a tensor is an embedding tensor.""" - - valid_names = ["embedding", "embeddings"] - - return ( - tensor.htype == "embedding" - or tensor.meta.name in valid_names - or tensor.key in valid_names - ) - - def validate_embedding_tensor(tensor): """Check if a tensor is an embedding tensor.""" - valid_names = ["embedding"] + valid_names = ["embedding", "embeddings"] return ( tensor.htype == "embedding" @@ -43,14 +31,8 @@ def validate_embedding_tensor(tensor): def validate_text_tensor(tensor): """Check if a tensor is an embedding tensor.""" - - valid_names = ["text", "id", "metadata"] - - return ( - tensor.htype == "str" - or tensor.meta.name in valid_names - or tensor.key in valid_names - ) + p = tensor.htype + return p == "text" def fetch_embedding_tensor(dataset): @@ -61,38 +43,34 @@ def fetch_embedding_tensor(dataset): return None -def fetch_text_tensor(dataset): - tensors = dataset.tensors - for _, tensor in tensors.items(): - if validate_text_tensor(tensor): - return tensor - return None - - -def index_exists(dataset): +def index_exists_emb(emb_tensor): """Check if the Index already exists.""" - emb_tensor = fetch_embedding_tensor(dataset) - txt_tensor = fetch_text_tensor(dataset) - if emb_tensor is not None: + emb = validate_embedding_tensor(emb_tensor) + if emb: vdb_indexes = emb_tensor.fetch_vdb_indexes() if len(vdb_indexes) == 0: return False else: return True - elif txt_tensor is not None: + return False + + +def index_exists_txt(txt_tensor): + """Check if the Index already exists.""" + txt = validate_text_tensor(txt_tensor) + if txt: vdb_indexes = txt_tensor.fetch_vdb_indexes() if len(vdb_indexes) == 0: return False else: return True - return False -def index_partition_count(dataset): - emb_tensor = fetch_embedding_tensor(dataset) - if emb_tensor is not None: - vdb_indexes = emb_tensor.fetch_vdb_indexes() +def index_partition_count(tensor): + is_emb_tensor = validate_embedding_tensor(tensor) + if is_emb_tensor: + vdb_indexes = tensor.fetch_vdb_indexes() if len(vdb_indexes) == 0: return 1 else: @@ -160,32 +138,28 @@ def check_index_params(self): return False -def check_index_params_text(self): - txt_tensor = fetch_text_tensor(self.dataset) - indexes = txt_tensor.get_vdb_indexes() - if len(indexes) == 0: - return False +def index_operation_type_dataset( + tensor, dataset, num_rows, changed_data_len, is_embedding=False +): + if is_embedding: + vdb_index_exists = index_exists_emb(tensor) + if not vdb_index_exists: + if dataset.index_params is None: + return INDEX_OP_TYPE.NOOP + if changed_data_len == 0: + return INDEX_OP_TYPE.NOOP + threshold = dataset.index_params.get("threshold", -1) + below_threshold = threshold <= 0 or num_rows < threshold + if not below_threshold: + return INDEX_OP_TYPE.CREATE_INDEX + + return INDEX_OP_TYPE.INCREMENTAL_INDEX else: - return True - - -def index_operation_type_dataset(self, num_rows, changed_data_len): - txt_tensor = fetch_text_tensor(self) - if txt_tensor is not None: - return INDEX_OP_TYPE.REGENERATE_INDEX - - if not 
index_exists(self): - if self.index_params is None: + # for Text tensor i.e. inverted index, + vdb_index_exists = index_exists_txt(tensor) + if not vdb_index_exists or changed_data_len == 0: return INDEX_OP_TYPE.NOOP - threshold = self.index_params.get("threshold", -1) - below_threshold = threshold <= 0 or num_rows < threshold - if not below_threshold: - return INDEX_OP_TYPE.CREATE_INDEX - - if not check_vdb_indexes(self) or changed_data_len == 0: - return INDEX_OP_TYPE.NOOP - - return INDEX_OP_TYPE.INCREMENTAL_INDEX + return INDEX_OP_TYPE.REGENERATE_INDEX def get_index_metric(metric): @@ -241,20 +215,16 @@ def normalize_additional_params(params: dict) -> dict: return result_dict -def check_vdb_indexes(dataset): - tensors = dataset.tensors - - vdb_index_present = False - for _, tensor in tensors.items(): - is_embedding = is_embedding_tensor(tensor) - has_vdb_indexes = hasattr(tensor.meta, "vdb_indexes") - try: - vdb_index_ids_present = len(tensor.meta.vdb_indexes) > 0 - except AttributeError: - vdb_index_ids_present = False +def check_embedding_vdb_indexes(tensor): + is_embedding = validate_embedding_tensor(tensor) + has_vdb_indexes = hasattr(tensor.meta, "vdb_indexes") + try: + vdb_index_ids_present = len(tensor.meta.vdb_indexes) > 0 + except AttributeError: + vdb_index_ids_present = False - if is_embedding and has_vdb_indexes and vdb_index_ids_present: - return True + if is_embedding and has_vdb_indexes and vdb_index_ids_present: + return True return False @@ -282,140 +252,111 @@ def _incr_maintenance_vdb_indexes( raise Exception(f"An error occurred while regenerating VDB indexes: {e}") -# Routine to identify the index Operation. def index_operation_vectorstore(self): if not index_used(self.exec_option): return None emb_tensor = fetch_embedding_tensor(self.dataset) - txt_tensor = fetch_text_tensor(self.dataset) - - index_ext = index_exists(self.dataset) - if index_ext and check_index_params(self): + if index_exists_emb(self.dataset) and check_index_params(self): return emb_tensor.get_vdb_indexes()[0]["distance"] - elif index_ext and check_index_params_text(self): - return txt_tensor.get_vdb_indexes()[0]["distance"] threshold = self.index_params.get("threshold", -1) below_threshold = threshold < 0 or len(self.dataset) < threshold if below_threshold: return None - if emb_tensor is not None and not check_index_params(self): + if not check_index_params(self): try: vdb_indexes = emb_tensor.get_vdb_indexes() for vdb_index in vdb_indexes: emb_tensor.delete_vdb_index(vdb_index["id"]) except Exception as e: raise Exception(f"An error occurred while removing VDB indexes: {e}") - - distance_str = self.index_params.get("distance_metric", "COS") - additional_params_dict = self.index_params.get("additional_params", None) - distance = get_index_metric(distance_str.upper()) - if additional_params_dict and len(additional_params_dict) > 0: - param_dict = normalize_additional_params(additional_params_dict) - emb_tensor.create_vdb_index( - "hnsw_1", distance=distance, additional_params=param_dict - ) - else: - emb_tensor.create_vdb_index("hnsw_1", distance=distance) - return distance - - if txt_tensor is not None and not check_index_params_text(self): - try: - vdb_indexes = txt_tensor.get_vdb_indexes() - for vdb_index in vdb_indexes: - txt_tensor.delete_vdb_index(vdb_index["id"]) - except Exception as e: - raise Exception(f"An error occurred while removing VDB indexes: {e}") - - txt_tensor.create_vdb_index("inverted_index1") - return None + distance_str = self.index_params.get("distance_metric", "COS") + 
additional_params_dict = self.index_params.get("additional_params", None) + distance = get_index_metric(distance_str.upper()) + if additional_params_dict and len(additional_params_dict) > 0: + param_dict = normalize_additional_params(additional_params_dict) + emb_tensor.create_vdb_index( + "hnsw_1", distance=distance, additional_params=param_dict + ) + else: + emb_tensor.create_vdb_index("hnsw_1", distance=distance) + return distance def index_operation_dataset(self, dml_type, rowids): - emb_tensor = fetch_embedding_tensor(self) - txt_tensor = fetch_text_tensor(self) - if emb_tensor is None and txt_tensor is None: - return - - if emb_tensor is not None: - index_operation_type = index_operation_type_dataset( - self, - emb_tensor.chunk_engine.num_samples, - len(rowids), - ) - elif txt_tensor is not None: - index_operation_type = index_operation_type_dataset( - self, - txt_tensor.chunk_engine.num_samples, - len(rowids), - ) - - if index_operation_type == INDEX_OP_TYPE.NOOP: - return - if index_operation_type in [ - INDEX_OP_TYPE.CREATE_INDEX, - INDEX_OP_TYPE.REGENERATE_INDEX, - ]: - saved_vdb_indexes = [] - - if index_operation_type == INDEX_OP_TYPE.REGENERATE_INDEX: - try: - if emb_tensor is not None: - vdb_indexes = emb_tensor.get_vdb_indexes() - for vdb_index in vdb_indexes: - saved_vdb_indexes.append(vdb_index) - emb_tensor.delete_vdb_index(vdb_index["id"]) - elif txt_tensor is not None: - vdb_indexes = txt_tensor.get_vdb_indexes() + tensors = self.tensors + for _, tensor in tensors.items(): + is_embedding_tensor = validate_embedding_tensor(tensor) + is_text_tensor = validate_text_tensor(tensor) + index_operation_type = INDEX_OP_TYPE.NOOP + + if is_embedding_tensor or is_text_tensor: + index_operation_type = index_operation_type_dataset( + tensor, + self, + tensor.chunk_engine.num_samples, + len(rowids), + is_embedding_tensor, + ) + else: + continue + + if index_operation_type == INDEX_OP_TYPE.NOOP: + continue + if ( + index_operation_type == INDEX_OP_TYPE.CREATE_INDEX + or index_operation_type == INDEX_OP_TYPE.REGENERATE_INDEX + ): + saved_vdb_indexes = [] + if index_operation_type == INDEX_OP_TYPE.REGENERATE_INDEX: + try: + vdb_indexes = tensor.get_vdb_indexes() for vdb_index in vdb_indexes: saved_vdb_indexes.append(vdb_index) - txt_tensor.delete_vdb_index(vdb_index["id"]) - except Exception as e: - raise Exception( - f"An error occurred while regenerating VDB indexes: {e}" - ) - - if emb_tensor is not None: - distance_str = self.index_params.get("distance_metric", "COS") - additional_params_dict = self.index_params.get("additional_params", None) - distance = get_index_metric(distance_str.upper()) - if additional_params_dict and len(additional_params_dict) > 0: - param_dict = normalize_additional_params(additional_params_dict) - emb_tensor.create_vdb_index( - "hnsw_1", distance=distance, additional_params=param_dict - ) - else: - emb_tensor.create_vdb_index("hnsw_1", distance=distance) - elif txt_tensor is not None: - if len(saved_vdb_indexes) > 0: - id = saved_vdb_indexes[0]["id"] - additional_params_dict = saved_vdb_indexes[0].index_params.get( - "additional_params", None - ) - else: - id = "inverted_index1" + tensor.delete_vdb_index(vdb_index["id"]) + except Exception as e: + raise Exception( + f"An error occurred while regenerating VDB indexes: {e}" + ) + if is_embedding_tensor: + distance_str = self.index_params.get("distance_metric", "COS") additional_params_dict = self.index_params.get( "additional_params", None ) - if additional_params_dict and len(additional_params_dict) > 0: - 
param_dict = normalize_additional_params(additional_params_dict) - txt_tensor.create_vdb_index(id, additional_params=param_dict) - else: - txt_tensor.create_vdb_index(id) - - elif index_operation_type == INDEX_OP_TYPE.INCREMENTAL_INDEX: - if emb_tensor is not None: - partition_count = index_partition_count(self) + distance = get_index_metric(distance_str.upper()) + if additional_params_dict and len(additional_params_dict) > 0: + param_dict = normalize_additional_params(additional_params_dict) + tensor.create_vdb_index( + "hnsw_1", distance=distance, additional_params=param_dict + ) + else: + tensor.create_vdb_index("hnsw_1", distance=distance) + elif is_text_tensor: + if len(saved_vdb_indexes) > 0: + for vdb_index in saved_vdb_indexes: + id = vdb_index["id"] + additional_params_dict = vdb_index.get( + "additional_params", None + ) + if additional_params_dict and len(additional_params_dict) > 0: + param_dict = normalize_additional_params( + additional_params_dict + ) + tensor.create_vdb_index(id, additional_params=param_dict) + else: + tensor.create_vdb_index(id) + continue + elif index_operation_type == INDEX_OP_TYPE.INCREMENTAL_INDEX: + partition_count = index_partition_count(tensor) if partition_count > 1: _incr_maintenance_vdb_indexes( - emb_tensor, rowids, dml_type, is_partitioned=True + tensor, rowids, dml_type, is_partitioned=True ) else: - _incr_maintenance_vdb_indexes(emb_tensor, rowids, dml_type) - elif txt_tensor is not None: - _incr_maintenance_vdb_indexes(txt_tensor, rowids, dml_type) - else: - raise Exception("Unknown index operation") + _incr_maintenance_vdb_indexes(tensor, rowids, dml_type) + continue + else: + raise Exception("Unknown index operation") diff --git a/deeplake/core/tensor.py b/deeplake/core/tensor.py index bb24870b42..a054166d68 100644 --- a/deeplake/core/tensor.py +++ b/deeplake/core/tensor.py @@ -363,7 +363,9 @@ def extend( TensorDtypeMismatchError: Dtype for array must be equal to or castable to this tensor's dtype. 
""" self._extend(samples, progressbar=progressbar, ignore_errors=ignore_errors) - if index_maintenance.validate_embedding_tensor(self): + if index_maintenance.validate_embedding_tensor( + self + ) or index_maintenance.validate_text_tensor(self): row_ids = list(range(self.num_samples - len(samples), self.num_samples)) index_maintenance.index_operation_dataset( # TODO: this might pick the wrong tensor when we support self.dataset, # index for multiple tensors in the future @@ -455,7 +457,9 @@ def append(self, sample: InputSample): """ row_ids = [self.num_samples] self._extend([sample], progressbar=False) - if index_maintenance.validate_embedding_tensor(self): + if index_maintenance.validate_embedding_tensor( + self + ) or index_maintenance.validate_text_tensor(self): index_maintenance.index_operation_dataset( # TODO: this might pick the wrong tensor when we support self.dataset, # index for multiple tensors in the future dml_type=_INDEX_OPERATION_MAPPING["ADD"], @@ -807,7 +811,9 @@ def __setitem__(self, item: Union[int, slice], value: Any): (1, 3, 3) """ self._update(item, value) - if index_maintenance.is_embedding_tensor(self): + if index_maintenance.validate_embedding_tensor( + self + ) or index_maintenance.validate_text_tensor(self): row_index = self.index[Index(item)] row_ids = list(row_index.values[0].indices(self.num_samples)) index_maintenance.index_operation_dataset( @@ -1216,7 +1222,9 @@ def pop(self, index: Optional[Union[int, List[int]]] = None): index = sorted(index, reverse=True) self._pop(index) - if index_maintenance.is_embedding_tensor(self): + if index_maintenance.validate_embedding_tensor( + self + ) or index_maintenance.validate_text_tensor(self): row_ids = index[:] index_maintenance.index_operation_dataset( self.dataset, @@ -1947,8 +1955,6 @@ def delete_vdb_index(self, id: str): ] metadata = json.loads(metadata_file.decode("utf-8")) segment_names = list(metadata.keys()) - # print("Parsed metadata type:", type(metadata)) - # print("Parsed metadata content:", metadata) for name in segment_names: partition_key = get_tensor_vdb_index_key( self.key, self.version_state["commit_id"], f"{id}_inv_{name}" @@ -2029,7 +2035,7 @@ def get_vdb_indexes(self) -> List[Dict[str, str]]: def fetch_vdb_indexes(self) -> List[Dict[str, str]]: vdb_indexes = [] - if self.meta.htype == "embedding": + if self.meta.htype == "embedding" or self.meta.htype == "text": if (not self.meta.vdb_indexes is None) and len(self.meta.vdb_indexes) > 0: vdb_indexes.extend(self.meta.vdb_indexes) return vdb_indexes diff --git a/deeplake/core/tests/test_inverted_indexes.py b/deeplake/core/tests/test_inverted_indexes.py index d539597cd8..69de99863a 100644 --- a/deeplake/core/tests/test_inverted_indexes.py +++ b/deeplake/core/tests/test_inverted_indexes.py @@ -35,13 +35,13 @@ ] statements1 = [ - "The apple fell from the tree and rolled into the river.", - "In the jungle, the giraffe munched on the leaves of a tall tree.", + "The text is about the apple falling from the tree.", + "Finally searched the jungle for the giraffe.", ] @requires_libdeeplake -def test_inv_index_(tmp_datasets_dir): +def test_inv_index_(local_auth_ds_generator): ds = local_auth_ds_generator() with ds: ds.create_tensor("text", htype="text") @@ -63,7 +63,7 @@ def test_inv_index_(tmp_datasets_dir): @requires_libdeeplake -def test_inv_index_query(tmp_datasets_dir): +def test_inv_index_query(local_auth_ds_generator): ds = local_auth_ds_generator() with ds: ds.create_tensor("text", htype="text") @@ -101,7 +101,7 @@ def 
test_inv_index_query(tmp_datasets_dir): @requires_libdeeplake -def test_inv_index_query_with_hnsw(tmp_datasets_dir): +def test_inv_index_query_with_hnsw(local_auth_ds_generator): ds = local_auth_ds_generator() with ds: ds.create_tensor("text", htype="text") @@ -147,7 +147,7 @@ def test_inv_index_query_with_hnsw(tmp_datasets_dir): @requires_libdeeplake -def test_inv_index_multiple_where_or_and(tmp_datasets_dir): +def test_inv_index_multiple_where_or_and(local_auth_ds_generator): ds = local_auth_ds_generator() with ds: ds.create_tensor("text", htype="text") @@ -171,7 +171,7 @@ def test_inv_index_multiple_where_or_and(tmp_datasets_dir): @requires_libdeeplake -def test_inv_index_multiple_keywords(tmp_datasets_dir): +def test_inv_index_multiple_keywords(local_auth_ds_generator): ds = local_auth_ds_generator() with ds: ds.create_tensor("text", htype="text") @@ -202,7 +202,7 @@ def test_inv_index_multiple_keywords(tmp_datasets_dir): @requires_libdeeplake -def test_inv_index_case_insensitivity(tmp_datasets_dir): +def test_inv_index_case_insensitivity(local_auth_ds_generator): ds = local_auth_ds_generator() with ds: ds.create_tensor("text", htype="text") @@ -233,7 +233,7 @@ def test_inv_index_case_insensitivity(tmp_datasets_dir): @requires_libdeeplake -def test_multiple_where_clauses_and_filters(tmp_datasets_dir): +def test_multiple_where_clauses_and_filters(local_auth_ds_generator): ds = local_auth_ds_generator() with ds: ds.create_tensor("text", htype="text") @@ -307,7 +307,7 @@ def test_multiple_where_clauses_and_filters(tmp_datasets_dir): @requires_libdeeplake -def test_hnsw_order_by_clause(tmp_datasets_dir): +def test_hnsw_order_by_clause(local_auth_ds_generator): ds = local_auth_ds_generator() with ds: ds.create_tensor("text", htype="text") @@ -343,7 +343,7 @@ def test_hnsw_order_by_clause(tmp_datasets_dir): @requires_libdeeplake -def test_where_condition_on_column_without_inverted_index(tmp_datasets_dir): +def test_where_condition_on_column_without_inverted_index(local_auth_ds_generator): ds = local_auth_ds_generator() with ds: ds.create_tensor("text", htype="text") @@ -382,7 +382,7 @@ def test_where_condition_on_column_without_inverted_index(tmp_datasets_dir): @requires_libdeeplake -def test_multiple_where_clauses_and_filters_with_year_text(tmp_datasets_dir): +def test_multiple_where_clauses_and_filters_with_year_text(local_auth_ds_generator): ds = local_auth_ds_generator() with ds: ds.create_tensor("text", htype="text") @@ -446,7 +446,7 @@ def test_multiple_where_clauses_and_filters_with_year_text(tmp_datasets_dir): @requires_libdeeplake -def test_inverted_index_on_year_column_with_text(tmp_datasets_dir): +def test_inverted_index_on_year_column_with_text(local_auth_ds_generator): ds = local_auth_ds_generator() with ds: ds.create_tensor("text", htype="text") @@ -482,3 +482,93 @@ def test_inverted_index_on_year_column_with_text(tmp_datasets_dir): ds.year.unload_vdb_index_cache() ds.embedding.unload_vdb_index_cache() + + +@requires_libdeeplake +def test_inverted_index_regeneration(local_auth_ds_generator): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + for statement in statements: + ds.text.append(statement) + + # create inverted index. + ds.text.create_vdb_index("inv_1") + + # query the inverted index. + res = ds.query(f"select * where CONTAINS(text, 'flickered')") + assert len(res) == 1 + assert res.index[0].values[0].value == 3 + + for statement in statements1: + ds.text.append(statement) + + # query the inverted index. 
+        res = ds.query(f"select * where CONTAINS(text, 'flickered')")
+        assert len(res) == 1
+        assert res.index[0].values[0].value == 3
+
+        res = ds.query(f"select * where CONTAINS(text, 'searched')")
+        assert len(res) == 1
+        assert res.index[0].values[0].value == 11
+
+        ds.text.unload_vdb_index_cache()
+
+
+@requires_libdeeplake
+def test_inverted_index_multiple_tensor_maintenance(local_auth_ds_generator):
+    ds = local_auth_ds_generator()
+    with ds:
+        ds.create_tensor("text", htype="text")
+        ds.create_tensor("year", htype="text")
+        ds.create_tensor("embedding", htype="embedding", dtype=np.float32)
+
+        years = ["2015", "2016", "2017", "2018", "2019", "2020", "2021"]
+        years1 = ["2022", "2023"]
+
+        for i, statement in enumerate(statements):
+            random_embedding = np.random.random_sample(384).astype(np.float32)
+            ds.append(
+                {
+                    "text": statement,
+                    "year": years[
+                        i % len(years)
+                    ],  # cycles through 2015-2021 as strings
+                    "embedding": random_embedding,
+                }
+            )
+
+        # Create inverted index on year only
+        ds.year.create_vdb_index("inv_year")
+        ds.embedding.create_vdb_index("hnsw_1")
+
+        # Test: multiple OR conditions on year, ordered by embedding similarity
+        v2 = ds.embedding[5].numpy(fetch_chunks=True)
+        s2 = ",".join(str(c) for c in v2)
+        res = ds.query(
+            f"select * where CONTAINS(year, '2019') or CONTAINS(year, '2020') or CONTAINS(year, '2021') order by cosine_similarity(embedding, ARRAY[{s2}]) DESC limit 1"
+        )
+        assert len(res) == 1
+        assert res.index[0].values[0].value == 5
+
+        for i, statement in enumerate(statements1):
+            random_embedding = np.random.random_sample(384).astype(np.float32)
+            ds.append(
+                {
+                    "text": statement,
+                    "year": years1[i % len(years1)],
+                    "embedding": random_embedding,
+                }
+            )
+        v2 = ds.embedding[11].numpy(fetch_chunks=True)
+        s2 = ",".join(str(c) for c in v2)
+
+        res = ds.query(
+            f"select * where CONTAINS(year, '2023') order by cosine_similarity(embedding, ARRAY[{s2}]) DESC limit 1"
+        )
+
+        assert len(res) == 1
+        assert res.index[0].values[0].value == 11
+
+        ds.year.unload_vdb_index_cache()
+        ds.embedding.unload_vdb_index_cache()

From 14c58792edf3348c62136a8e681b1cb53a812728 Mon Sep 17 00:00:00 2001
From: Sounak Chakraborty
Date: Sat, 24 Aug 2024 15:31:37 +0530
Subject: [PATCH 08/17] Inverted Index Phase 7. Address review comments.

---
 deeplake/core/index_maintenance.py | 18 +++++++-----------
 deeplake/core/tensor.py            | 24 ++++++++++--------------
 2 files changed, 17 insertions(+), 25 deletions(-)

diff --git a/deeplake/core/index_maintenance.py b/deeplake/core/index_maintenance.py
index bf4bfe9419..9efeafa508 100644
--- a/deeplake/core/index_maintenance.py
+++ b/deeplake/core/index_maintenance.py
@@ -19,20 +19,12 @@ class INDEX_OP_TYPE(Enum):
 
 def validate_embedding_tensor(tensor):
     """Check if a tensor is an embedding tensor."""
-
-    valid_names = ["embedding", "embeddings"]
-
-    return (
-        tensor.htype == "embedding"
-        or tensor.meta.name in valid_names
-        or tensor.key in valid_names
-    )
+    return tensor.htype == "embedding"
 
 
 def validate_text_tensor(tensor):
-    """Check if a tensor is an embedding tensor."""
-    p = tensor.htype
-    return p == "text"
+    """Check if a tensor is a text tensor."""
+    return tensor.htype == "text"
 
 
 def fetch_embedding_tensor(dataset):
@@ -235,7 +227,11 @@ def _incr_maintenance_vdb_indexes(
         is_embedding = tensor.htype == "embedding"
         has_vdb_indexes = hasattr(tensor.meta, "vdb_indexes")
 
-        is_text = tensor.htype == "str"
+        is_text = tensor.htype == "text"
+        if is_text:
+            raise Exception(
+                "Inverted index does not support incremental index maintenance."
+            )
         try:
             vdb_index_ids_present = len(tensor.meta.vdb_indexes) > 0
         except AttributeError:
diff --git a/deeplake/core/tensor.py b/deeplake/core/tensor.py
index a054166d68..2d3c61cd43 100644
--- a/deeplake/core/tensor.py
+++ b/deeplake/core/tensor.py
@@ -1571,18 +1571,13 @@ def deserialize_partitions(self, serialized_data, incremental_dml=False):
 
         stream = BytesIO(serialized_data)
 
-        # Read number of partitions
-        num_partitions = int.from_bytes(
-            stream.read(8), "little"
-        )  # Assuming size_t is 8 bytes
+        num_partitions = int.from_bytes(stream.read(8), "little")
 
         partition_info = []
         for _ in range(num_partitions):
-            # Read partition name length and name
             name_length = int.from_bytes(stream.read(8), "little")
             name = stream.read(name_length).decode("utf-8")
 
-            # Read start and end indices
             start = int.from_bytes(stream.read(8), "little")
             end = int.from_bytes(stream.read(8), "little")
 
@@ -1590,7 +1585,6 @@ def deserialize_partitions(self, serialized_data, incremental_dml=False):
 
         incr_info = []
         if incremental_dml == True:
-            # Check for incremental update info
             incr_info_size = int.from_bytes(stream.read(8), "little")
             for _ in range(incr_info_size):
                 name_length = int.from_bytes(stream.read(8), "little")
@@ -1601,7 +1595,6 @@ def deserialize_partitions(self, serialized_data, incremental_dml=False):
 
             incr_info.append({"name": name, "start": start, "end": end})
 
-        # Extract the actual data for each partition
         partitions_data = []
         while True:
             size_data = stream.read(8)
@@ -1645,6 +1638,8 @@ def update_vdb_index(
         is_partitioned: bool = False,
     ):
         self.storage.check_readonly()
+        if self.meta.htype != "embedding":
+            raise Exception("update_vdb_index is supported only for embedding tensors.")
         self.invalidate_libdeeplake_dataset()
         self.dataset.flush()
         from deeplake.enterprise.convert_to_libdeeplake import (
@@ -1815,6 +1810,7 @@ def create_vdb_index(
             Index: Returns the index object.
         """
         self.storage.check_readonly()
+        self.check_supported_tensor()
         if not self.dataset.libdeeplake_dataset is None:
             ds = self.dataset.libdeeplake_dataset
         else:
@@ -1853,22 +1849,16 @@ def create_vdb_index(
                     self.storage[inverted_meta_key] = metadata_bytes
                 temp_serialized_paths_count = len(temp_serialized_paths)
                 temp_serialized_paths = [str(path) for path in temp_serialized_paths]
-                # Pull the file the location specified in the path and store it in the storage
                 for i, path in enumerate(temp_serialized_paths):
-                    # extract the file name from the path which should after last "/"
                     file_name = pathlib.Path(path).name
-
-                    # read file and store it in the storage
                     with open(path, "rb") as f:
                         inv_key = get_tensor_vdb_index_key(
                             self.key, commit_id, f"{id}_{file_name}"
                         )
                         self.storage[inv_key] = f.read()
-                        # close the file
                         f.close()
 
                 self.invalidate_libdeeplake_dataset()
-                # self.storage.flush()
             except:
                 self.meta.remove_vdb_index(id=id)
                 raise
@@ -1922,6 +1912,8 @@ def create_vdb_index(
 
     def delete_vdb_index(self, id: str):
         self.storage.check_readonly()
+        self.check_supported_tensor()
+
         commit_id = self.version_state["commit_id"]
         self.unload_vdb_index_cache()
         if self.is_partitioned_vdb_index():
@@ -2040,6 +2032,10 @@ def fetch_vdb_indexes(self) -> List[Dict[str, str]]:
                 vdb_indexes.extend(self.meta.vdb_indexes)
         return vdb_indexes
 
+    def check_supported_tensor(self):
+        if self.meta.htype not in ["embedding", "text"]:
+            raise Exception("Index operations are supported only for embedding and text tensors.")
+
     def _check_compatibility_with_htype(self, htype):
         """Checks if the tensor is compatible with the given htype. Raises an error if not
         compatible.
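A minimal usage sketch of the behavior this patch enforces, for orientation only: it is not part of the patch, the mem:// path and sample strings are hypothetical, and create_vdb_index assumes the libdeeplake extension is installed.

    import deeplake

    ds = deeplake.empty("mem://inv_index_demo")  # hypothetical scratch dataset
    ds.create_tensor("text", htype="text")
    ds.text.extend(["the quick brown fox", "lorem ipsum dolor"])

    # Allowed: the "text" htype passes check_supported_tensor().
    ds.text.create_vdb_index("inv_1")

    # Subsequent appends are not maintained incrementally for text tensors:
    # _incr_maintenance_vdb_indexes raises for them, so the inverted index is
    # rebuilt through the REGENERATE_INDEX path instead.
    ds.text.append("another statement that triggers a rebuild")
    # ds.text.update_vdb_index(...) would raise: embedding tensors only.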
From 64ad838acd6f5ed2f5a56eb5fa918f49cefbfb58 Mon Sep 17 00:00:00 2001 From: activesoull Date: Sat, 24 Aug 2024 19:08:03 +0400 Subject: [PATCH 09/17] bump libdeeplake version to 0.0.139 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 10edc475b7..796f19a37d 100644 --- a/setup.py +++ b/setup.py @@ -70,7 +70,7 @@ def libdeeplake_available(): extras_require["all"] = [req_map[r] for r in all_extras] if libdeeplake_available(): - libdeeplake = "libdeeplake==0.0.138" + libdeeplake = "libdeeplake==0.0.139" extras_require["enterprise"] = [libdeeplake, "pyjwt"] extras_require["all"].append(libdeeplake) install_requires.append(libdeeplake) From 0093eed2e23a268166239b834c347439c4893638 Mon Sep 17 00:00:00 2001 From: Sounak Chakraborty Date: Sat, 24 Aug 2024 23:26:39 +0530 Subject: [PATCH 10/17] Inverted Index Phase 8. --- deeplake/core/index_maintenance.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deeplake/core/index_maintenance.py b/deeplake/core/index_maintenance.py index 9efeafa508..fa4cfd5f46 100644 --- a/deeplake/core/index_maintenance.py +++ b/deeplake/core/index_maintenance.py @@ -254,7 +254,7 @@ def index_operation_vectorstore(self): emb_tensor = fetch_embedding_tensor(self.dataset) - if index_exists_emb(self.dataset) and check_index_params(self): + if index_exists_emb(self) and check_index_params(self): return emb_tensor.get_vdb_indexes()[0]["distance"] threshold = self.index_params.get("threshold", -1) From 964d8de04efb77be4d9ef832666533dfa8721161 Mon Sep 17 00:00:00 2001 From: Sounak Chakraborty Date: Sun, 25 Aug 2024 01:18:09 +0530 Subject: [PATCH 11/17] Inverted Index Phase 9. --- deeplake/core/index_maintenance.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/deeplake/core/index_maintenance.py b/deeplake/core/index_maintenance.py index fa4cfd5f46..99553516ab 100644 --- a/deeplake/core/index_maintenance.py +++ b/deeplake/core/index_maintenance.py @@ -19,7 +19,14 @@ class INDEX_OP_TYPE(Enum): def validate_embedding_tensor(tensor): """Check if a tensor is an embedding tensor.""" - return tensor.htype == "embedding" + + valid_names = ["embedding"] + + return ( + tensor.htype == "embedding" + or tensor.meta.name in valid_names + or tensor.key in valid_names + ) def validate_text_tensor(tensor): @@ -254,7 +261,7 @@ def index_operation_vectorstore(self): emb_tensor = fetch_embedding_tensor(self.dataset) - if index_exists_emb(self) and check_index_params(self): + if index_exists_emb(emb_tensor) and check_index_params(self): return emb_tensor.get_vdb_indexes()[0]["distance"] threshold = self.index_params.get("threshold", -1) From 16331adbb4e8e94f6224562181391a38803d8657 Mon Sep 17 00:00:00 2001 From: Sounak Chakraborty Date: Sun, 25 Aug 2024 01:19:30 +0530 Subject: [PATCH 12/17] Inverted Index Phase 10. --- deeplake/core/tensor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deeplake/core/tensor.py b/deeplake/core/tensor.py index 2d3c61cd43..90ade0cde7 100644 --- a/deeplake/core/tensor.py +++ b/deeplake/core/tensor.py @@ -1801,7 +1801,7 @@ def create_vdb_index( >>> ds.text.create_vdb_index(id="inverted_index1", additional_params={"bloom_filter_size": 1000000, "segment_size": 50}) Notes: - Index creation is supported only for embedding tensors and text tensors. + Index creation is supported for embedding tensors and text tensors. Raises: Exception: If the tensor is not an embedding tensor or text tensor. 
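For reference, a short sketch pairing the two index kinds the updated docstring covers; the id values and parameters are copied from the docstring example and the test suite rather than tuned recommendations, and ds is assumed to already hold "embedding" and "text" tensors.

    # HNSW (vector) index on an embedding tensor, as used in the tests.
    ds.embedding.create_vdb_index("hnsw_1")

    # Inverted index on a text tensor, with the docstring's optional knobs.
    ds.text.create_vdb_index(
        id="inverted_index1",
        additional_params={"bloom_filter_size": 1000000, "segment_size": 50},
    )

    # Any other htype fails check_supported_tensor() and raises.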
From 4cd3e110e4f981901590f1afca60bea28d6e6393 Mon Sep 17 00:00:00 2001
From: activesoull
Date: Mon, 26 Aug 2024 17:35:33 +0400
Subject: [PATCH 13/17] bump libdeeplake version to 0.0.140

---
 setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index 796f19a37d..e67769f97c 100644
--- a/setup.py
+++ b/setup.py
@@ -70,7 +70,7 @@ def libdeeplake_available():
     extras_require["all"] = [req_map[r] for r in all_extras]
 
     if libdeeplake_available():
-        libdeeplake = "libdeeplake==0.0.139"
+        libdeeplake = "libdeeplake==0.0.140"
         extras_require["enterprise"] = [libdeeplake, "pyjwt"]
         extras_require["all"].append(libdeeplake)
         install_requires.append(libdeeplake)

From b615064fe13f3112fbde30ee8718528c31d24a29 Mon Sep 17 00:00:00 2001
From: Sounak Chakraborty
Date: Tue, 27 Aug 2024 09:56:41 +0530
Subject: [PATCH 14/17] Inverted Index Phase 11. Remove unused vectorstore
 constants for the inverted index.

---
 deeplake/constants.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/deeplake/constants.py b/deeplake/constants.py
index 604686b26f..4446f7836e 100644
--- a/deeplake/constants.py
+++ b/deeplake/constants.py
@@ -326,12 +326,10 @@
 DEFAULT_VECTORSTORE_DISTANCE_METRIC = "COS"
 DEFAULT_DEEPMEMORY_DISTANCE_METRIC = "deepmemory_distance"
 
-DEFAULT_VECTORSTORE_INVERTED_INDEX_TENSOR = ""
 
 DEFAULT_VECTORSTORE_INDEX_PARAMS = {
     "threshold": -1,
     "distance_metric": DEFAULT_VECTORSTORE_DISTANCE_METRIC,
-    "inverted_index_tensor": DEFAULT_VECTORSTORE_INVERTED_INDEX_TENSOR,
     "additional_params": {
         "efConstruction": 600,
         "M": 32,

From 190295b2976a899a29edd01f747df231d0e0ff6a Mon Sep 17 00:00:00 2001
From: Sounak Chakraborty
Date: Tue, 27 Aug 2024 10:48:20 +0530
Subject: [PATCH 15/17] Inverted Index Phase 12. index_operation_vectorstore
 should operate only on the embedding tensor; if that tensor is None, it
 should return None.
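A hedged sketch of the guard, not part of the diff below: the dataset and tensor are hypothetical, and fetch_embedding_tensor / index_operation_vectorstore are the functions touched in this series.

    ds = deeplake.empty("mem://text_only_demo")  # hypothetical dataset without embeddings
    ds.create_tensor("text", htype="text")
    # fetch_embedding_tensor(ds) finds no embedding tensor and returns None,
    # so index_operation_vectorstore(...) now returns None early instead of
    # dereferencing a missing tensor further down.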
---
 deeplake/core/index_maintenance.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/deeplake/core/index_maintenance.py b/deeplake/core/index_maintenance.py
index 99553516ab..23da781bfb 100644
--- a/deeplake/core/index_maintenance.py
+++ b/deeplake/core/index_maintenance.py
@@ -260,6 +260,8 @@ def index_operation_vectorstore(self):
         return None
 
     emb_tensor = fetch_embedding_tensor(self.dataset)
+    if emb_tensor is None:
+        return None
 
     if index_exists_emb(emb_tensor) and check_index_params(self):
         return emb_tensor.get_vdb_indexes()[0]["distance"]

From fbb5c8269c3bb705c9c15c6e1974303509278962 Mon Sep 17 00:00:00 2001
From: activesoull
Date: Tue, 27 Aug 2024 18:49:52 +0400
Subject: [PATCH 16/17] bump libdeeplake version to 0.0.141

---
 setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index e67769f97c..9b30477e2c 100644
--- a/setup.py
+++ b/setup.py
@@ -70,7 +70,7 @@ def libdeeplake_available():
     extras_require["all"] = [req_map[r] for r in all_extras]
 
     if libdeeplake_available():
-        libdeeplake = "libdeeplake==0.0.140"
+        libdeeplake = "libdeeplake==0.0.141"
         extras_require["enterprise"] = [libdeeplake, "pyjwt"]
         extras_require["all"].append(libdeeplake)
         install_requires.append(libdeeplake)

From 093a45765695dafdca14df702e5f5f564f9e46bd Mon Sep 17 00:00:00 2001
From: activesoull
Date: Tue, 27 Aug 2024 18:51:59 +0400
Subject: [PATCH 17/17] mark tests to rerun if the response status is not 200

---
 deeplake/api/tests/test_access_method.py               | 1 +
 deeplake/core/vectorstore/test_deeplake_vectorstore.py | 4 ++++
 2 files changed, 5 insertions(+)

diff --git a/deeplake/api/tests/test_access_method.py b/deeplake/api/tests/test_access_method.py
index ac05f60468..e08251499e 100644
--- a/deeplake/api/tests/test_access_method.py
+++ b/deeplake/api/tests/test_access_method.py
@@ -81,6 +81,7 @@ def test_access_method(s3_ds_generator):
 
 
 @pytest.mark.slow
+@pytest.mark.flaky(reruns=3)
 def test_access_method_with_creds(
     hub_cloud_ds_generator, hub_cloud_dev_managed_creds_key
 ):
diff --git a/deeplake/core/vectorstore/test_deeplake_vectorstore.py b/deeplake/core/vectorstore/test_deeplake_vectorstore.py
index c3af4c549a..811e231614 100644
--- a/deeplake/core/vectorstore/test_deeplake_vectorstore.py
+++ b/deeplake/core/vectorstore/test_deeplake_vectorstore.py
@@ -958,6 +958,7 @@ def assert_updated_vector_store(
 )
 @pytest.mark.parametrize("init_embedding_function", [embedding_fn3, None])
 @pytest.mark.slow
+@pytest.mark.flaky(reruns=3)
 @requires_libdeeplake
 def test_update_embedding(
     ds,
@@ -1344,6 +1345,7 @@ def create_and_populate_vs(
 
 
 @requires_libdeeplake
+@pytest.mark.flaky(reruns=3)
 def test_update_embedding_row_ids_and_ids_specified_should_throw_exception(
     local_path,
     vector_store_hash_ids,
@@ -1368,6 +1370,7 @@ def test_update_embedding_row_ids_and_ids_specified_should_throw_exception(
 
 
 @requires_libdeeplake
+@pytest.mark.flaky(reruns=3)
 def test_update_embedding_row_ids_and_filter_specified_should_throw_exception(
     local_path,
     vector_store_filters,
@@ -1391,6 +1394,7 @@ def test_update_embedding_row_ids_and_filter_specified_should_throw_exception(
 
 
 @requires_libdeeplake
+@pytest.mark.flaky(reruns=3)
 def test_update_embedding_query_and_filter_specified_should_throw_exception(
     local_path,
     vector_store_filters,