Skip to content

Commit

Permalink
fix(benchmark): refine the benchmark tool for log subscription
Browse files Browse the repository at this point in the history
This update fixes the benchmark tool, enabling it to subscribe to all logs from
specified a topic or a log stream. It introduces a mechanism where a loader
fetches logs, with the number of logs defined by the "--subscribe-size" CLI
flag.
  • Loading branch information
ijsong committed Feb 26, 2024
1 parent e0ef909 commit 9199784
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 114 deletions.
92 changes: 57 additions & 35 deletions cmd/benchmark/test_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,58 +13,78 @@ import (

var (
flagClusterID = flags.ClusterID
flagTarget = &cli.StringSliceFlag{

flagTarget = &cli.StringSliceFlag{
Name: "target",
Category: "Common: ",
Required: true,
Usage: "The target of the benchmark load formatted by \"topic1:logstream1,topic2:logstream2,...<topic_id:logstream_id>\"",
}
flagMRAddrs = &cli.StringSliceFlag{
Name: "address",
Category: "Common: ",
Required: true,
}
flagMsgSize = &cli.UintSliceFlag{
Name: "message-size",
Aliases: []string{"msg-size"},
Value: cli.NewUintSlice(benchmark.DefaultMessageSize),
Usage: "Message sizes for each load target",
flagDuration = &cli.DurationFlag{
Name: "duration",
Category: "Common: ",
Value: benchmark.DefaultDuration,
}
flagBatchSize = &cli.UintSliceFlag{
Name: "batch-size",
Value: cli.NewUintSlice(benchmark.DefaultBatchSize),
Usage: "Batch sizes for each load target",
flagReportInterval = &cli.DurationFlag{
Name: "report-interval",
Category: "Common: ",
Value: benchmark.DefaultReportInterval,
}
flagAppenders = &cli.UintSliceFlag{
Name: "appenders",
Aliases: []string{"appenders-count"},
Value: cli.NewUintSlice(benchmark.DefaultConcurrency),
Usage: "The number of appenders for each load target",
flagPrintJSON = &cli.BoolFlag{
Name: "print-json",
Category: "Common: ",
Usage: "Print json output if it is set",
}
flagSubscribers = &cli.UintSliceFlag{
Name: "subscribers",
Aliases: []string{"subscribers-count"},
Value: cli.NewUintSlice(benchmark.DefaultConcurrency),
Usage: "The number of subscribers for each load target",
flagSingleConnPerTarget = &cli.BoolFlag{
Name: "single-conn-per-target",
Category: "Common: ",
Usage: "Use single connection shared by appenders in a target. Each target uses different connection.",
}
flagDuration = &cli.DurationFlag{
Name: "duration",
Value: benchmark.DefaultDuration,

flagAppenders = &cli.UintSliceFlag{
Name: "appenders",
Category: "Append: ",
Aliases: []string{"appenders-count"},
Value: cli.NewUintSlice(benchmark.DefaultConcurrency),
Usage: "The number of appenders for each load target",
}
flagReportInterval = &cli.DurationFlag{
Name: "report-interval",
Value: benchmark.DefaultReportInterval,
flagMsgSize = &cli.UintSliceFlag{
Name: "message-size",
Category: "Append: ",
Aliases: []string{"msg-size"},
Value: cli.NewUintSlice(benchmark.DefaultMessageSize),
Usage: "Message sizes for each load target",
}
flagPrintJSON = &cli.BoolFlag{
Name: "print-json",
Usage: "Print json output if it is set",
flagBatchSize = &cli.UintSliceFlag{
Name: "batch-size",
Category: "Append: ",
Value: cli.NewUintSlice(benchmark.DefaultBatchSize),
Usage: "Batch sizes for each load target",
}
flagPipelineSize = &cli.IntFlag{
Name: "pipeline-size",
Usage: "Pipeline size, no pipelined requests if zero. Not support per-target pipeline size yet.",
Value: 0,
Name: "pipeline-size",
Category: "Append: ",
Usage: "Pipeline size, no pipelined requests if zero. Not support per-target pipeline size yet.",
Value: 0,
}
flagSingleConnPerTarget = &cli.BoolFlag{
Name: "single-conn-per-target",
Usage: "Use single connection shared by appenders in a target. Each target uses different connection.",

flagSubscribers = &cli.UintSliceFlag{
Name: "subscribers",
Category: "Subscribe: ",
Aliases: []string{"subscribers-count"},
Value: cli.NewUintSlice(benchmark.DefaultConcurrency),
Usage: "The number of subscribers for each load target",
}
flagSubscribeSize = &cli.IntFlag{
Name: "subscribe-size",
Category: "Subscribe: ",
Usage: "The number of messages to subscribe at once",
Value: 1000,
}
)

Expand All @@ -85,6 +105,7 @@ func newCommandTest() *cli.Command {
flagPrintJSON,
flagPipelineSize,
flagSingleConnPerTarget,
flagSubscribeSize,

Check warning on line 108 in cmd/benchmark/test_command.go

View check run for this annotation

Codecov / codecov/patch

cmd/benchmark/test_command.go#L108

Added line #L108 was not covered by tests
},
Action: runCommandTest,
}
Expand Down Expand Up @@ -118,6 +139,7 @@ func runCommandTest(c *cli.Context) error {
}
}
target.PipelineSize = c.Int(flagPipelineSize.Name)
target.SubscribeSize = c.Int(flagSubscribeSize.Name)

Check warning on line 142 in cmd/benchmark/test_command.go

View check run for this annotation

Codecov / codecov/patch

cmd/benchmark/test_command.go#L142

Added line #L142 was not covered by tests
targets[idx] = target
}

Expand Down
6 changes: 4 additions & 2 deletions internal/benchmark/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ func New(opts ...Option) (bm *Benchmark, err error) {

// Run starts Loaders and metric reporter. It blocks until the loaders are finished.
func (bm *Benchmark) Run() error {
g, ctx := errgroup.WithContext(context.Background())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
g, ctx := errgroup.WithContext(ctx)

Check warning on line 74 in internal/benchmark/benchmark.go

View check run for this annotation

Codecov / codecov/patch

internal/benchmark/benchmark.go#L72-L74

Added lines #L72 - L74 were not covered by tests

benchmarkTimer := time.NewTimer(bm.duration)

Expand All @@ -87,6 +89,7 @@ func (bm *Benchmark) Run() error {
close(bm.stopC)
wg.Done()
fmt.Println(MustEncode(bm.reportEncoder, bm.metrics.Flush()))
cancel()

Check warning on line 92 in internal/benchmark/benchmark.go

View check run for this annotation

Codecov / codecov/patch

internal/benchmark/benchmark.go#L92

Added line #L92 was not covered by tests
}()
for {
select {
Expand All @@ -105,7 +108,6 @@ func (bm *Benchmark) Run() error {
fmt.Println(MustEncode(bm.reportEncoder, bm.metrics.Flush()))
}
}

}()

for _, tw := range bm.loaders {
Expand Down
Loading

0 comments on commit 9199784

Please sign in to comment.