feat(kafka): Add support for SASL auth to Kafka (#14487)
benclive authored Oct 15, 2024
1 parent 3e451c7 commit e2a209c
Showing 15 changed files with 387 additions and 107 deletions.
10 changes: 10 additions & 0 deletions docs/sources/shared/configuration.md
@@ -793,6 +793,16 @@ kafka_config:
# CLI flag: -kafka.write-timeout
[write_timeout: <duration> | default = 10s]

# The SASL username for authentication to Kafka using the PLAIN mechanism.
# Both username and password must be set.
# CLI flag: -kafka.sasl-username
[sasl_username: <string> | default = ""]

# The SASL password for authentication to Kafka using the PLAIN mechanism.
# Both username and password must be set.
# CLI flag: -kafka.sasl-password
[sasl_password: <string> | default = ""]

# The consumer group used by the consumer to track the last consumed offset.
# The consumer group must be different for each ingester. If the configured
# consumer group contains the '<partition>' placeholder, it is replaced with
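For reference, the new options slot into a `kafka_config` block like this — a minimal sketch; the `sasl_username`/`sasl_password` keys come from the diff above, while the `address` key and all values are illustrative assumptions:

```yaml
kafka_config:
  # Assumed key for the broker address (not part of this diff).
  address: broker-1.example.com:9092
  # New in this change: SASL/PLAIN credentials. Both must be set together.
  sasl_username: loki
  sasl_password: supersecret
```

The same pair can also be set on the command line with `-kafka.sasl-username` and `-kafka.sasl-password`.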
5 changes: 3 additions & 2 deletions pkg/distributor/distributor.go
@@ -47,6 +47,7 @@ import (
"github.com/grafana/loki/v3/pkg/ingester"
"github.com/grafana/loki/v3/pkg/ingester/client"
"github.com/grafana/loki/v3/pkg/kafka"
kafka_client "github.com/grafana/loki/v3/pkg/kafka/client"
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log/logfmt"
@@ -234,11 +235,11 @@ func New(

var kafkaWriter KafkaProducer
if cfg.KafkaEnabled {
kafkaClient, err := kafka.NewWriterClient(cfg.KafkaConfig, 20, logger, registerer)
kafkaClient, err := kafka_client.NewWriterClient(cfg.KafkaConfig, 20, logger, registerer)
if err != nil {
return nil, fmt.Errorf("failed to start kafka client: %w", err)
}
kafkaWriter = kafka.NewProducer(kafkaClient, cfg.KafkaConfig.ProducerMaxBufferedBytes,
kafkaWriter = kafka_client.NewProducer(kafkaClient, cfg.KafkaConfig.ProducerMaxBufferedBytes,
prometheus.WrapRegistererWithPrefix("_kafka_", registerer))
}

2 changes: 1 addition & 1 deletion pkg/kafka/logger.go → pkg/kafka/client/logger.go
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: AGPL-3.0-only

package kafka
package client

import (
"github.com/go-kit/log"
38 changes: 35 additions & 3 deletions pkg/kafka/reader_client.go → pkg/kafka/client/reader_client.go
@@ -1,19 +1,25 @@
// SPDX-License-Identifier: AGPL-3.0-only

package kafka
package client

import (
"context"
"fmt"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/plugin/kprom"

"github.com/grafana/loki/v3/pkg/kafka"
)

// NewReaderClient returns the kgo.Client that should be used by the Reader.
func NewReaderClient(kafkaCfg Config, metrics *kprom.Metrics, logger log.Logger, opts ...kgo.Opt) (*kgo.Client, error) {
func NewReaderClient(kafkaCfg kafka.Config, metrics *kprom.Metrics, logger log.Logger, opts ...kgo.Opt) (*kgo.Client, error) {
const fetchMaxBytes = 100_000_000

opts = append(opts, commonKafkaClientOptions(kafkaCfg, metrics, logger)...)
@@ -33,7 +39,7 @@ func NewReaderClient(kafkaCfg Config, metrics *kprom.Metrics, logger log.Logger,
return nil, errors.Wrap(err, "creating kafka client")
}
if kafkaCfg.AutoCreateTopicEnabled {
kafkaCfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(logger)
setDefaultNumberOfPartitionsForAutocreatedTopics(kafkaCfg, client, logger)
}
return client, nil
}
@@ -44,3 +50,29 @@ func NewReaderClientMetrics(component string, reg prometheus.Registerer) *kprom.
// Do not export the client ID, because we use it to specify options to the backend.
kprom.FetchAndProduceDetail(kprom.Batches, kprom.Records, kprom.CompressedBytes, kprom.UncompressedBytes))
}

// setDefaultNumberOfPartitionsForAutocreatedTopics tries to set num.partitions config option on brokers.
// This is best-effort: if setting the option fails, the error is logged but not returned.
func setDefaultNumberOfPartitionsForAutocreatedTopics(cfg kafka.Config, cl *kgo.Client, logger log.Logger) {
if cfg.AutoCreateTopicDefaultPartitions <= 0 {
return
}

// Note: this client doesn't get closed because it is owned by the caller
adm := kadm.NewClient(cl)

defaultNumberOfPartitions := fmt.Sprintf("%d", cfg.AutoCreateTopicDefaultPartitions)
_, err := adm.AlterBrokerConfigsState(context.Background(), []kadm.AlterConfig{
{
Op: kadm.SetConfig,
Name: "num.partitions",
Value: &defaultNumberOfPartitions,
},
})
if err != nil {
level.Error(logger).Log("msg", "failed to alter default number of partitions", "err", err)
return
}

level.Info(logger).Log("msg", "configured Kafka-wide default number of partitions for auto-created topics (num.partitions)", "value", cfg.AutoCreateTopicDefaultPartitions)
}
104 changes: 104 additions & 0 deletions pkg/kafka/client/reader_client_test.go
@@ -0,0 +1,104 @@
package client

import (
"context"
"testing"

"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kfake"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"

"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/testkafka"
)

func TestNewReaderClient(t *testing.T) {
_, addr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "test", kfake.EnableSASL(), kfake.Superuser("PLAIN", "user", "password"))

tests := []struct {
name string
config kafka.Config
wantErr bool
}{
{
name: "valid config",
config: kafka.Config{
Address: addr,
Topic: "abcd",
SASLUsername: "user",
SASLPassword: flagext.SecretWithValue("password"),
},
wantErr: false,
},
{
name: "wrong password",
config: kafka.Config{
Address: addr,
Topic: "abcd",
SASLUsername: "user",
SASLPassword: flagext.SecretWithValue("wrong wrong wrong"),
},
wantErr: true,
},
{
name: "wrong username",
config: kafka.Config{
Address: addr,
Topic: "abcd",
SASLUsername: "wrong wrong wrong",
SASLPassword: flagext.SecretWithValue("password"),
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
client, err := NewReaderClient(tt.config, nil, nil)
require.NoError(t, err)

err = client.Ping(context.Background())
if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}

func TestSetDefaultNumberOfPartitionsForAutocreatedTopics(t *testing.T) {
cluster, err := kfake.NewCluster(kfake.NumBrokers(1))
require.NoError(t, err)
t.Cleanup(cluster.Close)

addrs := cluster.ListenAddrs()
require.Len(t, addrs, 1)

cfg := kafka.Config{
Address: addrs[0],
AutoCreateTopicDefaultPartitions: 100,
}

cluster.ControlKey(kmsg.AlterConfigs.Int16(), func(request kmsg.Request) (kmsg.Response, error, bool) {
r := request.(*kmsg.AlterConfigsRequest)

require.Len(t, r.Resources, 1)
res := r.Resources[0]
require.Equal(t, kmsg.ConfigResourceTypeBroker, res.ResourceType)
require.Len(t, res.Configs, 1)
cfg := res.Configs[0]
require.Equal(t, "num.partitions", cfg.Name)
require.NotNil(t, cfg.Value)
require.Equal(t, "100", *cfg.Value)

return &kmsg.AlterConfigsResponse{}, nil, true
})

client, err := kgo.NewClient(commonKafkaClientOptions(cfg, nil, log.NewNopLogger())...)
require.NoError(t, err)

setDefaultNumberOfPartitionsForAutocreatedTopics(cfg, client, log.NewNopLogger())
}
36 changes: 30 additions & 6 deletions pkg/kafka/writer_client.go → pkg/kafka/client/writer_client.go
@@ -1,4 +1,4 @@
package kafka
package client

import (
"context"
@@ -13,20 +13,30 @@ import (
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/plugin/kotel"
"github.com/twmb/franz-go/plugin/kprom"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"

"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/util/constants"
)

var (
// writerRequestTimeoutOverhead is the overhead applied by the Writer to every Kafka timeout.
// You can think about this overhead as an extra time for requests sitting in the client's buffer
// before being sent on the wire and the actual time it takes to send it over the network and
// start being processed by Kafka.
writerRequestTimeoutOverhead = 2 * time.Second
)

// NewWriterClient returns the kgo.Client that should be used by the Writer.
//
// The input prometheus.Registerer must be wrapped with a prefix (the names of metrics
// registered don't have a prefix).
func NewWriterClient(kafkaCfg Config, maxInflightProduceRequests int, logger log.Logger, reg prometheus.Registerer) (*kgo.Client, error) {
func NewWriterClient(kafkaCfg kafka.Config, maxInflightProduceRequests int, logger log.Logger, reg prometheus.Registerer) (*kgo.Client, error) {
// Do not export the client ID, because we use it to specify options to the backend.
metrics := kprom.NewMetrics(
"", // No prefix. We expect the input prometheus.Registered to be wrapped with a prefix.
@@ -42,7 +52,7 @@ func NewWriterClient(kafkaCfg Config, maxInflightProduceRequests int, logger log
kgo.RecordPartitioner(kgo.ManualPartitioner()),

// Set the upper bounds the size of a record batch.
kgo.ProducerBatchMaxBytes(producerBatchMaxBytes),
kgo.ProducerBatchMaxBytes(kafka.ProducerBatchMaxBytes),

// By default, the Kafka client allows 1 Produce in-flight request per broker. Disabling write idempotency
// (which we don't need), we can increase the max number of in-flight Produce requests per broker. A higher
@@ -81,10 +91,14 @@ func NewWriterClient(kafkaCfg Config, maxInflightProduceRequests int, logger log
kgo.MaxBufferedRecords(math.MaxInt), // Use a high value to set it as unlimited, because the client doesn't support "0 as unlimited".
kgo.MaxBufferedBytes(0),
)
client, err := kgo.NewClient(opts...)
if err != nil {
return nil, err
}
if kafkaCfg.AutoCreateTopicEnabled {
kafkaCfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(logger)
setDefaultNumberOfPartitionsForAutocreatedTopics(kafkaCfg, client, logger)
}
return kgo.NewClient(opts...)
return client, nil
}

type onlySampledTraces struct {
@@ -99,7 +113,7 @@ func (o onlySampledTraces) Inject(ctx context.Context, carrier propagation.TextM
o.TextMapPropagator.Inject(ctx, carrier)
}

func commonKafkaClientOptions(cfg Config, metrics *kprom.Metrics, logger log.Logger) []kgo.Opt {
func commonKafkaClientOptions(cfg kafka.Config, metrics *kprom.Metrics, logger log.Logger) []kgo.Opt {
opts := []kgo.Opt{
kgo.ClientID(cfg.ClientID),
kgo.SeedBrokers(cfg.Address),
@@ -139,6 +153,16 @@ func commonKafkaClientOptions(cfg Config, metrics *kprom.Metrics, logger log.Log
}),
}

// SASL plain auth.
if cfg.SASLUsername != "" && cfg.SASLPassword.String() != "" {
opts = append(opts, kgo.SASL(plain.Plain(func(_ context.Context) (plain.Auth, error) {
return plain.Auth{
User: cfg.SASLUsername,
Pass: cfg.SASLPassword.String(),
}, nil
})))
}

if cfg.AutoCreateTopicEnabled {
opts = append(opts, kgo.AllowAutoTopicCreation())
}
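The block above is where the new auth takes effect: when both credentials are set, a `plain.Plain` mechanism is appended to the common client options, so every connection the reader and writer clients open authenticates via SASL/PLAIN. A self-contained sketch of the same franz-go wiring, with placeholder address and credentials:

```go
package main

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/sasl/plain"
)

func main() {
	client, err := kgo.NewClient(
		kgo.SeedBrokers("broker-1.example.com:9092"), // placeholder address
		// Same mechanism the diff installs: SASL/PLAIN with static credentials.
		kgo.SASL(plain.Plain(func(_ context.Context) (plain.Auth, error) {
			return plain.Auth{User: "loki", Pass: "supersecret"}, nil
		})),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Ping forces a connection, so bad credentials surface as an error here —
	// exactly what the new tests below assert.
	if err := client.Ping(context.Background()); err != nil {
		log.Fatal(err)
	}
}
```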
71 changes: 71 additions & 0 deletions pkg/kafka/client/writer_client_test.go
@@ -0,0 +1,71 @@
package client

import (
"context"
"testing"
"time"

"github.com/grafana/dskit/flagext"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kfake"

"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/testkafka"
)

func TestNewWriterClient(t *testing.T) {
_, addr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "test", kfake.EnableSASL(), kfake.Superuser("PLAIN", "user", "password"))

tests := []struct {
name string
config kafka.Config
wantErr bool
}{
{
name: "valid config",
config: kafka.Config{
Address: addr,
Topic: "abcd",
WriteTimeout: time.Second,
SASLUsername: "user",
SASLPassword: flagext.SecretWithValue("password"),
},
wantErr: false,
},
{
name: "wrong password",
config: kafka.Config{
Address: addr,
Topic: "abcd",
WriteTimeout: time.Second,
SASLUsername: "user",
SASLPassword: flagext.SecretWithValue("wrong wrong wrong"),
},
wantErr: true,
},
{
name: "wrong username",
config: kafka.Config{
Address: addr,
Topic: "abcd",
WriteTimeout: time.Second,
SASLUsername: "wrong wrong wrong",
SASLPassword: flagext.SecretWithValue("password"),
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
client, err := NewWriterClient(tt.config, 10, nil, nil)
require.NoError(t, err)

err = client.Ping(context.Background())
if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}