diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 96f0e8e4b2..57be46cf39 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -182,7 +182,7 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling. increaseReplicaFailures: map[string]int{}, } - cronSvc := cronjobs.New(ctx, key, svc.config.Advertise, cronjobs.Config{Timeout: config.CronJobTimeout}, db, svc.tasks, svc) + cronSvc := cronjobs.New(ctx, key, svc.config.Advertise, cronjobs.Config{Timeout: config.CronJobTimeout}, db, svc.tasks, svc.callWithRequest) svc.cronJobs = cronSvc svc.controllerListListeners = append(svc.controllerListListeners, svc.tasks, cronSvc) _, _ = svc.updateControllersList(ctx) diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index 397ac432b3..2ecf7df1c3 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -68,9 +68,7 @@ type Scheduler interface { Parallel(retry backoff.Backoff, job scheduledtask.Job) } -type CallExecuter interface { - CallWithRequest(ctx context.Context, req *connect.Request[ftlv1.CallRequest], requestKey optional.Option[model.RequestKey], requestSource string) (*connect.Response[ftlv1.CallResponse], error) -} +type ExecuteCallFunc func(context.Context, *connect.Request[ftlv1.CallRequest], optional.Option[model.RequestKey], string) (*connect.Response[ftlv1.CallResponse], error) type Service struct { config Config @@ -79,7 +77,7 @@ type Service struct { dal DAL scheduler Scheduler - executor CallExecuter // Change the type from *CallExecuter to CallExecuter + call ExecuteCallFunc clock clock.Clock jobChanges *pubsub.Topic[jobChange] @@ -87,18 +85,18 @@ type Service struct { hashRingState atomic.Value[*hashRingState] } -func New(ctx context.Context, key model.ControllerKey, originURL *url.URL, config Config, dal DAL, scheduler Scheduler, executor CallExecuter) *Service { - return NewForTesting(ctx, key, originURL, config, dal, scheduler, executor, clock.New()) +func New(ctx context.Context, key model.ControllerKey, originURL *url.URL, config Config, dal DAL, scheduler Scheduler, call ExecuteCallFunc) *Service { + return NewForTesting(ctx, key, originURL, config, dal, scheduler, call, clock.New()) } -func NewForTesting(ctx context.Context, key model.ControllerKey, originURL *url.URL, config Config, dal DAL, scheduler Scheduler, executor CallExecuter, clock clock.Clock) *Service { +func NewForTesting(ctx context.Context, key model.ControllerKey, originURL *url.URL, config Config, dal DAL, scheduler Scheduler, call ExecuteCallFunc, clock clock.Clock) *Service { svc := &Service{ config: config, key: key, originURL: originURL, dal: dal, scheduler: scheduler, - executor: executor, + call: call, clock: clock, jobChanges: pubsub.New[jobChange](), } @@ -196,7 +194,7 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) { callCtx, cancel := context.WithTimeout(ctx, s.config.Timeout) defer cancel() - _, err = s.executor.CallWithRequest(callCtx, req, optional.Some(requestKey), s.originURL.Host) + _, err = s.call(callCtx, req, optional.Some(requestKey), s.originURL.Host) if err != nil { logger.Errorf(err, "failed to execute cron job %s", job.Ref.String()) } diff --git a/backend/controller/cronjobs/cronjobs_test.go b/backend/controller/cronjobs/cronjobs_test.go index 1be35088ec..424bcdf542 100644 --- a/backend/controller/cronjobs/cronjobs_test.go +++ b/backend/controller/cronjobs/cronjobs_test.go @@ -43,6 +43,7 @@ func (d *mockDAL) createCronJob(deploymentKey model.DeploymentKey, module string defer d.lock.Unlock() job := dal.CronJob{ + Key: model.NewCronJobKey(module, verb), DeploymentKey: deploymentKey, Ref: schema.Ref{Module: module, Name: verb}, Schedule: schedule, @@ -137,22 +138,6 @@ func (s *mockScheduler) Parallel(retry backoff.Backoff, job scheduledtask.Job) { // do nothing } -type mockExecutor struct { - verbCallCount map[string]int - lock sync.Mutex - clock *clock.Mock -} - -func (e *mockExecutor) CallWithRequest(ctx context.Context, req *connect.Request[ftlv1.CallRequest], requestKey optional.Option[model.RequestKey], requestSource string) (*connect.Response[ftlv1.CallResponse], error) { - verbRef := schema.RefFromProto(req.Msg.Verb) - - e.lock.Lock() - e.verbCallCount[verbRef.Name]++ - e.lock.Unlock() - - return &connect.Response[ftlv1.CallResponse]{}, nil -} - type controller struct { key model.ControllerKey DAL DAL @@ -173,11 +158,9 @@ func TestService(t *testing.T) { attemptCountMap: map[string]int{}, } scheduler := &mockScheduler{} - executor := &mockExecutor{ - verbCallCount: map[string]int{}, - lock: sync.Mutex{}, - clock: clock, - } + + verbCallCount := map[string]int{} + verbCallCountLock := sync.Mutex{} // initial jobs for i := range 20 { @@ -195,9 +178,17 @@ func TestService(t *testing.T) { for i := range 5 { key := model.NewControllerKey("localhost", strconv.Itoa(8080+i)) controller := &controller{ - key: key, - DAL: mockDal, - cronJobs: NewForTesting(ctx, key, &url.URL{Host: "test.com"}, config, mockDal, scheduler, executor, clock), + key: key, + DAL: mockDal, + cronJobs: NewForTesting(ctx, key, &url.URL{Host: "test.com"}, config, mockDal, scheduler, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) { + verbRef := schema.RefFromProto(r.Msg.Verb) + + verbCallCountLock.Lock() + verbCallCount[verbRef.Name]++ + verbCallCountLock.Unlock() + + return &connect.Response[ftlv1.CallResponse]{}, nil + }, clock), } controllers = append(controllers, controller) } @@ -223,7 +214,7 @@ func TestService(t *testing.T) { } for _, j := range mockDal.jobs { - count := executor.verbCallCount[j.Ref.Name] + count := verbCallCount[j.Ref.Name] assert.Equal(t, count, 3, "expected verb %s to be called 3 times", j.Ref.Name) } }