Skip to content

Commit

Permalink
feat: cron using async
Browse files Browse the repository at this point in the history
  • Loading branch information
safeer committed Aug 16, 2024
1 parent ebb66ad commit aed9777
Show file tree
Hide file tree
Showing 23 changed files with 3,359 additions and 1,424 deletions.
2 changes: 1 addition & 1 deletion Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ init-db:

# Regenerate SQLC code (requires init-db to be run first)
build-sqlc:
@mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} backend/controller/{cronjobs}/sql/{db.go,models.go,querier.go,queries.sql.go} common/configuration/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/{cronjobs}/sql/queries.sql common/configuration/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- "just init-db && sqlc generate"
@mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} backend/controller/cronjobs/sql/{db.go,models.go,querier.go,queries.sql.go} common/configuration/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/cronjobs/sql/queries.sql common/configuration/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- "just init-db && sqlc generate"

# Build the ZIP files that are embedded in the FTL release binaries
build-zips: build-kt-runtime
Expand Down
17 changes: 14 additions & 3 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,8 @@ func New(ctx context.Context, conn *sql.DB, config Config, runnerScaling scaling
svc.routes.Store(map[string][]dal.Route{})
svc.schema.Store(&schema.Schema{})

cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, cronjobs.Config{Timeout: config.CronJobTimeout}, conn, svc.tasks, svc.callWithRequest)
cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, conn)
svc.cronJobs = cronSvc
svc.controllerListListeners = append(svc.controllerListListeners, cronSvc)

pubSub := pubsub.New(ctx, db, svc.tasks, svc)
svc.pubSub = pubSub
Expand Down Expand Up @@ -540,7 +539,7 @@ func (s *Service) ReplaceDeploy(ctx context.Context, c *connect.Request[ftlv1.Re
}
}

s.cronJobs.CreatedOrReplacedDeloyment(ctx, newDeploymentKey)
s.cronJobs.CreatedOrReplacedDeloyment(ctx)

return connect.NewResponse(&ftlv1.ReplaceDeployResponse{}), nil
}
Expand Down Expand Up @@ -1554,6 +1553,9 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call *

func metadataForAsyncCall(call *dal.AsyncCall) *ftlv1.Metadata {
switch origin := call.Origin.(type) {
case dal.AsyncOriginCron:
return &ftlv1.Metadata{}

case dal.AsyncOriginFSM:
return &ftlv1.Metadata{
Values: []*ftlv1.Metadata_Pair{
Expand Down Expand Up @@ -1581,6 +1583,15 @@ func (s *Service) finaliseAsyncCall(ctx context.Context, tx *dal.Tx, call *dal.A

// Allow for handling of completion based on origin
switch origin := call.Origin.(type) {
case dal.AsyncOriginCron:
cjk, err := model.ParseCronJobKey(origin.CronJobKey)
if err != nil {
return fmt.Errorf("failed to parse cron job key: %w", err)
}
if err := s.cronJobs.OnJobCompletion(ctx, cjk, failed); err != nil {
return fmt.Errorf("failed to finalize cron async call: %w", err)
}

case dal.AsyncOriginFSM:
if err := s.onAsyncFSMCallCompletion(ctx, tx, origin, failed, isFinalResult); err != nil {
return fmt.Errorf("failed to finalize FSM async call: %w", err)
Expand Down
Loading

0 comments on commit aed9777

Please sign in to comment.