diff --git a/cmd/benchmark/test_command.go b/cmd/benchmark/test_command.go index 41a03db68..3e61264ff 100644 --- a/cmd/benchmark/test_command.go +++ b/cmd/benchmark/test_command.go @@ -13,58 +13,78 @@ import ( var ( flagClusterID = flags.ClusterID - flagTarget = &cli.StringSliceFlag{ + + flagTarget = &cli.StringSliceFlag{ Name: "target", + Category: "Common: ", Required: true, Usage: "The target of the benchmark load formatted by \"topic1:logstream1,topic2:logstream2,...\"", } flagMRAddrs = &cli.StringSliceFlag{ Name: "address", + Category: "Common: ", Required: true, } - flagMsgSize = &cli.UintSliceFlag{ - Name: "message-size", - Aliases: []string{"msg-size"}, - Value: cli.NewUintSlice(benchmark.DefaultMessageSize), - Usage: "Message sizes for each load target", + flagDuration = &cli.DurationFlag{ + Name: "duration", + Category: "Common: ", + Value: benchmark.DefaultDuration, } - flagBatchSize = &cli.UintSliceFlag{ - Name: "batch-size", - Value: cli.NewUintSlice(benchmark.DefaultBatchSize), - Usage: "Batch sizes for each load target", + flagReportInterval = &cli.DurationFlag{ + Name: "report-interval", + Category: "Common: ", + Value: benchmark.DefaultReportInterval, } - flagAppenders = &cli.UintSliceFlag{ - Name: "appenders", - Aliases: []string{"appenders-count"}, - Value: cli.NewUintSlice(benchmark.DefaultConcurrency), - Usage: "The number of appenders for each load target", + flagPrintJSON = &cli.BoolFlag{ + Name: "print-json", + Category: "Common: ", + Usage: "Print json output if it is set", } - flagSubscribers = &cli.UintSliceFlag{ - Name: "subscribers", - Aliases: []string{"subscribers-count"}, - Value: cli.NewUintSlice(benchmark.DefaultConcurrency), - Usage: "The number of subscribers for each load target", + flagSingleConnPerTarget = &cli.BoolFlag{ + Name: "single-conn-per-target", + Category: "Common: ", + Usage: "Use single connection shared by appenders in a target. Each target uses different connection.", } - flagDuration = &cli.DurationFlag{ - Name: "duration", - Value: benchmark.DefaultDuration, + + flagAppenders = &cli.UintSliceFlag{ + Name: "appenders", + Category: "Append: ", + Aliases: []string{"appenders-count"}, + Value: cli.NewUintSlice(benchmark.DefaultConcurrency), + Usage: "The number of appenders for each load target", } - flagReportInterval = &cli.DurationFlag{ - Name: "report-interval", - Value: benchmark.DefaultReportInterval, + flagMsgSize = &cli.UintSliceFlag{ + Name: "message-size", + Category: "Append: ", + Aliases: []string{"msg-size"}, + Value: cli.NewUintSlice(benchmark.DefaultMessageSize), + Usage: "Message sizes for each load target", } - flagPrintJSON = &cli.BoolFlag{ - Name: "print-json", - Usage: "Print json output if it is set", + flagBatchSize = &cli.UintSliceFlag{ + Name: "batch-size", + Category: "Append: ", + Value: cli.NewUintSlice(benchmark.DefaultBatchSize), + Usage: "Batch sizes for each load target", } flagPipelineSize = &cli.IntFlag{ - Name: "pipeline-size", - Usage: "Pipeline size, no pipelined requests if zero. Not support per-target pipeline size yet.", - Value: 0, + Name: "pipeline-size", + Category: "Append: ", + Usage: "Pipeline size, no pipelined requests if zero. Not support per-target pipeline size yet.", + Value: 0, } - flagSingleConnPerTarget = &cli.BoolFlag{ - Name: "single-conn-per-target", - Usage: "Use single connection shared by appenders in a target. Each target uses different connection.", + + flagSubscribers = &cli.UintSliceFlag{ + Name: "subscribers", + Category: "Subscribe: ", + Aliases: []string{"subscribers-count"}, + Value: cli.NewUintSlice(benchmark.DefaultConcurrency), + Usage: "The number of subscribers for each load target", + } + flagSubscribeSize = &cli.IntFlag{ + Name: "subscribe-size", + Category: "Subscribe: ", + Usage: "The number of messages to subscribe at once", + Value: 1000, } ) @@ -85,6 +105,7 @@ func newCommandTest() *cli.Command { flagPrintJSON, flagPipelineSize, flagSingleConnPerTarget, + flagSubscribeSize, }, Action: runCommandTest, } @@ -118,6 +139,7 @@ func runCommandTest(c *cli.Context) error { } } target.PipelineSize = c.Int(flagPipelineSize.Name) + target.SubscribeSize = c.Int(flagSubscribeSize.Name) targets[idx] = target } diff --git a/internal/benchmark/benchmark.go b/internal/benchmark/benchmark.go index 2eb92fd6f..b377f3a31 100644 --- a/internal/benchmark/benchmark.go +++ b/internal/benchmark/benchmark.go @@ -69,7 +69,9 @@ func New(opts ...Option) (bm *Benchmark, err error) { // Run starts Loaders and metric reporter. It blocks until the loaders are finished. func (bm *Benchmark) Run() error { - g, ctx := errgroup.WithContext(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + g, ctx := errgroup.WithContext(ctx) benchmarkTimer := time.NewTimer(bm.duration) @@ -87,6 +89,7 @@ func (bm *Benchmark) Run() error { close(bm.stopC) wg.Done() fmt.Println(MustEncode(bm.reportEncoder, bm.metrics.Flush())) + cancel() }() for { select { @@ -105,7 +108,6 @@ func (bm *Benchmark) Run() error { fmt.Println(MustEncode(bm.reportEncoder, bm.metrics.Flush())) } } - }() for _, tw := range bm.loaders { diff --git a/internal/benchmark/loader.go b/internal/benchmark/loader.go index 348952627..e1b83ecdf 100644 --- a/internal/benchmark/loader.go +++ b/internal/benchmark/loader.go @@ -1,10 +1,14 @@ package benchmark import ( + "cmp" "context" "errors" "fmt" + "io" "log/slog" + "slices" + "sync" "time" "go.uber.org/multierr" @@ -26,13 +30,9 @@ type loaderConfig struct { type Loader struct { loaderConfig - batch [][]byte - apps []varlog.Log - subs []varlog.Log - begin struct { - lsn varlogpb.LogSequenceNumber - ch chan varlogpb.LogSequenceNumber - } + batch [][]byte + apps []varlog.Log + subs []varlog.Log logger *slog.Logger } @@ -40,7 +40,6 @@ func NewLoader(cfg loaderConfig) (loader *Loader, err error) { loader = &Loader{ loaderConfig: cfg, } - loader.begin.ch = make(chan varlogpb.LogSequenceNumber, cfg.AppendersCount) loader.logger = slog.With(slog.Any("topic", loader.TopicID)) defer func() { @@ -109,12 +108,6 @@ func (loader *Loader) Run(ctx context.Context) (err error) { }) } - err = loader.setBeginLSN(ctx) - if err != nil { - err = fmt.Errorf("begin lsn: %w", err) - return err - } - for i := 0; i < len(loader.subs); i++ { c := loader.subs[i] g.Go(func() error { @@ -137,15 +130,6 @@ func (loader *Loader) Close() error { } func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *AppendMetrics) (appendFunc func() error, closeFunc func(), err error) { - begin := true - notifyBegin := func(meta varlogpb.LogEntryMeta) { - loader.begin.ch <- varlogpb.LogSequenceNumber{ - LLSN: meta.LLSN, - GLSN: meta.GLSN, - } - begin = false - } - debugLog := func(meta []varlogpb.LogEntryMeta) { cnt := len(meta) loader.logger.Debug("append", @@ -179,9 +163,6 @@ func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *Appe } dur := time.Since(ts) recordMetrics(dur) - if begin { - notifyBegin(res.Metadata[0]) - } debugLog(res.Metadata) return nil } @@ -197,9 +178,6 @@ func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *Appe } dur := time.Since(ts) recordMetrics(dur) - if begin { - notifyBegin(res.Metadata[0]) - } debugLog(res.Metadata) return nil } @@ -227,9 +205,6 @@ func (loader *Loader) makeAppendFunc(ctx context.Context, c varlog.Log, am *Appe } dur := time.Since(ts) recordMetrics(dur) - if begin { - notifyBegin(lem[0]) - } debugLog(lem) }) return err @@ -260,42 +235,75 @@ func (loader *Loader) appendLoop(ctx context.Context, c varlog.Log) error { } func (loader *Loader) subscribeLoop(ctx context.Context, c varlog.Log) error { - var sm SubscribeMetrics + first, last, err := loader.getLogRange(ctx, c, loader.TopicID, loader.LogStreamID) + if err != nil { + return fmt.Errorf("subscribe: %w", err) + } + loader.logger.Info("subscribe range", slog.Any("first", first), slog.Any("last", last)) + if loader.LogStreamID.Invalid() { - var subErr error - stop := make(chan struct{}) - closer, err := c.Subscribe(ctx, loader.TopicID, loader.begin.lsn.GLSN, types.MaxGLSN, func(logEntry varlogpb.LogEntry, err error) { - if err != nil { - subErr = err - close(stop) - return - } - loader.logger.Debug("subscribed", slog.String("log", logEntry.String())) - sm.logs++ - sm.bytes += int64(len(logEntry.Data)) - if loader.metrics.ReportSubscribeMetrics(sm) { - sm = SubscribeMetrics{} - } - }) + return loader.subscribeLoopInternal(ctx, c, first, last) + } + return loader.subscribeToInternal(ctx, c, first, last) +} + +func (loader *Loader) subscribeLoopInternal(ctx context.Context, c varlog.Log, first, last varlogpb.LogSequenceNumber) error { + subscribeSize := types.GLSN(loader.SubscribeSize) + + loader.logger.Info("subscribe", slog.Any("first", first), slog.Any("last", last), slog.Int("subscribeSize", loader.SubscribeSize)) + + var sm SubscribeMetrics + begin := first.GLSN + end := min(begin+subscribeSize, last.GLSN+1) + for begin < end { + err := loader.subscribeInternal(ctx, c, begin, end, &sm) if err != nil { return err } - defer closer() + begin = end + end = min(begin+subscribeSize, last.GLSN+1) + } - select { - case <-loader.stopC: - return nil - case <-stop: - if subErr != nil { - return fmt.Errorf("subscribe: %w", subErr) + return nil +} + +func (loader *Loader) subscribeInternal(ctx context.Context, c varlog.Log, begin, end types.GLSN, sm *SubscribeMetrics) error { + errC := make(chan error, 1) + closer, err := c.Subscribe(ctx, loader.TopicID, begin, end, func(logEntry varlogpb.LogEntry, err error) { + if err != nil { + if err != io.EOF { + errC <- err } - return nil - case <-ctx.Done(): - return ctx.Err() + close(errC) + return + } + sm.logs++ + sm.bytes += int64(len(logEntry.Data)) + if loader.metrics.ReportSubscribeMetrics(*sm) { + *sm = SubscribeMetrics{} + } + }) + if err != nil { + return err + } + defer closer() + + select { + case <-loader.stopC: + return nil + case err := <-errC: + if err != nil { + return fmt.Errorf("subscribe: %w", err) } + return nil + case <-ctx.Done(): + return ctx.Err() } +} - subscriber := c.SubscribeTo(ctx, loader.TopicID, loader.LogStreamID, loader.begin.lsn.LLSN, types.MaxLLSN) +func (loader *Loader) subscribeToInternal(ctx context.Context, c varlog.Log, first, last varlogpb.LogSequenceNumber) error { + var sm SubscribeMetrics + subscriber := c.SubscribeTo(ctx, loader.TopicID, loader.LogStreamID, first.LLSN, last.LLSN+1) defer func() { _ = subscriber.Close() }() @@ -311,7 +319,6 @@ func (loader *Loader) subscribeLoop(ctx context.Context, c varlog.Log) error { if err != nil { return fmt.Errorf("subscribe: %w", err) } - loader.logger.Debug("subscribeTo", slog.Any("llsn", logEntry.LLSN)) sm.logs++ sm.bytes += int64(len(logEntry.Data)) if loader.metrics.ReportSubscribeMetrics(sm) { @@ -320,24 +327,54 @@ func (loader *Loader) subscribeLoop(ctx context.Context, c varlog.Log) error { } } -func (loader *Loader) setBeginLSN(ctx context.Context) error { - beginLSN := varlogpb.LogSequenceNumber{ - LLSN: types.MaxLLSN, - GLSN: types.MaxGLSN, +func (loader *Loader) getLogRange(ctx context.Context, c varlog.Log, tpid types.TopicID, lsid types.LogStreamID) (first, last varlogpb. + LogSequenceNumber, err error, +) { + if !lsid.Invalid() { + return c.PeekLogStream(ctx, tpid, lsid) } - for i := uint(0); i < loader.AppendersCount; i++ { - select { - case <-loader.stopC: - return nil - case lsn := <-loader.begin.ch: - if lsn.GLSN < beginLSN.GLSN { - beginLSN = lsn + + lsids := c.AppendableLogStreams(tpid) + var mu sync.Mutex + fs := make([]varlogpb.LogSequenceNumber, 0, len(lsids)) + ls := make([]varlogpb.LogSequenceNumber, 0, len(lsids)) + eg, ctx := errgroup.WithContext(ctx) + for lsid := range lsids { + lsid := lsid + eg.Go(func() error { + f, l, err := c.PeekLogStream(ctx, tpid, lsid) + if err != nil { + return err } - case <-ctx.Done(): - return ctx.Err() - } + mu.Lock() + fs = append(fs, f) + ls = append(ls, l) + mu.Unlock() + return nil + }) } - loader.begin.lsn = beginLSN - loader.logger.Debug("begin lsn", slog.Any("lsn", beginLSN)) - return nil + err = eg.Wait() + if err != nil { + return first, last, err + } + + hasLog := slices.ContainsFunc(fs, func(lsn varlogpb.LogSequenceNumber) bool { + return !lsn.GLSN.Invalid() + }) + if hasLog { + slices.SortFunc(fs, func(lsn1, lsn2 varlogpb.LogSequenceNumber) int { + return cmp.Compare(lsn1.GLSN, lsn2.GLSN) + }) + idx := slices.IndexFunc(fs, func(lsn varlogpb.LogSequenceNumber) bool { + return !lsn.GLSN.Invalid() + }) + first = fs[idx] + + slices.SortFunc(ls, func(lsn1, lsn2 varlogpb.LogSequenceNumber) int { + return cmp.Compare(lsn1.GLSN, lsn2.GLSN) + }) + last = ls[len(ls)-1] + } + + return first, last, nil } diff --git a/internal/benchmark/target.go b/internal/benchmark/target.go index ec2f516f9..9a49f4c6d 100644 --- a/internal/benchmark/target.go +++ b/internal/benchmark/target.go @@ -15,6 +15,7 @@ type Target struct { AppendersCount uint SubscribersCount uint PipelineSize int + SubscribeSize int } func (tgt Target) Valid() error {