Skip to content

Commit

Permalink
improve loadtest watch cmd
Browse files Browse the repository at this point in the history
  • Loading branch information
fspmarshall authored and github-actions committed Aug 13, 2024
1 parent 7646449 commit a0bd270
Showing 1 changed file with 105 additions and 40 deletions.
145 changes: 105 additions & 40 deletions tool/tctl/common/loadtest_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package common
import (
"context"
"fmt"
"log/slog"
"os"
"os/signal"
"runtime"
Expand All @@ -34,11 +35,13 @@ import (
"github.com/gravitational/trace"
log "github.com/sirupsen/logrus"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/auth/authclient"
"github.com/gravitational/teleport/lib/cache"
"github.com/gravitational/teleport/lib/service/servicecfg"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/utils"
)

// LoadtestCommand implements the `tctl loadtest` family of commands.
Expand All @@ -56,7 +59,9 @@ type LoadtestCommand struct {
ttl time.Duration
concurrency int

kind string
kind string
ops string
format string
}

// Initialize allows LoadtestCommand to plug itself into the CLI parser
Expand All @@ -76,6 +81,8 @@ func (c *LoadtestCommand) Initialize(app *kingpin.Application, config *servicecf

c.watch = loadtest.Command("watch", "Monitor event stream").Hidden()
c.watch.Flag("kind", "Resource kind(s) to watch, e.g. --kind=node,user,role").StringVar(&c.kind)
c.watch.Flag("ops", "Operations to watch, e.g. --ops=put,del").Default("put,del").StringVar(&c.ops)
c.watch.Flag("format", "Output format").Default(teleport.Text).EnumVar(&c.format, teleport.Text, teleport.JSON)
}

// TryRun takes the CLI command as an argument (like "loadtest node-heartbeats") and executes it.
Expand Down Expand Up @@ -224,6 +231,27 @@ func (c *LoadtestCommand) Watch(ctx context.Context, client *authclient.Client)
})
}

ops := make(map[types.OpType]struct{})
for _, op := range strings.Split(c.ops, ",") {
op = strings.TrimSpace(op)
if op == "" {
continue
}

switch op {
case "put":
ops[types.OpPut] = struct{}{}
case "del":
ops[types.OpDelete] = struct{}{}
default:
return trace.BadParameter("unknown operation: %v", op)
}
}

if len(ops) == 0 {
return trace.BadParameter("no operations specified")
}

var allowPartialSuccess bool
if len(kinds) == 0 {
// use auth watch kinds by default
Expand All @@ -232,58 +260,95 @@ func (c *LoadtestCommand) Watch(ctx context.Context, client *authclient.Client)
allowPartialSuccess = true
}

watcher, err := client.NewWatcher(ctx, types.Watch{
Name: "tctl-watch",
Kinds: kinds,
AllowPartialSuccess: allowPartialSuccess,
})
if err != nil {
return trace.Wrap(err)
}

defer watcher.Close()

select {
case event := <-watcher.Events():
if event.Type != types.OpInit {
return trace.BadParameter("expected init event, got %v instead", event.Type)
}

var skinds []string
for _, k := range event.Resource.(types.WatchStatus).GetKinds() {
skinds = append(skinds, k.Kind)
Outer:
for {
slog.InfoContext(ctx, "starting watch", "kinds", kinds, "ops", ops)
watcher, err := client.NewWatcher(ctx, types.Watch{
Name: "tctl-watch",
Kinds: kinds,
AllowPartialSuccess: allowPartialSuccess,
})
if err != nil {
return trace.Wrap(err)
}

fmt.Printf("INIT: %v\n", skinds)
case <-watcher.Done():
return trace.Errorf("failed to get init event: %v", watcher.Error())
}
defer watcher.Close()

for {
select {
case event := <-watcher.Events():
switch event.Type {
case types.OpPut:
printEvent("PUT", event.Resource)
case types.OpDelete:
printEvent("DEL", event.Resource)
default:
return trace.BadParameter("expected put or del event, got %v instead", event.Type)
if event.Type != types.OpInit {
return trace.BadParameter("expected init event, got %v instead", event.Type)
}

var skinds []string
for _, k := range event.Resource.(types.WatchStatus).GetKinds() {
skinds = append(skinds, k.Kind)
}

slog.InfoContext(ctx, "watcher initialized", "kinds", skinds)
case <-watcher.Done():
if ctx.Err() != nil {
// canceled by caller
return nil
}
return trace.Errorf("watcher exited unexpectedly: %v", watcher.Error())
slog.ErrorContext(ctx, "watcher failed while waiting for init, will retry", "error", watcher.Error())
continue Outer
}

Inner:
for {
select {
case event := <-watcher.Events():
if _, ok := ops[event.Type]; !ok {
continue Inner
}
switch event.Type {
case types.OpPut:
if err := printEvent("PUT", event.Resource, c.format); err != nil {
return trace.Wrap(err)
}
case types.OpDelete:
if err := printEvent("DEL", event.Resource, c.format); err != nil {
return trace.Wrap(err)
}
default:
return trace.BadParameter("expected put or del event, got %v instead", event.Type)
}
case <-watcher.Done():
if ctx.Err() != nil {
// canceled by caller
return nil
}
slog.ErrorContext(ctx, "watcher exited unexpectedly, will retry", "error", watcher.Error())
continue Outer
}
}
}
}

func printEvent(ekind string, rsc types.Resource) {
if sk := rsc.GetSubKind(); sk != "" {
fmt.Printf("%s: %s/%s/%s\n", ekind, rsc.GetKind(), sk, rsc.GetName())
} else {
fmt.Printf("%s: %s/%s\n", ekind, rsc.GetKind(), rsc.GetName())
func printEvent(ekind string, rsc types.Resource, format string) error {
ts := time.Now().Format(time.RFC3339)
var ln string
switch format {
case teleport.Text:
if sk := rsc.GetSubKind(); sk != "" {
ln = fmt.Sprintf("%s %s: %s/%s/%s", ts, ekind, rsc.GetKind(), sk, rsc.GetName())
} else {
ln = fmt.Sprintf("%s %s: %s/%s", ts, ekind, rsc.GetKind(), rsc.GetName())
}
case teleport.JSON:
b, err := utils.FastMarshal(map[string]any{
"op": ekind,
"resource": rsc,
"rx_time": ts,
})
if err != nil {
return trace.Wrap(err)
}
ln = string(b)
default:
return trace.BadParameter("unknown format: %v", format)
}

fmt.Println(ln)
return nil
}

0 comments on commit a0bd270

Please sign in to comment.