Skip to content

Commit

Permalink
add integration test for cron jobs that uses actual db
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Apr 15, 2024
1 parent de6fdee commit 2ce075f
Showing 1 changed file with 130 additions and 0 deletions.
130 changes: 130 additions & 0 deletions backend/controller/cronjobs/cronjobs_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
//go:build integration

package cronjobs

import (
"context"
"fmt"
"strconv"
"sync"
"testing"
"time"

"connectrpc.com/connect"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/cron"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"
"github.com/benbjohnson/clock"
"github.com/jpillora/backoff"
)

type mockScheduler struct {
}

func (s *mockScheduler) Singleton(retry backoff.Backoff, job scheduledtask.Job) {
// do nothing
}

func (s *mockScheduler) Parallel(retry backoff.Backoff, job scheduledtask.Job) {
// do nothing
}

type controller struct {
key model.ControllerKey
DAL DAL
clock *clock.Mock
cronJobs *Service
}

func TestService(t *testing.T) {
t.Parallel()
ctx := log.ContextWithNewDefaultLogger(context.Background())
ctx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)

conn := sqltest.OpenForTesting(ctx, t)
dal, err := New(ctx, conn)
assert.NoError(t, err)

config := Config{Timeout: time.Minute * 5}
clock := clock.NewMock()
scheduler := &mockScheduler{}

verbCallCount := map[string]int{}
verbCallCountLock := sync.Mutex{}

// initial jobs
jobsToCreate := []model.CronJob{}
for i := range 20 {
now := clock.Now()
cronStr := "*/10 * * * * * *"
pattern, err := cron.Parse(cronStr)
assert.NoError(t, err)
next, err := cron.NextAfter(pattern, now, false)
assert.NoError(t, err)
jobsToCreate = append(jobsToCreate, model.CronJob{
Key: model.NewCronJobKey("initial", fmt.Sprintf("verb%d", i)),
Verb model.VerbRef{Module: "initial", Name: fmt.Sprintf("verb%d", i)},
Schedule pattern.String(),
StartTime now(),
NextExecution next,
State CronJobStateIdle,
})
}

dal.CreateDeployment(ctx, "go", &schema.Module{
Name: "initial",
}, artefacts []DeploymentArtefact{}, []IngressRoutingEntry{}, jobsToCreate) (key model.DeploymentKey, err error)

controllers := []*controller{}
for i := range 5 {
key := model.NewControllerKey("localhost", strconv.Itoa(8080+i))
controller := &controller{
key: key,
DAL: dal,
clock: clock,
cronJobs: NewForTesting(ctx, key, "test.com", config, dal, scheduler, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) {
verbRef := schema.RefFromProto(r.Msg.Verb)

verbCallCountLock.Lock()
verbCallCount[verbRef.Name]++
verbCallCountLock.Unlock()

return &connect.Response[ftlv1.CallResponse]{}, nil
}, clock),
}
controllers = append(controllers, controller)
}

time.Sleep(time.Millisecond * 100)

for _, c := range controllers {
go func() {
c.cronJobs.UpdatedControllerList(ctx, slices.Map(controllers, func(ctrl *controller) dal.Controller {
return dal.Controller{
Key: ctrl.key,
}
}))
_, _ = c.cronJobs.syncJobs(ctx)
}()
}

clock.Add(time.Second * 5)
time.Sleep(time.Millisecond * 100)
for range 3 {
clock.Add(time.Second * 10)
time.Sleep(time.Millisecond * 100)
}

for _, j := range jobsToCreate {
count := verbCallCount[j.Verb.Name]
assert.Equal(t, count, 3, "expected verb %s to be called 3 times", j.Verb.Name)
}
}

0 comments on commit 2ce075f

Please sign in to comment.