diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 1dd9dbcce9..e33fc04d68 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -232,12 +232,12 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool) (*Service config.ControllerTimeout = time.Second * 5 } - encryptionSrv, err := encryption.New(ctx, conn, ftlencryption.NewBuilder().WithKMSURI(optional.Ptr(config.KMSURI))) + encryption, err := encryption.New(ctx, conn, ftlencryption.NewBuilder().WithKMSURI(optional.Ptr(config.KMSURI))) if err != nil { return nil, fmt.Errorf("failed to create encryption dal: %w", err) } - db := dal.New(ctx, conn, encryptionSrv) + db := dal.New(ctx, conn, encryption) ldb := leasesdal.New(conn) svc := &Service{ tasks: scheduledtask.New(ctx, key, ldb), @@ -253,7 +253,7 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool) (*Service svc.routes.Store(map[string][]dal.Route{}) svc.schema.Store(&schema.Schema{}) - cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, conn) + cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, conn, encryption) svc.cronJobs = cronSvc pubSub := pubsub.New(ctx, db, svc.tasks, svc) diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index d36884505d..1910a9783a 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -10,9 +10,11 @@ import ( "github.com/TBD54566975/ftl/backend/controller/cronjobs/dal" parentdal "github.com/TBD54566975/ftl/backend/controller/dal" + "github.com/TBD54566975/ftl/backend/controller/encryption" schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/cron" + ftlencryption "github.com/TBD54566975/ftl/internal/encryption" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" ) @@ -21,19 +23,21 @@ type Service struct { key model.ControllerKey requestSource string dal dal.DAL + encryption *encryption.Service clock clock.Clock } -func New(ctx context.Context, key model.ControllerKey, requestSource string, conn *sql.DB) *Service { - return NewForTesting(ctx, key, requestSource, *dal.New(conn), clock.New()) +func New(ctx context.Context, key model.ControllerKey, requestSource string, conn *sql.DB, encryption *encryption.Service) *Service { + return NewForTesting(ctx, key, requestSource, *dal.New(conn, encryption), encryption, clock.New()) } -func NewForTesting(ctx context.Context, key model.ControllerKey, requestSource string, dal dal.DAL, clock clock.Clock) *Service { +func NewForTesting(ctx context.Context, key model.ControllerKey, requestSource string, dal dal.DAL, encryption *encryption.Service, clock clock.Clock) *Service { svc := &Service{ key: key, requestSource: requestSource, dal: dal, clock: clock, + encryption: encryption, } return svc } @@ -172,13 +176,19 @@ func (s *Service) scheduleCronJob(ctx context.Context, tx *dal.DAL, job model.Cr nextAttemptForJob = now } + var encryptedResult ftlencryption.EncryptedAsyncColumn + type Empty struct{} + err = s.encryption.EncryptJSON(Empty{}, &encryptedResult) + fmt.Println("TOOOOOOOOOOOODOOOOOOOO") + fmt.Printf("encryptedResult: %v", encryptedResult) + logger.Tracef("Scheduling cron job %q async_call execution at %s", job.Key, nextAttemptForJob) origin := &parentdal.AsyncOriginCron{CronJobKey: job.Key} id, err := tx.CreateAsyncCall(ctx, dal.CreateAsyncCallParams{ ScheduledAt: nextAttemptForJob, Verb: schema.RefKey{Module: job.Verb.Module, Name: job.Verb.Name}, Origin: origin.String(), - Request: []byte(`{}`), + Request: encryptedResult, }) if err != nil { return fmt.Errorf("failed to create async call for job %q: %w", job.Key, err) diff --git a/backend/controller/cronjobs/cronjobs_test.go b/backend/controller/cronjobs/cronjobs_test.go index ea5dce74ac..f9b7efb94e 100644 --- a/backend/controller/cronjobs/cronjobs_test.go +++ b/backend/controller/cronjobs/cronjobs_test.go @@ -9,6 +9,7 @@ import ( "github.com/alecthomas/assert/v2" "github.com/alecthomas/types/either" + "github.com/alecthomas/types/optional" "github.com/benbjohnson/clock" "github.com/TBD54566975/ftl/backend/controller/cronjobs/dal" @@ -34,10 +35,11 @@ func TestNewCronJobsForModule(t *testing.T) { key := model.NewControllerKey("localhost", strconv.Itoa(8080+1)) conn := sqltest.OpenForTesting(ctx, t) - dal := dal.New(conn) - encryption, err := encryption.New(ctx, conn, ftlencryption.NewBuilder()) + uri := "fake-kms://CK6YwYkBElQKSAowdHlwZS5nb29nbGVhcGlzLmNvbS9nb29nbGUuY3J5cHRvLnRpbmsuQWVzR2NtS2V5EhIaEJy4TIQgfCuwxA3ZZgChp_wYARABGK6YwYkBIAE" + encryption, err := encryption.New(ctx, conn, ftlencryption.NewBuilder().WithKMSURI(optional.Some(uri))) assert.NoError(t, err) + dal := dal.New(conn, encryption) parentDAL := parentdal.New(ctx, conn, encryption) moduleName := "initial" @@ -52,7 +54,7 @@ func TestNewCronJobsForModule(t *testing.T) { // Progress so that start_time is valid clk.Add(time.Second) - cjs := NewForTesting(ctx, key, "test.com", *dal, clk) + cjs := NewForTesting(ctx, key, "test.com", *dal, encryption, clk) // All jobs need to be scheduled expectUnscheduledJobs(t, dal, clk, 2) unscheduledJobs, err := dal.GetUnscheduledCronJobs(ctx, clk.Now()) @@ -84,6 +86,9 @@ func TestNewCronJobsForModule(t *testing.T) { assert.NoError(t, err) assert.Equal(t, call.Verb, job.Verb.ToRefKey()) assert.Equal(t, call.Origin.String(), fmt.Sprintf("cron:%s", job.Key)) + + // err = encryption.DecryptJSON(&call.Request, &schema.CronJobRequest{}) + // assert.NoError(t, err) assert.Equal(t, call.Request, []byte("{}")) assert.Equal(t, call.QueueDepth, int64(len(jobsToCreate)-i)) // widdling down queue diff --git a/backend/controller/cronjobs/dal/dal.go b/backend/controller/cronjobs/dal/dal.go index ef0fa5bc08..fb7bd254dd 100644 --- a/backend/controller/cronjobs/dal/dal.go +++ b/backend/controller/cronjobs/dal/dal.go @@ -8,6 +8,7 @@ import ( "github.com/alecthomas/types/optional" "github.com/TBD54566975/ftl/backend/controller/cronjobs/dal/internal/sql" + "github.com/TBD54566975/ftl/backend/controller/encryption" "github.com/TBD54566975/ftl/backend/controller/observability" "github.com/TBD54566975/ftl/backend/libdal" "github.com/TBD54566975/ftl/backend/schema" @@ -17,16 +18,25 @@ import ( type DAL struct { *libdal.Handle[DAL] - db sql.Querier + db sql.Querier + encryption *encryption.Service } -func New(conn libdal.Connection) *DAL { - return &DAL{ +func New(conn libdal.Connection, encryption *encryption.Service) *DAL { + var d *DAL + d = &DAL{ db: sql.New(conn), Handle: libdal.New(conn, func(h *libdal.Handle[DAL]) *DAL { - return &DAL{Handle: h, db: sql.New(h.Connection)} + return &DAL{ + Handle: h, + db: sql.New(h.Connection), + encryption: d.encryption, + } }), + encryption: encryption, } + + return d } func cronJobFromRow(c sql.CronJob, d sql.Deployment) model.CronJob { diff --git a/backend/controller/dal/async_calls.go b/backend/controller/dal/async_calls.go index 7ba0f4dbe0..9d1d92c4c0 100644 --- a/backend/controller/dal/async_calls.go +++ b/backend/controller/dal/async_calls.go @@ -139,6 +139,8 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, leaseCtx c return nil, ctx, fmt.Errorf("failed to parse origin key %q: %w", row.Origin, err) } + fmt.Println("row.Request", row.Request) + decryptedRequest, err := d.encryption.Decrypt(&row.Request) if err != nil { return nil, ctx, fmt.Errorf("failed to decrypt async call request: %w", err) diff --git a/internal/encryption/encryption.go b/internal/encryption/encryption.go index aafafc387b..1c95da74ec 100644 --- a/internal/encryption/encryption.go +++ b/internal/encryption/encryption.go @@ -88,6 +88,7 @@ func NewNoOpEncryptor() NoOpEncryptor { var _ DataEncryptor = NoOpEncryptor{} func (n NoOpEncryptor) Encrypt(cleartext []byte, dest Encrypted) error { + fmt.Println("no op encryptor") dest.Set(cleartext) return nil }