The gcp:pubsub/subscription:Subscription resource, part of the Pulumi GCP provider, defines how messages from a Pub/Sub topic are delivered to subscribers. This guide focuses on four capabilities: push and pull delivery modes, message filtering and transformation, BigQuery and Cloud Storage integration, and dead letter handling.
Subscriptions reference existing Pub/Sub topics and may deliver to HTTP endpoints, BigQuery tables, or Cloud Storage buckets. The examples are intentionally small. Combine them with your own topics, IAM permissions, and destination infrastructure.
Push messages to an HTTP endpoint
Many event-driven architectures route Pub/Sub messages to HTTP services that process events as they arrive, eliminating the need for subscribers to poll.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const example = new gcp.pubsub.Topic("example", {name: "example-topic"});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
name: "example-subscription",
topic: example.id,
ackDeadlineSeconds: 20,
labels: {
foo: "bar",
},
pushConfig: {
pushEndpoint: "https://example.com/push",
attributes: {
"x-goog-version": "v1",
},
},
});
import pulumi
import pulumi_gcp as gcp
example = gcp.pubsub.Topic("example", name="example-topic")
example_subscription = gcp.pubsub.Subscription("example",
name="example-subscription",
topic=example.id,
ack_deadline_seconds=20,
labels={
"foo": "bar",
},
push_config={
"push_endpoint": "https://example.com/push",
"attributes": {
"x-goog-version": "v1",
},
})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
Name: pulumi.String("example-topic"),
})
if err != nil {
return err
}
_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
Name: pulumi.String("example-subscription"),
Topic: example.ID(),
AckDeadlineSeconds: pulumi.Int(20),
Labels: pulumi.StringMap{
"foo": pulumi.String("bar"),
},
PushConfig: &pubsub.SubscriptionPushConfigArgs{
PushEndpoint: pulumi.String("https://example.com/push"),
Attributes: pulumi.StringMap{
"x-goog-version": pulumi.String("v1"),
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var example = new Gcp.PubSub.Topic("example", new()
{
Name = "example-topic",
});
var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
{
Name = "example-subscription",
Topic = example.Id,
AckDeadlineSeconds = 20,
Labels =
{
{ "foo", "bar" },
},
PushConfig = new Gcp.PubSub.Inputs.SubscriptionPushConfigArgs
{
PushEndpoint = "https://example.com/push",
Attributes =
{
{ "x-goog-version", "v1" },
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionPushConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new Topic("example", TopicArgs.builder()
.name("example-topic")
.build());
var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
.name("example-subscription")
.topic(example.id())
.ackDeadlineSeconds(20)
.labels(Map.of("foo", "bar"))
.pushConfig(SubscriptionPushConfigArgs.builder()
.pushEndpoint("https://example.com/push")
.attributes(Map.of("x-goog-version", "v1"))
.build())
.build());
}
}
resources:
example:
type: gcp:pubsub:Topic
properties:
name: example-topic
exampleSubscription:
type: gcp:pubsub:Subscription
name: example
properties:
name: example-subscription
topic: ${example.id}
ackDeadlineSeconds: 20
labels:
foo: bar
pushConfig:
pushEndpoint: https://example.com/push
attributes:
x-goog-version: v1
When a message arrives, Pub/Sub sends an HTTP POST request to the pushEndpoint. The attributes map passes custom headers (like API version) to your endpoint. The ackDeadlineSeconds property sets how long Pub/Sub waits for an HTTP 200 response before considering the delivery failed and retrying.
Configure pull-based message consumption
Applications that need fine-grained control over message processing use pull subscriptions, where the subscriber explicitly requests messages and controls acknowledgment timing.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const example = new gcp.pubsub.Topic("example", {name: "example-topic"});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
name: "example-subscription",
topic: example.id,
labels: {
foo: "bar",
},
messageRetentionDuration: "1200s",
retainAckedMessages: true,
ackDeadlineSeconds: 20,
expirationPolicy: {
ttl: "300000.5s",
},
retryPolicy: {
minimumBackoff: "10s",
},
enableMessageOrdering: false,
});
import pulumi
import pulumi_gcp as gcp
example = gcp.pubsub.Topic("example", name="example-topic")
example_subscription = gcp.pubsub.Subscription("example",
name="example-subscription",
topic=example.id,
labels={
"foo": "bar",
},
message_retention_duration="1200s",
retain_acked_messages=True,
ack_deadline_seconds=20,
expiration_policy={
"ttl": "300000.5s",
},
retry_policy={
"minimum_backoff": "10s",
},
enable_message_ordering=False)
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
Name: pulumi.String("example-topic"),
})
if err != nil {
return err
}
_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
Name: pulumi.String("example-subscription"),
Topic: example.ID(),
Labels: pulumi.StringMap{
"foo": pulumi.String("bar"),
},
MessageRetentionDuration: pulumi.String("1200s"),
RetainAckedMessages: pulumi.Bool(true),
AckDeadlineSeconds: pulumi.Int(20),
ExpirationPolicy: &pubsub.SubscriptionExpirationPolicyArgs{
Ttl: pulumi.String("300000.5s"),
},
RetryPolicy: &pubsub.SubscriptionRetryPolicyArgs{
MinimumBackoff: pulumi.String("10s"),
},
EnableMessageOrdering: pulumi.Bool(false),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var example = new Gcp.PubSub.Topic("example", new()
{
Name = "example-topic",
});
var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
{
Name = "example-subscription",
Topic = example.Id,
Labels =
{
{ "foo", "bar" },
},
MessageRetentionDuration = "1200s",
RetainAckedMessages = true,
AckDeadlineSeconds = 20,
ExpirationPolicy = new Gcp.PubSub.Inputs.SubscriptionExpirationPolicyArgs
{
Ttl = "300000.5s",
},
RetryPolicy = new Gcp.PubSub.Inputs.SubscriptionRetryPolicyArgs
{
MinimumBackoff = "10s",
},
EnableMessageOrdering = false,
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionExpirationPolicyArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionRetryPolicyArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new Topic("example", TopicArgs.builder()
.name("example-topic")
.build());
var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
.name("example-subscription")
.topic(example.id())
.labels(Map.of("foo", "bar"))
.messageRetentionDuration("1200s")
.retainAckedMessages(true)
.ackDeadlineSeconds(20)
.expirationPolicy(SubscriptionExpirationPolicyArgs.builder()
.ttl("300000.5s")
.build())
.retryPolicy(SubscriptionRetryPolicyArgs.builder()
.minimumBackoff("10s")
.build())
.enableMessageOrdering(false)
.build());
}
}
resources:
example:
type: gcp:pubsub:Topic
properties:
name: example-topic
exampleSubscription:
type: gcp:pubsub:Subscription
name: example
properties:
name: example-subscription
topic: ${example.id}
labels:
foo: bar
messageRetentionDuration: 1200s
retainAckedMessages: true
ackDeadlineSeconds: 20
expirationPolicy:
ttl: 300000.5s
retryPolicy:
minimumBackoff: 10s
enableMessageOrdering: false
Pull subscriptions store messages in a backlog until your application retrieves them. The messageRetentionDuration controls how long unacknowledged messages remain available (up to 31 days). Setting retainAckedMessages to true keeps acknowledged messages in the backlog for the same duration, enabling time-travel queries. The expirationPolicy automatically deletes inactive subscriptions; setting ttl to an empty string disables expiration. The retryPolicy configures backoff timing when delivery fails.
Filter messages by attribute values
When a topic carries multiple message types, subscriptions can filter by attributes to receive only relevant messages. Pub/Sub automatically acknowledges filtered-out messages.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const example = new gcp.pubsub.Topic("example", {name: "example-topic"});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
name: "example-subscription",
topic: example.id,
labels: {
foo: "bar",
},
filter: ` attributes.foo = \\"foo\\"
AND attributes.bar = \\"bar\\"
`,
ackDeadlineSeconds: 20,
});
import pulumi
import pulumi_gcp as gcp
example = gcp.pubsub.Topic("example", name="example-topic")
example_subscription = gcp.pubsub.Subscription("example",
name="example-subscription",
topic=example.id,
labels={
"foo": "bar",
},
filter=""" attributes.foo = \"foo\"
AND attributes.bar = \"bar\"
""",
ack_deadline_seconds=20)
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
Name: pulumi.String("example-topic"),
})
if err != nil {
return err
}
_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
Name: pulumi.String("example-subscription"),
Topic: example.ID(),
Labels: pulumi.StringMap{
"foo": pulumi.String("bar"),
},
Filter: pulumi.String(" attributes.foo = \\\"foo\\\"\n AND attributes.bar = \\\"bar\\\"\n"),
AckDeadlineSeconds: pulumi.Int(20),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var example = new Gcp.PubSub.Topic("example", new()
{
Name = "example-topic",
});
var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
{
Name = "example-subscription",
Topic = example.Id,
Labels =
{
{ "foo", "bar" },
},
Filter = @" attributes.foo = \""foo\""
AND attributes.bar = \""bar\""
",
AckDeadlineSeconds = 20,
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new Topic("example", TopicArgs.builder()
.name("example-topic")
.build());
var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
.name("example-subscription")
.topic(example.id())
.labels(Map.of("foo", "bar"))
.filter("""
attributes.foo = \"foo\"
AND attributes.bar = \"bar\"
""")
.ackDeadlineSeconds(20)
.build());
}
}
resources:
example:
type: gcp:pubsub:Topic
properties:
name: example-topic
exampleSubscription:
type: gcp:pubsub:Subscription
name: example
properties:
name: example-subscription
topic: ${example.id}
labels:
foo: bar
filter: |2
attributes.foo = \"foo\"
AND attributes.bar = \"bar\"
ackDeadlineSeconds: 20
The filter property uses a SQL-like syntax to match message attributes. Only messages where both attributes.foo = "foo" AND attributes.bar = "bar" are delivered to this subscription. Filters are immutable after creation and have a 256-byte limit.
Route undeliverable messages to a dead letter topic
Messages that repeatedly fail processing can block progress. Dead letter policies automatically move problematic messages to a separate topic after a configured number of delivery attempts.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const example = new gcp.pubsub.Topic("example", {name: "example-topic"});
const exampleDeadLetter = new gcp.pubsub.Topic("example_dead_letter", {name: "example-topic-dead-letter"});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
name: "example-subscription",
topic: example.id,
deadLetterPolicy: {
deadLetterTopic: exampleDeadLetter.id,
maxDeliveryAttempts: 10,
},
});
import pulumi
import pulumi_gcp as gcp
example = gcp.pubsub.Topic("example", name="example-topic")
example_dead_letter = gcp.pubsub.Topic("example_dead_letter", name="example-topic-dead-letter")
example_subscription = gcp.pubsub.Subscription("example",
name="example-subscription",
topic=example.id,
dead_letter_policy={
"dead_letter_topic": example_dead_letter.id,
"max_delivery_attempts": 10,
})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
Name: pulumi.String("example-topic"),
})
if err != nil {
return err
}
exampleDeadLetter, err := pubsub.NewTopic(ctx, "example_dead_letter", &pubsub.TopicArgs{
Name: pulumi.String("example-topic-dead-letter"),
})
if err != nil {
return err
}
_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
Name: pulumi.String("example-subscription"),
Topic: example.ID(),
DeadLetterPolicy: &pubsub.SubscriptionDeadLetterPolicyArgs{
DeadLetterTopic: exampleDeadLetter.ID(),
MaxDeliveryAttempts: pulumi.Int(10),
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var example = new Gcp.PubSub.Topic("example", new()
{
Name = "example-topic",
});
var exampleDeadLetter = new Gcp.PubSub.Topic("example_dead_letter", new()
{
Name = "example-topic-dead-letter",
});
var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
{
Name = "example-subscription",
Topic = example.Id,
DeadLetterPolicy = new Gcp.PubSub.Inputs.SubscriptionDeadLetterPolicyArgs
{
DeadLetterTopic = exampleDeadLetter.Id,
MaxDeliveryAttempts = 10,
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionDeadLetterPolicyArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new Topic("example", TopicArgs.builder()
.name("example-topic")
.build());
var exampleDeadLetter = new Topic("exampleDeadLetter", TopicArgs.builder()
.name("example-topic-dead-letter")
.build());
var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
.name("example-subscription")
.topic(example.id())
.deadLetterPolicy(SubscriptionDeadLetterPolicyArgs.builder()
.deadLetterTopic(exampleDeadLetter.id())
.maxDeliveryAttempts(10)
.build())
.build());
}
}
resources:
example:
type: gcp:pubsub:Topic
properties:
name: example-topic
exampleDeadLetter:
type: gcp:pubsub:Topic
name: example_dead_letter
properties:
name: example-topic-dead-letter
exampleSubscription:
type: gcp:pubsub:Subscription
name: example
properties:
name: example-subscription
topic: ${example.id}
deadLetterPolicy:
deadLetterTopic: ${exampleDeadLetter.id}
maxDeliveryAttempts: 10
When a message fails delivery maxDeliveryAttempts times (here, 10), Pub/Sub moves it to the deadLetterTopic instead of retrying indefinitely. The Pub/Sub service account (service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com) needs permission to acknowledge messages on both the main subscription and the dead letter topic.
Stream messages directly into BigQuery tables
Analytics pipelines often load streaming data into BigQuery for querying. BigQuery subscriptions write messages directly to tables without intermediate processing.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const example = new gcp.pubsub.Topic("example", {name: "example-topic"});
const test = new gcp.bigquery.Dataset("test", {datasetId: "example_dataset"});
const testTable = new gcp.bigquery.Table("test", {
tableId: "example_table",
datasetId: test.datasetId,
schema: `[
{
\\"name\\": \\"data\\",
\\"type\\": \\"STRING\\",
\\"mode\\": \\"NULLABLE\\",
\\"description\\": \\"The data\\"
}
]
`,
deletionProtection: false,
});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
name: "example-subscription",
topic: example.id,
bigqueryConfig: {
table: pulumi.interpolate`${testTable.project}.${testTable.datasetId}.${testTable.tableId}`,
},
});
const project = gcp.organizations.getProject({});
import pulumi
import pulumi_gcp as gcp
example = gcp.pubsub.Topic("example", name="example-topic")
test = gcp.bigquery.Dataset("test", dataset_id="example_dataset")
test_table = gcp.bigquery.Table("test",
table_id="example_table",
dataset_id=test.dataset_id,
schema="""[
{
\"name\": \"data\",
\"type\": \"STRING\",
\"mode\": \"NULLABLE\",
\"description\": \"The data\"
}
]
""",
deletion_protection=False)
example_subscription = gcp.pubsub.Subscription("example",
name="example-subscription",
topic=example.id,
bigquery_config={
"table": pulumi.Output.all(
project=test_table.project,
dataset_id=test_table.dataset_id,
table_id=test_table.table_id
).apply(lambda resolved_outputs: f"{resolved_outputs['project']}.{resolved_outputs['dataset_id']}.{resolved_outputs['table_id']}")
,
})
project = gcp.organizations.get_project()
package main
import (
"fmt"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
Name: pulumi.String("example-topic"),
})
if err != nil {
return err
}
test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
DatasetId: pulumi.String("example_dataset"),
})
if err != nil {
return err
}
testTable, err := bigquery.NewTable(ctx, "test", &bigquery.TableArgs{
TableId: pulumi.String("example_table"),
DatasetId: test.DatasetId,
Schema: pulumi.String(`[
{
\"name\": \"data\",
\"type\": \"STRING\",
\"mode\": \"NULLABLE\",
\"description\": \"The data\"
}
]
`),
DeletionProtection: pulumi.Bool(false),
})
if err != nil {
return err
}
_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
Name: pulumi.String("example-subscription"),
Topic: example.ID(),
BigqueryConfig: &pubsub.SubscriptionBigqueryConfigArgs{
Table: pulumi.All(testTable.Project, testTable.DatasetId, testTable.TableId).ApplyT(func(_args []interface{}) (string, error) {
project := _args[0].(string)
datasetId := _args[1].(string)
tableId := _args[2].(string)
return fmt.Sprintf("%v.%v.%v", project, datasetId, tableId), nil
}).(pulumi.StringOutput),
},
})
if err != nil {
return err
}
_, err = organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var example = new Gcp.PubSub.Topic("example", new()
{
Name = "example-topic",
});
var test = new Gcp.BigQuery.Dataset("test", new()
{
DatasetId = "example_dataset",
});
var testTable = new Gcp.BigQuery.Table("test", new()
{
TableId = "example_table",
DatasetId = test.DatasetId,
Schema = @"[
{
\""name\"": \""data\"",
\""type\"": \""STRING\"",
\""mode\"": \""NULLABLE\"",
\""description\"": \""The data\""
}
]
",
DeletionProtection = false,
});
var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
{
Name = "example-subscription",
Topic = example.Id,
BigqueryConfig = new Gcp.PubSub.Inputs.SubscriptionBigqueryConfigArgs
{
Table = Output.Tuple(testTable.Project, testTable.DatasetId, testTable.TableId).Apply(values =>
{
var project = values.Item1;
var datasetId = values.Item2;
var tableId = values.Item3;
return $"{project}.{datasetId}.{tableId}";
}),
},
});
var project = Gcp.Organizations.GetProject.Invoke();
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Table;
import com.pulumi.gcp.bigquery.TableArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionBigqueryConfigArgs;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new Topic("example", TopicArgs.builder()
.name("example-topic")
.build());
var test = new Dataset("test", DatasetArgs.builder()
.datasetId("example_dataset")
.build());
var testTable = new Table("testTable", TableArgs.builder()
.tableId("example_table")
.datasetId(test.datasetId())
.schema("""
[
{
\"name\": \"data\",
\"type\": \"STRING\",
\"mode\": \"NULLABLE\",
\"description\": \"The data\"
}
]
""")
.deletionProtection(false)
.build());
var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
.name("example-subscription")
.topic(example.id())
.bigqueryConfig(SubscriptionBigqueryConfigArgs.builder()
.table(Output.tuple(testTable.project(), testTable.datasetId(), testTable.tableId()).applyValue(values -> {
var project = values.t1;
var datasetId = values.t2;
var tableId = values.t3;
return String.format("%s.%s.%s", project,datasetId,tableId);
}))
.build())
.build());
final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
.build());
}
}
resources:
example:
type: gcp:pubsub:Topic
properties:
name: example-topic
exampleSubscription:
type: gcp:pubsub:Subscription
name: example
properties:
name: example-subscription
topic: ${example.id}
bigqueryConfig:
table: ${testTable.project}.${testTable.datasetId}.${testTable.tableId}
test:
type: gcp:bigquery:Dataset
properties:
datasetId: example_dataset
testTable:
type: gcp:bigquery:Table
name: test
properties:
tableId: example_table
datasetId: ${test.datasetId}
schema: |
[
{
\"name\": \"data\",
\"type\": \"STRING\",
\"mode\": \"NULLABLE\",
\"description\": \"The data\"
}
]
deletionProtection: false
variables:
project:
fn::invoke:
function: gcp:organizations:getProject
arguments: {}
The bigqueryConfig property specifies the destination table in project.dataset.table format. Pub/Sub writes each message as a row, using the table’s schema to parse message data. The Pub/Sub service account needs BigQuery metadataViewer and dataEditor roles to write to the table.
Write messages to Cloud Storage as files
Data lakes and batch processing systems often consume events from Cloud Storage. Cloud Storage subscriptions batch messages into files with configurable size and time limits.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const example = new gcp.storage.Bucket("example", {
name: "example-bucket",
location: "US",
uniformBucketLevelAccess: true,
});
const exampleTopic = new gcp.pubsub.Topic("example", {name: "example-topic"});
const project = gcp.organizations.getProject({});
const admin = new gcp.storage.BucketIAMMember("admin", {
bucket: example.name,
role: "roles/storage.admin",
member: project.then(project => `serviceAccount:service-${project.number}@gcp-sa-pubsub.iam.gserviceaccount.com`),
});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
name: "example-subscription",
topic: exampleTopic.id,
cloudStorageConfig: {
bucket: example.name,
filenamePrefix: "pre-",
filenameSuffix: "-_44703",
filenameDatetimeFormat: "YYYY-MM-DD/hh_mm_ssZ",
maxBytes: 1000,
maxDuration: "300s",
maxMessages: 1000,
},
}, {
dependsOn: [
example,
admin,
],
});
import pulumi
import pulumi_gcp as gcp
example = gcp.storage.Bucket("example",
name="example-bucket",
location="US",
uniform_bucket_level_access=True)
example_topic = gcp.pubsub.Topic("example", name="example-topic")
project = gcp.organizations.get_project()
admin = gcp.storage.BucketIAMMember("admin",
bucket=example.name,
role="roles/storage.admin",
member=f"serviceAccount:service-{project.number}@gcp-sa-pubsub.iam.gserviceaccount.com")
example_subscription = gcp.pubsub.Subscription("example",
name="example-subscription",
topic=example_topic.id,
cloud_storage_config={
"bucket": example.name,
"filename_prefix": "pre-",
"filename_suffix": "-_44703",
"filename_datetime_format": "YYYY-MM-DD/hh_mm_ssZ",
"max_bytes": 1000,
"max_duration": "300s",
"max_messages": 1000,
},
opts = pulumi.ResourceOptions(depends_on=[
example,
admin,
]))
package main
import (
"fmt"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/storage"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
example, err := storage.NewBucket(ctx, "example", &storage.BucketArgs{
Name: pulumi.String("example-bucket"),
Location: pulumi.String("US"),
UniformBucketLevelAccess: pulumi.Bool(true),
})
if err != nil {
return err
}
exampleTopic, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
Name: pulumi.String("example-topic"),
})
if err != nil {
return err
}
project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
if err != nil {
return err
}
admin, err := storage.NewBucketIAMMember(ctx, "admin", &storage.BucketIAMMemberArgs{
Bucket: example.Name,
Role: pulumi.String("roles/storage.admin"),
Member: pulumi.Sprintf("serviceAccount:service-%v@gcp-sa-pubsub.iam.gserviceaccount.com", project.Number),
})
if err != nil {
return err
}
_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
Name: pulumi.String("example-subscription"),
Topic: exampleTopic.ID(),
CloudStorageConfig: &pubsub.SubscriptionCloudStorageConfigArgs{
Bucket: example.Name,
FilenamePrefix: pulumi.String("pre-"),
FilenameSuffix: pulumi.String("-_44703"),
FilenameDatetimeFormat: pulumi.String("YYYY-MM-DD/hh_mm_ssZ"),
MaxBytes: pulumi.Int(1000),
MaxDuration: pulumi.String("300s"),
MaxMessages: pulumi.Int(1000),
},
}, pulumi.DependsOn([]pulumi.Resource{
example,
admin,
}))
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var example = new Gcp.Storage.Bucket("example", new()
{
Name = "example-bucket",
Location = "US",
UniformBucketLevelAccess = true,
});
var exampleTopic = new Gcp.PubSub.Topic("example", new()
{
Name = "example-topic",
});
var project = Gcp.Organizations.GetProject.Invoke();
var admin = new Gcp.Storage.BucketIAMMember("admin", new()
{
Bucket = example.Name,
Role = "roles/storage.admin",
Member = $"serviceAccount:service-{project.Apply(getProjectResult => getProjectResult.Number)}@gcp-sa-pubsub.iam.gserviceaccount.com",
});
var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
{
Name = "example-subscription",
Topic = exampleTopic.Id,
CloudStorageConfig = new Gcp.PubSub.Inputs.SubscriptionCloudStorageConfigArgs
{
Bucket = example.Name,
FilenamePrefix = "pre-",
FilenameSuffix = "-_44703",
FilenameDatetimeFormat = "YYYY-MM-DD/hh_mm_ssZ",
MaxBytes = 1000,
MaxDuration = "300s",
MaxMessages = 1000,
},
}, new CustomResourceOptions
{
DependsOn =
{
example,
admin,
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.storage.Bucket;
import com.pulumi.gcp.storage.BucketArgs;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.storage.BucketIAMMember;
import com.pulumi.gcp.storage.BucketIAMMemberArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionCloudStorageConfigArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new Bucket("example", BucketArgs.builder()
.name("example-bucket")
.location("US")
.uniformBucketLevelAccess(true)
.build());
var exampleTopic = new Topic("exampleTopic", TopicArgs.builder()
.name("example-topic")
.build());
final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
.build());
var admin = new BucketIAMMember("admin", BucketIAMMemberArgs.builder()
.bucket(example.name())
.role("roles/storage.admin")
.member(String.format("serviceAccount:service-%s@gcp-sa-pubsub.iam.gserviceaccount.com", project.number()))
.build());
var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
.name("example-subscription")
.topic(exampleTopic.id())
.cloudStorageConfig(SubscriptionCloudStorageConfigArgs.builder()
.bucket(example.name())
.filenamePrefix("pre-")
.filenameSuffix("-_44703")
.filenameDatetimeFormat("YYYY-MM-DD/hh_mm_ssZ")
.maxBytes(1000)
.maxDuration("300s")
.maxMessages(1000)
.build())
.build(), CustomResourceOptions.builder()
.dependsOn(
example,
admin)
.build());
}
}
resources:
example:
type: gcp:storage:Bucket
properties:
name: example-bucket
location: US
uniformBucketLevelAccess: true
exampleTopic:
type: gcp:pubsub:Topic
name: example
properties:
name: example-topic
exampleSubscription:
type: gcp:pubsub:Subscription
name: example
properties:
name: example-subscription
topic: ${exampleTopic.id}
cloudStorageConfig:
bucket: ${example.name}
filenamePrefix: pre-
filenameSuffix: -_44703
filenameDatetimeFormat: YYYY-MM-DD/hh_mm_ssZ
maxBytes: 1000
maxDuration: 300s
maxMessages: 1000
options:
dependsOn:
- ${example}
- ${admin}
admin:
type: gcp:storage:BucketIAMMember
properties:
bucket: ${example.name}
role: roles/storage.admin
member: serviceAccount:service-${project.number}@gcp-sa-pubsub.iam.gserviceaccount.com
variables:
project:
fn::invoke:
function: gcp:organizations:getProject
arguments: {}
Messages accumulate until reaching maxBytes (1000 bytes), maxDuration (300 seconds), or maxMessages (1000 messages), then Pub/Sub writes a file to the bucket. The filenamePrefix and filenameSuffix customize file naming, and filenameDatetimeFormat adds timestamps. The Pub/Sub service account needs storage.admin role on the bucket.
Transform messages with JavaScript functions
Applications sometimes need to modify message content before delivery, such as redacting sensitive fields or enriching data. Message transforms apply JavaScript functions to each message.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const example = new gcp.pubsub.Topic("example", {name: "example-topic"});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
name: "example-subscription",
topic: example.id,
messageTransforms: [{
javascriptUdf: {
functionName: "isYearEven",
code: `function isYearEven(message, metadata) {
const data = JSON.parse(message.data);
return message.year %2 === 0;
}
`,
},
}],
});
import pulumi
import pulumi_gcp as gcp
example = gcp.pubsub.Topic("example", name="example-topic")
example_subscription = gcp.pubsub.Subscription("example",
name="example-subscription",
topic=example.id,
message_transforms=[{
"javascript_udf": {
"function_name": "isYearEven",
"code": """function isYearEven(message, metadata) {
const data = JSON.parse(message.data);
return message.year %2 === 0;
}
""",
},
}])
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
Name: pulumi.String("example-topic"),
})
if err != nil {
return err
}
_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
Name: pulumi.String("example-subscription"),
Topic: example.ID(),
MessageTransforms: pubsub.SubscriptionMessageTransformArray{
&pubsub.SubscriptionMessageTransformArgs{
JavascriptUdf: &pubsub.SubscriptionMessageTransformJavascriptUdfArgs{
FunctionName: pulumi.String("isYearEven"),
Code: pulumi.String("function isYearEven(message, metadata) {\n const data = JSON.parse(message.data);\n return message.year %2 === 0;\n}\n"),
},
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var example = new Gcp.PubSub.Topic("example", new()
{
Name = "example-topic",
});
var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
{
Name = "example-subscription",
Topic = example.Id,
MessageTransforms = new[]
{
new Gcp.PubSub.Inputs.SubscriptionMessageTransformArgs
{
JavascriptUdf = new Gcp.PubSub.Inputs.SubscriptionMessageTransformJavascriptUdfArgs
{
FunctionName = "isYearEven",
Code = @"function isYearEven(message, metadata) {
const data = JSON.parse(message.data);
return message.year %2 === 0;
}
",
},
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionMessageTransformArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionMessageTransformJavascriptUdfArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new Topic("example", TopicArgs.builder()
.name("example-topic")
.build());
var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
.name("example-subscription")
.topic(example.id())
.messageTransforms(SubscriptionMessageTransformArgs.builder()
.javascriptUdf(SubscriptionMessageTransformJavascriptUdfArgs.builder()
.functionName("isYearEven")
.code("""
function isYearEven(message, metadata) {
const data = JSON.parse(message.data);
return message.year %2 === 0;
}
""")
.build())
.build())
.build());
}
}
resources:
example:
type: gcp:pubsub:Topic
properties:
name: example-topic
exampleSubscription:
type: gcp:pubsub:Subscription
name: example
properties:
name: example-subscription
topic: ${example.id}
messageTransforms:
- javascriptUdf:
functionName: isYearEven
code: |
function isYearEven(message, metadata) {
const data = JSON.parse(message.data);
return message.year %2 === 0;
}
The messageTransforms array defines JavaScript functions that process messages before delivery. Each function receives the message and metadata, and can modify or filter the message. Functions run in order; returning null filters out the message. The code property contains the JavaScript implementation.
Beyond these examples
These snippets focus on specific subscription-level features: push and pull delivery modes, message filtering and transformation, BigQuery and Cloud Storage integration, and dead letter handling. They’re intentionally minimal rather than full event processing pipelines.
The examples reference pre-existing infrastructure such as Pub/Sub topics, HTTP endpoints for push subscriptions, BigQuery datasets and tables, Cloud Storage buckets, and IAM service accounts and permissions. They focus on configuring the subscription rather than provisioning everything around it.
To keep things focused, common subscription patterns are omitted, including:
- Exactly-once delivery guarantees (enableExactlyOnceDelivery)
- Message ordering (enableMessageOrdering)
- Avro serialization for Cloud Storage (avroConfig)
- Custom service accounts for BigQuery and Cloud Storage writes
- Resource manager tags (tags property)
These omissions are intentional: the goal is to illustrate how each subscription feature is wired, not provide drop-in messaging modules. See the Pub/Sub Subscription resource reference for all available configuration options.
Let's create Google Cloud Pub/Sub Subscriptions
Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.
Try Pulumi Cloud for FREEFrequently Asked Questions
Delivery Methods & Configuration
pushConfig, bigQueryConfig, or cloudStorageConfig can be set. If all three are empty, the subscription uses pull delivery where subscribers call API methods to retrieve messages.pushConfig with a pushEndpoint URL and optional attributes. For example, set pushEndpoint to “https://example.com/push" and add custom headers via attributes.bigqueryConfig with a table reference in the format {project}.{datasetId}.{tableId}. Optionally specify useTableSchema to use the table’s schema or provide a custom serviceAccountEmail.cloudStorageConfig with the bucket name, filename patterns (filenamePrefix, filenameSuffix, filenameDatetimeFormat), and limits (maxBytes, maxDuration, maxMessages). For Avro format, configure avroConfig with writeMetadata and useTopicSchema.IAM & Permissions
roles/bigquery.metadataViewer and roles/bigquery.dataEditor roles. Grant both roles before creating the subscription, as shown in the service account example with dependsOn.service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com) needs roles/storage.admin on the bucket. Grant this role before creating the subscription.service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com) must have permission to Acknowledge() messages on the subscription.gcp.projects.ServiceIdentity resource to retrieve the service account email for IAM bindings.Message Retention & Timeouts
ackDeadlineSeconds defaults to 10 seconds (when set to 0) with a range of 10-600 seconds. messageRetentionDuration defaults to 7 days with a range of 10 minutes (600s) to 31 days (2678400s).expirationPolicy to set a ttl value. If not set, the default is 31 days. To prevent expiration, set ttl to an empty string (""). The minimum allowed ttl is 1 day.retainAckedMessages to true to keep acknowledged messages in the backlog until they fall out of the messageRetentionDuration window. This allows you to seek back in time using subscriptions.seek.Immutable Properties & Lifecycle
name, project, topic, filter, enableMessageOrdering, and tags. Modifying tags after creation triggers resource replacement.labels field is non-authoritative and only manages labels present in your configuration. Use effectiveLabels to see all labels on the resource, including those set by other clients and services.Message Filtering & Transforms
filter property with expressions like attributes.foo = "foo" AND attributes.bar = "bar". The maximum filter length is 256 bytes, and filters are immutable after creation.messageTransforms with JavaScript UDFs. Each transform requires a functionName and code. Transforms are applied in the order specified and can be individually disabled by setting disabled to true.enableExactlyOnceDelivery is true, messages won’t be resent before the ack deadline expires, and acknowledged messages won’t be resent. However, subscribers may still receive duplicates if the publisher sends the same message multiple times (these have distinct messageId values).Using a different cloud?
Explore messaging guides for other cloud providers: