[vnet][5] app proxying #41033

Merged
merged 13 commits on May 28, 2024
6 changes: 3 additions & 3 deletions integration/proxy/teleterm_test.go
@@ -51,14 +51,14 @@ import (
wantypes "github.com/gravitational/teleport/lib/auth/webauthntypes"
"github.com/gravitational/teleport/lib/client"
libclient "github.com/gravitational/teleport/lib/client"
"github.com/gravitational/teleport/lib/client/clientcache"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/service"
"github.com/gravitational/teleport/lib/service/servicecfg"
"github.com/gravitational/teleport/lib/teleterm/api/uri"
"github.com/gravitational/teleport/lib/teleterm/clusters"
"github.com/gravitational/teleport/lib/teleterm/daemon"
"github.com/gravitational/teleport/lib/teleterm/gateway"
"github.com/gravitational/teleport/lib/teleterm/services/clientcache"
"github.com/gravitational/teleport/lib/utils"
)

@@ -200,8 +200,8 @@ func testGatewayCertRenewal(ctx context.Context, t *testing.T, params gatewayCer
CreateTshdEventsClientCredsFunc: func() (grpc.DialOption, error) {
return grpc.WithTransportCredentials(insecure.NewCredentials()), nil
},
CreateClientCacheFunc: func(resolveCluster daemon.ResolveClusterFunc) daemon.ClientCache {
return clientcache.NewNoCache(clientcache.ResolveClusterFunc(resolveCluster))
CreateClientCacheFunc: func(newClient clientcache.NewClientFunc) (daemon.ClientCache, error) {
return clientcache.NewNoCache(newClient), nil
},
KubeconfigsDir: t.TempDir(),
AgentsDir: t.TempDir(),
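For context, here is a minimal sketch of how a non-test CreateClientCacheFunc could be wired under the new signature, which now receives a clientcache.NewClientFunc and returns an error. Everything below is illustrative: the package name, the newClientCache helper, and the use of client.RetryWithRelogin as the relogin wrapper are assumptions, not code from this PR.

package example // illustrative sketch, not part of this PR

import (
	"github.com/gravitational/trace"

	"github.com/gravitational/teleport/lib/client"
	"github.com/gravitational/teleport/lib/client/clientcache"
	"github.com/gravitational/teleport/lib/teleterm/daemon"
)

// newClientCache mirrors the updated CreateClientCacheFunc shape from the test above:
// it takes a clientcache.NewClientFunc and returns (daemon.ClientCache, error).
// client.RetryWithRelogin stands in for whatever relogin wrapper the daemon really uses.
func newClientCache(newClient clientcache.NewClientFunc) (daemon.ClientCache, error) {
	cache, err := clientcache.New(clientcache.Config{
		NewClientFunc:        newClient,
		RetryWithReloginFunc: client.RetryWithRelogin,
	})
	if err != nil {
		return nil, trace.Wrap(err)
	}
	return cache, nil // assumes *clientcache.Cache satisfies daemon.ClientCache
}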
40 changes: 36 additions & 4 deletions lib/client/api.go
@@ -601,9 +601,9 @@ func RetryWithRelogin(ctx context.Context, tc *TeleportClient, fn func() error,
case !IsErrorResolvableWithRelogin(fnErr):
return trace.Wrap(fnErr)
}
opt := retryWithReloginOptions{}
opt := defaultRetryWithReloginOptions()
for _, o := range opts {
o(&opt)
o(opt)
}
log.Debugf("Activating relogin on error=%q (type=%T)", fnErr, trace.Unwrap(fnErr))

@@ -650,11 +650,17 @@ func RetryWithRelogin(ctx context.Context, tc *TeleportClient, fn func() error,
}

// Save profile to record proxy credentials
if err := tc.SaveProfile(true); err != nil {
if err := tc.SaveProfile(opt.makeCurrentProfile); err != nil {
log.Warningf("Failed to save profile: %v", err)
return trace.Wrap(err)
}

if opt.afterLoginHook != nil {
if err := opt.afterLoginHook(); err != nil {
return trace.Wrap(err)
}
}

return fn()
}

@@ -666,16 +672,42 @@ type RetryWithReloginOption func(*retryWithReloginOptions)
type retryWithReloginOptions struct {
// beforeLoginHook is a function that will be called before the login attempt.
beforeLoginHook func() error
// afterLoginHook is a function that will be called after a successful login.
afterLoginHook func() error
// makeCurrentProfile determines whether to update the current profile after login.
makeCurrentProfile bool
}

// WithBeforeLogin is a functional option for configuring a function that will
func defaultRetryWithReloginOptions() *retryWithReloginOptions {
return &retryWithReloginOptions{
makeCurrentProfile: true,
}
}

// WithBeforeLoginHook is a functional option for configuring a function that will
// be called before the login attempt.
func WithBeforeLoginHook(fn func() error) RetryWithReloginOption {
return func(o *retryWithReloginOptions) {
o.beforeLoginHook = fn
}
}

// WithAfterLoginHook is a functional option for configuring a function that will
// be called after a successful login.
func WithAfterLoginHook(fn func() error) RetryWithReloginOption {
return func(o *retryWithReloginOptions) {
o.afterLoginHook = fn
}
}

// WithMakeCurrentProfile is a functional option for configuring whether to update the current profile after a
// successful login.
func WithMakeCurrentProfile(makeCurrentProfile bool) RetryWithReloginOption {
return func(o *retryWithReloginOptions) {
o.makeCurrentProfile = makeCurrentProfile
}
}

// IsErrorResolvableWithRelogin returns true if relogin is attempted on `err`.
func IsErrorResolvableWithRelogin(err error) bool {
// Private key policy errors indicate that the user must login with an
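A short usage sketch of the new functional options added above. The dialWithoutSwitchingProfile helper and the refreshCache callback are made up for illustration; only RetryWithRelogin, WithMakeCurrentProfile, and WithAfterLoginHook come from this diff.

package example // usage sketch, not code from this PR

import (
	"context"

	"github.com/gravitational/trace"

	"github.com/gravitational/teleport/lib/client"
)

// dialWithoutSwitchingProfile retries a cluster dial with relogin, but keeps the current
// profile untouched on success and runs a caller-supplied hook after a successful login.
func dialWithoutSwitchingProfile(ctx context.Context, tc *client.TeleportClient, refreshCache func() error) error {
	err := client.RetryWithRelogin(ctx, tc, func() error {
		clt, err := tc.ConnectToCluster(ctx)
		if err != nil {
			return trace.Wrap(err)
		}
		return trace.Wrap(clt.Close())
	},
		client.WithMakeCurrentProfile(false),
		client.WithAfterLoginHook(refreshCache),
	)
	return trace.Wrap(err)
}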
@@ -27,66 +27,90 @@ import (

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/lib/client"
"github.com/gravitational/teleport/lib/teleterm/api/uri"
"github.com/gravitational/teleport/lib/teleterm/clusters"
)

// Cache stores clients keyed by cluster URI.
// Cache stores clients keyed by profile name and leaf cluster name.
// Safe for concurrent access.
// Closes all clients and wipes the cache on Clear.
type Cache struct {
cfg Config
mu sync.Mutex
// clients keep mapping between cluster URI
// (both root and leaf) and cluster clients
clients map[uri.ResourceURI]*client.ClusterClient
// group prevents duplicate requests to create clients
// for a given cluster URI
mu sync.RWMutex
// clients keeps a mapping from key (profile name and leaf cluster name) to cluster client.
clients map[key]*client.ClusterClient
// group prevents duplicate requests to create clients for a given cluster.
group singleflight.Group
}

type ResolveClusterFunc func(uri uri.ResourceURI) (*clusters.Cluster, *client.TeleportClient, error)
// NewClientFunc is a function that will return a new [*client.TeleportClient] for a given profile and leaf
// cluster. [leafClusterName] may be empty, in which case implementations should return a client for the root cluster.
type NewClientFunc func(ctx context.Context, profileName, leafClusterName string) (*client.TeleportClient, error)

// RetryWithReloginFunc is a function that should call [fn], and if it fails with an error that may be
// resolved with a cluster relogin, attempts the relogin and calls [fn] again if the relogin is successful.
type RetryWithReloginFunc func(ctx context.Context, tc *client.TeleportClient, fn func() error, opts ...client.RetryWithReloginOption) error

// Config describes the client cache configuration.
type Config struct {
ResolveClusterFunc ResolveClusterFunc
Log logrus.FieldLogger
NewClientFunc NewClientFunc
RetryWithReloginFunc RetryWithReloginFunc
Log logrus.FieldLogger
}

func (c *Config) checkAndSetDefaults() {
func (c *Config) checkAndSetDefaults() error {
if c.NewClientFunc == nil {
return trace.BadParameter("NewClientFunc is required")
}
if c.RetryWithReloginFunc == nil {
return trace.BadParameter("RetryWithReloginFunc is required")
}
if c.Log == nil {
c.Log = logrus.WithField(teleport.ComponentKey, "clientcache")
}
return nil
}

type key struct {
profile string
leafCluster string
}

func (k key) String() string {
if k.leafCluster != "" {
return k.profile + "/" + k.leafCluster
}
return k.profile
}

// New creates an instance of Cache.
func New(c Config) *Cache {
c.checkAndSetDefaults()
func New(c Config) (*Cache, error) {
if err := c.checkAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}

return &Cache{
cfg: c,
clients: make(map[uri.ResourceURI]*client.ClusterClient),
}
clients: make(map[key]*client.ClusterClient),
}, nil
}

// Get returns a client from the cache if there is one,
// otherwise it dials the remote server.
// Get returns a client from the cache if there is one, otherwise it dials the remote server.
// The caller should not close the returned client.
func (c *Cache) Get(ctx context.Context, clusterURI uri.ResourceURI) (*client.ClusterClient, error) {
groupClt, err, _ := c.group.Do(clusterURI.String(), func() (any, error) {
if fromCache := c.getFromCache(clusterURI); fromCache != nil {
c.cfg.Log.WithField("cluster", clusterURI.String()).Info("Retrieved client from cache.")
func (c *Cache) Get(ctx context.Context, profileName, leafClusterName string) (*client.ClusterClient, error) {
k := key{profile: profileName, leafCluster: leafClusterName}
groupClt, err, _ := c.group.Do(k.String(), func() (any, error) {
if fromCache := c.getFromCache(k); fromCache != nil {
c.cfg.Log.WithField("cluster", k).Debug("Retrieved client from cache.")
return fromCache, nil
}

_, clusterClient, err := c.cfg.ResolveClusterFunc(clusterURI)
tc, err := c.cfg.NewClientFunc(ctx, profileName, leafClusterName)
if err != nil {
return nil, trace.Wrap(err)
}

var newClient *client.ClusterClient
if err := clusters.AddMetadataToRetryableError(ctx, func() error {
clt, err := clusterClient.ConnectToCluster(ctx)
if err := c.cfg.RetryWithReloginFunc(ctx, tc, func() error {
clt, err := tc.ConnectToCluster(ctx)
if err != nil {
return trace.Wrap(err)
}
@@ -96,12 +120,10 @@ func (c *Cache) Get(ctx context.Context, clusterURI uri.ResourceURI) (*client.Cl
return nil, trace.Wrap(err)
}

// We'll save the client in the cache, so we don't have to
// build a new connection next time.
// All cached clients will be closed when the daemon exits.
c.addToCache(clusterURI, newClient)
// Save the client in the cache, so we don't have to build a new connection next time.
c.addToCache(k, newClient)

c.cfg.Log.WithField("cluster", clusterURI.String()).Info("Added client to cache.")
c.cfg.Log.WithField("cluster", k).Info("Added client to cache.")

return newClient, nil
})
@@ -117,30 +139,28 @@ func (c *Cache) Get(ctx context.Context, clusterURI uri.ResourceURI) (*client.Cl
return clt, nil
}

// ClearForRoot closes and removes clients from the cache
// for the root cluster and its leaf clusters.
func (c *Cache) ClearForRoot(clusterURI uri.ResourceURI) error {
// ClearForRoot closes and removes clients from the cache for the root cluster and its leaf clusters.
func (c *Cache) ClearForRoot(profileName string) error {
c.mu.Lock()
defer c.mu.Unlock()

rootClusterURI := clusterURI.GetRootClusterURI()
var (
errors []error
deleted []string
)

for resourceURI, clt := range c.clients {
if resourceURI.GetRootClusterURI() == rootClusterURI {
for k, clt := range c.clients {
if k.profile == profileName {
if err := clt.Close(); err != nil {
errors = append(errors, err)
}
deleted = append(deleted, resourceURI.GetClusterURI().String())
delete(c.clients, resourceURI)
deleted = append(deleted, k.String())
delete(c.clients, k)
}
}

c.cfg.Log.WithFields(
logrus.Fields{"cluster": rootClusterURI.String(), "clients": deleted},
logrus.Fields{"cluster": profileName, "clients": deleted},
).Info("Invalidated cached clients for root cluster.")

return trace.NewAggregate(errors...)
@@ -163,18 +183,18 @@ func (c *Cache) Clear() error {
return trace.NewAggregate(errors...)
}

func (c *Cache) addToCache(clusterURI uri.ResourceURI, clusterClient *client.ClusterClient) {
func (c *Cache) addToCache(k key, clusterClient *client.ClusterClient) {
c.mu.Lock()
defer c.mu.Unlock()

c.clients[clusterURI] = clusterClient
c.clients[k] = clusterClient
}

func (c *Cache) getFromCache(clusterURI uri.ResourceURI) *client.ClusterClient {
c.mu.Lock()
defer c.mu.Unlock()
func (c *Cache) getFromCache(k key) *client.ClusterClient {
c.mu.RLock()
defer c.mu.RUnlock()

clt := c.clients[clusterURI]
clt := c.clients[k]
return clt
}

@@ -183,24 +203,24 @@ func (c *Cache) getFromCache(clusterURI uri.ResourceURI) *client.ClusterClient {
//
// ClearForRoot and Clear still work as expected.
type NoCache struct {
mu sync.Mutex
resolveClusterFunc ResolveClusterFunc
clients []noCacheClient
mu sync.Mutex
newClientFunc NewClientFunc
clients []noCacheClient
}

type noCacheClient struct {
uri uri.ResourceURI
k key
client *client.ClusterClient
}

func NewNoCache(resolveClusterFunc ResolveClusterFunc) *NoCache {
func NewNoCache(newClientFunc NewClientFunc) *NoCache {
return &NoCache{
resolveClusterFunc: resolveClusterFunc,
newClientFunc: newClientFunc,
}
}

func (c *NoCache) Get(ctx context.Context, clusterURI uri.ResourceURI) (*client.ClusterClient, error) {
_, clusterClient, err := c.resolveClusterFunc(clusterURI)
func (c *NoCache) Get(ctx context.Context, profileName, leafClusterName string) (*client.ClusterClient, error) {
clusterClient, err := c.newClientFunc(ctx, profileName, leafClusterName)
if err != nil {
return nil, trace.Wrap(err)
}
@@ -212,25 +232,24 @@ func (c *NoCache) Get(ctx context.Context, clusterURI uri.ResourceURI) (*client.

c.mu.Lock()
c.clients = append(c.clients, noCacheClient{
uri: clusterURI,
k: key{profile: profileName, leafCluster: leafClusterName},
client: newClient,
})
c.mu.Unlock()

return newClient, nil
}

func (c *NoCache) ClearForRoot(clusterURI uri.ResourceURI) error {
func (c *NoCache) ClearForRoot(profileName string) error {
c.mu.Lock()
defer c.mu.Unlock()

rootClusterURI := clusterURI.GetRootClusterURI()
var (
errors []error
)

c.clients = slices.DeleteFunc(c.clients, func(ncc noCacheClient) bool {
belongsToCluster := ncc.uri.GetRootClusterURI() == rootClusterURI
belongsToCluster := ncc.k.profile == profileName

if belongsToCluster {
if err := ncc.client.Close(); err != nil {
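To make the API shift above concrete, here is a hedged usage sketch of the reworked cache: New now validates its Config and returns an error, Get is keyed by profile and leaf cluster name rather than a cluster URI, and ClearForRoot takes a profile name. The useCache helper, the profile name, and passing client.RetryWithRelogin directly are illustrative assumptions; real callers likely wrap relogin differently.

package example // usage sketch, not code from this PR

import (
	"context"

	"github.com/gravitational/trace"

	"github.com/gravitational/teleport/lib/client"
	"github.com/gravitational/teleport/lib/client/clientcache"
)

// useCache builds a Cache and fetches a root-cluster client for a profile.
func useCache(ctx context.Context, newClient clientcache.NewClientFunc) error {
	cache, err := clientcache.New(clientcache.Config{
		NewClientFunc:        newClient,
		RetryWithReloginFunc: client.RetryWithRelogin, // stand-in; real callers may wrap this
	})
	if err != nil {
		return trace.Wrap(err)
	}

	// An empty leaf cluster name yields a client for the root cluster.
	clt, err := cache.Get(ctx, "example.teleport.sh", "")
	if err != nil {
		return trace.Wrap(err)
	}
	_ = clt // callers must not close clients returned by the cache

	// Close and drop every cached client for this profile, including its leaf clusters.
	return trace.Wrap(cache.ClearForRoot("example.teleport.sh"))
}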
3 changes: 1 addition & 2 deletions lib/client/local_proxy_middleware.go
@@ -22,7 +22,6 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"net"
"time"

"github.com/gravitational/trace"
@@ -76,7 +75,7 @@ func NewAppCertChecker(tc *TeleportClient, appRoute proto.RouteToApp, clock cloc

// OnNewConnection is a callback triggered when a new downstream connection is
// accepted by the local proxy.
func (c *CertChecker) OnNewConnection(ctx context.Context, lp *alpnproxy.LocalProxy, conn net.Conn) error {
Member:
If it's possible to extract the local VNet app address from net.Conn, it'd be beneficial to include it here. When I was adding telemetry, I ran into an issue where I couldn't tell the user which app they were trying to access, because types.Application alone obviously doesn't have this info.

OTOH, wouldn't net.Conn here, in the context of VNet, be the connection between the local proxy and the proxy service? 🤔 So it still wouldn't tell us whether someone tried to open app.cluster.com or app.custom-dns-zone.com.

Anyway, I can handle adding this down the line when I work on showing recent connections in Connect.

Contributor Author:
What do you mean by "I cannot tell the user which app they were trying to access, because types.Application alone doesn't have this info"? It seems like you have all the info you'd need about which app is being accessed here: #41587

But no, I don't think you could get any extra info from the net.Conn here other than the client-side and VNet IP addresses, and I don't see why we'd need those. We wouldn't be able to get the DNS name the user is trying to connect to from the conn.

Member:
Sorry, I should've said "which DNS name" rather than "which app they were trying to access".

Displaying the DNS name will be useful once I get around to showing connections in the UI.

Contributor Author:
The only valid DNS name is the app's public_addr, though.

Member:
I was thinking about edge cases such as an app with a custom public_addr, say example.com, being accessed over app.cluster.com anyway – this should still work, right? In that case, types.Application.GetPublicAddr would return example.com instead of app.cluster.com.

Contributor Author:
That won't work; we only query for apps by their public_addr.

func (c *CertChecker) OnNewConnection(ctx context.Context, lp *alpnproxy.LocalProxy) error {
return trace.Wrap(c.ensureValidCerts(ctx, lp))
}

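Since OnNewConnection now receives only the context and the local proxy, any wrapper written around it follows the same shape. Below is a minimal sketch; the alpnproxy import path is assumed from Teleport's source layout, and the connCallback interface and loggingCallback type are made up for illustration, not part of this PR.

package example // sketch only, not code from this PR

import (
	"context"
	"log"

	"github.com/gravitational/trace"

	"github.com/gravitational/teleport/lib/srv/alpnproxy"
)

// connCallback matches the updated callback shape: no net.Conn is passed anymore.
type connCallback interface {
	OnNewConnection(ctx context.Context, lp *alpnproxy.LocalProxy) error
}

// loggingCallback wraps another callback (for example a CertChecker) and logs before
// delegating; any per-connection detail previously read from net.Conn must now come
// from elsewhere, which is what the review thread above discusses.
type loggingCallback struct {
	next connCallback
}

func (m loggingCallback) OnNewConnection(ctx context.Context, lp *alpnproxy.LocalProxy) error {
	log.Println("local proxy accepted a new downstream connection")
	return trace.Wrap(m.next.OnNewConnection(ctx, lp))
}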