Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added roundtripper wrapper as param #150

Merged
merged 5 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#116](https://github.com/thanos-io/objstore/pull/116) Azure: Add new storage_create_container configuration property
- [#128](https://github.com/thanos-io/objstore/pull/128) GCS: Add support for `ChunkSize` for writer.
- [#130](https://github.com/thanos-io/objstore/pull/130) feat: Decouple creating bucket metrics from instrumenting the bucket
- [#150](https://github.com/thanos-io/objstore/pull/150) Add support for roundtripper wrapper.

### Changed
- [#38](https://github.com/thanos-io/objstore/pull/38) *: Upgrade minio-go version to `v7.0.45`.
Expand Down
16 changes: 8 additions & 8 deletions client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type BucketConfig struct {

// NewBucket initializes and returns new object storage clients.
// NOTE: confContentYaml can contain secrets.
func NewBucket(logger log.Logger, confContentYaml []byte, component string, rt http.RoundTripper) (objstore.Bucket, error) {
func NewBucket(logger log.Logger, confContentYaml []byte, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (objstore.Bucket, error) {
level.Info(logger).Log("msg", "loading bucket configuration")
bucketConf := &BucketConfig{}
if err := yaml.UnmarshalStrict(confContentYaml, bucketConf); err != nil {
Expand All @@ -65,23 +65,23 @@ func NewBucket(logger log.Logger, confContentYaml []byte, component string, rt h
var bucket objstore.Bucket
switch strings.ToUpper(string(bucketConf.Type)) {
case string(GCS):
bucket, err = gcs.NewBucket(context.Background(), logger, config, component, rt)
bucket, err = gcs.NewBucket(context.Background(), logger, config, component, wrapRoundtripper)
case string(S3):
bucket, err = s3.NewBucket(logger, config, component, rt)
bucket, err = s3.NewBucket(logger, config, component, wrapRoundtripper)
case string(AZURE):
bucket, err = azure.NewBucket(logger, config, component, rt)
bucket, err = azure.NewBucket(logger, config, component, wrapRoundtripper)
case string(SWIFT):
bucket, err = swift.NewContainer(logger, config, rt)
bucket, err = swift.NewContainer(logger, config, wrapRoundtripper)
case string(COS):
bucket, err = cos.NewBucket(logger, config, component, rt)
bucket, err = cos.NewBucket(logger, config, component, wrapRoundtripper)
case string(ALIYUNOSS):
bucket, err = oss.NewBucket(logger, config, component, rt)
bucket, err = oss.NewBucket(logger, config, component, wrapRoundtripper)
case string(FILESYSTEM):
bucket, err = filesystem.NewBucketFromConfig(config)
case string(BOS):
bucket, err = bos.NewBucket(logger, config, component)
case string(OCI):
bucket, err = oci.NewBucket(logger, config, rt)
bucket, err = oci.NewBucket(logger, config, wrapRoundtripper)
case string(OBS):
bucket, err = obs.NewBucket(logger, config)
default:
Expand Down
16 changes: 15 additions & 1 deletion errutil/rt_error.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
package errutil

import "net/http"
import (
"net/http"

"github.com/pkg/errors"
)

var rtErr = errors.New("RoundTripper error")

func IsMockedError(err error) bool {
return errors.Is(err, rtErr)
}

// ErrorRoundTripper is a custom RoundTripper that always returns an error.
type ErrorRoundTripper struct {
Expand All @@ -10,3 +20,7 @@ type ErrorRoundTripper struct {
func (ert *ErrorRoundTripper) RoundTrip(*http.Request) (*http.Response, error) {
return nil, ert.Err
}

func WrapWithErrRoundtripper(rt http.RoundTripper) http.RoundTripper {
return &ErrorRoundTripper{Err: rtErr}
}
11 changes: 4 additions & 7 deletions providers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ type Bucket struct {
}

// NewBucket returns a new Bucket using the provided Azure config.
func NewBucket(logger log.Logger, azureConfig []byte, component string, rt http.RoundTripper) (*Bucket, error) {
func NewBucket(logger log.Logger, azureConfig []byte, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
level.Debug(logger).Log("msg", "creating new Azure bucket connection", "component", component)
conf, err := parseConfig(azureConfig)
if err != nil {
Expand All @@ -155,19 +155,16 @@ func NewBucket(logger log.Logger, azureConfig []byte, component string, rt http.
if conf.MSIResource != "" {
level.Warn(logger).Log("msg", "The field msi_resource has been deprecated and should no longer be set")
}
return NewBucketWithConfig(logger, conf, component, rt)
return NewBucketWithConfig(logger, conf, component, wrapRoundtripper)
}

// NewBucketWithConfig returns a new Bucket using the provided Azure config struct.
func NewBucketWithConfig(logger log.Logger, conf Config, component string, rt http.RoundTripper) (*Bucket, error) {
if rt != nil {
conf.HTTPConfig.Transport = rt
}
func NewBucketWithConfig(logger log.Logger, conf Config, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
if err := conf.validate(); err != nil {
return nil, err
}

containerClient, err := getContainerClient(conf)
containerClient, err := getContainerClient(conf, wrapRoundtripper)
if err != nil {
return nil, err
}
Expand Down
7 changes: 2 additions & 5 deletions providers/azure/azure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/pkg/errors"

"github.com/thanos-io/objstore/errutil"
"github.com/thanos-io/objstore/exthttp"
Expand Down Expand Up @@ -230,11 +229,9 @@ func TestNewBucketWithErrorRoundTripper(t *testing.T) {
cfg, err := parseConfig(validConfig)
testutil.Ok(t, err)

rt := &errutil.ErrorRoundTripper{Err: errors.New("RoundTripper error")}

_, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test", rt)
_, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test", errutil.WrapWithErrRoundtripper)

// We expect an error from the RoundTripper
testutil.NotOk(t, err)
testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err)
testutil.Assert(t, errutil.IsMockedError(err), "Expected RoundTripper error, got: %v", err)
}
5 changes: 4 additions & 1 deletion providers/azure/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
// DirDelim is the delimiter used to model a directory structure in an object store bucket.
const DirDelim = "/"

func getContainerClient(conf Config) (*container.Client, error) {
func getContainerClient(conf Config, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*container.Client, error) {
var rt http.RoundTripper
rt, err := exthttp.DefaultTransport(conf.HTTPConfig)
if err != nil {
Expand All @@ -28,6 +28,9 @@ func getContainerClient(conf Config) (*container.Client, error) {
if conf.HTTPConfig.Transport != nil {
rt = conf.HTTPConfig.Transport
}
if wrapRoundtripper != nil {
rt = wrapRoundtripper(rt)
}
opt := &container.ClientOptions{
ClientOptions: azcore.ClientOptions{
Retry: policy.RetryOptions{
Expand Down
2 changes: 1 addition & 1 deletion providers/bos/bos.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func parseConfig(conf []byte) (Config, error) {

// NewBucket new bos bucket.
func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) {
// TODO(https://github.com/thanos-io/objstore/pull/140): Add support for custom roundtripper.
// TODO(https://github.com/thanos-io/objstore/pull/150): Add support for roundtripper wrapper.
if logger == nil {
logger = log.NewNopLogger()
}
Expand Down
19 changes: 11 additions & 8 deletions providers/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func parseConfig(conf []byte) (Config, error) {
}

// NewBucket returns a new Bucket using the provided cos configuration.
func NewBucket(logger log.Logger, conf []byte, component string, rt http.RoundTripper) (*Bucket, error) {
func NewBucket(logger log.Logger, conf []byte, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -104,11 +104,11 @@ func NewBucket(logger log.Logger, conf []byte, component string, rt http.RoundTr
if err != nil {
return nil, errors.Wrap(err, "parsing cos configuration")
}
return NewBucketWithConfig(logger, config, component, rt)
return NewBucketWithConfig(logger, config, component, wrapRoundtripper)
}

// NewBucketWithConfig returns a new Bucket using the provided cos config values.
func NewBucketWithConfig(logger log.Logger, config Config, component string, rt http.RoundTripper) (*Bucket, error) {
func NewBucketWithConfig(logger log.Logger, config Config, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
if err := config.validate(); err != nil {
return nil, errors.Wrap(err, "validate cos configuration")
}
Expand All @@ -127,19 +127,22 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string, rt
}
}
b := &cos.BaseURL{BucketURL: bucketURL}
var tpt http.RoundTripper
tpt, err = exthttp.DefaultTransport(config.HTTPConfig)
var rt http.RoundTripper
rt, err = exthttp.DefaultTransport(config.HTTPConfig)
if err != nil {
return nil, err
}
if rt != nil {
tpt = rt
if config.HTTPConfig.Transport != nil {
rt = config.HTTPConfig.Transport
}
if wrapRoundtripper != nil {
rt = wrapRoundtripper(rt)
}
client := cos.NewClient(b, &http.Client{
Transport: &cos.AuthorizationTransport{
SecretID: config.SecretId,
SecretKey: config.SecretKey,
Transport: tpt,
Transport: rt,
},
})

Expand Down
6 changes: 2 additions & 4 deletions providers/cos/cos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"

"github.com/thanos-io/objstore/errutil"
Expand Down Expand Up @@ -150,12 +149,11 @@ func TestNewBucketWithErrorRoundTripper(t *testing.T) {
SecretId: "sid",
SecretKey: "skey",
}
rt := &errutil.ErrorRoundTripper{Err: errors.New("RoundTripper error")}

bkt, err := NewBucketWithConfig(log.NewNopLogger(), config, "test", rt)
bkt, err := NewBucketWithConfig(log.NewNopLogger(), config, "test", errutil.WrapWithErrRoundtripper)
testutil.Ok(t, err)
_, err = bkt.Get(context.Background(), "Test")
// We expect an error from the RoundTripper
testutil.NotOk(t, err)
testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err)
testutil.Assert(t, errutil.IsMockedError(err), "Expected RoundTripper error, got: %v", err)
}
17 changes: 9 additions & 8 deletions providers/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,20 @@ func parseConfig(conf []byte) (Config, error) {
}

// NewBucket returns a new Bucket against the given bucket handle.
func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string, rt http.RoundTripper) (*Bucket, error) {
func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
config, err := parseConfig(conf)
if err != nil {
return nil, err
}
return NewBucketWithConfig(ctx, logger, config, component, rt)
return NewBucketWithConfig(ctx, logger, config, component, wrapRoundtripper)
}

// NewBucketWithConfig returns a new Bucket with gcs Config struct.
func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, component string, rt http.RoundTripper) (*Bucket, error) {
func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
if gc.Bucket == "" {
return nil, errors.New("missing Google Cloud Storage bucket name for stored blocks")
}
if rt != nil {
gc.HTTPConfig.Transport = rt
}

var opts []option.ClientOption

// If ServiceAccount is provided, use them in GCS client, otherwise fallback to Google default logic.
Expand All @@ -112,7 +110,7 @@ func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, comp

if !gc.UseGRPC {
var err error
opts, err = appendHttpOptions(gc, opts)
opts, err = appendHttpOptions(gc, opts, wrapRoundtripper)
if err != nil {
return nil, err
}
Expand All @@ -121,7 +119,7 @@ func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, comp
return newBucket(ctx, logger, gc, opts)
}

func appendHttpOptions(gc Config, opts []option.ClientOption) ([]option.ClientOption, error) {
func appendHttpOptions(gc Config, opts []option.ClientOption, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) ([]option.ClientOption, error) {
// Check if a roundtripper has been set in the config
// otherwise build the default transport.
var rt http.RoundTripper
Expand All @@ -132,6 +130,9 @@ func appendHttpOptions(gc Config, opts []option.ClientOption) ([]option.ClientOp
if gc.HTTPConfig.Transport != nil {
rt = gc.HTTPConfig.Transport
}
if wrapRoundtripper != nil {
rt = wrapRoundtripper(rt)
}

// GCS uses some defaults when "options.WithHTTPClient" is not used that are important when we call
// htransport.NewTransport namely the scopes that are then used for OAth authentication. So to build our own
Expand Down
6 changes: 2 additions & 4 deletions providers/gcs/gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/efficientgo/core/testutil"
"github.com/fullstorydev/emulators/storage/gcsemu"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/thanos-io/objstore/errutil"
"google.golang.org/api/option"
Expand Down Expand Up @@ -161,7 +160,6 @@ http_config:
}

func TestNewBucketWithErrorRoundTripper(t *testing.T) {
rt := &errutil.ErrorRoundTripper{Err: errors.New("RoundTripper error")}
cfg := Config{
Bucket: "test-bucket",
ServiceAccount: "",
Expand All @@ -174,9 +172,9 @@ func TestNewBucketWithErrorRoundTripper(t *testing.T) {
err = os.Setenv("STORAGE_EMULATOR_HOST", svr.Addr)
testutil.Ok(t, err)

bkt, err := NewBucketWithConfig(context.Background(), log.NewNopLogger(), cfg, "test-bucket", rt)
bkt, err := NewBucketWithConfig(context.Background(), log.NewNopLogger(), cfg, "test-bucket", errutil.WrapWithErrRoundtripper)
testutil.Ok(t, err)
_, err = bkt.Get(context.Background(), "test-bucket")
testutil.NotOk(t, err)
testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err)
testutil.Assert(t, errutil.IsMockedError(err), "Expected RoundTripper error, got: %v", err)
}
2 changes: 1 addition & 1 deletion providers/obs/obs.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type Bucket struct {
}

func NewBucket(logger log.Logger, conf []byte) (*Bucket, error) {
// TODO(https://github.com/thanos-io/objstore/pull/140): Add support for custom roundtripper.
// TODO(https://github.com/thanos-io/objstore/pull/150): Add support for roundtripper wrapper.
config, err := parseConfig(conf)
if err != nil {
return nil, errors.Wrap(err, "parsing cos configuration")
Expand Down
15 changes: 9 additions & 6 deletions providers/oci/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (b *Bucket) deleteBucket(ctx context.Context) (err error) {
}

// NewBucket returns a new Bucket using the provided oci config values.
func NewBucket(logger log.Logger, ociConfig []byte, rt http.RoundTripper) (*Bucket, error) {
func NewBucket(logger log.Logger, ociConfig []byte, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
level.Debug(logger).Log("msg", "creating new oci bucket connection")
var config = DefaultConfig
var configurationProvider common.ConfigurationProvider
Expand Down Expand Up @@ -344,13 +344,16 @@ func NewBucket(logger log.Logger, ociConfig []byte, rt http.RoundTripper) (*Buck
if err != nil {
return nil, errors.Wrapf(err, "unable to create ObjectStorage client with the given oci configurations")
}

config.HTTPConfig.Transport = CustomTransport(config)
if rt != nil {
config.HTTPConfig.Transport = rt
var rt http.RoundTripper
rt = CustomTransport(config)
if config.HTTPConfig.Transport != nil {
rt = config.HTTPConfig.Transport
}
if wrapRoundtripper != nil {
rt = wrapRoundtripper(rt)
}
httpClient := http.Client{
Transport: config.HTTPConfig.Transport,
Transport: rt,
Timeout: config.HTTPConfig.ClientTimeout,
}
client.HTTPClient = &httpClient
Expand Down
7 changes: 2 additions & 5 deletions providers/oci/oci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/thanos-io/objstore/errutil"
"gopkg.in/yaml.v2"
)
Expand Down Expand Up @@ -38,10 +37,8 @@ G6aFKaqQfOXKCyWoUiVknQJAXrlgySFci/2ueKlIE1QqIiLSZ8V8OlpFLRnb1pzI
ociConfig, err := yaml.Marshal(config)
testutil.Ok(t, err)

rt := &errutil.ErrorRoundTripper{Err: errors.New("RoundTripper error")}

_, err = NewBucket(log.NewNopLogger(), ociConfig, rt)
_, err = NewBucket(log.NewNopLogger(), ociConfig, errutil.WrapWithErrRoundtripper)
// We expect an error from the RoundTripper
testutil.NotOk(t, err)
testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err)
testutil.Assert(t, errutil.IsMockedError(err), "Expected RoundTripper error, got: %v", err)
}
Loading
Loading