Configure AWS EventBridge Pipes

The aws:pipes/pipe:Pipe resource, part of the Pulumi AWS provider, defines an EventBridge Pipe that connects an event source to a target, optionally filtering and enriching events in transit. This guide focuses on five capabilities: IAM role setup, event filtering, enrichment via API destinations, CloudWatch Logs integration, and SQS-specific configuration.

Pipes require an IAM execution role with permissions to read from the source and write to the target. They may also reference CloudWatch Log Groups or API Destinations for enrichment. The examples are intentionally small. Combine them with your own IAM policies, queues, and monitoring infrastructure.

Connect SQS queues with IAM permissions

Most deployments start by connecting a source queue to a target queue, establishing the IAM role that grants EventBridge Pipes permission to read and write.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const main = aws.getCallerIdentity({});
const example = new aws.iam.Role("example", {assumeRolePolicy: JSON.stringify({
    Version: "2012-10-17",
    Statement: {
        Effect: "Allow",
        Action: "sts:AssumeRole",
        Principal: {
            Service: "pipes.amazonaws.com",
        },
        Condition: {
            StringEquals: {
                "aws:SourceAccount": main.then(main => main.accountId),
            },
        },
    },
})});
const sourceQueue = new aws.sqs.Queue("source", {});
const source = new aws.iam.RolePolicy("source", {
    role: example.id,
    policy: pulumi.jsonStringify({
        Version: "2012-10-17",
        Statement: [{
            Effect: "Allow",
            Action: [
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
                "sqs:ReceiveMessage",
            ],
            Resource: [sourceQueue.arn],
        }],
    }),
});
const targetQueue = new aws.sqs.Queue("target", {});
const target = new aws.iam.RolePolicy("target", {
    role: example.id,
    policy: pulumi.jsonStringify({
        Version: "2012-10-17",
        Statement: [{
            Effect: "Allow",
            Action: ["sqs:SendMessage"],
            Resource: [targetQueue.arn],
        }],
    }),
});
const examplePipe = new aws.pipes.Pipe("example", {
    name: "example-pipe",
    roleArn: example.arn,
    source: sourceQueue.arn,
    target: targetQueue.arn,
}, {
    dependsOn: [
        source,
        target,
    ],
});
import pulumi
import json
import pulumi_aws as aws

main = aws.get_caller_identity()
example = aws.iam.Role("example", assume_role_policy=json.dumps({
    "Version": "2012-10-17",
    "Statement": {
        "Effect": "Allow",
        "Action": "sts:AssumeRole",
        "Principal": {
            "Service": "pipes.amazonaws.com",
        },
        "Condition": {
            "StringEquals": {
                "aws:SourceAccount": main.account_id,
            },
        },
    },
}))
source_queue = aws.sqs.Queue("source")
source = aws.iam.RolePolicy("source",
    role=example.id,
    policy=pulumi.Output.json_dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
                "sqs:ReceiveMessage",
            ],
            "Resource": [source_queue.arn],
        }],
    }))
target_queue = aws.sqs.Queue("target")
target = aws.iam.RolePolicy("target",
    role=example.id,
    policy=pulumi.Output.json_dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": ["sqs:SendMessage"],
            "Resource": [target_queue.arn],
        }],
    }))
example_pipe = aws.pipes.Pipe("example",
    name="example-pipe",
    role_arn=example.arn,
    source=source_queue.arn,
    target=target_queue.arn,
    opts = pulumi.ResourceOptions(depends_on=[
            source,
            target,
        ]))
package main

import (
	"encoding/json"

	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws"
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/iam"
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/pipes"
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/sqs"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		main, err := aws.GetCallerIdentity(ctx, &aws.GetCallerIdentityArgs{}, nil)
		if err != nil {
			return err
		}
		tmpJSON0, err := json.Marshal(map[string]interface{}{
			"Version": "2012-10-17",
			"Statement": map[string]interface{}{
				"Effect": "Allow",
				"Action": "sts:AssumeRole",
				"Principal": map[string]interface{}{
					"Service": "pipes.amazonaws.com",
				},
				"Condition": map[string]interface{}{
					"StringEquals": map[string]interface{}{
						"aws:SourceAccount": main.AccountId,
					},
				},
			},
		})
		if err != nil {
			return err
		}
		json0 := string(tmpJSON0)
		example, err := iam.NewRole(ctx, "example", &iam.RoleArgs{
			AssumeRolePolicy: pulumi.String(json0),
		})
		if err != nil {
			return err
		}
		sourceQueue, err := sqs.NewQueue(ctx, "source", nil)
		if err != nil {
			return err
		}
		source, err := iam.NewRolePolicy(ctx, "source", &iam.RolePolicyArgs{
			Role: example.ID(),
			Policy: sourceQueue.Arn.ApplyT(func(arn string) (pulumi.String, error) {
				var _zero pulumi.String
				tmpJSON1, err := json.Marshal(map[string]interface{}{
					"Version": "2012-10-17",
					"Statement": []map[string]interface{}{
						map[string]interface{}{
							"Effect": "Allow",
							"Action": []string{
								"sqs:DeleteMessage",
								"sqs:GetQueueAttributes",
								"sqs:ReceiveMessage",
							},
							"Resource": []string{
								arn,
							},
						},
					},
				})
				if err != nil {
					return _zero, err
				}
				json1 := string(tmpJSON1)
				return pulumi.String(json1), nil
			}).(pulumi.StringOutput),
		})
		if err != nil {
			return err
		}
		targetQueue, err := sqs.NewQueue(ctx, "target", nil)
		if err != nil {
			return err
		}
		target, err := iam.NewRolePolicy(ctx, "target", &iam.RolePolicyArgs{
			Role: example.ID(),
			Policy: targetQueue.Arn.ApplyT(func(arn string) (pulumi.String, error) {
				var _zero pulumi.String
				tmpJSON2, err := json.Marshal(map[string]interface{}{
					"Version": "2012-10-17",
					"Statement": []map[string]interface{}{
						map[string]interface{}{
							"Effect": "Allow",
							"Action": []string{
								"sqs:SendMessage",
							},
							"Resource": []string{
								arn,
							},
						},
					},
				})
				if err != nil {
					return _zero, err
				}
				json2 := string(tmpJSON2)
				return pulumi.String(json2), nil
			}).(pulumi.StringOutput),
		})
		if err != nil {
			return err
		}
		_, err = pipes.NewPipe(ctx, "example", &pipes.PipeArgs{
			Name:    pulumi.String("example-pipe"),
			RoleArn: example.Arn,
			Source:  sourceQueue.Arn,
			Target:  targetQueue.Arn,
		}, pulumi.DependsOn([]pulumi.Resource{
			source,
			target,
		}))
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var main = Aws.GetCallerIdentity.Invoke();

    var example = new Aws.Iam.Role("example", new()
    {
        AssumeRolePolicy = JsonSerializer.Serialize(new Dictionary<string, object?>
        {
            ["Version"] = "2012-10-17",
            ["Statement"] = new Dictionary<string, object?>
            {
                ["Effect"] = "Allow",
                ["Action"] = "sts:AssumeRole",
                ["Principal"] = new Dictionary<string, object?>
                {
                    ["Service"] = "pipes.amazonaws.com",
                },
                ["Condition"] = new Dictionary<string, object?>
                {
                    ["StringEquals"] = new Dictionary<string, object?>
                    {
                        ["aws:SourceAccount"] = main.Apply(getCallerIdentityResult => getCallerIdentityResult.AccountId),
                    },
                },
            },
        }),
    });

    var sourceQueue = new Aws.Sqs.Queue("source");

    var source = new Aws.Iam.RolePolicy("source", new()
    {
        Role = example.Id,
        Policy = Output.JsonSerialize(Output.Create(new Dictionary<string, object?>
        {
            ["Version"] = "2012-10-17",
            ["Statement"] = new[]
            {
                new Dictionary<string, object?>
                {
                    ["Effect"] = "Allow",
                    ["Action"] = new[]
                    {
                        "sqs:DeleteMessage",
                        "sqs:GetQueueAttributes",
                        "sqs:ReceiveMessage",
                    },
                    ["Resource"] = new[]
                    {
                        sourceQueue.Arn,
                    },
                },
            },
        })),
    });

    var targetQueue = new Aws.Sqs.Queue("target");

    var target = new Aws.Iam.RolePolicy("target", new()
    {
        Role = example.Id,
        Policy = Output.JsonSerialize(Output.Create(new Dictionary<string, object?>
        {
            ["Version"] = "2012-10-17",
            ["Statement"] = new[]
            {
                new Dictionary<string, object?>
                {
                    ["Effect"] = "Allow",
                    ["Action"] = new[]
                    {
                        "sqs:SendMessage",
                    },
                    ["Resource"] = new[]
                    {
                        targetQueue.Arn,
                    },
                },
            },
        })),
    });

    var examplePipe = new Aws.Pipes.Pipe("example", new()
    {
        Name = "example-pipe",
        RoleArn = example.Arn,
        Source = sourceQueue.Arn,
        Target = targetQueue.Arn,
    }, new CustomResourceOptions
    {
        DependsOn =
        {
            source,
            target,
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.AwsFunctions;
import com.pulumi.aws.inputs.GetCallerIdentityArgs;
import com.pulumi.aws.iam.Role;
import com.pulumi.aws.iam.RoleArgs;
import com.pulumi.aws.sqs.Queue;
import com.pulumi.aws.iam.RolePolicy;
import com.pulumi.aws.iam.RolePolicyArgs;
import com.pulumi.aws.pipes.Pipe;
import com.pulumi.aws.pipes.PipeArgs;
import static com.pulumi.codegen.internal.Serialization.*;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        final var main = AwsFunctions.getCallerIdentity(GetCallerIdentityArgs.builder()
            .build());

        var example = new Role("example", RoleArgs.builder()
            .assumeRolePolicy(serializeJson(
                jsonObject(
                    jsonProperty("Version", "2012-10-17"),
                    jsonProperty("Statement", jsonObject(
                        jsonProperty("Effect", "Allow"),
                        jsonProperty("Action", "sts:AssumeRole"),
                        jsonProperty("Principal", jsonObject(
                            jsonProperty("Service", "pipes.amazonaws.com")
                        )),
                        jsonProperty("Condition", jsonObject(
                            jsonProperty("StringEquals", jsonObject(
                                jsonProperty("aws:SourceAccount", main.accountId())
                            ))
                        ))
                    ))
                )))
            .build());

        var sourceQueue = new Queue("sourceQueue");

        var source = new RolePolicy("source", RolePolicyArgs.builder()
            .role(example.id())
            .policy(sourceQueue.arn().applyValue(_arn -> serializeJson(
                jsonObject(
                    jsonProperty("Version", "2012-10-17"),
                    jsonProperty("Statement", jsonArray(jsonObject(
                        jsonProperty("Effect", "Allow"),
                        jsonProperty("Action", jsonArray(
                            "sqs:DeleteMessage", 
                            "sqs:GetQueueAttributes", 
                            "sqs:ReceiveMessage"
                        )),
                        jsonProperty("Resource", jsonArray(_arn))
                    )))
                ))))
            .build());

        var targetQueue = new Queue("targetQueue");

        var target = new RolePolicy("target", RolePolicyArgs.builder()
            .role(example.id())
            .policy(targetQueue.arn().applyValue(_arn -> serializeJson(
                jsonObject(
                    jsonProperty("Version", "2012-10-17"),
                    jsonProperty("Statement", jsonArray(jsonObject(
                        jsonProperty("Effect", "Allow"),
                        jsonProperty("Action", jsonArray("sqs:SendMessage")),
                        jsonProperty("Resource", jsonArray(_arn))
                    )))
                ))))
            .build());

        var examplePipe = new Pipe("examplePipe", PipeArgs.builder()
            .name("example-pipe")
            .roleArn(example.arn())
            .source(sourceQueue.arn())
            .target(targetQueue.arn())
            .build(), CustomResourceOptions.builder()
                .dependsOn(                
                    source,
                    target)
                .build());

    }
}
resources:
  example:
    type: aws:iam:Role
    properties:
      assumeRolePolicy:
        fn::toJSON:
          Version: 2012-10-17
          Statement:
            Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service: pipes.amazonaws.com
            Condition:
              StringEquals:
                aws:SourceAccount: ${main.accountId}
  source:
    type: aws:iam:RolePolicy
    properties:
      role: ${example.id}
      policy:
        fn::toJSON:
          Version: 2012-10-17
          Statement:
            - Effect: Allow
              Action:
                - sqs:DeleteMessage
                - sqs:GetQueueAttributes
                - sqs:ReceiveMessage
              Resource:
                - ${sourceQueue.arn}
  sourceQueue:
    type: aws:sqs:Queue
    name: source
  target:
    type: aws:iam:RolePolicy
    properties:
      role: ${example.id}
      policy:
        fn::toJSON:
          Version: 2012-10-17
          Statement:
            - Effect: Allow
              Action:
                - sqs:SendMessage
              Resource:
                - ${targetQueue.arn}
  targetQueue:
    type: aws:sqs:Queue
    name: target
  examplePipe:
    type: aws:pipes:Pipe
    name: example
    properties:
      name: example-pipe
      roleArn: ${example.arn}
      source: ${sourceQueue.arn}
      target: ${targetQueue.arn}
    options:
      dependsOn:
        - ${source}
        - ${target}
variables:
  main:
    fn::invoke:
      function: aws:getCallerIdentity
      arguments: {}

The pipe reads messages from the source queue and writes them to the target queue. The roleArn property references an IAM role with a trust policy allowing pipes.amazonaws.com to assume it. The inline policies grant specific SQS permissions: ReceiveMessage and DeleteMessage on the source, SendMessage on the target. The dependsOn ensures policies exist before the pipe starts.

Filter events with pattern matching

Pipes can filter incoming events before processing, reducing costs by only forwarding messages that match specific criteria.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.pipes.Pipe("example", {
    name: "example-pipe",
    roleArn: exampleAwsIamRole.arn,
    source: source.arn,
    target: target.arn,
    sourceParameters: {
        filterCriteria: {
            filters: [{
                pattern: JSON.stringify({
                    source: ["event-source"],
                }),
            }],
        },
    },
});
import pulumi
import json
import pulumi_aws as aws

example = aws.pipes.Pipe("example",
    name="example-pipe",
    role_arn=example_aws_iam_role["arn"],
    source=source["arn"],
    target=target["arn"],
    source_parameters={
        "filter_criteria": {
            "filters": [{
                "pattern": json.dumps({
                    "source": ["event-source"],
                }),
            }],
        },
    })
package main

import (
	"encoding/json"

	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/pipes"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		tmpJSON0, err := json.Marshal(map[string]interface{}{
			"source": []string{
				"event-source",
			},
		})
		if err != nil {
			return err
		}
		json0 := string(tmpJSON0)
		_, err = pipes.NewPipe(ctx, "example", &pipes.PipeArgs{
			Name:    pulumi.String("example-pipe"),
			RoleArn: pulumi.Any(exampleAwsIamRole.Arn),
			Source:  pulumi.Any(source.Arn),
			Target:  pulumi.Any(target.Arn),
			SourceParameters: &pipes.PipeSourceParametersArgs{
				FilterCriteria: &pipes.PipeSourceParametersFilterCriteriaArgs{
					Filters: pipes.PipeSourceParametersFilterCriteriaFilterArray{
						&pipes.PipeSourceParametersFilterCriteriaFilterArgs{
							Pattern: pulumi.String(json0),
						},
					},
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.Pipes.Pipe("example", new()
    {
        Name = "example-pipe",
        RoleArn = exampleAwsIamRole.Arn,
        Source = source.Arn,
        Target = target.Arn,
        SourceParameters = new Aws.Pipes.Inputs.PipeSourceParametersArgs
        {
            FilterCriteria = new Aws.Pipes.Inputs.PipeSourceParametersFilterCriteriaArgs
            {
                Filters = new[]
                {
                    new Aws.Pipes.Inputs.PipeSourceParametersFilterCriteriaFilterArgs
                    {
                        Pattern = JsonSerializer.Serialize(new Dictionary<string, object?>
                        {
                            ["source"] = new[]
                            {
                                "event-source",
                            },
                        }),
                    },
                },
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.pipes.Pipe;
import com.pulumi.aws.pipes.PipeArgs;
import com.pulumi.aws.pipes.inputs.PipeSourceParametersArgs;
import com.pulumi.aws.pipes.inputs.PipeSourceParametersFilterCriteriaArgs;
import static com.pulumi.codegen.internal.Serialization.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Pipe("example", PipeArgs.builder()
            .name("example-pipe")
            .roleArn(exampleAwsIamRole.arn())
            .source(source.arn())
            .target(target.arn())
            .sourceParameters(PipeSourceParametersArgs.builder()
                .filterCriteria(PipeSourceParametersFilterCriteriaArgs.builder()
                    .filters(PipeSourceParametersFilterCriteriaFilterArgs.builder()
                        .pattern(serializeJson(
                            jsonObject(
                                jsonProperty("source", jsonArray("event-source"))
                            )))
                        .build())
                    .build())
                .build())
            .build());

    }
}
resources:
  example:
    type: aws:pipes:Pipe
    properties:
      name: example-pipe
      roleArn: ${exampleAwsIamRole.arn}
      source: ${source.arn}
      target: ${target.arn}
      sourceParameters:
        filterCriteria:
          filters:
            - pattern:
                fn::toJSON:
                  source:
                    - event-source

The sourceParameters.filterCriteria property defines filters using JSON pattern matching. Each filter’s pattern property contains a JSON object that specifies which event fields to match. Here, only events with a source field equal to “event-source” pass through to the target.

Enrich events with API destination calls

Between source and target, pipes can call an enrichment endpoint to transform or augment event data.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.pipes.Pipe("example", {
    name: "example-pipe",
    roleArn: exampleAwsIamRole.arn,
    source: source.arn,
    target: target.arn,
    enrichment: exampleAwsCloudwatchEventApiDestination.arn,
    enrichmentParameters: {
        httpParameters: {
            pathParameterValues: "example-path-param",
            headerParameters: {
                "example-header": "example-value",
                "second-example-header": "second-example-value",
            },
            queryStringParameters: {
                "example-query-string": "example-value",
                "second-example-query-string": "second-example-value",
            },
        },
    },
});
import pulumi
import pulumi_aws as aws

example = aws.pipes.Pipe("example",
    name="example-pipe",
    role_arn=example_aws_iam_role["arn"],
    source=source["arn"],
    target=target["arn"],
    enrichment=example_aws_cloudwatch_event_api_destination["arn"],
    enrichment_parameters={
        "http_parameters": {
            "path_parameter_values": "example-path-param",
            "header_parameters": {
                "example-header": "example-value",
                "second-example-header": "second-example-value",
            },
            "query_string_parameters": {
                "example-query-string": "example-value",
                "second-example-query-string": "second-example-value",
            },
        },
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/pipes"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := pipes.NewPipe(ctx, "example", &pipes.PipeArgs{
			Name:       pulumi.String("example-pipe"),
			RoleArn:    pulumi.Any(exampleAwsIamRole.Arn),
			Source:     pulumi.Any(source.Arn),
			Target:     pulumi.Any(target.Arn),
			Enrichment: pulumi.Any(exampleAwsCloudwatchEventApiDestination.Arn),
			EnrichmentParameters: &pipes.PipeEnrichmentParametersArgs{
				HttpParameters: &pipes.PipeEnrichmentParametersHttpParametersArgs{
					PathParameterValues: pulumi.String("example-path-param"),
					HeaderParameters: pulumi.StringMap{
						"example-header":        pulumi.String("example-value"),
						"second-example-header": pulumi.String("second-example-value"),
					},
					QueryStringParameters: pulumi.StringMap{
						"example-query-string":        pulumi.String("example-value"),
						"second-example-query-string": pulumi.String("second-example-value"),
					},
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.Pipes.Pipe("example", new()
    {
        Name = "example-pipe",
        RoleArn = exampleAwsIamRole.Arn,
        Source = source.Arn,
        Target = target.Arn,
        Enrichment = exampleAwsCloudwatchEventApiDestination.Arn,
        EnrichmentParameters = new Aws.Pipes.Inputs.PipeEnrichmentParametersArgs
        {
            HttpParameters = new Aws.Pipes.Inputs.PipeEnrichmentParametersHttpParametersArgs
            {
                PathParameterValues = "example-path-param",
                HeaderParameters = 
                {
                    { "example-header", "example-value" },
                    { "second-example-header", "second-example-value" },
                },
                QueryStringParameters = 
                {
                    { "example-query-string", "example-value" },
                    { "second-example-query-string", "second-example-value" },
                },
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.pipes.Pipe;
import com.pulumi.aws.pipes.PipeArgs;
import com.pulumi.aws.pipes.inputs.PipeEnrichmentParametersArgs;
import com.pulumi.aws.pipes.inputs.PipeEnrichmentParametersHttpParametersArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Pipe("example", PipeArgs.builder()
            .name("example-pipe")
            .roleArn(exampleAwsIamRole.arn())
            .source(source.arn())
            .target(target.arn())
            .enrichment(exampleAwsCloudwatchEventApiDestination.arn())
            .enrichmentParameters(PipeEnrichmentParametersArgs.builder()
                .httpParameters(PipeEnrichmentParametersHttpParametersArgs.builder()
                    .pathParameterValues("example-path-param")
                    .headerParameters(Map.ofEntries(
                        Map.entry("example-header", "example-value"),
                        Map.entry("second-example-header", "second-example-value")
                    ))
                    .queryStringParameters(Map.ofEntries(
                        Map.entry("example-query-string", "example-value"),
                        Map.entry("second-example-query-string", "second-example-value")
                    ))
                    .build())
                .build())
            .build());

    }
}
resources:
  example:
    type: aws:pipes:Pipe
    properties:
      name: example-pipe
      roleArn: ${exampleAwsIamRole.arn}
      source: ${source.arn}
      target: ${target.arn}
      enrichment: ${exampleAwsCloudwatchEventApiDestination.arn}
      enrichmentParameters:
        httpParameters:
          pathParameterValues: example-path-param
          headerParameters:
            example-header: example-value
            second-example-header: second-example-value
          queryStringParameters:
            example-query-string: example-value
            second-example-query-string: second-example-value

The enrichment property points to an API destination ARN. The enrichmentParameters.httpParameters block configures the HTTP request: pathParameterValues sets URL path segments, headerParameters adds HTTP headers, and queryStringParameters appends query strings. EventBridge calls this endpoint for each event, passing the event data and using the response as input to the target.

Send execution logs to CloudWatch Logs

For debugging and monitoring, pipes can send execution details to CloudWatch Logs, capturing what data flows through and any errors.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.cloudwatch.LogGroup("example", {name: "example-pipe-target"});
const examplePipe = new aws.pipes.Pipe("example", {
    name: "example-pipe",
    roleArn: exampleAwsIamRole.arn,
    source: sourceAwsSqsQueue.arn,
    target: targetAwsSqsQueue.arn,
    logConfiguration: {
        includeExecutionDatas: ["ALL"],
        level: "INFO",
        cloudwatchLogsLogDestination: {
            logGroupArn: targetAwsCloudwatchLogGroup.arn,
        },
    },
}, {
    dependsOn: [
        source,
        target,
    ],
});
import pulumi
import pulumi_aws as aws

example = aws.cloudwatch.LogGroup("example", name="example-pipe-target")
example_pipe = aws.pipes.Pipe("example",
    name="example-pipe",
    role_arn=example_aws_iam_role["arn"],
    source=source_aws_sqs_queue["arn"],
    target=target_aws_sqs_queue["arn"],
    log_configuration={
        "include_execution_datas": ["ALL"],
        "level": "INFO",
        "cloudwatch_logs_log_destination": {
            "log_group_arn": target_aws_cloudwatch_log_group["arn"],
        },
    },
    opts = pulumi.ResourceOptions(depends_on=[
            source,
            target,
        ]))
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/cloudwatch"
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/pipes"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := cloudwatch.NewLogGroup(ctx, "example", &cloudwatch.LogGroupArgs{
			Name: pulumi.String("example-pipe-target"),
		})
		if err != nil {
			return err
		}
		_, err = pipes.NewPipe(ctx, "example", &pipes.PipeArgs{
			Name:    pulumi.String("example-pipe"),
			RoleArn: pulumi.Any(exampleAwsIamRole.Arn),
			Source:  pulumi.Any(sourceAwsSqsQueue.Arn),
			Target:  pulumi.Any(targetAwsSqsQueue.Arn),
			LogConfiguration: &pipes.PipeLogConfigurationArgs{
				IncludeExecutionDatas: pulumi.StringArray{
					pulumi.String("ALL"),
				},
				Level: pulumi.String("INFO"),
				CloudwatchLogsLogDestination: &pipes.PipeLogConfigurationCloudwatchLogsLogDestinationArgs{
					LogGroupArn: pulumi.Any(targetAwsCloudwatchLogGroup.Arn),
				},
			},
		}, pulumi.DependsOn([]pulumi.Resource{
			source,
			target,
		}))
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.CloudWatch.LogGroup("example", new()
    {
        Name = "example-pipe-target",
    });

    var examplePipe = new Aws.Pipes.Pipe("example", new()
    {
        Name = "example-pipe",
        RoleArn = exampleAwsIamRole.Arn,
        Source = sourceAwsSqsQueue.Arn,
        Target = targetAwsSqsQueue.Arn,
        LogConfiguration = new Aws.Pipes.Inputs.PipeLogConfigurationArgs
        {
            IncludeExecutionDatas = new[]
            {
                "ALL",
            },
            Level = "INFO",
            CloudwatchLogsLogDestination = new Aws.Pipes.Inputs.PipeLogConfigurationCloudwatchLogsLogDestinationArgs
            {
                LogGroupArn = targetAwsCloudwatchLogGroup.Arn,
            },
        },
    }, new CustomResourceOptions
    {
        DependsOn =
        {
            source,
            target,
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.cloudwatch.LogGroup;
import com.pulumi.aws.cloudwatch.LogGroupArgs;
import com.pulumi.aws.pipes.Pipe;
import com.pulumi.aws.pipes.PipeArgs;
import com.pulumi.aws.pipes.inputs.PipeLogConfigurationArgs;
import com.pulumi.aws.pipes.inputs.PipeLogConfigurationCloudwatchLogsLogDestinationArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new LogGroup("example", LogGroupArgs.builder()
            .name("example-pipe-target")
            .build());

        var examplePipe = new Pipe("examplePipe", PipeArgs.builder()
            .name("example-pipe")
            .roleArn(exampleAwsIamRole.arn())
            .source(sourceAwsSqsQueue.arn())
            .target(targetAwsSqsQueue.arn())
            .logConfiguration(PipeLogConfigurationArgs.builder()
                .includeExecutionDatas("ALL")
                .level("INFO")
                .cloudwatchLogsLogDestination(PipeLogConfigurationCloudwatchLogsLogDestinationArgs.builder()
                    .logGroupArn(targetAwsCloudwatchLogGroup.arn())
                    .build())
                .build())
            .build(), CustomResourceOptions.builder()
                .dependsOn(                
                    source,
                    target)
                .build());

    }
}
resources:
  example:
    type: aws:cloudwatch:LogGroup
    properties:
      name: example-pipe-target
  examplePipe:
    type: aws:pipes:Pipe
    name: example
    properties:
      name: example-pipe
      roleArn: ${exampleAwsIamRole.arn}
      source: ${sourceAwsSqsQueue.arn}
      target: ${targetAwsSqsQueue.arn}
      logConfiguration:
        includeExecutionDatas:
          - ALL
        level: INFO
        cloudwatchLogsLogDestination:
          logGroupArn: ${targetAwsCloudwatchLogGroup.arn}
    options:
      dependsOn:
        - ${source}
        - ${target}

The logConfiguration property enables logging. The includeExecutionDatas array controls what data to log (ALL includes full event payloads). The level property sets log verbosity (INFO, ERROR, TRACE, or OFF). The cloudwatchLogsLogDestination.logGroupArn points to where logs are written. Your IAM role needs cloudwatch:PutLogEvents permission.

Tune SQS batching and message attributes

When both source and target are SQS queues, you can control batching behavior and set message attributes for FIFO queue requirements.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.pipes.Pipe("example", {
    name: "example-pipe",
    roleArn: exampleAwsIamRole.arn,
    source: source.arn,
    target: target.arn,
    sourceParameters: {
        sqsQueueParameters: {
            batchSize: 1,
            maximumBatchingWindowInSeconds: 2,
        },
    },
    targetParameters: {
        sqsQueueParameters: {
            messageDeduplicationId: "example-dedupe",
            messageGroupId: "example-group",
        },
    },
});
import pulumi
import pulumi_aws as aws

example = aws.pipes.Pipe("example",
    name="example-pipe",
    role_arn=example_aws_iam_role["arn"],
    source=source["arn"],
    target=target["arn"],
    source_parameters={
        "sqs_queue_parameters": {
            "batch_size": 1,
            "maximum_batching_window_in_seconds": 2,
        },
    },
    target_parameters={
        "sqs_queue_parameters": {
            "message_deduplication_id": "example-dedupe",
            "message_group_id": "example-group",
        },
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/pipes"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := pipes.NewPipe(ctx, "example", &pipes.PipeArgs{
			Name:    pulumi.String("example-pipe"),
			RoleArn: pulumi.Any(exampleAwsIamRole.Arn),
			Source:  pulumi.Any(source.Arn),
			Target:  pulumi.Any(target.Arn),
			SourceParameters: &pipes.PipeSourceParametersArgs{
				SqsQueueParameters: &pipes.PipeSourceParametersSqsQueueParametersArgs{
					BatchSize:                      pulumi.Int(1),
					MaximumBatchingWindowInSeconds: pulumi.Int(2),
				},
			},
			TargetParameters: &pipes.PipeTargetParametersArgs{
				SqsQueueParameters: &pipes.PipeTargetParametersSqsQueueParametersArgs{
					MessageDeduplicationId: pulumi.String("example-dedupe"),
					MessageGroupId:         pulumi.String("example-group"),
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.Pipes.Pipe("example", new()
    {
        Name = "example-pipe",
        RoleArn = exampleAwsIamRole.Arn,
        Source = source.Arn,
        Target = target.Arn,
        SourceParameters = new Aws.Pipes.Inputs.PipeSourceParametersArgs
        {
            SqsQueueParameters = new Aws.Pipes.Inputs.PipeSourceParametersSqsQueueParametersArgs
            {
                BatchSize = 1,
                MaximumBatchingWindowInSeconds = 2,
            },
        },
        TargetParameters = new Aws.Pipes.Inputs.PipeTargetParametersArgs
        {
            SqsQueueParameters = new Aws.Pipes.Inputs.PipeTargetParametersSqsQueueParametersArgs
            {
                MessageDeduplicationId = "example-dedupe",
                MessageGroupId = "example-group",
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.pipes.Pipe;
import com.pulumi.aws.pipes.PipeArgs;
import com.pulumi.aws.pipes.inputs.PipeSourceParametersArgs;
import com.pulumi.aws.pipes.inputs.PipeSourceParametersSqsQueueParametersArgs;
import com.pulumi.aws.pipes.inputs.PipeTargetParametersArgs;
import com.pulumi.aws.pipes.inputs.PipeTargetParametersSqsQueueParametersArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Pipe("example", PipeArgs.builder()
            .name("example-pipe")
            .roleArn(exampleAwsIamRole.arn())
            .source(source.arn())
            .target(target.arn())
            .sourceParameters(PipeSourceParametersArgs.builder()
                .sqsQueueParameters(PipeSourceParametersSqsQueueParametersArgs.builder()
                    .batchSize(1)
                    .maximumBatchingWindowInSeconds(2)
                    .build())
                .build())
            .targetParameters(PipeTargetParametersArgs.builder()
                .sqsQueueParameters(PipeTargetParametersSqsQueueParametersArgs.builder()
                    .messageDeduplicationId("example-dedupe")
                    .messageGroupId("example-group")
                    .build())
                .build())
            .build());

    }
}
resources:
  example:
    type: aws:pipes:Pipe
    properties:
      name: example-pipe
      roleArn: ${exampleAwsIamRole.arn}
      source: ${source.arn}
      target: ${target.arn}
      sourceParameters:
        sqsQueueParameters:
          batchSize: 1
          maximumBatchingWindowInSeconds: 2
      targetParameters:
        sqsQueueParameters:
          messageDeduplicationId: example-dedupe
          messageGroupId: example-group

The sourceParameters.sqsQueueParameters block controls how the pipe reads from SQS: batchSize sets how many messages to retrieve per poll, and maximumBatchingWindowInSeconds sets how long to wait for a full batch. The targetParameters.sqsQueueParameters block sets message attributes when writing: messageDeduplicationId prevents duplicates, and messageGroupId groups messages in FIFO queues.

Beyond these examples

These snippets focus on specific pipe-level features: IAM role configuration and permissions, event filtering and enrichment, and CloudWatch Logs integration and SQS-specific tuning. They’re intentionally minimal rather than full event-driven architectures.

The examples may reference pre-existing infrastructure such as SQS queues (source and target), CloudWatch Log Groups, and API Destinations for enrichment. They focus on configuring the pipe rather than provisioning everything around it.

To keep things focused, common pipe patterns are omitted, including:

  • KMS encryption (kmsKeyIdentifier)
  • Pipe state management (desiredState for RUNNING/STOPPED)
  • Other source types (Kinesis, DynamoDB Streams, Kafka)
  • Other target types (Step Functions, EventBridge buses, Lambda)

These omissions are intentional: the goal is to illustrate how each pipe feature is wired, not provide drop-in event processing modules. See the EventBridge Pipe resource reference for all available configuration options.

Let's configure AWS EventBridge Pipes

Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.

Try Pulumi Cloud for FREE

Frequently Asked Questions

IAM & Permissions
What IAM permissions does my pipe need?
Your pipe’s IAM role needs permissions for both source and target resources. For example, an SQS source requires sqs:ReceiveMessage, sqs:GetQueueAttributes, and sqs:DeleteMessage, while an SQS target requires sqs:SendMessage. The role must also have a trust policy allowing pipes.amazonaws.com to assume it.
Why should I use dependsOn with IAM policies?
IAM policies must be attached to the role before creating the pipe. Use dependsOn to ensure policy resources are created first, preventing pipe creation failures due to missing permissions.
Configuration & Setup
What's the difference between name and namePrefix?
Use name to specify an exact pipe name, or namePrefix to auto-generate a unique name with your prefix. These properties are mutually exclusive—you can only use one.
What properties can't I change after creating my pipe?
The name and source properties are immutable. Changing either requires replacing the entire pipe resource.
How do I connect to a self-managed Kafka cluster?
For self-managed Kafka, use smk:// followed by the bootstrap server’s address as the source value, instead of an ARN.
How do I control whether my pipe is running?
Set desiredState to RUNNING to start the pipe or STOPPED to pause it.
Enrichment & Filtering
How do I filter which events my pipe processes?
Configure sourceParameters.filterCriteria.filters with a pattern matching the events you want to process.
How do I transform data between source and target?
Set the enrichment property to an enrichment resource ARN (like an API Destination) and configure enrichmentParameters with HTTP parameters for path, headers, and query strings.
Logging & Monitoring
How do I enable CloudWatch logging for my pipe?
Configure logConfiguration with a log level (e.g., INFO), includeExecutionDatas (e.g., ALL), and cloudwatchLogsLogDestination pointing to your log group ARN.
Advanced Configuration
How do I configure SQS batch processing?
Use sourceParameters.sqsQueueParameters to set batchSize and maximumBatchingWindowInSeconds for source batching, and targetParameters.sqsQueueParameters for target-specific settings like messageDeduplicationId and messageGroupId.
How is my pipe data encrypted?
By default, EventBridge uses an AWS owned key to encrypt pipe data. To use your own key, specify kmsKeyIdentifier with your customer managed KMS key ARN, key ID, or alias.
Is EventBridge Pipes the same as CloudWatch Events?
EventBridge was formerly known as CloudWatch Events, but the functionality is identical.

Using a different cloud?

Explore integration guides for other cloud providers: