Skip to content

Commit

Permalink
perf: lambda async sources
Browse files Browse the repository at this point in the history
  • Loading branch information
jshlbrd committed Nov 1, 2023
1 parent c73e2b8 commit 49d5c4b
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 48 deletions.
11 changes: 3 additions & 8 deletions cmd/aws/lambda/substation/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,12 @@ func dynamodbHandler(ctx context.Context, event events.DynamoDBEvent) error {

m := message
tfGroup.Go(func() error {
msg, err := transform.Apply(tfCtx, sub.Transforms(), m)
if err != nil {
// Transformed messages are never returned to the caller because
// invocation is asynchronous.
if _, err := transform.Apply(tfCtx, sub.Transforms(), m); err != nil {
return err
}

for _, m := range msg {
if m.IsControl() {
continue
}
}

return nil
})
}
Expand Down
11 changes: 3 additions & 8 deletions cmd/aws/lambda/substation/kinesis_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,12 @@ func kinesisStreamHandler(ctx context.Context, event events.KinesisEvent) error

m := message
tfGroup.Go(func() error {
msg, err := transform.Apply(tfCtx, sub.Transforms(), m)
if err != nil {
// Transformed messages are never returned to the caller because
// invocation is asynchronous.
if _, err := transform.Apply(tfCtx, sub.Transforms(), m); err != nil {
return err
}

for _, m := range msg {
if m.IsControl() {
continue
}
}

return nil
})
}
Expand Down
22 changes: 6 additions & 16 deletions cmd/aws/lambda/substation/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,12 @@ func s3Handler(ctx context.Context, event events.S3Event) error {

m := message
tfGroup.Go(func() error {
msg, err := transform.Apply(tfCtx, sub.Transforms(), m)
if err != nil {
// Transformed messages are never returned to the caller because
// invocation is asynchronous.
if _, err := transform.Apply(tfCtx, sub.Transforms(), m); err != nil {
return err
}

for _, m := range msg {
if m.IsControl() {
continue
}
}

return nil
})
}
Expand Down Expand Up @@ -206,17 +201,12 @@ func s3SnsHandler(ctx context.Context, event events.SNSEvent) error {

m := message
tfGroup.Go(func() error {
msg, err := transform.Apply(tfCtx, sub.Transforms(), m)
if err != nil {
// Transformed messages are never returned to the caller because
// invocation is asynchronous.
if _, err := transform.Apply(tfCtx, sub.Transforms(), m); err != nil {
return err
}

for _, m := range msg {
if m.IsControl() {
continue
}
}

return nil
})
}
Expand Down
11 changes: 3 additions & 8 deletions cmd/aws/lambda/substation/sns.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,12 @@ func snsHandler(ctx context.Context, event events.SNSEvent) error {

m := message
tfGroup.Go(func() error {
msg, err := transform.Apply(tfCtx, sub.Transforms(), m)
if err != nil {
// Transformed messages are never returned to the caller because
// invocation is asynchronous.
if _, err := transform.Apply(tfCtx, sub.Transforms(), m); err != nil {
return err
}

for _, m := range msg {
if m.IsControl() {
continue
}
}

return nil
})
}
Expand Down
11 changes: 3 additions & 8 deletions cmd/aws/lambda/substation/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,12 @@ func sqsHandler(ctx context.Context, event events.SQSEvent) error {

m := message
tfGroup.Go(func() error {
msg, err := transform.Apply(tfCtx, sub.Transforms(), m)
if err != nil {
// Transformed messages are never returned to the caller because
// invocation is asynchronous.
if _, err := transform.Apply(tfCtx, sub.Transforms(), m); err != nil {
return err
}

for _, m := range msg {
if m.IsControl() {
continue
}
}

return nil
})
}
Expand Down

0 comments on commit 49d5c4b

Please sign in to comment.