diff --git a/deeplake/api/tests/test_access_method.py b/deeplake/api/tests/test_access_method.py index ac05f60468..e08251499e 100644 --- a/deeplake/api/tests/test_access_method.py +++ b/deeplake/api/tests/test_access_method.py @@ -81,6 +81,7 @@ def test_access_method(s3_ds_generator): @pytest.mark.slow +@pytest.mark.flaky(reruns=3) def test_access_method_with_creds( hub_cloud_ds_generator, hub_cloud_dev_managed_creds_key ): diff --git a/deeplake/core/dataset/dataset.py b/deeplake/core/dataset/dataset.py index 5126e472fe..40feca2f30 100644 --- a/deeplake/core/dataset/dataset.py +++ b/deeplake/core/dataset/dataset.py @@ -569,6 +569,7 @@ def __getitem__( enabled_tensors=self.enabled_tensors, view_base=self._view_base or self, libdeeplake_dataset=self.libdeeplake_dataset, + index_params=self.index_params, ) elif "/" in item: splt = posixpath.split(item) @@ -619,6 +620,7 @@ def __getitem__( enabled_tensors=enabled_tensors, view_base=self._view_base or self, libdeeplake_dataset=self.libdeeplake_dataset, + index_params=self.index_params, ) elif isinstance(item, tuple) and len(item) and isinstance(item[0], str): ret = self @@ -648,6 +650,7 @@ def __getitem__( enabled_tensors=self.enabled_tensors, view_base=self._view_base or self, libdeeplake_dataset=self.libdeeplake_dataset, + index_params=self.index_params, ) else: raise InvalidKeyTypeError(item) @@ -2925,6 +2928,7 @@ def parent(self): path=self.path, link_creds=self.link_creds, libdeeplake_dataset=self.libdeeplake_dataset, + index_params=self.index_params, ) self.storage.autoflush = autoflush return ds @@ -2948,6 +2952,7 @@ def root(self): link_creds=self.link_creds, view_base=self._view_base, libdeeplake_dataset=self.libdeeplake_dataset, + index_params=self.index_params, ) self.storage.autoflush = autoflush return ds @@ -2971,6 +2976,7 @@ def no_view_dataset(self): pad_tensors=self._pad_tensors, enabled_tensors=self.enabled_tensors, libdeeplake_dataset=self.libdeeplake_dataset, + index_params=self.index_params, ) def _create_group(self, name: str) -> "Dataset": @@ -4927,6 +4933,7 @@ def max_view(self): pad_tensors=True, enabled_tensors=self.enabled_tensors, libdeeplake_dataset=self.libdeeplake_dataset, + index_params=self.index_params, ) def random_split(self, lengths: Sequence[Union[int, float]]): diff --git a/deeplake/core/index_maintenance.py b/deeplake/core/index_maintenance.py index 58e294de09..23da781bfb 100644 --- a/deeplake/core/index_maintenance.py +++ b/deeplake/core/index_maintenance.py @@ -17,10 +17,10 @@ class INDEX_OP_TYPE(Enum): INCREMENTAL_INDEX = 3 -def is_embedding_tensor(tensor): +def validate_embedding_tensor(tensor): """Check if a tensor is an embedding tensor.""" - valid_names = ["embedding", "embeddings"] + valid_names = ["embedding"] return ( tensor.htype == "embedding" @@ -29,16 +29,9 @@ def is_embedding_tensor(tensor): ) -def validate_embedding_tensor(tensor): +def validate_text_tensor(tensor): """Check if a tensor is an embedding tensor.""" - - valid_names = ["embedding"] - - return ( - tensor.htype == "embedding" - or tensor.meta.name in valid_names - or tensor.key in valid_names - ) + return tensor.htype == "text" def fetch_embedding_tensor(dataset): @@ -49,23 +42,34 @@ def fetch_embedding_tensor(dataset): return None -def index_exists(dataset): +def index_exists_emb(emb_tensor): """Check if the Index already exists.""" - emb_tensor = fetch_embedding_tensor(dataset) - if emb_tensor is not None: + emb = validate_embedding_tensor(emb_tensor) + if emb: vdb_indexes = emb_tensor.fetch_vdb_indexes() if len(vdb_indexes) 
== 0: return False else: return True - else: - return False + return False -def index_partition_count(dataset): - emb_tensor = fetch_embedding_tensor(dataset) - if emb_tensor is not None: - vdb_indexes = emb_tensor.fetch_vdb_indexes() +def index_exists_txt(txt_tensor): + """Check if the Index already exists.""" + txt = validate_text_tensor(txt_tensor) + if txt: + vdb_indexes = txt_tensor.fetch_vdb_indexes() + if len(vdb_indexes) == 0: + return False + else: + return True + return False + + +def index_partition_count(tensor): + is_emb_tensor = validate_embedding_tensor(tensor) + if is_emb_tensor: + vdb_indexes = tensor.fetch_vdb_indexes() if len(vdb_indexes) == 0: return 1 else: @@ -133,19 +137,28 @@ def check_index_params(self): return False -def index_operation_type_dataset(self, num_rows, changed_data_len): - if not index_exists(self): - if self.index_params is None: +def index_operation_type_dataset( + tensor, dataset, num_rows, changed_data_len, is_embedding=False +): + if is_embedding: + vdb_index_exists = index_exists_emb(tensor) + if not vdb_index_exists: + if dataset.index_params is None: + return INDEX_OP_TYPE.NOOP + if changed_data_len == 0: + return INDEX_OP_TYPE.NOOP + threshold = dataset.index_params.get("threshold", -1) + below_threshold = threshold <= 0 or num_rows < threshold + if not below_threshold: + return INDEX_OP_TYPE.CREATE_INDEX + + return INDEX_OP_TYPE.INCREMENTAL_INDEX + else: + # for Text tensor i.e. inverted index, + vdb_index_exists = index_exists_txt(tensor) + if not vdb_index_exists or changed_data_len == 0: return INDEX_OP_TYPE.NOOP - threshold = self.index_params.get("threshold", -1) - below_threshold = threshold <= 0 or num_rows < threshold - if not below_threshold: - return INDEX_OP_TYPE.CREATE_INDEX - - if not check_vdb_indexes(self) or changed_data_len == 0: - return INDEX_OP_TYPE.NOOP - - return INDEX_OP_TYPE.INCREMENTAL_INDEX + return INDEX_OP_TYPE.REGENERATE_INDEX def get_index_metric(metric): @@ -158,9 +171,23 @@ def get_index_metric(metric): def normalize_additional_params(params: dict) -> dict: - mapping = {"efconstruction": "efConstruction", "m": "M", "partitions": "partitions"} - - allowed_keys = ["efConstruction", "m", "partitions"] + mapping = { + "efconstruction": "efConstruction", + "m": "M", + "partitions": "partitions", + "bloom": "bloom_filter_size", + "bloom_size": "bloom_filter_size", + "Segment_Size": "segment_size", + "seg_size": "segment_size", + } + + allowed_keys = [ + "efConstruction", + "m", + "partitions", + "bloom_filter_size", + "segment_size", + ] # New dictionary to store the result with desired key format result_dict = {} @@ -187,20 +214,16 @@ def normalize_additional_params(params: dict) -> dict: return result_dict -def check_vdb_indexes(dataset): - tensors = dataset.tensors - - vdb_index_present = False - for _, tensor in tensors.items(): - is_embedding = is_embedding_tensor(tensor) - has_vdb_indexes = hasattr(tensor.meta, "vdb_indexes") - try: - vdb_index_ids_present = len(tensor.meta.vdb_indexes) > 0 - except AttributeError: - vdb_index_ids_present = False +def check_embedding_vdb_indexes(tensor): + is_embedding = validate_embedding_tensor(tensor) + has_vdb_indexes = hasattr(tensor.meta, "vdb_indexes") + try: + vdb_index_ids_present = len(tensor.meta.vdb_indexes) > 0 + except AttributeError: + vdb_index_ids_present = False - if is_embedding and has_vdb_indexes and vdb_index_ids_present: - return True + if is_embedding and has_vdb_indexes and vdb_index_ids_present: + return True return False @@ -210,12 +233,18 @@ def 
_incr_maintenance_vdb_indexes( try: is_embedding = tensor.htype == "embedding" has_vdb_indexes = hasattr(tensor.meta, "vdb_indexes") + + is_text = tensor.htype == "text" + if is_text: + raise Exception( + "Inverted index does not support incremental index maintenance." + ) try: vdb_index_ids_present = len(tensor.meta.vdb_indexes) > 0 except AttributeError: vdb_index_ids_present = False - if is_embedding and has_vdb_indexes and vdb_index_ids_present: + if (is_embedding or is_text) and has_vdb_indexes and vdb_index_ids_present: for vdb_index in tensor.meta.vdb_indexes: tensor.update_vdb_index( operation_kind=index_operation, @@ -226,14 +255,15 @@ def _incr_maintenance_vdb_indexes( raise Exception(f"An error occurred while regenerating VDB indexes: {e}") -# Routine to identify the index Operation. def index_operation_vectorstore(self): if not index_used(self.exec_option): return None emb_tensor = fetch_embedding_tensor(self.dataset) + if emb_tensor is None: + return None - if index_exists(self.dataset) and check_index_params(self): + if index_exists_emb(emb_tensor) and check_index_params(self): return emb_tensor.get_vdb_indexes()[0]["distance"] threshold = self.index_params.get("threshold", -1) @@ -262,48 +292,76 @@ def index_operation_vectorstore(self): def index_operation_dataset(self, dml_type, rowids): - emb_tensor = fetch_embedding_tensor(self) - if emb_tensor is None: - return - - index_operation_type = index_operation_type_dataset( - self, - emb_tensor.chunk_engine.num_samples, - len(rowids), - ) - - if index_operation_type == INDEX_OP_TYPE.NOOP: - return - if ( - index_operation_type == INDEX_OP_TYPE.CREATE_INDEX - or index_operation_type == INDEX_OP_TYPE.REGENERATE_INDEX - ): - if index_operation_type == INDEX_OP_TYPE.REGENERATE_INDEX: - try: - vdb_indexes = emb_tensor.get_vdb_indexes() - for vdb_index in vdb_indexes: - emb_tensor.delete_vdb_index(vdb_index["id"]) - except Exception as e: - raise Exception( - f"An error occurred while regenerating VDB indexes: {e}" - ) - distance_str = self.index_params.get("distance_metric", "COS") - additional_params_dict = self.index_params.get("additional_params", None) - distance = get_index_metric(distance_str.upper()) - if additional_params_dict and len(additional_params_dict) > 0: - param_dict = normalize_additional_params(additional_params_dict) - emb_tensor.create_vdb_index( - "hnsw_1", distance=distance, additional_params=param_dict + tensors = self.tensors + for _, tensor in tensors.items(): + is_embedding_tensor = validate_embedding_tensor(tensor) + is_text_tensor = validate_text_tensor(tensor) + index_operation_type = INDEX_OP_TYPE.NOOP + + if is_embedding_tensor or is_text_tensor: + index_operation_type = index_operation_type_dataset( + tensor, + self, + tensor.chunk_engine.num_samples, + len(rowids), + is_embedding_tensor, ) else: - emb_tensor.create_vdb_index("hnsw_1", distance=distance) - elif index_operation_type == INDEX_OP_TYPE.INCREMENTAL_INDEX: - partition_count = index_partition_count(self) - if partition_count > 1: - _incr_maintenance_vdb_indexes( - emb_tensor, rowids, dml_type, is_partitioned=True - ) + continue + + if index_operation_type == INDEX_OP_TYPE.NOOP: + continue + if ( + index_operation_type == INDEX_OP_TYPE.CREATE_INDEX + or index_operation_type == INDEX_OP_TYPE.REGENERATE_INDEX + ): + saved_vdb_indexes = [] + if index_operation_type == INDEX_OP_TYPE.REGENERATE_INDEX: + try: + vdb_indexes = tensor.get_vdb_indexes() + for vdb_index in vdb_indexes: + saved_vdb_indexes.append(vdb_index) + 
tensor.delete_vdb_index(vdb_index["id"]) + except Exception as e: + raise Exception( + f"An error occurred while regenerating VDB indexes: {e}" + ) + if is_embedding_tensor: + distance_str = self.index_params.get("distance_metric", "COS") + additional_params_dict = self.index_params.get( + "additional_params", None + ) + distance = get_index_metric(distance_str.upper()) + if additional_params_dict and len(additional_params_dict) > 0: + param_dict = normalize_additional_params(additional_params_dict) + tensor.create_vdb_index( + "hnsw_1", distance=distance, additional_params=param_dict + ) + else: + tensor.create_vdb_index("hnsw_1", distance=distance) + elif is_text_tensor: + if len(saved_vdb_indexes) > 0: + for vdb_index in saved_vdb_indexes: + id = vdb_index["id"] + additional_params_dict = vdb_index.get( + "additional_params", None + ) + if additional_params_dict and len(additional_params_dict) > 0: + param_dict = normalize_additional_params( + additional_params_dict + ) + tensor.create_vdb_index(id, additional_params=param_dict) + else: + tensor.create_vdb_index(id) + continue + elif index_operation_type == INDEX_OP_TYPE.INCREMENTAL_INDEX: + partition_count = index_partition_count(tensor) + if partition_count > 1: + _incr_maintenance_vdb_indexes( + tensor, rowids, dml_type, is_partitioned=True + ) + else: + _incr_maintenance_vdb_indexes(tensor, rowids, dml_type) + continue else: - _incr_maintenance_vdb_indexes(emb_tensor, rowids, dml_type) - else: - raise Exception("Unknown index operation") + raise Exception("Unknown index operation") diff --git a/deeplake/core/meta/tensor_meta.py b/deeplake/core/meta/tensor_meta.py index 7ec4e5371a..259fce06b7 100644 --- a/deeplake/core/meta/tensor_meta.py +++ b/deeplake/core/meta/tensor_meta.py @@ -241,6 +241,9 @@ def __setstate__(self, state: Dict[str, Any]): if self.htype == "embedding" and not hasattr(self, "vdb_indexes"): self.vdb_indexes = [] self._required_meta_keys += ("vdb_indexes",) + if self.htype == "text" and not hasattr(self, "vdb_indexes"): + self.vdb_indexes = [] + self._required_meta_keys += ("vdb_indexes",) @property def nbytes(self): diff --git a/deeplake/core/tensor.py b/deeplake/core/tensor.py index 8038a8d7eb..90ade0cde7 100644 --- a/deeplake/core/tensor.py +++ b/deeplake/core/tensor.py @@ -81,6 +81,8 @@ ) import warnings import webbrowser +import pathlib +import struct def create_tensor( @@ -361,7 +363,9 @@ def extend( TensorDtypeMismatchError: Dtype for array must be equal to or castable to this tensor's dtype. 
""" self._extend(samples, progressbar=progressbar, ignore_errors=ignore_errors) - if index_maintenance.validate_embedding_tensor(self): + if index_maintenance.validate_embedding_tensor( + self + ) or index_maintenance.validate_text_tensor(self): row_ids = list(range(self.num_samples - len(samples), self.num_samples)) index_maintenance.index_operation_dataset( # TODO: this might pick the wrong tensor when we support self.dataset, # index for multiple tensors in the future @@ -453,7 +457,9 @@ def append(self, sample: InputSample): """ row_ids = [self.num_samples] self._extend([sample], progressbar=False) - if index_maintenance.validate_embedding_tensor(self): + if index_maintenance.validate_embedding_tensor( + self + ) or index_maintenance.validate_text_tensor(self): index_maintenance.index_operation_dataset( # TODO: this might pick the wrong tensor when we support self.dataset, # index for multiple tensors in the future dml_type=_INDEX_OPERATION_MAPPING["ADD"], @@ -805,7 +811,9 @@ def __setitem__(self, item: Union[int, slice], value: Any): (1, 3, 3) """ self._update(item, value) - if index_maintenance.is_embedding_tensor(self): + if index_maintenance.validate_embedding_tensor( + self + ) or index_maintenance.validate_text_tensor(self): row_index = self.index[Index(item)] row_ids = list(row_index.values[0].indices(self.num_samples)) index_maintenance.index_operation_dataset( @@ -1214,7 +1222,9 @@ def pop(self, index: Optional[Union[int, List[int]]] = None): index = sorted(index, reverse=True) self._pop(index) - if index_maintenance.is_embedding_tensor(self): + if index_maintenance.validate_embedding_tensor( + self + ) or index_maintenance.validate_text_tensor(self): row_ids = index[:] index_maintenance.index_operation_dataset( self.dataset, @@ -1530,23 +1540,44 @@ def invalidate_libdeeplake_dataset(self): """Invalidates the libdeeplake dataset object.""" self.dataset.libdeeplake_dataset = None - def deserialize_partitions(self, serialized_data, incremental_dml=False): + def deserialize_inverted_index(self, serialized_data): from io import BytesIO stream = BytesIO(serialized_data) # Read number of partitions - num_partitions = int.from_bytes( + metadataSize = int.from_bytes( stream.read(8), "little" ) # Assuming size_t is 8 bytes + metadata_bytes = stream.read(metadataSize) + metadata = json.loads(metadata_bytes.decode("utf-8")) + + temp_paths_size = int.from_bytes(stream.read(8), "little") + + paths_string = stream.read().decode("utf-8") + temp_serialized_paths = paths_string.strip().split("\n") + + # Check if the declared number of paths match the actual number + if temp_paths_size != len(temp_serialized_paths): + raise ValueError( + "Mismatch between declared count and actual number of paths" + ) + + return metadata, temp_serialized_paths + + def deserialize_partitions(self, serialized_data, incremental_dml=False): + from io import BytesIO + + stream = BytesIO(serialized_data) + + num_partitions = int.from_bytes(stream.read(8), "little") + partition_info = [] for _ in range(num_partitions): - # Read partition name length and name name_length = int.from_bytes(stream.read(8), "little") name = stream.read(name_length).decode("utf-8") - # Read start and end indices start = int.from_bytes(stream.read(8), "little") end = int.from_bytes(stream.read(8), "little") @@ -1554,7 +1585,6 @@ def deserialize_partitions(self, serialized_data, incremental_dml=False): incr_info = [] if incremental_dml == True: - # Check for incremental update info incr_info_size = int.from_bytes(stream.read(8), "little") for 
_ in range(incr_info_size): name_length = int.from_bytes(stream.read(8), "little") @@ -1565,7 +1595,6 @@ def deserialize_partitions(self, serialized_data, incremental_dml=False): incr_info.append({"name": name, "start": start, "end": end}) - # Extract the actual data for each partition partitions_data = [] while True: size_data = stream.read(8) @@ -1578,6 +1607,8 @@ def deserialize_partitions(self, serialized_data, incremental_dml=False): return partition_info, partitions_data, incr_info def is_partitioned_vdb_index(self): + if self.htype != "embedding": + return False vdb_indexes = self.get_vdb_indexes() if len(vdb_indexes) == 0: return False @@ -1590,6 +1621,16 @@ def is_partitioned_vdb_index(self): return True return False + def is_inverted_index(self): + if self.htype == "text": + vdb_indexes = self.get_vdb_indexes() + if len(vdb_indexes) == 0: + return False + for vdb_index in vdb_indexes: + if vdb_index["type"] == "inverted_index": + return True + return False + def update_vdb_index( self, operation_kind: int, @@ -1734,16 +1775,19 @@ def create_vdb_index( additional_params: Optional[Dict[str, int]] = None, ): """ - Create similarity search index for embedding tensor. + Create similarity search index for embedding tensor or inverted index for text tensor. Args: - id (str): Unique identifier for the index. Defaults to ``hnsw_1``. + id (str): Unique identifier for the index. Defaults to ``hnsw_1``. or ``inverted_index1``. distance (DistanceType, str): Distance metric to be used for similarity search. Possible values are "l2_norm", "cosine_similarity". Defaults to ``DistanceType.COSINE_SIMILARITY``. additional_params (Optional[Dict[str, int]]): Additional parameters for the index. - - Structure of additional params is: + - Structure of additional params is used for HNSW index: :"M": Increasing this value will increase the index build time and memory usage but will improve the search accuracy. Defaults to ``16``. :"efConstruction": Defaults to ``200``. :"partitions": If tensors contain more than 45M samples, it is recommended to use partitions to create the index. Defaults to ``1``. + - Structure of additional params is used for Inverted index: + :"bloom_filter_size": Size of the bloom filter. Defaults to ``100000``. + :"segment_size": Size of the segment in MB. Defaults to ``25``. Example: >>> ds = deeplake.load("./test/my_embedding_ds") @@ -1751,19 +1795,22 @@ def create_vdb_index( >>> ds.embedding.create_vdb_index(id="hnsw_1", distance=DistanceType.COSINE_SIMILARITY) >>> # create cosine_similarity index on embedding tensor with additional params >>> ds.embedding.create_vdb_index(id="hnsw_1", distance=DistanceType.COSINE_SIMILARITY, additional_params={"M": 32, "partitions": 1, 'efConstruction': 200}) + >>> # create inverted index on text tensor + >>> ds.text.create_vdb_index(id="inverted_index1") + >>> # create inverted index on text tensor with additional params + >>> ds.text.create_vdb_index(id="inverted_index1", additional_params={"bloom_filter_size": 1000000, "segment_size": 50}) Notes: - Index creation is supported only for embedding tensors. + Index creation is supported for embedding tensors and text tensors. Raises: - Exception: If the tensor is not an embedding tensor. + Exception: If the tensor is not an embedding tensor or text tensor. Returns: Index: Returns the index object. 
""" self.storage.check_readonly() - if self.meta.htype != "embedding": - raise Exception(f"Only supported for embedding tensors.") + self.check_supported_tensor() if not self.dataset.libdeeplake_dataset is None: ds = self.dataset.libdeeplake_dataset else: @@ -1775,6 +1822,48 @@ def create_vdb_index( ts = getattr(ds, self.meta.name) from indra import api # type: ignore + if self.meta.htype == "text": + self.meta.add_vdb_index( + id=id, + type="inverted_index", + distance=None, + additional_params=additional_params, + ) + try: + if additional_params is None: + index = api.vdb.generate_index(ts, index_type="inverted_index") + else: + index = api.vdb.generate_index( + ts, + index_type="inverted_index", + param=additional_params, + ) + b = index.serialize() + commit_id = self.version_state["commit_id"] + metadata, temp_serialized_paths = self.deserialize_inverted_index(b) + inverted_meta_key = get_tensor_vdb_index_key( + self.key, commit_id, f"{id}_inverted_metadata" + ) + metadata_json = json.dumps(metadata) + metadata_bytes = metadata_json.encode("utf-8") + self.storage[inverted_meta_key] = metadata_bytes + temp_serialized_paths_count = len(temp_serialized_paths) + temp_serialized_paths = [str(path) for path in temp_serialized_paths] + for i, path in enumerate(temp_serialized_paths): + file_name = pathlib.Path(path).name + with open(path, "rb") as f: + inv_key = get_tensor_vdb_index_key( + self.key, commit_id, f"{id}_{file_name}" + ) + self.storage[inv_key] = f.read() + f.close() + + self.invalidate_libdeeplake_dataset() + except: + self.meta.remove_vdb_index(id=id) + raise + return index + if type(distance) == DistanceType: distance = distance.value self.meta.add_vdb_index( @@ -1823,8 +1912,8 @@ def create_vdb_index( def delete_vdb_index(self, id: str): self.storage.check_readonly() - if self.meta.htype != "embedding": - raise Exception(f"Only supported for embedding tensors.") + self.check_supported_tensor() + commit_id = self.version_state["commit_id"] self.unload_vdb_index_cache() if self.is_partitioned_vdb_index(): @@ -1848,6 +1937,28 @@ def delete_vdb_index(self, id: str): f"{id}_partition_metadata", ) ) + elif self.is_inverted_index(): + metadata_file = self.storage[ + get_tensor_vdb_index_key( + self.key, + self.version_state["commit_id"], + f"{id}_inverted_metadata", + ) + ] + metadata = json.loads(metadata_file.decode("utf-8")) + segment_names = list(metadata.keys()) + for name in segment_names: + partition_key = get_tensor_vdb_index_key( + self.key, self.version_state["commit_id"], f"{id}_inv_{name}" + ) + self.storage.pop(partition_key) + self.storage.pop( + get_tensor_vdb_index_key( + self.key, + self.version_state["commit_id"], + f"{id}_inverted_metadata", + ) + ) else: self.storage.pop(get_tensor_vdb_index_key(self.key, commit_id, id)) @@ -1872,8 +1983,6 @@ def _verify_and_delete_vdb_indexes(self): raise Exception(f"An error occurred while deleting VDB indexes: {e}") def load_vdb_index(self, id: str): - if self.meta.htype != "embedding": - raise Exception(f"Only supported for embedding tensors.") if not self.meta.contains_vdb_index(id): raise ValueError(f"Tensor meta has no vdb index with name '{id}'.") if not self.dataset.libdeeplake_dataset is None: @@ -1894,8 +2003,6 @@ def load_vdb_index(self, id: str): raise ValueError(f"An error occurred while loading the VDB index {id}: {e}") def unload_vdb_index_cache(self): - if self.meta.htype != "embedding": - raise Exception(f"Only supported for embedding tensors.") if not self.dataset.libdeeplake_dataset is None: ds = 
self.dataset.libdeeplake_dataset else: @@ -1914,17 +2021,21 @@ def unload_vdb_index_cache(self): raise Exception(f"An error occurred while cleaning VDB Cache: {e}") def get_vdb_indexes(self) -> List[Dict[str, str]]: - if self.meta.htype != "embedding": - raise Exception(f"Only supported for embedding tensors.") + if self.meta.htype != "embedding" and self.meta.htype != "text": + raise Exception(f"Only supported for embedding and text tensors.") return self.meta.vdb_indexes def fetch_vdb_indexes(self) -> List[Dict[str, str]]: vdb_indexes = [] - if self.meta.htype == "embedding": + if self.meta.htype == "embedding" or self.meta.htype == "text": if (not self.meta.vdb_indexes is None) and len(self.meta.vdb_indexes) > 0: vdb_indexes.extend(self.meta.vdb_indexes) return vdb_indexes + def check_supported_tensor(self): + if self.meta.htype not in ["embedding", "text"]: + raise Exception(f"Only supported for embedding and text tensors.") + def _check_compatibility_with_htype(self, htype): """Checks if the tensor is compatible with the given htype. Raises an error if not compatible. diff --git a/deeplake/core/tests/test_inverted_indexes.py b/deeplake/core/tests/test_inverted_indexes.py new file mode 100644 index 0000000000..69de99863a --- /dev/null +++ b/deeplake/core/tests/test_inverted_indexes.py @@ -0,0 +1,574 @@ +import deeplake +import numpy as np + +from deeplake.core.distance_type import DistanceType +from deeplake.tests.common import requires_libdeeplake +from deeplake.tests.dataset_fixtures import local_auth_ds_generator +from deeplake.util.exceptions import ReadOnlyModeError, EmbeddingTensorPopError +import pytest +import warnings + +statements = [ + "The apple fell from the tree and rolled into the river.", + "In the jungle, the giraffe munched on the leaves of a tall tree.", + "A rainbow appeared in the sky after the rain stopped.", + "The computer screen flickered as the storm intensified outside.", + "She found a book about coding with Python on the dusty shelf.", + "As the sun set, the mountain peaks were bathed in orange light.", + "The cat jumped onto the window sill to watch the birds outside.", + "He poured himself a cup of coffee and stared out at the ocean.", + "The children played under the table, laughing and giggling.", + "With a splash, the dog jumped into the river to fetch the stick.", +] + +new_statements = [ + "The quick brown fox jumps over the lazy dog.", + "Python is a widely used high-level programming language.", + "The sun shines bright over the tall mountains.", + "A cup of tea is a good companion while coding.", + "The river flowed swiftly under the wooden bridge.", + "In autumn, the leaves fall gently to the ground.", + "The stars twinkled brightly in the night sky.", + "She prefers coffee over tea during the early hours.", + "The code compiled successfully without any errors.", + "Birds fly south for the winter every year.", +] + +statements1 = [ + "The text is about the apple falling from the tree.", + "Finally searched the jungle for the giraffe.", +] + + +@requires_libdeeplake +def test_inv_index_(local_auth_ds_generator): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + for statement in statements: + ds.text.append(statement) + + ds.summary() + + # create inverted index. + ds.text.create_vdb_index("inv_1") + ts = ds.text.get_vdb_indexes() + assert len(ts) == 1 + assert ts[0]["id"] == "inv_1" + + # drop the inverted index. 
+ ds.text.delete_vdb_index("inv_1") + ts = ds.text.get_vdb_indexes() + assert len(ts) == 0 + + +@requires_libdeeplake +def test_inv_index_query(local_auth_ds_generator): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + for statement in statements: + ds.text.append(statement) + + # create inverted index. + ds.text.create_vdb_index("inv_1") + + # query the inverted index this should fail as equalities are not supported. + res = ds.query(f"select * where text == 'apple'") + assert len(res) == 0 + + # query the inverted index. + res = ds.query(f"select * where CONTAINS(text, 'flickered')") + assert len(res) == 1 + assert res.index[0].values[0].value == 3 + + # query the inverted index. + res = ds.query(f"select * where CONTAINS(text, 'mountain')") + assert len(res) == 1 + assert res.index[0].values[0].value == 5 + + # query the inverted index. + res = ds.query(f"select * where CONTAINS(text, 'mountain')") + assert len(res) == 1 + assert res.index[0].values[0].value == 5 + + # query the inverted index. + res = ds.query(f"select * where CONTAINS(text, 'jumped')") + assert len(res) == 2 + assert res.index[1].values[0].value == 9 + + ds.text.unload_vdb_index_cache() + + +@requires_libdeeplake +def test_inv_index_query_with_hnsw(local_auth_ds_generator): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + ds.create_tensor("embedding", htype="embedding", dtype=np.float32) + for statement in statements: + random_embedding = np.random.random_sample(384).astype(np.float32) + ds.append({"text": statement, "embedding": random_embedding}) + + print(ds.text[2].numpy()) + # create inverted index. + ds.text.create_vdb_index("inv_1") + ds.embedding.create_vdb_index("hnsw_1") + + # query the inverted index along with hnsw index. + v2 = ds.embedding[0].numpy(fetch_chunks=True) + s2 = ",".join(str(c) for c in v2) + res = ds.query( + f"select * where CONTAINS(text, 'apple') order by l2_norm(embedding - ARRAY[{s2}]) limit 1" + ) + assert len(res) == 1 + assert res.index[0].values[0].value == 0 + + # query the inverted index along with hnsw index. + v2 = ds.embedding[5].numpy(fetch_chunks=True) + s2 = ",".join(str(c) for c in v2) + res = ds.query( + f"select * where CONTAINS(text, 'mountain') order by l2_norm(embedding - ARRAY[{s2}]) limit 1" + ) + assert len(res) == 1 + assert res.index[0].values[0].value == 5 + + # query the inverted index along with hnsw index. + v2 = ds.embedding[9].numpy(fetch_chunks=True) + s2 = ",".join(str(c) for c in v2) + res = ds.query( + f"select * where CONTAINS(text, 'jumped') order by l2_norm(embedding - ARRAY[{s2}]) limit 1" + ) + assert len(res) == 1 + assert res.index[0].values[0].value == 9 + + ds.text.unload_vdb_index_cache() + ds.embedding.unload_vdb_index_cache() + + +@requires_libdeeplake +def test_inv_index_multiple_where_or_and(local_auth_ds_generator): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + for statement in statements + new_statements: + ds.text.append(statement) + + # create inverted index. + ds.text.create_vdb_index("inv_1") + + # query with multiple WHERE clauses using OR and AND + res = ds.query( + f"select * where CONTAINS(text, 'mountains.') and CONTAINS(text, 'bright')" + ) + print(res) + assert len(res) == 1 + assert ( + res.index[0].values[0].value == 12 + ) # "The sun shines bright over the tall mountains." 
+ + ds.text.unload_vdb_index_cache() + + +@requires_libdeeplake +def test_inv_index_multiple_keywords(local_auth_ds_generator): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + for statement in statements + new_statements: + ds.text.append(statement) + + # create inverted index. + ds.text.create_vdb_index("inv_1") + + # query with multiple keywords in WHERE clause + res = ds.query( + f"select * where CONTAINS(text, 'sun') and CONTAINS(text, 'bright')" + ) + assert len(res) == 1 + assert ( + res.index[0].values[0].value == 12 + ) # "The sun shines bright over the tall mountains." + + res = ds.query( + f"select * where CONTAINS(text, 'quick') and CONTAINS(text, 'fox')" + ) + assert len(res) == 1 + assert ( + res.index[0].values[0].value == 10 + ) # "The quick brown fox jumps over the lazy dog." + + ds.text.unload_vdb_index_cache() + + +@requires_libdeeplake +def test_inv_index_case_insensitivity(local_auth_ds_generator): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + for statement in statements + new_statements: + ds.text.append(statement) + + # create inverted index. + ds.text.create_vdb_index("inv_1") + + # query with case insensitivity + res = ds.query( + f"select * where CONTAINS(text, 'SUN')" + ) # Case insensitive match + assert len(res) == 2 + assert ( + res.index[0].values[0].value == 5 + ) # "The sun shines bright over the tall mountains." + + res = ds.query( + f"select * where CONTAINS(text, 'PYTHON')" + ) # Case insensitive match + assert len(res) == 2 + assert ( + res.index[0].values[0].value == 4 + ) # "Python is a widely used high-level programming language." + + ds.text.unload_vdb_index_cache() + + +@requires_libdeeplake +def test_multiple_where_clauses_and_filters(local_auth_ds_generator): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + ds.create_tensor("text1", htype="text") + ds.create_tensor("embedding", htype="embedding", dtype=np.float32) + ds.create_tensor("year", htype="text") # Changed to text + + for i, statement in enumerate(statements): + random_embedding = np.random.random_sample(384).astype(np.float32) + ds.append( + { + "text": statement, + "text1": " ".join( + statement.split()[-3:] + ), # last 3 words as a separate column + "embedding": random_embedding, + "year": str( + 2015 + (i % 7) + ), # cycles between 2015 and 2021 as strings + } + ) + + # Create inverted index on text and year + ds.text.create_vdb_index("inv_1") + ds.year.create_vdb_index("inv_year") + + # Test 1: Multiple WHERE clauses with OR and AND filters + res = ds.query( + f"select * where CONTAINS(text, 'river.') and CONTAINS(year, '2015')" + ) + assert len(res) == 1 + assert res.index[0].values[0].value == 0 # 'apple' is in the first statement + + res = ds.query( + f"select * where CONTAINS(text, 'rainbow') or CONTAINS(text1, 'rain') and CONTAINS(year, '2017')" + ) + assert len(res) == 1 + assert ( + res.index[0].values[0].value == 2 + ) # 'rainbow' matches in the third statement + + # Test 2: Multiple keywords in WHERE clause + res = ds.query( + f"select * where CONTAINS( text, 'apple') and CONTAINS( text, 'tree')" + ) + assert len(res) == 1 + assert res.index[0].values[0].value == 0 + + res = ds.query( + f"select * where CONTAINS( text, 'apple') or CONTAINS(text, 'river.')" + ) + assert len(res) == 1 + assert res.index[0].values[0].value == 0 + + res = ds.query( + f"select * where CONTAINS(text, 'coding') and CONTAINS(year, '2019')" + ) + assert len(res) == 1 + assert 
res.index[0].values[0].value == 4 + + # Test 3: Case insensitivity + res = ds.query(f"select * where CONTAINS(text, 'Apple')") + assert len(res) == 1 + assert res.index[0].values[0].value == 0 + + res = ds.query(f"select * where CONTAINS(text, 'Tree')") + assert len(res) == 1 + + ds.text.unload_vdb_index_cache() + ds.year.unload_vdb_index_cache() + + +@requires_libdeeplake +def test_hnsw_order_by_clause(local_auth_ds_generator): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + ds.create_tensor("embedding", htype="embedding", dtype=np.float32) + + for statement in statements: + random_embedding = np.random.random_sample(384).astype(np.float32) + ds.append({"text": statement, "embedding": random_embedding}) + + # Create inverted index and HNSW index + ds.text.create_vdb_index("inv_1") + ds.embedding.create_vdb_index("hnsw_1") + + # Test 4: ORDER BY clause with HNSW + v2 = ds.embedding[5].numpy(fetch_chunks=True) + s2 = ",".join(str(c) for c in v2) + res = ds.query( + f"select * where CONTAINS(text, 'sun') order by l2_norm(embedding - ARRAY[{s2}]) limit 1" + ) + assert len(res) == 1 + assert res.index[0].values[0].value == 5 + + # failure case. + v2 = ds.embedding[5].numpy(fetch_chunks=True) + s2 = ",".join(str(c) for c in v2) + res = ds.query( + f"select * where text == 'sun' order by l2_norm(embedding - ARRAY[{s2}]) limit 1" + ) + assert len(res) == 0 + + ds.text.unload_vdb_index_cache() + ds.embedding.unload_vdb_index_cache() + + +@requires_libdeeplake +def test_where_condition_on_column_without_inverted_index(local_auth_ds_generator): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + ds.create_tensor("text1", htype="text") + ds.create_tensor("embedding", htype="embedding", dtype=np.float32) + + for i, statement in enumerate(statements): + random_embedding = np.random.random_sample(384).astype(np.float32) + ds.append( + { + "text": statement, + "text1": " ".join( + statement.split()[-3:] + ), # last 3 words as a separate column + "embedding": random_embedding, + } + ) + + # Create inverted index on text only + ds.text.create_vdb_index("inv_1") + ds.embedding.create_vdb_index("hnsw_1") + + # res = ds.query(f"select * where CONTAINS(text, 'sun') and CONTAINS(text, 'bright')") + + # Test 5: WHERE condition on a column without an inverted index + v2 = ds.embedding[0].numpy(fetch_chunks=True) + s2 = ",".join(str(c) for c in v2) + res = ds.query( + f"select * where CONTAINS(text, 'fell') or CONTAINS(text, 'river.') or CONTAINS(text, 'rolled') order by l2_norm(embedding - ARRAY[{s2}]) limit 1" + ) + assert len(res) == 1 + assert res.index[0].values[0].value == 0 + + ds.text.unload_vdb_index_cache() + ds.embedding.unload_vdb_index_cache() + + +@requires_libdeeplake +def test_multiple_where_clauses_and_filters_with_year_text(local_auth_ds_generator): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + ds.create_tensor("text1", htype="text") + ds.create_tensor("embedding", htype="embedding", dtype=np.float32) + ds.create_tensor("year", htype="text") # Changed to text type + + years = ["2015", "2016", "2017", "2018", "2019", "2020", "2021"] + + for i, statement in enumerate(statements): + random_embedding = np.random.random_sample(384).astype(np.float32) + ds.append( + { + "text": statement, + "text1": " ".join( + statement.split()[-5:] + ), # last 3 words as a separate column + "embedding": random_embedding, + "year": years[i % len(years)], # cycles between 2015 and 2021 + } + ) + + # Create 
inverted index on text and year + ds.text.create_vdb_index("inv_1") + ds.year.create_vdb_index("inv_year") + + # Test 1: Multiple WHERE clauses with OR and AND filters + res = ds.query( + f"select * where CONTAINS(text, 'apple') or CONTAINS(text1, 'river.') and CONTAINS(year, '2016')" + ) + assert len(res) == 1 + assert res.index[0].values[0].value == 0 # 'apple' is in the first statement + + res = ds.query( + f"select * where CONTAINS(text, 'rainbow') or CONTAINS(text1, 'dusty')" + ) + assert len(res) == 2 + assert res.index[0].values[0].value == 2 + assert res.index[1].values[0].value == 4 + + # Test 2: Multiple keywords in WHERE clause + res = ds.query( + f"select * where CONTAINS(text, 'apple') or CONTAINS(text1, 'river')" + ) + assert len(res) == 2 + assert res.index[0].values[0].value == 0 + assert res.index[1].values[0].value == 9 + + res = ds.query( + f"select * where CONTAINS(text, 'coding') or CONTAINS(year, '2020') or CONTAINS(year, '2021') or CONTAINS(year, '2019')" + ) + assert len(res) == 3 + + res = ds.query( + f"select * where CONTAINS(text, 'coding') or CONTAINS(year, '2020') or CONTAINS(year, '2021') or CONTAINS(year, '2018')" + ) + assert len(res) == 4 + + ds.text.unload_vdb_index_cache() + ds.year.unload_vdb_index_cache() + + +@requires_libdeeplake +def test_inverted_index_on_year_column_with_text(local_auth_ds_generator): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + ds.create_tensor("year", htype="text") + ds.create_tensor("embedding", htype="embedding", dtype=np.float32) + + years = ["2015", "2016", "2017", "2018", "2019", "2020", "2021"] + + for i, statement in enumerate(statements): + random_embedding = np.random.random_sample(384).astype(np.float32) + ds.append( + { + "text": statement, + "year": years[ + i % len(years) + ], # cycles between 2015 and 2021 as strings + "embedding": random_embedding, + } + ) + + # Create inverted index on year only + ds.year.create_vdb_index("inv_year") + ds.embedding.create_vdb_index("hnsw_1") + + # Test: Multiple OR conditions on year and embedding column + v2 = ds.embedding[5].numpy(fetch_chunks=True) + s2 = ",".join(str(c) for c in v2) + res = ds.query( + f"select * where CONTAINS(year, '2019') or CONTAINS(year, '2020') or CONTAINS(year, '2021') order by l2_norm(embedding - ARRAY[{s2}]) limit 1" + ) + assert len(res) == 1 + assert res.index[0].values[0].value == 5 + + ds.year.unload_vdb_index_cache() + ds.embedding.unload_vdb_index_cache() + + +@requires_libdeeplake +def test_inverted_index_regeneration(local_auth_ds_generator): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + for statement in statements: + ds.text.append(statement) + + # create inverted index. + ds.text.create_vdb_index("inv_1") + + # query the inverted index. + res = ds.query(f"select * where CONTAINS(text, 'flickered')") + assert len(res) == 1 + assert res.index[0].values[0].value == 3 + + for statement in statements1: + ds.text.append(statement) + + # query the inverted index. 
+ res = ds.query(f"select * where CONTAINS(text, 'flickered')") + assert len(res) == 1 + assert res.index[0].values[0].value == 3 + + res = ds.query(f"select * where CONTAINS(text, 'searched')") + assert len(res) == 1 + assert res.index[0].values[0].value == 11 + + ds.text.unload_vdb_index_cache() + + +@requires_libdeeplake +def test_inverted_index_multiple_tensor_maintenance(local_auth_ds_generator): + ds = local_auth_ds_generator() + with ds: + ds.create_tensor("text", htype="text") + ds.create_tensor("year", htype="text") + ds.create_tensor("embedding", htype="embedding", dtype=np.float32) + + years = ["2015", "2016", "2017", "2018", "2019", "2020", "2021"] + years1 = ["2022", "2023"] + + for i, statement in enumerate(statements): + random_embedding = np.random.random_sample(384).astype(np.float32) + ds.append( + { + "text": statement, + "year": years[ + i % len(years) + ], # cycles between 2015 and 2021 as strings + "embedding": random_embedding, + } + ) + + # Create inverted index on year only + ds.year.create_vdb_index("inv_year") + ds.embedding.create_vdb_index("hnsw_1") + + # Test: Multiple OR conditions on year and embedding column + v2 = ds.embedding[5].numpy(fetch_chunks=True) + s2 = ",".join(str(c) for c in v2) + res = ds.query( + f"select * where CONTAINS(year, '2019') or CONTAINS(year, '2020') or CONTAINS(year, '2021') order by cosine_similarity(embedding, ARRAY[{s2}]) DESC limit 1" + ) + assert len(res) == 1 + assert res.index[0].values[0].value == 5 + + for i, statement in enumerate(statements1): + random_embedding = np.random.random_sample(384).astype(np.float32) + ds.append( + { + "text": statement, + "year": years1[i % len(years1)], + "embedding": random_embedding, + } + ) + v2 = ds.embedding[11].numpy(fetch_chunks=True) + s2 = ",".join(str(c) for c in v2) + + res = ds.query( + f"select * where CONTAINS(year, '2023') order by cosine_similarity(embedding, ARRAY[{s2}]) DESC limit 1" + ) + + assert len(res) == 1 + assert res.index[0].values[0].value == 11 + + ds.year.unload_vdb_index_cache() + ds.embedding.unload_vdb_index_cache() diff --git a/deeplake/core/vectorstore/test_deeplake_vectorstore.py b/deeplake/core/vectorstore/test_deeplake_vectorstore.py index c3af4c549a..811e231614 100644 --- a/deeplake/core/vectorstore/test_deeplake_vectorstore.py +++ b/deeplake/core/vectorstore/test_deeplake_vectorstore.py @@ -958,6 +958,7 @@ def assert_updated_vector_store( ) @pytest.mark.parametrize("init_embedding_function", [embedding_fn3, None]) @pytest.mark.slow +@pytest.mark.flaky(reruns=3) @requires_libdeeplake def test_update_embedding( ds, @@ -1344,6 +1345,7 @@ def create_and_populate_vs( @requires_libdeeplake +@pytest.mark.flaky(reruns=3) def test_update_embedding_row_ids_and_ids_specified_should_throw_exception( local_path, vector_store_hash_ids, @@ -1368,6 +1370,7 @@ def test_update_embedding_row_ids_and_ids_specified_should_throw_exception( @requires_libdeeplake +@pytest.mark.flaky(reruns=3) def test_update_embedding_row_ids_and_filter_specified_should_throw_exception( local_path, vector_store_filters, @@ -1391,6 +1394,7 @@ def test_update_embedding_row_ids_and_filter_specified_should_throw_exception( @requires_libdeeplake +@pytest.mark.flaky(reruns=3) def test_update_embedding_query_and_filter_specified_should_throw_exception( local_path, vector_store_filters, diff --git a/deeplake/htype.py b/deeplake/htype.py index 2933f20932..515ae45f06 100644 --- a/deeplake/htype.py +++ b/deeplake/htype.py @@ -95,7 +95,7 @@ class htype: "dtype": "Any", }, htype.LIST: {"dtype": "List"}, - 
htype.TEXT: {"dtype": "str"}, + htype.TEXT: {"dtype": "str", "vdb_indexes": []}, htype.TAG: {"dtype": "List"}, htype.DICOM: {"sample_compression": "dcm"}, htype.NIFTI: {}, diff --git a/setup.py b/setup.py index 10edc475b7..9b30477e2c 100644 --- a/setup.py +++ b/setup.py @@ -70,7 +70,7 @@ def libdeeplake_available(): extras_require["all"] = [req_map[r] for r in all_extras] if libdeeplake_available(): - libdeeplake = "libdeeplake==0.0.138" + libdeeplake = "libdeeplake==0.0.141" extras_require["enterprise"] = [libdeeplake, "pyjwt"] extras_require["all"].append(libdeeplake) install_requires.append(libdeeplake)