diff --git a/internal/datanode/channel_meta.go b/internal/datanode/channel_meta.go index 716aa3fafb45c..f33ef8b48e95c 100644 --- a/internal/datanode/channel_meta.go +++ b/internal/datanode/channel_meta.go @@ -595,10 +595,11 @@ func (c *ChannelMeta) updateSegmentRowNumber(segID UniqueID, numRows int64) { c.segMu.Lock() defer c.segMu.Unlock() - log.Info("updating segment num row", zap.Int64("segmentID", segID), zap.Int64("numRows", numRows)) seg, ok := c.segments[segID] if ok && seg.notFlushed() { seg.numRows += numRows + log.Info("updated segment num row", zap.Int64("segmentID", segID), + zap.Int64("addedNumRows", numRows), zap.Int64("numRowsSum", seg.numRows)) return } @@ -620,11 +621,11 @@ func (c *ChannelMeta) updateSingleSegmentMemorySize(segID UniqueID) { memorySize := c.calculateSegmentMemorySize(segID) c.segMu.Lock() defer c.segMu.Unlock() - log.Info("updating segment memorySize", zap.Int64("segmentID", segID), - zap.Int64("memorySize", memorySize)) seg, ok := c.segments[segID] if ok { seg.memorySize = memorySize + log.Info("updated segment memorySize", zap.Int64("segmentID", segID), + zap.Int64("memorySize", seg.memorySize)) return } diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 0b96253bf6b26..76c177289ddbd 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -463,17 +463,17 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID, zap.Any("position", endPosition), zap.String("channel", ibNode.channelName), ) - // check if task pool is full - if !task.dropped && !task.flushed && ibNode.flushManager.isFull() { - log.RatedWarn(10, "task pool is full, skip it") - continue - } // check if segment is syncing segment := ibNode.channel.getSegment(task.segmentID) if !task.dropped && !task.flushed && segment.isSyncing() { log.RatedInfo(10, "segment is syncing, skip it") continue } + // check if task pool is full + if !task.dropped && !task.flushed && ibNode.flushManager.isFull() { + log.RatedWarn(10, "task pool is full, skip it") + continue + } segment.setSyncing(true) log.Info("insertBufferNode start syncing bufferData") // use the flushed pk stats to take current stat diff --git a/internal/datanode/flow_graph_manager.go b/internal/datanode/flow_graph_manager.go index 0e5da3f371d29..0309eeaaf8b9a 100644 --- a/internal/datanode/flow_graph_manager.go +++ b/internal/datanode/flow_graph_manager.go @@ -91,7 +91,12 @@ func (fm *flowgraphManager) execute(totalMemory uint64) { return } - if float64(total) < float64(totalMemory)*Params.DataNodeCfg.MemoryWatermark.GetAsFloat() { + memoryWatermark := float64(totalMemory) * Params.DataNodeCfg.MemoryWatermark.GetAsFloat() + if float64(total) < memoryWatermark { + log.RatedDebug(5, "skip force sync because memory level is not high enough", + zap.Float64("current_total_memory_usage", float64(total)), + zap.Float64("current_memory_watermark", memoryWatermark), + zap.Any("channel_memory_usages", channels)) return }