Skip to content

Commit

Permalink
Showing 16 changed files with 224 additions and 98 deletions.
7 changes: 2 additions & 5 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
@@ -279,13 +279,11 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca

svc.registry = artefacts.New(conn)

svc.dal = dal.New(ctx, conn, encryption, pubSub)

timelineSvc := timeline.New(ctx, conn, encryption)
svc.timeline = timelineSvc

cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryption, timelineSvc, conn)
svc.cronJobs = cronSvc
svc.dal = dal.New(ctx, conn, encryption, pubSub, cronSvc)

svc.deploymentLogsSink = newDeploymentLogsSink(ctx, timelineSvc)

@@ -1164,13 +1162,12 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl
}

ingressRoutes := extractIngressRoutingEntries(req.Msg)
cronJobs, err := s.cronJobs.NewCronJobsForModule(ctx, req.Msg.Schema)
if err != nil {
logger.Errorf(err, "Could not generate cron jobs for new deployment")
return nil, fmt.Errorf("could not generate cron jobs for new deployment: %w", err)
}

dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes, cronJobs)
dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes)
if err != nil {
logger.Errorf(err, "Could not create deployment")
return nil, fmt.Errorf("could not create deployment: %w", err)
31 changes: 20 additions & 11 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
@@ -15,7 +15,6 @@ import (
encryptionsvc "github.com/TBD54566975/ftl/backend/controller/encryption"
"github.com/TBD54566975/ftl/backend/controller/encryption/api"
"github.com/TBD54566975/ftl/backend/controller/timeline"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/internal/cron"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
@@ -47,35 +46,35 @@ func NewForTesting(ctx context.Context, key model.ControllerKey, requestSource s
return svc
}

func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Module) ([]model.CronJob, error) {
func (s *Service) NewCronJobsForModule(ctx context.Context, module *schema.Module) ([]model.CronJob, error) {
logger := log.FromContext(ctx).Scope("cron").Module(module.Name)
start := s.clock.Now().UTC()
newJobs := []model.CronJob{}
merr := []error{}
for _, decl := range module.Decls {
verb, ok := decl.Value.(*schemapb.Decl_Verb)
verb, ok := decl.(*schema.Verb)
if !ok {
continue
}
for _, metadata := range verb.Verb.Metadata {
cronMetadata, ok := metadata.Value.(*schemapb.Metadata_CronJob)
for _, metadata := range verb.Metadata {
cronMetadata, ok := metadata.(*schema.MetadataCronJob)
if !ok {
continue
}
cronStr := cronMetadata.CronJob.Cron
cronStr := cronMetadata.Cron
schedule, err := cron.Parse(cronStr)
if err != nil {
merr = append(merr, fmt.Errorf("failed to parse cron schedule %q: %w", cronStr, err))
continue
}
next, err := cron.NextAfter(schedule, start, false)
if err != nil {
merr = append(merr, fmt.Errorf("failed to calculate next execution for cron job %v:%v with schedule %q: %w", module.Name, verb.Verb.Name, schedule, err))
merr = append(merr, fmt.Errorf("failed to calculate next execution for cron job %v:%v with schedule %q: %w", module.Name, verb.Name, schedule, err))
continue
}
newJobs = append(newJobs, model.CronJob{
Key: model.NewCronJobKey(module.Name, verb.Verb.Name),
Verb: schema.Ref{Module: module.Name, Name: verb.Verb.Name},
Key: model.NewCronJobKey(module.Name, verb.Name),
Verb: schema.Ref{Module: module.Name, Name: verb.Name},
Schedule: cronStr,
StartTime: start,
NextExecution: next,
@@ -96,15 +95,15 @@ func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Mod
func (s *Service) CreatedOrReplacedDeloyment(ctx context.Context) error {
logger := log.FromContext(ctx).Scope("cron")
logger.Tracef("New deployment; scheduling cron jobs")
err := s.scheduleCronJobs(ctx)
err := s.ScheduleCronJobs(ctx)
if err != nil {
return fmt.Errorf("failed to schedule cron jobs: %w", err)
}
return nil
}

// scheduleCronJobs schedules all cron jobs that are not already scheduled.
func (s *Service) scheduleCronJobs(ctx context.Context) (err error) {
func (s *Service) ScheduleCronJobs(ctx context.Context) (err error) {
logger := log.FromContext(ctx).Scope("cron")
now := s.clock.Now().UTC()

@@ -237,3 +236,13 @@ func (s *Service) scheduleCronJob(ctx context.Context, tx *dal.DAL, job model.Cr
})
return nil
}

func (s *Service) DeleteCronJobsForDeployment(ctx context.Context, key model.DeploymentKey) error {
logger := log.FromContext(ctx).Scope("cron")
logger.Tracef("Deleting cron jobs for deployment %q", key)
err := s.dal.DeleteCronJobsForDeployment(ctx, key)
if err != nil {
return fmt.Errorf("failed to remove cron jobs for deployment %q: %w", key, err)
}
return nil
}
40 changes: 40 additions & 0 deletions backend/controller/cronjobs/cronjobs_integration_test.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ package cronjobs
import (
"os"
"path/filepath"
"strings"
"testing"
"time"

@@ -35,3 +36,42 @@ func TestCron(t *testing.T) {
},
)
}

func TestCronIsRemoved(t *testing.T) {
dir := t.TempDir()
// We want to make sure that cron jobs are shut down when the deployment is updated
// And we don't end up with double invocations
// To test this we are going to remove the cron and turn it into a normal verb
// If the verb is still invoked after the redeploy then we have a problem
tmpFile := filepath.Join(dir, "cron.txt")
t.Setenv("DEST_FILE", tmpFile)

t.Cleanup(func() { _ = os.Remove(tmpFile) })

in.Run(t,
in.WithLanguages("go"),
in.CopyModule("cron"),
in.Deploy("cron"),
in.Wait("cron"),
in.Sleep(1*time.Second),
func(t testing.TB, ic in.TestContext) {
_, err := os.Stat(tmpFile)
assert.NoError(t, err)
data, err := os.ReadFile(tmpFile)
assert.NoError(t, err)
assert.True(t, strings.Contains(string(data), "Hello, world!"))
},
in.EditFile("cron/cron.go", func(content []byte) []byte {
ret := strings.ReplaceAll(string(content), "//ftl:cron * * * * * * *", "//ftl:verb")
ret = strings.ReplaceAll(ret, "Hello, world!", "NEW VERB")
return []byte(ret)
}),
in.Deploy("cron"),
func(t testing.TB, ic in.TestContext) {
time.Sleep(2 * time.Second)
data, err := os.ReadFile(tmpFile)
assert.NoError(t, err)
assert.False(t, strings.Contains(string(data), "NEW VERB"))
},
)
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cronjobs
package internal

import (
"context"
@@ -13,6 +13,7 @@ import (
"github.com/benbjohnson/clock"

"github.com/TBD54566975/ftl/backend/controller/async"
"github.com/TBD54566975/ftl/backend/controller/cronjobs"
"github.com/TBD54566975/ftl/backend/controller/cronjobs/internal/dal"
parentdal "github.com/TBD54566975/ftl/backend/controller/dal"
dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model"
@@ -37,35 +38,52 @@ func TestNewCronJobsForModule(t *testing.T) {
clk := clock.NewMock()
clk.Add(time.Second) // half way between cron job executions

key := model.NewControllerKey("localhost", strconv.Itoa(8080+1))
conn := sqltest.OpenForTesting(ctx, t)
dal := dal.New(conn)

uri := "fake-kms://CK6YwYkBElQKSAowdHlwZS5nb29nbGVhcGlzLmNvbS9nb29nbGUuY3J5cHRvLnRpbmsuQWVzR2NtS2V5EhIaEJy4TIQgfCuwxA3ZZgChp_wYARABGK6YwYkBIAE"
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder().WithKMSURI(optional.Some(uri)))
assert.NoError(t, err)

key := model.NewControllerKey("localhost", strconv.Itoa(8080+1))
timelineSrv := timeline.New(ctx, conn, encryption)
cjs := cronjobs.NewForTesting(ctx, key, "test.com", encryption, timelineSrv, *dal, clk)

scheduler := scheduledtask.New(ctx, key, leases.NewFakeLeaser())
pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]())
parentDAL := parentdal.New(ctx, conn, encryption, pubSub)
parentDAL := parentdal.New(ctx, conn, encryption, pubSub, cjs)
moduleName := "initial"
jobsToCreate := newCronJobs(t, moduleName, "* * * * * *", clk, 2) // every minute

deploymentKey, err := parentDAL.CreateDeployment(ctx, "go", &schema.Module{
Name: moduleName,
}, []dalmodel.DeploymentArtefact{}, []parentdal.IngressRoutingEntry{}, jobsToCreate)
decls := []schema.Decl{}
for _, job := range jobsToCreate {
decls = append(decls, &schema.Verb{
Name: job.Verb.Name,
Metadata: []schema.Metadata{&schema.MetadataCronJob{Cron: job.Schedule}},
Request: &schema.Unit{},
Response: &schema.Unit{},
})
}
moduleSchema := &schema.Module{
Name: moduleName,
Decls: decls,
}
deploymentKey, err := parentDAL.CreateDeployment(ctx, "go", moduleSchema, []dalmodel.DeploymentArtefact{}, []parentdal.IngressRoutingEntry{})
assert.NoError(t, err)
err = parentDAL.ReplaceDeployment(ctx, deploymentKey, 1)
assert.NoError(t, err)

timelineSrv := timeline.New(ctx, conn, encryption)

// Progress so that start_time is valid
clk.Add(time.Second)
cjs := NewForTesting(ctx, key, "test.com", encryption, timelineSrv, *dal, clk)
// All jobs need to be scheduled
expectUnscheduledJobs(t, dal, clk, 2)
unscheduledJobs, err := dal.GetUnscheduledCronJobs(ctx, clk.Now())
jobsByVerb := map[string]model.CronJob{}
for _, job := range unscheduledJobs {
jobsByVerb[job.Verb.Name] = job
}
for i := range jobsToCreate {
jobsToCreate[i].Key = jobsByVerb[jobsToCreate[i].Verb.Name].Key
}
assert.NoError(t, err)
assert.Equal(t, len(unscheduledJobs), 2)

@@ -74,7 +92,7 @@ func TestNewCronJobsForModule(t *testing.T) {
assert.IsError(t, err, libdal.ErrNotFound)
assert.EqualError(t, err, "no pending async calls: not found")

err = cjs.scheduleCronJobs(ctx)
err = cjs.ScheduleCronJobs(ctx)
assert.NoError(t, err)
expectUnscheduledJobs(t, dal, clk, 0)
for _, job := range jobsToCreate {
8 changes: 8 additions & 0 deletions backend/controller/cronjobs/internal/dal/dal.go
Original file line number Diff line number Diff line change
@@ -100,3 +100,11 @@ func (d *DAL) UpdateCronJobExecution(ctx context.Context, params UpdateCronJobEx
}
return nil
}

func (d *DAL) DeleteCronJobsForDeployment(ctx context.Context, key model.DeploymentKey) error {
err := d.db.DeleteCronJobsForDeployment(ctx, key)
if err != nil {
return fmt.Errorf("failed to delete cron jobs for deployment %v: %w", key, libdal.TranslatePGError(err))
}
return nil
}
1 change: 1 addition & 0 deletions backend/controller/cronjobs/internal/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions backend/controller/cronjobs/internal/sql/queries.sql
Original file line number Diff line number Diff line change
@@ -49,3 +49,7 @@ SELECT EXISTS (
AND ac.scheduled_at > sqlc.arg('start_time')::TIMESTAMPTZ
AND ac.state = 'pending'
) AS pending;

-- name: DeleteCronJobsForDeployment :exec
DELETE FROM cron_jobs
WHERE deployment_id = (SELECT id FROM deployments WHERE key = sqlc.arg('deployment_key')::deployment_key LIMIT 1);
10 changes: 10 additions & 0 deletions backend/controller/cronjobs/internal/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion backend/controller/dal/async_calls_test.go
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ func TestNoCallToAcquire(t *testing.T) {
assert.NoError(t, err)
scheduler := scheduledtask.New(ctx, model.ControllerKey{}, leases.NewFakeLeaser())
pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]())
dal := New(ctx, conn, encryption, pubSub)
dal := New(ctx, conn, encryption, pubSub, nil)

_, _, err = dal.AcquireAsyncCall(ctx)
assert.IsError(t, err, libdal.ErrNotFound)
Loading

0 comments on commit 5ed558d

Please sign in to comment.