SQS doesn’t actually guarantee you process each message exactly once; standard queues guarantee at-least-once delivery, which means the same message can be delivered more than once.

Let’s see this in action. Imagine a simple SQS queue where we want to process incoming user_signup events. Our consumer code, running in a loop, looks something like this:

import boto3
from botocore.exceptions import ClientError

sqs = boto3.client('sqs')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-signup-queue'

def process_signup(message_body):
    print(f"Processing signup for user: {message_body}")
    # Simulate some work, like writing to a database
    # For this example, we'll just print and assume success

while True:
    try:
        response = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=1,
            WaitTimeSeconds=20, # Long polling
            VisibilityTimeout=30 # Message invisible for 30s
        )

        if 'Messages' in response:
            message = response['Messages'][0]
            receipt_handle = message['ReceiptHandle']
            message_body = message['Body']

            try:
                process_signup(message_body)
                # If processing is successful, delete the message
                sqs.delete_message(
                    QueueUrl=queue_url,
                    ReceiptHandle=receipt_handle
                )
                print(f"Deleted message: {receipt_handle}")
            except Exception as e:
                print(f"Error processing message {receipt_handle}: {e}")
                # If processing fails, the message will become visible again
                # after VisibilityTimeout expires.
        else:
            print("No messages received.")

    except ClientError as e:
        print(f"SQS Client Error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

This code works fine most of the time. When a message is successfully processed, it’s deleted from the queue. But what if the process_signup function succeeds, but the subsequent sqs.delete_message call fails? Or what if the consumer crashes after processing but before deleting?

This is where at-least-once delivery comes in. SQS marks the message as invisible for the VisibilityTimeout duration (30 seconds in our example). If the consumer doesn’t delete the message within that timeout, SQS makes it visible again, and another consumer (or the same one after a restart) may pick it up. If our process_signup logic isn’t designed to handle receiving the same event twice, we could end up processing a signup twice, which is bad.

This is why we need idempotency. An operation is idempotent if applying it multiple times has the same effect as applying it once. For our SQS consumer, process_signup needs to be idempotent.
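As a quick standalone illustration (the counter and set here are mine, not part of the SQS consumer), compare a non-idempotent counter increment with an idempotent set insert:

```python
# Non-idempotent: repeating the operation changes the result.
signup_count = 0

def record_signup_naive():
    global signup_count
    signup_count += 1  # Running this twice counts the same signup twice

# Idempotent: repeating the operation has no further effect.
signups_seen = set()

def record_signup_idempotent(event_id):
    signups_seen.add(event_id)  # Adding the same event_id again is a no-op

record_signup_naive()
record_signup_naive()
record_signup_idempotent("evt-123")
record_signup_idempotent("evt-123")
print(signup_count)       # 2 -- the duplicate was double-counted
print(len(signups_seen))  # 1 -- the duplicate was absorbed
```

A redelivered SQS message is exactly the "run it twice" case, which is why process_signup must behave like the second function, not the first.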

The core problem is that SQS guarantees message delivery and visibility management, but it’s up to our application logic to ensure the effect of processing a message is idempotent.

Here’s how we build that idempotency:

1. Unique Message Identifiers

Every message should carry a unique identifier we can use to track whether we’ve already processed it. SQS doesn’t assign one for this purpose: the MessageId identifies a single send, so a producer-side retry can deliver the same logical event under two different MessageIds. Instead, the sender should include a unique ID in the message body itself.

For a user_signup event, this could be the user_id if it’s guaranteed to be unique and immutable, or a dedicated event_id generated by the producer.

Producer Side (Conceptual):

import json
import uuid
import boto3

sqs = boto3.client('sqs')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-signup-queue'

def send_signup_event(user_data):
    event_id = str(uuid.uuid4()) # Unique ID for this event
    message_body = {
        "event_id": event_id,
        "user_id": user_data['id'],
        "email": user_data['email']
        # ... other user data
    }
    sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(message_body),
        MessageAttributes={
            'EventType': {'StringValue': 'UserSignup', 'DataType': 'String'}
        }
    )
    print(f"Sent signup event for user {user_data['id']} with event ID {event_id}")

2. Tracking Processed Messages

The consumer needs a way to remember which unique message identifiers it has successfully processed. A common and effective way to do this is using a database or a distributed cache like Redis.

Consumer Side (Enhanced):

First, let’s assume we have a processed_events table in a database (e.g., DynamoDB, PostgreSQL) with a primary key on event_id.

import boto3
import json
import os   # For environment variables
import time # For backing off on unexpected errors
from botocore.exceptions import ClientError

# Assume a database client is initialized elsewhere and available
# For demonstration, we'll mock it. In reality, use your DB driver.
class MockProcessedEventsDB:
    def __init__(self):
        self.processed = set()

    def add(self, event_id):
        self.processed.add(event_id)
        print(f"DB: Marked {event_id} as processed.")

    def exists(self, event_id):
        return event_id in self.processed

processed_events_db = MockProcessedEventsDB() # Replace with your actual DB client

sqs = boto3.client('sqs')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-signup-queue'
visibility_timeout = int(os.environ.get('SQS_VISIBILITY_TIMEOUT', '30')) # Use env var

def process_signup(user_id, email):
    print(f"Attempting to process signup for user_id: {user_id}, email: {email}")
    # Simulate database write for user creation
    # In a real scenario, this might be a SQL INSERT or DynamoDB put_item
    # If this operation fails, it should ideally be transactional or retryable
    print(f"DB: Creating user {user_id} with email {email}")
    # If the DB write fails, we don't want to mark the SQS message as processed.
    # The SQS message will time out and be redelivered.
    # The DB operation itself might need its own retry logic.

while True:
    try:
        response = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=1,
            WaitTimeSeconds=20,
            VisibilityTimeout=visibility_timeout
        )

        if 'Messages' in response:
            message = response['Messages'][0]
            receipt_handle = message['ReceiptHandle']
            message_body_str = message['Body']

            event_id = None  # Defined up front so the error handler below can reference it
            try:
                message_data = json.loads(message_body_str)
                event_id = message_data.get('event_id')
                user_id = message_data.get('user_id')
                email = message_data.get('email')

                if not event_id or not user_id:
                    print(f"Invalid message format, missing event_id or user_id: {message_body_str}")
                    # Delete malformed messages to prevent infinite loops
                    sqs.delete_message(
                        QueueUrl=queue_url,
                        ReceiptHandle=receipt_handle
                    )
                    continue

                # --- Idempotency Check ---
                if processed_events_db.exists(event_id):
                    print(f"Event {event_id} already processed. Deleting duplicate message.")
                    sqs.delete_message(
                        QueueUrl=queue_url,
                        ReceiptHandle=receipt_handle
                    )
                    continue
                # --- End Idempotency Check ---

                # If we reach here, it's a new, unprocessed event.
                process_signup(user_id, email)

                # --- Transactional Logic (Ideal) ---
                # The critical part is to ensure that marking the event as processed
                # and potentially other side effects are atomic.
                # A common pattern:
                # 1. Perform the business logic (e.g., DB write for user creation).
                # 2. Add the event_id to our tracking DB.
                # 3. Delete the SQS message.
                # If step 1 or 2 fails, the SQS message won't be deleted and will be retried.
                # If step 3 fails *after* steps 1 and 2 succeeded, the message will be
                # retried, but our idempotency check will prevent re-processing.

                # For simplicity here, we assume process_signup is "atomic enough"
                # and we add to DB *after* it. A true ACID transaction would be better.
                processed_events_db.add(event_id) # Mark as processed in our tracking DB

                # Delete the message from SQS only AFTER successful processing AND
                # marking it in our tracking DB.
                sqs.delete_message(
                    QueueUrl=queue_url,
                    ReceiptHandle=receipt_handle
                )
                print(f"Successfully processed and deleted event {event_id} (message: {receipt_handle})")

            except json.JSONDecodeError:
                print(f"Failed to decode JSON for message: {message_body_str}")
                # Delete malformed messages
                sqs.delete_message(
                    QueueUrl=queue_url,
                    ReceiptHandle=receipt_handle
                )
            except Exception as e:
                print(f"Error processing message {receipt_handle} (event_id: {event_id}): {e}")
                # IMPORTANT: Do NOT delete the message here.
                # Let it time out and be redelivered. The error might be transient.
                # If the error is permanent, the application logic needs to handle it
                # (e.g., move to a Dead Letter Queue after N retries).
        else:
            print("No messages received.")

    except ClientError as e:
        print(f"SQS Client Error: {e}")
        # Handle specific SQS errors if necessary, e.g., throttling
    except Exception as e:
        print(f"An unexpected error occurred in the consumer loop: {e}")
        # Consider adding a small sleep here to prevent tight loops on persistent errors.
        time.sleep(5)

The key is this processed_events_db.exists(event_id) check. If the event_id is already in our tracking database, we know we’ve handled this exact event before. Even if SQS redelivers the message, we simply delete it without re-executing process_signup.

3. Transactional Guarantees (The Hard Part)

The most robust solution involves making the business logic execution and the marking of the event as processed atomic. This means either both happen, or neither happens.

  • Database Transactions: If your process_signup logic involves writing to a relational database, you can wrap the user creation and the insertion of the event_id into a single database transaction. If the transaction commits, both are recorded. If it rolls back (due to an error), neither is recorded, and the SQS message will be redelivered.

  • Conditional Writes (e.g., DynamoDB): For NoSQL databases like DynamoDB, you can use conditional writes. When you put_item, add a condition (attribute_not_exists) that the event_id, or a key derived from it, does not already exist. If the condition fails, the event was already processed, so you can safely delete the duplicate SQS message; if it succeeds, the write itself doubles as your idempotency record.

  • Atomic Operations: If using a cache like Redis, you might use SETNX (Set if Not Exists) to atomically add the event_id as a key. If SETNX returns 1, it was a new key, and you proceed. If it returns 0, the key already existed, and you delete the SQS message.
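To make the database-transaction option concrete, here is a minimal sketch using Python’s built-in sqlite3. The table names and the handle_signup helper are illustrative, not part of the consumer above; the point is that the user row and the idempotency record are written in a single transaction, so a duplicate event_id rolls back both writes:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (user_id TEXT PRIMARY KEY, email TEXT)")
conn.execute("CREATE TABLE processed_events (event_id TEXT PRIMARY KEY)")

def handle_signup(event_id, user_id, email):
    """Return True if processed now, False if it was a duplicate."""
    try:
        # "with conn" runs both INSERTs in one transaction:
        # it commits on success and rolls back on any exception.
        with conn:
            conn.execute(
                "INSERT INTO users (user_id, email) VALUES (?, ?)",
                (user_id, email),
            )
            conn.execute(
                "INSERT INTO processed_events (event_id) VALUES (?)",
                (event_id,),
            )
        return True
    except sqlite3.IntegrityError:
        # Duplicate primary key: the transaction rolled back, so neither
        # row was written. The caller can safely delete the SQS message.
        return False
```

Because the rollback is all-or-nothing, a redelivered message can never leave the user created but the event unrecorded (or vice versa); the same shape applies to PostgreSQL or any database with transactions.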

The critical path in the enhanced consumer code is:

  1. Receive message.
  2. Parse message, get event_id.
  3. Check if event_id is already processed (idempotency check).
  4. If processed, delete SQS message and continue.
  5. If not processed:
     a. Execute business logic (process_signup).
     b. Atomically mark event_id as processed (e.g., DB insert, cache set).
     c. Delete the SQS message.

The most common pitfall is deleting the SQS message before successfully marking the event as processed in your tracking system. If the marking fails, but the SQS message is deleted, you’ve lost the message and potentially created an inconsistency.

The next problem you’ll encounter is handling transient errors during message processing that aren’t covered by your idempotency check.
