The aws:pipes/pipe:Pipe resource, part of the Pulumi AWS provider, defines an EventBridge Pipe that connects an event source to a target, optionally filtering and enriching events in transit. This guide focuses on five capabilities: IAM role setup, event filtering, enrichment via API destinations, CloudWatch Logs integration, and SQS-specific configuration.
Pipes require an IAM execution role with permissions to read from the source and write to the target. They may also reference CloudWatch Log Groups or API Destinations for enrichment. The examples are intentionally small. Combine them with your own IAM policies, queues, and monitoring infrastructure.
Connect SQS queues with IAM permissions
Most deployments start by connecting a source queue to a target queue, establishing the IAM role that grants EventBridge Pipes permission to read and write.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const main = aws.getCallerIdentity({});
const example = new aws.iam.Role("example", {assumeRolePolicy: JSON.stringify({
Version: "2012-10-17",
Statement: {
Effect: "Allow",
Action: "sts:AssumeRole",
Principal: {
Service: "pipes.amazonaws.com",
},
Condition: {
StringEquals: {
"aws:SourceAccount": main.then(main => main.accountId),
},
},
},
})});
const sourceQueue = new aws.sqs.Queue("source", {});
const source = new aws.iam.RolePolicy("source", {
role: example.id,
policy: pulumi.jsonStringify({
Version: "2012-10-17",
Statement: [{
Effect: "Allow",
Action: [
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:ReceiveMessage",
],
Resource: [sourceQueue.arn],
}],
}),
});
const targetQueue = new aws.sqs.Queue("target", {});
const target = new aws.iam.RolePolicy("target", {
role: example.id,
policy: pulumi.jsonStringify({
Version: "2012-10-17",
Statement: [{
Effect: "Allow",
Action: ["sqs:SendMessage"],
Resource: [targetQueue.arn],
}],
}),
});
const examplePipe = new aws.pipes.Pipe("example", {
name: "example-pipe",
roleArn: example.arn,
source: sourceQueue.arn,
target: targetQueue.arn,
}, {
dependsOn: [
source,
target,
],
});
import pulumi
import json
import pulumi_aws as aws
main = aws.get_caller_identity()
example = aws.iam.Role("example", assume_role_policy=json.dumps({
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Action": "sts:AssumeRole",
"Principal": {
"Service": "pipes.amazonaws.com",
},
"Condition": {
"StringEquals": {
"aws:SourceAccount": main.account_id,
},
},
},
}))
source_queue = aws.sqs.Queue("source")
source = aws.iam.RolePolicy("source",
role=example.id,
policy=pulumi.Output.json_dumps({
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": [
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:ReceiveMessage",
],
"Resource": [source_queue.arn],
}],
}))
target_queue = aws.sqs.Queue("target")
target = aws.iam.RolePolicy("target",
role=example.id,
policy=pulumi.Output.json_dumps({
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": ["sqs:SendMessage"],
"Resource": [target_queue.arn],
}],
}))
example_pipe = aws.pipes.Pipe("example",
name="example-pipe",
role_arn=example.arn,
source=source_queue.arn,
target=target_queue.arn,
opts = pulumi.ResourceOptions(depends_on=[
source,
target,
]))
package main
import (
"encoding/json"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/iam"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/pipes"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/sqs"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
main, err := aws.GetCallerIdentity(ctx, &aws.GetCallerIdentityArgs{}, nil)
if err != nil {
return err
}
tmpJSON0, err := json.Marshal(map[string]interface{}{
"Version": "2012-10-17",
"Statement": map[string]interface{}{
"Effect": "Allow",
"Action": "sts:AssumeRole",
"Principal": map[string]interface{}{
"Service": "pipes.amazonaws.com",
},
"Condition": map[string]interface{}{
"StringEquals": map[string]interface{}{
"aws:SourceAccount": main.AccountId,
},
},
},
})
if err != nil {
return err
}
json0 := string(tmpJSON0)
example, err := iam.NewRole(ctx, "example", &iam.RoleArgs{
AssumeRolePolicy: pulumi.String(json0),
})
if err != nil {
return err
}
sourceQueue, err := sqs.NewQueue(ctx, "source", nil)
if err != nil {
return err
}
source, err := iam.NewRolePolicy(ctx, "source", &iam.RolePolicyArgs{
Role: example.ID(),
Policy: sourceQueue.Arn.ApplyT(func(arn string) (pulumi.String, error) {
var _zero pulumi.String
tmpJSON1, err := json.Marshal(map[string]interface{}{
"Version": "2012-10-17",
"Statement": []map[string]interface{}{
map[string]interface{}{
"Effect": "Allow",
"Action": []string{
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:ReceiveMessage",
},
"Resource": []string{
arn,
},
},
},
})
if err != nil {
return _zero, err
}
json1 := string(tmpJSON1)
return pulumi.String(json1), nil
}).(pulumi.StringOutput),
})
if err != nil {
return err
}
targetQueue, err := sqs.NewQueue(ctx, "target", nil)
if err != nil {
return err
}
target, err := iam.NewRolePolicy(ctx, "target", &iam.RolePolicyArgs{
Role: example.ID(),
Policy: targetQueue.Arn.ApplyT(func(arn string) (pulumi.String, error) {
var _zero pulumi.String
tmpJSON2, err := json.Marshal(map[string]interface{}{
"Version": "2012-10-17",
"Statement": []map[string]interface{}{
map[string]interface{}{
"Effect": "Allow",
"Action": []string{
"sqs:SendMessage",
},
"Resource": []string{
arn,
},
},
},
})
if err != nil {
return _zero, err
}
json2 := string(tmpJSON2)
return pulumi.String(json2), nil
}).(pulumi.StringOutput),
})
if err != nil {
return err
}
_, err = pipes.NewPipe(ctx, "example", &pipes.PipeArgs{
Name: pulumi.String("example-pipe"),
RoleArn: example.Arn,
Source: sourceQueue.Arn,
Target: targetQueue.Arn,
}, pulumi.DependsOn([]pulumi.Resource{
source,
target,
}))
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var main = Aws.GetCallerIdentity.Invoke();
var example = new Aws.Iam.Role("example", new()
{
AssumeRolePolicy = JsonSerializer.Serialize(new Dictionary<string, object?>
{
["Version"] = "2012-10-17",
["Statement"] = new Dictionary<string, object?>
{
["Effect"] = "Allow",
["Action"] = "sts:AssumeRole",
["Principal"] = new Dictionary<string, object?>
{
["Service"] = "pipes.amazonaws.com",
},
["Condition"] = new Dictionary<string, object?>
{
["StringEquals"] = new Dictionary<string, object?>
{
["aws:SourceAccount"] = main.Apply(getCallerIdentityResult => getCallerIdentityResult.AccountId),
},
},
},
}),
});
var sourceQueue = new Aws.Sqs.Queue("source");
var source = new Aws.Iam.RolePolicy("source", new()
{
Role = example.Id,
Policy = Output.JsonSerialize(Output.Create(new Dictionary<string, object?>
{
["Version"] = "2012-10-17",
["Statement"] = new[]
{
new Dictionary<string, object?>
{
["Effect"] = "Allow",
["Action"] = new[]
{
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:ReceiveMessage",
},
["Resource"] = new[]
{
sourceQueue.Arn,
},
},
},
})),
});
var targetQueue = new Aws.Sqs.Queue("target");
var target = new Aws.Iam.RolePolicy("target", new()
{
Role = example.Id,
Policy = Output.JsonSerialize(Output.Create(new Dictionary<string, object?>
{
["Version"] = "2012-10-17",
["Statement"] = new[]
{
new Dictionary<string, object?>
{
["Effect"] = "Allow",
["Action"] = new[]
{
"sqs:SendMessage",
},
["Resource"] = new[]
{
targetQueue.Arn,
},
},
},
})),
});
var examplePipe = new Aws.Pipes.Pipe("example", new()
{
Name = "example-pipe",
RoleArn = example.Arn,
Source = sourceQueue.Arn,
Target = targetQueue.Arn,
}, new CustomResourceOptions
{
DependsOn =
{
source,
target,
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.AwsFunctions;
import com.pulumi.aws.inputs.GetCallerIdentityArgs;
import com.pulumi.aws.iam.Role;
import com.pulumi.aws.iam.RoleArgs;
import com.pulumi.aws.sqs.Queue;
import com.pulumi.aws.iam.RolePolicy;
import com.pulumi.aws.iam.RolePolicyArgs;
import com.pulumi.aws.pipes.Pipe;
import com.pulumi.aws.pipes.PipeArgs;
import static com.pulumi.codegen.internal.Serialization.*;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
final var main = AwsFunctions.getCallerIdentity(GetCallerIdentityArgs.builder()
.build());
var example = new Role("example", RoleArgs.builder()
.assumeRolePolicy(serializeJson(
jsonObject(
jsonProperty("Version", "2012-10-17"),
jsonProperty("Statement", jsonObject(
jsonProperty("Effect", "Allow"),
jsonProperty("Action", "sts:AssumeRole"),
jsonProperty("Principal", jsonObject(
jsonProperty("Service", "pipes.amazonaws.com")
)),
jsonProperty("Condition", jsonObject(
jsonProperty("StringEquals", jsonObject(
jsonProperty("aws:SourceAccount", main.accountId())
))
))
))
)))
.build());
var sourceQueue = new Queue("sourceQueue");
var source = new RolePolicy("source", RolePolicyArgs.builder()
.role(example.id())
.policy(sourceQueue.arn().applyValue(_arn -> serializeJson(
jsonObject(
jsonProperty("Version", "2012-10-17"),
jsonProperty("Statement", jsonArray(jsonObject(
jsonProperty("Effect", "Allow"),
jsonProperty("Action", jsonArray(
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:ReceiveMessage"
)),
jsonProperty("Resource", jsonArray(_arn))
)))
))))
.build());
var targetQueue = new Queue("targetQueue");
var target = new RolePolicy("target", RolePolicyArgs.builder()
.role(example.id())
.policy(targetQueue.arn().applyValue(_arn -> serializeJson(
jsonObject(
jsonProperty("Version", "2012-10-17"),
jsonProperty("Statement", jsonArray(jsonObject(
jsonProperty("Effect", "Allow"),
jsonProperty("Action", jsonArray("sqs:SendMessage")),
jsonProperty("Resource", jsonArray(_arn))
)))
))))
.build());
var examplePipe = new Pipe("examplePipe", PipeArgs.builder()
.name("example-pipe")
.roleArn(example.arn())
.source(sourceQueue.arn())
.target(targetQueue.arn())
.build(), CustomResourceOptions.builder()
.dependsOn(
source,
target)
.build());
}
}
resources:
example:
type: aws:iam:Role
properties:
assumeRolePolicy:
fn::toJSON:
Version: 2012-10-17
Statement:
Effect: Allow
Action: sts:AssumeRole
Principal:
Service: pipes.amazonaws.com
Condition:
StringEquals:
aws:SourceAccount: ${main.accountId}
source:
type: aws:iam:RolePolicy
properties:
role: ${example.id}
policy:
fn::toJSON:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- sqs:DeleteMessage
- sqs:GetQueueAttributes
- sqs:ReceiveMessage
Resource:
- ${sourceQueue.arn}
sourceQueue:
type: aws:sqs:Queue
name: source
target:
type: aws:iam:RolePolicy
properties:
role: ${example.id}
policy:
fn::toJSON:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- sqs:SendMessage
Resource:
- ${targetQueue.arn}
targetQueue:
type: aws:sqs:Queue
name: target
examplePipe:
type: aws:pipes:Pipe
name: example
properties:
name: example-pipe
roleArn: ${example.arn}
source: ${sourceQueue.arn}
target: ${targetQueue.arn}
options:
dependsOn:
- ${source}
- ${target}
variables:
main:
fn::invoke:
function: aws:getCallerIdentity
arguments: {}
The pipe reads messages from the source queue and writes them to the target queue. The roleArn property references an IAM role with a trust policy allowing pipes.amazonaws.com to assume it. The inline policies grant specific SQS permissions: ReceiveMessage and DeleteMessage on the source, SendMessage on the target. The dependsOn ensures policies exist before the pipe starts.
Filter events with pattern matching
Pipes can filter incoming events before processing, reducing costs by only forwarding messages that match specific criteria.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const example = new aws.pipes.Pipe("example", {
name: "example-pipe",
roleArn: exampleAwsIamRole.arn,
source: source.arn,
target: target.arn,
sourceParameters: {
filterCriteria: {
filters: [{
pattern: JSON.stringify({
source: ["event-source"],
}),
}],
},
},
});
import pulumi
import json
import pulumi_aws as aws
example = aws.pipes.Pipe("example",
name="example-pipe",
role_arn=example_aws_iam_role["arn"],
source=source["arn"],
target=target["arn"],
source_parameters={
"filter_criteria": {
"filters": [{
"pattern": json.dumps({
"source": ["event-source"],
}),
}],
},
})
package main
import (
"encoding/json"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/pipes"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
tmpJSON0, err := json.Marshal(map[string]interface{}{
"source": []string{
"event-source",
},
})
if err != nil {
return err
}
json0 := string(tmpJSON0)
_, err = pipes.NewPipe(ctx, "example", &pipes.PipeArgs{
Name: pulumi.String("example-pipe"),
RoleArn: pulumi.Any(exampleAwsIamRole.Arn),
Source: pulumi.Any(source.Arn),
Target: pulumi.Any(target.Arn),
SourceParameters: &pipes.PipeSourceParametersArgs{
FilterCriteria: &pipes.PipeSourceParametersFilterCriteriaArgs{
Filters: pipes.PipeSourceParametersFilterCriteriaFilterArray{
&pipes.PipeSourceParametersFilterCriteriaFilterArgs{
Pattern: pulumi.String(json0),
},
},
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var example = new Aws.Pipes.Pipe("example", new()
{
Name = "example-pipe",
RoleArn = exampleAwsIamRole.Arn,
Source = source.Arn,
Target = target.Arn,
SourceParameters = new Aws.Pipes.Inputs.PipeSourceParametersArgs
{
FilterCriteria = new Aws.Pipes.Inputs.PipeSourceParametersFilterCriteriaArgs
{
Filters = new[]
{
new Aws.Pipes.Inputs.PipeSourceParametersFilterCriteriaFilterArgs
{
Pattern = JsonSerializer.Serialize(new Dictionary<string, object?>
{
["source"] = new[]
{
"event-source",
},
}),
},
},
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.pipes.Pipe;
import com.pulumi.aws.pipes.PipeArgs;
import com.pulumi.aws.pipes.inputs.PipeSourceParametersArgs;
import com.pulumi.aws.pipes.inputs.PipeSourceParametersFilterCriteriaArgs;
import static com.pulumi.codegen.internal.Serialization.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new Pipe("example", PipeArgs.builder()
.name("example-pipe")
.roleArn(exampleAwsIamRole.arn())
.source(source.arn())
.target(target.arn())
.sourceParameters(PipeSourceParametersArgs.builder()
.filterCriteria(PipeSourceParametersFilterCriteriaArgs.builder()
.filters(PipeSourceParametersFilterCriteriaFilterArgs.builder()
.pattern(serializeJson(
jsonObject(
jsonProperty("source", jsonArray("event-source"))
)))
.build())
.build())
.build())
.build());
}
}
resources:
example:
type: aws:pipes:Pipe
properties:
name: example-pipe
roleArn: ${exampleAwsIamRole.arn}
source: ${source.arn}
target: ${target.arn}
sourceParameters:
filterCriteria:
filters:
- pattern:
fn::toJSON:
source:
- event-source
The sourceParameters.filterCriteria property defines filters using JSON pattern matching. Each filter’s pattern property contains a JSON object that specifies which event fields to match. Here, only events with a source field equal to “event-source” pass through to the target.
Enrich events with API destination calls
Between source and target, pipes can call an enrichment endpoint to transform or augment event data.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const example = new aws.pipes.Pipe("example", {
name: "example-pipe",
roleArn: exampleAwsIamRole.arn,
source: source.arn,
target: target.arn,
enrichment: exampleAwsCloudwatchEventApiDestination.arn,
enrichmentParameters: {
httpParameters: {
pathParameterValues: "example-path-param",
headerParameters: {
"example-header": "example-value",
"second-example-header": "second-example-value",
},
queryStringParameters: {
"example-query-string": "example-value",
"second-example-query-string": "second-example-value",
},
},
},
});
import pulumi
import pulumi_aws as aws
example = aws.pipes.Pipe("example",
name="example-pipe",
role_arn=example_aws_iam_role["arn"],
source=source["arn"],
target=target["arn"],
enrichment=example_aws_cloudwatch_event_api_destination["arn"],
enrichment_parameters={
"http_parameters": {
"path_parameter_values": "example-path-param",
"header_parameters": {
"example-header": "example-value",
"second-example-header": "second-example-value",
},
"query_string_parameters": {
"example-query-string": "example-value",
"second-example-query-string": "second-example-value",
},
},
})
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/pipes"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := pipes.NewPipe(ctx, "example", &pipes.PipeArgs{
Name: pulumi.String("example-pipe"),
RoleArn: pulumi.Any(exampleAwsIamRole.Arn),
Source: pulumi.Any(source.Arn),
Target: pulumi.Any(target.Arn),
Enrichment: pulumi.Any(exampleAwsCloudwatchEventApiDestination.Arn),
EnrichmentParameters: &pipes.PipeEnrichmentParametersArgs{
HttpParameters: &pipes.PipeEnrichmentParametersHttpParametersArgs{
PathParameterValues: pulumi.String("example-path-param"),
HeaderParameters: pulumi.StringMap{
"example-header": pulumi.String("example-value"),
"second-example-header": pulumi.String("second-example-value"),
},
QueryStringParameters: pulumi.StringMap{
"example-query-string": pulumi.String("example-value"),
"second-example-query-string": pulumi.String("second-example-value"),
},
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var example = new Aws.Pipes.Pipe("example", new()
{
Name = "example-pipe",
RoleArn = exampleAwsIamRole.Arn,
Source = source.Arn,
Target = target.Arn,
Enrichment = exampleAwsCloudwatchEventApiDestination.Arn,
EnrichmentParameters = new Aws.Pipes.Inputs.PipeEnrichmentParametersArgs
{
HttpParameters = new Aws.Pipes.Inputs.PipeEnrichmentParametersHttpParametersArgs
{
PathParameterValues = "example-path-param",
HeaderParameters =
{
{ "example-header", "example-value" },
{ "second-example-header", "second-example-value" },
},
QueryStringParameters =
{
{ "example-query-string", "example-value" },
{ "second-example-query-string", "second-example-value" },
},
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.pipes.Pipe;
import com.pulumi.aws.pipes.PipeArgs;
import com.pulumi.aws.pipes.inputs.PipeEnrichmentParametersArgs;
import com.pulumi.aws.pipes.inputs.PipeEnrichmentParametersHttpParametersArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new Pipe("example", PipeArgs.builder()
.name("example-pipe")
.roleArn(exampleAwsIamRole.arn())
.source(source.arn())
.target(target.arn())
.enrichment(exampleAwsCloudwatchEventApiDestination.arn())
.enrichmentParameters(PipeEnrichmentParametersArgs.builder()
.httpParameters(PipeEnrichmentParametersHttpParametersArgs.builder()
.pathParameterValues("example-path-param")
.headerParameters(Map.ofEntries(
Map.entry("example-header", "example-value"),
Map.entry("second-example-header", "second-example-value")
))
.queryStringParameters(Map.ofEntries(
Map.entry("example-query-string", "example-value"),
Map.entry("second-example-query-string", "second-example-value")
))
.build())
.build())
.build());
}
}
resources:
example:
type: aws:pipes:Pipe
properties:
name: example-pipe
roleArn: ${exampleAwsIamRole.arn}
source: ${source.arn}
target: ${target.arn}
enrichment: ${exampleAwsCloudwatchEventApiDestination.arn}
enrichmentParameters:
httpParameters:
pathParameterValues: example-path-param
headerParameters:
example-header: example-value
second-example-header: second-example-value
queryStringParameters:
example-query-string: example-value
second-example-query-string: second-example-value
The enrichment property points to an API destination ARN. The enrichmentParameters.httpParameters block configures the HTTP request: pathParameterValues sets URL path segments, headerParameters adds HTTP headers, and queryStringParameters appends query strings. EventBridge calls this endpoint for each event, passing the event data and using the response as input to the target.
Send execution logs to CloudWatch Logs
For debugging and monitoring, pipes can send execution details to CloudWatch Logs, capturing what data flows through and any errors.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const example = new aws.cloudwatch.LogGroup("example", {name: "example-pipe-target"});
const examplePipe = new aws.pipes.Pipe("example", {
name: "example-pipe",
roleArn: exampleAwsIamRole.arn,
source: sourceAwsSqsQueue.arn,
target: targetAwsSqsQueue.arn,
logConfiguration: {
includeExecutionDatas: ["ALL"],
level: "INFO",
cloudwatchLogsLogDestination: {
logGroupArn: targetAwsCloudwatchLogGroup.arn,
},
},
}, {
dependsOn: [
source,
target,
],
});
import pulumi
import pulumi_aws as aws
example = aws.cloudwatch.LogGroup("example", name="example-pipe-target")
example_pipe = aws.pipes.Pipe("example",
name="example-pipe",
role_arn=example_aws_iam_role["arn"],
source=source_aws_sqs_queue["arn"],
target=target_aws_sqs_queue["arn"],
log_configuration={
"include_execution_datas": ["ALL"],
"level": "INFO",
"cloudwatch_logs_log_destination": {
"log_group_arn": target_aws_cloudwatch_log_group["arn"],
},
},
opts = pulumi.ResourceOptions(depends_on=[
source,
target,
]))
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/cloudwatch"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/pipes"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := cloudwatch.NewLogGroup(ctx, "example", &cloudwatch.LogGroupArgs{
Name: pulumi.String("example-pipe-target"),
})
if err != nil {
return err
}
_, err = pipes.NewPipe(ctx, "example", &pipes.PipeArgs{
Name: pulumi.String("example-pipe"),
RoleArn: pulumi.Any(exampleAwsIamRole.Arn),
Source: pulumi.Any(sourceAwsSqsQueue.Arn),
Target: pulumi.Any(targetAwsSqsQueue.Arn),
LogConfiguration: &pipes.PipeLogConfigurationArgs{
IncludeExecutionDatas: pulumi.StringArray{
pulumi.String("ALL"),
},
Level: pulumi.String("INFO"),
CloudwatchLogsLogDestination: &pipes.PipeLogConfigurationCloudwatchLogsLogDestinationArgs{
LogGroupArn: pulumi.Any(targetAwsCloudwatchLogGroup.Arn),
},
},
}, pulumi.DependsOn([]pulumi.Resource{
source,
target,
}))
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var example = new Aws.CloudWatch.LogGroup("example", new()
{
Name = "example-pipe-target",
});
var examplePipe = new Aws.Pipes.Pipe("example", new()
{
Name = "example-pipe",
RoleArn = exampleAwsIamRole.Arn,
Source = sourceAwsSqsQueue.Arn,
Target = targetAwsSqsQueue.Arn,
LogConfiguration = new Aws.Pipes.Inputs.PipeLogConfigurationArgs
{
IncludeExecutionDatas = new[]
{
"ALL",
},
Level = "INFO",
CloudwatchLogsLogDestination = new Aws.Pipes.Inputs.PipeLogConfigurationCloudwatchLogsLogDestinationArgs
{
LogGroupArn = targetAwsCloudwatchLogGroup.Arn,
},
},
}, new CustomResourceOptions
{
DependsOn =
{
source,
target,
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.cloudwatch.LogGroup;
import com.pulumi.aws.cloudwatch.LogGroupArgs;
import com.pulumi.aws.pipes.Pipe;
import com.pulumi.aws.pipes.PipeArgs;
import com.pulumi.aws.pipes.inputs.PipeLogConfigurationArgs;
import com.pulumi.aws.pipes.inputs.PipeLogConfigurationCloudwatchLogsLogDestinationArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new LogGroup("example", LogGroupArgs.builder()
.name("example-pipe-target")
.build());
var examplePipe = new Pipe("examplePipe", PipeArgs.builder()
.name("example-pipe")
.roleArn(exampleAwsIamRole.arn())
.source(sourceAwsSqsQueue.arn())
.target(targetAwsSqsQueue.arn())
.logConfiguration(PipeLogConfigurationArgs.builder()
.includeExecutionDatas("ALL")
.level("INFO")
.cloudwatchLogsLogDestination(PipeLogConfigurationCloudwatchLogsLogDestinationArgs.builder()
.logGroupArn(targetAwsCloudwatchLogGroup.arn())
.build())
.build())
.build(), CustomResourceOptions.builder()
.dependsOn(
source,
target)
.build());
}
}
resources:
example:
type: aws:cloudwatch:LogGroup
properties:
name: example-pipe-target
examplePipe:
type: aws:pipes:Pipe
name: example
properties:
name: example-pipe
roleArn: ${exampleAwsIamRole.arn}
source: ${sourceAwsSqsQueue.arn}
target: ${targetAwsSqsQueue.arn}
logConfiguration:
includeExecutionDatas:
- ALL
level: INFO
cloudwatchLogsLogDestination:
logGroupArn: ${targetAwsCloudwatchLogGroup.arn}
options:
dependsOn:
- ${source}
- ${target}
The logConfiguration property enables logging. The includeExecutionDatas array controls what data to log (ALL includes full event payloads). The level property sets log verbosity (INFO, ERROR, TRACE, or OFF). The cloudwatchLogsLogDestination.logGroupArn points to where logs are written. Your IAM role needs cloudwatch:PutLogEvents permission.
Tune SQS batching and message attributes
When both source and target are SQS queues, you can control batching behavior and set message attributes for FIFO queue requirements.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const example = new aws.pipes.Pipe("example", {
name: "example-pipe",
roleArn: exampleAwsIamRole.arn,
source: source.arn,
target: target.arn,
sourceParameters: {
sqsQueueParameters: {
batchSize: 1,
maximumBatchingWindowInSeconds: 2,
},
},
targetParameters: {
sqsQueueParameters: {
messageDeduplicationId: "example-dedupe",
messageGroupId: "example-group",
},
},
});
import pulumi
import pulumi_aws as aws
example = aws.pipes.Pipe("example",
name="example-pipe",
role_arn=example_aws_iam_role["arn"],
source=source["arn"],
target=target["arn"],
source_parameters={
"sqs_queue_parameters": {
"batch_size": 1,
"maximum_batching_window_in_seconds": 2,
},
},
target_parameters={
"sqs_queue_parameters": {
"message_deduplication_id": "example-dedupe",
"message_group_id": "example-group",
},
})
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/pipes"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := pipes.NewPipe(ctx, "example", &pipes.PipeArgs{
Name: pulumi.String("example-pipe"),
RoleArn: pulumi.Any(exampleAwsIamRole.Arn),
Source: pulumi.Any(source.Arn),
Target: pulumi.Any(target.Arn),
SourceParameters: &pipes.PipeSourceParametersArgs{
SqsQueueParameters: &pipes.PipeSourceParametersSqsQueueParametersArgs{
BatchSize: pulumi.Int(1),
MaximumBatchingWindowInSeconds: pulumi.Int(2),
},
},
TargetParameters: &pipes.PipeTargetParametersArgs{
SqsQueueParameters: &pipes.PipeTargetParametersSqsQueueParametersArgs{
MessageDeduplicationId: pulumi.String("example-dedupe"),
MessageGroupId: pulumi.String("example-group"),
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var example = new Aws.Pipes.Pipe("example", new()
{
Name = "example-pipe",
RoleArn = exampleAwsIamRole.Arn,
Source = source.Arn,
Target = target.Arn,
SourceParameters = new Aws.Pipes.Inputs.PipeSourceParametersArgs
{
SqsQueueParameters = new Aws.Pipes.Inputs.PipeSourceParametersSqsQueueParametersArgs
{
BatchSize = 1,
MaximumBatchingWindowInSeconds = 2,
},
},
TargetParameters = new Aws.Pipes.Inputs.PipeTargetParametersArgs
{
SqsQueueParameters = new Aws.Pipes.Inputs.PipeTargetParametersSqsQueueParametersArgs
{
MessageDeduplicationId = "example-dedupe",
MessageGroupId = "example-group",
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.pipes.Pipe;
import com.pulumi.aws.pipes.PipeArgs;
import com.pulumi.aws.pipes.inputs.PipeSourceParametersArgs;
import com.pulumi.aws.pipes.inputs.PipeSourceParametersSqsQueueParametersArgs;
import com.pulumi.aws.pipes.inputs.PipeTargetParametersArgs;
import com.pulumi.aws.pipes.inputs.PipeTargetParametersSqsQueueParametersArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new Pipe("example", PipeArgs.builder()
.name("example-pipe")
.roleArn(exampleAwsIamRole.arn())
.source(source.arn())
.target(target.arn())
.sourceParameters(PipeSourceParametersArgs.builder()
.sqsQueueParameters(PipeSourceParametersSqsQueueParametersArgs.builder()
.batchSize(1)
.maximumBatchingWindowInSeconds(2)
.build())
.build())
.targetParameters(PipeTargetParametersArgs.builder()
.sqsQueueParameters(PipeTargetParametersSqsQueueParametersArgs.builder()
.messageDeduplicationId("example-dedupe")
.messageGroupId("example-group")
.build())
.build())
.build());
}
}
resources:
example:
type: aws:pipes:Pipe
properties:
name: example-pipe
roleArn: ${exampleAwsIamRole.arn}
source: ${source.arn}
target: ${target.arn}
sourceParameters:
sqsQueueParameters:
batchSize: 1
maximumBatchingWindowInSeconds: 2
targetParameters:
sqsQueueParameters:
messageDeduplicationId: example-dedupe
messageGroupId: example-group
The sourceParameters.sqsQueueParameters block controls how the pipe reads from SQS: batchSize sets how many messages to retrieve per poll, and maximumBatchingWindowInSeconds sets how long to wait for a full batch. The targetParameters.sqsQueueParameters block sets message attributes when writing: messageDeduplicationId prevents duplicates, and messageGroupId groups messages in FIFO queues.
Beyond these examples
These snippets focus on specific pipe-level features: IAM role configuration and permissions, event filtering and enrichment, and CloudWatch Logs integration and SQS-specific tuning. They’re intentionally minimal rather than full event-driven architectures.
The examples may reference pre-existing infrastructure such as SQS queues (source and target), CloudWatch Log Groups, and API Destinations for enrichment. They focus on configuring the pipe rather than provisioning everything around it.
To keep things focused, common pipe patterns are omitted, including:
- KMS encryption (kmsKeyIdentifier)
- Pipe state management (desiredState for RUNNING/STOPPED)
- Other source types (Kinesis, DynamoDB Streams, Kafka)
- Other target types (Step Functions, EventBridge buses, Lambda)
These omissions are intentional: the goal is to illustrate how each pipe feature is wired, not provide drop-in event processing modules. See the EventBridge Pipe resource reference for all available configuration options.
Let's configure AWS EventBridge Pipes
Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.
Try Pulumi Cloud for FREEFrequently Asked Questions
IAM & Permissions
sqs:ReceiveMessage, sqs:GetQueueAttributes, and sqs:DeleteMessage, while an SQS target requires sqs:SendMessage. The role must also have a trust policy allowing pipes.amazonaws.com to assume it.dependsOn to ensure policy resources are created first, preventing pipe creation failures due to missing permissions.Configuration & Setup
name to specify an exact pipe name, or namePrefix to auto-generate a unique name with your prefix. These properties are mutually exclusive—you can only use one.name and source properties are immutable. Changing either requires replacing the entire pipe resource.smk:// followed by the bootstrap server’s address as the source value, instead of an ARN.desiredState to RUNNING to start the pipe or STOPPED to pause it.Enrichment & Filtering
sourceParameters.filterCriteria.filters with a pattern matching the events you want to process.enrichment property to an enrichment resource ARN (like an API Destination) and configure enrichmentParameters with HTTP parameters for path, headers, and query strings.Logging & Monitoring
logConfiguration with a log level (e.g., INFO), includeExecutionDatas (e.g., ALL), and cloudwatchLogsLogDestination pointing to your log group ARN.Advanced Configuration
sourceParameters.sqsQueueParameters to set batchSize and maximumBatchingWindowInSeconds for source batching, and targetParameters.sqsQueueParameters for target-specific settings like messageDeduplicationId and messageGroupId.kmsKeyIdentifier with your customer managed KMS key ARN, key ID, or alias.Using a different cloud?
Explore integration guides for other cloud providers: