Skip to content

Commit

Permalink
Merge branch 'main' into svennergr/fix-log-to-span
Browse files Browse the repository at this point in the history
  • Loading branch information
svennergr authored Sep 19, 2024
2 parents 5950ffb + 8f54ec6 commit d672e8a
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 51 deletions.
63 changes: 61 additions & 2 deletions pkg/storage/chunk/client/aws/s3_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
awscommon "github.com/grafana/dskit/aws"

"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/instrument"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

amnet "k8s.io/apimachinery/pkg/util/net"

bucket_s3 "github.com/grafana/loki/v3/pkg/storage/bucket/s3"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging"
Expand Down Expand Up @@ -532,5 +535,61 @@ func (a *S3ObjectClient) IsObjectNotFoundErr(err error) bool {
return false
}

// TODO(dannyk): implement for client
func (a *S3ObjectClient) IsRetryableErr(error) bool { return false }
func isTimeoutError(err error) bool {
var netErr net.Error
return errors.As(err, &netErr) && netErr.Timeout()
}

func isContextErr(err error) bool {
return errors.Is(err, context.DeadlineExceeded) ||
errors.Is(err, context.Canceled)
}

// IsStorageTimeoutErr returns true if error means that object cannot be retrieved right now due to server-side timeouts.
func (a *S3ObjectClient) IsStorageTimeoutErr(err error) bool {
// TODO(dannyk): move these out to be generic
// context errors are all client-side
if isContextErr(err) {
return false
}

// connection misconfiguration, or writing on a closed connection
// do NOT retry; this is not a server-side issue
if errors.Is(err, net.ErrClosed) || amnet.IsConnectionRefused(err) {
return false
}

// this is a server-side timeout
if isTimeoutError(err) {
return true
}

// connection closed (closed before established) or reset (closed after established)
// this is a server-side issue
if errors.Is(err, io.EOF) || amnet.IsConnectionReset(err) {
return true
}

if rerr, ok := err.(awserr.RequestFailure); ok {
// https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html
return rerr.StatusCode() == http.StatusRequestTimeout ||
rerr.StatusCode() == http.StatusGatewayTimeout
}

return false
}

// IsStorageThrottledErr returns true if error means that object cannot be retrieved right now due to throttling.
func (a *S3ObjectClient) IsStorageThrottledErr(err error) bool {
if rerr, ok := err.(awserr.RequestFailure); ok {

// https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html
return rerr.StatusCode() == http.StatusTooManyRequests ||
(rerr.StatusCode()/100 == 5) // all 5xx errors are retryable
}

return false
}
func (a *S3ObjectClient) IsRetryableErr(err error) bool {
return a.IsStorageTimeoutErr(err) || a.IsStorageThrottledErr(err)
}
104 changes: 104 additions & 0 deletions pkg/storage/chunk/client/aws/s3_storage_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"errors"
"fmt"
"io"
"net"
"net/http"
"net/http/httptest"
"strings"
"syscall"
"testing"
"time"

Expand Down Expand Up @@ -73,6 +75,108 @@ func TestIsObjectNotFoundErr(t *testing.T) {
}
}

func TestIsRetryableErr(t *testing.T) {
tests := []struct {
err error
expected bool
name string
}{
{
name: "IsStorageThrottledErr - Too Many Requests",
err: awserr.NewRequestFailure(
awserr.New("TooManyRequests", "TooManyRequests", nil), 429, "reqId",
),
expected: true,
},
{
name: "IsStorageThrottledErr - 500",
err: awserr.NewRequestFailure(
awserr.New("500", "500", nil), 500, "reqId",
),
expected: true,
},
{
name: "IsStorageThrottledErr - 5xx",
err: awserr.NewRequestFailure(
awserr.New("501", "501", nil), 501, "reqId",
),
expected: true,
},
{
name: "IsStorageTimeoutErr - Request Timeout",
err: awserr.NewRequestFailure(
awserr.New("Request Timeout", "Request Timeout", nil), 408, "reqId",
),
expected: true,
},
{
name: "IsStorageTimeoutErr - Gateway Timeout",
err: awserr.NewRequestFailure(
awserr.New("Gateway Timeout", "Gateway Timeout", nil), 504, "reqId",
),
expected: true,
},
{
name: "IsStorageTimeoutErr - EOF",
err: io.EOF,
expected: true,
},
{
name: "IsStorageTimeoutErr - Connection Reset",
err: syscall.ECONNRESET,
expected: true,
},
{
name: "IsStorageTimeoutErr - Timeout Error",
err: awserr.NewRequestFailure(
awserr.New("RequestCanceled", "request canceled due to timeout", nil), 408, "request-id",
),
expected: true,
},
{
name: "IsStorageTimeoutErr - Closed",
err: net.ErrClosed,
expected: false,
},
{
name: "IsStorageTimeoutErr - Connection Refused",
err: syscall.ECONNREFUSED,
expected: false,
},
{
name: "IsStorageTimeoutErr - Context Deadline Exceeded",
err: context.DeadlineExceeded,
expected: false,
},
{
name: "IsStorageTimeoutErr - Context Canceled",
err: context.Canceled,
expected: false,
},
{
name: "Not a retryable error",
err: syscall.EINVAL,
expected: false,
},
{
name: "Not found 404",
err: awserr.NewRequestFailure(
awserr.New("404", "404", nil), 404, "reqId",
),
expected: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
client, err := NewS3ObjectClient(S3Config{BucketNames: "mybucket"}, hedging.Config{})
require.NoError(t, err)

require.Equal(t, tt.expected, client.IsRetryableErr(tt.err))
})
}
}

func TestRequestMiddleware(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, r.Header.Get("echo-me"))
Expand Down
2 changes: 1 addition & 1 deletion production/helm/loki/src/alerts.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ groups:
message: |
{{`{{`}} $labels.cluster {{`}}`}} {{`{{`}} $labels.namespace {{`}}`}} has had {{`{{`}} printf "%.0f" $value {{`}}`}} compactors running for more than 5m. Only one compactor should run at a time.
expr: |
sum(loki_boltdb_shipper_compactor_running) by (namespace, cluster) > 1
sum(loki_boltdb_shipper_compactor_running) by (cluster, namespace) > 1
for: "5m"
labels:
severity: "warning"
Expand Down
14 changes: 7 additions & 7 deletions production/loki-mixin-compiled-ssd/alerts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,29 @@ groups:
- alert: LokiRequestErrors
annotations:
description: |
{{ $labels.job }} {{ $labels.route }} is experiencing {{ printf "%.2f" $value }}% errors.
{{ $labels.cluster }} {{ $labels.job }} {{ $labels.route }} is experiencing {{ printf "%.2f" $value }}% errors.
summary: Loki request error rate is high.
expr: |
100 * sum(rate(loki_request_duration_seconds_count{status_code=~"5.."}[2m])) by (namespace, job, route)
100 * sum(rate(loki_request_duration_seconds_count{status_code=~"5.."}[2m])) by (cluster, namespace, job, route)
/
sum(rate(loki_request_duration_seconds_count[2m])) by (namespace, job, route)
sum(rate(loki_request_duration_seconds_count[2m])) by (cluster, namespace, job, route)
> 10
for: 15m
labels:
severity: critical
- alert: LokiRequestPanics
annotations:
description: |
{{ $labels.job }} is experiencing {{ printf "%.2f" $value }}% increase of panics.
{{ $labels.cluster }} {{ $labels.job }} is experiencing {{ printf "%.2f" $value }}% increase of panics.
summary: Loki requests are causing code panics.
expr: |
sum(increase(loki_panic_total[10m])) by (namespace, job) > 0
sum(increase(loki_panic_total[10m])) by (cluster, namespace, job) > 0
labels:
severity: critical
- alert: LokiRequestLatency
annotations:
description: |
{{ $labels.job }} {{ $labels.route }} is experiencing {{ printf "%.2f" $value }}s 99th percentile latency.
{{ $labels.cluster }} {{ $labels.job }} {{ $labels.route }} is experiencing {{ printf "%.2f" $value }}s 99th percentile latency.
summary: Loki request error latency is high.
expr: |
cluster_namespace_job_route:loki_request_duration_seconds:99quantile{route!~"(?i).*tail.*|/schedulerpb.SchedulerForQuerier/QuerierLoop"} > 1
Expand All @@ -39,7 +39,7 @@ groups:
{{ $labels.cluster }} {{ $labels.namespace }} has had {{ printf "%.0f" $value }} compactors running for more than 5m. Only one compactor should run at a time.
summary: Loki deployment is running more than one compactor.
expr: |
sum(loki_boltdb_shipper_compactor_running) by (namespace, cluster) > 1
sum(loki_boltdb_shipper_compactor_running) by (cluster, namespace) > 1
for: 5m
labels:
severity: warning
Loading

0 comments on commit d672e8a

Please sign in to comment.