Implement partition compaction planner #6469

Merged

Changes from 15 commits
4 changes: 4 additions & 0 deletions docs/blocks-storage/compactor.md
@@ -285,6 +285,10 @@ compactor:
# CLI flag: -compactor.ring.wait-active-instance-timeout
[wait_active_instance_timeout: <duration> | default = 10m]

# How long the shuffle sharding planner should wait before running planning code.
# CLI flag: -compactor.sharding-planner-delay
[sharding_planner_delay: <duration> | default = 10s]

# The compaction strategy to use. Supported values are: default, partitioning.
# CLI flag: -compactor.compaction-strategy
[compaction_strategy: <string> | default = "default"]
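As a quick illustration of how the new option fits together with the partitioning strategy, here is a minimal sketch of a compactor configuration block. The values are arbitrary; only the keys shown (sharding_enabled, compaction_strategy, sharding_planner_delay) come from this change.

```yaml
# Illustrative sketch only: values are arbitrary.
compactor:
  sharding_enabled: true
  # Use the new partitioning compaction strategy.
  compaction_strategy: partitioning
  # Delay before the shuffle sharding planner starts planning, to reduce the
  # chance that two compactors plan the same partition they both just claimed.
  sharding_planner_delay: 30s
```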
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -2330,6 +2330,10 @@ sharding_ring:
# CLI flag: -compactor.ring.wait-active-instance-timeout
[wait_active_instance_timeout: <duration> | default = 10m]

# How long the shuffle sharding planner should wait before running planning code.
# CLI flag: -compactor.sharding-planner-delay
[sharding_planner_delay: <duration> | default = 10s]

# The compaction strategy to use. Supported values are: default, partitioning.
# CLI flag: -compactor.compaction-strategy
[compaction_strategy: <string> | default = "default"]
11 changes: 7 additions & 4 deletions pkg/compactor/compactor.go
@@ -153,7 +153,7 @@ var (
plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, userID string, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, compactorMetrics *compactorMetrics) compact.Planner {

if cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
return NewPartitionCompactionPlanner(ctx, bkt, logger)
return NewPartitionCompactionPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, userID, cfg.ShardingPlannerDelay, cfg.CompactionVisitMarkerTimeout, cfg.CompactionVisitMarkerFileUpdateInterval, compactorMetrics)
} else {
return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.CompactionVisitMarkerTimeout, cfg.CompactionVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed)
}
@@ -234,9 +234,10 @@ type Config struct {
DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`

// Compactors sharding.
ShardingEnabled bool `yaml:"sharding_enabled"`
ShardingStrategy string `yaml:"sharding_strategy"`
ShardingRing RingConfig `yaml:"sharding_ring"`
ShardingEnabled bool `yaml:"sharding_enabled"`
ShardingStrategy string `yaml:"sharding_strategy"`
ShardingRing RingConfig `yaml:"sharding_ring"`
ShardingPlannerDelay time.Duration `yaml:"sharding_planner_delay"`

// Compaction strategy.
CompactionStrategy string `yaml:"compaction_strategy"`
@@ -304,6 +305,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

f.BoolVar(&cfg.AcceptMalformedIndex, "compactor.accept-malformed-index", false, "When enabled, index verification will ignore out of order label names.")
f.BoolVar(&cfg.CachingBucketEnabled, "compactor.caching-bucket-enabled", false, "When enabled, caching bucket will be used for compactor, except cleaner service, which serves as the source of truth for block status")

f.DurationVar(&cfg.ShardingPlannerDelay, "compactor.sharding-planner-delay", 10*time.Second, "How long the shuffle sharding planner should wait before running planning code.")
}

func (cfg *Config) Validate(limits validation.Limits) error {
7 changes: 7 additions & 0 deletions pkg/compactor/compactor_metrics.go
@@ -39,6 +39,7 @@ type compactorMetrics struct {
remainingPlannedCompactions *prometheus.GaugeVec
compactionErrorsCount *prometheus.CounterVec
partitionCount *prometheus.GaugeVec
compactionsNotPlanned *prometheus.CounterVec
}

const (
@@ -174,6 +175,10 @@ func newCompactorMetricsWithLabels(reg prometheus.Registerer, commonLabels []str
Name: "cortex_compactor_group_partition_count",
Help: "Number of partitions for each compaction group.",
}, compactionLabels)
m.compactionsNotPlanned = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_compactor_group_compactions_not_planned",
Help: "Total number of group compaction not planned due to non-critical error (ie. group is currently visited by other compactor).",
Contributor:

This metric is a bit confusing to me. Non-critical error... Is it recommended for users to alarm on this metric?

Contributor Author:

Updated the description and now use this metric for the case where compaction is not planned due to an error.

}, compactionLabels)

return &m
}
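Following up on the review question about alarming on this counter: the change itself does not prescribe any alerting. Purely as an illustration, an operator could watch the new metric with a rule along the following lines; the alert name, threshold, and durations are made up, only the metric name comes from this change, and the available labels depend on how the compactor metrics are configured.

```yaml
# Hypothetical Prometheus alerting rule; not part of this change.
groups:
  - name: cortex-compactor-planning
    rules:
      - alert: CortexCompactorCompactionsNotPlanned
        # Fires when a tenant's compactions keep getting skipped at planning time.
        expr: sum by (user) (increase(cortex_compactor_group_compactions_not_planned[1h])) > 0
        for: 2h
        labels:
          severity: warning
        annotations:
          summary: "Compactor keeps skipping planned compactions for tenant {{ $labels.user }}"
```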
@@ -225,6 +230,7 @@ func (m *compactorMetrics) initMetricWithCompactionLabelValues(labelValue ...str
m.compactionFailures.WithLabelValues(labelValue...)
m.verticalCompactions.WithLabelValues(labelValue...)
m.partitionCount.WithLabelValues(labelValue...)
m.compactionsNotPlanned.WithLabelValues(labelValue...)
}

func (m *compactorMetrics) deleteMetricsForDeletedTenant(userID string) {
@@ -236,4 +242,5 @@ func (m *compactorMetrics) deleteMetricsForDeletedTenant(userID string) {
m.compactionFailures.DeleteLabelValues(userID)
m.verticalCompactions.DeleteLabelValues(userID)
m.partitionCount.DeleteLabelValues(userID)
m.compactionsNotPlanned.DeleteLabelValues(userID)
}
8 changes: 8 additions & 0 deletions pkg/compactor/compactor_metrics_test.go
@@ -135,6 +135,11 @@ func TestSyncerMetrics(t *testing.T) {
cortex_compactor_group_partition_count{user="aaa"} 511060
cortex_compactor_group_partition_count{user="bbb"} 522170
cortex_compactor_group_partition_count{user="ccc"} 533280
# HELP cortex_compactor_group_compactions_not_planned Total number of group compactions not planned due to non-critical errors (i.e. the group is currently being visited by another compactor).
# TYPE cortex_compactor_group_compactions_not_planned counter
cortex_compactor_group_compactions_not_planned{user="aaa"} 544390
cortex_compactor_group_compactions_not_planned{user="bbb"} 555500
cortex_compactor_group_compactions_not_planned{user="ccc"} 566610
`))
require.NoError(t, err)

@@ -191,4 +196,7 @@ func generateTestData(cm *compactorMetrics, base float64) {
cm.partitionCount.WithLabelValues("aaa").Add(46 * base)
cm.partitionCount.WithLabelValues("bbb").Add(47 * base)
cm.partitionCount.WithLabelValues("ccc").Add(48 * base)
cm.compactionsNotPlanned.WithLabelValues("aaa").Add(49 * base)
cm.compactionsNotPlanned.WithLabelValues("bbb").Add(50 * base)
cm.compactionsNotPlanned.WithLabelValues("ccc").Add(51 * base)
}
130 changes: 123 additions & 7 deletions pkg/compactor/partition_compaction_planner.go
@@ -2,30 +2,146 @@ package compactor

import (
"context"
"fmt"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

var (
plannerCompletedPartitionError = errors.New("got completed partition")
plannerVisitedPartitionError = errors.New("got partition visited by other compactor")
)

type PartitionCompactionPlanner struct {
ctx context.Context
bkt objstore.InstrumentedBucket
logger log.Logger
ctx context.Context
bkt objstore.InstrumentedBucket
logger log.Logger
ranges []int64
noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark
ringLifecyclerID string
userID string
plannerDelay time.Duration
partitionVisitMarkerTimeout time.Duration
partitionVisitMarkerFileUpdateInterval time.Duration
compactorMetrics *compactorMetrics
}

func NewPartitionCompactionPlanner(
ctx context.Context,
bkt objstore.InstrumentedBucket,
logger log.Logger,
ranges []int64,
noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark,
ringLifecyclerID string,
userID string,
plannerDelay time.Duration,
partitionVisitMarkerTimeout time.Duration,
partitionVisitMarkerFileUpdateInterval time.Duration,
compactorMetrics *compactorMetrics,
) *PartitionCompactionPlanner {
return &PartitionCompactionPlanner{
ctx: ctx,
bkt: bkt,
logger: logger,
ctx: ctx,
bkt: bkt,
logger: logger,
ranges: ranges,
noCompBlocksFunc: noCompBlocksFunc,
ringLifecyclerID: ringLifecyclerID,
userID: userID,
plannerDelay: plannerDelay,
partitionVisitMarkerTimeout: partitionVisitMarkerTimeout,
partitionVisitMarkerFileUpdateInterval: partitionVisitMarkerFileUpdateInterval,
compactorMetrics: compactorMetrics,
}
}

func (p *PartitionCompactionPlanner) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, errChan chan error, extensions any) ([]*metadata.Meta, error) {
panic("PartitionCompactionPlanner not implemented")
cortexMetaExtensions, err := tsdb.ConvertToCortexMetaExtensions(extensions)
if err != nil {
return nil, err
}
if cortexMetaExtensions == nil {
return nil, fmt.Errorf("cortexMetaExtensions cannot be nil")
}
return p.PlanWithPartition(ctx, metasByMinTime, cortexMetaExtensions, errChan)
}

func (p *PartitionCompactionPlanner) PlanWithPartition(_ context.Context, metasByMinTime []*metadata.Meta, cortexMetaExtensions *tsdb.CortexMetaExtensions, errChan chan error) ([]*metadata.Meta, error) {
partitionInfo := cortexMetaExtensions.PartitionInfo
if partitionInfo == nil {
return nil, fmt.Errorf("partitionInfo cannot be nil")
}
partitionID := partitionInfo.PartitionID
partitionedGroupID := partitionInfo.PartitionedGroupID

// This delay prevents double compaction when two compactors
// claim the same partition in the grouper at the same time.
time.Sleep(p.plannerDelay)

visitMarker := newPartitionVisitMarker(p.ringLifecyclerID, partitionedGroupID, partitionID)
visitMarkerManager := NewVisitMarkerManager(p.bkt, p.logger, p.ringLifecyclerID, visitMarker)
existingPartitionVisitMarker := &partitionVisitMarker{}
err := visitMarkerManager.ReadVisitMarker(p.ctx, existingPartitionVisitMarker)
visitMarkerExists := true
if err != nil {
if errors.Is(err, errorVisitMarkerNotFound) {
visitMarkerExists = false
} else {
return nil, fmt.Errorf("unable to get visit marker file for partition with partition ID %d, partitioned group ID %d: %s", partitionID, partitionedGroupID, err.Error())
}
}
if visitMarkerExists {
if existingPartitionVisitMarker.GetStatus() == Completed {
p.compactorMetrics.compactionsNotPlanned.WithLabelValues(p.userID, cortexMetaExtensions.TimeRangeStr()).Inc()
level.Warn(p.logger).Log("msg", "partition is in completed status", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "compactor_id", p.ringLifecyclerID, existingPartitionVisitMarker.String())
return nil, plannerCompletedPartitionError
}
if !existingPartitionVisitMarker.IsPendingByCompactor(p.partitionVisitMarkerTimeout, partitionID, p.ringLifecyclerID) {
p.compactorMetrics.compactionsNotPlanned.WithLabelValues(p.userID, cortexMetaExtensions.TimeRangeStr()).Inc()
level.Warn(p.logger).Log("msg", "partition is not visited by current compactor", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "compactor_id", p.ringLifecyclerID, existingPartitionVisitMarker.String())
return nil, plannerVisitedPartitionError
}
}

// Ensure all blocks fit within the largest range. This is a double check
// to ensure there's no bug in the previous block grouping, given this Plan()
// is just a pass-through.
// Modified from https://github.com/cortexproject/cortex/pull/2616/files#diff-e3051fc530c48bb276ba958dd8fadc684e546bd7964e6bc75cef9a86ef8df344R28-R63
largestRange := p.ranges[len(p.ranges)-1]
rangeStart := getRangeStart(metasByMinTime[0], largestRange)
rangeEnd := rangeStart + largestRange
noCompactMarked := p.noCompBlocksFunc()
resultMetas := make([]*metadata.Meta, 0, len(metasByMinTime))

for _, b := range metasByMinTime {
if b.ULID == DUMMY_BLOCK_ID {
continue
}
blockID := b.ULID.String()
if _, excluded := noCompactMarked[b.ULID]; excluded {
continue
}

if b.MinTime < rangeStart || b.MaxTime > rangeEnd {
return nil, fmt.Errorf("block %s with time range %d:%d is outside the largest expected range %d:%d", blockID, b.MinTime, b.MaxTime, rangeStart, rangeEnd)
}

resultMetas = append(resultMetas, b)
}

if len(resultMetas) < 1 {
level.Info(p.logger).Log("msg", "result meta size is empty", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "size", len(resultMetas))
return nil, nil
}

go visitMarkerManager.HeartBeat(p.ctx, errChan, p.partitionVisitMarkerFileUpdateInterval, false)

return resultMetas, nil
}