publication data mode for Kafka consumer
FZambia committed Nov 13, 2024
1 parent faf559f commit a121c28
Showing 6 changed files with 237 additions and 20 deletions.
36 changes: 35 additions & 1 deletion internal/api/consuming.go
@@ -7,6 +7,8 @@ import (
"github.com/centrifugal/centrifugo/v5/internal/apiproto"

"github.com/centrifugal/centrifuge"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

// ConsumingHandlerConfig configures ConsumingHandler.
@@ -32,7 +34,39 @@ func NewConsumingHandler(n *centrifuge.Node, apiExecutor *Executor, c ConsumingH
}

func (h *ConsumingHandler) logNonRetryableConsumingError(err error, method string) {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "non retryable error during consuming", map[string]any{"error": err.Error(), "method": method}))
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "non retryable error during consuming, skip message", map[string]any{"error": err.Error(), "method": method}))
}

func (h *ConsumingHandler) Broadcast(ctx context.Context, req *apiproto.BroadcastRequest) error {
res, err := h.broadcastRequest(ctx, req)
if err != nil {
var apiError *apiproto.Error
if errors.As(err, &apiError) && apiError.Code == apiproto.ErrorInternal.Code {
return err
}
h.logNonRetryableConsumingError(err, "publication_data_broadcast")
return nil
}
for _, resp := range res.Responses {
if resp.Error != nil && resp.Error.Code == apiproto.ErrorInternal.Code {
// Any internal error in any channel response will result in a retry by the consumer.
// To prevent duplicate messages, publishers may use idempotency keys.
return resp.Error
}
}
return nil
}

func (h *ConsumingHandler) broadcastRequest(ctx context.Context, req *apiproto.BroadcastRequest) (*apiproto.BroadcastResult, error) {
resp := h.api.Broadcast(ctx, req)
if h.config.UseOpenTelemetry && resp.Error != nil {
span := trace.SpanFromContext(ctx)
span.SetStatus(codes.Error, resp.Error.Error())
}
if resp.Error != nil {
return nil, resp.Error
}
return resp.Result, nil
}

// Dispatch processes commands received from asynchronous consumers.
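The new Broadcast handler above establishes a simple contract for asynchronous consumers: a non-nil return value always signals an internal error that should be retried, while non-retryable client errors are logged and swallowed so the consumer can move on. The following sketch is not part of the commit — the deliver helper, its backoff values, and the 30-second cap are illustrative assumptions — but it shows how a consumer-side retry loop can lean on that contract:

package example

import (
	"context"
	"time"

	"github.com/centrifugal/centrifugo/v5/internal/apiproto"
)

// broadcaster matches the Broadcast method exposed by ConsumingHandler.
type broadcaster interface {
	Broadcast(ctx context.Context, req *apiproto.BroadcastRequest) error
}

// deliver retries a broadcast until it succeeds or the context is cancelled.
// Only internal errors are ever returned by ConsumingHandler.Broadcast, so
// retrying is safe; idempotency keys protect against duplicates on redelivery.
func deliver(ctx context.Context, b broadcaster, req *apiproto.BroadcastRequest) error {
	backoff := time.Second
	for {
		if err := b.Broadcast(ctx, req); err == nil {
			return nil
		}
		select {
		case <-time.After(backoff):
		case <-ctx.Done():
			return ctx.Err()
		}
		if backoff < 30*time.Second {
			backoff *= 2
		}
	}
}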
13 changes: 13 additions & 0 deletions internal/config/validate.go
@@ -118,6 +118,19 @@ func (c Config) Validate() error {
if !slices.Contains(configtypes.KnownConsumerTypes, config.Type) {
return fmt.Errorf("unknown consumer type: %s", config.Type)
}
if config.Enabled {
switch config.Type {
case configtypes.ConsumerTypeKafka:
if err := config.Kafka.Validate(); err != nil {
return fmt.Errorf("in consumer %s (kafka): %w", config.Name, err)
}
case configtypes.ConsumerTypePostgres:
if err := config.Postgres.Validate(); err != nil {
return fmt.Errorf("in consumer %s (postgres): %w", config.Name, err)
}
default:
}
}
consumerNames = append(consumerNames, config.Name)
}

41 changes: 41 additions & 0 deletions internal/configtypes/types.go
@@ -3,6 +3,7 @@ package configtypes
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
)
@@ -588,6 +589,16 @@ type PostgresConsumerConfig struct {
TLS TLSConfig `mapstructure:"tls" json:"tls" envconfig:"tls" yaml:"tls" toml:"tls"`
}

func (c PostgresConsumerConfig) Validate() error {
if c.DSN == "" {
return errors.New("no Postgres DSN provided")
}
if c.OutboxTableName == "" {
return errors.New("no Postgres outbox table name provided")
}
return nil
}

// KafkaConsumerConfig is a configuration for Kafka async consumer.
type KafkaConsumerConfig struct {
Brokers []string `mapstructure:"brokers" json:"brokers" envconfig:"brokers" yaml:"brokers" toml:"brokers"`
Expand All @@ -608,4 +619,34 @@ type KafkaConsumerConfig struct {
// will pause fetching records from Kafka. By default, this is 16.
// Set to -1 to use non-buffered channel.
PartitionBufferSize int `mapstructure:"partition_buffer_size" json:"partition_buffer_size" envconfig:"partition_buffer_size" default:"16" yaml:"partition_buffer_size" toml:"partition_buffer_size"`

// PublicationDataMode configures the mode in which the message payload already contains data ready to publish into channels, instead of an API command.
PublicationDataMode KafkaPublicationModeConfig `mapstructure:"publication_data_mode" json:"publication_data_mode" envconfig:"publication_data_mode" yaml:"publication_data_mode" toml:"publication_data_mode"`
}

func (c KafkaConsumerConfig) Validate() error {
if len(c.Brokers) == 0 {
return errors.New("no Kafka brokers provided")
}
if len(c.Topics) == 0 {
return errors.New("no Kafka topics provided")
}
if c.ConsumerGroup == "" {
return errors.New("no Kafka consumer group provided")
}
if c.PublicationDataMode.Enabled && c.PublicationDataMode.ChannelsHeaderName == "" {
return errors.New("no Kafka channels_header_name provided for publication data mode")
}
return nil
}

type KafkaPublicationModeConfig struct {
// Enabled enables publication data mode for the Kafka consumer.
Enabled bool `mapstructure:"enabled" json:"enabled" envconfig:"enabled" yaml:"enabled" toml:"enabled"`
// ChannelsHeaderName is the name of the header containing comma-separated publication channels.
ChannelsHeaderName string `mapstructure:"channels_header_name" json:"channels_header_name" envconfig:"channels_header_name" yaml:"channels_header_name" toml:"channels_header_name"`
// IdempotencyKeyHeaderName is a header name to extract the idempotency key from the Kafka message.
IdempotencyKeyHeaderName string `mapstructure:"idempotency_key_header_name" json:"idempotency_key_header_name" envconfig:"idempotency_key_header_name" yaml:"idempotency_key_header_name" toml:"idempotency_key_header_name"`
// DeltaHeaderName is a header name to extract the delta flag from the Kafka message.
DeltaHeaderName string `mapstructure:"delta_header_name" json:"delta_header_name" envconfig:"delta_header_name" yaml:"delta_header_name" toml:"delta_header_name"`
}
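To see how the new options fit together, here is an illustrative snippet (not part of the commit; broker address, topic, and header names are placeholders) that builds a Kafka consumer config with publication data mode enabled and checks it with the Validate method defined above:

package example

import (
	"fmt"

	"github.com/centrifugal/centrifugo/v5/internal/configtypes"
)

func exampleKafkaConsumerConfig() error {
	cfg := configtypes.KafkaConsumerConfig{
		Brokers:       []string{"localhost:9092"},
		Topics:        []string{"centrifugo-publications"},
		ConsumerGroup: "centrifugo",
		PublicationDataMode: configtypes.KafkaPublicationModeConfig{
			Enabled:                  true,
			ChannelsHeaderName:       "centrifugo-channels",
			IdempotencyKeyHeaderName: "centrifugo-idempotency-key",
			DeltaHeaderName:          "centrifugo-delta",
		},
	}
	if err := cfg.Validate(); err != nil {
		return fmt.Errorf("invalid consumer config: %w", err)
	}
	return nil
}

Leaving ChannelsHeaderName empty while Enabled is true makes Validate return an error, mirroring the publication data mode check above.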
2 changes: 2 additions & 0 deletions internal/consuming/consuming.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/centrifugal/centrifugo/v5/internal/apiproto"
"github.com/centrifugal/centrifugo/v5/internal/configtypes"
"github.com/centrifugal/centrifugo/v5/internal/service"

@@ -15,6 +16,7 @@ type ConsumerConfig = configtypes.Consumer

type Dispatcher interface {
Dispatch(ctx context.Context, method string, data []byte) error
Broadcast(ctx context.Context, req *apiproto.BroadcastRequest) error
}

type Logger interface {
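Since Broadcast is now part of the Dispatcher interface, anything implementing both methods can be handed to a consumer. A hypothetical in-memory double (not part of the commit) that satisfies the interface, useful when exercising consumers without a running node, could look like this:

package example

import (
	"context"

	"github.com/centrifugal/centrifugo/v5/internal/apiproto"
)

// fakeDispatcher records everything a consumer dispatches to it.
type fakeDispatcher struct {
	methods    []string
	broadcasts []*apiproto.BroadcastRequest
}

func (f *fakeDispatcher) Dispatch(ctx context.Context, method string, data []byte) error {
	f.methods = append(f.methods, method)
	return nil
}

func (f *fakeDispatcher) Broadcast(ctx context.Context, req *apiproto.BroadcastRequest) error {
	f.broadcasts = append(f.broadcasts, req)
	return nil
}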
67 changes: 56 additions & 11 deletions internal/consuming/kafka.go
@@ -7,9 +7,12 @@ import (
"errors"
"fmt"
"net"
"strconv"
"strings"
"sync"
"time"

"github.com/centrifugal/centrifugo/v5/internal/apiproto"
"github.com/centrifugal/centrifugo/v5/internal/configtypes"

"github.com/centrifugal/centrifuge"
@@ -336,6 +339,7 @@ func (c *KafkaConsumer) assigned(ctx context.Context, cl *kgo.Client, assigned m
cl: cl,
topic: topic,
partition: partition,
config: c.config,

quit: make(chan struct{}),
done: make(chan struct{}),
Expand Down Expand Up @@ -387,12 +391,61 @@ type partitionConsumer struct {
cl *kgo.Client
topic string
partition int32
config KafkaConfig

quit chan struct{}
done chan struct{}
recs chan kgo.FetchTopicPartition
}

func getHeaderValue(record *kgo.Record, headerKey string) string {
if headerKey == "" {
return ""
}
for _, header := range record.Headers {
if header.Key == headerKey {
return string(header.Value)
}
}
return ""
}

func (pc *partitionConsumer) processPublicationDataRecord(ctx context.Context, record *kgo.Record) error {
var delta bool
if pc.config.PublicationDataMode.DeltaHeaderName != "" {
var err error
delta, err = strconv.ParseBool(getHeaderValue(record, pc.config.PublicationDataMode.DeltaHeaderName))
if err != nil {
pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error parsing delta header value, skip message", map[string]any{"error": err.Error(), "topic": record.Topic, "partition": record.Partition}))
return nil
}
}
req := &apiproto.BroadcastRequest{
Data: record.Value,
Channels: strings.Split(getHeaderValue(record, pc.config.PublicationDataMode.ChannelsHeaderName), ","),
IdempotencyKey: getHeaderValue(record, pc.config.PublicationDataMode.IdempotencyKeyHeaderName),
Delta: delta,
}
return pc.dispatcher.Broadcast(ctx, req)
}

func (pc *partitionConsumer) processAPICommandRecord(ctx context.Context, record *kgo.Record) error {
var e KafkaJSONEvent
err := json.Unmarshal(record.Value, &e)
if err != nil {
pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error unmarshalling event from Kafka, skip message", map[string]any{"error": err.Error(), "topic": record.Topic, "partition": record.Partition}))
return nil
}
return pc.dispatcher.Dispatch(ctx, e.Method, e.Payload)
}

func (pc *partitionConsumer) processRecord(ctx context.Context, record *kgo.Record) error {
if pc.config.PublicationDataMode.Enabled {
return pc.processPublicationDataRecord(ctx, record)
}
return pc.processAPICommandRecord(ctx, record)
}

func (pc *partitionConsumer) processRecords(records []*kgo.Record) {
for _, record := range records {
select {
@@ -403,28 +456,20 @@ func (pc *partitionConsumer) processRecords(records []*kgo.Record) {
default:
}

var e KafkaJSONEvent
err := json.Unmarshal(record.Value, &e)
if err != nil {
pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error unmarshalling event from Kafka", map[string]any{"error": err.Error(), "topic": record.Topic, "partition": record.Partition}))
pc.cl.MarkCommitRecords(record)
continue
}

var backoffDuration time.Duration = 0
retries := 0
for {
err := pc.dispatcher.Dispatch(pc.clientCtx, e.Method, e.Payload)
err := pc.processRecord(pc.clientCtx, record)
if err == nil {
if retries > 0 {
pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "OK processing events after errors", map[string]any{}))
pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "OK processing message after errors", map[string]any{"topic": record.Topic, "partition": record.Partition}))
}
pc.cl.MarkCommitRecords(record)
break
}
retries++
backoffDuration = getNextBackoffDuration(backoffDuration, retries)
pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error processing consumed event", map[string]any{"error": err.Error(), "method": e.Method, "nextAttemptIn": backoffDuration.String()}))
pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error processing consumed message", map[string]any{"error": err.Error(), "nextAttemptIn": backoffDuration.String(), "topic": record.Topic, "partition": record.Partition}))
select {
case <-time.After(backoffDuration):
case <-pc.quit:
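For completeness, here is what the producing side could look like in publication data mode: the record value carries the ready-to-publish payload and the headers carry routing metadata. This sketch uses franz-go, the same client library the consumer is built on; the broker address, topic, and header names are placeholders and must match the consumer configuration:

package main

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.DefaultProduceTopic("centrifugo-publications"),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	record := &kgo.Record{
		// The payload is broadcast to channels as-is, no API command envelope.
		Value: []byte(`{"input": "hello"}`),
		Headers: []kgo.RecordHeader{
			// Comma-separated list of target channels.
			{Key: "centrifugo-channels", Value: []byte("chat:index,chat:room1")},
			// Optional idempotency key to guard against redelivery duplicates.
			{Key: "centrifugo-idempotency-key", Value: []byte("msg-42")},
			// Optional delta flag, parsed with strconv.ParseBool by the consumer.
			{Key: "centrifugo-delta", Value: []byte("false")},
		},
	}

	if err := client.ProduceSync(context.Background(), record).FirstErr(); err != nil {
		log.Fatal("produce error: ", err)
	}
}

The consumer splits the channels header on commas, passes the idempotency key through to the broadcast API, and parses the delta header with strconv.ParseBool, so any value accepted by that function works.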