Skip to content

Commit

Permalink
Preparation for the QUIC proxy peering implementation (#48836)
Browse files Browse the repository at this point in the history
* Move the QUIC proxy peering envvar check to lib/config

* Move lib/proxy/clusterdial to lib/peer/dial

* Move peer.clientConn to lib/proxy/peer/internal

* Require QUIC peering label to be set to "yes"

* Move peer TLS verification funcs to lib/proxy/peer/internal

* Move QUICServer to lib/proxy/peer/quic

* Avoid stuttering in lib/proxy/peer/quic

* Remove useless quic.ServerConfig.CipherSuites param

* Fix log style in proxy.peer.quic

* Move duplicatePeerMsg into the function scope

Co-authored-by: Alan Parra <[email protected]>

---------

Co-authored-by: Alan Parra <[email protected]>
  • Loading branch information
espadolini and codingllama authored Nov 13, 2024
1 parent 8ea9e7d commit 9bea5e1
Show file tree
Hide file tree
Showing 17 changed files with 362 additions and 235 deletions.
8 changes: 4 additions & 4 deletions api/types/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1066,10 +1066,10 @@ const (
// group they should attempt to be connected to.
ProxyGroupGenerationLabel = TeleportInternalLabelPrefix + "proxygroup-gen"

// ProxyPeerQUICLabel is the internal-user label for proxy heartbeats that's
// used to signal that the proxy supports receiving proxy peering
// connections over QUIC.
ProxyPeerQUICLabel = TeleportInternalLabelPrefix + "proxy-peer-quic"
// UnstableProxyPeerQUICLabel is the internal-use label for proxy heartbeats
// that's used to signal that the proxy supports receiving proxy peering
// connections over QUIC. The value should be "yes".
UnstableProxyPeerQUICLabel = TeleportInternalLabelPrefix + "proxy-peer-quic"

// OktaAppNameLabel is the individual app name label.
OktaAppNameLabel = TeleportInternalLabelPrefix + "okta-app-name"
Expand Down
5 changes: 5 additions & 0 deletions lib/config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -2730,6 +2730,11 @@ func Configure(clf *CommandLineFlags, cfg *servicecfg.Config, legacyAppFlags boo
cfg.DebugService.Enabled = false
}

// TODO(espadolini): allow this when the implementation is merged
if false && os.Getenv("TELEPORT_UNSTABLE_QUIC_PROXY_PEERING") == "yes" {
cfg.Proxy.QUICProxyPeering = true
}

return nil
}

Expand Down
60 changes: 0 additions & 60 deletions lib/proxy/clusterdial/dial.go

This file was deleted.

89 changes: 32 additions & 57 deletions lib/proxy/peer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
streamutils "github.com/gravitational/teleport/api/utils/grpc/stream"
"github.com/gravitational/teleport/lib/auth/authclient"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/proxy/peer/internal"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/utils"
)
Expand Down Expand Up @@ -94,11 +95,11 @@ type ClientConfig struct {
}

// connShuffler shuffles the order of client connections.
type connShuffler func([]clientConn)
type connShuffler func([]internal.ClientConn)

// randomConnShuffler returns a conn shuffler that randomizes the order of connections.
func randomConnShuffler() connShuffler {
return func(conns []clientConn) {
return func(conns []internal.ClientConn) {
rand.Shuffle(len(conns), func(i, j int) {
conns[i], conns[j] = conns[j], conns[i]
})
Expand All @@ -107,7 +108,7 @@ func randomConnShuffler() connShuffler {

// noopConnShutffler returns a conn shuffler that keeps the original connection ordering.
func noopConnShuffler() connShuffler {
return func([]clientConn) {}
return func([]internal.ClientConn) {}
}

// checkAndSetDefaults checks and sets default values
Expand Down Expand Up @@ -163,32 +164,6 @@ func (c *ClientConfig) checkAndSetDefaults() error {
return nil
}

// clientConn manages client connections to a specific peer proxy (with a fixed
// host ID and address).
type clientConn interface {
// peerID returns the host ID of the peer proxy.
peerID() string
// peerAddr returns the address of the peer proxy.
peerAddr() string

// dial opens a connection of a given tunnel type to a node with the given
// ID through the peer proxy managed by the clientConn.
dial(
nodeID string,
src net.Addr,
dst net.Addr,
tunnelType types.TunnelType,
) (net.Conn, error)

// close closes all connections and releases any background resources
// immediately.
close() error

// shutdown waits until all connections are closed or the context is done,
// then acts like close.
shutdown(context.Context)
}

// grpcClientConn manages client connections to a specific peer proxy over gRPC.
type grpcClientConn struct {
cc *grpc.ClientConn
Expand All @@ -205,13 +180,13 @@ type grpcClientConn struct {
count int
}

var _ clientConn = (*grpcClientConn)(nil)
var _ internal.ClientConn = (*grpcClientConn)(nil)

// peerID implements [clientConn].
func (c *grpcClientConn) peerID() string { return c.id }
// PeerID implements [internal.ClientConn].
func (c *grpcClientConn) PeerID() string { return c.id }

// peerAddr implements [clientConn].
func (c *grpcClientConn) peerAddr() string { return c.addr }
// PeerAddr implements [internal.ClientConn].
func (c *grpcClientConn) PeerAddr() string { return c.addr }

// maybeAcquire returns a non-nil release func if the grpcClientConn is
// currently allowed to open connections; i.e., if it hasn't fully shut down.
Expand All @@ -234,8 +209,8 @@ func (c *grpcClientConn) maybeAcquire() (release func()) {
})
}

// shutdown implements [clientConn].
func (c *grpcClientConn) shutdown(ctx context.Context) {
// Shutdown implements [internal.ClientConn].
func (c *grpcClientConn) Shutdown(ctx context.Context) {
defer c.cc.Close()

c.mu.Lock()
Expand All @@ -255,13 +230,13 @@ func (c *grpcClientConn) shutdown(ctx context.Context) {
}
}

// close implements [clientConn].
func (c *grpcClientConn) close() error {
// Close implements [internal.ClientConn].
func (c *grpcClientConn) Close() error {
return c.cc.Close()
}

// dial implements [clientConn].
func (c *grpcClientConn) dial(
// Dial implements [internal.ClientConn].
func (c *grpcClientConn) Dial(
nodeID string,
src net.Addr,
dst net.Addr,
Expand Down Expand Up @@ -335,7 +310,7 @@ type Client struct {
cancel context.CancelFunc

config ClientConfig
conns map[string]clientConn
conns map[string]internal.ClientConn
metrics *clientMetrics
reporter *reporter
}
Expand All @@ -360,7 +335,7 @@ func NewClient(config ClientConfig) (*Client, error) {
config: config,
ctx: closeContext,
cancel: cancel,
conns: make(map[string]clientConn),
conns: make(map[string]internal.ClientConn),
metrics: metrics,
reporter: reporter,
}
Expand Down Expand Up @@ -453,7 +428,7 @@ func (c *Client) updateConnections(proxies []types.Server) error {
}

var toDelete []string
toKeep := make(map[string]clientConn)
toKeep := make(map[string]internal.ClientConn)
for id, conn := range c.conns {
proxy, ok := toDial[id]

Expand All @@ -464,7 +439,7 @@ func (c *Client) updateConnections(proxies []types.Server) error {
}

// peer address changed
if conn.peerAddr() != proxy.GetPeerAddr() {
if conn.PeerAddr() != proxy.GetPeerAddr() {
toDelete = append(toDelete, id)
continue
}
Expand All @@ -485,8 +460,8 @@ func (c *Client) updateConnections(proxies []types.Server) error {
}

// establish new connections
_, supportsQuic := proxy.GetLabel(types.ProxyPeerQUICLabel)
conn, err := c.connect(id, proxy.GetPeerAddr(), supportsQuic)
supportsQUIC, _ := proxy.GetLabel(types.UnstableProxyPeerQUICLabel)
conn, err := c.connect(id, proxy.GetPeerAddr(), supportsQUIC == "yes")
if err != nil {
c.metrics.reportTunnelError(errorProxyPeerTunnelDial)
c.config.Log.DebugContext(c.ctx, "error dialing peer proxy", "peer_id", id, "peer_addr", proxy.GetPeerAddr())
Expand All @@ -503,7 +478,7 @@ func (c *Client) updateConnections(proxies []types.Server) error {

for _, id := range toDelete {
if conn, ok := c.conns[id]; ok {
go conn.shutdown(c.ctx)
go conn.Shutdown(c.ctx)
}
}
c.conns = toKeep
Expand Down Expand Up @@ -556,9 +531,9 @@ func (c *Client) Shutdown(ctx context.Context) {
var wg sync.WaitGroup
for _, conn := range c.conns {
wg.Add(1)
go func(conn clientConn) {
go func(conn internal.ClientConn) {
defer wg.Done()
conn.shutdown(ctx)
conn.Shutdown(ctx)
}(conn)
}
wg.Wait()
Expand All @@ -572,7 +547,7 @@ func (c *Client) Stop() error {

var errs []error
for _, conn := range c.conns {
if err := conn.close(); err != nil {
if err := conn.Close(); err != nil {
errs = append(errs, err)
}
}
Expand Down Expand Up @@ -627,7 +602,7 @@ func (c *Client) dial(

var errs []error
for _, clientConn := range conns {
conn, err := clientConn.dial(nodeID, src, dst, tunnelType)
conn, err := clientConn.Dial(nodeID, src, dst, tunnelType)
if err != nil {
errs = append(errs, trace.Wrap(err))
continue
Expand All @@ -643,13 +618,13 @@ func (c *Client) dial(
// otherwise.
// The boolean returned in the second argument is intended for testing purposes,
// to indicates whether the connection was cached or newly established.
func (c *Client) getConnections(proxyIDs []string) ([]clientConn, bool, error) {
func (c *Client) getConnections(proxyIDs []string) ([]internal.ClientConn, bool, error) {
if len(proxyIDs) == 0 {
return nil, false, trace.BadParameter("failed to dial: no proxy ids given")
}

ids := make(map[string]struct{})
var conns []clientConn
var conns []internal.ClientConn

// look for existing matching connections.
c.RLock()
Expand Down Expand Up @@ -686,8 +661,8 @@ func (c *Client) getConnections(proxyIDs []string) ([]clientConn, bool, error) {
continue
}

_, supportsQuic := proxy.GetLabel(types.ProxyPeerQUICLabel)
conn, err := c.connect(id, proxy.GetPeerAddr(), supportsQuic)
supportsQUIC, _ := proxy.GetLabel(types.UnstableProxyPeerQUICLabel)
conn, err := c.connect(id, proxy.GetPeerAddr(), supportsQUIC == "yes")
if err != nil {
c.metrics.reportTunnelError(errorProxyPeerTunnelDirectDial)
c.config.Log.DebugContext(c.ctx, "error direct dialing peer proxy", "peer_id", id, "peer_addr", proxy.GetPeerAddr())
Expand All @@ -707,15 +682,15 @@ func (c *Client) getConnections(proxyIDs []string) ([]clientConn, bool, error) {
defer c.Unlock()

for _, conn := range conns {
c.conns[conn.peerID()] = conn
c.conns[conn.PeerID()] = conn
}

c.config.connShuffler(conns)
return conns, false, nil
}

// connect dials a new connection to proxyAddr.
func (c *Client) connect(peerID string, peerAddr string, supportsQUIC bool) (clientConn, error) {
func (c *Client) connect(peerID string, peerAddr string, supportsQUIC bool) (internal.ClientConn, error) {
if supportsQUIC && c.config.QUICTransport != nil {
panic("QUIC proxy peering is not implemented")
}
Expand Down
3 changes: 2 additions & 1 deletion lib/proxy/peer/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/gravitational/teleport/api/client/proto"
clientapi "github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/proxy/peer/internal"
"github.com/gravitational/teleport/lib/utils"
)

Expand Down Expand Up @@ -208,7 +209,7 @@ func TestBackupClient(t *testing.T) {
require.True(t, dialCalled)
}

func waitForGRPCConns(t *testing.T, conns map[string]clientConn, d time.Duration) {
func waitForGRPCConns(t *testing.T, conns map[string]internal.ClientConn, d time.Duration) {
require.Eventually(t, func() bool {
for _, conn := range conns {
// panic if we hit a non-grpc client conn
Expand Down
7 changes: 3 additions & 4 deletions lib/proxy/peer/credentials.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"google.golang.org/grpc/credentials"

"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/proxy/peer/internal"
"github.com/gravitational/teleport/lib/tlsca"
)

Expand Down Expand Up @@ -74,15 +75,13 @@ func (c *clientCredentials) ClientHandshake(ctx context.Context, laddr string, c
}

if err := validatePeer(c.peerID, identity); err != nil {
c.log.ErrorContext(ctx, duplicatePeerMsg, "peer_addr", c.peerAddr, "peer_id", c.peerID)
internal.LogDuplicatePeer(ctx, c.log, slog.LevelError, "peer_addr", c.peerAddr, "peer_id", c.peerID)
return nil, nil, trace.Wrap(err)
}

return conn, authInfo, nil
}

const duplicatePeerMsg = "Detected multiple Proxy Peers with the same public address when connecting to a Proxy which can lead to inconsistent state and problems establishing sessions. For best results ensure that `peer_public_addr` is unique per proxy and not a load balancer."

// getIdentity returns a [tlsca.Identity] that is created from the certificate
// presented during the TLS handshake.
func getIdentity(authInfo credentials.AuthInfo) (*tlsca.Identity, error) {
Expand Down Expand Up @@ -121,5 +120,5 @@ func validatePeer(peerID string, identity *tlsca.Identity) error {
return nil
}

return trace.AccessDenied("connected to unexpected proxy")
return trace.Wrap(internal.WrongProxyError{})
}
Loading

0 comments on commit 9bea5e1

Please sign in to comment.