Skip to content

Commit

Permalink
feat: add backoff mechanism to the retention process (#14182)
Browse files Browse the repository at this point in the history
Signed-off-by: Vladyslav Diachenko <[email protected]>
  • Loading branch information
vlad-diachenko authored Sep 19, 2024
1 parent 9637790 commit 3136880
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 27 deletions.
13 changes: 13 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1900,6 +1900,19 @@ The `compactor` block configures the compactor component, which compacts index s
# CLI flag: -compactor.retention-table-timeout
[retention_table_timeout: <duration> | default = 0s]
retention_backoff_config:
# Minimum delay when backing off.
# CLI flag: -compactor.retention-backoff-config.backoff-min-period
[min_period: <duration> | default = 100ms]
# Maximum delay when backing off.
# CLI flag: -compactor.retention-backoff-config.backoff-max-period
[max_period: <duration> | default = 10s]
# Number of times to backoff and retry before failing.
# CLI flag: -compactor.retention-backoff-config.backoff-retries
[max_retries: <int> | default = 10]
# Store used for managing delete requests.
# CLI flag: -compactor.delete-request-store
[delete_request_store: <string> | default = ""]
Expand Down
12 changes: 8 additions & 4 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import (
"time"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"

"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"

"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/compactor/deletion"
"github.com/grafana/loki/v3/pkg/compactor/retention"
Expand Down Expand Up @@ -77,6 +79,7 @@ type Config struct {
RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"`
RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"`
RetentionTableTimeout time.Duration `yaml:"retention_table_timeout"`
RetentionBackoffConfig backoff.Config `yaml:"retention_backoff_config"`
DeleteRequestStore string `yaml:"delete_request_store"`
DeleteRequestStoreKeyPrefix string `yaml:"delete_request_store_key_prefix"`
DeleteBatchSize int `yaml:"delete_batch_size"`
Expand Down Expand Up @@ -110,6 +113,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.TablesToCompact, "compactor.tables-to-compact", 0, "Number of tables that compactor will try to compact. Newer tables are chosen when this is less than the number of tables available.")
f.IntVar(&cfg.SkipLatestNTables, "compactor.skip-latest-n-tables", 0, "Do not compact N latest tables. Together with -compactor.run-once and -compactor.tables-to-compact, this is useful when clearing compactor backlogs.")

cfg.RetentionBackoffConfig.RegisterFlagsWithPrefix("compactor.retention-backoff-config", f)
// Ring
skipFlags := []string{
"compactor.ring.num-tokens",
Expand Down Expand Up @@ -323,7 +327,7 @@ func (c *Compactor) init(objectStoreClients map[config.DayTime]client.ObjectClie
}
chunkClient := client.NewClient(objectClient, encoder, schemaConfig)

sc.sweeper, err = retention.NewSweeper(retentionWorkDir, chunkClient, c.cfg.RetentionDeleteWorkCount, c.cfg.RetentionDeleteDelay, r)
sc.sweeper, err = retention.NewSweeper(retentionWorkDir, chunkClient, c.cfg.RetentionDeleteWorkCount, c.cfg.RetentionDeleteDelay, c.cfg.RetentionBackoffConfig, r)
if err != nil {
return fmt.Errorf("failed to init sweeper: %w", err)
}
Expand Down
54 changes: 36 additions & 18 deletions pkg/compactor/retention/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -272,9 +273,17 @@ type Sweeper struct {
markerProcessor MarkerProcessor
chunkClient ChunkClient
sweeperMetrics *sweeperMetrics
backoffConfig backoff.Config
}

func NewSweeper(workingDir string, deleteClient ChunkClient, deleteWorkerCount int, minAgeDelete time.Duration, r prometheus.Registerer) (*Sweeper, error) {
func NewSweeper(
workingDir string,
deleteClient ChunkClient,
deleteWorkerCount int,
minAgeDelete time.Duration,
backoffConfig backoff.Config,
r prometheus.Registerer,
) (*Sweeper, error) {
m := newSweeperMetrics(r)

p, err := newMarkerStorageReader(workingDir, deleteWorkerCount, minAgeDelete, m)
Expand All @@ -285,34 +294,43 @@ func NewSweeper(workingDir string, deleteClient ChunkClient, deleteWorkerCount i
markerProcessor: p,
chunkClient: deleteClient,
sweeperMetrics: m,
backoffConfig: backoffConfig,
}, nil
}

func (s *Sweeper) Start() {
s.markerProcessor.Start(func(ctx context.Context, chunkId []byte) error {
status := statusSuccess
start := time.Now()
defer func() {
s.sweeperMetrics.deleteChunkDurationSeconds.WithLabelValues(status).Observe(time.Since(start).Seconds())
}()
chunkIDString := unsafeGetString(chunkId)
userID, err := getUserIDFromChunkID(chunkId)
if err != nil {
return err
}
s.markerProcessor.Start(s.deleteChunk)
}

func (s *Sweeper) deleteChunk(ctx context.Context, chunkID []byte) error {
status := statusSuccess
start := time.Now()
defer func() {
s.sweeperMetrics.deleteChunkDurationSeconds.WithLabelValues(status).Observe(time.Since(start).Seconds())
}()
chunkIDString := unsafeGetString(chunkID)
userID, err := getUserIDFromChunkID(chunkID)
if err != nil {
return err
}

retry := backoff.New(ctx, s.backoffConfig)
for retry.Ongoing() {
err = s.chunkClient.DeleteChunk(ctx, unsafeGetString(userID), chunkIDString)
if err == nil {
return nil
}
if s.chunkClient.IsChunkNotFoundErr(err) {
status = statusNotFound
level.Debug(util_log.Logger).Log("msg", "delete on not found chunk", "chunkID", chunkIDString)
return nil
}
if err != nil {
level.Error(util_log.Logger).Log("msg", "error deleting chunk", "chunkID", chunkIDString, "err", err)
status = statusFailure
}
return err
})
retry.Wait()
}

level.Error(util_log.Logger).Log("msg", "error deleting chunk", "chunkID", chunkIDString, "err", err)
status = statusFailure
return err
}

func getUserIDFromChunkID(chunkID []byte) ([]byte, error) {
Expand Down
68 changes: 63 additions & 5 deletions pkg/compactor/retention/retention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/sha256"
"encoding/base64"
"fmt"
"os"
"path"
"path/filepath"
Expand All @@ -14,6 +15,7 @@ import (
"testing"
"time"

"github.com/grafana/dskit/backoff"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
Expand All @@ -32,14 +34,37 @@ import (
)

type mockChunkClient struct {
mtx sync.Mutex
deletedChunks map[string]struct{}
mtx sync.Mutex
deletedChunks map[string]struct{}
unstableDeletion bool
perObjectCounter map[string]uint32
}

// newMockChunkClient creates a client that fails every first call to DeleteChunk if `unstableDeletion` is true.
func newMockChunkClient(unstableDeletion bool) *mockChunkClient {
return &mockChunkClient{
deletedChunks: map[string]struct{}{},
unstableDeletion: unstableDeletion,
perObjectCounter: map[string]uint32{},
}
}

// shouldFail returns true for every first call
func (m *mockChunkClient) shouldFail(objectKey string) bool {
if !m.unstableDeletion {
return false
}
shouldFail := m.perObjectCounter[objectKey]%2 == 0
m.perObjectCounter[objectKey]++
return shouldFail
}

func (m *mockChunkClient) DeleteChunk(_ context.Context, _, chunkID string) error {
m.mtx.Lock()
defer m.mtx.Unlock()

if m.shouldFail(chunkID) {
return fmt.Errorf("chunk deletion for chunkID:%s is failed by mockChunkClient", chunkID)
}
m.deletedChunks[string([]byte(chunkID))] = struct{}{} // forces a copy, because this string is only valid within the delete fn.
return nil
}
Expand Down Expand Up @@ -144,8 +169,9 @@ func Test_Retention(t *testing.T) {
// marks and sweep
expiration := NewExpirationChecker(tt.limits)
workDir := filepath.Join(t.TempDir(), "retention")
chunkClient := &mockChunkClient{deletedChunks: map[string]struct{}{}}
sweep, err := NewSweeper(workDir, chunkClient, 10, 0, nil)
// must not fail the process because deletion must be retried
chunkClient := newMockChunkClient(true)
sweep, err := NewSweeper(workDir, chunkClient, 10, 0, backoff.Config{MaxRetries: 2}, nil)
require.NoError(t, err)
sweep.Start()
defer sweep.Stop()
Expand Down Expand Up @@ -176,6 +202,38 @@ func Test_Retention(t *testing.T) {
}
}

func Test_Sweeper_deleteChunk(t *testing.T) {
chunkID := "1/3fff2c2d7595e046:1916fa8c4bd:1916fdfb33d:bd55fc5"
tests := map[string]struct {
maxRetries int
expectedError error
}{
"expected error if chunk is not deleted and retry is disabled": {
maxRetries: 1,
expectedError: fmt.Errorf("chunk deletion for chunkID:%s is failed by mockChunkClient", chunkID),
},
"expected no error if chunk is not deleted at the first attempt but retried": {
maxRetries: 2,
},
}
for name, data := range tests {
t.Run(name, func(t *testing.T) {
workDir := filepath.Join(t.TempDir(), "retention")
chunkClient := newMockChunkClient(true)
sweep, err := NewSweeper(workDir, chunkClient, 10, 0, backoff.Config{MaxRetries: data.maxRetries}, nil)
require.NoError(t, err)

err = sweep.deleteChunk(context.Background(), []byte(chunkID))
if data.expectedError != nil {
require.Equal(t, data.expectedError, err)
} else {
require.NoError(t, err)
}
})
}

}

type noopWriter struct {
count int64
}
Expand Down

0 comments on commit 3136880

Please sign in to comment.