Skip to content

Commit

Permalink
feat(kafka): Implement limiter using partition ring for Kafka (#14359)
Browse files Browse the repository at this point in the history
  • Loading branch information
benclive authored Oct 4, 2024
1 parent 39b57ec commit 5cbb239
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 91 deletions.
5 changes: 3 additions & 2 deletions pkg/ingester/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,8 @@ func Test_SeriesIterator(t *testing.T) {

limits, err := validation.NewOverrides(l, nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))

for i := 0; i < 3; i++ {
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
Expand Down Expand Up @@ -505,7 +506,7 @@ func Benchmark_SeriesIterator(b *testing.B) {

limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(b, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))

for i := range instances {
inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
Expand Down
10 changes: 6 additions & 4 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,10 +388,6 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
i.lifecyclerWatcher.WatchService(i.partitionReader)
}

// Now that the lifecycler has been created, we can create the limiter
// which depends on it.
i.limiter = NewLimiter(limits, metrics, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)

i.Service = services.NewBasicService(i.starting, i.running, i.stopping)

i.setupAutoForget()
Expand All @@ -408,12 +404,18 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
i.SetExtractorWrapper(i.cfg.SampleExtractorWrapper)
}

var limiterStrategy limiterRingStrategy
var ownedStreamsStrategy ownershipStrategy
if i.cfg.KafkaIngestion.Enabled {
limiterStrategy = newPartitionRingLimiterStrategy(partitionRingWatcher, limits.IngestionPartitionsTenantShardSize)
ownedStreamsStrategy = newOwnedStreamsPartitionStrategy(i.ingestPartitionID, partitionRingWatcher, limits.IngestionPartitionsTenantShardSize, util_log.Logger)
} else {
limiterStrategy = newIngesterRingLimiterStrategy(i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)
ownedStreamsStrategy = newOwnedStreamsIngesterStrategy(i.lifecycler.ID, i.readRing, util_log.Logger)
}
// Now that the lifecycler has been created, we can create the limiter
// which depends on it.
i.limiter = NewLimiter(limits, metrics, limiterStrategy)
i.recalculateOwnedStreams = newRecalculateOwnedStreamsSvc(i.getInstances, ownedStreamsStrategy, cfg.OwnedStreamsCheckInterval, util_log.Logger)

return i, nil
Expand Down
18 changes: 9 additions & 9 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ var NilMetrics = newIngesterMetrics(nil, constants.Loki)
func TestLabelsCollisions(t *testing.T) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))

i, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
require.Nil(t, err)
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestLabelsCollisions(t *testing.T) {
func TestConcurrentPushes(t *testing.T) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))

inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
require.Nil(t, err)
Expand Down Expand Up @@ -158,7 +158,7 @@ func TestConcurrentPushes(t *testing.T) {
func TestGetStreamRates(t *testing.T) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))

inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -245,7 +245,7 @@ func labelHashNoShard(l labels.Labels) uint64 {
func TestSyncPeriod(t *testing.T) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))

const (
syncPeriod = 1 * time.Minute
Expand Down Expand Up @@ -290,7 +290,7 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
t.Helper()
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
indexShards := 2

// just some random values
Expand Down Expand Up @@ -507,7 +507,7 @@ func makeRandomLabels() labels.Labels {
func Benchmark_PushInstance(b *testing.B) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(b, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))

i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
ctx := context.Background()
Expand Down Expand Up @@ -549,7 +549,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
l.MaxLocalStreamsPerUser = 100000
limits, err := validation.NewOverrides(l, nil)
require.NoError(b, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))

ctx := context.Background()

Expand Down Expand Up @@ -1089,7 +1089,7 @@ func TestStreamShardingUsage(t *testing.T) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), limitsDefinition)
require.NoError(t, err)

limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))

defaultShardStreamsCfg := limiter.limits.ShardStreams("fake")
tenantShardStreamsCfg := limiter.limits.ShardStreams(customTenant1)
Expand Down Expand Up @@ -1454,7 +1454,7 @@ func defaultInstance(t *testing.T) *instance {
&ingesterConfig,
defaultPeriodConfigs,
"fake",
NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1),
NewLimiter(overrides, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1)),
loki_runtime.DefaultTenantConfigs(),
noopWAL{},
NilMetrics,
Expand Down
75 changes: 59 additions & 16 deletions pkg/ingester/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (
"sync"
"time"

"github.com/grafana/dskit/ring"
"golang.org/x/time/rate"

"github.com/grafana/loki/v3/pkg/distributor/shardstreams"
"github.com/grafana/loki/v3/pkg/validation"
)

const (
errMaxStreamsPerUserLimitExceeded = "tenant '%v' per-user streams limit exceeded, streams: %d exceeds calculated limit: %d (local limit: %d, global limit: %d, global/ingesters: %d)"
errMaxStreamsPerUserLimitExceeded = "tenant '%v' per-user streams limit exceeded, streams: %d exceeds calculated limit: %d (local limit: %d, global limit: %d, local share: %d)"
)

// RingCount is the interface exposed by a ring implementation which allows
Expand All @@ -37,10 +38,9 @@ type Limits interface {
// Limiter implements primitives to get the maximum number of streams
// an ingester can handle for a specific tenant
type Limiter struct {
limits Limits
ring RingCount
replicationFactor int
metrics *ingesterMetrics
limits Limits
ringStrategy limiterRingStrategy
metrics *ingesterMetrics

mtx sync.RWMutex
disabled bool
Expand All @@ -60,13 +60,16 @@ func (l *Limiter) Enable() {
l.metrics.limiterEnabled.Set(1)
}

// limiterRingStrategy abstracts how a tenant's global stream limit is turned
// into a local limit. The Limiter calls convertGlobalToLocalLimit with the
// tenant's global limit and the tenant ID and uses the returned value as the
// adjusted (local) share of the global limit.
type limiterRingStrategy interface {
convertGlobalToLocalLimit(int, string) int
}

// NewLimiter makes a new limiter
func NewLimiter(limits Limits, metrics *ingesterMetrics, ring RingCount, replicationFactor int) *Limiter {
func NewLimiter(limits Limits, metrics *ingesterMetrics, ingesterRingLimiterStrategy limiterRingStrategy) *Limiter {
return &Limiter{
limits: limits,
ring: ring,
replicationFactor: replicationFactor,
metrics: metrics,
limits: limits,
ringStrategy: ingesterRingLimiterStrategy,
metrics: metrics,
}
}

Expand All @@ -87,7 +90,7 @@ func (l *Limiter) GetStreamCountLimit(tenantID string) (calculatedLimit, localLi
// We assume that streams are evenly distributed across ingesters,
// so we convert the global limit into a local limit.
globalLimit = l.limits.MaxGlobalStreamsPerUser(tenantID)
adjustedGlobalLimit = l.convertGlobalToLocalLimit(globalLimit)
adjustedGlobalLimit = l.ringStrategy.convertGlobalToLocalLimit(globalLimit, tenantID)

// Set the calculated limit to the lesser of the local limit or the new calculated global limit
calculatedLimit = l.minNonZero(localLimit, adjustedGlobalLimit)
Expand All @@ -108,35 +111,75 @@ func (l *Limiter) minNonZero(first, second int) int {
return first
}

func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int {
// ingesterRingLimiterStrategy is a limiterRingStrategy that derives the local
// limit from the ingester ring: it uses the ring's zone and healthy-instance
// counts together with the replication factor.
type ingesterRingLimiterStrategy struct {
// ring supplies the zone count and healthy instance counts.
ring RingCount
// replicationFactor scales the per-instance share; a value of 0 makes
// convertGlobalToLocalLimit return 0.
replicationFactor int
}

// newIngesterRingLimiterStrategy builds an ingesterRingLimiterStrategy that
// reads instance counts from ring and scales limits by replicationFactor.
func newIngesterRingLimiterStrategy(ring RingCount, replicationFactor int) *ingesterRingLimiterStrategy {
	strategy := new(ingesterRingLimiterStrategy)
	strategy.ring = ring
	strategy.replicationFactor = replicationFactor
	return strategy
}

// convertGlobalToLocalLimit converts a tenant-wide stream limit into a local
// limit using the ingester ring. It returns 0 when either globalLimit or the
// replication factor is 0. Otherwise it delegates to the single-zone
// calculation when the ring reports at most one zone, and to the multi-zone
// calculation when it reports more. The tenant ID argument is ignored by this
// strategy.
func (l *ingesterRingLimiterStrategy) convertGlobalToLocalLimit(globalLimit int, _ string) int {
if globalLimit == 0 || l.replicationFactor == 0 {
return 0
}

zonesCount := l.ring.ZonesCount()
if zonesCount <= 1 {
return calculateLimitForSingleZone(globalLimit, l)
return l.calculateLimitForSingleZone(globalLimit)
}

return calculateLimitForMultipleZones(globalLimit, zonesCount, l)
return l.calculateLimitForMultipleZones(globalLimit, zonesCount)
}

func calculateLimitForSingleZone(globalLimit int, l *Limiter) int {
// calculateLimitForSingleZone divides globalLimit by the number of healthy
// instances in the ring and multiplies the result by the replication factor.
// It returns 0 when the ring reports no healthy instances.
func (l *ingesterRingLimiterStrategy) calculateLimitForSingleZone(globalLimit int) int {
	healthy := l.ring.HealthyInstancesCount()
	if healthy <= 0 {
		return 0
	}

	// Work in floating point and truncate only at the end.
	perInstance := float64(globalLimit) / float64(healthy)
	return int(perInstance * float64(l.replicationFactor))
}

func calculateLimitForMultipleZones(globalLimit, zonesCount int, l *Limiter) int {
// calculateLimitForMultipleZones scales globalLimit by the replication factor,
// then divides by zonesCount and by the number of healthy instances the ring
// reports for the zone. It returns 0 when that instance count is not positive.
func (l *ingesterRingLimiterStrategy) calculateLimitForMultipleZones(globalLimit, zonesCount int) int {
	inZone := l.ring.HealthyInstancesInZoneCount()
	if inZone <= 0 {
		return 0
	}

	// Same operation order as before: (limit * RF) / zones / instances.
	replicated := float64(globalLimit) * float64(l.replicationFactor)
	return int(replicated / float64(zonesCount) / float64(inZone))
}

// partitionRingLimiterStrategy is a limiterRingStrategy that derives the local
// limit from the partition ring: the global limit is divided by the shard size
// the partition ring reports for the tenant.
type partitionRingLimiterStrategy struct {
// ring gives access to the current partition ring.
ring ring.PartitionRingReader
// getPartitionShardSize returns the configured partition shard size for a tenant.
getPartitionShardSize func(user string) int
}

// newPartitionRingLimiterStrategy builds a partitionRingLimiterStrategy that
// reads the partition ring from ring and looks up each tenant's shard size via
// getPartitionShardSize.
func newPartitionRingLimiterStrategy(ring ring.PartitionRingReader, getPartitionShardSize func(user string) int) *partitionRingLimiterStrategy {
	strategy := new(partitionRingLimiterStrategy)
	strategy.ring = ring
	strategy.getPartitionShardSize = getPartitionShardSize
	return strategy
}

// convertGlobalToLocalLimit converts a tenant-wide stream limit into a local
// limit by dividing it by the tenant's partition shard size as reported by the
// partition ring. It returns 0 when globalLimit is 0 or when the ring reports
// a shard size of 0 for the tenant.
func (l *partitionRingLimiterStrategy) convertGlobalToLocalLimit(globalLimit int, tenantID string) int {
	if globalLimit == 0 {
		return 0
	}

	// ShuffleShardSize is relied upon to handle tenants with no shard config,
	// or with more shards configured than there are active partitions.
	partitions := l.ring.PartitionRing().ShuffleShardSize(l.getPartitionShardSize(tenantID))
	if partitions == 0 {
		return 0
	}

	share := float64(globalLimit) / float64(partitions)
	return int(share)
}

// supplier is a generic function type that takes no arguments and returns a
// value of type T.
type supplier[T any] func() T

type streamCountLimiter struct {
Expand Down
Loading

0 comments on commit 5cbb239

Please sign in to comment.