diff --git a/pkg/bloomcompactor/sharding.go b/pkg/bloomcompactor/sharding.go index 9d97c0444750e..10a023827167a 100644 --- a/pkg/bloomcompactor/sharding.go +++ b/pkg/bloomcompactor/sharding.go @@ -2,6 +2,7 @@ package bloomcompactor import ( "github.com/grafana/dskit/ring" + util_ring "github.com/grafana/loki/pkg/util/ring" ) var ( @@ -11,58 +12,31 @@ var ( // ShardingStrategy describes whether compactor "owns" given user or job. type ShardingStrategy interface { - OwnsTenant(tenant string) (bool, error) + util_ring.TenantSharding OwnsJob(job Job) (bool, error) } type ShuffleShardingStrategy struct { - ring *ring.Ring + util_ring.TenantSharding ringLifeCycler *ring.BasicLifecycler - limits Limits } func NewShuffleShardingStrategy(r *ring.Ring, ringLifecycler *ring.BasicLifecycler, limits Limits) *ShuffleShardingStrategy { s := ShuffleShardingStrategy{ - ring: r, + TenantSharding: util_ring.NewTenantShuffleSharding(r, ringLifecycler, limits.BloomCompactorShardSize), ringLifeCycler: ringLifecycler, - limits: limits, } return &s } -// getShuffleShardingSubring returns the subring to be used for a given user. -func (s *ShuffleShardingStrategy) getShuffleShardingSubring(tenantID string) ring.ReadRing { - shardSize := s.limits.BloomCompactorShardSize(tenantID) - - // A shard size of 0 means shuffle sharding is disabled for this specific user, - // so we just return the full ring so that blocks will be sharded across all compactors. - if shardSize <= 0 { - return s.ring - } - - return s.ring.ShuffleShard(tenantID, shardSize) -} - -func (s *ShuffleShardingStrategy) OwnsTenant(tenantID string) (bool, error) { - subRing := s.getShuffleShardingSubring(tenantID) - return subRing.HasInstance(s.ringLifeCycler.GetInstanceID()), nil -} - // OwnsJob makes sure only a single compactor should execute the job. -// TODO: Pretty similar to sharding strategy in pkg/bloomgateway/sharding.go func (s *ShuffleShardingStrategy) OwnsJob(job Job) (bool, error) { - // We check again if we own the tenant - subRing := s.getShuffleShardingSubring(job.Tenant()) - ownsTenant := subRing.HasInstance(s.ringLifeCycler.GetInstanceID()) - if !ownsTenant { + if !s.OwnsTenant(job.Tenant()) { return false, nil } - rs, err := subRing.Get(uint32(job.Fingerprint()), RingOp, nil, nil, nil) - if err != nil { - return false, err - } - - return rs.Includes(s.ringLifeCycler.GetInstanceAddr()), nil + tenantRing := s.GetTenantSubRing(job.Tenant()) + fpSharding := util_ring.NewFingerprintShuffleSharding(tenantRing, s.ringLifeCycler, RingOp) + return fpSharding.OwnsFingerprint(uint64(job.Fingerprint())) } diff --git a/pkg/bloomgateway/sharding.go b/pkg/bloomgateway/sharding.go index 95cf4f05ab3a6..598a2046d1e01 100644 --- a/pkg/bloomgateway/sharding.go +++ b/pkg/bloomgateway/sharding.go @@ -5,6 +5,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/ring" + util_ring "github.com/grafana/loki/pkg/util/ring" ) // TODO(chaudum): Replace this placeholder with actual BlockRef struct. @@ -45,20 +46,17 @@ type ShardingStrategy interface { } type ShuffleShardingStrategy struct { - r ring.ReadRing - limits Limits - instanceAddr string - instanceID string - logger log.Logger + util_ring.TenantSharding + r ring.ReadRing + ringLifeCycler *ring.BasicLifecycler + logger log.Logger } -func NewShuffleShardingStrategy(r ring.ReadRing, l Limits, instanceAddr, instanceID string, logger log.Logger) *ShuffleShardingStrategy { +func NewShuffleShardingStrategy(r ring.ReadRing, ringLifecycler *ring.BasicLifecycler, limits Limits, logger log.Logger) *ShuffleShardingStrategy { return &ShuffleShardingStrategy{ - r: r, - limits: l, - instanceAddr: instanceAddr, - instanceID: instanceID, - logger: logger, + TenantSharding: util_ring.NewTenantShuffleSharding(r, ringLifecycler, limits.BloomGatewayShardSize), + ringLifeCycler: ringLifecycler, + logger: logger, } } @@ -69,17 +67,15 @@ func (s *ShuffleShardingStrategy) FilterTenants(_ context.Context, tenantIDs []s // instance, because of the auto-forget feature. if set, err := s.r.GetAllHealthy(BlocksOwnerSync); err != nil { return nil, err - } else if !set.Includes(s.instanceAddr) { + } else if !set.Includes(s.ringLifeCycler.GetInstanceID()) { return nil, errGatewayUnhealthy } var filteredIDs []string for _, tenantID := range tenantIDs { - subRing := GetShuffleShardingSubring(s.r, tenantID, s.limits) - // Include the user only if it belongs to this bloom gateway shard. - if subRing.HasInstance(s.instanceID) { + if s.OwnsTenant(tenantID) { filteredIDs = append(filteredIDs, tenantID) } } @@ -94,35 +90,35 @@ func getBucket(rangeMin, rangeMax, pos uint64) int { // FilterBlocks implements ShardingStrategy. func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, tenantID string, blockRefs []BlockRef) ([]BlockRef, error) { - filteredBlockRefs := make([]BlockRef, 0, len(blockRefs)) + if !s.OwnsTenant(tenantID) { + return nil, nil + } - subRing := GetShuffleShardingSubring(s.r, tenantID, s.limits) + filteredBlockRefs := make([]BlockRef, 0, len(blockRefs)) - bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet() - var rs ring.ReplicationSet - var err error + tenantRing := s.GetTenantSubRing(tenantID) + fpSharding := util_ring.NewFingerprintShuffleSharding(tenantRing, s.ringLifeCycler, BlocksOwnerSync) for _, blockRef := range blockRefs { - rs, err = subRing.Get(uint32(blockRef.FromFp), BlocksOwnerSync, bufDescs, bufHosts, bufZones) + owns, err := fpSharding.OwnsFingerprint(blockRef.FromFp) if err != nil { return nil, err } - // Include the block only if it belongs to this bloom gateway shard. - if rs.Includes(s.instanceID) { + if owns { filteredBlockRefs = append(filteredBlockRefs, blockRef) continue } - rs, err = subRing.Get(uint32(blockRef.ThroughFp), BlocksOwnerSync, bufDescs, bufHosts, bufZones) + owns, err = fpSharding.OwnsFingerprint(blockRef.ThroughFp) if err != nil { return nil, err } - // Include the block only if it belongs to this bloom gateway shard. - if rs.Includes(s.instanceID) { + if owns { filteredBlockRefs = append(filteredBlockRefs, blockRef) continue } } + return filteredBlockRefs, nil } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index f23c2222f501c..a6d875d7ffedb 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1253,9 +1253,7 @@ func (t *Loki) addCompactorMiddleware(h http.HandlerFunc) http.Handler { func (t *Loki) initBloomGateway() (services.Service, error) { logger := log.With(util_log.Logger, "component", "bloom-gateway") - instanceAddr := t.bloomGatewayRingManager.RingLifecycler.GetInstanceAddr() - instanceID := t.bloomGatewayRingManager.RingLifecycler.GetInstanceID() - shuffleSharding := bloomgateway.NewShuffleShardingStrategy(t.bloomGatewayRingManager.Ring, t.Overrides, instanceAddr, instanceID, logger) + shuffleSharding := bloomgateway.NewShuffleShardingStrategy(t.bloomGatewayRingManager.Ring, t.bloomGatewayRingManager.RingLifecycler, t.Overrides, logger) gateway, err := bloomgateway.New(t.Cfg.BloomGateway, t.Cfg.SchemaConfig, t.Cfg.StorageConfig, shuffleSharding, t.clientMetrics, logger, prometheus.DefaultRegisterer) if err != nil { diff --git a/pkg/util/ring/sharding.go b/pkg/util/ring/sharding.go new file mode 100644 index 0000000000000..cb549ec02bb90 --- /dev/null +++ b/pkg/util/ring/sharding.go @@ -0,0 +1,85 @@ +package ring + +import ( + "github.com/grafana/dskit/ring" + "github.com/prometheus/common/model" +) + +type TenantSharding interface { + GetTenantSubRing(tenantID string) ring.ReadRing + OwnsTenant(tenantID string) bool +} + +type TenantShuffleSharding struct { + r ring.ReadRing + ringLifeCycler *ring.BasicLifecycler + shardSizeForTenant func(tenantID string) int +} + +func NewTenantShuffleSharding( + r ring.ReadRing, + ringLifeCycler *ring.BasicLifecycler, + shardSizeForTenant func(tenantID string) int, +) *TenantShuffleSharding { + return &TenantShuffleSharding{ + r: r, + ringLifeCycler: ringLifeCycler, + shardSizeForTenant: shardSizeForTenant, + } +} + +func (s *TenantShuffleSharding) GetTenantSubRing(tenantID string) ring.ReadRing { + shardSize := s.shardSizeForTenant(tenantID) + + // A shard size of 0 means shuffle sharding is disabled for this specific user, + if shardSize <= 0 { + return s.r + } + + return s.r.ShuffleShard(tenantID, shardSize) +} + +func (s *TenantShuffleSharding) OwnsTenant(tenantID string) bool { + subRing := s.GetTenantSubRing(tenantID) + return subRing.HasInstance(s.ringLifeCycler.GetInstanceID()) +} + +type FingerprintSharding interface { + OwnsFingerprint(fp model.Fingerprint) (bool, error) +} + +// FingerprintShuffleSharding is not thread-safe. +type FingerprintShuffleSharding struct { + r ring.ReadRing + ringLifeCycler *ring.BasicLifecycler + ringOp ring.Operation + + // Buffers for ring.Get() calls. + bufDescs []ring.InstanceDesc + bufHosts, bufZones []string +} + +func NewFingerprintShuffleSharding( + r ring.ReadRing, + ringLifeCycler *ring.BasicLifecycler, + ringOp ring.Operation, +) *FingerprintShuffleSharding { + s := FingerprintShuffleSharding{ + r: r, + ringLifeCycler: ringLifeCycler, + ringOp: ringOp, + } + + s.bufDescs, s.bufHosts, s.bufZones = ring.MakeBuffersForGet() + + return &s +} + +func (s *FingerprintShuffleSharding) OwnsFingerprint(fp uint64) (bool, error) { + rs, err := s.r.Get(uint32(fp), s.ringOp, s.bufDescs, s.bufHosts, s.bufZones) + if err != nil { + return false, err + } + + return rs.Includes(s.ringLifeCycler.GetInstanceAddr()), nil +}