diff --git a/integration/kube_integration_test.go b/integration/kube_integration_test.go index f5cdc4a13958b..ad7d745d89452 100644 --- a/integration/kube_integration_test.go +++ b/integration/kube_integration_test.go @@ -791,7 +791,7 @@ func testKubeTrustedClustersClientCert(t *testing.T, suite *KubeSuite) { loop: for { select { - case event := <-main.UploadEventsC: + case event := <-aux.UploadEventsC: sessionID = event.SessionID break loop case <-timeoutC: @@ -800,7 +800,7 @@ loop: } // read back the entire session and verify that it matches the stated output - capturedStream, _ := streamSession(ctx, t, main.Process.GetAuthServer(), sessionID) + capturedStream, _ := streamSession(ctx, t, aux.Process.GetAuthServer(), sessionID) require.Equal(t, sessionStream, capturedStream) // impersonating kube exec should be denied @@ -1614,7 +1614,6 @@ func waitForContainer(ctx context.Context, podClient corev1client.PodInterface, } s := getContainerStatusByName(p, containerName) - fmt.Println("test", s) if s == nil { return false, nil } diff --git a/lib/kube/proxy/ephemeral_containers.go b/lib/kube/proxy/ephemeral_containers.go index e39cb7dbca3a8..1c9ae08e417a4 100644 --- a/lib/kube/proxy/ephemeral_containers.go +++ b/lib/kube/proxy/ephemeral_containers.go @@ -104,7 +104,7 @@ func (f *Forwarder) ephemeralContainers(authCtx *authContext, w http.ResponseWri f.log.Errorf("Failed to set up forwarding headers: %v.", err) return nil, trace.Wrap(err) } - if !f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) { + if !sess.isLocalKubernetesCluster { sess.forwarder.ServeHTTP(w, req) return nil, nil } diff --git a/lib/kube/proxy/forwarder.go b/lib/kube/proxy/forwarder.go index 729ab913139ff..ca03e46b557ab 100644 --- a/lib/kube/proxy/forwarder.go +++ b/lib/kube/proxy/forwarder.go @@ -452,6 +452,10 @@ type authContext struct { kubeServers []types.KubeServer // apiResource holds the information about the requested API resource. apiResource apiResource + // isLocalKubernetesCluster is true if the target cluster is served by this teleport service. + // It is false if the target cluster is served by another teleport service or a different + // Teleport cluster. + isLocalKubernetesCluster bool } func (c authContext) String() string { @@ -789,7 +793,8 @@ func (f *Forwarder) setupContext( return nil, trace.NotFound("Kubernetes cluster %q not found", kubeCluster) } } - if f.isLocalKubeCluster(isRemoteCluster, kubeCluster) { + isLocalKubernetesCluster := f.isLocalKubeCluster(isRemoteCluster, kubeCluster) + if isLocalKubernetesCluster { kubeResource, apiResource, err = f.parseResourceFromRequest(req, kubeCluster) if err != nil { return nil, trace.Wrap(err) @@ -823,10 +828,11 @@ func (f *Forwarder) setupContext( remoteAddr: utils.NetAddr{AddrNetwork: "tcp", Addr: req.RemoteAddr}, isRemote: isRemoteCluster, }, - kubeServers: kubeServers, - requestVerb: apiResource.getVerb(req), - apiResource: apiResource, - kubeResource: kubeResource, + kubeServers: kubeServers, + requestVerb: apiResource.getVerb(req), + apiResource: apiResource, + kubeResource: kubeResource, + isLocalKubernetesCluster: isLocalKubernetesCluster, }, nil } @@ -879,9 +885,11 @@ func (f *Forwarder) emitAuditEvent(req *http.Request, sess *clusterSession, stat ) defer span.End() - if sess.noAuditEvents { + // If the session is not local, don't emit the event. + if !sess.isLocalKubernetesCluster { return } + r := sess.apiResource if r.skipEvent { return @@ -1175,7 +1183,7 @@ func (f *Forwarder) join(ctx *authContext, w http.ResponseWriter, req *http.Requ return nil, trace.Wrap(err) } - if !f.isLocalKubeCluster(ctx.teleportCluster.isRemote, ctx.kubeClusterName) { + if !sess.isLocalKubernetesCluster { return f.remoteJoin(ctx, w, req, p, sess) } @@ -1668,8 +1676,8 @@ func (f *Forwarder) exec(authCtx *authContext, w http.ResponseWriter, req *http. return upgradeRequestToRemoteCommandProxy(request, func(proxy *remoteCommandProxy) error { - if sess.noAuditEvents { - // We're forwarding this to another kubernetes_service instance, let it handle multiplexing. + if !sess.isLocalKubernetesCluster { + // We're forwarding this to another kubernetes_service instance or Teleport proxy, let it handle session recording. return f.remoteExec(req, sess, proxy) } @@ -1765,7 +1773,7 @@ func (f *Forwarder) portForward(authCtx *authContext, w http.ResponseWriter, req } onPortForward := func(addr string, success bool) { - if sess.noAuditEvents { + if !sess.isLocalKubernetesCluster { return } portForward := &apievents.PortForward{ @@ -2040,7 +2048,7 @@ func (f *Forwarder) catchAll(authCtx *authContext, w http.ResponseWriter, req *h return nil, trace.Wrap(err) } - isLocalKubeCluster := f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) + isLocalKubeCluster := sess.isLocalKubernetesCluster isListRequest := authCtx.requestVerb == types.KubeVerbList // Watch requests can be send to a single resource or to a collection of resources. // isWatchingCollectionRequest is true when the request is a watch request and @@ -2122,7 +2130,7 @@ func isRelevantWebsocketError(err error) bool { func (f *Forwarder) getExecutor(sess *clusterSession, req *http.Request) (remotecommand.Executor, error) { isWSSupported := false - if !f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) { + if !sess.isLocalKubernetesCluster { // We're forwarding it to another Teleport kube_service, // which supports the websocket protocol. isWSSupported = true @@ -2275,10 +2283,8 @@ type clusterSession struct { // nil otherwise. kubeAPICreds kubeCreds forwarder *reverseproxy.Forwarder - // noAuditEvents is true if this teleport service should leave audit event - // logging to another service. - noAuditEvents bool - targetAddr string + // targetAddr is the address of the target cluster. + targetAddr string // kubeAddress is the address of this session's active connection (if there is one) kubeAddress string // upgradeToHTTP2 indicates whether the transport should be configured to use HTTP2. @@ -2486,11 +2492,8 @@ func (f *Forwarder) newClusterSessionLocal(ctx context.Context, authCtx authCont func (f *Forwarder) newClusterSessionDirect(ctx context.Context, authCtx authContext) (*clusterSession, error) { connCtx, cancel := context.WithCancelCause(ctx) return &clusterSession{ - parent: f, - authContext: authCtx, - // This session talks to a kubernetes_service, which should handle - // audit logging. Avoid duplicate logging. - noAuditEvents: true, + parent: f, + authContext: authCtx, requestContext: ctx, connCtx: connCtx, connMonitorCancel: cancel, @@ -2514,7 +2517,7 @@ func (f *Forwarder) makeSessionForwarder(sess *clusterSession) (*reverseproxy.Fo reverseproxy.WithLogger(f.log), reverseproxy.WithErrorHandler(f.formatForwardResponseError), } - if f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) { + if sess.isLocalKubernetesCluster { // If the target cluster is local, i.e. the cluster that is served by this // teleport service, then we set up the forwarder to allow re-writing // the response to the client to include user friendly error messages. diff --git a/lib/kube/proxy/resource_deletecollection.go b/lib/kube/proxy/resource_deletecollection.go index 0ca57f53fb090..f73c5f2043426 100644 --- a/lib/kube/proxy/resource_deletecollection.go +++ b/lib/kube/proxy/resource_deletecollection.go @@ -62,7 +62,7 @@ func (f *Forwarder) deleteResourcesCollection(sess *clusterSession, w http.Respo defer span.End() req = req.WithContext(ctx) var ( - isLocalKubeCluster = f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) + isLocalKubeCluster = sess.isLocalKubernetesCluster kubeObjType string namespace string ) diff --git a/lib/kube/proxy/resource_list.go b/lib/kube/proxy/resource_list.go index c86c3e788b0e8..d0401a600fe5d 100644 --- a/lib/kube/proxy/resource_list.go +++ b/lib/kube/proxy/resource_list.go @@ -53,7 +53,7 @@ func (f *Forwarder) listResources(sess *clusterSession, w http.ResponseWriter, r req = req.WithContext(ctx) - isLocalKubeCluster := f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) + isLocalKubeCluster := sess.isLocalKubernetesCluster supportsType := false resourceKind := "" if isLocalKubeCluster { diff --git a/lib/kube/proxy/self_subject_reviews.go b/lib/kube/proxy/self_subject_reviews.go index fe8b463acea33..2130cfdaed034 100644 --- a/lib/kube/proxy/self_subject_reviews.go +++ b/lib/kube/proxy/self_subject_reviews.go @@ -80,7 +80,7 @@ func (f *Forwarder) selfSubjectAccessReviews(authCtx *authContext, w http.Respon // only allow self subject access reviews for the service that proxies the // request to the kubernetes API server. - if f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) { + if sess.isLocalKubernetesCluster { if err := f.validateSelfSubjectAccessReview(sess, w, req); trace.IsAccessDenied(err) { return nil, nil } else if err != nil { diff --git a/lib/kube/proxy/sess.go b/lib/kube/proxy/sess.go index c5e4056bb4e23..bbf2f308a1a25 100644 --- a/lib/kube/proxy/sess.go +++ b/lib/kube/proxy/sess.go @@ -747,7 +747,7 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, eventPodMeta s.started = true sessionStart := s.forwarder.cfg.Clock.Now().UTC() - if !s.sess.noAuditEvents { + if s.sess.isLocalKubernetesCluster { s.terminalSizeQueue.callback = func(termSize terminalResizeMessage) { s.mu.Lock() defer s.mu.Unlock()