Skip to content

Commit

Permalink
feat: pubsub db changes
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Jun 4, 2024
1 parent 502cf46 commit 23f2bb3
Show file tree
Hide file tree
Showing 14 changed files with 616 additions and 19 deletions.
9 changes: 9 additions & 0 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/ingress"
"github.com/TBD54566975/ftl/backend/controller/leases"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/scaling"
"github.com/TBD54566975/ftl/backend/controller/scaling/localscaling"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
Expand Down Expand Up @@ -177,6 +178,7 @@ type Service struct {

tasks *scheduledtask.Scheduler
cronJobs *cronjobs.Service
pubSub *pubsub.Manager
controllerListListeners []ControllerListListener

// Map from endpoint to client.
Expand Down Expand Up @@ -223,6 +225,10 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
svc.cronJobs = cronSvc
svc.controllerListListeners = append(svc.controllerListListeners, cronSvc)

pubSub := pubsub.New(ctx, key, db)
svc.pubSub = pubSub
svc.controllerListListeners = append(svc.controllerListListeners, pubSub)

go svc.syncSchema(ctx)

// Use min, max backoff if we are running in production, otherwise use
Expand Down Expand Up @@ -1228,6 +1234,9 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error)
case dal.AsyncOriginFSM:
return s.onAsyncFSMCallCompletion(ctx, tx, origin, failed)

case dal.AsyncOriginPubSub:
return s.pubSub.OnCallCompletion(ctx, tx, origin, failed)

default:
panic(fmt.Errorf("unsupported async call origin: %v", call.Origin))
}
Expand Down
15 changes: 14 additions & 1 deletion backend/controller/dal/async_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type asyncOriginParseRoot struct {
}

var asyncOriginParser = participle.MustBuild[asyncOriginParseRoot](
participle.Union[AsyncOrigin](AsyncOriginFSM{}),
participle.Union[AsyncOrigin](AsyncOriginFSM{}, AsyncOriginPubSub{}),
)

// AsyncOrigin is a sum type representing the originator of an async call.
Expand All @@ -46,6 +46,19 @@ func (AsyncOriginFSM) asyncOrigin() {}
func (a AsyncOriginFSM) Origin() string { return "fsm" }
func (a AsyncOriginFSM) String() string { return fmt.Sprintf("fsm:%s:%s", a.FSM, a.Key) }

// AsyncOriginPubSub represents the context for the originator of an PubSub async call.
//
// It is in the form fsm:<module>.<subscription_name>
type AsyncOriginPubSub struct {
Subscription schema.RefKey `parser:"'sub' ':' @@"`
}

var _ AsyncOrigin = AsyncOriginPubSub{}

func (AsyncOriginPubSub) asyncOrigin() {}
func (a AsyncOriginPubSub) Origin() string { return "sub" }
func (a AsyncOriginPubSub) String() string { return fmt.Sprintf("sub:%s", a.Subscription) }

// ParseAsyncOrigin parses an async origin key.
func ParseAsyncOrigin(origin string) (AsyncOrigin, error) {
root, err := asyncOriginParser.ParseString("", origin)
Expand Down
6 changes: 5 additions & 1 deletion backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,12 +596,16 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem
if !ok {
continue
}
sinkRef := schema.Ref{
Module: moduleSchema.Name,
Name: v.Name,
}.ToRefKey()
err := tx.InsertSubscriber(ctx, sql.InsertSubscriberParams{
Key: model.NewSubscriberKey(moduleSchema.Name, s.Name, v.Name),
Module: moduleSchema.Name,
SubscriptionName: s.Name,
Deployment: deploymentKey,
Sink: v.Name,
Sink: sinkRef,
})
if err != nil {
return model.DeploymentKey{}, fmt.Errorf("could not insert subscriber: %w", translatePGError(err))
Expand Down
1 change: 1 addition & 0 deletions backend/controller/dal/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestLease(t *testing.T) {
dal, err := New(ctx, conn)
assert.NoError(t, err)

// TTL is too short, expect an error
_, err = dal.AcquireLease(ctx, leases.SystemKey("test"), time.Second*1)
assert.Error(t, err)

Expand Down
70 changes: 70 additions & 0 deletions backend/controller/dal/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ package dal

import (
"context"
"fmt"
"time"

"github.com/TBD54566975/ftl/backend/controller/sql"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/alecthomas/types/optional"
)

func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic string, payload []byte) error {
Expand All @@ -19,3 +24,68 @@ func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic string, pa
}
return nil
}

func (d *DAL) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]model.Subscription, error) {
rows, err := d.db.GetSubscriptionsNeedingUpdate(ctx)
if err != nil {
return nil, translatePGError(err)
}
return slices.Map(rows, func(row sql.GetSubscriptionsNeedingUpdateRow) model.Subscription {
return model.Subscription{
Name: row.Name,
Key: row.Key,
Topic: row.Topic,
Cursor: row.Cursor,
}
}), nil
}

func (d *DAL) ProgressSubscription(ctx context.Context, subscription model.Subscription) error {
tx, err := d.Begin(ctx)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.CommitOrRollback(ctx, &err)

nextCursor, err := tx.db.GetNextEventForSubscription(ctx, subscription.Topic, subscription.Cursor)
if err != nil {
return fmt.Errorf("failed to get next cursor: %w", translatePGError(err))
}

result, err := tx.db.LockSubscriptionAndGetSink(ctx, subscription.Key, subscription.Cursor)
if err != nil {
return fmt.Errorf("failed to get lock on subscription: %w", translatePGError(err))
}

err = tx.db.BeginConsumingTopicEvent(ctx, optional.Some(result.SubscriptionID), nextCursor.Event)
if err != nil {
return fmt.Errorf("failed to progress subscription: %w", translatePGError(err))
}

origin := AsyncOriginPubSub{
Subscription: schema.Ref{
Module: subscription.Key.Payload.Module,
Name: subscription.Key.Payload.Name,
}.ToRefKey(),
}
_, err = tx.db.CreateAsyncCall(ctx, sql.CreateAsyncCallParams{
Verb: result.Sink,
Origin: origin.String(),
Request: nextCursor.Payload,
RemainingAttempts: 0,
Backoff: time.Duration(0),
MaxBackoff: time.Duration(0),
})
if err != nil {
return fmt.Errorf("failed to schedule async task for subscription: %w", translatePGError(err))
}
return nil
}

func (d *DAL) CompleteEventForSubscription(ctx context.Context, module, name string) error {
err := d.db.CompleteEventForSubscription(ctx, name, module)
if err != nil {
return fmt.Errorf("failed to complete event for subscription: %w", translatePGError(err))
}
return nil
}
158 changes: 158 additions & 0 deletions backend/controller/pubsub/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package pubsub

import (
"context"
"fmt"
"time"

"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/alecthomas/atomic"
"github.com/serialx/hashring"
)

const (
controllersPerSubscription = 2
)

type Manager struct {
key model.ControllerKey

// TODO: swap out DAL for a smaller interface once we know what funcs we want
dal *dal.DAL
hashRingState atomic.Value[*hashRingState]
}

type hashRingState struct {
hashRing *hashring.HashRing
controllers []dal.Controller
idx int
}

func New(ctx context.Context, key model.ControllerKey, dal *dal.DAL) *Manager {
m := &Manager{
key: key,
dal: dal,
}

go m.watchForUpdates(ctx)
return m
}

func (m *Manager) HandleTopicNotification() {

}

func (m *Manager) HandleEventNotification() {

}

// UpdatedControllerList synchronises the hash ring with the active controllers.
func (m *Manager) UpdatedControllerList(ctx context.Context, controllers []dal.Controller) {
logger := log.FromContext(ctx).Scope("cron")
controllerIdx := -1
for idx, controller := range controllers {
if controller.Key.String() == m.key.String() {
controllerIdx = idx
break
}
}
if controllerIdx == -1 {
logger.Tracef("controller %q not found in list of controllers", m.key)
}

oldState := m.hashRingState.Load()
if oldState != nil && len(oldState.controllers) == len(controllers) {
hasChanged := false
for idx, new := range controllers {
old := oldState.controllers[idx]
if new.Key.String() != old.Key.String() {
hasChanged = true
break
}
}
if !hasChanged {
return
}
}

hashRing := hashring.New(slices.Map(controllers, func(c dal.Controller) string { return c.Key.String() }))
m.hashRingState.Store(&hashRingState{
hashRing: hashRing,
controllers: controllers,
idx: controllerIdx,
})
}

// isResponsibleForSubscription indicates whether a this service should be responsible for attempting jobs,
// or if enough other controllers will handle it. This allows us to spread the job load across controllers.
func (m *Manager) isResponsibleForSubscription(subscription model.Subscription) bool {
hashringState := m.hashRingState.Load()
if hashringState == nil {
return true
}

initialKey, ok := hashringState.hashRing.GetNode(subscription.Key.String())
if !ok {
return true
}

initialIdx := -1
for idx, controller := range hashringState.controllers {
if controller.Key.String() == initialKey {
initialIdx = idx
break
}
}
if initialIdx == -1 {
return true
}

if initialIdx+controllersPerSubscription > len(hashringState.controllers) {
// wraps around
return hashringState.idx >= initialIdx || hashringState.idx < (initialIdx+controllersPerSubscription)-len(hashringState.controllers)
}
return hashringState.idx >= initialIdx && hashringState.idx < initialIdx+controllersPerSubscription
}

func (m *Manager) watchForUpdates(ctx context.Context) {
logger := log.FromContext(ctx).Scope("pubsub")

// TODO: handle events here. Currently a demo implementation
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Second * 3):
if err := m.progressSubscriptions(ctx); err != nil {
logger.Errorf(err, "failed to progress subscriptions")
continue
}
}
}
}

func (m *Manager) progressSubscriptions(ctx context.Context) (err error) {
subscriptions, err := m.dal.GetSubscriptionsNeedingUpdate(ctx)
if err != nil {
return fmt.Errorf("failed to get subscriptions needing update: %w", err)
}
for _, subscription := range subscriptions {
if !m.isResponsibleForSubscription(subscription) {
continue
}
logger := log.FromContext(ctx)

err := m.dal.ProgressSubscription(ctx, subscription)
if err != nil {
logger.Errorf(err, "failed to progress subscription")
}
}
return nil
}

func (m *Manager) OnCallCompletion(ctx context.Context, tx *dal.Tx, origin dal.AsyncOriginPubSub, failed bool) error {
return m.dal.CompleteEventForSubscription(ctx, origin.Subscription.Module, origin.Subscription.Name)
}
Loading

0 comments on commit 23f2bb3

Please sign in to comment.