Skip to content

Commit

Permalink
Update loading case
Browse files Browse the repository at this point in the history
Signed-off-by: cqy123456 <[email protected]>
  • Loading branch information
cqy123456 authored and XuanYang-cn committed Jun 21, 2023
1 parent 5c3d135 commit 4e0d448
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 35 deletions.
2 changes: 1 addition & 1 deletion vectordb_bench/backend/clients/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def insert_embeddings(
embeddings: list[list[float]],
metadata: list[int],
kwargs: Any,
) -> int:
) -> (int, Exception):
"""Insert the embeddings to the vector database. The default number of embeddings for
each insert_embeddings is 5000.
Expand Down
6 changes: 3 additions & 3 deletions vectordb_bench/backend/clients/elastic_cloud/elastic_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def insert_embeddings(
self,
embeddings: Iterable[list[float]],
metadata: list[int],
) -> int:
) -> (int, Exception):
"""Insert the embeddings to the elasticsearch."""
assert self.client is not None, "should self.init() first"

Expand All @@ -99,10 +99,10 @@ def insert_embeddings(
]
try:
bulk_insert_res = bulk(self.client, insert_data)
return bulk_insert_res[0]
return (bulk_insert_res[0], None)
except Exception as e:
log.warning(f"Failed to insert data: {self.indice} error: {str(e)}")
raise e from None
return (0, e)

def search_embedding(
self,
Expand Down
26 changes: 16 additions & 10 deletions vectordb_bench/backend/clients/milvus/milvus.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

log = logging.getLogger(__name__)

MILVUS_LOAD_REQS_SIZE = 1.5 * 1024 *1024

class Milvus(VectorDB):
def __init__(
Expand All @@ -29,6 +30,7 @@ def __init__(
self.db_config = db_config
self.case_config = db_case_config
self.collection_name = collection_name
self.batch_size = int(MILVUS_LOAD_REQS_SIZE / (dim *4))

self._primary_field = "pk"
self._scalar_field = "id"
Expand Down Expand Up @@ -139,22 +141,26 @@ def insert_embeddings(
embeddings: Iterable[list[float]],
metadata: list[int],
**kwargs: Any,
) -> int:
) -> (int, Exception):
"""Insert embeddings into Milvus. should call self.init() first"""
# use the first insert_embeddings to init collection
assert self.col is not None
insert_data = [
metadata,
metadata,
embeddings,
]

assert len(embeddings) == len(metadata)
insert_count = 0
try:
res = self.col.insert(insert_data, **kwargs)
return len(res.primary_keys)
for batch_start_offset in range(0, len(embeddings), self.batch_size):
batch_end_offset = min(batch_start_offset + self.batch_size, len(embeddings))
insert_data = [
metadata[batch_start_offset : batch_end_offset],
metadata[batch_start_offset : batch_end_offset],
embeddings[batch_start_offset : batch_end_offset],
]
res = self.col.insert(insert_data, **kwargs)
insert_count += len(res.primary_keys)
except MilvusException as e:
log.warning("Failed to insert data")
raise e from None
return (insert_count, e)
return (insert_count, None)

def search_embedding(
self,
Expand Down
25 changes: 15 additions & 10 deletions vectordb_bench/backend/clients/pinecone/pinecone.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,22 @@ def insert_embeddings(
self,
embeddings: list[list[float]],
metadata: list[int],
) -> list[str]:
) -> (int, Exception):
assert len(embeddings) == len(metadata)
for batch_start_offset in range(0, len(embeddings), self.batch_size):
batch_end_offset = min(batch_start_offset + self.batch_size, len(embeddings))
insert_datas = []
for i in range(batch_start_offset, batch_end_offset):
insert_data = (str(metadata[i]), embeddings[i], {
self._metadata_key: metadata[i]})
insert_datas.append(insert_data)
self.index.upsert(insert_datas)
return len(embeddings)
insert_count = 0
try:
for batch_start_offset in range(0, len(embeddings), self.batch_size):
batch_end_offset = min(batch_start_offset + self.batch_size, len(embeddings))
insert_datas = []
for i in range(batch_start_offset, batch_end_offset):
insert_data = (str(metadata[i]), embeddings[i], {
self._metadata_key: metadata[i]})
insert_datas.append(insert_data)
self.index.upsert(insert_datas)
insert_count += batch_end_offset - batch_start_offset
except Exception as e:
return (insert_count, e)
return (len(embeddings), None)

def search_embedding(
self,
Expand Down
7 changes: 3 additions & 4 deletions vectordb_bench/backend/clients/qdrant_cloud/qdrant_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def insert_embeddings(
embeddings: list[list[float]],
metadata: list[int],
**kwargs: Any,
) -> list[str]:
) -> (int, Exception):
"""Insert embeddings into Milvus. should call self.init() first"""
assert self.qdrant_client is not None
try:
Expand All @@ -127,11 +127,10 @@ def insert_embeddings(
wait=True,
points=Batch(ids=metadata, payloads=[{self._primary_field: v} for v in metadata], vectors=embeddings)
)

return len(metadata)
return (len(metadata), None)
except Exception as e:
log.info(f"Failed to insert data, {e}")
raise e from None
return (0, e)

def search_embedding(
self,
Expand Down
11 changes: 6 additions & 5 deletions vectordb_bench/backend/clients/weaviate_cloud/weaviate_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ def insert_embeddings(
embeddings: Iterable[list[float]],
metadata: list[int],
**kwargs: Any,
) -> int:
) -> (int, Exception):
"""Insert embeddings into Weaviate"""
assert self.client.schema.exists(self.collection_name)

insert_count = 0
try:
with self.client.batch as batch:
with self.client.batch as batch:
batch.batch_size = len(metadata)
batch.dynamic = True
res = []
Expand All @@ -114,10 +114,11 @@ def insert_embeddings(
class_name=self.collection_name,
vector=embeddings[i]
))
return len(res)
insert_count += 1
return (len(res), None)
except WeaviateBaseError as e:
log.warning(f"Failed to insert data, error: {str(e)}")
raise e from None
return (insert_count, e)

def search_embedding(
self,
Expand Down
48 changes: 46 additions & 2 deletions vectordb_bench/backend/runner/serial_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

NUM_PER_BATCH = config.NUM_PER_BATCH
LOAD_TIMEOUT = 24 * 60 * 60
LOAD_MAX_TRY_COUNT = 10
WAITTING_TIME = 60

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -43,17 +45,59 @@ def insert_data(self, left_id: int = 0) -> int:
embeddings = all_embeddings[batch_id*NUM_PER_BATCH: (batch_id+1)*NUM_PER_BATCH]

log.debug(f"({mp.current_process().name:16}) batch [{batch_id:3}/{num_conc_batches}], Start inserting {len(metadata)} embeddings")
insert_count = self.db.insert_embeddings(
insert_count, error = self.db.insert_embeddings(
embeddings=embeddings,
metadata=metadata,
)
if error != None:
raise error
log.debug(f"({mp.current_process().name:16}) batch [{batch_id:3}/{num_conc_batches}], Finish inserting {len(metadata)} embeddings")

assert insert_count == len(metadata)
count += insert_count
log.info(f"({mp.current_process().name:16}) Finish inserting {len(all_embeddings)} embeddings in batch {NUM_PER_BATCH}")
return count

def endless_insert_data(self, left_id: int = 0) -> int:
    """Insert the whole shared dataset once, retrying batches that fail.

    Unlike ``insert_data``, a batch whose ``insert_embeddings`` call
    returns an error is retried up to ``LOAD_MAX_TRY_COUNT`` times
    (sleeping ``WAITTING_TIME`` seconds between attempts), resuming from
    the rows already acknowledged instead of aborting on the first error.

    Args:
        left_id: offset added to every train id so ids stay unique
            across repeated passes over the dataset.

    Returns:
        Total number of embeddings successfully inserted.

    Raises:
        Exception: the last error returned by ``insert_embeddings``
            once the retry budget for a batch is exhausted.
    """
    with self.db.init():
        all_embeddings = self.shared_emb

        # unique id for endlessness insertion
        all_metadata = [i + left_id for i in self.train_id]

        num_conc_batches = math.ceil(len(all_embeddings) / NUM_PER_BATCH)
        log.info(f"({mp.current_process().name:16}) Start inserting {len(all_embeddings)} embeddings in batch {NUM_PER_BATCH}")
        count = 0
        for batch_id in range(self.seq_batches):
            retry_count = 0
            already_insert_count = 0
            metadata = all_metadata[batch_id * NUM_PER_BATCH : (batch_id + 1) * NUM_PER_BATCH]
            embeddings = all_embeddings[batch_id * NUM_PER_BATCH : (batch_id + 1) * NUM_PER_BATCH]

            log.debug(f"({mp.current_process().name:16}) batch [{batch_id:3}/{num_conc_batches}], Start inserting {len(metadata)} embeddings")
            while retry_count < LOAD_MAX_TRY_COUNT:
                # Resume from the rows the previous attempt already
                # acknowledged; insert_embeddings returns (count, error).
                insert_count, error = self.db.insert_embeddings(
                    embeddings=embeddings[already_insert_count:],
                    metadata=metadata[already_insert_count:],
                )
                already_insert_count += insert_count
                # PEP 8: compare to None with `is not`, never `!=`.
                if error is not None:
                    retry_count += 1
                    time.sleep(WAITTING_TIME)

                    log.info(f"Failed to insert data, try {retry_count} time")
                    if retry_count >= LOAD_MAX_TRY_COUNT:
                        # Retry budget exhausted: surface the last error.
                        raise error
                else:
                    break
            log.debug(f"({mp.current_process().name:16}) batch [{batch_id:3}/{num_conc_batches}], Finish inserting {len(metadata)} embeddings")

            assert already_insert_count == len(metadata)
            count += already_insert_count
        log.info(f"({mp.current_process().name:16}) Finish inserting {len(all_embeddings)} embeddings in batch {NUM_PER_BATCH}")
        return count

@utils.time_it
def _insert_all_batches(self) -> int:
"""Performance case only"""
Expand All @@ -70,7 +114,7 @@ def run_endlessness(self) -> int:
with self.db.init():
self.db.ready_to_load()
while time.perf_counter() - start_time < config.CASE_TIMEOUT_IN_SECOND:
count = self.insert_data(left_id=max_load_count)
count = self.endless_insert_data(left_id=max_load_count)
max_load_count += count
times += 1
log.info(f"Loaded {times} entire dataset, current max load counts={utils.numerize(max_load_count)}, {max_load_count}")
Expand Down

0 comments on commit 4e0d448

Please sign in to comment.