feat(kafka): Add Ingestion from Kafka in Ingesters #14192

Merged · 13 commits · Sep 24, 2024
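In short, this change teaches ingesters to consume log streams directly from Kafka: each ingester derives a partition ID from its instance ID, registers it in a new partition ring, and runs a partition reader whose consumer decodes records and pushes them through the ingester's existing Push path. As a rough orientation before the diff, here is a minimal sketch of the consumer-side kafka.Config knobs this PR touches; it uses only fields visible in the diff below, and the broker address and consumer group value are placeholders rather than values from this PR.

package example

import (
	"time"

	"github.com/grafana/loki/v3/pkg/kafka"
)

// exampleConsumerConfig sketches the consumer-side settings added or used by this PR.
func exampleConsumerConfig() kafka.Config {
	return kafka.Config{
		Address: "kafka:9092", // placeholder broker address

		// "<partition>" is replaced with the partition ID owned by the ingester;
		// leaving the group empty derives a unique group from the instance ID instead.
		ConsumerGroup: "loki-ingest-<partition>", // placeholder group name

		// New in this PR: how often the consumed offset is committed back to Kafka.
		ConsumerGroupOffsetCommitInterval: time.Second,
	}
}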
34 changes: 34 additions & 0 deletions pkg/ingester/ingester.go
@@ -15,6 +15,7 @@ import (
"time"

"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/partition"
"github.com/grafana/loki/v3/pkg/kafka/partitionring"
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logqlmodel/metadata"
@@ -26,6 +27,7 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/modules"
"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/ring"
@@ -293,6 +295,10 @@ type Ingester struct {
// recalculateOwnedStreams periodically checks the ring for changes and recalculates owned streams for each instance.
readRing ring.ReadRing
recalculateOwnedStreams *recalculateOwnedStreams

ingestPartitionID int32
partitionRingLifecycler *ring.PartitionInstanceLifecycler
partitionReader *partition.Reader
}

// New makes a new Ingester.
@@ -356,6 +362,34 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
i.lifecyclerWatcher = services.NewFailureWatcher()
i.lifecyclerWatcher.WatchService(i.lifecycler)

if i.cfg.KafkaIngestion.Enabled {
i.ingestPartitionID, err = partitionring.ExtractIngesterPartitionID(cfg.LifecyclerConfig.ID)
if err != nil {
return nil, fmt.Errorf("calculating ingester partition ID: %w", err)
}
partitionRingKV := cfg.KafkaIngestion.PartitionRingConfig.KVStore.Mock
if partitionRingKV == nil {
partitionRingKV, err = kv.NewClient(cfg.KafkaIngestion.PartitionRingConfig.KVStore, ring.GetPartitionRingCodec(), kv.RegistererWithKVName(registerer, PartitionRingName+"-lifecycler"), logger)
if err != nil {
return nil, fmt.Errorf("creating KV store for ingester partition ring: %w", err)
}
}
i.partitionRingLifecycler = ring.NewPartitionInstanceLifecycler(
i.cfg.KafkaIngestion.PartitionRingConfig.ToLifecyclerConfig(i.ingestPartitionID, cfg.LifecyclerConfig.ID),
PartitionRingName,
PartitionRingKey,
partitionRingKV,
logger,
prometheus.WrapRegistererWithPrefix("loki_", registerer))

i.partitionReader, err = partition.NewReader(cfg.KafkaIngestion.KafkaConfig, i.ingestPartitionID, cfg.LifecyclerConfig.ID, NewKafkaConsumerFactory(i, logger), logger, registerer)
if err != nil {
return nil, err
}
i.lifecyclerWatcher.WatchService(i.partitionRingLifecycler)
i.lifecyclerWatcher.WatchService(i.partitionReader)
}

// Now that the lifecycler has been created, we can create the limiter
// which depends on it.
i.limiter = NewLimiter(limits, metrics, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)
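The partition ID wiring above relies on partitionring.ExtractIngesterPartitionID, which is not part of this diff. The sketch below is a hypothetical stand-in that illustrates the assumed convention (the trailing integer of the instance ID becomes the owned partition); it is not the real helper.

package example

import (
	"fmt"
	"strconv"
	"strings"
)

// extractPartitionID is an illustrative stand-in for the assumed behaviour of
// partitionring.ExtractIngesterPartitionID: the ingester whose instance ID ends
// in "-N" (e.g. "ingester-zone-a-3") owns partition N.
func extractPartitionID(instanceID string) (int32, error) {
	idx := strings.LastIndex(instanceID, "-")
	if idx < 0 || idx == len(instanceID)-1 {
		return 0, fmt.Errorf("instance ID %q has no trailing sequence number", instanceID)
	}
	seq, err := strconv.Atoi(instanceID[idx+1:])
	if err != nil {
		return 0, fmt.Errorf("parsing sequence number of %q: %w", instanceID, err)
	}
	return int32(seq), nil
}

Under that convention a StatefulSet-style instance name maps each ingester to exactly one Kafka partition, which is the partition the lifecycler above registers in the partition ring.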
80 changes: 80 additions & 0 deletions pkg/ingester/kafka_consumer.go
@@ -0,0 +1,80 @@
package ingester

import (
"context"
"math"
"sync"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/user"
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/partition"
"github.com/grafana/loki/v3/pkg/logproto"
)

// NewKafkaConsumerFactory returns a partition.ConsumerFactory that builds consumers
// decoding Kafka records and pushing the resulting streams into the given pusher.
func NewKafkaConsumerFactory(pusher logproto.PusherServer, logger log.Logger) partition.ConsumerFactory {
return func(committer partition.Committer) (partition.Consumer, error) {
decoder, err := kafka.NewDecoder()
if err != nil {
return nil, err
}
return &kafkaConsumer{
pusher: pusher,
logger: logger,
decoder: decoder,
}, nil
}
}

type kafkaConsumer struct {
pusher logproto.PusherServer
logger log.Logger
decoder *kafka.Decoder
}

// Start launches a goroutine that consumes batches from recordsChan until ctx is
// cancelled, and returns a function that blocks until that goroutine has exited.
func (kc *kafkaConsumer) Start(ctx context.Context, recordsChan <-chan []partition.Record) func() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
level.Info(kc.logger).Log("msg", "shutting down kafka consumer")
return
case records := <-recordsChan:
kc.consume(records)
}
}
}()
return wg.Wait
}

// consume decodes each record into a stream and pushes it to the pusher, injecting
// the record's tenant ID into the request context.
func (kc *kafkaConsumer) consume(records []partition.Record) {
if len(records) == 0 {
return
}
var (
minOffset = int64(math.MaxInt64)
maxOffset = int64(0)
)
for _, record := range records {
minOffset = min(minOffset, record.Offset)
maxOffset = max(maxOffset, record.Offset)
}
level.Debug(kc.logger).Log("msg", "consuming records", "min_offset", minOffset, "max_offset", maxOffset)
for _, record := range records {
stream, err := kc.decoder.DecodeWithoutLabels(record.Content)
if err != nil {
level.Error(kc.logger).Log("msg", "failed to decode record", "error", err)
continue
}
ctx := user.InjectOrgID(record.Ctx, record.TenantID)
if _, err := kc.pusher.Push(ctx, &logproto.PushRequest{
Streams: []logproto.Stream{stream},
}); err != nil {
level.Error(kc.logger).Log("msg", "failed to push records", "error", err)
}
}
}
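A minimal usage sketch of the factory above, outside the ingester. The stub pusher and no-op committer are assumptions for illustration: in the real wiring the ingester itself is the logproto.PusherServer and partition.Reader supplies the committer. The sketch also assumes partition.Committer carries the single Commit method shown in pkg/kafka/ingester/consumer.go below, and that partition.Consumer exposes the Start method used here.

package main

import (
	"context"

	"github.com/go-kit/log"

	"github.com/grafana/loki/v3/pkg/ingester"
	"github.com/grafana/loki/v3/pkg/kafka/partition"
	"github.com/grafana/loki/v3/pkg/logproto"
)

// noopCommitter discards offsets; assumed to satisfy partition.Committer.
type noopCommitter struct{}

func (noopCommitter) Commit(_ context.Context, _ int64) error { return nil }

// stubPusher stands in for the ingester's logproto.PusherServer implementation.
type stubPusher struct{}

func (stubPusher) Push(_ context.Context, _ *logproto.PushRequest) (*logproto.PushResponse, error) {
	return &logproto.PushResponse{}, nil
}

func main() {
	logger := log.NewNopLogger()

	consumer, err := ingester.NewKafkaConsumerFactory(stubPusher{}, logger)(noopCommitter{})
	if err != nil {
		panic(err)
	}

	records := make(chan []partition.Record)
	ctx, cancel := context.WithCancel(context.Background())

	// Start returns a wait function; records would normally be fed by partition.Reader.
	wait := consumer.Start(ctx, records)

	cancel()
	wait()
}

Returning wg.Wait from Start lets the caller block until in-flight batches are drained after cancelling the context.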
48 changes: 44 additions & 4 deletions pkg/kafka/config.go
@@ -1,12 +1,18 @@
package kafka

import (
"context"
"errors"
"flag"
"fmt"
"strconv"
"strings"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
)

const (
@@ -52,12 +58,13 @@ type Config struct {
DialTimeout time.Duration `yaml:"dial_timeout"`
WriteTimeout time.Duration `yaml:"write_timeout"`

ConsumerGroup string `yaml:"consumer_group"`
ConsumerGroup string `yaml:"consumer_group"`
ConsumerGroupOffsetCommitInterval time.Duration `yaml:"consumer_group_offset_commit_interval"`

LastProducedOffsetRetryTimeout time.Duration `yaml:"last_produced_offset_retry_timeout"`

AutoCreateTopicEnabled bool `yaml:"auto_create_topic_enabled"`
// AutoCreateTopicDefaultPartitions int `yaml:"auto_create_topic_default_partitions"`
AutoCreateTopicEnabled bool `yaml:"auto_create_topic_enabled"`
AutoCreateTopicDefaultPartitions int `yaml:"auto_create_topic_default_partitions"`

ProducerMaxRecordSizeBytes int `yaml:"producer_max_record_size_bytes"`
ProducerMaxBufferedBytes int64 `yaml:"producer_max_buffered_bytes"`
@@ -75,11 +82,12 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.WriteTimeout, prefix+".write-timeout", 10*time.Second, "How long to wait for an incoming write request to be successfully committed to the Kafka backend.")

f.StringVar(&cfg.ConsumerGroup, prefix+".consumer-group", "", "The consumer group used by the consumer to track the last consumed offset. The consumer group must be different for each ingester. If the configured consumer group contains the '<partition>' placeholder, it is replaced with the actual partition ID owned by the ingester. When empty (recommended), Loki uses the ingester instance ID to guarantee uniqueness.")
f.DurationVar(&cfg.ConsumerGroupOffsetCommitInterval, prefix+".consumer-group-offset-commit-interval", time.Second, "How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to resume consumption from where it left off.")

f.DurationVar(&cfg.LastProducedOffsetRetryTimeout, prefix+".last-produced-offset-retry-timeout", 10*time.Second, "How long to retry a failed request to get the last produced offset.")

f.BoolVar(&cfg.AutoCreateTopicEnabled, prefix+".auto-create-topic-enabled", true, "Enable auto-creation of Kafka topic if it doesn't exist.")
// f.IntVar(&cfg.AutoCreateTopicDefaultPartitions, prefix+".auto-create-topic-default-partitions", 0, "When auto-creation of Kafka topic is enabled and this value is positive, Kafka's num.partitions configuration option is set on Kafka brokers with this value when Mimir component that uses Kafka starts. This configuration option specifies the default number of partitions that the Kafka broker uses for auto-created topics. Note that this is a Kafka-cluster wide setting, and applies to any auto-created topic. If the setting of num.partitions fails, Mimir proceeds anyways, but auto-created topics could have an incorrect number of partitions.")
f.IntVar(&cfg.AutoCreateTopicDefaultPartitions, prefix+".auto-create-topic-default-partitions", 1000, "When auto-creation of Kafka topic is enabled and this value is positive, Kafka's num.partitions configuration option is set on Kafka brokers with this value when the Loki component that uses Kafka starts. This configuration option specifies the default number of partitions that the Kafka broker uses for auto-created topics. Note that this is a Kafka-cluster wide setting, and applies to any auto-created topic. If the setting of num.partitions fails, Loki proceeds anyway, but auto-created topics could have an incorrect number of partitions.")

f.IntVar(&cfg.ProducerMaxRecordSizeBytes, prefix+".producer-max-record-size-bytes", maxProducerRecordDataBytesLimit, "The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes.")
f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.")
@@ -107,3 +115,35 @@ func (cfg *Config) GetConsumerGroup(instanceID string, partitionID int32) string

return strings.ReplaceAll(cfg.ConsumerGroup, "<partition>", strconv.Itoa(int(partitionID)))
}

// SetDefaultNumberOfPartitionsForAutocreatedTopics tries to set the num.partitions config option on brokers.
// This is best-effort: if setting the option fails, the error is logged but not returned.
func (cfg Config) SetDefaultNumberOfPartitionsForAutocreatedTopics(logger log.Logger) {
if cfg.AutoCreateTopicDefaultPartitions <= 0 {
return
}

cl, err := kgo.NewClient(commonKafkaClientOptions(cfg, nil, logger)...)
if err != nil {
level.Error(logger).Log("msg", "failed to create kafka client", "err", err)
return
}

adm := kadm.NewClient(cl)
defer adm.Close()

defaultNumberOfPartitions := fmt.Sprintf("%d", cfg.AutoCreateTopicDefaultPartitions)
_, err = adm.AlterBrokerConfigsState(context.Background(), []kadm.AlterConfig{
{
Op: kadm.SetConfig,
Name: "num.partitions",
Value: &defaultNumberOfPartitions,
},
})
if err != nil {
level.Error(logger).Log("msg", "failed to alter default number of partitions", "err", err)
return
}

level.Info(logger).Log("msg", "configured Kafka-wide default number of partitions for auto-created topics (num.partitions)", "value", cfg.AutoCreateTopicDefaultPartitions)
}
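A hedged call-site sketch for the helper above. It is meant to run once at startup, before any topic is auto-created, since num.partitions is a broker-wide setting; everything here apart from the kafka.Config fields and the method itself is illustrative.

package main

import (
	"os"

	"github.com/go-kit/log"

	"github.com/grafana/loki/v3/pkg/kafka"
)

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)

	cfg := kafka.Config{
		Address:                          "kafka:9092", // placeholder broker address
		AutoCreateTopicEnabled:           true,
		AutoCreateTopicDefaultPartitions: 1000,
	}

	// Best-effort: failures are logged inside the helper and never abort startup.
	cfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(logger)
}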
41 changes: 41 additions & 0 deletions pkg/kafka/config_test.go
@@ -0,0 +1,41 @@
package kafka

import (
"testing"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kfake"
"github.com/twmb/franz-go/pkg/kmsg"
)

func TestSetDefaultNumberOfPartitionsForAutocreatedTopics(t *testing.T) {
cluster, err := kfake.NewCluster(kfake.NumBrokers(1))
require.NoError(t, err)
t.Cleanup(cluster.Close)

addrs := cluster.ListenAddrs()
require.Len(t, addrs, 1)

cfg := Config{
Address: addrs[0],
AutoCreateTopicDefaultPartitions: 100,
}

cluster.ControlKey(kmsg.AlterConfigs.Int16(), func(request kmsg.Request) (kmsg.Response, error, bool) {
r := request.(*kmsg.AlterConfigsRequest)

require.Len(t, r.Resources, 1)
res := r.Resources[0]
require.Equal(t, kmsg.ConfigResourceTypeBroker, res.ResourceType)
require.Len(t, res.Configs, 1)
cfg := res.Configs[0]
require.Equal(t, "num.partitions", cfg.Name)
require.NotNil(t, cfg.Value)
require.Equal(t, "100", *cfg.Value)

return &kmsg.AlterConfigsResponse{}, nil, true
})

cfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(log.NewNopLogger())
}
9 changes: 9 additions & 0 deletions pkg/kafka/encoding.go
@@ -167,6 +167,15 @@ func (d *Decoder) Decode(data []byte) (logproto.Stream, labels.Labels, error) {
return *d.stream, ls, nil
}

// DecodeWithoutLabels converts a Kafka record's byte data back into a logproto.Stream without parsing labels.
func (d *Decoder) DecodeWithoutLabels(data []byte) (logproto.Stream, error) {
d.stream.Entries = d.stream.Entries[:0]
if err := d.stream.Unmarshal(data); err != nil {
return logproto.Stream{}, fmt.Errorf("failed to unmarshal stream: %w", err)
}
return *d.stream, nil
}

// sovPush calculates the size of varint-encoded uint64.
// It is used to determine the number of bytes needed to encode a uint64 value
// in Protocol Buffers' variable-length integer format.
26 changes: 11 additions & 15 deletions pkg/kafka/ingester/consumer.go
@@ -22,6 +22,7 @@ import (

"github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/partition"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/storage/wal"
)
@@ -36,17 +37,12 @@ type MetadataStore interface {
AddBlock(ctx context.Context, in *metastorepb.AddBlockRequest, opts ...grpc.CallOption) (*metastorepb.AddBlockResponse, error)
}

// Committer defines an interface for committing offsets
type Committer interface {
Commit(ctx context.Context, offset int64) error
}

// consumer represents a Kafka consumer that processes and stores log entries
type consumer struct {
metastoreClient MetadataStore
storage ObjectStorage
writer *wal.SegmentWriter
committer Committer
committer partition.Committer
flushInterval time.Duration
maxFlushSize int64
lastOffset int64
@@ -67,8 +63,8 @@ func NewConsumerFactory(
maxFlushSize int64,
logger log.Logger,
reg prometheus.Registerer,
) ConsumerFactory {
return func(committer Committer) (Consumer, error) {
) partition.ConsumerFactory {
return func(committer partition.Committer) (partition.Consumer, error) {
writer, err := wal.NewWalSegmentWriter()
if err != nil {
return nil, err
@@ -95,7 +91,7 @@

// Start starts the consumer and returns a function to wait for it to finish
// It consumes records from the recordsChan, and flushes them to storage periodically.
func (c *consumer) Start(ctx context.Context, recordsChan <-chan []record) func() {
func (c *consumer) Start(ctx context.Context, recordsChan <-chan []partition.Record) func() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
@@ -127,7 +123,7 @@ func (c *consumer) Start(ctx context.Context, recordsChan <-chan []record) func(
}

// consume processes a batch of Kafka records, decoding and storing them
func (c *consumer) consume(records []record) error {
func (c *consumer) consume(records []partition.Record) error {
if len(records) == 0 {
return nil
}
@@ -136,8 +132,8 @@ func (c *consumer) consume(records []record) error {
maxOffset = int64(0)
)
for _, record := range records {
minOffset = min(minOffset, record.offset)
maxOffset = max(maxOffset, record.offset)
minOffset = min(minOffset, record.Offset)
maxOffset = max(maxOffset, record.Offset)
}
level.Debug(c.logger).Log("msg", "consuming records", "min_offset", minOffset, "max_offset", maxOffset)
return c.retryWithBackoff(context.Background(), backoff.Config{
@@ -163,9 +159,9 @@ func (c *consumer) consume(records []record) error {
})
}

func (c *consumer) appendRecords(records []record) error {
func (c *consumer) appendRecords(records []partition.Record) error {
for _, record := range records {
stream, labels, err := c.decoder.Decode(record.content)
stream, labels, err := c.decoder.Decode(record.Content)
if err != nil {
return fmt.Errorf("failed to decode record: %w", err)
}
@@ -184,7 +180,7 @@ func (c *consumer) appendRecords(records []record) error {
Parsed: entry.Parsed,
})
}
c.writer.Append(record.tenantID, stream.Labels, labels, c.toStore, time.Now())
c.writer.Append(record.TenantID, stream.Labels, labels, c.toStore, time.Now())
}
return nil
}