Skip to content

Commit

Permalink
feat: Add backoff to flush op (#13140)
Browse files Browse the repository at this point in the history
  • Loading branch information
grobinson-grafana authored Jun 7, 2024
1 parent b31e04e commit 9767807
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 31 deletions.
18 changes: 17 additions & 1 deletion docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -2752,7 +2752,23 @@ lifecycler:
# CLI flag: -ingester.flush-check-period
[flush_check_period: <duration> | default = 30s]
# The timeout before a flush is cancelled.
flush_op_backoff:
# Minimum backoff period when a flush fails. Each concurrent flush has its own
# backoff, see `ingester.concurrent-flushes`.
# CLI flag: -ingester.flush-op-backoff-min-period
[min_period: <duration> | default = 10s]

# Maximum backoff period when a flush fails. Each concurrent flush has its own
# backoff, see `ingester.concurrent-flushes`.
# CLI flag: -ingester.flush-op-backoff-max-period
[max_period: <duration> | default = 1m]

# Maximum retries for failed flushes.
# CLI flag: -ingester.flush-op-backoff-retries
[max_retries: <int> | default = 10]

# The timeout for an individual flush. Will be retried up to
# `flush-op-backoff-retries` times.
# CLI flag: -ingester.flush-op-timeout
[flush_op_timeout: <duration> | default = 10m]

Expand Down
34 changes: 27 additions & 7 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -135,8 +137,9 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate boo
}

func (i *Ingester) flushLoop(j int) {
l := log.With(i.logger, "loop", j)
defer func() {
level.Debug(i.logger).Log("msg", "Ingester.flushLoop() exited")
level.Debug(l).Log("msg", "Ingester.flushLoop() exited")
i.flushQueuesDone.Done()
}()

Expand All @@ -147,9 +150,10 @@ func (i *Ingester) flushLoop(j int) {
}
op := o.(*flushOp)

err := i.flushUserSeries(op.userID, op.fp, op.immediate)
m := util_log.WithUserID(op.userID, l)
err := i.flushOp(m, op)
if err != nil {
level.Error(util_log.WithUserID(op.userID, i.logger)).Log("msg", "failed to flush", "err", err)
level.Error(m).Log("msg", "failed to flush", "err", err)
}

// If we're exiting & we failed to flush, put the failed operation
Expand All @@ -161,7 +165,23 @@ func (i *Ingester) flushLoop(j int) {
}
}

func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
func (i *Ingester) flushOp(l log.Logger, op *flushOp) error {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

b := backoff.New(ctx, i.cfg.FlushOpBackoff)
for b.Ongoing() {
err := i.flushUserSeries(ctx, op.userID, op.fp, op.immediate)
if err == nil {
break
}
level.Error(l).Log("msg", "failed to flush", "retries", b.NumRetries(), "err", err)
b.Wait()
}
return b.Err()
}

func (i *Ingester) flushUserSeries(ctx context.Context, userID string, fp model.Fingerprint, immediate bool) error {
instance, ok := i.getInstanceByID(userID)
if !ok {
return nil
Expand All @@ -175,9 +195,9 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
lbs := labels.String()
level.Info(i.logger).Log("msg", "flushing stream", "user", userID, "fp", fp, "immediate", immediate, "num_chunks", len(chunks), "labels", lbs)

ctx := user.InjectOrgID(context.Background(), userID)
ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
defer cancel()
ctx = user.InjectOrgID(ctx, userID)
ctx, cancelFunc := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
defer cancelFunc()
err := i.flushChunks(ctx, fp, labels, chunks, chunkMtx)
if err != nil {
return fmt.Errorf("failed to flush chunks: %w, num_chunks: %d, labels: %s", err, len(chunks), lbs)
Expand Down
66 changes: 66 additions & 0 deletions pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ingester

import (
"errors"
"fmt"
"os"
"sort"
Expand Down Expand Up @@ -102,6 +103,67 @@ func Benchmark_FlushLoop(b *testing.B) {
}
}

func Test_FlushOp(t *testing.T) {
t.Run("no error", func(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.FlushOpBackoff.MinBackoff = time.Second
cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
cfg.FlushOpBackoff.MaxRetries = 1
cfg.FlushCheckPeriod = 100 * time.Millisecond

_, ing := newTestStore(t, cfg, nil)

ctx := user.InjectOrgID(context.Background(), "foo")
ins, err := ing.GetOrCreateInstance("foo")
require.NoError(t, err)

lbs := makeRandomLabels()
req := &logproto.PushRequest{Streams: []logproto.Stream{{
Labels: lbs.String(),
Entries: entries(5, time.Now()),
}}}
require.NoError(t, ins.Push(ctx, req))

time.Sleep(cfg.FlushCheckPeriod)
require.NoError(t, ing.flushOp(gokitlog.NewNopLogger(), &flushOp{
immediate: true,
userID: "foo",
fp: ins.getHashForLabels(lbs),
}))
})

t.Run("max retries exceeded", func(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.FlushOpBackoff.MinBackoff = time.Second
cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
cfg.FlushOpBackoff.MaxRetries = 1
cfg.FlushCheckPeriod = 100 * time.Millisecond

store, ing := newTestStore(t, cfg, nil)
store.onPut = func(_ context.Context, _ []chunk.Chunk) error {
return errors.New("failed to write chunks")
}

ctx := user.InjectOrgID(context.Background(), "foo")
ins, err := ing.GetOrCreateInstance("foo")
require.NoError(t, err)

lbs := makeRandomLabels()
req := &logproto.PushRequest{Streams: []logproto.Stream{{
Labels: lbs.String(),
Entries: entries(5, time.Now()),
}}}
require.NoError(t, ins.Push(ctx, req))

time.Sleep(cfg.FlushCheckPeriod)
require.EqualError(t, ing.flushOp(gokitlog.NewNopLogger(), &flushOp{
immediate: true,
userID: "foo",
fp: ins.getHashForLabels(lbs),
}), "terminated after 1 retries")
})
}

func Test_Flush(t *testing.T) {
var (
store, ing = newTestStore(t, defaultIngesterTestConfig(t), nil)
Expand Down Expand Up @@ -297,6 +359,10 @@ func defaultIngesterTestConfig(t testing.TB) Config {

cfg := Config{}
flagext.DefaultValues(&cfg)
cfg.FlushOpBackoff.MinBackoff = 100 * time.Millisecond
cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
cfg.FlushOpBackoff.MaxRetries = 1
cfg.FlushOpTimeout = 15 * time.Second
cfg.FlushCheckPeriod = 99999 * time.Hour
cfg.MaxChunkIdle = 99999 * time.Hour
cfg.ConcurrentFlushes = 1
Expand Down
16 changes: 15 additions & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/modules"
"github.com/grafana/dskit/multierror"
Expand Down Expand Up @@ -84,6 +85,7 @@ type Config struct {

ConcurrentFlushes int `yaml:"concurrent_flushes"`
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
FlushOpBackoff backoff.Config `yaml:"flush_op_backoff"`
FlushOpTimeout time.Duration `yaml:"flush_op_timeout"`
RetainPeriod time.Duration `yaml:"chunk_retain_period"`
MaxChunkIdle time.Duration `yaml:"chunk_idle_period"`
Expand Down Expand Up @@ -129,7 +131,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.")
f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")
f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout before a flush is cancelled.")
f.DurationVar(&cfg.FlushOpBackoff.MinBackoff, "ingester.flush-op-backoff-min-period", 10*time.Second, "Minimum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
f.DurationVar(&cfg.FlushOpBackoff.MaxBackoff, "ingester.flush-op-backoff-max-period", time.Minute, "Maximum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
f.IntVar(&cfg.FlushOpBackoff.MaxRetries, "ingester.flush-op-backoff-retries", 10, "Maximum retries for failed flushes.")
f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout for an individual flush. Will be retried up to `flush-op-backoff-retries` times.")
f.DurationVar(&cfg.RetainPeriod, "ingester.chunks-retain-period", 0, "How long chunks should be retained in-memory after they've been flushed.")
f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.")
f.IntVar(&cfg.BlockSize, "ingester.chunks-block-size", 256*1024, "The targeted _uncompressed_ size in bytes of a chunk block When this threshold is exceeded the head block will be cut and compressed inside the chunk.")
Expand Down Expand Up @@ -157,6 +162,15 @@ func (cfg *Config) Validate() error {
return err
}

if cfg.FlushOpBackoff.MinBackoff > cfg.FlushOpBackoff.MaxBackoff {
return errors.New("invalid flush op min backoff: cannot be larger than max backoff")
}
if cfg.FlushOpBackoff.MaxRetries <= 0 {
return fmt.Errorf("invalid flush op max retries: %d", cfg.FlushOpBackoff.MaxRetries)
}
if cfg.FlushOpTimeout <= 0 {
return fmt.Errorf("invalid flush op timeout: %s", cfg.FlushOpTimeout)
}
if cfg.IndexShards <= 0 {
return fmt.Errorf("invalid ingester index shard factor: %d", cfg.IndexShards)
}
Expand Down
101 changes: 82 additions & 19 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/middleware"
Expand Down Expand Up @@ -676,57 +677,119 @@ func TestIngester_asyncStoreMaxLookBack(t *testing.T) {

func TestValidate(t *testing.T) {
for i, tc := range []struct {
in Config
err bool
expected Config
in Config
expected Config
expectedErr string
}{
{
in: Config{
MaxChunkAge: time.Minute,
ChunkEncoding: chunkenc.EncGZIP.String(),
IndexShards: index.DefaultIndexShards,
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
MaxRetries: 1,
},
FlushOpTimeout: 15 * time.Second,
IndexShards: index.DefaultIndexShards,
MaxChunkAge: time.Minute,
},
expected: Config{
ChunkEncoding: chunkenc.EncGZIP.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
MaxRetries: 1,
},
FlushOpTimeout: 15 * time.Second,
IndexShards: index.DefaultIndexShards,
MaxChunkAge: time.Minute,
ChunkEncoding: chunkenc.EncGZIP.String(),
parsedEncoding: chunkenc.EncGZIP,
IndexShards: index.DefaultIndexShards,
},
},
{
in: Config{
ChunkEncoding: chunkenc.EncSnappy.String(),
IndexShards: index.DefaultIndexShards,
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
MaxRetries: 1,
},
FlushOpTimeout: 15 * time.Second,
IndexShards: index.DefaultIndexShards,
},
expected: Config{
ChunkEncoding: chunkenc.EncSnappy.String(),
parsedEncoding: chunkenc.EncSnappy,
ChunkEncoding: chunkenc.EncSnappy.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
MaxRetries: 1,
},
FlushOpTimeout: 15 * time.Second,
IndexShards: index.DefaultIndexShards,
parsedEncoding: chunkenc.EncSnappy,
},
},
{
in: Config{
IndexShards: index.DefaultIndexShards,
ChunkEncoding: "bad-enc",
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
MaxRetries: 1,
},
FlushOpTimeout: 15 * time.Second,
IndexShards: index.DefaultIndexShards,
},
expectedErr: "invalid encoding: bad-enc, supported: none, gzip, lz4-64k, snappy, lz4-256k, lz4-1M, lz4, flate, zstd",
},
{
in: Config{
ChunkEncoding: chunkenc.EncGZIP.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
},
FlushOpTimeout: 15 * time.Second,
IndexShards: index.DefaultIndexShards,
MaxChunkAge: time.Minute,
},
expectedErr: "invalid flush op max retries: 0",
},
{
in: Config{
ChunkEncoding: chunkenc.EncGZIP.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
MaxRetries: 1,
},
IndexShards: index.DefaultIndexShards,
MaxChunkAge: time.Minute,
},
err: true,
expectedErr: "invalid flush op timeout: 0s",
},
{
in: Config{
MaxChunkAge: time.Minute,
ChunkEncoding: chunkenc.EncGZIP.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
MaxRetries: 1,
},
FlushOpTimeout: 15 * time.Second,
MaxChunkAge: time.Minute,
},
err: true,
expectedErr: "invalid ingester index shard factor: 0",
},
} {
t.Run(fmt.Sprint(i), func(t *testing.T) {
err := tc.in.Validate()
if tc.err {
require.NotNil(t, err)
return
if tc.expectedErr != "" {
require.EqualError(t, err, tc.expectedErr)
} else {
require.NoError(t, err)
require.Equal(t, tc.expected, tc.in)
}
require.Nil(t, err)
require.Equal(t, tc.expected, tc.in)
})
}
}
Expand Down
Loading

0 comments on commit 9767807

Please sign in to comment.