Skip to content

Commit

Permalink
feat(thanos): add new metric to track status codes (#14937)
Browse files Browse the repository at this point in the history
  • Loading branch information
ashwanthgoli authored Nov 15, 2024
1 parent 83e680f commit a629212
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 18 deletions.
8 changes: 4 additions & 4 deletions pkg/storage/bucket/azure/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"github.com/thanos-io/objstore/providers/azure"
)

func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) {
return newBucketClient(cfg, name, logger, azure.NewBucketWithConfig)
func NewBucketClient(cfg Config, name string, logger log.Logger, wrapRT func(http.RoundTripper) http.RoundTripper) (objstore.Bucket, error) {
return newBucketClient(cfg, name, logger, wrapRT, azure.NewBucketWithConfig)
}

func newBucketClient(cfg Config, name string, logger log.Logger, factory func(log.Logger, azure.Config, string, func(http.RoundTripper) http.RoundTripper) (*azure.Bucket, error)) (objstore.Bucket, error) {
func newBucketClient(cfg Config, name string, logger log.Logger, wrapRT func(http.RoundTripper) http.RoundTripper, factory func(log.Logger, azure.Config, string, func(http.RoundTripper) http.RoundTripper) (*azure.Bucket, error)) (objstore.Bucket, error) {
// Start with default config to make sure that all parameters are set to sensible values, especially
// HTTP Config field.
bucketConfig := azure.DefaultConfig
Expand All @@ -29,5 +29,5 @@ func newBucketClient(cfg Config, name string, logger log.Logger, factory func(lo
bucketConfig.Endpoint = cfg.Endpoint
}

return factory(logger, bucketConfig, name, nil)
return factory(logger, bucketConfig, name, wrapRT)
}
47 changes: 42 additions & 5 deletions pkg/storage/bucket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"net/http"
"regexp"
"strconv"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -46,9 +47,24 @@ var (
ErrUnsupportedStorageBackend = errors.New("unsupported storage backend")
ErrInvalidCharactersInStoragePrefix = errors.New("storage prefix contains invalid characters, it may only contain digits and English alphabet letters")

metrics = objstore.BucketMetrics(prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer), "")
metrics *objstore.Metrics

// added to track the status codes by method
bucketRequestsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "loki",
Name: "objstore_bucket_transport_requests_total",
Help: "Total number of HTTP transport requests made to the bucket backend by status code and method.",
},
[]string{"status_code", "method"},
)
)

func init() {
prometheus.MustRegister(bucketRequestsTotal)
metrics = objstore.BucketMetrics(prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer), "")
}

// StorageBackendConfig holds configuration for accessing long-term storage.
type StorageBackendConfig struct {
// Backends
Expand Down Expand Up @@ -176,13 +192,13 @@ func NewClient(ctx context.Context, backend string, cfg Config, name string, log
// TODO: add support for other backends that loki already supports
switch backend {
case S3:
client, err = s3.NewBucketClient(cfg.S3, name, logger)
client, err = s3.NewBucketClient(cfg.S3, name, logger, instrumentTransport())
case GCS:
client, err = gcs.NewBucketClient(ctx, cfg.GCS, name, logger)
client, err = gcs.NewBucketClient(ctx, cfg.GCS, name, logger, instrumentTransport())
case Azure:
client, err = azure.NewBucketClient(cfg.Azure, name, logger)
client, err = azure.NewBucketClient(cfg.Azure, name, logger, instrumentTransport())
case Swift:
client, err = swift.NewBucketClient(cfg.Swift, name, logger)
client, err = swift.NewBucketClient(cfg.Swift, name, logger, instrumentTransport())
case Filesystem:
client, err = filesystem.NewBucketClient(cfg.Filesystem)
default:
Expand All @@ -209,3 +225,24 @@ func NewClient(ctx context.Context, backend string, cfg Config, name string, log

return instrumentedClient, nil
}

type instrumentedRoundTripper struct {
next http.RoundTripper
}

func instrumentTransport() func(http.RoundTripper) http.RoundTripper {
return func(rt http.RoundTripper) http.RoundTripper {
return &instrumentedRoundTripper{next: rt}
}
}

func (i *instrumentedRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
resp, err := i.next.RoundTrip(req)
if err != nil {
return resp, err
}

// Record status code and method metrics
bucketRequestsTotal.WithLabelValues(strconv.Itoa(resp.StatusCode), req.Method).Inc()
return resp, nil
}
5 changes: 3 additions & 2 deletions pkg/storage/bucket/gcs/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package gcs

import (
"context"
"net/http"

"github.com/go-kit/log"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/gcs"
)

// NewBucketClient creates a new GCS bucket client
func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Logger) (objstore.Bucket, error) {
func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Logger, wrapRT func(http.RoundTripper) http.RoundTripper) (objstore.Bucket, error) {
// start with default http configs
bucketConfig := gcs.DefaultConfig
bucketConfig.Bucket = cfg.BucketName
Expand All @@ -18,5 +19,5 @@ func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Lo
bucketConfig.MaxRetries = cfg.MaxRetries
bucketConfig.HTTPConfig.Transport = cfg.Transport

return gcs.NewBucketWithConfig(ctx, logger, bucketConfig, name, nil)
return gcs.NewBucketWithConfig(ctx, logger, bucketConfig, name, wrapRT)
}
10 changes: 6 additions & 4 deletions pkg/storage/bucket/s3/bucket_client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package s3

import (
"net/http"

"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/thanos-io/objstore"
Expand All @@ -14,23 +16,23 @@ const (
)

// NewBucketClient creates a new S3 bucket client
func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) {
func NewBucketClient(cfg Config, name string, logger log.Logger, wrapRT func(http.RoundTripper) http.RoundTripper) (objstore.Bucket, error) {
s3Cfg, err := newS3Config(cfg)
if err != nil {
return nil, err
}

return s3.NewBucketWithConfig(logger, s3Cfg, name, nil)
return s3.NewBucketWithConfig(logger, s3Cfg, name, wrapRT)
}

// NewBucketReaderClient creates a new S3 bucket client
func NewBucketReaderClient(cfg Config, name string, logger log.Logger) (objstore.BucketReader, error) {
func NewBucketReaderClient(cfg Config, name string, logger log.Logger, wrapRT func(http.RoundTripper) http.RoundTripper) (objstore.BucketReader, error) {
s3Cfg, err := newS3Config(cfg)
if err != nil {
return nil, err
}

return s3.NewBucketWithConfig(logger, s3Cfg, name, nil)
return s3.NewBucketWithConfig(logger, s3Cfg, name, wrapRT)
}

func newS3Config(cfg Config) (s3.Config, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/bucket/sse_bucket_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestSSEBucketClient_Upload_ShouldInjectCustomSSEConfig(t *testing.T) {
Insecure: true,
}

s3Client, err := s3.NewBucketClient(s3Cfg, "test", log.NewNopLogger())
s3Client, err := s3.NewBucketClient(s3Cfg, "test", log.NewNopLogger(), nil)
require.NoError(t, err)

// Configure the config provider with NO KMS key ID.
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/bucket/swift/bucket_client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package swift

import (
"net/http"

"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/thanos-io/objstore"
Expand All @@ -9,7 +11,7 @@ import (
)

// NewBucketClient creates a new Swift bucket client
func NewBucketClient(cfg Config, _ string, logger log.Logger) (objstore.Bucket, error) {
func NewBucketClient(cfg Config, _ string, logger log.Logger, wrapper func(http.RoundTripper) http.RoundTripper) (objstore.Bucket, error) {
bucketConfig := swift.Config{
AuthVersion: cfg.AuthVersion,
AuthUrl: cfg.AuthURL,
Expand Down Expand Up @@ -37,5 +39,5 @@ func NewBucketClient(cfg Config, _ string, logger log.Logger) (objstore.Bucket,
}
bucketConfig.HTTPConfig.Transport = cfg.Transport

return swift.NewContainerFromConfig(logger, &bucketConfig, false, nil)
return swift.NewContainerFromConfig(logger, &bucketConfig, false, wrapper)
}

0 comments on commit a629212

Please sign in to comment.