Node.js faster than Go when receiving SQS messages? #5375
Hey folks, this is a "cross-post" of aws/aws-sdk-go#5024. Since we're in the JavaScript discussions, I don't expect people to tell me how to improve the Go code :-) But it may be interesting for you, since you might be thinking about the same topics.

We're trying to benchmark whether a switch to Go makes sense for some of our high-load services. We've created a simple benchmark comparing how many messages per second a program can pull from an SQS queue, and we're super surprised that Node.js outperforms Go: more than 6,000 messages per second vs. 2,200 messages per second. We expected it to be the other way around, but we cannot pinpoint the root cause.

For Go we use the latest aws-sdk-go; for Node.js we're using https://github.com/bbc/sqs-consumer, which is just a wrapper around @aws-sdk/client-sqs, so under the hood it's doing the same.

The programs are very simple / minimal. They fetch items from the queue, increase a counter per received message, and remove the message from the queue. Both use batches of 10 and long polling (short polling didn't make a difference). Both ran on the very same machine (MacBook Pro 14", M1 Pro) at the same time of day, under the same network conditions.

Thanks for any hints as to why the results are what they are.

Benchmark results: [figure: Go] [figure: Node.js]

Go code:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"sync/atomic"
	"time"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
)

func main() {
	cfg, err := config.LoadDefaultConfig(context.TODO())
	if err != nil {
		log.Fatalf("Unable to load SDK config, %v", err)
	}

	var wg sync.WaitGroup
	numGoroutines := 200

	// Counter for the number of messages processed, to be incremented atomically
	var messagesProcessed int64

	// Start a separate goroutine to log processed messages every second
	go func() {
		for range time.Tick(time.Second) {
			// Since multiple goroutines can update messagesProcessed, we retrieve the value atomically.
			count := atomic.LoadInt64(&messagesProcessed)
			fmt.Printf("Messages processed per second: %d\n", count)
			// Reset the counter
			atomic.StoreInt64(&messagesProcessed, 0)
		}
	}()

	// Start multiple goroutines to process messages concurrently
	for i := 0; i < numGoroutines; i++ {
		wg.Add(1)
		go func(workerId int) {
			defer wg.Done()
			fmt.Printf("Worker %d starting\n", workerId)
			for {
				client := sqs.NewFromConfig(cfg)
				queueUrl := "https://sqs.eu-central-1.amazonaws.com/0123456789/benchmark-queue"
				receiveMessageInput := &sqs.ReceiveMessageInput{
					QueueUrl:            &queueUrl,
					MaxNumberOfMessages: 10, // same as for the Node.js version
					WaitTimeSeconds:     20, // Enable long polling like in Node.js sqs-consumer version - Benchmark: no difference regarding performance compared to short polling
				}
				receiveMessageOutput, err := client.ReceiveMessage(context.TODO(), receiveMessageInput)
				if err != nil {
					fmt.Printf("Worker %d: Error receiving messages: %s\n", workerId, err)
					continue
				}
				// If no messages are available, ReceiveMessage returns an empty slice
				if len(receiveMessageOutput.Messages) == 0 {
					fmt.Printf("Worker %d: Received no messages\n", workerId)
					continue
				}
				for _, message := range receiveMessageOutput.Messages {
					// Simulating message processing by incrementing the counter
					atomic.AddInt64(&messagesProcessed, 1)
					// After processing the message, delete it from the queue.
					deleteInput := &sqs.DeleteMessageInput{
						QueueUrl:      &queueUrl,
						ReceiptHandle: message.ReceiptHandle,
					}
					_, err := client.DeleteMessage(context.TODO(), deleteInput)
					if err != nil {
						fmt.Printf("Worker %d: Failed to delete message: %s\n", workerId, err)
					}
				}
			}
		}(i)
	}

	wg.Wait()
}
```

JavaScript code:

```javascript
import { Consumer } from 'sqs-consumer'
import cluster from 'cluster'

if (cluster.isMaster) {
  console.log(`Master ${process.pid} is running`)

  // Total count of messages processed
  let totalCount = 0

  // Fork workers
  for (let i = 0; i < 50; i++) {
    cluster.fork()
  }

  // Function to handle message counts received from workers
  function messageHandler(msg) {
    if (msg.type === 'count') {
      totalCount += msg.count
    }
  }

  // Listen for messages from worker processes
  for (const id in cluster.workers) {
    cluster.workers[id].on('message', messageHandler)
  }

  // Log the total count every second and reset for the next interval
  setInterval(() => {
    console.log(`Messages per second: ${totalCount}`)
    totalCount = 0
  }, 1000)
} else {
  let messageCount = 0

  async function handleMessage(_snsMessage) {
    messageCount++
  }

  const app = Consumer.create({
    queueUrl: process.env.SQS_QUEUE_URL,
    batchSize: 10,
    handleMessageBatch: async (snsMessages) => {
      const promises = []
      for (const snsMessage of snsMessages) {
        promises.push(handleMessage(snsMessage))
      }
      await Promise.all(promises)
    },
    handleMessage: async (snsMessage) => {
      return await handleMessage(snsMessage)
    },
  })

  // Send the message count to the master process every second, then reset to 0
  setInterval(() => {
    process.send({ type: 'count', count: messageCount })
    messageCount = 0
  }, 1000)

  console.log('Starting SQS benchmark...')
  app.start()
}
```
Replies: 2 comments 1 reply
After applying batch delete to the Go program, it performs exactly the same as the Node one.
Hello! Reopening this discussion to make it searchable.
`sqs-consumer` does batch delete internally when it is used with `handleMessageBatch`. My Go implementation deleted every single message inside the loop. After applying batch delete to the Go program, it performs exactly the same as the Node one.