Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Optionally ack writes to kafka on Push requests #14186

Merged
merged 8 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -2251,6 +2251,14 @@ otlp_config:
# List of default otlp resource attributes to be picked as index labels
# CLI flag: -distributor.otlp.default_resource_attributes_as_index_labels
[default_resource_attributes_as_index_labels: <list of strings> | default = [service.name service.namespace service.instance.id deployment.environment cloud.region cloud.availability_zone k8s.cluster.name k8s.namespace.name k8s.pod.name k8s.container.name container.name k8s.replicaset.name k8s.deployment.name k8s.statefulset.name k8s.daemonset.name k8s.cronjob.name k8s.job.name]]

# Enable writes to Kafka during Push requests.
# CLI flag: -distributor.kafka-writes-enabled
[kafka_writes_enabled: <boolean> | default = false]

# Enable writes to Ingesters during Push requests. Defaults to true.
# CLI flag: -distributor.ingester-writes-enabled
[ingester_writes_enabled: <boolean> | default = true]
```

### etcd
Expand Down
262 changes: 213 additions & 49 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/go-kit/log/level"
"github.com/gogo/status"
"github.com/prometheus/prometheus/model/labels"
"github.com/twmb/franz-go/pkg/kgo"
"go.opentelemetry.io/collector/pdata/plog"
"google.golang.org/grpc/codes"

Expand All @@ -44,6 +45,7 @@ import (
"github.com/grafana/loki/v3/pkg/distributor/writefailures"
"github.com/grafana/loki/v3/pkg/ingester"
"github.com/grafana/loki/v3/pkg/ingester/client"
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log/logfmt"
Expand Down Expand Up @@ -88,6 +90,10 @@ type Config struct {
WriteFailuresLogging writefailures.Cfg `yaml:"write_failures_logging" doc:"description=Customize the logging of write failures."`

OTLPConfig push.GlobalOTLPConfig `yaml:"otlp_config"`

KafkaEnabled bool `yaml:"kafka_writes_enabled"`
IngesterEnabled bool `yaml:"ingester_writes_enabled"`
KafkaConfig kafka.Config `yaml:"-"`
}

// RegisterFlags registers distributor-related flags.
Expand All @@ -96,13 +102,28 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
cfg.DistributorRing.RegisterFlags(fs)
cfg.RateStore.RegisterFlagsWithPrefix("distributor.rate-store", fs)
cfg.WriteFailuresLogging.RegisterFlagsWithPrefix("distributor.write-failures-logging", fs)

fs.BoolVar(&cfg.KafkaEnabled, "distributor.kafka-writes-enabled", false, "Enable writes to Kafka during Push requests.")
fs.BoolVar(&cfg.IngesterEnabled, "distributor.ingester-writes-enabled", true, "Enable writes to Ingesters during Push requests. Defaults to true.")
}

func (cfg *Config) Validate() error {
if !cfg.KafkaEnabled && !cfg.IngesterEnabled {
return fmt.Errorf("at least one of kafka and ingestor writes must be enabled")
}
return nil
}

// RateStore manages the ingestion rate of streams, populated by data fetched from ingesters.
type RateStore interface {
RateFor(tenantID string, streamHash uint64) (int64, float64)
}

type KafkaProducer interface {
ProduceSync(ctx context.Context, records []*kgo.Record) kgo.ProduceResults
Close()
}

// Distributor coordinates replicates and distribution of log streams.
type Distributor struct {
services.Service
Expand Down Expand Up @@ -146,6 +167,16 @@ type Distributor struct {
streamShardCount prometheus.Counter

usageTracker push.UsageTracker

// kafka
kafkaWriter KafkaProducer
partitionRing ring.PartitionRingReader

// kafka metrics
kafkaAppends *prometheus.CounterVec
kafkaWriteBytesTotal prometheus.Counter
kafkaWriteLatency prometheus.Histogram
kafkaRecordsPerRequest prometheus.Histogram
}

// New a distributor creates.
Expand All @@ -154,6 +185,7 @@ func New(
clientCfg client.Config,
configs *runtime.TenantConfigs,
ingestersRing ring.ReadRing,
partitionRing ring.PartitionRingReader,
overrides Limits,
registerer prometheus.Registerer,
metricsNamespace string,
Expand Down Expand Up @@ -192,6 +224,20 @@ func New(
return nil, err
}

if partitionRing == nil && cfg.KafkaEnabled {
return nil, fmt.Errorf("partition ring is required for kafka writes")
}

var kafkaWriter KafkaProducer
if cfg.KafkaEnabled {
kafkaClient, err := kafka.NewWriterClient(cfg.KafkaConfig, 20, logger, registerer)
if err != nil {
return nil, fmt.Errorf("failed to start kafka client: %w", err)
}
kafkaWriter = kafka.NewProducer(kafkaClient, cfg.KafkaConfig.ProducerMaxBufferedBytes,
prometheus.WrapRegistererWithPrefix("_kafka_", registerer))
}

d := &Distributor{
cfg: cfg,
logger: logger,
Expand Down Expand Up @@ -227,7 +273,30 @@ func New(
Name: "stream_sharding_count",
Help: "Total number of times the distributor has sharded streams",
}),
kafkaAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "kafka_appends_total",
Help: "The total number of appends sent to kafka ingest path.",
}, []string{"partition", "status"}),
kafkaWriteLatency: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "kafka_latency_seconds",
Help: "Latency to write an incoming request to the ingest storage.",
NativeHistogramBucketFactor: 1.1,
NativeHistogramMinResetDuration: 1 * time.Hour,
NativeHistogramMaxBucketNumber: 100,
Buckets: prometheus.DefBuckets,
}),
kafkaWriteBytesTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Name: "kafka_sent_bytes_total",
Help: "Total number of bytes sent to the ingest storage.",
}),
kafkaRecordsPerRequest: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "kafka_records_per_write_request",
Help: "The number of records a single per-partition write request has been split into.",
Buckets: prometheus.ExponentialBuckets(1, 2, 8),
}),
writeFailuresManager: writefailures.NewManager(logger, registerer, cfg.WriteFailuresLogging, configs, "distributor"),
kafkaWriter: kafkaWriter,
partitionRing: partitionRing,
}

if overrides.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
Expand Down Expand Up @@ -294,6 +363,9 @@ func (d *Distributor) running(ctx context.Context) error {
}

func (d *Distributor) stopping(_ error) error {
if d.kafkaWriter != nil {
d.kafkaWriter.Close()
}
return services.StopManagerAndAwaitStopped(context.Background(), d.subservices)
}

Expand All @@ -319,6 +391,21 @@ type pushTracker struct {
err chan error
}

// doneWithResult records the result of a stream push.
// If err is nil, the stream push is considered successful.
// If err is not nil, the stream push is considered failed.
func (p *pushTracker) doneWithResult(err error) {
if err == nil {
if p.streamsPending.Dec() == 0 {
p.done <- struct{}{}
}
} else {
if p.streamsFailed.Inc() == 1 {
p.err <- err
}
}
}

// Push a set of streams.
// The returned error is the last one seen.
func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
Expand Down Expand Up @@ -488,57 +575,74 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
var descs [maxExpectedReplicationSet]ring.InstanceDesc

streamTrackers := make([]streamTracker, len(streams))
streamsByIngester := map[string][]*streamTracker{}
ingesterDescs := map[string]ring.InstanceDesc{}
tracker := pushTracker{
done: make(chan struct{}, 1), // buffer avoids blocking if caller terminates - sendSamples() only sends once on each
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
err: make(chan error, 1),
}
streamsToWrite := 0
if d.cfg.IngesterEnabled {
streamsToWrite += len(streams)
}
if d.cfg.KafkaEnabled {
streamsToWrite += len(streams)
}
// We must correctly set streamsPending before beginning any writes to ensure we don't have a race between finishing all of one path before starting the other.
tracker.streamsPending.Store(int32(streamsToWrite))

if err := func() error {
sp := opentracing.SpanFromContext(ctx)
if sp != nil {
sp.LogKV("event", "started to query ingesters ring")
defer func() {
sp.LogKV("event", "finished to query ingesters ring")
}()
}
if d.cfg.KafkaEnabled {
// We don't need to create a new context like the ingester writes, because we don't return unless all writes have succeeded.
d.sendStreamsToKafka(ctx, streams, tenantID, &tracker)
}

for i, stream := range streams {
replicationSet, err := d.ingestersRing.Get(stream.HashKey, ring.WriteNoExtend, descs[:0], nil, nil)
if err != nil {
return err
if d.cfg.IngesterEnabled {
streamTrackers := make([]streamTracker, len(streams))
streamsByIngester := map[string][]*streamTracker{}
ingesterDescs := map[string]ring.InstanceDesc{}

if err := func() error {
sp := opentracing.SpanFromContext(ctx)
if sp != nil {
sp.LogKV("event", "started to query ingesters ring")
defer func() {
sp.LogKV("event", "finished to query ingesters ring")
}()
}

streamTrackers[i] = streamTracker{
KeyedStream: stream,
minSuccess: len(replicationSet.Instances) - replicationSet.MaxErrors,
maxFailures: replicationSet.MaxErrors,
}
for _, ingester := range replicationSet.Instances {
streamsByIngester[ingester.Addr] = append(streamsByIngester[ingester.Addr], &streamTrackers[i])
ingesterDescs[ingester.Addr] = ingester
for i, stream := range streams {
replicationSet, err := d.ingestersRing.Get(stream.HashKey, ring.WriteNoExtend, descs[:0], nil, nil)
if err != nil {
return err
}

streamTrackers[i] = streamTracker{
KeyedStream: stream,
minSuccess: len(replicationSet.Instances) - replicationSet.MaxErrors,
maxFailures: replicationSet.MaxErrors,
}
for _, ingester := range replicationSet.Instances {
streamsByIngester[ingester.Addr] = append(streamsByIngester[ingester.Addr], &streamTrackers[i])
ingesterDescs[ingester.Addr] = ingester
}
}
return nil
}(); err != nil {
return nil, err
}
return nil
}(); err != nil {
return nil, err
}

tracker := pushTracker{
done: make(chan struct{}, 1), // buffer avoids blocking if caller terminates - sendSamples() only sends once on each
err: make(chan error, 1),
}
tracker.streamsPending.Store(int32(len(streams)))
for ingester, streams := range streamsByIngester {
go func(ingester ring.InstanceDesc, samples []*streamTracker) {
// Use a background context to make sure all ingesters get samples even if we return early
localCtx, cancel := context.WithTimeout(context.Background(), d.clientCfg.RemoteTimeout)
defer cancel()
localCtx = user.InjectOrgID(localCtx, tenantID)
if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}
d.sendStreams(localCtx, ingester, samples, &tracker)
}(ingesterDescs[ingester], streams)
for ingester, streams := range streamsByIngester {
go func(ingester ring.InstanceDesc, samples []*streamTracker) {
// Use a background context to make sure all ingesters get samples even if we return early
localCtx, cancel := context.WithTimeout(context.Background(), d.clientCfg.RemoteTimeout)
defer cancel()
localCtx = user.InjectOrgID(localCtx, tenantID)
if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}
d.sendStreams(localCtx, ingester, samples, &tracker)
}(ingesterDescs[ingester], streams)
}
}

select {
case err := <-tracker.err:
return nil, err
Expand Down Expand Up @@ -744,16 +848,12 @@ func (d *Distributor) sendStreams(ctx context.Context, ingester ring.InstanceDes
if streamTrackers[i].failed.Inc() <= int32(streamTrackers[i].maxFailures) {
continue
}
if pushTracker.streamsFailed.Inc() == 1 {
pushTracker.err <- err
}
pushTracker.doneWithResult(err)
} else {
if streamTrackers[i].succeeded.Inc() != int32(streamTrackers[i].minSuccess) {
continue
}
if pushTracker.streamsPending.Dec() == 0 {
pushTracker.done <- struct{}{}
}
pushTracker.doneWithResult(nil)
}
}
}
Expand Down Expand Up @@ -785,6 +885,70 @@ func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.Instance
return err
}

func (d *Distributor) sendStreamsToKafka(ctx context.Context, streams []KeyedStream, tenant string, tracker *pushTracker) {
for _, s := range streams {
go func(s KeyedStream) {
err := d.sendStreamToKafka(ctx, s, tenant)
if err != nil {
err = fmt.Errorf("failed to write stream to kafka: %w", err)
}
tracker.doneWithResult(err)
}(s)
}
}

func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream, tenant string) error {
if len(stream.Stream.Entries) == 0 {
return nil
}
/* partitionID, err := d.partitionRing.PartitionRing().ActivePartitionForKey(stream.HashKey)
if err != nil {
d.kafkaAppends.WithLabelValues("kafka", "fail").Inc()
return fmt.Errorf("failed to find active partition for stream: %w", err)
}*/
partitionID := int32(0)

startTime := time.Now()

records, err := kafka.Encode(partitionID, tenant, stream.Stream, d.cfg.KafkaConfig.ProducerMaxRecordSizeBytes)
if err != nil {
d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc()
return fmt.Errorf("failed to marshal write request to records: %w", err)
}

d.kafkaRecordsPerRequest.Observe(float64(len(records)))

produceResults := d.kafkaWriter.ProduceSync(ctx, records)

if count, sizeBytes := successfulProduceRecordsStats(produceResults); count > 0 {
d.kafkaWriteLatency.Observe(time.Since(startTime).Seconds())
d.kafkaWriteBytesTotal.Add(float64(sizeBytes))
}

var finalErr error
for _, result := range produceResults {
if result.Err != nil {
d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc()
finalErr = result.Err
} else {
d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "success").Inc()
}
}

return finalErr
}

func successfulProduceRecordsStats(results kgo.ProduceResults) (count, sizeBytes int) {
for _, res := range results {
if res.Err == nil && res.Record != nil {
count++
sizeBytes += len(res.Record.Value)
}
}

return
}

type labelData struct {
ls labels.Labels
hash uint64
Expand Down
Loading
Loading