Skip to content

Commit

Permalink
compactor: do not block compaction when retention is taking too long (#…
Browse files Browse the repository at this point in the history
…9884)

**What this PR does / why we need it**:
Currently, we perform compaction and apply retention in the same loop.
Although we have a flag for not applying retention every time we perform
compaction, we still see compaction getting blocked when processing some
intensive delete requests (processed while applying retention).

This PR separates compaction and retention so that each runs in its own
loop. I have added a table-locking feature to prevent compaction and
retention from processing the same table at the same time. However, compaction
and retention treat locked tables differently, as explained below:
* When compaction sees a table is locked: It would skip the table and
move on to the following table. However, before skipping, it would check
if the table has any uncompacted files and increment the newly added
counter called `loki_compactor_skipped_compacting_locked_tables_total`
to track how often we are skipping compacting tables which have
uncompacted files.
* When retention sees a table is locked: It would wait for the lock to
be released since we can't skip any tables while processing delete
requests.

**Special notes for your reviewer**:
* The check for tables with uncompacted files looks for count > 1
because initially, we did not support per tenant index in
`boltdb-shipper`, so a table can have a single compacted multi-tenant
index file. In a rare case where we have a single file which was
supposed to be compacted away, it is okay to have a single uncompacted
file for a while. The aim here is to not block compaction for too long
in a large cell with too many uncompacted files.
* Retention only works on the compacted index, so we first compact down
any uncompacted files while applying retention.

**Checklist**
- [x] Tests updated

---------

Co-authored-by: Ashwanth <[email protected]>
  • Loading branch information
sandeepsukhani and ashwanthgoli authored Nov 20, 2023
1 parent 58eaad9 commit c716e49
Show file tree
Hide file tree
Showing 5 changed files with 301 additions and 39 deletions.
104 changes: 77 additions & 27 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ func (cfg *Config) Validate() error {
return fmt.Errorf("compactor.delete-request-store should be configured when retention is enabled")
}

if cfg.ApplyRetentionInterval != 0 && cfg.ApplyRetentionInterval%cfg.CompactionInterval != 0 {
return fmt.Errorf("interval for applying retention should either be set to a 0 or a multiple of compaction interval")
if cfg.ApplyRetentionInterval == 0 {
cfg.ApplyRetentionInterval = cfg.CompactionInterval
}

if err := config.ValidatePathPrefix(cfg.DeleteRequestStoreKeyPrefix); err != nil {
Expand Down Expand Up @@ -153,6 +153,7 @@ type Compactor struct {
wg sync.WaitGroup
indexCompactors map[string]IndexCompactor
schemaConfig config.SchemaConfig
tableLocker *tableLocker

// Ring used for running a single compactor
ringLifecycler *ring.BasicLifecycler
Expand Down Expand Up @@ -193,6 +194,7 @@ func NewCompactor(cfg Config, objectStoreClients map[config.DayTime]client.Objec
ringPollPeriod: 5 * time.Second,
indexCompactors: map[string]IndexCompactor{},
schemaConfig: schemaConfig,
tableLocker: newTableLocker(),
}

ringStore, err := kv.NewClient(
Expand Down Expand Up @@ -503,41 +505,52 @@ func (c *Compactor) runCompactions(ctx context.Context) {
}
}()

lastRetentionRunAt := time.Unix(0, 0)
runCompaction := func() {
applyRetention := false
if c.cfg.RetentionEnabled && time.Since(lastRetentionRunAt) >= c.cfg.ApplyRetentionInterval {
level.Info(util_log.Logger).Log("msg", "applying retention with compaction")
applyRetention = true
}
// do the initial compaction
if err := c.RunCompaction(ctx, false); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to run compaction", err)
}

err := c.RunCompaction(ctx, applyRetention)
if err != nil {
level.Error(util_log.Logger).Log("msg", "failed to run compaction", "err", err)
}
c.wg.Add(1)
go func() {
defer c.wg.Done()

if applyRetention {
lastRetentionRunAt = time.Now()
ticker := time.NewTicker(c.cfg.CompactionInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if err := c.RunCompaction(ctx, false); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to run compaction", err)
}
case <-ctx.Done():
return
}
}
}
}()

c.wg.Add(1)
go func() {
defer c.wg.Done()
runCompaction()
if err := c.RunCompaction(ctx, true); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to apply retention", err)
}

ticker := time.NewTicker(c.cfg.CompactionInterval)
ticker := time.NewTicker(c.cfg.ApplyRetentionInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
runCompaction()
if err := c.RunCompaction(ctx, true); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to apply retention", err)
}
case <-ctx.Done():
return
}
}
}()

if c.cfg.RetentionEnabled {
for _, container := range c.storeContainers {
c.wg.Add(1)
Expand Down Expand Up @@ -576,6 +589,37 @@ func (c *Compactor) CompactTable(ctx context.Context, tableName string, applyRet
return fmt.Errorf("index store client not found for period starting at %s", schemaCfg.From.String())
}

for {
locked, lockWaiterChan := c.tableLocker.lockTable(tableName)
if locked {
break
}
// do not wait for lock to be released if we are only compacting the table since
// compaction should happen more frequently than retention and retention anyway compacts un-compacted files as well.
if !applyRetention {
hasUncompactedIndex, err := tableHasUncompactedIndex(ctx, tableName, sc.indexStorageClient)
if err != nil {
level.Error(util_log.Logger).Log("msg", "failed to check if table has uncompacted index", "table_name", tableName)
hasUncompactedIndex = true
}

if hasUncompactedIndex {
c.metrics.skippedCompactingLockedTables.Inc()
level.Warn(util_log.Logger).Log("msg", "skipped compacting table which likely has uncompacted index since it is locked by retention", "table_name", tableName)
}
return nil
}

// we are applying retention and processing delete requests so,
// wait for lock to be released since we can't mark delete requests as processed without checking all the tables
select {
case <-lockWaiterChan:
case <-ctx.Done():
return nil
}
}
defer c.tableLocker.unlockTable(tableName)

table, err := newTable(ctx, filepath.Join(c.cfg.WorkingDirectory, tableName), sc.indexStorageClient, indexCompactor,
schemaCfg, sc.tableMarker, c.expirationChecker, c.cfg.UploadParallelism)
if err != nil {
Expand All @@ -601,7 +645,7 @@ func (c *Compactor) RegisterIndexCompactor(indexType string, indexCompactor Inde
c.indexCompactors[indexType] = indexCompactor
}

func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) error {
func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) (err error) {
status := statusSuccess
start := time.Now()

Expand All @@ -610,11 +654,15 @@ func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) erro
}

defer func() {
c.metrics.compactTablesOperationTotal.WithLabelValues(status).Inc()
if err != nil {
status = statusFailure
}
withRetentionLabelValue := fmt.Sprintf("%v", applyRetention)
c.metrics.compactTablesOperationTotal.WithLabelValues(status, withRetentionLabelValue).Inc()
runtime := time.Since(start)
if status == statusSuccess {
c.metrics.compactTablesOperationDurationSeconds.Set(runtime.Seconds())
c.metrics.compactTablesOperationLastSuccess.SetToCurrentTime()
c.metrics.compactTablesOperationDurationSeconds.WithLabelValues(withRetentionLabelValue).Set(runtime.Seconds())
c.metrics.compactTablesOperationLastSuccess.WithLabelValues(withRetentionLabelValue).SetToCurrentTime()
if applyRetention {
c.metrics.applyRetentionLastSuccess.SetToCurrentTime()
}
Expand All @@ -627,7 +675,7 @@ func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) erro
c.expirationChecker.MarkPhaseFailed()
}
}
if runtime > c.cfg.CompactionInterval {
if !applyRetention && runtime > c.cfg.CompactionInterval {
level.Warn(util_log.Logger).Log("msg", fmt.Sprintf("last compaction took %s which is longer than the compaction interval of %s, this can lead to duplicate compactors running if not running a standalone compactor instance.", runtime, c.cfg.CompactionInterval))
}
}()
Expand All @@ -644,7 +692,6 @@ func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) erro
sc.indexStorageClient.RefreshIndexTableNamesCache(ctx)
tbls, err := sc.indexStorageClient.ListTables(ctx)
if err != nil {
status = statusFailure
return fmt.Errorf("failed to list tables: %w", err)
}

Expand Down Expand Up @@ -721,12 +768,15 @@ func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) erro
for i := 0; i < c.cfg.MaxCompactionParallelism; i++ {
err := <-errChan
if err != nil && firstErr == nil {
status = statusFailure
firstErr = err
}
}

return firstErr
if firstErr != nil {
return firstErr
}

return ctx.Err()
}

type expirationChecker struct {
Expand Down
150 changes: 147 additions & 3 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"time"

"github.com/grafana/dskit/flagext"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

Expand All @@ -18,6 +20,7 @@ import (
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/util/constants"
loki_net "github.com/grafana/loki/pkg/util/net"
"github.com/grafana/loki/pkg/validation"
)

const indexTablePrefix = "table_"
Expand All @@ -41,7 +44,8 @@ func setupTestCompactor(t *testing.T, objectClients map[config.DayTime]client.Ob
cfg := Config{}
flagext.DefaultValues(&cfg)
cfg.WorkingDirectory = filepath.Join(tempDir, workingDirName)
cfg.RetentionEnabled = false
cfg.RetentionEnabled = true
cfg.DeleteRequestStore = periodConfigs[len(periodConfigs)-1].ObjectType
cfg.CompactorRing.InstanceAddr = localhost

if loopbackIFace, err := loki_net.LoopbackInterfaceName(); err == nil {
Expand All @@ -50,9 +54,16 @@ func setupTestCompactor(t *testing.T, objectClients map[config.DayTime]client.Ob

require.NoError(t, cfg.Validate())

c, err := NewCompactor(cfg, objectClients, nil, config.SchemaConfig{
defaultLimits := validation.Limits{}
flagext.DefaultValues(&defaultLimits)
require.NoError(t, defaultLimits.RetentionPeriod.Set("30d"))

overrides, err := validation.NewOverrides(defaultLimits, nil)
require.NoError(t, err)

c, err := NewCompactor(cfg, objectClients, objectClients[periodConfigs[len(periodConfigs)-1].From], config.SchemaConfig{
Configs: periodConfigs,
}, nil, nil, constants.Loki)
}, overrides, prometheus.NewPedanticRegistry(), constants.Loki)
require.NoError(t, err)

c.RegisterIndexCompactor("dummy", testIndexCompactor{})
Expand Down Expand Up @@ -292,3 +303,136 @@ func Test_tableSort(t *testing.T) {
sortTablesByRange(intervals)
require.Equal(t, []string{"index_19195", "index_19192", "index_19191"}, intervals)
}

// TestCompactor_TableLocking exercises RunCompaction against tables that may
// already be locked through the compactor's tableLocker.
//
// Every case runs RunCompaction twice on the same compactor: the configured
// table (if any) is locked only for the first run, the second run always sees
// all tables unlocked. The assertions pin that:
//   - without retention, a locked table is skipped (its files stay untouched
//     and skippedCompactingLockedTables is incremented) while the run succeeds;
//   - with retention, the run waits on the locked table until the context
//     deadline expires and is counted as a failed run;
//   - once the lock is gone, every table ends up with a single compacted file.
func TestCompactor_TableLocking(t *testing.T) {
	// number of uncompacted files seeded into each table before compaction
	const uncompactedFilesPerTable = 5

	commonDBsConfig := IndexesConfig{NumUnCompactedFiles: uncompactedFilesPerTable}
	perUserDBsConfig := PerUserIndexesConfig{}

	// table numbers are days since the Unix epoch: today plus the 5 preceding days
	secondsPerDay := int64(24 * time.Hour / time.Second)
	lastTableNum := time.Now().Unix() / secondsPerDay
	firstTableNum := lastTableNum - 5

	tableNameFor := func(tableNum int64) string {
		return fmt.Sprintf("%s%d", indexTablePrefix, tableNum)
	}

	// newCompactorWithTables seeds all tables under baseDir/index and returns a
	// test compactor backed by a filesystem object client rooted at baseDir.
	newCompactorWithTables := func(baseDir string) *Compactor {
		periodConfigs := []config.PeriodConfig{
			{
				From:       config.DayTime{Time: model.Time(0)},
				IndexType:  "dummy",
				ObjectType: "fs_01",
				IndexTables: config.IndexPeriodicTableConfig{
					PathPrefix: "index/",
					PeriodicTableConfig: config.PeriodicTableConfig{
						Prefix: indexTablePrefix,
						Period: config.ObjectStorageIndexRequiredPeriod,
					},
				},
			},
		}

		indexDir := filepath.Join(baseDir, "index")
		for tableNum := firstTableNum; tableNum <= lastTableNum; tableNum++ {
			SetupTable(t, filepath.Join(indexDir, tableNameFor(tableNum)), commonDBsConfig, perUserDBsConfig)
		}

		fsClient, err := local.NewFSObjectClient(local.FSConfig{Directory: baseDir})
		require.NoError(t, err)

		objectClients := map[config.DayTime]client.ObjectClient{
			periodConfigs[0].From: fsClient,
		}

		return setupTestCompactor(t, objectClients, periodConfigs, baseDir)
	}

	testCases := []struct {
		name           string
		lockTable      string
		applyRetention bool

		compactionShouldTimeout bool
	}{
		{
			name: "no table locked - not applying retention",
		},
		{
			name:           "no table locked - applying retention",
			applyRetention: true,
		},
		{
			name:      "first table locked - not applying retention",
			lockTable: tableNameFor(lastTableNum),
		},
		{
			name:                    "first table locked - applying retention",
			lockTable:               tableNameFor(lastTableNum),
			applyRetention:          true,
			compactionShouldTimeout: true,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			baseDir := t.TempDir()
			indexDir := filepath.Join(baseDir, "index")
			compactor := newCompactorWithTables(baseDir)

			// reads the current value of the compaction-runs counter for the given labels
			opsTotal := func(status, withRetention string) float64 {
				return testutil.ToFloat64(compactor.metrics.compactTablesOperationTotal.WithLabelValues(status, withRetention))
			}
			sameModeLabel := fmt.Sprintf("%v", tc.applyRetention)
			otherModeLabel := fmt.Sprintf("%v", !tc.applyRetention)

			// two passes over the same compactor; only the first one may hold a table lock
			for run := 1; run <= 2; run++ {
				t.Run(fmt.Sprintf("%d", run), func(t *testing.T) {
					firstRun := run == 1

					if firstRun && tc.lockTable != "" {
						locked, _ := compactor.tableLocker.lockTable(tc.lockTable)
						require.True(t, locked)

						defer compactor.tableLocker.unlockTable(tc.lockTable)
					}

					// bound the run so that retention cannot wait forever for the table lock
					ctx, cancel := context.WithTimeout(context.Background(), time.Second)
					defer cancel()

					err := compactor.RunCompaction(ctx, tc.applyRetention)

					// a timeout is only expected while the table is locked, i.e. on the first run
					if firstRun && tc.compactionShouldTimeout {
						require.ErrorIs(t, err, context.DeadlineExceeded)
						require.Equal(t, float64(1), opsTotal(statusFailure, "true"))
						require.Equal(t, float64(0), opsTotal(statusFailure, "false"))
						return
					}
					require.NoError(t, err)

					// every run so far succeeded, unless the first one was expected to
					// time out, in which case this is the first successful run
					wantSuccesses := float64(run)
					if run > 1 && tc.compactionShouldTimeout {
						wantSuccesses = float64(1)
					}
					require.Equal(t, wantSuccesses, opsTotal(statusSuccess, sameModeLabel))
					require.Equal(t, float64(0), opsTotal(statusSuccess, otherModeLabel))

					// only a compaction-only run skips a locked table; retention waits for it
					if tc.lockTable != "" {
						wantSkipped := float64(1)
						if tc.applyRetention {
							wantSkipped = float64(0)
						}
						require.Equal(t, wantSkipped, testutil.ToFloat64(compactor.metrics.skippedCompactingLockedTables))
					}

					for tableNum := firstTableNum; tableNum <= lastTableNum; tableNum++ {
						name := tableNameFor(tableNum)
						files, err := os.ReadDir(filepath.Join(indexDir, name))
						require.NoError(t, err)

						// the locked table must be left untouched by the first run
						if firstRun && name == tc.lockTable {
							require.Len(t, files, uncompactedFilesPerTable)
							continue
						}

						require.Len(t, files, 1)
						require.True(t, strings.HasSuffix(files[0].Name(), ".gz"))

						verifyCompactedIndexTable(t, commonDBsConfig, perUserDBsConfig, filepath.Join(indexDir, name))
					}
				})
			}
		})
	}
}
Loading

0 comments on commit c716e49

Please sign in to comment.