Skip to content

Commit

Permalink
Emit port forward event only once at setup instead than upon each con…
Browse files Browse the repository at this point in the history
…nection. Include k8s cluster info in event.
  • Loading branch information
creack committed Nov 16, 2024
1 parent 5ff37d2 commit 5191680
Show file tree
Hide file tree
Showing 6 changed files with 1,227 additions and 1,155 deletions.
7 changes: 7 additions & 0 deletions api/proto/teleport/legacy/types/events/events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1617,6 +1617,13 @@ message PortForward {

// Addr is a target port forwarding address
string Addr = 5 [(gogoproto.jsontag) = "addr"];

// Kubernetes has information about a kubernetes cluster, if applicable.
KubernetesClusterMetadata Kubernetes = 6 [
(gogoproto.nullable) = false,
(gogoproto.embed) = true,
(gogoproto.jsontag) = ""
];
}

// X11Forward is emitted when a user requests X11 protocol forwarding
Expand Down
2,291 changes: 1,169 additions & 1,122 deletions api/types/events/events.pb.go

Large diffs are not rendered by default.

31 changes: 21 additions & 10 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1772,7 +1772,15 @@ func (f *Forwarder) portForward(authCtx *authContext, w http.ResponseWriter, req
return nil, trace.Wrap(err)
}

onForwardConnection := func(addr string, success bool) {
if !success {
f.log.Warn("Failed to establish connection to %q.", addr)
}
// TODO: Consider emit audit log events. Here we have the target pod port and an actual success value.
}

onPortForward := func(addr string, success bool) {
// NOTE: Kubectl doesn't send the target port initially so `addr` will have port 0.
if !sess.isLocalKubernetesCluster {
return
}
Expand All @@ -1791,7 +1799,10 @@ func (f *Forwarder) portForward(authCtx *authContext, w http.ResponseWriter, req
Status: apievents.Status{
Success: success,
},
KubernetesClusterMetadata: sess.eventClusterMeta(req),
}
// NOTE: Success will always be true as there is nothing that can fail at this point.
// TODO: Move this to onForwardConnection / rename. Keeping for now to avoid orphan const code.
if !success {
portForward.Code = events.PortForwardFailureCode
}
Expand All @@ -1802,15 +1813,16 @@ func (f *Forwarder) portForward(authCtx *authContext, w http.ResponseWriter, req

q := req.URL.Query()
request := portForwardRequest{
podNamespace: p.ByName("podNamespace"),
podName: p.ByName("podName"),
ports: q["ports"],
context: ctx,
httpRequest: req,
httpResponseWriter: w,
onPortForward: onPortForward,
targetDialer: dialer,
pingPeriod: f.cfg.ConnPingPeriod,
podNamespace: p.ByName("podNamespace"),
podName: p.ByName("podName"),
ports: q["ports"],
context: ctx,
httpRequest: req,
httpResponseWriter: w,
onPortForward: onPortForward,
onForwardConnection: onForwardConnection,
targetDialer: dialer,
pingPeriod: f.cfg.ConnPingPeriod,
}
f.log.Debugf("Starting %v.", request)
err = runPortForwarding(request)
Expand Down Expand Up @@ -2186,7 +2198,6 @@ func (f *Forwarder) getSPDYExecutor(sess *clusterSession, req *http.Request) (re
}

func (f *Forwarder) getPortForwardDialer(sess *clusterSession, req *http.Request) (httpstream.Dialer, error) {

wsDialer, err := f.getWebsocketDialer(sess, req)
if err != nil {
return nil, trace.Wrap(err)
Expand Down
22 changes: 12 additions & 10 deletions lib/kube/proxy/portforward_spdy.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,16 @@ import (

// portForwardRequest is a request that specifies port forwarding
type portForwardRequest struct {
podNamespace string
podName string
ports []string
httpRequest *http.Request
httpResponseWriter http.ResponseWriter
onPortForward portForwardCallback
context context.Context
targetDialer httpstream.Dialer
pingPeriod time.Duration
podNamespace string
podName string
ports []string
httpRequest *http.Request
httpResponseWriter http.ResponseWriter
onPortForward portForwardCallback
onForwardConnection portForwardCallback
context context.Context
targetDialer httpstream.Dialer
pingPeriod time.Duration
}

func (p portForwardRequest) String() string {
Expand Down Expand Up @@ -179,7 +180,7 @@ func (h *portForwardProxy) forwardStreamPair(p *httpStreamPair, remotePort int64

// read and write from the error stream
targetErrorStream, err := h.targetConn.CreateStream(headers)
h.onPortForward(net.JoinHostPort(h.podName, port), err == nil /* success */)
h.onForwardConnection(net.JoinHostPort(h.podName, port), err == nil /* success */)
if err != nil {
err := trace.ConnectionProblem(err, "error creating error stream for port %d", remotePort)
p.sendErr(err)
Expand Down Expand Up @@ -294,6 +295,7 @@ func (h *portForwardProxy) requestID(stream httpstream.Stream) (string, error) {
// when the httpstream.Connection is closed.
func (h *portForwardProxy) run() {
h.Debugf("Waiting for port forward streams.")
h.onPortForward(net.JoinHostPort(h.podName, "0"), true)
for {
select {
case <-h.context.Done():
Expand Down
5 changes: 3 additions & 2 deletions lib/kube/proxy/portforward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,9 @@ func TestPortForwardProxy_run_connsClosed(t *testing.T) {
targetConn := newfakeSPDYConnection()
h := &portForwardProxy{
portForwardRequest: portForwardRequest{
context: context.Background(),
onPortForward: func(addr string, success bool) {},
context: context.Background(),
onPortForward: func(addr string, success bool) {},
onForwardConnection: func(addr string, success bool) {},
},
Entry: logger,
sourceConn: sourceConn,
Expand Down
26 changes: 15 additions & 11 deletions lib/kube/proxy/portforward_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"encoding/binary"
"fmt"
"io"
"net"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -142,11 +143,12 @@ func runPortForwardingWebSocket(req portForwardRequest) error {
}

h := &websocketPortforwardHandler{
conn: conn,
streamPairs: streamPairs,
podName: req.podName,
targetConn: targetConn,
onPortForward: req.onPortForward,
conn: conn,
streamPairs: streamPairs,
podName: req.podName,
targetConn: targetConn,
onPortForward: req.onPortForward,
onForwardConnection: req.onForwardConnection,
FieldLogger: logrus.WithFields(logrus.Fields{
teleport.ComponentKey: teleport.Component(teleport.ComponentProxyKube),
events.RemoteAddr: req.httpRequest.RemoteAddr,
Expand Down Expand Up @@ -207,11 +209,12 @@ func (w *websocketChannelPair) sendErr(err error) {
// websocketPortforwardHandler is capable of processing a single port forward
// request over a websocket connection
type websocketPortforwardHandler struct {
conn *wsstream.Conn
streamPairs []*websocketChannelPair
podName string
targetConn httpstream.Connection
onPortForward portForwardCallback
conn *wsstream.Conn
streamPairs []*websocketChannelPair
podName string
targetConn httpstream.Connection
onPortForward portForwardCallback
onForwardConnection portForwardCallback
logrus.FieldLogger
context context.Context
}
Expand All @@ -223,6 +226,7 @@ func (h *websocketPortforwardHandler) run() {
wg := sync.WaitGroup{}
wg.Add(len(h.streamPairs))

h.onPortForward(net.JoinHostPort(h.podName, "0"), true)
for _, pair := range h.streamPairs {
p := pair
go func() {
Expand Down Expand Up @@ -251,7 +255,7 @@ func (h *websocketPortforwardHandler) forwardStreamPair(p *websocketChannelPair)

// read and write from the error stream
targetErrorStream, err := h.targetConn.CreateStream(headers)
h.onPortForward(fmt.Sprintf("%v:%v", h.podName, p.port), err == nil /* success */)
h.onForwardConnection(net.JoinHostPort(h.podName, fmt.Sprint(p.port)), err == nil /* success */)
if err != nil {
p.sendErr(err)
return
Expand Down

0 comments on commit 5191680

Please sign in to comment.