diff --git a/vectordb_bench/backend/clients/api.py b/vectordb_bench/backend/clients/api.py index 17a29c6f7..49ec18df1 100644 --- a/vectordb_bench/backend/clients/api.py +++ b/vectordb_bench/backend/clients/api.py @@ -131,7 +131,7 @@ def insert_embeddings( embeddings: list[list[float]], metadata: list[int], kwargs: Any, - ) -> int: + ) -> (int, Exception): """Insert the embeddings to the vector database. The default number of embeddings for each insert_embeddings is 5000. diff --git a/vectordb_bench/backend/clients/elastic_cloud/elastic_cloud.py b/vectordb_bench/backend/clients/elastic_cloud/elastic_cloud.py index 666f8ebe8..1f88233e8 100644 --- a/vectordb_bench/backend/clients/elastic_cloud/elastic_cloud.py +++ b/vectordb_bench/backend/clients/elastic_cloud/elastic_cloud.py @@ -83,7 +83,7 @@ def insert_embeddings( self, embeddings: Iterable[list[float]], metadata: list[int], - ) -> int: + ) -> (int, Exception): """Insert the embeddings to the elasticsearch.""" assert self.client is not None, "should self.init() first" @@ -99,10 +99,10 @@ def insert_embeddings( ] try: bulk_insert_res = bulk(self.client, insert_data) - return bulk_insert_res[0] + return (bulk_insert_res[0], None) except Exception as e: log.warning(f"Failed to insert data: {self.indice} error: {str(e)}") - raise e from None + return (0, e) def search_embedding( self, diff --git a/vectordb_bench/backend/clients/milvus/milvus.py b/vectordb_bench/backend/clients/milvus/milvus.py index e21f5f9fc..fcb5b324e 100644 --- a/vectordb_bench/backend/clients/milvus/milvus.py +++ b/vectordb_bench/backend/clients/milvus/milvus.py @@ -13,6 +13,7 @@ log = logging.getLogger(__name__) +MILVUS_LOAD_REQS_SIZE = 1.5 * 1024 *1024 class Milvus(VectorDB): def __init__( @@ -29,6 +30,7 @@ def __init__( self.db_config = db_config self.case_config = db_case_config self.collection_name = collection_name + self.batch_size = int(MILVUS_LOAD_REQS_SIZE / (dim *4)) self._primary_field = "pk" self._scalar_field = "id" @@ -139,22 +141,26 @@ def insert_embeddings( embeddings: Iterable[list[float]], metadata: list[int], **kwargs: Any, - ) -> int: + ) -> (int, Exception): """Insert embeddings into Milvus. should call self.init() first""" # use the first insert_embeddings to init collection assert self.col is not None - insert_data = [ - metadata, - metadata, - embeddings, - ] - + assert len(embeddings) == len(metadata) + insert_count = 0 try: - res = self.col.insert(insert_data, **kwargs) - return len(res.primary_keys) + for batch_start_offset in range(0, len(embeddings), self.batch_size): + batch_end_offset = min(batch_start_offset + self.batch_size, len(embeddings)) + insert_data = [ + metadata[batch_start_offset : batch_end_offset], + metadata[batch_start_offset : batch_end_offset], + embeddings[batch_start_offset : batch_end_offset], + ] + res = self.col.insert(insert_data, **kwargs) + insert_count += len(res.primary_keys) except MilvusException as e: log.warning("Failed to insert data") - raise e from None + return (insert_count, e) + return (insert_count, None) def search_embedding( self, diff --git a/vectordb_bench/backend/clients/pinecone/pinecone.py b/vectordb_bench/backend/clients/pinecone/pinecone.py index d487d4827..9cafd0466 100644 --- a/vectordb_bench/backend/clients/pinecone/pinecone.py +++ b/vectordb_bench/backend/clients/pinecone/pinecone.py @@ -76,17 +76,22 @@ def insert_embeddings( self, embeddings: list[list[float]], metadata: list[int], - ) -> list[str]: + ) -> (int, Exception): assert len(embeddings) == len(metadata) - for batch_start_offset in range(0, len(embeddings), self.batch_size): - batch_end_offset = min(batch_start_offset + self.batch_size, len(embeddings)) - insert_datas = [] - for i in range(batch_start_offset, batch_end_offset): - insert_data = (str(metadata[i]), embeddings[i], { - self._metadata_key: metadata[i]}) - insert_datas.append(insert_data) - self.index.upsert(insert_datas) - return len(embeddings) + insert_count = 0 + try: + for batch_start_offset in range(0, len(embeddings), self.batch_size): + batch_end_offset = min(batch_start_offset + self.batch_size, len(embeddings)) + insert_datas = [] + for i in range(batch_start_offset, batch_end_offset): + insert_data = (str(metadata[i]), embeddings[i], { + self._metadata_key: metadata[i]}) + insert_datas.append(insert_data) + self.index.upsert(insert_datas) + insert_count += batch_end_offset - batch_start_offset + except Exception as e: + return (insert_count, e) + return (len(embeddings), None) def search_embedding( self, diff --git a/vectordb_bench/backend/clients/qdrant_cloud/qdrant_cloud.py b/vectordb_bench/backend/clients/qdrant_cloud/qdrant_cloud.py index db256eba0..6dcbc8f03 100644 --- a/vectordb_bench/backend/clients/qdrant_cloud/qdrant_cloud.py +++ b/vectordb_bench/backend/clients/qdrant_cloud/qdrant_cloud.py @@ -117,7 +117,7 @@ def insert_embeddings( embeddings: list[list[float]], metadata: list[int], **kwargs: Any, - ) -> list[str]: + ) -> (int, Exception): """Insert embeddings into Milvus. should call self.init() first""" assert self.qdrant_client is not None try: @@ -127,11 +127,10 @@ def insert_embeddings( wait=True, points=Batch(ids=metadata, payloads=[{self._primary_field: v} for v in metadata], vectors=embeddings) ) - - return len(metadata) + return (len(metadata), None) except Exception as e: log.info(f"Failed to insert data, {e}") - raise e from None + return (0, e) def search_embedding( self, diff --git a/vectordb_bench/backend/clients/weaviate_cloud/weaviate_cloud.py b/vectordb_bench/backend/clients/weaviate_cloud/weaviate_cloud.py index b03063eaf..5852dfeb6 100644 --- a/vectordb_bench/backend/clients/weaviate_cloud/weaviate_cloud.py +++ b/vectordb_bench/backend/clients/weaviate_cloud/weaviate_cloud.py @@ -99,12 +99,12 @@ def insert_embeddings( embeddings: Iterable[list[float]], metadata: list[int], **kwargs: Any, - ) -> int: + ) -> (int, Exception): """Insert embeddings into Weaviate""" assert self.client.schema.exists(self.collection_name) - + insert_count = 0 try: - with self.client.batch as batch: + with self.client.batch as batch: batch.batch_size = len(metadata) batch.dynamic = True res = [] @@ -114,10 +114,11 @@ def insert_embeddings( class_name=self.collection_name, vector=embeddings[i] )) - return len(res) + insert_count += 1 + return (len(res), None) except WeaviateBaseError as e: log.warning(f"Failed to insert data, error: {str(e)}") - raise e from None + return (insert_count, e) def search_embedding( self, diff --git a/vectordb_bench/backend/runner/serial_runner.py b/vectordb_bench/backend/runner/serial_runner.py index 149585610..03c879b8f 100644 --- a/vectordb_bench/backend/runner/serial_runner.py +++ b/vectordb_bench/backend/runner/serial_runner.py @@ -15,6 +15,8 @@ NUM_PER_BATCH = config.NUM_PER_BATCH LOAD_TIMEOUT = 24 * 60 * 60 +LOAD_MAX_TRY_COUNT = 10 +WAITTING_TIME = 60 log = logging.getLogger(__name__) @@ -43,10 +45,12 @@ def insert_data(self, left_id: int = 0) -> int: embeddings = all_embeddings[batch_id*NUM_PER_BATCH: (batch_id+1)*NUM_PER_BATCH] log.debug(f"({mp.current_process().name:16}) batch [{batch_id:3}/{num_conc_batches}], Start inserting {len(metadata)} embeddings") - insert_count = self.db.insert_embeddings( + insert_count, error = self.db.insert_embeddings( embeddings=embeddings, metadata=metadata, ) + if error != None: + raise error log.debug(f"({mp.current_process().name:16}) batch [{batch_id:3}/{num_conc_batches}], Finish inserting {len(metadata)} embeddings") assert insert_count == len(metadata) @@ -54,6 +58,46 @@ def insert_data(self, left_id: int = 0) -> int: log.info(f"({mp.current_process().name:16}) Finish inserting {len(all_embeddings)} embeddings in batch {NUM_PER_BATCH}") return count + def endless_insert_data(self, left_id: int = 0) -> int: + with self.db.init(): + all_embeddings = self.shared_emb + + # unique id for endlessness insertion + all_metadata = [i+left_id for i in self.train_id] + + num_conc_batches = math.ceil(len(all_embeddings)/NUM_PER_BATCH) + log.info(f"({mp.current_process().name:16}) Start inserting {len(all_embeddings)} embeddings in batch {NUM_PER_BATCH}") + count = 0 + for batch_id in range(self.seq_batches): + retry_count = 0 + already_insert_count = 0 + metadata = all_metadata[batch_id*NUM_PER_BATCH : (batch_id+1)*NUM_PER_BATCH] + embeddings = all_embeddings[batch_id*NUM_PER_BATCH : (batch_id+1)*NUM_PER_BATCH] + + log.debug(f"({mp.current_process().name:16}) batch [{batch_id:3}/{num_conc_batches}], Start inserting {len(metadata)} embeddings") + while retry_count < LOAD_MAX_TRY_COUNT: + previous_beg, current_beg = 0, 0 + insert_count, error = self.db.insert_embeddings( + embeddings=embeddings[already_insert_count :], + metadata=metadata[already_insert_count :], + ) + already_insert_count += insert_count + if error != None: + retry_count += 1 + time.sleep(WAITTING_TIME) + + log.info(f"Failed to insert data, try {retry_count} time") + if retry_count >= LOAD_MAX_TRY_COUNT: + raise error + else: + break + log.debug(f"({mp.current_process().name:16}) batch [{batch_id:3}/{num_conc_batches}], Finish inserting {len(metadata)} embeddings") + + assert already_insert_count == len(metadata) + count += already_insert_count + log.info(f"({mp.current_process().name:16}) Finish inserting {len(all_embeddings)} embeddings in batch {NUM_PER_BATCH}") + return count + @utils.time_it def _insert_all_batches(self) -> int: """Performance case only""" @@ -70,7 +114,7 @@ def run_endlessness(self) -> int: with self.db.init(): self.db.ready_to_load() while time.perf_counter() - start_time < config.CASE_TIMEOUT_IN_SECOND: - count = self.insert_data(left_id=max_load_count) + count = self.endless_insert_data(left_id=max_load_count) max_load_count += count times += 1 log.info(f"Loaded {times} entire dataset, current max load counts={utils.numerize(max_load_count)}, {max_load_count}")