diff --git a/integration/kube_integration_test.go b/integration/kube_integration_test.go index 14793b1b399e5..27ad39e51a13a 100644 --- a/integration/kube_integration_test.go +++ b/integration/kube_integration_test.go @@ -793,7 +793,7 @@ func testKubeTrustedClustersClientCert(t *testing.T, suite *KubeSuite) { loop: for { select { - case event := <-main.UploadEventsC: + case event := <-aux.UploadEventsC: sessionID = event.SessionID break loop case <-timeoutC: @@ -802,7 +802,7 @@ loop: } // read back the entire session and verify that it matches the stated output - capturedStream, err := main.Process.GetAuthServer().GetSessionChunk(apidefaults.Namespace, session.ID(sessionID), 0, events.MaxChunkBytes) + capturedStream, err := aux.Process.GetAuthServer().GetSessionChunk(apidefaults.Namespace, session.ID(sessionID), 0, events.MaxChunkBytes) require.NoError(t, err) require.Equal(t, sessionStream, string(capturedStream)) @@ -1620,7 +1620,6 @@ func waitForContainer(ctx context.Context, podClient corev1client.PodInterface, } s := getContainerStatusByName(p, containerName) - fmt.Println("test", s) if s == nil { return false, nil } diff --git a/lib/kube/proxy/ephemeral_containers.go b/lib/kube/proxy/ephemeral_containers.go index e39cb7dbca3a8..1c9ae08e417a4 100644 --- a/lib/kube/proxy/ephemeral_containers.go +++ b/lib/kube/proxy/ephemeral_containers.go @@ -104,7 +104,7 @@ func (f *Forwarder) ephemeralContainers(authCtx *authContext, w http.ResponseWri f.log.Errorf("Failed to set up forwarding headers: %v.", err) return nil, trace.Wrap(err) } - if !f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) { + if !sess.isLocalKubernetesCluster { sess.forwarder.ServeHTTP(w, req) return nil, nil } diff --git a/lib/kube/proxy/forwarder.go b/lib/kube/proxy/forwarder.go index bf38e9a3b010d..1b109d42a488b 100644 --- a/lib/kube/proxy/forwarder.go +++ b/lib/kube/proxy/forwarder.go @@ -441,6 +441,10 @@ type authContext struct { kubeServers []types.KubeServer // apiResource holds the information about the requested API resource. apiResource apiResource + // isLocalKubernetesCluster is true if the target cluster is served by this teleport service. + // It is false if the target cluster is served by another teleport service or a different + // Teleport cluster. + isLocalKubernetesCluster bool } func (c authContext) String() string { @@ -778,7 +782,8 @@ func (f *Forwarder) setupContext( return nil, trace.NotFound("Kubernetes cluster %q not found", kubeCluster) } } - if f.isLocalKubeCluster(isRemoteCluster, kubeCluster) { + isLocalKubernetesCluster := f.isLocalKubeCluster(isRemoteCluster, kubeCluster) + if isLocalKubernetesCluster { kubeResource, apiResource, err = f.parseResourceFromRequest(req, kubeCluster) if err != nil { return nil, trace.Wrap(err) @@ -812,10 +817,11 @@ func (f *Forwarder) setupContext( remoteAddr: utils.NetAddr{AddrNetwork: "tcp", Addr: req.RemoteAddr}, isRemote: isRemoteCluster, }, - kubeServers: kubeServers, - requestVerb: apiResource.getVerb(req), - apiResource: apiResource, - kubeResource: kubeResource, + kubeServers: kubeServers, + requestVerb: apiResource.getVerb(req), + apiResource: apiResource, + kubeResource: kubeResource, + isLocalKubernetesCluster: isLocalKubernetesCluster, }, nil } @@ -868,9 +874,11 @@ func (f *Forwarder) emitAuditEvent(req *http.Request, sess *clusterSession, stat ) defer span.End() - if sess.noAuditEvents { + // If the session is not local, don't emit the event. + if !sess.isLocalKubernetesCluster { return } + r := sess.apiResource if r.skipEvent { return @@ -1164,7 +1172,7 @@ func (f *Forwarder) join(ctx *authContext, w http.ResponseWriter, req *http.Requ return nil, trace.Wrap(err) } - if !f.isLocalKubeCluster(ctx.teleportCluster.isRemote, ctx.kubeClusterName) { + if !sess.isLocalKubernetesCluster { return f.remoteJoin(ctx, w, req, p, sess) } @@ -1657,7 +1665,7 @@ func (f *Forwarder) exec(authCtx *authContext, w http.ResponseWriter, req *http. return upgradeRequestToRemoteCommandProxy(request, func(proxy *remoteCommandProxy) error { - if sess.noAuditEvents { + if !sess.isLocalKubernetesCluster { // We're forwarding this to another kubernetes_service instance, let it handle multiplexing. return f.remoteExec(authCtx, w, req, p, sess, request, proxy) } @@ -1754,7 +1762,7 @@ func (f *Forwarder) portForward(authCtx *authContext, w http.ResponseWriter, req } onPortForward := func(addr string, success bool) { - if sess.noAuditEvents { + if !sess.isLocalKubernetesCluster { return } portForward := &apievents.PortForward{ @@ -2029,7 +2037,7 @@ func (f *Forwarder) catchAll(authCtx *authContext, w http.ResponseWriter, req *h return nil, trace.Wrap(err) } - isLocalKubeCluster := f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) + isLocalKubeCluster := sess.isLocalKubernetesCluster isListRequest := authCtx.requestVerb == types.KubeVerbList // Watch requests can be send to a single resource or to a collection of resources. // isWatchingCollectionRequest is true when the request is a watch request and @@ -2103,7 +2111,7 @@ func isRelevantWebsocketError(err error) bool { func (f *Forwarder) getExecutor(sess *clusterSession, req *http.Request) (remotecommand.Executor, error) { isWSSupported := false - if sess.noAuditEvents { + if !sess.isLocalKubernetesCluster { // We're forwarding it to another kube_service, check if it supports new protocol. isWSSupported = f.allServersSupportExecSubprotocolV5(sess) } else { @@ -2224,10 +2232,8 @@ type clusterSession struct { // nil otherwise. kubeAPICreds kubeCreds forwarder *reverseproxy.Forwarder - // noAuditEvents is true if this teleport service should leave audit event - // logging to another service. - noAuditEvents bool - targetAddr string + // targetAddr is the address of the target cluster. + targetAddr string // kubeAddress is the address of this session's active connection (if there is one) kubeAddress string // upgradeToHTTP2 indicates whether the transport should be configured to use HTTP2. @@ -2435,11 +2441,8 @@ func (f *Forwarder) newClusterSessionLocal(ctx context.Context, authCtx authCont func (f *Forwarder) newClusterSessionDirect(ctx context.Context, authCtx authContext) (*clusterSession, error) { connCtx, cancel := context.WithCancelCause(ctx) return &clusterSession{ - parent: f, - authContext: authCtx, - // This session talks to a kubernetes_service, which should handle - // audit logging. Avoid duplicate logging. - noAuditEvents: true, + parent: f, + authContext: authCtx, requestContext: ctx, connCtx: connCtx, connMonitorCancel: cancel, @@ -2463,7 +2466,7 @@ func (f *Forwarder) makeSessionForwarder(sess *clusterSession) (*reverseproxy.Fo reverseproxy.WithLogger(f.log), reverseproxy.WithErrorHandler(f.formatForwardResponseError), } - if f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) { + if sess.isLocalKubernetesCluster { // If the target cluster is local, i.e. the cluster that is served by this // teleport service, then we set up the forwarder to allow re-writing // the response to the client to include user friendly error messages. diff --git a/lib/kube/proxy/resource_deletecollection.go b/lib/kube/proxy/resource_deletecollection.go index dced5822ffc71..89c658c551c05 100644 --- a/lib/kube/proxy/resource_deletecollection.go +++ b/lib/kube/proxy/resource_deletecollection.go @@ -62,7 +62,7 @@ func (f *Forwarder) deleteResourcesCollection(sess *clusterSession, w http.Respo defer span.End() req = req.WithContext(ctx) var ( - isLocalKubeCluster = f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) + isLocalKubeCluster = sess.isLocalKubernetesCluster kubeObjType string namespace string ) diff --git a/lib/kube/proxy/resource_list.go b/lib/kube/proxy/resource_list.go index a53313778608b..355cfbb2a3407 100644 --- a/lib/kube/proxy/resource_list.go +++ b/lib/kube/proxy/resource_list.go @@ -53,7 +53,7 @@ func (f *Forwarder) listResources(sess *clusterSession, w http.ResponseWriter, r req = req.WithContext(ctx) - isLocalKubeCluster := f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) + isLocalKubeCluster := sess.isLocalKubernetesCluster supportsType := false resourceKind := "" if isLocalKubeCluster { diff --git a/lib/kube/proxy/self_subject_reviews.go b/lib/kube/proxy/self_subject_reviews.go index fe8b463acea33..2130cfdaed034 100644 --- a/lib/kube/proxy/self_subject_reviews.go +++ b/lib/kube/proxy/self_subject_reviews.go @@ -80,7 +80,7 @@ func (f *Forwarder) selfSubjectAccessReviews(authCtx *authContext, w http.Respon // only allow self subject access reviews for the service that proxies the // request to the kubernetes API server. - if f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) { + if sess.isLocalKubernetesCluster { if err := f.validateSelfSubjectAccessReview(sess, w, req); trace.IsAccessDenied(err) { return nil, nil } else if err != nil { diff --git a/lib/kube/proxy/sess.go b/lib/kube/proxy/sess.go index 54ed10105b8e7..8a12d764fba99 100644 --- a/lib/kube/proxy/sess.go +++ b/lib/kube/proxy/sess.go @@ -748,7 +748,7 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, eventPodMeta s.started = true sessionStart := s.forwarder.cfg.Clock.Now().UTC() - if !s.sess.noAuditEvents { + if s.sess.isLocalKubernetesCluster { s.terminalSizeQueue.callback = func(termSize terminalResizeMessage) { s.mu.Lock() defer s.mu.Unlock()