diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 6d5ba476f3..84ac0e87e7 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -298,6 +298,7 @@ func New(ctx context.Context, conn *sql.DB, config Config, runnerScaling scaling svc.tasks.Singleton(maybeDevelTask(svc.releaseExpiredReservations, time.Second*2, time.Second, time.Second*20)) svc.tasks.Singleton(maybeDevelTask(svc.reconcileDeployments, time.Second*2, time.Second, time.Second*5)) svc.tasks.Singleton(maybeDevelTask(svc.reconcileRunners, time.Second*2, time.Second, time.Second*5)) + svc.tasks.Singleton(maybeDevelTask(svc.reapAsyncCalls, time.Second*5, time.Second, time.Second*5)) return svc, nil } @@ -1401,7 +1402,7 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration logger := log.FromContext(ctx) logger.Tracef("Acquiring async call") - call, err := s.dal.AcquireAsyncCall(ctx) + call, leaseCtx, err := s.dal.AcquireAsyncCall(ctx) if errors.Is(err, dalerrs.ErrNotFound) { logger.Tracef("No async calls to execute") return time.Second * 2, nil @@ -1413,6 +1414,9 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration } return 0, err } + // use originalCtx for things that should are done outside of the lease lifespan + originalCtx := ctx + ctx = leaseCtx // Extract the otel context from the call ctx, err = observability.ExtractTraceContextToContext(ctx, call.TraceContext) @@ -1444,7 +1448,7 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration break case dal.AsyncOriginPubSub: - go s.pubSub.AsyncCallDidCommit(ctx, origin) + go s.pubSub.AsyncCallDidCommit(originalCtx, origin) default: break @@ -1576,6 +1580,36 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call * return nil } +// fails async calls that have had their leases reaped +func (s *Service) reapAsyncCalls(ctx context.Context) (nextInterval time.Duration, err error) { + tx, err := s.dal.Begin(ctx) + if err != nil { + return 0, connect.NewError(connect.CodeInternal, fmt.Errorf("could not start transaction: %w", err)) + } + defer tx.CommitOrRollback(ctx, &err) + + limit := 20 + calls, err := tx.GetZombieAsyncCalls(ctx, 20) + if err != nil { + return 0, fmt.Errorf("failed to get zombie async calls: %w", err) + } + for _, call := range calls { + callResult := either.RightOf[[]byte]("async call lease expired") + _, err := tx.CompleteAsyncCall(ctx, call, callResult, func(tx *dal.DAL, isFinalResult bool) error { + return s.finaliseAsyncCall(ctx, tx, call, callResult, isFinalResult) + }) + if err != nil { + return 0, fmt.Errorf("failed to complete zombie async call: %w", err) + } + observability.AsyncCalls.Executed(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, true, optional.Some("async call lease failed")) + } + + if len(calls) == limit { + return 0, nil + } + return time.Second * 5, nil +} + func metadataForAsyncCall(call *dal.AsyncCall) *ftlv1.Metadata { switch origin := call.Origin.(type) { case dal.AsyncOriginCron: diff --git a/backend/controller/cronjobs/cronjobs_test.go b/backend/controller/cronjobs/cronjobs_test.go index d97583c212..98132b604e 100644 --- a/backend/controller/cronjobs/cronjobs_test.go +++ b/backend/controller/cronjobs/cronjobs_test.go @@ -56,7 +56,7 @@ func TestNewCronJobsForModule(t *testing.T) { assert.Equal(t, len(unscheduledJobs), 2) // No async calls yet - _, err = parentDAL.AcquireAsyncCall(ctx) + _, _, err = parentDAL.AcquireAsyncCall(ctx) assert.IsError(t, err, dalerrs.ErrNotFound) assert.EqualError(t, err, "no pending async calls: not found") @@ -76,7 +76,7 @@ func TestNewCronJobsForModule(t *testing.T) { // Now there should be async calls calls := []*parentdal.AsyncCall{} for i, job := range jobsToCreate { - call, err := parentDAL.AcquireAsyncCall(ctx) + call, _, err := parentDAL.AcquireAsyncCall(ctx) assert.NoError(t, err) assert.Equal(t, call.Verb, job.Verb.ToRefKey()) assert.Equal(t, call.Origin.String(), fmt.Sprintf("cron:%s", job.Key)) @@ -110,7 +110,7 @@ func TestNewCronJobsForModule(t *testing.T) { } expectUnscheduledJobs(t, dal, clk, 0) for i, job := range jobsToCreate { - call, err := parentDAL.AcquireAsyncCall(ctx) + call, _, err := parentDAL.AcquireAsyncCall(ctx) assert.NoError(t, err) assert.Equal(t, call.Verb, job.Verb.ToRefKey()) assert.Equal(t, call.Origin.String(), fmt.Sprintf("cron:%s", job.Key)) diff --git a/backend/controller/dal/async_calls.go b/backend/controller/dal/async_calls.go index d806974d74..e9134d5d60 100644 --- a/backend/controller/dal/async_calls.go +++ b/backend/controller/dal/async_calls.go @@ -2,6 +2,7 @@ package dal import ( "context" + dbsql "database/sql" "errors" "fmt" "time" @@ -116,10 +117,10 @@ type AsyncCall struct { // AcquireAsyncCall acquires a pending async call to execute. // // Returns ErrNotFound if there are no async calls to acquire. -func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, err error) { +func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, leaseCtx context.Context, err error) { tx, err := d.Begin(ctx) if err != nil { - return nil, fmt.Errorf("failed to begin transaction: %w", err) + return nil, ctx, fmt.Errorf("failed to begin transaction: %w", err) } defer tx.CommitOrRollback(ctx, &err) @@ -128,21 +129,21 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, err error) if err != nil { err = dalerrs.TranslatePGError(err) if errors.Is(err, dalerrs.ErrNotFound) { - return nil, fmt.Errorf("no pending async calls: %w", dalerrs.ErrNotFound) + return nil, ctx, fmt.Errorf("no pending async calls: %w", dalerrs.ErrNotFound) } - return nil, fmt.Errorf("failed to acquire async call: %w", err) + return nil, ctx, fmt.Errorf("failed to acquire async call: %w", err) } origin, err := ParseAsyncOrigin(row.Origin) if err != nil { - return nil, fmt.Errorf("failed to parse origin key %q: %w", row.Origin, err) + return nil, ctx, fmt.Errorf("failed to parse origin key %q: %w", row.Origin, err) } decryptedRequest, err := d.decrypt(&row.Request) if err != nil { - return nil, fmt.Errorf("failed to decrypt async call request: %w", err) + return nil, ctx, fmt.Errorf("failed to decrypt async call request: %w", err) } - lease, _ := d.newLease(ctx, row.LeaseKey, row.LeaseIdempotencyKey, ttl) + lease, leaseCtx := d.newLease(ctx, row.LeaseKey, row.LeaseIdempotencyKey, ttl) return &AsyncCall{ ID: row.AsyncCallID, Verb: row.Verb, @@ -159,10 +160,11 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, err error) Backoff: time.Duration(row.Backoff), MaxBackoff: time.Duration(row.MaxBackoff), Catching: row.Catching, - }, nil + }, leaseCtx, nil } // CompleteAsyncCall completes an async call. +// The call will use the existing transaction if d is a transaction. Otherwise it will create and commit a new transaction. // // "result" is either a []byte representing the successful response, or a string // representing a failure message. @@ -170,18 +172,26 @@ func (d *DAL) CompleteAsyncCall(ctx context.Context, call *AsyncCall, result either.Either[[]byte, string], finalise func(tx *DAL, isFinalResult bool) error) (didScheduleAnotherCall bool, err error) { - tx, err := d.Begin(ctx) - if err != nil { - return false, dalerrs.TranslatePGError(err) //nolint:wrapcheck + var tx *DAL + switch d.Connection.(type) { + case *dbsql.DB: + tx, err = d.Begin(ctx) + if err != nil { + return false, dalerrs.TranslatePGError(err) //nolint:wrapcheck + } + defer tx.CommitOrRollback(ctx, &err) + case *dbsql.Tx: + tx = d + default: + return false, errors.New("invalid connection type") } - defer tx.CommitOrRollback(ctx, &err) isFinalResult := true didScheduleAnotherCall = false switch result := result.(type) { case either.Left[[]byte, string]: // Successful response. var encryptedResult encryption.EncryptedAsyncColumn - err := d.encrypt(result.Get(), &encryptedResult) + err := tx.encrypt(result.Get(), &encryptedResult) if err != nil { return false, fmt.Errorf("failed to encrypt async call result: %w", err) } @@ -192,7 +202,7 @@ func (d *DAL) CompleteAsyncCall(ctx context.Context, case either.Right[[]byte, string]: // Failure message. if call.RemainingAttempts > 0 { - _, err = d.db.FailAsyncCallWithRetry(ctx, sql.FailAsyncCallWithRetryParams{ + _, err = tx.db.FailAsyncCallWithRetry(ctx, sql.FailAsyncCallWithRetryParams{ ID: call.ID, Error: result.Get(), RemainingAttempts: call.RemainingAttempts - 1, @@ -213,7 +223,7 @@ func (d *DAL) CompleteAsyncCall(ctx context.Context, if call.Catching { scheduledAt = scheduledAt.Add(call.Backoff) } - _, err = d.db.FailAsyncCallWithRetry(ctx, sql.FailAsyncCallWithRetryParams{ + _, err = tx.db.FailAsyncCallWithRetry(ctx, sql.FailAsyncCallWithRetryParams{ ID: call.ID, Error: result.Get(), RemainingAttempts: 0, @@ -261,3 +271,37 @@ func (d *DAL) LoadAsyncCall(ctx context.Context, id int64) (*AsyncCall, error) { Request: request, }, nil } + +func (d *DAL) GetZombieAsyncCalls(ctx context.Context, limit int) ([]*AsyncCall, error) { + rows, err := d.db.GetZombieAsyncCalls(ctx, int32(limit)) + if err != nil { + return nil, dalerrs.TranslatePGError(err) + } + var calls []*AsyncCall + for _, row := range rows { + origin, err := ParseAsyncOrigin(row.Origin) + if err != nil { + return nil, fmt.Errorf("failed to parse origin key %q: %w", row.Origin, err) + } + decryptedRequest, err := d.decrypt(&row.Request) + if err != nil { + return nil, fmt.Errorf("failed to decrypt async call request: %w", err) + } + calls = append(calls, &AsyncCall{ + ID: row.ID, + Origin: origin, + ScheduledAt: row.ScheduledAt, + Verb: row.Verb, + CatchVerb: row.CatchVerb, + Request: decryptedRequest, + ParentRequestKey: row.ParentRequestKey, + TraceContext: row.TraceContext.RawMessage, + Error: row.Error, + RemainingAttempts: row.RemainingAttempts, + Backoff: time.Duration(row.Backoff), + MaxBackoff: time.Duration(row.MaxBackoff), + Catching: row.Catching, + }) + } + return calls, nil +} diff --git a/backend/controller/dal/async_calls_test.go b/backend/controller/dal/async_calls_test.go index b154e8da0b..0467d26433 100644 --- a/backend/controller/dal/async_calls_test.go +++ b/backend/controller/dal/async_calls_test.go @@ -20,7 +20,7 @@ func TestNoCallToAcquire(t *testing.T) { dal, err := New(ctx, conn, encryption.NewBuilder()) assert.NoError(t, err) - _, err = dal.AcquireAsyncCall(ctx) + _, _, err = dal.AcquireAsyncCall(ctx) assert.IsError(t, err, dalerrs.ErrNotFound) assert.EqualError(t, err, "no pending async calls: not found") } diff --git a/backend/controller/dal/fsm_test.go b/backend/controller/dal/fsm_test.go index cb12000118..a51bc58814 100644 --- a/backend/controller/dal/fsm_test.go +++ b/backend/controller/dal/fsm_test.go @@ -21,7 +21,7 @@ func TestSendFSMEvent(t *testing.T) { dal, err := New(ctx, conn, encryption.NewBuilder()) assert.NoError(t, err) - _, err = dal.AcquireAsyncCall(ctx) + _, _, err = dal.AcquireAsyncCall(ctx) assert.IsError(t, err, dalerrs.ErrNotFound) ref := schema.RefKey{Module: "module", Name: "verb"} @@ -32,7 +32,7 @@ func TestSendFSMEvent(t *testing.T) { assert.IsError(t, err, dalerrs.ErrConflict) assert.EqualError(t, err, "transition already executing: conflict") - call, err := dal.AcquireAsyncCall(ctx) + call, _, err := dal.AcquireAsyncCall(ctx) assert.NoError(t, err) t.Cleanup(func() { err := call.Lease.Release() diff --git a/backend/controller/pubsub/integration_test.go b/backend/controller/pubsub/integration_test.go index 9027d07345..508bb588b9 100644 --- a/backend/controller/pubsub/integration_test.go +++ b/backend/controller/pubsub/integration_test.go @@ -4,6 +4,7 @@ package pubsub import ( "fmt" + "path/filepath" "testing" "time" @@ -169,3 +170,43 @@ func TestExternalPublishRuntimeCheck(t *testing.T) { ), ) } + +func TestLeaseFailure(t *testing.T) { + logFilePath := filepath.Join(t.TempDir(), "pubsub.log") + t.Setenv("FSM_LOG_FILE", logFilePath) + + in.Run(t, + in.CopyModule("slow"), + in.Deploy("slow"), + + // publish 2 events, with the first taking a long time to consume + in.Call("slow", "publish", in.Obj{ + "durations": []int{20, 1}, + }, func(t testing.TB, resp in.Obj) {}), + + // while it is consuming the first event, force delete the lease in the db + in.QueryRow("ftl", ` + WITH deleted_rows AS ( + DELETE FROM leases WHERE id = ( + SELECT lease_id FROM async_calls WHERE verb = 'slow.consume' + ) + RETURNING * + ) + SELECT COUNT(*) FROM deleted_rows; + `, 1), + + in.Sleep(time.Second*7), + + // confirm that the first event failed and the second event succeeded, + in.QueryRow("ftl", `SELECT state, error FROM async_calls WHERE verb = 'slow.consume' ORDER BY created_at`, "error", "async call lease expired"), + in.QueryRow("ftl", `SELECT state, error FROM async_calls WHERE verb = 'slow.consume' ORDER BY created_at OFFSET 1`, "success", nil), + + // confirm that the first call did not keep executing for too long after the lease was expired + in.IfLanguage("go", + in.ExpectError( + in.FileContains(logFilePath, "slept for 5s"), + "Haystack does not contain needle", + ), + ), + ) +} diff --git a/backend/controller/pubsub/testdata/go/slow/ftl.toml b/backend/controller/pubsub/testdata/go/slow/ftl.toml new file mode 100644 index 0000000000..3eaafd974c --- /dev/null +++ b/backend/controller/pubsub/testdata/go/slow/ftl.toml @@ -0,0 +1,2 @@ +module = "slow" +language = "go" diff --git a/backend/controller/pubsub/testdata/go/slow/go.mod b/backend/controller/pubsub/testdata/go/slow/go.mod new file mode 100644 index 0000000000..73dd794dff --- /dev/null +++ b/backend/controller/pubsub/testdata/go/slow/go.mod @@ -0,0 +1,48 @@ +module ftl/slow + +go 1.23.0 + +require github.com/TBD54566975/ftl v1.1.5 + +require ( + connectrpc.com/connect v1.16.2 // indirect + connectrpc.com/grpcreflect v1.2.0 // indirect + connectrpc.com/otelconnect v0.7.1 // indirect + github.com/alecthomas/atomic v0.1.0-alpha2 // indirect + github.com/alecthomas/concurrency v0.0.2 // indirect + github.com/alecthomas/participle/v2 v2.1.1 // indirect + github.com/alecthomas/types v0.16.0 // indirect + github.com/alessio/shellescape v1.4.2 // indirect + github.com/benbjohnson/clock v1.3.5 // indirect + github.com/danieljoos/wincred v1.2.0 // indirect + github.com/deckarep/golang-set/v2 v2.6.0 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/godbus/dbus/v5 v5.1.0 // indirect + github.com/hashicorp/cronexpr v1.1.2 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/pgx/v5 v5.6.0 // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/jpillora/backoff v1.0.0 // indirect + github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/multiformats/go-base36 v0.2.0 // indirect + github.com/puzpuzpuz/xsync/v3 v3.4.0 // indirect + github.com/swaggest/jsonschema-go v0.3.72 // indirect + github.com/swaggest/refl v1.3.0 // indirect + github.com/zalando/go-keyring v0.2.5 // indirect + go.opentelemetry.io/otel v1.29.0 // indirect + go.opentelemetry.io/otel/metric v1.29.0 // indirect + go.opentelemetry.io/otel/trace v1.29.0 // indirect + golang.org/x/crypto v0.26.0 // indirect + golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa // indirect + golang.org/x/mod v0.20.0 // indirect + golang.org/x/net v0.28.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.24.0 // indirect + golang.org/x/text v0.17.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect +) + +replace github.com/TBD54566975/ftl => ../../../../../.. diff --git a/backend/controller/pubsub/testdata/go/slow/go.sum b/backend/controller/pubsub/testdata/go/slow/go.sum new file mode 100644 index 0000000000..e0128d507c --- /dev/null +++ b/backend/controller/pubsub/testdata/go/slow/go.sum @@ -0,0 +1,152 @@ +connectrpc.com/connect v1.16.2 h1:ybd6y+ls7GOlb7Bh5C8+ghA6SvCBajHwxssO2CGFjqE= +connectrpc.com/connect v1.16.2/go.mod h1:n2kgwskMHXC+lVqb18wngEpF95ldBHXjZYJussz5FRc= +connectrpc.com/grpcreflect v1.2.0 h1:Q6og1S7HinmtbEuBvARLNwYmTbhEGRpHDhqrPNlmK+U= +connectrpc.com/grpcreflect v1.2.0/go.mod h1:nwSOKmE8nU5u/CidgHtPYk1PFI3U9ignz7iDMxOYkSY= +connectrpc.com/otelconnect v0.7.1 h1:scO5pOb0i4yUE66CnNrHeK1x51yq0bE0ehPg6WvzXJY= +connectrpc.com/otelconnect v0.7.1/go.mod h1:dh3bFgHBTb2bkqGCeVVOtHJreSns7uu9wwL2Tbz17ms= +github.com/TBD54566975/scaffolder v1.1.0 h1:R92zjC4XiS/lGCxJ8Ebn93g8gC0LU9qo06AAKo9cEJE= +github.com/TBD54566975/scaffolder v1.1.0/go.mod h1:dRi67GryEhZ5u0XRSiR294SYaqAfnCkZ7u3rmc4W6iI= +github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZP/GkPY= +github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k= +github.com/alecthomas/atomic v0.1.0-alpha2 h1:dqwXmax66gXvHhsOS4pGPZKqYOlTkapELkLb3MNdlH8= +github.com/alecthomas/atomic v0.1.0-alpha2/go.mod h1:zD6QGEyw49HIq19caJDc2NMXAy8rNi9ROrxtMXATfyI= +github.com/alecthomas/concurrency v0.0.2 h1:Q3kGPtLbleMbH9lHX5OBFvJygfyFw29bXZKBg+IEVuo= +github.com/alecthomas/concurrency v0.0.2/go.mod h1:GmuQb/iHX7mbNtPlC/WDzEFxDMB0HYFer2Qda9QTs7w= +github.com/alecthomas/participle/v2 v2.1.1 h1:hrjKESvSqGHzRb4yW1ciisFJ4p3MGYih6icjJvbsmV8= +github.com/alecthomas/participle/v2 v2.1.1/go.mod h1:Y1+hAs8DHPmc3YUFzqllV+eSQ9ljPTk0ZkPMtEdAx2c= +github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc= +github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= +github.com/alecthomas/types v0.16.0 h1:o9+JSwCRB6DDaWDeR/Mg7v/zh3R+MlknM6DrnDyY7U0= +github.com/alecthomas/types v0.16.0/go.mod h1:Tswm0qQpjpVq8rn70OquRsUtFxbQKub/8TMyYYGI0+k= +github.com/alessio/shellescape v1.4.2 h1:MHPfaU+ddJ0/bYWpgIeUnQUqKrlJ1S7BfEYPM4uEoM0= +github.com/alessio/shellescape v1.4.2/go.mod h1:PZAiSCk0LJaZkiCSkPv8qIobYglO3FPpyFjDCtHLS30= +github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= +github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/bool64/dev v0.2.35 h1:M17TLsO/pV2J7PYI/gpe3Ua26ETkzZGb+dC06eoMqlk= +github.com/bool64/dev v0.2.35/go.mod h1:iJbh1y/HkunEPhgebWRNcs8wfGq7sjvJ6W5iabL8ACg= +github.com/bool64/shared v0.1.5 h1:fp3eUhBsrSjNCQPcSdQqZxxh9bBwrYiZ+zOKFkM0/2E= +github.com/bool64/shared v0.1.5/go.mod h1:081yz68YC9jeFB3+Bbmno2RFWvGKv1lPKkMP6MHJlPs= +github.com/danieljoos/wincred v1.2.0 h1:ozqKHaLK0W/ii4KVbbvluM91W2H3Sh0BncbUNPS7jLE= +github.com/danieljoos/wincred v1.2.0/go.mod h1:FzQLLMKBFdvu+osBrnFODiv32YGwCfx0SkRa/eYHgec= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM= +github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= +github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/cronexpr v1.1.2 h1:wG/ZYIKT+RT3QkOdgYc+xsKWVRgnxJ1OJtjjy84fJ9A= +github.com/hashicorp/cronexpr v1.1.2/go.mod h1:P4wA0KBl9C5q2hABiMO7cp6jcIg96CDh1Efb3g1PWA4= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= +github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= +github.com/iancoleman/orderedmap v0.3.0 h1:5cbR2grmZR/DiVt+VJopEhtVs9YGInGIxAoMJn+Ichc= +github.com/iancoleman/orderedmap v0.3.0/go.mod h1:XuLcCUkdL5owUCQeF2Ue9uuw1EptkJDkXXS7VoV7XGE= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= +github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= +github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= +github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/multiformats/go-base36 v0.2.0 h1:lFsAbNOGeKtuKozrtBsAkSVhv1p9D0/qedU9rQyccr0= +github.com/multiformats/go-base36 v0.2.0/go.mod h1:qvnKE++v+2MWCfePClUEjE78Z7P2a1UV0xHgWc0hkp4= +github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= +github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/otiai10/copy v1.14.0 h1:dCI/t1iTdYGtkvCuBG2BgR6KZa83PTclw4U5n2wAllU= +github.com/otiai10/copy v1.14.0/go.mod h1:ECfuL02W+/FkTWZWgQqXPWZgW9oeKCSQ5qVfSc4qc4w= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/puzpuzpuz/xsync/v3 v3.4.0 h1:DuVBAdXuGFHv8adVXjWWZ63pJq+NRXOWVXlKDBZ+mJ4= +github.com/puzpuzpuz/xsync/v3 v3.4.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4= +github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPOhJotwFIyu2bBVN41fcDUY= +github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= +github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/swaggest/assertjson v1.9.0 h1:dKu0BfJkIxv/xe//mkCrK5yZbs79jL7OVf9Ija7o2xQ= +github.com/swaggest/assertjson v1.9.0/go.mod h1:b+ZKX2VRiUjxfUIal0HDN85W0nHPAYUbYH5WkkSsFsU= +github.com/swaggest/jsonschema-go v0.3.72 h1:IHaGlR1bdBUBPfhe4tfacN2TGAPKENEGiNyNzvnVHv4= +github.com/swaggest/jsonschema-go v0.3.72/go.mod h1:OrGyEoVqpfSFJ4Am4V/FQcQ3mlEC1vVeleA+5ggbVW4= +github.com/swaggest/refl v1.3.0 h1:PEUWIku+ZznYfsoyheF97ypSduvMApYyGkYF3nabS0I= +github.com/swaggest/refl v1.3.0/go.mod h1:3Ujvbmh1pfSbDYjC6JGG7nMgPvpG0ehQL4iNonnLNbg= +github.com/yudai/gojsondiff v1.0.0 h1:27cbfqXLVEJ1o8I6v3y9lg8Ydm53EKqHXAOMxEGlCOA= +github.com/yudai/gojsondiff v1.0.0/go.mod h1:AY32+k2cwILAkW1fbgxQ5mUmMiZFgLIV+FBNExI05xg= +github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 h1:BHyfKlQyqbsFN5p3IfnEUduWvb9is428/nNb5L3U01M= +github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82/go.mod h1:lgjkn3NuSvDfVJdfcVVdX+jpBxNmX4rDAzaS45IcYoM= +github.com/zalando/go-keyring v0.2.5 h1:Bc2HHpjALryKD62ppdEzaFG6VxL6Bc+5v0LYpN8Lba8= +github.com/zalando/go-keyring v0.2.5/go.mod h1:HL4k+OXQfJUWaMnqyuSOc0drfGPX2b51Du6K+MRgZMk= +go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= +go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8= +go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc= +go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8= +go.opentelemetry.io/otel/sdk v1.29.0 h1:vkqKjk7gwhS8VaWb0POZKmIEDimRCMsopNYnriHyryo= +go.opentelemetry.io/otel/sdk v1.29.0/go.mod h1:pM8Dx5WKnvxLCb+8lG1PRNIDxu9g9b9g59Qr7hfAAok= +go.opentelemetry.io/otel/sdk/metric v1.29.0 h1:K2CfmJohnRgvZ9UAj2/FhIf/okdWcNdBwe1m8xFXiSY= +go.opentelemetry.io/otel/sdk/metric v1.29.0/go.mod h1:6zZLdCl2fkauYoZIOn/soQIDSWFmNSRcICarHfuhNJQ= +go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4= +go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= +golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa h1:ELnwvuAXPNtPk1TJRuGkI9fDTwym6AYBu0qzT8AcHdI= +golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ= +golang.org/x/mod v0.20.0 h1:utOm6MM3R3dnawAiJgn0y+xvuYRsm1RKM/4giyfDgV0= +golang.org/x/mod v0.20.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= +gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= +modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 h1:5D53IMaUuA5InSeMu9eJtlQXS2NxAhyWQvkKEgXZhHI= +modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6/go.mod h1:Qz0X07sNOR1jWYCrJMEnbW/X55x206Q7Vt4mz6/wHp4= +modernc.org/libc v1.55.3 h1:AzcW1mhlPNrRtjS5sS+eW2ISCgSOLLNyFzRh/V3Qj/U= +modernc.org/libc v1.55.3/go.mod h1:qFXepLhz+JjFThQ4kzwzOjA/y/artDeg+pcYnY+Q83w= +modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= +modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= +modernc.org/memory v1.8.0 h1:IqGTL6eFMaDZZhEWwcREgeMXYwmW83LYW8cROZYkg+E= +modernc.org/memory v1.8.0/go.mod h1:XPZ936zp5OMKGWPqbD3JShgd/ZoQ7899TUuQqxY+peU= +modernc.org/sqlite v1.32.0 h1:6BM4uGza7bWypsw4fdLRsLxut6bHe4c58VeqjRgST8s= +modernc.org/sqlite v1.32.0/go.mod h1:UqoylwmTb9F+IqXERT8bW9zzOWN8qwAIcLdzeBZs4hA= +modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA= +modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= diff --git a/backend/controller/pubsub/testdata/go/slow/slow.go b/backend/controller/pubsub/testdata/go/slow/slow.go new file mode 100644 index 0000000000..fb1218d616 --- /dev/null +++ b/backend/controller/pubsub/testdata/go/slow/slow.go @@ -0,0 +1,62 @@ +package slow + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/TBD54566975/ftl/go-runtime/ftl" // Import the FTL SDK. +) + +var Topic = ftl.Topic[Event]("topic") +var _ = ftl.Subscription(Topic, "slowSubscription") + +type Event struct { + Duration int +} + +type PublishRequest struct { + Durations []int +} + +//ftl:verb +func Publish(ctx context.Context, req PublishRequest) error { + for _, duration := range req.Durations { + err := Topic.Publish(ctx, Event{Duration: duration}) + if err != nil { + return err + } + } + return nil +} + +//ftl:verb +//ftl:subscribe slowSubscription +func Consume(ctx context.Context, event Event) error { + for i := range event.Duration { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(time.Second): + appendLog("slept for %ds", i+1) + } + } + return nil +} + +func appendLog(msg string, args ...interface{}) { + dest, ok := os.LookupEnv("FSM_LOG_FILE") + if !ok { + panic("FSM_LOG_FILE not set") + } + w, err := os.OpenFile(dest, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + panic(err) + } + fmt.Fprintf(w, msg+"\n", args...) + err = w.Close() + if err != nil { + panic(err) + } +} diff --git a/backend/controller/sql/querier.go b/backend/controller/sql/querier.go index 33293fc322..738ae45861 100644 --- a/backend/controller/sql/querier.go +++ b/backend/controller/sql/querier.go @@ -90,6 +90,7 @@ type Querier interface { GetTopic(ctx context.Context, dollar_1 int64) (Topic, error) GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent, error) GetUnscheduledCronJobs(ctx context.Context, startTime time.Time) ([]GetUnscheduledCronJobsRow, error) + GetZombieAsyncCalls(ctx context.Context, limit int32) ([]AsyncCall, error) InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error InsertTimelineCallEvent(ctx context.Context, arg InsertTimelineCallEventParams) error InsertTimelineDeploymentCreatedEvent(ctx context.Context, arg InsertTimelineDeploymentCreatedEventParams) error diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql index b930a8a250..e7879e3328 100644 --- a/backend/controller/sql/queries.sql +++ b/backend/controller/sql/queries.sql @@ -566,6 +566,14 @@ SELECT * FROM async_calls WHERE id = @id; +-- name: GetZombieAsyncCalls :many +SELECT * +FROM async_calls +WHERE state = 'executing' + AND lease_id IS NULL +ORDER BY created_at ASC +LIMIT sqlc.arg('limit')::INT; + -- name: GetFSMInstance :one SELECT * FROM fsm_instances diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index dab7ab0350..1ba65230c0 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -1844,6 +1844,56 @@ func (q *Queries) GetUnscheduledCronJobs(ctx context.Context, startTime time.Tim return items, nil } +const getZombieAsyncCalls = `-- name: GetZombieAsyncCalls :many +SELECT id, created_at, lease_id, verb, state, origin, scheduled_at, request, response, error, remaining_attempts, backoff, max_backoff, catch_verb, catching, parent_request_key, trace_context +FROM async_calls +WHERE state = 'executing' + AND lease_id IS NULL +ORDER BY created_at ASC +LIMIT $1::INT +` + +func (q *Queries) GetZombieAsyncCalls(ctx context.Context, limit int32) ([]AsyncCall, error) { + rows, err := q.db.QueryContext(ctx, getZombieAsyncCalls, limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []AsyncCall + for rows.Next() { + var i AsyncCall + if err := rows.Scan( + &i.ID, + &i.CreatedAt, + &i.LeaseID, + &i.Verb, + &i.State, + &i.Origin, + &i.ScheduledAt, + &i.Request, + &i.Response, + &i.Error, + &i.RemainingAttempts, + &i.Backoff, + &i.MaxBackoff, + &i.CatchVerb, + &i.Catching, + &i.ParentRequestKey, + &i.TraceContext, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const insertSubscriber = `-- name: InsertSubscriber :exec INSERT INTO topic_subscribers ( key,