From c45876cef17dee92b3f4d1711b4130aa6ecabe39 Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Mon, 1 Jul 2024 14:29:59 +0800 Subject: [PATCH] Add client aws_opensearch Signed-off-by: Cai Yudong --- README.md | 25 +-- pyproject.toml | 3 + vectordb_bench/backend/clients/__init__.py | 13 ++ .../clients/aws_opensearch/aws_opensearch.py | 159 ++++++++++++++++++ .../backend/clients/aws_opensearch/cli.py | 44 +++++ .../backend/clients/aws_opensearch/config.py | 58 +++++++ .../backend/clients/aws_opensearch/run.py | 125 ++++++++++++++ vectordb_bench/cli/vectordbbench.py | 2 + vectordb_bench/frontend/config/styles.py | 2 + 9 files changed, 419 insertions(+), 12 deletions(-) create mode 100644 vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py create mode 100644 vectordb_bench/backend/clients/aws_opensearch/cli.py create mode 100644 vectordb_bench/backend/clients/aws_opensearch/config.py create mode 100644 vectordb_bench/backend/clients/aws_opensearch/run.py diff --git a/README.md b/README.md index 120a2431b..566b6deb0 100644 --- a/README.md +++ b/README.md @@ -27,18 +27,19 @@ pip install vectordb-bench[pinecone] ``` All the database client supported -|Optional database client|install command| -|---------------|---------------| -|pymilvus(*default*)|`pip install vectordb-bench`| -|all|`pip install vectordb-bench[all]`| -|qdrant|`pip install vectordb-bench[qdrant]`| -|pinecone|`pip install vectordb-bench[pinecone]`| -|weaviate|`pip install vectordb-bench[weaviate]`| -|elastic|`pip install vectordb-bench[elastic]`| -|pgvector|`pip install vectordb-bench[pgvector]`| -|pgvecto.rs|`pip install vectordb-bench[pgvecto_rs]`| -|redis|`pip install vectordb-bench[redis]`| -|chromadb|`pip install vectordb-bench[chromadb]`| +| Optional database client | install command | +|--------------------------|---------------------------------------------| +| pymilvus(*default*) | `pip install vectordb-bench` | +| all | `pip install vectordb-bench[all]` | +| qdrant | `pip install vectordb-bench[qdrant]` | +| pinecone | `pip install vectordb-bench[pinecone]` | +| weaviate | `pip install vectordb-bench[weaviate]` | +| elastic | `pip install vectordb-bench[elastic]` | +| pgvector | `pip install vectordb-bench[pgvector]` | +| pgvecto.rs | `pip install vectordb-bench[pgvecto_rs]` | +| redis | `pip install vectordb-bench[redis]` | +| chromadb | `pip install vectordb-bench[chromadb]` | +| awsopensearch | `pip install vectordb-bench[awsopensearch]` | ### Run diff --git a/pyproject.toml b/pyproject.toml index edc3575c5..2a812179c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -62,6 +62,8 @@ all = [ "psycopg2", "psycopg", "psycopg-binary", + "opensearch-dsl==2.1.0", + "opensearch-py==2.6.0", ] qdrant = [ "qdrant-client" ] @@ -72,6 +74,7 @@ pgvector = [ "psycopg", "psycopg-binary", "pgvector" ] pgvecto_rs = [ "psycopg2" ] redis = [ "redis" ] chromadb = [ "chromadb" ] +awsopensearch = [ "awsopensearch" ] zilliz_cloud = [] [project.urls] diff --git a/vectordb_bench/backend/clients/__init__.py b/vectordb_bench/backend/clients/__init__.py index 6a99661c2..2b45b71b9 100644 --- a/vectordb_bench/backend/clients/__init__.py +++ b/vectordb_bench/backend/clients/__init__.py @@ -32,6 +32,7 @@ class DB(Enum): PgVectoRS = "PgVectoRS" Redis = "Redis" Chroma = "Chroma" + AWSOpenSearch = "OpenSearch" Test = "test" @@ -78,6 +79,10 @@ def init_cls(self) -> Type[VectorDB]: from .chroma.chroma import ChromaClient return ChromaClient + if self == DB.AWSOpenSearch: + from .aws_opensearch.aws_opensearch import AWSOpenSearch + return AWSOpenSearch + @property def config_cls(self) -> Type[DBConfig]: """Import while in use""" @@ -121,6 +126,10 @@ def config_cls(self) -> Type[DBConfig]: from .chroma.config import ChromaConfig return ChromaConfig + if self == DB.AWSOpenSearch: + from .aws_opensearch.config import AWSOpenSearchConfig + return AWSOpenSearchConfig + def case_config_cls(self, index_type: IndexType | None = None) -> Type[DBCaseConfig]: if self == DB.Milvus: from .milvus.config import _milvus_case_config @@ -150,6 +159,10 @@ def case_config_cls(self, index_type: IndexType | None = None) -> Type[DBCaseCon from .pgvecto_rs.config import _pgvecto_rs_case_config return _pgvecto_rs_case_config.get(index_type) + if self == DB.AWSOpenSearch: + from .aws_opensearch.config import AWSOpenSearchIndexConfig + return AWSOpenSearchIndexConfig + # DB.Pinecone, DB.Chroma, DB.Redis return EmptyDBCaseConfig diff --git a/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py b/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py new file mode 100644 index 000000000..5b0728ac8 --- /dev/null +++ b/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py @@ -0,0 +1,159 @@ +import logging +from contextlib import contextmanager +import time +from typing import Iterable, Type +from ..api import VectorDB, DBCaseConfig, DBConfig, IndexType +from .config import AWSOpenSearchConfig, AWSOpenSearchIndexConfig +from opensearchpy import OpenSearch +from opensearchpy.helpers import bulk + +log = logging.getLogger(__name__) + + +class AWSOpenSearch(VectorDB): + def __init__( + self, + dim: int, + db_config: dict, + db_case_config: AWSOpenSearchIndexConfig, + index_name: str = "vdb_bench_index", # must be lowercase + id_col_name: str = "id", + vector_col_name: str = "embedding", + drop_old: bool = False, + **kwargs, + ): + self.dim = dim + self.db_config = db_config + self.case_config = db_case_config + self.index_name = index_name + self.id_col_name = id_col_name + self.category_col_names = [ + f"scalar-{categoryCount}" for categoryCount in [2, 5, 10, 100, 1000] + ] + self.vector_col_name = vector_col_name + + log.info(f"AWS_OpenSearch client config: {self.db_config}") + client = OpenSearch(**self.db_config) + if drop_old: + log.info(f"AWS_OpenSearch client drop old index: {self.index_name}") + is_existed = client.indices.exists(index=self.index_name) + if is_existed: + client.indices.delete(index=self.index_name) + self._create_index(client) + + @classmethod + def config_cls(cls) -> AWSOpenSearchConfig: + return AWSOpenSearchConfig + + @classmethod + def case_config_cls( + cls, index_type: IndexType | None = None + ) -> AWSOpenSearchIndexConfig: + return AWSOpenSearchIndexConfig + + def _create_index(self, client: OpenSearch): + settings = { + "index": { + "knn": True, + # "number_of_shards": 5, + # "refresh_interval": "600s", + } + } + mappings = { + "properties": { + self.id_col_name: {"type": "integer"}, + **{ + categoryCol: {"type": "keyword"} + for categoryCol in self.category_col_names + }, + self.vector_col_name: { + "type": "knn_vector", + "dimension": self.dim, + "method": self.case_config.index_param(), + }, + } + } + try: + client.indices.create( + index=self.index_name, body=dict(settings=settings, mappings=mappings) + ) + except Exception as e: + log.warning(f"Failed to create index: {self.index_name} error: {str(e)}") + raise e from None + + @contextmanager + def init(self) -> None: + """connect to elasticsearch""" + self.client = OpenSearch(**self.db_config) + + yield + # self.client.transport.close() + self.client = None + del self.client + + def insert_embeddings( + self, + embeddings: Iterable[list[float]], + metadata: list[int], + **kwargs, + ) -> tuple[int, Exception]: + """Insert the embeddings to the elasticsearch.""" + assert self.client is not None, "should self.init() first" + + insert_data = [] + for i in range(len(embeddings)): + insert_data.append({"index": {"_index": self.index_name, "_id": metadata[i]}}) + insert_data.append({self.vector_col_name: embeddings[i]}) + try: + resp = self.client.bulk(insert_data) + log.info(f"AWS_OpenSearch adding documents: {len(resp['items'])}") + resp = self.client.indices.stats(self.index_name) + log.info(f"Total document count in index: {resp['_all']['primaries']['indexing']['index_total']}") + return (len(embeddings), None) + except Exception as e: + log.warning(f"Failed to insert data: {self.index_name} error: {str(e)}") + time.sleep(10) + return self.insert_embeddings(embeddings, metadata) + + def search_embedding( + self, + query: list[float], + k: int = 100, + filters: dict | None = None, + ) -> list[int]: + """Get k most similar embeddings to query vector. + + Args: + query(list[float]): query embedding to look up documents similar to. + k(int): Number of most similar embeddings to return. Defaults to 100. + filters(dict, optional): filtering expression to filter the data while searching. + + Returns: + list[tuple[int, float]]: list of k most similar embeddings in (id, score) tuple to the query embedding. + """ + assert self.client is not None, "should self.init() first" + + body = { + "size": k, + "query": {"knn": {self.vector_col_name: {"vector": query, "k": k}}}, + } + try: + resp = self.client.search(index=self.index_name, body=body) + log.info(f'Search took: {resp["took"]}') + log.info(f'Search shards: {resp["_shards"]}') + log.info(f'Search hits total: {resp["hits"]["total"]}') + result = [int(d["_id"]) for d in resp["hits"]["hits"]] + # log.info(f'success! length={len(res)}') + + return result + except Exception as e: + log.warning(f"Failed to search: {self.index_name} error: {str(e)}") + raise e from None + + def optimize(self): + """optimize will be called between insertion and search in performance cases.""" + pass + + def ready_to_load(self): + """ready_to_load will be called before load in load cases.""" + pass diff --git a/vectordb_bench/backend/clients/aws_opensearch/cli.py b/vectordb_bench/backend/clients/aws_opensearch/cli.py new file mode 100644 index 000000000..5cb4ebbe1 --- /dev/null +++ b/vectordb_bench/backend/clients/aws_opensearch/cli.py @@ -0,0 +1,44 @@ +from typing import Annotated, TypedDict, Unpack + +import click +from pydantic import SecretStr + +from ....cli.cli import ( + CommonTypedDict, + HNSWFlavor2, + cli, + click_parameter_decorators_from_typed_dict, + run, +) +from .. import DB + + +class AWSOpenSearchTypedDict(TypedDict): + host: Annotated[ + str, click.option("--host", type=str, help="Db host", required=True) + ] + port: Annotated[int, click.option("--port", type=int, default=443, help="Db Port")] + user: Annotated[str, click.option("--user", type=str, default="admin", help="Db User")] + password: Annotated[str, click.option("--password", type=str, help="Db password")] + + +class AWSOpenSearchHNSWTypedDict(CommonTypedDict, AWSOpenSearchTypedDict, HNSWFlavor2): + ... + + +@cli.command() +@click_parameter_decorators_from_typed_dict(AWSOpenSearchHNSWTypedDict) +def AWSOpenSearch(**parameters: Unpack[AWSOpenSearchHNSWTypedDict]): + from .config import AWSOpenSearchConfig, AWSOpenSearchIndexConfig + run( + db=DB.AWSOpenSearch, + db_config=AWSOpenSearchConfig( + host=parameters["host"], + port=parameters["port"], + user=parameters["user"], + password=SecretStr(parameters["password"]), + ), + db_case_config=AWSOpenSearchIndexConfig( + ), + **parameters, + ) diff --git a/vectordb_bench/backend/clients/aws_opensearch/config.py b/vectordb_bench/backend/clients/aws_opensearch/config.py new file mode 100644 index 000000000..bc82380b7 --- /dev/null +++ b/vectordb_bench/backend/clients/aws_opensearch/config.py @@ -0,0 +1,58 @@ +from enum import Enum +from pydantic import SecretStr, BaseModel + +from ..api import DBConfig, DBCaseConfig, MetricType, IndexType + + +class AWSOpenSearchConfig(DBConfig, BaseModel): + host: str = "" + port: int = 443 + user: str = "" + password: SecretStr = "" + + def to_dict(self) -> dict: + return { + "hosts": [{'host': self.host, 'port': self.port}], + "http_auth": (self.user, self.password.get_secret_value()), + "use_ssl": True, + "http_compress": True, + "verify_certs": True, + "ssl_assert_hostname": False, + "ssl_show_warn": False, + "timeout": 600, + } + + +class AWSOS_Engine(Enum): + nmslib = "nmslib" + faiss = "faiss" + lucene = "Lucene" + + +class AWSOpenSearchIndexConfig(BaseModel, DBCaseConfig): + metric_type: MetricType = MetricType.L2 + engine: AWSOS_Engine = AWSOS_Engine.nmslib + efConstruction: int = 360 + M: int = 30 + + def parse_metric(self) -> str: + if self.metric_type == MetricType.IP: + return "innerproduct" # only support faiss / nmslib, not for Lucene. + elif self.metric_type == MetricType.COSINE: + return "cosinesimil" + return "l2" + + def index_param(self) -> dict: + params = { + "name": "hnsw", + "space_type": self.parse_metric(), + "engine": self.engine.value, + "parameters": { + "ef_construction": self.efConstruction, + "m": self.M + } + } + return params + + def search_param(self) -> dict: + return {} diff --git a/vectordb_bench/backend/clients/aws_opensearch/run.py b/vectordb_bench/backend/clients/aws_opensearch/run.py new file mode 100644 index 000000000..3924cbd75 --- /dev/null +++ b/vectordb_bench/backend/clients/aws_opensearch/run.py @@ -0,0 +1,125 @@ +import time, random +from opensearchpy import OpenSearch +from opensearch_dsl import Search, Document, Text, Keyword + +_HOST = 'xxxxxx.us-west-2.es.amazonaws.com' +_PORT = 443 +_AUTH = ('admin', 'xxxxxx') # For testing only. Don't store credentials in code. + +_INDEX_NAME = 'my-dsl-index' +_BATCH = 100 +_ROWS = 100 +_DIM = 128 +_TOPK = 10 + + +def create_client(): + client = OpenSearch( + hosts=[{'host': _HOST, 'port': _PORT}], + http_compress=True, # enables gzip compression for request bodies + http_auth=_AUTH, + use_ssl=True, + verify_certs=True, + ssl_assert_hostname=False, + ssl_show_warn=False, + ) + return client + + +def create_index(client, index_name): + settings = { + "index": { + "knn": True, + "number_of_shards": 1, + "refresh_interval": "5s", + } + } + mappings = { + "properties": { + "embedding": { + "type": "knn_vector", + "dimension": _DIM, + "method": { + "engine": "nmslib", + "name": "hnsw", + "space_type": "l2", + "parameters": { + "ef_construction": 128, + "m": 24, + } + } + } + } + } + + response = client.indices.create(index=index_name, body=dict(settings=settings, mappings=mappings)) + print('\nCreating index:') + print(response) + + +def delete_index(client, index_name): + response = client.indices.delete(index=index_name) + print('\nDeleting index:') + print(response) + + +def bulk_insert(client, index_name): + # Perform bulk operations + ids = [i for i in range(_ROWS)] + vec = [[random.random() for _ in range(_DIM)] for _ in range(_ROWS)] + + docs = [] + for i in range(0, _ROWS, _BATCH): + docs.clear() + for j in range(0, _BATCH): + docs.append({"index": {"_index": index_name, "_id": ids[i+j]}}) + docs.append({"embedding": vec[i+j]}) + response = client.bulk(docs) + print('\nAdding documents:', len(response['items']), response['errors']) + response = client.indices.stats(index_name) + print('\nTotal document count in index:', response['_all']['primaries']['indexing']['index_total']) + + +def search(client, index_name): + # Search for the document. + search_body = { + "size": _TOPK, + "query": { + "knn": { + "embedding": { + "vector": [random.random() for _ in range(_DIM)], + "k": _TOPK, + } + } + } + } + while True: + response = client.search(index=index_name, body=search_body) + print(f'\nSearch took: {response["took"]}') + print(f'\nSearch shards: {response["_shards"]}') + print(f'\nSearch hits total: {response["hits"]["total"]}') + result = response["hits"]["hits"] + if len(result) != 0: + print('\nSearch results:') + for hit in response["hits"]["hits"]: + print(hit["_id"], hit["_score"]) + break + else: + print('\nSearch not ready, sleep 1s') + time.sleep(1) + + +def main(): + client = create_client() + try: + create_index(client, _INDEX_NAME) + bulk_insert(client, _INDEX_NAME) + search(client, _INDEX_NAME) + delete_index(client, _INDEX_NAME) + except Exception as e: + print(e) + delete_index(client, _INDEX_NAME) + + +if __name__ == '__main__': + main() diff --git a/vectordb_bench/cli/vectordbbench.py b/vectordb_bench/cli/vectordbbench.py index 396909cd5..0b619bbec 100644 --- a/vectordb_bench/cli/vectordbbench.py +++ b/vectordb_bench/cli/vectordbbench.py @@ -4,6 +4,7 @@ from ..backend.clients.weaviate_cloud.cli import Weaviate from ..backend.clients.zilliz_cloud.cli import ZillizAutoIndex from ..backend.clients.milvus.cli import MilvusAutoIndex +from ..backend.clients.aws_opensearch.cli import AWSOpenSearch from .cli import cli @@ -14,6 +15,7 @@ cli.add_command(Test) cli.add_command(ZillizAutoIndex) cli.add_command(MilvusAutoIndex) +cli.add_command(AWSOpenSearch) if __name__ == "__main__": diff --git a/vectordb_bench/frontend/config/styles.py b/vectordb_bench/frontend/config/styles.py index 52d1017a9..3e0fdb112 100644 --- a/vectordb_bench/frontend/config/styles.py +++ b/vectordb_bench/frontend/config/styles.py @@ -46,6 +46,7 @@ def getPatternShape(i): DB.PgVectoRS: "https://assets.zilliz.com/PG_Vector_d464f2ef5f.png", DB.Redis: "https://assets.zilliz.com/Redis_Cloud_74b8bfef39.png", DB.Chroma: "https://assets.zilliz.com/chroma_ceb3f06ed7.png", + DB.AWSOpenSearch: "https://assets.zilliz.com/opensearch_1eee37584e.jpeg", } # RedisCloud color: #0D6EFD @@ -59,4 +60,5 @@ def getPatternShape(i): DB.WeaviateCloud.value: "#20C997", DB.PgVector.value: "#4C779A", DB.Redis.value: "#0D6EFD", + DB.AWSOpenSearch.value: "#0DCAF0", }