diff --git a/tool/tctl/common/loadtest_command.go b/tool/tctl/common/loadtest_command.go index ac8d382938e9a..f1b285167deb5 100644 --- a/tool/tctl/common/loadtest_command.go +++ b/tool/tctl/common/loadtest_command.go @@ -21,6 +21,7 @@ package common import ( "context" "fmt" + "log/slog" "os" "os/signal" "runtime" @@ -31,6 +32,7 @@ import ( "github.com/alecthomas/kingpin/v2" "github.com/google/uuid" + "github.com/gravitational/teleport" "github.com/gravitational/trace" log "github.com/sirupsen/logrus" @@ -39,6 +41,7 @@ import ( "github.com/gravitational/teleport/lib/cache" "github.com/gravitational/teleport/lib/service/servicecfg" "github.com/gravitational/teleport/lib/services" + "github.com/gravitational/teleport/lib/utils" ) // LoadtestCommand implements the `tctl loadtest` family of commands. @@ -56,7 +59,9 @@ type LoadtestCommand struct { ttl time.Duration concurrency int - kind string + kind string + ops string + format string } // Initialize allows LoadtestCommand to plug itself into the CLI parser @@ -76,6 +81,8 @@ func (c *LoadtestCommand) Initialize(app *kingpin.Application, config *servicecf c.watch = loadtest.Command("watch", "Monitor event stream").Hidden() c.watch.Flag("kind", "Resource kind(s) to watch, e.g. --kind=node,user,role").StringVar(&c.kind) + c.watch.Flag("ops", "Operations to watch, e.g. --ops=put,del").Default("put,del").StringVar(&c.ops) + c.watch.Flag("format", "Output format").Default(teleport.Text).EnumVar(&c.format, teleport.Text, teleport.JSON) } // TryRun takes the CLI command as an argument (like "loadtest node-heartbeats") and executes it. @@ -224,6 +231,27 @@ func (c *LoadtestCommand) Watch(ctx context.Context, client *authclient.Client) }) } + ops := make(map[types.OpType]struct{}) + for _, op := range strings.Split(c.ops, ",") { + op = strings.TrimSpace(op) + if op == "" { + continue + } + + switch op { + case "put": + ops[types.OpPut] = struct{}{} + case "del": + ops[types.OpDelete] = struct{}{} + default: + return trace.BadParameter("unknown operation: %v", op) + } + } + + if len(ops) == 0 { + return trace.BadParameter("no operations specified") + } + var allowPartialSuccess bool if len(kinds) == 0 { // use auth watch kinds by default @@ -232,58 +260,95 @@ func (c *LoadtestCommand) Watch(ctx context.Context, client *authclient.Client) allowPartialSuccess = true } - watcher, err := client.NewWatcher(ctx, types.Watch{ - Name: "tctl-watch", - Kinds: kinds, - AllowPartialSuccess: allowPartialSuccess, - }) - if err != nil { - return trace.Wrap(err) - } - - defer watcher.Close() - - select { - case event := <-watcher.Events(): - if event.Type != types.OpInit { - return trace.BadParameter("expected init event, got %v instead", event.Type) - } - - var skinds []string - for _, k := range event.Resource.(types.WatchStatus).GetKinds() { - skinds = append(skinds, k.Kind) +Outer: + for { + slog.InfoContext(ctx, "starting watch", "kinds", kinds, "ops", ops) + watcher, err := client.NewWatcher(ctx, types.Watch{ + Name: "tctl-watch", + Kinds: kinds, + AllowPartialSuccess: allowPartialSuccess, + }) + if err != nil { + return trace.Wrap(err) } - fmt.Printf("INIT: %v\n", skinds) - case <-watcher.Done(): - return trace.Errorf("failed to get init event: %v", watcher.Error()) - } + defer watcher.Close() - for { select { case event := <-watcher.Events(): - switch event.Type { - case types.OpPut: - printEvent("PUT", event.Resource) - case types.OpDelete: - printEvent("DEL", event.Resource) - default: - return trace.BadParameter("expected put or del event, got %v instead", event.Type) + if event.Type != types.OpInit { + return trace.BadParameter("expected init event, got %v instead", event.Type) + } + + var skinds []string + for _, k := range event.Resource.(types.WatchStatus).GetKinds() { + skinds = append(skinds, k.Kind) } + + slog.InfoContext(ctx, "watcher initialized", "kinds", skinds) case <-watcher.Done(): if ctx.Err() != nil { - // canceled by caller return nil } - return trace.Errorf("watcher exited unexpectedly: %v", watcher.Error()) + slog.ErrorContext(ctx, "watcher failed while waiting for init, will retry", "error", watcher.Error()) + continue Outer + } + + Inner: + for { + select { + case event := <-watcher.Events(): + if _, ok := ops[event.Type]; !ok { + continue Inner + } + switch event.Type { + case types.OpPut: + if err := printEvent("PUT", event.Resource, c.format); err != nil { + return trace.Wrap(err) + } + case types.OpDelete: + if err := printEvent("DEL", event.Resource, c.format); err != nil { + return trace.Wrap(err) + } + default: + return trace.BadParameter("expected put or del event, got %v instead", event.Type) + } + case <-watcher.Done(): + if ctx.Err() != nil { + // canceled by caller + return nil + } + slog.ErrorContext(ctx, "watcher exited unexpectedly, will retry", "error", watcher.Error()) + continue Outer + } } } } -func printEvent(ekind string, rsc types.Resource) { - if sk := rsc.GetSubKind(); sk != "" { - fmt.Printf("%s: %s/%s/%s\n", ekind, rsc.GetKind(), sk, rsc.GetName()) - } else { - fmt.Printf("%s: %s/%s\n", ekind, rsc.GetKind(), rsc.GetName()) +func printEvent(ekind string, rsc types.Resource, format string) error { + ts := time.Now().Format(time.RFC3339) + var ln string + switch format { + case teleport.Text: + if sk := rsc.GetSubKind(); sk != "" { + ln = fmt.Sprintf("%s %s: %s/%s/%s", ts, ekind, rsc.GetKind(), sk, rsc.GetName()) + } else { + ln = fmt.Sprintf("%s %s: %s/%s", ts, ekind, rsc.GetKind(), rsc.GetName()) + } + case teleport.JSON: + b, err := utils.FastMarshal(map[string]any{ + "op": ekind, + "resource": rsc, + "rx_time": ts, + }) + if err != nil { + return trace.Wrap(err) + } + ln = string(b) + default: + return trace.BadParameter("unknown format: %v", format) } + + fmt.Println(ln) + return nil }