Skip to content

Commit

Permalink
encrypt fsm next event payload
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Aug 13, 2024
1 parent 83147c6 commit 7b68b92
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 39 deletions.
9 changes: 4 additions & 5 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,14 +888,14 @@ func (s *Service) SendFSMEvent(ctx context.Context, req *connect.Request[ftlv1.S
}
defer instance.Release() //nolint:errcheck

err = s.sendFSMEventInTx(ctx, tx, instance, fsm, eventType, msg.Body)
err = s.sendFSMEventInTx(ctx, tx, instance, fsm, eventType, msg.Body, false)
if err != nil {
return nil, err
}
return connect.NewResponse(&ftlv1.SendFSMEventResponse{}), nil
}

func (s *Service) sendFSMEventInTx(ctx context.Context, tx *dal.Tx, instance *dal.FSMInstance, fsm *schema.FSM, eventType schema.Type, body []byte) error {
func (s *Service) sendFSMEventInTx(ctx context.Context, tx *dal.Tx, instance *dal.FSMInstance, fsm *schema.FSM, eventType schema.Type, body []byte, encrypted bool) error {
// Populated if we find a matching transition.
var destinationRef *schema.Ref
var destinationVerb *schema.Verb
Expand Down Expand Up @@ -956,7 +956,7 @@ func (s *Service) sendFSMEventInTx(ctx context.Context, tx *dal.Tx, instance *da
return connect.NewError(connect.CodeInternal, err)
}

err = tx.StartFSMTransition(ctx, instance.FSM, instance.Key, destinationRef.ToRefKey(), body, retryParams)
err = tx.StartFSMTransition(ctx, instance.FSM, instance.Key, destinationRef.ToRefKey(), body, encrypted, retryParams)
if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not start fsm transition: %w", err))
}
Expand Down Expand Up @@ -1664,9 +1664,8 @@ func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.Tx, orig

// If there's a next event enqueued, we immediately start it.
if next, ok := next.Get(); ok {
return s.sendFSMEventInTx(ctx, tx, instance, fsm, next.RequestType, next.Request)
return s.sendFSMEventInTx(ctx, tx, instance, fsm, next.RequestType, next.Request, true)
}

return nil
}

Expand Down
21 changes: 15 additions & 6 deletions backend/controller/dal/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,15 @@ import (
// future execution.
//
// Note: no validation of the FSM is performed.
func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, instanceKey string, destinationState schema.RefKey, request json.RawMessage, retryParams schema.RetryParams) (err error) {
encryptedRequest, err := d.encryptors.Async.EncryptJSON(request)
if err != nil {
return fmt.Errorf("failed to encrypt FSM request: %w", err)
func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, instanceKey string, destinationState schema.RefKey, request []byte, encrypted bool, retryParams schema.RetryParams) (err error) {
var encryptedRequest []byte
if encrypted {
encryptedRequest = request
} else {
encryptedRequest, err = d.encryptors.Async.EncryptJSON(json.RawMessage(request))
if err != nil {
return fmt.Errorf("failed to encrypt FSM request: %w", err)
}
}

// Create an async call for the event.
Expand Down Expand Up @@ -136,11 +141,15 @@ func (d *DAL) PopNextFSMEvent(ctx context.Context, fsm schema.RefKey, instanceKe
}

func (d *DAL) SetNextFSMEvent(ctx context.Context, fsm schema.RefKey, instanceKey string, nextState schema.RefKey, request json.RawMessage, requestType schema.Type) error {
_, err := d.db.SetNextFSMEvent(ctx, sql.SetNextFSMEventParams{
encryptedRequest, err := d.encryptors.Async.EncryptJSON(request)
if err != nil {
return fmt.Errorf("failed to encrypt FSM request: %w", err)
}
_, err = d.db.SetNextFSMEvent(ctx, sql.SetNextFSMEventParams{
Fsm: fsm,
InstanceKey: instanceKey,
Event: nextState,
Request: request,
Request: encryptedRequest,
RequestType: sql.Type{
Type: requestType,
},
Expand Down
4 changes: 2 additions & 2 deletions backend/controller/dal/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ func TestSendFSMEvent(t *testing.T) {
assert.IsError(t, err, dalerrs.ErrNotFound)

ref := schema.RefKey{Module: "module", Name: "verb"}
err = dal.StartFSMTransition(ctx, schema.RefKey{Module: "test", Name: "test"}, "invoiceID", ref, []byte(`{}`), schema.RetryParams{})
err = dal.StartFSMTransition(ctx, schema.RefKey{Module: "test", Name: "test"}, "invoiceID", ref, []byte(`{}`), false, schema.RetryParams{})
assert.NoError(t, err)

err = dal.StartFSMTransition(ctx, schema.RefKey{Module: "test", Name: "test"}, "invoiceID", ref, []byte(`{}`), schema.RetryParams{})
err = dal.StartFSMTransition(ctx, schema.RefKey{Module: "test", Name: "test"}, "invoiceID", ref, []byte(`{}`), false, schema.RetryParams{})
assert.IsError(t, err, dalerrs.ErrConflict)
assert.EqualError(t, err, "transition already executing: conflict")

Expand Down
19 changes: 9 additions & 10 deletions backend/controller/dal/testdata/go/fsmnext/fsmnext.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@ import (
"github.com/TBD54566975/ftl/go-runtime/ftl"
)

// This FSM allows transitions moving forward through the alphabet
// Each transition also declares the next state(s) to transition to using State
//
//ftl:retry 2 1s
var fsm *ftl.FSMHandle

func init() {
fsm = ftl.FSM("fsm",
func fsm() *ftl.FSMHandle {
// This FSM allows transitions moving forward through the alphabet
// Each transition also declares the next state(s) to transition to using State
//
//ftl:retry 2 1s
var fsm = ftl.FSM("fsm",
ftl.Start(StateA),
ftl.Transition(StateA, StateB),
ftl.Transition(StateA, StateC),
Expand All @@ -23,6 +21,7 @@ func init() {
ftl.Transition(StateB, StateD),
ftl.Transition(StateC, StateD),
)
return fsm
}

type State string
Expand Down Expand Up @@ -81,7 +80,7 @@ type Request struct {

//ftl:verb export
func SendOne(ctx context.Context, in Request) error {
return fsm.Send(ctx, in.Event.Instance, eventFor(in.Event, in.State))
return fsm().Send(ctx, in.Event.Instance, eventFor(in.Event, in.State))
}

func handleEvent(ctx context.Context, in Event) error {
Expand All @@ -96,7 +95,7 @@ func handleEvent(ctx context.Context, in Event) error {
attempts := in.NextAttempts.Default(1)
for i := range attempts {
ftl.LoggerFromContext(ctx).Infof("scheduling next event for %s (%d/%d)", in.Instance, i+1, attempts)
if err := fsm.Next(ctx, in.Instance, event); err != nil {
if err := fsm().Next(ctx, in.Instance, event); err != nil {
return err
}
}
Expand Down
25 changes: 19 additions & 6 deletions internal/encryption/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestEncryptionForLogs(t *testing.T) {
)
}

func TestEncryptionForubSub(t *testing.T) {
func TestEncryptionForPubSub(t *testing.T) {
in.RunWithEncryption(t, "",
in.CopyModule("encryption"),
in.Deploy("encryption"),
Expand All @@ -86,13 +86,26 @@ func TestEncryptionForFSM(t *testing.T) {
in.RunWithEncryption(t, "",
in.CopyModule("encryption"),
in.Deploy("encryption"),
// "Rosebud" goes from created -> paid to test normal encryption
in.Call[map[string]interface{}, any]("encryption", "beginFsm", map[string]interface{}{"name": "Rosebud"}, nil),
in.Sleep(3*time.Second),
in.Call[map[string]interface{}, any]("encryption", "transitionFsm", map[string]interface{}{"name": "Rosebud"}, nil),
in.Sleep(3*time.Second),

in.Sleep(2*time.Second),
in.Call[map[string]interface{}, any]("encryption", "transitionToPaid", map[string]interface{}{"name": "Rosebud"}, nil),
in.Sleep(2*time.Second),
validateAsyncCall("created", "Rosebud"),
validateAsyncCall("paid", "Rosebud"),

// "Next" goes from created -> nextAndSleep to test fsm next event encryption
in.Call[map[string]interface{}, any]("encryption", "beginFsm", map[string]interface{}{"name": "Next"}, nil),
in.Sleep(2*time.Second),
in.Call[map[string]interface{}, any]("encryption", "transitionToNextAndSleep", map[string]interface{}{"name": "Next"}, nil),
func(t testing.TB, ic in.TestContext) {
in.QueryRow("ftl", "SELECT COUNT(*) FROM fsm_next_event LIMIT 1", int64(1))(t, ic)
values := in.GetRow(t, ic, "ftl", "SELECT request FROM fsm_next_event LIMIT 1", 1)
request, ok := values[0].([]byte)
assert.True(t, ok, "could not convert payload to bytes")
assert.Contains(t, string(request), "encrypted", "raw request string should not be stored in the table")
assert.NotContains(t, string(request), "Next", "raw request string should not be stored in the table")
},
)
}

Expand All @@ -102,7 +115,7 @@ func validateAsyncCall(verb string, sensitive string) in.Action {

values := in.GetRow(t, ic, "ftl", fmt.Sprintf("SELECT request FROM async_calls WHERE verb = 'encryption.%s' AND state = 'success'", verb), 1)
request, ok := values[0].([]byte)
assert.True(t, ok, "could not convert payload to string")
assert.True(t, ok, "could not convert payload to bytes")
assert.Contains(t, string(request), "encrypted", "raw request string should not be stored in the table")
assert.NotContains(t, string(request), sensitive, "raw request string should not be stored in the table")
}
Expand Down
46 changes: 36 additions & 10 deletions internal/encryption/testdata/go/encryption/encryption.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package encryption
import (
"context"
"fmt"
"time"

"github.com/TBD54566975/ftl/go-runtime/ftl" // Import the FTL SDK.
)
Expand Down Expand Up @@ -55,13 +56,17 @@ func Consume(ctx context.Context, e Event) error {
// FSM
//
// Used to test encryption of async_calls tables via FSM operations

var fsm = ftl.FSM("payment",
ftl.Start(Created),
ftl.Start(Paid),
ftl.Transition(Created, Paid),
ftl.Transition(Paid, Completed),
)
func fsm() *ftl.FSMHandle {
var fsm = ftl.FSM("payment",
ftl.Start(Created),
ftl.Start(Paid),
ftl.Transition(Created, Paid),
ftl.Transition(Paid, Completed),
ftl.Transition(Created, NextAndSleep),
ftl.Transition(NextAndSleep, Completed),
)
return fsm
}

type OnlinePaymentCompleted struct {
Name string `json:"name"`
Expand All @@ -73,14 +78,23 @@ type OnlinePaymentCreated struct {
Name string `json:"name"`
}

type NextAndSleepEvent struct {
Name string `json:"name"`
}

//ftl:verb
func BeginFSM(ctx context.Context, req OnlinePaymentCreated) error {
return fsm.Send(ctx, "test", req)
return fsm().Send(ctx, req.Name, req)
}

//ftl:verb
func TransitionFSM(ctx context.Context, req OnlinePaymentPaid) error {
return fsm.Send(ctx, "test", req)
func TransitionToPaid(ctx context.Context, req OnlinePaymentPaid) error {
return fsm().Send(ctx, req.Name, req)
}

//ftl:verb
func TransitionToNextAndSleep(ctx context.Context, req NextAndSleepEvent) error {
return fsm().Send(ctx, req.Name, req)
}

//ftl:verb
Expand All @@ -97,3 +111,15 @@ func Created(ctx context.Context, in OnlinePaymentCreated) error {
func Paid(ctx context.Context, in OnlinePaymentPaid) error {
return nil
}

// NextAndSleep calls fsm.Next() and then sleeps so we can test what is put into the fsm next event table
//
//ftl:verb
func NextAndSleep(ctx context.Context, in NextAndSleepEvent) error {
err := fsm().Next(ctx, in.Name, OnlinePaymentCompleted{Name: in.Name})
if err != nil {
return err
}
time.Sleep(1 * time.Minute)
return nil
}

0 comments on commit 7b68b92

Please sign in to comment.