Skip to content

Commit

Permalink
Switch existing error reporting for tracing to use new helper (#40662) (
Browse files Browse the repository at this point in the history
#40767)

* Switch existing trace error reporting to use `apitracing.EndSpan`

* Add deferred trace error reports to tbot

* Optimize for @espadolini limited horizontal editor length
  • Loading branch information
strideynet authored Apr 23, 2024
1 parent dd10c27 commit 2288f1d
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 122 deletions.
15 changes: 5 additions & 10 deletions api/observability/tracing/ssh/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"encoding/json"
"fmt"

"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
oteltrace "go.opentelemetry.io/otel/trace"
"golang.org/x/crypto/ssh"
Expand All @@ -45,7 +44,7 @@ func NewTraceChannel(ch ssh.Channel, opts ...tracing.Option) *Channel {
// SendRequest sends a global request, and returns the
// reply. If tracing is enabled, the provided payload
// is wrapped in an Envelope to forward any tracing context.
func (c *Channel) SendRequest(ctx context.Context, name string, wantReply bool, payload []byte) (bool, error) {
func (c *Channel) SendRequest(ctx context.Context, name string, wantReply bool, payload []byte) (_ bool, err error) {
config := tracing.NewConfig(c.opts)
tracer := config.TracerProvider.Tracer(instrumentationName)

Expand All @@ -59,15 +58,11 @@ func (c *Channel) SendRequest(ctx context.Context, name string, wantReply bool,
semconv.RPCSystemKey.String("ssh"),
),
)
defer span.End()
defer func() { tracing.EndSpan(span, err) }()

ok, err := c.Channel.SendRequest(name, wantReply, wrapPayload(ctx, c.tracingSupported, config.TextMapPropagator, payload))
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
}

return ok, err
return c.Channel.SendRequest(
name, wantReply, wrapPayload(ctx, c.tracingSupported, config.TextMapPropagator, payload),
)
}

// NewChannel is a wrapper around ssh.NewChannel that allows an
Expand Down
49 changes: 16 additions & 33 deletions api/observability/tracing/ssh/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/gravitational/trace"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
oteltrace "go.opentelemetry.io/otel/trace"
"golang.org/x/crypto/ssh"
Expand Down Expand Up @@ -105,7 +104,9 @@ func (c *Client) DialContext(ctx context.Context, n, addr string) (net.Conn, err
// SendRequest sends a global request, and returns the
// reply. If tracing is enabled, the provided payload
// is wrapped in an Envelope to forward any tracing context.
func (c *Client) SendRequest(ctx context.Context, name string, wantReply bool, payload []byte) (bool, []byte, error) {
func (c *Client) SendRequest(
ctx context.Context, name string, wantReply bool, payload []byte,
) (_ bool, _ []byte, err error) {
config := tracing.NewConfig(c.opts)
tracer := config.TracerProvider.Tracer(instrumentationName)

Expand All @@ -123,21 +124,19 @@ func (c *Client) SendRequest(ctx context.Context, name string, wantReply bool, p
)...,
),
)
defer span.End()
defer func() { tracing.EndSpan(span, err) }()

ok, resp, err := c.Client.SendRequest(name, wantReply, wrapPayload(ctx, c.capability, config.TextMapPropagator, payload))
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
}

return ok, resp, err
return c.Client.SendRequest(
name, wantReply, wrapPayload(ctx, c.capability, config.TextMapPropagator, payload),
)
}

// OpenChannel tries to open a channel. If tracing is enabled,
// the provided payload is wrapped in an Envelope to forward
// any tracing context.
func (c *Client) OpenChannel(ctx context.Context, name string, data []byte) (*Channel, <-chan *ssh.Request, error) {
func (c *Client) OpenChannel(
ctx context.Context, name string, data []byte,
) (_ *Channel, _ <-chan *ssh.Request, err error) {
config := tracing.NewConfig(c.opts)
tracer := config.TracerProvider.Tracer(instrumentationName)
ctx, span := tracer.Start(
Expand All @@ -153,14 +152,9 @@ func (c *Client) OpenChannel(ctx context.Context, name string, data []byte) (*Ch
)...,
),
)
defer span.End()
defer func() { tracing.EndSpan(span, err) }()

ch, reqs, err := c.Client.OpenChannel(name, wrapPayload(ctx, c.capability, config.TextMapPropagator, data))
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
}

return &Channel{
Channel: ch,
opts: c.opts,
Expand Down Expand Up @@ -287,7 +281,7 @@ func (c *clientWrapper) nextContext(name string) context.Context {
// OpenChannel tries to open a channel. If tracing is enabled,
// the provided payload is wrapped in an Envelope to forward
// any tracing context.
func (c *clientWrapper) OpenChannel(name string, data []byte) (ssh.Channel, <-chan *ssh.Request, error) {
func (c *clientWrapper) OpenChannel(name string, data []byte) (_ ssh.Channel, _ <-chan *ssh.Request, err error) {
config := tracing.NewConfig(c.opts)
tracer := config.TracerProvider.Tracer(instrumentationName)
ctx, span := tracer.Start(
Expand All @@ -303,14 +297,9 @@ func (c *clientWrapper) OpenChannel(name string, data []byte) (ssh.Channel, <-ch
)...,
),
)
defer span.End()
defer func() { tracing.EndSpan(span, err) }()

ch, reqs, err := c.Conn.OpenChannel(name, wrapPayload(ctx, c.capability, config.TextMapPropagator, data))
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
}

return channelWrapper{
Channel: ch,
manager: c,
Expand All @@ -331,7 +320,7 @@ type channelWrapper struct {
// It is the callers' responsibility to ensure that addContext is
// called with the appropriate context.Context prior to any
// requests being sent along the channel.
func (c channelWrapper) SendRequest(name string, wantReply bool, payload []byte) (bool, error) {
func (c channelWrapper) SendRequest(name string, wantReply bool, payload []byte) (_ bool, err error) {
config := tracing.NewConfig(c.manager.opts)
ctx, span := config.TracerProvider.Tracer(instrumentationName).Start(
c.manager.nextContext(name),
Expand All @@ -344,13 +333,7 @@ func (c channelWrapper) SendRequest(name string, wantReply bool, payload []byte)
semconv.RPCSystemKey.String("ssh"),
),
)
defer span.End()

ok, err := c.Channel.SendRequest(name, wantReply, wrapPayload(ctx, c.manager.capability, config.TextMapPropagator, payload))
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
}
defer func() { tracing.EndSpan(span, err) }()

return ok, err
return c.Channel.SendRequest(name, wantReply, wrapPayload(ctx, c.manager.capability, config.TextMapPropagator, payload))
}
6 changes: 2 additions & 4 deletions api/observability/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,8 @@ func NewTracer(name string) oteltrace.Tracer {
// Example usage:
//
// func myFunc() (err error) {
// ctx, span := tracer.NewSpan(ctx, "myFunc")
// defer func() {
// tracing.EndSpan(span, err)
// }()
// ctx, span := tracer.Start(ctx, "myFunc")
// defer func() { tracing.EndSpan(span, err) }()
// ...
// }
func EndSpan(span oteltrace.Span, err error) {
Expand Down
8 changes: 1 addition & 7 deletions lib/auth/migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,7 @@ func Apply(ctx context.Context, b backend.Backend, opts ...func(c *applyConfig))
}

ctx, span := tracer.Start(ctx, "migrations/Apply")
defer func() {
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
}
span.End()
}()
defer func() { tracing.EndSpan(span, err) }()

slices.SortFunc(cfg.migrations, func(a, b migration) int {
return cmp.Compare(a.Version(), b.Version())
Expand Down
12 changes: 3 additions & 9 deletions lib/auth/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,7 @@ type HostCredentials func(context.Context, string, bool, types.RegisterUsingToke
// as well as a method for the node to validate the auth server.
func Register(ctx context.Context, params RegisterParams) (certs *proto.Certs, err error) {
ctx, span := tracer.Start(ctx, "Register")
defer func() {
tracing.EndSpan(span, err)
}()
defer func() { tracing.EndSpan(span, err) }()

if err := params.checkAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
Expand Down Expand Up @@ -324,9 +322,7 @@ func registerThroughProxy(
params RegisterParams,
) (certs *proto.Certs, err error) {
ctx, span := tracer.Start(ctx, "registerThroughProxy")
defer func() {
tracing.EndSpan(span, err)
}()
defer func() { tracing.EndSpan(span, err) }()

switch params.JoinMethod {
case types.JoinMethodIAM, types.JoinMethodAzure:
Expand Down Expand Up @@ -387,9 +383,7 @@ func registerThroughAuth(
ctx context.Context, token string, params RegisterParams,
) (certs *proto.Certs, err error) {
ctx, span := tracer.Start(ctx, "registerThroughAuth")
defer func() {
tracing.EndSpan(span, err)
}()
defer func() { tracing.EndSpan(span, err) }()

var client *Client
// Build a client for the Auth Server with different certificate validation
Expand Down
27 changes: 4 additions & 23 deletions lib/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
oteltrace "go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"

Expand All @@ -37,6 +36,7 @@ import (
apidefaults "github.com/gravitational/teleport/api/defaults"
kubewaitingcontainerpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/kubewaitingcontainer/v1"
"github.com/gravitational/teleport/api/internalutils/stream"
apitracing "github.com/gravitational/teleport/api/observability/tracing"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/accesslist"
"github.com/gravitational/teleport/api/types/discoveryconfig"
Expand Down Expand Up @@ -1519,14 +1519,7 @@ func tracedApplyFn(parent oteltrace.Span, tracer oteltrace.Tracer, kind resource
oteltrace.ContextWithSpan(ctx, parent),
fmt.Sprintf("cache/apply/%s", kind.String()),
)

defer func() {
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
span.End()
}()
defer func() { apitracing.EndSpan(span, err) }()

return f(ctx)
}
Expand Down Expand Up @@ -1558,13 +1551,7 @@ func isControlPlane(target string) bool {

func (c *Cache) fetch(ctx context.Context, confirmedKinds map[resourceKind]types.WatchKind) (fn applyFn, err error) {
ctx, fetchSpan := c.Tracer.Start(ctx, "cache/fetch", oteltrace.WithAttributes(attribute.String("target", c.target)))
defer func() {
if err != nil {
fetchSpan.RecordError(err)
fetchSpan.SetStatus(codes.Error, err.Error())
}
fetchSpan.End()
}()
defer func() { apitracing.EndSpan(fetchSpan, err) }()

g, ctx := errgroup.WithContext(ctx)
g.SetLimit(fetchLimit(c.target))
Expand All @@ -1583,13 +1570,7 @@ func (c *Cache) fetch(ctx context.Context, confirmedKinds map[resourceKind]types
attribute.String("target", c.target),
),
)
defer func() {
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
span.End()
}()
defer func() { apitracing.EndSpan(span, err) }()

_, cacheOK := confirmedKinds[resourceKind{kind: kind.kind, subkind: kind.subkind}]
applyfn, err := collection.fetch(ctx, cacheOK)
Expand Down
21 changes: 2 additions & 19 deletions lib/client/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/gravitational/trace"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
oteltrace "go.opentelemetry.io/otel/trace"
"golang.org/x/crypto/ssh"
"golang.org/x/crypto/ssh/agent"
Expand Down Expand Up @@ -1608,15 +1607,7 @@ func (tc *TeleportClient) ConnectToNode(ctx context.Context, clt *ClusterClient,
attribute.String("node", node),
),
)
defer func() {
if err != nil {
// The error is unwrapped here so that the error reported is any underlying
// error and not a trace.TraceErr.
span.RecordError(trace.Unwrap(err))
span.SetStatus(codes.Error, err.Error())
}
span.End()
}()
defer func() { apitracing.EndSpan(span, err) }()

// if per-session mfa is required, perform the mfa ceremony to get
// new certificates and use them to connect.
Expand Down Expand Up @@ -2895,15 +2886,7 @@ func (tc *TeleportClient) ConnectToCluster(ctx context.Context) (_ *ClusterClien
attribute.String("proxy_ssh", tc.Config.SSHProxyAddr),
),
)
defer func() {
if err != nil {
// The error is unwrapped here so that the error reported is any underlying
// error and not a trace.TraceErr.
span.RecordError(trace.Unwrap(err))
span.SetStatus(codes.Error, err.Error())
}
span.End()
}()
defer func() { apitracing.EndSpan(span, err) }()

cfg, err := tc.generateClientConfig(ctx)
if err != nil {
Expand Down
13 changes: 2 additions & 11 deletions lib/proxy/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
oteltrace "go.opentelemetry.io/otel/trace"
"golang.org/x/crypto/ssh"

Expand Down Expand Up @@ -222,10 +221,8 @@ func (r *Router) DialHost(ctx context.Context, clientSrcAddr, clientDstAddr net.
defer func() {
if err != nil {
failedConnectingToNode.Inc()
span.RecordError(trace.Unwrap(err))
span.SetStatus(codes.Error, err.Error())
}
span.End()
tracing.EndSpan(span, err)
}()

site := r.localSite
Expand Down Expand Up @@ -556,13 +553,7 @@ func (r *Router) DialSite(ctx context.Context, clusterName string, clientSrcAddr
attribute.String("cluster", clusterName),
),
)
defer func() {
if err != nil {
span.RecordError(trace.Unwrap(err))
span.SetStatus(codes.Error, err.Error())
}
span.End()
}()
defer func() { tracing.EndSpan(span, err) }()

// default to local cluster if one wasn't provided
if clusterName == "" {
Expand Down
13 changes: 7 additions & 6 deletions lib/tbot/tbot.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/client/webclient"
"github.com/gravitational/teleport/api/metadata"
apitracing "github.com/gravitational/teleport/api/observability/tracing"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/auth"
"github.com/gravitational/teleport/lib/auth/authclient"
Expand Down Expand Up @@ -111,9 +112,9 @@ func (b *Bot) BotIdentity() *identity.Identity {
return b.botIdentitySvc.GetIdentity()
}

func (b *Bot) Run(ctx context.Context) error {
func (b *Bot) Run(ctx context.Context) (err error) {
ctx, span := tracer.Start(ctx, "Bot/Run")
defer span.End()
defer func() { apitracing.EndSpan(span, err) }()

if err := b.markStarted(); err != nil {
return trace.Wrap(err)
Expand Down Expand Up @@ -299,9 +300,9 @@ func (b *Bot) Run(ctx context.Context) error {
// preRunChecks returns an unlock function which must be deferred.
// It performs any initial validation and locks the bot's storage before any
// more expensive initialization is performed.
func (b *Bot) preRunChecks(ctx context.Context) (func() error, error) {
func (b *Bot) preRunChecks(ctx context.Context) (_ func() error, err error) {
ctx, span := tracer.Start(ctx, "Bot/preRunChecks")
defer span.End()
defer func() { apitracing.EndSpan(span, err) }()

switch _, addrKind := b.cfg.Address(); addrKind {
case config.AddressKindUnspecified:
Expand Down Expand Up @@ -423,9 +424,9 @@ func clientForFacade(
cfg *config.BotConfig,
facade *identity.Facade,
resolver reversetunnelclient.Resolver,
) (*auth.Client, error) {
) (_ *auth.Client, err error) {
ctx, span := tracer.Start(ctx, "clientForFacade")
defer span.End()
defer func() { apitracing.EndSpan(span, err) }()

tlsConfig, err := facade.TLSConfig()
if err != nil {
Expand Down

0 comments on commit 2288f1d

Please sign in to comment.