Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(benchmark): refine the benchmark tool for log subscription #716

Merged
merged 1 commit into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 57 additions & 35 deletions cmd/benchmark/test_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,58 +13,78 @@ import (

var (
flagClusterID = flags.ClusterID
flagTarget = &cli.StringSliceFlag{

flagTarget = &cli.StringSliceFlag{
Name: "target",
Category: "Common: ",
Required: true,
Usage: "The target of the benchmark load formatted by \"topic1:logstream1,topic2:logstream2,...<topic_id:logstream_id>\"",
}
flagMRAddrs = &cli.StringSliceFlag{
Name: "address",
Category: "Common: ",
Required: true,
}
flagMsgSize = &cli.UintSliceFlag{
Name: "message-size",
Aliases: []string{"msg-size"},
Value: cli.NewUintSlice(benchmark.DefaultMessageSize),
Usage: "Message sizes for each load target",
flagDuration = &cli.DurationFlag{
Name: "duration",
Category: "Common: ",
Value: benchmark.DefaultDuration,
}
flagBatchSize = &cli.UintSliceFlag{
Name: "batch-size",
Value: cli.NewUintSlice(benchmark.DefaultBatchSize),
Usage: "Batch sizes for each load target",
flagReportInterval = &cli.DurationFlag{
Name: "report-interval",
Category: "Common: ",
Value: benchmark.DefaultReportInterval,
}
flagAppenders = &cli.UintSliceFlag{
Name: "appenders",
Aliases: []string{"appenders-count"},
Value: cli.NewUintSlice(benchmark.DefaultConcurrency),
Usage: "The number of appenders for each load target",
flagPrintJSON = &cli.BoolFlag{
Name: "print-json",
Category: "Common: ",
Usage: "Print json output if it is set",
}
flagSubscribers = &cli.UintSliceFlag{
Name: "subscribers",
Aliases: []string{"subscribers-count"},
Value: cli.NewUintSlice(benchmark.DefaultConcurrency),
Usage: "The number of subscribers for each load target",
flagSingleConnPerTarget = &cli.BoolFlag{
Name: "single-conn-per-target",
Category: "Common: ",
Usage: "Use single connection shared by appenders in a target. Each target uses different connection.",
}
flagDuration = &cli.DurationFlag{
Name: "duration",
Value: benchmark.DefaultDuration,

flagAppenders = &cli.UintSliceFlag{
Name: "appenders",
Category: "Append: ",
Aliases: []string{"appenders-count"},
Value: cli.NewUintSlice(benchmark.DefaultConcurrency),
Usage: "The number of appenders for each load target",
}
flagReportInterval = &cli.DurationFlag{
Name: "report-interval",
Value: benchmark.DefaultReportInterval,
flagMsgSize = &cli.UintSliceFlag{
Name: "message-size",
Category: "Append: ",
Aliases: []string{"msg-size"},
Value: cli.NewUintSlice(benchmark.DefaultMessageSize),
Usage: "Message sizes for each load target",
}
flagPrintJSON = &cli.BoolFlag{
Name: "print-json",
Usage: "Print json output if it is set",
flagBatchSize = &cli.UintSliceFlag{
Name: "batch-size",
Category: "Append: ",
Value: cli.NewUintSlice(benchmark.DefaultBatchSize),
Usage: "Batch sizes for each load target",
}
flagPipelineSize = &cli.IntFlag{
Name: "pipeline-size",
Usage: "Pipeline size, no pipelined requests if zero. Not support per-target pipeline size yet.",
Value: 0,
Name: "pipeline-size",
Category: "Append: ",
Usage: "Pipeline size, no pipelined requests if zero. Not support per-target pipeline size yet.",
Value: 0,
}
flagSingleConnPerTarget = &cli.BoolFlag{
Name: "single-conn-per-target",
Usage: "Use single connection shared by appenders in a target. Each target uses different connection.",

flagSubscribers = &cli.UintSliceFlag{
Name: "subscribers",
Category: "Subscribe: ",
Aliases: []string{"subscribers-count"},
Value: cli.NewUintSlice(benchmark.DefaultConcurrency),
Usage: "The number of subscribers for each load target",
}
flagSubscribeSize = &cli.IntFlag{
Name: "subscribe-size",
Category: "Subscribe: ",
Usage: "The number of messages to subscribe at once",
Value: 1000,
}
)

Expand All @@ -85,6 +105,7 @@ func newCommandTest() *cli.Command {
flagPrintJSON,
flagPipelineSize,
flagSingleConnPerTarget,
flagSubscribeSize,
},
Action: runCommandTest,
}
Expand Down Expand Up @@ -118,6 +139,7 @@ func runCommandTest(c *cli.Context) error {
}
}
target.PipelineSize = c.Int(flagPipelineSize.Name)
target.SubscribeSize = c.Int(flagSubscribeSize.Name)
targets[idx] = target
}

Expand Down
6 changes: 4 additions & 2 deletions internal/benchmark/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ func New(opts ...Option) (bm *Benchmark, err error) {

// Run starts Loaders and metric reporter. It blocks until the loaders are finished.
func (bm *Benchmark) Run() error {
g, ctx := errgroup.WithContext(context.Background())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
g, ctx := errgroup.WithContext(ctx)

benchmarkTimer := time.NewTimer(bm.duration)

Expand All @@ -87,6 +89,7 @@ func (bm *Benchmark) Run() error {
close(bm.stopC)
wg.Done()
fmt.Println(MustEncode(bm.reportEncoder, bm.metrics.Flush()))
cancel()
}()
for {
select {
Expand All @@ -105,7 +108,6 @@ func (bm *Benchmark) Run() error {
fmt.Println(MustEncode(bm.reportEncoder, bm.metrics.Flush()))
}
}

}()

for _, tw := range bm.loaders {
Expand Down
Loading