Skip to content

Commit

Permalink
Change the logic about waiting for loading collection/partitions (#627)
Browse files Browse the repository at this point in the history
Signed-off-by: dragondriver <[email protected]>
  • Loading branch information
longjiquan authored and XuanYang-cn committed Aug 15, 2021
1 parent 0ab47ba commit 0888e23
Show file tree
Hide file tree
Showing 6 changed files with 469 additions and 244 deletions.
302 changes: 183 additions & 119 deletions grpc-proto/gen/milvus_pb2.py

Large diffs are not rendered by default.

13 changes: 11 additions & 2 deletions grpc-proto/milvus.proto
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ message GetCollectionStatisticsResponse {
repeated common.KeyValuePair stats = 2;
}

enum ShowCollectionsType {
enum ShowType {
All = 0;
InMemory = 1;
}
Expand All @@ -126,13 +126,17 @@ message ShowCollectionsRequest {
common.MsgBase base = 1; // must
string db_name = 2;
uint64 time_stamp = 3;
ShowCollectionsType type = 4;
ShowType type = 4;
repeated string collection_names = 5; // show collections in query nodes, showType = InMemory
}

// Response of the ShowCollections RPC.
// The pymilvus client reads collection_names and inMemory_percentages as
// parallel arrays (entry i of one describes entry i of the other) and rejects
// a response in which their lengths differ.
message ShowCollectionsResponse {
  common.Status status = 1; // clients treat a non-zero error_code as failure
  repeated string collection_names = 2;
  repeated int64 collection_ids = 3; // presumably index-aligned with collection_names -- TODO confirm
  repeated uint64 created_timestamps = 4; // hybrid timestamps
  repeated uint64 created_utc_timestamps = 5; // physical timestamps
  repeated int64 inMemory_percentages = 6; // load percentage on query nodes; the client treats 100 as fully loaded
}

message CreatePartitionRequest {
Expand Down Expand Up @@ -187,12 +191,17 @@ message ShowPartitionsRequest {
string db_name = 2;
string collection_name = 3; // must
int64 collectionID = 4;
repeated string partition_names = 5; // show partitions in query nodes, showType = InMemory
ShowType type = 6;
}

// Response of the ShowPartitions RPC.
// The pymilvus client reads partition_names and inMemory_percentages as
// parallel arrays (entry i of one describes entry i of the other) and rejects
// a response in which their lengths differ.
message ShowPartitionsResponse {
  common.Status status = 1; // clients treat a non-zero error_code as failure
  repeated string partition_names = 2;
  repeated int64 partitionIDs = 3; // presumably index-aligned with partition_names -- TODO confirm
  repeated uint64 created_timestamps = 4; // hybrid timestamps
  repeated uint64 created_utc_timestamps = 5; // physical timestamps
  repeated int64 inMemory_percentages = 6; // load percentage on querynode; the client treats 100 as fully loaded
}

message DescribeSegmentRequest {
Expand Down
1 change: 1 addition & 0 deletions pymilvus/client/configs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# TODO(dragondriver): add more default configs to here
class DefaultConfigs:
    # Client-side default settings shared by the gRPC handler.

    # 100 * 1024 * 1024 = 104857600; presumably a size limit in bytes -- TODO confirm the unit.
    MaxSearchResultSize = 100 * 1024 * 1024
    # Pause between successive polls in the wait-for-loading loops.
    WaitTimeDurationWhenLoad = 0.5  # in seconds
69 changes: 69 additions & 0 deletions pymilvus/client/grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from .hooks import BaseSearchHook
from .client_hooks import SearchHook, HybridSearchHook
from ..settings import DefaultConfig as config
from .configs import DefaultConfigs

from .asynch import (
SearchFuture,
Expand Down Expand Up @@ -820,6 +821,9 @@ def load_collection_progress(self, collection_name, timeout=None):

@error_handler()
def wait_for_loading_collection(self, collection_name, timeout=None):
    """
    Block until the given collection has finished loading.

    Delegates to the ShowCollections-based implementation
    (`_wait_for_loading_collection_v2`) and returns whatever it returns.

    :param collection_name: name of the collection to wait for.
    :param timeout: forwarded unchanged to the underlying implementation.
    """
    return self._wait_for_loading_collection_v2(collection_name, timeout=timeout)

def _wait_for_loading_collection_v1(self, collection_name, timeout=None):
"""
Block until load collection complete.
"""
Expand All @@ -833,6 +837,32 @@ def wait_for_loading_collection(self, collection_name, timeout=None):
if 0 <= unloaded_segments.get(info.segmentID, -1) <= info.num_rows:
unloaded_segments.pop(info.segmentID)

def _wait_for_loading_collection_v2(self, collection_name, timeout=None):
    """
    Poll ShowCollections until `collection_name` reports an in-memory
    percentage of 100, then return.

    :param collection_name: name of the collection to wait for.
    :param timeout: passed to every ShowCollections call; the polling loop
        itself has no overall deadline.
    :raises BaseException: when the response status carries a non-zero error
        code, or when the name list and the percentage list differ in length.
    """
    show_request = Prepare.show_collections_request([collection_name])
    fully_loaded = False

    while not fully_loaded:
        reply = self._stub.ShowCollections.future(
            show_request, wait_for_ready=True, timeout=timeout).result()

        if reply.status.error_code != 0:
            raise BaseException(reply.status.error_code, reply.status.reason)

        # The two repeated fields are read as parallel arrays, so they must line up.
        ol = len(reply.collection_names)
        pl = len(reply.inMemory_percentages)
        if ol != pl:
            raise BaseException(ErrorCode.UnexpectedError,
                                f"len(collection_names) ({ol}) != len(inMemory_percentages) ({pl})")

        fully_loaded = any(
            name == collection_name and percentage == 100
            for name, percentage in zip(reply.collection_names, reply.inMemory_percentages)
        )

        # Not loaded yet: pause before asking again.
        if not fully_loaded:
            time.sleep(DefaultConfigs.WaitTimeDurationWhenLoad)

@error_handler()
def release_collection(self, db_name, collection_name, timeout=None):
request = Prepare.release_collection(db_name, collection_name)
Expand Down Expand Up @@ -870,6 +900,9 @@ def _check():

@error_handler()
def wait_for_loading_partitions(self, collection_name, partition_names, timeout=None):
    """
    Block until the given partitions have finished loading.

    Delegates to the ShowPartitions-based implementation
    (`_wait_for_loading_partitions_v2`) and returns whatever it returns.

    :param collection_name: name of the collection owning the partitions.
    :param partition_names: names of the partitions to wait for.
    :param timeout: forwarded unchanged to the underlying implementation.
    """
    return self._wait_for_loading_partitions_v2(collection_name, partition_names, timeout=timeout)

def _wait_for_loading_partitions_v1(self, collection_name, partition_names, timeout=None):
"""
Block until load partition complete.
"""
Expand All @@ -894,6 +927,42 @@ def wait_for_loading_partitions(self, collection_name, partition_names, timeout=
if 0 <= unloaded_segments.get(info.segmentID, -1) <= info.num_rows:
unloaded_segments.pop(info.segmentID)

def _wait_for_loading_partitions_v2(self, collection_name, partition_names, timeout=None):
    """
    Poll ShowPartitions until every partition in `partition_names` reports an
    in-memory percentage of 100, then return.

    :param collection_name: name of the collection owning the partitions.
    :param partition_names: names of the partitions to wait for.
    :param timeout: passed to every ShowPartitions call; the polling loop
        itself has no overall deadline.
    :raises BaseException: when the response status carries a non-zero error
        code, or when the name list and the percentage list differ in length.
    """
    show_request = Prepare.show_partitions_request(collection_name, partition_names)

    while True:
        reply = self._stub.ShowPartitions.future(
            show_request, wait_for_ready=True, timeout=timeout).result()

        if reply.status.error_code != 0:
            raise BaseException(reply.status.error_code, reply.status.reason)

        # The two repeated fields are read as parallel arrays, so they must line up.
        ol = len(reply.partition_names)
        pl = len(reply.inMemory_percentages)
        if ol != pl:
            raise BaseException(ErrorCode.UnexpectedError,
                                f"len(partition_names) ({ol}) != len(inMemory_percentages) ({pl})")

        # Partition name -> reported load percentage; a later duplicate overrides an earlier one.
        percentage_by_partition = dict(zip(reply.partition_names, reply.inMemory_percentages))

        # A partition missing from the response counts as 0% loaded.
        if all(percentage_by_partition.get(name, 0) == 100 for name in partition_names):
            return

        time.sleep(DefaultConfigs.WaitTimeDurationWhenLoad)

@error_handler()
def load_partitions_progress(self, collection_name, partition_names, timeout=None):
"""
Expand Down
26 changes: 22 additions & 4 deletions pymilvus/client/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import mmh3

from .exceptions import ParamError
from .check import check_pass_param

from ..grpc_gen import milvus_pb2 as grpc_types

Expand Down Expand Up @@ -143,8 +144,16 @@ def collection_stats_request(cls, collection_name):
return milvus_types.CollectionStatsRequest(collection_name=collection_name)

@classmethod
def show_collections_request(cls):
return milvus_types.ShowCollectionsRequest()
def show_collections_request(cls, collection_names=None):
    """
    Build a ShowCollectionsRequest.

    With no (or empty) `collection_names` an empty request is returned.
    Otherwise every name is passed through `check_pass_param`, the names are
    added to the request, and the request type is set to ShowType.InMemory.

    :param collection_names: optional list of collection names.
    :raises ParamError: when `collection_names` is non-empty but not a list.
    """
    request = milvus_types.ShowCollectionsRequest()
    if not collection_names:
        return request

    if not isinstance(collection_names, list):
        raise ParamError(f"collection_names must be a list of strings, but got: {collection_names}")

    for name in collection_names:
        check_pass_param(collection_name=name)

    request.collection_names.extend(collection_names)
    request.type = milvus_types.ShowType.InMemory
    return request

@classmethod
def create_partition_request(cls, collection_name, partition_name):
Expand All @@ -163,8 +172,17 @@ def partition_stats_request(cls, collection_name, partition_name):
return milvus_types.PartitionStatsRequest(collection_name=collection_name, partition_name=partition_name)

@classmethod
def show_partitions_request(cls, collection_name):
return milvus_types.ShowPartitionsRequest(collection_name=collection_name)
def show_partitions_request(cls, collection_name, partition_names=None):
    """
    Build a ShowPartitionsRequest for `collection_name`.

    The collection name is always passed through `check_pass_param`. With no
    (or empty) `partition_names` the request only carries the collection name.
    Otherwise every partition name is passed through `check_pass_param`, the
    names are added to the request, and the request type is set to
    ShowType.InMemory.

    :param collection_name: name of the collection whose partitions are shown.
    :param partition_names: optional list of partition names.
    :raises ParamError: when `partition_names` is non-empty but not a list.
    """
    check_pass_param(collection_name=collection_name)
    request = milvus_types.ShowPartitionsRequest(collection_name=collection_name)
    if not partition_names:
        return request

    if not isinstance(partition_names, list):
        raise ParamError(f"partition_names must be a list of strings, but got: {partition_names}")

    for name in partition_names:
        check_pass_param(partition_name=name)

    request.partition_names.extend(partition_names)
    request.type = milvus_types.ShowType.InMemory
    return request

@classmethod
def empty(cls):
Expand Down
Loading

0 comments on commit 0888e23

Please sign in to comment.