Skip to content

Commit

Permalink
feat: pubsub follows retry policies (#3729)
Browse files Browse the repository at this point in the history
closes #3491
  • Loading branch information
matt2e authored Dec 12, 2024
1 parent 855f219 commit 7eeef45
Show file tree
Hide file tree
Showing 11 changed files with 179 additions and 266 deletions.
126 changes: 83 additions & 43 deletions backend/runner/pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ import (
)

type consumer struct {
moduleName string
deployment model.DeploymentKey
verb *schema.Verb
subscriber *schema.MetadataSubscriber
moduleName string
deployment model.DeploymentKey
verb *schema.Verb
subscriber *schema.MetadataSubscriber
retryParams schema.RetryParams

verbClient VerbClient
timelineClient *timeline.Client
Expand All @@ -48,6 +49,16 @@ func newConsumer(moduleName string, verb *schema.Verb, subscriber *schema.Metada
verbClient: verbClient,
timelineClient: timelineClient,
}
retryMetada, ok := slices.FindVariant[*schema.MetadataRetry](verb.Metadata)
if ok {
retryParams, err := retryMetada.RetryParams()
if err != nil {
return nil, fmt.Errorf("failed to parse retry params for subscription %s: %w", verb.Name, err)
}
c.retryParams = retryParams
} else {
c.retryParams = schema.RetryParams{}
}

return c, nil
}
Expand Down Expand Up @@ -139,48 +150,77 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
ctx := session.Context()
logger := log.FromContext(ctx)
for msg := range claim.Messages() {
start := time.Now()

requestKey := model.NewRequestKey(model.OriginPubsub, schema.RefKey{Module: c.moduleName, Name: c.verb.Name}.String())
logger.Debugf("%s: consuming message (%v:%v): %v", c.verb.Name, msg.Partition, msg.Offset, string(msg.Value))
destRef := &schema.Ref{
Module: c.moduleName,
Name: c.verb.Name,
}
req := &ftlv1.CallRequest{
Verb: schema.RefKey{Module: c.moduleName, Name: c.verb.Name}.ToProto(),
Body: msg.Value,
}
consumeEvent := timeline.PubSubConsume{
DeploymentKey: c.deployment,
RequestKey: optional.Some(requestKey.String()),
Time: time.Now(),
DestVerb: optional.Some(destRef.ToRefKey()),
Topic: c.subscriber.Topic.String(),
Partition: int(msg.Partition),
Offset: int(msg.Offset),
}
callEvent := &timeline.Call{
DeploymentKey: c.deployment,
RequestKey: requestKey,
StartTime: start,
DestVerb: destRef,
Callers: []*schema.Ref{},
Request: req,
logger.Debugf("%s: consuming message %v:%v", c.verb.Name, msg.Partition, msg.Offset)
remainingRetries := c.retryParams.Count
backoff := c.retryParams.MinBackoff
for {
err := c.call(ctx, msg.Value, int(msg.Partition), int(msg.Offset))
if err == nil {
break
}
if remainingRetries == 0 {
logger.Errorf(err, "%s: failed to consume message %v:%v", c.verb.Name, msg.Partition, msg.Offset)
break
}
logger.Errorf(err, "%s: failed to consume message %v:%v: retrying in %vs", c.verb.Name, msg.Partition, msg.Offset, int(backoff.Seconds()))
time.Sleep(backoff)
remainingRetries--
backoff *= 2
if backoff > c.retryParams.MaxBackoff {
backoff = c.retryParams.MaxBackoff
}
}
session.MarkMessage(msg, "")
}
return nil
}

resp, err := c.verbClient.Call(session.Context(), connect.NewRequest(req))
if err != nil {
consumeEvent.Error = optional.Some(err.Error())
callEvent.Response = result.Err[*ftlv1.CallResponse](err)
observability.Calls.Request(ctx, req.Verb, start, optional.Some("verb call failed"))
} else {
callEvent.Response = result.Ok(resp.Msg)
observability.Calls.Request(ctx, req.Verb, start, optional.None[string]())
// call invokes the subscriber verb once with the given message body and
// records the attempt on the timeline.
//
// A PubSubConsume event and a Call event are published for every attempt,
// whether it succeeds or fails. partition and offset identify the consumed
// message and are only used to annotate the PubSubConsume event.
//
// It returns a non-nil error when the RPC itself fails or when the verb
// responds with a CallResponse_Error_, so that the caller can decide whether
// to retry.
func (c *consumer) call(ctx context.Context, body []byte, partition, offset int) error {
	start := time.Now()

	verbKey := schema.RefKey{Module: c.moduleName, Name: c.verb.Name}
	requestKey := model.NewRequestKey(model.OriginPubsub, verbKey.String())
	destRef := &schema.Ref{
		Module: c.moduleName,
		Name:   c.verb.Name,
	}
	req := &ftlv1.CallRequest{
		Verb: verbKey.ToProto(),
		Body: body,
	}
	consumeEvent := timeline.PubSubConsume{
		DeploymentKey: c.deployment,
		RequestKey:    optional.Some(requestKey.String()),
		Time:          time.Now(),
		DestVerb:      optional.Some(destRef.ToRefKey()),
		Topic:         c.subscriber.Topic.String(),
		Partition:     partition,
		Offset:        offset,
	}
	// consumeEvent is a struct value and defer evaluates call arguments
	// immediately, so `defer Publish(ctx, consumeEvent)` would publish a copy
	// taken before Error is set below. Deferring a closure reads the final value.
	defer func() { c.timelineClient.Publish(ctx, consumeEvent) }()

	// callEvent is a pointer, so mutations made after the defer statement are
	// visible when the deferred Publish runs.
	callEvent := &timeline.Call{
		DeploymentKey: c.deployment,
		RequestKey:    requestKey,
		StartTime:     start,
		DestVerb:      destRef,
		Callers:       []*schema.Ref{},
		Request:       req,
	}
	defer c.timelineClient.Publish(ctx, callEvent)

	resp, callErr := c.verbClient.Call(ctx, connect.NewRequest(req))
	if callErr == nil {
		// A successful RPC can still carry an application-level error from the verb.
		if errResp, ok := resp.Msg.Response.(*ftlv1.CallResponse_Error_); ok {
			callErr = fmt.Errorf("verb call failed: %s", errResp.Error.Message)
		}
	}
	if callErr != nil {
		consumeEvent.Error = optional.Some(callErr.Error())
		callEvent.Response = result.Err[*ftlv1.CallResponse](callErr)
		observability.Calls.Request(ctx, req.Verb, start, optional.Some("verb call failed"))
		return callErr
	}
	callEvent.Response = result.Ok(resp.Msg)
	observability.Calls.Request(ctx, req.Verb, start, optional.None[string]())
	return nil
}
146 changes: 52 additions & 94 deletions backend/runner/pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package pubsub

import (
"strings"
"testing"
"time"

Expand All @@ -12,6 +13,7 @@ import (
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"
)

func TestPubSub(t *testing.T) {
Expand All @@ -31,12 +33,52 @@ func TestPubSub(t *testing.T) {
in.Sleep(time.Second*4),

// check that there are the right amount of successful async calls
checkSuccessfullyConsumed("subscriber", "consume", events),
checkConsumed("subscriber", "consume", true, events, optional.None[string]()),
)
}

func checkSuccessfullyConsumed(module, verb string, count int) in.Action {
// TestRetry publishes two events to topic 2, each tagged with a distinct
// haystack value, and then checks that subscriber.consumeButFailAndRetry was
// called unsuccessfully the expected number of times for each event.
//
// NOTE(review): retriesPerCall presumably mirrors the retry count declared on
// the subscriber verb in the test modules; confirm against the testdata.
func TestRetry(t *testing.T) {
	const retriesPerCall = 2
	// One initial attempt plus retriesPerCall retries for every published event.
	attemptsPerEvent := retriesPerCall + 1
	ignoreResponse := func(t testing.TB, resp in.Obj) {}

	in.Run(t,
		in.WithLanguages("java", "go"),
		in.CopyModule("publisher"),
		in.CopyModule("subscriber"),
		in.Deploy("publisher"),
		in.Deploy("subscriber"),

		// publish events
		in.Call("publisher", "publishOneToTopic2", map[string]any{"haystack": "firstCall"}, ignoreResponse),
		in.Call("publisher", "publishOneToTopic2", map[string]any{"haystack": "secondCall"}, ignoreResponse),

		// give the consumer time to work through its attempts
		in.Sleep(time.Second*7),

		checkConsumed("subscriber", "consumeButFailAndRetry", false, attemptsPerEvent, optional.Some("firstCall")),
		checkConsumed("subscriber", "consumeButFailAndRetry", false, attemptsPerEvent, optional.Some("secondCall")),
	)
}

// TestExternalPublishRuntimeCheck deploys the publisher and subscriber modules
// and expects the call to subscriber.publishToExternalModule to fail with an
// error mentioning that publishing to another module's topic is not allowed.
func TestExternalPublishRuntimeCheck(t *testing.T) {
	const wantErr = "can not publish to another module's topic"
	ignoreResponse := func(t testing.TB, resp in.Obj) {}

	// No java as there is no API for this
	in.Run(t,
		in.CopyModule("publisher"),
		in.CopyModule("subscriber"),
		in.Deploy("publisher"),
		in.Deploy("subscriber"),

		in.ExpectError(
			in.Call("subscriber", "publishToExternalModule", in.Obj{}, ignoreResponse),
			wantErr,
		),
	)
}

func checkConsumed(module, verb string, success bool, count int, needle optional.Option[string]) in.Action {
return func(t testing.TB, ic in.TestContext) {
if needle, ok := needle.Get(); ok {
in.Infof("Checking for %v call(s) with needle %v", count, needle)
} else {
in.Infof("Checking for %v call(s)", count)
}
resp, err := ic.Timeline.GetTimeline(ic.Context, connect.NewRequest(&timelinepb.GetTimelineRequest{
Limit: 100000,
Filters: []*timelinepb.GetTimelineRequest_Filter{
Expand Down Expand Up @@ -70,6 +112,9 @@ func checkSuccessfullyConsumed(module, verb string, count int) in.Action {
requestKey, err := model.ParseRequestKey(*c.RequestKey)
assert.NoError(t, err)
assert.Equal(t, requestKey.Payload.Origin, model.OriginPubsub, "expected pubsub origin")
if needle, ok := needle.Get(); ok && !strings.Contains(c.Request, needle) {
return false
}
return true
})
successfulCalls := slices.Filter(calls, func(call *timelinepb.CallEvent) bool {
Expand All @@ -78,97 +123,10 @@ func checkSuccessfullyConsumed(module, verb string, count int) in.Action {
unsuccessfulCalls := slices.Filter(calls, func(call *timelinepb.CallEvent) bool {
return call.Error != nil
})
assert.Equal(t, count, len(successfulCalls), "expected %v successful calls, the following calls failed:\n%v", count, unsuccessfulCalls)
assert.NoError(t, err)
if success {
assert.Equal(t, count, len(successfulCalls), "expected %v successful calls (failed calls: %v)", count, len(unsuccessfulCalls))
} else {
assert.Equal(t, count, len(unsuccessfulCalls), "expected %v unsuccessful calls (successful calls: %v)", count, len(successfulCalls))
}
}
}

// TestRetry is skipped unconditionally with the message "Not implemented for
// new pubsub yet". The commented-out body below is kept for reference; it
// counted rows in the async_calls table to check retry and catch behaviour.
func TestRetry(t *testing.T) {
	t.Skip("Not implemented for new pubsub yet")
	// retriesPerCall := 2
	// in.Run(t,
	// in.WithLanguages("java", "go"),
	// in.CopyModule("publisher"),
	// in.CopyModule("subscriber"),
	// in.Deploy("publisher"),
	// in.Deploy("subscriber"),

	// // publish events
	// in.Call("publisher", "publishOneToTopic2", in.Obj{}, func(t testing.TB, resp in.Obj) {}),

	// in.Sleep(time.Second*6),

	// // check that there are the right amount of failed async calls to the verb
	// in.QueryRow("ftl",
	// fmt.Sprintf(`
	// SELECT COUNT(*)
	// FROM async_calls
	// WHERE
	// state = 'error'
	// AND verb = 'subscriber.consumeButFailAndRetry'
	// AND catching = false
	// AND origin = '%s'
	// `, async.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "consumeButFailAndRetry"}}.String()),
	// 1+retriesPerCall),

	// // check that there is one failed attempt to catch (we purposely fail the first one)
	// in.QueryRow("ftl",
	// fmt.Sprintf(`
	// SELECT COUNT(*)
	// FROM async_calls
	// WHERE
	// state = 'error'
	// AND verb = 'subscriber.consumeButFailAndRetry'
	// AND error LIKE '%%subscriber.catch %%'
	// AND error LIKE '%%catching error%%'
	// AND catching = true
	// AND origin = '%s'
	// `, async.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "consumeButFailAndRetry"}}.String()),
	// 1),

	// // check that there is one successful attempt to catch (we succeed the second one as long as we receive the correct error in the request)
	// in.QueryRow("ftl",
	// fmt.Sprintf(`
	// SELECT COUNT(*)
	// FROM async_calls
	// WHERE
	// state = 'success'
	// AND verb = 'subscriber.consumeButFailAndRetry'
	// AND error IS NULL
	// AND catching = true
	// AND origin = '%s'
	// `, async.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "consumeButFailAndRetry"}}.String()),
	// 1),

	// // check that there was one successful attempt to catchAny
	// in.QueryRow("ftl",
	// fmt.Sprintf(`
	// SELECT COUNT(*)
	// FROM async_calls
	// WHERE
	// state = 'success'
	// AND verb = 'subscriber.consumeButFailAndCatchAny'
	// AND error IS NULL
	// AND catching = true
	// AND origin = '%s'
	//
	// `, async.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "consumeButFailAndCatchAny"}}.String()),
	//
	// 1),
	// )
}

// TestExternalPublishRuntimeCheck deploys the publisher and subscriber modules
// and expects the call to subscriber.publishToExternalModule to fail with an
// error mentioning that publishing to another module's topic is not allowed.
func TestExternalPublishRuntimeCheck(t *testing.T) {
	const wantErr = "can not publish to another module's topic"
	ignoreResponse := func(t testing.TB, resp in.Obj) {}

	// No java as there is no API for this
	in.Run(t,
		in.CopyModule("publisher"),
		in.CopyModule("subscriber"),
		in.Deploy("publisher"),
		in.Deploy("subscriber"),

		in.ExpectError(
			in.Call("subscriber", "publishToExternalModule", in.Obj{}, ignoreResponse),
			wantErr,
		),
	)
}
15 changes: 12 additions & 3 deletions backend/runner/pubsub/testdata/go/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
type TestTopic = ftl.TopicHandle[PubSubEvent, ftl.SinglePartitionMap[PubSubEvent]]

type PubSubEvent struct {
Time time.Time
Time time.Time
Haystack string
}

//ftl:verb
Expand Down Expand Up @@ -40,10 +41,18 @@ func PublishOne(ctx context.Context, topic TestTopic) error {
//ftl:export
type Topic2 = ftl.TopicHandle[PubSubEvent, ftl.SinglePartitionMap[PubSubEvent]]

// PublishOneToTopic2Request is the request type accepted by PublishOneToTopic2.
//
//ftl:data
type PublishOneToTopic2Request struct {
	// Haystack is copied into the Haystack field of the published PubSubEvent.
	Haystack string
}

//ftl:verb
func PublishOneToTopic2(ctx context.Context, topic Topic2) error {
func PublishOneToTopic2(ctx context.Context, req PublishOneToTopic2Request, topic Topic2) error {
logger := ftl.LoggerFromContext(ctx)
t := time.Now()
logger.Infof("Publishing to topic_2 %v", t)
return topic.Publish(ctx, PubSubEvent{Time: t})
return topic.Publish(ctx, PubSubEvent{
Time: t,
Haystack: req.Haystack,
})
}
2 changes: 1 addition & 1 deletion backend/runner/pubsub/testdata/go/publisher/types.ftl.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions backend/runner/pubsub/testdata/go/subscriber/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,15 @@ module ftl/subscriber

go 1.23.0

require (
github.com/TBD54566975/ftl v0.238.0
github.com/alecthomas/atomic v0.1.0-alpha2
)
require github.com/TBD54566975/ftl v0.238.0

require (
al.essio.dev/pkg/shellescape v1.5.1 // indirect
connectrpc.com/connect v1.16.2 // indirect
connectrpc.com/grpcreflect v1.2.0 // indirect
connectrpc.com/otelconnect v0.7.1 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
github.com/alecthomas/atomic v0.1.0-alpha2 // indirect
github.com/alecthomas/concurrency v0.0.2 // indirect
github.com/alecthomas/participle/v2 v2.1.1 // indirect
github.com/alecthomas/types v0.17.0 // indirect
Expand Down
Loading

0 comments on commit 7eeef45

Please sign in to comment.