In Go, an SQS worker built on a polling-and-processing pattern exists mainly to keep your application from being overwhelmed by tasks: work waits in the queue until a worker has capacity to pull it.
Here's a simplified Go SQS worker that processes messages:
```go
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
)

var sqsClient *sqs.Client
var queueURL string

func init() {
	cfg, err := config.LoadDefaultConfig(context.Background())
	if err != nil {
		log.Fatalf("unable to load SDK config, %v", err)
	}
	sqsClient = sqs.NewFromConfig(cfg)
	queueURL = os.Getenv("QUEUE_URL")
	if queueURL == "" {
		log.Fatal("QUEUE_URL environment variable not set")
	}
}

func main() {
	var wg sync.WaitGroup
	numWorkers := 5 // Process up to 5 messages concurrently
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			pollAndProcess(context.Background())
		}()
	}
	wg.Wait() // Workers loop forever, so this blocks until the process is stopped
}

func pollAndProcess(ctx context.Context) {
	for {
		messages, err := receiveMessages(ctx)
		if err != nil {
			log.Printf("Error receiving messages: %v", err)
			time.Sleep(10 * time.Second) // Back off on receive error
			continue
		}
		if len(messages) == 0 {
			// Long polling already waited up to 20 s; pause briefly before polling again
			time.Sleep(5 * time.Second)
			continue
		}
		// Each worker handles its batch one message at a time
		for _, msg := range messages {
			processMessage(ctx, msg)
		}
	}
}

func receiveMessages(ctx context.Context) ([]types.Message, error) {
	// In aws-sdk-go-v2 these three fields are plain int32 values, not pointers
	result, err := sqsClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
		QueueUrl:            aws.String(queueURL),
		MaxNumberOfMessages: 10, // Fetch up to 10 messages at a time
		WaitTimeSeconds:     20, // Long polling
		VisibilityTimeout:   30, // Messages invisible for 30 seconds
	})
	if err != nil {
		return nil, fmt.Errorf("ReceiveMessage API call failed: %w", err)
	}
	return result.Messages, nil
}

func processMessage(ctx context.Context, msg types.Message) {
	id := aws.ToString(msg.MessageId)
	log.Printf("Processing message: %s", id)
	if err := doWork(ctx, msg); err != nil {
		// Don't delete: the message becomes visible again after the visibility timeout
		log.Printf("Error processing message %s: %v", id, err)
		return
	}
	// Processing succeeded, so delete the message
	_, err := sqsClient.DeleteMessage(ctx, &sqs.DeleteMessageInput{
		QueueUrl:      aws.String(queueURL),
		ReceiptHandle: msg.ReceiptHandle,
	})
	if err != nil {
		log.Printf("Error deleting message %s: %v", id, err)
		// Depending on the error, you might retry the deletion or let the message become visible again
		return
	}
	log.Printf("Successfully processed and deleted message: %s", id)
}

// doWork is a placeholder for real processing; return an error to signal failure.
func doWork(ctx context.Context, msg types.Message) error {
	time.Sleep(2 * time.Second) // Simulate work
	return nil
}
```
This pattern achieves its goal by having a pool of worker goroutines that continuously poll SQS for new messages. When messages are received, these workers process them. The key is that a worker doesn’t just grab a message and process it instantly; it has a lifecycle of receiving, attempting to process, and then either deleting (success) or letting it become visible again (failure/timeout).
The fundamental problem this solves is decoupling the producer of work from the consumer. A web server might push tasks to SQS, and this worker pattern ensures those tasks are handled asynchronously without blocking the web server. It also provides resilience: if a worker crashes mid-task, SQS makes the message available again once its visibility timeout expires.
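To make the crash-and-redeliver behavior concrete, here is a toy, single-goroutine model of visibility timeouts. Everything in it (toyQueue, receive, remove, and the integer clock measured in seconds) is hypothetical: it only mimics SQS semantics in memory and is not the SQS API. As a simplification of how SQS treats receipt handles, deletion here only works with the most recently issued handle.

```go
package main

import "fmt"

// delivery is what a hypothetical consumer gets back from receive.
type delivery struct {
	ID     string
	Handle int
}

// toyMessage is one entry in the in-memory queue.
type toyMessage struct {
	id             string
	handle         int // latest receipt handle; 0 means never received
	invisibleUntil int // simulated clock, in seconds
}

// toyQueue is a hypothetical model of SQS visibility semantics.
// It is not the SQS API and is not safe for concurrent use.
type toyQueue struct {
	msgs       []*toyMessage
	visibility int // visibility timeout, in seconds
	nextHandle int
}

// receive returns up to limit visible messages and hides each one until
// now+visibility, issuing a fresh receipt handle on every delivery.
func (q *toyQueue) receive(now, limit int) []delivery {
	var out []delivery
	for _, m := range q.msgs {
		if len(out) == limit {
			break
		}
		if now < m.invisibleUntil {
			continue // still hidden because another consumer holds it
		}
		q.nextHandle++
		m.handle = q.nextHandle
		m.invisibleUntil = now + q.visibility
		out = append(out, delivery{ID: m.id, Handle: m.handle})
	}
	return out
}

// remove deletes a message only when given its most recent receipt handle.
func (q *toyQueue) remove(handle int) bool {
	for i, m := range q.msgs {
		if m.handle != 0 && m.handle == handle {
			q.msgs = append(q.msgs[:i], q.msgs[i+1:]...)
			return true
		}
	}
	return false
}

func main() {
	q := &toyQueue{visibility: 30, msgs: []*toyMessage{{id: "a"}, {id: "b"}}}

	// t=0: a worker receives both messages, deletes "a", then crashes.
	first := q.receive(0, 10)
	q.remove(first[0].Handle)

	// t=10: "b" is still invisible, so another worker gets nothing.
	fmt.Println("visible at t=10:", len(q.receive(10, 10)))

	// t=31: the timeout has expired and "b" is delivered again.
	for _, d := range q.receive(31, 10) {
		fmt.Println("redelivered:", d.ID)
	}
}
```

Nothing in the model "rescues" the crashed worker's message explicitly; redelivery falls out of the timeout alone, which is why a worker only needs to delete on success.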
Here’s how it works internally:
- Polling: Workers call ReceiveMessage on the SQS queue. Because WaitTimeSeconds is set to 20, the call uses long polling: instead of returning immediately when the queue is empty, it waits up to 20 seconds for a message to arrive. This sharply reduces the number of empty responses, and with it the cost and load on SQS.
- Visibility timeout: The VisibilityTimeout parameter (here, 30 seconds, overriding the queue's default for these messages) hides each returned message from subsequent ReceiveMessage calls for that period. This normally keeps other workers from picking up a message while it is being processed, but standard queues deliver at least once, so a message can occasionally arrive twice and handlers should be idempotent. If the worker finishes within the timeout, it deletes the message; if it fails, or the timeout expires before deletion, SQS makes the message visible again for another worker to pick up.
- Processing: The processMessage function stands in for the actual work. In practice this could be anything: sending an email, updating a database, calling an external API.
- Deletion: After successful processing, the worker calls DeleteMessage with the ReceiptHandle it received. This tells SQS that the message has been handled and should not be redelivered.
- Concurrency: Multiple worker goroutines (numWorkers) run concurrently, allowing the application to process many messages in parallel.
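As a rough illustration of why long polling matters, the sketch below counts ReceiveMessage calls per hour against an empty queue, using the worker's own numbers: five workers, a 20-second long-poll wait, and the 5-second pause after an empty receive. The helper name idleCallsPerHour is hypothetical, and the model ignores network latency.

```go
package main

import "fmt"

// idleCallsPerHour estimates ReceiveMessage calls per hour against an empty
// queue. Each loop iteration blocks for waitSec (the long-poll wait) and then
// sleeps pauseSec. Network latency is ignored.
func idleCallsPerHour(workers, waitSec, pauseSec int) int {
	perIteration := waitSec + pauseSec
	if perIteration <= 0 {
		panic("loop must wait or pause; otherwise it spins")
	}
	return workers * 3600 / perIteration
}

func main() {
	long := idleCallsPerHour(5, 20, 5) // WaitTimeSeconds = 20
	short := idleCallsPerHour(5, 0, 5) // WaitTimeSeconds = 0 (short polling)
	fmt.Printf("long polling: %d calls/hour, short polling: %d calls/hour\n", long, short)
}
```

With these inputs the long-polling loop makes 720 empty calls an hour across all five workers, versus 3,600 with short polling: a five-fold reduction before any messages arrive.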
MaxNumberOfMessages is set to 10, the largest value SQS allows, so a single poll can return up to 10 messages. WaitTimeSeconds is set to 20: if the queue is empty, the request waits up to 20 seconds for a message to arrive, but it returns as soon as any messages are available, even if there are fewer than 10. VisibilityTimeout is set to 30 seconds: once a worker receives a message, it stays hidden from other workers for 30 seconds, and if the worker doesn't delete it within that window, SQS makes it visible again.
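Because pollAndProcess handles each batch sequentially, every message in a batch shares one visibility window that starts when the batch is received, and the last message waits behind all the others. A quick check with the example's numbers (batches of 10, about 2 seconds of work each, a 30-second timeout) shows how much margin there is. batchFits is a hypothetical helper, and the model ignores the time spent on the receive and delete calls themselves.

```go
package main

import "fmt"

// batchFits reports whether the last message of a sequentially processed
// batch finishes before the shared visibility timeout expires. All values
// are in seconds; API-call latency is ignored.
func batchFits(batchSize, perMsgSec, visibilitySec int) bool {
	lastFinish := batchSize * perMsgSec // the last message completes here
	return lastFinish < visibilitySec
}

func main() {
	fmt.Println(batchFits(10, 2, 30)) // 20 s of work inside a 30 s window
	fmt.Println(batchFits(10, 4, 30)) // 40 s of work: later messages reappear
}
```

If the check fails for your real workload, the usual levers are a smaller MaxNumberOfMessages, a larger VisibilityTimeout, or extending the timeout of in-flight messages with ChangeMessageVisibility.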
One critical aspect of this pattern is error handling. If ReceiveMessage fails, the worker should back off before retrying. If processing a message fails, the worker must not delete it; the visibility timeout will eventually expire, and SQS will redeliver the message. To prevent infinite redelivery loops, you can attach a dead-letter queue (DLQ) through the source queue's redrive policy: once a message has been received more than maxReceiveCount times (e.g., 5) without being deleted, SQS automatically moves it to the DLQ for manual inspection.
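The "delete only on success" rule is easiest to see, and to unit-test, when the handler returns an error and deletion goes through a small interface. In this sketch, message, deleter, handleOne, fakeDeleter and errDownstream are all hypothetical names; in the real worker, deleter would wrap sqsClient.DeleteMessage and message would be types.Message.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for types.Message.
type message struct {
	ID            string
	ReceiptHandle string
}

// deleter is a hypothetical seam; in the real worker it would wrap
// sqsClient.DeleteMessage.
type deleter interface {
	Delete(receiptHandle string) error
}

// errDownstream is a sample failure used in the demo below.
var errDownstream = errors.New("downstream API unavailable")

// handleOne runs handler and deletes the message only if handler succeeds.
// On failure the message is left in the queue, so SQS redelivers it after
// the visibility timeout.
func handleOne(d deleter, msg message, handler func(message) error) error {
	if err := handler(msg); err != nil {
		return fmt.Errorf("processing %s (not deleted): %w", msg.ID, err)
	}
	if err := d.Delete(msg.ReceiptHandle); err != nil {
		return fmt.Errorf("deleting %s: %w", msg.ID, err)
	}
	return nil
}

// fakeDeleter records receipt handles instead of calling SQS.
type fakeDeleter struct{ deleted []string }

func (f *fakeDeleter) Delete(receiptHandle string) error {
	f.deleted = append(f.deleted, receiptHandle)
	return nil
}

func main() {
	d := &fakeDeleter{}
	succeed := func(message) error { return nil }
	fail := func(message) error { return errDownstream }

	if err := handleOne(d, message{ID: "1", ReceiptHandle: "h1"}, succeed); err != nil {
		fmt.Println(err)
	}
	if err := handleOne(d, message{ID: "2", ReceiptHandle: "h2"}, fail); err != nil {
		fmt.Println(err)
	}
	fmt.Println("deleted handles:", d.deleted)
}
```

A failed message is simply left alone: it reappears after the visibility timeout, and if it keeps failing, the redrive policy eventually moves it to the DLQ without any extra code in the worker.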
The next logical step after mastering this polling and processing pattern is to implement a robust dead-letter queue strategy for handling persistently failing messages.