The gcp:datastream/stream:Stream resource, part of the Pulumi GCP provider, defines a Datastream replication stream that continuously moves data from source databases to destinations like BigQuery or Cloud Storage. This guide focuses on four capabilities: source database configuration for MySQL, PostgreSQL, and SQL Server; BigQuery and Cloud Storage destinations; CDC mechanisms and filtering; and BigLake Managed Tables.
Streams depend on connection profiles for both source and destination endpoints. The source database must have CDC enabled through binary logs, replication slots, or change tracking. The examples are intentionally small. Combine them with your own connection profiles, IAM configuration, and monitoring.
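The examples in this guide grant bucket and KMS roles to the Datastream service agent, and each grant repeats the same member string built from the project number. As a minimal sketch, a small helper could build that string once. The function name `datastreamServiceAgentMember` is hypothetical and not part of the Pulumi SDK, and the numeric check assumes project numbers contain only digits.

```typescript
// Hypothetical helper (not part of @pulumi/gcp): builds the IAM member string
// for the Datastream service agent from a project number, following the
// pattern used in the examples in this guide.
function datastreamServiceAgentMember(projectNumber: string): string {
    // Assumption: project numbers are purely numeric strings.
    if (!/^\d+$/.test(projectNumber)) {
        throw new Error(`expected a numeric project number, got "${projectNumber}"`);
    }
    return `serviceAccount:service-${projectNumber}@gcp-sa-datastream.iam.gserviceaccount.com`;
}

// Example with a made-up project number.
const member = datastreamServiceAgentMember("123456789012");
console.log(member);
```

In a Pulumi program, `gcp.organizations.getProject({})` returns a promise, so the helper would typically be called as `project.then(p => datastreamServiceAgentMember(p.number))`.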
Stream MySQL to Cloud Storage with filtering and encryption
Data pipelines often replicate MySQL databases to Cloud Storage for analytics, applying table and column filters to control what gets replicated.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
import * as random from "@pulumi/random";
const project = gcp.organizations.getProject({});
const instance = new gcp.sql.DatabaseInstance("instance", {
name: "my-instance",
databaseVersion: "MYSQL_8_0",
region: "us-central1",
settings: {
tier: "db-f1-micro",
backupConfiguration: {
enabled: true,
binaryLogEnabled: true,
},
ipConfiguration: {
authorizedNetworks: [
{
value: "34.71.242.81",
},
{
value: "34.72.28.29",
},
{
value: "34.67.6.157",
},
{
value: "34.67.234.134",
},
{
value: "34.72.239.218",
},
],
},
},
deletionProtection: true,
});
const db = new gcp.sql.Database("db", {
instance: instance.name,
name: "db",
});
const pwd = new random.RandomPassword("pwd", {
length: 16,
special: false,
});
const user = new gcp.sql.User("user", {
name: "user",
instance: instance.name,
host: "%",
password: pwd.result,
});
const sourceConnectionProfile = new gcp.datastream.ConnectionProfile("source_connection_profile", {
displayName: "Source connection profile",
location: "us-central1",
connectionProfileId: "source-profile",
mysqlProfile: {
hostname: instance.publicIpAddress,
username: user.name,
password: user.password,
},
});
const bucket = new gcp.storage.Bucket("bucket", {
name: "my-bucket",
location: "US",
uniformBucketLevelAccess: true,
});
const viewer = new gcp.storage.BucketIAMMember("viewer", {
bucket: bucket.name,
role: "roles/storage.objectViewer",
member: project.then(project => `serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com`),
});
const creator = new gcp.storage.BucketIAMMember("creator", {
bucket: bucket.name,
role: "roles/storage.objectCreator",
member: project.then(project => `serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com`),
});
const reader = new gcp.storage.BucketIAMMember("reader", {
bucket: bucket.name,
role: "roles/storage.legacyBucketReader",
member: project.then(project => `serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com`),
});
const keyUser = new gcp.kms.CryptoKeyIAMMember("key_user", {
cryptoKeyId: "kms-name",
role: "roles/cloudkms.cryptoKeyEncrypterDecrypter",
member: project.then(project => `serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com`),
});
const destinationConnectionProfile = new gcp.datastream.ConnectionProfile("destination_connection_profile", {
displayName: "Connection profile",
location: "us-central1",
connectionProfileId: "destination-profile",
gcsProfile: {
bucket: bucket.name,
rootPath: "/path",
},
});
const _default = new gcp.datastream.Stream("default", {
streamId: "my-stream",
desiredState: "NOT_STARTED",
location: "us-central1",
displayName: "my stream",
labels: {
key: "value",
},
sourceConfig: {
sourceConnectionProfile: sourceConnectionProfile.id,
mysqlSourceConfig: {
includeObjects: {
mysqlDatabases: [{
database: "my-database",
mysqlTables: [
{
table: "includedTable",
mysqlColumns: [{
column: "includedColumn",
dataType: "VARCHAR",
collation: "utf8mb4",
primaryKey: false,
nullable: false,
ordinalPosition: 0,
}],
},
{
table: "includedTable_2",
},
],
}],
},
excludeObjects: {
mysqlDatabases: [{
database: "my-database",
mysqlTables: [{
table: "excludedTable",
mysqlColumns: [{
column: "excludedColumn",
dataType: "VARCHAR",
collation: "utf8mb4",
primaryKey: false,
nullable: false,
ordinalPosition: 0,
}],
}],
}],
},
maxConcurrentCdcTasks: 5,
},
},
destinationConfig: {
destinationConnectionProfile: destinationConnectionProfile.id,
gcsDestinationConfig: {
path: "mydata",
fileRotationMb: 200,
fileRotationInterval: "60s",
jsonFileFormat: {
schemaFileFormat: "NO_SCHEMA_FILE",
compression: "GZIP",
},
},
},
backfillAll: {
mysqlExcludedObjects: {
mysqlDatabases: [{
database: "my-database",
mysqlTables: [{
table: "excludedTable",
mysqlColumns: [{
column: "excludedColumn",
dataType: "VARCHAR",
collation: "utf8mb4",
primaryKey: false,
nullable: false,
ordinalPosition: 0,
}],
}],
}],
},
},
customerManagedEncryptionKey: "kms-name",
}, {
dependsOn: [keyUser],
});
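The `includeObjects`, `excludeObjects`, and `mysqlExcludedObjects` blocks above all use the same nested database, table, and column shape. A minimal sketch of a helper that builds that shape from a compact map might look like the following. The `MysqlObjects` types, the `FilterSpec` type, and the `buildMysqlObjects` function are hypothetical illustrations rather than SDK types, and only the `database`, `table`, and `column` fields are modeled.

```typescript
// Hypothetical local types mirroring the nested shape used in the example above.
// They model only the name fields, not dataType, collation, and so on.
interface MysqlColumn { column: string; }
interface MysqlTable { table: string; mysqlColumns?: MysqlColumn[]; }
interface MysqlDatabase { database: string; mysqlTables: MysqlTable[]; }
interface MysqlObjects { mysqlDatabases: MysqlDatabase[]; }

// Compact input: database name -> table name -> column names.
// An empty column list means "no column-level entry for this table".
type FilterSpec = Record<string, Record<string, string[]>>;

function buildMysqlObjects(spec: FilterSpec): MysqlObjects {
    return {
        mysqlDatabases: Object.entries(spec).map(([database, tables]) => ({
            database,
            mysqlTables: Object.entries(tables).map(([table, columns]): MysqlTable =>
                columns.length > 0
                    ? { table, mysqlColumns: columns.map(column => ({ column })) }
                    : { table }),
        })),
    };
}

// Rebuilds the include list from the example above (names only).
const included = buildMysqlObjects({
    "my-database": {
        includedTable: ["includedColumn"],
        includedTable_2: [],
    },
});
console.log(JSON.stringify(included, null, 2));
```

The returned object has the same field names as the `includeObjects` value in the example, so it could plausibly be passed there, though compatibility with the SDK's input types is not verified in this sketch.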
import pulumi
import pulumi_gcp as gcp
import pulumi_random as random
project = gcp.organizations.get_project()
instance = gcp.sql.DatabaseInstance("instance",
name="my-instance",
database_version="MYSQL_8_0",
region="us-central1",
settings={
"tier": "db-f1-micro",
"backup_configuration": {
"enabled": True,
"binary_log_enabled": True,
},
"ip_configuration": {
"authorized_networks": [
{
"value": "34.71.242.81",
},
{
"value": "34.72.28.29",
},
{
"value": "34.67.6.157",
},
{
"value": "34.67.234.134",
},
{
"value": "34.72.239.218",
},
],
},
},
deletion_protection=True)
db = gcp.sql.Database("db",
instance=instance.name,
name="db")
pwd = random.RandomPassword("pwd",
length=16,
special=False)
user = gcp.sql.User("user",
name="user",
instance=instance.name,
host="%",
password=pwd.result)
source_connection_profile = gcp.datastream.ConnectionProfile("source_connection_profile",
display_name="Source connection profile",
location="us-central1",
connection_profile_id="source-profile",
mysql_profile={
"hostname": instance.public_ip_address,
"username": user.name,
"password": user.password,
})
bucket = gcp.storage.Bucket("bucket",
name="my-bucket",
location="US",
uniform_bucket_level_access=True)
viewer = gcp.storage.BucketIAMMember("viewer",
bucket=bucket.name,
role="roles/storage.objectViewer",
member=f"serviceAccount:service-{project.number}@gcp-sa-datastream.iam.gserviceaccount.com")
creator = gcp.storage.BucketIAMMember("creator",
bucket=bucket.name,
role="roles/storage.objectCreator",
member=f"serviceAccount:service-{project.number}@gcp-sa-datastream.iam.gserviceaccount.com")
reader = gcp.storage.BucketIAMMember("reader",
bucket=bucket.name,
role="roles/storage.legacyBucketReader",
member=f"serviceAccount:service-{project.number}@gcp-sa-datastream.iam.gserviceaccount.com")
key_user = gcp.kms.CryptoKeyIAMMember("key_user",
crypto_key_id="kms-name",
role="roles/cloudkms.cryptoKeyEncrypterDecrypter",
member=f"serviceAccount:service-{project.number}@gcp-sa-datastream.iam.gserviceaccount.com")
destination_connection_profile = gcp.datastream.ConnectionProfile("destination_connection_profile",
display_name="Connection profile",
location="us-central1",
connection_profile_id="destination-profile",
gcs_profile={
"bucket": bucket.name,
"root_path": "/path",
})
default = gcp.datastream.Stream("default",
stream_id="my-stream",
desired_state="NOT_STARTED",
location="us-central1",
display_name="my stream",
labels={
"key": "value",
},
source_config={
"source_connection_profile": source_connection_profile.id,
"mysql_source_config": {
"include_objects": {
"mysql_databases": [{
"database": "my-database",
"mysql_tables": [
{
"table": "includedTable",
"mysql_columns": [{
"column": "includedColumn",
"data_type": "VARCHAR",
"collation": "utf8mb4",
"primary_key": False,
"nullable": False,
"ordinal_position": 0,
}],
},
{
"table": "includedTable_2",
},
],
}],
},
"exclude_objects": {
"mysql_databases": [{
"database": "my-database",
"mysql_tables": [{
"table": "excludedTable",
"mysql_columns": [{
"column": "excludedColumn",
"data_type": "VARCHAR",
"collation": "utf8mb4",
"primary_key": False,
"nullable": False,
"ordinal_position": 0,
}],
}],
}],
},
"max_concurrent_cdc_tasks": 5,
},
},
destination_config={
"destination_connection_profile": destination_connection_profile.id,
"gcs_destination_config": {
"path": "mydata",
"file_rotation_mb": 200,
"file_rotation_interval": "60s",
"json_file_format": {
"schema_file_format": "NO_SCHEMA_FILE",
"compression": "GZIP",
},
},
},
backfill_all={
"mysql_excluded_objects": {
"mysql_databases": [{
"database": "my-database",
"mysql_tables": [{
"table": "excludedTable",
"mysql_columns": [{
"column": "excludedColumn",
"data_type": "VARCHAR",
"collation": "utf8mb4",
"primary_key": False,
"nullable": False,
"ordinal_position": 0,
}],
}],
}],
},
},
customer_managed_encryption_key="kms-name",
opts = pulumi.ResourceOptions(depends_on=[key_user]))
package main
import (
"fmt"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/kms"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/storage"
"github.com/pulumi/pulumi-random/sdk/v4/go/random"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
if err != nil {
return err
}
instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
Name: pulumi.String("my-instance"),
DatabaseVersion: pulumi.String("MYSQL_8_0"),
Region: pulumi.String("us-central1"),
Settings: &sql.DatabaseInstanceSettingsArgs{
Tier: pulumi.String("db-f1-micro"),
BackupConfiguration: &sql.DatabaseInstanceSettingsBackupConfigurationArgs{
Enabled: pulumi.Bool(true),
BinaryLogEnabled: pulumi.Bool(true),
},
IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.71.242.81"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.72.28.29"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.67.6.157"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.67.234.134"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.72.239.218"),
},
},
},
},
DeletionProtection: pulumi.Bool(true),
})
if err != nil {
return err
}
_, err = sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
Instance: instance.Name,
Name: pulumi.String("db"),
})
if err != nil {
return err
}
pwd, err := random.NewRandomPassword(ctx, "pwd", &random.RandomPasswordArgs{
Length: pulumi.Int(16),
Special: pulumi.Bool(false),
})
if err != nil {
return err
}
user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
Name: pulumi.String("user"),
Instance: instance.Name,
Host: pulumi.String("%"),
Password: pwd.Result,
})
if err != nil {
return err
}
sourceConnectionProfile, err := datastream.NewConnectionProfile(ctx, "source_connection_profile", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("Source connection profile"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("source-profile"),
MysqlProfile: &datastream.ConnectionProfileMysqlProfileArgs{
Hostname: instance.PublicIpAddress,
Username: user.Name,
Password: user.Password,
},
})
if err != nil {
return err
}
bucket, err := storage.NewBucket(ctx, "bucket", &storage.BucketArgs{
Name: pulumi.String("my-bucket"),
Location: pulumi.String("US"),
UniformBucketLevelAccess: pulumi.Bool(true),
})
if err != nil {
return err
}
_, err = storage.NewBucketIAMMember(ctx, "viewer", &storage.BucketIAMMemberArgs{
Bucket: bucket.Name,
Role: pulumi.String("roles/storage.objectViewer"),
Member: pulumi.Sprintf("serviceAccount:service-%v@gcp-sa-datastream.iam.gserviceaccount.com", project.Number),
})
if err != nil {
return err
}
_, err = storage.NewBucketIAMMember(ctx, "creator", &storage.BucketIAMMemberArgs{
Bucket: bucket.Name,
Role: pulumi.String("roles/storage.objectCreator"),
Member: pulumi.Sprintf("serviceAccount:service-%v@gcp-sa-datastream.iam.gserviceaccount.com", project.Number),
})
if err != nil {
return err
}
_, err = storage.NewBucketIAMMember(ctx, "reader", &storage.BucketIAMMemberArgs{
Bucket: bucket.Name,
Role: pulumi.String("roles/storage.legacyBucketReader"),
Member: pulumi.Sprintf("serviceAccount:service-%v@gcp-sa-datastream.iam.gserviceaccount.com", project.Number),
})
if err != nil {
return err
}
keyUser, err := kms.NewCryptoKeyIAMMember(ctx, "key_user", &kms.CryptoKeyIAMMemberArgs{
CryptoKeyId: pulumi.String("kms-name"),
Role: pulumi.String("roles/cloudkms.cryptoKeyEncrypterDecrypter"),
Member: pulumi.Sprintf("serviceAccount:service-%v@gcp-sa-datastream.iam.gserviceaccount.com", project.Number),
})
if err != nil {
return err
}
destinationConnectionProfile, err := datastream.NewConnectionProfile(ctx, "destination_connection_profile", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("Connection profile"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("destination-profile"),
GcsProfile: &datastream.ConnectionProfileGcsProfileArgs{
Bucket: bucket.Name,
RootPath: pulumi.String("/path"),
},
})
if err != nil {
return err
}
_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
StreamId: pulumi.String("my-stream"),
DesiredState: pulumi.String("NOT_STARTED"),
Location: pulumi.String("us-central1"),
DisplayName: pulumi.String("my stream"),
Labels: pulumi.StringMap{
"key": pulumi.String("value"),
},
SourceConfig: &datastream.StreamSourceConfigArgs{
SourceConnectionProfile: sourceConnectionProfile.ID(),
MysqlSourceConfig: &datastream.StreamSourceConfigMysqlSourceConfigArgs{
IncludeObjects: &datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs{
MysqlDatabases: datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArray{
&datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArgs{
Database: pulumi.String("my-database"),
MysqlTables: datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArray{
&datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs{
Table: pulumi.String("includedTable"),
MysqlColumns: datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableMysqlColumnArray{
&datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs{
Column: pulumi.String("includedColumn"),
DataType: pulumi.String("VARCHAR"),
Collation: pulumi.String("utf8mb4"),
PrimaryKey: pulumi.Bool(false),
Nullable: pulumi.Bool(false),
OrdinalPosition: pulumi.Int(0),
},
},
},
&datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs{
Table: pulumi.String("includedTable_2"),
},
},
},
},
},
ExcludeObjects: &datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsArgs{
MysqlDatabases: datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseArray{
&datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseArgs{
Database: pulumi.String("my-database"),
MysqlTables: datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableArray{
&datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableArgs{
Table: pulumi.String("excludedTable"),
MysqlColumns: datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableMysqlColumnArray{
&datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs{
Column: pulumi.String("excludedColumn"),
DataType: pulumi.String("VARCHAR"),
Collation: pulumi.String("utf8mb4"),
PrimaryKey: pulumi.Bool(false),
Nullable: pulumi.Bool(false),
OrdinalPosition: pulumi.Int(0),
},
},
},
},
},
},
},
MaxConcurrentCdcTasks: pulumi.Int(5),
},
},
DestinationConfig: &datastream.StreamDestinationConfigArgs{
DestinationConnectionProfile: destinationConnectionProfile.ID(),
GcsDestinationConfig: &datastream.StreamDestinationConfigGcsDestinationConfigArgs{
Path: pulumi.String("mydata"),
FileRotationMb: pulumi.Int(200),
FileRotationInterval: pulumi.String("60s"),
JsonFileFormat: &datastream.StreamDestinationConfigGcsDestinationConfigJsonFileFormatArgs{
SchemaFileFormat: pulumi.String("NO_SCHEMA_FILE"),
Compression: pulumi.String("GZIP"),
},
},
},
BackfillAll: &datastream.StreamBackfillAllArgs{
MysqlExcludedObjects: &datastream.StreamBackfillAllMysqlExcludedObjectsArgs{
MysqlDatabases: datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseArray{
&datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseArgs{
Database: pulumi.String("my-database"),
MysqlTables: datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableArray{
&datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableArgs{
Table: pulumi.String("excludedTable"),
MysqlColumns: datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableMysqlColumnArray{
&datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableMysqlColumnArgs{
Column: pulumi.String("excludedColumn"),
DataType: pulumi.String("VARCHAR"),
Collation: pulumi.String("utf8mb4"),
PrimaryKey: pulumi.Bool(false),
Nullable: pulumi.Bool(false),
OrdinalPosition: pulumi.Int(0),
},
},
},
},
},
},
},
},
CustomerManagedEncryptionKey: pulumi.String("kms-name"),
}, pulumi.DependsOn([]pulumi.Resource{
keyUser,
}))
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
using Random = Pulumi.Random;
return await Deployment.RunAsync(() =>
{
var project = Gcp.Organizations.GetProject.Invoke();
var instance = new Gcp.Sql.DatabaseInstance("instance", new()
{
Name = "my-instance",
DatabaseVersion = "MYSQL_8_0",
Region = "us-central1",
Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
{
Tier = "db-f1-micro",
BackupConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsBackupConfigurationArgs
{
Enabled = true,
BinaryLogEnabled = true,
},
IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
{
AuthorizedNetworks = new[]
{
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.71.242.81",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.72.28.29",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.67.6.157",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.67.234.134",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.72.239.218",
},
},
},
},
DeletionProtection = true,
});
var db = new Gcp.Sql.Database("db", new()
{
Instance = instance.Name,
Name = "db",
});
var pwd = new Random.RandomPassword("pwd", new()
{
Length = 16,
Special = false,
});
var user = new Gcp.Sql.User("user", new()
{
Name = "user",
Instance = instance.Name,
Host = "%",
Password = pwd.Result,
});
var sourceConnectionProfile = new Gcp.Datastream.ConnectionProfile("source_connection_profile", new()
{
DisplayName = "Source connection profile",
Location = "us-central1",
ConnectionProfileId = "source-profile",
MysqlProfile = new Gcp.Datastream.Inputs.ConnectionProfileMysqlProfileArgs
{
Hostname = instance.PublicIpAddress,
Username = user.Name,
Password = user.Password,
},
});
var bucket = new Gcp.Storage.Bucket("bucket", new()
{
Name = "my-bucket",
Location = "US",
UniformBucketLevelAccess = true,
});
var viewer = new Gcp.Storage.BucketIAMMember("viewer", new()
{
Bucket = bucket.Name,
Role = "roles/storage.objectViewer",
Member = Output.Format($"serviceAccount:service-{project.Apply(getProjectResult => getProjectResult.Number)}@gcp-sa-datastream.iam.gserviceaccount.com"),
});
var creator = new Gcp.Storage.BucketIAMMember("creator", new()
{
Bucket = bucket.Name,
Role = "roles/storage.objectCreator",
Member = Output.Format($"serviceAccount:service-{project.Apply(getProjectResult => getProjectResult.Number)}@gcp-sa-datastream.iam.gserviceaccount.com"),
});
var reader = new Gcp.Storage.BucketIAMMember("reader", new()
{
Bucket = bucket.Name,
Role = "roles/storage.legacyBucketReader",
Member = Output.Format($"serviceAccount:service-{project.Apply(getProjectResult => getProjectResult.Number)}@gcp-sa-datastream.iam.gserviceaccount.com"),
});
var keyUser = new Gcp.Kms.CryptoKeyIAMMember("key_user", new()
{
CryptoKeyId = "kms-name",
Role = "roles/cloudkms.cryptoKeyEncrypterDecrypter",
Member = Output.Format($"serviceAccount:service-{project.Apply(getProjectResult => getProjectResult.Number)}@gcp-sa-datastream.iam.gserviceaccount.com"),
});
var destinationConnectionProfile = new Gcp.Datastream.ConnectionProfile("destination_connection_profile", new()
{
DisplayName = "Connection profile",
Location = "us-central1",
ConnectionProfileId = "destination-profile",
GcsProfile = new Gcp.Datastream.Inputs.ConnectionProfileGcsProfileArgs
{
Bucket = bucket.Name,
RootPath = "/path",
},
});
var @default = new Gcp.Datastream.Stream("default", new()
{
StreamId = "my-stream",
DesiredState = "NOT_STARTED",
Location = "us-central1",
DisplayName = "my stream",
Labels =
{
{ "key", "value" },
},
SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
{
SourceConnectionProfile = sourceConnectionProfile.Id,
MysqlSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigArgs
{
IncludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs
{
MysqlDatabases = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArgs
{
Database = "my-database",
MysqlTables = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs
{
Table = "includedTable",
MysqlColumns = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs
{
Column = "includedColumn",
DataType = "VARCHAR",
Collation = "utf8mb4",
PrimaryKey = false,
Nullable = false,
OrdinalPosition = 0,
},
},
},
new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs
{
Table = "includedTable_2",
},
},
},
},
},
ExcludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsArgs
{
MysqlDatabases = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseArgs
{
Database = "my-database",
MysqlTables = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableArgs
{
Table = "excludedTable",
MysqlColumns = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs
{
Column = "excludedColumn",
DataType = "VARCHAR",
Collation = "utf8mb4",
PrimaryKey = false,
Nullable = false,
OrdinalPosition = 0,
},
},
},
},
},
},
},
MaxConcurrentCdcTasks = 5,
},
},
DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
{
DestinationConnectionProfile = destinationConnectionProfile.Id,
GcsDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigGcsDestinationConfigArgs
{
Path = "mydata",
FileRotationMb = 200,
FileRotationInterval = "60s",
JsonFileFormat = new Gcp.Datastream.Inputs.StreamDestinationConfigGcsDestinationConfigJsonFileFormatArgs
{
SchemaFileFormat = "NO_SCHEMA_FILE",
Compression = "GZIP",
},
},
},
BackfillAll = new Gcp.Datastream.Inputs.StreamBackfillAllArgs
{
MysqlExcludedObjects = new Gcp.Datastream.Inputs.StreamBackfillAllMysqlExcludedObjectsArgs
{
MysqlDatabases = new[]
{
new Gcp.Datastream.Inputs.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseArgs
{
Database = "my-database",
MysqlTables = new[]
{
new Gcp.Datastream.Inputs.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableArgs
{
Table = "excludedTable",
MysqlColumns = new[]
{
new Gcp.Datastream.Inputs.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableMysqlColumnArgs
{
Column = "excludedColumn",
DataType = "VARCHAR",
Collation = "utf8mb4",
PrimaryKey = false,
Nullable = false,
OrdinalPosition = 0,
},
},
},
},
},
},
},
},
CustomerManagedEncryptionKey = "kms-name",
}, new CustomResourceOptions
{
DependsOn =
{
keyUser,
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsBackupConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs;
import com.pulumi.random.RandomPassword;
import com.pulumi.random.RandomPasswordArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileMysqlProfileArgs;
import com.pulumi.gcp.storage.Bucket;
import com.pulumi.gcp.storage.BucketArgs;
import com.pulumi.gcp.storage.BucketIAMMember;
import com.pulumi.gcp.storage.BucketIAMMemberArgs;
import com.pulumi.gcp.kms.CryptoKeyIAMMember;
import com.pulumi.gcp.kms.CryptoKeyIAMMemberArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileGcsProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigGcsDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigGcsDestinationConfigJsonFileFormatArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllMysqlExcludedObjectsArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
.build());
var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
.name("my-instance")
.databaseVersion("MYSQL_8_0")
.region("us-central1")
.settings(DatabaseInstanceSettingsArgs.builder()
.tier("db-f1-micro")
.backupConfiguration(DatabaseInstanceSettingsBackupConfigurationArgs.builder()
.enabled(true)
.binaryLogEnabled(true)
.build())
.ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
.authorizedNetworks(
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.71.242.81")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.72.28.29")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.67.6.157")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.67.234.134")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.72.239.218")
.build())
.build())
.build())
.deletionProtection(true)
.build());
var db = new Database("db", DatabaseArgs.builder()
.instance(instance.name())
.name("db")
.build());
var pwd = new RandomPassword("pwd", RandomPasswordArgs.builder()
.length(16)
.special(false)
.build());
var user = new User("user", UserArgs.builder()
.name("user")
.instance(instance.name())
.host("%")
.password(pwd.result())
.build());
var sourceConnectionProfile = new ConnectionProfile("sourceConnectionProfile", ConnectionProfileArgs.builder()
.displayName("Source connection profile")
.location("us-central1")
.connectionProfileId("source-profile")
.mysqlProfile(ConnectionProfileMysqlProfileArgs.builder()
.hostname(instance.publicIpAddress())
.username(user.name())
.password(user.password())
.build())
.build());
var bucket = new Bucket("bucket", BucketArgs.builder()
.name("my-bucket")
.location("US")
.uniformBucketLevelAccess(true)
.build());
var viewer = new BucketIAMMember("viewer", BucketIAMMemberArgs.builder()
.bucket(bucket.name())
.role("roles/storage.objectViewer")
.member(project.applyValue(p -> String.format("serviceAccount:service-%s@gcp-sa-datastream.iam.gserviceaccount.com", p.number())))
.build());
var creator = new BucketIAMMember("creator", BucketIAMMemberArgs.builder()
.bucket(bucket.name())
.role("roles/storage.objectCreator")
.member(project.applyValue(p -> String.format("serviceAccount:service-%s@gcp-sa-datastream.iam.gserviceaccount.com", p.number())))
.build());
var reader = new BucketIAMMember("reader", BucketIAMMemberArgs.builder()
.bucket(bucket.name())
.role("roles/storage.legacyBucketReader")
.member(project.applyValue(p -> String.format("serviceAccount:service-%s@gcp-sa-datastream.iam.gserviceaccount.com", p.number())))
.build());
var keyUser = new CryptoKeyIAMMember("keyUser", CryptoKeyIAMMemberArgs.builder()
.cryptoKeyId("kms-name")
.role("roles/cloudkms.cryptoKeyEncrypterDecrypter")
.member(project.applyValue(p -> String.format("serviceAccount:service-%s@gcp-sa-datastream.iam.gserviceaccount.com", p.number())))
.build());
var destinationConnectionProfile = new ConnectionProfile("destinationConnectionProfile", ConnectionProfileArgs.builder()
.displayName("Connection profile")
.location("us-central1")
.connectionProfileId("destination-profile")
.gcsProfile(ConnectionProfileGcsProfileArgs.builder()
.bucket(bucket.name())
.rootPath("/path")
.build())
.build());
var default_ = new Stream("default", StreamArgs.builder()
.streamId("my-stream")
.desiredState("NOT_STARTED")
.location("us-central1")
.displayName("my stream")
.labels(Map.of("key", "value"))
.sourceConfig(StreamSourceConfigArgs.builder()
.sourceConnectionProfile(sourceConnectionProfile.id())
.mysqlSourceConfig(StreamSourceConfigMysqlSourceConfigArgs.builder()
.includeObjects(StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs.builder()
.mysqlDatabases(StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArgs.builder()
.database("my-database")
.mysqlTables(
StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs.builder()
.table("includedTable")
.mysqlColumns(StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs.builder()
.column("includedColumn")
.dataType("VARCHAR")
.collation("utf8mb4")
.primaryKey(false)
.nullable(false)
.ordinalPosition(0)
.build())
.build(),
StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs.builder()
.table("includedTable_2")
.build())
.build())
.build())
.excludeObjects(StreamSourceConfigMysqlSourceConfigExcludeObjectsArgs.builder()
.mysqlDatabases(StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseArgs.builder()
.database("my-database")
.mysqlTables(StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableArgs.builder()
.table("excludedTable")
.mysqlColumns(StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs.builder()
.column("excludedColumn")
.dataType("VARCHAR")
.collation("utf8mb4")
.primaryKey(false)
.nullable(false)
.ordinalPosition(0)
.build())
.build())
.build())
.build())
.maxConcurrentCdcTasks(5)
.build())
.build())
.destinationConfig(StreamDestinationConfigArgs.builder()
.destinationConnectionProfile(destinationConnectionProfile.id())
.gcsDestinationConfig(StreamDestinationConfigGcsDestinationConfigArgs.builder()
.path("mydata")
.fileRotationMb(200)
.fileRotationInterval("60s")
.jsonFileFormat(StreamDestinationConfigGcsDestinationConfigJsonFileFormatArgs.builder()
.schemaFileFormat("NO_SCHEMA_FILE")
.compression("GZIP")
.build())
.build())
.build())
.backfillAll(StreamBackfillAllArgs.builder()
.mysqlExcludedObjects(StreamBackfillAllMysqlExcludedObjectsArgs.builder()
.mysqlDatabases(StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseArgs.builder()
.database("my-database")
.mysqlTables(StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableArgs.builder()
.table("excludedTable")
.mysqlColumns(StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableMysqlColumnArgs.builder()
.column("excludedColumn")
.dataType("VARCHAR")
.collation("utf8mb4")
.primaryKey(false)
.nullable(false)
.ordinalPosition(0)
.build())
.build())
.build())
.build())
.build())
.customerManagedEncryptionKey("kms-name")
.build(), CustomResourceOptions.builder()
.dependsOn(keyUser)
.build());
}
}
resources:
  instance:
    type: gcp:sql:DatabaseInstance
    properties:
      name: my-instance
      databaseVersion: MYSQL_8_0
      region: us-central1
      settings:
        tier: db-f1-micro
        backupConfiguration:
          enabled: true
          binaryLogEnabled: true
        ipConfiguration:
          authorizedNetworks:
            - value: 34.71.242.81
            - value: 34.72.28.29
            - value: 34.67.6.157
            - value: 34.67.234.134
            - value: 34.72.239.218
      deletionProtection: true
  db:
    type: gcp:sql:Database
    properties:
      instance: ${instance.name}
      name: db
  pwd:
    type: random:Password
    properties:
      length: 16
      special: false
  user:
    type: gcp:sql:User
    properties:
      name: user
      instance: ${instance.name}
      host: '%'
      password: ${pwd.result}
  sourceConnectionProfile:
    type: gcp:datastream:ConnectionProfile
    name: source_connection_profile
    properties:
      displayName: Source connection profile
      location: us-central1
      connectionProfileId: source-profile
      mysqlProfile:
        hostname: ${instance.publicIpAddress}
        username: ${user.name}
        password: ${user.password}
  bucket:
    type: gcp:storage:Bucket
    properties:
      name: my-bucket
      location: US
      uniformBucketLevelAccess: true
  viewer:
    type: gcp:storage:BucketIAMMember
    properties:
      bucket: ${bucket.name}
      role: roles/storage.objectViewer
      member: serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com
  creator:
    type: gcp:storage:BucketIAMMember
    properties:
      bucket: ${bucket.name}
      role: roles/storage.objectCreator
      member: serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com
  reader:
    type: gcp:storage:BucketIAMMember
    properties:
      bucket: ${bucket.name}
      role: roles/storage.legacyBucketReader
      member: serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com
  keyUser:
    type: gcp:kms:CryptoKeyIAMMember
    name: key_user
    properties:
      cryptoKeyId: kms-name
      role: roles/cloudkms.cryptoKeyEncrypterDecrypter
      member: serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com
  destinationConnectionProfile:
    type: gcp:datastream:ConnectionProfile
    name: destination_connection_profile
    properties:
      displayName: Connection profile
      location: us-central1
      connectionProfileId: destination-profile
      gcsProfile:
        bucket: ${bucket.name}
        rootPath: /path
  default:
    type: gcp:datastream:Stream
    properties:
      streamId: my-stream
      desiredState: NOT_STARTED
      location: us-central1
      displayName: my stream
      labels:
        key: value
      sourceConfig:
        sourceConnectionProfile: ${sourceConnectionProfile.id}
        mysqlSourceConfig:
          includeObjects:
            mysqlDatabases:
              - database: my-database
                mysqlTables:
                  - table: includedTable
                    mysqlColumns:
                      - column: includedColumn
                        dataType: VARCHAR
                        collation: utf8mb4
                        primaryKey: false
                        nullable: false
                        ordinalPosition: 0
                  - table: includedTable_2
          excludeObjects:
            mysqlDatabases:
              - database: my-database
                mysqlTables:
                  - table: excludedTable
                    mysqlColumns:
                      - column: excludedColumn
                        dataType: VARCHAR
                        collation: utf8mb4
                        primaryKey: false
                        nullable: false
                        ordinalPosition: 0
          maxConcurrentCdcTasks: 5
      destinationConfig:
        destinationConnectionProfile: ${destinationConnectionProfile.id}
        gcsDestinationConfig:
          path: mydata
          fileRotationMb: 200
          fileRotationInterval: 60s
          jsonFileFormat:
            schemaFileFormat: NO_SCHEMA_FILE
            compression: GZIP
      backfillAll:
        mysqlExcludedObjects:
          mysqlDatabases:
            - database: my-database
              mysqlTables:
                - table: excludedTable
                  mysqlColumns:
                    - column: excludedColumn
                      dataType: VARCHAR
                      collation: utf8mb4
                      primaryKey: false
                      nullable: false
                      ordinalPosition: 0
      customerManagedEncryptionKey: kms-name
    options:
      dependsOn:
        - ${keyUser}
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}
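The sourceConfig references the MySQL connection profile and uses includeObjects and excludeObjects to select which databases, tables, and columns are replicated. The destinationConfig points to the Cloud Storage connection profile and sets the output path, the file rotation thresholds (fileRotationMb and fileRotationInterval), and GZIP-compressed JSON output. The backfillAll property backfills historical data for every included object except those listed in mysqlExcludedObjects. The customerManagedEncryptionKey property encrypts the stream's data with the named Cloud KMS key, and the dependsOn option makes sure the Datastream service account has been granted access to that key before the stream is created. The maxConcurrentCdcTasks property caps the number of concurrent change data capture tasks. Because desiredState is NOT_STARTED, the stream is created but does not replicate until its state is changed to RUNNING.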
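The example repeats the same excluded table in both excludeObjects and backfillAll. A minimal sketch of how that nesting could be generated once and reused, written against the dict-style arguments the Python examples in this guide use. The helper name mysql_objects and the compact {database: {table: [columns]}} input shape are hypothetical, and optional column attributes such as dataType and collation are left out for brevity.

```python
from typing import Dict, List, Mapping, Sequence


def mysql_objects(spec: Mapping[str, Mapping[str, Sequence[str]]]) -> Dict[str, List[dict]]:
    """Expand {database: {table: [columns]}} into the nested mysql_databases shape.

    Hypothetical helper: it only builds plain dicts and does not call Pulumi.
    """
    databases = []
    for database, tables in spec.items():
        table_entries = []
        for table, columns in tables.items():
            entry = {"table": table}
            # An empty column list means "the whole table", so the key is omitted.
            if columns:
                entry["mysql_columns"] = [{"column": name} for name in columns]
            table_entries.append(entry)
        databases.append({"database": database, "mysql_tables": table_entries})
    return {"mysql_databases": databases}


included = mysql_objects({
    "my-database": {"includedTable": ["includedColumn"], "includedTable_2": []},
})
excluded = mysql_objects({
    "my-database": {"excludedTable": ["excludedColumn"]},
})

# The same exclusion list feeds both the CDC filter and the backfill filter.
mysql_source_config = {
    "include_objects": included,
    "exclude_objects": excluded,
    "max_concurrent_cdc_tasks": 5,
}
backfill_all = {"mysql_excluded_objects": excluded}
```

Under these assumptions, mysql_source_config would be passed as the "mysql_source_config" entry of source_config and backfill_all as the stream's backfill_all argument, which keeps the two exclusion lists from drifting apart.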
Stream PostgreSQL to BigQuery with replication slots
Replicating PostgreSQL to BigQuery enables near-real-time analytics on operational data. Datastream reads changes through a PostgreSQL publication and a logical replication slot.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const source = new gcp.datastream.ConnectionProfile("source", {
displayName: "Postgresql Source",
location: "us-central1",
connectionProfileId: "source-profile",
postgresqlProfile: {
hostname: "hostname",
port: 5432,
username: "user",
password: "pass",
database: "postgres",
},
});
const destination = new gcp.datastream.ConnectionProfile("destination", {
displayName: "BigQuery Destination",
location: "us-central1",
connectionProfileId: "destination-profile",
bigqueryProfile: {},
});
const _default = new gcp.datastream.Stream("default", {
displayName: "Postgres to BigQuery",
location: "us-central1",
streamId: "my-stream",
desiredState: "RUNNING",
sourceConfig: {
sourceConnectionProfile: source.id,
postgresqlSourceConfig: {
maxConcurrentBackfillTasks: 12,
publication: "publication",
replicationSlot: "replication_slot",
includeObjects: {
postgresqlSchemas: [{
schema: "schema",
postgresqlTables: [{
table: "table",
postgresqlColumns: [{
column: "column",
}],
}],
}],
},
excludeObjects: {
postgresqlSchemas: [{
schema: "schema",
postgresqlTables: [{
table: "table",
postgresqlColumns: [{
column: "column",
}],
}],
}],
},
},
},
destinationConfig: {
destinationConnectionProfile: destination.id,
bigqueryDestinationConfig: {
dataFreshness: "900s",
sourceHierarchyDatasets: {
datasetTemplate: {
location: "us-central1",
},
},
},
},
backfillAll: {
postgresqlExcludedObjects: {
postgresqlSchemas: [{
schema: "schema",
postgresqlTables: [{
table: "table",
postgresqlColumns: [{
column: "column",
}],
}],
}],
},
},
});
import pulumi
import pulumi_gcp as gcp
source = gcp.datastream.ConnectionProfile("source",
display_name="Postgresql Source",
location="us-central1",
connection_profile_id="source-profile",
postgresql_profile={
"hostname": "hostname",
"port": 5432,
"username": "user",
"password": "pass",
"database": "postgres",
})
destination = gcp.datastream.ConnectionProfile("destination",
display_name="BigQuery Destination",
location="us-central1",
connection_profile_id="destination-profile",
bigquery_profile={})
default = gcp.datastream.Stream("default",
display_name="Postgres to BigQuery",
location="us-central1",
stream_id="my-stream",
desired_state="RUNNING",
source_config={
"source_connection_profile": source.id,
"postgresql_source_config": {
"max_concurrent_backfill_tasks": 12,
"publication": "publication",
"replication_slot": "replication_slot",
"include_objects": {
"postgresql_schemas": [{
"schema": "schema",
"postgresql_tables": [{
"table": "table",
"postgresql_columns": [{
"column": "column",
}],
}],
}],
},
"exclude_objects": {
"postgresql_schemas": [{
"schema": "schema",
"postgresql_tables": [{
"table": "table",
"postgresql_columns": [{
"column": "column",
}],
}],
}],
},
},
},
destination_config={
"destination_connection_profile": destination.id,
"bigquery_destination_config": {
"data_freshness": "900s",
"source_hierarchy_datasets": {
"dataset_template": {
"location": "us-central1",
},
},
},
},
backfill_all={
"postgresql_excluded_objects": {
"postgresql_schemas": [{
"schema": "schema",
"postgresql_tables": [{
"table": "table",
"postgresql_columns": [{
"column": "column",
}],
}],
}],
},
})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
source, err := datastream.NewConnectionProfile(ctx, "source", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("Postgresql Source"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("source-profile"),
PostgresqlProfile: &datastream.ConnectionProfilePostgresqlProfileArgs{
Hostname: pulumi.String("hostname"),
Port: pulumi.Int(5432),
Username: pulumi.String("user"),
Password: pulumi.String("pass"),
Database: pulumi.String("postgres"),
},
})
if err != nil {
return err
}
destination, err := datastream.NewConnectionProfile(ctx, "destination", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("BigQuery Destination"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("destination-profile"),
BigqueryProfile: &datastream.ConnectionProfileBigqueryProfileArgs{},
})
if err != nil {
return err
}
_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
DisplayName: pulumi.String("Postgres to BigQuery"),
Location: pulumi.String("us-central1"),
StreamId: pulumi.String("my-stream"),
DesiredState: pulumi.String("RUNNING"),
SourceConfig: &datastream.StreamSourceConfigArgs{
SourceConnectionProfile: source.ID(),
PostgresqlSourceConfig: &datastream.StreamSourceConfigPostgresqlSourceConfigArgs{
MaxConcurrentBackfillTasks: pulumi.Int(12),
Publication: pulumi.String("publication"),
ReplicationSlot: pulumi.String("replication_slot"),
IncludeObjects: &datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsArgs{
PostgresqlSchemas: datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaArray{
&datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaArgs{
Schema: pulumi.String("schema"),
PostgresqlTables: datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTableArray{
&datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTableArgs{
Table: pulumi.String("table"),
PostgresqlColumns: datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArray{
&datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs{
Column: pulumi.String("column"),
},
},
},
},
},
},
},
ExcludeObjects: &datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsArgs{
PostgresqlSchemas: datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaArray{
&datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaArgs{
Schema: pulumi.String("schema"),
PostgresqlTables: datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTableArray{
&datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTableArgs{
Table: pulumi.String("table"),
PostgresqlColumns: datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArray{
&datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs{
Column: pulumi.String("column"),
},
},
},
},
},
},
},
},
},
DestinationConfig: &datastream.StreamDestinationConfigArgs{
DestinationConnectionProfile: destination.ID(),
BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
DataFreshness: pulumi.String("900s"),
SourceHierarchyDatasets: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs{
DatasetTemplate: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs{
Location: pulumi.String("us-central1"),
},
},
},
},
BackfillAll: &datastream.StreamBackfillAllArgs{
PostgresqlExcludedObjects: &datastream.StreamBackfillAllPostgresqlExcludedObjectsArgs{
PostgresqlSchemas: datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaArray{
&datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaArgs{
Schema: pulumi.String("schema"),
PostgresqlTables: datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTableArray{
&datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTableArgs{
Table: pulumi.String("table"),
PostgresqlColumns: datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArray{
&datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs{
Column: pulumi.String("column"),
},
},
},
},
},
},
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var source = new Gcp.Datastream.ConnectionProfile("source", new()
{
DisplayName = "Postgresql Source",
Location = "us-central1",
ConnectionProfileId = "source-profile",
PostgresqlProfile = new Gcp.Datastream.Inputs.ConnectionProfilePostgresqlProfileArgs
{
Hostname = "hostname",
Port = 5432,
Username = "user",
Password = "pass",
Database = "postgres",
},
});
var destination = new Gcp.Datastream.ConnectionProfile("destination", new()
{
DisplayName = "BigQuery Destination",
Location = "us-central1",
ConnectionProfileId = "destination-profile",
BigqueryProfile = new Gcp.Datastream.Inputs.ConnectionProfileBigqueryProfileArgs(),
});
var @default = new Gcp.Datastream.Stream("default", new()
{
DisplayName = "Postgres to BigQuery",
Location = "us-central1",
StreamId = "my-stream",
DesiredState = "RUNNING",
SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
{
SourceConnectionProfile = source.Id,
PostgresqlSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigArgs
{
MaxConcurrentBackfillTasks = 12,
Publication = "publication",
ReplicationSlot = "replication_slot",
IncludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsArgs
{
PostgresqlSchemas = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaArgs
{
Schema = "schema",
PostgresqlTables = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTableArgs
{
Table = "table",
PostgresqlColumns = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs
{
Column = "column",
},
},
},
},
},
},
},
ExcludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsArgs
{
PostgresqlSchemas = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaArgs
{
Schema = "schema",
PostgresqlTables = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTableArgs
{
Table = "table",
PostgresqlColumns = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs
{
Column = "column",
},
},
},
},
},
},
},
},
},
DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
{
DestinationConnectionProfile = destination.Id,
BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
{
DataFreshness = "900s",
SourceHierarchyDatasets = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs
{
DatasetTemplate = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs
{
Location = "us-central1",
},
},
},
},
BackfillAll = new Gcp.Datastream.Inputs.StreamBackfillAllArgs
{
PostgresqlExcludedObjects = new Gcp.Datastream.Inputs.StreamBackfillAllPostgresqlExcludedObjectsArgs
{
PostgresqlSchemas = new[]
{
new Gcp.Datastream.Inputs.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaArgs
{
Schema = "schema",
PostgresqlTables = new[]
{
new Gcp.Datastream.Inputs.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTableArgs
{
Table = "table",
PostgresqlColumns = new[]
{
new Gcp.Datastream.Inputs.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs
{
Column = "column",
},
},
},
},
},
},
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfilePostgresqlProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileBigqueryProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllPostgresqlExcludedObjectsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var source = new ConnectionProfile("source", ConnectionProfileArgs.builder()
.displayName("Postgresql Source")
.location("us-central1")
.connectionProfileId("source-profile")
.postgresqlProfile(ConnectionProfilePostgresqlProfileArgs.builder()
.hostname("hostname")
.port(5432)
.username("user")
.password("pass")
.database("postgres")
.build())
.build());
var destination = new ConnectionProfile("destination", ConnectionProfileArgs.builder()
.displayName("BigQuery Destination")
.location("us-central1")
.connectionProfileId("destination-profile")
.bigqueryProfile(ConnectionProfileBigqueryProfileArgs.builder()
.build())
.build());
var default_ = new Stream("default", StreamArgs.builder()
.displayName("Postgres to BigQuery")
.location("us-central1")
.streamId("my-stream")
.desiredState("RUNNING")
.sourceConfig(StreamSourceConfigArgs.builder()
.sourceConnectionProfile(source.id())
.postgresqlSourceConfig(StreamSourceConfigPostgresqlSourceConfigArgs.builder()
.maxConcurrentBackfillTasks(12)
.publication("publication")
.replicationSlot("replication_slot")
.includeObjects(StreamSourceConfigPostgresqlSourceConfigIncludeObjectsArgs.builder()
.postgresqlSchemas(StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaArgs.builder()
.schema("schema")
.postgresqlTables(StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTableArgs.builder()
.table("table")
.postgresqlColumns(StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs.builder()
.column("column")
.build())
.build())
.build())
.build())
.excludeObjects(StreamSourceConfigPostgresqlSourceConfigExcludeObjectsArgs.builder()
.postgresqlSchemas(StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaArgs.builder()
.schema("schema")
.postgresqlTables(StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTableArgs.builder()
.table("table")
.postgresqlColumns(StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs.builder()
.column("column")
.build())
.build())
.build())
.build())
.build())
.build())
.destinationConfig(StreamDestinationConfigArgs.builder()
.destinationConnectionProfile(destination.id())
.bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
.dataFreshness("900s")
.sourceHierarchyDatasets(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs.builder()
.datasetTemplate(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs.builder()
.location("us-central1")
.build())
.build())
.build())
.build())
.backfillAll(StreamBackfillAllArgs.builder()
.postgresqlExcludedObjects(StreamBackfillAllPostgresqlExcludedObjectsArgs.builder()
.postgresqlSchemas(StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaArgs.builder()
.schema("schema")
.postgresqlTables(StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTableArgs.builder()
.table("table")
.postgresqlColumns(StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs.builder()
.column("column")
.build())
.build())
.build())
.build())
.build())
.build());
}
}
resources:
  source:
    type: gcp:datastream:ConnectionProfile
    properties:
      displayName: Postgresql Source
      location: us-central1
      connectionProfileId: source-profile
      postgresqlProfile:
        hostname: hostname
        port: 5432
        username: user
        password: pass
        database: postgres
  destination:
    type: gcp:datastream:ConnectionProfile
    properties:
      displayName: BigQuery Destination
      location: us-central1
      connectionProfileId: destination-profile
      bigqueryProfile: {}
  default:
    type: gcp:datastream:Stream
    properties:
      displayName: Postgres to BigQuery
      location: us-central1
      streamId: my-stream
      desiredState: RUNNING
      sourceConfig:
        sourceConnectionProfile: ${source.id}
        postgresqlSourceConfig:
          maxConcurrentBackfillTasks: 12
          publication: publication
          replicationSlot: replication_slot
          includeObjects:
            postgresqlSchemas:
              - schema: schema
                postgresqlTables:
                  - table: table
                    postgresqlColumns:
                      - column: column
          excludeObjects:
            postgresqlSchemas:
              - schema: schema
                postgresqlTables:
                  - table: table
                    postgresqlColumns:
                      - column: column
      destinationConfig:
        destinationConnectionProfile: ${destination.id}
        bigqueryDestinationConfig:
          dataFreshness: 900s
          sourceHierarchyDatasets:
            datasetTemplate:
              location: us-central1
      backfillAll:
        postgresqlExcludedObjects:
          postgresqlSchemas:
            - schema: schema
              postgresqlTables:
                - table: table
                  postgresqlColumns:
                    - column: column
The postgresqlSourceConfig names a publication and a replicationSlot, both of which must already exist in the source PostgreSQL database. The bigqueryDestinationConfig uses sourceHierarchyDatasets to create BigQuery datasets that mirror the source schema structure, with datasetTemplate setting the location of those datasets. The dataFreshness property sets how stale the data returned from the replicated BigQuery tables may be (900s here, or 15 minutes); lower values return fresher data but may cost more. The maxConcurrentBackfillTasks property caps parallelism during the initial data load.
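Since the publication and replication slot have to exist before the stream starts, it can help to generate the setup statements from the same names the stream uses. The following is a rough sketch, not part of the Pulumi program: the function postgres_cdc_setup_sql is hypothetical, it assumes a publication covering all tables and the pgoutput logical decoding plugin, and the statements still need to be run against the source database by a suitably privileged user.

```python
import re
from typing import List

# Conservative check so names can be interpolated into SQL safely.
_IDENTIFIER = re.compile(r"^[a-z_][a-z0-9_]*$")


def postgres_cdc_setup_sql(publication: str, replication_slot: str) -> List[str]:
    """Return SQL that creates the publication and logical replication slot.

    Hypothetical helper; assumes FOR ALL TABLES and the pgoutput plugin.
    """
    for name in (publication, replication_slot):
        if not _IDENTIFIER.match(name):
            raise ValueError(f"unsupported identifier: {name!r}")
    return [
        f"CREATE PUBLICATION {publication} FOR ALL TABLES;",
        f"SELECT pg_create_logical_replication_slot('{replication_slot}', 'pgoutput');",
    ]


# Use the same names that appear in postgresqlSourceConfig.
statements = postgres_cdc_setup_sql("publication", "replication_slot")
for statement in statements:
    print(statement)
```

Deriving both the SQL and the stream arguments from one pair of names avoids a mismatch between what the database provides and what the stream expects. A publication limited to specific tables would need a different CREATE PUBLICATION statement.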
Stream SQL Server to BigQuery using transaction logs
Transaction-log-based replication from SQL Server captures changes directly from the database's transaction log, without requiring application-level triggers or change tracking tables.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const instance = new gcp.sql.DatabaseInstance("instance", {
name: "sql-server",
databaseVersion: "SQLSERVER_2019_STANDARD",
region: "us-central1",
rootPassword: "root-password",
deletionProtection: true,
settings: {
tier: "db-custom-2-4096",
ipConfiguration: {
authorizedNetworks: [
{
value: "34.71.242.81",
},
{
value: "34.72.28.29",
},
{
value: "34.67.6.157",
},
{
value: "34.67.234.134",
},
{
value: "34.72.239.218",
},
],
},
},
});
const user = new gcp.sql.User("user", {
name: "user",
instance: instance.name,
password: "password",
});
const db = new gcp.sql.Database("db", {
name: "db",
instance: instance.name,
}, {
dependsOn: [user],
});
const source = new gcp.datastream.ConnectionProfile("source", {
displayName: "SQL Server Source",
location: "us-central1",
connectionProfileId: "source-profile",
sqlServerProfile: {
hostname: instance.publicIpAddress,
port: 1433,
username: user.name,
password: user.password,
database: db.name,
},
});
const destination = new gcp.datastream.ConnectionProfile("destination", {
displayName: "BigQuery Destination",
location: "us-central1",
connectionProfileId: "destination-profile",
bigqueryProfile: {},
});
const _default = new gcp.datastream.Stream("default", {
displayName: "SQL Server to BigQuery",
location: "us-central1",
streamId: "stream",
sourceConfig: {
sourceConnectionProfile: source.id,
sqlServerSourceConfig: {
includeObjects: {
schemas: [{
schema: "schema",
tables: [{
table: "table",
}],
}],
},
transactionLogs: {},
},
},
destinationConfig: {
destinationConnectionProfile: destination.id,
bigqueryDestinationConfig: {
dataFreshness: "900s",
sourceHierarchyDatasets: {
datasetTemplate: {
location: "us-central1",
},
},
},
},
backfillNone: {},
});
import pulumi
import pulumi_gcp as gcp
instance = gcp.sql.DatabaseInstance("instance",
name="sql-server",
database_version="SQLSERVER_2019_STANDARD",
region="us-central1",
root_password="root-password",
deletion_protection=True,
settings={
"tier": "db-custom-2-4096",
"ip_configuration": {
"authorized_networks": [
{
"value": "34.71.242.81",
},
{
"value": "34.72.28.29",
},
{
"value": "34.67.6.157",
},
{
"value": "34.67.234.134",
},
{
"value": "34.72.239.218",
},
],
},
})
user = gcp.sql.User("user",
name="user",
instance=instance.name,
password="password")
db = gcp.sql.Database("db",
name="db",
instance=instance.name,
opts = pulumi.ResourceOptions(depends_on=[user]))
source = gcp.datastream.ConnectionProfile("source",
display_name="SQL Server Source",
location="us-central1",
connection_profile_id="source-profile",
sql_server_profile={
"hostname": instance.public_ip_address,
"port": 1433,
"username": user.name,
"password": user.password,
"database": db.name,
})
destination = gcp.datastream.ConnectionProfile("destination",
display_name="BigQuery Destination",
location="us-central1",
connection_profile_id="destination-profile",
bigquery_profile={})
default = gcp.datastream.Stream("default",
display_name="SQL Server to BigQuery",
location="us-central1",
stream_id="stream",
source_config={
"source_connection_profile": source.id,
"sql_server_source_config": {
"include_objects": {
"schemas": [{
"schema": "schema",
"tables": [{
"table": "table",
}],
}],
},
"transaction_logs": {},
},
},
destination_config={
"destination_connection_profile": destination.id,
"bigquery_destination_config": {
"data_freshness": "900s",
"source_hierarchy_datasets": {
"dataset_template": {
"location": "us-central1",
},
},
},
},
backfill_none={})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
Name: pulumi.String("sql-server"),
DatabaseVersion: pulumi.String("SQLSERVER_2019_STANDARD"),
Region: pulumi.String("us-central1"),
RootPassword: pulumi.String("root-password"),
DeletionProtection: pulumi.Bool(true),
Settings: &sql.DatabaseInstanceSettingsArgs{
Tier: pulumi.String("db-custom-2-4096"),
IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.71.242.81"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.72.28.29"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.67.6.157"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.67.234.134"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.72.239.218"),
},
},
},
},
})
if err != nil {
return err
}
user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
Name: pulumi.String("user"),
Instance: instance.Name,
Password: pulumi.String("password"),
})
if err != nil {
return err
}
db, err := sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
Name: pulumi.String("db"),
Instance: instance.Name,
}, pulumi.DependsOn([]pulumi.Resource{
user,
}))
if err != nil {
return err
}
source, err := datastream.NewConnectionProfile(ctx, "source", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("SQL Server Source"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("source-profile"),
SqlServerProfile: &datastream.ConnectionProfileSqlServerProfileArgs{
Hostname: instance.PublicIpAddress,
Port: pulumi.Int(1433),
Username: user.Name,
Password: user.Password,
Database: db.Name,
},
})
if err != nil {
return err
}
destination, err := datastream.NewConnectionProfile(ctx, "destination", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("BigQuery Destination"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("destination-profile"),
BigqueryProfile: &datastream.ConnectionProfileBigqueryProfileArgs{},
})
if err != nil {
return err
}
_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
DisplayName: pulumi.String("SQL Server to BigQuery"),
Location: pulumi.String("us-central1"),
StreamId: pulumi.String("stream"),
SourceConfig: &datastream.StreamSourceConfigArgs{
SourceConnectionProfile: source.ID(),
SqlServerSourceConfig: &datastream.StreamSourceConfigSqlServerSourceConfigArgs{
IncludeObjects: &datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs{
Schemas: datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArray{
&datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs{
Schema: pulumi.String("schema"),
Tables: datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArray{
&datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs{
Table: pulumi.String("table"),
},
},
},
},
},
TransactionLogs: &datastream.StreamSourceConfigSqlServerSourceConfigTransactionLogsArgs{},
},
},
DestinationConfig: &datastream.StreamDestinationConfigArgs{
DestinationConnectionProfile: destination.ID(),
BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
DataFreshness: pulumi.String("900s"),
SourceHierarchyDatasets: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs{
DatasetTemplate: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs{
Location: pulumi.String("us-central1"),
},
},
},
},
BackfillNone: &datastream.StreamBackfillNoneArgs{},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var instance = new Gcp.Sql.DatabaseInstance("instance", new()
{
Name = "sql-server",
DatabaseVersion = "SQLSERVER_2019_STANDARD",
Region = "us-central1",
RootPassword = "root-password",
DeletionProtection = true,
Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
{
Tier = "db-custom-2-4096",
IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
{
AuthorizedNetworks = new[]
{
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.71.242.81",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.72.28.29",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.67.6.157",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.67.234.134",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.72.239.218",
},
},
},
},
});
var user = new Gcp.Sql.User("user", new()
{
Name = "user",
Instance = instance.Name,
Password = "password",
});
var db = new Gcp.Sql.Database("db", new()
{
Name = "db",
Instance = instance.Name,
}, new CustomResourceOptions
{
DependsOn =
{
user,
},
});
var source = new Gcp.Datastream.ConnectionProfile("source", new()
{
DisplayName = "SQL Server Source",
Location = "us-central1",
ConnectionProfileId = "source-profile",
SqlServerProfile = new Gcp.Datastream.Inputs.ConnectionProfileSqlServerProfileArgs
{
Hostname = instance.PublicIpAddress,
Port = 1433,
Username = user.Name,
Password = user.Password,
Database = db.Name,
},
});
var destination = new Gcp.Datastream.ConnectionProfile("destination", new()
{
DisplayName = "BigQuery Destination",
Location = "us-central1",
ConnectionProfileId = "destination-profile",
BigqueryProfile = new Gcp.Datastream.Inputs.ConnectionProfileBigqueryProfileArgs(),
});
var @default = new Gcp.Datastream.Stream("default", new()
{
DisplayName = "SQL Server to BigQuery",
Location = "us-central1",
StreamId = "stream",
SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
{
SourceConnectionProfile = source.Id,
SqlServerSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigArgs
{
IncludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs
{
Schemas = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs
{
Schema = "schema",
Tables = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs
{
Table = "table",
},
},
},
},
},
TransactionLogs = new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigTransactionLogsArgs(),
},
},
DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
{
DestinationConnectionProfile = destination.Id,
BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
{
DataFreshness = "900s",
SourceHierarchyDatasets = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs
{
DatasetTemplate = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs
{
Location = "us-central1",
},
},
},
},
BackfillNone = new Gcp.Datastream.Inputs.StreamBackfillNoneArgs(),
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileSqlServerProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileBigqueryProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigArgs;
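import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs;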
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigTransactionLogsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillNoneArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
.name("sql-server")
.databaseVersion("SQLSERVER_2019_STANDARD")
.region("us-central1")
.rootPassword("root-password")
.deletionProtection(true)
.settings(DatabaseInstanceSettingsArgs.builder()
.tier("db-custom-2-4096")
.ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
.authorizedNetworks(
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.71.242.81")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.72.28.29")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.67.6.157")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.67.234.134")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.72.239.218")
.build())
.build())
.build())
.build());
var user = new User("user", UserArgs.builder()
.name("user")
.instance(instance.name())
.password("password")
.build());
var db = new Database("db", DatabaseArgs.builder()
.name("db")
.instance(instance.name())
.build(), CustomResourceOptions.builder()
.dependsOn(user)
.build());
var source = new ConnectionProfile("source", ConnectionProfileArgs.builder()
.displayName("SQL Server Source")
.location("us-central1")
.connectionProfileId("source-profile")
.sqlServerProfile(ConnectionProfileSqlServerProfileArgs.builder()
.hostname(instance.publicIpAddress())
.port(1433)
.username(user.name())
.password(user.password())
.database(db.name())
.build())
.build());
var destination = new ConnectionProfile("destination", ConnectionProfileArgs.builder()
.displayName("BigQuery Destination")
.location("us-central1")
.connectionProfileId("destination-profile")
.bigqueryProfile(ConnectionProfileBigqueryProfileArgs.builder()
.build())
.build());
var default_ = new Stream("default", StreamArgs.builder()
.displayName("SQL Server to BigQuery")
.location("us-central1")
.streamId("stream")
.sourceConfig(StreamSourceConfigArgs.builder()
.sourceConnectionProfile(source.id())
.sqlServerSourceConfig(StreamSourceConfigSqlServerSourceConfigArgs.builder()
.includeObjects(StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs.builder()
.schemas(StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs.builder()
.schema("schema")
.tables(StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs.builder()
.table("table")
.build())
.build())
.build())
.transactionLogs(StreamSourceConfigSqlServerSourceConfigTransactionLogsArgs.builder()
.build())
.build())
.build())
.destinationConfig(StreamDestinationConfigArgs.builder()
.destinationConnectionProfile(destination.id())
.bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
.dataFreshness("900s")
.sourceHierarchyDatasets(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs.builder()
.datasetTemplate(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs.builder()
.location("us-central1")
.build())
.build())
.build())
.build())
.backfillNone(StreamBackfillNoneArgs.builder()
.build())
.build());
}
}
resources:
  instance:
    type: gcp:sql:DatabaseInstance
    properties:
      name: sql-server
      databaseVersion: SQLSERVER_2019_STANDARD
      region: us-central1
      rootPassword: root-password
      deletionProtection: true
      settings:
        tier: db-custom-2-4096
        ipConfiguration:
          authorizedNetworks:
            - value: 34.71.242.81
            - value: 34.72.28.29
            - value: 34.67.6.157
            - value: 34.67.234.134
            - value: 34.72.239.218
  db:
    type: gcp:sql:Database
    properties:
      name: db
      instance: ${instance.name}
    options:
      dependsOn:
        - ${user}
  user:
    type: gcp:sql:User
    properties:
      name: user
      instance: ${instance.name}
      password: password
  source:
    type: gcp:datastream:ConnectionProfile
    properties:
      displayName: SQL Server Source
      location: us-central1
      connectionProfileId: source-profile
      sqlServerProfile:
        hostname: ${instance.publicIpAddress}
        port: 1433
        username: ${user.name}
        password: ${user.password}
        database: ${db.name}
  destination:
    type: gcp:datastream:ConnectionProfile
    properties:
      displayName: BigQuery Destination
      location: us-central1
      connectionProfileId: destination-profile
      bigqueryProfile: {}
  default:
    type: gcp:datastream:Stream
    properties:
      displayName: SQL Server to BigQuery
      location: us-central1
      streamId: stream
      sourceConfig:
        sourceConnectionProfile: ${source.id}
        sqlServerSourceConfig:
          includeObjects:
            schemas:
              - schema: schema
                tables:
                  - table: table
          transactionLogs: {}
      destinationConfig:
        destinationConnectionProfile: ${destination.id}
        bigqueryDestinationConfig:
          dataFreshness: 900s
          sourceHierarchyDatasets:
            datasetTemplate:
              location: us-central1
      backfillNone: {}
Setting transactionLogs inside sqlServerSourceConfig makes Datastream read changes directly from SQL Server’s transaction logs. The backfillNone property skips the initial data load, so replication starts from the current point in time and existing rows are not copied. This approach requires transaction log backups to be enabled on the SQL Server instance.
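As a rough illustration of the backfill choice described above, the following Python sketch builds the keyword argument a stream would receive. The helper name backfill_kwargs is hypothetical and not part of the Pulumi SDK; backfill_none and backfill_all are the Python spellings of the backfillNone and backfillAll properties that appear in this guide's examples.

```python
from typing import Any, Dict


def backfill_kwargs(skip_initial_load: bool) -> Dict[str, Any]:
    """Return the backfill argument for a stream (hypothetical helper).

    Each example in this guide sets exactly one backfill strategy, so the
    helper returns a single-key dict that can be unpacked into the
    gcp.datastream.Stream(...) call.
    """
    if skip_initial_load:
        # CDC only: replicate changes from now on, do not copy existing rows.
        return {"backfill_none": {}}
    # Copy existing rows first, then continue with CDC.
    return {"backfill_all": {}}


if __name__ == "__main__":
    print(backfill_kwargs(skip_initial_load=True))
```

In a Pulumi program, the result could be unpacked into the resource call, for example gcp.datastream.Stream("default", ..., **backfill_kwargs(True)).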
Stream SQL Server using change tables
Some SQL Server deployments read changes from CDC change tables instead of transaction logs, which gives Datastream an alternative CDC mechanism.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const instance = new gcp.sql.DatabaseInstance("instance", {
name: "sql-server",
databaseVersion: "SQLSERVER_2019_STANDARD",
region: "us-central1",
rootPassword: "root-password",
deletionProtection: true,
settings: {
tier: "db-custom-2-4096",
ipConfiguration: {
authorizedNetworks: [
{
value: "34.71.242.81",
},
{
value: "34.72.28.29",
},
{
value: "34.67.6.157",
},
{
value: "34.67.234.134",
},
{
value: "34.72.239.218",
},
],
},
},
});
const user = new gcp.sql.User("user", {
name: "user",
instance: instance.name,
password: "password",
});
const db = new gcp.sql.Database("db", {
name: "db",
instance: instance.name,
}, {
dependsOn: [user],
});
const source = new gcp.datastream.ConnectionProfile("source", {
displayName: "SQL Server Source",
location: "us-central1",
connectionProfileId: "source-profile",
sqlServerProfile: {
hostname: instance.publicIpAddress,
port: 1433,
username: user.name,
password: user.password,
database: db.name,
},
});
const destination = new gcp.datastream.ConnectionProfile("destination", {
displayName: "BigQuery Destination",
location: "us-central1",
connectionProfileId: "destination-profile",
bigqueryProfile: {},
});
const _default = new gcp.datastream.Stream("default", {
displayName: "SQL Server to BigQuery",
location: "us-central1",
streamId: "stream",
sourceConfig: {
sourceConnectionProfile: source.id,
sqlServerSourceConfig: {
includeObjects: {
schemas: [{
schema: "schema",
tables: [{
table: "table",
}],
}],
},
changeTables: {},
},
},
destinationConfig: {
destinationConnectionProfile: destination.id,
bigqueryDestinationConfig: {
dataFreshness: "900s",
sourceHierarchyDatasets: {
datasetTemplate: {
location: "us-central1",
},
},
},
},
backfillNone: {},
});
import pulumi
import pulumi_gcp as gcp
instance = gcp.sql.DatabaseInstance("instance",
name="sql-server",
database_version="SQLSERVER_2019_STANDARD",
region="us-central1",
root_password="root-password",
deletion_protection=True,
settings={
"tier": "db-custom-2-4096",
"ip_configuration": {
"authorized_networks": [
{
"value": "34.71.242.81",
},
{
"value": "34.72.28.29",
},
{
"value": "34.67.6.157",
},
{
"value": "34.67.234.134",
},
{
"value": "34.72.239.218",
},
],
},
})
user = gcp.sql.User("user",
name="user",
instance=instance.name,
password="password")
db = gcp.sql.Database("db",
name="db",
instance=instance.name,
opts = pulumi.ResourceOptions(depends_on=[user]))
source = gcp.datastream.ConnectionProfile("source",
display_name="SQL Server Source",
location="us-central1",
connection_profile_id="source-profile",
sql_server_profile={
"hostname": instance.public_ip_address,
"port": 1433,
"username": user.name,
"password": user.password,
"database": db.name,
})
destination = gcp.datastream.ConnectionProfile("destination",
display_name="BigQuery Destination",
location="us-central1",
connection_profile_id="destination-profile",
bigquery_profile={})
default = gcp.datastream.Stream("default",
display_name="SQL Server to BigQuery",
location="us-central1",
stream_id="stream",
source_config={
"source_connection_profile": source.id,
"sql_server_source_config": {
"include_objects": {
"schemas": [{
"schema": "schema",
"tables": [{
"table": "table",
}],
}],
},
"change_tables": {},
},
},
destination_config={
"destination_connection_profile": destination.id,
"bigquery_destination_config": {
"data_freshness": "900s",
"source_hierarchy_datasets": {
"dataset_template": {
"location": "us-central1",
},
},
},
},
backfill_none={})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
Name: pulumi.String("sql-server"),
DatabaseVersion: pulumi.String("SQLSERVER_2019_STANDARD"),
Region: pulumi.String("us-central1"),
RootPassword: pulumi.String("root-password"),
DeletionProtection: pulumi.Bool(true),
Settings: &sql.DatabaseInstanceSettingsArgs{
Tier: pulumi.String("db-custom-2-4096"),
IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.71.242.81"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.72.28.29"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.67.6.157"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.67.234.134"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.72.239.218"),
},
},
},
},
})
if err != nil {
return err
}
user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
Name: pulumi.String("user"),
Instance: instance.Name,
Password: pulumi.String("password"),
})
if err != nil {
return err
}
db, err := sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
Name: pulumi.String("db"),
Instance: instance.Name,
}, pulumi.DependsOn([]pulumi.Resource{
user,
}))
if err != nil {
return err
}
source, err := datastream.NewConnectionProfile(ctx, "source", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("SQL Server Source"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("source-profile"),
SqlServerProfile: &datastream.ConnectionProfileSqlServerProfileArgs{
Hostname: instance.PublicIpAddress,
Port: pulumi.Int(1433),
Username: user.Name,
Password: user.Password,
Database: db.Name,
},
})
if err != nil {
return err
}
destination, err := datastream.NewConnectionProfile(ctx, "destination", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("BigQuery Destination"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("destination-profile"),
BigqueryProfile: &datastream.ConnectionProfileBigqueryProfileArgs{},
})
if err != nil {
return err
}
_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
DisplayName: pulumi.String("SQL Server to BigQuery"),
Location: pulumi.String("us-central1"),
StreamId: pulumi.String("stream"),
SourceConfig: &datastream.StreamSourceConfigArgs{
SourceConnectionProfile: source.ID(),
SqlServerSourceConfig: &datastream.StreamSourceConfigSqlServerSourceConfigArgs{
IncludeObjects: &datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs{
Schemas: datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArray{
&datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs{
Schema: pulumi.String("schema"),
Tables: datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArray{
&datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs{
Table: pulumi.String("table"),
},
},
},
},
},
ChangeTables: &datastream.StreamSourceConfigSqlServerSourceConfigChangeTablesArgs{},
},
},
DestinationConfig: &datastream.StreamDestinationConfigArgs{
DestinationConnectionProfile: destination.ID(),
BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
DataFreshness: pulumi.String("900s"),
SourceHierarchyDatasets: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs{
DatasetTemplate: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs{
Location: pulumi.String("us-central1"),
},
},
},
},
BackfillNone: &datastream.StreamBackfillNoneArgs{},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var instance = new Gcp.Sql.DatabaseInstance("instance", new()
{
Name = "sql-server",
DatabaseVersion = "SQLSERVER_2019_STANDARD",
Region = "us-central1",
RootPassword = "root-password",
DeletionProtection = true,
Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
{
Tier = "db-custom-2-4096",
IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
{
AuthorizedNetworks = new[]
{
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.71.242.81",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.72.28.29",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.67.6.157",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.67.234.134",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.72.239.218",
},
},
},
},
});
var user = new Gcp.Sql.User("user", new()
{
Name = "user",
Instance = instance.Name,
Password = "password",
});
var db = new Gcp.Sql.Database("db", new()
{
Name = "db",
Instance = instance.Name,
}, new CustomResourceOptions
{
DependsOn =
{
user,
},
});
var source = new Gcp.Datastream.ConnectionProfile("source", new()
{
DisplayName = "SQL Server Source",
Location = "us-central1",
ConnectionProfileId = "source-profile",
SqlServerProfile = new Gcp.Datastream.Inputs.ConnectionProfileSqlServerProfileArgs
{
Hostname = instance.PublicIpAddress,
Port = 1433,
Username = user.Name,
Password = user.Password,
Database = db.Name,
},
});
var destination = new Gcp.Datastream.ConnectionProfile("destination", new()
{
DisplayName = "BigQuery Destination",
Location = "us-central1",
ConnectionProfileId = "destination-profile",
BigqueryProfile = new Gcp.Datastream.Inputs.ConnectionProfileBigqueryProfileArgs(),
});
var @default = new Gcp.Datastream.Stream("default", new()
{
DisplayName = "SQL Server to BigQuery",
Location = "us-central1",
StreamId = "stream",
SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
{
SourceConnectionProfile = source.Id,
SqlServerSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigArgs
{
IncludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs
{
Schemas = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs
{
Schema = "schema",
Tables = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs
{
Table = "table",
},
},
},
},
},
ChangeTables = new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigChangeTablesArgs(),
},
},
DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
{
DestinationConnectionProfile = destination.Id,
BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
{
DataFreshness = "900s",
SourceHierarchyDatasets = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs
{
DatasetTemplate = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs
{
Location = "us-central1",
},
},
},
},
BackfillNone = new Gcp.Datastream.Inputs.StreamBackfillNoneArgs(),
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileSqlServerProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileBigqueryProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigArgs;
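import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs;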
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigChangeTablesArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillNoneArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
.name("sql-server")
.databaseVersion("SQLSERVER_2019_STANDARD")
.region("us-central1")
.rootPassword("root-password")
.deletionProtection(true)
.settings(DatabaseInstanceSettingsArgs.builder()
.tier("db-custom-2-4096")
.ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
.authorizedNetworks(
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.71.242.81")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.72.28.29")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.67.6.157")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.67.234.134")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.72.239.218")
.build())
.build())
.build())
.build());
var user = new User("user", UserArgs.builder()
.name("user")
.instance(instance.name())
.password("password")
.build());
var db = new Database("db", DatabaseArgs.builder()
.name("db")
.instance(instance.name())
.build(), CustomResourceOptions.builder()
.dependsOn(user)
.build());
var source = new ConnectionProfile("source", ConnectionProfileArgs.builder()
.displayName("SQL Server Source")
.location("us-central1")
.connectionProfileId("source-profile")
.sqlServerProfile(ConnectionProfileSqlServerProfileArgs.builder()
.hostname(instance.publicIpAddress())
.port(1433)
.username(user.name())
.password(user.password())
.database(db.name())
.build())
.build());
var destination = new ConnectionProfile("destination", ConnectionProfileArgs.builder()
.displayName("BigQuery Destination")
.location("us-central1")
.connectionProfileId("destination-profile")
.bigqueryProfile(ConnectionProfileBigqueryProfileArgs.builder()
.build())
.build());
var default_ = new Stream("default", StreamArgs.builder()
.displayName("SQL Server to BigQuery")
.location("us-central1")
.streamId("stream")
.sourceConfig(StreamSourceConfigArgs.builder()
.sourceConnectionProfile(source.id())
.sqlServerSourceConfig(StreamSourceConfigSqlServerSourceConfigArgs.builder()
.includeObjects(StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs.builder()
.schemas(StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs.builder()
.schema("schema")
.tables(StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs.builder()
.table("table")
.build())
.build())
.build())
.changeTables(StreamSourceConfigSqlServerSourceConfigChangeTablesArgs.builder()
.build())
.build())
.build())
.destinationConfig(StreamDestinationConfigArgs.builder()
.destinationConnectionProfile(destination.id())
.bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
.dataFreshness("900s")
.sourceHierarchyDatasets(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs.builder()
.datasetTemplate(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs.builder()
.location("us-central1")
.build())
.build())
.build())
.build())
.backfillNone(StreamBackfillNoneArgs.builder()
.build())
.build());
}
}
resources:
  instance:
    type: gcp:sql:DatabaseInstance
    properties:
      name: sql-server
      databaseVersion: SQLSERVER_2019_STANDARD
      region: us-central1
      rootPassword: root-password
      deletionProtection: true
      settings:
        tier: db-custom-2-4096
        ipConfiguration:
          authorizedNetworks:
            - value: 34.71.242.81
            - value: 34.72.28.29
            - value: 34.67.6.157
            - value: 34.67.234.134
            - value: 34.72.239.218
  db:
    type: gcp:sql:Database
    properties:
      name: db
      instance: ${instance.name}
    options:
      dependsOn:
        - ${user}
  user:
    type: gcp:sql:User
    properties:
      name: user
      instance: ${instance.name}
      password: password
  source:
    type: gcp:datastream:ConnectionProfile
    properties:
      displayName: SQL Server Source
      location: us-central1
      connectionProfileId: source-profile
      sqlServerProfile:
        hostname: ${instance.publicIpAddress}
        port: 1433
        username: ${user.name}
        password: ${user.password}
        database: ${db.name}
  destination:
    type: gcp:datastream:ConnectionProfile
    properties:
      displayName: BigQuery Destination
      location: us-central1
      connectionProfileId: destination-profile
      bigqueryProfile: {}
  default:
    type: gcp:datastream:Stream
    properties:
      displayName: SQL Server to BigQuery
      location: us-central1
      streamId: stream
      sourceConfig:
        sourceConnectionProfile: ${source.id}
        sqlServerSourceConfig:
          includeObjects:
            schemas:
              - schema: schema
                tables:
                  - table: table
          changeTables: {}
      destinationConfig:
        destinationConnectionProfile: ${destination.id}
        bigqueryDestinationConfig:
          dataFreshness: 900s
          sourceHierarchyDatasets:
            datasetTemplate:
              location: us-central1
      backfillNone: {}
The changeTables property makes Datastream read changes from SQL Server’s CDC change tables rather than from the transaction logs. This approach has different performance characteristics and requires CDC to be enabled on the source database and on each replicated table. It’s an alternative to the transaction log approach shown in the previous example.
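The two SQL Server examples differ only in which CDC key is set inside the source config. A minimal Python sketch of that choice follows; the helper sql_server_source_config and the CDC_METHODS tuple are hypothetical names introduced for illustration, while the dictionary keys mirror the Python examples in this guide.

```python
from typing import Any, Dict, List

# The two CDC read methods shown in this guide (Python property spellings).
CDC_METHODS = ("transaction_logs", "change_tables")


def sql_server_source_config(
    cdc_method: str, schema: str, tables: List[str]
) -> Dict[str, Any]:
    """Build a sql_server_source_config dict with exactly one CDC method.

    Hypothetical helper: it only assembles a plain dict in the shape the
    Python examples pass to gcp.datastream.Stream.
    """
    if cdc_method not in CDC_METHODS:
        raise ValueError(
            f"cdc_method must be one of {CDC_METHODS}, got {cdc_method!r}"
        )
    return {
        "include_objects": {
            "schemas": [{
                "schema": schema,
                "tables": [{"table": name} for name in tables],
            }],
        },
        # An empty dict selects the method with its default settings.
        cdc_method: {},
    }


if __name__ == "__main__":
    print(sql_server_source_config("change_tables", "schema", ["table"]))
```

Because the method name is used as the dictionary key, a single call can never set both transaction_logs and change_tables at once.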
Stream to a specific BigQuery dataset
When you need precise control over where data lands in BigQuery, you can target a specific dataset rather than letting Datastream create one automatically.
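Before the full program, the difference between the two BigQuery routing modes used in this guide can be sketched in a few lines of Python. The helper bigquery_destination_config is hypothetical and only assembles a plain dict; the keys single_target_dataset and source_hierarchy_datasets are the Python spellings of the properties shown in the examples.

```python
from typing import Any, Dict, Optional


def bigquery_destination_config(
    data_freshness: str = "900s",
    dataset_id: Optional[Any] = None,
    location: Optional[str] = None,
) -> Dict[str, Any]:
    """Build a bigquery_destination_config dict (hypothetical helper).

    Pass dataset_id to write into one existing dataset, or location to let
    Datastream create datasets from a template. Exactly one is required.
    """
    if (dataset_id is None) == (location is None):
        raise ValueError("pass exactly one of dataset_id or location")
    if dataset_id is not None:
        # All replicated tables land in a single, pre-created dataset.
        routing = {"single_target_dataset": {"dataset_id": dataset_id}}
    else:
        # Datastream creates datasets based on the source hierarchy.
        routing = {
            "source_hierarchy_datasets": {
                "dataset_template": {"location": location},
            },
        }
    return {"data_freshness": data_freshness, **routing}


if __name__ == "__main__":
    print(bigquery_destination_config(dataset_id="my-project:postgres"))
```

The dataset identifier in the usage line is a placeholder; in a Pulumi program it would typically come from a gcp.bigquery.Dataset resource. The complete single-dataset program follows.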
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
import * as random from "@pulumi/random";
const postgres = new gcp.bigquery.Dataset("postgres", {
datasetId: "postgres",
friendlyName: "postgres",
description: "Database of postgres",
location: "us-central1",
});
const destinationConnectionProfile2 = new gcp.datastream.ConnectionProfile("destination_connection_profile2", {
displayName: "Connection profile",
location: "us-central1",
connectionProfileId: "dest-profile",
bigqueryProfile: {},
});
const instance = new gcp.sql.DatabaseInstance("instance", {
name: "instance-name",
databaseVersion: "MYSQL_8_0",
region: "us-central1",
settings: {
tier: "db-f1-micro",
backupConfiguration: {
enabled: true,
binaryLogEnabled: true,
},
ipConfiguration: {
authorizedNetworks: [
{
value: "34.71.242.81",
},
{
value: "34.72.28.29",
},
{
value: "34.67.6.157",
},
{
value: "34.67.234.134",
},
{
value: "34.72.239.218",
},
],
},
},
deletionProtection: false,
});
const pwd = new random.RandomPassword("pwd", {
length: 16,
special: false,
});
const user = new gcp.sql.User("user", {
name: "my-user",
instance: instance.name,
host: "%",
password: pwd.result,
});
const sourceConnectionProfile = new gcp.datastream.ConnectionProfile("source_connection_profile", {
displayName: "Source connection profile",
location: "us-central1",
connectionProfileId: "source-profile",
mysqlProfile: {
hostname: instance.publicIpAddress,
username: user.name,
password: user.password,
},
});
const _default = new gcp.datastream.Stream("default", {
displayName: "postgres to bigQuery",
location: "us-central1",
streamId: "postgres-bigquery",
sourceConfig: {
sourceConnectionProfile: sourceConnectionProfile.id,
mysqlSourceConfig: {},
},
destinationConfig: {
destinationConnectionProfile: destinationConnectionProfile2.id,
bigqueryDestinationConfig: {
dataFreshness: "900s",
singleTargetDataset: {
datasetId: postgres.id,
},
},
},
backfillAll: {},
});
const db = new gcp.sql.Database("db", {
instance: instance.name,
name: "db",
});
import pulumi
import pulumi_gcp as gcp
import pulumi_random as random
postgres = gcp.bigquery.Dataset("postgres",
dataset_id="postgres",
friendly_name="postgres",
description="Database of postgres",
location="us-central1")
destination_connection_profile2 = gcp.datastream.ConnectionProfile("destination_connection_profile2",
display_name="Connection profile",
location="us-central1",
connection_profile_id="dest-profile",
bigquery_profile={})
instance = gcp.sql.DatabaseInstance("instance",
name="instance-name",
database_version="MYSQL_8_0",
region="us-central1",
settings={
"tier": "db-f1-micro",
"backup_configuration": {
"enabled": True,
"binary_log_enabled": True,
},
"ip_configuration": {
"authorized_networks": [
{
"value": "34.71.242.81",
},
{
"value": "34.72.28.29",
},
{
"value": "34.67.6.157",
},
{
"value": "34.67.234.134",
},
{
"value": "34.72.239.218",
},
],
},
},
deletion_protection=False)
pwd = random.RandomPassword("pwd",
length=16,
special=False)
user = gcp.sql.User("user",
name="my-user",
instance=instance.name,
host="%",
password=pwd.result)
source_connection_profile = gcp.datastream.ConnectionProfile("source_connection_profile",
display_name="Source connection profile",
location="us-central1",
connection_profile_id="source-profile",
mysql_profile={
"hostname": instance.public_ip_address,
"username": user.name,
"password": user.password,
})
default = gcp.datastream.Stream("default",
display_name="postgres to bigQuery",
location="us-central1",
stream_id="postgres-bigquery",
source_config={
"source_connection_profile": source_connection_profile.id,
"mysql_source_config": {},
},
destination_config={
"destination_connection_profile": destination_connection_profile2.id,
"bigquery_destination_config": {
"data_freshness": "900s",
"single_target_dataset": {
"dataset_id": postgres.id,
},
},
},
backfill_all={})
db = gcp.sql.Database("db",
instance=instance.name,
name="db")
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
"github.com/pulumi/pulumi-random/sdk/v4/go/random"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
postgres, err := bigquery.NewDataset(ctx, "postgres", &bigquery.DatasetArgs{
DatasetId: pulumi.String("postgres"),
FriendlyName: pulumi.String("postgres"),
Description: pulumi.String("Database of postgres"),
Location: pulumi.String("us-central1"),
})
if err != nil {
return err
}
destinationConnectionProfile2, err := datastream.NewConnectionProfile(ctx, "destination_connection_profile2", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("Connection profile"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("dest-profile"),
BigqueryProfile: &datastream.ConnectionProfileBigqueryProfileArgs{},
})
if err != nil {
return err
}
instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
Name: pulumi.String("instance-name"),
DatabaseVersion: pulumi.String("MYSQL_8_0"),
Region: pulumi.String("us-central1"),
Settings: &sql.DatabaseInstanceSettingsArgs{
Tier: pulumi.String("db-f1-micro"),
BackupConfiguration: &sql.DatabaseInstanceSettingsBackupConfigurationArgs{
Enabled: pulumi.Bool(true),
BinaryLogEnabled: pulumi.Bool(true),
},
IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.71.242.81"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.72.28.29"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.67.6.157"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.67.234.134"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.72.239.218"),
},
},
},
},
DeletionProtection: pulumi.Bool(false),
})
if err != nil {
return err
}
pwd, err := random.NewRandomPassword(ctx, "pwd", &random.RandomPasswordArgs{
Length: pulumi.Int(16),
Special: pulumi.Bool(false),
})
if err != nil {
return err
}
user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
Name: pulumi.String("my-user"),
Instance: instance.Name,
Host: pulumi.String("%"),
Password: pwd.Result,
})
if err != nil {
return err
}
sourceConnectionProfile, err := datastream.NewConnectionProfile(ctx, "source_connection_profile", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("Source connection profile"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("source-profile"),
MysqlProfile: &datastream.ConnectionProfileMysqlProfileArgs{
Hostname: instance.PublicIpAddress,
Username: user.Name,
Password: user.Password,
},
})
if err != nil {
return err
}
_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
DisplayName: pulumi.String("postgres to bigQuery"),
Location: pulumi.String("us-central1"),
StreamId: pulumi.String("postgres-bigquery"),
SourceConfig: &datastream.StreamSourceConfigArgs{
SourceConnectionProfile: sourceConnectionProfile.ID(),
MysqlSourceConfig: &datastream.StreamSourceConfigMysqlSourceConfigArgs{},
},
DestinationConfig: &datastream.StreamDestinationConfigArgs{
DestinationConnectionProfile: destinationConnectionProfile2.ID(),
BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
DataFreshness: pulumi.String("900s"),
SingleTargetDataset: &datastream.StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs{
DatasetId: postgres.ID(),
},
},
},
BackfillAll: &datastream.StreamBackfillAllArgs{},
})
if err != nil {
return err
}
_, err = sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
Instance: instance.Name,
Name: pulumi.String("db"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
using Random = Pulumi.Random;
return await Deployment.RunAsync(() =>
{
var postgres = new Gcp.BigQuery.Dataset("postgres", new()
{
DatasetId = "postgres",
FriendlyName = "postgres",
Description = "Database of postgres",
Location = "us-central1",
});
var destinationConnectionProfile2 = new Gcp.Datastream.ConnectionProfile("destination_connection_profile2", new()
{
DisplayName = "Connection profile",
Location = "us-central1",
ConnectionProfileId = "dest-profile",
BigqueryProfile = new Gcp.Datastream.Inputs.ConnectionProfileBigqueryProfileArgs(),
});
var instance = new Gcp.Sql.DatabaseInstance("instance", new()
{
Name = "instance-name",
DatabaseVersion = "MYSQL_8_0",
Region = "us-central1",
Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
{
Tier = "db-f1-micro",
BackupConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsBackupConfigurationArgs
{
Enabled = true,
BinaryLogEnabled = true,
},
IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
{
AuthorizedNetworks = new[]
{
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.71.242.81",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.72.28.29",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.67.6.157",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.67.234.134",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.72.239.218",
},
},
},
},
DeletionProtection = false,
});
var pwd = new Random.RandomPassword("pwd", new()
{
Length = 16,
Special = false,
});
var user = new Gcp.Sql.User("user", new()
{
Name = "my-user",
Instance = instance.Name,
Host = "%",
Password = pwd.Result,
});
var sourceConnectionProfile = new Gcp.Datastream.ConnectionProfile("source_connection_profile", new()
{
DisplayName = "Source connection profile",
Location = "us-central1",
ConnectionProfileId = "source-profile",
MysqlProfile = new Gcp.Datastream.Inputs.ConnectionProfileMysqlProfileArgs
{
Hostname = instance.PublicIpAddress,
Username = user.Name,
Password = user.Password,
},
});
var @default = new Gcp.Datastream.Stream("default", new()
{
DisplayName = "postgres to bigQuery",
Location = "us-central1",
StreamId = "postgres-bigquery",
SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
{
SourceConnectionProfile = sourceConnectionProfile.Id,
MysqlSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigArgs(),
},
DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
{
DestinationConnectionProfile = destinationConnectionProfile2.Id,
BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
{
DataFreshness = "900s",
SingleTargetDataset = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs
{
DatasetId = postgres.Id,
},
},
},
BackfillAll = new Gcp.Datastream.Inputs.StreamBackfillAllArgs(),
});
var db = new Gcp.Sql.Database("db", new()
{
Instance = instance.Name,
Name = "db",
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileBigqueryProfileArgs;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsBackupConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs;
import com.pulumi.random.RandomPassword;
import com.pulumi.random.RandomPasswordArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileMysqlProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var postgres = new Dataset("postgres", DatasetArgs.builder()
.datasetId("postgres")
.friendlyName("postgres")
.description("Database of postgres")
.location("us-central1")
.build());
var destinationConnectionProfile2 = new ConnectionProfile("destinationConnectionProfile2", ConnectionProfileArgs.builder()
.displayName("Connection profile")
.location("us-central1")
.connectionProfileId("dest-profile")
.bigqueryProfile(ConnectionProfileBigqueryProfileArgs.builder()
.build())
.build());
var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
.name("instance-name")
.databaseVersion("MYSQL_8_0")
.region("us-central1")
.settings(DatabaseInstanceSettingsArgs.builder()
.tier("db-f1-micro")
.backupConfiguration(DatabaseInstanceSettingsBackupConfigurationArgs.builder()
.enabled(true)
.binaryLogEnabled(true)
.build())
.ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
.authorizedNetworks(
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.71.242.81")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.72.28.29")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.67.6.157")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.67.234.134")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.72.239.218")
.build())
.build())
.build())
.deletionProtection(false)
.build());
var pwd = new RandomPassword("pwd", RandomPasswordArgs.builder()
.length(16)
.special(false)
.build());
var user = new User("user", UserArgs.builder()
.name("my-user")
.instance(instance.name())
.host("%")
.password(pwd.result())
.build());
var sourceConnectionProfile = new ConnectionProfile("sourceConnectionProfile", ConnectionProfileArgs.builder()
.displayName("Source connection profile")
.location("us-central1")
.connectionProfileId("source-profile")
.mysqlProfile(ConnectionProfileMysqlProfileArgs.builder()
.hostname(instance.publicIpAddress())
.username(user.name())
.password(user.password())
.build())
.build());
var default_ = new Stream("default", StreamArgs.builder()
.displayName("postgres to bigQuery")
.location("us-central1")
.streamId("postgres-bigquery")
.sourceConfig(StreamSourceConfigArgs.builder()
.sourceConnectionProfile(sourceConnectionProfile.id())
.mysqlSourceConfig(StreamSourceConfigMysqlSourceConfigArgs.builder()
.build())
.build())
.destinationConfig(StreamDestinationConfigArgs.builder()
.destinationConnectionProfile(destinationConnectionProfile2.id())
.bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
.dataFreshness("900s")
.singleTargetDataset(StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs.builder()
.datasetId(postgres.id())
.build())
.build())
.build())
.backfillAll(StreamBackfillAllArgs.builder()
.build())
.build());
var db = new Database("db", DatabaseArgs.builder()
.instance(instance.name())
.name("db")
.build());
}
}
resources:
postgres:
type: gcp:bigquery:Dataset
properties:
datasetId: postgres
friendlyName: postgres
description: Database of postgres
location: us-central1
default:
type: gcp:datastream:Stream
properties:
displayName: postgres to bigQuery
location: us-central1
streamId: postgres-bigquery
sourceConfig:
sourceConnectionProfile: ${sourceConnectionProfile.id}
mysqlSourceConfig: {}
destinationConfig:
destinationConnectionProfile: ${destinationConnectionProfile2.id}
bigqueryDestinationConfig:
dataFreshness: 900s
singleTargetDataset:
datasetId: ${postgres.id}
backfillAll: {}
destinationConnectionProfile2:
type: gcp:datastream:ConnectionProfile
name: destination_connection_profile2
properties:
displayName: Connection profile
location: us-central1
connectionProfileId: dest-profile
bigqueryProfile: {}
instance:
type: gcp:sql:DatabaseInstance
properties:
name: instance-name
databaseVersion: MYSQL_8_0
region: us-central1
settings:
tier: db-f1-micro
backupConfiguration:
enabled: true
binaryLogEnabled: true
ipConfiguration:
authorizedNetworks:
- value: 34.71.242.81
- value: 34.72.28.29
- value: 34.67.6.157
- value: 34.67.234.134
- value: 34.72.239.218
deletionProtection: false
db:
type: gcp:sql:Database
properties:
instance: ${instance.name}
name: db
pwd:
type: random:RandomPassword
properties:
length: 16
special: false
user:
type: gcp:sql:User
properties:
name: my-user
instance: ${instance.name}
host: '%'
password: ${pwd.result}
sourceConnectionProfile:
type: gcp:datastream:ConnectionProfile
name: source_connection_profile
properties:
displayName: Source connection profile
location: us-central1
connectionProfileId: source-profile
mysqlProfile:
hostname: ${instance.publicIpAddress}
username: ${user.name}
password: ${user.password}
The singleTargetDataset property directs all replicated data to a specific BigQuery dataset by ID. This contrasts with sourceHierarchyDatasets, which creates datasets automatically based on source schema structure. The referenced dataset must exist before the stream starts.
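The two routing modes are alternative shapes of the same bigqueryDestinationConfig block. As a minimal sketch, the snippet below models that choice with plain TypeScript types and no Pulumi SDK import, so it illustrates only the object shape. The DatasetRouting type, the bigqueryDestination helper, and the placeholder dataset ID are hypothetical names introduced for illustration; in a real program the dataset ID would come from the dataset resource.

```typescript
// Hypothetical helper types for illustration; not part of @pulumi/gcp.
type DatasetRouting =
    | { kind: "single"; datasetId: string }
    | { kind: "hierarchy"; location: string };

// Mirrors the property names used in this guide's examples.
interface BigQueryDestinationConfig {
    dataFreshness?: string;
    singleTargetDataset?: { datasetId: string };
    sourceHierarchyDatasets?: { datasetTemplate: { location: string } };
}

// Builds exactly one of the two routing blocks, never both.
function bigqueryDestination(
    routing: DatasetRouting,
    dataFreshness?: string,
): BigQueryDestinationConfig {
    const base: BigQueryDestinationConfig = dataFreshness ? { dataFreshness } : {};
    switch (routing.kind) {
        case "single":
            // All replicated tables land in one pre-existing dataset.
            return { ...base, singleTargetDataset: { datasetId: routing.datasetId } };
        case "hierarchy":
            // Datasets are created per source schema from this template.
            return {
                ...base,
                sourceHierarchyDatasets: { datasetTemplate: { location: routing.location } },
            };
    }
}

// "placeholder-dataset-id" is an assumed stand-in value.
const single = bigqueryDestination({ kind: "single", datasetId: "placeholder-dataset-id" }, "900s");
const hierarchy = bigqueryDestination({ kind: "hierarchy", location: "us-central1" });

console.log(JSON.stringify(single));
console.log(JSON.stringify(hierarchy));
```

Modeling the choice as a discriminated union keeps a caller from setting both blocks at once, which is the distinction the paragraph above draws.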
Use append-only mode for immutable data
Append-only mode writes all changes as new rows rather than updating existing ones, preserving complete change history.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
import * as random from "@pulumi/random";
const project = gcp.organizations.getProject({});
const instance = new gcp.sql.DatabaseInstance("instance", {
name: "my-instance",
databaseVersion: "MYSQL_8_0",
region: "us-central1",
settings: {
tier: "db-f1-micro",
backupConfiguration: {
enabled: true,
binaryLogEnabled: true,
},
ipConfiguration: {
authorizedNetworks: [
{
value: "34.71.242.81",
},
{
value: "34.72.28.29",
},
{
value: "34.67.6.157",
},
{
value: "34.67.234.134",
},
{
value: "34.72.239.218",
},
],
},
},
deletionProtection: true,
});
const db = new gcp.sql.Database("db", {
instance: instance.name,
name: "db",
});
const pwd = new random.RandomPassword("pwd", {
length: 16,
special: false,
});
const user = new gcp.sql.User("user", {
name: "user",
instance: instance.name,
host: "%",
password: pwd.result,
});
const sourceConnectionProfile = new gcp.datastream.ConnectionProfile("source_connection_profile", {
displayName: "Source connection profile",
location: "us-central1",
connectionProfileId: "source-profile",
mysqlProfile: {
hostname: instance.publicIpAddress,
username: user.name,
password: user.password,
},
});
const destinationConnectionProfile = new gcp.datastream.ConnectionProfile("destination_connection_profile", {
displayName: "Connection profile",
location: "us-central1",
connectionProfileId: "destination-profile",
bigqueryProfile: {},
});
const _default = new gcp.datastream.Stream("default", {
streamId: "my-stream",
location: "us-central1",
displayName: "my stream",
sourceConfig: {
sourceConnectionProfile: sourceConnectionProfile.id,
mysqlSourceConfig: {},
},
destinationConfig: {
destinationConnectionProfile: destinationConnectionProfile.id,
bigqueryDestinationConfig: {
sourceHierarchyDatasets: {
datasetTemplate: {
location: "us-central1",
},
},
appendOnly: {},
},
},
backfillNone: {},
});
import pulumi
import pulumi_gcp as gcp
import pulumi_random as random
project = gcp.organizations.get_project()
instance = gcp.sql.DatabaseInstance("instance",
name="my-instance",
database_version="MYSQL_8_0",
region="us-central1",
settings={
"tier": "db-f1-micro",
"backup_configuration": {
"enabled": True,
"binary_log_enabled": True,
},
"ip_configuration": {
"authorized_networks": [
{
"value": "34.71.242.81",
},
{
"value": "34.72.28.29",
},
{
"value": "34.67.6.157",
},
{
"value": "34.67.234.134",
},
{
"value": "34.72.239.218",
},
],
},
},
deletion_protection=True)
db = gcp.sql.Database("db",
instance=instance.name,
name="db")
pwd = random.RandomPassword("pwd",
length=16,
special=False)
user = gcp.sql.User("user",
name="user",
instance=instance.name,
host="%",
password=pwd.result)
source_connection_profile = gcp.datastream.ConnectionProfile("source_connection_profile",
display_name="Source connection profile",
location="us-central1",
connection_profile_id="source-profile",
mysql_profile={
"hostname": instance.public_ip_address,
"username": user.name,
"password": user.password,
})
destination_connection_profile = gcp.datastream.ConnectionProfile("destination_connection_profile",
display_name="Connection profile",
location="us-central1",
connection_profile_id="destination-profile",
bigquery_profile={})
default = gcp.datastream.Stream("default",
stream_id="my-stream",
location="us-central1",
display_name="my stream",
source_config={
"source_connection_profile": source_connection_profile.id,
"mysql_source_config": {},
},
destination_config={
"destination_connection_profile": destination_connection_profile.id,
"bigquery_destination_config": {
"source_hierarchy_datasets": {
"dataset_template": {
"location": "us-central1",
},
},
"append_only": {},
},
},
backfill_none={})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
"github.com/pulumi/pulumi-random/sdk/v4/go/random"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
if err != nil {
return err
}
instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
Name: pulumi.String("my-instance"),
DatabaseVersion: pulumi.String("MYSQL_8_0"),
Region: pulumi.String("us-central1"),
Settings: &sql.DatabaseInstanceSettingsArgs{
Tier: pulumi.String("db-f1-micro"),
BackupConfiguration: &sql.DatabaseInstanceSettingsBackupConfigurationArgs{
Enabled: pulumi.Bool(true),
BinaryLogEnabled: pulumi.Bool(true),
},
IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.71.242.81"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.72.28.29"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.67.6.157"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.67.234.134"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.72.239.218"),
},
},
},
},
DeletionProtection: pulumi.Bool(true),
})
if err != nil {
return err
}
_, err = sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
Instance: instance.Name,
Name: pulumi.String("db"),
})
if err != nil {
return err
}
pwd, err := random.NewRandomPassword(ctx, "pwd", &random.RandomPasswordArgs{
Length: pulumi.Int(16),
Special: pulumi.Bool(false),
})
if err != nil {
return err
}
user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
Name: pulumi.String("user"),
Instance: instance.Name,
Host: pulumi.String("%"),
Password: pwd.Result,
})
if err != nil {
return err
}
sourceConnectionProfile, err := datastream.NewConnectionProfile(ctx, "source_connection_profile", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("Source connection profile"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("source-profile"),
MysqlProfile: &datastream.ConnectionProfileMysqlProfileArgs{
Hostname: instance.PublicIpAddress,
Username: user.Name,
Password: user.Password,
},
})
if err != nil {
return err
}
destinationConnectionProfile, err := datastream.NewConnectionProfile(ctx, "destination_connection_profile", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("Connection profile"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("destination-profile"),
BigqueryProfile: &datastream.ConnectionProfileBigqueryProfileArgs{},
})
if err != nil {
return err
}
_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
StreamId: pulumi.String("my-stream"),
Location: pulumi.String("us-central1"),
DisplayName: pulumi.String("my stream"),
SourceConfig: &datastream.StreamSourceConfigArgs{
SourceConnectionProfile: sourceConnectionProfile.ID(),
MysqlSourceConfig: &datastream.StreamSourceConfigMysqlSourceConfigArgs{},
},
DestinationConfig: &datastream.StreamDestinationConfigArgs{
DestinationConnectionProfile: destinationConnectionProfile.ID(),
BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
SourceHierarchyDatasets: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs{
DatasetTemplate: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs{
Location: pulumi.String("us-central1"),
},
},
AppendOnly: &datastream.StreamDestinationConfigBigqueryDestinationConfigAppendOnlyArgs{},
},
},
BackfillNone: &datastream.StreamBackfillNoneArgs{},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
using Random = Pulumi.Random;
return await Deployment.RunAsync(() =>
{
var project = Gcp.Organizations.GetProject.Invoke();
var instance = new Gcp.Sql.DatabaseInstance("instance", new()
{
Name = "my-instance",
DatabaseVersion = "MYSQL_8_0",
Region = "us-central1",
Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
{
Tier = "db-f1-micro",
BackupConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsBackupConfigurationArgs
{
Enabled = true,
BinaryLogEnabled = true,
},
IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
{
AuthorizedNetworks = new[]
{
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.71.242.81",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.72.28.29",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.67.6.157",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.67.234.134",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.72.239.218",
},
},
},
},
DeletionProtection = true,
});
var db = new Gcp.Sql.Database("db", new()
{
Instance = instance.Name,
Name = "db",
});
var pwd = new Random.RandomPassword("pwd", new()
{
Length = 16,
Special = false,
});
var user = new Gcp.Sql.User("user", new()
{
Name = "user",
Instance = instance.Name,
Host = "%",
Password = pwd.Result,
});
var sourceConnectionProfile = new Gcp.Datastream.ConnectionProfile("source_connection_profile", new()
{
DisplayName = "Source connection profile",
Location = "us-central1",
ConnectionProfileId = "source-profile",
MysqlProfile = new Gcp.Datastream.Inputs.ConnectionProfileMysqlProfileArgs
{
Hostname = instance.PublicIpAddress,
Username = user.Name,
Password = user.Password,
},
});
var destinationConnectionProfile = new Gcp.Datastream.ConnectionProfile("destination_connection_profile", new()
{
DisplayName = "Connection profile",
Location = "us-central1",
ConnectionProfileId = "destination-profile",
BigqueryProfile = new Gcp.Datastream.Inputs.ConnectionProfileBigqueryProfileArgs(),
});
var @default = new Gcp.Datastream.Stream("default", new()
{
StreamId = "my-stream",
Location = "us-central1",
DisplayName = "my stream",
SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
{
SourceConnectionProfile = sourceConnectionProfile.Id,
MysqlSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigArgs(),
},
DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
{
DestinationConnectionProfile = destinationConnectionProfile.Id,
BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
{
SourceHierarchyDatasets = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs
{
DatasetTemplate = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs
{
Location = "us-central1",
},
},
AppendOnly = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigAppendOnlyArgs(),
},
},
BackfillNone = new Gcp.Datastream.Inputs.StreamBackfillNoneArgs(),
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsBackupConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import com.pulumi.random.RandomPassword;
import com.pulumi.random.RandomPasswordArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileMysqlProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileBigqueryProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigAppendOnlyArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillNoneArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
.build());
var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
.name("my-instance")
.databaseVersion("MYSQL_8_0")
.region("us-central1")
.settings(DatabaseInstanceSettingsArgs.builder()
.tier("db-f1-micro")
.backupConfiguration(DatabaseInstanceSettingsBackupConfigurationArgs.builder()
.enabled(true)
.binaryLogEnabled(true)
.build())
.ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
.authorizedNetworks(
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.71.242.81")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.72.28.29")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.67.6.157")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.67.234.134")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.72.239.218")
.build())
.build())
.build())
.deletionProtection(true)
.build());
var db = new Database("db", DatabaseArgs.builder()
.instance(instance.name())
.name("db")
.build());
var pwd = new RandomPassword("pwd", RandomPasswordArgs.builder()
.length(16)
.special(false)
.build());
var user = new User("user", UserArgs.builder()
.name("user")
.instance(instance.name())
.host("%")
.password(pwd.result())
.build());
var sourceConnectionProfile = new ConnectionProfile("sourceConnectionProfile", ConnectionProfileArgs.builder()
.displayName("Source connection profile")
.location("us-central1")
.connectionProfileId("source-profile")
.mysqlProfile(ConnectionProfileMysqlProfileArgs.builder()
.hostname(instance.publicIpAddress())
.username(user.name())
.password(user.password())
.build())
.build());
var destinationConnectionProfile = new ConnectionProfile("destinationConnectionProfile", ConnectionProfileArgs.builder()
.displayName("Connection profile")
.location("us-central1")
.connectionProfileId("destination-profile")
.bigqueryProfile(ConnectionProfileBigqueryProfileArgs.builder()
.build())
.build());
var default_ = new Stream("default", StreamArgs.builder()
.streamId("my-stream")
.location("us-central1")
.displayName("my stream")
.sourceConfig(StreamSourceConfigArgs.builder()
.sourceConnectionProfile(sourceConnectionProfile.id())
.mysqlSourceConfig(StreamSourceConfigMysqlSourceConfigArgs.builder()
.build())
.build())
.destinationConfig(StreamDestinationConfigArgs.builder()
.destinationConnectionProfile(destinationConnectionProfile.id())
.bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
.sourceHierarchyDatasets(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs.builder()
.datasetTemplate(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs.builder()
.location("us-central1")
.build())
.build())
.appendOnly(StreamDestinationConfigBigqueryDestinationConfigAppendOnlyArgs.builder()
.build())
.build())
.build())
.backfillNone(StreamBackfillNoneArgs.builder()
.build())
.build());
}
}
resources:
instance:
type: gcp:sql:DatabaseInstance
properties:
name: my-instance
databaseVersion: MYSQL_8_0
region: us-central1
settings:
tier: db-f1-micro
backupConfiguration:
enabled: true
binaryLogEnabled: true
ipConfiguration:
authorizedNetworks:
- value: 34.71.242.81
- value: 34.72.28.29
- value: 34.67.6.157
- value: 34.67.234.134
- value: 34.72.239.218
deletionProtection: true
db:
type: gcp:sql:Database
properties:
instance: ${instance.name}
name: db
pwd:
type: random:RandomPassword
properties:
length: 16
special: false
user:
type: gcp:sql:User
properties:
name: user
instance: ${instance.name}
host: '%'
password: ${pwd.result}
sourceConnectionProfile:
type: gcp:datastream:ConnectionProfile
name: source_connection_profile
properties:
displayName: Source connection profile
location: us-central1
connectionProfileId: source-profile
mysqlProfile:
hostname: ${instance.publicIpAddress}
username: ${user.name}
password: ${user.password}
destinationConnectionProfile:
type: gcp:datastream:ConnectionProfile
name: destination_connection_profile
properties:
displayName: Connection profile
location: us-central1
connectionProfileId: destination-profile
bigqueryProfile: {}
default:
type: gcp:datastream:Stream
properties:
streamId: my-stream
location: us-central1
displayName: my stream
sourceConfig:
sourceConnectionProfile: ${sourceConnectionProfile.id}
mysqlSourceConfig: {}
destinationConfig:
destinationConnectionProfile: ${destinationConnectionProfile.id}
bigqueryDestinationConfig:
sourceHierarchyDatasets:
datasetTemplate:
location: us-central1
appendOnly: {}
backfillNone: {}
variables:
project:
fn::invoke:
function: gcp:organizations:getProject
arguments: {}
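The appendOnly property in bigqueryDestinationConfig changes how Datastream writes to BigQuery. Instead of updating rows in place, every change becomes a new row with metadata indicating the operation type. This preserves the full change history, which is useful for audit trails and for reconstructing table state at an earlier point in time. The example also sets backfillNone, so the stream does not backfill existing rows and replicates only changes made after it starts.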
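The difference between in-place updates and append-only writes can be illustrated with a small conceptual model. This is a sketch of the semantics only, not of how Datastream is implemented: the ChangeEvent shape, its field names, and both helper functions are hypothetical and do not reflect the metadata columns Datastream actually writes.

```typescript
// Hypothetical change-event shape for illustration only.
type ChangeType = "INSERT" | "UPDATE" | "DELETE";

interface ChangeEvent {
    key: number;          // primary key of the source row
    changeType: ChangeType;
    value?: string;       // new row content; absent for deletes
}

// In-place semantics: the destination mirrors the current source state.
function applyInPlace(events: ChangeEvent[]): Map<number, string> {
    const table = new Map<number, string>();
    for (const e of events) {
        if (e.changeType === "DELETE") {
            table.delete(e.key);
        } else if (e.value !== undefined) {
            table.set(e.key, e.value);
        }
    }
    return table;
}

// Append-only semantics: every change is kept as its own row.
function applyAppendOnly(events: ChangeEvent[]): ChangeEvent[] {
    return events.map(e => ({ ...e }));
}

const events: ChangeEvent[] = [
    { key: 1, changeType: "INSERT", value: "a" },
    { key: 2, changeType: "INSERT", value: "b" },
    { key: 1, changeType: "UPDATE", value: "a2" },
    { key: 2, changeType: "DELETE" },
];

const inPlace = applyInPlace(events);
const appended = applyAppendOnly(events);

console.log(inPlace.size);     // 1
console.log(inPlace.get(1));   // a2
console.log(appended.length);  // 4
```

In the in-place result the update and delete overwrite earlier state, while the append-only result retains all four events, so earlier states can still be derived from it.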
Configure BigLake Managed Tables for query federation
BigLake Managed Tables store data in Cloud Storage while providing BigQuery query access, enabling cost-effective storage with federated analytics.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
import * as random from "@pulumi/random";
const project = gcp.organizations.getProject({});
const instance = new gcp.sql.DatabaseInstance("instance", {
name: "blmt-instance",
databaseVersion: "MYSQL_8_0",
region: "us-central1",
settings: {
tier: "db-f1-micro",
ipConfiguration: {
authorizedNetworks: [
{
value: "34.71.242.81",
},
{
value: "34.72.28.29",
},
{
value: "34.67.6.157",
},
{
value: "34.67.234.134",
},
{
value: "34.72.239.218",
},
],
},
},
deletionProtection: true,
});
const db = new gcp.sql.Database("db", {
instance: instance.name,
name: "db",
});
const pwd = new random.RandomPassword("pwd", {
length: 16,
special: false,
});
const user = new gcp.sql.User("user", {
name: "user",
instance: instance.name,
host: "%",
password: pwd.result,
});
const blmtBucket = new gcp.storage.Bucket("blmt_bucket", {
name: "blmt-bucket",
location: "us-central1",
forceDestroy: true,
});
const blmtConnection = new gcp.bigquery.Connection("blmt_connection", {
project: project.then(project => project.projectId),
location: "us-central1",
connectionId: "blmt-connection",
friendlyName: "Datastream BLMT Test Connection",
description: "Connection for Datastream BLMT test",
cloudResource: {},
});
const blmtConnectionBucketAdmin = new gcp.storage.BucketIAMMember("blmt_connection_bucket_admin", {
bucket: blmtBucket.name,
role: "roles/storage.admin",
member: blmtConnection.cloudResource.apply(cloudResource => `serviceAccount:${cloudResource?.serviceAccountId}`),
});
const sourceConnectionProfile = new gcp.datastream.ConnectionProfile("source_connection_profile", {
displayName: "Source connection profile",
location: "us-central1",
connectionProfileId: "blmt-source-profile",
mysqlProfile: {
hostname: instance.publicIpAddress,
username: user.name,
password: user.password,
},
});
const destinationConnectionProfile = new gcp.datastream.ConnectionProfile("destination_connection_profile", {
displayName: "Connection profile",
location: "us-central1",
connectionProfileId: "blmt-destination-profile",
bigqueryProfile: {},
});
const _default = new gcp.datastream.Stream("default", {
streamId: "blmt-stream",
location: "us-central1",
displayName: "My BLMT stream",
sourceConfig: {
sourceConnectionProfile: sourceConnectionProfile.id,
mysqlSourceConfig: {},
},
destinationConfig: {
destinationConnectionProfile: destinationConnectionProfile.id,
bigqueryDestinationConfig: {
sourceHierarchyDatasets: {
datasetTemplate: {
location: "us-central1",
},
},
blmtConfig: {
bucket: blmtBucket.name,
connectionName: pulumi.all([blmtConnection.project, blmtConnection.location, blmtConnection.connectionId]).apply(([project, location, connectionId]) => `${project}.${location}.${connectionId}`),
fileFormat: "PARQUET",
tableFormat: "ICEBERG",
rootPath: "/",
},
appendOnly: {},
},
},
backfillNone: {},
});
import pulumi
import pulumi_gcp as gcp
import pulumi_random as random
project = gcp.organizations.get_project()
instance = gcp.sql.DatabaseInstance("instance",
name="blmt-instance",
database_version="MYSQL_8_0",
region="us-central1",
settings={
"tier": "db-f1-micro",
"ip_configuration": {
"authorized_networks": [
{
"value": "34.71.242.81",
},
{
"value": "34.72.28.29",
},
{
"value": "34.67.6.157",
},
{
"value": "34.67.234.134",
},
{
"value": "34.72.239.218",
},
],
},
},
deletion_protection=True)
db = gcp.sql.Database("db",
instance=instance.name,
name="db")
pwd = random.RandomPassword("pwd",
length=16,
special=False)
user = gcp.sql.User("user",
name="user",
instance=instance.name,
host="%",
password=pwd.result)
blmt_bucket = gcp.storage.Bucket("blmt_bucket",
name="blmt-bucket",
location="us-central1",
force_destroy=True)
blmt_connection = gcp.bigquery.Connection("blmt_connection",
project=project.project_id,
location="us-central1",
connection_id="blmt-connection",
friendly_name="Datastream BLMT Test Connection",
description="Connection for Datastream BLMT test",
cloud_resource={})
blmt_connection_bucket_admin = gcp.storage.BucketIAMMember("blmt_connection_bucket_admin",
bucket=blmt_bucket.name,
role="roles/storage.admin",
member=blmt_connection.cloud_resource.apply(lambda cloud_resource: f"serviceAccount:{cloud_resource.service_account_id}"))
source_connection_profile = gcp.datastream.ConnectionProfile("source_connection_profile",
display_name="Source connection profile",
location="us-central1",
connection_profile_id="blmt-source-profile",
mysql_profile={
"hostname": instance.public_ip_address,
"username": user.name,
"password": user.password,
})
destination_connection_profile = gcp.datastream.ConnectionProfile("destination_connection_profile",
display_name="Connection profile",
location="us-central1",
connection_profile_id="blmt-destination-profile",
bigquery_profile={})
default = gcp.datastream.Stream("default",
stream_id="blmt-stream",
location="us-central1",
display_name="My BLMT stream",
source_config={
"source_connection_profile": source_connection_profile.id,
"mysql_source_config": {},
},
destination_config={
"destination_connection_profile": destination_connection_profile.id,
"bigquery_destination_config": {
"source_hierarchy_datasets": {
"dataset_template": {
"location": "us-central1",
},
},
"blmt_config": {
"bucket": blmt_bucket.name,
"connection_name": pulumi.Output.all(
project=blmt_connection.project,
location=blmt_connection.location,
connection_id=blmt_connection.connection_id
).apply(lambda resolved_outputs: f"{resolved_outputs['project']}.{resolved_outputs['location']}.{resolved_outputs['connection_id']}"),
"file_format": "PARQUET",
"table_format": "ICEBERG",
"root_path": "/",
},
"append_only": {},
},
},
backfill_none={})
package main
import (
"fmt"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/storage"
"github.com/pulumi/pulumi-random/sdk/v4/go/random"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
if err != nil {
return err
}
instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
Name: pulumi.String("blmt-instance"),
DatabaseVersion: pulumi.String("MYSQL_8_0"),
Region: pulumi.String("us-central1"),
Settings: &sql.DatabaseInstanceSettingsArgs{
Tier: pulumi.String("db-f1-micro"),
IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.71.242.81"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.72.28.29"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.67.6.157"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.67.234.134"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.72.239.218"),
},
},
},
},
DeletionProtection: pulumi.Bool(true),
})
if err != nil {
return err
}
_, err = sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
Instance: instance.Name,
Name: pulumi.String("db"),
})
if err != nil {
return err
}
pwd, err := random.NewRandomPassword(ctx, "pwd", &random.RandomPasswordArgs{
Length: pulumi.Int(16),
Special: pulumi.Bool(false),
})
if err != nil {
return err
}
user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
Name: pulumi.String("user"),
Instance: instance.Name,
Host: pulumi.String("%"),
Password: pwd.Result,
})
if err != nil {
return err
}
blmtBucket, err := storage.NewBucket(ctx, "blmt_bucket", &storage.BucketArgs{
Name: pulumi.String("blmt-bucket"),
Location: pulumi.String("us-central1"),
ForceDestroy: pulumi.Bool(true),
})
if err != nil {
return err
}
blmtConnection, err := bigquery.NewConnection(ctx, "blmt_connection", &bigquery.ConnectionArgs{
Project: pulumi.String(project.ProjectId),
Location: pulumi.String("us-central1"),
ConnectionId: pulumi.String("blmt-connection"),
FriendlyName: pulumi.String("Datastream BLMT Test Connection"),
Description: pulumi.String("Connection for Datastream BLMT test"),
CloudResource: &bigquery.ConnectionCloudResourceArgs{},
})
if err != nil {
return err
}
_, err = storage.NewBucketIAMMember(ctx, "blmt_connection_bucket_admin", &storage.BucketIAMMemberArgs{
Bucket: blmtBucket.Name,
Role: pulumi.String("roles/storage.admin"),
Member: blmtConnection.CloudResource.ApplyT(func(cloudResource bigquery.ConnectionCloudResource) (string, error) {
return fmt.Sprintf("serviceAccount:%v", cloudResource.ServiceAccountId), nil
}).(pulumi.StringOutput),
})
if err != nil {
return err
}
sourceConnectionProfile, err := datastream.NewConnectionProfile(ctx, "source_connection_profile", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("Source connection profile"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("blmt-source-profile"),
MysqlProfile: &datastream.ConnectionProfileMysqlProfileArgs{
Hostname: instance.PublicIpAddress,
Username: user.Name,
Password: user.Password,
},
})
if err != nil {
return err
}
destinationConnectionProfile, err := datastream.NewConnectionProfile(ctx, "destination_connection_profile", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("Connection profile"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("blmt-destination-profile"),
BigqueryProfile: &datastream.ConnectionProfileBigqueryProfileArgs{},
})
if err != nil {
return err
}
_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
StreamId: pulumi.String("blmt-stream"),
Location: pulumi.String("us-central1"),
DisplayName: pulumi.String("My BLMT stream"),
SourceConfig: &datastream.StreamSourceConfigArgs{
SourceConnectionProfile: sourceConnectionProfile.ID(),
MysqlSourceConfig: &datastream.StreamSourceConfigMysqlSourceConfigArgs{},
},
DestinationConfig: &datastream.StreamDestinationConfigArgs{
DestinationConnectionProfile: destinationConnectionProfile.ID(),
BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
SourceHierarchyDatasets: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs{
DatasetTemplate: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs{
Location: pulumi.String("us-central1"),
},
},
BlmtConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigBlmtConfigArgs{
Bucket: blmtBucket.Name,
ConnectionName: pulumi.All(blmtConnection.Project, blmtConnection.Location, blmtConnection.ConnectionId).ApplyT(func(_args []interface{}) (string, error) {
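project := _args[0].(string)
location := _args[1].(*string)
connectionId := _args[2].(string)
// location resolves to a *string; dereference it so the value, not the pointer address, is formatted
return fmt.Sprintf("%v.%v.%v", project, *location, connectionId), nil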
}).(pulumi.StringOutput),
FileFormat: pulumi.String("PARQUET"),
TableFormat: pulumi.String("ICEBERG"),
RootPath: pulumi.String("/"),
},
AppendOnly: &datastream.StreamDestinationConfigBigqueryDestinationConfigAppendOnlyArgs{},
},
},
BackfillNone: &datastream.StreamBackfillNoneArgs{},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
using Random = Pulumi.Random;
return await Deployment.RunAsync(() =>
{
var project = Gcp.Organizations.GetProject.Invoke();
var instance = new Gcp.Sql.DatabaseInstance("instance", new()
{
Name = "blmt-instance",
DatabaseVersion = "MYSQL_8_0",
Region = "us-central1",
Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
{
Tier = "db-f1-micro",
IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
{
AuthorizedNetworks = new[]
{
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.71.242.81",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.72.28.29",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.67.6.157",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.67.234.134",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.72.239.218",
},
},
},
},
DeletionProtection = true,
});
var db = new Gcp.Sql.Database("db", new()
{
Instance = instance.Name,
Name = "db",
});
var pwd = new Random.RandomPassword("pwd", new()
{
Length = 16,
Special = false,
});
var user = new Gcp.Sql.User("user", new()
{
Name = "user",
Instance = instance.Name,
Host = "%",
Password = pwd.Result,
});
var blmtBucket = new Gcp.Storage.Bucket("blmt_bucket", new()
{
Name = "blmt-bucket",
Location = "us-central1",
ForceDestroy = true,
});
var blmtConnection = new Gcp.BigQuery.Connection("blmt_connection", new()
{
Project = project.Apply(getProjectResult => getProjectResult.ProjectId),
Location = "us-central1",
ConnectionId = "blmt-connection",
FriendlyName = "Datastream BLMT Test Connection",
Description = "Connection for Datastream BLMT test",
CloudResource = new Gcp.BigQuery.Inputs.ConnectionCloudResourceArgs(),
});
var blmtConnectionBucketAdmin = new Gcp.Storage.BucketIAMMember("blmt_connection_bucket_admin", new()
{
Bucket = blmtBucket.Name,
Role = "roles/storage.admin",
Member = blmtConnection.CloudResource.Apply(cloudResource => $"serviceAccount:{cloudResource?.ServiceAccountId}"),
});
var sourceConnectionProfile = new Gcp.Datastream.ConnectionProfile("source_connection_profile", new()
{
DisplayName = "Source connection profile",
Location = "us-central1",
ConnectionProfileId = "blmt-source-profile",
MysqlProfile = new Gcp.Datastream.Inputs.ConnectionProfileMysqlProfileArgs
{
Hostname = instance.PublicIpAddress,
Username = user.Name,
Password = user.Password,
},
});
var destinationConnectionProfile = new Gcp.Datastream.ConnectionProfile("destination_connection_profile", new()
{
DisplayName = "Connection profile",
Location = "us-central1",
ConnectionProfileId = "blmt-destination-profile",
BigqueryProfile = new Gcp.Datastream.Inputs.ConnectionProfileBigqueryProfileArgs(),
});
var @default = new Gcp.Datastream.Stream("default", new()
{
StreamId = "blmt-stream",
Location = "us-central1",
DisplayName = "My BLMT stream",
SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
{
SourceConnectionProfile = sourceConnectionProfile.Id,
MysqlSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigArgs(),
},
DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
{
DestinationConnectionProfile = destinationConnectionProfile.Id,
BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
{
SourceHierarchyDatasets = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs
{
DatasetTemplate = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs
{
Location = "us-central1",
},
},
BlmtConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigBlmtConfigArgs
{
Bucket = blmtBucket.Name,
ConnectionName = Output.Tuple(blmtConnection.Project, blmtConnection.Location, blmtConnection.ConnectionId).Apply(values =>
{
var project = values.Item1;
var location = values.Item2;
var connectionId = values.Item3;
return $"{project}.{location}.{connectionId}";
}),
FileFormat = "PARQUET",
TableFormat = "ICEBERG",
RootPath = "/",
},
AppendOnly = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigAppendOnlyArgs(),
},
},
BackfillNone = new Gcp.Datastream.Inputs.StreamBackfillNoneArgs(),
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import com.pulumi.random.RandomPassword;
import com.pulumi.random.RandomPasswordArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.storage.Bucket;
import com.pulumi.gcp.storage.BucketArgs;
import com.pulumi.gcp.bigquery.Connection;
import com.pulumi.gcp.bigquery.ConnectionArgs;
import com.pulumi.gcp.bigquery.inputs.ConnectionCloudResourceArgs;
import com.pulumi.gcp.storage.BucketIAMMember;
import com.pulumi.gcp.storage.BucketIAMMemberArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileMysqlProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileBigqueryProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigBlmtConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigAppendOnlyArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillNoneArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
.build());
var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
.name("blmt-instance")
.databaseVersion("MYSQL_8_0")
.region("us-central1")
.settings(DatabaseInstanceSettingsArgs.builder()
.tier("db-f1-micro")
.ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
.authorizedNetworks(
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.71.242.81")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.72.28.29")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.67.6.157")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.67.234.134")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.72.239.218")
.build())
.build())
.build())
.deletionProtection(true)
.build());
var db = new Database("db", DatabaseArgs.builder()
.instance(instance.name())
.name("db")
.build());
var pwd = new RandomPassword("pwd", RandomPasswordArgs.builder()
.length(16)
.special(false)
.build());
var user = new User("user", UserArgs.builder()
.name("user")
.instance(instance.name())
.host("%")
.password(pwd.result())
.build());
var blmtBucket = new Bucket("blmtBucket", BucketArgs.builder()
.name("blmt-bucket")
.location("us-central1")
.forceDestroy(true)
.build());
var blmtConnection = new Connection("blmtConnection", ConnectionArgs.builder()
.project(project.projectId())
.location("us-central1")
.connectionId("blmt-connection")
.friendlyName("Datastream BLMT Test Connection")
.description("Connection for Datastream BLMT test")
.cloudResource(ConnectionCloudResourceArgs.builder()
.build())
.build());
var blmtConnectionBucketAdmin = new BucketIAMMember("blmtConnectionBucketAdmin", BucketIAMMemberArgs.builder()
.bucket(blmtBucket.name())
.role("roles/storage.admin")
.member(blmtConnection.cloudResource().applyValue(_cloudResource -> String.format("serviceAccount:%s", _cloudResource.serviceAccountId())))
.build());
var sourceConnectionProfile = new ConnectionProfile("sourceConnectionProfile", ConnectionProfileArgs.builder()
.displayName("Source connection profile")
.location("us-central1")
.connectionProfileId("blmt-source-profile")
.mysqlProfile(ConnectionProfileMysqlProfileArgs.builder()
.hostname(instance.publicIpAddress())
.username(user.name())
.password(user.password())
.build())
.build());
var destinationConnectionProfile = new ConnectionProfile("destinationConnectionProfile", ConnectionProfileArgs.builder()
.displayName("Connection profile")
.location("us-central1")
.connectionProfileId("blmt-destination-profile")
.bigqueryProfile(ConnectionProfileBigqueryProfileArgs.builder()
.build())
.build());
var default_ = new Stream("default", StreamArgs.builder()
.streamId("blmt-stream")
.location("us-central1")
.displayName("My BLMT stream")
.sourceConfig(StreamSourceConfigArgs.builder()
.sourceConnectionProfile(sourceConnectionProfile.id())
.mysqlSourceConfig(StreamSourceConfigMysqlSourceConfigArgs.builder()
.build())
.build())
.destinationConfig(StreamDestinationConfigArgs.builder()
.destinationConnectionProfile(destinationConnectionProfile.id())
.bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
.sourceHierarchyDatasets(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs.builder()
.datasetTemplate(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs.builder()
.location("us-central1")
.build())
.build())
.blmtConfig(StreamDestinationConfigBigqueryDestinationConfigBlmtConfigArgs.builder()
.bucket(blmtBucket.name())
.connectionName(Output.tuple(blmtConnection.project(), blmtConnection.location(), blmtConnection.connectionId()).applyValue(values -> {
var project = values.t1;
var location = values.t2;
var connectionId = values.t3;
return String.format("%s.%s.%s", project,location,connectionId);
}))
.fileFormat("PARQUET")
.tableFormat("ICEBERG")
.rootPath("/")
.build())
.appendOnly(StreamDestinationConfigBigqueryDestinationConfigAppendOnlyArgs.builder()
.build())
.build())
.build())
.backfillNone(StreamBackfillNoneArgs.builder()
.build())
.build());
}
}
resources:
  instance:
    type: gcp:sql:DatabaseInstance
    properties:
      name: blmt-instance
      databaseVersion: MYSQL_8_0
      region: us-central1
      settings:
        tier: db-f1-micro
        ipConfiguration:
          authorizedNetworks:
            - value: 34.71.242.81
            - value: 34.72.28.29
            - value: 34.67.6.157
            - value: 34.67.234.134
            - value: 34.72.239.218
      deletionProtection: true
  db:
    type: gcp:sql:Database
    properties:
      instance: ${instance.name}
      name: db
  pwd:
    type: random:RandomPassword
    properties:
      length: 16
      special: false
  user:
    type: gcp:sql:User
    properties:
      name: user
      instance: ${instance.name}
      host: '%'
      password: ${pwd.result}
  blmtBucket:
    type: gcp:storage:Bucket
    name: blmt_bucket
    properties:
      name: blmt-bucket
      location: us-central1
      forceDestroy: true
  blmtConnection:
    type: gcp:bigquery:Connection
    name: blmt_connection
    properties:
      project: ${project.projectId}
      location: us-central1
      connectionId: blmt-connection
      friendlyName: Datastream BLMT Test Connection
      description: Connection for Datastream BLMT test
      cloudResource: {}
  blmtConnectionBucketAdmin:
    type: gcp:storage:BucketIAMMember
    name: blmt_connection_bucket_admin
    properties:
      bucket: ${blmtBucket.name}
      role: roles/storage.admin
      member: serviceAccount:${blmtConnection.cloudResource.serviceAccountId}
  sourceConnectionProfile:
    type: gcp:datastream:ConnectionProfile
    name: source_connection_profile
    properties:
      displayName: Source connection profile
      location: us-central1
      connectionProfileId: blmt-source-profile
      mysqlProfile:
        hostname: ${instance.publicIpAddress}
        username: ${user.name}
        password: ${user.password}
  destinationConnectionProfile:
    type: gcp:datastream:ConnectionProfile
    name: destination_connection_profile
    properties:
      displayName: Connection profile
      location: us-central1
      connectionProfileId: blmt-destination-profile
      bigqueryProfile: {}
  default:
    type: gcp:datastream:Stream
    properties:
      streamId: blmt-stream
      location: us-central1
      displayName: My BLMT stream
      sourceConfig:
        sourceConnectionProfile: ${sourceConnectionProfile.id}
        mysqlSourceConfig: {}
      destinationConfig:
        destinationConnectionProfile: ${destinationConnectionProfile.id}
        bigqueryDestinationConfig:
          sourceHierarchyDatasets:
            datasetTemplate:
              location: us-central1
          blmtConfig:
            bucket: ${blmtBucket.name}
            connectionName: ${blmtConnection.project}.${blmtConnection.location}.${blmtConnection.connectionId}
            fileFormat: PARQUET
            tableFormat: ICEBERG
            rootPath: /
          appendOnly: {}
      backfillNone: {}
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}
The blmtConfig property configures BigLake Managed Tables, which store data in Cloud Storage (in the bucket named by bucket) while exposing it through BigQuery. The connectionName references a BigQuery connection, written in project.location.connectionId form, whose service account is used for Cloud Storage access. The fileFormat and tableFormat properties control how data is stored (PARQUET files in ICEBERG table format). The connection’s service account needs the roles/storage.admin role on the bucket, which the example grants with a BucketIAMMember.
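The dotted connectionName is easy to get wrong when it is assembled by hand. As a small sketch, the helper below joins the three parts in the same project.location.connectionId order the examples use. The function name blmtConnectionName and the sample values are hypothetical, and the only check it performs is that no part is empty.

```typescript
// Hypothetical helper: joins the parts in project.location.connectionId order.
function blmtConnectionName(project: string, location: string, connectionId: string): string {
  const parts = [project, location, connectionId];
  if (parts.some((part) => part.trim().length === 0)) {
    throw new Error("project, location, and connectionId must all be non-empty");
  }
  return parts.join(".");
}

// Sample values only; substitute the outputs of your own BigQuery connection.
const connectionName = blmtConnectionName("my-project", "us-central1", "blmt-connection");
console.log(connectionName); // prints "my-project.us-central1.blmt-connection"
```

Inside a Pulumi program the three parts are outputs rather than plain strings, so a helper like this would be called from within the pulumi.all(...).apply(...) callback, as the TypeScript example does with its template string.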
Beyond these examples
These snippets focus on specific stream-level features: source database configuration for MySQL, PostgreSQL, Oracle, SQL Server, and MongoDB; destination configuration for Cloud Storage and BigQuery; CDC mechanisms like transaction logs, replication slots, and change tables; and data filtering and backfill strategies. They’re intentionally minimal rather than full replication pipelines.
The examples reference pre-existing infrastructure such as Datastream connection profiles for sources and destinations, source databases with CDC enabled, BigQuery datasets, GCS buckets, KMS keys, and IAM permissions for the Datastream service account. They focus on configuring the stream rather than provisioning the surrounding infrastructure.
To keep things focused, common stream patterns are omitted, including:
- Stream lifecycle management (desiredState for starting/pausing)
- Private connectivity configuration (VPC peering, Private Service Connect)
- Monitoring and alerting integration
- Advanced filtering (complex include/exclude patterns)
- Error handling and retry configuration
- Cross-region replication patterns
These omissions are intentional: the goal is to illustrate how each stream feature is wired, not to provide drop-in replication modules. See the Datastream Stream resource reference for all available configuration options.
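As one illustration of the lifecycle item in the list above, here is a hedged sketch of choosing a desiredState value per environment. RUNNING, NOT_STARTED, and PAUSED are the values this guide names for the property; the desiredStateFor function, the environment names, and the maintenance flag are hypothetical policy, not part of the provider.

```typescript
// The three desiredState values this guide names for a stream.
type DesiredState = "RUNNING" | "NOT_STARTED" | "PAUSED";

// Hypothetical policy: pause during maintenance, run in prod, and otherwise
// create the stream without starting it.
function desiredStateFor(environment: string, maintenance: boolean): DesiredState {
  if (maintenance) {
    return "PAUSED";
  }
  return environment === "prod" ? "RUNNING" : "NOT_STARTED";
}

// A plain object that could be spread into the Stream arguments.
const lifecycleArgs = { desiredState: desiredStateFor("prod", false) };
console.log(lifecycleArgs.desiredState); // prints "RUNNING"
```

Keeping the decision in one function means the stream definition itself stays identical across stacks, with only the inputs to the policy changing.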
Frequently Asked Questions
Configuration & Lifecycle
The location, streamId, customerManagedEncryptionKey, and createWithoutValidation properties are immutable. Changing them requires recreating the stream.
Set desiredState to RUNNING to start immediately, NOT_STARTED to create without starting (default), or PAUSED to pause a running stream.
The labels field is non-authoritative and only manages labels in your configuration. Use effectiveLabels to view all labels on the resource, including those managed by other clients.
IAM & Permissions
Grant the Datastream service account (service-PROJECT_NUMBER@gcp-sa-datastream.iam.gserviceaccount.com) three roles on the GCS bucket: roles/storage.objectViewer, roles/storage.objectCreator, and roles/storage.legacyBucketReader.
Set customerManagedEncryptionKey to your KMS key name and grant the service account roles/cloudkms.cryptoKeyEncrypterDecrypter on the key. Use dependsOn to ensure the IAM binding completes before stream creation.
Set projectId in sourceHierarchyDatasets to specify the target project. Ensure the Datastream service account has roles/bigquery.admin in the target project.
Backfill Strategies
backfillAll automatically backfills the stream’s objects (with optional excludeObjects), while backfillNone disables automatic backfill. Choose one based on whether you need historical data.
Use backfillAll with source-specific excludeObjects (e.g., mysqlExcludedObjects, postgresqlExcludedObjects, oracleExcludedObjects) to specify databases, tables, or columns to exclude.
Source-Specific Configuration
transactionLogs uses SQL Server transaction logs for change data capture, while changeTables uses SQL Server change tables. Choose one based on your SQL Server CDC configuration.
Set gtid: {} in mysqlSourceConfig instead of using binary log position. This enables GTID-based replication for MySQL sources.
Destination Configuration
Use singleTargetDataset with datasetId in bigqueryDestinationConfig instead of sourceHierarchyDatasets.
Set appendOnly: {} in bigqueryDestinationConfig to append data without updates or deletes. This mode is useful for audit logs or immutable data.
Configure blmtConfig with bucket, connectionName, fileFormat (PARQUET), tableFormat (ICEBERG), and rootPath. The BigQuery connection service account needs roles/storage.admin on the bucket.
Using a different cloud?
Explore analytics guides for other cloud providers: