diff --git a/CHANGELOG.md b/CHANGELOG.md index 90b29a88..fbccc79d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#92](https://github.com/thanos-io/objstore/pull/92) GCS: Allow using a gRPC client. - [#94](https://github.com/thanos-io/objstore/pull/94) Allow timingReadCloser to be seeker - [#96](https://github.com/thanos-io/objstore/pull/96) Allow nopCloserWithObjectSize to be seeker +- [#86](https://github.com/thanos-io/objstore/pull/86) GCS: Add HTTP Config to GCS ### Changed - [#38](https://github.com/thanos-io/objstore/pull/38) *: Upgrade minio-go version to `v7.0.45`. diff --git a/exthttp/transport.go b/exthttp/transport.go index d9a807fd..92f86a11 100644 --- a/exthttp/transport.go +++ b/exthttp/transport.go @@ -11,6 +11,16 @@ import ( "github.com/prometheus/common/model" ) +var DefaultHTTPConfig = HTTPConfig{ + IdleConnTimeout: model.Duration(90 * time.Second), + ResponseHeaderTimeout: model.Duration(2 * time.Minute), + TLSHandshakeTimeout: model.Duration(10 * time.Second), + ExpectContinueTimeout: model.Duration(1 * time.Second), + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, + MaxConnsPerHost: 0, +} + // HTTPConfig stores the http.Transport configuration for the cos and s3 minio client. type HTTPConfig struct { IdleConnTimeout model.Duration `yaml:"idle_conn_timeout"` diff --git a/providers/gcs/gcs.go b/providers/gcs/gcs.go index 1d717b77..1b9c78c2 100644 --- a/providers/gcs/gcs.go +++ b/providers/gcs/gcs.go @@ -8,9 +8,11 @@ import ( "context" "fmt" "io" + "net/http" "runtime" "strings" "testing" + "time" "cloud.google.com/go/storage" "github.com/go-kit/log" @@ -19,17 +21,23 @@ import ( "golang.org/x/oauth2/google" "google.golang.org/api/iterator" "google.golang.org/api/option" + htransport "google.golang.org/api/transport/http" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "gopkg.in/yaml.v2" "github.com/thanos-io/objstore" + "github.com/thanos-io/objstore/exthttp" ) // DirDelim is the delimiter used to model a directory structure in an object store bucket. const DirDelim = "/" +var DefaultConfig = Config{ + HTTPConfig: exthttp.DefaultHTTPConfig, +} + // Config stores the configuration for gcs bucket. type Config struct { Bucket string `yaml:"bucket"` @@ -39,7 +47,8 @@ type Config struct { // when direct path is not enabled. // See https://pkg.go.dev/cloud.google.com/go/storage#hdr-Experimental_gRPC_API for more details // on how to enable direct path. - GRPCConnPoolSize int `yaml:"grpc_conn_pool_size"` + GRPCConnPoolSize int `yaml:"grpc_conn_pool_size"` + HTTPConfig exthttp.HTTPConfig `yaml:"http_config"` } // Bucket implements the store.Bucket and shipper.Bucket interfaces against GCS. @@ -51,14 +60,23 @@ type Bucket struct { closer io.Closer } +// parseConfig unmarshals a buffer into a Config with default values. +func parseConfig(conf []byte) (Config, error) { + config := DefaultConfig + if err := yaml.UnmarshalStrict(conf, &config); err != nil { + return Config{}, err + } + + return config, nil +} + // NewBucket returns a new Bucket against the given bucket handle. func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string) (*Bucket, error) { - var gc Config - if err := yaml.Unmarshal(conf, &gc); err != nil { + config, err := parseConfig(conf) + if err != nil { return nil, err } - - return NewBucketWithConfig(ctx, logger, gc, component) + return NewBucketWithConfig(ctx, logger, config, component) } // NewBucketWithConfig returns a new Bucket with gcs Config struct. @@ -76,12 +94,38 @@ func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, comp return nil, errors.Wrap(err, "failed to create credentials from JSON") } opts = append(opts, option.WithCredentials(credentials)) + } else { + opts = append(opts, option.WithoutAuthentication()) } opts = append(opts, option.WithUserAgent(fmt.Sprintf("thanos-%s/%s (%s)", component, version.Version, runtime.Version())), ) + // Check if a roundtripper has been set in the config + // otherwise build the default transport. + var rt http.RoundTripper + if gc.HTTPConfig.Transport != nil { + rt = gc.HTTPConfig.Transport + } else { + var err error + rt, err = exthttp.DefaultTransport(gc.HTTPConfig) + if err != nil { + return nil, err + } + } + + gRT, err := htransport.NewTransport(context.Background(), rt, opts...) + if err != nil { + return nil, err + } + + httpCli := &http.Client{ + Transport: gRT, + Timeout: time.Duration(gc.HTTPConfig.IdleConnTimeout), + } + opts = append(opts, option.WithHTTPClient(httpCli)) + return newBucket(ctx, logger, gc, opts) } diff --git a/providers/gcs/gcs_test.go b/providers/gcs/gcs_test.go index 283de6b6..95e0b005 100644 --- a/providers/gcs/gcs_test.go +++ b/providers/gcs/gcs_test.go @@ -10,9 +10,11 @@ import ( "net/http/httptest" "os" "testing" + "time" "github.com/efficientgo/core/testutil" "github.com/go-kit/log" + "github.com/prometheus/common/model" ) func TestBucket_Get_ShouldReturnErrorIfServerTruncateResponse(t *testing.T) { @@ -44,3 +46,58 @@ func TestBucket_Get_ShouldReturnErrorIfServerTruncateResponse(t *testing.T) { testutil.NotOk(t, err) testutil.Equals(t, "storage: partial request not satisfied", err.Error()) } + +func TestParseConfig_HTTPConfig(t *testing.T) { + for _, tc := range []struct { + name string + input string + assertions func(cfg Config) + }{ + { + name: "DefaultHTTPConfig", + input: `bucket: abcd`, + assertions: func(cfg Config) { + testutil.Equals(t, cfg.HTTPConfig.IdleConnTimeout, model.Duration(90*time.Second)) + testutil.Equals(t, cfg.HTTPConfig.ResponseHeaderTimeout, model.Duration(2*time.Minute)) + testutil.Equals(t, cfg.HTTPConfig.InsecureSkipVerify, false) + }, + }, + { + name: "CustomHTTPConfig", + input: `bucket: abcd +http_config: + insecure_skip_verify: true + idle_conn_timeout: 50s + response_header_timeout: 1m`, + assertions: func(cfg Config) { + testutil.Equals(t, cfg.HTTPConfig.IdleConnTimeout, model.Duration(50*time.Second)) + testutil.Equals(t, cfg.HTTPConfig.ResponseHeaderTimeout, model.Duration(1*time.Minute)) + testutil.Equals(t, cfg.HTTPConfig.InsecureSkipVerify, true) + }, + }, + { + name: "CustomHTTPConfigWithTLS", + input: `bucket: abcd +http_config: + tls_config: + ca_file: /certs/ca.crt + cert_file: /certs/cert.crt + key_file: /certs/key.key + server_name: server + insecure_skip_verify: false`, + assertions: func(cfg Config) { + testutil.Equals(t, "/certs/ca.crt", cfg.HTTPConfig.TLSConfig.CAFile) + testutil.Equals(t, "/certs/cert.crt", cfg.HTTPConfig.TLSConfig.CertFile) + testutil.Equals(t, "/certs/key.key", cfg.HTTPConfig.TLSConfig.KeyFile) + testutil.Equals(t, "server", cfg.HTTPConfig.TLSConfig.ServerName) + testutil.Equals(t, false, cfg.HTTPConfig.TLSConfig.InsecureSkipVerify) + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + cfg, err := parseConfig([]byte(tc.input)) + testutil.Ok(t, err) + tc.assertions(cfg) + }) + } +} diff --git a/providers/s3/s3.go b/providers/s3/s3.go index 507b550b..72130034 100644 --- a/providers/s3/s3.go +++ b/providers/s3/s3.go @@ -14,7 +14,6 @@ import ( "strconv" "strings" "testing" - "time" "github.com/efficientgo/core/logerrcapture" "github.com/go-kit/log" @@ -23,7 +22,6 @@ import ( "github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio-go/v7/pkg/encrypt" "github.com/pkg/errors" - "github.com/prometheus/common/model" "github.com/prometheus/common/version" "gopkg.in/yaml.v2" @@ -101,16 +99,8 @@ const ( ) var DefaultConfig = Config{ - PutUserMetadata: map[string]string{}, - HTTPConfig: exthttp.HTTPConfig{ - IdleConnTimeout: model.Duration(90 * time.Second), - ResponseHeaderTimeout: model.Duration(2 * time.Minute), - TLSHandshakeTimeout: model.Duration(10 * time.Second), - ExpectContinueTimeout: model.Duration(1 * time.Second), - MaxIdleConns: 100, - MaxIdleConnsPerHost: 100, - MaxConnsPerHost: 0, - }, + PutUserMetadata: map[string]string{}, + HTTPConfig: exthttp.DefaultHTTPConfig, PartSize: 1024 * 1024 * 64, // 64MB. BucketLookupType: AutoLookup, SendContentMd5: true, // Default to using MD5.