The gcp:pubsub/subscription:Subscription resource, part of the Pulumi GCP provider, defines how messages from a Pub/Sub topic are delivered to consuming applications. This guide focuses on four capabilities: pull and push delivery configuration, BigQuery and Cloud Storage streaming, message filtering and transformation, and dead letter handling.
Subscriptions reference existing topics and may integrate with BigQuery tables, Cloud Storage buckets, or HTTP endpoints. The examples are intentionally small. Combine them with your own topics, IAM roles, and destination infrastructure.
Configure pull-based message consumption with retention
Most deployments start with pull subscriptions, where applications fetch messages on demand and control their own processing rate.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const example = new gcp.pubsub.Topic("example", {name: "example-topic"});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
name: "example-subscription",
topic: example.id,
labels: {
foo: "bar",
},
messageRetentionDuration: "1200s",
retainAckedMessages: true,
ackDeadlineSeconds: 20,
expirationPolicy: {
ttl: "300000.5s",
},
retryPolicy: {
minimumBackoff: "10s",
},
enableMessageOrdering: false,
});
import pulumi
import pulumi_gcp as gcp
example = gcp.pubsub.Topic("example", name="example-topic")
example_subscription = gcp.pubsub.Subscription("example",
name="example-subscription",
topic=example.id,
labels={
"foo": "bar",
},
message_retention_duration="1200s",
retain_acked_messages=True,
ack_deadline_seconds=20,
expiration_policy={
"ttl": "300000.5s",
},
retry_policy={
"minimum_backoff": "10s",
},
enable_message_ordering=False)
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
Name: pulumi.String("example-topic"),
})
if err != nil {
return err
}
_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
Name: pulumi.String("example-subscription"),
Topic: example.ID(),
Labels: pulumi.StringMap{
"foo": pulumi.String("bar"),
},
MessageRetentionDuration: pulumi.String("1200s"),
RetainAckedMessages: pulumi.Bool(true),
AckDeadlineSeconds: pulumi.Int(20),
ExpirationPolicy: &pubsub.SubscriptionExpirationPolicyArgs{
Ttl: pulumi.String("300000.5s"),
},
RetryPolicy: &pubsub.SubscriptionRetryPolicyArgs{
MinimumBackoff: pulumi.String("10s"),
},
EnableMessageOrdering: pulumi.Bool(false),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var example = new Gcp.PubSub.Topic("example", new()
{
Name = "example-topic",
});
var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
{
Name = "example-subscription",
Topic = example.Id,
Labels =
{
{ "foo", "bar" },
},
MessageRetentionDuration = "1200s",
RetainAckedMessages = true,
AckDeadlineSeconds = 20,
ExpirationPolicy = new Gcp.PubSub.Inputs.SubscriptionExpirationPolicyArgs
{
Ttl = "300000.5s",
},
RetryPolicy = new Gcp.PubSub.Inputs.SubscriptionRetryPolicyArgs
{
MinimumBackoff = "10s",
},
EnableMessageOrdering = false,
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionExpirationPolicyArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionRetryPolicyArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new Topic("example", TopicArgs.builder()
.name("example-topic")
.build());
var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
.name("example-subscription")
.topic(example.id())
.labels(Map.of("foo", "bar"))
.messageRetentionDuration("1200s")
.retainAckedMessages(true)
.ackDeadlineSeconds(20)
.expirationPolicy(SubscriptionExpirationPolicyArgs.builder()
.ttl("300000.5s")
.build())
.retryPolicy(SubscriptionRetryPolicyArgs.builder()
.minimumBackoff("10s")
.build())
.enableMessageOrdering(false)
.build());
}
}
resources:
example:
type: gcp:pubsub:Topic
properties:
name: example-topic
exampleSubscription:
type: gcp:pubsub:Subscription
name: example
properties:
name: example-subscription
topic: ${example.id}
labels:
foo: bar
messageRetentionDuration: 1200s
retainAckedMessages: true
ackDeadlineSeconds: 20
expirationPolicy:
ttl: 300000.5s
retryPolicy:
minimumBackoff: 10s
enableMessageOrdering: false
The topic property references the source topic. The messageRetentionDuration controls how long unacknowledged messages remain available (defaults to 7 days). When retainAckedMessages is true, acknowledged messages also stay in the backlog for the retention window, enabling replay via subscriptions.seek. The ackDeadlineSeconds sets how long Pub/Sub waits for acknowledgment before redelivering (10-600 seconds). The expirationPolicy determines when inactive subscriptions are deleted; setting ttl to an empty string prevents expiration. The retryPolicy configures the backoff behavior for failed deliveries.
Push messages to an HTTP endpoint
Applications that expose HTTP endpoints can receive messages via push delivery, where Pub/Sub sends each message as an HTTP POST request.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const example = new gcp.pubsub.Topic("example", {name: "example-topic"});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
name: "example-subscription",
topic: example.id,
ackDeadlineSeconds: 20,
labels: {
foo: "bar",
},
pushConfig: {
pushEndpoint: "https://example.com/push",
attributes: {
"x-goog-version": "v1",
},
},
});
import pulumi
import pulumi_gcp as gcp
example = gcp.pubsub.Topic("example", name="example-topic")
example_subscription = gcp.pubsub.Subscription("example",
name="example-subscription",
topic=example.id,
ack_deadline_seconds=20,
labels={
"foo": "bar",
},
push_config={
"push_endpoint": "https://example.com/push",
"attributes": {
"x-goog-version": "v1",
},
})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
Name: pulumi.String("example-topic"),
})
if err != nil {
return err
}
_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
Name: pulumi.String("example-subscription"),
Topic: example.ID(),
AckDeadlineSeconds: pulumi.Int(20),
Labels: pulumi.StringMap{
"foo": pulumi.String("bar"),
},
PushConfig: &pubsub.SubscriptionPushConfigArgs{
PushEndpoint: pulumi.String("https://example.com/push"),
Attributes: pulumi.StringMap{
"x-goog-version": pulumi.String("v1"),
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var example = new Gcp.PubSub.Topic("example", new()
{
Name = "example-topic",
});
var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
{
Name = "example-subscription",
Topic = example.Id,
AckDeadlineSeconds = 20,
Labels =
{
{ "foo", "bar" },
},
PushConfig = new Gcp.PubSub.Inputs.SubscriptionPushConfigArgs
{
PushEndpoint = "https://example.com/push",
Attributes =
{
{ "x-goog-version", "v1" },
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionPushConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new Topic("example", TopicArgs.builder()
.name("example-topic")
.build());
var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
.name("example-subscription")
.topic(example.id())
.ackDeadlineSeconds(20)
.labels(Map.of("foo", "bar"))
.pushConfig(SubscriptionPushConfigArgs.builder()
.pushEndpoint("https://example.com/push")
.attributes(Map.of("x-goog-version", "v1"))
.build())
.build());
}
}
resources:
example:
type: gcp:pubsub:Topic
properties:
name: example-topic
exampleSubscription:
type: gcp:pubsub:Subscription
name: example
properties:
name: example-subscription
topic: ${example.id}
ackDeadlineSeconds: 20
labels:
foo: bar
pushConfig:
pushEndpoint: https://example.com/push
attributes:
x-goog-version: v1
The pushConfig block defines the HTTP endpoint and its delivery attributes. The pushEndpoint must be an HTTPS URL that accepts POST requests. The attributes map holds endpoint configuration attributes rather than arbitrary HTTP headers; the currently supported key is “x-goog-version”, which selects the format of the pushed message. The ackDeadlineSeconds also controls the HTTP request timeout for push delivery.
Filter messages by attributes before delivery
When topics carry multiple message types, subscriptions can filter by message attributes to receive only relevant messages.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
// Source topic the filtered subscription attaches to.
const example = new gcp.pubsub.Topic("example", {name: "example-topic"});

// Subscription that only delivers messages whose attributes match the filter.
const exampleSubscription = new gcp.pubsub.Subscription("example", {
    name: "example-subscription",
    topic: example.id,
    labels: {
        foo: "bar",
    },
    // The filter expression needs plain double quotes around string values.
    // A `\\"` sequence inside a template literal would place a literal
    // backslash in front of each quote and make the expression invalid.
    filter: 'attributes.foo = "foo" AND attributes.bar = "bar"',
    ackDeadlineSeconds: 20,
});
import pulumi
import pulumi_gcp as gcp
example = gcp.pubsub.Topic("example", name="example-topic")
example_subscription = gcp.pubsub.Subscription("example",
name="example-subscription",
topic=example.id,
labels={
"foo": "bar",
},
filter=""" attributes.foo = \"foo\"
AND attributes.bar = \"bar\"
""",
ack_deadline_seconds=20)
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main declares a topic and a subscription that only receives messages whose
// attributes match the filter expression.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
			Name: pulumi.String("example-topic"),
		})
		if err != nil {
			return err
		}
		_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
			Name:  pulumi.String("example-subscription"),
			Topic: example.ID(),
			Labels: pulumi.StringMap{
				"foo": pulumi.String("bar"),
			},
			// A raw string keeps the double quotes exactly as the filter
			// syntax expects them; escaping them twice would leave literal
			// backslashes in the expression.
			Filter:             pulumi.String(`attributes.foo = "foo" AND attributes.bar = "bar"`),
			AckDeadlineSeconds: pulumi.Int(20),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
// Declares a topic and a subscription that only receives messages whose
// attributes match the filter expression.
return await Deployment.RunAsync(() =>
{
    var example = new Gcp.PubSub.Topic("example", new()
    {
        Name = "example-topic",
    });

    var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
    {
        Name = "example-subscription",
        Topic = example.Id,
        Labels =
        {
            { "foo", "bar" },
        },
        // Regular string literal: \" yields a plain double quote. In a
        // verbatim (@"...") string the backslash would be kept literally and
        // end up inside the filter expression.
        Filter = "attributes.foo = \"foo\" AND attributes.bar = \"bar\"",
        AckDeadlineSeconds = 20,
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new Topic("example", TopicArgs.builder()
.name("example-topic")
.build());
var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
.name("example-subscription")
.topic(example.id())
.labels(Map.of("foo", "bar"))
.filter("""
attributes.foo = \"foo\"
AND attributes.bar = \"bar\"
""")
.ackDeadlineSeconds(20)
.build());
}
}
# Topic plus a subscription that only receives messages whose attributes
# match the filter expression.
resources:
  example:
    type: gcp:pubsub:Topic
    properties:
      name: example-topic
  exampleSubscription:
    type: gcp:pubsub:Subscription
    name: example
    properties:
      name: example-subscription
      topic: ${example.id}
      labels:
        foo: bar
      # Single-quoted scalar so the double quotes reach the filter unchanged;
      # YAML does not process backslash escapes in plain or block scalars.
      filter: 'attributes.foo = "foo" AND attributes.bar = "bar"'
      ackDeadlineSeconds: 20
The filter property uses a SQL-like syntax to match message attributes. Pub/Sub automatically acknowledges messages that don’t match, so they never reach your application. Filters are immutable after creation and limited to 256 bytes. This extends the basic pull subscription pattern by adding attribute-based filtering at the subscription level.
Route unprocessable messages to a dead letter topic
Messages that repeatedly fail processing can be routed to a separate topic after a configured number of delivery attempts.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const example = new gcp.pubsub.Topic("example", {name: "example-topic"});
const exampleDeadLetter = new gcp.pubsub.Topic("example_dead_letter", {name: "example-topic-dead-letter"});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
name: "example-subscription",
topic: example.id,
deadLetterPolicy: {
deadLetterTopic: exampleDeadLetter.id,
maxDeliveryAttempts: 10,
},
});
import pulumi
import pulumi_gcp as gcp
example = gcp.pubsub.Topic("example", name="example-topic")
example_dead_letter = gcp.pubsub.Topic("example_dead_letter", name="example-topic-dead-letter")
example_subscription = gcp.pubsub.Subscription("example",
name="example-subscription",
topic=example.id,
dead_letter_policy={
"dead_letter_topic": example_dead_letter.id,
"max_delivery_attempts": 10,
})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
Name: pulumi.String("example-topic"),
})
if err != nil {
return err
}
exampleDeadLetter, err := pubsub.NewTopic(ctx, "example_dead_letter", &pubsub.TopicArgs{
Name: pulumi.String("example-topic-dead-letter"),
})
if err != nil {
return err
}
_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
Name: pulumi.String("example-subscription"),
Topic: example.ID(),
DeadLetterPolicy: &pubsub.SubscriptionDeadLetterPolicyArgs{
DeadLetterTopic: exampleDeadLetter.ID(),
MaxDeliveryAttempts: pulumi.Int(10),
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var example = new Gcp.PubSub.Topic("example", new()
{
Name = "example-topic",
});
var exampleDeadLetter = new Gcp.PubSub.Topic("example_dead_letter", new()
{
Name = "example-topic-dead-letter",
});
var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
{
Name = "example-subscription",
Topic = example.Id,
DeadLetterPolicy = new Gcp.PubSub.Inputs.SubscriptionDeadLetterPolicyArgs
{
DeadLetterTopic = exampleDeadLetter.Id,
MaxDeliveryAttempts = 10,
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionDeadLetterPolicyArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new Topic("example", TopicArgs.builder()
.name("example-topic")
.build());
var exampleDeadLetter = new Topic("exampleDeadLetter", TopicArgs.builder()
.name("example-topic-dead-letter")
.build());
var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
.name("example-subscription")
.topic(example.id())
.deadLetterPolicy(SubscriptionDeadLetterPolicyArgs.builder()
.deadLetterTopic(exampleDeadLetter.id())
.maxDeliveryAttempts(10)
.build())
.build());
}
}
resources:
example:
type: gcp:pubsub:Topic
properties:
name: example-topic
exampleDeadLetter:
type: gcp:pubsub:Topic
name: example_dead_letter
properties:
name: example-topic-dead-letter
exampleSubscription:
type: gcp:pubsub:Subscription
name: example
properties:
name: example-subscription
topic: ${example.id}
deadLetterPolicy:
deadLetterTopic: ${exampleDeadLetter.id}
maxDeliveryAttempts: 10
The deadLetterPolicy sends messages to a separate topic after maxDeliveryAttempts failures. The deadLetterTopic must exist before creating the subscription. The Pub/Sub service account (service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com) needs the Publisher role (roles/pubsub.publisher) on the dead letter topic so it can forward messages, and the Subscriber role (roles/pubsub.subscriber) on the main subscription so it can acknowledge the forwarded messages.
Stream messages directly into BigQuery tables
Analytics pipelines can bypass application code by streaming Pub/Sub messages directly into BigQuery tables.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
// Source topic whose messages are streamed into BigQuery.
const example = new gcp.pubsub.Topic("example", {name: "example-topic"});

// Destination dataset and table.
const test = new gcp.bigquery.Dataset("test", {datasetId: "example_dataset"});
const testTable = new gcp.bigquery.Table("test", {
    tableId: "example_table",
    datasetId: test.datasetId,
    // Build the schema with JSON.stringify so it is always valid JSON.
    // Hand-escaped quotes (`\\"`) inside a template literal leave literal
    // backslashes in the document.
    schema: JSON.stringify([
        {
            name: "data",
            type: "STRING",
            mode: "NULLABLE",
            description: "The data",
        },
    ]),
    deletionProtection: false,
});

// BigQuery subscription; the table is addressed as "project.dataset.table".
const exampleSubscription = new gcp.pubsub.Subscription("example", {
    name: "example-subscription",
    topic: example.id,
    bigqueryConfig: {
        table: pulumi.interpolate`${testTable.project}.${testTable.datasetId}.${testTable.tableId}`,
    },
});

// Project lookup; not referenced by the resources declared above.
const project = gcp.organizations.getProject({});
import pulumi
import pulumi_gcp as gcp
example = gcp.pubsub.Topic("example", name="example-topic")
test = gcp.bigquery.Dataset("test", dataset_id="example_dataset")
test_table = gcp.bigquery.Table("test",
table_id="example_table",
dataset_id=test.dataset_id,
schema="""[
{
\"name\": \"data\",
\"type\": \"STRING\",
\"mode\": \"NULLABLE\",
\"description\": \"The data\"
}
]
""",
deletion_protection=False)
example_subscription = gcp.pubsub.Subscription("example",
name="example-subscription",
topic=example.id,
bigquery_config={
"table": pulumi.Output.all(
project=test_table.project,
dataset_id=test_table.dataset_id,
table_id=test_table.table_id
).apply(lambda resolved_outputs: f"{resolved_outputs['project']}.{resolved_outputs['dataset_id']}.{resolved_outputs['table_id']}")
,
})
project = gcp.organizations.get_project()
package main
import (
"fmt"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main declares a topic, a BigQuery dataset and table, and a subscription
// that writes the topic's messages into that table.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
			Name: pulumi.String("example-topic"),
		})
		if err != nil {
			return err
		}
		test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
			DatasetId: pulumi.String("example_dataset"),
		})
		if err != nil {
			return err
		}
		testTable, err := bigquery.NewTable(ctx, "test", &bigquery.TableArgs{
			TableId:   pulumi.String("example_table"),
			DatasetId: test.DatasetId,
			// Raw string literals do not process escapes, so the quotes are
			// written plainly; a \" here would put a backslash into the JSON.
			Schema: pulumi.String(`[
  {
    "name": "data",
    "type": "STRING",
    "mode": "NULLABLE",
    "description": "The data"
  }
]
`),
			DeletionProtection: pulumi.Bool(false),
		})
		if err != nil {
			return err
		}
		_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
			Name:  pulumi.String("example-subscription"),
			Topic: example.ID(),
			BigqueryConfig: &pubsub.SubscriptionBigqueryConfigArgs{
				// The table is addressed as "project.dataset.table".
				Table: pulumi.All(testTable.Project, testTable.DatasetId, testTable.TableId).ApplyT(func(_args []interface{}) (string, error) {
					project := _args[0].(string)
					datasetId := _args[1].(string)
					tableId := _args[2].(string)
					return fmt.Sprintf("%v.%v.%v", project, datasetId, tableId), nil
				}).(pulumi.StringOutput),
			},
		})
		if err != nil {
			return err
		}
		// Project lookup; its result is not used by the resources above.
		_, err = organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
// Declares a topic, a BigQuery dataset and table, and a subscription that
// writes the topic's messages into that table.
return await Deployment.RunAsync(() =>
{
    var example = new Gcp.PubSub.Topic("example", new()
    {
        Name = "example-topic",
    });

    var test = new Gcp.BigQuery.Dataset("test", new()
    {
        DatasetId = "example_dataset",
    });

    var testTable = new Gcp.BigQuery.Table("test", new()
    {
        TableId = "example_table",
        DatasetId = test.DatasetId,
        // In a verbatim string a double quote is written as "" and a
        // backslash is kept literally, so no backslashes are used here.
        Schema = @"[
  {
    ""name"": ""data"",
    ""type"": ""STRING"",
    ""mode"": ""NULLABLE"",
    ""description"": ""The data""
  }
]
",
        DeletionProtection = false,
    });

    var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
    {
        Name = "example-subscription",
        Topic = example.Id,
        BigqueryConfig = new Gcp.PubSub.Inputs.SubscriptionBigqueryConfigArgs
        {
            // The table is addressed as "project.dataset.table".
            Table = Output.Tuple(testTable.Project, testTable.DatasetId, testTable.TableId).Apply(values =>
            {
                var project = values.Item1;
                var datasetId = values.Item2;
                var tableId = values.Item3;
                return $"{project}.{datasetId}.{tableId}";
            }),
        },
    });

    // Project lookup; not referenced by the resources declared above.
    var project = Gcp.Organizations.GetProject.Invoke();
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Table;
import com.pulumi.gcp.bigquery.TableArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionBigqueryConfigArgs;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new Topic("example", TopicArgs.builder()
.name("example-topic")
.build());
var test = new Dataset("test", DatasetArgs.builder()
.datasetId("example_dataset")
.build());
var testTable = new Table("testTable", TableArgs.builder()
.tableId("example_table")
.datasetId(test.datasetId())
.schema("""
[
{
\"name\": \"data\",
\"type\": \"STRING\",
\"mode\": \"NULLABLE\",
\"description\": \"The data\"
}
]
""")
.deletionProtection(false)
.build());
var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
.name("example-subscription")
.topic(example.id())
.bigqueryConfig(SubscriptionBigqueryConfigArgs.builder()
.table(Output.tuple(testTable.project(), testTable.datasetId(), testTable.tableId()).applyValue(values -> {
var project = values.t1;
var datasetId = values.t2;
var tableId = values.t3;
return String.format("%s.%s.%s", project,datasetId,tableId);
}))
.build())
.build());
final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
.build());
}
}
# Topic, BigQuery dataset and table, and a subscription that writes the
# topic's messages into that table.
resources:
  example:
    type: gcp:pubsub:Topic
    properties:
      name: example-topic
  exampleSubscription:
    type: gcp:pubsub:Subscription
    name: example
    properties:
      name: example-subscription
      topic: ${example.id}
      bigqueryConfig:
        # The table is addressed as "project.dataset.table".
        table: ${testTable.project}.${testTable.datasetId}.${testTable.tableId}
  test:
    type: gcp:bigquery:Dataset
    properties:
      datasetId: example_dataset
  testTable:
    type: gcp:bigquery:Table
    name: test
    properties:
      tableId: example_table
      datasetId: ${test.datasetId}
      # Literal block scalars do not process escapes, so the JSON quotes are
      # written plainly; a \" here would put a backslash into the schema.
      schema: |
        [
          {
            "name": "data",
            "type": "STRING",
            "mode": "NULLABLE",
            "description": "The data"
          }
        ]
      deletionProtection: false
variables:
  # Project lookup; not referenced by the resources declared above.
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}
The bigqueryConfig block routes messages to a BigQuery table specified in “project.dataset.table” format. Messages are written as they arrive, enabling real-time analytics. The BigQuery table must exist with a schema that matches your message structure. The Pub/Sub service account needs bigquery.dataEditor and bigquery.metadataViewer roles on the target project.
Archive messages to Cloud Storage buckets
Long-term message archival or data lake ingestion can route messages directly to Cloud Storage.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const example = new gcp.storage.Bucket("example", {
name: "example-bucket",
location: "US",
uniformBucketLevelAccess: true,
});
const exampleTopic = new gcp.pubsub.Topic("example", {name: "example-topic"});
const project = gcp.organizations.getProject({});
const admin = new gcp.storage.BucketIAMMember("admin", {
bucket: example.name,
role: "roles/storage.admin",
member: project.then(project => `serviceAccount:service-${project.number}@gcp-sa-pubsub.iam.gserviceaccount.com`),
});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
name: "example-subscription",
topic: exampleTopic.id,
cloudStorageConfig: {
bucket: example.name,
filenamePrefix: "pre-",
filenameSuffix: "-_29225",
filenameDatetimeFormat: "YYYY-MM-DD/hh_mm_ssZ",
maxBytes: 1000,
maxDuration: "300s",
maxMessages: 1000,
},
}, {
dependsOn: [
example,
admin,
],
});
import pulumi
import pulumi_gcp as gcp
example = gcp.storage.Bucket("example",
name="example-bucket",
location="US",
uniform_bucket_level_access=True)
example_topic = gcp.pubsub.Topic("example", name="example-topic")
project = gcp.organizations.get_project()
admin = gcp.storage.BucketIAMMember("admin",
bucket=example.name,
role="roles/storage.admin",
member=f"serviceAccount:service-{project.number}@gcp-sa-pubsub.iam.gserviceaccount.com")
example_subscription = gcp.pubsub.Subscription("example",
name="example-subscription",
topic=example_topic.id,
cloud_storage_config={
"bucket": example.name,
"filename_prefix": "pre-",
"filename_suffix": "-_29225",
"filename_datetime_format": "YYYY-MM-DD/hh_mm_ssZ",
"max_bytes": 1000,
"max_duration": "300s",
"max_messages": 1000,
},
opts = pulumi.ResourceOptions(depends_on=[
example,
admin,
]))
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/storage"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
example, err := storage.NewBucket(ctx, "example", &storage.BucketArgs{
Name: pulumi.String("example-bucket"),
Location: pulumi.String("US"),
UniformBucketLevelAccess: pulumi.Bool(true),
})
if err != nil {
return err
}
exampleTopic, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
Name: pulumi.String("example-topic"),
})
if err != nil {
return err
}
project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
if err != nil {
return err
}
admin, err := storage.NewBucketIAMMember(ctx, "admin", &storage.BucketIAMMemberArgs{
Bucket: example.Name,
Role: pulumi.String("roles/storage.admin"),
Member: pulumi.Sprintf("serviceAccount:service-%v@gcp-sa-pubsub.iam.gserviceaccount.com", project.Number),
})
if err != nil {
return err
}
_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
Name: pulumi.String("example-subscription"),
Topic: exampleTopic.ID(),
CloudStorageConfig: &pubsub.SubscriptionCloudStorageConfigArgs{
Bucket: example.Name,
FilenamePrefix: pulumi.String("pre-"),
FilenameSuffix: pulumi.String("-_29225"),
FilenameDatetimeFormat: pulumi.String("YYYY-MM-DD/hh_mm_ssZ"),
MaxBytes: pulumi.Int(1000),
MaxDuration: pulumi.String("300s"),
MaxMessages: pulumi.Int(1000),
},
}, pulumi.DependsOn([]pulumi.Resource{
example,
admin,
}))
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
// Declares a bucket, a topic, an IAM grant for the Pub/Sub service account on
// the bucket, and a subscription that writes the topic's messages to the bucket.
return await Deployment.RunAsync(() =>
{
    var example = new Gcp.Storage.Bucket("example", new()
    {
        Name = "example-bucket",
        Location = "US",
        UniformBucketLevelAccess = true,
    });

    var exampleTopic = new Gcp.PubSub.Topic("example", new()
    {
        Name = "example-topic",
    });

    var project = Gcp.Organizations.GetProject.Invoke();

    var admin = new Gcp.Storage.BucketIAMMember("admin", new()
    {
        Bucket = example.Name,
        Role = "roles/storage.admin",
        // Output.Format resolves the Output<string> before building the
        // member string. Interpolating an Output into a plain $"..." string
        // would embed the Output object's ToString() text instead of the
        // project number.
        Member = Output.Format($"serviceAccount:service-{project.Apply(getProjectResult => getProjectResult.Number)}@gcp-sa-pubsub.iam.gserviceaccount.com"),
    });

    var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
    {
        Name = "example-subscription",
        Topic = exampleTopic.Id,
        CloudStorageConfig = new Gcp.PubSub.Inputs.SubscriptionCloudStorageConfigArgs
        {
            Bucket = example.Name,
            FilenamePrefix = "pre-",
            FilenameSuffix = "-_29225",
            FilenameDatetimeFormat = "YYYY-MM-DD/hh_mm_ssZ",
            MaxBytes = 1000,
            MaxDuration = "300s",
            MaxMessages = 1000,
        },
    }, new CustomResourceOptions
    {
        // Create the subscription only after the bucket and the IAM grant exist.
        DependsOn =
        {
            example,
            admin,
        },
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.storage.Bucket;
import com.pulumi.gcp.storage.BucketArgs;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.storage.BucketIAMMember;
import com.pulumi.gcp.storage.BucketIAMMemberArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionCloudStorageConfigArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Pulumi program that declares a bucket, a topic, an IAM grant for the Pub/Sub
 * service account on the bucket, and a subscription that writes the topic's
 * messages to the bucket.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares the stack's resources.
     *
     * @param ctx the Pulumi deployment context
     */
    public static void stack(Context ctx) {
        var example = new Bucket("example", BucketArgs.builder()
            .name("example-bucket")
            .location("US")
            .uniformBucketLevelAccess(true)
            .build());

        var exampleTopic = new Topic("exampleTopic", TopicArgs.builder()
            .name("example-topic")
            .build());

        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

        var admin = new BucketIAMMember("admin", BucketIAMMemberArgs.builder()
            .bucket(example.name())
            .role("roles/storage.admin")
            // The lookup result is an Output, so the project number has to be
            // read inside applyValue; it cannot be called on the Output itself.
            .member(project.applyValue(result -> String.format(
                "serviceAccount:service-%s@gcp-sa-pubsub.iam.gserviceaccount.com", result.number())))
            .build());

        var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
            .name("example-subscription")
            .topic(exampleTopic.id())
            .cloudStorageConfig(SubscriptionCloudStorageConfigArgs.builder()
                .bucket(example.name())
                .filenamePrefix("pre-")
                .filenameSuffix("-_29225")
                .filenameDatetimeFormat("YYYY-MM-DD/hh_mm_ssZ")
                .maxBytes(1000)
                .maxDuration("300s")
                .maxMessages(1000)
                .build())
            .build(), CustomResourceOptions.builder()
                // Create the subscription only after the bucket and the IAM grant exist.
                .dependsOn(
                    example,
                    admin)
                .build());
    }
}
resources:
example:
type: gcp:storage:Bucket
properties:
name: example-bucket
location: US
uniformBucketLevelAccess: true
exampleTopic:
type: gcp:pubsub:Topic
name: example
properties:
name: example-topic
exampleSubscription:
type: gcp:pubsub:Subscription
name: example
properties:
name: example-subscription
topic: ${exampleTopic.id}
cloudStorageConfig:
bucket: ${example.name}
filenamePrefix: pre-
filenameSuffix: -_29225
filenameDatetimeFormat: YYYY-MM-DD/hh_mm_ssZ
maxBytes: 1000
maxDuration: 300s
maxMessages: 1000
options:
dependsOn:
- ${example}
- ${admin}
admin:
type: gcp:storage:BucketIAMMember
properties:
bucket: ${example.name}
role: roles/storage.admin
member: serviceAccount:service-${project.number}@gcp-sa-pubsub.iam.gserviceaccount.com
variables:
project:
fn::invoke:
function: gcp:organizations:getProject
arguments: {}
The cloudStorageConfig block writes messages to a bucket with configurable file naming and batching. The filenamePrefix and filenameSuffix customize object names. The filenameDatetimeFormat organizes files by timestamp (e.g., “YYYY-MM-DD/hh_mm_ssZ”). Files are created when they reach maxBytes, maxDuration, or maxMessages thresholds. The Pub/Sub service account needs storage.admin role on the bucket.
Transform messages with JavaScript functions
Applications can apply custom transformations to messages before delivery using JavaScript user-defined functions.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
// Topic that the subscription attaches to.
const example = new gcp.pubsub.Topic("example", {name: "example-topic"});

// Subscription with a single JavaScript UDF message transform.
// The UDF parses the JSON payload and keeps only messages whose `year`
// field is even: it returns the message to pass it through, or null to
// filter it out. (The previous body parsed the payload but then read
// `message.year`, which is never set, and returned a boolean.)
const exampleSubscription = new gcp.pubsub.Subscription("example", {
    name: "example-subscription",
    topic: example.id,
    messageTransforms: [{
        javascriptUdf: {
            // Must match the name of the function defined in `code`.
            functionName: "isYearEven",
            code: `function isYearEven(message, metadata) {
  const data = JSON.parse(message.data);
  return data.year % 2 === 0 ? message : null;
}
`,
        },
    }],
});
import pulumi
import pulumi_gcp as gcp
# Topic that the subscription attaches to.
example = gcp.pubsub.Topic("example", name="example-topic")

# Subscription with a single JavaScript UDF message transform.
# The UDF parses the JSON payload and keeps only messages whose `year`
# field is even: it returns the message to pass it through, or null to
# filter it out. (The previous body parsed the payload but then read
# `message.year`, which is never set, and returned a boolean.)
example_subscription = gcp.pubsub.Subscription("example",
    name="example-subscription",
    topic=example.id,
    message_transforms=[{
        "javascript_udf": {
            # Must match the name of the function defined in "code".
            "function_name": "isYearEven",
            "code": """function isYearEven(message, metadata) {
  const data = JSON.parse(message.data);
  return data.year % 2 === 0 ? message : null;
}
""",
        },
    }])
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main declares a topic and a subscription whose messages pass through a
// JavaScript UDF transform before delivery.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Topic that the subscription attaches to.
		example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
			Name: pulumi.String("example-topic"),
		})
		if err != nil {
			return err
		}
		_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
			Name:  pulumi.String("example-subscription"),
			Topic: example.ID(),
			MessageTransforms: pubsub.SubscriptionMessageTransformArray{
				&pubsub.SubscriptionMessageTransformArgs{
					JavascriptUdf: &pubsub.SubscriptionMessageTransformJavascriptUdfArgs{
						// Must match the name of the function defined in Code.
						FunctionName: pulumi.String("isYearEven"),
						// The UDF parses the JSON payload and keeps only messages
						// whose `year` field is even: it returns the message to
						// pass it through, or null to filter it out. (The previous
						// body read `message.year`, which is never set, and
						// returned a boolean.)
						Code: pulumi.String("function isYearEven(message, metadata) {\n  const data = JSON.parse(message.data);\n  return data.year % 2 === 0 ? message : null;\n}\n"),
					},
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
    // Topic that the subscription attaches to.
    var example = new Gcp.PubSub.Topic("example", new()
    {
        Name = "example-topic",
    });

    // Subscription with a single JavaScript UDF message transform.
    // The UDF parses the JSON payload and keeps only messages whose `year`
    // field is even: it returns the message to pass it through, or null to
    // filter it out. (The previous body parsed the payload but then read
    // `message.year`, which is never set, and returned a boolean.)
    var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
    {
        Name = "example-subscription",
        Topic = example.Id,
        MessageTransforms = new[]
        {
            new Gcp.PubSub.Inputs.SubscriptionMessageTransformArgs
            {
                JavascriptUdf = new Gcp.PubSub.Inputs.SubscriptionMessageTransformJavascriptUdfArgs
                {
                    // Must match the name of the function defined in Code.
                    FunctionName = "isYearEven",
                    Code = @"function isYearEven(message, metadata) {
  const data = JSON.parse(message.data);
  return data.year % 2 === 0 ? message : null;
}
",
                },
            },
        },
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionMessageTransformArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionMessageTransformJavascriptUdfArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Pulumi program that declares a Pub/Sub topic and a subscription whose
 * messages pass through a JavaScript UDF transform before delivery.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares the stack's resources.
     *
     * @param ctx the Pulumi deployment context (unused by this example)
     */
    public static void stack(Context ctx) {
        // Topic that the subscription attaches to.
        var example = new Topic("example", TopicArgs.builder()
            .name("example-topic")
            .build());

        // The UDF parses the JSON payload and keeps only messages whose `year`
        // field is even: it returns the message to pass it through, or null to
        // filter it out. (The previous body parsed the payload but then read
        // `message.year`, which is never set, and returned a boolean.)
        var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
            .name("example-subscription")
            .topic(example.id())
            .messageTransforms(SubscriptionMessageTransformArgs.builder()
                .javascriptUdf(SubscriptionMessageTransformJavascriptUdfArgs.builder()
                    // Must match the name of the function defined in code.
                    .functionName("isYearEven")
                    .code("""
                        function isYearEven(message, metadata) {
                          const data = JSON.parse(message.data);
                          return data.year % 2 === 0 ? message : null;
                        }
                        """)
                    .build())
                .build())
            .build());
    }
}
resources:
  # Topic that the subscription attaches to.
  example:
    type: gcp:pubsub:Topic
    properties:
      name: example-topic

  # Subscription with a single JavaScript UDF message transform.
  exampleSubscription:
    type: gcp:pubsub:Subscription
    name: example
    properties:
      name: example-subscription
      topic: ${example.id}
      messageTransforms:
        - javascriptUdf:
            # Must match the name of the function defined in `code`.
            functionName: isYearEven
            # Parses the JSON payload and keeps only messages whose `year`
            # field is even: returns the message to pass it through, or null
            # to filter it out. (The previous body read `message.year`, which
            # is never set, and returned a boolean.)
            code: |
              function isYearEven(message, metadata) {
                const data = JSON.parse(message.data);
                return data.year % 2 === 0 ? message : null;
              }
The messageTransforms array defines JavaScript functions that process messages in order. Each javascriptUdf specifies a functionName and code. Functions receive the message and metadata, and can filter (return null), modify (return transformed message), or pass through unchanged. Transformations run within Pub/Sub before delivery to your application.
Beyond these examples
These snippets focus on specific subscription-level features: pull and push delivery modes, BigQuery and Cloud Storage integration, message filtering and transformation, and dead letter handling. They’re intentionally minimal rather than full messaging architectures.
The examples reference pre-existing infrastructure such as Pub/Sub topics, BigQuery datasets and tables, Cloud Storage buckets, HTTP endpoints for push delivery, and IAM permissions for service accounts. They focus on configuring the subscription rather than provisioning everything around it.
To keep things focused, common subscription patterns are omitted, including:
- Exactly-once delivery guarantees (enableExactlyOnceDelivery)
- Message ordering (enableMessageOrdering)
- Retry policies and backoff configuration
- Subscription expiration policies
- Resource manager tags
These omissions are intentional: the goal is to illustrate how each subscription feature is wired, not provide drop-in messaging modules. See the Pub/Sub Subscription resource reference for all available configuration options.
Let's create Google Cloud Pub/Sub Subscriptions
Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.
Try Pulumi Cloud for FREE
Frequently Asked Questions
Delivery & Configuration
Only one of pushConfig, bigqueryConfig, or cloudStorageConfig can be set. If all three are empty, the subscriber will pull and acknowledge messages using API methods.
To deliver to BigQuery, configure bigqueryConfig with the table reference and serviceAccountEmail. Grant the service account roles/bigquery.metadataViewer and roles/bigquery.dataEditor, then use dependsOn to ensure IAM bindings are created before the subscription.
To deliver to Cloud Storage, configure cloudStorageConfig with the bucket name and optional filename settings (filenamePrefix, filenameSuffix, filenameDatetimeFormat). Grant the Pub/Sub service account (service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com) the roles/storage.admin role on the bucket.
Message Handling
Set messageRetentionDuration with a minimum of 10 minutes ("600s") and a maximum of 31 days ("2678400s").
enableExactlyOnceDelivery guarantees no message redelivery before the ack deadline expires and no resending of acknowledged messages. enableMessageOrdering ensures messages with the same orderingKey are delivered in the order received by Pub/Sub. Both are independent features.
Immutability & Lifecycle
The following properties are immutable: name, project, topic, enableMessageOrdering, filter, and tags.
To keep a subscription from expiring, set expirationPolicy.ttl to an empty string (""). The minimum TTL if specified is 1 day.
IAM & Permissions
The Pub/Sub service account (service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com) must have permission to Acknowledge() messages on the subscription.
Use the gcp.projects.ServiceIdentity resource to retrieve the service account email for IAM bindings.
Filtering & Transforms
Define transforms in the messageTransforms array. Transforms are applied in the order specified, and individual transforms can be disabled using the disabled flag.
The labels field is non-authoritative and only manages labels in your configuration. Use effectiveLabels to see all labels present on the resource, including those set by other clients and services.
Using a different cloud?
Explore messaging guides for other cloud providers: