SQS messages are delivered at least once, meaning your consumer might receive the same message multiple times, which can lead to duplicate processing and unexpected side effects.

Here’s how a typical SQS consume-process-delete loop works, and why it’s not as simple as it sounds:

import boto3

sqs = boto3.client('sqs')
queue_url = 'YOUR_QUEUE_URL'

def process_message(message_body):
    print(f"Processing message: {message_body}")
    # Your actual processing logic here
    # This could involve database writes, API calls, etc.
    pass

while True:
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=1,
        WaitTimeSeconds=20,  # Long polling to reduce empty receives
        VisibilityTimeout=30 # How long the message is hidden from other consumers
    )

    messages = response.get('Messages', [])
    if not messages:
        print("No messages received, waiting...")
        continue

    message = messages[0]
    message_body = message['Body']
    receipt_handle = message['ReceiptHandle']

    try:
        process_message(message_body)
        # If processing is successful, delete the message
        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=receipt_handle
        )
        print(f"Deleted message: {receipt_handle}")
    except Exception as e:
        print(f"Error processing message {receipt_handle}: {e}")
        # If processing fails, the message will reappear after VisibilityTimeout
        # No explicit action needed here for basic retry

This code demonstrates the fundamental pattern: receive a message, process its content, and then delete it. The VisibilityTimeout is crucial. When a consumer receives a message, it becomes invisible to other consumers for the duration specified by this timeout. If the consumer successfully processes the message and calls delete_message, it’s gone. If it fails to delete the message (e.g., due to an error during processing, or the consumer crashing), the VisibilityTimeout expires, and the message becomes visible again, ready to be picked up by another (or the same) consumer.
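A related wrinkle: if processing sometimes takes longer than the VisibilityTimeout, the message can reappear while you are still working on it. SQS provides change_message_visibility for exactly this case. A minimal sketch (the client is passed in as a parameter so the helper is easy to test; the names are mine, not from the example above):

```python
def extend_visibility(sqs_client, queue_url, receipt_handle, extra_seconds):
    """Keep the in-flight message hidden from other consumers for longer.

    The new timeout is measured from the moment of this call, not added
    to the remaining time.
    """
    sqs_client.change_message_visibility(
        QueueUrl=queue_url,
        ReceiptHandle=receipt_handle,
        VisibilityTimeout=extra_seconds,
    )
```

You would call this from a heartbeat inside your processing loop whenever work is still in progress as the timeout approaches.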

The problem arises because SQS guarantees at-least-once delivery, not exactly-once. This means that even if your consumer successfully processes a message and intends to delete it, a network glitch or a momentary pause in your application could prevent the delete_message call from reaching SQS. The VisibilityTimeout will eventually expire, and the message will be redelivered. Now your consumer might process the same message a second time.

To handle this, you need to make your message processing idempotent. Idempotency means that processing the same message multiple times has the same effect as processing it only once.

The most common and robust way to achieve idempotency for SQS messages is by using a unique message identifier and storing the processing status of messages.

Here’s how you can modify the process_message function to be idempotent:

  1. Generate or Extract a Unique ID: Every message that enters your system should ideally have a unique identifier. If the sender doesn’t provide one, use the SQS MessageId (assigned at send time and unchanged across redeliveries) or derive an ID from the message content. Generating a fresh ID upon receipt won’t work, because each redelivery of the same message would get a different ID. For this example, let’s assume the message body itself contains a unique transaction_id.

  2. Check if Already Processed: Before performing any state-changing operations (like writing to a database), check whether you’ve already processed a message with this unique ID. Store these IDs in a durable store such as DynamoDB, or a fast cache like Redis, so duplicates can be detected and skipped.

  3. Process and Record: If the message hasn’t been processed, perform your actual processing logic. Crucially, immediately after successful processing (or as part of a single atomic transaction), record the unique message ID as "processed".

  4. Delete the Message: Only after successfully processing and recording the ID, delete the message from the SQS queue.

Let’s illustrate with a conceptual database check:

import boto3
import json
from datetime import datetime

sqs = boto3.client('sqs')
dynamodb = boto3.resource('dynamodb') # Assuming DynamoDB for tracking processed messages

queue_url = 'YOUR_QUEUE_URL'
processed_items_table = dynamodb.Table('ProcessedMessages') # A table to store unique message IDs

def process_message_idempotently(message_body):
    message_data = json.loads(message_body)
    message_id = message_data.get('transaction_id') # Assume a 'transaction_id' field

    if not message_id:
        print("Message is missing a transaction_id, cannot ensure idempotency.")
        # Decide how to handle this: skip, log, send to DLQ, etc.
        return False

    # 1. Check if already processed
    try:
        response = processed_items_table.get_item(Key={'message_id': message_id})
        if 'Item' in response:
            print(f"Message {message_id} already processed. Skipping.")
            return True # Message was already processed, consider it a success for deletion purposes
    except Exception as e:
        print(f"Error checking processed status for {message_id}: {e}")
        # Fail safe: if the dedupe store is unreachable, don't process now.
        # Returning False leaves the message to be retried after the timeout,
        # rather than risking a duplicate run while we can't check.
        return False

    # 2. If not processed, perform the actual work
    print(f"Processing unique message: {message_id} with body: {message_data}")
    try:
        # --- YOUR ACTUAL PROCESSING LOGIC STARTS HERE ---
        # Example: Save to a database, call an external API, etc.
        # This part must be atomic or handle its own retries/failures.
        print("Performing critical business logic...")
        # Simulate work
        import time
        time.sleep(1)
        # --- YOUR ACTUAL PROCESSING LOGIC ENDS HERE ---

        # 3. Record as processed *after* successful processing
        processed_items_table.put_item(
            Item={
                'message_id': message_id,
                'processed_at': datetime.utcnow().isoformat(),
                'original_message_body': message_body # Optional: store for debugging
            }
        )
        print(f"Successfully processed and recorded message {message_id}.")
        return True # Indicate successful processing and readiness for deletion

    except Exception as e:
        print(f"Error during processing of message {message_id}: {e}")
        # Do NOT record as processed. Message will be retried.
        return False # Indicate failure, message should not be deleted


while True:
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=1,
        WaitTimeSeconds=20,
        VisibilityTimeout=30 # Keep this reasonable for retries
    )

    messages = response.get('Messages', [])
    if not messages:
        continue

    message = messages[0]
    message_body = message['Body']
    receipt_handle = message['ReceiptHandle']

    try:
        if process_message_idempotently(message_body):
            # If processing was successful and recorded, delete the message
            sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=receipt_handle
            )
            print(f"Deleted message with receipt handle: {receipt_handle}")
        else:
            print(f"Processing failed for message with receipt handle: {receipt_handle}. Message will be visible again after timeout.")
            # If processing failed, we *don't* delete.
            # The message will reappear after VisibilityTimeout.
            # You might want to implement a mechanism to move messages to a DLQ after
            # a certain number of retries to prevent infinite loops for unsolvable messages.
    except Exception as e:
        print(f"An unexpected error occurred managing message {receipt_handle}: {e}")
        # This catch-all is for errors in the loop logic itself, not message processing.
        # The message will typically be retried due to VisibilityTimeout.

The core idea is that process_message_idempotently returns True whenever the message is safe to delete: either it was just processed and its status recorded, or it was already processed on an earlier delivery. If it returns False (processing failed), the delete_message call is skipped, and the message becomes visible again after its VisibilityTimeout expires, allowing for a retry.

A common pitfall is not correctly identifying the unique message ID, or not recording the "processed" status atomically with the actual work. If your processing involves multiple steps, ensure that either all steps succeed, or none are considered "processed" for idempotency tracking.

Another critical aspect is configuring a Dead-Letter Queue (DLQ) for your SQS queue. After a message has been retried a certain number of times (configured via maxReceiveCount on the source queue’s RedrivePolicy), it will be sent to the DLQ. This prevents messages that consistently fail from blocking the queue indefinitely and allows you to inspect them later for debugging.
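Attaching a DLQ is a one-time configuration call. A sketch using set_queue_attributes (the queue URL and ARN below are placeholders; note that AWS expects maxReceiveCount as a string inside the RedrivePolicy JSON document):

```python
import json

def build_redrive_policy(dlq_arn, max_receives):
    """Serialize a RedrivePolicy document for set_queue_attributes."""
    return json.dumps({
        "deadLetterTargetArn": dlq_arn,
        "maxReceiveCount": str(max_receives),
    })

def attach_dlq(sqs_client, queue_url, dlq_arn, max_receives=5):
    """Route messages to the DLQ after max_receives failed deliveries."""
    sqs_client.set_queue_attributes(
        QueueUrl=queue_url,
        Attributes={"RedrivePolicy": build_redrive_policy(dlq_arn, max_receives)},
    )
```

Pick max_receives with your VisibilityTimeout in mind: five retries at a 30-second timeout means a persistently failing message lingers for a few minutes before it lands in the DLQ.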

The next problem you’ll encounter is managing the lifecycle of messages that should be processed but consistently fail, leading to their accumulation in the DLQ.
