Skip to content

Commit

Permalink
Use FlushTs and CollectionName in GetFlushState (#1675)
Browse files Browse the repository at this point in the history
Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Sep 7, 2023
1 parent 2964d41 commit affb8a6
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 242 deletions.
20 changes: 13 additions & 7 deletions pymilvus/client/grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1216,8 +1216,10 @@ def get_collection_stats(self, collection_name: str, timeout: Optional[float] =
raise MilvusException(status.error_code, status.reason)

@retry_on_rpc_failure()
def get_flush_state(self, segment_ids: List[int], timeout: Optional[float] = None, **kwargs):
req = Prepare.get_flush_state_request(segment_ids)
def get_flush_state(
self, collection_name: str, flush_ts: int, timeout: Optional[float] = None, **kwargs
):
req = Prepare.get_flush_state_request(collection_name, flush_ts)
future = self._stub.GetFlushState.future(req, timeout=timeout)
response = future.result()
status = response.status
Expand All @@ -1238,14 +1240,18 @@ def get_persistent_segment_infos(
return response.infos # todo: A wrapper class of PersistentSegmentInfo
raise MilvusException(status.error_code, status.reason)

def _wait_for_flushed(self, segment_ids: List[int], timeout: Optional[float] = None, **kwargs):
def _wait_for_flushed(
self, collection_name: str, flush_ts: int, timeout: Optional[float] = None, **kwargs
):
flush_ret = False
start = time.time()
while not flush_ret:
flush_ret = self.get_flush_state(segment_ids, timeout, **kwargs)
flush_ret = self.get_flush_state(collection_name, flush_ts, timeout, **kwargs)
end = time.time()
if timeout is not None and end - start > timeout:
raise MilvusException(message=f"wait for flush timeout, segment ids: {segment_ids}")
raise MilvusException(
message=f"wait for flush timeout, collection: {collection_name}"
)

if not flush_ret:
time.sleep(0.5)
Expand All @@ -1266,8 +1272,8 @@ def flush(self, collection_names: list, timeout: Optional[float] = None, **kwarg

def _check():
for collection_name in collection_names:
segment_ids = future.result().coll_segIDs[collection_name].data
self._wait_for_flushed(segment_ids, timeout=timeout)
flush_ts = future.result().coll_flush_ts[collection_name]
self._wait_for_flushed(collection_name, flush_ts, timeout=timeout)

if kwargs.get("_async", False):
flush_future = FlushFuture(future)
Expand Down
4 changes: 2 additions & 2 deletions pymilvus/client/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -764,8 +764,8 @@ def get_persistent_segment_info_request(cls, collection_name: str):
return milvus_types.GetPersistentSegmentInfoRequest(collectionName=collection_name)

@classmethod
def get_flush_state_request(cls, segment_ids: List[int]):
return milvus_types.GetFlushStateRequest(segmentIDs=segment_ids)
def get_flush_state_request(cls, collection_name: str, flush_ts: int):
return milvus_types.GetFlushStateRequest(collection_name=collection_name, flush_ts=flush_ts)

@classmethod
def get_query_segment_info_request(cls, collection_name: str):
Expand Down
2 changes: 1 addition & 1 deletion pymilvus/grpc_gen/milvus-proto
452 changes: 228 additions & 224 deletions pymilvus/grpc_gen/milvus_pb2.py

Large diffs are not rendered by default.

37 changes: 29 additions & 8 deletions pymilvus/grpc_gen/milvus_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ class FlushRequest(_message.Message):
def __init__(self, base: _Optional[_Union[_common_pb2.MsgBase, _Mapping]] = ..., db_name: _Optional[str] = ..., collection_names: _Optional[_Iterable[str]] = ...) -> None: ...

class FlushResponse(_message.Message):
__slots__ = ["status", "db_name", "coll_segIDs", "flush_coll_segIDs", "coll_seal_times"]
__slots__ = ["status", "db_name", "coll_segIDs", "flush_coll_segIDs", "coll_seal_times", "coll_flush_ts"]
class CollSegIDsEntry(_message.Message):
__slots__ = ["key", "value"]
KEY_FIELD_NUMBER: _ClassVar[int]
Expand All @@ -789,17 +789,26 @@ class FlushResponse(_message.Message):
key: str
value: int
def __init__(self, key: _Optional[str] = ..., value: _Optional[int] = ...) -> None: ...
class CollFlushTsEntry(_message.Message):
__slots__ = ["key", "value"]
KEY_FIELD_NUMBER: _ClassVar[int]
VALUE_FIELD_NUMBER: _ClassVar[int]
key: str
value: int
def __init__(self, key: _Optional[str] = ..., value: _Optional[int] = ...) -> None: ...
STATUS_FIELD_NUMBER: _ClassVar[int]
DB_NAME_FIELD_NUMBER: _ClassVar[int]
COLL_SEGIDS_FIELD_NUMBER: _ClassVar[int]
FLUSH_COLL_SEGIDS_FIELD_NUMBER: _ClassVar[int]
COLL_SEAL_TIMES_FIELD_NUMBER: _ClassVar[int]
COLL_FLUSH_TS_FIELD_NUMBER: _ClassVar[int]
status: _common_pb2.Status
db_name: str
coll_segIDs: _containers.MessageMap[str, _schema_pb2.LongArray]
flush_coll_segIDs: _containers.MessageMap[str, _schema_pb2.LongArray]
coll_seal_times: _containers.ScalarMap[str, int]
def __init__(self, status: _Optional[_Union[_common_pb2.Status, _Mapping]] = ..., db_name: _Optional[str] = ..., coll_segIDs: _Optional[_Mapping[str, _schema_pb2.LongArray]] = ..., flush_coll_segIDs: _Optional[_Mapping[str, _schema_pb2.LongArray]] = ..., coll_seal_times: _Optional[_Mapping[str, int]] = ...) -> None: ...
coll_flush_ts: _containers.ScalarMap[str, int]
def __init__(self, status: _Optional[_Union[_common_pb2.Status, _Mapping]] = ..., db_name: _Optional[str] = ..., coll_segIDs: _Optional[_Mapping[str, _schema_pb2.LongArray]] = ..., flush_coll_segIDs: _Optional[_Mapping[str, _schema_pb2.LongArray]] = ..., coll_seal_times: _Optional[_Mapping[str, int]] = ..., coll_flush_ts: _Optional[_Mapping[str, int]] = ...) -> None: ...

class QueryRequest(_message.Message):
__slots__ = ["base", "db_name", "collection_name", "expr", "output_fields", "partition_names", "travel_timestamp", "guarantee_timestamp", "query_params", "not_return_all_meta", "consistency_level", "use_default_consistency"]
Expand Down Expand Up @@ -884,8 +893,12 @@ class CalcDistanceResults(_message.Message):
def __init__(self, status: _Optional[_Union[_common_pb2.Status, _Mapping]] = ..., int_dist: _Optional[_Union[_schema_pb2.IntArray, _Mapping]] = ..., float_dist: _Optional[_Union[_schema_pb2.FloatArray, _Mapping]] = ...) -> None: ...

class FlushAllRequest(_message.Message):
__slots__ = []
def __init__(self) -> None: ...
__slots__ = ["base", "db_name"]
BASE_FIELD_NUMBER: _ClassVar[int]
DB_NAME_FIELD_NUMBER: _ClassVar[int]
base: _common_pb2.MsgBase
db_name: str
def __init__(self, base: _Optional[_Union[_common_pb2.MsgBase, _Mapping]] = ..., db_name: _Optional[str] = ...) -> None: ...

class FlushAllResponse(_message.Message):
__slots__ = ["status", "flush_all_ts"]
Expand Down Expand Up @@ -1118,10 +1131,16 @@ class CompactionMergeInfo(_message.Message):
def __init__(self, sources: _Optional[_Iterable[int]] = ..., target: _Optional[int] = ...) -> None: ...

class GetFlushStateRequest(_message.Message):
__slots__ = ["segmentIDs"]
__slots__ = ["segmentIDs", "flush_ts", "db_name", "collection_name"]
SEGMENTIDS_FIELD_NUMBER: _ClassVar[int]
FLUSH_TS_FIELD_NUMBER: _ClassVar[int]
DB_NAME_FIELD_NUMBER: _ClassVar[int]
COLLECTION_NAME_FIELD_NUMBER: _ClassVar[int]
segmentIDs: _containers.RepeatedScalarFieldContainer[int]
def __init__(self, segmentIDs: _Optional[_Iterable[int]] = ...) -> None: ...
flush_ts: int
db_name: str
collection_name: str
def __init__(self, segmentIDs: _Optional[_Iterable[int]] = ..., flush_ts: _Optional[int] = ..., db_name: _Optional[str] = ..., collection_name: _Optional[str] = ...) -> None: ...

class GetFlushStateResponse(_message.Message):
__slots__ = ["status", "flushed"]
Expand All @@ -1132,12 +1151,14 @@ class GetFlushStateResponse(_message.Message):
def __init__(self, status: _Optional[_Union[_common_pb2.Status, _Mapping]] = ..., flushed: bool = ...) -> None: ...

class GetFlushAllStateRequest(_message.Message):
__slots__ = ["base", "flush_all_ts"]
__slots__ = ["base", "flush_all_ts", "db_name"]
BASE_FIELD_NUMBER: _ClassVar[int]
FLUSH_ALL_TS_FIELD_NUMBER: _ClassVar[int]
DB_NAME_FIELD_NUMBER: _ClassVar[int]
base: _common_pb2.MsgBase
flush_all_ts: int
def __init__(self, base: _Optional[_Union[_common_pb2.MsgBase, _Mapping]] = ..., flush_all_ts: _Optional[int] = ...) -> None: ...
db_name: str
def __init__(self, base: _Optional[_Union[_common_pb2.MsgBase, _Mapping]] = ..., flush_all_ts: _Optional[int] = ..., db_name: _Optional[str] = ...) -> None: ...

class GetFlushAllStateResponse(_message.Message):
__slots__ = ["status", "flushed"]
Expand Down

0 comments on commit affb8a6

Please sign in to comment.