From a79d3c1979948513e30c9d18423549463641d457 Mon Sep 17 00:00:00 2001 From: Injun Song Date: Sat, 24 Feb 2024 01:54:47 +0900 Subject: [PATCH] perf(client): use transmitResultPool to reduce heap allocs Struct pkg/varlog.(transmitResult) is an element of transmitQueue. It is allocated in heap memory to be pushed into transmitQueue. This PR introduces transmitResultPool to reuse transmitResult as possible. --- pkg/varlog/subscribe.go | 49 +++++++++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 19 deletions(-) diff --git a/pkg/varlog/subscribe.go b/pkg/varlog/subscribe.go index c6da7e6e8..523ce0c4f 100644 --- a/pkg/varlog/subscribe.go +++ b/pkg/varlog/subscribe.go @@ -118,13 +118,31 @@ func (pq *PriorityQueue) Pop() interface{} { return item } +var transmitResultPool = sync.Pool{ + New: func() any { + return &transmitResult{} + }, +} + type transmitResult struct { logStreamID types.LogStreamID storageNodeID types.StorageNodeID result client.SubscribeResult } -func (t transmitResult) Priority() uint64 { +func newTransmitResult(snid types.StorageNodeID, lsid types.LogStreamID) *transmitResult { + t := transmitResultPool.Get().(*transmitResult) + t.storageNodeID = snid + t.logStreamID = lsid + return t +} + +func (t *transmitResult) Release() { + *t = transmitResult{} + transmitResultPool.Put(t) +} + +func (t *transmitResult) Priority() uint64 { return uint64(t.result.GLSN) } @@ -133,37 +151,33 @@ type transmitQueue struct { mu sync.Mutex } -func (tq *transmitQueue) Push(r transmitResult) { +func (tq *transmitQueue) Push(r *transmitResult) { tq.mu.Lock() defer tq.mu.Unlock() heap.Push(tq.pq, r) } -func (tq *transmitQueue) Pop() (transmitResult, bool) { +func (tq *transmitQueue) Pop() (*transmitResult, bool) { tq.mu.Lock() defer tq.mu.Unlock() if tq.pq.Len() == 0 { - return transmitResult{ - result: client.InvalidSubscribeResult, - }, false + return nil, false } - return heap.Pop(tq.pq).(transmitResult), true + return heap.Pop(tq.pq).(*transmitResult), true } -func (tq *transmitQueue) Front() (transmitResult, bool) { +func (tq *transmitQueue) Front() (*transmitResult, bool) { tq.mu.Lock() defer tq.mu.Unlock() if tq.pq.Len() == 0 { - return transmitResult{ - result: client.InvalidSubscribeResult, - }, false + return nil, false } - return (*tq.pq)[0].(transmitResult), true + return (*tq.pq)[0].(*transmitResult), true } type subscriber struct { @@ -229,11 +243,7 @@ func (s *subscriber) subscribe(ctx context.Context) { case <-ctx.Done(): return case res, ok := <-s.resultC: - r := transmitResult{ - storageNodeID: s.storageNodeID, - logStreamID: s.logStreamID, - } - + r := newTransmitResult(s.storageNodeID, s.logStreamID) if ok { r.result = res } else { @@ -387,7 +397,7 @@ func (p *transmitter) handleTimeout(ctx context.Context) { p.refreshSubscriber(ctx) //nolint:errcheck,revive // TODO: Handle an error returned. } -func (p *transmitter) handleError(r transmitResult) error { +func (p *transmitter) handleError(r *transmitResult) error { s, ok := p.subscribers[r.logStreamID] if !ok { return nil @@ -400,7 +410,7 @@ func (p *transmitter) handleError(r transmitResult) error { return r.result.Error } -func (p *transmitter) handleResult(r transmitResult) error { +func (p *transmitter) handleResult(r *transmitResult) error { var err error /* NOTE Ignore transmitResult with GLSN less than p.wanted. @@ -432,6 +442,7 @@ func (p *transmitter) transmitLoop(ctx context.Context) bool { if res.result.GLSN <= p.wanted { res, _ := p.transmitQ.Pop() err := p.handleResult(res) + res.Release() if p.wanted == p.end || errors.Is(err, verrors.ErrTrimmed) { return false