From 37821e0b6efd3aaff142a3cd3c95966a76fcc852 Mon Sep 17 00:00:00 2001 From: liwen Date: Thu, 5 Nov 2020 00:20:16 +0800 Subject: [PATCH] test: add test case for avoiding too much memory delay --- nsqd/channel_test.go | 89 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/nsqd/channel_test.go b/nsqd/channel_test.go index 90ef7645..df8cec46 100644 --- a/nsqd/channel_test.go +++ b/nsqd/channel_test.go @@ -340,6 +340,95 @@ func TestChannelReqNowTooMuch(t *testing.T) { ast.True(t, reqCnt >= count*MaxAttempts/2) } +func TestChannelReqTooMuchInDeferShouldNotContinueReadBackend(t *testing.T) { + count := 30 + opts := NewOptions() + opts.SyncEvery = 1 + opts.MaxRdyCount = 100 + opts.MaxConfirmWin = 10 + opts.Logger = newTestLogger(t) + opts.MsgTimeout = 100 * time.Millisecond + // use large to delay the period scan + opts.QueueScanRefreshInterval = 10 * time.Second + opts.QueueScanInterval = time.Millisecond * 100 + _, _, nsqd := mustStartNSQD(opts) + defer os.RemoveAll(opts.DataPath) + defer nsqd.Exit() + + topicName := "test_requeued_toomuch" + strconv.Itoa(int(time.Now().Unix())) + topic := nsqd.GetTopicIgnPart(topicName) + channel := topic.GetChannel("channel") + + for i := 0; i < count; i++ { + msg := NewMessage(0, []byte("test")) + _, _, _, _, err := topic.PutMessage(msg) + equal(t, err, nil) + } + + channel.AddClient(1, NewFakeConsumer(1)) + start := time.Now() + reqCnt := 0 + timeout := 0 + lastDelay := time.Now() + for time.Since(start) < time.Second*5 { + select { + case <-time.After(time.Second): + timeout++ + case outputMsg, ok := <-channel.clientMsgChan: + if !ok { + t.Error("eror recv") + return + } + channel.inFlightMutex.Lock() + waitingReq := len(channel.waitingRequeueMsgs) + reqChanLen := len(channel.requeuedMsgChan) + channel.inFlightMutex.Unlock() + + reqCnt++ + now := time.Now() + t.Logf("consume %v at %s , %v, %v", outputMsg.ID, now, waitingReq, reqChanLen) + // should not read too much from backend + ast.True(t, int64(outputMsg.ID) <= opts.MaxConfirmWin+1, "should not read backend too much") + channel.StartInFlightTimeout(outputMsg, NewFakeConsumer(1), "", opts.MsgTimeout) + // requeue with different timeout to make sure the memory deferred cnt is high + // since after timeout deferred cnt will be reset + lastDelay = lastDelay.Add(time.Millisecond * 101) + delay := time.Since(lastDelay) + channel.RequeueMessage(1, "", outputMsg.ID, delay, false) + } + } + + t.Logf("total req cnt: %v, timeout: %v", reqCnt, timeout) + ast.True(t, int64(reqCnt) >= opts.MaxConfirmWin*2) + ast.Equal(t, 0, timeout) + start = time.Now() + for time.Since(start) < time.Second*5 { + if channel.Depth() == 0 { + break + } + select { + case <-time.After(time.Second): + timeout++ + case outputMsg, ok := <-channel.clientMsgChan: + if !ok { + t.Error("eror recv") + return + } + channel.inFlightMutex.Lock() + waitingReq := len(channel.waitingRequeueMsgs) + reqChanLen := len(channel.requeuedMsgChan) + channel.inFlightMutex.Unlock() + + reqCnt++ + now := time.Now() + t.Logf("consume %v at %s , %v, %v", outputMsg.ID, now, waitingReq, reqChanLen) + channel.StartInFlightTimeout(outputMsg, NewFakeConsumer(1), "", opts.MsgTimeout) + channel.FinishMessage(1, "", outputMsg.ID) + } + } + ast.Equal(t, int64(0), channel.Depth()) +} + func TestChannelEmptyWhileConfirmDelayMsg(t *testing.T) { // test confirm delay counter while empty channel opts := NewOptions()