SQS consumers can easily become a bottleneck, but scaling them up based on queue depth is a surprisingly effective way to keep your pipelines humming without over-provisioning.
Let’s imagine we have a system processing customer orders. Orders arrive in an SQS queue, and our worker instances pull messages, process them (say, updating a database), and then delete them. If orders pile up, customers wait longer; if we run too many workers, we’re just burning money.
Here’s a small Go application that simulates workers pulling from an SQS queue. Notice how the MaxNumberOfMessages is set to 10, meaning each poll can grab up to 10 messages at once.
```go
package main

import (
	"context"
	"log"
	"os"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
)

var (
	queueURL = os.Getenv("SQS_QUEUE_URL")
	region   = os.Getenv("AWS_REGION")
)

func main() {
	cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion(region))
	if err != nil {
		log.Fatalf("unable to load SDK config, %v", err)
	}
	client := sqs.NewFromConfig(cfg)

	var wg sync.WaitGroup
	numWorkers := 5 // Start with 5 workers
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(client, &wg, i)
	}
	wg.Wait()
}

func worker(client *sqs.Client, wg *sync.WaitGroup, id int) {
	defer wg.Done()
	log.Printf("Worker %d started", id)
	for {
		// Poll for messages
		resp, err := client.ReceiveMessage(context.TODO(), &sqs.ReceiveMessageInput{
			QueueUrl:            &queueURL,
			MaxNumberOfMessages: 10, // Grab up to 10 messages per poll
			VisibilityTimeout:   60, // 60-second visibility timeout
			WaitTimeSeconds:     20, // Long polling for up to 20 seconds
		})
		if err != nil {
			log.Printf("Worker %d: error receiving messages: %v", id, err)
			time.Sleep(5 * time.Second) // Back off before retrying
			continue
		}
		if len(resp.Messages) == 0 {
			log.Printf("Worker %d: no messages received, continuing to poll", id)
			continue
		}
		for _, msg := range resp.Messages {
			log.Printf("Worker %d: processing message %s", id, *msg.MessageId)
			time.Sleep(2 * time.Second) // Simulate processing time

			// Delete the message only after successful processing
			_, err := client.DeleteMessage(context.TODO(), &sqs.DeleteMessageInput{
				QueueUrl:      &queueURL,
				ReceiptHandle: msg.ReceiptHandle,
			})
			if err != nil {
				log.Printf("Worker %d: error deleting message %s: %v", id, *msg.MessageId, err)
				// Depending on the error, you might retry or log for manual inspection
			} else {
				log.Printf("Worker %d: successfully processed and deleted message %s", id, *msg.MessageId)
			}
		}
	}
}
```
The core problem this solves is resource utilization efficiency. Without dynamic scaling, you either:
- Over-provision: Run many workers all the time, hoping to catch the peak load, leading to high costs during low-traffic periods.
- Under-provision: Run few workers, and have a growing backlog of messages during peak loads, causing increased latency and frustrated users.
The mental model for scaling SQS consumers by queue depth involves a feedback loop:
- Observe: Periodically check the number of messages visible in the SQS queue. This is the backlog that needs processing.
- Decide: Based on the observed queue depth and current worker count, determine if more workers are needed or if some can be shut down.
- Act: Increment or decrement the number of running worker instances.
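The loop itself is only a few lines. Here's a minimal sketch; `runScaler`, `getDepth`, `decide`, and `act` are hypothetical names standing in for a real SQS query, your scaling policy, and an Auto Scaling API call, and a real scaler would run forever on a timer rather than for a fixed number of ticks:

```go
package main

import "fmt"

// runScaler executes the observe → decide → act loop for a fixed number
// of ticks and returns the final worker count.
func runScaler(ticks, startWorkers int, getDepth func() int, decide func(depth, current int) int, act func(int)) int {
	current := startWorkers
	for i := 0; i < ticks; i++ {
		depth := getDepth()              // Observe: visible backlog
		target := decide(depth, current) // Decide: apply the scaling policy
		if target != current {
			act(target) // Act: resize the worker fleet
			current = target
		}
	}
	return current
}

func main() {
	final := runScaler(5, 1,
		func() int { return 450 }, // pretend the backlog holds steady at 450
		func(d, cur int) int { // add a worker while backlog per worker > 100
			if d/cur > 100 {
				return cur + 1
			}
			return cur
		},
		func(n int) { fmt.Println("scaling to", n, "workers") },
	)
	fmt.Println("settled at", final, "workers") // settled at 5 workers
}
```

With a constant backlog of 450, the loop adds workers until 450 per-worker drops to 90 (below the 100 threshold), then holds steady.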
The key levers you control are:
- MaxNumberOfMessages: How many messages a single ReceiveMessage call can fetch (1 to 10). Higher values mean fewer API calls but potentially longer processing times for a single worker if messages vary wildly in complexity.
- WaitTimeSeconds: Configures long polling. A value of 0 means short polling (expensive, polls immediately even if empty). A value > 0 enables long polling, where SQS holds the request open for up to the specified time if no messages are available, reducing empty ReceiveMessage calls. 20 seconds is a common, cost-effective setting.
- VisibilityTimeout: The duration after a message is received during which it’s hidden from other consumers. If a worker fails to delete the message within this timeout, it becomes visible again for another worker. This should be longer than your expected processing time per message.
- Scaling Thresholds: The specific queue depths that trigger adding or removing workers. These are application-specific and require tuning. For example, "add a worker if queue depth > 1000" and "remove a worker if queue depth < 100".
- Worker Concurrency: How many messages a single worker instance processes concurrently. In our Go example, a single worker instance polls and processes messages sequentially. You could modify this to have goroutines within a worker instance handle multiple messages fetched in a single poll.
The ApproximateNumberOfMessagesVisible metric in CloudWatch is your primary input. You’d typically have a separate "scaler" process (or Lambda function) that:
- Queries SQS for ApproximateNumberOfMessagesVisible.
- Compares this to your current worker count.
- Uses an Auto Scaling Group (or similar mechanism) to adjust the number of EC2 instances or ECS tasks running your workers.
A common scaling policy might look like this:
- Scale Out: If ApproximateNumberOfMessagesVisible exceeds X messages per running worker, add Y workers.
- Scale In: If ApproximateNumberOfMessagesVisible drops below Z messages per running worker for a sustained period, remove W workers.
The exact formula for X, Y, Z, and W depends on your message processing time, your desired latency, and your cost targets. A simple rule of thumb is to aim for a target number of messages per worker, say 100. If ApproximateNumberOfMessagesVisible / current_workers > 100, add more workers. If it drops below 50 for a while, remove workers.
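That rule of thumb translates into a few lines of Go. This is a sketch: `desiredWorkers` is a hypothetical helper, the 100/50 thresholds and single-step adjustments are illustrative values you'd tune, and the "sustained period" damping for scale-in is omitted:

```go
package main

import "fmt"

// desiredWorkers implements the backlog-per-worker rule of thumb:
// add a worker above scaleOutThreshold, remove one below
// scaleInThreshold, and clamp the result to [minWorkers, maxWorkers].
func desiredWorkers(queueDepth, current, minWorkers, maxWorkers int) int {
	const (
		scaleOutThreshold = 100 // messages per worker before scaling out
		scaleInThreshold  = 50  // messages per worker before scaling in
	)
	if current < 1 {
		current = 1 // avoid division by zero
	}
	next := current
	switch perWorker := queueDepth / current; {
	case perWorker > scaleOutThreshold:
		next = current + 1
	case perWorker < scaleInThreshold:
		next = current - 1
	}
	if next < minWorkers {
		next = minWorkers
	}
	if next > maxWorkers {
		next = maxWorkers
	}
	return next
}

func main() {
	fmt.Println(desiredWorkers(1500, 5, 1, 20)) // 300 per worker → scale out: 6
	fmt.Println(desiredWorkers(200, 5, 1, 20))  // 40 per worker → scale in: 4
	fmt.Println(desiredWorkers(400, 5, 1, 20))  // 80 per worker → hold: 5
}
```

Adjusting by one worker at a time keeps the loop stable; more aggressive policies (proportional steps, cooldown periods) are refinements on the same idea.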
What often trips people up is reacting to the wrong metric: ApproximateNumberOfMessagesNotVisible counts messages currently being processed (received but still within their visibility timeout), and ApproximateNumberOfMessagesDelayed counts messages waiting out a delivery delay. You only want to scale on ApproximateNumberOfMessagesVisible, the messages that are actively waiting to be picked up.
The next logical step after mastering queue-depth-based scaling is to integrate this with Dead-Letter Queues (DLQs) to handle messages that consistently fail processing, preventing them from blocking your main queue indefinitely.