Skip to content

Commit

Permalink
feat: cronjobs db changes (#1229)
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e authored Apr 12, 2024
1 parent fc287af commit 8ca5926
Show file tree
Hide file tree
Showing 11 changed files with 559 additions and 8 deletions.
8 changes: 4 additions & 4 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,14 +732,14 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl
}

ingressRoutes := extractIngressRoutingEntries(req.Msg)
dname, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes)
dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes, nil)
if err != nil {
logger.Errorf(err, "Could not create deployment")
return nil, fmt.Errorf("could not create deployment: %w", err)
}
deploymentLogger := s.getDeploymentLogger(ctx, dname)
deploymentLogger.Debugf("Created deployment %s", dname)
return connect.NewResponse(&ftlv1.CreateDeploymentResponse{DeploymentKey: dname.String()}), nil
deploymentLogger := s.getDeploymentLogger(ctx, dkey)
deploymentLogger.Debugf("Created deployment %s", dkey)
return connect.NewResponse(&ftlv1.CreateDeploymentResponse{DeploymentKey: dkey.String()}), nil
}

// Load schemas for existing modules, combine with our new one, and validate the new module in the context
Expand Down
114 changes: 113 additions & 1 deletion backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ type IngressRoutingEntry struct {
// previously created artefacts with it.
//
// If an existing deployment with identical artefacts exists, it is returned.
func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []DeploymentArtefact, ingressRoutes []IngressRoutingEntry) (key model.DeploymentKey, err error) {
func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []DeploymentArtefact, ingressRoutes []IngressRoutingEntry, cronJobs []CronJob) (key model.DeploymentKey, err error) {
logger := log.FromContext(ctx)

// Start the transaction
Expand Down Expand Up @@ -490,6 +490,23 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem
}
}

for _, job := range cronJobs {
// Start time must be calculated by the caller rather than generated by the database.
// This ensures that NextExecution is after the start time; otherwise the job would never be triggered.
err := tx.CreateCronJob(ctx, sql.CreateCronJobParams{
Key: job.Key,
DeploymentKey: deploymentKey,
ModuleName: job.Ref.Module,
Verb: job.Ref.Name,
StartTime: job.StartTime,
Schedule: job.Schedule,
NextExecution: job.NextExecution,
})
if err != nil {
return model.DeploymentKey{}, fmt.Errorf("failed to create cron job: %w", translatePGError(err))
}
}

return deploymentKey, nil
}

Expand Down Expand Up @@ -899,6 +916,101 @@ func (d *DAL) ExpireRunnerClaims(ctx context.Context) (int64, error) {
return count, translatePGError(err)
}

// JobState is the DAL-level representation of a cron job's state.
// Its values are converted from the sql package's cron job state constants.
type JobState string

// Cron job states exposed by the DAL.
const (
// JobStateIdle corresponds to sql.CronJobStateIdle.
JobStateIdle = JobState(sql.CronJobStateIdle)
// JobStateExecuting corresponds to sql.CronJobStateExecuting.
JobStateExecuting = JobState(sql.CronJobStateExecuting)
)

// CronJob is the DAL model of a row in the cron_jobs table, together with the
// key of the deployment it belongs to.
type CronJob struct {
// Key identifies the cron job.
Key model.CronJobKey
// DeploymentKey identifies the deployment that owns the job.
DeploymentKey model.DeploymentKey
// Ref holds the module and verb name associated with the job.
Ref schema.Ref
// Schedule is the job's schedule string.
// NOTE(review): presumably a cron expression; the format is not validated here.
Schedule string
// StartTime is supplied by the caller when the job is created and is
// overwritten by the database when the job is started.
StartTime time.Time
// NextExecution is the time at which the job is next due.
NextExecution time.Time
// State is the job's current state (see JobStateIdle / JobStateExecuting).
State JobState
}

// AttemptedCronJob is the per-job result of StartCronJobs: the job as it now
// stands in the database plus flags describing the outcome of the attempt.
type AttemptedCronJob struct {
// DidStartExecution is populated from the query row's Updated column.
DidStartExecution bool
// HasMinReplicas is populated from the query row's HasMinReplicas column.
HasMinReplicas bool
CronJob
}

// cronJobFromRow converts a GetCronJobs query row into the DAL's CronJob model.
//
// The row's separate module and verb columns are folded into the job's Ref;
// every other field is copied across, with the state converted to JobState.
func cronJobFromRow(row sql.GetCronJobsRow) CronJob {
	var job CronJob
	job.Key = row.Key
	job.DeploymentKey = row.DeploymentKey
	job.Ref.Module = row.Module
	job.Ref.Name = row.Verb
	job.Schedule = row.Schedule
	job.StartTime = row.StartTime
	job.NextExecution = row.NextExecution
	job.State = JobState(row.State)
	return job
}

// GetCronJobs returns all cron jobs for deployments with min replicas > 0.
//
// Each database row is converted with cronJobFromRow. A query failure is
// returned after being passed through translatePGError.
func (d *DAL) GetCronJobs(ctx context.Context) ([]CronJob, error) {
	rows, queryErr := d.db.GetCronJobs(ctx)
	if queryErr != nil {
		return nil, translatePGError(queryErr)
	}
	jobs := slices.Map(rows, cronJobFromRow)
	return jobs, nil
}

// StartCronJobs asks the database to start the given jobs and reports the
// outcome for each of them.
//
// It returns a full list of results so that the caller can update their list
// of jobs whether or not they successfully updated the row: every row returned
// by the query becomes one AttemptedCronJob, with DidStartExecution taken from
// the row's Updated column and HasMinReplicas from its HasMinReplicas column.
//
// An empty jobs slice short-circuits with (nil, nil) and no query is issued.
// A query failure is returned after being passed through translatePGError.
func (d *DAL) StartCronJobs(ctx context.Context, jobs []CronJob) (attemptedJobs []AttemptedCronJob, err error) {
	if len(jobs) == 0 {
		return nil, nil
	}

	// The query is keyed on the string form of each job key.
	keys := slices.Map(jobs, func(job CronJob) string { return job.Key.String() })
	rows, err := d.db.StartCronJobs(ctx, keys)
	if err != nil {
		return nil, translatePGError(err)
	}

	// The final length is known up front, so allocate once instead of growing
	// the slice on append. The result stays non-nil even when rows is empty.
	attemptedJobs = make([]AttemptedCronJob, 0, len(rows))
	for _, row := range rows {
		attemptedJobs = append(attemptedJobs, AttemptedCronJob{
			CronJob: CronJob{
				Key:           row.Key,
				DeploymentKey: row.DeploymentKey,
				Ref:           schema.Ref{Module: row.Module, Name: row.Verb},
				Schedule:      row.Schedule,
				StartTime:     row.StartTime,
				NextExecution: row.NextExecution,
				State:         JobState(row.State),
			},
			DidStartExecution: row.Updated,
			HasMinReplicas:    row.HasMinReplicas,
		})
	}
	return attemptedJobs, nil
}

// EndCronJob sets the status from executing to idle and updates the next
// execution time to next.
//
// It can be called on the successful completion of a job, or if the job failed
// to execute (error or timeout). The job's key and start time are passed to the
// query alongside next. On success the updated job is returned; on failure a
// zero CronJob is returned together with the error from translatePGError.
func (d *DAL) EndCronJob(ctx context.Context, job CronJob, next time.Time) (CronJob, error) {
	updated, err := d.db.EndCronJob(ctx, next, job.Key, job.StartTime)
	if err != nil {
		return CronJob{}, translatePGError(err)
	}
	// The EndCronJob row is converted to a GetCronJobsRow so the shared
	// conversion helper can be reused.
	asRow := sql.GetCronJobsRow(updated)
	return cronJobFromRow(asRow), nil
}

// GetStaleCronJobs returns a list of cron jobs that have been executing longer
// than the duration.
//
// A query failure is returned after being passed through translatePGError.
func (d *DAL) GetStaleCronJobs(ctx context.Context, duration time.Duration) ([]CronJob, error) {
	rows, err := d.db.GetStaleCronJobs(ctx, duration)
	if err != nil {
		return nil, translatePGError(err)
	}

	// Stale-job rows are converted to GetCronJobsRow so the shared conversion
	// helper can be reused.
	toCronJob := func(row sql.GetStaleCronJobsRow) CronJob {
		return cronJobFromRow(sql.GetCronJobsRow(row))
	}
	return slices.Map(rows, toCronJob), nil
}

func (d *DAL) InsertLogEvent(ctx context.Context, log *LogEvent) error {
attributes, err := json.Marshal(log.Attributes)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestDAL(t *testing.T) {
Digest: testSha,
Executable: true,
Path: "dir/filename",
}}, nil)
}}, nil, nil)
assert.NoError(t, err)
})

Expand Down
54 changes: 54 additions & 0 deletions backend/controller/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 64 additions & 0 deletions backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,66 @@ WITH rows AS (
SELECT COUNT(*)
FROM rows;

-- name: GetCronJobs :many
-- Lists every cron job whose owning deployment has min_replicas > 0,
-- returning the deployment's key alongside the job's own columns.
SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state
FROM cron_jobs j
INNER JOIN deployments d on j.deployment_id = d.id
WHERE d.min_replicas > 0;

-- name: CreateCronJob :exec
-- Inserts a cron job. The deployment_id is resolved from the supplied
-- deployment key via a sub-select; start_time and next_execution are taken
-- from the caller rather than generated here.
-- NOTE(review): if no deployment matches the key the sub-select yields NULL;
-- whether the insert is then rejected depends on the table definition -- confirm.
INSERT INTO cron_jobs (key, deployment_id, module_name, verb, schedule, start_time, next_execution)
VALUES (
sqlc.arg('key')::cron_job_key,
(SELECT id FROM deployments WHERE key = sqlc.arg('deployment_key')::deployment_key LIMIT 1),
sqlc.arg('module_name')::TEXT,
sqlc.arg('verb')::TEXT,
sqlc.arg('schedule')::TEXT,
sqlc.arg('start_time')::TIMESTAMPTZ,
sqlc.arg('next_execution')::TIMESTAMPTZ);

-- name: StartCronJobs :many
-- Attempts to move the requested jobs to the executing state and reports the
-- outcome for each of them.
--
-- The "updates" CTE only touches rows that are currently idle, whose
-- start_time is before next_execution, and whose next_execution is in the
-- past; for those rows start_time is overwritten with the current time.
-- The outer SELECT then returns the requested keys that have a matching
-- cron job and deployment, whether or not they were updated: values from
-- the CTE take precedence via COALESCE, "updated" is TRUE only for rows
-- changed by this statement, and "has_min_replicas" reflects
-- deployments.min_replicas > 0.
--
-- NOTE(review): (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ round-trips through a
-- zone-less timestamp, so the resulting instant depends on the session
-- TimeZone setting; this looks like it assumes the session runs in UTC -- confirm.
WITH updates AS (
UPDATE cron_jobs
SET state = 'executing',
start_time = (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ
WHERE key = ANY (sqlc.arg('keys'))
AND state = 'idle'
AND start_time < next_execution
AND (next_execution AT TIME ZONE 'utc') < (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ
RETURNING id, key, state, start_time, next_execution)
SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule,
COALESCE(u.start_time, j.start_time) as start_time,
COALESCE(u.next_execution, j.next_execution) as next_execution,
COALESCE(u.state, j.state) as state,
d.min_replicas > 0 as has_min_replicas,
CASE WHEN u.key IS NULL THEN FALSE ELSE TRUE END as updated
FROM cron_jobs j
INNER JOIN deployments d on j.deployment_id = d.id
LEFT JOIN updates u on j.id = u.id
WHERE j.key = ANY (sqlc.arg('keys'));

-- name: EndCronJob :one
-- Returns a job to the idle state and records its next execution time.
--
-- The update only applies when the job is currently executing AND its
-- start_time equals the supplied start_time. The updated row is returned
-- joined to its deployment key; if the update matches nothing, no row is
-- returned.
WITH j AS (
UPDATE cron_jobs
SET state = 'idle',
next_execution = sqlc.arg('next_execution')::TIMESTAMPTZ
WHERE key = sqlc.arg('key')::cron_job_key
AND state = 'executing'
AND start_time = sqlc.arg('start_time')::TIMESTAMPTZ
RETURNING *
)
SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state
FROM j
INNER JOIN deployments d on j.deployment_id = d.id
LIMIT 1;

-- name: GetStaleCronJobs :many
-- Lists cron jobs that are in the executing state and whose start_time is
-- older than the current time minus the supplied interval ($1), returning the
-- deployment's key alongside the job's own columns.
--
-- The filter columns are qualified with the cron_jobs alias so the predicate
-- stays unambiguous if deployments ever gains a column of the same name.
SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state
FROM cron_jobs j
  INNER JOIN deployments d on j.deployment_id = d.id
WHERE j.state = 'executing'
  AND j.start_time < (NOW() AT TIME ZONE 'utc') - $1::INTERVAL;

-- name: InsertLogEvent :exec
INSERT INTO events (deployment_id, request_id, time_stamp, custom_key_1, type, payload)
VALUES ((SELECT id FROM deployments d WHERE d.key = sqlc.arg('deployment_key')::deployment_key LIMIT 1),
Expand Down Expand Up @@ -349,6 +409,10 @@ VALUES ((SELECT id FROM deployments WHERE deployments.key = sqlc.arg('deployment
INSERT INTO requests (origin, "key", source_addr)
VALUES ($1, $2, $3);

-- name: CreateCronRequest :exec
-- Inserts a row into requests with the given origin ($1), key ($2) and
-- source address ($3).
INSERT INTO requests (origin, "key", source_addr)
VALUES ($1, $2, $3);

-- name: UpsertController :one
INSERT INTO controller (key, endpoint)
VALUES ($1, $2)
Expand Down
Loading

0 comments on commit 8ca5926

Please sign in to comment.