Skip to content

Commit

Permalink
test: add test case for avoiding too much memory delay
Browse files Browse the repository at this point in the history
  • Loading branch information
absolute8511 committed Nov 4, 2020
1 parent 845a2ab commit 37821e0
Showing 1 changed file with 89 additions and 0 deletions.
89 changes: 89 additions & 0 deletions nsqd/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,95 @@ func TestChannelReqNowTooMuch(t *testing.T) {
ast.True(t, reqCnt >= count*MaxAttempts/2)
}

func TestChannelReqTooMuchInDeferShouldNotContinueReadBackend(t *testing.T) {
count := 30
opts := NewOptions()
opts.SyncEvery = 1
opts.MaxRdyCount = 100
opts.MaxConfirmWin = 10
opts.Logger = newTestLogger(t)
opts.MsgTimeout = 100 * time.Millisecond
// use large to delay the period scan
opts.QueueScanRefreshInterval = 10 * time.Second
opts.QueueScanInterval = time.Millisecond * 100
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_requeued_toomuch" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopicIgnPart(topicName)
channel := topic.GetChannel("channel")

for i := 0; i < count; i++ {
msg := NewMessage(0, []byte("test"))
_, _, _, _, err := topic.PutMessage(msg)
equal(t, err, nil)
}

channel.AddClient(1, NewFakeConsumer(1))
start := time.Now()
reqCnt := 0
timeout := 0
lastDelay := time.Now()
for time.Since(start) < time.Second*5 {
select {
case <-time.After(time.Second):
timeout++
case outputMsg, ok := <-channel.clientMsgChan:
if !ok {
t.Error("eror recv")
return
}
channel.inFlightMutex.Lock()
waitingReq := len(channel.waitingRequeueMsgs)
reqChanLen := len(channel.requeuedMsgChan)
channel.inFlightMutex.Unlock()

reqCnt++
now := time.Now()
t.Logf("consume %v at %s , %v, %v", outputMsg.ID, now, waitingReq, reqChanLen)
// should not read too much from backend
ast.True(t, int64(outputMsg.ID) <= opts.MaxConfirmWin+1, "should not read backend too much")
channel.StartInFlightTimeout(outputMsg, NewFakeConsumer(1), "", opts.MsgTimeout)
// requeue with different timeout to make sure the memory deferred cnt is high
// since after timeout deferred cnt will be reset
lastDelay = lastDelay.Add(time.Millisecond * 101)
delay := time.Since(lastDelay)
channel.RequeueMessage(1, "", outputMsg.ID, delay, false)
}
}

t.Logf("total req cnt: %v, timeout: %v", reqCnt, timeout)
ast.True(t, int64(reqCnt) >= opts.MaxConfirmWin*2)
ast.Equal(t, 0, timeout)
start = time.Now()
for time.Since(start) < time.Second*5 {
if channel.Depth() == 0 {
break
}
select {
case <-time.After(time.Second):
timeout++
case outputMsg, ok := <-channel.clientMsgChan:
if !ok {
t.Error("eror recv")
return
}
channel.inFlightMutex.Lock()
waitingReq := len(channel.waitingRequeueMsgs)
reqChanLen := len(channel.requeuedMsgChan)
channel.inFlightMutex.Unlock()

reqCnt++
now := time.Now()
t.Logf("consume %v at %s , %v, %v", outputMsg.ID, now, waitingReq, reqChanLen)
channel.StartInFlightTimeout(outputMsg, NewFakeConsumer(1), "", opts.MsgTimeout)
channel.FinishMessage(1, "", outputMsg.ID)
}
}
ast.Equal(t, int64(0), channel.Depth())
}

func TestChannelEmptyWhileConfirmDelayMsg(t *testing.T) {
// test confirm delay counter while empty channel
opts := NewOptions()
Expand Down

0 comments on commit 37821e0

Please sign in to comment.