From 9767807680cb4149c7b56345c531b62105a1b976 Mon Sep 17 00:00:00 2001
From: George Robinson
Date: Fri, 7 Jun 2024 16:02:18 +0100
Subject: [PATCH] feat: Add backoff to flush op (#13140)

---
 docs/sources/shared/configuration.md |  18 ++++-
 pkg/ingester/flush.go                |  34 +++++++--
 pkg/ingester/flush_test.go           |  66 +++++++++++++++++
 pkg/ingester/ingester.go             |  16 ++++-
 pkg/ingester/ingester_test.go        | 101 ++++++++++++++++++++++-----
 pkg/ingester/instance_test.go        |  13 +++-
 6 files changed, 217 insertions(+), 31 deletions(-)

diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md
index cae0094873a84..b287bdea5f37f 100644
--- a/docs/sources/shared/configuration.md
+++ b/docs/sources/shared/configuration.md
@@ -2752,7 +2752,23 @@ lifecycler:
 # CLI flag: -ingester.flush-check-period
 [flush_check_period: <duration> | default = 30s]
 
-# The timeout before a flush is cancelled.
+flush_op_backoff:
+  # Minimum backoff period when a flush fails. Each concurrent flush has its own
+  # backoff, see `ingester.concurrent-flushes`.
+  # CLI flag: -ingester.flush-op-backoff-min-period
+  [min_period: <duration> | default = 10s]
+
+  # Maximum backoff period when a flush fails. Each concurrent flush has its own
+  # backoff, see `ingester.concurrent-flushes`.
+  # CLI flag: -ingester.flush-op-backoff-max-period
+  [max_period: <duration> | default = 1m]
+
+  # Maximum retries for failed flushes.
+  # CLI flag: -ingester.flush-op-backoff-retries
+  [max_retries: <int> | default = 10]
+
+# The timeout for an individual flush. Will be retried up to
+# `flush-op-backoff-retries` times.
 # CLI flag: -ingester.flush-op-timeout
 [flush_op_timeout: <duration> | default = 10m]
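For orientation, the three new keys map onto dskit's `backoff.Config`, which `pkg/ingester/ingester.go` wires up further down in this patch. A minimal, purely illustrative sketch of the documented defaults in Go (the struct literal below is not part of the patch):

```go
package main

import (
	"fmt"
	"time"

	"github.com/grafana/dskit/backoff"
)

func main() {
	// Documented defaults; each concurrent flush gets its own backoff with
	// these settings. The field comments show the YAML key and CLI flag each
	// value corresponds to in the section above.
	cfg := backoff.Config{
		MinBackoff: 10 * time.Second, // min_period,  -ingester.flush-op-backoff-min-period
		MaxBackoff: time.Minute,      // max_period,  -ingester.flush-op-backoff-max-period
		MaxRetries: 10,               // max_retries, -ingester.flush-op-backoff-retries
	}
	fmt.Printf("flush op backoff defaults: %+v\n", cfg)
}
```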
diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go
index 00aad05475495..81407abcb2e25 100644
--- a/pkg/ingester/flush.go
+++ b/pkg/ingester/flush.go
@@ -7,7 +7,9 @@ import (
 	"sync"
 	"time"
 
+	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/backoff"
 	"github.com/grafana/dskit/ring"
 	"github.com/grafana/dskit/user"
 	"github.com/prometheus/client_golang/prometheus"
@@ -135,8 +137,9 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate boo
 }
 
 func (i *Ingester) flushLoop(j int) {
+	l := log.With(i.logger, "loop", j)
 	defer func() {
-		level.Debug(i.logger).Log("msg", "Ingester.flushLoop() exited")
+		level.Debug(l).Log("msg", "Ingester.flushLoop() exited")
 		i.flushQueuesDone.Done()
 	}()
 
@@ -147,9 +150,10 @@ func (i *Ingester) flushLoop(j int) {
 		}
 		op := o.(*flushOp)
 
-		err := i.flushUserSeries(op.userID, op.fp, op.immediate)
+		m := util_log.WithUserID(op.userID, l)
+		err := i.flushOp(m, op)
 		if err != nil {
-			level.Error(util_log.WithUserID(op.userID, i.logger)).Log("msg", "failed to flush", "err", err)
+			level.Error(m).Log("msg", "failed to flush", "err", err)
 		}
 
 		// If we're exiting & we failed to flush, put the failed operation
@@ -161,7 +165,23 @@ func (i *Ingester) flushLoop(j int) {
 	}
 }
 
-func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
+func (i *Ingester) flushOp(l log.Logger, op *flushOp) error {
+	ctx, cancelFunc := context.WithCancel(context.Background())
+	defer cancelFunc()
+
+	b := backoff.New(ctx, i.cfg.FlushOpBackoff)
+	for b.Ongoing() {
+		err := i.flushUserSeries(ctx, op.userID, op.fp, op.immediate)
+		if err == nil {
+			break
+		}
+		level.Error(l).Log("msg", "failed to flush", "retries", b.NumRetries(), "err", err)
+		b.Wait()
+	}
+	return b.Err()
+}
+
+func (i *Ingester) flushUserSeries(ctx context.Context, userID string, fp model.Fingerprint, immediate bool) error {
 	instance, ok := i.getInstanceByID(userID)
 	if !ok {
 		return nil
@@ -175,9 +195,9 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
 	lbs := labels.String()
 	level.Info(i.logger).Log("msg", "flushing stream", "user", userID, "fp", fp, "immediate", immediate, "num_chunks", len(chunks), "labels", lbs)
 
-	ctx := user.InjectOrgID(context.Background(), userID)
-	ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
-	defer cancel()
+	ctx = user.InjectOrgID(ctx, userID)
+	ctx, cancelFunc := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
+	defer cancelFunc()
 	err := i.flushChunks(ctx, fp, labels, chunks, chunkMtx)
 	if err != nil {
 		return fmt.Errorf("failed to flush chunks: %w, num_chunks: %d, labels: %s", err, len(chunks), lbs)
diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go
index 6fd52bafa066f..edd6084a2741b 100644
--- a/pkg/ingester/flush_test.go
+++ b/pkg/ingester/flush_test.go
@@ -1,6 +1,7 @@
 package ingester
 
 import (
+	"errors"
 	"fmt"
 	"os"
 	"sort"
@@ -102,6 +103,67 @@ func Benchmark_FlushLoop(b *testing.B) {
 	}
 }
 
+func Test_FlushOp(t *testing.T) {
+	t.Run("no error", func(t *testing.T) {
+		cfg := defaultIngesterTestConfig(t)
+		cfg.FlushOpBackoff.MinBackoff = time.Second
+		cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
+		cfg.FlushOpBackoff.MaxRetries = 1
+		cfg.FlushCheckPeriod = 100 * time.Millisecond
+
+		_, ing := newTestStore(t, cfg, nil)
+
+		ctx := user.InjectOrgID(context.Background(), "foo")
+		ins, err := ing.GetOrCreateInstance("foo")
+		require.NoError(t, err)
+
+		lbs := makeRandomLabels()
+		req := &logproto.PushRequest{Streams: []logproto.Stream{{
+			Labels:  lbs.String(),
+			Entries: entries(5, time.Now()),
+		}}}
+		require.NoError(t, ins.Push(ctx, req))
+
+		time.Sleep(cfg.FlushCheckPeriod)
+		require.NoError(t, ing.flushOp(gokitlog.NewNopLogger(), &flushOp{
+			immediate: true,
+			userID:    "foo",
+			fp:        ins.getHashForLabels(lbs),
+		}))
+	})
+
+	t.Run("max retries exceeded", func(t *testing.T) {
+		cfg := defaultIngesterTestConfig(t)
+		cfg.FlushOpBackoff.MinBackoff = time.Second
+		cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
+		cfg.FlushOpBackoff.MaxRetries = 1
+		cfg.FlushCheckPeriod = 100 * time.Millisecond
+
+		store, ing := newTestStore(t, cfg, nil)
+		store.onPut = func(_ context.Context, _ []chunk.Chunk) error {
+			return errors.New("failed to write chunks")
+		}
+
+		ctx := user.InjectOrgID(context.Background(), "foo")
+		ins, err := ing.GetOrCreateInstance("foo")
+		require.NoError(t, err)
+
+		lbs := makeRandomLabels()
+		req := &logproto.PushRequest{Streams: []logproto.Stream{{
+			Labels:  lbs.String(),
+			Entries: entries(5, time.Now()),
+		}}}
+		require.NoError(t, ins.Push(ctx, req))
+
+		time.Sleep(cfg.FlushCheckPeriod)
+		require.EqualError(t, ing.flushOp(gokitlog.NewNopLogger(), &flushOp{
+			immediate: true,
+			userID:    "foo",
+			fp:        ins.getHashForLabels(lbs),
+		}), "terminated after 1 retries")
+	})
+}
+
 func Test_Flush(t *testing.T) {
 	var (
 		store, ing = newTestStore(t, defaultIngesterTestConfig(t), nil)
@@ -297,6 +359,10 @@ func defaultIngesterTestConfig(t testing.TB) Config {
 	cfg := Config{}
 	flagext.DefaultValues(&cfg)
 
+	cfg.FlushOpBackoff.MinBackoff = 100 * time.Millisecond
+	cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
+	cfg.FlushOpBackoff.MaxRetries = 1
+	cfg.FlushOpTimeout = 15 * time.Second
 	cfg.FlushCheckPeriod = 99999 * time.Hour
 	cfg.MaxChunkIdle = 99999 * time.Hour
 	cfg.ConcurrentFlushes = 1
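For readers unfamiliar with `github.com/grafana/dskit/backoff`, the loop added in `flushOp` above is the library's standard retry idiom: `Ongoing()` gates each attempt, `Wait()` sleeps with exponential backoff, and `Err()` returns nil on success, the context error on cancellation, or a "terminated after N retries" error once `MaxRetries` is exhausted, which is exactly what the "max retries exceeded" test asserts. A self-contained sketch of the same pattern, with a stubbed `flushOnce` standing in for `flushUserSeries` (the stub and its values are illustrative, not part of the patch):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/grafana/dskit/backoff"
)

// flushOnce stands in for Ingester.flushUserSeries: it fails twice, then succeeds.
func flushOnce(attempt int) error {
	if attempt < 2 {
		return errors.New("failed to write chunks")
	}
	return nil
}

func main() {
	ctx := context.Background()

	b := backoff.New(ctx, backoff.Config{
		MinBackoff: 100 * time.Millisecond,
		MaxBackoff: time.Second,
		MaxRetries: 5,
	})

	// Same shape as the new Ingester.flushOp: keep retrying while the backoff
	// is ongoing, break on success, and return b.Err() otherwise (nil on
	// success, the context error on cancellation, or "terminated after N
	// retries" once MaxRetries is spent).
	for b.Ongoing() {
		err := flushOnce(b.NumRetries())
		if err == nil {
			break
		}
		fmt.Println("flush failed:", err, "retries:", b.NumRetries())
		b.Wait()
	}
	fmt.Println("final:", b.Err())
}
```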
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index f892a2a7a89ce..1a89aebe6ef9f 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -21,6 +21,7 @@ import (
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/backoff"
 	"github.com/grafana/dskit/concurrency"
 	"github.com/grafana/dskit/modules"
 	"github.com/grafana/dskit/multierror"
@@ -84,6 +85,7 @@ type Config struct {
 	ConcurrentFlushes int           `yaml:"concurrent_flushes"`
 	FlushCheckPeriod  time.Duration `yaml:"flush_check_period"`
+	FlushOpBackoff    backoff.Config `yaml:"flush_op_backoff"`
 	FlushOpTimeout    time.Duration `yaml:"flush_op_timeout"`
 	RetainPeriod      time.Duration `yaml:"chunk_retain_period"`
 	MaxChunkIdle      time.Duration `yaml:"chunk_idle_period"`
@@ -129,7 +131,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.")
 	f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")
-	f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout before a flush is cancelled.")
+	f.DurationVar(&cfg.FlushOpBackoff.MinBackoff, "ingester.flush-op-backoff-min-period", 10*time.Second, "Minimum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
+	f.DurationVar(&cfg.FlushOpBackoff.MaxBackoff, "ingester.flush-op-backoff-max-period", time.Minute, "Maximum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
+	f.IntVar(&cfg.FlushOpBackoff.MaxRetries, "ingester.flush-op-backoff-retries", 10, "Maximum retries for failed flushes.")
+	f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout for an individual flush. Will be retried up to `flush-op-backoff-retries` times.")
 	f.DurationVar(&cfg.RetainPeriod, "ingester.chunks-retain-period", 0, "How long chunks should be retained in-memory after they've been flushed.")
 	f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.")
 	f.IntVar(&cfg.BlockSize, "ingester.chunks-block-size", 256*1024, "The targeted _uncompressed_ size in bytes of a chunk block When this threshold is exceeded the head block will be cut and compressed inside the chunk.")
@@ -157,6 +162,15 @@ func (cfg *Config) Validate() error {
 		return err
 	}
 
+	if cfg.FlushOpBackoff.MinBackoff > cfg.FlushOpBackoff.MaxBackoff {
+		return errors.New("invalid flush op min backoff: cannot be larger than max backoff")
+	}
+	if cfg.FlushOpBackoff.MaxRetries <= 0 {
+		return fmt.Errorf("invalid flush op max retries: %d", cfg.FlushOpBackoff.MaxRetries)
+	}
+	if cfg.FlushOpTimeout <= 0 {
+		return fmt.Errorf("invalid flush op timeout: %s", cfg.FlushOpTimeout)
+	}
 	if cfg.IndexShards <= 0 {
 		return fmt.Errorf("invalid ingester index shard factor: %d", cfg.IndexShards)
 	}
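The three `Validate()` checks added above reject a minimum backoff larger than the maximum, a non-positive retry count, and a non-positive flush timeout. Restated outside the diff as a small hypothetical helper whose error strings mirror the ones introduced in the patch:

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/grafana/dskit/backoff"
)

// validateFlushOp restates the three checks added to Config.Validate above;
// it is a sketch for illustration, not part of the patch.
func validateFlushOp(b backoff.Config, flushOpTimeout time.Duration) error {
	if b.MinBackoff > b.MaxBackoff {
		return errors.New("invalid flush op min backoff: cannot be larger than max backoff")
	}
	if b.MaxRetries <= 0 {
		return fmt.Errorf("invalid flush op max retries: %d", b.MaxRetries)
	}
	if flushOpTimeout <= 0 {
		return fmt.Errorf("invalid flush op timeout: %s", flushOpTimeout)
	}
	return nil
}

func main() {
	// The defaults from the new flags pass validation.
	ok := backoff.Config{MinBackoff: 10 * time.Second, MaxBackoff: time.Minute, MaxRetries: 10}
	fmt.Println(validateFlushOp(ok, 10*time.Minute)) // <nil>

	// A zero retry count is rejected, as the TestValidate cases below assert.
	bad := backoff.Config{MinBackoff: 10 * time.Second, MaxBackoff: time.Minute}
	fmt.Println(validateFlushOp(bad, 10*time.Minute)) // invalid flush op max retries: 0
}
```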
diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go
index 1c438bd6bf2c0..6bb27ad645cc9 100644
--- a/pkg/ingester/ingester_test.go
+++ b/pkg/ingester/ingester_test.go
@@ -12,6 +12,7 @@ import (
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/backoff"
 	"github.com/grafana/dskit/flagext"
 	"github.com/grafana/dskit/httpgrpc"
 	"github.com/grafana/dskit/middleware"
@@ -676,57 +677,119 @@ func TestIngester_asyncStoreMaxLookBack(t *testing.T) {
 
 func TestValidate(t *testing.T) {
 	for i, tc := range []struct {
-		in       Config
-		err      bool
-		expected Config
+		in          Config
+		expected    Config
+		expectedErr string
 	}{
 		{
 			in: Config{
-				MaxChunkAge:   time.Minute,
 				ChunkEncoding: chunkenc.EncGZIP.String(),
-				IndexShards:   index.DefaultIndexShards,
+				FlushOpBackoff: backoff.Config{
+					MinBackoff: 100 * time.Millisecond,
+					MaxBackoff: 10 * time.Second,
+					MaxRetries: 1,
+				},
+				FlushOpTimeout: 15 * time.Second,
+				IndexShards:    index.DefaultIndexShards,
+				MaxChunkAge:    time.Minute,
 			},
 			expected: Config{
+				ChunkEncoding: chunkenc.EncGZIP.String(),
+				FlushOpBackoff: backoff.Config{
+					MinBackoff: 100 * time.Millisecond,
+					MaxBackoff: 10 * time.Second,
+					MaxRetries: 1,
+				},
+				FlushOpTimeout: 15 * time.Second,
+				IndexShards:    index.DefaultIndexShards,
 				MaxChunkAge:    time.Minute,
-				ChunkEncoding:  chunkenc.EncGZIP.String(),
 				parsedEncoding: chunkenc.EncGZIP,
-				IndexShards:    index.DefaultIndexShards,
 			},
 		},
 		{
 			in: Config{
 				ChunkEncoding: chunkenc.EncSnappy.String(),
-				IndexShards:   index.DefaultIndexShards,
+				FlushOpBackoff: backoff.Config{
+					MinBackoff: 100 * time.Millisecond,
+					MaxBackoff: 10 * time.Second,
+					MaxRetries: 1,
+				},
+				FlushOpTimeout: 15 * time.Second,
+				IndexShards:    index.DefaultIndexShards,
 			},
 			expected: Config{
-				ChunkEncoding:  chunkenc.EncSnappy.String(),
-				parsedEncoding: chunkenc.EncSnappy,
+				ChunkEncoding: chunkenc.EncSnappy.String(),
+				FlushOpBackoff: backoff.Config{
+					MinBackoff: 100 * time.Millisecond,
+					MaxBackoff: 10 * time.Second,
+					MaxRetries: 1,
+				},
+				FlushOpTimeout: 15 * time.Second,
 				IndexShards:    index.DefaultIndexShards,
+				parsedEncoding: chunkenc.EncSnappy,
 			},
 		},
 		{
 			in: Config{
-				IndexShards:   index.DefaultIndexShards,
 				ChunkEncoding: "bad-enc",
+				FlushOpBackoff: backoff.Config{
+					MinBackoff: 100 * time.Millisecond,
+					MaxBackoff: 10 * time.Second,
+					MaxRetries: 1,
+				},
+				FlushOpTimeout: 15 * time.Second,
+				IndexShards:    index.DefaultIndexShards,
+			},
+			expectedErr: "invalid encoding: bad-enc, supported: none, gzip, lz4-64k, snappy, lz4-256k, lz4-1M, lz4, flate, zstd",
+		},
+		{
+			in: Config{
+				ChunkEncoding: chunkenc.EncGZIP.String(),
+				FlushOpBackoff: backoff.Config{
+					MinBackoff: 100 * time.Millisecond,
+					MaxBackoff: 10 * time.Second,
+				},
+				FlushOpTimeout: 15 * time.Second,
+				IndexShards:    index.DefaultIndexShards,
+				MaxChunkAge:    time.Minute,
+			},
+			expectedErr: "invalid flush op max retries: 0",
+		},
+		{
+			in: Config{
+				ChunkEncoding: chunkenc.EncGZIP.String(),
+				FlushOpBackoff: backoff.Config{
+					MinBackoff: 100 * time.Millisecond,
+					MaxBackoff: 10 * time.Second,
+					MaxRetries: 1,
+				},
+				IndexShards: index.DefaultIndexShards,
+				MaxChunkAge: time.Minute,
 			},
-			err: true,
+			expectedErr: "invalid flush op timeout: 0s",
 		},
 		{
 			in: Config{
-				MaxChunkAge:   time.Minute,
 				ChunkEncoding: chunkenc.EncGZIP.String(),
+				FlushOpBackoff: backoff.Config{
+					MinBackoff: 100 * time.Millisecond,
+					MaxBackoff: 10 * time.Second,
+					MaxRetries: 1,
+				},
+				FlushOpTimeout: 15 * time.Second,
+				MaxChunkAge:    time.Minute,
 			},
-			err: true,
+			expectedErr: "invalid ingester index shard factor: 0",
 		},
 	} {
 		t.Run(fmt.Sprint(i), func(t *testing.T) {
 			err := tc.in.Validate()
-			if tc.err {
-				require.NotNil(t, err)
-				return
+			if tc.expectedErr != "" {
+				require.EqualError(t, err, tc.expectedErr)
+			} else {
+				require.NoError(t, err)
+				require.Equal(t, tc.expected, tc.in)
 			}
-			require.Nil(t, err)
-			require.Equal(t, tc.expected, tc.in)
 		})
 	}
 }
diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go
index 7f7dc30361d6a..3055a7fb0c5b7 100644
--- a/pkg/ingester/instance_test.go
+++ b/pkg/ingester/instance_test.go
@@ -18,6 +18,7 @@ import (
 
 	"github.com/grafana/loki/v3/pkg/logql/log"
 
+	"github.com/grafana/dskit/backoff"
 	"github.com/grafana/dskit/flagext"
 	"github.com/pkg/errors"
 	"github.com/prometheus/common/model"
@@ -40,9 +41,15 @@ import (
 
 func defaultConfig() *Config {
 	cfg := Config{
-		BlockSize:     512,
-		ChunkEncoding: "gzip",
-		IndexShards:   32,
+		BlockSize:      512,
+		ChunkEncoding:  "gzip",
+		IndexShards:    32,
+		FlushOpTimeout: 15 * time.Second,
+		FlushOpBackoff: backoff.Config{
+			MinBackoff: 100 * time.Millisecond,
+			MaxBackoff: 10 * time.Second,
+			MaxRetries: 1,
+		},
 	}
 	if err := cfg.Validate(); err != nil {
 		panic(errors.Wrap(err, "error building default test config"))