Skip to content

Commit

Permalink
Merge branch 'main' into dli/trim-buckets
Browse files Browse the repository at this point in the history
  • Loading branch information
deniseli authored Aug 19, 2024
2 parents 9d94288 + f75a5f0 commit 2d76dfa
Show file tree
Hide file tree
Showing 18 changed files with 193 additions and 84 deletions.
6 changes: 5 additions & 1 deletion .go-arch-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,8 @@ deps:
- common
- internal
- sql
- leases
- leases
encoding:
mayDependOn:
- common
- internal
7 changes: 4 additions & 3 deletions backend/controller/cronjobs/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions backend/controller/dal/async_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, err error)
return nil, fmt.Errorf("failed to parse origin key %q: %w", row.Origin, err)
}

decryptedRequest, err := d.decrypt(encryption.AsyncSubKey, row.Request)
decryptedRequest, err := d.decrypt(&row.Request)
if err != nil {
return nil, fmt.Errorf("failed to decrypt async call request: %w", err)
}
Expand Down Expand Up @@ -158,11 +158,12 @@ func (d *DAL) CompleteAsyncCall(ctx context.Context,
didScheduleAnotherCall = false
switch result := result.(type) {
case either.Left[[]byte, string]: // Successful response.
encryptedResult, err := d.encrypt(encryption.AsyncSubKey, result.Get())
var encryptedResult encryption.EncryptedAsyncColumn
err := d.encrypt(result.Get(), &encryptedResult)
if err != nil {
return false, fmt.Errorf("failed to encrypt async call result: %w", err)
}
_, err = tx.db.SucceedAsyncCall(ctx, encryptedResult, call.ID)
_, err = tx.db.SucceedAsyncCall(ctx, optional.Some(encryptedResult), call.ID)
if err != nil {
return false, dalerrs.TranslatePGError(err) //nolint:wrapcheck
}
Expand Down Expand Up @@ -227,7 +228,7 @@ func (d *DAL) LoadAsyncCall(ctx context.Context, id int64) (*AsyncCall, error) {
if err != nil {
return nil, fmt.Errorf("failed to parse origin key %q: %w", row.Origin, err)
}
request, err := d.decrypt(encryption.AsyncSubKey, row.Request)
request, err := d.decrypt(&row.Request)
if err != nil {
return nil, fmt.Errorf("failed to decrypt async call request: %w", err)
}
Expand Down
18 changes: 11 additions & 7 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,10 +709,11 @@ func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey
return dalerrs.TranslatePGError(err)
}
}
payload, err := d.encryptJSON(encryption.TimelineSubKey, map[string]interface{}{
var payload encryption.EncryptedTimelineColumn
err = d.encryptJSON(map[string]interface{}{
"prev_min_replicas": deployment.MinReplicas,
"min_replicas": minReplicas,
})
}, &payload)
if err != nil {
return fmt.Errorf("failed to encrypt payload for InsertDeploymentUpdatedEvent: %w", err)
}
Expand Down Expand Up @@ -782,10 +783,11 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.Depl
}
}

payload, err := d.encryptJSON(encryption.TimelineSubKey, map[string]any{
var payload encryption.EncryptedTimelineColumn
err = d.encryptJSON(map[string]any{
"min_replicas": int32(minReplicas),
"replaced": replacedDeploymentKey,
})
}, &payload)
if err != nil {
return fmt.Errorf("replace deployment failed to encrypt payload: %w", err)
}
Expand Down Expand Up @@ -1057,7 +1059,8 @@ func (d *DAL) InsertLogEvent(ctx context.Context, log *LogEvent) error {
"error": log.Error,
"stack": log.Stack,
}
encryptedPayload, err := d.encryptJSON(encryption.TimelineSubKey, payload)
var encryptedPayload encryption.EncryptedTimelineColumn
err := d.encryptJSON(payload, &encryptedPayload)
if err != nil {
return fmt.Errorf("failed to encrypt log payload: %w", err)
}
Expand Down Expand Up @@ -1137,13 +1140,14 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error {
if pr, ok := call.ParentRequestKey.Get(); ok {
parentRequestKey = optional.Some(pr.String())
}
payload, err := d.encryptJSON(encryption.TimelineSubKey, map[string]any{
var payload encryption.EncryptedTimelineColumn
err := d.encryptJSON(map[string]any{
"duration_ms": call.Duration.Milliseconds(),
"request": call.Request,
"response": call.Response,
"error": call.Error,
"stack": call.Stack,
})
}, &payload)
if err != nil {
return fmt.Errorf("failed to encrypt call payload: %w", err)
}
Expand Down
28 changes: 14 additions & 14 deletions backend/controller/dal/encryption.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,45 +10,45 @@ import (
"github.com/TBD54566975/ftl/internal/log"
)

func (d *DAL) encrypt(subKey encryption.SubKey, cleartext []byte) ([]byte, error) {
func (d *DAL) encrypt(cleartext []byte, dest encryption.Encrypted) error {
if d.encryptor == nil {
return nil, fmt.Errorf("encryptor not set")
return fmt.Errorf("encryptor not set")
}

v, err := d.encryptor.Encrypt(subKey, cleartext)
err := d.encryptor.Encrypt(cleartext, dest)
if err != nil {
return nil, fmt.Errorf("failed to encrypt binary with subkey %s: %w", subKey, err)
return fmt.Errorf("failed to encrypt binary with subkey %s: %w", dest.SubKey(), err)
}

return v, nil
return nil
}

func (d *DAL) decrypt(subKey encryption.SubKey, encrypted []byte) ([]byte, error) {
func (d *DAL) decrypt(encrypted encryption.Encrypted) ([]byte, error) {
if d.encryptor == nil {
return nil, fmt.Errorf("encryptor not set")
}

v, err := d.encryptor.Decrypt(subKey, encrypted)
v, err := d.encryptor.Decrypt(encrypted)
if err != nil {
return nil, fmt.Errorf("failed to decrypt binary with subkey %s: %w", subKey, err)
return nil, fmt.Errorf("failed to decrypt binary with subkey %s: %w", encrypted.SubKey(), err)
}

return v, nil
}

func (d *DAL) encryptJSON(subKey encryption.SubKey, v any) ([]byte, error) {
func (d *DAL) encryptJSON(v any, dest encryption.Encrypted) error {
serialized, err := json.Marshal(v)
if err != nil {
return nil, fmt.Errorf("failed to marshal JSON: %w", err)
return fmt.Errorf("failed to marshal JSON: %w", err)
}

return d.encrypt(subKey, serialized)
return d.encrypt(serialized, dest)
}

func (d *DAL) decryptJSON(subKey encryption.SubKey, encrypted []byte, v any) error { //nolint:unparam
decrypted, err := d.decrypt(subKey, encrypted)
func (d *DAL) decryptJSON(encrypted encryption.Encrypted, v any) error { //nolint:unparam
decrypted, err := d.decrypt(encrypted)
if err != nil {
return fmt.Errorf("failed to decrypt json with subkey %s: %w", subKey, err)
return fmt.Errorf("failed to decrypt json with subkey %s: %w", encrypted.SubKey(), err)
}

if err = json.Unmarshal(decrypted, v); err != nil {
Expand Down
9 changes: 4 additions & 5 deletions backend/controller/dal/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/TBD54566975/ftl/backend/controller/sql"
dalerrs "github.com/TBD54566975/ftl/backend/dal"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/encryption"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
)
Expand Down Expand Up @@ -349,7 +348,7 @@ func (d *DAL) transformRowsToTimelineEvents(deploymentKeys map[int64]model.Deplo
switch row.Type {
case sql.EventTypeLog:
var jsonPayload eventLogJSON
if err := d.decryptJSON(encryption.TimelineSubKey, row.Payload, &jsonPayload); err != nil {
if err := d.decryptJSON(&row.Payload, &jsonPayload); err != nil {
return nil, fmt.Errorf("failed to decrypt log event: %w", err)
}

Expand All @@ -371,7 +370,7 @@ func (d *DAL) transformRowsToTimelineEvents(deploymentKeys map[int64]model.Deplo

case sql.EventTypeCall:
var jsonPayload eventCallJSON
if err := d.decryptJSON(encryption.TimelineSubKey, row.Payload, &jsonPayload); err != nil {
if err := d.decryptJSON(&row.Payload, &jsonPayload); err != nil {
return nil, fmt.Errorf("failed to decrypt call event: %w", err)
}
var sourceVerb optional.Option[schema.Ref]
Expand All @@ -396,7 +395,7 @@ func (d *DAL) transformRowsToTimelineEvents(deploymentKeys map[int64]model.Deplo

case sql.EventTypeDeploymentCreated:
var jsonPayload eventDeploymentCreatedJSON
if err := d.decryptJSON(encryption.TimelineSubKey, row.Payload, &jsonPayload); err != nil {
if err := d.decryptJSON(&row.Payload, &jsonPayload); err != nil {
return nil, fmt.Errorf("failed to decrypt call event: %w", err)
}
out = append(out, &DeploymentCreatedEvent{
Expand All @@ -411,7 +410,7 @@ func (d *DAL) transformRowsToTimelineEvents(deploymentKeys map[int64]model.Deplo

case sql.EventTypeDeploymentUpdated:
var jsonPayload eventDeploymentUpdatedJSON
if err := d.decryptJSON(encryption.TimelineSubKey, row.Payload, &jsonPayload); err != nil {
if err := d.decryptJSON(&row.Payload, &jsonPayload); err != nil {
return nil, fmt.Errorf("failed to decrypt call event: %w", err)
}
out = append(out, &DeploymentUpdatedEvent{
Expand Down
9 changes: 5 additions & 4 deletions backend/controller/dal/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ import (
//
// Note: no validation of the FSM is performed.
func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, instanceKey string, destinationState schema.RefKey, request []byte, encrypted bool, retryParams schema.RetryParams) (err error) {
var encryptedRequest []byte
var encryptedRequest encryption.EncryptedAsyncColumn
if encrypted {
encryptedRequest = request
encryptedRequest = encryption.EncryptedAsyncColumn(request)
} else {
encryptedRequest, err = d.encrypt(encryption.AsyncSubKey, request)
err = d.encrypt(request, &encryptedRequest)
if err != nil {
return fmt.Errorf("failed to encrypt FSM request: %w", err)
}
Expand Down Expand Up @@ -146,7 +146,8 @@ func (d *DAL) PopNextFSMEvent(ctx context.Context, fsm schema.RefKey, instanceKe
}

func (d *DAL) SetNextFSMEvent(ctx context.Context, fsm schema.RefKey, instanceKey string, nextState schema.RefKey, request json.RawMessage, requestType schema.Type) error {
encryptedRequest, err := d.encryptJSON(encryption.AsyncSubKey, request)
var encryptedRequest encryption.EncryptedAsyncColumn
err := d.encryptJSON(request, &encryptedRequest)
if err != nil {
return fmt.Errorf("failed to encrypt FSM request: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion backend/controller/dal/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
)

func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic, caller string, payload []byte) error {
encryptedPayload, err := d.encrypt(encryption.AsyncSubKey, payload)
var encryptedPayload encryption.EncryptedAsyncColumn
err := d.encrypt(payload, &encryptedPayload)
if err != nil {
return fmt.Errorf("failed to encrypt payload: %w", err)
}
Expand Down
7 changes: 4 additions & 3 deletions backend/controller/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 9 additions & 8 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 2d76dfa

Please sign in to comment.