From c9d63f8cc4a0297d409d99adffeea1296385f67c Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Thu, 6 Jun 2024 16:38:42 +1000 Subject: [PATCH] feat: pubsub retries (#1683) --- backend/controller/dal/dal.go | 10 +++ backend/controller/dal/pubsub.go | 14 ++-- backend/controller/pubsub/integration_test.go | 72 +++++++++++++------ .../pubsub/testdata/go/publisher/publisher.go | 25 ++++--- .../testdata/go/subscriber/subscriber.go | 13 +++- backend/controller/sql/models.go | 3 + backend/controller/sql/querier.go | 2 +- backend/controller/sql/queries.sql | 23 ++++-- backend/controller/sql/queries.sql.go | 51 ++++++++++--- backend/controller/sql/schema/001_init.sql | 17 +++-- backend/schema/schema_test.go | 5 ++ backend/schema/validate.go | 33 ++++----- backend/schema/validate_test.go | 4 +- go-runtime/compile/schema_test.go | 1 + go-runtime/compile/testdata/pubsub/pubsub.go | 1 + go-runtime/ftl/testdata/go/mapper/go.mod | 20 +++++- go-runtime/ftl/testdata/go/mapper/go.sum | 54 ++++++++++++++ 17 files changed, 267 insertions(+), 81 deletions(-) diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index 5050788e3f..7e9d39c16d 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -596,12 +596,22 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem Module: moduleSchema.Name, Name: v.Name, } + retryParams := schema.RetryParams{} + if retryMd, ok := slices.FindVariant[*schema.MetadataRetry](v.Metadata); ok { + retryParams, err = retryMd.RetryParams() + if err != nil { + return model.DeploymentKey{}, fmt.Errorf("could not parse retry parameters for %q: %w", v.Name, err) + } + } err := tx.InsertSubscriber(ctx, sql.InsertSubscriberParams{ Key: model.NewSubscriberKey(moduleSchema.Name, s.Name, v.Name), Module: moduleSchema.Name, SubscriptionName: s.Name, Deployment: deploymentKey, Sink: sinkRef, + RetryAttempts: int32(retryParams.Count), + Backoff: retryParams.MinBackoff, + MaxBackoff: retryParams.MaxBackoff, }) if err != nil { return model.DeploymentKey{}, fmt.Errorf("could not insert subscriber: %w", translatePGError(err)) diff --git a/backend/controller/dal/pubsub.go b/backend/controller/dal/pubsub.go index d26f08bd4c..da63ffd5b0 100644 --- a/backend/controller/dal/pubsub.go +++ b/backend/controller/dal/pubsub.go @@ -71,9 +71,10 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t continue } - sink, err := tx.db.GetRandomSubscriberSink(ctx, subscription.Key) + subscriber, err := tx.db.GetRandomSubscriber(ctx, subscription.Key) if err != nil { - return 0, fmt.Errorf("failed to get lock on subscription: %w", translatePGError(err)) + logger.Tracef("no subscriber for subscription %s", subscription.Key) + continue } err = tx.db.BeginConsumingTopicEvent(ctx, subscription.Key, nextCursorKey) @@ -88,9 +89,12 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t }, } _, err = tx.db.CreateAsyncCall(ctx, sql.CreateAsyncCallParams{ - Verb: sink, - Origin: origin.String(), - Request: nextCursor.Payload, + Verb: subscriber.Sink, + Origin: origin.String(), + Request: nextCursor.Payload, + RemainingAttempts: subscriber.RetryAttempts, + Backoff: subscriber.Backoff, + MaxBackoff: subscriber.MaxBackoff, }) if err != nil { return 0, fmt.Errorf("failed to schedule async task for subscription: %w", translatePGError(err)) diff --git a/backend/controller/pubsub/integration_test.go b/backend/controller/pubsub/integration_test.go index c9e462483c..2ac2140fdf 100644 --- a/backend/controller/pubsub/integration_test.go +++ b/backend/controller/pubsub/integration_test.go @@ -29,17 +29,17 @@ func TestPubSub(t *testing.T) { // check that there are the right amount of successful async calls in.QueryRow("ftl", fmt.Sprintf(` - SELECT COUNT(*) - FROM async_calls - WHERE - state = 'success' - AND origin = '%s' + SELECT COUNT(*) + FROM async_calls + WHERE + state = 'success' + AND origin = '%s' `, dal.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "test_subscription"}}.String()), events), ) } -func TestPubSubConsumptionDelay(t *testing.T) { +func TestConsumptionDelay(t *testing.T) { in.Run(t, "", in.CopyModule("publisher"), in.CopyModule("subscriber"), @@ -50,9 +50,9 @@ func TestPubSubConsumptionDelay(t *testing.T) { // pubsub should trigger its poll a few times during this period // each time it should continue processing each event until it reaches one that is too new to process func(t testing.TB, ic in.TestContext) { - for i := 0; i < 60; i++ { + for i := 0; i < 120; i++ { in.Call("publisher", "publishOne", in.Obj{}, func(t testing.TB, resp in.Obj) {})(t, ic) - time.Sleep(time.Millisecond * 50) + time.Sleep(time.Millisecond * 25) } }, @@ -61,22 +61,48 @@ func TestPubSubConsumptionDelay(t *testing.T) { // Get all event created ats, and all async call created ats // Compare each, make sure none are less than 0.2s of each other in.QueryRow("ftl", ` - WITH event_times AS ( - SELECT created_at, ROW_NUMBER() OVER (ORDER BY created_at) AS row_num - FROM ( - select * from topic_events order by created_at + WITH event_times AS ( + SELECT created_at, ROW_NUMBER() OVER (ORDER BY created_at) AS row_num + FROM ( + select * from topic_events order by created_at + ) + ), + async_call_times AS ( + SELECT created_at, ROW_NUMBER() OVER (ORDER BY created_at) AS row_num + FROM ( + select * from async_calls ac order by created_at + ) ) - ), - async_call_times AS ( - SELECT created_at, ROW_NUMBER() OVER (ORDER BY created_at) AS row_num - FROM ( - select * from async_calls ac order by created_at - ) - ) - SELECT COUNT(*) - FROM event_times - JOIN async_call_times ON event_times.row_num = async_call_times.row_num - WHERE ABS(EXTRACT(EPOCH FROM (event_times.created_at - async_call_times.created_at))) < 0.2; + SELECT COUNT(*) + FROM event_times + JOIN async_call_times ON event_times.row_num = async_call_times.row_num + WHERE ABS(EXTRACT(EPOCH FROM (event_times.created_at - async_call_times.created_at))) < 0.2; `, 0), ) } + +func TestRetry(t *testing.T) { + retriesPerCall := 2 + in.Run(t, "", + in.CopyModule("publisher"), + in.CopyModule("subscriber"), + in.Deploy("publisher"), + in.Deploy("subscriber"), + + // publish events + in.Call("publisher", "publishOneToTopic2", in.Obj{}, func(t testing.TB, resp in.Obj) {}), + + in.Sleep(time.Second*6), + + // check that there are the right amount of failed async calls + in.QueryRow("ftl", + fmt.Sprintf(` + SELECT COUNT(*) + FROM async_calls + WHERE + state = 'error' + AND origin = '%s' + `, dal.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "doomed_subscription"}}.String()), + 1+retriesPerCall), + ) +} diff --git a/backend/controller/pubsub/testdata/go/publisher/publisher.go b/backend/controller/pubsub/testdata/go/publisher/publisher.go index 1f03cb5a38..62d4872514 100644 --- a/backend/controller/pubsub/testdata/go/publisher/publisher.go +++ b/backend/controller/pubsub/testdata/go/publisher/publisher.go @@ -25,7 +25,6 @@ func PublishTen(ctx context.Context) error { if err != nil { return err } - time.Sleep(time.Microsecond * 20) } return nil } @@ -33,14 +32,18 @@ func PublishTen(ctx context.Context) error { //ftl:verb func PublishOne(ctx context.Context) error { logger := ftl.LoggerFromContext(ctx) - for i := 0; i < 10; i++ { - t := time.Now() - logger.Infof("Publishing %v", t) - err := topic.Publish(ctx, PubSubEvent{Time: t}) - if err != nil { - return err - } - time.Sleep(time.Microsecond * 20) - } - return nil + t := time.Now() + logger.Infof("Publishing %v", t) + return topic.Publish(ctx, PubSubEvent{Time: t}) +} + +//ftl:export +var topic2 = ftl.Topic[PubSubEvent]("topic2") + +//ftl:verb +func PublishOneToTopic2(ctx context.Context) error { + logger := ftl.LoggerFromContext(ctx) + t := time.Now() + logger.Infof("Publishing to topic2 %v", t) + return topic2.Publish(ctx, PubSubEvent{Time: t}) } diff --git a/backend/controller/pubsub/testdata/go/subscriber/subscriber.go b/backend/controller/pubsub/testdata/go/subscriber/subscriber.go index da1a72d904..70a93211c0 100644 --- a/backend/controller/pubsub/testdata/go/subscriber/subscriber.go +++ b/backend/controller/pubsub/testdata/go/subscriber/subscriber.go @@ -2,6 +2,7 @@ package subscriber import ( "context" + "fmt" "ftl/publisher" "github.com/TBD54566975/ftl/go-runtime/ftl" // Import the FTL SDK. @@ -12,7 +13,15 @@ var _ = ftl.Subscription(publisher.Test_topic, "test_subscription") //ftl:verb //ftl:subscribe test_subscription func Consume(ctx context.Context, req publisher.PubSubEvent) error { - logger := ftl.LoggerFromContext(ctx) - logger.Infof("Subscriber is processing %v", req.Time) + ftl.LoggerFromContext(ctx).Infof("Subscriber is consuming %v", req.Time) return nil } + +var _ = ftl.Subscription(publisher.Topic2, "doomed_subscription") + +//ftl:verb +//ftl:subscribe doomed_subscription +//ftl:retry 2 1s 1s +func ConsumeButFailAndRetry(ctx context.Context, req publisher.PubSubEvent) error { + return fmt.Errorf("always error: event %v", req.Time) +} diff --git a/backend/controller/sql/models.go b/backend/controller/sql/models.go index 0b7a9ce00b..ca7d34f139 100644 --- a/backend/controller/sql/models.go +++ b/backend/controller/sql/models.go @@ -522,6 +522,9 @@ type TopicSubscriber struct { TopicSubscriptionsID int64 DeploymentID int64 Sink schema.RefKey + RetryAttempts int32 + Backoff time.Duration + MaxBackoff time.Duration } type TopicSubscription struct { diff --git a/backend/controller/sql/querier.go b/backend/controller/sql/querier.go index bd882760be..4b95dea61e 100644 --- a/backend/controller/sql/querier.go +++ b/backend/controller/sql/querier.go @@ -65,7 +65,7 @@ type Querier interface { GetModulesByID(ctx context.Context, ids []int64) ([]Module, error) GetNextEventForSubscription(ctx context.Context, topic model.TopicKey, cursor optional.Option[model.TopicEventKey]) (GetNextEventForSubscriptionRow, error) GetProcessList(ctx context.Context) ([]GetProcessListRow, error) - GetRandomSubscriberSink(ctx context.Context, key model.SubscriptionKey) (schema.RefKey, error) + GetRandomSubscriber(ctx context.Context, key model.SubscriptionKey) (GetRandomSubscriberRow, error) // Retrieve routing information for a runner. GetRouteForRunner(ctx context.Context, key model.RunnerKey) (GetRouteForRunnerRow, error) GetRoutingTable(ctx context.Context, modules []string) ([]GetRoutingTableRow, error) diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql index 542c223c79..f6ec1fcc90 100644 --- a/backend/controller/sql/queries.sql +++ b/backend/controller/sql/queries.sql @@ -646,7 +646,15 @@ UPDATE SET RETURNING id; -- name: InsertSubscriber :exec -INSERT INTO topic_subscribers (key, topic_subscriptions_id, deployment_id, sink) +INSERT INTO topic_subscribers ( + key, + topic_subscriptions_id, + deployment_id, + sink, + retry_attempts, + backoff, + max_backoff +) VALUES ( sqlc.arg('key')::subscriber_key, ( @@ -657,7 +665,11 @@ VALUES ( AND topic_subscriptions.name = sqlc.arg('subscription_name')::TEXT ), (SELECT id FROM deployments WHERE key = sqlc.arg('deployment')::deployment_key), - sqlc.arg('sink')); + sqlc.arg('sink'), + sqlc.arg('retry_attempts'), + sqlc.arg('backoff')::interval, + sqlc.arg('max_backoff')::interval +); -- name: PublishEventForTopic :exec INSERT INTO topic_events ( @@ -713,9 +725,12 @@ WHERE topics.key = sqlc.arg('topic')::topic_key ORDER BY events.created_at, events.id LIMIT 1; --- name: GetRandomSubscriberSink :one +-- name: GetRandomSubscriber :one SELECT - subscribers.sink as sink + subscribers.sink as sink, + subscribers.retry_attempts as retry_attempts, + subscribers.backoff as backoff, + subscribers.max_backoff as max_backoff FROM topic_subscribers as subscribers JOIN deployments ON subscribers.deployment_id = deployments.id JOIN topic_subscriptions ON subscribers.topic_subscriptions_id = topic_subscriptions.id diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index 2c6e7cd65c..c7374fbc4b 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -1264,9 +1264,12 @@ func (q *Queries) GetProcessList(ctx context.Context) ([]GetProcessListRow, erro return items, nil } -const getRandomSubscriberSink = `-- name: GetRandomSubscriberSink :one +const getRandomSubscriber = `-- name: GetRandomSubscriber :one SELECT - subscribers.sink as sink + subscribers.sink as sink, + subscribers.retry_attempts as retry_attempts, + subscribers.backoff as backoff, + subscribers.max_backoff as max_backoff FROM topic_subscribers as subscribers JOIN deployments ON subscribers.deployment_id = deployments.id JOIN topic_subscriptions ON subscribers.topic_subscriptions_id = topic_subscriptions.id @@ -1276,11 +1279,23 @@ ORDER BY RANDOM() LIMIT 1 ` -func (q *Queries) GetRandomSubscriberSink(ctx context.Context, key model.SubscriptionKey) (schema.RefKey, error) { - row := q.db.QueryRow(ctx, getRandomSubscriberSink, key) - var sink schema.RefKey - err := row.Scan(&sink) - return sink, err +type GetRandomSubscriberRow struct { + Sink schema.RefKey + RetryAttempts int32 + Backoff time.Duration + MaxBackoff time.Duration +} + +func (q *Queries) GetRandomSubscriber(ctx context.Context, key model.SubscriptionKey) (GetRandomSubscriberRow, error) { + row := q.db.QueryRow(ctx, getRandomSubscriber, key) + var i GetRandomSubscriberRow + err := row.Scan( + &i.Sink, + &i.RetryAttempts, + &i.Backoff, + &i.MaxBackoff, + ) + return i, err } const getRouteForRunner = `-- name: GetRouteForRunner :one @@ -1770,7 +1785,15 @@ func (q *Queries) InsertLogEvent(ctx context.Context, arg InsertLogEventParams) } const insertSubscriber = `-- name: InsertSubscriber :exec -INSERT INTO topic_subscribers (key, topic_subscriptions_id, deployment_id, sink) +INSERT INTO topic_subscribers ( + key, + topic_subscriptions_id, + deployment_id, + sink, + retry_attempts, + backoff, + max_backoff +) VALUES ( $1::subscriber_key, ( @@ -1781,7 +1804,11 @@ VALUES ( AND topic_subscriptions.name = $3::TEXT ), (SELECT id FROM deployments WHERE key = $4::deployment_key), - $5) + $5, + $6, + $7::interval, + $8::interval +) ` type InsertSubscriberParams struct { @@ -1790,6 +1817,9 @@ type InsertSubscriberParams struct { SubscriptionName string Deployment model.DeploymentKey Sink schema.RefKey + RetryAttempts int32 + Backoff time.Duration + MaxBackoff time.Duration } func (q *Queries) InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error { @@ -1799,6 +1829,9 @@ func (q *Queries) InsertSubscriber(ctx context.Context, arg InsertSubscriberPara arg.SubscriptionName, arg.Deployment, arg.Sink, + arg.RetryAttempts, + arg.Backoff, + arg.MaxBackoff, ) return err } diff --git a/backend/controller/sql/schema/001_init.sql b/backend/controller/sql/schema/001_init.sql index e5d701cae2..45c95c1349 100644 --- a/backend/controller/sql/schema/001_init.sql +++ b/backend/controller/sql/schema/001_init.sql @@ -416,15 +416,18 @@ CREATE DOMAIN subscriber_key AS TEXT; -- -- A subscriber is a 1:1 mapping between a subscription and a sink. CREATE TABLE topic_subscribers ( - id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, - "key" subscriber_key UNIQUE NOT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'), - + id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, + "key" subscriber_key UNIQUE NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'), topic_subscriptions_id BIGINT NOT NULL REFERENCES topic_subscriptions(id), - - deployment_id BIGINT NOT NULL REFERENCES deployments(id) ON DELETE CASCADE, + deployment_id BIGINT NOT NULL REFERENCES deployments(id) ON DELETE CASCADE, -- Name of the verb to call on the deployment. - sink schema_ref NOT NULL + sink schema_ref NOT NULL, + + -- retry options + retry_attempts INT NOT NULL, + backoff INTERVAL NOT NULL, + max_backoff INTERVAL NOT NULL ); CREATE INDEX topic_subscribers_subscription_idx ON topic_subscribers (topic_subscriptions_id); diff --git a/backend/schema/schema_test.go b/backend/schema/schema_test.go index 8068765e3e..c227946b2d 100644 --- a/backend/schema/schema_test.go +++ b/backend/schema/schema_test.go @@ -535,6 +535,7 @@ func TestParsing(t *testing.T) { verb consumesBothASubs(test.eventA) Unit +subscribe subA1 +subscribe subA2 + +retry 1m5s 1h } `, expected: &Schema{ @@ -630,6 +631,10 @@ func TestParsing(t *testing.T) { &MetadataSubscriber{ Name: "subA2", }, + &MetadataRetry{ + MinBackoff: "1m5s", + MaxBackoff: "1h", + }, }, }, }}, diff --git a/backend/schema/validate.go b/backend/schema/validate.go index b8c799d5a4..a9999da7d8 100644 --- a/backend/schema/validate.go +++ b/backend/schema/validate.go @@ -550,24 +550,25 @@ func validateVerbMetadata(scopes Scopes, module *Module, n *Verb) (merr []error) } case *MetadataRetry: // Only allow retries on FSM transitions for now - fsms := islices.Filter(module.Decls, func(d Decl) bool { - fsm, ok := d.(*FSM) - if !ok { - return false - } - starts := islices.Filter(fsm.Start, func(ref *Ref) bool { - return ref.Name == n.Name - }) - if len(starts) > 0 { - return true + _, isPartOfFSM := islices.Find(module.Decls, func(d Decl) bool { + if d, ok := d.(*FSM); ok { + // check if this verb part of the FSM + if _, isStart := islices.Find(d.Start, func(ref *Ref) bool { + return ref.Name == n.Name + }); isStart { + return true + } + if _, isTransition := islices.Find(d.Transitions, func(t *FSMTransition) bool { + return t.To.Name == n.Name + }); isTransition { + return true + } } - transitions := islices.Filter(fsm.Transitions, func(t *FSMTransition) bool { - return t.To.Name == n.Name - }) - return len(transitions) > 0 + return false }) - if len(fsms) == 0 { - merr = append(merr, errorf(md, "verb %s: retries can only be added to FSM transitions", n.Name)) + _, isSubscriber := islices.FindVariant[*MetadataSubscriber](n.Metadata) + if !isPartOfFSM && !isSubscriber { + merr = append(merr, errorf(md, `verb %s: retries can only be added to subscribers or FSM transitions`, n.Name)) return } diff --git a/backend/schema/validate_test.go b/backend/schema/validate_test.go index a39cd92717..712e247996 100644 --- a/backend/schema/validate_test.go +++ b/backend/schema/validate_test.go @@ -308,8 +308,8 @@ func TestValidate(t *testing.T) { } `, errs: []string{ - `4:7-7: verb A: retries can only be added to FSM transitions`, - `6:7-7: verb B: retries can only be added to FSM transitions`, + `4:7-7: verb A: retries can only be added to subscribers or FSM transitions`, + `6:7-7: verb B: retries can only be added to subscribers or FSM transitions`, }, }, {name: "InvalidRetryDurations", diff --git a/go-runtime/compile/schema_test.go b/go-runtime/compile/schema_test.go index 6edda3ee57..5fd147aac4 100644 --- a/go-runtime/compile/schema_test.go +++ b/go-runtime/compile/schema_test.go @@ -372,6 +372,7 @@ func TestExtractModulePubSub(t *testing.T) { verb processBroadcast(pubsub.PayinEvent) Unit +subscribe broadcastSubscription + +retry 10 1s verb processPayin(pubsub.PayinEvent) Unit +subscribe paymentProcessing diff --git a/go-runtime/compile/testdata/pubsub/pubsub.go b/go-runtime/compile/testdata/pubsub/pubsub.go index 0122811c4a..c79e07bc97 100644 --- a/go-runtime/compile/testdata/pubsub/pubsub.go +++ b/go-runtime/compile/testdata/pubsub/pubsub.go @@ -47,6 +47,7 @@ func Broadcast(ctx context.Context) error { } //ftl:subscribe broadcastSubscription +//ftl:retry 10 1s func ProcessBroadcast(ctx context.Context, event PayinEvent) error { logger := ftl.LoggerFromContext(ctx) logger.Infof("Received broadcast event: %v", event) diff --git a/go-runtime/ftl/testdata/go/mapper/go.mod b/go-runtime/ftl/testdata/go/mapper/go.mod index dcebe302bb..792b2005dd 100644 --- a/go-runtime/ftl/testdata/go/mapper/go.mod +++ b/go-runtime/ftl/testdata/go/mapper/go.mod @@ -4,28 +4,46 @@ go 1.22.2 toolchain go1.22.3 -require github.com/TBD54566975/ftl v1.1.0 +require ( + github.com/TBD54566975/ftl v1.1.0 + github.com/alecthomas/assert/v2 v2.10.0 +) require ( connectrpc.com/connect v1.16.1 // indirect connectrpc.com/grpcreflect v1.2.0 // indirect connectrpc.com/otelconnect v0.7.0 // indirect + github.com/BurntSushi/toml v1.4.0 // indirect + github.com/TBD54566975/scaffolder v1.0.0 // indirect github.com/alecthomas/concurrency v0.0.2 // indirect github.com/alecthomas/participle/v2 v2.1.1 // indirect + github.com/alecthomas/repr v0.4.0 // indirect github.com/alecthomas/types v0.16.0 // indirect github.com/alessio/shellescape v1.4.2 // indirect + github.com/amacneil/dbmate/v2 v2.16.0 // indirect + github.com/aws/aws-sdk-go-v2 v1.27.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.7 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.7 // indirect + github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.29.1 // indirect + github.com/aws/smithy-go v1.20.2 // indirect github.com/danieljoos/wincred v1.2.0 // indirect + github.com/deckarep/golang-set/v2 v2.6.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/hexops/gotextdiff v1.0.3 // indirect + github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/pgx/v5 v5.6.0 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect + github.com/lib/pq v1.10.9 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/multiformats/go-base36 v0.2.0 // indirect + github.com/puzpuzpuz/xsync/v3 v3.1.0 // indirect github.com/swaggest/jsonschema-go v0.3.70 // indirect github.com/swaggest/refl v1.3.0 // indirect github.com/zalando/go-keyring v0.2.4 // indirect diff --git a/go-runtime/ftl/testdata/go/mapper/go.sum b/go-runtime/ftl/testdata/go/mapper/go.sum index d823b5af4a..35a4c8928d 100644 --- a/go-runtime/ftl/testdata/go/mapper/go.sum +++ b/go-runtime/ftl/testdata/go/mapper/go.sum @@ -4,6 +4,12 @@ connectrpc.com/grpcreflect v1.2.0 h1:Q6og1S7HinmtbEuBvARLNwYmTbhEGRpHDhqrPNlmK+U connectrpc.com/grpcreflect v1.2.0/go.mod h1:nwSOKmE8nU5u/CidgHtPYk1PFI3U9ignz7iDMxOYkSY= connectrpc.com/otelconnect v0.7.0 h1:ZH55ZZtcJOTKWWLy3qmL4Pam4RzRWBJFOqTPyAqCXkY= connectrpc.com/otelconnect v0.7.0/go.mod h1:Bt2ivBymHZHqxvo4HkJ0EwHuUzQN6k2l0oH+mp/8nwc= +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0= +github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= +github.com/TBD54566975/scaffolder v1.0.0 h1:QUFSy2wVzumLDg7IHcKC6AP+IYyqWe9Wxiu72nZn5qU= +github.com/TBD54566975/scaffolder v1.0.0/go.mod h1:auVpczIbOAdIhYDVSruIw41DanxOKB9bSvjf6MEl7Fs= github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZP/GkPY= github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k= github.com/alecthomas/concurrency v0.0.2 h1:Q3kGPtLbleMbH9lHX5OBFvJygfyFw29bXZKBg+IEVuo= @@ -16,6 +22,36 @@ github.com/alecthomas/types v0.16.0 h1:o9+JSwCRB6DDaWDeR/Mg7v/zh3R+MlknM6DrnDyY7 github.com/alecthomas/types v0.16.0/go.mod h1:Tswm0qQpjpVq8rn70OquRsUtFxbQKub/8TMyYYGI0+k= github.com/alessio/shellescape v1.4.2 h1:MHPfaU+ddJ0/bYWpgIeUnQUqKrlJ1S7BfEYPM4uEoM0= github.com/alessio/shellescape v1.4.2/go.mod h1:PZAiSCk0LJaZkiCSkPv8qIobYglO3FPpyFjDCtHLS30= +github.com/amacneil/dbmate/v2 v2.16.0 h1:uIah9qFOxA9cIyS1TuPMUgn0KVnwRRSx0MhKu7sFpcI= +github.com/amacneil/dbmate/v2 v2.16.0/go.mod h1:TlxMqFDBxsLVertnF2cAxZxPyt9UjlCrt04iy1UxRXc= +github.com/aws/aws-sdk-go-v2 v1.27.0 h1:7bZWKoXhzI+mMR/HjdMx8ZCC5+6fY0lS5tr0bbgiLlo= +github.com/aws/aws-sdk-go-v2 v1.27.0/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= +github.com/aws/aws-sdk-go-v2/config v1.27.16 h1:knpCuH7laFVGYTNd99Ns5t+8PuRjDn4HnnZK48csipM= +github.com/aws/aws-sdk-go-v2/config v1.27.16/go.mod h1:vutqgRhDUktwSge3hrC3nkuirzkJ4E/mLj5GvI0BQas= +github.com/aws/aws-sdk-go-v2/credentials v1.17.16 h1:7d2QxY83uYl0l58ceyiSpxg9bSbStqBC6BeEeHEchwo= +github.com/aws/aws-sdk-go-v2/credentials v1.17.16/go.mod h1:Ae6li/6Yc6eMzysRL2BXlPYvnrLLBg3D11/AmOjw50k= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.3 h1:dQLK4TjtnlRGb0czOht2CevZ5l6RSyRWAnKeGd7VAFE= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.3/go.mod h1:TL79f2P6+8Q7dTsILpiVST+AL9lkF6PPGI167Ny0Cjw= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.7 h1:lf/8VTF2cM+N4SLzaYJERKEWAXq8MOMpZfU6wEPWsPk= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.7/go.mod h1:4SjkU7QiqK2M9oozyMzfZ/23LmUY+h3oFqhdeP5OMiI= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.7 h1:4OYVp0705xu8yjdyoWix0r9wPIRXnIzzOoUpQVHIJ/g= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.7/go.mod h1:vd7ESTEvI76T2Na050gODNmNU7+OyKrIKroYTu4ABiI= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 h1:Ji0DY1xUsUr3I8cHps0G+XM3WWU16lP6yG8qu1GAZAs= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2/go.mod h1:5CsjAbs3NlGQyZNFACh+zztPDI7fU6eW9QsxjfnuBKg= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.9 h1:Wx0rlZoEJR7JwlSZcHnEa7CNjrSIyVxMFWGAaXy4fJY= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.9/go.mod h1:aVMHdE0aHO3v+f/iw01fmXV/5DbfQ3Bi9nN7nd9bE9Y= +github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.29.1 h1:NSWsFzdHN41mJ5I/DOFzxgkKSYNHQADHn7Mu+lU/AKw= +github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.29.1/go.mod h1:5mMk0DgUgaHlcqtN65fNyZI0ZDX3i9Cw+nwq75HKB3U= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.9 h1:aD7AGQhvPuAxlSUfo0CWU7s6FpkbyykMhGYMvlqTjVs= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.9/go.mod h1:c1qtZUWtygI6ZdvKppzCSXsDOq5I4luJPZ0Ud3juFCA= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.24.3 h1:Pav5q3cA260Zqez42T9UhIlsd9QeypszRPwC9LdSSsQ= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.24.3/go.mod h1:9lmoVDVLz/yUZwLaQ676TK02fhCu4+PgRSmMaKR1ozk= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.10 h1:69tpbPED7jKPyzMcrwSvhWcJ9bPnZsZs18NT40JwM0g= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.10/go.mod h1:0Aqn1MnEuitqfsCNyKsdKLhDUOr4txD/g19EfiUqgws= +github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q= +github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= github.com/bool64/dev v0.2.34 h1:P9n315P8LdpxusnYQ0X7MP1CZXwBK5ae5RZrd+GdSZE= github.com/bool64/dev v0.2.34/go.mod h1:iJbh1y/HkunEPhgebWRNcs8wfGq7sjvJ6W5iabL8ACg= github.com/bool64/shared v0.1.5 h1:fp3eUhBsrSjNCQPcSdQqZxxh9bBwrYiZ+zOKFkM0/2E= @@ -25,6 +61,8 @@ github.com/danieljoos/wincred v1.2.0/go.mod h1:FzQLLMKBFdvu+osBrnFODiv32YGwCfx0S github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM= +github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -32,8 +70,12 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= +github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -44,6 +86,8 @@ github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUq github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= github.com/iancoleman/orderedmap v0.3.0 h1:5cbR2grmZR/DiVt+VJopEhtVs9YGInGIxAoMJn+Ichc= github.com/iancoleman/orderedmap v0.3.0/go.mod h1:XuLcCUkdL5owUCQeF2Ue9uuw1EptkJDkXXS7VoV7XGE= +github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 h1:Dj0L5fhJ9F82ZJyVOmBx6msDp/kfd1t9GRfny/mfJA0= +github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= @@ -56,14 +100,22 @@ github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2E github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/multiformats/go-base36 v0.2.0 h1:lFsAbNOGeKtuKozrtBsAkSVhv1p9D0/qedU9rQyccr0= github.com/multiformats/go-base36 v0.2.0/go.mod h1:qvnKE++v+2MWCfePClUEjE78Z7P2a1UV0xHgWc0hkp4= github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/otiai10/copy v1.14.0 h1:dCI/t1iTdYGtkvCuBG2BgR6KZa83PTclw4U5n2wAllU= +github.com/otiai10/copy v1.14.0/go.mod h1:ECfuL02W+/FkTWZWgQqXPWZgW9oeKCSQ5qVfSc4qc4w= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/puzpuzpuz/xsync/v3 v3.1.0 h1:EewKT7/LNac5SLiEblJeUu8z5eERHrmRLnMQL2d7qX4= +github.com/puzpuzpuz/xsync/v3 v3.1.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4= @@ -89,6 +141,8 @@ github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 h1:BHyfKlQyqbsFN5p3Ifn github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82/go.mod h1:lgjkn3NuSvDfVJdfcVVdX+jpBxNmX4rDAzaS45IcYoM= github.com/zalando/go-keyring v0.2.4 h1:wi2xxTqdiwMKbM6TWwi+uJCG/Tum2UV0jqaQhCa9/68= github.com/zalando/go-keyring v0.2.4/go.mod h1:HL4k+OXQfJUWaMnqyuSOc0drfGPX2b51Du6K+MRgZMk= +github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04 h1:qXafrlZL1WsJW5OokjraLLRURHiw0OzKHD/RNdspp4w= +github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04/go.mod h1:FiwNQxz6hGoNFBC4nIx+CxZhI3nne5RmIOlT/MXcSD4= go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ= go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik=