diff --git a/backend/controller/controller.go b/backend/controller/controller.go
index 00bd256bb9..f31f4430a0 100644
--- a/backend/controller/controller.go
+++ b/backend/controller/controller.go
@@ -40,6 +40,7 @@ import (
 	"github.com/TBD54566975/ftl/backend/controller/dal"
 	"github.com/TBD54566975/ftl/backend/controller/ingress"
 	"github.com/TBD54566975/ftl/backend/controller/scaling"
+	"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
 	"github.com/TBD54566975/ftl/backend/schema"
 	frontend "github.com/TBD54566975/ftl/frontend"
 	ftlv1 "github.com/TBD54566975/ftl/protos/xyz/block/ftl/v1"
@@ -127,6 +128,8 @@ type Service struct {
 	key                model.ControllerKey
 	deploymentLogsSink *deploymentLogsSink
 
+	tasks *scheduledtask.Scheduler
+
 	// Map from endpoint to client.
 	clients *ttlcache.Cache[string, clients]
 
@@ -143,6 +146,7 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
 	}
 	config.SetDefaults()
 	svc := &Service{
+		tasks:              scheduledtask.New(ctx, key, db.GetControllers),
 		dal:                db,
 		key:                key,
 		deploymentLogsSink: newDeploymentLogsSink(ctx, db),
@@ -152,13 +156,16 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
 		runnerScaling:      runnerScaling,
 	}
 
-	go runWithRetries(ctx, time.Second*1, time.Second*2, svc.syncRoutes)
-	go runWithRetries(ctx, time.Second*3, time.Second*5, svc.heartbeatController)
-	go runWithRetries(ctx, time.Second*10, time.Second*20, svc.reapStaleControllers)
-	go runWithRetries(ctx, config.RunnerTimeout, time.Second*10, svc.reapStaleRunners)
-	go runWithRetries(ctx, config.DeploymentReservationTimeout, time.Second*20, svc.releaseExpiredReservations)
-	go runWithRetries(ctx, time.Second*1, time.Second*5, svc.reconcileDeployments)
-	go runWithRetries(ctx, time.Second*1, time.Second*5, svc.reconcileRunners)
+	svc.tasks.Parallel(backoff.Backoff{Min: time.Second, Max: time.Second * 5}, svc.syncRoutes)
+	svc.tasks.Parallel(backoff.Backoff{Min: time.Second * 3, Max: time.Second * 3}, svc.heartbeatController)
+	svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 10}, svc.reapStaleRunners)
+	svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 20}, svc.releaseExpiredReservations)
+	svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 5}, svc.reconcileDeployments)
+	svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 5}, svc.reconcileRunners)
+	// This should only run on one controller, but because dead controllers
+	// might be selected by the hash ring, we have to run it on all controllers.
+	// We should use a DB lock at some point.
+	svc.tasks.Parallel(backoff.Backoff{Min: time.Second, Max: time.Second * 20}, svc.reapStaleControllers)
 
 	return svc, nil
 }
@@ -790,32 +797,35 @@ func (s *Service) clientsForEndpoint(endpoint string) clients {
 	return client
 }
 
-func (s *Service) reapStaleRunners(ctx context.Context) error {
+func (s *Service) reapStaleRunners(ctx context.Context) (time.Duration, error) {
 	logger := log.FromContext(ctx)
 	count, err := s.dal.KillStaleRunners(context.Background(), s.config.RunnerTimeout)
 	if err != nil {
-		return fmt.Errorf("%s: %w", "Failed to delete stale runners", err)
+		return 0, fmt.Errorf("%s: %w", "Failed to delete stale runners", err)
 	} else if count > 0 {
 		logger.Warnf("Reaped %d stale runners", count)
 	}
-	return nil
+	return s.config.RunnerTimeout, nil
 }
 
-func (s *Service) releaseExpiredReservations(ctx context.Context) error {
+// Release any expired runner deployment reservations.
+func (s *Service) releaseExpiredReservations(ctx context.Context) (time.Duration, error) {
 	logger := log.FromContext(ctx)
 	count, err := s.dal.ExpireRunnerClaims(ctx)
 	if err != nil {
-		return fmt.Errorf("%s: %w", "Failed to expire runner reservations", err)
+		return 0, fmt.Errorf("%s: %w", "Failed to expire runner reservations", err)
 	} else if count > 0 {
 		logger.Warnf("Expired %d runner reservations", count)
 	}
-	return nil
+	return s.config.DeploymentReservationTimeout, nil
 }
 
-func (s *Service) reconcileDeployments(ctx context.Context) error {
+// Attempt to converge the active number of replicas for each deployment
+// with the desired number.
+func (s *Service) reconcileDeployments(ctx context.Context) (time.Duration, error) {
 	reconciliation, err := s.dal.GetDeploymentsNeedingReconciliation(ctx)
 	if err != nil {
-		return fmt.Errorf("%s: %w", "failed to get deployments needing reconciliation", err)
+		return 0, fmt.Errorf("%s: %w", "failed to get deployments needing reconciliation", err)
 	}
 	wg, ctx := concurrency.New(ctx, concurrency.WithConcurrencyLimit(4))
 	for _, reconcile := range reconciliation {
@@ -854,13 +864,14 @@ func (s *Service) reconcileDeployments(ctx context.Context) error {
 		}
 	}
 	_ = wg.Wait()
-	return nil
+	return time.Second, nil
 }
 
-func (s *Service) reconcileRunners(ctx context.Context) error {
+// Attempt to bring the number of active runners in line with the number of active deployments.
+func (s *Service) reconcileRunners(ctx context.Context) (time.Duration, error) {
 	activeDeployments, err := s.dal.GetActiveDeployments(ctx)
 	if err != nil {
-		return fmt.Errorf("%s: %w", "failed to get deployments needing reconciliation", err)
+		return 0, fmt.Errorf("%s: %w", "failed to get deployments needing reconciliation", err)
 	}
 
 	totalRunners := 0
@@ -872,17 +883,17 @@ func (s *Service) reconcileRunners(ctx context.Context) error {
 	// reconciliation cycle.
 	idleRunners, err := s.dal.GetIdleRunners(ctx, 16, model.Labels{})
 	if err != nil {
-		return err
+		return 0, err
 	}
 	idleRunnerKeys := slices.Map(idleRunners, func(r dal.Runner) model.RunnerKey { return r.Key })
 
 	err = s.runnerScaling.SetReplicas(ctx, totalRunners, idleRunnerKeys)
 	if err != nil {
-		return err
+		return 0, err
 	}
-	return nil
+	return time.Second, nil
 }
 
 func (s *Service) terminateRandomRunner(ctx context.Context, key model.DeploymentName) (bool, error) {
@@ -941,24 +952,24 @@ func (s *Service) reserveRunner(ctx context.Context, reconcile model.Deployment)
 }
 
 // Periodically remove stale (ie. have not heartbeat recently) controllers from the database.
-func (s *Service) reapStaleControllers(ctx context.Context) error {
+func (s *Service) reapStaleControllers(ctx context.Context) (time.Duration, error) {
 	logger := log.FromContext(ctx)
 	count, err := s.dal.KillStaleControllers(context.Background(), s.config.RunnerTimeout)
 	if err != nil {
-		return fmt.Errorf("%s: %w", "failed to delete stale controllers", err)
+		return 0, fmt.Errorf("%s: %w", "failed to delete stale controllers", err)
 	} else if count > 0 {
 		logger.Warnf("Reaped %d stale controllers", count)
 	}
-	return nil
+	return time.Second * 10, nil
 }
 
 // Periodically update the DB with the current state of the controller.
-func (s *Service) heartbeatController(ctx context.Context) error {
+func (s *Service) heartbeatController(ctx context.Context) (time.Duration, error) {
 	_, err := s.dal.UpsertController(ctx, s.key, s.config.Advertise.String())
 	if err != nil {
-		return fmt.Errorf("%s: %w", "failed to heartbeat controller", err)
+		return 0, fmt.Errorf("%s: %w", "failed to heartbeat controller", err)
 	}
-	return nil
+	return time.Second * 3, nil
 }
 
@@ -1086,17 +1097,17 @@ func (s *Service) getDeploymentLogger(ctx context.Context, deploymentName model.
 }
 
 // Periodically sync the routing table from the DB.
-func (s *Service) syncRoutes(ctx context.Context) error {
+func (s *Service) syncRoutes(ctx context.Context) (time.Duration, error) {
 	routes, err := s.dal.GetRoutingTable(ctx, nil)
 	if errors.Is(err, dal.ErrNotFound) {
 		routes = map[string][]dal.Route{}
 	} else if err != nil {
-		return err
+		return 0, err
 	}
 	s.routesMu.Lock()
 	s.routes = routes
 	s.routesMu.Unlock()
-	return nil
+	return time.Second, nil
 }
 
 func (s *Service) getActiveSchema(ctx context.Context) (*schema.Schema, error) {
diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go
index b112b5775f..c85c3b4453 100644
--- a/backend/controller/dal/dal.go
+++ b/backend/controller/dal/dal.go
@@ -244,11 +244,25 @@ type DAL struct {
 	// RouteChanges is a Topic that receives changes to the routing table.
 }
 
+func (d *DAL) GetControllers(ctx context.Context, allControllers bool) ([]Controller, error) {
+	controllers, err := d.db.GetControllers(ctx, allControllers)
+	if err != nil {
+		return nil, translatePGError(err)
+	}
+	return slices.Map(controllers, func(in sql.Controller) Controller {
+		return Controller{
+			Key:      in.Key,
+			Endpoint: in.Endpoint,
+			State:    ControllerState(in.State),
+		}
+	}), nil
+}
+
 func (d *DAL) GetStatus(
 	ctx context.Context,
 	allControllers, allRunners, allDeployments, allIngressRoutes bool,
 ) (Status, error) {
-	controllers, err := d.db.GetControllers(ctx, allControllers)
+	controllers, err := d.GetControllers(ctx, allControllers)
 	if err != nil {
 		return Status{}, fmt.Errorf("%s: %w", "could not get control planes", translatePGError(err))
 	}
@@ -315,13 +329,7 @@ func (d *DAL) GetStatus(
 		return Status{}, err
 	}
 	return Status{
-		Controllers: slices.Map(controllers, func(in sql.Controller) Controller {
-			return Controller{
-				Key:      in.Key,
-				Endpoint: in.Endpoint,
-				State:    ControllerState(in.State),
-			}
-		}),
+		Controllers: controllers,
 		Deployments: statusDeployments,
 		Runners:     domainRunners,
 		IngressRoutes: slices.Map(ingressRoutes, func(in sql.GetAllIngressRoutesRow) IngressRouteEntry {
diff --git a/backend/controller/scheduledtask/scheduledtask.go b/backend/controller/scheduledtask/scheduledtask.go
new file mode 100644
index 0000000000..cd3a06abcb
--- /dev/null
+++ b/backend/controller/scheduledtask/scheduledtask.go
@@ -0,0 +1,174 @@
+// Package scheduledtask implements a task scheduler.
+package scheduledtask
+
+import (
+	"context"
+	"math/rand"
+	"reflect"
+	"runtime"
+	"sort"
+	"strings"
+	"time"
+
+	"github.com/alecthomas/atomic"
+	"github.com/jpillora/backoff"
+	"github.com/serialx/hashring"
+
+	"github.com/TBD54566975/ftl/backend/common/log"
+	"github.com/TBD54566975/ftl/backend/common/model"
+	"github.com/TBD54566975/ftl/backend/common/slices"
+	"github.com/TBD54566975/ftl/backend/controller/dal"
+)
+
+type descriptor struct {
+	next        time.Time
+	name        string
+	retry       backoff.Backoff
+	job         Job
+	singlyHomed bool
+}
+
+// A Job is a function that is scheduled to run periodically.
+// +// The Job itself controls its schedule by returning the next time it should +// run. +type Job func(ctx context.Context) (time.Duration, error) + +// Scheduler is a task scheduler for the controller. +// +// Each job runs in its own goroutine. +// +// The scheduler uses a consistent hash ring to attempt to ensure that jobs are +// only run on a single controller at a time. This is not guaranteed, however, +// as the hash ring is only updated periodically and controllers may have +// inconsistent views of the hash ring. +type Scheduler struct { + getControllers func(ctx context.Context, all bool) ([]dal.Controller, error) + key model.ControllerKey + jobs chan *descriptor + + hashring atomic.Value[*hashring.HashRing] +} + +// New creates a new [Scheduler]. +func New(ctx context.Context, id model.ControllerKey, getControllers func(ctx context.Context, all bool) ([]dal.Controller, error)) *Scheduler { + s := &Scheduler{getControllers: getControllers, key: id, jobs: make(chan *descriptor)} + _ = s.updateHashring(ctx) + go s.syncHashRing(ctx) + go s.run(ctx) + return s +} + +// Singleton schedules a job to attempt to run on only a single controller. +// +// This is not guaranteed, however, as controllers may have inconsistent views +// of the hash ring. +func (s *Scheduler) Singleton(retry backoff.Backoff, job Job) { + s.schedule(retry, job, true) +} + +// Parallel schedules a job to run on every controller. +func (s *Scheduler) Parallel(retry backoff.Backoff, job Job) { + s.schedule(retry, job, false) +} + +func (s *Scheduler) schedule(retry backoff.Backoff, job Job, singlyHomed bool) { + name := runtime.FuncForPC(reflect.ValueOf(job).Pointer()).Name() + name = name[strings.LastIndex(name, ".")+1:] + name = strings.TrimSuffix(name, "-fm") + s.jobs <- &descriptor{ + name: name, + retry: retry, + job: job, + singlyHomed: singlyHomed, + next: time.Now().Add(time.Millisecond * time.Duration(rand.Int63n(2000))), //nolint:gosec + } +} + +func (s *Scheduler) run(ctx context.Context) { + logger := log.FromContext(ctx).Scope("cron") + // List of jobs to run. + // For singleton jobs running on a different host, this can include jobs + // scheduled in the past. These are skipped on each run. + jobs := []*descriptor{} + for { + next := time.Now().Add(time.Second) + // Find the next job to run. + if len(jobs) > 0 { + sort.Slice(jobs, func(i, j int) bool { return jobs[i].next.Before(jobs[j].next) }) + for _, job := range jobs { + if job.next.IsZero() { + continue + } + next = job.next + break + } + } + + select { + case <-ctx.Done(): + return + + case <-time.After(time.Until(next)): + // Jobs to reschedule on the next run. + for i, job := range jobs { + if job.next.After(time.Now()) { + continue + } + job := job + hashring := s.hashring.Load() + + // If the job is singly homed, check that we are the active controller. + if job.singlyHomed { + if node, ok := hashring.GetNode(job.name); !ok || node != s.key.String() { + job.next = time.Time{} + continue + } + } + jobs[i] = nil // Zero out scheduled jobs. + logger.Scope(job.name).Tracef("Running cron job") + go func() { + if delay, err := job.job(ctx); err != nil { + logger.Scope(job.name).Warnf("%s", err) + job.next = time.Now().Add(job.retry.Duration()) + } else { + // Reschedule the job. 
+						job.retry.Reset()
+						job.next = time.Now().Add(delay)
+					}
+					s.jobs <- job
+				}()
+			}
+			jobs = slices.Filter(jobs, func(job *descriptor) bool { return job != nil })
+
+		case job := <-s.jobs:
+			jobs = append(jobs, job)
+		}
+	}
+}
+
+// Synchronise the hash ring with the active controllers.
+func (s *Scheduler) syncHashRing(ctx context.Context) {
+	logger := log.FromContext(ctx).Scope("cron")
+	for {
+		select {
+		case <-ctx.Done():
+			return
+
+		case <-time.After(time.Second * 5):
+			if err := s.updateHashring(ctx); err != nil {
+				logger.Warnf("Failed to get controllers: %s", err)
+			}
+		}
+	}
+}
+
+func (s *Scheduler) updateHashring(ctx context.Context) error {
+	controllers, err := s.getControllers(ctx, false)
+	if err != nil {
+		return err
+	}
+	hashring := hashring.New(slices.Map(controllers, func(c dal.Controller) string { return c.Key.String() }))
+	s.hashring.Store(hashring)
+	return nil
+}
diff --git a/backend/controller/scheduledtask/scheduledtask_test.go b/backend/controller/scheduledtask/scheduledtask_test.go
new file mode 100644
index 0000000000..7ece8a5331
--- /dev/null
+++ b/backend/controller/scheduledtask/scheduledtask_test.go
@@ -0,0 +1,59 @@
+package scheduledtask
+
+import (
+	"context"
+	"os"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/alecthomas/assert/v2"
+	"github.com/jpillora/backoff"
+
+	"github.com/TBD54566975/ftl/backend/common/log"
+	"github.com/TBD54566975/ftl/backend/common/model"
+	"github.com/TBD54566975/ftl/backend/common/slices"
+	"github.com/TBD54566975/ftl/backend/controller/dal"
+)
+
+func TestCron(t *testing.T) {
+	t.Parallel()
+	ctx := log.ContextWithLogger(context.Background(), log.Configure(os.Stderr, log.Config{Level: log.Info}))
+	ctx, cancel := context.WithCancel(ctx)
+	t.Cleanup(cancel)
+
+	var singletonCount atomic.Int64
+	var multiCount atomic.Int64
+
+	type controller struct {
+		controller dal.Controller
+		cron       *Scheduler
+	}
+
+	controllers := []*controller{
+		{controller: dal.Controller{Key: model.NewControllerKey()}},
+		{controller: dal.Controller{Key: model.NewControllerKey()}},
+		{controller: dal.Controller{Key: model.NewControllerKey()}},
+		{controller: dal.Controller{Key: model.NewControllerKey()}},
+	}
+
+	for _, c := range controllers {
+		c := c
+		c.cron = New(ctx, c.controller.Key, func(ctx context.Context, all bool) ([]dal.Controller, error) {
+			return slices.Map(controllers, func(c *controller) dal.Controller { return c.controller }), nil
+		})
+		c.cron.Singleton(backoff.Backoff{}, func(ctx context.Context) (time.Duration, error) {
+			singletonCount.Add(1)
+			return time.Second, nil
+		})
+		c.cron.Parallel(backoff.Backoff{}, func(ctx context.Context) (time.Duration, error) {
+			multiCount.Add(1)
+			return time.Second, nil
+		})
+	}
+
+	time.Sleep(time.Second * 6)
+
+	assert.True(t, singletonCount.Load() >= 5 && singletonCount.Load() < 10, "expected singletonCount to be >= 5 but was %d", singletonCount.Load())
+	assert.True(t, multiCount.Load() >= 20 && multiCount.Load() < 30, "expected multiCount to be >= 20 but was %d", multiCount.Load())
+}
diff --git a/go.mod b/go.mod
index 6dc9923db0..91add82079 100644
--- a/go.mod
+++ b/go.mod
@@ -101,6 +101,7 @@ require (
 	github.com/lib/pq v1.10.9 // indirect
 	github.com/pelletier/go-toml v1.9.5 // indirect
 	github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
+	github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b
 	github.com/swaggest/refl v1.3.0 // indirect
 	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect
 	golang.design/x/reflect v0.0.0-20220504060917-02c43be63f3b
diff --git a/go.sum b/go.sum
index 5c046f08b8..6d8390b8c0 100644
--- a/go.sum
+++ b/go.sum
@@ -212,6 +212,8 @@ github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6Ng
 github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPOhJotwFIyu2bBVN41fcDUY=
 github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8=
 github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I=
+github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b h1:h+3JX2VoWTFuyQEo87pStk/a99dzIO1mM9KxIyLPGTU=
+github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b/go.mod h1:/yeG0My1xr/u+HZrFQ1tOQQQQrOawfyMUH13ai5brBc=
 github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
 github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
 github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=