
Commit

Bloom compactor shuffle-sharding (#11947)
salvacorts authored Feb 14, 2024
1 parent bd12e16 commit 6434df7
Showing 8 changed files with 259 additions and 301 deletions.
38 changes: 31 additions & 7 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -2,7 +2,6 @@ package bloomcompactor

import (
"context"
"math"
"sync"
"time"

@@ -11,16 +10,23 @@ import (
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/bloomutils"
"github.com/grafana/loki/pkg/storage"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
util_ring "github.com/grafana/loki/pkg/util/ring"
)

var (
RingOp = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE}, nil)
)

/*
@@ -47,7 +53,7 @@ type Compactor struct {
// temporary workaround until bloomStore has implemented read/write shipper interface
bloomStore bloomshipper.Store

sharding ShardingStrategy
sharding util_ring.TenantSharding

metrics *Metrics
btMetrics *v1.Metrics
@@ -59,7 +65,7 @@ func New(
storeCfg storage.Config,
clientMetrics storage.ClientMetrics,
fetcherProvider stores.ChunkFetcherProvider,
sharding ShardingStrategy,
sharding util_ring.TenantSharding,
limits Limits,
logger log.Logger,
r prometheus.Registerer,
@@ -182,9 +188,24 @@ func (c *Compactor) tenants(ctx context.Context, table config.DayTime) (v1.Itera
return v1.NewSliceIter(tenants), nil
}

// TODO(owen-d): implement w/ subrings
func (c *Compactor) ownsTenant(_ string) (ownershipRange v1.FingerprintBounds, owns bool) {
return v1.NewBounds(0, math.MaxUint64), true
// ownsTenant returns the fingerprint ownership range for the tenant, whether this compactor owns the tenant, and any error encountered while checking the ring.
func (c *Compactor) ownsTenant(tenant string) (v1.FingerprintBounds, bool, error) {
tenantRing, owned := c.sharding.OwnsTenant(tenant)
if !owned {
return v1.FingerprintBounds{}, false, nil
}

rs, err := tenantRing.GetAllHealthy(RingOp)
if err != nil {
return v1.FingerprintBounds{}, false, errors.Wrap(err, "getting ring healthy instances")

}

ownershipBounds, err := bloomutils.GetInstanceWithTokenRange(c.cfg.Ring.InstanceID, rs.Instances)
if err != nil {
return v1.FingerprintBounds{}, false, errors.Wrap(err, "getting instance token range")
}
return ownershipBounds, true, nil
}

// runs a single round of compaction for all relevant tenants and tables
@@ -232,7 +253,10 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {

for tenants.Next() && tenants.Err() == nil && ctx.Err() == nil {
tenant := tenants.At()
ownershipRange, owns := c.ownsTenant(tenant)
ownershipRange, owns, err := c.ownsTenant(tenant)
if err != nil {
return errors.Wrap(err, "checking tenant ownership")
}
if !owns {
continue
}
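Note: the diff above wires the compactor to a util_ring.TenantSharding implementation but does not show how OwnsTenant itself works. As a rough sketch of the idea only (not the Loki implementation; the type and field names below are made up for illustration), a tenant ownership check can be built on dskit's shuffle-sharding primitives, where a shard size of 0 means every compactor owns every tenant:

package shardingsketch

import (
	"github.com/grafana/dskit/ring"
)

// tenantShuffleSharding is a hypothetical stand-in for util_ring.TenantSharding.
type tenantShuffleSharding struct {
	r            ring.ReadRing
	instanceID   string
	shardSizeFor func(tenant string) int // e.g. backed by the BloomCompactorShardSize limit
}

// ownsTenant reports whether this instance belongs to the tenant's shuffle shard
// and returns the sub-ring the caller should use for further lookups.
func (s tenantShuffleSharding) ownsTenant(tenant string) (ring.ReadRing, bool) {
	size := s.shardSizeFor(tenant)
	if size <= 0 {
		// Shard size 0 disables sharding: every instance owns the tenant.
		return s.r, true
	}
	subRing := s.r.ShuffleShard(tenant, size) // deterministic per-tenant sub-ring of `size` instances
	return subRing, subRing.HasInstance(s.instanceID)
}

This is the behaviour the new test below exercises: with a shard size of 0 all compactors own the tenant, and with a shard size of 5 across 10 compactors exactly 5 do.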
197 changes: 197 additions & 0 deletions pkg/bloomcompactor/bloomcompactor_test.go
@@ -0,0 +1,197 @@
package bloomcompactor

import (
"context"
"flag"
"fmt"
"math"
"testing"
"time"

"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
util_log "github.com/grafana/loki/pkg/util/log"
lokiring "github.com/grafana/loki/pkg/util/ring"
util_ring "github.com/grafana/loki/pkg/util/ring"
"github.com/grafana/loki/pkg/validation"
)

func TestCompactor_ownsTenant(t *testing.T) {
for _, tc := range []struct {
name string
limits Limits
compactors int

expectedCompactorsOwningTenant int
}{
{
name: "no sharding with one instance",
limits: mockLimits{
shardSize: 0,
},
compactors: 1,
expectedCompactorsOwningTenant: 1,
},
{
name: "no sharding with multiple instances",
limits: mockLimits{
shardSize: 0,
},
compactors: 10,
expectedCompactorsOwningTenant: 10,
},
{
name: "sharding with one instance",
limits: mockLimits{
shardSize: 5,
},
compactors: 1,
expectedCompactorsOwningTenant: 1,
},
{
name: "sharding with multiple instances",
limits: mockLimits{
shardSize: 5,
},
compactors: 10,
expectedCompactorsOwningTenant: 5,
},
} {
t.Run(tc.name, func(t *testing.T) {
var ringManagers []*lokiring.RingManager
var compactors []*Compactor
for i := 0; i < tc.compactors; i++ {
var ringCfg lokiring.RingConfig
ringCfg.RegisterFlagsWithPrefix("", "", flag.NewFlagSet("ring", flag.PanicOnError))
ringCfg.KVStore.Store = "inmemory"
ringCfg.InstanceID = fmt.Sprintf("bloom-compactor-%d", i)
ringCfg.InstanceAddr = fmt.Sprintf("localhost-%d", i)

ringManager, err := lokiring.NewRingManager("bloom-compactor", lokiring.ServerMode, ringCfg, 1, 1, util_log.Logger, prometheus.NewRegistry())
require.NoError(t, err)
require.NoError(t, ringManager.StartAsync(context.Background()))

shuffleSharding := util_ring.NewTenantShuffleSharding(ringManager.Ring, ringManager.RingLifecycler, tc.limits.BloomCompactorShardSize)

compactor := &Compactor{
cfg: Config{
Ring: ringCfg,
},
sharding: shuffleSharding,
limits: tc.limits,
}

ringManagers = append(ringManagers, ringManager)
compactors = append(compactors, compactor)
}
defer func() {
// Stop all rings and wait for them to stop.
for _, ringManager := range ringManagers {
ringManager.StopAsync()
require.Eventually(t, func() bool {
return ringManager.State() == services.Terminated
}, 1*time.Minute, 100*time.Millisecond)
}
}()

// Wait for all rings to see each other.
for _, ringManager := range ringManagers {
require.Eventually(t, func() bool {
running := ringManager.State() == services.Running
discovered := ringManager.Ring.InstancesCount() == tc.compactors
return running && discovered
}, 1*time.Minute, 100*time.Millisecond)
}

var compactorOwnsTenant int
var compactorOwnershipRange []v1.FingerprintBounds
for _, compactor := range compactors {
ownershipRange, ownsTenant, err := compactor.ownsTenant("tenant")
require.NoError(t, err)
if ownsTenant {
compactorOwnsTenant++
compactorOwnershipRange = append(compactorOwnershipRange, ownershipRange)
}
}
require.Equal(t, tc.expectedCompactorsOwningTenant, compactorOwnsTenant)

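// Start with inverted (empty) bounds; the loop below expands them to cover every ownership range seen.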
coveredKeySpace := v1.NewBounds(math.MaxUint64, 0)
for i, boundsA := range compactorOwnershipRange {
for j, boundsB := range compactorOwnershipRange {
if i == j {
continue
}
// Assert that the fingerprint key-space is not overlapping
require.False(t, boundsA.Overlaps(boundsB))
}

if boundsA.Min < coveredKeySpace.Min {
coveredKeySpace.Min = boundsA.Min
}
if boundsA.Max > coveredKeySpace.Max {
coveredKeySpace.Max = boundsA.Max
}

// Assert that the fingerprint key-space is evenly distributed across the compactors.
// The key-space may not divide evenly, so allow a delta of 10 when checking that
// each compactor owns a roughly equal share.
fpPerTenant := math.MaxUint64 / uint64(tc.expectedCompactorsOwningTenant)
boundsLen := uint64(boundsA.Max - boundsA.Min)
require.InDelta(t, fpPerTenant, boundsLen, 10)
}
// Assert that the fingerprint key-space is complete
require.True(t, coveredKeySpace.Equal(v1.NewBounds(0, math.MaxUint64)))
})
}
}

type mockLimits struct {
shardSize int
}

func (m mockLimits) AllByUserID() map[string]*validation.Limits {
panic("implement me")
}

func (m mockLimits) DefaultLimits() *validation.Limits {
panic("implement me")
}

func (m mockLimits) VolumeMaxSeries(_ string) int {
panic("implement me")
}

func (m mockLimits) BloomCompactorShardSize(_ string) int {
return m.shardSize
}

func (m mockLimits) BloomCompactorChunksBatchSize(_ string) int {
panic("implement me")
}

func (m mockLimits) BloomCompactorMaxTableAge(_ string) time.Duration {
panic("implement me")
}

func (m mockLimits) BloomCompactorEnabled(_ string) bool {
panic("implement me")
}

func (m mockLimits) BloomNGramLength(_ string) int {
panic("implement me")
}

func (m mockLimits) BloomNGramSkip(_ string) int {
panic("implement me")
}

func (m mockLimits) BloomFalsePositiveRate(_ string) float64 {
panic("implement me")
}

func (m mockLimits) BloomCompactorMaxBlockSize(_ string) int {
panic("implement me")
}
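The test above asserts two properties of the bounds returned by ownsTenant: the per-compactor ranges do not overlap, and together they cover the full fingerprint key-space from 0 to math.MaxUint64. The bloomutils.GetInstanceWithTokenRange helper that produces those bounds is not part of this diff; the sketch below is only an assumed illustration of how non-overlapping, complete bounds can be derived from ring tokens (the type and function names here are hypothetical):

package boundsketch

import (
	"fmt"
	"math"
	"sort"
)

// instanceToken pairs an instance ID with a single ring token, for illustration.
type instanceToken struct {
	id    string
	token uint64
}

// ownershipBounds assigns each instance the fingerprints between its predecessor's
// token (exclusive) and its own token (inclusive); the first instance starts at 0
// and the last instance extends to math.MaxUint64, so the union of all ranges is
// the complete key-space and no two ranges overlap.
func ownershipBounds(instanceID string, instances []instanceToken) (min, max uint64, err error) {
	sort.Slice(instances, func(i, j int) bool { return instances[i].token < instances[j].token })
	for i, inst := range instances {
		if inst.id != instanceID {
			continue
		}
		if i > 0 {
			min = instances[i-1].token + 1
		}
		max = inst.token
		if i == len(instances)-1 {
			max = math.MaxUint64
		}
		return min, max, nil
	}
	return 0, 0, fmt.Errorf("instance %s not found in ring", instanceID)
}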
58 changes: 0 additions & 58 deletions pkg/bloomcompactor/sharding.go

This file was deleted.
