diff --git a/Justfile b/Justfile index 08de97540b..1bb14ba4a3 100644 --- a/Justfile +++ b/Justfile @@ -42,7 +42,7 @@ build-generate: @mk internal/log/log_level_string.go : internal/log/api.go -- go generate -x ./internal/log # Build command-line tools -build +tools: build-protos build-sqlc build-zips build-frontend +build +tools: build-protos build-zips build-frontend #!/bin/bash shopt -s extglob for tool in $@; do mk "{{RELEASE}}/$tool" : !(build) -- go build -o "{{RELEASE}}/$tool" -tags release -ldflags "-X github.com/TBD54566975/ftl.Version={{VERSION}} -X github.com/TBD54566975/ftl.Timestamp={{TIMESTAMP}}" "./cmd/$tool"; done @@ -56,8 +56,8 @@ init-db: dbmate --migrations-dir backend/controller/sql/schema up # Regenerate SQLC code (requires init-db to be run first) -build-sqlc: - @mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/sql/schema -- sqlc generate +build-sqlc: init-db + @mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- sqlc generate # Build the ZIP files that are embedded in the FTL release binaries build-zips: build-kt-runtime diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 9a4f7fe1c5..9e12e0f110 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -168,26 +168,6 @@ type Service struct { increaseReplicaFailures map[string]int } -type jobConfig struct { - job scheduledtask.Job - backoff backoff.Backoff - develBackoff optional.Option[backoff.Backoff] -} - -func (j jobConfig) getBackoff(devel bool) backoff.Backoff { - var bo backoff.Backoff - if devel { - var ok bool - if bo, ok = j.develBackoff.Get(); !ok { - // if in devel and develBackoff is empty, use 1s for min/max - bo = backoff.Backoff{Min: time.Second, Max: time.Second} - } - } else { - bo = j.backoff - } - return bo -} - func New(ctx 
context.Context, db *dal.DAL, config Config, runnerScaling scaling.RunnerScaling) (*Service, error) { key := config.Key if config.Key.IsZero() { @@ -195,7 +175,7 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling. } config.SetDefaults() svc := &Service{ - tasks: scheduledtask.New(ctx, key), + tasks: scheduledtask.New(ctx, key, db), dal: db, key: key, deploymentLogsSink: newDeploymentLogsSink(ctx, db), @@ -208,32 +188,31 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling. cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, cronjobs.Config{Timeout: config.CronJobTimeout}, db, svc.tasks, svc.callWithRequest) svc.cronJobs = cronSvc - svc.controllerListListeners = append(svc.controllerListListeners, svc.tasks, cronSvc) + svc.controllerListListeners = append(svc.controllerListListeners, cronSvc) - parallelJobs := []jobConfig{ - {svc.syncRoutes, backoff.Backoff{Min: time.Second, Max: time.Second * 5}, optional.None[backoff.Backoff]()}, - {svc.heartbeatController, backoff.Backoff{Min: time.Second * 3, Max: time.Second * 3}, optional.Some[backoff.Backoff](backoff.Backoff{Min: time.Second * 2, Max: time.Second * 2})}, - {svc.updateControllersList, backoff.Backoff{Min: time.Second * 5, Max: time.Second * 5}, optional.None[backoff.Backoff]()}, - // This should only run on one controller, but because dead controllers - // might be selected by the hash ring, we have to run it on all controllers. - // We should use a DB lock at some point. - {svc.reapStaleControllers, backoff.Backoff{Min: time.Second * 20, Max: time.Second * 20}, optional.None[backoff.Backoff]()}, + // Use min, max backoff if we are running in production, otherwise use develBackoff if available. 
+ maybeDevelBackoff := func(min, max time.Duration, develBackoff ...backoff.Backoff) backoff.Backoff { + if len(develBackoff) > 1 { + panic("too many devel backoffs") + } + if _, devel := runnerScaling.(*localscaling.LocalScaling); devel && len(develBackoff) == 1 { + return develBackoff[0] + } + return makeBackoff(min, max) } - singletonJobs := []jobConfig{ - {svc.reapStaleRunners, backoff.Backoff{Min: time.Second, Max: time.Second * 10}, optional.None[backoff.Backoff]()}, - {svc.releaseExpiredReservations, backoff.Backoff{Min: time.Second, Max: time.Second * 20}, optional.None[backoff.Backoff]()}, - {svc.reconcileDeployments, backoff.Backoff{Min: time.Second, Max: time.Second * 5}, optional.None[backoff.Backoff]()}, - {svc.reconcileRunners, backoff.Backoff{Min: time.Second, Max: time.Second * 5}, optional.None[backoff.Backoff]()}, - } + // Parallel tasks. + svc.tasks.Parallel(maybeDevelBackoff(time.Second, time.Second*5), svc.syncRoutes) + svc.tasks.Parallel(maybeDevelBackoff(time.Second*3, time.Second*5, makeBackoff(time.Second*2, time.Second*2)), svc.heartbeatController) + svc.tasks.Parallel(maybeDevelBackoff(time.Second*5, time.Second*5), svc.updateControllersList) - _, devel := runnerScaling.(*localscaling.LocalScaling) - for _, j := range parallelJobs { - svc.tasks.Parallel(j.getBackoff(devel), j.job) - } - for _, j := range singletonJobs { - svc.tasks.Singleton(j.getBackoff(devel), j.job) - } + // Singleton tasks use leases to only run on a single controller. 
+ svc.tasks.Singleton(maybeDevelBackoff(time.Second*20, time.Second*20), svc.reapStaleControllers) + svc.tasks.Singleton(maybeDevelBackoff(time.Second, time.Second*10), svc.reapStaleRunners) + svc.tasks.Singleton(maybeDevelBackoff(time.Second, time.Second*20), svc.releaseExpiredReservations) + svc.tasks.Singleton(maybeDevelBackoff(time.Second, time.Second*5), svc.reconcileDeployments) + svc.tasks.Singleton(maybeDevelBackoff(time.Second, time.Second*5), svc.reconcileRunners) + svc.tasks.Singleton(maybeDevelBackoff(time.Second, time.Second*5), svc.expireStaleLeases) return svc, nil } @@ -1006,6 +985,14 @@ func (s *Service) reconcileRunners(ctx context.Context) (time.Duration, error) { return time.Second, nil } +func (s *Service) expireStaleLeases(ctx context.Context) (time.Duration, error) { + err := s.dal.ExpireLeases(ctx) + if err != nil { + return 0, fmt.Errorf("failed to expire leases: %w", err) + } + return time.Second * 1, nil +} + func (s *Service) terminateRandomRunner(ctx context.Context, key model.DeploymentKey) (bool, error) { runners, err := s.dal.GetRunnersForDeployment(ctx, key) if err != nil { @@ -1312,3 +1299,12 @@ func ingressPathString(path []*schemapb.IngressPathComponent) string { } return "/" + strings.Join(pathString, "/") } + +func makeBackoff(min, max time.Duration) backoff.Backoff { + return backoff.Backoff{ + Min: min, + Max: max, + Jitter: true, + Factor: 2, + } +} diff --git a/backend/controller/dal/lease.go b/backend/controller/dal/lease.go new file mode 100644 index 0000000000..14acbfe965 --- /dev/null +++ b/backend/controller/dal/lease.go @@ -0,0 +1,100 @@ +package dal + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + + "github.com/TBD54566975/ftl/backend/controller/leases" + "github.com/TBD54566975/ftl/backend/controller/sql" + "github.com/TBD54566975/ftl/internal/log" +) + +const leaseRenewalInterval = time.Second * 2 + +var _ leases.Leaser = (*DAL)(nil) + +// Lease represents a lease that is held by a 
controller. +type Lease struct { + idempotencyKey uuid.UUID + context any + key leases.Key + db *sql.DB + ttl time.Duration + errch chan error + release chan bool + leak bool // For testing. +} + +func (l *Lease) String() string { + return fmt.Sprintf("%s:%s", l.key, l.idempotencyKey) +} + +// Periodically renew the lease until it is released. +func (l *Lease) renew(ctx context.Context) { + defer close(l.errch) + logger := log.FromContext(ctx).Scope("lease(" + l.key.String() + ")") + logger.Debugf("Acquired lease %s", l.key) + for { + select { + case <-time.After(leaseRenewalInterval): + logger.Tracef("Renewing lease %s", l.key) + ctx, cancel := context.WithTimeout(ctx, leaseRenewalInterval) + _, err := l.db.RenewLease(ctx, l.ttl, l.idempotencyKey, l.key) + cancel() + + if err != nil { + logger.Errorf(err, "Failed to renew lease %s", l.key) + l.errch <- translatePGError(err) + return + } + + case <-l.release: + if l.leak { // For testing. + return + } + logger.Debugf("Releasing lease %s", l.key) + _, err := l.db.ReleaseLease(ctx, l.idempotencyKey, l.key) + l.errch <- translatePGError(err) + return + } + } +} + +func (l *Lease) Release() error { + close(l.release) + return <-l.errch +} + +func (d *DAL) AcquireLease(ctx context.Context, key leases.Key, ttl time.Duration) (leases.Lease, error) { + if ttl < time.Second*5 { + return nil, fmt.Errorf("lease TTL must be at least 5 seconds") + } + idempotencyKey, err := d.db.NewLease(ctx, key, time.Now().Add(ttl)) + if err != nil { + return nil, translatePGError(err) + } + lease := &Lease{ + idempotencyKey: idempotencyKey, + context: nil, + key: key, + db: d.db, + ttl: ttl, + release: make(chan bool), + errch: make(chan error, 1), + } + go lease.renew(ctx) + return lease, nil +} + +// ExpireLeases expires (deletes) all leases that have expired. +func (d *DAL) ExpireLeases(ctx context.Context) error { + count, err := d.db.ExpireLeases(ctx) + // TODO: Return and log the actual lease keys? 
+ if count > 0 { + log.FromContext(ctx).Warnf("Expired %d leases", count) + } + return translatePGError(err) +} diff --git a/backend/controller/dal/lease_test.go b/backend/controller/dal/lease_test.go new file mode 100644 index 0000000000..ca4e76a87a --- /dev/null +++ b/backend/controller/dal/lease_test.go @@ -0,0 +1,94 @@ +package dal + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/alecthomas/assert/v2" + "github.com/google/uuid" + + "github.com/TBD54566975/ftl/backend/controller/leases" + "github.com/TBD54566975/ftl/backend/controller/sql" + "github.com/TBD54566975/ftl/backend/controller/sql/sqltest" + "github.com/TBD54566975/ftl/internal/log" +) + +func leaseExists(t *testing.T, conn sql.DBI, idempotencyKey uuid.UUID, key leases.Key) bool { + t.Helper() + var count int + err := translatePGError(conn. + QueryRow(context.Background(), "SELECT COUNT(*) FROM leases WHERE idempotency_key = $1 AND key = $2", idempotencyKey, key). + Scan(&count)) + if errors.Is(err, ErrNotFound) { + return false + } + assert.NoError(t, err) + return count > 0 +} + +func TestLease(t *testing.T) { + ctx := log.ContextWithNewDefaultLogger(context.Background()) + conn := sqltest.OpenForTesting(ctx, t) + dal, err := New(ctx, conn) + assert.NoError(t, err) + + _, err = dal.AcquireLease(ctx, leases.SystemKey("test"), time.Second*1) + assert.Error(t, err) + + leasei, err := dal.AcquireLease(ctx, leases.SystemKey("test"), time.Second*5) + assert.NoError(t, err) + + lease := leasei.(*Lease) //nolint:forcetypeassert + + // Try to acquire the same lease again, which should fail. 
+ _, err = dal.AcquireLease(ctx, leases.SystemKey("test"), time.Second*5) + assert.IsError(t, err, ErrConflict) + + time.Sleep(time.Second * 6) + + assert.True(t, leaseExists(t, conn, lease.idempotencyKey, lease.key)) + + err = lease.Release() + assert.NoError(t, err) + + assert.False(t, leaseExists(t, conn, lease.idempotencyKey, lease.key)) +} + +func TestExpireLeases(t *testing.T) { + ctx := log.ContextWithNewDefaultLogger(context.Background()) + conn := sqltest.OpenForTesting(ctx, t) + dal, err := New(ctx, conn) + assert.NoError(t, err) + + leasei, err := dal.AcquireLease(ctx, leases.SystemKey("test"), time.Second*5) + assert.NoError(t, err) + + lease := leasei.(*Lease) //nolint:forcetypeassert + + err = dal.ExpireLeases(ctx) + assert.NoError(t, err) + + assert.True(t, leaseExists(t, conn, lease.idempotencyKey, lease.key)) + + // Pretend that the lease expired. + lease.leak = true + err = lease.Release() + assert.NoError(t, err) + + assert.True(t, leaseExists(t, conn, lease.idempotencyKey, lease.key)) + + time.Sleep(time.Second * 6) + + err = dal.ExpireLeases(ctx) + assert.NoError(t, err) + + assert.False(t, leaseExists(t, conn, lease.idempotencyKey, lease.key)) + + leasei, err = dal.AcquireLease(ctx, leases.SystemKey("test"), time.Second*5) + assert.NoError(t, err) + + err = leasei.Release() + assert.NoError(t, err) +} diff --git a/backend/controller/leases/fake_lease.go b/backend/controller/leases/fake_lease.go new file mode 100644 index 0000000000..f25a9b79ec --- /dev/null +++ b/backend/controller/leases/fake_lease.go @@ -0,0 +1,40 @@ +package leases + +import ( + "context" + "time" + + "github.com/puzpuzpuz/xsync/v3" +) + +func NewFakeLeaser() *FakeLeaser { + return &FakeLeaser{ + leases: xsync.NewMapOf[string, struct{}](), + } +} + +var _ Leaser = (*FakeLeaser)(nil) + +// FakeLeaser is a fake implementation of the [Leaser] interface. 
+type FakeLeaser struct { + leases *xsync.MapOf[string, struct{}] +} + +func (f *FakeLeaser) AcquireLease(ctx context.Context, key Key, ttl time.Duration) (Lease, error) { + if _, loaded := f.leases.LoadOrStore(key.String(), struct{}{}); loaded { + return nil, ErrConflict + } + return &FakeLease{leaser: f, key: key}, nil +} + +type FakeLease struct { + leaser *FakeLeaser + key Key +} + +func (f *FakeLease) Release() error { + f.leaser.leases.Delete(f.key.String()) + return nil +} + +func (f *FakeLease) String() string { return f.key.String() } diff --git a/backend/controller/leases/fake_lease_test.go b/backend/controller/leases/fake_lease_test.go new file mode 100644 index 0000000000..d717a93a80 --- /dev/null +++ b/backend/controller/leases/fake_lease_test.go @@ -0,0 +1,27 @@ +package leases + +import ( + "context" + "testing" + "time" + + "github.com/alecthomas/assert/v2" +) + +func TestFakeLease(t *testing.T) { + leaser := NewFakeLeaser() + + lease1, err := leaser.AcquireLease(context.Background(), SystemKey("test"), time.Second) + assert.NoError(t, err) + + _, err = leaser.AcquireLease(context.Background(), SystemKey("test"), time.Second) + assert.IsError(t, err, ErrConflict) + + err = lease1.Release() + assert.NoError(t, err) + + lease2, err := leaser.AcquireLease(context.Background(), SystemKey("test"), time.Second) + assert.NoError(t, err) + err = lease2.Release() + assert.NoError(t, err) +} diff --git a/backend/controller/leases/key.go b/backend/controller/leases/key.go new file mode 100644 index 0000000000..3ce9a4a39d --- /dev/null +++ b/backend/controller/leases/key.go @@ -0,0 +1,73 @@ +package leases + +import ( + "database/sql" + "database/sql/driver" + "fmt" + "net/url" + "strings" +) + +// Key is a unique identifier for a lease. +// +// It is a / separated list of strings where each element is URL-path-escaped. +// +// Userspace leases are always in the form "/module/<module>/..." (eg. +// "/module/idv/user/bob").
Internal leases are always in the form "/system/..." +// (eg. "/system/runner/deployment-reservation/= 5 && singletonCount.Load() < 10, "expected singletonCount to be >= 5 but was %d", singletonCount.Load()) assert.True(t, multiCount.Load() >= 20 && multiCount.Load() < 30, "expected multiCount to be >= 20 but was %d", multiCount.Load()) diff --git a/backend/controller/sql/models.go b/backend/controller/sql/models.go index 5687d9abb1..f77639d98f 100644 --- a/backend/controller/sql/models.go +++ b/backend/controller/sql/models.go @@ -10,9 +10,11 @@ import ( "fmt" "time" + "github.com/TBD54566975/ftl/backend/controller/leases" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/model" "github.com/alecthomas/types/optional" + "github.com/google/uuid" ) type ControllerState string @@ -297,6 +299,14 @@ type IngressRoute struct { Verb string } +type Lease struct { + ID int64 + IdempotencyKey uuid.UUID + Key leases.Key + CreatedAt time.Time + ExpiresAt time.Time +} + type Module struct { ID int64 Language string diff --git a/backend/controller/sql/querier.go b/backend/controller/sql/querier.go index f212d3dae3..5bb81fd023 100644 --- a/backend/controller/sql/querier.go +++ b/backend/controller/sql/querier.go @@ -8,8 +8,10 @@ import ( "context" "time" + "github.com/TBD54566975/ftl/backend/controller/leases" "github.com/TBD54566975/ftl/internal/model" "github.com/alecthomas/types/optional" + "github.com/google/uuid" ) type Querier interface { @@ -22,6 +24,7 @@ type Querier interface { CreateRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error DeregisterRunner(ctx context.Context, key model.RunnerKey) (int64, error) EndCronJob(ctx context.Context, nextExecution time.Time, key model.CronJobKey, startTime time.Time) (EndCronJobRow, error) + ExpireLeases(ctx context.Context) (int64, error) ExpireRunnerReservations(ctx context.Context) (int64, error) GetActiveControllers(ctx context.Context) ([]Controller, error) 
GetActiveDeploymentSchemas(ctx context.Context) ([]GetActiveDeploymentSchemasRow, error) @@ -62,6 +65,9 @@ type Querier interface { // Mark any controller entries that haven't been updated recently as dead. KillStaleControllers(ctx context.Context, timeout time.Duration) (int64, error) KillStaleRunners(ctx context.Context, timeout time.Duration) (int64, error) + NewLease(ctx context.Context, key leases.Key, expiresAt time.Time) (uuid.UUID, error) + ReleaseLease(ctx context.Context, idempotencyKey uuid.UUID, key leases.Key) (bool, error) + RenewLease(ctx context.Context, expiresIn time.Duration, idempotencyKey uuid.UUID, key leases.Key) (bool, error) ReplaceDeployment(ctx context.Context, oldDeployment model.DeploymentKey, newDeployment model.DeploymentKey, minReplicas int32) (int64, error) // Find an idle runner and reserve it for the given deployment. ReserveRunner(ctx context.Context, reservationTimeout time.Time, deploymentKey model.DeploymentKey, labels []byte) (Runner, error) diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql index 7df0afcf08..c91460e771 100644 --- a/backend/controller/sql/queries.sql +++ b/backend/controller/sql/queries.sql @@ -307,8 +307,8 @@ WITH updates AS ( AND (next_execution AT TIME ZONE 'utc') < (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ RETURNING id, key, state, start_time, next_execution) SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, - COALESCE(u.start_time, j.start_time) as start_time, - COALESCE(u.next_execution, j.next_execution) as next_execution, + COALESCE(u.start_time, j.start_time) as start_time, + COALESCE(u.next_execution, j.next_execution) as next_execution, COALESCE(u.state, j.state) as state, d.min_replicas > 0 as has_min_replicas, CASE WHEN u.key IS NULL THEN FALSE ELSE TRUE END as updated @@ -458,3 +458,28 @@ INSERT INTO events (deployment_id, request_id, type, payload) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id; + +-- name: NewLease 
:one +INSERT INTO leases (idempotency_key, key, expires_at) +VALUES (gen_random_uuid(), @key, @expires_at::timestamptz) +RETURNING idempotency_key; + +-- name: RenewLease :one +UPDATE leases +SET expires_at = (NOW() AT TIME ZONE 'utc') + @expires_in::interval +WHERE idempotency_key = @idempotency_key AND key = @key +RETURNING true; + +-- name: ReleaseLease :one +DELETE FROM leases +WHERE idempotency_key = @idempotency_key AND key = @key +RETURNING true; + +-- name: ExpireLeases :one +WITH expired AS ( + DELETE FROM leases + WHERE expires_at < NOW() AT TIME ZONE 'utc' + RETURNING 1 +) +SELECT COUNT(*) +FROM expired; \ No newline at end of file diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index 45285ce029..ea436c582d 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -10,9 +10,11 @@ import ( "encoding/json" "time" + "github.com/TBD54566975/ftl/backend/controller/leases" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/model" "github.com/alecthomas/types/optional" + "github.com/google/uuid" ) const associateArtefactWithDeployment = `-- name: AssociateArtefactWithDeployment :exec @@ -191,6 +193,23 @@ func (q *Queries) EndCronJob(ctx context.Context, nextExecution time.Time, key m return i, err } +const expireLeases = `-- name: ExpireLeases :one +WITH expired AS ( + DELETE FROM leases + WHERE expires_at < NOW() AT TIME ZONE 'utc' + RETURNING 1 +) +SELECT COUNT(*) +FROM expired +` + +func (q *Queries) ExpireLeases(ctx context.Context) (int64, error) { + row := q.db.QueryRow(ctx, expireLeases) + var count int64 + err := row.Scan(&count) + return count, err +} + const expireRunnerReservations = `-- name: ExpireRunnerReservations :one WITH rows AS ( UPDATE runners @@ -1431,6 +1450,46 @@ func (q *Queries) KillStaleRunners(ctx context.Context, timeout time.Duration) ( return count, err } +const newLease = `-- name: NewLease :one +INSERT INTO leases 
(idempotency_key, key, expires_at) +VALUES (gen_random_uuid(), $1, $2::timestamptz) +RETURNING idempotency_key +` + +func (q *Queries) NewLease(ctx context.Context, key leases.Key, expiresAt time.Time) (uuid.UUID, error) { + row := q.db.QueryRow(ctx, newLease, key, expiresAt) + var idempotency_key uuid.UUID + err := row.Scan(&idempotency_key) + return idempotency_key, err +} + +const releaseLease = `-- name: ReleaseLease :one +DELETE FROM leases +WHERE idempotency_key = $1 AND key = $2 +RETURNING true +` + +func (q *Queries) ReleaseLease(ctx context.Context, idempotencyKey uuid.UUID, key leases.Key) (bool, error) { + row := q.db.QueryRow(ctx, releaseLease, idempotencyKey, key) + var column_1 bool + err := row.Scan(&column_1) + return column_1, err +} + +const renewLease = `-- name: RenewLease :one +UPDATE leases +SET expires_at = (NOW() AT TIME ZONE 'utc') + $1::interval +WHERE idempotency_key = $2 AND key = $3 +RETURNING true +` + +func (q *Queries) RenewLease(ctx context.Context, expiresIn time.Duration, idempotencyKey uuid.UUID, key leases.Key) (bool, error) { + row := q.db.QueryRow(ctx, renewLease, expiresIn, idempotencyKey, key) + var column_1 bool + err := row.Scan(&column_1) + return column_1, err +} + const replaceDeployment = `-- name: ReplaceDeployment :one WITH update_container AS ( UPDATE deployments AS d @@ -1511,8 +1570,8 @@ WITH updates AS ( AND (next_execution AT TIME ZONE 'utc') < (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ RETURNING id, key, state, start_time, next_execution) SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, - COALESCE(u.start_time, j.start_time) as start_time, - COALESCE(u.next_execution, j.next_execution) as next_execution, + COALESCE(u.start_time, j.start_time) as start_time, + COALESCE(u.next_execution, j.next_execution) as next_execution, COALESCE(u.state, j.state) as state, d.min_replicas > 0 as has_min_replicas, CASE WHEN u.key IS NULL THEN FALSE ELSE TRUE END as updated diff --git 
a/backend/controller/sql/schema/001_init.sql b/backend/controller/sql/schema/001_init.sql index 82bdef4c51..48787de259 100644 --- a/backend/controller/sql/schema/001_init.sql +++ b/backend/controller/sql/schema/001_init.sql @@ -36,14 +36,14 @@ $$ LANGUAGE plpgsql; CREATE TABLE modules ( id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, - language VARCHAR NOT NULL, - name VARCHAR UNIQUE NOT NULL + language TEXT NOT NULL, + name TEXT UNIQUE NOT NULL ); -- Proto-encoded module schema. CREATE DOMAIN module_schema_pb AS BYTEA; -CREATE DOMAIN deployment_key AS VARCHAR; +CREATE DOMAIN deployment_key AS TEXT; CREATE TABLE deployments ( @@ -88,7 +88,7 @@ CREATE TABLE deployment_artefacts created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'), executable BOOLEAN NOT NULL, -- Path relative to the module root. - path VARCHAR NOT NULL + path TEXT NOT NULL ); CREATE INDEX deployment_artefacts_deployment_id_idx ON deployment_artefacts (deployment_id); @@ -104,7 +104,7 @@ CREATE TYPE runner_state AS ENUM ( 'dead' ); -CREATE DOMAIN runner_key AS VARCHAR; +CREATE DOMAIN runner_key AS TEXT; -- Runners are processes that are available to run modules. CREATE TABLE runners @@ -117,9 +117,9 @@ CREATE TABLE runners -- If the runner is reserved, this is the time at which the reservation expires. reservation_timeout TIMESTAMPTZ, state runner_state NOT NULL DEFAULT 'idle', - endpoint VARCHAR NOT NULL, + endpoint TEXT NOT NULL, -- Some denormalisation for performance. Without this we need to do a two table join. - module_name VARCHAR, + module_name TEXT, deployment_id BIGINT REFERENCES deployments (id) ON DELETE SET NULL, labels JSONB NOT NULL DEFAULT '{}' ); @@ -175,13 +175,13 @@ CREATE INDEX runners_labels_idx ON runners USING GIN (labels); CREATE TABLE ingress_routes ( - method VARCHAR NOT NULL, - path VARCHAR NOT NULL, + method TEXT NOT NULL, + path TEXT NOT NULL, -- The deployment that should handle this route. 
deployment_id BIGINT NOT NULL REFERENCES deployments (id) ON DELETE CASCADE, -- Duplicated here to avoid having to join from this to deployments then modules. - module VARCHAR NOT NULL, - verb VARCHAR NOT NULL + module TEXT NOT NULL, + verb TEXT NOT NULL ); CREATE INDEX ingress_routes_method_path_idx ON ingress_routes (method, path); @@ -193,7 +193,7 @@ CREATE TYPE origin AS ENUM ( 'pubsub' ); -CREATE DOMAIN request_key AS VARCHAR; +CREATE DOMAIN request_key AS TEXT; -- Requests originating from outside modules, either from external sources or from -- events within the system. @@ -207,7 +207,7 @@ CREATE TABLE requests -- cron: cron-- (eg. cron-poll-news-sources-) -- pubsub: pubsub-- (eg. pubsub-articles-) "key" request_key UNIQUE NOT NULL, - source_addr VARCHAR NOT NULL + source_addr TEXT NOT NULL ); CREATE INDEX requests_origin_idx ON requests (origin); @@ -218,7 +218,7 @@ CREATE TYPE controller_state AS ENUM ( 'dead' ); -CREATE DOMAIN controller_key AS VARCHAR; +CREATE DOMAIN controller_key AS TEXT; CREATE TABLE controller ( @@ -227,7 +227,7 @@ CREATE TABLE controller created TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'), last_seen TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'), state controller_state NOT NULL DEFAULT 'live', - endpoint VARCHAR NOT NULL + endpoint TEXT NOT NULL ); CREATE UNIQUE INDEX controller_endpoint_not_dead_idx ON controller (endpoint) WHERE state <> 'dead'; @@ -237,21 +237,21 @@ CREATE TYPE cron_job_state AS ENUM ( 'executing' ); -CREATE DOMAIN cron_job_key AS VARCHAR; +CREATE DOMAIN cron_job_key AS TEXT; CREATE TABLE cron_jobs ( id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, key cron_job_key UNIQUE NOT NULL, deployment_id BIGINT NOT NULL REFERENCES deployments (id) ON DELETE CASCADE, - verb VARCHAR NOT NULL, - schedule VARCHAR NOT NULL, + verb TEXT NOT NULL, + schedule TEXT NOT NULL, start_time TIMESTAMPTZ NOT NULL, next_execution TIMESTAMPTZ NOT NULL, state cron_job_state NOT NULL DEFAULT 'idle', -- 
Some denormalisation for performance. Without this we need to do a two table join. - module_name VARCHAR NOT NULL + module_name TEXT NOT NULL ); CREATE INDEX cron_jobs_executing_start_time_idx ON cron_jobs (start_time) WHERE state = 'executing'; @@ -275,10 +275,10 @@ CREATE TABLE events type event_type NOT NULL, -- Type-specific keys used to index events for searching. - custom_key_1 VARCHAR NULL, - custom_key_2 VARCHAR NULL, - custom_key_3 VARCHAR NULL, - custom_key_4 VARCHAR NULL, + custom_key_1 TEXT NULL, + custom_key_2 TEXT NULL, + custom_key_3 TEXT NULL, + custom_key_4 TEXT NULL, payload JSONB NOT NULL ); @@ -292,7 +292,7 @@ CREATE INDEX events_custom_key_2_idx ON events (custom_key_2); CREATE INDEX events_custom_key_3_idx ON events (custom_key_3); CREATE INDEX events_custom_key_4_idx ON events (custom_key_4); -CREATE DOMAIN topic_key AS VARCHAR; +CREATE DOMAIN topic_key AS TEXT; -- Topics are a way to asynchronously publish data between modules. CREATE TABLE topics ( @@ -304,10 +304,10 @@ CREATE TABLE topics ( module_id BIGINT NOT NULL REFERENCES modules(id), -- Name of the topic. - name VARCHAR NOT NULL, + name TEXT NOT NULL, -- Data reference to the payload data type in the owning module's schema. - type VARCHAR NOT NULL + type TEXT NOT NULL ); CREATE UNIQUE INDEX topics_module_name_idx ON topics(module_id, name); @@ -334,7 +334,7 @@ CREATE TRIGGER topic_events_notify_event FOR EACH ROW EXECUTE PROCEDURE notify_event(); -CREATE DOMAIN subscription_key AS VARCHAR; +CREATE DOMAIN subscription_key AS TEXT; -- A subscription to a topic. -- @@ -347,13 +347,13 @@ CREATE TABLE topic_subscriptions ( topic_id BIGINT NOT NULL REFERENCES topics(id) ON DELETE CASCADE, -- Name of the subscription. - name VARCHAR UNIQUE NOT NULL, + name TEXT UNIQUE NOT NULL, -- Cursor pointing into the topic_events table. 
cursor BIGINT NOT NULL REFERENCES topic_events(id) ON DELETE CASCADE ); -CREATE DOMAIN subscriber_key AS VARCHAR; +CREATE DOMAIN subscriber_key AS TEXT; -- A subscriber to a topic. -- @@ -367,7 +367,17 @@ CREATE TABLE topic_subscribers ( deployment_id BIGINT NOT NULL REFERENCES deployments(id) ON DELETE CASCADE, -- Name of the verb to call on the deployment. - verb VARCHAR NOT NULL + verb TEXT NOT NULL ); --- migrate:down +CREATE TABLE leases ( + id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, + idempotency_key UUID UNIQUE NOT NULL, + key VARCHAR UNIQUE NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'), + expires_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc') +); + +CREATE INDEX leases_expires_at_idx ON leases (expires_at); + +-- migrate:down \ No newline at end of file diff --git a/backend/controller/sql/types.go b/backend/controller/sql/types.go index 9c23751131..49b1c9fdc0 100644 --- a/backend/controller/sql/types.go +++ b/backend/controller/sql/types.go @@ -6,10 +6,14 @@ import ( "time" "github.com/alecthomas/types/optional" + "github.com/google/uuid" + "github.com/TBD54566975/ftl/backend/controller/leases" "github.com/TBD54566975/ftl/internal/model" ) +type NullUUID = optional.Option[uuid.UUID] +type NullLeaseKey = optional.Option[leases.Key] type NullTime = optional.Option[time.Time] type NullDuration = optional.Option[time.Duration] type NullRunnerKey = optional.Option[model.RunnerKey] diff --git a/backend/schema/ref.go b/backend/schema/ref.go index e345c3e360..36e4c9869e 100644 --- a/backend/schema/ref.go +++ b/backend/schema/ref.go @@ -23,6 +23,8 @@ type RefKey struct { Name string } +func (r RefKey) String() string { return makeRef(r.Module, r.Name) } + func (r Ref) ToRefKey() RefKey { return RefKey{Module: r.Module, Name: r.Name} } diff --git a/sqlc.yaml b/sqlc.yaml index 48891026de..acf3788887 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -13,6 +13,12 @@ sql: emit_interface: true 
query_parameter_limit: 3 overrides: + - db_type: "uuid" + go_type: "github.com/google/uuid.UUID" + - db_type: "uuid" + nullable: true + go_type: + type: "NullUUID" - db_type: "timestamptz" go_type: "time.Time" - db_type: "pg_catalog.interval" @@ -81,6 +87,12 @@ sql: go_type: "github.com/TBD54566975/ftl/internal/model.DeploymentKey" - column: "events.payload" go_type: "encoding/json.RawMessage" + - column: "leases.key" + go_type: "github.com/TBD54566975/ftl/backend/controller/leases.Key" + - column: "leases.key" + nullable: true + go_type: + type: "NullLeaseKey" rules: - sqlc/db-prepare # - postgresql-query-too-costly