diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 08acf80bfcc22..15426e54d088f 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -2248,6 +2248,10 @@ ring: # CLI flag: -distributor.ring.instance-interface-names [instance_interface_names: | default = []] +# Number of workers to push batches to ingesters. +# CLI flag: -distributor.push-worker-count +[push_worker_count: | default = 256] + rate_store: # The max number of concurrent requests to make to ingester stream apis # CLI flag: -distributor.rate-store.max-request-parallelism diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 08fba483ec9bc..8712664dfa902 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -10,6 +10,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "unicode" "unsafe" @@ -79,6 +80,7 @@ var allowedLabelsForLevel = map[string]struct{}{ type Config struct { // Distributors ring DistributorRing RingConfig `yaml:"ring,omitempty"` + PushWorkerCount int `yaml:"push_worker_count"` // For testing. factory ring_client.PoolFactory `yaml:"-"` @@ -102,7 +104,7 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) { cfg.DistributorRing.RegisterFlags(fs) cfg.RateStore.RegisterFlagsWithPrefix("distributor.rate-store", fs) cfg.WriteFailuresLogging.RegisterFlagsWithPrefix("distributor.write-failures-logging", fs) - + fs.IntVar(&cfg.PushWorkerCount, "distributor.push-worker-count", 256, "Number of workers to push batches to ingesters.") fs.BoolVar(&cfg.KafkaEnabled, "distributor.kafka-writes-enabled", false, "Enable writes to Kafka during Push requests.") fs.BoolVar(&cfg.IngesterEnabled, "distributor.ingester-writes-enabled", true, "Enable writes to Ingesters during Push requests. Defaults to true.") } @@ -166,7 +168,9 @@ type Distributor struct { replicationFactor prometheus.Gauge streamShardCount prometheus.Counter - usageTracker push.UsageTracker + usageTracker push.UsageTracker + ingesterTasks chan pushIngesterTask + ingesterTaskWg sync.WaitGroup // kafka kafkaWriter KafkaProducer @@ -253,6 +257,7 @@ func New( rateLimitStrat: rateLimitStrat, tee: tee, usageTracker: usageTracker, + ingesterTasks: make(chan pushIngesterTask), ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: constants.Loki, Name: "distributor_ingester_appends_total", @@ -350,10 +355,18 @@ func New( } func (d *Distributor) starting(ctx context.Context) error { + d.ingesterTaskWg.Add(d.cfg.PushWorkerCount) + for i := 0; i < d.cfg.PushWorkerCount; i++ { + go d.pushIngesterWorker() + } return services.StartManagerAndAwaitHealthy(ctx, d.subservices) } func (d *Distributor) running(ctx context.Context) error { + defer func() { + close(d.ingesterTasks) + d.ingesterTaskWg.Wait() + }() select { case <-ctx.Done(): return nil @@ -630,15 +643,20 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log } for ingester, streams := range streamsByIngester { - go func(ingester ring.InstanceDesc, samples []*streamTracker) { + func(ingester ring.InstanceDesc, samples []*streamTracker) { // Use a background context to make sure all ingesters get samples even if we return early localCtx, cancel := context.WithTimeout(context.Background(), d.clientCfg.RemoteTimeout) - defer cancel() localCtx = user.InjectOrgID(localCtx, tenantID) if sp := opentracing.SpanFromContext(ctx); sp != nil { localCtx = opentracing.ContextWithSpan(localCtx, sp) } - d.sendStreams(localCtx, ingester, samples, &tracker) + d.ingesterTasks <- pushIngesterTask{ + ingester: ingester, + streamTracker: samples, + pushTracker: &tracker, + ctx: localCtx, + cancel: cancel, + } }(ingesterDescs[ingester], streams) } } @@ -830,9 +848,25 @@ func (d *Distributor) truncateLines(vContext validationContext, stream *logproto validation.MutatedBytes.WithLabelValues(validation.LineTooLong, vContext.userID).Add(float64(truncatedBytes)) } +type pushIngesterTask struct { + streamTracker []*streamTracker + pushTracker *pushTracker + ingester ring.InstanceDesc + ctx context.Context + cancel context.CancelFunc +} + +func (d *Distributor) pushIngesterWorker() { + defer d.ingesterTaskWg.Done() + for task := range d.ingesterTasks { + d.sendStreams(task) + } +} + // TODO taken from Cortex, see if we can refactor out an usable interface. -func (d *Distributor) sendStreams(ctx context.Context, ingester ring.InstanceDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) { - err := d.sendStreamsErr(ctx, ingester, streamTrackers) +func (d *Distributor) sendStreams(task pushIngesterTask) { + defer task.cancel() + err := d.sendStreamsErr(task.ctx, task.ingester, task.streamTracker) // If we succeed, decrement each stream's pending count by one. // If we reach the required number of successful puts on this stream, then @@ -843,17 +877,17 @@ func (d *Distributor) sendStreams(ctx context.Context, ingester ring.InstanceDes // // The use of atomic increments here guarantees only a single sendStreams // goroutine will write to either channel. - for i := range streamTrackers { + for i := range task.streamTracker { if err != nil { - if streamTrackers[i].failed.Inc() <= int32(streamTrackers[i].maxFailures) { + if task.streamTracker[i].failed.Inc() <= int32(task.streamTracker[i].maxFailures) { continue } - pushTracker.doneWithResult(err) + task.pushTracker.doneWithResult(err) } else { - if streamTrackers[i].succeeded.Inc() != int32(streamTrackers[i].minSuccess) { + if task.streamTracker[i].succeeded.Inc() != int32(task.streamTracker[i].minSuccess) { continue } - pushTracker.doneWithResult(nil) + task.pushTracker.doneWithResult(nil) } } }