Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(client): use transmitResultPool to reduce heap allocs #722

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions pkg/varlog/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (
"math/rand"
"testing"

"github.com/kakao/varlog/internal/storagenode/client"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/varlogpb"
)

func BenchmarkTransmitQueue(b *testing.B) {
Expand Down Expand Up @@ -42,15 +40,8 @@ func BenchmarkTransmitQueue(b *testing.B) {
for range b.N {
tq := tc.generate(size)
for range size {
tr := transmitResult{
result: client.SubscribeResult{
LogEntry: varlogpb.LogEntry{
LogEntryMeta: varlogpb.LogEntryMeta{
GLSN: types.GLSN(rand.Int31()),
},
},
},
}
tr := &transmitResult{}
tr.result.GLSN = types.GLSN(rand.Int31())
tq.Push(tr)
}
}
Expand Down
49 changes: 30 additions & 19 deletions pkg/varlog/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,31 @@
return item
}

var transmitResultPool = sync.Pool{
New: func() any {
return &transmitResult{}
},

Check warning on line 128 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L126-L128

Added lines #L126 - L128 were not covered by tests
}

type transmitResult struct {
logStreamID types.LogStreamID
storageNodeID types.StorageNodeID
result client.SubscribeResult
}

func (t transmitResult) Priority() uint64 {
func newTransmitResult(snid types.StorageNodeID, lsid types.LogStreamID) *transmitResult {
t := transmitResultPool.Get().(*transmitResult)
t.storageNodeID = snid
t.logStreamID = lsid
return t

Check warning on line 141 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L137-L141

Added lines #L137 - L141 were not covered by tests
}

func (t *transmitResult) Release() {
*t = transmitResult{}
transmitResultPool.Put(t)

Check warning on line 146 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L144-L146

Added lines #L144 - L146 were not covered by tests
}

func (t *transmitResult) Priority() uint64 {

Check warning on line 149 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L149

Added line #L149 was not covered by tests
return uint64(t.result.GLSN)
}

Expand All @@ -137,37 +155,33 @@
mu sync.Mutex
}

func (tq *transmitQueue) Push(r transmitResult) {
func (tq *transmitQueue) Push(r *transmitResult) {

Check warning on line 158 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L158

Added line #L158 was not covered by tests
tq.mu.Lock()
defer tq.mu.Unlock()

heap.Push(tq.pq, r)
}

func (tq *transmitQueue) Pop() (transmitResult, bool) {
func (tq *transmitQueue) Pop() (*transmitResult, bool) {

Check warning on line 165 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L165

Added line #L165 was not covered by tests
tq.mu.Lock()
defer tq.mu.Unlock()

if tq.pq.Len() == 0 {
return transmitResult{
result: client.InvalidSubscribeResult,
}, false
return nil, false

Check warning on line 170 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L170

Added line #L170 was not covered by tests
}

return heap.Pop(tq.pq).(transmitResult), true
return heap.Pop(tq.pq).(*transmitResult), true

Check warning on line 173 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L173

Added line #L173 was not covered by tests
}

func (tq *transmitQueue) Front() (transmitResult, bool) {
func (tq *transmitQueue) Front() (*transmitResult, bool) {

Check warning on line 176 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L176

Added line #L176 was not covered by tests
tq.mu.Lock()
defer tq.mu.Unlock()

if tq.pq.Len() == 0 {
return transmitResult{
result: client.InvalidSubscribeResult,
}, false
return nil, false

Check warning on line 181 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L181

Added line #L181 was not covered by tests
}

return (*tq.pq)[0].(transmitResult), true
return (*tq.pq)[0].(*transmitResult), true

Check warning on line 184 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L184

Added line #L184 was not covered by tests
}

type subscriber struct {
Expand Down Expand Up @@ -233,11 +247,7 @@
case <-ctx.Done():
return
case res, ok := <-s.resultC:
r := transmitResult{
storageNodeID: s.storageNodeID,
logStreamID: s.logStreamID,
}

r := newTransmitResult(s.storageNodeID, s.logStreamID)

Check warning on line 250 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L250

Added line #L250 was not covered by tests
if ok {
r.result = res
} else {
Expand Down Expand Up @@ -391,7 +401,7 @@
p.refreshSubscriber(ctx) //nolint:errcheck,revive // TODO: Handle an error returned.
}

func (p *transmitter) handleError(r transmitResult) error {
func (p *transmitter) handleError(r *transmitResult) error {

Check warning on line 404 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L404

Added line #L404 was not covered by tests
s, ok := p.subscribers[r.logStreamID]
if !ok {
return nil
Expand All @@ -404,7 +414,7 @@
return r.result.Error
}

func (p *transmitter) handleResult(r transmitResult) error {
func (p *transmitter) handleResult(r *transmitResult) error {

Check warning on line 417 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L417

Added line #L417 was not covered by tests
var err error

/* NOTE Ignore transmitResult with GLSN less than p.wanted.
Expand Down Expand Up @@ -436,6 +446,7 @@
if res.result.GLSN <= p.wanted {
res, _ := p.transmitQ.Pop()
err := p.handleResult(res)
res.Release()

Check warning on line 449 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L449

Added line #L449 was not covered by tests
if p.wanted == p.end ||
errors.Is(err, verrors.ErrTrimmed) {
return false
Expand Down