Create Google Cloud Pub/Sub Subscriptions

The gcp:pubsub/subscription:Subscription resource, part of the Pulumi GCP provider, defines how messages from a Pub/Sub topic are delivered to consuming applications. This guide focuses on four capabilities: pull and push delivery configuration, BigQuery and Cloud Storage streaming, message filtering and transformation, and dead letter handling.

Subscriptions reference existing topics and may integrate with BigQuery tables, Cloud Storage buckets, or HTTP endpoints. The examples are intentionally small. Combine them with your own topics, IAM roles, and destination infrastructure.

Configure pull-based message consumption with retention

Most deployments start with pull subscriptions, where applications fetch messages on demand and control their own processing rate.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const example = new gcp.pubsub.Topic("example", {name: "example-topic"});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
    name: "example-subscription",
    topic: example.id,
    labels: {
        foo: "bar",
    },
    messageRetentionDuration: "1200s",
    retainAckedMessages: true,
    ackDeadlineSeconds: 20,
    expirationPolicy: {
        ttl: "300000.5s",
    },
    retryPolicy: {
        minimumBackoff: "10s",
    },
    enableMessageOrdering: false,
});
import pulumi
import pulumi_gcp as gcp

example = gcp.pubsub.Topic("example", name="example-topic")
example_subscription = gcp.pubsub.Subscription("example",
    name="example-subscription",
    topic=example.id,
    labels={
        "foo": "bar",
    },
    message_retention_duration="1200s",
    retain_acked_messages=True,
    ack_deadline_seconds=20,
    expiration_policy={
        "ttl": "300000.5s",
    },
    retry_policy={
        "minimum_backoff": "10s",
    },
    enable_message_ordering=False)
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
			Name: pulumi.String("example-topic"),
		})
		if err != nil {
			return err
		}
		_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
			Name:  pulumi.String("example-subscription"),
			Topic: example.ID(),
			Labels: pulumi.StringMap{
				"foo": pulumi.String("bar"),
			},
			MessageRetentionDuration: pulumi.String("1200s"),
			RetainAckedMessages:      pulumi.Bool(true),
			AckDeadlineSeconds:       pulumi.Int(20),
			ExpirationPolicy: &pubsub.SubscriptionExpirationPolicyArgs{
				Ttl: pulumi.String("300000.5s"),
			},
			RetryPolicy: &pubsub.SubscriptionRetryPolicyArgs{
				MinimumBackoff: pulumi.String("10s"),
			},
			EnableMessageOrdering: pulumi.Bool(false),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var example = new Gcp.PubSub.Topic("example", new()
    {
        Name = "example-topic",
    });

    var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
    {
        Name = "example-subscription",
        Topic = example.Id,
        Labels = 
        {
            { "foo", "bar" },
        },
        MessageRetentionDuration = "1200s",
        RetainAckedMessages = true,
        AckDeadlineSeconds = 20,
        ExpirationPolicy = new Gcp.PubSub.Inputs.SubscriptionExpirationPolicyArgs
        {
            Ttl = "300000.5s",
        },
        RetryPolicy = new Gcp.PubSub.Inputs.SubscriptionRetryPolicyArgs
        {
            MinimumBackoff = "10s",
        },
        EnableMessageOrdering = false,
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionExpirationPolicyArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionRetryPolicyArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Topic("example", TopicArgs.builder()
            .name("example-topic")
            .build());

        var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
            .name("example-subscription")
            .topic(example.id())
            .labels(Map.of("foo", "bar"))
            .messageRetentionDuration("1200s")
            .retainAckedMessages(true)
            .ackDeadlineSeconds(20)
            .expirationPolicy(SubscriptionExpirationPolicyArgs.builder()
                .ttl("300000.5s")
                .build())
            .retryPolicy(SubscriptionRetryPolicyArgs.builder()
                .minimumBackoff("10s")
                .build())
            .enableMessageOrdering(false)
            .build());

    }
}
resources:
  example:
    type: gcp:pubsub:Topic
    properties:
      name: example-topic
  exampleSubscription:
    type: gcp:pubsub:Subscription
    name: example
    properties:
      name: example-subscription
      topic: ${example.id}
      labels:
        foo: bar
      messageRetentionDuration: 1200s
      retainAckedMessages: true
      ackDeadlineSeconds: 20
      expirationPolicy:
        ttl: 300000.5s
      retryPolicy:
        minimumBackoff: 10s
      enableMessageOrdering: false

The topic property references the source topic. The messageRetentionDuration controls how long unacknowledged messages remain available (defaults to 7 days). When retainAckedMessages is true, acknowledged messages also stay in the backlog for the retention window, enabling replay via subscriptions.seek. The ackDeadlineSeconds sets how long Pub/Sub waits for acknowledgment before redelivering (10-600 seconds). The expirationPolicy determines when inactive subscriptions are deleted; setting ttl to empty string prevents expiration. The retryPolicy configures backoff behavior for failed deliveries.

Push messages to an HTTP endpoint

Applications that expose HTTP endpoints can receive messages via push delivery, where Pub/Sub sends each message as an HTTP POST request.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const example = new gcp.pubsub.Topic("example", {name: "example-topic"});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
    name: "example-subscription",
    topic: example.id,
    ackDeadlineSeconds: 20,
    labels: {
        foo: "bar",
    },
    pushConfig: {
        pushEndpoint: "https://example.com/push",
        attributes: {
            "x-goog-version": "v1",
        },
    },
});
import pulumi
import pulumi_gcp as gcp

example = gcp.pubsub.Topic("example", name="example-topic")
example_subscription = gcp.pubsub.Subscription("example",
    name="example-subscription",
    topic=example.id,
    ack_deadline_seconds=20,
    labels={
        "foo": "bar",
    },
    push_config={
        "push_endpoint": "https://example.com/push",
        "attributes": {
            "x-goog-version": "v1",
        },
    })
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
			Name: pulumi.String("example-topic"),
		})
		if err != nil {
			return err
		}
		_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
			Name:               pulumi.String("example-subscription"),
			Topic:              example.ID(),
			AckDeadlineSeconds: pulumi.Int(20),
			Labels: pulumi.StringMap{
				"foo": pulumi.String("bar"),
			},
			PushConfig: &pubsub.SubscriptionPushConfigArgs{
				PushEndpoint: pulumi.String("https://example.com/push"),
				Attributes: pulumi.StringMap{
					"x-goog-version": pulumi.String("v1"),
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var example = new Gcp.PubSub.Topic("example", new()
    {
        Name = "example-topic",
    });

    var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
    {
        Name = "example-subscription",
        Topic = example.Id,
        AckDeadlineSeconds = 20,
        Labels = 
        {
            { "foo", "bar" },
        },
        PushConfig = new Gcp.PubSub.Inputs.SubscriptionPushConfigArgs
        {
            PushEndpoint = "https://example.com/push",
            Attributes = 
            {
                { "x-goog-version", "v1" },
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionPushConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Topic("example", TopicArgs.builder()
            .name("example-topic")
            .build());

        var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
            .name("example-subscription")
            .topic(example.id())
            .ackDeadlineSeconds(20)
            .labels(Map.of("foo", "bar"))
            .pushConfig(SubscriptionPushConfigArgs.builder()
                .pushEndpoint("https://example.com/push")
                .attributes(Map.of("x-goog-version", "v1"))
                .build())
            .build());

    }
}
resources:
  example:
    type: gcp:pubsub:Topic
    properties:
      name: example-topic
  exampleSubscription:
    type: gcp:pubsub:Subscription
    name: example
    properties:
      name: example-subscription
      topic: ${example.id}
      ackDeadlineSeconds: 20
      labels:
        foo: bar
      pushConfig:
        pushEndpoint: https://example.com/push
        attributes:
          x-goog-version: v1

The pushConfig block defines the HTTP endpoint and request attributes. The pushEndpoint must be an HTTPS URL that accepts POST requests. The attributes map adds custom headers to each request (e.g., “x-goog-version” for API versioning). The ackDeadlineSeconds also controls the HTTP request timeout for push delivery.

Filter messages by attributes before delivery

When topics carry multiple message types, subscriptions can filter by message attributes to receive only relevant messages.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const example = new gcp.pubsub.Topic("example", {name: "example-topic"});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
    name: "example-subscription",
    topic: example.id,
    labels: {
        foo: "bar",
    },
    filter: `    attributes.foo = \\"foo\\"
    AND attributes.bar = \\"bar\\"
`,
    ackDeadlineSeconds: 20,
});
import pulumi
import pulumi_gcp as gcp

example = gcp.pubsub.Topic("example", name="example-topic")
example_subscription = gcp.pubsub.Subscription("example",
    name="example-subscription",
    topic=example.id,
    labels={
        "foo": "bar",
    },
    filter="""    attributes.foo = \"foo\"
    AND attributes.bar = \"bar\"
""",
    ack_deadline_seconds=20)
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
			Name: pulumi.String("example-topic"),
		})
		if err != nil {
			return err
		}
		_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
			Name:  pulumi.String("example-subscription"),
			Topic: example.ID(),
			Labels: pulumi.StringMap{
				"foo": pulumi.String("bar"),
			},
			Filter:             pulumi.String("    attributes.foo = \\\"foo\\\"\n    AND attributes.bar = \\\"bar\\\"\n"),
			AckDeadlineSeconds: pulumi.Int(20),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var example = new Gcp.PubSub.Topic("example", new()
    {
        Name = "example-topic",
    });

    var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
    {
        Name = "example-subscription",
        Topic = example.Id,
        Labels = 
        {
            { "foo", "bar" },
        },
        Filter = @"    attributes.foo = \""foo\""
    AND attributes.bar = \""bar\""
",
        AckDeadlineSeconds = 20,
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Topic("example", TopicArgs.builder()
            .name("example-topic")
            .build());

        var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
            .name("example-subscription")
            .topic(example.id())
            .labels(Map.of("foo", "bar"))
            .filter("""
    attributes.foo = \"foo\"
    AND attributes.bar = \"bar\"
            """)
            .ackDeadlineSeconds(20)
            .build());

    }
}
resources:
  example:
    type: gcp:pubsub:Topic
    properties:
      name: example-topic
  exampleSubscription:
    type: gcp:pubsub:Subscription
    name: example
    properties:
      name: example-subscription
      topic: ${example.id}
      labels:
        foo: bar
      filter: |2
            attributes.foo = \"foo\"
            AND attributes.bar = \"bar\"
      ackDeadlineSeconds: 20

The filter property uses a SQL-like syntax to match message attributes. Pub/Sub automatically acknowledges messages that don’t match, so they never reach your application. Filters are immutable after creation and limited to 256 bytes. This extends the basic pull subscription pattern by adding attribute-based filtering at the subscription level.

Route unprocessable messages to a dead letter topic

Messages that repeatedly fail processing can be routed to a separate topic after a configured number of delivery attempts.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const example = new gcp.pubsub.Topic("example", {name: "example-topic"});
const exampleDeadLetter = new gcp.pubsub.Topic("example_dead_letter", {name: "example-topic-dead-letter"});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
    name: "example-subscription",
    topic: example.id,
    deadLetterPolicy: {
        deadLetterTopic: exampleDeadLetter.id,
        maxDeliveryAttempts: 10,
    },
});
import pulumi
import pulumi_gcp as gcp

example = gcp.pubsub.Topic("example", name="example-topic")
example_dead_letter = gcp.pubsub.Topic("example_dead_letter", name="example-topic-dead-letter")
example_subscription = gcp.pubsub.Subscription("example",
    name="example-subscription",
    topic=example.id,
    dead_letter_policy={
        "dead_letter_topic": example_dead_letter.id,
        "max_delivery_attempts": 10,
    })
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
			Name: pulumi.String("example-topic"),
		})
		if err != nil {
			return err
		}
		exampleDeadLetter, err := pubsub.NewTopic(ctx, "example_dead_letter", &pubsub.TopicArgs{
			Name: pulumi.String("example-topic-dead-letter"),
		})
		if err != nil {
			return err
		}
		_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
			Name:  pulumi.String("example-subscription"),
			Topic: example.ID(),
			DeadLetterPolicy: &pubsub.SubscriptionDeadLetterPolicyArgs{
				DeadLetterTopic:     exampleDeadLetter.ID(),
				MaxDeliveryAttempts: pulumi.Int(10),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var example = new Gcp.PubSub.Topic("example", new()
    {
        Name = "example-topic",
    });

    var exampleDeadLetter = new Gcp.PubSub.Topic("example_dead_letter", new()
    {
        Name = "example-topic-dead-letter",
    });

    var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
    {
        Name = "example-subscription",
        Topic = example.Id,
        DeadLetterPolicy = new Gcp.PubSub.Inputs.SubscriptionDeadLetterPolicyArgs
        {
            DeadLetterTopic = exampleDeadLetter.Id,
            MaxDeliveryAttempts = 10,
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionDeadLetterPolicyArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Topic("example", TopicArgs.builder()
            .name("example-topic")
            .build());

        var exampleDeadLetter = new Topic("exampleDeadLetter", TopicArgs.builder()
            .name("example-topic-dead-letter")
            .build());

        var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
            .name("example-subscription")
            .topic(example.id())
            .deadLetterPolicy(SubscriptionDeadLetterPolicyArgs.builder()
                .deadLetterTopic(exampleDeadLetter.id())
                .maxDeliveryAttempts(10)
                .build())
            .build());

    }
}
resources:
  example:
    type: gcp:pubsub:Topic
    properties:
      name: example-topic
  exampleDeadLetter:
    type: gcp:pubsub:Topic
    name: example_dead_letter
    properties:
      name: example-topic-dead-letter
  exampleSubscription:
    type: gcp:pubsub:Subscription
    name: example
    properties:
      name: example-subscription
      topic: ${example.id}
      deadLetterPolicy:
        deadLetterTopic: ${exampleDeadLetter.id}
        maxDeliveryAttempts: 10

The deadLetterPolicy sends messages to a separate topic after maxDeliveryAttempts failures. The deadLetterTopic must exist before creating the subscription. The Pub/Sub service account (service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com) needs permission to acknowledge messages on both the main subscription and the dead letter topic.

Stream messages directly into BigQuery tables

Analytics pipelines can bypass application code by streaming Pub/Sub messages directly into BigQuery tables.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const example = new gcp.pubsub.Topic("example", {name: "example-topic"});
const test = new gcp.bigquery.Dataset("test", {datasetId: "example_dataset"});
const testTable = new gcp.bigquery.Table("test", {
    tableId: "example_table",
    datasetId: test.datasetId,
    schema: `[
  {
    \\"name\\": \\"data\\",
    \\"type\\": \\"STRING\\",
    \\"mode\\": \\"NULLABLE\\",
    \\"description\\": \\"The data\\"
  }
]
`,
    deletionProtection: false,
});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
    name: "example-subscription",
    topic: example.id,
    bigqueryConfig: {
        table: pulumi.interpolate`${testTable.project}.${testTable.datasetId}.${testTable.tableId}`,
    },
});
const project = gcp.organizations.getProject({});
import pulumi
import pulumi_gcp as gcp

example = gcp.pubsub.Topic("example", name="example-topic")
test = gcp.bigquery.Dataset("test", dataset_id="example_dataset")
test_table = gcp.bigquery.Table("test",
    table_id="example_table",
    dataset_id=test.dataset_id,
    schema="""[
  {
    \"name\": \"data\",
    \"type\": \"STRING\",
    \"mode\": \"NULLABLE\",
    \"description\": \"The data\"
  }
]
""",
    deletion_protection=False)
example_subscription = gcp.pubsub.Subscription("example",
    name="example-subscription",
    topic=example.id,
    bigquery_config={
        "table": pulumi.Output.all(
            project=test_table.project,
            dataset_id=test_table.dataset_id,
            table_id=test_table.table_id
).apply(lambda resolved_outputs: f"{resolved_outputs['project']}.{resolved_outputs['dataset_id']}.{resolved_outputs['table_id']}")
,
    })
project = gcp.organizations.get_project()
package main

import (
	"fmt"

	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
			Name: pulumi.String("example-topic"),
		})
		if err != nil {
			return err
		}
		test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
			DatasetId: pulumi.String("example_dataset"),
		})
		if err != nil {
			return err
		}
		testTable, err := bigquery.NewTable(ctx, "test", &bigquery.TableArgs{
			TableId:   pulumi.String("example_table"),
			DatasetId: test.DatasetId,
			Schema: pulumi.String(`[
  {
    \"name\": \"data\",
    \"type\": \"STRING\",
    \"mode\": \"NULLABLE\",
    \"description\": \"The data\"
  }
]
`),
			DeletionProtection: pulumi.Bool(false),
		})
		if err != nil {
			return err
		}
		_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
			Name:  pulumi.String("example-subscription"),
			Topic: example.ID(),
			BigqueryConfig: &pubsub.SubscriptionBigqueryConfigArgs{
				Table: pulumi.All(testTable.Project, testTable.DatasetId, testTable.TableId).ApplyT(func(_args []interface{}) (string, error) {
					project := _args[0].(string)
					datasetId := _args[1].(string)
					tableId := _args[2].(string)
					return fmt.Sprintf("%v.%v.%v", project, datasetId, tableId), nil
				}).(pulumi.StringOutput),
			},
		})
		if err != nil {
			return err
		}
		_, err = organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var example = new Gcp.PubSub.Topic("example", new()
    {
        Name = "example-topic",
    });

    var test = new Gcp.BigQuery.Dataset("test", new()
    {
        DatasetId = "example_dataset",
    });

    var testTable = new Gcp.BigQuery.Table("test", new()
    {
        TableId = "example_table",
        DatasetId = test.DatasetId,
        Schema = @"[
  {
    \""name\"": \""data\"",
    \""type\"": \""STRING\"",
    \""mode\"": \""NULLABLE\"",
    \""description\"": \""The data\""
  }
]
",
        DeletionProtection = false,
    });

    var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
    {
        Name = "example-subscription",
        Topic = example.Id,
        BigqueryConfig = new Gcp.PubSub.Inputs.SubscriptionBigqueryConfigArgs
        {
            Table = Output.Tuple(testTable.Project, testTable.DatasetId, testTable.TableId).Apply(values =>
            {
                var project = values.Item1;
                var datasetId = values.Item2;
                var tableId = values.Item3;
                return $"{project}.{datasetId}.{tableId}";
            }),
        },
    });

    var project = Gcp.Organizations.GetProject.Invoke();

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Table;
import com.pulumi.gcp.bigquery.TableArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionBigqueryConfigArgs;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Topic("example", TopicArgs.builder()
            .name("example-topic")
            .build());

        var test = new Dataset("test", DatasetArgs.builder()
            .datasetId("example_dataset")
            .build());

        var testTable = new Table("testTable", TableArgs.builder()
            .tableId("example_table")
            .datasetId(test.datasetId())
            .schema("""
[
  {
    \"name\": \"data\",
    \"type\": \"STRING\",
    \"mode\": \"NULLABLE\",
    \"description\": \"The data\"
  }
]
            """)
            .deletionProtection(false)
            .build());

        var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
            .name("example-subscription")
            .topic(example.id())
            .bigqueryConfig(SubscriptionBigqueryConfigArgs.builder()
                .table(Output.tuple(testTable.project(), testTable.datasetId(), testTable.tableId()).applyValue(values -> {
                    var project = values.t1;
                    var datasetId = values.t2;
                    var tableId = values.t3;
                    return String.format("%s.%s.%s", project,datasetId,tableId);
                }))
                .build())
            .build());

        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

    }
}
resources:
  example:
    type: gcp:pubsub:Topic
    properties:
      name: example-topic
  exampleSubscription:
    type: gcp:pubsub:Subscription
    name: example
    properties:
      name: example-subscription
      topic: ${example.id}
      bigqueryConfig:
        table: ${testTable.project}.${testTable.datasetId}.${testTable.tableId}
  test:
    type: gcp:bigquery:Dataset
    properties:
      datasetId: example_dataset
  testTable:
    type: gcp:bigquery:Table
    name: test
    properties:
      tableId: example_table
      datasetId: ${test.datasetId}
      schema: |
        [
          {
            \"name\": \"data\",
            \"type\": \"STRING\",
            \"mode\": \"NULLABLE\",
            \"description\": \"The data\"
          }
        ]        
      deletionProtection: false
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}

The bigqueryConfig block routes messages to a BigQuery table specified in “project.dataset.table” format. Messages are written as they arrive, enabling real-time analytics. The BigQuery table must exist with a schema that matches your message structure. The Pub/Sub service account needs bigquery.dataEditor and bigquery.metadataViewer roles on the target project.

Archive messages to Cloud Storage buckets

Long-term message archival or data lake ingestion can route messages directly to Cloud Storage.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const example = new gcp.storage.Bucket("example", {
    name: "example-bucket",
    location: "US",
    uniformBucketLevelAccess: true,
});
const exampleTopic = new gcp.pubsub.Topic("example", {name: "example-topic"});
const project = gcp.organizations.getProject({});
const admin = new gcp.storage.BucketIAMMember("admin", {
    bucket: example.name,
    role: "roles/storage.admin",
    member: project.then(project => `serviceAccount:service-${project.number}@gcp-sa-pubsub.iam.gserviceaccount.com`),
});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
    name: "example-subscription",
    topic: exampleTopic.id,
    cloudStorageConfig: {
        bucket: example.name,
        filenamePrefix: "pre-",
        filenameSuffix: "-_29225",
        filenameDatetimeFormat: "YYYY-MM-DD/hh_mm_ssZ",
        maxBytes: 1000,
        maxDuration: "300s",
        maxMessages: 1000,
    },
}, {
    dependsOn: [
        example,
        admin,
    ],
});
import pulumi
import pulumi_gcp as gcp

example = gcp.storage.Bucket("example",
    name="example-bucket",
    location="US",
    uniform_bucket_level_access=True)
example_topic = gcp.pubsub.Topic("example", name="example-topic")
project = gcp.organizations.get_project()
admin = gcp.storage.BucketIAMMember("admin",
    bucket=example.name,
    role="roles/storage.admin",
    member=f"serviceAccount:service-{project.number}@gcp-sa-pubsub.iam.gserviceaccount.com")
example_subscription = gcp.pubsub.Subscription("example",
    name="example-subscription",
    topic=example_topic.id,
    cloud_storage_config={
        "bucket": example.name,
        "filename_prefix": "pre-",
        "filename_suffix": "-_29225",
        "filename_datetime_format": "YYYY-MM-DD/hh_mm_ssZ",
        "max_bytes": 1000,
        "max_duration": "300s",
        "max_messages": 1000,
    },
    opts = pulumi.ResourceOptions(depends_on=[
            example,
            admin,
        ]))
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/storage"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		example, err := storage.NewBucket(ctx, "example", &storage.BucketArgs{
			Name:                     pulumi.String("example-bucket"),
			Location:                 pulumi.String("US"),
			UniformBucketLevelAccess: pulumi.Bool(true),
		})
		if err != nil {
			return err
		}
		exampleTopic, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
			Name: pulumi.String("example-topic"),
		})
		if err != nil {
			return err
		}
		project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		admin, err := storage.NewBucketIAMMember(ctx, "admin", &storage.BucketIAMMemberArgs{
			Bucket: example.Name,
			Role:   pulumi.String("roles/storage.admin"),
			Member: pulumi.Sprintf("serviceAccount:service-%v@gcp-sa-pubsub.iam.gserviceaccount.com", project.Number),
		})
		if err != nil {
			return err
		}
		_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
			Name:  pulumi.String("example-subscription"),
			Topic: exampleTopic.ID(),
			CloudStorageConfig: &pubsub.SubscriptionCloudStorageConfigArgs{
				Bucket:                 example.Name,
				FilenamePrefix:         pulumi.String("pre-"),
				FilenameSuffix:         pulumi.String("-_29225"),
				FilenameDatetimeFormat: pulumi.String("YYYY-MM-DD/hh_mm_ssZ"),
				MaxBytes:               pulumi.Int(1000),
				MaxDuration:            pulumi.String("300s"),
				MaxMessages:            pulumi.Int(1000),
			},
		}, pulumi.DependsOn([]pulumi.Resource{
			example,
			admin,
		}))
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var example = new Gcp.Storage.Bucket("example", new()
    {
        Name = "example-bucket",
        Location = "US",
        UniformBucketLevelAccess = true,
    });

    var exampleTopic = new Gcp.PubSub.Topic("example", new()
    {
        Name = "example-topic",
    });

    var project = Gcp.Organizations.GetProject.Invoke();

    var admin = new Gcp.Storage.BucketIAMMember("admin", new()
    {
        Bucket = example.Name,
        Role = "roles/storage.admin",
        Member = $"serviceAccount:service-{project.Apply(getProjectResult => getProjectResult.Number)}@gcp-sa-pubsub.iam.gserviceaccount.com",
    });

    var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
    {
        Name = "example-subscription",
        Topic = exampleTopic.Id,
        CloudStorageConfig = new Gcp.PubSub.Inputs.SubscriptionCloudStorageConfigArgs
        {
            Bucket = example.Name,
            FilenamePrefix = "pre-",
            FilenameSuffix = "-_29225",
            FilenameDatetimeFormat = "YYYY-MM-DD/hh_mm_ssZ",
            MaxBytes = 1000,
            MaxDuration = "300s",
            MaxMessages = 1000,
        },
    }, new CustomResourceOptions
    {
        DependsOn =
        {
            example,
            admin,
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.storage.Bucket;
import com.pulumi.gcp.storage.BucketArgs;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.storage.BucketIAMMember;
import com.pulumi.gcp.storage.BucketIAMMemberArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionCloudStorageConfigArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Bucket("example", BucketArgs.builder()
            .name("example-bucket")
            .location("US")
            .uniformBucketLevelAccess(true)
            .build());

        var exampleTopic = new Topic("exampleTopic", TopicArgs.builder()
            .name("example-topic")
            .build());

        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

        var admin = new BucketIAMMember("admin", BucketIAMMemberArgs.builder()
            .bucket(example.name())
            .role("roles/storage.admin")
            .member(String.format("serviceAccount:service-%s@gcp-sa-pubsub.iam.gserviceaccount.com", project.number()))
            .build());

        var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
            .name("example-subscription")
            .topic(exampleTopic.id())
            .cloudStorageConfig(SubscriptionCloudStorageConfigArgs.builder()
                .bucket(example.name())
                .filenamePrefix("pre-")
                .filenameSuffix("-_29225")
                .filenameDatetimeFormat("YYYY-MM-DD/hh_mm_ssZ")
                .maxBytes(1000)
                .maxDuration("300s")
                .maxMessages(1000)
                .build())
            .build(), CustomResourceOptions.builder()
                .dependsOn(                
                    example,
                    admin)
                .build());

    }
}
resources:
  example:
    type: gcp:storage:Bucket
    properties:
      name: example-bucket
      location: US
      uniformBucketLevelAccess: true
  exampleTopic:
    type: gcp:pubsub:Topic
    name: example
    properties:
      name: example-topic
  exampleSubscription:
    type: gcp:pubsub:Subscription
    name: example
    properties:
      name: example-subscription
      topic: ${exampleTopic.id}
      cloudStorageConfig:
        bucket: ${example.name}
        filenamePrefix: pre-
        filenameSuffix: -_29225
        filenameDatetimeFormat: YYYY-MM-DD/hh_mm_ssZ
        maxBytes: 1000
        maxDuration: 300s
        maxMessages: 1000
    options:
      dependsOn:
        - ${example}
        - ${admin}
  admin:
    type: gcp:storage:BucketIAMMember
    properties:
      bucket: ${example.name}
      role: roles/storage.admin
      member: serviceAccount:service-${project.number}@gcp-sa-pubsub.iam.gserviceaccount.com
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}

The cloudStorageConfig block writes messages to a bucket with configurable file naming and batching. The filenamePrefix and filenameSuffix customize object names. The filenameDatetimeFormat organizes files by timestamp (e.g., “YYYY-MM-DD/hh_mm_ssZ”). Files are created when they reach maxBytes, maxDuration, or maxMessages thresholds. The Pub/Sub service account needs storage.admin role on the bucket.

Transform messages with JavaScript functions

Applications can apply custom transformations to messages before delivery using JavaScript user-defined functions.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const example = new gcp.pubsub.Topic("example", {name: "example-topic"});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
    name: "example-subscription",
    topic: example.id,
    messageTransforms: [{
        javascriptUdf: {
            functionName: "isYearEven",
            code: `function isYearEven(message, metadata) {
  const data = JSON.parse(message.data);
  return message.year %2 === 0;
}
`,
        },
    }],
});
import pulumi
import pulumi_gcp as gcp

example = gcp.pubsub.Topic("example", name="example-topic")
example_subscription = gcp.pubsub.Subscription("example",
    name="example-subscription",
    topic=example.id,
    message_transforms=[{
        "javascript_udf": {
            "function_name": "isYearEven",
            "code": """function isYearEven(message, metadata) {
  const data = JSON.parse(message.data);
  return message.year %2 === 0;
}
""",
        },
    }])
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
			Name: pulumi.String("example-topic"),
		})
		if err != nil {
			return err
		}
		_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
			Name:  pulumi.String("example-subscription"),
			Topic: example.ID(),
			MessageTransforms: pubsub.SubscriptionMessageTransformArray{
				&pubsub.SubscriptionMessageTransformArgs{
					JavascriptUdf: &pubsub.SubscriptionMessageTransformJavascriptUdfArgs{
						FunctionName: pulumi.String("isYearEven"),
						Code:         pulumi.String("function isYearEven(message, metadata) {\n  const data = JSON.parse(message.data);\n  return message.year %2 === 0;\n}\n"),
					},
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var example = new Gcp.PubSub.Topic("example", new()
    {
        Name = "example-topic",
    });

    var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
    {
        Name = "example-subscription",
        Topic = example.Id,
        MessageTransforms = new[]
        {
            new Gcp.PubSub.Inputs.SubscriptionMessageTransformArgs
            {
                JavascriptUdf = new Gcp.PubSub.Inputs.SubscriptionMessageTransformJavascriptUdfArgs
                {
                    FunctionName = "isYearEven",
                    Code = @"function isYearEven(message, metadata) {
  const data = JSON.parse(message.data);
  return message.year %2 === 0;
}
",
                },
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionMessageTransformArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionMessageTransformJavascriptUdfArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Topic("example", TopicArgs.builder()
            .name("example-topic")
            .build());

        var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
            .name("example-subscription")
            .topic(example.id())
            .messageTransforms(SubscriptionMessageTransformArgs.builder()
                .javascriptUdf(SubscriptionMessageTransformJavascriptUdfArgs.builder()
                    .functionName("isYearEven")
                    .code("""
function isYearEven(message, metadata) {
  const data = JSON.parse(message.data);
  return message.year %2 === 0;
}
                    """)
                    .build())
                .build())
            .build());

    }
}
resources:
  example:
    type: gcp:pubsub:Topic
    properties:
      name: example-topic
  exampleSubscription:
    type: gcp:pubsub:Subscription
    name: example
    properties:
      name: example-subscription
      topic: ${example.id}
      messageTransforms:
        - javascriptUdf:
            functionName: isYearEven
            code: |
              function isYearEven(message, metadata) {
                const data = JSON.parse(message.data);
                return message.year %2 === 0;
              }              

The messageTransforms array defines JavaScript functions that process messages in order. Each javascriptUdf specifies a functionName and code. Functions receive the message and metadata, and can filter (return null), modify (return transformed message), or pass through unchanged. Transformations run within Pub/Sub before delivery to your application.

Beyond these examples

These snippets focus on specific subscription-level features: pull and push delivery modes, BigQuery and Cloud Storage integration, message filtering and transformation, and dead letter handling. They’re intentionally minimal rather than full messaging architectures.

The examples reference pre-existing infrastructure such as Pub/Sub topics, BigQuery datasets and tables, Cloud Storage buckets, HTTP endpoints for push delivery, and IAM permissions for service accounts. They focus on configuring the subscription rather than provisioning everything around it.

To keep things focused, common subscription patterns are omitted, including:

  • Exactly-once delivery guarantees (enableExactlyOnceDelivery)
  • Message ordering (enableMessageOrdering)
  • Retry policies and backoff configuration
  • Subscription expiration policies
  • Resource manager tags

These omissions are intentional: the goal is to illustrate how each subscription feature is wired, not provide drop-in messaging modules. See the Pub/Sub Subscription resource reference for all available configuration options.

Let's create Google Cloud Pub/Sub Subscriptions

Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.

Try Pulumi Cloud for FREE

Frequently Asked Questions

Delivery & Configuration
Can I use multiple delivery methods like push and BigQuery together?
No, only one of pushConfig, bigqueryConfig, or cloudStorageConfig can be set. If all three are empty, the subscriber will pull and acknowledge messages using API methods.
How do I set up BigQuery delivery with a custom service account?
Configure bigqueryConfig with the table reference and serviceAccountEmail. Grant the service account roles/bigquery.metadataViewer and roles/bigquery.dataEditor, then use dependsOn to ensure IAM bindings are created before the subscription.
How do I set up Cloud Storage delivery?
Configure cloudStorageConfig with the bucket name and optional filename settings (filenamePrefix, filenameSuffix, filenameDatetimeFormat). Grant the Pub/Sub service account (service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com) the roles/storage.admin role on the bucket.
Message Handling
What are the limits for ackDeadlineSeconds?
The minimum is 10 seconds and the maximum is 600 seconds (10 minutes). If set to 0, a default value of 10 seconds is used.
How long are messages retained in the subscription?
By default, messages are retained for 7 days. You can configure messageRetentionDuration with a minimum of 10 minutes ("600s") and a maximum of 31 days ("2678400s").
What's the difference between enableExactlyOnceDelivery and enableMessageOrdering?
enableExactlyOnceDelivery guarantees no message redelivery before the ack deadline expires and no resending of acknowledged messages. enableMessageOrdering ensures messages with the same orderingKey are delivered in the order received by Pub/Sub. Both are independent features.
Immutability & Lifecycle
What properties can't be changed after creating the subscription?
The following properties are immutable and trigger resource replacement if changed: name, project, topic, enableMessageOrdering, filter, and tags.
Why did my subscription get deleted automatically?
Subscriptions expire after 31 days of inactivity by default. To prevent expiration, set expirationPolicy.ttl to an empty string (""). The minimum TTL if specified is 1 day.
IAM & Permissions
What permissions does the dead letter policy require?
The Cloud Pub/Sub service account (service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com) must have permission to Acknowledge() messages on the subscription.
How do I retrieve the Google Managed Pub/Sub Service Account email?
Use the gcp.projects.ServiceIdentity resource to retrieve the service account email for IAM bindings.
Filtering & Transforms
What are the limitations of subscription filters?
Filters have a maximum length of 256 bytes and are immutable after creation. They filter messages by attributes, and Pub/Sub automatically acknowledges messages that don’t match.
Can I chain multiple message transforms?
Yes, use the messageTransforms array. Transforms are applied in the order specified, and individual transforms can be disabled using the disabled flag.
Why doesn't the labels field show all labels on my subscription?
The labels field is non-authoritative and only manages labels in your configuration. Use effectiveLabels to see all labels present on the resource, including those set by other clients and services.

Using a different cloud?

Explore messaging guides for other cloud providers: