diff --git a/cmd/aws/lambda/substation/dynamodb.go b/cmd/aws/lambda/substation/dynamodb.go index 1a918627..793f095c 100644 --- a/cmd/aws/lambda/substation/dynamodb.go +++ b/cmd/aws/lambda/substation/dynamodb.go @@ -60,17 +60,12 @@ func dynamodbHandler(ctx context.Context, event events.DynamoDBEvent) error { m := message tfGroup.Go(func() error { - msg, err := transform.Apply(tfCtx, sub.Transforms(), m) - if err != nil { + // Transformed messages are never returned to the caller because + // invocation is asynchronous. + if _, err := transform.Apply(tfCtx, sub.Transforms(), m); err != nil { return err } - for _, m := range msg { - if m.IsControl() { - continue - } - } - return nil }) } diff --git a/cmd/aws/lambda/substation/kinesis_stream.go b/cmd/aws/lambda/substation/kinesis_stream.go index 31e6c3b1..060ac2c4 100644 --- a/cmd/aws/lambda/substation/kinesis_stream.go +++ b/cmd/aws/lambda/substation/kinesis_stream.go @@ -57,17 +57,12 @@ func kinesisStreamHandler(ctx context.Context, event events.KinesisEvent) error m := message tfGroup.Go(func() error { - msg, err := transform.Apply(tfCtx, sub.Transforms(), m) - if err != nil { + // Transformed messages are never returned to the caller because + // invocation is asynchronous. + if _, err := transform.Apply(tfCtx, sub.Transforms(), m); err != nil { return err } - for _, m := range msg { - if m.IsControl() { - continue - } - } - return nil }) } diff --git a/cmd/aws/lambda/substation/s3.go b/cmd/aws/lambda/substation/s3.go index 0b3d4d0c..c1b20608 100644 --- a/cmd/aws/lambda/substation/s3.go +++ b/cmd/aws/lambda/substation/s3.go @@ -62,17 +62,12 @@ func s3Handler(ctx context.Context, event events.S3Event) error { m := message tfGroup.Go(func() error { - msg, err := transform.Apply(tfCtx, sub.Transforms(), m) - if err != nil { + // Transformed messages are never returned to the caller because + // invocation is asynchronous. + if _, err := transform.Apply(tfCtx, sub.Transforms(), m); err != nil { return err } - for _, m := range msg { - if m.IsControl() { - continue - } - } - return nil }) } @@ -206,17 +201,12 @@ func s3SnsHandler(ctx context.Context, event events.SNSEvent) error { m := message tfGroup.Go(func() error { - msg, err := transform.Apply(tfCtx, sub.Transforms(), m) - if err != nil { + // Transformed messages are never returned to the caller because + // invocation is asynchronous. + if _, err := transform.Apply(tfCtx, sub.Transforms(), m); err != nil { return err } - for _, m := range msg { - if m.IsControl() { - continue - } - } - return nil }) } diff --git a/cmd/aws/lambda/substation/sns.go b/cmd/aws/lambda/substation/sns.go index e2882abe..a9466bff 100644 --- a/cmd/aws/lambda/substation/sns.go +++ b/cmd/aws/lambda/substation/sns.go @@ -56,17 +56,12 @@ func snsHandler(ctx context.Context, event events.SNSEvent) error { m := message tfGroup.Go(func() error { - msg, err := transform.Apply(tfCtx, sub.Transforms(), m) - if err != nil { + // Transformed messages are never returned to the caller because + // invocation is asynchronous. + if _, err := transform.Apply(tfCtx, sub.Transforms(), m); err != nil { return err } - for _, m := range msg { - if m.IsControl() { - continue - } - } - return nil }) } diff --git a/cmd/aws/lambda/substation/sqs.go b/cmd/aws/lambda/substation/sqs.go index 5ef85d4b..0d0b7833 100644 --- a/cmd/aws/lambda/substation/sqs.go +++ b/cmd/aws/lambda/substation/sqs.go @@ -55,17 +55,12 @@ func sqsHandler(ctx context.Context, event events.SQSEvent) error { m := message tfGroup.Go(func() error { - msg, err := transform.Apply(tfCtx, sub.Transforms(), m) - if err != nil { + // Transformed messages are never returned to the caller because + // invocation is asynchronous. + if _, err := transform.Apply(tfCtx, sub.Transforms(), m); err != nil { return err } - for _, m := range msg { - if m.IsControl() { - continue - } - } - return nil }) }