From 46ad223051fb4080605443d58e9ef7e6c09803af Mon Sep 17 00:00:00 2001 From: Ben Clive Date: Thu, 19 Sep 2024 14:55:28 +0100 Subject: [PATCH 1/8] feat: Optionally enforce writes to kafka on Push --- pkg/distributor/distributor.go | 39 +++-- pkg/distributor/distributor_test.go | 2 +- pkg/distributor/tee.go | 26 ++++ pkg/ingester-kafka/kafka/kafka_tee.go | 209 -------------------------- pkg/ingester/ingester.go | 1 + pkg/kafka/ingester/ingester.go | 1 + pkg/kafka/tee/tee.go | 26 +++- pkg/loki/modules.go | 15 ++ 8 files changed, 92 insertions(+), 227 deletions(-) delete mode 100644 pkg/ingester-kafka/kafka/kafka_tee.go diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index f6ae454e1482a..4c7da078c3889 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -116,6 +116,7 @@ type Distributor struct { validator *Validator pool *ring_client.Pool tee Tee + trackedTee TrackedTee rateStore RateStore shardTracker *ShardTracker @@ -158,6 +159,7 @@ func New( registerer prometheus.Registerer, metricsNamespace string, tee Tee, + trackedTee TrackedTee, usageTracker push.UsageTracker, logger log.Logger, ) (*Distributor, error) { @@ -206,6 +208,7 @@ func New( healthyInstancesCount: atomic.NewUint32(0), rateLimitStrat: rateLimitStrat, tee: tee, + trackedTee: trackedTee, usageTracker: usageTracker, ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: constants.Loki, @@ -312,11 +315,11 @@ type streamTracker struct { } // TODO taken from Cortex, see if we can refactor out an usable interface. -type pushTracker struct { - streamsPending atomic.Int32 - streamsFailed atomic.Int32 - done chan struct{} - err chan error +type PushTracker struct { + StreamsPending atomic.Int32 + StreamsFailed atomic.Int32 + Done chan struct{} + Err chan error } // Push a set of streams. @@ -522,11 +525,15 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log return nil, err } - tracker := pushTracker{ - done: make(chan struct{}, 1), // buffer avoids blocking if caller terminates - sendSamples() only sends once on each - err: make(chan error, 1), + tracker := PushTracker{ + Done: make(chan struct{}, 1), // buffer avoids blocking if caller terminates - sendSamples() only sends once on each + Err: make(chan error, 1), } - tracker.streamsPending.Store(int32(len(streams))) + if d.trackedTee != nil { + d.trackedTee.DuplicateWithTracking(tenantID, streams, &tracker) + } + + tracker.StreamsPending.Add(int32(len(streams))) for ingester, streams := range streamsByIngester { go func(ingester ring.InstanceDesc, samples []*streamTracker) { // Use a background context to make sure all ingesters get samples even if we return early @@ -540,9 +547,9 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log }(ingesterDescs[ingester], streams) } select { - case err := <-tracker.err: + case err := <-tracker.Err: return nil, err - case <-tracker.done: + case <-tracker.Done: return &logproto.PushResponse{}, validationErr case <-ctx.Done(): return nil, ctx.Err() @@ -727,7 +734,7 @@ func (d *Distributor) truncateLines(vContext validationContext, stream *logproto } // TODO taken from Cortex, see if we can refactor out an usable interface. 
-func (d *Distributor) sendStreams(ctx context.Context, ingester ring.InstanceDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) { +func (d *Distributor) sendStreams(ctx context.Context, ingester ring.InstanceDesc, streamTrackers []*streamTracker, pushTracker *PushTracker) { err := d.sendStreamsErr(ctx, ingester, streamTrackers) // If we succeed, decrement each stream's pending count by one. @@ -744,15 +751,15 @@ func (d *Distributor) sendStreams(ctx context.Context, ingester ring.InstanceDes if streamTrackers[i].failed.Inc() <= int32(streamTrackers[i].maxFailures) { continue } - if pushTracker.streamsFailed.Inc() == 1 { - pushTracker.err <- err + if pushTracker.StreamsFailed.Inc() == 1 { + pushTracker.Err <- err } } else { if streamTrackers[i].succeeded.Inc() != int32(streamTrackers[i].minSuccess) { continue } - if pushTracker.streamsPending.Dec() == 0 { - pushTracker.done <- struct{}{} + if pushTracker.StreamsPending.Dec() == 0 { + pushTracker.Done <- struct{}{} } } } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 2e8f7b895e0f9..df199083cdd93 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -1299,7 +1299,7 @@ func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation overrides, err := validation.NewOverrides(*limits, nil) require.NoError(t, err) - d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, nil, log.NewNopLogger()) + d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, nil, nil, log.NewNopLogger()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), d)) distributors[i] = d diff --git a/pkg/distributor/tee.go b/pkg/distributor/tee.go index 04acb1e22c0df..3ce3f514fc665 100644 --- a/pkg/distributor/tee.go +++ b/pkg/distributor/tee.go @@ -5,6 +5,10 @@ type Tee interface { Duplicate(tenant string, streams []KeyedStream) } +type TrackedTee interface { + DuplicateWithTracking(tenantID string, streams []KeyedStream, tracker *PushTracker) +} + // WrapTee wraps a new Tee around an existing Tee. func WrapTee(existing, new Tee) Tee { if existing == nil { @@ -25,3 +29,25 @@ func (m *multiTee) Duplicate(tenant string, streams []KeyedStream) { tee.Duplicate(tenant, streams) } } + +type multiTrackedTee struct { + tees []TrackedTee +} + +// WrapTrackedTee wraps a new TrackedTee around an existing TrackedTee. 
+func WrapTrackedTee(existing, new TrackedTee) TrackedTee { + if existing == nil { + return new + } + if multi, ok := existing.(*multiTrackedTee); ok { + return &multiTrackedTee{append(multi.tees, new)} + } + return &multiTrackedTee{tees: []TrackedTee{existing, new}} +} + +func (m *multiTrackedTee) DuplicateWithTracking(tenant string, streams []KeyedStream, tracker *PushTracker) { + tracker.StreamsPending.Add(int32(len(streams) * len(m.tees))) + for _, tee := range m.tees { + go tee.DuplicateWithTracking(tenant, streams, tracker) + } +} diff --git a/pkg/ingester-kafka/kafka/kafka_tee.go b/pkg/ingester-kafka/kafka/kafka_tee.go deleted file mode 100644 index 6aeaad9724e68..0000000000000 --- a/pkg/ingester-kafka/kafka/kafka_tee.go +++ /dev/null @@ -1,209 +0,0 @@ -package kafka - -import ( - "context" - "errors" - "flag" - "fmt" - "math" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/grafana/dskit/ring" - "github.com/grafana/dskit/user" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/twmb/franz-go/pkg/kgo" - - "github.com/twmb/franz-go/plugin/kprom" - - "github.com/grafana/loki/v3/pkg/distributor" - "github.com/grafana/loki/v3/pkg/logproto" -) - -const writeTimeout = time.Minute - -type Config struct { - Address string `yaml:"address" docs:"the kafka endpoint to connect to"` - Topic string `yaml:"topic" docs:"the kafka topic to write to"` -} - -func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - cfg.RegisterFlagsWithPrefix("", f) -} - -func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - f.StringVar(&cfg.Address, prefix+"address", "localhost:9092", "the kafka endpoint to connect to") - f.StringVar(&cfg.Topic, prefix+".topic", "loki.push", "The Kafka topic name.") -} - -type Tee struct { - logger log.Logger - kafkaClient *kgo.Client - partitionRing *ring.PartitionInstanceRing - - ingesterAppends *prometheus.CounterVec -} - -func NewTee( - cfg Config, - metricsNamespace string, - registerer prometheus.Registerer, - logger log.Logger, - partitionRing *ring.PartitionInstanceRing, -) (*Tee, error) { - registerer = prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer) - - metrics := kprom.NewMetrics( - "", // No prefix. We expect the input prometheus.Registered to be wrapped with a prefix. - kprom.Registerer(registerer), - kprom.FetchAndProduceDetail(kprom.Batches, kprom.Records, kprom.CompressedBytes, kprom.UncompressedBytes)) - - opts := append([]kgo.Opt{}, - kgo.SeedBrokers(cfg.Address), - - kgo.WithHooks(metrics), - // commonKafkaClientOptions(kafkaCfg, metrics, logger), - kgo.RequiredAcks(kgo.AllISRAcks()), - kgo.DefaultProduceTopic(cfg.Topic), - - kgo.AllowAutoTopicCreation(), - // We set the partition field in each record. - kgo.RecordPartitioner(kgo.ManualPartitioner()), - - // Set the upper bounds the size of a record batch. - kgo.ProducerBatchMaxBytes(1024*1024*1), - - // By default, the Kafka client allows 1 Produce in-flight request per broker. Disabling write idempotency - // (which we don't need), we can increase the max number of in-flight Produce requests per broker. A higher - // number of in-flight requests, in addition to short buffering ("linger") in client side before firing the - // next Produce request allows us to reduce the end-to-end latency. 
- // - // The result of the multiplication of producer linger and max in-flight requests should match the maximum - // Produce latency expected by the Kafka backend in a steady state. For example, 50ms * 20 requests = 1s, - // which means the Kafka client will keep issuing a Produce request every 50ms as far as the Kafka backend - // doesn't take longer than 1s to process them (if it takes longer, the client will buffer data and stop - // issuing new Produce requests until some previous ones complete). - kgo.DisableIdempotentWrite(), - kgo.ProducerLinger(50*time.Millisecond), - kgo.MaxProduceRequestsInflightPerBroker(20), - - // Unlimited number of Produce retries but a deadline on the max time a record can take to be delivered. - // With the default config it would retry infinitely. - // - // Details of the involved timeouts: - // - RecordDeliveryTimeout: how long a Kafka client Produce() call can take for a given record. The overhead - // timeout is NOT applied. - // - ProduceRequestTimeout: how long to wait for the response to the Produce request (the Kafka protocol message) - // after being sent on the network. The actual timeout is increased by the configured overhead. - // - // When a Produce request to Kafka fail, the client will retry up until the RecordDeliveryTimeout is reached. - // Once the timeout is reached, the Produce request will fail and all other buffered requests in the client - // (for the same partition) will fail too. See kgo.RecordDeliveryTimeout() documentation for more info. - kgo.RecordRetries(math.MaxInt), - kgo.RecordDeliveryTimeout(time.Minute), - kgo.ProduceRequestTimeout(time.Minute), - kgo.RequestTimeoutOverhead(time.Minute), - - // Unlimited number of buffered records because we limit on bytes in Writer. The reason why we don't use - // kgo.MaxBufferedBytes() is because it suffers a deadlock issue: - // https://github.com/twmb/franz-go/issues/777 - kgo.MaxBufferedRecords(math.MaxInt), // Use a high value to set it as unlimited, because the client doesn't support "0 as unlimited". - kgo.MaxBufferedBytes(0), - ) - - kafkaClient, err := kgo.NewClient(opts...) - if err != nil { - panic("failed to start kafka client") - } - - t := &Tee{ - logger: log.With(logger, "component", "kafka-tee"), - ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ - Name: "kafka_ingester_appends_total", - Help: "The total number of appends sent to kafka ingest path.", - }, []string{"partition", "status"}), - kafkaClient: kafkaClient, - partitionRing: partitionRing, - } - - return t, nil -} - -// Duplicate Implements distributor.Tee which is used to tee distributor requests to pattern ingesters. 
-func (t *Tee) Duplicate(tenant string, streams []distributor.KeyedStream) { - for idx := range streams { - go func(stream distributor.KeyedStream) { - if err := t.sendStream(tenant, stream); err != nil { - level.Error(t.logger).Log("msg", "failed to send stream to kafka", "err", err) - } - }(streams[idx]) - } -} - -func (t *Tee) sendStream(tenant string, stream distributor.KeyedStream) error { - partitionID, err := t.partitionRing.PartitionRing().ActivePartitionForKey(stream.HashKey) - if err != nil { - t.ingesterAppends.WithLabelValues("partition_unknown", "fail").Inc() - return fmt.Errorf("failed to find active partition for stream: %w", err) - } - records, err := marshalWriteRequestToRecords(partitionID, tenant, stream.Stream, 1024*1024) - - ctx, cancel := context.WithTimeout(user.InjectOrgID(context.Background(), tenant), writeTimeout) - defer cancel() - produceResults := t.kafkaClient.ProduceSync(ctx, records...) - - var finalErr error - for _, result := range produceResults { - if result.Err != nil { - t.ingesterAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc() - finalErr = err - } else { - t.ingesterAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "success").Inc() - } - } - - return finalErr -} - -// marshalWriteRequestToRecords marshals a mimirpb.WriteRequest to one or more Kafka records. -// The request may be split to multiple records to get that each single Kafka record -// data size is not bigger than maxSize. -// -// This function is a best-effort. The returned Kafka records are not strictly guaranteed to -// have their data size limited to maxSize. The reason is that the WriteRequest is split -// by each individual Timeseries and Metadata: if a single Timeseries or Metadata is bigger than -// maxSize, than the resulting record will be bigger than the limit as well. -func marshalWriteRequestToRecords(partitionID int32, tenantID string, stream logproto.Stream, maxSize int) ([]*kgo.Record, error) { - reqSize := stream.Size() - - if reqSize <= maxSize { - // No need to split the request. We can take a fast path. - rec, err := marshalWriteRequestToRecord(partitionID, tenantID, stream, reqSize) - if err != nil { - return nil, err - } - - return []*kgo.Record{rec}, nil - } - return nil, errors.New("large write requests are not supported yet") - - // return marshalWriteRequestsToRecords(partitionID, tenantID, mimirpb.SplitWriteRequestByMaxMarshalSize(req, reqSize, maxSize)) -} - -func marshalWriteRequestToRecord(partitionID int32, tenantID string, stream logproto.Stream, reqSize int) (*kgo.Record, error) { - // Marshal the request. - data := make([]byte, reqSize) - n, err := stream.MarshalToSizedBuffer(data) - if err != nil { - return nil, fmt.Errorf("failed to serialise write request: %w", err) - } - data = data[:n] - - return &kgo.Record{ - Key: []byte(tenantID), // We don't partition based on the key, so the value here doesn't make any difference. 
- Value: data, - Partition: partitionID, - }, nil -} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 9c913f9049f44..dbb1ccb665216 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -193,6 +193,7 @@ func (cfg *Config) Validate() error { type KafkaIngestionConfig struct { Enabled bool `yaml:"enabled" doc:"description=Whether the kafka ingester is enabled."` + Required bool `yaml:"required" doc:"description=Whether ingestion to kafka is required to succeed during a Push request or if they are best effort."` PartitionRingConfig partitionring.Config `yaml:"partition_ring" category:"experimental"` KafkaConfig kafka.Config `yaml:"-"` } diff --git a/pkg/kafka/ingester/ingester.go b/pkg/kafka/ingester/ingester.go index ef356778e4390..f54971e6540fd 100644 --- a/pkg/kafka/ingester/ingester.go +++ b/pkg/kafka/ingester/ingester.go @@ -41,6 +41,7 @@ var ( // Config for an ingester. type Config struct { Enabled bool `yaml:"enabled" doc:"description=Whether the kafka ingester is enabled."` + RequireAcks bool `yaml:"require_acks" doc:"description=Whether the distributor should wait for acks from this ingester before accepting writes."` LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty" doc:"description=Configures how the lifecycle of the ingester will operate and where it will register for discovery."` ShutdownMarkerPath string `yaml:"shutdown_marker_path"` FlushInterval time.Duration `yaml:"flush_interval" doc:"description=The interval at which the ingester will flush and commit offsets to Kafka. If not set, the default flush interval will be used."` diff --git a/pkg/kafka/tee/tee.go b/pkg/kafka/tee/tee.go index 2228883efb32f..62cc8c4fdd9a4 100644 --- a/pkg/kafka/tee/tee.go +++ b/pkg/kafka/tee/tee.go @@ -92,7 +92,7 @@ func NewTee( } // Duplicate implements the distributor.Tee interface, which is used to duplicate -// distributor requests to pattern ingesters. It asynchronously sends each stream +// distributor requests to different destinations. It asynchronously sends each stream // to Kafka. // // Parameters: @@ -108,6 +108,30 @@ func (t *Tee) Duplicate(tenant string, streams []distributor.KeyedStream) { } } +// DuplicateWithTracking implements the distributor.TrackedTee interface, which is used to duplicate +// distributor requests to different destinations. It sends each stream +// to Kafka and requires all streams to be ack'd to the tracker. 
+// +// Parameters: +// - tenant: The tenant identifier +// - streams: A slice of KeyedStream to be duplicated +func (t *Tee) DuplicateWithTracking(tenant string, streams []distributor.KeyedStream, tracker *distributor.PushTracker) { + for idx := range streams { + go func(stream distributor.KeyedStream) { + err := t.sendStream(tenant, stream) + if err != nil { + if tracker.StreamsFailed.Inc() == 1 { + tracker.Err <- err + } + } else { + if tracker.StreamsPending.Dec() == 0 { + tracker.Done <- struct{}{} + } + } + }(streams[idx]) + } +} + func (t *Tee) Close() { t.producer.Close() } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 1d7a99ec066c8..c9a02f77c448d 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -49,6 +49,7 @@ import ( "github.com/grafana/loki/v3/pkg/ingester" "github.com/grafana/loki/v3/pkg/ingester-rf1/objstore" ingesterkafka "github.com/grafana/loki/v3/pkg/kafka/ingester" + kafka_tee "github.com/grafana/loki/v3/pkg/kafka/tee" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" @@ -321,6 +322,19 @@ func (t *Loki) initTenantConfigs() (_ services.Service, err error) { } func (t *Loki) initDistributor() (services.Service, error) { + var trackedTee distributor.TrackedTee + if t.Cfg.Ingester.KafkaIngestion.Enabled { + kafkaTee, err := kafka_tee.NewTee(t.Cfg.KafkaConfig, t.Cfg.MetricsNamespace, prometheus.DefaultRegisterer, util_log.Logger, t.partitionRing) + if err != nil { + return nil, err + } + if t.Cfg.Ingester.KafkaIngestion.Required { + trackedTee = distributor.WrapTrackedTee(trackedTee, kafkaTee) + } else { + t.Tee = distributor.WrapTee(t.Tee, kafkaTee) + } + } + var err error logger := log.With(util_log.Logger, "component", "distributor") t.distributor, err = distributor.New( @@ -332,6 +346,7 @@ func (t *Loki) initDistributor() (services.Service, error) { prometheus.DefaultRegisterer, t.Cfg.MetricsNamespace, t.Tee, + trackedTee, t.UsageTracker, logger, ) From 0fb76fdee6b09f45c5cbce128806f777ae9e434e Mon Sep 17 00:00:00 2001 From: Ben Clive Date: Thu, 19 Sep 2024 15:00:44 +0100 Subject: [PATCH 2/8] remove unused param --- pkg/kafka/ingester/ingester.go | 1 - pkg/kafka/tee/tee.go | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/kafka/ingester/ingester.go b/pkg/kafka/ingester/ingester.go index f54971e6540fd..ef356778e4390 100644 --- a/pkg/kafka/ingester/ingester.go +++ b/pkg/kafka/ingester/ingester.go @@ -41,7 +41,6 @@ var ( // Config for an ingester. type Config struct { Enabled bool `yaml:"enabled" doc:"description=Whether the kafka ingester is enabled."` - RequireAcks bool `yaml:"require_acks" doc:"description=Whether the distributor should wait for acks from this ingester before accepting writes."` LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty" doc:"description=Configures how the lifecycle of the ingester will operate and where it will register for discovery."` ShutdownMarkerPath string `yaml:"shutdown_marker_path"` FlushInterval time.Duration `yaml:"flush_interval" doc:"description=The interval at which the ingester will flush and commit offsets to Kafka. 
If not set, the default flush interval will be used."`" diff --git a/pkg/kafka/tee/tee.go b/pkg/kafka/tee/tee.go index 62cc8c4fdd9a4..3076ae836633c 100644 --- a/pkg/kafka/tee/tee.go +++ b/pkg/kafka/tee/tee.go @@ -115,6 +115,7 @@ func (t *Tee) Duplicate(tenant string, streams []distributor.KeyedStream) { // Parameters: // - tenant: The tenant identifier // - streams: A slice of KeyedStream to be duplicated +// - tracker: A tracker object that returns emits when all streams are ack'd or on first error. func (t *Tee) DuplicateWithTracking(tenant string, streams []distributor.KeyedStream, tracker *distributor.PushTracker) { for idx := range streams { go func(stream distributor.KeyedStream) { From ae3d2f65434421628ba9da6d1fa63cbb4085d681 Mon Sep 17 00:00:00 2001 From: Ben Clive Date: Fri, 20 Sep 2024 14:40:41 +0100 Subject: [PATCH 3/8] Make kafka writes a first-class component of the distributor --- pkg/distributor/distributor.go | 264 +++++++++++++++++++++------- pkg/distributor/distributor_test.go | 2 +- pkg/distributor/tee.go | 26 --- pkg/distributor/tee_test.go | 3 +- pkg/ingester/ingester.go | 1 - pkg/kafka/tee/tee.go | 199 --------------------- pkg/kafka/tee/tee_test.go | 50 ------ pkg/loki/loki.go | 3 + pkg/loki/modules.go | 16 +- 9 files changed, 210 insertions(+), 354 deletions(-) delete mode 100644 pkg/kafka/tee/tee.go delete mode 100644 pkg/kafka/tee/tee_test.go diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 4c7da078c3889..8bd13b4aae26c 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -19,6 +19,7 @@ import ( "github.com/go-kit/log/level" "github.com/gogo/status" "github.com/prometheus/prometheus/model/labels" + "github.com/twmb/franz-go/pkg/kgo" "go.opentelemetry.io/collector/pdata/plog" "google.golang.org/grpc/codes" @@ -44,6 +45,7 @@ import ( "github.com/grafana/loki/v3/pkg/distributor/writefailures" "github.com/grafana/loki/v3/pkg/ingester" "github.com/grafana/loki/v3/pkg/ingester/client" + "github.com/grafana/loki/v3/pkg/kafka" "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log/logfmt" @@ -88,6 +90,10 @@ type Config struct { WriteFailuresLogging writefailures.Cfg `yaml:"write_failures_logging" doc:"description=Customize the logging of write failures."` OTLPConfig push.GlobalOTLPConfig `yaml:"otlp_config"` + + KafkaEnabled bool `yaml:"kafka_writes_enabled"` + IngesterEnabled bool `yaml:"ingester_writes_enabled"` + KafkaConfig kafka.Config `yaml:"-"` } // RegisterFlags registers distributor-related flags. @@ -96,6 +102,16 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) { cfg.DistributorRing.RegisterFlags(fs) cfg.RateStore.RegisterFlagsWithPrefix("distributor.rate-store", fs) cfg.WriteFailuresLogging.RegisterFlagsWithPrefix("distributor.write-failures-logging", fs) + + fs.BoolVar(&cfg.KafkaEnabled, "distributor.kafka-writes-enabled", false, "Enable writes to Kafka during Push requests.") + fs.BoolVar(&cfg.IngesterEnabled, "distributor.ingester-writes-enabled", true, "Enable writes to Ingesters during Push requests. Defaults to true.") +} + +func (cfg *Config) Validate() error { + if !cfg.KafkaEnabled && !cfg.IngesterEnabled { + return fmt.Errorf("at least one of kafka and ingester writes must be enabled") + } + return nil } // RateStore manages the ingestion rate of streams, populated by data fetched from ingesters.
@@ -116,7 +132,6 @@ type Distributor struct { validator *Validator pool *ring_client.Pool tee Tee - trackedTee TrackedTee rateStore RateStore shardTracker *ShardTracker @@ -147,6 +162,16 @@ type Distributor struct { streamShardCount prometheus.Counter usageTracker push.UsageTracker + + // kafka + kafkaWriter *kafka.Producer + partitionRing ring.PartitionRingReader + + // kafka metrics + kafkaAppends *prometheus.CounterVec + kafkaWriteBytesTotal prometheus.Counter + kafkaWriteLatency prometheus.Histogram + kafkaRecordsPerRequest prometheus.Histogram } // New a distributor creates. @@ -155,11 +180,11 @@ func New( clientCfg client.Config, configs *runtime.TenantConfigs, ingestersRing ring.ReadRing, + partitionRing ring.PartitionRingReader, overrides Limits, registerer prometheus.Registerer, metricsNamespace string, tee Tee, - trackedTee TrackedTee, usageTracker push.UsageTracker, logger log.Logger, ) (*Distributor, error) { @@ -194,6 +219,20 @@ func New( return nil, err } + if partitionRing == nil && cfg.KafkaEnabled { + return nil, fmt.Errorf("partition ring is required for kafka writes") + } + + var kafkaWriter *kafka.Producer + if cfg.KafkaEnabled { + kafkaClient, err := kafka.NewWriterClient(cfg.KafkaConfig, 20, logger, registerer) + if err != nil { + return nil, fmt.Errorf("failed to start kafka client: %w", err) + } + kafkaWriter = kafka.NewProducer(kafkaClient, cfg.KafkaConfig.ProducerMaxBufferedBytes, + prometheus.WrapRegistererWithPrefix("_kafka_ingester_", registerer)) + } + d := &Distributor{ cfg: cfg, logger: logger, @@ -208,7 +247,6 @@ func New( healthyInstancesCount: atomic.NewUint32(0), rateLimitStrat: rateLimitStrat, tee: tee, - trackedTee: trackedTee, usageTracker: usageTracker, ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: constants.Loki, @@ -230,7 +268,30 @@ func New( Name: "stream_sharding_count", Help: "Total number of times the distributor has sharded streams", }), + kafkaAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Name: "kafka_appends_total", + Help: "The total number of appends sent to kafka ingest path.", + }, []string{"partition", "status"}), + kafkaWriteLatency: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ + Name: "kafka_latency_seconds", + Help: "Latency to write an incoming request to the ingest storage.", + NativeHistogramBucketFactor: 1.1, + NativeHistogramMinResetDuration: 1 * time.Hour, + NativeHistogramMaxBucketNumber: 100, + Buckets: prometheus.DefBuckets, + }), + kafkaWriteBytesTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "kafka_sent_bytes_total", + Help: "Total number of bytes sent to the ingest storage.", + }), + kafkaRecordsPerRequest: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ + Name: "kafka_records_per_write_request", + Help: "The number of records a single per-partition write request has been split into.", + Buckets: prometheus.ExponentialBuckets(1, 2, 8), + }), writeFailuresManager: writefailures.NewManager(logger, registerer, cfg.WriteFailuresLogging, configs, "distributor"), + kafkaWriter: kafkaWriter, + partitionRing: partitionRing, } if overrides.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy { @@ -315,11 +376,11 @@ type streamTracker struct { } // TODO taken from Cortex, see if we can refactor out an usable interface. 
-type PushTracker struct { - StreamsPending atomic.Int32 - StreamsFailed atomic.Int32 - Done chan struct{} - Err chan error +type pushTracker struct { + streamsPending atomic.Int32 + streamsFailed atomic.Int32 + done chan struct{} + err chan error } // Push a set of streams. @@ -491,65 +552,78 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck var descs [maxExpectedReplicationSet]ring.InstanceDesc - streamTrackers := make([]streamTracker, len(streams)) - streamsByIngester := map[string][]*streamTracker{} - ingesterDescs := map[string]ring.InstanceDesc{} + tracker := pushTracker{ + done: make(chan struct{}, 1), // buffer avoids blocking if caller terminates - sendSamples() only sends once on each + err: make(chan error, 1), + } + streamsToWrite := 0 + if d.cfg.IngesterEnabled { + streamsToWrite += len(streams) + } + if d.cfg.KafkaEnabled { + streamsToWrite += len(streams) + } + // We must correctly set streamsPending before beginning any writes to ensure we don't have a race between finishing all of one path before starting the other. + tracker.streamsPending.Store(int32(streamsToWrite)) - if err := func() error { - sp := opentracing.SpanFromContext(ctx) - if sp != nil { - sp.LogKV("event", "started to query ingesters ring") - defer func() { - sp.LogKV("event", "finished to query ingesters ring") - }() - } + if d.cfg.KafkaEnabled { + // We don't need to create a new context like the ingester writes, because we don't return unless all writes have succeeded. + d.sendStreamsToKafka(ctx, streams, tenantID, &tracker) + } - for i, stream := range streams { - replicationSet, err := d.ingestersRing.Get(stream.HashKey, ring.WriteNoExtend, descs[:0], nil, nil) - if err != nil { - return err - } + if d.cfg.IngesterEnabled { + streamTrackers := make([]streamTracker, len(streams)) + streamsByIngester := map[string][]*streamTracker{} + ingesterDescs := map[string]ring.InstanceDesc{} - streamTrackers[i] = streamTracker{ - KeyedStream: stream, - minSuccess: len(replicationSet.Instances) - replicationSet.MaxErrors, - maxFailures: replicationSet.MaxErrors, + if err := func() error { + sp := opentracing.SpanFromContext(ctx) + if sp != nil { + sp.LogKV("event", "started to query ingesters ring") + defer func() { + sp.LogKV("event", "finished to query ingesters ring") + }() } - for _, ingester := range replicationSet.Instances { - streamsByIngester[ingester.Addr] = append(streamsByIngester[ingester.Addr], &streamTrackers[i]) - ingesterDescs[ingester.Addr] = ingester + + for i, stream := range streams { + replicationSet, err := d.ingestersRing.Get(stream.HashKey, ring.WriteNoExtend, descs[:0], nil, nil) + if err != nil { + return err + } + + streamTrackers[i] = streamTracker{ + KeyedStream: stream, + minSuccess: len(replicationSet.Instances) - replicationSet.MaxErrors, + maxFailures: replicationSet.MaxErrors, + } + for _, ingester := range replicationSet.Instances { + streamsByIngester[ingester.Addr] = append(streamsByIngester[ingester.Addr], &streamTrackers[i]) + ingesterDescs[ingester.Addr] = ingester + } } + return nil + }(); err != nil { + return nil, err } - return nil - }(); err != nil { - return nil, err - } - tracker := PushTracker{ - Done: make(chan struct{}, 1), // buffer avoids blocking if caller terminates - sendSamples() only sends once on each - Err: make(chan error, 1), - } - if d.trackedTee != nil { - d.trackedTee.DuplicateWithTracking(tenantID, streams, 
&tracker) + for ingester, streams := range streamsByIngester { + go func(ingester ring.InstanceDesc, samples []*streamTracker) { + // Use a background context to make sure all ingesters get samples even if we return early + localCtx, cancel := context.WithTimeout(context.Background(), d.clientCfg.RemoteTimeout) + defer cancel() + localCtx = user.InjectOrgID(localCtx, tenantID) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + localCtx = opentracing.ContextWithSpan(localCtx, sp) + } + d.sendStreams(localCtx, ingester, samples, &tracker) + }(ingesterDescs[ingester], streams) + } } - tracker.StreamsPending.Add(int32(len(streams))) - for ingester, streams := range streamsByIngester { - go func(ingester ring.InstanceDesc, samples []*streamTracker) { - // Use a background context to make sure all ingesters get samples even if we return early - localCtx, cancel := context.WithTimeout(context.Background(), d.clientCfg.RemoteTimeout) - defer cancel() - localCtx = user.InjectOrgID(localCtx, tenantID) - if sp := opentracing.SpanFromContext(ctx); sp != nil { - localCtx = opentracing.ContextWithSpan(localCtx, sp) - } - d.sendStreams(localCtx, ingester, samples, &tracker) - }(ingesterDescs[ingester], streams) - } select { - case err := <-tracker.Err: + case err := <-tracker.err: return nil, err - case <-tracker.Done: + case <-tracker.done: return &logproto.PushResponse{}, validationErr case <-ctx.Done(): return nil, ctx.Err() @@ -734,7 +808,7 @@ func (d *Distributor) truncateLines(vContext validationContext, stream *logproto } // TODO taken from Cortex, see if we can refactor out an usable interface. -func (d *Distributor) sendStreams(ctx context.Context, ingester ring.InstanceDesc, streamTrackers []*streamTracker, pushTracker *PushTracker) { +func (d *Distributor) sendStreams(ctx context.Context, ingester ring.InstanceDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) { err := d.sendStreamsErr(ctx, ingester, streamTrackers) // If we succeed, decrement each stream's pending count by one. 
@@ -751,15 +825,15 @@ func (d *Distributor) sendStreams(ctx context.Context, ingester ring.InstanceDes if streamTrackers[i].failed.Inc() <= int32(streamTrackers[i].maxFailures) { continue } - if pushTracker.StreamsFailed.Inc() == 1 { - pushTracker.Err <- err + if pushTracker.streamsFailed.Inc() == 1 { + pushTracker.err <- err } } else { if streamTrackers[i].succeeded.Inc() != int32(streamTrackers[i].minSuccess) { continue } - if pushTracker.StreamsPending.Dec() == 0 { - pushTracker.Done <- struct{}{} + if pushTracker.streamsPending.Dec() == 0 { + pushTracker.done <- struct{}{} } } } @@ -792,6 +866,74 @@ func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.Instance return err } +func (d *Distributor) sendStreamsToKafka(ctx context.Context, streams []KeyedStream, tenant string, tracker *pushTracker) { + for _, s := range streams { + go func(s KeyedStream) { + err := d.sendStreamToKafka(ctx, s, tenant) + if err != nil { + if tracker.streamsFailed.Inc() == 1 { + tracker.err <- fmt.Errorf("failed to write stream to kafka: %w", err) + } + } else { + if tracker.streamsPending.Dec() == 0 { + tracker.done <- struct{}{} + } + } + }(s) + } +} + +func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream, tenant string) error { + if len(stream.Stream.Entries) == 0 { + return nil + } + partitionID, err := d.partitionRing.PartitionRing().ActivePartitionForKey(stream.HashKey) + if err != nil { + d.kafkaAppends.WithLabelValues("kafka", "fail").Inc() + return fmt.Errorf("failed to find active partition for stream: %w", err) + } + + startTime := time.Now() + + records, err := kafka.Encode(partitionID, tenant, stream.Stream, d.cfg.KafkaConfig.ProducerMaxRecordSizeBytes) + if err != nil { + d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc() + return fmt.Errorf("failed to marshal write request to records: %w", err) + } + + d.kafkaRecordsPerRequest.Observe(float64(len(records))) + + produceResults := d.kafkaWriter.ProduceSync(ctx, records) + + if count, sizeBytes := successfulProduceRecordsStats(produceResults); count > 0 { + d.kafkaWriteLatency.Observe(time.Since(startTime).Seconds()) + d.kafkaWriteBytesTotal.Add(float64(sizeBytes)) + } + + var finalErr error + for _, result := range produceResults { + if result.Err != nil { + d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc() + finalErr = err + } else { + d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "success").Inc() + } + } + + return finalErr +} + +func successfulProduceRecordsStats(results kgo.ProduceResults) (count, sizeBytes int) { + for _, res := range results { + if res.Err == nil && res.Record != nil { + count++ + sizeBytes += len(res.Record.Value) + } + } + + return +} + type labelData struct { ls labels.Labels hash uint64 diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index df199083cdd93..39df95efe91f7 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -1299,7 +1299,7 @@ func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation overrides, err := validation.NewOverrides(*limits, nil) require.NoError(t, err) - d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, nil, nil, log.NewNopLogger()) + d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, nil, overrides, 
prometheus.NewPedanticRegistry(), constants.Loki, nil, nil, log.NewNopLogger()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), d)) distributors[i] = d diff --git a/pkg/distributor/tee.go b/pkg/distributor/tee.go index 3ce3f514fc665..04acb1e22c0df 100644 --- a/pkg/distributor/tee.go +++ b/pkg/distributor/tee.go @@ -5,10 +5,6 @@ type Tee interface { Duplicate(tenant string, streams []KeyedStream) } -type TrackedTee interface { - DuplicateWithTracking(tenantID string, streams []KeyedStream, tracker *PushTracker) -} - // WrapTee wraps a new Tee around an existing Tee. func WrapTee(existing, new Tee) Tee { if existing == nil { @@ -29,25 +25,3 @@ func (m *multiTee) Duplicate(tenant string, streams []KeyedStream) { tee.Duplicate(tenant, streams) } } - -type multiTrackedTee struct { - tees []TrackedTee -} - -// WrapTrackedTee wraps a new TrackedTee around an existing TrackedTee. -func WrapTrackedTee(existing, new TrackedTee) TrackedTee { - if existing == nil { - return new - } - if multi, ok := existing.(*multiTrackedTee); ok { - return &multiTrackedTee{append(multi.tees, new)} - } - return &multiTrackedTee{tees: []TrackedTee{existing, new}} -} - -func (m *multiTrackedTee) DuplicateWithTracking(tenant string, streams []KeyedStream, tracker *PushTracker) { - tracker.StreamsPending.Add(int32(len(streams) * len(m.tees))) - for _, tee := range m.tees { - go tee.DuplicateWithTracking(tenant, streams, tracker) - } -} diff --git a/pkg/distributor/tee_test.go b/pkg/distributor/tee_test.go index f953e09b75111..ccb8e5c9c2365 100644 --- a/pkg/distributor/tee_test.go +++ b/pkg/distributor/tee_test.go @@ -3,9 +3,8 @@ package distributor import ( "testing" - "github.com/stretchr/testify/mock" - "github.com/grafana/loki/pkg/push" + "github.com/stretchr/testify/mock" ) type mockedTee struct { diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index dbb1ccb665216..9c913f9049f44 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -193,7 +193,6 @@ func (cfg *Config) Validate() error { type KafkaIngestionConfig struct { Enabled bool `yaml:"enabled" doc:"description=Whether the kafka ingester is enabled."` - Required bool `yaml:"required" doc:"description=Whether ingestion to kafka is required to succeed during a Push request or if they are best effort."` PartitionRingConfig partitionring.Config `yaml:"partition_ring" category:"experimental"` KafkaConfig kafka.Config `yaml:"-"` } diff --git a/pkg/kafka/tee/tee.go b/pkg/kafka/tee/tee.go deleted file mode 100644 index 3076ae836633c..0000000000000 --- a/pkg/kafka/tee/tee.go +++ /dev/null @@ -1,199 +0,0 @@ -package tee - -import ( - "context" - "fmt" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/grafana/dskit/ring" - "github.com/grafana/dskit/user" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/twmb/franz-go/pkg/kgo" - - "github.com/grafana/loki/v3/pkg/distributor" - "github.com/grafana/loki/v3/pkg/kafka" -) - -const writeTimeout = time.Minute - -// Tee represents a component that duplicates log streams to Kafka. -type Tee struct { - logger log.Logger - producer *kafka.Producer - partitionRing ring.PartitionRingReader - cfg kafka.Config - - ingesterAppends *prometheus.CounterVec - writeLatency prometheus.Histogram - writeBytesTotal prometheus.Counter - recordsPerRequest prometheus.Histogram -} - -// NewTee creates and initializes a new Tee instance. 
-// -// Parameters: -// - cfg: Kafka configuration -// - metricsNamespace: Namespace for Prometheus metrics -// - registerer: Prometheus registerer for metrics -// - logger: Logger instance -// - partitionRing: Ring for managing partitions -// -// Returns: -// - A new Tee instance and any error encountered during initialization -func NewTee( - cfg kafka.Config, - metricsNamespace string, - registerer prometheus.Registerer, - logger log.Logger, - partitionRing ring.PartitionRingReader, -) (*Tee, error) { - registerer = prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer) - - kafkaClient, err := kafka.NewWriterClient(cfg, 20, logger, registerer) - if err != nil { - return nil, fmt.Errorf("failed to start kafka client: %w", err) - } - producer := kafka.NewProducer(kafkaClient, cfg.ProducerMaxBufferedBytes, - prometheus.WrapRegistererWithPrefix("_kafka_ingester_", registerer)) - - t := &Tee{ - logger: log.With(logger, "component", "kafka-tee"), - ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ - Name: "kafka_ingester_appends_total", - Help: "The total number of appends sent to kafka ingest path.", - }, []string{"partition", "status"}), - producer: producer, - partitionRing: partitionRing, - cfg: cfg, - // Metrics. - writeLatency: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ - Name: "kafka_ingester_tee_latency_seconds", - Help: "Latency to write an incoming request to the ingest storage.", - NativeHistogramBucketFactor: 1.1, - NativeHistogramMinResetDuration: 1 * time.Hour, - NativeHistogramMaxBucketNumber: 100, - Buckets: prometheus.DefBuckets, - }), - writeBytesTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ - Name: "kafka_ingester_tee_sent_bytes_total", - Help: "Total number of bytes sent to the ingest storage.", - }), - recordsPerRequest: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ - Name: "kafka_ingester_tee_records_per_write_request", - Help: "The number of records a single per-partition write request has been split into.", - Buckets: prometheus.ExponentialBuckets(1, 2, 8), - }), - } - - return t, nil -} - -// Duplicate implements the distributor.Tee interface, which is used to duplicate -// distributor requests to different destinations. It asynchronously sends each stream -// to Kafka. -// -// Parameters: -// - tenant: The tenant identifier -// - streams: A slice of KeyedStream to be duplicated -func (t *Tee) Duplicate(tenant string, streams []distributor.KeyedStream) { - for idx := range streams { - go func(stream distributor.KeyedStream) { - if err := t.sendStream(tenant, stream); err != nil { - level.Error(t.logger).Log("msg", "failed to send stream to kafka", "err", err) - } - }(streams[idx]) - } -} - -// DuplicateWithTracking implements the distributor.TrackedTee interface, which is used to duplicate -// distributor requests to different destinations. It sends each stream -// to Kafka and requires all streams to be ack'd to the tracker. -// -// Parameters: -// - tenant: The tenant identifier -// - streams: A slice of KeyedStream to be duplicated -// - tracker: A tracker object that returns emits when all streams are ack'd or on first error. 
-func (t *Tee) DuplicateWithTracking(tenant string, streams []distributor.KeyedStream, tracker *distributor.PushTracker) { - for idx := range streams { - go func(stream distributor.KeyedStream) { - err := t.sendStream(tenant, stream) - if err != nil { - if tracker.StreamsFailed.Inc() == 1 { - tracker.Err <- err - } - } else { - if tracker.StreamsPending.Dec() == 0 { - tracker.Done <- struct{}{} - } - } - }(streams[idx]) - } -} - -func (t *Tee) Close() { - t.producer.Close() -} - -// sendStream sends a single stream to Kafka. -// -// Parameters: -// - tenant: The tenant identifier -// - stream: The KeyedStream to be sent -// -// Returns: -// - An error if the stream couldn't be sent successfully -func (t *Tee) sendStream(tenant string, stream distributor.KeyedStream) error { - if len(stream.Stream.Entries) == 0 { - return nil - } - partitionID, err := t.partitionRing.PartitionRing().ActivePartitionForKey(stream.HashKey) - if err != nil { - t.ingesterAppends.WithLabelValues("partition_unknown", "fail").Inc() - return fmt.Errorf("failed to find active partition for stream: %w", err) - } - - startTime := time.Now() - - records, err := kafka.Encode(partitionID, tenant, stream.Stream, t.cfg.ProducerMaxRecordSizeBytes) - if err != nil { - t.ingesterAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc() - return fmt.Errorf("failed to marshal write request to records: %w", err) - } - - t.recordsPerRequest.Observe(float64(len(records))) - - ctx, cancel := context.WithTimeout(user.InjectOrgID(context.Background(), tenant), writeTimeout) - defer cancel() - produceResults := t.producer.ProduceSync(ctx, records) - - if count, sizeBytes := successfulProduceRecordsStats(produceResults); count > 0 { - t.writeLatency.Observe(time.Since(startTime).Seconds()) - t.writeBytesTotal.Add(float64(sizeBytes)) - } - - var finalErr error - for _, result := range produceResults { - if result.Err != nil { - t.ingesterAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc() - finalErr = err - } else { - t.ingesterAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "success").Inc() - } - } - - return finalErr -} - -func successfulProduceRecordsStats(results kgo.ProduceResults) (count, sizeBytes int) { - for _, res := range results { - if res.Err == nil && res.Record != nil { - count++ - sizeBytes += len(res.Record.Value) - } - } - - return -} diff --git a/pkg/kafka/tee/tee_test.go b/pkg/kafka/tee/tee_test.go deleted file mode 100644 index 2431f42033fc7..0000000000000 --- a/pkg/kafka/tee/tee_test.go +++ /dev/null @@ -1,50 +0,0 @@ -package tee - -import ( - "os" - "testing" - "time" - - "github.com/go-kit/log" - "github.com/grafana/dskit/ring" - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/require" - - "github.com/grafana/loki/v3/pkg/distributor" - "github.com/grafana/loki/v3/pkg/kafka/testkafka" - - "github.com/grafana/loki/pkg/push" -) - -func TestPushKafkaRecords(t *testing.T) { - _, cfg := testkafka.CreateCluster(t, 1, "topic") - tee, err := NewTee(cfg, "test", prometheus.NewRegistry(), log.NewLogfmtLogger(os.Stdout), newTestPartitionRing()) - require.NoError(t, err) - - err = tee.sendStream("test", distributor.KeyedStream{ - HashKey: 1, - Stream: push.Stream{ - Labels: `{foo="bar"}`, - Entries: []push.Entry{ - {Timestamp: time.Now(), Line: "test"}, - }, - }, - }) - require.NoError(t, err) -} - -type testPartitionRing struct { - partitionRing *ring.PartitionRing -} - -func (t *testPartitionRing) PartitionRing() 
*ring.PartitionRing { - return t.partitionRing -} - -func newTestPartitionRing() ring.PartitionRingReader { - desc := ring.NewPartitionRingDesc() - desc.AddPartition(0, ring.PartitionActive, time.Now()) - return &testPartitionRing{ - partitionRing: ring.NewPartitionRing(*desc), - } -} diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 467c03eb0a477..20ea802f12ebb 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -300,6 +300,9 @@ func (c *Config) Validate() error { errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid kafka_config config")) } } + if err := c.Distributor.Validate(); err != nil { + errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid distributor config")) + } errs = append(errs, validateSchemaValues(c)...) errs = append(errs, ValidateConfigCompatibility(*c)...) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index c9a02f77c448d..d256ffbd00966 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -49,7 +49,6 @@ import ( "github.com/grafana/loki/v3/pkg/ingester" "github.com/grafana/loki/v3/pkg/ingester-rf1/objstore" ingesterkafka "github.com/grafana/loki/v3/pkg/kafka/ingester" - kafka_tee "github.com/grafana/loki/v3/pkg/kafka/tee" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" @@ -322,18 +321,7 @@ func (t *Loki) initTenantConfigs() (_ services.Service, err error) { } func (t *Loki) initDistributor() (services.Service, error) { - var trackedTee distributor.TrackedTee - if t.Cfg.Ingester.KafkaIngestion.Enabled { - kafkaTee, err := kafka_tee.NewTee(t.Cfg.KafkaConfig, t.Cfg.MetricsNamespace, prometheus.DefaultRegisterer, util_log.Logger, t.partitionRing) - if err != nil { - return nil, err - } - if t.Cfg.Ingester.KafkaIngestion.Required { - trackedTee = distributor.WrapTrackedTee(trackedTee, kafkaTee) - } else { - t.Tee = distributor.WrapTee(t.Tee, kafkaTee) - } - } + t.Cfg.Distributor.KafkaConfig = t.Cfg.KafkaConfig var err error logger := log.With(util_log.Logger, "component", "distributor") @@ -342,11 +330,11 @@ func (t *Loki) initDistributor() (services.Service, error) { t.Cfg.IngesterClient, t.tenantConfigs, t.ring, + t.partitionRing, t.Overrides, prometheus.DefaultRegisterer, t.Cfg.MetricsNamespace, t.Tee, - trackedTee, t.UsageTracker, logger, ) From 6228141e8fcbfbea65f350d5b3c265a523787ac7 Mon Sep 17 00:00:00 2001 From: Ben Clive Date: Fri, 20 Sep 2024 16:37:31 +0100 Subject: [PATCH 4/8] Add tests for pushing to Kafka --- pkg/distributor/distributor.go | 14 +++- pkg/distributor/distributor_test.go | 125 +++++++++++++++++++++++++++- 2 files changed, 133 insertions(+), 6 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 8bd13b4aae26c..e8eb62df9154a 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -119,6 +119,11 @@ type RateStore interface { RateFor(tenantID string, streamHash uint64) (int64, float64) } +type KafkaProducer interface { + ProduceSync(ctx context.Context, records []*kgo.Record) kgo.ProduceResults + Close() +} + // Distributor coordinates replicates and distribution of log streams. 
type Distributor struct { services.Service @@ -164,7 +169,7 @@ type Distributor struct { usageTracker push.UsageTracker // kafka - kafkaWriter *kafka.Producer + kafkaWriter KafkaProducer partitionRing ring.PartitionRingReader // kafka metrics @@ -223,7 +228,7 @@ func New( return nil, fmt.Errorf("partition ring is required for kafka writes") } - var kafkaWriter *kafka.Producer + var kafkaWriter KafkaProducer if cfg.KafkaEnabled { kafkaClient, err := kafka.NewWriterClient(cfg.KafkaConfig, 20, logger, registerer) if err != nil { @@ -358,6 +363,9 @@ func (d *Distributor) running(ctx context.Context) error { } func (d *Distributor) stopping(_ error) error { + if d.kafkaWriter != nil { + d.kafkaWriter.Close() + } return services.StopManagerAndAwaitStopped(context.Background(), d.subservices) } @@ -914,7 +922,7 @@ func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream, for _, result := range produceResults { if result.Err != nil { d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc() - finalErr = err + finalErr = result.Err } else { d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "success").Inc() } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 39df95efe91f7..4c43fb6d4cf94 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -26,6 +26,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kgo" "go.opentelemetry.io/collector/pdata/plog" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" @@ -505,6 +506,76 @@ func TestDistributorPushErrors(t *testing.T) { }) } +func TestDistributorPushToKafka(t *testing.T) { + limits := &validation.Limits{} + flagext.DefaultValues(limits) + + t.Run("with kafka, any failure fails the request", func(t *testing.T) { + kafkaWriter := &mockKafkaWriter{ + failOnWrite: true, + } + distributors, _ := prepare(t, 1, 0, limits, nil) + for _, d := range distributors { + d.cfg.KafkaEnabled = true + d.cfg.IngesterEnabled = false + d.cfg.KafkaConfig.ProducerMaxRecordSizeBytes = 1000 + d.kafkaWriter = kafkaWriter + } + + request := makeWriteRequest(10, 64) + _, err := distributors[0].Push(ctx, request) + require.Error(t, err) + }) + + t.Run("with kafka, no failures is successful", func(t *testing.T) { + kafkaWriter := &mockKafkaWriter{ + failOnWrite: false, + } + distributors, _ := prepare(t, 1, 0, limits, nil) + for _, d := range distributors { + d.cfg.KafkaEnabled = true + d.cfg.IngesterEnabled = false + d.cfg.KafkaConfig.ProducerMaxRecordSizeBytes = 1000 + d.kafkaWriter = kafkaWriter + } + + request := makeWriteRequest(10, 64) + _, err := distributors[0].Push(ctx, request) + require.NoError(t, err) + + require.Equal(t, 1, kafkaWriter.pushed) + }) + + t.Run("with kafka and ingesters, both must complete", func(t *testing.T) { + kafkaWriter := &mockKafkaWriter{ + failOnWrite: false, + } + distributors, ingesters := prepare(t, 1, 3, limits, nil) + ingesters[0].succeedAfter = 5 * time.Millisecond + ingesters[1].succeedAfter = 10 * time.Millisecond + ingesters[2].succeedAfter = 15 * time.Millisecond + + for _, d := range distributors { + d.cfg.KafkaEnabled = true + d.cfg.IngesterEnabled = true + d.cfg.KafkaConfig.ProducerMaxRecordSizeBytes = 1000 + d.kafkaWriter = kafkaWriter + } + + request := makeWriteRequest(10, 64) + _, err := distributors[0].Push(ctx, request) + require.NoError(t, 
err) + + require.Equal(t, 1, kafkaWriter.pushed) + + require.Equal(t, 1, len(ingesters[0].pushed)) + require.Equal(t, 1, len(ingesters[1].pushed)) + require.Eventually(t, func() bool { + return len(ingesters[2].pushed) == 1 + }, time.Second, 10*time.Millisecond) + }) +} + func Test_SortLabelsOnPush(t *testing.T) { t.Run("with service_name already present in labels", func(t *testing.T) { limits := &validation.Limits{} @@ -1270,9 +1341,26 @@ func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingestersRing)) - test.Poll(t, time.Second, numIngesters, func() interface{} { - return ingestersRing.InstancesCount() + partitionRing := ring.NewPartitionRing(ring.PartitionRingDesc{ + Partitions: map[int32]ring.PartitionDesc{ + 1: { + Id: 1, + Tokens: []uint32{1}, + State: ring.PartitionActive, + StateTimestamp: time.Now().Unix(), + }, + }, + Owners: map[string]ring.OwnerDesc{ + "test": { + OwnedPartition: 1, + State: ring.OwnerActive, + UpdatedTimestamp: time.Now().Unix(), + }, + }, }) + partitionRingReader := mockPartitionRingReader{ + ring: partitionRing, + } loopbackName, err := loki_net.LoopbackInterfaceName() require.NoError(t, err) @@ -1299,7 +1387,7 @@ func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation overrides, err := validation.NewOverrides(*limits, nil) require.NoError(t, err) - d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, nil, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, nil, log.NewNopLogger()) + d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, partitionRingReader, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, nil, log.NewNopLogger()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), d)) distributors[i] = d @@ -1373,6 +1461,37 @@ func makeWriteRequest(lines, size int) *logproto.PushRequest { return makeWriteRequestWithLabels(lines, size, []string{`{foo="bar"}`}) } +type mockKafkaWriter struct { + failOnWrite bool + pushed int +} + +func (m *mockKafkaWriter) ProduceSync(_ context.Context, _ []*kgo.Record) kgo.ProduceResults { + if m.failOnWrite { + return kgo.ProduceResults{ + { + Err: kgo.ErrRecordTimeout, + }, + } + } + m.pushed += 1 + return kgo.ProduceResults{ + { + Err: nil, + }, + } +} + +func (m *mockKafkaWriter) Close() {} + +type mockPartitionRingReader struct { + ring *ring.PartitionRing +} + +func (m mockPartitionRingReader) PartitionRing() *ring.PartitionRing { + return m.ring +} + type mockIngester struct { grpc_health_v1.HealthClient logproto.PusherClient From 173c230c6aad9c1df2436690c4370e7023d4636a Mon Sep 17 00:00:00 2001 From: Ben Clive Date: Fri, 20 Sep 2024 16:56:20 +0100 Subject: [PATCH 5/8] docs --- docs/sources/shared/configuration.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index d50669016356c..6df1c5bd8cbc7 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -2251,6 +2251,14 @@ otlp_config: # List of default otlp resource attributes to be picked as index labels # CLI flag: -distributor.otlp.default_resource_attributes_as_index_labels [default_resource_attributes_as_index_labels: | default = [service.name service.namespace service.instance.id deployment.environment cloud.region cloud.availability_zone 
k8s.cluster.name k8s.namespace.name k8s.pod.name k8s.container.name container.name k8s.replicaset.name k8s.deployment.name k8s.statefulset.name k8s.daemonset.name k8s.cronjob.name k8s.job.name]] + +# Enable writes to Kafka during Push requests. +# CLI flag: -distributor.kafka-writes-enabled +[kafka_writes_enabled: <boolean> | default = false] + +# Enable writes to Ingesters during Push requests. Defaults to true. +# CLI flag: -distributor.ingester-writes-enabled +[ingester_writes_enabled: <boolean> | default = true] ``` ### etcd From 3602877f7241368ce1bea1e1151ae42df381e15b Mon Sep 17 00:00:00 2001 From: Ben Clive Date: Fri, 20 Sep 2024 17:36:20 +0100 Subject: [PATCH 6/8] lint --- pkg/distributor/distributor_test.go | 2 +- pkg/distributor/tee_test.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 4c43fb6d4cf94..cebd46858e177 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -1474,7 +1474,7 @@ func (m *mockKafkaWriter) ProduceSync(_ context.Context, _ []*kgo.Record) kgo.Pr }, } } - m.pushed += 1 + m.pushed++ return kgo.ProduceResults{ { Err: nil, }, } } diff --git a/pkg/distributor/tee_test.go b/pkg/distributor/tee_test.go index ccb8e5c9c2365..f953e09b75111 100644 --- a/pkg/distributor/tee_test.go +++ b/pkg/distributor/tee_test.go @@ -3,8 +3,9 @@ package distributor import ( "testing" - "github.com/grafana/loki/pkg/push" "github.com/stretchr/testify/mock" + + "github.com/grafana/loki/pkg/push" ) type mockedTee struct { From 7f082e2eb5205af78f8322152e92007c00549767 Mon Sep 17 00:00:00 2001 From: Ben Clive Date: Mon, 23 Sep 2024 10:34:36 +0100 Subject: [PATCH 7/8] rewrite pushtracker to make logic clearer --- pkg/distributor/distributor.go | 43 +++++++++++++++++++++++++++------------------ 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index e8eb62df9154a..fc325ae86b6ee 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -391,6 +391,21 @@ type pushTracker struct { err chan error } +// doneWithResult records the result of a stream push. +// If err is nil, the stream push is considered successful. +// If err is not nil, the stream push is considered failed. +func (p *pushTracker) doneWithResult(err error) { + if err == nil { + if p.streamsPending.Dec() == 0 { + p.done <- struct{}{} + } + } else { + if p.streamsFailed.Inc() == 1 { + p.err <- err + } + } +} + // Push a set of streams. // The returned error is the last one seen.
func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) { @@ -833,16 +848,12 @@ func (d *Distributor) sendStreams(ctx context.Context, ingester ring.InstanceDes if streamTrackers[i].failed.Inc() <= int32(streamTrackers[i].maxFailures) { continue } - if pushTracker.streamsFailed.Inc() == 1 { - pushTracker.err <- err - } + pushTracker.doneWithResult(err) } else { if streamTrackers[i].succeeded.Inc() != int32(streamTrackers[i].minSuccess) { continue } - if pushTracker.streamsPending.Dec() == 0 { - pushTracker.done <- struct{}{} - } + pushTracker.doneWithResult(nil) } } } @@ -879,14 +890,9 @@ func (d *Distributor) sendStreamsToKafka(ctx context.Context, streams []KeyedStr go func(s KeyedStream) { err := d.sendStreamToKafka(ctx, s, tenant) if err != nil { - if tracker.streamsFailed.Inc() == 1 { - tracker.err <- fmt.Errorf("failed to write stream to kafka: %w", err) - } - } else { - if tracker.streamsPending.Dec() == 0 { - tracker.done <- struct{}{} - } + err = fmt.Errorf("failed to write stream to kafka: %w", err) } + tracker.doneWithResult(err) }(s) } } @@ -895,11 +901,12 @@ func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream, if len(stream.Stream.Entries) == 0 { return nil } - partitionID, err := d.partitionRing.PartitionRing().ActivePartitionForKey(stream.HashKey) - if err != nil { - d.kafkaAppends.WithLabelValues("kafka", "fail").Inc() - return fmt.Errorf("failed to find active partition for stream: %w", err) - } + /* partitionID, err := d.partitionRing.PartitionRing().ActivePartitionForKey(stream.HashKey) + if err != nil { + d.kafkaAppends.WithLabelValues("kafka", "fail").Inc() + return fmt.Errorf("failed to find active partition for stream: %w", err) + }*/ + partitionID := int32(0) startTime := time.Now() From 4397e497d8a027fa0d18487dd269b2b366f336a4 Mon Sep 17 00:00:00 2001 From: Ben Clive Date: Mon, 23 Sep 2024 10:41:22 +0100 Subject: [PATCH 8/8] rename metric prefix to kafka --- pkg/distributor/distributor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index fc325ae86b6ee..08fba483ec9bc 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -235,7 +235,7 @@ func New( return nil, fmt.Errorf("failed to start kafka client: %w", err) } kafkaWriter = kafka.NewProducer(kafkaClient, cfg.KafkaConfig.ProducerMaxBufferedBytes, - prometheus.WrapRegistererWithPrefix("_kafka_ingester_", registerer)) + prometheus.WrapRegistererWithPrefix("_kafka_", registerer)) } d := &Distributor{
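The series above removes the Kafka tee and makes Kafka writes a distributor-level concern controlled by two new flags. Below is a minimal sketch of how the end state might be configured, assuming the new keys sit under the `distributor` block as documented in PATCH 5/8 and that the shared `kafka_config` block (copied into the distributor config in `initDistributor`) is configured separately and not shown here.

```yaml
# Minimal sketch: enable synchronous Kafka writes alongside the ingester path.
# When both are enabled, the push tracker counts every stream once per path,
# so a Push request only succeeds after the Kafka produce and the ingester
# quorum have both acked, and a failed Kafka produce fails the request.
distributor:
  kafka_writes_enabled: true     # CLI: -distributor.kafka-writes-enabled (default false)
  ingester_writes_enabled: true  # CLI: -distributor.ingester-writes-enabled (default true)

# Disabling both paths is rejected by the new Config.Validate():
#   "at least one of kafka and ingester writes must be enabled"
```

Because the produce now sits on the synchronous Push path, Kafka produce latency (surfaced by the new kafka_latency_seconds histogram) adds directly to push request latency whenever kafka_writes_enabled is set.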