Skip to content

Commit

Permalink
add tests, fixed case where slice was mutable by multiple owners
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Apr 10, 2024
1 parent b2e2022 commit f479fd1
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 43 deletions.
2 changes: 1 addition & 1 deletion backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
}
config.SetDefaults()
svc := &Service{
tasks: scheduledtask.New(ctx, key, db),
tasks: scheduledtask.New(ctx, key),
dal: db,
key: key,
deploymentLogsSink: newDeploymentLogsSink(ctx, db),
Expand Down
57 changes: 38 additions & 19 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,28 @@ type hashRingState struct {
idx int
}

type DAL interface {
GetCronJobs(ctx context.Context) ([]dal.CronJob, error)
CreateCronJob(ctx context.Context, deploymentKey model.DeploymentKey, module string, verb string, schedule string, startTime time.Time, nextExecution time.Time) (dal.CronJob, error)
StartCronJobs(ctx context.Context, jobs []dal.CronJob) (jobMap map[dal.CronJob]bool, keysWithNoReplicas map[model.DeploymentKey]bool, err error)
EndCronJob(ctx context.Context, job dal.CronJob, next time.Time) (dal.CronJob, error)
GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]dal.CronJob, error)
}

type Scheduler interface {
Singleton(retry backoff.Backoff, job scheduledtask.Job)
Parallel(retry backoff.Backoff, job scheduledtask.Job)
}

type CallExecuter interface {
Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error)
}

type Service struct {
config Config
key model.ControllerKey
dal *dal.DAL
scheduler *scheduledtask.Scheduler
dal DAL
scheduler Scheduler
executor CallExecuter // Change the type from *CallExecuter to CallExecuter

clock clock.Clock
Expand All @@ -65,18 +82,18 @@ type Service struct {
hashRingState atomic.Value[*hashRingState]
}

type CallExecuter interface {
Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error)
func New(ctx context.Context, key model.ControllerKey, config Config, dal DAL, scheduler Scheduler, executor CallExecuter) *Service {
return NewForTesting(ctx, key, config, dal, scheduler, executor, clock.New())
}

func New(ctx context.Context, key model.ControllerKey, config Config, dal *dal.DAL, scheduler *scheduledtask.Scheduler, executor CallExecuter) *Service {
func NewForTesting(ctx context.Context, key model.ControllerKey, config Config, dal DAL, scheduler Scheduler, executor CallExecuter, clock clock.Clock) *Service {
svc := &Service{
config: config,
key: key,
dal: dal,
scheduler: scheduler,
executor: executor,
clock: clock.New(),
clock: clock,
jobChanges: pubsub.New[jobChange](),
}
svc.UpdatedControllerList(ctx, nil)
Expand All @@ -102,7 +119,7 @@ func (s *Service) CreatedDeployment(ctx context.Context, deploymentKey model.Dep
continue
}

start := time.Now().UTC()
start := s.clock.Now().UTC()
next, err := cron.NextAfter(schedule, start, false)
if err != nil {
logger.Errorf(err, "failed to calculate next execution for cron job %v:%v with schedule %q", deploymentKey, verb.Name, schedule)
Expand Down Expand Up @@ -177,7 +194,7 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) {
logger.Errorf(err, "failed to parse cron schedule %q", job.Schedule)
return
}
next, err := cron.NextAfter(schedule, time.Now().UTC(), false)
next, err := cron.NextAfter(schedule, s.clock.Now().UTC(), false)
if err != nil {
logger.Errorf(err, "failed to calculate next execution for cron job %v:%v with schedule %q", job.DeploymentKey, job.Verb, job.Schedule)
}
Expand All @@ -202,7 +219,7 @@ func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) {

updatedJobs := []dal.CronJob{}
for _, stale := range staleJobs {
start := time.Now().UTC()
start := s.clock.Now().UTC()
pattern, err := cron.Parse(stale.Schedule)
if err != nil {
logger.Errorf(err, "Could not kill stale cron job %v:%v because schedule could not be parsed: %q", stale.DeploymentKey, stale.Verb, stale.Schedule)
Expand Down Expand Up @@ -231,8 +248,10 @@ func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) {
return time.Minute, nil
}

// watchForUpdates listens for updates and schedules jobs
// Other parts of the service publish into this channel
// watchForUpdates is the centralized place that handles:
// - list of known jobs and their state
// - executing jobs when they are due
// - reacting to events that change the list of jobs, deployments or hash ring
func (s *Service) watchForUpdates(ctx context.Context) {
logger := log.FromContext(ctx)

Expand All @@ -243,7 +262,7 @@ func (s *Service) watchForUpdates(ctx context.Context) {
state := &State{
executing: map[jobIdentifier]bool{},
newJobs: map[jobIdentifier]bool{},
blockedUntil: time.Now(),
blockedUntil: s.clock.Now(),
}

for {
Expand All @@ -252,7 +271,7 @@ func (s *Service) watchForUpdates(ctx context.Context) {
})

now := s.clock.Now()
next := time.Now().Add(time.Hour) // should never be reached, expect a different signal long beforehand
next := now.Add(time.Hour) // should never be reached, expect a different signal long beforehand
for _, j := range state.jobs {
if possibleNext, err := s.nextAttemptForJob(j, state, false); err == nil {
next = possibleNext
Expand All @@ -279,14 +298,14 @@ func (s *Service) watchForUpdates(ctx context.Context) {
// Try starting jobs in db
jobsToAttempt := slices.Filter(state.jobs, func(j dal.CronJob) bool {
if n, err := s.nextAttemptForJob(j, state, true); err == nil {
return !n.After(time.Now().UTC())
return !n.After(s.clock.Now().UTC())
}
return false
})
jobResults, removedDeploymentKeys, err := s.dal.StartCronJobs(ctx, jobsToAttempt)
if err != nil {
logger.Errorf(err, "failed to start cron jobs in db")
state.blockedUntil = time.Now().Add(time.Second * 5)
state.blockedUntil = s.clock.Now().Add(time.Second * 5)
continue
}

Expand Down Expand Up @@ -351,20 +370,20 @@ func (s *Service) sortJobs(state *State, i, j dal.CronJob) int {

func (s *Service) nextAttemptForJob(job dal.CronJob, state *State, allowsNow bool) (time.Time, error) {
if !s.isResponsibleForJob(job, state) {
return time.Now(), fmt.Errorf("controller is not responsible for job")
return s.clock.Now(), fmt.Errorf("controller is not responsible for job")
}
if job.State == dal.JobStateExecuting {
if state.isExecutingInCurrentController(job) {
// return a time in the future, meaning don't schedule at this time
return time.Now(), fmt.Errorf("controller is already waiting for job to finish")
return s.clock.Now(), fmt.Errorf("controller is already waiting for job to finish")
}
// We don't know when the other controller will finish this job
// We should check again when the next execution date is assuming the job finishes
pattern, err := cron.Parse(job.Schedule)
if err != nil {
return time.Now(), fmt.Errorf("failed to parse cron schedule %q", job.Schedule)
return s.clock.Now(), fmt.Errorf("failed to parse cron schedule %q", job.Schedule)
}
next, err := cron.NextAfter(pattern, time.Now().UTC(), allowsNow)
next, err := cron.NextAfter(pattern, s.clock.Now().UTC(), allowsNow)
if err == nil {
return next, nil
}
Expand Down
225 changes: 225 additions & 0 deletions backend/controller/cronjobs/cronjobs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
package cronjobs

import (
"context"
"fmt"
"strconv"
"sync"
"testing"
"time"

"connectrpc.com/connect"
"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/cron"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/alecthomas/assert/v2"
"github.com/benbjohnson/clock"
"github.com/jpillora/backoff"
)

type mockDAL struct {
lock sync.Mutex
clock *clock.Mock
jobs []dal.CronJob
attemptCountMap map[string]int
}

func (d *mockDAL) GetCronJobs(ctx context.Context) ([]dal.CronJob, error) {
d.lock.Lock()
defer d.lock.Unlock()

return d.jobs, nil
}

func (d *mockDAL) CreateCronJob(ctx context.Context, deploymentKey model.DeploymentKey, module string, verb string, schedule string, startTime time.Time, nextExecution time.Time) (dal.CronJob, error) {
d.lock.Lock()
defer d.lock.Unlock()

job := dal.CronJob{
DeploymentKey: deploymentKey,
Module: module,
Verb: verb,
Schedule: schedule,
StartTime: startTime,
NextExecution: nextExecution,
State: dal.JobStateIdle,
}
d.jobs = append(d.jobs, job)
return job, nil
}

func (d *mockDAL) indexForJob(job dal.CronJob) (int, error) {
for i, j := range d.jobs {
if j.DeploymentKey == job.DeploymentKey && j.Verb == job.Verb {
return i, nil
}
}
return -1, fmt.Errorf("job not found")
}

func (d *mockDAL) StartCronJobs(ctx context.Context, jobs []dal.CronJob) (jobMap map[dal.CronJob]bool, keysWithNoReplicas map[model.DeploymentKey]bool, err error) {
d.lock.Lock()
defer d.lock.Unlock()

jobMap = map[dal.CronJob]bool{}
now := (*d.clock).Now()

for _, inputJob := range jobs {
i, err := d.indexForJob(inputJob)
if err != nil {
return nil, nil, err
}
job := d.jobs[i]
if !job.NextExecution.After(now) && job.State == dal.JobStateIdle {
job.State = dal.JobStateExecuting
job.StartTime = (*d.clock).Now()
d.jobs[i] = job
jobMap[job] = true
} else {
jobMap[job] = false
}
d.attemptCountMap[job.Verb]++
}
return jobMap, map[model.DeploymentKey]bool{}, nil
}

func (d *mockDAL) EndCronJob(ctx context.Context, job dal.CronJob, next time.Time) (dal.CronJob, error) {
d.lock.Lock()
defer d.lock.Unlock()

i, err := d.indexForJob(job)
if err != nil {
return dal.CronJob{}, err
}
internalJob := d.jobs[i]
if internalJob.State != dal.JobStateExecuting {
return dal.CronJob{}, fmt.Errorf("job can not be stopped, it isnt running")
}
if internalJob.StartTime != job.StartTime {
return dal.CronJob{}, fmt.Errorf("job can not be stopped, start time does not match")
}
internalJob.State = dal.JobStateIdle
internalJob.NextExecution = next
d.jobs[i] = internalJob
return internalJob, nil
}

func (d *mockDAL) GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]dal.CronJob, error) {
d.lock.Lock()
defer d.lock.Unlock()

return slices.Filter(d.jobs, func(job dal.CronJob) bool {
return (*d.clock).Now().After(job.StartTime.Add(duration))
}), nil
}

type mockScheduler struct {
}

func (s *mockScheduler) Singleton(retry backoff.Backoff, job scheduledtask.Job) {
// do nothing
}

func (s *mockScheduler) Parallel(retry backoff.Backoff, job scheduledtask.Job) {
// do nothing
}

type mockExecutor struct {
verbCallCount map[string]int
lock sync.Mutex
clock *clock.Mock
}

func (e *mockExecutor) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {
verbRef := schema.RefFromProto(req.Msg.Verb)

e.lock.Lock()
e.verbCallCount[verbRef.Name]++
e.lock.Unlock()

return &connect.Response[ftlv1.CallResponse]{}, nil
}

type controller struct {
key model.ControllerKey
DAL DAL
cronJobs *Service
}

func TestService(t *testing.T) {
t.Parallel()
ctx := log.ContextWithNewDefaultLogger(context.Background())
ctx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)

// var singletonCount atomic.Int64
// var multiCount atomic.Int64

config := Config{Timeout: time.Minute * 5}
clock := clock.NewMock()
mockDal := &mockDAL{
clock: clock,
lock: sync.Mutex{},
attemptCountMap: map[string]int{},
}
scheduler := &mockScheduler{}
executor := &mockExecutor{
verbCallCount: map[string]int{},
lock: sync.Mutex{},
clock: clock,
}

// initial jobs
for i := range 20 {
deploymentKey := model.NewDeploymentKey("initial")
now := clock.Now()
cronStr := "*/10 * * * * * *"
pattern, err := cron.Parse(cronStr)
assert.NoError(t, err)
next, err := cron.NextAfter(pattern, now, false)
assert.NoError(t, err)
_, err = mockDal.CreateCronJob(ctx, deploymentKey, "initial", fmt.Sprintf("verb%d", i), cronStr, now, next)
assert.NoError(t, err)
}

controllers := []*controller{}
for i := range 5 {
key := model.NewControllerKey("localhost", strconv.Itoa(8080+i))
controller := &controller{
key: key,
DAL: mockDal,
cronJobs: NewForTesting(ctx, key, config, mockDal, scheduler, executor, clock),
}
controllers = append(controllers, controller)
}

time.Sleep(time.Millisecond * 100)

for _, c := range controllers {
go func() {
c.cronJobs.UpdatedControllerList(ctx, slices.Map(controllers, func(ctrl *controller) dal.Controller {
return dal.Controller{
Key: ctrl.key,
}
}))
_, _ = c.cronJobs.resetJobs(ctx)
}()
}

clock.Add(time.Second * 5)
time.Sleep(time.Millisecond * 100)
for range 3 {
clock.Add(time.Second * 10)
time.Sleep(time.Millisecond * 100)
}

for _, j := range mockDal.jobs {
count := executor.verbCallCount[j.Verb]
assert.Equal(t, count, 3, "expected verb %s to be called 3 times", j.Verb)
}
}
3 changes: 2 additions & 1 deletion backend/controller/cronjobs/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func (s *State) isJobTooNewForHashRing(job dal.CronJob) bool {
}

func (s *State) reset(jobs []dal.CronJob) {
s.jobs = jobs
s.jobs = make([]dal.CronJob, len(jobs))
copy(s.jobs, jobs)
for _, job := range s.jobs {
if job.State != dal.JobStateExecuting {
delete(s.executing, identifierForJob(job))
Expand Down
Loading

0 comments on commit f479fd1

Please sign in to comment.