The aws:pipes/pipe:Pipe resource, part of the Pulumi AWS provider, defines an EventBridge Pipe that connects event sources to targets with optional enrichment and filtering. This guide focuses on four capabilities: IAM role configuration, event enrichment, pattern-based filtering, and CloudWatch Logs integration.
EventBridge Pipes connect existing event sources like SQS queues, DynamoDB Streams, and Kinesis streams to targets such as other SQS queues, Lambda functions, or Step Functions. The examples are intentionally small. Combine them with your own event sources, targets, and IAM policies.
Connect SQS queues with IAM permissions
Most pipes start by connecting a source to a target with the necessary IAM permissions for reading and writing.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const main = aws.getCallerIdentity({});
const example = new aws.iam.Role("example", {assumeRolePolicy: JSON.stringify({
Version: "2012-10-17",
Statement: {
Effect: "Allow",
Action: "sts:AssumeRole",
Principal: {
Service: "pipes.amazonaws.com",
},
Condition: {
StringEquals: {
"aws:SourceAccount": main.then(main => main.accountId),
},
},
},
})});
const sourceQueue = new aws.sqs.Queue("source", {});
const source = new aws.iam.RolePolicy("source", {
role: example.id,
policy: pulumi.jsonStringify({
Version: "2012-10-17",
Statement: [{
Effect: "Allow",
Action: [
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:ReceiveMessage",
],
Resource: [sourceQueue.arn],
}],
}),
});
const targetQueue = new aws.sqs.Queue("target", {});
const target = new aws.iam.RolePolicy("target", {
role: example.id,
policy: pulumi.jsonStringify({
Version: "2012-10-17",
Statement: [{
Effect: "Allow",
Action: ["sqs:SendMessage"],
Resource: [targetQueue.arn],
}],
}),
});
const examplePipe = new aws.pipes.Pipe("example", {
name: "example-pipe",
roleArn: example.arn,
source: sourceQueue.arn,
target: targetQueue.arn,
}, {
dependsOn: [
source,
target,
],
});
import pulumi
import json
import pulumi_aws as aws
main = aws.get_caller_identity()
example = aws.iam.Role("example", assume_role_policy=json.dumps({
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Action": "sts:AssumeRole",
"Principal": {
"Service": "pipes.amazonaws.com",
},
"Condition": {
"StringEquals": {
"aws:SourceAccount": main.account_id,
},
},
},
}))
source_queue = aws.sqs.Queue("source")
source = aws.iam.RolePolicy("source",
role=example.id,
policy=pulumi.Output.json_dumps({
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": [
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:ReceiveMessage",
],
"Resource": [source_queue.arn],
}],
}))
target_queue = aws.sqs.Queue("target")
target = aws.iam.RolePolicy("target",
role=example.id,
policy=pulumi.Output.json_dumps({
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": ["sqs:SendMessage"],
"Resource": [target_queue.arn],
}],
}))
example_pipe = aws.pipes.Pipe("example",
name="example-pipe",
role_arn=example.arn,
source=source_queue.arn,
target=target_queue.arn,
opts = pulumi.ResourceOptions(depends_on=[
source,
target,
]))
package main
import (
"encoding/json"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/iam"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/pipes"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/sqs"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
main, err := aws.GetCallerIdentity(ctx, &aws.GetCallerIdentityArgs{}, nil)
if err != nil {
return err
}
tmpJSON0, err := json.Marshal(map[string]interface{}{
"Version": "2012-10-17",
"Statement": map[string]interface{}{
"Effect": "Allow",
"Action": "sts:AssumeRole",
"Principal": map[string]interface{}{
"Service": "pipes.amazonaws.com",
},
"Condition": map[string]interface{}{
"StringEquals": map[string]interface{}{
"aws:SourceAccount": main.AccountId,
},
},
},
})
if err != nil {
return err
}
json0 := string(tmpJSON0)
example, err := iam.NewRole(ctx, "example", &iam.RoleArgs{
AssumeRolePolicy: pulumi.String(json0),
})
if err != nil {
return err
}
sourceQueue, err := sqs.NewQueue(ctx, "source", nil)
if err != nil {
return err
}
source, err := iam.NewRolePolicy(ctx, "source", &iam.RolePolicyArgs{
Role: example.ID(),
Policy: sourceQueue.Arn.ApplyT(func(arn string) (pulumi.String, error) {
var _zero pulumi.String
tmpJSON1, err := json.Marshal(map[string]interface{}{
"Version": "2012-10-17",
"Statement": []map[string]interface{}{
map[string]interface{}{
"Effect": "Allow",
"Action": []string{
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:ReceiveMessage",
},
"Resource": []string{
arn,
},
},
},
})
if err != nil {
return _zero, err
}
json1 := string(tmpJSON1)
return pulumi.String(json1), nil
}).(pulumi.StringOutput),
})
if err != nil {
return err
}
targetQueue, err := sqs.NewQueue(ctx, "target", nil)
if err != nil {
return err
}
target, err := iam.NewRolePolicy(ctx, "target", &iam.RolePolicyArgs{
Role: example.ID(),
Policy: targetQueue.Arn.ApplyT(func(arn string) (pulumi.String, error) {
var _zero pulumi.String
tmpJSON2, err := json.Marshal(map[string]interface{}{
"Version": "2012-10-17",
"Statement": []map[string]interface{}{
map[string]interface{}{
"Effect": "Allow",
"Action": []string{
"sqs:SendMessage",
},
"Resource": []string{
arn,
},
},
},
})
if err != nil {
return _zero, err
}
json2 := string(tmpJSON2)
return pulumi.String(json2), nil
}).(pulumi.StringOutput),
})
if err != nil {
return err
}
_, err = pipes.NewPipe(ctx, "example", &pipes.PipeArgs{
Name: pulumi.String("example-pipe"),
RoleArn: example.Arn,
Source: sourceQueue.Arn,
Target: targetQueue.Arn,
}, pulumi.DependsOn([]pulumi.Resource{
source,
target,
}))
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var main = Aws.GetCallerIdentity.Invoke();
var example = new Aws.Iam.Role("example", new()
{
AssumeRolePolicy = JsonSerializer.Serialize(new Dictionary<string, object?>
{
["Version"] = "2012-10-17",
["Statement"] = new Dictionary<string, object?>
{
["Effect"] = "Allow",
["Action"] = "sts:AssumeRole",
["Principal"] = new Dictionary<string, object?>
{
["Service"] = "pipes.amazonaws.com",
},
["Condition"] = new Dictionary<string, object?>
{
["StringEquals"] = new Dictionary<string, object?>
{
["aws:SourceAccount"] = main.Apply(getCallerIdentityResult => getCallerIdentityResult.AccountId),
},
},
},
}),
});
var sourceQueue = new Aws.Sqs.Queue("source");
var source = new Aws.Iam.RolePolicy("source", new()
{
Role = example.Id,
Policy = Output.JsonSerialize(Output.Create(new Dictionary<string, object?>
{
["Version"] = "2012-10-17",
["Statement"] = new[]
{
new Dictionary<string, object?>
{
["Effect"] = "Allow",
["Action"] = new[]
{
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:ReceiveMessage",
},
["Resource"] = new[]
{
sourceQueue.Arn,
},
},
},
})),
});
var targetQueue = new Aws.Sqs.Queue("target");
var target = new Aws.Iam.RolePolicy("target", new()
{
Role = example.Id,
Policy = Output.JsonSerialize(Output.Create(new Dictionary<string, object?>
{
["Version"] = "2012-10-17",
["Statement"] = new[]
{
new Dictionary<string, object?>
{
["Effect"] = "Allow",
["Action"] = new[]
{
"sqs:SendMessage",
},
["Resource"] = new[]
{
targetQueue.Arn,
},
},
},
})),
});
var examplePipe = new Aws.Pipes.Pipe("example", new()
{
Name = "example-pipe",
RoleArn = example.Arn,
Source = sourceQueue.Arn,
Target = targetQueue.Arn,
}, new CustomResourceOptions
{
DependsOn =
{
source,
target,
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.AwsFunctions;
import com.pulumi.aws.inputs.GetCallerIdentityArgs;
import com.pulumi.aws.iam.Role;
import com.pulumi.aws.iam.RoleArgs;
import com.pulumi.aws.sqs.Queue;
import com.pulumi.aws.iam.RolePolicy;
import com.pulumi.aws.iam.RolePolicyArgs;
import com.pulumi.aws.pipes.Pipe;
import com.pulumi.aws.pipes.PipeArgs;
import static com.pulumi.codegen.internal.Serialization.*;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
final var main = AwsFunctions.getCallerIdentity(GetCallerIdentityArgs.builder()
.build());
var example = new Role("example", RoleArgs.builder()
.assumeRolePolicy(serializeJson(
jsonObject(
jsonProperty("Version", "2012-10-17"),
jsonProperty("Statement", jsonObject(
jsonProperty("Effect", "Allow"),
jsonProperty("Action", "sts:AssumeRole"),
jsonProperty("Principal", jsonObject(
jsonProperty("Service", "pipes.amazonaws.com")
)),
jsonProperty("Condition", jsonObject(
jsonProperty("StringEquals", jsonObject(
jsonProperty("aws:SourceAccount", main.accountId())
))
))
))
)))
.build());
var sourceQueue = new Queue("sourceQueue");
var source = new RolePolicy("source", RolePolicyArgs.builder()
.role(example.id())
.policy(sourceQueue.arn().applyValue(_arn -> serializeJson(
jsonObject(
jsonProperty("Version", "2012-10-17"),
jsonProperty("Statement", jsonArray(jsonObject(
jsonProperty("Effect", "Allow"),
jsonProperty("Action", jsonArray(
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:ReceiveMessage"
)),
jsonProperty("Resource", jsonArray(_arn))
)))
))))
.build());
var targetQueue = new Queue("targetQueue");
var target = new RolePolicy("target", RolePolicyArgs.builder()
.role(example.id())
.policy(targetQueue.arn().applyValue(_arn -> serializeJson(
jsonObject(
jsonProperty("Version", "2012-10-17"),
jsonProperty("Statement", jsonArray(jsonObject(
jsonProperty("Effect", "Allow"),
jsonProperty("Action", jsonArray("sqs:SendMessage")),
jsonProperty("Resource", jsonArray(_arn))
)))
))))
.build());
var examplePipe = new Pipe("examplePipe", PipeArgs.builder()
.name("example-pipe")
.roleArn(example.arn())
.source(sourceQueue.arn())
.target(targetQueue.arn())
.build(), CustomResourceOptions.builder()
.dependsOn(
source,
target)
.build());
}
}
resources:
example:
type: aws:iam:Role
properties:
assumeRolePolicy:
fn::toJSON:
Version: 2012-10-17
Statement:
Effect: Allow
Action: sts:AssumeRole
Principal:
Service: pipes.amazonaws.com
Condition:
StringEquals:
aws:SourceAccount: ${main.accountId}
source:
type: aws:iam:RolePolicy
properties:
role: ${example.id}
policy:
fn::toJSON:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- sqs:DeleteMessage
- sqs:GetQueueAttributes
- sqs:ReceiveMessage
Resource:
- ${sourceQueue.arn}
sourceQueue:
type: aws:sqs:Queue
name: source
target:
type: aws:iam:RolePolicy
properties:
role: ${example.id}
policy:
fn::toJSON:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- sqs:SendMessage
Resource:
- ${targetQueue.arn}
targetQueue:
type: aws:sqs:Queue
name: target
examplePipe:
type: aws:pipes:Pipe
name: example
properties:
name: example-pipe
roleArn: ${example.arn}
source: ${sourceQueue.arn}
target: ${targetQueue.arn}
options:
dependsOn:
- ${source}
- ${target}
variables:
main:
fn::invoke:
function: aws:getCallerIdentity
arguments: {}
The pipe reads messages from the source queue and delivers them to the target queue. The roleArn grants permissions to both read from the source (sqs:ReceiveMessage, sqs:DeleteMessage) and write to the target (sqs:SendMessage). The dependsOn ensures IAM policies are attached before the pipe starts processing.
Enrich events with API Destination calls
Applications often need to augment events with data from external APIs before delivering them to targets.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const example = new aws.pipes.Pipe("example", {
name: "example-pipe",
roleArn: exampleAwsIamRole.arn,
source: source.arn,
target: target.arn,
enrichment: exampleAwsCloudwatchEventApiDestination.arn,
enrichmentParameters: {
httpParameters: {
pathParameterValues: "example-path-param",
headerParameters: {
"example-header": "example-value",
"second-example-header": "second-example-value",
},
queryStringParameters: {
"example-query-string": "example-value",
"second-example-query-string": "second-example-value",
},
},
},
});
import pulumi
import pulumi_aws as aws
example = aws.pipes.Pipe("example",
name="example-pipe",
role_arn=example_aws_iam_role["arn"],
source=source["arn"],
target=target["arn"],
enrichment=example_aws_cloudwatch_event_api_destination["arn"],
enrichment_parameters={
"http_parameters": {
"path_parameter_values": "example-path-param",
"header_parameters": {
"example-header": "example-value",
"second-example-header": "second-example-value",
},
"query_string_parameters": {
"example-query-string": "example-value",
"second-example-query-string": "second-example-value",
},
},
})
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/pipes"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := pipes.NewPipe(ctx, "example", &pipes.PipeArgs{
Name: pulumi.String("example-pipe"),
RoleArn: pulumi.Any(exampleAwsIamRole.Arn),
Source: pulumi.Any(source.Arn),
Target: pulumi.Any(target.Arn),
Enrichment: pulumi.Any(exampleAwsCloudwatchEventApiDestination.Arn),
EnrichmentParameters: &pipes.PipeEnrichmentParametersArgs{
HttpParameters: &pipes.PipeEnrichmentParametersHttpParametersArgs{
PathParameterValues: pulumi.String("example-path-param"),
HeaderParameters: pulumi.StringMap{
"example-header": pulumi.String("example-value"),
"second-example-header": pulumi.String("second-example-value"),
},
QueryStringParameters: pulumi.StringMap{
"example-query-string": pulumi.String("example-value"),
"second-example-query-string": pulumi.String("second-example-value"),
},
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var example = new Aws.Pipes.Pipe("example", new()
{
Name = "example-pipe",
RoleArn = exampleAwsIamRole.Arn,
Source = source.Arn,
Target = target.Arn,
Enrichment = exampleAwsCloudwatchEventApiDestination.Arn,
EnrichmentParameters = new Aws.Pipes.Inputs.PipeEnrichmentParametersArgs
{
HttpParameters = new Aws.Pipes.Inputs.PipeEnrichmentParametersHttpParametersArgs
{
PathParameterValues = "example-path-param",
HeaderParameters =
{
{ "example-header", "example-value" },
{ "second-example-header", "second-example-value" },
},
QueryStringParameters =
{
{ "example-query-string", "example-value" },
{ "second-example-query-string", "second-example-value" },
},
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.pipes.Pipe;
import com.pulumi.aws.pipes.PipeArgs;
import com.pulumi.aws.pipes.inputs.PipeEnrichmentParametersArgs;
import com.pulumi.aws.pipes.inputs.PipeEnrichmentParametersHttpParametersArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new Pipe("example", PipeArgs.builder()
.name("example-pipe")
.roleArn(exampleAwsIamRole.arn())
.source(source.arn())
.target(target.arn())
.enrichment(exampleAwsCloudwatchEventApiDestination.arn())
.enrichmentParameters(PipeEnrichmentParametersArgs.builder()
.httpParameters(PipeEnrichmentParametersHttpParametersArgs.builder()
.pathParameterValues("example-path-param")
.headerParameters(Map.ofEntries(
Map.entry("example-header", "example-value"),
Map.entry("second-example-header", "second-example-value")
))
.queryStringParameters(Map.ofEntries(
Map.entry("example-query-string", "example-value"),
Map.entry("second-example-query-string", "second-example-value")
))
.build())
.build())
.build());
}
}
resources:
example:
type: aws:pipes:Pipe
properties:
name: example-pipe
roleArn: ${exampleAwsIamRole.arn}
source: ${source.arn}
target: ${target.arn}
enrichment: ${exampleAwsCloudwatchEventApiDestination.arn}
enrichmentParameters:
httpParameters:
pathParameterValues: example-path-param
headerParameters:
example-header: example-value
second-example-header: second-example-value
queryStringParameters:
example-query-string: example-value
second-example-query-string: second-example-value
The enrichment property points to an API Destination that receives each event. The enrichmentParameters configure HTTP headers, path parameters, and query strings that are sent with each request. The pipe waits for the API response, then forwards the enriched payload to the target.
Filter events with pattern matching
Not all events from a source need to reach the target. Filters reduce processing costs by selecting only matching events.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const example = new aws.pipes.Pipe("example", {
name: "example-pipe",
roleArn: exampleAwsIamRole.arn,
source: source.arn,
target: target.arn,
sourceParameters: {
filterCriteria: {
filters: [{
pattern: JSON.stringify({
source: ["event-source"],
}),
}],
},
},
});
import pulumi
import json
import pulumi_aws as aws
example = aws.pipes.Pipe("example",
name="example-pipe",
role_arn=example_aws_iam_role["arn"],
source=source["arn"],
target=target["arn"],
source_parameters={
"filter_criteria": {
"filters": [{
"pattern": json.dumps({
"source": ["event-source"],
}),
}],
},
})
package main
import (
"encoding/json"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/pipes"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
tmpJSON0, err := json.Marshal(map[string]interface{}{
"source": []string{
"event-source",
},
})
if err != nil {
return err
}
json0 := string(tmpJSON0)
_, err = pipes.NewPipe(ctx, "example", &pipes.PipeArgs{
Name: pulumi.String("example-pipe"),
RoleArn: pulumi.Any(exampleAwsIamRole.Arn),
Source: pulumi.Any(source.Arn),
Target: pulumi.Any(target.Arn),
SourceParameters: &pipes.PipeSourceParametersArgs{
FilterCriteria: &pipes.PipeSourceParametersFilterCriteriaArgs{
Filters: pipes.PipeSourceParametersFilterCriteriaFilterArray{
&pipes.PipeSourceParametersFilterCriteriaFilterArgs{
Pattern: pulumi.String(json0),
},
},
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var example = new Aws.Pipes.Pipe("example", new()
{
Name = "example-pipe",
RoleArn = exampleAwsIamRole.Arn,
Source = source.Arn,
Target = target.Arn,
SourceParameters = new Aws.Pipes.Inputs.PipeSourceParametersArgs
{
FilterCriteria = new Aws.Pipes.Inputs.PipeSourceParametersFilterCriteriaArgs
{
Filters = new[]
{
new Aws.Pipes.Inputs.PipeSourceParametersFilterCriteriaFilterArgs
{
Pattern = JsonSerializer.Serialize(new Dictionary<string, object?>
{
["source"] = new[]
{
"event-source",
},
}),
},
},
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.pipes.Pipe;
import com.pulumi.aws.pipes.PipeArgs;
import com.pulumi.aws.pipes.inputs.PipeSourceParametersArgs;
import com.pulumi.aws.pipes.inputs.PipeSourceParametersFilterCriteriaArgs;
import static com.pulumi.codegen.internal.Serialization.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new Pipe("example", PipeArgs.builder()
.name("example-pipe")
.roleArn(exampleAwsIamRole.arn())
.source(source.arn())
.target(target.arn())
.sourceParameters(PipeSourceParametersArgs.builder()
.filterCriteria(PipeSourceParametersFilterCriteriaArgs.builder()
.filters(PipeSourceParametersFilterCriteriaFilterArgs.builder()
.pattern(serializeJson(
jsonObject(
jsonProperty("source", jsonArray("event-source"))
)))
.build())
.build())
.build())
.build());
}
}
resources:
example:
type: aws:pipes:Pipe
properties:
name: example-pipe
roleArn: ${exampleAwsIamRole.arn}
source: ${source.arn}
target: ${target.arn}
sourceParameters:
filterCriteria:
filters:
- pattern:
fn::toJSON:
source:
- event-source
The filterCriteria property defines JSON patterns that events must match. In this configuration, only events with a source field equal to “event-source” pass through. Events that don’t match are dropped before reaching the target.
Send execution logs to CloudWatch
Debugging pipes requires visibility into what events are processed and how they’re transformed.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const example = new aws.cloudwatch.LogGroup("example", {name: "example-pipe-target"});
const examplePipe = new aws.pipes.Pipe("example", {
name: "example-pipe",
roleArn: exampleAwsIamRole.arn,
source: sourceAwsSqsQueue.arn,
target: targetAwsSqsQueue.arn,
logConfiguration: {
includeExecutionDatas: ["ALL"],
level: "INFO",
cloudwatchLogsLogDestination: {
logGroupArn: targetAwsCloudwatchLogGroup.arn,
},
},
}, {
dependsOn: [
source,
target,
],
});
import pulumi
import pulumi_aws as aws
example = aws.cloudwatch.LogGroup("example", name="example-pipe-target")
example_pipe = aws.pipes.Pipe("example",
name="example-pipe",
role_arn=example_aws_iam_role["arn"],
source=source_aws_sqs_queue["arn"],
target=target_aws_sqs_queue["arn"],
log_configuration={
"include_execution_datas": ["ALL"],
"level": "INFO",
"cloudwatch_logs_log_destination": {
"log_group_arn": target_aws_cloudwatch_log_group["arn"],
},
},
opts = pulumi.ResourceOptions(depends_on=[
source,
target,
]))
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/cloudwatch"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/pipes"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := cloudwatch.NewLogGroup(ctx, "example", &cloudwatch.LogGroupArgs{
Name: pulumi.String("example-pipe-target"),
})
if err != nil {
return err
}
_, err = pipes.NewPipe(ctx, "example", &pipes.PipeArgs{
Name: pulumi.String("example-pipe"),
RoleArn: pulumi.Any(exampleAwsIamRole.Arn),
Source: pulumi.Any(sourceAwsSqsQueue.Arn),
Target: pulumi.Any(targetAwsSqsQueue.Arn),
LogConfiguration: &pipes.PipeLogConfigurationArgs{
IncludeExecutionDatas: pulumi.StringArray{
pulumi.String("ALL"),
},
Level: pulumi.String("INFO"),
CloudwatchLogsLogDestination: &pipes.PipeLogConfigurationCloudwatchLogsLogDestinationArgs{
LogGroupArn: pulumi.Any(targetAwsCloudwatchLogGroup.Arn),
},
},
}, pulumi.DependsOn([]pulumi.Resource{
source,
target,
}))
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var example = new Aws.CloudWatch.LogGroup("example", new()
{
Name = "example-pipe-target",
});
var examplePipe = new Aws.Pipes.Pipe("example", new()
{
Name = "example-pipe",
RoleArn = exampleAwsIamRole.Arn,
Source = sourceAwsSqsQueue.Arn,
Target = targetAwsSqsQueue.Arn,
LogConfiguration = new Aws.Pipes.Inputs.PipeLogConfigurationArgs
{
IncludeExecutionDatas = new[]
{
"ALL",
},
Level = "INFO",
CloudwatchLogsLogDestination = new Aws.Pipes.Inputs.PipeLogConfigurationCloudwatchLogsLogDestinationArgs
{
LogGroupArn = targetAwsCloudwatchLogGroup.Arn,
},
},
}, new CustomResourceOptions
{
DependsOn =
{
source,
target,
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.cloudwatch.LogGroup;
import com.pulumi.aws.cloudwatch.LogGroupArgs;
import com.pulumi.aws.pipes.Pipe;
import com.pulumi.aws.pipes.PipeArgs;
import com.pulumi.aws.pipes.inputs.PipeLogConfigurationArgs;
import com.pulumi.aws.pipes.inputs.PipeLogConfigurationCloudwatchLogsLogDestinationArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new LogGroup("example", LogGroupArgs.builder()
.name("example-pipe-target")
.build());
var examplePipe = new Pipe("examplePipe", PipeArgs.builder()
.name("example-pipe")
.roleArn(exampleAwsIamRole.arn())
.source(sourceAwsSqsQueue.arn())
.target(targetAwsSqsQueue.arn())
.logConfiguration(PipeLogConfigurationArgs.builder()
.includeExecutionDatas("ALL")
.level("INFO")
.cloudwatchLogsLogDestination(PipeLogConfigurationCloudwatchLogsLogDestinationArgs.builder()
.logGroupArn(targetAwsCloudwatchLogGroup.arn())
.build())
.build())
.build(), CustomResourceOptions.builder()
.dependsOn(
source,
target)
.build());
}
}
resources:
example:
type: aws:cloudwatch:LogGroup
properties:
name: example-pipe-target
examplePipe:
type: aws:pipes:Pipe
name: example
properties:
name: example-pipe
roleArn: ${exampleAwsIamRole.arn}
source: ${sourceAwsSqsQueue.arn}
target: ${targetAwsSqsQueue.arn}
logConfiguration:
includeExecutionDatas:
- ALL
level: INFO
cloudwatchLogsLogDestination:
logGroupArn: ${targetAwsCloudwatchLogGroup.arn}
options:
dependsOn:
- ${source}
- ${target}
The logConfiguration property sends execution details to a CloudWatch Log Group. The includeExecutionDatas field controls what data is logged (ALL includes input, output, and errors). The level property sets the verbosity (INFO, ERROR, TRACE).
Tune SQS batching and message attributes
SQS sources and targets support parameters that control batching and FIFO queue attributes.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const example = new aws.pipes.Pipe("example", {
name: "example-pipe",
roleArn: exampleAwsIamRole.arn,
source: source.arn,
target: target.arn,
sourceParameters: {
sqsQueueParameters: {
batchSize: 1,
maximumBatchingWindowInSeconds: 2,
},
},
targetParameters: {
sqsQueueParameters: {
messageDeduplicationId: "example-dedupe",
messageGroupId: "example-group",
},
},
});
import pulumi
import pulumi_aws as aws
example = aws.pipes.Pipe("example",
name="example-pipe",
role_arn=example_aws_iam_role["arn"],
source=source["arn"],
target=target["arn"],
source_parameters={
"sqs_queue_parameters": {
"batch_size": 1,
"maximum_batching_window_in_seconds": 2,
},
},
target_parameters={
"sqs_queue_parameters": {
"message_deduplication_id": "example-dedupe",
"message_group_id": "example-group",
},
})
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/pipes"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := pipes.NewPipe(ctx, "example", &pipes.PipeArgs{
Name: pulumi.String("example-pipe"),
RoleArn: pulumi.Any(exampleAwsIamRole.Arn),
Source: pulumi.Any(source.Arn),
Target: pulumi.Any(target.Arn),
SourceParameters: &pipes.PipeSourceParametersArgs{
SqsQueueParameters: &pipes.PipeSourceParametersSqsQueueParametersArgs{
BatchSize: pulumi.Int(1),
MaximumBatchingWindowInSeconds: pulumi.Int(2),
},
},
TargetParameters: &pipes.PipeTargetParametersArgs{
SqsQueueParameters: &pipes.PipeTargetParametersSqsQueueParametersArgs{
MessageDeduplicationId: pulumi.String("example-dedupe"),
MessageGroupId: pulumi.String("example-group"),
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var example = new Aws.Pipes.Pipe("example", new()
{
Name = "example-pipe",
RoleArn = exampleAwsIamRole.Arn,
Source = source.Arn,
Target = target.Arn,
SourceParameters = new Aws.Pipes.Inputs.PipeSourceParametersArgs
{
SqsQueueParameters = new Aws.Pipes.Inputs.PipeSourceParametersSqsQueueParametersArgs
{
BatchSize = 1,
MaximumBatchingWindowInSeconds = 2,
},
},
TargetParameters = new Aws.Pipes.Inputs.PipeTargetParametersArgs
{
SqsQueueParameters = new Aws.Pipes.Inputs.PipeTargetParametersSqsQueueParametersArgs
{
MessageDeduplicationId = "example-dedupe",
MessageGroupId = "example-group",
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.pipes.Pipe;
import com.pulumi.aws.pipes.PipeArgs;
import com.pulumi.aws.pipes.inputs.PipeSourceParametersArgs;
import com.pulumi.aws.pipes.inputs.PipeSourceParametersSqsQueueParametersArgs;
import com.pulumi.aws.pipes.inputs.PipeTargetParametersArgs;
import com.pulumi.aws.pipes.inputs.PipeTargetParametersSqsQueueParametersArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new Pipe("example", PipeArgs.builder()
.name("example-pipe")
.roleArn(exampleAwsIamRole.arn())
.source(source.arn())
.target(target.arn())
.sourceParameters(PipeSourceParametersArgs.builder()
.sqsQueueParameters(PipeSourceParametersSqsQueueParametersArgs.builder()
.batchSize(1)
.maximumBatchingWindowInSeconds(2)
.build())
.build())
.targetParameters(PipeTargetParametersArgs.builder()
.sqsQueueParameters(PipeTargetParametersSqsQueueParametersArgs.builder()
.messageDeduplicationId("example-dedupe")
.messageGroupId("example-group")
.build())
.build())
.build());
}
}
resources:
example:
type: aws:pipes:Pipe
properties:
name: example-pipe
roleArn: ${exampleAwsIamRole.arn}
source: ${source.arn}
target: ${target.arn}
sourceParameters:
sqsQueueParameters:
batchSize: 1
maximumBatchingWindowInSeconds: 2
targetParameters:
sqsQueueParameters:
messageDeduplicationId: example-dedupe
messageGroupId: example-group
The sourceParameters configure how the pipe reads from SQS: batchSize controls how many messages are retrieved at once, and maximumBatchingWindowInSeconds sets how long to wait for a full batch. The targetParameters set FIFO-specific attributes like messageDeduplicationId and messageGroupId for the destination queue.
Beyond these examples
These snippets focus on specific pipe-level features: source-to-target connections with IAM, enrichment and filtering, and CloudWatch Logs integration and SQS-specific tuning. They’re intentionally minimal rather than full event processing pipelines.
The examples reference pre-existing infrastructure such as SQS queues, API Destinations, CloudWatch Log Groups, and IAM roles with appropriate trust policies and permissions. They focus on configuring the pipe rather than provisioning everything around it.
To keep things focused, common pipe patterns are omitted, including:
- KMS encryption (kmsKeyIdentifier)
- Pipe state management (desiredState)
- DynamoDB Streams, Kinesis, and Kafka source configurations
- Target-specific parameters for services beyond SQS
These omissions are intentional: the goal is to illustrate how each pipe feature is wired, not provide drop-in event processing modules. See the EventBridge Pipe resource reference for all available configuration options.
Let's configure AWS EventBridge Pipes
Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.
Try Pulumi Cloud for FREEFrequently Asked Questions
IAM & Permissions
sqs:ReceiveMessage, sqs:DeleteMessage, and sqs:GetQueueAttributes, while an SQS target requires sqs:SendMessage. Use dependsOn to ensure role policies are attached before creating the pipe.Configuration & Immutability
name, namePrefix, and source properties are immutable. Changing any of these requires replacing the entire pipe resource.smk:// followed by the bootstrap server’s address instead of an ARN for the source parameter.Event Processing
sourceParameters.filterCriteria.filters with a pattern matching the events you want to process. The pattern uses JSON format to specify filtering criteria.enrichment parameter to an ARN (such as an API destination), and configure enrichmentParameters.httpParameters with path, header, or query string parameters as needed.Source Configuration
sourceParameters.sqsQueueParameters to set batchSize and maximumBatchingWindowInSeconds for controlling how events are read from the source queue.Logging & Monitoring
logConfiguration with includeExecutionDatas (e.g., ["ALL"]), level (e.g., "INFO"), and cloudwatchLogsLogDestination pointing to a CloudWatch log group ARN.Using a different cloud?
Explore integration guides for other cloud providers: