diff --git a/backend/runner/pubsub/consumer.go b/backend/runner/pubsub/consumer.go index 9c6b0c034..b8f748529 100644 --- a/backend/runner/pubsub/consumer.go +++ b/backend/runner/pubsub/consumer.go @@ -2,6 +2,7 @@ package pubsub import ( "context" + "encoding/json" "fmt" "strconv" "strings" @@ -11,10 +12,12 @@ import ( "github.com/IBM/sarama" "github.com/alecthomas/types/optional" "github.com/alecthomas/types/result" + "github.com/jpillora/backoff" "github.com/block/ftl/backend/controller/observability" ftlv1 "github.com/block/ftl/backend/protos/xyz/block/ftl/v1" "github.com/block/ftl/backend/timeline" + "github.com/block/ftl/common/encoding" "github.com/block/ftl/common/schema" "github.com/block/ftl/common/slices" "github.com/block/ftl/internal/channels" @@ -23,19 +26,20 @@ import ( ) type consumer struct { - moduleName string - deployment model.DeploymentKey - verb *schema.Verb - subscriber *schema.MetadataSubscriber - retryParams schema.RetryParams - group sarama.ConsumerGroup - cancel context.CancelFunc + moduleName string + deployment model.DeploymentKey + verb *schema.Verb + subscriber *schema.MetadataSubscriber + retryParams schema.RetryParams + group sarama.ConsumerGroup + deadLetterPublisher optional.Option[*publisher] verbClient VerbClient timelineClient *timeline.Client } -func newConsumer(moduleName string, verb *schema.Verb, subscriber *schema.MetadataSubscriber, deployment model.DeploymentKey, verbClient VerbClient, timelineClient *timeline.Client) (*consumer, error) { +func newConsumer(moduleName string, verb *schema.Verb, subscriber *schema.MetadataSubscriber, deployment model.DeploymentKey, + deadLetterPublisher optional.Option[*publisher], verbClient VerbClient, timelineClient *timeline.Client) (*consumer, error) { if verb.Runtime == nil { return nil, fmt.Errorf("subscription %s has no runtime", verb.Name) } @@ -60,11 +64,12 @@ func newConsumer(moduleName string, verb *schema.Verb, subscriber *schema.Metada } c := &consumer{ - moduleName: moduleName, - deployment: deployment, - verb: verb, - subscriber: subscriber, - group: group, + moduleName: moduleName, + deployment: deployment, + verb: verb, + subscriber: subscriber, + group: group, + deadLetterPublisher: deadLetterPublisher, verbClient: verbClient, timelineClient: timelineClient, @@ -93,13 +98,7 @@ func (c *consumer) kafkaTopicID() string { func (c *consumer) Begin(ctx context.Context) error { // set up config - logger := log.FromContext(ctx).AppendScope("sub:" + c.verb.Name) - ctx = log.ContextWithLogger(ctx, logger) - - logger.Debugf("Subscribing to %s", c.kafkaTopicID()) - - ctx, cancel := context.WithCancel(ctx) - c.cancel = cancel + log.FromContext(ctx).Debugf("Starting subscription for %v", c.verb.Name) go c.watchErrors(ctx) go c.subscribe(ctx) @@ -109,7 +108,7 @@ func (c *consumer) Begin(ctx context.Context) error { func (c *consumer) watchErrors(ctx context.Context) { logger := log.FromContext(ctx) for err := range channels.IterContext(ctx, c.group.Errors()) { - logger.Errorf(err, "Consumer group error") + logger.Errorf(err, "Consumer group error for %v", c.verb.Name) } } @@ -129,9 +128,9 @@ func (c *consumer) subscribe(ctx context.Context) { err := c.group.Consume(ctx, []string{c.kafkaTopicID()}, c) if err != nil { - logger.Errorf(err, "Session failed for %s", c.verb.Name) + logger.Errorf(err, "Consumer group session failed for %s", c.verb.Name) } else { - logger.Debugf("Ending session") + logger.Debugf("Ending consumer group session for %s", c.verb.Name) } } } @@ -141,7 +140,7 @@ func (c *consumer) Setup(session sarama.ConsumerGroupSession) error { logger := log.FromContext(session.Context()) partitions := session.Claims()[c.kafkaTopicID()] - logger.Debugf("Starting session with partitions [%v]", strings.Join(slices.Map(partitions, func(partition int32) string { return strconv.Itoa(int(partition)) }), ",")) + logger.Debugf("Starting session for %v with partitions [%v]", c.verb.Name, strings.Join(slices.Map(partitions, func(partition int32) string { return strconv.Itoa(int(partition)) }), ",")) return nil } @@ -163,7 +162,7 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram // Channel closed, rebalance or shutdown needed return nil } - logger.Debugf("Consuming message with partition %v and offset %v", msg.Partition, msg.Offset) + logger.Debugf("Consuming message from %v[%v:%v]", c.verb.Name, msg.Partition, msg.Offset) remainingRetries := c.retryParams.Count backoff := c.retryParams.MinBackoff for { @@ -175,15 +174,18 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram case <-ctx.Done(): // Do not commit the message if we did not succeed and the context is done. // No need to retry message either. - logger.Errorf(err, "Failed to consume message with partition %v and offset %v", msg.Partition, msg.Offset) + logger.Errorf(err, "Failed to consume message from %v[%v,%v]", c.verb.Name, msg.Partition, msg.Offset) return nil default: } if remainingRetries == 0 { - logger.Errorf(err, "Failed to consume message with partition %v and offset %v", msg.Partition, msg.Offset) + logger.Errorf(err, "Failed to consume message from %v[%v,%v]", c.verb.Name, msg.Partition, msg.Offset) + if !c.publishToDeadLetterTopic(ctx, msg, err) { + return nil + } break } - logger.Errorf(err, "Failed to consume message with partition %v and offset %v and will retry in %vs", msg.Partition, msg.Offset, int(backoff.Seconds())) + logger.Errorf(err, "Failed to consume message from %v[%v,%v] and will retry in %vs", c.verb.Name, msg.Partition, msg.Offset, int(backoff.Seconds())) time.Sleep(backoff) remainingRetries-- backoff *= 2 @@ -246,3 +248,43 @@ func (c *consumer) call(ctx context.Context, body []byte, partition, offset int) observability.Calls.Request(ctx, req.Verb, start, optional.None[string]()) return nil } + +// publishToDeadLetterTopic tries to publish the message to the dead letter topic. +// +// If it does not succeed it will retry until it succeeds or the context is done. +// Returns true if the message was published or if there is no dead letter queue. +// Returns false if the context is done. +func (c *consumer) publishToDeadLetterTopic(ctx context.Context, msg *sarama.ConsumerMessage, callErr error) bool { + p, ok := c.deadLetterPublisher.Get() + if !ok { + return true + } + + deadLetterEvent, err := encoding.Marshal(map[string]any{ + "event": json.RawMessage(msg.Value), + "error": callErr.Error(), + }) + if err != nil { + panic(fmt.Errorf("failed to marshal dead letter event for %v on partition %v and offset %v: %w", c.kafkaTopicID(), msg.Partition, msg.Offset, err)) + } + + bo := &backoff.Backoff{Min: time.Second, Max: 10 * time.Second} + first := true + for { + var waitDuration time.Duration + if first { + first = false + } else { + waitDuration = bo.Duration() + } + select { + case <-ctx.Done(): + return false + case <-time.After(waitDuration): + } + err := p.publish(ctx, deadLetterEvent, string(msg.Key), schema.Ref{Module: c.moduleName, Name: c.verb.Name}) + if err == nil { + return true + } + } +} diff --git a/backend/runner/pubsub/integration_test.go b/backend/runner/pubsub/integration_test.go index 20e3d23d7..af10cb4f9 100644 --- a/backend/runner/pubsub/integration_test.go +++ b/backend/runner/pubsub/integration_test.go @@ -57,6 +57,8 @@ func TestRetry(t *testing.T) { retriesPerCall := 2 in.Run(t, in.WithLanguages("java", "go"), + + in.WithPubSub(), in.CopyModule("publisher"), in.CopyModule("subscriber"), in.Deploy("publisher"), @@ -70,6 +72,9 @@ func TestRetry(t *testing.T) { checkConsumed("subscriber", "consumeButFailAndRetry", false, retriesPerCall+1, optional.Some("firstCall")), checkConsumed("subscriber", "consumeButFailAndRetry", false, retriesPerCall+1, optional.Some("secondCall")), + checkPublished("subscriber", "consumeButFailAndRetryFailed", 2), + + in.IfLanguage("go", checkConsumed("subscriber", "consumeFromDeadLetter", true, 2, optional.None[string]())), ) } @@ -227,6 +232,40 @@ func checkConsumed(module, verb string, success bool, count int, needle optional } } } + +func checkPublished(module, topic string, count int) in.Action { + return func(t testing.TB, ic in.TestContext) { + in.Infof("Checking for %v published events for %s.%s", count, module, topic) + resp, err := ic.Timeline.GetTimeline(ic.Context, connect.NewRequest(&timelinepb.GetTimelineRequest{ + Limit: 100000, + Filters: []*timelinepb.GetTimelineRequest_Filter{ + { + Filter: &timelinepb.GetTimelineRequest_Filter_EventTypes{ + EventTypes: &timelinepb.GetTimelineRequest_EventTypeFilter{ + EventTypes: []timelinepb.EventType{ + timelinepb.EventType_EVENT_TYPE_PUBSUB_PUBLISH, + }, + }, + }, + }, + }, + })) + assert.NoError(t, err) + events := slices.Filter(slices.Map(resp.Msg.Events, func(e *timelinepb.Event) *timelinepb.PubSubPublishEvent { + return e.GetPubsubPublish() + }), func(e *timelinepb.PubSubPublishEvent) bool { + if e == nil { + return false + } + if e.Topic != topic { + return false + } + return true + }) + assert.Equal(t, count, len(events), "expected %v published events", count) + } +} + func checkGroupMembership(module, subscription string, expectedCount int) in.Action { return func(t testing.TB, ic in.TestContext) { consumerGroup := module + "." + subscription diff --git a/backend/runner/pubsub/publisher.go b/backend/runner/pubsub/publisher.go index f0aacd4f7..d1e91c4f8 100644 --- a/backend/runner/pubsub/publisher.go +++ b/backend/runner/pubsub/publisher.go @@ -52,7 +52,7 @@ func newPublisher(module string, t *schema.Topic, deployment model.DeploymentKey } func (p *publisher) publish(ctx context.Context, data []byte, key string, caller schema.Ref) error { - logger := log.FromContext(ctx).AppendScope("topic:" + p.topic.Name) + logger := log.FromContext(ctx) requestKey, err := rpc.RequestKeyFromContext(ctx) if err != nil { return fmt.Errorf("failed to get request key: %w", err) @@ -78,12 +78,12 @@ func (p *publisher) publish(ctx context.Context, data []byte, key string, caller }) if err != nil { timelineEvent.Error = optional.Some(err.Error()) - logger.Errorf(err, "Failed to publish message") - return fmt.Errorf("failed to publish message: %w", err) + logger.Errorf(err, "Failed to publish message to %s", p.topic.Name) + return fmt.Errorf("failed to publish message to %s: %w", p.topic.Name, err) } timelineEvent.Partition = int(partition) timelineEvent.Offset = int(offset) p.timelineClient.Publish(ctx, timelineEvent) - logger.Debugf("Published to partition %v with offset %v)", partition, offset) + logger.Debugf("Published to %v[%v:%v]", p.topic.Name, partition, offset) return nil } diff --git a/backend/runner/pubsub/pubsub.go b/backend/runner/pubsub/pubsub.go index 90d582dcc..214660627 100644 --- a/backend/runner/pubsub/pubsub.go +++ b/backend/runner/pubsub/pubsub.go @@ -5,6 +5,7 @@ import ( "fmt" "connectrpc.com/connect" + "github.com/alecthomas/types/optional" pb "github.com/block/ftl/backend/protos/xyz/block/ftl/publish/v1" pbconnect "github.com/block/ftl/backend/protos/xyz/block/ftl/publish/v1/publishpbconnect" @@ -43,7 +44,15 @@ func New(module *schema.Module, deployment model.DeploymentKey, verbClient VerbC if !ok { continue } - consumer, err := newConsumer(module.Name, v, subscriber, deployment, verbClient, timelineClient) + var deadLetterPublisher optional.Option[*publisher] + if subscriber.DeadLetter { + p, ok := publishers[schema.DeadLetterNameForSubscriber(v.Name)] + if !ok { + return nil, fmt.Errorf("dead letter publisher not found for subscription %s", v.Name) + } + deadLetterPublisher = optional.Some(p) + } + consumer, err := newConsumer(module.Name, v, subscriber, deployment, deadLetterPublisher, verbClient, timelineClient) if err != nil { return nil, err } diff --git a/backend/runner/pubsub/testdata/go/publisher/types.ftl.go b/backend/runner/pubsub/testdata/go/publisher/types.ftl.go index e38f0d244..39df1937e 100644 --- a/backend/runner/pubsub/testdata/go/publisher/types.ftl.go +++ b/backend/runner/pubsub/testdata/go/publisher/types.ftl.go @@ -2,10 +2,10 @@ package publisher import ( - "context" - "github.com/block/ftl/common/reflection" - "github.com/block/ftl/go-runtime/ftl" - "github.com/block/ftl/go-runtime/server" + "context" + "github.com/block/ftl/common/reflection" + "github.com/block/ftl/go-runtime/ftl" + "github.com/block/ftl/go-runtime/server" ) type LocalClient func(context.Context, PubSubEvent) error @@ -23,27 +23,27 @@ type PublishTenLocalClient func(context.Context) error func init() { reflection.Register( reflection.ProvideResourcesForVerb( - Local, + Local, ), reflection.ProvideResourcesForVerb( - PublishOne, + PublishOne, server.TopicHandle[PubSubEvent, PartitionMapper]("publisher", "testTopic"), ), reflection.ProvideResourcesForVerb( - PublishOneToTopic2, + PublishOneToTopic2, server.TopicHandle[PubSubEvent, ftl.SinglePartitionMap[PubSubEvent]]("publisher", "topic2"), ), reflection.ProvideResourcesForVerb( - PublishSlow, + PublishSlow, server.TopicHandle[PubSubEvent, PartitionMapper]("publisher", "slowTopic"), ), reflection.ProvideResourcesForVerb( - PublishTen, + PublishTen, server.TopicHandle[PubSubEvent, PartitionMapper]("publisher", "testTopic"), ), reflection.ProvideResourcesForVerb( - PublishTenLocal, + PublishTenLocal, server.TopicHandle[PubSubEvent, PartitionMapper]("publisher", "localTopic"), ), ) -} \ No newline at end of file +} diff --git a/backend/runner/pubsub/testdata/go/subscriber/subscriber.go b/backend/runner/pubsub/testdata/go/subscriber/subscriber.go index 8a6dbe5d6..3d76bdf4e 100644 --- a/backend/runner/pubsub/testdata/go/subscriber/subscriber.go +++ b/backend/runner/pubsub/testdata/go/subscriber/subscriber.go @@ -6,6 +6,7 @@ import ( "strings" "time" + "ftl/builtin" "ftl/publisher" "github.com/block/ftl/common/reflection" @@ -27,12 +28,19 @@ func ConsumeFromLatest(ctx context.Context, req publisher.PubSubEvent) error { } //ftl:verb -//ftl:subscribe publisher.topic2 from=beginning +//ftl:subscribe publisher.topic2 from=beginning deadletter //ftl:retry 2 1s 1s func ConsumeButFailAndRetry(ctx context.Context, req publisher.PubSubEvent) error { return fmt.Errorf("always error: event %v", req.Time) } +//ftl:verb +//ftl:subscribe consumeButFailAndRetryFailed from=beginning +func ConsumeFromDeadLetter(ctx context.Context, req builtin.FailedEvent[publisher.PubSubEvent]) error { + ftl.LoggerFromContext(ctx).Infof("ConsumeFromDeadLetter: %v", req.Event.Time) + return nil +} + //ftl:verb func PublishToExternalModule(ctx context.Context) error { // Get around compile-time checks diff --git a/backend/runner/pubsub/testdata/go/subscriber/types.ftl.go b/backend/runner/pubsub/testdata/go/subscriber/types.ftl.go index 708227d14..cbeba7020 100644 --- a/backend/runner/pubsub/testdata/go/subscriber/types.ftl.go +++ b/backend/runner/pubsub/testdata/go/subscriber/types.ftl.go @@ -3,6 +3,7 @@ package subscriber import ( "context" + ftlbuiltin "ftl/builtin" ftlpublisher "ftl/publisher" "github.com/block/ftl/common/reflection" ) @@ -11,6 +12,8 @@ type ConsumeClient func(context.Context, ftlpublisher.PubSubEvent) error type ConsumeButFailAndRetryClient func(context.Context, ftlpublisher.PubSubEvent) error +type ConsumeFromDeadLetterClient func(context.Context, ftlbuiltin.FailedEvent[ftlpublisher.PubSubEvent]) error + type ConsumeFromLatestClient func(context.Context, ftlpublisher.PubSubEvent) error type ConsumeSlowClient func(context.Context, ftlpublisher.PubSubEvent) error @@ -25,6 +28,9 @@ func init() { reflection.ProvideResourcesForVerb( ConsumeButFailAndRetry, ), + reflection.ProvideResourcesForVerb( + ConsumeFromDeadLetter, + ), reflection.ProvideResourcesForVerb( ConsumeFromLatest, ), diff --git a/backend/runner/pubsub/testdata/java/subscriber/src/main/java/xyz/block/ftl/java/test/subscriber/Subscriber.java b/backend/runner/pubsub/testdata/java/subscriber/src/main/java/xyz/block/ftl/java/test/subscriber/Subscriber.java index baa2c89ee..351ecbc79 100644 --- a/backend/runner/pubsub/testdata/java/subscriber/src/main/java/xyz/block/ftl/java/test/subscriber/Subscriber.java +++ b/backend/runner/pubsub/testdata/java/subscriber/src/main/java/xyz/block/ftl/java/test/subscriber/Subscriber.java @@ -1,5 +1,6 @@ package xyz.block.ftl.java.test.subscriber; +import ftl.builtin.FailedEvent; import ftl.publisher.PubSubEvent; import ftl.publisher.TestTopicTopic; import ftl.publisher.Topic2Topic; @@ -7,6 +8,15 @@ import xyz.block.ftl.FromOffset; import xyz.block.ftl.Retry; import xyz.block.ftl.Subscription; +import xyz.block.ftl.Topic; +import xyz.block.ftl.TopicPartitionMapper; +import xyz.block.ftl.WriteableTopic; + +class PartitionMapper implements TopicPartitionMapper> { + public String getPartitionKey(FailedEvent event) { + return event.getEvent().getTime().toString(); + } +} public class Subscriber { @@ -20,9 +30,20 @@ void consumeFromLatest(PubSubEvent event) throws Exception { Log.infof("consumeFromLatest: %s", event.getTime()); } - @Subscription(topic = Topic2Topic.class, from = FromOffset.BEGINNING) + // Java requires the topic to be explicitly defined as an interface for consuming to work + @Topic("consumeButFailAndRetryFailed") + interface ConsumeButFailAndRetryFailedTopic extends WriteableTopic, PartitionMapper> { + + } + + @Subscription(topic = Topic2Topic.class, from = FromOffset.BEGINNING, deadLetter = true) @Retry(count = 2, minBackoff = "1s", maxBackoff = "1s") public void consumeButFailAndRetry(PubSubEvent event) { throw new RuntimeException("always error: event " + event.getTime()); } + + @Subscription(topic = ConsumeButFailAndRetryFailedTopic.class, from = FromOffset.BEGINNING) + public void consumeFromDeadLetter(FailedEvent event) { + Log.infof("consumeFromDeadLetter: %s", event.getEvent().getTime()); + } } diff --git a/common/schema/builtin.go b/common/schema/builtin.go index 41bb2fdb0..5e24aff95 100644 --- a/common/schema/builtin.go +++ b/common/schema/builtin.go @@ -40,6 +40,12 @@ builtin module builtin { requestType String error String } + + // FailedEvent is used in dead letter topics. + export data FailedEvent { + event Event + error String + } } ` diff --git a/common/schema/metadatasubscriber.go b/common/schema/metadatasubscriber.go index 3e43ed1f9..ac2ab6d62 100644 --- a/common/schema/metadatasubscriber.go +++ b/common/schema/metadatasubscriber.go @@ -66,3 +66,7 @@ func (m *MetadataSubscriber) String() string { } return strings.Join(components, " ") } + +func DeadLetterNameForSubscriber(verb string) string { + return verb + "Failed" +} diff --git a/common/schema/schema_test.go b/common/schema/schema_test.go index 8c2e921b3..d23a6c1e9 100644 --- a/common/schema/schema_test.go +++ b/common/schema/schema_test.go @@ -459,6 +459,32 @@ func TestParsing(t *testing.T) { Modules: []*Module{{ Name: "test", Decls: []Decl{ + &Topic{ + Name: "consumesB1Failed", + Event: &Ref{ + Module: "builtin", + Name: "FailedEvent", + TypeParameters: []Type{ + &Ref{ + Module: "test", + Name: "eventB", + }, + }, + }, + }, + &Topic{ + Name: "consumesBothASubsFailed", + Event: &Ref{ + Module: "builtin", + Name: "FailedEvent", + TypeParameters: []Type{ + &Ref{ + Module: "test", + Name: "eventA", + }, + }, + }, + }, &Topic{ Export: true, Name: "topicA", @@ -580,6 +606,14 @@ func TestParsing(t *testing.T) { Name: "catchesB", }, }, + &MetadataPublisher{ + Topics: []*Ref{ + { + Module: "test", + Name: "consumesB1Failed", + }, + }, + }, }, }, &Verb{ @@ -607,6 +641,14 @@ func TestParsing(t *testing.T) { Name: "catchesA", }, }, + &MetadataPublisher{ + Topics: []*Ref{ + { + Module: "test", + Name: "consumesBothASubsFailed", + }, + }, + }, }, }, }}, diff --git a/common/schema/validate.go b/common/schema/validate.go index 78594bb51..dca4d6d32 100644 --- a/common/schema/validate.go +++ b/common/schema/validate.go @@ -263,6 +263,9 @@ func ValidateModule(module *Module) error { merr = append(merr, err) } scopes = scopes.Push() + + generateDeadLetterTopics(module) + // Key is : duplicateDecls := map[string]Decl{} @@ -925,9 +928,90 @@ func validateVerbSubscriptions(module *Module, v *Verb, md *MetadataSubscriber, if _, ok := v.Response.(*Unit); !ok { merr = append(merr, errorf(md, "verb %s: must be a sink to subscribe but found response type %v", v.Name, v.Response)) } + if md.DeadLetter { + if err := validateDeadLetterTopic(module, v, md); err != nil { + merr = append(merr, err) + } + } return merr } +func validateDeadLetterTopic(module *Module, v *Verb, md *MetadataSubscriber) error { + ref := &Ref{ + Module: module.Name, + Name: DeadLetterNameForSubscriber(v.Name), + } + deadLetterDecl, exists := islices.Find(module.Decls, func(d Decl) bool { + return d.GetName() == ref.Name + }) + if !exists { + return errorf(md, "could not find dead letter topic %q", DeadLetterNameForSubscriber(v.Name)) + } + deadLetterTopic, ok := deadLetterDecl.(*Topic) + if !ok { + return errorf(md, "expected %v to be a dead letter topic but was already declared at %v", ref, deadLetterDecl.Position()) + } + eventType := &Ref{ + Module: "builtin", + Name: "FailedEvent", + TypeParameters: []Type{v.Request}, + } + if deadLetterTopic.Event.String() != eventType.String() { + return errorf(md, "dead letter topic %v must have the same event type (%v) as the subscription request type (%v)", ref, deadLetterTopic.Event, eventType) + } + // declare that this verb publishes to the dead letter topic + publishMetadata, ok := islices.FindVariant[*MetadataPublisher](v.Metadata) + if !ok { + publishMetadata = &MetadataPublisher{} + v.Metadata = append(v.Metadata, publishMetadata) + } + if _, ok := islices.Find(publishMetadata.Topics, func(r *Ref) bool { + return r.Name == ref.Name + }); !ok { + publishMetadata.Topics = append(publishMetadata.Topics, ref) + } + return nil +} + +func generateDeadLetterTopics(module *Module) { + for _, decl := range module.Decls { + verb, ok := decl.(*Verb) + if !ok { + continue + } + for _, md := range verb.Metadata { + sub, ok := md.(*MetadataSubscriber) + if !ok { + continue + } + if !sub.DeadLetter { + continue + } + ref := Ref{ + Module: module.Name, + Name: DeadLetterNameForSubscriber(verb.Name), + } + _, exists := islices.Find(module.Decls, func(d Decl) bool { + return d.GetName() == ref.Name + }) + if exists { + // we validate the existing decl later + continue + } + // create a dead letter topic + deadLetterTopic := &Topic{ + Name: ref.Name, + Event: &Ref{ + Module: "builtin", + Name: "FailedEvent", + TypeParameters: []Type{verb.Request}, + }, + } + module.Decls = append(module.Decls, deadLetterTopic) + } + } +} + func validateRetries(module *Module, retry *MetadataRetry, requestType optional.Option[Type], scopes Scopes, schema optional.Option[*Schema]) (merr []error) { // Validate count if retry.Count != nil && *retry.Count < 0 { @@ -937,7 +1021,7 @@ func validateRetries(module *Module, retry *MetadataRetry, requestType optional. // Validate parsing of durations retryParams, err := retry.RetryParams() if err != nil { - merr = append(merr, errorf(retry, err.Error())) + merr = append(merr, errorf(retry, "%s", err.Error())) return } if retryParams.MaxBackoff < retryParams.MinBackoff { diff --git a/common/schema/validate_test.go b/common/schema/validate_test.go index cb894c366..acde70b8e 100644 --- a/common/schema/validate_test.go +++ b/common/schema/validate_test.go @@ -373,18 +373,31 @@ func TestValidate(t *testing.T) { verb wrongEventType(test.eventA) Unit +subscribe test.topicB from=beginning - verb SourceCantSubscribe(Unit) test.eventB + verb sourceCantSubscribe(Unit) test.eventB +subscribe test.topicB from=latest - verb EmptyCantSubscribe(Unit) Unit + verb emptyCantSubscribe(Unit) Unit +subscribe test.topicB from=beginning + + verb subWithDeadLetter(test.eventA) Unit + +subscribe test.topicA from=beginning deadletter + + topic subWithExistingDeadLetterFailed builtin.FailedEvent + verb subWithExistingDeadLetter(test.eventA) Unit + +subscribe test.topicA from=beginning deadletter + + data subWithClashingDeadLetterFailed {} + verb subWithClashingDeadLetter(test.eventA) Unit + +subscribe topicA from=beginning deadletter } `, errs: []string{ "16:6: verb wrongEventType: request type test.eventA differs from subscription's event type test.eventB", - "19:6: verb SourceCantSubscribe: must be a sink to subscribe but found response type test.eventB", - "19:6: verb SourceCantSubscribe: request type Unit differs from subscription's event type test.eventB", - "22:6: verb EmptyCantSubscribe: request type Unit differs from subscription's event type test.eventB", + "19:6: verb sourceCantSubscribe: must be a sink to subscribe but found response type test.eventB", + "19:6: verb sourceCantSubscribe: request type Unit differs from subscription's event type test.eventB", + "22:6: verb emptyCantSubscribe: request type Unit differs from subscription's event type test.eventB", + "29:6: dead letter topic test.subWithExistingDeadLetterFailed must have the same event type (builtin.FailedEvent) as the subscription request type (builtin.FailedEvent)", + "33:6: expected test.subWithClashingDeadLetterFailed to be a dead letter topic but was already declared at 31:5", `7:5: invalid name: must consist of only letters, numbers and underscores, and start with a lowercase letter.`, }, }, diff --git a/internal/log/logger.go b/internal/log/logger.go index 18429338b..1bae8896a 100644 --- a/internal/log/logger.go +++ b/internal/log/logger.go @@ -48,16 +48,6 @@ func (l Logger) Scope(scope string) *Logger { return l.Attrs(map[string]string{scopeKey: scope}) } -func (l Logger) AppendScope(scope string) *Logger { - s, ok := l.attributes[scopeKey] - if ok { - s = s + ":" + scope - } else { - s = scope - } - return l.Attrs(map[string]string{scopeKey: s}) -} - func (l Logger) Module(module string) *Logger { return l.Attrs(map[string]string{moduleKey: module}) } diff --git a/internal/lsp/hoveritems.go b/internal/lsp/hoveritems.go index a7624fad7..6d8dacfe3 100644 --- a/internal/lsp/hoveritems.go +++ b/internal/lsp/hoveritems.go @@ -6,7 +6,7 @@ var hoverMap = map[string]string{ "//ftl:enum": "## Type enums (sum types)\n\n[Sum types](https://en.wikipedia.org/wiki/Tagged_union) are supported by FTL's type system, but aren't directly supported by Go. However they can be approximated with the use of [sealed interfaces](https://blog.chewxy.com/2018/03/18/golang-interfaces/). To declare a sum type in FTL use the comment directive `//ftl:enum`:\n\n```go\n//ftl:enum\ntype Animal interface { animal() }\n\ntype Cat struct {}\nfunc (Cat) animal() {}\n\ntype Dog struct {}\nfunc (Dog) animal() {}\n```\n## Value enums\n\nA value enum is an enumerated set of string or integer values.\n\n```go\n//ftl:enum\ntype Colour string\n\nconst (\n Red Colour = \"red\"\n Green Colour = \"green\"\n Blue Colour = \"blue\"\n)\n```\n", "//ftl:ingress": "## HTTP Ingress\n\nVerbs annotated with `ftl:ingress` will be exposed via HTTP (`http` is the default ingress type). These endpoints will then be available on one of our default `ingress` ports (local development defaults to `http://localhost:8891`).\n\nThe following will be available at `http://localhost:8891/http/users/123/posts?postId=456`.\n\n\n```go\ntype GetRequestPathParams struct {\n\tUserID string `json:\"userId\"`\n}\n\ntype GetRequestQueryParams struct {\n\tPostID string `json:\"postId\"`\n}\n\ntype GetResponse struct {\n\tMessage string `json:\"msg\"`\n}\n\n//ftl:ingress GET /http/users/{userId}/posts\nfunc Get(ctx context.Context, req builtin.HttpRequest[ftl.Unit, GetRequestPathParams, GetRequestQueryParams]) (builtin.HttpResponse[GetResponse, ErrorResponse], error) {\n // ...\n}\n```\n\nBecause the example above only has a single path parameter it can be simplified by just using a scalar such as `string` or `int64` as the path parameter type:\n\n```go\n\n//ftl:ingress GET /http/users/{userId}/posts\nfunc Get(ctx context.Context, req builtin.HttpRequest[ftl.Unit, int64, GetRequestQueryParams]) (builtin.HttpResponse[GetResponse, ErrorResponse], error) {\n // ...\n}\n```\n\n> **NOTE!**\n> The `req` and `resp` types of HTTP `ingress` [verbs](../verbs) must be `builtin.HttpRequest` and `builtin.HttpResponse` respectively. These types provide the necessary fields for HTTP `ingress` (`headers`, `statusCode`, etc.)\n>\n> You will need to import `ftl/builtin`.\n\nKey points:\n\n- `ingress` verbs will be automatically exported by default.\n\n## Field mapping\n\nThe `HttpRequest` request object takes 3 type parameters, the body, the path parameters and the query parameters.\n\nGiven the following request verb:\n\n```go\n\ntype PostBody struct{\n\tTitle string `json:\"title\"`\n\tContent string `json:\"content\"`\n\tTag ftl.Option[string] `json:\"tag\"`\n}\ntype PostPathParams struct {\n\tUserID string `json:\"userId\"`\n\tPostID string `json:\"postId\"`\n}\n\ntype PostQueryParams struct {\n\tPublish boolean `json:\"publish\"`\n}\n\n//ftl:ingress http PUT /users/{userId}/posts/{postId}\nfunc Get(ctx context.Context, req builtin.HttpRequest[PostBody, PostPathParams, PostQueryParams]) (builtin.HttpResponse[GetResponse, string], error) {\n\treturn builtin.HttpResponse[GetResponse, string]{\n\t\tHeaders: map[string][]string{\"Get\": {\"Header from FTL\"}},\n\t\tBody: ftl.Some(GetResponse{\n\t\t\tMessage: fmt.Sprintf(\"UserID: %s, PostID: %s, Tag: %s\", req.pathParameters.UserID, req.pathParameters.PostID, req.Body.Tag.Default(\"none\")),\n\t\t}),\n\t}, nil\n}\n```\n\nThe rules for how each element is mapped are slightly different, as they have a different structure:\n\n- The body is mapped directly to the body of the request, generally as a JSON object. Scalars are also supported, as well as []byte to get the raw body. If they type is `any` then it will be assumed to be JSON and mapped to the appropriate types based on the JSON structure.\n- The path parameters can be mapped directly to an object with field names corresponding to the name of the path parameter. If there is only a single path parameter it can be injected directly as a scalar. They can also be injected as a `map[string]string`.\n- The path parameters can also be mapped directly to an object with field names corresponding to the name of the path parameter. They can also be injected directly as a `map[string]string`, or `map[string][]string` for multiple values.\n\n#### Optional fields\n\nOptional fields are represented by the `ftl.Option` type. The `Option` type is a wrapper around the actual type and can be `Some` or `None`. In the example above, the `Tag` field is optional.\n\n```sh\ncurl -i http://localhost:8891/users/123/posts/456\n```\n\nBecause the `tag` query parameter is not provided, the response will be:\n\n```json\n{\n \"msg\": \"UserID: 123, PostID: 456, Tag: none\"\n}\n```\n\n#### Casing\n\nField names use lowerCamelCase by default. You can override this by using the `json` tag.\n\n## SumTypes\n\nGiven the following request verb:\n\n```go\n//ftl:enum export\ntype SumType interface {\n\ttag()\n}\n\ntype A string\n\nfunc (A) tag() {}\n\ntype B []string\n\nfunc (B) tag() {}\n\n//ftl:ingress http POST /typeenum\nfunc TypeEnum(ctx context.Context, req builtin.HttpRequest[SumType, ftl.Unit, ftl.Unit]) (builtin.HttpResponse[SumType, string], error) {\n\treturn builtin.HttpResponse[SumType, string]{Body: ftl.Some(req.Body)}, nil\n}\n```\n\nThe following curl request will map the `SumType` name and value to the `req.Body`:\n\n```sh\ncurl -X POST \"http://localhost:8891/typeenum\" \\\n -H \"Content-Type: application/json\" \\\n --data '{\"name\": \"A\", \"value\": \"sample\"}'\n```\n\nThe response will be:\n\n```json\n{\n \"name\": \"A\",\n \"value\": \"sample\"\n}\n```\n\n## Encoding query params as JSON\n\nComplex query params can also be encoded as JSON using the `@json` query parameter. For example:\n\n> `{\"tag\":\"ftl\"}` url-encoded is `%7B%22tag%22%3A%22ftl%22%7D`\n\n```bash\ncurl -i http://localhost:8891/users/123/posts/456?@json=%7B%22tag%22%3A%22ftl%22%7D\n```\n\n\n\n", "//ftl:retry": "## Retries\n\nSome FTL features allow specifying a retry policy via a Go comment directive. Retries back off exponentially until the maximum is reached.\n\nThe directive has the following syntax:\n\n\n```go\n//ftl:retry [] [] [catch ]\n```\n\n\nFor example, the following function will retry up to 10 times, with a delay of 5s, 10s, 20s, 40s, 60s, 60s, etc.\n\n\n```go\n//ftl:retry 10 5s 1m\nfunc Process(ctx context.Context, in Invoice) error {\n // ...\n}\n```\n\n### PubSub\n\nSubscribers can have a retry policy. For example:\n\n\n```go\n//ftl:retry 5 1s catch recoverPaymentProcessing\nfunc ProcessPayment(ctx context.Context, payment Payment) error {\n...\n}\n```\n\n\n## Catching\nAfter all retries have failed, a catch verb can be used to safely recover.\n\nThese catch verbs have a request type of `builtin.CatchRequest` and no response type. If a catch verb returns an error, it will be retried until it succeeds so it is important to handle errors carefully.\n\n\n\n```go\n//ftl:retry 5 1s catch recoverPaymentProcessing\nfunc ProcessPayment(ctx context.Context, payment Payment) error {\n...\n}\n\n//ftl:verb\nfunc RecoverPaymentProcessing(ctx context.Context, request builtin.CatchRequest[Payment]) error {\n// safely handle final failure of the payment\n}\n```\n", - "//ftl:subscribe": "## PubSub\n\nFTL has first-class support for PubSub, modelled on the concepts of topics (where events are sent) and subscribers (a verb which consumes events). Subscribers are, as you would expect, sinks. Each subscriber is a cursor over the topic it is associated with. Each topic may have multiple subscriptions. Each published event has an at least once delivery guarantee for each subscription.\n\n\nFirst, declare a new topic:\n\n```go\npackage payments\n\ntype Invoices = ftl.TopicHandle[Invoice, ftl.SinglePartitionMap[Invoice]]\n```\n\nNote that the name of the topic as represented in the FTL schema is the lower camel case version of the type name.\n\nThe `Invoices` type is a handle to the topic. It is a generic type that takes two arguments: the event type and the partition map type. The partition map type is used to map events to partitions. In this case, we are using a single partition map, which means that all events are sent to the same partition.\n\nThen define a Sink to consume from the topic:\n\n```go\n//ftl:subscribe payments.invoices from=beginning\nfunc SendInvoiceEmail(ctx context.Context, in Invoice) error {\n // ...\n}\n```\n\nEvents can be published to a topic by injecting the topic type into a verb:\n\n```go\nfunc PublishInvoice(ctx context.Context, topic Invoices) error {\n topic.Publish(ctx, Invoice{...})\n // ...\n}\n```\n\n> **NOTE!**\n> PubSub topics cannot be published to from outside the module that declared them, they can only be subscribed to. That is, if a topic is declared in module `A`, module `B` cannot publish to it.\n", + "//ftl:subscribe": "## PubSub\n\nFTL has first-class support for PubSub, modelled on the concepts of topics (where events are sent) and subscribers (a verb which consumes events). Subscribers are, as you would expect, sinks. Each subscriber is a cursor over the topic it is associated with. Each topic may have multiple subscriptions. Each published event has an at least once delivery guarantee for each subscription.\n\n\nFirst, declare a new topic:\n\n```go\npackage payments\n\nimport (\n \"github.com/block/ftl/go-runtime/ftl\"\n)\ntype Invoice struct {\n InvoiceNo string\n}\n\n//ftl:export\ntype Invoices = ftl.TopicHandle[Invoice, ftl.SinglePartitionMap[Invoice]]\n```\n\nNote that the name of the topic as represented in the FTL schema is the lower camel case version of the type name.\n\nThe `Invoices` type is a handle to the topic. It is a generic type that takes two arguments: the event type and the partition map type. The partition map type is used to map events to partitions. In this case, we are using a single partition map, which means that all events are sent to the same partition.\n\nThen define a Sink to consume from the topic:\n\n```go\n//ftl:subscribe payments.invoices from=beginning\nfunc SendInvoiceEmail(ctx context.Context, in Invoice) error {\n // ...\n}\n```\n\nEvents can be published to a topic by injecting the topic type into a verb:\n\n```go\nfunc PublishInvoice(ctx context.Context, topic Invoices) error {\n topic.Publish(ctx, Invoice{...})\n // ...\n}\n```\n\n> **NOTE!**\n> PubSub topics cannot be published to from outside the module that declared them, they can only be subscribed to. That is, if a topic is declared in module `A`, module `B` cannot publish to it.\n", "//ftl:typealias": "## Type aliases\n\nA type alias is an alternate name for an existing type. It can be declared like so:\n\n```go\n//ftl:typealias\ntype Alias Target\n```\nor\n```go\n//ftl:typealias\ntype Alias = Target\n```\n\neg.\n\n```go\n//ftl:typealias\ntype UserID string\n\n//ftl:typealias\ntype UserToken = string\n```\n", "//ftl:verb": "## Verbs\n\n## Defining Verbs\n\n\nTo declare a Verb, write a normal Go function with the following signature, annotated with the Go [comment directive](https://tip.golang.org/doc/comment#syntax) `//ftl:verb`:\n\n```go\n//ftl:verb\nfunc F(context.Context, In) (Out, error) { }\n```\n\neg.\n\n```go\ntype EchoRequest struct {}\n\ntype EchoResponse struct {}\n\n//ftl:verb\nfunc Echo(ctx context.Context, in EchoRequest) (EchoResponse, error) {\n // ...\n}\n```\n\n\nBy default verbs are only [visible](../visibility) to other verbs in the same module.\n\n## Calling Verbs\n\n\nTo call a verb, import the module's verb client (`{ModuleName}.{VerbName}Client`), add it to your verb's signature, then invoke it as a function. eg.\n\n```go\n//ftl:verb\nfunc Echo(ctx context.Context, in EchoRequest, tc time.TimeClient) (EchoResponse, error) {\n\tout, err := tc(ctx, TimeRequest{...})\n}\n```\n\nVerb clients are generated by FTL. If the callee verb belongs to the same module as the caller, you must build the \nmodule first (with callee verb defined) in order to generate its client for use by the caller. Local verb clients are \navailable in the generated `types.ftl.go` file as `{VerbName}Client`.\n\n", }