Skip to content

Commit

Permalink
Add optimize to get best performance
Browse files Browse the repository at this point in the history
VectorDBBench needs this so that we don't need to change
VDB every time the server breaks some rules.

This function makes sure VDB results are reproducible.

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn committed Aug 22, 2023
1 parent 0a70b58 commit 0aa6b7e
Showing 1 changed file with 41 additions and 0 deletions.
41 changes: 41 additions & 0 deletions pymilvus/orm/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import copy
import json
import time
from typing import Dict, List, Optional, Union

import pandas as pd
Expand All @@ -20,6 +21,7 @@
from pymilvus.client.types import (
CompactionPlans,
CompactionState,
LoadState,
Replica,
cmp_consistency_level,
get_consistency_level,
Expand All @@ -29,6 +31,7 @@
DataTypeNotMatchException,
ExceptionsMessage,
IndexNotExistException,
MilvusException,
PartitionAlreadyExistException,
PartitionNotExistException,
SchemaNotReadyException,
Expand Down Expand Up @@ -1360,3 +1363,41 @@ def get_replicas(self, timeout: Optional[float] = None, **kwargs) -> Replica:
def describe(self, timeout: Optional[float] = None):
    """Return the server's description of this collection.

    Args:
        timeout (``float``, optional): passed through unchanged to the
            connection's ``describe_collection`` call.

    Returns:
        Whatever ``describe_collection`` returns for ``self.name``.
    """
    return self._get_connection().describe_collection(self.name, timeout=timeout)

def optimize(self, timeout: Optional[float] = None, **kwargs):
    """Optimize the server to gain the best performance.

    The collection is flushed, then the method polls the index build
    progress until no rows are pending, triggers a compaction, waits for it
    to complete and checks the pending rows again. This repeats until a
    compaction finishes with no pending rows left. Finally, if the
    collection's load state is anything other than ``NotExist`` or
    ``NotLoad``, it is reloaded with ``_refresh=True``.

    Be careful: by default this method may block for a very long time.
    The collection should be INDEXED before calling optimize.

    Args:
        timeout (``float``, optional): overall budget in seconds for the
            whole call. ``None`` (or any falsy value such as ``0``) falls
            back to 12 hours. The flush, progress polling, compaction,
            compaction wait and reload steps each receive only the time
            that is still left of this budget.
        **kwargs: accepted for interface compatibility; currently unused.

    Raises:
        MilvusException: if the collection has no index, or if the budget
            is exhausted before the optimization finishes.
    """
    default_timeout = 12 * 60 * 60  # 12hrs
    poll_interval = 5  # seconds between two progress checks

    timeout = timeout or default_timeout

    # Use a monotonic clock so wall-clock adjustments cannot shorten or
    # extend the deadline.
    deadline = time.monotonic() + timeout
    conn = self._get_connection()

    def remaining() -> float:
        """Return the seconds left before the deadline, raising once it passed."""
        left = deadline - time.monotonic()
        if left <= 0:
            raise MilvusException(message=f"Wait for optimize timeout in {timeout}s")
        return left

    # check if indexed
    if not self.has_index():
        raise MilvusException(message="Please index before calling optimize")

    self.flush(timeout=remaining())
    index = self.index()

    def has_pending_rows() -> bool:
        info = conn.get_index_build_progress(
            self.name, index.index_name, timeout=remaining()
        )
        # A missing key yields -1, which counts as "nothing pending".
        return info.get("pending_index_rows", -1) > 0

    while True:
        if not has_pending_rows():
            # Compaction may produce new segments that need indexing, so
            # the pending rows are checked again once it has completed.
            self.compact(timeout=remaining())
            self.wait_for_compaction_completed(timeout=remaining())
            if not has_pending_rows():
                break

        # Never sleep past the deadline; remaining() raises if it has passed.
        time.sleep(min(poll_interval, remaining()))

    # Refresh a collection that is loaded (or loading) so that it picks up
    # the compacted segments.
    if conn.get_load_state(self.name) not in (LoadState.NotExist, LoadState.NotLoad):
        self.load(_refresh=True, timeout=remaining())

0 comments on commit 0aa6b7e

Please sign in to comment.