From 5857f5cb12f1c09c4e487906acafbe1398703535 Mon Sep 17 00:00:00 2001 From: Sheharyar Ahmad Date: Thu, 1 Aug 2024 13:00:09 +0500 Subject: [PATCH] Added pgvectorscale client (#355) * pgvectorscale client added * added pgvectorscale dependencies to enable independent client installation * Bug fix vector type not found in the database. --- README.md | 1 + pyproject.toml | 1 + vectordb_bench/backend/clients/__init__.py | 13 + vectordb_bench/backend/clients/api.py | 1 + .../backend/clients/pgvectorscale/config.py | 111 +++++++ .../clients/pgvectorscale/pgvectorscale.py | 272 ++++++++++++++++++ .../components/run_test/caseSelector.py | 10 + .../frontend/config/dbCaseConfigs.py | 123 ++++++++ vectordb_bench/models.py | 8 + 9 files changed, 540 insertions(+) create mode 100644 vectordb_bench/backend/clients/pgvectorscale/config.py create mode 100644 vectordb_bench/backend/clients/pgvectorscale/pgvectorscale.py diff --git a/README.md b/README.md index 11aaed3b7..1ce4564ef 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,7 @@ All the database client supported | elastic | `pip install vectordb-bench[elastic]` | | pgvector | `pip install vectordb-bench[pgvector]` | | pgvecto.rs | `pip install vectordb-bench[pgvecto_rs]` | +| pgvectorscale | `pip install vectordb-bench[pgvectorscale]` | | redis | `pip install vectordb-bench[redis]` | | memorydb | `pip install vectordb-bench[memorydb]` | | chromadb | `pip install vectordb-bench[chromadb]` | diff --git a/pyproject.toml b/pyproject.toml index de60cc349..4da9ce2cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,6 +71,7 @@ pinecone = [ "pinecone-client" ] weaviate = [ "weaviate-client" ] elastic = [ "elasticsearch" ] pgvector = [ "psycopg", "psycopg-binary", "pgvector" ] +pgvectorscale = [ "psycopg", "psycopg-binary", "pgvector" ] pgvecto_rs = [ "pgvecto_rs[psycopg3]>=0.2.1" ] redis = [ "redis" ] memorydb = [ "memorydb" ] diff --git a/vectordb_bench/backend/clients/__init__.py b/vectordb_bench/backend/clients/__init__.py index c638208d9..3e87e1fbe 100644 --- a/vectordb_bench/backend/clients/__init__.py +++ b/vectordb_bench/backend/clients/__init__.py @@ -30,6 +30,7 @@ class DB(Enum): WeaviateCloud = "WeaviateCloud" PgVector = "PgVector" PgVectoRS = "PgVectoRS" + PgVectorScale = "PgVectorScale" Redis = "Redis" MemoryDB = "MemoryDB" Chroma = "Chroma" @@ -71,6 +72,10 @@ def init_cls(self) -> Type[VectorDB]: if self == DB.PgVectoRS: from .pgvecto_rs.pgvecto_rs import PgVectoRS return PgVectoRS + + if self == DB.PgVectorScale: + from .pgvectorscale.pgvectorscale import PgVectorScale + return PgVectorScale if self == DB.Redis: from .redis.redis import Redis @@ -123,6 +128,10 @@ def config_cls(self) -> Type[DBConfig]: from .pgvecto_rs.config import PgVectoRSConfig return PgVectoRSConfig + if self == DB.PgVectorScale: + from .pgvectorscale.config import PgVectorScaleConfig + return PgVectorScaleConfig + if self == DB.Redis: from .redis.config import RedisConfig return RedisConfig @@ -172,6 +181,10 @@ def case_config_cls(self, index_type: IndexType | None = None) -> Type[DBCaseCon from .aws_opensearch.config import AWSOpenSearchIndexConfig return AWSOpenSearchIndexConfig + if self == DB.PgVectorScale: + from .pgvectorscale.config import _pgvectorscale_case_config + return _pgvectorscale_case_config.get(index_type) + # DB.Pinecone, DB.Chroma, DB.Redis return EmptyDBCaseConfig diff --git a/vectordb_bench/backend/clients/api.py b/vectordb_bench/backend/clients/api.py index d9ec5d83b..faa36712d 100644 --- a/vectordb_bench/backend/clients/api.py +++ b/vectordb_bench/backend/clients/api.py @@ -15,6 +15,7 @@ class MetricType(str, Enum): class IndexType(str, Enum): HNSW = "HNSW" DISKANN = "DISKANN" + STREAMING_DISKANN = "DISKANN" IVFFlat = "IVF_FLAT" IVFSQ8 = "IVF_SQ8" Flat = "FLAT" diff --git a/vectordb_bench/backend/clients/pgvectorscale/config.py b/vectordb_bench/backend/clients/pgvectorscale/config.py new file mode 100644 index 000000000..bd9f6106b --- /dev/null +++ b/vectordb_bench/backend/clients/pgvectorscale/config.py @@ -0,0 +1,111 @@ +from abc import abstractmethod +from typing import TypedDict +from pydantic import BaseModel, SecretStr +from typing_extensions import LiteralString +from ..api import DBCaseConfig, DBConfig, IndexType, MetricType + +POSTGRE_URL_PLACEHOLDER = "postgresql://%s:%s@%s/%s" + + +class PgVectorScaleConfigDict(TypedDict): + """These keys will be directly used as kwargs in psycopg connection string, + so the names must match exactly psycopg API""" + + user: str + password: str + host: str + port: int + dbname: str + + +class PgVectorScaleConfig(DBConfig): + user_name: SecretStr = SecretStr("postgres") + password: SecretStr + host: str = "localhost" + port: int = 5432 + db_name: str + + def to_dict(self) -> PgVectorScaleConfigDict: + user_str = self.user_name.get_secret_value() + pwd_str = self.password.get_secret_value() + return { + "host": self.host, + "port": self.port, + "dbname": self.db_name, + "user": user_str, + "password": pwd_str, + } + + +class PgVectorScaleIndexConfig(BaseModel, DBCaseConfig): + metric_type: MetricType | None = None + create_index_before_load: bool = False + create_index_after_load: bool = True + + def parse_metric(self) -> str: + if self.metric_type == MetricType.COSINE: + return "vector_cosine_ops" + return "" + + def parse_metric_fun_op(self) -> LiteralString: + if self.metric_type == MetricType.COSINE: + return "<=>" + return "" + + def parse_metric_fun_str(self) -> str: + if self.metric_type == MetricType.COSINE: + return "cosine_distance" + return "" + + @abstractmethod + def index_param(self) -> dict: + ... + + @abstractmethod + def search_param(self) -> dict: + ... + + @abstractmethod + def session_param(self) -> dict: + ... + + +class PgVectorScaleStreamingDiskANNConfig(PgVectorScaleIndexConfig): + index: IndexType = IndexType.STREAMING_DISKANN + storage_layout: str | None + num_neighbors: int | None + search_list_size: int | None + max_alpha: float | None + num_dimensions: int | None + num_bits_per_dimension: int | None + query_search_list_size: int | None + query_rescore: int | None + + def index_param(self) -> dict: + return { + "metric": self.parse_metric(), + "index_type": self.index.value, + "options": { + "storage_layout": self.storage_layout, + "num_neighbors": self.num_neighbors, + "search_list_size": self.search_list_size, + "max_alpha": self.max_alpha, + "num_dimensions": self.num_dimensions, + }, + } + + def search_param(self) -> dict: + return { + "metric": self.parse_metric(), + "metric_fun_op": self.parse_metric_fun_op(), + } + + def session_param(self) -> dict: + return { + "diskann.query_search_list_size": self.query_search_list_size, + "diskann.query_rescore": self.query_rescore, + } + +_pgvectorscale_case_config = { + IndexType.STREAMING_DISKANN: PgVectorScaleStreamingDiskANNConfig, +} diff --git a/vectordb_bench/backend/clients/pgvectorscale/pgvectorscale.py b/vectordb_bench/backend/clients/pgvectorscale/pgvectorscale.py new file mode 100644 index 000000000..7c8c314c2 --- /dev/null +++ b/vectordb_bench/backend/clients/pgvectorscale/pgvectorscale.py @@ -0,0 +1,272 @@ +"""Wrapper around the Pgvectorscale vector database over VectorDB""" + +import logging +import pprint +from contextlib import contextmanager +from typing import Any, Generator, Optional, Tuple + +import numpy as np +import psycopg +from pgvector.psycopg import register_vector +from psycopg import Connection, Cursor, sql + +from ..api import VectorDB +from .config import PgVectorScaleConfigDict, PgVectorScaleIndexConfig + +log = logging.getLogger(__name__) + + +class PgVectorScale(VectorDB): + """Use psycopg instructions""" + + conn: psycopg.Connection[Any] | None = None + coursor: psycopg.Cursor[Any] | None = None + + def __init__( + self, + dim: int, + db_config: PgVectorScaleConfigDict, + db_case_config: PgVectorScaleIndexConfig, + collection_name: str = "pg_vectorscale_collection", + drop_old: bool = False, + **kwargs, + ): + self.name = "PgVectorScale" + self.db_config = db_config + self.case_config = db_case_config + self.table_name = collection_name + self.dim = dim + + self._index_name = "pgvectorscale_index" + self._primary_field = "id" + self._vector_field = "embedding" + + self.conn, self.cursor = self._create_connection(**self.db_config) + + log.info(f"{self.name} config values: {self.db_config}\n{self.case_config}") + if not any( + ( + self.case_config.create_index_before_load, + self.case_config.create_index_after_load, + ) + ): + err = f"{self.name} config must create an index using create_index_before_load or create_index_after_load" + log.error(err) + raise RuntimeError( + f"{err}\n{pprint.pformat(self.db_config)}\n{pprint.pformat(self.case_config)}" + ) + + if drop_old: + self._drop_index() + self._drop_table() + self._create_table(dim) + if self.case_config.create_index_before_load: + self._create_index() + + self.cursor.close() + self.conn.close() + self.cursor = None + self.conn = None + + @staticmethod + def _create_connection(**kwargs) -> Tuple[Connection, Cursor]: + conn = psycopg.connect(**kwargs) + conn.cursor().execute("CREATE EXTENSION IF NOT EXISTS vectorscale CASCADE") + conn.commit() + register_vector(conn) + conn.autocommit = False + cursor = conn.cursor() + + assert conn is not None, "Connection is not initialized" + assert cursor is not None, "Cursor is not initialized" + + return conn, cursor + + @contextmanager + def init(self) -> Generator[None, None, None]: + self.conn, self.cursor = self._create_connection(**self.db_config) + + # index configuration may have commands defined that we should set during each client session + session_options: dict[str, Any] = self.case_config.session_param() + + if len(session_options) > 0: + for setting_name, setting_val in session_options.items(): + command = sql.SQL("SET {setting_name} " + "= {setting_val};").format( + setting_name=sql.Identifier(setting_name), + setting_val=sql.Identifier(str(setting_val)), + ) + log.debug(command.as_string(self.cursor)) + self.cursor.execute(command) + self.conn.commit() + + self._unfiltered_search = sql.Composed( + [ + sql.SQL("SELECT id FROM public.{} ORDER BY embedding ").format( + sql.Identifier(self.table_name) + ), + sql.SQL(self.case_config.search_param()["metric_fun_op"]), + sql.SQL(" %s::vector LIMIT %s::int"), + ] + ) + + try: + yield + finally: + self.cursor.close() + self.conn.close() + self.cursor = None + self.conn = None + + def _drop_table(self): + assert self.conn is not None, "Connection is not initialized" + assert self.cursor is not None, "Cursor is not initialized" + log.info(f"{self.name} client drop table : {self.table_name}") + + self.cursor.execute( + sql.SQL("DROP TABLE IF EXISTS public.{table_name}").format( + table_name=sql.Identifier(self.table_name) + ) + ) + self.conn.commit() + + def ready_to_load(self): + pass + + def optimize(self): + self._post_insert() + + def _post_insert(self): + log.info(f"{self.name} post insert before optimize") + if self.case_config.create_index_after_load: + self._drop_index() + self._create_index() + + def _drop_index(self): + assert self.conn is not None, "Connection is not initialized" + assert self.cursor is not None, "Cursor is not initialized" + log.info(f"{self.name} client drop index : {self._index_name}") + + drop_index_sql = sql.SQL("DROP INDEX IF EXISTS {index_name}").format( + index_name=sql.Identifier(self._index_name) + ) + log.debug(drop_index_sql.as_string(self.cursor)) + self.cursor.execute(drop_index_sql) + self.conn.commit() + + def _create_index(self): + assert self.conn is not None, "Connection is not initialized" + assert self.cursor is not None, "Cursor is not initialized" + log.info(f"{self.name} client create index : {self._index_name}") + + index_param: dict[str, Any] = self.case_config.index_param() + + options = [] + for option_name, option_val in index_param["options"].items(): + if option_val is not None: + options.append( + sql.SQL("{option_name} = {val}").format( + option_name=sql.Identifier(option_name), + val=sql.Identifier(str(option_val)), + ) + ) + + num_bits_per_dimension = "2" if self.dim < 900 else "1" + options.append( + sql.SQL("{option_name} = {val}").format( + option_name=sql.Identifier("num_bits_per_dimension"), + val=sql.Identifier(num_bits_per_dimension), + ) + ) + + if any(options): + with_clause = sql.SQL("WITH ({});").format(sql.SQL(", ").join(options)) + else: + with_clause = sql.Composed(()) + + index_create_sql = sql.SQL( + """ + CREATE INDEX IF NOT EXISTS {index_name} ON public.{table_name} + USING {index_type} (embedding {embedding_metric}) + """ + ).format( + index_name=sql.Identifier(self._index_name), + table_name=sql.Identifier(self.table_name), + index_type=sql.Identifier(index_param["index_type"].lower()), + embedding_metric=sql.Identifier(index_param["metric"]), + ) + index_create_sql_with_with_clause = ( + index_create_sql + with_clause + ).join(" ") + log.debug(index_create_sql_with_with_clause.as_string(self.cursor)) + self.cursor.execute(index_create_sql_with_with_clause) + self.conn.commit() + + def _create_table(self, dim: int): + assert self.conn is not None, "Connection is not initialized" + assert self.cursor is not None, "Cursor is not initialized" + + try: + log.info(f"{self.name} client create table : {self.table_name}") + + self.cursor.execute( + sql.SQL( + "CREATE TABLE IF NOT EXISTS public.{table_name} (id BIGINT PRIMARY KEY, embedding vector({dim}));" + ).format(table_name=sql.Identifier(self.table_name), dim=dim) + ) + self.conn.commit() + except Exception as e: + log.warning( + f"Failed to create pgvectorscale table: {self.table_name} error: {e}" + ) + raise e from None + + def insert_embeddings( + self, + embeddings: list[list[float]], + metadata: list[int], + **kwargs: Any, + ) -> Tuple[int, Optional[Exception]]: + assert self.conn is not None, "Connection is not initialized" + assert self.cursor is not None, "Cursor is not initialized" + + try: + metadata_arr = np.array(metadata) + embeddings_arr = np.array(embeddings) + + with self.cursor.copy( + sql.SQL("COPY public.{table_name} FROM STDIN (FORMAT BINARY)").format( + table_name=sql.Identifier(self.table_name) + ) + ) as copy: + copy.set_types(["bigint", "vector"]) + for i, row in enumerate(metadata_arr): + copy.write_row((row, embeddings_arr[i])) + self.conn.commit() + + if kwargs.get("last_batch"): + self._post_insert() + + return len(metadata), None + except Exception as e: + log.warning( + f"Failed to insert data into pgvector table ({self.table_name}), error: {e}" + ) + return 0, e + + def search_embedding( + self, + query: list[float], + k: int = 100, + filters: dict | None = None, + timeout: int | None = None, + ) -> list[int]: + assert self.conn is not None, "Connection is not initialized" + assert self.cursor is not None, "Cursor is not initialized" + + q = np.asarray(query) + # TODO add filters support + result = self.cursor.execute( + self._unfiltered_search, (q, k), prepare=True, binary=True + ) + + return [int(i[0]) for i in result.fetchall()] diff --git a/vectordb_bench/frontend/components/run_test/caseSelector.py b/vectordb_bench/frontend/components/run_test/caseSelector.py index 58799deff..5597bbc61 100644 --- a/vectordb_bench/frontend/components/run_test/caseSelector.py +++ b/vectordb_bench/frontend/components/run_test/caseSelector.py @@ -100,6 +100,16 @@ def caseConfigSetting(st, dbToCaseClusterConfigs, uiCaseItem: UICaseItem, active value=config.inputConfig["value"], help=config.inputHelp, ) + elif config.inputType == InputType.Float: + caseConfig[config.label] = column.number_input( + config.displayLabel if config.displayLabel else config.label.value, + step=config.inputConfig.get("step", 0.1), + min_value=config.inputConfig["min"], + max_value=config.inputConfig["max"], + key=key, + value=config.inputConfig["value"], + help=config.inputHelp, + ) k += 1 if k == 0: columns[1].write("Auto") diff --git a/vectordb_bench/frontend/config/dbCaseConfigs.py b/vectordb_bench/frontend/config/dbCaseConfigs.py index 687f1efbf..13634eca5 100644 --- a/vectordb_bench/frontend/config/dbCaseConfigs.py +++ b/vectordb_bench/frontend/config/dbCaseConfigs.py @@ -148,6 +148,7 @@ class InputType(IntEnum): Text = 20001 Number = 20002 Option = 20003 + Float = 20004 class CaseConfigInput(BaseModel): @@ -169,6 +170,7 @@ class CaseConfigInput(BaseModel): IndexType.IVFFlat.value, IndexType.IVFSQ8.value, IndexType.DISKANN.value, + IndexType.STREAMING_DISKANN.value, IndexType.Flat.value, IndexType.AUTOINDEX.value, IndexType.GPU_IVF_FLAT.value, @@ -178,6 +180,104 @@ class CaseConfigInput(BaseModel): }, ) + +CaseConfigParamInput_IndexType_PgVectorScale = CaseConfigInput( + label=CaseConfigParamType.IndexType, + inputHelp="Select Index Type", + inputType=InputType.Option, + inputConfig={ + "options": [ + IndexType.STREAMING_DISKANN.value, + ], + }, +) + + +CaseConfigParamInput_storage_layout = CaseConfigInput( + label=CaseConfigParamType.storage_layout, + inputHelp="Select Storage Layout", + inputType=InputType.Option, + inputConfig={ + "options": [ + "memory_optimized", + "plain", + ], + }, +) + +CaseConfigParamInput_num_neighbors = CaseConfigInput( + label=CaseConfigParamType.num_neighbors, + inputType=InputType.Number, + inputConfig={ + "min": 10, + "max": 300, + "value": 50, + }, + isDisplayed=lambda config: config.get(CaseConfigParamType.IndexType, None) + == IndexType.STREAMING_DISKANN.value, +) + +CaseConfigParamInput_search_list_size = CaseConfigInput( + label=CaseConfigParamType.search_list_size, + inputType=InputType.Number, + inputConfig={ + "min": 10, + "max": 300, + "value": 100, + }, + isDisplayed=lambda config: config.get(CaseConfigParamType.IndexType, None) + == IndexType.STREAMING_DISKANN.value, +) + +CaseConfigParamInput_max_alpha = CaseConfigInput( + label=CaseConfigParamType.max_alpha, + inputType=InputType.Float, + inputConfig={ + "min": 0.1, + "max": 2.0, + "value": 1.2, + }, + isDisplayed=lambda config: config.get(CaseConfigParamType.IndexType, None) + == IndexType.STREAMING_DISKANN.value, +) + +CaseConfigParamInput_num_dimensions = CaseConfigInput( + label=CaseConfigParamType.num_dimensions, + inputType=InputType.Number, + inputConfig={ + "min": 0, + "max": 2000, + "value": 0, + }, + isDisplayed=lambda config: config.get(CaseConfigParamType.IndexType, None) + == IndexType.STREAMING_DISKANN.value, +) + +CaseConfigParamInput_query_search_list_size = CaseConfigInput( + label=CaseConfigParamType.query_search_list_size, + inputType=InputType.Number, + inputConfig={ + "min": 50, + "max": 150, + "value": 100, + }, + isDisplayed=lambda config: config.get(CaseConfigParamType.IndexType, None) + == IndexType.STREAMING_DISKANN.value, +) + + +CaseConfigParamInput_query_rescore = CaseConfigInput( + label=CaseConfigParamType.query_rescore, + inputType=InputType.Number, + inputConfig={ + "min": 0, + "max": 150, + "value": 50, + }, + isDisplayed=lambda config: config.get(CaseConfigParamType.IndexType, None) + == IndexType.STREAMING_DISKANN.value, +) + CaseConfigParamInput_IndexType_PgVector = CaseConfigInput( label=CaseConfigParamType.IndexType, inputHelp="Select Index Type", @@ -427,6 +527,7 @@ class CaseConfigInput(BaseModel): in [IndexType.GPU_IVF_PQ.value], ) + CaseConfigParamInput_Nbits_PQ = CaseConfigInput( label=CaseConfigParamType.nbits, inputType=InputType.Number, @@ -770,6 +871,24 @@ class CaseConfigInput(BaseModel): CaseConfigParamInput_ZillizLevel, ] +PgVectorScaleLoadingConfig = [ + CaseConfigParamInput_IndexType_PgVectorScale, + CaseConfigParamInput_num_neighbors, + CaseConfigParamInput_storage_layout, + CaseConfigParamInput_search_list_size, + CaseConfigParamInput_max_alpha, +] + +PgVectorScalePerformanceConfig = [ + CaseConfigParamInput_IndexType_PgVectorScale, + CaseConfigParamInput_num_neighbors, + CaseConfigParamInput_storage_layout, + CaseConfigParamInput_search_list_size, + CaseConfigParamInput_max_alpha, + CaseConfigParamInput_query_rescore, + CaseConfigParamInput_query_search_list_size, +] + CASE_CONFIG_MAP = { DB.Milvus: { CaseLabel.Load: MilvusLoadConfig, @@ -794,4 +913,8 @@ class CaseConfigInput(BaseModel): CaseLabel.Load: PgVectoRSLoadingConfig, CaseLabel.Performance: PgVectoRSPerformanceConfig, }, + DB.PgVectorScale: { + CaseLabel.Load: PgVectorScaleLoadingConfig, + CaseLabel.Performance: PgVectorScalePerformanceConfig, + }, } diff --git a/vectordb_bench/models.py b/vectordb_bench/models.py index bc99e8595..75b427091 100644 --- a/vectordb_bench/models.py +++ b/vectordb_bench/models.py @@ -62,6 +62,14 @@ class CaseConfigParamType(Enum): level = "level" maintenance_work_mem = "maintenance_work_mem" max_parallel_workers = "max_parallel_workers" + storage_layout = "storage_layout" + num_neighbors = "num_neighbors" + search_list_size = "search_list_size" + max_alpha = "max_alpha" + num_dimensions = "num_dimensions" + num_bits_per_dimension = "num_bits_per_dimension" + query_search_list_size = "query_search_list_size" + query_rescore = "query_rescore" class CustomizedCase(BaseModel):