diff --git a/backend/controller/controller.go b/backend/controller/controller.go index d646391a43..30edfe1fe6 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -31,6 +31,7 @@ import ( "github.com/TBD54566975/ftl/backend/controller/dal" "github.com/TBD54566975/ftl/backend/controller/ingress" "github.com/TBD54566975/ftl/backend/controller/scaling" + "github.com/TBD54566975/ftl/backend/controller/scaling/localscaling" "github.com/TBD54566975/ftl/backend/controller/scheduledtask" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/console/pbconsoleconnect" @@ -166,6 +167,26 @@ type Service struct { increaseReplicaFailures map[string]int } +type jobConfig struct { + job scheduledtask.Job + backoff backoff.Backoff + develBackoff optional.Option[backoff.Backoff] +} + +func (j jobConfig) getBackoff(devel bool) backoff.Backoff { + var bo backoff.Backoff + if devel { + var ok bool + if bo, ok = j.develBackoff.Get(); !ok { + // if in devel and develBackoff is empty, use 1s for min/max + bo = backoff.Backoff{Min: time.Second, Max: time.Second} + } + } else { + bo = j.backoff + } + return bo +} + func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.RunnerScaling) (*Service, error) { key := config.Key if config.Key.IsZero() { @@ -188,17 +209,30 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling. svc.cronJobs = cronSvc svc.controllerListListeners = append(svc.controllerListListeners, svc.tasks, cronSvc) - svc.tasks.Parallel(backoff.Backoff{Min: time.Second, Max: time.Second * 5}, svc.syncRoutes) - svc.tasks.Parallel(backoff.Backoff{Min: time.Second * 3, Max: time.Second * 3}, svc.heartbeatController) - svc.tasks.Parallel(backoff.Backoff{Min: time.Second * 5, Max: time.Second * 5}, svc.updateControllersList) - svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 10}, svc.reapStaleRunners) - svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 20}, svc.releaseExpiredReservations) - svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 5}, svc.reconcileDeployments) - svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 5}, svc.reconcileRunners) - // This should only run on one controller, but because dead controllers - // might be selected by the hash ring, we have to run it on all controllers. - // We should use a DB lock at some point. - svc.tasks.Parallel(backoff.Backoff{Min: time.Second, Max: time.Second * 20}, svc.reapStaleControllers) + parallelJobs := []jobConfig{ + {svc.syncRoutes, backoff.Backoff{Min: time.Second, Max: time.Second * 5}, optional.None[backoff.Backoff]()}, + {svc.heartbeatController, backoff.Backoff{Min: time.Second * 3, Max: time.Second * 3}, optional.Some[backoff.Backoff](backoff.Backoff{Min: time.Second * 2, Max: time.Second * 2})}, + {svc.updateControllersList, backoff.Backoff{Min: time.Second * 5, Max: time.Second * 5}, optional.None[backoff.Backoff]()}, + // This should only run on one controller, but because dead controllers + // might be selected by the hash ring, we have to run it on all controllers. + // We should use a DB lock at some point. + {svc.reapStaleControllers, backoff.Backoff{Min: time.Second * 20, Max: time.Second * 20}, optional.None[backoff.Backoff]()}, + } + + singletonJobs := []jobConfig{ + {svc.reapStaleRunners, backoff.Backoff{Min: time.Second, Max: time.Second * 10}, optional.None[backoff.Backoff]()}, + {svc.releaseExpiredReservations, backoff.Backoff{Min: time.Second, Max: time.Second * 20}, optional.None[backoff.Backoff]()}, + {svc.reconcileDeployments, backoff.Backoff{Min: time.Second, Max: time.Second * 5}, optional.None[backoff.Backoff]()}, + {svc.reconcileRunners, backoff.Backoff{Min: time.Second, Max: time.Second * 5}, optional.None[backoff.Backoff]()}, + } + + _, devel := runnerScaling.(*localscaling.LocalScaling) + for _, j := range parallelJobs { + svc.tasks.Parallel(j.getBackoff(devel), j.job) + } + for _, j := range singletonJobs { + svc.tasks.Singleton(j.getBackoff(devel), j.job) + } return svc, nil }