Skip to content

Commit

Permalink
feat: pubsub retries (#1683)
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e authored Jun 6, 2024
1 parent 8f3be89 commit c9d63f8
Show file tree
Hide file tree
Showing 17 changed files with 267 additions and 81 deletions.
10 changes: 10 additions & 0 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,12 +596,22 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem
Module: moduleSchema.Name,
Name: v.Name,
}
retryParams := schema.RetryParams{}
if retryMd, ok := slices.FindVariant[*schema.MetadataRetry](v.Metadata); ok {
retryParams, err = retryMd.RetryParams()
if err != nil {
return model.DeploymentKey{}, fmt.Errorf("could not parse retry parameters for %q: %w", v.Name, err)
}
}
err := tx.InsertSubscriber(ctx, sql.InsertSubscriberParams{
Key: model.NewSubscriberKey(moduleSchema.Name, s.Name, v.Name),
Module: moduleSchema.Name,
SubscriptionName: s.Name,
Deployment: deploymentKey,
Sink: sinkRef,
RetryAttempts: int32(retryParams.Count),
Backoff: retryParams.MinBackoff,
MaxBackoff: retryParams.MaxBackoff,
})
if err != nil {
return model.DeploymentKey{}, fmt.Errorf("could not insert subscriber: %w", translatePGError(err))
Expand Down
14 changes: 9 additions & 5 deletions backend/controller/dal/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,10 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
continue
}

sink, err := tx.db.GetRandomSubscriberSink(ctx, subscription.Key)
subscriber, err := tx.db.GetRandomSubscriber(ctx, subscription.Key)
if err != nil {
return 0, fmt.Errorf("failed to get lock on subscription: %w", translatePGError(err))
logger.Tracef("no subscriber for subscription %s", subscription.Key)
continue
}

err = tx.db.BeginConsumingTopicEvent(ctx, subscription.Key, nextCursorKey)
Expand All @@ -88,9 +89,12 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
},
}
_, err = tx.db.CreateAsyncCall(ctx, sql.CreateAsyncCallParams{
Verb: sink,
Origin: origin.String(),
Request: nextCursor.Payload,
Verb: subscriber.Sink,
Origin: origin.String(),
Request: nextCursor.Payload,
RemainingAttempts: subscriber.RetryAttempts,
Backoff: subscriber.Backoff,
MaxBackoff: subscriber.MaxBackoff,
})
if err != nil {
return 0, fmt.Errorf("failed to schedule async task for subscription: %w", translatePGError(err))
Expand Down
72 changes: 49 additions & 23 deletions backend/controller/pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ func TestPubSub(t *testing.T) {
// check that there are the right amount of successful async calls
in.QueryRow("ftl",
fmt.Sprintf(`
SELECT COUNT(*)
FROM async_calls
WHERE
state = 'success'
AND origin = '%s'
SELECT COUNT(*)
FROM async_calls
WHERE
state = 'success'
AND origin = '%s'
`, dal.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "test_subscription"}}.String()),
events),
)
}

func TestPubSubConsumptionDelay(t *testing.T) {
func TestConsumptionDelay(t *testing.T) {
in.Run(t, "",
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
Expand All @@ -50,9 +50,9 @@ func TestPubSubConsumptionDelay(t *testing.T) {
// pubsub should trigger its poll a few times during this period
// each time it should continue processing each event until it reaches one that is too new to process
func(t testing.TB, ic in.TestContext) {
for i := 0; i < 60; i++ {
for i := 0; i < 120; i++ {
in.Call("publisher", "publishOne", in.Obj{}, func(t testing.TB, resp in.Obj) {})(t, ic)
time.Sleep(time.Millisecond * 50)
time.Sleep(time.Millisecond * 25)
}
},

Expand All @@ -61,22 +61,48 @@ func TestPubSubConsumptionDelay(t *testing.T) {
// Get all event created ats, and all async call created ats
// Compare each, make sure none are less than 0.2s of each other
in.QueryRow("ftl", `
WITH event_times AS (
SELECT created_at, ROW_NUMBER() OVER (ORDER BY created_at) AS row_num
FROM (
select * from topic_events order by created_at
WITH event_times AS (
SELECT created_at, ROW_NUMBER() OVER (ORDER BY created_at) AS row_num
FROM (
select * from topic_events order by created_at
)
),
async_call_times AS (
SELECT created_at, ROW_NUMBER() OVER (ORDER BY created_at) AS row_num
FROM (
select * from async_calls ac order by created_at
)
)
),
async_call_times AS (
SELECT created_at, ROW_NUMBER() OVER (ORDER BY created_at) AS row_num
FROM (
select * from async_calls ac order by created_at
)
)
SELECT COUNT(*)
FROM event_times
JOIN async_call_times ON event_times.row_num = async_call_times.row_num
WHERE ABS(EXTRACT(EPOCH FROM (event_times.created_at - async_call_times.created_at))) < 0.2;
SELECT COUNT(*)
FROM event_times
JOIN async_call_times ON event_times.row_num = async_call_times.row_num
WHERE ABS(EXTRACT(EPOCH FROM (event_times.created_at - async_call_times.created_at))) < 0.2;
`, 0),
)
}

func TestRetry(t *testing.T) {
retriesPerCall := 2
in.Run(t, "",
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
in.Deploy("publisher"),
in.Deploy("subscriber"),

// publish events
in.Call("publisher", "publishOneToTopic2", in.Obj{}, func(t testing.TB, resp in.Obj) {}),

in.Sleep(time.Second*6),

// check that there are the right amount of failed async calls
in.QueryRow("ftl",
fmt.Sprintf(`
SELECT COUNT(*)
FROM async_calls
WHERE
state = 'error'
AND origin = '%s'
`, dal.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "doomed_subscription"}}.String()),
1+retriesPerCall),
)
}
25 changes: 14 additions & 11 deletions backend/controller/pubsub/testdata/go/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,25 @@ func PublishTen(ctx context.Context) error {
if err != nil {
return err
}
time.Sleep(time.Microsecond * 20)
}
return nil
}

//ftl:verb
func PublishOne(ctx context.Context) error {
logger := ftl.LoggerFromContext(ctx)
for i := 0; i < 10; i++ {
t := time.Now()
logger.Infof("Publishing %v", t)
err := topic.Publish(ctx, PubSubEvent{Time: t})
if err != nil {
return err
}
time.Sleep(time.Microsecond * 20)
}
return nil
t := time.Now()
logger.Infof("Publishing %v", t)
return topic.Publish(ctx, PubSubEvent{Time: t})
}

//ftl:export
var topic2 = ftl.Topic[PubSubEvent]("topic2")

//ftl:verb
func PublishOneToTopic2(ctx context.Context) error {
logger := ftl.LoggerFromContext(ctx)
t := time.Now()
logger.Infof("Publishing to topic2 %v", t)
return topic2.Publish(ctx, PubSubEvent{Time: t})
}
13 changes: 11 additions & 2 deletions backend/controller/pubsub/testdata/go/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package subscriber

import (
"context"
"fmt"
"ftl/publisher"

"github.com/TBD54566975/ftl/go-runtime/ftl" // Import the FTL SDK.
Expand All @@ -12,7 +13,15 @@ var _ = ftl.Subscription(publisher.Test_topic, "test_subscription")
//ftl:verb
//ftl:subscribe test_subscription
func Consume(ctx context.Context, req publisher.PubSubEvent) error {
logger := ftl.LoggerFromContext(ctx)
logger.Infof("Subscriber is processing %v", req.Time)
ftl.LoggerFromContext(ctx).Infof("Subscriber is consuming %v", req.Time)
return nil
}

var _ = ftl.Subscription(publisher.Topic2, "doomed_subscription")

//ftl:verb
//ftl:subscribe doomed_subscription
//ftl:retry 2 1s 1s
func ConsumeButFailAndRetry(ctx context.Context, req publisher.PubSubEvent) error {
return fmt.Errorf("always error: event %v", req.Time)
}
3 changes: 3 additions & 0 deletions backend/controller/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 19 additions & 4 deletions backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,15 @@ UPDATE SET
RETURNING id;

-- name: InsertSubscriber :exec
INSERT INTO topic_subscribers (key, topic_subscriptions_id, deployment_id, sink)
INSERT INTO topic_subscribers (
key,
topic_subscriptions_id,
deployment_id,
sink,
retry_attempts,
backoff,
max_backoff
)
VALUES (
sqlc.arg('key')::subscriber_key,
(
Expand All @@ -657,7 +665,11 @@ VALUES (
AND topic_subscriptions.name = sqlc.arg('subscription_name')::TEXT
),
(SELECT id FROM deployments WHERE key = sqlc.arg('deployment')::deployment_key),
sqlc.arg('sink'));
sqlc.arg('sink'),
sqlc.arg('retry_attempts'),
sqlc.arg('backoff')::interval,
sqlc.arg('max_backoff')::interval
);

-- name: PublishEventForTopic :exec
INSERT INTO topic_events (
Expand Down Expand Up @@ -713,9 +725,12 @@ WHERE topics.key = sqlc.arg('topic')::topic_key
ORDER BY events.created_at, events.id
LIMIT 1;

-- name: GetRandomSubscriberSink :one
-- name: GetRandomSubscriber :one
SELECT
subscribers.sink as sink
subscribers.sink as sink,
subscribers.retry_attempts as retry_attempts,
subscribers.backoff as backoff,
subscribers.max_backoff as max_backoff
FROM topic_subscribers as subscribers
JOIN deployments ON subscribers.deployment_id = deployments.id
JOIN topic_subscriptions ON subscribers.topic_subscriptions_id = topic_subscriptions.id
Expand Down
51 changes: 42 additions & 9 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 10 additions & 7 deletions backend/controller/sql/schema/001_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -416,15 +416,18 @@ CREATE DOMAIN subscriber_key AS TEXT;
--
-- A subscriber is a 1:1 mapping between a subscription and a sink.
CREATE TABLE topic_subscribers (
id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
"key" subscriber_key UNIQUE NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'),

id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
"key" subscriber_key UNIQUE NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'),
topic_subscriptions_id BIGINT NOT NULL REFERENCES topic_subscriptions(id),

deployment_id BIGINT NOT NULL REFERENCES deployments(id) ON DELETE CASCADE,
deployment_id BIGINT NOT NULL REFERENCES deployments(id) ON DELETE CASCADE,
-- Name of the verb to call on the deployment.
sink schema_ref NOT NULL
sink schema_ref NOT NULL,

-- retry options
retry_attempts INT NOT NULL,
backoff INTERVAL NOT NULL,
max_backoff INTERVAL NOT NULL
);

CREATE INDEX topic_subscribers_subscription_idx ON topic_subscribers (topic_subscriptions_id);
Expand Down
Loading

0 comments on commit c9d63f8

Please sign in to comment.