diff --git a/llama-index-core/llama_index/core/indices/base.py b/llama-index-core/llama_index/core/indices/base.py index 60ffbea10b520..e066678306724 100644 --- a/llama-index-core/llama_index/core/indices/base.py +++ b/llama-index-core/llama_index/core/indices/base.py @@ -152,6 +152,8 @@ def set_index_id(self, index_id: str) -> None: # add the new index struct self._index_struct.index_id = index_id self._storage_context.index_store.add_index_struct(self._index_struct) + if hasattr(self._vector_store, "move_nodes"): + self._vector_store.move_nodes(from_index_id=old_id, to_index_id=index_id) @property def docstore(self) -> BaseDocumentStore: diff --git a/llama-index-core/llama_index/core/indices/property_graph/base.py b/llama-index-core/llama_index/core/indices/property_graph/base.py index 5bb876b04844d..3b73703176641 100644 --- a/llama-index-core/llama_index/core/indices/property_graph/base.py +++ b/llama-index-core/llama_index/core/indices/property_graph/base.py @@ -378,6 +378,7 @@ def as_retriever( ): sub_retrievers.append( VectorContextRetriever( + index_id=self.index_id, graph_store=self.property_graph_store, vector_store=self.vector_store, include_text=include_text, diff --git a/llama-index-core/llama_index/core/indices/vector_store/base.py b/llama-index-core/llama_index/core/indices/vector_store/base.py index a655302f12277..bd0772ebb324e 100644 --- a/llama-index-core/llama_index/core/indices/vector_store/base.py +++ b/llama-index-core/llama_index/core/indices/vector_store/base.py @@ -189,7 +189,7 @@ async def _async_add_nodes_to_index( nodes_batch = await self._aget_node_with_embedding( nodes_batch, show_progress ) - new_ids = await self._vector_store.async_add(nodes_batch, **insert_kwargs) + new_ids = await self._vector_store.async_add(nodes_batch, index_id=index_struct.index_id, **insert_kwargs) # if the vector store doesn't store text, we need to add the nodes to the # index struct and document store @@ -230,7 +230,7 @@ def _add_nodes_to_index( for 
nodes_batch in iter_batch(nodes, self._insert_batch_size): nodes_batch = self._get_node_with_embedding(nodes_batch, show_progress) - new_ids = self._vector_store.add(nodes_batch, **insert_kwargs) + new_ids = self._vector_store.add(nodes_batch, index_id=index_struct.index_id, **insert_kwargs) if not self._vector_store.stores_text or self._store_nodes_override: # NOTE: if the vector store doesn't store text, diff --git a/llama-index-core/llama_index/core/indices/vector_store/retrievers/retriever.py b/llama-index-core/llama_index/core/indices/vector_store/retrievers/retriever.py index 3a42924549494..82ca15abc178e 100644 --- a/llama-index-core/llama_index/core/indices/vector_store/retrievers/retriever.py +++ b/llama-index-core/llama_index/core/indices/vector_store/retrievers/retriever.py @@ -177,7 +177,7 @@ def _get_nodes_with_embeddings( self, query_bundle_with_embeddings: QueryBundle ) -> List[NodeWithScore]: query = self._build_vector_store_query(query_bundle_with_embeddings) - query_result = self._vector_store.query(query, **self._kwargs) + query_result = self._vector_store.query(query, index_id=self._index.index_id, **self._kwargs) return self._build_node_list_from_query_result(query_result) async def _aget_nodes_with_embeddings( diff --git a/llama-index-core/llama_index/core/vector_stores/simple.py b/llama-index-core/llama_index/core/vector_stores/simple.py index 1343f265a2274..d4c62d7024050 100644 --- a/llama-index-core/llama_index/core/vector_stores/simple.py +++ b/llama-index-core/llama_index/core/vector_stores/simple.py @@ -240,6 +240,7 @@ def get_nodes( self, node_ids: Optional[List[str]] = None, filters: Optional[MetadataFilters] = None, + index_id: Optional[str] = None, ) -> List[BaseNode]: """Get nodes.""" raise NotImplementedError("SimpleVectorStore does not store nodes directly.") @@ -247,7 +248,8 @@ def get_nodes( def add( self, nodes: Sequence[BaseNode], - **add_kwargs: Any, + index_id: Optional[str] = None, + **kwargs: Any, ) -> List[str]: """Add 
nodes to index.""" for node in nodes: @@ -287,6 +289,7 @@ def delete_nodes( self, node_ids: Optional[List[str]] = None, filters: Optional[MetadataFilters] = None, + index_id: Optional[str] = None, **delete_kwargs: Any, ) -> None: filter_fn = _build_metadata_filter_fn( @@ -310,13 +313,14 @@ def node_filter_fn(node_id: str) -> bool: del self.data.text_id_to_ref_doc_id[node_id] self.data.metadata_dict.pop(node_id, None) - def clear(self) -> None: + def clear(self, index_id: Optional[str] = None) -> None: """Clear the store.""" self.data = SimpleVectorStoreData() def query( self, query: VectorStoreQuery, + index_id: Optional[str] = None, **kwargs: Any, ) -> VectorStoreQueryResult: """Get nodes for response.""" diff --git a/llama-index-core/llama_index/core/vector_stores/types.py b/llama-index-core/llama_index/core/vector_stores/types.py index 4d582e6832c96..1299427f87997 100644 --- a/llama-index-core/llama_index/core/vector_stores/types.py +++ b/llama-index-core/llama_index/core/vector_stores/types.py @@ -333,10 +333,14 @@ class BasePydanticVectorStore(BaseComponent, ABC): def client(self) -> Any: """Get client.""" + def move_nodes(self, from_index_id: str, to_index_id: str): + pass + def get_nodes( self, node_ids: Optional[List[str]] = None, filters: Optional[MetadataFilters] = None, + index_id: Optional[str] = None, ) -> List[BaseNode]: """Get nodes from vector store.""" raise NotImplementedError("get_nodes not implemented") @@ -345,14 +349,16 @@ async def aget_nodes( self, node_ids: Optional[List[str]] = None, filters: Optional[MetadataFilters] = None, + index_id: Optional[str] = None, ) -> List[BaseNode]: """Asynchronously get nodes from vector store.""" - return self.get_nodes(node_ids, filters) + return self.get_nodes(node_ids, filters, index_id) @abstractmethod def add( self, nodes: Sequence[BaseNode], + index_id: Optional[str] = None, **kwargs: Any, ) -> List[str]: """Add nodes to vector store.""" @@ -360,6 +366,7 @@ def add( async def async_add( self, nodes: 
async def adelete_nodes(
    self,
    node_ids: Optional[List[str]] = None,
    filters: Optional[MetadataFilters] = None,
    index_id: Optional[str] = None,
    **delete_kwargs: Any,
) -> None:
    """Asynchronously delete nodes from vector store."""
    # Default async implementation simply delegates to the synchronous path.
    self.delete_nodes(node_ids=node_ids, filters=filters, index_id=index_id)
""" - return self.query(query, **kwargs) + return self.query(query, index_id, **kwargs) def persist( self, persist_path: str, fs: Optional[fsspec.AbstractFileSystem] = None diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-azureaisearch/llama_index/vector_stores/azureaisearch/base.py b/llama-index-integrations/vector_stores/llama-index-vector-stores-azureaisearch/llama_index/vector_stores/azureaisearch/base.py index 0601a408fc41c..f7712e0079f27 100644 --- a/llama-index-integrations/vector_stores/llama-index-vector-stores-azureaisearch/llama_index/vector_stores/azureaisearch/base.py +++ b/llama-index-integrations/vector_stores/llama-index-vector-stores-azureaisearch/llama_index/vector_stores/azureaisearch/base.py @@ -294,6 +294,12 @@ def _create_index(self, index_name: Optional[str]) -> None: filterable=True, hidden=self._field_mapping["id"] in self._hidden_field_keys, ), + SimpleField( + name=self._field_mapping["index_id"], + type="Edm.String", + filterable=True, + hidden=self._field_mapping["index_id"] in self._hidden_field_keys, + ), SearchableField( name=self._field_mapping["chunk"], type="Edm.String", @@ -741,6 +747,7 @@ def __init__( # Default field mapping field_mapping = { "id": id_field_key, + "index_id": "index_id", "chunk": chunk_field_key, "embedding": embedding_field_key, "metadata": metadata_string_field_key, @@ -796,9 +803,17 @@ def _default_index_mapping( return index_doc + def move_nodes(self, from_index_id: str, to_index_id: str): + nodes = self.get_nodes(index_id=from_index_id) + updates = [ + {"id": n.id_, "index_id": to_index_id} for n in nodes + ] + self._search_client.merge_documents(updates) + def add( self, nodes: List[BaseNode], + index_id: Optional[str] = None, **add_kwargs: Any, ) -> List[str]: """ @@ -825,7 +840,7 @@ def add( logger.debug(f"Processing embedding: {node.node_id}") ids.append(node.node_id) - index_document = self._create_index_document(node) + index_document = self._create_index_document(node, 
index_id) document_size = len(json.dumps(index_document).encode("utf-8")) documents.append(index_document) accumulated_size += document_size @@ -857,6 +872,7 @@ def add( async def async_add( self, nodes: List[BaseNode], + index_id: Optional[str] = None, **add_kwargs: Any, ) -> List[str]: """ @@ -891,7 +907,7 @@ async def async_add( logger.debug(f"Processing embedding: {node.node_id}") ids.append(node.node_id) - index_document = self._create_index_document(node) + index_document = self._create_index_document(node, index_id) document_size = len(json.dumps(index_document).encode("utf-8")) documents.append(index_document) accumulated_size += document_size @@ -920,10 +936,11 @@ async def async_add( return ids - def _create_index_document(self, node: BaseNode) -> Dict[str, Any]: + def _create_index_document(self, node: BaseNode, index_id: str) -> Dict[str, Any]: """Create AI Search index document from embedding result.""" doc: Dict[str, Any] = {} doc["id"] = node.node_id + doc["index_id"] = index_id doc["chunk"] = node.get_content(metadata_mode=MetadataMode.NONE) or "" doc["embedding"] = node.get_embedding() doc["doc_id"] = node.ref_doc_id @@ -1004,6 +1021,7 @@ def delete_nodes( self, node_ids: Optional[List[str]] = None, filters: Optional[MetadataFilters] = None, + index_id: Optional[str] = None, **delete_kwargs: Any, ) -> None: """ @@ -1012,7 +1030,10 @@ def delete_nodes( if node_ids is None and filters is None: raise ValueError("Either node_ids or filters must be provided") - filter = self._build_filter_delete_query(node_ids, filters) + user_filter = self._build_filter_delete_query(node_ids, filters) + filter = f'({self._field_mapping["index_id"]} eq \'{index_id}\')' + if user_filter: + filter += f' and ({user_filter})' batch_size = 1000 @@ -1038,6 +1059,7 @@ def delete_nodes( async def adelete_nodes( self, node_ids: Optional[List[str]] = None, + index_id: Optional[str] = None, filters: Optional[MetadataFilters] = None, **delete_kwargs: Any, ) -> None: @@ -1047,7 
+1069,10 @@ async def adelete_nodes( if node_ids is None and filters is None: raise ValueError("Either node_ids or filters must be provided") - filter = self._build_filter_delete_query(node_ids, filters) + user_filter = self._build_filter_delete_query(node_ids, filters) + filter = f'({self._field_mapping["index_id"]} eq \'{index_id}\')' + if user_filter: + filter += f' and ({user_filter})' batch_size = 1000 @@ -1155,10 +1180,11 @@ def _create_odata_filter(self, metadata_filters: MetadataFilters) -> str: return odata_expr - def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult: - odata_filter = None + def query(self, query: VectorStoreQuery, index_id: Optional[str] = None, **kwargs: Any) -> VectorStoreQueryResult: + odata_filter = f'{self._field_mapping["index_id"]} eq \'{index_id}\'' if query.filters is not None: - odata_filter = self._create_odata_filter(query.filters) + odata_filter = f'({odata_filter}) and ({self._create_odata_filter(query.filters)})' + azure_query_result_search: AzureQueryResultSearchBase = ( AzureQueryResultSearchDefault( query, self._field_mapping, odata_filter, self._search_client @@ -1179,17 +1205,17 @@ def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResul return azure_query_result_search.search() async def aquery( - self, query: VectorStoreQuery, **kwargs: Any + self, query: VectorStoreQuery, index_id: Optional[str] = None, **kwargs: Any ) -> VectorStoreQueryResult: - odata_filter = None + odata_filter = f'{self._field_mapping["index_id"]} eq {index_id}' # NOTE: users can provide odata_filters directly to the query odata_filters = kwargs.get("odata_filters") if odata_filters is not None: - odata_filter = odata_filter + odata_filter = f'({odata_filter}) and ({odata_filters})' else: if query.filters is not None: - odata_filter = self._create_odata_filter(query.filters) + odata_filter = f'({odata_filter}) and ({self._create_odata_filter(query.filters)})' azure_query_result_search: 
AzureQueryResultSearchBase = ( AzureQueryResultSearchDefault( @@ -1245,6 +1271,7 @@ def get_nodes( self, node_ids: Optional[List[str]] = None, filters: Optional[MetadataFilters] = None, + index_id: Optional[str] = None, limit: Optional[int] = None, ) -> List[BaseNode]: """Get nodes from the Azure AI Search index. @@ -1260,7 +1287,11 @@ def get_nodes( if not self._search_client: raise ValueError("Search client not initialized") - filter_str = self._build_filter_str(self._field_mapping, node_ids, filters) + user_filter_str = self._build_filter_str(self._field_mapping, node_ids, filters) + if user_filter_str: + filter_str = f'({self._field_mapping["index_id"]} eq \'{index_id}\') and ({user_filter_str})' + else: + filter_str = f'{self._field_mapping["index_id"]} eq \'{index_id}\'' nodes = [] batch_size = 1000 # Azure Search batch size limit @@ -1291,6 +1322,7 @@ async def aget_nodes( self, node_ids: Optional[List[str]] = None, filters: Optional[MetadataFilters] = None, + index_id: Optional[str] = None, limit: Optional[int] = None, ) -> List[BaseNode]: """Get nodes asynchronously from the Azure AI Search index. 
@@ -1306,7 +1338,8 @@ async def aget_nodes( if not self._async_search_client: raise ValueError("Async Search client not initialized") - filter_str = self._build_filter_str(self._field_mapping, node_ids, filters) + user_filter_str = self._build_filter_str(self._field_mapping, node_ids, filters) + filter_str = f'({self._field_mapping["index_id"]} eq \'{index_id}\') and ({user_filter_str})' nodes = [] batch_size = 1000 # Azure Search batch size limit diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-azureaisearch/tests/test_azureaisearch.py b/llama-index-integrations/vector_stores/llama-index-vector-stores-azureaisearch/tests/test_azureaisearch.py index 8bfa66ce77b4b..bd43c660eb85c 100644 --- a/llama-index-integrations/vector_stores/llama-index-vector-stores-azureaisearch/tests/test_azureaisearch.py +++ b/llama-index-integrations/vector_stores/llama-index-vector-stores-azureaisearch/tests/test_azureaisearch.py @@ -113,7 +113,7 @@ def test_azureaisearch_add_two_batches() -> None: vector_store = create_mock_vector_store(search_client) nodes = create_sample_documents(11) - ids = vector_store.add(nodes) + ids = vector_store.add(nodes, index_id="42") call_count = index_documents_batch_instance.add_upload_actions.call_count @@ -134,7 +134,7 @@ def test_azureaisearch_add_one_batch() -> None: vector_store = create_mock_vector_store(search_client) nodes = create_sample_documents(11) - ids = vector_store.add(nodes) + ids = vector_store.add(nodes, index_id="42") call_count = index_documents_batch_instance.add_upload_actions.call_count @@ -203,6 +203,7 @@ def test_azureaisearch_query() -> None: mock_search_results = [ { "id": "test_id_1", + "index_id": "42", "chunk": "test chunk 1", "content": "test chunk 1", "metadata": json.dumps({"key": "value1"}), @@ -211,6 +212,7 @@ def test_azureaisearch_query() -> None: }, { "id": "test_id_2", + "index_id": "42", "chunk": "test chunk 2", "content": "test chunk 2", "metadata": json.dumps({"key": "value2"}), @@ 
-230,7 +232,7 @@ def test_azureaisearch_query() -> None: ) # Execute the query - result = vector_store.query(query) + result = vector_store.query(query, index_id="42") # Assert the search method was called with correct parameters search_client.search.assert_called_once_with( @@ -242,7 +244,7 @@ def test_azureaisearch_query() -> None: ], top=2, select=["id", "content", "metadata", "doc_id"], - filter=None, + filter="index_id eq '42'" ) # Assert the result structure diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-chroma/llama_index/vector_stores/chroma/base.py b/llama-index-integrations/vector_stores/llama-index-vector-stores-chroma/llama_index/vector_stores/chroma/base.py index 8a3d953da9773..9b745fc15ec25 100644 --- a/llama-index-integrations/vector_stores/llama-index-vector-stores-chroma/llama_index/vector_stores/chroma/base.py +++ b/llama-index-integrations/vector_stores/llama-index-vector-stores-chroma/llama_index/vector_stores/chroma/base.py @@ -2,10 +2,14 @@ import logging import math -from typing import Any, Dict, Generator, List, Optional, cast +from typing import Any, Dict, Generator, List, Optional, cast, Sequence import chromadb +from chromadb.api import ClientAPI from chromadb.api.models.Collection import Collection +from chromadb.api.types import ( + IncludeEnum, +) from llama_index.core.bridge.pydantic import Field, PrivateAttr from llama_index.core.schema import BaseNode, MetadataMode, TextNode from llama_index.core.utils import truncate_text @@ -20,7 +24,6 @@ metadata_dict_to_node, node_to_metadata_dict, ) - logger = logging.getLogger(__name__) @@ -141,6 +144,7 @@ class ChromaVectorStore(BasePydanticVectorStore): flat_metadata: bool = True collection_name: Optional[str] + # _client: ClientAPI = PrivateAttr() host: Optional[str] port: Optional[str] ssl: bool @@ -182,6 +186,7 @@ def __init__( else: self._collection = cast(Collection, chroma_collection) + @classmethod def from_collection(cls, collection: Any) -> 
"ChromaVectorStore": try: @@ -235,11 +240,19 @@ def from_params( def class_name(cls) -> str: return "ChromaVectorStore" + def move_nodes(self, from_index_id: str, to_index_id: str): + all_metadatas = self._collection.get(where={"__index_id__": from_index_id}, include=[IncludeEnum.metadatas]) + ids = all_metadatas["ids"] + updated_metadatas = [{**m, "__index_id__": to_index_id} for m in all_metadatas["metadatas"]] + self._collection.update(ids, metadatas=updated_metadatas) + def get_nodes( self, - node_ids: Optional[List[str]], - filters: Optional[List[MetadataFilters]] = None, + node_ids: Optional[List[str]] = None, + filters: Optional[MetadataFilters] = None, + index_id: Optional[str] = None, ) -> List[BaseNode]: + """Get nodes from index. Args: @@ -247,9 +260,8 @@ def get_nodes( filters (List[MetadataFilters]): list of metadata filters """ - if not self._collection: - raise ValueError("Collection not initialized") - + # if not self._collection: + # raise ValueError("Collection not initialized") node_ids = node_ids or [] if filters: @@ -257,14 +269,16 @@ def get_nodes( else: where = None - result = self._get(None, where=where, ids=node_ids) + result = self._get(None, where=where, ids=node_ids, index_id=index_id) return result.nodes - def add(self, nodes: List[BaseNode], **add_kwargs: Any) -> List[str]: + def add(self, nodes: List[BaseNode], index_id: Optional[str] = None, **add_kwargs: Any) -> List[str]: + """Add nodes to index. 
def clear(self, index_id: Optional[str] = None) -> None:
    """Remove nodes from the collection.

    Args:
        index_id: When given, only nodes tagged with this index id are
            deleted. When ``None``, the whole collection is cleared — this
            matches the base-class ``clear()`` contract (the patched code
            filtered on ``{"__index_id__": None}``, which clears nothing
            for callers that pass no index id).
    """
    where = {"__index_id__": index_id} if index_id is not None else None
    ids = self._collection.get(where=where)["ids"]
    if ids:
        # Avoid issuing a delete with an empty id list.
        self._collection.delete(ids=ids)

@property
def client(self) -> Any:
    """Return the underlying Chroma collection.

    NOTE(review): the patched version returned ``self._client``, but the
    ``_client`` attribute declaration is commented out
    (``# _client: ClientAPI = PrivateAttr()``), so that would raise
    ``AttributeError``. Keep returning the collection until a real client
    handle is actually stored.
    """
    return self._collection
Args: @@ -366,18 +393,27 @@ def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResul where = kwargs.pop("where", None) if not query.query_embedding: - return self._get(limit=query.similarity_top_k, where=where, **kwargs) + return self._get(limit=query.similarity_top_k, where=where, index_id=index_id, **kwargs) return self._query( query_embeddings=query.query_embedding, n_results=query.similarity_top_k, where=where, + index_id=index_id, **kwargs, ) def _query( - self, query_embeddings: List["float"], n_results: int, where: dict, **kwargs + self, query_embeddings: List["float"], n_results: int, where: dict, index_id: str, **kwargs ) -> VectorStoreQueryResult: + + where = { + "$and": [ + {"__index_id__": index_id}, + where + ] + } if where else {"__index_id__": index_id} + results = self._collection.query( query_embeddings=query_embeddings, n_results=n_results, @@ -427,8 +463,18 @@ def _query( return VectorStoreQueryResult(nodes=nodes, similarities=similarities, ids=ids) def _get( - self, limit: Optional[int], where: dict, **kwargs + self, limit: Optional[int], where: dict, index_id: str, **kwargs ) -> VectorStoreQueryResult: + + where = { + "$and": [ + { + "__index_id__": index_id, + }, + where + ] + } if where else {"__index_id__": index_id} + results = self._collection.get( limit=limit, where=where, @@ -442,10 +488,9 @@ def _get( if not results["ids"]: results["ids"] = [[]] - for node_id, text, metadata in zip( - results["ids"], results["documents"], results["metadatas"] - ): + for node_id, text, metadata in zip(results["ids"], results["documents"], results["metadatas"]): try: + metadata.pop("__index_id__") node = metadata_dict_to_node(metadata) node.set_content(text) except Exception: diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-chroma/tests/test_chromadb.py b/llama-index-integrations/vector_stores/llama-index-vector-stores-chroma/tests/test_chromadb.py index 050cfbecd63e7..6e3e1cb9bd30c 100644 --- 
a/llama-index-integrations/vector_stores/llama-index-vector-stores-chroma/tests/test_chromadb.py +++ b/llama-index-integrations/vector_stores/llama-index-vector-stores-chroma/tests/test_chromadb.py @@ -149,14 +149,14 @@ async def test_add_to_chromadb_and_query( use_async: bool, ) -> None: if use_async: - await vector_store.async_add(node_embeddings) + await vector_store.async_add(node_embeddings, index_id="42") res = await vector_store.aquery( - VectorStoreQuery(query_embedding=[1.0, 0.0, 0.0], similarity_top_k=1) + VectorStoreQuery(query_embedding=[1.0, 0.0, 0.0], similarity_top_k=1), index_id="42" ) else: - vector_store.add(node_embeddings) + vector_store.add(node_embeddings, index_id="42") res = vector_store.query( - VectorStoreQuery(query_embedding=[1.0, 0.0, 0.0], similarity_top_k=1) + VectorStoreQuery(query_embedding=[1.0, 0.0, 0.0], similarity_top_k=1), index_id="42" ) assert res.nodes assert res.nodes[0].get_content() == "lorem ipsum" @@ -179,13 +179,14 @@ async def test_add_to_chromadb_and_query_by_metafilters_only( ) if use_async: - await vector_store.async_add(node_embeddings) + await vector_store.async_add(node_embeddings, index_id="42") res = await vector_store.aquery( - VectorStoreQuery(filters=filters, similarity_top_k=1) + VectorStoreQuery(filters=filters, similarity_top_k=1), + index_id="42" ) else: - vector_store.add(node_embeddings) - res = vector_store.query(VectorStoreQuery(filters=filters, similarity_top_k=1)) + vector_store.add(node_embeddings, index_id="42") + res = vector_store.query(VectorStoreQuery(filters=filters, similarity_top_k=1), index_id="42") assert ( res.nodes[0].get_content() @@ -196,13 +197,14 @@ async def test_add_to_chromadb_and_query_by_metafilters_only( def test_get_nodes( vector_store: ChromaVectorStore, node_embeddings: List[TextNode] ) -> None: - vector_store.add(node_embeddings) + vector_store.add(node_embeddings, index_id="42") res = vector_store.get_nodes( node_ids=[ "c330d77f-90bd-4c51-9ed2-57d8d693b3b0", 
"c3d1e1dd-8fb4-4b8f-b7ea-7fa96038d39d", "c3ew11cd-8fb4-4b8f-b7ea-7fa96038d39d", - ] + ], + index_id="42" ) assert len(res) == 3 assert res[0].get_content() == "lorem ipsum" @@ -213,19 +215,21 @@ def test_get_nodes( def test_delete_nodes( vector_store: ChromaVectorStore, node_embeddings: List[TextNode] ) -> None: - vector_store.add(node_embeddings) + vector_store.add(node_embeddings, index_id="42") vector_store.delete_nodes( node_ids=[ "c330d77f-90bd-4c51-9ed2-57d8d693b3b0", "c3d1e1dd-8fb4-4b8f-b7ea-7fa96038d39d", - ] + ], + index_id="42" ) res = vector_store.get_nodes( node_ids=[ "c330d77f-90bd-4c51-9ed2-57d8d693b3b0", "c3d1e1dd-8fb4-4b8f-b7ea-7fa96038d39d", "c3ew11cd-8fb4-4b8f-b7ea-7fa96038d39d", - ] + ], + index_id = "42" ) assert len(res) == 1 assert res[0].get_content() == "lorem ipsum" @@ -235,13 +239,14 @@ def test_delete_nodes( def test_clear( vector_store: ChromaVectorStore, node_embeddings: List[TextNode] ) -> None: - vector_store.add(node_embeddings) - vector_store.clear() + vector_store.add(node_embeddings, index_id="42") + vector_store.clear(index_id="42") res = vector_store.get_nodes( node_ids=[ "c330d77f-90bd-4c51-9ed2-57d8d693b3b0", "c3d1e1dd-8fb4-4b8f-b7ea-7fa96038d39d", "c3ew11cd-8fb4-4b8f-b7ea-7fa96038d39d", - ] + ], + index_id="42" ) assert len(res) == 0 diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-postgres/llama_index/vector_stores/postgres/base.py b/llama-index-integrations/vector_stores/llama-index-vector-stores-postgres/llama_index/vector_stores/postgres/base.py index e195b1da29d2d..e622bde2fb064 100644 --- a/llama-index-integrations/vector_stores/llama-index-vector-stores-postgres/llama_index/vector_stores/postgres/base.py +++ b/llama-index-integrations/vector_stores/llama-index-vector-stores-postgres/llama_index/vector_stores/postgres/base.py @@ -7,6 +7,8 @@ import psycopg2 # noqa import sqlalchemy import sqlalchemy.ext.asyncio +from sqlalchemy.sql.elements import SQLCoreOperations + from 
def move_nodes(self, from_index_id: str, to_index_id: str) -> None:
    """Bulk-reassign every stored row from ``from_index_id`` to ``to_index_id``.

    Issues a single UPDATE statement. The ``session.begin()`` context commits
    on successful exit and rolls back on error, so no explicit ``commit()``
    is needed (the patched version also left a stray debug ``print("")``
    behind — both removed).
    """
    from sqlalchemy import update

    with self._session() as session, session.begin():
        stmt = (
            update(self._table_class)
            .where(self._table_class.index_id == from_index_id)
            .values(index_id=to_index_id)
        )
        session.execute(stmt)
_query_with_score( self, embedding: Optional[List[float]], + index_id: str, limit: int = 10, metadata_filters: Optional[MetadataFilters] = None, **kwargs: Any, ) -> List[DBEmbeddingRow]: - stmt = self._build_query(embedding, limit, metadata_filters) + stmt = self._build_query(index_id, embedding, limit, metadata_filters) with self._session() as session, session.begin(): from sqlalchemy import text @@ -614,11 +642,12 @@ def _query_with_score( async def _aquery_with_score( self, embedding: Optional[List[float]], + index_id: str, limit: int = 10, metadata_filters: Optional[MetadataFilters] = None, **kwargs: Any, ) -> List[DBEmbeddingRow]: - stmt = self._build_query(embedding, limit, metadata_filters) + stmt = self._build_query(index_id, embedding, limit, metadata_filters) async with self._async_session() as async_session, async_session.begin(): from sqlalchemy import text @@ -650,6 +679,7 @@ async def _aquery_with_score( def _build_sparse_query( self, query_str: Optional[str], + index_id: str, limit: int, metadata_filters: Optional[MetadataFilters] = None, ) -> Any: @@ -694,15 +724,16 @@ def get_col_spec(self, **kw: Any) -> str: ) # type: ignore - return self._apply_filters_and_limit(stmt, limit, metadata_filters) + return self._apply_filters_and_limit(stmt, index_id, limit, metadata_filters) async def _async_sparse_query_with_rank( self, query_str: Optional[str] = None, + index_id: Optional[str] = None, limit: int = 10, metadata_filters: Optional[MetadataFilters] = None, ) -> List[DBEmbeddingRow]: - stmt = self._build_sparse_query(query_str, limit, metadata_filters) + stmt = self._build_sparse_query(query_str, index_id, limit, metadata_filters) async with self._async_session() as async_session, async_session.begin(): res = await async_session.execute(stmt) return [ @@ -718,10 +749,11 @@ async def _async_sparse_query_with_rank( def _sparse_query_with_rank( self, query_str: Optional[str] = None, + index_id: Optional[str] = None, limit: int = 10, metadata_filters: 
Optional[MetadataFilters] = None, ) -> List[DBEmbeddingRow]: - stmt = self._build_sparse_query(query_str, limit, metadata_filters) + stmt = self._build_sparse_query(query_str, index_id, limit, metadata_filters) with self._session() as session, session.begin(): res = session.execute(stmt) return [ @@ -735,7 +767,7 @@ def _sparse_query_with_rank( ] async def _async_hybrid_query( - self, query: VectorStoreQuery, **kwargs: Any + self, query: VectorStoreQuery, index_id: str, **kwargs: Any ) -> List[DBEmbeddingRow]: import asyncio @@ -747,12 +779,13 @@ async def _async_hybrid_query( results = await asyncio.gather( self._aquery_with_score( query.query_embedding, + index_id, query.similarity_top_k, query.filters, **kwargs, ), self._async_sparse_query_with_rank( - query.query_str, sparse_top_k, query.filters + query.query_str, index_id, sparse_top_k, query.filters ), ) @@ -761,7 +794,7 @@ async def _async_hybrid_query( return _dedup_results(all_results) def _hybrid_query( - self, query: VectorStoreQuery, **kwargs: Any + self, query: VectorStoreQuery, index_id: str, **kwargs: Any ) -> List[DBEmbeddingRow]: if query.alpha is not None: _logger.warning("postgres hybrid search does not support alpha parameter.") @@ -770,13 +803,14 @@ def _hybrid_query( dense_results = self._query_with_score( query.query_embedding, + index_id, query.similarity_top_k, query.filters, **kwargs, ) sparse_results = self._sparse_query_with_rank( - query.query_str, sparse_top_k, query.filters + query.query_str, index_id, sparse_top_k, query.filters ) all_results = dense_results + sparse_results @@ -810,22 +844,23 @@ def _db_rows_to_query_result( ) async def aquery( - self, query: VectorStoreQuery, **kwargs: Any + self, query: VectorStoreQuery, index_id: Optional[str] = None, **kwargs: Any ) -> VectorStoreQueryResult: self._initialize() if query.mode == VectorStoreQueryMode.HYBRID: - results = await self._async_hybrid_query(query, **kwargs) + results = await self._async_hybrid_query(query, index_id, 
**kwargs) elif query.mode in [ VectorStoreQueryMode.SPARSE, VectorStoreQueryMode.TEXT_SEARCH, ]: sparse_top_k = query.sparse_top_k or query.similarity_top_k results = await self._async_sparse_query_with_rank( - query.query_str, sparse_top_k, query.filters + query.query_str, index_id, sparse_top_k, query.filters ) elif query.mode == VectorStoreQueryMode.DEFAULT: results = await self._aquery_with_score( query.query_embedding, + index_id, query.similarity_top_k, query.filters, **kwargs, @@ -835,21 +870,22 @@ async def aquery( return self._db_rows_to_query_result(results) - def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult: + def query(self, query: VectorStoreQuery, index_id: Optional[str] = None, **kwargs: Any) -> VectorStoreQueryResult: self._initialize() if query.mode == VectorStoreQueryMode.HYBRID: - results = self._hybrid_query(query, **kwargs) + results = self._hybrid_query(query, index_id, **kwargs) elif query.mode in [ VectorStoreQueryMode.SPARSE, VectorStoreQueryMode.TEXT_SEARCH, ]: sparse_top_k = query.sparse_top_k or query.similarity_top_k results = self._sparse_query_with_rank( - query.query_str, sparse_top_k, query.filters + query.query_str, index_id, sparse_top_k, query.filters ) elif query.mode == VectorStoreQueryMode.DEFAULT: results = self._query_with_score( query.query_embedding, + index_id, query.similarity_top_k, query.filters, **kwargs, @@ -875,6 +911,7 @@ def delete_nodes( self, node_ids: Optional[List[str]] = None, filters: Optional[MetadataFilters] = None, + index_id: Optional[str] = None, **delete_kwargs: Any, ) -> None: """Deletes nodes. 
@@ -887,16 +924,20 @@ def delete_nodes( return from sqlalchemy import delete + from sqlalchemy.sql import and_ self._initialize() with self._session() as session, session.begin(): stmt = delete(self._table_class) + wheres = [self._table_class.index_id == index_id] if node_ids: - stmt = stmt.where(self._table_class.node_id.in_(node_ids)) + wheres.append(self._table_class.node_id.in_(node_ids)) if filters: - stmt = stmt.where(self._recursively_apply_filters(filters)) + wheres.append(self._recursively_apply_filters(filters)) + + stmt = stmt.where(and_(*wheres)) session.execute(stmt) session.commit() @@ -905,6 +946,7 @@ async def adelete_nodes( self, node_ids: Optional[List[str]] = None, filters: Optional[MetadataFilters] = None, + index_id: Optional[str] = None, **delete_kwargs: Any, ) -> None: """Deletes nodes asynchronously. @@ -917,32 +959,36 @@ async def adelete_nodes( return from sqlalchemy import delete + from sqlalchemy.sql import and_ self._initialize() async with self._async_session() as async_session, async_session.begin(): stmt = delete(self._table_class) + wheres = [self._table_class.index_id == index_id] if node_ids: - stmt = stmt.where(self._table_class.node_id.in_(node_ids)) + wheres.append(self._table_class.node_id.in_(node_ids)) if filters: - stmt = stmt.where(self._recursively_apply_filters(filters)) + wheres.append(self._recursively_apply_filters(filters)) + stmt = stmt.where(and_(*wheres)) await async_session.execute(stmt) await async_session.commit() - def clear(self) -> None: + def clear(self, index_id: Optional[str] = None) -> None: """Clears table.""" from sqlalchemy import delete + from sqlalchemy.sql import and_ self._initialize() with self._session() as session, session.begin(): - stmt = delete(self._table_class) + stmt = delete(self._table_class).where(self._table_class.index_id == index_id) session.execute(stmt) session.commit() - async def aclear(self) -> None: + async def aclear(self, index_id: Optional[str] = None) -> 
None: """Asynchronously clears table.""" from sqlalchemy import delete @@ -957,6 +1003,7 @@ def get_nodes( self, node_ids: Optional[List[str]] = None, filters: Optional[MetadataFilters] = None, + index_id: Optional[str] = None, ) -> List[BaseNode]: """Get nodes from vector store.""" assert ( @@ -965,6 +1012,7 @@ def get_nodes( self._initialize() from sqlalchemy import select + from sqlalchemy.sql import and_ stmt = select( self._table_class.node_id, @@ -973,12 +1021,16 @@ def get_nodes( self._table_class.embedding, ) + wheres = [self._table_class.index_id == index_id] if node_ids: - stmt = stmt.where(self._table_class.node_id.in_(node_ids)) + wheres.append(self._table_class.node_id.in_(node_ids)) if filters: - filter_clause = self._recursively_apply_filters(filters) - stmt = stmt.where(filter_clause) + wheres.append(self._recursively_apply_filters(filters)) + + stmt = stmt.where(and_(*wheres)) + + nodes: List[BaseNode] = [] diff --git a/llama-index-integrations/vector_stores/llama-index-vector-stores-postgres/tests/test_postgres.py b/llama-index-integrations/vector_stores/llama-index-vector-stores-postgres/tests/test_postgres.py index 4804c8dba4e76..5dcc4ee66796b 100644 --- a/llama-index-integrations/vector_stores/llama-index-vector-stores-postgres/tests/test_postgres.py +++ b/llama-index-integrations/vector_stores/llama-index-vector-stores-postgres/tests/test_postgres.py @@ -25,7 +25,7 @@ PARAMS: Dict[str, Union[str, int]] = { "host": "localhost", "user": "postgres", - "password": "mark90", + "password": "password", "port": 5432, } TEST_DB = "test_vector_db" @@ -295,16 +295,16 @@ async def test_add_to_db_and_query( pg: PGVectorStore, node_embeddings: List[TextNode], use_async: bool ) -> None: if use_async: - await pg.async_add(node_embeddings) + await pg.async_add(node_embeddings, index_id="42") else: - pg.add(node_embeddings) + pg.add(node_embeddings, index_id="42") assert isinstance(pg, PGVectorStore) assert hasattr(pg, "_engine") q = 
VectorStoreQuery(query_embedding=_get_sample_vector(1.0), similarity_top_k=1) if use_async: - res = await pg.aquery(q) + res = await pg.aquery(q, index_id="42") else: - res = pg.query(q) + res = pg.query(q, index_id="42") assert res.nodes assert len(res.nodes) == 1 assert res.nodes[0].node_id == "aaa" @@ -317,18 +317,18 @@ async def test_query_hnsw( pg_hnsw: PGVectorStore, node_embeddings: List[TextNode], use_async: bool ): if use_async: - await pg_hnsw.async_add(node_embeddings) + await pg_hnsw.async_add(node_embeddings, index_id="42") else: - pg_hnsw.add(node_embeddings) + pg_hnsw.add(node_embeddings, index_id="42") assert isinstance(pg_hnsw, PGVectorStore) assert hasattr(pg_hnsw, "_engine") q = VectorStoreQuery(query_embedding=_get_sample_vector(1.0), similarity_top_k=1) if use_async: - res = await pg_hnsw.aquery(q) + res = await pg_hnsw.aquery(q, index_id="42") else: - res = pg_hnsw.query(q) + res = pg_hnsw.query(q, index_id="42") assert res.nodes assert len(res.nodes) == 1 assert res.nodes[0].node_id == "aaa" @@ -341,9 +341,9 @@ async def test_add_to_db_and_query_with_metadata_filters( pg: PGVectorStore, node_embeddings: List[TextNode], use_async: bool ) -> None: if use_async: - await pg.async_add(node_embeddings) + await pg.async_add(node_embeddings, index_id="42") else: - pg.add(node_embeddings) + pg.add(node_embeddings, index_id="42") assert isinstance(pg, PGVectorStore) assert hasattr(pg, "_engine") filters = MetadataFilters( @@ -353,9 +353,9 @@ async def test_add_to_db_and_query_with_metadata_filters( query_embedding=_get_sample_vector(0.5), similarity_top_k=10, filters=filters ) if use_async: - res = await pg.aquery(q) + res = await pg.aquery(q, index_id="42") else: - res = pg.query(q) + res = pg.query(q, index_id="42") assert res.nodes assert len(res.nodes) == 1 assert res.nodes[0].node_id == "bbb" @@ -368,9 +368,9 @@ async def test_add_to_db_and_query_with_metadata_filters_with_in_operator( pg: PGVectorStore, node_embeddings: List[TextNode], use_async: 
bool ) -> None: if use_async: - await pg.async_add(node_embeddings) + await pg.async_add(node_embeddings, index_id="42") else: - pg.add(node_embeddings) + pg.add(node_embeddings, index_id="42") assert isinstance(pg, PGVectorStore) assert hasattr(pg, "_engine") filters = MetadataFilters( @@ -386,9 +386,9 @@ async def test_add_to_db_and_query_with_metadata_filters_with_in_operator( query_embedding=_get_sample_vector(0.5), similarity_top_k=10, filters=filters ) if use_async: - res = await pg.aquery(q) + res = await pg.aquery(q, index_id="42") else: - res = pg.query(q) + res = pg.query(q, index_id="42") assert res.nodes assert len(res.nodes) == 1 assert res.nodes[0].node_id == "bbb" @@ -401,9 +401,9 @@ async def test_add_to_db_and_query_with_metadata_filters_with_in_operator_and_si pg: PGVectorStore, node_embeddings: List[TextNode], use_async: bool ) -> None: if use_async: - await pg.async_add(node_embeddings) + await pg.async_add(node_embeddings, index_id="42") else: - pg.add(node_embeddings) + pg.add(node_embeddings, index_id="42") assert isinstance(pg, PGVectorStore) assert hasattr(pg, "_engine") filters = MetadataFilters( @@ -419,9 +419,9 @@ async def test_add_to_db_and_query_with_metadata_filters_with_in_operator_and_si query_embedding=_get_sample_vector(0.5), similarity_top_k=10, filters=filters ) if use_async: - res = await pg.aquery(q) + res = await pg.aquery(q, index_id="42") else: - res = pg.query(q) + res = pg.query(q, index_id="42") assert res.nodes assert len(res.nodes) == 1 assert res.nodes[0].node_id == "bbb" @@ -434,9 +434,9 @@ async def test_add_to_db_and_query_with_metadata_filters_with_contains_operator( pg: PGVectorStore, node_embeddings: List[TextNode], use_async: bool ) -> None: if use_async: - await pg.async_add(node_embeddings) + await pg.async_add(node_embeddings, index_id="42") else: - pg.add(node_embeddings) + pg.add(node_embeddings, index_id="42") assert isinstance(pg, PGVectorStore) assert hasattr(pg, "_engine") filters = MetadataFilters( 
@@ -452,9 +452,9 @@ async def test_add_to_db_and_query_with_metadata_filters_with_contains_operator( query_embedding=_get_sample_vector(0.5), similarity_top_k=10, filters=filters ) if use_async: - res = await pg.aquery(q) + res = await pg.aquery(q, index_id="42") else: - res = pg.query(q) + res = pg.query(q, index_id="42") assert res.nodes assert len(res.nodes) == 1 assert res.nodes[0].node_id == "ccc" @@ -467,18 +467,18 @@ async def test_add_to_db_query_and_delete( pg: PGVectorStore, node_embeddings: List[TextNode], use_async: bool ) -> None: if use_async: - await pg.async_add(node_embeddings) + await pg.async_add(node_embeddings, index_id="42") else: - pg.add(node_embeddings) + pg.add(node_embeddings, index_id="42") assert isinstance(pg, PGVectorStore) assert hasattr(pg, "_engine") q = VectorStoreQuery(query_embedding=_get_sample_vector(0.1), similarity_top_k=1) if use_async: - res = await pg.aquery(q) + res = await pg.aquery(q, index_id="42") else: - res = pg.query(q) + res = pg.query(q, index_id="42") assert res.nodes assert len(res.nodes) == 1 assert res.nodes[0].node_id == "bbb" @@ -493,9 +493,9 @@ async def test_sparse_query( use_async: bool, ) -> None: if use_async: - await pg_hybrid.async_add(hybrid_node_embeddings) + await pg_hybrid.async_add(hybrid_node_embeddings, index_id="42") else: - pg_hybrid.add(hybrid_node_embeddings) + pg_hybrid.add(hybrid_node_embeddings, index_id="42") assert isinstance(pg_hybrid, PGVectorStore) assert hasattr(pg_hybrid, "_engine") @@ -508,9 +508,9 @@ async def test_sparse_query( ) if use_async: - res = await pg_hybrid.aquery(q) + res = await pg_hybrid.aquery(q, index_id="42") else: - res = pg_hybrid.query(q) + res = pg_hybrid.query(q, index_id="42") assert res.nodes assert len(res.nodes) == 2 assert res.nodes[0].node_id == "ccc" @@ -526,9 +526,9 @@ async def test_hybrid_query( use_async: bool, ) -> None: if use_async: - await pg_hybrid.async_add(hybrid_node_embeddings) + await pg_hybrid.async_add(hybrid_node_embeddings, 
index_id="42") else: - pg_hybrid.add(hybrid_node_embeddings) + pg_hybrid.add(hybrid_node_embeddings, index_id="42") assert isinstance(pg_hybrid, PGVectorStore) assert hasattr(pg_hybrid, "_engine") @@ -541,9 +541,9 @@ async def test_hybrid_query( ) if use_async: - res = await pg_hybrid.aquery(q) + res = await pg_hybrid.aquery(q, index_id="42") else: - res = pg_hybrid.query(q) + res = pg_hybrid.query(q, index_id="42") assert res.nodes assert len(res.nodes) == 3 assert res.nodes[0].node_id == "aaa" @@ -559,9 +559,9 @@ async def test_hybrid_query( ) if use_async: - res = await pg_hybrid.aquery(q) + res = await pg_hybrid.aquery(q, index_id="42") else: - res = pg_hybrid.query(q) + res = pg_hybrid.query(q, index_id="42") assert res.nodes assert len(res.nodes) == 4 assert res.nodes[0].node_id == "aaa" @@ -578,9 +578,9 @@ async def test_hybrid_query( ) if use_async: - res = await pg_hybrid.aquery(q) + res = await pg_hybrid.aquery(q, index_id="42") else: - res = pg_hybrid.query(q) + res = pg_hybrid.query(q, index_id="42") assert res.nodes assert len(res.nodes) == 4 assert res.nodes[0].node_id == "aaa" @@ -598,9 +598,9 @@ async def test_hybrid_query( use_async: bool, ) -> None: if use_async: - await pg_hnsw_hybrid.async_add(hybrid_node_embeddings) + await pg_hnsw_hybrid.async_add(hybrid_node_embeddings, index_id="42") else: - pg_hnsw_hybrid.add(hybrid_node_embeddings) + pg_hnsw_hybrid.add(hybrid_node_embeddings, index_id="42") assert isinstance(pg_hnsw_hybrid, PGVectorStore) assert hasattr(pg_hnsw_hybrid, "_engine") @@ -613,9 +613,9 @@ async def test_hybrid_query( ) if use_async: - res = await pg_hnsw_hybrid.aquery(q) + res = await pg_hnsw_hybrid.aquery(q, index_id="42") else: - res = pg_hnsw_hybrid.query(q) + res = pg_hnsw_hybrid.query(q, index_id="42") assert res.nodes assert len(res.nodes) == 3 assert res.nodes[0].node_id == "aaa" @@ -631,9 +631,9 @@ async def test_hybrid_query( ) if use_async: - res = await pg_hnsw_hybrid.aquery(q) + res = await pg_hnsw_hybrid.aquery(q, 
index_id="42") else: - res = pg_hnsw_hybrid.query(q) + res = pg_hnsw_hybrid.query(q, index_id="42") assert res.nodes assert len(res.nodes) == 4 assert res.nodes[0].node_id == "aaa" @@ -650,9 +650,9 @@ async def test_hybrid_query( ) if use_async: - res = await pg_hnsw_hybrid.aquery(q) + res = await pg_hnsw_hybrid.aquery(q, index_id="42") else: - res = pg_hnsw_hybrid.query(q) + res = pg_hnsw_hybrid.query(q, index_id="42") assert res.nodes assert len(res.nodes) == 4 assert res.nodes[0].node_id == "aaa" @@ -670,9 +670,9 @@ async def test_add_to_db_and_hybrid_query_with_metadata_filters( use_async: bool, ) -> None: if use_async: - await pg_hybrid.async_add(hybrid_node_embeddings) + await pg_hybrid.async_add(hybrid_node_embeddings, index_id="42") else: - pg_hybrid.add(hybrid_node_embeddings) + pg_hybrid.add(hybrid_node_embeddings, index_id="42") assert isinstance(pg_hybrid, PGVectorStore) assert hasattr(pg_hybrid, "_engine") filters = MetadataFilters( @@ -686,9 +686,9 @@ async def test_add_to_db_and_hybrid_query_with_metadata_filters( mode=VectorStoreQueryMode.HYBRID, ) if use_async: - res = await pg_hybrid.aquery(q) + res = await pg_hybrid.aquery(q, index_id="42") else: - res = pg_hybrid.query(q) + res = pg_hybrid.query(q, index_id="42") assert res.nodes assert len(res.nodes) == 2 assert res.nodes[0].node_id == "bbb" @@ -706,7 +706,7 @@ def test_hybrid_query_fails_if_no_query_str_provided( ) with pytest.raises(Exception) as exc: - pg_hybrid.query(q) + pg_hybrid.query(q, index_id="42") assert str(exc) == "query_str must be specified for a sparse vector query." 
@@ -718,16 +718,16 @@ async def test_add_to_db_and_query_index_nodes( pg: PGVectorStore, index_node_embeddings: List[BaseNode], use_async: bool ) -> None: if use_async: - await pg.async_add(index_node_embeddings) + await pg.async_add(index_node_embeddings, index_id="42") else: - pg.add(index_node_embeddings) + pg.add(index_node_embeddings, index_id="42") assert isinstance(pg, PGVectorStore) assert hasattr(pg, "_engine") q = VectorStoreQuery(query_embedding=_get_sample_vector(5.0), similarity_top_k=2) if use_async: - res = await pg.aquery(q) + res = await pg.aquery(q, index_id="42") else: - res = pg.query(q) + res = pg.query(q, index_id="42") assert res.nodes assert len(res.nodes) == 2 assert res.nodes[0].node_id == "aaa_ref" @@ -744,9 +744,9 @@ async def test_delete_nodes( pg: PGVectorStore, node_embeddings: List[BaseNode], use_async: bool ) -> None: if use_async: - await pg.async_add(node_embeddings) + await pg.async_add(node_embeddings, index_id="42") else: - pg.add(node_embeddings) + pg.add(node_embeddings, index_id="42") assert isinstance(pg, PGVectorStore) assert hasattr(pg, "_engine") @@ -755,35 +755,35 @@ async def test_delete_nodes( # test deleting nothing if use_async: - await pg.adelete_nodes() + await pg.adelete_nodes(index_id="42") else: - pg.delete_nodes() + pg.delete_nodes(index_id="42") if use_async: - res = await pg.aquery(q) + res = await pg.aquery(q, index_id="42") else: - res = pg.query(q) + res = pg.query(q, index_id="42") assert all(i in res.ids for i in ["aaa", "bbb", "ccc"]) # test deleting element that doesn't exist if use_async: - await pg.adelete_nodes(["asdf"]) + await pg.adelete_nodes(["asdf"], index_id="42") else: - pg.delete_nodes(["asdf"]) + pg.delete_nodes(["asdf"], index_id="42") if use_async: - res = await pg.aquery(q) + res = await pg.aquery(q, index_id="42") else: - res = pg.query(q) + res = pg.query(q, index_id="42") assert all(i in res.ids for i in ["aaa", "bbb", "ccc"]) # test deleting list if use_async: - await 
pg.adelete_nodes(["aaa", "bbb"]) + await pg.adelete_nodes(["aaa", "bbb"], index_id="42") else: - pg.delete_nodes(["aaa", "bbb"]) + pg.delete_nodes(["aaa", "bbb"], index_id="42") if use_async: - res = await pg.aquery(q) + res = await pg.aquery(q, index_id="42") else: - res = pg.query(q) + res = pg.query(q, index_id="42") assert all(i not in res.ids for i in ["aaa", "bbb"]) assert "ccc" in res.ids @@ -795,9 +795,9 @@ async def test_delete_nodes_metadata( pg: PGVectorStore, node_embeddings: List[BaseNode], use_async: bool ) -> None: if use_async: - await pg.async_add(node_embeddings) + await pg.async_add(node_embeddings, index_id="42") else: - pg.add(node_embeddings) + pg.add(node_embeddings, index_id="42") assert isinstance(pg, PGVectorStore) assert hasattr(pg, "_engine") @@ -815,13 +815,13 @@ async def test_delete_nodes_metadata( ] ) if use_async: - await pg.adelete_nodes(["aaa", "bbb"], filters=filters) + await pg.adelete_nodes(["aaa", "bbb"], filters=filters, index_id="42") else: - pg.delete_nodes(["aaa", "bbb"], filters=filters) + pg.delete_nodes(["aaa", "bbb"], filters=filters, index_id="42") if use_async: - res = await pg.aquery(q) + res = await pg.aquery(q, index_id="42") else: - res = pg.query(q) + res = pg.query(q, index_id="42") assert all(i in res.ids for i in ["aaa", "ccc", "ddd"]) assert "bbb" not in res.ids @@ -836,13 +836,13 @@ async def test_delete_nodes_metadata( ] ) if use_async: - await pg.adelete_nodes(["aaa"], filters=filters) + await pg.adelete_nodes(["aaa"], filters=filters, index_id="42") else: - pg.delete_nodes(["aaa"], filters=filters) + pg.delete_nodes(["aaa"], filters=filters, index_id="42") if use_async: - res = await pg.aquery(q) + res = await pg.aquery(q, index_id="42") else: - res = pg.query(q) + res = pg.query(q, index_id="42") assert all(i not in res.ids for i in ["bbb", "aaa"]) assert all(i in res.ids for i in ["ccc", "ddd"]) @@ -857,13 +857,13 @@ async def test_delete_nodes_metadata( ] ) if use_async: - await 
pg.adelete_nodes(["ccc"], filters=filters) + await pg.adelete_nodes(["ccc"], filters=filters, index_id="42") else: - pg.delete_nodes(["ccc"], filters=filters) + pg.delete_nodes(["ccc"], filters=filters, index_id="42") if use_async: - res = await pg.aquery(q) + res = await pg.aquery(q, index_id="42") else: - res = pg.query(q) + res = pg.query(q, index_id="42") assert all(i not in res.ids for i in ["bbb", "aaa"]) assert all(i in res.ids for i in ["ccc", "ddd"]) @@ -878,13 +878,13 @@ async def test_delete_nodes_metadata( ] ) if use_async: - await pg.adelete_nodes(filters=filters) + await pg.adelete_nodes(filters=filters, index_id="42") else: - pg.delete_nodes(filters=filters) + pg.delete_nodes(filters=filters, index_id="42") if use_async: - res = await pg.aquery(q) + res = await pg.aquery(q, index_id="42") else: - res = pg.query(q) + res = pg.query(q, index_id="42") assert all(i not in res.ids for i in ["bbb", "aaa", "ddd"]) assert "ccc" in res.ids @@ -903,9 +903,9 @@ async def test_hnsw_index_creation( # calling add will make the db initialization run for pg in pg_hnsw_multiple: if use_async: - await pg.async_add(node_embeddings) + await pg.async_add(node_embeddings, index_id="42") else: - pg.add(node_embeddings) + pg.add(node_embeddings, index_id="42") # these are the actual table and index names that PGVectorStore automatically created data_test_table_name = f"data_{TEST_TABLE_NAME}" @@ -931,9 +931,9 @@ async def test_clear( pg: PGVectorStore, node_embeddings: List[BaseNode], use_async: bool ) -> None: if use_async: - await pg.async_add(node_embeddings) + await pg.async_add(node_embeddings, index_id="42") else: - pg.add(node_embeddings) + pg.add(node_embeddings, index_id="42") assert isinstance(pg, PGVectorStore) assert hasattr(pg, "_engine") @@ -941,20 +941,20 @@ async def test_clear( q = VectorStoreQuery(query_embedding=_get_sample_vector(0.5), similarity_top_k=10) if use_async: - res = await pg.aquery(q) + res = await pg.aquery(q, index_id="42") else: - res = 
pg.query(q) + res = pg.query(q, index_id="42") assert all(i in res.ids for i in ["bbb", "aaa", "ddd", "ccc"]) if use_async: - await pg.aclear() + await pg.aclear(index_id="42") else: - pg.clear() + pg.clear(index_id="42") if use_async: - res = await pg.aquery(q) + res = await pg.aquery(q, index_id="42") else: - res = pg.query(q) + res = pg.query(q, index_id="42") assert all(i not in res.ids for i in ["bbb", "aaa", "ddd", "ccc"]) assert len(res.ids) == 0 @@ -1026,8 +1026,8 @@ def test_get_nodes_parametrized( expected_node_ids: List[str], ) -> None: """Test get_nodes method with various combinations of node_ids and filters.""" - pg.add(node_embeddings) - nodes = pg.get_nodes(node_ids=node_ids, filters=filters) + pg.add(node_embeddings, index_id="42") + nodes = pg.get_nodes(node_ids=node_ids, filters=filters, index_id="42") retrieved_ids = [node.node_id for node in nodes] assert set(retrieved_ids) == set(expected_node_ids) assert len(retrieved_ids) == len(expected_node_ids) diff --git a/testsrc/storage_structure/__init__.py b/testsrc/storage_structure/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/testsrc/storage_structure/reverse_engineer_structure.py b/testsrc/storage_structure/reverse_engineer_structure.py new file mode 100644 index 0000000000000..427b20b95379c --- /dev/null +++ b/testsrc/storage_structure/reverse_engineer_structure.py @@ -0,0 +1,50 @@ +import logging +import sys + +from llama_index.embeddings.huggingface import HuggingFaceEmbedding + +from llama_index.core.node_parser import SentenceSplitter +from llama_index.llms.openai import OpenAI + +logging.basicConfig(stream=sys.stdout, level=logging.INFO) +logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout)) + +from llama_index.core import ( + VectorStoreIndex, + SimpleDirectoryReader, + load_index_from_storage, + StorageContext, Settings, +) + +def main(): + storage_context = StorageContext.from_defaults() + + documents = 
SimpleDirectoryReader(input_files=["/tmp/foo.txt"]).load_data() + documents[0].id_ = "DOC_1" + splitter = SentenceSplitter( + chunk_size=15, + chunk_overlap=0, + id_func=lambda i, node: f"NODE_{i}", + ) + index = VectorStoreIndex.from_documents(documents, transformations=[splitter], storage_context=storage_context) + + index.set_index_id("vector_index") + index.storage_context.persist("./storage/reverse_engineer_structure") + + # Load the index from storage + storage_context = StorageContext.from_defaults(persist_dir="./storage/reverse_engineer_structure") + index = load_index_from_storage(storage_context, index_id="vector_index") + + +if __name__ == '__main__': + llm = OpenAI( + api_base="http://localhost:5000/v1", + api_key="sk-ollama", + model="gpt-4-turbo", + ) + embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-base-en-v1.5") + Settings.llm = llm + Settings.embed_model = embed_model + Settings.context_window = 8192 + main() +