diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 2f29e1a3e04ed..2610f3bcd9e39 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -2264,6 +2264,14 @@ otlp_config: # List of default otlp resource attributes to be picked as index labels # CLI flag: -distributor.otlp.default_resource_attributes_as_index_labels [default_resource_attributes_as_index_labels: | default = [service.name service.namespace service.instance.id deployment.environment cloud.region cloud.availability_zone k8s.cluster.name k8s.namespace.name k8s.pod.name k8s.container.name container.name k8s.replicaset.name k8s.deployment.name k8s.statefulset.name k8s.daemonset.name k8s.cronjob.name k8s.job.name]] + +# Enable writes to Kafka during Push requests. +# CLI flag: -distributor.kafka-writes-enabled +[kafka_writes_enabled: | default = false] + +# Enable writes to Ingesters during Push requests. Defaults to true. +# CLI flag: -distributor.ingester-writes-enabled +[ingester_writes_enabled: | default = true] ``` ### etcd diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index f6ae454e1482a..08fba483ec9bc 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -19,6 +19,7 @@ import ( "github.com/go-kit/log/level" "github.com/gogo/status" "github.com/prometheus/prometheus/model/labels" + "github.com/twmb/franz-go/pkg/kgo" "go.opentelemetry.io/collector/pdata/plog" "google.golang.org/grpc/codes" @@ -44,6 +45,7 @@ import ( "github.com/grafana/loki/v3/pkg/distributor/writefailures" "github.com/grafana/loki/v3/pkg/ingester" "github.com/grafana/loki/v3/pkg/ingester/client" + "github.com/grafana/loki/v3/pkg/kafka" "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log/logfmt" @@ -88,6 +90,10 @@ type Config struct { WriteFailuresLogging writefailures.Cfg `yaml:"write_failures_logging" doc:"description=Customize the logging of write failures."` OTLPConfig push.GlobalOTLPConfig `yaml:"otlp_config"` + + KafkaEnabled bool `yaml:"kafka_writes_enabled"` + IngesterEnabled bool `yaml:"ingester_writes_enabled"` + KafkaConfig kafka.Config `yaml:"-"` } // RegisterFlags registers distributor-related flags. @@ -96,6 +102,16 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) { cfg.DistributorRing.RegisterFlags(fs) cfg.RateStore.RegisterFlagsWithPrefix("distributor.rate-store", fs) cfg.WriteFailuresLogging.RegisterFlagsWithPrefix("distributor.write-failures-logging", fs) + + fs.BoolVar(&cfg.KafkaEnabled, "distributor.kafka-writes-enabled", false, "Enable writes to Kafka during Push requests.") + fs.BoolVar(&cfg.IngesterEnabled, "distributor.ingester-writes-enabled", true, "Enable writes to Ingesters during Push requests. Defaults to true.") +} + +func (cfg *Config) Validate() error { + if !cfg.KafkaEnabled && !cfg.IngesterEnabled { + return fmt.Errorf("at least one of kafka and ingestor writes must be enabled") + } + return nil } // RateStore manages the ingestion rate of streams, populated by data fetched from ingesters. @@ -103,6 +119,11 @@ type RateStore interface { RateFor(tenantID string, streamHash uint64) (int64, float64) } +type KafkaProducer interface { + ProduceSync(ctx context.Context, records []*kgo.Record) kgo.ProduceResults + Close() +} + // Distributor coordinates replicates and distribution of log streams. type Distributor struct { services.Service @@ -146,6 +167,16 @@ type Distributor struct { streamShardCount prometheus.Counter usageTracker push.UsageTracker + + // kafka + kafkaWriter KafkaProducer + partitionRing ring.PartitionRingReader + + // kafka metrics + kafkaAppends *prometheus.CounterVec + kafkaWriteBytesTotal prometheus.Counter + kafkaWriteLatency prometheus.Histogram + kafkaRecordsPerRequest prometheus.Histogram } // New a distributor creates. @@ -154,6 +185,7 @@ func New( clientCfg client.Config, configs *runtime.TenantConfigs, ingestersRing ring.ReadRing, + partitionRing ring.PartitionRingReader, overrides Limits, registerer prometheus.Registerer, metricsNamespace string, @@ -192,6 +224,20 @@ func New( return nil, err } + if partitionRing == nil && cfg.KafkaEnabled { + return nil, fmt.Errorf("partition ring is required for kafka writes") + } + + var kafkaWriter KafkaProducer + if cfg.KafkaEnabled { + kafkaClient, err := kafka.NewWriterClient(cfg.KafkaConfig, 20, logger, registerer) + if err != nil { + return nil, fmt.Errorf("failed to start kafka client: %w", err) + } + kafkaWriter = kafka.NewProducer(kafkaClient, cfg.KafkaConfig.ProducerMaxBufferedBytes, + prometheus.WrapRegistererWithPrefix("_kafka_", registerer)) + } + d := &Distributor{ cfg: cfg, logger: logger, @@ -227,7 +273,30 @@ func New( Name: "stream_sharding_count", Help: "Total number of times the distributor has sharded streams", }), + kafkaAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Name: "kafka_appends_total", + Help: "The total number of appends sent to kafka ingest path.", + }, []string{"partition", "status"}), + kafkaWriteLatency: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ + Name: "kafka_latency_seconds", + Help: "Latency to write an incoming request to the ingest storage.", + NativeHistogramBucketFactor: 1.1, + NativeHistogramMinResetDuration: 1 * time.Hour, + NativeHistogramMaxBucketNumber: 100, + Buckets: prometheus.DefBuckets, + }), + kafkaWriteBytesTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "kafka_sent_bytes_total", + Help: "Total number of bytes sent to the ingest storage.", + }), + kafkaRecordsPerRequest: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ + Name: "kafka_records_per_write_request", + Help: "The number of records a single per-partition write request has been split into.", + Buckets: prometheus.ExponentialBuckets(1, 2, 8), + }), writeFailuresManager: writefailures.NewManager(logger, registerer, cfg.WriteFailuresLogging, configs, "distributor"), + kafkaWriter: kafkaWriter, + partitionRing: partitionRing, } if overrides.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy { @@ -294,6 +363,9 @@ func (d *Distributor) running(ctx context.Context) error { } func (d *Distributor) stopping(_ error) error { + if d.kafkaWriter != nil { + d.kafkaWriter.Close() + } return services.StopManagerAndAwaitStopped(context.Background(), d.subservices) } @@ -319,6 +391,21 @@ type pushTracker struct { err chan error } +// doneWithResult records the result of a stream push. +// If err is nil, the stream push is considered successful. +// If err is not nil, the stream push is considered failed. +func (p *pushTracker) doneWithResult(err error) { + if err == nil { + if p.streamsPending.Dec() == 0 { + p.done <- struct{}{} + } + } else { + if p.streamsFailed.Inc() == 1 { + p.err <- err + } + } +} + // Push a set of streams. // The returned error is the last one seen. func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) { @@ -488,57 +575,74 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck var descs [maxExpectedReplicationSet]ring.InstanceDesc - streamTrackers := make([]streamTracker, len(streams)) - streamsByIngester := map[string][]*streamTracker{} - ingesterDescs := map[string]ring.InstanceDesc{} + tracker := pushTracker{ + done: make(chan struct{}, 1), // buffer avoids blocking if caller terminates - sendSamples() only sends once on each + err: make(chan error, 1), + } + streamsToWrite := 0 + if d.cfg.IngesterEnabled { + streamsToWrite += len(streams) + } + if d.cfg.KafkaEnabled { + streamsToWrite += len(streams) + } + // We must correctly set streamsPending before beginning any writes to ensure we don't have a race between finishing all of one path before starting the other. + tracker.streamsPending.Store(int32(streamsToWrite)) - if err := func() error { - sp := opentracing.SpanFromContext(ctx) - if sp != nil { - sp.LogKV("event", "started to query ingesters ring") - defer func() { - sp.LogKV("event", "finished to query ingesters ring") - }() - } + if d.cfg.KafkaEnabled { + // We don't need to create a new context like the ingester writes, because we don't return unless all writes have succeeded. + d.sendStreamsToKafka(ctx, streams, tenantID, &tracker) + } - for i, stream := range streams { - replicationSet, err := d.ingestersRing.Get(stream.HashKey, ring.WriteNoExtend, descs[:0], nil, nil) - if err != nil { - return err + if d.cfg.IngesterEnabled { + streamTrackers := make([]streamTracker, len(streams)) + streamsByIngester := map[string][]*streamTracker{} + ingesterDescs := map[string]ring.InstanceDesc{} + + if err := func() error { + sp := opentracing.SpanFromContext(ctx) + if sp != nil { + sp.LogKV("event", "started to query ingesters ring") + defer func() { + sp.LogKV("event", "finished to query ingesters ring") + }() } - streamTrackers[i] = streamTracker{ - KeyedStream: stream, - minSuccess: len(replicationSet.Instances) - replicationSet.MaxErrors, - maxFailures: replicationSet.MaxErrors, - } - for _, ingester := range replicationSet.Instances { - streamsByIngester[ingester.Addr] = append(streamsByIngester[ingester.Addr], &streamTrackers[i]) - ingesterDescs[ingester.Addr] = ingester + for i, stream := range streams { + replicationSet, err := d.ingestersRing.Get(stream.HashKey, ring.WriteNoExtend, descs[:0], nil, nil) + if err != nil { + return err + } + + streamTrackers[i] = streamTracker{ + KeyedStream: stream, + minSuccess: len(replicationSet.Instances) - replicationSet.MaxErrors, + maxFailures: replicationSet.MaxErrors, + } + for _, ingester := range replicationSet.Instances { + streamsByIngester[ingester.Addr] = append(streamsByIngester[ingester.Addr], &streamTrackers[i]) + ingesterDescs[ingester.Addr] = ingester + } } + return nil + }(); err != nil { + return nil, err } - return nil - }(); err != nil { - return nil, err - } - tracker := pushTracker{ - done: make(chan struct{}, 1), // buffer avoids blocking if caller terminates - sendSamples() only sends once on each - err: make(chan error, 1), - } - tracker.streamsPending.Store(int32(len(streams))) - for ingester, streams := range streamsByIngester { - go func(ingester ring.InstanceDesc, samples []*streamTracker) { - // Use a background context to make sure all ingesters get samples even if we return early - localCtx, cancel := context.WithTimeout(context.Background(), d.clientCfg.RemoteTimeout) - defer cancel() - localCtx = user.InjectOrgID(localCtx, tenantID) - if sp := opentracing.SpanFromContext(ctx); sp != nil { - localCtx = opentracing.ContextWithSpan(localCtx, sp) - } - d.sendStreams(localCtx, ingester, samples, &tracker) - }(ingesterDescs[ingester], streams) + for ingester, streams := range streamsByIngester { + go func(ingester ring.InstanceDesc, samples []*streamTracker) { + // Use a background context to make sure all ingesters get samples even if we return early + localCtx, cancel := context.WithTimeout(context.Background(), d.clientCfg.RemoteTimeout) + defer cancel() + localCtx = user.InjectOrgID(localCtx, tenantID) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + localCtx = opentracing.ContextWithSpan(localCtx, sp) + } + d.sendStreams(localCtx, ingester, samples, &tracker) + }(ingesterDescs[ingester], streams) + } } + select { case err := <-tracker.err: return nil, err @@ -744,16 +848,12 @@ func (d *Distributor) sendStreams(ctx context.Context, ingester ring.InstanceDes if streamTrackers[i].failed.Inc() <= int32(streamTrackers[i].maxFailures) { continue } - if pushTracker.streamsFailed.Inc() == 1 { - pushTracker.err <- err - } + pushTracker.doneWithResult(err) } else { if streamTrackers[i].succeeded.Inc() != int32(streamTrackers[i].minSuccess) { continue } - if pushTracker.streamsPending.Dec() == 0 { - pushTracker.done <- struct{}{} - } + pushTracker.doneWithResult(nil) } } } @@ -785,6 +885,70 @@ func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.Instance return err } +func (d *Distributor) sendStreamsToKafka(ctx context.Context, streams []KeyedStream, tenant string, tracker *pushTracker) { + for _, s := range streams { + go func(s KeyedStream) { + err := d.sendStreamToKafka(ctx, s, tenant) + if err != nil { + err = fmt.Errorf("failed to write stream to kafka: %w", err) + } + tracker.doneWithResult(err) + }(s) + } +} + +func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream, tenant string) error { + if len(stream.Stream.Entries) == 0 { + return nil + } + /* partitionID, err := d.partitionRing.PartitionRing().ActivePartitionForKey(stream.HashKey) + if err != nil { + d.kafkaAppends.WithLabelValues("kafka", "fail").Inc() + return fmt.Errorf("failed to find active partition for stream: %w", err) + }*/ + partitionID := int32(0) + + startTime := time.Now() + + records, err := kafka.Encode(partitionID, tenant, stream.Stream, d.cfg.KafkaConfig.ProducerMaxRecordSizeBytes) + if err != nil { + d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc() + return fmt.Errorf("failed to marshal write request to records: %w", err) + } + + d.kafkaRecordsPerRequest.Observe(float64(len(records))) + + produceResults := d.kafkaWriter.ProduceSync(ctx, records) + + if count, sizeBytes := successfulProduceRecordsStats(produceResults); count > 0 { + d.kafkaWriteLatency.Observe(time.Since(startTime).Seconds()) + d.kafkaWriteBytesTotal.Add(float64(sizeBytes)) + } + + var finalErr error + for _, result := range produceResults { + if result.Err != nil { + d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc() + finalErr = result.Err + } else { + d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "success").Inc() + } + } + + return finalErr +} + +func successfulProduceRecordsStats(results kgo.ProduceResults) (count, sizeBytes int) { + for _, res := range results { + if res.Err == nil && res.Record != nil { + count++ + sizeBytes += len(res.Record.Value) + } + } + + return +} + type labelData struct { ls labels.Labels hash uint64 diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 2e8f7b895e0f9..cebd46858e177 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -26,6 +26,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kgo" "go.opentelemetry.io/collector/pdata/plog" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" @@ -505,6 +506,76 @@ func TestDistributorPushErrors(t *testing.T) { }) } +func TestDistributorPushToKafka(t *testing.T) { + limits := &validation.Limits{} + flagext.DefaultValues(limits) + + t.Run("with kafka, any failure fails the request", func(t *testing.T) { + kafkaWriter := &mockKafkaWriter{ + failOnWrite: true, + } + distributors, _ := prepare(t, 1, 0, limits, nil) + for _, d := range distributors { + d.cfg.KafkaEnabled = true + d.cfg.IngesterEnabled = false + d.cfg.KafkaConfig.ProducerMaxRecordSizeBytes = 1000 + d.kafkaWriter = kafkaWriter + } + + request := makeWriteRequest(10, 64) + _, err := distributors[0].Push(ctx, request) + require.Error(t, err) + }) + + t.Run("with kafka, no failures is successful", func(t *testing.T) { + kafkaWriter := &mockKafkaWriter{ + failOnWrite: false, + } + distributors, _ := prepare(t, 1, 0, limits, nil) + for _, d := range distributors { + d.cfg.KafkaEnabled = true + d.cfg.IngesterEnabled = false + d.cfg.KafkaConfig.ProducerMaxRecordSizeBytes = 1000 + d.kafkaWriter = kafkaWriter + } + + request := makeWriteRequest(10, 64) + _, err := distributors[0].Push(ctx, request) + require.NoError(t, err) + + require.Equal(t, 1, kafkaWriter.pushed) + }) + + t.Run("with kafka and ingesters, both must complete", func(t *testing.T) { + kafkaWriter := &mockKafkaWriter{ + failOnWrite: false, + } + distributors, ingesters := prepare(t, 1, 3, limits, nil) + ingesters[0].succeedAfter = 5 * time.Millisecond + ingesters[1].succeedAfter = 10 * time.Millisecond + ingesters[2].succeedAfter = 15 * time.Millisecond + + for _, d := range distributors { + d.cfg.KafkaEnabled = true + d.cfg.IngesterEnabled = true + d.cfg.KafkaConfig.ProducerMaxRecordSizeBytes = 1000 + d.kafkaWriter = kafkaWriter + } + + request := makeWriteRequest(10, 64) + _, err := distributors[0].Push(ctx, request) + require.NoError(t, err) + + require.Equal(t, 1, kafkaWriter.pushed) + + require.Equal(t, 1, len(ingesters[0].pushed)) + require.Equal(t, 1, len(ingesters[1].pushed)) + require.Eventually(t, func() bool { + return len(ingesters[2].pushed) == 1 + }, time.Second, 10*time.Millisecond) + }) +} + func Test_SortLabelsOnPush(t *testing.T) { t.Run("with service_name already present in labels", func(t *testing.T) { limits := &validation.Limits{} @@ -1270,9 +1341,26 @@ func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingestersRing)) - test.Poll(t, time.Second, numIngesters, func() interface{} { - return ingestersRing.InstancesCount() + partitionRing := ring.NewPartitionRing(ring.PartitionRingDesc{ + Partitions: map[int32]ring.PartitionDesc{ + 1: { + Id: 1, + Tokens: []uint32{1}, + State: ring.PartitionActive, + StateTimestamp: time.Now().Unix(), + }, + }, + Owners: map[string]ring.OwnerDesc{ + "test": { + OwnedPartition: 1, + State: ring.OwnerActive, + UpdatedTimestamp: time.Now().Unix(), + }, + }, }) + partitionRingReader := mockPartitionRingReader{ + ring: partitionRing, + } loopbackName, err := loki_net.LoopbackInterfaceName() require.NoError(t, err) @@ -1299,7 +1387,7 @@ func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation overrides, err := validation.NewOverrides(*limits, nil) require.NoError(t, err) - d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, nil, log.NewNopLogger()) + d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, partitionRingReader, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, nil, log.NewNopLogger()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), d)) distributors[i] = d @@ -1373,6 +1461,37 @@ func makeWriteRequest(lines, size int) *logproto.PushRequest { return makeWriteRequestWithLabels(lines, size, []string{`{foo="bar"}`}) } +type mockKafkaWriter struct { + failOnWrite bool + pushed int +} + +func (m *mockKafkaWriter) ProduceSync(_ context.Context, _ []*kgo.Record) kgo.ProduceResults { + if m.failOnWrite { + return kgo.ProduceResults{ + { + Err: kgo.ErrRecordTimeout, + }, + } + } + m.pushed++ + return kgo.ProduceResults{ + { + Err: nil, + }, + } +} + +func (m *mockKafkaWriter) Close() {} + +type mockPartitionRingReader struct { + ring *ring.PartitionRing +} + +func (m mockPartitionRingReader) PartitionRing() *ring.PartitionRing { + return m.ring +} + type mockIngester struct { grpc_health_v1.HealthClient logproto.PusherClient diff --git a/pkg/ingester-kafka/kafka/kafka_tee.go b/pkg/ingester-kafka/kafka/kafka_tee.go deleted file mode 100644 index 6aeaad9724e68..0000000000000 --- a/pkg/ingester-kafka/kafka/kafka_tee.go +++ /dev/null @@ -1,209 +0,0 @@ -package kafka - -import ( - "context" - "errors" - "flag" - "fmt" - "math" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/grafana/dskit/ring" - "github.com/grafana/dskit/user" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/twmb/franz-go/pkg/kgo" - - "github.com/twmb/franz-go/plugin/kprom" - - "github.com/grafana/loki/v3/pkg/distributor" - "github.com/grafana/loki/v3/pkg/logproto" -) - -const writeTimeout = time.Minute - -type Config struct { - Address string `yaml:"address" docs:"the kafka endpoint to connect to"` - Topic string `yaml:"topic" docs:"the kafka topic to write to"` -} - -func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - cfg.RegisterFlagsWithPrefix("", f) -} - -func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - f.StringVar(&cfg.Address, prefix+"address", "localhost:9092", "the kafka endpoint to connect to") - f.StringVar(&cfg.Topic, prefix+".topic", "loki.push", "The Kafka topic name.") -} - -type Tee struct { - logger log.Logger - kafkaClient *kgo.Client - partitionRing *ring.PartitionInstanceRing - - ingesterAppends *prometheus.CounterVec -} - -func NewTee( - cfg Config, - metricsNamespace string, - registerer prometheus.Registerer, - logger log.Logger, - partitionRing *ring.PartitionInstanceRing, -) (*Tee, error) { - registerer = prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer) - - metrics := kprom.NewMetrics( - "", // No prefix. We expect the input prometheus.Registered to be wrapped with a prefix. - kprom.Registerer(registerer), - kprom.FetchAndProduceDetail(kprom.Batches, kprom.Records, kprom.CompressedBytes, kprom.UncompressedBytes)) - - opts := append([]kgo.Opt{}, - kgo.SeedBrokers(cfg.Address), - - kgo.WithHooks(metrics), - // commonKafkaClientOptions(kafkaCfg, metrics, logger), - kgo.RequiredAcks(kgo.AllISRAcks()), - kgo.DefaultProduceTopic(cfg.Topic), - - kgo.AllowAutoTopicCreation(), - // We set the partition field in each record. - kgo.RecordPartitioner(kgo.ManualPartitioner()), - - // Set the upper bounds the size of a record batch. - kgo.ProducerBatchMaxBytes(1024*1024*1), - - // By default, the Kafka client allows 1 Produce in-flight request per broker. Disabling write idempotency - // (which we don't need), we can increase the max number of in-flight Produce requests per broker. A higher - // number of in-flight requests, in addition to short buffering ("linger") in client side before firing the - // next Produce request allows us to reduce the end-to-end latency. - // - // The result of the multiplication of producer linger and max in-flight requests should match the maximum - // Produce latency expected by the Kafka backend in a steady state. For example, 50ms * 20 requests = 1s, - // which means the Kafka client will keep issuing a Produce request every 50ms as far as the Kafka backend - // doesn't take longer than 1s to process them (if it takes longer, the client will buffer data and stop - // issuing new Produce requests until some previous ones complete). - kgo.DisableIdempotentWrite(), - kgo.ProducerLinger(50*time.Millisecond), - kgo.MaxProduceRequestsInflightPerBroker(20), - - // Unlimited number of Produce retries but a deadline on the max time a record can take to be delivered. - // With the default config it would retry infinitely. - // - // Details of the involved timeouts: - // - RecordDeliveryTimeout: how long a Kafka client Produce() call can take for a given record. The overhead - // timeout is NOT applied. - // - ProduceRequestTimeout: how long to wait for the response to the Produce request (the Kafka protocol message) - // after being sent on the network. The actual timeout is increased by the configured overhead. - // - // When a Produce request to Kafka fail, the client will retry up until the RecordDeliveryTimeout is reached. - // Once the timeout is reached, the Produce request will fail and all other buffered requests in the client - // (for the same partition) will fail too. See kgo.RecordDeliveryTimeout() documentation for more info. - kgo.RecordRetries(math.MaxInt), - kgo.RecordDeliveryTimeout(time.Minute), - kgo.ProduceRequestTimeout(time.Minute), - kgo.RequestTimeoutOverhead(time.Minute), - - // Unlimited number of buffered records because we limit on bytes in Writer. The reason why we don't use - // kgo.MaxBufferedBytes() is because it suffers a deadlock issue: - // https://github.com/twmb/franz-go/issues/777 - kgo.MaxBufferedRecords(math.MaxInt), // Use a high value to set it as unlimited, because the client doesn't support "0 as unlimited". - kgo.MaxBufferedBytes(0), - ) - - kafkaClient, err := kgo.NewClient(opts...) - if err != nil { - panic("failed to start kafka client") - } - - t := &Tee{ - logger: log.With(logger, "component", "kafka-tee"), - ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ - Name: "kafka_ingester_appends_total", - Help: "The total number of appends sent to kafka ingest path.", - }, []string{"partition", "status"}), - kafkaClient: kafkaClient, - partitionRing: partitionRing, - } - - return t, nil -} - -// Duplicate Implements distributor.Tee which is used to tee distributor requests to pattern ingesters. -func (t *Tee) Duplicate(tenant string, streams []distributor.KeyedStream) { - for idx := range streams { - go func(stream distributor.KeyedStream) { - if err := t.sendStream(tenant, stream); err != nil { - level.Error(t.logger).Log("msg", "failed to send stream to kafka", "err", err) - } - }(streams[idx]) - } -} - -func (t *Tee) sendStream(tenant string, stream distributor.KeyedStream) error { - partitionID, err := t.partitionRing.PartitionRing().ActivePartitionForKey(stream.HashKey) - if err != nil { - t.ingesterAppends.WithLabelValues("partition_unknown", "fail").Inc() - return fmt.Errorf("failed to find active partition for stream: %w", err) - } - records, err := marshalWriteRequestToRecords(partitionID, tenant, stream.Stream, 1024*1024) - - ctx, cancel := context.WithTimeout(user.InjectOrgID(context.Background(), tenant), writeTimeout) - defer cancel() - produceResults := t.kafkaClient.ProduceSync(ctx, records...) - - var finalErr error - for _, result := range produceResults { - if result.Err != nil { - t.ingesterAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc() - finalErr = err - } else { - t.ingesterAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "success").Inc() - } - } - - return finalErr -} - -// marshalWriteRequestToRecords marshals a mimirpb.WriteRequest to one or more Kafka records. -// The request may be split to multiple records to get that each single Kafka record -// data size is not bigger than maxSize. -// -// This function is a best-effort. The returned Kafka records are not strictly guaranteed to -// have their data size limited to maxSize. The reason is that the WriteRequest is split -// by each individual Timeseries and Metadata: if a single Timeseries or Metadata is bigger than -// maxSize, than the resulting record will be bigger than the limit as well. -func marshalWriteRequestToRecords(partitionID int32, tenantID string, stream logproto.Stream, maxSize int) ([]*kgo.Record, error) { - reqSize := stream.Size() - - if reqSize <= maxSize { - // No need to split the request. We can take a fast path. - rec, err := marshalWriteRequestToRecord(partitionID, tenantID, stream, reqSize) - if err != nil { - return nil, err - } - - return []*kgo.Record{rec}, nil - } - return nil, errors.New("large write requests are not supported yet") - - // return marshalWriteRequestsToRecords(partitionID, tenantID, mimirpb.SplitWriteRequestByMaxMarshalSize(req, reqSize, maxSize)) -} - -func marshalWriteRequestToRecord(partitionID int32, tenantID string, stream logproto.Stream, reqSize int) (*kgo.Record, error) { - // Marshal the request. - data := make([]byte, reqSize) - n, err := stream.MarshalToSizedBuffer(data) - if err != nil { - return nil, fmt.Errorf("failed to serialise write request: %w", err) - } - data = data[:n] - - return &kgo.Record{ - Key: []byte(tenantID), // We don't partition based on the key, so the value here doesn't make any difference. - Value: data, - Partition: partitionID, - }, nil -} diff --git a/pkg/kafka/tee/tee.go b/pkg/kafka/tee/tee.go deleted file mode 100644 index 2228883efb32f..0000000000000 --- a/pkg/kafka/tee/tee.go +++ /dev/null @@ -1,174 +0,0 @@ -package tee - -import ( - "context" - "fmt" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/grafana/dskit/ring" - "github.com/grafana/dskit/user" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/twmb/franz-go/pkg/kgo" - - "github.com/grafana/loki/v3/pkg/distributor" - "github.com/grafana/loki/v3/pkg/kafka" -) - -const writeTimeout = time.Minute - -// Tee represents a component that duplicates log streams to Kafka. -type Tee struct { - logger log.Logger - producer *kafka.Producer - partitionRing ring.PartitionRingReader - cfg kafka.Config - - ingesterAppends *prometheus.CounterVec - writeLatency prometheus.Histogram - writeBytesTotal prometheus.Counter - recordsPerRequest prometheus.Histogram -} - -// NewTee creates and initializes a new Tee instance. -// -// Parameters: -// - cfg: Kafka configuration -// - metricsNamespace: Namespace for Prometheus metrics -// - registerer: Prometheus registerer for metrics -// - logger: Logger instance -// - partitionRing: Ring for managing partitions -// -// Returns: -// - A new Tee instance and any error encountered during initialization -func NewTee( - cfg kafka.Config, - metricsNamespace string, - registerer prometheus.Registerer, - logger log.Logger, - partitionRing ring.PartitionRingReader, -) (*Tee, error) { - registerer = prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer) - - kafkaClient, err := kafka.NewWriterClient(cfg, 20, logger, registerer) - if err != nil { - return nil, fmt.Errorf("failed to start kafka client: %w", err) - } - producer := kafka.NewProducer(kafkaClient, cfg.ProducerMaxBufferedBytes, - prometheus.WrapRegistererWithPrefix("_kafka_ingester_", registerer)) - - t := &Tee{ - logger: log.With(logger, "component", "kafka-tee"), - ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ - Name: "kafka_ingester_appends_total", - Help: "The total number of appends sent to kafka ingest path.", - }, []string{"partition", "status"}), - producer: producer, - partitionRing: partitionRing, - cfg: cfg, - // Metrics. - writeLatency: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ - Name: "kafka_ingester_tee_latency_seconds", - Help: "Latency to write an incoming request to the ingest storage.", - NativeHistogramBucketFactor: 1.1, - NativeHistogramMinResetDuration: 1 * time.Hour, - NativeHistogramMaxBucketNumber: 100, - Buckets: prometheus.DefBuckets, - }), - writeBytesTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ - Name: "kafka_ingester_tee_sent_bytes_total", - Help: "Total number of bytes sent to the ingest storage.", - }), - recordsPerRequest: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ - Name: "kafka_ingester_tee_records_per_write_request", - Help: "The number of records a single per-partition write request has been split into.", - Buckets: prometheus.ExponentialBuckets(1, 2, 8), - }), - } - - return t, nil -} - -// Duplicate implements the distributor.Tee interface, which is used to duplicate -// distributor requests to pattern ingesters. It asynchronously sends each stream -// to Kafka. -// -// Parameters: -// - tenant: The tenant identifier -// - streams: A slice of KeyedStream to be duplicated -func (t *Tee) Duplicate(tenant string, streams []distributor.KeyedStream) { - for idx := range streams { - go func(stream distributor.KeyedStream) { - if err := t.sendStream(tenant, stream); err != nil { - level.Error(t.logger).Log("msg", "failed to send stream to kafka", "err", err) - } - }(streams[idx]) - } -} - -func (t *Tee) Close() { - t.producer.Close() -} - -// sendStream sends a single stream to Kafka. -// -// Parameters: -// - tenant: The tenant identifier -// - stream: The KeyedStream to be sent -// -// Returns: -// - An error if the stream couldn't be sent successfully -func (t *Tee) sendStream(tenant string, stream distributor.KeyedStream) error { - if len(stream.Stream.Entries) == 0 { - return nil - } - partitionID, err := t.partitionRing.PartitionRing().ActivePartitionForKey(stream.HashKey) - if err != nil { - t.ingesterAppends.WithLabelValues("partition_unknown", "fail").Inc() - return fmt.Errorf("failed to find active partition for stream: %w", err) - } - - startTime := time.Now() - - records, err := kafka.Encode(partitionID, tenant, stream.Stream, t.cfg.ProducerMaxRecordSizeBytes) - if err != nil { - t.ingesterAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc() - return fmt.Errorf("failed to marshal write request to records: %w", err) - } - - t.recordsPerRequest.Observe(float64(len(records))) - - ctx, cancel := context.WithTimeout(user.InjectOrgID(context.Background(), tenant), writeTimeout) - defer cancel() - produceResults := t.producer.ProduceSync(ctx, records) - - if count, sizeBytes := successfulProduceRecordsStats(produceResults); count > 0 { - t.writeLatency.Observe(time.Since(startTime).Seconds()) - t.writeBytesTotal.Add(float64(sizeBytes)) - } - - var finalErr error - for _, result := range produceResults { - if result.Err != nil { - t.ingesterAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc() - finalErr = err - } else { - t.ingesterAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "success").Inc() - } - } - - return finalErr -} - -func successfulProduceRecordsStats(results kgo.ProduceResults) (count, sizeBytes int) { - for _, res := range results { - if res.Err == nil && res.Record != nil { - count++ - sizeBytes += len(res.Record.Value) - } - } - - return -} diff --git a/pkg/kafka/tee/tee_test.go b/pkg/kafka/tee/tee_test.go deleted file mode 100644 index 2431f42033fc7..0000000000000 --- a/pkg/kafka/tee/tee_test.go +++ /dev/null @@ -1,50 +0,0 @@ -package tee - -import ( - "os" - "testing" - "time" - - "github.com/go-kit/log" - "github.com/grafana/dskit/ring" - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/require" - - "github.com/grafana/loki/v3/pkg/distributor" - "github.com/grafana/loki/v3/pkg/kafka/testkafka" - - "github.com/grafana/loki/pkg/push" -) - -func TestPushKafkaRecords(t *testing.T) { - _, cfg := testkafka.CreateCluster(t, 1, "topic") - tee, err := NewTee(cfg, "test", prometheus.NewRegistry(), log.NewLogfmtLogger(os.Stdout), newTestPartitionRing()) - require.NoError(t, err) - - err = tee.sendStream("test", distributor.KeyedStream{ - HashKey: 1, - Stream: push.Stream{ - Labels: `{foo="bar"}`, - Entries: []push.Entry{ - {Timestamp: time.Now(), Line: "test"}, - }, - }, - }) - require.NoError(t, err) -} - -type testPartitionRing struct { - partitionRing *ring.PartitionRing -} - -func (t *testPartitionRing) PartitionRing() *ring.PartitionRing { - return t.partitionRing -} - -func newTestPartitionRing() ring.PartitionRingReader { - desc := ring.NewPartitionRingDesc() - desc.AddPartition(0, ring.PartitionActive, time.Now()) - return &testPartitionRing{ - partitionRing: ring.NewPartitionRing(*desc), - } -} diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 467c03eb0a477..20ea802f12ebb 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -300,6 +300,9 @@ func (c *Config) Validate() error { errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid kafka_config config")) } } + if err := c.Distributor.Validate(); err != nil { + errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid distributor config")) + } errs = append(errs, validateSchemaValues(c)...) errs = append(errs, ValidateConfigCompatibility(*c)...) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 1d7a99ec066c8..d256ffbd00966 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -321,6 +321,8 @@ func (t *Loki) initTenantConfigs() (_ services.Service, err error) { } func (t *Loki) initDistributor() (services.Service, error) { + t.Cfg.Distributor.KafkaConfig = t.Cfg.KafkaConfig + var err error logger := log.With(util_log.Logger, "component", "distributor") t.distributor, err = distributor.New( @@ -328,6 +330,7 @@ func (t *Loki) initDistributor() (services.Service, error) { t.Cfg.IngesterClient, t.tenantConfigs, t.ring, + t.partitionRing, t.Overrides, prometheus.DefaultRegisterer, t.Cfg.MetricsNamespace,