The aws:sns/topicSubscription:TopicSubscription resource, part of the Pulumi AWS provider, connects SNS topics to endpoints that receive published messages: SQS queues, Lambda functions, HTTPS webhooks, or mobile applications. This guide focuses on three capabilities: SQS queue subscriptions, HTTPS delivery with retry policies, and cross-account routing.
Subscriptions reference existing SNS topics and require IAM policies that grant the topic permission to deliver to the endpoint. The examples are intentionally small. Combine them with your own topics, queues, and IAM configuration.
Route SNS messages to an SQS queue
Most integrations route topic messages to an SQS queue for asynchronous processing, decoupling publishers from consumers.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
// Topic that publishers write to.
const updatesTopic = new aws.sns.Topic("user_updates", {
    name: "user-updates-topic",
});

// Statement that lets the SNS service send to the queue, but only when the
// message originates from the topic above (matched through aws:SourceArn).
const allowTopicToSend = {
    sid: "user_updates_sqs_target",
    effect: "Allow",
    principals: [{
        type: "Service",
        identifiers: ["sns.amazonaws.com"],
    }],
    actions: ["SQS:SendMessage"],
    resources: ["arn:aws:sqs:us-west-2:123456789012:user-updates-queue"],
    conditions: [{
        test: "ArnEquals",
        variable: "aws:SourceArn",
        values: [updatesTopic.arn],
    }],
};

// The output form of the invoke is used because the statement depends on the
// topic ARN, which is an Output.
const queuePolicyDoc = aws.iam.getPolicyDocumentOutput({
    policyId: "arn:aws:sqs:us-west-2:123456789012:user_updates_queue/SQSDefaultPolicy",
    statements: [allowTopicToSend],
});

// Queue that receives the topic's messages.
const updatesQueue = new aws.sqs.Queue("user_updates_queue", {
    name: "user-updates-queue",
    policy: queuePolicyDoc.json,
});

// Subscription wiring the topic to the queue.
const queueSubscription = new aws.sns.TopicSubscription("user_updates_sqs_target", {
    topic: updatesTopic.arn,
    protocol: "sqs",
    endpoint: updatesQueue.arn,
});
import pulumi
import pulumi_aws as aws
# Topic that publishers write to.
updates_topic = aws.sns.Topic("user_updates", name="user-updates-topic")

# Statement that lets the SNS service send to the queue, but only when the
# message originates from the topic above (matched through aws:SourceArn).
allow_topic_to_send = {
    "sid": "user_updates_sqs_target",
    "effect": "Allow",
    "principals": [{
        "type": "Service",
        "identifiers": ["sns.amazonaws.com"],
    }],
    "actions": ["SQS:SendMessage"],
    "resources": ["arn:aws:sqs:us-west-2:123456789012:user-updates-queue"],
    "conditions": [{
        "test": "ArnEquals",
        "variable": "aws:SourceArn",
        "values": [updates_topic.arn],
    }],
}

# The output form of the invoke is used because the statement depends on the
# topic ARN, which is an Output.
queue_policy_doc = aws.iam.get_policy_document_output(
    policy_id="arn:aws:sqs:us-west-2:123456789012:user_updates_queue/SQSDefaultPolicy",
    statements=[allow_topic_to_send],
)

# Queue that receives the topic's messages.
updates_queue = aws.sqs.Queue(
    "user_updates_queue",
    name="user-updates-queue",
    policy=queue_policy_doc.json,
)

# Subscription wiring the topic to the queue.
queue_subscription = aws.sns.TopicSubscription(
    "user_updates_sqs_target",
    topic=updates_topic.arn,
    protocol="sqs",
    endpoint=updates_queue.arn,
)
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/iam"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/sns"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/sqs"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main provisions an SNS topic, an SQS queue whose access policy admits that
// topic, and the subscription that delivers topic messages to the queue.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		userUpdates, err := sns.NewTopic(ctx, "user_updates", &sns.TopicArgs{
			Name: pulumi.String("user-updates-topic"),
		})
		if err != nil {
			return err
		}

		// The output form of the invoke is used because the condition
		// references the topic ARN, which is an output.
		sqsQueuePolicy := iam.GetPolicyDocumentOutput(ctx, iam.GetPolicyDocumentOutputArgs{
			PolicyId: pulumi.String("arn:aws:sqs:us-west-2:123456789012:user_updates_queue/SQSDefaultPolicy"),
			Statements: iam.GetPolicyDocumentStatementArray{
				&iam.GetPolicyDocumentStatementArgs{
					Sid:    pulumi.String("user_updates_sqs_target"),
					Effect: pulumi.String("Allow"),
					Principals: iam.GetPolicyDocumentStatementPrincipalArray{
						&iam.GetPolicyDocumentStatementPrincipalArgs{
							Type: pulumi.String("Service"),
							Identifiers: pulumi.StringArray{
								pulumi.String("sns.amazonaws.com"),
							},
						},
					},
					Actions: pulumi.StringArray{
						pulumi.String("SQS:SendMessage"),
					},
					Resources: pulumi.StringArray{
						pulumi.String("arn:aws:sqs:us-west-2:123456789012:user-updates-queue"),
					},
					// Only messages originating from this topic are admitted.
					Conditions: iam.GetPolicyDocumentStatementConditionArray{
						&iam.GetPolicyDocumentStatementConditionArgs{
							Test:     pulumi.String("ArnEquals"),
							Variable: pulumi.String("aws:SourceArn"),
							Values: pulumi.StringArray{
								userUpdates.Arn,
							},
						},
					},
				},
			},
		}, nil)

		userUpdatesQueue, err := sqs.NewQueue(ctx, "user_updates_queue", &sqs.QueueArgs{
			Name: pulumi.String("user-updates-queue"),
			// Pass the rendered document as an output; wrapping an output in
			// pulumi.String does not type-check.
			Policy: sqsQueuePolicy.Json(),
		})
		if err != nil {
			return err
		}

		_, err = sns.NewTopicSubscription(ctx, "user_updates_sqs_target", &sns.TopicSubscriptionArgs{
			Topic:    userUpdates.Arn,
			Protocol: pulumi.String("sqs"),
			Endpoint: userUpdatesQueue.Arn,
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
    // Topic that publishers write to.
    var topic = new Aws.Sns.Topic("user_updates", new()
    {
        Name = "user-updates-topic",
    });

    // Queue policy: the SNS service may send to the queue, but only when the
    // message originates from the topic above (matched through aws:SourceArn).
    var queuePolicyDoc = Aws.Iam.GetPolicyDocument.Invoke(new()
    {
        PolicyId = "arn:aws:sqs:us-west-2:123456789012:user_updates_queue/SQSDefaultPolicy",
        Statements = new[]
        {
            new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
            {
                Sid = "user_updates_sqs_target",
                Effect = "Allow",
                Principals = new[]
                {
                    new Aws.Iam.Inputs.GetPolicyDocumentStatementPrincipalInputArgs
                    {
                        Type = "Service",
                        Identifiers = new[]
                        {
                            "sns.amazonaws.com",
                        },
                    },
                },
                Actions = new[]
                {
                    "SQS:SendMessage",
                },
                Resources = new[]
                {
                    "arn:aws:sqs:us-west-2:123456789012:user-updates-queue",
                },
                Conditions = new[]
                {
                    new Aws.Iam.Inputs.GetPolicyDocumentStatementConditionInputArgs
                    {
                        Test = "ArnEquals",
                        Variable = "aws:SourceArn",
                        Values = new[]
                        {
                            topic.Arn,
                        },
                    },
                },
            },
        },
    });

    // Queue that receives the topic's messages.
    var queue = new Aws.Sqs.Queue("user_updates_queue", new()
    {
        Name = "user-updates-queue",
        Policy = queuePolicyDoc.Apply(doc => doc.Json),
    });

    // Subscription wiring the topic to the queue.
    var subscription = new Aws.Sns.TopicSubscription("user_updates_sqs_target", new()
    {
        Topic = topic.Arn,
        Protocol = "sqs",
        Endpoint = queue.Arn,
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.sns.Topic;
import com.pulumi.aws.sns.TopicArgs;
import com.pulumi.aws.iam.IamFunctions;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentArgs;
import com.pulumi.aws.sqs.Queue;
import com.pulumi.aws.sqs.QueueArgs;
import com.pulumi.aws.sns.TopicSubscription;
import com.pulumi.aws.sns.TopicSubscriptionArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/** Declares an SNS topic, an SQS queue that admits it, and the subscription linking them. */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /** Registers the topic, the queue with its access policy, and the subscription. */
    public static void stack(Context ctx) {
        // Topic that publishers write to.
        var topic = new Topic("userUpdates", TopicArgs.builder()
            .name("user-updates-topic")
            .build());

        // Queue policy: the SNS service may send to the queue, but only when
        // the message originates from the topic above (aws:SourceArn).
        final var queuePolicyDoc = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
            .policyId("arn:aws:sqs:us-west-2:123456789012:user_updates_queue/SQSDefaultPolicy")
            .statements(GetPolicyDocumentStatementArgs.builder()
                .sid("user_updates_sqs_target")
                .effect("Allow")
                .principals(GetPolicyDocumentStatementPrincipalArgs.builder()
                    .type("Service")
                    .identifiers("sns.amazonaws.com")
                    .build())
                .actions("SQS:SendMessage")
                .resources("arn:aws:sqs:us-west-2:123456789012:user-updates-queue")
                .conditions(GetPolicyDocumentStatementConditionArgs.builder()
                    .test("ArnEquals")
                    .variable("aws:SourceArn")
                    .values(topic.arn())
                    .build())
                .build())
            .build());

        // Queue that receives the topic's messages.
        var queue = new Queue("userUpdatesQueue", QueueArgs.builder()
            .name("user-updates-queue")
            .policy(queuePolicyDoc.applyValue(doc -> doc.json()))
            .build());

        // Subscription wiring the topic to the queue.
        var subscription = new TopicSubscription("userUpdatesSqsTarget", TopicSubscriptionArgs.builder()
            .topic(topic.arn())
            .protocol("sqs")
            .endpoint(queue.arn())
            .build());
    }
}
# SNS topic, SQS queue and the subscription that links them.
resources:
  userUpdates:
    type: aws:sns:Topic
    name: user_updates
    properties:
      name: user-updates-topic
  userUpdatesQueue:
    type: aws:sqs:Queue
    name: user_updates_queue
    properties:
      name: user-updates-queue
      policy: ${sqsQueuePolicy.json}
  userUpdatesSqsTarget:
    type: aws:sns:TopicSubscription
    name: user_updates_sqs_target
    properties:
      topic: ${userUpdates.arn}
      protocol: sqs
      endpoint: ${userUpdatesQueue.arn}
variables:
  # Queue policy: the SNS service may send to the queue, but only when the
  # message originates from the topic above (matched through aws:SourceArn).
  sqsQueuePolicy:
    fn::invoke:
      function: aws:iam:getPolicyDocument
      arguments:
        policyId: arn:aws:sqs:us-west-2:123456789012:user_updates_queue/SQSDefaultPolicy
        statements:
          - sid: user_updates_sqs_target
            effect: Allow
            principals:
              - type: Service
                identifiers:
                  - sns.amazonaws.com
            actions:
              - SQS:SendMessage
            resources:
              - arn:aws:sqs:us-west-2:123456789012:user-updates-queue
            conditions:
              - test: ArnEquals
                variable: aws:SourceArn
                values:
                  - ${userUpdates.arn}
The subscription connects the topic to the queue using protocol “sqs” and the queue’s ARN as the endpoint. The queue policy grants sns.amazonaws.com permission to SendMessage, scoped to the specific topic ARN via a condition. Without this policy, SNS cannot deliver messages.
Configure retry behavior for HTTPS endpoints
HTTPS subscriptions often need custom retry logic to handle transient failures or rate limits.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
// HTTPS subscription with a custom delivery (retry) policy.
const exampleWithDeliveryPolicy = new aws.sns.TopicSubscription("example_with_delivery_policy", {
    topic: "arn:aws:sns:us-west-2:123456789012:my-topic",
    protocol: "https",
    endpoint: "https://example.com/endpoint",
    rawMessageDelivery: true,
    // Double quotes need no escaping inside a template literal; escaping
    // them would leave literal backslashes in the string and make the
    // policy invalid JSON.
    deliveryPolicy: `{
  "healthyRetryPolicy": {
    "minDelayTarget": 20,
    "maxDelayTarget": 20,
    "numRetries": 3,
    "numMaxDelayRetries": 0,
    "numNoDelayRetries": 0,
    "numMinDelayRetries": 0,
    "backoffFunction": "linear"
  },
  "sicklyRetryPolicy": null,
  "throttlePolicy": null,
  "requestPolicy": {
    "headerContentType": "text/plain; application/json"
  },
  "guaranteed": false
}
`,
});
import pulumi
import pulumi_aws as aws
# Delivery (retry) policy, passed to SNS as a JSON string. The text is kept
# flush-left so the string value is unchanged.
retry_policy_json = """{
"healthyRetryPolicy": {
"minDelayTarget": 20,
"maxDelayTarget": 20,
"numRetries": 3,
"numMaxDelayRetries": 0,
"numNoDelayRetries": 0,
"numMinDelayRetries": 0,
"backoffFunction": "linear"
},
"sicklyRetryPolicy": null,
"throttlePolicy": null,
"requestPolicy": {
"headerContentType": "text/plain; application/json"
},
"guaranteed": false
}
"""

# HTTPS subscription that uses the policy above.
example_with_delivery_policy = aws.sns.TopicSubscription(
    "example_with_delivery_policy",
    topic="arn:aws:sns:us-west-2:123456789012:my-topic",
    protocol="https",
    endpoint="https://example.com/endpoint",
    raw_message_delivery=True,
    delivery_policy=retry_policy_json,
)
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/sns"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main declares an HTTPS subscription with a custom delivery (retry) policy.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := sns.NewTopicSubscription(ctx, "example_with_delivery_policy", &sns.TopicSubscriptionArgs{
			Topic:              pulumi.String("arn:aws:sns:us-west-2:123456789012:my-topic"),
			Protocol:           pulumi.String("https"),
			Endpoint:           pulumi.String("https://example.com/endpoint"),
			RawMessageDelivery: pulumi.Bool(true),
			// Raw string literals do not process escapes, so the quotes are
			// written bare; a backslash here would end up in the JSON.
			DeliveryPolicy: pulumi.String(`{
  "healthyRetryPolicy": {
    "minDelayTarget": 20,
    "maxDelayTarget": 20,
    "numRetries": 3,
    "numMaxDelayRetries": 0,
    "numNoDelayRetries": 0,
    "numMinDelayRetries": 0,
    "backoffFunction": "linear"
  },
  "sicklyRetryPolicy": null,
  "throttlePolicy": null,
  "requestPolicy": {
    "headerContentType": "text/plain; application/json"
  },
  "guaranteed": false
}
`),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
    // HTTPS subscription with a custom delivery (retry) policy.
    var exampleWithDeliveryPolicy = new Aws.Sns.TopicSubscription("example_with_delivery_policy", new()
    {
        Topic = "arn:aws:sns:us-west-2:123456789012:my-topic",
        Protocol = "https",
        Endpoint = "https://example.com/endpoint",
        RawMessageDelivery = true,
        // In a verbatim string a quote is written as "" and backslashes are
        // literal, so no backslash may precede the quotes.
        DeliveryPolicy = @"{
  ""healthyRetryPolicy"": {
    ""minDelayTarget"": 20,
    ""maxDelayTarget"": 20,
    ""numRetries"": 3,
    ""numMaxDelayRetries"": 0,
    ""numNoDelayRetries"": 0,
    ""numMinDelayRetries"": 0,
    ""backoffFunction"": ""linear""
  },
  ""sicklyRetryPolicy"": null,
  ""throttlePolicy"": null,
  ""requestPolicy"": {
    ""headerContentType"": ""text/plain; application/json""
  },
  ""guaranteed"": false
}
",
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.sns.TopicSubscription;
import com.pulumi.aws.sns.TopicSubscriptionArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/** Declares an HTTPS subscription with a custom delivery (retry) policy. */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /** Registers the subscription together with its delivery policy. */
    public static void stack(Context ctx) {
        // Delivery policy as a JSON string. Every line carries the same
        // indentation, which the text block strips, so the value is the
        // flush-left JSON text.
        final String retryPolicyJson = """
            {
            "healthyRetryPolicy": {
            "minDelayTarget": 20,
            "maxDelayTarget": 20,
            "numRetries": 3,
            "numMaxDelayRetries": 0,
            "numNoDelayRetries": 0,
            "numMinDelayRetries": 0,
            "backoffFunction": "linear"
            },
            "sicklyRetryPolicy": null,
            "throttlePolicy": null,
            "requestPolicy": {
            "headerContentType": "text/plain; application/json"
            },
            "guaranteed": false
            }
            """;

        var subscription = new TopicSubscription("exampleWithDeliveryPolicy", TopicSubscriptionArgs.builder()
            .topic("arn:aws:sns:us-west-2:123456789012:my-topic")
            .protocol("https")
            .endpoint("https://example.com/endpoint")
            .rawMessageDelivery(true)
            .deliveryPolicy(retryPolicyJson)
            .build());
    }
}
# HTTPS subscription with a custom delivery (retry) policy.
resources:
  exampleWithDeliveryPolicy:
    type: aws:sns:TopicSubscription
    name: example_with_delivery_policy
    properties:
      topic: arn:aws:sns:us-west-2:123456789012:my-topic
      protocol: https
      endpoint: https://example.com/endpoint
      rawMessageDelivery: true
      # Literal block scalar: the text is taken verbatim, so quotes must not
      # be backslash-escaped or the policy is no longer valid JSON.
      deliveryPolicy: |
        {
          "healthyRetryPolicy": {
            "minDelayTarget": 20,
            "maxDelayTarget": 20,
            "numRetries": 3,
            "numMaxDelayRetries": 0,
            "numNoDelayRetries": 0,
            "numMinDelayRetries": 0,
            "backoffFunction": "linear"
          },
          "sicklyRetryPolicy": null,
          "throttlePolicy": null,
          "requestPolicy": {
            "headerContentType": "text/plain; application/json"
          },
          "guaranteed": false
        }
The deliveryPolicy property defines retry behavior as a JSON string. The healthyRetryPolicy section controls retry timing: minDelayTarget and maxDelayTarget set the delay range, numRetries limits total attempts, and backoffFunction determines whether delays increase linearly or exponentially. The rawMessageDelivery property controls whether SNS wraps messages in JSON metadata or passes them directly.
Subscribe across AWS accounts and regions
Organizations with multiple accounts often route messages from a central topic to queues in different accounts for isolation.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
// Settings for the topic and the queue. Each object can be overridden from
// stack configuration; otherwise the defaults below are used.
const config = new pulumi.Config();
const sns = config.getObject<any>("sns") || {
    "account-id": "111111111111",
    displayName: "example",
    name: "example-sns-topic",
    region: "us-west-1",
    "role-name": "service/service",
};
const sqs = config.getObject<any>("sqs") || {
    "account-id": "222222222222",
    name: "example-sqs-queue",
    region: "us-east-1",
    "role-name": "service/service",
};

// ARNs are assembled from the settings so each policy can be built before
// the resource it describes exists.
const topicArn = `arn:aws:sns:${sns.region}:${sns["account-id"]}:${sns.name}`;
const queueArn = `arn:aws:sqs:${sqs.region}:${sqs["account-id"]}:${sqs.name}`;

// Topic policy: owner-account management plus subscribe/receive for the queue.
const snsTopicPolicy = aws.iam.getPolicyDocument({
    policyId: "__default_policy_ID",
    statements: [
        {
            actions: [
                "SNS:Subscribe",
                "SNS:SetTopicAttributes",
                "SNS:RemovePermission",
                "SNS:Publish",
                "SNS:ListSubscriptionsByTopic",
                "SNS:GetTopicAttributes",
                "SNS:DeleteTopic",
                "SNS:AddPermission",
            ],
            conditions: [{
                test: "StringEquals",
                variable: "AWS:SourceOwner",
                values: [sns["account-id"]],
            }],
            effect: "Allow",
            principals: [{
                type: "AWS",
                identifiers: ["*"],
            }],
            resources: [topicArn],
            sid: "__default_statement_ID",
        },
        {
            actions: [
                "SNS:Subscribe",
                "SNS:Receive",
            ],
            conditions: [{
                test: "StringLike",
                variable: "SNS:Endpoint",
                values: [queueArn],
            }],
            effect: "Allow",
            principals: [{
                type: "AWS",
                identifiers: ["*"],
            }],
            resources: [topicArn],
            sid: "__console_sub_0",
        },
    ],
});

// Queue policy: SendMessage is allowed only when the source is the topic.
const sqsQueuePolicy = aws.iam.getPolicyDocument({
    policyId: `${queueArn}/SQSDefaultPolicy`,
    statements: [{
        sid: "example-sns-topic",
        effect: "Allow",
        principals: [{
            type: "AWS",
            identifiers: ["*"],
        }],
        actions: ["SQS:SendMessage"],
        resources: [queueArn],
        conditions: [{
            test: "ArnEquals",
            variable: "aws:SourceArn",
            values: [topicArn],
        }],
    }],
});

const snsTopic = new aws.sns.Topic("sns_topic", {
    name: sns.name,
    // The settings object spells this key "displayName"; "display_name"
    // is undefined when the defaults are in use.
    displayName: sns.displayName,
    policy: snsTopicPolicy.then(snsTopicPolicy => snsTopicPolicy.json),
});
const sqsQueue = new aws.sqs.Queue("sqs_queue", {
    name: sqs.name,
    policy: sqsQueuePolicy.then(sqsQueuePolicy => sqsQueuePolicy.json),
});
const snsTopicTopicSubscription = new aws.sns.TopicSubscription("sns_topic", {
    topic: snsTopic.arn,
    protocol: "sqs",
    endpoint: sqsQueue.arn,
});
import pulumi
import pulumi_aws as aws
# Settings for the topic and the queue. Each object can be overridden from
# stack configuration; otherwise the defaults below are used.
config = pulumi.Config()
sns = config.get_object("sns")
if sns is None:
    sns = {
        "account-id": "111111111111",
        "displayName": "example",
        "name": "example-sns-topic",
        "region": "us-west-1",
        "role-name": "service/service",
    }
sqs = config.get_object("sqs")
if sqs is None:
    sqs = {
        "account-id": "222222222222",
        "name": "example-sqs-queue",
        "region": "us-east-1",
        "role-name": "service/service",
    }

# ARNs are assembled from the settings so each policy can be built before
# the resource it describes exists.
topic_arn = f"arn:aws:sns:{sns['region']}:{sns['account-id']}:{sns['name']}"
queue_arn = f"arn:aws:sqs:{sqs['region']}:{sqs['account-id']}:{sqs['name']}"

# Topic policy: owner-account management plus subscribe/receive for the queue.
sns_topic_policy = aws.iam.get_policy_document(
    policy_id="__default_policy_ID",
    statements=[
        {
            "actions": [
                "SNS:Subscribe",
                "SNS:SetTopicAttributes",
                "SNS:RemovePermission",
                "SNS:Publish",
                "SNS:ListSubscriptionsByTopic",
                "SNS:GetTopicAttributes",
                "SNS:DeleteTopic",
                "SNS:AddPermission",
            ],
            "conditions": [{
                "test": "StringEquals",
                "variable": "AWS:SourceOwner",
                "values": [sns["account-id"]],
            }],
            "effect": "Allow",
            "principals": [{
                "type": "AWS",
                "identifiers": ["*"],
            }],
            "resources": [topic_arn],
            "sid": "__default_statement_ID",
        },
        {
            "actions": [
                "SNS:Subscribe",
                "SNS:Receive",
            ],
            "conditions": [{
                "test": "StringLike",
                "variable": "SNS:Endpoint",
                "values": [queue_arn],
            }],
            "effect": "Allow",
            "principals": [{
                "type": "AWS",
                "identifiers": ["*"],
            }],
            "resources": [topic_arn],
            "sid": "__console_sub_0",
        },
    ],
)

# Queue policy: SendMessage is allowed only when the source is the topic.
sqs_queue_policy = aws.iam.get_policy_document(
    policy_id=f"{queue_arn}/SQSDefaultPolicy",
    statements=[{
        "sid": "example-sns-topic",
        "effect": "Allow",
        "principals": [{
            "type": "AWS",
            "identifiers": ["*"],
        }],
        "actions": ["SQS:SendMessage"],
        "resources": [queue_arn],
        "conditions": [{
            "test": "ArnEquals",
            "variable": "aws:SourceArn",
            "values": [topic_arn],
        }],
    }],
)

sns_topic = aws.sns.Topic(
    "sns_topic",
    name=sns["name"],
    # The settings dict spells this key "displayName"; indexing with
    # "display_name" raises KeyError when the defaults are in use.
    display_name=sns["displayName"],
    policy=sns_topic_policy.json,
)
sqs_queue = aws.sqs.Queue(
    "sqs_queue",
    name=sqs["name"],
    policy=sqs_queue_policy.json,
)
sns_topic_topic_subscription = aws.sns.TopicSubscription(
    "sns_topic",
    topic=sns_topic.arn,
    protocol="sqs",
    endpoint=sqs_queue.arn,
)
package main
import (
"fmt"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/iam"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/sns"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/sqs"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi/config"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
cfg := config.New(ctx, "")
sns := map[string]interface{}{
"account-id": "111111111111",
"displayName": "example",
"name": "example-sns-topic",
"region": "us-west-1",
"role-name": "service/service",
};
if param := cfg.GetObject("sns"); param != nil {
sns = param
}
sqs := map[string]interface{}{
"account-id": "222222222222",
"name": "example-sqs-queue",
"region": "us-east-1",
"role-name": "service/service",
};
if param := cfg.GetObject("sqs"); param != nil {
sqs = param
}
snsTopicPolicy, err := iam.GetPolicyDocument(ctx, &iam.GetPolicyDocumentArgs{
PolicyId: pulumi.StringRef("__default_policy_ID"),
Statements: []iam.GetPolicyDocumentStatement{
{
Actions: []string{
"SNS:Subscribe",
"SNS:SetTopicAttributes",
"SNS:RemovePermission",
"SNS:Publish",
"SNS:ListSubscriptionsByTopic",
"SNS:GetTopicAttributes",
"SNS:DeleteTopic",
"SNS:AddPermission",
},
Conditions: []iam.GetPolicyDocumentStatementCondition{
{
Test: "StringEquals",
Variable: "AWS:SourceOwner",
Values: interface{}{
sns.AccountId,
},
},
},
Effect: pulumi.StringRef("Allow"),
Principals: []iam.GetPolicyDocumentStatementPrincipal{
{
Type: "AWS",
Identifiers: []string{
"*",
},
},
},
Resources: []string{
fmt.Sprintf("arn:aws:sns:%v:%v:%v", sns.Region, sns.AccountId, sns.Name),
},
Sid: pulumi.StringRef("__default_statement_ID"),
},
{
Actions: []string{
"SNS:Subscribe",
"SNS:Receive",
},
Conditions: []iam.GetPolicyDocumentStatementCondition{
{
Test: "StringLike",
Variable: "SNS:Endpoint",
Values: []string{
fmt.Sprintf("arn:aws:sqs:%v:%v:%v", sqs.Region, sqs.AccountId, sqs.Name),
},
},
},
Effect: pulumi.StringRef("Allow"),
Principals: []iam.GetPolicyDocumentStatementPrincipal{
{
Type: "AWS",
Identifiers: []string{
"*",
},
},
},
Resources: []string{
fmt.Sprintf("arn:aws:sns:%v:%v:%v", sns.Region, sns.AccountId, sns.Name),
},
Sid: pulumi.StringRef("__console_sub_0"),
},
},
}, nil);
if err != nil {
return err
}
sqsQueuePolicy, err := iam.GetPolicyDocument(ctx, &iam.GetPolicyDocumentArgs{
PolicyId: pulumi.StringRef(fmt.Sprintf("arn:aws:sqs:%v:%v:%v/SQSDefaultPolicy", sqs.Region, sqs.AccountId, sqs.Name)),
Statements: []iam.GetPolicyDocumentStatement{
{
Sid: pulumi.StringRef("example-sns-topic"),
Effect: pulumi.StringRef("Allow"),
Principals: []iam.GetPolicyDocumentStatementPrincipal{
{
Type: "AWS",
Identifiers: []string{
"*",
},
},
},
Actions: []string{
"SQS:SendMessage",
},
Resources: []string{
fmt.Sprintf("arn:aws:sqs:%v:%v:%v", sqs.Region, sqs.AccountId, sqs.Name),
},
Conditions: []iam.GetPolicyDocumentStatementCondition{
{
Test: "ArnEquals",
Variable: "aws:SourceArn",
Values: []string{
fmt.Sprintf("arn:aws:sns:%v:%v:%v", sns.Region, sns.AccountId, sns.Name),
},
},
},
},
},
}, nil);
if err != nil {
return err
}
snsTopic, err := sns.NewTopic(ctx, "sns_topic", &sns.TopicArgs{
Name: pulumi.Any(sns.Name),
DisplayName: pulumi.Any(sns.Display_name),
Policy: pulumi.String(snsTopicPolicy.Json),
})
if err != nil {
return err
}
sqsQueue, err := sqs.NewQueue(ctx, "sqs_queue", &sqs.QueueArgs{
Name: pulumi.Any(sqs.Name),
Policy: pulumi.String(sqsQueuePolicy.Json),
})
if err != nil {
return err
}
_, err = sns.NewTopicSubscription(ctx, "sns_topic", &sns.TopicSubscriptionArgs{
Topic: snsTopic.Arn,
Protocol: pulumi.String("sqs"),
Endpoint: sqsQueue.Arn,
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
    // Settings for the topic and the queue. Each object can be overridden
    // from stack configuration; otherwise the defaults below are used.
    var config = new Config();
    var sns = config.GetObject<Dictionary<string, string>>("sns") ?? new Dictionary<string, string>
    {
        { "account-id", "111111111111" },
        { "displayName", "example" },
        { "name", "example-sns-topic" },
        { "region", "us-west-1" },
        { "role-name", "service/service" },
    };
    var sqs = config.GetObject<Dictionary<string, string>>("sqs") ?? new Dictionary<string, string>
    {
        { "account-id", "222222222222" },
        { "name", "example-sqs-queue" },
        { "region", "us-east-1" },
        { "role-name", "service/service" },
    };

    // ARNs are assembled from the settings so each policy can be built
    // before the resource it describes exists. Keys such as "account-id"
    // are not valid member names, so the dictionaries are indexed by key.
    var topicArn = $"arn:aws:sns:{sns["region"]}:{sns["account-id"]}:{sns["name"]}";
    var queueArn = $"arn:aws:sqs:{sqs["region"]}:{sqs["account-id"]}:{sqs["name"]}";

    // Topic policy: owner-account management plus subscribe/receive for the
    // queue.
    var snsTopicPolicy = Aws.Iam.GetPolicyDocument.Invoke(new()
    {
        PolicyId = "__default_policy_ID",
        Statements = new[]
        {
            new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
            {
                Actions = new[]
                {
                    "SNS:Subscribe",
                    "SNS:SetTopicAttributes",
                    "SNS:RemovePermission",
                    "SNS:Publish",
                    "SNS:ListSubscriptionsByTopic",
                    "SNS:GetTopicAttributes",
                    "SNS:DeleteTopic",
                    "SNS:AddPermission",
                },
                Conditions = new[]
                {
                    new Aws.Iam.Inputs.GetPolicyDocumentStatementConditionInputArgs
                    {
                        Test = "StringEquals",
                        Variable = "AWS:SourceOwner",
                        Values = new[]
                        {
                            sns["account-id"],
                        },
                    },
                },
                Effect = "Allow",
                Principals = new[]
                {
                    new Aws.Iam.Inputs.GetPolicyDocumentStatementPrincipalInputArgs
                    {
                        Type = "AWS",
                        Identifiers = new[]
                        {
                            "*",
                        },
                    },
                },
                Resources = new[]
                {
                    topicArn,
                },
                Sid = "__default_statement_ID",
            },
            new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
            {
                Actions = new[]
                {
                    "SNS:Subscribe",
                    "SNS:Receive",
                },
                Conditions = new[]
                {
                    new Aws.Iam.Inputs.GetPolicyDocumentStatementConditionInputArgs
                    {
                        Test = "StringLike",
                        Variable = "SNS:Endpoint",
                        Values = new[]
                        {
                            queueArn,
                        },
                    },
                },
                Effect = "Allow",
                Principals = new[]
                {
                    new Aws.Iam.Inputs.GetPolicyDocumentStatementPrincipalInputArgs
                    {
                        Type = "AWS",
                        Identifiers = new[]
                        {
                            "*",
                        },
                    },
                },
                Resources = new[]
                {
                    topicArn,
                },
                Sid = "__console_sub_0",
            },
        },
    });

    // Queue policy: SendMessage is allowed only when the source is the topic.
    var sqsQueuePolicy = Aws.Iam.GetPolicyDocument.Invoke(new()
    {
        PolicyId = $"{queueArn}/SQSDefaultPolicy",
        Statements = new[]
        {
            new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
            {
                Sid = "example-sns-topic",
                Effect = "Allow",
                Principals = new[]
                {
                    new Aws.Iam.Inputs.GetPolicyDocumentStatementPrincipalInputArgs
                    {
                        Type = "AWS",
                        Identifiers = new[]
                        {
                            "*",
                        },
                    },
                },
                Actions = new[]
                {
                    "SQS:SendMessage",
                },
                Resources = new[]
                {
                    queueArn,
                },
                Conditions = new[]
                {
                    new Aws.Iam.Inputs.GetPolicyDocumentStatementConditionInputArgs
                    {
                        Test = "ArnEquals",
                        Variable = "aws:SourceArn",
                        Values = new[]
                        {
                            topicArn,
                        },
                    },
                },
            },
        },
    });

    var snsTopic = new Aws.Sns.Topic("sns_topic", new()
    {
        Name = sns["name"],
        DisplayName = sns["displayName"],
        Policy = snsTopicPolicy.Apply(getPolicyDocumentResult => getPolicyDocumentResult.Json),
    });
    var sqsQueue = new Aws.Sqs.Queue("sqs_queue", new()
    {
        Name = sqs["name"],
        Policy = sqsQueuePolicy.Apply(getPolicyDocumentResult => getPolicyDocumentResult.Json),
    });
    var snsTopicTopicSubscription = new Aws.Sns.TopicSubscription("sns_topic", new()
    {
        Topic = snsTopic.Arn,
        Protocol = "sqs",
        Endpoint = sqsQueue.Arn,
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.iam.IamFunctions;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentArgs;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentStatementArgs;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentStatementConditionArgs;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentStatementPrincipalArgs;
import com.pulumi.aws.sns.Topic;
import com.pulumi.aws.sns.TopicArgs;
import com.pulumi.aws.sqs.Queue;
import com.pulumi.aws.sqs.QueueArgs;
import com.pulumi.aws.sns.TopicSubscription;
import com.pulumi.aws.sns.TopicSubscriptionArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/** Declares an SNS topic, an SQS queue, their resource policies, and the subscription linking them. */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Registers the topic, the queue, and the subscription, using settings
     * from stack configuration or the defaults below.
     */
    @SuppressWarnings("unchecked")
    public static void stack(Context ctx) {
        final var config = ctx.config();

        // NOTE(review): getObject(key, Class) is assumed to return an
        // Optional holding the decoded object; the raw Map type is why the
        // unchecked warning is suppressed. Confirm against the SDK version.
        final Map<String, String> sns = config.getObject("sns", Map.class).orElse(Map.ofEntries(
            Map.entry("account-id", "111111111111"),
            Map.entry("displayName", "example"),
            Map.entry("name", "example-sns-topic"),
            Map.entry("region", "us-west-1"),
            Map.entry("role-name", "service/service")
        ));
        final Map<String, String> sqs = config.getObject("sqs", Map.class).orElse(Map.ofEntries(
            Map.entry("account-id", "222222222222"),
            Map.entry("name", "example-sqs-queue"),
            Map.entry("region", "us-east-1"),
            Map.entry("role-name", "service/service")
        ));

        // ARNs are assembled from the settings so each policy can be built
        // before the resource it describes exists. Keys such as "account-id"
        // are not valid Java identifiers, so the maps are read with get().
        final String topicArn = String.format("arn:aws:sns:%s:%s:%s",
            sns.get("region"), sns.get("account-id"), sns.get("name"));
        final String queueArn = String.format("arn:aws:sqs:%s:%s:%s",
            sqs.get("region"), sqs.get("account-id"), sqs.get("name"));

        // Topic policy: owner-account management plus subscribe/receive for
        // the queue.
        final var snsTopicPolicy = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
            .policyId("__default_policy_ID")
            .statements(
                GetPolicyDocumentStatementArgs.builder()
                    .actions(
                        "SNS:Subscribe",
                        "SNS:SetTopicAttributes",
                        "SNS:RemovePermission",
                        "SNS:Publish",
                        "SNS:ListSubscriptionsByTopic",
                        "SNS:GetTopicAttributes",
                        "SNS:DeleteTopic",
                        "SNS:AddPermission")
                    .conditions(GetPolicyDocumentStatementConditionArgs.builder()
                        .test("StringEquals")
                        .variable("AWS:SourceOwner")
                        .values(sns.get("account-id"))
                        .build())
                    .effect("Allow")
                    .principals(GetPolicyDocumentStatementPrincipalArgs.builder()
                        .type("AWS")
                        .identifiers("*")
                        .build())
                    .resources(topicArn)
                    .sid("__default_statement_ID")
                    .build(),
                GetPolicyDocumentStatementArgs.builder()
                    .actions(
                        "SNS:Subscribe",
                        "SNS:Receive")
                    .conditions(GetPolicyDocumentStatementConditionArgs.builder()
                        .test("StringLike")
                        .variable("SNS:Endpoint")
                        .values(queueArn)
                        .build())
                    .effect("Allow")
                    .principals(GetPolicyDocumentStatementPrincipalArgs.builder()
                        .type("AWS")
                        .identifiers("*")
                        .build())
                    .resources(topicArn)
                    .sid("__console_sub_0")
                    .build())
            .build());

        // Queue policy: SendMessage is allowed only when the source is the
        // topic.
        final var sqsQueuePolicy = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
            .policyId(queueArn + "/SQSDefaultPolicy")
            .statements(GetPolicyDocumentStatementArgs.builder()
                .sid("example-sns-topic")
                .effect("Allow")
                .principals(GetPolicyDocumentStatementPrincipalArgs.builder()
                    .type("AWS")
                    .identifiers("*")
                    .build())
                .actions("SQS:SendMessage")
                .resources(queueArn)
                .conditions(GetPolicyDocumentStatementConditionArgs.builder()
                    .test("ArnEquals")
                    .variable("aws:SourceArn")
                    .values(topicArn)
                    .build())
                .build())
            .build());

        // The invoke result is an Output, so the JSON is projected with
        // applyValue rather than read directly.
        var snsTopic = new Topic("snsTopic", TopicArgs.builder()
            .name(sns.get("name"))
            .displayName(sns.get("displayName"))
            .policy(snsTopicPolicy.applyValue(doc -> doc.json()))
            .build());
        var sqsQueue = new Queue("sqsQueue", QueueArgs.builder()
            .name(sqs.get("name"))
            .policy(sqsQueuePolicy.applyValue(doc -> doc.json()))
            .build());
        var snsTopicTopicSubscription = new TopicSubscription("snsTopicTopicSubscription", TopicSubscriptionArgs.builder()
            .topic(snsTopic.arn())
            .protocol("sqs")
            .endpoint(sqsQueue.arn())
            .build());
    }
}
# Settings for the topic and the queue; each object can be overridden from
# stack configuration.
configuration:
  sns:
    type: dynamic
    default:
      account-id: '111111111111'
      displayName: example
      name: example-sns-topic
      region: us-west-1
      role-name: service/service
  sqs:
    type: dynamic
    default:
      account-id: '222222222222'
      name: example-sqs-queue
      region: us-east-1
      role-name: service/service
resources:
  snsTopic:
    type: aws:sns:Topic
    name: sns_topic
    properties:
      name: ${sns.name}
      # The settings object spells this key "displayName".
      displayName: ${sns.displayName}
      policy: ${snsTopicPolicy.json}
  sqsQueue:
    type: aws:sqs:Queue
    name: sqs_queue
    properties:
      name: ${sqs.name}
      policy: ${sqsQueuePolicy.json}
  snsTopicTopicSubscription:
    type: aws:sns:TopicSubscription
    name: sns_topic
    properties:
      topic: ${snsTopic.arn}
      protocol: sqs
      endpoint: ${sqsQueue.arn}
variables:
  # Keys containing a hyphen are read with bracket syntax: ${sns["account-id"]}.
  # Topic policy: owner-account management plus subscribe/receive for the queue.
  snsTopicPolicy:
    fn::invoke:
      function: aws:iam:getPolicyDocument
      arguments:
        policyId: __default_policy_ID
        statements:
          - actions:
              - SNS:Subscribe
              - SNS:SetTopicAttributes
              - SNS:RemovePermission
              - SNS:Publish
              - SNS:ListSubscriptionsByTopic
              - SNS:GetTopicAttributes
              - SNS:DeleteTopic
              - SNS:AddPermission
            conditions:
              - test: StringEquals
                variable: AWS:SourceOwner
                values:
                  - ${sns["account-id"]}
            effect: Allow
            principals:
              - type: AWS
                identifiers:
                  - '*'
            resources:
              - arn:aws:sns:${sns.region}:${sns["account-id"]}:${sns.name}
            sid: __default_statement_ID
          - actions:
              - SNS:Subscribe
              - SNS:Receive
            conditions:
              - test: StringLike
                variable: SNS:Endpoint
                values:
                  - arn:aws:sqs:${sqs.region}:${sqs["account-id"]}:${sqs.name}
            effect: Allow
            principals:
              - type: AWS
                identifiers:
                  - '*'
            resources:
              - arn:aws:sns:${sns.region}:${sns["account-id"]}:${sns.name}
            sid: __console_sub_0
  # Queue policy: SendMessage is allowed only when the source is the topic.
  sqsQueuePolicy:
    fn::invoke:
      function: aws:iam:getPolicyDocument
      arguments:
        policyId: arn:aws:sqs:${sqs.region}:${sqs["account-id"]}:${sqs.name}/SQSDefaultPolicy
        statements:
          - sid: example-sns-topic
            effect: Allow
            principals:
              - type: AWS
                identifiers:
                  - '*'
            actions:
              - SQS:SendMessage
            resources:
              - arn:aws:sqs:${sqs.region}:${sqs["account-id"]}:${sqs.name}
            conditions:
              - test: ArnEquals
                variable: aws:SourceArn
                values:
                  - arn:aws:sns:${sns.region}:${sns["account-id"]}:${sns.name}
Cross-account subscriptions require IAM policies on both sides. The SNS topic policy allows the remote account to subscribe, and the SQS queue policy grants the topic permission to SendMessage. The subscription must use a provider in the same region as the SNS topic, even if the queue is in a different region. This example extends the basic SQS pattern with cross-account IAM configuration.
Beyond these examples
These snippets focus on specific subscription features: SQS queue subscriptions, HTTPS endpoint delivery with retry policies, and cross-account and cross-region routing. They’re intentionally minimal rather than full messaging architectures.
The examples reference pre-existing infrastructure such as SNS topics (by ARN) and IAM permissions for cross-account access. They focus on configuring the subscription rather than provisioning topics or queues.
To keep things focused, common subscription patterns are omitted, including:
- Message filtering (filterPolicy, filterPolicyScope)
- Dead letter queues (redrivePolicy)
- Lambda, Firehose, and mobile push subscriptions
- Email and SMS protocols (partially supported, require manual confirmation)
These omissions are intentional: the goal is to illustrate how each subscription feature is wired, not provide drop-in messaging modules. See the SNS Topic Subscription resource reference for all available configuration options.
Let's configure AWS SNS Topic Subscriptions
Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.
Try Pulumi Cloud for FREE
Frequently Asked Questions
Cross-Account & Cross-Region Setup
Subscription Confirmation & Lifecycle
With email, email-json, or http/https protocols (without auto-confirmation), destroying the resource removes it from Pulumi state but the subscription remains in AWS until confirmed. You can either confirm the subscription first, delete the SNS topic (which deletes all subscriptions), or import the subscription after confirmation to enable deletion.
Set confirmationTimeoutInMinutes for http and https protocols.
Set endpointAutoConfirms to true if your endpoint can auto-confirm subscriptions (e.g., PagerDuty). This defaults to false.
Protocol & Endpoint Configuration
The fully supported protocols are sqs, sms, lambda, firehose, and application. The protocols email, email-json, http, and https are valid but only partially supported.
The endpoint, protocol, and topic properties are immutable. Changing any of these requires recreating the subscription.
Message Delivery & Filtering
Unlike aws.sns.Topic, the deliveryPolicy for TopicSubscription should NOT be wrapped in an "http" object. Provide the JSON policy directly with properties like healthyRetryPolicy, throttlePolicy, and requestPolicy.
Set rawMessageDelivery to true to pass the original message directly without wrapping it in JSON. This defaults to false.
Use filterPolicy with a JSON string to filter messages. By default, filterPolicyScope applies to MessageAttributes, but you can set it to MessageBody to filter based on message content instead.
Use the redrivePolicy property with a JSON string specifying the dead-letter queue configuration.
Using a different cloud?
Explore messaging guides for other cloud providers: