feat(distributors): Use a pool of workers to push to ingesters.
cyriltovena committed Sep 24, 2024
1 parent b6e9945 commit 0738dd0
Showing 2 changed files with 50 additions and 12 deletions.
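In short: instead of spawning a goroutine per ingester on every Push call, the distributor now starts a fixed pool of workers (distributor.push-worker-count, default 256) when the service starts and feeds them through a channel; shutdown closes the channel and waits for in-flight pushes to drain. The sketch below is a minimal, self-contained illustration of that pattern only, not the Loki code itself; the pool and task names and the printed output are made up for the example.

package main

import (
	"fmt"
	"sync"
)

// task stands in for the per-ingester batch the distributor hands off;
// the type and its fields are illustrative only.
type task struct {
	id int
}

type pool struct {
	tasks chan task
	wg    sync.WaitGroup
}

// start launches a fixed number of workers, much like the commit's
// starting() hook launching cfg.PushWorkerCount pushIngesterWorker goroutines.
func (p *pool) start(workers int) {
	p.tasks = make(chan task)
	p.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer p.wg.Done()
			// Each worker drains the channel until it is closed.
			for t := range p.tasks {
				fmt.Println("processing task", t.id)
			}
		}()
	}
}

// stop mirrors the defer added to running(): close the channel so workers
// exit their range loop, then wait for them to finish in-flight work.
func (p *pool) stop() {
	close(p.tasks)
	p.wg.Wait()
}

func main() {
	p := &pool{}
	p.start(4)
	for i := 0; i < 10; i++ {
		p.tasks <- task{id: i} // senders block until a worker is free
	}
	p.stop()
}

Because the channel is unbuffered, producers block whenever all workers are busy, which is what bounds the fan-out toward the ingesters.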
4 changes: 4 additions & 0 deletions docs/sources/shared/configuration.md
@@ -2248,6 +2248,10 @@ ring:
# CLI flag: -distributor.ring.instance-interface-names
[instance_interface_names: <list of strings> | default = [<private network interfaces>]]
+ # Number of workers to push batches to ingesters.
+ # CLI flag: -distributor.push-worker-count
+ [push_worker_count: <int> | default = 256]
rate_store:
# The max number of concurrent requests to make to ingester stream apis
# CLI flag: -distributor.rate-store.max-request-parallelism
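For anyone setting the new option in YAML rather than via the CLI flag: assuming the usual Loki layout where these keys sit under the top-level distributor block, an override might look like the snippet below. The value 64 is purely illustrative; the documented default is 256.

distributor:
  push_worker_count: 64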
58 changes: 46 additions & 12 deletions pkg/distributor/distributor.go
@@ -10,6 +10,7 @@ import (
"sort"
"strconv"
"strings"
+ "sync"
"time"
"unicode"
"unsafe"
@@ -79,6 +80,7 @@ var allowedLabelsForLevel = map[string]struct{}{
type Config struct {
// Distributors ring
DistributorRing RingConfig `yaml:"ring,omitempty"`
+ PushWorkerCount int `yaml:"push_worker_count"`

// For testing.
factory ring_client.PoolFactory `yaml:"-"`
@@ -102,7 +104,7 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
cfg.DistributorRing.RegisterFlags(fs)
cfg.RateStore.RegisterFlagsWithPrefix("distributor.rate-store", fs)
cfg.WriteFailuresLogging.RegisterFlagsWithPrefix("distributor.write-failures-logging", fs)

+ fs.IntVar(&cfg.PushWorkerCount, "distributor.push-worker-count", 256, "Number of workers to push batches to ingesters.")
fs.BoolVar(&cfg.KafkaEnabled, "distributor.kafka-writes-enabled", false, "Enable writes to Kafka during Push requests.")
fs.BoolVar(&cfg.IngesterEnabled, "distributor.ingester-writes-enabled", true, "Enable writes to Ingesters during Push requests. Defaults to true.")
}
@@ -166,7 +168,9 @@ type Distributor struct {
replicationFactor prometheus.Gauge
streamShardCount prometheus.Counter

- usageTracker push.UsageTracker
+ usageTracker push.UsageTracker
+ ingesterTasks chan pushIngesterTask
+ ingesterTaskWg sync.WaitGroup

// kafka
kafkaWriter KafkaProducer
@@ -253,6 +257,7 @@ func New(
rateLimitStrat: rateLimitStrat,
tee: tee,
usageTracker: usageTracker,
+ ingesterTasks: make(chan pushIngesterTask),
ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "distributor_ingester_appends_total",
@@ -350,10 +355,18 @@ }
}

func (d *Distributor) starting(ctx context.Context) error {
+ d.ingesterTaskWg.Add(d.cfg.PushWorkerCount)
+ for i := 0; i < d.cfg.PushWorkerCount; i++ {
+ go d.pushIngesterWorker()
+ }
return services.StartManagerAndAwaitHealthy(ctx, d.subservices)
}

func (d *Distributor) running(ctx context.Context) error {
+ defer func() {
+ close(d.ingesterTasks)
+ d.ingesterTaskWg.Wait()
+ }()
select {
case <-ctx.Done():
return nil
@@ -630,15 +643,20 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}

for ingester, streams := range streamsByIngester {
- go func(ingester ring.InstanceDesc, samples []*streamTracker) {
+ func(ingester ring.InstanceDesc, samples []*streamTracker) {
// Use a background context to make sure all ingesters get samples even if we return early
localCtx, cancel := context.WithTimeout(context.Background(), d.clientCfg.RemoteTimeout)
- defer cancel()
localCtx = user.InjectOrgID(localCtx, tenantID)
if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}
- d.sendStreams(localCtx, ingester, samples, &tracker)
+ d.ingesterTasks <- pushIngesterTask{
+ ingester: ingester,
+ streamTracker: samples,
+ pushTracker: &tracker,
+ ctx: localCtx,
+ cancel: cancel,
+ }
}(ingesterDescs[ingester], streams)
}
}
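Note that the enqueuing side above no longer defers cancel(): the CancelFunc travels inside the task, and sendStreams releases it once the push finishes, so the timeout context stays alive while the task waits in the channel. Below is a stripped-down sketch of that ownership hand-off; the job type and the timings are invented for the example.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// job is an illustrative stand-in for pushIngesterTask: the producer creates
// the context, the consumer is responsible for cancelling it.
type job struct {
	ctx    context.Context
	cancel context.CancelFunc
}

func main() {
	jobs := make(chan job)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for j := range jobs {
			func() {
				defer j.cancel() // released only after the work is done
				select {
				case <-time.After(10 * time.Millisecond): // pretend remote push
					fmt.Println("pushed")
				case <-j.ctx.Done():
					fmt.Println("gave up:", j.ctx.Err())
				}
			}()
		}
	}()

	// No defer cancel() here: cancelling now would kill the context before
	// the worker ever picks the job up.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	jobs <- job{ctx: ctx, cancel: cancel}

	close(jobs)
	wg.Wait()
}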
@@ -830,9 +848,25 @@ func (d *Distributor) truncateLines(vContext validationContext, stream *logproto
validation.MutatedBytes.WithLabelValues(validation.LineTooLong, vContext.userID).Add(float64(truncatedBytes))
}

+ type pushIngesterTask struct {
+ streamTracker []*streamTracker
+ pushTracker *pushTracker
+ ingester ring.InstanceDesc
+ ctx context.Context
+ cancel context.CancelFunc
+ }
+
+ func (d *Distributor) pushIngesterWorker() {
+ defer d.ingesterTaskWg.Done()
+ for task := range d.ingesterTasks {
+ d.sendStreams(task)
+ }
+ }

// TODO taken from Cortex, see if we can refactor out an usable interface.
- func (d *Distributor) sendStreams(ctx context.Context, ingester ring.InstanceDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) {
- err := d.sendStreamsErr(ctx, ingester, streamTrackers)
+ func (d *Distributor) sendStreams(task pushIngesterTask) {
+ defer task.cancel()
+ err := d.sendStreamsErr(task.ctx, task.ingester, task.streamTracker)

// If we succeed, decrement each stream's pending count by one.
// If we reach the required number of successful puts on this stream, then
@@ -843,17 +877,17 @@ func (d *Distributor) sendStreams(ctx context.Context, ingester ring.InstanceDes
//
// The use of atomic increments here guarantees only a single sendStreams
// goroutine will write to either channel.
- for i := range streamTrackers {
+ for i := range task.streamTracker {
if err != nil {
- if streamTrackers[i].failed.Inc() <= int32(streamTrackers[i].maxFailures) {
+ if task.streamTracker[i].failed.Inc() <= int32(task.streamTracker[i].maxFailures) {
continue
}
- pushTracker.doneWithResult(err)
+ task.pushTracker.doneWithResult(err)
} else {
- if streamTrackers[i].succeeded.Inc() != int32(streamTrackers[i].minSuccess) {
+ if task.streamTracker[i].succeeded.Inc() != int32(task.streamTracker[i].minSuccess) {
continue
}
- pushTracker.doneWithResult(nil)
+ task.pushTracker.doneWithResult(nil)
}
}
}
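The counter logic above is the subtle bit: with replication, several workers handle the same stream, and the atomic increment results decide which single worker gets to report the stream's final outcome. Here is a toy version of that "exactly one reporter" trick, using the standard library's sync/atomic rather than the Inc() wrappers seen in the diff; the replica counts and the simulated failure are invented for the example.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	const (
		replicas    = 3 // ingesters each stream is sent to
		minSuccess  = 2 // quorum needed before the stream counts as written
		maxFailures = 1 // failures tolerated before the stream counts as failed
	)

	var (
		succeeded atomic.Int32
		failed    atomic.Int32
		result    = make(chan error, 1) // only one goroutine ever writes here
		wg        sync.WaitGroup
	)

	for i := 0; i < replicas; i++ {
		wg.Add(1)
		go func(replica int) {
			defer wg.Done()
			var err error
			if replica == 0 { // simulate one replica failing
				err = fmt.Errorf("replica %d unavailable", replica)
			}
			if err != nil {
				// Only the goroutine that pushes the failure counter past
				// the threshold reports the error.
				if failed.Add(1) <= maxFailures {
					return
				}
				result <- err
				return
			}
			// Only the goroutine that lands exactly on the quorum reports success.
			if succeeded.Add(1) != minSuccess {
				return
			}
			result <- nil
		}(i)
	}

	wg.Wait()
	fmt.Println("stream outcome:", <-result)
}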
