Skip to content

Commit

Permalink
Fix search on empty segments set bug (milvus-io#26136)
Browse files Browse the repository at this point in the history
Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby authored Aug 8, 2023
1 parent 703f0a1 commit 54c0e64
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 40 deletions.
9 changes: 1 addition & 8 deletions internal/querycoordv2/observers/leader_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,21 +175,14 @@ func (o *LeaderObserver) findNeedLoadedSegments(leaderView *meta.LeaderView, dis
continue
}

readableVersion := int64(0)
if existInCurrentTarget {
readableVersion = o.target.GetCollectionTargetVersion(s.CollectionID, meta.CurrentTarget)
} else {
readableVersion = o.target.GetCollectionTargetVersion(s.CollectionID, meta.NextTarget)
}

if !ok || version.GetVersion() < s.Version { // Leader misses this segment
ctx := context.Background()
resp, err := o.broker.GetSegmentInfo(ctx, s.GetID())
if err != nil || len(resp.GetInfos()) == 0 {
log.Warn("failed to get segment info from DataCoord", zap.Error(err))
continue
}
loadInfo := utils.PackSegmentLoadInfo(resp, nil, readableVersion)
loadInfo := utils.PackSegmentLoadInfo(resp, nil)

ret = append(ret, &querypb.SyncAction{
Type: querypb.SyncType_Set,
Expand Down
6 changes: 3 additions & 3 deletions internal/querycoordv2/observers/leader_observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegments() {
view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, view)
loadInfo := utils.PackSegmentLoadInfo(resp, nil, view.TargetVersion)
loadInfo := utils.PackSegmentLoadInfo(resp, nil)

expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
return &querypb.SyncDistributionRequest{
Expand Down Expand Up @@ -218,7 +218,7 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncLoadedSegments() {
view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, view)
loadInfo := utils.PackSegmentLoadInfo(resp, nil, view.TargetVersion)
loadInfo := utils.PackSegmentLoadInfo(resp, nil)

expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
return &querypb.SyncDistributionRequest{
Expand Down Expand Up @@ -352,7 +352,7 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegmentsWithReplicas() {
view2 := utils.CreateTestLeaderView(4, 1, "test-insert-channel", map[int64]int64{1: 4}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(4, view2)
loadInfo := utils.PackSegmentLoadInfo(resp, nil, view.TargetVersion)
loadInfo := utils.PackSegmentLoadInfo(resp, nil)

expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
return &querypb.SyncDistributionRequest{
Expand Down
9 changes: 1 addition & 8 deletions internal/querycoordv2/task/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,14 +273,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
indexes = nil
}

readableVersion := int64(0)
switch GetTaskType(task) {
case TaskTypeGrow:
readableVersion = ex.targetMgr.GetCollectionTargetVersion(task.CollectionID(), meta.NextTarget)
case TaskTypeMove, TaskTypeUpdate:
readableVersion = ex.targetMgr.GetCollectionTargetVersion(task.CollectionID(), meta.CurrentTarget)
}
loadInfo := utils.PackSegmentLoadInfo(resp, indexes, readableVersion)
loadInfo := utils.PackSegmentLoadInfo(resp, indexes)

// Get shard leader for the given replica and segment
leader, ok := getShardLeader(ex.meta.ReplicaManager, ex.dist, task.CollectionID(), action.Node(), segment.GetInsertChannel())
Expand Down
25 changes: 12 additions & 13 deletions internal/querycoordv2/utils/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func MergeMetaSegmentIntoSegmentInfo(info *querypb.SegmentInfo, segments ...*met

// packSegmentLoadInfo packs SegmentLoadInfo for given segment,
// packs with index if withIndex is true, this fetch indexes from IndexCoord
func PackSegmentLoadInfo(resp *datapb.GetSegmentInfoResponse, indexes []*querypb.FieldIndexInfo, readableVersion int64) *querypb.SegmentLoadInfo {
func PackSegmentLoadInfo(resp *datapb.GetSegmentInfoResponse, indexes []*querypb.FieldIndexInfo) *querypb.SegmentLoadInfo {
var (
deltaPosition *msgpb.MsgPosition
positionSrc string
Expand Down Expand Up @@ -96,18 +96,17 @@ func PackSegmentLoadInfo(resp *datapb.GetSegmentInfoResponse, indexes []*querypb
zap.Duration("tsLag", tsLag))
}
loadInfo := &querypb.SegmentLoadInfo{
SegmentID: segment.ID,
PartitionID: segment.PartitionID,
CollectionID: segment.CollectionID,
BinlogPaths: segment.Binlogs,
NumOfRows: segment.NumOfRows,
Statslogs: segment.Statslogs,
Deltalogs: segment.Deltalogs,
InsertChannel: segment.InsertChannel,
IndexInfos: indexes,
StartPosition: segment.GetStartPosition(),
DeltaPosition: deltaPosition,
ReadableVersion: readableVersion,
SegmentID: segment.ID,
PartitionID: segment.PartitionID,
CollectionID: segment.CollectionID,
BinlogPaths: segment.Binlogs,
NumOfRows: segment.NumOfRows,
Statslogs: segment.Statslogs,
Deltalogs: segment.Deltalogs,
InsertChannel: segment.InsertChannel,
IndexInfos: indexes,
StartPosition: segment.GetStartPosition(),
DeltaPosition: deltaPosition,
}
loadInfo.SegmentSize = calculateSegmentSize(loadInfo)
return loadInfo
Expand Down
6 changes: 3 additions & 3 deletions internal/querycoordv2/utils/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func Test_packLoadSegmentRequest(t *testing.T) {
proto.Clone(segmentInfo).(*datapb.SegmentInfo),
},
}
req := PackSegmentLoadInfo(resp, nil, 0)
req := PackSegmentLoadInfo(resp, nil)
assert.NotNil(t, req.GetDeltaPosition())
assert.Equal(t, mockPChannel, req.GetDeltaPosition().ChannelName)
assert.Equal(t, t2, req.GetDeltaPosition().Timestamp)
Expand All @@ -67,7 +67,7 @@ func Test_packLoadSegmentRequest(t *testing.T) {
resp := &datapb.GetSegmentInfoResponse{
Infos: []*datapb.SegmentInfo{segInfo},
}
req := PackSegmentLoadInfo(resp, nil, 0)
req := PackSegmentLoadInfo(resp, nil)
assert.NotNil(t, req.GetDeltaPosition())
assert.Equal(t, mockPChannel, req.GetDeltaPosition().ChannelName)
assert.Equal(t, t1, req.GetDeltaPosition().Timestamp)
Expand All @@ -79,7 +79,7 @@ func Test_packLoadSegmentRequest(t *testing.T) {
resp := &datapb.GetSegmentInfoResponse{
Infos: []*datapb.SegmentInfo{segInfo},
}
req := PackSegmentLoadInfo(resp, nil, 0)
req := PackSegmentLoadInfo(resp, nil)
assert.NotNil(t, req.GetDeltaPosition())
assert.Equal(t, mockPChannel, req.GetDeltaPosition().ChannelName)
assert.Equal(t, t0, req.GetDeltaPosition().Timestamp)
Expand Down
9 changes: 4 additions & 5 deletions internal/querynodev2/delegator/delegator_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,11 +374,10 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
// alter distribution
entries := lo.Map(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) SegmentEntry {
return SegmentEntry{
SegmentID: info.GetSegmentID(),
PartitionID: info.GetPartitionID(),
NodeID: req.GetDstNodeID(),
Version: req.GetVersion(),
TargetVersion: info.GetReadableVersion(),
SegmentID: info.GetSegmentID(),
PartitionID: info.GetPartitionID(),
NodeID: req.GetDstNodeID(),
Version: req.GetVersion(),
}
})
sd.distribution.AddDistributions(entries...)
Expand Down
4 changes: 4 additions & 0 deletions internal/querynodev2/delegator/distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ func (d *distribution) AddDistributions(entries ...SegmentEntry) {
defer d.mut.Unlock()

for _, entry := range entries {
if s, ok := d.sealedSegments[entry.SegmentID]; ok {
// remain the target version for already loaded segment to void skipping this segment when executing search
entry.TargetVersion = s.TargetVersion
}
d.sealedSegments[entry.SegmentID] = entry
d.offlines.Remove(entry.SegmentID)
}
Expand Down

0 comments on commit 54c0e64

Please sign in to comment.