From f84405d5d333d9a9c09ff7b98d47437a1af09ad5 Mon Sep 17 00:00:00 2001
From: rosstimothy <39066650+rosstimothy@users.noreply.github.com>
Date: Wed, 13 Nov 2024 11:30:19 -0500
Subject: [PATCH] Resolve and log endpoints of AWS services (#48710)
Includes the AWS endpoints in log output to allow easier
inspection of which URIs are being hit based on configuration.
Closes https://github.com/gravitational/customer-sensitive-requests/issues/333.
---
lib/backend/dynamo/dynamodbbk.go | 37 ++++++-
lib/events/dynamoevents/dynamoevents.go | 24 ++++-
lib/events/s3sessions/s3handler.go | 32 ++++++-
lib/utils/aws/endpoint/resolver.go | 75 +++++++++++++++
lib/utils/aws/endpoint/resolver_test.go | 122 ++++++++++++++++++++++++
5 files changed, 280 insertions(+), 10 deletions(-)
create mode 100644 lib/utils/aws/endpoint/resolver.go
create mode 100644 lib/utils/aws/endpoint/resolver_test.go
diff --git a/lib/backend/dynamo/dynamodbbk.go b/lib/backend/dynamo/dynamodbbk.go
index 0456c29924b09..44f9774584b7e 100644
--- a/lib/backend/dynamo/dynamodbbk.go
+++ b/lib/backend/dynamo/dynamodbbk.go
@@ -49,6 +49,7 @@ import (
"github.com/gravitational/teleport/lib/modules"
awsmetrics "github.com/gravitational/teleport/lib/observability/metrics/aws"
dynamometrics "github.com/gravitational/teleport/lib/observability/metrics/dynamo"
+ "github.com/gravitational/teleport/lib/utils/aws/endpoint"
)
func init() {
@@ -270,7 +271,18 @@ func New(ctx context.Context, params backend.Params) (*Backend, error) {
return nil, trace.Wrap(err)
}
- var dynamoOpts []func(*dynamodb.Options)
+ dynamoResolver, err := endpoint.NewLoggingResolver(
+ dynamodb.NewDefaultEndpointResolverV2(),
+ l.With(slog.Group("service",
+ "id", dynamodb.ServiceID,
+ "api_version", dynamodb.ServiceAPIVersion,
+ )),
+ )
+ if err != nil {
+ return nil, trace.Wrap(err)
+ }
+
+ dynamoOpts := []func(*dynamodb.Options){dynamodb.WithEndpointResolverV2(dynamoResolver)}
// FIPS settings are applied on the individual service instead of the aws config,
// as DynamoDB Streams and Application Auto Scaling do not yet have FIPS endpoints in non-GovCloud.
@@ -283,19 +295,38 @@ func New(ctx context.Context, params backend.Params) (*Backend, error) {
otelaws.AppendMiddlewares(&awsConfig.APIOptions, otelaws.WithAttributeSetter(otelaws.DynamoDBAttributeSetter))
+ dynamoClient := dynamodb.NewFromConfig(awsConfig, dynamoOpts...)
+
+ streamsResolver, err := endpoint.NewLoggingResolver(
+ dynamodbstreams.NewDefaultEndpointResolverV2(),
+ l.With(slog.Group("service",
+ "id", dynamodbstreams.ServiceID,
+ "api_version", dynamodbstreams.ServiceAPIVersion,
+ )),
+ )
+ if err != nil {
+ return nil, trace.Wrap(err)
+ }
+
+ streamsClient := dynamodbstreams.NewFromConfig(awsConfig, dynamodbstreams.WithEndpointResolverV2(streamsResolver))
b := &Backend{
logger: l,
Config: *cfg,
clock: clockwork.NewRealClock(),
buf: backend.NewCircularBuffer(backend.BufferCapacity(cfg.BufferSize)),
- svc: dynamodb.NewFromConfig(awsConfig, dynamoOpts...),
- streams: dynamodbstreams.NewFromConfig(awsConfig),
+ svc: dynamoClient,
+ streams: streamsClient,
}
if err := b.configureTable(ctx, applicationautoscaling.NewFromConfig(awsConfig)); err != nil {
return nil, trace.Wrap(err)
}
+ l.InfoContext(ctx, "Connection established to DynamoDB state database",
+ "table", cfg.TableName,
+ "region", cfg.Region,
+ )
+
go func() {
if err := b.asyncPollStreams(ctx); err != nil {
b.logger.ErrorContext(ctx, "Stream polling loop exited", "error", err)
diff --git a/lib/events/dynamoevents/dynamoevents.go b/lib/events/dynamoevents/dynamoevents.go
index dcfccc7713e20..992c0b6118016 100644
--- a/lib/events/dynamoevents/dynamoevents.go
+++ b/lib/events/dynamoevents/dynamoevents.go
@@ -60,6 +60,7 @@ import (
awsmetrics "github.com/gravitational/teleport/lib/observability/metrics/aws"
dynamometrics "github.com/gravitational/teleport/lib/observability/metrics/dynamo"
"github.com/gravitational/teleport/lib/utils"
+ "github.com/gravitational/teleport/lib/utils/aws/endpoint"
)
const (
@@ -264,7 +265,7 @@ const (
// It's an implementation of backend API's NewFunc
func New(ctx context.Context, cfg Config) (*Log, error) {
l := slog.With(teleport.ComponentKey, teleport.ComponentDynamoDB)
- l.InfoContext(ctx, "Initializing event backend.")
+ l.InfoContext(ctx, "Initializing event backend")
err := cfg.CheckAndSetDefaults()
if err != nil {
@@ -288,7 +289,18 @@ func New(ctx context.Context, cfg Config) (*Log, error) {
opts = append(opts, config.WithCredentialsProvider(cfg.CredentialsProvider))
}
- var dynamoOpts []func(*dynamodb.Options)
+ resolver, err := endpoint.NewLoggingResolver(
+ dynamodb.NewDefaultEndpointResolverV2(),
+ l.With(slog.Group("service",
+ "id", dynamodb.ServiceID,
+ "api_version", dynamodb.ServiceAPIVersion,
+ )),
+ )
+ if err != nil {
+ return nil, trace.Wrap(err)
+ }
+
+ dynamoOpts := []func(*dynamodb.Options){dynamodb.WithEndpointResolverV2(resolver)}
// Override the service endpoint using the "endpoint" query parameter from
// "audit_events_uri". This is for non-AWS DynamoDB-compatible backends.
@@ -316,16 +328,22 @@ func New(ctx context.Context, cfg Config) (*Log, error) {
otelaws.AppendMiddlewares(&awsConfig.APIOptions, otelaws.WithAttributeSetter(otelaws.DynamoDBAttributeSetter))
+ client := dynamodb.NewFromConfig(awsConfig, dynamoOpts...)
b := &Log{
logger: l,
Config: cfg,
- svc: dynamodb.NewFromConfig(awsConfig, dynamoOpts...),
+ svc: client,
}
if err := b.configureTable(ctx, applicationautoscaling.NewFromConfig(awsConfig)); err != nil {
return nil, trace.Wrap(err)
}
+ l.InfoContext(ctx, "Connection established to DynamoDB events database",
+ "table", cfg.Tablename,
+ "region", cfg.Region,
+ )
+
return b, nil
}
diff --git a/lib/events/s3sessions/s3handler.go b/lib/events/s3sessions/s3handler.go
index b202e78422c8d..e29056806a484 100644
--- a/lib/events/s3sessions/s3handler.go
+++ b/lib/events/s3sessions/s3handler.go
@@ -47,6 +47,7 @@ import (
awsmetrics "github.com/gravitational/teleport/lib/observability/metrics/aws"
"github.com/gravitational/teleport/lib/session"
awsutils "github.com/gravitational/teleport/lib/utils/aws"
+ "github.com/gravitational/teleport/lib/utils/aws/endpoint"
)
// s3AllowedACL is the set of canned ACLs that S3 accepts
@@ -164,6 +165,7 @@ func NewHandler(ctx context.Context, cfg Config) (*Handler, error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
+ logger := slog.With(teleport.ComponentKey, teleport.SchemeS3)
opts := []func(*config.LoadOptions) error{
config.WithRegion(cfg.Region),
@@ -190,7 +192,19 @@ func NewHandler(ctx context.Context, cfg Config) (*Handler, error) {
opts = append(opts, config.WithAPIOptions(awsmetrics.MetricsMiddleware()))
- var s3Opts []func(*s3.Options)
+ resolver, err := endpoint.NewLoggingResolver(
+ s3.NewDefaultEndpointResolverV2(),
+ logger.With(slog.Group("service",
+ "id", s3.ServiceID,
+ "api_version", s3.ServiceAPIVersion,
+ )),
+ )
+ if err != nil {
+ return nil, trace.Wrap(err)
+ }
+
+ s3Opts := []func(*s3.Options){s3.WithEndpointResolverV2(resolver)}
+
if cfg.Endpoint != "" {
if _, err := url.Parse(cfg.Endpoint); err != nil {
return nil, trace.BadParameter("configured S3 endpoint is invalid: %s", err.Error())
@@ -221,7 +235,7 @@ func NewHandler(ctx context.Context, cfg Config) (*Handler, error) {
downloader := manager.NewDownloader(client)
h := &Handler{
- logger: slog.With(teleport.ComponentKey, teleport.SchemeS3),
+ logger: logger,
Config: cfg,
uploader: uploader,
downloader: downloader,
@@ -229,11 +243,21 @@ func NewHandler(ctx context.Context, cfg Config) (*Handler, error) {
}
start := time.Now()
- h.logger.InfoContext(ctx, "Setting up S3 bucket", "bucket", h.Bucket, "path", h.Path, "region", h.Region)
+ h.logger.InfoContext(ctx, "Setting up S3 bucket",
+ "bucket", h.Bucket,
+ "path", h.Path,
+ "region", h.Region,
+ )
if err := h.ensureBucket(ctx); err != nil {
return nil, trace.Wrap(err)
}
- h.logger.InfoContext(ctx, "Setting up bucket S3 completed.", "bucket", h.Bucket, "duration", time.Since(start))
+
+ h.logger.InfoContext(ctx, "Setting up bucket S3 completed",
+ "bucket", h.Bucket,
+ "path", h.Path,
+ "region", h.Region,
+ "duration", time.Since(start),
+ )
return h, nil
}
diff --git a/lib/utils/aws/endpoint/resolver.go b/lib/utils/aws/endpoint/resolver.go
new file mode 100644
index 0000000000000..8d5c96c6d510e
--- /dev/null
+++ b/lib/utils/aws/endpoint/resolver.go
@@ -0,0 +1,75 @@
+// Teleport
+// Copyright (C) 2024 Gravitational, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package endpoint
+
+import (
+ "context"
+ "log/slog"
+ "sync/atomic"
+
+ smithyendpoints "github.com/aws/smithy-go/endpoints"
+ "github.com/gravitational/trace"
+)
+
+// Resolver is a generalized version of an EndpointResolverV2 for
+// services in aws-sdk-go-v2. The generic parameter MUST match the
+// EndpointParameters of the aws service.
+type Resolver[P any] interface {
+ ResolveEndpoint(context.Context, P) (smithyendpoints.Endpoint, error)
+}
+
+// LoggingResolver is a [Resolver] implementation that logs
+// the resolved endpoint of the aws service. It will only
+// log the endpoint one the first resolution, and any later
+// resolutions in which the endpoint has changed to prevent
+// excess log spam.
+type LoggingResolver[P any] struct {
+ inner Resolver[P]
+ logger *slog.Logger
+
+ last atomic.Pointer[string]
+}
+
+// NewLoggingResolver creates a [LoggingResolver] that defers to the
+// provided [Resolver] to do the resolution.
+func NewLoggingResolver[P any](r Resolver[P], logger *slog.Logger) (*LoggingResolver[P], error) {
+ if r == nil {
+ return nil, trace.BadParameter("a valid resolver must be provided to the LoggingResolver")
+ }
+
+ if logger == nil {
+ logger = slog.Default()
+ }
+
+ return &LoggingResolver[P]{inner: r, logger: logger}, nil
+}
+
+// ResolveEndpoint implements [Resolver].
+func (r *LoggingResolver[P]) ResolveEndpoint(ctx context.Context, params P) (smithyendpoints.Endpoint, error) {
+ endpoint, err := r.inner.ResolveEndpoint(ctx, params)
+ if err != nil {
+ return endpoint, err
+ }
+
+ uri := endpoint.URI.String()
+ last := r.last.Swap(&uri)
+ if last == nil || *last != uri {
+ r.logger.InfoContext(ctx, "resolved endpoint for aws service", "uri", uri)
+ }
+
+ return endpoint, nil
+}
diff --git a/lib/utils/aws/endpoint/resolver_test.go b/lib/utils/aws/endpoint/resolver_test.go
new file mode 100644
index 0000000000000..3c542be884d14
--- /dev/null
+++ b/lib/utils/aws/endpoint/resolver_test.go
@@ -0,0 +1,122 @@
+// Teleport
+// Copyright (C) 2024 Gravitational, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package endpoint_test
+
+import (
+ "context"
+ "log/slog"
+ "net/url"
+ "slices"
+ "sync"
+ "testing"
+ "time"
+
+ smithyendpoints "github.com/aws/smithy-go/endpoints"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/gravitational/teleport/lib/utils/aws/endpoint"
+)
+
+type fakeResolver[P any] struct {
+ e smithyendpoints.Endpoint
+}
+
+func (f *fakeResolver[P]) ResolveEndpoint(ctx context.Context, params P) (smithyendpoints.Endpoint, error) {
+ return f.e, nil
+}
+
+type fakeHandler struct {
+ mu sync.Mutex
+ resolved []string
+ slog.Handler
+}
+
+func (*fakeHandler) Enabled(context.Context, slog.Level) bool {
+ return true
+}
+
+func (h *fakeHandler) Handle(ctx context.Context, r slog.Record) error {
+ h.mu.Lock()
+ defer h.mu.Unlock()
+ r.Attrs(func(attr slog.Attr) bool {
+ if attr.Key != "uri" {
+ return true
+ }
+
+ h.resolved = append(h.resolved, attr.Value.String())
+ return false
+ })
+
+ return nil
+}
+
+func (h *fakeHandler) Resolved() []string {
+ h.mu.Lock()
+ defer h.mu.Unlock()
+
+ return slices.Clone(h.resolved)
+}
+
+func TestResolution(t *testing.T) {
+ ctx := context.Background()
+
+ handler := &fakeHandler{}
+
+ expected := smithyendpoints.Endpoint{URI: url.URL{Scheme: "https", Host: "example.com"}}
+ fake := &fakeResolver[string]{e: expected}
+ r, err := endpoint.NewLoggingResolver(fake, slog.New(handler))
+ require.NoError(t, err)
+
+ // Resolve the same endpoint several times and validate that
+ // it is only emitted once.
+ var wg sync.WaitGroup
+ barrier := make(chan struct{})
+ for i := 0; i < 5; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+
+ <-barrier
+ resolved, err := r.ResolveEndpoint(ctx, "test")
+ assert.NoError(t, err)
+ assert.Equal(t, expected, resolved)
+
+ switch res := handler.Resolved(); len(res) {
+ case 0:
+ assert.EventuallyWithT(t, func(t *assert.CollectT) {
+ assert.Equal(t, []string{expected.URI.String()}, handler.Resolved())
+ }, 5*time.Second, 100*time.Millisecond)
+ case 1:
+ assert.Equal(t, []string{expected.URI.String()}, res)
+ }
+ }()
+ }
+
+ close(barrier)
+ wg.Wait()
+
+ // Alter the resolved endpoint
+ expected = smithyendpoints.Endpoint{URI: url.URL{Scheme: "test", Host: "example.com"}}
+ fake.e = expected
+
+ // Resolve again and validate that the new endpoint is emitted
+ resolved, err := r.ResolveEndpoint(ctx, "test")
+ require.NoError(t, err)
+ require.Equal(t, expected, resolved)
+ require.Equal(t, []string{"https://example.com", "test://example.com"}, handler.Resolved())
+}