feat: dead letter topics (#3830)
[in progress]
closes #3490
matt2e authored Dec 20, 2024
1 parent e085fc2 commit 2041cb0
Showing 15 changed files with 327 additions and 63 deletions.
98 changes: 70 additions & 28 deletions backend/runner/pubsub/consumer.go
@@ -2,6 +2,7 @@ package pubsub

import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
@@ -11,10 +12,12 @@ import (
"github.com/IBM/sarama"
"github.com/alecthomas/types/optional"
"github.com/alecthomas/types/result"
"github.com/jpillora/backoff"

"github.com/block/ftl/backend/controller/observability"
ftlv1 "github.com/block/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/block/ftl/backend/timeline"
"github.com/block/ftl/common/encoding"
"github.com/block/ftl/common/schema"
"github.com/block/ftl/common/slices"
"github.com/block/ftl/internal/channels"
@@ -23,19 +26,20 @@
)

type consumer struct {
moduleName string
deployment model.DeploymentKey
verb *schema.Verb
subscriber *schema.MetadataSubscriber
retryParams schema.RetryParams
group sarama.ConsumerGroup
cancel context.CancelFunc
moduleName string
deployment model.DeploymentKey
verb *schema.Verb
subscriber *schema.MetadataSubscriber
retryParams schema.RetryParams
group sarama.ConsumerGroup
deadLetterPublisher optional.Option[*publisher]

verbClient VerbClient
timelineClient *timeline.Client
}

func newConsumer(moduleName string, verb *schema.Verb, subscriber *schema.MetadataSubscriber, deployment model.DeploymentKey, verbClient VerbClient, timelineClient *timeline.Client) (*consumer, error) {
func newConsumer(moduleName string, verb *schema.Verb, subscriber *schema.MetadataSubscriber, deployment model.DeploymentKey,
deadLetterPublisher optional.Option[*publisher], verbClient VerbClient, timelineClient *timeline.Client) (*consumer, error) {
if verb.Runtime == nil {
return nil, fmt.Errorf("subscription %s has no runtime", verb.Name)
}
@@ -60,11 +64,12 @@ func newConsumer(moduleName string, verb *schema.Verb, subscriber *schema.Metada
}

c := &consumer{
moduleName: moduleName,
deployment: deployment,
verb: verb,
subscriber: subscriber,
group: group,
moduleName: moduleName,
deployment: deployment,
verb: verb,
subscriber: subscriber,
group: group,
deadLetterPublisher: deadLetterPublisher,

verbClient: verbClient,
timelineClient: timelineClient,
@@ -93,13 +98,7 @@ func (c *consumer) kafkaTopicID() string {

func (c *consumer) Begin(ctx context.Context) error {
// set up config
logger := log.FromContext(ctx).AppendScope("sub:" + c.verb.Name)
ctx = log.ContextWithLogger(ctx, logger)

logger.Debugf("Subscribing to %s", c.kafkaTopicID())

ctx, cancel := context.WithCancel(ctx)
c.cancel = cancel
log.FromContext(ctx).Debugf("Starting subscription for %v", c.verb.Name)

go c.watchErrors(ctx)
go c.subscribe(ctx)
@@ -109,7 +108,7 @@ func (c *consumer) Begin(ctx context.Context) error {
func (c *consumer) watchErrors(ctx context.Context) {
logger := log.FromContext(ctx)
for err := range channels.IterContext(ctx, c.group.Errors()) {
logger.Errorf(err, "Consumer group error")
logger.Errorf(err, "Consumer group error for %v", c.verb.Name)
}
}

@@ -129,9 +128,9 @@ func (c *consumer) subscribe(ctx context.Context) {

err := c.group.Consume(ctx, []string{c.kafkaTopicID()}, c)
if err != nil {
logger.Errorf(err, "Session failed for %s", c.verb.Name)
logger.Errorf(err, "Consumer group session failed for %s", c.verb.Name)
} else {
logger.Debugf("Ending session")
logger.Debugf("Ending consumer group session for %s", c.verb.Name)
}
}
}
@@ -141,7 +140,7 @@ func (c *consumer) Setup(session sarama.ConsumerGroupSession) error {
logger := log.FromContext(session.Context())

partitions := session.Claims()[c.kafkaTopicID()]
logger.Debugf("Starting session with partitions [%v]", strings.Join(slices.Map(partitions, func(partition int32) string { return strconv.Itoa(int(partition)) }), ","))
logger.Debugf("Starting session for %v with partitions [%v]", c.verb.Name, strings.Join(slices.Map(partitions, func(partition int32) string { return strconv.Itoa(int(partition)) }), ","))

return nil
}
@@ -163,7 +162,7 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
// Channel closed, rebalance or shutdown needed
return nil
}
logger.Debugf("Consuming message with partition %v and offset %v", msg.Partition, msg.Offset)
logger.Debugf("Consuming message from %v[%v:%v]", c.verb.Name, msg.Partition, msg.Offset)
remainingRetries := c.retryParams.Count
backoff := c.retryParams.MinBackoff
for {
@@ -175,15 +174,18 @@
case <-ctx.Done():
// Do not commit the message if we did not succeed and the context is done.
// No need to retry message either.
logger.Errorf(err, "Failed to consume message with partition %v and offset %v", msg.Partition, msg.Offset)
logger.Errorf(err, "Failed to consume message from %v[%v,%v]", c.verb.Name, msg.Partition, msg.Offset)
return nil
default:
}
if remainingRetries == 0 {
logger.Errorf(err, "Failed to consume message with partition %v and offset %v", msg.Partition, msg.Offset)
logger.Errorf(err, "Failed to consume message from %v[%v,%v]", c.verb.Name, msg.Partition, msg.Offset)
if !c.publishToDeadLetterTopic(ctx, msg, err) {
return nil
}
break
}
logger.Errorf(err, "Failed to consume message with partition %v and offset %v and will retry in %vs", msg.Partition, msg.Offset, int(backoff.Seconds()))
logger.Errorf(err, "Failed to consume message from %v[%v,%v] and will retry in %vs", c.verb.Name, msg.Partition, msg.Offset, int(backoff.Seconds()))
time.Sleep(backoff)
remainingRetries--
backoff *= 2
@@ -246,3 +248,43 @@ func (c *consumer) call(ctx context.Context, body []byte, partition, offset int)
observability.Calls.Request(ctx, req.Verb, start, optional.None[string]())
return nil
}

// publishToDeadLetterTopic tries to publish the message to the dead letter topic.
//
// If it does not succeed it will retry until it succeeds or the context is done.
// Returns true if the message was published or if there is no dead letter queue.
// Returns false if the context is done.
func (c *consumer) publishToDeadLetterTopic(ctx context.Context, msg *sarama.ConsumerMessage, callErr error) bool {
p, ok := c.deadLetterPublisher.Get()
if !ok {
return true
}

deadLetterEvent, err := encoding.Marshal(map[string]any{
"event": json.RawMessage(msg.Value),
"error": callErr.Error(),
})
if err != nil {
panic(fmt.Errorf("failed to marshal dead letter event for %v on partition %v and offset %v: %w", c.kafkaTopicID(), msg.Partition, msg.Offset, err))
}

bo := &backoff.Backoff{Min: time.Second, Max: 10 * time.Second}
first := true
for {
var waitDuration time.Duration
if first {
first = false
} else {
waitDuration = bo.Duration()
}
select {
case <-ctx.Done():
return false
case <-time.After(waitDuration):
}
err := p.publish(ctx, deadLetterEvent, string(msg.Key), schema.Ref{Module: c.moduleName, Name: c.verb.Name})
if err == nil {
return true
}
}
}
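
The dead letter payload produced above wraps the original event bytes together with the error that exhausted the retries. A minimal standalone sketch of that envelope, using encoding/json directly rather than FTL's common/encoding package, so the exact wire format of the real implementation may differ; the sample event value is made up:

package main

import (
    "encoding/json"
    "errors"
    "fmt"
)

func main() {
    // Original event bytes as they arrived from Kafka.
    original := []byte(`{"time":"2024-12-20T00:00:00Z"}`)
    callErr := errors.New("always error: event 2024-12-20T00:00:00Z")

    // Wrap the raw event and the error string, mirroring the consumer above.
    envelope, err := json.Marshal(map[string]any{
        "event": json.RawMessage(original),
        "error": callErr.Error(),
    })
    if err != nil {
        panic(err)
    }
    fmt.Println(string(envelope))
    // {"error":"always error: event 2024-12-20T00:00:00Z","event":{"time":"2024-12-20T00:00:00Z"}}
}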
39 changes: 39 additions & 0 deletions backend/runner/pubsub/integration_test.go
@@ -57,6 +57,8 @@ func TestRetry(t *testing.T) {
retriesPerCall := 2
in.Run(t,
in.WithLanguages("java", "go"),

in.WithPubSub(),
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
in.Deploy("publisher"),
@@ -70,6 +72,9 @@

checkConsumed("subscriber", "consumeButFailAndRetry", false, retriesPerCall+1, optional.Some("firstCall")),
checkConsumed("subscriber", "consumeButFailAndRetry", false, retriesPerCall+1, optional.Some("secondCall")),
checkPublished("subscriber", "consumeButFailAndRetryFailed", 2),

in.IfLanguage("go", checkConsumed("subscriber", "consumeFromDeadLetter", true, 2, optional.None[string]())),
)
}

@@ -227,6 +232,40 @@ func checkConsumed(module, verb string, success bool, count int, needle optional
}
}
}

func checkPublished(module, topic string, count int) in.Action {
return func(t testing.TB, ic in.TestContext) {
in.Infof("Checking for %v published events for %s.%s", count, module, topic)
resp, err := ic.Timeline.GetTimeline(ic.Context, connect.NewRequest(&timelinepb.GetTimelineRequest{
Limit: 100000,
Filters: []*timelinepb.GetTimelineRequest_Filter{
{
Filter: &timelinepb.GetTimelineRequest_Filter_EventTypes{
EventTypes: &timelinepb.GetTimelineRequest_EventTypeFilter{
EventTypes: []timelinepb.EventType{
timelinepb.EventType_EVENT_TYPE_PUBSUB_PUBLISH,
},
},
},
},
},
}))
assert.NoError(t, err)
events := slices.Filter(slices.Map(resp.Msg.Events, func(e *timelinepb.Event) *timelinepb.PubSubPublishEvent {
return e.GetPubsubPublish()
}), func(e *timelinepb.PubSubPublishEvent) bool {
if e == nil {
return false
}
if e.Topic != topic {
return false
}
return true
})
assert.Equal(t, count, len(events), "expected %v published events", count)
}
}

func checkGroupMembership(module, subscription string, expectedCount int) in.Action {
return func(t testing.TB, ic in.TestContext) {
consumerGroup := module + "." + subscription
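
As a rough sanity check on the expectations in TestRetry above: with retriesPerCall set to 2, each failing event should be consumed retriesPerCall+1 times before a dead letter event is published, and the two published events ("firstCall" and "secondCall") should therefore yield two dead letter events. A throwaway sketch of that arithmetic, with all names local to the example:

package main

import "fmt"

func main() {
    retriesPerCall := 2  // mirrors the test constant
    publishedEvents := 2 // "firstCall" and "secondCall"

    attemptsPerEvent := retriesPerCall + 1 // initial attempt plus retries
    deadLetterEvents := publishedEvents    // one dead letter event per exhausted message

    fmt.Printf("attempts per event: %d, dead letter events: %d\n", attemptsPerEvent, deadLetterEvents)
}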
8 changes: 4 additions & 4 deletions backend/runner/pubsub/publisher.go
@@ -52,7 +52,7 @@ func newPublisher(module string, t *schema.Topic, deployment model.DeploymentKey
}

func (p *publisher) publish(ctx context.Context, data []byte, key string, caller schema.Ref) error {
logger := log.FromContext(ctx).AppendScope("topic:" + p.topic.Name)
logger := log.FromContext(ctx)
requestKey, err := rpc.RequestKeyFromContext(ctx)
if err != nil {
return fmt.Errorf("failed to get request key: %w", err)
@@ -78,12 +78,12 @@ func (p *publisher) publish(ctx context.Context, data []byte, key string, caller
})
if err != nil {
timelineEvent.Error = optional.Some(err.Error())
logger.Errorf(err, "Failed to publish message")
return fmt.Errorf("failed to publish message: %w", err)
logger.Errorf(err, "Failed to publish message to %s", p.topic.Name)
return fmt.Errorf("failed to publish message to %s: %w", p.topic.Name, err)
}
timelineEvent.Partition = int(partition)
timelineEvent.Offset = int(offset)
p.timelineClient.Publish(ctx, timelineEvent)
logger.Debugf("Published to partition %v with offset %v)", partition, offset)
logger.Debugf("Published to %v[%v:%v]", p.topic.Name, partition, offset)
return nil
}
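
Underneath the publisher, sarama's SyncProducer is what reports the partition and offset that get logged and attached to the timeline event above. A minimal sketch of that call, assuming a plain SyncProducer with a placeholder broker address and topic name; the real publisher also records the caller and request key and publishes a timeline event:

package main

import (
    "log"

    "github.com/IBM/sarama"
)

func main() {
    cfg := sarama.NewConfig()
    cfg.Producer.Return.Successes = true // required for SyncProducer

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
    if err != nil {
        log.Fatalf("failed to create producer: %v", err)
    }
    defer producer.Close()

    partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
        Topic: "publisher.topic2", // placeholder topic ID
        Key:   sarama.StringEncoder("firstCall"),
        Value: sarama.ByteEncoder([]byte(`{"time":"2024-12-20T00:00:00Z"}`)),
    })
    if err != nil {
        log.Fatalf("failed to publish: %v", err)
    }
    log.Printf("Published to publisher.topic2[%v:%v]", partition, offset)
}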
11 changes: 10 additions & 1 deletion backend/runner/pubsub/pubsub.go
@@ -5,6 +5,7 @@ import (
"fmt"

"connectrpc.com/connect"
"github.com/alecthomas/types/optional"

pb "github.com/block/ftl/backend/protos/xyz/block/ftl/publish/v1"
pbconnect "github.com/block/ftl/backend/protos/xyz/block/ftl/publish/v1/publishpbconnect"
@@ -43,7 +44,15 @@ func New(module *schema.Module, deployment model.DeploymentKey, verbClient VerbC
if !ok {
continue
}
consumer, err := newConsumer(module.Name, v, subscriber, deployment, verbClient, timelineClient)
var deadLetterPublisher optional.Option[*publisher]
if subscriber.DeadLetter {
p, ok := publishers[schema.DeadLetterNameForSubscriber(v.Name)]
if !ok {
return nil, fmt.Errorf("dead letter publisher not found for subscription %s", v.Name)
}
deadLetterPublisher = optional.Some(p)
}
consumer, err := newConsumer(module.Name, v, subscriber, deployment, deadLetterPublisher, verbClient, timelineClient)
if err != nil {
return nil, err
}
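
The dead letter publisher is looked up via schema.DeadLetterNameForSubscriber. Judging from the test fixtures below, where the subscription verb consumeButFailAndRetry is paired with the dead letter topic consumeButFailAndRetryFailed, the convention appears to be a "Failed" suffix on the verb name. A hypothetical sketch of that convention, not the actual schema package code:

package main

import "fmt"

// deadLetterNameForSubscriber mirrors what schema.DeadLetterNameForSubscriber
// appears to do, based on the test data: the dead letter topic for a
// subscription verb is the verb name with a "Failed" suffix.
func deadLetterNameForSubscriber(verbName string) string {
    return verbName + "Failed"
}

func main() {
    fmt.Println(deadLetterNameForSubscriber("consumeButFailAndRetry"))
    // consumeButFailAndRetryFailed
}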
22 changes: 11 additions & 11 deletions backend/runner/pubsub/testdata/go/publisher/types.ftl.go

Some generated files are not rendered by default.

10 changes: 9 additions & 1 deletion backend/runner/pubsub/testdata/go/subscriber/subscriber.go
@@ -6,6 +6,7 @@ import (
"strings"
"time"

"ftl/builtin"
"ftl/publisher"

"github.com/block/ftl/common/reflection"
@@ -27,12 +28,19 @@ func ConsumeFromLatest(ctx context.Context, req publisher.PubSubEvent) error {
}

//ftl:verb
//ftl:subscribe publisher.topic2 from=beginning
//ftl:subscribe publisher.topic2 from=beginning deadletter
//ftl:retry 2 1s 1s
func ConsumeButFailAndRetry(ctx context.Context, req publisher.PubSubEvent) error {
return fmt.Errorf("always error: event %v", req.Time)
}

//ftl:verb
//ftl:subscribe consumeButFailAndRetryFailed from=beginning
func ConsumeFromDeadLetter(ctx context.Context, req builtin.FailedEvent[publisher.PubSubEvent]) error {
ftl.LoggerFromContext(ctx).Infof("ConsumeFromDeadLetter: %v", req.Event.Time)
return nil
}

//ftl:verb
func PublishToExternalModule(ctx context.Context) error {
// Get around compile-time checks
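
The dead letter consumer above takes a builtin.FailedEvent[publisher.PubSubEvent]. The actual type lives in FTL's builtin module; a hypothetical sketch of its shape, inferred from the event/error envelope the consumer publishes and from the req.Event.Time access in the handler:

package main

import (
    "encoding/json"
    "fmt"
    "time"
)

// FailedEvent is a guess at the shape of builtin.FailedEvent[E]: the original
// event plus the error message recorded when consumption finally gave up.
type FailedEvent[E any] struct {
    Event E      `json:"event"`
    Error string `json:"error"`
}

type PubSubEvent struct {
    Time time.Time `json:"time"`
}

func main() {
    payload := []byte(`{"event":{"time":"2024-12-20T00:00:00Z"},"error":"always error"}`)
    var failed FailedEvent[PubSubEvent]
    if err := json.Unmarshal(payload, &failed); err != nil {
        panic(err)
    }
    fmt.Println(failed.Event.Time, failed.Error)
}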