Skip to content

Commit

Permalink
[kube] fixes duplicated session recordings in root and leaf clusters (#…
Browse files Browse the repository at this point in the history
…47910)

This PR addresses a bug in Kubernetes session recordings where both the root proxy and the leaf cluster's Kubernetes services were recording the same session, resulting in the session being available in both clusters.

This behavior was inconsistent with other protocols, where recordings of leaf resources are only accessible in leaf clusters. To maintain consistency, this PR removes session recordings on the root clusters.

Signed-off-by: Tiago Silva <[email protected]>
  • Loading branch information
tigrato authored Nov 11, 2024
1 parent 3aa6f3c commit 5711ee9
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 30 deletions.
5 changes: 2 additions & 3 deletions integration/kube_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,7 @@ func testKubeTrustedClustersClientCert(t *testing.T, suite *KubeSuite) {
loop:
for {
select {
case event := <-main.UploadEventsC:
case event := <-aux.UploadEventsC:
sessionID = event.SessionID
break loop
case <-timeoutC:
Expand All @@ -800,7 +800,7 @@ loop:
}

// read back the entire session and verify that it matches the stated output
capturedStream, _ := streamSession(ctx, t, main.Process.GetAuthServer(), sessionID)
capturedStream, _ := streamSession(ctx, t, aux.Process.GetAuthServer(), sessionID)
require.Equal(t, sessionStream, capturedStream)

// impersonating kube exec should be denied
Expand Down Expand Up @@ -1614,7 +1614,6 @@ func waitForContainer(ctx context.Context, podClient corev1client.PodInterface,
}

s := getContainerStatusByName(p, containerName)
fmt.Println("test", s)
if s == nil {
return false, nil
}
Expand Down
2 changes: 1 addition & 1 deletion lib/kube/proxy/ephemeral_containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (f *Forwarder) ephemeralContainers(authCtx *authContext, w http.ResponseWri
f.log.Errorf("Failed to set up forwarding headers: %v.", err)
return nil, trace.Wrap(err)
}
if !f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) {
if !sess.isLocalKubernetesCluster {
sess.forwarder.ServeHTTP(w, req)
return nil, nil
}
Expand Down
47 changes: 25 additions & 22 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,10 @@ type authContext struct {
kubeServers []types.KubeServer
// apiResource holds the information about the requested API resource.
apiResource apiResource
// isLocalKubernetesCluster is true if the target cluster is served by this teleport service.
// It is false if the target cluster is served by another teleport service or a different
// Teleport cluster.
isLocalKubernetesCluster bool
}

func (c authContext) String() string {
Expand Down Expand Up @@ -789,7 +793,8 @@ func (f *Forwarder) setupContext(
return nil, trace.NotFound("Kubernetes cluster %q not found", kubeCluster)
}
}
if f.isLocalKubeCluster(isRemoteCluster, kubeCluster) {
isLocalKubernetesCluster := f.isLocalKubeCluster(isRemoteCluster, kubeCluster)
if isLocalKubernetesCluster {
kubeResource, apiResource, err = f.parseResourceFromRequest(req, kubeCluster)
if err != nil {
return nil, trace.Wrap(err)
Expand Down Expand Up @@ -823,10 +828,11 @@ func (f *Forwarder) setupContext(
remoteAddr: utils.NetAddr{AddrNetwork: "tcp", Addr: req.RemoteAddr},
isRemote: isRemoteCluster,
},
kubeServers: kubeServers,
requestVerb: apiResource.getVerb(req),
apiResource: apiResource,
kubeResource: kubeResource,
kubeServers: kubeServers,
requestVerb: apiResource.getVerb(req),
apiResource: apiResource,
kubeResource: kubeResource,
isLocalKubernetesCluster: isLocalKubernetesCluster,
}, nil
}

Expand Down Expand Up @@ -879,9 +885,11 @@ func (f *Forwarder) emitAuditEvent(req *http.Request, sess *clusterSession, stat
)
defer span.End()

if sess.noAuditEvents {
// If the session is not local, don't emit the event.
if !sess.isLocalKubernetesCluster {
return
}

r := sess.apiResource
if r.skipEvent {
return
Expand Down Expand Up @@ -1175,7 +1183,7 @@ func (f *Forwarder) join(ctx *authContext, w http.ResponseWriter, req *http.Requ
return nil, trace.Wrap(err)
}

if !f.isLocalKubeCluster(ctx.teleportCluster.isRemote, ctx.kubeClusterName) {
if !sess.isLocalKubernetesCluster {
return f.remoteJoin(ctx, w, req, p, sess)
}

Expand Down Expand Up @@ -1668,8 +1676,8 @@ func (f *Forwarder) exec(authCtx *authContext, w http.ResponseWriter, req *http.

return upgradeRequestToRemoteCommandProxy(request,
func(proxy *remoteCommandProxy) error {
if sess.noAuditEvents {
// We're forwarding this to another kubernetes_service instance, let it handle multiplexing.
if !sess.isLocalKubernetesCluster {
// We're forwarding this to another kubernetes_service instance or Teleport proxy, let it handle session recording.
return f.remoteExec(req, sess, proxy)
}

Expand Down Expand Up @@ -1765,7 +1773,7 @@ func (f *Forwarder) portForward(authCtx *authContext, w http.ResponseWriter, req
}

onPortForward := func(addr string, success bool) {
if sess.noAuditEvents {
if !sess.isLocalKubernetesCluster {
return
}
portForward := &apievents.PortForward{
Expand Down Expand Up @@ -2040,7 +2048,7 @@ func (f *Forwarder) catchAll(authCtx *authContext, w http.ResponseWriter, req *h
return nil, trace.Wrap(err)
}

isLocalKubeCluster := f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName)
isLocalKubeCluster := sess.isLocalKubernetesCluster
isListRequest := authCtx.requestVerb == types.KubeVerbList
// Watch requests can be send to a single resource or to a collection of resources.
// isWatchingCollectionRequest is true when the request is a watch request and
Expand Down Expand Up @@ -2122,7 +2130,7 @@ func isRelevantWebsocketError(err error) bool {

func (f *Forwarder) getExecutor(sess *clusterSession, req *http.Request) (remotecommand.Executor, error) {
isWSSupported := false
if !f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) {
if !sess.isLocalKubernetesCluster {
// We're forwarding it to another Teleport kube_service,
// which supports the websocket protocol.
isWSSupported = true
Expand Down Expand Up @@ -2275,10 +2283,8 @@ type clusterSession struct {
// nil otherwise.
kubeAPICreds kubeCreds
forwarder *reverseproxy.Forwarder
// noAuditEvents is true if this teleport service should leave audit event
// logging to another service.
noAuditEvents bool
targetAddr string
// targetAddr is the address of the target cluster.
targetAddr string
// kubeAddress is the address of this session's active connection (if there is one)
kubeAddress string
// upgradeToHTTP2 indicates whether the transport should be configured to use HTTP2.
Expand Down Expand Up @@ -2486,11 +2492,8 @@ func (f *Forwarder) newClusterSessionLocal(ctx context.Context, authCtx authCont
func (f *Forwarder) newClusterSessionDirect(ctx context.Context, authCtx authContext) (*clusterSession, error) {
connCtx, cancel := context.WithCancelCause(ctx)
return &clusterSession{
parent: f,
authContext: authCtx,
// This session talks to a kubernetes_service, which should handle
// audit logging. Avoid duplicate logging.
noAuditEvents: true,
parent: f,
authContext: authCtx,
requestContext: ctx,
connCtx: connCtx,
connMonitorCancel: cancel,
Expand All @@ -2514,7 +2517,7 @@ func (f *Forwarder) makeSessionForwarder(sess *clusterSession) (*reverseproxy.Fo
reverseproxy.WithLogger(f.log),
reverseproxy.WithErrorHandler(f.formatForwardResponseError),
}
if f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) {
if sess.isLocalKubernetesCluster {
// If the target cluster is local, i.e. the cluster that is served by this
// teleport service, then we set up the forwarder to allow re-writing
// the response to the client to include user friendly error messages.
Expand Down
2 changes: 1 addition & 1 deletion lib/kube/proxy/resource_deletecollection.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (f *Forwarder) deleteResourcesCollection(sess *clusterSession, w http.Respo
defer span.End()
req = req.WithContext(ctx)
var (
isLocalKubeCluster = f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName)
isLocalKubeCluster = sess.isLocalKubernetesCluster
kubeObjType string
namespace string
)
Expand Down
2 changes: 1 addition & 1 deletion lib/kube/proxy/resource_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (f *Forwarder) listResources(sess *clusterSession, w http.ResponseWriter, r

req = req.WithContext(ctx)

isLocalKubeCluster := f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName)
isLocalKubeCluster := sess.isLocalKubernetesCluster
supportsType := false
resourceKind := ""
if isLocalKubeCluster {
Expand Down
2 changes: 1 addition & 1 deletion lib/kube/proxy/self_subject_reviews.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (f *Forwarder) selfSubjectAccessReviews(authCtx *authContext, w http.Respon

// only allow self subject access reviews for the service that proxies the
// request to the kubernetes API server.
if f.isLocalKubeCluster(sess.teleportCluster.isRemote, sess.kubeClusterName) {
if sess.isLocalKubernetesCluster {
if err := f.validateSelfSubjectAccessReview(sess, w, req); trace.IsAccessDenied(err) {
return nil, nil
} else if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion lib/kube/proxy/sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, eventPodMeta
s.started = true
sessionStart := s.forwarder.cfg.Clock.Now().UTC()

if !s.sess.noAuditEvents {
if s.sess.isLocalKubernetesCluster {
s.terminalSizeQueue.callback = func(termSize terminalResizeMessage) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down

0 comments on commit 5711ee9

Please sign in to comment.