Skip to content

Commit

Permalink
refine log out for datanode(milvus-io#25763)
Browse files Browse the repository at this point in the history
Signed-off-by: MrPresent-Han <[email protected]>
  • Loading branch information
MrPresent-Han committed Aug 8, 2023
1 parent 54c0e64 commit 96fee37
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 9 deletions.
7 changes: 4 additions & 3 deletions internal/datanode/channel_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,10 +595,11 @@ func (c *ChannelMeta) updateSegmentRowNumber(segID UniqueID, numRows int64) {
c.segMu.Lock()
defer c.segMu.Unlock()

log.Info("updating segment num row", zap.Int64("segmentID", segID), zap.Int64("numRows", numRows))
seg, ok := c.segments[segID]
if ok && seg.notFlushed() {
seg.numRows += numRows
log.Info("updated segment num row", zap.Int64("segmentID", segID),
zap.Int64("addedNumRows", numRows), zap.Int64("numRowsSum", seg.numRows))
return
}

Expand All @@ -620,11 +621,11 @@ func (c *ChannelMeta) updateSingleSegmentMemorySize(segID UniqueID) {
memorySize := c.calculateSegmentMemorySize(segID)
c.segMu.Lock()
defer c.segMu.Unlock()
log.Info("updating segment memorySize", zap.Int64("segmentID", segID),
zap.Int64("memorySize", memorySize))
seg, ok := c.segments[segID]
if ok {
seg.memorySize = memorySize
log.Info("updated segment memorySize", zap.Int64("segmentID", segID),
zap.Int64("memorySize", seg.memorySize))
return
}

Expand Down
10 changes: 5 additions & 5 deletions internal/datanode/flow_graph_insert_buffer_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,17 +463,17 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID,
zap.Any("position", endPosition),
zap.String("channel", ibNode.channelName),
)
// check if task pool is full
if !task.dropped && !task.flushed && ibNode.flushManager.isFull() {
log.RatedWarn(10, "task pool is full, skip it")
continue
}
// check if segment is syncing
segment := ibNode.channel.getSegment(task.segmentID)
if !task.dropped && !task.flushed && segment.isSyncing() {
log.RatedInfo(10, "segment is syncing, skip it")
continue
}
// check if task pool is full
if !task.dropped && !task.flushed && ibNode.flushManager.isFull() {
log.RatedWarn(10, "task pool is full, skip it")
continue
}
segment.setSyncing(true)
log.Info("insertBufferNode start syncing bufferData")
// use the flushed pk stats to take current stat
Expand Down
7 changes: 6 additions & 1 deletion internal/datanode/flow_graph_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,12 @@ func (fm *flowgraphManager) execute(totalMemory uint64) {
return
}

if float64(total) < float64(totalMemory)*Params.DataNodeCfg.MemoryWatermark.GetAsFloat() {
memoryWatermark := float64(totalMemory) * Params.DataNodeCfg.MemoryWatermark.GetAsFloat()
if float64(total) < memoryWatermark {
log.RatedDebug(5, "skip force sync because memory level is not high enough",
zap.Float64("current_total_memory_usage", float64(total)),
zap.Float64("current_memory_watermark", memoryWatermark),
zap.Any("channel_memory_usages", channels))
return
}

Expand Down

0 comments on commit 96fee37

Please sign in to comment.