diff --git a/bench_tq.sh b/bench_tq.sh new file mode 100644 index 000000000..ffa0d10e3 --- /dev/null +++ b/bench_tq.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +set -e + +go test -c ./pkg/varlog +./varlog.test -test.v -test.run - -test.bench BenchmarkTransmitQueue -test.count 20 -test.benchmem -test.timeout 10h | tee out +grep -v PreAlloc out | sed 's,/NoAlloc,,g' >out.NoAlloc +grep -v NoAlloc out | sed 's,/PreAlloc,,g' >out.PreAlloc +benchstat out.NoAlloc out.PreAlloc diff --git a/pkg/varlog/benchmark_test.go b/pkg/varlog/benchmark_test.go new file mode 100644 index 000000000..2e3cd6745 --- /dev/null +++ b/pkg/varlog/benchmark_test.go @@ -0,0 +1,60 @@ +package varlog + +import ( + "fmt" + "math/rand" + "testing" + + "github.com/kakao/varlog/internal/storagenode/client" + "github.com/kakao/varlog/pkg/types" + "github.com/kakao/varlog/proto/varlogpb" +) + +func BenchmarkTransmitQueue(b *testing.B) { + sizes := []int{1 << 7, 1 << 10, 1 << 13} + tcs := []struct { + generate func(int) *transmitQueue + name string + }{ + { + name: "NoAlloc", + generate: func(int) *transmitQueue { + return &transmitQueue{ + pq: &PriorityQueue{}, + } + }, + }, + { + name: "PreAlloc", + generate: func(size int) *transmitQueue { + return &transmitQueue{ + pq: newPriorityQueue(size), + } + }, + }, + } + + for _, tc := range tcs { + for _, size := range sizes { + name := fmt.Sprintf("%s/%d", tc.name, size) + b.Run(name, func(b *testing.B) { + b.ResetTimer() + for range b.N { + tq := tc.generate(size) + for range size { + tr := transmitResult{ + result: client.SubscribeResult{ + LogEntry: varlogpb.LogEntry{ + LogEntryMeta: varlogpb.LogEntryMeta{ + GLSN: types.GLSN(rand.Int31()), + }, + }, + }, + } + tq.Push(tr) + } + } + }) + } + } +} diff --git a/pkg/varlog/subscribe.go b/pkg/varlog/subscribe.go index 1bc34b668..5e4576932 100644 --- a/pkg/varlog/subscribe.go +++ b/pkg/varlog/subscribe.go @@ -48,6 +48,10 @@ func (v *logImpl) subscribe(ctx context.Context, topicID types.TopicID, begin, e sleq := newSubscribedLogEntiresQueue(begin, end, closer, v.logger) tlogger := v.logger.Named("transmitter") + // The maximum length of transmitQ is end - begin in the worst case. + // Therefore, we can approximate half of it. + // TODO: Use a better approximation. + approxTransmitQSize := int((end - begin) / 2) tsm := &transmitter{ topicID: topicID, subscribers: make(map[types.LogStreamID]*subscriber), @@ -56,8 +60,7 @@ func (v *logImpl) subscribe(ctx context.Context, topicID types.TopicID, begin, e logCLManager: v.logCLManager, sleq: sleq, wanted: begin, - end: end, - transmitQ: &transmitQueue{pq: &PriorityQueue{}}, + transmitQ: &transmitQueue{pq: newPriorityQueue(approxTransmitQSize)}, transmitCV: transmitCV, timeout: subscribeOpts.timeout, runner: runner.New("transmitter", tlogger), @@ -89,6 +92,12 @@ type PriorityQueueItem interface { type PriorityQueue []PriorityQueueItem +func newPriorityQueue(size int) *PriorityQueue { + items := make([]PriorityQueueItem, 0, size) + pq := PriorityQueue(items) + return &pq +} + func (pq PriorityQueue) Len() int { return len(pq) } func (pq PriorityQueue) Less(i, j int) bool {