Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add sse config to tempo #3914

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ querier:
* [CHANGE] fix deprecation warning by switching to DoBatchWithOptions [#4343](https://github.com/grafana/tempo/pull/4343) (@dastrobu)
* [CHANGE] **BREAKING CHANGE** The Tempo serverless is now deprecated and will be removed in an upcoming release [#4017](https://github.com/grafana/tempo/pull/4017/) @electron0zero
* [CHANGE] tempo-cli: add support for /api/v2/traces endpoint [#4127](https://github.com/grafana/tempo/pull/4127) (@electron0zero)
**BREAKING CHANGE** The `tempo-cli` now uses the `/api/v2/traces` endpoint by default,
**BREAKING CHANGE** The `tempo-cli` now uses the `/api/v2/traces` endpoint by default,
please use `--v1` flag to use `/api/traces` endpoint, which was the default in previous versions.
* [CHANGE] TraceByID: don't allow concurrent_shards greater than query_shards. [#4074](https://github.com/grafana/tempo/pull/4074) (@electron0zero)
* [CHANGE] **BREAKING CHANGE** The dynamic injection of X-Scope-OrgID header for metrics generator remote-writes is changed. If the header is aleady set in per-tenant overrides or global tempo configuration, then it is honored and not overwritten. [#4021](https://github.com/grafana/tempo/pull/4021) (@mdisibio)
Expand Down Expand Up @@ -181,7 +181,7 @@ querier:
* [ENHANCEMENT] Add vParquet4 support to the tempo-cli analyse blocks command [#3868](https://github.com/grafana/tempo/pull/3868) (@stoewer)
* [ENHANCEMENT] Improve trace id lookup from Tempo Vulture by selecting a date range [#3874](https://github.com/grafana/tempo/pull/3874) (@javiermolinar)
* [ENHANCEMENT] Add native histograms for internal metrics[#3870](https://github.com/grafana/tempo/pull/3870) (@zalegrala)
* [ENHANCEMENT] Expose availability-zone as a cli flag in ingester [#3881](https://github.com/grafana/tempo/pull/3881)
* [ENHANCEMENT] Expose availability-zone as a cli flag in ingester [#3881](https://github.com/grafana/tempo/pull/3881) (@KyriosGN0)
* [ENHANCEMENT] Rename batches property of Trace to ResourceSpans to be OTEL compatible [#3895](https://github.com/grafana/tempo/pull/3895)
* [ENHANCEMENT] Reduce memory consumption of query-frontend[#3888](https://github.com/grafana/tempo/pull/3888) (@joe-elliott)
* [ENHANCEMENT] Reduce log level verbosity for e2e tests[#3900](https://github.com/grafana/tempo/pull/3900) (@javiermolinar)
Expand Down Expand Up @@ -1297,4 +1297,4 @@ Additionally, default label `span_status` is renamed to `status_code`.
* [BUGFIX] S3 multi-part upload errors [#306](https://github.com/grafana/tempo/pull/325)
* [BUGFIX] Increase Prometheus `notfound` metric on tempo-vulture. [#301](https://github.com/grafana/tempo/pull/301)
* [BUGFIX] Return 404 if searching for a tenant id that does not exist in the backend. [#321](https://github.com/grafana/tempo/pull/321)
* [BUGFIX] Prune in-memory blocks from missing tenants. [#314](https://github.com/grafana/tempo/pull/314)
* [BUGFIX] Prune in-memory blocks from missing tenants. [#314](https://github.com/grafana/tempo/pull/314)
17 changes: 17 additions & 0 deletions docs/sources/tempo/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,7 @@ query_frontend:
# NOTE: Requires `duration_slo` AND `throughput_bytes_slo` to be configured.
[duration_slo: <duration> | default = 0s ]


# If set to a non-zero value, it's value will be used to decide if metadata query is within SLO or not.
# Query is within SLO if it returned 200 within duration_slo seconds OR processed throughput_slo bytes/s data.
[throughput_bytes_slo: <float> | default = 0 ]
Expand Down Expand Up @@ -1111,6 +1112,22 @@ storage:
# See the [S3 documentation on object tagging](https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-tagging.html) for more detail.
[tags: <map[string]string>]


[sse: <map[string]string>]:
# Optional
# Example: type: SSE-S3
# Type of encryption to use with s3 bucket, either SSE-KMS or SSE-S3
[type: string]:

# Optional
# Example: kms_key_id: "1234abcd-12ab-34cd-56ef-1234567890ab"
# the kms key id is the identification of the key in an account or region
kms_key_id:
# Optional
# Example: kms_encryption_context: "encryptionContext": {"department": "10103.0"}
# KMS Encryption Context used for object encryption. It expects JSON formatted string
kms_encryption_context:

# azure configuration. Will be used only if value of backend is "azure"
# EXPERIMENTAL
azure:
Expand Down
37 changes: 35 additions & 2 deletions tempodb/backend/s3/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package s3

import (
"errors"
"flag"
"fmt"
"strings"
"time"

"github.com/grafana/dskit/crypto/tls"
Expand All @@ -10,6 +13,31 @@ import (
"github.com/grafana/tempo/pkg/util"
)

const (
SignatureVersionV4 = "v4"
SignatureVersionV2 = "v2"

// SSEKMS config type constant to configure S3 server side encryption using KMS
// https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption.html
SSEKMS = "SSE-KMS"

// SSES3 config type constant to configure S3 server side encryption with AES-256
// https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html
SSES3 = "SSE-S3"
)

var (
supportedSSETypes = []string{SSEKMS, SSES3}

errUnsupportedSSEType = errors.New("unsupported S3 SSE type")
)

type SSEConfig struct {
Type string `yaml:"type"`
KMSKeyID string `yaml:"kms_key_id"`
KMSEncryptionContext string `yaml:"kms_encryption_context"`
}

type Config struct {
tls.ClientConfig `yaml:",inline"`

Expand All @@ -34,8 +62,9 @@ type Config struct {
Metadata map[string]string `yaml:"metadata"`
// Deprecated
// See https://github.com/grafana/tempo/pull/3006 for more details
NativeAWSAuthEnabled bool `yaml:"native_aws_auth_enabled"`
ListBlocksConcurrency int `yaml:"list_blocks_concurrency"`
NativeAWSAuthEnabled bool `yaml:"native_aws_auth_enabled"`
ListBlocksConcurrency int `yaml:"list_blocks_concurrency"`
SSE SSEConfig `yaml:"sse"`
}

func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
Expand All @@ -47,6 +76,10 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
f.Var(&cfg.SecretKey, util.PrefixConfig(prefix, "s3.secret_key"), "s3 secret key.")
f.Var(&cfg.SessionToken, util.PrefixConfig(prefix, "s3.session_token"), "s3 session token.")
f.IntVar(&cfg.ListBlocksConcurrency, util.PrefixConfig(prefix, "s3.list_blocks_concurrency"), 3, "number of concurrent list calls to make to backend")

f.StringVar(&cfg.SSE.Type, util.PrefixConfig(prefix, "s3.sse.type"), "", fmt.Sprintf("Enable AWS Server Side Encryption. Supported values: %s.", strings.Join(supportedSSETypes, ", ")))
f.StringVar(&cfg.SSE.KMSKeyID, util.PrefixConfig(prefix, "s3.sse.kms-key-id"), "", "KMS Key ID used to encrypt objects in S3")
f.StringVar(&cfg.SSE.KMSEncryptionContext, util.PrefixConfig(prefix, "s3.sse.kms-encryption-context"), "", "KMS Encryption Context used for object encryption. It expects JSON formatted string.")
cfg.HedgeRequestsUpTo = 2
}

Expand Down
69 changes: 62 additions & 7 deletions tempodb/backend/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package s3
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/go-kit/log/level"
minio "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/encrypt"

"github.com/grafana/tempo/pkg/blockboundary"
tempo_io "github.com/grafana/tempo/pkg/io"
Expand All @@ -39,6 +41,7 @@ type readerWriter struct {
cfg *Config
core *minio.Core
hedgedCore *minio.Core
sse encrypt.ServerSide
}

var tracer = otel.Tracer("tempodb/backend/s3")
Expand Down Expand Up @@ -136,21 +139,35 @@ func internalNew(cfg *Config, confirm bool) (*readerWriter, error) {
}
}

encryption, err := buildSSEConfig(cfg)
if err != nil {
return nil, fmt.Errorf("returned Error when trying to configure Server Side Encryption: %w", err)
}

rw := &readerWriter{
logger: l,
cfg: cfg,
core: core,
hedgedCore: hedgedCore,
sse: encryption,
}

return rw, nil
}

func getPutObjectOptions(rw *readerWriter) minio.PutObjectOptions {
return minio.PutObjectOptions{
PartSize: rw.cfg.PartSize,
UserTags: rw.cfg.Tags,
StorageClass: rw.cfg.StorageClass,
UserMetadata: rw.cfg.Metadata,
PartSize: rw.cfg.PartSize,
UserTags: rw.cfg.Tags,
StorageClass: rw.cfg.StorageClass,
UserMetadata: rw.cfg.Metadata,
ServerSideEncryption: rw.sse,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect an early fatal error if the SSE is set but wrongly configured. Nevertheless, it will fail trying to write or read the blocks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you mean erroring fataly?, at the moment the buildSSEConfig will return errUnsupportedSSEType if it is configured with unsupported SSE type, and i have added an error for missing KMSKeyID, what else am i missing

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a user configuring SSE I would like to know if my configuration is not correctly set. Maybe a warning log?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we've set a the config incorrectly we should error, i'll try to set up a minio locally with SSE and spin up tempo and test various configs

}

func getObjectOptions(rw *readerWriter) minio.GetObjectOptions {
return minio.GetObjectOptions{
ServerSideEncryption: rw.sse,
}
}

Expand Down Expand Up @@ -540,7 +557,8 @@ func (rw *readerWriter) ReadVersioned(ctx context.Context, name string, keypath
}

func (rw *readerWriter) readAll(ctx context.Context, name string) ([]byte, error) {
reader, info, _, err := rw.hedgedCore.GetObject(ctx, rw.cfg.Bucket, name, minio.GetObjectOptions{})
options := getObjectOptions(rw)
reader, info, _, err := rw.hedgedCore.GetObject(ctx, rw.cfg.Bucket, name, options)
if err != nil {
// do not change or wrap this error
// we need to compare the specific err message
Expand All @@ -552,7 +570,8 @@ func (rw *readerWriter) readAll(ctx context.Context, name string) ([]byte, error
}

func (rw *readerWriter) readAllWithObjInfo(ctx context.Context, name string) ([]byte, minio.ObjectInfo, error) {
reader, info, _, err := rw.hedgedCore.GetObject(ctx, rw.cfg.Bucket, name, minio.GetObjectOptions{})
options := getObjectOptions(rw)
KyriosGN0 marked this conversation as resolved.
Show resolved Hide resolved
reader, info, _, err := rw.hedgedCore.GetObject(ctx, rw.cfg.Bucket, name, options)
if err != nil && minio.ToErrorResponse(err).Code == s3.ErrCodeNoSuchKey {
return nil, minio.ObjectInfo{}, backend.ErrDoesNotExist
} else if err != nil {
Expand All @@ -568,7 +587,7 @@ func (rw *readerWriter) readAllWithObjInfo(ctx context.Context, name string) ([]
}

func (rw *readerWriter) readRange(ctx context.Context, objName string, offset int64, buffer []byte) error {
options := minio.GetObjectOptions{}
options := getObjectOptions(rw)
err := options.SetRange(offset, offset+int64(len(buffer)))
if err != nil {
return fmt.Errorf("error setting headers for range read in s3: %w", err)
Expand Down Expand Up @@ -693,3 +712,39 @@ func readError(err error) error {
}
return err
}

func parseKMSEncryptionContext(data string) (map[string]string, error) {
if data == "" {
return nil, nil
}

decoded := map[string]string{}
err := json.Unmarshal([]byte(data), &decoded)
return decoded, err
KyriosGN0 marked this conversation as resolved.
Show resolved Hide resolved
}

func buildSSEConfig(cfg *Config) (encrypt.ServerSide, error) {
switch cfg.SSE.Type {
case "":
return nil, nil
case SSEKMS:
if cfg.SSE.KMSKeyID == "" {
return nil, errors.New("KMSKeyID is missing")
} else {
encryptionCtx, err := parseKMSEncryptionContext(cfg.SSE.KMSEncryptionContext)
if err != nil {
return nil, err
}
if encryptionCtx == nil {
// To overcome a limitation in Minio which checks interface{} == nil.

return encrypt.NewSSEKMS(cfg.SSE.KMSKeyID, nil)
}
return encrypt.NewSSEKMS(cfg.SSE.KMSKeyID, encryptionCtx)
}
case SSES3:
return encrypt.NewSSE(), nil
default:
return nil, errUnsupportedSSEType
}
}
Loading