diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 1dd9dbcce9..faf24eb8ce 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -253,7 +253,7 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool) (*Service svc.routes.Store(map[string][]dal.Route{}) svc.schema.Store(&schema.Schema{}) - cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, conn) + cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryptionSrv, conn) svc.cronJobs = cronSvc pubSub := pubsub.New(ctx, db, svc.tasks, svc) diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index d36884505d..e586c144c9 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -10,9 +10,11 @@ import ( "github.com/TBD54566975/ftl/backend/controller/cronjobs/dal" parentdal "github.com/TBD54566975/ftl/backend/controller/dal" + encryptionsvc "github.com/TBD54566975/ftl/backend/controller/encryption" schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/cron" + "github.com/TBD54566975/ftl/internal/encryption" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" ) @@ -21,18 +23,20 @@ type Service struct { key model.ControllerKey requestSource string dal dal.DAL + encryption *encryptionsvc.Service clock clock.Clock } -func New(ctx context.Context, key model.ControllerKey, requestSource string, conn *sql.DB) *Service { - return NewForTesting(ctx, key, requestSource, *dal.New(conn), clock.New()) +func New(ctx context.Context, key model.ControllerKey, requestSource string, encryption *encryptionsvc.Service, conn *sql.DB) *Service { + return NewForTesting(ctx, key, requestSource, encryption, *dal.New(conn), clock.New()) } -func NewForTesting(ctx context.Context, key model.ControllerKey, requestSource string, dal dal.DAL, clock clock.Clock) *Service { +func NewForTesting(ctx context.Context, key model.ControllerKey, requestSource string, encryption *encryptionsvc.Service, dal dal.DAL, clock clock.Clock) *Service { svc := &Service{ key: key, requestSource: requestSource, dal: dal, + encryption: encryption, clock: clock, } return svc @@ -174,11 +178,16 @@ func (s *Service) scheduleCronJob(ctx context.Context, tx *dal.DAL, job model.Cr logger.Tracef("Scheduling cron job %q async_call execution at %s", job.Key, nextAttemptForJob) origin := &parentdal.AsyncOriginCron{CronJobKey: job.Key} + var request encryption.EncryptedColumn[encryption.AsyncSubKey] + err = s.encryption.Encrypt([]byte(`{}`), &request) + if err != nil { + return fmt.Errorf("failed to encrypt request for job %q: %w", job.Key, err) + } id, err := tx.CreateAsyncCall(ctx, dal.CreateAsyncCallParams{ ScheduledAt: nextAttemptForJob, Verb: schema.RefKey{Module: job.Verb.Module, Name: job.Verb.Name}, Origin: origin.String(), - Request: []byte(`{}`), + Request: request, }) if err != nil { return fmt.Errorf("failed to create async call for job %q: %w", job.Key, err) diff --git a/backend/controller/cronjobs/cronjobs_test.go b/backend/controller/cronjobs/cronjobs_test.go index ea5dce74ac..784ea21e22 100644 --- a/backend/controller/cronjobs/cronjobs_test.go +++ b/backend/controller/cronjobs/cronjobs_test.go @@ -24,7 +24,6 @@ import ( ) func TestNewCronJobsForModule(t *testing.T) { - t.Parallel() ctx := log.ContextWithNewDefaultLogger(context.Background()) ctx, cancel := context.WithCancel(ctx) t.Cleanup(cancel) @@ -52,7 +51,7 @@ func TestNewCronJobsForModule(t *testing.T) { // Progress so that start_time is valid clk.Add(time.Second) - cjs := NewForTesting(ctx, key, "test.com", *dal, clk) + cjs := NewForTesting(ctx, key, "test.com", encryption, *dal, clk) // All jobs need to be scheduled expectUnscheduledJobs(t, dal, clk, 2) unscheduledJobs, err := dal.GetUnscheduledCronJobs(ctx, clk.Now()) @@ -70,8 +69,8 @@ func TestNewCronJobsForModule(t *testing.T) { for _, job := range jobsToCreate { j, err := dal.GetCronJobByKey(ctx, job.Key) assert.NoError(t, err) - assert.Equal(t, job.StartTime, j.StartTime) - assert.Equal(t, j.NextExecution, clk.Now().Add(time.Second)) + assert.Equal(t, j.StartTime, job.StartTime) + assert.Equal(t, clk.Now().Add(time.Second), j.NextExecution) p, err := dal.IsCronJobPending(ctx, job.Key, job.StartTime) assert.NoError(t, err) @@ -82,10 +81,10 @@ func TestNewCronJobsForModule(t *testing.T) { for i, job := range jobsToCreate { call, _, err := parentDAL.AcquireAsyncCall(ctx) assert.NoError(t, err) - assert.Equal(t, call.Verb, job.Verb.ToRefKey()) - assert.Equal(t, call.Origin.String(), fmt.Sprintf("cron:%s", job.Key)) - assert.Equal(t, call.Request, []byte("{}")) - assert.Equal(t, call.QueueDepth, int64(len(jobsToCreate)-i)) // widdling down queue + assert.Equal(t, job.Verb.ToRefKey(), call.Verb) + assert.Equal(t, fmt.Sprintf("cron:%s", job.Key), call.Origin.String()) + assert.Equal(t, []byte("{}"), call.Request) + assert.Equal(t, int64(len(jobsToCreate)-i), call.QueueDepth) // widdling down queue p, err := dal.IsCronJobPending(ctx, job.Key, job.StartTime) assert.NoError(t, err) @@ -116,10 +115,10 @@ func TestNewCronJobsForModule(t *testing.T) { for i, job := range jobsToCreate { call, _, err := parentDAL.AcquireAsyncCall(ctx) assert.NoError(t, err) - assert.Equal(t, call.Verb, job.Verb.ToRefKey()) - assert.Equal(t, call.Origin.String(), fmt.Sprintf("cron:%s", job.Key)) - assert.Equal(t, call.Request, []byte("{}")) - assert.Equal(t, call.QueueDepth, int64(len(jobsToCreate)-i)) // widdling down queue + assert.Equal(t, job.Verb.ToRefKey(), call.Verb) + assert.Equal(t, fmt.Sprintf("cron:%s", job.Key), call.Origin.String()) + assert.Equal(t, []byte("{}"), call.Request) + assert.Equal(t, int64(len(jobsToCreate)-i), call.QueueDepth) // widdling down queue assert.Equal(t, call.ScheduledAt, clk.Now()) diff --git a/backend/controller/cronjobs/dal/internal/sql/models.go b/backend/controller/cronjobs/dal/internal/sql/models.go index f6e6518d41..aca898349d 100644 --- a/backend/controller/cronjobs/dal/internal/sql/models.go +++ b/backend/controller/cronjobs/dal/internal/sql/models.go @@ -414,7 +414,7 @@ type FsmNextEvent struct { CreatedAt time.Time FsmInstanceID int64 NextState schema.RefKey - Request []byte + Request encryption.EncryptedAsyncColumn RequestType sqltypes.Type } @@ -505,7 +505,7 @@ type TopicEvent struct { CreatedAt time.Time Key model.TopicEventKey TopicID int64 - Payload []byte + Payload encryption.EncryptedAsyncColumn Caller optional.Option[string] RequestKey optional.Option[string] TraceContext pqtype.NullRawMessage diff --git a/backend/controller/dal/fsm.go b/backend/controller/dal/fsm.go index 670fe409a8..1007674556 100644 --- a/backend/controller/dal/fsm.go +++ b/backend/controller/dal/fsm.go @@ -34,7 +34,7 @@ import ( func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, instanceKey string, destinationState schema.RefKey, request []byte, encrypted bool, retryParams schema.RetryParams) (err error) { var encryptedRequest encryption.EncryptedAsyncColumn if encrypted { - encryptedRequest = encryption.EncryptedAsyncColumn(request) + encryptedRequest.Set(request) } else { err = d.encryption.Encrypt(request, &encryptedRequest) if err != nil { @@ -139,9 +139,16 @@ func (d *DAL) PopNextFSMEvent(ctx context.Context, fsm schema.RefKey, instanceKe } return optional.None[NextFSMEvent](), err } + + var decryptedRequest json.RawMessage + err = d.encryption.DecryptJSON(&next.Request, &decryptedRequest) + if err != nil { + return optional.None[NextFSMEvent](), fmt.Errorf("failed to decrypt FSM request: %w", err) + } + return optional.Some(NextFSMEvent{ DestinationState: next.NextState, - Request: next.Request, + Request: decryptedRequest, RequestType: next.RequestType, }), nil } diff --git a/backend/controller/dal/internal/sql/models.go b/backend/controller/dal/internal/sql/models.go index f6e6518d41..aca898349d 100644 --- a/backend/controller/dal/internal/sql/models.go +++ b/backend/controller/dal/internal/sql/models.go @@ -414,7 +414,7 @@ type FsmNextEvent struct { CreatedAt time.Time FsmInstanceID int64 NextState schema.RefKey - Request []byte + Request encryption.EncryptedAsyncColumn RequestType sqltypes.Type } @@ -505,7 +505,7 @@ type TopicEvent struct { CreatedAt time.Time Key model.TopicEventKey TopicID int64 - Payload []byte + Payload encryption.EncryptedAsyncColumn Caller optional.Option[string] RequestKey optional.Option[string] TraceContext pqtype.NullRawMessage diff --git a/backend/controller/dal/internal/sql/queries.sql.go b/backend/controller/dal/internal/sql/queries.sql.go index b6c3f1f1ff..9843b87651 100644 --- a/backend/controller/dal/internal/sql/queries.sql.go +++ b/backend/controller/dal/internal/sql/queries.sql.go @@ -1159,7 +1159,7 @@ LIMIT 1 type GetNextEventForSubscriptionRow struct { Event optional.Option[model.TopicEventKey] - Payload []byte + Payload encryption.OptionalEncryptedAsyncColumn CreatedAt optional.Option[time.Time] Caller optional.Option[string] RequestKey optional.Option[string] @@ -2127,7 +2127,7 @@ type PublishEventForTopicParams struct { Module string Topic string Caller string - Payload []byte + Payload encryption.EncryptedAsyncColumn RequestKey string TraceContext json.RawMessage } @@ -2172,7 +2172,7 @@ type SetNextFSMEventParams struct { Fsm schema.RefKey InstanceKey string Event schema.RefKey - Request []byte + Request encryption.EncryptedAsyncColumn RequestType sqltypes.Type } diff --git a/backend/controller/dal/pubsub.go b/backend/controller/dal/pubsub.go index 447735c42b..d117605d16 100644 --- a/backend/controller/dal/pubsub.go +++ b/backend/controller/dal/pubsub.go @@ -98,6 +98,11 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t observability.PubSub.PropagationFailed(ctx, "GetNextEventForSubscription", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.None[schema.RefKey]()) return 0, fmt.Errorf("failed to get next cursor: %w", libdal.TranslatePGError(err)) } + payload, ok := nextCursor.Payload.Get() + if !ok { + observability.PubSub.PropagationFailed(ctx, "GetNextEventForSubscription-->Payload.Get", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.None[schema.RefKey]()) + return 0, fmt.Errorf("could not find payload to progress subscription: %w", libdal.TranslatePGError(err)) + } nextCursorKey, ok := nextCursor.Event.Get() if !ok { observability.PubSub.PropagationFailed(ctx, "GetNextEventForSubscription-->Event.Get", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.None[schema.RefKey]()) @@ -131,7 +136,7 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t ScheduledAt: time.Now(), Verb: subscriber.Sink, Origin: origin.String(), - Request: nextCursor.Payload, // already encrypted + Request: payload, // already encrypted RemainingAttempts: subscriber.RetryAttempts, Backoff: subscriber.Backoff, MaxBackoff: subscriber.MaxBackoff, diff --git a/backend/controller/encryption/dal/dal.go b/backend/controller/encryption/dal/dal.go index 7574b6e048..02fc564c1a 100644 --- a/backend/controller/encryption/dal/dal.go +++ b/backend/controller/encryption/dal/dal.go @@ -83,18 +83,18 @@ func (d *DAL) VerifyEncryptor(ctx context.Context, encryptor encryption.DataEncr if err != nil { return fmt.Errorf("failed to verify timeline subkey: %w", err) } - if newTimeline != nil { + if newTimeline.Ok() { needsUpdate = true - row.VerifyTimeline = optional.Some(newTimeline) + row.VerifyTimeline = newTimeline } newAsync, err := verifySubkey(encryptor, row.VerifyAsync) if err != nil { return fmt.Errorf("failed to verify async subkey: %w", err) } - if newAsync != nil { + if newAsync.Ok() { needsUpdate = true - row.VerifyAsync = optional.Some(newAsync) + row.VerifyAsync = newAsync } if !needsUpdate { @@ -115,25 +115,30 @@ func (d *DAL) VerifyEncryptor(ctx context.Context, encryptor encryption.DataEncr // verifySubkey checks if the subkey is set and if not, sets it to a verification string. // returns (nil, nil) if verified and not changed -func verifySubkey[SK encryption.SubKey](encryptor encryption.DataEncryptor, encrypted optional.Option[encryption.EncryptedColumn[SK]]) (encryption.EncryptedColumn[SK], error) { +func verifySubkey[SK encryption.SubKey]( + encryptor encryption.DataEncryptor, + encrypted optional.Option[encryption.EncryptedColumn[SK]], +) (optional.Option[encryption.EncryptedColumn[SK]], error) { + type EC = encryption.EncryptedColumn[SK] + verifyField, ok := encrypted.Get() if !ok { err := encryptor.Encrypt([]byte(verification), &verifyField) if err != nil { - return nil, fmt.Errorf("failed to encrypt verification sanity string: %w", err) + return optional.None[EC](), fmt.Errorf("failed to encrypt verification sanity string: %w", err) } - return verifyField, nil + return optional.Some(verifyField), nil } decrypted, err := encryptor.Decrypt(&verifyField) if err != nil { - return nil, fmt.Errorf("failed to decrypt verification sanity string: %w", err) + return optional.None[EC](), fmt.Errorf("failed to decrypt verification sanity string: %w", err) } if string(decrypted) != verification { - return nil, fmt.Errorf("decrypted verification string does not match expected value") + return optional.None[EC](), fmt.Errorf("decrypted verification string does not match expected value") } // verified, no need to update - return nil, nil + return optional.None[EC](), nil } diff --git a/backend/controller/encryption/dal/internal/sql/models.go b/backend/controller/encryption/dal/internal/sql/models.go index f6e6518d41..aca898349d 100644 --- a/backend/controller/encryption/dal/internal/sql/models.go +++ b/backend/controller/encryption/dal/internal/sql/models.go @@ -414,7 +414,7 @@ type FsmNextEvent struct { CreatedAt time.Time FsmInstanceID int64 NextState schema.RefKey - Request []byte + Request encryption.EncryptedAsyncColumn RequestType sqltypes.Type } @@ -505,7 +505,7 @@ type TopicEvent struct { CreatedAt time.Time Key model.TopicEventKey TopicID int64 - Payload []byte + Payload encryption.EncryptedAsyncColumn Caller optional.Option[string] RequestKey optional.Option[string] TraceContext pqtype.NullRawMessage diff --git a/backend/controller/leases/dal/internal/sql/models.go b/backend/controller/leases/dal/internal/sql/models.go index f6e6518d41..aca898349d 100644 --- a/backend/controller/leases/dal/internal/sql/models.go +++ b/backend/controller/leases/dal/internal/sql/models.go @@ -414,7 +414,7 @@ type FsmNextEvent struct { CreatedAt time.Time FsmInstanceID int64 NextState schema.RefKey - Request []byte + Request encryption.EncryptedAsyncColumn RequestType sqltypes.Type } @@ -505,7 +505,7 @@ type TopicEvent struct { CreatedAt time.Time Key model.TopicEventKey TopicID int64 - Payload []byte + Payload encryption.EncryptedAsyncColumn Caller optional.Option[string] RequestKey optional.Option[string] TraceContext pqtype.NullRawMessage diff --git a/backend/controller/sql/schema/20240913035022_encrypted_fsm_next_request.sql b/backend/controller/sql/schema/20240913035022_encrypted_fsm_next_request.sql new file mode 100644 index 0000000000..903b4c032b --- /dev/null +++ b/backend/controller/sql/schema/20240913035022_encrypted_fsm_next_request.sql @@ -0,0 +1,6 @@ +-- migrate:up + +ALTER TABLE fsm_next_event + ALTER COLUMN request TYPE encrypted_async; + +-- migrate:down diff --git a/backend/controller/sql/schema/20240913041619_encrypted_topic_events_payload.sql b/backend/controller/sql/schema/20240913041619_encrypted_topic_events_payload.sql new file mode 100644 index 0000000000..0bc9c6afdf --- /dev/null +++ b/backend/controller/sql/schema/20240913041619_encrypted_topic_events_payload.sql @@ -0,0 +1,6 @@ +-- migrate:up + +ALTER TABLE topic_events + ALTER COLUMN payload TYPE encrypted_async; + +-- migrate:down diff --git a/internal/configuration/dal/internal/sql/models.go b/internal/configuration/dal/internal/sql/models.go index f6e6518d41..aca898349d 100644 --- a/internal/configuration/dal/internal/sql/models.go +++ b/internal/configuration/dal/internal/sql/models.go @@ -414,7 +414,7 @@ type FsmNextEvent struct { CreatedAt time.Time FsmInstanceID int64 NextState schema.RefKey - Request []byte + Request encryption.EncryptedAsyncColumn RequestType sqltypes.Type } @@ -505,7 +505,7 @@ type TopicEvent struct { CreatedAt time.Time Key model.TopicEventKey TopicID int64 - Payload []byte + Payload encryption.EncryptedAsyncColumn Caller optional.Option[string] RequestKey optional.Option[string] TraceContext pqtype.NullRawMessage diff --git a/internal/encryption/database.go b/internal/encryption/database.go index f51e9c38c4..3d19171d69 100644 --- a/internal/encryption/database.go +++ b/internal/encryption/database.go @@ -13,28 +13,25 @@ var _ Encrypted = &EncryptedColumn[TimelineSubKey]{} // EncryptedColumn is a type that represents an encrypted column. // // It can be used by sqlc to map to/from a bytea column in the database. -type EncryptedColumn[SK SubKey] []byte +type EncryptedColumn[SK SubKey] struct{ data []byte } var _ driver.Valuer = &EncryptedColumn[TimelineSubKey]{} var _ sql.Scanner = &EncryptedColumn[TimelineSubKey]{} -func (e *EncryptedColumn[SK]) SubKey() string { var sk SK; return sk.SubKey() } -func (e *EncryptedColumn[SK]) Bytes() []byte { return *e } -func (e *EncryptedColumn[SK]) Set(b []byte) { *e = b } -func (e *EncryptedColumn[SK]) Value() (driver.Value, error) { - return []byte(*e), nil +func (e *EncryptedColumn[SK]) SubKey() string { var sk SK; return sk.SubKey() } +func (e *EncryptedColumn[SK]) Bytes() []byte { return e.data } +func (e *EncryptedColumn[SK]) Set(b []byte) { e.data = b } +func (e EncryptedColumn[SK]) Value() (driver.Value, error) { return e.data, nil } +func (e *EncryptedColumn[SK]) GoString() string { + return fmt.Sprintf("EncryptedColumn[%s](%d bytes)", e.SubKey(), len(e.data)) } func (e *EncryptedColumn[SK]) Scan(src interface{}) error { - if src == nil { - *e = nil - return nil - } b, ok := src.([]byte) if !ok { return fmt.Errorf("expected []byte, got %T", src) } - *e = b + e.data = b return nil } diff --git a/internal/encryption/encryption.go b/internal/encryption/encryption.go index aafafc387b..93dff3060d 100644 --- a/internal/encryption/encryption.go +++ b/internal/encryption/encryption.go @@ -232,12 +232,12 @@ func (k *KMSEncryptor) getDerivedPrimitive(subKey SubKey) (tink.AEAD, error) { func (k *KMSEncryptor) Encrypt(cleartext []byte, dest Encrypted) error { primitive, err := k.getDerivedPrimitive(dest) if err != nil { - return fmt.Errorf("failed to get derived primitive: %w", err) + return fmt.Errorf("%s: failed to get derived primitive: %w", dest.SubKey(), err) } encrypted, err := primitive.Encrypt(cleartext, nil) if err != nil { - return fmt.Errorf("failed to encrypt: %w", err) + return fmt.Errorf("%s: failed to encrypt: %w", dest.SubKey(), err) } dest.Set(encrypted) @@ -247,12 +247,12 @@ func (k *KMSEncryptor) Encrypt(cleartext []byte, dest Encrypted) error { func (k *KMSEncryptor) Decrypt(encrypted Encrypted) ([]byte, error) { primitive, err := k.getDerivedPrimitive(encrypted) if err != nil { - return nil, fmt.Errorf("failed to get derived primitive: %w", err) + return nil, fmt.Errorf("%s: failed to get derived primitive: %w", encrypted.SubKey(), err) } decrypted, err := primitive.Decrypt(encrypted.Bytes(), nil) if err != nil { - return nil, fmt.Errorf("failed to decrypt: %w", err) + return nil, fmt.Errorf("%s: failed to decrypt: %w", encrypted.SubKey(), err) } return decrypted, nil