diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md
index d50669016356c..bdbcdc4606bf2 100644
--- a/docs/sources/shared/configuration.md
+++ b/docs/sources/shared/configuration.md
@@ -1900,6 +1900,19 @@ The `compactor` block configures the compactor component, which compacts index s
 # CLI flag: -compactor.retention-table-timeout
 [retention_table_timeout: <duration> | default = 0s]
 
+retention_backoff_config:
+  # Minimum delay when backing off.
+  # CLI flag: -compactor.retention-backoff-config.backoff-min-period
+  [min_period: <duration> | default = 100ms]
+
+  # Maximum delay when backing off.
+  # CLI flag: -compactor.retention-backoff-config.backoff-max-period
+  [max_period: <duration> | default = 10s]
+
+  # Number of times to backoff and retry before failing.
+  # CLI flag: -compactor.retention-backoff-config.backoff-retries
+  [max_retries: <int> | default = 10]
+
 # Store used for managing delete requests.
 # CLI flag: -compactor.delete-request-store
 [delete_request_store: <string> | default = ""]
diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go
index 88637ca8b4f6e..908152a3edbbe 100644
--- a/pkg/compactor/compactor.go
+++ b/pkg/compactor/compactor.go
@@ -12,13 +12,15 @@ import (
 	"time"
 
 	"github.com/go-kit/log/level"
-	"github.com/grafana/dskit/kv"
-	"github.com/grafana/dskit/ring"
-	"github.com/grafana/dskit/services"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 
+	"github.com/grafana/dskit/backoff"
+	"github.com/grafana/dskit/kv"
+	"github.com/grafana/dskit/ring"
+	"github.com/grafana/dskit/services"
+
 	"github.com/grafana/loki/v3/pkg/analytics"
 	"github.com/grafana/loki/v3/pkg/compactor/deletion"
 	"github.com/grafana/loki/v3/pkg/compactor/retention"
@@ -77,6 +79,7 @@ type Config struct {
 	RetentionDeleteDelay        time.Duration  `yaml:"retention_delete_delay"`
 	RetentionDeleteWorkCount    int            `yaml:"retention_delete_worker_count"`
 	RetentionTableTimeout       time.Duration  `yaml:"retention_table_timeout"`
+	RetentionBackoffConfig      backoff.Config `yaml:"retention_backoff_config"`
 	DeleteRequestStore          string         `yaml:"delete_request_store"`
 	DeleteRequestStoreKeyPrefix string         `yaml:"delete_request_store_key_prefix"`
 	DeleteBatchSize             int            `yaml:"delete_batch_size"`
@@ -110,6 +113,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&cfg.TablesToCompact, "compactor.tables-to-compact", 0, "Number of tables that compactor will try to compact. Newer tables are chosen when this is less than the number of tables available.")
 	f.IntVar(&cfg.SkipLatestNTables, "compactor.skip-latest-n-tables", 0, "Do not compact N latest tables. 
Together with -compactor.run-once and -compactor.tables-to-compact, this is useful when clearing compactor backlogs.") + cfg.RetentionBackoffConfig.RegisterFlagsWithPrefix("compactor.retention-backoff-config", f) // Ring skipFlags := []string{ "compactor.ring.num-tokens", @@ -323,7 +327,7 @@ func (c *Compactor) init(objectStoreClients map[config.DayTime]client.ObjectClie } chunkClient := client.NewClient(objectClient, encoder, schemaConfig) - sc.sweeper, err = retention.NewSweeper(retentionWorkDir, chunkClient, c.cfg.RetentionDeleteWorkCount, c.cfg.RetentionDeleteDelay, r) + sc.sweeper, err = retention.NewSweeper(retentionWorkDir, chunkClient, c.cfg.RetentionDeleteWorkCount, c.cfg.RetentionDeleteDelay, c.cfg.RetentionBackoffConfig, r) if err != nil { return fmt.Errorf("failed to init sweeper: %w", err) } diff --git a/pkg/compactor/retention/retention.go b/pkg/compactor/retention/retention.go index 0a4aba59be474..96eafcc2a7d58 100644 --- a/pkg/compactor/retention/retention.go +++ b/pkg/compactor/retention/retention.go @@ -11,6 +11,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/dskit/backoff" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -272,9 +273,17 @@ type Sweeper struct { markerProcessor MarkerProcessor chunkClient ChunkClient sweeperMetrics *sweeperMetrics + backoffConfig backoff.Config } -func NewSweeper(workingDir string, deleteClient ChunkClient, deleteWorkerCount int, minAgeDelete time.Duration, r prometheus.Registerer) (*Sweeper, error) { +func NewSweeper( + workingDir string, + deleteClient ChunkClient, + deleteWorkerCount int, + minAgeDelete time.Duration, + backoffConfig backoff.Config, + r prometheus.Registerer, +) (*Sweeper, error) { m := newSweeperMetrics(r) p, err := newMarkerStorageReader(workingDir, deleteWorkerCount, minAgeDelete, m) @@ -285,34 +294,43 @@ func NewSweeper(workingDir string, deleteClient ChunkClient, deleteWorkerCount i markerProcessor: p, chunkClient: deleteClient, sweeperMetrics: m, + backoffConfig: backoffConfig, }, nil } func (s *Sweeper) Start() { - s.markerProcessor.Start(func(ctx context.Context, chunkId []byte) error { - status := statusSuccess - start := time.Now() - defer func() { - s.sweeperMetrics.deleteChunkDurationSeconds.WithLabelValues(status).Observe(time.Since(start).Seconds()) - }() - chunkIDString := unsafeGetString(chunkId) - userID, err := getUserIDFromChunkID(chunkId) - if err != nil { - return err - } + s.markerProcessor.Start(s.deleteChunk) +} +func (s *Sweeper) deleteChunk(ctx context.Context, chunkID []byte) error { + status := statusSuccess + start := time.Now() + defer func() { + s.sweeperMetrics.deleteChunkDurationSeconds.WithLabelValues(status).Observe(time.Since(start).Seconds()) + }() + chunkIDString := unsafeGetString(chunkID) + userID, err := getUserIDFromChunkID(chunkID) + if err != nil { + return err + } + + retry := backoff.New(ctx, s.backoffConfig) + for retry.Ongoing() { err = s.chunkClient.DeleteChunk(ctx, unsafeGetString(userID), chunkIDString) + if err == nil { + return nil + } if s.chunkClient.IsChunkNotFoundErr(err) { status = statusNotFound level.Debug(util_log.Logger).Log("msg", "delete on not found chunk", "chunkID", chunkIDString) return nil } - if err != nil { - level.Error(util_log.Logger).Log("msg", "error deleting chunk", "chunkID", chunkIDString, "err", err) - status = statusFailure - } - return err - }) + retry.Wait() + } + + level.Error(util_log.Logger).Log("msg", 
"error deleting chunk", "chunkID", chunkIDString, "err", err) + status = statusFailure + return err } func getUserIDFromChunkID(chunkID []byte) ([]byte, error) { diff --git a/pkg/compactor/retention/retention_test.go b/pkg/compactor/retention/retention_test.go index 3b124691087ad..32dac3293a097 100644 --- a/pkg/compactor/retention/retention_test.go +++ b/pkg/compactor/retention/retention_test.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "encoding/base64" + "fmt" "os" "path" "path/filepath" @@ -14,6 +15,7 @@ import ( "testing" "time" + "github.com/grafana/dskit/backoff" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -32,14 +34,37 @@ import ( ) type mockChunkClient struct { - mtx sync.Mutex - deletedChunks map[string]struct{} + mtx sync.Mutex + deletedChunks map[string]struct{} + unstableDeletion bool + perObjectCounter map[string]uint32 +} + +// newMockChunkClient creates a client that fails every first call to DeleteChunk if `unstableDeletion` is true. +func newMockChunkClient(unstableDeletion bool) *mockChunkClient { + return &mockChunkClient{ + deletedChunks: map[string]struct{}{}, + unstableDeletion: unstableDeletion, + perObjectCounter: map[string]uint32{}, + } +} + +// shouldFail returns true for every first call +func (m *mockChunkClient) shouldFail(objectKey string) bool { + if !m.unstableDeletion { + return false + } + shouldFail := m.perObjectCounter[objectKey]%2 == 0 + m.perObjectCounter[objectKey]++ + return shouldFail } func (m *mockChunkClient) DeleteChunk(_ context.Context, _, chunkID string) error { m.mtx.Lock() defer m.mtx.Unlock() - + if m.shouldFail(chunkID) { + return fmt.Errorf("chunk deletion for chunkID:%s is failed by mockChunkClient", chunkID) + } m.deletedChunks[string([]byte(chunkID))] = struct{}{} // forces a copy, because this string is only valid within the delete fn. 
 	return nil
 }
 
@@ -144,8 +169,9 @@ func Test_Retention(t *testing.T) {
 			// marks and sweep
 			expiration := NewExpirationChecker(tt.limits)
 			workDir := filepath.Join(t.TempDir(), "retention")
-			chunkClient := &mockChunkClient{deletedChunks: map[string]struct{}{}}
-			sweep, err := NewSweeper(workDir, chunkClient, 10, 0, nil)
+			// the mock fails the first deletion attempt for each chunk; the sweeper must retry it rather than fail
+			chunkClient := newMockChunkClient(true)
+			sweep, err := NewSweeper(workDir, chunkClient, 10, 0, backoff.Config{MaxRetries: 2}, nil)
 			require.NoError(t, err)
 			sweep.Start()
 			defer sweep.Stop()
@@ -176,6 +202,37 @@ func Test_Retention(t *testing.T) {
 	}
 }
 
+func Test_Sweeper_deleteChunk(t *testing.T) {
+	chunkID := "1/3fff2c2d7595e046:1916fa8c4bd:1916fdfb33d:bd55fc5"
+	tests := map[string]struct {
+		maxRetries    int
+		expectedError error
+	}{
+		"expected error if chunk is not deleted and retry is disabled": {
+			maxRetries:    1,
+			expectedError: fmt.Errorf("chunk deletion for chunkID:%s failed by mockChunkClient", chunkID),
+		},
+		"expected no error if chunk is not deleted at the first attempt but retried": {
+			maxRetries: 2,
+		},
+	}
+	for name, data := range tests {
+		t.Run(name, func(t *testing.T) {
+			workDir := filepath.Join(t.TempDir(), "retention")
+			chunkClient := newMockChunkClient(true)
+			sweep, err := NewSweeper(workDir, chunkClient, 10, 0, backoff.Config{MaxRetries: data.maxRetries}, nil)
+			require.NoError(t, err)
+
+			err = sweep.deleteChunk(context.Background(), []byte(chunkID))
+			if data.expectedError != nil {
+				require.Equal(t, data.expectedError, err)
+			} else {
+				require.NoError(t, err)
+			}
+		})
+	}
+}
+
 type noopWriter struct {
 	count int64
 }
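
For reference, the retry behaviour this change introduces in Sweeper.deleteChunk is driven by dskit's backoff package. Below is a minimal, self-contained sketch of that pattern, not Loki code: flakyDelete is a hypothetical stand-in for ChunkClient.DeleteChunk, and the backoff values simply mirror the documented retention_backoff_config defaults.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/grafana/dskit/backoff"
)

// flakyDelete returns a hypothetical delete operation that fails on its first
// call and succeeds afterwards, standing in for ChunkClient.DeleteChunk.
func flakyDelete() func() error {
	calls := 0
	return func() error {
		calls++
		if calls == 1 {
			return errors.New("transient object store error")
		}
		return nil
	}
}

func main() {
	// These values mirror the documented retention_backoff_config defaults.
	cfg := backoff.Config{
		MinBackoff: 100 * time.Millisecond,
		MaxBackoff: 10 * time.Second,
		MaxRetries: 10,
	}

	del := flakyDelete()

	var err error
	retry := backoff.New(context.Background(), cfg)
	for retry.Ongoing() { // false once MaxRetries attempts were made or the context is done
		if err = del(); err == nil {
			fmt.Printf("deleted after %d attempt(s)\n", retry.NumRetries()+1)
			return
		}
		retry.Wait() // sleep with exponential backoff between MinBackoff and MaxBackoff
	}
	fmt.Println("giving up:", err)
}

Because Ongoing() also checks the context, cancelling the sweeper's context stops the retry loop early, which is why deleteChunk threads its ctx into backoff.New.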