Create GCP Datastream Streams

The gcp:datastream/stream:Stream resource, part of the Pulumi GCP provider, defines a Datastream replication stream that continuously moves data from source databases to destinations like BigQuery or Cloud Storage. This guide focuses on four capabilities: source database configuration for MySQL, PostgreSQL, and SQL Server; BigQuery and Cloud Storage destinations; CDC mechanisms and filtering; and BigLake Managed Tables.

Streams depend on connection profiles for both source and destination endpoints. The source database must have CDC enabled through binary logs, replication slots, or change tracking. The examples are intentionally small. Combine them with your own connection profiles, IAM configuration, and monitoring.
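The IAM configuration in this guide's example revolves around one identity: the Datastream service agent, whose member string is derived from the project number. As a minimal sketch, the helper below builds that string once instead of repeating the template for every role. The function name `datastreamServiceAgentMember`, the numeric check, and the project number are illustrative assumptions, not part of the provider.

```typescript
// Hypothetical helper (not part of @pulumi/gcp): builds the IAM member string
// for the Datastream service agent from a project number.
function datastreamServiceAgentMember(projectNumber: string): string {
    // Assumption for this sketch: project numbers are purely numeric.
    if (!/^\d+$/.test(projectNumber)) {
        throw new Error(`expected a numeric project number, got "${projectNumber}"`);
    }
    return `serviceAccount:service-${projectNumber}@gcp-sa-datastream.iam.gserviceaccount.com`;
}

// Bucket roles that the example in this guide grants to the service agent.
const bucketRoles: string[] = [
    "roles/storage.objectViewer",
    "roles/storage.objectCreator",
    "roles/storage.legacyBucketReader",
];

// "123456789012" is a placeholder project number.
const member = datastreamServiceAgentMember("123456789012");
for (const role of bucketRoles) {
    console.log(`${role} -> ${member}`);
}
```

In a Pulumi program the project number typically arrives asynchronously (for example from `gcp.organizations.getProject`), so a helper like this would be called inside `.then(...)` or `apply(...)`.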

Stream MySQL to Cloud Storage with filtering and encryption

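Data pipelines often replicate MySQL databases to Cloud Storage for analytics, applying table and column filters to control what gets replicated. This example also sets a customer-managed encryption key (customerManagedEncryptionKey) on the stream and writes gzip-compressed JSON files to the bucket.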
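The table and column filters are nested plain objects: databases contain tables, and tables optionally contain columns. The sketch below builds that shape from a compact map so the nesting is easier to see in isolation. The interfaces and the `mysqlObjects` helper are hypothetical and only mirror the field names used in the full program that follows. They are not types exported by the provider.

```typescript
// Plain-object shapes mirroring the field names used by includeObjects and
// excludeObjects in the example (hypothetical local types).
interface MysqlColumn { column: string; }
interface MysqlTable { table: string; mysqlColumns?: MysqlColumn[]; }
interface MysqlDatabase { database: string; mysqlTables?: MysqlTable[]; }
interface MysqlObjects { mysqlDatabases: MysqlDatabase[]; }

// Hypothetical helper: `tables` maps a table name to the columns to list.
// An empty array produces a table entry without mysqlColumns, as
// includedTable_2 has in the example.
function mysqlObjects(database: string, tables: Record<string, string[]>): MysqlObjects {
    const mysqlTables = Object.entries(tables).map(([table, columns]) => {
        const entry: MysqlTable = { table };
        if (columns.length > 0) {
            entry.mysqlColumns = columns.map((column) => ({ column }));
        }
        return entry;
    });
    return { mysqlDatabases: [{ database, mysqlTables }] };
}

const includeObjects = mysqlObjects("my-database", {
    includedTable: ["includedColumn"],
    includedTable_2: [],
});
console.log(JSON.stringify(includeObjects, null, 2));
```

The resulting object has the same nesting as the `includeObjects` value in the program below, minus the optional column metadata such as `dataType` and `collation`.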

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
import * as random from "@pulumi/random";

const project = gcp.organizations.getProject({});
const instance = new gcp.sql.DatabaseInstance("instance", {
    name: "my-instance",
    databaseVersion: "MYSQL_8_0",
    region: "us-central1",
    settings: {
        tier: "db-f1-micro",
        backupConfiguration: {
            enabled: true,
            binaryLogEnabled: true,
        },
        ipConfiguration: {
            authorizedNetworks: [
                {
                    value: "34.71.242.81",
                },
                {
                    value: "34.72.28.29",
                },
                {
                    value: "34.67.6.157",
                },
                {
                    value: "34.67.234.134",
                },
                {
                    value: "34.72.239.218",
                },
            ],
        },
    },
    deletionProtection: true,
});
const db = new gcp.sql.Database("db", {
    instance: instance.name,
    name: "db",
});
// Generate the database user's password with the random provider.
const pwd = new random.RandomPassword("pwd", {
    length: 16,
    special: false,
});
const user = new gcp.sql.User("user", {
    name: "user",
    instance: instance.name,
    host: "%",
    password: pwd.result,
});
const sourceConnectionProfile = new gcp.datastream.ConnectionProfile("source_connection_profile", {
    displayName: "Source connection profile",
    location: "us-central1",
    connectionProfileId: "source-profile",
    mysqlProfile: {
        hostname: instance.publicIpAddress,
        username: user.name,
        password: user.password,
    },
});
const bucket = new gcp.storage.Bucket("bucket", {
    name: "my-bucket",
    location: "US",
    uniformBucketLevelAccess: true,
});
const viewer = new gcp.storage.BucketIAMMember("viewer", {
    bucket: bucket.name,
    role: "roles/storage.objectViewer",
    member: project.then(project => `serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com`),
});
const creator = new gcp.storage.BucketIAMMember("creator", {
    bucket: bucket.name,
    role: "roles/storage.objectCreator",
    member: project.then(project => `serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com`),
});
const reader = new gcp.storage.BucketIAMMember("reader", {
    bucket: bucket.name,
    role: "roles/storage.legacyBucketReader",
    member: project.then(project => `serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com`),
});
const keyUser = new gcp.kms.CryptoKeyIAMMember("key_user", {
    cryptoKeyId: "kms-name",
    role: "roles/cloudkms.cryptoKeyEncrypterDecrypter",
    member: project.then(project => `serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com`),
});
const destinationConnectionProfile = new gcp.datastream.ConnectionProfile("destination_connection_profile", {
    displayName: "Connection profile",
    location: "us-central1",
    connectionProfileId: "destination-profile",
    gcsProfile: {
        bucket: bucket.name,
        rootPath: "/path",
    },
});
const _default = new gcp.datastream.Stream("default", {
    streamId: "my-stream",
    desiredState: "NOT_STARTED",
    location: "us-central1",
    displayName: "my stream",
    labels: {
        key: "value",
    },
    sourceConfig: {
        sourceConnectionProfile: sourceConnectionProfile.id,
        mysqlSourceConfig: {
            includeObjects: {
                mysqlDatabases: [{
                    database: "my-database",
                    mysqlTables: [
                        {
                            table: "includedTable",
                            mysqlColumns: [{
                                column: "includedColumn",
                                dataType: "VARCHAR",
                                collation: "utf8mb4",
                                primaryKey: false,
                                nullable: false,
                                ordinalPosition: 0,
                            }],
                        },
                        {
                            table: "includedTable_2",
                        },
                    ],
                }],
            },
            excludeObjects: {
                mysqlDatabases: [{
                    database: "my-database",
                    mysqlTables: [{
                        table: "excludedTable",
                        mysqlColumns: [{
                            column: "excludedColumn",
                            dataType: "VARCHAR",
                            collation: "utf8mb4",
                            primaryKey: false,
                            nullable: false,
                            ordinalPosition: 0,
                        }],
                    }],
                }],
            },
            maxConcurrentCdcTasks: 5,
        },
    },
    destinationConfig: {
        destinationConnectionProfile: destinationConnectionProfile.id,
        gcsDestinationConfig: {
            path: "mydata",
            fileRotationMb: 200,
            fileRotationInterval: "60s",
            jsonFileFormat: {
                schemaFileFormat: "NO_SCHEMA_FILE",
                compression: "GZIP",
            },
        },
    },
    backfillAll: {
        mysqlExcludedObjects: {
            mysqlDatabases: [{
                database: "my-database",
                mysqlTables: [{
                    table: "excludedTable",
                    mysqlColumns: [{
                        column: "excludedColumn",
                        dataType: "VARCHAR",
                        collation: "utf8mb4",
                        primaryKey: false,
                        nullable: false,
                        ordinalPosition: 0,
                    }],
                }],
            }],
        },
    },
    customerManagedEncryptionKey: "kms-name",
}, {
    dependsOn: [keyUser],
});
import pulumi
import pulumi_gcp as gcp
import pulumi_random as random

project = gcp.organizations.get_project()
instance = gcp.sql.DatabaseInstance("instance",
    name="my-instance",
    database_version="MYSQL_8_0",
    region="us-central1",
    settings={
        "tier": "db-f1-micro",
        "backup_configuration": {
            "enabled": True,
            "binary_log_enabled": True,
        },
        "ip_configuration": {
            "authorized_networks": [
                {
                    "value": "34.71.242.81",
                },
                {
                    "value": "34.72.28.29",
                },
                {
                    "value": "34.67.6.157",
                },
                {
                    "value": "34.67.234.134",
                },
                {
                    "value": "34.72.239.218",
                },
            ],
        },
    },
    deletion_protection=True)
db = gcp.sql.Database("db",
    instance=instance.name,
    name="db")
# Generate the database user's password with the random provider.
pwd = random.RandomPassword("pwd",
    length=16,
    special=False)
user = gcp.sql.User("user",
    name="user",
    instance=instance.name,
    host="%",
    password=pwd.result)
source_connection_profile = gcp.datastream.ConnectionProfile("source_connection_profile",
    display_name="Source connection profile",
    location="us-central1",
    connection_profile_id="source-profile",
    mysql_profile={
        "hostname": instance.public_ip_address,
        "username": user.name,
        "password": user.password,
    })
bucket = gcp.storage.Bucket("bucket",
    name="my-bucket",
    location="US",
    uniform_bucket_level_access=True)
viewer = gcp.storage.BucketIAMMember("viewer",
    bucket=bucket.name,
    role="roles/storage.objectViewer",
    member=f"serviceAccount:service-{project.number}@gcp-sa-datastream.iam.gserviceaccount.com")
creator = gcp.storage.BucketIAMMember("creator",
    bucket=bucket.name,
    role="roles/storage.objectCreator",
    member=f"serviceAccount:service-{project.number}@gcp-sa-datastream.iam.gserviceaccount.com")
reader = gcp.storage.BucketIAMMember("reader",
    bucket=bucket.name,
    role="roles/storage.legacyBucketReader",
    member=f"serviceAccount:service-{project.number}@gcp-sa-datastream.iam.gserviceaccount.com")
key_user = gcp.kms.CryptoKeyIAMMember("key_user",
    crypto_key_id="kms-name",
    role="roles/cloudkms.cryptoKeyEncrypterDecrypter",
    member=f"serviceAccount:service-{project.number}@gcp-sa-datastream.iam.gserviceaccount.com")
destination_connection_profile = gcp.datastream.ConnectionProfile("destination_connection_profile",
    display_name="Connection profile",
    location="us-central1",
    connection_profile_id="destination-profile",
    gcs_profile={
        "bucket": bucket.name,
        "root_path": "/path",
    })
default = gcp.datastream.Stream("default",
    stream_id="my-stream",
    desired_state="NOT_STARTED",
    location="us-central1",
    display_name="my stream",
    labels={
        "key": "value",
    },
    source_config={
        "source_connection_profile": source_connection_profile.id,
        "mysql_source_config": {
            "include_objects": {
                "mysql_databases": [{
                    "database": "my-database",
                    "mysql_tables": [
                        {
                            "table": "includedTable",
                            "mysql_columns": [{
                                "column": "includedColumn",
                                "data_type": "VARCHAR",
                                "collation": "utf8mb4",
                                "primary_key": False,
                                "nullable": False,
                                "ordinal_position": 0,
                            }],
                        },
                        {
                            "table": "includedTable_2",
                        },
                    ],
                }],
            },
            "exclude_objects": {
                "mysql_databases": [{
                    "database": "my-database",
                    "mysql_tables": [{
                        "table": "excludedTable",
                        "mysql_columns": [{
                            "column": "excludedColumn",
                            "data_type": "VARCHAR",
                            "collation": "utf8mb4",
                            "primary_key": False,
                            "nullable": False,
                            "ordinal_position": 0,
                        }],
                    }],
                }],
            },
            "max_concurrent_cdc_tasks": 5,
        },
    },
    destination_config={
        "destination_connection_profile": destination_connection_profile.id,
        "gcs_destination_config": {
            "path": "mydata",
            "file_rotation_mb": 200,
            "file_rotation_interval": "60s",
            "json_file_format": {
                "schema_file_format": "NO_SCHEMA_FILE",
                "compression": "GZIP",
            },
        },
    },
    backfill_all={
        "mysql_excluded_objects": {
            "mysql_databases": [{
                "database": "my-database",
                "mysql_tables": [{
                    "table": "excludedTable",
                    "mysql_columns": [{
                        "column": "excludedColumn",
                        "data_type": "VARCHAR",
                        "collation": "utf8mb4",
                        "primary_key": False,
                        "nullable": False,
                        "ordinal_position": 0,
                    }],
                }],
            }],
        },
    },
    customer_managed_encryption_key="kms-name",
    opts = pulumi.ResourceOptions(depends_on=[key_user]))
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/kms"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/storage"
	"github.com/pulumi/pulumi-random/sdk/v4/go/random"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
			Name:            pulumi.String("my-instance"),
			DatabaseVersion: pulumi.String("MYSQL_8_0"),
			Region:          pulumi.String("us-central1"),
			Settings: &sql.DatabaseInstanceSettingsArgs{
				Tier: pulumi.String("db-f1-micro"),
				BackupConfiguration: &sql.DatabaseInstanceSettingsBackupConfigurationArgs{
					Enabled:          pulumi.Bool(true),
					BinaryLogEnabled: pulumi.Bool(true),
				},
				IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
					AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.71.242.81"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.28.29"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.6.157"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.234.134"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.239.218"),
						},
					},
				},
			},
			DeletionProtection: pulumi.Bool(true),
		})
		if err != nil {
			return err
		}
		_, err = sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
			Instance: instance.Name,
			Name:     pulumi.String("db"),
		})
		if err != nil {
			return err
		}
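		// Generate the database user's password with the random provider.
		pwd, err := random.NewRandomPassword(ctx, "pwd", &random.RandomPasswordArgs{
			Length:  pulumi.Int(16),
			Special: pulumi.Bool(false),
		})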
		if err != nil {
			return err
		}
		user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
			Name:     pulumi.String("user"),
			Instance: instance.Name,
			Host:     pulumi.String("%"),
			Password: pwd.Result,
		})
		if err != nil {
			return err
		}
		sourceConnectionProfile, err := datastream.NewConnectionProfile(ctx, "source_connection_profile", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("Source connection profile"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("source-profile"),
			MysqlProfile: &datastream.ConnectionProfileMysqlProfileArgs{
				Hostname: instance.PublicIpAddress,
				Username: user.Name,
				Password: user.Password,
			},
		})
		if err != nil {
			return err
		}
		bucket, err := storage.NewBucket(ctx, "bucket", &storage.BucketArgs{
			Name:                     pulumi.String("my-bucket"),
			Location:                 pulumi.String("US"),
			UniformBucketLevelAccess: pulumi.Bool(true),
		})
		if err != nil {
			return err
		}
		_, err = storage.NewBucketIAMMember(ctx, "viewer", &storage.BucketIAMMemberArgs{
			Bucket: bucket.Name,
			Role:   pulumi.String("roles/storage.objectViewer"),
			Member: pulumi.Sprintf("serviceAccount:service-%v@gcp-sa-datastream.iam.gserviceaccount.com", project.Number),
		})
		if err != nil {
			return err
		}
		_, err = storage.NewBucketIAMMember(ctx, "creator", &storage.BucketIAMMemberArgs{
			Bucket: bucket.Name,
			Role:   pulumi.String("roles/storage.objectCreator"),
			Member: pulumi.Sprintf("serviceAccount:service-%v@gcp-sa-datastream.iam.gserviceaccount.com", project.Number),
		})
		if err != nil {
			return err
		}
		_, err = storage.NewBucketIAMMember(ctx, "reader", &storage.BucketIAMMemberArgs{
			Bucket: bucket.Name,
			Role:   pulumi.String("roles/storage.legacyBucketReader"),
			Member: pulumi.Sprintf("serviceAccount:service-%v@gcp-sa-datastream.iam.gserviceaccount.com", project.Number),
		})
		if err != nil {
			return err
		}
		keyUser, err := kms.NewCryptoKeyIAMMember(ctx, "key_user", &kms.CryptoKeyIAMMemberArgs{
			CryptoKeyId: pulumi.String("kms-name"),
			Role:        pulumi.String("roles/cloudkms.cryptoKeyEncrypterDecrypter"),
			Member:      pulumi.Sprintf("serviceAccount:service-%v@gcp-sa-datastream.iam.gserviceaccount.com", project.Number),
		})
		if err != nil {
			return err
		}
		destinationConnectionProfile, err := datastream.NewConnectionProfile(ctx, "destination_connection_profile", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("Connection profile"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("destination-profile"),
			GcsProfile: &datastream.ConnectionProfileGcsProfileArgs{
				Bucket:   bucket.Name,
				RootPath: pulumi.String("/path"),
			},
		})
		if err != nil {
			return err
		}
		_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
			StreamId:     pulumi.String("my-stream"),
			DesiredState: pulumi.String("NOT_STARTED"),
			Location:     pulumi.String("us-central1"),
			DisplayName:  pulumi.String("my stream"),
			Labels: pulumi.StringMap{
				"key": pulumi.String("value"),
			},
			SourceConfig: &datastream.StreamSourceConfigArgs{
				SourceConnectionProfile: sourceConnectionProfile.ID(),
				MysqlSourceConfig: &datastream.StreamSourceConfigMysqlSourceConfigArgs{
					IncludeObjects: &datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs{
						MysqlDatabases: datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArray{
							&datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArgs{
								Database: pulumi.String("my-database"),
								MysqlTables: datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArray{
									&datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs{
										Table: pulumi.String("includedTable"),
										MysqlColumns: datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableMysqlColumnArray{
											&datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs{
												Column:          pulumi.String("includedColumn"),
												DataType:        pulumi.String("VARCHAR"),
												Collation:       pulumi.String("utf8mb4"),
												PrimaryKey:      pulumi.Bool(false),
												Nullable:        pulumi.Bool(false),
												OrdinalPosition: pulumi.Int(0),
											},
										},
									},
									&datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs{
										Table: pulumi.String("includedTable_2"),
									},
								},
							},
						},
					},
					ExcludeObjects: &datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsArgs{
						MysqlDatabases: datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseArray{
							&datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseArgs{
								Database: pulumi.String("my-database"),
								MysqlTables: datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableArray{
									&datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableArgs{
										Table: pulumi.String("excludedTable"),
										MysqlColumns: datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableMysqlColumnArray{
											&datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs{
												Column:          pulumi.String("excludedColumn"),
												DataType:        pulumi.String("VARCHAR"),
												Collation:       pulumi.String("utf8mb4"),
												PrimaryKey:      pulumi.Bool(false),
												Nullable:        pulumi.Bool(false),
												OrdinalPosition: pulumi.Int(0),
											},
										},
									},
								},
							},
						},
					},
					MaxConcurrentCdcTasks: pulumi.Int(5),
				},
			},
			DestinationConfig: &datastream.StreamDestinationConfigArgs{
				DestinationConnectionProfile: destinationConnectionProfile.ID(),
				GcsDestinationConfig: &datastream.StreamDestinationConfigGcsDestinationConfigArgs{
					Path:                 pulumi.String("mydata"),
					FileRotationMb:       pulumi.Int(200),
					FileRotationInterval: pulumi.String("60s"),
					JsonFileFormat: &datastream.StreamDestinationConfigGcsDestinationConfigJsonFileFormatArgs{
						SchemaFileFormat: pulumi.String("NO_SCHEMA_FILE"),
						Compression:      pulumi.String("GZIP"),
					},
				},
			},
			BackfillAll: &datastream.StreamBackfillAllArgs{
				MysqlExcludedObjects: &datastream.StreamBackfillAllMysqlExcludedObjectsArgs{
					MysqlDatabases: datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseArray{
						&datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseArgs{
							Database: pulumi.String("my-database"),
							MysqlTables: datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableArray{
								&datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableArgs{
									Table: pulumi.String("excludedTable"),
									MysqlColumns: datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableMysqlColumnArray{
										&datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableMysqlColumnArgs{
											Column:          pulumi.String("excludedColumn"),
											DataType:        pulumi.String("VARCHAR"),
											Collation:       pulumi.String("utf8mb4"),
											PrimaryKey:      pulumi.Bool(false),
											Nullable:        pulumi.Bool(false),
											OrdinalPosition: pulumi.Int(0),
										},
									},
								},
							},
						},
					},
				},
			},
			CustomerManagedEncryptionKey: pulumi.String("kms-name"),
		}, pulumi.DependsOn([]pulumi.Resource{
			keyUser,
		}))
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
using Random = Pulumi.Random;

return await Deployment.RunAsync(() => 
{
    var project = Gcp.Organizations.GetProject.Invoke();

    var instance = new Gcp.Sql.DatabaseInstance("instance", new()
    {
        Name = "my-instance",
        DatabaseVersion = "MYSQL_8_0",
        Region = "us-central1",
        Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
        {
            Tier = "db-f1-micro",
            BackupConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsBackupConfigurationArgs
            {
                Enabled = true,
                BinaryLogEnabled = true,
            },
            IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
            {
                AuthorizedNetworks = new[]
                {
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.71.242.81",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.28.29",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.6.157",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.234.134",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.239.218",
                    },
                },
            },
        },
        DeletionProtection = true,
    });

    var db = new Gcp.Sql.Database("db", new()
    {
        Instance = instance.Name,
        Name = "db",
    });

    // Generate the database user's password with the random provider.
    var pwd = new Random.RandomPassword("pwd", new()
    {
        Length = 16,
        Special = false,
    });

    var user = new Gcp.Sql.User("user", new()
    {
        Name = "user",
        Instance = instance.Name,
        Host = "%",
        Password = pwd.Result,
    });

    var sourceConnectionProfile = new Gcp.Datastream.ConnectionProfile("source_connection_profile", new()
    {
        DisplayName = "Source connection profile",
        Location = "us-central1",
        ConnectionProfileId = "source-profile",
        MysqlProfile = new Gcp.Datastream.Inputs.ConnectionProfileMysqlProfileArgs
        {
            Hostname = instance.PublicIpAddress,
            Username = user.Name,
            Password = user.Password,
        },
    });

    var bucket = new Gcp.Storage.Bucket("bucket", new()
    {
        Name = "my-bucket",
        Location = "US",
        UniformBucketLevelAccess = true,
    });

    var viewer = new Gcp.Storage.BucketIAMMember("viewer", new()
    {
        Bucket = bucket.Name,
        Role = "roles/storage.objectViewer",
        Member = Output.Format($"serviceAccount:service-{project.Apply(getProjectResult => getProjectResult.Number)}@gcp-sa-datastream.iam.gserviceaccount.com"),
    });

    var creator = new Gcp.Storage.BucketIAMMember("creator", new()
    {
        Bucket = bucket.Name,
        Role = "roles/storage.objectCreator",
        Member = Output.Format($"serviceAccount:service-{project.Apply(getProjectResult => getProjectResult.Number)}@gcp-sa-datastream.iam.gserviceaccount.com"),
    });

    var reader = new Gcp.Storage.BucketIAMMember("reader", new()
    {
        Bucket = bucket.Name,
        Role = "roles/storage.legacyBucketReader",
        Member = Output.Format($"serviceAccount:service-{project.Apply(getProjectResult => getProjectResult.Number)}@gcp-sa-datastream.iam.gserviceaccount.com"),
    });

    var keyUser = new Gcp.Kms.CryptoKeyIAMMember("key_user", new()
    {
        CryptoKeyId = "kms-name",
        Role = "roles/cloudkms.cryptoKeyEncrypterDecrypter",
        Member = Output.Format($"serviceAccount:service-{project.Apply(getProjectResult => getProjectResult.Number)}@gcp-sa-datastream.iam.gserviceaccount.com"),
    });

    var destinationConnectionProfile = new Gcp.Datastream.ConnectionProfile("destination_connection_profile", new()
    {
        DisplayName = "Connection profile",
        Location = "us-central1",
        ConnectionProfileId = "destination-profile",
        GcsProfile = new Gcp.Datastream.Inputs.ConnectionProfileGcsProfileArgs
        {
            Bucket = bucket.Name,
            RootPath = "/path",
        },
    });

    var @default = new Gcp.Datastream.Stream("default", new()
    {
        StreamId = "my-stream",
        DesiredState = "NOT_STARTED",
        Location = "us-central1",
        DisplayName = "my stream",
        Labels = 
        {
            { "key", "value" },
        },
        SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
        {
            SourceConnectionProfile = sourceConnectionProfile.Id,
            MysqlSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigArgs
            {
                IncludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs
                {
                    MysqlDatabases = new[]
                    {
                        new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArgs
                        {
                            Database = "my-database",
                            MysqlTables = new[]
                            {
                                new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs
                                {
                                    Table = "includedTable",
                                    MysqlColumns = new[]
                                    {
                                        new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs
                                        {
                                            Column = "includedColumn",
                                            DataType = "VARCHAR",
                                            Collation = "utf8mb4",
                                            PrimaryKey = false,
                                            Nullable = false,
                                            OrdinalPosition = 0,
                                        },
                                    },
                                },
                                new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs
                                {
                                    Table = "includedTable_2",
                                },
                            },
                        },
                    },
                },
                ExcludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsArgs
                {
                    MysqlDatabases = new[]
                    {
                        new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseArgs
                        {
                            Database = "my-database",
                            MysqlTables = new[]
                            {
                                new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableArgs
                                {
                                    Table = "excludedTable",
                                    MysqlColumns = new[]
                                    {
                                        new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs
                                        {
                                            Column = "excludedColumn",
                                            DataType = "VARCHAR",
                                            Collation = "utf8mb4",
                                            PrimaryKey = false,
                                            Nullable = false,
                                            OrdinalPosition = 0,
                                        },
                                    },
                                },
                            },
                        },
                    },
                },
                MaxConcurrentCdcTasks = 5,
            },
        },
        DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
        {
            DestinationConnectionProfile = destinationConnectionProfile.Id,
            GcsDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigGcsDestinationConfigArgs
            {
                Path = "mydata",
                FileRotationMb = 200,
                FileRotationInterval = "60s",
                JsonFileFormat = new Gcp.Datastream.Inputs.StreamDestinationConfigGcsDestinationConfigJsonFileFormatArgs
                {
                    SchemaFileFormat = "NO_SCHEMA_FILE",
                    Compression = "GZIP",
                },
            },
        },
        BackfillAll = new Gcp.Datastream.Inputs.StreamBackfillAllArgs
        {
            MysqlExcludedObjects = new Gcp.Datastream.Inputs.StreamBackfillAllMysqlExcludedObjectsArgs
            {
                MysqlDatabases = new[]
                {
                    new Gcp.Datastream.Inputs.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseArgs
                    {
                        Database = "my-database",
                        MysqlTables = new[]
                        {
                            new Gcp.Datastream.Inputs.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableArgs
                            {
                                Table = "excludedTable",
                                MysqlColumns = new[]
                                {
                                    new Gcp.Datastream.Inputs.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableMysqlColumnArgs
                                    {
                                        Column = "excludedColumn",
                                        DataType = "VARCHAR",
                                        Collation = "utf8mb4",
                                        PrimaryKey = false,
                                        Nullable = false,
                                        OrdinalPosition = 0,
                                    },
                                },
                            },
                        },
                    },
                },
            },
        },
        CustomerManagedEncryptionKey = "kms-name",
    }, new CustomResourceOptions
    {
        DependsOn =
        {
            keyUser,
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsBackupConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import com.pulumi.random.Password;
import com.pulumi.random.PasswordArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileMysqlProfileArgs;
import com.pulumi.gcp.storage.Bucket;
import com.pulumi.gcp.storage.BucketArgs;
import com.pulumi.gcp.storage.BucketIAMMember;
import com.pulumi.gcp.storage.BucketIAMMemberArgs;
import com.pulumi.gcp.kms.CryptoKeyIAMMember;
import com.pulumi.gcp.kms.CryptoKeyIAMMemberArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileGcsProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigArgs;
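import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigGcsDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigGcsDestinationConfigJsonFileFormatArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllMysqlExcludedObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableMysqlColumnArgs;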
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

        var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
            .name("my-instance")
            .databaseVersion("MYSQL_8_0")
            .region("us-central1")
            .settings(DatabaseInstanceSettingsArgs.builder()
                .tier("db-f1-micro")
                .backupConfiguration(DatabaseInstanceSettingsBackupConfigurationArgs.builder()
                    .enabled(true)
                    .binaryLogEnabled(true)
                    .build())
                .ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
                    .authorizedNetworks(
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.71.242.81")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.28.29")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.6.157")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.234.134")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.239.218")
                            .build())
                    .build())
                .build())
            .deletionProtection(true)
            .build());

        var db = new Database("db", DatabaseArgs.builder()
            .instance(instance.name())
            .name("db")
            .build());

        var pwd = new Password("pwd", PasswordArgs.builder()
            .length(16)
            .special(false)
            .build());

        var user = new User("user", UserArgs.builder()
            .name("user")
            .instance(instance.name())
            .host("%")
            .password(pwd.result())
            .build());

        var sourceConnectionProfile = new ConnectionProfile("sourceConnectionProfile", ConnectionProfileArgs.builder()
            .displayName("Source connection profile")
            .location("us-central1")
            .connectionProfileId("source-profile")
            .mysqlProfile(ConnectionProfileMysqlProfileArgs.builder()
                .hostname(instance.publicIpAddress())
                .username(user.name())
                .password(user.password())
                .build())
            .build());

        var bucket = new Bucket("bucket", BucketArgs.builder()
            .name("my-bucket")
            .location("US")
            .uniformBucketLevelAccess(true)
            .build());

        var viewer = new BucketIAMMember("viewer", BucketIAMMemberArgs.builder()
            .bucket(bucket.name())
            .role("roles/storage.objectViewer")
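            // project is an Output, so resolve it before formatting the service account email
            .member(project.applyValue(p -> String.format("serviceAccount:service-%s@gcp-sa-datastream.iam.gserviceaccount.com", p.number())))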
            .build());

        var creator = new BucketIAMMember("creator", BucketIAMMemberArgs.builder()
            .bucket(bucket.name())
            .role("roles/storage.objectCreator")
            .member(project.applyValue(p -> String.format("serviceAccount:service-%s@gcp-sa-datastream.iam.gserviceaccount.com", p.number())))
            .build());

        var reader = new BucketIAMMember("reader", BucketIAMMemberArgs.builder()
            .bucket(bucket.name())
            .role("roles/storage.legacyBucketReader")
            .member(project.applyValue(p -> String.format("serviceAccount:service-%s@gcp-sa-datastream.iam.gserviceaccount.com", p.number())))
            .build());

        var keyUser = new CryptoKeyIAMMember("keyUser", CryptoKeyIAMMemberArgs.builder()
            .cryptoKeyId("kms-name")
            .role("roles/cloudkms.cryptoKeyEncrypterDecrypter")
            .member(project.applyValue(p -> String.format("serviceAccount:service-%s@gcp-sa-datastream.iam.gserviceaccount.com", p.number())))
            .build());

        var destinationConnectionProfile = new ConnectionProfile("destinationConnectionProfile", ConnectionProfileArgs.builder()
            .displayName("Connection profile")
            .location("us-central1")
            .connectionProfileId("destination-profile")
            .gcsProfile(ConnectionProfileGcsProfileArgs.builder()
                .bucket(bucket.name())
                .rootPath("/path")
                .build())
            .build());

        var default_ = new Stream("default", StreamArgs.builder()
            .streamId("my-stream")
            .desiredState("NOT_STARTED")
            .location("us-central1")
            .displayName("my stream")
            .labels(Map.of("key", "value"))
            .sourceConfig(StreamSourceConfigArgs.builder()
                .sourceConnectionProfile(sourceConnectionProfile.id())
                .mysqlSourceConfig(StreamSourceConfigMysqlSourceConfigArgs.builder()
                    .includeObjects(StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs.builder()
                        .mysqlDatabases(StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArgs.builder()
                            .database("my-database")
                            .mysqlTables(
                                StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs.builder()
                                    .table("includedTable")
                                    .mysqlColumns(StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs.builder()
                                        .column("includedColumn")
                                        .dataType("VARCHAR")
                                        .collation("utf8mb4")
                                        .primaryKey(false)
                                        .nullable(false)
                                        .ordinalPosition(0)
                                        .build())
                                    .build(),
                                StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs.builder()
                                    .table("includedTable_2")
                                    .build())
                            .build())
                        .build())
                    .excludeObjects(StreamSourceConfigMysqlSourceConfigExcludeObjectsArgs.builder()
                        .mysqlDatabases(StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseArgs.builder()
                            .database("my-database")
                            .mysqlTables(StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableArgs.builder()
                                .table("excludedTable")
                                .mysqlColumns(StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs.builder()
                                    .column("excludedColumn")
                                    .dataType("VARCHAR")
                                    .collation("utf8mb4")
                                    .primaryKey(false)
                                    .nullable(false)
                                    .ordinalPosition(0)
                                    .build())
                                .build())
                            .build())
                        .build())
                    .maxConcurrentCdcTasks(5)
                    .build())
                .build())
            .destinationConfig(StreamDestinationConfigArgs.builder()
                .destinationConnectionProfile(destinationConnectionProfile.id())
                .gcsDestinationConfig(StreamDestinationConfigGcsDestinationConfigArgs.builder()
                    .path("mydata")
                    .fileRotationMb(200)
                    .fileRotationInterval("60s")
                    .jsonFileFormat(StreamDestinationConfigGcsDestinationConfigJsonFileFormatArgs.builder()
                        .schemaFileFormat("NO_SCHEMA_FILE")
                        .compression("GZIP")
                        .build())
                    .build())
                .build())
            .backfillAll(StreamBackfillAllArgs.builder()
                .mysqlExcludedObjects(StreamBackfillAllMysqlExcludedObjectsArgs.builder()
                    .mysqlDatabases(StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseArgs.builder()
                        .database("my-database")
                        .mysqlTables(StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableArgs.builder()
                            .table("excludedTable")
                            .mysqlColumns(StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableMysqlColumnArgs.builder()
                                .column("excludedColumn")
                                .dataType("VARCHAR")
                                .collation("utf8mb4")
                                .primaryKey(false)
                                .nullable(false)
                                .ordinalPosition(0)
                                .build())
                            .build())
                        .build())
                    .build())
                .build())
            .customerManagedEncryptionKey("kms-name")
            .build(), CustomResourceOptions.builder()
                .dependsOn(keyUser)
                .build());

    }
}
resources:
  instance:
    type: gcp:sql:DatabaseInstance
    properties:
      name: my-instance
      databaseVersion: MYSQL_8_0
      region: us-central1
      settings:
        tier: db-f1-micro
        backupConfiguration:
          enabled: true
          binaryLogEnabled: true
        ipConfiguration:
          authorizedNetworks:
            - value: 34.71.242.81
            - value: 34.72.28.29
            - value: 34.67.6.157
            - value: 34.67.234.134
            - value: 34.72.239.218
      deletionProtection: true
  db:
    type: gcp:sql:Database
    properties:
      instance: ${instance.name}
      name: db
  pwd:
    type: random:Password
    properties:
      length: 16
      special: false
  user:
    type: gcp:sql:User
    properties:
      name: user
      instance: ${instance.name}
      host: '%'
      password: ${pwd.result}
  sourceConnectionProfile:
    type: gcp:datastream:ConnectionProfile
    name: source_connection_profile
    properties:
      displayName: Source connection profile
      location: us-central1
      connectionProfileId: source-profile
      mysqlProfile:
        hostname: ${instance.publicIpAddress}
        username: ${user.name}
        password: ${user.password}
  bucket:
    type: gcp:storage:Bucket
    properties:
      name: my-bucket
      location: US
      uniformBucketLevelAccess: true
  viewer:
    type: gcp:storage:BucketIAMMember
    properties:
      bucket: ${bucket.name}
      role: roles/storage.objectViewer
      member: serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com
  creator:
    type: gcp:storage:BucketIAMMember
    properties:
      bucket: ${bucket.name}
      role: roles/storage.objectCreator
      member: serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com
  reader:
    type: gcp:storage:BucketIAMMember
    properties:
      bucket: ${bucket.name}
      role: roles/storage.legacyBucketReader
      member: serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com
  keyUser:
    type: gcp:kms:CryptoKeyIAMMember
    name: key_user
    properties:
      cryptoKeyId: kms-name
      role: roles/cloudkms.cryptoKeyEncrypterDecrypter
      member: serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com
  destinationConnectionProfile:
    type: gcp:datastream:ConnectionProfile
    name: destination_connection_profile
    properties:
      displayName: Connection profile
      location: us-central1
      connectionProfileId: destination-profile
      gcsProfile:
        bucket: ${bucket.name}
        rootPath: /path
  default:
    type: gcp:datastream:Stream
    properties:
      streamId: my-stream
      desiredState: NOT_STARTED
      location: us-central1
      displayName: my stream
      labels:
        key: value
      sourceConfig:
        sourceConnectionProfile: ${sourceConnectionProfile.id}
        mysqlSourceConfig:
          includeObjects:
            mysqlDatabases:
              - database: my-database
                mysqlTables:
                  - table: includedTable
                    mysqlColumns:
                      - column: includedColumn
                        dataType: VARCHAR
                        collation: utf8mb4
                        primaryKey: false
                        nullable: false
                        ordinalPosition: 0
                  - table: includedTable_2
          excludeObjects:
            mysqlDatabases:
              - database: my-database
                mysqlTables:
                  - table: excludedTable
                    mysqlColumns:
                      - column: excludedColumn
                        dataType: VARCHAR
                        collation: utf8mb4
                        primaryKey: false
                        nullable: false
                        ordinalPosition: 0
          maxConcurrentCdcTasks: 5
      destinationConfig:
        destinationConnectionProfile: ${destinationConnectionProfile.id}
        gcsDestinationConfig:
          path: mydata
          fileRotationMb: 200
          fileRotationInterval: 60s
          jsonFileFormat:
            schemaFileFormat: NO_SCHEMA_FILE
            compression: GZIP
      backfillAll:
        mysqlExcludedObjects:
          mysqlDatabases:
            - database: my-database
              mysqlTables:
                - table: excludedTable
                  mysqlColumns:
                    - column: excludedColumn
                      dataType: VARCHAR
                      collation: utf8mb4
                      primaryKey: false
                      nullable: false
                      ordinalPosition: 0
      customerManagedEncryptionKey: kms-name
    options:
      dependsOn:
        - ${keyUser}
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}

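The sourceConfig references the MySQL connection profile and uses includeObjects and excludeObjects to filter databases, tables, and columns; maxConcurrentCdcTasks caps the number of parallel change data capture tasks. The destinationConfig points to the Cloud Storage connection profile and sets the output path, the file rotation thresholds (200 MB or 60 seconds), and gzip-compressed JSON output. The backfillAll property backfills historical data for every included object except those listed in mysqlExcludedObjects, and customerManagedEncryptionKey encrypts stream data with the named Cloud KMS key. The stream depends on the keyUser IAM binding so that the Datastream service account can use that key before the stream is created.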
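
The example repeats the same exclusion list under both excludeObjects and backfillAll.mysqlExcludedObjects. As a minimal sketch, assuming you prefer to declare that list once, a small helper can build the nested structure from a compact map. The names ExclusionSpec and toMysqlObjects are hypothetical and not part of @pulumi/gcp; only the output property names (mysqlDatabases, database, mysqlTables, table, mysqlColumns, column) come from the example, and the optional column fields (dataType, collation, and so on) are left out.

```typescript
// Hypothetical helper, not part of @pulumi/gcp.
// Maps database name -> table name -> list of column names.
type ExclusionSpec = Record<string, Record<string, string[]>>;

interface MysqlColumn { column: string }
interface MysqlTable { table: string; mysqlColumns?: MysqlColumn[] }
interface MysqlDatabase { database: string; mysqlTables?: MysqlTable[] }
interface MysqlObjects { mysqlDatabases: MysqlDatabase[] }

// Expands the compact spec into the nested shape used by the example's
// excludeObjects and backfillAll.mysqlExcludedObjects properties.
function toMysqlObjects(spec: ExclusionSpec): MysqlObjects {
    return {
        mysqlDatabases: Object.entries(spec).map(([database, tables]) => ({
            database,
            mysqlTables: Object.entries(tables).map(([table, columns]) => ({
                table,
                // With no columns listed, emit only the table name, as the
                // example does for includedTable_2.
                ...(columns.length > 0
                    ? { mysqlColumns: columns.map((column) => ({ column })) }
                    : {}),
            })),
        })),
    };
}

// Same exclusion as in the example, declared once.
const excluded = toMysqlObjects({
    "my-database": { excludedTable: ["excludedColumn"] },
});
```

Under these assumptions, the excluded value could be passed to both sourceConfig.mysqlSourceConfig.excludeObjects and backfillAll.mysqlExcludedObjects, so the two lists cannot drift apart.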

Stream PostgreSQL to BigQuery with replication slots

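Replicating PostgreSQL to BigQuery enables near-real-time analytics on operational data. Datastream reads changes through a PostgreSQL publication and a logical replication slot, both of which must already exist on the source database.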

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const source = new gcp.datastream.ConnectionProfile("source", {
    displayName: "Postgresql Source",
    location: "us-central1",
    connectionProfileId: "source-profile",
    postgresqlProfile: {
        hostname: "hostname",
        port: 5432,
        username: "user",
        password: "pass",
        database: "postgres",
    },
});
const destination = new gcp.datastream.ConnectionProfile("destination", {
    displayName: "BigQuery Destination",
    location: "us-central1",
    connectionProfileId: "destination-profile",
    bigqueryProfile: {},
});
const _default = new gcp.datastream.Stream("default", {
    displayName: "Postgres to BigQuery",
    location: "us-central1",
    streamId: "my-stream",
    desiredState: "RUNNING",
    sourceConfig: {
        sourceConnectionProfile: source.id,
        postgresqlSourceConfig: {
            maxConcurrentBackfillTasks: 12,
            publication: "publication",
            replicationSlot: "replication_slot",
            includeObjects: {
                postgresqlSchemas: [{
                    schema: "schema",
                    postgresqlTables: [{
                        table: "table",
                        postgresqlColumns: [{
                            column: "column",
                        }],
                    }],
                }],
            },
            excludeObjects: {
                postgresqlSchemas: [{
                    schema: "schema",
                    postgresqlTables: [{
                        table: "table",
                        postgresqlColumns: [{
                            column: "column",
                        }],
                    }],
                }],
            },
        },
    },
    destinationConfig: {
        destinationConnectionProfile: destination.id,
        bigqueryDestinationConfig: {
            dataFreshness: "900s",
            sourceHierarchyDatasets: {
                datasetTemplate: {
                    location: "us-central1",
                },
            },
        },
    },
    backfillAll: {
        postgresqlExcludedObjects: {
            postgresqlSchemas: [{
                schema: "schema",
                postgresqlTables: [{
                    table: "table",
                    postgresqlColumns: [{
                        column: "column",
                    }],
                }],
            }],
        },
    },
});
import pulumi
import pulumi_gcp as gcp

source = gcp.datastream.ConnectionProfile("source",
    display_name="Postgresql Source",
    location="us-central1",
    connection_profile_id="source-profile",
    postgresql_profile={
        "hostname": "hostname",
        "port": 5432,
        "username": "user",
        "password": "pass",
        "database": "postgres",
    })
destination = gcp.datastream.ConnectionProfile("destination",
    display_name="BigQuery Destination",
    location="us-central1",
    connection_profile_id="destination-profile",
    bigquery_profile={})
default = gcp.datastream.Stream("default",
    display_name="Postgres to BigQuery",
    location="us-central1",
    stream_id="my-stream",
    desired_state="RUNNING",
    source_config={
        "source_connection_profile": source.id,
        "postgresql_source_config": {
            "max_concurrent_backfill_tasks": 12,
            "publication": "publication",
            "replication_slot": "replication_slot",
            "include_objects": {
                "postgresql_schemas": [{
                    "schema": "schema",
                    "postgresql_tables": [{
                        "table": "table",
                        "postgresql_columns": [{
                            "column": "column",
                        }],
                    }],
                }],
            },
            "exclude_objects": {
                "postgresql_schemas": [{
                    "schema": "schema",
                    "postgresql_tables": [{
                        "table": "table",
                        "postgresql_columns": [{
                            "column": "column",
                        }],
                    }],
                }],
            },
        },
    },
    destination_config={
        "destination_connection_profile": destination.id,
        "bigquery_destination_config": {
            "data_freshness": "900s",
            "source_hierarchy_datasets": {
                "dataset_template": {
                    "location": "us-central1",
                },
            },
        },
    },
    backfill_all={
        "postgresql_excluded_objects": {
            "postgresql_schemas": [{
                "schema": "schema",
                "postgresql_tables": [{
                    "table": "table",
                    "postgresql_columns": [{
                        "column": "column",
                    }],
                }],
            }],
        },
    })
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		source, err := datastream.NewConnectionProfile(ctx, "source", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("Postgresql Source"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("source-profile"),
			PostgresqlProfile: &datastream.ConnectionProfilePostgresqlProfileArgs{
				Hostname: pulumi.String("hostname"),
				Port:     pulumi.Int(5432),
				Username: pulumi.String("user"),
				Password: pulumi.String("pass"),
				Database: pulumi.String("postgres"),
			},
		})
		if err != nil {
			return err
		}
		destination, err := datastream.NewConnectionProfile(ctx, "destination", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("BigQuery Destination"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("destination-profile"),
			BigqueryProfile:     &datastream.ConnectionProfileBigqueryProfileArgs{},
		})
		if err != nil {
			return err
		}
		_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
			DisplayName:  pulumi.String("Postgres to BigQuery"),
			Location:     pulumi.String("us-central1"),
			StreamId:     pulumi.String("my-stream"),
			DesiredState: pulumi.String("RUNNING"),
			SourceConfig: &datastream.StreamSourceConfigArgs{
				SourceConnectionProfile: source.ID(),
				PostgresqlSourceConfig: &datastream.StreamSourceConfigPostgresqlSourceConfigArgs{
					MaxConcurrentBackfillTasks: pulumi.Int(12),
					Publication:                pulumi.String("publication"),
					ReplicationSlot:            pulumi.String("replication_slot"),
					IncludeObjects: &datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsArgs{
						PostgresqlSchemas: datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaArray{
							&datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaArgs{
								Schema: pulumi.String("schema"),
								PostgresqlTables: datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTableArray{
									&datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTableArgs{
										Table: pulumi.String("table"),
										PostgresqlColumns: datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArray{
											&datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs{
												Column: pulumi.String("column"),
											},
										},
									},
								},
							},
						},
					},
					ExcludeObjects: &datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsArgs{
						PostgresqlSchemas: datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaArray{
							&datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaArgs{
								Schema: pulumi.String("schema"),
								PostgresqlTables: datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTableArray{
									&datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTableArgs{
										Table: pulumi.String("table"),
										PostgresqlColumns: datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArray{
											&datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs{
												Column: pulumi.String("column"),
											},
										},
									},
								},
							},
						},
					},
				},
			},
			DestinationConfig: &datastream.StreamDestinationConfigArgs{
				DestinationConnectionProfile: destination.ID(),
				BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
					DataFreshness: pulumi.String("900s"),
					SourceHierarchyDatasets: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs{
						DatasetTemplate: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs{
							Location: pulumi.String("us-central1"),
						},
					},
				},
			},
			BackfillAll: &datastream.StreamBackfillAllArgs{
				PostgresqlExcludedObjects: &datastream.StreamBackfillAllPostgresqlExcludedObjectsArgs{
					PostgresqlSchemas: datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaArray{
						&datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaArgs{
							Schema: pulumi.String("schema"),
							PostgresqlTables: datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTableArray{
								&datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTableArgs{
									Table: pulumi.String("table"),
									PostgresqlColumns: datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArray{
										&datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs{
											Column: pulumi.String("column"),
										},
									},
								},
							},
						},
					},
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var source = new Gcp.Datastream.ConnectionProfile("source", new()
    {
        DisplayName = "Postgresql Source",
        Location = "us-central1",
        ConnectionProfileId = "source-profile",
        PostgresqlProfile = new Gcp.Datastream.Inputs.ConnectionProfilePostgresqlProfileArgs
        {
            Hostname = "hostname",
            Port = 5432,
            Username = "user",
            Password = "pass",
            Database = "postgres",
        },
    });

    var destination = new Gcp.Datastream.ConnectionProfile("destination", new()
    {
        DisplayName = "BigQuery Destination",
        Location = "us-central1",
        ConnectionProfileId = "destination-profile",
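        // An empty BigQuery profile, matching `bigqueryProfile: {}` in the other languages
        BigqueryProfile = new Gcp.Datastream.Inputs.ConnectionProfileBigqueryProfileArgs(),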
    });

    var @default = new Gcp.Datastream.Stream("default", new()
    {
        DisplayName = "Postgres to BigQuery",
        Location = "us-central1",
        StreamId = "my-stream",
        DesiredState = "RUNNING",
        SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
        {
            SourceConnectionProfile = source.Id,
            PostgresqlSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigArgs
            {
                MaxConcurrentBackfillTasks = 12,
                Publication = "publication",
                ReplicationSlot = "replication_slot",
                IncludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsArgs
                {
                    PostgresqlSchemas = new[]
                    {
                        new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaArgs
                        {
                            Schema = "schema",
                            PostgresqlTables = new[]
                            {
                                new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTableArgs
                                {
                                    Table = "table",
                                    PostgresqlColumns = new[]
                                    {
                                        new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs
                                        {
                                            Column = "column",
                                        },
                                    },
                                },
                            },
                        },
                    },
                },
                ExcludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsArgs
                {
                    PostgresqlSchemas = new[]
                    {
                        new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaArgs
                        {
                            Schema = "schema",
                            PostgresqlTables = new[]
                            {
                                new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTableArgs
                                {
                                    Table = "table",
                                    PostgresqlColumns = new[]
                                    {
                                        new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs
                                        {
                                            Column = "column",
                                        },
                                    },
                                },
                            },
                        },
                    },
                },
            },
        },
        DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
        {
            DestinationConnectionProfile = destination.Id,
            BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
            {
                DataFreshness = "900s",
                SourceHierarchyDatasets = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs
                {
                    DatasetTemplate = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs
                    {
                        Location = "us-central1",
                    },
                },
            },
        },
        BackfillAll = new Gcp.Datastream.Inputs.StreamBackfillAllArgs
        {
            PostgresqlExcludedObjects = new Gcp.Datastream.Inputs.StreamBackfillAllPostgresqlExcludedObjectsArgs
            {
                PostgresqlSchemas = new[]
                {
                    new Gcp.Datastream.Inputs.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaArgs
                    {
                        Schema = "schema",
                        PostgresqlTables = new[]
                        {
                            new Gcp.Datastream.Inputs.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTableArgs
                            {
                                Table = "table",
                                PostgresqlColumns = new[]
                                {
                                    new Gcp.Datastream.Inputs.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs
                                    {
                                        Column = "column",
                                    },
                                },
                            },
                        },
                    },
                },
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfilePostgresqlProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileBigqueryProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigArgs;
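import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTableArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTableArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllPostgresqlExcludedObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTableArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs;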
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var source = new ConnectionProfile("source", ConnectionProfileArgs.builder()
            .displayName("Postgresql Source")
            .location("us-central1")
            .connectionProfileId("source-profile")
            .postgresqlProfile(ConnectionProfilePostgresqlProfileArgs.builder()
                .hostname("hostname")
                .port(5432)
                .username("user")
                .password("pass")
                .database("postgres")
                .build())
            .build());

        var destination = new ConnectionProfile("destination", ConnectionProfileArgs.builder()
            .displayName("BigQuery Destination")
            .location("us-central1")
            .connectionProfileId("destination-profile")
            .bigqueryProfile(ConnectionProfileBigqueryProfileArgs.builder()
                .build())
            .build());

        var default_ = new Stream("default", StreamArgs.builder()
            .displayName("Postgres to BigQuery")
            .location("us-central1")
            .streamId("my-stream")
            .desiredState("RUNNING")
            .sourceConfig(StreamSourceConfigArgs.builder()
                .sourceConnectionProfile(source.id())
                .postgresqlSourceConfig(StreamSourceConfigPostgresqlSourceConfigArgs.builder()
                    .maxConcurrentBackfillTasks(12)
                    .publication("publication")
                    .replicationSlot("replication_slot")
                    .includeObjects(StreamSourceConfigPostgresqlSourceConfigIncludeObjectsArgs.builder()
                        .postgresqlSchemas(StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaArgs.builder()
                            .schema("schema")
                            .postgresqlTables(StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTableArgs.builder()
                                .table("table")
                                .postgresqlColumns(StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs.builder()
                                    .column("column")
                                    .build())
                                .build())
                            .build())
                        .build())
                    .excludeObjects(StreamSourceConfigPostgresqlSourceConfigExcludeObjectsArgs.builder()
                        .postgresqlSchemas(StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaArgs.builder()
                            .schema("schema")
                            .postgresqlTables(StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTableArgs.builder()
                                .table("table")
                                .postgresqlColumns(StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs.builder()
                                    .column("column")
                                    .build())
                                .build())
                            .build())
                        .build())
                    .build())
                .build())
            .destinationConfig(StreamDestinationConfigArgs.builder()
                .destinationConnectionProfile(destination.id())
                .bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
                    .dataFreshness("900s")
                    .sourceHierarchyDatasets(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs.builder()
                        .datasetTemplate(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs.builder()
                            .location("us-central1")
                            .build())
                        .build())
                    .build())
                .build())
            .backfillAll(StreamBackfillAllArgs.builder()
                .postgresqlExcludedObjects(StreamBackfillAllPostgresqlExcludedObjectsArgs.builder()
                    .postgresqlSchemas(StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaArgs.builder()
                        .schema("schema")
                        .postgresqlTables(StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTableArgs.builder()
                            .table("table")
                            .postgresqlColumns(StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs.builder()
                                .column("column")
                                .build())
                            .build())
                        .build())
                    .build())
                .build())
            .build());

    }
}
resources:
  source:
    type: gcp:datastream:ConnectionProfile
    properties:
      displayName: Postgresql Source
      location: us-central1
      connectionProfileId: source-profile
      postgresqlProfile:
        hostname: hostname
        port: 5432
        username: user
        password: pass
        database: postgres
  destination:
    type: gcp:datastream:ConnectionProfile
    properties:
      displayName: BigQuery Destination
      location: us-central1
      connectionProfileId: destination-profile
      bigqueryProfile: {}
  default:
    type: gcp:datastream:Stream
    properties:
      displayName: Postgres to BigQuery
      location: us-central1
      streamId: my-stream
      desiredState: RUNNING
      sourceConfig:
        sourceConnectionProfile: ${source.id}
        postgresqlSourceConfig:
          maxConcurrentBackfillTasks: 12
          publication: publication
          replicationSlot: replication_slot
          includeObjects:
            postgresqlSchemas:
              - schema: schema
                postgresqlTables:
                  - table: table
                    postgresqlColumns:
                      - column: column
          excludeObjects:
            postgresqlSchemas:
              - schema: schema
                postgresqlTables:
                  - table: table
                    postgresqlColumns:
                      - column: column
      destinationConfig:
        destinationConnectionProfile: ${destination.id}
        bigqueryDestinationConfig:
          dataFreshness: 900s
          sourceHierarchyDatasets:
            datasetTemplate:
              location: us-central1
      backfillAll:
        postgresqlExcludedObjects:
          postgresqlSchemas:
            - schema: schema
              postgresqlTables:
                - table: table
                  postgresqlColumns:
                    - column: column

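The postgresqlSourceConfig names a publication and a replicationSlot; both must already exist in the source PostgreSQL database before the stream runs. Its includeObjects and excludeObjects blocks filter replication by schema, table, and column. The maxConcurrentBackfillTasks property caps parallelism during the initial data load, and backfillAll.postgresqlExcludedObjects lists objects to skip during that load. On the destination side, bigqueryDestinationConfig uses sourceHierarchyDatasets so that Datastream creates BigQuery datasets mirroring the source schema structure, in the location given by datasetTemplate. The dataFreshness property sets how stale BigQuery tables may be when queried (900s here, or 15 minutes); lower values return fresher data but may cost more.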
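
Because the publication and replication slot have to exist before the stream runs, it can help to keep the setup SQL next to the Pulumi program. The following is a minimal sketch, not part of the Pulumi GCP provider: replicationSetupSql, quoteIdent, and quoteLiteral are hypothetical helper names, and the sketch assumes a publication covering all tables and the pgoutput logical decoding plugin. It only builds the statements; running them against the source database is left to your own tooling.

```typescript
// Hypothetical helper (not a Pulumi API): builds the SQL that prepares a
// PostgreSQL source for logical replication before the stream is created.
interface ReplicationNames {
    publication: string;
    replicationSlot: string;
}

// Quote a PostgreSQL identifier by doubling embedded double quotes.
function quoteIdent(name: string): string {
    return '"' + name.replace(/"/g, '""') + '"';
}

// Quote a PostgreSQL string literal by doubling embedded single quotes.
function quoteLiteral(value: string): string {
    return "'" + value.replace(/'/g, "''") + "'";
}

// Assumption: the publication covers all tables and the slot uses pgoutput.
function replicationSetupSql(names: ReplicationNames): string[] {
    return [
        `CREATE PUBLICATION ${quoteIdent(names.publication)} FOR ALL TABLES;`,
        `SELECT pg_create_logical_replication_slot(${quoteLiteral(names.replicationSlot)}, 'pgoutput');`,
    ];
}

// Use the same names that the stream's postgresqlSourceConfig references.
const statements = replicationSetupSql({
    publication: "publication",
    replicationSlot: "replication_slot",
});
console.log(statements.join("\n"));
```

Deriving both the SQL and the stream's publication and replicationSlot values from one pair of constants keeps the names from drifting apart. Narrow the publication to specific tables if the stream's includeObjects filter is narrower than the whole database.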

Stream SQL Server to BigQuery using transaction logs

SQL Server transaction log-based replication captures all changes without requiring application-level triggers or change tracking tables.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const instance = new gcp.sql.DatabaseInstance("instance", {
    name: "sql-server",
    databaseVersion: "SQLSERVER_2019_STANDARD",
    region: "us-central1",
    rootPassword: "root-password",
    deletionProtection: true,
    settings: {
        tier: "db-custom-2-4096",
        ipConfiguration: {
            authorizedNetworks: [
                {
                    value: "34.71.242.81",
                },
                {
                    value: "34.72.28.29",
                },
                {
                    value: "34.67.6.157",
                },
                {
                    value: "34.67.234.134",
                },
                {
                    value: "34.72.239.218",
                },
            ],
        },
    },
});
const user = new gcp.sql.User("user", {
    name: "user",
    instance: instance.name,
    password: "password",
});
const db = new gcp.sql.Database("db", {
    name: "db",
    instance: instance.name,
}, {
    dependsOn: [user],
});
const source = new gcp.datastream.ConnectionProfile("source", {
    displayName: "SQL Server Source",
    location: "us-central1",
    connectionProfileId: "source-profile",
    sqlServerProfile: {
        hostname: instance.publicIpAddress,
        port: 1433,
        username: user.name,
        password: user.password,
        database: db.name,
    },
});
const destination = new gcp.datastream.ConnectionProfile("destination", {
    displayName: "BigQuery Destination",
    location: "us-central1",
    connectionProfileId: "destination-profile",
    bigqueryProfile: {},
});
const _default = new gcp.datastream.Stream("default", {
    displayName: "SQL Server to BigQuery",
    location: "us-central1",
    streamId: "stream",
    sourceConfig: {
        sourceConnectionProfile: source.id,
        sqlServerSourceConfig: {
            includeObjects: {
                schemas: [{
                    schema: "schema",
                    tables: [{
                        table: "table",
                    }],
                }],
            },
            transactionLogs: {},
        },
    },
    destinationConfig: {
        destinationConnectionProfile: destination.id,
        bigqueryDestinationConfig: {
            dataFreshness: "900s",
            sourceHierarchyDatasets: {
                datasetTemplate: {
                    location: "us-central1",
                },
            },
        },
    },
    backfillNone: {},
});
import pulumi
import pulumi_gcp as gcp

instance = gcp.sql.DatabaseInstance("instance",
    name="sql-server",
    database_version="SQLSERVER_2019_STANDARD",
    region="us-central1",
    root_password="root-password",
    deletion_protection=True,
    settings={
        "tier": "db-custom-2-4096",
        "ip_configuration": {
            "authorized_networks": [
                {
                    "value": "34.71.242.81",
                },
                {
                    "value": "34.72.28.29",
                },
                {
                    "value": "34.67.6.157",
                },
                {
                    "value": "34.67.234.134",
                },
                {
                    "value": "34.72.239.218",
                },
            ],
        },
    })
user = gcp.sql.User("user",
    name="user",
    instance=instance.name,
    password="password")
db = gcp.sql.Database("db",
    name="db",
    instance=instance.name,
    opts = pulumi.ResourceOptions(depends_on=[user]))
source = gcp.datastream.ConnectionProfile("source",
    display_name="SQL Server Source",
    location="us-central1",
    connection_profile_id="source-profile",
    sql_server_profile={
        "hostname": instance.public_ip_address,
        "port": 1433,
        "username": user.name,
        "password": user.password,
        "database": db.name,
    })
destination = gcp.datastream.ConnectionProfile("destination",
    display_name="BigQuery Destination",
    location="us-central1",
    connection_profile_id="destination-profile",
    bigquery_profile={})
default = gcp.datastream.Stream("default",
    display_name="SQL Server to BigQuery",
    location="us-central1",
    stream_id="stream",
    source_config={
        "source_connection_profile": source.id,
        "sql_server_source_config": {
            "include_objects": {
                "schemas": [{
                    "schema": "schema",
                    "tables": [{
                        "table": "table",
                    }],
                }],
            },
            "transaction_logs": {},
        },
    },
    destination_config={
        "destination_connection_profile": destination.id,
        "bigquery_destination_config": {
            "data_freshness": "900s",
            "source_hierarchy_datasets": {
                "dataset_template": {
                    "location": "us-central1",
                },
            },
        },
    },
    backfill_none={})
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
			Name:               pulumi.String("sql-server"),
			DatabaseVersion:    pulumi.String("SQLSERVER_2019_STANDARD"),
			Region:             pulumi.String("us-central1"),
			RootPassword:       pulumi.String("root-password"),
			DeletionProtection: pulumi.Bool(true),
			Settings: &sql.DatabaseInstanceSettingsArgs{
				Tier: pulumi.String("db-custom-2-4096"),
				IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
					AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.71.242.81"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.28.29"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.6.157"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.234.134"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.239.218"),
						},
					},
				},
			},
		})
		if err != nil {
			return err
		}
		user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
			Name:     pulumi.String("user"),
			Instance: instance.Name,
			Password: pulumi.String("password"),
		})
		if err != nil {
			return err
		}
		db, err := sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
			Name:     pulumi.String("db"),
			Instance: instance.Name,
		}, pulumi.DependsOn([]pulumi.Resource{
			user,
		}))
		if err != nil {
			return err
		}
		source, err := datastream.NewConnectionProfile(ctx, "source", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("SQL Server Source"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("source-profile"),
			SqlServerProfile: &datastream.ConnectionProfileSqlServerProfileArgs{
				Hostname: instance.PublicIpAddress,
				Port:     pulumi.Int(1433),
				Username: user.Name,
				Password: user.Password,
				Database: db.Name,
			},
		})
		if err != nil {
			return err
		}
		destination, err := datastream.NewConnectionProfile(ctx, "destination", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("BigQuery Destination"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("destination-profile"),
			BigqueryProfile:     &datastream.ConnectionProfileBigqueryProfileArgs{},
		})
		if err != nil {
			return err
		}
		_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
			DisplayName: pulumi.String("SQL Server to BigQuery"),
			Location:    pulumi.String("us-central1"),
			StreamId:    pulumi.String("stream"),
			SourceConfig: &datastream.StreamSourceConfigArgs{
				SourceConnectionProfile: source.ID(),
				SqlServerSourceConfig: &datastream.StreamSourceConfigSqlServerSourceConfigArgs{
					IncludeObjects: &datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs{
						Schemas: datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArray{
							&datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs{
								Schema: pulumi.String("schema"),
								Tables: datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArray{
									&datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs{
										Table: pulumi.String("table"),
									},
								},
							},
						},
					},
					TransactionLogs: &datastream.StreamSourceConfigSqlServerSourceConfigTransactionLogsArgs{},
				},
			},
			DestinationConfig: &datastream.StreamDestinationConfigArgs{
				DestinationConnectionProfile: destination.ID(),
				BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
					DataFreshness: pulumi.String("900s"),
					SourceHierarchyDatasets: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs{
						DatasetTemplate: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs{
							Location: pulumi.String("us-central1"),
						},
					},
				},
			},
			BackfillNone: &datastream.StreamBackfillNoneArgs{},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var instance = new Gcp.Sql.DatabaseInstance("instance", new()
    {
        Name = "sql-server",
        DatabaseVersion = "SQLSERVER_2019_STANDARD",
        Region = "us-central1",
        RootPassword = "root-password",
        DeletionProtection = true,
        Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
        {
            Tier = "db-custom-2-4096",
            IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
            {
                AuthorizedNetworks = new[]
                {
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.71.242.81",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.28.29",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.6.157",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.234.134",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.239.218",
                    },
                },
            },
        },
    });

    var user = new Gcp.Sql.User("user", new()
    {
        Name = "user",
        Instance = instance.Name,
        Password = "password",
    });

    var db = new Gcp.Sql.Database("db", new()
    {
        Name = "db",
        Instance = instance.Name,
    }, new CustomResourceOptions
    {
        DependsOn =
        {
            user,
        },
    });

    var source = new Gcp.Datastream.ConnectionProfile("source", new()
    {
        DisplayName = "SQL Server Source",
        Location = "us-central1",
        ConnectionProfileId = "source-profile",
        SqlServerProfile = new Gcp.Datastream.Inputs.ConnectionProfileSqlServerProfileArgs
        {
            Hostname = instance.PublicIpAddress,
            Port = 1433,
            Username = user.Name,
            Password = user.Password,
            Database = db.Name,
        },
    });

    var destination = new Gcp.Datastream.ConnectionProfile("destination", new()
    {
        DisplayName = "BigQuery Destination",
        Location = "us-central1",
        ConnectionProfileId = "destination-profile",
        BigqueryProfile = new Gcp.Datastream.Inputs.ConnectionProfileBigqueryProfileArgs(),
    });

    var @default = new Gcp.Datastream.Stream("default", new()
    {
        DisplayName = "SQL Server to BigQuery",
        Location = "us-central1",
        StreamId = "stream",
        SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
        {
            SourceConnectionProfile = source.Id,
            SqlServerSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigArgs
            {
                IncludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs
                {
                    Schemas = new[]
                    {
                        new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs
                        {
                            Schema = "schema",
                            Tables = new[]
                            {
                                new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs
                                {
                                    Table = "table",
                                },
                            },
                        },
                    },
                },
                TransactionLogs = new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigTransactionLogsArgs(),
            },
        },
        DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
        {
            DestinationConnectionProfile = destination.Id,
            BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
            {
                DataFreshness = "900s",
                SourceHierarchyDatasets = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs
                {
                    DatasetTemplate = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs
                    {
                        Location = "us-central1",
                    },
                },
            },
        },
        BackfillNone = new Gcp.Datastream.Inputs.StreamBackfillNoneArgs(),
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileSqlServerProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileBigqueryProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigArgs;
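import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs;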
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigTransactionLogsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillNoneArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
            .name("sql-server")
            .databaseVersion("SQLSERVER_2019_STANDARD")
            .region("us-central1")
            .rootPassword("root-password")
            .deletionProtection(true)
            .settings(DatabaseInstanceSettingsArgs.builder()
                .tier("db-custom-2-4096")
                .ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
                    .authorizedNetworks(
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.71.242.81")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.28.29")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.6.157")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.234.134")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.239.218")
                            .build())
                    .build())
                .build())
            .build());

        var user = new User("user", UserArgs.builder()
            .name("user")
            .instance(instance.name())
            .password("password")
            .build());

        var db = new Database("db", DatabaseArgs.builder()
            .name("db")
            .instance(instance.name())
            .build(), CustomResourceOptions.builder()
                .dependsOn(user)
                .build());

        var source = new ConnectionProfile("source", ConnectionProfileArgs.builder()
            .displayName("SQL Server Source")
            .location("us-central1")
            .connectionProfileId("source-profile")
            .sqlServerProfile(ConnectionProfileSqlServerProfileArgs.builder()
                .hostname(instance.publicIpAddress())
                .port(1433)
                .username(user.name())
                .password(user.password())
                .database(db.name())
                .build())
            .build());

        var destination = new ConnectionProfile("destination", ConnectionProfileArgs.builder()
            .displayName("BigQuery Destination")
            .location("us-central1")
            .connectionProfileId("destination-profile")
            .bigqueryProfile(ConnectionProfileBigqueryProfileArgs.builder()
                .build())
            .build());

        var default_ = new Stream("default", StreamArgs.builder()
            .displayName("SQL Server to BigQuery")
            .location("us-central1")
            .streamId("stream")
            .sourceConfig(StreamSourceConfigArgs.builder()
                .sourceConnectionProfile(source.id())
                .sqlServerSourceConfig(StreamSourceConfigSqlServerSourceConfigArgs.builder()
                    .includeObjects(StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs.builder()
                        .schemas(StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs.builder()
                            .schema("schema")
                            .tables(StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs.builder()
                                .table("table")
                                .build())
                            .build())
                        .build())
                    .transactionLogs(StreamSourceConfigSqlServerSourceConfigTransactionLogsArgs.builder()
                        .build())
                    .build())
                .build())
            .destinationConfig(StreamDestinationConfigArgs.builder()
                .destinationConnectionProfile(destination.id())
                .bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
                    .dataFreshness("900s")
                    .sourceHierarchyDatasets(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs.builder()
                        .datasetTemplate(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs.builder()
                            .location("us-central1")
                            .build())
                        .build())
                    .build())
                .build())
            .backfillNone(StreamBackfillNoneArgs.builder()
                .build())
            .build());

    }
}
resources:
  instance:
    type: gcp:sql:DatabaseInstance
    properties:
      name: sql-server
      databaseVersion: SQLSERVER_2019_STANDARD
      region: us-central1
      rootPassword: root-password
      deletionProtection: true
      settings:
        tier: db-custom-2-4096
        ipConfiguration:
          authorizedNetworks:
            - value: 34.71.242.81
            - value: 34.72.28.29
            - value: 34.67.6.157
            - value: 34.67.234.134
            - value: 34.72.239.218
  db:
    type: gcp:sql:Database
    properties:
      name: db
      instance: ${instance.name}
    options:
      dependsOn:
        - ${user}
  user:
    type: gcp:sql:User
    properties:
      name: user
      instance: ${instance.name}
      password: password
  source:
    type: gcp:datastream:ConnectionProfile
    properties:
      displayName: SQL Server Source
      location: us-central1
      connectionProfileId: source-profile
      sqlServerProfile:
        hostname: ${instance.publicIpAddress}
        port: 1433
        username: ${user.name}
        password: ${user.password}
        database: ${db.name}
  destination:
    type: gcp:datastream:ConnectionProfile
    properties:
      displayName: BigQuery Destination
      location: us-central1
      connectionProfileId: destination-profile
      bigqueryProfile: {}
  default:
    type: gcp:datastream:Stream
    properties:
      displayName: SQL Server to BigQuery
      location: us-central1
      streamId: stream
      sourceConfig:
        sourceConnectionProfile: ${source.id}
        sqlServerSourceConfig:
          includeObjects:
            schemas:
              - schema: schema
                tables:
                  - table: table
          transactionLogs: {}
      destinationConfig:
        destinationConnectionProfile: ${destination.id}
        bigqueryDestinationConfig:
          dataFreshness: 900s
          sourceHierarchyDatasets:
            datasetTemplate:
              location: us-central1
      backfillNone: {}

Setting transactionLogs inside sqlServerSourceConfig makes Datastream capture changes through SQL Server’s native transaction log mechanism. The backfillNone property skips the initial data load, so replication starts from the current point in time and rows that already exist in the source are not copied. This approach requires SQL Server to have transaction log backup enabled.
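The backfill choice can be sketched as a small helper. This is a minimal illustration, assuming a stream sets exactly one of backfillAll or backfillNone, as the examples in this guide do. The names BackfillStrategy, BackfillArgs, and backfillArgs are hypothetical and are not part of @pulumi/gcp.

```typescript
// Hypothetical helper (not part of @pulumi/gcp): models the backfill choice
// as a plain object that can be spread into the Stream arguments.
type BackfillStrategy = "all" | "none";

interface BackfillArgs {
    backfillAll?: Record<string, never>;
    backfillNone?: Record<string, never>;
}

function backfillArgs(strategy: BackfillStrategy): BackfillArgs {
    // Assumption: exactly one of the two empty blocks is set per stream.
    if (strategy === "all") {
        // Copy existing rows first, then continue with ongoing changes.
        return { backfillAll: {} };
    }
    // Skip the initial load and replicate only changes from now on.
    return { backfillNone: {} };
}

// Print both variants to show the shape of each result.
console.log(JSON.stringify(backfillArgs("none")));
console.log(JSON.stringify(backfillArgs("all")));
```

In a Pulumi program the result could be spread into the stream arguments, for example { ...otherStreamArgs, ...backfillArgs("none") }, where otherStreamArgs stands for the remaining properties shown in the example above.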

Stream SQL Server using change tables

Some SQL Server deployments have Datastream read changes from CDC change tables instead of the transaction logs, which provides an alternative CDC mechanism.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const instance = new gcp.sql.DatabaseInstance("instance", {
    name: "sql-server",
    databaseVersion: "SQLSERVER_2019_STANDARD",
    region: "us-central1",
    rootPassword: "root-password",
    deletionProtection: true,
    settings: {
        tier: "db-custom-2-4096",
        ipConfiguration: {
            authorizedNetworks: [
                {
                    value: "34.71.242.81",
                },
                {
                    value: "34.72.28.29",
                },
                {
                    value: "34.67.6.157",
                },
                {
                    value: "34.67.234.134",
                },
                {
                    value: "34.72.239.218",
                },
            ],
        },
    },
});
const user = new gcp.sql.User("user", {
    name: "user",
    instance: instance.name,
    password: "password",
});
const db = new gcp.sql.Database("db", {
    name: "db",
    instance: instance.name,
}, {
    dependsOn: [user],
});
const source = new gcp.datastream.ConnectionProfile("source", {
    displayName: "SQL Server Source",
    location: "us-central1",
    connectionProfileId: "source-profile",
    sqlServerProfile: {
        hostname: instance.publicIpAddress,
        port: 1433,
        username: user.name,
        password: user.password,
        database: db.name,
    },
});
const destination = new gcp.datastream.ConnectionProfile("destination", {
    displayName: "BigQuery Destination",
    location: "us-central1",
    connectionProfileId: "destination-profile",
    bigqueryProfile: {},
});
const _default = new gcp.datastream.Stream("default", {
    displayName: "SQL Server to BigQuery",
    location: "us-central1",
    streamId: "stream",
    sourceConfig: {
        sourceConnectionProfile: source.id,
        sqlServerSourceConfig: {
            includeObjects: {
                schemas: [{
                    schema: "schema",
                    tables: [{
                        table: "table",
                    }],
                }],
            },
            changeTables: {},
        },
    },
    destinationConfig: {
        destinationConnectionProfile: destination.id,
        bigqueryDestinationConfig: {
            dataFreshness: "900s",
            sourceHierarchyDatasets: {
                datasetTemplate: {
                    location: "us-central1",
                },
            },
        },
    },
    backfillNone: {},
});
import pulumi
import pulumi_gcp as gcp

instance = gcp.sql.DatabaseInstance("instance",
    name="sql-server",
    database_version="SQLSERVER_2019_STANDARD",
    region="us-central1",
    root_password="root-password",
    deletion_protection=True,
    settings={
        "tier": "db-custom-2-4096",
        "ip_configuration": {
            "authorized_networks": [
                {
                    "value": "34.71.242.81",
                },
                {
                    "value": "34.72.28.29",
                },
                {
                    "value": "34.67.6.157",
                },
                {
                    "value": "34.67.234.134",
                },
                {
                    "value": "34.72.239.218",
                },
            ],
        },
    })
user = gcp.sql.User("user",
    name="user",
    instance=instance.name,
    password="password")
db = gcp.sql.Database("db",
    name="db",
    instance=instance.name,
    opts = pulumi.ResourceOptions(depends_on=[user]))
source = gcp.datastream.ConnectionProfile("source",
    display_name="SQL Server Source",
    location="us-central1",
    connection_profile_id="source-profile",
    sql_server_profile={
        "hostname": instance.public_ip_address,
        "port": 1433,
        "username": user.name,
        "password": user.password,
        "database": db.name,
    })
destination = gcp.datastream.ConnectionProfile("destination",
    display_name="BigQuery Destination",
    location="us-central1",
    connection_profile_id="destination-profile",
    bigquery_profile={})
default = gcp.datastream.Stream("default",
    display_name="SQL Server to BigQuery",
    location="us-central1",
    stream_id="stream",
    source_config={
        "source_connection_profile": source.id,
        "sql_server_source_config": {
            "include_objects": {
                "schemas": [{
                    "schema": "schema",
                    "tables": [{
                        "table": "table",
                    }],
                }],
            },
            "change_tables": {},
        },
    },
    destination_config={
        "destination_connection_profile": destination.id,
        "bigquery_destination_config": {
            "data_freshness": "900s",
            "source_hierarchy_datasets": {
                "dataset_template": {
                    "location": "us-central1",
                },
            },
        },
    },
    backfill_none={})
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
			Name:               pulumi.String("sql-server"),
			DatabaseVersion:    pulumi.String("SQLSERVER_2019_STANDARD"),
			Region:             pulumi.String("us-central1"),
			RootPassword:       pulumi.String("root-password"),
			DeletionProtection: pulumi.Bool(true),
			Settings: &sql.DatabaseInstanceSettingsArgs{
				Tier: pulumi.String("db-custom-2-4096"),
				IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
					AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.71.242.81"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.28.29"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.6.157"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.234.134"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.239.218"),
						},
					},
				},
			},
		})
		if err != nil {
			return err
		}
		user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
			Name:     pulumi.String("user"),
			Instance: instance.Name,
			Password: pulumi.String("password"),
		})
		if err != nil {
			return err
		}
		db, err := sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
			Name:     pulumi.String("db"),
			Instance: instance.Name,
		}, pulumi.DependsOn([]pulumi.Resource{
			user,
		}))
		if err != nil {
			return err
		}
		source, err := datastream.NewConnectionProfile(ctx, "source", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("SQL Server Source"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("source-profile"),
			SqlServerProfile: &datastream.ConnectionProfileSqlServerProfileArgs{
				Hostname: instance.PublicIpAddress,
				Port:     pulumi.Int(1433),
				Username: user.Name,
				Password: user.Password,
				Database: db.Name,
			},
		})
		if err != nil {
			return err
		}
		destination, err := datastream.NewConnectionProfile(ctx, "destination", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("BigQuery Destination"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("destination-profile"),
			BigqueryProfile:     &datastream.ConnectionProfileBigqueryProfileArgs{},
		})
		if err != nil {
			return err
		}
		_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
			DisplayName: pulumi.String("SQL Server to BigQuery"),
			Location:    pulumi.String("us-central1"),
			StreamId:    pulumi.String("stream"),
			SourceConfig: &datastream.StreamSourceConfigArgs{
				SourceConnectionProfile: source.ID(),
				SqlServerSourceConfig: &datastream.StreamSourceConfigSqlServerSourceConfigArgs{
					IncludeObjects: &datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs{
						Schemas: datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArray{
							&datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs{
								Schema: pulumi.String("schema"),
								Tables: datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArray{
									&datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs{
										Table: pulumi.String("table"),
									},
								},
							},
						},
					},
					ChangeTables: &datastream.StreamSourceConfigSqlServerSourceConfigChangeTablesArgs{},
				},
			},
			DestinationConfig: &datastream.StreamDestinationConfigArgs{
				DestinationConnectionProfile: destination.ID(),
				BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
					DataFreshness: pulumi.String("900s"),
					SourceHierarchyDatasets: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs{
						DatasetTemplate: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs{
							Location: pulumi.String("us-central1"),
						},
					},
				},
			},
			BackfillNone: &datastream.StreamBackfillNoneArgs{},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var instance = new Gcp.Sql.DatabaseInstance("instance", new()
    {
        Name = "sql-server",
        DatabaseVersion = "SQLSERVER_2019_STANDARD",
        Region = "us-central1",
        RootPassword = "root-password",
        DeletionProtection = true,
        Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
        {
            Tier = "db-custom-2-4096",
            IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
            {
                AuthorizedNetworks = new[]
                {
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.71.242.81",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.28.29",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.6.157",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.234.134",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.239.218",
                    },
                },
            },
        },
    });

    var user = new Gcp.Sql.User("user", new()
    {
        Name = "user",
        Instance = instance.Name,
        Password = "password",
    });

    var db = new Gcp.Sql.Database("db", new()
    {
        Name = "db",
        Instance = instance.Name,
    }, new CustomResourceOptions
    {
        DependsOn =
        {
            user,
        },
    });

    var source = new Gcp.Datastream.ConnectionProfile("source", new()
    {
        DisplayName = "SQL Server Source",
        Location = "us-central1",
        ConnectionProfileId = "source-profile",
        SqlServerProfile = new Gcp.Datastream.Inputs.ConnectionProfileSqlServerProfileArgs
        {
            Hostname = instance.PublicIpAddress,
            Port = 1433,
            Username = user.Name,
            Password = user.Password,
            Database = db.Name,
        },
    });

    var destination = new Gcp.Datastream.ConnectionProfile("destination", new()
    {
        DisplayName = "BigQuery Destination",
        Location = "us-central1",
        ConnectionProfileId = "destination-profile",
        BigqueryProfile = new Gcp.Datastream.Inputs.ConnectionProfileBigqueryProfileArgs(),
    });

    var @default = new Gcp.Datastream.Stream("default", new()
    {
        DisplayName = "SQL Server to BigQuery",
        Location = "us-central1",
        StreamId = "stream",
        SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
        {
            SourceConnectionProfile = source.Id,
            SqlServerSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigArgs
            {
                IncludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs
                {
                    Schemas = new[]
                    {
                        new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs
                        {
                            Schema = "schema",
                            Tables = new[]
                            {
                                new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs
                                {
                                    Table = "table",
                                },
                            },
                        },
                    },
                },
                ChangeTables = new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigChangeTablesArgs(),
            },
        },
        DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
        {
            DestinationConnectionProfile = destination.Id,
            BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
            {
                DataFreshness = "900s",
                SourceHierarchyDatasets = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs
                {
                    DatasetTemplate = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs
                    {
                        Location = "us-central1",
                    },
                },
            },
        },
        BackfillNone = new Gcp.Datastream.Inputs.StreamBackfillNoneArgs(),
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileSqlServerProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileBigqueryProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigArgs;
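import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs;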
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigChangeTablesArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillNoneArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
            .name("sql-server")
            .databaseVersion("SQLSERVER_2019_STANDARD")
            .region("us-central1")
            .rootPassword("root-password")
            .deletionProtection(true)
            .settings(DatabaseInstanceSettingsArgs.builder()
                .tier("db-custom-2-4096")
                .ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
                    .authorizedNetworks(
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.71.242.81")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.28.29")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.6.157")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.234.134")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.239.218")
                            .build())
                    .build())
                .build())
            .build());

        var user = new User("user", UserArgs.builder()
            .name("user")
            .instance(instance.name())
            .password("password")
            .build());

        var db = new Database("db", DatabaseArgs.builder()
            .name("db")
            .instance(instance.name())
            .build(), CustomResourceOptions.builder()
                .dependsOn(user)
                .build());

        var source = new ConnectionProfile("source", ConnectionProfileArgs.builder()
            .displayName("SQL Server Source")
            .location("us-central1")
            .connectionProfileId("source-profile")
            .sqlServerProfile(ConnectionProfileSqlServerProfileArgs.builder()
                .hostname(instance.publicIpAddress())
                .port(1433)
                .username(user.name())
                .password(user.password())
                .database(db.name())
                .build())
            .build());

        var destination = new ConnectionProfile("destination", ConnectionProfileArgs.builder()
            .displayName("BigQuery Destination")
            .location("us-central1")
            .connectionProfileId("destination-profile")
            .bigqueryProfile(ConnectionProfileBigqueryProfileArgs.builder()
                .build())
            .build());

        var default_ = new Stream("default", StreamArgs.builder()
            .displayName("SQL Server to BigQuery")
            .location("us-central1")
            .streamId("stream")
            .sourceConfig(StreamSourceConfigArgs.builder()
                .sourceConnectionProfile(source.id())
                .sqlServerSourceConfig(StreamSourceConfigSqlServerSourceConfigArgs.builder()
                    .includeObjects(StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs.builder()
                        .schemas(StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs.builder()
                            .schema("schema")
                            .tables(StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs.builder()
                                .table("table")
                                .build())
                            .build())
                        .build())
                    .changeTables(StreamSourceConfigSqlServerSourceConfigChangeTablesArgs.builder()
                        .build())
                    .build())
                .build())
            .destinationConfig(StreamDestinationConfigArgs.builder()
                .destinationConnectionProfile(destination.id())
                .bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
                    .dataFreshness("900s")
                    .sourceHierarchyDatasets(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs.builder()
                        .datasetTemplate(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs.builder()
                            .location("us-central1")
                            .build())
                        .build())
                    .build())
                .build())
            .backfillNone(StreamBackfillNoneArgs.builder()
                .build())
            .build());

    }
}
resources:
  instance:
    type: gcp:sql:DatabaseInstance
    properties:
      name: sql-server
      databaseVersion: SQLSERVER_2019_STANDARD
      region: us-central1
      rootPassword: root-password
      deletionProtection: true
      settings:
        tier: db-custom-2-4096
        ipConfiguration:
          authorizedNetworks:
            - value: 34.71.242.81
            - value: 34.72.28.29
            - value: 34.67.6.157
            - value: 34.67.234.134
            - value: 34.72.239.218
  db:
    type: gcp:sql:Database
    properties:
      name: db
      instance: ${instance.name}
    options:
      dependsOn:
        - ${user}
  user:
    type: gcp:sql:User
    properties:
      name: user
      instance: ${instance.name}
      password: password
  source:
    type: gcp:datastream:ConnectionProfile
    properties:
      displayName: SQL Server Source
      location: us-central1
      connectionProfileId: source-profile
      sqlServerProfile:
        hostname: ${instance.publicIpAddress}
        port: 1433
        username: ${user.name}
        password: ${user.password}
        database: ${db.name}
  destination:
    type: gcp:datastream:ConnectionProfile
    properties:
      displayName: BigQuery Destination
      location: us-central1
      connectionProfileId: destination-profile
      bigqueryProfile: {}
  default:
    type: gcp:datastream:Stream
    properties:
      displayName: SQL Server to BigQuery
      location: us-central1
      streamId: stream
      sourceConfig:
        sourceConnectionProfile: ${source.id}
        sqlServerSourceConfig:
          includeObjects:
            schemas:
              - schema: schema
                tables:
                  - table: table
          changeTables: {}
      destinationConfig:
        destinationConnectionProfile: ${destination.id}
        bigqueryDestinationConfig:
          dataFreshness: 900s
          sourceHierarchyDatasets:
            datasetTemplate:
              location: us-central1
      backfillNone: {}

The changeTables property makes Datastream read changes from SQL Server’s CDC change tables rather than directly from the transaction logs. This approach has different performance characteristics and requires CDC to be enabled on the source database and on each replicated table. It is the alternative to the transaction log approach shown in the previous example.
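The two SQL Server examples differ only in which empty block appears under sqlServerSourceConfig. The sketch below builds that block as a plain object, assuming transactionLogs and changeTables are alternatives and only one is set at a time. The names SqlServerCdcMethod, SqlServerSourceConfig, and sqlServerSourceConfig are hypothetical helpers, not part of @pulumi/gcp.

```typescript
// Hypothetical helper (not part of @pulumi/gcp): builds the
// sqlServerSourceConfig block for one of the two CDC methods.
type SqlServerCdcMethod = "transactionLogs" | "changeTables";

interface SqlServerSourceConfig {
    includeObjects: {
        schemas: { schema: string; tables: { table: string }[] }[];
    };
    transactionLogs?: Record<string, never>;
    changeTables?: Record<string, never>;
}

function sqlServerSourceConfig(
    method: SqlServerCdcMethod,
    schema: string,
    tables: string[],
): SqlServerSourceConfig {
    // Restrict replication to the listed tables of a single schema.
    const includeObjects = {
        schemas: [{ schema, tables: tables.map((table) => ({ table })) }],
    };
    // Assumption: only one CDC method block is set per stream.
    if (method === "changeTables") {
        return { includeObjects, changeTables: {} };
    }
    return { includeObjects, transactionLogs: {} };
}

// Print the block that matches the change tables example above.
console.log(JSON.stringify(sqlServerSourceConfig("changeTables", "schema", ["table"]), null, 2));
```

The returned object has the same shape as the sqlServerSourceConfig literal in the TypeScript example, so switching the CDC method becomes a one-argument change.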

Stream to a specific BigQuery dataset

When you need precise control over where data lands in BigQuery, you can target a specific dataset rather than letting Datastream create one automatically.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
import * as random from "@pulumi/random";

const postgres = new gcp.bigquery.Dataset("postgres", {
    datasetId: "postgres",
    friendlyName: "postgres",
    description: "Database of postgres",
    location: "us-central1",
});
const destinationConnectionProfile2 = new gcp.datastream.ConnectionProfile("destination_connection_profile2", {
    displayName: "Connection profile",
    location: "us-central1",
    connectionProfileId: "dest-profile",
    bigqueryProfile: {},
});
const instance = new gcp.sql.DatabaseInstance("instance", {
    name: "instance-name",
    databaseVersion: "MYSQL_8_0",
    region: "us-central1",
    settings: {
        tier: "db-f1-micro",
        backupConfiguration: {
            enabled: true,
            binaryLogEnabled: true,
        },
        ipConfiguration: {
            authorizedNetworks: [
                {
                    value: "34.71.242.81",
                },
                {
                    value: "34.72.28.29",
                },
                {
                    value: "34.67.6.157",
                },
                {
                    value: "34.67.234.134",
                },
                {
                    value: "34.72.239.218",
                },
            ],
        },
    },
    deletionProtection: false,
});
const pwd = new random.RandomPassword("pwd", {
    length: 16,
    special: false,
});
const user = new gcp.sql.User("user", {
    name: "my-user",
    instance: instance.name,
    host: "%",
    password: pwd.result,
});
const sourceConnectionProfile = new gcp.datastream.ConnectionProfile("source_connection_profile", {
    displayName: "Source connection profile",
    location: "us-central1",
    connectionProfileId: "source-profile",
    mysqlProfile: {
        hostname: instance.publicIpAddress,
        username: user.name,
        password: user.password,
    },
});
const _default = new gcp.datastream.Stream("default", {
    displayName: "postgres to bigQuery",
    location: "us-central1",
    streamId: "postgres-bigquery",
    sourceConfig: {
        sourceConnectionProfile: sourceConnectionProfile.id,
        mysqlSourceConfig: {},
    },
    destinationConfig: {
        destinationConnectionProfile: destinationConnectionProfile2.id,
        bigqueryDestinationConfig: {
            dataFreshness: "900s",
            singleTargetDataset: {
                datasetId: postgres.id,
            },
        },
    },
    backfillAll: {},
});
const db = new gcp.sql.Database("db", {
    instance: instance.name,
    name: "db",
});
import pulumi
import pulumi_gcp as gcp
import pulumi_random as random

postgres = gcp.bigquery.Dataset("postgres",
    dataset_id="postgres",
    friendly_name="postgres",
    description="Database of postgres",
    location="us-central1")
destination_connection_profile2 = gcp.datastream.ConnectionProfile("destination_connection_profile2",
    display_name="Connection profile",
    location="us-central1",
    connection_profile_id="dest-profile",
    bigquery_profile={})
instance = gcp.sql.DatabaseInstance("instance",
    name="instance-name",
    database_version="MYSQL_8_0",
    region="us-central1",
    settings={
        "tier": "db-f1-micro",
        "backup_configuration": {
            "enabled": True,
            "binary_log_enabled": True,
        },
        "ip_configuration": {
            "authorized_networks": [
                {
                    "value": "34.71.242.81",
                },
                {
                    "value": "34.72.28.29",
                },
                {
                    "value": "34.67.6.157",
                },
                {
                    "value": "34.67.234.134",
                },
                {
                    "value": "34.72.239.218",
                },
            ],
        },
    },
    deletion_protection=False)
pwd = random.RandomPassword("pwd",
    length=16,
    special=False)
user = gcp.sql.User("user",
    name="my-user",
    instance=instance.name,
    host="%",
    password=pwd.result)
source_connection_profile = gcp.datastream.ConnectionProfile("source_connection_profile",
    display_name="Source connection profile",
    location="us-central1",
    connection_profile_id="source-profile",
    mysql_profile={
        "hostname": instance.public_ip_address,
        "username": user.name,
        "password": user.password,
    })
default = gcp.datastream.Stream("default",
    display_name="postgres to bigQuery",
    location="us-central1",
    stream_id="postgres-bigquery",
    source_config={
        "source_connection_profile": source_connection_profile.id,
        "mysql_source_config": {},
    },
    destination_config={
        "destination_connection_profile": destination_connection_profile2.id,
        "bigquery_destination_config": {
            "data_freshness": "900s",
            "single_target_dataset": {
                "dataset_id": postgres.id,
            },
        },
    },
    backfill_all={})
db = gcp.sql.Database("db",
    instance=instance.name,
    name="db")
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
	"github.com/pulumi/pulumi-random/sdk/v4/go/random"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		postgres, err := bigquery.NewDataset(ctx, "postgres", &bigquery.DatasetArgs{
			DatasetId:    pulumi.String("postgres"),
			FriendlyName: pulumi.String("postgres"),
			Description:  pulumi.String("Database of postgres"),
			Location:     pulumi.String("us-central1"),
		})
		if err != nil {
			return err
		}
		destinationConnectionProfile2, err := datastream.NewConnectionProfile(ctx, "destination_connection_profile2", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("Connection profile"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("dest-profile"),
			BigqueryProfile:     &datastream.ConnectionProfileBigqueryProfileArgs{},
		})
		if err != nil {
			return err
		}
		instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
			Name:            pulumi.String("instance-name"),
			DatabaseVersion: pulumi.String("MYSQL_8_0"),
			Region:          pulumi.String("us-central1"),
			Settings: &sql.DatabaseInstanceSettingsArgs{
				Tier: pulumi.String("db-f1-micro"),
				BackupConfiguration: &sql.DatabaseInstanceSettingsBackupConfigurationArgs{
					Enabled:          pulumi.Bool(true),
					BinaryLogEnabled: pulumi.Bool(true),
				},
				IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
					AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.71.242.81"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.28.29"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.6.157"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.234.134"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.239.218"),
						},
					},
				},
			},
			DeletionProtection: pulumi.Bool(false),
		})
		if err != nil {
			return err
		}
		pwd, err := random.NewRandomPassword(ctx, "pwd", &random.RandomPasswordArgs{
			Length:  pulumi.Int(16),
			Special: pulumi.Bool(false),
		})
		if err != nil {
			return err
		}
		user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
			Name:     pulumi.String("my-user"),
			Instance: instance.Name,
			Host:     pulumi.String("%"),
			Password: pwd.Result,
		})
		if err != nil {
			return err
		}
		sourceConnectionProfile, err := datastream.NewConnectionProfile(ctx, "source_connection_profile", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("Source connection profile"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("source-profile"),
			MysqlProfile: &datastream.ConnectionProfileMysqlProfileArgs{
				Hostname: instance.PublicIpAddress,
				Username: user.Name,
				Password: user.Password,
			},
		})
		if err != nil {
			return err
		}
		_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
			DisplayName: pulumi.String("postgres to bigQuery"),
			Location:    pulumi.String("us-central1"),
			StreamId:    pulumi.String("postgres-bigquery"),
			SourceConfig: &datastream.StreamSourceConfigArgs{
				SourceConnectionProfile: sourceConnectionProfile.ID(),
				MysqlSourceConfig:       &datastream.StreamSourceConfigMysqlSourceConfigArgs{},
			},
			DestinationConfig: &datastream.StreamDestinationConfigArgs{
				DestinationConnectionProfile: destinationConnectionProfile2.ID(),
				BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
					DataFreshness: pulumi.String("900s"),
					SingleTargetDataset: &datastream.StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs{
						DatasetId: postgres.ID(),
					},
				},
			},
			BackfillAll: &datastream.StreamBackfillAllArgs{},
		})
		if err != nil {
			return err
		}
		_, err = sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
			Instance: instance.Name,
			Name:     pulumi.String("db"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
using Random = Pulumi.Random;

return await Deployment.RunAsync(() => 
{
    var postgres = new Gcp.BigQuery.Dataset("postgres", new()
    {
        DatasetId = "postgres",
        FriendlyName = "postgres",
        Description = "Database of postgres",
        Location = "us-central1",
    });

    var destinationConnectionProfile2 = new Gcp.Datastream.ConnectionProfile("destination_connection_profile2", new()
    {
        DisplayName = "Connection profile",
        Location = "us-central1",
        ConnectionProfileId = "dest-profile",
        BigqueryProfile = new Gcp.Datastream.Inputs.ConnectionProfileBigqueryProfileArgs(),
    });

    var instance = new Gcp.Sql.DatabaseInstance("instance", new()
    {
        Name = "instance-name",
        DatabaseVersion = "MYSQL_8_0",
        Region = "us-central1",
        Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
        {
            Tier = "db-f1-micro",
            BackupConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsBackupConfigurationArgs
            {
                Enabled = true,
                BinaryLogEnabled = true,
            },
            IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
            {
                AuthorizedNetworks = new[]
                {
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.71.242.81",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.28.29",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.6.157",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.234.134",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.239.218",
                    },
                },
            },
        },
        DeletionProtection = false,
    });

    var pwd = new Random.RandomPassword("pwd", new()
    {
        Length = 16,
        Special = false,
    });

    var user = new Gcp.Sql.User("user", new()
    {
        Name = "my-user",
        Instance = instance.Name,
        Host = "%",
        Password = pwd.Result,
    });

    var sourceConnectionProfile = new Gcp.Datastream.ConnectionProfile("source_connection_profile", new()
    {
        DisplayName = "Source connection profile",
        Location = "us-central1",
        ConnectionProfileId = "source-profile",
        MysqlProfile = new Gcp.Datastream.Inputs.ConnectionProfileMysqlProfileArgs
        {
            Hostname = instance.PublicIpAddress,
            Username = user.Name,
            Password = user.Password,
        },
    });

    var @default = new Gcp.Datastream.Stream("default", new()
    {
        DisplayName = "postgres to bigQuery",
        Location = "us-central1",
        StreamId = "postgres-bigquery",
        SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
        {
            SourceConnectionProfile = sourceConnectionProfile.Id,
            MysqlSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigArgs(),
        },
        DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
        {
            DestinationConnectionProfile = destinationConnectionProfile2.Id,
            BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
            {
                DataFreshness = "900s",
                SingleTargetDataset = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs
                {
                    DatasetId = postgres.Id,
                },
            },
        },
        BackfillAll = new Gcp.Datastream.Inputs.StreamBackfillAllArgs(),
    });

    var db = new Gcp.Sql.Database("db", new()
    {
        Instance = instance.Name,
        Name = "db",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileBigqueryProfileArgs;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsBackupConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs;
import com.pulumi.random.RandomPassword;
import com.pulumi.random.RandomPasswordArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileMysqlProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var postgres = new Dataset("postgres", DatasetArgs.builder()
            .datasetId("postgres")
            .friendlyName("postgres")
            .description("Database of postgres")
            .location("us-central1")
            .build());

        var destinationConnectionProfile2 = new ConnectionProfile("destinationConnectionProfile2", ConnectionProfileArgs.builder()
            .displayName("Connection profile")
            .location("us-central1")
            .connectionProfileId("dest-profile")
            .bigqueryProfile(ConnectionProfileBigqueryProfileArgs.builder()
                .build())
            .build());

        var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
            .name("instance-name")
            .databaseVersion("MYSQL_8_0")
            .region("us-central1")
            .settings(DatabaseInstanceSettingsArgs.builder()
                .tier("db-f1-micro")
                .backupConfiguration(DatabaseInstanceSettingsBackupConfigurationArgs.builder()
                    .enabled(true)
                    .binaryLogEnabled(true)
                    .build())
                .ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
                    .authorizedNetworks(
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.71.242.81")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.28.29")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.6.157")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.234.134")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.239.218")
                            .build())
                    .build())
                .build())
            .deletionProtection(false)
            .build());

        var pwd = new RandomPassword("pwd", RandomPasswordArgs.builder()
            .length(16)
            .special(false)
            .build());

        var user = new User("user", UserArgs.builder()
            .name("my-user")
            .instance(instance.name())
            .host("%")
            .password(pwd.result())
            .build());

        var sourceConnectionProfile = new ConnectionProfile("sourceConnectionProfile", ConnectionProfileArgs.builder()
            .displayName("Source connection profile")
            .location("us-central1")
            .connectionProfileId("source-profile")
            .mysqlProfile(ConnectionProfileMysqlProfileArgs.builder()
                .hostname(instance.publicIpAddress())
                .username(user.name())
                .password(user.password())
                .build())
            .build());

        var default_ = new Stream("default", StreamArgs.builder()
            .displayName("postgres to bigQuery")
            .location("us-central1")
            .streamId("postgres-bigquery")
            .sourceConfig(StreamSourceConfigArgs.builder()
                .sourceConnectionProfile(sourceConnectionProfile.id())
                .mysqlSourceConfig(StreamSourceConfigMysqlSourceConfigArgs.builder()
                    .build())
                .build())
            .destinationConfig(StreamDestinationConfigArgs.builder()
                .destinationConnectionProfile(destinationConnectionProfile2.id())
                .bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
                    .dataFreshness("900s")
                    .singleTargetDataset(StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs.builder()
                        .datasetId(postgres.id())
                        .build())
                    .build())
                .build())
            .backfillAll(StreamBackfillAllArgs.builder()
                .build())
            .build());

        var db = new Database("db", DatabaseArgs.builder()
            .instance(instance.name())
            .name("db")
            .build());

    }
}
resources:
  postgres:
    type: gcp:bigquery:Dataset
    properties:
      datasetId: postgres
      friendlyName: postgres
      description: Database of postgres
      location: us-central1
  default:
    type: gcp:datastream:Stream
    properties:
      displayName: postgres to bigQuery
      location: us-central1
      streamId: postgres-bigquery
      sourceConfig:
        sourceConnectionProfile: ${sourceConnectionProfile.id}
        mysqlSourceConfig: {}
      destinationConfig:
        destinationConnectionProfile: ${destinationConnectionProfile2.id}
        bigqueryDestinationConfig:
          dataFreshness: 900s
          singleTargetDataset:
            datasetId: ${postgres.id}
      backfillAll: {}
  destinationConnectionProfile2:
    type: gcp:datastream:ConnectionProfile
    name: destination_connection_profile2
    properties:
      displayName: Connection profile
      location: us-central1
      connectionProfileId: dest-profile
      bigqueryProfile: {}
  instance:
    type: gcp:sql:DatabaseInstance
    properties:
      name: instance-name
      databaseVersion: MYSQL_8_0
      region: us-central1
      settings:
        tier: db-f1-micro
        backupConfiguration:
          enabled: true
          binaryLogEnabled: true
        ipConfiguration:
          authorizedNetworks:
            - value: 34.71.242.81
            - value: 34.72.28.29
            - value: 34.67.6.157
            - value: 34.67.234.134
            - value: 34.72.239.218
      deletionProtection: false
  db:
    type: gcp:sql:Database
    properties:
      instance: ${instance.name}
      name: db
  pwd:
    type: random:RandomPassword
    properties:
      length: 16
      special: false
  user:
    type: gcp:sql:User
    properties:
      name: my-user
      instance: ${instance.name}
      host: '%'
      password: ${pwd.result}
  sourceConnectionProfile:
    type: gcp:datastream:ConnectionProfile
    name: source_connection_profile
    properties:
      displayName: Source connection profile
      location: us-central1
      connectionProfileId: source-profile
      mysqlProfile:
        hostname: ${instance.publicIpAddress}
        username: ${user.name}
        password: ${user.password}

The singleTargetDataset property directs all replicated data to a specific BigQuery dataset by ID. This contrasts with sourceHierarchyDatasets, which creates datasets automatically based on source schema structure. The referenced dataset must exist before the stream starts.
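
The choice between the two targeting modes can be sketched as a small helper that returns exactly one of them. This is a minimal illustration in plain TypeScript, not part of the provider: the local types, the bigQueryTarget function, and the example dataset ID are hypothetical, and the sketch assumes the two options are mutually exclusive, as they are in the underlying Datastream API.

```typescript
// Local shapes mirroring the two dataset-targeting options shown in this guide.
// These types are illustrative only; the real input types ship with @pulumi/gcp.
type SingleTarget = { singleTargetDataset: { datasetId: string } };
type SourceHierarchy = { sourceHierarchyDatasets: { datasetTemplate: { location: string } } };
type BigQueryTarget = SingleTarget | SourceHierarchy;

// Hypothetical helper: return exactly one targeting mode, rejecting ambiguous input.
function bigQueryTarget(opts: { datasetId?: string; location?: string }): BigQueryTarget {
    if (opts.datasetId !== undefined && opts.location !== undefined) {
        throw new Error("choose either datasetId or location, not both");
    }
    if (opts.datasetId !== undefined) {
        // All replicated tables land in one existing dataset.
        return { singleTargetDataset: { datasetId: opts.datasetId } };
    }
    if (opts.location !== undefined) {
        // Datasets are created per source schema in the given location.
        return { sourceHierarchyDatasets: { datasetTemplate: { location: opts.location } } };
    }
    throw new Error("one of datasetId or location is required");
}

// Hypothetical dataset ID used purely for illustration.
const single = bigQueryTarget({ datasetId: "projects/my-project/datasets/postgres" });
const perSchema = bigQueryTarget({ location: "us-central1" });
```

In a Pulumi program, the returned object would be spread into bigqueryDestinationConfig alongside settings such as dataFreshness.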

Use append-only mode for immutable data

Append-only mode writes all changes as new rows rather than updating existing ones, preserving complete change history.
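
Before the full program, the difference between the two write behaviors can be modeled in a few lines of plain TypeScript. This is a conceptual sketch only: the ChangeEvent shape and both functions are hypothetical, and the code does not reflect Datastream's actual row schema or implementation.

```typescript
// One simplified change event captured from the source table.
// Field names are illustrative, not Datastream's metadata columns.
type ChangeEvent = { op: "INSERT" | "UPDATE" | "DELETE"; id: number; name?: string };

// Update-in-place behavior: the destination mirrors the current source state.
function applyMerge(events: ChangeEvent[]): Map<number, string> {
    const table = new Map<number, string>();
    for (const e of events) {
        if (e.op === "DELETE") {
            table.delete(e.id);
        } else {
            table.set(e.id, e.name ?? "");
        }
    }
    return table;
}

// Append-only behavior: every event becomes a new row, so history is retained.
function applyAppendOnly(events: ChangeEvent[]): ChangeEvent[] {
    return events.map((e) => ({ ...e }));
}

const events: ChangeEvent[] = [
    { op: "INSERT", id: 1, name: "alice" },
    { op: "UPDATE", id: 1, name: "alicia" },
    { op: "DELETE", id: 1 },
];

const merged = applyMerge(events);        // row 1 was deleted, so nothing remains
const history = applyAppendOnly(events);  // insert, update, and delete are all kept
console.log(merged.size, history.length); // prints "0 3"
```

The append-only result keeps the intermediate "alicia" value that the mirrored table loses, which is the property that makes this mode useful for auditing and change-history analysis.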

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
import * as random from "@pulumi/random";

const project = gcp.organizations.getProject({});
const instance = new gcp.sql.DatabaseInstance("instance", {
    name: "my-instance",
    databaseVersion: "MYSQL_8_0",
    region: "us-central1",
    settings: {
        tier: "db-f1-micro",
        backupConfiguration: {
            enabled: true,
            binaryLogEnabled: true,
        },
        ipConfiguration: {
            authorizedNetworks: [
                {
                    value: "34.71.242.81",
                },
                {
                    value: "34.72.28.29",
                },
                {
                    value: "34.67.6.157",
                },
                {
                    value: "34.67.234.134",
                },
                {
                    value: "34.72.239.218",
                },
            ],
        },
    },
    deletionProtection: true,
});
const db = new gcp.sql.Database("db", {
    instance: instance.name,
    name: "db",
});
const pwd = new random.RandomPassword("pwd", {
    length: 16,
    special: false,
});
const user = new gcp.sql.User("user", {
    name: "user",
    instance: instance.name,
    host: "%",
    password: pwd.result,
});
const sourceConnectionProfile = new gcp.datastream.ConnectionProfile("source_connection_profile", {
    displayName: "Source connection profile",
    location: "us-central1",
    connectionProfileId: "source-profile",
    mysqlProfile: {
        hostname: instance.publicIpAddress,
        username: user.name,
        password: user.password,
    },
});
const destinationConnectionProfile = new gcp.datastream.ConnectionProfile("destination_connection_profile", {
    displayName: "Connection profile",
    location: "us-central1",
    connectionProfileId: "destination-profile",
    bigqueryProfile: {},
});
const _default = new gcp.datastream.Stream("default", {
    streamId: "my-stream",
    location: "us-central1",
    displayName: "my stream",
    sourceConfig: {
        sourceConnectionProfile: sourceConnectionProfile.id,
        mysqlSourceConfig: {},
    },
    destinationConfig: {
        destinationConnectionProfile: destinationConnectionProfile.id,
        bigqueryDestinationConfig: {
            sourceHierarchyDatasets: {
                datasetTemplate: {
                    location: "us-central1",
                },
            },
            appendOnly: {},
        },
    },
    backfillNone: {},
});
import pulumi
import pulumi_gcp as gcp
import pulumi_random as random

project = gcp.organizations.get_project()
instance = gcp.sql.DatabaseInstance("instance",
    name="my-instance",
    database_version="MYSQL_8_0",
    region="us-central1",
    settings={
        "tier": "db-f1-micro",
        "backup_configuration": {
            "enabled": True,
            "binary_log_enabled": True,
        },
        "ip_configuration": {
            "authorized_networks": [
                {
                    "value": "34.71.242.81",
                },
                {
                    "value": "34.72.28.29",
                },
                {
                    "value": "34.67.6.157",
                },
                {
                    "value": "34.67.234.134",
                },
                {
                    "value": "34.72.239.218",
                },
            ],
        },
    },
    deletion_protection=True)
db = gcp.sql.Database("db",
    instance=instance.name,
    name="db")
pwd = random.RandomPassword("pwd",
    length=16,
    special=False)
user = gcp.sql.User("user",
    name="user",
    instance=instance.name,
    host="%",
    password=pwd.result)
source_connection_profile = gcp.datastream.ConnectionProfile("source_connection_profile",
    display_name="Source connection profile",
    location="us-central1",
    connection_profile_id="source-profile",
    mysql_profile={
        "hostname": instance.public_ip_address,
        "username": user.name,
        "password": user.password,
    })
destination_connection_profile = gcp.datastream.ConnectionProfile("destination_connection_profile",
    display_name="Connection profile",
    location="us-central1",
    connection_profile_id="destination-profile",
    bigquery_profile={})
default = gcp.datastream.Stream("default",
    stream_id="my-stream",
    location="us-central1",
    display_name="my stream",
    source_config={
        "source_connection_profile": source_connection_profile.id,
        "mysql_source_config": {},
    },
    destination_config={
        "destination_connection_profile": destination_connection_profile.id,
        "bigquery_destination_config": {
            "source_hierarchy_datasets": {
                "dataset_template": {
                    "location": "us-central1",
                },
            },
            "append_only": {},
        },
    },
    backfill_none={})
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
	"github.com/pulumi/pulumi-random/sdk/v4/go/random"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
			Name:            pulumi.String("my-instance"),
			DatabaseVersion: pulumi.String("MYSQL_8_0"),
			Region:          pulumi.String("us-central1"),
			Settings: &sql.DatabaseInstanceSettingsArgs{
				Tier: pulumi.String("db-f1-micro"),
				BackupConfiguration: &sql.DatabaseInstanceSettingsBackupConfigurationArgs{
					Enabled:          pulumi.Bool(true),
					BinaryLogEnabled: pulumi.Bool(true),
				},
				IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
					AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.71.242.81"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.28.29"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.6.157"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.234.134"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.239.218"),
						},
					},
				},
			},
			DeletionProtection: pulumi.Bool(true),
		})
		if err != nil {
			return err
		}
		_, err = sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
			Instance: instance.Name,
			Name:     pulumi.String("db"),
		})
		if err != nil {
			return err
		}
		pwd, err := random.NewRandomPassword(ctx, "pwd", &random.RandomPasswordArgs{
			Length:  pulumi.Int(16),
			Special: pulumi.Bool(false),
		})
		if err != nil {
			return err
		}
		user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
			Name:     pulumi.String("user"),
			Instance: instance.Name,
			Host:     pulumi.String("%"),
			Password: pwd.Result,
		})
		if err != nil {
			return err
		}
		sourceConnectionProfile, err := datastream.NewConnectionProfile(ctx, "source_connection_profile", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("Source connection profile"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("source-profile"),
			MysqlProfile: &datastream.ConnectionProfileMysqlProfileArgs{
				Hostname: instance.PublicIpAddress,
				Username: user.Name,
				Password: user.Password,
			},
		})
		if err != nil {
			return err
		}
		destinationConnectionProfile, err := datastream.NewConnectionProfile(ctx, "destination_connection_profile", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("Connection profile"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("destination-profile"),
			BigqueryProfile:     &datastream.ConnectionProfileBigqueryProfileArgs{},
		})
		if err != nil {
			return err
		}
		_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
			StreamId:    pulumi.String("my-stream"),
			Location:    pulumi.String("us-central1"),
			DisplayName: pulumi.String("my stream"),
			SourceConfig: &datastream.StreamSourceConfigArgs{
				SourceConnectionProfile: sourceConnectionProfile.ID(),
				MysqlSourceConfig:       &datastream.StreamSourceConfigMysqlSourceConfigArgs{},
			},
			DestinationConfig: &datastream.StreamDestinationConfigArgs{
				DestinationConnectionProfile: destinationConnectionProfile.ID(),
				BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
					SourceHierarchyDatasets: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs{
						DatasetTemplate: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs{
							Location: pulumi.String("us-central1"),
						},
					},
					AppendOnly: &datastream.StreamDestinationConfigBigqueryDestinationConfigAppendOnlyArgs{},
				},
			},
			BackfillNone: &datastream.StreamBackfillNoneArgs{},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
using Random = Pulumi.Random;

return await Deployment.RunAsync(() => 
{
    var project = Gcp.Organizations.GetProject.Invoke();

    var instance = new Gcp.Sql.DatabaseInstance("instance", new()
    {
        Name = "my-instance",
        DatabaseVersion = "MYSQL_8_0",
        Region = "us-central1",
        Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
        {
            Tier = "db-f1-micro",
            BackupConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsBackupConfigurationArgs
            {
                Enabled = true,
                BinaryLogEnabled = true,
            },
            IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
            {
                AuthorizedNetworks = new[]
                {
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.71.242.81",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.28.29",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.6.157",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.234.134",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.239.218",
                    },
                },
            },
        },
        DeletionProtection = true,
    });

    var db = new Gcp.Sql.Database("db", new()
    {
        Instance = instance.Name,
        Name = "db",
    });

    var pwd = new Random.RandomPassword("pwd", new()
    {
        Length = 16,
        Special = false,
    });

    var user = new Gcp.Sql.User("user", new()
    {
        Name = "user",
        Instance = instance.Name,
        Host = "%",
        Password = pwd.Result,
    });

    var sourceConnectionProfile = new Gcp.Datastream.ConnectionProfile("source_connection_profile", new()
    {
        DisplayName = "Source connection profile",
        Location = "us-central1",
        ConnectionProfileId = "source-profile",
        MysqlProfile = new Gcp.Datastream.Inputs.ConnectionProfileMysqlProfileArgs
        {
            Hostname = instance.PublicIpAddress,
            Username = user.Name,
            Password = user.Password,
        },
    });

    var destinationConnectionProfile = new Gcp.Datastream.ConnectionProfile("destination_connection_profile", new()
    {
        DisplayName = "Connection profile",
        Location = "us-central1",
        ConnectionProfileId = "destination-profile",
        BigqueryProfile = new Gcp.Datastream.Inputs.ConnectionProfileBigqueryProfileArgs(),
    });

    var @default = new Gcp.Datastream.Stream("default", new()
    {
        StreamId = "my-stream",
        Location = "us-central1",
        DisplayName = "my stream",
        SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
        {
            SourceConnectionProfile = sourceConnectionProfile.Id,
            MysqlSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigArgs(),
        },
        DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
        {
            DestinationConnectionProfile = destinationConnectionProfile.Id,
            BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
            {
                SourceHierarchyDatasets = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs
                {
                    DatasetTemplate = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs
                    {
                        Location = "us-central1",
                    },
                },
                AppendOnly = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigAppendOnlyArgs(),
            },
        },
        BackfillNone = new Gcp.Datastream.Inputs.StreamBackfillNoneArgs(),
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsBackupConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import com.pulumi.random.RandomPassword;
import com.pulumi.random.RandomPasswordArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileMysqlProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileBigqueryProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigAppendOnlyArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillNoneArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

        var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
            .name("my-instance")
            .databaseVersion("MYSQL_8_0")
            .region("us-central1")
            .settings(DatabaseInstanceSettingsArgs.builder()
                .tier("db-f1-micro")
                .backupConfiguration(DatabaseInstanceSettingsBackupConfigurationArgs.builder()
                    .enabled(true)
                    .binaryLogEnabled(true)
                    .build())
                .ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
                    .authorizedNetworks(
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.71.242.81")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.28.29")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.6.157")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.234.134")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.239.218")
                            .build())
                    .build())
                .build())
            .deletionProtection(true)
            .build());

        var db = new Database("db", DatabaseArgs.builder()
            .instance(instance.name())
            .name("db")
            .build());

        var pwd = new RandomPassword("pwd", RandomPasswordArgs.builder()
            .length(16)
            .special(false)
            .build());

        var user = new User("user", UserArgs.builder()
            .name("user")
            .instance(instance.name())
            .host("%")
            .password(pwd.result())
            .build());

        var sourceConnectionProfile = new ConnectionProfile("sourceConnectionProfile", ConnectionProfileArgs.builder()
            .displayName("Source connection profile")
            .location("us-central1")
            .connectionProfileId("source-profile")
            .mysqlProfile(ConnectionProfileMysqlProfileArgs.builder()
                .hostname(instance.publicIpAddress())
                .username(user.name())
                .password(user.password())
                .build())
            .build());

        var destinationConnectionProfile = new ConnectionProfile("destinationConnectionProfile", ConnectionProfileArgs.builder()
            .displayName("Connection profile")
            .location("us-central1")
            .connectionProfileId("destination-profile")
            .bigqueryProfile(ConnectionProfileBigqueryProfileArgs.builder()
                .build())
            .build());

        var default_ = new Stream("default", StreamArgs.builder()
            .streamId("my-stream")
            .location("us-central1")
            .displayName("my stream")
            .sourceConfig(StreamSourceConfigArgs.builder()
                .sourceConnectionProfile(sourceConnectionProfile.id())
                .mysqlSourceConfig(StreamSourceConfigMysqlSourceConfigArgs.builder()
                    .build())
                .build())
            .destinationConfig(StreamDestinationConfigArgs.builder()
                .destinationConnectionProfile(destinationConnectionProfile.id())
                .bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
                    .sourceHierarchyDatasets(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs.builder()
                        .datasetTemplate(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs.builder()
                            .location("us-central1")
                            .build())
                        .build())
                    .appendOnly(StreamDestinationConfigBigqueryDestinationConfigAppendOnlyArgs.builder()
                        .build())
                    .build())
                .build())
            .backfillNone(StreamBackfillNoneArgs.builder()
                .build())
            .build());

    }
}
resources:
  instance:
    type: gcp:sql:DatabaseInstance
    properties:
      name: my-instance
      databaseVersion: MYSQL_8_0
      region: us-central1
      settings:
        tier: db-f1-micro
        backupConfiguration:
          enabled: true
          binaryLogEnabled: true
        ipConfiguration:
          authorizedNetworks:
            - value: 34.71.242.81
            - value: 34.72.28.29
            - value: 34.67.6.157
            - value: 34.67.234.134
            - value: 34.72.239.218
      deletionProtection: true
  db:
    type: gcp:sql:Database
    properties:
      instance: ${instance.name}
      name: db
  pwd:
    type: random:RandomPassword
    properties:
      length: 16
      special: false
  user:
    type: gcp:sql:User
    properties:
      name: user
      instance: ${instance.name}
      host: '%'
      password: ${pwd.result}
  sourceConnectionProfile:
    type: gcp:datastream:ConnectionProfile
    name: source_connection_profile
    properties:
      displayName: Source connection profile
      location: us-central1
      connectionProfileId: source-profile
      mysqlProfile:
        hostname: ${instance.publicIpAddress}
        username: ${user.name}
        password: ${user.password}
  destinationConnectionProfile:
    type: gcp:datastream:ConnectionProfile
    name: destination_connection_profile
    properties:
      displayName: Connection profile
      location: us-central1
      connectionProfileId: destination-profile
      bigqueryProfile: {}
  default:
    type: gcp:datastream:Stream
    properties:
      streamId: my-stream
      location: us-central1
      displayName: my stream
      sourceConfig:
        sourceConnectionProfile: ${sourceConnectionProfile.id}
        mysqlSourceConfig: {}
      destinationConfig:
        destinationConnectionProfile: ${destinationConnectionProfile.id}
        bigqueryDestinationConfig:
          sourceHierarchyDatasets:
            datasetTemplate:
              location: us-central1
          appendOnly: {}
      backfillNone: {}
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}

The appendOnly property in bigqueryDestinationConfig changes how Datastream writes to BigQuery. Instead of updating rows in place, every change becomes a new row with metadata indicating the operation type. This preserves full change history for audit trails or time-travel queries.
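
The difference between the two write modes can be illustrated with a small, self-contained model. This is a conceptual sketch only, not Datastream's implementation: the RowChange and AppendOnlyRow types, the changeType and sequence fields, and both functions are hypothetical names chosen for illustration.

```typescript
// Conceptual model only; none of these names come from Datastream or BigQuery.
type Op = "INSERT" | "UPDATE" | "DELETE";

interface RowChange {
    op: Op;
    id: number;
    name?: string; // absent for DELETE
}

// Hypothetical append-only row: the changed values plus operation metadata.
interface AppendOnlyRow {
    id: number;
    name?: string;
    changeType: Op;
    sequence: number;
}

// Merge-style writes: each change rewrites current state, so history is lost.
function applyMerge(changes: RowChange[]): Record<number, string> {
    const table: Record<number, string> = {};
    for (const c of changes) {
        if (c.op === "DELETE") {
            delete table[c.id];
        } else {
            table[c.id] = c.name ?? "";
        }
    }
    return table;
}

// Append-only writes: every change becomes a new row tagged with its operation type.
function applyAppendOnly(changes: RowChange[]): AppendOnlyRow[] {
    return changes.map((c, i) => ({ id: c.id, name: c.name, changeType: c.op, sequence: i }));
}

const changes: RowChange[] = [
    { op: "INSERT", id: 1, name: "alice" },
    { op: "UPDATE", id: 1, name: "alicia" },
    { op: "DELETE", id: 1 },
];

const merged = applyMerge(changes);
const appended = applyAppendOnly(changes);
console.log(Object.keys(merged).length); // 0: the row was deleted, no trace remains
console.log(appended.length);            // 3: one row per change, history preserved
```

In this model the merged table ends empty while the append-only log still records the insert, the update, and the delete, which is the property that makes audit-style queries possible.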

Configure BigLake Managed Tables for query federation

BigLake Managed Tables keep table data in a Cloud Storage bucket while remaining queryable from BigQuery. The blmtConfig block names the bucket, the BigQuery connection used to reach it, and the file and table formats.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
import * as random from "@pulumi/random";

const project = gcp.organizations.getProject({});
const instance = new gcp.sql.DatabaseInstance("instance", {
    name: "blmt-instance",
    databaseVersion: "MYSQL_8_0",
    region: "us-central1",
    settings: {
        tier: "db-f1-micro",
        ipConfiguration: {
            authorizedNetworks: [
                {
                    value: "34.71.242.81",
                },
                {
                    value: "34.72.28.29",
                },
                {
                    value: "34.67.6.157",
                },
                {
                    value: "34.67.234.134",
                },
                {
                    value: "34.72.239.218",
                },
            ],
        },
    },
    deletionProtection: true,
});
const db = new gcp.sql.Database("db", {
    instance: instance.name,
    name: "db",
});
const pwd = new random.RandomPassword("pwd", {
    length: 16,
    special: false,
});
const user = new gcp.sql.User("user", {
    name: "user",
    instance: instance.name,
    host: "%",
    password: pwd.result,
});
const blmtBucket = new gcp.storage.Bucket("blmt_bucket", {
    name: "blmt-bucket",
    location: "us-central1",
    forceDestroy: true,
});
const blmtConnection = new gcp.bigquery.Connection("blmt_connection", {
    project: project.then(project => project.projectId),
    location: "us-central1",
    connectionId: "blmt-connection",
    friendlyName: "Datastream BLMT Test Connection",
    description: "Connection for Datastream BLMT test",
    cloudResource: {},
});
const blmtConnectionBucketAdmin = new gcp.storage.BucketIAMMember("blmt_connection_bucket_admin", {
    bucket: blmtBucket.name,
    role: "roles/storage.admin",
    member: blmtConnection.cloudResource.apply(cloudResource => `serviceAccount:${cloudResource?.serviceAccountId}`),
});
const sourceConnectionProfile = new gcp.datastream.ConnectionProfile("source_connection_profile", {
    displayName: "Source connection profile",
    location: "us-central1",
    connectionProfileId: "blmt-source-profile",
    mysqlProfile: {
        hostname: instance.publicIpAddress,
        username: user.name,
        password: user.password,
    },
});
const destinationConnectionProfile = new gcp.datastream.ConnectionProfile("destination_connection_profile", {
    displayName: "Connection profile",
    location: "us-central1",
    connectionProfileId: "blmt-destination-profile",
    bigqueryProfile: {},
});
const _default = new gcp.datastream.Stream("default", {
    streamId: "blmt-stream",
    location: "us-central1",
    displayName: "My BLMT stream",
    sourceConfig: {
        sourceConnectionProfile: sourceConnectionProfile.id,
        mysqlSourceConfig: {},
    },
    destinationConfig: {
        destinationConnectionProfile: destinationConnectionProfile.id,
        bigqueryDestinationConfig: {
            sourceHierarchyDatasets: {
                datasetTemplate: {
                    location: "us-central1",
                },
            },
            blmtConfig: {
                bucket: blmtBucket.name,
                connectionName: pulumi.all([blmtConnection.project, blmtConnection.location, blmtConnection.connectionId]).apply(([project, location, connectionId]) => `${project}.${location}.${connectionId}`),
                fileFormat: "PARQUET",
                tableFormat: "ICEBERG",
                rootPath: "/",
            },
            appendOnly: {},
        },
    },
    backfillNone: {},
});
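
The connectionName value in the program above is assembled by joining the connection's project, location, and connection ID with dots. A minimal sketch of that formatting as a standalone helper follows; the function name blmtConnectionName, the empty-part check, and the "my-project" placeholder are assumptions made for illustration and are not part of the Pulumi GCP SDK.

```typescript
// Hypothetical helper: mirrors the `${project}.${location}.${connectionId}` template used above.
function blmtConnectionName(project: string, location: string, connectionId: string): string {
    const parts = [project, location, connectionId];
    for (const part of parts) {
        // Illustrative guard: an empty part would yield a malformed name such as "a..c".
        if (part.length === 0) {
            throw new Error("connection name parts must be non-empty");
        }
    }
    return parts.join(".");
}

// "my-project" is a placeholder project ID.
const connectionName = blmtConnectionName("my-project", "us-central1", "blmt-connection");
console.log(connectionName); // my-project.us-central1.blmt-connection
```

Inside a Pulumi program the three inputs are outputs rather than plain strings, so a helper like this would be called from within the apply callback, as the example above does with its inline template string.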
import pulumi
import pulumi_gcp as gcp
import pulumi_random as random

project = gcp.organizations.get_project()
instance = gcp.sql.DatabaseInstance("instance",
    name="blmt-instance",
    database_version="MYSQL_8_0",
    region="us-central1",
    settings={
        "tier": "db-f1-micro",
        "ip_configuration": {
            "authorized_networks": [
                {
                    "value": "34.71.242.81",
                },
                {
                    "value": "34.72.28.29",
                },
                {
                    "value": "34.67.6.157",
                },
                {
                    "value": "34.67.234.134",
                },
                {
                    "value": "34.72.239.218",
                },
            ],
        },
    },
    deletion_protection=True)
db = gcp.sql.Database("db",
    instance=instance.name,
    name="db")
pwd = random.RandomPassword("pwd",
    length=16,
    special=False)
user = gcp.sql.User("user",
    name="user",
    instance=instance.name,
    host="%",
    password=pwd.result)
blmt_bucket = gcp.storage.Bucket("blmt_bucket",
    name="blmt-bucket",
    location="us-central1",
    force_destroy=True)
blmt_connection = gcp.bigquery.Connection("blmt_connection",
    project=project.project_id,
    location="us-central1",
    connection_id="blmt-connection",
    friendly_name="Datastream BLMT Test Connection",
    description="Connection for Datastream BLMT test",
    cloud_resource={})
blmt_connection_bucket_admin = gcp.storage.BucketIAMMember("blmt_connection_bucket_admin",
    bucket=blmt_bucket.name,
    role="roles/storage.admin",
    member=blmt_connection.cloud_resource.apply(lambda cloud_resource: f"serviceAccount:{cloud_resource.service_account_id}"))
source_connection_profile = gcp.datastream.ConnectionProfile("source_connection_profile",
    display_name="Source connection profile",
    location="us-central1",
    connection_profile_id="blmt-source-profile",
    mysql_profile={
        "hostname": instance.public_ip_address,
        "username": user.name,
        "password": user.password,
    })
destination_connection_profile = gcp.datastream.ConnectionProfile("destination_connection_profile",
    display_name="Connection profile",
    location="us-central1",
    connection_profile_id="blmt-destination-profile",
    bigquery_profile={})
default = gcp.datastream.Stream("default",
    stream_id="blmt-stream",
    location="us-central1",
    display_name="My BLMT stream",
    source_config={
        "source_connection_profile": source_connection_profile.id,
        "mysql_source_config": {},
    },
    destination_config={
        "destination_connection_profile": destination_connection_profile.id,
        "bigquery_destination_config": {
            "source_hierarchy_datasets": {
                "dataset_template": {
                    "location": "us-central1",
                },
            },
            "blmt_config": {
                "bucket": blmt_bucket.name,
                "connection_name": pulumi.Output.all(
                    project=blmt_connection.project,
                    location=blmt_connection.location,
                    connection_id=blmt_connection.connection_id,
                ).apply(lambda resolved_outputs: f"{resolved_outputs['project']}.{resolved_outputs['location']}.{resolved_outputs['connection_id']}"),
                "file_format": "PARQUET",
                "table_format": "ICEBERG",
                "root_path": "/",
            },
            "append_only": {},
        },
    },
    backfill_none={})
package main

import (
	"fmt"

	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/storage"
	"github.com/pulumi/pulumi-random/sdk/v4/go/random"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
			Name:            pulumi.String("blmt-instance"),
			DatabaseVersion: pulumi.String("MYSQL_8_0"),
			Region:          pulumi.String("us-central1"),
			Settings: &sql.DatabaseInstanceSettingsArgs{
				Tier: pulumi.String("db-f1-micro"),
				IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
					AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.71.242.81"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.28.29"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.6.157"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.234.134"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.239.218"),
						},
					},
				},
			},
			DeletionProtection: pulumi.Bool(true),
		})
		if err != nil {
			return err
		}
		_, err = sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
			Instance: instance.Name,
			Name:     pulumi.String("db"),
		})
		if err != nil {
			return err
		}
		pwd, err := random.NewRandomPassword(ctx, "pwd", &random.RandomPasswordArgs{
			Length:  pulumi.Int(16),
			Special: pulumi.Bool(false),
		})
		if err != nil {
			return err
		}
		user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
			Name:     pulumi.String("user"),
			Instance: instance.Name,
			Host:     pulumi.String("%"),
			Password: pwd.Result,
		})
		if err != nil {
			return err
		}
		blmtBucket, err := storage.NewBucket(ctx, "blmt_bucket", &storage.BucketArgs{
			Name:         pulumi.String("blmt-bucket"),
			Location:     pulumi.String("us-central1"),
			ForceDestroy: pulumi.Bool(true),
		})
		if err != nil {
			return err
		}
		blmtConnection, err := bigquery.NewConnection(ctx, "blmt_connection", &bigquery.ConnectionArgs{
			Project:       pulumi.String(project.ProjectId),
			Location:      pulumi.String("us-central1"),
			ConnectionId:  pulumi.String("blmt-connection"),
			FriendlyName:  pulumi.String("Datastream BLMT Test Connection"),
			Description:   pulumi.String("Connection for Datastream BLMT test"),
			CloudResource: &bigquery.ConnectionCloudResourceArgs{},
		})
		if err != nil {
			return err
		}
		_, err = storage.NewBucketIAMMember(ctx, "blmt_connection_bucket_admin", &storage.BucketIAMMemberArgs{
			Bucket: blmtBucket.Name,
			Role:   pulumi.String("roles/storage.admin"),
			// ServiceAccountId is an optional output (*string); Elem() unwraps it before formatting.
			Member: pulumi.Sprintf("serviceAccount:%v", blmtConnection.CloudResource.ServiceAccountId().Elem()),
		})
		if err != nil {
			return err
		}
		sourceConnectionProfile, err := datastream.NewConnectionProfile(ctx, "source_connection_profile", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("Source connection profile"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("blmt-source-profile"),
			MysqlProfile: &datastream.ConnectionProfileMysqlProfileArgs{
				Hostname: instance.PublicIpAddress,
				Username: user.Name,
				Password: user.Password,
			},
		})
		if err != nil {
			return err
		}
		destinationConnectionProfile, err := datastream.NewConnectionProfile(ctx, "destination_connection_profile", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("Connection profile"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("blmt-destination-profile"),
			BigqueryProfile:     &datastream.ConnectionProfileBigqueryProfileArgs{},
		})
		if err != nil {
			return err
		}
		_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
			StreamId:    pulumi.String("blmt-stream"),
			Location:    pulumi.String("us-central1"),
			DisplayName: pulumi.String("My BLMT stream"),
			SourceConfig: &datastream.StreamSourceConfigArgs{
				SourceConnectionProfile: sourceConnectionProfile.ID(),
				MysqlSourceConfig:       &datastream.StreamSourceConfigMysqlSourceConfigArgs{},
			},
			DestinationConfig: &datastream.StreamDestinationConfigArgs{
				DestinationConnectionProfile: destinationConnectionProfile.ID(),
				BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
					SourceHierarchyDatasets: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs{
						DatasetTemplate: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs{
							Location: pulumi.String("us-central1"),
						},
					},
					BlmtConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigBlmtConfigArgs{
						Bucket: blmtBucket.Name,
						ConnectionName: pulumi.All(blmtConnection.Project, blmtConnection.Location, blmtConnection.ConnectionId).ApplyT(func(_args []interface{}) (string, error) {
							project := _args[0].(string)
							// Location resolves to *string; dereference it so the value, not the pointer, is formatted.
							location := ""
							if loc := _args[1].(*string); loc != nil {
								location = *loc
							}
							connectionId := _args[2].(string)
							return fmt.Sprintf("%v.%v.%v", project, location, connectionId), nil
						}).(pulumi.StringOutput),
						FileFormat:  pulumi.String("PARQUET"),
						TableFormat: pulumi.String("ICEBERG"),
						RootPath:    pulumi.String("/"),
					},
					AppendOnly: &datastream.StreamDestinationConfigBigqueryDestinationConfigAppendOnlyArgs{},
				},
			},
			BackfillNone: &datastream.StreamBackfillNoneArgs{},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
using Random = Pulumi.Random;

return await Deployment.RunAsync(() => 
{
    var project = Gcp.Organizations.GetProject.Invoke();

    var instance = new Gcp.Sql.DatabaseInstance("instance", new()
    {
        Name = "blmt-instance",
        DatabaseVersion = "MYSQL_8_0",
        Region = "us-central1",
        Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
        {
            Tier = "db-f1-micro",
            IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
            {
                AuthorizedNetworks = new[]
                {
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.71.242.81",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.28.29",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.6.157",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.234.134",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.239.218",
                    },
                },
            },
        },
        DeletionProtection = true,
    });

    var db = new Gcp.Sql.Database("db", new()
    {
        Instance = instance.Name,
        Name = "db",
    });

    var pwd = new Random.RandomPassword("pwd", new()
    {
        Length = 16,
        Special = false,
    });

    var user = new Gcp.Sql.User("user", new()
    {
        Name = "user",
        Instance = instance.Name,
        Host = "%",
        Password = pwd.Result,
    });

    var blmtBucket = new Gcp.Storage.Bucket("blmt_bucket", new()
    {
        Name = "blmt-bucket",
        Location = "us-central1",
        ForceDestroy = true,
    });

    var blmtConnection = new Gcp.BigQuery.Connection("blmt_connection", new()
    {
        Project = project.Apply(getProjectResult => getProjectResult.ProjectId),
        Location = "us-central1",
        ConnectionId = "blmt-connection",
        FriendlyName = "Datastream BLMT Test Connection",
        Description = "Connection for Datastream BLMT test",
        CloudResource = new Gcp.BigQuery.Inputs.ConnectionCloudResourceArgs(),
    });

    var blmtConnectionBucketAdmin = new Gcp.Storage.BucketIAMMember("blmt_connection_bucket_admin", new()
    {
        Bucket = blmtBucket.Name,
        Role = "roles/storage.admin",
        Member = blmtConnection.CloudResource.Apply(cloudResource => $"serviceAccount:{cloudResource?.ServiceAccountId}"),
    });

    var sourceConnectionProfile = new Gcp.Datastream.ConnectionProfile("source_connection_profile", new()
    {
        DisplayName = "Source connection profile",
        Location = "us-central1",
        ConnectionProfileId = "blmt-source-profile",
        MysqlProfile = new Gcp.Datastream.Inputs.ConnectionProfileMysqlProfileArgs
        {
            Hostname = instance.PublicIpAddress,
            Username = user.Name,
            Password = user.Password,
        },
    });

    var destinationConnectionProfile = new Gcp.Datastream.ConnectionProfile("destination_connection_profile", new()
    {
        DisplayName = "Connection profile",
        Location = "us-central1",
        ConnectionProfileId = "blmt-destination-profile",
        BigqueryProfile = new Gcp.Datastream.Inputs.ConnectionProfileBigqueryProfileArgs(),
    });

    var @default = new Gcp.Datastream.Stream("default", new()
    {
        StreamId = "blmt-stream",
        Location = "us-central1",
        DisplayName = "My BLMT stream",
        SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
        {
            SourceConnectionProfile = sourceConnectionProfile.Id,
            MysqlSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigArgs(),
        },
        DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
        {
            DestinationConnectionProfile = destinationConnectionProfile.Id,
            BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
            {
                SourceHierarchyDatasets = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs
                {
                    DatasetTemplate = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs
                    {
                        Location = "us-central1",
                    },
                },
                BlmtConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigBlmtConfigArgs
                {
                    Bucket = blmtBucket.Name,
                    ConnectionName = Output.Tuple(blmtConnection.Project, blmtConnection.Location, blmtConnection.ConnectionId).Apply(values =>
                    {
                        var project = values.Item1;
                        var location = values.Item2;
                        var connectionId = values.Item3;
                        return $"{project}.{location}.{connectionId}";
                    }),
                    FileFormat = "PARQUET",
                    TableFormat = "ICEBERG",
                    RootPath = "/",
                },
                AppendOnly = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigAppendOnlyArgs(),
            },
        },
        BackfillNone = new Gcp.Datastream.Inputs.StreamBackfillNoneArgs(),
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import com.pulumi.random.Password;
import com.pulumi.random.PasswordArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.storage.Bucket;
import com.pulumi.gcp.storage.BucketArgs;
import com.pulumi.gcp.bigquery.Connection;
import com.pulumi.gcp.bigquery.ConnectionArgs;
import com.pulumi.gcp.bigquery.inputs.ConnectionCloudResourceArgs;
import com.pulumi.gcp.storage.BucketIAMMember;
import com.pulumi.gcp.storage.BucketIAMMemberArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileMysqlProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileBigqueryProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigBlmtConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigAppendOnlyArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillNoneArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

        var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
            .name("blmt-instance")
            .databaseVersion("MYSQL_8_0")
            .region("us-central1")
            .settings(DatabaseInstanceSettingsArgs.builder()
                .tier("db-f1-micro")
                .ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
                    .authorizedNetworks(
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.71.242.81")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.28.29")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.6.157")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.234.134")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.239.218")
                            .build())
                    .build())
                .build())
            .deletionProtection(true)
            .build());

        var db = new Database("db", DatabaseArgs.builder()
            .instance(instance.name())
            .name("db")
            .build());

        var pwd = new Password("pwd", PasswordArgs.builder()
            .length(16)
            .special(false)
            .build());

        var user = new User("user", UserArgs.builder()
            .name("user")
            .instance(instance.name())
            .host("%")
            .password(pwd.result())
            .build());

        var blmtBucket = new Bucket("blmtBucket", BucketArgs.builder()
            .name("blmt-bucket")
            .location("us-central1")
            .forceDestroy(true)
            .build());

        var blmtConnection = new Connection("blmtConnection", ConnectionArgs.builder()
            .project(project.projectId())
            .location("us-central1")
            .connectionId("blmt-connection")
            .friendlyName("Datastream BLMT Test Connection")
            .description("Connection for Datastream BLMT test")
            .cloudResource(ConnectionCloudResourceArgs.builder()
                .build())
            .build());

        var blmtConnectionBucketAdmin = new BucketIAMMember("blmtConnectionBucketAdmin", BucketIAMMemberArgs.builder()
            .bucket(blmtBucket.name())
            .role("roles/storage.admin")
            .member(blmtConnection.cloudResource().applyValue(_cloudResource -> String.format("serviceAccount:%s", _cloudResource.serviceAccountId())))
            .build());

        var sourceConnectionProfile = new ConnectionProfile("sourceConnectionProfile", ConnectionProfileArgs.builder()
            .displayName("Source connection profile")
            .location("us-central1")
            .connectionProfileId("blmt-source-profile")
            .mysqlProfile(ConnectionProfileMysqlProfileArgs.builder()
                .hostname(instance.publicIpAddress())
                .username(user.name())
                .password(user.password())
                .build())
            .build());

        var destinationConnectionProfile = new ConnectionProfile("destinationConnectionProfile", ConnectionProfileArgs.builder()
            .displayName("Connection profile")
            .location("us-central1")
            .connectionProfileId("blmt-destination-profile")
            .bigqueryProfile(ConnectionProfileBigqueryProfileArgs.builder()
                .build())
            .build());

        var default_ = new Stream("default", StreamArgs.builder()
            .streamId("blmt-stream")
            .location("us-central1")
            .displayName("My BLMT stream")
            .sourceConfig(StreamSourceConfigArgs.builder()
                .sourceConnectionProfile(sourceConnectionProfile.id())
                .mysqlSourceConfig(StreamSourceConfigMysqlSourceConfigArgs.builder()
                    .build())
                .build())
            .destinationConfig(StreamDestinationConfigArgs.builder()
                .destinationConnectionProfile(destinationConnectionProfile.id())
                .bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
                    .sourceHierarchyDatasets(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs.builder()
                        .datasetTemplate(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs.builder()
                            .location("us-central1")
                            .build())
                        .build())
                    .blmtConfig(StreamDestinationConfigBigqueryDestinationConfigBlmtConfigArgs.builder()
                        .bucket(blmtBucket.name())
                        .connectionName(Output.tuple(blmtConnection.project(), blmtConnection.location(), blmtConnection.connectionId()).applyValue(values -> {
                            var project = values.t1;
                            var location = values.t2;
                            var connectionId = values.t3;
                            return String.format("%s.%s.%s", project,location,connectionId);
                        }))
                        .fileFormat("PARQUET")
                        .tableFormat("ICEBERG")
                        .rootPath("/")
                        .build())
                    .appendOnly(StreamDestinationConfigBigqueryDestinationConfigAppendOnlyArgs.builder()
                        .build())
                    .build())
                .build())
            .backfillNone(StreamBackfillNoneArgs.builder()
                .build())
            .build());

    }
}
resources:
  instance:
    type: gcp:sql:DatabaseInstance
    properties:
      name: blmt-instance
      databaseVersion: MYSQL_8_0
      region: us-central1
      settings:
        tier: db-f1-micro
        ipConfiguration:
          authorizedNetworks:
            - value: 34.71.242.81
            - value: 34.72.28.29
            - value: 34.67.6.157
            - value: 34.67.234.134
            - value: 34.72.239.218
      deletionProtection: true
  db:
    type: gcp:sql:Database
    properties:
      instance: ${instance.name}
      name: db
  pwd:
    type: random:Password
    properties:
      length: 16
      special: false
  user:
    type: gcp:sql:User
    properties:
      name: user
      instance: ${instance.name}
      host: '%'
      password: ${pwd.result}
  blmtBucket:
    type: gcp:storage:Bucket
    name: blmt_bucket
    properties:
      name: blmt-bucket
      location: us-central1
      forceDestroy: true
  blmtConnection:
    type: gcp:bigquery:Connection
    name: blmt_connection
    properties:
      project: ${project.projectId}
      location: us-central1
      connectionId: blmt-connection
      friendlyName: Datastream BLMT Test Connection
      description: Connection for Datastream BLMT test
      cloudResource: {}
  blmtConnectionBucketAdmin:
    type: gcp:storage:BucketIAMMember
    name: blmt_connection_bucket_admin
    properties:
      bucket: ${blmtBucket.name}
      role: roles/storage.admin
      member: serviceAccount:${blmtConnection.cloudResource.serviceAccountId}
  sourceConnectionProfile:
    type: gcp:datastream:ConnectionProfile
    name: source_connection_profile
    properties:
      displayName: Source connection profile
      location: us-central1
      connectionProfileId: blmt-source-profile
      mysqlProfile:
        hostname: ${instance.publicIpAddress}
        username: ${user.name}
        password: ${user.password}
  destinationConnectionProfile:
    type: gcp:datastream:ConnectionProfile
    name: destination_connection_profile
    properties:
      displayName: Connection profile
      location: us-central1
      connectionProfileId: blmt-destination-profile
      bigqueryProfile: {}
  default:
    type: gcp:datastream:Stream
    properties:
      streamId: blmt-stream
      location: us-central1
      displayName: My BLMT stream
      sourceConfig:
        sourceConnectionProfile: ${sourceConnectionProfile.id}
        mysqlSourceConfig: {}
      destinationConfig:
        destinationConnectionProfile: ${destinationConnectionProfile.id}
        bigqueryDestinationConfig:
          sourceHierarchyDatasets:
            datasetTemplate:
              location: us-central1
          blmtConfig:
            bucket: ${blmtBucket.name}
            connectionName: ${blmtConnection.project}.${blmtConnection.location}.${blmtConnection.connectionId}
            fileFormat: PARQUET
            tableFormat: ICEBERG
            rootPath: /
          appendOnly: {}
      backfillNone: {}
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}

The blmtConfig property configures BigLake Managed Tables, which store data in the GCS bucket named by bucket while exposing it through BigQuery. The connectionName property references a BigQuery connection, in the form project.location.connectionId, whose service account is used for GCS access. The fileFormat and tableFormat properties control how data is stored (here, PARQUET files in the ICEBERG table format), and rootPath sets the path inside the bucket. The BigQuery connection’s service account needs the roles/storage.admin role on the bucket.

Beyond these examples

These snippets focus on specific stream-level features: source database configuration for MySQL, PostgreSQL, Oracle, SQL Server, and MongoDB; destination configuration for Cloud Storage and BigQuery; CDC mechanisms like transaction logs, replication slots, and change tables; and data filtering and backfill strategies. They’re intentionally minimal rather than full replication pipelines.

The examples reference pre-existing infrastructure such as Datastream connection profiles for sources and destinations, source databases with CDC enabled, BigQuery datasets, GCS buckets, KMS keys, and IAM permissions for the Datastream service account. They focus on configuring the stream rather than provisioning the surrounding infrastructure.

To keep things focused, common stream patterns are omitted, including:

  • Stream lifecycle management (desiredState for starting/pausing)
  • Private connectivity configuration (VPC peering, Private Service Connect)
  • Monitoring and alerting integration
  • Advanced filtering (complex include/exclude patterns)
  • Error handling and retry configuration
  • Cross-region replication patterns

These omissions are intentional: the goal is to illustrate how each stream feature is wired, not provide drop-in replication modules. See the Datastream Stream resource reference for all available configuration options.

Frequently Asked Questions

Configuration & Lifecycle
What properties can't I change after creating a stream?
The location, streamId, customerManagedEncryptionKey, and createWithoutValidation properties are immutable. Changing them requires recreating the stream.
How do I control when my stream starts?
Set desiredState to RUNNING to start immediately, NOT_STARTED to create without starting (default), or PAUSED to pause a running stream.
Why aren't all my labels showing up in my configuration?
The labels field is non-authoritative and only manages labels in your configuration. Use effectiveLabels to view all labels on the resource, including those managed by other clients.
IAM & Permissions
What IAM permissions are required for GCS destinations?
Grant the Datastream service account (service-PROJECT_NUMBER@gcp-sa-datastream.iam.gserviceaccount.com) three roles on the GCS bucket: roles/storage.objectViewer, roles/storage.objectCreator, and roles/storage.legacyBucketReader.
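As a minimal sketch of the answer above, the three bindings can be generated from a list instead of being written out by hand. The helper names (datastreamServiceAgent, bucketBindings), the bucket name, and the project number are hypothetical; each returned object only mirrors the bucket, role, and member arguments that gcp.storage.BucketIAMMember accepts.

```typescript
// The three bucket-level roles named in the answer above.
const DATASTREAM_BUCKET_ROLES: string[] = [
    "roles/storage.objectViewer",
    "roles/storage.objectCreator",
    "roles/storage.legacyBucketReader",
];

// Builds the IAM member string for a project's Datastream service account.
function datastreamServiceAgent(projectNumber: string): string {
    return `serviceAccount:service-${projectNumber}@gcp-sa-datastream.iam.gserviceaccount.com`;
}

// Same field names as the BucketIAMMember arguments (bucket, role, member).
interface BucketBinding {
    bucket: string;
    role: string;
    member: string;
}

// Hypothetical helper: one binding per required role.
function bucketBindings(bucket: string, projectNumber: string): BucketBinding[] {
    const member = datastreamServiceAgent(projectNumber);
    return DATASTREAM_BUCKET_ROLES.map((role) => ({ bucket, role, member }));
}

// Placeholder bucket name and project number, for illustration only.
const exampleBindings = bucketBindings("my-bucket", "123456789");
```

In a Pulumi program, each entry could be passed as the arguments of a gcp.storage.BucketIAMMember, with a distinct resource name per role.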
How do I enable KMS encryption for my stream?
Set customerManagedEncryptionKey to your KMS key name and grant the Datastream service account roles/cloudkms.cryptoKeyEncrypterDecrypter on the key. Use dependsOn so that the IAM binding is created before the stream.
How do I stream to a BigQuery dataset in a different project?
Set projectId in sourceHierarchyDatasets to specify the target project. Ensure the Datastream service account has roles/bigquery.admin in the target project.
Backfill Strategies
What's the difference between backfillAll and backfillNone?
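backfillAll automatically backfills every object in the stream, optionally skipping the objects you list in a source-specific excluded-objects block, while backfillNone disables automatic backfill. Set exactly one of them: backfillAll if you need historical data, backfillNone if you do not.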
How do I exclude specific objects from backfill?
Use backfillAll with the excluded-objects property for your source type (e.g., mysqlExcludedObjects, postgresqlExcludedObjects, oracleExcludedObjects) to specify the databases, tables, or columns to exclude.
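The nesting of the excluded-objects block is easy to get wrong, so here is a small sketch for the MySQL case. The local interfaces are stand-ins that mirror the mysqlExcludedObjects shape (mysqlDatabases, mysqlTables, mysqlColumns); the helper backfillAllExcluding and the database and table names are hypothetical.

```typescript
// Local stand-ins mirroring the backfillAll.mysqlExcludedObjects structure.
interface MysqlColumn { column: string; }
interface MysqlTable { table: string; mysqlColumns?: MysqlColumn[]; }
interface MysqlDatabase { database: string; mysqlTables?: MysqlTable[]; }
interface BackfillAll {
    mysqlExcludedObjects?: { mysqlDatabases: MysqlDatabase[] };
}

// Hypothetical helper: exclude whole tables of one database from backfill.
function backfillAllExcluding(database: string, tables: string[]): BackfillAll {
    if (tables.length === 0) {
        // No exclusions: an empty backfillAll block backfills everything.
        return {};
    }
    return {
        mysqlExcludedObjects: {
            mysqlDatabases: [
                { database, mysqlTables: tables.map((table) => ({ table })) },
            ],
        },
    };
}

// Placeholder names, for illustration only.
const exampleBackfillAll = backfillAllExcluding("my-database", ["audit_log", "sessions"]);
```

The result is meant to be assigned to the stream's backfillAll property. To exclude only certain columns, add mysqlColumns entries under the relevant table.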
Source-Specific Configuration
What's the difference between transactionLogs and changeTables for SQL Server?
transactionLogs uses SQL Server transaction logs for change data capture, while changeTables uses SQL Server change tables. Choose one based on your SQL Server CDC configuration.
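Because the two CDC methods are mutually exclusive, it can help to derive the block from a single setting. The sketch below assumes a hypothetical helper, sqlServerCdcConfig, whose return value is intended for the sqlServerSourceConfig property of the stream's sourceConfig; the local interface is a stand-in, not the provider's own type.

```typescript
type SqlServerCdcMethod = "transactionLogs" | "changeTables";

// Local stand-in: exactly one of the two empty blocks should be set.
interface SqlServerCdcConfig {
    transactionLogs?: Record<string, never>;
    changeTables?: Record<string, never>;
}

// Hypothetical helper: emit only the block for the chosen CDC method.
function sqlServerCdcConfig(method: SqlServerCdcMethod): SqlServerCdcConfig {
    return method === "transactionLogs"
        ? { transactionLogs: {} }
        : { changeTables: {} };
}

const exampleSqlServerConfig = sqlServerCdcConfig("changeTables");
```

Keeping the choice in one value avoids accidentally setting both blocks on the same stream.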
How do I use GTID for MySQL replication?
Set gtid: {} in mysqlSourceConfig instead of using binary log position. This enables GTID-based replication for MySQL sources.
Destination Configuration
How do I stream to a single BigQuery dataset instead of creating multiple datasets?
Use singleTargetDataset with datasetId in bigqueryDestinationConfig instead of sourceHierarchyDatasets.
What is append-only mode for BigQuery destinations?
Set appendOnly: {} in bigqueryDestinationConfig to append data without updates or deletes. This mode is useful for audit logs or immutable data.
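The two destination answers above can be combined in one sketch. The helper bigqueryDestination and the DatasetTarget type are hypothetical, and the dataset ID and location are placeholders; the returned object only mirrors the singleTargetDataset, sourceHierarchyDatasets, and appendOnly fields of bigqueryDestinationConfig.

```typescript
// Either one shared dataset, or one dataset per source schema in a location.
type DatasetTarget =
    | { kind: "single"; datasetId: string }
    | { kind: "perSource"; location: string };

// Hypothetical helper: build the bigqueryDestinationConfig block.
function bigqueryDestination(target: DatasetTarget, appendOnly: boolean) {
    const datasets =
        target.kind === "single"
            ? { singleTargetDataset: { datasetId: target.datasetId } }
            : { sourceHierarchyDatasets: { datasetTemplate: { location: target.location } } };
    // appendOnly is an empty block; omit it entirely when not wanted.
    return appendOnly ? { ...datasets, appendOnly: {} } : datasets;
}

// Placeholder dataset ID, for illustration only.
const exampleDestination = bigqueryDestination(
    { kind: "single", datasetId: "my-project:my_dataset" },
    true,
);
```

Modeling the target as a tagged union makes it impossible to set singleTargetDataset and sourceHierarchyDatasets at the same time.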
What is BLMT for BigQuery destinations?
BigLake Managed Tables (BLMT) store data in GCS as Parquet files in the Iceberg table format while exposing it through BigQuery. Configure blmtConfig with bucket, connectionName, fileFormat (PARQUET), tableFormat (ICEBERG), and rootPath. The BigQuery connection’s service account needs roles/storage.admin on the bucket.
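As a rough sketch of how the blmtConfig fields fit together, the helper below assembles the block from plain strings. The function name buildBlmtConfig and the sample values are hypothetical; the project.location.connectionId form of connectionName follows the BLMT example earlier in this guide.

```typescript
// Local stand-in mirroring the blmtConfig fields listed above.
interface BlmtConfig {
    bucket: string;
    connectionName: string;
    fileFormat: "PARQUET";
    tableFormat: "ICEBERG";
    rootPath: string;
}

// Hypothetical helper: join the connection parts and fill in the formats.
function buildBlmtConfig(
    bucket: string,
    project: string,
    location: string,
    connectionId: string,
    rootPath: string = "/",
): BlmtConfig {
    return {
        bucket,
        connectionName: `${project}.${location}.${connectionId}`,
        fileFormat: "PARQUET",
        tableFormat: "ICEBERG",
        rootPath,
    };
}

// Placeholder values, for illustration only.
const exampleBlmt = buildBlmtConfig("blmt-bucket", "my-project", "us-central1", "blmt-connection");
```

In a real Pulumi program the bucket and connection values are usually outputs of other resources, so the string would be assembled inside an apply or with pulumi.interpolate rather than from plain strings.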
