diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index 3f356082ac..0ffe863e7c 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -133,12 +133,13 @@ func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Mod continue } newJobs = append(newJobs, dal.CronJob{ - // DeploymentKey: Filled in by DAL, + Key: model.NewCronJobKey(module.Name, verb.Verb.Name), Ref: schema.Ref{Module: module.Name, Name: verb.Verb.Name}, Schedule: cronStr, StartTime: start, NextExecution: next, State: dal.JobStateIdle, + // DeploymentKey: Filled in by DAL }) } } @@ -271,8 +272,8 @@ func (s *Service) watchForUpdates(ctx context.Context) { defer s.jobChanges.Unsubscribe(jobChanges) state := &State{ - executing: map[jobIdentifier]bool{}, - newJobs: map[jobIdentifier]time.Time{}, + executing: map[string]bool{}, + newJobs: map[string]time.Time{}, blockedUntil: s.clock.Now(), } @@ -449,7 +450,7 @@ func (s *Service) isResponsibleForJob(job dal.CronJob, state *State) bool { return true } - initialKey, ok := hashringState.hashRing.GetNode(identifierForJob(job).String()) + initialKey, ok := hashringState.hashRing.GetNode(job.Key.String()) if !ok { return true } diff --git a/backend/controller/cronjobs/state.go b/backend/controller/cronjobs/state.go index 96260138ce..080c655360 100644 --- a/backend/controller/cronjobs/state.go +++ b/backend/controller/cronjobs/state.go @@ -1,7 +1,6 @@ package cronjobs import ( - "fmt" "time" "github.com/TBD54566975/ftl/backend/controller/dal" @@ -10,49 +9,33 @@ import ( "github.com/alecthomas/types/optional" ) -type jobIdentifier struct { - deploymentKey string - verb string -} - -func identifierForJob(job dal.CronJob) jobIdentifier { - return jobIdentifier{ - deploymentKey: job.DeploymentKey.String(), - verb: job.Ref.Name, - } -} - -func (i jobIdentifier) String() string { - return fmt.Sprintf("%s:::%s", i.deploymentKey, i.verb) -} - type State struct { jobs []dal.CronJob // Used to determine if this controller is currently executing a job - executing map[jobIdentifier]bool + executing map[string]bool // Newly created jobs should be attempted by the controller that created them until other controllers // have a chance to reset their job lists and share responsibilities through the hash ring - newJobs map[jobIdentifier]time.Time + newJobs map[string]time.Time blockedUntil time.Time } func (s *State) isExecutingInCurrentController(job dal.CronJob) bool { - return s.executing[identifierForJob(job)] + return s.executing[job.Key.String()] } func (s *State) startedExecutingJob(job dal.CronJob) { - s.executing[identifierForJob(job)] = true + s.executing[job.Key.String()] = true } func (s *State) isJobTooNewForHashRing(job dal.CronJob) bool { - if t, ok := s.newJobs[identifierForJob(job)]; ok { + if t, ok := s.newJobs[job.Key.String()]; ok { if time.Since(t) < newJobHashRingOverrideInterval { return true } - delete(s.newJobs, identifierForJob(job)) + delete(s.newJobs, job.Key.String()) } return false } @@ -62,11 +45,11 @@ func (s *State) reset(jobs []dal.CronJob, newDeploymentKey optional.Option[model copy(s.jobs, jobs) for _, job := range s.jobs { if job.State != dal.JobStateExecuting { - delete(s.executing, identifierForJob(job)) + delete(s.executing, job.Key.String()) } if newKey, ok := newDeploymentKey.Get(); ok && job.DeploymentKey.String() == newKey.String() { // This job is new and should be attempted by the current controller - s.newJobs[identifierForJob(job)] = time.Now() + s.newJobs[job.Key.String()] = time.Now() } } } @@ -74,11 +57,11 @@ func (s *State) reset(jobs []dal.CronJob, newDeploymentKey optional.Option[model func (s *State) updateJobs(jobs []dal.CronJob) { updatedJobMap := jobMap(jobs) for idx, old := range s.jobs { - if updated, exists := updatedJobMap[identifierForJob(old)]; exists { + if updated, exists := updatedJobMap[old.Key.String()]; exists { //TODO: compare to see if outdated s.jobs[idx] = updated if updated.State != dal.JobStateExecuting { - delete(s.executing, identifierForJob(updated)) + delete(s.executing, updated.Key.String()) } } } @@ -90,10 +73,10 @@ func (s *State) removeDeploymentKey(key model.DeploymentKey) { }) } -func jobMap(jobs []dal.CronJob) map[jobIdentifier]dal.CronJob { - m := map[jobIdentifier]dal.CronJob{} +func jobMap(jobs []dal.CronJob) map[string]dal.CronJob { + m := map[string]dal.CronJob{} for _, job := range jobs { - m[identifierForJob(job)] = job + m[job.Key.String()] = job } return m } diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index e446f309fb..9ba0c2d614 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -494,6 +494,7 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem // Start time must be calculated by the caller rather than generated by db // This ensures that nextExecution is after start time, otherwise the job will never be triggered err := tx.CreateCronJob(ctx, sql.CreateCronJobParams{ + Key: job.Key, DeploymentKey: deploymentKey, ModuleName: job.Ref.Module, Verb: job.Ref.Name, @@ -923,7 +924,7 @@ const ( ) type CronJob struct { - id int64 + Key model.CronJobKey DeploymentKey model.DeploymentKey Ref schema.Ref Schedule string @@ -940,7 +941,7 @@ type AttemptedCronJob struct { func cronJobFromRow(row sql.GetCronJobsRow) CronJob { return CronJob{ - id: row.ID, + Key: row.Key, DeploymentKey: row.DeploymentKey, Ref: schema.Ref{Module: row.Module, Name: row.Verb}, Schedule: row.Schedule, @@ -965,7 +966,7 @@ func (d *DAL) StartCronJobs(ctx context.Context, jobs []CronJob) (attemptedJobs if len(jobs) == 0 { return nil, nil } - rows, err := d.db.StartCronJobs(ctx, slices.Map(jobs, func(job CronJob) int64 { return job.id })) + rows, err := d.db.StartCronJobs(ctx, slices.Map(jobs, func(job CronJob) string { return job.Key.String() })) if err != nil { return nil, translatePGError(err) } @@ -974,7 +975,7 @@ func (d *DAL) StartCronJobs(ctx context.Context, jobs []CronJob) (attemptedJobs for _, row := range rows { job := AttemptedCronJob{ CronJob: CronJob{ - id: row.ID, + Key: row.Key, DeploymentKey: row.DeploymentKey, Ref: schema.Ref{Module: row.Module, Name: row.Verb}, Schedule: row.Schedule, @@ -991,7 +992,7 @@ func (d *DAL) StartCronJobs(ctx context.Context, jobs []CronJob) (attemptedJobs } func (d *DAL) EndCronJob(ctx context.Context, job CronJob, next time.Time) (CronJob, error) { - row, err := d.db.EndCronJob(ctx, next, job.id, job.StartTime) + row, err := d.db.EndCronJob(ctx, next, job.Key, job.StartTime) if err != nil { return CronJob{}, translatePGError(err) } diff --git a/backend/controller/sql/models.go b/backend/controller/sql/models.go index 5a0a8a75fd..6626f34cf9 100644 --- a/backend/controller/sql/models.go +++ b/backend/controller/sql/models.go @@ -248,6 +248,7 @@ type Controller struct { type CronJob struct { ID int64 + Key model.CronJobKey DeploymentID int64 Verb string Schedule string diff --git a/backend/controller/sql/querier.go b/backend/controller/sql/querier.go index 6663ba4764..eac52ac819 100644 --- a/backend/controller/sql/querier.go +++ b/backend/controller/sql/querier.go @@ -21,7 +21,7 @@ type Querier interface { CreateIngressRoute(ctx context.Context, arg CreateIngressRouteParams) error CreateRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error DeregisterRunner(ctx context.Context, key model.RunnerKey) (int64, error) - EndCronJob(ctx context.Context, nextExecution time.Time, iD int64, startTime time.Time) (EndCronJobRow, error) + EndCronJob(ctx context.Context, nextExecution time.Time, key model.CronJobKey, startTime time.Time) (EndCronJobRow, error) ExpireRunnerReservations(ctx context.Context) (int64, error) GetActiveDeploymentSchemas(ctx context.Context) ([]GetActiveDeploymentSchemasRow, error) GetActiveDeployments(ctx context.Context) ([]GetActiveDeploymentsRow, error) @@ -66,7 +66,7 @@ type Querier interface { // Find an idle runner and reserve it for the given deployment. ReserveRunner(ctx context.Context, reservationTimeout time.Time, deploymentKey model.DeploymentKey, labels []byte) (Runner, error) SetDeploymentDesiredReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int32) error - StartCronJobs(ctx context.Context, ids []int64) ([]StartCronJobsRow, error) + StartCronJobs(ctx context.Context, keys []string) ([]StartCronJobsRow, error) UpsertController(ctx context.Context, key model.ControllerKey, endpoint string) (int64, error) UpsertModule(ctx context.Context, language string, name string) (int64, error) // Upsert a runner and return the deployment ID that it is assigned to, if any. diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql index 5b143b2f15..9d728deedd 100644 --- a/backend/controller/sql/queries.sql +++ b/backend/controller/sql/queries.sql @@ -281,14 +281,16 @@ SELECT COUNT(*) FROM rows; -- name: GetCronJobs :many -SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state +SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id WHERE d.min_replicas > 0; -- name: CreateCronJob :exec -INSERT INTO cron_jobs (deployment_id, module_name, verb, schedule, start_time, next_execution) - VALUES ((SELECT id FROM deployments WHERE key = sqlc.arg('deployment_key')::deployment_key LIMIT 1), +INSERT INTO cron_jobs (key, deployment_id, module_name, verb, schedule, start_time, next_execution) + VALUES ( + sqlc.arg('key')::cron_job_key, + (SELECT id FROM deployments WHERE key = sqlc.arg('deployment_key')::deployment_key LIMIT 1), sqlc.arg('module_name')::TEXT, sqlc.arg('verb')::TEXT, sqlc.arg('schedule')::TEXT, @@ -300,39 +302,39 @@ WITH updates AS ( UPDATE cron_jobs SET state = 'executing', start_time = (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ - WHERE id = ANY (sqlc.arg('ids')) + WHERE key = ANY (sqlc.arg('keys')) AND state = 'idle' AND start_time < next_execution AND (next_execution AT TIME ZONE 'utc') < (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ - RETURNING id, state, start_time, next_execution) -SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, + RETURNING id, key, state, start_time, next_execution) +SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, COALESCE(u.start_time, j.start_time) as start_time, COALESCE(u.next_execution, j.next_execution) as next_execution, COALESCE(u.state, j.state) as state, d.min_replicas > 0 as has_min_replicas, - CASE WHEN u.id IS NULL THEN FALSE ELSE TRUE END as updated + CASE WHEN u.key IS NULL THEN FALSE ELSE TRUE END as updated FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id LEFT JOIN updates u on j.id = u.id -WHERE j.id = ANY (sqlc.arg('ids')); +WHERE j.key = ANY (sqlc.arg('keys')); -- name: EndCronJob :one WITH j AS ( UPDATE cron_jobs SET state = 'idle', next_execution = sqlc.arg('next_execution')::TIMESTAMPTZ - WHERE id = sqlc.arg('id')::BIGINT + WHERE key = sqlc.arg('key')::cron_job_key AND state = 'executing' AND start_time = sqlc.arg('start_time')::TIMESTAMPTZ RETURNING * ) -SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state +SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state FROM j INNER JOIN deployments d on j.deployment_id = d.id LIMIT 1; -- name: GetStaleCronJobs :many -SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state +SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id WHERE state = 'executing' diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index 480db14253..728ca631b2 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -52,16 +52,19 @@ func (q *Queries) CreateArtefact(ctx context.Context, digest []byte, content []b } const createCronJob = `-- name: CreateCronJob :exec - INSERT INTO cron_jobs (deployment_id, module_name, verb, schedule, start_time, next_execution) - VALUES ((SELECT id FROM deployments WHERE key = $1::deployment_key LIMIT 1), - $2::TEXT, - $3::TEXT, - $4::TEXT, - $5::TIMESTAMPTZ, - $6::TIMESTAMPTZ) +INSERT INTO cron_jobs (key, deployment_id, module_name, verb, schedule, start_time, next_execution) + VALUES ( + $1::cron_job_key, + (SELECT id FROM deployments WHERE key = $2::deployment_key LIMIT 1), + $3::TEXT, + $4::TEXT, + $5::TEXT, + $6::TIMESTAMPTZ, + $7::TIMESTAMPTZ) ` type CreateCronJobParams struct { + Key model.CronJobKey DeploymentKey model.DeploymentKey ModuleName string Verb string @@ -72,6 +75,7 @@ type CreateCronJobParams struct { func (q *Queries) CreateCronJob(ctx context.Context, arg CreateCronJobParams) error { _, err := q.db.Exec(ctx, createCronJob, + arg.Key, arg.DeploymentKey, arg.ModuleName, arg.Verb, @@ -149,19 +153,19 @@ WITH j AS ( UPDATE cron_jobs SET state = 'idle', next_execution = $1::TIMESTAMPTZ - WHERE id = $2::BIGINT + WHERE key = $2::cron_job_key AND state = 'executing' AND start_time = $3::TIMESTAMPTZ - RETURNING id, deployment_id, verb, schedule, start_time, next_execution, state, module_name + RETURNING id, key, deployment_id, verb, schedule, start_time, next_execution, state, module_name ) -SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state +SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state FROM j INNER JOIN deployments d on j.deployment_id = d.id LIMIT 1 ` type EndCronJobRow struct { - ID int64 + Key model.CronJobKey DeploymentKey model.DeploymentKey Module string Verb string @@ -171,11 +175,11 @@ type EndCronJobRow struct { State CronJobState } -func (q *Queries) EndCronJob(ctx context.Context, nextExecution time.Time, iD int64, startTime time.Time) (EndCronJobRow, error) { - row := q.db.QueryRow(ctx, endCronJob, nextExecution, iD, startTime) +func (q *Queries) EndCronJob(ctx context.Context, nextExecution time.Time, key model.CronJobKey, startTime time.Time) (EndCronJobRow, error) { + row := q.db.QueryRow(ctx, endCronJob, nextExecution, key, startTime) var i EndCronJobRow err := row.Scan( - &i.ID, + &i.Key, &i.DeploymentKey, &i.Module, &i.Verb, @@ -462,14 +466,14 @@ func (q *Queries) GetControllers(ctx context.Context, all bool) ([]Controller, e } const getCronJobs = `-- name: GetCronJobs :many -SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state +SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id WHERE d.min_replicas > 0 ` type GetCronJobsRow struct { - ID int64 + Key model.CronJobKey DeploymentKey model.DeploymentKey Module string Verb string @@ -489,7 +493,7 @@ func (q *Queries) GetCronJobs(ctx context.Context) ([]GetCronJobsRow, error) { for rows.Next() { var i GetCronJobsRow if err := rows.Scan( - &i.ID, + &i.Key, &i.DeploymentKey, &i.Module, &i.Verb, @@ -1149,7 +1153,7 @@ func (q *Queries) GetRunnersForDeployment(ctx context.Context, key model.Deploym } const getStaleCronJobs = `-- name: GetStaleCronJobs :many -SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state +SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id WHERE state = 'executing' @@ -1157,7 +1161,7 @@ WHERE state = 'executing' ` type GetStaleCronJobsRow struct { - ID int64 + Key model.CronJobKey DeploymentKey model.DeploymentKey Module string Verb string @@ -1177,7 +1181,7 @@ func (q *Queries) GetStaleCronJobs(ctx context.Context, dollar_1 time.Duration) for rows.Next() { var i GetStaleCronJobsRow if err := rows.Scan( - &i.ID, + &i.Key, &i.DeploymentKey, &i.Module, &i.Verb, @@ -1504,25 +1508,25 @@ WITH updates AS ( UPDATE cron_jobs SET state = 'executing', start_time = (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ - WHERE id = ANY ($1) + WHERE key = ANY ($1) AND state = 'idle' AND start_time < next_execution AND (next_execution AT TIME ZONE 'utc') < (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ - RETURNING id, state, start_time, next_execution) -SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, + RETURNING id, key, state, start_time, next_execution) +SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, COALESCE(u.start_time, j.start_time) as start_time, COALESCE(u.next_execution, j.next_execution) as next_execution, COALESCE(u.state, j.state) as state, d.min_replicas > 0 as has_min_replicas, - CASE WHEN u.id IS NULL THEN FALSE ELSE TRUE END as updated + CASE WHEN u.key IS NULL THEN FALSE ELSE TRUE END as updated FROM cron_jobs j INNER JOIN deployments d on j.deployment_id = d.id LEFT JOIN updates u on j.id = u.id -WHERE j.id = ANY ($1) +WHERE j.key = ANY ($1) ` type StartCronJobsRow struct { - ID int64 + Key model.CronJobKey DeploymentKey model.DeploymentKey Module string Verb string @@ -1534,8 +1538,8 @@ type StartCronJobsRow struct { Updated bool } -func (q *Queries) StartCronJobs(ctx context.Context, ids []int64) ([]StartCronJobsRow, error) { - rows, err := q.db.Query(ctx, startCronJobs, ids) +func (q *Queries) StartCronJobs(ctx context.Context, keys []string) ([]StartCronJobsRow, error) { + rows, err := q.db.Query(ctx, startCronJobs, keys) if err != nil { return nil, err } @@ -1544,7 +1548,7 @@ func (q *Queries) StartCronJobs(ctx context.Context, ids []int64) ([]StartCronJo for rows.Next() { var i StartCronJobsRow if err := rows.Scan( - &i.ID, + &i.Key, &i.DeploymentKey, &i.Module, &i.Verb, diff --git a/backend/controller/sql/schema/001_init.sql b/backend/controller/sql/schema/001_init.sql index 3023009700..82bdef4c51 100644 --- a/backend/controller/sql/schema/001_init.sql +++ b/backend/controller/sql/schema/001_init.sql @@ -237,9 +237,12 @@ CREATE TYPE cron_job_state AS ENUM ( 'executing' ); +CREATE DOMAIN cron_job_key AS VARCHAR; + CREATE TABLE cron_jobs ( id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, + key cron_job_key UNIQUE NOT NULL, deployment_id BIGINT NOT NULL REFERENCES deployments (id) ON DELETE CASCADE, verb VARCHAR NOT NULL, schedule VARCHAR NOT NULL, @@ -252,6 +255,7 @@ CREATE TABLE cron_jobs ); CREATE INDEX cron_jobs_executing_start_time_idx ON cron_jobs (start_time) WHERE state = 'executing'; +CREATE UNIQUE INDEX cron_jobs_key_idx ON cron_jobs (key); CREATE TYPE event_type AS ENUM ( 'call', diff --git a/backend/controller/sql/types.go b/backend/controller/sql/types.go index 6f4d66bd56..9c23751131 100644 --- a/backend/controller/sql/types.go +++ b/backend/controller/sql/types.go @@ -13,12 +13,16 @@ import ( type NullTime = optional.Option[time.Time] type NullDuration = optional.Option[time.Duration] type NullRunnerKey = optional.Option[model.RunnerKey] +type NullCronJobKey = optional.Option[model.CronJobKey] type NullDeploymentKey = optional.Option[model.DeploymentKey] type NullRequestKey = optional.Option[model.RequestKey] var _ sql.Scanner = (*NullRunnerKey)(nil) var _ driver.Valuer = (*NullRunnerKey)(nil) +var _ sql.Scanner = (*NullCronJobKey)(nil) +var _ driver.Valuer = (*NullCronJobKey)(nil) + var _ sql.Scanner = (*NullDeploymentKey)(nil) var _ driver.Valuer = (*NullDeploymentKey)(nil) diff --git a/internal/model/cron_job_key.go b/internal/model/cron_job_key.go new file mode 100644 index 0000000000..fea4d2984d --- /dev/null +++ b/internal/model/cron_job_key.go @@ -0,0 +1,31 @@ +package model + +import ( + "errors" + "strings" +) + +type CronJobKey = KeyType[CronJobPayload, *CronJobPayload] + +func NewCronJobKey(module, verb string) CronJobKey { + return newKey[CronJobPayload](strings.Join([]string{module, verb}, "-")) +} + +func ParseCronJobKey(key string) (CronJobKey, error) { return parseKey[CronJobPayload](key) } + +type CronJobPayload struct { + Ref string +} + +var _ KeyPayload = (*CronJobPayload)(nil) + +func (d *CronJobPayload) Kind() string { return "crn" } +func (d *CronJobPayload) String() string { return d.Ref } +func (d *CronJobPayload) Parse(parts []string) error { + if len(parts) == 0 { + return errors.New("expected - but got empty string") + } + d.Ref = strings.Join(parts, "-") + return nil +} +func (d *CronJobPayload) RandomBytes() int { return 10 } diff --git a/sqlc.yaml b/sqlc.yaml index 050c7648b6..efaabc9c0a 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -36,6 +36,12 @@ sql: nullable: true go_type: type: "NullRunnerKey" + - db_type: "cron_job_key" + go_type: "github.com/TBD54566975/ftl/internal/model.CronJobKey" + - db_type: "cron_job_key" + nullable: true + go_type: + type: "NullCronJobKey" - db_type: "deployment_key" go_type: "github.com/TBD54566975/ftl/internal/model.DeploymentKey" - db_type: "deployment_key"