From a893432c1d2d9d948eb7523e17d4ca14cc84192c Mon Sep 17 00:00:00 2001
From: Tiago Silva
Date: Mon, 11 Nov 2024 18:16:06 +0000
Subject: [PATCH] [kube] fixes duplicated session recordings in root and leaf
 clusters (#48738)

This PR addresses a bug in Kubernetes session recordings where both the
root proxy and the leaf cluster's Kubernetes services were recording the
same session, resulting in the session being available in both clusters.

This behavior was inconsistent with other protocols, where recordings of
leaf resources are only accessible in leaf clusters. To maintain
consistency, this PR removes session recordings on the root clusters.

Signed-off-by: Tiago Silva
---
 integration/kube_integration_test.go        |  5 +--
 lib/kube/proxy/ephemeral_containers.go      |  2 +-
 lib/kube/proxy/forwarder.go                 | 45 +++++++++++----------
 lib/kube/proxy/resource_deletecollection.go |  2 +-
 lib/kube/proxy/resource_list.go             |  2 +-
 lib/kube/proxy/self_subject_reviews.go      |  2 +-
 lib/kube/proxy/sess.go                      |  2 +-
 7 files changed, 31 insertions(+), 29 deletions(-)

diff --git a/integration/kube_integration_test.go b/integration/kube_integration_test.go
index 14793b1b399e5..27ad39e51a13a 100644
--- a/integration/kube_integration_test.go
+++ b/integration/kube_integration_test.go
@@ -793,7 +793,7 @@ func testKubeTrustedClustersClientCert(t *testing.T, suite *KubeSuite) {
 loop:
 	for {
 		select {
-		case event := <-main.UploadEventsC:
+		case event := <-aux.UploadEventsC:
 			sessionID = event.SessionID
 			break loop
 		case <-timeoutC:
@@ -802,7 +802,7 @@ loop:
 	}
 
 	// read back the entire session and verify that it matches the stated output
-	capturedStream, err := main.Process.GetAuthServer().GetSessionChunk(apidefaults.Namespace, session.ID(sessionID), 0, events.MaxChunkBytes)
+	capturedStream, err := aux.Process.GetAuthServer().GetSessionChunk(apidefaults.Namespace, session.ID(sessionID), 0, events.MaxChunkBytes)
 	require.NoError(t, err)
 	require.Equal(t, sessionStream, string(capturedStream))
 
@@ -1620,7 +1620,6 @@ func waitForContainer(ctx context.Context, podClient corev1client.PodInterface,
 	}
 
 	s := getContainerStatusByName(p, containerName)
-	fmt.Println("test", s)
 	if s == nil {
 		return false, nil
 	}
diff --git a/lib/kube/proxy/ephemeral_containers.go b/lib/kube/proxy/ephemeral_containers.go
index e39cb7dbca3a8..1c9ae08e417a4 100644
--- a/lib/kube/proxy/ephemeral_containers.go
+++ b/lib/kube/proxy/ephemeral_containers.go
@@ -104,7 +104,7 @@ func (f *Forwarder) ephemeralContainers(authCtx *authContext, w http.ResponseWri
 		f.log.Errorf("Failed to set up forwarding headers: %v.", err)
 		return nil, trace.Wrap(err)
 	}
-	if !f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) {
+	if !sess.isLocalKubernetesCluster {
 		sess.forwarder.ServeHTTP(w, req)
 		return nil, nil
 	}
diff --git a/lib/kube/proxy/forwarder.go b/lib/kube/proxy/forwarder.go
index bf38e9a3b010d..1b109d42a488b 100644
--- a/lib/kube/proxy/forwarder.go
+++ b/lib/kube/proxy/forwarder.go
@@ -441,6 +441,10 @@ type authContext struct {
 	kubeServers []types.KubeServer
 	// apiResource holds the information about the requested API resource.
 	apiResource apiResource
+	// isLocalKubernetesCluster is true if the target cluster is served by this teleport service.
+	// It is false if the target cluster is served by another teleport service or a different
+	// Teleport cluster.
+	isLocalKubernetesCluster bool
 }
 
 func (c authContext) String() string {
@@ -778,7 +782,8 @@ func (f *Forwarder) setupContext(
 			return nil, trace.NotFound("Kubernetes cluster %q not found", kubeCluster)
 		}
 	}
-	if f.isLocalKubeCluster(isRemoteCluster, kubeCluster) {
+	isLocalKubernetesCluster := f.isLocalKubeCluster(isRemoteCluster, kubeCluster)
+	if isLocalKubernetesCluster {
 		kubeResource, apiResource, err = f.parseResourceFromRequest(req, kubeCluster)
 		if err != nil {
 			return nil, trace.Wrap(err)
 		}
@@ -812,10 +817,11 @@ func (f *Forwarder) setupContext(
 			remoteAddr: utils.NetAddr{AddrNetwork: "tcp", Addr: req.RemoteAddr},
 			isRemote:   isRemoteCluster,
 		},
-		kubeServers:  kubeServers,
-		requestVerb:  apiResource.getVerb(req),
-		apiResource:  apiResource,
-		kubeResource: kubeResource,
+		kubeServers:              kubeServers,
+		requestVerb:              apiResource.getVerb(req),
+		apiResource:              apiResource,
+		kubeResource:             kubeResource,
+		isLocalKubernetesCluster: isLocalKubernetesCluster,
 	}, nil
 }
@@ -868,9 +874,11 @@ func (f *Forwarder) emitAuditEvent(req *http.Request, sess *clusterSession, stat
 	)
 	defer span.End()
 
-	if sess.noAuditEvents {
+	// If the session is not local, don't emit the event.
+	if !sess.isLocalKubernetesCluster {
 		return
 	}
+
 	r := sess.apiResource
 	if r.skipEvent {
 		return
@@ -1164,7 +1172,7 @@ func (f *Forwarder) join(ctx *authContext, w http.ResponseWriter, req *http.Requ
 		return nil, trace.Wrap(err)
 	}
 
-	if !f.isLocalKubeCluster(ctx.teleportCluster.isRemote, ctx.kubeClusterName) {
+	if !sess.isLocalKubernetesCluster {
 		return f.remoteJoin(ctx, w, req, p, sess)
 	}
@@ -1657,7 +1665,7 @@ func (f *Forwarder) exec(authCtx *authContext, w http.ResponseWriter, req *http.
 
 	return upgradeRequestToRemoteCommandProxy(request,
 		func(proxy *remoteCommandProxy) error {
-			if sess.noAuditEvents {
+			if !sess.isLocalKubernetesCluster {
 				// We're forwarding this to another kubernetes_service instance, let it handle multiplexing.
 				return f.remoteExec(authCtx, w, req, p, sess, request, proxy)
 			}
@@ -1754,7 +1762,7 @@ func (f *Forwarder) portForward(authCtx *authContext, w http.ResponseWriter, req
 	}
 
 	onPortForward := func(addr string, success bool) {
-		if sess.noAuditEvents {
+		if !sess.isLocalKubernetesCluster {
 			return
 		}
 		portForward := &apievents.PortForward{
@@ -2029,7 +2037,7 @@ func (f *Forwarder) catchAll(authCtx *authContext, w http.ResponseWriter, req *h
 		return nil, trace.Wrap(err)
 	}
 
-	isLocalKubeCluster := f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName)
+	isLocalKubeCluster := sess.isLocalKubernetesCluster
 	isListRequest := authCtx.requestVerb == types.KubeVerbList
 	// Watch requests can be send to a single resource or to a collection of resources.
 	// isWatchingCollectionRequest is true when the request is a watch request and
@@ -2103,7 +2111,7 @@ func isRelevantWebsocketError(err error) bool {
 
 func (f *Forwarder) getExecutor(sess *clusterSession, req *http.Request) (remotecommand.Executor, error) {
 	isWSSupported := false
-	if sess.noAuditEvents {
+	if !sess.isLocalKubernetesCluster {
 		// We're forwarding it to another kube_service, check if it supports new protocol.
 		isWSSupported = f.allServersSupportExecSubprotocolV5(sess)
 	} else {
@@ -2224,10 +2232,8 @@ type clusterSession struct {
 	// nil otherwise.
 	kubeAPICreds kubeCreds
 	forwarder    *reverseproxy.Forwarder
-	// noAuditEvents is true if this teleport service should leave audit event
-	// logging to another service.
-	noAuditEvents bool
-	targetAddr    string
+	// targetAddr is the address of the target cluster.
+	targetAddr string
 	// kubeAddress is the address of this session's active connection (if there is one)
 	kubeAddress string
 	// upgradeToHTTP2 indicates whether the transport should be configured to use HTTP2.
@@ -2435,11 +2441,8 @@ func (f *Forwarder) newClusterSessionLocal(ctx context.Context, authCtx authCont
 func (f *Forwarder) newClusterSessionDirect(ctx context.Context, authCtx authContext) (*clusterSession, error) {
 	connCtx, cancel := context.WithCancelCause(ctx)
 	return &clusterSession{
-		parent:      f,
-		authContext: authCtx,
-		// This session talks to a kubernetes_service, which should handle
-		// audit logging. Avoid duplicate logging.
-		noAuditEvents:     true,
+		parent:            f,
+		authContext:       authCtx,
 		requestContext:    ctx,
 		connCtx:           connCtx,
 		connMonitorCancel: cancel,
@@ -2463,7 +2466,7 @@ func (f *Forwarder) makeSessionForwarder(sess *clusterSession) (*reverseproxy.Fo
 		reverseproxy.WithLogger(f.log),
 		reverseproxy.WithErrorHandler(f.formatForwardResponseError),
 	}
-	if f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) {
+	if sess.isLocalKubernetesCluster {
 		// If the target cluster is local, i.e. the cluster that is served by this
 		// teleport service, then we set up the forwarder to allow re-writing
 		// the response to the client to include user friendly error messages.
diff --git a/lib/kube/proxy/resource_deletecollection.go b/lib/kube/proxy/resource_deletecollection.go
index dced5822ffc71..89c658c551c05 100644
--- a/lib/kube/proxy/resource_deletecollection.go
+++ b/lib/kube/proxy/resource_deletecollection.go
@@ -62,7 +62,7 @@ func (f *Forwarder) deleteResourcesCollection(sess *clusterSession, w http.Respo
 	defer span.End()
 	req = req.WithContext(ctx)
 	var (
-		isLocalKubeCluster = f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName)
+		isLocalKubeCluster = sess.isLocalKubernetesCluster
 		kubeObjType        string
 		namespace          string
 	)
diff --git a/lib/kube/proxy/resource_list.go b/lib/kube/proxy/resource_list.go
index a53313778608b..355cfbb2a3407 100644
--- a/lib/kube/proxy/resource_list.go
+++ b/lib/kube/proxy/resource_list.go
@@ -53,7 +53,7 @@ func (f *Forwarder) listResources(sess *clusterSession, w http.ResponseWriter, r
 
 	req = req.WithContext(ctx)
 
-	isLocalKubeCluster := f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName)
+	isLocalKubeCluster := sess.isLocalKubernetesCluster
 	supportsType := false
 	resourceKind := ""
 	if isLocalKubeCluster {
diff --git a/lib/kube/proxy/self_subject_reviews.go b/lib/kube/proxy/self_subject_reviews.go
index fe8b463acea33..2130cfdaed034 100644
--- a/lib/kube/proxy/self_subject_reviews.go
+++ b/lib/kube/proxy/self_subject_reviews.go
@@ -80,7 +80,7 @@ func (f *Forwarder) selfSubjectAccessReviews(authCtx *authContext, w http.Respon
 
 	// only allow self subject access reviews for the service that proxies the
 	// request to the kubernetes API server.
-	if f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) {
+	if sess.isLocalKubernetesCluster {
 		if err := f.validateSelfSubjectAccessReview(sess, w, req); trace.IsAccessDenied(err) {
 			return nil, nil
 		} else if err != nil {
diff --git a/lib/kube/proxy/sess.go b/lib/kube/proxy/sess.go
index 54ed10105b8e7..8a12d764fba99 100644
--- a/lib/kube/proxy/sess.go
+++ b/lib/kube/proxy/sess.go
@@ -748,7 +748,7 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, eventPodMeta
 	s.started = true
 	sessionStart := s.forwarder.cfg.Clock.Now().UTC()
 
-	if !s.sess.noAuditEvents {
+	if s.sess.isLocalKubernetesCluster {
 		s.terminalSizeQueue.callback = func(termSize terminalResizeMessage) {
 			s.mu.Lock()
 			defer s.mu.Unlock()
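Note (illustrative, not part of the upstream patch): a minimal Go sketch of the gating rule this
change standardizes on. Handlers emit audit events and session recordings only when
sess.isLocalKubernetesCluster is true, so a root proxy that merely forwards to a leaf cluster
(or to another kubernetes_service) skips them. The clusterSession type below is a pared-down
stand-in that models only the new field, and recordIfLocal is a hypothetical helper; the real
handlers inline the check, as the hunks above show.

	package main

	import "fmt"

	// clusterSession stands in for the session type in lib/kube/proxy; only the
	// field introduced by this patch is modeled here.
	type clusterSession struct {
		isLocalKubernetesCluster bool
	}

	// recordIfLocal runs a recording/audit side effect only when this service
	// serves the target Kubernetes cluster; a service that merely forwards the
	// request leaves recording to the serving cluster.
	func recordIfLocal(sess *clusterSession, record func()) {
		if !sess.isLocalKubernetesCluster {
			return
		}
		record()
	}

	func main() {
		forwarded := &clusterSession{isLocalKubernetesCluster: false}
		local := &clusterSession{isLocalKubernetesCluster: true}

		recordIfLocal(forwarded, func() { fmt.Println("session recorded") }) // skipped on the forwarding proxy
		recordIfLocal(local, func() { fmt.Println("session recorded") })     // recorded by the serving service
	}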