Create GCP Datastream Streams

The gcp:datastream/stream:Stream resource, part of the Pulumi GCP provider, defines a Datastream replication stream that continuously copies data from a source database to a destination. This guide focuses on three capabilities: source database configuration for MySQL, PostgreSQL, and SQL Server; destination configuration for Cloud Storage and BigQuery; and filtering, backfill strategies, and BigQuery optimizations.

Streams depend on connection profiles that define source and destination endpoints, plus IAM permissions for the Datastream service account to access storage, BigQuery, and KMS resources. The examples are intentionally small. Combine them with your own connection profiles and infrastructure.

Stream MySQL to Cloud Storage with filtering and encryption

Data pipelines often replicate MySQL databases to Cloud Storage for analytics, applying table and column filters to control what gets replicated.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
import * as random from "@pulumi/random";

const project = gcp.organizations.getProject({});
const instance = new gcp.sql.DatabaseInstance("instance", {
    name: "my-instance",
    databaseVersion: "MYSQL_8_0",
    region: "us-central1",
    settings: {
        tier: "db-f1-micro",
        backupConfiguration: {
            enabled: true,
            binaryLogEnabled: true,
        },
        ipConfiguration: {
            authorizedNetworks: [
                {
                    value: "34.71.242.81",
                },
                {
                    value: "34.72.28.29",
                },
                {
                    value: "34.67.6.157",
                },
                {
                    value: "34.67.234.134",
                },
                {
                    value: "34.72.239.218",
                },
            ],
        },
    },
    deletionProtection: true,
});
const db = new gcp.sql.Database("db", {
    instance: instance.name,
    name: "db",
});
const pwd = new random.index.Password("pwd", {
    length: 16,
    special: false,
});
const user = new gcp.sql.User("user", {
    name: "user",
    instance: instance.name,
    host: "%",
    password: pwd.result,
});
const sourceConnectionProfile = new gcp.datastream.ConnectionProfile("source_connection_profile", {
    displayName: "Source connection profile",
    location: "us-central1",
    connectionProfileId: "source-profile",
    mysqlProfile: {
        hostname: instance.publicIpAddress,
        username: user.name,
        password: user.password,
    },
});
const bucket = new gcp.storage.Bucket("bucket", {
    name: "my-bucket",
    location: "US",
    uniformBucketLevelAccess: true,
});
const viewer = new gcp.storage.BucketIAMMember("viewer", {
    bucket: bucket.name,
    role: "roles/storage.objectViewer",
    member: project.then(project => `serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com`),
});
const creator = new gcp.storage.BucketIAMMember("creator", {
    bucket: bucket.name,
    role: "roles/storage.objectCreator",
    member: project.then(project => `serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com`),
});
const reader = new gcp.storage.BucketIAMMember("reader", {
    bucket: bucket.name,
    role: "roles/storage.legacyBucketReader",
    member: project.then(project => `serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com`),
});
const keyUser = new gcp.kms.CryptoKeyIAMMember("key_user", {
    cryptoKeyId: "kms-name",
    role: "roles/cloudkms.cryptoKeyEncrypterDecrypter",
    member: project.then(project => `serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com`),
});
const destinationConnectionProfile = new gcp.datastream.ConnectionProfile("destination_connection_profile", {
    displayName: "Connection profile",
    location: "us-central1",
    connectionProfileId: "destination-profile",
    gcsProfile: {
        bucket: bucket.name,
        rootPath: "/path",
    },
});
const _default = new gcp.datastream.Stream("default", {
    streamId: "my-stream",
    desiredState: "NOT_STARTED",
    location: "us-central1",
    displayName: "my stream",
    labels: {
        key: "value",
    },
    sourceConfig: {
        sourceConnectionProfile: sourceConnectionProfile.id,
        mysqlSourceConfig: {
            includeObjects: {
                mysqlDatabases: [{
                    database: "my-database",
                    mysqlTables: [
                        {
                            table: "includedTable",
                            mysqlColumns: [{
                                column: "includedColumn",
                                dataType: "VARCHAR",
                                collation: "utf8mb4",
                                primaryKey: false,
                                nullable: false,
                                ordinalPosition: 0,
                            }],
                        },
                        {
                            table: "includedTable_2",
                        },
                    ],
                }],
            },
            excludeObjects: {
                mysqlDatabases: [{
                    database: "my-database",
                    mysqlTables: [{
                        table: "excludedTable",
                        mysqlColumns: [{
                            column: "excludedColumn",
                            dataType: "VARCHAR",
                            collation: "utf8mb4",
                            primaryKey: false,
                            nullable: false,
                            ordinalPosition: 0,
                        }],
                    }],
                }],
            },
            maxConcurrentCdcTasks: 5,
        },
    },
    destinationConfig: {
        destinationConnectionProfile: destinationConnectionProfile.id,
        gcsDestinationConfig: {
            path: "mydata",
            fileRotationMb: 200,
            fileRotationInterval: "60s",
            jsonFileFormat: {
                schemaFileFormat: "NO_SCHEMA_FILE",
                compression: "GZIP",
            },
        },
    },
    backfillAll: {
        mysqlExcludedObjects: {
            mysqlDatabases: [{
                database: "my-database",
                mysqlTables: [{
                    table: "excludedTable",
                    mysqlColumns: [{
                        column: "excludedColumn",
                        dataType: "VARCHAR",
                        collation: "utf8mb4",
                        primaryKey: false,
                        nullable: false,
                        ordinalPosition: 0,
                    }],
                }],
            }],
        },
    },
    customerManagedEncryptionKey: "kms-name",
}, {
    dependsOn: [keyUser],
});
import pulumi
import pulumi_gcp as gcp
import pulumi_random as random

project = gcp.organizations.get_project()
instance = gcp.sql.DatabaseInstance("instance",
    name="my-instance",
    database_version="MYSQL_8_0",
    region="us-central1",
    settings={
        "tier": "db-f1-micro",
        "backup_configuration": {
            "enabled": True,
            "binary_log_enabled": True,
        },
        "ip_configuration": {
            "authorized_networks": [
                {
                    "value": "34.71.242.81",
                },
                {
                    "value": "34.72.28.29",
                },
                {
                    "value": "34.67.6.157",
                },
                {
                    "value": "34.67.234.134",
                },
                {
                    "value": "34.72.239.218",
                },
            ],
        },
    },
    deletion_protection=True)
db = gcp.sql.Database("db",
    instance=instance.name,
    name="db")
pwd = random.index.Password("pwd",
    length=16,
    special=False)
user = gcp.sql.User("user",
    name="user",
    instance=instance.name,
    host="%",
    password=pwd["result"])
source_connection_profile = gcp.datastream.ConnectionProfile("source_connection_profile",
    display_name="Source connection profile",
    location="us-central1",
    connection_profile_id="source-profile",
    mysql_profile={
        "hostname": instance.public_ip_address,
        "username": user.name,
        "password": user.password,
    })
bucket = gcp.storage.Bucket("bucket",
    name="my-bucket",
    location="US",
    uniform_bucket_level_access=True)
viewer = gcp.storage.BucketIAMMember("viewer",
    bucket=bucket.name,
    role="roles/storage.objectViewer",
    member=f"serviceAccount:service-{project.number}@gcp-sa-datastream.iam.gserviceaccount.com")
creator = gcp.storage.BucketIAMMember("creator",
    bucket=bucket.name,
    role="roles/storage.objectCreator",
    member=f"serviceAccount:service-{project.number}@gcp-sa-datastream.iam.gserviceaccount.com")
reader = gcp.storage.BucketIAMMember("reader",
    bucket=bucket.name,
    role="roles/storage.legacyBucketReader",
    member=f"serviceAccount:service-{project.number}@gcp-sa-datastream.iam.gserviceaccount.com")
key_user = gcp.kms.CryptoKeyIAMMember("key_user",
    crypto_key_id="kms-name",
    role="roles/cloudkms.cryptoKeyEncrypterDecrypter",
    member=f"serviceAccount:service-{project.number}@gcp-sa-datastream.iam.gserviceaccount.com")
destination_connection_profile = gcp.datastream.ConnectionProfile("destination_connection_profile",
    display_name="Connection profile",
    location="us-central1",
    connection_profile_id="destination-profile",
    gcs_profile={
        "bucket": bucket.name,
        "root_path": "/path",
    })
default = gcp.datastream.Stream("default",
    stream_id="my-stream",
    desired_state="NOT_STARTED",
    location="us-central1",
    display_name="my stream",
    labels={
        "key": "value",
    },
    source_config={
        "source_connection_profile": source_connection_profile.id,
        "mysql_source_config": {
            "include_objects": {
                "mysql_databases": [{
                    "database": "my-database",
                    "mysql_tables": [
                        {
                            "table": "includedTable",
                            "mysql_columns": [{
                                "column": "includedColumn",
                                "data_type": "VARCHAR",
                                "collation": "utf8mb4",
                                "primary_key": False,
                                "nullable": False,
                                "ordinal_position": 0,
                            }],
                        },
                        {
                            "table": "includedTable_2",
                        },
                    ],
                }],
            },
            "exclude_objects": {
                "mysql_databases": [{
                    "database": "my-database",
                    "mysql_tables": [{
                        "table": "excludedTable",
                        "mysql_columns": [{
                            "column": "excludedColumn",
                            "data_type": "VARCHAR",
                            "collation": "utf8mb4",
                            "primary_key": False,
                            "nullable": False,
                            "ordinal_position": 0,
                        }],
                    }],
                }],
            },
            "max_concurrent_cdc_tasks": 5,
        },
    },
    destination_config={
        "destination_connection_profile": destination_connection_profile.id,
        "gcs_destination_config": {
            "path": "mydata",
            "file_rotation_mb": 200,
            "file_rotation_interval": "60s",
            "json_file_format": {
                "schema_file_format": "NO_SCHEMA_FILE",
                "compression": "GZIP",
            },
        },
    },
    backfill_all={
        "mysql_excluded_objects": {
            "mysql_databases": [{
                "database": "my-database",
                "mysql_tables": [{
                    "table": "excludedTable",
                    "mysql_columns": [{
                        "column": "excludedColumn",
                        "data_type": "VARCHAR",
                        "collation": "utf8mb4",
                        "primary_key": False,
                        "nullable": False,
                        "ordinal_position": 0,
                    }],
                }],
            }],
        },
    },
    customer_managed_encryption_key="kms-name",
    opts = pulumi.ResourceOptions(depends_on=[key_user]))
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/kms"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/storage"
	"github.com/pulumi/pulumi-random/sdk/v4/go/random"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
			Name:            pulumi.String("my-instance"),
			DatabaseVersion: pulumi.String("MYSQL_8_0"),
			Region:          pulumi.String("us-central1"),
			Settings: &sql.DatabaseInstanceSettingsArgs{
				Tier: pulumi.String("db-f1-micro"),
				BackupConfiguration: &sql.DatabaseInstanceSettingsBackupConfigurationArgs{
					Enabled:          pulumi.Bool(true),
					BinaryLogEnabled: pulumi.Bool(true),
				},
				IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
					AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.71.242.81"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.28.29"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.6.157"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.234.134"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.239.218"),
						},
					},
				},
			},
			DeletionProtection: pulumi.Bool(true),
		})
		if err != nil {
			return err
		}
		_, err = sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
			Instance: instance.Name,
			Name:     pulumi.String("db"),
		})
		if err != nil {
			return err
		}
		pwd, err := random.NewPassword(ctx, "pwd", &random.PasswordArgs{
			Length:  16,
			Special: false,
		})
		if err != nil {
			return err
		}
		user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
			Name:     pulumi.String("user"),
			Instance: instance.Name,
			Host:     pulumi.String("%"),
			Password: pwd.Result,
		})
		if err != nil {
			return err
		}
		sourceConnectionProfile, err := datastream.NewConnectionProfile(ctx, "source_connection_profile", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("Source connection profile"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("source-profile"),
			MysqlProfile: &datastream.ConnectionProfileMysqlProfileArgs{
				Hostname: instance.PublicIpAddress,
				Username: user.Name,
				Password: user.Password,
			},
		})
		if err != nil {
			return err
		}
		bucket, err := storage.NewBucket(ctx, "bucket", &storage.BucketArgs{
			Name:                     pulumi.String("my-bucket"),
			Location:                 pulumi.String("US"),
			UniformBucketLevelAccess: pulumi.Bool(true),
		})
		if err != nil {
			return err
		}
		_, err = storage.NewBucketIAMMember(ctx, "viewer", &storage.BucketIAMMemberArgs{
			Bucket: bucket.Name,
			Role:   pulumi.String("roles/storage.objectViewer"),
			Member: pulumi.Sprintf("serviceAccount:service-%v@gcp-sa-datastream.iam.gserviceaccount.com", project.Number),
		})
		if err != nil {
			return err
		}
		_, err = storage.NewBucketIAMMember(ctx, "creator", &storage.BucketIAMMemberArgs{
			Bucket: bucket.Name,
			Role:   pulumi.String("roles/storage.objectCreator"),
			Member: pulumi.Sprintf("serviceAccount:service-%v@gcp-sa-datastream.iam.gserviceaccount.com", project.Number),
		})
		if err != nil {
			return err
		}
		_, err = storage.NewBucketIAMMember(ctx, "reader", &storage.BucketIAMMemberArgs{
			Bucket: bucket.Name,
			Role:   pulumi.String("roles/storage.legacyBucketReader"),
			Member: pulumi.Sprintf("serviceAccount:service-%v@gcp-sa-datastream.iam.gserviceaccount.com", project.Number),
		})
		if err != nil {
			return err
		}
		keyUser, err := kms.NewCryptoKeyIAMMember(ctx, "key_user", &kms.CryptoKeyIAMMemberArgs{
			CryptoKeyId: pulumi.String("kms-name"),
			Role:        pulumi.String("roles/cloudkms.cryptoKeyEncrypterDecrypter"),
			Member:      pulumi.Sprintf("serviceAccount:service-%v@gcp-sa-datastream.iam.gserviceaccount.com", project.Number),
		})
		if err != nil {
			return err
		}
		destinationConnectionProfile, err := datastream.NewConnectionProfile(ctx, "destination_connection_profile", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("Connection profile"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("destination-profile"),
			GcsProfile: &datastream.ConnectionProfileGcsProfileArgs{
				Bucket:   bucket.Name,
				RootPath: pulumi.String("/path"),
			},
		})
		if err != nil {
			return err
		}
		_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
			StreamId:     pulumi.String("my-stream"),
			DesiredState: pulumi.String("NOT_STARTED"),
			Location:     pulumi.String("us-central1"),
			DisplayName:  pulumi.String("my stream"),
			Labels: pulumi.StringMap{
				"key": pulumi.String("value"),
			},
			SourceConfig: &datastream.StreamSourceConfigArgs{
				SourceConnectionProfile: sourceConnectionProfile.ID(),
				MysqlSourceConfig: &datastream.StreamSourceConfigMysqlSourceConfigArgs{
					IncludeObjects: &datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs{
						MysqlDatabases: datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArray{
							&datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArgs{
								Database: pulumi.String("my-database"),
								MysqlTables: datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArray{
									&datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs{
										Table: pulumi.String("includedTable"),
										MysqlColumns: datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableMysqlColumnArray{
											&datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs{
												Column:          pulumi.String("includedColumn"),
												DataType:        pulumi.String("VARCHAR"),
												Collation:       pulumi.String("utf8mb4"),
												PrimaryKey:      pulumi.Bool(false),
												Nullable:        pulumi.Bool(false),
												OrdinalPosition: pulumi.Int(0),
											},
										},
									},
									&datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs{
										Table: pulumi.String("includedTable_2"),
									},
								},
							},
						},
					},
					ExcludeObjects: &datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsArgs{
						MysqlDatabases: datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseArray{
							&datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseArgs{
								Database: pulumi.String("my-database"),
								MysqlTables: datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableArray{
									&datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableArgs{
										Table: pulumi.String("excludedTable"),
										MysqlColumns: datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableMysqlColumnArray{
											&datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs{
												Column:          pulumi.String("excludedColumn"),
												DataType:        pulumi.String("VARCHAR"),
												Collation:       pulumi.String("utf8mb4"),
												PrimaryKey:      pulumi.Bool(false),
												Nullable:        pulumi.Bool(false),
												OrdinalPosition: pulumi.Int(0),
											},
										},
									},
								},
							},
						},
					},
					MaxConcurrentCdcTasks: pulumi.Int(5),
				},
			},
			DestinationConfig: &datastream.StreamDestinationConfigArgs{
				DestinationConnectionProfile: destinationConnectionProfile.ID(),
				GcsDestinationConfig: &datastream.StreamDestinationConfigGcsDestinationConfigArgs{
					Path:                 pulumi.String("mydata"),
					FileRotationMb:       pulumi.Int(200),
					FileRotationInterval: pulumi.String("60s"),
					JsonFileFormat: &datastream.StreamDestinationConfigGcsDestinationConfigJsonFileFormatArgs{
						SchemaFileFormat: pulumi.String("NO_SCHEMA_FILE"),
						Compression:      pulumi.String("GZIP"),
					},
				},
			},
			BackfillAll: &datastream.StreamBackfillAllArgs{
				MysqlExcludedObjects: &datastream.StreamBackfillAllMysqlExcludedObjectsArgs{
					MysqlDatabases: datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseArray{
						&datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseArgs{
							Database: pulumi.String("my-database"),
							MysqlTables: datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableArray{
								&datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableArgs{
									Table: pulumi.String("excludedTable"),
									MysqlColumns: datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableMysqlColumnArray{
										&datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableMysqlColumnArgs{
											Column:          pulumi.String("excludedColumn"),
											DataType:        pulumi.String("VARCHAR"),
											Collation:       pulumi.String("utf8mb4"),
											PrimaryKey:      pulumi.Bool(false),
											Nullable:        pulumi.Bool(false),
											OrdinalPosition: pulumi.Int(0),
										},
									},
								},
							},
						},
					},
				},
			},
			CustomerManagedEncryptionKey: pulumi.String("kms-name"),
		}, pulumi.DependsOn([]pulumi.Resource{
			keyUser,
		}))
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
using Random = Pulumi.Random;

return await Deployment.RunAsync(() => 
{
    var project = Gcp.Organizations.GetProject.Invoke();

    var instance = new Gcp.Sql.DatabaseInstance("instance", new()
    {
        Name = "my-instance",
        DatabaseVersion = "MYSQL_8_0",
        Region = "us-central1",
        Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
        {
            Tier = "db-f1-micro",
            BackupConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsBackupConfigurationArgs
            {
                Enabled = true,
                BinaryLogEnabled = true,
            },
            IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
            {
                AuthorizedNetworks = new[]
                {
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.71.242.81",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.28.29",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.6.157",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.234.134",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.239.218",
                    },
                },
            },
        },
        DeletionProtection = true,
    });

    var db = new Gcp.Sql.Database("db", new()
    {
        Instance = instance.Name,
        Name = "db",
    });

    var pwd = new Random.Index.Password("pwd", new()
    {
        Length = 16,
        Special = false,
    });

    var user = new Gcp.Sql.User("user", new()
    {
        Name = "user",
        Instance = instance.Name,
        Host = "%",
        Password = pwd.Result,
    });

    var sourceConnectionProfile = new Gcp.Datastream.ConnectionProfile("source_connection_profile", new()
    {
        DisplayName = "Source connection profile",
        Location = "us-central1",
        ConnectionProfileId = "source-profile",
        MysqlProfile = new Gcp.Datastream.Inputs.ConnectionProfileMysqlProfileArgs
        {
            Hostname = instance.PublicIpAddress,
            Username = user.Name,
            Password = user.Password,
        },
    });

    var bucket = new Gcp.Storage.Bucket("bucket", new()
    {
        Name = "my-bucket",
        Location = "US",
        UniformBucketLevelAccess = true,
    });

    var viewer = new Gcp.Storage.BucketIAMMember("viewer", new()
    {
        Bucket = bucket.Name,
        Role = "roles/storage.objectViewer",
        Member = $"serviceAccount:service-{project.Apply(getProjectResult => getProjectResult.Number)}@gcp-sa-datastream.iam.gserviceaccount.com",
    });

    var creator = new Gcp.Storage.BucketIAMMember("creator", new()
    {
        Bucket = bucket.Name,
        Role = "roles/storage.objectCreator",
        Member = $"serviceAccount:service-{project.Apply(getProjectResult => getProjectResult.Number)}@gcp-sa-datastream.iam.gserviceaccount.com",
    });

    var reader = new Gcp.Storage.BucketIAMMember("reader", new()
    {
        Bucket = bucket.Name,
        Role = "roles/storage.legacyBucketReader",
        Member = $"serviceAccount:service-{project.Apply(getProjectResult => getProjectResult.Number)}@gcp-sa-datastream.iam.gserviceaccount.com",
    });

    var keyUser = new Gcp.Kms.CryptoKeyIAMMember("key_user", new()
    {
        CryptoKeyId = "kms-name",
        Role = "roles/cloudkms.cryptoKeyEncrypterDecrypter",
        Member = $"serviceAccount:service-{project.Apply(getProjectResult => getProjectResult.Number)}@gcp-sa-datastream.iam.gserviceaccount.com",
    });

    var destinationConnectionProfile = new Gcp.Datastream.ConnectionProfile("destination_connection_profile", new()
    {
        DisplayName = "Connection profile",
        Location = "us-central1",
        ConnectionProfileId = "destination-profile",
        GcsProfile = new Gcp.Datastream.Inputs.ConnectionProfileGcsProfileArgs
        {
            Bucket = bucket.Name,
            RootPath = "/path",
        },
    });

    var @default = new Gcp.Datastream.Stream("default", new()
    {
        StreamId = "my-stream",
        DesiredState = "NOT_STARTED",
        Location = "us-central1",
        DisplayName = "my stream",
        Labels = 
        {
            { "key", "value" },
        },
        SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
        {
            SourceConnectionProfile = sourceConnectionProfile.Id,
            MysqlSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigArgs
            {
                IncludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs
                {
                    MysqlDatabases = new[]
                    {
                        new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArgs
                        {
                            Database = "my-database",
                            MysqlTables = new[]
                            {
                                new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs
                                {
                                    Table = "includedTable",
                                    MysqlColumns = new[]
                                    {
                                        new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs
                                        {
                                            Column = "includedColumn",
                                            DataType = "VARCHAR",
                                            Collation = "utf8mb4",
                                            PrimaryKey = false,
                                            Nullable = false,
                                            OrdinalPosition = 0,
                                        },
                                    },
                                },
                                new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs
                                {
                                    Table = "includedTable_2",
                                },
                            },
                        },
                    },
                },
                ExcludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsArgs
                {
                    MysqlDatabases = new[]
                    {
                        new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseArgs
                        {
                            Database = "my-database",
                            MysqlTables = new[]
                            {
                                new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableArgs
                                {
                                    Table = "excludedTable",
                                    MysqlColumns = new[]
                                    {
                                        new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs
                                        {
                                            Column = "excludedColumn",
                                            DataType = "VARCHAR",
                                            Collation = "utf8mb4",
                                            PrimaryKey = false,
                                            Nullable = false,
                                            OrdinalPosition = 0,
                                        },
                                    },
                                },
                            },
                        },
                    },
                },
                MaxConcurrentCdcTasks = 5,
            },
        },
        DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
        {
            DestinationConnectionProfile = destinationConnectionProfile.Id,
            GcsDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigGcsDestinationConfigArgs
            {
                Path = "mydata",
                FileRotationMb = 200,
                FileRotationInterval = "60s",
                JsonFileFormat = new Gcp.Datastream.Inputs.StreamDestinationConfigGcsDestinationConfigJsonFileFormatArgs
                {
                    SchemaFileFormat = "NO_SCHEMA_FILE",
                    Compression = "GZIP",
                },
            },
        },
        BackfillAll = new Gcp.Datastream.Inputs.StreamBackfillAllArgs
        {
            MysqlExcludedObjects = new Gcp.Datastream.Inputs.StreamBackfillAllMysqlExcludedObjectsArgs
            {
                MysqlDatabases = new[]
                {
                    new Gcp.Datastream.Inputs.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseArgs
                    {
                        Database = "my-database",
                        MysqlTables = new[]
                        {
                            new Gcp.Datastream.Inputs.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableArgs
                            {
                                Table = "excludedTable",
                                MysqlColumns = new[]
                                {
                                    new Gcp.Datastream.Inputs.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableMysqlColumnArgs
                                    {
                                        Column = "excludedColumn",
                                        DataType = "VARCHAR",
                                        Collation = "utf8mb4",
                                        PrimaryKey = false,
                                        Nullable = false,
                                        OrdinalPosition = 0,
                                    },
                                },
                            },
                        },
                    },
                },
            },
        },
        CustomerManagedEncryptionKey = "kms-name",
    }, new CustomResourceOptions
    {
        DependsOn =
        {
            keyUser,
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsBackupConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import com.pulumi.random.Password;
import com.pulumi.random.PasswordArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileMysqlProfileArgs;
import com.pulumi.gcp.storage.Bucket;
import com.pulumi.gcp.storage.BucketArgs;
import com.pulumi.gcp.storage.BucketIAMMember;
import com.pulumi.gcp.storage.BucketIAMMemberArgs;
import com.pulumi.gcp.kms.CryptoKeyIAMMember;
import com.pulumi.gcp.kms.CryptoKeyIAMMemberArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileGcsProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigGcsDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigGcsDestinationConfigJsonFileFormatArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllMysqlExcludedObjectsArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

        var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
            .name("my-instance")
            .databaseVersion("MYSQL_8_0")
            .region("us-central1")
            .settings(DatabaseInstanceSettingsArgs.builder()
                .tier("db-f1-micro")
                .backupConfiguration(DatabaseInstanceSettingsBackupConfigurationArgs.builder()
                    .enabled(true)
                    .binaryLogEnabled(true)
                    .build())
                .ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
                    .authorizedNetworks(                    
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.71.242.81")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.28.29")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.6.157")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.234.134")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.239.218")
                            .build())
                    .build())
                .build())
            .deletionProtection(true)
            .build());

        var db = new Database("db", DatabaseArgs.builder()
            .instance(instance.name())
            .name("db")
            .build());

        var pwd = new Password("pwd", PasswordArgs.builder()
            .length(16)
            .special(false)
            .build());

        var user = new User("user", UserArgs.builder()
            .name("user")
            .instance(instance.name())
            .host("%")
            .password(pwd.result())
            .build());

        var sourceConnectionProfile = new ConnectionProfile("sourceConnectionProfile", ConnectionProfileArgs.builder()
            .displayName("Source connection profile")
            .location("us-central1")
            .connectionProfileId("source-profile")
            .mysqlProfile(ConnectionProfileMysqlProfileArgs.builder()
                .hostname(instance.publicIpAddress())
                .username(user.name())
                .password(user.password())
                .build())
            .build());

        var bucket = new Bucket("bucket", BucketArgs.builder()
            .name("my-bucket")
            .location("US")
            .uniformBucketLevelAccess(true)
            .build());

        var viewer = new BucketIAMMember("viewer", BucketIAMMemberArgs.builder()
            .bucket(bucket.name())
            .role("roles/storage.objectViewer")
            .member(String.format("serviceAccount:service-%s@gcp-sa-datastream.iam.gserviceaccount.com", project.number()))
            .build());

        var creator = new BucketIAMMember("creator", BucketIAMMemberArgs.builder()
            .bucket(bucket.name())
            .role("roles/storage.objectCreator")
            .member(String.format("serviceAccount:service-%s@gcp-sa-datastream.iam.gserviceaccount.com", project.number()))
            .build());

        var reader = new BucketIAMMember("reader", BucketIAMMemberArgs.builder()
            .bucket(bucket.name())
            .role("roles/storage.legacyBucketReader")
            .member(String.format("serviceAccount:service-%s@gcp-sa-datastream.iam.gserviceaccount.com", project.number()))
            .build());

        var keyUser = new CryptoKeyIAMMember("keyUser", CryptoKeyIAMMemberArgs.builder()
            .cryptoKeyId("kms-name")
            .role("roles/cloudkms.cryptoKeyEncrypterDecrypter")
            .member(String.format("serviceAccount:service-%s@gcp-sa-datastream.iam.gserviceaccount.com", project.number()))
            .build());

        var destinationConnectionProfile = new ConnectionProfile("destinationConnectionProfile", ConnectionProfileArgs.builder()
            .displayName("Connection profile")
            .location("us-central1")
            .connectionProfileId("destination-profile")
            .gcsProfile(ConnectionProfileGcsProfileArgs.builder()
                .bucket(bucket.name())
                .rootPath("/path")
                .build())
            .build());

        var default_ = new Stream("default", StreamArgs.builder()
            .streamId("my-stream")
            .desiredState("NOT_STARTED")
            .location("us-central1")
            .displayName("my stream")
            .labels(Map.of("key", "value"))
            .sourceConfig(StreamSourceConfigArgs.builder()
                .sourceConnectionProfile(sourceConnectionProfile.id())
                .mysqlSourceConfig(StreamSourceConfigMysqlSourceConfigArgs.builder()
                    .includeObjects(StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs.builder()
                        .mysqlDatabases(StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArgs.builder()
                            .database("my-database")
                            .mysqlTables(                            
                                StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs.builder()
                                    .table("includedTable")
                                    .mysqlColumns(StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs.builder()
                                        .column("includedColumn")
                                        .dataType("VARCHAR")
                                        .collation("utf8mb4")
                                        .primaryKey(false)
                                        .nullable(false)
                                        .ordinalPosition(0)
                                        .build())
                                    .build(),
                                StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs.builder()
                                    .table("includedTable_2")
                                    .build())
                            .build())
                        .build())
                    .excludeObjects(StreamSourceConfigMysqlSourceConfigExcludeObjectsArgs.builder()
                        .mysqlDatabases(StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseArgs.builder()
                            .database("my-database")
                            .mysqlTables(StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableArgs.builder()
                                .table("excludedTable")
                                .mysqlColumns(StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs.builder()
                                    .column("excludedColumn")
                                    .dataType("VARCHAR")
                                    .collation("utf8mb4")
                                    .primaryKey(false)
                                    .nullable(false)
                                    .ordinalPosition(0)
                                    .build())
                                .build())
                            .build())
                        .build())
                    .maxConcurrentCdcTasks(5)
                    .build())
                .build())
            .destinationConfig(StreamDestinationConfigArgs.builder()
                .destinationConnectionProfile(destinationConnectionProfile.id())
                .gcsDestinationConfig(StreamDestinationConfigGcsDestinationConfigArgs.builder()
                    .path("mydata")
                    .fileRotationMb(200)
                    .fileRotationInterval("60s")
                    .jsonFileFormat(StreamDestinationConfigGcsDestinationConfigJsonFileFormatArgs.builder()
                        .schemaFileFormat("NO_SCHEMA_FILE")
                        .compression("GZIP")
                        .build())
                    .build())
                .build())
            .backfillAll(StreamBackfillAllArgs.builder()
                .mysqlExcludedObjects(StreamBackfillAllMysqlExcludedObjectsArgs.builder()
                    .mysqlDatabases(StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseArgs.builder()
                        .database("my-database")
                        .mysqlTables(StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableArgs.builder()
                            .table("excludedTable")
                            .mysqlColumns(StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableMysqlColumnArgs.builder()
                                .column("excludedColumn")
                                .dataType("VARCHAR")
                                .collation("utf8mb4")
                                .primaryKey(false)
                                .nullable(false)
                                .ordinalPosition(0)
                                .build())
                            .build())
                        .build())
                    .build())
                .build())
            .customerManagedEncryptionKey("kms-name")
            .build(), CustomResourceOptions.builder()
                .dependsOn(keyUser)
                .build());

    }
}
resources:
  instance:
    type: gcp:sql:DatabaseInstance
    properties:
      name: my-instance
      databaseVersion: MYSQL_8_0
      region: us-central1
      settings:
        tier: db-f1-micro
        backupConfiguration:
          enabled: true
          binaryLogEnabled: true
        ipConfiguration:
          authorizedNetworks:
            - value: 34.71.242.81
            - value: 34.72.28.29
            - value: 34.67.6.157
            - value: 34.67.234.134
            - value: 34.72.239.218
      deletionProtection: true
  db:
    type: gcp:sql:Database
    properties:
      instance: ${instance.name}
      name: db
  pwd:
    type: random:Password
    properties:
      length: 16
      special: false
  user:
    type: gcp:sql:User
    properties:
      name: user
      instance: ${instance.name}
      host: '%'
      password: ${pwd.result}
  sourceConnectionProfile:
    type: gcp:datastream:ConnectionProfile
    name: source_connection_profile
    properties:
      displayName: Source connection profile
      location: us-central1
      connectionProfileId: source-profile
      mysqlProfile:
        hostname: ${instance.publicIpAddress}
        username: ${user.name}
        password: ${user.password}
  bucket:
    type: gcp:storage:Bucket
    properties:
      name: my-bucket
      location: US
      uniformBucketLevelAccess: true
  viewer:
    type: gcp:storage:BucketIAMMember
    properties:
      bucket: ${bucket.name}
      role: roles/storage.objectViewer
      member: serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com
  creator:
    type: gcp:storage:BucketIAMMember
    properties:
      bucket: ${bucket.name}
      role: roles/storage.objectCreator
      member: serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com
  reader:
    type: gcp:storage:BucketIAMMember
    properties:
      bucket: ${bucket.name}
      role: roles/storage.legacyBucketReader
      member: serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com
  keyUser:
    type: gcp:kms:CryptoKeyIAMMember
    name: key_user
    properties:
      cryptoKeyId: kms-name
      role: roles/cloudkms.cryptoKeyEncrypterDecrypter
      member: serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com
  destinationConnectionProfile:
    type: gcp:datastream:ConnectionProfile
    name: destination_connection_profile
    properties:
      displayName: Connection profile
      location: us-central1
      connectionProfileId: destination-profile
      gcsProfile:
        bucket: ${bucket.name}
        rootPath: /path
  default:
    type: gcp:datastream:Stream
    properties:
      streamId: my-stream
      desiredState: NOT_STARTED
      location: us-central1
      displayName: my stream
      labels:
        key: value
      sourceConfig:
        sourceConnectionProfile: ${sourceConnectionProfile.id}
        mysqlSourceConfig:
          includeObjects:
            mysqlDatabases:
              - database: my-database
                mysqlTables:
                  - table: includedTable
                    mysqlColumns:
                      - column: includedColumn
                        dataType: VARCHAR
                        collation: utf8mb4
                        primaryKey: false
                        nullable: false
                        ordinalPosition: 0
                  - table: includedTable_2
          excludeObjects:
            mysqlDatabases:
              - database: my-database
                mysqlTables:
                  - table: excludedTable
                    mysqlColumns:
                      - column: excludedColumn
                        dataType: VARCHAR
                        collation: utf8mb4
                        primaryKey: false
                        nullable: false
                        ordinalPosition: 0
          maxConcurrentCdcTasks: 5
      destinationConfig:
        destinationConnectionProfile: ${destinationConnectionProfile.id}
        gcsDestinationConfig:
          path: mydata
          fileRotationMb: 200
          fileRotationInterval: 60s
          jsonFileFormat:
            schemaFileFormat: NO_SCHEMA_FILE
            compression: GZIP
      backfillAll:
        mysqlExcludedObjects:
          mysqlDatabases:
            - database: my-database
              mysqlTables:
                - table: excludedTable
                  mysqlColumns:
                    - column: excludedColumn
                      dataType: VARCHAR
                      collation: utf8mb4
                      primaryKey: false
                      nullable: false
                      ordinalPosition: 0
      customerManagedEncryptionKey: kms-name
    options:
      dependsOn:
        - ${keyUser}
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}

The sourceConfig references a MySQL connection profile and defines includeObjects and excludeObjects to filter which databases, tables, and columns get replicated. The destinationConfig points to a GCS connection profile and configures file rotation and JSON formatting. The customerManagedEncryptionKey property encrypts data at rest using a KMS key. The backfillAll property controls initial historical data load, with its own exclusion filters.

Stream PostgreSQL to BigQuery with publication slots

PostgreSQL replication to BigQuery requires configuring publication and replication slots on the source database for change data capture.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const source = new gcp.datastream.ConnectionProfile("source", {
    displayName: "Postgresql Source",
    location: "us-central1",
    connectionProfileId: "source-profile",
    postgresqlProfile: {
        hostname: "hostname",
        port: 5432,
        username: "user",
        password: "pass",
        database: "postgres",
    },
});
const destination = new gcp.datastream.ConnectionProfile("destination", {
    displayName: "BigQuery Destination",
    location: "us-central1",
    connectionProfileId: "destination-profile",
    bigqueryProfile: {},
});
const _default = new gcp.datastream.Stream("default", {
    displayName: "Postgres to BigQuery",
    location: "us-central1",
    streamId: "my-stream",
    desiredState: "RUNNING",
    sourceConfig: {
        sourceConnectionProfile: source.id,
        postgresqlSourceConfig: {
            maxConcurrentBackfillTasks: 12,
            publication: "publication",
            replicationSlot: "replication_slot",
            includeObjects: {
                postgresqlSchemas: [{
                    schema: "schema",
                    postgresqlTables: [{
                        table: "table",
                        postgresqlColumns: [{
                            column: "column",
                        }],
                    }],
                }],
            },
            excludeObjects: {
                postgresqlSchemas: [{
                    schema: "schema",
                    postgresqlTables: [{
                        table: "table",
                        postgresqlColumns: [{
                            column: "column",
                        }],
                    }],
                }],
            },
        },
    },
    destinationConfig: {
        destinationConnectionProfile: destination.id,
        bigqueryDestinationConfig: {
            dataFreshness: "900s",
            sourceHierarchyDatasets: {
                datasetTemplate: {
                    location: "us-central1",
                },
            },
        },
    },
    backfillAll: {
        postgresqlExcludedObjects: {
            postgresqlSchemas: [{
                schema: "schema",
                postgresqlTables: [{
                    table: "table",
                    postgresqlColumns: [{
                        column: "column",
                    }],
                }],
            }],
        },
    },
});
import pulumi
import pulumi_gcp as gcp

source = gcp.datastream.ConnectionProfile("source",
    display_name="Postgresql Source",
    location="us-central1",
    connection_profile_id="source-profile",
    postgresql_profile={
        "hostname": "hostname",
        "port": 5432,
        "username": "user",
        "password": "pass",
        "database": "postgres",
    })
destination = gcp.datastream.ConnectionProfile("destination",
    display_name="BigQuery Destination",
    location="us-central1",
    connection_profile_id="destination-profile",
    bigquery_profile={})
default = gcp.datastream.Stream("default",
    display_name="Postgres to BigQuery",
    location="us-central1",
    stream_id="my-stream",
    desired_state="RUNNING",
    source_config={
        "source_connection_profile": source.id,
        "postgresql_source_config": {
            "max_concurrent_backfill_tasks": 12,
            "publication": "publication",
            "replication_slot": "replication_slot",
            "include_objects": {
                "postgresql_schemas": [{
                    "schema": "schema",
                    "postgresql_tables": [{
                        "table": "table",
                        "postgresql_columns": [{
                            "column": "column",
                        }],
                    }],
                }],
            },
            "exclude_objects": {
                "postgresql_schemas": [{
                    "schema": "schema",
                    "postgresql_tables": [{
                        "table": "table",
                        "postgresql_columns": [{
                            "column": "column",
                        }],
                    }],
                }],
            },
        },
    },
    destination_config={
        "destination_connection_profile": destination.id,
        "bigquery_destination_config": {
            "data_freshness": "900s",
            "source_hierarchy_datasets": {
                "dataset_template": {
                    "location": "us-central1",
                },
            },
        },
    },
    backfill_all={
        "postgresql_excluded_objects": {
            "postgresql_schemas": [{
                "schema": "schema",
                "postgresql_tables": [{
                    "table": "table",
                    "postgresql_columns": [{
                        "column": "column",
                    }],
                }],
            }],
        },
    })
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		source, err := datastream.NewConnectionProfile(ctx, "source", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("Postgresql Source"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("source-profile"),
			PostgresqlProfile: &datastream.ConnectionProfilePostgresqlProfileArgs{
				Hostname: pulumi.String("hostname"),
				Port:     pulumi.Int(5432),
				Username: pulumi.String("user"),
				Password: pulumi.String("pass"),
				Database: pulumi.String("postgres"),
			},
		})
		if err != nil {
			return err
		}
		destination, err := datastream.NewConnectionProfile(ctx, "destination", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("BigQuery Destination"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("destination-profile"),
			BigqueryProfile:     &datastream.ConnectionProfileBigqueryProfileArgs{},
		})
		if err != nil {
			return err
		}
		_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
			DisplayName:  pulumi.String("Postgres to BigQuery"),
			Location:     pulumi.String("us-central1"),
			StreamId:     pulumi.String("my-stream"),
			DesiredState: pulumi.String("RUNNING"),
			SourceConfig: &datastream.StreamSourceConfigArgs{
				SourceConnectionProfile: source.ID(),
				PostgresqlSourceConfig: &datastream.StreamSourceConfigPostgresqlSourceConfigArgs{
					MaxConcurrentBackfillTasks: pulumi.Int(12),
					Publication:                pulumi.String("publication"),
					ReplicationSlot:            pulumi.String("replication_slot"),
					IncludeObjects: &datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsArgs{
						PostgresqlSchemas: datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaArray{
							&datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaArgs{
								Schema: pulumi.String("schema"),
								PostgresqlTables: datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTableArray{
									&datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTableArgs{
										Table: pulumi.String("table"),
										PostgresqlColumns: datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArray{
											&datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs{
												Column: pulumi.String("column"),
											},
										},
									},
								},
							},
						},
					},
					ExcludeObjects: &datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsArgs{
						PostgresqlSchemas: datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaArray{
							&datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaArgs{
								Schema: pulumi.String("schema"),
								PostgresqlTables: datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTableArray{
									&datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTableArgs{
										Table: pulumi.String("table"),
										PostgresqlColumns: datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArray{
											&datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs{
												Column: pulumi.String("column"),
											},
										},
									},
								},
							},
						},
					},
				},
			},
			DestinationConfig: &datastream.StreamDestinationConfigArgs{
				DestinationConnectionProfile: destination.ID(),
				BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
					DataFreshness: pulumi.String("900s"),
					SourceHierarchyDatasets: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs{
						DatasetTemplate: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs{
							Location: pulumi.String("us-central1"),
						},
					},
				},
			},
			BackfillAll: &datastream.StreamBackfillAllArgs{
				PostgresqlExcludedObjects: &datastream.StreamBackfillAllPostgresqlExcludedObjectsArgs{
					PostgresqlSchemas: datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaArray{
						&datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaArgs{
							Schema: pulumi.String("schema"),
							PostgresqlTables: datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTableArray{
								&datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTableArgs{
									Table: pulumi.String("table"),
									PostgresqlColumns: datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArray{
										&datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs{
											Column: pulumi.String("column"),
										},
									},
								},
							},
						},
					},
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var source = new Gcp.Datastream.ConnectionProfile("source", new()
    {
        DisplayName = "Postgresql Source",
        Location = "us-central1",
        ConnectionProfileId = "source-profile",
        PostgresqlProfile = new Gcp.Datastream.Inputs.ConnectionProfilePostgresqlProfileArgs
        {
            Hostname = "hostname",
            Port = 5432,
            Username = "user",
            Password = "pass",
            Database = "postgres",
        },
    });

    var destination = new Gcp.Datastream.ConnectionProfile("destination", new()
    {
        DisplayName = "BigQuery Destination",
        Location = "us-central1",
        ConnectionProfileId = "destination-profile",
        BigqueryProfile = null,
    });

    var @default = new Gcp.Datastream.Stream("default", new()
    {
        DisplayName = "Postgres to BigQuery",
        Location = "us-central1",
        StreamId = "my-stream",
        DesiredState = "RUNNING",
        SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
        {
            SourceConnectionProfile = source.Id,
            PostgresqlSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigArgs
            {
                MaxConcurrentBackfillTasks = 12,
                Publication = "publication",
                ReplicationSlot = "replication_slot",
                IncludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsArgs
                {
                    PostgresqlSchemas = new[]
                    {
                        new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaArgs
                        {
                            Schema = "schema",
                            PostgresqlTables = new[]
                            {
                                new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTableArgs
                                {
                                    Table = "table",
                                    PostgresqlColumns = new[]
                                    {
                                        new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs
                                        {
                                            Column = "column",
                                        },
                                    },
                                },
                            },
                        },
                    },
                },
                ExcludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsArgs
                {
                    PostgresqlSchemas = new[]
                    {
                        new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaArgs
                        {
                            Schema = "schema",
                            PostgresqlTables = new[]
                            {
                                new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTableArgs
                                {
                                    Table = "table",
                                    PostgresqlColumns = new[]
                                    {
                                        new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs
                                        {
                                            Column = "column",
                                        },
                                    },
                                },
                            },
                        },
                    },
                },
            },
        },
        DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
        {
            DestinationConnectionProfile = destination.Id,
            BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
            {
                DataFreshness = "900s",
                SourceHierarchyDatasets = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs
                {
                    DatasetTemplate = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs
                    {
                        Location = "us-central1",
                    },
                },
            },
        },
        BackfillAll = new Gcp.Datastream.Inputs.StreamBackfillAllArgs
        {
            PostgresqlExcludedObjects = new Gcp.Datastream.Inputs.StreamBackfillAllPostgresqlExcludedObjectsArgs
            {
                PostgresqlSchemas = new[]
                {
                    new Gcp.Datastream.Inputs.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaArgs
                    {
                        Schema = "schema",
                        PostgresqlTables = new[]
                        {
                            new Gcp.Datastream.Inputs.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTableArgs
                            {
                                Table = "table",
                                PostgresqlColumns = new[]
                                {
                                    new Gcp.Datastream.Inputs.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs
                                    {
                                        Column = "column",
                                    },
                                },
                            },
                        },
                    },
                },
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfilePostgresqlProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileBigqueryProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllPostgresqlExcludedObjectsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var source = new ConnectionProfile("source", ConnectionProfileArgs.builder()
            .displayName("Postgresql Source")
            .location("us-central1")
            .connectionProfileId("source-profile")
            .postgresqlProfile(ConnectionProfilePostgresqlProfileArgs.builder()
                .hostname("hostname")
                .port(5432)
                .username("user")
                .password("pass")
                .database("postgres")
                .build())
            .build());

        var destination = new ConnectionProfile("destination", ConnectionProfileArgs.builder()
            .displayName("BigQuery Destination")
            .location("us-central1")
            .connectionProfileId("destination-profile")
            .bigqueryProfile(ConnectionProfileBigqueryProfileArgs.builder()
                .build())
            .build());

        var default_ = new Stream("default", StreamArgs.builder()
            .displayName("Postgres to BigQuery")
            .location("us-central1")
            .streamId("my-stream")
            .desiredState("RUNNING")
            .sourceConfig(StreamSourceConfigArgs.builder()
                .sourceConnectionProfile(source.id())
                .postgresqlSourceConfig(StreamSourceConfigPostgresqlSourceConfigArgs.builder()
                    .maxConcurrentBackfillTasks(12)
                    .publication("publication")
                    .replicationSlot("replication_slot")
                    .includeObjects(StreamSourceConfigPostgresqlSourceConfigIncludeObjectsArgs.builder()
                        .postgresqlSchemas(StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaArgs.builder()
                            .schema("schema")
                            .postgresqlTables(StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTableArgs.builder()
                                .table("table")
                                .postgresqlColumns(StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs.builder()
                                    .column("column")
                                    .build())
                                .build())
                            .build())
                        .build())
                    .excludeObjects(StreamSourceConfigPostgresqlSourceConfigExcludeObjectsArgs.builder()
                        .postgresqlSchemas(StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaArgs.builder()
                            .schema("schema")
                            .postgresqlTables(StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTableArgs.builder()
                                .table("table")
                                .postgresqlColumns(StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs.builder()
                                    .column("column")
                                    .build())
                                .build())
                            .build())
                        .build())
                    .build())
                .build())
            .destinationConfig(StreamDestinationConfigArgs.builder()
                .destinationConnectionProfile(destination.id())
                .bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
                    .dataFreshness("900s")
                    .sourceHierarchyDatasets(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs.builder()
                        .datasetTemplate(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs.builder()
                            .location("us-central1")
                            .build())
                        .build())
                    .build())
                .build())
            .backfillAll(StreamBackfillAllArgs.builder()
                .postgresqlExcludedObjects(StreamBackfillAllPostgresqlExcludedObjectsArgs.builder()
                    .postgresqlSchemas(StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaArgs.builder()
                        .schema("schema")
                        .postgresqlTables(StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTableArgs.builder()
                            .table("table")
                            .postgresqlColumns(StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs.builder()
                                .column("column")
                                .build())
                            .build())
                        .build())
                    .build())
                .build())
            .build());

    }
}
resources:
  source:
    type: gcp:datastream:ConnectionProfile
    properties:
      displayName: Postgresql Source
      location: us-central1
      connectionProfileId: source-profile
      postgresqlProfile:
        hostname: hostname
        port: 5432
        username: user
        password: pass
        database: postgres
  destination:
    type: gcp:datastream:ConnectionProfile
    properties:
      displayName: BigQuery Destination
      location: us-central1
      connectionProfileId: destination-profile
      bigqueryProfile: {}
  default:
    type: gcp:datastream:Stream
    properties:
      displayName: Postgres to BigQuery
      location: us-central1
      streamId: my-stream
      desiredState: RUNNING
      sourceConfig:
        sourceConnectionProfile: ${source.id}
        postgresqlSourceConfig:
          maxConcurrentBackfillTasks: 12
          publication: publication
          replicationSlot: replication_slot
          includeObjects:
            postgresqlSchemas:
              - schema: schema
                postgresqlTables:
                  - table: table
                    postgresqlColumns:
                      - column: column
          excludeObjects:
            postgresqlSchemas:
              - schema: schema
                postgresqlTables:
                  - table: table
                    postgresqlColumns:
                      - column: column
      destinationConfig:
        destinationConnectionProfile: ${destination.id}
        bigqueryDestinationConfig:
          dataFreshness: 900s
          sourceHierarchyDatasets:
            datasetTemplate:
              location: us-central1
      backfillAll:
        postgresqlExcludedObjects:
          postgresqlSchemas:
            - schema: schema
              postgresqlTables:
                - table: table
                  postgresqlColumns:
                    - column: column

The postgresqlSourceConfig specifies the publication and replicationSlot names that must exist in your PostgreSQL database. The bigqueryDestinationConfig uses sourceHierarchyDatasets to create separate BigQuery datasets matching your source schema structure. The dataFreshness property controls how often BigQuery tables are updated with new changes.

Stream SQL Server using transaction logs

SQL Server replication can use transaction logs to capture changes, providing low-latency CDC without additional database configuration.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const instance = new gcp.sql.DatabaseInstance("instance", {
    name: "sql-server",
    databaseVersion: "SQLSERVER_2019_STANDARD",
    region: "us-central1",
    rootPassword: "root-password",
    deletionProtection: true,
    settings: {
        tier: "db-custom-2-4096",
        ipConfiguration: {
            authorizedNetworks: [
                {
                    value: "34.71.242.81",
                },
                {
                    value: "34.72.28.29",
                },
                {
                    value: "34.67.6.157",
                },
                {
                    value: "34.67.234.134",
                },
                {
                    value: "34.72.239.218",
                },
            ],
        },
    },
});
const user = new gcp.sql.User("user", {
    name: "user",
    instance: instance.name,
    password: "password",
});
const db = new gcp.sql.Database("db", {
    name: "db",
    instance: instance.name,
}, {
    dependsOn: [user],
});
const source = new gcp.datastream.ConnectionProfile("source", {
    displayName: "SQL Server Source",
    location: "us-central1",
    connectionProfileId: "source-profile",
    sqlServerProfile: {
        hostname: instance.publicIpAddress,
        port: 1433,
        username: user.name,
        password: user.password,
        database: db.name,
    },
});
const destination = new gcp.datastream.ConnectionProfile("destination", {
    displayName: "BigQuery Destination",
    location: "us-central1",
    connectionProfileId: "destination-profile",
    bigqueryProfile: {},
});
const _default = new gcp.datastream.Stream("default", {
    displayName: "SQL Server to BigQuery",
    location: "us-central1",
    streamId: "stream",
    sourceConfig: {
        sourceConnectionProfile: source.id,
        sqlServerSourceConfig: {
            includeObjects: {
                schemas: [{
                    schema: "schema",
                    tables: [{
                        table: "table",
                    }],
                }],
            },
            transactionLogs: {},
        },
    },
    destinationConfig: {
        destinationConnectionProfile: destination.id,
        bigqueryDestinationConfig: {
            dataFreshness: "900s",
            sourceHierarchyDatasets: {
                datasetTemplate: {
                    location: "us-central1",
                },
            },
        },
    },
    backfillNone: {},
});
import pulumi
import pulumi_gcp as gcp

instance = gcp.sql.DatabaseInstance("instance",
    name="sql-server",
    database_version="SQLSERVER_2019_STANDARD",
    region="us-central1",
    root_password="root-password",
    deletion_protection=True,
    settings={
        "tier": "db-custom-2-4096",
        "ip_configuration": {
            "authorized_networks": [
                {
                    "value": "34.71.242.81",
                },
                {
                    "value": "34.72.28.29",
                },
                {
                    "value": "34.67.6.157",
                },
                {
                    "value": "34.67.234.134",
                },
                {
                    "value": "34.72.239.218",
                },
            ],
        },
    })
user = gcp.sql.User("user",
    name="user",
    instance=instance.name,
    password="password")
db = gcp.sql.Database("db",
    name="db",
    instance=instance.name,
    opts = pulumi.ResourceOptions(depends_on=[user]))
source = gcp.datastream.ConnectionProfile("source",
    display_name="SQL Server Source",
    location="us-central1",
    connection_profile_id="source-profile",
    sql_server_profile={
        "hostname": instance.public_ip_address,
        "port": 1433,
        "username": user.name,
        "password": user.password,
        "database": db.name,
    })
destination = gcp.datastream.ConnectionProfile("destination",
    display_name="BigQuery Destination",
    location="us-central1",
    connection_profile_id="destination-profile",
    bigquery_profile={})
default = gcp.datastream.Stream("default",
    display_name="SQL Server to BigQuery",
    location="us-central1",
    stream_id="stream",
    source_config={
        "source_connection_profile": source.id,
        "sql_server_source_config": {
            "include_objects": {
                "schemas": [{
                    "schema": "schema",
                    "tables": [{
                        "table": "table",
                    }],
                }],
            },
            "transaction_logs": {},
        },
    },
    destination_config={
        "destination_connection_profile": destination.id,
        "bigquery_destination_config": {
            "data_freshness": "900s",
            "source_hierarchy_datasets": {
                "dataset_template": {
                    "location": "us-central1",
                },
            },
        },
    },
    backfill_none={})
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
			Name:               pulumi.String("sql-server"),
			DatabaseVersion:    pulumi.String("SQLSERVER_2019_STANDARD"),
			Region:             pulumi.String("us-central1"),
			RootPassword:       pulumi.String("root-password"),
			DeletionProtection: pulumi.Bool(true),
			Settings: &sql.DatabaseInstanceSettingsArgs{
				Tier: pulumi.String("db-custom-2-4096"),
				IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
					AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.71.242.81"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.28.29"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.6.157"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.234.134"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.239.218"),
						},
					},
				},
			},
		})
		if err != nil {
			return err
		}
		user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
			Name:     pulumi.String("user"),
			Instance: instance.Name,
			Password: pulumi.String("password"),
		})
		if err != nil {
			return err
		}
		db, err := sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
			Name:     pulumi.String("db"),
			Instance: instance.Name,
		}, pulumi.DependsOn([]pulumi.Resource{
			user,
		}))
		if err != nil {
			return err
		}
		source, err := datastream.NewConnectionProfile(ctx, "source", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("SQL Server Source"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("source-profile"),
			SqlServerProfile: &datastream.ConnectionProfileSqlServerProfileArgs{
				Hostname: instance.PublicIpAddress,
				Port:     pulumi.Int(1433),
				Username: user.Name,
				Password: user.Password,
				Database: db.Name,
			},
		})
		if err != nil {
			return err
		}
		destination, err := datastream.NewConnectionProfile(ctx, "destination", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("BigQuery Destination"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("destination-profile"),
			BigqueryProfile:     &datastream.ConnectionProfileBigqueryProfileArgs{},
		})
		if err != nil {
			return err
		}
		_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
			DisplayName: pulumi.String("SQL Server to BigQuery"),
			Location:    pulumi.String("us-central1"),
			StreamId:    pulumi.String("stream"),
			SourceConfig: &datastream.StreamSourceConfigArgs{
				SourceConnectionProfile: source.ID(),
				SqlServerSourceConfig: &datastream.StreamSourceConfigSqlServerSourceConfigArgs{
					IncludeObjects: &datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs{
						Schemas: datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArray{
							&datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs{
								Schema: pulumi.String("schema"),
								Tables: datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArray{
									&datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs{
										Table: pulumi.String("table"),
									},
								},
							},
						},
					},
					TransactionLogs: &datastream.StreamSourceConfigSqlServerSourceConfigTransactionLogsArgs{},
				},
			},
			DestinationConfig: &datastream.StreamDestinationConfigArgs{
				DestinationConnectionProfile: destination.ID(),
				BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
					DataFreshness: pulumi.String("900s"),
					SourceHierarchyDatasets: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs{
						DatasetTemplate: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs{
							Location: pulumi.String("us-central1"),
						},
					},
				},
			},
			BackfillNone: &datastream.StreamBackfillNoneArgs{},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var instance = new Gcp.Sql.DatabaseInstance("instance", new()
    {
        Name = "sql-server",
        DatabaseVersion = "SQLSERVER_2019_STANDARD",
        Region = "us-central1",
        RootPassword = "root-password",
        DeletionProtection = true,
        Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
        {
            Tier = "db-custom-2-4096",
            IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
            {
                AuthorizedNetworks = new[]
                {
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.71.242.81",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.28.29",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.6.157",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.234.134",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.239.218",
                    },
                },
            },
        },
    });

    var user = new Gcp.Sql.User("user", new()
    {
        Name = "user",
        Instance = instance.Name,
        Password = "password",
    });

    var db = new Gcp.Sql.Database("db", new()
    {
        Name = "db",
        Instance = instance.Name,
    }, new CustomResourceOptions
    {
        DependsOn =
        {
            user,
        },
    });

    var source = new Gcp.Datastream.ConnectionProfile("source", new()
    {
        DisplayName = "SQL Server Source",
        Location = "us-central1",
        ConnectionProfileId = "source-profile",
        SqlServerProfile = new Gcp.Datastream.Inputs.ConnectionProfileSqlServerProfileArgs
        {
            Hostname = instance.PublicIpAddress,
            Port = 1433,
            Username = user.Name,
            Password = user.Password,
            Database = db.Name,
        },
    });

    var destination = new Gcp.Datastream.ConnectionProfile("destination", new()
    {
        DisplayName = "BigQuery Destination",
        Location = "us-central1",
        ConnectionProfileId = "destination-profile",
        BigqueryProfile = null,
    });

    var @default = new Gcp.Datastream.Stream("default", new()
    {
        DisplayName = "SQL Server to BigQuery",
        Location = "us-central1",
        StreamId = "stream",
        SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
        {
            SourceConnectionProfile = source.Id,
            SqlServerSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigArgs
            {
                IncludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs
                {
                    Schemas = new[]
                    {
                        new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs
                        {
                            Schema = "schema",
                            Tables = new[]
                            {
                                new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs
                                {
                                    Table = "table",
                                },
                            },
                        },
                    },
                },
                TransactionLogs = null,
            },
        },
        DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
        {
            DestinationConnectionProfile = destination.Id,
            BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
            {
                DataFreshness = "900s",
                SourceHierarchyDatasets = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs
                {
                    DatasetTemplate = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs
                    {
                        Location = "us-central1",
                    },
                },
            },
        },
        BackfillNone = null,
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileSqlServerProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileBigqueryProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigTransactionLogsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillNoneArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
            .name("sql-server")
            .databaseVersion("SQLSERVER_2019_STANDARD")
            .region("us-central1")
            .rootPassword("root-password")
            .deletionProtection(true)
            .settings(DatabaseInstanceSettingsArgs.builder()
                .tier("db-custom-2-4096")
                .ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
                    .authorizedNetworks(                    
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.71.242.81")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.28.29")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.6.157")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.234.134")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.239.218")
                            .build())
                    .build())
                .build())
            .build());

        var user = new User("user", UserArgs.builder()
            .name("user")
            .instance(instance.name())
            .password("password")
            .build());

        var db = new Database("db", DatabaseArgs.builder()
            .name("db")
            .instance(instance.name())
            .build(), CustomResourceOptions.builder()
                .dependsOn(user)
                .build());

        var source = new ConnectionProfile("source", ConnectionProfileArgs.builder()
            .displayName("SQL Server Source")
            .location("us-central1")
            .connectionProfileId("source-profile")
            .sqlServerProfile(ConnectionProfileSqlServerProfileArgs.builder()
                .hostname(instance.publicIpAddress())
                .port(1433)
                .username(user.name())
                .password(user.password())
                .database(db.name())
                .build())
            .build());

        var destination = new ConnectionProfile("destination", ConnectionProfileArgs.builder()
            .displayName("BigQuery Destination")
            .location("us-central1")
            .connectionProfileId("destination-profile")
            .bigqueryProfile(ConnectionProfileBigqueryProfileArgs.builder()
                .build())
            .build());

        var default_ = new Stream("default", StreamArgs.builder()
            .displayName("SQL Server to BigQuery")
            .location("us-central1")
            .streamId("stream")
            .sourceConfig(StreamSourceConfigArgs.builder()
                .sourceConnectionProfile(source.id())
                .sqlServerSourceConfig(StreamSourceConfigSqlServerSourceConfigArgs.builder()
                    .includeObjects(StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs.builder()
                        .schemas(StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs.builder()
                            .schema("schema")
                            .tables(StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs.builder()
                                .table("table")
                                .build())
                            .build())
                        .build())
                    .transactionLogs(StreamSourceConfigSqlServerSourceConfigTransactionLogsArgs.builder()
                        .build())
                    .build())
                .build())
            .destinationConfig(StreamDestinationConfigArgs.builder()
                .destinationConnectionProfile(destination.id())
                .bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
                    .dataFreshness("900s")
                    .sourceHierarchyDatasets(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs.builder()
                        .datasetTemplate(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs.builder()
                            .location("us-central1")
                            .build())
                        .build())
                    .build())
                .build())
            .backfillNone(StreamBackfillNoneArgs.builder()
                .build())
            .build());

    }
}
resources:
  instance:
    type: gcp:sql:DatabaseInstance
    properties:
      name: sql-server
      databaseVersion: SQLSERVER_2019_STANDARD
      region: us-central1
      rootPassword: root-password
      deletionProtection: true
      settings:
        tier: db-custom-2-4096
        ipConfiguration:
          authorizedNetworks:
            - value: 34.71.242.81
            - value: 34.72.28.29
            - value: 34.67.6.157
            - value: 34.67.234.134
            - value: 34.72.239.218
  db:
    type: gcp:sql:Database
    properties:
      name: db
      instance: ${instance.name}
    options:
      dependsOn:
        - ${user}
  user:
    type: gcp:sql:User
    properties:
      name: user
      instance: ${instance.name}
      password: password
  source:
    type: gcp:datastream:ConnectionProfile
    properties:
      displayName: SQL Server Source
      location: us-central1
      connectionProfileId: source-profile
      sqlServerProfile:
        hostname: ${instance.publicIpAddress}
        port: 1433
        username: ${user.name}
        password: ${user.password}
        database: ${db.name}
  destination:
    type: gcp:datastream:ConnectionProfile
    properties:
      displayName: BigQuery Destination
      location: us-central1
      connectionProfileId: destination-profile
      bigqueryProfile: {}
  default:
    type: gcp:datastream:Stream
    properties:
      displayName: SQL Server to BigQuery
      location: us-central1
      streamId: stream
      sourceConfig:
        sourceConnectionProfile: ${source.id}
        sqlServerSourceConfig:
          includeObjects:
            schemas:
              - schema: schema
                tables:
                  - table: table
          transactionLogs: {}
      destinationConfig:
        destinationConnectionProfile: ${destination.id}
        bigqueryDestinationConfig:
          dataFreshness: 900s
          sourceHierarchyDatasets:
            datasetTemplate:
              location: us-central1
      backfillNone: {}

The sqlServerSourceConfig with transactionLogs captures changes directly from SQL Server transaction logs. The includeObjects property filters by schema and table. The backfillNone property skips historical data load, starting replication from the current point in time.

Stream SQL Server using change tables

SQL Server change data capture can use change tables instead of transaction logs, which may be preferred for certain database configurations.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const instance = new gcp.sql.DatabaseInstance("instance", {
    name: "sql-server",
    databaseVersion: "SQLSERVER_2019_STANDARD",
    region: "us-central1",
    rootPassword: "root-password",
    deletionProtection: true,
    settings: {
        tier: "db-custom-2-4096",
        ipConfiguration: {
            authorizedNetworks: [
                {
                    value: "34.71.242.81",
                },
                {
                    value: "34.72.28.29",
                },
                {
                    value: "34.67.6.157",
                },
                {
                    value: "34.67.234.134",
                },
                {
                    value: "34.72.239.218",
                },
            ],
        },
    },
});
const user = new gcp.sql.User("user", {
    name: "user",
    instance: instance.name,
    password: "password",
});
const db = new gcp.sql.Database("db", {
    name: "db",
    instance: instance.name,
}, {
    dependsOn: [user],
});
const source = new gcp.datastream.ConnectionProfile("source", {
    displayName: "SQL Server Source",
    location: "us-central1",
    connectionProfileId: "source-profile",
    sqlServerProfile: {
        hostname: instance.publicIpAddress,
        port: 1433,
        username: user.name,
        password: user.password,
        database: db.name,
    },
});
const destination = new gcp.datastream.ConnectionProfile("destination", {
    displayName: "BigQuery Destination",
    location: "us-central1",
    connectionProfileId: "destination-profile",
    bigqueryProfile: {},
});
const _default = new gcp.datastream.Stream("default", {
    displayName: "SQL Server to BigQuery",
    location: "us-central1",
    streamId: "stream",
    sourceConfig: {
        sourceConnectionProfile: source.id,
        sqlServerSourceConfig: {
            includeObjects: {
                schemas: [{
                    schema: "schema",
                    tables: [{
                        table: "table",
                    }],
                }],
            },
            changeTables: {},
        },
    },
    destinationConfig: {
        destinationConnectionProfile: destination.id,
        bigqueryDestinationConfig: {
            dataFreshness: "900s",
            sourceHierarchyDatasets: {
                datasetTemplate: {
                    location: "us-central1",
                },
            },
        },
    },
    backfillNone: {},
});
import pulumi
import pulumi_gcp as gcp

instance = gcp.sql.DatabaseInstance("instance",
    name="sql-server",
    database_version="SQLSERVER_2019_STANDARD",
    region="us-central1",
    root_password="root-password",
    deletion_protection=True,
    settings={
        "tier": "db-custom-2-4096",
        "ip_configuration": {
            "authorized_networks": [
                {
                    "value": "34.71.242.81",
                },
                {
                    "value": "34.72.28.29",
                },
                {
                    "value": "34.67.6.157",
                },
                {
                    "value": "34.67.234.134",
                },
                {
                    "value": "34.72.239.218",
                },
            ],
        },
    })
user = gcp.sql.User("user",
    name="user",
    instance=instance.name,
    password="password")
db = gcp.sql.Database("db",
    name="db",
    instance=instance.name,
    opts = pulumi.ResourceOptions(depends_on=[user]))
source = gcp.datastream.ConnectionProfile("source",
    display_name="SQL Server Source",
    location="us-central1",
    connection_profile_id="source-profile",
    sql_server_profile={
        "hostname": instance.public_ip_address,
        "port": 1433,
        "username": user.name,
        "password": user.password,
        "database": db.name,
    })
destination = gcp.datastream.ConnectionProfile("destination",
    display_name="BigQuery Destination",
    location="us-central1",
    connection_profile_id="destination-profile",
    bigquery_profile={})
default = gcp.datastream.Stream("default",
    display_name="SQL Server to BigQuery",
    location="us-central1",
    stream_id="stream",
    source_config={
        "source_connection_profile": source.id,
        "sql_server_source_config": {
            "include_objects": {
                "schemas": [{
                    "schema": "schema",
                    "tables": [{
                        "table": "table",
                    }],
                }],
            },
            "change_tables": {},
        },
    },
    destination_config={
        "destination_connection_profile": destination.id,
        "bigquery_destination_config": {
            "data_freshness": "900s",
            "source_hierarchy_datasets": {
                "dataset_template": {
                    "location": "us-central1",
                },
            },
        },
    },
    backfill_none={})
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
			Name:               pulumi.String("sql-server"),
			DatabaseVersion:    pulumi.String("SQLSERVER_2019_STANDARD"),
			Region:             pulumi.String("us-central1"),
			RootPassword:       pulumi.String("root-password"),
			DeletionProtection: pulumi.Bool(true),
			Settings: &sql.DatabaseInstanceSettingsArgs{
				Tier: pulumi.String("db-custom-2-4096"),
				IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
					AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.71.242.81"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.28.29"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.6.157"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.234.134"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.239.218"),
						},
					},
				},
			},
		})
		if err != nil {
			return err
		}
		user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
			Name:     pulumi.String("user"),
			Instance: instance.Name,
			Password: pulumi.String("password"),
		})
		if err != nil {
			return err
		}
		db, err := sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
			Name:     pulumi.String("db"),
			Instance: instance.Name,
		}, pulumi.DependsOn([]pulumi.Resource{
			user,
		}))
		if err != nil {
			return err
		}
		source, err := datastream.NewConnectionProfile(ctx, "source", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("SQL Server Source"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("source-profile"),
			SqlServerProfile: &datastream.ConnectionProfileSqlServerProfileArgs{
				Hostname: instance.PublicIpAddress,
				Port:     pulumi.Int(1433),
				Username: user.Name,
				Password: user.Password,
				Database: db.Name,
			},
		})
		if err != nil {
			return err
		}
		destination, err := datastream.NewConnectionProfile(ctx, "destination", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("BigQuery Destination"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("destination-profile"),
			BigqueryProfile:     &datastream.ConnectionProfileBigqueryProfileArgs{},
		})
		if err != nil {
			return err
		}
		_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
			DisplayName: pulumi.String("SQL Server to BigQuery"),
			Location:    pulumi.String("us-central1"),
			StreamId:    pulumi.String("stream"),
			SourceConfig: &datastream.StreamSourceConfigArgs{
				SourceConnectionProfile: source.ID(),
				SqlServerSourceConfig: &datastream.StreamSourceConfigSqlServerSourceConfigArgs{
					IncludeObjects: &datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs{
						Schemas: datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArray{
							&datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs{
								Schema: pulumi.String("schema"),
								Tables: datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArray{
									&datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs{
										Table: pulumi.String("table"),
									},
								},
							},
						},
					},
					ChangeTables: &datastream.StreamSourceConfigSqlServerSourceConfigChangeTablesArgs{},
				},
			},
			DestinationConfig: &datastream.StreamDestinationConfigArgs{
				DestinationConnectionProfile: destination.ID(),
				BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
					DataFreshness: pulumi.String("900s"),
					SourceHierarchyDatasets: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs{
						DatasetTemplate: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs{
							Location: pulumi.String("us-central1"),
						},
					},
				},
			},
			BackfillNone: &datastream.StreamBackfillNoneArgs{},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var instance = new Gcp.Sql.DatabaseInstance("instance", new()
    {
        Name = "sql-server",
        DatabaseVersion = "SQLSERVER_2019_STANDARD",
        Region = "us-central1",
        RootPassword = "root-password",
        DeletionProtection = true,
        Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
        {
            Tier = "db-custom-2-4096",
            IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
            {
                AuthorizedNetworks = new[]
                {
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.71.242.81",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.28.29",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.6.157",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.234.134",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.239.218",
                    },
                },
            },
        },
    });

    var user = new Gcp.Sql.User("user", new()
    {
        Name = "user",
        Instance = instance.Name,
        Password = "password",
    });

    var db = new Gcp.Sql.Database("db", new()
    {
        Name = "db",
        Instance = instance.Name,
    }, new CustomResourceOptions
    {
        DependsOn =
        {
            user,
        },
    });

    var source = new Gcp.Datastream.ConnectionProfile("source", new()
    {
        DisplayName = "SQL Server Source",
        Location = "us-central1",
        ConnectionProfileId = "source-profile",
        SqlServerProfile = new Gcp.Datastream.Inputs.ConnectionProfileSqlServerProfileArgs
        {
            Hostname = instance.PublicIpAddress,
            Port = 1433,
            Username = user.Name,
            Password = user.Password,
            Database = db.Name,
        },
    });

    var destination = new Gcp.Datastream.ConnectionProfile("destination", new()
    {
        DisplayName = "BigQuery Destination",
        Location = "us-central1",
        ConnectionProfileId = "destination-profile",
        BigqueryProfile = null,
    });

    var @default = new Gcp.Datastream.Stream("default", new()
    {
        DisplayName = "SQL Server to BigQuery",
        Location = "us-central1",
        StreamId = "stream",
        SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
        {
            SourceConnectionProfile = source.Id,
            SqlServerSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigArgs
            {
                IncludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs
                {
                    Schemas = new[]
                    {
                        new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs
                        {
                            Schema = "schema",
                            Tables = new[]
                            {
                                new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs
                                {
                                    Table = "table",
                                },
                            },
                        },
                    },
                },
                ChangeTables = null,
            },
        },
        DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
        {
            DestinationConnectionProfile = destination.Id,
            BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
            {
                DataFreshness = "900s",
                SourceHierarchyDatasets = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs
                {
                    DatasetTemplate = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs
                    {
                        Location = "us-central1",
                    },
                },
            },
        },
        BackfillNone = null,
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileSqlServerProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileBigqueryProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigChangeTablesArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillNoneArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
            .name("sql-server")
            .databaseVersion("SQLSERVER_2019_STANDARD")
            .region("us-central1")
            .rootPassword("root-password")
            .deletionProtection(true)
            .settings(DatabaseInstanceSettingsArgs.builder()
                .tier("db-custom-2-4096")
                .ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
                    .authorizedNetworks(                    
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.71.242.81")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.28.29")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.6.157")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.234.134")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.239.218")
                            .build())
                    .build())
                .build())
            .build());

        var user = new User("user", UserArgs.builder()
            .name("user")
            .instance(instance.name())
            .password("password")
            .build());

        var db = new Database("db", DatabaseArgs.builder()
            .name("db")
            .instance(instance.name())
            .build(), CustomResourceOptions.builder()
                .dependsOn(user)
                .build());

        var source = new ConnectionProfile("source", ConnectionProfileArgs.builder()
            .displayName("SQL Server Source")
            .location("us-central1")
            .connectionProfileId("source-profile")
            .sqlServerProfile(ConnectionProfileSqlServerProfileArgs.builder()
                .hostname(instance.publicIpAddress())
                .port(1433)
                .username(user.name())
                .password(user.password())
                .database(db.name())
                .build())
            .build());

        var destination = new ConnectionProfile("destination", ConnectionProfileArgs.builder()
            .displayName("BigQuery Destination")
            .location("us-central1")
            .connectionProfileId("destination-profile")
            .bigqueryProfile(ConnectionProfileBigqueryProfileArgs.builder()
                .build())
            .build());

        var default_ = new Stream("default", StreamArgs.builder()
            .displayName("SQL Server to BigQuery")
            .location("us-central1")
            .streamId("stream")
            .sourceConfig(StreamSourceConfigArgs.builder()
                .sourceConnectionProfile(source.id())
                .sqlServerSourceConfig(StreamSourceConfigSqlServerSourceConfigArgs.builder()
                    .includeObjects(StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs.builder()
                        .schemas(StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs.builder()
                            .schema("schema")
                            .tables(StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs.builder()
                                .table("table")
                                .build())
                            .build())
                        .build())
                    .changeTables(StreamSourceConfigSqlServerSourceConfigChangeTablesArgs.builder()
                        .build())
                    .build())
                .build())
            .destinationConfig(StreamDestinationConfigArgs.builder()
                .destinationConnectionProfile(destination.id())
                .bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
                    .dataFreshness("900s")
                    .sourceHierarchyDatasets(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs.builder()
                        .datasetTemplate(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs.builder()
                            .location("us-central1")
                            .build())
                        .build())
                    .build())
                .build())
            .backfillNone(StreamBackfillNoneArgs.builder()
                .build())
            .build());

    }
}
resources:
  instance:
    type: gcp:sql:DatabaseInstance
    properties:
      name: sql-server
      databaseVersion: SQLSERVER_2019_STANDARD
      region: us-central1
      rootPassword: root-password
      deletionProtection: true
      settings:
        tier: db-custom-2-4096
        ipConfiguration:
          authorizedNetworks:
            - value: 34.71.242.81
            - value: 34.72.28.29
            - value: 34.67.6.157
            - value: 34.67.234.134
            - value: 34.72.239.218
  db:
    type: gcp:sql:Database
    properties:
      name: db
      instance: ${instance.name}
    options:
      dependsOn:
        - ${user}
  user:
    type: gcp:sql:User
    properties:
      name: user
      instance: ${instance.name}
      password: password
  source:
    type: gcp:datastream:ConnectionProfile
    properties:
      displayName: SQL Server Source
      location: us-central1
      connectionProfileId: source-profile
      sqlServerProfile:
        hostname: ${instance.publicIpAddress}
        port: 1433
        username: ${user.name}
        password: ${user.password}
        database: ${db.name}
  destination:
    type: gcp:datastream:ConnectionProfile
    properties:
      displayName: BigQuery Destination
      location: us-central1
      connectionProfileId: destination-profile
      bigqueryProfile: {}
  default:
    type: gcp:datastream:Stream
    properties:
      displayName: SQL Server to BigQuery
      location: us-central1
      streamId: stream
      sourceConfig:
        sourceConnectionProfile: ${source.id}
        sqlServerSourceConfig:
          includeObjects:
            schemas:
              - schema: schema
                tables:
                  - table: table
          changeTables: {}
      destinationConfig:
        destinationConnectionProfile: ${destination.id}
        bigqueryDestinationConfig:
          dataFreshness: 900s
          sourceHierarchyDatasets:
            datasetTemplate:
              location: us-central1
      backfillNone: {}

The changeTables property configures CDC to read from SQL Server change tables rather than transaction logs. This approach requires CDC to be enabled on your SQL Server instance with change tables configured for the tables you want to replicate.

Stream to a single BigQuery dataset

Instead of creating separate datasets per source schema, you can consolidate all replicated tables into a single BigQuery dataset.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
import * as random from "@pulumi/random";

const postgres = new gcp.bigquery.Dataset("postgres", {
    datasetId: "postgres",
    friendlyName: "postgres",
    description: "Database of postgres",
    location: "us-central1",
});
const destinationConnectionProfile2 = new gcp.datastream.ConnectionProfile("destination_connection_profile2", {
    displayName: "Connection profile",
    location: "us-central1",
    connectionProfileId: "dest-profile",
    bigqueryProfile: {},
});
const instance = new gcp.sql.DatabaseInstance("instance", {
    name: "instance-name",
    databaseVersion: "MYSQL_8_0",
    region: "us-central1",
    settings: {
        tier: "db-f1-micro",
        backupConfiguration: {
            enabled: true,
            binaryLogEnabled: true,
        },
        ipConfiguration: {
            authorizedNetworks: [
                {
                    value: "34.71.242.81",
                },
                {
                    value: "34.72.28.29",
                },
                {
                    value: "34.67.6.157",
                },
                {
                    value: "34.67.234.134",
                },
                {
                    value: "34.72.239.218",
                },
            ],
        },
    },
    deletionProtection: false,
});
const pwd = new random.index.Password("pwd", {
    length: 16,
    special: false,
});
const user = new gcp.sql.User("user", {
    name: "my-user",
    instance: instance.name,
    host: "%",
    password: pwd.result,
});
const sourceConnectionProfile = new gcp.datastream.ConnectionProfile("source_connection_profile", {
    displayName: "Source connection profile",
    location: "us-central1",
    connectionProfileId: "source-profile",
    mysqlProfile: {
        hostname: instance.publicIpAddress,
        username: user.name,
        password: user.password,
    },
});
const _default = new gcp.datastream.Stream("default", {
    displayName: "postgres to bigQuery",
    location: "us-central1",
    streamId: "postgres-bigquery",
    sourceConfig: {
        sourceConnectionProfile: sourceConnectionProfile.id,
        mysqlSourceConfig: {},
    },
    destinationConfig: {
        destinationConnectionProfile: destinationConnectionProfile2.id,
        bigqueryDestinationConfig: {
            dataFreshness: "900s",
            singleTargetDataset: {
                datasetId: postgres.id,
            },
        },
    },
    backfillAll: {},
});
const db = new gcp.sql.Database("db", {
    instance: instance.name,
    name: "db",
});
import pulumi
import pulumi_gcp as gcp
import pulumi_random as random

postgres = gcp.bigquery.Dataset("postgres",
    dataset_id="postgres",
    friendly_name="postgres",
    description="Database of postgres",
    location="us-central1")
destination_connection_profile2 = gcp.datastream.ConnectionProfile("destination_connection_profile2",
    display_name="Connection profile",
    location="us-central1",
    connection_profile_id="dest-profile",
    bigquery_profile={})
instance = gcp.sql.DatabaseInstance("instance",
    name="instance-name",
    database_version="MYSQL_8_0",
    region="us-central1",
    settings={
        "tier": "db-f1-micro",
        "backup_configuration": {
            "enabled": True,
            "binary_log_enabled": True,
        },
        "ip_configuration": {
            "authorized_networks": [
                {
                    "value": "34.71.242.81",
                },
                {
                    "value": "34.72.28.29",
                },
                {
                    "value": "34.67.6.157",
                },
                {
                    "value": "34.67.234.134",
                },
                {
                    "value": "34.72.239.218",
                },
            ],
        },
    },
    deletion_protection=False)
pwd = random.index.Password("pwd",
    length=16,
    special=False)
user = gcp.sql.User("user",
    name="my-user",
    instance=instance.name,
    host="%",
    password=pwd["result"])
source_connection_profile = gcp.datastream.ConnectionProfile("source_connection_profile",
    display_name="Source connection profile",
    location="us-central1",
    connection_profile_id="source-profile",
    mysql_profile={
        "hostname": instance.public_ip_address,
        "username": user.name,
        "password": user.password,
    })
default = gcp.datastream.Stream("default",
    display_name="postgres to bigQuery",
    location="us-central1",
    stream_id="postgres-bigquery",
    source_config={
        "source_connection_profile": source_connection_profile.id,
        "mysql_source_config": {},
    },
    destination_config={
        "destination_connection_profile": destination_connection_profile2.id,
        "bigquery_destination_config": {
            "data_freshness": "900s",
            "single_target_dataset": {
                "dataset_id": postgres.id,
            },
        },
    },
    backfill_all={})
db = gcp.sql.Database("db",
    instance=instance.name,
    name="db")
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
	"github.com/pulumi/pulumi-random/sdk/v4/go/random"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		postgres, err := bigquery.NewDataset(ctx, "postgres", &bigquery.DatasetArgs{
			DatasetId:    pulumi.String("postgres"),
			FriendlyName: pulumi.String("postgres"),
			Description:  pulumi.String("Database of postgres"),
			Location:     pulumi.String("us-central1"),
		})
		if err != nil {
			return err
		}
		destinationConnectionProfile2, err := datastream.NewConnectionProfile(ctx, "destination_connection_profile2", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("Connection profile"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("dest-profile"),
			BigqueryProfile:     &datastream.ConnectionProfileBigqueryProfileArgs{},
		})
		if err != nil {
			return err
		}
		instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
			Name:            pulumi.String("instance-name"),
			DatabaseVersion: pulumi.String("MYSQL_8_0"),
			Region:          pulumi.String("us-central1"),
			Settings: &sql.DatabaseInstanceSettingsArgs{
				Tier: pulumi.String("db-f1-micro"),
				BackupConfiguration: &sql.DatabaseInstanceSettingsBackupConfigurationArgs{
					Enabled:          pulumi.Bool(true),
					BinaryLogEnabled: pulumi.Bool(true),
				},
				IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
					AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.71.242.81"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.28.29"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.6.157"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.234.134"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.239.218"),
						},
					},
				},
			},
			DeletionProtection: pulumi.Bool(false),
		})
		if err != nil {
			return err
		}
		pwd, err := random.NewPassword(ctx, "pwd", &random.PasswordArgs{
			Length:  16,
			Special: false,
		})
		if err != nil {
			return err
		}
		user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
			Name:     pulumi.String("my-user"),
			Instance: instance.Name,
			Host:     pulumi.String("%"),
			Password: pwd.Result,
		})
		if err != nil {
			return err
		}
		sourceConnectionProfile, err := datastream.NewConnectionProfile(ctx, "source_connection_profile", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("Source connection profile"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("source-profile"),
			MysqlProfile: &datastream.ConnectionProfileMysqlProfileArgs{
				Hostname: instance.PublicIpAddress,
				Username: user.Name,
				Password: user.Password,
			},
		})
		if err != nil {
			return err
		}
		_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
			DisplayName: pulumi.String("postgres to bigQuery"),
			Location:    pulumi.String("us-central1"),
			StreamId:    pulumi.String("postgres-bigquery"),
			SourceConfig: &datastream.StreamSourceConfigArgs{
				SourceConnectionProfile: sourceConnectionProfile.ID(),
				MysqlSourceConfig:       &datastream.StreamSourceConfigMysqlSourceConfigArgs{},
			},
			DestinationConfig: &datastream.StreamDestinationConfigArgs{
				DestinationConnectionProfile: destinationConnectionProfile2.ID(),
				BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
					DataFreshness: pulumi.String("900s"),
					SingleTargetDataset: &datastream.StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs{
						DatasetId: postgres.ID(),
					},
				},
			},
			BackfillAll: &datastream.StreamBackfillAllArgs{},
		})
		if err != nil {
			return err
		}
		_, err = sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
			Instance: instance.Name,
			Name:     pulumi.String("db"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
using Random = Pulumi.Random;

return await Deployment.RunAsync(() => 
{
    var postgres = new Gcp.BigQuery.Dataset("postgres", new()
    {
        DatasetId = "postgres",
        FriendlyName = "postgres",
        Description = "Database of postgres",
        Location = "us-central1",
    });

    var destinationConnectionProfile2 = new Gcp.Datastream.ConnectionProfile("destination_connection_profile2", new()
    {
        DisplayName = "Connection profile",
        Location = "us-central1",
        ConnectionProfileId = "dest-profile",
        BigqueryProfile = null,
    });

    var instance = new Gcp.Sql.DatabaseInstance("instance", new()
    {
        Name = "instance-name",
        DatabaseVersion = "MYSQL_8_0",
        Region = "us-central1",
        Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
        {
            Tier = "db-f1-micro",
            BackupConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsBackupConfigurationArgs
            {
                Enabled = true,
                BinaryLogEnabled = true,
            },
            IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
            {
                AuthorizedNetworks = new[]
                {
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.71.242.81",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.28.29",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.6.157",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.234.134",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.239.218",
                    },
                },
            },
        },
        DeletionProtection = false,
    });

    var pwd = new Random.Index.Password("pwd", new()
    {
        Length = 16,
        Special = false,
    });

    var user = new Gcp.Sql.User("user", new()
    {
        Name = "my-user",
        Instance = instance.Name,
        Host = "%",
        Password = pwd.Result,
    });

    var sourceConnectionProfile = new Gcp.Datastream.ConnectionProfile("source_connection_profile", new()
    {
        DisplayName = "Source connection profile",
        Location = "us-central1",
        ConnectionProfileId = "source-profile",
        MysqlProfile = new Gcp.Datastream.Inputs.ConnectionProfileMysqlProfileArgs
        {
            Hostname = instance.PublicIpAddress,
            Username = user.Name,
            Password = user.Password,
        },
    });

    var @default = new Gcp.Datastream.Stream("default", new()
    {
        DisplayName = "postgres to bigQuery",
        Location = "us-central1",
        StreamId = "postgres-bigquery",
        SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
        {
            SourceConnectionProfile = sourceConnectionProfile.Id,
            MysqlSourceConfig = null,
        },
        DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
        {
            DestinationConnectionProfile = destinationConnectionProfile2.Id,
            BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
            {
                DataFreshness = "900s",
                SingleTargetDataset = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs
                {
                    DatasetId = postgres.Id,
                },
            },
        },
        BackfillAll = null,
    });

    var db = new Gcp.Sql.Database("db", new()
    {
        Instance = instance.Name,
        Name = "db",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileBigqueryProfileArgs;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsBackupConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.random.Password;
import com.pulumi.random.PasswordArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileMysqlProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var postgres = new Dataset("postgres", DatasetArgs.builder()
            .datasetId("postgres")
            .friendlyName("postgres")
            .description("Database of postgres")
            .location("us-central1")
            .build());

        var destinationConnectionProfile2 = new ConnectionProfile("destinationConnectionProfile2", ConnectionProfileArgs.builder()
            .displayName("Connection profile")
            .location("us-central1")
            .connectionProfileId("dest-profile")
            .bigqueryProfile(ConnectionProfileBigqueryProfileArgs.builder()
                .build())
            .build());

        var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
            .name("instance-name")
            .databaseVersion("MYSQL_8_0")
            .region("us-central1")
            .settings(DatabaseInstanceSettingsArgs.builder()
                .tier("db-f1-micro")
                .backupConfiguration(DatabaseInstanceSettingsBackupConfigurationArgs.builder()
                    .enabled(true)
                    .binaryLogEnabled(true)
                    .build())
                .ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
                    .authorizedNetworks(                    
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.71.242.81")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.28.29")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.6.157")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.234.134")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.239.218")
                            .build())
                    .build())
                .build())
            .deletionProtection(false)
            .build());

        var pwd = new Password("pwd", PasswordArgs.builder()
            .length(16)
            .special(false)
            .build());

        var user = new User("user", UserArgs.builder()
            .name("my-user")
            .instance(instance.name())
            .host("%")
            .password(pwd.result())
            .build());

        var sourceConnectionProfile = new ConnectionProfile("sourceConnectionProfile", ConnectionProfileArgs.builder()
            .displayName("Source connection profile")
            .location("us-central1")
            .connectionProfileId("source-profile")
            .mysqlProfile(ConnectionProfileMysqlProfileArgs.builder()
                .hostname(instance.publicIpAddress())
                .username(user.name())
                .password(user.password())
                .build())
            .build());

        var default_ = new Stream("default", StreamArgs.builder()
            .displayName("postgres to bigQuery")
            .location("us-central1")
            .streamId("postgres-bigquery")
            .sourceConfig(StreamSourceConfigArgs.builder()
                .sourceConnectionProfile(sourceConnectionProfile.id())
                .mysqlSourceConfig(StreamSourceConfigMysqlSourceConfigArgs.builder()
                    .build())
                .build())
            .destinationConfig(StreamDestinationConfigArgs.builder()
                .destinationConnectionProfile(destinationConnectionProfile2.id())
                .bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
                    .dataFreshness("900s")
                    .singleTargetDataset(StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs.builder()
                        .datasetId(postgres.id())
                        .build())
                    .build())
                .build())
            .backfillAll(StreamBackfillAllArgs.builder()
                .build())
            .build());

        var db = new Database("db", DatabaseArgs.builder()
            .instance(instance.name())
            .name("db")
            .build());

    }
}
resources:
  postgres:
    type: gcp:bigquery:Dataset
    properties:
      datasetId: postgres
      friendlyName: postgres
      description: Database of postgres
      location: us-central1
  default:
    type: gcp:datastream:Stream
    properties:
      displayName: postgres to bigQuery
      location: us-central1
      streamId: postgres-bigquery
      sourceConfig:
        sourceConnectionProfile: ${sourceConnectionProfile.id}
        mysqlSourceConfig: {}
      destinationConfig:
        destinationConnectionProfile: ${destinationConnectionProfile2.id}
        bigqueryDestinationConfig:
          dataFreshness: 900s
          singleTargetDataset:
            datasetId: ${postgres.id}
      backfillAll: {}
  destinationConnectionProfile2:
    type: gcp:datastream:ConnectionProfile
    name: destination_connection_profile2
    properties:
      displayName: Connection profile
      location: us-central1
      connectionProfileId: dest-profile
      bigqueryProfile: {}
  instance:
    type: gcp:sql:DatabaseInstance
    properties:
      name: instance-name
      databaseVersion: MYSQL_8_0
      region: us-central1
      settings:
        tier: db-f1-micro
        backupConfiguration:
          enabled: true
          binaryLogEnabled: true
        ipConfiguration:
          authorizedNetworks:
            - value: 34.71.242.81
            - value: 34.72.28.29
            - value: 34.67.6.157
            - value: 34.67.234.134
            - value: 34.72.239.218
      deletionProtection: false
  db:
    type: gcp:sql:Database
    properties:
      instance: ${instance.name}
      name: db
  pwd:
    type: random:Password
    properties:
      length: 16
      special: false
  user:
    type: gcp:sql:User
    properties:
      name: my-user
      instance: ${instance.name}
      host: '%'
      password: ${pwd.result}
  sourceConnectionProfile:
    type: gcp:datastream:ConnectionProfile
    name: source_connection_profile
    properties:
      displayName: Source connection profile
      location: us-central1
      connectionProfileId: source-profile
      mysqlProfile:
        hostname: ${instance.publicIpAddress}
        username: ${user.name}
        password: ${user.password}

The singleTargetDataset property directs all replicated tables to one BigQuery dataset, regardless of source schema. This simplifies BigQuery organization when you don’t need schema-level separation.

Use append-only mode for BigQuery writes

Append-only mode writes all changes as new rows rather than updating existing rows, preserving full change history for audit or time-travel queries.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
import * as random from "@pulumi/random";

const project = gcp.organizations.getProject({});
const instance = new gcp.sql.DatabaseInstance("instance", {
    name: "my-instance",
    databaseVersion: "MYSQL_8_0",
    region: "us-central1",
    settings: {
        tier: "db-f1-micro",
        backupConfiguration: {
            enabled: true,
            binaryLogEnabled: true,
        },
        ipConfiguration: {
            authorizedNetworks: [
                {
                    value: "34.71.242.81",
                },
                {
                    value: "34.72.28.29",
                },
                {
                    value: "34.67.6.157",
                },
                {
                    value: "34.67.234.134",
                },
                {
                    value: "34.72.239.218",
                },
            ],
        },
    },
    deletionProtection: true,
});
const db = new gcp.sql.Database("db", {
    instance: instance.name,
    name: "db",
});
const pwd = new random.index.Password("pwd", {
    length: 16,
    special: false,
});
const user = new gcp.sql.User("user", {
    name: "user",
    instance: instance.name,
    host: "%",
    password: pwd.result,
});
const sourceConnectionProfile = new gcp.datastream.ConnectionProfile("source_connection_profile", {
    displayName: "Source connection profile",
    location: "us-central1",
    connectionProfileId: "source-profile",
    mysqlProfile: {
        hostname: instance.publicIpAddress,
        username: user.name,
        password: user.password,
    },
});
const destinationConnectionProfile = new gcp.datastream.ConnectionProfile("destination_connection_profile", {
    displayName: "Connection profile",
    location: "us-central1",
    connectionProfileId: "destination-profile",
    bigqueryProfile: {},
});
const _default = new gcp.datastream.Stream("default", {
    streamId: "my-stream",
    location: "us-central1",
    displayName: "my stream",
    sourceConfig: {
        sourceConnectionProfile: sourceConnectionProfile.id,
        mysqlSourceConfig: {},
    },
    destinationConfig: {
        destinationConnectionProfile: destinationConnectionProfile.id,
        bigqueryDestinationConfig: {
            sourceHierarchyDatasets: {
                datasetTemplate: {
                    location: "us-central1",
                },
            },
            appendOnly: {},
        },
    },
    backfillNone: {},
});
import pulumi
import pulumi_gcp as gcp
import pulumi_random as random

project = gcp.organizations.get_project()
instance = gcp.sql.DatabaseInstance("instance",
    name="my-instance",
    database_version="MYSQL_8_0",
    region="us-central1",
    settings={
        "tier": "db-f1-micro",
        "backup_configuration": {
            "enabled": True,
            "binary_log_enabled": True,
        },
        "ip_configuration": {
            "authorized_networks": [
                {
                    "value": "34.71.242.81",
                },
                {
                    "value": "34.72.28.29",
                },
                {
                    "value": "34.67.6.157",
                },
                {
                    "value": "34.67.234.134",
                },
                {
                    "value": "34.72.239.218",
                },
            ],
        },
    },
    deletion_protection=True)
db = gcp.sql.Database("db",
    instance=instance.name,
    name="db")
pwd = random.index.Password("pwd",
    length=16,
    special=False)
user = gcp.sql.User("user",
    name="user",
    instance=instance.name,
    host="%",
    password=pwd["result"])
source_connection_profile = gcp.datastream.ConnectionProfile("source_connection_profile",
    display_name="Source connection profile",
    location="us-central1",
    connection_profile_id="source-profile",
    mysql_profile={
        "hostname": instance.public_ip_address,
        "username": user.name,
        "password": user.password,
    })
destination_connection_profile = gcp.datastream.ConnectionProfile("destination_connection_profile",
    display_name="Connection profile",
    location="us-central1",
    connection_profile_id="destination-profile",
    bigquery_profile={})
default = gcp.datastream.Stream("default",
    stream_id="my-stream",
    location="us-central1",
    display_name="my stream",
    source_config={
        "source_connection_profile": source_connection_profile.id,
        "mysql_source_config": {},
    },
    destination_config={
        "destination_connection_profile": destination_connection_profile.id,
        "bigquery_destination_config": {
            "source_hierarchy_datasets": {
                "dataset_template": {
                    "location": "us-central1",
                },
            },
            "append_only": {},
        },
    },
    backfill_none={})
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
	"github.com/pulumi/pulumi-random/sdk/v4/go/random"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
			Name:            pulumi.String("my-instance"),
			DatabaseVersion: pulumi.String("MYSQL_8_0"),
			Region:          pulumi.String("us-central1"),
			Settings: &sql.DatabaseInstanceSettingsArgs{
				Tier: pulumi.String("db-f1-micro"),
				BackupConfiguration: &sql.DatabaseInstanceSettingsBackupConfigurationArgs{
					Enabled:          pulumi.Bool(true),
					BinaryLogEnabled: pulumi.Bool(true),
				},
				IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
					AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.71.242.81"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.28.29"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.6.157"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.234.134"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.239.218"),
						},
					},
				},
			},
			DeletionProtection: pulumi.Bool(true),
		})
		if err != nil {
			return err
		}
		_, err = sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
			Instance: instance.Name,
			Name:     pulumi.String("db"),
		})
		if err != nil {
			return err
		}
		pwd, err := random.NewPassword(ctx, "pwd", &random.PasswordArgs{
			Length:  16,
			Special: false,
		})
		if err != nil {
			return err
		}
		user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
			Name:     pulumi.String("user"),
			Instance: instance.Name,
			Host:     pulumi.String("%"),
			Password: pwd.Result,
		})
		if err != nil {
			return err
		}
		sourceConnectionProfile, err := datastream.NewConnectionProfile(ctx, "source_connection_profile", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("Source connection profile"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("source-profile"),
			MysqlProfile: &datastream.ConnectionProfileMysqlProfileArgs{
				Hostname: instance.PublicIpAddress,
				Username: user.Name,
				Password: user.Password,
			},
		})
		if err != nil {
			return err
		}
		destinationConnectionProfile, err := datastream.NewConnectionProfile(ctx, "destination_connection_profile", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("Connection profile"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("destination-profile"),
			BigqueryProfile:     &datastream.ConnectionProfileBigqueryProfileArgs{},
		})
		if err != nil {
			return err
		}
		_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
			StreamId:    pulumi.String("my-stream"),
			Location:    pulumi.String("us-central1"),
			DisplayName: pulumi.String("my stream"),
			SourceConfig: &datastream.StreamSourceConfigArgs{
				SourceConnectionProfile: sourceConnectionProfile.ID(),
				MysqlSourceConfig:       &datastream.StreamSourceConfigMysqlSourceConfigArgs{},
			},
			DestinationConfig: &datastream.StreamDestinationConfigArgs{
				DestinationConnectionProfile: destinationConnectionProfile.ID(),
				BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
					SourceHierarchyDatasets: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs{
						DatasetTemplate: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs{
							Location: pulumi.String("us-central1"),
						},
					},
					AppendOnly: &datastream.StreamDestinationConfigBigqueryDestinationConfigAppendOnlyArgs{},
				},
			},
			BackfillNone: &datastream.StreamBackfillNoneArgs{},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
using Random = Pulumi.Random;

return await Deployment.RunAsync(() => 
{
    var project = Gcp.Organizations.GetProject.Invoke();

    var instance = new Gcp.Sql.DatabaseInstance("instance", new()
    {
        Name = "my-instance",
        DatabaseVersion = "MYSQL_8_0",
        Region = "us-central1",
        Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
        {
            Tier = "db-f1-micro",
            BackupConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsBackupConfigurationArgs
            {
                Enabled = true,
                BinaryLogEnabled = true,
            },
            IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
            {
                AuthorizedNetworks = new[]
                {
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.71.242.81",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.28.29",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.6.157",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.234.134",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.239.218",
                    },
                },
            },
        },
        DeletionProtection = true,
    });

    var db = new Gcp.Sql.Database("db", new()
    {
        Instance = instance.Name,
        Name = "db",
    });

    var pwd = new Random.Index.Password("pwd", new()
    {
        Length = 16,
        Special = false,
    });

    var user = new Gcp.Sql.User("user", new()
    {
        Name = "user",
        Instance = instance.Name,
        Host = "%",
        Password = pwd.Result,
    });

    var sourceConnectionProfile = new Gcp.Datastream.ConnectionProfile("source_connection_profile", new()
    {
        DisplayName = "Source connection profile",
        Location = "us-central1",
        ConnectionProfileId = "source-profile",
        MysqlProfile = new Gcp.Datastream.Inputs.ConnectionProfileMysqlProfileArgs
        {
            Hostname = instance.PublicIpAddress,
            Username = user.Name,
            Password = user.Password,
        },
    });

    var destinationConnectionProfile = new Gcp.Datastream.ConnectionProfile("destination_connection_profile", new()
    {
        DisplayName = "Connection profile",
        Location = "us-central1",
        ConnectionProfileId = "destination-profile",
        BigqueryProfile = null,
    });

    var @default = new Gcp.Datastream.Stream("default", new()
    {
        StreamId = "my-stream",
        Location = "us-central1",
        DisplayName = "my stream",
        SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
        {
            SourceConnectionProfile = sourceConnectionProfile.Id,
            MysqlSourceConfig = null,
        },
        DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
        {
            DestinationConnectionProfile = destinationConnectionProfile.Id,
            BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
            {
                SourceHierarchyDatasets = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs
                {
                    DatasetTemplate = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs
                    {
                        Location = "us-central1",
                    },
                },
                AppendOnly = null,
            },
        },
        BackfillNone = null,
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsBackupConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import com.pulumi.random.Password;
import com.pulumi.random.PasswordArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileMysqlProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileBigqueryProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigAppendOnlyArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillNoneArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

        var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
            .name("my-instance")
            .databaseVersion("MYSQL_8_0")
            .region("us-central1")
            .settings(DatabaseInstanceSettingsArgs.builder()
                .tier("db-f1-micro")
                .backupConfiguration(DatabaseInstanceSettingsBackupConfigurationArgs.builder()
                    .enabled(true)
                    .binaryLogEnabled(true)
                    .build())
                .ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
                    .authorizedNetworks(                    
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.71.242.81")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.28.29")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.6.157")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.234.134")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.239.218")
                            .build())
                    .build())
                .build())
            .deletionProtection(true)
            .build());

        var db = new Database("db", DatabaseArgs.builder()
            .instance(instance.name())
            .name("db")
            .build());

        var pwd = new Password("pwd", PasswordArgs.builder()
            .length(16)
            .special(false)
            .build());

        var user = new User("user", UserArgs.builder()
            .name("user")
            .instance(instance.name())
            .host("%")
            .password(pwd.result())
            .build());

        var sourceConnectionProfile = new ConnectionProfile("sourceConnectionProfile", ConnectionProfileArgs.builder()
            .displayName("Source connection profile")
            .location("us-central1")
            .connectionProfileId("source-profile")
            .mysqlProfile(ConnectionProfileMysqlProfileArgs.builder()
                .hostname(instance.publicIpAddress())
                .username(user.name())
                .password(user.password())
                .build())
            .build());

        var destinationConnectionProfile = new ConnectionProfile("destinationConnectionProfile", ConnectionProfileArgs.builder()
            .displayName("Connection profile")
            .location("us-central1")
            .connectionProfileId("destination-profile")
            .bigqueryProfile(ConnectionProfileBigqueryProfileArgs.builder()
                .build())
            .build());

        var default_ = new Stream("default", StreamArgs.builder()
            .streamId("my-stream")
            .location("us-central1")
            .displayName("my stream")
            .sourceConfig(StreamSourceConfigArgs.builder()
                .sourceConnectionProfile(sourceConnectionProfile.id())
                .mysqlSourceConfig(StreamSourceConfigMysqlSourceConfigArgs.builder()
                    .build())
                .build())
            .destinationConfig(StreamDestinationConfigArgs.builder()
                .destinationConnectionProfile(destinationConnectionProfile.id())
                .bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
                    .sourceHierarchyDatasets(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs.builder()
                        .datasetTemplate(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs.builder()
                            .location("us-central1")
                            .build())
                        .build())
                    .appendOnly(StreamDestinationConfigBigqueryDestinationConfigAppendOnlyArgs.builder()
                        .build())
                    .build())
                .build())
            .backfillNone(StreamBackfillNoneArgs.builder()
                .build())
            .build());

    }
}
resources:
  instance:
    type: gcp:sql:DatabaseInstance
    properties:
      name: my-instance
      databaseVersion: MYSQL_8_0
      region: us-central1
      settings:
        tier: db-f1-micro
        backupConfiguration:
          enabled: true
          binaryLogEnabled: true
        ipConfiguration:
          authorizedNetworks:
            - value: 34.71.242.81
            - value: 34.72.28.29
            - value: 34.67.6.157
            - value: 34.67.234.134
            - value: 34.72.239.218
      deletionProtection: true
  db:
    type: gcp:sql:Database
    properties:
      instance: ${instance.name}
      name: db
  pwd:
    type: random:Password
    properties:
      length: 16
      special: false
  user:
    type: gcp:sql:User
    properties:
      name: user
      instance: ${instance.name}
      host: '%'
      password: ${pwd.result}
  sourceConnectionProfile:
    type: gcp:datastream:ConnectionProfile
    name: source_connection_profile
    properties:
      displayName: Source connection profile
      location: us-central1
      connectionProfileId: source-profile
      mysqlProfile:
        hostname: ${instance.publicIpAddress}
        username: ${user.name}
        password: ${user.password}
  destinationConnectionProfile:
    type: gcp:datastream:ConnectionProfile
    name: destination_connection_profile
    properties:
      displayName: Connection profile
      location: us-central1
      connectionProfileId: destination-profile
      bigqueryProfile: {}
  default:
    type: gcp:datastream:Stream
    properties:
      streamId: my-stream
      location: us-central1
      displayName: my stream
      sourceConfig:
        sourceConnectionProfile: ${sourceConnectionProfile.id}
        mysqlSourceConfig: {}
      destinationConfig:
        destinationConnectionProfile: ${destinationConnectionProfile.id}
        bigqueryDestinationConfig:
          sourceHierarchyDatasets:
            datasetTemplate:
              location: us-central1
          appendOnly: {}
      backfillNone: {}
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}

The appendOnly property in bigqueryDestinationConfig changes write behavior to insert-only, creating a new row for every change event. This preserves complete change history but requires more storage and different query patterns.

Apply clustering and partitioning rules to BigQuery tables

BigQuery performance improves when tables are properly clustered and partitioned. Rule sets let you define these optimizations per source table.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const project = gcp.organizations.getProject({});
const stream = new gcp.datastream.Stream("stream", {
    streamId: "rules-stream",
    location: "us-central1",
    displayName: "BigQuery Stream with Rules",
    sourceConfig: {
        sourceConnectionProfile: "rules-source-profile",
        mysqlSourceConfig: {
            includeObjects: {
                mysqlDatabases: [{
                    database: "my_database",
                }],
            },
            binaryLogPosition: {},
        },
    },
    destinationConfig: {
        destinationConnectionProfile: "rules-dest-profile",
        bigqueryDestinationConfig: {
            singleTargetDataset: {
                datasetId: "rules-project:rules-dataset",
            },
        },
    },
    backfillNone: {},
    ruleSets: [
        {
            objectFilter: {
                sourceObjectIdentifier: {
                    mysqlIdentifier: {
                        database: "test_database",
                        table: "test_table_1",
                    },
                },
            },
            customizationRules: [
                {
                    bigqueryClustering: {
                        columns: ["user_id"],
                    },
                },
                {
                    bigqueryPartitioning: {
                        ingestionTimePartition: {},
                    },
                },
            ],
        },
        {
            objectFilter: {
                sourceObjectIdentifier: {
                    mysqlIdentifier: {
                        database: "test_database",
                        table: "test_table_2",
                    },
                },
            },
            customizationRules: [
                {
                    bigqueryClustering: {
                        columns: ["event_time"],
                    },
                },
                {
                    bigqueryPartitioning: {
                        timeUnitPartition: {
                            column: "event_time",
                            partitioningTimeGranularity: "PARTITIONING_TIME_GRANULARITY_DAY",
                        },
                    },
                },
            ],
        },
    ],
});
import pulumi
import pulumi_gcp as gcp

project = gcp.organizations.get_project()
stream = gcp.datastream.Stream("stream",
    stream_id="rules-stream",
    location="us-central1",
    display_name="BigQuery Stream with Rules",
    source_config={
        "source_connection_profile": "rules-source-profile",
        "mysql_source_config": {
            "include_objects": {
                "mysql_databases": [{
                    "database": "my_database",
                }],
            },
            "binary_log_position": {},
        },
    },
    destination_config={
        "destination_connection_profile": "rules-dest-profile",
        "bigquery_destination_config": {
            "single_target_dataset": {
                "dataset_id": "rules-project:rules-dataset",
            },
        },
    },
    backfill_none={},
    rule_sets=[
        {
            "object_filter": {
                "source_object_identifier": {
                    "mysql_identifier": {
                        "database": "test_database",
                        "table": "test_table_1",
                    },
                },
            },
            "customization_rules": [
                {
                    "bigquery_clustering": {
                        "columns": ["user_id"],
                    },
                },
                {
                    "bigquery_partitioning": {
                        "ingestion_time_partition": {},
                    },
                },
            ],
        },
        {
            "object_filter": {
                "source_object_identifier": {
                    "mysql_identifier": {
                        "database": "test_database",
                        "table": "test_table_2",
                    },
                },
            },
            "customization_rules": [
                {
                    "bigquery_clustering": {
                        "columns": ["event_time"],
                    },
                },
                {
                    "bigquery_partitioning": {
                        "time_unit_partition": {
                            "column": "event_time",
                            "partitioning_time_granularity": "PARTITIONING_TIME_GRANULARITY_DAY",
                        },
                    },
                },
            ],
        },
    ])
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		_, err = datastream.NewStream(ctx, "stream", &datastream.StreamArgs{
			StreamId:    pulumi.String("rules-stream"),
			Location:    pulumi.String("us-central1"),
			DisplayName: pulumi.String("BigQuery Stream with Rules"),
			SourceConfig: &datastream.StreamSourceConfigArgs{
				SourceConnectionProfile: pulumi.String("rules-source-profile"),
				MysqlSourceConfig: &datastream.StreamSourceConfigMysqlSourceConfigArgs{
					IncludeObjects: &datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs{
						MysqlDatabases: datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArray{
							&datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArgs{
								Database: pulumi.String("my_database"),
							},
						},
					},
					BinaryLogPosition: &datastream.StreamSourceConfigMysqlSourceConfigBinaryLogPositionArgs{},
				},
			},
			DestinationConfig: &datastream.StreamDestinationConfigArgs{
				DestinationConnectionProfile: pulumi.String("rules-dest-profile"),
				BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
					SingleTargetDataset: &datastream.StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs{
						DatasetId: pulumi.String("rules-project:rules-dataset"),
					},
				},
			},
			BackfillNone: &datastream.StreamBackfillNoneArgs{},
			RuleSets: datastream.StreamRuleSetArray{
				&datastream.StreamRuleSetArgs{
					ObjectFilter: &datastream.StreamRuleSetObjectFilterArgs{
						SourceObjectIdentifier: &datastream.StreamRuleSetObjectFilterSourceObjectIdentifierArgs{
							MysqlIdentifier: &datastream.StreamRuleSetObjectFilterSourceObjectIdentifierMysqlIdentifierArgs{
								Database: pulumi.String("test_database"),
								Table:    pulumi.String("test_table_1"),
							},
						},
					},
					CustomizationRules: datastream.StreamRuleSetCustomizationRuleArray{
						&datastream.StreamRuleSetCustomizationRuleArgs{
							BigqueryClustering: &datastream.StreamRuleSetCustomizationRuleBigqueryClusteringArgs{
								Columns: pulumi.StringArray{
									pulumi.String("user_id"),
								},
							},
						},
						&datastream.StreamRuleSetCustomizationRuleArgs{
							BigqueryPartitioning: &datastream.StreamRuleSetCustomizationRuleBigqueryPartitioningArgs{
								IngestionTimePartition: &datastream.StreamRuleSetCustomizationRuleBigqueryPartitioningIngestionTimePartitionArgs{},
							},
						},
					},
				},
				&datastream.StreamRuleSetArgs{
					ObjectFilter: &datastream.StreamRuleSetObjectFilterArgs{
						SourceObjectIdentifier: &datastream.StreamRuleSetObjectFilterSourceObjectIdentifierArgs{
							MysqlIdentifier: &datastream.StreamRuleSetObjectFilterSourceObjectIdentifierMysqlIdentifierArgs{
								Database: pulumi.String("test_database"),
								Table:    pulumi.String("test_table_2"),
							},
						},
					},
					CustomizationRules: datastream.StreamRuleSetCustomizationRuleArray{
						&datastream.StreamRuleSetCustomizationRuleArgs{
							BigqueryClustering: &datastream.StreamRuleSetCustomizationRuleBigqueryClusteringArgs{
								Columns: pulumi.StringArray{
									pulumi.String("event_time"),
								},
							},
						},
						&datastream.StreamRuleSetCustomizationRuleArgs{
							BigqueryPartitioning: &datastream.StreamRuleSetCustomizationRuleBigqueryPartitioningArgs{
								TimeUnitPartition: &datastream.StreamRuleSetCustomizationRuleBigqueryPartitioningTimeUnitPartitionArgs{
									Column:                      pulumi.String("event_time"),
									PartitioningTimeGranularity: pulumi.String("PARTITIONING_TIME_GRANULARITY_DAY"),
								},
							},
						},
					},
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var project = Gcp.Organizations.GetProject.Invoke();

    var stream = new Gcp.Datastream.Stream("stream", new()
    {
        StreamId = "rules-stream",
        Location = "us-central1",
        DisplayName = "BigQuery Stream with Rules",
        SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
        {
            SourceConnectionProfile = "rules-source-profile",
            MysqlSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigArgs
            {
                IncludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs
                {
                    MysqlDatabases = new[]
                    {
                        new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArgs
                        {
                            Database = "my_database",
                        },
                    },
                },
                BinaryLogPosition = null,
            },
        },
        DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
        {
            DestinationConnectionProfile = "rules-dest-profile",
            BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
            {
                SingleTargetDataset = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs
                {
                    DatasetId = "rules-project:rules-dataset",
                },
            },
        },
        BackfillNone = null,
        RuleSets = new[]
        {
            new Gcp.Datastream.Inputs.StreamRuleSetArgs
            {
                ObjectFilter = new Gcp.Datastream.Inputs.StreamRuleSetObjectFilterArgs
                {
                    SourceObjectIdentifier = new Gcp.Datastream.Inputs.StreamRuleSetObjectFilterSourceObjectIdentifierArgs
                    {
                        MysqlIdentifier = new Gcp.Datastream.Inputs.StreamRuleSetObjectFilterSourceObjectIdentifierMysqlIdentifierArgs
                        {
                            Database = "test_database",
                            Table = "test_table_1",
                        },
                    },
                },
                CustomizationRules = new[]
                {
                    new Gcp.Datastream.Inputs.StreamRuleSetCustomizationRuleArgs
                    {
                        BigqueryClustering = new Gcp.Datastream.Inputs.StreamRuleSetCustomizationRuleBigqueryClusteringArgs
                        {
                            Columns = new[]
                            {
                                "user_id",
                            },
                        },
                    },
                    new Gcp.Datastream.Inputs.StreamRuleSetCustomizationRuleArgs
                    {
                        BigqueryPartitioning = new Gcp.Datastream.Inputs.StreamRuleSetCustomizationRuleBigqueryPartitioningArgs
                        {
                            IngestionTimePartition = null,
                        },
                    },
                },
            },
            new Gcp.Datastream.Inputs.StreamRuleSetArgs
            {
                ObjectFilter = new Gcp.Datastream.Inputs.StreamRuleSetObjectFilterArgs
                {
                    SourceObjectIdentifier = new Gcp.Datastream.Inputs.StreamRuleSetObjectFilterSourceObjectIdentifierArgs
                    {
                        MysqlIdentifier = new Gcp.Datastream.Inputs.StreamRuleSetObjectFilterSourceObjectIdentifierMysqlIdentifierArgs
                        {
                            Database = "test_database",
                            Table = "test_table_2",
                        },
                    },
                },
                CustomizationRules = new[]
                {
                    new Gcp.Datastream.Inputs.StreamRuleSetCustomizationRuleArgs
                    {
                        BigqueryClustering = new Gcp.Datastream.Inputs.StreamRuleSetCustomizationRuleBigqueryClusteringArgs
                        {
                            Columns = new[]
                            {
                                "event_time",
                            },
                        },
                    },
                    new Gcp.Datastream.Inputs.StreamRuleSetCustomizationRuleArgs
                    {
                        BigqueryPartitioning = new Gcp.Datastream.Inputs.StreamRuleSetCustomizationRuleBigqueryPartitioningArgs
                        {
                            TimeUnitPartition = new Gcp.Datastream.Inputs.StreamRuleSetCustomizationRuleBigqueryPartitioningTimeUnitPartitionArgs
                            {
                                Column = "event_time",
                                PartitioningTimeGranularity = "PARTITIONING_TIME_GRANULARITY_DAY",
                            },
                        },
                    },
                },
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigBinaryLogPositionArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillNoneArgs;
import com.pulumi.gcp.datastream.inputs.StreamRuleSetArgs;
import com.pulumi.gcp.datastream.inputs.StreamRuleSetObjectFilterArgs;
import com.pulumi.gcp.datastream.inputs.StreamRuleSetObjectFilterSourceObjectIdentifierArgs;
import com.pulumi.gcp.datastream.inputs.StreamRuleSetObjectFilterSourceObjectIdentifierMysqlIdentifierArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

        var stream = new Stream("stream", StreamArgs.builder()
            .streamId("rules-stream")
            .location("us-central1")
            .displayName("BigQuery Stream with Rules")
            .sourceConfig(StreamSourceConfigArgs.builder()
                .sourceConnectionProfile("rules-source-profile")
                .mysqlSourceConfig(StreamSourceConfigMysqlSourceConfigArgs.builder()
                    .includeObjects(StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs.builder()
                        .mysqlDatabases(StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArgs.builder()
                            .database("my_database")
                            .build())
                        .build())
                    .binaryLogPosition(StreamSourceConfigMysqlSourceConfigBinaryLogPositionArgs.builder()
                        .build())
                    .build())
                .build())
            .destinationConfig(StreamDestinationConfigArgs.builder()
                .destinationConnectionProfile("rules-dest-profile")
                .bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
                    .singleTargetDataset(StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs.builder()
                        .datasetId("rules-project:rules-dataset")
                        .build())
                    .build())
                .build())
            .backfillNone(StreamBackfillNoneArgs.builder()
                .build())
            .ruleSets(            
                StreamRuleSetArgs.builder()
                    .objectFilter(StreamRuleSetObjectFilterArgs.builder()
                        .sourceObjectIdentifier(StreamRuleSetObjectFilterSourceObjectIdentifierArgs.builder()
                            .mysqlIdentifier(StreamRuleSetObjectFilterSourceObjectIdentifierMysqlIdentifierArgs.builder()
                                .database("test_database")
                                .table("test_table_1")
                                .build())
                            .build())
                        .build())
                    .customizationRules(                    
                        StreamRuleSetCustomizationRuleArgs.builder()
                            .bigqueryClustering(StreamRuleSetCustomizationRuleBigqueryClusteringArgs.builder()
                                .columns("user_id")
                                .build())
                            .build(),
                        StreamRuleSetCustomizationRuleArgs.builder()
                            .bigqueryPartitioning(StreamRuleSetCustomizationRuleBigqueryPartitioningArgs.builder()
                                .ingestionTimePartition(StreamRuleSetCustomizationRuleBigqueryPartitioningIngestionTimePartitionArgs.builder()
                                    .build())
                                .build())
                            .build())
                    .build(),
                StreamRuleSetArgs.builder()
                    .objectFilter(StreamRuleSetObjectFilterArgs.builder()
                        .sourceObjectIdentifier(StreamRuleSetObjectFilterSourceObjectIdentifierArgs.builder()
                            .mysqlIdentifier(StreamRuleSetObjectFilterSourceObjectIdentifierMysqlIdentifierArgs.builder()
                                .database("test_database")
                                .table("test_table_2")
                                .build())
                            .build())
                        .build())
                    .customizationRules(                    
                        StreamRuleSetCustomizationRuleArgs.builder()
                            .bigqueryClustering(StreamRuleSetCustomizationRuleBigqueryClusteringArgs.builder()
                                .columns("event_time")
                                .build())
                            .build(),
                        StreamRuleSetCustomizationRuleArgs.builder()
                            .bigqueryPartitioning(StreamRuleSetCustomizationRuleBigqueryPartitioningArgs.builder()
                                .timeUnitPartition(StreamRuleSetCustomizationRuleBigqueryPartitioningTimeUnitPartitionArgs.builder()
                                    .column("event_time")
                                    .partitioningTimeGranularity("PARTITIONING_TIME_GRANULARITY_DAY")
                                    .build())
                                .build())
                            .build())
                    .build())
            .build());

    }
}
resources:
  stream:
    type: gcp:datastream:Stream
    properties:
      streamId: rules-stream
      location: us-central1
      displayName: BigQuery Stream with Rules
      sourceConfig:
        sourceConnectionProfile: rules-source-profile
        mysqlSourceConfig:
          includeObjects:
            mysqlDatabases:
              - database: my_database
          binaryLogPosition: {}
      destinationConfig:
        destinationConnectionProfile: rules-dest-profile
        bigqueryDestinationConfig:
          singleTargetDataset:
            datasetId: rules-project:rules-dataset
      backfillNone: {}
      ruleSets:
        - objectFilter:
            sourceObjectIdentifier:
              mysqlIdentifier:
                database: test_database
                table: test_table_1
          customizationRules:
            - bigqueryClustering:
                columns:
                  - user_id
            - bigqueryPartitioning:
                ingestionTimePartition: {}
        - objectFilter:
            sourceObjectIdentifier:
              mysqlIdentifier:
                database: test_database
                table: test_table_2
          customizationRules:
            - bigqueryClustering:
                columns:
                  - event_time
            - bigqueryPartitioning:
                timeUnitPartition:
                  column: event_time
                  partitioningTimeGranularity: PARTITIONING_TIME_GRANULARITY_DAY
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}

The ruleSets property defines per-table customizations. Each rule set has an objectFilter that identifies the source table and customizationRules that specify BigQuery clustering columns and partitioning strategy. Clustering improves query performance for filtered queries, while partitioning reduces costs by limiting data scanned.

Beyond these examples

These snippets focus on specific stream-level features: source database configuration for MySQL, PostgreSQL, SQL Server, Oracle, and MongoDB; destination configuration for Cloud Storage and BigQuery; and filtering, backfill strategies, and encryption. They’re intentionally minimal rather than full replication pipelines.

The examples reference pre-existing infrastructure such as connection profiles for sources and destinations, Cloud SQL instances or external databases with replication enabled, GCS buckets or BigQuery datasets, KMS keys for encryption, and IAM permissions for the Datastream service account. They focus on configuring the stream rather than provisioning the surrounding infrastructure.

To keep things focused, common stream patterns are omitted, including:

  • Connection profile creation (referenced but not shown)
  • Stream state management (desiredState property)
  • Private connectivity configuration
  • Stream validation (createWithoutValidation)

These omissions are intentional: the goal is to illustrate how each stream feature is wired, not provide drop-in replication modules. See the Datastream Stream resource reference for all available configuration options.

Let's create GCP Datastream Streams

Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.

Try Pulumi Cloud for FREE

Frequently Asked Questions

Immutability & Configuration Constraints
What properties can't I change after creating a stream?
Five properties are immutable: location, streamId, project, customerManagedEncryptionKey, and createWithoutValidation. Changing any of these requires replacing the stream.
Can I skip validation when creating a stream?
Yes, set createWithoutValidation to true. This property is immutable and must be set at creation time.
Stream Lifecycle & State Management
How do I control the stream lifecycle?
Use desiredState with three values: NOT_STARTED (default, creates without starting), RUNNING (starts the stream), or PAUSED (pauses from RUNNING state).
Backfill Strategies
What's the difference between backfillAll and backfillNone?
These are mutually exclusive strategies. backfillAll automatically backfills the stream’s objects (with optional exclusions), while backfillNone disables automatic backfill entirely.
Source Configuration
What source databases are supported?
MySQL, PostgreSQL, Oracle, SQL Server, and MongoDB. Each uses a specific config: mysqlSourceConfig, postgresqlSourceConfig, oracleSourceConfig, sqlServerSourceConfig, or mongodbSourceConfig.
What's the difference between transactionLogs and changeTables for SQL Server?
Two CDC methods: transactionLogs captures changes from transaction logs, while changeTables uses SQL Server Change Tracking. Configure one in sqlServerSourceConfig.
How do I configure PostgreSQL replication?
Set publication and replicationSlot in postgresqlSourceConfig. For example: publication: "publication", replicationSlot: "replication_slot".
Destination Configuration
What are the BigQuery destination options?
Two options: sourceHierarchyDatasets creates datasets matching source hierarchy, or singleTargetDataset writes all data to one dataset. Configure in bigqueryDestinationConfig.
Can I customize BigQuery table structure?
Yes, use ruleSets to configure clustering and partitioning. Specify bigqueryClustering with columns and bigqueryPartitioning with ingestionTimePartition or timeUnitPartition.
Security & Permissions
What IAM roles does the Datastream service account need for GCS destinations?
Grant three roles on the destination bucket: roles/storage.objectViewer, roles/storage.objectCreator, and roles/storage.legacyBucketReader.
How do I use customer-managed encryption with BigQuery?
Set customerManagedEncryptionKey on the stream and kmsKeyName in datasetTemplate. Grant roles/cloudkms.cryptoKeyEncrypterDecrypter to the BigQuery service account, and use dependsOn to ensure IAM binding completes first.

Using a different cloud?

Explore analytics guides for other cloud providers: