From 6a0b46be51a1a310a7cc6936f3cea6151efb8563 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 12 Apr 2024 16:02:37 +1000 Subject: [PATCH 01/17] feat: cron job service --- backend/controller/controller.go | 48 +- backend/controller/cronjobs/cronjobs.go | 472 ++++++++++++++++++ backend/controller/cronjobs/cronjobs_test.go | 220 ++++++++ backend/controller/cronjobs/state.go | 82 +++ .../controller/scheduledtask/scheduledtask.go | 59 +-- .../scheduledtask/scheduledtask_test.go | 9 +- backend/controller/sql/querier.go | 1 - backend/controller/sql/queries.sql | 4 - backend/controller/sql/queries.sql.go | 10 - backend/schema/normalise.go | 6 +- backend/schema/validate.go | 6 + backend/schema/validate_test.go | 15 + 12 files changed, 859 insertions(+), 73 deletions(-) create mode 100644 backend/controller/cronjobs/cronjobs.go create mode 100644 backend/controller/cronjobs/cronjobs_test.go create mode 100644 backend/controller/cronjobs/state.go diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 563750bed4..57be46cf39 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -27,6 +27,7 @@ import ( "google.golang.org/protobuf/types/known/structpb" "google.golang.org/protobuf/types/known/timestamppb" + "github.com/TBD54566975/ftl/backend/controller/cronjobs" "github.com/TBD54566975/ftl/backend/controller/dal" "github.com/TBD54566975/ftl/backend/controller/ingress" "github.com/TBD54566975/ftl/backend/controller/scaling" @@ -49,10 +50,11 @@ import ( // CommonConfig between the production controller and development server. type CommonConfig struct { - AllowOrigins []*url.URL `help:"Allow CORS requests to ingress endpoints from these origins." env:"FTL_CONTROLLER_ALLOW_ORIGIN"` - NoConsole bool `help:"Disable the console."` - IdleRunners int `help:"Number of idle runners to keep around (not supported in production)." default:"3"` - WaitFor []string `help:"Wait for these modules to be deployed before becoming ready." placeholder:"MODULE"` + AllowOrigins []*url.URL `help:"Allow CORS requests to ingress endpoints from these origins." env:"FTL_CONTROLLER_ALLOW_ORIGIN"` + NoConsole bool `help:"Disable the console."` + IdleRunners int `help:"Number of idle runners to keep around (not supported in production)." default:"3"` + WaitFor []string `help:"Wait for these modules to be deployed before becoming ready." placeholder:"MODULE"` + CronJobTimeout time.Duration `help:"Timeout for cron jobs." default:"5m"` } type Config struct { @@ -138,12 +140,18 @@ type clients struct { runner ftlv1connect.RunnerServiceClient } +type ControllerListListener interface { + UpdatedControllerList(ctx context.Context, controllers []dal.Controller) +} + type Service struct { dal *dal.DAL key model.ControllerKey deploymentLogsSink *deploymentLogsSink - tasks *scheduledtask.Scheduler + tasks *scheduledtask.Scheduler + cronJobs *cronjobs.Service + controllerListListeners []ControllerListListener // Map from endpoint to client. clients *ttlcache.Cache[string, clients] @@ -163,7 +171,7 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling. } config.SetDefaults() svc := &Service{ - tasks: scheduledtask.New(ctx, key, db), + tasks: scheduledtask.New(ctx, key), dal: db, key: key, deploymentLogsSink: newDeploymentLogsSink(ctx, db), @@ -174,8 +182,14 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling. increaseReplicaFailures: map[string]int{}, } + cronSvc := cronjobs.New(ctx, key, svc.config.Advertise, cronjobs.Config{Timeout: config.CronJobTimeout}, db, svc.tasks, svc.callWithRequest) + svc.cronJobs = cronSvc + svc.controllerListListeners = append(svc.controllerListListeners, svc.tasks, cronSvc) + _, _ = svc.updateControllersList(ctx) + svc.tasks.Parallel(backoff.Backoff{Min: time.Second, Max: time.Second * 5}, svc.syncRoutes) svc.tasks.Parallel(backoff.Backoff{Min: time.Second * 3, Max: time.Second * 3}, svc.heartbeatController) + svc.tasks.Parallel(backoff.Backoff{Min: time.Second * 5, Max: time.Second * 5}, svc.updateControllersList) svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 10}, svc.reapStaleRunners) svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 20}, svc.releaseExpiredReservations) svc.tasks.Singleton(backoff.Backoff{Min: time.Second, Max: time.Second * 5}, svc.reconcileDeployments) @@ -422,6 +436,9 @@ func (s *Service) ReplaceDeploy(ctx context.Context, c *connect.Request[ftlv1.Re return nil, fmt.Errorf("could not replace deployment: %w", err) } } + + s.cronJobs.CreatedOrReplacedDeloyment(ctx, newDeploymentKey) + return connect.NewResponse(&ftlv1.ReplaceDeployResponse{}), nil } @@ -732,11 +749,18 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl } ingressRoutes := extractIngressRoutingEntries(req.Msg) - dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes, nil) + cronJobs, err := s.cronJobs.NewCronJobsForModule(ctx, req.Msg.Schema) + if err != nil { + logger.Errorf(err, "Could not generate cron jobs for new deployment") + return nil, fmt.Errorf("%s: %w", "could not generate cron jobs for new deployment", err) + } + + dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes, cronJobs) if err != nil { logger.Errorf(err, "Could not create deployment") return nil, fmt.Errorf("could not create deployment: %w", err) } + deploymentLogger := s.getDeploymentLogger(ctx, dkey) deploymentLogger.Debugf("Created deployment %s", dkey) return connect.NewResponse(&ftlv1.CreateDeploymentResponse{DeploymentKey: dkey.String()}), nil @@ -999,7 +1023,17 @@ func (s *Service) heartbeatController(ctx context.Context) (time.Duration, error return 0, fmt.Errorf("failed to heartbeat controller: %w", err) } return time.Second * 3, nil +} +func (s *Service) updateControllersList(ctx context.Context) (time.Duration, error) { + controllers, err := s.dal.GetControllers(ctx, false) + if err != nil { + return 0, err + } + for _, listener := range s.controllerListListeners { + listener.UpdatedControllerList(ctx, controllers) + } + return time.Second * 5, nil } func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(response *ftlv1.PullSchemaResponse) error) error { diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go new file mode 100644 index 0000000000..2ecf7df1c3 --- /dev/null +++ b/backend/controller/cronjobs/cronjobs.go @@ -0,0 +1,472 @@ +package cronjobs + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + "time" + + "connectrpc.com/connect" + "github.com/TBD54566975/ftl/backend/controller/dal" + "github.com/TBD54566975/ftl/backend/controller/scheduledtask" + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" + "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/cron" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/model" + "github.com/TBD54566975/ftl/internal/slices" + "github.com/alecthomas/atomic" + "github.com/alecthomas/types/optional" + "github.com/alecthomas/types/pubsub" + "github.com/benbjohnson/clock" + "github.com/jpillora/backoff" + "github.com/serialx/hashring" + sl "golang.org/x/exp/slices" +) + +const ( + controllersPerJob = 2 + jobResetInterval = time.Minute + newJobHashRingOverrideInterval = time.Minute + time.Second*20 +) + +type Config struct { + Timeout time.Duration +} + +type jobChangeType int + +const ( + resetJobs jobChangeType = iota + finishedJobs + updatedHashring +) + +type jobChange struct { + changeType jobChangeType + jobs []dal.CronJob + addedDeploymentKey optional.Option[model.DeploymentKey] +} + +type hashRingState struct { + hashRing *hashring.HashRing + controllers []dal.Controller + idx int +} + +type DAL interface { + GetCronJobs(ctx context.Context) ([]dal.CronJob, error) + StartCronJobs(ctx context.Context, jobs []dal.CronJob) (attemptedJobs []dal.AttemptedCronJob, err error) + EndCronJob(ctx context.Context, job dal.CronJob, next time.Time) (dal.CronJob, error) + GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]dal.CronJob, error) +} + +type Scheduler interface { + Singleton(retry backoff.Backoff, job scheduledtask.Job) + Parallel(retry backoff.Backoff, job scheduledtask.Job) +} + +type ExecuteCallFunc func(context.Context, *connect.Request[ftlv1.CallRequest], optional.Option[model.RequestKey], string) (*connect.Response[ftlv1.CallResponse], error) + +type Service struct { + config Config + key model.ControllerKey + originURL *url.URL + + dal DAL + scheduler Scheduler + call ExecuteCallFunc + + clock clock.Clock + jobChanges *pubsub.Topic[jobChange] + + hashRingState atomic.Value[*hashRingState] +} + +func New(ctx context.Context, key model.ControllerKey, originURL *url.URL, config Config, dal DAL, scheduler Scheduler, call ExecuteCallFunc) *Service { + return NewForTesting(ctx, key, originURL, config, dal, scheduler, call, clock.New()) +} + +func NewForTesting(ctx context.Context, key model.ControllerKey, originURL *url.URL, config Config, dal DAL, scheduler Scheduler, call ExecuteCallFunc, clock clock.Clock) *Service { + svc := &Service{ + config: config, + key: key, + originURL: originURL, + dal: dal, + scheduler: scheduler, + call: call, + clock: clock, + jobChanges: pubsub.New[jobChange](), + } + svc.UpdatedControllerList(ctx, nil) + + svc.scheduler.Parallel(backoff.Backoff{Min: time.Second, Max: jobResetInterval}, svc.resetJobs) + svc.scheduler.Singleton(backoff.Backoff{Min: time.Second, Max: time.Minute}, svc.killOldJobs) + + go svc.watchForUpdates(ctx) + + return svc +} + +func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Module) (jobs []dal.CronJob, err error) { + logger := log.FromContext(ctx) + + start := s.clock.Now().UTC() + newJobs := []dal.CronJob{} + for _, decl := range module.Decls { + if verb, ok := decl.Value.(*schemapb.Decl_Verb); ok { + for _, metadata := range verb.Verb.Metadata { + if cronMetadata, ok := metadata.Value.(*schemapb.Metadata_CronJob); ok { + cronStr := cronMetadata.CronJob.Cron + schedule, err := cron.Parse(cronStr) + if err != nil { + logger.Errorf(err, "failed to parse cron schedule %q", cronStr) + continue + } + next, err := cron.NextAfter(schedule, start, false) + if err != nil { + logger.Errorf(err, "failed to calculate next execution for cron job %v:%v with schedule %q", module.Name, verb.Verb.Name, schedule) + continue + } + newJobs = append(newJobs, dal.CronJob{ + Key: model.NewCronJobKey(module.Name, verb.Verb.Name), + Ref: schema.Ref{Module: module.Name, Name: verb.Verb.Name}, + Schedule: cronStr, + StartTime: start, + NextExecution: next, + State: dal.JobStateIdle, + // DeploymentKey: Filled in by DAL + }) + } + } + } + } + return newJobs, nil +} + +func (s *Service) CreatedOrReplacedDeloyment(ctx context.Context, newDeploymentKey model.DeploymentKey) { + // Rather than finding old/new cronjobs and updating our state, we can just reset the list of jobs + _ = s.resetJobsWithNewDeploymentKey(ctx, optional.Some(newDeploymentKey)) +} + +func (s *Service) resetJobs(ctx context.Context) (time.Duration, error) { + err := s.resetJobsWithNewDeploymentKey(ctx, optional.None[model.DeploymentKey]()) + if err != nil { + return 0, err + } + return jobResetInterval, nil +} + +// resetJobsWithNewDeploymentKey resets the list of jobs and marks the deployment key as added so that it can overrule the hash ring for a short time. +func (s *Service) resetJobsWithNewDeploymentKey(ctx context.Context, deploymentKey optional.Option[model.DeploymentKey]) error { + logger := log.FromContext(ctx) + + jobs, err := s.dal.GetCronJobs(ctx) + if err != nil { + logger.Errorf(err, "failed to get cron jobs") + return fmt.Errorf("%s: %w", "failed to get cron jobs", err) + } + s.jobChanges.Publish(jobChange{ + changeType: resetJobs, + jobs: jobs, + addedDeploymentKey: deploymentKey, + }) + return nil +} + +func (s *Service) executeJob(ctx context.Context, job dal.CronJob) { + logger := log.FromContext(ctx) + requestBody := map[string]any{} + requestJSON, err := json.Marshal(requestBody) + if err != nil { + logger.Errorf(err, "could not build cron job body %v:%v", job.DeploymentKey, job.Ref.String()) + return + } + + req := connect.NewRequest(&ftlv1.CallRequest{ + Verb: &schemapb.Ref{Module: job.Ref.Module, Name: job.Ref.Name}, + Body: requestJSON, + }) + + requestKey := model.NewRequestKey(model.OriginCron, fmt.Sprintf("%s-%s", job.Ref.Module, job.Ref.Name)) + + callCtx, cancel := context.WithTimeout(ctx, s.config.Timeout) + defer cancel() + _, err = s.call(callCtx, req, optional.Some(requestKey), s.originURL.Host) + if err != nil { + logger.Errorf(err, "failed to execute cron job %s", job.Ref.String()) + } + + schedule, err := cron.Parse(job.Schedule) + if err != nil { + logger.Errorf(err, "failed to parse cron schedule %q", job.Schedule) + return + } + next, err := cron.NextAfter(schedule, s.clock.Now().UTC(), false) + if err != nil { + logger.Errorf(err, "failed to calculate next execution for cron job %s with schedule %q", job.Ref.String(), job.Schedule) + } + + updatedJob, err := s.dal.EndCronJob(ctx, job, next) + if err != nil { + logger.Errorf(err, "failed to end cronjob %s", job.Ref.String()) + } else { + s.jobChanges.Publish(jobChange{ + changeType: finishedJobs, + jobs: []dal.CronJob{updatedJob}, + }) + } +} + +func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) { + logger := log.FromContext(ctx) + staleJobs, err := s.dal.GetStaleCronJobs(ctx, s.config.Timeout+time.Minute) + if err != nil { + return 0, err + } + + updatedJobs := []dal.CronJob{} + for _, stale := range staleJobs { + start := s.clock.Now().UTC() + pattern, err := cron.Parse(stale.Schedule) + if err != nil { + logger.Errorf(err, "Could not kill stale cron job %s because schedule could not be parsed: %q", stale.Ref.String(), stale.Schedule) + continue + } + next, err := cron.NextAfter(pattern, start, false) + if err != nil { + logger.Errorf(err, "Could not kill stale cron job %s because next date could not be calculated: %q", stale.Ref.String(), stale.Schedule) + continue + } + + updated, err := s.dal.EndCronJob(ctx, stale, next) + if err != nil { + logger.Errorf(err, "Could not kill stale cron job %s because: %v", stale.Ref.String(), err) + continue + } + logger.Warnf("Killed stale cron job %s", stale.Ref.String()) + updatedJobs = append(updatedJobs, updated) + } + + s.jobChanges.Publish(jobChange{ + changeType: finishedJobs, + jobs: updatedJobs, + }) + + return time.Minute, nil +} + +// watchForUpdates is the centralized place that handles: +// - list of known jobs and their state +// - executing jobs when they are due +// - reacting to events that change the list of jobs, deployments or hash ring +func (s *Service) watchForUpdates(ctx context.Context) { + logger := log.FromContext(ctx) + + jobChanges := make(chan jobChange, 128) + s.jobChanges.Subscribe(jobChanges) + defer s.jobChanges.Unsubscribe(jobChanges) + + state := &State{ + executing: map[string]bool{}, + newJobs: map[string]time.Time{}, + blockedUntil: s.clock.Now(), + } + + for { + sl.SortFunc(state.jobs, func(i, j dal.CronJob) int { + return s.sortJobs(state, i, j) + }) + + now := s.clock.Now() + next := now.Add(time.Hour) // should never be reached, expect a different signal long beforehand + for _, j := range state.jobs { + if possibleNext, err := s.nextAttemptForJob(j, state, false); err == nil { + next = possibleNext + break + } + } + + if next.Before(state.blockedUntil) { + next = state.blockedUntil + logger.Tracef("loop blocked for %vs", next.Sub(now)) + } else if next.Sub(now) < time.Second { + next = now.Add(time.Second) + logger.Tracef("loop while gated for 1s") + } else if next.Sub(now) > time.Minute*59 { + logger.Tracef("loop while idling") + } else { + logger.Tracef("loop with next %v, %d jobs", next.Sub(now), len(state.jobs)) + } + + select { + case <-ctx.Done(): + return + case <-s.clock.After(next.Sub(now)): + // Try starting jobs in db + jobsToAttempt := slices.Filter(state.jobs, func(j dal.CronJob) bool { + if n, err := s.nextAttemptForJob(j, state, true); err == nil { + return !n.After(s.clock.Now().UTC()) + } + return false + }) + jobResults, err := s.dal.StartCronJobs(ctx, jobsToAttempt) + if err != nil { + logger.Errorf(err, "failed to start cron jobs in db") + state.blockedUntil = s.clock.Now().Add(time.Second * 5) + continue + } + + // Start jobs that were successfully updated + updatedJobs := []dal.CronJob{} + removedDeploymentKeys := map[string]model.DeploymentKey{} + + for _, job := range jobResults { + updatedJobs = append(updatedJobs, job.CronJob) + if !job.DidStartExecution { + continue + } + if !job.HasMinReplicas { + // We successfully updated the db to start this job but the deployment has min replicas set to 0 + // We need to update the db to end this job + removedDeploymentKeys[job.DeploymentKey.String()] = job.DeploymentKey + _, err := s.dal.EndCronJob(ctx, job.CronJob, next) + if err != nil { + logger.Errorf(err, "failed to end cronjob %s", job.Ref.String()) + } + continue + } + logger.Infof("executing job %s", job.Ref.String()) + state.startedExecutingJob(job.CronJob) + go s.executeJob(ctx, job.CronJob) + } + + // Update job list + state.updateJobs(updatedJobs) + for _, key := range removedDeploymentKeys { + state.removeDeploymentKey(key) + } + case event := <-jobChanges: + switch event.changeType { + case resetJobs: + logger.Tracef("resetting job list: %d jobs", len(event.jobs)) + state.reset(event.jobs, event.addedDeploymentKey) + case finishedJobs: + logger.Tracef("updating %d jobs", len(event.jobs)) + state.updateJobs(event.jobs) + case updatedHashring: + // do another cycle through the loop to see if new jobs need to be scheduled + } + } + } +} + +func (s *Service) sortJobs(state *State, i, j dal.CronJob) int { + iNext, err := s.nextAttemptForJob(i, state, false) + if err != nil { + return 1 + } + jNext, err := s.nextAttemptForJob(j, state, false) + if err != nil { + return -1 + } + return iNext.Compare(jNext) +} + +func (s *Service) nextAttemptForJob(job dal.CronJob, state *State, allowsNow bool) (time.Time, error) { + if !s.isResponsibleForJob(job, state) { + return s.clock.Now(), fmt.Errorf("controller is not responsible for job") + } + if job.State == dal.JobStateExecuting { + if state.isExecutingInCurrentController(job) { + // return a time in the future, meaning don't schedule at this time + return s.clock.Now(), fmt.Errorf("controller is already waiting for job to finish") + } + // We don't know when the other controller will finish this job + // We should check again when the next execution date is assuming the job finishes + pattern, err := cron.Parse(job.Schedule) + if err != nil { + return s.clock.Now(), fmt.Errorf("failed to parse cron schedule %q", job.Schedule) + } + next, err := cron.NextAfter(pattern, s.clock.Now().UTC(), allowsNow) + if err == nil { + return next, nil + } + } + return job.NextExecution, nil +} + +// UpdatedControllerList synchronises the hash ring with the active controllers. +func (s *Service) UpdatedControllerList(ctx context.Context, controllers []dal.Controller) { + logger := log.FromContext(ctx).Scope("cron") + controllerIdx := -1 + for idx, controller := range controllers { + if controller.Key.String() == s.key.String() { + controllerIdx = idx + break + } + } + if controllerIdx == -1 { + logger.Tracef("controller %q not found in list of controllers", s.key) + } + + oldState := s.hashRingState.Load() + if oldState != nil && len(oldState.controllers) == len(controllers) { + hasChanged := false + for idx, new := range controllers { + old := oldState.controllers[idx] + if new.Key.String() != old.Key.String() { + hasChanged = true + break + } + } + if !hasChanged { + return + } + } + + hashRing := hashring.New(slices.Map(controllers, func(c dal.Controller) string { return c.Key.String() })) + s.hashRingState.Store(&hashRingState{ + hashRing: hashRing, + controllers: controllers, + idx: controllerIdx, + }) + + s.jobChanges.Publish(jobChange{ + changeType: updatedHashring, + }) +} + +func (s *Service) isResponsibleForJob(job dal.CronJob, state *State) bool { + if state.isJobTooNewForHashRing(job) { + return true + } + hashringState := s.hashRingState.Load() + if hashringState == nil { + return true + } + + initialKey, ok := hashringState.hashRing.GetNode(job.Key.String()) + if !ok { + return true + } + + initialIdx := -1 + for idx, controller := range hashringState.controllers { + if controller.Key.String() == initialKey { + initialIdx = idx + break + } + } + if initialIdx == -1 { + return true + } + + if initialIdx+controllersPerJob > len(hashringState.controllers) { + // wraps around + return hashringState.idx >= initialIdx || hashringState.idx < (initialIdx+controllersPerJob)-len(hashringState.controllers) + } + return hashringState.idx >= initialIdx && hashringState.idx < initialIdx+controllersPerJob +} diff --git a/backend/controller/cronjobs/cronjobs_test.go b/backend/controller/cronjobs/cronjobs_test.go new file mode 100644 index 0000000000..424bcdf542 --- /dev/null +++ b/backend/controller/cronjobs/cronjobs_test.go @@ -0,0 +1,220 @@ +package cronjobs + +import ( + "context" + "fmt" + "net/url" + "strconv" + "sync" + "testing" + "time" + + "connectrpc.com/connect" + "github.com/TBD54566975/ftl/backend/controller/dal" + "github.com/TBD54566975/ftl/backend/controller/scheduledtask" + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/cron" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/model" + "github.com/TBD54566975/ftl/internal/slices" + "github.com/alecthomas/assert/v2" + "github.com/alecthomas/types/optional" + "github.com/benbjohnson/clock" + "github.com/jpillora/backoff" +) + +type mockDAL struct { + lock sync.Mutex + clock *clock.Mock + jobs []dal.CronJob + attemptCountMap map[string]int +} + +func (d *mockDAL) GetCronJobs(ctx context.Context) ([]dal.CronJob, error) { + d.lock.Lock() + defer d.lock.Unlock() + + return d.jobs, nil +} + +func (d *mockDAL) createCronJob(deploymentKey model.DeploymentKey, module string, verb string, schedule string, startTime time.Time, nextExecution time.Time) { + d.lock.Lock() + defer d.lock.Unlock() + + job := dal.CronJob{ + Key: model.NewCronJobKey(module, verb), + DeploymentKey: deploymentKey, + Ref: schema.Ref{Module: module, Name: verb}, + Schedule: schedule, + StartTime: startTime, + NextExecution: nextExecution, + State: dal.JobStateIdle, + } + d.jobs = append(d.jobs, job) +} + +func (d *mockDAL) indexForJob(job dal.CronJob) (int, error) { + for i, j := range d.jobs { + if j.DeploymentKey.String() == job.DeploymentKey.String() && j.Ref.Name == job.Ref.Name { + return i, nil + } + } + return -1, fmt.Errorf("job not found") +} + +func (d *mockDAL) StartCronJobs(ctx context.Context, jobs []dal.CronJob) (attemptedJobs []dal.AttemptedCronJob, err error) { + d.lock.Lock() + defer d.lock.Unlock() + + attemptedJobs = []dal.AttemptedCronJob{} + now := (*d.clock).Now() + + for _, inputJob := range jobs { + i, err := d.indexForJob(inputJob) + if err != nil { + return nil, err + } + job := d.jobs[i] + if !job.NextExecution.After(now) && job.State == dal.JobStateIdle { + job.State = dal.JobStateExecuting + job.StartTime = (*d.clock).Now() + d.jobs[i] = job + attemptedJobs = append(attemptedJobs, dal.AttemptedCronJob{ + CronJob: job, + DidStartExecution: true, + HasMinReplicas: true, + }) + } else { + attemptedJobs = append(attemptedJobs, dal.AttemptedCronJob{ + CronJob: job, + DidStartExecution: false, + HasMinReplicas: true, + }) + } + d.attemptCountMap[job.Ref.String()]++ + } + return attemptedJobs, nil +} + +func (d *mockDAL) EndCronJob(ctx context.Context, job dal.CronJob, next time.Time) (dal.CronJob, error) { + d.lock.Lock() + defer d.lock.Unlock() + + i, err := d.indexForJob(job) + if err != nil { + return dal.CronJob{}, err + } + internalJob := d.jobs[i] + if internalJob.State != dal.JobStateExecuting { + return dal.CronJob{}, fmt.Errorf("job can not be stopped, it isnt running") + } + if internalJob.StartTime != job.StartTime { + return dal.CronJob{}, fmt.Errorf("job can not be stopped, start time does not match") + } + internalJob.State = dal.JobStateIdle + internalJob.NextExecution = next + d.jobs[i] = internalJob + return internalJob, nil +} + +func (d *mockDAL) GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]dal.CronJob, error) { + d.lock.Lock() + defer d.lock.Unlock() + + return slices.Filter(d.jobs, func(job dal.CronJob) bool { + return (*d.clock).Now().After(job.StartTime.Add(duration)) + }), nil +} + +type mockScheduler struct { +} + +func (s *mockScheduler) Singleton(retry backoff.Backoff, job scheduledtask.Job) { + // do nothing +} + +func (s *mockScheduler) Parallel(retry backoff.Backoff, job scheduledtask.Job) { + // do nothing +} + +type controller struct { + key model.ControllerKey + DAL DAL + cronJobs *Service +} + +func TestService(t *testing.T) { + t.Parallel() + ctx := log.ContextWithNewDefaultLogger(context.Background()) + ctx, cancel := context.WithCancel(ctx) + t.Cleanup(cancel) + + config := Config{Timeout: time.Minute * 5} + clock := clock.NewMock() + mockDal := &mockDAL{ + clock: clock, + lock: sync.Mutex{}, + attemptCountMap: map[string]int{}, + } + scheduler := &mockScheduler{} + + verbCallCount := map[string]int{} + verbCallCountLock := sync.Mutex{} + + // initial jobs + for i := range 20 { + deploymentKey := model.NewDeploymentKey("initial") + now := clock.Now() + cronStr := "*/10 * * * * * *" + pattern, err := cron.Parse(cronStr) + assert.NoError(t, err) + next, err := cron.NextAfter(pattern, now, false) + assert.NoError(t, err) + mockDal.createCronJob(deploymentKey, "initial", fmt.Sprintf("verb%d", i), cronStr, now, next) + } + + controllers := []*controller{} + for i := range 5 { + key := model.NewControllerKey("localhost", strconv.Itoa(8080+i)) + controller := &controller{ + key: key, + DAL: mockDal, + cronJobs: NewForTesting(ctx, key, &url.URL{Host: "test.com"}, config, mockDal, scheduler, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) { + verbRef := schema.RefFromProto(r.Msg.Verb) + + verbCallCountLock.Lock() + verbCallCount[verbRef.Name]++ + verbCallCountLock.Unlock() + + return &connect.Response[ftlv1.CallResponse]{}, nil + }, clock), + } + controllers = append(controllers, controller) + } + + time.Sleep(time.Millisecond * 100) + + for _, c := range controllers { + go func() { + c.cronJobs.UpdatedControllerList(ctx, slices.Map(controllers, func(ctrl *controller) dal.Controller { + return dal.Controller{ + Key: ctrl.key, + } + })) + _, _ = c.cronJobs.resetJobs(ctx) + }() + } + + clock.Add(time.Second * 5) + time.Sleep(time.Millisecond * 100) + for range 3 { + clock.Add(time.Second * 10) + time.Sleep(time.Millisecond * 100) + } + + for _, j := range mockDal.jobs { + count := verbCallCount[j.Ref.Name] + assert.Equal(t, count, 3, "expected verb %s to be called 3 times", j.Ref.Name) + } +} diff --git a/backend/controller/cronjobs/state.go b/backend/controller/cronjobs/state.go new file mode 100644 index 0000000000..080c655360 --- /dev/null +++ b/backend/controller/cronjobs/state.go @@ -0,0 +1,82 @@ +package cronjobs + +import ( + "time" + + "github.com/TBD54566975/ftl/backend/controller/dal" + "github.com/TBD54566975/ftl/internal/model" + "github.com/TBD54566975/ftl/internal/slices" + "github.com/alecthomas/types/optional" +) + +type State struct { + jobs []dal.CronJob + + // Used to determine if this controller is currently executing a job + executing map[string]bool + + // Newly created jobs should be attempted by the controller that created them until other controllers + // have a chance to reset their job lists and share responsibilities through the hash ring + newJobs map[string]time.Time + + blockedUntil time.Time +} + +func (s *State) isExecutingInCurrentController(job dal.CronJob) bool { + return s.executing[job.Key.String()] +} + +func (s *State) startedExecutingJob(job dal.CronJob) { + s.executing[job.Key.String()] = true +} + +func (s *State) isJobTooNewForHashRing(job dal.CronJob) bool { + if t, ok := s.newJobs[job.Key.String()]; ok { + if time.Since(t) < newJobHashRingOverrideInterval { + return true + } + delete(s.newJobs, job.Key.String()) + } + return false +} + +func (s *State) reset(jobs []dal.CronJob, newDeploymentKey optional.Option[model.DeploymentKey]) { + s.jobs = make([]dal.CronJob, len(jobs)) + copy(s.jobs, jobs) + for _, job := range s.jobs { + if job.State != dal.JobStateExecuting { + delete(s.executing, job.Key.String()) + } + if newKey, ok := newDeploymentKey.Get(); ok && job.DeploymentKey.String() == newKey.String() { + // This job is new and should be attempted by the current controller + s.newJobs[job.Key.String()] = time.Now() + } + } +} + +func (s *State) updateJobs(jobs []dal.CronJob) { + updatedJobMap := jobMap(jobs) + for idx, old := range s.jobs { + if updated, exists := updatedJobMap[old.Key.String()]; exists { + //TODO: compare to see if outdated + s.jobs[idx] = updated + if updated.State != dal.JobStateExecuting { + delete(s.executing, updated.Key.String()) + } + } + } +} + +func (s *State) removeDeploymentKey(key model.DeploymentKey) { + s.jobs = slices.Filter(s.jobs, func(j dal.CronJob) bool { + return j.DeploymentKey.String() != key.String() + }) +} + +func jobMap(jobs []dal.CronJob) map[string]dal.CronJob { + m := map[string]dal.CronJob{} + for _, job := range jobs { + m[job.Key.String()] = job + } + return m +} diff --git a/backend/controller/scheduledtask/scheduledtask.go b/backend/controller/scheduledtask/scheduledtask.go index e770197ba4..e3f3a049be 100644 --- a/backend/controller/scheduledtask/scheduledtask.go +++ b/backend/controller/scheduledtask/scheduledtask.go @@ -35,16 +35,8 @@ type descriptor struct { // run. type Job func(ctx context.Context) (time.Duration, error) -type DAL interface { - GetControllers(ctx context.Context, all bool) ([]dal.Controller, error) -} - type DALFunc func(ctx context.Context, all bool) ([]dal.Controller, error) -func (f DALFunc) GetControllers(ctx context.Context, all bool) ([]dal.Controller, error) { - return f(ctx, all) -} - // Scheduler is a task scheduler for the controller. // // Each job runs in its own goroutine. @@ -54,28 +46,25 @@ func (f DALFunc) GetControllers(ctx context.Context, all bool) ([]dal.Controller // as the hash ring is only updated periodically and controllers may have // inconsistent views of the hash ring. type Scheduler struct { - controller DAL - key model.ControllerKey - jobs chan *descriptor - clock clock.Clock + key model.ControllerKey + jobs chan *descriptor + clock clock.Clock hashring atomic.Value[*hashring.HashRing] } // New creates a new [Scheduler]. -func New(ctx context.Context, id model.ControllerKey, controller DAL) *Scheduler { - return NewForTesting(ctx, id, controller, clock.New()) +func New(ctx context.Context, id model.ControllerKey) *Scheduler { + return NewForTesting(ctx, id, clock.New()) } -func NewForTesting(ctx context.Context, id model.ControllerKey, controller DAL, clock clock.Clock) *Scheduler { +func NewForTesting(ctx context.Context, id model.ControllerKey, clock clock.Clock) *Scheduler { s := &Scheduler{ - controller: controller, - key: id, - jobs: make(chan *descriptor), - clock: clock, + key: id, + jobs: make(chan *descriptor), + clock: clock, } - _ = s.updateHashring(ctx) - go s.syncHashRing(ctx) + s.UpdatedControllerList(ctx, nil) go s.run(ctx) return s } @@ -107,7 +96,7 @@ func (s *Scheduler) schedule(retry backoff.Backoff, job Job, singlyHomed bool) { } func (s *Scheduler) run(ctx context.Context) { - logger := log.FromContext(ctx).Scope("cron") + logger := log.FromContext(ctx).Scope("scheduler") // List of jobs to run. // For singleton jobs running on a different host, this can include jobs // scheduled in the past. These are skipped on each run. @@ -147,7 +136,7 @@ func (s *Scheduler) run(ctx context.Context) { } } jobs[i] = nil // Zero out scheduled jobs. - logger.Scope(job.name).Tracef("Running cron job") + logger.Scope(job.name).Tracef("Running scheduled task") go func() { if delay, err := job.job(ctx); err != nil { logger.Scope(job.name).Warnf("%s", err) @@ -168,28 +157,8 @@ func (s *Scheduler) run(ctx context.Context) { } } -// Synchronise the hash ring with the active controllers. -func (s *Scheduler) syncHashRing(ctx context.Context) { - logger := log.FromContext(ctx).Scope("cron") - for { - select { - case <-ctx.Done(): - return - - case <-s.clock.After(time.Second * 5): - if err := s.updateHashring(ctx); err != nil { - logger.Warnf("Failed to get controllers: %s", err) - } - } - } -} - -func (s *Scheduler) updateHashring(ctx context.Context) error { - controllers, err := s.controller.GetControllers(ctx, false) - if err != nil { - return err - } +// UpdatedControllerList synchronises the hash ring with the active controllers. +func (s *Scheduler) UpdatedControllerList(ctx context.Context, controllers []dal.Controller) { hashring := hashring.New(slices.Map(controllers, func(c dal.Controller) string { return c.Key.String() })) s.hashring.Store(hashring) - return nil } diff --git a/backend/controller/scheduledtask/scheduledtask_test.go b/backend/controller/scheduledtask/scheduledtask_test.go index 6f759a68fb..a9c36a953e 100644 --- a/backend/controller/scheduledtask/scheduledtask_test.go +++ b/backend/controller/scheduledtask/scheduledtask_test.go @@ -40,9 +40,7 @@ func TestCron(t *testing.T) { clock := clock.NewMock() for _, c := range controllers { - c.cron = NewForTesting(ctx, c.controller.Key, DALFunc(func(ctx context.Context, all bool) ([]dal.Controller, error) { - return slices.Map(controllers, func(c *controller) dal.Controller { return c.controller }), nil - }), clock) + c.cron = NewForTesting(ctx, c.controller.Key, clock) c.cron.Singleton(backoff.Backoff{}, func(ctx context.Context) (time.Duration, error) { singletonCount.Add(1) return time.Second, nil @@ -51,6 +49,11 @@ func TestCron(t *testing.T) { multiCount.Add(1) return time.Second, nil }) + c.cron.UpdatedControllerList(ctx, slices.Map(controllers, func(ctrl *controller) dal.Controller { + return dal.Controller{ + Key: ctrl.controller.Key, + } + })) } clock.Add(time.Second * 6) diff --git a/backend/controller/sql/querier.go b/backend/controller/sql/querier.go index 85256f2dd0..eac52ac819 100644 --- a/backend/controller/sql/querier.go +++ b/backend/controller/sql/querier.go @@ -17,7 +17,6 @@ type Querier interface { // Create a new artefact and return the artefact ID. CreateArtefact(ctx context.Context, digest []byte, content []byte) (int64, error) CreateCronJob(ctx context.Context, arg CreateCronJobParams) error - CreateCronRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error CreateDeployment(ctx context.Context, moduleName string, schema []byte, key model.DeploymentKey) error CreateIngressRoute(ctx context.Context, arg CreateIngressRouteParams) error CreateRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql index eb7e0c2421..9d728deedd 100644 --- a/backend/controller/sql/queries.sql +++ b/backend/controller/sql/queries.sql @@ -409,10 +409,6 @@ VALUES ((SELECT id FROM deployments WHERE deployments.key = sqlc.arg('deployment INSERT INTO requests (origin, "key", source_addr) VALUES ($1, $2, $3); --- name: CreateCronRequest :exec -INSERT INTO requests (origin, "key", source_addr) -VALUES ($1, $2, $3); - -- name: UpsertController :one INSERT INTO controller (key, endpoint) VALUES ($1, $2) diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index 4bbce650bf..728ca631b2 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -86,16 +86,6 @@ func (q *Queries) CreateCronJob(ctx context.Context, arg CreateCronJobParams) er return err } -const createCronRequest = `-- name: CreateCronRequest :exec -INSERT INTO requests (origin, "key", source_addr) -VALUES ($1, $2, $3) -` - -func (q *Queries) CreateCronRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error { - _, err := q.db.Exec(ctx, createCronRequest, origin, key, sourceAddr) - return err -} - const createDeployment = `-- name: CreateDeployment :exec INSERT INTO deployments (module_id, "schema", "key") VALUES ((SELECT id FROM modules WHERE name = $1::TEXT LIMIT 1), $2::BYTEA, $3::deployment_key) diff --git a/backend/schema/normalise.go b/backend/schema/normalise.go index 1676e407ac..78cb0d2d19 100644 --- a/backend/schema/normalise.go +++ b/backend/schema/normalise.go @@ -111,6 +111,9 @@ func Normalise[T Node](n T) T { c.Pos = zero c.Path = normaliseSlice(c.Path) + case *MetadataCronJob: + c.Pos = zero + case *MetadataAlias: c.Pos = zero @@ -123,9 +126,6 @@ func Normalise[T Node](n T) T { case *IngressPathParameter: c.Pos = zero - case *MetadataCronJob: - c.Pos = zero - case *Config: c.Pos = zero c.Type = Normalise(c.Type) diff --git a/backend/schema/validate.go b/backend/schema/validate.go index 61cabfddd1..272047043f 100644 --- a/backend/schema/validate.go +++ b/backend/schema/validate.go @@ -480,6 +480,12 @@ func validateVerbMetadata(scopes Scopes, n *Verb) (merr []error) { if err != nil { merr = append(merr, err) } + if _, ok := n.Request.(*Unit); !ok { + merr = append(merr, errorf(md, "verb %s: cronjob can not have a request type", n.Name)) + } + if _, ok := n.Response.(*Unit); !ok { + merr = append(merr, errorf(md, "verb %s: cronjob can not have a response type", n.Name)) + } case *MetadataCalls, *MetadataDatabases, *MetadataAlias: } } diff --git a/backend/schema/validate_test.go b/backend/schema/validate_test.go index 92d8cd8377..c4788b1560 100644 --- a/backend/schema/validate_test.go +++ b/backend/schema/validate_test.go @@ -171,6 +171,21 @@ func TestValidate(t *testing.T) { "6:10-10: verb can not have multiple instances of ingress", }, }, + + {name: "CronOnNonEmptyVerb", + schema: ` + module one { + verb verbWithWrongInput(Empty) Unit + +cron * * * * * * * + verb verbWithWrongOutput(Unit) Empty + +cron * * * * * * * + } + `, + errs: []string{ + "4:7-7: verb verbWithWrongInput: cronjob can not have a request type", + "6:7-7: verb verbWithWrongOutput: cronjob can not have a response type", + }, + }, } for _, test := range tests { From f1b31a0d1b98103ba95386b58dcfae63ac71fcbe Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 12 Apr 2024 16:20:40 +1000 Subject: [PATCH 02/17] update comments --- backend/controller/cronjobs/cronjobs.go | 7 ++++++- backend/controller/cronjobs/state.go | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index 2ecf7df1c3..08710b8cc5 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -151,6 +151,7 @@ func (s *Service) CreatedOrReplacedDeloyment(ctx context.Context, newDeploymentK _ = s.resetJobsWithNewDeploymentKey(ctx, optional.Some(newDeploymentKey)) } +// resetJobs is run periodically via a scheduled task func (s *Service) resetJobs(ctx context.Context) (time.Duration, error) { err := s.resetJobsWithNewDeploymentKey(ctx, optional.None[model.DeploymentKey]()) if err != nil { @@ -220,6 +221,8 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) { } } +// killOldJobs looks for jobs that have been executing for too long +// This is the hard timout which happens after the usual timeout plus a grace period for the soft timeout to occur (context's timeout which cancel the call) func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) { logger := log.FromContext(ctx) staleJobs, err := s.dal.GetStaleCronJobs(ctx, s.config.Timeout+time.Minute) @@ -259,7 +262,7 @@ func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) { } // watchForUpdates is the centralized place that handles: -// - list of known jobs and their state +// - the list of known jobs and their state // - executing jobs when they are due // - reacting to events that change the list of jobs, deployments or hash ring func (s *Service) watchForUpdates(ctx context.Context) { @@ -439,6 +442,8 @@ func (s *Service) UpdatedControllerList(ctx context.Context, controllers []dal.C }) } +// isResponsibleForJob indicates whether a this service should be responsible for attempting jobs, +// or if enough other controllers will handle it. This allows us to spread the job load across controllers. func (s *Service) isResponsibleForJob(job dal.CronJob, state *State) bool { if state.isJobTooNewForHashRing(job) { return true diff --git a/backend/controller/cronjobs/state.go b/backend/controller/cronjobs/state.go index 080c655360..757486a436 100644 --- a/backend/controller/cronjobs/state.go +++ b/backend/controller/cronjobs/state.go @@ -19,6 +19,7 @@ type State struct { // have a chance to reset their job lists and share responsibilities through the hash ring newJobs map[string]time.Time + // We delay any job attempts in case of db errors to avoid hammering the db in a tight loop blockedUntil time.Time } @@ -58,7 +59,6 @@ func (s *State) updateJobs(jobs []dal.CronJob) { updatedJobMap := jobMap(jobs) for idx, old := range s.jobs { if updated, exists := updatedJobMap[old.Key.String()]; exists { - //TODO: compare to see if outdated s.jobs[idx] = updated if updated.State != dal.JobStateExecuting { delete(s.executing, updated.Key.String()) From 8d07e5b77dd0860008239f05af9ab7b61ec44aae Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 12 Apr 2024 16:49:25 +1000 Subject: [PATCH 03/17] update comments and test --- backend/controller/cronjobs/cronjobs.go | 44 +++++++++++--------- backend/controller/cronjobs/cronjobs_test.go | 9 +++- backend/schema/normalise.go | 6 +-- backend/schema/validate.go | 4 +- backend/schema/validate_test.go | 4 +- 5 files changed, 39 insertions(+), 28 deletions(-) diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index 08710b8cc5..b9d60df01a 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -147,7 +147,7 @@ func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Mod } func (s *Service) CreatedOrReplacedDeloyment(ctx context.Context, newDeploymentKey model.DeploymentKey) { - // Rather than finding old/new cronjobs and updating our state, we can just reset the list of jobs + // Rather than finding old/new cron jobs and updating our state, we can just reset the list of jobs _ = s.resetJobsWithNewDeploymentKey(ctx, optional.Some(newDeploymentKey)) } @@ -182,7 +182,7 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) { requestBody := map[string]any{} requestJSON, err := json.Marshal(requestBody) if err != nil { - logger.Errorf(err, "could not build cron job body %v:%v", job.DeploymentKey, job.Ref.String()) + logger.Errorf(err, "could not build body for cron job: %v", job.Key) return } @@ -197,7 +197,7 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) { defer cancel() _, err = s.call(callCtx, req, optional.Some(requestKey), s.originURL.Host) if err != nil { - logger.Errorf(err, "failed to execute cron job %s", job.Ref.String()) + logger.Errorf(err, "failed to execute cron job %v", job.Key) } schedule, err := cron.Parse(job.Schedule) @@ -207,12 +207,12 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) { } next, err := cron.NextAfter(schedule, s.clock.Now().UTC(), false) if err != nil { - logger.Errorf(err, "failed to calculate next execution for cron job %s with schedule %q", job.Ref.String(), job.Schedule) + logger.Errorf(err, "failed to calculate next execution for cron job %v with schedule %q", job.Key, job.Schedule) } updatedJob, err := s.dal.EndCronJob(ctx, job, next) if err != nil { - logger.Errorf(err, "failed to end cronjob %s", job.Ref.String()) + logger.Errorf(err, "failed to end cron job %v", job.Key) } else { s.jobChanges.Publish(jobChange{ changeType: finishedJobs, @@ -221,13 +221,17 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) { } } -// killOldJobs looks for jobs that have been executing for too long -// This is the hard timout which happens after the usual timeout plus a grace period for the soft timeout to occur (context's timeout which cancel the call) +// killOldJobs looks for jobs that have been executing for too long. +// A soft timeout should normally occur from the job's context timing out, but there are cases where this does not happen (eg: unresponsive or dead controller) +// In these cases we need a hard timout after an additional grace period. +// To do this, this function resets these job's state to idle and updates the next execution time in the db so the job can be picked up again next time. func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) { logger := log.FromContext(ctx) staleJobs, err := s.dal.GetStaleCronJobs(ctx, s.config.Timeout+time.Minute) if err != nil { return 0, err + } else if len(staleJobs) == 0 { + return time.Minute, nil } updatedJobs := []dal.CronJob{} @@ -235,21 +239,21 @@ func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) { start := s.clock.Now().UTC() pattern, err := cron.Parse(stale.Schedule) if err != nil { - logger.Errorf(err, "Could not kill stale cron job %s because schedule could not be parsed: %q", stale.Ref.String(), stale.Schedule) + logger.Errorf(err, "Could not kill stale cron job %q because schedule could not be parsed: %q", stale.Key, stale.Schedule) continue } next, err := cron.NextAfter(pattern, start, false) if err != nil { - logger.Errorf(err, "Could not kill stale cron job %s because next date could not be calculated: %q", stale.Ref.String(), stale.Schedule) + logger.Errorf(err, "Could not kill stale cron job %q because next date could not be calculated: %q", stale.Key, stale.Schedule) continue } updated, err := s.dal.EndCronJob(ctx, stale, next) if err != nil { - logger.Errorf(err, "Could not kill stale cron job %s because: %v", stale.Ref.String(), err) + logger.Errorf(err, "Could not kill stale cron job %s because: %v", stale.Key, err) continue } - logger.Warnf("Killed stale cron job %s", stale.Ref.String()) + logger.Warnf("Killed stale cron job %s", stale.Key) updatedJobs = append(updatedJobs, updated) } @@ -265,6 +269,8 @@ func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) { // - the list of known jobs and their state // - executing jobs when they are due // - reacting to events that change the list of jobs, deployments or hash ring +// +// State is private to this function to ensure thread safety. func (s *Service) watchForUpdates(ctx context.Context) { logger := log.FromContext(ctx) @@ -280,7 +286,7 @@ func (s *Service) watchForUpdates(ctx context.Context) { for { sl.SortFunc(state.jobs, func(i, j dal.CronJob) int { - return s.sortJobs(state, i, j) + return s.compareJobs(state, i, j) }) now := s.clock.Now() @@ -294,7 +300,7 @@ func (s *Service) watchForUpdates(ctx context.Context) { if next.Before(state.blockedUntil) { next = state.blockedUntil - logger.Tracef("loop blocked for %vs", next.Sub(now)) + logger.Tracef("loop blocked for %v", next.Sub(now)) } else if next.Sub(now) < time.Second { next = now.Add(time.Second) logger.Tracef("loop while gated for 1s") @@ -337,11 +343,11 @@ func (s *Service) watchForUpdates(ctx context.Context) { removedDeploymentKeys[job.DeploymentKey.String()] = job.DeploymentKey _, err := s.dal.EndCronJob(ctx, job.CronJob, next) if err != nil { - logger.Errorf(err, "failed to end cronjob %s", job.Ref.String()) + logger.Errorf(err, "failed to end cron job %s", job.Key.String()) } continue } - logger.Infof("executing job %s", job.Ref.String()) + logger.Infof("executing job %v", job.Key) state.startedExecutingJob(job.CronJob) go s.executeJob(ctx, job.CronJob) } @@ -366,7 +372,7 @@ func (s *Service) watchForUpdates(ctx context.Context) { } } -func (s *Service) sortJobs(state *State, i, j dal.CronJob) int { +func (s *Service) compareJobs(state *State, i, j dal.CronJob) int { iNext, err := s.nextAttemptForJob(i, state, false) if err != nil { return 1 @@ -384,11 +390,11 @@ func (s *Service) nextAttemptForJob(job dal.CronJob, state *State, allowsNow boo } if job.State == dal.JobStateExecuting { if state.isExecutingInCurrentController(job) { - // return a time in the future, meaning don't schedule at this time + // no need to schedule this job until it finishes return s.clock.Now(), fmt.Errorf("controller is already waiting for job to finish") } - // We don't know when the other controller will finish this job - // We should check again when the next execution date is assuming the job finishes + // We don't know when the other controller that is executing this job will finish it + // So we should optimistically attempt it when the next execution date is due assuming the job finishes pattern, err := cron.Parse(job.Schedule) if err != nil { return s.clock.Now(), fmt.Errorf("failed to parse cron schedule %q", job.Schedule) diff --git a/backend/controller/cronjobs/cronjobs_test.go b/backend/controller/cronjobs/cronjobs_test.go index 424bcdf542..3e1c1b35a1 100644 --- a/backend/controller/cronjobs/cronjobs_test.go +++ b/backend/controller/cronjobs/cronjobs_test.go @@ -56,7 +56,7 @@ func (d *mockDAL) createCronJob(deploymentKey model.DeploymentKey, module string func (d *mockDAL) indexForJob(job dal.CronJob) (int, error) { for i, j := range d.jobs { - if j.DeploymentKey.String() == job.DeploymentKey.String() && j.Ref.Name == job.Ref.Name { + if j.Key.String() == job.Key.String() { return i, nil } } @@ -92,7 +92,7 @@ func (d *mockDAL) StartCronJobs(ctx context.Context, jobs []dal.CronJob) (attemp HasMinReplicas: true, }) } - d.attemptCountMap[job.Ref.String()]++ + d.attemptCountMap[job.Key.String()]++ } return attemptedJobs, nil } @@ -216,5 +216,10 @@ func TestService(t *testing.T) { for _, j := range mockDal.jobs { count := verbCallCount[j.Ref.Name] assert.Equal(t, count, 3, "expected verb %s to be called 3 times", j.Ref.Name) + + // Make sure each job is not attempted by all controllers, or the responsibility of only one controller + // Target is for 2 controllers to attempt each job + attemptCount := mockDal.attemptCountMap[j.Key.String()] + assert.True(t, attemptCount > 1*count && attemptCount <= 3*attemptCount, "job %v was attempted %d times, expected between > 1 and <= 3 to be attempted", j.Key) } } diff --git a/backend/schema/normalise.go b/backend/schema/normalise.go index 78cb0d2d19..1676e407ac 100644 --- a/backend/schema/normalise.go +++ b/backend/schema/normalise.go @@ -111,9 +111,6 @@ func Normalise[T Node](n T) T { c.Pos = zero c.Path = normaliseSlice(c.Path) - case *MetadataCronJob: - c.Pos = zero - case *MetadataAlias: c.Pos = zero @@ -126,6 +123,9 @@ func Normalise[T Node](n T) T { case *IngressPathParameter: c.Pos = zero + case *MetadataCronJob: + c.Pos = zero + case *Config: c.Pos = zero c.Type = Normalise(c.Type) diff --git a/backend/schema/validate.go b/backend/schema/validate.go index 272047043f..45af0cecae 100644 --- a/backend/schema/validate.go +++ b/backend/schema/validate.go @@ -481,10 +481,10 @@ func validateVerbMetadata(scopes Scopes, n *Verb) (merr []error) { merr = append(merr, err) } if _, ok := n.Request.(*Unit); !ok { - merr = append(merr, errorf(md, "verb %s: cronjob can not have a request type", n.Name)) + merr = append(merr, errorf(md, "verb %s: cron job can not have a request type", n.Name)) } if _, ok := n.Response.(*Unit); !ok { - merr = append(merr, errorf(md, "verb %s: cronjob can not have a response type", n.Name)) + merr = append(merr, errorf(md, "verb %s: cron job can not have a response type", n.Name)) } case *MetadataCalls, *MetadataDatabases, *MetadataAlias: } diff --git a/backend/schema/validate_test.go b/backend/schema/validate_test.go index c4788b1560..a159bf5209 100644 --- a/backend/schema/validate_test.go +++ b/backend/schema/validate_test.go @@ -182,8 +182,8 @@ func TestValidate(t *testing.T) { } `, errs: []string{ - "4:7-7: verb verbWithWrongInput: cronjob can not have a request type", - "6:7-7: verb verbWithWrongOutput: cronjob can not have a response type", + "4:7-7: verb verbWithWrongInput: cron job can not have a request type", + "6:7-7: verb verbWithWrongOutput: cron job can not have a response type", }, }, } From 25c85be117c2557a0458c059a0bbb7fe055aefb9 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 12 Apr 2024 16:49:25 +1000 Subject: [PATCH 04/17] update string interpolation --- backend/controller/controller.go | 2 +- backend/controller/cronjobs/cronjobs.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 57be46cf39..3a7b91fe57 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -752,7 +752,7 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl cronJobs, err := s.cronJobs.NewCronJobsForModule(ctx, req.Msg.Schema) if err != nil { logger.Errorf(err, "Could not generate cron jobs for new deployment") - return nil, fmt.Errorf("%s: %w", "could not generate cron jobs for new deployment", err) + return nil, fmt.Errorf("could not generate cron jobs for new deployment: %w", err) } dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes, cronJobs) diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index b9d60df01a..cd9e6407ee 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -167,7 +167,7 @@ func (s *Service) resetJobsWithNewDeploymentKey(ctx context.Context, deploymentK jobs, err := s.dal.GetCronJobs(ctx) if err != nil { logger.Errorf(err, "failed to get cron jobs") - return fmt.Errorf("%s: %w", "failed to get cron jobs", err) + return fmt.Errorf("failed to get cron jobs: %w", err) } s.jobChanges.Publish(jobChange{ changeType: resetJobs, From 2704ee01a707024443fc1968e0882cfad9cb08ea Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Mon, 15 Apr 2024 09:23:54 +1000 Subject: [PATCH 05/17] Return errors when creating cron jobs for a new deployment --- backend/controller/cronjobs/cronjobs.go | 61 ++++++++++++++----------- 1 file changed, 34 insertions(+), 27 deletions(-) diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index cd9e6407ee..b3a110e18e 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -3,6 +3,7 @@ package cronjobs import ( "context" "encoding/json" + "errors" "fmt" "net/url" "time" @@ -110,39 +111,45 @@ func NewForTesting(ctx context.Context, key model.ControllerKey, originURL *url. return svc } -func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Module) (jobs []dal.CronJob, err error) { - logger := log.FromContext(ctx) - +func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Module) ([]dal.CronJob, error) { start := s.clock.Now().UTC() newJobs := []dal.CronJob{} + merr := []error{} for _, decl := range module.Decls { - if verb, ok := decl.Value.(*schemapb.Decl_Verb); ok { - for _, metadata := range verb.Verb.Metadata { - if cronMetadata, ok := metadata.Value.(*schemapb.Metadata_CronJob); ok { - cronStr := cronMetadata.CronJob.Cron - schedule, err := cron.Parse(cronStr) - if err != nil { - logger.Errorf(err, "failed to parse cron schedule %q", cronStr) - continue - } - next, err := cron.NextAfter(schedule, start, false) - if err != nil { - logger.Errorf(err, "failed to calculate next execution for cron job %v:%v with schedule %q", module.Name, verb.Verb.Name, schedule) - continue - } - newJobs = append(newJobs, dal.CronJob{ - Key: model.NewCronJobKey(module.Name, verb.Verb.Name), - Ref: schema.Ref{Module: module.Name, Name: verb.Verb.Name}, - Schedule: cronStr, - StartTime: start, - NextExecution: next, - State: dal.JobStateIdle, - // DeploymentKey: Filled in by DAL - }) - } + verb, ok := decl.Value.(*schemapb.Decl_Verb) + if !ok { + continue + } + for _, metadata := range verb.Verb.Metadata { + cronMetadata, ok := metadata.Value.(*schemapb.Metadata_CronJob) + if !ok { + continue } + cronStr := cronMetadata.CronJob.Cron + schedule, err := cron.Parse(cronStr) + if err != nil { + merr = append(merr, fmt.Errorf("failed to parse cron schedule %q: %w", cronStr, err)) + continue + } + next, err := cron.NextAfter(schedule, start, false) + if err != nil { + merr = append(merr, fmt.Errorf("failed to calculate next execution for cron job %v:%v with schedule %q: %w", module.Name, verb.Verb.Name, schedule, err)) + continue + } + newJobs = append(newJobs, dal.CronJob{ + Key: model.NewCronJobKey(module.Name, verb.Verb.Name), + Ref: schema.Ref{Module: module.Name, Name: verb.Verb.Name}, + Schedule: cronStr, + StartTime: start, + NextExecution: next, + State: dal.JobStateIdle, + // DeploymentKey: Filled in by DAL + }) } } + if len(merr) > 0 { + return nil, errors.Join(merr...) + } return newJobs, nil } From 72953c0dd0dcc8d5c1264f305499edb32a15a8bc Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Mon, 15 Apr 2024 09:28:10 +1000 Subject: [PATCH 06/17] Fix issue handling error when calculating next execution --- backend/controller/cronjobs/cronjobs.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index b3a110e18e..dc99a68e64 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -205,6 +205,7 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) { _, err = s.call(callCtx, req, optional.Some(requestKey), s.originURL.Host) if err != nil { logger.Errorf(err, "failed to execute cron job %v", job.Key) + // Do not return, continue to end the job and schedule the next execution } schedule, err := cron.Parse(job.Schedule) @@ -215,6 +216,7 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) { next, err := cron.NextAfter(schedule, s.clock.Now().UTC(), false) if err != nil { logger.Errorf(err, "failed to calculate next execution for cron job %v with schedule %q", job.Key, job.Schedule) + return } updatedJob, err := s.dal.EndCronJob(ctx, job, next) From 01e8fa454fb2cf80d0ba0e67a1254f0f9dec4ab7 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Mon, 15 Apr 2024 09:41:15 +1000 Subject: [PATCH 07/17] use sumtype for cronjob events --- backend/controller/cronjobs/cronjobs.go | 80 +++++++++++++------------ 1 file changed, 42 insertions(+), 38 deletions(-) diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index dc99a68e64..6b9e0ee06d 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -37,20 +37,29 @@ type Config struct { Timeout time.Duration } -type jobChangeType int - -const ( - resetJobs jobChangeType = iota - finishedJobs - updatedHashring -) +//sumtype:decl +type event interface { + // cronJobEvent is a marker to ensure that all events implement the interface. + cronJobEvent() +} -type jobChange struct { - changeType jobChangeType +type resetEvent struct { jobs []dal.CronJob addedDeploymentKey optional.Option[model.DeploymentKey] } +func (resetEvent) cronJobEvent() {} + +type endedJobsEvent struct { + jobs []dal.CronJob +} + +func (endedJobsEvent) cronJobEvent() {} + +type updatedHashRingEvent struct{} + +func (updatedHashRingEvent) cronJobEvent() {} + type hashRingState struct { hashRing *hashring.HashRing controllers []dal.Controller @@ -80,8 +89,8 @@ type Service struct { scheduler Scheduler call ExecuteCallFunc - clock clock.Clock - jobChanges *pubsub.Topic[jobChange] + clock clock.Clock + events *pubsub.Topic[event] hashRingState atomic.Value[*hashRingState] } @@ -92,14 +101,14 @@ func New(ctx context.Context, key model.ControllerKey, originURL *url.URL, confi func NewForTesting(ctx context.Context, key model.ControllerKey, originURL *url.URL, config Config, dal DAL, scheduler Scheduler, call ExecuteCallFunc, clock clock.Clock) *Service { svc := &Service{ - config: config, - key: key, - originURL: originURL, - dal: dal, - scheduler: scheduler, - call: call, - clock: clock, - jobChanges: pubsub.New[jobChange](), + config: config, + key: key, + originURL: originURL, + dal: dal, + scheduler: scheduler, + call: call, + clock: clock, + events: pubsub.New[event](), } svc.UpdatedControllerList(ctx, nil) @@ -176,8 +185,7 @@ func (s *Service) resetJobsWithNewDeploymentKey(ctx context.Context, deploymentK logger.Errorf(err, "failed to get cron jobs") return fmt.Errorf("failed to get cron jobs: %w", err) } - s.jobChanges.Publish(jobChange{ - changeType: resetJobs, + s.events.Publish(resetEvent{ jobs: jobs, addedDeploymentKey: deploymentKey, }) @@ -223,9 +231,8 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) { if err != nil { logger.Errorf(err, "failed to end cron job %v", job.Key) } else { - s.jobChanges.Publish(jobChange{ - changeType: finishedJobs, - jobs: []dal.CronJob{updatedJob}, + s.events.Publish(endedJobsEvent{ + jobs: []dal.CronJob{updatedJob}, }) } } @@ -266,9 +273,8 @@ func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) { updatedJobs = append(updatedJobs, updated) } - s.jobChanges.Publish(jobChange{ - changeType: finishedJobs, - jobs: updatedJobs, + s.events.Publish(endedJobsEvent{ + jobs: updatedJobs, }) return time.Minute, nil @@ -283,9 +289,9 @@ func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) { func (s *Service) watchForUpdates(ctx context.Context) { logger := log.FromContext(ctx) - jobChanges := make(chan jobChange, 128) - s.jobChanges.Subscribe(jobChanges) - defer s.jobChanges.Unsubscribe(jobChanges) + events := make(chan event, 128) + s.events.Subscribe(events) + defer s.events.Unsubscribe(events) state := &State{ executing: map[string]bool{}, @@ -366,15 +372,15 @@ func (s *Service) watchForUpdates(ctx context.Context) { for _, key := range removedDeploymentKeys { state.removeDeploymentKey(key) } - case event := <-jobChanges: - switch event.changeType { - case resetJobs: + case e := <-events: + switch event := e.(type) { + case resetEvent: logger.Tracef("resetting job list: %d jobs", len(event.jobs)) state.reset(event.jobs, event.addedDeploymentKey) - case finishedJobs: + case endedJobsEvent: logger.Tracef("updating %d jobs", len(event.jobs)) state.updateJobs(event.jobs) - case updatedHashring: + case updatedHashRingEvent: // do another cycle through the loop to see if new jobs need to be scheduled } } @@ -452,9 +458,7 @@ func (s *Service) UpdatedControllerList(ctx context.Context, controllers []dal.C idx: controllerIdx, }) - s.jobChanges.Publish(jobChange{ - changeType: updatedHashring, - }) + s.events.Publish(updatedHashRingEvent{}) } // isResponsibleForJob indicates whether a this service should be responsible for attempting jobs, From 63568df6f888141e699e28fbec82ab6f7d99092a Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Mon, 15 Apr 2024 09:57:35 +1000 Subject: [PATCH 08/17] rename reset to sync --- backend/controller/cronjobs/cronjobs.go | 30 +++++++++++--------- backend/controller/cronjobs/cronjobs_test.go | 2 +- backend/controller/cronjobs/state.go | 4 +-- 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index 6b9e0ee06d..95243a37cd 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -43,12 +43,12 @@ type event interface { cronJobEvent() } -type resetEvent struct { +type syncEvent struct { jobs []dal.CronJob addedDeploymentKey optional.Option[model.DeploymentKey] } -func (resetEvent) cronJobEvent() {} +func (syncEvent) cronJobEvent() {} type endedJobsEvent struct { jobs []dal.CronJob @@ -112,7 +112,7 @@ func NewForTesting(ctx context.Context, key model.ControllerKey, originURL *url. } svc.UpdatedControllerList(ctx, nil) - svc.scheduler.Parallel(backoff.Backoff{Min: time.Second, Max: jobResetInterval}, svc.resetJobs) + svc.scheduler.Parallel(backoff.Backoff{Min: time.Second, Max: jobResetInterval}, svc.syncJobs) svc.scheduler.Singleton(backoff.Backoff{Min: time.Second, Max: time.Minute}, svc.killOldJobs) go svc.watchForUpdates(ctx) @@ -162,22 +162,24 @@ func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Mod return newJobs, nil } +// CreatedOrReplacedDeloyment: When a controller creates/replaces a deployment, its cron job service is responsible for +// the newly created cron jobs until other controllers have a chance to resync their list of jobs and start sharing responsibility of the new cron jobs. func (s *Service) CreatedOrReplacedDeloyment(ctx context.Context, newDeploymentKey model.DeploymentKey) { - // Rather than finding old/new cron jobs and updating our state, we can just reset the list of jobs - _ = s.resetJobsWithNewDeploymentKey(ctx, optional.Some(newDeploymentKey)) + // Rather than finding old/new cron jobs and updating our state, we can just resync the list of jobs + _ = s.syncJobsWithNewDeploymentKey(ctx, optional.Some(newDeploymentKey)) } -// resetJobs is run periodically via a scheduled task -func (s *Service) resetJobs(ctx context.Context) (time.Duration, error) { - err := s.resetJobsWithNewDeploymentKey(ctx, optional.None[model.DeploymentKey]()) +// syncJobs is run periodically via a scheduled task +func (s *Service) syncJobs(ctx context.Context) (time.Duration, error) { + err := s.syncJobsWithNewDeploymentKey(ctx, optional.None[model.DeploymentKey]()) if err != nil { return 0, err } return jobResetInterval, nil } -// resetJobsWithNewDeploymentKey resets the list of jobs and marks the deployment key as added so that it can overrule the hash ring for a short time. -func (s *Service) resetJobsWithNewDeploymentKey(ctx context.Context, deploymentKey optional.Option[model.DeploymentKey]) error { +// syncJobsWithNewDeploymentKey resyncs the list of jobs and marks the deployment key as added so that it can overrule the hash ring for a short time. +func (s *Service) syncJobsWithNewDeploymentKey(ctx context.Context, deploymentKey optional.Option[model.DeploymentKey]) error { logger := log.FromContext(ctx) jobs, err := s.dal.GetCronJobs(ctx) @@ -185,7 +187,7 @@ func (s *Service) resetJobsWithNewDeploymentKey(ctx context.Context, deploymentK logger.Errorf(err, "failed to get cron jobs") return fmt.Errorf("failed to get cron jobs: %w", err) } - s.events.Publish(resetEvent{ + s.events.Publish(syncEvent{ jobs: jobs, addedDeploymentKey: deploymentKey, }) @@ -374,9 +376,9 @@ func (s *Service) watchForUpdates(ctx context.Context) { } case e := <-events: switch event := e.(type) { - case resetEvent: - logger.Tracef("resetting job list: %d jobs", len(event.jobs)) - state.reset(event.jobs, event.addedDeploymentKey) + case syncEvent: + logger.Tracef("syncing job list: %d jobs", len(event.jobs)) + state.sync(event.jobs, event.addedDeploymentKey) case endedJobsEvent: logger.Tracef("updating %d jobs", len(event.jobs)) state.updateJobs(event.jobs) diff --git a/backend/controller/cronjobs/cronjobs_test.go b/backend/controller/cronjobs/cronjobs_test.go index 3e1c1b35a1..f20d3e4b2e 100644 --- a/backend/controller/cronjobs/cronjobs_test.go +++ b/backend/controller/cronjobs/cronjobs_test.go @@ -202,7 +202,7 @@ func TestService(t *testing.T) { Key: ctrl.key, } })) - _, _ = c.cronJobs.resetJobs(ctx) + _, _ = c.cronJobs.syncJobs(ctx) }() } diff --git a/backend/controller/cronjobs/state.go b/backend/controller/cronjobs/state.go index 757486a436..37365a427f 100644 --- a/backend/controller/cronjobs/state.go +++ b/backend/controller/cronjobs/state.go @@ -16,7 +16,7 @@ type State struct { executing map[string]bool // Newly created jobs should be attempted by the controller that created them until other controllers - // have a chance to reset their job lists and share responsibilities through the hash ring + // have a chance to resync their job lists and share responsibilities through the hash ring newJobs map[string]time.Time // We delay any job attempts in case of db errors to avoid hammering the db in a tight loop @@ -41,7 +41,7 @@ func (s *State) isJobTooNewForHashRing(job dal.CronJob) bool { return false } -func (s *State) reset(jobs []dal.CronJob, newDeploymentKey optional.Option[model.DeploymentKey]) { +func (s *State) sync(jobs []dal.CronJob, newDeploymentKey optional.Option[model.DeploymentKey]) { s.jobs = make([]dal.CronJob, len(jobs)) copy(s.jobs, jobs) for _, job := range s.jobs { From eb7574667883722fd9d4dacb3edcd51a0728c15e Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Mon, 15 Apr 2024 10:06:55 +1000 Subject: [PATCH 09/17] add comment --- backend/controller/controller.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 3a7b91fe57..7ae0082fad 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -140,6 +140,8 @@ type clients struct { runner ftlv1connect.RunnerServiceClient } +// ControllerListListener is regularly notified of the current list of controllers +// This is often used to update a hash ring to distribute work. type ControllerListListener interface { UpdatedControllerList(ctx context.Context, controllers []dal.Controller) } From bbb67368100a0f11f091a2fd9a370a7511efeca8 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Mon, 15 Apr 2024 10:55:12 +1000 Subject: [PATCH 10/17] move cron job type to models package --- backend/controller/cronjobs/cronjobs.go | 63 +++++++------------- backend/controller/cronjobs/cronjobs_test.go | 38 ++++++------ backend/controller/cronjobs/state.go | 25 ++++---- backend/controller/dal/dal.go | 63 +++++++------------- backend/controller/sql/models.go | 2 +- backend/controller/sql/queries.sql.go | 8 +-- internal/model/cron_job.go | 22 +++++++ internal/model/verb_ref.go | 12 ++++ sqlc.yaml | 2 + 9 files changed, 117 insertions(+), 118 deletions(-) create mode 100644 internal/model/cron_job.go create mode 100644 internal/model/verb_ref.go diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index 95243a37cd..948c3d0aa6 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -13,7 +13,6 @@ import ( "github.com/TBD54566975/ftl/backend/controller/scheduledtask" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" - "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/cron" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" @@ -24,7 +23,6 @@ import ( "github.com/benbjohnson/clock" "github.com/jpillora/backoff" "github.com/serialx/hashring" - sl "golang.org/x/exp/slices" ) const ( @@ -44,14 +42,14 @@ type event interface { } type syncEvent struct { - jobs []dal.CronJob + jobs []model.CronJob addedDeploymentKey optional.Option[model.DeploymentKey] } func (syncEvent) cronJobEvent() {} type endedJobsEvent struct { - jobs []dal.CronJob + jobs []model.CronJob } func (endedJobsEvent) cronJobEvent() {} @@ -67,10 +65,10 @@ type hashRingState struct { } type DAL interface { - GetCronJobs(ctx context.Context) ([]dal.CronJob, error) - StartCronJobs(ctx context.Context, jobs []dal.CronJob) (attemptedJobs []dal.AttemptedCronJob, err error) - EndCronJob(ctx context.Context, job dal.CronJob, next time.Time) (dal.CronJob, error) - GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]dal.CronJob, error) + GetCronJobs(ctx context.Context) ([]model.CronJob, error) + StartCronJobs(ctx context.Context, jobs []model.CronJob) (attemptedJobs []dal.AttemptedCronJob, err error) + EndCronJob(ctx context.Context, job model.CronJob, next time.Time) (model.CronJob, error) + GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]model.CronJob, error) } type Scheduler interface { @@ -120,9 +118,9 @@ func NewForTesting(ctx context.Context, key model.ControllerKey, originURL *url. return svc } -func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Module) ([]dal.CronJob, error) { +func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Module) ([]model.CronJob, error) { start := s.clock.Now().UTC() - newJobs := []dal.CronJob{} + newJobs := []model.CronJob{} merr := []error{} for _, decl := range module.Decls { verb, ok := decl.Value.(*schemapb.Decl_Verb) @@ -145,13 +143,13 @@ func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Mod merr = append(merr, fmt.Errorf("failed to calculate next execution for cron job %v:%v with schedule %q: %w", module.Name, verb.Verb.Name, schedule, err)) continue } - newJobs = append(newJobs, dal.CronJob{ + newJobs = append(newJobs, model.CronJob{ Key: model.NewCronJobKey(module.Name, verb.Verb.Name), - Ref: schema.Ref{Module: module.Name, Name: verb.Verb.Name}, + Verb: model.VerbRef{Module: module.Name, Name: verb.Verb.Name}, Schedule: cronStr, StartTime: start, NextExecution: next, - State: dal.JobStateIdle, + State: model.CronJobStateIdle, // DeploymentKey: Filled in by DAL }) } @@ -194,7 +192,7 @@ func (s *Service) syncJobsWithNewDeploymentKey(ctx context.Context, deploymentKe return nil } -func (s *Service) executeJob(ctx context.Context, job dal.CronJob) { +func (s *Service) executeJob(ctx context.Context, job model.CronJob) { logger := log.FromContext(ctx) requestBody := map[string]any{} requestJSON, err := json.Marshal(requestBody) @@ -204,11 +202,11 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) { } req := connect.NewRequest(&ftlv1.CallRequest{ - Verb: &schemapb.Ref{Module: job.Ref.Module, Name: job.Ref.Name}, + Verb: &schemapb.Ref{Module: job.Verb.Module, Name: job.Verb.Name}, Body: requestJSON, }) - requestKey := model.NewRequestKey(model.OriginCron, fmt.Sprintf("%s-%s", job.Ref.Module, job.Ref.Name)) + requestKey := model.NewRequestKey(model.OriginCron, fmt.Sprintf("%s-%s", job.Verb.Module, job.Verb.Name)) callCtx, cancel := context.WithTimeout(ctx, s.config.Timeout) defer cancel() @@ -234,7 +232,7 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) { logger.Errorf(err, "failed to end cron job %v", job.Key) } else { s.events.Publish(endedJobsEvent{ - jobs: []dal.CronJob{updatedJob}, + jobs: []model.CronJob{updatedJob}, }) } } @@ -252,7 +250,7 @@ func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) { return time.Minute, nil } - updatedJobs := []dal.CronJob{} + updatedJobs := []model.CronJob{} for _, stale := range staleJobs { start := s.clock.Now().UTC() pattern, err := cron.Parse(stale.Schedule) @@ -302,16 +300,11 @@ func (s *Service) watchForUpdates(ctx context.Context) { } for { - sl.SortFunc(state.jobs, func(i, j dal.CronJob) int { - return s.compareJobs(state, i, j) - }) - now := s.clock.Now() next := now.Add(time.Hour) // should never be reached, expect a different signal long beforehand for _, j := range state.jobs { - if possibleNext, err := s.nextAttemptForJob(j, state, false); err == nil { + if possibleNext, err := s.nextAttemptForJob(j, state, false); err == nil && possibleNext.Before(next) { next = possibleNext - break } } @@ -332,7 +325,7 @@ func (s *Service) watchForUpdates(ctx context.Context) { return case <-s.clock.After(next.Sub(now)): // Try starting jobs in db - jobsToAttempt := slices.Filter(state.jobs, func(j dal.CronJob) bool { + jobsToAttempt := slices.Filter(state.jobs, func(j model.CronJob) bool { if n, err := s.nextAttemptForJob(j, state, true); err == nil { return !n.After(s.clock.Now().UTC()) } @@ -346,7 +339,7 @@ func (s *Service) watchForUpdates(ctx context.Context) { } // Start jobs that were successfully updated - updatedJobs := []dal.CronJob{} + updatedJobs := []model.CronJob{} removedDeploymentKeys := map[string]model.DeploymentKey{} for _, job := range jobResults { @@ -389,23 +382,11 @@ func (s *Service) watchForUpdates(ctx context.Context) { } } -func (s *Service) compareJobs(state *State, i, j dal.CronJob) int { - iNext, err := s.nextAttemptForJob(i, state, false) - if err != nil { - return 1 - } - jNext, err := s.nextAttemptForJob(j, state, false) - if err != nil { - return -1 - } - return iNext.Compare(jNext) -} - -func (s *Service) nextAttemptForJob(job dal.CronJob, state *State, allowsNow bool) (time.Time, error) { +func (s *Service) nextAttemptForJob(job model.CronJob, state *State, allowsNow bool) (time.Time, error) { if !s.isResponsibleForJob(job, state) { return s.clock.Now(), fmt.Errorf("controller is not responsible for job") } - if job.State == dal.JobStateExecuting { + if job.State == model.CronJobStateExecuting { if state.isExecutingInCurrentController(job) { // no need to schedule this job until it finishes return s.clock.Now(), fmt.Errorf("controller is already waiting for job to finish") @@ -465,7 +446,7 @@ func (s *Service) UpdatedControllerList(ctx context.Context, controllers []dal.C // isResponsibleForJob indicates whether a this service should be responsible for attempting jobs, // or if enough other controllers will handle it. This allows us to spread the job load across controllers. -func (s *Service) isResponsibleForJob(job dal.CronJob, state *State) bool { +func (s *Service) isResponsibleForJob(job model.CronJob, state *State) bool { if state.isJobTooNewForHashRing(job) { return true } diff --git a/backend/controller/cronjobs/cronjobs_test.go b/backend/controller/cronjobs/cronjobs_test.go index f20d3e4b2e..7de898ed16 100644 --- a/backend/controller/cronjobs/cronjobs_test.go +++ b/backend/controller/cronjobs/cronjobs_test.go @@ -27,11 +27,11 @@ import ( type mockDAL struct { lock sync.Mutex clock *clock.Mock - jobs []dal.CronJob + jobs []model.CronJob attemptCountMap map[string]int } -func (d *mockDAL) GetCronJobs(ctx context.Context) ([]dal.CronJob, error) { +func (d *mockDAL) GetCronJobs(ctx context.Context) ([]model.CronJob, error) { d.lock.Lock() defer d.lock.Unlock() @@ -42,19 +42,19 @@ func (d *mockDAL) createCronJob(deploymentKey model.DeploymentKey, module string d.lock.Lock() defer d.lock.Unlock() - job := dal.CronJob{ + job := model.CronJob{ Key: model.NewCronJobKey(module, verb), DeploymentKey: deploymentKey, - Ref: schema.Ref{Module: module, Name: verb}, + Verb: model.VerbRef{Module: module, Name: verb}, Schedule: schedule, StartTime: startTime, NextExecution: nextExecution, - State: dal.JobStateIdle, + State: model.CronJobStateIdle, } d.jobs = append(d.jobs, job) } -func (d *mockDAL) indexForJob(job dal.CronJob) (int, error) { +func (d *mockDAL) indexForJob(job model.CronJob) (int, error) { for i, j := range d.jobs { if j.Key.String() == job.Key.String() { return i, nil @@ -63,7 +63,7 @@ func (d *mockDAL) indexForJob(job dal.CronJob) (int, error) { return -1, fmt.Errorf("job not found") } -func (d *mockDAL) StartCronJobs(ctx context.Context, jobs []dal.CronJob) (attemptedJobs []dal.AttemptedCronJob, err error) { +func (d *mockDAL) StartCronJobs(ctx context.Context, jobs []model.CronJob) (attemptedJobs []dal.AttemptedCronJob, err error) { d.lock.Lock() defer d.lock.Unlock() @@ -76,8 +76,8 @@ func (d *mockDAL) StartCronJobs(ctx context.Context, jobs []dal.CronJob) (attemp return nil, err } job := d.jobs[i] - if !job.NextExecution.After(now) && job.State == dal.JobStateIdle { - job.State = dal.JobStateExecuting + if !job.NextExecution.After(now) && job.State == model.CronJobStateIdle { + job.State = model.CronJobStateExecuting job.StartTime = (*d.clock).Now() d.jobs[i] = job attemptedJobs = append(attemptedJobs, dal.AttemptedCronJob{ @@ -97,32 +97,32 @@ func (d *mockDAL) StartCronJobs(ctx context.Context, jobs []dal.CronJob) (attemp return attemptedJobs, nil } -func (d *mockDAL) EndCronJob(ctx context.Context, job dal.CronJob, next time.Time) (dal.CronJob, error) { +func (d *mockDAL) EndCronJob(ctx context.Context, job model.CronJob, next time.Time) (model.CronJob, error) { d.lock.Lock() defer d.lock.Unlock() i, err := d.indexForJob(job) if err != nil { - return dal.CronJob{}, err + return model.CronJob{}, err } internalJob := d.jobs[i] - if internalJob.State != dal.JobStateExecuting { - return dal.CronJob{}, fmt.Errorf("job can not be stopped, it isnt running") + if internalJob.State != model.CronJobStateExecuting { + return model.CronJob{}, fmt.Errorf("job can not be stopped, it isnt running") } if internalJob.StartTime != job.StartTime { - return dal.CronJob{}, fmt.Errorf("job can not be stopped, start time does not match") + return model.CronJob{}, fmt.Errorf("job can not be stopped, start time does not match") } - internalJob.State = dal.JobStateIdle + internalJob.State = model.CronJobStateIdle internalJob.NextExecution = next d.jobs[i] = internalJob return internalJob, nil } -func (d *mockDAL) GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]dal.CronJob, error) { +func (d *mockDAL) GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]model.CronJob, error) { d.lock.Lock() defer d.lock.Unlock() - return slices.Filter(d.jobs, func(job dal.CronJob) bool { + return slices.Filter(d.jobs, func(job model.CronJob) bool { return (*d.clock).Now().After(job.StartTime.Add(duration)) }), nil } @@ -214,8 +214,8 @@ func TestService(t *testing.T) { } for _, j := range mockDal.jobs { - count := verbCallCount[j.Ref.Name] - assert.Equal(t, count, 3, "expected verb %s to be called 3 times", j.Ref.Name) + count := verbCallCount[j.Verb.Name] + assert.Equal(t, count, 3, "expected verb %s to be called 3 times", j.Verb.Name) // Make sure each job is not attempted by all controllers, or the responsibility of only one controller // Target is for 2 controllers to attempt each job diff --git a/backend/controller/cronjobs/state.go b/backend/controller/cronjobs/state.go index 37365a427f..fb3c101347 100644 --- a/backend/controller/cronjobs/state.go +++ b/backend/controller/cronjobs/state.go @@ -3,14 +3,13 @@ package cronjobs import ( "time" - "github.com/TBD54566975/ftl/backend/controller/dal" "github.com/TBD54566975/ftl/internal/model" "github.com/TBD54566975/ftl/internal/slices" "github.com/alecthomas/types/optional" ) type State struct { - jobs []dal.CronJob + jobs []model.CronJob // Used to determine if this controller is currently executing a job executing map[string]bool @@ -23,15 +22,15 @@ type State struct { blockedUntil time.Time } -func (s *State) isExecutingInCurrentController(job dal.CronJob) bool { +func (s *State) isExecutingInCurrentController(job model.CronJob) bool { return s.executing[job.Key.String()] } -func (s *State) startedExecutingJob(job dal.CronJob) { +func (s *State) startedExecutingJob(job model.CronJob) { s.executing[job.Key.String()] = true } -func (s *State) isJobTooNewForHashRing(job dal.CronJob) bool { +func (s *State) isJobTooNewForHashRing(job model.CronJob) bool { if t, ok := s.newJobs[job.Key.String()]; ok { if time.Since(t) < newJobHashRingOverrideInterval { return true @@ -41,11 +40,11 @@ func (s *State) isJobTooNewForHashRing(job dal.CronJob) bool { return false } -func (s *State) sync(jobs []dal.CronJob, newDeploymentKey optional.Option[model.DeploymentKey]) { - s.jobs = make([]dal.CronJob, len(jobs)) +func (s *State) sync(jobs []model.CronJob, newDeploymentKey optional.Option[model.DeploymentKey]) { + s.jobs = make([]model.CronJob, len(jobs)) copy(s.jobs, jobs) for _, job := range s.jobs { - if job.State != dal.JobStateExecuting { + if job.State != model.CronJobStateExecuting { delete(s.executing, job.Key.String()) } if newKey, ok := newDeploymentKey.Get(); ok && job.DeploymentKey.String() == newKey.String() { @@ -55,12 +54,12 @@ func (s *State) sync(jobs []dal.CronJob, newDeploymentKey optional.Option[model. } } -func (s *State) updateJobs(jobs []dal.CronJob) { +func (s *State) updateJobs(jobs []model.CronJob) { updatedJobMap := jobMap(jobs) for idx, old := range s.jobs { if updated, exists := updatedJobMap[old.Key.String()]; exists { s.jobs[idx] = updated - if updated.State != dal.JobStateExecuting { + if updated.State != model.CronJobStateExecuting { delete(s.executing, updated.Key.String()) } } @@ -68,13 +67,13 @@ func (s *State) updateJobs(jobs []dal.CronJob) { } func (s *State) removeDeploymentKey(key model.DeploymentKey) { - s.jobs = slices.Filter(s.jobs, func(j dal.CronJob) bool { + s.jobs = slices.Filter(s.jobs, func(j model.CronJob) bool { return j.DeploymentKey.String() != key.String() }) } -func jobMap(jobs []dal.CronJob) map[string]dal.CronJob { - m := map[string]dal.CronJob{} +func jobMap(jobs []model.CronJob) map[string]model.CronJob { + m := map[string]model.CronJob{} for _, job := range jobs { m[job.Key.String()] = job } diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index 7d611de642..2e53cccff2 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -411,7 +411,7 @@ type IngressRoutingEntry struct { // previously created artefacts with it. // // If an existing deployment with identical artefacts exists, it is returned. -func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []DeploymentArtefact, ingressRoutes []IngressRoutingEntry, cronJobs []CronJob) (key model.DeploymentKey, err error) { +func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []DeploymentArtefact, ingressRoutes []IngressRoutingEntry, cronJobs []model.CronJob) (key model.DeploymentKey, err error) { logger := log.FromContext(ctx) // Start the transaction @@ -496,8 +496,8 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem err := tx.CreateCronJob(ctx, sql.CreateCronJobParams{ Key: job.Key, DeploymentKey: deploymentKey, - ModuleName: job.Ref.Module, - Verb: job.Ref.Name, + ModuleName: job.Verb.Module, + Verb: job.Verb.Name, StartTime: job.StartTime, Schedule: job.Schedule, NextExecution: job.NextExecution, @@ -916,43 +916,20 @@ func (d *DAL) ExpireRunnerClaims(ctx context.Context) (int64, error) { return count, translatePGError(err) } -type JobState string - -const ( - JobStateIdle = JobState(sql.CronJobStateIdle) - JobStateExecuting = JobState(sql.CronJobStateExecuting) -) - -type CronJob struct { - Key model.CronJobKey - DeploymentKey model.DeploymentKey - Ref schema.Ref - Schedule string - StartTime time.Time - NextExecution time.Time - State JobState -} - -type AttemptedCronJob struct { - DidStartExecution bool - HasMinReplicas bool - CronJob -} - -func cronJobFromRow(row sql.GetCronJobsRow) CronJob { - return CronJob{ +func cronJobFromRow(row sql.GetCronJobsRow) model.CronJob { + return model.CronJob{ Key: row.Key, DeploymentKey: row.DeploymentKey, - Ref: schema.Ref{Module: row.Module, Name: row.Verb}, + Verb: model.VerbRef{Module: row.Module, Name: row.Verb}, Schedule: row.Schedule, StartTime: row.StartTime, NextExecution: row.NextExecution, - State: JobState(row.State), + State: model.CronJobState(row.State), } } // GetCronJobs returns all cron jobs for deployments with min replicas > 0 -func (d *DAL) GetCronJobs(ctx context.Context) ([]CronJob, error) { +func (d *DAL) GetCronJobs(ctx context.Context) ([]model.CronJob, error) { rows, err := d.db.GetCronJobs(ctx) if err != nil { return nil, translatePGError(err) @@ -960,12 +937,18 @@ func (d *DAL) GetCronJobs(ctx context.Context) ([]CronJob, error) { return slices.Map(rows, cronJobFromRow), nil } +type AttemptedCronJob struct { + DidStartExecution bool + HasMinReplicas bool + model.CronJob +} + // StartCronJobs returns a full list of results so that the caller can update their list of jobs whether or not they successfully updated the row -func (d *DAL) StartCronJobs(ctx context.Context, jobs []CronJob) (attemptedJobs []AttemptedCronJob, err error) { +func (d *DAL) StartCronJobs(ctx context.Context, jobs []model.CronJob) (attemptedJobs []AttemptedCronJob, err error) { if len(jobs) == 0 { return nil, nil } - rows, err := d.db.StartCronJobs(ctx, slices.Map(jobs, func(job CronJob) string { return job.Key.String() })) + rows, err := d.db.StartCronJobs(ctx, slices.Map(jobs, func(job model.CronJob) string { return job.Key.String() })) if err != nil { return nil, translatePGError(err) } @@ -973,14 +956,14 @@ func (d *DAL) StartCronJobs(ctx context.Context, jobs []CronJob) (attemptedJobs attemptedJobs = []AttemptedCronJob{} for _, row := range rows { job := AttemptedCronJob{ - CronJob: CronJob{ + CronJob: model.CronJob{ Key: row.Key, DeploymentKey: row.DeploymentKey, - Ref: schema.Ref{Module: row.Module, Name: row.Verb}, + Verb: model.VerbRef{Module: row.Module, Name: row.Verb}, Schedule: row.Schedule, StartTime: row.StartTime, NextExecution: row.NextExecution, - State: JobState(row.State), + State: model.CronJobState(row.State), }, DidStartExecution: row.Updated, HasMinReplicas: row.HasMinReplicas, @@ -992,21 +975,21 @@ func (d *DAL) StartCronJobs(ctx context.Context, jobs []CronJob) (attemptedJobs // EndCronJob sets the status from executing to idle and updates the next execution time // Can be called on the successful completion of a job, or if the job failed to execute (error or timeout) -func (d *DAL) EndCronJob(ctx context.Context, job CronJob, next time.Time) (CronJob, error) { +func (d *DAL) EndCronJob(ctx context.Context, job model.CronJob, next time.Time) (model.CronJob, error) { row, err := d.db.EndCronJob(ctx, next, job.Key, job.StartTime) if err != nil { - return CronJob{}, translatePGError(err) + return model.CronJob{}, translatePGError(err) } return cronJobFromRow(sql.GetCronJobsRow(row)), nil } // GetStaleCronJobs returns a list of cron jobs that have been executing longer than the duration -func (d *DAL) GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]CronJob, error) { +func (d *DAL) GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]model.CronJob, error) { rows, err := d.db.GetStaleCronJobs(ctx, duration) if err != nil { return nil, translatePGError(err) } - return slices.Map(rows, func(row sql.GetStaleCronJobsRow) CronJob { + return slices.Map(rows, func(row sql.GetStaleCronJobsRow) model.CronJob { return cronJobFromRow(sql.GetCronJobsRow(row)) }), nil } diff --git a/backend/controller/sql/models.go b/backend/controller/sql/models.go index 6626f34cf9..5687d9abb1 100644 --- a/backend/controller/sql/models.go +++ b/backend/controller/sql/models.go @@ -254,7 +254,7 @@ type CronJob struct { Schedule string StartTime time.Time NextExecution time.Time - State CronJobState + State model.CronJobState ModuleName string } diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index 728ca631b2..aad9cafe90 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -172,7 +172,7 @@ type EndCronJobRow struct { Schedule string StartTime time.Time NextExecution time.Time - State CronJobState + State model.CronJobState } func (q *Queries) EndCronJob(ctx context.Context, nextExecution time.Time, key model.CronJobKey, startTime time.Time) (EndCronJobRow, error) { @@ -480,7 +480,7 @@ type GetCronJobsRow struct { Schedule string StartTime time.Time NextExecution time.Time - State CronJobState + State model.CronJobState } func (q *Queries) GetCronJobs(ctx context.Context) ([]GetCronJobsRow, error) { @@ -1168,7 +1168,7 @@ type GetStaleCronJobsRow struct { Schedule string StartTime time.Time NextExecution time.Time - State CronJobState + State model.CronJobState } func (q *Queries) GetStaleCronJobs(ctx context.Context, dollar_1 time.Duration) ([]GetStaleCronJobsRow, error) { @@ -1533,7 +1533,7 @@ type StartCronJobsRow struct { Schedule string StartTime time.Time NextExecution time.Time - State CronJobState + State model.CronJobState HasMinReplicas bool Updated bool } diff --git a/internal/model/cron_job.go b/internal/model/cron_job.go new file mode 100644 index 0000000000..42c5bf8347 --- /dev/null +++ b/internal/model/cron_job.go @@ -0,0 +1,22 @@ +package model + +import ( + "time" +) + +type CronJobState string + +const ( + CronJobStateIdle = "idle" + CronJobStateExecuting = "executing" +) + +type CronJob struct { + Key CronJobKey + DeploymentKey DeploymentKey + Verb VerbRef + Schedule string + StartTime time.Time + NextExecution time.Time + State CronJobState +} diff --git a/internal/model/verb_ref.go b/internal/model/verb_ref.go new file mode 100644 index 0000000000..3f4e507e33 --- /dev/null +++ b/internal/model/verb_ref.go @@ -0,0 +1,12 @@ +package model + +import "fmt" + +type VerbRef struct { + Module string + Name string +} + +func (v VerbRef) String() string { + return fmt.Sprintf("%s.%s", v.Module, v.Name) +} diff --git a/sqlc.yaml b/sqlc.yaml index efaabc9c0a..48891026de 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -44,6 +44,8 @@ sql: type: "NullCronJobKey" - db_type: "deployment_key" go_type: "github.com/TBD54566975/ftl/internal/model.DeploymentKey" + - db_type: "cron_job_state" + go_type: "github.com/TBD54566975/ftl/internal/model.CronJobState" - db_type: "deployment_key" nullable: true go_type: From 21ee09d9728d7dbeecba9bc767ac03b7be1bc7c4 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Mon, 15 Apr 2024 11:08:01 +1000 Subject: [PATCH 11/17] Make State private and add comment --- backend/controller/cronjobs/cronjobs.go | 6 +++--- backend/controller/cronjobs/state.go | 15 ++++++++------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index 948c3d0aa6..690867a67e 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -293,7 +293,7 @@ func (s *Service) watchForUpdates(ctx context.Context) { s.events.Subscribe(events) defer s.events.Unsubscribe(events) - state := &State{ + state := &state{ executing: map[string]bool{}, newJobs: map[string]time.Time{}, blockedUntil: s.clock.Now(), @@ -382,7 +382,7 @@ func (s *Service) watchForUpdates(ctx context.Context) { } } -func (s *Service) nextAttemptForJob(job model.CronJob, state *State, allowsNow bool) (time.Time, error) { +func (s *Service) nextAttemptForJob(job model.CronJob, state *state, allowsNow bool) (time.Time, error) { if !s.isResponsibleForJob(job, state) { return s.clock.Now(), fmt.Errorf("controller is not responsible for job") } @@ -446,7 +446,7 @@ func (s *Service) UpdatedControllerList(ctx context.Context, controllers []dal.C // isResponsibleForJob indicates whether a this service should be responsible for attempting jobs, // or if enough other controllers will handle it. This allows us to spread the job load across controllers. -func (s *Service) isResponsibleForJob(job model.CronJob, state *State) bool { +func (s *Service) isResponsibleForJob(job model.CronJob, state *state) bool { if state.isJobTooNewForHashRing(job) { return true } diff --git a/backend/controller/cronjobs/state.go b/backend/controller/cronjobs/state.go index fb3c101347..0620efe753 100644 --- a/backend/controller/cronjobs/state.go +++ b/backend/controller/cronjobs/state.go @@ -8,7 +8,8 @@ import ( "github.com/alecthomas/types/optional" ) -type State struct { +// state models the state of the cron job service's private state for scheduling jobs and reacting to events +type state struct { jobs []model.CronJob // Used to determine if this controller is currently executing a job @@ -22,15 +23,15 @@ type State struct { blockedUntil time.Time } -func (s *State) isExecutingInCurrentController(job model.CronJob) bool { +func (s *state) isExecutingInCurrentController(job model.CronJob) bool { return s.executing[job.Key.String()] } -func (s *State) startedExecutingJob(job model.CronJob) { +func (s *state) startedExecutingJob(job model.CronJob) { s.executing[job.Key.String()] = true } -func (s *State) isJobTooNewForHashRing(job model.CronJob) bool { +func (s *state) isJobTooNewForHashRing(job model.CronJob) bool { if t, ok := s.newJobs[job.Key.String()]; ok { if time.Since(t) < newJobHashRingOverrideInterval { return true @@ -40,7 +41,7 @@ func (s *State) isJobTooNewForHashRing(job model.CronJob) bool { return false } -func (s *State) sync(jobs []model.CronJob, newDeploymentKey optional.Option[model.DeploymentKey]) { +func (s *state) sync(jobs []model.CronJob, newDeploymentKey optional.Option[model.DeploymentKey]) { s.jobs = make([]model.CronJob, len(jobs)) copy(s.jobs, jobs) for _, job := range s.jobs { @@ -54,7 +55,7 @@ func (s *State) sync(jobs []model.CronJob, newDeploymentKey optional.Option[mode } } -func (s *State) updateJobs(jobs []model.CronJob) { +func (s *state) updateJobs(jobs []model.CronJob) { updatedJobMap := jobMap(jobs) for idx, old := range s.jobs { if updated, exists := updatedJobMap[old.Key.String()]; exists { @@ -66,7 +67,7 @@ func (s *State) updateJobs(jobs []model.CronJob) { } } -func (s *State) removeDeploymentKey(key model.DeploymentKey) { +func (s *state) removeDeploymentKey(key model.DeploymentKey) { s.jobs = slices.Filter(s.jobs, func(j model.CronJob) bool { return j.DeploymentKey.String() != key.String() }) From aa04c43647829f583d86496d26b81cce59233211 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Mon, 15 Apr 2024 11:33:36 +1000 Subject: [PATCH 12/17] originSource renamed to requestSource --- backend/controller/controller.go | 2 +- backend/controller/cronjobs/cronjobs.go | 31 ++++++++++---------- backend/controller/cronjobs/cronjobs_test.go | 3 +- 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 7ae0082fad..80007683b1 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -184,7 +184,7 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling. increaseReplicaFailures: map[string]int{}, } - cronSvc := cronjobs.New(ctx, key, svc.config.Advertise, cronjobs.Config{Timeout: config.CronJobTimeout}, db, svc.tasks, svc.callWithRequest) + cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, cronjobs.Config{Timeout: config.CronJobTimeout}, db, svc.tasks, svc.callWithRequest) svc.cronJobs = cronSvc svc.controllerListListeners = append(svc.controllerListListeners, svc.tasks, cronSvc) _, _ = svc.updateControllersList(ctx) diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index 690867a67e..4664e7ebfd 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "net/url" "time" "connectrpc.com/connect" @@ -79,9 +78,9 @@ type Scheduler interface { type ExecuteCallFunc func(context.Context, *connect.Request[ftlv1.CallRequest], optional.Option[model.RequestKey], string) (*connect.Response[ftlv1.CallResponse], error) type Service struct { - config Config - key model.ControllerKey - originURL *url.URL + config Config + key model.ControllerKey + requestSource string dal DAL scheduler Scheduler @@ -93,20 +92,20 @@ type Service struct { hashRingState atomic.Value[*hashRingState] } -func New(ctx context.Context, key model.ControllerKey, originURL *url.URL, config Config, dal DAL, scheduler Scheduler, call ExecuteCallFunc) *Service { - return NewForTesting(ctx, key, originURL, config, dal, scheduler, call, clock.New()) +func New(ctx context.Context, key model.ControllerKey, requestSource string, config Config, dal DAL, scheduler Scheduler, call ExecuteCallFunc) *Service { + return NewForTesting(ctx, key, requestSource, config, dal, scheduler, call, clock.New()) } -func NewForTesting(ctx context.Context, key model.ControllerKey, originURL *url.URL, config Config, dal DAL, scheduler Scheduler, call ExecuteCallFunc, clock clock.Clock) *Service { +func NewForTesting(ctx context.Context, key model.ControllerKey, requestSource string, config Config, dal DAL, scheduler Scheduler, call ExecuteCallFunc, clock clock.Clock) *Service { svc := &Service{ - config: config, - key: key, - originURL: originURL, - dal: dal, - scheduler: scheduler, - call: call, - clock: clock, - events: pubsub.New[event](), + config: config, + key: key, + requestSource: requestSource, + dal: dal, + scheduler: scheduler, + call: call, + clock: clock, + events: pubsub.New[event](), } svc.UpdatedControllerList(ctx, nil) @@ -210,7 +209,7 @@ func (s *Service) executeJob(ctx context.Context, job model.CronJob) { callCtx, cancel := context.WithTimeout(ctx, s.config.Timeout) defer cancel() - _, err = s.call(callCtx, req, optional.Some(requestKey), s.originURL.Host) + _, err = s.call(callCtx, req, optional.Some(requestKey), s.requestSource) if err != nil { logger.Errorf(err, "failed to execute cron job %v", job.Key) // Do not return, continue to end the job and schedule the next execution diff --git a/backend/controller/cronjobs/cronjobs_test.go b/backend/controller/cronjobs/cronjobs_test.go index 7de898ed16..9dad6b8df1 100644 --- a/backend/controller/cronjobs/cronjobs_test.go +++ b/backend/controller/cronjobs/cronjobs_test.go @@ -3,7 +3,6 @@ package cronjobs import ( "context" "fmt" - "net/url" "strconv" "sync" "testing" @@ -180,7 +179,7 @@ func TestService(t *testing.T) { controller := &controller{ key: key, DAL: mockDal, - cronJobs: NewForTesting(ctx, key, &url.URL{Host: "test.com"}, config, mockDal, scheduler, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) { + cronJobs: NewForTesting(ctx, key, "test.com", config, mockDal, scheduler, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) { verbRef := schema.RefFromProto(r.Msg.Verb) verbCallCountLock.Lock() From 3fa699fd5d227ad4cf76d776fe7d4fcbb9e59b72 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Mon, 15 Apr 2024 13:03:26 +1000 Subject: [PATCH 13/17] remove code that is no longer needed --- backend/controller/controller.go | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 80007683b1..d72261ea69 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -187,7 +187,6 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling. cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, cronjobs.Config{Timeout: config.CronJobTimeout}, db, svc.tasks, svc.callWithRequest) svc.cronJobs = cronSvc svc.controllerListListeners = append(svc.controllerListListeners, svc.tasks, cronSvc) - _, _ = svc.updateControllersList(ctx) svc.tasks.Parallel(backoff.Backoff{Min: time.Second, Max: time.Second * 5}, svc.syncRoutes) svc.tasks.Parallel(backoff.Backoff{Min: time.Second * 3, Max: time.Second * 3}, svc.heartbeatController) From 03d8bdd6933c359366fbeb0df5debc8e7fbc6677 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Mon, 15 Apr 2024 13:51:48 +1000 Subject: [PATCH 14/17] add test for hash ring --- backend/controller/cronjobs/cronjobs_test.go | 121 ++++++++++++++++++- 1 file changed, 115 insertions(+), 6 deletions(-) diff --git a/backend/controller/cronjobs/cronjobs_test.go b/backend/controller/cronjobs/cronjobs_test.go index 9dad6b8df1..04e045099b 100644 --- a/backend/controller/cronjobs/cronjobs_test.go +++ b/backend/controller/cronjobs/cronjobs_test.go @@ -21,6 +21,7 @@ import ( "github.com/alecthomas/types/optional" "github.com/benbjohnson/clock" "github.com/jpillora/backoff" + xslices "golang.org/x/exp/slices" ) type mockDAL struct { @@ -140,6 +141,7 @@ func (s *mockScheduler) Parallel(retry backoff.Backoff, job scheduledtask.Job) { type controller struct { key model.ControllerKey DAL DAL + clock *clock.Mock cronJobs *Service } @@ -177,8 +179,9 @@ func TestService(t *testing.T) { for i := range 5 { key := model.NewControllerKey("localhost", strconv.Itoa(8080+i)) controller := &controller{ - key: key, - DAL: mockDal, + key: key, + DAL: mockDal, + clock: clock, cronJobs: NewForTesting(ctx, key, "test.com", config, mockDal, scheduler, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) { verbRef := schema.RefFromProto(r.Msg.Verb) @@ -215,10 +218,116 @@ func TestService(t *testing.T) { for _, j := range mockDal.jobs { count := verbCallCount[j.Verb.Name] assert.Equal(t, count, 3, "expected verb %s to be called 3 times", j.Verb.Name) + } +} + +func TestHashRing(t *testing.T) { + // This test uses multiple mock clocks to progress time for each controller individually + // This allows us to compare attempts for each cron job and know which controller attempted it + t.Parallel() + ctx := log.ContextWithNewDefaultLogger(context.Background()) + ctx, cancel := context.WithCancel(ctx) + t.Cleanup(cancel) + + config := Config{Timeout: time.Minute * 5} + mockDal := &mockDAL{ + clock: clock.NewMock(), + lock: sync.Mutex{}, + attemptCountMap: map[string]int{}, + } + scheduler := &mockScheduler{} + + verbCallCount := map[string]int{} + verbCallCountLock := sync.Mutex{} + + // initial jobs + for i := range 100 { + deploymentKey := model.NewDeploymentKey("initial") + now := mockDal.clock.Now() + cronStr := "*/10 * * * * * *" + pattern, err := cron.Parse(cronStr) + assert.NoError(t, err) + next, err := cron.NextAfter(pattern, now, false) + assert.NoError(t, err) + mockDal.createCronJob(deploymentKey, "initial", fmt.Sprintf("verb%d", i), cronStr, now, next) + } - // Make sure each job is not attempted by all controllers, or the responsibility of only one controller - // Target is for 2 controllers to attempt each job - attemptCount := mockDal.attemptCountMap[j.Key.String()] - assert.True(t, attemptCount > 1*count && attemptCount <= 3*attemptCount, "job %v was attempted %d times, expected between > 1 and <= 3 to be attempted", j.Key) + controllers := []*controller{} + for i := range 20 { + key := model.NewControllerKey("localhost", strconv.Itoa(8080+i)) + clock := clock.NewMock() + controller := &controller{ + key: key, + DAL: mockDal, + clock: clock, + cronJobs: NewForTesting(ctx, key, "test.com", config, mockDal, scheduler, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) { + verbRef := schema.RefFromProto(r.Msg.Verb) + + verbCallCountLock.Lock() + verbCallCount[verbRef.Name]++ + verbCallCountLock.Unlock() + + return &connect.Response[ftlv1.CallResponse]{}, nil + }, clock), + } + controllers = append(controllers, controller) + } + + time.Sleep(time.Millisecond * 100) + + for _, c := range controllers { + go func() { + c.cronJobs.UpdatedControllerList(ctx, slices.Map(controllers, func(ctrl *controller) dal.Controller { + return dal.Controller{ + Key: ctrl.key, + } + })) + _, _ = c.cronJobs.syncJobs(ctx) + }() + } + time.Sleep(time.Millisecond * 100) + + // progress time for each controller one at a time, noting which verbs got attempted each time + // to build a map of verb to controller keys + controllersForVerbs := map[string][]model.ControllerKey{} + for _, c := range controllers { + beforeAttemptCount := map[string]int{} + for k, v := range mockDal.attemptCountMap { + beforeAttemptCount[k] = v + } + + c.clock.Add(time.Second * 15) + time.Sleep(time.Millisecond * 100) + + for k, v := range mockDal.attemptCountMap { + if beforeAttemptCount[k] == v { + continue + } + controllersForVerbs[k] = append(controllersForVerbs[k], c.key) + } + } + + // Check if each job has the same key list + // Theoretically this is is possible for all jobs to have the same assigned controllers, but with 100 jobs and 20 controllers, this is unlikely + keys := []string{} + hasFoundNonMatchingKeys := false + for v, k := range controllersForVerbs { + assert.Equal(t, len(k), 2, "expected verb %s to be attempted by 2 controllers", v) + + kStrs := slices.Map(k, func(k model.ControllerKey) string { return k.String() }) + xslices.Sort(kStrs) + if len(keys) == 0 { + keys = kStrs + continue + } + + if hasFoundNonMatchingKeys == false { + for keyIdx, keyStr := range kStrs { + if keys[keyIdx] != keyStr { + hasFoundNonMatchingKeys = true + } + } + } } + assert.True(t, hasFoundNonMatchingKeys, "expected at least one verb to have different controllers assigned") } From 65de87203a78946dc0399bfe0cc824c7d9e679f2 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Mon, 15 Apr 2024 14:01:36 +1000 Subject: [PATCH 15/17] fix lint --- backend/controller/cronjobs/cronjobs.go | 3 ++- backend/controller/dal/dal.go | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index 4664e7ebfd..1b9a87793f 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -159,7 +159,8 @@ func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Mod return newJobs, nil } -// CreatedOrReplacedDeloyment: When a controller creates/replaces a deployment, its cron job service is responsible for +// CreatedOrReplacedDeloyment is only called by the responsible controller to its cron service, and will not be received by the other cron services. +// When a controller creates/replaces a deployment, its cron job service is responsible for // the newly created cron jobs until other controllers have a chance to resync their list of jobs and start sharing responsibility of the new cron jobs. func (s *Service) CreatedOrReplacedDeloyment(ctx context.Context, newDeploymentKey model.DeploymentKey) { // Rather than finding old/new cron jobs and updating our state, we can just resync the list of jobs diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index 2e53cccff2..b1586f3aec 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -924,7 +924,7 @@ func cronJobFromRow(row sql.GetCronJobsRow) model.CronJob { Schedule: row.Schedule, StartTime: row.StartTime, NextExecution: row.NextExecution, - State: model.CronJobState(row.State), + State: row.State, } } @@ -963,7 +963,7 @@ func (d *DAL) StartCronJobs(ctx context.Context, jobs []model.CronJob) (attempte Schedule: row.Schedule, StartTime: row.StartTime, NextExecution: row.NextExecution, - State: model.CronJobState(row.State), + State: row.State, }, DidStartExecution: row.Updated, HasMinReplicas: row.HasMinReplicas, From 5938f510dfa3eee3953654068f95b2383a169123 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Mon, 15 Apr 2024 14:41:18 +1000 Subject: [PATCH 16/17] add integration test for cron jobs that uses actual db --- .../cronjobs/cronjobs_integration_test.go | 130 ++++++++++++++++++ 1 file changed, 130 insertions(+) create mode 100644 backend/controller/cronjobs/cronjobs_integration_test.go diff --git a/backend/controller/cronjobs/cronjobs_integration_test.go b/backend/controller/cronjobs/cronjobs_integration_test.go new file mode 100644 index 0000000000..cb07d02fd4 --- /dev/null +++ b/backend/controller/cronjobs/cronjobs_integration_test.go @@ -0,0 +1,130 @@ +//go:build integration + +package cronjobs + +import ( + "context" + "fmt" + "strconv" + "sync" + "testing" + "time" + + "connectrpc.com/connect" + "github.com/TBD54566975/ftl/backend/controller/scheduledtask" + "github.com/TBD54566975/ftl/backend/controller/sql/sqltest" + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/cron" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/model" + "github.com/TBD54566975/ftl/internal/slices" + "github.com/alecthomas/assert/v2" + "github.com/alecthomas/types/optional" + "github.com/benbjohnson/clock" + "github.com/jpillora/backoff" +) + +type mockScheduler struct { +} + +func (s *mockScheduler) Singleton(retry backoff.Backoff, job scheduledtask.Job) { + // do nothing +} + +func (s *mockScheduler) Parallel(retry backoff.Backoff, job scheduledtask.Job) { + // do nothing +} + +type controller struct { + key model.ControllerKey + DAL DAL + clock *clock.Mock + cronJobs *Service +} + +func TestService(t *testing.T) { + t.Parallel() + ctx := log.ContextWithNewDefaultLogger(context.Background()) + ctx, cancel := context.WithCancel(ctx) + t.Cleanup(cancel) + + conn := sqltest.OpenForTesting(ctx, t) + dal, err := New(ctx, conn) + assert.NoError(t, err) + + config := Config{Timeout: time.Minute * 5} + clock := clock.NewMock() + scheduler := &mockScheduler{} + + verbCallCount := map[string]int{} + verbCallCountLock := sync.Mutex{} + + // initial jobs + jobsToCreate := []model.CronJob{} + for i := range 20 { + now := clock.Now() + cronStr := "*/10 * * * * * *" + pattern, err := cron.Parse(cronStr) + assert.NoError(t, err) + next, err := cron.NextAfter(pattern, now, false) + assert.NoError(t, err) + jobsToCreate = append(jobsToCreate, model.CronJob{ + Key: model.NewCronJobKey("initial", fmt.Sprintf("verb%d", i)), + Verb model.VerbRef{Module: "initial", Name: fmt.Sprintf("verb%d", i)}, + Schedule pattern.String(), + StartTime now(), + NextExecution next, + State CronJobStateIdle, + }) + } + + dal.CreateDeployment(ctx, "go", &schema.Module{ + Name: "initial", + }, artefacts []DeploymentArtefact{}, []IngressRoutingEntry{}, jobsToCreate) (key model.DeploymentKey, err error) + + controllers := []*controller{} + for i := range 5 { + key := model.NewControllerKey("localhost", strconv.Itoa(8080+i)) + controller := &controller{ + key: key, + DAL: dal, + clock: clock, + cronJobs: NewForTesting(ctx, key, "test.com", config, dal, scheduler, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) { + verbRef := schema.RefFromProto(r.Msg.Verb) + + verbCallCountLock.Lock() + verbCallCount[verbRef.Name]++ + verbCallCountLock.Unlock() + + return &connect.Response[ftlv1.CallResponse]{}, nil + }, clock), + } + controllers = append(controllers, controller) + } + + time.Sleep(time.Millisecond * 100) + + for _, c := range controllers { + go func() { + c.cronJobs.UpdatedControllerList(ctx, slices.Map(controllers, func(ctrl *controller) dal.Controller { + return dal.Controller{ + Key: ctrl.key, + } + })) + _, _ = c.cronJobs.syncJobs(ctx) + }() + } + + clock.Add(time.Second * 5) + time.Sleep(time.Millisecond * 100) + for range 3 { + clock.Add(time.Second * 10) + time.Sleep(time.Millisecond * 100) + } + + for _, j := range jobsToCreate { + count := verbCallCount[j.Verb.Name] + assert.Equal(t, count, 3, "expected verb %s to be called 3 times", j.Verb.Name) + } +} From d96d9209c4934658d31163ec0cc93757f9825b4c Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Mon, 15 Apr 2024 14:41:24 +1000 Subject: [PATCH 17/17] scope logger to cron --- backend/controller/cronjobs/cronjobs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index 1b9a87793f..e2d4ed70e3 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -287,7 +287,7 @@ func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) { // // State is private to this function to ensure thread safety. func (s *Service) watchForUpdates(ctx context.Context) { - logger := log.FromContext(ctx) + logger := log.FromContext(ctx).Scope("cron") events := make(chan event, 128) s.events.Subscribe(events)