Skip to content

Commit

Permalink
fix: vastly improve development responsiveness and startup time
Browse files Browse the repository at this point in the history
There are two changes that contribute to this improvement:

1. Reduce most task reschedule timers to 1s.
2. Decrease runner/controller heartbeats to 1s.
3. Clear all transient database rows that slow down restarts - leases,
   runner/controller locks, etc.

Fixes #1010
Fixes #1381
  • Loading branch information
alecthomas committed May 11, 2024
1 parent f7fb7a6 commit ecbd466
Show file tree
Hide file tree
Showing 11 changed files with 434 additions and 376 deletions.
52 changes: 34 additions & 18 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type Config struct {
ConsoleURL *url.URL `help:"The public URL of the console (for CORS)." env:"FTL_CONTROLLER_CONSOLE_URL"`
ContentTime time.Time `help:"Time to use for console resource timestamps." default:"${timestamp=1970-01-01T00:00:00Z}"`
RunnerTimeout time.Duration `help:"Runner heartbeat timeout." default:"10s"`
ControllerTimeout time.Duration `help:"Controller heartbeat timeout." default:"10s"`
DeploymentReservationTimeout time.Duration `help:"Deployment reservation timeout." default:"120s"`
ArtefactChunkSize int `help:"Size of each chunk streamed to the client." default:"1048576"`
CommonConfig
Expand Down Expand Up @@ -177,6 +178,14 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
key = model.NewControllerKey(config.Bind.Hostname(), config.Bind.Port())
}
config.SetDefaults()

// Override some defaults during development mode.
_, devel := runnerScaling.(*localscaling.LocalScaling)
if devel {
config.RunnerTimeout = time.Second * 5
config.ControllerTimeout = time.Second * 5
}

svc := &Service{
tasks: scheduledtask.New(ctx, key, db),
dal: db,
Expand All @@ -194,37 +203,44 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
svc.controllerListListeners = append(svc.controllerListListeners, cronSvc)

// Use min, max backoff if we are running in production, otherwise use
// (1s, 1s) or develBackoff if available.
maybeDevelBackoff := func(min, max time.Duration, develBackoff ...backoff.Backoff) backoff.Backoff {
// (1s, 1s) (or develBackoff). Will also wrap the job such that it its next
// runtime is capped at 1s.
maybeDevelTask := func(job scheduledtask.Job, maxNext, minDelay, maxDelay time.Duration, develBackoff ...backoff.Backoff) (backoff.Backoff, scheduledtask.Job) {
if len(develBackoff) > 1 {
panic("too many devel backoffs")
}
if _, devel := runnerScaling.(*localscaling.LocalScaling); devel {
if devel {
chain := job
job = func(ctx context.Context) (time.Duration, error) {
next, err := chain(ctx)
// Cap at 1s in development mode.
return min(next, maxNext), err
}
if len(develBackoff) == 1 {
return develBackoff[0]
return develBackoff[0], job
}
return backoff.Backoff{Min: time.Second, Max: time.Second}
return backoff.Backoff{Min: time.Second, Max: time.Second}, job
}
return makeBackoff(min, max)
return makeBackoff(minDelay, maxDelay), job
}

// Parallel tasks.
svc.tasks.Parallel(maybeDevelBackoff(time.Second, time.Second*5), svc.syncRoutes)
svc.tasks.Parallel(maybeDevelBackoff(time.Second*3, time.Second*5), svc.heartbeatController)
svc.tasks.Parallel(maybeDevelBackoff(time.Second*5, time.Second*5), svc.updateControllersList)
svc.tasks.Parallel(maybeDevelBackoff(time.Second*5, time.Second*10), svc.executeAsyncCalls)
svc.tasks.Parallel(maybeDevelTask(svc.syncRoutes, time.Second, time.Second, time.Second*5))
svc.tasks.Parallel(maybeDevelTask(svc.heartbeatController, time.Second, time.Second*3, time.Second*5))
svc.tasks.Parallel(maybeDevelTask(svc.updateControllersList, time.Second, time.Second*5, time.Second*5))
svc.tasks.Parallel(maybeDevelTask(svc.executeAsyncCalls, time.Second, time.Second*5, time.Second*10))

// This should be a singleton task, but because this is the task that
// actually expires the leases used to run singleton tasks, it must be
// parallel.
svc.tasks.Parallel(maybeDevelBackoff(time.Second, time.Second*5), svc.expireStaleLeases)
svc.tasks.Parallel(maybeDevelTask(svc.expireStaleLeases, time.Second*2, time.Second, time.Second*5))

// Singleton tasks use leases to only run on a single controller.
svc.tasks.Singleton(maybeDevelBackoff(time.Second*20, time.Second*20), svc.reapStaleControllers)
svc.tasks.Singleton(maybeDevelBackoff(time.Second, time.Second*10), svc.reapStaleRunners)
svc.tasks.Singleton(maybeDevelBackoff(time.Second, time.Second*20), svc.releaseExpiredReservations)
svc.tasks.Singleton(maybeDevelBackoff(time.Second, time.Second*5), svc.reconcileDeployments)
svc.tasks.Singleton(maybeDevelBackoff(time.Second, time.Second*5), svc.reconcileRunners)
svc.tasks.Singleton(maybeDevelTask(svc.reapStaleControllers, time.Second*2, time.Second*20, time.Second*20))
svc.tasks.Singleton(maybeDevelTask(svc.reapStaleRunners, time.Second*2, time.Second, time.Second*10))
svc.tasks.Singleton(maybeDevelTask(svc.releaseExpiredReservations, time.Second*2, time.Second, time.Second*20))
svc.tasks.Singleton(maybeDevelTask(svc.reconcileDeployments, time.Second*2, time.Second, time.Second*5))
svc.tasks.Singleton(maybeDevelTask(svc.reconcileRunners, time.Second*2, time.Second, time.Second*5))
return svc, nil
}

Expand Down Expand Up @@ -1089,7 +1105,7 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error)
}
switch call.Origin {
case dal.AsyncCallOriginFSM:
return time.Second * 2, s.onAsyncFSMCallCompletion(ctx, call, resp.Msg)
return time.Millisecond * 100, s.onAsyncFSMCallCompletion(ctx, call, resp.Msg)

default:
panic(fmt.Sprintf("unexpected async call origin: %s", call.Origin))
Expand Down Expand Up @@ -1166,7 +1182,7 @@ func (s *Service) reserveRunner(ctx context.Context, reconcile model.Deployment)
// Periodically remove stale (ie. have not heartbeat recently) controllers from the database.
func (s *Service) reapStaleControllers(ctx context.Context) (time.Duration, error) {
logger := log.FromContext(ctx)
count, err := s.dal.KillStaleControllers(context.Background(), s.config.RunnerTimeout)
count, err := s.dal.KillStaleControllers(context.Background(), s.config.ControllerTimeout)
if err != nil {
return 0, fmt.Errorf("failed to delete stale controllers: %w", err)
} else if count > 0 {
Expand Down
24 changes: 0 additions & 24 deletions backend/controller/dal/async_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,6 @@ import (
"github.com/TBD54566975/ftl/backend/schema"
)

// SendFSMEvent sends an event to an executing instance of an FSM.
//
// If the instance doesn't exist a new one will be created.
//
// [name] is the name of the state machine to execute, [executionKey] is the
// unique identifier for this execution of the FSM.
//
// Returns ErrConflict if the state machine is already executing.
//
// Note: this does not actually call the FSM, it just enqueues an async call for
// future execution.
//
// Note: no validation of the FSM is performed.
func (d *DAL) SendFSMEvent(ctx context.Context, name, executionKey, destinationState string, verb schema.Ref, request json.RawMessage) error {
_, err := d.db.SendFSMEvent(ctx, sql.SendFSMEventParams{
Key: executionKey,
Name: name,
State: destinationState,
Verb: verb,
Request: request,
})
return translatePGError(err)
}

// AsyncCallOrigin represents the kind of originator of the async call.
type AsyncCallOrigin sql.AsyncCallOrigin

Expand Down
33 changes: 33 additions & 0 deletions backend/controller/dal/fsm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package dal

import (
"context"
"encoding/json"

"github.com/TBD54566975/ftl/backend/controller/sql"
"github.com/TBD54566975/ftl/backend/schema"
)

// SendFSMEvent sends an event to an executing instance of an FSM.
//
// If the instance doesn't exist a new one will be created.
//
// [name] is the name of the state machine to execute, [executionKey] is the
// unique identifier for this execution of the FSM.
//
// Returns ErrConflict if the state machine is already executing.
//
// Note: this does not actually call the FSM, it just enqueues an async call for
// future execution.
//
// Note: no validation of the FSM is performed.
func (d *DAL) SendFSMEvent(ctx context.Context, name, executionKey, destinationState string, verb schema.Ref, request json.RawMessage) error {
_, err := d.db.SendFSMEvent(ctx, sql.SendFSMEventParams{
Key: executionKey,
Name: name,
State: destinationState,
Verb: verb,
Request: request,
})
return translatePGError(err)
}
3 changes: 3 additions & 0 deletions backend/controller/scaling/localscaling/local_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"sync"
"time"

"github.com/alecthomas/kong"

Expand Down Expand Up @@ -95,6 +96,8 @@ func (l *LocalScaling) SetReplicas(ctx context.Context, replicas int, idleRunner
}); err != nil {
return err
}
config.HeartbeatPeriod = time.Second
config.HeartbeatJitter = time.Millisecond * 100

runnerCtx := log.ContextWithLogger(ctx, logger.Scope(simpleName))

Expand Down
34 changes: 31 additions & 3 deletions backend/controller/sql/databasetesting/devel.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ func CreateForDevel(ctx context.Context, dsn string, recreate bool) (*pgxpool.Po
if recreate {
// Terminate any dangling connections.
_, err = conn.Exec(ctx, `
SELECT pid, pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE datname = $1 AND pid <> pg_backend_pid()`,
SELECT pid, pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE datname = $1 AND pid <> pg_backend_pid()`,
config.Database)
if err != nil {
return nil, err
Expand All @@ -72,5 +72,33 @@ func CreateForDevel(ctx context.Context, dsn string, recreate bool) (*pgxpool.Po
if err != nil {
return nil, err
}
// Reset transient state in the database to a clean state for development purposes.
// This includes things like resetting the state of async calls, leases,
// controller/runner registration, etc. but not anything more.
if !recreate {
_, err = realConn.Exec(ctx, `
WITH deleted AS (
DELETE FROM async_calls
RETURNING 1
), deleted_fsm_executions AS (
DELETE FROM fsm_executions
RETURNING 1
), deleted_leases AS (
DELETE FROM leases
RETURNING 1
), deleted_controllers AS (
DELETE FROM controller
RETURNING 1
), deleted_runners AS (
DELETE FROM runners
RETURNING 1
)
SELECT COUNT(*)
`)
if err != nil {
return nil, err
}
}

return realConn, nil
}
1 change: 1 addition & 0 deletions backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ WITH async_call AS (
FROM async_calls
WHERE state = 'pending'
LIMIT 1
FOR UPDATE SKIP LOCKED
), lease AS (
INSERT INTO leases (idempotency_key, key, expires_at)
VALUES (gen_random_uuid(), '/system/async_call/' || (SELECT id FROM async_call), (NOW() AT TIME ZONE 'utc') + @ttl::interval)
Expand Down
1 change: 1 addition & 0 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit ecbd466

Please sign in to comment.