diff --git a/backend/controller/controller.go b/backend/controller/controller.go
index d1b16b7d2e..39a70fba52 100644
--- a/backend/controller/controller.go
+++ b/backend/controller/controller.go
@@ -36,6 +36,7 @@ import (
 	"github.com/TBD54566975/ftl/backend/controller/dal"
 	"github.com/TBD54566975/ftl/backend/controller/ingress"
 	"github.com/TBD54566975/ftl/backend/controller/leases"
+	"github.com/TBD54566975/ftl/backend/controller/pubsub"
 	"github.com/TBD54566975/ftl/backend/controller/scaling"
 	"github.com/TBD54566975/ftl/backend/controller/scaling/localscaling"
 	"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
@@ -177,6 +178,7 @@ type Service struct {
 	tasks    *scheduledtask.Scheduler
 	cronJobs *cronjobs.Service
+	pubSub   *pubsub.Manager
 	controllerListListeners []ControllerListListener

 	// Map from endpoint to client.
@@ -223,6 +225,10 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
 	svc.cronJobs = cronSvc
 	svc.controllerListListeners = append(svc.controllerListListeners, cronSvc)

+	pubSub := pubsub.New(ctx, key, db)
+	svc.pubSub = pubSub
+	svc.controllerListListeners = append(svc.controllerListListeners, pubSub)
+
 	go svc.syncSchema(ctx)

 	// Use min, max backoff if we are running in production, otherwise use
@@ -1228,6 +1234,9 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error)
 		case dal.AsyncOriginFSM:
 			return s.onAsyncFSMCallCompletion(ctx, tx, origin, failed)

+		case dal.AsyncOriginPubSub:
+			return s.pubSub.OnCallCompletion(ctx, tx, origin, failed)
+
 		default:
 			panic(fmt.Errorf("unsupported async call origin: %v", call.Origin))
 		}
diff --git a/backend/controller/dal/async_calls.go b/backend/controller/dal/async_calls.go
index ddb256be94..2151479e99 100644
--- a/backend/controller/dal/async_calls.go
+++ b/backend/controller/dal/async_calls.go
@@ -19,7 +19,7 @@ type asyncOriginParseRoot struct {
 }

 var asyncOriginParser = participle.MustBuild[asyncOriginParseRoot](
-	participle.Union[AsyncOrigin](AsyncOriginFSM{}),
+	participle.Union[AsyncOrigin](AsyncOriginFSM{}, AsyncOriginPubSub{}),
 )

 // AsyncOrigin is a sum type representing the originator of an async call.
@@ -46,6 +46,19 @@ func (AsyncOriginFSM) asyncOrigin() {}
 func (a AsyncOriginFSM) Origin() string { return "fsm" }
 func (a AsyncOriginFSM) String() string { return fmt.Sprintf("fsm:%s:%s", a.FSM, a.Key) }

+// AsyncOriginPubSub represents the context for the originator of a PubSub async call.
+//
+// It is in the form sub:<subscription>.
+type AsyncOriginPubSub struct {
+	Subscription schema.RefKey `parser:"'sub' ':' @@"`
+}
+
+var _ AsyncOrigin = AsyncOriginPubSub{}
+
+func (AsyncOriginPubSub) asyncOrigin() {}
+func (a AsyncOriginPubSub) Origin() string { return "sub" }
+func (a AsyncOriginPubSub) String() string { return fmt.Sprintf("sub:%s", a.Subscription) }
+
 // ParseAsyncOrigin parses an async origin key.
func ParseAsyncOrigin(origin string) (AsyncOrigin, error) { root, err := asyncOriginParser.ParseString("", origin) diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index e6170ee0f2..31045b0c19 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -596,12 +596,16 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem if !ok { continue } + sinkRef := schema.Ref{ + Module: moduleSchema.Name, + Name: v.Name, + }.ToRefKey() err := tx.InsertSubscriber(ctx, sql.InsertSubscriberParams{ Key: model.NewSubscriberKey(moduleSchema.Name, s.Name, v.Name), Module: moduleSchema.Name, SubscriptionName: s.Name, Deployment: deploymentKey, - Sink: v.Name, + Sink: sinkRef, }) if err != nil { return model.DeploymentKey{}, fmt.Errorf("could not insert subscriber: %w", translatePGError(err)) diff --git a/backend/controller/dal/lease_test.go b/backend/controller/dal/lease_test.go index 8f8474ec76..2be0336f04 100644 --- a/backend/controller/dal/lease_test.go +++ b/backend/controller/dal/lease_test.go @@ -37,6 +37,7 @@ func TestLease(t *testing.T) { dal, err := New(ctx, conn) assert.NoError(t, err) + // TTL is too short, expect an error _, err = dal.AcquireLease(ctx, leases.SystemKey("test"), time.Second*1) assert.Error(t, err) diff --git a/backend/controller/dal/pubsub.go b/backend/controller/dal/pubsub.go index e7336a399d..0d6a79576d 100644 --- a/backend/controller/dal/pubsub.go +++ b/backend/controller/dal/pubsub.go @@ -2,9 +2,14 @@ package dal import ( "context" + "fmt" + "time" "github.com/TBD54566975/ftl/backend/controller/sql" + "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/model" + "github.com/TBD54566975/ftl/internal/slices" + "github.com/alecthomas/types/optional" ) func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic string, payload []byte) error { @@ -19,3 +24,68 @@ func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic string, pa } return nil } + +func (d *DAL) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]model.Subscription, error) { + rows, err := d.db.GetSubscriptionsNeedingUpdate(ctx) + if err != nil { + return nil, translatePGError(err) + } + return slices.Map(rows, func(row sql.GetSubscriptionsNeedingUpdateRow) model.Subscription { + return model.Subscription{ + Name: row.Name, + Key: row.Key, + Topic: row.Topic, + Cursor: row.Cursor, + } + }), nil +} + +func (d *DAL) ProgressSubscription(ctx context.Context, subscription model.Subscription) error { + tx, err := d.Begin(ctx) + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + defer tx.CommitOrRollback(ctx, &err) + + nextCursor, err := tx.db.GetNextEventForSubscription(ctx, subscription.Topic, subscription.Cursor) + if err != nil { + return fmt.Errorf("failed to get next cursor: %w", translatePGError(err)) + } + + result, err := tx.db.LockSubscriptionAndGetSink(ctx, subscription.Key, subscription.Cursor) + if err != nil { + return fmt.Errorf("failed to get lock on subscription: %w", translatePGError(err)) + } + + err = tx.db.BeginConsumingTopicEvent(ctx, optional.Some(result.SubscriptionID), nextCursor.Event) + if err != nil { + return fmt.Errorf("failed to progress subscription: %w", translatePGError(err)) + } + + origin := AsyncOriginPubSub{ + Subscription: schema.Ref{ + Module: subscription.Key.Payload.Module, + Name: subscription.Key.Payload.Name, + }.ToRefKey(), + } + _, err = tx.db.CreateAsyncCall(ctx, sql.CreateAsyncCallParams{ + Verb: result.Sink, + 
Origin:            origin.String(),
+		Request:           nextCursor.Payload,
+		RemainingAttempts: 0,
+		Backoff:           time.Duration(0),
+		MaxBackoff:        time.Duration(0),
+	})
+	if err != nil {
+		return fmt.Errorf("failed to schedule async task for subscription: %w", translatePGError(err))
+	}
+	return nil
+}
+
+func (d *DAL) CompleteEventForSubscription(ctx context.Context, module, name string) error {
+	err := d.db.CompleteEventForSubscription(ctx, name, module)
+	if err != nil {
+		return fmt.Errorf("failed to complete event for subscription: %w", translatePGError(err))
+	}
+	return nil
+}
diff --git a/backend/controller/pubsub/manager.go b/backend/controller/pubsub/manager.go
new file mode 100644
index 0000000000..179e3209ea
--- /dev/null
+++ b/backend/controller/pubsub/manager.go
@@ -0,0 +1,158 @@
+package pubsub
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/TBD54566975/ftl/backend/controller/dal"
+	"github.com/TBD54566975/ftl/internal/log"
+	"github.com/TBD54566975/ftl/internal/model"
+	"github.com/TBD54566975/ftl/internal/slices"
+	"github.com/alecthomas/atomic"
+	"github.com/serialx/hashring"
+)
+
+const (
+	controllersPerSubscription = 2
+)
+
+type Manager struct {
+	key model.ControllerKey
+
+	// TODO: swap out DAL for a smaller interface once we know what funcs we want
+	dal           *dal.DAL
+	hashRingState atomic.Value[*hashRingState]
+}
+
+type hashRingState struct {
+	hashRing    *hashring.HashRing
+	controllers []dal.Controller
+	idx         int
+}
+
+func New(ctx context.Context, key model.ControllerKey, dal *dal.DAL) *Manager {
+	m := &Manager{
+		key: key,
+		dal: dal,
+	}
+
+	go m.watchForUpdates(ctx)
+	return m
+}
+
+func (m *Manager) HandleTopicNotification() {
+
+}
+
+func (m *Manager) HandleEventNotification() {
+
+}
+
+// UpdatedControllerList synchronises the hash ring with the active controllers.
+func (m *Manager) UpdatedControllerList(ctx context.Context, controllers []dal.Controller) {
+	logger := log.FromContext(ctx).Scope("pubsub")
+	controllerIdx := -1
+	for idx, controller := range controllers {
+		if controller.Key.String() == m.key.String() {
+			controllerIdx = idx
+			break
+		}
+	}
+	if controllerIdx == -1 {
+		logger.Tracef("controller %q not found in list of controllers", m.key)
+	}
+
+	oldState := m.hashRingState.Load()
+	if oldState != nil && len(oldState.controllers) == len(controllers) {
+		hasChanged := false
+		for idx, new := range controllers {
+			old := oldState.controllers[idx]
+			if new.Key.String() != old.Key.String() {
+				hasChanged = true
+				break
+			}
+		}
+		if !hasChanged {
+			return
+		}
+	}
+
+	hashRing := hashring.New(slices.Map(controllers, func(c dal.Controller) string { return c.Key.String() }))
+	m.hashRingState.Store(&hashRingState{
+		hashRing:    hashRing,
+		controllers: controllers,
+		idx:         controllerIdx,
+	})
+}
+
+// isResponsibleForSubscription indicates whether this service should be responsible for attempting jobs,
+// or if enough other controllers will handle it. This allows us to spread the job load across controllers.
+func (m *Manager) isResponsibleForSubscription(subscription model.Subscription) bool { + hashringState := m.hashRingState.Load() + if hashringState == nil { + return true + } + + initialKey, ok := hashringState.hashRing.GetNode(subscription.Key.String()) + if !ok { + return true + } + + initialIdx := -1 + for idx, controller := range hashringState.controllers { + if controller.Key.String() == initialKey { + initialIdx = idx + break + } + } + if initialIdx == -1 { + return true + } + + if initialIdx+controllersPerSubscription > len(hashringState.controllers) { + // wraps around + return hashringState.idx >= initialIdx || hashringState.idx < (initialIdx+controllersPerSubscription)-len(hashringState.controllers) + } + return hashringState.idx >= initialIdx && hashringState.idx < initialIdx+controllersPerSubscription +} + +func (m *Manager) watchForUpdates(ctx context.Context) { + logger := log.FromContext(ctx).Scope("pubsub") + + // TODO: handle events here. Currently a demo implementation + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Second * 3): + if err := m.progressSubscriptions(ctx); err != nil { + logger.Errorf(err, "failed to progress subscriptions") + continue + } + } + } +} + +func (m *Manager) progressSubscriptions(ctx context.Context) (err error) { + subscriptions, err := m.dal.GetSubscriptionsNeedingUpdate(ctx) + if err != nil { + return fmt.Errorf("failed to get subscriptions needing update: %w", err) + } + for _, subscription := range subscriptions { + if !m.isResponsibleForSubscription(subscription) { + continue + } + logger := log.FromContext(ctx) + + err := m.dal.ProgressSubscription(ctx, subscription) + if err != nil { + logger.Errorf(err, "failed to progress subscription") + } + } + return nil +} + +func (m *Manager) OnCallCompletion(ctx context.Context, tx *dal.Tx, origin dal.AsyncOriginPubSub, failed bool) error { + return m.dal.CompleteEventForSubscription(ctx, origin.Subscription.Module, origin.Subscription.Name) +} diff --git a/backend/controller/sql/models.go b/backend/controller/sql/models.go index 2bbae0d919..b2b5a5e8c1 100644 --- a/backend/controller/sql/models.go +++ b/backend/controller/sql/models.go @@ -319,6 +319,48 @@ func (ns NullRunnerState) Value() (driver.Value, error) { return string(ns.RunnerState), nil } +type TopicSubscriptionState string + +const ( + TopicSubscriptionStateIdle TopicSubscriptionState = "idle" + TopicSubscriptionStateExecuting TopicSubscriptionState = "executing" +) + +func (e *TopicSubscriptionState) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = TopicSubscriptionState(s) + case string: + *e = TopicSubscriptionState(s) + default: + return fmt.Errorf("unsupported scan type for TopicSubscriptionState: %T", src) + } + return nil +} + +type NullTopicSubscriptionState struct { + TopicSubscriptionState TopicSubscriptionState + Valid bool // Valid is true if TopicSubscriptionState is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullTopicSubscriptionState) Scan(value interface{}) error { + if value == nil { + ns.TopicSubscriptionState, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.TopicSubscriptionState.Scan(value) +} + +// Value implements the driver Valuer interface. 
+func (ns NullTopicSubscriptionState) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.TopicSubscriptionState), nil +} + type Artefact struct { ID int64 CreatedAt time.Time @@ -462,6 +504,7 @@ type Topic struct { ModuleID int64 Name string Type string + Head optional.Option[int64] } type TopicEvent struct { @@ -478,7 +521,7 @@ type TopicSubscriber struct { CreatedAt time.Time TopicSubscriptionsID int64 DeploymentID int64 - Sink string + Sink schema.RefKey } type TopicSubscription struct { @@ -489,4 +532,5 @@ type TopicSubscription struct { ModuleID int64 Name string Cursor optional.Option[int64] + State TopicSubscriptionState } diff --git a/backend/controller/sql/querier.go b/backend/controller/sql/querier.go index 5026e408b6..3671276f8e 100644 --- a/backend/controller/sql/querier.go +++ b/backend/controller/sql/querier.go @@ -20,6 +20,8 @@ type Querier interface { // reservation key. AcquireAsyncCall(ctx context.Context, ttl time.Duration) (AcquireAsyncCallRow, error) AssociateArtefactWithDeployment(ctx context.Context, arg AssociateArtefactWithDeploymentParams) error + BeginConsumingTopicEvent(ctx context.Context, subscriptionID optional.Option[int64], event NullTopicEventKey) error + CompleteEventForSubscription(ctx context.Context, name string, module string) error // Create a new artefact and return the artefact ID. CreateArtefact(ctx context.Context, digest []byte, content []byte) (int64, error) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error) @@ -61,6 +63,7 @@ type Querier interface { GetIngressRoutes(ctx context.Context, method string) ([]GetIngressRoutesRow, error) GetModuleConfiguration(ctx context.Context, module optional.Option[string], name string) ([]byte, error) GetModulesByID(ctx context.Context, ids []int64) ([]Module, error) + GetNextEventForSubscription(ctx context.Context, topic model.TopicKey, cursor NullTopicEventKey) (GetNextEventForSubscriptionRow, error) GetProcessList(ctx context.Context) ([]GetProcessListRow, error) // Retrieve routing information for a runner. 
GetRouteForRunner(ctx context.Context, key model.RunnerKey) (GetRouteForRunnerRow, error) @@ -69,6 +72,7 @@ type Querier interface { GetRunnerState(ctx context.Context, key model.RunnerKey) (RunnerState, error) GetRunnersForDeployment(ctx context.Context, key model.DeploymentKey) ([]GetRunnersForDeploymentRow, error) GetStaleCronJobs(ctx context.Context, dollar_1 time.Duration) ([]GetStaleCronJobsRow, error) + GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error) InsertCallEvent(ctx context.Context, arg InsertCallEventParams) error InsertDeploymentCreatedEvent(ctx context.Context, arg InsertDeploymentCreatedEventParams) error InsertDeploymentUpdatedEvent(ctx context.Context, arg InsertDeploymentUpdatedEventParams) error @@ -80,6 +84,8 @@ type Querier interface { KillStaleRunners(ctx context.Context, timeout time.Duration) (int64, error) ListModuleConfiguration(ctx context.Context) ([]ModuleConfiguration, error) LoadAsyncCall(ctx context.Context, id int64) (AsyncCall, error) + // get a lock on the subscription row, guaranteeing that it is idle and has not consumed more events + LockSubscriptionAndGetSink(ctx context.Context, key model.SubscriptionKey, cursor NullTopicEventKey) (LockSubscriptionAndGetSinkRow, error) NewLease(ctx context.Context, key leases.Key, ttl time.Duration) (uuid.UUID, error) PublishEventForTopic(ctx context.Context, arg PublishEventForTopicParams) error ReleaseLease(ctx context.Context, idempotencyKey uuid.UUID, key leases.Key) (bool, error) diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql index e4fa1dfc90..f0df6f0e60 100644 --- a/backend/controller/sql/queries.sql +++ b/backend/controller/sql/queries.sql @@ -657,7 +657,7 @@ VALUES ( AND topic_subscriptions.name = sqlc.arg('subscription_name')::TEXT ), (SELECT id FROM deployments WHERE key = sqlc.arg('deployment')::deployment_key), - sqlc.arg('sink')::TEXT); + sqlc.arg('sink')); -- name: PublishEventForTopic :exec INSERT INTO topic_events ("key", topic_id, payload) @@ -673,6 +673,79 @@ VALUES ( sqlc.arg('payload') ); +-- name: GetSubscriptionsNeedingUpdate :many +SELECT + subs.key::subscription_key as key, + topic_events.key as cursor, + topics.key::topic_key as topic, + subs.name +FROM topic_subscriptions subs +LEFT JOIN topics ON subs.topic_id = topics.id +LEFT JOIN topic_events ON subs.cursor = topic_events.id +WHERE subs.cursor IS DISTINCT FROM topics.head + AND subs.state = 'idle'; + +-- name: GetNextEventForSubscription :one +WITH cursor AS ( + SELECT + created_at, + id + FROM topic_events + WHERE "key" = sqlc.narg('cursor')::topic_event_key +) +SELECT events."key" as event, + events.payload +FROM topics +LEFT JOIN topic_events as events ON events.topic_id = topics.id +WHERE topics.key = sqlc.arg('topic')::topic_key + AND (events.created_at, events.id) > (SELECT COALESCE(MAX(cursor.created_at), '1900-01-01'), COALESCE(MAX(cursor.id), 0) FROM cursor) +ORDER BY events.created_at, events.id +LIMIT 1; + +-- name: LockSubscriptionAndGetSink :one +WITH subscriber AS ( + -- choose a random subscriber to execute the event + SELECT + subscribers.sink as sink + FROM topic_subscribers as subscribers + JOIN deployments ON subscribers.deployment_id = deployments.id + JOIN topic_subscriptions ON subscribers.topic_subscriptions_id = topic_subscriptions.id + WHERE topic_subscriptions.key = sqlc.arg('key')::subscription_key + AND deployments.min_replicas > 0 + ORDER BY RANDOM() + LIMIT 1 +) +-- get a lock on the subscription row, guaranteeing that it is 
idle and has not consumed more events +SELECT + id as subscription_id, + (SELECT sink FROM subscriber) AS sink +FROM topic_subscriptions +WHERE state = 'idle' + AND key = sqlc.arg('key')::subscription_key + AND cursor IS NOT DISTINCT FROM (SELECT id FROM topic_events WHERE "key" = sqlc.narg('cursor')::topic_event_key) +FOR UPDATE; + +-- name: BeginConsumingTopicEvent :exec +WITH event AS ( + SELECT * + FROM topic_events + WHERE "key" = sqlc.arg('event')::topic_event_key +) +UPDATE topic_subscriptions +SET state = 'executing', + cursor = (SELECT id FROM event) +WHERE id = sqlc.arg('subscription_id'); + +-- name: CompleteEventForSubscription :exec +WITH module AS ( + SELECT id + FROM modules + WHERE name = sqlc.arg('module')::TEXT +) +UPDATE topic_subscriptions +SET state = 'idle' +WHERE name = @name::TEXT; + -- name: GetModuleConfiguration :one SELECT value FROM module_configuration diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index b2e6796d35..bbf4bcae13 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -100,6 +100,39 @@ func (q *Queries) AssociateArtefactWithDeployment(ctx context.Context, arg Assoc return err } +const beginConsumingTopicEvent = `-- name: BeginConsumingTopicEvent :exec +WITH event AS ( + SELECT id, created_at, key, topic_id, payload + FROM topic_events + WHERE "key" = $2::topic_event_key +) +UPDATE topic_subscriptions +SET state = 'executing', + cursor = (SELECT id FROM event) +WHERE id = $1 +` + +func (q *Queries) BeginConsumingTopicEvent(ctx context.Context, subscriptionID optional.Option[int64], event NullTopicEventKey) error { + _, err := q.db.Exec(ctx, beginConsumingTopicEvent, subscriptionID, event) + return err +} + +const completeEventForSubscription = `-- name: CompleteEventForSubscription :exec +WITH module AS ( + SELECT id + FROM modules + WHERE name = $2::TEXT +) +UPDATE topic_subscriptions +SET state = 'idle' +WHERE name = $1::TEXT +` + +func (q *Queries) CompleteEventForSubscription(ctx context.Context, name string, module string) error { + _, err := q.db.Exec(ctx, completeEventForSubscription, name, module) + return err +} + const createArtefact = `-- name: CreateArtefact :one INSERT INTO artefacts (digest, content) VALUES ($1, $2) @@ -1149,6 +1182,36 @@ func (q *Queries) GetModulesByID(ctx context.Context, ids []int64) ([]Module, er return items, nil } +const getNextEventForSubscription = `-- name: GetNextEventForSubscription :one +WITH cursor AS ( + SELECT + created_at, + id + FROM topic_events + WHERE "key" = $2::topic_event_key +) +SELECT events."key" as event, + events.payload +FROM topics +LEFT JOIN topic_events as events ON events.topic_id = topics.id +WHERE topics.key = $1::topic_key + AND (events.created_at, events.id) > (SELECT COALESCE(MAX(cursor.created_at), '1900-01-01'), COALESCE(MAX(cursor.id), 0) FROM cursor) +ORDER BY events.created_at, events.id +LIMIT 1 +` + +type GetNextEventForSubscriptionRow struct { + Event NullTopicEventKey + Payload []byte +} + +func (q *Queries) GetNextEventForSubscription(ctx context.Context, topic model.TopicKey, cursor NullTopicEventKey) (GetNextEventForSubscriptionRow, error) { + row := q.db.QueryRow(ctx, getNextEventForSubscription, topic, cursor) + var i GetNextEventForSubscriptionRow + err := row.Scan(&i.Event, &i.Payload) + return i, err +} + const getProcessList = `-- name: GetProcessList :many SELECT d.min_replicas, d.key AS deployment_key, @@ -1435,6 +1498,51 @@ func (q *Queries) GetStaleCronJobs(ctx 
context.Context, dollar_1 time.Duration) return items, nil } +const getSubscriptionsNeedingUpdate = `-- name: GetSubscriptionsNeedingUpdate :many +SELECT + subs.key::subscription_key as key, + topic_events.key as cursor, + topics.key::topic_key as topic, + subs.name +FROM topic_subscriptions subs +LEFT JOIN topics ON subs.topic_id = topics.id +LEFT JOIN topic_events ON subs.cursor = topic_events.id +WHERE subs.cursor IS DISTINCT FROM topics.head + AND subs.state = 'idle' +` + +type GetSubscriptionsNeedingUpdateRow struct { + Key model.SubscriptionKey + Cursor NullTopicEventKey + Topic model.TopicKey + Name string +} + +func (q *Queries) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error) { + rows, err := q.db.Query(ctx, getSubscriptionsNeedingUpdate) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetSubscriptionsNeedingUpdateRow + for rows.Next() { + var i GetSubscriptionsNeedingUpdateRow + if err := rows.Scan( + &i.Key, + &i.Cursor, + &i.Topic, + &i.Name, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const insertCallEvent = `-- name: InsertCallEvent :exec INSERT INTO events (deployment_id, request_id, time_stamp, type, custom_key_1, custom_key_2, custom_key_3, custom_key_4, payload) @@ -1645,7 +1753,7 @@ VALUES ( AND topic_subscriptions.name = $3::TEXT ), (SELECT id FROM deployments WHERE key = $4::deployment_key), - $5::TEXT) + $5) ` type InsertSubscriberParams struct { @@ -1653,7 +1761,7 @@ type InsertSubscriberParams struct { Module string SubscriptionName string Deployment model.DeploymentKey - Sink string + Sink schema.RefKey } func (q *Queries) InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error { @@ -1762,6 +1870,42 @@ func (q *Queries) LoadAsyncCall(ctx context.Context, id int64) (AsyncCall, error return i, err } +const lockSubscriptionAndGetSink = `-- name: LockSubscriptionAndGetSink :one +WITH subscriber AS ( + -- choose a random subscriber to execute the event + SELECT + subscribers.sink as sink + FROM topic_subscribers as subscribers + JOIN deployments ON subscribers.deployment_id = deployments.id + JOIN topic_subscriptions ON subscribers.topic_subscriptions_id = topic_subscriptions.id + WHERE topic_subscriptions.key = $1::subscription_key + AND deployments.min_replicas > 0 + ORDER BY RANDOM() + LIMIT 1 +) +SELECT + id as subscription_id, + (SELECT sink FROM subscriber) AS sink +FROM topic_subscriptions +WHERE state = 'idle' + AND key = $1::subscription_key + AND cursor IS NOT DISTINCT FROM (SELECT id FROM topic_events WHERE "key" = $2::topic_event_key) +FOR UPDATE +` + +type LockSubscriptionAndGetSinkRow struct { + SubscriptionID int64 + Sink schema.RefKey +} + +// get a lock on the subscription row, guaranteeing that it is idle and has not consumed more events +func (q *Queries) LockSubscriptionAndGetSink(ctx context.Context, key model.SubscriptionKey, cursor NullTopicEventKey) (LockSubscriptionAndGetSinkRow, error) { + row := q.db.QueryRow(ctx, lockSubscriptionAndGetSink, key, cursor) + var i LockSubscriptionAndGetSinkRow + err := row.Scan(&i.SubscriptionID, &i.Sink) + return i, err +} + const newLease = `-- name: NewLease :one INSERT INTO leases (idempotency_key, key, expires_at) VALUES (gen_random_uuid(), $1::lease_key, (NOW() AT TIME ZONE 'utc') + $2::interval) @@ -1776,7 +1920,7 @@ func (q *Queries) NewLease(ctx context.Context, key leases.Key, ttl time.Duratio } const 
publishEventForTopic = `-- name: PublishEventForTopic :exec
-INSERT INTO topic_events (key, topic_id, payload)
+INSERT INTO topic_events ("key", topic_id, payload)
 VALUES (
     $1::topic_event_key,
     (
diff --git a/backend/controller/sql/schema/001_init.sql b/backend/controller/sql/schema/001_init.sql
index d873d31066..dad0b1f6be 100644
--- a/backend/controller/sql/schema/001_init.sql
+++ b/backend/controller/sql/schema/001_init.sql
@@ -313,7 +313,9 @@ CREATE TABLE topics (
     name TEXT NOT NULL,

     -- Data reference to the payload data type in the owning module's schema.
-    type TEXT NOT NULL
+    type TEXT NOT NULL,
+
+    head BIGINT NULL
 );

 CREATE UNIQUE INDEX topics_module_name_idx ON topics(module_id, name);
@@ -342,8 +344,32 @@ CREATE TRIGGER topic_events_notify_event
     FOR EACH ROW
 EXECUTE PROCEDURE notify_event();

+-- Automatically update the owning topic's head whenever a topic event is inserted or updated.
+CREATE OR REPLACE FUNCTION topics_update_head() RETURNS TRIGGER AS
+$$
+BEGIN
+    UPDATE topics
+    SET head = NEW.id
+    WHERE id = NEW.topic_id;
+    RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+CREATE TRIGGER topics_update_head
+    BEFORE INSERT OR UPDATE
+    ON topic_events
+    FOR EACH ROW
+EXECUTE PROCEDURE topics_update_head();
+
 CREATE DOMAIN subscription_key AS TEXT;

+CREATE TYPE topic_subscription_state AS ENUM (
+    -- The subscription is available to consume events.
+    'idle',
+    -- The subscription is currently handling an event.
+    'executing'
+);
+
 -- A subscription to a topic.
 --
 -- Multiple subscribers can consume from a single subscription.
@@ -361,7 +387,12 @@ CREATE TABLE topic_subscriptions (
     name TEXT UNIQUE NOT NULL,

     -- Cursor pointing into the topic_events table.
-    cursor BIGINT REFERENCES topic_events(id) ON DELETE CASCADE
+    cursor BIGINT REFERENCES topic_events(id) ON DELETE CASCADE,
+
+    -- When a subscription is handling an event, lease_id and the active event are filled in.
+    -- If a lease expires, the active event will still be set, indicating an async call has already
+    -- been scheduled; this state must be handled separately.
+    state topic_subscription_state NOT NULL DEFAULT 'idle'
 );

 CREATE UNIQUE INDEX topic_subscriptions_module_name_idx ON topic_subscriptions(module_id, name);
@@ -380,7 +411,7 @@ CREATE TABLE topic_subscribers (
     deployment_id BIGINT NOT NULL REFERENCES deployments(id) ON DELETE CASCADE,

     -- Name of the verb to call on the deployment.
- sink TEXT NOT NULL + sink schema_ref NOT NULL ); CREATE DOMAIN lease_key AS TEXT; @@ -465,4 +496,4 @@ CREATE TABLE module_configuration UNIQUE (module, name) ); --- migrate:down +-- migrate:down \ No newline at end of file diff --git a/backend/controller/sql/types.go b/backend/controller/sql/types.go index 432236aca2..1c53616c66 100644 --- a/backend/controller/sql/types.go +++ b/backend/controller/sql/types.go @@ -26,6 +26,10 @@ type NullRunnerKey = optional.Option[model.RunnerKey] type NullCronJobKey = optional.Option[model.CronJobKey] type NullDeploymentKey = optional.Option[model.DeploymentKey] type NullRequestKey = optional.Option[model.RequestKey] +type NullTopicKey = optional.Option[model.TopicKey] +type NullSubscriptionKey = optional.Option[model.SubscriptionKey] +type NullSubscriberKey = optional.Option[model.SubscriberKey] +type NullTopicEventKey = optional.Option[model.TopicEventKey] var _ sql.Scanner = (*NullRunnerKey)(nil) var _ driver.Valuer = (*NullRunnerKey)(nil) @@ -39,6 +43,18 @@ var _ driver.Valuer = (*NullDeploymentKey)(nil) var _ sql.Scanner = (*NullRequestKey)(nil) var _ driver.Valuer = (*NullRequestKey)(nil) +var _ sql.Scanner = (*NullTopicKey)(nil) +var _ driver.Valuer = (*NullTopicKey)(nil) + +var _ sql.Scanner = (*NullSubscriptionKey)(nil) +var _ driver.Valuer = (*NullSubscriptionKey)(nil) + +var _ sql.Scanner = (*NullSubscriberKey)(nil) +var _ driver.Valuer = (*NullSubscriberKey)(nil) + +var _ sql.Scanner = (*NullTopicEventKey)(nil) +var _ driver.Valuer = (*NullTopicEventKey)(nil) + // Type is a database adapter type for schema.Type. // // It encodes to/from the protobuf representation of a Type. diff --git a/internal/model/pubsub.go b/internal/model/pubsub.go new file mode 100644 index 0000000000..958abda54e --- /dev/null +++ b/internal/model/pubsub.go @@ -0,0 +1,12 @@ +package model + +import ( + "github.com/alecthomas/types/optional" +) + +type Subscription struct { + Name string + Key SubscriptionKey + Topic TopicKey + Cursor optional.Option[TopicEventKey] +} diff --git a/sqlc.yaml b/sqlc.yaml index 354fce95ef..73c0214201 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -83,6 +83,30 @@ sql: nullable: true go_type: type: "NullRequestKey" + - db_type: "topic_key" + go_type: "github.com/TBD54566975/ftl/internal/model.TopicKey" + - db_type: "topic_key" + nullable: true + go_type: + type: "NullTopicKey" + - db_type: "subscription_key" + go_type: "github.com/TBD54566975/ftl/internal/model.SubscriptionKey" + - db_type: "subscription_key" + nullable: true + go_type: + type: "NullSubscriptionKey" + - db_type: "subscriber_key" + go_type: "github.com/TBD54566975/ftl/internal/model.SubscriberKey" + - db_type: "subscriber_key" + nullable: true + go_type: + type: "NullSubscriberKey" + - db_type: "topic_event_key" + go_type: "github.com/TBD54566975/ftl/internal/model.TopicEventKey" + - db_type: "topic_event_key" + nullable: true + go_type: + type: "NullTopicEventKey" - db_type: "text" go_type: "string" - db_type: "text" @@ -109,14 +133,6 @@ sql: go_type: "github.com/TBD54566975/ftl/internal/model.DeploymentKey" - column: "events.payload" go_type: "encoding/json.RawMessage" - - db_type: "topic_key" - go_type: "github.com/TBD54566975/ftl/internal/model.TopicKey" - - db_type: "subscription_key" - go_type: "github.com/TBD54566975/ftl/internal/model.SubscriptionKey" - - db_type: "subscriber_key" - go_type: "github.com/TBD54566975/ftl/internal/model.SubscriberKey" - - db_type: "topic_event_key" - go_type: "github.com/TBD54566975/ftl/internal/model.TopicEventKey" rules: - 
sqlc/db-prepare # - postgresql-query-too-costly
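
The wrap-around "responsibility window" in isResponsibleForSubscription (pubsub/manager.go above) is the subtlest part of the patch. Below is a minimal standalone sketch of that check, not part of the patch itself: the function name inWindow and the four-controller example are invented for illustration, while the constant and the index arithmetic mirror the manager code, assuming initialIdx is the hash-ring node's position in the controller list and idx is this controller's own position.

package main

import "fmt"

const controllersPerSubscription = 2

// inWindow reports whether the controller at index idx falls inside the window of
// controllersPerSubscription controllers starting at initialIdx, wrapping around the
// end of a controller list of length n.
func inWindow(idx, initialIdx, n int) bool {
	if initialIdx+controllersPerSubscription > n {
		// The window wraps past the end of the list back to index 0.
		return idx >= initialIdx || idx < (initialIdx+controllersPerSubscription)-n
	}
	return idx >= initialIdx && idx < initialIdx+controllersPerSubscription
}

func main() {
	// Four controllers; the hash ring maps a subscription to the controller at index 3,
	// so indices 3 and 0 (wrapping around) are responsible for progressing it.
	n := 4
	for idx := 0; idx < n; idx++ {
		fmt.Printf("controller %d responsible: %v\n", idx, inWindow(idx, 3, n))
	}
	// Output:
	// controller 0 responsible: true
	// controller 1 responsible: false
	// controller 2 responsible: false
	// controller 3 responsible: true
}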