Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v15] improve loadtest watch cmd #45436

Merged
merged 1 commit into from
Aug 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 105 additions & 40 deletions tool/tctl/common/loadtest_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package common
import (
"context"
"fmt"
"log/slog"
"os"
"os/signal"
"runtime"
Expand All @@ -34,11 +35,13 @@ import (
"github.com/gravitational/trace"
log "github.com/sirupsen/logrus"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/auth/authclient"
"github.com/gravitational/teleport/lib/cache"
"github.com/gravitational/teleport/lib/service/servicecfg"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/utils"
)

// LoadtestCommand implements the `tctl loadtest` family of commands.
Expand All @@ -56,7 +59,9 @@ type LoadtestCommand struct {
ttl time.Duration
concurrency int

kind string
kind string
ops string
format string
}

// Initialize allows LoadtestCommand to plug itself into the CLI parser
Expand All @@ -76,6 +81,8 @@ func (c *LoadtestCommand) Initialize(app *kingpin.Application, config *servicecf

c.watch = loadtest.Command("watch", "Monitor event stream").Hidden()
c.watch.Flag("kind", "Resource kind(s) to watch, e.g. --kind=node,user,role").StringVar(&c.kind)
c.watch.Flag("ops", "Operations to watch, e.g. --ops=put,del").Default("put,del").StringVar(&c.ops)
c.watch.Flag("format", "Output format").Default(teleport.Text).EnumVar(&c.format, teleport.Text, teleport.JSON)
}

// TryRun takes the CLI command as an argument (like "loadtest node-heartbeats") and executes it.
Expand Down Expand Up @@ -224,6 +231,27 @@ func (c *LoadtestCommand) Watch(ctx context.Context, client *authclient.Client)
})
}

ops := make(map[types.OpType]struct{})
for _, op := range strings.Split(c.ops, ",") {
op = strings.TrimSpace(op)
if op == "" {
continue
}

switch op {
case "put":
ops[types.OpPut] = struct{}{}
case "del":
ops[types.OpDelete] = struct{}{}
default:
return trace.BadParameter("unknown operation: %v", op)
}
}

if len(ops) == 0 {
return trace.BadParameter("no operations specified")
}

var allowPartialSuccess bool
if len(kinds) == 0 {
// use auth watch kinds by default
Expand All @@ -232,58 +260,95 @@ func (c *LoadtestCommand) Watch(ctx context.Context, client *authclient.Client)
allowPartialSuccess = true
}

watcher, err := client.NewWatcher(ctx, types.Watch{
Name: "tctl-watch",
Kinds: kinds,
AllowPartialSuccess: allowPartialSuccess,
})
if err != nil {
return trace.Wrap(err)
}

defer watcher.Close()

select {
case event := <-watcher.Events():
if event.Type != types.OpInit {
return trace.BadParameter("expected init event, got %v instead", event.Type)
}

var skinds []string
for _, k := range event.Resource.(types.WatchStatus).GetKinds() {
skinds = append(skinds, k.Kind)
Outer:
for {
slog.InfoContext(ctx, "starting watch", "kinds", kinds, "ops", ops)
watcher, err := client.NewWatcher(ctx, types.Watch{
Name: "tctl-watch",
Kinds: kinds,
AllowPartialSuccess: allowPartialSuccess,
})
if err != nil {
return trace.Wrap(err)
}

fmt.Printf("INIT: %v\n", skinds)
case <-watcher.Done():
return trace.Errorf("failed to get init event: %v", watcher.Error())
}
defer watcher.Close()

for {
select {
case event := <-watcher.Events():
switch event.Type {
case types.OpPut:
printEvent("PUT", event.Resource)
case types.OpDelete:
printEvent("DEL", event.Resource)
default:
return trace.BadParameter("expected put or del event, got %v instead", event.Type)
if event.Type != types.OpInit {
return trace.BadParameter("expected init event, got %v instead", event.Type)
}

var skinds []string
for _, k := range event.Resource.(types.WatchStatus).GetKinds() {
skinds = append(skinds, k.Kind)
}

slog.InfoContext(ctx, "watcher initialized", "kinds", skinds)
case <-watcher.Done():
if ctx.Err() != nil {
// canceled by caller
return nil
}
return trace.Errorf("watcher exited unexpectedly: %v", watcher.Error())
slog.ErrorContext(ctx, "watcher failed while waiting for init, will retry", "error", watcher.Error())
continue Outer
}

Inner:
for {
select {
case event := <-watcher.Events():
if _, ok := ops[event.Type]; !ok {
continue Inner
}
switch event.Type {
case types.OpPut:
if err := printEvent("PUT", event.Resource, c.format); err != nil {
return trace.Wrap(err)
}
case types.OpDelete:
if err := printEvent("DEL", event.Resource, c.format); err != nil {
return trace.Wrap(err)
}
default:
return trace.BadParameter("expected put or del event, got %v instead", event.Type)
}
case <-watcher.Done():
if ctx.Err() != nil {
// canceled by caller
return nil
}
slog.ErrorContext(ctx, "watcher exited unexpectedly, will retry", "error", watcher.Error())
continue Outer
}
}
}
}

func printEvent(ekind string, rsc types.Resource) {
if sk := rsc.GetSubKind(); sk != "" {
fmt.Printf("%s: %s/%s/%s\n", ekind, rsc.GetKind(), sk, rsc.GetName())
} else {
fmt.Printf("%s: %s/%s\n", ekind, rsc.GetKind(), rsc.GetName())
func printEvent(ekind string, rsc types.Resource, format string) error {
ts := time.Now().Format(time.RFC3339)
var ln string
switch format {
case teleport.Text:
if sk := rsc.GetSubKind(); sk != "" {
ln = fmt.Sprintf("%s %s: %s/%s/%s", ts, ekind, rsc.GetKind(), sk, rsc.GetName())
} else {
ln = fmt.Sprintf("%s %s: %s/%s", ts, ekind, rsc.GetKind(), rsc.GetName())
}
case teleport.JSON:
b, err := utils.FastMarshal(map[string]any{
"op": ekind,
"resource": rsc,
"rx_time": ts,
})
if err != nil {
return trace.Wrap(err)
}
ln = string(b)
default:
return trace.BadParameter("unknown format: %v", format)
}

fmt.Println(ln)
return nil
}
Loading