diff --git a/consistence/coordinator_rpc.go b/consistence/coordinator_rpc.go
index d9826992..025e65c1 100644
--- a/consistence/coordinator_rpc.go
+++ b/consistence/coordinator_rpc.go
@@ -445,6 +445,7 @@ func (self *NsqdCoordRpcServer) EnableTopicWrite(rpcTopicReq *RpcAdminTopicInfo)
 	}
 	begin := time.Now()
 	tp.writeHold.Lock()
+	defer tp.writeHold.Unlock()
 	if time.Since(begin) > time.Second*3 {
 		// timeout for waiting
 		coordLog.Infof("timeout while enable write for topic: %v", tp.GetData().topicInfo.GetTopicDesp())
@@ -465,7 +466,6 @@ func (self *NsqdCoordRpcServer) EnableTopicWrite(rpcTopicReq *RpcAdminTopicInfo)
 			}
 		}
 	}
-	tp.writeHold.Unlock()
 
 	if err != nil {
 		ret = *err
@@ -499,6 +499,7 @@ func (self *NsqdCoordRpcServer) DisableTopicWrite(rpcTopicReq *RpcAdminTopicInfo
 
 	begin := time.Now()
 	tp.writeHold.Lock()
+	defer tp.writeHold.Unlock()
 	if time.Since(begin) > time.Second*3 {
 		// timeout for waiting
 		err = ErrOperationExpired
@@ -522,7 +523,6 @@ func (self *NsqdCoordRpcServer) DisableTopicWrite(rpcTopicReq *RpcAdminTopicInfo
 			self.nsqdCoord.switchStateForMaster(tp, localTopic, false)
 		}
 	}
-	tp.writeHold.Unlock()
 	if err != nil {
 		ret = *err
 		return &ret
diff --git a/consistence/nsqd_coordinator_cluster_write.go b/consistence/nsqd_coordinator_cluster_write.go
index 3eabb530..7949019b 100644
--- a/consistence/nsqd_coordinator_cluster_write.go
+++ b/consistence/nsqd_coordinator_cluster_write.go
@@ -350,10 +350,10 @@ func (ncoord *NsqdCoordinator) doSyncOpToCluster(isWrite bool, coord *TopicCoord
 	wstart := time.Now()
 	if isWrite {
 		coord.writeHold.Lock()
+		defer coord.writeHold.Unlock()
 		if time.Since(wstart) > maxWriteWaitTimeout {
 			return ErrOperationExpired
 		}
-		defer coord.writeHold.Unlock()
 	}
 
 	if coord.IsExiting() {
diff --git a/consistence/nsqd_coordinator_test.go b/consistence/nsqd_coordinator_test.go
index 712bce68..1893a7fe 100644
--- a/consistence/nsqd_coordinator_test.go
+++ b/consistence/nsqd_coordinator_test.go
@@ -1527,6 +1527,78 @@ func testNsqdCoordCatchupCleanOldData(t *testing.T, meta TopicMetaInfo) {
 	t.Log(logs3)
 }
 
+func TestNsqdCoordPutMessageOpExpired(t *testing.T) {
+	topic := "coordTestTopic"
+	partition := 1
+
+	testRPCTimeoutAndWait = true
+	defer func() {
+		testRPCTimeoutAndWait = false
+	}()
+	if testing.Verbose() {
+		SetCoordLogger(newTestLogger(t), levellogger.LOG_DETAIL)
+		glog.SetFlags(0, "", "", true, true, 1)
+		glog.StartWorker(time.Second)
+	} else {
+		SetCoordLogger(newTestLogger(t), levellogger.LOG_DEBUG)
+	}
+
+	nsqd1, randPort1, nodeInfo1, data1 := newNsqdNode(t, "id1")
+	defer os.RemoveAll(data1)
+	defer nsqd1.Exit()
+	nsqdCoord1 := startNsqdCoord(t, strconv.Itoa(randPort1), data1, "id1", nsqd1, true)
+	nsqdCoord1.Start()
+	defer nsqdCoord1.Stop()
+	time.Sleep(time.Second)
+
+	nsqd2, randPort2, _, data2 := newNsqdNode(t, "id2")
+	defer os.RemoveAll(data2)
+	defer nsqd2.Exit()
+	nsqdCoord2 := startNsqdCoord(t, strconv.Itoa(randPort2), data2, "id2", nsqd2, true)
+	nsqdCoord2.Start()
+	defer nsqdCoord2.Stop()
+
+	var topicInitInfo RpcAdminTopicInfo
+	topicInitInfo.Name = topic
+	topicInitInfo.Partition = partition
+	topicInitInfo.Epoch = 1
+	topicInitInfo.EpochForWrite = 1
+	topicInitInfo.ISR = append(topicInitInfo.ISR, nsqdCoord1.myNode.GetID())
+	topicInitInfo.ISR = append(topicInitInfo.ISR, nsqdCoord2.myNode.GetID())
+	topicInitInfo.Leader = nsqdCoord1.myNode.GetID()
+	topicInitInfo.Replica = 2
+	ensureTopicOnNsqdCoord(nsqdCoord1, topicInitInfo)
+	ensureTopicOnNsqdCoord(nsqdCoord2, topicInitInfo)
+	leaderSession := &TopicLeaderSession{
+		LeaderNode:  nodeInfo1,
+		LeaderEpoch: 1,
+		Session:     "fake123",
+	}
+	// normal test
+	ensureTopicLeaderSession(nsqdCoord1, topic, partition, leaderSession)
+	ensureTopicLeaderSession(nsqdCoord2, topic, partition, leaderSession)
+	ensureTopicDisableWrite(nsqdCoord1, topic, partition, false)
+	ensureTopicDisableWrite(nsqdCoord2, topic, partition, false)
+	topicData1 := nsqd1.GetTopic(topic, partition, false)
+	topicData1.GetChannel("ch1")
+	go func() {
+		time.Sleep(time.Millisecond)
+		_, _, _, _, err := nsqdCoord1.PutMessageBodyToCluster(topicData1, []byte("123"), 0)
+		test.Equal(t, ErrOperationExpired.ToErrorType(), err)
+	}()
+	go func() {
+		time.Sleep(time.Millisecond)
+		_, _, _, _, err := nsqdCoord1.PutMessageBodyToCluster(topicData1, []byte("123"), 0)
+		test.Equal(t, ErrOperationExpired.ToErrorType(), err)
+	}()
+	_, _, _, _, err := nsqdCoord1.PutMessageBodyToCluster(topicData1, []byte("123"), 0)
+	test.NotNil(t, err)
+
+	time.Sleep(time.Millisecond)
+	_, _, _, _, err = nsqdCoord1.PutMessageBodyToCluster(topicData1, []byte("123"), 0)
+	test.NotNil(t, err)
+}
+
 func TestNsqdCoordPutMessageAndSyncChannelOffset(t *testing.T) {
 	testNsqdCoordPutMessageAndSyncChannelOffset(t, false)
 }
diff --git a/consistence/nsqd_rpc_client.go b/consistence/nsqd_rpc_client.go
index d2a8516e..67029d65 100644
--- a/consistence/nsqd_rpc_client.go
+++ b/consistence/nsqd_rpc_client.go
@@ -13,6 +13,8 @@ import (
 	"google.golang.org/grpc"
 )
 
+var testRPCTimeoutAndWait bool
+
 const (
 	RPC_TIMEOUT       = time.Duration(time.Second * 5)
 	RPC_TIMEOUT_SHORT = time.Duration(time.Second * 3)
@@ -158,6 +160,13 @@ func (nrpc *NsqdRpcClient) CallWithRetry(method string, arg interface{}) (interf
 	for retry < 5 {
 		retry++
 		reply, err = nrpc.dc.Call(method, arg)
+		if testRPCTimeoutAndWait {
+			time.Sleep(maxWriteWaitTimeout)
+			e := gorpc.ErrCanceled
+			e.Timeout = true
+			e.Connection = true
+			err = e
+		}
 		if err != nil {
 			cerr, ok := err.(*gorpc.ClientError)
 			if (ok && cerr.Connection) || nrpc.ShouldRemoved() {
diff --git a/nsqd/channel.go b/nsqd/channel.go
index af3b61d7..f1dc6549 100644
--- a/nsqd/channel.go
+++ b/nsqd/channel.go
@@ -1873,6 +1873,7 @@ LOOP:
 		}
 
 		resetReaderFlag := atomic.LoadInt32(&c.needResetReader)
+		deCnt := atomic.LoadInt64(&c.deferredCount)
 		if resetReaderFlag > 0 {
 			nsqLog.Infof("reset the reader : %v", c.GetConfirmed())
 			err = c.resetReaderToConfirmed()
@@ -1903,12 +1904,14 @@ LOOP:
 		} else if readBackendWait {
 			readChan = nil
 			needReadBackend = false
-		} else if atomic.LoadInt32(&c.waitingConfirm) > maxWin {
+		} else if atomic.LoadInt32(&c.waitingConfirm) > maxWin ||
+			c.isTooMuchDeferredInMem(deCnt) {
 			if nsqLog.Level() >= levellogger.LOG_DEBUG {
-				nsqLog.LogDebugf("channel %v reader is holding: %v, %v",
+				nsqLog.LogDebugf("channel %v-%v reader is holding: %v, %v, mem defer: %v",
+					c.GetTopicName(), c.GetName(),
 					atomic.LoadInt32(&c.waitingConfirm),
-					c.GetConfirmed())
+					c.GetConfirmed(), deCnt)
 			}
 
 			atomic.StoreInt32(&c.needNotifyRead, 1)
@@ -1978,10 +1981,13 @@ LOOP:
 		if waitingReq > 0 {
 			notified := c.nsqdNotify.NotifyScanChannel(c, false)
 			if !notified {
-				// try later
-				go func() {
+				// try later, avoid too much
+				c.nsqdNotify.PushTopicJob(c.GetTopicName(), func() {
 					c.nsqdNotify.NotifyScanChannel(c, true)
-				}()
+					nsqLog.LogDebugf("notify refill req done %v-%v from requeue", c.GetTopicName(), c.GetName())
+				})
+			} else {
+				nsqLog.LogDebugf("notify refill req done %v-%v from requeue", c.GetTopicName(), c.GetName())
 			}
 		}
 		select {
diff --git a/nsqd/channel_test.go b/nsqd/channel_test.go
index 90ef7645..df8cec46 100644
--- a/nsqd/channel_test.go
+++ b/nsqd/channel_test.go
@@ -340,6 +340,95 @@ func TestChannelReqNowTooMuch(t *testing.T) {
 	ast.True(t, reqCnt >= count*MaxAttempts/2)
 }
 
+func TestChannelReqTooMuchInDeferShouldNotContinueReadBackend(t *testing.T) {
+	count := 30
+	opts := NewOptions()
+	opts.SyncEvery = 1
+	opts.MaxRdyCount = 100
+	opts.MaxConfirmWin = 10
+	opts.Logger = newTestLogger(t)
+	opts.MsgTimeout = 100 * time.Millisecond
+	// use large to delay the period scan
+	opts.QueueScanRefreshInterval = 10 * time.Second
+	opts.QueueScanInterval = time.Millisecond * 100
+	_, _, nsqd := mustStartNSQD(opts)
+	defer os.RemoveAll(opts.DataPath)
+	defer nsqd.Exit()
+
+	topicName := "test_requeued_toomuch" + strconv.Itoa(int(time.Now().Unix()))
+	topic := nsqd.GetTopicIgnPart(topicName)
+	channel := topic.GetChannel("channel")
+
+	for i := 0; i < count; i++ {
+		msg := NewMessage(0, []byte("test"))
+		_, _, _, _, err := topic.PutMessage(msg)
+		equal(t, err, nil)
+	}
+
+	channel.AddClient(1, NewFakeConsumer(1))
+	start := time.Now()
+	reqCnt := 0
+	timeout := 0
+	lastDelay := time.Now()
+	for time.Since(start) < time.Second*5 {
+		select {
+		case <-time.After(time.Second):
+			timeout++
+		case outputMsg, ok := <-channel.clientMsgChan:
+			if !ok {
+				t.Error("error recv")
+				return
+			}
+			channel.inFlightMutex.Lock()
+			waitingReq := len(channel.waitingRequeueMsgs)
+			reqChanLen := len(channel.requeuedMsgChan)
+			channel.inFlightMutex.Unlock()
+
+			reqCnt++
+			now := time.Now()
+			t.Logf("consume %v at %s , %v, %v", outputMsg.ID, now, waitingReq, reqChanLen)
+			// should not read too much from backend
+			ast.True(t, int64(outputMsg.ID) <= opts.MaxConfirmWin+1, "should not read backend too much")
+			channel.StartInFlightTimeout(outputMsg, NewFakeConsumer(1), "", opts.MsgTimeout)
+			// requeue with different timeout to make sure the memory deferred cnt is high
+			// since after timeout deferred cnt will be reset
+			lastDelay = lastDelay.Add(time.Millisecond * 101)
+			delay := time.Since(lastDelay)
+			channel.RequeueMessage(1, "", outputMsg.ID, delay, false)
+		}
+	}
+
+	t.Logf("total req cnt: %v, timeout: %v", reqCnt, timeout)
+	ast.True(t, int64(reqCnt) >= opts.MaxConfirmWin*2)
+	ast.Equal(t, 0, timeout)
+	start = time.Now()
+	for time.Since(start) < time.Second*5 {
+		if channel.Depth() == 0 {
+			break
+		}
+		select {
+		case <-time.After(time.Second):
+			timeout++
+		case outputMsg, ok := <-channel.clientMsgChan:
+			if !ok {
+				t.Error("error recv")
+				return
+			}
+			channel.inFlightMutex.Lock()
+			waitingReq := len(channel.waitingRequeueMsgs)
+			reqChanLen := len(channel.requeuedMsgChan)
+			channel.inFlightMutex.Unlock()
+
+			reqCnt++
+			now := time.Now()
+			t.Logf("consume %v at %s , %v, %v", outputMsg.ID, now, waitingReq, reqChanLen)
+			channel.StartInFlightTimeout(outputMsg, NewFakeConsumer(1), "", opts.MsgTimeout)
+			channel.FinishMessage(1, "", outputMsg.ID)
+		}
+	}
+	ast.Equal(t, int64(0), channel.Depth())
+}
+
 func TestChannelEmptyWhileConfirmDelayMsg(t *testing.T) {
 	// test confirm delay counter while empty channel
 	opts := NewOptions()
diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go
index c8e0697a..87a13ce7 100644
--- a/nsqd/nsqd.go
+++ b/nsqd/nsqd.go
@@ -59,7 +59,7 @@ type INsqdNotify interface {
 	NotifyStateChanged(v interface{}, needPersist bool)
 	ReqToEnd(*Channel, *Message, time.Duration) error
 	NotifyScanChannel(c *Channel, wait bool) bool
-	PushTopicJob(*Topic, func())
+	PushTopicJob(string, func())
 }
 
 type ReqToEndFunc func(*Channel, *Message, time.Duration) error
@@ -1002,8 +1002,8 @@ func (n *NSQD) resizeTopicJobPool(tnum int, jobCh chan func(), closeCh chan int)
 	})
 }
 
-func (n *NSQD) PushTopicJob(t *Topic, job func()) {
-	h := int(murmur3.Sum32([]byte(t.GetFullName())))
+func (n *NSQD) PushTopicJob(shardingName string, job func()) {
+	h := int(murmur3.Sum32([]byte(shardingName)))
 	index := h % len(n.topicJobChList)
 	for i := 0; i < len(n.topicJobChList); i++ {
 		ch := n.topicJobChList[(index+i)%len(n.topicJobChList)]
@@ -1013,7 +1013,7 @@ func (n *NSQD) PushTopicJob(t *Topic, job func()) {
 		ch := n.topicJobChList[(index+i)%len(n.topicJobChList)]
 		select {
 		case ch <- job:
 		default:
 		}
 	}
-	nsqLog.LogDebugf("%v topic job push ignored: %T", t.GetFullName(), job)
+	nsqLog.LogDebugf("%v topic job push ignored: %T", shardingName, job)
 }
 
 func doJob(job func()) {
diff --git a/nsqd/topic.go b/nsqd/topic.go
index 59be7c8e..26e47bd4 100644
--- a/nsqd/topic.go
+++ b/nsqd/topic.go
@@ -973,7 +973,7 @@ func (t *Topic) ForceFlushForChannels(wait bool) {
 }
 
 func (t *Topic) notifyChEndChanged(force bool) {
-	t.nsqdNotify.PushTopicJob(t, func() { t.flushForChannels(force) })
+	t.nsqdNotify.PushTopicJob(t.GetTopicName(), func() { t.flushForChannels(force) })
 }
 
 func (t *Topic) flushForChannels(forceUpdate bool) {
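
Note on the locking change: the crux of this patch, in `doSyncOpToCluster` (and the same pattern in `EnableTopicWrite`/`DisableTopicWrite`), is that the timeout early-return used to execute before `defer coord.writeHold.Unlock()` was registered. A write that waited on `writeHold` longer than `maxWriteWaitTimeout` therefore returned `ErrOperationExpired` while still holding the mutex, and every subsequent writer blocked forever. Registering the deferred unlock immediately after `Lock()` guarantees the mutex is released on every return path. Below is a minimal standalone sketch of the before/after pattern; the function and variable names are hypothetical stand-ins, not the coordinator's actual types:

```go
package main

import (
	"errors"
	"sync"
	"time"
)

var errOperationExpired = errors.New("E_OPERATION_EXPIRED")

// syncOpBuggy mirrors the old code: if the lock was contended for longer
// than maxWait, the early return fired before the deferred unlock was
// registered, leaving writeHold locked forever.
func syncOpBuggy(writeHold *sync.Mutex, maxWait time.Duration) error {
	start := time.Now()
	writeHold.Lock()
	if time.Since(start) > maxWait {
		return errOperationExpired // bug: returns while still holding writeHold
	}
	defer writeHold.Unlock() // registered too late for the path above
	// ... perform the replicated write ...
	return nil
}

// syncOpFixed mirrors the patched code: the unlock is deferred immediately
// after Lock(), so the timeout path (and every other return) releases it.
func syncOpFixed(writeHold *sync.Mutex, maxWait time.Duration) error {
	start := time.Now()
	writeHold.Lock()
	defer writeHold.Unlock()
	if time.Since(start) > maxWait {
		return errOperationExpired
	}
	// ... perform the replicated write ...
	return nil
}

func main() {
	var writeHold sync.Mutex
	_ = syncOpFixed(&writeHold, 3*time.Second)
}
```

`TestNsqdCoordPutMessageOpExpired` exercises exactly this path: with `testRPCTimeoutAndWait` set, every RPC in `CallWithRetry` sleeps for `maxWriteWaitTimeout` before failing, so the concurrent `PutMessageBodyToCluster` calls pile up on `writeHold` long enough that the later ones must come back with `ErrOperationExpired` instead of hanging on the mutex.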