Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add objstore support for Swift using thanos.io/objstore #11672

Merged
merged 35 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
0a946a2
add TLS support for swift
btaani Jan 12, 2024
88df54d
fix typo
btaani Jan 12, 2024
5f821dd
fix error from unit test
btaani Jan 12, 2024
749f64b
Merge branch 'main' into swift-tls
btaani Jan 12, 2024
f5bcc9c
Merge branch 'main' into swift-tls
btaani Jan 16, 2024
06ef15b
Merge branch 'main' into swift-tls
btaani Jan 18, 2024
fb562c4
remove condition on transport type
btaani Jan 18, 2024
b1b203b
remove duplicate initialization
btaani Jan 18, 2024
7b23e51
modify swift config in object client
btaani Feb 5, 2024
d5ad3e7
use bucket_http config in swift bucket client
btaani Feb 5, 2024
6b70293
Merge branch 'main' into swift-tls
btaani Feb 5, 2024
6a266e0
add swift thanos object client implementation
btaani Feb 9, 2024
fd98f34
undo docs change
btaani Feb 13, 2024
959aa77
change all objectClient implementations
btaani Feb 13, 2024
05acfd9
add hedged client to swift thanos client
btaani Feb 13, 2024
ce0c7e6
add hedged client to swift thanos client
btaani Feb 13, 2024
3659f19
Merge branch 'main' into swift-tls
btaani Feb 14, 2024
7fe24b8
update thanos-io/objstore
btaani Feb 14, 2024
53fd4ee
add TLS support
btaani Feb 14, 2024
b02afa8
Merge branch 'main' into swift-tls
btaani Feb 15, 2024
0c6526e
Merge branch 'main' into swift-tls
btaani Mar 4, 2024
5a511b9
Merge branch 'main' into swift-tls
JoaoBraveCoding Oct 30, 2024
bc2eeb8
chore: Move Swift client to new ObjectClientAdapter
JoaoBraveCoding Oct 30, 2024
e64f09e
docs: fixed docs
JoaoBraveCoding Oct 30, 2024
11970b9
Merge branch 'main' into swift-tls
JoaoBraveCoding Oct 30, 2024
380c0b8
fix: fix tests
JoaoBraveCoding Oct 30, 2024
61c50f3
address review comments
JoaoBraveCoding Nov 11, 2024
776c5c4
make s3 use http config
JoaoBraveCoding Nov 11, 2024
99a0cf3
Merge branch 'main' into swift-tls
JoaoBraveCoding Nov 15, 2024
32f92fe
address review comments and fix merge result
JoaoBraveCoding Nov 15, 2024
ad09648
Duplicate Swift config to old client
JoaoBraveCoding Nov 15, 2024
bc652bd
Merge branch 'main' into swift-tls
ashwanthgoli Nov 25, 2024
340b953
fix build
ashwanthgoli Nov 25, 2024
fb3f953
make format
ashwanthgoli Nov 25, 2024
dd26273
ensure cli flags are same as existing ones
ashwanthgoli Nov 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -5921,6 +5921,12 @@ The `swift_storage_config` block configures the connection to OpenStack Object S
# is received on a request.
# CLI flag: -<prefix>.swift.request-timeout
[request_timeout: <duration> | default = 5s]

http:
# Path to the CA certificates to validate server certificate against. If not
# set, the host's root CA certificates are used.
# CLI flag: -<prefix>.swift.http.tls-ca-path
[tls_ca_path: <string> | default = ""]
```

### table_manager
Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/config_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ memberlist:
assert.Equal(t, "example.com", actual.UserDomainName)
assert.Equal(t, "1", actual.UserDomainID)
assert.Equal(t, "27", actual.UserID)
assert.Equal(t, "supersecret", actual.Password)
assert.Equal(t, flagext.SecretWithValue("supersecret"), actual.Password)
assert.Equal(t, "2", actual.DomainID)
assert.Equal(t, "test.com", actual.DomainName)
assert.Equal(t, "13", actual.ProjectID)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/bucket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (cfg *Config) configureTransport(backend string, rt http.RoundTripper) erro
case Azure:
cfg.Azure.Transport = rt
case Swift:
cfg.Swift.Transport = rt
cfg.Swift.HTTP.Transport = rt
case Filesystem:
// do nothing
default:
Expand Down
26 changes: 24 additions & 2 deletions pkg/storage/bucket/http/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http

import (
"flag"
"net/http"
"time"
)

Expand All @@ -10,12 +11,25 @@ type Config struct {
IdleConnTimeout time.Duration `yaml:"idle_conn_timeout"`
ResponseHeaderTimeout time.Duration `yaml:"response_header_timeout"`
InsecureSkipVerify bool `yaml:"insecure_skip_verify"`

TLSHandshakeTimeout time.Duration `yaml:"tls_handshake_timeout"`
ExpectContinueTimeout time.Duration `yaml:"expect_continue_timeout"`
MaxIdleConns int `yaml:"max_idle_connections"`
MaxIdleConnsPerHost int `yaml:"max_idle_connections_per_host"`
MaxConnsPerHost int `yaml:"max_connections_per_host"`
CAFile string `yaml:"ca_file"`

// Allow upstream callers to inject a round tripper
Transport http.RoundTripper `yaml:"-"`

TLSConfig TLSConfig `yaml:",inline"`
}

// TLSConfig configures the options for TLS connections.
type TLSConfig struct {
CAPath string `yaml:"tls_ca_path" category:"advanced"`
CertPath string `yaml:"tls_cert_path" category:"advanced"`
KeyPath string `yaml:"tls_key_path" category:"advanced"`
ServerName string `yaml:"tls_server_name" category:"advanced"`
}

// RegisterFlags registers the flags for the storage HTTP client.
Expand All @@ -33,5 +47,13 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.MaxIdleConns, prefix+"max-idle-connections", 100, "Maximum number of idle (keep-alive) connections across all hosts. 0 means no limit.")
f.IntVar(&cfg.MaxIdleConnsPerHost, prefix+"max-idle-connections-per-host", 100, "Maximum number of idle (keep-alive) connections to keep per-host. If 0, a built-in default value is used.")
f.IntVar(&cfg.MaxConnsPerHost, prefix+"max-connections-per-host", 0, "Maximum number of connections per host. 0 means no limit.")
f.StringVar(&cfg.CAFile, prefix+"ca-file", "", "Path to the trusted CA file that signed the SSL certificate of the object storage endpoint.")
cfg.TLSConfig.RegisterFlagsWithPrefix(prefix, f)
}

// RegisterFlagsWithPrefix registers the flags for s3 storage with the provided prefix.
func (cfg *TLSConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.CAPath, prefix+"http.tls-ca-path", "", "Path to the CA certificates to validate server certificate against. If not set, the host's root CA certificates are used.")
f.StringVar(&cfg.CertPath, prefix+"http.tls-cert-path", "", "Path to the client certificate, which will be used for authenticating with the server. Also requires the key path to be configured.")
f.StringVar(&cfg.KeyPath, prefix+"http.tls-key-path", "", "Path to the key for the client certificate. Also requires the client certificate to be configured.")
f.StringVar(&cfg.ServerName, prefix+"http.tls-server-name", "", "Override the expected name on the server certificate.")
}
53 changes: 3 additions & 50 deletions pkg/storage/bucket/s3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@ import (
"encoding/json"
"flag"
"fmt"
"net/http"
"slices"
"strings"
"time"

s3_service "github.com/aws/aws-sdk-go/service/s3"
"github.com/grafana/dskit/flagext"
"github.com/minio/minio-go/v7/pkg/encrypt"
"github.com/pkg/errors"
"github.com/thanos-io/objstore/providers/s3"

"github.com/grafana/loki/v3/pkg/storage/bucket/http"
"github.com/grafana/loki/v3/pkg/util"
)

Expand Down Expand Up @@ -55,52 +54,6 @@ func thanosS3BucketLookupTypesValues() (list []string) {
return list
}

// HTTPConfig stores the http.Transport configuration for the s3 minio client.
type HTTPConfig struct {
IdleConnTimeout time.Duration `yaml:"idle_conn_timeout" category:"advanced"`
ResponseHeaderTimeout time.Duration `yaml:"response_header_timeout" category:"advanced"`
InsecureSkipVerify bool `yaml:"insecure_skip_verify" category:"advanced"`
TLSHandshakeTimeout time.Duration `yaml:"tls_handshake_timeout" category:"advanced"`
ExpectContinueTimeout time.Duration `yaml:"expect_continue_timeout" category:"advanced"`
MaxIdleConns int `yaml:"max_idle_connections" category:"advanced"`
MaxIdleConnsPerHost int `yaml:"max_idle_connections_per_host" category:"advanced"`
MaxConnsPerHost int `yaml:"max_connections_per_host" category:"advanced"`

// Allow upstream callers to inject a round tripper
Transport http.RoundTripper `yaml:"-"`

TLSConfig TLSConfig `yaml:",inline"`
}

// TLSConfig configures the options for TLS connections.
type TLSConfig struct {
CAPath string `yaml:"tls_ca_path" category:"advanced"`
CertPath string `yaml:"tls_cert_path" category:"advanced"`
KeyPath string `yaml:"tls_key_path" category:"advanced"`
ServerName string `yaml:"tls_server_name" category:"advanced"`
}

// RegisterFlagsWithPrefix registers the flags for s3 storage with the provided prefix
func (cfg *HTTPConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.IdleConnTimeout, prefix+"s3.http.idle-conn-timeout", 90*time.Second, "The time an idle connection will remain idle before closing.")
f.DurationVar(&cfg.ResponseHeaderTimeout, prefix+"s3.http.response-header-timeout", 2*time.Minute, "The amount of time the client will wait for a servers response headers.")
f.BoolVar(&cfg.InsecureSkipVerify, prefix+"s3.http.insecure-skip-verify", false, "If the client connects to S3 via HTTPS and this option is enabled, the client will accept any certificate and hostname.")
f.DurationVar(&cfg.TLSHandshakeTimeout, prefix+"s3.tls-handshake-timeout", 10*time.Second, "Maximum time to wait for a TLS handshake. 0 means no limit.")
f.DurationVar(&cfg.ExpectContinueTimeout, prefix+"s3.expect-continue-timeout", 1*time.Second, "The time to wait for a server's first response headers after fully writing the request headers if the request has an Expect header. 0 to send the request body immediately.")
f.IntVar(&cfg.MaxIdleConns, prefix+"s3.max-idle-connections", 100, "Maximum number of idle (keep-alive) connections across all hosts. 0 means no limit.")
f.IntVar(&cfg.MaxIdleConnsPerHost, prefix+"s3.max-idle-connections-per-host", 100, "Maximum number of idle (keep-alive) connections to keep per-host. If 0, a built-in default value is used.")
f.IntVar(&cfg.MaxConnsPerHost, prefix+"s3.max-connections-per-host", 0, "Maximum number of connections per host. 0 means no limit.")
cfg.TLSConfig.RegisterFlagsWithPrefix(prefix, f)
}

// RegisterFlagsWithPrefix registers the flags for s3 storage with the provided prefix.
func (cfg *TLSConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.CAPath, prefix+"s3.http.tls-ca-path", "", "Path to the CA certificates to validate server certificate against. If not set, the host's root CA certificates are used.")
f.StringVar(&cfg.CertPath, prefix+"s3.http.tls-cert-path", "", "Path to the client certificate, which will be used for authenticating with the server. Also requires the key path to be configured.")
f.StringVar(&cfg.KeyPath, prefix+"s3.http.tls-key-path", "", "Path to the key for the client certificate. Also requires the client certificate to be configured.")
f.StringVar(&cfg.ServerName, prefix+"s3.http.tls-server-name", "", "Override the expected name on the server certificate.")
}

// Config holds the config options for an S3 backend
type Config struct {
Endpoint string `yaml:"endpoint"`
Expand All @@ -121,7 +74,7 @@ type Config struct {
MaxRetries int `yaml:"max_retries"`

SSE SSEConfig `yaml:"sse"`
HTTP HTTPConfig `yaml:"http"`
HTTP http.Config `yaml:"http"`
TraceConfig TraceConfig `yaml:"trace"`
}

Expand Down Expand Up @@ -149,7 +102,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.STSEndpoint, prefix+"s3.sts-endpoint", "", "Accessing S3 resources using temporary, secure credentials provided by AWS Security Token Service.")
f.IntVar(&cfg.MaxRetries, prefix+"s3.max-retries", 10, "The maximum number of retries for S3 requests that are retryable. Default is 10, set this to 1 to disable retries.")
cfg.SSE.RegisterFlagsWithPrefix(prefix+"s3.sse.", f)
cfg.HTTP.RegisterFlagsWithPrefix(prefix, f)
cfg.HTTP.RegisterFlagsWithPrefix(prefix+"s3.", f)
cfg.TraceConfig.RegisterFlagsWithPrefix(prefix+"s3.trace.", f)
}

Expand Down
58 changes: 38 additions & 20 deletions pkg/storage/bucket/swift/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,31 +11,49 @@ import (
// NewBucketClient creates a new Swift bucket client
func NewBucketClient(cfg Config, _ string, logger log.Logger) (objstore.Bucket, error) {
bucketConfig := swift.Config{
AuthVersion: cfg.AuthVersion,
AuthUrl: cfg.AuthURL,
Username: cfg.Username,
UserDomainName: cfg.UserDomainName,
UserDomainID: cfg.UserDomainID,
UserId: cfg.UserID,
Password: cfg.Password,
DomainId: cfg.DomainID,
DomainName: cfg.DomainName,
ProjectID: cfg.ProjectID,
ProjectName: cfg.ProjectName,
ProjectDomainID: cfg.ProjectDomainID,
ProjectDomainName: cfg.ProjectDomainName,
RegionName: cfg.RegionName,
ContainerName: cfg.ContainerName,
Retries: cfg.MaxRetries,
ConnectTimeout: model.Duration(cfg.ConnectTimeout),
Timeout: model.Duration(cfg.RequestTimeout),
ApplicationCredentialID: cfg.ApplicationCredentialID,
ApplicationCredentialName: cfg.ApplicationCredentialName,
ApplicationCredentialSecret: cfg.ApplicationCredentialSecret.String(),
AuthVersion: cfg.AuthVersion,
AuthUrl: cfg.AuthURL,
Username: cfg.Username,
UserDomainName: cfg.UserDomainName,
UserDomainID: cfg.UserDomainID,
UserId: cfg.UserID,
Password: cfg.Password.String(),
DomainId: cfg.DomainID,
DomainName: cfg.DomainName,
ProjectID: cfg.ProjectID,
ProjectName: cfg.ProjectName,
ProjectDomainID: cfg.ProjectDomainID,
ProjectDomainName: cfg.ProjectDomainName,
RegionName: cfg.RegionName,
ContainerName: cfg.ContainerName,
Retries: cfg.MaxRetries,
ConnectTimeout: model.Duration(cfg.ConnectTimeout),
Timeout: model.Duration(cfg.RequestTimeout),
HTTPConfig: exthttp.HTTPConfig{
IdleConnTimeout: model.Duration(cfg.HTTP.IdleConnTimeout),
ResponseHeaderTimeout: model.Duration(cfg.HTTP.ResponseHeaderTimeout),
InsecureSkipVerify: cfg.HTTP.InsecureSkipVerify,
TLSHandshakeTimeout: model.Duration(cfg.HTTP.TLSHandshakeTimeout),
ExpectContinueTimeout: model.Duration(cfg.HTTP.ExpectContinueTimeout),
MaxIdleConns: cfg.HTTP.MaxIdleConns,
MaxIdleConnsPerHost: cfg.HTTP.MaxIdleConnsPerHost,
MaxConnsPerHost: cfg.HTTP.MaxConnsPerHost,
Transport: cfg.HTTP.Transport,
TLSConfig: exthttp.TLSConfig{
CAFile: cfg.HTTP.TLSConfig.CAPath,
CertFile: cfg.HTTP.TLSConfig.CertPath,
KeyFile: cfg.HTTP.TLSConfig.KeyPath,
ServerName: cfg.HTTP.TLSConfig.ServerName,
},
},

// Hard-coded defaults.
ChunkSize: swift.DefaultConfig.ChunkSize,
UseDynamicLargeObjects: false,
HTTPConfig: exthttp.DefaultHTTPConfig,
}
bucketConfig.HTTPConfig.Transport = cfg.Transport

return swift.NewContainerFromConfig(logger, &bucketConfig, false, nil)
}
60 changes: 31 additions & 29 deletions pkg/storage/bucket/swift/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,37 @@ package swift

import (
"flag"
"net/http"
"time"

"github.com/grafana/dskit/flagext"

"github.com/grafana/loki/v3/pkg/storage/bucket/http"
)

// Config holds the config options for Swift backend
type Config struct {
AuthVersion int `yaml:"auth_version"`
AuthURL string `yaml:"auth_url"`
Internal bool `yaml:"internal"`
Username string `yaml:"username"`
UserDomainName string `yaml:"user_domain_name"`
UserDomainID string `yaml:"user_domain_id"`
UserID string `yaml:"user_id"`
Password string `yaml:"password"`
DomainID string `yaml:"domain_id"`
DomainName string `yaml:"domain_name"`
ProjectID string `yaml:"project_id"`
ProjectName string `yaml:"project_name"`
ProjectDomainID string `yaml:"project_domain_id"`
ProjectDomainName string `yaml:"project_domain_name"`
RegionName string `yaml:"region_name"`
ContainerName string `yaml:"container_name"`
MaxRetries int `yaml:"max_retries"`
ConnectTimeout time.Duration `yaml:"connect_timeout"`
RequestTimeout time.Duration `yaml:"request_timeout"`

// Allow upstream callers to inject a round tripper
Transport http.RoundTripper `yaml:"-"`
ApplicationCredentialID string `yaml:"application_credential_id"`
ApplicationCredentialName string `yaml:"application_credential_name"`
ApplicationCredentialSecret flagext.Secret `yaml:"application_credential_secret"`
AuthVersion int `yaml:"auth_version"`
AuthURL string `yaml:"auth_url"`
Username string `yaml:"username"`
UserDomainName string `yaml:"user_domain_name"`
UserDomainID string `yaml:"user_domain_id"`
UserID string `yaml:"user_id"`
Password flagext.Secret `yaml:"password"`
DomainID string `yaml:"domain_id"`
DomainName string `yaml:"domain_name"`
ProjectID string `yaml:"project_id"`
ProjectName string `yaml:"project_name"`
ProjectDomainID string `yaml:"project_domain_id"`
ProjectDomainName string `yaml:"project_domain_name"`
RegionName string `yaml:"region_name"`
ContainerName string `yaml:"container_name"`
MaxRetries int `yaml:"max_retries" category:"advanced"`
ConnectTimeout time.Duration `yaml:"connect_timeout" category:"advanced"`
RequestTimeout time.Duration `yaml:"request_timeout" category:"advanced"`
HTTP http.Config `yaml:"http"`
}

// RegisterFlags registers the flags for Swift storage
Expand All @@ -39,14 +42,16 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

// RegisterFlagsWithPrefix registers the flags for Swift storage with the provided prefix
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.ApplicationCredentialID, prefix+"swift.application-credential-id", "", "OpenStack Swift application credential id")
f.StringVar(&cfg.ApplicationCredentialName, prefix+"swift.application-credential-name", "", "OpenStack Swift application credential name")
f.Var(&cfg.ApplicationCredentialSecret, prefix+"swift.application-credential-secret", "OpenStack Swift application credential secret")
f.IntVar(&cfg.AuthVersion, prefix+"swift.auth-version", 0, "OpenStack Swift authentication API version. 0 to autodetect.")
f.StringVar(&cfg.AuthURL, prefix+"swift.auth-url", "", "OpenStack Swift authentication URL")
f.BoolVar(&cfg.Internal, prefix+"swift.internal", false, "Set this to true to use the internal OpenStack Swift endpoint URL")
f.StringVar(&cfg.Username, prefix+"swift.username", "", "OpenStack Swift username.")
f.StringVar(&cfg.UserDomainName, prefix+"swift.user-domain-name", "", "OpenStack Swift user's domain name.")
f.StringVar(&cfg.UserDomainID, prefix+"swift.user-domain-id", "", "OpenStack Swift user's domain ID.")
f.StringVar(&cfg.UserID, prefix+"swift.user-id", "", "OpenStack Swift user ID.")
f.StringVar(&cfg.Password, prefix+"swift.password", "", "OpenStack Swift API key.")
f.Var(&cfg.Password, prefix+"swift.password", "OpenStack Swift API key.")
f.StringVar(&cfg.DomainID, prefix+"swift.domain-id", "", "OpenStack Swift user's domain ID.")
f.StringVar(&cfg.DomainName, prefix+"swift.domain-name", "", "OpenStack Swift user's domain name.")
f.StringVar(&cfg.ProjectID, prefix+"swift.project-id", "", "OpenStack Swift project ID (v2,v3 auth only).")
Expand All @@ -58,8 +63,5 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.MaxRetries, prefix+"swift.max-retries", 3, "Max retries on requests error.")
f.DurationVar(&cfg.ConnectTimeout, prefix+"swift.connect-timeout", 10*time.Second, "Time after which a connection attempt is aborted.")
f.DurationVar(&cfg.RequestTimeout, prefix+"swift.request-timeout", 5*time.Second, "Time after which an idle request is aborted. The timeout watchdog is reset each time some data is received, so the timeout triggers after X time no data is received on a request.")
}

func (cfg *Config) Validate() error {
return nil
cfg.HTTP.RegisterFlagsWithPrefix(prefix+"swift.", f)
}
Loading
Loading