Skip to content

Commit

Permalink
Extract common logic for sharding from bloom-gw and bloom-compactor i…
Browse files Browse the repository at this point in the history
…nto a ring utils lib
  • Loading branch information
salvacorts committed Nov 8, 2023
1 parent 27240d7 commit 368f8f0
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 63 deletions.
42 changes: 8 additions & 34 deletions pkg/bloomcompactor/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bloomcompactor

import (
"github.com/grafana/dskit/ring"
util_ring "github.com/grafana/loki/pkg/util/ring"
)

var (
Expand All @@ -11,58 +12,31 @@ var (

// ShardingStrategy describes whether compactor "owns" given user or job.
type ShardingStrategy interface {
OwnsTenant(tenant string) (bool, error)
util_ring.TenantSharding
OwnsJob(job Job) (bool, error)
}

type ShuffleShardingStrategy struct {
ring *ring.Ring
util_ring.TenantSharding
ringLifeCycler *ring.BasicLifecycler
limits Limits
}

func NewShuffleShardingStrategy(r *ring.Ring, ringLifecycler *ring.BasicLifecycler, limits Limits) *ShuffleShardingStrategy {
s := ShuffleShardingStrategy{
ring: r,
TenantSharding: util_ring.NewTenantShuffleSharding(r, ringLifecycler, limits.BloomCompactorShardSize),
ringLifeCycler: ringLifecycler,
limits: limits,
}

return &s
}

// getShuffleShardingSubring returns the subring to be used for a given user.
func (s *ShuffleShardingStrategy) getShuffleShardingSubring(tenantID string) ring.ReadRing {
shardSize := s.limits.BloomCompactorShardSize(tenantID)

// A shard size of 0 means shuffle sharding is disabled for this specific user,
// so we just return the full ring so that blocks will be sharded across all compactors.
if shardSize <= 0 {
return s.ring
}

return s.ring.ShuffleShard(tenantID, shardSize)
}

func (s *ShuffleShardingStrategy) OwnsTenant(tenantID string) (bool, error) {
subRing := s.getShuffleShardingSubring(tenantID)
return subRing.HasInstance(s.ringLifeCycler.GetInstanceID()), nil
}

// OwnsJob makes sure only a single compactor should execute the job.
// TODO: Pretty similar to sharding strategy in pkg/bloomgateway/sharding.go
func (s *ShuffleShardingStrategy) OwnsJob(job Job) (bool, error) {
// We check again if we own the tenant
subRing := s.getShuffleShardingSubring(job.Tenant())
ownsTenant := subRing.HasInstance(s.ringLifeCycler.GetInstanceID())
if !ownsTenant {
if !s.OwnsTenant(job.Tenant()) {
return false, nil
}

rs, err := subRing.Get(uint32(job.Fingerprint()), RingOp, nil, nil, nil)
if err != nil {
return false, err
}

return rs.Includes(s.ringLifeCycler.GetInstanceAddr()), nil
tenantRing := s.GetTenantSubRing(job.Tenant())
fpSharding := util_ring.NewFingerprintShuffleSharding(tenantRing, s.ringLifeCycler, RingOp)
return fpSharding.OwnsFingerprint(uint64(job.Fingerprint()))
}
48 changes: 22 additions & 26 deletions pkg/bloomgateway/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/go-kit/log"
"github.com/grafana/dskit/ring"
util_ring "github.com/grafana/loki/pkg/util/ring"
)

// TODO(chaudum): Replace this placeholder with actual BlockRef struct.
Expand Down Expand Up @@ -45,20 +46,17 @@ type ShardingStrategy interface {
}

type ShuffleShardingStrategy struct {
r ring.ReadRing
limits Limits
instanceAddr string
instanceID string
logger log.Logger
util_ring.TenantSharding
r ring.ReadRing
ringLifeCycler *ring.BasicLifecycler
logger log.Logger
}

func NewShuffleShardingStrategy(r ring.ReadRing, l Limits, instanceAddr, instanceID string, logger log.Logger) *ShuffleShardingStrategy {
func NewShuffleShardingStrategy(r ring.ReadRing, ringLifecycler *ring.BasicLifecycler, limits Limits, logger log.Logger) *ShuffleShardingStrategy {
return &ShuffleShardingStrategy{
r: r,
limits: l,
instanceAddr: instanceAddr,
instanceID: instanceID,
logger: logger,
TenantSharding: util_ring.NewTenantShuffleSharding(r, ringLifecycler, limits.BloomGatewayShardSize),
ringLifeCycler: ringLifecycler,
logger: logger,
}
}

Expand All @@ -69,17 +67,15 @@ func (s *ShuffleShardingStrategy) FilterTenants(_ context.Context, tenantIDs []s
// instance, because of the auto-forget feature.
if set, err := s.r.GetAllHealthy(BlocksOwnerSync); err != nil {
return nil, err
} else if !set.Includes(s.instanceAddr) {
} else if !set.Includes(s.ringLifeCycler.GetInstanceID()) {
return nil, errGatewayUnhealthy
}

var filteredIDs []string

for _, tenantID := range tenantIDs {
subRing := GetShuffleShardingSubring(s.r, tenantID, s.limits)

// Include the user only if it belongs to this bloom gateway shard.
if subRing.HasInstance(s.instanceID) {
if s.OwnsTenant(tenantID) {
filteredIDs = append(filteredIDs, tenantID)
}
}
Expand All @@ -94,35 +90,35 @@ func getBucket(rangeMin, rangeMax, pos uint64) int {

// FilterBlocks implements ShardingStrategy.
func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, tenantID string, blockRefs []BlockRef) ([]BlockRef, error) {
filteredBlockRefs := make([]BlockRef, 0, len(blockRefs))
if !s.OwnsTenant(tenantID) {
return nil, nil
}

subRing := GetShuffleShardingSubring(s.r, tenantID, s.limits)
filteredBlockRefs := make([]BlockRef, 0, len(blockRefs))

bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
var rs ring.ReplicationSet
var err error
tenantRing := s.GetTenantSubRing(tenantID)

fpSharding := util_ring.NewFingerprintShuffleSharding(tenantRing, s.ringLifeCycler, BlocksOwnerSync)
for _, blockRef := range blockRefs {
rs, err = subRing.Get(uint32(blockRef.FromFp), BlocksOwnerSync, bufDescs, bufHosts, bufZones)
owns, err := fpSharding.OwnsFingerprint(blockRef.FromFp)
if err != nil {
return nil, err
}
// Include the block only if it belongs to this bloom gateway shard.
if rs.Includes(s.instanceID) {
if owns {
filteredBlockRefs = append(filteredBlockRefs, blockRef)
continue
}

rs, err = subRing.Get(uint32(blockRef.ThroughFp), BlocksOwnerSync, bufDescs, bufHosts, bufZones)
owns, err = fpSharding.OwnsFingerprint(blockRef.ThroughFp)
if err != nil {
return nil, err
}
// Include the block only if it belongs to this bloom gateway shard.
if rs.Includes(s.instanceID) {
if owns {
filteredBlockRefs = append(filteredBlockRefs, blockRef)
continue
}
}

return filteredBlockRefs, nil
}

Expand Down
4 changes: 1 addition & 3 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1253,9 +1253,7 @@ func (t *Loki) addCompactorMiddleware(h http.HandlerFunc) http.Handler {
func (t *Loki) initBloomGateway() (services.Service, error) {
logger := log.With(util_log.Logger, "component", "bloom-gateway")

instanceAddr := t.bloomGatewayRingManager.RingLifecycler.GetInstanceAddr()
instanceID := t.bloomGatewayRingManager.RingLifecycler.GetInstanceID()
shuffleSharding := bloomgateway.NewShuffleShardingStrategy(t.bloomGatewayRingManager.Ring, t.Overrides, instanceAddr, instanceID, logger)
shuffleSharding := bloomgateway.NewShuffleShardingStrategy(t.bloomGatewayRingManager.Ring, t.bloomGatewayRingManager.RingLifecycler, t.Overrides, logger)

gateway, err := bloomgateway.New(t.Cfg.BloomGateway, t.Cfg.SchemaConfig, t.Cfg.StorageConfig, shuffleSharding, t.clientMetrics, logger, prometheus.DefaultRegisterer)
if err != nil {
Expand Down
85 changes: 85 additions & 0 deletions pkg/util/ring/sharding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package ring

import (
"github.com/grafana/dskit/ring"
"github.com/prometheus/common/model"
)

type TenantSharding interface {
GetTenantSubRing(tenantID string) ring.ReadRing
OwnsTenant(tenantID string) bool
}

type TenantShuffleSharding struct {
r ring.ReadRing
ringLifeCycler *ring.BasicLifecycler
shardSizeForTenant func(tenantID string) int
}

func NewTenantShuffleSharding(
r ring.ReadRing,
ringLifeCycler *ring.BasicLifecycler,
shardSizeForTenant func(tenantID string) int,
) *TenantShuffleSharding {
return &TenantShuffleSharding{
r: r,
ringLifeCycler: ringLifeCycler,
shardSizeForTenant: shardSizeForTenant,
}
}

func (s *TenantShuffleSharding) GetTenantSubRing(tenantID string) ring.ReadRing {
shardSize := s.shardSizeForTenant(tenantID)

// A shard size of 0 means shuffle sharding is disabled for this specific user,
if shardSize <= 0 {
return s.r
}

return s.r.ShuffleShard(tenantID, shardSize)
}

func (s *TenantShuffleSharding) OwnsTenant(tenantID string) bool {
subRing := s.GetTenantSubRing(tenantID)
return subRing.HasInstance(s.ringLifeCycler.GetInstanceID())
}

type FingerprintSharding interface {
OwnsFingerprint(fp model.Fingerprint) (bool, error)
}

// FingerprintShuffleSharding is not thread-safe.
type FingerprintShuffleSharding struct {
r ring.ReadRing
ringLifeCycler *ring.BasicLifecycler
ringOp ring.Operation

// Buffers for ring.Get() calls.
bufDescs []ring.InstanceDesc
bufHosts, bufZones []string
}

func NewFingerprintShuffleSharding(
r ring.ReadRing,
ringLifeCycler *ring.BasicLifecycler,
ringOp ring.Operation,
) *FingerprintShuffleSharding {
s := FingerprintShuffleSharding{
r: r,
ringLifeCycler: ringLifeCycler,
ringOp: ringOp,
}

s.bufDescs, s.bufHosts, s.bufZones = ring.MakeBuffersForGet()

return &s
}

func (s *FingerprintShuffleSharding) OwnsFingerprint(fp uint64) (bool, error) {
rs, err := s.r.Get(uint32(fp), s.ringOp, s.bufDescs, s.bufHosts, s.bufZones)
if err != nil {
return false, err
}

return rs.Includes(s.ringLifeCycler.GetInstanceAddr()), nil
}

0 comments on commit 368f8f0

Please sign in to comment.