Skip to content

Commit

Permalink
Merge branch 'optimize-exception' into 'master'
Browse files Browse the repository at this point in the history
avoid too much memory delay to avoid too much memory usage

See merge request !28
  • Loading branch information
元守 committed Nov 5, 2020
2 parents 1a18643 + 37821e0 commit 8ddfdee
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 14 deletions.
4 changes: 2 additions & 2 deletions consistence/coordinator_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ func (self *NsqdCoordRpcServer) EnableTopicWrite(rpcTopicReq *RpcAdminTopicInfo)
}
begin := time.Now()
tp.writeHold.Lock()
defer tp.writeHold.Unlock()
if time.Since(begin) > time.Second*3 {
// timeout for waiting
coordLog.Infof("timeout while enable write for topic: %v", tp.GetData().topicInfo.GetTopicDesp())
Expand All @@ -465,7 +466,6 @@ func (self *NsqdCoordRpcServer) EnableTopicWrite(rpcTopicReq *RpcAdminTopicInfo)
}
}
}
tp.writeHold.Unlock()

if err != nil {
ret = *err
Expand Down Expand Up @@ -499,6 +499,7 @@ func (self *NsqdCoordRpcServer) DisableTopicWrite(rpcTopicReq *RpcAdminTopicInfo
begin := time.Now()

tp.writeHold.Lock()
defer tp.writeHold.Unlock()
if time.Since(begin) > time.Second*3 {
// timeout for waiting
err = ErrOperationExpired
Expand All @@ -522,7 +523,6 @@ func (self *NsqdCoordRpcServer) DisableTopicWrite(rpcTopicReq *RpcAdminTopicInfo
self.nsqdCoord.switchStateForMaster(tp, localTopic, false)
}
}
tp.writeHold.Unlock()
if err != nil {
ret = *err
return &ret
Expand Down
2 changes: 1 addition & 1 deletion consistence/nsqd_coordinator_cluster_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,10 +350,10 @@ func (ncoord *NsqdCoordinator) doSyncOpToCluster(isWrite bool, coord *TopicCoord
wstart := time.Now()
if isWrite {
coord.writeHold.Lock()
defer coord.writeHold.Unlock()
if time.Since(wstart) > maxWriteWaitTimeout {
return ErrOperationExpired
}
defer coord.writeHold.Unlock()
}

if coord.IsExiting() {
Expand Down
72 changes: 72 additions & 0 deletions consistence/nsqd_coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1527,6 +1527,78 @@ func testNsqdCoordCatchupCleanOldData(t *testing.T, meta TopicMetaInfo) {
t.Log(logs3)
}

func TestNsqdCoordPutMessageOpExpired(t *testing.T) {
topic := "coordTestTopic"
partition := 1

testRPCTimeoutAndWait = true
defer func() {
testRPCTimeoutAndWait = false
}()
if testing.Verbose() {
SetCoordLogger(newTestLogger(t), levellogger.LOG_DETAIL)
glog.SetFlags(0, "", "", true, true, 1)
glog.StartWorker(time.Second)
} else {
SetCoordLogger(newTestLogger(t), levellogger.LOG_DEBUG)
}

nsqd1, randPort1, nodeInfo1, data1 := newNsqdNode(t, "id1")
defer os.RemoveAll(data1)
defer nsqd1.Exit()
nsqdCoord1 := startNsqdCoord(t, strconv.Itoa(randPort1), data1, "id1", nsqd1, true)
nsqdCoord1.Start()
defer nsqdCoord1.Stop()
time.Sleep(time.Second)

nsqd2, randPort2, _, data2 := newNsqdNode(t, "id2")
defer os.RemoveAll(data2)
defer nsqd2.Exit()
nsqdCoord2 := startNsqdCoord(t, strconv.Itoa(randPort2), data2, "id2", nsqd2, true)
nsqdCoord2.Start()
defer nsqdCoord2.Stop()

var topicInitInfo RpcAdminTopicInfo
topicInitInfo.Name = topic
topicInitInfo.Partition = partition
topicInitInfo.Epoch = 1
topicInitInfo.EpochForWrite = 1
topicInitInfo.ISR = append(topicInitInfo.ISR, nsqdCoord1.myNode.GetID())
topicInitInfo.ISR = append(topicInitInfo.ISR, nsqdCoord2.myNode.GetID())
topicInitInfo.Leader = nsqdCoord1.myNode.GetID()
topicInitInfo.Replica = 2
ensureTopicOnNsqdCoord(nsqdCoord1, topicInitInfo)
ensureTopicOnNsqdCoord(nsqdCoord2, topicInitInfo)
leaderSession := &TopicLeaderSession{
LeaderNode: nodeInfo1,
LeaderEpoch: 1,
Session: "fake123",
}
// normal test
ensureTopicLeaderSession(nsqdCoord1, topic, partition, leaderSession)
ensureTopicLeaderSession(nsqdCoord2, topic, partition, leaderSession)
ensureTopicDisableWrite(nsqdCoord1, topic, partition, false)
ensureTopicDisableWrite(nsqdCoord2, topic, partition, false)
topicData1 := nsqd1.GetTopic(topic, partition, false)
topicData1.GetChannel("ch1")
go func() {
time.Sleep(time.Millisecond)
_, _, _, _, err := nsqdCoord1.PutMessageBodyToCluster(topicData1, []byte("123"), 0)
test.Equal(t, ErrOperationExpired.ToErrorType(), err)
}()
go func() {
time.Sleep(time.Millisecond)
_, _, _, _, err := nsqdCoord1.PutMessageBodyToCluster(topicData1, []byte("123"), 0)
test.Equal(t, ErrOperationExpired.ToErrorType(), err)
}()
_, _, _, _, err := nsqdCoord1.PutMessageBodyToCluster(topicData1, []byte("123"), 0)
test.NotNil(t, err)

time.Sleep(time.Millisecond)
_, _, _, _, err = nsqdCoord1.PutMessageBodyToCluster(topicData1, []byte("123"), 0)
test.NotNil(t, err)
}

func TestNsqdCoordPutMessageAndSyncChannelOffset(t *testing.T) {
testNsqdCoordPutMessageAndSyncChannelOffset(t, false)
}
Expand Down
9 changes: 9 additions & 0 deletions consistence/nsqd_rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"google.golang.org/grpc"
)

var testRPCTimeoutAndWait bool

const (
RPC_TIMEOUT = time.Duration(time.Second * 5)
RPC_TIMEOUT_SHORT = time.Duration(time.Second * 3)
Expand Down Expand Up @@ -158,6 +160,13 @@ func (nrpc *NsqdRpcClient) CallWithRetry(method string, arg interface{}) (interf
for retry < 5 {
retry++
reply, err = nrpc.dc.Call(method, arg)
if testRPCTimeoutAndWait {
time.Sleep(maxWriteWaitTimeout)
e := gorpc.ErrCanceled
e.Timeout = true
e.Connection = true
err = e
}
if err != nil {
cerr, ok := err.(*gorpc.ClientError)
if (ok && cerr.Connection) || nrpc.ShouldRemoved() {
Expand Down
18 changes: 12 additions & 6 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -1873,6 +1873,7 @@ LOOP:
}

resetReaderFlag := atomic.LoadInt32(&c.needResetReader)
deCnt := atomic.LoadInt64(&c.deferredCount)
if resetReaderFlag > 0 {
nsqLog.Infof("reset the reader : %v", c.GetConfirmed())
err = c.resetReaderToConfirmed()
Expand Down Expand Up @@ -1903,12 +1904,14 @@ LOOP:
} else if readBackendWait {
readChan = nil
needReadBackend = false
} else if atomic.LoadInt32(&c.waitingConfirm) > maxWin {
} else if atomic.LoadInt32(&c.waitingConfirm) > maxWin ||
c.isTooMuchDeferredInMem(deCnt) {
if nsqLog.Level() >= levellogger.LOG_DEBUG {
nsqLog.LogDebugf("channel %v reader is holding: %v, %v",
nsqLog.LogDebugf("channel %v-%v reader is holding: %v, %v, mem defer: %v",
c.GetTopicName(),
c.GetName(),
atomic.LoadInt32(&c.waitingConfirm),
c.GetConfirmed())
c.GetConfirmed(), deCnt)
}
atomic.StoreInt32(&c.needNotifyRead, 1)

Expand Down Expand Up @@ -1978,10 +1981,13 @@ LOOP:
if waitingReq > 0 {
notified := c.nsqdNotify.NotifyScanChannel(c, false)
if !notified {
// try later
go func() {
// try later, avoid too much
c.nsqdNotify.PushTopicJob(c.GetTopicName(), func() {
c.nsqdNotify.NotifyScanChannel(c, true)
}()
nsqLog.LogDebugf("notify refill req done %v-%v from requeue", c.GetTopicName(), c.GetName())
})
} else {
nsqLog.LogDebugf("notify refill req done %v-%v from requeue", c.GetTopicName(), c.GetName())
}
}
select {
Expand Down
89 changes: 89 additions & 0 deletions nsqd/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,95 @@ func TestChannelReqNowTooMuch(t *testing.T) {
ast.True(t, reqCnt >= count*MaxAttempts/2)
}

func TestChannelReqTooMuchInDeferShouldNotContinueReadBackend(t *testing.T) {
count := 30
opts := NewOptions()
opts.SyncEvery = 1
opts.MaxRdyCount = 100
opts.MaxConfirmWin = 10
opts.Logger = newTestLogger(t)
opts.MsgTimeout = 100 * time.Millisecond
// use large to delay the period scan
opts.QueueScanRefreshInterval = 10 * time.Second
opts.QueueScanInterval = time.Millisecond * 100
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_requeued_toomuch" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopicIgnPart(topicName)
channel := topic.GetChannel("channel")

for i := 0; i < count; i++ {
msg := NewMessage(0, []byte("test"))
_, _, _, _, err := topic.PutMessage(msg)
equal(t, err, nil)
}

channel.AddClient(1, NewFakeConsumer(1))
start := time.Now()
reqCnt := 0
timeout := 0
lastDelay := time.Now()
for time.Since(start) < time.Second*5 {
select {
case <-time.After(time.Second):
timeout++
case outputMsg, ok := <-channel.clientMsgChan:
if !ok {
t.Error("eror recv")
return
}
channel.inFlightMutex.Lock()
waitingReq := len(channel.waitingRequeueMsgs)
reqChanLen := len(channel.requeuedMsgChan)
channel.inFlightMutex.Unlock()

reqCnt++
now := time.Now()
t.Logf("consume %v at %s , %v, %v", outputMsg.ID, now, waitingReq, reqChanLen)
// should not read too much from backend
ast.True(t, int64(outputMsg.ID) <= opts.MaxConfirmWin+1, "should not read backend too much")
channel.StartInFlightTimeout(outputMsg, NewFakeConsumer(1), "", opts.MsgTimeout)
// requeue with different timeout to make sure the memory deferred cnt is high
// since after timeout deferred cnt will be reset
lastDelay = lastDelay.Add(time.Millisecond * 101)
delay := time.Since(lastDelay)
channel.RequeueMessage(1, "", outputMsg.ID, delay, false)
}
}

t.Logf("total req cnt: %v, timeout: %v", reqCnt, timeout)
ast.True(t, int64(reqCnt) >= opts.MaxConfirmWin*2)
ast.Equal(t, 0, timeout)
start = time.Now()
for time.Since(start) < time.Second*5 {
if channel.Depth() == 0 {
break
}
select {
case <-time.After(time.Second):
timeout++
case outputMsg, ok := <-channel.clientMsgChan:
if !ok {
t.Error("eror recv")
return
}
channel.inFlightMutex.Lock()
waitingReq := len(channel.waitingRequeueMsgs)
reqChanLen := len(channel.requeuedMsgChan)
channel.inFlightMutex.Unlock()

reqCnt++
now := time.Now()
t.Logf("consume %v at %s , %v, %v", outputMsg.ID, now, waitingReq, reqChanLen)
channel.StartInFlightTimeout(outputMsg, NewFakeConsumer(1), "", opts.MsgTimeout)
channel.FinishMessage(1, "", outputMsg.ID)
}
}
ast.Equal(t, int64(0), channel.Depth())
}

func TestChannelEmptyWhileConfirmDelayMsg(t *testing.T) {
// test confirm delay counter while empty channel
opts := NewOptions()
Expand Down
8 changes: 4 additions & 4 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type INsqdNotify interface {
NotifyStateChanged(v interface{}, needPersist bool)
ReqToEnd(*Channel, *Message, time.Duration) error
NotifyScanChannel(c *Channel, wait bool) bool
PushTopicJob(*Topic, func())
PushTopicJob(string, func())
}

type ReqToEndFunc func(*Channel, *Message, time.Duration) error
Expand Down Expand Up @@ -1002,8 +1002,8 @@ func (n *NSQD) resizeTopicJobPool(tnum int, jobCh chan func(), closeCh chan int)
})
}

func (n *NSQD) PushTopicJob(t *Topic, job func()) {
h := int(murmur3.Sum32([]byte(t.GetFullName())))
func (n *NSQD) PushTopicJob(shardingName string, job func()) {
h := int(murmur3.Sum32([]byte(shardingName)))
index := h % len(n.topicJobChList)
for i := 0; i < len(n.topicJobChList); i++ {
ch := n.topicJobChList[(index+i)%len(n.topicJobChList)]
Expand All @@ -1013,7 +1013,7 @@ func (n *NSQD) PushTopicJob(t *Topic, job func()) {
default:
}
}
nsqLog.LogDebugf("%v topic job push ignored: %T", t.GetFullName(), job)
nsqLog.LogDebugf("%v topic job push ignored: %T", shardingName, job)
}

func doJob(job func()) {
Expand Down
2 changes: 1 addition & 1 deletion nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,7 +973,7 @@ func (t *Topic) ForceFlushForChannels(wait bool) {
}

func (t *Topic) notifyChEndChanged(force bool) {
t.nsqdNotify.PushTopicJob(t, func() { t.flushForChannels(force) })
t.nsqdNotify.PushTopicJob(t.GetTopicName(), func() { t.flushForChannels(force) })
}

func (t *Topic) flushForChannels(forceUpdate bool) {
Expand Down

0 comments on commit 8ddfdee

Please sign in to comment.