EventBridge Pipes let you pipe messages from an SQS queue to a target, transforming them on the fly.

Here’s a simple pipe that takes messages from a standard SQS queue, transforms them, and sends them to a Lambda function.

{
  "Name": "SQSEventBridgePipeToLambda",
  "Description": "Pipe messages from SQS to Lambda with transformation",
  "Source": "arn:aws:sqs:us-east-1:123456789012:MySQSQueue",
  "SourceParameters": {
    "SqsQueueParameters": {
      "BatchSize": 10
    }
  },
  "Target": "arn:aws:lambda:us-east-1:123456789012:function:MyLambdaFunction",
  "TargetParameters": {
    "InputTemplate": "{\"messageBody\": \"<$.body>\", \"messageId\": \"<$.messageId>\"}"
  },
  "RoleArn": "arn:aws:iam::123456789012:role/service-role/AmazonEventBridgePipeRole"
}

This pipe connects MySQSQueue to MyLambdaFunction. When a message arrives in MySQSQueue, EventBridge Pipes will:

  1. Poll the SQS queue: It pulls messages in batches, up to the BatchSize defined in SqsQueueParameters.
  2. Transform each message: It applies the InputTemplate to every message in the batch to shape the data sent to the target. In this example, each message becomes a JSON object containing the message body and its ID.
  3. Invoke the target: It invokes MyLambdaFunction with the transformed batch, delivered as a JSON array with one element per message.
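
As a rough illustration of step 3, the target function could look like the sketch below. It assumes the pipe delivers each batch as a list of transformed payloads carrying the messageBody and messageId keys produced by the template; the handler name and its return value are hypothetical.

```python
import json


def handler(event, context):
    """Hypothetical Lambda target for the pipe.

    Assumes `event` is a list with one transformed payload per SQS message,
    each holding the "messageBody" and "messageId" keys from the template.
    """
    processed = []
    for item in event:
        # Real business logic would go here; this sketch only logs the item.
        print(json.dumps({"messageId": item["messageId"],
                          "bodyLength": len(item["messageBody"])}))
        processed.append(item["messageId"])
    return {"processed": processed}


if __name__ == "__main__":
    # Made-up batch shaped like the output of the InputTemplate.
    sample_batch = [
        {"messageBody": "hello", "messageId": "id-1"},
        {"messageBody": "world", "messageId": "id-2"},
    ]
    print(handler(sample_batch, None))
```

Iterating over the event rather than reading it as a single object is the main thing to get right here, since a batch size above 1 means several messages per invocation.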

The RoleArn is crucial. The role must trust the pipes.amazonaws.com service principal so that the pipe can assume it, and it needs permissions to:

  • Read from the SQS queue (sqs:ReceiveMessage, sqs:DeleteMessage, sqs:GetQueueAttributes).
  • Invoke the target (e.g., lambda:InvokeFunction for a Lambda target).
  • Send events to an event bus (events:PutEvents), but only if the pipe's target is an EventBridge event bus. A Lambda target does not need this permission.
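
The permissions above can be sketched as two IAM documents: a trust policy that lets the Pipes service assume the role, and a permissions policy scoped to the queue and the function. The helper name build_pipe_role_documents is hypothetical, and the ARNs are the placeholder ones from the pipe definition.

```python
import json


def build_pipe_role_documents(queue_arn: str, function_arn: str) -> dict:
    """Return trust and permissions policy documents for a pipe role (sketch)."""
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                # The Pipes service principal must be allowed to assume the role.
                "Principal": {"Service": "pipes.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }
    permissions_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Source side: poll the queue and delete processed messages.
                "Effect": "Allow",
                "Action": [
                    "sqs:ReceiveMessage",
                    "sqs:DeleteMessage",
                    "sqs:GetQueueAttributes",
                ],
                "Resource": queue_arn,
            },
            {
                # Target side: invoke the Lambda function.
                "Effect": "Allow",
                "Action": ["lambda:InvokeFunction"],
                "Resource": function_arn,
            },
        ],
    }
    return {"trust": trust_policy, "permissions": permissions_policy}


if __name__ == "__main__":
    docs = build_pipe_role_documents(
        "arn:aws:sqs:us-east-1:123456789012:MySQSQueue",
        "arn:aws:lambda:us-east-1:123456789012:function:MyLambdaFunction",
    )
    print(json.dumps(docs, indent=2))
```

Scoping each statement to a single resource ARN keeps the role from reading other queues or invoking other functions.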

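The InputTemplate uses JSONPath to extract data from the SQS message. <$.body> refers to the message's content. The message's unique identifier is the top-level messageId field, so its path is <$.messageId>. The attributes object holds system attributes such as SenderId, SentTimestamp, and ApproximateReceiveCount (for example, <$.attributes.SenderId>); it does not contain a MessageId key.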
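
To make the paths concrete, here is a sample record in the shape SQS events take, along with a tiny dotted-path lookup. The field values are made-up placeholders, and resolve is a hypothetical helper written for this illustration, not part of any AWS SDK.

```python
# Sample SQS record; all values are placeholders for illustration.
SAMPLE_RECORD = {
    "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
    "receiptHandle": "placeholder-receipt-handle",
    "body": "Test message.",
    "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1545082649183",
        "SenderId": "AIDAEXAMPLESENDER",
        "ApproximateFirstReceiveTimestamp": "1545082649185",
    },
    "messageAttributes": {},
    "eventSource": "aws:sqs",
    "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:MySQSQueue",
    "awsRegion": "us-east-1",
}


def resolve(record: dict, path: str):
    """Follow a simple '$.a.b' path through nested dictionaries."""
    if not path.startswith("$."):
        raise ValueError("path must start with '$.'")
    value = record
    for key in path[2:].split("."):
        value = value[key]  # raises KeyError if the field does not exist
    return value


if __name__ == "__main__":
    print(resolve(SAMPLE_RECORD, "$.body"))
    print(resolve(SAMPLE_RECORD, "$.messageId"))
    print(resolve(SAMPLE_RECORD, "$.attributes.SenderId"))
```

A path that points at a missing key, such as $.attributes.MessageId, raises KeyError in this sketch, which mirrors why the identifier has to be read from the top level.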

This pattern is incredibly useful for decoupling services. Your SQS queue acts as a buffer, and EventBridge Pipes reliably deliver messages to downstream consumers, handling polling, retries, and deletion for you. You can also insert an "enrichment" step between the source and target. This can be a Lambda function, a Step Functions Express workflow, an API Gateway endpoint, or an API destination, allowing you to modify or add data to the message before it reaches its final destination. An SQS queue cannot serve as an enrichment, because the step is invoked synchronously and has to return a response.
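
A Lambda enrichment might look like the following sketch. It assumes the pipe passes the batch of SQS records as a list and forwards whatever list the function returns; the handler name and the orderId and receiveCount fields are hypothetical.

```python
import json


def enrichment_handler(events, context):
    """Hypothetical enrichment step: parse each body and add one field.

    Assumes `events` is a list of SQS records and that the returned list
    becomes the input passed on toward the target.
    """
    enriched = []
    for record in events:
        payload = json.loads(record["body"])  # body is assumed to be JSON
        # Add data the target would otherwise have to look up itself.
        payload["receiveCount"] = int(record["attributes"]["ApproximateReceiveCount"])
        enriched.append(payload)
    return enriched


if __name__ == "__main__":
    sample = [
        {
            "messageId": "id-1",
            "body": json.dumps({"orderId": "A-1"}),
            "attributes": {"ApproximateReceiveCount": "2"},
        }
    ]
    print(enrichment_handler(sample, None))
```

Keeping the enrichment a pure function of its input, as here, makes it safe to run again when a batch is retried.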

The InputTemplate is not just for simple JSON construction. You can mix static strings and numbers with dynamic paths and with reserved variables such as <aws.pipes.pipe-arn> or <aws.pipes.event.ingestion-time>. It does not support conditional logic or loops, though; when a transformation needs branching or iteration, use an explicit enrichment step.

When using FIFO queues with EventBridge Pipes, you need to be mindful of message deduplication and ordering. A FIFO queue guarantees order per message group ID, not across the queue as a whole, and the pipe preserves the order of messages within each batch it polls. Because a failed batch can be delivered again, the target should tolerate duplicates.

The BatchSize parameter for SQS sources is a critical tuning knob. A larger BatchSize can improve throughput, but each invocation then carries a bigger payload, takes longer to process, and puts more messages at risk when something fails. A smaller BatchSize might be necessary for targets with strict payload or processing-time limits, or to reduce the blast radius of a single failed batch.
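
One way to limit the blast radius without shrinking the batch is to report only the failed items. The sketch below assumes the pipe honours the partial batch response format used with Lambda (a batchItemFailures list of itemIdentifier entries keyed by message ID) and that each item carries a messageId key. Check both assumptions against the current Pipes documentation before relying on them. The process function is a hypothetical stand-in for real work.

```python
def process(item: dict) -> None:
    """Hypothetical business logic: reject items with an empty body."""
    if not item["messageBody"]:
        raise ValueError("empty message body")


def handler(event, context):
    """Process each item and report failures individually.

    Assumes `event` is a list of payloads with "messageBody" and "messageId".
    """
    failures = []
    for item in event:
        try:
            process(item)
        except Exception:
            # Only this message should be retried, not the whole batch.
            failures.append({"itemIdentifier": item["messageId"]})
    return {"batchItemFailures": failures}


if __name__ == "__main__":
    batch = [
        {"messageBody": "ok", "messageId": "id-1"},
        {"messageBody": "", "messageId": "id-2"},
    ]
    print(handler(batch, None))
```

An empty batchItemFailures list signals that the whole batch succeeded.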

The InputTemplate is evaluated by the Pipes service itself, and AWS does not document which JSONPath library it uses. It is safest to stick to simple paths (nested fields and array indexes) and to test anything more elaborate before relying on it. Pipes also parses a JSON message body implicitly: if your SQS message body was a JSON string like {"user": {"id": 123, "name": "Alice"}}, you could access the user's name with <$.body.user.name>.
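
The effect of implicit body parsing can be imitated locally with a few lines of Python. This is only a simulation for reasoning about templates, not how EventBridge implements them; render and lookup are hypothetical helpers, and string values are substituted without escaping.

```python
import json
import re

# Matches placeholders such as <$.body.user.name>.
PLACEHOLDER = re.compile(r"<\$\.([A-Za-z0-9_.]+)>")


def lookup(record: dict, dotted: str):
    """Walk a dotted path, parsing JSON strings (such as body) on the way."""
    value = record
    for key in dotted.split("."):
        if isinstance(value, str):
            value = json.loads(value)  # mimic implicit body parsing
        value = value[key]
    return value


def render(template: str, record: dict) -> str:
    """Replace each placeholder with the value found in the record."""
    def substitute(match):
        value = lookup(record, match.group(1))
        # Strings go in as-is (the template supplies the quotes);
        # other values are serialized as JSON.
        return value if isinstance(value, str) else json.dumps(value)
    return PLACEHOLDER.sub(substitute, template)


if __name__ == "__main__":
    record = {"body": '{"user": {"id": 123, "name": "Alice"}}'}
    template = '{"userName": "<$.body.user.name>", "userId": <$.body.user.id>}'
    print(render(template, record))
```

Note the quoting in the template: the string placeholder sits inside quotes while the numeric one does not, so the rendered result stays valid JSON.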

When you define a pipe, EventBridge creates a managed poller for the SQS source, designed to be highly available and scalable. While a batch is being processed, its messages are in flight and hidden from other consumers; once the target succeeds, the pipe deletes them from the queue. If the pipe encounters an error (e.g., the target is unavailable), the messages are not deleted. They become visible again when their visibility timeout expires and are then polled again. To keep a poison message from cycling indefinitely, configure a redrive policy with a dead-letter queue on the source queue.

One common pitfall is IAM permissions. If the pipe’s role lacks the necessary sqs:DeleteMessage permission, messages will never be removed from the queue after successful processing, leading to a backlog and repeated invocations. Ensure the role has sqs:ReceiveMessage, sqs:DeleteMessage, and sqs:GetQueueAttributes for the specific SQS queue.

The next step after setting up a basic pipe is often adding an enrichment step to modify or validate message data before it hits the final target.
