From a38cf8cee59f3ad9ab775235f504a737218974a4 Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Tue, 24 Sep 2024 21:27:52 +1000 Subject: [PATCH] refactor: pull pubsub DAL/SQL out of controller (#2798) This introduces quite a lot of constructor boilerplate in a lot of places which I hope will become cleaner as more and more of the subsystems are decoupled. --------- Co-authored-by: github-actions[bot] --- backend/controller/async/async.go | 85 +++ backend/controller/console/console.go | 3 +- backend/controller/controller.go | 58 +- backend/controller/cronjobs/cronjobs.go | 4 +- backend/controller/cronjobs/cronjobs_test.go | 13 +- .../cronjobs/internal/sql/models.go | 3 +- backend/controller/dal/async_calls.go | 88 +-- backend/controller/dal/async_calls_test.go | 22 +- backend/controller/dal/dal.go | 241 +++----- backend/controller/dal/dal_test.go | 26 +- backend/controller/dal/fsm.go | 5 +- backend/controller/dal/fsm_test.go | 12 +- backend/controller/dal/internal/sql/models.go | 2 +- .../controller/dal/internal/sql/queries.sql | 228 -------- .../dal/internal/sql/queries.sql.go | 240 ++++---- backend/controller/dal/model/model.go | 143 +++++ backend/controller/dal/notify.go | 30 +- backend/controller/ingress/handler.go | 4 +- backend/controller/ingress/handler_test.go | 4 +- backend/controller/ingress/ingress.go | 6 +- backend/controller/ingress/request.go | 4 +- backend/controller/ingress/request_test.go | 4 +- backend/controller/pubsub/integration_test.go | 12 +- .../pubsub.go => pubsub/internal/dal/dal.go} | 44 +- .../pubsub/internal/sql/async_queries.sql.go | 89 +++ backend/controller/pubsub/internal/sql/db.go | 31 + .../controller/pubsub/internal/sql/models.go | 91 +++ .../controller/pubsub/internal/sql/querier.go | 39 ++ .../pubsub/internal/sql/queries.sql | 228 ++++++++ .../pubsub/internal/sql/queries.sql.go | 546 ++++++++++++++++++ backend/controller/pubsub/manager.go | 82 --- backend/controller/pubsub/service.go | 130 +++++ .../controller/scheduledtask/scheduledtask.go | 6 +- .../scheduledtask/scheduledtask_test.go | 12 +- backend/controller/sql/sqltypes/sqltypes.go | 3 + backend/controller/timeline/timeline_test.go | 16 +- backend/libdal/libdal.go | 7 + sqlc.yaml | 12 +- 38 files changed, 1768 insertions(+), 805 deletions(-) create mode 100644 backend/controller/async/async.go create mode 100644 backend/controller/dal/model/model.go rename backend/controller/{dal/pubsub.go => pubsub/internal/dal/dal.go} (89%) create mode 100644 backend/controller/pubsub/internal/sql/async_queries.sql.go create mode 100644 backend/controller/pubsub/internal/sql/db.go create mode 100644 backend/controller/pubsub/internal/sql/models.go create mode 100644 backend/controller/pubsub/internal/sql/querier.go create mode 100644 backend/controller/pubsub/internal/sql/queries.sql create mode 100644 backend/controller/pubsub/internal/sql/queries.sql.go delete mode 100644 backend/controller/pubsub/manager.go create mode 100644 backend/controller/pubsub/service.go diff --git a/backend/controller/async/async.go b/backend/controller/async/async.go new file mode 100644 index 0000000000..f759097ad2 --- /dev/null +++ b/backend/controller/async/async.go @@ -0,0 +1,85 @@ +package async + +import ( + "fmt" + + "github.com/alecthomas/participle/v2" + "github.com/alecthomas/participle/v2/lexer" + + "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/model" +) + +type asyncOriginParseRoot struct { + Key AsyncOrigin `parser:"@@"` +} + +var asyncOriginLexer = lexer.MustSimple([]lexer.SimpleRule{ + {"NumberIdent", `[0-9][a-zA-Z0-9_-]*`}, + {"Ident", `[a-zA-Z_][a-zA-Z0-9_-]*`}, + {"Punct", `[:.]`}, +}) + +var asyncOriginParser = participle.MustBuild[asyncOriginParseRoot]( + participle.Union[AsyncOrigin](AsyncOriginCron{}, AsyncOriginFSM{}, AsyncOriginPubSub{}), + participle.Lexer(asyncOriginLexer), +) + +// AsyncOrigin is a sum type representing the originator of an async call. +// +// This is used to determine how to handle the result of the async call. +type AsyncOrigin interface { + asyncOrigin() + // Origin returns the origin type. + Origin() string + String() string +} + +// AsyncOriginCron represents the context for the originator of a cron async call. +// +// It is in the form cron:. +type AsyncOriginCron struct { + CronJobKey model.CronJobKey `parser:"'cron' ':' @(~EOF)+"` +} + +var _ AsyncOrigin = AsyncOriginCron{} + +func (AsyncOriginCron) asyncOrigin() {} +func (a AsyncOriginCron) Origin() string { return "cron" } +func (a AsyncOriginCron) String() string { return fmt.Sprintf("cron:%s", a.CronJobKey) } + +// AsyncOriginFSM represents the context for the originator of an FSM async call. +// +// It is in the form fsm:.: +type AsyncOriginFSM struct { + FSM schema.RefKey `parser:"'fsm' ':' @@"` + Key string `parser:"':' @(~EOF)+"` +} + +var _ AsyncOrigin = AsyncOriginFSM{} + +func (AsyncOriginFSM) asyncOrigin() {} +func (a AsyncOriginFSM) Origin() string { return "fsm" } +func (a AsyncOriginFSM) String() string { return fmt.Sprintf("fsm:%s:%s", a.FSM, a.Key) } + +// AsyncOriginPubSub represents the context for the originator of an PubSub async call. +// +// It is in the form fsm:. +type AsyncOriginPubSub struct { + Subscription schema.RefKey `parser:"'sub' ':' @@"` +} + +var _ AsyncOrigin = AsyncOriginPubSub{} + +func (AsyncOriginPubSub) asyncOrigin() {} +func (a AsyncOriginPubSub) Origin() string { return "sub" } +func (a AsyncOriginPubSub) String() string { return fmt.Sprintf("sub:%s", a.Subscription) } + +// ParseAsyncOrigin parses an async origin key. +func ParseAsyncOrigin(origin string) (AsyncOrigin, error) { + root, err := asyncOriginParser.ParseString("", origin) + if err != nil { + return nil, fmt.Errorf("failed to parse async origin: %w", err) + } + return root.Key, nil +} diff --git a/backend/controller/console/console.go b/backend/controller/console/console.go index 2341eea06f..bb459a6be1 100644 --- a/backend/controller/console/console.go +++ b/backend/controller/console/console.go @@ -13,6 +13,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "github.com/TBD54566975/ftl/backend/controller/dal" + dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model" "github.com/TBD54566975/ftl/backend/controller/timeline" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" pbconsole "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/console" @@ -85,7 +86,7 @@ func (c *ConsoleService) GetModules(ctx context.Context, req *connect.Request[pb } sch := &schema.Schema{ - Modules: slices.Map(deployments, func(d dal.Deployment) *schema.Module { + Modules: slices.Map(deployments, func(d dalmodel.Deployment) *schema.Module { return d.Schema }), } diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 7fc87393e0..ce6eba9ce1 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -35,9 +35,11 @@ import ( "github.com/TBD54566975/ftl" "github.com/TBD54566975/ftl/backend/controller/admin" + "github.com/TBD54566975/ftl/backend/controller/async" "github.com/TBD54566975/ftl/backend/controller/console" "github.com/TBD54566975/ftl/backend/controller/cronjobs" "github.com/TBD54566975/ftl/backend/controller/dal" + dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model" "github.com/TBD54566975/ftl/backend/controller/encryption" "github.com/TBD54566975/ftl/backend/controller/ingress" "github.com/TBD54566975/ftl/backend/controller/leases" @@ -207,7 +209,7 @@ type clients struct { // ControllerListListener is regularly notified of the current list of controllers // This is often used to update a hash ring to distribute work. type ControllerListListener interface { - UpdatedControllerList(ctx context.Context, controllers []dal.Controller) + UpdatedControllerList(ctx context.Context, controllers []dalmodel.Controller) } type Service struct { @@ -219,7 +221,7 @@ type Service struct { tasks *scheduledtask.Scheduler cronJobs *cronjobs.Service - pubSub *pubsub.Manager + pubSub *pubsub.Service timeline *timeline.Service controllerListListeners []ControllerListListener @@ -255,11 +257,11 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca return nil, fmt.Errorf("failed to create encryption dal: %w", err) } - db := dal.New(ctx, conn, encryption) ldb := dbleaser.NewDatabaseLeaser(conn) + scheduler := scheduledtask.New(ctx, key, ldb) + svc := &Service{ - tasks: scheduledtask.New(ctx, key, ldb), - dal: db, + tasks: scheduler, dbleaser: ldb, conn: conn, key: key, @@ -274,9 +276,11 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryption, conn) svc.cronJobs = cronSvc - pubSub := pubsub.New(ctx, db, svc.tasks, svc) + pubSub := pubsub.New(conn, encryption, svc.tasks, optional.Some[pubsub.AsyncCallListener](svc)) svc.pubSub = pubSub + svc.dal = dal.New(ctx, conn, encryption, pubSub) + timelineSvc := timeline.New(ctx, conn, encryption) svc.timeline = timelineSvc @@ -397,7 +401,7 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR } }) replicas := map[string]int32{} - protoRunners, err := slices.MapErr(status.Runners, func(r dal.Runner) (*ftlv1.StatusResponse_Runner, error) { + protoRunners, err := slices.MapErr(status.Runners, func(r dalmodel.Runner) (*ftlv1.StatusResponse_Runner, error) { asString := r.Deployment.String() deployment := &asString replicas[asString]++ @@ -415,7 +419,7 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR if err != nil { return nil, err } - deployments, err := slices.MapErr(status.Deployments, func(d dal.Deployment) (*ftlv1.StatusResponse_Deployment, error) { + deployments, err := slices.MapErr(status.Deployments, func(d dalmodel.Deployment) (*ftlv1.StatusResponse_Deployment, error) { labels, err := structpb.NewStruct(d.Labels) if err != nil { return nil, fmt.Errorf("could not marshal attributes for deployment %s: %w", d.Key.String(), err) @@ -434,7 +438,7 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR return nil, err } resp := &ftlv1.StatusResponse{ - Controllers: slices.Map(status.Controllers, func(c dal.Controller) *ftlv1.StatusResponse_Controller { + Controllers: slices.Map(status.Controllers, func(c dalmodel.Controller) *ftlv1.StatusResponse_Controller { return &ftlv1.StatusResponse_Controller{ Key: c.Key.String(), Endpoint: c.Endpoint, @@ -443,7 +447,7 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR }), Runners: protoRunners, Deployments: deployments, - IngressRoutes: slices.Map(status.IngressRoutes, func(r dal.IngressRouteEntry) *ftlv1.StatusResponse_IngressRoute { + IngressRoutes: slices.Map(status.IngressRoutes, func(r dalmodel.IngressRouteEntry) *ftlv1.StatusResponse_IngressRoute { return &ftlv1.StatusResponse_IngressRoute{ DeploymentKey: r.Deployment.String(), Verb: &schemapb.Ref{Module: r.Module, Name: r.Verb}, @@ -596,7 +600,7 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre if err != nil { return nil, connect.NewError(connect.CodeInvalidArgument, err) } - err = s.dal.UpsertRunner(ctx, dal.Runner{ + err = s.dal.UpsertRunner(ctx, dalmodel.Runner{ Key: runnerKey, Endpoint: msg.Endpoint, Deployment: deploymentKey, @@ -959,7 +963,7 @@ func (s *Service) SetNextFSMEvent(ctx context.Context, req *connect.Request[ftlv func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[ftlv1.PublishEventRequest]) (*connect.Response[ftlv1.PublishEventResponse], error) { // Publish the event. - err := s.dal.PublishEventForTopic(ctx, req.Msg.Topic.Module, req.Msg.Topic.Name, req.Msg.Caller, req.Msg.Body) + err := s.pubSub.PublishEventForTopic(ctx, req.Msg.Topic.Module, req.Msg.Topic.Name, req.Msg.Caller, req.Msg.Body) if err != nil { return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to publish a event to topic %s:%s: %w", req.Msg.Topic.Module, req.Msg.Topic.Name, err)) } @@ -1117,14 +1121,14 @@ func (s *Service) UploadArtefact(ctx context.Context, req *connect.Request[ftlv1 func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftlv1.CreateDeploymentRequest]) (*connect.Response[ftlv1.CreateDeploymentResponse], error) { logger := log.FromContext(ctx) - artefacts := make([]dal.DeploymentArtefact, len(req.Msg.Artefacts)) + artefacts := make([]dalmodel.DeploymentArtefact, len(req.Msg.Artefacts)) for i, artefact := range req.Msg.Artefacts { digest, err := sha256.ParseSHA256(artefact.Digest) if err != nil { logger.Errorf(err, "Invalid digest %s", artefact.Digest) return nil, fmt.Errorf("invalid digest: %w", err) } - artefacts[i] = dal.DeploymentArtefact{ + artefacts[i] = dalmodel.DeploymentArtefact{ Executable: artefact.Executable, Path: artefact.Path, Digest: digest, @@ -1167,7 +1171,7 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl } func (s *Service) ResetSubscription(ctx context.Context, req *connect.Request[ftlv1.ResetSubscriptionRequest]) (*connect.Response[ftlv1.ResetSubscriptionResponse], error) { - err := s.dal.ResetSubscription(ctx, req.Msg.Subscription.Module, req.Msg.Subscription.Name) + err := s.pubSub.ResetSubscription(ctx, req.Msg.Subscription.Module, req.Msg.Subscription.Name) if err != nil { return nil, fmt.Errorf("could not reset subscription: %w", err) } @@ -1289,13 +1293,13 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration if returnErr == nil { // Post-commit notification based on origin switch origin := call.Origin.(type) { - case dal.AsyncOriginCron: + case async.AsyncOriginCron: break - case dal.AsyncOriginFSM: + case async.AsyncOriginFSM: break - case dal.AsyncOriginPubSub: + case async.AsyncOriginPubSub: go s.pubSub.AsyncCallDidCommit(originalCtx, origin) default: @@ -1460,10 +1464,10 @@ func (s *Service) reapAsyncCalls(ctx context.Context) (nextInterval time.Duratio func metadataForAsyncCall(call *dal.AsyncCall) *ftlv1.Metadata { switch origin := call.Origin.(type) { - case dal.AsyncOriginCron: + case async.AsyncOriginCron: return &ftlv1.Metadata{} - case dal.AsyncOriginFSM: + case async.AsyncOriginFSM: return &ftlv1.Metadata{ Values: []*ftlv1.Metadata_Pair{ { @@ -1477,7 +1481,7 @@ func metadataForAsyncCall(call *dal.AsyncCall) *ftlv1.Metadata { }, } - case dal.AsyncOriginPubSub: + case async.AsyncOriginPubSub: return &ftlv1.Metadata{} default: @@ -1490,18 +1494,18 @@ func (s *Service) finaliseAsyncCall(ctx context.Context, tx *dal.DAL, call *dal. // Allow for handling of completion based on origin switch origin := call.Origin.(type) { - case dal.AsyncOriginCron: + case async.AsyncOriginCron: if err := s.cronJobs.OnJobCompletion(ctx, origin.CronJobKey, failed); err != nil { return fmt.Errorf("failed to finalize cron async call: %w", err) } - case dal.AsyncOriginFSM: + case async.AsyncOriginFSM: if err := s.onAsyncFSMCallCompletion(ctx, tx, origin, failed, isFinalResult); err != nil { return fmt.Errorf("failed to finalize FSM async call: %w", err) } - case dal.AsyncOriginPubSub: - if err := s.pubSub.OnCallCompletion(ctx, tx, origin, failed, isFinalResult); err != nil { + case async.AsyncOriginPubSub: + if err := s.pubSub.OnCallCompletion(ctx, tx.Connection, origin, failed, isFinalResult); err != nil { return fmt.Errorf("failed to finalize pubsub async call: %w", err) } @@ -1511,7 +1515,7 @@ func (s *Service) finaliseAsyncCall(ctx context.Context, tx *dal.DAL, call *dal. return nil } -func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.DAL, origin dal.AsyncOriginFSM, failed bool, isFinalResult bool) error { +func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.DAL, origin async.AsyncOriginFSM, failed bool, isFinalResult bool) error { logger := log.FromContext(ctx).Scope(origin.FSM.String()) // retrieve the next fsm event and delete it @@ -1757,7 +1761,7 @@ func (s *Service) getDeploymentLogger(ctx context.Context, deploymentKey model.D func (s *Service) syncRoutes(ctx context.Context) (ret time.Duration, err error) { deployments, err := s.dal.GetActiveDeployments(ctx) if errors.Is(err, libdal.ErrNotFound) { - deployments = []dal.Deployment{} + deployments = []dalmodel.Deployment{} } else if err != nil { return 0, err } diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index 1310ff7f52..46d5e331c8 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -8,8 +8,8 @@ import ( "github.com/benbjohnson/clock" + "github.com/TBD54566975/ftl/backend/controller/async" "github.com/TBD54566975/ftl/backend/controller/cronjobs/internal/dal" - parentdal "github.com/TBD54566975/ftl/backend/controller/dal" encryptionsvc "github.com/TBD54566975/ftl/backend/controller/encryption" "github.com/TBD54566975/ftl/backend/controller/encryption/api" schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" @@ -177,7 +177,7 @@ func (s *Service) scheduleCronJob(ctx context.Context, tx *dal.DAL, job model.Cr } logger.Tracef("Scheduling cron job %q async_call execution at %s", job.Key, nextAttemptForJob) - origin := &parentdal.AsyncOriginCron{CronJobKey: job.Key} + origin := &async.AsyncOriginCron{CronJobKey: job.Key} var request api.EncryptedColumn[api.AsyncSubKey] err = s.encryption.Encrypt([]byte(`{}`), &request) if err != nil { diff --git a/backend/controller/cronjobs/cronjobs_test.go b/backend/controller/cronjobs/cronjobs_test.go index 3a76acfcba..e1bdaad12d 100644 --- a/backend/controller/cronjobs/cronjobs_test.go +++ b/backend/controller/cronjobs/cronjobs_test.go @@ -12,9 +12,14 @@ import ( "github.com/alecthomas/types/optional" "github.com/benbjohnson/clock" + "github.com/TBD54566975/ftl/backend/controller/async" "github.com/TBD54566975/ftl/backend/controller/cronjobs/internal/dal" parentdal "github.com/TBD54566975/ftl/backend/controller/dal" + dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model" "github.com/TBD54566975/ftl/backend/controller/encryption" + "github.com/TBD54566975/ftl/backend/controller/leases" + "github.com/TBD54566975/ftl/backend/controller/pubsub" + "github.com/TBD54566975/ftl/backend/controller/scheduledtask" "github.com/TBD54566975/ftl/backend/controller/sql/sqltest" "github.com/TBD54566975/ftl/backend/libdal" "github.com/TBD54566975/ftl/backend/schema" @@ -39,13 +44,15 @@ func TestNewCronJobsForModule(t *testing.T) { encryption, err := encryption.New(ctx, conn, encryption.NewBuilder().WithKMSURI(optional.Some(uri))) assert.NoError(t, err) - parentDAL := parentdal.New(ctx, conn, encryption) + scheduler := scheduledtask.New(ctx, key, leases.NewFakeLeaser()) + pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]()) + parentDAL := parentdal.New(ctx, conn, encryption, pubSub) moduleName := "initial" jobsToCreate := newCronJobs(t, moduleName, "* * * * * *", clk, 2) // every minute deploymentKey, err := parentDAL.CreateDeployment(ctx, "go", &schema.Module{ Name: moduleName, - }, []parentdal.DeploymentArtefact{}, []parentdal.IngressRoutingEntry{}, jobsToCreate) + }, []dalmodel.DeploymentArtefact{}, []parentdal.IngressRoutingEntry{}, jobsToCreate) assert.NoError(t, err) err = parentDAL.ReplaceDeployment(ctx, deploymentKey, 1) assert.NoError(t, err) @@ -107,7 +114,7 @@ func TestNewCronJobsForModule(t *testing.T) { expectUnscheduledJobs(t, dal, clk, 2) // Use the completion handler to schedule the next execution for _, call := range calls { - origin, ok := call.Origin.(parentdal.AsyncOriginCron) + origin, ok := call.Origin.(async.AsyncOriginCron) assert.True(t, ok) err = cjs.OnJobCompletion(ctx, origin.CronJobKey, false) assert.NoError(t, err) diff --git a/backend/controller/cronjobs/internal/sql/models.go b/backend/controller/cronjobs/internal/sql/models.go index ad37c8ab73..7fc9ce050e 100644 --- a/backend/controller/cronjobs/internal/sql/models.go +++ b/backend/controller/cronjobs/internal/sql/models.go @@ -8,6 +8,7 @@ import ( "encoding/json" "time" + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/model" "github.com/alecthomas/types/optional" @@ -22,7 +23,7 @@ type CronJob struct { StartTime time.Time NextExecution time.Time ModuleName string - LastExecution optional.Option[time.Time] + LastExecution sqltypes.OptionalTime LastAsyncCallID optional.Option[int64] } diff --git a/backend/controller/dal/async_calls.go b/backend/controller/dal/async_calls.go index 6f0d49ff27..5fdb9cd794 100644 --- a/backend/controller/dal/async_calls.go +++ b/backend/controller/dal/async_calls.go @@ -7,98 +7,22 @@ import ( "fmt" "time" - "github.com/alecthomas/participle/v2" - "github.com/alecthomas/participle/v2/lexer" "github.com/alecthomas/types/either" "github.com/alecthomas/types/optional" + "github.com/TBD54566975/ftl/backend/controller/async" "github.com/TBD54566975/ftl/backend/controller/dal/internal/sql" "github.com/TBD54566975/ftl/backend/controller/encryption/api" leasedal "github.com/TBD54566975/ftl/backend/controller/leases/dbleaser" "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" "github.com/TBD54566975/ftl/backend/libdal" "github.com/TBD54566975/ftl/backend/schema" - "github.com/TBD54566975/ftl/internal/model" ) -type asyncOriginParseRoot struct { - Key AsyncOrigin `parser:"@@"` -} - -var asyncOriginLexer = lexer.MustSimple([]lexer.SimpleRule{ - {"NumberIdent", `[0-9][a-zA-Z0-9_-]*`}, - {"Ident", `[a-zA-Z_][a-zA-Z0-9_-]*`}, - {"Punct", `[:.]`}, -}) - -var asyncOriginParser = participle.MustBuild[asyncOriginParseRoot]( - participle.Union[AsyncOrigin](AsyncOriginCron{}, AsyncOriginFSM{}, AsyncOriginPubSub{}), - participle.Lexer(asyncOriginLexer), -) - -// AsyncOrigin is a sum type representing the originator of an async call. -// -// This is used to determine how to handle the result of the async call. -type AsyncOrigin interface { - asyncOrigin() - // Origin returns the origin type. - Origin() string - String() string -} - -// AsyncOriginCron represents the context for the originator of a cron async call. -// -// It is in the form cron:. -type AsyncOriginCron struct { - CronJobKey model.CronJobKey `parser:"'cron' ':' @(~EOF)+"` -} - -var _ AsyncOrigin = AsyncOriginCron{} - -func (AsyncOriginCron) asyncOrigin() {} -func (a AsyncOriginCron) Origin() string { return "cron" } -func (a AsyncOriginCron) String() string { return fmt.Sprintf("cron:%s", a.CronJobKey) } - -// AsyncOriginFSM represents the context for the originator of an FSM async call. -// -// It is in the form fsm:.: -type AsyncOriginFSM struct { - FSM schema.RefKey `parser:"'fsm' ':' @@"` - Key string `parser:"':' @(~EOF)+"` -} - -var _ AsyncOrigin = AsyncOriginFSM{} - -func (AsyncOriginFSM) asyncOrigin() {} -func (a AsyncOriginFSM) Origin() string { return "fsm" } -func (a AsyncOriginFSM) String() string { return fmt.Sprintf("fsm:%s:%s", a.FSM, a.Key) } - -// AsyncOriginPubSub represents the context for the originator of an PubSub async call. -// -// It is in the form fsm:. -type AsyncOriginPubSub struct { - Subscription schema.RefKey `parser:"'sub' ':' @@"` -} - -var _ AsyncOrigin = AsyncOriginPubSub{} - -func (AsyncOriginPubSub) asyncOrigin() {} -func (a AsyncOriginPubSub) Origin() string { return "sub" } -func (a AsyncOriginPubSub) String() string { return fmt.Sprintf("sub:%s", a.Subscription) } - -// ParseAsyncOrigin parses an async origin key. -func ParseAsyncOrigin(origin string) (AsyncOrigin, error) { - root, err := asyncOriginParser.ParseString("", origin) - if err != nil { - return nil, err - } - return root.Key, nil -} - type AsyncCall struct { *leasedal.Lease // May be nil ID int64 - Origin AsyncOrigin + Origin async.AsyncOrigin Verb schema.RefKey CatchVerb optional.Option[schema.RefKey] Request []byte @@ -134,7 +58,7 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, leaseCtx c } return nil, ctx, fmt.Errorf("failed to acquire async call: %w", err) } - origin, err := ParseAsyncOrigin(row.Origin) + origin, err := async.ParseAsyncOrigin(row.Origin) if err != nil { return nil, ctx, fmt.Errorf("failed to parse origin key %q: %w", row.Origin, err) } @@ -144,7 +68,7 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, leaseCtx c return nil, ctx, fmt.Errorf("failed to decrypt async call request: %w", err) } - lease, leaseCtx := d.leaseDAL.NewLease(ctx, row.LeaseKey, row.LeaseIdempotencyKey, ttl) + lease, leaseCtx := d.leaser.NewLease(ctx, row.LeaseKey, row.LeaseIdempotencyKey, ttl) return &AsyncCall{ ID: row.AsyncCallID, Verb: row.Verb, @@ -257,7 +181,7 @@ func (d *DAL) LoadAsyncCall(ctx context.Context, id int64) (*AsyncCall, error) { if err != nil { return nil, libdal.TranslatePGError(err) } - origin, err := ParseAsyncOrigin(row.Origin) + origin, err := async.ParseAsyncOrigin(row.Origin) if err != nil { return nil, fmt.Errorf("failed to parse origin key %q: %w", row.Origin, err) } @@ -280,7 +204,7 @@ func (d *DAL) GetZombieAsyncCalls(ctx context.Context, limit int) ([]*AsyncCall, } var calls []*AsyncCall for _, row := range rows { - origin, err := ParseAsyncOrigin(row.Origin) + origin, err := async.ParseAsyncOrigin(row.Origin) if err != nil { return nil, fmt.Errorf("failed to parse origin key %q: %w", row.Origin, err) } diff --git a/backend/controller/dal/async_calls_test.go b/backend/controller/dal/async_calls_test.go index e69c68bd32..d00a3ef910 100644 --- a/backend/controller/dal/async_calls_test.go +++ b/backend/controller/dal/async_calls_test.go @@ -5,8 +5,13 @@ import ( "testing" "github.com/alecthomas/assert/v2" + "github.com/alecthomas/types/optional" + "github.com/TBD54566975/ftl/backend/controller/async" "github.com/TBD54566975/ftl/backend/controller/encryption" + "github.com/TBD54566975/ftl/backend/controller/leases" + "github.com/TBD54566975/ftl/backend/controller/pubsub" + "github.com/TBD54566975/ftl/backend/controller/scheduledtask" "github.com/TBD54566975/ftl/backend/controller/sql/sqltest" "github.com/TBD54566975/ftl/backend/libdal" "github.com/TBD54566975/ftl/backend/schema" @@ -19,8 +24,9 @@ func TestNoCallToAcquire(t *testing.T) { conn := sqltest.OpenForTesting(ctx, t) encryption, err := encryption.New(ctx, conn, encryption.NewBuilder()) assert.NoError(t, err) - - dal := New(ctx, conn, encryption) + scheduler := scheduledtask.New(ctx, model.ControllerKey{}, leases.NewFakeLeaser()) + pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]()) + dal := New(ctx, conn, encryption, pubSub) _, _, err = dal.AcquireAsyncCall(ctx) assert.IsError(t, err, libdal.ErrNotFound) @@ -32,22 +38,22 @@ func TestParser(t *testing.T) { tests := []struct { name string input string - expected AsyncOrigin + expected async.AsyncOrigin }{ - {"Cron", `cron:crn-initial-verb0Cron-5eq2ivpmuv0lvnoc`, AsyncOriginCron{ + {"Cron", `cron:crn-initial-verb0Cron-5eq2ivpmuv0lvnoc`, async.AsyncOriginCron{ CronJobKey: model.CronJobKey{ Payload: model.CronJobPayload{Module: "initial", Verb: "verb0Cron"}, Suffix: []byte("\xfd7\xe6*\xfcƹ\xe9.\x9c"), }}}, - {"FSM", `fsm:module.name:key`, AsyncOriginFSM{FSM: schema.RefKey{Module: "module", Name: "name"}, Key: "key"}}, - {"PubSub", `sub:module.topic`, AsyncOriginPubSub{Subscription: schema.RefKey{Module: "module", Name: "topic"}}}, + {"FSM", `fsm:module.name:key`, async.AsyncOriginFSM{FSM: schema.RefKey{Module: "module", Name: "name"}, Key: "key"}}, + {"PubSub", `sub:module.topic`, async.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "module", Name: "topic"}}}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { t.Parallel() - origin, err := asyncOriginParser.ParseString("", tt.input) + origin, err := async.ParseAsyncOrigin(tt.input) assert.NoError(t, err) - assert.Equal(t, tt.expected, origin.Key) + assert.Equal(t, tt.expected, origin) }) } } diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index c65a0e0370..bc64d52da0 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -11,18 +11,19 @@ import ( "time" "github.com/alecthomas/types/optional" - "github.com/alecthomas/types/pubsub" + inprocesspubsub "github.com/alecthomas/types/pubsub" sets "github.com/deckarep/golang-set/v2" xmaps "golang.org/x/exp/maps" "google.golang.org/protobuf/proto" dalsql "github.com/TBD54566975/ftl/backend/controller/dal/internal/sql" + dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model" "github.com/TBD54566975/ftl/backend/controller/encryption" "github.com/TBD54566975/ftl/backend/controller/encryption/api" - leasedal "github.com/TBD54566975/ftl/backend/controller/leases/dbleaser" + "github.com/TBD54566975/ftl/backend/controller/leases/dbleaser" + "github.com/TBD54566975/ftl/backend/controller/pubsub" "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" "github.com/TBD54566975/ftl/backend/libdal" - ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/maps" @@ -31,57 +32,13 @@ import ( "github.com/TBD54566975/ftl/internal/slices" ) -type IngressRoute struct { - Runner model.RunnerKey - Deployment model.DeploymentKey - Endpoint string - Path string - Module string - Verb string -} - -type IngressRouteEntry struct { - Deployment model.DeploymentKey - Module string - Verb string - Method string - Path string -} - -type DeploymentArtefact struct { - Digest sha256.SHA256 - Executable bool - Path string -} - -func (d *DeploymentArtefact) ToProto() *ftlv1.DeploymentArtefact { - return &ftlv1.DeploymentArtefact{ - Digest: d.Digest.String(), - Executable: d.Executable, - Path: d.Path, - } -} - -func DeploymentArtefactFromProto(in *ftlv1.DeploymentArtefact) (DeploymentArtefact, error) { - digest, err := sha256.ParseSHA256(in.Digest) - if err != nil { - return DeploymentArtefact{}, err - } - return DeploymentArtefact{ - Digest: digest, - Executable: in.Executable, - Path: in.Path, - }, nil -} - -func runnerFromDB(row dalsql.GetRunnerRow) Runner { - +func runnerFromDB(row dalsql.GetRunnerRow) dalmodel.Runner { attrs := model.Labels{} if err := json.Unmarshal(row.Labels, &attrs); err != nil { - return Runner{} + return dalmodel.Runner{} } - return Runner{ + return dalmodel.Runner{ Key: row.RunnerKey, Endpoint: row.Endpoint, Deployment: row.DeploymentKey, @@ -89,93 +46,30 @@ func runnerFromDB(row dalsql.GetRunnerRow) Runner { } } -type Runner struct { - Key model.RunnerKey - Endpoint string - ReservationTimeout optional.Option[time.Duration] - Module optional.Option[string] - Deployment model.DeploymentKey - Labels model.Labels -} - -func (r Runner) notification() {} - -type Reconciliation struct { - Deployment model.DeploymentKey - Module string - Language string - - AssignedReplicas int - RequiredReplicas int -} - -type ControllerState string - -// Controller states. -const ( - ControllerStateLive = ControllerState(dalsql.ControllerStateLive) - ControllerStateDead = ControllerState(dalsql.ControllerStateDead) -) - -type RequestOrigin string - -const ( - RequestOriginIngress = RequestOrigin(dalsql.OriginIngress) - RequestOriginCron = RequestOrigin(dalsql.OriginCron) - RequestOriginPubsub = RequestOrigin(dalsql.OriginPubsub) -) - -type Deployment struct { - Key model.DeploymentKey - Language string - Module string - MinReplicas int - Replicas optional.Option[int] // Depending on the query this may or may not be populated. - Schema *schema.Module - CreatedAt time.Time - Labels model.Labels -} - -func (d Deployment) String() string { return d.Key.String() } - -func (d Deployment) notification() {} - -type Controller struct { - Key model.ControllerKey - Endpoint string - State ControllerState -} - -type Status struct { - Controllers []Controller - Runners []Runner - Deployments []Deployment - IngressRoutes []IngressRouteEntry -} - // A Reservation of a Runner. type Reservation interface { - Runner() Runner + Runner() dalmodel.Runner Commit(ctx context.Context) error Rollback(ctx context.Context) error } -func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Service) *DAL { +func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Service, pubsub *pubsub.Service) *DAL { var d *DAL d = &DAL{ - leaseDAL: leasedal.NewDatabaseLeaser(conn), + leaser: dbleaser.NewDatabaseLeaser(conn), db: dalsql.New(conn), encryption: encryption, Handle: libdal.New(conn, func(h *libdal.Handle[DAL]) *DAL { return &DAL{ Handle: h, db: dalsql.New(h.Connection), - leaseDAL: leasedal.NewDatabaseLeaser(h.Connection), + leaser: dbleaser.NewDatabaseLeaser(h.Connection), + pubsub: pubsub, encryption: d.encryption, DeploymentChanges: d.DeploymentChanges, } }), - DeploymentChanges: pubsub.New[DeploymentNotification](), + DeploymentChanges: inprocesspubsub.New[DeploymentNotification](), } return d @@ -185,50 +79,51 @@ type DAL struct { *libdal.Handle[DAL] db dalsql.Querier - leaseDAL *leasedal.DatabaseLeaser + leaser *dbleaser.DatabaseLeaser + pubsub *pubsub.Service encryption *encryption.Service // DeploymentChanges is a Topic that receives changes to the deployments table. - DeploymentChanges *pubsub.Topic[DeploymentNotification] + DeploymentChanges *inprocesspubsub.Topic[DeploymentNotification] } -func (d *DAL) GetActiveControllers(ctx context.Context) ([]Controller, error) { +func (d *DAL) GetActiveControllers(ctx context.Context) ([]dalmodel.Controller, error) { controllers, err := d.db.GetActiveControllers(ctx) if err != nil { return nil, libdal.TranslatePGError(err) } - return slices.Map(controllers, func(in dalsql.Controller) Controller { - return Controller{ + return slices.Map(controllers, func(in dalsql.Controller) dalmodel.Controller { + return dalmodel.Controller{ Key: in.Key, Endpoint: in.Endpoint, } }), nil } -func (d *DAL) GetStatus(ctx context.Context) (Status, error) { +func (d *DAL) GetStatus(ctx context.Context) (dalmodel.Status, error) { controllers, err := d.GetActiveControllers(ctx) if err != nil { - return Status{}, fmt.Errorf("could not get control planes: %w", libdal.TranslatePGError(err)) + return dalmodel.Status{}, fmt.Errorf("could not get control planes: %w", libdal.TranslatePGError(err)) } runners, err := d.db.GetActiveRunners(ctx) if err != nil { - return Status{}, fmt.Errorf("could not get active runners: %w", libdal.TranslatePGError(err)) + return dalmodel.Status{}, fmt.Errorf("could not get active runners: %w", libdal.TranslatePGError(err)) } deployments, err := d.db.GetActiveDeployments(ctx) if err != nil { - return Status{}, fmt.Errorf("could not get active deployments: %w", libdal.TranslatePGError(err)) + return dalmodel.Status{}, fmt.Errorf("could not get active deployments: %w", libdal.TranslatePGError(err)) } ingressRoutes, err := d.db.GetActiveIngressRoutes(ctx) if err != nil { - return Status{}, fmt.Errorf("could not get ingress routes: %w", libdal.TranslatePGError(err)) + return dalmodel.Status{}, fmt.Errorf("could not get ingress routes: %w", libdal.TranslatePGError(err)) } - statusDeployments, err := slices.MapErr(deployments, func(in dalsql.GetActiveDeploymentsRow) (Deployment, error) { + statusDeployments, err := slices.MapErr(deployments, func(in dalsql.GetActiveDeploymentsRow) (dalmodel.Deployment, error) { labels := model.Labels{} err = json.Unmarshal(in.Deployment.Labels, &labels) if err != nil { - return Deployment{}, fmt.Errorf("%q: invalid labels in database: %w", in.ModuleName, err) + return dalmodel.Deployment{}, fmt.Errorf("%q: invalid labels in database: %w", in.ModuleName, err) } - return Deployment{ + return dalmodel.Deployment{ Key: in.Deployment.Key, Module: in.ModuleName, Language: in.Language, @@ -238,15 +133,15 @@ func (d *DAL) GetStatus(ctx context.Context) (Status, error) { }, nil }) if err != nil { - return Status{}, err + return dalmodel.Status{}, fmt.Errorf("could not parse deployments: %w", err) } - domainRunners, err := slices.MapErr(runners, func(in dalsql.GetActiveRunnersRow) (Runner, error) { + domainRunners, err := slices.MapErr(runners, func(in dalsql.GetActiveRunnersRow) (dalmodel.Runner, error) { attrs := model.Labels{} if err := json.Unmarshal(in.Labels, &attrs); err != nil { - return Runner{}, fmt.Errorf("invalid attributes JSON for runner %s: %w", in.RunnerKey, err) + return dalmodel.Runner{}, fmt.Errorf("invalid attributes JSON for runner %s: %w", in.RunnerKey, err) } - return Runner{ + return dalmodel.Runner{ Key: in.RunnerKey, Endpoint: in.Endpoint, Deployment: in.DeploymentKey, @@ -254,14 +149,14 @@ func (d *DAL) GetStatus(ctx context.Context) (Status, error) { }, nil }) if err != nil { - return Status{}, err + return dalmodel.Status{}, fmt.Errorf("could not parse runners: %w", err) } - return Status{ + return dalmodel.Status{ Controllers: controllers, Deployments: statusDeployments, Runners: domainRunners, - IngressRoutes: slices.Map(ingressRoutes, func(in dalsql.GetActiveIngressRoutesRow) IngressRouteEntry { - return IngressRouteEntry{ + IngressRoutes: slices.Map(ingressRoutes, func(in dalsql.GetActiveIngressRoutesRow) dalmodel.IngressRouteEntry { + return dalmodel.IngressRouteEntry{ Deployment: in.DeploymentKey, Module: in.Module, Verb: in.Verb, @@ -272,8 +167,8 @@ func (d *DAL) GetStatus(ctx context.Context) (Status, error) { }, nil } -func (d *DAL) GetRunnersForDeployment(ctx context.Context, deployment model.DeploymentKey) ([]Runner, error) { - runners := []Runner{} +func (d *DAL) GetRunnersForDeployment(ctx context.Context, deployment model.DeploymentKey) ([]dalmodel.Runner, error) { + runners := []dalmodel.Runner{} rows, err := d.db.GetRunnersForDeployment(ctx, deployment) if err != nil { return nil, libdal.TranslatePGError(err) @@ -284,7 +179,7 @@ func (d *DAL) GetRunnersForDeployment(ctx context.Context, deployment model.Depl return nil, fmt.Errorf("invalid attributes JSON for runner %d: %w", row.ID, err) } - runners = append(runners, Runner{ + runners = append(runners, dalmodel.Runner{ Key: row.Key, Endpoint: row.Endpoint, Deployment: deployment, @@ -328,7 +223,7 @@ type IngressRoutingEntry struct { // previously created artefacts with it. // // If an existing deployment with identical artefacts exists, it is returned. -func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []DeploymentArtefact, ingressRoutes []IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error) { +func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []dalmodel.DeploymentArtefact, ingressRoutes []IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error) { logger := log.FromContext(ctx) // Start the parent transaction @@ -346,7 +241,7 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem return existingDeployment, nil } - artefactsByDigest := maps.FromSlice(artefacts, func(in DeploymentArtefact) (sha256.SHA256, DeploymentArtefact) { + artefactsByDigest := maps.FromSlice(artefacts, func(in dalmodel.DeploymentArtefact) (sha256.SHA256, dalmodel.DeploymentArtefact) { return in.Digest, in }) @@ -386,13 +281,13 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem return model.DeploymentKey{}, fmt.Errorf("failed to create deployment: %w", libdal.TranslatePGError(err)) } - uploadedDigests := slices.Map(artefacts, func(in DeploymentArtefact) []byte { return in.Digest[:] }) + uploadedDigests := slices.Map(artefacts, func(in dalmodel.DeploymentArtefact) []byte { return in.Digest[:] }) artefactDigests, err := tx.db.GetArtefactDigests(ctx, uploadedDigests) if err != nil { return model.DeploymentKey{}, fmt.Errorf("failed to get artefact digests: %w", err) } if len(artefactDigests) != len(artefacts) { - missingDigests := strings.Join(slices.Map(artefacts, func(in DeploymentArtefact) string { return in.Digest.String() }), ", ") + missingDigests := strings.Join(slices.Map(artefacts, func(in dalmodel.DeploymentArtefact) string { return in.Digest.String() }), ", ") return model.DeploymentKey{}, fmt.Errorf("missing %d artefacts: %s", len(artefacts)-len(artefactDigests), missingDigests) } @@ -455,7 +350,7 @@ func (d *DAL) GetDeployment(ctx context.Context, key model.DeploymentKey) (*mode // // ErrConflict will be returned if a runner with the same endpoint and a // different key already exists. -func (d *DAL) UpsertRunner(ctx context.Context, runner Runner) error { +func (d *DAL) UpsertRunner(ctx context.Context, runner dalmodel.Runner) error { attrBytes, err := json.Marshal(runner.Labels) if err != nil { return fmt.Errorf("failed to JSON encode runner labels: %w", err) @@ -503,7 +398,7 @@ var _ Reservation = (*postgresClaim)(nil) type postgresClaim struct { tx *DAL - runner Runner + runner dalmodel.Runner cancel context.CancelFunc } @@ -517,7 +412,7 @@ func (p *postgresClaim) Rollback(ctx context.Context) error { return libdal.TranslatePGError(p.tx.Rollback(ctx)) } -func (p *postgresClaim) Runner() Runner { return p.runner } +func (p *postgresClaim) Runner() dalmodel.Runner { return p.runner } // SetDeploymentReplicas activates the given deployment. func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int) (err error) { @@ -642,29 +537,37 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.Depl // deploymentWillActivate is called whenever a deployment goes from min_replicas=0 to min_replicas>0. // -// when replacing a deployment, this should be called first before calling deploymentWillDeactivate on the old deployment. +// When replacing a deployment, this should be called first before calling deploymentWillDeactivate on the old deployment. // This allows the new deployment to migrate from the old deployment (such as subscriptions). func (d *DAL) deploymentWillActivate(ctx context.Context, key model.DeploymentKey) error { module, err := d.db.GetSchemaForDeployment(ctx, key) if err != nil { return fmt.Errorf("could not get schema: %w", libdal.TranslatePGError(err)) } - err = d.createSubscriptions(ctx, key, module) + err = d.pubsub.CreateSubscriptions(ctx, key, module) if err != nil { return err } - return d.createSubscribers(ctx, key, module) + err = d.pubsub.CreateSubscribers(ctx, key, module) + if err != nil { + return fmt.Errorf("could not create subscribers: %w", err) + } + return nil } // deploymentWillDeactivate is called whenever a deployment goes to min_replicas=0. // // it may be called when min_replicas was already 0 func (d *DAL) deploymentWillDeactivate(ctx context.Context, key model.DeploymentKey) error { - return d.removeSubscriptionsAndSubscribers(ctx, key) + err := d.pubsub.RemoveSubscriptionsAndSubscribers(ctx, key) + if err != nil { + return fmt.Errorf("could not remove subscriptions and subscribers: %w", err) + } + return nil } // GetActiveDeployments returns all active deployments. -func (d *DAL) GetActiveDeployments(ctx context.Context) ([]Deployment, error) { +func (d *DAL) GetActiveDeployments(ctx context.Context) ([]dalmodel.Deployment, error) { rows, err := d.db.GetActiveDeployments(ctx) if err != nil { if libdal.IsNotFound(err) { @@ -672,8 +575,8 @@ func (d *DAL) GetActiveDeployments(ctx context.Context) ([]Deployment, error) { } return nil, libdal.TranslatePGError(err) } - return slices.Map(rows, func(in dalsql.GetActiveDeploymentsRow) Deployment { - return Deployment{ + return slices.Map(rows, func(in dalsql.GetActiveDeploymentsRow) dalmodel.Deployment { + return dalmodel.Deployment{ Key: in.Deployment.Key, Module: in.ModuleName, Language: in.Language, @@ -709,7 +612,7 @@ func (d *DAL) GetActiveSchema(ctx context.Context) (*schema.Schema, error) { return sch, nil } -func (d *DAL) GetDeploymentsWithMinReplicas(ctx context.Context) ([]Deployment, error) { +func (d *DAL) GetDeploymentsWithMinReplicas(ctx context.Context) ([]dalmodel.Deployment, error) { rows, err := d.db.GetDeploymentsWithMinReplicas(ctx) if err != nil { if libdal.IsNotFound(err) { @@ -717,8 +620,8 @@ func (d *DAL) GetDeploymentsWithMinReplicas(ctx context.Context) ([]Deployment, } return nil, libdal.TranslatePGError(err) } - return slices.Map(rows, func(in dalsql.GetDeploymentsWithMinReplicasRow) Deployment { - return Deployment{ + return slices.Map(rows, func(in dalsql.GetDeploymentsWithMinReplicasRow) dalmodel.Deployment { + return dalmodel.Deployment{ Key: in.Deployment.Key, Module: in.ModuleName, Language: in.Language, @@ -783,10 +686,10 @@ func (d *DAL) GetProcessList(ctx context.Context) ([]Process, error) { }) } -func (d *DAL) GetRunner(ctx context.Context, runnerKey model.RunnerKey) (Runner, error) { +func (d *DAL) GetRunner(ctx context.Context, runnerKey model.RunnerKey) (dalmodel.Runner, error) { row, err := d.db.GetRunner(ctx, runnerKey) if err != nil { - return Runner{}, libdal.TranslatePGError(err) + return dalmodel.Runner{}, libdal.TranslatePGError(err) } return runnerFromDB(row), nil } @@ -820,7 +723,7 @@ func (d *DAL) CreateRequest(ctx context.Context, key model.RequestKey, addr stri return nil } -func (d *DAL) GetIngressRoutes(ctx context.Context, method string) ([]IngressRoute, error) { +func (d *DAL) GetIngressRoutes(ctx context.Context, method string) ([]dalmodel.IngressRoute, error) { routes, err := d.db.GetIngressRoutes(ctx, method) if err != nil { return nil, libdal.TranslatePGError(err) @@ -828,8 +731,8 @@ func (d *DAL) GetIngressRoutes(ctx context.Context, method string) ([]IngressRou if len(routes) == 0 { return nil, libdal.ErrNotFound } - return slices.Map(routes, func(row dalsql.GetIngressRoutesRow) IngressRoute { - return IngressRoute{ + return slices.Map(routes, func(row dalsql.GetIngressRoutesRow) dalmodel.IngressRoute { + return dalmodel.IngressRoute{ Runner: row.RunnerKey, Deployment: row.DeploymentKey, Endpoint: row.Endpoint, @@ -845,24 +748,24 @@ func (d *DAL) UpsertController(ctx context.Context, key model.ControllerKey, add return id, libdal.TranslatePGError(err) } -func (d *DAL) GetActiveRunners(ctx context.Context) ([]Runner, error) { +func (d *DAL) GetActiveRunners(ctx context.Context) ([]dalmodel.Runner, error) { rows, err := d.db.GetActiveRunners(ctx) if err != nil { return nil, libdal.TranslatePGError(err) } - return slices.Map(rows, func(row dalsql.GetActiveRunnersRow) Runner { + return slices.Map(rows, func(row dalsql.GetActiveRunnersRow) dalmodel.Runner { return runnerFromDB(dalsql.GetRunnerRow(row)) }), nil } // Check if a deployment exists that exactly matches the given artefacts and schema. -func (*DAL) checkForExistingDeployments(ctx context.Context, tx *DAL, moduleSchema *schema.Module, artefacts []DeploymentArtefact) (model.DeploymentKey, error) { +func (*DAL) checkForExistingDeployments(ctx context.Context, tx *DAL, moduleSchema *schema.Module, artefacts []dalmodel.DeploymentArtefact) (model.DeploymentKey, error) { schemaBytes, err := schema.ModuleToBytes(moduleSchema) if err != nil { return model.DeploymentKey{}, fmt.Errorf("failed to marshal schema: %w", err) } existing, err := tx.db.GetDeploymentsWithArtefacts(ctx, - sha256esToBytes(slices.Map(artefacts, func(in DeploymentArtefact) sha256.SHA256 { return in.Digest })), + sha256esToBytes(slices.Map(artefacts, func(in dalmodel.DeploymentArtefact) sha256.SHA256 { return in.Digest })), schemaBytes, int64(len(artefacts)), ) diff --git a/backend/controller/dal/dal_test.go b/backend/controller/dal/dal_test.go index 95729deba3..8cd24b9d52 100644 --- a/backend/controller/dal/dal_test.go +++ b/backend/controller/dal/dal_test.go @@ -12,7 +12,11 @@ import ( "github.com/alecthomas/types/optional" "golang.org/x/sync/errgroup" + dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model" "github.com/TBD54566975/ftl/backend/controller/encryption" + "github.com/TBD54566975/ftl/backend/controller/leases" + "github.com/TBD54566975/ftl/backend/controller/pubsub" + "github.com/TBD54566975/ftl/backend/controller/scheduledtask" "github.com/TBD54566975/ftl/backend/controller/sql/sqltest" "github.com/TBD54566975/ftl/backend/libdal" "github.com/TBD54566975/ftl/backend/schema" @@ -27,7 +31,9 @@ func TestDAL(t *testing.T) { encryption, err := encryption.New(ctx, conn, encryption.NewBuilder()) assert.NoError(t, err) - dal := New(ctx, conn, encryption) + scheduler := scheduledtask.New(ctx, model.ControllerKey{}, leases.NewFakeLeaser()) + pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]()) + dal := New(ctx, conn, encryption, pubSub) var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100) var testSHA = sha256.Sum(testContent) @@ -57,7 +63,7 @@ func TestDAL(t *testing.T) { module := &schema.Module{Name: "test"} var deploymentKey model.DeploymentKey t.Run("CreateDeployment", func(t *testing.T) { - deploymentKey, err = dal.CreateDeployment(ctx, "go", module, []DeploymentArtefact{{ + deploymentKey, err = dal.CreateDeployment(ctx, "go", module, []dalmodel.DeploymentArtefact{{ Digest: testSha, Executable: true, Path: "dir/filename", @@ -103,7 +109,7 @@ func TestDAL(t *testing.T) { labels := map[string]any{"languages": []any{"go"}} t.Run("RegisterRunner", func(t *testing.T) { - err = dal.UpsertRunner(ctx, Runner{ + err = dal.UpsertRunner(ctx, dalmodel.Runner{ Key: runnerID, Labels: labels, Endpoint: "http://localhost:8080", @@ -118,7 +124,7 @@ func TestDAL(t *testing.T) { }) t.Run("UpdateRunnerAssigned", func(t *testing.T) { - err := dal.UpsertRunner(ctx, Runner{ + err := dal.UpsertRunner(ctx, dalmodel.Runner{ Key: runnerID, Labels: labels, Endpoint: "http://localhost:8080", @@ -130,7 +136,7 @@ func TestDAL(t *testing.T) { t.Run("GetRunnersForDeployment", func(t *testing.T) { runners, err := dal.GetRunnersForDeployment(ctx, deploymentKey) assert.NoError(t, err) - assert.Equal(t, []Runner{{ + assert.Equal(t, []dalmodel.Runner{{ Key: runnerID, Labels: labels, Endpoint: "http://localhost:8080", @@ -145,7 +151,7 @@ func TestDAL(t *testing.T) { }) t.Run("UpdateRunnerInvalidDeployment", func(t *testing.T) { - err := dal.UpsertRunner(ctx, Runner{ + err := dal.UpsertRunner(ctx, dalmodel.Runner{ Key: runnerID, Labels: labels, Endpoint: "http://localhost:8080", @@ -169,8 +175,8 @@ func TestDAL(t *testing.T) { t.Skip("Skipping this test since we're not using the deployment notification system") dal.DeploymentChanges.Unsubscribe(deploymentChangesCh) expectedDeploymentChanges := []DeploymentNotification{ - {Message: optional.Some(Deployment{Language: "go", Module: "test", Schema: &schema.Module{Name: "test"}})}, - {Message: optional.Some(Deployment{Language: "go", Module: "test", MinReplicas: 1, Schema: &schema.Module{Name: "test"}})}, + {Message: optional.Some(dalmodel.Deployment{Language: "go", Module: "test", Schema: &schema.Module{Name: "test"}})}, + {Message: optional.Some(dalmodel.Deployment{Language: "go", Module: "test", MinReplicas: 1, Schema: &schema.Module{Name: "test"}})}, } err = wg.Wait() assert.NoError(t, err) @@ -185,7 +191,9 @@ func TestCreateArtefactConflict(t *testing.T) { encryption, err := encryption.New(ctx, conn, encryption.NewBuilder()) assert.NoError(t, err) - dal := New(ctx, conn, encryption) + scheduler := scheduledtask.New(ctx, model.ControllerKey{}, leases.NewFakeLeaser()) + pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]()) + dal := New(ctx, conn, encryption, pubSub) idch := make(chan sha256.SHA256, 2) diff --git a/backend/controller/dal/fsm.go b/backend/controller/dal/fsm.go index 8b0b8cc581..90ded2e705 100644 --- a/backend/controller/dal/fsm.go +++ b/backend/controller/dal/fsm.go @@ -9,6 +9,7 @@ import ( "github.com/alecthomas/types/optional" + "github.com/TBD54566975/ftl/backend/controller/async" sql2 "github.com/TBD54566975/ftl/backend/controller/dal/internal/sql" "github.com/TBD54566975/ftl/backend/controller/encryption/api" "github.com/TBD54566975/ftl/backend/controller/leases" @@ -43,7 +44,7 @@ func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, instanc } // Create an async call for the event. - origin := AsyncOriginFSM{FSM: fsm, Key: instanceKey} + origin := async.AsyncOriginFSM{FSM: fsm, Key: instanceKey} asyncCallID, err := d.db.CreateAsyncCall(ctx, sql2.CreateAsyncCallParams{ ScheduledAt: time.Now(), Verb: destinationState, @@ -192,7 +193,7 @@ type FSMInstance struct { // // The lease must be released by the caller. func (d *DAL) AcquireFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) (*FSMInstance, error) { - lease, _, err := d.leaseDAL.AcquireLease(ctx, leases.SystemKey("fsm_instance", fsm.String(), instanceKey), time.Second*5, optional.None[any]()) + lease, _, err := d.leaser.AcquireLease(ctx, leases.SystemKey("fsm_instance", fsm.String(), instanceKey), time.Second*5, optional.None[any]()) if err != nil { return nil, fmt.Errorf("failed to acquire FSM lease: %w", err) } diff --git a/backend/controller/dal/fsm_test.go b/backend/controller/dal/fsm_test.go index e4e370d1b3..f152200733 100644 --- a/backend/controller/dal/fsm_test.go +++ b/backend/controller/dal/fsm_test.go @@ -7,13 +7,19 @@ import ( "github.com/alecthomas/assert/v2" "github.com/alecthomas/types/either" + "github.com/alecthomas/types/optional" + "github.com/TBD54566975/ftl/backend/controller/async" "github.com/TBD54566975/ftl/backend/controller/encryption" + "github.com/TBD54566975/ftl/backend/controller/leases" leasedal "github.com/TBD54566975/ftl/backend/controller/leases/dbleaser" + "github.com/TBD54566975/ftl/backend/controller/pubsub" + "github.com/TBD54566975/ftl/backend/controller/scheduledtask" "github.com/TBD54566975/ftl/backend/controller/sql/sqltest" "github.com/TBD54566975/ftl/backend/libdal" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/model" ) func TestSendFSMEvent(t *testing.T) { @@ -22,7 +28,9 @@ func TestSendFSMEvent(t *testing.T) { encryption, err := encryption.New(ctx, conn, encryption.NewBuilder()) assert.NoError(t, err) - dal := New(ctx, conn, encryption) + scheduler := scheduledtask.New(ctx, model.ControllerKey{}, leases.NewFakeLeaser()) + pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]()) + dal := New(ctx, conn, encryption, pubSub) _, _, err = dal.AcquireAsyncCall(ctx) assert.IsError(t, err, libdal.ErrNotFound) @@ -46,7 +54,7 @@ func TestSendFSMEvent(t *testing.T) { expectedCall := &AsyncCall{ ID: 1, Verb: ref, - Origin: AsyncOriginFSM{ + Origin: async.AsyncOriginFSM{ FSM: schema.RefKey{Module: "test", Name: "test"}, Key: "invoiceID", }, diff --git a/backend/controller/dal/internal/sql/models.go b/backend/controller/dal/internal/sql/models.go index fda5284494..8b2f1e3e6b 100644 --- a/backend/controller/dal/internal/sql/models.go +++ b/backend/controller/dal/internal/sql/models.go @@ -270,7 +270,7 @@ type CronJob struct { StartTime time.Time NextExecution time.Time ModuleName string - LastExecution optional.Option[time.Time] + LastExecution sqltypes.OptionalTime LastAsyncCallID optional.Option[int64] } diff --git a/backend/controller/dal/internal/sql/queries.sql b/backend/controller/dal/internal/sql/queries.sql index fc2e8a85bc..da5be4feda 100644 --- a/backend/controller/dal/internal/sql/queries.sql +++ b/backend/controller/dal/internal/sql/queries.sql @@ -372,234 +372,6 @@ WHERE fsm_instance_id = ( ) RETURNING *; --- name: UpsertTopic :exec -INSERT INTO topics (key, module_id, name, type) -VALUES ( - sqlc.arg('topic')::topic_key, - (SELECT id FROM modules WHERE name = sqlc.arg('module')::TEXT LIMIT 1), - sqlc.arg('name')::TEXT, - sqlc.arg('event_type')::TEXT -) -ON CONFLICT (name, module_id) DO -UPDATE SET - type = sqlc.arg('event_type')::TEXT -RETURNING id; - --- name: UpsertSubscription :one -INSERT INTO topic_subscriptions ( - key, - topic_id, - module_id, - deployment_id, - name) -VALUES ( - sqlc.arg('key')::subscription_key, - ( - SELECT topics.id as id - FROM topics - INNER JOIN modules ON topics.module_id = modules.id - WHERE modules.name = sqlc.arg('topic_module')::TEXT - AND topics.name = sqlc.arg('topic_name')::TEXT - ), - (SELECT id FROM modules WHERE name = sqlc.arg('module')::TEXT), - (SELECT id FROM deployments WHERE key = sqlc.arg('deployment')::deployment_key), - sqlc.arg('name')::TEXT -) -ON CONFLICT (name, module_id) DO -UPDATE SET - topic_id = excluded.topic_id, - deployment_id = (SELECT id FROM deployments WHERE key = sqlc.arg('deployment')::deployment_key) -RETURNING - id, - CASE - WHEN xmax = 0 THEN true - ELSE false - END AS inserted; - --- name: DeleteSubscriptions :many -DELETE FROM topic_subscriptions -WHERE deployment_id IN ( - SELECT deployments.id - FROM deployments - WHERE deployments.key = sqlc.arg('deployment')::deployment_key -) -RETURNING topic_subscriptions.key; - --- name: DeleteSubscribers :many -DELETE FROM topic_subscribers -WHERE deployment_id IN ( - SELECT deployments.id - FROM deployments - WHERE deployments.key = sqlc.arg('deployment')::deployment_key -) -RETURNING topic_subscribers.key; - --- name: InsertSubscriber :exec -INSERT INTO topic_subscribers ( - key, - topic_subscriptions_id, - deployment_id, - sink, - retry_attempts, - backoff, - max_backoff, - catch_verb -) -VALUES ( - sqlc.arg('key')::subscriber_key, - ( - SELECT topic_subscriptions.id as id - FROM topic_subscriptions - INNER JOIN modules ON topic_subscriptions.module_id = modules.id - WHERE modules.name = sqlc.arg('module')::TEXT - AND topic_subscriptions.name = sqlc.arg('subscription_name')::TEXT - ), - (SELECT id FROM deployments WHERE key = sqlc.arg('deployment')::deployment_key), - sqlc.arg('sink'), - sqlc.arg('retry_attempts'), - sqlc.arg('backoff')::interval, - sqlc.arg('max_backoff')::interval, - sqlc.arg('catch_verb') -); - --- name: PublishEventForTopic :exec -INSERT INTO topic_events ( - "key", - topic_id, - caller, - payload, - request_key, - trace_context - ) -VALUES ( - sqlc.arg('key')::topic_event_key, - ( - SELECT topics.id - FROM topics - INNER JOIN modules ON topics.module_id = modules.id - WHERE modules.name = sqlc.arg('module')::TEXT - AND topics.name = sqlc.arg('topic')::TEXT - ), - sqlc.arg('caller')::TEXT, - sqlc.arg('payload'), - sqlc.arg('request_key')::TEXT, - sqlc.arg('trace_context')::jsonb -); - --- name: GetSubscriptionsNeedingUpdate :many --- Results may not be ready to be scheduled yet due to event consumption delay --- Sorting ensures that brand new events (that may not be ready for consumption) --- don't prevent older events from being consumed --- We also make sure that the subscription belongs to a deployment that has at least one runner -WITH runner_count AS ( - SELECT count(r.deployment_id) as runner_count, - r.deployment_id as deployment - FROM runners r - GROUP BY deployment -) -SELECT - subs.key::subscription_key as key, - curser.key as cursor, - topics.key::topic_key as topic, - subs.name -FROM topic_subscriptions subs -JOIN runner_count on subs.deployment_id = runner_count.deployment -LEFT JOIN topics ON subs.topic_id = topics.id -LEFT JOIN topic_events curser ON subs.cursor = curser.id -WHERE subs.cursor IS DISTINCT FROM topics.head - AND subs.state = 'idle' -ORDER BY curser.created_at -LIMIT 3 -FOR UPDATE OF subs SKIP LOCKED; - --- name: GetNextEventForSubscription :one -WITH cursor AS ( - SELECT - created_at, - id - FROM topic_events - WHERE "key" = sqlc.narg('cursor')::topic_event_key -) -SELECT events."key" as event, - events.payload, - events.created_at, - events.caller, - events.request_key, - events.trace_context, - NOW() - events.created_at >= sqlc.arg('consumption_delay')::interval AS ready -FROM topics -LEFT JOIN topic_events as events ON events.topic_id = topics.id -WHERE topics.key = sqlc.arg('topic')::topic_key - AND (events.created_at, events.id) > (SELECT COALESCE(MAX(cursor.created_at), '1900-01-01'), COALESCE(MAX(cursor.id), 0) FROM cursor) -ORDER BY events.created_at, events.id -LIMIT 1; - --- name: GetRandomSubscriber :one -SELECT - subscribers.sink as sink, - subscribers.retry_attempts as retry_attempts, - subscribers.backoff as backoff, - subscribers.max_backoff as max_backoff, - subscribers.catch_verb as catch_verb -FROM topic_subscribers as subscribers -JOIN topic_subscriptions ON subscribers.topic_subscriptions_id = topic_subscriptions.id -WHERE topic_subscriptions.key = sqlc.arg('key')::subscription_key -ORDER BY RANDOM() -LIMIT 1; - --- name: BeginConsumingTopicEvent :exec -WITH event AS ( - SELECT * - FROM topic_events - WHERE "key" = sqlc.arg('event')::topic_event_key -) -UPDATE topic_subscriptions -SET state = 'executing', - cursor = (SELECT id FROM event) -WHERE key = sqlc.arg('subscription')::subscription_key; - --- name: CompleteEventForSubscription :exec -WITH module AS ( - SELECT id - FROM modules - WHERE name = sqlc.arg('module')::TEXT -) -UPDATE topic_subscriptions -SET state = 'idle' -WHERE name = @name::TEXT - AND module_id = (SELECT id FROM module); - --- name: GetSubscription :one -WITH module AS ( - SELECT id - FROM modules - WHERE name = $2::TEXT -) -SELECT * -FROM topic_subscriptions -WHERE name = $1::TEXT - AND module_id = (SELECT id FROM module); - --- name: SetSubscriptionCursor :exec -WITH event AS ( - SELECT id, created_at, key, topic_id, payload - FROM topic_events - WHERE "key" = $2::topic_event_key -) -UPDATE topic_subscriptions -SET cursor = (SELECT id FROM event) -WHERE key = $1::subscription_key; - --- name: GetTopic :one -SELECT * -FROM topics -WHERE id = $1::BIGINT; - --- name: GetTopicEvent :one -SELECT * -FROM topic_events -WHERE id = $1::BIGINT; - -- name: AcquireAsyncCall :one -- Reserve a pending async call for execution, returning the associated lease -- reservation key and accompanying metadata. diff --git a/backend/controller/dal/internal/sql/queries.sql.go b/backend/controller/dal/internal/sql/queries.sql.go index 0bca963fea..db3114a47f 100644 --- a/backend/controller/dal/internal/sql/queries.sql.go +++ b/backend/controller/dal/internal/sql/queries.sql.go @@ -129,9 +129,9 @@ func (q *Queries) AssociateArtefactWithDeployment(ctx context.Context, arg Assoc const beginConsumingTopicEvent = `-- name: BeginConsumingTopicEvent :exec WITH event AS ( - SELECT id, created_at, key, topic_id, payload, caller, request_key, trace_context - FROM topic_events - WHERE "key" = $2::topic_event_key + SELECT id, created_at, key, topic_id, payload, caller, request_key, trace_context + FROM topic_events + WHERE "key" = $2::topic_event_key ) UPDATE topic_subscriptions SET state = 'executing', @@ -146,14 +146,14 @@ func (q *Queries) BeginConsumingTopicEvent(ctx context.Context, subscription mod const completeEventForSubscription = `-- name: CompleteEventForSubscription :exec WITH module AS ( - SELECT id - FROM modules - WHERE name = $2::TEXT + SELECT id + FROM modules + WHERE name = $2::TEXT ) UPDATE topic_subscriptions SET state = 'idle' WHERE name = $1::TEXT - AND module_id = (SELECT id FROM module) + AND module_id = (SELECT id FROM module) ` func (q *Queries) CompleteEventForSubscription(ctx context.Context, name string, module string) error { @@ -259,9 +259,9 @@ func (q *Queries) CreateRequest(ctx context.Context, origin Origin, key model.Re const deleteSubscribers = `-- name: DeleteSubscribers :many DELETE FROM topic_subscribers WHERE deployment_id IN ( - SELECT deployments.id - FROM deployments - WHERE deployments.key = $1::deployment_key + SELECT deployments.id + FROM deployments + WHERE deployments.key = $1::deployment_key ) RETURNING topic_subscribers.key ` @@ -292,9 +292,9 @@ func (q *Queries) DeleteSubscribers(ctx context.Context, deployment model.Deploy const deleteSubscriptions = `-- name: DeleteSubscriptions :many DELETE FROM topic_subscriptions WHERE deployment_id IN ( - SELECT deployments.id - FROM deployments - WHERE deployments.key = $1::deployment_key + SELECT deployments.id + FROM deployments + WHERE deployments.key = $1::deployment_key ) RETURNING topic_subscriptions.key ` @@ -1113,21 +1113,21 @@ func (q *Queries) GetModulesByID(ctx context.Context, ids []int64) ([]Module, er const getNextEventForSubscription = `-- name: GetNextEventForSubscription :one WITH cursor AS ( - SELECT - created_at, - id - FROM topic_events - WHERE "key" = $3::topic_event_key + SELECT + created_at, + id + FROM topic_events + WHERE "key" = $3::topic_event_key ) SELECT events."key" as event, - events.payload, - events.created_at, - events.caller, - events.request_key, - events.trace_context, - NOW() - events.created_at >= $1::interval AS ready + events.payload, + events.created_at, + events.caller, + events.request_key, + events.trace_context, + NOW() - events.created_at >= $1::interval AS ready FROM topics -LEFT JOIN topic_events as events ON events.topic_id = topics.id + LEFT JOIN topic_events as events ON events.topic_id = topics.id WHERE topics.key = $2::topic_key AND (events.created_at, events.id) > (SELECT COALESCE(MAX(cursor.created_at), '1900-01-01'), COALESCE(MAX(cursor.id), 0) FROM cursor) ORDER BY events.created_at, events.id @@ -1137,7 +1137,7 @@ LIMIT 1 type GetNextEventForSubscriptionRow struct { Event optional.Option[model.TopicEventKey] Payload api.OptionalEncryptedAsyncColumn - CreatedAt optional.Option[time.Time] + CreatedAt sqltypes.OptionalTime Caller optional.Option[string] RequestKey optional.Option[string] TraceContext pqtype.NullRawMessage @@ -1213,13 +1213,13 @@ func (q *Queries) GetProcessList(ctx context.Context) ([]GetProcessListRow, erro const getRandomSubscriber = `-- name: GetRandomSubscriber :one SELECT - subscribers.sink as sink, - subscribers.retry_attempts as retry_attempts, - subscribers.backoff as backoff, - subscribers.max_backoff as max_backoff, - subscribers.catch_verb as catch_verb + subscribers.sink as sink, + subscribers.retry_attempts as retry_attempts, + subscribers.backoff as backoff, + subscribers.max_backoff as max_backoff, + subscribers.catch_verb as catch_verb FROM topic_subscribers as subscribers -JOIN topic_subscriptions ON subscribers.topic_subscriptions_id = topic_subscriptions.id + JOIN topic_subscriptions ON subscribers.topic_subscriptions_id = topic_subscriptions.id WHERE topic_subscriptions.key = $1::subscription_key ORDER BY RANDOM() LIMIT 1 @@ -1358,14 +1358,14 @@ func (q *Queries) GetSchemaForDeployment(ctx context.Context, key model.Deployme const getSubscription = `-- name: GetSubscription :one WITH module AS ( - SELECT id - FROM modules - WHERE name = $2::TEXT + SELECT id + FROM modules + WHERE name = $2::TEXT ) SELECT id, key, created_at, topic_id, module_id, deployment_id, name, cursor, state FROM topic_subscriptions WHERE name = $1::TEXT - AND module_id = (SELECT id FROM module) + AND module_id = (SELECT id FROM module) ` func (q *Queries) GetSubscription(ctx context.Context, column1 string, column2 string) (TopicSubscription, error) { @@ -1387,25 +1387,25 @@ func (q *Queries) GetSubscription(ctx context.Context, column1 string, column2 s const getSubscriptionsNeedingUpdate = `-- name: GetSubscriptionsNeedingUpdate :many WITH runner_count AS ( - SELECT count(r.deployment_id) as runner_count, - r.deployment_id as deployment - FROM runners r - GROUP BY deployment + SELECT count(r.deployment_id) as runner_count, + r.deployment_id as deployment + FROM runners r + GROUP BY deployment ) SELECT - subs.key::subscription_key as key, - curser.key as cursor, - topics.key::topic_key as topic, - subs.name + subs.key::subscription_key as key, + curser.key as cursor, + topics.key::topic_key as topic, + subs.name FROM topic_subscriptions subs -JOIN runner_count on subs.deployment_id = runner_count.deployment -LEFT JOIN topics ON subs.topic_id = topics.id -LEFT JOIN topic_events curser ON subs.cursor = curser.id + JOIN runner_count on subs.deployment_id = runner_count.deployment + LEFT JOIN topics ON subs.topic_id = topics.id + LEFT JOIN topic_events curser ON subs.cursor = curser.id WHERE subs.cursor IS DISTINCT FROM topics.head AND subs.state = 'idle' ORDER BY curser.created_at LIMIT 3 -FOR UPDATE OF subs SKIP LOCKED + FOR UPDATE OF subs SKIP LOCKED ` type GetSubscriptionsNeedingUpdateRow struct { @@ -1606,31 +1606,31 @@ func (q *Queries) GetZombieAsyncCalls(ctx context.Context, limit int32) ([]Async const insertSubscriber = `-- name: InsertSubscriber :exec INSERT INTO topic_subscribers ( - key, - topic_subscriptions_id, - deployment_id, - sink, - retry_attempts, - backoff, - max_backoff, - catch_verb + key, + topic_subscriptions_id, + deployment_id, + sink, + retry_attempts, + backoff, + max_backoff, + catch_verb ) VALUES ( - $1::subscriber_key, - ( - SELECT topic_subscriptions.id as id - FROM topic_subscriptions - INNER JOIN modules ON topic_subscriptions.module_id = modules.id - WHERE modules.name = $2::TEXT - AND topic_subscriptions.name = $3::TEXT - ), - (SELECT id FROM deployments WHERE key = $4::deployment_key), - $5, - $6, - $7::interval, - $8::interval, - $9 -) + $1::subscriber_key, + ( + SELECT topic_subscriptions.id as id + FROM topic_subscriptions + INNER JOIN modules ON topic_subscriptions.module_id = modules.id + WHERE modules.name = $2::TEXT + AND topic_subscriptions.name = $3::TEXT + ), + (SELECT id FROM deployments WHERE key = $4::deployment_key), + $5, + $6, + $7::interval, + $8::interval, + $9 + ) ` type InsertSubscriberParams struct { @@ -1775,21 +1775,21 @@ INSERT INTO topic_events ( payload, request_key, trace_context - ) -VALUES ( - $1::topic_event_key, - ( - SELECT topics.id - FROM topics - INNER JOIN modules ON topics.module_id = modules.id - WHERE modules.name = $2::TEXT - AND topics.name = $3::TEXT - ), - $4::TEXT, - $5, - $6::TEXT, - $7::jsonb ) +VALUES ( + $1::topic_event_key, + ( + SELECT topics.id + FROM topics + INNER JOIN modules ON topics.module_id = modules.id + WHERE modules.name = $2::TEXT + AND topics.name = $3::TEXT + ), + $4::TEXT, + $5, + $6::TEXT, + $7::jsonb + ) ` type PublishEventForTopicParams struct { @@ -1861,9 +1861,9 @@ func (q *Queries) SetNextFSMEvent(ctx context.Context, arg SetNextFSMEventParams const setSubscriptionCursor = `-- name: SetSubscriptionCursor :exec WITH event AS ( - SELECT id, created_at, key, topic_id, payload - FROM topic_events - WHERE "key" = $2::topic_event_key + SELECT id, created_at, key, topic_id, payload + FROM topic_events + WHERE "key" = $2::topic_event_key ) UPDATE topic_subscriptions SET cursor = (SELECT id FROM event) @@ -2062,34 +2062,34 @@ func (q *Queries) UpsertRunner(ctx context.Context, arg UpsertRunnerParams) (int const upsertSubscription = `-- name: UpsertSubscription :one INSERT INTO topic_subscriptions ( - key, - topic_id, - module_id, - deployment_id, - name) + key, + topic_id, + module_id, + deployment_id, + name) VALUES ( - $1::subscription_key, - ( - SELECT topics.id as id - FROM topics - INNER JOIN modules ON topics.module_id = modules.id - WHERE modules.name = $2::TEXT - AND topics.name = $3::TEXT - ), - (SELECT id FROM modules WHERE name = $4::TEXT), - (SELECT id FROM deployments WHERE key = $5::deployment_key), - $6::TEXT -) + $1::subscription_key, + ( + SELECT topics.id as id + FROM topics + INNER JOIN modules ON topics.module_id = modules.id + WHERE modules.name = $2::TEXT + AND topics.name = $3::TEXT + ), + (SELECT id FROM modules WHERE name = $4::TEXT), + (SELECT id FROM deployments WHERE key = $5::deployment_key), + $6::TEXT + ) ON CONFLICT (name, module_id) DO -UPDATE SET - topic_id = excluded.topic_id, - deployment_id = (SELECT id FROM deployments WHERE key = $5::deployment_key) + UPDATE SET + topic_id = excluded.topic_id, + deployment_id = (SELECT id FROM deployments WHERE key = $5::deployment_key) RETURNING - id, - CASE - WHEN xmax = 0 THEN true - ELSE false - END AS inserted + id, + CASE + WHEN xmax = 0 THEN true + ELSE false + END AS inserted ` type UpsertSubscriptionParams struct { @@ -2123,14 +2123,14 @@ func (q *Queries) UpsertSubscription(ctx context.Context, arg UpsertSubscription const upsertTopic = `-- name: UpsertTopic :exec INSERT INTO topics (key, module_id, name, type) VALUES ( - $1::topic_key, - (SELECT id FROM modules WHERE name = $2::TEXT LIMIT 1), - $3::TEXT, - $4::TEXT -) + $1::topic_key, + (SELECT id FROM modules WHERE name = $2::TEXT LIMIT 1), + $3::TEXT, + $4::TEXT + ) ON CONFLICT (name, module_id) DO -UPDATE SET - type = $4::TEXT + UPDATE SET + type = $4::TEXT RETURNING id ` diff --git a/backend/controller/dal/model/model.go b/backend/controller/dal/model/model.go new file mode 100644 index 0000000000..1fecb358a4 --- /dev/null +++ b/backend/controller/dal/model/model.go @@ -0,0 +1,143 @@ +package model + +import ( + "encoding" + "fmt" + "time" + + "github.com/alecthomas/types/optional" + + "github.com/TBD54566975/ftl/backend/controller/dal/internal/sql" + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/model" + "github.com/TBD54566975/ftl/internal/sha256" +) + +// NotificationPayload is a row from the database. +// +//sumtype:decl +type NotificationPayload interface{ notification() } + +// A Notification from the database. +type Notification[T NotificationPayload, Key any, KeyP interface { + *Key + encoding.TextUnmarshaler +}] struct { + Deleted optional.Option[Key] // If present the object was deleted. + Message optional.Option[T] +} + +func (n Notification[T, Key, KeyP]) String() string { + if key, ok := n.Deleted.Get(); ok { + return fmt.Sprintf("deleted %v", key) + } + return fmt.Sprintf("message %v", n.Message) +} + +type Runner struct { + Key model.RunnerKey + Endpoint string + ReservationTimeout optional.Option[time.Duration] + Module optional.Option[string] + Deployment model.DeploymentKey + Labels model.Labels +} + +func (Runner) notification() {} + +type Reconciliation struct { + Deployment model.DeploymentKey + Module string + Language string + + AssignedReplicas int + RequiredReplicas int +} + +type ControllerState string + +// Controller states. +const ( + ControllerStateLive = ControllerState(sql.ControllerStateLive) + ControllerStateDead = ControllerState(sql.ControllerStateDead) +) + +type RequestOrigin string + +const ( + RequestOriginIngress = RequestOrigin(sql.OriginIngress) + RequestOriginCron = RequestOrigin(sql.OriginCron) + RequestOriginPubsub = RequestOrigin(sql.OriginPubsub) +) + +type Deployment struct { + Key model.DeploymentKey + Language string + Module string + MinReplicas int + Replicas optional.Option[int] // Depending on the query this may or may not be populated. + Schema *schema.Module + CreatedAt time.Time + Labels model.Labels +} + +func (d Deployment) String() string { return d.Key.String() } + +func (d Deployment) notification() {} + +type Controller struct { + Key model.ControllerKey + Endpoint string + State ControllerState +} + +type Status struct { + Controllers []Controller + Runners []Runner + Deployments []Deployment + IngressRoutes []IngressRouteEntry +} + +type IngressRoute struct { + Runner model.RunnerKey + Deployment model.DeploymentKey + Endpoint string + Path string + Module string + Verb string +} + +type IngressRouteEntry struct { + Deployment model.DeploymentKey + Module string + Verb string + Method string + Path string +} + +type DeploymentArtefact struct { + Digest sha256.SHA256 + Executable bool + Path string +} + +func (d *DeploymentArtefact) ToProto() *ftlv1.DeploymentArtefact { + return &ftlv1.DeploymentArtefact{ + Digest: d.Digest.String(), + Executable: d.Executable, + Path: d.Path, + } +} + +func DeploymentArtefactFromProto(in *ftlv1.DeploymentArtefact) (DeploymentArtefact, error) { + digest, err := sha256.ParseSHA256(in.Digest) + if err != nil { + return DeploymentArtefact{}, fmt.Errorf("invalid digest: %w", err) + } + return DeploymentArtefact{ + Digest: digest, + Executable: in.Executable, + Path: in.Path, + }, nil +} diff --git a/backend/controller/dal/notify.go b/backend/controller/dal/notify.go index 5aa78a0ceb..b578e6ba90 100644 --- a/backend/controller/dal/notify.go +++ b/backend/controller/dal/notify.go @@ -3,40 +3,20 @@ package dal import ( "context" "crypto/sha256" - "encoding" + "errors" "fmt" "time" "github.com/alecthomas/types/optional" "github.com/jpillora/backoff" + dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" ) -// NotificationPayload is a row from the database. -// -//sumtype:decl -type NotificationPayload interface{ notification() } - -// A Notification from the database. -type Notification[T NotificationPayload, Key any, KeyP interface { - *Key - encoding.TextUnmarshaler -}] struct { - Deleted optional.Option[Key] // If present the object was deleted. - Message optional.Option[T] -} - -func (n Notification[T, Key, KeyP]) String() string { - if key, ok := n.Deleted.Get(); ok { - return fmt.Sprintf("deleted %v", key) - } - return fmt.Sprintf("message %v", n.Message) -} - // DeploymentNotification is a notification from the database when a deployment changes. -type DeploymentNotification = Notification[Deployment, model.DeploymentKey, *model.DeploymentKey] +type DeploymentNotification = dalmodel.Notification[dalmodel.Deployment, model.DeploymentKey, *model.DeploymentKey] type deploymentState struct { Key model.DeploymentKey @@ -44,7 +24,7 @@ type deploymentState struct { minReplicas int } -func deploymentStateFromDeployment(deployment Deployment) (deploymentState, error) { +func deploymentStateFromDeployment(deployment dalmodel.Deployment) (deploymentState, error) { hasher := sha256.New() data := []byte(deployment.Schema.String()) if _, err := hasher.Write(data); err != nil { @@ -70,7 +50,7 @@ func (d *DAL) PollDeployments(ctx context.Context) { deployments, err := d.GetDeploymentsWithMinReplicas(ctx) if err != nil { - if ctx.Err() == context.Canceled { + if errors.Is(ctx.Err(), context.Canceled) { logger.Tracef("Polling stopped: %v", ctx.Err()) return } diff --git a/backend/controller/ingress/handler.go b/backend/controller/ingress/handler.go index 76cac670f5..fd8f6ed2a7 100644 --- a/backend/controller/ingress/handler.go +++ b/backend/controller/ingress/handler.go @@ -13,7 +13,7 @@ import ( "connectrpc.com/connect" "github.com/alecthomas/types/optional" - "github.com/TBD54566975/ftl/backend/controller/dal" + dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model" "github.com/TBD54566975/ftl/backend/controller/observability" "github.com/TBD54566975/ftl/backend/controller/timeline" "github.com/TBD54566975/ftl/backend/libdal" @@ -29,7 +29,7 @@ func Handle( startTime time.Time, sch *schema.Schema, requestKey model.RequestKey, - routes []dal.IngressRoute, + routes []dalmodel.IngressRoute, w http.ResponseWriter, r *http.Request, timelineService *timeline.Service, diff --git a/backend/controller/ingress/handler_test.go b/backend/controller/ingress/handler_test.go index ab9f355508..c65f701f9b 100644 --- a/backend/controller/ingress/handler_test.go +++ b/backend/controller/ingress/handler_test.go @@ -13,7 +13,7 @@ import ( "github.com/alecthomas/assert/v2" "github.com/alecthomas/types/optional" - "github.com/TBD54566975/ftl/backend/controller/dal" + dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model" "github.com/TBD54566975/ftl/backend/controller/encryption" "github.com/TBD54566975/ftl/backend/controller/ingress" "github.com/TBD54566975/ftl/backend/controller/sql/sqltest" @@ -63,7 +63,7 @@ func TestIngress(t *testing.T) { `) assert.NoError(t, err) - routes := []dal.IngressRoute{ + routes := []dalmodel.IngressRoute{ {Path: "/getAlias", Module: "test", Verb: "getAlias"}, {Path: "/getPath/{username}", Module: "test", Verb: "getPath"}, {Path: "/postMissingTypes", Module: "test", Verb: "postMissingTypes"}, diff --git a/backend/controller/ingress/ingress.go b/backend/controller/ingress/ingress.go index 2cd77408ff..837b3eff66 100644 --- a/backend/controller/ingress/ingress.go +++ b/backend/controller/ingress/ingress.go @@ -6,14 +6,14 @@ import ( "math/rand" "strings" - "github.com/TBD54566975/ftl/backend/controller/dal" + "github.com/TBD54566975/ftl/backend/controller/dal/model" "github.com/TBD54566975/ftl/backend/libdal" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/slices" ) -func GetIngressRoute(routes []dal.IngressRoute, method string, path string) (*dal.IngressRoute, error) { - var matchedRoutes = slices.Filter(routes, func(route dal.IngressRoute) bool { +func GetIngressRoute(routes []model.IngressRoute, method string, path string) (*model.IngressRoute, error) { + var matchedRoutes = slices.Filter(routes, func(route model.IngressRoute) bool { return matchSegments(route.Path, path, func(segment, value string) {}) }) diff --git a/backend/controller/ingress/request.go b/backend/controller/ingress/request.go index 27e067cf8b..5452778891 100644 --- a/backend/controller/ingress/request.go +++ b/backend/controller/ingress/request.go @@ -11,13 +11,13 @@ import ( "github.com/alecthomas/types/optional" - "github.com/TBD54566975/ftl/backend/controller/dal" + "github.com/TBD54566975/ftl/backend/controller/dal/model" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/slices" ) // BuildRequestBody extracts the HttpRequest body from an HTTP request. -func BuildRequestBody(route *dal.IngressRoute, r *http.Request, sch *schema.Schema) ([]byte, error) { +func BuildRequestBody(route *model.IngressRoute, r *http.Request, sch *schema.Schema) ([]byte, error) { verb := &schema.Verb{} err := sch.ResolveToType(&schema.Ref{Name: route.Verb, Module: route.Module}, verb) if err != nil { diff --git a/backend/controller/ingress/request_test.go b/backend/controller/ingress/request_test.go index af3363f275..a26da0a257 100644 --- a/backend/controller/ingress/request_test.go +++ b/backend/controller/ingress/request_test.go @@ -10,7 +10,7 @@ import ( "github.com/alecthomas/assert/v2" - "github.com/TBD54566975/ftl/backend/controller/dal" + "github.com/TBD54566975/ftl/backend/controller/dal/model" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/go-runtime/encoding" "github.com/TBD54566975/ftl/go-runtime/ftl" @@ -240,7 +240,7 @@ func TestBuildRequestBody(t *testing.T) { } r, err := http.NewRequest(test.method, requestURL, bytes.NewReader(body)) //nolint:noctx assert.NoError(t, err) - requestBody, err := BuildRequestBody(&dal.IngressRoute{ + requestBody, err := BuildRequestBody(&model.IngressRoute{ Path: test.routePath, Module: "test", Verb: test.verb, diff --git a/backend/controller/pubsub/integration_test.go b/backend/controller/pubsub/integration_test.go index 508bb588b9..7f6967b39a 100644 --- a/backend/controller/pubsub/integration_test.go +++ b/backend/controller/pubsub/integration_test.go @@ -8,7 +8,7 @@ import ( "testing" "time" - "github.com/TBD54566975/ftl/backend/controller/dal" + "github.com/TBD54566975/ftl/backend/controller/async" "github.com/TBD54566975/ftl/backend/schema" in "github.com/TBD54566975/ftl/internal/integration" ) @@ -36,7 +36,7 @@ func TestPubSub(t *testing.T) { WHERE state = 'success' AND origin = '%s' - `, dal.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "testTopicSubscription"}}.String()), + `, async.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "testTopicSubscription"}}.String()), events), ) } @@ -108,7 +108,7 @@ func TestRetry(t *testing.T) { AND verb = 'subscriber.consumeButFailAndRetry' AND catching = false AND origin = '%s' - `, dal.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "doomedSubscription"}}.String()), + `, async.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "doomedSubscription"}}.String()), 1+retriesPerCall), // check that there is one failed attempt to catch (we purposely fail the first one) @@ -123,7 +123,7 @@ func TestRetry(t *testing.T) { AND error LIKE '%%catching error%%' AND catching = true AND origin = '%s' - `, dal.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "doomedSubscription"}}.String()), + `, async.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "doomedSubscription"}}.String()), 1), // check that there is one successful attempt to catch (we succeed the second one as long as we receive the correct error in the request) @@ -137,7 +137,7 @@ func TestRetry(t *testing.T) { AND error IS NULL AND catching = true AND origin = '%s' -`, dal.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "doomedSubscription"}}.String()), +`, async.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "doomedSubscription"}}.String()), 1), // check that there was one successful attempt to catchAny @@ -151,7 +151,7 @@ func TestRetry(t *testing.T) { AND error IS NULL AND catching = true AND origin = '%s' -`, dal.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "doomedSubscription2"}}.String()), +`, async.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "doomedSubscription2"}}.String()), 1), ) } diff --git a/backend/controller/dal/pubsub.go b/backend/controller/pubsub/internal/dal/dal.go similarity index 89% rename from backend/controller/dal/pubsub.go rename to backend/controller/pubsub/internal/dal/dal.go index 559fab474c..debe674af5 100644 --- a/backend/controller/dal/pubsub.go +++ b/backend/controller/pubsub/internal/dal/dal.go @@ -8,9 +8,11 @@ import ( "github.com/alecthomas/types/optional" - sql2 "github.com/TBD54566975/ftl/backend/controller/dal/internal/sql" + "github.com/TBD54566975/ftl/backend/controller/async" + "github.com/TBD54566975/ftl/backend/controller/encryption" "github.com/TBD54566975/ftl/backend/controller/encryption/api" "github.com/TBD54566975/ftl/backend/controller/observability" + dalsql "github.com/TBD54566975/ftl/backend/controller/pubsub/internal/sql" "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" "github.com/TBD54566975/ftl/backend/libdal" "github.com/TBD54566975/ftl/backend/schema" @@ -20,6 +22,26 @@ import ( "github.com/TBD54566975/ftl/internal/slices" ) +type DAL struct { + *libdal.Handle[DAL] + db dalsql.Querier + encryption *encryption.Service +} + +func New(conn libdal.Connection, encryption *encryption.Service) *DAL { + return &DAL{ + Handle: libdal.New(conn, func(h *libdal.Handle[DAL]) *DAL { + return &DAL{ + Handle: h, + db: dalsql.New(h.Connection), + encryption: encryption, + } + }), + db: dalsql.New(conn), + encryption: encryption, + } +} + func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic, caller string, payload []byte) error { var encryptedPayload api.EncryptedAsyncColumn err := d.encryption.Encrypt(payload, &encryptedPayload) @@ -44,7 +66,7 @@ func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic, caller st return fmt.Errorf("failed to get request key: %w", err) } - err = d.db.PublishEventForTopic(ctx, sql2.PublishEventForTopicParams{ + err = d.db.PublishEventForTopic(ctx, dalsql.PublishEventForTopicParams{ Key: model.NewTopicEventKey(module, topic), Module: module, Topic: topic, @@ -65,7 +87,7 @@ func (d *DAL) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]model.Subscr if err != nil { return nil, libdal.TranslatePGError(err) } - return slices.Map(rows, func(row sql2.GetSubscriptionsNeedingUpdateRow) model.Subscription { + return slices.Map(rows, func(row dalsql.GetSubscriptionsNeedingUpdateRow) model.Subscription { return model.Subscription{ Name: row.Name, Key: row.Key, @@ -125,14 +147,14 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t return 0, fmt.Errorf("failed to progress subscription: %w", libdal.TranslatePGError(err)) } - origin := AsyncOriginPubSub{ + origin := async.AsyncOriginPubSub{ Subscription: schema.RefKey{ Module: subscription.Key.Payload.Module, Name: subscription.Key.Payload.Name, }, } - _, err = tx.db.CreateAsyncCall(ctx, sql2.CreateAsyncCallParams{ + _, err = tx.db.CreateAsyncCall(ctx, dalsql.CreateAsyncCallParams{ ScheduledAt: time.Now(), Verb: subscriber.Sink, Origin: origin.String(), @@ -168,7 +190,7 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t return successful, nil } -func subscriptionRef(subscription sql2.GetSubscriptionsNeedingUpdateRow) schema.RefKey { +func subscriptionRef(subscription dalsql.GetSubscriptionsNeedingUpdateRow) schema.RefKey { return schema.RefKey{Module: subscription.Key.Payload.Module, Name: subscription.Name} } @@ -219,7 +241,7 @@ func (d *DAL) ResetSubscription(ctx context.Context, module, name string) (err e return nil } -func (d *DAL) createSubscriptions(ctx context.Context, key model.DeploymentKey, module *schema.Module) error { +func (d *DAL) CreateSubscriptions(ctx context.Context, key model.DeploymentKey, module *schema.Module) error { logger := log.FromContext(ctx) for _, decl := range module.Decls { @@ -237,7 +259,7 @@ func (d *DAL) createSubscriptions(ctx context.Context, key model.DeploymentKey, continue } subscriptionKey := model.NewSubscriptionKey(module.Name, s.Name) - result, err := d.db.UpsertSubscription(ctx, sql2.UpsertSubscriptionParams{ + result, err := d.db.UpsertSubscription(ctx, dalsql.UpsertSubscriptionParams{ Key: subscriptionKey, Module: module.Name, Deployment: key, @@ -276,7 +298,7 @@ func hasSubscribers(subscription *schema.Subscription, decls []schema.Decl) bool return false } -func (d *DAL) createSubscribers(ctx context.Context, key model.DeploymentKey, module *schema.Module) error { +func (d *DAL) CreateSubscribers(ctx context.Context, key model.DeploymentKey, module *schema.Module) error { logger := log.FromContext(ctx) for _, decl := range module.Decls { v, ok := decl.(*schema.Verb) @@ -301,7 +323,7 @@ func (d *DAL) createSubscribers(ctx context.Context, key model.DeploymentKey, mo } } subscriberKey := model.NewSubscriberKey(module.Name, s.Name, v.Name) - err = d.db.InsertSubscriber(ctx, sql2.InsertSubscriberParams{ + err = d.db.InsertSubscriber(ctx, dalsql.InsertSubscriberParams{ Key: subscriberKey, Module: module.Name, SubscriptionName: s.Name, @@ -321,7 +343,7 @@ func (d *DAL) createSubscribers(ctx context.Context, key model.DeploymentKey, mo return nil } -func (d *DAL) removeSubscriptionsAndSubscribers(ctx context.Context, key model.DeploymentKey) error { +func (d *DAL) RemoveSubscriptionsAndSubscribers(ctx context.Context, key model.DeploymentKey) error { logger := log.FromContext(ctx) subscribers, err := d.db.DeleteSubscribers(ctx, key) diff --git a/backend/controller/pubsub/internal/sql/async_queries.sql.go b/backend/controller/pubsub/internal/sql/async_queries.sql.go new file mode 100644 index 0000000000..e6002dd786 --- /dev/null +++ b/backend/controller/pubsub/internal/sql/async_queries.sql.go @@ -0,0 +1,89 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 +// source: async_queries.sql + +package sql + +import ( + "context" + "encoding/json" + "time" + + "github.com/TBD54566975/ftl/backend/controller/encryption/api" + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" + "github.com/TBD54566975/ftl/backend/schema" + "github.com/alecthomas/types/optional" +) + +const asyncCallQueueDepth = `-- name: AsyncCallQueueDepth :one +SELECT count(*) +FROM async_calls +WHERE state = 'pending' AND scheduled_at <= (NOW() AT TIME ZONE 'utc') +` + +func (q *Queries) AsyncCallQueueDepth(ctx context.Context) (int64, error) { + row := q.db.QueryRowContext(ctx, asyncCallQueueDepth) + var count int64 + err := row.Scan(&count) + return count, err +} + +const createAsyncCall = `-- name: CreateAsyncCall :one +INSERT INTO async_calls ( + scheduled_at, + verb, + origin, + request, + remaining_attempts, + backoff, + max_backoff, + catch_verb, + parent_request_key, + trace_context +) +VALUES ( + $1::TIMESTAMPTZ, + $2, + $3, + $4, + $5, + $6::interval, + $7::interval, + $8, + $9, + $10::jsonb +) +RETURNING id +` + +type CreateAsyncCallParams struct { + ScheduledAt time.Time + Verb schema.RefKey + Origin string + Request api.EncryptedAsyncColumn + RemainingAttempts int32 + Backoff sqltypes.Duration + MaxBackoff sqltypes.Duration + CatchVerb optional.Option[schema.RefKey] + ParentRequestKey optional.Option[string] + TraceContext json.RawMessage +} + +func (q *Queries) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error) { + row := q.db.QueryRowContext(ctx, createAsyncCall, + arg.ScheduledAt, + arg.Verb, + arg.Origin, + arg.Request, + arg.RemainingAttempts, + arg.Backoff, + arg.MaxBackoff, + arg.CatchVerb, + arg.ParentRequestKey, + arg.TraceContext, + ) + var id int64 + err := row.Scan(&id) + return id, err +} diff --git a/backend/controller/pubsub/internal/sql/db.go b/backend/controller/pubsub/internal/sql/db.go new file mode 100644 index 0000000000..0e0973111c --- /dev/null +++ b/backend/controller/pubsub/internal/sql/db.go @@ -0,0 +1,31 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 + +package sql + +import ( + "context" + "database/sql" +) + +type DBTX interface { + ExecContext(context.Context, string, ...interface{}) (sql.Result, error) + PrepareContext(context.Context, string) (*sql.Stmt, error) + QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error) + QueryRowContext(context.Context, string, ...interface{}) *sql.Row +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +type Queries struct { + db DBTX +} + +func (q *Queries) WithTx(tx *sql.Tx) *Queries { + return &Queries{ + db: tx, + } +} diff --git a/backend/controller/pubsub/internal/sql/models.go b/backend/controller/pubsub/internal/sql/models.go new file mode 100644 index 0000000000..9f21a4c962 --- /dev/null +++ b/backend/controller/pubsub/internal/sql/models.go @@ -0,0 +1,91 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 + +package sql + +import ( + "database/sql/driver" + "fmt" + "time" + + "github.com/TBD54566975/ftl/backend/controller/encryption/api" + "github.com/TBD54566975/ftl/internal/model" + "github.com/alecthomas/types/optional" + "github.com/sqlc-dev/pqtype" +) + +type TopicSubscriptionState string + +const ( + TopicSubscriptionStateIdle TopicSubscriptionState = "idle" + TopicSubscriptionStateExecuting TopicSubscriptionState = "executing" +) + +func (e *TopicSubscriptionState) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = TopicSubscriptionState(s) + case string: + *e = TopicSubscriptionState(s) + default: + return fmt.Errorf("unsupported scan type for TopicSubscriptionState: %T", src) + } + return nil +} + +type NullTopicSubscriptionState struct { + TopicSubscriptionState TopicSubscriptionState + Valid bool // Valid is true if TopicSubscriptionState is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullTopicSubscriptionState) Scan(value interface{}) error { + if value == nil { + ns.TopicSubscriptionState, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.TopicSubscriptionState.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullTopicSubscriptionState) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.TopicSubscriptionState), nil +} + +type Topic struct { + ID int64 + Key model.TopicKey + CreatedAt time.Time + ModuleID int64 + Name string + Type string + Head optional.Option[int64] +} + +type TopicEvent struct { + ID int64 + CreatedAt time.Time + Key model.TopicEventKey + TopicID int64 + Payload api.EncryptedAsyncColumn + Caller optional.Option[string] + RequestKey optional.Option[string] + TraceContext pqtype.NullRawMessage +} + +type TopicSubscription struct { + ID int64 + Key model.SubscriptionKey + CreatedAt time.Time + TopicID int64 + ModuleID int64 + DeploymentID int64 + Name string + Cursor optional.Option[int64] + State TopicSubscriptionState +} diff --git a/backend/controller/pubsub/internal/sql/querier.go b/backend/controller/pubsub/internal/sql/querier.go new file mode 100644 index 0000000000..9c40823ebf --- /dev/null +++ b/backend/controller/pubsub/internal/sql/querier.go @@ -0,0 +1,39 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 + +package sql + +import ( + "context" + + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" + "github.com/TBD54566975/ftl/internal/model" + "github.com/alecthomas/types/optional" +) + +type Querier interface { + AsyncCallQueueDepth(ctx context.Context) (int64, error) + BeginConsumingTopicEvent(ctx context.Context, subscription model.SubscriptionKey, event model.TopicEventKey) error + CompleteEventForSubscription(ctx context.Context, name string, module string) error + CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error) + DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, error) + DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriptionKey, error) + GetNextEventForSubscription(ctx context.Context, consumptionDelay sqltypes.Duration, topic model.TopicKey, cursor optional.Option[model.TopicEventKey]) (GetNextEventForSubscriptionRow, error) + GetRandomSubscriber(ctx context.Context, key model.SubscriptionKey) (GetRandomSubscriberRow, error) + GetSubscription(ctx context.Context, column1 string, column2 string) (TopicSubscription, error) + // Results may not be ready to be scheduled yet due to event consumption delay + // Sorting ensures that brand new events (that may not be ready for consumption) + // don't prevent older events from being consumed + // We also make sure that the subscription belongs to a deployment that has at least one runner + GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error) + GetTopic(ctx context.Context, dollar_1 int64) (Topic, error) + GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent, error) + InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error + PublishEventForTopic(ctx context.Context, arg PublishEventForTopicParams) error + SetSubscriptionCursor(ctx context.Context, column1 model.SubscriptionKey, column2 model.TopicEventKey) error + UpsertSubscription(ctx context.Context, arg UpsertSubscriptionParams) (UpsertSubscriptionRow, error) + UpsertTopic(ctx context.Context, arg UpsertTopicParams) error +} + +var _ Querier = (*Queries)(nil) diff --git a/backend/controller/pubsub/internal/sql/queries.sql b/backend/controller/pubsub/internal/sql/queries.sql new file mode 100644 index 0000000000..84df1bc654 --- /dev/null +++ b/backend/controller/pubsub/internal/sql/queries.sql @@ -0,0 +1,228 @@ +-- name: UpsertTopic :exec +INSERT INTO topics (key, module_id, name, type) +VALUES ( + sqlc.arg('topic')::topic_key, + (SELECT id FROM modules WHERE name = sqlc.arg('module')::TEXT LIMIT 1), + sqlc.arg('name')::TEXT, + sqlc.arg('event_type')::TEXT + ) +ON CONFLICT (name, module_id) DO + UPDATE SET + type = sqlc.arg('event_type')::TEXT +RETURNING id; + +-- name: UpsertSubscription :one +INSERT INTO topic_subscriptions ( + key, + topic_id, + module_id, + deployment_id, + name) +VALUES ( + sqlc.arg('key')::subscription_key, + ( + SELECT topics.id as id + FROM topics + INNER JOIN modules ON topics.module_id = modules.id + WHERE modules.name = sqlc.arg('topic_module')::TEXT + AND topics.name = sqlc.arg('topic_name')::TEXT + ), + (SELECT id FROM modules WHERE name = sqlc.arg('module')::TEXT), + (SELECT id FROM deployments WHERE key = sqlc.arg('deployment')::deployment_key), + sqlc.arg('name')::TEXT + ) +ON CONFLICT (name, module_id) DO + UPDATE SET + topic_id = excluded.topic_id, + deployment_id = (SELECT id FROM deployments WHERE key = sqlc.arg('deployment')::deployment_key) +RETURNING + id, + CASE + WHEN xmax = 0 THEN true + ELSE false + END AS inserted; + +-- name: DeleteSubscriptions :many +DELETE FROM topic_subscriptions +WHERE deployment_id IN ( + SELECT deployments.id + FROM deployments + WHERE deployments.key = sqlc.arg('deployment')::deployment_key +) +RETURNING topic_subscriptions.key; + +-- name: DeleteSubscribers :many +DELETE FROM topic_subscribers +WHERE deployment_id IN ( + SELECT deployments.id + FROM deployments + WHERE deployments.key = sqlc.arg('deployment')::deployment_key +) +RETURNING topic_subscribers.key; + +-- name: InsertSubscriber :exec +INSERT INTO topic_subscribers ( + key, + topic_subscriptions_id, + deployment_id, + sink, + retry_attempts, + backoff, + max_backoff, + catch_verb +) +VALUES ( + sqlc.arg('key')::subscriber_key, + ( + SELECT topic_subscriptions.id as id + FROM topic_subscriptions + INNER JOIN modules ON topic_subscriptions.module_id = modules.id + WHERE modules.name = sqlc.arg('module')::TEXT + AND topic_subscriptions.name = sqlc.arg('subscription_name')::TEXT + ), + (SELECT id FROM deployments WHERE key = sqlc.arg('deployment')::deployment_key), + sqlc.arg('sink'), + sqlc.arg('retry_attempts'), + sqlc.arg('backoff')::interval, + sqlc.arg('max_backoff')::interval, + sqlc.arg('catch_verb') + ); + +-- name: PublishEventForTopic :exec +INSERT INTO topic_events ( + "key", + topic_id, + caller, + payload, + request_key, + trace_context +) +VALUES ( + sqlc.arg('key')::topic_event_key, + ( + SELECT topics.id + FROM topics + INNER JOIN modules ON topics.module_id = modules.id + WHERE modules.name = sqlc.arg('module')::TEXT + AND topics.name = sqlc.arg('topic')::TEXT + ), + sqlc.arg('caller')::TEXT, + sqlc.arg('payload'), + sqlc.arg('request_key')::TEXT, + sqlc.arg('trace_context')::jsonb + ); + +-- name: GetSubscriptionsNeedingUpdate :many +-- Results may not be ready to be scheduled yet due to event consumption delay +-- Sorting ensures that brand new events (that may not be ready for consumption) +-- don't prevent older events from being consumed +-- We also make sure that the subscription belongs to a deployment that has at least one runner +WITH runner_count AS ( + SELECT count(r.deployment_id) as runner_count, + r.deployment_id as deployment + FROM runners r + GROUP BY deployment +) +SELECT + subs.key::subscription_key as key, + curser.key as cursor, + topics.key::topic_key as topic, + subs.name +FROM topic_subscriptions subs + JOIN runner_count on subs.deployment_id = runner_count.deployment + LEFT JOIN topics ON subs.topic_id = topics.id + LEFT JOIN topic_events curser ON subs.cursor = curser.id +WHERE subs.cursor IS DISTINCT FROM topics.head + AND subs.state = 'idle' +ORDER BY curser.created_at +LIMIT 3 + FOR UPDATE OF subs SKIP LOCKED; + +-- name: GetNextEventForSubscription :one +WITH cursor AS ( + SELECT + created_at, + id + FROM topic_events + WHERE "key" = sqlc.narg('cursor')::topic_event_key +) +SELECT events."key" as event, + events.payload, + events.created_at, + events.caller, + events.request_key, + events.trace_context, + NOW() - events.created_at >= sqlc.arg('consumption_delay')::interval AS ready +FROM topics + LEFT JOIN topic_events as events ON events.topic_id = topics.id +WHERE topics.key = sqlc.arg('topic')::topic_key + AND (events.created_at, events.id) > (SELECT COALESCE(MAX(cursor.created_at), '1900-01-01'), COALESCE(MAX(cursor.id), 0) FROM cursor) +ORDER BY events.created_at, events.id +LIMIT 1; + +-- name: GetRandomSubscriber :one +SELECT + subscribers.sink as sink, + subscribers.retry_attempts as retry_attempts, + subscribers.backoff as backoff, + subscribers.max_backoff as max_backoff, + subscribers.catch_verb as catch_verb +FROM topic_subscribers as subscribers + JOIN topic_subscriptions ON subscribers.topic_subscriptions_id = topic_subscriptions.id +WHERE topic_subscriptions.key = sqlc.arg('key')::subscription_key +ORDER BY RANDOM() +LIMIT 1; + +-- name: BeginConsumingTopicEvent :exec +WITH event AS ( + SELECT * + FROM topic_events + WHERE "key" = sqlc.arg('event')::topic_event_key +) +UPDATE topic_subscriptions +SET state = 'executing', + cursor = (SELECT id FROM event) +WHERE key = sqlc.arg('subscription')::subscription_key; + +-- name: CompleteEventForSubscription :exec +WITH module AS ( + SELECT id + FROM modules + WHERE name = sqlc.arg('module')::TEXT +) +UPDATE topic_subscriptions +SET state = 'idle' +WHERE name = @name::TEXT + AND module_id = (SELECT id FROM module); + +-- name: GetSubscription :one +WITH module AS ( + SELECT id + FROM modules + WHERE name = $2::TEXT +) +SELECT * +FROM topic_subscriptions +WHERE name = $1::TEXT + AND module_id = (SELECT id FROM module); + +-- name: SetSubscriptionCursor :exec +WITH event AS ( + SELECT id, created_at, key, topic_id, payload + FROM topic_events + WHERE "key" = $2::topic_event_key +) +UPDATE topic_subscriptions +SET cursor = (SELECT id FROM event) +WHERE key = $1::subscription_key; + +-- name: GetTopic :one +SELECT * +FROM topics +WHERE id = $1::BIGINT; + +-- name: GetTopicEvent :one +SELECT * +FROM topic_events +WHERE id = $1::BIGINT; + diff --git a/backend/controller/pubsub/internal/sql/queries.sql.go b/backend/controller/pubsub/internal/sql/queries.sql.go new file mode 100644 index 0000000000..1a9ef5dd1d --- /dev/null +++ b/backend/controller/pubsub/internal/sql/queries.sql.go @@ -0,0 +1,546 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 +// source: queries.sql + +package sql + +import ( + "context" + "encoding/json" + + "github.com/TBD54566975/ftl/backend/controller/encryption/api" + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" + "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/model" + "github.com/alecthomas/types/optional" + "github.com/sqlc-dev/pqtype" +) + +const beginConsumingTopicEvent = `-- name: BeginConsumingTopicEvent :exec +WITH event AS ( + SELECT id, created_at, key, topic_id, payload, caller, request_key, trace_context + FROM topic_events + WHERE "key" = $2::topic_event_key +) +UPDATE topic_subscriptions +SET state = 'executing', + cursor = (SELECT id FROM event) +WHERE key = $1::subscription_key +` + +func (q *Queries) BeginConsumingTopicEvent(ctx context.Context, subscription model.SubscriptionKey, event model.TopicEventKey) error { + _, err := q.db.ExecContext(ctx, beginConsumingTopicEvent, subscription, event) + return err +} + +const completeEventForSubscription = `-- name: CompleteEventForSubscription :exec +WITH module AS ( + SELECT id + FROM modules + WHERE name = $2::TEXT +) +UPDATE topic_subscriptions +SET state = 'idle' +WHERE name = $1::TEXT + AND module_id = (SELECT id FROM module) +` + +func (q *Queries) CompleteEventForSubscription(ctx context.Context, name string, module string) error { + _, err := q.db.ExecContext(ctx, completeEventForSubscription, name, module) + return err +} + +const deleteSubscribers = `-- name: DeleteSubscribers :many +DELETE FROM topic_subscribers +WHERE deployment_id IN ( + SELECT deployments.id + FROM deployments + WHERE deployments.key = $1::deployment_key +) +RETURNING topic_subscribers.key +` + +func (q *Queries) DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, error) { + rows, err := q.db.QueryContext(ctx, deleteSubscribers, deployment) + if err != nil { + return nil, err + } + defer rows.Close() + var items []model.SubscriberKey + for rows.Next() { + var key model.SubscriberKey + if err := rows.Scan(&key); err != nil { + return nil, err + } + items = append(items, key) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const deleteSubscriptions = `-- name: DeleteSubscriptions :many +DELETE FROM topic_subscriptions +WHERE deployment_id IN ( + SELECT deployments.id + FROM deployments + WHERE deployments.key = $1::deployment_key +) +RETURNING topic_subscriptions.key +` + +func (q *Queries) DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriptionKey, error) { + rows, err := q.db.QueryContext(ctx, deleteSubscriptions, deployment) + if err != nil { + return nil, err + } + defer rows.Close() + var items []model.SubscriptionKey + for rows.Next() { + var key model.SubscriptionKey + if err := rows.Scan(&key); err != nil { + return nil, err + } + items = append(items, key) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const getNextEventForSubscription = `-- name: GetNextEventForSubscription :one +WITH cursor AS ( + SELECT + created_at, + id + FROM topic_events + WHERE "key" = $3::topic_event_key +) +SELECT events."key" as event, + events.payload, + events.created_at, + events.caller, + events.request_key, + events.trace_context, + NOW() - events.created_at >= $1::interval AS ready +FROM topics + LEFT JOIN topic_events as events ON events.topic_id = topics.id +WHERE topics.key = $2::topic_key + AND (events.created_at, events.id) > (SELECT COALESCE(MAX(cursor.created_at), '1900-01-01'), COALESCE(MAX(cursor.id), 0) FROM cursor) +ORDER BY events.created_at, events.id +LIMIT 1 +` + +type GetNextEventForSubscriptionRow struct { + Event optional.Option[model.TopicEventKey] + Payload api.OptionalEncryptedAsyncColumn + CreatedAt sqltypes.OptionalTime + Caller optional.Option[string] + RequestKey optional.Option[string] + TraceContext pqtype.NullRawMessage + Ready bool +} + +func (q *Queries) GetNextEventForSubscription(ctx context.Context, consumptionDelay sqltypes.Duration, topic model.TopicKey, cursor optional.Option[model.TopicEventKey]) (GetNextEventForSubscriptionRow, error) { + row := q.db.QueryRowContext(ctx, getNextEventForSubscription, consumptionDelay, topic, cursor) + var i GetNextEventForSubscriptionRow + err := row.Scan( + &i.Event, + &i.Payload, + &i.CreatedAt, + &i.Caller, + &i.RequestKey, + &i.TraceContext, + &i.Ready, + ) + return i, err +} + +const getRandomSubscriber = `-- name: GetRandomSubscriber :one +SELECT + subscribers.sink as sink, + subscribers.retry_attempts as retry_attempts, + subscribers.backoff as backoff, + subscribers.max_backoff as max_backoff, + subscribers.catch_verb as catch_verb +FROM topic_subscribers as subscribers + JOIN topic_subscriptions ON subscribers.topic_subscriptions_id = topic_subscriptions.id +WHERE topic_subscriptions.key = $1::subscription_key +ORDER BY RANDOM() +LIMIT 1 +` + +type GetRandomSubscriberRow struct { + Sink schema.RefKey + RetryAttempts int32 + Backoff sqltypes.Duration + MaxBackoff sqltypes.Duration + CatchVerb optional.Option[schema.RefKey] +} + +func (q *Queries) GetRandomSubscriber(ctx context.Context, key model.SubscriptionKey) (GetRandomSubscriberRow, error) { + row := q.db.QueryRowContext(ctx, getRandomSubscriber, key) + var i GetRandomSubscriberRow + err := row.Scan( + &i.Sink, + &i.RetryAttempts, + &i.Backoff, + &i.MaxBackoff, + &i.CatchVerb, + ) + return i, err +} + +const getSubscription = `-- name: GetSubscription :one +WITH module AS ( + SELECT id + FROM modules + WHERE name = $2::TEXT +) +SELECT id, key, created_at, topic_id, module_id, deployment_id, name, cursor, state +FROM topic_subscriptions +WHERE name = $1::TEXT + AND module_id = (SELECT id FROM module) +` + +func (q *Queries) GetSubscription(ctx context.Context, column1 string, column2 string) (TopicSubscription, error) { + row := q.db.QueryRowContext(ctx, getSubscription, column1, column2) + var i TopicSubscription + err := row.Scan( + &i.ID, + &i.Key, + &i.CreatedAt, + &i.TopicID, + &i.ModuleID, + &i.DeploymentID, + &i.Name, + &i.Cursor, + &i.State, + ) + return i, err +} + +const getSubscriptionsNeedingUpdate = `-- name: GetSubscriptionsNeedingUpdate :many +WITH runner_count AS ( + SELECT count(r.deployment_id) as runner_count, + r.deployment_id as deployment + FROM runners r + GROUP BY deployment +) +SELECT + subs.key::subscription_key as key, + curser.key as cursor, + topics.key::topic_key as topic, + subs.name +FROM topic_subscriptions subs + JOIN runner_count on subs.deployment_id = runner_count.deployment + LEFT JOIN topics ON subs.topic_id = topics.id + LEFT JOIN topic_events curser ON subs.cursor = curser.id +WHERE subs.cursor IS DISTINCT FROM topics.head + AND subs.state = 'idle' +ORDER BY curser.created_at +LIMIT 3 + FOR UPDATE OF subs SKIP LOCKED +` + +type GetSubscriptionsNeedingUpdateRow struct { + Key model.SubscriptionKey + Cursor optional.Option[model.TopicEventKey] + Topic model.TopicKey + Name string +} + +// Results may not be ready to be scheduled yet due to event consumption delay +// Sorting ensures that brand new events (that may not be ready for consumption) +// don't prevent older events from being consumed +// We also make sure that the subscription belongs to a deployment that has at least one runner +func (q *Queries) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error) { + rows, err := q.db.QueryContext(ctx, getSubscriptionsNeedingUpdate) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetSubscriptionsNeedingUpdateRow + for rows.Next() { + var i GetSubscriptionsNeedingUpdateRow + if err := rows.Scan( + &i.Key, + &i.Cursor, + &i.Topic, + &i.Name, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const getTopic = `-- name: GetTopic :one +SELECT id, key, created_at, module_id, name, type, head +FROM topics +WHERE id = $1::BIGINT +` + +func (q *Queries) GetTopic(ctx context.Context, dollar_1 int64) (Topic, error) { + row := q.db.QueryRowContext(ctx, getTopic, dollar_1) + var i Topic + err := row.Scan( + &i.ID, + &i.Key, + &i.CreatedAt, + &i.ModuleID, + &i.Name, + &i.Type, + &i.Head, + ) + return i, err +} + +const getTopicEvent = `-- name: GetTopicEvent :one +SELECT id, created_at, key, topic_id, payload, caller, request_key, trace_context +FROM topic_events +WHERE id = $1::BIGINT +` + +func (q *Queries) GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent, error) { + row := q.db.QueryRowContext(ctx, getTopicEvent, dollar_1) + var i TopicEvent + err := row.Scan( + &i.ID, + &i.CreatedAt, + &i.Key, + &i.TopicID, + &i.Payload, + &i.Caller, + &i.RequestKey, + &i.TraceContext, + ) + return i, err +} + +const insertSubscriber = `-- name: InsertSubscriber :exec +INSERT INTO topic_subscribers ( + key, + topic_subscriptions_id, + deployment_id, + sink, + retry_attempts, + backoff, + max_backoff, + catch_verb +) +VALUES ( + $1::subscriber_key, + ( + SELECT topic_subscriptions.id as id + FROM topic_subscriptions + INNER JOIN modules ON topic_subscriptions.module_id = modules.id + WHERE modules.name = $2::TEXT + AND topic_subscriptions.name = $3::TEXT + ), + (SELECT id FROM deployments WHERE key = $4::deployment_key), + $5, + $6, + $7::interval, + $8::interval, + $9 + ) +` + +type InsertSubscriberParams struct { + Key model.SubscriberKey + Module string + SubscriptionName string + Deployment model.DeploymentKey + Sink schema.RefKey + RetryAttempts int32 + Backoff sqltypes.Duration + MaxBackoff sqltypes.Duration + CatchVerb optional.Option[schema.RefKey] +} + +func (q *Queries) InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error { + _, err := q.db.ExecContext(ctx, insertSubscriber, + arg.Key, + arg.Module, + arg.SubscriptionName, + arg.Deployment, + arg.Sink, + arg.RetryAttempts, + arg.Backoff, + arg.MaxBackoff, + arg.CatchVerb, + ) + return err +} + +const publishEventForTopic = `-- name: PublishEventForTopic :exec +INSERT INTO topic_events ( + "key", + topic_id, + caller, + payload, + request_key, + trace_context +) +VALUES ( + $1::topic_event_key, + ( + SELECT topics.id + FROM topics + INNER JOIN modules ON topics.module_id = modules.id + WHERE modules.name = $2::TEXT + AND topics.name = $3::TEXT + ), + $4::TEXT, + $5, + $6::TEXT, + $7::jsonb + ) +` + +type PublishEventForTopicParams struct { + Key model.TopicEventKey + Module string + Topic string + Caller string + Payload api.EncryptedAsyncColumn + RequestKey string + TraceContext json.RawMessage +} + +func (q *Queries) PublishEventForTopic(ctx context.Context, arg PublishEventForTopicParams) error { + _, err := q.db.ExecContext(ctx, publishEventForTopic, + arg.Key, + arg.Module, + arg.Topic, + arg.Caller, + arg.Payload, + arg.RequestKey, + arg.TraceContext, + ) + return err +} + +const setSubscriptionCursor = `-- name: SetSubscriptionCursor :exec +WITH event AS ( + SELECT id, created_at, key, topic_id, payload + FROM topic_events + WHERE "key" = $2::topic_event_key +) +UPDATE topic_subscriptions +SET cursor = (SELECT id FROM event) +WHERE key = $1::subscription_key +` + +func (q *Queries) SetSubscriptionCursor(ctx context.Context, column1 model.SubscriptionKey, column2 model.TopicEventKey) error { + _, err := q.db.ExecContext(ctx, setSubscriptionCursor, column1, column2) + return err +} + +const upsertSubscription = `-- name: UpsertSubscription :one +INSERT INTO topic_subscriptions ( + key, + topic_id, + module_id, + deployment_id, + name) +VALUES ( + $1::subscription_key, + ( + SELECT topics.id as id + FROM topics + INNER JOIN modules ON topics.module_id = modules.id + WHERE modules.name = $2::TEXT + AND topics.name = $3::TEXT + ), + (SELECT id FROM modules WHERE name = $4::TEXT), + (SELECT id FROM deployments WHERE key = $5::deployment_key), + $6::TEXT + ) +ON CONFLICT (name, module_id) DO + UPDATE SET + topic_id = excluded.topic_id, + deployment_id = (SELECT id FROM deployments WHERE key = $5::deployment_key) +RETURNING + id, + CASE + WHEN xmax = 0 THEN true + ELSE false + END AS inserted +` + +type UpsertSubscriptionParams struct { + Key model.SubscriptionKey + TopicModule string + TopicName string + Module string + Deployment model.DeploymentKey + Name string +} + +type UpsertSubscriptionRow struct { + ID int64 + Inserted bool +} + +func (q *Queries) UpsertSubscription(ctx context.Context, arg UpsertSubscriptionParams) (UpsertSubscriptionRow, error) { + row := q.db.QueryRowContext(ctx, upsertSubscription, + arg.Key, + arg.TopicModule, + arg.TopicName, + arg.Module, + arg.Deployment, + arg.Name, + ) + var i UpsertSubscriptionRow + err := row.Scan(&i.ID, &i.Inserted) + return i, err +} + +const upsertTopic = `-- name: UpsertTopic :exec +INSERT INTO topics (key, module_id, name, type) +VALUES ( + $1::topic_key, + (SELECT id FROM modules WHERE name = $2::TEXT LIMIT 1), + $3::TEXT, + $4::TEXT + ) +ON CONFLICT (name, module_id) DO + UPDATE SET + type = $4::TEXT +RETURNING id +` + +type UpsertTopicParams struct { + Topic model.TopicKey + Module string + Name string + EventType string +} + +func (q *Queries) UpsertTopic(ctx context.Context, arg UpsertTopicParams) error { + _, err := q.db.ExecContext(ctx, upsertTopic, + arg.Topic, + arg.Module, + arg.Name, + arg.EventType, + ) + return err +} diff --git a/backend/controller/pubsub/manager.go b/backend/controller/pubsub/manager.go deleted file mode 100644 index 9a7e340e33..0000000000 --- a/backend/controller/pubsub/manager.go +++ /dev/null @@ -1,82 +0,0 @@ -package pubsub - -import ( - "context" - "time" - - "github.com/jpillora/backoff" - - "github.com/TBD54566975/ftl/backend/controller/dal" - "github.com/TBD54566975/ftl/backend/controller/scheduledtask" - "github.com/TBD54566975/ftl/internal/log" -) - -const ( - // Events can be added simultaneously, which can cause events with out of order create_at values - // By adding a delay, we ensure that by the time we read the events, no new events will be added - // with earlier created_at values. - eventConsumptionDelay = 200 * time.Millisecond -) - -type DAL interface { - ProgressSubscriptions(ctx context.Context, eventConsumptionDelay time.Duration) (count int, err error) - CompleteEventForSubscription(ctx context.Context, module, name string) error -} - -type Scheduler interface { - Singleton(retry backoff.Backoff, job scheduledtask.Job) - Parallel(retry backoff.Backoff, job scheduledtask.Job) -} - -type AsyncCallListener interface { - AsyncCallWasAdded(ctx context.Context) -} - -type Manager struct { - dal DAL - scheduler Scheduler - asyncCallListener AsyncCallListener -} - -func New(ctx context.Context, dal *dal.DAL, scheduler Scheduler, asyncCallListener AsyncCallListener) *Manager { - m := &Manager{ - dal: dal, - scheduler: scheduler, - asyncCallListener: asyncCallListener, - } - m.scheduler.Parallel(backoff.Backoff{ - Min: 1 * time.Second, - Max: 5 * time.Second, - Jitter: true, - Factor: 1.5, - }, m.progressSubscriptions) - return m -} - -func (m *Manager) progressSubscriptions(ctx context.Context) (time.Duration, error) { - count, err := m.dal.ProgressSubscriptions(ctx, eventConsumptionDelay) - if err != nil { - return 0, err - } - if count > 0 { - // notify controller that we added an async call - m.asyncCallListener.AsyncCallWasAdded(ctx) - } - return time.Second, err -} - -// OnCallCompletion is called within a transaction after an async call has completed to allow the subscription state to be updated. -func (m *Manager) OnCallCompletion(ctx context.Context, tx *dal.DAL, origin dal.AsyncOriginPubSub, failed bool, isFinalResult bool) error { - if !isFinalResult { - // Wait for the async call's retries to complete before progressing the subscription - return nil - } - return m.dal.CompleteEventForSubscription(ctx, origin.Subscription.Module, origin.Subscription.Name) -} - -// AsyncCallDidCommit is called after an subscription's async call has been completed and committed to the database. -func (m *Manager) AsyncCallDidCommit(ctx context.Context, origin dal.AsyncOriginPubSub) { - if _, err := m.progressSubscriptions(ctx); err != nil { - log.FromContext(ctx).Errorf(err, "failed to progress subscriptions") - } -} diff --git a/backend/controller/pubsub/service.go b/backend/controller/pubsub/service.go new file mode 100644 index 0000000000..5c6bc3cd2f --- /dev/null +++ b/backend/controller/pubsub/service.go @@ -0,0 +1,130 @@ +package pubsub + +import ( + "context" + "fmt" + "time" + + "github.com/alecthomas/types/optional" + "github.com/jpillora/backoff" + + "github.com/TBD54566975/ftl/backend/controller/async" + "github.com/TBD54566975/ftl/backend/controller/encryption" + "github.com/TBD54566975/ftl/backend/controller/pubsub/internal/dal" + "github.com/TBD54566975/ftl/backend/controller/scheduledtask" + "github.com/TBD54566975/ftl/backend/libdal" + "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/model" +) + +const ( + // Events can be added simultaneously, which can cause events with out of order create_at values + // By adding a delay, we ensure that by the time we read the events, no new events will be added + // with earlier created_at values. + eventConsumptionDelay = 200 * time.Millisecond +) + +type Scheduler interface { + Singleton(retry backoff.Backoff, job scheduledtask.Job) + Parallel(retry backoff.Backoff, job scheduledtask.Job) +} + +type AsyncCallListener interface { + AsyncCallWasAdded(ctx context.Context) +} + +type Service struct { + dal *dal.DAL + scheduler Scheduler + asyncCallListener optional.Option[AsyncCallListener] +} + +func New(conn libdal.Connection, encryption *encryption.Service, scheduler Scheduler, asyncCallListener optional.Option[AsyncCallListener]) *Service { + m := &Service{ + dal: dal.New(conn, encryption), + scheduler: scheduler, + asyncCallListener: asyncCallListener, + } + m.scheduler.Parallel(backoff.Backoff{ + Min: 1 * time.Second, + Max: 5 * time.Second, + Jitter: true, + Factor: 1.5, + }, m.progressSubscriptions) + return m +} + +func (s *Service) progressSubscriptions(ctx context.Context) (time.Duration, error) { + count, err := s.dal.ProgressSubscriptions(ctx, eventConsumptionDelay) + if err != nil { + return 0, fmt.Errorf("progress subscriptions: %w", err) + } + if count > 0 { + // notify controller that we added an async call + if listener, ok := s.asyncCallListener.Get(); ok { + listener.AsyncCallWasAdded(ctx) + } + } + return time.Second, nil +} + +func (s *Service) PublishEventForTopic(ctx context.Context, module, topic, caller string, payload []byte) error { + err := s.dal.PublishEventForTopic(ctx, module, topic, caller, payload) + if err != nil { + return fmt.Errorf("%s.%s: publish: %w", module, topic, err) + } + return nil +} + +func (s *Service) ResetSubscription(ctx context.Context, module, name string) (err error) { + err = s.dal.ResetSubscription(ctx, module, name) + if err != nil { + return fmt.Errorf("%s.%s: reset: %w", module, name, err) + } + return nil +} + +// OnCallCompletion is called within a transaction after an async call has completed to allow the subscription state to be updated. +func (s *Service) OnCallCompletion(ctx context.Context, tx libdal.Connection, origin async.AsyncOriginPubSub, failed bool, isFinalResult bool) error { + if !isFinalResult { + // Wait for the async call's retries to complete before progressing the subscription + return nil + } + err := s.dal.Adopt(tx).CompleteEventForSubscription(ctx, origin.Subscription.Module, origin.Subscription.Name) + if err != nil { + return fmt.Errorf("%s: complete: %w", origin, err) + } + return nil +} + +// AsyncCallDidCommit is called after a subscription's async call has been completed and committed to the database. +func (s *Service) AsyncCallDidCommit(ctx context.Context, origin async.AsyncOriginPubSub) { + if _, err := s.progressSubscriptions(ctx); err != nil { + log.FromContext(ctx).Errorf(err, "failed to progress subscriptions") + } +} + +func (s *Service) CreateSubscriptions(ctx context.Context, key model.DeploymentKey, module *schema.Module) error { + err := s.dal.CreateSubscriptions(ctx, key, module) + if err != nil { + return fmt.Errorf("create subscriptions: %w", err) + } + return nil +} + +func (s *Service) RemoveSubscriptionsAndSubscribers(ctx context.Context, key model.DeploymentKey) error { + err := s.dal.RemoveSubscriptionsAndSubscribers(ctx, key) + if err != nil { + return fmt.Errorf("remove subscriptions and subscribers: %w", err) + } + return nil +} + +func (s *Service) CreateSubscribers(ctx context.Context, key model.DeploymentKey, module *schema.Module) error { + err := s.dal.CreateSubscribers(ctx, key, module) + if err != nil { + return fmt.Errorf("create subscribers: %w", err) + } + return nil +} diff --git a/backend/controller/scheduledtask/scheduledtask.go b/backend/controller/scheduledtask/scheduledtask.go index 84dd6c8090..2b905888c0 100644 --- a/backend/controller/scheduledtask/scheduledtask.go +++ b/backend/controller/scheduledtask/scheduledtask.go @@ -12,10 +12,10 @@ import ( "time" "github.com/alecthomas/types/optional" - clock "github.com/benbjohnson/clock" + "github.com/benbjohnson/clock" "github.com/jpillora/backoff" - "github.com/TBD54566975/ftl/backend/controller/dal" + dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model" "github.com/TBD54566975/ftl/backend/controller/leases" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" @@ -37,7 +37,7 @@ type descriptor struct { // run. type Job func(ctx context.Context) (time.Duration, error) -type DALFunc func(ctx context.Context, all bool) ([]dal.Controller, error) +type DALFunc func(ctx context.Context, all bool) ([]dalmodel.Controller, error) // Scheduler is a task scheduler for the controller. // diff --git a/backend/controller/scheduledtask/scheduledtask_test.go b/backend/controller/scheduledtask/scheduledtask_test.go index 916b250be7..4e95cb683e 100644 --- a/backend/controller/scheduledtask/scheduledtask_test.go +++ b/backend/controller/scheduledtask/scheduledtask_test.go @@ -10,7 +10,7 @@ import ( "github.com/benbjohnson/clock" "github.com/jpillora/backoff" - "github.com/TBD54566975/ftl/backend/controller/dal" + dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model" "github.com/TBD54566975/ftl/backend/controller/leases" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" @@ -26,15 +26,15 @@ func TestScheduledTask(t *testing.T) { var multiCount atomic.Int64 type controller struct { - controller dal.Controller + controller dalmodel.Controller cron *Scheduler } controllers := []*controller{ - {controller: dal.Controller{Key: model.NewControllerKey("localhost", "8080")}}, - {controller: dal.Controller{Key: model.NewControllerKey("localhost", "8081")}}, - {controller: dal.Controller{Key: model.NewControllerKey("localhost", "8082")}}, - {controller: dal.Controller{Key: model.NewControllerKey("localhost", "8083")}}, + {controller: dalmodel.Controller{Key: model.NewControllerKey("localhost", "8080")}}, + {controller: dalmodel.Controller{Key: model.NewControllerKey("localhost", "8081")}}, + {controller: dalmodel.Controller{Key: model.NewControllerKey("localhost", "8082")}}, + {controller: dalmodel.Controller{Key: model.NewControllerKey("localhost", "8083")}}, } clock := clock.NewMock() diff --git a/backend/controller/sql/sqltypes/sqltypes.go b/backend/controller/sql/sqltypes/sqltypes.go index ad397288f2..03bfd6cda2 100644 --- a/backend/controller/sql/sqltypes/sqltypes.go +++ b/backend/controller/sql/sqltypes/sqltypes.go @@ -6,6 +6,7 @@ import ( "strings" "time" + "github.com/alecthomas/types/optional" "google.golang.org/protobuf/proto" schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" @@ -64,3 +65,5 @@ func (t Type) Value() (driver.Value, error) { } return data, nil } + +type OptionalTime = optional.Option[time.Time] diff --git a/backend/controller/timeline/timeline_test.go b/backend/controller/timeline/timeline_test.go index 92d90de533..346264803b 100644 --- a/backend/controller/timeline/timeline_test.go +++ b/backend/controller/timeline/timeline_test.go @@ -15,7 +15,11 @@ import ( "github.com/alecthomas/types/optional" controllerdal "github.com/TBD54566975/ftl/backend/controller/dal" + dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model" "github.com/TBD54566975/ftl/backend/controller/encryption" + "github.com/TBD54566975/ftl/backend/controller/leases" + "github.com/TBD54566975/ftl/backend/controller/pubsub" + "github.com/TBD54566975/ftl/backend/controller/scheduledtask" "github.com/TBD54566975/ftl/backend/controller/sql/sqltest" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/log" @@ -30,7 +34,9 @@ func TestTimeline(t *testing.T) { assert.NoError(t, err) timeline := New(ctx, conn, encryption) - controllerDAL := controllerdal.New(ctx, conn, encryption) + scheduler := scheduledtask.New(ctx, model.ControllerKey{}, leases.NewFakeLeaser()) + pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]()) + controllerDAL := controllerdal.New(ctx, conn, encryption, pubSub) var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100) @@ -49,7 +55,7 @@ func TestTimeline(t *testing.T) { module := &schema.Module{Name: "test"} var deploymentKey model.DeploymentKey t.Run("CreateDeployment", func(t *testing.T) { - deploymentKey, err = controllerDAL.CreateDeployment(ctx, "go", module, []controllerdal.DeploymentArtefact{{ + deploymentKey, err = controllerDAL.CreateDeployment(ctx, "go", module, []dalmodel.DeploymentArtefact{{ Digest: testSha, Executable: true, Path: "dir/filename", @@ -203,7 +209,9 @@ func TestDeleteOldEvents(t *testing.T) { assert.NoError(t, err) timeline := New(ctx, conn, encryption) - controllerDAL := controllerdal.New(ctx, conn, encryption) + scheduler := scheduledtask.New(ctx, model.ControllerKey{}, leases.NewFakeLeaser()) + pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]()) + controllerDAL := controllerdal.New(ctx, conn, encryption, pubSub) var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100) var testSha sha256.SHA256 @@ -216,7 +224,7 @@ func TestDeleteOldEvents(t *testing.T) { module := &schema.Module{Name: "test"} var deploymentKey model.DeploymentKey t.Run("CreateDeployment", func(t *testing.T) { - deploymentKey, err = controllerDAL.CreateDeployment(ctx, "go", module, []controllerdal.DeploymentArtefact{{ + deploymentKey, err = controllerDAL.CreateDeployment(ctx, "go", module, []dalmodel.DeploymentArtefact{{ Digest: testSha, Executable: true, Path: "dir/filename", diff --git a/backend/libdal/libdal.go b/backend/libdal/libdal.go index e417f778be..82fa65dd5b 100644 --- a/backend/libdal/libdal.go +++ b/backend/libdal/libdal.go @@ -38,6 +38,13 @@ func New[T any](sql Connection, fn MakeWithHandle[T]) *Handle[T] { return &Handle[T]{Connection: sql, Make: fn} } +// Adopt creates a new Handle with the given transaction. +// +// TODO: This needs to be removed - DALs should not be shared. +func (h *Handle[T]) Adopt(tx Connection) *T { + return h.Make(&Handle[T]{Connection: tx, Make: h.Make}) +} + // Begin creates a new transaction or increments the transaction counter if the handle is already in a transaction. // // In all cases a new handle is returned. diff --git a/sqlc.yaml b/sqlc.yaml index 5f40400f85..fc98108e42 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -5,6 +5,7 @@ sql: queries: - backend/controller/dal/internal/sql/queries.sql - backend/controller/dal/internal/sql/async_queries.sql + - backend/controller/pubsub/internal/sql/queries.sql # FIXME: Until we fully decouple cron from the controller, we need to include the cron queries here - backend/controller/cronjobs/internal/sql/queries.sql # Some of the timeline entries happen within a controller transaction, so we need to include them here @@ -38,8 +39,7 @@ sql: go_type: "*github.com/TBD54566975/ftl/backend/schema.Module" - db_type: "timestamptz" nullable: true - go_type: - type: "optional.Option[time.Time]" + go_type: "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes.OptionalTime" - db_type: "pg_catalog.varchar" nullable: true go_type: "github.com/alecthomas/types/optional.Option[string]" @@ -188,6 +188,14 @@ sql: go: <<: *gengo out: "backend/controller/timeline/internal/sql" + - <<: *daldir + queries: + - backend/controller/pubsub/internal/sql/queries.sql + - backend/controller/dal/internal/sql/async_queries.sql + gen: + go: + <<: *gengo + out: "backend/controller/pubsub/internal/sql" rules: - name: postgresql-query-too-costly message: "Query cost estimate is too high"