Skip to content

Commit

Permalink
perf(client): prealloc pkg/varlog.(*transmitter).transmitQueue
Browse files Browse the repository at this point in the history
This PR preallocates `pkg/varlog.(*transmitter).transmitQueue`.

Benchmark:

```
goos: darwin
goarch: amd64
pkg: github.com/kakao/varlog/pkg/varlog
cpu: Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
                     │ out.NoAlloc │            out.PreAlloc             │
                     │   sec/op    │   sec/op     vs base                │
TransmitQueue/128-8    17.88µ ± 1%   16.57µ ± 0%   -7.34% (p=0.000 n=20)
TransmitQueue/1024-8   149.7µ ± 0%   136.3µ ± 0%   -8.93% (p=0.000 n=20)
TransmitQueue/8192-8   1.313m ± 1%   1.156m ± 0%  -11.97% (p=0.000 n=20)
geomean                152.0µ        137.7µ        -9.43%

                     │  out.NoAlloc  │             out.PreAlloc             │
                     │     B/op      │     B/op      vs base                │
TransmitQueue/128-8     14.40Ki ± 0%   12.29Ki ± 0%  -14.65% (p=0.000 n=20)
TransmitQueue/1024-8   138.40Ki ± 0%   98.04Ki ± 0%  -29.16% (p=0.000 n=20)
TransmitQueue/8192-8   1114.4Ki ± 0%   768.0Ki ± 0%  -31.08% (p=0.000 n=20)
geomean                 130.5Ki        97.45Ki       -25.31%

                     │ out.NoAlloc │            out.PreAlloc            │
                     │  allocs/op  │  allocs/op   vs base               │
TransmitQueue/128-8     138.0 ± 0%    131.0 ± 0%  -5.07% (p=0.000 n=20)
TransmitQueue/1024-8   1.038k ± 0%   1.027k ± 0%  -1.06% (p=0.000 n=20)
TransmitQueue/8192-8   8.211k ± 0%   8.195k ± 0%  -0.19% (p=0.000 n=20)
geomean                1.056k        1.033k       -2.13%
```
  • Loading branch information
ijsong committed Mar 5, 2024
1 parent a58015b commit 43ba3be
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 1 deletion.
9 changes: 9 additions & 0 deletions bench_tq.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/usr/bin/env bash

set -e

go test -c ./pkg/varlog
./varlog.test -test.v -test.run - -test.bench BenchmarkTransmitQueue -test.count 20 -test.benchmem -test.timeout 10h | tee out
grep -v PreAlloc out | sed 's,/NoAlloc,,g' >out.NoAlloc
grep -v NoAlloc out | sed 's,/PreAlloc,,g' >out.PreAlloc
benchstat out.NoAlloc out.PreAlloc
60 changes: 60 additions & 0 deletions pkg/varlog/benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package varlog

import (
"fmt"
"math/rand"
"testing"

"github.com/kakao/varlog/internal/storagenode/client"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/varlogpb"
)

func BenchmarkTransmitQueue(b *testing.B) {
sizes := []int{1 << 7, 1 << 10, 1 << 13}
tcs := []struct {
generate func(int) *transmitQueue
name string
}{
{
name: "NoAlloc",
generate: func(int) *transmitQueue {
return &transmitQueue{
pq: &PriorityQueue{},
}
},
},
{
name: "PreAlloc",
generate: func(size int) *transmitQueue {
return &transmitQueue{
pq: newPriorityQueue(size / 2),
}
},
},
}

for _, tc := range tcs {
for _, size := range sizes {
name := fmt.Sprintf("%s/%d", tc.name, size)
b.Run(name, func(b *testing.B) {
b.ResetTimer()
for range b.N {
tq := tc.generate(size)
for range size {
tr := transmitResult{
result: client.SubscribeResult{
LogEntry: varlogpb.LogEntry{
LogEntryMeta: varlogpb.LogEntryMeta{
GLSN: types.GLSN(rand.Int31()),
},
},
},
}
tq.Push(tr)
}
}
})
}
}
}
12 changes: 11 additions & 1 deletion pkg/varlog/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func (v *logImpl) subscribe(ctx context.Context, topicID types.TopicID, begin, e
sleq := newSubscribedLogEntiresQueue(begin, end, closer, v.logger)

tlogger := v.logger.Named("transmitter")
// The maximum length of transmitQ is end - begin in the worst case.
// Therefore, we can approximate half of it.
// TODO: Use a better approximation.
approxTransmitQSize := int((end - begin) / 2)

Check warning on line 54 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L54

Added line #L54 was not covered by tests
tsm := &transmitter{
topicID: topicID,
subscribers: make(map[types.LogStreamID]*subscriber),
Expand All @@ -57,7 +61,7 @@ func (v *logImpl) subscribe(ctx context.Context, topicID types.TopicID, begin, e
sleq: sleq,
wanted: begin,
end: end,
transmitQ: &transmitQueue{pq: &PriorityQueue{}},
transmitQ: &transmitQueue{pq: newPriorityQueue(approxTransmitQSize)},

Check warning on line 64 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L64

Added line #L64 was not covered by tests
transmitCV: transmitCV,
timeout: subscribeOpts.timeout,
runner: runner.New("transmitter", tlogger),
Expand Down Expand Up @@ -89,6 +93,12 @@ type PriorityQueueItem interface {

type PriorityQueue []PriorityQueueItem

func newPriorityQueue(size int) *PriorityQueue {
items := make([]PriorityQueueItem, 0, size)
pq := PriorityQueue(items)
return &pq

Check warning on line 99 in pkg/varlog/subscribe.go

View check run for this annotation

Codecov / codecov/patch

pkg/varlog/subscribe.go#L96-L99

Added lines #L96 - L99 were not covered by tests
}

func (pq PriorityQueue) Len() int { return len(pq) }

func (pq PriorityQueue) Less(i, j int) bool {
Expand Down

0 comments on commit 43ba3be

Please sign in to comment.