fix: add a sharded task scheduler (#766)
This ensures that tasks which require it run on only a single controller at a
time, and that those tasks are distributed across the available controllers.

This is implemented on top of a hash ring that is periodically synced
from the registered controllers in the DB.
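For background on how the sharding works: a consistent hash ring hashes each registered controller to a set of points on a ring, and a singleton task is owned by whichever controller owns the first point at or after the task's own hash, so every controller independently computes the same owner. The sketch below is illustrative only and is not the actual scheduledtask package; the controller names, replica count, and helper names are invented for the example.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// ring is a minimal consistent hash ring: each controller is hashed to
// several points, and a task belongs to the controller owning the first
// point at or after the task's hash.
type ring struct {
	points []uint64          // sorted hash points
	owner  map[uint64]string // hash point -> controller key
}

func newRing(controllers []string, replicas int) *ring {
	r := &ring{owner: map[uint64]string{}}
	for _, c := range controllers {
		for i := 0; i < replicas; i++ {
			h := hashString(fmt.Sprintf("%s-%d", c, i))
			r.points = append(r.points, h)
			r.owner[h] = c
		}
	}
	sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
	return r
}

// ownerOf returns the controller responsible for the given task name.
func (r *ring) ownerOf(task string) string {
	h := hashString(task)
	i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
	if i == len(r.points) {
		i = 0 // wrap around the ring
	}
	return r.owner[r.points[i]]
}

func hashString(s string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(s))
	return h.Sum64()
}

func main() {
	r := newRing([]string{"controller-a", "controller-b", "controller-c"}, 16)
	// A singleton task only executes on the controller the ring assigns it to.
	fmt.Println("reconcileDeployments owner:", r.ownerOf("reconcileDeployments"))
	fmt.Println("reapStaleRunners owner:", r.ownerOf("reapStaleRunners"))
}
```

When the set of registered controllers changes (via the periodic DB sync), only tasks hashed near the added or removed controller move to a new owner, which is what makes a hash ring preferable to, say, modulo assignment.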
alecthomas authored Jan 12, 2024
1 parent 1445b3b commit 422d68a
Showing 6 changed files with 293 additions and 38 deletions.
71 changes: 41 additions & 30 deletions backend/controller/controller.go
@@ -40,6 +40,7 @@ import (
"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/ingress"
"github.com/TBD54566975/ftl/backend/controller/scaling"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
"github.com/TBD54566975/ftl/backend/schema"
frontend "github.com/TBD54566975/ftl/frontend"
ftlv1 "github.com/TBD54566975/ftl/protos/xyz/block/ftl/v1"
@@ -127,6 +128,8 @@ type Service struct {
key model.ControllerKey
deploymentLogsSink *deploymentLogsSink

+ tasks *scheduledtask.Scheduler

// Map from endpoint to client.
clients *ttlcache.Cache[string, clients]

@@ -143,6 +146,7 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
}
config.SetDefaults()
svc := &Service{
+ tasks: scheduledtask.New(ctx, key, db.GetControllers),
dal: db,
key: key,
deploymentLogsSink: newDeploymentLogsSink(ctx, db),
@@ -152,13 +156,16 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
runnerScaling: runnerScaling,
}

- go runWithRetries(ctx, time.Second*1, time.Second*2, svc.syncRoutes)
- go runWithRetries(ctx, time.Second*3, time.Second*5, svc.heartbeatController)
- go runWithRetries(ctx, time.Second*10, time.Second*20, svc.reapStaleControllers)
- go runWithRetries(ctx, config.RunnerTimeout, time.Second*10, svc.reapStaleRunners)
- go runWithRetries(ctx, config.DeploymentReservationTimeout, time.Second*20, svc.releaseExpiredReservations)
- go runWithRetries(ctx, time.Second*1, time.Second*5, svc.reconcileDeployments)
- go runWithRetries(ctx, time.Second*1, time.Second*5, svc.reconcileRunners)
+ svc.tasks.Parallel(backoff.Backoff{Min: time.Second, Max: time.Second * 5}, svc.syncRoutes)
+ svc.tasks.Parallel(backoff.Backoff{Min: time.Second * 3, Max: time.Second * 3}, svc.heartbeatController)
+ svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 10}, svc.reapStaleRunners)
+ svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 20}, svc.releaseExpiredReservations)
+ svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 5}, svc.reconcileDeployments)
+ svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 5}, svc.reconcileRunners)
+ // This should only run on one controller, but because dead controllers
+ // might be selected by the hash ring, we have to run it on all controllers.
+ // We should use a DB lock at some point.
+ svc.tasks.Parallel(backoff.Backoff{Min: time.Second, Max: time.Second * 20}, svc.reapStaleControllers)
return svc, nil
}
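To make the registrations above concrete: each task now returns the delay until its next run rather than being looped by runWithRetries, and the backoff only kicks in when the task fails. The loop below is a rough sketch of how a duration-returning task might be driven, assuming the backoff value above comes from github.com/jpillora/backoff; it is not the real Scheduler, and a Singleton task would additionally consult the hash ring before doing any work.

```go
package scheduler // hypothetical package, not the actual scheduledtask implementation

import (
	"context"
	"time"

	"github.com/jpillora/backoff"
)

// Task does one unit of work and reports how long to wait before the next
// run, matching the (time.Duration, error) signatures in controller.go.
type Task func(ctx context.Context) (time.Duration, error)

// run drives a single task until the context is cancelled. On success the
// task's own delay is used and the retry backoff resets; on failure the
// backoff supplies an increasing delay between its Min and Max.
func run(ctx context.Context, task Task, retry backoff.Backoff) {
	for {
		delay, err := task(ctx)
		if err != nil {
			delay = retry.Duration()
		} else {
			retry.Reset()
		}
		select {
		case <-ctx.Done():
			return
		case <-time.After(delay):
		}
	}
}
```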

@@ -790,32 +797,35 @@ func (s *Service) clientsForEndpoint(endpoint string) clients {
return client
}

- func (s *Service) reapStaleRunners(ctx context.Context) error {
+ func (s *Service) reapStaleRunners(ctx context.Context) (time.Duration, error) {
logger := log.FromContext(ctx)
count, err := s.dal.KillStaleRunners(context.Background(), s.config.RunnerTimeout)
if err != nil {
- return fmt.Errorf("%s: %w", "Failed to delete stale runners", err)
+ return 0, fmt.Errorf("%s: %w", "Failed to delete stale runners", err)
} else if count > 0 {
logger.Warnf("Reaped %d stale runners", count)
}
- return nil
+ return s.config.RunnerTimeout, nil
}

- func (s *Service) releaseExpiredReservations(ctx context.Context) error {
+ // Release any expired runner deployment reservations.
+ func (s *Service) releaseExpiredReservations(ctx context.Context) (time.Duration, error) {
logger := log.FromContext(ctx)
count, err := s.dal.ExpireRunnerClaims(ctx)
if err != nil {
- return fmt.Errorf("%s: %w", "Failed to expire runner reservations", err)
+ return 0, fmt.Errorf("%s: %w", "Failed to expire runner reservations", err)
} else if count > 0 {
logger.Warnf("Expired %d runner reservations", count)
}
- return nil
+ return s.config.DeploymentReservationTimeout, nil
}

- func (s *Service) reconcileDeployments(ctx context.Context) error {
+ // Attempt to converge the active number of replicas for each
+ // deployment with the desired number.
+ func (s *Service) reconcileDeployments(ctx context.Context) (time.Duration, error) {
reconciliation, err := s.dal.GetDeploymentsNeedingReconciliation(ctx)
if err != nil {
- return fmt.Errorf("%s: %w", "failed to get deployments needing reconciliation", err)
+ return 0, fmt.Errorf("%s: %w", "failed to get deployments needing reconciliation", err)
}
wg, ctx := concurrency.New(ctx, concurrency.WithConcurrencyLimit(4))
for _, reconcile := range reconciliation {
@@ -854,13 +864,14 @@ func (s *Service) reconcileDeployments(ctx context.Context) error {
}
}
_ = wg.Wait()
- return nil
+ return time.Second, nil
}

- func (s *Service) reconcileRunners(ctx context.Context) error {
+ // Attempt to bring the number of active runners in line with the number of active deployments.
+ func (s *Service) reconcileRunners(ctx context.Context) (time.Duration, error) {
activeDeployments, err := s.dal.GetActiveDeployments(ctx)
if err != nil {
- return fmt.Errorf("%s: %w", "failed to get deployments needing reconciliation", err)
+ return 0, fmt.Errorf("%s: %w", "failed to get deployments needing reconciliation", err)
}

totalRunners := 0
@@ -872,17 +883,17 @@ func (s *Service) reconcileRunners(ctx context.Context) error {
// reconciliation cycle.
idleRunners, err := s.dal.GetIdleRunners(ctx, 16, model.Labels{})
if err != nil {
- return err
+ return 0, err
}

idleRunnerKeys := slices.Map(idleRunners, func(r dal.Runner) model.RunnerKey { return r.Key })

err = s.runnerScaling.SetReplicas(ctx, totalRunners, idleRunnerKeys)
if err != nil {
- return err
+ return 0, err
}

- return nil
+ return time.Second, nil
}

func (s *Service) terminateRandomRunner(ctx context.Context, key model.DeploymentName) (bool, error) {
@@ -941,24 +952,24 @@ func (s *Service) reserveRunner(ctx context.Context, reconcile model.Deployment)
}

// Periodically remove stale (ie. have not heartbeat recently) controllers from the database.
- func (s *Service) reapStaleControllers(ctx context.Context) error {
+ func (s *Service) reapStaleControllers(ctx context.Context) (time.Duration, error) {
logger := log.FromContext(ctx)
count, err := s.dal.KillStaleControllers(context.Background(), s.config.RunnerTimeout)
if err != nil {
- return fmt.Errorf("%s: %w", "failed to delete stale controllers", err)
+ return 0, fmt.Errorf("%s: %w", "failed to delete stale controllers", err)
} else if count > 0 {
logger.Warnf("Reaped %d stale controllers", count)
}
- return nil
+ return time.Second * 10, nil
}

// Periodically update the DB with the current state of the controller.
- func (s *Service) heartbeatController(ctx context.Context) error {
+ func (s *Service) heartbeatController(ctx context.Context) (time.Duration, error) {
_, err := s.dal.UpsertController(ctx, s.key, s.config.Advertise.String())
if err != nil {
- return fmt.Errorf("%s: %w", "failed to heartbeat controller", err)
+ return 0, fmt.Errorf("%s: %w", "failed to heartbeat controller", err)
}
- return nil
+ return time.Second * 3, nil

}

@@ -1086,17 +1097,17 @@ func (s *Service) getDeploymentLogger(ctx context.Context, deploymentName model.
}

// Periodically sync the routing table from the DB.
- func (s *Service) syncRoutes(ctx context.Context) error {
+ func (s *Service) syncRoutes(ctx context.Context) (time.Duration, error) {
routes, err := s.dal.GetRoutingTable(ctx, nil)
if errors.Is(err, dal.ErrNotFound) {
routes = map[string][]dal.Route{}
} else if err != nil {
- return err
+ return 0, err
}
s.routesMu.Lock()
s.routes = routes
s.routesMu.Unlock()
- return nil
+ return time.Second, nil
}

func (s *Service) getActiveSchema(ctx context.Context) (*schema.Schema, error) {
24 changes: 16 additions & 8 deletions backend/controller/dal/dal.go
@@ -244,11 +244,25 @@ type DAL struct {
// RouteChanges is a Topic that receives changes to the routing table.
}

+ func (d *DAL) GetControllers(ctx context.Context, allControllers bool) ([]Controller, error) {
+ controllers, err := d.db.GetControllers(ctx, allControllers)
+ if err != nil {
+ return nil, translatePGError(err)
+ }
+ return slices.Map(controllers, func(in sql.Controller) Controller {
+ return Controller{
+ Key: in.Key,
+ Endpoint: in.Endpoint,
+ State: ControllerState(in.State),
+ }
+ }), nil
+ }

func (d *DAL) GetStatus(
ctx context.Context,
allControllers, allRunners, allDeployments, allIngressRoutes bool,
) (Status, error) {
- controllers, err := d.db.GetControllers(ctx, allControllers)
+ controllers, err := d.GetControllers(ctx, allControllers)
if err != nil {
return Status{}, fmt.Errorf("%s: %w", "could not get control planes", translatePGError(err))
}
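The new DAL.GetControllers method above is presumably what the scheduler's periodic sync consumes to rebuild its hash ring from the DB. A possible shape of that sync loop is sketched below; syncInterval, rebuildRing, and the meaning of the false argument are assumptions for illustration, not the real scheduledtask code.

```go
package scheduler // hypothetical package, not the actual scheduledtask implementation

import (
	"context"
	"fmt"
	"time"

	"github.com/TBD54566975/ftl/backend/controller/dal"
)

// syncMembers periodically refreshes the scheduler's view of the cluster.
// getControllers matches the db.GetControllers function passed to
// scheduledtask.New in controller.go; rebuildRing is a hypothetical hook
// that swaps in a new hash-ring membership.
func syncMembers(
	ctx context.Context,
	syncInterval time.Duration,
	getControllers func(ctx context.Context, allControllers bool) ([]dal.Controller, error),
	rebuildRing func(members []string),
) {
	for {
		controllers, err := getControllers(ctx, false) // false: presumably live controllers only
		if err == nil {
			members := make([]string, 0, len(controllers))
			for _, c := range controllers {
				members = append(members, fmt.Sprint(c.Key))
			}
			rebuildRing(members)
		}
		select {
		case <-ctx.Done():
			return
		case <-time.After(syncInterval):
		}
	}
}
```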
@@ -315,13 +329,7 @@ func (d *DAL) GetStatus(
return Status{}, err
}
return Status{
- Controllers: slices.Map(controllers, func(in sql.Controller) Controller {
- return Controller{
- Key: in.Key,
- Endpoint: in.Endpoint,
- State: ControllerState(in.State),
- }
- }),
+ Controllers: controllers,
Deployments: statusDeployments,
Runners: domainRunners,
IngressRoutes: slices.Map(ingressRoutes, func(in sql.GetAllIngressRoutesRow) IngressRouteEntry {