diff --git a/lib/kube/grpc/grpc.go b/lib/kube/grpc/grpc.go
index 67f17fc4d5079..bbd982cd12da0 100644
--- a/lib/kube/grpc/grpc.go
+++ b/lib/kube/grpc/grpc.go
@@ -21,11 +21,11 @@ package kubev1
 import (
 	"context"
 	"errors"
+	"log/slog"
 	"slices"

 	"github.com/gravitational/trace"
 	"github.com/gravitational/trace/trail"
-	"github.com/sirupsen/logrus"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"

@@ -85,7 +85,7 @@ type Config struct {
 	// Authz authenticates user.
 	Authz authz.Authorizer
 	// Log is the logger function.
-	Log logrus.FieldLogger
+	Log *slog.Logger
 	// Emitter is used to emit audit events.
 	Emitter apievents.Emitter
 	// Component name to include in log output.
@@ -139,9 +139,9 @@ func (c *Config) CheckAndSetDefaults() error {
 		c.Component = "kube.grpc"
 	}
 	if c.Log == nil {
-		c.Log = logrus.New()
+		c.Log = slog.Default()
 	}
-	c.Log = c.Log.WithFields(logrus.Fields{teleport.ComponentKey: c.Component})
+	c.Log = c.Log.With(teleport.ComponentKey, c.Component)
 	return nil
 }
diff --git a/lib/kube/kubeconfig/kubeconfig.go b/lib/kube/kubeconfig/kubeconfig.go
index 30e587da834d1..dc9f8d92833f1 100644
--- a/lib/kube/kubeconfig/kubeconfig.go
+++ b/lib/kube/kubeconfig/kubeconfig.go
@@ -21,13 +21,13 @@ package kubeconfig
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"os"
 	"path/filepath"
 	"strings"

 	"github.com/gravitational/trace"
-	"github.com/sirupsen/logrus"
 	"golang.org/x/exp/maps"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/tools/clientcmd"
@@ -36,11 +36,10 @@ import (
 	"github.com/gravitational/teleport"
 	"github.com/gravitational/teleport/lib/client"
 	"github.com/gravitational/teleport/lib/utils"
+	logutils "github.com/gravitational/teleport/lib/utils/log"
 )

-var log = logrus.WithFields(logrus.Fields{
-	teleport.ComponentKey: teleport.ComponentKubeClient,
-})
+var log = logutils.NewPackageLogger(teleport.ComponentKey, teleport.ComponentKubeClient)

 const (
 	// teleportKubeClusterNameExtension is the name of the extension that
@@ -268,7 +267,7 @@ func UpdateConfig(path string, v Values, storeAllCAs bool, fs ConfigFS) error {
 		} else if !trace.IsBadParameter(err) {
 			return trace.Wrap(err)
 		}
-		log.WithError(err).Warn("Kubernetes integration is not supported when logging in with a hardware private key.")
+		log.WarnContext(context.Background(), "Kubernetes integration is not supported when logging in with a hardware private key", "error", err)
 	}

 	return SaveConfig(path, *config, fs)
@@ -493,7 +492,7 @@ func PathFromEnv() string {
 	var configPath string
 	if len(parts) > 0 {
 		configPath = parts[0]
-		log.Debugf("Using kubeconfig from environment: %q.", configPath)
+		log.DebugContext(context.Background(), "Using kubeconfig from environment", "config_path", configPath)
 	}

 	return configPath
diff --git a/lib/kube/proxy/auth.go b/lib/kube/proxy/auth.go
index 16ee58685e1a2..61842aca77bf0 100644
--- a/lib/kube/proxy/auth.go
+++ b/lib/kube/proxy/auth.go
@@ -23,12 +23,12 @@ import (
 	"context"
 	"crypto/tls"
 	"fmt"
+	"log/slog"
 	"net"
 	"net/http"
 	"net/url"

 	"github.com/gravitational/trace"
-	"github.com/sirupsen/logrus"
 	authzapi "k8s.io/api/authorization/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	utilnet "k8s.io/apimachinery/pkg/util/net"
@@ -77,11 +77,11 @@ func (f *Forwarder) getKubeDetails(ctx context.Context) error {
 	kubeClusterName := f.cfg.KubeClusterName
 	tpClusterName := f.cfg.ClusterName

-	f.log.
-		WithField("kubeconfigPath", kubeconfigPath).
-		WithField("kubeClusterName", kubeClusterName).
-		WithField("serviceType", serviceType).
- Debug("Reading Kubernetes details.") + f.log.DebugContext(ctx, "Reading Kubernetes details", + "kubeconfig_path", kubeconfigPath, + "kube_cluster_name", kubeClusterName, + "service_type", serviceType, + ) // Proxy service should never have creds, forwards to kube service if serviceType == ProxyService { @@ -100,7 +100,7 @@ func (f *Forwarder) getKubeDetails(ctx context.Context) error { case KubeService: return trace.BadParameter("no Kubernetes credentials found; Kubernetes_service requires either a valid kubeconfig_file or to run inside of a Kubernetes pod") case LegacyProxyService: - f.log.Debugf("Could not load Kubernetes credentials. This proxy will still handle Kubernetes requests for trusted teleport clusters or Kubernetes nodes in this teleport cluster") + f.log.DebugContext(ctx, "Could not load Kubernetes credentials. This proxy will still handle Kubernetes requests for trusted teleport clusters or Kubernetes nodes in this teleport cluster") } return nil } @@ -124,14 +124,20 @@ func (f *Forwarder) getKubeDetails(ctx context.Context) error { for cluster, clientCfg := range cfg.Contexts { clusterCreds, err := extractKubeCreds(ctx, serviceType, cluster, clientCfg, f.log, f.cfg.CheckImpersonationPermissions) if err != nil { - f.log.WithError(err).Warnf("failed to load credentials for cluster %q.", cluster) + f.log.WarnContext(ctx, "failed to load credentials for cluster", + "cluster", cluster, + "error", err, + ) continue } kubeCluster, err := types.NewKubernetesClusterV3(types.Metadata{ Name: cluster, }, types.KubernetesClusterSpecV3{}) if err != nil { - f.log.WithError(err).Warnf("failed to create KubernetesClusterV3 from credentials for cluster %q.", cluster) + f.log.WarnContext(ctx, "failed to create KubernetesClusterV3 from credentials for cluster", + "cluster", cluster, + "error", err, + ) continue } @@ -139,13 +145,16 @@ func (f *Forwarder) getKubeDetails(ctx context.Context) error { clusterDetailsConfig{ cluster: kubeCluster, kubeCreds: clusterCreds, - log: f.log.WithField("cluster", kubeCluster.GetName()), + log: f.log.With("cluster", kubeCluster.GetName()), checker: f.cfg.CheckImpersonationPermissions, component: serviceType, clock: f.cfg.Clock, }) if err != nil { - f.log.WithError(err).Warnf("Failed to create cluster details for cluster %q.", cluster) + f.log.WarnContext(ctx, "Failed to create cluster details for cluster", + "cluster", cluster, + "error", err, + ) return trace.Wrap(err) } f.clusterDetails[cluster] = details @@ -153,10 +162,10 @@ func (f *Forwarder) getKubeDetails(ctx context.Context) error { return nil } -func extractKubeCreds(ctx context.Context, component string, cluster string, clientCfg *rest.Config, log logrus.FieldLogger, checkPermissions servicecfg.ImpersonationPermissionsChecker) (*staticKubeCreds, error) { - log = log.WithField("cluster", cluster) +func extractKubeCreds(ctx context.Context, component string, cluster string, clientCfg *rest.Config, log *slog.Logger, checkPermissions servicecfg.ImpersonationPermissionsChecker) (*staticKubeCreds, error) { + log = log.With("cluster", cluster) - log.Debug("Checking Kubernetes impersonation permissions.") + log.DebugContext(ctx, "Checking Kubernetes impersonation permissions") client, err := kubernetes.NewForConfig(clientCfg) if err != nil { return nil, trace.Wrap(err, "failed to generate Kubernetes client for cluster %q", cluster) @@ -165,9 +174,11 @@ func extractKubeCreds(ctx context.Context, component string, cluster string, cli // For each loaded cluster, check impersonation permissions. 
This // check only logs when permissions are not configured, but does not fail startup. if err := checkPermissions(ctx, cluster, client.AuthorizationV1().SelfSubjectAccessReviews()); err != nil { - log.WithError(err).Warning("Failed to test the necessary Kubernetes permissions. The target Kubernetes cluster may be down or have misconfigured RBAC. This teleport instance will still handle Kubernetes requests towards this Kubernetes cluster.") + log.WarnContext(ctx, "Failed to test the necessary Kubernetes permissions. The target Kubernetes cluster may be down or have misconfigured RBAC. This teleport instance will still handle Kubernetes requests towards this Kubernetes cluster.", + "error", err, + ) } else { - log.Debug("Have all necessary Kubernetes impersonation permissions.") + log.DebugContext(ctx, "Have all necessary Kubernetes impersonation permissions") } targetAddr, err := parseKubeHost(clientCfg.Host) @@ -192,7 +203,7 @@ func extractKubeCreds(ctx context.Context, component string, cluster string, cli return nil, trace.Wrap(err, "failed to generate transport from kubeconfig: %v", err) } - log.Debug("Initialized Kubernetes credentials") + log.DebugContext(ctx, "Initialized Kubernetes credentials") return &staticKubeCreds{ tlsConfig: tlsConfig, transportConfig: transportConfig, diff --git a/lib/kube/proxy/auth_test.go b/lib/kube/proxy/auth_test.go index 1263a0e8b9ad2..9d8269297f8f1 100644 --- a/lib/kube/proxy/auth_test.go +++ b/lib/kube/proxy/auth_test.go @@ -140,7 +140,6 @@ func TestGetKubeCreds(t *testing.T) { rbacSupportedTypes[allowedResourcesKey{apiGroup: "resources.teleport.dev", resourceKind: "teleportroles"}] = utils.KubeCustomResource rbacSupportedTypes[allowedResourcesKey{apiGroup: "resources.teleport.dev", resourceKind: "teleportroles/status"}] = utils.KubeCustomResource - logger := utils.NewLoggerForTests() ctx := context.TODO() const teleClusterName = "teleport-cluster" dir := t.TempDir() @@ -351,7 +350,7 @@ current-context: foo CheckImpersonationPermissions: tt.impersonationCheck, Clock: clockwork.NewFakeClock(), }, - log: logger, + log: utils.NewSlogLoggerForTests(), } err := fwd.getKubeDetails(ctx) tt.assertErr(t, err) diff --git a/lib/kube/proxy/cluster_details.go b/lib/kube/proxy/cluster_details.go index c949494ac982d..1a66ce0562978 100644 --- a/lib/kube/proxy/cluster_details.go +++ b/lib/kube/proxy/cluster_details.go @@ -21,6 +21,7 @@ package proxy import ( "context" "encoding/base64" + "log/slog" "strings" "sync" "time" @@ -29,7 +30,6 @@ import ( "github.com/aws/aws-sdk-go/service/eks" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" - "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/version" @@ -91,7 +91,7 @@ type clusterDetailsConfig struct { // cluster is the cluster to create a proxied cluster for. cluster types.KubeCluster // log is the logger to use. - log *logrus.Entry + log *slog.Logger // checker is the permissions checker to use. checker servicecfg.ImpersonationPermissionsChecker // resourceMatchers is the list of resource matchers to match the cluster against @@ -135,7 +135,7 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe // Create the codec factory and the list of supported types for RBAC. codecFactory, rbacSupportedTypes, gvkSupportedRes, err := newClusterSchemaBuilder(cfg.log, creds.getKubeClient()) if err != nil { - cfg.log.WithError(err).Warn("Failed to create cluster schema. 
Possibly the cluster is offline.") + cfg.log.WarnContext(ctx, "Failed to create cluster schema, the cluster may be offline", "error", err) // If the cluster is offline, we will not be able to create the codec factory // and the list of supported types for RBAC. // We mark the cluster as offline and continue to create the kubeDetails but @@ -145,7 +145,7 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe kubeVersion, err := creds.getKubeClient().Discovery().ServerVersion() if err != nil { - cfg.log.WithError(err).Warn("Failed to get Kubernetes cluster version. Possibly the cluster is offline.") + cfg.log.WarnContext(ctx, "Failed to get Kubernetes cluster version, the cluster may be offline", "error", err) } ctx, cancel := context.WithCancel(ctx) @@ -198,13 +198,13 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe } else { refreshDelay.Inc() } - cfg.log.WithError(err).Error("Failed to update cluster schema") + cfg.log.ErrorContext(ctx, "Failed to update cluster schema", "error", err) continue } kubeVersion, err := creds.getKubeClient().Discovery().ServerVersion() if err != nil { - cfg.log.WithError(err).Warn("Failed to get Kubernetes cluster version. Possibly the cluster is offline.") + cfg.log.WarnContext(ctx, "Failed to get Kubernetes cluster version, the cluster may be offline", "error", err) } // Restore details refresh delay to the default value, in case previously cluster was offline. @@ -389,7 +389,7 @@ func getAWSClientRestConfig(cloudClients cloud.Clients, clock clockwork.Clock, r // getStaticCredentialsFromKubeconfig loads a kubeconfig from the cluster and returns the access credentials for the cluster. // If the config defines multiple contexts, it will pick one (the order is not guaranteed). 
-func getStaticCredentialsFromKubeconfig(ctx context.Context, component KubeServiceType, cluster types.KubeCluster, log *logrus.Entry, checker servicecfg.ImpersonationPermissionsChecker) (*staticKubeCreds, error) {
+func getStaticCredentialsFromKubeconfig(ctx context.Context, component KubeServiceType, cluster types.KubeCluster, log *slog.Logger, checker servicecfg.ImpersonationPermissionsChecker) (*staticKubeCreds, error) {
 	config, err := clientcmd.Load(cluster.GetKubeconfig())
 	if err != nil {
 		return nil, trace.WrapWithMessage(err, "unable to parse kubeconfig for cluster %q", cluster.GetName())
diff --git a/lib/kube/proxy/cluster_details_test.go b/lib/kube/proxy/cluster_details_test.go
index 116a575bc4143..9b52a695752da 100644
--- a/lib/kube/proxy/cluster_details_test.go
+++ b/lib/kube/proxy/cluster_details_test.go
@@ -26,7 +26,6 @@ import (
 	"time"

 	"github.com/jonboulle/clockwork"
-	"github.com/sirupsen/logrus"
 	"github.com/stretchr/testify/require"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/version"
@@ -34,12 +33,12 @@ import (
 	"k8s.io/client-go/kubernetes"

 	"github.com/gravitational/teleport/api/types"
+	"github.com/gravitational/teleport/lib/utils"
 )

 func TestNewClusterDetails(t *testing.T) {
 	t.Parallel()
 	ctx := context.Background()
-	log := logrus.New().WithContext(ctx)

 	getClusterDetailsConfig := func(c clockwork.FakeClock) (clusterDetailsConfig, *clusterDetailsClientSet) {
 		client := &clusterDetailsClientSet{}
@@ -48,7 +47,7 @@ func TestNewClusterDetails(t *testing.T) {
 				kubeClient: client,
 			},
 			cluster: &types.KubernetesClusterV3{},
-			log:     log,
+			log:     utils.NewSlogLoggerForTests(),
 			clock:   c,
 		}, client
 	}
diff --git a/lib/kube/proxy/ephemeral_containers.go b/lib/kube/proxy/ephemeral_containers.go
index 1c9ae08e417a4..61947055c0067 100644
--- a/lib/kube/proxy/ephemeral_containers.go
+++ b/lib/kube/proxy/ephemeral_containers.go
@@ -83,7 +83,7 @@ func (f *Forwarder) ephemeralContainers(authCtx *authContext, w http.ResponseWri
 	if err != nil {
 		// This error goes to kubernetes client and is not visible in the logs
 		// of the teleport server if not logged here.
-		f.log.Errorf("Failed to create cluster session: %v.", err)
+		f.log.ErrorContext(req.Context(), "Failed to create cluster session", "error", err)
 		return nil, trace.Wrap(err)
 	}
 	// sess.Close cancels the connection monitor context to release it sooner.
@@ -101,7 +101,7 @@ func (f *Forwarder) ephemeralContainers(authCtx *authContext, w http.ResponseWri
 	if err := f.setupForwardingHeaders(sess, req, true /* withImpersonationHeaders */); err != nil {
 		// This error goes to kubernetes client and is not visible in the logs
 		// of the teleport server if not logged here.
- f.log.Errorf("Failed to set up forwarding headers: %v.", err) + f.log.ErrorContext(req.Context(), "Failed to set up forwarding headers", "error", err) return nil, trace.Wrap(err) } if !sess.isLocalKubernetesCluster { diff --git a/lib/kube/proxy/forwarder.go b/lib/kube/proxy/forwarder.go index b3df6d6c0b153..aeed6d7c631ac 100644 --- a/lib/kube/proxy/forwarder.go +++ b/lib/kube/proxy/forwarder.go @@ -41,7 +41,6 @@ import ( "github.com/gravitational/trace" "github.com/jonboulle/clockwork" "github.com/julienschmidt/httprouter" - "github.com/sirupsen/logrus" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" oteltrace "go.opentelemetry.io/otel/trace" kubeerrors "k8s.io/apimachinery/pkg/api/errors" @@ -86,6 +85,7 @@ import ( "github.com/gravitational/teleport/lib/srv" "github.com/gravitational/teleport/lib/sshca" "github.com/gravitational/teleport/lib/utils" + logutils "github.com/gravitational/teleport/lib/utils/log" ) // KubeServiceType specifies a Teleport service type which can forward Kubernetes requests @@ -157,7 +157,7 @@ type ForwarderConfig struct { // PROXYSigner is used to sign PROXY headers for securely propagating client IP address PROXYSigner multiplexer.PROXYHeaderSigner // log is the logger function - log logrus.FieldLogger + log *slog.Logger // TracerProvider is used to create tracers capable // of starting spans. TracerProvider oteltrace.TracerProvider @@ -272,7 +272,7 @@ func (f *ForwarderConfig) CheckAndSetDefaults() error { f.KubeClusterName = f.ClusterName } if f.log == nil { - f.log = logrus.New() + f.log = slog.Default() } return nil } @@ -347,7 +347,7 @@ func NewForwarder(cfg ForwarderConfig) (*Forwarder, error) { fwd.router = instrumentHTTPHandler(fwd.cfg.KubeServiceType, router) if cfg.ClusterOverride != "" { - fwd.log.Debugf("Cluster override is set, forwarder will send all requests to remote cluster %v.", cfg.ClusterOverride) + fwd.log.DebugContext(closeCtx, "Cluster override is set, forwarder will send all requests to remote cluster", "cluster_override", cfg.ClusterOverride) } if len(cfg.KubeClusterName) > 0 || len(cfg.KubeconfigPath) > 0 || cfg.KubeServiceType != KubeService { if err := fwd.getKubeDetails(cfg.Context); err != nil { @@ -363,7 +363,7 @@ func NewForwarder(cfg ForwarderConfig) (*Forwarder, error) { // however some requests like exec sessions it intercepts and records. 
type Forwarder struct { mu sync.Mutex - log logrus.FieldLogger + log *slog.Logger router http.Handler cfg ForwarderConfig // activeRequests is a map used to serialize active CSR requests to the auth server @@ -540,7 +540,7 @@ func (f *Forwarder) authenticate(req *http.Request) (*authContext, error) { var isRemoteUser bool userTypeI, err := authz.UserFromContext(ctx) if err != nil { - f.log.WithError(err).Warn("error getting user from context") + f.log.WarnContext(ctx, "error getting user from context", "error", err) return nil, trace.AccessDenied(accessDeniedMsg) } switch userTypeI.(type) { @@ -549,10 +549,12 @@ func (f *Forwarder) authenticate(req *http.Request) (*authContext, error) { case authz.RemoteUser: isRemoteUser = true case authz.BuiltinRole: - f.log.Warningf("Denying proxy access to unauthenticated user of type %T - this can sometimes be caused by inadvertently using an HTTP load balancer instead of a TCP load balancer on the Kubernetes port.", userTypeI) + f.log.WarnContext(ctx, "Denying proxy access to unauthenticated user - this can sometimes be caused by inadvertently using an HTTP load balancer instead of a TCP load balancer on the Kubernetes port", + "user_type", logutils.TypeAttr(userTypeI), + ) return nil, trace.AccessDenied(accessDeniedMsg) default: - f.log.Warningf("Denying proxy access to unsupported user type: %T.", userTypeI) + f.log.WarnContext(ctx, "Denying proxy access to unsupported user type", "user_type", logutils.TypeAttr(userTypeI)) return nil, trace.AccessDenied(accessDeniedMsg) } @@ -563,7 +565,7 @@ func (f *Forwarder) authenticate(req *http.Request) (*authContext, error) { authContext, err := f.setupContext(ctx, *userContext, req, isRemoteUser) if err != nil { - f.log.WithError(err).Warn("Unable to setup context.") + f.log.WarnContext(ctx, "Unable to setup context", "error", err) if trace.IsAccessDenied(err) { return nil, trace.AccessDenied(accessDeniedMsg) } @@ -726,7 +728,7 @@ func (f *Forwarder) formatStatusResponseError(rw http.ResponseWriter, respErr er } data, err := runtime.Encode(globalKubeCodecs.LegacyCodec(), status) if err != nil { - f.log.Warningf("Failed encoding error into kube Status object: %v", err) + f.log.WarnContext(f.ctx, "Failed encoding error into kube Status object", "error", err) trace.WriteError(rw, respErr) return } @@ -737,7 +739,7 @@ func (f *Forwarder) formatStatusResponseError(rw http.ResponseWriter, respErr er // has prevented the request from succeeding`` instead of the correct reason. rw.WriteHeader(trace.ErrorToCode(respErr)) if _, err := rw.Write(data); err != nil { - f.log.Warningf("Failed writing kube error response body: %v", err) + f.log.WarnContext(f.ctx, "Failed writing kube error response body", "error", err) } } @@ -919,7 +921,7 @@ func (f *Forwarder) emitAuditEvent(req *http.Request, sess *clusterSession, stat r.populateEvent(event) if err := f.cfg.AuthClient.EmitAuditEvent(f.ctx, event); err != nil { - f.log.WithError(err).Warn("Failed to emit event.") + f.log.WarnContext(f.ctx, "Failed to emit event", "error", err) } } @@ -1033,13 +1035,17 @@ func (f *Forwarder) authorize(ctx context.Context, actx *authContext) error { if actx.teleportCluster.isRemote { // Authorization for a remote kube cluster will happen on the remote // end (by their proxy), after that cluster has remapped used roles. 
- f.log.WithField("auth_context", actx.String()).Debug("Skipping authorization for a remote kubernetes cluster name") + f.log.DebugContext(ctx, "Skipping authorization for a remote kubernetes cluster name", + "auth_context", logutils.StringerAttr(actx), + ) return nil } if actx.kubeClusterName == "" { // This should only happen for remote clusters (filtered above), but // check and report anyway. - f.log.WithField("auth_context", actx.String()).Debug("Skipping authorization due to unknown kubernetes cluster name") + f.log.DebugContext(ctx, "Skipping authorization due to unknown kubernetes cluster name", + "auth_context", logutils.StringerAttr(actx), + ) return nil } @@ -1135,7 +1141,9 @@ func (f *Forwarder) authorize(ctx context.Context, actx *authContext) error { return nil } if actx.kubeClusterName == f.cfg.ClusterName { - f.log.WithField("auth_context", actx.String()).Debug("Skipping authorization for proxy-based kubernetes cluster,") + f.log.DebugContext(ctx, "Skipping authorization for proxy-based kubernetes cluster", + "auth_context", logutils.StringerAttr(actx), + ) return nil } return trace.AccessDenied(notFoundMessage) @@ -1168,7 +1176,7 @@ func (f *Forwarder) join(ctx *authContext, w http.ResponseWriter, req *http.Requ joinSessionsInFlightGauge.WithLabelValues(f.cfg.KubeServiceType).Inc() defer joinSessionsInFlightGauge.WithLabelValues(f.cfg.KubeServiceType).Dec() - f.log.Debugf("Join %v.", req.URL.String()) + f.log.DebugContext(req.Context(), "Joining session", "join_url", logutils.StringerAttr(req.URL)) sess, err := f.newClusterSession(req.Context(), *ctx) if err != nil { @@ -1241,7 +1249,11 @@ func (f *Forwarder) join(ctx *authContext, w http.ResponseWriter, req *http.Requ close(closeC) if _, err := session.leave(party.ID); err != nil { - f.log.WithError(err).Debugf("Participant %q was unable to leave session %s", party.ID, session.id) + f.log.DebugContext(req.Context(), "Participant was unable to leave session", + "participant_id", party.ID, + "session_id", session.id, + "error", err, + ) } wg.Wait() @@ -1249,7 +1261,7 @@ func (f *Forwarder) join(ctx *authContext, w http.ResponseWriter, req *http.Requ }(); err != nil { writeErr := ws.WriteControl(gwebsocket.CloseMessage, gwebsocket.FormatCloseMessage(gwebsocket.CloseInternalServerErr, err.Error()), time.Now().Add(time.Second*10)) if writeErr != nil { - f.log.WithError(writeErr).Warn("Failed to send early-exit websocket close message.") + f.log.WarnContext(req.Context(), "Failed to send early-exit websocket close message", "error", writeErr) } } @@ -1337,7 +1349,7 @@ func (f *Forwarder) remoteJoin(ctx *authContext, w http.ResponseWriter, req *htt } defer wsSource.Close() - wsProxy(f.log, wsSource, wsTarget) + wsProxy(req.Context(), f.log, wsSource, wsTarget) return nil, nil } @@ -1362,7 +1374,7 @@ func (f *Forwarder) getSessionHostID(ctx context.Context, authCtx *authContext, // wsProxy proxies a websocket connection between two clusters transparently to allow for // remote joins. 
-func wsProxy(log logrus.FieldLogger, wsSource *gwebsocket.Conn, wsTarget *gwebsocket.Conn) { +func wsProxy(ctx context.Context, log *slog.Logger, wsSource *gwebsocket.Conn, wsTarget *gwebsocket.Conn) { errS := make(chan error, 1) errT := make(chan error, 1) wg := &sync.WaitGroup{} @@ -1416,7 +1428,7 @@ func wsProxy(log logrus.FieldLogger, wsSource *gwebsocket.Conn, wsTarget *gwebso var websocketErr *gwebsocket.CloseError if errors.As(err, &websocketErr) && websocketErr.Code == gwebsocket.CloseAbnormalClosure { - log.WithError(err).Debugf("websocket proxy: Error when copying from %s to %s", from, to) + log.DebugContext(ctx, "websocket proxying failed", "src", from, "target", to, "error", err) } wg.Wait() } @@ -1496,7 +1508,7 @@ func (f *Forwarder) execNonInteractive(ctx *authContext, req *http.Request, _ ht } if err := f.cfg.Emitter.EmitAuditEvent(f.ctx, sessionStartEvent); err != nil { - f.log.WithError(err).Warn("Failed to emit event.") + f.log.WarnContext(f.ctx, "Failed to emit event", "error", err) return trace.Wrap(err) } @@ -1518,7 +1530,7 @@ func (f *Forwarder) execNonInteractive(ctx *authContext, req *http.Request, _ ht defer func() { if err := f.cfg.Emitter.EmitAuditEvent(f.ctx, execEvent); err != nil { - f.log.WithError(err).Warn("Failed to emit exec event.") + f.log.WarnContext(f.ctx, "Failed to emit exec event", "error", err) } sessionEndEvent := &apievents.SessionEnd{ @@ -1541,7 +1553,7 @@ func (f *Forwarder) execNonInteractive(ctx *authContext, req *http.Request, _ ht } if err := f.cfg.Emitter.EmitAuditEvent(f.ctx, sessionEndEvent); err != nil { - f.log.WithError(err).Warn("Failed to emit session end event.") + f.log.WarnContext(f.ctx, "Failed to emit session end event", "error", err) } }() @@ -1550,7 +1562,7 @@ func (f *Forwarder) execNonInteractive(ctx *authContext, req *http.Request, _ ht execEvent.Code = events.ExecFailureCode execEvent.Error, execEvent.ExitCode = exitCode(err) - f.log.WithError(err).Warning("Failed creating executor.") + f.log.WarnContext(f.ctx, "Failed creating executor", "error", err) return trace.Wrap(err) } @@ -1560,7 +1572,7 @@ func (f *Forwarder) execNonInteractive(ctx *authContext, req *http.Request, _ ht execEvent.Code = events.ExecFailureCode execEvent.Error, execEvent.ExitCode = exitCode(err) - f.log.WithError(err).Warning("Executor failed while streaming.") + f.log.WarnContext(f.ctx, "Executor failed while streaming", "error", err) return trace.Wrap(err) } @@ -1629,10 +1641,10 @@ func (f *Forwarder) exec(authCtx *authContext, w http.ResponseWriter, req *http. ) defer span.End() - f.log.Debugf("Exec %v.", req.URL.String()) + f.log.DebugContext(ctx, "Starting exec", "exec_url", logutils.StringerAttr(req.URL)) defer func() { if err != nil { - f.log.WithError(err).Debug("Exec request failed") + f.log.DebugContext(ctx, "Exec request failed", "error", err) } }() @@ -1640,7 +1652,7 @@ func (f *Forwarder) exec(authCtx *authContext, w http.ResponseWriter, req *http. if err != nil { // This error goes to kubernetes client and is not visible in the logs // of the teleport server if not logged here. - f.log.Errorf("Failed to create cluster session: %v.", err) + f.log.ErrorContext(ctx, "Failed to create cluster session", "error", err) return nil, trace.Wrap(err) } // sess.Close cancels the connection monitor context to release it sooner. @@ -1702,7 +1714,11 @@ func (f *Forwarder) exec(authCtx *authContext, w http.ResponseWriter, req *http. 
 	err = <-party.closeC

 	if _, errLeave := session.leave(party.ID); errLeave != nil {
-		f.log.WithError(errLeave).Debugf("Participant %q was unable to leave session %s", party.ID, session.id)
+		f.log.DebugContext(ctx, "Participant was unable to leave session",
+			"participant_id", party.ID,
+			"session_id", session.id,
+			"error", errLeave,
+		)
 	}

 	return trace.Wrap(err)
@@ -1714,13 +1730,13 @@ func (f *Forwarder) remoteExec(req *http.Request, sess *clusterSession, proxy *remoteCommandProxy) error {
 	executor, err := f.getExecutor(sess, req)
 	if err != nil {
-		f.log.WithError(err).Warning("Failed creating executor.")
+		f.log.WarnContext(req.Context(), "Failed creating executor", "error", err)
 		return trace.Wrap(err)
 	}
 	streamOptions := proxy.options()
 	err = executor.StreamWithContext(req.Context(), streamOptions)
 	if err != nil {
-		f.log.WithError(err).Warning("Executor failed while streaming.")
+		f.log.WarnContext(req.Context(), "Executor failed while streaming", "error", err)
 	}

 	return trace.Wrap(err)
@@ -1745,12 +1761,15 @@ func (f *Forwarder) portForward(authCtx *authContext, w http.ResponseWriter, req
 	)
 	defer span.End()

-	f.log.Debugf("Port forward: %v. req headers: %v.", req.URL.String(), req.Header)
+	f.log.DebugContext(ctx, "Handling port forward request",
+		"request_url", logutils.StringerAttr(req.URL),
+		"request_headers", req.Header,
+	)
 	sess, err := f.newClusterSession(ctx, *authCtx)
 	if err != nil {
 		// This error goes to kubernetes client and is not visible in the logs
 		// of the teleport server if not logged here.
-		f.log.Errorf("Failed to create cluster session: %v.", err)
+		f.log.ErrorContext(ctx, "Failed to create cluster session", "error", err)
 		return nil, trace.Wrap(err)
 	}
 	// sess.Close cancels the connection monitor context to release it sooner.
@@ -1765,7 +1784,7 @@ func (f *Forwarder) portForward(authCtx *authContext, w http.ResponseWriter, req
 	}

 	if err := f.setupForwardingHeaders(sess, req, true /* withImpersonationHeaders */); err != nil {
-		f.log.Debugf("DENIED Port forward: %v.", req.URL.String())
+		f.log.DebugContext(ctx, "DENIED Port forward", "request_url", logutils.StringerAttr(req.URL))
 		return nil, trace.Wrap(err)
 	}

@@ -1805,7 +1824,7 @@ func (f *Forwarder) portForward(authCtx *authContext, w http.ResponseWriter, req
 			portForward.Code = events.PortForwardFailureCode
 		}
 		if err := f.cfg.Emitter.EmitAuditEvent(f.ctx, portForward); err != nil {
-			f.log.WithError(err).Warn("Failed to emit event.")
+			f.log.WarnContext(ctx, "Failed to emit event", "error", err)
 		}
 	}
 	defer func() {
@@ -1829,7 +1848,7 @@ func (f *Forwarder) portForward(authCtx *authContext, w http.ResponseWriter, req
 				},
 			}
 			if err := f.cfg.Emitter.EmitAuditEvent(f.ctx, portForward); err != nil {
-				f.log.WithError(err).Warn("Failed to emit event.")
+				f.log.WarnContext(ctx, "Failed to emit event", "error", err)
 			}
 		}
 	}()
@@ -1847,12 +1866,12 @@ func (f *Forwarder) portForward(authCtx *authContext, w http.ResponseWriter, req
 		pingPeriod:  f.cfg.ConnPingPeriod,
 		idleTimeout: sess.clientIdleTimeout,
 	}
-	f.log.Debugf("Starting %v.", request)
+	f.log.DebugContext(ctx, "Starting port forwarding", "request", request)
 	err = runPortForwarding(request)
 	if err != nil {
 		return nil, trace.Wrap(err)
 	}
-	f.log.Debugf("Done %v.", request)
+	f.log.DebugContext(ctx, "Completed port forwarding", "request", request)
 	return nil, nil
 }

@@ -2057,11 +2076,11 @@ func (f *Forwarder) catchAll(authCtx *authContext, w http.ResponseWriter, req *h
 	req = req.WithContext(ctx)
 	defer span.End()

-	sess, err := f.newClusterSession(req.Context(), *authCtx)
+	sess, err := f.newClusterSession(ctx, *authCtx)
 	if err != nil {
 		// This error goes to kubernetes client and is not visible in the logs
 		// of the teleport server if not logged here.
-		f.log.Errorf("Failed to create cluster session: %v.", err)
+		f.log.ErrorContext(ctx, "Failed to create cluster session", "error", err)
 		return nil, trace.Wrap(err)
 	}
 	// sess.Close cancels the connection monitor context to release it sooner.
@@ -2079,7 +2098,7 @@ func (f *Forwarder) catchAll(authCtx *authContext, w http.ResponseWriter, req *h
 	if err := f.setupForwardingHeaders(sess, req, true /* withImpersonationHeaders */); err != nil {
 		// This error goes to kubernetes client and is not visible in the logs
 		// of the teleport server if not logged here.
- f.log.Errorf("Failed to set up forwarding headers: %v.", err) + f.log.ErrorContext(ctx, "Failed to set up forwarding headers", "error", err) return nil, trace.Wrap(err) } @@ -2151,7 +2170,10 @@ func (f *Forwarder) getWebsocketRestConfig(sess *clusterSession, req *http.Reque } func (f *Forwarder) getWebsocketExecutor(sess *clusterSession, req *http.Request) (remotecommand.Executor, error) { - f.log.Debugf("Creating websocket remote executor for request %s %s", req.Method, req.RequestURI) + f.log.DebugContext(req.Context(), "Creating websocket remote executor for request", + "request_method", req.Method, + "request_uri", req.RequestURI, + ) cfg, err := f.getWebsocketRestConfig(sess, req) if err != nil { return nil, trace.Wrap(err, "unable to create websocket executor") @@ -2190,7 +2212,10 @@ func (f *Forwarder) getExecutor(sess *clusterSession, req *http.Request) (remote } func (f *Forwarder) getSPDYExecutor(sess *clusterSession, req *http.Request) (remotecommand.Executor, error) { - f.log.Debugf("Creating SPDY remote executor for request %s %s", req.Method, req.RequestURI) + f.log.DebugContext(req.Context(), "Creating SPDY remote executor for request", + "request_method", req.Method, + "request_uri", req.RequestURI, + ) tlsConfig, useImpersonation, err := f.getTLSConfig(sess) if err != nil { @@ -2381,11 +2406,9 @@ func (s *clusterSession) monitorConn(conn net.Conn, err error, hostID string) (n Context: s.connCtx, TeleportUser: s.User.GetName(), ServerID: s.parent.cfg.HostID, - // TODO(tross) update this to use the child logger - // once Forwarder is converted to use a slog.Logger - Logger: slog.Default(), - Emitter: s.parent.cfg.AuthClient, - EmitterContext: s.parent.ctx, + Logger: s.parent.log, + Emitter: s.parent.cfg.AuthClient, + EmitterContext: s.parent.ctx, }) if err != nil { tc.CloseWithCause(err) @@ -2462,7 +2485,7 @@ func (f *Forwarder) newClusterSession(ctx context.Context, authCtx authContext) } func (f *Forwarder) newClusterSessionRemoteCluster(ctx context.Context, authCtx authContext) (*clusterSession, error) { - f.log.Debugf("Forwarding kubernetes session for %v to remote cluster.", authCtx) + f.log.DebugContext(ctx, "Forwarding kubernetes session to remote cluster", "auth_context", logutils.StringerAttr(authCtx)) connCtx, cancel := context.WithCancelCause(ctx) return &clusterSession{ parent: f, @@ -2511,7 +2534,7 @@ func (f *Forwarder) newClusterSessionLocal(ctx context.Context, authCtx authCont return nil, trace.Wrap(err) } connCtx, cancel := context.WithCancelCause(ctx) - f.log.Debugf("Handling kubernetes session for %v using local credentials.", authCtx) + f.log.DebugContext(ctx, "Handling kubernetes session using local credentials", "auth_context", logutils.StringerAttr(authCtx)) return &clusterSession{ parent: f, authContext: authCtx, @@ -2550,8 +2573,7 @@ func (f *Forwarder) makeSessionForwarder(sess *clusterSession) (*reverseproxy.Fo opts := []reverseproxy.Option{ reverseproxy.WithFlushInterval(100 * time.Millisecond), reverseproxy.WithRoundTripper(transport), - // TODO(tross): convert this to use f.log once it has been converted to use slog - reverseproxy.WithLogger(slog.Default()), + reverseproxy.WithLogger(f.log), reverseproxy.WithErrorHandler(f.formatForwardResponseError), } if sess.isLocalKubernetesCluster { diff --git a/lib/kube/proxy/forwarder_test.go b/lib/kube/proxy/forwarder_test.go index ef10408506f5d..860746ab97213 100644 --- a/lib/kube/proxy/forwarder_test.go +++ b/lib/kube/proxy/forwarder_test.go @@ -40,7 +40,6 @@ import ( 
"github.com/gravitational/trace" "github.com/jonboulle/clockwork" "github.com/julienschmidt/httprouter" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" kubeerrors "k8s.io/apimachinery/pkg/api/errors" @@ -132,7 +131,7 @@ func TestAuthenticate(t *testing.T) { }, } f := &Forwarder{ - log: logrus.NewEntry(logrus.New()), + log: utils.NewSlogLoggerForTests(), cfg: ForwarderConfig{ ClusterName: "local", CachingAuthClient: ap, @@ -1112,7 +1111,7 @@ func newMockForwader(ctx context.Context, t *testing.T) *Forwarder { require.NoError(t, err) return &Forwarder{ - log: logrus.NewEntry(logrus.New()), + log: utils.NewSlogLoggerForTests(), router: httprouter.New(), cfg: ForwarderConfig{ Keygen: testauthority.New(), @@ -1316,7 +1315,7 @@ func (m *mockWatcher) Done() <-chan struct{} { func newTestForwarder(ctx context.Context, cfg ForwarderConfig) *Forwarder { return &Forwarder{ - log: logrus.NewEntry(logrus.New()), + log: utils.NewSlogLoggerForTests(), router: httprouter.New(), cfg: cfg, activeRequests: make(map[string]context.Context), @@ -1676,7 +1675,7 @@ func TestForwarderTLSConfigCAs(t *testing.T) { return x509.NewCertPool(), nil }, }, - log: logrus.NewEntry(logrus.New()), + log: utils.NewSlogLoggerForTests(), ctx: context.Background(), } diff --git a/lib/kube/proxy/kube_creds.go b/lib/kube/proxy/kube_creds.go index 19fd6edb2bd69..fd7e367d5f8cf 100644 --- a/lib/kube/proxy/kube_creds.go +++ b/lib/kube/proxy/kube_creds.go @@ -21,13 +21,13 @@ package proxy import ( "context" "crypto/tls" + "log/slog" "net/http" "sync" "time" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" - "github.com/sirupsen/logrus" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/transport" @@ -151,7 +151,7 @@ type dynamicKubeCreds struct { ctx context.Context renewTicker clockwork.Ticker staticCreds *staticKubeCreds - log logrus.FieldLogger + log *slog.Logger closeC chan struct{} client dynamicCredsClient checker servicecfg.ImpersonationPermissionsChecker @@ -164,7 +164,7 @@ type dynamicKubeCreds struct { // dynamicCredsConfig contains configuration for dynamicKubeCreds. 
 type dynamicCredsConfig struct {
 	kubeCluster types.KubeCluster
-	log         logrus.FieldLogger
+	log         *slog.Logger
 	client      dynamicCredsClient
 	checker     servicecfg.ImpersonationPermissionsChecker
 	clock       clockwork.Clock
@@ -224,7 +224,7 @@ func newDynamicKubeCreds(ctx context.Context, cfg dynamicCredsConfig) (*dynamicK
 				return
 			case <-dyn.renewTicker.Chan():
 				if err := dyn.renewClientset(cfg.kubeCluster); err != nil {
-					logrus.WithError(err).Warnf("Unable to renew cluster %q credentials.", cfg.kubeCluster.GetName())
+					cfg.log.WarnContext(ctx, "Unable to renew cluster credentials", "cluster", cfg.kubeCluster.GetName(), "error", err)
 				}
 			}
 		}
diff --git a/lib/kube/proxy/kube_creds_test.go b/lib/kube/proxy/kube_creds_test.go
index ef8950691c0a1..b032964021b73 100644
--- a/lib/kube/proxy/kube_creds_test.go
+++ b/lib/kube/proxy/kube_creds_test.go
@@ -30,19 +30,19 @@ import (
 	"github.com/aws/aws-sdk-go/service/eks"
 	"github.com/gravitational/trace"
 	"github.com/jonboulle/clockwork"
-	"github.com/sirupsen/logrus"
 	"github.com/stretchr/testify/require"
 	authztypes "k8s.io/client-go/kubernetes/typed/authorization/v1"
 	"k8s.io/client-go/rest"

 	"github.com/gravitational/teleport/api/types"
-	"github.com/gravitational/teleport/api/utils"
+	apiutils "github.com/gravitational/teleport/api/utils"
 	"github.com/gravitational/teleport/lib/cloud"
 	"github.com/gravitational/teleport/lib/cloud/azure"
 	"github.com/gravitational/teleport/lib/cloud/gcp"
 	"github.com/gravitational/teleport/lib/cloud/mocks"
 	"github.com/gravitational/teleport/lib/fixtures"
 	"github.com/gravitational/teleport/lib/services"
+	"github.com/gravitational/teleport/lib/utils"
 )

 // Test_DynamicKubeCreds tests the dynamic kube credrentials generator for
@@ -54,7 +54,6 @@ func Test_DynamicKubeCreds(t *testing.T) {
 	t.Parallel()
 	var (
 		fakeClock = clockwork.NewFakeClock()
-		log       = logrus.New()
 		notify    = make(chan struct{}, 1)
 		ttl       = 14 * time.Minute
 	)
@@ -303,7 +302,7 @@ func Test_DynamicKubeCreds(t *testing.T) {
 				) error {
 					return nil
 				},
-				log:                  log,
+				log:                  utils.NewSlogLoggerForTests(),
 				kubeCluster:          tt.args.cluster,
 				client:               tt.args.client,
 				initialRenewInterval: ttl / 2,
@@ -332,8 +331,8 @@ func Test_DynamicKubeCreds(t *testing.T) {
 			}

 			require.NoError(t, got.close())
-			require.Equal(t, tt.wantAssumedRole, utils.Deduplicate(sts.GetAssumedRoleARNs()))
-			require.Equal(t, tt.wantExternalIds, utils.Deduplicate(sts.GetAssumedRoleExternalIDs()))
+			require.Equal(t, tt.wantAssumedRole, apiutils.Deduplicate(sts.GetAssumedRoleARNs()))
+			require.Equal(t, tt.wantExternalIds, apiutils.Deduplicate(sts.GetAssumedRoleExternalIDs()))
 			sts.ResetAssumeRoleHistory()
 		})
 	}
diff --git a/lib/kube/proxy/portforward_spdy.go b/lib/kube/proxy/portforward_spdy.go
index 561750239250d..1745536fc44f1 100644
--- a/lib/kube/proxy/portforward_spdy.go
+++ b/lib/kube/proxy/portforward_spdy.go
@@ -19,6 +19,7 @@ package proxy
 import (
 	"context"
 	"fmt"
+	"log/slog"
 	"net"
 	"net/http"
 	"strconv"
@@ -26,7 +27,6 @@ import (
 	"time"

 	"github.com/gravitational/trace"
-	log "github.com/sirupsen/logrus"
 	"k8s.io/apimachinery/pkg/util/httpstream"
 	spdystream "k8s.io/apimachinery/pkg/util/httpstream/spdy"

@@ -91,10 +91,10 @@ func runPortForwardingHTTPStreams(req portForwardRequest) error {
 	defer conn.Close()

 	h := &portForwardProxy{
-		Entry: log.WithFields(log.Fields{
-			teleport.ComponentKey: teleport.Component(teleport.ComponentProxyKube),
-			events.RemoteAddr:     req.httpRequest.RemoteAddr,
-		}),
+		logger: slog.With(
+			teleport.ComponentKey, teleport.Component(teleport.ComponentProxyKube),
+			events.RemoteAddr, req.httpRequest.RemoteAddr,
+		),
 		portForwardRequest: req,
 		sourceConn:         conn,
 		streamChan:         streamChan,
@@ -104,7 +104,7 @@ func runPortForwardingHTTPStreams(req portForwardRequest) error {
 	}
 	defer h.Close()

-	h.Debugf("Setting port forwarding streaming connection idle timeout to %s.", req.idleTimeout)
+	h.logger.DebugContext(req.context, "Setting port forwarding streaming connection idle timeout", "idle_timeout", req.idleTimeout)
 	conn.SetIdleTimeout(req.idleTimeout)

 	h.run()
@@ -149,7 +149,7 @@ func httpStreamReceived(ctx context.Context, streams chan httpstream.Stream) fun
 // portForwardProxy is capable of processing multiple port forward
 // requests over a single httpstream.Connection.
 type portForwardProxy struct {
-	*log.Entry
+	logger *slog.Logger
 	portForwardRequest
 	sourceConn      httpstream.Connection
 	streamChan      chan httpstream.Stream
@@ -200,7 +200,7 @@ func (h *portForwardProxy) forwardStreamPair(p *httpStreamPair, remotePort int64
 	go func() {
 		defer wg.Done()
 		if err := utils.ProxyConn(h.context, p.errorStream, targetErrorStream); err != nil {
-			h.WithError(err).Debugf("Unable to proxy portforward error-stream.")
+			h.logger.DebugContext(h.context, "Unable to proxy portforward error-stream", "error", err)
 		}
 	}()

@@ -222,14 +222,14 @@ func (h *portForwardProxy) forwardStreamPair(p *httpStreamPair, remotePort int64
 	go func() {
 		defer wg.Done()
 		if err := utils.ProxyConn(h.context, p.dataStream, targetDataStream); err != nil {
-			h.WithError(err).Debugf("Unable to proxy portforward data-stream.")
+			h.logger.DebugContext(h.context, "Unable to proxy portforward data-stream", "error", err)
 		}
 	}()

-	h.Debugf("Streams have been created, Waiting for copy to complete.")
+	h.logger.DebugContext(h.context, "Streams have been created, Waiting for copy to complete")
 	// wait for the copies to complete before returning.
 	wg.Wait()
-	h.Debugf("Port forwarding pair completed.")
+	h.logger.DebugContext(h.context, "Port forwarding pair completed")
 	return nil
 }

@@ -241,11 +241,11 @@ func (h *portForwardProxy) getStreamPair(requestID string) (*httpStreamPair, boo
 	defer h.streamPairsLock.Unlock()

 	if p, ok := h.streamPairs[requestID]; ok {
-		log.Debugf("Request %s, found existing stream pair", requestID)
+		h.logger.DebugContext(h.context, "Found existing stream pair for request", "request_id", requestID)
 		return p, false
 	}

-	h.Debugf("Request %s, creating new stream pair.", requestID)
+	h.logger.DebugContext(h.context, "Creating new stream pair for request", "request_id", requestID)

 	p := newPortForwardPair(requestID)
 	h.streamPairs[requestID] = p
@@ -261,9 +261,9 @@ func (h *portForwardProxy) monitorStreamPair(p *httpStreamPair) {
 	defer timeC.Stop()
 	select {
 	case <-timeC.C:
-		h.Errorf("Request %s, timed out waiting for streams.", p.requestID)
+		h.logger.ErrorContext(h.context, "Request timed out waiting for streams", "request_id", p.requestID)
 	case <-p.complete:
-		h.Debugf("Request %s, successfully received error and data streams.", p.requestID)
+		h.logger.DebugContext(h.context, "Request successfully received error and data streams", "request_id", p.requestID)
 	}
 	h.removeStreamPair(p.requestID)
 }
@@ -296,23 +296,24 @@ func (h *portForwardProxy) requestID(stream httpstream.Stream) (string, error) {
 // streams, invoking portForward for each complete stream pair. The loop exits
 // when the httpstream.Connection is closed.
 func (h *portForwardProxy) run() {
-	h.Debugf("Waiting for port forward streams.")
+	h.logger.DebugContext(h.context, "Waiting for port forward streams")
 	for {
 		select {
 		case <-h.context.Done():
-			h.Debugf("Context is closing, returning.")
+			h.logger.DebugContext(h.context, "Context is closing, returning")
 			return
 		case <-h.sourceConn.CloseChan():
-			h.Debugf("Upgraded connection closed.")
+			h.logger.DebugContext(h.context, "Upgraded connection closed")
 			return
 		case stream := <-h.streamChan:
 			requestID, err := h.requestID(stream)
 			if err != nil {
-				h.Warningf("Failed to parse request id: %v.", err)
+				h.logger.WarnContext(h.context, "Failed to parse request id", "error", err)
 				return
 			}
+
 			streamType := stream.Headers().Get(StreamType)
-			h.Debugf("Received new stream %v of type %v.", requestID, streamType)
+			h.logger.DebugContext(h.context, "Received new stream", "request_id", requestID, "stream_type", streamType)

 			p, created := h.getStreamPair(requestID)
 			if created {
@@ -336,13 +337,15 @@ func (h *portForwardProxy) portForward(p *httpStreamPair) {
 	portString := p.dataStream.Headers().Get(PortHeader)
 	port, _ := strconv.ParseInt(portString, 10, 32)

-	h.Debugf("Forwarding port %v -> %v.", p.requestID, portString)
+	logger := h.logger.With("request_id", p.requestID, "port", portString)
+
+	logger.DebugContext(h.context, "Forwarding port")
 	if err := h.forwardStreamPair(p, port); err != nil {
-		h.WithError(err).Debugf("Error forwarding port %v -> %v.", p.requestID, portString)
+		logger.DebugContext(h.context, "Error forwarding port", "error", err)
 		return
 	}

-	h.Debugf("Completed forwarding port %v -> %v.", p.requestID, portString)
+	logger.DebugContext(h.context, "Completed forwarding port")
 }

 // httpStreamPair represents the error and data streams for a port
diff --git a/lib/kube/proxy/portforward_test.go b/lib/kube/proxy/portforward_test.go
index b923ca591129e..7dc7a147e22b7 100644
--- a/lib/kube/proxy/portforward_test.go
+++ b/lib/kube/proxy/portforward_test.go
@@ -31,7 +31,6 @@ import (
 	"time"

 	"github.com/gravitational/trace"
-	log "github.com/sirupsen/logrus"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	kubeerrors "k8s.io/apimachinery/pkg/api/errors"
@@ -43,6 +42,7 @@ import (
 	"k8s.io/client-go/transport/spdy"

 	testingkubemock "github.com/gravitational/teleport/lib/kube/proxy/testing/kube_server"
+	"github.com/gravitational/teleport/lib/utils"
 )

 func TestPortForwardKubeService(t *testing.T) {
@@ -269,7 +269,6 @@ type portForwarder interface {
 // connection, it will leak memory.
 func TestPortForwardProxy_run_connsClosed(t *testing.T) {
 	t.Parallel()
-	logger := log.NewEntry(&log.Logger{Out: io.Discard})
 	const (
 		reqID = "reqID"
 		// portHeaderValue is the value of the port header in the stream.
@@ -285,7 +284,7 @@ func TestPortForwardProxy_run_connsClosed(t *testing.T) {
 			context:       context.Background(),
 			onPortForward: func(addr string, success bool) {},
 		},
-		Entry:      logger,
+		logger:     utils.NewSlogLoggerForTests(),
 		sourceConn: sourceConn,
 		targetConn: targetConn,
 		streamChan: make(chan httpstream.Stream),
diff --git a/lib/kube/proxy/portforward_websocket.go b/lib/kube/proxy/portforward_websocket.go
index af1cb04ab525a..c2cb4cb6c97a9 100644
--- a/lib/kube/proxy/portforward_websocket.go
+++ b/lib/kube/proxy/portforward_websocket.go
@@ -23,13 +23,13 @@ import (
 	"encoding/binary"
 	"fmt"
 	"io"
+	"log/slog"
 	"net/http"
 	"strings"
 	"sync"

 	gwebsocket "github.com/gorilla/websocket"
 	"github.com/gravitational/trace"
-	"github.com/sirupsen/logrus"
 	"k8s.io/apimachinery/pkg/util/httpstream"
 	spdystream "k8s.io/apimachinery/pkg/util/httpstream/spdy"
 	"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
@@ -148,10 +148,10 @@ func runPortForwardingWebSocket(req portForwardRequest) error {
 		podName:       req.podName,
 		targetConn:    targetConn,
 		onPortForward: req.onPortForward,
-		FieldLogger: logrus.WithFields(logrus.Fields{
-			teleport.ComponentKey: teleport.Component(teleport.ComponentProxyKube),
-			events.RemoteAddr:     req.httpRequest.RemoteAddr,
-		}),
+		logger: slog.With(
+			teleport.ComponentKey, teleport.Component(teleport.ComponentProxyKube),
+			events.RemoteAddr, req.httpRequest.RemoteAddr,
+		),
 		context: req.context,
 	}
 	// run the portforward request until termination.
@@ -213,8 +213,8 @@ type websocketPortforwardHandler struct {
 	podName       string
 	targetConn    httpstream.Connection
 	onPortForward portForwardCallback
-	logrus.FieldLogger
-	context context.Context
+	logger  *slog.Logger
+	context context.Context
 }

 // run invokes the targetConn SPDY connection and copies the client data into
@@ -237,10 +237,12 @@ func (h *websocketPortforwardHandler) run() {

 // portForward copies the client and upstream streams.
 func (h *websocketPortforwardHandler) portForward(p *websocketChannelPair) {
-	h.Debugf("Forwarding port %v -> %v.", p.requestID, p.port)
+	logger := h.logger.With("request_id", p.requestID, "port", p.port)
+
+	logger.DebugContext(h.context, "Forwarding port")
 	h.forwardStreamPair(p)

-	h.Debugf("Completed forwarding port %v -> %v.", p.requestID, p.port)
+	logger.DebugContext(h.context, "Completed forwarding port")
 }

 func (h *websocketPortforwardHandler) forwardStreamPair(p *websocketChannelPair) {
@@ -269,7 +271,7 @@ func (h *websocketPortforwardHandler) forwardStreamPair(p *websocketChannelPair)
 	go func() {
 		defer wg.Done()
 		if err := utils.ProxyConn(h.context, p.errorStream, targetErrorStream); err != nil {
-			h.WithError(err).Debugf("Unable to proxy portforward error-stream.")
+			h.logger.DebugContext(h.context, "Unable to proxy portforward error-stream", "error", err)
 		}
 	}()

@@ -292,15 +294,15 @@ func (h *websocketPortforwardHandler) forwardStreamPair(p *websocketChannelPair)
 	go func() {
 		defer wg.Done()
 		if err := utils.ProxyConn(h.context, p.dataStream, targetDataStream); err != nil {
-			h.WithError(err).Debugf("Unable to proxy portforward data-stream.")
+			h.logger.DebugContext(h.context, "Unable to proxy portforward data-stream", "error", err)
 		}
 	}()

-	h.Debugf("Streams have been created, Waiting for copy to complete.")
+	h.logger.DebugContext(h.context, "Streams have been created, Waiting for copy to complete")
 	// Wait until every goroutine exits.
 	wg.Wait()
-	h.Debugf("Port forwarding pair completed.")
+	h.logger.DebugContext(h.context, "Port forwarding pair completed")
 }

 // runPortForwardingTunneledHTTPStreams handles a port-forwarding request that uses SPDY protocol
@@ -341,10 +343,10 @@ func runPortForwardingTunneledHTTPStreams(req portForwardRequest) error {
 	defer conn.Close()

 	h := &portForwardProxy{
-		Entry: logrus.WithFields(logrus.Fields{
-			teleport.ComponentKey: teleport.Component(teleport.ComponentProxyKube),
-			events.RemoteAddr:     req.httpRequest.RemoteAddr,
-		}),
+		logger: slog.With(
+			teleport.ComponentKey, teleport.Component(teleport.ComponentProxyKube),
+			events.RemoteAddr, req.httpRequest.RemoteAddr,
+		),
 		portForwardRequest: req,
 		sourceConn:         spdyConn,
 		streamChan:         streamChan,
@@ -354,7 +356,7 @@ func runPortForwardingTunneledHTTPStreams(req portForwardRequest) error {
 	}
 	defer h.Close()

-	h.Debugf("Setting port forwarding streaming connection idle timeout to %s.", req.idleTimeout)
+	h.logger.DebugContext(req.context, "Setting port forwarding streaming connection idle timeout", "idle_timeout", req.idleTimeout)
 	spdyConn.SetIdleTimeout(req.idleTimeout)

 	h.run()
diff --git a/lib/kube/proxy/remotecommand.go b/lib/kube/proxy/remotecommand.go
index 09a9c868b43ca..2cd03c870d70b 100644
--- a/lib/kube/proxy/remotecommand.go
+++ b/lib/kube/proxy/remotecommand.go
@@ -22,12 +22,12 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"log/slog"
 	"net/http"
 	"strings"
 	"time"

 	"github.com/gravitational/trace"
-	log "github.com/sirupsen/logrus"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -75,7 +75,7 @@ func (req remoteCommandRequest) eventPodMeta(ctx context.Context, creds kubeCred
 	// here shouldn't prevent a session from starting.
 	pod, err := creds.getKubeClient().CoreV1().Pods(req.podNamespace).Get(ctx, req.podName, metav1.GetOptions{})
 	if err != nil {
-		log.WithError(err).Debugf("Failed fetching pod from kubernetes API; skipping additional metadata on the audit event")
+		slog.DebugContext(ctx, "Failed fetching pod from kubernetes API; skipping additional metadata on the audit event", "error", err)
 		return meta
 	}
 	meta.KubernetesNodeName = pod.Spec.NodeName
@@ -121,7 +121,7 @@ func upgradeRequestToRemoteCommandProxy(req remoteCommandRequest, exec func(*rem
 			err = nil
 		}
 		if err := proxy.sendStatus(err); err != nil {
-			log.Warningf("Failed to send status: %v", err)
+			slog.WarnContext(req.context, "Failed to send status", "error", err)
 		}
 		// return rsp=nil, err=nil to indicate that the request has been handled
 		// by the hijacked connection. If we return an error, the request will be
@@ -162,10 +162,10 @@ func createSPDYStreams(req remoteCommandRequest) (*remoteCommandProxy, error) {
 	var handler protocolHandler
 	switch protocol {
 	case "":
-		log.Warningf("Client did not request protocol negotiation.")
+		slog.WarnContext(ctx, "Client did not request protocol negotiation")
 		fallthrough
 	case StreamProtocolV4Name:
-		log.Infof("Negotiated protocol %v.", protocol)
+		slog.InfoContext(ctx, "Negotiated protocol", "protocol", protocol)
 		handler = &v4ProtocolHandler{}
 	default:
 		err = trace.BadParameter("protocol %v is not supported. upgrade the client", protocol)
@@ -357,7 +357,7 @@ func (t *termQueue) handleResizeEvents(stream io.Reader) {
 		size := remotecommand.TerminalSize{}
 		if err := decoder.Decode(&size); err != nil {
 			if !errors.Is(err, io.EOF) {
-				log.Warningf("Failed to decode resize event: %v", err)
+				slog.WarnContext(t.done, "Failed to decode resize event", "error", err)
 			}
 			t.cancel()
 			return
@@ -412,7 +412,7 @@ WaitForStreams:
 			remoteProxy.resizeStream = stream
 			go waitStreamReply(stopCtx, stream.replySent, replyChan)
 		default:
-			log.Warningf("Ignoring unexpected stream type: %q", streamType)
+			slog.WarnContext(stopCtx, "Ignoring unexpected stream type", "stream_type", streamType)
 		}
 	case <-replyChan:
 		receivedStreams++
diff --git a/lib/kube/proxy/resource_deletecollection.go b/lib/kube/proxy/resource_deletecollection.go
index f73c5f2043426..0e9fc22540c02 100644
--- a/lib/kube/proxy/resource_deletecollection.go
+++ b/lib/kube/proxy/resource_deletecollection.go
@@ -21,10 +21,10 @@ package proxy
 import (
 	"context"
 	"io"
+	"log/slog"
 	"net/http"

 	"github.com/gravitational/trace"
-	"github.com/sirupsen/logrus"
 	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
 	oteltrace "go.opentelemetry.io/otel/trace"
 	appsv1 "k8s.io/api/apps/v1"
@@ -566,7 +566,7 @@ func (f *Forwarder) handleDeleteCustomResourceCollection(w http.ResponseWriter,
 type deleteResourcesCommonParams struct {
 	ctx         context.Context
-	log         logrus.FieldLogger
+	log         *slog.Logger
 	authCtx     *authContext
 	header      http.Header
 	kubeDetails *kubeDetails
diff --git a/lib/kube/proxy/resource_filters.go b/lib/kube/proxy/resource_filters.go
index 735aab9648f9d..16d8a6076c2d4 100644
--- a/lib/kube/proxy/resource_filters.go
+++ b/lib/kube/proxy/resource_filters.go
@@ -20,12 +20,13 @@ package proxy
 import (
 	"bytes"
+	"context"
 	"io"
+	"log/slog"
 	"mime"
 	"net/http"

 	"github.com/gravitational/trace"
-	"github.com/sirupsen/logrus"
 	appsv1 "k8s.io/api/apps/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	certificatesv1 "k8s.io/api/certificates/v1"
@@ -50,7 +51,7 @@ import (
 //   - deniedResources: excluded if (namespace,name) matches an entry even if it matches
 //     the allowedResources's list.
 //   - allowedResources: excluded if (namespace,name) not match a single entry.
-func newResourceFilterer(kind, verb string, codecs *serializer.CodecFactory, allowedResources, deniedResources []types.KubernetesResource, log logrus.FieldLogger) responsewriters.FilterWrapper {
+func newResourceFilterer(kind, verb string, codecs *serializer.CodecFactory, allowedResources, deniedResources []types.KubernetesResource, log *slog.Logger) responsewriters.FilterWrapper {
 	// If the list of allowed resources contains a wildcard and no deniedResources, then we
 	// don't need to filter anything.
 	if containsWildcard(allowedResources) && len(deniedResources) == 0 {
@@ -113,7 +114,7 @@ type resourceFilterer struct {
 	// deniedResources is the list of kubernetes resources the user must not access.
 	deniedResources []types.KubernetesResource
 	// log is the logger.
-	log logrus.FieldLogger
+	log *slog.Logger
 	// kind is the type of the resource.
 	kind string
 	// verb is the kube API verb based on HTTP verb.
@@ -176,6 +177,8 @@ func pointerArrayToArray[T any](arr []*T) []T {
 // with the object.
 // The isListObj boolean returned indicates if the object is a list of resources.
 func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList bool, err error) {
+	ctx := context.Background()
+
 	switch o := obj.(type) {
 	case *metav1.Status:
 		// Status object is returned when the Kubernetes API returns an error and
@@ -184,7 +187,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList
 	case *corev1.Pod:
 		result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources)
 		if err != nil {
-			d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.")
+			d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err)
 		}
 		// if err is not nil or result is false, we should not include it.
 		return result, false, nil
@@ -198,7 +201,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList
 	case *corev1.Secret:
 		result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources)
 		if err != nil {
-			d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.")
+			d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err)
 		}
 		// if err is not nil or result is false, we should not include it.
 		return result, false, nil
@@ -212,7 +215,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList
 	case *corev1.ConfigMap:
 		result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources)
 		if err != nil {
-			d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.")
+			d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err)
 		}
 		// if err is not nil or result is false, we should not include it.
 		return result, false, nil
@@ -226,7 +229,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList
 	case *corev1.Namespace:
 		result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources)
 		if err != nil {
-			d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.")
+			d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err)
 		}
 		// if err is not nil or result is false, we should not include it.
 		return result, false, nil
@@ -240,7 +243,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList
 	case *corev1.Service:
 		result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources)
 		if err != nil {
-			d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.")
+			d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err)
 		}
 		// if err is not nil or result is false, we should not include it.
 		return result, false, nil
@@ -254,7 +257,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList
 	case *corev1.Endpoints:
 		result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources)
 		if err != nil {
-			d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.")
+			d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err)
 		}
 		// if err is not nil or result is false, we should not include it.
 		return result, false, nil
@@ -268,7 +271,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList
 	case *corev1.ServiceAccount:
 		result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources)
 		if err != nil {
-			d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.")
+			d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err)
 		}
 		// if err is not nil or result is false, we should not include it.
 		return result, false, nil
@@ -282,7 +285,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList
 	case *corev1.Node:
 		result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources)
 		if err != nil {
-			d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.")
+			d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err)
 		}
 		// if err is not nil or result is false, we should not include it.
 		return result, false, nil
@@ -296,7 +299,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList
 	case *corev1.PersistentVolume:
 		result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources)
 		if err != nil {
-			d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.")
+			d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err)
 		}
 		// if err is not nil or result is false, we should not include it.
 		return result, false, nil
@@ -310,7 +313,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList
 	case *corev1.PersistentVolumeClaim:
 		result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources)
 		if err != nil {
-			d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.")
+			d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err)
 		}
 		// if err is not nil or result is false, we should not include it.
 		return result, false, nil
@@ -325,7 +328,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList
 	case *appsv1.Deployment:
 		result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources)
 		if err != nil {
-			d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.")
+			d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err)
 		}
 		// if err is not nil or result is false, we should not include it.
 		return result, false, nil
@@ -340,7 +343,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList
 	case *appsv1.ReplicaSet:
 		result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources)
 		if err != nil {
-			d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.")
+			d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err)
 		}
 		// if err is not nil or result is false, we should not include it.
 		return result, false, nil
@@ -354,7 +357,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList
 	case *appsv1.StatefulSet:
 		result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources)
 		if err != nil {
-			d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.")
+			d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err)
 		}
 		// if err is not nil or result is false, we should not include it.
 		return result, false, nil
@@ -369,7 +372,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList
 	case *appsv1.DaemonSet:
 		result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources)
 		if err != nil {
-			d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.")
+			d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err)
 		}
 		// if err is not nil or result is false, we should not include it.
 		return result, false, nil
@@ -383,7 +386,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList
 	case *authv1.ClusterRole:
 		result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources)
 		if err != nil {
-			d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.")
+			d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err)
 		}
 		// if err is not nil or result is false, we should not include it.
 		return result, false, nil
@@ -397,7 +400,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList
 	case *authv1.Role:
 		result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources)
 		if err != nil {
-			d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.")
+			d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err)
 		}
 		// if err is not nil or result is false, we should not include it.
 		return result, false, nil
@@ -412,7 +415,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList
 	case *authv1.ClusterRoleBinding:
 		result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources)
 		if err != nil {
-			d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.")
+			d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err)
 		}
 		// if err is not nil or result is false, we should not include it.
 		return result, false, nil
@@ -427,7 +430,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList
 	case *authv1.RoleBinding:
 		result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources)
 		if err != nil {
-			d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.")
+			d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err)
 		}
 		// if err is not nil or result is false, we should not include it.
return result, false, nil @@ -442,7 +445,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList case *batchv1.CronJob: result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources) if err != nil { - d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.") + d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err) } // if err is not nil or result is false, we should not include it. return result, false, nil @@ -457,7 +460,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList case *batchv1.Job: result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources) if err != nil { - d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.") + d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err) } // if err is not nil or result is false, we should not include it. return result, false, nil @@ -472,7 +475,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList case *certificatesv1.CertificateSigningRequest: result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources) if err != nil { - d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.") + d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err) } // if err is not nil or result is false, we should not include it. return result, false, nil @@ -486,7 +489,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList case *networkingv1.Ingress: result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources) if err != nil { - d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.") + d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err) } // if err is not nil or result is false, we should not include it. return result, false, nil @@ -500,7 +503,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList case *extensionsv1beta1.Ingress: result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources) if err != nil { - d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.") + d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err) } // if err is not nil or result is false, we should not include it. return result, false, nil @@ -515,7 +518,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList case *extensionsv1beta1.DaemonSet: result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources) if err != nil { - d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.") + d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err) } // if err is not nil or result is false, we should not include it. 
return result, false, nil @@ -530,7 +533,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList case *extensionsv1beta1.Deployment: result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources) if err != nil { - d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.") + d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err) } // if err is not nil or result is false, we should not include it. return result, false, nil @@ -545,7 +548,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList case *extensionsv1beta1.ReplicaSet: result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources) if err != nil { - d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.") + d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err) } // if err is not nil or result is false, we should not include it. return result, false, nil @@ -569,7 +572,7 @@ func (d *resourceFilterer) FilterObj(obj runtime.Object) (isAllowed bool, isList d.allowedResources, d.deniedResources, ) if err != nil { - d.log.WithError(err).Warn("Unable to compile regex expressions within kubernetes_resources.") + d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err) } // if err is not nil or result is false, we should not include it. return result, false, nil @@ -632,13 +635,13 @@ func (d *resourceFilterer) encode(obj runtime.Object, w io.Writer) error { } // filterResourceList excludes resources the user should not have access to. -func filterResourceList[T kubeObjectInterface](kind, verb string, originalList []T, allowed, denied []types.KubernetesResource, log logrus.FieldLogger) []T { +func filterResourceList[T kubeObjectInterface](kind, verb string, originalList []T, allowed, denied []types.KubernetesResource, log *slog.Logger) []T { filteredList := make([]T, 0, len(originalList)) for _, resource := range originalList { if result, err := filterResource(kind, verb, resource, allowed, denied); err == nil && result { filteredList = append(filteredList, resource) } else if err != nil { - log.WithError(err).Warnf("Unable to compile regex expressions within kubernetes_resources.") + log.WarnContext(context.Background(), "Unable to compile regex expressions within kubernetes_resources", "error", err) } } return filteredList @@ -686,7 +689,7 @@ func (d *resourceFilterer) filterMetaV1Table(table *metav1.Table, allowedResourc if result, err := matchKubernetesResource(resource, allowedResources, deniedResources); err == nil && result { resources = append(resources, *row) } else if err != nil { - d.log.WithError(err).Warn("Unable to compile regex expression.") + d.log.WarnContext(context.Background(), "Unable to compile regex expression", "error", err) } } table.Rows = resources @@ -799,7 +802,7 @@ func filterBuffer(filterWrapper responsewriters.FilterWrapper, src *responsewrit // filterUnstructuredList filters the unstructured list object to exclude resources // that the user must not have access to. // The filtered list is re-assigned to `obj.Object["items"]`.
-func filterUnstructuredList(verb string, obj *unstructured.Unstructured, allowed, denied []types.KubernetesResource, log logrus.FieldLogger) (hasElems bool) { +func filterUnstructuredList(verb string, obj *unstructured.Unstructured, allowed, denied []types.KubernetesResource, log *slog.Logger) (hasElems bool) { const ( itemsKey = "items" ) @@ -809,7 +812,7 @@ func filterUnstructuredList(verb string, obj *unstructured.Unstructured, allowed objList, err := obj.ToList() if err != nil { // This should never happen, but if it does, we should log it. - log.WithError(err).Warnf("Unable to convert unstructured object to list.") + log.WarnContext(context.Background(), "Unable to convert unstructured object to list", "error", err) return false } @@ -822,7 +825,7 @@ func filterUnstructuredList(verb string, obj *unstructured.Unstructured, allowed ); result { filteredList = append(filteredList, resource.Object) } else if err != nil { - log.WithError(err).Warnf("Unable to compile regex expressions within kubernetes_resources.") + log.WarnContext(context.Background(), "Unable to compile regex expressions within kubernetes_resources", "error", err) } } obj.Object[itemsKey] = filteredList diff --git a/lib/kube/proxy/resource_filters_test.go b/lib/kube/proxy/resource_filters_test.go index 3d49563712b81..aecfae1f3e154 100644 --- a/lib/kube/proxy/resource_filters_test.go +++ b/lib/kube/proxy/resource_filters_test.go @@ -30,7 +30,6 @@ import ( "text/template" "github.com/gravitational/trace" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" @@ -42,10 +41,10 @@ import ( "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/lib/kube/proxy/responsewriters" + "github.com/gravitational/teleport/lib/utils" ) func Test_filterBuffer(t *testing.T) { - log := logrus.New() type objectAndAPI struct { obj string api string } @@ -175,7 +174,7 @@ func Test_filterBuffer(t *testing.T) { buf, decompress := newMemoryResponseWriter(t, data.Bytes(), tt.args.contentEncoding) - err = filterBuffer(newResourceFilterer(r, types.KubeVerbList, &globalKubeCodecs, allowedResources, nil, log), buf) + err = filterBuffer(newResourceFilterer(r, types.KubeVerbList, &globalKubeCodecs, allowedResources, nil, utils.NewSlogLoggerForTests()), buf) require.NoError(t, err) // Decompress the buffer to compare the result.
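Note: every case arm in resourceFilterer.FilterObj above repeats the same filter-and-warn sequence against the ctx introduced at the top of the function. A possible follow-up cleanup is sketched below under two assumptions that are not part of this change: the helper name filterAndWarn is hypothetical, and filterResource is assumed to accept any value satisfying the kubeObjectInterface constraint already used by filterResourceList.

```go
// filterAndWarn is a hypothetical helper collapsing the pattern repeated in
// each FilterObj case arm: evaluate filterResource and emit one structured
// warning when the kubernetes_resources regexes fail to compile.
func (d *resourceFilterer) filterAndWarn(ctx context.Context, o kubeObjectInterface) bool {
	result, err := filterResource(d.kind, d.verb, o, d.allowedResources, d.deniedResources)
	if err != nil {
		// A non-nil err implies result is false, so the object is excluded.
		d.log.WarnContext(ctx, "Unable to compile regex expressions within kubernetes_resources", "error", err)
	}
	return result
}
```

Each arm would then reduce to `return d.filterAndWarn(ctx, o), false, nil`.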
diff --git a/lib/kube/proxy/resource_list.go b/lib/kube/proxy/resource_list.go index d0401a600fe5d..f892c91bb8eb5 100644 --- a/lib/kube/proxy/resource_list.go +++ b/lib/kube/proxy/resource_list.go @@ -237,7 +237,7 @@ func (f *Forwarder) sendEphemeralContainerEvents(done <-chan struct{}, req *http podName, ) if err != nil { - f.log.WithError(err).Warn("error getting user ephemeral containers") + f.log.WarnContext(req.Context(), "error getting user ephemeral containers", "error", err) return } @@ -247,7 +247,7 @@ func (f *Forwarder) sendEphemeralContainerEvents(done <-chan struct{}, req *http } evt, err := f.getPatchedPodEvent(req.Context(), sess, wc) if err != nil { - f.log.WithError(err).Warn("error pushing pod event") + f.log.WarnContext(req.Context(), "error pushing pod event", "error", err) continue } sentDebugContainers[wc.Spec.ContainerName] = struct{}{} diff --git a/lib/kube/proxy/resource_rbac_test.go b/lib/kube/proxy/resource_rbac_test.go index 9ee1f0b931824..faebea646681c 100644 --- a/lib/kube/proxy/resource_rbac_test.go +++ b/lib/kube/proxy/resource_rbac_test.go @@ -32,7 +32,6 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/uuid" "github.com/gravitational/trace" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" @@ -51,6 +50,7 @@ import ( "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/lib/kube/proxy/responsewriters" testingkubemock "github.com/gravitational/teleport/lib/kube/proxy/testing/kube_server" + "github.com/gravitational/teleport/lib/utils" ) func TestListPodRBAC(t *testing.T) { @@ -518,8 +518,6 @@ func TestListPodRBAC(t *testing.T) { func TestWatcherResponseWriter(t *testing.T) { defaultNamespace := "default" devNamespace := "dev" - log := logrus.New() - log.SetLevel(logrus.DebugLevel) t.Parallel() statusErr := &metav1.Status{ TypeMeta: metav1.TypeMeta{ @@ -633,7 +631,7 @@ func TestWatcherResponseWriter(t *testing.T) { t.Run(tt.name, func(t *testing.T) { userReader, userWriter := io.Pipe() negotiator := newClientNegotiator(&globalKubeCodecs) - filterWrapper := newResourceFilterer(types.KindKubePod, types.KubeVerbWatch, &globalKubeCodecs, tt.args.allowed, tt.args.denied, log) + filterWrapper := newResourceFilterer(types.KindKubePod, types.KubeVerbWatch, &globalKubeCodecs, tt.args.allowed, tt.args.denied, utils.NewSlogLoggerForTests()) // watcher parses the data written into itself and if the user is allowed to // receive the update, it writes the event into target. watcher, err := responsewriters.NewWatcherResponseWriter(newFakeResponseWriter(userWriter) /*target*/, negotiator, filterWrapper) diff --git a/lib/kube/proxy/response_rewriter.go b/lib/kube/proxy/response_rewriter.go index 7fccfe0eb5132..1060762c16b87 100644 --- a/lib/kube/proxy/response_rewriter.go +++ b/lib/kube/proxy/response_rewriter.go @@ -87,7 +87,7 @@ func (f *Forwarder) rewriteResponseForbidden(s *clusterSession) func(r *http.Res newClientNegotiator(&globalKubeCodecs), ) if err != nil { - f.log.WithError(err).Error("Failed to create encoder") + f.log.ErrorContext(r.Request.Context(), "Failed to create encoder", "error", err) return nil } @@ -107,7 +107,7 @@ func (f *Forwarder) rewriteResponseForbidden(s *clusterSession) func(r *http.Res // Encode the new response. 
if err = encoder.Encode(status, b); err != nil { - f.log.WithError(err).Error("Failed to encode response") + f.log.ErrorContext(r.Request.Context(), "Failed to encode response", "error", err) return trace.Wrap(err) } diff --git a/lib/kube/proxy/roundtrip.go b/lib/kube/proxy/roundtrip.go index 3630f3e898dd7..7fb1bf9c8517a 100644 --- a/lib/kube/proxy/roundtrip.go +++ b/lib/kube/proxy/roundtrip.go @@ -24,6 +24,7 @@ import ( "errors" "fmt" "io" + "log/slog" "net" "net/http" "net/url" @@ -31,7 +32,6 @@ import ( "time" "github.com/gravitational/trace" - log "github.com/sirupsen/logrus" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -90,7 +90,7 @@ type roundTripperConfig struct { // headers instead of relying on the certificate to transport it. useIdentityForwarding bool // log specifies the logger. - log log.FieldLogger + log *slog.Logger proxier func(*http.Request) (*url.URL, error) } diff --git a/lib/kube/proxy/scheme.go b/lib/kube/proxy/scheme.go index 85d80739ada50..0d88a0fdaef9c 100644 --- a/lib/kube/proxy/scheme.go +++ b/lib/kube/proxy/scheme.go @@ -19,11 +19,12 @@ package proxy import ( + "context" "errors" + "log/slog" "strings" "github.com/gravitational/trace" - "github.com/sirupsen/logrus" "golang.org/x/exp/maps" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -113,7 +114,7 @@ type gvkSupportedResources map[gvkSupportedResourcesKey]*schema.GroupVersionKind // This schema includes all well-known Kubernetes types and all namespaced // custom resources. // It also returns a map of resources that we support RBAC restrictions for. -func newClusterSchemaBuilder(log logrus.FieldLogger, client kubernetes.Interface) (*serializer.CodecFactory, rbacSupportedResources, gvkSupportedResources, error) { +func newClusterSchemaBuilder(log *slog.Logger, client kubernetes.Interface) (*serializer.CodecFactory, rbacSupportedResources, gvkSupportedResources, error) { kubeScheme := runtime.NewScheme() kubeCodecs := serializer.NewCodecFactory(kubeScheme) supportedResources := maps.Clone(defaultRBACResources) @@ -135,7 +136,10 @@ func newClusterSchemaBuilder(log logrus.FieldLogger, client kubernetes.Interface // reachable. // In this case, we still want to register the other resources that are // available in the cluster. - log.WithError(err).Debugf("Failed to discover some API groups: %v", maps.Keys(discoveryErr.Groups)) + log.DebugContext(context.Background(), "Failed to discover some API groups", + "groups", maps.Keys(discoveryErr.Groups), + "error", err, + ) case err != nil: return nil, nil, nil, trace.Wrap(err) } diff --git a/lib/kube/proxy/scheme_test.go b/lib/kube/proxy/scheme_test.go index d13dbbc94df8f..ae7075db251f4 100644 --- a/lib/kube/proxy/scheme_test.go +++ b/lib/kube/proxy/scheme_test.go @@ -21,17 +21,18 @@ package proxy import ( "testing" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/discovery" "k8s.io/client-go/kubernetes" + + "github.com/gravitational/teleport/lib/utils" ) // TestNewClusterSchemaBuilder tests that newClusterSchemaBuilder doesn't panic // when it's given types already registered in the global scheme. 
func Test_newClusterSchemaBuilder(t *testing.T) { - _, _, _, err := newClusterSchemaBuilder(logrus.StandardLogger(), &clientSet{}) + _, _, _, err := newClusterSchemaBuilder(utils.NewSlogLoggerForTests(), &clientSet{}) require.NoError(t, err) } diff --git a/lib/kube/proxy/self_subject_reviews.go b/lib/kube/proxy/self_subject_reviews.go index 2130cfdaed034..25fb264a0db38 100644 --- a/lib/kube/proxy/self_subject_reviews.go +++ b/lib/kube/proxy/self_subject_reviews.go @@ -63,7 +63,7 @@ func (f *Forwarder) selfSubjectAccessReviews(authCtx *authContext, w http.Respon if err != nil { // This error goes to kubernetes client and is not visible in the logs // of the teleport server if not logged here. - f.log.Errorf("Failed to create cluster session: %v.", err) + f.log.ErrorContext(req.Context(), "Failed to create cluster session", "error", err) return nil, trace.Wrap(err) } // sess.Close cancels the connection monitor context to release it sooner. @@ -91,7 +91,7 @@ func (f *Forwarder) selfSubjectAccessReviews(authCtx *authContext, w http.Respon if err := f.setupForwardingHeaders(sess, req, true /* withImpersonationHeaders */); err != nil { // This error goes to kubernetes client and is not visible in the logs // of the teleport server if not logged here. - f.log.Errorf("Failed to set up forwarding headers: %v.", err) + f.log.ErrorContext(req.Context(), "Failed to set up forwarding headers", "error", err) return nil, trace.Wrap(err) } rw := httplib.NewResponseStatusRecorder(w) diff --git a/lib/kube/proxy/server.go b/lib/kube/proxy/server.go index 6f05f2a13a22f..6ac466746b51f 100644 --- a/lib/kube/proxy/server.go +++ b/lib/kube/proxy/server.go @@ -29,7 +29,6 @@ import ( "time" "github.com/gravitational/trace" - "github.com/sirupsen/logrus" "golang.org/x/net/http2" "github.com/gravitational/teleport" @@ -68,7 +67,7 @@ type TLSServerConfig struct { // ConnectedProxyGetter gets the proxies teleport is connected to. ConnectedProxyGetter *reversetunnel.ConnectedProxyGetter // Log is the logger. - Log logrus.FieldLogger + Log *slog.Logger // Selectors is a list of resource monitor selectors. ResourceMatchers []services.ResourceMatcher // OnReconcile is called after each kube_cluster resource reconciliation. @@ -134,7 +133,7 @@ func (c *TLSServerConfig) CheckAndSetDefaults() error { } if c.Log == nil { - c.Log = logrus.New() + c.Log = slog.Default() } if c.CloudClients == nil { cloudClients, err := cloud.NewClients() @@ -180,7 +179,7 @@ type TLSServer struct { monitoredKubeClusters monitoredKubeClusters // reconcileCh triggers reconciliation of proxied kube_clusters. 
reconcileCh chan struct{} - log *logrus.Entry + log *slog.Logger } // NewTLSServer returns new unstarted TLS server @@ -188,9 +187,7 @@ func NewTLSServer(cfg TLSServerConfig) (*TLSServer, error) { if err := cfg.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } - log := cfg.Log.WithFields(logrus.Fields{ - teleport.ComponentKey: cfg.Component, - }) + log := cfg.Log.With(teleport.ComponentKey, cfg.Component) // limiter limits requests by frequency and amount of simultaneous // connections per client limiter, err := limiter.NewLimiter(cfg.LimiterConfig) @@ -422,8 +419,7 @@ func (t *TLSServer) close(ctx context.Context) error { // and server's GetConfigForClient reloads the list of trusted // local and remote certificate authorities func (t *TLSServer) GetConfigForClient(info *tls.ClientHelloInfo) (*tls.Config, error) { - // TODO(tross): remove slog.Default once the TLSServer is updated to use a slog.Logger - return authclient.WithClusterCAs(t.TLS, t.AccessPoint, t.ClusterName, slog.Default())(info) + return authclient.WithClusterCAs(t.TLS, t.AccessPoint, t.ClusterName, t.log)(info) } // GetServerInfo returns a services.Server object for heartbeats (aka @@ -523,7 +519,7 @@ func (t *TLSServer) startHeartbeat(name string) error { func (t *TLSServer) getRotationState() types.Rotation { rotation, err := t.TLSServerConfig.GetRotation(types.RoleKube) if err != nil && !trace.IsNotFound(err) { - t.log.WithError(err).Warn("Failed to get rotation state.") + t.log.WarnContext(t.closeContext, "Failed to get rotation state", "error", err) } if rotation != nil { return *rotation @@ -539,14 +535,14 @@ func (t *TLSServer) startStaticClustersHeartbeat() error { // proxy_service will pretend to also be kube_server. if t.KubeServiceType == KubeService || t.KubeServiceType == LegacyProxyService { - t.log.Debugf("Starting kubernetes_service heartbeats for %q", t.Component) + t.log.DebugContext(t.closeContext, "Starting kubernetes_service heartbeats") for _, cluster := range t.fwd.kubeClusters() { if err := t.startHeartbeat(cluster.GetName()); err != nil { return trace.Wrap(err) } } } else { - t.log.Debug("No local kube credentials on proxy, will not start kubernetes_service heartbeats") + t.log.DebugContext(t.closeContext, "No local kube credentials on proxy, will not start kubernetes_service heartbeats") } return nil diff --git a/lib/kube/proxy/server_test.go b/lib/kube/proxy/server_test.go index 091f33599c96f..a5a28823080ed 100644 --- a/lib/kube/proxy/server_test.go +++ b/lib/kube/proxy/server_test.go @@ -35,7 +35,6 @@ import ( "time" "github.com/jonboulle/clockwork" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "github.com/gravitational/teleport/api/client/proto" @@ -45,6 +44,7 @@ import ( testingkubemock "github.com/gravitational/teleport/lib/kube/proxy/testing/kube_server" "github.com/gravitational/teleport/lib/reversetunnel" "github.com/gravitational/teleport/lib/tlsca" + "github.com/gravitational/teleport/lib/utils" ) func TestServeConfigureError(t *testing.T) { @@ -111,10 +111,9 @@ func TestMTLSClientCAs(t *testing.T) { } hostCert := genCert(t, "localhost", "localhost", "127.0.0.1", "::1") userCert := genCert(t, "user") - log := logrus.New() srv := &TLSServer{ TLSServerConfig: TLSServerConfig{ - Log: log, + Log: utils.NewSlogLoggerForTests(), ForwarderConfig: ForwarderConfig{ ClusterName: mainClusterName, }, @@ -125,7 +124,7 @@ func TestMTLSClientCAs(t *testing.T) { }, GetRotation: func(role types.SystemRole) (*types.Rotation, error) { return &types.Rotation{}, nil }, 
}, - log: logrus.NewEntry(log), + log: utils.NewSlogLoggerForTests(), } lis, err := net.Listen("tcp", "localhost:0") @@ -207,7 +206,7 @@ func TestGetServerInfo(t *testing.T) { srv := &TLSServer{ TLSServerConfig: TLSServerConfig{ - Log: logrus.New(), + Log: utils.NewSlogLoggerForTests(), ForwarderConfig: ForwarderConfig{ Clock: clockwork.NewFakeClock(), ClusterName: "kube-cluster", diff --git a/lib/kube/proxy/sess.go b/lib/kube/proxy/sess.go index bbf2f308a1a25..f017b59dbd851 100644 --- a/lib/kube/proxy/sess.go +++ b/lib/kube/proxy/sess.go @@ -22,6 +22,7 @@ import ( "context" "fmt" "io" + "log/slog" "net/http" "path" "reflect" @@ -33,7 +34,6 @@ import ( "github.com/google/uuid" "github.com/gravitational/trace" "github.com/julienschmidt/httprouter" - log "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -55,6 +55,7 @@ import ( tsession "github.com/gravitational/teleport/lib/session" "github.com/gravitational/teleport/lib/srv" "github.com/gravitational/teleport/lib/utils" + logutils "github.com/gravitational/teleport/lib/utils/log" ) const sessionRecorderID = "session-recorder" @@ -378,7 +379,7 @@ type session struct { // This is used for audit trails. partiesHistorical map[uuid.UUID]*party - log *log.Entry + log *slog.Logger io *srv.TermManager @@ -440,8 +441,8 @@ type session struct { // newSession creates a new session in pending mode. func newSession(ctx authContext, forwarder *Forwarder, req *http.Request, params httprouter.Params, initiator *party, sess *clusterSession) (*session, error) { id := uuid.New() - log := forwarder.log.WithField("session", id.String()) - log.Debug("Creating session") + log := forwarder.log.With("session", id.String()) + log.DebugContext(req.Context(), "Creating session") var policySets []*types.SessionTrackerPolicySet roles := ctx.Checker.Roles() @@ -502,7 +503,7 @@ func newSession(ctx authContext, forwarder *Forwarder, req *http.Request, params if _, open := <-s.io.TerminateNotifier(); open { err := s.Close() if err != nil { - s.log.Errorf("Failed to close session: %v.", err) + s.log.ErrorContext(req.Context(), "Failed to close session", "error", err) } } }() @@ -518,24 +519,33 @@ func newSession(ctx authContext, forwarder *Forwarder, req *http.Request, params // It is used to properly handle client disconnections. func (s *session) disconnectPartyOnErr(idString string, err error) { if idString == sessionRecorderID { - s.log.Error("Failed to write to session recorder, closing session.") + s.log.ErrorContext(s.sess.connCtx, "Failed to write to session recorder, closing session") s.Close() return } id, uuidParseErr := uuid.Parse(idString) if uuidParseErr != nil { - s.log.WithError(uuidParseErr).Errorf("Unable to decode %q into a UUID.", idString) + s.log.ErrorContext(s.sess.connCtx, "Unable to decode party id", + "party_id", idString, + "error", uuidParseErr, + ) return } wasActive, leaveErr := s.leave(id) if leaveErr != nil { - s.log.WithError(leaveErr).Errorf("Failed to disconnect party %v from the session.", idString) + s.log.ErrorContext(s.sess.connCtx, "Failed to disconnect party from the session", + "party_id", idString, + "error", leaveErr, + ) } if wasActive { // log the error only if it was the reason for the user disconnection. - s.log.Errorf("Encountered error: %v with party %v. 
Disconnecting them from the session.", err, idString) + s.log.ErrorContext(s.sess.connCtx, "Encountered error with party, disconnecting them from the session", + "error", err, + "party_id", idString, + ) } } @@ -551,11 +561,14 @@ func (s *session) checkPresence() error { } if participant.Mode == string(types.SessionModeratorMode) && time.Now().UTC().After(participant.LastActive.Add(PresenceMaxDifference)) { - s.log.Debugf("Participant %v is not active, kicking.", participant.ID) + s.log.DebugContext(s.sess.connCtx, "Participant is not active, kicking", "participant_id", participant.ID) id, _ := uuid.Parse(participant.ID) _, err := s.unlockedLeave(id) if err != nil { - s.log.WithError(err).Warnf("Failed to kick participant %v for inactivity.", participant.ID) + s.log.WarnContext(s.sess.connCtx, "Failed to kick participant for inactivity", + "participant_id", participant.ID, + "error", err, + ) } } } @@ -569,11 +582,14 @@ func (s *session) launch(ephemeralContainerStatus *corev1.ContainerStatus) (retu defer func() { err := s.Close() if err != nil { - s.log.WithError(err).Errorf("Failed to close session: %v", s.id) + s.log.ErrorContext(s.req.Context(), "Failed to close session", + "session_id", s.id, + "error", err, + ) } }() - s.log.Debugf("Launching session: %v", s.id) + s.log.DebugContext(s.req.Context(), "Launching session", "session_id", s.id) q := s.req.URL.Query() namespace := s.params.ByName("podNamespace") @@ -603,7 +619,7 @@ func (s *session) launch(ephemeralContainerStatus *corev1.ContainerStatus) (retu if returnErr != nil { s.setTerminationErr(returnErr) s.reportErrorToSessionRecorder(returnErr) - s.log.WithError(returnErr).Warning("Executor failed while streaming.") + s.log.WarnContext(s.req.Context(), "Executor failed while streaming", "error", returnErr) } // call onFinished to emit the session.end and exec events. // onFinished is never nil. 
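Note: the migration idiom throughout sess.go is `With` for per-session attributes plus the `*Context` call variants; slog passes the context through to the handler, so a context-aware handler can attach request or trace metadata to each record. A minimal, self-contained illustration of the idiom (plain log/slog, not Teleport code):

```go
package main

import (
	"context"
	"log/slog"
	"os"
)

func main() {
	base := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))
	// Equivalent of forwarder.log.With("session", id.String()): the attribute
	// is attached once and repeated on every record the derived logger emits.
	log := base.With("session", "2f2e7a9c")
	log.DebugContext(context.Background(), "Creating session")
	// Errors travel as key/value pairs instead of logrus WithError wrappers.
	log.WarnContext(context.Background(), "Failed to close session", "error", os.ErrClosed)
}
```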
@@ -640,13 +656,13 @@ func (s *session) launch(ephemeralContainerStatus *corev1.ContainerStatus) (retu }) if err == nil { if err := s.recorder.RecordEvent(s.forwarder.ctx, sessionStartEvent); err != nil { - s.forwarder.log.WithError(err).Warn("Failed to record session start event.") + s.forwarder.log.WarnContext(s.forwarder.ctx, "Failed to record session start event", "error", err) } if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionStartEvent.GetAuditEvent()); err != nil { - s.forwarder.log.WithError(err).Warn("Failed to emit session start event.") + s.forwarder.log.WarnContext(s.forwarder.ctx, "Failed to emit session start event", "error", err) } } else { - s.forwarder.log.WithError(err).Warn("Failed to set up session start event - event will not be recorded") + s.forwarder.log.WarnContext(s.forwarder.ctx, "Failed to set up session start event - event will not be recorded", "error", err) } s.weakEventsWaiter.Add(1) @@ -660,21 +676,21 @@ func (s *session) launch(ephemeralContainerStatus *corev1.ContainerStatus) (retu s.BroadcastMessage("Session expired, closing...") err := s.Close() if err != nil { - s.log.WithError(err).Error("Failed to close session") + s.log.ErrorContext(s.forwarder.ctx, "Failed to close session", "error", err) } case <-s.closeC: } }() if err = s.tracker.UpdateState(s.forwarder.ctx, types.SessionState_SessionStateRunning); err != nil { - s.log.WithError(err).Warn("Failed to set tracker state to running") + s.log.WarnContext(s.forwarder.ctx, "Failed to set tracker state to running", "error", err) } var executor remotecommand.Executor executor, err = s.forwarder.getExecutor(s.sess, s.req) if err != nil { - s.log.WithError(err).Warning("Failed creating executor.") + s.log.WarnContext(s.forwarder.ctx, "Failed creating executor", "error", err) return trace.Wrap(err) } @@ -759,7 +775,10 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, eventPodMeta } err := p.Client.resize(termSize.size) if err != nil { - s.log.WithError(err).Errorf("Failed to resize client: %v", id.String()) + s.log.ErrorContext(s.forwarder.ctx, "Failed to resize participant", + "party_id", id.String(), + "error", err, + ) } } @@ -789,10 +808,10 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, eventPodMeta // Report the updated window size to the event log (this is so the sessions // can be replayed correctly). 
if err := s.recorder.RecordEvent(s.forwarder.ctx, resizeEvent); err != nil { - s.forwarder.log.WithError(err).Warn("Failed to emit terminal resize event.") + s.forwarder.log.WarnContext(s.forwarder.ctx, "Failed to emit terminal resize event", "error", err) } } else { - s.forwarder.log.WithError(err).Warn("Failed to set up terminal resize event - event will not be recorded") + s.forwarder.log.WarnContext(s.forwarder.ctx, "Failed to set up terminal resize event - event will not be recorded", "error", err) } } } else { @@ -844,7 +863,7 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, eventPodMeta } if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, execEvent); err != nil { - s.forwarder.log.WithError(err).Warn("Failed to emit exec event.") + s.forwarder.log.WarnContext(s.forwarder.ctx, "Failed to emit exec event", "error", err) } sessionDataEvent := &apievents.SessionData{ @@ -864,7 +883,7 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, eventPodMeta } if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionDataEvent); err != nil { - s.forwarder.log.WithError(err).Warn("Failed to emit session data event.") + s.forwarder.log.WarnContext(s.forwarder.ctx, "Failed to emit session data event", "error", err) } sessionEndEvent, err := s.recorder.PrepareSessionEvent(&apievents.SessionEnd{ @@ -888,13 +907,13 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, eventPodMeta }) if err == nil { if err := s.recorder.RecordEvent(s.forwarder.ctx, sessionEndEvent); err != nil { - s.forwarder.log.WithError(err).Warn("Failed to record session end event.") + s.forwarder.log.WarnContext(s.forwarder.ctx, "Failed to record session end event", "error", err) } if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionEndEvent.GetAuditEvent()); err != nil { - s.forwarder.log.WithError(err).Warn("Failed to emit session end event.") + s.forwarder.log.WarnContext(s.forwarder.ctx, "Failed to emit session end event", "error", err) } } else { - s.forwarder.log.WithError(err).Warn("Failed to set up session end event - event will not be recorded") + s.forwarder.log.WarnContext(s.forwarder.ctx, "Failed to set up session end event - event will not be recorded", "error", err) } } @@ -934,9 +953,9 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, eventPodMeta case <-ticker.C: err := s.checkPresence() if err != nil { - s.log.WithError(err).Error("Failed to check presence, closing session as a security measure") + s.log.ErrorContext(s.forwarder.ctx, "Failed to check presence, closing session as a security measure", "error", err) if err := s.Close(); err != nil { - s.log.WithError(err).Error("Failed to close session") + s.log.ErrorContext(s.forwarder.ctx, "Failed to close session", "error", err) } return } @@ -969,7 +988,7 @@ func (s *session) join(p *party, emitJoinEvent bool) error { return trace.AccessDenied("The requested session is not active") } - s.log.Debugf("Tracking participant: %s", p.ID) + s.log.DebugContext(s.forwarder.ctx, "Tracking participant", "participant_id", p.ID) participant := &types.Participant{ ID: p.ID.String(), User: p.Ctx.User.GetName(), @@ -989,7 +1008,7 @@ func (s *session) join(p *party, emitJoinEvent bool) error { recentWrites := s.io.GetRecentHistory() if _, err := p.Client.stdoutStream().Write(recentWrites); err != nil { - s.log.Warnf("Failed to write history to client: %v.", err) + s.log.WarnContext(s.forwarder.ctx, "Failed to write history to participant", "error", err) } s.BroadcastMessage("User %v joined the 
session with participant mode: %v.", p.Ctx.User.GetName(), p.Mode) @@ -1009,7 +1028,10 @@ func (s *session) join(p *party, emitJoinEvent bool) error { // other parties' terminals and no discrepancies are present. if lastQueueSize := s.terminalSizeQueue.getLastSize(); lastQueueSize != nil { if err := p.Client.resize(lastQueueSize); err != nil { - s.log.WithError(err).Errorf("Failed to resize client: %v", stringID) + s.log.ErrorContext(s.forwarder.ctx, "Failed to resize participant", + "participant_id", stringID, + "error", err, + ) } } @@ -1022,7 +1044,10 @@ func (s *session) join(p *party, emitJoinEvent bool) error { if p.Ctx.User.GetName() != s.ctx.User.GetName() { err := srv.MsgParticipantCtrls(p.Client.stdoutStream(), p.Mode) if err != nil { - s.log.Errorf("Could not send intro message to participant: %v", err) + s.log.ErrorContext(s.forwarder.ctx, "Could not send intro message to participant", + "error", err, + "participant_id", stringID, + ) } } @@ -1036,10 +1061,10 @@ func (s *session) join(p *party, emitJoinEvent bool) error { case <-c: s.setTerminationErr(sessionTerminatedByModeratorErr) go func() { - s.log.Debugf("Received force termination request") + s.log.DebugContext(s.forwarder.ctx, "Received force termination request") err := s.Close() if err != nil { - s.log.Errorf("Failed to close session: %v.", err) + s.log.ErrorContext(s.forwarder.ctx, "Failed to close session", "error", err) } }() case <-s.closeC: @@ -1064,11 +1089,11 @@ func (s *session) join(p *party, emitJoinEvent bool) error { // we must inform all parties that the session is closing. s.setTerminationErrUnlocked(err) s.reportErrorToSessionRecorder(err) - s.log.WithError(err).Warning("Executor failed while creating ephemeral pod.") + s.log.WarnContext(s.forwarder.ctx, "Executor failed while creating ephemeral pod", "error", err) go func() { err := s.Close() if err != nil { - s.log.WithError(err).Error("Failed to close session") + s.log.ErrorContext(s.forwarder.ctx, "Failed to close session", "error", err) } }() return trace.Wrap(err) @@ -1076,7 +1101,7 @@ func (s *session) join(p *party, emitJoinEvent bool) error { go func() { if err := s.launch(startedEphemeralCont); err != nil { - s.log.WithError(err).Warning("Failed to launch Kubernetes session.") + s.log.WarnContext(s.forwarder.ctx, "Failed to launch Kubernetes session", "error", err) } }() } else if len(s.parties) == 1 { @@ -1098,7 +1123,7 @@ func (s *session) join(p *party, emitJoinEvent bool) error { // types.SessionState_SessionStatePending marks a session that is waiting for // a moderator to rejoin. 
if err := s.tracker.UpdateState(s.forwarder.ctx, types.SessionState_SessionStateRunning); err != nil { - s.log.Warnf("Failed to set tracker state to %v", types.SessionState_SessionStateRunning) + s.log.WarnContext(s.forwarder.ctx, "Failed to update tracker to running state") } } return nil @@ -1141,7 +1166,7 @@ func (s *session) createEphemeralContainer() (*corev1.ContainerStatus, error) { return nil, trace.Wrap(err) } - s.log.Debugf("Creating ephemeral container %s on pod %s", container, podName) + s.log.DebugContext(s.forwarder.ctx, "Creating ephemeral container on pod", "container", container, "pod", podName) containerStatus, err := s.patchAndWaitForPodEphemeralContainer(s.forwarder.ctx, &initUser.Ctx, s.req.Header, waitingCont) return containerStatus, trace.Wrap(err) } @@ -1179,7 +1204,7 @@ func (s *session) emitSessionJoinEvent(p *party) { } if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionJoinEvent); err != nil { - s.forwarder.log.WithError(err).Warn("Failed to emit event.") + s.forwarder.log.WarnContext(s.forwarder.ctx, "Failed to emit event", "error", err) } } @@ -1228,10 +1253,10 @@ func (s *session) unlockedLeave(id uuid.UUID) (bool, error) { } if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionLeaveEvent); err != nil { - s.forwarder.log.WithError(err).Warn("Failed to emit event.") + s.forwarder.log.WarnContext(s.forwarder.ctx, "Failed to emit event", "error", err) } - s.log.Debugf("No longer tracking participant: %v", party.ID) + s.log.DebugContext(s.forwarder.ctx, "No longer tracking participant", "participant_id", party.ID) err := s.tracker.RemoveParticipant(s.forwarder.ctx, party.ID.String()) if err != nil { errs = append(errs, trace.Wrap(err)) @@ -1246,7 +1271,7 @@ func (s *session) unlockedLeave(id uuid.UUID) (bool, error) { // close session err := s.Close() if err != nil { - s.log.WithError(err).Errorf("Failed to close session") + s.log.ErrorContext(s.forwarder.ctx, "Failed to close session", "error", err) } }() return true, trace.NewAggregate(errs...) @@ -1267,7 +1292,7 @@ func (s *session) unlockedLeave(id uuid.UUID) (bool, error) { if options.OnLeaveAction == types.OnSessionLeaveTerminate { go func() { if err := s.Close(); err != nil { - s.log.WithError(err).Errorf("Failed to close session") + s.log.ErrorContext(s.forwarder.ctx, "Failed to close session", "error", err) } }() return true, nil @@ -1277,7 +1302,7 @@ func (s *session) unlockedLeave(id uuid.UUID) (bool, error) { s.io.Off() s.BroadcastMessage("Session paused, Waiting for required participants...") if err := s.tracker.UpdateState(s.forwarder.ctx, types.SessionState_SessionStatePending); err != nil { - s.log.Warnf("Failed to set tracker state to %v", types.SessionState_SessionStatePending) + s.log.WarnContext(s.forwarder.ctx, "Failed to set tracker state to pending") } go func() { @@ -1335,7 +1360,7 @@ func (s *session) Close() error { // Once tracker is closed parties cannot join the session. // check session.join for logic. 
if err := s.tracker.Close(s.forwarder.ctx); err != nil { - s.log.WithError(err).Debug("Failed to close session tracker") + s.log.DebugContext(s.forwarder.ctx, "Failed to close session tracker", "error", err) } s.mu.Lock() terminationErr := s.terminationErr @@ -1345,7 +1370,7 @@ func (s *session) Close() error { } recorder := s.recorder s.mu.Unlock() - s.log.Debugf("Closing session %v.", s.id.String()) + s.log.DebugContext(s.forwarder.ctx, "Closing session", "session_id", logutils.StringerAttr(s.id)) close(s.closeC) // Wait until every party leaves the session and emits the session leave // event before closing the recorder - if available. @@ -1405,18 +1430,18 @@ func (s *session) trackSession(p *party, policySet []*types.SessionTrackerPolicy InitialCommand: command, } - s.log.Debug("Creating session tracker") + s.log.DebugContext(ctx, "Creating session tracker") sessionTrackerService := s.forwarder.cfg.AuthClient tracker, err := srv.NewSessionTracker(ctx, trackerSpec, sessionTrackerService) switch { // there was an error creating the tracker for a moderated session - terminate the session case err != nil && s.accessEvaluator.IsModerated(): - s.log.WithError(err).Warn("Failed to create session tracker, unable to proceed for moderated session") + s.log.WarnContext(ctx, "Failed to create session tracker, unable to proceed for moderated session", "error", err) return trace.Wrap(err) // there was an error creating the tracker for a non-moderated session - permit the session with a local tracker case err != nil && !s.accessEvaluator.IsModerated(): - s.log.Warn("Failed to create session tracker, proceeding with local session tracker for non-moderated session") + s.log.WarnContext(ctx, "Failed to create session tracker, proceeding with local session tracker for non-moderated session") localTracker, err := srv.NewSessionTracker(ctx, trackerSpec, nil) // this error means there are problems with the trackerSpec, we need to return it @@ -1435,7 +1460,7 @@ func (s *session) trackSession(p *party, policySet []*types.SessionTrackerPolicy go func() { if err := s.tracker.UpdateExpirationLoop(s.forwarder.ctx, s.forwarder.cfg.Clock); err != nil { - s.log.WithError(err).Warn("Failed to update session tracker expiration") + s.log.WarnContext(ctx, "Failed to update session tracker expiration", "error", err) } }() @@ -1549,7 +1574,7 @@ func (s *session) retrieveAlreadyStoppedPodLogs(namespace, podName, container st func (s *session) retrieveEphemeralContainerCommand(ctx context.Context, username, containerName string) []string { containers, err := s.forwarder.getUserEphemeralContainersForPod(ctx, username, s.ctx.kubeClusterName, s.podNamespace, s.podName) if err != nil { - s.log.WithError(err).Warn("Failed to retrieve ephemeral containers") + s.log.WarnContext(ctx, "Failed to retrieve ephemeral containers", "error", err) return nil } if len(containers) == 0 { @@ -1569,7 +1594,7 @@ func (s *session) retrieveEphemeralContainerCommand(ctx context.Context, usernam newClientNegotiator(s.sess.codecFactory), ) if err != nil { - s.log.WithError(err).Warn("Failed to create encoder and decoder") + s.log.WarnContext(ctx, "Failed to create encoder and decoder", "error", err) return nil } pod, _, err := s.forwarder.mergeEphemeralPatchWithCurrentPod( @@ -1585,7 +1610,7 @@ func (s *session) retrieveEphemeralContainerCommand(ctx context.Context, usernam }, ) if err != nil { - s.log.WithError(err).Warn("Failed to merge ephemeral patch with current pod") + s.log.WarnContext(ctx, "Failed to merge ephemeral patch with current pod", 
"error", err) return nil } for _, ephemeral := range pod.Spec.EphemeralContainers { diff --git a/lib/kube/proxy/sess_test.go b/lib/kube/proxy/sess_test.go index 469fe2c4df27a..2c87ec555845d 100644 --- a/lib/kube/proxy/sess_test.go +++ b/lib/kube/proxy/sess_test.go @@ -34,13 +34,11 @@ import ( "github.com/google/uuid" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/remotecommand" - "github.com/gravitational/teleport" kubewaitingcontainerpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/kubewaitingcontainer/v1" "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" @@ -49,6 +47,7 @@ import ( "github.com/gravitational/teleport/lib/authz" "github.com/gravitational/teleport/lib/events" testingkubemock "github.com/gravitational/teleport/lib/kube/proxy/testing/kube_server" + "github.com/gravitational/teleport/lib/utils" ) func TestSessionEndError(t *testing.T) { @@ -284,7 +283,7 @@ func Test_session_trackSession(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { sess := &session{ - log: logrus.New().WithField(teleport.ComponentKey, "test"), + log: utils.NewSlogLoggerForTests(), id: uuid.New(), req: &http.Request{ URL: &url.URL{ diff --git a/lib/kube/proxy/streamproto/proto.go b/lib/kube/proxy/streamproto/proto.go index e2ee646dfe2b7..605f8e00748c4 100644 --- a/lib/kube/proxy/streamproto/proto.go +++ b/lib/kube/proxy/streamproto/proto.go @@ -19,16 +19,17 @@ package streamproto import ( + "context" "errors" "fmt" "io" + "log/slog" "sync" "sync/atomic" "time" "github.com/gorilla/websocket" "github.com/gravitational/trace" - log "github.com/sirupsen/logrus" "k8s.io/client-go/tools/remotecommand" "github.com/gravitational/teleport/api/types" @@ -170,7 +171,7 @@ func (s *SessionStream) readTask() { ty, data, err := s.conn.ReadMessage() if err != nil { if !errors.Is(err, io.EOF) && !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseAbnormalClosure, websocket.CloseNoStatusReceived) { - log.WithError(err).Warn("Failed to read message from websocket") + slog.WarnContext(context.Background(), "Failed to read message from websocket", "error", err) } var closeErr *websocket.CloseError @@ -293,7 +294,7 @@ func (s *SessionStream) Close() error { if atomic.CompareAndSwapInt32(&s.closed, 0, 1) { err := s.conn.WriteMessage(websocket.CloseMessage, []byte{}) if err != nil { - log.Warnf("Failed to gracefully close websocket connection: %v", err) + slog.WarnContext(context.Background(), "Failed to gracefully close websocket connection", "error", err) } t := time.NewTimer(time.Second * 5) defer t.Stop() diff --git a/lib/kube/proxy/testing/kube_server/kube_mock.go b/lib/kube/proxy/testing/kube_server/kube_mock.go index 332d7327c6d69..0bdc27eeb4d67 100644 --- a/lib/kube/proxy/testing/kube_server/kube_mock.go +++ b/lib/kube/proxy/testing/kube_server/kube_mock.go @@ -26,6 +26,7 @@ import ( "errors" "fmt" "io" + "log/slog" "net/http" "net/http/httptest" "strings" @@ -36,7 +37,6 @@ import ( gwebsocket "github.com/gorilla/websocket" "github.com/gravitational/trace" "github.com/julienschmidt/httprouter" - log "github.com/sirupsen/logrus" "golang.org/x/net/http2" v1 "k8s.io/api/authorization/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -151,7 +151,7 @@ type KubeUpgradeRequests struct { type KubeMockServer struct 
{ router *httprouter.Router - log *log.Entry + log *slog.Logger server *httptest.Server TLS *tls.Config URL string @@ -178,7 +178,7 @@ type KubeMockServer struct { func NewKubeAPIMock(opts ...Option) (*KubeMockServer, error) { s := &KubeMockServer{ router: httprouter.New(), - log: log.NewEntry(log.New()), + log: slog.Default(), deletedResources: make(map[deletedResource][]string), version: &apimachineryversion.Info{ Major: "1", @@ -273,7 +273,7 @@ func (s *KubeMockServer) writeResponseError(rw http.ResponseWriter, respErr erro status = status.DeepCopy() data, err := runtime.Encode(kubeCodecs.LegacyCodec(), status) if err != nil { - s.log.Warningf("Failed encoding error into kube Status object: %v", err) + s.log.WarnContext(context.Background(), "Failed encoding error into kube Status object", "error", err) trace.WriteError(rw, respErr) return } @@ -283,7 +283,7 @@ func (s *KubeMockServer) writeResponseError(rw http.ResponseWriter, respErr erro // embedded. rw.WriteHeader(int(status.Code)) if _, err := rw.Write(data); err != nil { - s.log.Warningf("Failed writing kube error response body: %v", err) + s.log.WarnContext(context.Background(), "Failed writing kube error response body", "error", err) } } @@ -323,13 +323,13 @@ func (s *KubeMockServer) exec(w http.ResponseWriter, req *http.Request, p httpro if request.stdout { if _, err := proxy.stdoutStream.Write([]byte(request.containerName + "\n")); err != nil { - s.log.WithError(err).Errorf("unable to send to stdout") + s.log.ErrorContext(request.context, "unable to send to stdout", "error", err) } } if request.stderr { if _, err := proxy.stderrStream.Write([]byte(request.containerName + "\n")); err != nil { - s.log.WithError(err).Errorf("unable to send to stderr") + s.log.ErrorContext(request.context, "unable to send to stderr", "error", err) } } @@ -341,7 +341,7 @@ func (s *KubeMockServer) exec(w http.ResponseWriter, req *http.Request, p httpro if errors.Is(err, io.EOF) && n == 0 { break } else if err != nil && n == 0 { - s.log.WithError(err).Errorf("unable to receive from stdin") + s.log.ErrorContext(request.context, "unable to receive from stdin", "error", err) break } @@ -359,13 +359,13 @@ func (s *KubeMockServer) exec(w http.ResponseWriter, req *http.Request, p httpro if request.stdout { if _, err := proxy.stdoutStream.Write(buffer); err != nil { - s.log.WithError(err).Errorf("unable to send to stdout") + s.log.ErrorContext(request.context, "unable to send to stdout", "error", err) } } if request.stderr { if _, err := proxy.stderrStream.Write(buffer); err != nil { - s.log.WithError(err).Errorf("unable to send to stdout") + s.log.ErrorContext(request.context, "unable to send to stderr", "error", err) } } @@ -536,10 +536,10 @@ func createSPDYStreams(req remoteCommandRequest) (*remoteCommandProxy, error) { var handler protocolHandler switch protocol { case "": - log.Warningf("Client did not request protocol negotiation.") + slog.WarnContext(req.context, "Client did not request protocol negotiation") fallthrough case StreamProtocolV4Name: - log.Infof("Negotiated protocol %v.", protocol) + slog.InfoContext(req.context, "Negotiated protocol", "protocol", protocol) handler = &v4ProtocolHandler{} default: return nil, trace.BadParameter("protocol %v is not supported.
upgrade the client", protocol) @@ -641,7 +641,7 @@ func (t *termQueue) handleResizeEvents(stream io.Reader) { size := remotecommand.TerminalSize{} if err := decoder.Decode(&size); err != nil { if !errors.Is(err, io.EOF) { - log.Warningf("Failed to decode resize event: %v", err) + slog.WarnContext(t.done, "Failed to decode resize event", "error", err) } t.cancel() return @@ -696,7 +696,7 @@ WaitForStreams: remoteProxy.resizeStream = stream go waitStreamReply(stopCtx, stream.replySent, replyChan) default: - log.Warningf("Ignoring unexpected stream type: %q", streamType) + slog.WarnContext(stopCtx, "Ignoring unexpected stream type", "stream_type", streamType) } case <-replyChan: receivedStreams++ diff --git a/lib/kube/proxy/utils_testing.go b/lib/kube/proxy/utils_testing.go index 462638df203c4..4f3f596ec0a82 100644 --- a/lib/kube/proxy/utils_testing.go +++ b/lib/kube/proxy/utils_testing.go @@ -35,7 +35,6 @@ import ( "github.com/google/uuid" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "k8s.io/client-go/kubernetes" @@ -66,6 +65,7 @@ import ( "github.com/gravitational/teleport/lib/services" sessPkg "github.com/gravitational/teleport/lib/session" "github.com/gravitational/teleport/lib/tlsca" + "github.com/gravitational/teleport/lib/utils" ) type TestContext struct { @@ -226,8 +226,6 @@ func SetupTestContext(ctx context.Context, t *testing.T, cfg TestConfig) *TestCo require.NoError(t, err) testCtx.kubeProxyListener, err = net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) - log := logrus.New() - log.SetLevel(logrus.DebugLevel) inventoryHandle := inventory.NewDownstreamHandle(client.InventoryControlStream, proto.UpstreamInventoryHello{ ServerID: testCtx.HostID, @@ -281,7 +279,7 @@ func SetupTestContext(ctx context.Context, t *testing.T, cfg TestConfig) *TestCo GetRotation: func(role types.SystemRole) (*types.Rotation, error) { return &types.Rotation{}, nil }, ResourceMatchers: cfg.ResourceMatchers, OnReconcile: cfg.OnReconcile, - Log: log, + Log: utils.NewSlogLoggerForTests(), InventoryHandle: inventoryHandle, }) require.NoError(t, err) @@ -358,7 +356,7 @@ func SetupTestContext(ctx context.Context, t *testing.T, cfg TestConfig) *TestCo LimiterConfig: limiter.Config{ MaxConnections: 1000, }, - Log: log, + Log: utils.NewSlogLoggerForTests(), InventoryHandle: inventoryHandle, GetRotation: func(role types.SystemRole) (*types.Rotation, error) { return &types.Rotation{}, nil diff --git a/lib/kube/proxy/watcher.go b/lib/kube/proxy/watcher.go index 24e52e2d9c923..56bea639d5260 100644 --- a/lib/kube/proxy/watcher.go +++ b/lib/kube/proxy/watcher.go @@ -20,7 +20,6 @@ package proxy import ( "context" - "log/slog" "sync" "time" @@ -37,7 +36,7 @@ import ( // kubernetes clusters according to the up-to-date list of kube_cluster resources. 
diff --git a/lib/kube/proxy/watcher.go b/lib/kube/proxy/watcher.go
index 24e52e2d9c923..56bea639d5260 100644
--- a/lib/kube/proxy/watcher.go
+++ b/lib/kube/proxy/watcher.go
@@ -20,7 +20,6 @@ package proxy

 import (
 	"context"
-	"log/slog"
 	"sync"
 	"time"

@@ -37,7 +36,7 @@ import (
 // kubernetes clusters according to the up-to-date list of kube_cluster resources.
 func (s *TLSServer) startReconciler(ctx context.Context) (err error) {
 	if len(s.ResourceMatchers) == 0 || s.KubeServiceType != KubeService {
-		s.log.Debug("Not initializing Kube Cluster resource watcher.")
+		s.log.DebugContext(ctx, "Not initializing Kube Cluster resource watcher")
 		return nil
 	}
 	s.reconciler, err = services.NewReconciler(services.ReconcilerConfig[types.KubeCluster]{
@@ -47,8 +46,7 @@ func (s *TLSServer) startReconciler(ctx context.Context) (err error) {
 		OnCreate: s.onCreate,
 		OnUpdate: s.onUpdate,
 		OnDelete: s.onDelete,
-		// TODO(tross): update to use the server logger once it has been converted to slog
-		Logger:   slog.With("kind", types.KindKubernetesCluster),
+		Logger:   s.log.With("kind", types.KindKubernetesCluster),
 	})
 	if err != nil {
 		return trace.Wrap(err)
@@ -71,16 +69,16 @@ func (s *TLSServer) startReconciler(ctx context.Context) (err error) {
 		select {
 		case <-reconcileTicker.C:
 			if err := s.reconciler.Reconcile(ctx); err != nil {
-				s.log.WithError(err).Error("Failed to reconcile.")
+				s.log.ErrorContext(ctx, "Failed to reconcile", "error", err)
 			}
 		case <-s.reconcileCh:
 			if err := s.reconciler.Reconcile(ctx); err != nil {
-				s.log.WithError(err).Error("Failed to reconcile.")
+				s.log.ErrorContext(ctx, "Failed to reconcile", "error", err)
 			} else if s.OnReconcile != nil {
 				s.OnReconcile(s.fwd.kubeClusters())
 			}
 		case <-ctx.Done():
-			s.log.Debug("Reconciler done.")
+			s.log.DebugContext(ctx, "Reconciler done")
 			return
 		}
 	}
@@ -92,16 +90,15 @@ func (s *TLSServer) startReconciler(ctx context.Context) (err error) {
 // registers/unregisters the proxied Kube Cluster accordingly.
 func (s *TLSServer) startKubeClusterResourceWatcher(ctx context.Context) (*services.GenericWatcher[types.KubeCluster, readonly.KubeCluster], error) {
 	if len(s.ResourceMatchers) == 0 || s.KubeServiceType != KubeService {
-		s.log.Debug("Not initializing Kube Cluster resource watcher.")
+		s.log.DebugContext(ctx, "Not initializing Kube Cluster resource watcher")
 		return nil, nil
 	}
-	s.log.Debug("Initializing Kube Cluster resource watcher.")
+	s.log.DebugContext(ctx, "Initializing Kube Cluster resource watcher")
 	watcher, err := services.NewKubeClusterWatcher(ctx, services.KubeClusterWatcherConfig{
 		ResourceWatcherConfig: services.ResourceWatcherConfig{
 			Component: s.Component,
-			// TODO(tross): update this once converted to use slog
-			// Logger: s.log,
-			Client: s.AccessPoint,
+			Logger:    s.log,
+			Client:    s.AccessPoint,
 		},
 		KubernetesClusterGetter: s.AccessPoint,
 	})
@@ -120,7 +117,7 @@ func (s *TLSServer) startKubeClusterResourceWatcher(ctx context.Context) (*servi
 				return
 			}
 		case <-ctx.Done():
-			s.log.Debug("Kube Cluster resource watcher done.")
+			s.log.DebugContext(ctx, "Kube Cluster resource watcher done")
 			return
 		}
 	}
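Dropping the two TODOs works because slog loggers compose: s.log.With(...) returns a child logger that keeps the parent's attributes and shares its handler, so the reconciler and watcher inherit the server's scoped logging without touching a global. A small sketch of that composition; the component and attribute values are made up for the example:

package main

import (
	"context"
	"log/slog"
)

func main() {
	// Parent carries the component attribute; the child adds its own scope
	// without mutating the parent. Both write through the same handler.
	parent := slog.Default().With("component", "kube_service") // illustrative value
	child := parent.With("kind", "kube_cluster")               // mirrors s.log.With("kind", ...)

	parent.InfoContext(context.Background(), "server started")
	child.InfoContext(context.Background(), "watcher initialized") // emits both component and kind
}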
diff --git a/lib/service/kubernetes.go b/lib/service/kubernetes.go
index c642da43f4669..f25b46e42da4c 100644
--- a/lib/service/kubernetes.go
+++ b/lib/service/kubernetes.go
@@ -240,7 +240,7 @@ func (process *TeleportProcess) initKubernetesService(logger *slog.Logger, conn
 		StaticLabels:      cfg.Kube.StaticLabels,
 		DynamicLabels:     dynLabels,
 		CloudLabels:       process.cloudLabels,
-		Log:               process.log.WithField(teleport.ComponentKey, teleport.Component(teleport.ComponentKube, process.id)),
+		Log:               process.logger.With(teleport.ComponentKey, teleport.Component(teleport.ComponentKube, process.id)),
 		PROXYProtocolMode: multiplexer.PROXYProtocolOff, // Kube service doesn't need to process unsigned PROXY headers.
 		InventoryHandle:   process.inventoryHandle,
 	})
diff --git a/lib/service/service.go b/lib/service/service.go
index ebdb99b6a282b..e2155affc208c 100644
--- a/lib/service/service.go
+++ b/lib/service/service.go
@@ -5119,7 +5119,7 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error {
 		AccessPoint:              accessPoint,
 		GetRotation:              process.GetRotation,
 		OnHeartbeat:              process.OnHeartbeat(component),
-		Log:                      process.log.WithField(teleport.ComponentKey, teleport.Component(teleport.ComponentReverseTunnelServer, process.id)),
+		Log:                      process.logger.With(teleport.ComponentKey, teleport.Component(teleport.ComponentReverseTunnelServer, process.id)),
 		IngressReporter:          ingressReporter,
 		KubernetesServersWatcher: kubeServerWatcher,
 		PROXYProtocolMode:        cfg.Proxy.PROXYProtocolMode,
@@ -6748,7 +6748,7 @@ func (process *TeleportProcess) initSecureGRPCServer(cfg initSecureGRPCServerCfg
 	kubeServer, err := kubegrpc.New(kubegrpc.Config{
 		AccessPoint:   cfg.accessPoint,
 		Authz:         authorizer,
-		Log:           process.log,
+		Log:           process.logger,
 		Emitter:       cfg.emitter,
 		KubeProxyAddr: cfg.kubeProxyAddr.String(),
 		ClusterName:   clusterName,
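One caveat when reviewing the remaining attribute lists: slog's variadic key/value arguments are not checked at compile time, so a key without a value is emitted as a !BADKEY attribute at runtime rather than rejected by the build. A short demonstration; the message and values are placeholders:

package main

import (
	"context"
	"log/slog"
)

func main() {
	log := slog.Default()

	// Well-formed: alternating key/value pairs.
	log.WarnContext(context.Background(), "failed to reconcile", "error", "timeout")

	// Malformed but compiles: "orphan" has no value, so the handler renders
	// it as !BADKEY=orphan. Worth double-checking pairs during review.
	log.WarnContext(context.Background(), "failed to reconcile", "orphan")
}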