Configure GCP Datastream Connection Profiles

The gcp:datastream/connectionProfile:ConnectionProfile resource, part of the Pulumi GCP provider, defines reusable connection configurations for Datastream sources and destinations: database credentials, network paths, and authentication methods. This guide focuses on four capabilities: Cloud Storage destinations, PostgreSQL and SQL Server sources with public and private connectivity, SSL configuration and Secret Manager integration, and MongoDB replica set connections.

Connection profiles reference existing databases, VPC networks, private connections, and Secret Manager secrets. The examples are intentionally small. Combine them with your own infrastructure and security configuration.

Connect to Cloud Storage with bucket and path

Most Datastream deployments begin by defining a destination where replicated data will land. Cloud Storage profiles provide a simple starting point for batch-oriented pipelines.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const _default = new gcp.datastream.ConnectionProfile("default", {
    displayName: "Connection profile",
    location: "us-central1",
    connectionProfileId: "my-profile",
    gcsProfile: {
        bucket: "my-bucket",
        rootPath: "/path",
    },
});
import pulumi
import pulumi_gcp as gcp

default = gcp.datastream.ConnectionProfile("default",
    display_name="Connection profile",
    location="us-central1",
    connection_profile_id="my-profile",
    gcs_profile={
        "bucket": "my-bucket",
        "root_path": "/path",
    })
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := datastream.NewConnectionProfile(ctx, "default", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("Connection profile"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("my-profile"),
			GcsProfile: &datastream.ConnectionProfileGcsProfileArgs{
				Bucket:   pulumi.String("my-bucket"),
				RootPath: pulumi.String("/path"),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var @default = new Gcp.Datastream.ConnectionProfile("default", new()
    {
        DisplayName = "Connection profile",
        Location = "us-central1",
        ConnectionProfileId = "my-profile",
        GcsProfile = new Gcp.Datastream.Inputs.ConnectionProfileGcsProfileArgs
        {
            Bucket = "my-bucket",
            RootPath = "/path",
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileGcsProfileArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var default_ = new ConnectionProfile("default", ConnectionProfileArgs.builder()
            .displayName("Connection profile")
            .location("us-central1")
            .connectionProfileId("my-profile")
            .gcsProfile(ConnectionProfileGcsProfileArgs.builder()
                .bucket("my-bucket")
                .rootPath("/path")
                .build())
            .build());

    }
}
resources:
  default:
    type: gcp:datastream:ConnectionProfile
    properties:
      displayName: Connection profile
      location: us-central1
      connectionProfileId: my-profile
      gcsProfile:
        bucket: my-bucket
        rootPath: /path

The gcsProfile property specifies the destination bucket and optional rootPath for organizing replicated data. The connectionProfileId provides a stable identifier for referencing this profile in stream configurations. Without additional connectivity options, this profile is ready to receive data from any Datastream source.

Connect to PostgreSQL with public IP access

Applications replicating from PostgreSQL databases often start with public IP connectivity, where the source database allows connections from Datastream’s static IP addresses.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
import * as random from "@pulumi/random";

const instance = new gcp.sql.DatabaseInstance("instance", {
    name: "my-instance",
    databaseVersion: "POSTGRES_14",
    region: "us-central1",
    settings: {
        tier: "db-f1-micro",
        ipConfiguration: {
            authorizedNetworks: [
                {
                    value: "34.71.242.81",
                },
                {
                    value: "34.72.28.29",
                },
                {
                    value: "34.67.6.157",
                },
                {
                    value: "34.67.234.134",
                },
                {
                    value: "34.72.239.218",
                },
            ],
        },
    },
    deletionProtection: true,
});
const db = new gcp.sql.Database("db", {
    instance: instance.name,
    name: "db",
});
const pwd = new random.index.Password("pwd", {
    length: 16,
    special: false,
});
const user = new gcp.sql.User("user", {
    name: "user",
    instance: instance.name,
    password: pwd.result,
});
const _default = new gcp.datastream.ConnectionProfile("default", {
    displayName: "Connection profile",
    location: "us-central1",
    connectionProfileId: "my-profile",
    postgresqlProfile: {
        hostname: instance.publicIpAddress,
        username: user.name,
        password: user.password,
        database: db.name,
    },
});
import pulumi
import pulumi_gcp as gcp
import pulumi_random as random

instance = gcp.sql.DatabaseInstance("instance",
    name="my-instance",
    database_version="POSTGRES_14",
    region="us-central1",
    settings={
        "tier": "db-f1-micro",
        "ip_configuration": {
            "authorized_networks": [
                {
                    "value": "34.71.242.81",
                },
                {
                    "value": "34.72.28.29",
                },
                {
                    "value": "34.67.6.157",
                },
                {
                    "value": "34.67.234.134",
                },
                {
                    "value": "34.72.239.218",
                },
            ],
        },
    },
    deletion_protection=True)
db = gcp.sql.Database("db",
    instance=instance.name,
    name="db")
pwd = random.index.Password("pwd",
    length=16,
    special=False)
user = gcp.sql.User("user",
    name="user",
    instance=instance.name,
    password=pwd["result"])
default = gcp.datastream.ConnectionProfile("default",
    display_name="Connection profile",
    location="us-central1",
    connection_profile_id="my-profile",
    postgresql_profile={
        "hostname": instance.public_ip_address,
        "username": user.name,
        "password": user.password,
        "database": db.name,
    })
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
	"github.com/pulumi/pulumi-random/sdk/v4/go/random"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
			Name:            pulumi.String("my-instance"),
			DatabaseVersion: pulumi.String("POSTGRES_14"),
			Region:          pulumi.String("us-central1"),
			Settings: &sql.DatabaseInstanceSettingsArgs{
				Tier: pulumi.String("db-f1-micro"),
				IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
					AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.71.242.81"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.28.29"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.6.157"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.234.134"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.239.218"),
						},
					},
				},
			},
			DeletionProtection: pulumi.Bool(true),
		})
		if err != nil {
			return err
		}
		db, err := sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
			Instance: instance.Name,
			Name:     pulumi.String("db"),
		})
		if err != nil {
			return err
		}
		pwd, err := random.NewPassword(ctx, "pwd", &random.PasswordArgs{
			Length:  16,
			Special: false,
		})
		if err != nil {
			return err
		}
		user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
			Name:     pulumi.String("user"),
			Instance: instance.Name,
			Password: pwd.Result,
		})
		if err != nil {
			return err
		}
		_, err = datastream.NewConnectionProfile(ctx, "default", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("Connection profile"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("my-profile"),
			PostgresqlProfile: &datastream.ConnectionProfilePostgresqlProfileArgs{
				Hostname: instance.PublicIpAddress,
				Username: user.Name,
				Password: user.Password,
				Database: db.Name,
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
using Random = Pulumi.Random;

return await Deployment.RunAsync(() => 
{
    var instance = new Gcp.Sql.DatabaseInstance("instance", new()
    {
        Name = "my-instance",
        DatabaseVersion = "POSTGRES_14",
        Region = "us-central1",
        Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
        {
            Tier = "db-f1-micro",
            IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
            {
                AuthorizedNetworks = new[]
                {
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.71.242.81",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.28.29",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.6.157",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.234.134",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.239.218",
                    },
                },
            },
        },
        DeletionProtection = true,
    });

    var db = new Gcp.Sql.Database("db", new()
    {
        Instance = instance.Name,
        Name = "db",
    });

    var pwd = new Random.Index.Password("pwd", new()
    {
        Length = 16,
        Special = false,
    });

    var user = new Gcp.Sql.User("user", new()
    {
        Name = "user",
        Instance = instance.Name,
        Password = pwd.Result,
    });

    var @default = new Gcp.Datastream.ConnectionProfile("default", new()
    {
        DisplayName = "Connection profile",
        Location = "us-central1",
        ConnectionProfileId = "my-profile",
        PostgresqlProfile = new Gcp.Datastream.Inputs.ConnectionProfilePostgresqlProfileArgs
        {
            Hostname = instance.PublicIpAddress,
            Username = user.Name,
            Password = user.Password,
            Database = db.Name,
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import com.pulumi.random.Password;
import com.pulumi.random.PasswordArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfilePostgresqlProfileArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
            .name("my-instance")
            .databaseVersion("POSTGRES_14")
            .region("us-central1")
            .settings(DatabaseInstanceSettingsArgs.builder()
                .tier("db-f1-micro")
                .ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
                    .authorizedNetworks(                    
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.71.242.81")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.28.29")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.6.157")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.234.134")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.239.218")
                            .build())
                    .build())
                .build())
            .deletionProtection(true)
            .build());

        var db = new Database("db", DatabaseArgs.builder()
            .instance(instance.name())
            .name("db")
            .build());

        var pwd = new Password("pwd", PasswordArgs.builder()
            .length(16)
            .special(false)
            .build());

        var user = new User("user", UserArgs.builder()
            .name("user")
            .instance(instance.name())
            .password(pwd.result())
            .build());

        var default_ = new ConnectionProfile("default", ConnectionProfileArgs.builder()
            .displayName("Connection profile")
            .location("us-central1")
            .connectionProfileId("my-profile")
            .postgresqlProfile(ConnectionProfilePostgresqlProfileArgs.builder()
                .hostname(instance.publicIpAddress())
                .username(user.name())
                .password(user.password())
                .database(db.name())
                .build())
            .build());

    }
}
resources:
  instance:
    type: gcp:sql:DatabaseInstance
    properties:
      name: my-instance
      databaseVersion: POSTGRES_14
      region: us-central1
      settings:
        tier: db-f1-micro
        ipConfiguration:
          authorizedNetworks:
            - value: 34.71.242.81
            - value: 34.72.28.29
            - value: 34.67.6.157
            - value: 34.67.234.134
            - value: 34.72.239.218
      deletionProtection: true
  db:
    type: gcp:sql:Database
    properties:
      instance: ${instance.name}
      name: db
  pwd:
    type: random:Password
    properties:
      length: 16
      special: false
  user:
    type: gcp:sql:User
    properties:
      name: user
      instance: ${instance.name}
      password: ${pwd.result}
  default:
    type: gcp:datastream:ConnectionProfile
    properties:
      displayName: Connection profile
      location: us-central1
      connectionProfileId: my-profile
      postgresqlProfile:
        hostname: ${instance.publicIpAddress}
        username: ${user.name}
        password: ${user.password}
        database: ${db.name}

The postgresqlProfile property defines connection details: hostname (the Cloud SQL public IP), port, credentials, and target database. Cloud SQL’s authorizedNetworks must include Datastream’s static IP addresses (34.71.242.81, 34.72.28.29, and others) to allow inbound connections. This approach works for databases that can expose public endpoints.

Connect to PostgreSQL through private VPC peering

Production databases often restrict public access and require private connectivity through VPC peering. Datastream uses a NAT VM to route traffic from the peered network to the database.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
import * as random from "@pulumi/random";

const _default = new gcp.compute.Network("default", {
    name: "my-network",
    autoCreateSubnetworks: false,
});
const defaultSubnetwork = new gcp.compute.Subnetwork("default", {
    name: "my-subnetwork",
    ipCidrRange: "10.1.0.0/16",
    region: "us-central1",
    network: _default.id,
});
const privateConnection = new gcp.datastream.PrivateConnection("private_connection", {
    displayName: "Private connection",
    location: "us-central1",
    privateConnectionId: "my-connection",
    vpcPeeringConfig: {
        vpc: _default.id,
        subnet: "10.0.0.0/29",
    },
});
const natVmIp = new gcp.compute.Address("nat_vm_ip", {name: "nat-vm-ip"});
const instance = new gcp.sql.DatabaseInstance("instance", {
    name: "my-instance",
    databaseVersion: "POSTGRES_14",
    region: "us-central1",
    settings: {
        tier: "db-f1-micro",
        ipConfiguration: {
            authorizedNetworks: [{
                value: natVmIp.address,
            }],
        },
    },
    deletionProtection: true,
});
const db = new gcp.sql.Database("db", {
    instance: instance.name,
    name: "db",
});
const pwd = new random.index.Password("pwd", {
    length: 16,
    special: false,
});
const user = new gcp.sql.User("user", {
    name: "user",
    instance: instance.name,
    password: pwd.result,
});
const natVm = new gcp.compute.Instance("nat_vm", {
    name: "nat-vm",
    machineType: "e2-medium",
    zone: "us-central1-a",
    desiredStatus: "RUNNING",
    bootDisk: {
        initializeParams: {
            image: "debian-cloud/debian-12",
        },
    },
    networkInterfaces: [{
        network: privateConnection.vpcPeeringConfig.apply(vpcPeeringConfig => vpcPeeringConfig?.vpc),
        subnetwork: defaultSubnetwork.selfLink,
        accessConfigs: [{
            natIp: natVmIp.address,
        }],
    }],
    metadataStartupScript: pulumi.interpolate`#! /bin/bash
# See https://cloud.google.com/datastream/docs/private-connectivity#set-up-reverse-proxy
export DB_ADDR=${instance.publicIpAddress}
export DB_PORT=5432
echo 1 > /proc/sys/net/ipv4/ip_forward
md_url_prefix=\"http://169.254.169.254/computeMetadata/v1/instance\"
vm_nic_ip=\"$(curl -H \"Metadata-Flavor: Google\" ${md_url_prefix}/network-interfaces/0/ip)\"
iptables -t nat -F
iptables -t nat -A PREROUTING \\
     -p tcp --dport $DB_PORT \\
     -j DNAT \\
     --to-destination $DB_ADDR
iptables -t nat -A POSTROUTING \\
     -p tcp --dport $DB_PORT \\
     -j SNAT \\
     --to-source $vm_nic_ip
iptables-save
`,
});
const rules = new gcp.compute.Firewall("rules", {
    name: "ingress-rule",
    network: privateConnection.vpcPeeringConfig.apply(vpcPeeringConfig => vpcPeeringConfig?.vpc),
    description: "Allow traffic into NAT VM",
    direction: "INGRESS",
    allows: [{
        protocol: "tcp",
        ports: ["5432"],
    }],
    sourceRanges: [privateConnection.vpcPeeringConfig.apply(vpcPeeringConfig => vpcPeeringConfig?.subnet)],
});
const defaultConnectionProfile = new gcp.datastream.ConnectionProfile("default", {
    displayName: "Connection profile",
    location: "us-central1",
    connectionProfileId: "my-profile",
    postgresqlProfile: {
        hostname: natVm.networkInterfaces.apply(networkInterfaces => networkInterfaces[0].networkIp),
        username: user.name,
        password: user.password,
        database: db.name,
        port: 5432,
    },
    privateConnectivity: {
        privateConnection: privateConnection.id,
    },
});
import pulumi
import pulumi_gcp as gcp
import pulumi_random as random

default = gcp.compute.Network("default",
    name="my-network",
    auto_create_subnetworks=False)
default_subnetwork = gcp.compute.Subnetwork("default",
    name="my-subnetwork",
    ip_cidr_range="10.1.0.0/16",
    region="us-central1",
    network=default.id)
private_connection = gcp.datastream.PrivateConnection("private_connection",
    display_name="Private connection",
    location="us-central1",
    private_connection_id="my-connection",
    vpc_peering_config={
        "vpc": default.id,
        "subnet": "10.0.0.0/29",
    })
nat_vm_ip = gcp.compute.Address("nat_vm_ip", name="nat-vm-ip")
instance = gcp.sql.DatabaseInstance("instance",
    name="my-instance",
    database_version="POSTGRES_14",
    region="us-central1",
    settings={
        "tier": "db-f1-micro",
        "ip_configuration": {
            "authorized_networks": [{
                "value": nat_vm_ip.address,
            }],
        },
    },
    deletion_protection=True)
db = gcp.sql.Database("db",
    instance=instance.name,
    name="db")
pwd = random.index.Password("pwd",
    length=16,
    special=False)
user = gcp.sql.User("user",
    name="user",
    instance=instance.name,
    password=pwd["result"])
nat_vm = gcp.compute.Instance("nat_vm",
    name="nat-vm",
    machine_type="e2-medium",
    zone="us-central1-a",
    desired_status="RUNNING",
    boot_disk={
        "initialize_params": {
            "image": "debian-cloud/debian-12",
        },
    },
    network_interfaces=[{
        "network": private_connection.vpc_peering_config.vpc,
        "subnetwork": default_subnetwork.self_link,
        "access_configs": [{
            "nat_ip": nat_vm_ip.address,
        }],
    }],
    metadata_startup_script=instance.public_ip_address.apply(lambda public_ip_address: f"""#! /bin/bash
# See https://cloud.google.com/datastream/docs/private-connectivity#set-up-reverse-proxy
export DB_ADDR={public_ip_address}
export DB_PORT=5432
echo 1 > /proc/sys/net/ipv4/ip_forward
md_url_prefix=\"http://169.254.169.254/computeMetadata/v1/instance\"
vm_nic_ip=\"$(curl -H \"Metadata-Flavor: Google\" ${{md_url_prefix}}/network-interfaces/0/ip)\"
iptables -t nat -F
iptables -t nat -A PREROUTING \\
     -p tcp --dport $DB_PORT \\
     -j DNAT \\
     --to-destination $DB_ADDR
iptables -t nat -A POSTROUTING \\
     -p tcp --dport $DB_PORT \\
     -j SNAT \\
     --to-source $vm_nic_ip
iptables-save
"""))
rules = gcp.compute.Firewall("rules",
    name="ingress-rule",
    network=private_connection.vpc_peering_config.vpc,
    description="Allow traffic into NAT VM",
    direction="INGRESS",
    allows=[{
        "protocol": "tcp",
        "ports": ["5432"],
    }],
    source_ranges=[private_connection.vpc_peering_config.subnet])
default_connection_profile = gcp.datastream.ConnectionProfile("default",
    display_name="Connection profile",
    location="us-central1",
    connection_profile_id="my-profile",
    postgresql_profile={
        "hostname": nat_vm.network_interfaces[0].network_ip,
        "username": user.name,
        "password": user.password,
        "database": db.name,
        "port": 5432,
    },
    private_connectivity={
        "private_connection": private_connection.id,
    })
package main

import (
	"fmt"

	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/compute"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
	"github.com/pulumi/pulumi-random/sdk/v4/go/random"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_default, err := compute.NewNetwork(ctx, "default", &compute.NetworkArgs{
			Name:                  pulumi.String("my-network"),
			AutoCreateSubnetworks: pulumi.Bool(false),
		})
		if err != nil {
			return err
		}
		defaultSubnetwork, err := compute.NewSubnetwork(ctx, "default", &compute.SubnetworkArgs{
			Name:        pulumi.String("my-subnetwork"),
			IpCidrRange: pulumi.String("10.1.0.0/16"),
			Region:      pulumi.String("us-central1"),
			Network:     _default.ID(),
		})
		if err != nil {
			return err
		}
		privateConnection, err := datastream.NewPrivateConnection(ctx, "private_connection", &datastream.PrivateConnectionArgs{
			DisplayName:         pulumi.String("Private connection"),
			Location:            pulumi.String("us-central1"),
			PrivateConnectionId: pulumi.String("my-connection"),
			VpcPeeringConfig: &datastream.PrivateConnectionVpcPeeringConfigArgs{
				Vpc:    _default.ID(),
				Subnet: pulumi.String("10.0.0.0/29"),
			},
		})
		if err != nil {
			return err
		}
		natVmIp, err := compute.NewAddress(ctx, "nat_vm_ip", &compute.AddressArgs{
			Name: pulumi.String("nat-vm-ip"),
		})
		if err != nil {
			return err
		}
		instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
			Name:            pulumi.String("my-instance"),
			DatabaseVersion: pulumi.String("POSTGRES_14"),
			Region:          pulumi.String("us-central1"),
			Settings: &sql.DatabaseInstanceSettingsArgs{
				Tier: pulumi.String("db-f1-micro"),
				IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
					AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: natVmIp.Address,
						},
					},
				},
			},
			DeletionProtection: pulumi.Bool(true),
		})
		if err != nil {
			return err
		}
		db, err := sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
			Instance: instance.Name,
			Name:     pulumi.String("db"),
		})
		if err != nil {
			return err
		}
		pwd, err := random.NewPassword(ctx, "pwd", &random.PasswordArgs{
			Length:  16,
			Special: false,
		})
		if err != nil {
			return err
		}
		user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
			Name:     pulumi.String("user"),
			Instance: instance.Name,
			Password: pwd.Result,
		})
		if err != nil {
			return err
		}
		natVm, err := compute.NewInstance(ctx, "nat_vm", &compute.InstanceArgs{
			Name:          pulumi.String("nat-vm"),
			MachineType:   pulumi.String("e2-medium"),
			Zone:          pulumi.String("us-central1-a"),
			DesiredStatus: pulumi.String("RUNNING"),
			BootDisk: &compute.InstanceBootDiskArgs{
				InitializeParams: &compute.InstanceBootDiskInitializeParamsArgs{
					Image: pulumi.String("debian-cloud/debian-12"),
				},
			},
			NetworkInterfaces: compute.InstanceNetworkInterfaceArray{
				&compute.InstanceNetworkInterfaceArgs{
					Network: privateConnection.VpcPeeringConfig.ApplyT(func(vpcPeeringConfig datastream.PrivateConnectionVpcPeeringConfig) (*string, error) {
						return &vpcPeeringConfig.Vpc, nil
					}).(pulumi.StringPtrOutput),
					Subnetwork: defaultSubnetwork.SelfLink,
					AccessConfigs: compute.InstanceNetworkInterfaceAccessConfigArray{
						&compute.InstanceNetworkInterfaceAccessConfigArgs{
							NatIp: natVmIp.Address,
						},
					},
				},
			},
			MetadataStartupScript: instance.PublicIpAddress.ApplyT(func(publicIpAddress string) (string, error) {
				return fmt.Sprintf(`#! /bin/bash
# See https://cloud.google.com/datastream/docs/private-connectivity#set-up-reverse-proxy
export DB_ADDR=%v
export DB_PORT=5432
echo 1 > /proc/sys/net/ipv4/ip_forward
md_url_prefix=\"http://169.254.169.254/computeMetadata/v1/instance\"
vm_nic_ip=\"$(curl -H \"Metadata-Flavor: Google\" ${md_url_prefix}/network-interfaces/0/ip)\"
iptables -t nat -F
iptables -t nat -A PREROUTING \\
     -p tcp --dport $DB_PORT \\
     -j DNAT \\
     --to-destination $DB_ADDR
iptables -t nat -A POSTROUTING \\
     -p tcp --dport $DB_PORT \\
     -j SNAT \\
     --to-source $vm_nic_ip
iptables-save
`, publicIpAddress), nil
			}).(pulumi.StringOutput),
		})
		if err != nil {
			return err
		}
		_, err = compute.NewFirewall(ctx, "rules", &compute.FirewallArgs{
			Name: pulumi.String("ingress-rule"),
			Network: pulumi.String(privateConnection.VpcPeeringConfig.ApplyT(func(vpcPeeringConfig datastream.PrivateConnectionVpcPeeringConfig) (*string, error) {
				return &vpcPeeringConfig.Vpc, nil
			}).(pulumi.StringPtrOutput)),
			Description: pulumi.String("Allow traffic into NAT VM"),
			Direction:   pulumi.String("INGRESS"),
			Allows: compute.FirewallAllowArray{
				&compute.FirewallAllowArgs{
					Protocol: pulumi.String("tcp"),
					Ports: pulumi.StringArray{
						pulumi.String("5432"),
					},
				},
			},
			SourceRanges: pulumi.StringArray{
				pulumi.String(privateConnection.VpcPeeringConfig.ApplyT(func(vpcPeeringConfig datastream.PrivateConnectionVpcPeeringConfig) (*string, error) {
					return &vpcPeeringConfig.Subnet, nil
				}).(pulumi.StringPtrOutput)),
			},
		})
		if err != nil {
			return err
		}
		_, err = datastream.NewConnectionProfile(ctx, "default", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("Connection profile"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("my-profile"),
			PostgresqlProfile: &datastream.ConnectionProfilePostgresqlProfileArgs{
				Hostname: natVm.NetworkInterfaces.ApplyT(func(networkInterfaces []compute.InstanceNetworkInterface) (*string, error) {
					return &networkInterfaces[0].NetworkIp, nil
				}).(pulumi.StringPtrOutput),
				Username: user.Name,
				Password: user.Password,
				Database: db.Name,
				Port:     pulumi.Int(5432),
			},
			PrivateConnectivity: &datastream.ConnectionProfilePrivateConnectivityArgs{
				PrivateConnection: privateConnection.ID(),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
using Random = Pulumi.Random;

return await Deployment.RunAsync(() => 
{
    var @default = new Gcp.Compute.Network("default", new()
    {
        Name = "my-network",
        AutoCreateSubnetworks = false,
    });

    var defaultSubnetwork = new Gcp.Compute.Subnetwork("default", new()
    {
        Name = "my-subnetwork",
        IpCidrRange = "10.1.0.0/16",
        Region = "us-central1",
        Network = @default.Id,
    });

    var privateConnection = new Gcp.Datastream.PrivateConnection("private_connection", new()
    {
        DisplayName = "Private connection",
        Location = "us-central1",
        PrivateConnectionId = "my-connection",
        VpcPeeringConfig = new Gcp.Datastream.Inputs.PrivateConnectionVpcPeeringConfigArgs
        {
            Vpc = @default.Id,
            Subnet = "10.0.0.0/29",
        },
    });

    var natVmIp = new Gcp.Compute.Address("nat_vm_ip", new()
    {
        Name = "nat-vm-ip",
    });

    var instance = new Gcp.Sql.DatabaseInstance("instance", new()
    {
        Name = "my-instance",
        DatabaseVersion = "POSTGRES_14",
        Region = "us-central1",
        Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
        {
            Tier = "db-f1-micro",
            IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
            {
                AuthorizedNetworks = new[]
                {
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = natVmIp.IPAddress,
                    },
                },
            },
        },
        DeletionProtection = true,
    });

    var db = new Gcp.Sql.Database("db", new()
    {
        Instance = instance.Name,
        Name = "db",
    });

    var pwd = new Random.Index.Password("pwd", new()
    {
        Length = 16,
        Special = false,
    });

    var user = new Gcp.Sql.User("user", new()
    {
        Name = "user",
        Instance = instance.Name,
        Password = pwd.Result,
    });

    var natVm = new Gcp.Compute.Instance("nat_vm", new()
    {
        Name = "nat-vm",
        MachineType = "e2-medium",
        Zone = "us-central1-a",
        DesiredStatus = "RUNNING",
        BootDisk = new Gcp.Compute.Inputs.InstanceBootDiskArgs
        {
            InitializeParams = new Gcp.Compute.Inputs.InstanceBootDiskInitializeParamsArgs
            {
                Image = "debian-cloud/debian-12",
            },
        },
        NetworkInterfaces = new[]
        {
            new Gcp.Compute.Inputs.InstanceNetworkInterfaceArgs
            {
                Network = privateConnection.VpcPeeringConfig.Apply(vpcPeeringConfig => vpcPeeringConfig?.Vpc),
                Subnetwork = defaultSubnetwork.SelfLink,
                AccessConfigs = new[]
                {
                    new Gcp.Compute.Inputs.InstanceNetworkInterfaceAccessConfigArgs
                    {
                        NatIp = natVmIp.IPAddress,
                    },
                },
            },
        },
        MetadataStartupScript = instance.PublicIpAddress.Apply(publicIpAddress => @$"#! /bin/bash
# See https://cloud.google.com/datastream/docs/private-connectivity#set-up-reverse-proxy
export DB_ADDR={publicIpAddress}
export DB_PORT=5432
echo 1 > /proc/sys/net/ipv4/ip_forward
md_url_prefix=\""http://169.254.169.254/computeMetadata/v1/instance\""
vm_nic_ip=\""$(curl -H \""Metadata-Flavor: Google\"" ${{md_url_prefix}}/network-interfaces/0/ip)\""
iptables -t nat -F
iptables -t nat -A PREROUTING \\
     -p tcp --dport $DB_PORT \\
     -j DNAT \\
     --to-destination $DB_ADDR
iptables -t nat -A POSTROUTING \\
     -p tcp --dport $DB_PORT \\
     -j SNAT \\
     --to-source $vm_nic_ip
iptables-save
"),
    });

    var rules = new Gcp.Compute.Firewall("rules", new()
    {
        Name = "ingress-rule",
        Network = privateConnection.VpcPeeringConfig.Apply(vpcPeeringConfig => vpcPeeringConfig?.Vpc),
        Description = "Allow traffic into NAT VM",
        Direction = "INGRESS",
        Allows = new[]
        {
            new Gcp.Compute.Inputs.FirewallAllowArgs
            {
                Protocol = "tcp",
                Ports = new[]
                {
                    "5432",
                },
            },
        },
        SourceRanges = new[]
        {
            privateConnection.VpcPeeringConfig.Apply(vpcPeeringConfig => vpcPeeringConfig?.Subnet),
        },
    });

    var defaultConnectionProfile = new Gcp.Datastream.ConnectionProfile("default", new()
    {
        DisplayName = "Connection profile",
        Location = "us-central1",
        ConnectionProfileId = "my-profile",
        PostgresqlProfile = new Gcp.Datastream.Inputs.ConnectionProfilePostgresqlProfileArgs
        {
            Hostname = natVm.NetworkInterfaces.Apply(networkInterfaces => networkInterfaces[0].NetworkIp),
            Username = user.Name,
            Password = user.Password,
            Database = db.Name,
            Port = 5432,
        },
        PrivateConnectivity = new Gcp.Datastream.Inputs.ConnectionProfilePrivateConnectivityArgs
        {
            PrivateConnection = privateConnection.Id,
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.compute.Network;
import com.pulumi.gcp.compute.NetworkArgs;
import com.pulumi.gcp.compute.Subnetwork;
import com.pulumi.gcp.compute.SubnetworkArgs;
import com.pulumi.gcp.datastream.PrivateConnection;
import com.pulumi.gcp.datastream.PrivateConnectionArgs;
import com.pulumi.gcp.datastream.inputs.PrivateConnectionVpcPeeringConfigArgs;
import com.pulumi.gcp.compute.Address;
import com.pulumi.gcp.compute.AddressArgs;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import com.pulumi.random.Password;
import com.pulumi.random.PasswordArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.compute.Instance;
import com.pulumi.gcp.compute.InstanceArgs;
import com.pulumi.gcp.compute.inputs.InstanceBootDiskArgs;
import com.pulumi.gcp.compute.inputs.InstanceBootDiskInitializeParamsArgs;
import com.pulumi.gcp.compute.inputs.InstanceNetworkInterfaceArgs;
import com.pulumi.gcp.compute.Firewall;
import com.pulumi.gcp.compute.FirewallArgs;
import com.pulumi.gcp.compute.inputs.FirewallAllowArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfilePostgresqlProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfilePrivateConnectivityArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var default_ = new Network("default", NetworkArgs.builder()
            .name("my-network")
            .autoCreateSubnetworks(false)
            .build());

        var defaultSubnetwork = new Subnetwork("defaultSubnetwork", SubnetworkArgs.builder()
            .name("my-subnetwork")
            .ipCidrRange("10.1.0.0/16")
            .region("us-central1")
            .network(default_.id())
            .build());

        var privateConnection = new PrivateConnection("privateConnection", PrivateConnectionArgs.builder()
            .displayName("Private connection")
            .location("us-central1")
            .privateConnectionId("my-connection")
            .vpcPeeringConfig(PrivateConnectionVpcPeeringConfigArgs.builder()
                .vpc(default_.id())
                .subnet("10.0.0.0/29")
                .build())
            .build());

        var natVmIp = new Address("natVmIp", AddressArgs.builder()
            .name("nat-vm-ip")
            .build());

        var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
            .name("my-instance")
            .databaseVersion("POSTGRES_14")
            .region("us-central1")
            .settings(DatabaseInstanceSettingsArgs.builder()
                .tier("db-f1-micro")
                .ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
                    .authorizedNetworks(DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                        .value(natVmIp.address())
                        .build())
                    .build())
                .build())
            .deletionProtection(true)
            .build());

        var db = new Database("db", DatabaseArgs.builder()
            .instance(instance.name())
            .name("db")
            .build());

        var pwd = new Password("pwd", PasswordArgs.builder()
            .length(16)
            .special(false)
            .build());

        var user = new User("user", UserArgs.builder()
            .name("user")
            .instance(instance.name())
            .password(pwd.result())
            .build());

        var natVm = new Instance("natVm", InstanceArgs.builder()
            .name("nat-vm")
            .machineType("e2-medium")
            .zone("us-central1-a")
            .desiredStatus("RUNNING")
            .bootDisk(InstanceBootDiskArgs.builder()
                .initializeParams(InstanceBootDiskInitializeParamsArgs.builder()
                    .image("debian-cloud/debian-12")
                    .build())
                .build())
            .networkInterfaces(InstanceNetworkInterfaceArgs.builder()
                .network(privateConnection.vpcPeeringConfig().applyValue(_vpcPeeringConfig -> _vpcPeeringConfig.vpc()))
                .subnetwork(defaultSubnetwork.selfLink())
                .accessConfigs(InstanceNetworkInterfaceAccessConfigArgs.builder()
                    .natIp(natVmIp.address())
                    .build())
                .build())
            .metadataStartupScript(instance.publicIpAddress().applyValue(_publicIpAddress -> """
#! /bin/bash
# See https://cloud.google.com/datastream/docs/private-connectivity#set-up-reverse-proxy
export DB_ADDR=%s
export DB_PORT=5432
echo 1 > /proc/sys/net/ipv4/ip_forward
md_url_prefix=\"http://169.254.169.254/computeMetadata/v1/instance\"
vm_nic_ip=\"$(curl -H \"Metadata-Flavor: Google\" ${md_url_prefix}/network-interfaces/0/ip)\"
iptables -t nat -F
iptables -t nat -A PREROUTING \\
     -p tcp --dport $DB_PORT \\
     -j DNAT \\
     --to-destination $DB_ADDR
iptables -t nat -A POSTROUTING \\
     -p tcp --dport $DB_PORT \\
     -j SNAT \\
     --to-source $vm_nic_ip
iptables-save
", _publicIpAddress)))
            .build());

        var rules = new Firewall("rules", FirewallArgs.builder()
            .name("ingress-rule")
            .network(privateConnection.vpcPeeringConfig().applyValue(_vpcPeeringConfig -> _vpcPeeringConfig.vpc()))
            .description("Allow traffic into NAT VM")
            .direction("INGRESS")
            .allows(FirewallAllowArgs.builder()
                .protocol("tcp")
                .ports("5432")
                .build())
            .sourceRanges(privateConnection.vpcPeeringConfig().applyValue(_vpcPeeringConfig -> _vpcPeeringConfig.subnet()))
            .build());

        var defaultConnectionProfile = new ConnectionProfile("defaultConnectionProfile", ConnectionProfileArgs.builder()
            .displayName("Connection profile")
            .location("us-central1")
            .connectionProfileId("my-profile")
            .postgresqlProfile(ConnectionProfilePostgresqlProfileArgs.builder()
                .hostname(natVm.networkInterfaces().applyValue(_networkInterfaces -> _networkInterfaces[0].networkIp()))
                .username(user.name())
                .password(user.password())
                .database(db.name())
                .port(5432)
                .build())
            .privateConnectivity(ConnectionProfilePrivateConnectivityArgs.builder()
                .privateConnection(privateConnection.id())
                .build())
            .build());

    }
}
resources:
  default:
    type: gcp:compute:Network
    properties:
      name: my-network
      autoCreateSubnetworks: false
  defaultSubnetwork:
    type: gcp:compute:Subnetwork
    name: default
    properties:
      name: my-subnetwork
      ipCidrRange: 10.1.0.0/16
      region: us-central1
      network: ${default.id}
  privateConnection:
    type: gcp:datastream:PrivateConnection
    name: private_connection
    properties:
      displayName: Private connection
      location: us-central1
      privateConnectionId: my-connection
      vpcPeeringConfig:
        vpc: ${default.id}
        subnet: 10.0.0.0/29
  instance:
    type: gcp:sql:DatabaseInstance
    properties:
      name: my-instance
      databaseVersion: POSTGRES_14
      region: us-central1
      settings:
        tier: db-f1-micro
        ipConfiguration:
          authorizedNetworks:
            - value: ${natVmIp.address}
      deletionProtection: true
  db:
    type: gcp:sql:Database
    properties:
      instance: ${instance.name}
      name: db
  pwd:
    type: random:Password
    properties:
      length: 16
      special: false
  user:
    type: gcp:sql:User
    properties:
      name: user
      instance: ${instance.name}
      password: ${pwd.result}
  natVmIp:
    type: gcp:compute:Address
    name: nat_vm_ip
    properties:
      name: nat-vm-ip
  natVm:
    type: gcp:compute:Instance
    name: nat_vm
    properties:
      name: nat-vm
      machineType: e2-medium
      zone: us-central1-a
      desiredStatus: RUNNING
      bootDisk:
        initializeParams:
          image: debian-cloud/debian-12
      networkInterfaces:
        - network: ${privateConnection.vpcPeeringConfig.vpc}
          subnetwork: ${defaultSubnetwork.selfLink}
          accessConfigs:
            - natIp: ${natVmIp.address}
      metadataStartupScript: |
        #! /bin/bash
        # See https://cloud.google.com/datastream/docs/private-connectivity#set-up-reverse-proxy
        export DB_ADDR=${instance.publicIpAddress}
        export DB_PORT=5432
        echo 1 > /proc/sys/net/ipv4/ip_forward
        md_url_prefix=\"http://169.254.169.254/computeMetadata/v1/instance\"
        vm_nic_ip=\"$(curl -H \"Metadata-Flavor: Google\" $${md_url_prefix}/network-interfaces/0/ip)\"
        iptables -t nat -F
        iptables -t nat -A PREROUTING \\
             -p tcp --dport $DB_PORT \\
             -j DNAT \\
             --to-destination $DB_ADDR
        iptables -t nat -A POSTROUTING \\
             -p tcp --dport $DB_PORT \\
             -j SNAT \\
             --to-source $vm_nic_ip
        iptables-save        
  rules:
    type: gcp:compute:Firewall
    properties:
      name: ingress-rule
      network: ${privateConnection.vpcPeeringConfig.vpc}
      description: Allow traffic into NAT VM
      direction: INGRESS
      allows:
        - protocol: tcp
          ports:
            - '5432'
      sourceRanges:
        - ${privateConnection.vpcPeeringConfig.subnet}
  defaultConnectionProfile:
    type: gcp:datastream:ConnectionProfile
    name: default
    properties:
      displayName: Connection profile
      location: us-central1
      connectionProfileId: my-profile
      postgresqlProfile:
        hostname: ${natVm.networkInterfaces[0].networkIp}
        username: ${user.name}
        password: ${user.password}
        database: ${db.name}
        port: 5432
      privateConnectivity:
        privateConnection: ${privateConnection.id}

The privateConnectivity property references a PrivateConnection resource that establishes VPC peering between Datastream and your network. The NAT VM acts as a reverse proxy, forwarding traffic from the peered subnet to the database’s public IP. Firewall rules must allow traffic from the private connection’s subnet to the NAT VM on the database port.

Connect to PostgreSQL with mutual TLS authentication

Databases requiring certificate-based authentication use SSL configuration to verify both server and client identities during connection establishment.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
import * as random from "@pulumi/random";
import * as std from "@pulumi/std";

const datastreamIps = gcp.datastream.getStaticIps({
    location: "us-central1",
});
const instance = new gcp.sql.DatabaseInstance("instance", {
    name: "my-instance",
    databaseVersion: "POSTGRES_15",
    region: "us-central1",
    settings: {
        tier: "db-f1-micro",
        ipConfiguration: {
            authorizedNetworks: std.format({
                input: "datastream-%d",
                args: [entry.key],
            }).then(invoke => .map(entry => ({
                name: invoke.result,
                value: entry.value,
            }))),
            ipv4Enabled: true,
            sslMode: "TRUSTED_CLIENT_CERTIFICATE_REQUIRED",
        },
    },
    deletionProtection: true,
});
const db = new gcp.sql.Database("db", {
    instance: instance.name,
    name: "db",
});
const pwd = new random.index.Password("pwd", {
    length: 16,
    special: false,
});
const user = new gcp.sql.User("user", {
    name: "user",
    instance: instance.name,
    password: pwd.result,
});
const clientCert = new gcp.sql.SslCert("client_cert", {
    commonName: "client-name",
    instance: instance.name,
});
const _default = new gcp.datastream.ConnectionProfile("default", {
    displayName: "Connection Profile",
    location: "us-central1",
    connectionProfileId: "profile-id",
    postgresqlProfile: {
        hostname: instance.publicIpAddress,
        port: 5432,
        username: "user",
        password: pwd.result,
        database: db.name,
        sslConfig: {
            serverAndClientVerification: {
                clientCertificate: clientCert.cert,
                clientKey: clientCert.privateKey,
                caCertificate: clientCert.serverCaCert,
            },
        },
    },
});
import pulumi
import pulumi_gcp as gcp
import pulumi_random as random
import pulumi_std as std

datastream_ips = gcp.datastream.get_static_ips(location="us-central1")
instance = gcp.sql.DatabaseInstance("instance",
    name="my-instance",
    database_version="POSTGRES_15",
    region="us-central1",
    settings={
        "tier": "db-f1-micro",
        "ip_configuration": {
            "authorized_networks": [{"key": k, "value": v} for k, v in datastream_ips.static_ips.items()].apply(lambda entries: [{
                "name": std.format(input="datastream-%d",
                    args=[entry["key"]]).result,
                "value": entry["value"],
            } for entry in entries]),
            "ipv4_enabled": True,
            "ssl_mode": "TRUSTED_CLIENT_CERTIFICATE_REQUIRED",
        },
    },
    deletion_protection=True)
db = gcp.sql.Database("db",
    instance=instance.name,
    name="db")
pwd = random.index.Password("pwd",
    length=16,
    special=False)
user = gcp.sql.User("user",
    name="user",
    instance=instance.name,
    password=pwd["result"])
client_cert = gcp.sql.SslCert("client_cert",
    common_name="client-name",
    instance=instance.name)
default = gcp.datastream.ConnectionProfile("default",
    display_name="Connection Profile",
    location="us-central1",
    connection_profile_id="profile-id",
    postgresql_profile={
        "hostname": instance.public_ip_address,
        "port": 5432,
        "username": "user",
        "password": pwd["result"],
        "database": db.name,
        "ssl_config": {
            "server_and_client_verification": {
                "client_certificate": client_cert.cert,
                "client_key": client_cert.private_key,
                "ca_certificate": client_cert.server_ca_cert,
            },
        },
    })
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
using Random = Pulumi.Random;
using Std = Pulumi.Std;

return await Deployment.RunAsync(() => 
{
    var datastreamIps = Gcp.Datastream.GetStaticIps.Invoke(new()
    {
        Location = "us-central1",
    });

    var instance = new Gcp.Sql.DatabaseInstance("instance", new()
    {
        Name = "my-instance",
        DatabaseVersion = "POSTGRES_15",
        Region = "us-central1",
        Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
        {
            Tier = "db-f1-micro",
            IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
            {
                AuthorizedNetworks = Std.Format.Invoke(new()
                {
                    Input = "datastream-%d",
                    Args = new[]
                    {
                        entry.Key,
                    },
                }).Apply(invoke => ),
                Ipv4Enabled = true,
                SslMode = "TRUSTED_CLIENT_CERTIFICATE_REQUIRED",
            },
        },
        DeletionProtection = true,
    });

    var db = new Gcp.Sql.Database("db", new()
    {
        Instance = instance.Name,
        Name = "db",
    });

    var pwd = new Random.Index.Password("pwd", new()
    {
        Length = 16,
        Special = false,
    });

    var user = new Gcp.Sql.User("user", new()
    {
        Name = "user",
        Instance = instance.Name,
        Password = pwd.Result,
    });

    var clientCert = new Gcp.Sql.SslCert("client_cert", new()
    {
        CommonName = "client-name",
        Instance = instance.Name,
    });

    var @default = new Gcp.Datastream.ConnectionProfile("default", new()
    {
        DisplayName = "Connection Profile",
        Location = "us-central1",
        ConnectionProfileId = "profile-id",
        PostgresqlProfile = new Gcp.Datastream.Inputs.ConnectionProfilePostgresqlProfileArgs
        {
            Hostname = instance.PublicIpAddress,
            Port = 5432,
            Username = "user",
            Password = pwd.Result,
            Database = db.Name,
            SslConfig = new Gcp.Datastream.Inputs.ConnectionProfilePostgresqlProfileSslConfigArgs
            {
                ServerAndClientVerification = new Gcp.Datastream.Inputs.ConnectionProfilePostgresqlProfileSslConfigServerAndClientVerificationArgs
                {
                    ClientCertificate = clientCert.Cert,
                    ClientKey = clientCert.PrivateKey,
                    CaCertificate = clientCert.ServerCaCert,
                },
            },
        },
    });

});

The sslConfig property enables mutual TLS through serverAndClientVerification, which requires three certificates: the client certificate and private key for authentication, and the CA certificate to verify the server. Cloud SQL must be configured with TRUSTED_CLIENT_CERTIFICATE_REQUIRED to enforce client certificate validation.

Store PostgreSQL credentials in Secret Manager

Teams managing credentials centrally use Secret Manager to avoid embedding passwords directly in connection profiles.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const _default = new gcp.datastream.ConnectionProfile("default", {
    displayName: "Postgres Source With Secret Manager",
    location: "us-central1",
    connectionProfileId: "source-profile",
    createWithoutValidation: true,
    postgresqlProfile: {
        hostname: "fake-hostname",
        port: 3306,
        username: "fake-username",
        secretManagerStoredPassword: "projects/fake-project/secrets/fake-secret/versions/1",
        database: "fake-database",
    },
});
import pulumi
import pulumi_gcp as gcp

default = gcp.datastream.ConnectionProfile("default",
    display_name="Postgres Source With Secret Manager",
    location="us-central1",
    connection_profile_id="source-profile",
    create_without_validation=True,
    postgresql_profile={
        "hostname": "fake-hostname",
        "port": 3306,
        "username": "fake-username",
        "secret_manager_stored_password": "projects/fake-project/secrets/fake-secret/versions/1",
        "database": "fake-database",
    })
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := datastream.NewConnectionProfile(ctx, "default", &datastream.ConnectionProfileArgs{
			DisplayName:             pulumi.String("Postgres Source With Secret Manager"),
			Location:                pulumi.String("us-central1"),
			ConnectionProfileId:     pulumi.String("source-profile"),
			CreateWithoutValidation: pulumi.Bool(true),
			PostgresqlProfile: &datastream.ConnectionProfilePostgresqlProfileArgs{
				Hostname:                    pulumi.String("fake-hostname"),
				Port:                        pulumi.Int(3306),
				Username:                    pulumi.String("fake-username"),
				SecretManagerStoredPassword: pulumi.String("projects/fake-project/secrets/fake-secret/versions/1"),
				Database:                    pulumi.String("fake-database"),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var @default = new Gcp.Datastream.ConnectionProfile("default", new()
    {
        DisplayName = "Postgres Source With Secret Manager",
        Location = "us-central1",
        ConnectionProfileId = "source-profile",
        CreateWithoutValidation = true,
        PostgresqlProfile = new Gcp.Datastream.Inputs.ConnectionProfilePostgresqlProfileArgs
        {
            Hostname = "fake-hostname",
            Port = 3306,
            Username = "fake-username",
            SecretManagerStoredPassword = "projects/fake-project/secrets/fake-secret/versions/1",
            Database = "fake-database",
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfilePostgresqlProfileArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var default_ = new ConnectionProfile("default", ConnectionProfileArgs.builder()
            .displayName("Postgres Source With Secret Manager")
            .location("us-central1")
            .connectionProfileId("source-profile")
            .createWithoutValidation(true)
            .postgresqlProfile(ConnectionProfilePostgresqlProfileArgs.builder()
                .hostname("fake-hostname")
                .port(3306)
                .username("fake-username")
                .secretManagerStoredPassword("projects/fake-project/secrets/fake-secret/versions/1")
                .database("fake-database")
                .build())
            .build());

    }
}
resources:
  default:
    type: gcp:datastream:ConnectionProfile
    properties:
      displayName: Postgres Source With Secret Manager
      location: us-central1
      connectionProfileId: source-profile
      createWithoutValidation: true
      postgresqlProfile:
        hostname: fake-hostname
        port: 3306
        username: fake-username
        secretManagerStoredPassword: projects/fake-project/secrets/fake-secret/versions/1
        database: fake-database

The secretManagerStoredPassword property references a Secret Manager secret version instead of providing a plaintext password. The createWithoutValidation flag allows profile creation without testing connectivity, useful when the secret or database isn’t immediately available. The Datastream service account needs the secretAccessor role on the referenced secret.

Connect to SQL Server with port configuration

SQL Server databases use port 1433 by default and require explicit port configuration in the connection profile.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const instance = new gcp.sql.DatabaseInstance("instance", {
    name: "sql-server",
    databaseVersion: "SQLSERVER_2019_STANDARD",
    region: "us-central1",
    rootPassword: "root-password",
    deletionProtection: true,
    settings: {
        tier: "db-custom-2-4096",
        ipConfiguration: {
            authorizedNetworks: [
                {
                    value: "34.71.242.81",
                },
                {
                    value: "34.72.28.29",
                },
                {
                    value: "34.67.6.157",
                },
                {
                    value: "34.67.234.134",
                },
                {
                    value: "34.72.239.218",
                },
            ],
        },
    },
});
const db = new gcp.sql.Database("db", {
    name: "db",
    instance: instance.name,
});
const user = new gcp.sql.User("user", {
    name: "user",
    instance: instance.name,
    password: "password",
});
const _default = new gcp.datastream.ConnectionProfile("default", {
    displayName: "SQL Server Source",
    location: "us-central1",
    connectionProfileId: "source-profile",
    sqlServerProfile: {
        hostname: instance.publicIpAddress,
        port: 1433,
        username: user.name,
        password: user.password,
        database: db.name,
    },
});
import pulumi
import pulumi_gcp as gcp

instance = gcp.sql.DatabaseInstance("instance",
    name="sql-server",
    database_version="SQLSERVER_2019_STANDARD",
    region="us-central1",
    root_password="root-password",
    deletion_protection=True,
    settings={
        "tier": "db-custom-2-4096",
        "ip_configuration": {
            "authorized_networks": [
                {
                    "value": "34.71.242.81",
                },
                {
                    "value": "34.72.28.29",
                },
                {
                    "value": "34.67.6.157",
                },
                {
                    "value": "34.67.234.134",
                },
                {
                    "value": "34.72.239.218",
                },
            ],
        },
    })
db = gcp.sql.Database("db",
    name="db",
    instance=instance.name)
user = gcp.sql.User("user",
    name="user",
    instance=instance.name,
    password="password")
default = gcp.datastream.ConnectionProfile("default",
    display_name="SQL Server Source",
    location="us-central1",
    connection_profile_id="source-profile",
    sql_server_profile={
        "hostname": instance.public_ip_address,
        "port": 1433,
        "username": user.name,
        "password": user.password,
        "database": db.name,
    })
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
			Name:               pulumi.String("sql-server"),
			DatabaseVersion:    pulumi.String("SQLSERVER_2019_STANDARD"),
			Region:             pulumi.String("us-central1"),
			RootPassword:       pulumi.String("root-password"),
			DeletionProtection: pulumi.Bool(true),
			Settings: &sql.DatabaseInstanceSettingsArgs{
				Tier: pulumi.String("db-custom-2-4096"),
				IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
					AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.71.242.81"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.28.29"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.6.157"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.67.234.134"),
						},
						&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
							Value: pulumi.String("34.72.239.218"),
						},
					},
				},
			},
		})
		if err != nil {
			return err
		}
		db, err := sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
			Name:     pulumi.String("db"),
			Instance: instance.Name,
		})
		if err != nil {
			return err
		}
		user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
			Name:     pulumi.String("user"),
			Instance: instance.Name,
			Password: pulumi.String("password"),
		})
		if err != nil {
			return err
		}
		_, err = datastream.NewConnectionProfile(ctx, "default", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("SQL Server Source"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("source-profile"),
			SqlServerProfile: &datastream.ConnectionProfileSqlServerProfileArgs{
				Hostname: instance.PublicIpAddress,
				Port:     pulumi.Int(1433),
				Username: user.Name,
				Password: user.Password,
				Database: db.Name,
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var instance = new Gcp.Sql.DatabaseInstance("instance", new()
    {
        Name = "sql-server",
        DatabaseVersion = "SQLSERVER_2019_STANDARD",
        Region = "us-central1",
        RootPassword = "root-password",
        DeletionProtection = true,
        Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
        {
            Tier = "db-custom-2-4096",
            IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
            {
                AuthorizedNetworks = new[]
                {
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.71.242.81",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.28.29",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.6.157",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.67.234.134",
                    },
                    new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
                    {
                        Value = "34.72.239.218",
                    },
                },
            },
        },
    });

    var db = new Gcp.Sql.Database("db", new()
    {
        Name = "db",
        Instance = instance.Name,
    });

    var user = new Gcp.Sql.User("user", new()
    {
        Name = "user",
        Instance = instance.Name,
        Password = "password",
    });

    var @default = new Gcp.Datastream.ConnectionProfile("default", new()
    {
        DisplayName = "SQL Server Source",
        Location = "us-central1",
        ConnectionProfileId = "source-profile",
        SqlServerProfile = new Gcp.Datastream.Inputs.ConnectionProfileSqlServerProfileArgs
        {
            Hostname = instance.PublicIpAddress,
            Port = 1433,
            Username = user.Name,
            Password = user.Password,
            Database = db.Name,
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileSqlServerProfileArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
            .name("sql-server")
            .databaseVersion("SQLSERVER_2019_STANDARD")
            .region("us-central1")
            .rootPassword("root-password")
            .deletionProtection(true)
            .settings(DatabaseInstanceSettingsArgs.builder()
                .tier("db-custom-2-4096")
                .ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
                    .authorizedNetworks(                    
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.71.242.81")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.28.29")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.6.157")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.67.234.134")
                            .build(),
                        DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
                            .value("34.72.239.218")
                            .build())
                    .build())
                .build())
            .build());

        var db = new Database("db", DatabaseArgs.builder()
            .name("db")
            .instance(instance.name())
            .build());

        var user = new User("user", UserArgs.builder()
            .name("user")
            .instance(instance.name())
            .password("password")
            .build());

        var default_ = new ConnectionProfile("default", ConnectionProfileArgs.builder()
            .displayName("SQL Server Source")
            .location("us-central1")
            .connectionProfileId("source-profile")
            .sqlServerProfile(ConnectionProfileSqlServerProfileArgs.builder()
                .hostname(instance.publicIpAddress())
                .port(1433)
                .username(user.name())
                .password(user.password())
                .database(db.name())
                .build())
            .build());

    }
}
resources:
  instance:
    type: gcp:sql:DatabaseInstance
    properties:
      name: sql-server
      databaseVersion: SQLSERVER_2019_STANDARD
      region: us-central1
      rootPassword: root-password
      deletionProtection: true
      settings:
        tier: db-custom-2-4096
        ipConfiguration:
          authorizedNetworks:
            - value: 34.71.242.81
            - value: 34.72.28.29
            - value: 34.67.6.157
            - value: 34.67.234.134
            - value: 34.72.239.218
  db:
    type: gcp:sql:Database
    properties:
      name: db
      instance: ${instance.name}
  user:
    type: gcp:sql:User
    properties:
      name: user
      instance: ${instance.name}
      password: password
  default:
    type: gcp:datastream:ConnectionProfile
    properties:
      displayName: SQL Server Source
      location: us-central1
      connectionProfileId: source-profile
      sqlServerProfile:
        hostname: ${instance.publicIpAddress}
        port: 1433
        username: ${user.name}
        password: ${user.password}
        database: ${db.name}

The sqlServerProfile property includes a port field (1433 for SQL Server) alongside hostname, credentials, and database name. Like PostgreSQL, Cloud SQL must authorize Datastream’s static IP addresses in its authorized networks configuration.

Connect to MongoDB with replica set configuration

MongoDB sources require replica set configuration to enable change data capture, specifying host addresses and authentication details.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const _default = new gcp.datastream.ConnectionProfile("default", {
    displayName: "Mongodb Source",
    location: "us-central1",
    connectionProfileId: "source-profile",
    mongodbProfile: {
        hostAddresses: [{
            hostname: "mongodb-primary.example.com",
            port: 27017,
        }],
        replicaSet: "myReplicaSet",
        username: "mongoUser",
        password: "mongoPassword",
        database: "myDatabase",
        standardConnectionFormat: {}[0],
    },
});
import pulumi
import pulumi_gcp as gcp

default = gcp.datastream.ConnectionProfile("default",
    display_name="Mongodb Source",
    location="us-central1",
    connection_profile_id="source-profile",
    mongodb_profile={
        "host_addresses": [{
            "hostname": "mongodb-primary.example.com",
            "port": 27017,
        }],
        "replica_set": "myReplicaSet",
        "username": "mongoUser",
        "password": "mongoPassword",
        "database": "myDatabase",
        "standard_connection_format": {}[0],
    })
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := datastream.NewConnectionProfile(ctx, "default", &datastream.ConnectionProfileArgs{
			DisplayName:         pulumi.String("Mongodb Source"),
			Location:            pulumi.String("us-central1"),
			ConnectionProfileId: pulumi.String("source-profile"),
			MongodbProfile: &datastream.ConnectionProfileMongodbProfileArgs{
				HostAddresses: datastream.ConnectionProfileMongodbProfileHostAddressArray{
					&datastream.ConnectionProfileMongodbProfileHostAddressArgs{
						Hostname: pulumi.String("mongodb-primary.example.com"),
						Port:     pulumi.Int(27017),
					},
				},
				ReplicaSet:               pulumi.String("myReplicaSet"),
				Username:                 pulumi.String("mongoUser"),
				Password:                 pulumi.String("mongoPassword"),
				Database:                 "myDatabase",
				StandardConnectionFormat: map[string]interface{}{}[0],
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var @default = new Gcp.Datastream.ConnectionProfile("default", new()
    {
        DisplayName = "Mongodb Source",
        Location = "us-central1",
        ConnectionProfileId = "source-profile",
        MongodbProfile = new Gcp.Datastream.Inputs.ConnectionProfileMongodbProfileArgs
        {
            HostAddresses = new[]
            {
                new Gcp.Datastream.Inputs.ConnectionProfileMongodbProfileHostAddressArgs
                {
                    Hostname = "mongodb-primary.example.com",
                    Port = 27017,
                },
            },
            ReplicaSet = "myReplicaSet",
            Username = "mongoUser",
            Password = "mongoPassword",
            Database = "myDatabase",
            StandardConnectionFormat = null[0],
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileMongodbProfileArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var default_ = new ConnectionProfile("default", ConnectionProfileArgs.builder()
            .displayName("Mongodb Source")
            .location("us-central1")
            .connectionProfileId("source-profile")
            .mongodbProfile(ConnectionProfileMongodbProfileArgs.builder()
                .hostAddresses(ConnectionProfileMongodbProfileHostAddressArgs.builder()
                    .hostname("mongodb-primary.example.com")
                    .port(27017)
                    .build())
                .replicaSet("myReplicaSet")
                .username("mongoUser")
                .password("mongoPassword")
                .database("myDatabase")
                .standardConnectionFormat(ConnectionProfileMongodbProfileStandardConnectionFormatArgs.builder()
                    .build()[0])
                .build())
            .build());

    }
}

The mongodbProfile property defines hostAddresses as an array of hostname and port pairs, along with the replicaSet name for change stream access. MongoDB change data capture requires replica set deployment; standalone instances aren’t supported. The username and password authenticate against the specified database.

Beyond these examples

These snippets focus on specific connection profile features: Cloud Storage and database source profiles, private connectivity and SSL configuration, and Secret Manager integration. They’re intentionally minimal rather than full replication pipelines.

The examples may reference pre-existing infrastructure such as Cloud SQL instances, databases, and users, VPC networks, subnets, and private connections, Secret Manager secrets with stored credentials, and NAT VMs and firewall rules for private connectivity. They focus on configuring the connection profile rather than provisioning the surrounding infrastructure.

To keep things focused, common connection profile patterns are omitted, including:

  • Forward SSH tunnel connectivity (forwardSshConnectivity)
  • Oracle and MySQL database profiles
  • Salesforce and Spanner source profiles
  • Labels and metadata tagging

These omissions are intentional: the goal is to illustrate how each connection profile feature is wired, not provide drop-in replication modules. See the Datastream ConnectionProfile resource reference for all available configuration options.

Let's configure GCP Datastream Connection Profiles

Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.

Try Pulumi Cloud for FREE

Frequently Asked Questions

Configuration & Setup
What connection profile types does Datastream support?
Datastream supports GCS, PostgreSQL, MySQL, Oracle, SQL Server, BigQuery, MongoDB, Salesforce, and Spanner connection profiles. Each profile type has specific configuration requirements shown in the examples.
What properties can't I change after creation?
The connectionProfileId, location, project, and createWithoutValidation properties are immutable. Changing these requires replacing the connection profile.
When should I use createWithoutValidation?
Use createWithoutValidation when you need to create a connection profile without validating connectivity first. This is commonly used for Salesforce, Spanner, MongoDB, and Secret Manager configurations where validation may not be immediately possible.
Private Connectivity & Networking
How do I connect to PostgreSQL using private connectivity?
Configure privateConnectivity with a PrivateConnection resource, then set up a NAT VM with iptables rules to proxy traffic from the private connection subnet to your Cloud SQL instance. The example shows the complete NAT VM configuration with startup script.
What IP addresses should I authorize for Cloud SQL connections?
Authorize these Datastream IP addresses in your Cloud SQL instance’s authorizedNetworks: 34.71.242.81, 34.72.28.29, 34.67.6.157, 34.67.234.134, and 34.72.239.218.
Security & Authentication
How do I configure SSL for PostgreSQL connections?
Use postgresqlProfile.sslConfig.serverAndClientVerification with clientCertificate, clientKey, and caCertificate from a Cloud SQL SSL certificate. Set the Cloud SQL instance’s sslMode to TRUSTED_CLIENT_CERTIFICATE_REQUIRED.
How do I use Secret Manager for database passwords?
Set secretManagerStoredPassword to a Secret Manager secret path (e.g., projects/PROJECT/secrets/SECRET/versions/VERSION) instead of using the password field directly. This works for PostgreSQL and Salesforce profiles.
Why aren't my labels being removed when I delete them from my config?
The labels field is non-authoritative and only manages labels present in your Pulumi configuration. Labels set outside Pulumi won’t be removed. Use the effectiveLabels output to see all labels on the resource.

Using a different cloud?

Explore integration guides for other cloud providers: