Skip to content

Commit

Permalink
refactor: pull pubsub DAL/SQL out of controller (#2798)
Browse files Browse the repository at this point in the history
This introduces quite a lot of constructor boilerplate in a lot of
places which I hope will become cleaner as more and more of the
subsystems are decoupled.

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
alecthomas and github-actions[bot] authored Sep 24, 2024
1 parent f94cbb1 commit a38cf8c
Show file tree
Hide file tree
Showing 38 changed files with 1,768 additions and 805 deletions.
85 changes: 85 additions & 0 deletions backend/controller/async/async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package async

import (
"fmt"

"github.com/alecthomas/participle/v2"
"github.com/alecthomas/participle/v2/lexer"

"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/model"
)

type asyncOriginParseRoot struct {
Key AsyncOrigin `parser:"@@"`
}

var asyncOriginLexer = lexer.MustSimple([]lexer.SimpleRule{
{"NumberIdent", `[0-9][a-zA-Z0-9_-]*`},
{"Ident", `[a-zA-Z_][a-zA-Z0-9_-]*`},
{"Punct", `[:.]`},
})

var asyncOriginParser = participle.MustBuild[asyncOriginParseRoot](
participle.Union[AsyncOrigin](AsyncOriginCron{}, AsyncOriginFSM{}, AsyncOriginPubSub{}),
participle.Lexer(asyncOriginLexer),
)

// AsyncOrigin is a sum type representing the originator of an async call.
//
// This is used to determine how to handle the result of the async call.
type AsyncOrigin interface {
asyncOrigin()
// Origin returns the origin type.
Origin() string
String() string
}

// AsyncOriginCron represents the context for the originator of a cron async call.
//
// It is in the form cron:<module>.<verb>
type AsyncOriginCron struct {
CronJobKey model.CronJobKey `parser:"'cron' ':' @(~EOF)+"`
}

var _ AsyncOrigin = AsyncOriginCron{}

func (AsyncOriginCron) asyncOrigin() {}
func (a AsyncOriginCron) Origin() string { return "cron" }
func (a AsyncOriginCron) String() string { return fmt.Sprintf("cron:%s", a.CronJobKey) }

// AsyncOriginFSM represents the context for the originator of an FSM async call.
//
// It is in the form fsm:<module>.<name>:<key>
type AsyncOriginFSM struct {
FSM schema.RefKey `parser:"'fsm' ':' @@"`
Key string `parser:"':' @(~EOF)+"`
}

var _ AsyncOrigin = AsyncOriginFSM{}

func (AsyncOriginFSM) asyncOrigin() {}
func (a AsyncOriginFSM) Origin() string { return "fsm" }
func (a AsyncOriginFSM) String() string { return fmt.Sprintf("fsm:%s:%s", a.FSM, a.Key) }

// AsyncOriginPubSub represents the context for the originator of an PubSub async call.
//
// It is in the form fsm:<module>.<subscription_name>
type AsyncOriginPubSub struct {
Subscription schema.RefKey `parser:"'sub' ':' @@"`
}

var _ AsyncOrigin = AsyncOriginPubSub{}

func (AsyncOriginPubSub) asyncOrigin() {}
func (a AsyncOriginPubSub) Origin() string { return "sub" }
func (a AsyncOriginPubSub) String() string { return fmt.Sprintf("sub:%s", a.Subscription) }

// ParseAsyncOrigin parses an async origin key.
func ParseAsyncOrigin(origin string) (AsyncOrigin, error) {
root, err := asyncOriginParser.ParseString("", origin)
if err != nil {
return nil, fmt.Errorf("failed to parse async origin: %w", err)
}
return root.Key, nil
}
3 changes: 2 additions & 1 deletion backend/controller/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/TBD54566975/ftl/backend/controller/dal"
dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model"
"github.com/TBD54566975/ftl/backend/controller/timeline"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
pbconsole "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/console"
Expand Down Expand Up @@ -85,7 +86,7 @@ func (c *ConsoleService) GetModules(ctx context.Context, req *connect.Request[pb
}

sch := &schema.Schema{
Modules: slices.Map(deployments, func(d dal.Deployment) *schema.Module {
Modules: slices.Map(deployments, func(d dalmodel.Deployment) *schema.Module {
return d.Schema
}),
}
Expand Down
58 changes: 31 additions & 27 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ import (

"github.com/TBD54566975/ftl"
"github.com/TBD54566975/ftl/backend/controller/admin"
"github.com/TBD54566975/ftl/backend/controller/async"
"github.com/TBD54566975/ftl/backend/controller/console"
"github.com/TBD54566975/ftl/backend/controller/cronjobs"
"github.com/TBD54566975/ftl/backend/controller/dal"
dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model"
"github.com/TBD54566975/ftl/backend/controller/encryption"
"github.com/TBD54566975/ftl/backend/controller/ingress"
"github.com/TBD54566975/ftl/backend/controller/leases"
Expand Down Expand Up @@ -207,7 +209,7 @@ type clients struct {
// ControllerListListener is regularly notified of the current list of controllers
// This is often used to update a hash ring to distribute work.
type ControllerListListener interface {
UpdatedControllerList(ctx context.Context, controllers []dal.Controller)
UpdatedControllerList(ctx context.Context, controllers []dalmodel.Controller)
}

type Service struct {
Expand All @@ -219,7 +221,7 @@ type Service struct {

tasks *scheduledtask.Scheduler
cronJobs *cronjobs.Service
pubSub *pubsub.Manager
pubSub *pubsub.Service
timeline *timeline.Service
controllerListListeners []ControllerListListener

Expand Down Expand Up @@ -255,11 +257,11 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca
return nil, fmt.Errorf("failed to create encryption dal: %w", err)
}

db := dal.New(ctx, conn, encryption)
ldb := dbleaser.NewDatabaseLeaser(conn)
scheduler := scheduledtask.New(ctx, key, ldb)

svc := &Service{
tasks: scheduledtask.New(ctx, key, ldb),
dal: db,
tasks: scheduler,
dbleaser: ldb,
conn: conn,
key: key,
Expand All @@ -274,9 +276,11 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca
cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryption, conn)
svc.cronJobs = cronSvc

pubSub := pubsub.New(ctx, db, svc.tasks, svc)
pubSub := pubsub.New(conn, encryption, svc.tasks, optional.Some[pubsub.AsyncCallListener](svc))
svc.pubSub = pubSub

svc.dal = dal.New(ctx, conn, encryption, pubSub)

timelineSvc := timeline.New(ctx, conn, encryption)
svc.timeline = timelineSvc

Expand Down Expand Up @@ -397,7 +401,7 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR
}
})
replicas := map[string]int32{}
protoRunners, err := slices.MapErr(status.Runners, func(r dal.Runner) (*ftlv1.StatusResponse_Runner, error) {
protoRunners, err := slices.MapErr(status.Runners, func(r dalmodel.Runner) (*ftlv1.StatusResponse_Runner, error) {
asString := r.Deployment.String()
deployment := &asString
replicas[asString]++
Expand All @@ -415,7 +419,7 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR
if err != nil {
return nil, err
}
deployments, err := slices.MapErr(status.Deployments, func(d dal.Deployment) (*ftlv1.StatusResponse_Deployment, error) {
deployments, err := slices.MapErr(status.Deployments, func(d dalmodel.Deployment) (*ftlv1.StatusResponse_Deployment, error) {
labels, err := structpb.NewStruct(d.Labels)
if err != nil {
return nil, fmt.Errorf("could not marshal attributes for deployment %s: %w", d.Key.String(), err)
Expand All @@ -434,7 +438,7 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR
return nil, err
}
resp := &ftlv1.StatusResponse{
Controllers: slices.Map(status.Controllers, func(c dal.Controller) *ftlv1.StatusResponse_Controller {
Controllers: slices.Map(status.Controllers, func(c dalmodel.Controller) *ftlv1.StatusResponse_Controller {
return &ftlv1.StatusResponse_Controller{
Key: c.Key.String(),
Endpoint: c.Endpoint,
Expand All @@ -443,7 +447,7 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR
}),
Runners: protoRunners,
Deployments: deployments,
IngressRoutes: slices.Map(status.IngressRoutes, func(r dal.IngressRouteEntry) *ftlv1.StatusResponse_IngressRoute {
IngressRoutes: slices.Map(status.IngressRoutes, func(r dalmodel.IngressRouteEntry) *ftlv1.StatusResponse_IngressRoute {
return &ftlv1.StatusResponse_IngressRoute{
DeploymentKey: r.Deployment.String(),
Verb: &schemapb.Ref{Module: r.Module, Name: r.Verb},
Expand Down Expand Up @@ -596,7 +600,7 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}
err = s.dal.UpsertRunner(ctx, dal.Runner{
err = s.dal.UpsertRunner(ctx, dalmodel.Runner{
Key: runnerKey,
Endpoint: msg.Endpoint,
Deployment: deploymentKey,
Expand Down Expand Up @@ -959,7 +963,7 @@ func (s *Service) SetNextFSMEvent(ctx context.Context, req *connect.Request[ftlv

func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[ftlv1.PublishEventRequest]) (*connect.Response[ftlv1.PublishEventResponse], error) {
// Publish the event.
err := s.dal.PublishEventForTopic(ctx, req.Msg.Topic.Module, req.Msg.Topic.Name, req.Msg.Caller, req.Msg.Body)
err := s.pubSub.PublishEventForTopic(ctx, req.Msg.Topic.Module, req.Msg.Topic.Name, req.Msg.Caller, req.Msg.Body)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to publish a event to topic %s:%s: %w", req.Msg.Topic.Module, req.Msg.Topic.Name, err))
}
Expand Down Expand Up @@ -1117,14 +1121,14 @@ func (s *Service) UploadArtefact(ctx context.Context, req *connect.Request[ftlv1
func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftlv1.CreateDeploymentRequest]) (*connect.Response[ftlv1.CreateDeploymentResponse], error) {
logger := log.FromContext(ctx)

artefacts := make([]dal.DeploymentArtefact, len(req.Msg.Artefacts))
artefacts := make([]dalmodel.DeploymentArtefact, len(req.Msg.Artefacts))
for i, artefact := range req.Msg.Artefacts {
digest, err := sha256.ParseSHA256(artefact.Digest)
if err != nil {
logger.Errorf(err, "Invalid digest %s", artefact.Digest)
return nil, fmt.Errorf("invalid digest: %w", err)
}
artefacts[i] = dal.DeploymentArtefact{
artefacts[i] = dalmodel.DeploymentArtefact{
Executable: artefact.Executable,
Path: artefact.Path,
Digest: digest,
Expand Down Expand Up @@ -1167,7 +1171,7 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl
}

func (s *Service) ResetSubscription(ctx context.Context, req *connect.Request[ftlv1.ResetSubscriptionRequest]) (*connect.Response[ftlv1.ResetSubscriptionResponse], error) {
err := s.dal.ResetSubscription(ctx, req.Msg.Subscription.Module, req.Msg.Subscription.Name)
err := s.pubSub.ResetSubscription(ctx, req.Msg.Subscription.Module, req.Msg.Subscription.Name)
if err != nil {
return nil, fmt.Errorf("could not reset subscription: %w", err)
}
Expand Down Expand Up @@ -1289,13 +1293,13 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration
if returnErr == nil {
// Post-commit notification based on origin
switch origin := call.Origin.(type) {
case dal.AsyncOriginCron:
case async.AsyncOriginCron:
break

case dal.AsyncOriginFSM:
case async.AsyncOriginFSM:
break

case dal.AsyncOriginPubSub:
case async.AsyncOriginPubSub:
go s.pubSub.AsyncCallDidCommit(originalCtx, origin)

default:
Expand Down Expand Up @@ -1460,10 +1464,10 @@ func (s *Service) reapAsyncCalls(ctx context.Context) (nextInterval time.Duratio

func metadataForAsyncCall(call *dal.AsyncCall) *ftlv1.Metadata {
switch origin := call.Origin.(type) {
case dal.AsyncOriginCron:
case async.AsyncOriginCron:
return &ftlv1.Metadata{}

case dal.AsyncOriginFSM:
case async.AsyncOriginFSM:
return &ftlv1.Metadata{
Values: []*ftlv1.Metadata_Pair{
{
Expand All @@ -1477,7 +1481,7 @@ func metadataForAsyncCall(call *dal.AsyncCall) *ftlv1.Metadata {
},
}

case dal.AsyncOriginPubSub:
case async.AsyncOriginPubSub:
return &ftlv1.Metadata{}

default:
Expand All @@ -1490,18 +1494,18 @@ func (s *Service) finaliseAsyncCall(ctx context.Context, tx *dal.DAL, call *dal.

// Allow for handling of completion based on origin
switch origin := call.Origin.(type) {
case dal.AsyncOriginCron:
case async.AsyncOriginCron:
if err := s.cronJobs.OnJobCompletion(ctx, origin.CronJobKey, failed); err != nil {
return fmt.Errorf("failed to finalize cron async call: %w", err)
}

case dal.AsyncOriginFSM:
case async.AsyncOriginFSM:
if err := s.onAsyncFSMCallCompletion(ctx, tx, origin, failed, isFinalResult); err != nil {
return fmt.Errorf("failed to finalize FSM async call: %w", err)
}

case dal.AsyncOriginPubSub:
if err := s.pubSub.OnCallCompletion(ctx, tx, origin, failed, isFinalResult); err != nil {
case async.AsyncOriginPubSub:
if err := s.pubSub.OnCallCompletion(ctx, tx.Connection, origin, failed, isFinalResult); err != nil {
return fmt.Errorf("failed to finalize pubsub async call: %w", err)
}

Expand All @@ -1511,7 +1515,7 @@ func (s *Service) finaliseAsyncCall(ctx context.Context, tx *dal.DAL, call *dal.
return nil
}

func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.DAL, origin dal.AsyncOriginFSM, failed bool, isFinalResult bool) error {
func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.DAL, origin async.AsyncOriginFSM, failed bool, isFinalResult bool) error {
logger := log.FromContext(ctx).Scope(origin.FSM.String())

// retrieve the next fsm event and delete it
Expand Down Expand Up @@ -1757,7 +1761,7 @@ func (s *Service) getDeploymentLogger(ctx context.Context, deploymentKey model.D
func (s *Service) syncRoutes(ctx context.Context) (ret time.Duration, err error) {
deployments, err := s.dal.GetActiveDeployments(ctx)
if errors.Is(err, libdal.ErrNotFound) {
deployments = []dal.Deployment{}
deployments = []dalmodel.Deployment{}
} else if err != nil {
return 0, err
}
Expand Down
4 changes: 2 additions & 2 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (

"github.com/benbjohnson/clock"

"github.com/TBD54566975/ftl/backend/controller/async"
"github.com/TBD54566975/ftl/backend/controller/cronjobs/internal/dal"
parentdal "github.com/TBD54566975/ftl/backend/controller/dal"
encryptionsvc "github.com/TBD54566975/ftl/backend/controller/encryption"
"github.com/TBD54566975/ftl/backend/controller/encryption/api"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
Expand Down Expand Up @@ -177,7 +177,7 @@ func (s *Service) scheduleCronJob(ctx context.Context, tx *dal.DAL, job model.Cr
}

logger.Tracef("Scheduling cron job %q async_call execution at %s", job.Key, nextAttemptForJob)
origin := &parentdal.AsyncOriginCron{CronJobKey: job.Key}
origin := &async.AsyncOriginCron{CronJobKey: job.Key}
var request api.EncryptedColumn[api.AsyncSubKey]
err = s.encryption.Encrypt([]byte(`{}`), &request)
if err != nil {
Expand Down
13 changes: 10 additions & 3 deletions backend/controller/cronjobs/cronjobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@ import (
"github.com/alecthomas/types/optional"
"github.com/benbjohnson/clock"

"github.com/TBD54566975/ftl/backend/controller/async"
"github.com/TBD54566975/ftl/backend/controller/cronjobs/internal/dal"
parentdal "github.com/TBD54566975/ftl/backend/controller/dal"
dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model"
"github.com/TBD54566975/ftl/backend/controller/encryption"
"github.com/TBD54566975/ftl/backend/controller/leases"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/backend/schema"
Expand All @@ -39,13 +44,15 @@ func TestNewCronJobsForModule(t *testing.T) {
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder().WithKMSURI(optional.Some(uri)))
assert.NoError(t, err)

parentDAL := parentdal.New(ctx, conn, encryption)
scheduler := scheduledtask.New(ctx, key, leases.NewFakeLeaser())
pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]())
parentDAL := parentdal.New(ctx, conn, encryption, pubSub)
moduleName := "initial"
jobsToCreate := newCronJobs(t, moduleName, "* * * * * *", clk, 2) // every minute

deploymentKey, err := parentDAL.CreateDeployment(ctx, "go", &schema.Module{
Name: moduleName,
}, []parentdal.DeploymentArtefact{}, []parentdal.IngressRoutingEntry{}, jobsToCreate)
}, []dalmodel.DeploymentArtefact{}, []parentdal.IngressRoutingEntry{}, jobsToCreate)
assert.NoError(t, err)
err = parentDAL.ReplaceDeployment(ctx, deploymentKey, 1)
assert.NoError(t, err)
Expand Down Expand Up @@ -107,7 +114,7 @@ func TestNewCronJobsForModule(t *testing.T) {
expectUnscheduledJobs(t, dal, clk, 2)
// Use the completion handler to schedule the next execution
for _, call := range calls {
origin, ok := call.Origin.(parentdal.AsyncOriginCron)
origin, ok := call.Origin.(async.AsyncOriginCron)
assert.True(t, ok)
err = cjs.OnJobCompletion(ctx, origin.CronJobKey, false)
assert.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion backend/controller/cronjobs/internal/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a38cf8c

Please sign in to comment.