diff --git a/internal/api/consuming.go b/internal/api/consuming.go index 02811a6ae8..af157118c3 100644 --- a/internal/api/consuming.go +++ b/internal/api/consuming.go @@ -7,6 +7,8 @@ import ( "github.com/centrifugal/centrifugo/v5/internal/apiproto" "github.com/centrifugal/centrifuge" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) // ConsumingHandlerConfig configures ConsumingHandler. @@ -32,7 +34,39 @@ func NewConsumingHandler(n *centrifuge.Node, apiExecutor *Executor, c ConsumingH } func (h *ConsumingHandler) logNonRetryableConsumingError(err error, method string) { - h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "non retryable error during consuming", map[string]any{"error": err.Error(), "method": method})) + h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "non retryable error during consuming, skip message", map[string]any{"error": err.Error(), "method": method})) +} + +func (h *ConsumingHandler) Broadcast(ctx context.Context, req *apiproto.BroadcastRequest) error { + res, err := h.broadcastRequest(ctx, req) + if err != nil { + var apiError *apiproto.Error + if errors.As(err, &apiError) && apiError.Code == apiproto.ErrorInternal.Code { + return err + } + h.logNonRetryableConsumingError(err, "publication_data_broadcast") + return nil + } + for _, resp := range res.Responses { + if resp.Error != nil && resp.Error.Code == apiproto.ErrorInternal.Code { + // Any internal error in any channel response will result into a retry by a consumer. + // To prevent duplicate messages publishers may use idempotency keys. + return resp.Error + } + } + return nil +} + +func (h *ConsumingHandler) broadcastRequest(ctx context.Context, req *apiproto.BroadcastRequest) (*apiproto.BroadcastResult, error) { + resp := h.api.Broadcast(ctx, req) + if h.config.UseOpenTelemetry && resp.Error != nil { + span := trace.SpanFromContext(ctx) + span.SetStatus(codes.Error, resp.Error.Error()) + } + if resp.Error != nil { + return nil, resp.Error + } + return resp.Result, nil } // Dispatch processes commands received from asynchronous consumers. diff --git a/internal/config/validate.go b/internal/config/validate.go index 04a527d8f2..6f86732c94 100644 --- a/internal/config/validate.go +++ b/internal/config/validate.go @@ -118,6 +118,19 @@ func (c Config) Validate() error { if !slices.Contains(configtypes.KnownConsumerTypes, config.Type) { return fmt.Errorf("unknown consumer type: %s", config.Type) } + if config.Enabled { + switch config.Type { + case configtypes.ConsumerTypeKafka: + if err := config.Kafka.Validate(); err != nil { + return fmt.Errorf("in consumer %s (kafka): %w", config.Name, err) + } + case configtypes.ConsumerTypePostgres: + if err := config.Postgres.Validate(); err != nil { + return fmt.Errorf("in consumer %s (postgres): %w", config.Name, err) + } + default: + } + } consumerNames = append(consumerNames, config.Name) } diff --git a/internal/configtypes/types.go b/internal/configtypes/types.go index a1e50c01f0..4c3a08f951 100644 --- a/internal/configtypes/types.go +++ b/internal/configtypes/types.go @@ -3,6 +3,7 @@ package configtypes import ( "context" "encoding/json" + "errors" "fmt" "net" ) @@ -588,6 +589,16 @@ type PostgresConsumerConfig struct { TLS TLSConfig `mapstructure:"tls" json:"tls" envconfig:"tls" yaml:"tls" toml:"tls"` } +func (c PostgresConsumerConfig) Validate() error { + if c.DSN == "" { + return errors.New("no Postgres DSN provided") + } + if c.OutboxTableName == "" { + return errors.New("no Postgres outbox table name provided") + } + return nil +} + // KafkaConsumerConfig is a configuration for Kafka async consumer. type KafkaConsumerConfig struct { Brokers []string `mapstructure:"brokers" json:"brokers" envconfig:"brokers" yaml:"brokers" toml:"brokers"` @@ -608,4 +619,34 @@ type KafkaConsumerConfig struct { // will pause fetching records from Kafka. By default, this is 16. // Set to -1 to use non-buffered channel. PartitionBufferSize int `mapstructure:"partition_buffer_size" json:"partition_buffer_size" envconfig:"partition_buffer_size" default:"16" yaml:"partition_buffer_size" toml:"partition_buffer_size"` + + // PublicationDataMode is a configuration for the mode where message payload already contains data ready to publish into channels, instead of API command. + PublicationDataMode KafkaPublicationModeConfig `mapstructure:"publication_data_mode" json:"publication_data_mode" envconfig:"publication_data_mode" yaml:"publication_data_mode" toml:"publication_data_mode"` +} + +func (c KafkaConsumerConfig) Validate() error { + if len(c.Brokers) == 0 { + return errors.New("no Kafka brokers provided") + } + if len(c.Topics) == 0 { + return errors.New("no Kafka topics provided") + } + if c.ConsumerGroup == "" { + return errors.New("no Kafka consumer group provided") + } + if c.PublicationDataMode.Enabled && c.PublicationDataMode.ChannelsHeaderName == "" { + return errors.New("no Kafka channels_header_name provided for publication data mode") + } + return nil +} + +type KafkaPublicationModeConfig struct { + // Enabled enables Kafka publication data mode for the Kafka consumer. + Enabled bool `mapstructure:"enabled" json:"enabled" envconfig:"enabled" yaml:"enabled" toml:"enabled"` + // ChannelsHeaderName is a header name to extract publication channels (channels must be comma-separated). + ChannelsHeaderName string `mapstructure:"channels_header_name" json:"channels_header_name" envconfig:"channels_header_name" yaml:"channels_header_name" toml:"channels_header_name"` + // IdempotencyKeyHeaderName is a header name to extract idempotency key from Kafka message. + IdempotencyKeyHeaderName string `mapstructure:"idempotency_key_header_name" json:"idempotency_key_header_name" envconfig:"idempotency_key_header_name" yaml:"idempotency_key_header_name" toml:"idempotency_key_header_name"` + // DeltaHeaderName is a header name to extract delta flag from Kafka message. + DeltaHeaderName string `mapstructure:"delta_header_name" json:"delta_header_name" envconfig:"delta_header_name" yaml:"delta_header_name" toml:"delta_header_name"` } diff --git a/internal/consuming/consuming.go b/internal/consuming/consuming.go index 315c039f73..e810a0ea6f 100644 --- a/internal/consuming/consuming.go +++ b/internal/consuming/consuming.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/centrifugal/centrifugo/v5/internal/apiproto" "github.com/centrifugal/centrifugo/v5/internal/configtypes" "github.com/centrifugal/centrifugo/v5/internal/service" @@ -15,6 +16,7 @@ type ConsumerConfig = configtypes.Consumer type Dispatcher interface { Dispatch(ctx context.Context, method string, data []byte) error + Broadcast(ctx context.Context, req *apiproto.BroadcastRequest) error } type Logger interface { diff --git a/internal/consuming/kafka.go b/internal/consuming/kafka.go index b491ffc4e6..008050b43a 100644 --- a/internal/consuming/kafka.go +++ b/internal/consuming/kafka.go @@ -7,9 +7,12 @@ import ( "errors" "fmt" "net" + "strconv" + "strings" "sync" "time" + "github.com/centrifugal/centrifugo/v5/internal/apiproto" "github.com/centrifugal/centrifugo/v5/internal/configtypes" "github.com/centrifugal/centrifuge" @@ -336,6 +339,7 @@ func (c *KafkaConsumer) assigned(ctx context.Context, cl *kgo.Client, assigned m cl: cl, topic: topic, partition: partition, + config: c.config, quit: make(chan struct{}), done: make(chan struct{}), @@ -387,12 +391,61 @@ type partitionConsumer struct { cl *kgo.Client topic string partition int32 + config KafkaConfig quit chan struct{} done chan struct{} recs chan kgo.FetchTopicPartition } +func getHeaderValue(record *kgo.Record, headerKey string) string { + if headerKey == "" { + return "" + } + for _, header := range record.Headers { + if header.Key == headerKey { + return string(header.Value) + } + } + return "" +} + +func (pc *partitionConsumer) processPublicationDataRecord(ctx context.Context, record *kgo.Record) error { + var delta bool + if pc.config.PublicationDataMode.DeltaHeaderName != "" { + var err error + delta, err = strconv.ParseBool(getHeaderValue(record, pc.config.PublicationDataMode.DeltaHeaderName)) + if err != nil { + pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error parsing delta header value, skip message", map[string]any{"error": err.Error(), "topic": record.Topic, "partition": record.Partition})) + return nil + } + } + req := &apiproto.BroadcastRequest{ + Data: record.Value, + Channels: strings.Split(getHeaderValue(record, pc.config.PublicationDataMode.ChannelsHeaderName), ","), + IdempotencyKey: getHeaderValue(record, pc.config.PublicationDataMode.IdempotencyKeyHeaderName), + Delta: delta, + } + return pc.dispatcher.Broadcast(ctx, req) +} + +func (pc *partitionConsumer) processAPICommandRecord(ctx context.Context, record *kgo.Record) error { + var e KafkaJSONEvent + err := json.Unmarshal(record.Value, &e) + if err != nil { + pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error unmarshalling event from Kafka, skip message", map[string]any{"error": err.Error(), "topic": record.Topic, "partition": record.Partition})) + return nil + } + return pc.dispatcher.Dispatch(ctx, e.Method, e.Payload) +} + +func (pc *partitionConsumer) processRecord(ctx context.Context, record *kgo.Record) error { + if pc.config.PublicationDataMode.Enabled { + return pc.processPublicationDataRecord(ctx, record) + } + return pc.processAPICommandRecord(ctx, record) +} + func (pc *partitionConsumer) processRecords(records []*kgo.Record) { for _, record := range records { select { @@ -403,28 +456,20 @@ func (pc *partitionConsumer) processRecords(records []*kgo.Record) { default: } - var e KafkaJSONEvent - err := json.Unmarshal(record.Value, &e) - if err != nil { - pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error unmarshalling event from Kafka", map[string]any{"error": err.Error(), "topic": record.Topic, "partition": record.Partition})) - pc.cl.MarkCommitRecords(record) - continue - } - var backoffDuration time.Duration = 0 retries := 0 for { - err := pc.dispatcher.Dispatch(pc.clientCtx, e.Method, e.Payload) + err := pc.processRecord(pc.clientCtx, record) if err == nil { if retries > 0 { - pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "OK processing events after errors", map[string]any{})) + pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "OK processing message after errors", map[string]any{"topic": record.Topic, "partition": record.Partition})) } pc.cl.MarkCommitRecords(record) break } retries++ backoffDuration = getNextBackoffDuration(backoffDuration, retries) - pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error processing consumed event", map[string]any{"error": err.Error(), "method": e.Method, "nextAttemptIn": backoffDuration.String()})) + pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error processing consumed message", map[string]any{"error": err.Error(), "nextAttemptIn": backoffDuration.String(), "topic": record.Topic, "partition": record.Partition})) select { case <-time.After(backoffDuration): case <-pc.quit: diff --git a/internal/consuming/kafka_test.go b/internal/consuming/kafka_test.go index 60e782aff6..280f863f61 100644 --- a/internal/consuming/kafka_test.go +++ b/internal/consuming/kafka_test.go @@ -11,6 +11,10 @@ import ( "testing" "time" + "github.com/centrifugal/centrifugo/v5/internal/configtypes" + + "github.com/centrifugal/centrifugo/v5/internal/apiproto" + "github.com/centrifugal/centrifuge" "github.com/google/uuid" "github.com/stretchr/testify/require" @@ -24,13 +28,18 @@ const ( // MockDispatcher implements the Dispatcher interface for testing. type MockDispatcher struct { - onDispatch func(ctx context.Context, method string, data []byte) error + onDispatch func(ctx context.Context, method string, data []byte) error + onBroadcast func(ctx context.Context, req *apiproto.BroadcastRequest) error } func (m *MockDispatcher) Dispatch(ctx context.Context, method string, data []byte) error { return m.onDispatch(ctx, method, data) } +func (m *MockDispatcher) Broadcast(ctx context.Context, req *apiproto.BroadcastRequest) error { + return m.onBroadcast(ctx, req) +} + // MockLogger implements the Logger interface for testing. type MockLogger struct { // Add necessary fields to simulate behavior or record calls @@ -44,7 +53,7 @@ func (m *MockLogger) Log(_ centrifuge.LogEntry) { // Implement mock logic, e.g., storing log entries for assertions } -func produceTestMessage(topic string, message []byte) error { +func produceTestMessage(topic string, message []byte, headers []kgo.RecordHeader) error { // Create a new client client, err := kgo.NewClient(kgo.SeedBrokers(testKafkaBrokerURL)) if err != nil { @@ -53,7 +62,12 @@ func produceTestMessage(topic string, message []byte) error { defer client.Close() // Produce a message - err = client.ProduceSync(context.Background(), &kgo.Record{Topic: topic, Partition: 0, Value: message}).FirstErr() + err = client.ProduceSync(context.Background(), &kgo.Record{ + Topic: topic, + Partition: 0, + Value: message, + Headers: headers, + }).FirstErr() if err != nil { return fmt.Errorf("failed to produce message: %w", err) } @@ -162,7 +176,7 @@ func TestKafkaConsumer_GreenScenario(t *testing.T) { Payload: JSONRawOrString(testPayload), } testMessage, _ := json.Marshal(testEvent) - err = produceTestMessage(testKafkaTopic, testMessage) + err = produceTestMessage(testKafkaTopic, testMessage, nil) require.NoError(t, err) waitCh(t, eventReceived, 30*time.Second, "timeout waiting for event") @@ -214,7 +228,7 @@ func TestKafkaConsumer_SeveralConsumers(t *testing.T) { Payload: JSONRawOrString(testPayload), } testMessage, _ := json.Marshal(testEvent) - err = produceTestMessage(testKafkaTopic, testMessage) + err = produceTestMessage(testKafkaTopic, testMessage, nil) require.NoError(t, err) waitCh(t, eventReceived, 30*time.Second, "timeout waiting for event") @@ -272,7 +286,7 @@ func TestKafkaConsumer_RetryAfterDispatchError(t *testing.T) { Payload: JSONRawOrString(testPayload), } testMessage, _ := json.Marshal(testEvent) - err = produceTestMessage(testKafkaTopic, testMessage) + err = produceTestMessage(testKafkaTopic, testMessage, nil) require.NoError(t, err) waitCh(t, successCh, 30*time.Second, "timeout waiting for successful event process") @@ -332,12 +346,12 @@ func TestKafkaConsumer_BlockedPartitionDoesNotBlockAnotherTopic(t *testing.T) { require.NoError(t, err) go func() { - err = produceTestMessage(testKafkaTopic1, testPayload1) + err = produceTestMessage(testKafkaTopic1, testPayload1, nil) require.NoError(t, err) // Wait until the first message is received to make sure messages read by separate PollRecords calls. <-event1Received - err = produceTestMessage(testKafkaTopic2, testPayload2) + err = produceTestMessage(testKafkaTopic2, testPayload2, nil) require.NoError(t, err) }() @@ -427,3 +441,71 @@ func TestKafkaConsumer_BlockedPartitionDoesNotBlockAnotherPartition(t *testing.T }) } } + +func TestKafkaConsumer_GreenScenario_PublicationDataMode(t *testing.T) { + t.Parallel() + testKafkaTopic := "centrifugo_consumer_test_" + uuid.New().String() + testChannels := []string{"channel1", "channel2"} + testPayload := []byte(`{"key":"value"}`) + testIdempotencyKey := "test-idempotency-key" + const testDelta = true + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + err := createTestTopic(ctx, testKafkaTopic, 1, 1) + require.NoError(t, err) + + config := KafkaConfig{ + Brokers: []string{testKafkaBrokerURL}, // Adjust as needed + Topics: []string{testKafkaTopic}, + ConsumerGroup: uuid.New().String(), + PublicationDataMode: configtypes.KafkaPublicationModeConfig{ + Enabled: true, + ChannelsHeaderName: "centrifugo-channels", + IdempotencyKeyHeaderName: "centrifugo-idempotency-key", + DeltaHeaderName: "centrifugo-delta", + }, + } + + eventReceived := make(chan struct{}) + consumerClosed := make(chan struct{}) + + consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, &MockDispatcher{ + onBroadcast: func(ctx context.Context, req *apiproto.BroadcastRequest) error { + require.Equal(t, testChannels, req.Channels) + require.Equal(t, testPayload, req.Data) + require.Equal(t, testIdempotencyKey, req.IdempotencyKey) + require.Equal(t, testDelta, req.Delta) + close(eventReceived) + return nil + }, + }, config) + require.NoError(t, err) + + go func() { + err := consumer.Run(ctx) + require.ErrorIs(t, err, context.Canceled) + close(consumerClosed) + }() + + err = produceTestMessage(testKafkaTopic, testPayload, []kgo.RecordHeader{ + { + Key: config.PublicationDataMode.ChannelsHeaderName, + Value: []byte(strings.Join(testChannels, ",")), + }, + { + Key: config.PublicationDataMode.IdempotencyKeyHeaderName, + Value: []byte(testIdempotencyKey), + }, + { + Key: config.PublicationDataMode.DeltaHeaderName, + Value: []byte(fmt.Sprintf("%v", testDelta)), + }, + }) + require.NoError(t, err) + + waitCh(t, eventReceived, 30*time.Second, "timeout waiting for event") + cancel() + waitCh(t, consumerClosed, 30*time.Second, "timeout waiting for consumer closed") +}