Create Google Cloud Pub/Sub Subscriptions

The gcp:pubsub/subscription:Subscription resource, part of the Pulumi GCP provider, defines how messages from a Pub/Sub topic are delivered to subscribers. This guide focuses on four capabilities: push and pull delivery modes, message filtering and transformation, BigQuery and Cloud Storage integration, and dead letter handling.

Subscriptions reference existing Pub/Sub topics and may deliver to HTTP endpoints, BigQuery tables, or Cloud Storage buckets. The examples are intentionally small. Combine them with your own topics, IAM permissions, and destination infrastructure.

Push messages to an HTTP endpoint

Many event-driven architectures route Pub/Sub messages to HTTP services that process events as they arrive, eliminating the need for subscribers to poll.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const example = new gcp.pubsub.Topic("example", {name: "example-topic"});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
    name: "example-subscription",
    topic: example.id,
    ackDeadlineSeconds: 20,
    labels: {
        foo: "bar",
    },
    pushConfig: {
        pushEndpoint: "https://example.com/push",
        attributes: {
            "x-goog-version": "v1",
        },
    },
});
import pulumi
import pulumi_gcp as gcp

example = gcp.pubsub.Topic("example", name="example-topic")
example_subscription = gcp.pubsub.Subscription("example",
    name="example-subscription",
    topic=example.id,
    ack_deadline_seconds=20,
    labels={
        "foo": "bar",
    },
    push_config={
        "push_endpoint": "https://example.com/push",
        "attributes": {
            "x-goog-version": "v1",
        },
    })
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
			Name: pulumi.String("example-topic"),
		})
		if err != nil {
			return err
		}
		_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
			Name:               pulumi.String("example-subscription"),
			Topic:              example.ID(),
			AckDeadlineSeconds: pulumi.Int(20),
			Labels: pulumi.StringMap{
				"foo": pulumi.String("bar"),
			},
			PushConfig: &pubsub.SubscriptionPushConfigArgs{
				PushEndpoint: pulumi.String("https://example.com/push"),
				Attributes: pulumi.StringMap{
					"x-goog-version": pulumi.String("v1"),
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var example = new Gcp.PubSub.Topic("example", new()
    {
        Name = "example-topic",
    });

    var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
    {
        Name = "example-subscription",
        Topic = example.Id,
        AckDeadlineSeconds = 20,
        Labels = 
        {
            { "foo", "bar" },
        },
        PushConfig = new Gcp.PubSub.Inputs.SubscriptionPushConfigArgs
        {
            PushEndpoint = "https://example.com/push",
            Attributes = 
            {
                { "x-goog-version", "v1" },
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionPushConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Topic("example", TopicArgs.builder()
            .name("example-topic")
            .build());

        var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
            .name("example-subscription")
            .topic(example.id())
            .ackDeadlineSeconds(20)
            .labels(Map.of("foo", "bar"))
            .pushConfig(SubscriptionPushConfigArgs.builder()
                .pushEndpoint("https://example.com/push")
                .attributes(Map.of("x-goog-version", "v1"))
                .build())
            .build());

    }
}
resources:
  example:
    type: gcp:pubsub:Topic
    properties:
      name: example-topic
  exampleSubscription:
    type: gcp:pubsub:Subscription
    name: example
    properties:
      name: example-subscription
      topic: ${example.id}
      ackDeadlineSeconds: 20
      labels:
        foo: bar
      pushConfig:
        pushEndpoint: https://example.com/push
        attributes:
          x-goog-version: v1

When a message arrives, Pub/Sub sends an HTTP POST request to the pushEndpoint. The attributes map passes custom headers (like API version) to your endpoint. The ackDeadlineSeconds property sets how long Pub/Sub waits for an HTTP 200 response before considering the delivery failed and retrying.

Configure pull-based message consumption

Applications that need fine-grained control over message processing use pull subscriptions, where the subscriber explicitly requests messages and controls acknowledgment timing.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const example = new gcp.pubsub.Topic("example", {name: "example-topic"});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
    name: "example-subscription",
    topic: example.id,
    labels: {
        foo: "bar",
    },
    messageRetentionDuration: "1200s",
    retainAckedMessages: true,
    ackDeadlineSeconds: 20,
    expirationPolicy: {
        ttl: "300000.5s",
    },
    retryPolicy: {
        minimumBackoff: "10s",
    },
    enableMessageOrdering: false,
});
import pulumi
import pulumi_gcp as gcp

example = gcp.pubsub.Topic("example", name="example-topic")
example_subscription = gcp.pubsub.Subscription("example",
    name="example-subscription",
    topic=example.id,
    labels={
        "foo": "bar",
    },
    message_retention_duration="1200s",
    retain_acked_messages=True,
    ack_deadline_seconds=20,
    expiration_policy={
        "ttl": "300000.5s",
    },
    retry_policy={
        "minimum_backoff": "10s",
    },
    enable_message_ordering=False)
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
			Name: pulumi.String("example-topic"),
		})
		if err != nil {
			return err
		}
		_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
			Name:  pulumi.String("example-subscription"),
			Topic: example.ID(),
			Labels: pulumi.StringMap{
				"foo": pulumi.String("bar"),
			},
			MessageRetentionDuration: pulumi.String("1200s"),
			RetainAckedMessages:      pulumi.Bool(true),
			AckDeadlineSeconds:       pulumi.Int(20),
			ExpirationPolicy: &pubsub.SubscriptionExpirationPolicyArgs{
				Ttl: pulumi.String("300000.5s"),
			},
			RetryPolicy: &pubsub.SubscriptionRetryPolicyArgs{
				MinimumBackoff: pulumi.String("10s"),
			},
			EnableMessageOrdering: pulumi.Bool(false),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var example = new Gcp.PubSub.Topic("example", new()
    {
        Name = "example-topic",
    });

    var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
    {
        Name = "example-subscription",
        Topic = example.Id,
        Labels = 
        {
            { "foo", "bar" },
        },
        MessageRetentionDuration = "1200s",
        RetainAckedMessages = true,
        AckDeadlineSeconds = 20,
        ExpirationPolicy = new Gcp.PubSub.Inputs.SubscriptionExpirationPolicyArgs
        {
            Ttl = "300000.5s",
        },
        RetryPolicy = new Gcp.PubSub.Inputs.SubscriptionRetryPolicyArgs
        {
            MinimumBackoff = "10s",
        },
        EnableMessageOrdering = false,
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionExpirationPolicyArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionRetryPolicyArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Topic("example", TopicArgs.builder()
            .name("example-topic")
            .build());

        var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
            .name("example-subscription")
            .topic(example.id())
            .labels(Map.of("foo", "bar"))
            .messageRetentionDuration("1200s")
            .retainAckedMessages(true)
            .ackDeadlineSeconds(20)
            .expirationPolicy(SubscriptionExpirationPolicyArgs.builder()
                .ttl("300000.5s")
                .build())
            .retryPolicy(SubscriptionRetryPolicyArgs.builder()
                .minimumBackoff("10s")
                .build())
            .enableMessageOrdering(false)
            .build());

    }
}
resources:
  example:
    type: gcp:pubsub:Topic
    properties:
      name: example-topic
  exampleSubscription:
    type: gcp:pubsub:Subscription
    name: example
    properties:
      name: example-subscription
      topic: ${example.id}
      labels:
        foo: bar
      messageRetentionDuration: 1200s
      retainAckedMessages: true
      ackDeadlineSeconds: 20
      expirationPolicy:
        ttl: 300000.5s
      retryPolicy:
        minimumBackoff: 10s
      enableMessageOrdering: false

Pull subscriptions store messages in a backlog until your application retrieves them. The messageRetentionDuration controls how long unacknowledged messages remain available (up to 31 days). Setting retainAckedMessages to true keeps acknowledged messages in the backlog for the same duration, enabling time-travel queries. The expirationPolicy automatically deletes inactive subscriptions; setting ttl to an empty string disables expiration. The retryPolicy configures backoff timing when delivery fails.

Filter messages by attribute values

When a topic carries multiple message types, subscriptions can filter by attributes to receive only relevant messages. Pub/Sub automatically acknowledges filtered-out messages.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const example = new gcp.pubsub.Topic("example", {name: "example-topic"});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
    name: "example-subscription",
    topic: example.id,
    labels: {
        foo: "bar",
    },
    filter: `    attributes.foo = \\"foo\\"
    AND attributes.bar = \\"bar\\"
`,
    ackDeadlineSeconds: 20,
});
import pulumi
import pulumi_gcp as gcp

example = gcp.pubsub.Topic("example", name="example-topic")
example_subscription = gcp.pubsub.Subscription("example",
    name="example-subscription",
    topic=example.id,
    labels={
        "foo": "bar",
    },
    filter="""    attributes.foo = \"foo\"
    AND attributes.bar = \"bar\"
""",
    ack_deadline_seconds=20)
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
			Name: pulumi.String("example-topic"),
		})
		if err != nil {
			return err
		}
		_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
			Name:  pulumi.String("example-subscription"),
			Topic: example.ID(),
			Labels: pulumi.StringMap{
				"foo": pulumi.String("bar"),
			},
			Filter:             pulumi.String("    attributes.foo = \\\"foo\\\"\n    AND attributes.bar = \\\"bar\\\"\n"),
			AckDeadlineSeconds: pulumi.Int(20),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var example = new Gcp.PubSub.Topic("example", new()
    {
        Name = "example-topic",
    });

    var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
    {
        Name = "example-subscription",
        Topic = example.Id,
        Labels = 
        {
            { "foo", "bar" },
        },
        Filter = @"    attributes.foo = \""foo\""
    AND attributes.bar = \""bar\""
",
        AckDeadlineSeconds = 20,
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Topic("example", TopicArgs.builder()
            .name("example-topic")
            .build());

        var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
            .name("example-subscription")
            .topic(example.id())
            .labels(Map.of("foo", "bar"))
            .filter("""
    attributes.foo = \"foo\"
    AND attributes.bar = \"bar\"
            """)
            .ackDeadlineSeconds(20)
            .build());

    }
}
resources:
  example:
    type: gcp:pubsub:Topic
    properties:
      name: example-topic
  exampleSubscription:
    type: gcp:pubsub:Subscription
    name: example
    properties:
      name: example-subscription
      topic: ${example.id}
      labels:
        foo: bar
      filter: |2
            attributes.foo = \"foo\"
            AND attributes.bar = \"bar\"
      ackDeadlineSeconds: 20

The filter property uses a SQL-like syntax to match message attributes. Only messages where both attributes.foo = "foo" AND attributes.bar = "bar" are delivered to this subscription. Filters are immutable after creation and have a 256-byte limit.

Route undeliverable messages to a dead letter topic

Messages that repeatedly fail processing can block progress. Dead letter policies automatically move problematic messages to a separate topic after a configured number of delivery attempts.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const example = new gcp.pubsub.Topic("example", {name: "example-topic"});
const exampleDeadLetter = new gcp.pubsub.Topic("example_dead_letter", {name: "example-topic-dead-letter"});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
    name: "example-subscription",
    topic: example.id,
    deadLetterPolicy: {
        deadLetterTopic: exampleDeadLetter.id,
        maxDeliveryAttempts: 10,
    },
});
import pulumi
import pulumi_gcp as gcp

example = gcp.pubsub.Topic("example", name="example-topic")
example_dead_letter = gcp.pubsub.Topic("example_dead_letter", name="example-topic-dead-letter")
example_subscription = gcp.pubsub.Subscription("example",
    name="example-subscription",
    topic=example.id,
    dead_letter_policy={
        "dead_letter_topic": example_dead_letter.id,
        "max_delivery_attempts": 10,
    })
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
			Name: pulumi.String("example-topic"),
		})
		if err != nil {
			return err
		}
		exampleDeadLetter, err := pubsub.NewTopic(ctx, "example_dead_letter", &pubsub.TopicArgs{
			Name: pulumi.String("example-topic-dead-letter"),
		})
		if err != nil {
			return err
		}
		_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
			Name:  pulumi.String("example-subscription"),
			Topic: example.ID(),
			DeadLetterPolicy: &pubsub.SubscriptionDeadLetterPolicyArgs{
				DeadLetterTopic:     exampleDeadLetter.ID(),
				MaxDeliveryAttempts: pulumi.Int(10),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var example = new Gcp.PubSub.Topic("example", new()
    {
        Name = "example-topic",
    });

    var exampleDeadLetter = new Gcp.PubSub.Topic("example_dead_letter", new()
    {
        Name = "example-topic-dead-letter",
    });

    var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
    {
        Name = "example-subscription",
        Topic = example.Id,
        DeadLetterPolicy = new Gcp.PubSub.Inputs.SubscriptionDeadLetterPolicyArgs
        {
            DeadLetterTopic = exampleDeadLetter.Id,
            MaxDeliveryAttempts = 10,
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionDeadLetterPolicyArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Topic("example", TopicArgs.builder()
            .name("example-topic")
            .build());

        var exampleDeadLetter = new Topic("exampleDeadLetter", TopicArgs.builder()
            .name("example-topic-dead-letter")
            .build());

        var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
            .name("example-subscription")
            .topic(example.id())
            .deadLetterPolicy(SubscriptionDeadLetterPolicyArgs.builder()
                .deadLetterTopic(exampleDeadLetter.id())
                .maxDeliveryAttempts(10)
                .build())
            .build());

    }
}
resources:
  example:
    type: gcp:pubsub:Topic
    properties:
      name: example-topic
  exampleDeadLetter:
    type: gcp:pubsub:Topic
    name: example_dead_letter
    properties:
      name: example-topic-dead-letter
  exampleSubscription:
    type: gcp:pubsub:Subscription
    name: example
    properties:
      name: example-subscription
      topic: ${example.id}
      deadLetterPolicy:
        deadLetterTopic: ${exampleDeadLetter.id}
        maxDeliveryAttempts: 10

When a message fails delivery maxDeliveryAttempts times (here, 10), Pub/Sub moves it to the deadLetterTopic instead of retrying indefinitely. The Pub/Sub service account (service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com) needs permission to acknowledge messages on both the main subscription and the dead letter topic.

Stream messages directly into BigQuery tables

Analytics pipelines often load streaming data into BigQuery for querying. BigQuery subscriptions write messages directly to tables without intermediate processing.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const example = new gcp.pubsub.Topic("example", {name: "example-topic"});
const test = new gcp.bigquery.Dataset("test", {datasetId: "example_dataset"});
const testTable = new gcp.bigquery.Table("test", {
    tableId: "example_table",
    datasetId: test.datasetId,
    schema: `[
  {
    \\"name\\": \\"data\\",
    \\"type\\": \\"STRING\\",
    \\"mode\\": \\"NULLABLE\\",
    \\"description\\": \\"The data\\"
  }
]
`,
    deletionProtection: false,
});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
    name: "example-subscription",
    topic: example.id,
    bigqueryConfig: {
        table: pulumi.interpolate`${testTable.project}.${testTable.datasetId}.${testTable.tableId}`,
    },
});
const project = gcp.organizations.getProject({});
import pulumi
import pulumi_gcp as gcp

example = gcp.pubsub.Topic("example", name="example-topic")
test = gcp.bigquery.Dataset("test", dataset_id="example_dataset")
test_table = gcp.bigquery.Table("test",
    table_id="example_table",
    dataset_id=test.dataset_id,
    schema="""[
  {
    \"name\": \"data\",
    \"type\": \"STRING\",
    \"mode\": \"NULLABLE\",
    \"description\": \"The data\"
  }
]
""",
    deletion_protection=False)
example_subscription = gcp.pubsub.Subscription("example",
    name="example-subscription",
    topic=example.id,
    bigquery_config={
        "table": pulumi.Output.all(
            project=test_table.project,
            dataset_id=test_table.dataset_id,
            table_id=test_table.table_id
).apply(lambda resolved_outputs: f"{resolved_outputs['project']}.{resolved_outputs['dataset_id']}.{resolved_outputs['table_id']}")
,
    })
project = gcp.organizations.get_project()
package main

import (
	"fmt"

	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
			Name: pulumi.String("example-topic"),
		})
		if err != nil {
			return err
		}
		test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
			DatasetId: pulumi.String("example_dataset"),
		})
		if err != nil {
			return err
		}
		testTable, err := bigquery.NewTable(ctx, "test", &bigquery.TableArgs{
			TableId:   pulumi.String("example_table"),
			DatasetId: test.DatasetId,
			Schema: pulumi.String(`[
  {
    \"name\": \"data\",
    \"type\": \"STRING\",
    \"mode\": \"NULLABLE\",
    \"description\": \"The data\"
  }
]
`),
			DeletionProtection: pulumi.Bool(false),
		})
		if err != nil {
			return err
		}
		_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
			Name:  pulumi.String("example-subscription"),
			Topic: example.ID(),
			BigqueryConfig: &pubsub.SubscriptionBigqueryConfigArgs{
				Table: pulumi.All(testTable.Project, testTable.DatasetId, testTable.TableId).ApplyT(func(_args []interface{}) (string, error) {
					project := _args[0].(string)
					datasetId := _args[1].(string)
					tableId := _args[2].(string)
					return fmt.Sprintf("%v.%v.%v", project, datasetId, tableId), nil
				}).(pulumi.StringOutput),
			},
		})
		if err != nil {
			return err
		}
		_, err = organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var example = new Gcp.PubSub.Topic("example", new()
    {
        Name = "example-topic",
    });

    var test = new Gcp.BigQuery.Dataset("test", new()
    {
        DatasetId = "example_dataset",
    });

    var testTable = new Gcp.BigQuery.Table("test", new()
    {
        TableId = "example_table",
        DatasetId = test.DatasetId,
        Schema = @"[
  {
    \""name\"": \""data\"",
    \""type\"": \""STRING\"",
    \""mode\"": \""NULLABLE\"",
    \""description\"": \""The data\""
  }
]
",
        DeletionProtection = false,
    });

    var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
    {
        Name = "example-subscription",
        Topic = example.Id,
        BigqueryConfig = new Gcp.PubSub.Inputs.SubscriptionBigqueryConfigArgs
        {
            Table = Output.Tuple(testTable.Project, testTable.DatasetId, testTable.TableId).Apply(values =>
            {
                var project = values.Item1;
                var datasetId = values.Item2;
                var tableId = values.Item3;
                return $"{project}.{datasetId}.{tableId}";
            }),
        },
    });

    var project = Gcp.Organizations.GetProject.Invoke();

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Table;
import com.pulumi.gcp.bigquery.TableArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionBigqueryConfigArgs;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Topic("example", TopicArgs.builder()
            .name("example-topic")
            .build());

        var test = new Dataset("test", DatasetArgs.builder()
            .datasetId("example_dataset")
            .build());

        var testTable = new Table("testTable", TableArgs.builder()
            .tableId("example_table")
            .datasetId(test.datasetId())
            .schema("""
[
  {
    \"name\": \"data\",
    \"type\": \"STRING\",
    \"mode\": \"NULLABLE\",
    \"description\": \"The data\"
  }
]
            """)
            .deletionProtection(false)
            .build());

        var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
            .name("example-subscription")
            .topic(example.id())
            .bigqueryConfig(SubscriptionBigqueryConfigArgs.builder()
                .table(Output.tuple(testTable.project(), testTable.datasetId(), testTable.tableId()).applyValue(values -> {
                    var project = values.t1;
                    var datasetId = values.t2;
                    var tableId = values.t3;
                    return String.format("%s.%s.%s", project,datasetId,tableId);
                }))
                .build())
            .build());

        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

    }
}
resources:
  example:
    type: gcp:pubsub:Topic
    properties:
      name: example-topic
  exampleSubscription:
    type: gcp:pubsub:Subscription
    name: example
    properties:
      name: example-subscription
      topic: ${example.id}
      bigqueryConfig:
        table: ${testTable.project}.${testTable.datasetId}.${testTable.tableId}
  test:
    type: gcp:bigquery:Dataset
    properties:
      datasetId: example_dataset
  testTable:
    type: gcp:bigquery:Table
    name: test
    properties:
      tableId: example_table
      datasetId: ${test.datasetId}
      schema: |
        [
          {
            \"name\": \"data\",
            \"type\": \"STRING\",
            \"mode\": \"NULLABLE\",
            \"description\": \"The data\"
          }
        ]        
      deletionProtection: false
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}

The bigqueryConfig property specifies the destination table in project.dataset.table format. Pub/Sub writes each message as a row, using the table’s schema to parse message data. The Pub/Sub service account needs BigQuery metadataViewer and dataEditor roles to write to the table.

Write messages to Cloud Storage as files

Data lakes and batch processing systems often consume events from Cloud Storage. Cloud Storage subscriptions batch messages into files with configurable size and time limits.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const example = new gcp.storage.Bucket("example", {
    name: "example-bucket",
    location: "US",
    uniformBucketLevelAccess: true,
});
const exampleTopic = new gcp.pubsub.Topic("example", {name: "example-topic"});
const project = gcp.organizations.getProject({});
const admin = new gcp.storage.BucketIAMMember("admin", {
    bucket: example.name,
    role: "roles/storage.admin",
    member: project.then(project => `serviceAccount:service-${project.number}@gcp-sa-pubsub.iam.gserviceaccount.com`),
});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
    name: "example-subscription",
    topic: exampleTopic.id,
    cloudStorageConfig: {
        bucket: example.name,
        filenamePrefix: "pre-",
        filenameSuffix: "-_44703",
        filenameDatetimeFormat: "YYYY-MM-DD/hh_mm_ssZ",
        maxBytes: 1000,
        maxDuration: "300s",
        maxMessages: 1000,
    },
}, {
    dependsOn: [
        example,
        admin,
    ],
});
import pulumi
import pulumi_gcp as gcp

example = gcp.storage.Bucket("example",
    name="example-bucket",
    location="US",
    uniform_bucket_level_access=True)
example_topic = gcp.pubsub.Topic("example", name="example-topic")
project = gcp.organizations.get_project()
admin = gcp.storage.BucketIAMMember("admin",
    bucket=example.name,
    role="roles/storage.admin",
    member=f"serviceAccount:service-{project.number}@gcp-sa-pubsub.iam.gserviceaccount.com")
example_subscription = gcp.pubsub.Subscription("example",
    name="example-subscription",
    topic=example_topic.id,
    cloud_storage_config={
        "bucket": example.name,
        "filename_prefix": "pre-",
        "filename_suffix": "-_44703",
        "filename_datetime_format": "YYYY-MM-DD/hh_mm_ssZ",
        "max_bytes": 1000,
        "max_duration": "300s",
        "max_messages": 1000,
    },
    opts = pulumi.ResourceOptions(depends_on=[
            example,
            admin,
        ]))
package main

import (
	"fmt"

	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/storage"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		example, err := storage.NewBucket(ctx, "example", &storage.BucketArgs{
			Name:                     pulumi.String("example-bucket"),
			Location:                 pulumi.String("US"),
			UniformBucketLevelAccess: pulumi.Bool(true),
		})
		if err != nil {
			return err
		}
		exampleTopic, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
			Name: pulumi.String("example-topic"),
		})
		if err != nil {
			return err
		}
		project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		admin, err := storage.NewBucketIAMMember(ctx, "admin", &storage.BucketIAMMemberArgs{
			Bucket: example.Name,
			Role:   pulumi.String("roles/storage.admin"),
			Member: pulumi.Sprintf("serviceAccount:service-%v@gcp-sa-pubsub.iam.gserviceaccount.com", project.Number),
		})
		if err != nil {
			return err
		}
		_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
			Name:  pulumi.String("example-subscription"),
			Topic: exampleTopic.ID(),
			CloudStorageConfig: &pubsub.SubscriptionCloudStorageConfigArgs{
				Bucket:                 example.Name,
				FilenamePrefix:         pulumi.String("pre-"),
				FilenameSuffix:         pulumi.String("-_44703"),
				FilenameDatetimeFormat: pulumi.String("YYYY-MM-DD/hh_mm_ssZ"),
				MaxBytes:               pulumi.Int(1000),
				MaxDuration:            pulumi.String("300s"),
				MaxMessages:            pulumi.Int(1000),
			},
		}, pulumi.DependsOn([]pulumi.Resource{
			example,
			admin,
		}))
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var example = new Gcp.Storage.Bucket("example", new()
    {
        Name = "example-bucket",
        Location = "US",
        UniformBucketLevelAccess = true,
    });

    var exampleTopic = new Gcp.PubSub.Topic("example", new()
    {
        Name = "example-topic",
    });

    var project = Gcp.Organizations.GetProject.Invoke();

    var admin = new Gcp.Storage.BucketIAMMember("admin", new()
    {
        Bucket = example.Name,
        Role = "roles/storage.admin",
        Member = $"serviceAccount:service-{project.Apply(getProjectResult => getProjectResult.Number)}@gcp-sa-pubsub.iam.gserviceaccount.com",
    });

    var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
    {
        Name = "example-subscription",
        Topic = exampleTopic.Id,
        CloudStorageConfig = new Gcp.PubSub.Inputs.SubscriptionCloudStorageConfigArgs
        {
            Bucket = example.Name,
            FilenamePrefix = "pre-",
            FilenameSuffix = "-_44703",
            FilenameDatetimeFormat = "YYYY-MM-DD/hh_mm_ssZ",
            MaxBytes = 1000,
            MaxDuration = "300s",
            MaxMessages = 1000,
        },
    }, new CustomResourceOptions
    {
        DependsOn =
        {
            example,
            admin,
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.storage.Bucket;
import com.pulumi.gcp.storage.BucketArgs;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.storage.BucketIAMMember;
import com.pulumi.gcp.storage.BucketIAMMemberArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionCloudStorageConfigArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Bucket("example", BucketArgs.builder()
            .name("example-bucket")
            .location("US")
            .uniformBucketLevelAccess(true)
            .build());

        var exampleTopic = new Topic("exampleTopic", TopicArgs.builder()
            .name("example-topic")
            .build());

        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

        var admin = new BucketIAMMember("admin", BucketIAMMemberArgs.builder()
            .bucket(example.name())
            .role("roles/storage.admin")
            .member(String.format("serviceAccount:service-%s@gcp-sa-pubsub.iam.gserviceaccount.com", project.number()))
            .build());

        var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
            .name("example-subscription")
            .topic(exampleTopic.id())
            .cloudStorageConfig(SubscriptionCloudStorageConfigArgs.builder()
                .bucket(example.name())
                .filenamePrefix("pre-")
                .filenameSuffix("-_44703")
                .filenameDatetimeFormat("YYYY-MM-DD/hh_mm_ssZ")
                .maxBytes(1000)
                .maxDuration("300s")
                .maxMessages(1000)
                .build())
            .build(), CustomResourceOptions.builder()
                .dependsOn(                
                    example,
                    admin)
                .build());

    }
}
resources:
  example:
    type: gcp:storage:Bucket
    properties:
      name: example-bucket
      location: US
      uniformBucketLevelAccess: true
  exampleTopic:
    type: gcp:pubsub:Topic
    name: example
    properties:
      name: example-topic
  exampleSubscription:
    type: gcp:pubsub:Subscription
    name: example
    properties:
      name: example-subscription
      topic: ${exampleTopic.id}
      cloudStorageConfig:
        bucket: ${example.name}
        filenamePrefix: pre-
        filenameSuffix: -_44703
        filenameDatetimeFormat: YYYY-MM-DD/hh_mm_ssZ
        maxBytes: 1000
        maxDuration: 300s
        maxMessages: 1000
    options:
      dependsOn:
        - ${example}
        - ${admin}
  admin:
    type: gcp:storage:BucketIAMMember
    properties:
      bucket: ${example.name}
      role: roles/storage.admin
      member: serviceAccount:service-${project.number}@gcp-sa-pubsub.iam.gserviceaccount.com
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}

Messages accumulate until reaching maxBytes (1000 bytes), maxDuration (300 seconds), or maxMessages (1000 messages), then Pub/Sub writes a file to the bucket. The filenamePrefix and filenameSuffix customize file naming, and filenameDatetimeFormat adds timestamps. The Pub/Sub service account needs storage.admin role on the bucket.

Transform messages with JavaScript functions

Applications sometimes need to modify message content before delivery, such as redacting sensitive fields or enriching data. Message transforms apply JavaScript functions to each message.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const example = new gcp.pubsub.Topic("example", {name: "example-topic"});
const exampleSubscription = new gcp.pubsub.Subscription("example", {
    name: "example-subscription",
    topic: example.id,
    messageTransforms: [{
        javascriptUdf: {
            functionName: "isYearEven",
            code: `function isYearEven(message, metadata) {
  const data = JSON.parse(message.data);
  return message.year %2 === 0;
}
`,
        },
    }],
});
import pulumi
import pulumi_gcp as gcp

example = gcp.pubsub.Topic("example", name="example-topic")
example_subscription = gcp.pubsub.Subscription("example",
    name="example-subscription",
    topic=example.id,
    message_transforms=[{
        "javascript_udf": {
            "function_name": "isYearEven",
            "code": """function isYearEven(message, metadata) {
  const data = JSON.parse(message.data);
  return message.year %2 === 0;
}
""",
        },
    }])
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		example, err := pubsub.NewTopic(ctx, "example", &pubsub.TopicArgs{
			Name: pulumi.String("example-topic"),
		})
		if err != nil {
			return err
		}
		_, err = pubsub.NewSubscription(ctx, "example", &pubsub.SubscriptionArgs{
			Name:  pulumi.String("example-subscription"),
			Topic: example.ID(),
			MessageTransforms: pubsub.SubscriptionMessageTransformArray{
				&pubsub.SubscriptionMessageTransformArgs{
					JavascriptUdf: &pubsub.SubscriptionMessageTransformJavascriptUdfArgs{
						FunctionName: pulumi.String("isYearEven"),
						Code:         pulumi.String("function isYearEven(message, metadata) {\n  const data = JSON.parse(message.data);\n  return message.year %2 === 0;\n}\n"),
					},
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var example = new Gcp.PubSub.Topic("example", new()
    {
        Name = "example-topic",
    });

    var exampleSubscription = new Gcp.PubSub.Subscription("example", new()
    {
        Name = "example-subscription",
        Topic = example.Id,
        MessageTransforms = new[]
        {
            new Gcp.PubSub.Inputs.SubscriptionMessageTransformArgs
            {
                JavascriptUdf = new Gcp.PubSub.Inputs.SubscriptionMessageTransformJavascriptUdfArgs
                {
                    FunctionName = "isYearEven",
                    Code = @"function isYearEven(message, metadata) {
  const data = JSON.parse(message.data);
  return message.year %2 === 0;
}
",
                },
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.pubsub.Subscription;
import com.pulumi.gcp.pubsub.SubscriptionArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionMessageTransformArgs;
import com.pulumi.gcp.pubsub.inputs.SubscriptionMessageTransformJavascriptUdfArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Topic("example", TopicArgs.builder()
            .name("example-topic")
            .build());

        var exampleSubscription = new Subscription("exampleSubscription", SubscriptionArgs.builder()
            .name("example-subscription")
            .topic(example.id())
            .messageTransforms(SubscriptionMessageTransformArgs.builder()
                .javascriptUdf(SubscriptionMessageTransformJavascriptUdfArgs.builder()
                    .functionName("isYearEven")
                    .code("""
function isYearEven(message, metadata) {
  const data = JSON.parse(message.data);
  return message.year %2 === 0;
}
                    """)
                    .build())
                .build())
            .build());

    }
}
resources:
  example:
    type: gcp:pubsub:Topic
    properties:
      name: example-topic
  exampleSubscription:
    type: gcp:pubsub:Subscription
    name: example
    properties:
      name: example-subscription
      topic: ${example.id}
      messageTransforms:
        - javascriptUdf:
            functionName: isYearEven
            code: |
              function isYearEven(message, metadata) {
                const data = JSON.parse(message.data);
                return message.year %2 === 0;
              }              

The messageTransforms array defines JavaScript functions that process messages before delivery. Each function receives the message and metadata, and can modify or filter the message. Functions run in order; returning null filters out the message. The code property contains the JavaScript implementation.

Beyond these examples

These snippets focus on specific subscription-level features: push and pull delivery modes, message filtering and transformation, BigQuery and Cloud Storage integration, and dead letter handling. They’re intentionally minimal rather than full event processing pipelines.

The examples reference pre-existing infrastructure such as Pub/Sub topics, HTTP endpoints for push subscriptions, BigQuery datasets and tables, Cloud Storage buckets, and IAM service accounts and permissions. They focus on configuring the subscription rather than provisioning everything around it.

To keep things focused, common subscription patterns are omitted, including:

  • Exactly-once delivery guarantees (enableExactlyOnceDelivery)
  • Message ordering (enableMessageOrdering)
  • Avro serialization for Cloud Storage (avroConfig)
  • Custom service accounts for BigQuery and Cloud Storage writes
  • Resource manager tags (tags property)

These omissions are intentional: the goal is to illustrate how each subscription feature is wired, not provide drop-in messaging modules. See the Pub/Sub Subscription resource reference for all available configuration options.

Let's create Google Cloud Pub/Sub Subscriptions

Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.

Try Pulumi Cloud for FREE

Frequently Asked Questions

Delivery Methods & Configuration
Can I use multiple delivery methods for the same subscription?
No, only one of pushConfig, bigQueryConfig, or cloudStorageConfig can be set. If all three are empty, the subscription uses pull delivery where subscribers call API methods to retrieve messages.
How do I set up push delivery?
Configure pushConfig with a pushEndpoint URL and optional attributes. For example, set pushEndpoint to “https://example.com/push" and add custom headers via attributes.
How do I configure BigQuery delivery?
Set bigqueryConfig with a table reference in the format {project}.{datasetId}.{tableId}. Optionally specify useTableSchema to use the table’s schema or provide a custom serviceAccountEmail.
How do I configure Cloud Storage delivery?
Set cloudStorageConfig with the bucket name, filename patterns (filenamePrefix, filenameSuffix, filenameDatetimeFormat), and limits (maxBytes, maxDuration, maxMessages). For Avro format, configure avroConfig with writeMetadata and useTopicSchema.
IAM & Permissions
What IAM permissions do I need for BigQuery delivery?
The service account needs roles/bigquery.metadataViewer and roles/bigquery.dataEditor roles. Grant both roles before creating the subscription, as shown in the service account example with dependsOn.
What IAM permissions do I need for Cloud Storage delivery?
The Google Managed Pub/Sub Service Account (service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com) needs roles/storage.admin on the bucket. Grant this role before creating the subscription.
What IAM permissions are required for dead letter queues?
The Cloud Pub/Sub service account (service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com) must have permission to Acknowledge() messages on the subscription.
How do I get the Google Managed Pub/Sub Service Account email?
Use the gcp.projects.ServiceIdentity resource to retrieve the service account email for IAM bindings.
Message Retention & Timeouts
What are the timeout and retention limits for subscriptions?
ackDeadlineSeconds defaults to 10 seconds (when set to 0) with a range of 10-600 seconds. messageRetentionDuration defaults to 7 days with a range of 10 minutes (600s) to 31 days (2678400s).
How do I configure subscription expiration?
Use expirationPolicy to set a ttl value. If not set, the default is 31 days. To prevent expiration, set ttl to an empty string (""). The minimum allowed ttl is 1 day.
What's the difference between retaining acknowledged and unacknowledged messages?
Set retainAckedMessages to true to keep acknowledged messages in the backlog until they fall out of the messageRetentionDuration window. This allows you to seek back in time using subscriptions.seek.
Immutable Properties & Lifecycle
What properties can't I change after creating a subscription?
The following properties are immutable: name, project, topic, filter, enableMessageOrdering, and tags. Modifying tags after creation triggers resource replacement.
Why don't I see all labels in my Pulumi configuration?
The labels field is non-authoritative and only manages labels present in your configuration. Use effectiveLabels to see all labels on the resource, including those set by other clients and services.
Message Filtering & Transforms
How do I filter messages by attributes?
Use the filter property with expressions like attributes.foo = "foo" AND attributes.bar = "bar". The maximum filter length is 256 bytes, and filters are immutable after creation.
How do I apply message transforms?
Configure messageTransforms with JavaScript UDFs. Each transform requires a functionName and code. Transforms are applied in the order specified and can be individually disabled by setting disabled to true.
What does exactly-once delivery guarantee?
When enableExactlyOnceDelivery is true, messages won’t be resent before the ack deadline expires, and acknowledged messages won’t be resent. However, subscribers may still receive duplicates if the publisher sends the same message multiple times (these have distinct messageId values).

Using a different cloud?

Explore messaging guides for other cloud providers: