From e2a209c076c9c9fd53732a0a7804acba3bff378e Mon Sep 17 00:00:00 2001
From: benclive
Date: Tue, 15 Oct 2024 16:56:19 +0100
Subject: [PATCH] feat(kafka): Add support for SASL auth to Kafka (#14487)

---
 docs/sources/shared/configuration.md    |  10 ++
 pkg/distributor/distributor.go          |   5 +-
 pkg/kafka/{ => client}/logger.go        |   2 +-
 pkg/kafka/{ => client}/reader_client.go |  38 ++++++-
 pkg/kafka/client/reader_client_test.go  | 104 ++++++++++++++++++
 pkg/kafka/{ => client}/writer_client.go |  36 +++++-
 pkg/kafka/client/writer_client_test.go  |  71 ++++++++++++
 pkg/kafka/config.go                     |  73 ++++--------
 pkg/kafka/config_test.go                |  54 +++++----
 pkg/kafka/partition/committer_test.go   |   4 +-
 pkg/kafka/partition/reader.go           |   5 +-
 pkg/kafka/partition/reader_test.go      |  15 +--
 pkg/kafka/testkafka/cluster.go          |  16 ++-
 .../twmb/franz-go/pkg/sasl/plain/plain.go |  60 ++++++++++
 vendor/modules.txt                      |   1 +
 15 files changed, 387 insertions(+), 107 deletions(-)
 rename pkg/kafka/{ => client}/logger.go (98%)
 rename pkg/kafka/{ => client}/reader_client.go (51%)
 create mode 100644 pkg/kafka/client/reader_client_test.go
 rename pkg/kafka/{ => client}/writer_client.go (90%)
 create mode 100644 pkg/kafka/client/writer_client_test.go
 create mode 100644 vendor/github.com/twmb/franz-go/pkg/sasl/plain/plain.go

diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md
index 132de42b81075..a161232ab9305 100644
--- a/docs/sources/shared/configuration.md
+++ b/docs/sources/shared/configuration.md
@@ -793,6 +793,16 @@ kafka_config:
   # CLI flag: -kafka.write-timeout
   [write_timeout: <duration> | default = 10s]
 
+  # The SASL username for authentication to Kafka using the PLAIN mechanism.
+  # Both username and password must be set.
+  # CLI flag: -kafka.sasl-username
+  [sasl_username: <string> | default = ""]
+
+  # The SASL password for authentication to Kafka using the PLAIN mechanism.
+  # Both username and password must be set.
+  # CLI flag: -kafka.sasl-password
+  [sasl_password: <string> | default = ""]
+
   # The consumer group used by the consumer to track the last consumed offset.
   # The consumer group must be different for each ingester. If the configured
   # consumer group contains the '<partition>' placeholder, it is replaced with
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 30383bfcbbbd4..6f69f0e02a84e 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -47,6 +47,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/ingester"
 	"github.com/grafana/loki/v3/pkg/ingester/client"
 	"github.com/grafana/loki/v3/pkg/kafka"
+	kafka_client "github.com/grafana/loki/v3/pkg/kafka/client"
 	"github.com/grafana/loki/v3/pkg/loghttp/push"
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/logql/log/logfmt"
@@ -234,11 +235,11 @@ func New(
 
 	var kafkaWriter KafkaProducer
 	if cfg.KafkaEnabled {
-		kafkaClient, err := kafka.NewWriterClient(cfg.KafkaConfig, 20, logger, registerer)
+		kafkaClient, err := kafka_client.NewWriterClient(cfg.KafkaConfig, 20, logger, registerer)
 		if err != nil {
 			return nil, fmt.Errorf("failed to start kafka client: %w", err)
 		}
-		kafkaWriter = kafka.NewProducer(kafkaClient, cfg.KafkaConfig.ProducerMaxBufferedBytes,
+		kafkaWriter = kafka_client.NewProducer(kafkaClient, cfg.KafkaConfig.ProducerMaxBufferedBytes,
 			prometheus.WrapRegistererWithPrefix("_kafka_", registerer))
 	}
 
diff --git a/pkg/kafka/logger.go b/pkg/kafka/client/logger.go
similarity index 98%
rename from pkg/kafka/logger.go
rename to pkg/kafka/client/logger.go
index e055094a4163b..3be96839e1205 100644
--- a/pkg/kafka/logger.go
+++ b/pkg/kafka/client/logger.go
@@ -1,6 +1,6 @@
 // SPDX-License-Identifier: AGPL-3.0-only
 
-package kafka
+package client
 
 import (
 	"github.com/go-kit/log"
diff --git a/pkg/kafka/reader_client.go b/pkg/kafka/client/reader_client.go
similarity index 51%
rename from pkg/kafka/reader_client.go
rename to pkg/kafka/client/reader_client.go
index 9237686fee609..e8bbb2da8c86a 100644
--- a/pkg/kafka/reader_client.go
+++ b/pkg/kafka/client/reader_client.go
@@ -1,19 +1,25 @@
 // SPDX-License-Identifier: AGPL-3.0-only
 
-package kafka
+package client
 
 import (
+	"context"
+	"fmt"
 	"time"
 
 	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/twmb/franz-go/pkg/kadm"
 	"github.com/twmb/franz-go/pkg/kgo"
 	"github.com/twmb/franz-go/plugin/kprom"
+
+	"github.com/grafana/loki/v3/pkg/kafka"
 )
 
 // NewReaderClient returns the kgo.Client that should be used by the Reader.
-func NewReaderClient(kafkaCfg Config, metrics *kprom.Metrics, logger log.Logger, opts ...kgo.Opt) (*kgo.Client, error) {
+func NewReaderClient(kafkaCfg kafka.Config, metrics *kprom.Metrics, logger log.Logger, opts ...kgo.Opt) (*kgo.Client, error) {
 	const fetchMaxBytes = 100_000_000
 
 	opts = append(opts, commonKafkaClientOptions(kafkaCfg, metrics, logger)...)
@@ -33,7 +39,7 @@ func NewReaderClient(kafkaCfg Config, metrics *kprom.Metrics, logger log.Logger,
 		return nil, errors.Wrap(err, "creating kafka client")
 	}
 	if kafkaCfg.AutoCreateTopicEnabled {
-		kafkaCfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(logger)
+		setDefaultNumberOfPartitionsForAutocreatedTopics(kafkaCfg, client, logger)
 	}
 	return client, nil
 }
@@ -44,3 +50,29 @@ func NewReaderClientMetrics(component string, reg prometheus.Registerer) *kprom.
 		// Do not export the client ID, because we use it to specify options to the backend.
 		kprom.FetchAndProduceDetail(kprom.Batches, kprom.Records, kprom.CompressedBytes, kprom.UncompressedBytes))
 }
+
+// setDefaultNumberOfPartitionsForAutocreatedTopics tries to set num.partitions config option on brokers.
+// This is best-effort, if setting the option fails, error is logged, but not returned. +func setDefaultNumberOfPartitionsForAutocreatedTopics(cfg kafka.Config, cl *kgo.Client, logger log.Logger) { + if cfg.AutoCreateTopicDefaultPartitions <= 0 { + return + } + + // Note: this client doesn't get closed because it is owned by the caller + adm := kadm.NewClient(cl) + + defaultNumberOfPartitions := fmt.Sprintf("%d", cfg.AutoCreateTopicDefaultPartitions) + _, err := adm.AlterBrokerConfigsState(context.Background(), []kadm.AlterConfig{ + { + Op: kadm.SetConfig, + Name: "num.partitions", + Value: &defaultNumberOfPartitions, + }, + }) + if err != nil { + level.Error(logger).Log("msg", "failed to alter default number of partitions", "err", err) + return + } + + level.Info(logger).Log("msg", "configured Kafka-wide default number of partitions for auto-created topics (num.partitions)", "value", cfg.AutoCreateTopicDefaultPartitions) +} diff --git a/pkg/kafka/client/reader_client_test.go b/pkg/kafka/client/reader_client_test.go new file mode 100644 index 0000000000000..90980ad0e9128 --- /dev/null +++ b/pkg/kafka/client/reader_client_test.go @@ -0,0 +1,104 @@ +package client + +import ( + "context" + "testing" + + "github.com/go-kit/log" + "github.com/grafana/dskit/flagext" + "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kfake" + "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/pkg/kmsg" + + "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/testkafka" +) + +func TestNewReaderClient(t *testing.T) { + _, addr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "test", kfake.EnableSASL(), kfake.Superuser("PLAIN", "user", "password")) + + tests := []struct { + name string + config kafka.Config + wantErr bool + }{ + { + name: "valid config", + config: kafka.Config{ + Address: addr, + Topic: "abcd", + SASLUsername: "user", + SASLPassword: flagext.SecretWithValue("password"), + }, + wantErr: false, + }, + { + name: "wrong password", + config: kafka.Config{ + Address: addr, + Topic: "abcd", + SASLUsername: "user", + SASLPassword: flagext.SecretWithValue("wrong wrong wrong"), + }, + wantErr: true, + }, + { + name: "wrong username", + config: kafka.Config{ + Address: addr, + Topic: "abcd", + SASLUsername: "wrong wrong wrong", + SASLPassword: flagext.SecretWithValue("password"), + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client, err := NewReaderClient(tt.config, nil, nil) + require.NoError(t, err) + + err = client.Ping(context.Background()) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestSetDefaultNumberOfPartitionsForAutocreatedTopics(t *testing.T) { + cluster, err := kfake.NewCluster(kfake.NumBrokers(1)) + require.NoError(t, err) + t.Cleanup(cluster.Close) + + addrs := cluster.ListenAddrs() + require.Len(t, addrs, 1) + + cfg := kafka.Config{ + Address: addrs[0], + AutoCreateTopicDefaultPartitions: 100, + } + + cluster.ControlKey(kmsg.AlterConfigs.Int16(), func(request kmsg.Request) (kmsg.Response, error, bool) { + r := request.(*kmsg.AlterConfigsRequest) + + require.Len(t, r.Resources, 1) + res := r.Resources[0] + require.Equal(t, kmsg.ConfigResourceTypeBroker, res.ResourceType) + require.Len(t, res.Configs, 1) + cfg := res.Configs[0] + require.Equal(t, "num.partitions", cfg.Name) + require.NotNil(t, *cfg.Value) + require.Equal(t, "100", *cfg.Value) + + return &kmsg.AlterConfigsResponse{}, nil, true + }) + + client, 
err := kgo.NewClient(commonKafkaClientOptions(cfg, nil, log.NewNopLogger())...) + require.NoError(t, err) + + setDefaultNumberOfPartitionsForAutocreatedTopics(cfg, client, log.NewNopLogger()) +} diff --git a/pkg/kafka/writer_client.go b/pkg/kafka/client/writer_client.go similarity index 90% rename from pkg/kafka/writer_client.go rename to pkg/kafka/client/writer_client.go index 59fefda31d19b..1493e17f51686 100644 --- a/pkg/kafka/writer_client.go +++ b/pkg/kafka/client/writer_client.go @@ -1,4 +1,4 @@ -package kafka +package client import ( "context" @@ -13,20 +13,30 @@ import ( "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kmsg" + "github.com/twmb/franz-go/pkg/sasl/plain" "github.com/twmb/franz-go/plugin/kotel" "github.com/twmb/franz-go/plugin/kprom" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" "go.uber.org/atomic" + "github.com/grafana/loki/v3/pkg/kafka" "github.com/grafana/loki/v3/pkg/util/constants" ) +var ( + // writerRequestTimeoutOverhead is the overhead applied by the Writer to every Kafka timeout. + // You can think about this overhead as an extra time for requests sitting in the client's buffer + // before being sent on the wire and the actual time it takes to send it over the network and + // start being processed by Kafka. + writerRequestTimeoutOverhead = 2 * time.Second +) + // NewWriterClient returns the kgo.Client that should be used by the Writer. // // The input prometheus.Registerer must be wrapped with a prefix (the names of metrics // registered don't have a prefix). -func NewWriterClient(kafkaCfg Config, maxInflightProduceRequests int, logger log.Logger, reg prometheus.Registerer) (*kgo.Client, error) { +func NewWriterClient(kafkaCfg kafka.Config, maxInflightProduceRequests int, logger log.Logger, reg prometheus.Registerer) (*kgo.Client, error) { // Do not export the client ID, because we use it to specify options to the backend. metrics := kprom.NewMetrics( "", // No prefix. We expect the input prometheus.Registered to be wrapped with a prefix. @@ -42,7 +52,7 @@ func NewWriterClient(kafkaCfg Config, maxInflightProduceRequests int, logger log kgo.RecordPartitioner(kgo.ManualPartitioner()), // Set the upper bounds the size of a record batch. - kgo.ProducerBatchMaxBytes(producerBatchMaxBytes), + kgo.ProducerBatchMaxBytes(kafka.ProducerBatchMaxBytes), // By default, the Kafka client allows 1 Produce in-flight request per broker. Disabling write idempotency // (which we don't need), we can increase the max number of in-flight Produce requests per broker. A higher @@ -81,10 +91,14 @@ func NewWriterClient(kafkaCfg Config, maxInflightProduceRequests int, logger log kgo.MaxBufferedRecords(math.MaxInt), // Use a high value to set it as unlimited, because the client doesn't support "0 as unlimited". kgo.MaxBufferedBytes(0), ) + client, err := kgo.NewClient(opts...) + if err != nil { + return nil, err + } if kafkaCfg.AutoCreateTopicEnabled { - kafkaCfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(logger) + setDefaultNumberOfPartitionsForAutocreatedTopics(kafkaCfg, client, logger) } - return kgo.NewClient(opts...) 
+ return client, nil } type onlySampledTraces struct { @@ -99,7 +113,7 @@ func (o onlySampledTraces) Inject(ctx context.Context, carrier propagation.TextM o.TextMapPropagator.Inject(ctx, carrier) } -func commonKafkaClientOptions(cfg Config, metrics *kprom.Metrics, logger log.Logger) []kgo.Opt { +func commonKafkaClientOptions(cfg kafka.Config, metrics *kprom.Metrics, logger log.Logger) []kgo.Opt { opts := []kgo.Opt{ kgo.ClientID(cfg.ClientID), kgo.SeedBrokers(cfg.Address), @@ -139,6 +153,16 @@ func commonKafkaClientOptions(cfg Config, metrics *kprom.Metrics, logger log.Log }), } + // SASL plain auth. + if cfg.SASLUsername != "" && cfg.SASLPassword.String() != "" { + opts = append(opts, kgo.SASL(plain.Plain(func(_ context.Context) (plain.Auth, error) { + return plain.Auth{ + User: cfg.SASLUsername, + Pass: cfg.SASLPassword.String(), + }, nil + }))) + } + if cfg.AutoCreateTopicEnabled { opts = append(opts, kgo.AllowAutoTopicCreation()) } diff --git a/pkg/kafka/client/writer_client_test.go b/pkg/kafka/client/writer_client_test.go new file mode 100644 index 0000000000000..4feb782ffe639 --- /dev/null +++ b/pkg/kafka/client/writer_client_test.go @@ -0,0 +1,71 @@ +package client + +import ( + "context" + "testing" + "time" + + "github.com/grafana/dskit/flagext" + "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kfake" + + "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/testkafka" +) + +func TestNewWriterClient(t *testing.T) { + _, addr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "test", kfake.EnableSASL(), kfake.Superuser("PLAIN", "user", "password")) + + tests := []struct { + name string + config kafka.Config + wantErr bool + }{ + { + name: "valid config", + config: kafka.Config{ + Address: addr, + Topic: "abcd", + WriteTimeout: time.Second, + SASLUsername: "user", + SASLPassword: flagext.SecretWithValue("password"), + }, + wantErr: false, + }, + { + name: "wrong password", + config: kafka.Config{ + Address: addr, + Topic: "abcd", + WriteTimeout: time.Second, + SASLUsername: "user", + SASLPassword: flagext.SecretWithValue("wrong wrong wrong"), + }, + wantErr: true, + }, + { + name: "wrong username", + config: kafka.Config{ + Address: addr, + Topic: "abcd", + WriteTimeout: time.Second, + SASLUsername: "wrong wrong wrong", + SASLPassword: flagext.SecretWithValue("password"), + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client, err := NewWriterClient(tt.config, 10, nil, nil) + require.NoError(t, err) + + err = client.Ping(context.Background()) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/pkg/kafka/config.go b/pkg/kafka/config.go index 13cfb618cfdb9..09008bec93411 100644 --- a/pkg/kafka/config.go +++ b/pkg/kafka/config.go @@ -1,7 +1,6 @@ package kafka import ( - "context" "errors" "flag" "fmt" @@ -9,10 +8,7 @@ import ( "strings" "time" - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/twmb/franz-go/pkg/kadm" - "github.com/twmb/franz-go/pkg/kgo" + "github.com/grafana/dskit/flagext" ) const ( @@ -21,29 +17,24 @@ const ( consumeFromEnd = "end" consumeFromTimestamp = "timestamp" - // writerRequestTimeoutOverhead is the overhead applied by the Writer to every Kafka timeout. 
- // You can think about this overhead as an extra time for requests sitting in the client's buffer - // before being sent on the wire and the actual time it takes to send it over the network and - // start being processed by Kafka. - writerRequestTimeoutOverhead = 2 * time.Second - - // producerBatchMaxBytes is the max allowed size of a batch of Kafka records. - producerBatchMaxBytes = 16_000_000 + // ProducerBatchMaxBytes is the max allowed size of a batch of Kafka records. + ProducerBatchMaxBytes = 16_000_000 // maxProducerRecordDataBytesLimit is the max allowed size of a single record data. Given we have a limit - // on the max batch size (producerBatchMaxBytes), a Kafka record data can't be bigger than the batch size + // on the max batch size (ProducerBatchMaxBytes), a Kafka record data can't be bigger than the batch size // minus some overhead required to serialise the batch and the record itself. We use 16KB as such overhead // in the worst case scenario, which is expected to be way above the actual one. - maxProducerRecordDataBytesLimit = producerBatchMaxBytes - 16384 + maxProducerRecordDataBytesLimit = ProducerBatchMaxBytes - 16384 minProducerRecordDataBytesLimit = 1024 * 1024 ) var ( - ErrMissingKafkaAddress = errors.New("the Kafka address has not been configured") - ErrMissingKafkaTopic = errors.New("the Kafka topic has not been configured") - ErrInconsistentConsumerLagAtStartup = errors.New("the target and max consumer lag at startup must be either both set to 0 or to a value greater than 0") - ErrInvalidMaxConsumerLagAtStartup = errors.New("the configured max consumer lag at startup must greater or equal than the configured target consumer lag") - ErrInvalidProducerMaxRecordSizeBytes = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit) + ErrMissingKafkaAddress = errors.New("the Kafka address has not been configured") + ErrMissingKafkaTopic = errors.New("the Kafka topic has not been configured") + ErrInconsistentConsumerLagAtStartup = errors.New("the target and max consumer lag at startup must be either both set to 0 or to a value greater than 0") + ErrInvalidMaxConsumerLagAtStartup = errors.New("the configured max consumer lag at startup must greater or equal than the configured target consumer lag") + ErrInconsistentSASLUsernameAndPassword = errors.New("both sasl username and password must be set") + ErrInvalidProducerMaxRecordSizeBytes = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit) ) // Config holds the generic config for the Kafka backend. 
@@ -54,6 +45,9 @@ type Config struct { DialTimeout time.Duration `yaml:"dial_timeout"` WriteTimeout time.Duration `yaml:"write_timeout"` + SASLUsername string `yaml:"sasl_username"` + SASLPassword flagext.Secret `yaml:"sasl_password"` + ConsumerGroup string `yaml:"consumer_group"` ConsumerGroupOffsetCommitInterval time.Duration `yaml:"consumer_group_offset_commit_interval"` @@ -80,6 +74,9 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.DurationVar(&cfg.DialTimeout, prefix+".dial-timeout", 2*time.Second, "The maximum time allowed to open a connection to a Kafka broker.") f.DurationVar(&cfg.WriteTimeout, prefix+".write-timeout", 10*time.Second, "How long to wait for an incoming write request to be successfully committed to the Kafka backend.") + f.StringVar(&cfg.SASLUsername, prefix+".sasl-username", "", "The SASL username for authentication to Kafka using the PLAIN mechanism. Both username and password must be set.") + f.Var(&cfg.SASLPassword, prefix+".sasl-password", "The SASL password for authentication to Kafka using the PLAIN mechanism. Both username and password must be set.") + f.StringVar(&cfg.ConsumerGroup, prefix+".consumer-group", "", "The consumer group used by the consumer to track the last consumed offset. The consumer group must be different for each ingester. If the configured consumer group contains the '' placeholder, it is replaced with the actual partition ID owned by the ingester. When empty (recommended), Mimir uses the ingester instance ID to guarantee uniqueness.") f.DurationVar(&cfg.ConsumerGroupOffsetCommitInterval, prefix+".consumer-group-offset-commit-interval", time.Second, "How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left.") @@ -113,6 +110,10 @@ func (cfg *Config) Validate() error { return ErrInvalidMaxConsumerLagAtStartup } + if (cfg.SASLUsername == "") != (cfg.SASLPassword.String() == "") { + return ErrInconsistentSASLUsernameAndPassword + } + return nil } @@ -124,35 +125,3 @@ func (cfg *Config) GetConsumerGroup(instanceID string, partitionID int32) string return strings.ReplaceAll(cfg.ConsumerGroup, "", strconv.Itoa(int(partitionID))) } - -// SetDefaultNumberOfPartitionsForAutocreatedTopics tries to set num.partitions config option on brokers. -// This is best-effort, if setting the option fails, error is logged, but not returned. -func (cfg Config) SetDefaultNumberOfPartitionsForAutocreatedTopics(logger log.Logger) { - if cfg.AutoCreateTopicDefaultPartitions <= 0 { - return - } - - cl, err := kgo.NewClient(commonKafkaClientOptions(cfg, nil, logger)...) 
- if err != nil { - level.Error(logger).Log("msg", "failed to create kafka client", "err", err) - return - } - - adm := kadm.NewClient(cl) - defer adm.Close() - - defaultNumberOfPartitions := fmt.Sprintf("%d", cfg.AutoCreateTopicDefaultPartitions) - _, err = adm.AlterBrokerConfigsState(context.Background(), []kadm.AlterConfig{ - { - Op: kadm.SetConfig, - Name: "num.partitions", - Value: &defaultNumberOfPartitions, - }, - }) - if err != nil { - level.Error(logger).Log("msg", "failed to alter default number of partitions", "err", err) - return - } - - level.Info(logger).Log("msg", "configured Kafka-wide default number of partitions for auto-created topics (num.partitions)", "value", cfg.AutoCreateTopicDefaultPartitions) -} diff --git a/pkg/kafka/config_test.go b/pkg/kafka/config_test.go index 7c21e38fd141e..87c456f42adc0 100644 --- a/pkg/kafka/config_test.go +++ b/pkg/kafka/config_test.go @@ -3,39 +3,37 @@ package kafka import ( "testing" - "github.com/go-kit/log" + "github.com/grafana/dskit/flagext" "github.com/stretchr/testify/require" - "github.com/twmb/franz-go/pkg/kfake" - "github.com/twmb/franz-go/pkg/kmsg" ) -func TestSetDefaultNumberOfPartitionsForAutocreatedTopics(t *testing.T) { - cluster, err := kfake.NewCluster(kfake.NumBrokers(1)) - require.NoError(t, err) - t.Cleanup(cluster.Close) - - addrs := cluster.ListenAddrs() - require.Len(t, addrs, 1) - +func TestBothSASLParamsMustBeSet(t *testing.T) { cfg := Config{ - Address: addrs[0], - AutoCreateTopicDefaultPartitions: 100, + // Other required params + Address: "abcd", + Topic: "abcd", + ProducerMaxRecordSizeBytes: 1048576, } - cluster.ControlKey(kmsg.AlterConfigs.Int16(), func(request kmsg.Request) (kmsg.Response, error, bool) { - r := request.(*kmsg.AlterConfigsRequest) - - require.Len(t, r.Resources, 1) - res := r.Resources[0] - require.Equal(t, kmsg.ConfigResourceTypeBroker, res.ResourceType) - require.Len(t, res.Configs, 1) - cfg := res.Configs[0] - require.Equal(t, "num.partitions", cfg.Name) - require.NotNil(t, *cfg.Value) - require.Equal(t, "100", *cfg.Value) - - return &kmsg.AlterConfigsResponse{}, nil, true - }) + // No SASL params is valid + err := cfg.Validate() + require.NoError(t, err) - cfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(log.NewNopLogger()) + // Just username is invalid + cfg.SASLUsername = "abcd" + cfg.SASLPassword = flagext.Secret{} + err = cfg.Validate() + require.Error(t, err) + + // Just password is invalid + cfg.SASLUsername = "" + cfg.SASLPassword = flagext.SecretWithValue("abcd") + err = cfg.Validate() + require.Error(t, err) + + // Both username and password is valid + cfg.SASLUsername = "abcd" + cfg.SASLPassword = flagext.SecretWithValue("abcd") + err = cfg.Validate() + require.NoError(t, err) } diff --git a/pkg/kafka/partition/committer_test.go b/pkg/kafka/partition/committer_test.go index 9ef02f910e5d0..1739986cd66c8 100644 --- a/pkg/kafka/partition/committer_test.go +++ b/pkg/kafka/partition/committer_test.go @@ -14,7 +14,7 @@ import ( "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/client" "github.com/grafana/loki/v3/pkg/kafka/testkafka" ) @@ -24,7 +24,7 @@ func TestPartitionCommitter(t *testing.T) { topicName := "test-topic" _, kafkaCfg := testkafka.CreateCluster(t, numPartitions, topicName) - client, err := kafka.NewReaderClient(kafkaCfg, kprom.NewMetrics("foo"), log.NewNopLogger()) + client, err := client.NewReaderClient(kafkaCfg, kprom.NewMetrics("foo"), log.NewNopLogger()) require.NoError(t, 
err) // Create a Kafka admin client diff --git a/pkg/kafka/partition/reader.go b/pkg/kafka/partition/reader.go index e07a65f8a0f15..e364f3bba748e 100644 --- a/pkg/kafka/partition/reader.go +++ b/pkg/kafka/partition/reader.go @@ -22,6 +22,7 @@ import ( "github.com/twmb/franz-go/plugin/kprom" "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/client" ) var errWaitTargetLagDeadlineExceeded = errors.New("waiting for target lag deadline exceeded") @@ -94,7 +95,7 @@ func NewReader( // This method is called when the PartitionReader service starts. func (p *Reader) start(ctx context.Context) error { var err error - p.client, err = kafka.NewReaderClient(p.kafkaCfg, p.metrics.kprom, p.logger) + p.client, err = client.NewReaderClient(p.kafkaCfg, p.metrics.kprom, p.logger) if err != nil { return errors.Wrap(err, "creating kafka reader client") } @@ -539,7 +540,7 @@ func newReaderMetrics(reg prometheus.Registerer) readerMetrics { return readerMetrics{ receiveDelayWhenStarting: receiveDelay.WithLabelValues("starting"), receiveDelayWhenRunning: receiveDelay.WithLabelValues("running"), - kprom: kafka.NewReaderClientMetrics("partition-reader", reg), + kprom: client.NewReaderClientMetrics("partition-reader", reg), fetchWaitDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Name: "loki_ingest_storage_reader_records_batch_wait_duration_seconds", Help: "How long a consumer spent waiting for a batch of records from the Kafka client. If fetching is faster than processing, then this will be close to 0.", diff --git a/pkg/kafka/partition/reader_test.go b/pkg/kafka/partition/reader_test.go index 8d548c8312411..dfd653de78e3d 100644 --- a/pkg/kafka/partition/reader_test.go +++ b/pkg/kafka/partition/reader_test.go @@ -17,6 +17,7 @@ import ( "github.com/twmb/franz-go/pkg/kgo" "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/client" "github.com/grafana/loki/v3/pkg/kafka/testkafka" "github.com/grafana/loki/v3/pkg/logproto" ) @@ -58,7 +59,7 @@ func (m *mockConsumer) Flush(ctx context.Context) error { } func TestPartitionReader_BasicFunctionality(t *testing.T) { - _, kafkaCfg := testkafka.CreateCluster(t, 1, "test-topic") + _, kafkaCfg := testkafka.CreateCluster(t, 1, "test") consumer := newMockConsumer() consumerFactory := func(_ Committer) (Consumer, error) { @@ -67,7 +68,7 @@ func TestPartitionReader_BasicFunctionality(t *testing.T) { partitionReader, err := NewReader(kafkaCfg, 0, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) - producer, err := kafka.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry()) + producer, err := client.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) err = services.StartAndAwaitRunning(context.Background(), partitionReader) @@ -82,8 +83,8 @@ func TestPartitionReader_BasicFunctionality(t *testing.T) { require.NoError(t, err) require.Len(t, records, 1) - producer.ProduceSync(context.Background(), records...) - producer.ProduceSync(context.Background(), records...) 
+ require.NoError(t, producer.ProduceSync(context.Background(), records...).FirstErr()) + require.NoError(t, producer.ProduceSync(context.Background(), records...).FirstErr()) // Wait for records to be processed assert.Eventually(t, func() bool { @@ -121,7 +122,7 @@ func TestPartitionReader_ProcessCatchUpAtStartup(t *testing.T) { partitionReader, err := NewReader(kafkaCfg, 0, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) - producer, err := kafka.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry()) + producer, err := client.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) stream := logproto.Stream{ @@ -175,11 +176,11 @@ func TestPartitionReader_ProcessCommits(t *testing.T) { partitionID := int32(0) partitionReader, err := NewReader(kafkaCfg, partitionID, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) - producer, err := kafka.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry()) + producer, err := client.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) // Init the client: This usually happens in "start" but we want to manage our own lifecycle for this test. - partitionReader.client, err = kafka.NewReaderClient(kafkaCfg, nil, log.NewNopLogger(), + partitionReader.client, err = client.NewReaderClient(kafkaCfg, nil, log.NewNopLogger(), kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{ kafkaCfg.Topic: {partitionID: kgo.NewOffset().AtStart()}, }), diff --git a/pkg/kafka/testkafka/cluster.go b/pkg/kafka/testkafka/cluster.go index cc5847c2bfd35..c70e3da4a71cb 100644 --- a/pkg/kafka/testkafka/cluster.go +++ b/pkg/kafka/testkafka/cluster.go @@ -16,8 +16,8 @@ import ( ) // CreateCluster returns a fake Kafka cluster for unit testing. -func CreateCluster(t testing.TB, numPartitions int32, topicName string) (*kfake.Cluster, kafka.Config) { - cluster, addr := CreateClusterWithoutCustomConsumerGroupsSupport(t, numPartitions, topicName) +func CreateCluster(t testing.TB, numPartitions int32, topicName string, opts ...kfake.Opt) (*kfake.Cluster, kafka.Config) { + cluster, addr := CreateClusterWithoutCustomConsumerGroupsSupport(t, numPartitions, topicName, opts...) addSupportForConsumerGroups(t, cluster, topicName, numPartitions) return cluster, createTestKafkaConfig(addr, topicName) @@ -34,8 +34,16 @@ func createTestKafkaConfig(clusterAddr, topicName string) kafka.Config { return cfg } -func CreateClusterWithoutCustomConsumerGroupsSupport(t testing.TB, numPartitions int32, topicName string) (*kfake.Cluster, string) { - cluster, err := kfake.NewCluster(kfake.NumBrokers(1), kfake.SeedTopics(numPartitions, topicName)) +func CreateClusterWithoutCustomConsumerGroupsSupport(t testing.TB, numPartitions int32, topicName string, opts ...kfake.Opt) (*kfake.Cluster, string) { + cfg := []kfake.Opt{ + kfake.NumBrokers(1), + kfake.SeedTopics(numPartitions, topicName), + } + + // Apply options. + cfg = append(cfg, opts...) + + cluster, err := kfake.NewCluster(cfg...) 
 	require.NoError(t, err)
 	t.Cleanup(cluster.Close)
diff --git a/vendor/github.com/twmb/franz-go/pkg/sasl/plain/plain.go b/vendor/github.com/twmb/franz-go/pkg/sasl/plain/plain.go
new file mode 100644
index 0000000000000..97a9369d13723
--- /dev/null
+++ b/vendor/github.com/twmb/franz-go/pkg/sasl/plain/plain.go
@@ -0,0 +1,60 @@
+// Package plain provides PLAIN sasl authentication as specified in RFC4616.
+package plain
+
+import (
+	"context"
+	"errors"
+
+	"github.com/twmb/franz-go/pkg/sasl"
+)
+
+// Auth contains information for authentication.
+type Auth struct {
+	// Zid is an optional authorization ID to use in authenticating.
+	Zid string
+
+	// User is username to use for authentication.
+	User string
+
+	// Pass is the password to use for authentication.
+	Pass string
+
+	_ struct{} // require explicit field initialization
+}
+
+// AsMechanism returns a sasl mechanism that will use 'a' as credentials for
+// all sasl sessions.
+//
+// This is a shortcut for using the Plain function and is useful when you do
+// not need to live-rotate credentials.
+func (a Auth) AsMechanism() sasl.Mechanism {
+	return Plain(func(context.Context) (Auth, error) {
+		return a, nil
+	})
+}
+
+// Plain returns a sasl mechanism that will call authFn whenever sasl
+// authentication is needed. The returned Auth is used for a single session.
+func Plain(authFn func(context.Context) (Auth, error)) sasl.Mechanism {
+	return plain(authFn)
+}
+
+type plain func(context.Context) (Auth, error)
+
+func (plain) Name() string { return "PLAIN" }
+func (fn plain) Authenticate(ctx context.Context, _ string) (sasl.Session, []byte, error) {
+	auth, err := fn(ctx)
+	if err != nil {
+		return nil, nil, err
+	}
+	if auth.User == "" || auth.Pass == "" {
+		return nil, nil, errors.New("PLAIN user and pass must be non-empty")
+	}
+	return session{}, []byte(auth.Zid + "\x00" + auth.User + "\x00" + auth.Pass), nil
+}
+
+type session struct{}
+
+func (session) Challenge([]byte) (bool, []byte, error) {
+	return true, nil, nil
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 49e7bb611899f..8e8e074487f93 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1602,6 +1602,7 @@ github.com/twmb/franz-go/pkg/kgo
 github.com/twmb/franz-go/pkg/kgo/internal/sticky
 github.com/twmb/franz-go/pkg/kversion
 github.com/twmb/franz-go/pkg/sasl
+github.com/twmb/franz-go/pkg/sasl/plain
 # github.com/twmb/franz-go/pkg/kadm v1.13.0
 ## explicit; go 1.21
 github.com/twmb/franz-go/pkg/kadm
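
For operators, the user-facing change is two new settings, `sasl_username` and `sasl_password`, under `kafka_config` (CLI flags `-kafka.sasl-username` and `-kafka.sasl-password`); `Validate()` rejects a configuration where only one of the two is set, and leaving both empty keeps SASL disabled. For code that builds its own clients, the sketch below shows how the relocated `client` package and the new SASL fields fit together. It is a minimal, hypothetical wiring, not part of the patch: the broker address, topic, and credential values are placeholders, and only the types, fields, and constructors that appear in the diff above are relied on.

```go
package main

import (
	"context"
	"fmt"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/flagext"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/v3/pkg/kafka"
	"github.com/grafana/loki/v3/pkg/kafka/client"
)

func main() {
	// Placeholder values; only the SASL fields and the constructors used below
	// come from the patch.
	cfg := kafka.Config{
		Address:      "broker-1.example.com:9092",
		Topic:        "loki-ingest",
		SASLUsername: "loki",
		SASLPassword: flagext.SecretWithValue("supersecret"),
	}

	// NewReaderClient adds the SASL/PLAIN mechanism via commonKafkaClientOptions
	// whenever both username and password are non-empty.
	reader, err := client.NewReaderClient(cfg,
		client.NewReaderClientMetrics("example", prometheus.NewRegistry()),
		log.NewNopLogger())
	if err != nil {
		panic(err)
	}
	defer reader.Close()

	// Ping authenticates against the brokers, so a wrong username or password
	// surfaces here, as exercised by the tests added in this patch.
	fmt.Println(reader.Ping(context.Background()))
}
```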