Skip to content

Commit

Permalink
fix: test scheduledtask with a fake clock (#823)
Browse files Browse the repository at this point in the history
So that the tests don't take 6 seconds, which is annoying.
  • Loading branch information
alecthomas authored Jan 20, 2024
1 parent eb4827f commit 11cf7e1
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 17 deletions.
2 changes: 1 addition & 1 deletion backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
}
config.SetDefaults()
svc := &Service{
tasks: scheduledtask.New(ctx, key, db.GetControllers),
tasks: scheduledtask.New(ctx, key, db),
dal: db,
key: key,
deploymentLogsSink: newDeploymentLogsSink(ctx, db),
Expand Down
48 changes: 35 additions & 13 deletions backend/controller/scheduledtask/scheduledtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/alecthomas/atomic"
clock "github.com/benbjohnson/clock"
"github.com/jpillora/backoff"
"github.com/serialx/hashring"

Expand All @@ -34,6 +35,16 @@ type descriptor struct {
// run.
type Job func(ctx context.Context) (time.Duration, error)

type DAL interface {
GetControllers(ctx context.Context, all bool) ([]dal.Controller, error)
}

type DALFunc func(ctx context.Context, all bool) ([]dal.Controller, error)

func (f DALFunc) GetControllers(ctx context.Context, all bool) ([]dal.Controller, error) {
return f(ctx, all)
}

// Scheduler is a task scheduler for the controller.
//
// Each job runs in its own goroutine.
Expand All @@ -43,16 +54,26 @@ type Job func(ctx context.Context) (time.Duration, error)
// as the hash ring is only updated periodically and controllers may have
// inconsistent views of the hash ring.
type Scheduler struct {
getControllers func(ctx context.Context, all bool) ([]dal.Controller, error)
key model.ControllerKey
jobs chan *descriptor
controller DAL
key model.ControllerKey
jobs chan *descriptor
clock clock.Clock

hashring atomic.Value[*hashring.HashRing]
}

// New creates a new [Scheduler].
func New(ctx context.Context, id model.ControllerKey, getControllers func(ctx context.Context, all bool) ([]dal.Controller, error)) *Scheduler {
s := &Scheduler{getControllers: getControllers, key: id, jobs: make(chan *descriptor)}
func New(ctx context.Context, id model.ControllerKey, controller DAL) *Scheduler {
return NewForTesting(ctx, id, controller, clock.New())
}

func NewForTesting(ctx context.Context, id model.ControllerKey, controller DAL, clock clock.Clock) *Scheduler {
s := &Scheduler{
controller: controller,
key: id,
jobs: make(chan *descriptor),
clock: clock,
}
_ = s.updateHashring(ctx)
go s.syncHashRing(ctx)
go s.run(ctx)
Expand Down Expand Up @@ -81,7 +102,7 @@ func (s *Scheduler) schedule(retry backoff.Backoff, job Job, singlyHomed bool) {
retry: retry,
job: job,
singlyHomed: singlyHomed,
next: time.Now().Add(time.Millisecond * time.Duration(rand.Int63n(2000))), //nolint:gosec
next: s.clock.Now().Add(time.Millisecond * time.Duration(rand.Int63n(2000))), //nolint:gosec
}
}

Expand All @@ -92,7 +113,7 @@ func (s *Scheduler) run(ctx context.Context) {
// scheduled in the past. These are skipped on each run.
jobs := []*descriptor{}
for {
next := time.Now().Add(time.Second)
next := s.clock.Now().Add(time.Second)
// Find the next job to run.
if len(jobs) > 0 {
sort.Slice(jobs, func(i, j int) bool { return jobs[i].next.Before(jobs[j].next) })
Expand All @@ -105,14 +126,15 @@ func (s *Scheduler) run(ctx context.Context) {
}
}

now := s.clock.Now()
select {
case <-ctx.Done():
return

case <-time.After(time.Until(next)):
case <-s.clock.After(next.Sub(now)):
// Jobs to reschedule on the next run.
for i, job := range jobs {
if job.next.After(time.Now()) {
if job.next.After(s.clock.Now()) {
continue
}
job := job
Expand All @@ -130,11 +152,11 @@ func (s *Scheduler) run(ctx context.Context) {
go func() {
if delay, err := job.job(ctx); err != nil {
logger.Scope(job.name).Warnf("%s", err)
job.next = time.Now().Add(job.retry.Duration())
job.next = s.clock.Now().Add(job.retry.Duration())
} else {
// Reschedule the job.
job.retry.Reset()
job.next = time.Now().Add(delay)
job.next = s.clock.Now().Add(delay)
}
s.jobs <- job
}()
Expand All @@ -155,7 +177,7 @@ func (s *Scheduler) syncHashRing(ctx context.Context) {
case <-ctx.Done():
return

case <-time.After(time.Second * 5):
case <-s.clock.After(time.Second * 5):
if err := s.updateHashring(ctx); err != nil {
logger.Warnf("Failed to get controllers: %s", err)
}
Expand All @@ -164,7 +186,7 @@ func (s *Scheduler) syncHashRing(ctx context.Context) {
}

func (s *Scheduler) updateHashring(ctx context.Context) error {
controllers, err := s.getControllers(ctx, false)
controllers, err := s.controller.GetControllers(ctx, false)
if err != nil {
return err
}
Expand Down
9 changes: 6 additions & 3 deletions backend/controller/scheduledtask/scheduledtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/alecthomas/assert/v2"
"github.com/benbjohnson/clock"
"github.com/jpillora/backoff"

"github.com/TBD54566975/ftl/backend/common/log"
Expand Down Expand Up @@ -37,11 +38,13 @@ func TestCron(t *testing.T) {
{controller: dal.Controller{Key: model.NewControllerKey()}},
}

clock := clock.NewMock()

for _, c := range controllers {
c := c
c.cron = New(ctx, c.controller.Key, func(ctx context.Context, all bool) ([]dal.Controller, error) {
c.cron = NewForTesting(ctx, c.controller.Key, DALFunc(func(ctx context.Context, all bool) ([]dal.Controller, error) {
return slices.Map(controllers, func(c *controller) dal.Controller { return c.controller }), nil
})
}), clock)
c.cron.Singleton(backoff.Backoff{}, func(ctx context.Context) (time.Duration, error) {
singletonCount.Add(1)
return time.Second, nil
Expand All @@ -52,7 +55,7 @@ func TestCron(t *testing.T) {
})
}

time.Sleep(time.Second * 6)
clock.Add(time.Second * 6)

assert.True(t, singletonCount.Load() >= 5 && singletonCount.Load() < 10, "expected singletonCount to be >= 5 but was %d", singletonCount.Load())
assert.True(t, multiCount.Load() >= 20 && multiCount.Load() < 30, "expected multiCount to be >= 20 but was %d", multiCount.Load())
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ require (
require (
github.com/alecthomas/repr v0.3.0 // indirect
github.com/alessio/shellescape v1.4.2 // indirect
github.com/benbjohnson/clock v1.3.5
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/danieljoos/wincred v1.2.0 // indirect
github.com/dlclark/regexp2 v1.8.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 11cf7e1

Please sign in to comment.