From 0738dd0dccd702df5bda992f6513eab830cf8a27 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 24 Sep 2024 14:54:29 +0200 Subject: [PATCH 1/3] feat(distributors): Use a pool of worker to push to ingesters. --- docs/sources/shared/configuration.md | 4 ++ pkg/distributor/distributor.go | 58 ++++++++++++++++++++++------ 2 files changed, 50 insertions(+), 12 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 08acf80bfcc22..15426e54d088f 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -2248,6 +2248,10 @@ ring: # CLI flag: -distributor.ring.instance-interface-names [instance_interface_names: | default = []] +# Number of workers to push batches to ingesters. +# CLI flag: -distributor.push-worker-count +[push_worker_count: | default = 256] + rate_store: # The max number of concurrent requests to make to ingester stream apis # CLI flag: -distributor.rate-store.max-request-parallelism diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 08fba483ec9bc..8712664dfa902 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -10,6 +10,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "unicode" "unsafe" @@ -79,6 +80,7 @@ var allowedLabelsForLevel = map[string]struct{}{ type Config struct { // Distributors ring DistributorRing RingConfig `yaml:"ring,omitempty"` + PushWorkerCount int `yaml:"push_worker_count"` // For testing. factory ring_client.PoolFactory `yaml:"-"` @@ -102,7 +104,7 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) { cfg.DistributorRing.RegisterFlags(fs) cfg.RateStore.RegisterFlagsWithPrefix("distributor.rate-store", fs) cfg.WriteFailuresLogging.RegisterFlagsWithPrefix("distributor.write-failures-logging", fs) - + fs.IntVar(&cfg.PushWorkerCount, "distributor.push-worker-count", 256, "Number of workers to push batches to ingesters.") fs.BoolVar(&cfg.KafkaEnabled, "distributor.kafka-writes-enabled", false, "Enable writes to Kafka during Push requests.") fs.BoolVar(&cfg.IngesterEnabled, "distributor.ingester-writes-enabled", true, "Enable writes to Ingesters during Push requests. Defaults to true.") } @@ -166,7 +168,9 @@ type Distributor struct { replicationFactor prometheus.Gauge streamShardCount prometheus.Counter - usageTracker push.UsageTracker + usageTracker push.UsageTracker + ingesterTasks chan pushIngesterTask + ingesterTaskWg sync.WaitGroup // kafka kafkaWriter KafkaProducer @@ -253,6 +257,7 @@ func New( rateLimitStrat: rateLimitStrat, tee: tee, usageTracker: usageTracker, + ingesterTasks: make(chan pushIngesterTask), ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: constants.Loki, Name: "distributor_ingester_appends_total", @@ -350,10 +355,18 @@ func New( } func (d *Distributor) starting(ctx context.Context) error { + d.ingesterTaskWg.Add(d.cfg.PushWorkerCount) + for i := 0; i < d.cfg.PushWorkerCount; i++ { + go d.pushIngesterWorker() + } return services.StartManagerAndAwaitHealthy(ctx, d.subservices) } func (d *Distributor) running(ctx context.Context) error { + defer func() { + close(d.ingesterTasks) + d.ingesterTaskWg.Wait() + }() select { case <-ctx.Done(): return nil @@ -630,15 +643,20 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log } for ingester, streams := range streamsByIngester { - go func(ingester ring.InstanceDesc, samples []*streamTracker) { + func(ingester ring.InstanceDesc, samples []*streamTracker) { // Use a background context to make sure all ingesters get samples even if we return early localCtx, cancel := context.WithTimeout(context.Background(), d.clientCfg.RemoteTimeout) - defer cancel() localCtx = user.InjectOrgID(localCtx, tenantID) if sp := opentracing.SpanFromContext(ctx); sp != nil { localCtx = opentracing.ContextWithSpan(localCtx, sp) } - d.sendStreams(localCtx, ingester, samples, &tracker) + d.ingesterTasks <- pushIngesterTask{ + ingester: ingester, + streamTracker: samples, + pushTracker: &tracker, + ctx: localCtx, + cancel: cancel, + } }(ingesterDescs[ingester], streams) } } @@ -830,9 +848,25 @@ func (d *Distributor) truncateLines(vContext validationContext, stream *logproto validation.MutatedBytes.WithLabelValues(validation.LineTooLong, vContext.userID).Add(float64(truncatedBytes)) } +type pushIngesterTask struct { + streamTracker []*streamTracker + pushTracker *pushTracker + ingester ring.InstanceDesc + ctx context.Context + cancel context.CancelFunc +} + +func (d *Distributor) pushIngesterWorker() { + defer d.ingesterTaskWg.Done() + for task := range d.ingesterTasks { + d.sendStreams(task) + } +} + // TODO taken from Cortex, see if we can refactor out an usable interface. -func (d *Distributor) sendStreams(ctx context.Context, ingester ring.InstanceDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) { - err := d.sendStreamsErr(ctx, ingester, streamTrackers) +func (d *Distributor) sendStreams(task pushIngesterTask) { + defer task.cancel() + err := d.sendStreamsErr(task.ctx, task.ingester, task.streamTracker) // If we succeed, decrement each stream's pending count by one. // If we reach the required number of successful puts on this stream, then @@ -843,17 +877,17 @@ func (d *Distributor) sendStreams(ctx context.Context, ingester ring.InstanceDes // // The use of atomic increments here guarantees only a single sendStreams // goroutine will write to either channel. - for i := range streamTrackers { + for i := range task.streamTracker { if err != nil { - if streamTrackers[i].failed.Inc() <= int32(streamTrackers[i].maxFailures) { + if task.streamTracker[i].failed.Inc() <= int32(task.streamTracker[i].maxFailures) { continue } - pushTracker.doneWithResult(err) + task.pushTracker.doneWithResult(err) } else { - if streamTrackers[i].succeeded.Inc() != int32(streamTrackers[i].minSuccess) { + if task.streamTracker[i].succeeded.Inc() != int32(task.streamTracker[i].minSuccess) { continue } - pushTracker.doneWithResult(nil) + task.pushTracker.doneWithResult(nil) } } } From 130f8fc29f2af761bcdaf8763c35eacf1c1e9730 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 24 Sep 2024 15:34:17 +0200 Subject: [PATCH 2/3] improve shutdown --- pkg/distributor/distributor.go | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 8712664dfa902..a175008251397 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -355,18 +355,19 @@ func New( } func (d *Distributor) starting(ctx context.Context) error { - d.ingesterTaskWg.Add(d.cfg.PushWorkerCount) - for i := 0; i < d.cfg.PushWorkerCount; i++ { - go d.pushIngesterWorker() - } return services.StartManagerAndAwaitHealthy(ctx, d.subservices) } func (d *Distributor) running(ctx context.Context) error { + ctx, cancel := context.WithCancel(ctx) defer func() { - close(d.ingesterTasks) + cancel() d.ingesterTaskWg.Wait() }() + d.ingesterTaskWg.Add(d.cfg.PushWorkerCount) + for i := 0; i < d.cfg.PushWorkerCount; i++ { + go d.pushIngesterWorker(ctx) + } select { case <-ctx.Done(): return nil @@ -856,10 +857,15 @@ type pushIngesterTask struct { cancel context.CancelFunc } -func (d *Distributor) pushIngesterWorker() { +func (d *Distributor) pushIngesterWorker(ctx context.Context) { defer d.ingesterTaskWg.Done() - for task := range d.ingesterTasks { - d.sendStreams(task) + for { + select { + case <-ctx.Done(): + return + case task := <-d.ingesterTasks: + d.sendStreams(task) + } } } From 11f8c40d0c82021ee0e26d7f3f95a458a1759513 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 24 Sep 2024 17:14:47 +0200 Subject: [PATCH 3/3] Don't lock on push to channekl --- pkg/distributor/distributor.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 78895a33d98c9..476bad507ea0b 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -651,12 +651,18 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log if sp := opentracing.SpanFromContext(ctx); sp != nil { localCtx = opentracing.ContextWithSpan(localCtx, sp) } - d.ingesterTasks <- pushIngesterTask{ + select { + case <-ctx.Done(): + cancel() + return + case d.ingesterTasks <- pushIngesterTask{ ingester: ingester, streamTracker: samples, pushTracker: &tracker, ctx: localCtx, cancel: cancel, + }: + return } }(ingesterDescs[ingester], streams) }