The gcp:datastream/stream:Stream resource, part of the Pulumi GCP provider, defines a Datastream replication stream that continuously copies data from a source database to a destination. This guide focuses on three capabilities: source database configuration for MySQL, PostgreSQL, and SQL Server; destination configuration for Cloud Storage and BigQuery; and filtering, backfill strategies, and BigQuery optimizations.
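Every stream pairs a source with a destination. The source is one connection profile plus one engine-specific source config. The destination is one connection profile plus one destination config. The Datastream API treats the engine-specific and destination-specific settings as alternatives.

The sketch below is a hypothetical pre-flight check over plain objects; it is not part of @pulumi/gcp. It assumes the TypeScript input names mysqlSourceConfig, postgresqlSourceConfig, sqlServerSourceConfig, gcsDestinationConfig, and bigqueryDestinationConfig. It covers only the engines and destinations this guide discusses.

```typescript
// Hypothetical pre-flight check; not part of @pulumi/gcp.
// Property names are assumed to match the Stream inputs used in this guide.
interface SourceConfigSketch {
  sourceConnectionProfile: string;
  mysqlSourceConfig?: object;
  postgresqlSourceConfig?: object;
  sqlServerSourceConfig?: object;
}

interface DestinationConfigSketch {
  destinationConnectionProfile: string;
  gcsDestinationConfig?: object;
  bigqueryDestinationConfig?: object;
}

// Engine- and destination-specific settings treated as mutually exclusive.
const SOURCE_VARIANTS = ["mysqlSourceConfig", "postgresqlSourceConfig", "sqlServerSourceConfig"] as const;
const DESTINATION_VARIANTS = ["gcsDestinationConfig", "bigqueryDestinationConfig"] as const;

// Returns a list of problems; an empty list means the shape looks valid.
function checkStreamShape(source: SourceConfigSketch, destination: DestinationConfigSketch): string[] {
  const problems: string[] = [];
  const sources = SOURCE_VARIANTS.filter((key) => source[key] !== undefined);
  const destinations = DESTINATION_VARIANTS.filter((key) => destination[key] !== undefined);
  if (sources.length !== 1) {
    problems.push(`expected exactly one source config, found ${sources.length}`);
  }
  if (destinations.length !== 1) {
    problems.push(`expected exactly one destination config, found ${destinations.length}`);
  }
  return problems;
}

// Shape of the MySQL-to-Cloud Storage example: one source, one destination.
console.log(
  checkStreamShape(
    { sourceConnectionProfile: "source-profile", mysqlSourceConfig: {} },
    { destinationConnectionProfile: "destination-profile", gcsDestinationConfig: {} },
  ),
); // prints []
```

The provider and the Datastream API still run their own validation. A check like this only surfaces shape mistakes earlier, before a preview.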
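Streams depend on two things. First, connection profiles define the source and destination endpoints. Second, IAM grants let the Datastream service agent (service-PROJECT_NUMBER@gcp-sa-datastream.iam.gserviceaccount.com) access Cloud Storage, BigQuery, and Cloud KMS resources. The examples are intentionally focused and use placeholder values, such as kms-name for the encryption key. Combine them with your own connection profiles and infrastructure.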
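The Datastream service agent identity used throughout the examples follows a fixed pattern based on the numeric project number. The sketch below assembles three things:

- that member string
- the role bindings the Cloud Storage example grants
- a full Cloud KMS key resource name to stand in for the kms-name placeholder

The helper names (datastreamServiceAgent, datastreamBindings, cryptoKeyName) are hypothetical. The role list covers only what this example grants.

```typescript
// Hypothetical helpers; names are illustrative, not part of @pulumi/gcp.

// Roles the Cloud Storage example grants on the destination bucket.
const GCS_ROLES: readonly string[] = [
  "roles/storage.objectViewer",
  "roles/storage.objectCreator",
  "roles/storage.legacyBucketReader",
];

// Role the example grants on the customer-managed encryption key.
const KMS_ROLE = "roles/cloudkms.cryptoKeyEncrypterDecrypter";

interface RoleBinding {
  role: string;
  member: string;
}

// IAM member string for the per-project Datastream service agent.
function datastreamServiceAgent(projectNumber: string): string {
  // The agent is keyed on the numeric project number, not the project ID.
  if (!/^[0-9]+$/.test(projectNumber)) {
    throw new Error(`expected a numeric project number, got "${projectNumber}"`);
  }
  return `serviceAccount:service-${projectNumber}@gcp-sa-datastream.iam.gserviceaccount.com`;
}

// One binding per role; the KMS role is added only when a CMEK is used.
function datastreamBindings(projectNumber: string, useCmek: boolean): RoleBinding[] {
  const member = datastreamServiceAgent(projectNumber);
  const roles = useCmek ? [...GCS_ROLES, KMS_ROLE] : [...GCS_ROLES];
  return roles.map((role) => ({ role, member }));
}

// Standard Cloud KMS key resource name, usable in place of "kms-name".
function cryptoKeyName(project: string, location: string, keyRing: string, key: string): string {
  return `projects/${project}/locations/${location}/keyRings/${keyRing}/cryptoKeys/${key}`;
}

// Example usage with made-up values.
for (const binding of datastreamBindings("123456789012", true)) {
  console.log(`${binding.role} -> ${binding.member}`);
}
console.log(cryptoKeyName("my-project", "us-central1", "my-ring", "my-key"));
```

In a Pulumi program, each binding would map to one BucketIAMMember or CryptoKeyIAMMember resource, as in the example that follows.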
Stream MySQL to Cloud Storage with filtering and encryption
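Data pipelines often replicate MySQL databases to Cloud Storage for analytics, applying table and column filters to control what gets replicated. The example below creates a Cloud SQL for MySQL instance with binary logging enabled, which Datastream needs for change data capture. It also authorizes the Datastream public IP addresses for us-central1. It then grants the Datastream service agent access to the bucket and the KMS key, and creates a stream that writes gzip-compressed JSON files. The stream is created with desiredState set to NOT_STARTED, so no data moves until you change it to RUNNING. Names such as my-database, includedTable, and kms-name are placeholders for your own schema and key.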
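The include and exclude filters in the example nest database, table, and column names several levels deep, and the same shape appears again under backfillAll. A hypothetical helper like the one below can generate that nesting from a compact map. It assumes name-only filters are enough for your tables; the full example also sets dataType, collation, and other column attributes, which this sketch omits. An empty column list leaves mysqlColumns unset, which Datastream treats as the whole table.

```typescript
// Hypothetical helper; not part of @pulumi/gcp. The output mirrors the
// mysqlDatabases / mysqlTables / mysqlColumns nesting used in the example.
interface MysqlColumnFilter {
  column: string;
}
interface MysqlTableFilter {
  table: string;
  mysqlColumns?: MysqlColumnFilter[];
}
interface MysqlDatabaseFilter {
  database: string;
  mysqlTables: MysqlTableFilter[];
}
interface MysqlRdbmsFilter {
  mysqlDatabases: MysqlDatabaseFilter[];
}

// Maps database -> table -> column names. An empty column list leaves
// mysqlColumns unset so the filter applies to the whole table.
type FilterSpec = Record<string, Record<string, string[]>>;

function buildMysqlFilter(spec: FilterSpec): MysqlRdbmsFilter {
  return {
    mysqlDatabases: Object.keys(spec).map((database) => ({
      database,
      mysqlTables: Object.keys(spec[database]).map((table): MysqlTableFilter => {
        const columns = spec[database][table];
        return columns.length === 0
          ? { table }
          : { table, mysqlColumns: columns.map((column) => ({ column })) };
      }),
    })),
  };
}

// Name-only equivalents of the example's include and exclude filters.
const includeFilter = buildMysqlFilter({
  "my-database": { includedTable: ["includedColumn"], includedTable_2: [] },
});
const excludeFilter = buildMysqlFilter({
  "my-database": { excludedTable: ["excludedColumn"] },
});
console.log(JSON.stringify(includeFilter, null, 2));
console.log(JSON.stringify(excludeFilter, null, 2));
```

The resulting objects are plain data, so they should be structurally compatible with the includeObjects, excludeObjects, and backfillAll.mysqlExcludedObjects inputs. The trade-off is losing the explicit column metadata in exchange for a shorter, table-driven definition.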
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
import * as random from "@pulumi/random";
const project = gcp.organizations.getProject({});
const instance = new gcp.sql.DatabaseInstance("instance", {
name: "my-instance",
databaseVersion: "MYSQL_8_0",
region: "us-central1",
settings: {
tier: "db-f1-micro",
backupConfiguration: {
enabled: true,
binaryLogEnabled: true,
},
ipConfiguration: {
authorizedNetworks: [
{
value: "34.71.242.81",
},
{
value: "34.72.28.29",
},
{
value: "34.67.6.157",
},
{
value: "34.67.234.134",
},
{
value: "34.72.239.218",
},
],
},
},
deletionProtection: true,
});
const db = new gcp.sql.Database("db", {
instance: instance.name,
name: "db",
});
const pwd = new random.RandomPassword("pwd", {
length: 16,
special: false,
});
const user = new gcp.sql.User("user", {
name: "user",
instance: instance.name,
host: "%",
password: pwd.result,
});
const sourceConnectionProfile = new gcp.datastream.ConnectionProfile("source_connection_profile", {
displayName: "Source connection profile",
location: "us-central1",
connectionProfileId: "source-profile",
mysqlProfile: {
hostname: instance.publicIpAddress,
username: user.name,
password: user.password,
},
});
const bucket = new gcp.storage.Bucket("bucket", {
name: "my-bucket",
location: "US",
uniformBucketLevelAccess: true,
});
const viewer = new gcp.storage.BucketIAMMember("viewer", {
bucket: bucket.name,
role: "roles/storage.objectViewer",
member: project.then(project => `serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com`),
});
const creator = new gcp.storage.BucketIAMMember("creator", {
bucket: bucket.name,
role: "roles/storage.objectCreator",
member: project.then(project => `serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com`),
});
const reader = new gcp.storage.BucketIAMMember("reader", {
bucket: bucket.name,
role: "roles/storage.legacyBucketReader",
member: project.then(project => `serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com`),
});
const keyUser = new gcp.kms.CryptoKeyIAMMember("key_user", {
cryptoKeyId: "kms-name",
role: "roles/cloudkms.cryptoKeyEncrypterDecrypter",
member: project.then(project => `serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com`),
});
const destinationConnectionProfile = new gcp.datastream.ConnectionProfile("destination_connection_profile", {
displayName: "Connection profile",
location: "us-central1",
connectionProfileId: "destination-profile",
gcsProfile: {
bucket: bucket.name,
rootPath: "/path",
},
});
const _default = new gcp.datastream.Stream("default", {
streamId: "my-stream",
desiredState: "NOT_STARTED",
location: "us-central1",
displayName: "my stream",
labels: {
key: "value",
},
sourceConfig: {
sourceConnectionProfile: sourceConnectionProfile.id,
mysqlSourceConfig: {
includeObjects: {
mysqlDatabases: [{
database: "my-database",
mysqlTables: [
{
table: "includedTable",
mysqlColumns: [{
column: "includedColumn",
dataType: "VARCHAR",
collation: "utf8mb4",
primaryKey: false,
nullable: false,
ordinalPosition: 0,
}],
},
{
table: "includedTable_2",
},
],
}],
},
excludeObjects: {
mysqlDatabases: [{
database: "my-database",
mysqlTables: [{
table: "excludedTable",
mysqlColumns: [{
column: "excludedColumn",
dataType: "VARCHAR",
collation: "utf8mb4",
primaryKey: false,
nullable: false,
ordinalPosition: 0,
}],
}],
}],
},
maxConcurrentCdcTasks: 5,
},
},
destinationConfig: {
destinationConnectionProfile: destinationConnectionProfile.id,
gcsDestinationConfig: {
path: "mydata",
fileRotationMb: 200,
fileRotationInterval: "60s",
jsonFileFormat: {
schemaFileFormat: "NO_SCHEMA_FILE",
compression: "GZIP",
},
},
},
backfillAll: {
mysqlExcludedObjects: {
mysqlDatabases: [{
database: "my-database",
mysqlTables: [{
table: "excludedTable",
mysqlColumns: [{
column: "excludedColumn",
dataType: "VARCHAR",
collation: "utf8mb4",
primaryKey: false,
nullable: false,
ordinalPosition: 0,
}],
}],
}],
},
},
customerManagedEncryptionKey: "kms-name",
}, {
dependsOn: [keyUser],
});
import pulumi
import pulumi_gcp as gcp
import pulumi_random as random
project = gcp.organizations.get_project()
instance = gcp.sql.DatabaseInstance("instance",
name="my-instance",
database_version="MYSQL_8_0",
region="us-central1",
settings={
"tier": "db-f1-micro",
"backup_configuration": {
"enabled": True,
"binary_log_enabled": True,
},
"ip_configuration": {
"authorized_networks": [
{
"value": "34.71.242.81",
},
{
"value": "34.72.28.29",
},
{
"value": "34.67.6.157",
},
{
"value": "34.67.234.134",
},
{
"value": "34.72.239.218",
},
],
},
},
deletion_protection=True)
db = gcp.sql.Database("db",
instance=instance.name,
name="db")
pwd = random.RandomPassword("pwd",
length=16,
special=False)
user = gcp.sql.User("user",
name="user",
instance=instance.name,
host="%",
password=pwd.result)
source_connection_profile = gcp.datastream.ConnectionProfile("source_connection_profile",
display_name="Source connection profile",
location="us-central1",
connection_profile_id="source-profile",
mysql_profile={
"hostname": instance.public_ip_address,
"username": user.name,
"password": user.password,
})
bucket = gcp.storage.Bucket("bucket",
name="my-bucket",
location="US",
uniform_bucket_level_access=True)
viewer = gcp.storage.BucketIAMMember("viewer",
bucket=bucket.name,
role="roles/storage.objectViewer",
member=f"serviceAccount:service-{project.number}@gcp-sa-datastream.iam.gserviceaccount.com")
creator = gcp.storage.BucketIAMMember("creator",
bucket=bucket.name,
role="roles/storage.objectCreator",
member=f"serviceAccount:service-{project.number}@gcp-sa-datastream.iam.gserviceaccount.com")
reader = gcp.storage.BucketIAMMember("reader",
bucket=bucket.name,
role="roles/storage.legacyBucketReader",
member=f"serviceAccount:service-{project.number}@gcp-sa-datastream.iam.gserviceaccount.com")
key_user = gcp.kms.CryptoKeyIAMMember("key_user",
crypto_key_id="kms-name",
role="roles/cloudkms.cryptoKeyEncrypterDecrypter",
member=f"serviceAccount:service-{project.number}@gcp-sa-datastream.iam.gserviceaccount.com")
destination_connection_profile = gcp.datastream.ConnectionProfile("destination_connection_profile",
display_name="Connection profile",
location="us-central1",
connection_profile_id="destination-profile",
gcs_profile={
"bucket": bucket.name,
"root_path": "/path",
})
default = gcp.datastream.Stream("default",
stream_id="my-stream",
desired_state="NOT_STARTED",
location="us-central1",
display_name="my stream",
labels={
"key": "value",
},
source_config={
"source_connection_profile": source_connection_profile.id,
"mysql_source_config": {
"include_objects": {
"mysql_databases": [{
"database": "my-database",
"mysql_tables": [
{
"table": "includedTable",
"mysql_columns": [{
"column": "includedColumn",
"data_type": "VARCHAR",
"collation": "utf8mb4",
"primary_key": False,
"nullable": False,
"ordinal_position": 0,
}],
},
{
"table": "includedTable_2",
},
],
}],
},
"exclude_objects": {
"mysql_databases": [{
"database": "my-database",
"mysql_tables": [{
"table": "excludedTable",
"mysql_columns": [{
"column": "excludedColumn",
"data_type": "VARCHAR",
"collation": "utf8mb4",
"primary_key": False,
"nullable": False,
"ordinal_position": 0,
}],
}],
}],
},
"max_concurrent_cdc_tasks": 5,
},
},
destination_config={
"destination_connection_profile": destination_connection_profile.id,
"gcs_destination_config": {
"path": "mydata",
"file_rotation_mb": 200,
"file_rotation_interval": "60s",
"json_file_format": {
"schema_file_format": "NO_SCHEMA_FILE",
"compression": "GZIP",
},
},
},
backfill_all={
"mysql_excluded_objects": {
"mysql_databases": [{
"database": "my-database",
"mysql_tables": [{
"table": "excludedTable",
"mysql_columns": [{
"column": "excludedColumn",
"data_type": "VARCHAR",
"collation": "utf8mb4",
"primary_key": False,
"nullable": False,
"ordinal_position": 0,
}],
}],
}],
},
},
customer_managed_encryption_key="kms-name",
opts = pulumi.ResourceOptions(depends_on=[key_user]))
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/kms"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/storage"
"github.com/pulumi/pulumi-random/sdk/v4/go/random"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
if err != nil {
return err
}
instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
Name: pulumi.String("my-instance"),
DatabaseVersion: pulumi.String("MYSQL_8_0"),
Region: pulumi.String("us-central1"),
Settings: &sql.DatabaseInstanceSettingsArgs{
Tier: pulumi.String("db-f1-micro"),
BackupConfiguration: &sql.DatabaseInstanceSettingsBackupConfigurationArgs{
Enabled: pulumi.Bool(true),
BinaryLogEnabled: pulumi.Bool(true),
},
IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.71.242.81"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.72.28.29"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.67.6.157"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.67.234.134"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.72.239.218"),
},
},
},
},
DeletionProtection: pulumi.Bool(true),
})
if err != nil {
return err
}
_, err = sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
Instance: instance.Name,
Name: pulumi.String("db"),
})
if err != nil {
return err
}
pwd, err := random.NewRandomPassword(ctx, "pwd", &random.RandomPasswordArgs{
Length: pulumi.Int(16),
Special: pulumi.Bool(false),
})
if err != nil {
return err
}
user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
Name: pulumi.String("user"),
Instance: instance.Name,
Host: pulumi.String("%"),
Password: pwd.Result,
})
if err != nil {
return err
}
sourceConnectionProfile, err := datastream.NewConnectionProfile(ctx, "source_connection_profile", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("Source connection profile"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("source-profile"),
MysqlProfile: &datastream.ConnectionProfileMysqlProfileArgs{
Hostname: instance.PublicIpAddress,
Username: user.Name,
Password: user.Password,
},
})
if err != nil {
return err
}
bucket, err := storage.NewBucket(ctx, "bucket", &storage.BucketArgs{
Name: pulumi.String("my-bucket"),
Location: pulumi.String("US"),
UniformBucketLevelAccess: pulumi.Bool(true),
})
if err != nil {
return err
}
_, err = storage.NewBucketIAMMember(ctx, "viewer", &storage.BucketIAMMemberArgs{
Bucket: bucket.Name,
Role: pulumi.String("roles/storage.objectViewer"),
Member: pulumi.Sprintf("serviceAccount:service-%v@gcp-sa-datastream.iam.gserviceaccount.com", project.Number),
})
if err != nil {
return err
}
_, err = storage.NewBucketIAMMember(ctx, "creator", &storage.BucketIAMMemberArgs{
Bucket: bucket.Name,
Role: pulumi.String("roles/storage.objectCreator"),
Member: pulumi.Sprintf("serviceAccount:service-%v@gcp-sa-datastream.iam.gserviceaccount.com", project.Number),
})
if err != nil {
return err
}
_, err = storage.NewBucketIAMMember(ctx, "reader", &storage.BucketIAMMemberArgs{
Bucket: bucket.Name,
Role: pulumi.String("roles/storage.legacyBucketReader"),
Member: pulumi.Sprintf("serviceAccount:service-%v@gcp-sa-datastream.iam.gserviceaccount.com", project.Number),
})
if err != nil {
return err
}
keyUser, err := kms.NewCryptoKeyIAMMember(ctx, "key_user", &kms.CryptoKeyIAMMemberArgs{
CryptoKeyId: pulumi.String("kms-name"),
Role: pulumi.String("roles/cloudkms.cryptoKeyEncrypterDecrypter"),
Member: pulumi.Sprintf("serviceAccount:service-%v@gcp-sa-datastream.iam.gserviceaccount.com", project.Number),
})
if err != nil {
return err
}
destinationConnectionProfile, err := datastream.NewConnectionProfile(ctx, "destination_connection_profile", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("Connection profile"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("destination-profile"),
GcsProfile: &datastream.ConnectionProfileGcsProfileArgs{
Bucket: bucket.Name,
RootPath: pulumi.String("/path"),
},
})
if err != nil {
return err
}
_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
StreamId: pulumi.String("my-stream"),
DesiredState: pulumi.String("NOT_STARTED"),
Location: pulumi.String("us-central1"),
DisplayName: pulumi.String("my stream"),
Labels: pulumi.StringMap{
"key": pulumi.String("value"),
},
SourceConfig: &datastream.StreamSourceConfigArgs{
SourceConnectionProfile: sourceConnectionProfile.ID(),
MysqlSourceConfig: &datastream.StreamSourceConfigMysqlSourceConfigArgs{
IncludeObjects: &datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs{
MysqlDatabases: datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArray{
&datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArgs{
Database: pulumi.String("my-database"),
MysqlTables: datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArray{
&datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs{
Table: pulumi.String("includedTable"),
MysqlColumns: datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableMysqlColumnArray{
&datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs{
Column: pulumi.String("includedColumn"),
DataType: pulumi.String("VARCHAR"),
Collation: pulumi.String("utf8mb4"),
PrimaryKey: pulumi.Bool(false),
Nullable: pulumi.Bool(false),
OrdinalPosition: pulumi.Int(0),
},
},
},
&datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs{
Table: pulumi.String("includedTable_2"),
},
},
},
},
},
ExcludeObjects: &datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsArgs{
MysqlDatabases: datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseArray{
&datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseArgs{
Database: pulumi.String("my-database"),
MysqlTables: datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableArray{
&datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableArgs{
Table: pulumi.String("excludedTable"),
MysqlColumns: datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableMysqlColumnArray{
&datastream.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs{
Column: pulumi.String("excludedColumn"),
DataType: pulumi.String("VARCHAR"),
Collation: pulumi.String("utf8mb4"),
PrimaryKey: pulumi.Bool(false),
Nullable: pulumi.Bool(false),
OrdinalPosition: pulumi.Int(0),
},
},
},
},
},
},
},
MaxConcurrentCdcTasks: pulumi.Int(5),
},
},
DestinationConfig: &datastream.StreamDestinationConfigArgs{
DestinationConnectionProfile: destinationConnectionProfile.ID(),
GcsDestinationConfig: &datastream.StreamDestinationConfigGcsDestinationConfigArgs{
Path: pulumi.String("mydata"),
FileRotationMb: pulumi.Int(200),
FileRotationInterval: pulumi.String("60s"),
JsonFileFormat: &datastream.StreamDestinationConfigGcsDestinationConfigJsonFileFormatArgs{
SchemaFileFormat: pulumi.String("NO_SCHEMA_FILE"),
Compression: pulumi.String("GZIP"),
},
},
},
BackfillAll: &datastream.StreamBackfillAllArgs{
MysqlExcludedObjects: &datastream.StreamBackfillAllMysqlExcludedObjectsArgs{
MysqlDatabases: datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseArray{
&datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseArgs{
Database: pulumi.String("my-database"),
MysqlTables: datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableArray{
&datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableArgs{
Table: pulumi.String("excludedTable"),
MysqlColumns: datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableMysqlColumnArray{
&datastream.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableMysqlColumnArgs{
Column: pulumi.String("excludedColumn"),
DataType: pulumi.String("VARCHAR"),
Collation: pulumi.String("utf8mb4"),
PrimaryKey: pulumi.Bool(false),
Nullable: pulumi.Bool(false),
OrdinalPosition: pulumi.Int(0),
},
},
},
},
},
},
},
},
CustomerManagedEncryptionKey: pulumi.String("kms-name"),
}, pulumi.DependsOn([]pulumi.Resource{
keyUser,
}))
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
using Random = Pulumi.Random;
return await Deployment.RunAsync(() =>
{
var project = Gcp.Organizations.GetProject.Invoke();
var instance = new Gcp.Sql.DatabaseInstance("instance", new()
{
Name = "my-instance",
DatabaseVersion = "MYSQL_8_0",
Region = "us-central1",
Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
{
Tier = "db-f1-micro",
BackupConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsBackupConfigurationArgs
{
Enabled = true,
BinaryLogEnabled = true,
},
IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
{
AuthorizedNetworks = new[]
{
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.71.242.81",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.72.28.29",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.67.6.157",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.67.234.134",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.72.239.218",
},
},
},
},
DeletionProtection = true,
});
var db = new Gcp.Sql.Database("db", new()
{
Instance = instance.Name,
Name = "db",
});
var pwd = new Random.RandomPassword("pwd", new()
{
Length = 16,
Special = false,
});
var user = new Gcp.Sql.User("user", new()
{
Name = "user",
Instance = instance.Name,
Host = "%",
Password = pwd.Result,
});
var sourceConnectionProfile = new Gcp.Datastream.ConnectionProfile("source_connection_profile", new()
{
DisplayName = "Source connection profile",
Location = "us-central1",
ConnectionProfileId = "source-profile",
MysqlProfile = new Gcp.Datastream.Inputs.ConnectionProfileMysqlProfileArgs
{
Hostname = instance.PublicIpAddress,
Username = user.Name,
Password = user.Password,
},
});
var bucket = new Gcp.Storage.Bucket("bucket", new()
{
Name = "my-bucket",
Location = "US",
UniformBucketLevelAccess = true,
});
var viewer = new Gcp.Storage.BucketIAMMember("viewer", new()
{
Bucket = bucket.Name,
Role = "roles/storage.objectViewer",
Member = project.Apply(getProjectResult => $"serviceAccount:service-{getProjectResult.Number}@gcp-sa-datastream.iam.gserviceaccount.com"),
});
var creator = new Gcp.Storage.BucketIAMMember("creator", new()
{
Bucket = bucket.Name,
Role = "roles/storage.objectCreator",
Member = project.Apply(getProjectResult => $"serviceAccount:service-{getProjectResult.Number}@gcp-sa-datastream.iam.gserviceaccount.com"),
});
var reader = new Gcp.Storage.BucketIAMMember("reader", new()
{
Bucket = bucket.Name,
Role = "roles/storage.legacyBucketReader",
Member = project.Apply(getProjectResult => $"serviceAccount:service-{getProjectResult.Number}@gcp-sa-datastream.iam.gserviceaccount.com"),
});
var keyUser = new Gcp.Kms.CryptoKeyIAMMember("key_user", new()
{
CryptoKeyId = "kms-name",
Role = "roles/cloudkms.cryptoKeyEncrypterDecrypter",
Member = project.Apply(getProjectResult => $"serviceAccount:service-{getProjectResult.Number}@gcp-sa-datastream.iam.gserviceaccount.com"),
});
var destinationConnectionProfile = new Gcp.Datastream.ConnectionProfile("destination_connection_profile", new()
{
DisplayName = "Connection profile",
Location = "us-central1",
ConnectionProfileId = "destination-profile",
GcsProfile = new Gcp.Datastream.Inputs.ConnectionProfileGcsProfileArgs
{
Bucket = bucket.Name,
RootPath = "/path",
},
});
var @default = new Gcp.Datastream.Stream("default", new()
{
StreamId = "my-stream",
DesiredState = "NOT_STARTED",
Location = "us-central1",
DisplayName = "my stream",
Labels =
{
{ "key", "value" },
},
SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
{
SourceConnectionProfile = sourceConnectionProfile.Id,
MysqlSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigArgs
{
IncludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs
{
MysqlDatabases = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArgs
{
Database = "my-database",
MysqlTables = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs
{
Table = "includedTable",
MysqlColumns = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs
{
Column = "includedColumn",
DataType = "VARCHAR",
Collation = "utf8mb4",
PrimaryKey = false,
Nullable = false,
OrdinalPosition = 0,
},
},
},
new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs
{
Table = "includedTable_2",
},
},
},
},
},
ExcludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsArgs
{
MysqlDatabases = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseArgs
{
Database = "my-database",
MysqlTables = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableArgs
{
Table = "excludedTable",
MysqlColumns = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs
{
Column = "excludedColumn",
DataType = "VARCHAR",
Collation = "utf8mb4",
PrimaryKey = false,
Nullable = false,
OrdinalPosition = 0,
},
},
},
},
},
},
},
MaxConcurrentCdcTasks = 5,
},
},
DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
{
DestinationConnectionProfile = destinationConnectionProfile.Id,
GcsDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigGcsDestinationConfigArgs
{
Path = "mydata",
FileRotationMb = 200,
FileRotationInterval = "60s",
JsonFileFormat = new Gcp.Datastream.Inputs.StreamDestinationConfigGcsDestinationConfigJsonFileFormatArgs
{
SchemaFileFormat = "NO_SCHEMA_FILE",
Compression = "GZIP",
},
},
},
BackfillAll = new Gcp.Datastream.Inputs.StreamBackfillAllArgs
{
MysqlExcludedObjects = new Gcp.Datastream.Inputs.StreamBackfillAllMysqlExcludedObjectsArgs
{
MysqlDatabases = new[]
{
new Gcp.Datastream.Inputs.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseArgs
{
Database = "my-database",
MysqlTables = new[]
{
new Gcp.Datastream.Inputs.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableArgs
{
Table = "excludedTable",
MysqlColumns = new[]
{
new Gcp.Datastream.Inputs.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableMysqlColumnArgs
{
Column = "excludedColumn",
DataType = "VARCHAR",
Collation = "utf8mb4",
PrimaryKey = false,
Nullable = false,
OrdinalPosition = 0,
},
},
},
},
},
},
},
},
CustomerManagedEncryptionKey = "kms-name",
}, new CustomResourceOptions
{
DependsOn =
{
keyUser,
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsBackupConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import com.pulumi.random.RandomPassword;
import com.pulumi.random.RandomPasswordArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileMysqlProfileArgs;
import com.pulumi.gcp.storage.Bucket;
import com.pulumi.gcp.storage.BucketArgs;
import com.pulumi.gcp.storage.BucketIAMMember;
import com.pulumi.gcp.storage.BucketIAMMemberArgs;
import com.pulumi.gcp.kms.CryptoKeyIAMMember;
import com.pulumi.gcp.kms.CryptoKeyIAMMemberArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileGcsProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigGcsDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigGcsDestinationConfigJsonFileFormatArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllMysqlExcludedObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableMysqlColumnArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.Map;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
.build());
var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
.name("my-instance")
.databaseVersion("MYSQL_8_0")
.region("us-central1")
.settings(DatabaseInstanceSettingsArgs.builder()
.tier("db-f1-micro")
.backupConfiguration(DatabaseInstanceSettingsBackupConfigurationArgs.builder()
.enabled(true)
.binaryLogEnabled(true)
.build())
.ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
.authorizedNetworks(
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.71.242.81")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.72.28.29")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.67.6.157")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.67.234.134")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.72.239.218")
.build())
.build())
.build())
.deletionProtection(true)
.build());
var db = new Database("db", DatabaseArgs.builder()
.instance(instance.name())
.name("db")
.build());
var pwd = new RandomPassword("pwd", RandomPasswordArgs.builder()
.length(16)
.special(false)
.build());
var user = new User("user", UserArgs.builder()
.name("user")
.instance(instance.name())
.host("%")
.password(pwd.result())
.build());
var sourceConnectionProfile = new ConnectionProfile("sourceConnectionProfile", ConnectionProfileArgs.builder()
.displayName("Source connection profile")
.location("us-central1")
.connectionProfileId("source-profile")
.mysqlProfile(ConnectionProfileMysqlProfileArgs.builder()
.hostname(instance.publicIpAddress())
.username(user.name())
.password(user.password())
.build())
.build());
var bucket = new Bucket("bucket", BucketArgs.builder()
.name("my-bucket")
.location("US")
.uniformBucketLevelAccess(true)
.build());
var viewer = new BucketIAMMember("viewer", BucketIAMMemberArgs.builder()
.bucket(bucket.name())
.role("roles/storage.objectViewer")
.member(project.applyValue(getProjectResult -> String.format("serviceAccount:service-%s@gcp-sa-datastream.iam.gserviceaccount.com", getProjectResult.number())))
.build());
var creator = new BucketIAMMember("creator", BucketIAMMemberArgs.builder()
.bucket(bucket.name())
.role("roles/storage.objectCreator")
.member(project.applyValue(getProjectResult -> String.format("serviceAccount:service-%s@gcp-sa-datastream.iam.gserviceaccount.com", getProjectResult.number())))
.build());
var reader = new BucketIAMMember("reader", BucketIAMMemberArgs.builder()
.bucket(bucket.name())
.role("roles/storage.legacyBucketReader")
.member(project.applyValue(getProjectResult -> String.format("serviceAccount:service-%s@gcp-sa-datastream.iam.gserviceaccount.com", getProjectResult.number())))
.build());
var keyUser = new CryptoKeyIAMMember("keyUser", CryptoKeyIAMMemberArgs.builder()
.cryptoKeyId("kms-name")
.role("roles/cloudkms.cryptoKeyEncrypterDecrypter")
.member(project.applyValue(getProjectResult -> String.format("serviceAccount:service-%s@gcp-sa-datastream.iam.gserviceaccount.com", getProjectResult.number())))
.build());
var destinationConnectionProfile = new ConnectionProfile("destinationConnectionProfile", ConnectionProfileArgs.builder()
.displayName("Connection profile")
.location("us-central1")
.connectionProfileId("destination-profile")
.gcsProfile(ConnectionProfileGcsProfileArgs.builder()
.bucket(bucket.name())
.rootPath("/path")
.build())
.build());
var default_ = new Stream("default", StreamArgs.builder()
.streamId("my-stream")
.desiredState("NOT_STARTED")
.location("us-central1")
.displayName("my stream")
.labels(Map.of("key", "value"))
.sourceConfig(StreamSourceConfigArgs.builder()
.sourceConnectionProfile(sourceConnectionProfile.id())
.mysqlSourceConfig(StreamSourceConfigMysqlSourceConfigArgs.builder()
.includeObjects(StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs.builder()
.mysqlDatabases(StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArgs.builder()
.database("my-database")
.mysqlTables(
StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs.builder()
.table("includedTable")
.mysqlColumns(StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs.builder()
.column("includedColumn")
.dataType("VARCHAR")
.collation("utf8mb4")
.primaryKey(false)
.nullable(false)
.ordinalPosition(0)
.build())
.build(),
StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseMysqlTableArgs.builder()
.table("includedTable_2")
.build())
.build())
.build())
.excludeObjects(StreamSourceConfigMysqlSourceConfigExcludeObjectsArgs.builder()
.mysqlDatabases(StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseArgs.builder()
.database("my-database")
.mysqlTables(StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableArgs.builder()
.table("excludedTable")
.mysqlColumns(StreamSourceConfigMysqlSourceConfigExcludeObjectsMysqlDatabaseMysqlTableMysqlColumnArgs.builder()
.column("excludedColumn")
.dataType("VARCHAR")
.collation("utf8mb4")
.primaryKey(false)
.nullable(false)
.ordinalPosition(0)
.build())
.build())
.build())
.build())
.maxConcurrentCdcTasks(5)
.build())
.build())
.destinationConfig(StreamDestinationConfigArgs.builder()
.destinationConnectionProfile(destinationConnectionProfile.id())
.gcsDestinationConfig(StreamDestinationConfigGcsDestinationConfigArgs.builder()
.path("mydata")
.fileRotationMb(200)
.fileRotationInterval("60s")
.jsonFileFormat(StreamDestinationConfigGcsDestinationConfigJsonFileFormatArgs.builder()
.schemaFileFormat("NO_SCHEMA_FILE")
.compression("GZIP")
.build())
.build())
.build())
.backfillAll(StreamBackfillAllArgs.builder()
.mysqlExcludedObjects(StreamBackfillAllMysqlExcludedObjectsArgs.builder()
.mysqlDatabases(StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseArgs.builder()
.database("my-database")
.mysqlTables(StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableArgs.builder()
.table("excludedTable")
.mysqlColumns(StreamBackfillAllMysqlExcludedObjectsMysqlDatabaseMysqlTableMysqlColumnArgs.builder()
.column("excludedColumn")
.dataType("VARCHAR")
.collation("utf8mb4")
.primaryKey(false)
.nullable(false)
.ordinalPosition(0)
.build())
.build())
.build())
.build())
.build())
.customerManagedEncryptionKey("kms-name")
.build(), CustomResourceOptions.builder()
.dependsOn(keyUser)
.build());
}
}
resources:
instance:
type: gcp:sql:DatabaseInstance
properties:
name: my-instance
databaseVersion: MYSQL_8_0
region: us-central1
settings:
tier: db-f1-micro
backupConfiguration:
enabled: true
binaryLogEnabled: true
ipConfiguration:
authorizedNetworks:
- value: 34.71.242.81
- value: 34.72.28.29
- value: 34.67.6.157
- value: 34.67.234.134
- value: 34.72.239.218
deletionProtection: true
db:
type: gcp:sql:Database
properties:
instance: ${instance.name}
name: db
pwd:
type: random:Password
properties:
length: 16
special: false
user:
type: gcp:sql:User
properties:
name: user
instance: ${instance.name}
host: '%'
password: ${pwd.result}
sourceConnectionProfile:
type: gcp:datastream:ConnectionProfile
name: source_connection_profile
properties:
displayName: Source connection profile
location: us-central1
connectionProfileId: source-profile
mysqlProfile:
hostname: ${instance.publicIpAddress}
username: ${user.name}
password: ${user.password}
bucket:
type: gcp:storage:Bucket
properties:
name: my-bucket
location: US
uniformBucketLevelAccess: true
viewer:
type: gcp:storage:BucketIAMMember
properties:
bucket: ${bucket.name}
role: roles/storage.objectViewer
member: serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com
creator:
type: gcp:storage:BucketIAMMember
properties:
bucket: ${bucket.name}
role: roles/storage.objectCreator
member: serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com
reader:
type: gcp:storage:BucketIAMMember
properties:
bucket: ${bucket.name}
role: roles/storage.legacyBucketReader
member: serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com
keyUser:
type: gcp:kms:CryptoKeyIAMMember
name: key_user
properties:
cryptoKeyId: kms-name
role: roles/cloudkms.cryptoKeyEncrypterDecrypter
member: serviceAccount:service-${project.number}@gcp-sa-datastream.iam.gserviceaccount.com
destinationConnectionProfile:
type: gcp:datastream:ConnectionProfile
name: destination_connection_profile
properties:
displayName: Connection profile
location: us-central1
connectionProfileId: destination-profile
gcsProfile:
bucket: ${bucket.name}
rootPath: /path
default:
type: gcp:datastream:Stream
properties:
streamId: my-stream
desiredState: NOT_STARTED
location: us-central1
displayName: my stream
labels:
key: value
sourceConfig:
sourceConnectionProfile: ${sourceConnectionProfile.id}
mysqlSourceConfig:
includeObjects:
mysqlDatabases:
- database: my-database
mysqlTables:
- table: includedTable
mysqlColumns:
- column: includedColumn
dataType: VARCHAR
collation: utf8mb4
primaryKey: false
nullable: false
ordinalPosition: 0
- table: includedTable_2
excludeObjects:
mysqlDatabases:
- database: my-database
mysqlTables:
- table: excludedTable
mysqlColumns:
- column: excludedColumn
dataType: VARCHAR
collation: utf8mb4
primaryKey: false
nullable: false
ordinalPosition: 0
maxConcurrentCdcTasks: 5
destinationConfig:
destinationConnectionProfile: ${destinationConnectionProfile.id}
gcsDestinationConfig:
path: mydata
fileRotationMb: 200
fileRotationInterval: 60s
jsonFileFormat:
schemaFileFormat: NO_SCHEMA_FILE
compression: GZIP
backfillAll:
mysqlExcludedObjects:
mysqlDatabases:
- database: my-database
mysqlTables:
- table: excludedTable
mysqlColumns:
- column: excludedColumn
dataType: VARCHAR
collation: utf8mb4
primaryKey: false
nullable: false
ordinalPosition: 0
customerManagedEncryptionKey: kms-name
options:
dependsOn:
- ${keyUser}
variables:
project:
fn::invoke:
function: gcp:organizations:getProject
arguments: {}
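The sourceConfig references the MySQL connection profile and uses includeObjects and excludeObjects to select which databases, tables, and columns are replicated; maxConcurrentCdcTasks caps the number of concurrent CDC tasks. The destinationConfig points to the Cloud Storage connection profile, rotates output files at 200 MB or every 60 seconds, and writes gzip-compressed JSON without schema files. The backfillAll property loads existing historical data when the stream starts, skipping the objects listed in mysqlExcludedObjects. The customerManagedEncryptionKey property encrypts the stream's data with a Cloud KMS key ("kms-name" is a placeholder), and the dependsOn on keyUser ensures the Datastream service account holds roles/cloudkms.cryptoKeyEncrypterDecrypter before the stream is created. Because desiredState is NOT_STARTED, the stream is created but not started.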
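The example spells out the same exclusion (excludedColumn of excludedTable in my-database) twice: once under excludeObjects and once under backfillAll.mysqlExcludedObjects. The following is a minimal sketch, assuming you build those arguments in TypeScript: a hypothetical toMysqlRdbms helper expands a flat database/table/column map into the nested mysqlDatabases shape so one object can feed both fields. The interfaces are local stand-ins, not types exported by @pulumi/gcp, and the resulting object is a fragment to merge into the full Stream arguments.

```typescript
// Local stand-ins for the filter shapes used in the example. These are
// assumptions for illustration, not types exported by @pulumi/gcp.
interface MysqlColumn { column: string }
interface MysqlTable { table: string; mysqlColumns?: MysqlColumn[] }
interface MysqlDatabase { database: string; mysqlTables: MysqlTable[] }
interface MysqlRdbms { mysqlDatabases: MysqlDatabase[] }

// database name -> table name -> column names; an empty column list selects the whole table.
type FilterSpec = Record<string, Record<string, string[]>>;

// Hypothetical helper: expands a flat spec into the nested mysqlDatabases shape.
function toMysqlRdbms(spec: FilterSpec): MysqlRdbms {
    return {
        mysqlDatabases: Object.keys(spec).map((database) => {
            const tables = spec[database];
            return {
                database,
                mysqlTables: Object.keys(tables).map((table): MysqlTable => {
                    const columns = tables[table];
                    return columns.length > 0
                        ? { table, mysqlColumns: columns.map((column) => ({ column })) }
                        : { table };
                }),
            };
        }),
    };
}

// Build the exclusion once and reuse it for CDC filtering and for backfill.
const excluded = toMysqlRdbms({ "my-database": { excludedTable: ["excludedColumn"] } });
const streamFragment = {
    sourceConfig: { mysqlSourceConfig: { excludeObjects: excluded } },
    backfillAll: { mysqlExcludedObjects: excluded },
};

console.log(JSON.stringify(streamFragment, null, 2));
```

Sharing one object keeps the CDC exclusion and the backfill exclusion from drifting apart. The helper sets only column names; add dataType, collation, and the other column properties from the example if you need them.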
Stream PostgreSQL to BigQuery with publication slots
Change data capture from PostgreSQL requires a publication and a logical replication slot on the source database; the stream refers to both by name.
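Both objects have to be created on the database itself before the stream starts. The sketch below generates that SQL with a hypothetical postgresCdcSetupSql helper. It assumes logical decoding is already enabled on the server (on Cloud SQL, the cloudsql.logical_decoding flag) and that the slot uses the pgoutput plugin. Identifiers are not quoted, so the helper accepts only simple lower-case names; reserved words such as table would need quoting by hand. Passing an explicit table list avoids FOR ALL TABLES, which PostgreSQL reserves for superusers.

```typescript
// Hypothetical helper: produces the SQL to run on the source PostgreSQL database
// before deploying the stream. Assumes logical decoding is enabled and the
// pgoutput plugin is used; identifiers are not quoted.
const SIMPLE_IDENT = /^[a-z_][a-z0-9_]*$/;

function assertIdent(name: string): void {
    if (!SIMPLE_IDENT.test(name)) {
        throw new Error(`unsupported identifier: ${name}`);
    }
}

// tables are "schema.table" names; an empty list publishes every table.
function postgresCdcSetupSql(publication: string, slot: string, tables: string[] = []): string[] {
    assertIdent(publication);
    assertIdent(slot);
    tables.forEach((qualified) => qualified.split(".").forEach((part) => assertIdent(part)));
    const target = tables.length > 0 ? `FOR TABLE ${tables.join(", ")}` : "FOR ALL TABLES";
    return [
        `CREATE PUBLICATION ${publication} ${target};`,
        `SELECT PG_CREATE_LOGICAL_REPLICATION_SLOT('${slot}', 'pgoutput');`,
    ];
}

// Names match the publication and replicationSlot values used by the stream.
for (const statement of postgresCdcSetupSql("publication", "replication_slot")) {
    console.log(statement);
}
// CREATE PUBLICATION publication FOR ALL TABLES;
// SELECT PG_CREATE_LOGICAL_REPLICATION_SLOT('replication_slot', 'pgoutput');
```

Running the statements is left to your usual migration or admin tooling; the helper only keeps the names in one place.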
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const source = new gcp.datastream.ConnectionProfile("source", {
displayName: "Postgresql Source",
location: "us-central1",
connectionProfileId: "source-profile",
postgresqlProfile: {
hostname: "hostname",
port: 5432,
username: "user",
password: "pass",
database: "postgres",
},
});
const destination = new gcp.datastream.ConnectionProfile("destination", {
displayName: "BigQuery Destination",
location: "us-central1",
connectionProfileId: "destination-profile",
bigqueryProfile: {},
});
const _default = new gcp.datastream.Stream("default", {
displayName: "Postgres to BigQuery",
location: "us-central1",
streamId: "my-stream",
desiredState: "RUNNING",
sourceConfig: {
sourceConnectionProfile: source.id,
postgresqlSourceConfig: {
maxConcurrentBackfillTasks: 12,
publication: "publication",
replicationSlot: "replication_slot",
includeObjects: {
postgresqlSchemas: [{
schema: "schema",
postgresqlTables: [{
table: "table",
postgresqlColumns: [{
column: "column",
}],
}],
}],
},
excludeObjects: {
postgresqlSchemas: [{
schema: "schema",
postgresqlTables: [{
table: "table",
postgresqlColumns: [{
column: "column",
}],
}],
}],
},
},
},
destinationConfig: {
destinationConnectionProfile: destination.id,
bigqueryDestinationConfig: {
dataFreshness: "900s",
sourceHierarchyDatasets: {
datasetTemplate: {
location: "us-central1",
},
},
},
},
backfillAll: {
postgresqlExcludedObjects: {
postgresqlSchemas: [{
schema: "schema",
postgresqlTables: [{
table: "table",
postgresqlColumns: [{
column: "column",
}],
}],
}],
},
},
});
import pulumi
import pulumi_gcp as gcp
source = gcp.datastream.ConnectionProfile("source",
display_name="Postgresql Source",
location="us-central1",
connection_profile_id="source-profile",
postgresql_profile={
"hostname": "hostname",
"port": 5432,
"username": "user",
"password": "pass",
"database": "postgres",
})
destination = gcp.datastream.ConnectionProfile("destination",
display_name="BigQuery Destination",
location="us-central1",
connection_profile_id="destination-profile",
bigquery_profile={})
default = gcp.datastream.Stream("default",
display_name="Postgres to BigQuery",
location="us-central1",
stream_id="my-stream",
desired_state="RUNNING",
source_config={
"source_connection_profile": source.id,
"postgresql_source_config": {
"max_concurrent_backfill_tasks": 12,
"publication": "publication",
"replication_slot": "replication_slot",
"include_objects": {
"postgresql_schemas": [{
"schema": "schema",
"postgresql_tables": [{
"table": "table",
"postgresql_columns": [{
"column": "column",
}],
}],
}],
},
"exclude_objects": {
"postgresql_schemas": [{
"schema": "schema",
"postgresql_tables": [{
"table": "table",
"postgresql_columns": [{
"column": "column",
}],
}],
}],
},
},
},
destination_config={
"destination_connection_profile": destination.id,
"bigquery_destination_config": {
"data_freshness": "900s",
"source_hierarchy_datasets": {
"dataset_template": {
"location": "us-central1",
},
},
},
},
backfill_all={
"postgresql_excluded_objects": {
"postgresql_schemas": [{
"schema": "schema",
"postgresql_tables": [{
"table": "table",
"postgresql_columns": [{
"column": "column",
}],
}],
}],
},
})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
source, err := datastream.NewConnectionProfile(ctx, "source", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("Postgresql Source"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("source-profile"),
PostgresqlProfile: &datastream.ConnectionProfilePostgresqlProfileArgs{
Hostname: pulumi.String("hostname"),
Port: pulumi.Int(5432),
Username: pulumi.String("user"),
Password: pulumi.String("pass"),
Database: pulumi.String("postgres"),
},
})
if err != nil {
return err
}
destination, err := datastream.NewConnectionProfile(ctx, "destination", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("BigQuery Destination"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("destination-profile"),
BigqueryProfile: &datastream.ConnectionProfileBigqueryProfileArgs{},
})
if err != nil {
return err
}
_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
DisplayName: pulumi.String("Postgres to BigQuery"),
Location: pulumi.String("us-central1"),
StreamId: pulumi.String("my-stream"),
DesiredState: pulumi.String("RUNNING"),
SourceConfig: &datastream.StreamSourceConfigArgs{
SourceConnectionProfile: source.ID(),
PostgresqlSourceConfig: &datastream.StreamSourceConfigPostgresqlSourceConfigArgs{
MaxConcurrentBackfillTasks: pulumi.Int(12),
Publication: pulumi.String("publication"),
ReplicationSlot: pulumi.String("replication_slot"),
IncludeObjects: &datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsArgs{
PostgresqlSchemas: datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaArray{
&datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaArgs{
Schema: pulumi.String("schema"),
PostgresqlTables: datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTableArray{
&datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTableArgs{
Table: pulumi.String("table"),
PostgresqlColumns: datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArray{
&datastream.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs{
Column: pulumi.String("column"),
},
},
},
},
},
},
},
ExcludeObjects: &datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsArgs{
PostgresqlSchemas: datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaArray{
&datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaArgs{
Schema: pulumi.String("schema"),
PostgresqlTables: datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTableArray{
&datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTableArgs{
Table: pulumi.String("table"),
PostgresqlColumns: datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArray{
&datastream.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs{
Column: pulumi.String("column"),
},
},
},
},
},
},
},
},
},
DestinationConfig: &datastream.StreamDestinationConfigArgs{
DestinationConnectionProfile: destination.ID(),
BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
DataFreshness: pulumi.String("900s"),
SourceHierarchyDatasets: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs{
DatasetTemplate: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs{
Location: pulumi.String("us-central1"),
},
},
},
},
BackfillAll: &datastream.StreamBackfillAllArgs{
PostgresqlExcludedObjects: &datastream.StreamBackfillAllPostgresqlExcludedObjectsArgs{
PostgresqlSchemas: datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaArray{
&datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaArgs{
Schema: pulumi.String("schema"),
PostgresqlTables: datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTableArray{
&datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTableArgs{
Table: pulumi.String("table"),
PostgresqlColumns: datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArray{
&datastream.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs{
Column: pulumi.String("column"),
},
},
},
},
},
},
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var source = new Gcp.Datastream.ConnectionProfile("source", new()
{
DisplayName = "Postgresql Source",
Location = "us-central1",
ConnectionProfileId = "source-profile",
PostgresqlProfile = new Gcp.Datastream.Inputs.ConnectionProfilePostgresqlProfileArgs
{
Hostname = "hostname",
Port = 5432,
Username = "user",
Password = "pass",
Database = "postgres",
},
});
var destination = new Gcp.Datastream.ConnectionProfile("destination", new()
{
DisplayName = "BigQuery Destination",
Location = "us-central1",
ConnectionProfileId = "destination-profile",
BigqueryProfile = new Gcp.Datastream.Inputs.ConnectionProfileBigqueryProfileArgs(),
});
var @default = new Gcp.Datastream.Stream("default", new()
{
DisplayName = "Postgres to BigQuery",
Location = "us-central1",
StreamId = "my-stream",
DesiredState = "RUNNING",
SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
{
SourceConnectionProfile = source.Id,
PostgresqlSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigArgs
{
MaxConcurrentBackfillTasks = 12,
Publication = "publication",
ReplicationSlot = "replication_slot",
IncludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsArgs
{
PostgresqlSchemas = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaArgs
{
Schema = "schema",
PostgresqlTables = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTableArgs
{
Table = "table",
PostgresqlColumns = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs
{
Column = "column",
},
},
},
},
},
},
},
ExcludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsArgs
{
PostgresqlSchemas = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaArgs
{
Schema = "schema",
PostgresqlTables = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTableArgs
{
Table = "table",
PostgresqlColumns = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs
{
Column = "column",
},
},
},
},
},
},
},
},
},
DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
{
DestinationConnectionProfile = destination.Id,
BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
{
DataFreshness = "900s",
SourceHierarchyDatasets = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs
{
DatasetTemplate = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs
{
Location = "us-central1",
},
},
},
},
BackfillAll = new Gcp.Datastream.Inputs.StreamBackfillAllArgs
{
PostgresqlExcludedObjects = new Gcp.Datastream.Inputs.StreamBackfillAllPostgresqlExcludedObjectsArgs
{
PostgresqlSchemas = new[]
{
new Gcp.Datastream.Inputs.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaArgs
{
Schema = "schema",
PostgresqlTables = new[]
{
new Gcp.Datastream.Inputs.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTableArgs
{
Table = "table",
PostgresqlColumns = new[]
{
new Gcp.Datastream.Inputs.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs
{
Column = "column",
},
},
},
},
},
},
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfilePostgresqlProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileBigqueryProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigArgs;
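import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTableArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTableArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllPostgresqlExcludedObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTableArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs;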
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var source = new ConnectionProfile("source", ConnectionProfileArgs.builder()
.displayName("Postgresql Source")
.location("us-central1")
.connectionProfileId("source-profile")
.postgresqlProfile(ConnectionProfilePostgresqlProfileArgs.builder()
.hostname("hostname")
.port(5432)
.username("user")
.password("pass")
.database("postgres")
.build())
.build());
var destination = new ConnectionProfile("destination", ConnectionProfileArgs.builder()
.displayName("BigQuery Destination")
.location("us-central1")
.connectionProfileId("destination-profile")
.bigqueryProfile(ConnectionProfileBigqueryProfileArgs.builder()
.build())
.build());
var default_ = new Stream("default", StreamArgs.builder()
.displayName("Postgres to BigQuery")
.location("us-central1")
.streamId("my-stream")
.desiredState("RUNNING")
.sourceConfig(StreamSourceConfigArgs.builder()
.sourceConnectionProfile(source.id())
.postgresqlSourceConfig(StreamSourceConfigPostgresqlSourceConfigArgs.builder()
.maxConcurrentBackfillTasks(12)
.publication("publication")
.replicationSlot("replication_slot")
.includeObjects(StreamSourceConfigPostgresqlSourceConfigIncludeObjectsArgs.builder()
.postgresqlSchemas(StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaArgs.builder()
.schema("schema")
.postgresqlTables(StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTableArgs.builder()
.table("table")
.postgresqlColumns(StreamSourceConfigPostgresqlSourceConfigIncludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs.builder()
.column("column")
.build())
.build())
.build())
.build())
.excludeObjects(StreamSourceConfigPostgresqlSourceConfigExcludeObjectsArgs.builder()
.postgresqlSchemas(StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaArgs.builder()
.schema("schema")
.postgresqlTables(StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTableArgs.builder()
.table("table")
.postgresqlColumns(StreamSourceConfigPostgresqlSourceConfigExcludeObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs.builder()
.column("column")
.build())
.build())
.build())
.build())
.build())
.build())
.destinationConfig(StreamDestinationConfigArgs.builder()
.destinationConnectionProfile(destination.id())
.bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
.dataFreshness("900s")
.sourceHierarchyDatasets(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs.builder()
.datasetTemplate(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs.builder()
.location("us-central1")
.build())
.build())
.build())
.build())
.backfillAll(StreamBackfillAllArgs.builder()
.postgresqlExcludedObjects(StreamBackfillAllPostgresqlExcludedObjectsArgs.builder()
.postgresqlSchemas(StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaArgs.builder()
.schema("schema")
.postgresqlTables(StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTableArgs.builder()
.table("table")
.postgresqlColumns(StreamBackfillAllPostgresqlExcludedObjectsPostgresqlSchemaPostgresqlTablePostgresqlColumnArgs.builder()
.column("column")
.build())
.build())
.build())
.build())
.build())
.build());
}
}
resources:
source:
type: gcp:datastream:ConnectionProfile
properties:
displayName: Postgresql Source
location: us-central1
connectionProfileId: source-profile
postgresqlProfile:
hostname: hostname
port: 5432
username: user
password: pass
database: postgres
destination:
type: gcp:datastream:ConnectionProfile
properties:
displayName: BigQuery Destination
location: us-central1
connectionProfileId: destination-profile
bigqueryProfile: {}
default:
type: gcp:datastream:Stream
properties:
displayName: Postgres to BigQuery
location: us-central1
streamId: my-stream
desiredState: RUNNING
sourceConfig:
sourceConnectionProfile: ${source.id}
postgresqlSourceConfig:
maxConcurrentBackfillTasks: 12
publication: publication
replicationSlot: replication_slot
includeObjects:
postgresqlSchemas:
- schema: schema
postgresqlTables:
- table: table
postgresqlColumns:
- column: column
excludeObjects:
postgresqlSchemas:
- schema: schema
postgresqlTables:
- table: table
postgresqlColumns:
- column: column
destinationConfig:
destinationConnectionProfile: ${destination.id}
bigqueryDestinationConfig:
dataFreshness: 900s
sourceHierarchyDatasets:
datasetTemplate:
location: us-central1
backfillAll:
postgresqlExcludedObjects:
postgresqlSchemas:
- schema: schema
postgresqlTables:
- table: table
postgresqlColumns:
- column: column
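The postgresqlSourceConfig names a publication and a replication slot, both of which must already exist in the PostgreSQL database, and maxConcurrentBackfillTasks limits how many backfill tasks run in parallel. The bigqueryDestinationConfig uses sourceHierarchyDatasets so that the BigQuery datasets mirror the source hierarchy (one dataset per source schema), created in the location given by datasetTemplate. The dataFreshness property (900s, or 15 minutes, here) bounds how stale the BigQuery tables may be when queried; lower values return fresher data but can cost more. Because desiredState is RUNNING, the stream starts as soon as it is created.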
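Fields such as dataFreshness take a duration string rather than a number. The sketch below assumes the protobuf Duration JSON form (non-negative decimal seconds with at most nine fractional digits, followed by "s") and shows hypothetical helpers for producing and checking such values, for example when freshness comes from stack configuration as a number of seconds. The helper names are illustrative, not part of @pulumi/gcp.

```typescript
// Hypothetical helpers for Datastream duration strings such as dataFreshness.
// Assumes the protobuf Duration JSON form: decimal seconds with at most nine
// fractional digits followed by "s" (for example "900s" or "3.5s").
const DURATION_PATTERN = /^\d+(\.\d{1,9})?s$/;
const MAX_DURATION_SECONDS = 315576000000; // protobuf Duration upper bound

function toDuration(seconds: number): string {
    if (!isFinite(seconds) || seconds < 0 || seconds > MAX_DURATION_SECONDS) {
        throw new Error(`duration out of range: ${seconds}`);
    }
    // toFixed(9) keeps nanosecond precision; then drop trailing zeros and a bare dot.
    return seconds.toFixed(9).replace(/\.?0+$/, "") + "s";
}

function isValidDuration(value: string): boolean {
    return DURATION_PATTERN.test(value);
}

function durationSeconds(value: string): number {
    if (!isValidDuration(value)) {
        throw new Error(`not a duration string: ${value}`);
    }
    return parseFloat(value.slice(0, -1));
}

const fifteenMinutes = toDuration(15 * 60);
console.log(fifteenMinutes);          // 900s
console.log(durationSeconds("3.5s")); // 3.5
console.log(isValidDuration("15m"));  // false
```

Validating the string before deployment surfaces a typo such as "15m" at preview time instead of as an API error.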
Stream SQL Server using transaction logs
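SQL Server streams can read changes directly from the database transaction logs. This example selects that CDC method with an empty transactionLogs block and skips historical backfill with backfillNone, so only changes made after the stream starts are replicated.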
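The stream in this section chooses its CDC method by setting transactionLogs to an empty object. The sketch below assumes the provider also offers a sibling changeTables option for reading from CDC change tables (as in the Terraform google_datastream_stream SQL Server source schema); check the Stream API reference before relying on it. A hypothetical sqlServerSourceConfig helper builds the source block with exactly one method set, using local stand-in types rather than @pulumi/gcp declarations.

```typescript
// Local stand-in types (assumptions, not the @pulumi/gcp declarations). The
// changeTables option is assumed from the provider's SQL Server source schema.
type CdcMethod = "transactionLogs" | "changeTables";

interface SqlServerTable { table: string }
interface SqlServerSchema { schema: string; tables: SqlServerTable[] }
interface SqlServerSourceConfig {
    includeObjects: { schemas: SqlServerSchema[] };
    transactionLogs?: Record<string, never>;
    changeTables?: Record<string, never>;
}

// Hypothetical helper: builds sqlServerSourceConfig with exactly one CDC method.
function sqlServerSourceConfig(method: CdcMethod, schemaTables: Record<string, string[]>): SqlServerSourceConfig {
    const includeObjects = {
        schemas: Object.keys(schemaTables).map((schema) => ({
            schema,
            tables: schemaTables[schema].map((table) => ({ table })),
        })),
    };
    // An empty object selects the method, mirroring `transactionLogs: {}` in the example.
    return method === "transactionLogs"
        ? { includeObjects, transactionLogs: {} }
        : { includeObjects, changeTables: {} };
}

const fromLogs = sqlServerSourceConfig("transactionLogs", { schema: ["table"] });
console.log(JSON.stringify(fromLogs));
```

Encoding the choice as a single union-typed parameter makes it impossible to set both methods, or neither, by accident.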
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const instance = new gcp.sql.DatabaseInstance("instance", {
name: "sql-server",
databaseVersion: "SQLSERVER_2019_STANDARD",
region: "us-central1",
rootPassword: "root-password",
deletionProtection: true,
settings: {
tier: "db-custom-2-4096",
ipConfiguration: {
authorizedNetworks: [
{
value: "34.71.242.81",
},
{
value: "34.72.28.29",
},
{
value: "34.67.6.157",
},
{
value: "34.67.234.134",
},
{
value: "34.72.239.218",
},
],
},
},
});
const user = new gcp.sql.User("user", {
name: "user",
instance: instance.name,
password: "password",
});
const db = new gcp.sql.Database("db", {
name: "db",
instance: instance.name,
}, {
dependsOn: [user],
});
const source = new gcp.datastream.ConnectionProfile("source", {
displayName: "SQL Server Source",
location: "us-central1",
connectionProfileId: "source-profile",
sqlServerProfile: {
hostname: instance.publicIpAddress,
port: 1433,
username: user.name,
password: user.password,
database: db.name,
},
});
const destination = new gcp.datastream.ConnectionProfile("destination", {
displayName: "BigQuery Destination",
location: "us-central1",
connectionProfileId: "destination-profile",
bigqueryProfile: {},
});
const _default = new gcp.datastream.Stream("default", {
displayName: "SQL Server to BigQuery",
location: "us-central1",
streamId: "stream",
sourceConfig: {
sourceConnectionProfile: source.id,
sqlServerSourceConfig: {
includeObjects: {
schemas: [{
schema: "schema",
tables: [{
table: "table",
}],
}],
},
transactionLogs: {},
},
},
destinationConfig: {
destinationConnectionProfile: destination.id,
bigqueryDestinationConfig: {
dataFreshness: "900s",
sourceHierarchyDatasets: {
datasetTemplate: {
location: "us-central1",
},
},
},
},
backfillNone: {},
});
import pulumi
import pulumi_gcp as gcp
instance = gcp.sql.DatabaseInstance("instance",
name="sql-server",
database_version="SQLSERVER_2019_STANDARD",
region="us-central1",
root_password="root-password",
deletion_protection=True,
settings={
"tier": "db-custom-2-4096",
"ip_configuration": {
"authorized_networks": [
{
"value": "34.71.242.81",
},
{
"value": "34.72.28.29",
},
{
"value": "34.67.6.157",
},
{
"value": "34.67.234.134",
},
{
"value": "34.72.239.218",
},
],
},
})
user = gcp.sql.User("user",
name="user",
instance=instance.name,
password="password")
db = gcp.sql.Database("db",
name="db",
instance=instance.name,
opts=pulumi.ResourceOptions(depends_on=[user]))
source = gcp.datastream.ConnectionProfile("source",
display_name="SQL Server Source",
location="us-central1",
connection_profile_id="source-profile",
sql_server_profile={
"hostname": instance.public_ip_address,
"port": 1433,
"username": user.name,
"password": user.password,
"database": db.name,
})
destination = gcp.datastream.ConnectionProfile("destination",
display_name="BigQuery Destination",
location="us-central1",
connection_profile_id="destination-profile",
bigquery_profile={})
default = gcp.datastream.Stream("default",
display_name="SQL Server to BigQuery",
location="us-central1",
stream_id="stream",
source_config={
"source_connection_profile": source.id,
"sql_server_source_config": {
"include_objects": {
"schemas": [{
"schema": "schema",
"tables": [{
"table": "table",
}],
}],
},
"transaction_logs": {},
},
},
destination_config={
"destination_connection_profile": destination.id,
"bigquery_destination_config": {
"data_freshness": "900s",
"source_hierarchy_datasets": {
"dataset_template": {
"location": "us-central1",
},
},
},
},
backfill_none={})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
Name: pulumi.String("sql-server"),
DatabaseVersion: pulumi.String("SQLSERVER_2019_STANDARD"),
Region: pulumi.String("us-central1"),
RootPassword: pulumi.String("root-password"),
DeletionProtection: pulumi.Bool(true),
Settings: &sql.DatabaseInstanceSettingsArgs{
Tier: pulumi.String("db-custom-2-4096"),
IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.71.242.81"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.72.28.29"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.67.6.157"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.67.234.134"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.72.239.218"),
},
},
},
},
})
if err != nil {
return err
}
user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
Name: pulumi.String("user"),
Instance: instance.Name,
Password: pulumi.String("password"),
})
if err != nil {
return err
}
db, err := sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
Name: pulumi.String("db"),
Instance: instance.Name,
}, pulumi.DependsOn([]pulumi.Resource{
user,
}))
if err != nil {
return err
}
source, err := datastream.NewConnectionProfile(ctx, "source", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("SQL Server Source"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("source-profile"),
SqlServerProfile: &datastream.ConnectionProfileSqlServerProfileArgs{
Hostname: instance.PublicIpAddress,
Port: pulumi.Int(1433),
Username: user.Name,
Password: user.Password,
Database: db.Name,
},
})
if err != nil {
return err
}
destination, err := datastream.NewConnectionProfile(ctx, "destination", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("BigQuery Destination"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("destination-profile"),
BigqueryProfile: &datastream.ConnectionProfileBigqueryProfileArgs{},
})
if err != nil {
return err
}
_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
DisplayName: pulumi.String("SQL Server to BigQuery"),
Location: pulumi.String("us-central1"),
StreamId: pulumi.String("stream"),
SourceConfig: &datastream.StreamSourceConfigArgs{
SourceConnectionProfile: source.ID(),
SqlServerSourceConfig: &datastream.StreamSourceConfigSqlServerSourceConfigArgs{
IncludeObjects: &datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs{
Schemas: datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArray{
&datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs{
Schema: pulumi.String("schema"),
Tables: datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArray{
&datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs{
Table: pulumi.String("table"),
},
},
},
},
},
TransactionLogs: &datastream.StreamSourceConfigSqlServerSourceConfigTransactionLogsArgs{},
},
},
DestinationConfig: &datastream.StreamDestinationConfigArgs{
DestinationConnectionProfile: destination.ID(),
BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
DataFreshness: pulumi.String("900s"),
SourceHierarchyDatasets: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs{
DatasetTemplate: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs{
Location: pulumi.String("us-central1"),
},
},
},
},
BackfillNone: &datastream.StreamBackfillNoneArgs{},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var instance = new Gcp.Sql.DatabaseInstance("instance", new()
{
Name = "sql-server",
DatabaseVersion = "SQLSERVER_2019_STANDARD",
Region = "us-central1",
RootPassword = "root-password",
DeletionProtection = true,
Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
{
Tier = "db-custom-2-4096",
IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
{
AuthorizedNetworks = new[]
{
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.71.242.81",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.72.28.29",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.67.6.157",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.67.234.134",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.72.239.218",
},
},
},
},
});
var user = new Gcp.Sql.User("user", new()
{
Name = "user",
Instance = instance.Name,
Password = "password",
});
var db = new Gcp.Sql.Database("db", new()
{
Name = "db",
Instance = instance.Name,
}, new CustomResourceOptions
{
DependsOn =
{
user,
},
});
var source = new Gcp.Datastream.ConnectionProfile("source", new()
{
DisplayName = "SQL Server Source",
Location = "us-central1",
ConnectionProfileId = "source-profile",
SqlServerProfile = new Gcp.Datastream.Inputs.ConnectionProfileSqlServerProfileArgs
{
Hostname = instance.PublicIpAddress,
Port = 1433,
Username = user.Name,
Password = user.Password,
Database = db.Name,
},
});
var destination = new Gcp.Datastream.ConnectionProfile("destination", new()
{
DisplayName = "BigQuery Destination",
Location = "us-central1",
ConnectionProfileId = "destination-profile",
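BigqueryProfile = new Gcp.Datastream.Inputs.ConnectionProfileBigqueryProfileArgs(), // empty object selects the BigQuery profile type; null would leave it unset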
});
var @default = new Gcp.Datastream.Stream("default", new()
{
DisplayName = "SQL Server to BigQuery",
Location = "us-central1",
StreamId = "stream",
SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
{
SourceConnectionProfile = source.Id,
SqlServerSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigArgs
{
IncludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs
{
Schemas = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs
{
Schema = "schema",
Tables = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs
{
Table = "table",
},
},
},
},
},
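TransactionLogs = new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigTransactionLogsArgs(), // selects transaction-log CDC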
},
},
DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
{
DestinationConnectionProfile = destination.Id,
BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
{
DataFreshness = "900s",
SourceHierarchyDatasets = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs
{
DatasetTemplate = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs
{
Location = "us-central1",
},
},
},
},
BackfillNone = new Gcp.Datastream.Inputs.StreamBackfillNoneArgs(), // skip historical backfill
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileSqlServerProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileBigqueryProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigArgs;
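import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs;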
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigTransactionLogsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillNoneArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
.name("sql-server")
.databaseVersion("SQLSERVER_2019_STANDARD")
.region("us-central1")
.rootPassword("root-password")
.deletionProtection(true)
.settings(DatabaseInstanceSettingsArgs.builder()
.tier("db-custom-2-4096")
.ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
.authorizedNetworks(
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.71.242.81")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.72.28.29")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.67.6.157")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.67.234.134")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.72.239.218")
.build())
.build())
.build())
.build());
var user = new User("user", UserArgs.builder()
.name("user")
.instance(instance.name())
.password("password")
.build());
var db = new Database("db", DatabaseArgs.builder()
.name("db")
.instance(instance.name())
.build(), CustomResourceOptions.builder()
.dependsOn(user)
.build());
var source = new ConnectionProfile("source", ConnectionProfileArgs.builder()
.displayName("SQL Server Source")
.location("us-central1")
.connectionProfileId("source-profile")
.sqlServerProfile(ConnectionProfileSqlServerProfileArgs.builder()
.hostname(instance.publicIpAddress())
.port(1433)
.username(user.name())
.password(user.password())
.database(db.name())
.build())
.build());
var destination = new ConnectionProfile("destination", ConnectionProfileArgs.builder()
.displayName("BigQuery Destination")
.location("us-central1")
.connectionProfileId("destination-profile")
.bigqueryProfile(ConnectionProfileBigqueryProfileArgs.builder()
.build())
.build());
var default_ = new Stream("default", StreamArgs.builder()
.displayName("SQL Server to BigQuery")
.location("us-central1")
.streamId("stream")
.sourceConfig(StreamSourceConfigArgs.builder()
.sourceConnectionProfile(source.id())
.sqlServerSourceConfig(StreamSourceConfigSqlServerSourceConfigArgs.builder()
.includeObjects(StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs.builder()
.schemas(StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs.builder()
.schema("schema")
.tables(StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs.builder()
.table("table")
.build())
.build())
.build())
.transactionLogs(StreamSourceConfigSqlServerSourceConfigTransactionLogsArgs.builder()
.build())
.build())
.build())
.destinationConfig(StreamDestinationConfigArgs.builder()
.destinationConnectionProfile(destination.id())
.bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
.dataFreshness("900s")
.sourceHierarchyDatasets(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs.builder()
.datasetTemplate(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs.builder()
.location("us-central1")
.build())
.build())
.build())
.build())
.backfillNone(StreamBackfillNoneArgs.builder()
.build())
.build());
}
}
resources:
instance:
type: gcp:sql:DatabaseInstance
properties:
name: sql-server
databaseVersion: SQLSERVER_2019_STANDARD
region: us-central1
rootPassword: root-password
deletionProtection: true
settings:
tier: db-custom-2-4096
ipConfiguration:
authorizedNetworks:
- value: 34.71.242.81
- value: 34.72.28.29
- value: 34.67.6.157
- value: 34.67.234.134
- value: 34.72.239.218
db:
type: gcp:sql:Database
properties:
name: db
instance: ${instance.name}
options:
dependsOn:
- ${user}
user:
type: gcp:sql:User
properties:
name: user
instance: ${instance.name}
password: password
source:
type: gcp:datastream:ConnectionProfile
properties:
displayName: SQL Server Source
location: us-central1
connectionProfileId: source-profile
sqlServerProfile:
hostname: ${instance.publicIpAddress}
port: 1433
username: ${user.name}
password: ${user.password}
database: ${db.name}
destination:
type: gcp:datastream:ConnectionProfile
properties:
displayName: BigQuery Destination
location: us-central1
connectionProfileId: destination-profile
bigqueryProfile: {}
default:
type: gcp:datastream:Stream
properties:
displayName: SQL Server to BigQuery
location: us-central1
streamId: stream
sourceConfig:
sourceConnectionProfile: ${source.id}
sqlServerSourceConfig:
includeObjects:
schemas:
- schema: schema
tables:
- table: table
transactionLogs: {}
destinationConfig:
destinationConnectionProfile: ${destination.id}
bigqueryDestinationConfig:
dataFreshness: 900s
sourceHierarchyDatasets:
datasetTemplate:
location: us-central1
backfillNone: {}
The sqlServerSourceConfig with transactionLogs reads changes directly from the SQL Server transaction log. The includeObjects property limits replication to the listed schemas and tables. The backfillNone property skips the historical data load, so replication starts from the current point in time instead of first copying existing rows.
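The includeObjects filter nests tables under schemas. If you generate that filter from a plain mapping instead of writing it by hand, a small helper can build the dict shape the Python example passes as include_objects. This is a minimal sketch: sql_server_include_objects is a hypothetical helper, not part of the Pulumi SDK, and treating a schema with an empty table list as "every table in that schema" is an assumption to verify against the Datastream documentation.

```python
from typing import Dict, List


def sql_server_include_objects(tables_by_schema: Dict[str, List[str]]) -> dict:
    """Build an include_objects dict shaped like the Python example's filter.

    Hypothetical helper. tables_by_schema maps a schema name to the tables
    to replicate from it. An empty table list emits a schema entry without a
    "tables" key (assumption: Datastream then selects the whole schema).
    """
    schemas = []
    for schema, tables in tables_by_schema.items():
        entry = {"schema": schema}
        if tables:
            # Each table is its own object, matching {"table": "table"} above.
            entry["tables"] = [{"table": name} for name in tables]
        schemas.append(entry)
    return {"schemas": schemas}


# Reproduces the filter used in the example above.
print(sql_server_include_objects({"schema": ["table"]}))
# {'schemas': [{'schema': 'schema', 'tables': [{'table': 'table'}]}]}
```

The returned dict can be assigned to "include_objects" inside sql_server_source_config unchanged.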
Stream SQL Server using change tables
SQL Server change data capture (CDC) can read from change tables instead of the transaction log. Only the CDC method differs from the previous example; the source filter, destination, and backfill settings are the same.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const instance = new gcp.sql.DatabaseInstance("instance", {
name: "sql-server",
databaseVersion: "SQLSERVER_2019_STANDARD",
region: "us-central1",
rootPassword: "root-password",
deletionProtection: true,
settings: {
tier: "db-custom-2-4096",
ipConfiguration: {
authorizedNetworks: [
{
value: "34.71.242.81",
},
{
value: "34.72.28.29",
},
{
value: "34.67.6.157",
},
{
value: "34.67.234.134",
},
{
value: "34.72.239.218",
},
],
},
},
});
const user = new gcp.sql.User("user", {
name: "user",
instance: instance.name,
password: "password",
});
const db = new gcp.sql.Database("db", {
name: "db",
instance: instance.name,
}, {
dependsOn: [user],
});
const source = new gcp.datastream.ConnectionProfile("source", {
displayName: "SQL Server Source",
location: "us-central1",
connectionProfileId: "source-profile",
sqlServerProfile: {
hostname: instance.publicIpAddress,
port: 1433,
username: user.name,
password: user.password,
database: db.name,
},
});
const destination = new gcp.datastream.ConnectionProfile("destination", {
displayName: "BigQuery Destination",
location: "us-central1",
connectionProfileId: "destination-profile",
bigqueryProfile: {},
});
const _default = new gcp.datastream.Stream("default", {
displayName: "SQL Server to BigQuery",
location: "us-central1",
streamId: "stream",
sourceConfig: {
sourceConnectionProfile: source.id,
sqlServerSourceConfig: {
includeObjects: {
schemas: [{
schema: "schema",
tables: [{
table: "table",
}],
}],
},
changeTables: {},
},
},
destinationConfig: {
destinationConnectionProfile: destination.id,
bigqueryDestinationConfig: {
dataFreshness: "900s",
sourceHierarchyDatasets: {
datasetTemplate: {
location: "us-central1",
},
},
},
},
backfillNone: {},
});
import pulumi
import pulumi_gcp as gcp
instance = gcp.sql.DatabaseInstance("instance",
name="sql-server",
database_version="SQLSERVER_2019_STANDARD",
region="us-central1",
root_password="root-password",
deletion_protection=True,
settings={
"tier": "db-custom-2-4096",
"ip_configuration": {
"authorized_networks": [
{
"value": "34.71.242.81",
},
{
"value": "34.72.28.29",
},
{
"value": "34.67.6.157",
},
{
"value": "34.67.234.134",
},
{
"value": "34.72.239.218",
},
],
},
})
user = gcp.sql.User("user",
name="user",
instance=instance.name,
password="password")
db = gcp.sql.Database("db",
name="db",
instance=instance.name,
opts=pulumi.ResourceOptions(depends_on=[user]))
source = gcp.datastream.ConnectionProfile("source",
display_name="SQL Server Source",
location="us-central1",
connection_profile_id="source-profile",
sql_server_profile={
"hostname": instance.public_ip_address,
"port": 1433,
"username": user.name,
"password": user.password,
"database": db.name,
})
destination = gcp.datastream.ConnectionProfile("destination",
display_name="BigQuery Destination",
location="us-central1",
connection_profile_id="destination-profile",
bigquery_profile={})
default = gcp.datastream.Stream("default",
display_name="SQL Server to BigQuery",
location="us-central1",
stream_id="stream",
source_config={
"source_connection_profile": source.id,
"sql_server_source_config": {
"include_objects": {
"schemas": [{
"schema": "schema",
"tables": [{
"table": "table",
}],
}],
},
"change_tables": {},
},
},
destination_config={
"destination_connection_profile": destination.id,
"bigquery_destination_config": {
"data_freshness": "900s",
"source_hierarchy_datasets": {
"dataset_template": {
"location": "us-central1",
},
},
},
},
backfill_none={})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
Name: pulumi.String("sql-server"),
DatabaseVersion: pulumi.String("SQLSERVER_2019_STANDARD"),
Region: pulumi.String("us-central1"),
RootPassword: pulumi.String("root-password"),
DeletionProtection: pulumi.Bool(true),
Settings: &sql.DatabaseInstanceSettingsArgs{
Tier: pulumi.String("db-custom-2-4096"),
IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.71.242.81"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.72.28.29"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.67.6.157"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.67.234.134"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.72.239.218"),
},
},
},
},
})
if err != nil {
return err
}
user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
Name: pulumi.String("user"),
Instance: instance.Name,
Password: pulumi.String("password"),
})
if err != nil {
return err
}
db, err := sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
Name: pulumi.String("db"),
Instance: instance.Name,
}, pulumi.DependsOn([]pulumi.Resource{
user,
}))
if err != nil {
return err
}
source, err := datastream.NewConnectionProfile(ctx, "source", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("SQL Server Source"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("source-profile"),
SqlServerProfile: &datastream.ConnectionProfileSqlServerProfileArgs{
Hostname: instance.PublicIpAddress,
Port: pulumi.Int(1433),
Username: user.Name,
Password: user.Password,
Database: db.Name,
},
})
if err != nil {
return err
}
destination, err := datastream.NewConnectionProfile(ctx, "destination", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("BigQuery Destination"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("destination-profile"),
BigqueryProfile: &datastream.ConnectionProfileBigqueryProfileArgs{},
})
if err != nil {
return err
}
_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
DisplayName: pulumi.String("SQL Server to BigQuery"),
Location: pulumi.String("us-central1"),
StreamId: pulumi.String("stream"),
SourceConfig: &datastream.StreamSourceConfigArgs{
SourceConnectionProfile: source.ID(),
SqlServerSourceConfig: &datastream.StreamSourceConfigSqlServerSourceConfigArgs{
IncludeObjects: &datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs{
Schemas: datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArray{
&datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs{
Schema: pulumi.String("schema"),
Tables: datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArray{
&datastream.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs{
Table: pulumi.String("table"),
},
},
},
},
},
ChangeTables: &datastream.StreamSourceConfigSqlServerSourceConfigChangeTablesArgs{},
},
},
DestinationConfig: &datastream.StreamDestinationConfigArgs{
DestinationConnectionProfile: destination.ID(),
BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
DataFreshness: pulumi.String("900s"),
SourceHierarchyDatasets: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs{
DatasetTemplate: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs{
Location: pulumi.String("us-central1"),
},
},
},
},
BackfillNone: &datastream.StreamBackfillNoneArgs{},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var instance = new Gcp.Sql.DatabaseInstance("instance", new()
{
Name = "sql-server",
DatabaseVersion = "SQLSERVER_2019_STANDARD",
Region = "us-central1",
RootPassword = "root-password",
DeletionProtection = true,
Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
{
Tier = "db-custom-2-4096",
IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
{
AuthorizedNetworks = new[]
{
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.71.242.81",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.72.28.29",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.67.6.157",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.67.234.134",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.72.239.218",
},
},
},
},
});
var user = new Gcp.Sql.User("user", new()
{
Name = "user",
Instance = instance.Name,
Password = "password",
});
var db = new Gcp.Sql.Database("db", new()
{
Name = "db",
Instance = instance.Name,
}, new CustomResourceOptions
{
DependsOn =
{
user,
},
});
var source = new Gcp.Datastream.ConnectionProfile("source", new()
{
DisplayName = "SQL Server Source",
Location = "us-central1",
ConnectionProfileId = "source-profile",
SqlServerProfile = new Gcp.Datastream.Inputs.ConnectionProfileSqlServerProfileArgs
{
Hostname = instance.PublicIpAddress,
Port = 1433,
Username = user.Name,
Password = user.Password,
Database = db.Name,
},
});
var destination = new Gcp.Datastream.ConnectionProfile("destination", new()
{
DisplayName = "BigQuery Destination",
Location = "us-central1",
ConnectionProfileId = "destination-profile",
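BigqueryProfile = new Gcp.Datastream.Inputs.ConnectionProfileBigqueryProfileArgs(), // empty object selects the BigQuery profile type; null would leave it unset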
});
var @default = new Gcp.Datastream.Stream("default", new()
{
DisplayName = "SQL Server to BigQuery",
Location = "us-central1",
StreamId = "stream",
SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
{
SourceConnectionProfile = source.Id,
SqlServerSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigArgs
{
IncludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs
{
Schemas = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs
{
Schema = "schema",
Tables = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs
{
Table = "table",
},
},
},
},
},
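ChangeTables = new Gcp.Datastream.Inputs.StreamSourceConfigSqlServerSourceConfigChangeTablesArgs(), // selects change-table CDC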
},
},
DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
{
DestinationConnectionProfile = destination.Id,
BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
{
DataFreshness = "900s",
SourceHierarchyDatasets = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs
{
DatasetTemplate = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs
{
Location = "us-central1",
},
},
},
},
BackfillNone = new Gcp.Datastream.Inputs.StreamBackfillNoneArgs(), // skip historical backfill
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileSqlServerProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileBigqueryProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigArgs;
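import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs;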
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigSqlServerSourceConfigChangeTablesArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillNoneArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
.name("sql-server")
.databaseVersion("SQLSERVER_2019_STANDARD")
.region("us-central1")
.rootPassword("root-password")
.deletionProtection(true)
.settings(DatabaseInstanceSettingsArgs.builder()
.tier("db-custom-2-4096")
.ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
.authorizedNetworks(
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.71.242.81")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.72.28.29")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.67.6.157")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.67.234.134")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.72.239.218")
.build())
.build())
.build())
.build());
var user = new User("user", UserArgs.builder()
.name("user")
.instance(instance.name())
.password("password")
.build());
var db = new Database("db", DatabaseArgs.builder()
.name("db")
.instance(instance.name())
.build(), CustomResourceOptions.builder()
.dependsOn(user)
.build());
var source = new ConnectionProfile("source", ConnectionProfileArgs.builder()
.displayName("SQL Server Source")
.location("us-central1")
.connectionProfileId("source-profile")
.sqlServerProfile(ConnectionProfileSqlServerProfileArgs.builder()
.hostname(instance.publicIpAddress())
.port(1433)
.username(user.name())
.password(user.password())
.database(db.name())
.build())
.build());
var destination = new ConnectionProfile("destination", ConnectionProfileArgs.builder()
.displayName("BigQuery Destination")
.location("us-central1")
.connectionProfileId("destination-profile")
.bigqueryProfile(ConnectionProfileBigqueryProfileArgs.builder()
.build())
.build());
var default_ = new Stream("default", StreamArgs.builder()
.displayName("SQL Server to BigQuery")
.location("us-central1")
.streamId("stream")
.sourceConfig(StreamSourceConfigArgs.builder()
.sourceConnectionProfile(source.id())
.sqlServerSourceConfig(StreamSourceConfigSqlServerSourceConfigArgs.builder()
.includeObjects(StreamSourceConfigSqlServerSourceConfigIncludeObjectsArgs.builder()
.schemas(StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaArgs.builder()
.schema("schema")
.tables(StreamSourceConfigSqlServerSourceConfigIncludeObjectsSchemaTableArgs.builder()
.table("table")
.build())
.build())
.build())
.changeTables(StreamSourceConfigSqlServerSourceConfigChangeTablesArgs.builder()
.build())
.build())
.build())
.destinationConfig(StreamDestinationConfigArgs.builder()
.destinationConnectionProfile(destination.id())
.bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
.dataFreshness("900s")
.sourceHierarchyDatasets(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs.builder()
.datasetTemplate(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs.builder()
.location("us-central1")
.build())
.build())
.build())
.build())
.backfillNone(StreamBackfillNoneArgs.builder()
.build())
.build());
}
}
resources:
instance:
type: gcp:sql:DatabaseInstance
properties:
name: sql-server
databaseVersion: SQLSERVER_2019_STANDARD
region: us-central1
rootPassword: root-password
deletionProtection: true
settings:
tier: db-custom-2-4096
ipConfiguration:
authorizedNetworks:
- value: 34.71.242.81
- value: 34.72.28.29
- value: 34.67.6.157
- value: 34.67.234.134
- value: 34.72.239.218
db:
type: gcp:sql:Database
properties:
name: db
instance: ${instance.name}
options:
dependsOn:
- ${user}
user:
type: gcp:sql:User
properties:
name: user
instance: ${instance.name}
password: password
source:
type: gcp:datastream:ConnectionProfile
properties:
displayName: SQL Server Source
location: us-central1
connectionProfileId: source-profile
sqlServerProfile:
hostname: ${instance.publicIpAddress}
port: 1433
username: ${user.name}
password: ${user.password}
database: ${db.name}
destination:
type: gcp:datastream:ConnectionProfile
properties:
displayName: BigQuery Destination
location: us-central1
connectionProfileId: destination-profile
bigqueryProfile: {}
default:
type: gcp:datastream:Stream
properties:
displayName: SQL Server to BigQuery
location: us-central1
streamId: stream
sourceConfig:
sourceConnectionProfile: ${source.id}
sqlServerSourceConfig:
includeObjects:
schemas:
- schema: schema
tables:
- table: table
changeTables: {}
destinationConfig:
destinationConnectionProfile: ${destination.id}
bigqueryDestinationConfig:
dataFreshness: 900s
sourceHierarchyDatasets:
datasetTemplate:
location: us-central1
backfillNone: {}
The changeTables property configures Datastream to read from SQL Server change tables rather than the transaction log. This approach requires CDC to be enabled on the source database, with change tables configured for each table you want to replicate.
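Because the two SQL Server examples differ only in which CDC key is present, the method can be chosen with a parameter. The sketch below assumes transactionLogs and changeTables are alternatives, so it sets exactly one of them. sql_server_source_config is a hypothetical helper, not a Pulumi API; it returns the dict shape used by the Python examples.

```python
CDC_METHODS = ("transaction_logs", "change_tables")


def sql_server_source_config(include_objects: dict, cdc_method: str) -> dict:
    """Return a sql_server_source_config dict with exactly one CDC method.

    Hypothetical helper. Assumption: transaction_logs and change_tables are
    mutually exclusive, so only the requested key is emitted.
    """
    if cdc_method not in CDC_METHODS:
        raise ValueError(f"cdc_method must be one of {CDC_METHODS}, got {cdc_method!r}")
    config = {"include_objects": include_objects}
    # Both methods take an empty object in the examples; the key's presence
    # is what selects the method.
    config[cdc_method] = {}
    return config


filters = {"schemas": [{"schema": "schema", "tables": [{"table": "table"}]}]}
print(sql_server_source_config(filters, "change_tables"))
```

Passing "transaction_logs" instead reproduces the source configuration of the previous example.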
Stream to a single BigQuery dataset
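Instead of creating a separate dataset for each source schema, you can consolidate all replicated tables into a single BigQuery dataset. The source in this example is a MySQL instance; the postgres dataset ID and the "postgres to bigQuery" display name are only labels and do not indicate a PostgreSQL source.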
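Compared with the previous examples, only the dataset setting inside bigqueryDestinationConfig changes: singleTargetDataset replaces sourceHierarchyDatasets. As a minimal sketch, assuming the two layouts are alternatives, the hypothetical helper below builds either form of the Python bigquery_destination_config dict; the dataset ID in the usage line is a placeholder.

```python
from typing import Optional


def bigquery_destination_config(
    data_freshness: str,
    single_dataset_id: Optional[str] = None,
    per_schema_location: Optional[str] = None,
) -> dict:
    """Build a bigquery_destination_config dict for either dataset layout.

    Hypothetical helper. Pass single_dataset_id to write every table into one
    dataset, or per_schema_location to create one dataset per source schema
    in that location. Assumption: exactly one layout must be chosen.
    """
    if (single_dataset_id is None) == (per_schema_location is None):
        raise ValueError("set exactly one of single_dataset_id or per_schema_location")
    config = {"data_freshness": data_freshness}
    if single_dataset_id is not None:
        config["single_target_dataset"] = {"dataset_id": single_dataset_id}
    else:
        config["source_hierarchy_datasets"] = {
            "dataset_template": {"location": per_schema_location}
        }
    return config


# Placeholder dataset ID; in the example below it comes from postgres.id.
print(bigquery_destination_config("900s", single_dataset_id="projects/my-project/datasets/postgres"))
```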
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
import * as random from "@pulumi/random";
const postgres = new gcp.bigquery.Dataset("postgres", {
datasetId: "postgres",
friendlyName: "postgres",
description: "Database of postgres",
location: "us-central1",
});
const destinationConnectionProfile2 = new gcp.datastream.ConnectionProfile("destination_connection_profile2", {
displayName: "Connection profile",
location: "us-central1",
connectionProfileId: "dest-profile",
bigqueryProfile: {},
});
const instance = new gcp.sql.DatabaseInstance("instance", {
name: "instance-name",
databaseVersion: "MYSQL_8_0",
region: "us-central1",
settings: {
tier: "db-f1-micro",
backupConfiguration: {
enabled: true,
binaryLogEnabled: true,
},
ipConfiguration: {
authorizedNetworks: [
{
value: "34.71.242.81",
},
{
value: "34.72.28.29",
},
{
value: "34.67.6.157",
},
{
value: "34.67.234.134",
},
{
value: "34.72.239.218",
},
],
},
},
deletionProtection: false,
});
const pwd = new random.RandomPassword("pwd", {
length: 16,
special: false,
});
const user = new gcp.sql.User("user", {
name: "my-user",
instance: instance.name,
host: "%",
password: pwd.result,
});
const sourceConnectionProfile = new gcp.datastream.ConnectionProfile("source_connection_profile", {
displayName: "Source connection profile",
location: "us-central1",
connectionProfileId: "source-profile",
mysqlProfile: {
hostname: instance.publicIpAddress,
username: user.name,
password: user.password,
},
});
const _default = new gcp.datastream.Stream("default", {
displayName: "postgres to bigQuery",
location: "us-central1",
streamId: "postgres-bigquery",
sourceConfig: {
sourceConnectionProfile: sourceConnectionProfile.id,
mysqlSourceConfig: {},
},
destinationConfig: {
destinationConnectionProfile: destinationConnectionProfile2.id,
bigqueryDestinationConfig: {
dataFreshness: "900s",
singleTargetDataset: {
datasetId: postgres.id,
},
},
},
backfillAll: {},
});
const db = new gcp.sql.Database("db", {
instance: instance.name,
name: "db",
});
import pulumi
import pulumi_gcp as gcp
import pulumi_random as random
postgres = gcp.bigquery.Dataset("postgres",
dataset_id="postgres",
friendly_name="postgres",
description="Database of postgres",
location="us-central1")
destination_connection_profile2 = gcp.datastream.ConnectionProfile("destination_connection_profile2",
display_name="Connection profile",
location="us-central1",
connection_profile_id="dest-profile",
bigquery_profile={})
instance = gcp.sql.DatabaseInstance("instance",
name="instance-name",
database_version="MYSQL_8_0",
region="us-central1",
settings={
"tier": "db-f1-micro",
"backup_configuration": {
"enabled": True,
"binary_log_enabled": True,
},
"ip_configuration": {
"authorized_networks": [
{
"value": "34.71.242.81",
},
{
"value": "34.72.28.29",
},
{
"value": "34.67.6.157",
},
{
"value": "34.67.234.134",
},
{
"value": "34.72.239.218",
},
],
},
},
deletion_protection=False)
pwd = random.RandomPassword("pwd",
length=16,
special=False)
user = gcp.sql.User("user",
name="my-user",
instance=instance.name,
host="%",
password=pwd.result)
source_connection_profile = gcp.datastream.ConnectionProfile("source_connection_profile",
display_name="Source connection profile",
location="us-central1",
connection_profile_id="source-profile",
mysql_profile={
"hostname": instance.public_ip_address,
"username": user.name,
"password": user.password,
})
default = gcp.datastream.Stream("default",
display_name="postgres to bigQuery",
location="us-central1",
stream_id="postgres-bigquery",
source_config={
"source_connection_profile": source_connection_profile.id,
"mysql_source_config": {},
},
destination_config={
"destination_connection_profile": destination_connection_profile2.id,
"bigquery_destination_config": {
"data_freshness": "900s",
"single_target_dataset": {
"dataset_id": postgres.id,
},
},
},
backfill_all={})
db = gcp.sql.Database("db",
instance=instance.name,
name="db")
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
"github.com/pulumi/pulumi-random/sdk/v4/go/random"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
postgres, err := bigquery.NewDataset(ctx, "postgres", &bigquery.DatasetArgs{
DatasetId: pulumi.String("postgres"),
FriendlyName: pulumi.String("postgres"),
Description: pulumi.String("Database of postgres"),
Location: pulumi.String("us-central1"),
})
if err != nil {
return err
}
destinationConnectionProfile2, err := datastream.NewConnectionProfile(ctx, "destination_connection_profile2", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("Connection profile"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("dest-profile"),
BigqueryProfile: &datastream.ConnectionProfileBigqueryProfileArgs{},
})
if err != nil {
return err
}
instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
Name: pulumi.String("instance-name"),
DatabaseVersion: pulumi.String("MYSQL_8_0"),
Region: pulumi.String("us-central1"),
Settings: &sql.DatabaseInstanceSettingsArgs{
Tier: pulumi.String("db-f1-micro"),
BackupConfiguration: &sql.DatabaseInstanceSettingsBackupConfigurationArgs{
Enabled: pulumi.Bool(true),
BinaryLogEnabled: pulumi.Bool(true),
},
IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.71.242.81"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.72.28.29"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.67.6.157"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.67.234.134"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.72.239.218"),
},
},
},
},
DeletionProtection: pulumi.Bool(false),
})
if err != nil {
return err
}
pwd, err := random.NewRandomPassword(ctx, "pwd", &random.RandomPasswordArgs{
Length: pulumi.Int(16),
Special: pulumi.Bool(false),
})
if err != nil {
return err
}
user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
Name: pulumi.String("my-user"),
Instance: instance.Name,
Host: pulumi.String("%"),
Password: pwd.Result,
})
if err != nil {
return err
}
sourceConnectionProfile, err := datastream.NewConnectionProfile(ctx, "source_connection_profile", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("Source connection profile"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("source-profile"),
MysqlProfile: &datastream.ConnectionProfileMysqlProfileArgs{
Hostname: instance.PublicIpAddress,
Username: user.Name,
Password: user.Password,
},
})
if err != nil {
return err
}
_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
DisplayName: pulumi.String("postgres to bigQuery"),
Location: pulumi.String("us-central1"),
StreamId: pulumi.String("postgres-bigquery"),
SourceConfig: &datastream.StreamSourceConfigArgs{
SourceConnectionProfile: sourceConnectionProfile.ID(),
MysqlSourceConfig: &datastream.StreamSourceConfigMysqlSourceConfigArgs{},
},
DestinationConfig: &datastream.StreamDestinationConfigArgs{
DestinationConnectionProfile: destinationConnectionProfile2.ID(),
BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
DataFreshness: pulumi.String("900s"),
SingleTargetDataset: &datastream.StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs{
DatasetId: postgres.ID(),
},
},
},
BackfillAll: &datastream.StreamBackfillAllArgs{},
})
if err != nil {
return err
}
_, err = sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
Instance: instance.Name,
Name: pulumi.String("db"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
using Random = Pulumi.Random;
return await Deployment.RunAsync(() =>
{
var postgres = new Gcp.BigQuery.Dataset("postgres", new()
{
DatasetId = "postgres",
FriendlyName = "postgres",
Description = "Database of postgres",
Location = "us-central1",
});
var destinationConnectionProfile2 = new Gcp.Datastream.ConnectionProfile("destination_connection_profile2", new()
{
DisplayName = "Connection profile",
Location = "us-central1",
ConnectionProfileId = "dest-profile",
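BigqueryProfile = new Gcp.Datastream.Inputs.ConnectionProfileBigqueryProfileArgs(), // empty block selects a BigQuery profile; null would omit it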
});
var instance = new Gcp.Sql.DatabaseInstance("instance", new()
{
Name = "instance-name",
DatabaseVersion = "MYSQL_8_0",
Region = "us-central1",
Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
{
Tier = "db-f1-micro",
BackupConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsBackupConfigurationArgs
{
Enabled = true,
BinaryLogEnabled = true,
},
IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
{
AuthorizedNetworks = new[]
{
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.71.242.81",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.72.28.29",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.67.6.157",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.67.234.134",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.72.239.218",
},
},
},
},
DeletionProtection = false,
});
var pwd = new Random.RandomPassword("pwd", new()
{
Length = 16,
Special = false,
});
var user = new Gcp.Sql.User("user", new()
{
Name = "my-user",
Instance = instance.Name,
Host = "%",
Password = pwd.Result,
});
var sourceConnectionProfile = new Gcp.Datastream.ConnectionProfile("source_connection_profile", new()
{
DisplayName = "Source connection profile",
Location = "us-central1",
ConnectionProfileId = "source-profile",
MysqlProfile = new Gcp.Datastream.Inputs.ConnectionProfileMysqlProfileArgs
{
Hostname = instance.PublicIpAddress,
Username = user.Name,
Password = user.Password,
},
});
var @default = new Gcp.Datastream.Stream("default", new()
{
DisplayName = "postgres to bigQuery",
Location = "us-central1",
StreamId = "postgres-bigquery",
SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
{
SourceConnectionProfile = sourceConnectionProfile.Id,
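MysqlSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigArgs(), // empty block selects a MySQL source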
},
DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
{
DestinationConnectionProfile = destinationConnectionProfile2.Id,
BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
{
DataFreshness = "900s",
SingleTargetDataset = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs
{
DatasetId = postgres.Id,
},
},
},
BackfillAll = new Gcp.Datastream.Inputs.StreamBackfillAllArgs(),
});
var db = new Gcp.Sql.Database("db", new()
{
Instance = instance.Name,
Name = "db",
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileBigqueryProfileArgs;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsBackupConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs;
import com.pulumi.random.RandomPassword;
import com.pulumi.random.RandomPasswordArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileMysqlProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillAllArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var postgres = new Dataset("postgres", DatasetArgs.builder()
.datasetId("postgres")
.friendlyName("postgres")
.description("Database of postgres")
.location("us-central1")
.build());
var destinationConnectionProfile2 = new ConnectionProfile("destinationConnectionProfile2", ConnectionProfileArgs.builder()
.displayName("Connection profile")
.location("us-central1")
.connectionProfileId("dest-profile")
.bigqueryProfile(ConnectionProfileBigqueryProfileArgs.builder()
.build())
.build());
var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
.name("instance-name")
.databaseVersion("MYSQL_8_0")
.region("us-central1")
.settings(DatabaseInstanceSettingsArgs.builder()
.tier("db-f1-micro")
.backupConfiguration(DatabaseInstanceSettingsBackupConfigurationArgs.builder()
.enabled(true)
.binaryLogEnabled(true)
.build())
.ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
.authorizedNetworks(
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.71.242.81")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.72.28.29")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.67.6.157")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.67.234.134")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.72.239.218")
.build())
.build())
.build())
.deletionProtection(false)
.build());
var pwd = new RandomPassword("pwd", RandomPasswordArgs.builder()
.length(16)
.special(false)
.build());
var user = new User("user", UserArgs.builder()
.name("my-user")
.instance(instance.name())
.host("%")
.password(pwd.result())
.build());
var sourceConnectionProfile = new ConnectionProfile("sourceConnectionProfile", ConnectionProfileArgs.builder()
.displayName("Source connection profile")
.location("us-central1")
.connectionProfileId("source-profile")
.mysqlProfile(ConnectionProfileMysqlProfileArgs.builder()
.hostname(instance.publicIpAddress())
.username(user.name())
.password(user.password())
.build())
.build());
var default_ = new Stream("default", StreamArgs.builder()
.displayName("postgres to bigQuery")
.location("us-central1")
.streamId("postgres-bigquery")
.sourceConfig(StreamSourceConfigArgs.builder()
.sourceConnectionProfile(sourceConnectionProfile.id())
.mysqlSourceConfig(StreamSourceConfigMysqlSourceConfigArgs.builder()
.build())
.build())
.destinationConfig(StreamDestinationConfigArgs.builder()
.destinationConnectionProfile(destinationConnectionProfile2.id())
.bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
.dataFreshness("900s")
.singleTargetDataset(StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs.builder()
.datasetId(postgres.id())
.build())
.build())
.build())
.backfillAll(StreamBackfillAllArgs.builder()
.build())
.build());
var db = new Database("db", DatabaseArgs.builder()
.instance(instance.name())
.name("db")
.build());
}
}
resources:
  postgres:
    type: gcp:bigquery:Dataset
    properties:
      datasetId: postgres
      friendlyName: postgres
      description: Database of postgres
      location: us-central1
  default:
    type: gcp:datastream:Stream
    properties:
      displayName: postgres to bigQuery
      location: us-central1
      streamId: postgres-bigquery
      sourceConfig:
        sourceConnectionProfile: ${sourceConnectionProfile.id}
        mysqlSourceConfig: {}
      destinationConfig:
        destinationConnectionProfile: ${destinationConnectionProfile2.id}
        bigqueryDestinationConfig:
          dataFreshness: 900s
          singleTargetDataset:
            datasetId: ${postgres.id}
      backfillAll: {}
  destinationConnectionProfile2:
    type: gcp:datastream:ConnectionProfile
    name: destination_connection_profile2
    properties:
      displayName: Connection profile
      location: us-central1
      connectionProfileId: dest-profile
      bigqueryProfile: {}
  instance:
    type: gcp:sql:DatabaseInstance
    properties:
      name: instance-name
      databaseVersion: MYSQL_8_0
      region: us-central1
      settings:
        tier: db-f1-micro
        backupConfiguration:
          enabled: true
          binaryLogEnabled: true
        ipConfiguration:
          authorizedNetworks:
            - value: 34.71.242.81
            - value: 34.72.28.29
            - value: 34.67.6.157
            - value: 34.67.234.134
            - value: 34.72.239.218
      deletionProtection: false
  db:
    type: gcp:sql:Database
    properties:
      instance: ${instance.name}
      name: db
  pwd:
    type: random:RandomPassword
    properties:
      length: 16
      special: false
  user:
    type: gcp:sql:User
    properties:
      name: my-user
      instance: ${instance.name}
      host: '%'
      password: ${pwd.result}
  sourceConnectionProfile:
    type: gcp:datastream:ConnectionProfile
    name: source_connection_profile
    properties:
      displayName: Source connection profile
      location: us-central1
      connectionProfileId: source-profile
      mysqlProfile:
        hostname: ${instance.publicIpAddress}
        username: ${user.name}
        password: ${user.password}
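The singleTargetDataset property writes every replicated table into one BigQuery dataset (here, the postgres dataset), regardless of which source database or schema the table came from. This keeps BigQuery organization simple when you don’t need per-schema separation. The dataFreshness value of 900s sets the maximum staleness, 15 minutes, of data when you query the tables the stream creates.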
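The BigQuery examples in this guide pick their target datasets in two different ways: singleTargetDataset in the example above, and sourceHierarchyDatasets in the append-only example. Datastream treats these as alternatives. As a minimal sketch, the hypothetical helper below builds either shape as a plain object and formats dataFreshness as a seconds string like 900s. The helper, its type names, and the placeholder dataset ID my-project:my_dataset are assumptions for illustration, not part of the Pulumi SDK.

```typescript
// Hypothetical helper: the object shapes mirror this guide's
// bigqueryDestinationConfig blocks; they are plain objects, not SDK types.
type DatasetStrategy =
  | { kind: "single"; datasetId: string }
  | { kind: "perSource"; location: string };

interface BigqueryDestinationConfig {
  dataFreshness?: string;
  singleTargetDataset?: { datasetId: string };
  sourceHierarchyDatasets?: { datasetTemplate: { location: string } };
}

function bigqueryDestination(
  strategy: DatasetStrategy,
  freshnessSeconds?: number,
): BigqueryDestinationConfig {
  const config: BigqueryDestinationConfig = {};
  if (freshnessSeconds !== undefined) {
    // Durations are written as a number of seconds with an "s" suffix, e.g. "900s".
    config.dataFreshness = `${freshnessSeconds}s`;
  }
  if (strategy.kind === "single") {
    // Every replicated table lands in this one dataset.
    config.singleTargetDataset = { datasetId: strategy.datasetId };
  } else {
    // One dataset per source database, created from this template.
    config.sourceHierarchyDatasets = { datasetTemplate: { location: strategy.location } };
  }
  return config;
}

// Placeholder IDs for illustration only.
const single = bigqueryDestination({ kind: "single", datasetId: "my-project:my_dataset" }, 900);
const perSource = bigqueryDestination({ kind: "perSource", location: "us-central1" });

console.log(JSON.stringify(single));
console.log(JSON.stringify(perSource));
```

Routing the choice through one function makes it hard to set both dataset properties at once; the result should be usable as a Stream's bigqueryDestinationConfig input, though you may need to adjust types for your SDK version.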
Use append-only mode for BigQuery writes
Append-only mode writes every change as a new row instead of updating rows in place, preserving the full change history for auditing or point-in-time analysis.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
import * as random from "@pulumi/random";
const project = gcp.organizations.getProject({});
const instance = new gcp.sql.DatabaseInstance("instance", {
name: "my-instance",
databaseVersion: "MYSQL_8_0",
region: "us-central1",
settings: {
tier: "db-f1-micro",
backupConfiguration: {
enabled: true,
binaryLogEnabled: true,
},
ipConfiguration: {
authorizedNetworks: [
{
value: "34.71.242.81",
},
{
value: "34.72.28.29",
},
{
value: "34.67.6.157",
},
{
value: "34.67.234.134",
},
{
value: "34.72.239.218",
},
],
},
},
deletionProtection: true,
});
const db = new gcp.sql.Database("db", {
instance: instance.name,
name: "db",
});
const pwd = new random.RandomPassword("pwd", {
length: 16,
special: false,
});
const user = new gcp.sql.User("user", {
name: "user",
instance: instance.name,
host: "%",
password: pwd.result,
});
const sourceConnectionProfile = new gcp.datastream.ConnectionProfile("source_connection_profile", {
displayName: "Source connection profile",
location: "us-central1",
connectionProfileId: "source-profile",
mysqlProfile: {
hostname: instance.publicIpAddress,
username: user.name,
password: user.password,
},
});
const destinationConnectionProfile = new gcp.datastream.ConnectionProfile("destination_connection_profile", {
displayName: "Connection profile",
location: "us-central1",
connectionProfileId: "destination-profile",
bigqueryProfile: {},
});
const _default = new gcp.datastream.Stream("default", {
streamId: "my-stream",
location: "us-central1",
displayName: "my stream",
sourceConfig: {
sourceConnectionProfile: sourceConnectionProfile.id,
mysqlSourceConfig: {},
},
destinationConfig: {
destinationConnectionProfile: destinationConnectionProfile.id,
bigqueryDestinationConfig: {
sourceHierarchyDatasets: {
datasetTemplate: {
location: "us-central1",
},
},
appendOnly: {},
},
},
backfillNone: {},
});
import pulumi
import pulumi_gcp as gcp
import pulumi_random as random
project = gcp.organizations.get_project()
instance = gcp.sql.DatabaseInstance("instance",
name="my-instance",
database_version="MYSQL_8_0",
region="us-central1",
settings={
"tier": "db-f1-micro",
"backup_configuration": {
"enabled": True,
"binary_log_enabled": True,
},
"ip_configuration": {
"authorized_networks": [
{
"value": "34.71.242.81",
},
{
"value": "34.72.28.29",
},
{
"value": "34.67.6.157",
},
{
"value": "34.67.234.134",
},
{
"value": "34.72.239.218",
},
],
},
},
deletion_protection=True)
db = gcp.sql.Database("db",
instance=instance.name,
name="db")
pwd = random.RandomPassword("pwd",
length=16,
special=False)
user = gcp.sql.User("user",
name="user",
instance=instance.name,
host="%",
password=pwd.result)
source_connection_profile = gcp.datastream.ConnectionProfile("source_connection_profile",
display_name="Source connection profile",
location="us-central1",
connection_profile_id="source-profile",
mysql_profile={
"hostname": instance.public_ip_address,
"username": user.name,
"password": user.password,
})
destination_connection_profile = gcp.datastream.ConnectionProfile("destination_connection_profile",
display_name="Connection profile",
location="us-central1",
connection_profile_id="destination-profile",
bigquery_profile={})
default = gcp.datastream.Stream("default",
stream_id="my-stream",
location="us-central1",
display_name="my stream",
source_config={
"source_connection_profile": source_connection_profile.id,
"mysql_source_config": {},
},
destination_config={
"destination_connection_profile": destination_connection_profile.id,
"bigquery_destination_config": {
"source_hierarchy_datasets": {
"dataset_template": {
"location": "us-central1",
},
},
"append_only": {},
},
},
backfill_none={})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/sql"
"github.com/pulumi/pulumi-random/sdk/v4/go/random"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
if err != nil {
return err
}
instance, err := sql.NewDatabaseInstance(ctx, "instance", &sql.DatabaseInstanceArgs{
Name: pulumi.String("my-instance"),
DatabaseVersion: pulumi.String("MYSQL_8_0"),
Region: pulumi.String("us-central1"),
Settings: &sql.DatabaseInstanceSettingsArgs{
Tier: pulumi.String("db-f1-micro"),
BackupConfiguration: &sql.DatabaseInstanceSettingsBackupConfigurationArgs{
Enabled: pulumi.Bool(true),
BinaryLogEnabled: pulumi.Bool(true),
},
IpConfiguration: &sql.DatabaseInstanceSettingsIpConfigurationArgs{
AuthorizedNetworks: sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArray{
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.71.242.81"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.72.28.29"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.67.6.157"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.67.234.134"),
},
&sql.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs{
Value: pulumi.String("34.72.239.218"),
},
},
},
},
DeletionProtection: pulumi.Bool(true),
})
if err != nil {
return err
}
_, err = sql.NewDatabase(ctx, "db", &sql.DatabaseArgs{
Instance: instance.Name,
Name: pulumi.String("db"),
})
if err != nil {
return err
}
pwd, err := random.NewRandomPassword(ctx, "pwd", &random.RandomPasswordArgs{
Length: pulumi.Int(16),
Special: pulumi.Bool(false),
})
if err != nil {
return err
}
user, err := sql.NewUser(ctx, "user", &sql.UserArgs{
Name: pulumi.String("user"),
Instance: instance.Name,
Host: pulumi.String("%"),
Password: pwd.Result,
})
if err != nil {
return err
}
sourceConnectionProfile, err := datastream.NewConnectionProfile(ctx, "source_connection_profile", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("Source connection profile"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("source-profile"),
MysqlProfile: &datastream.ConnectionProfileMysqlProfileArgs{
Hostname: instance.PublicIpAddress,
Username: user.Name,
Password: user.Password,
},
})
if err != nil {
return err
}
destinationConnectionProfile, err := datastream.NewConnectionProfile(ctx, "destination_connection_profile", &datastream.ConnectionProfileArgs{
DisplayName: pulumi.String("Connection profile"),
Location: pulumi.String("us-central1"),
ConnectionProfileId: pulumi.String("destination-profile"),
BigqueryProfile: &datastream.ConnectionProfileBigqueryProfileArgs{},
})
if err != nil {
return err
}
_, err = datastream.NewStream(ctx, "default", &datastream.StreamArgs{
StreamId: pulumi.String("my-stream"),
Location: pulumi.String("us-central1"),
DisplayName: pulumi.String("my stream"),
SourceConfig: &datastream.StreamSourceConfigArgs{
SourceConnectionProfile: sourceConnectionProfile.ID(),
MysqlSourceConfig: &datastream.StreamSourceConfigMysqlSourceConfigArgs{},
},
DestinationConfig: &datastream.StreamDestinationConfigArgs{
DestinationConnectionProfile: destinationConnectionProfile.ID(),
BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
SourceHierarchyDatasets: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs{
DatasetTemplate: &datastream.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs{
Location: pulumi.String("us-central1"),
},
},
AppendOnly: &datastream.StreamDestinationConfigBigqueryDestinationConfigAppendOnlyArgs{},
},
},
BackfillNone: &datastream.StreamBackfillNoneArgs{},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
using Random = Pulumi.Random;
return await Deployment.RunAsync(() =>
{
var project = Gcp.Organizations.GetProject.Invoke();
var instance = new Gcp.Sql.DatabaseInstance("instance", new()
{
Name = "my-instance",
DatabaseVersion = "MYSQL_8_0",
Region = "us-central1",
Settings = new Gcp.Sql.Inputs.DatabaseInstanceSettingsArgs
{
Tier = "db-f1-micro",
BackupConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsBackupConfigurationArgs
{
Enabled = true,
BinaryLogEnabled = true,
},
IpConfiguration = new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationArgs
{
AuthorizedNetworks = new[]
{
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.71.242.81",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.72.28.29",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.67.6.157",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.67.234.134",
},
new Gcp.Sql.Inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs
{
Value = "34.72.239.218",
},
},
},
},
DeletionProtection = true,
});
var db = new Gcp.Sql.Database("db", new()
{
Instance = instance.Name,
Name = "db",
});
var pwd = new Random.RandomPassword("pwd", new()
{
Length = 16,
Special = false,
});
var user = new Gcp.Sql.User("user", new()
{
Name = "user",
Instance = instance.Name,
Host = "%",
Password = pwd.Result,
});
var sourceConnectionProfile = new Gcp.Datastream.ConnectionProfile("source_connection_profile", new()
{
DisplayName = "Source connection profile",
Location = "us-central1",
ConnectionProfileId = "source-profile",
MysqlProfile = new Gcp.Datastream.Inputs.ConnectionProfileMysqlProfileArgs
{
Hostname = instance.PublicIpAddress,
Username = user.Name,
Password = user.Password,
},
});
var destinationConnectionProfile = new Gcp.Datastream.ConnectionProfile("destination_connection_profile", new()
{
DisplayName = "Connection profile",
Location = "us-central1",
ConnectionProfileId = "destination-profile",
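BigqueryProfile = new Gcp.Datastream.Inputs.ConnectionProfileBigqueryProfileArgs(), // empty block selects a BigQuery profile; null would omit it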
});
var @default = new Gcp.Datastream.Stream("default", new()
{
StreamId = "my-stream",
Location = "us-central1",
DisplayName = "my stream",
SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
{
SourceConnectionProfile = sourceConnectionProfile.Id,
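MysqlSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigArgs(), // empty block selects a MySQL source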
},
DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
{
DestinationConnectionProfile = destinationConnectionProfile.Id,
BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
{
SourceHierarchyDatasets = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs
{
DatasetTemplate = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs
{
Location = "us-central1",
},
},
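AppendOnly = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigAppendOnlyArgs(), // null would fall back to the default merge mode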
},
},
BackfillNone = new Gcp.Datastream.Inputs.StreamBackfillNoneArgs(),
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.sql.DatabaseInstance;
import com.pulumi.gcp.sql.DatabaseInstanceArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsBackupConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationArgs;
import com.pulumi.gcp.sql.inputs.DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs;
import com.pulumi.gcp.sql.Database;
import com.pulumi.gcp.sql.DatabaseArgs;
import com.pulumi.random.RandomPassword;
import com.pulumi.random.RandomPasswordArgs;
import com.pulumi.gcp.sql.User;
import com.pulumi.gcp.sql.UserArgs;
import com.pulumi.gcp.datastream.ConnectionProfile;
import com.pulumi.gcp.datastream.ConnectionProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileMysqlProfileArgs;
import com.pulumi.gcp.datastream.inputs.ConnectionProfileBigqueryProfileArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigAppendOnlyArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillNoneArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
.build());
var instance = new DatabaseInstance("instance", DatabaseInstanceArgs.builder()
.name("my-instance")
.databaseVersion("MYSQL_8_0")
.region("us-central1")
.settings(DatabaseInstanceSettingsArgs.builder()
.tier("db-f1-micro")
.backupConfiguration(DatabaseInstanceSettingsBackupConfigurationArgs.builder()
.enabled(true)
.binaryLogEnabled(true)
.build())
.ipConfiguration(DatabaseInstanceSettingsIpConfigurationArgs.builder()
.authorizedNetworks(
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.71.242.81")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.72.28.29")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.67.6.157")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.67.234.134")
.build(),
DatabaseInstanceSettingsIpConfigurationAuthorizedNetworkArgs.builder()
.value("34.72.239.218")
.build())
.build())
.build())
.deletionProtection(true)
.build());
var db = new Database("db", DatabaseArgs.builder()
.instance(instance.name())
.name("db")
.build());
var pwd = new RandomPassword("pwd", RandomPasswordArgs.builder()
.length(16)
.special(false)
.build());
var user = new User("user", UserArgs.builder()
.name("user")
.instance(instance.name())
.host("%")
.password(pwd.result())
.build());
var sourceConnectionProfile = new ConnectionProfile("sourceConnectionProfile", ConnectionProfileArgs.builder()
.displayName("Source connection profile")
.location("us-central1")
.connectionProfileId("source-profile")
.mysqlProfile(ConnectionProfileMysqlProfileArgs.builder()
.hostname(instance.publicIpAddress())
.username(user.name())
.password(user.password())
.build())
.build());
var destinationConnectionProfile = new ConnectionProfile("destinationConnectionProfile", ConnectionProfileArgs.builder()
.displayName("Connection profile")
.location("us-central1")
.connectionProfileId("destination-profile")
.bigqueryProfile(ConnectionProfileBigqueryProfileArgs.builder()
.build())
.build());
var default_ = new Stream("default", StreamArgs.builder()
.streamId("my-stream")
.location("us-central1")
.displayName("my stream")
.sourceConfig(StreamSourceConfigArgs.builder()
.sourceConnectionProfile(sourceConnectionProfile.id())
.mysqlSourceConfig(StreamSourceConfigMysqlSourceConfigArgs.builder()
.build())
.build())
.destinationConfig(StreamDestinationConfigArgs.builder()
.destinationConnectionProfile(destinationConnectionProfile.id())
.bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
.sourceHierarchyDatasets(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsArgs.builder()
.datasetTemplate(StreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasetsDatasetTemplateArgs.builder()
.location("us-central1")
.build())
.build())
.appendOnly(StreamDestinationConfigBigqueryDestinationConfigAppendOnlyArgs.builder()
.build())
.build())
.build())
.backfillNone(StreamBackfillNoneArgs.builder()
.build())
.build());
}
}
resources:
  instance:
    type: gcp:sql:DatabaseInstance
    properties:
      name: my-instance
      databaseVersion: MYSQL_8_0
      region: us-central1
      settings:
        tier: db-f1-micro
        backupConfiguration:
          enabled: true
          binaryLogEnabled: true
        ipConfiguration:
          authorizedNetworks:
            - value: 34.71.242.81
            - value: 34.72.28.29
            - value: 34.67.6.157
            - value: 34.67.234.134
            - value: 34.72.239.218
      deletionProtection: true
  db:
    type: gcp:sql:Database
    properties:
      instance: ${instance.name}
      name: db
  pwd:
    type: random:RandomPassword
    properties:
      length: 16
      special: false
  user:
    type: gcp:sql:User
    properties:
      name: user
      instance: ${instance.name}
      host: '%'
      password: ${pwd.result}
  sourceConnectionProfile:
    type: gcp:datastream:ConnectionProfile
    name: source_connection_profile
    properties:
      displayName: Source connection profile
      location: us-central1
      connectionProfileId: source-profile
      mysqlProfile:
        hostname: ${instance.publicIpAddress}
        username: ${user.name}
        password: ${user.password}
  destinationConnectionProfile:
    type: gcp:datastream:ConnectionProfile
    name: destination_connection_profile
    properties:
      displayName: Connection profile
      location: us-central1
      connectionProfileId: destination-profile
      bigqueryProfile: {}
  default:
    type: gcp:datastream:Stream
    properties:
      streamId: my-stream
      location: us-central1
      displayName: my stream
      sourceConfig:
        sourceConnectionProfile: ${sourceConnectionProfile.id}
        mysqlSourceConfig: {}
      destinationConfig:
        destinationConnectionProfile: ${destinationConnectionProfile.id}
        bigqueryDestinationConfig:
          sourceHierarchyDatasets:
            datasetTemplate:
              location: us-central1
          appendOnly: {}
      backfillNone: {}
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}
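Setting appendOnly in bigqueryDestinationConfig switches writes from the default merge mode to insert-only: every change event, including updates and deletes, becomes a new row. This preserves the complete change history, but tables grow faster, and queries that need the current state must select the latest row for each primary key. This example also uses sourceHierarchyDatasets, which creates one BigQuery dataset per source database from the datasetTemplate, and backfillNone, which replicates only changes made after the stream starts instead of copying existing rows.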
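To make the difference in query patterns concrete, here is a toy model in TypeScript; it is not Datastream's implementation. It applies the same change events merge-style (one row per key) and append-only-style (one row per event), then recovers the current state from the append-only rows by keeping the newest event per key. The ChangeEvent shape and its seq field are hypothetical stand-ins for the ordering metadata Datastream records.

```typescript
// Toy model only: contrasts merge writes with append-only writes for a
// table keyed by `id`. Datastream's real metadata columns differ.
type Op = "INSERT" | "UPDATE" | "DELETE";

interface ChangeEvent {
  id: number;    // primary key of the source row
  op: Op;        // kind of change
  value: string; // row payload after the change
  seq: number;   // hypothetical ordering of the change in the source log
}

// Merge (default): the table holds one row per key, reflecting the latest change.
function applyMerge(events: ChangeEvent[]): Map<number, ChangeEvent> {
  const table = new Map<number, ChangeEvent>();
  const ordered = events.slice().sort((a, b) => a.seq - b.seq);
  for (const e of ordered) {
    if (e.op === "DELETE") {
      table.delete(e.id);
    } else {
      table.set(e.id, e);
    }
  }
  return table;
}

// Append-only: every change event, including deletes, becomes its own row.
function applyAppendOnly(events: ChangeEvent[]): ChangeEvent[] {
  return events.slice();
}

// Current state from append-only rows: keep the newest row per key, then drop
// keys whose newest row is a delete (similar to a ROW_NUMBER() = 1 filter in SQL).
function latestState(rows: ChangeEvent[]): Map<number, ChangeEvent> {
  const newest = new Map<number, ChangeEvent>();
  for (const r of rows) {
    const prev = newest.get(r.id);
    if (prev === undefined || r.seq > prev.seq) {
      newest.set(r.id, r);
    }
  }
  const state = new Map<number, ChangeEvent>();
  newest.forEach((r, id) => {
    if (r.op !== "DELETE") {
      state.set(id, r);
    }
  });
  return state;
}

const events: ChangeEvent[] = [
  { id: 1, op: "INSERT", value: "a", seq: 1 },
  { id: 1, op: "UPDATE", value: "b", seq: 2 },
  { id: 2, op: "INSERT", value: "x", seq: 3 },
  { id: 2, op: "DELETE", value: "x", seq: 4 },
];

const merged = applyMerge(events);
const appended = applyAppendOnly(events);
const current = latestState(appended);
console.log(merged.size, appended.length, current.get(1)?.value); // 1 4 b
```

Both paths end at the same current state, but the append-only table keeps all four events, which is where the extra storage and the extra "latest row per key" step come from.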
Apply clustering and partitioning rules to BigQuery tables
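Partitioning and clustering let BigQuery skip data a query doesn't need, which can make queries faster and cheaper. The ruleSets property applies these settings per source table: each rule set pairs an objectFilter that selects a table with customizationRules that set its clustering columns and partitioning scheme. In the example below, test_table_1 is clustered by user_id and partitioned by ingestion time, while test_table_2 is clustered by event_time and partitioned by day on that column.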
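When many tables need rules, a small helper can expand a compact per-table spec into the ruleSets array. This is a hypothetical sketch: TableSpec and buildRuleSets are illustrative names, and the output simply mirrors the ruleSets shape used in this section's example (MySQL identifiers, one clustering rule, and either ingestion-time or daily column partitioning).

```typescript
// Hypothetical per-table spec; not a Pulumi SDK type.
interface TableSpec {
  database: string;
  table: string;
  clusterBy: string[];
  // Omit for ingestion-time partitioning; set to partition by day on a column.
  partitionColumn?: string;
}

// Expands specs into plain objects shaped like the Stream's ruleSets input.
function buildRuleSets(specs: TableSpec[]) {
  return specs.map((s) => ({
    objectFilter: {
      sourceObjectIdentifier: {
        mysqlIdentifier: { database: s.database, table: s.table },
      },
    },
    customizationRules: [
      { bigqueryClustering: { columns: s.clusterBy } },
      {
        bigqueryPartitioning: s.partitionColumn
          ? {
              timeUnitPartition: {
                column: s.partitionColumn,
                partitioningTimeGranularity: "PARTITIONING_TIME_GRANULARITY_DAY",
              },
            }
          : { ingestionTimePartition: {} },
      },
    ],
  }));
}

// Reproduces the two rule sets from the example in this section.
const ruleSets = buildRuleSets([
  { database: "test_database", table: "test_table_1", clusterBy: ["user_id"] },
  {
    database: "test_database",
    table: "test_table_2",
    clusterBy: ["event_time"],
    partitionColumn: "event_time",
  },
]);

console.log(JSON.stringify(ruleSets, null, 2));
```

The result is intended to be passed as the Stream's ruleSets input; depending on your compiler settings you may need an explicit type annotation.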
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const project = gcp.organizations.getProject({});
const stream = new gcp.datastream.Stream("stream", {
streamId: "rules-stream",
location: "us-central1",
displayName: "BigQuery Stream with Rules",
sourceConfig: {
sourceConnectionProfile: "rules-source-profile",
mysqlSourceConfig: {
includeObjects: {
mysqlDatabases: [{
database: "my_database",
}],
},
binaryLogPosition: {},
},
},
destinationConfig: {
destinationConnectionProfile: "rules-dest-profile",
bigqueryDestinationConfig: {
singleTargetDataset: {
datasetId: "rules-project:rules-dataset",
},
},
},
backfillNone: {},
ruleSets: [
{
objectFilter: {
sourceObjectIdentifier: {
mysqlIdentifier: {
database: "test_database",
table: "test_table_1",
},
},
},
customizationRules: [
{
bigqueryClustering: {
columns: ["user_id"],
},
},
{
bigqueryPartitioning: {
ingestionTimePartition: {},
},
},
],
},
{
objectFilter: {
sourceObjectIdentifier: {
mysqlIdentifier: {
database: "test_database",
table: "test_table_2",
},
},
},
customizationRules: [
{
bigqueryClustering: {
columns: ["event_time"],
},
},
{
bigqueryPartitioning: {
timeUnitPartition: {
column: "event_time",
partitioningTimeGranularity: "PARTITIONING_TIME_GRANULARITY_DAY",
},
},
},
],
},
],
});
import pulumi
import pulumi_gcp as gcp
project = gcp.organizations.get_project()
stream = gcp.datastream.Stream("stream",
stream_id="rules-stream",
location="us-central1",
display_name="BigQuery Stream with Rules",
source_config={
"source_connection_profile": "rules-source-profile",
"mysql_source_config": {
"include_objects": {
"mysql_databases": [{
"database": "my_database",
}],
},
"binary_log_position": {},
},
},
destination_config={
"destination_connection_profile": "rules-dest-profile",
"bigquery_destination_config": {
"single_target_dataset": {
"dataset_id": "rules-project:rules-dataset",
},
},
},
backfill_none={},
rule_sets=[
{
"object_filter": {
"source_object_identifier": {
"mysql_identifier": {
"database": "test_database",
"table": "test_table_1",
},
},
},
"customization_rules": [
{
"bigquery_clustering": {
"columns": ["user_id"],
},
},
{
"bigquery_partitioning": {
"ingestion_time_partition": {},
},
},
],
},
{
"object_filter": {
"source_object_identifier": {
"mysql_identifier": {
"database": "test_database",
"table": "test_table_2",
},
},
},
"customization_rules": [
{
"bigquery_clustering": {
"columns": ["event_time"],
},
},
{
"bigquery_partitioning": {
"time_unit_partition": {
"column": "event_time",
"partitioning_time_granularity": "PARTITIONING_TIME_GRANULARITY_DAY",
},
},
},
],
},
])
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datastream"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
if err != nil {
return err
}
_, err = datastream.NewStream(ctx, "stream", &datastream.StreamArgs{
StreamId: pulumi.String("rules-stream"),
Location: pulumi.String("us-central1"),
DisplayName: pulumi.String("BigQuery Stream with Rules"),
SourceConfig: &datastream.StreamSourceConfigArgs{
SourceConnectionProfile: pulumi.String("rules-source-profile"),
MysqlSourceConfig: &datastream.StreamSourceConfigMysqlSourceConfigArgs{
IncludeObjects: &datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs{
MysqlDatabases: datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArray{
&datastream.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArgs{
Database: pulumi.String("my_database"),
},
},
},
BinaryLogPosition: &datastream.StreamSourceConfigMysqlSourceConfigBinaryLogPositionArgs{},
},
},
DestinationConfig: &datastream.StreamDestinationConfigArgs{
DestinationConnectionProfile: pulumi.String("rules-dest-profile"),
BigqueryDestinationConfig: &datastream.StreamDestinationConfigBigqueryDestinationConfigArgs{
SingleTargetDataset: &datastream.StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs{
DatasetId: pulumi.String("rules-project:rules-dataset"),
},
},
},
BackfillNone: &datastream.StreamBackfillNoneArgs{},
RuleSets: datastream.StreamRuleSetArray{
&datastream.StreamRuleSetArgs{
ObjectFilter: &datastream.StreamRuleSetObjectFilterArgs{
SourceObjectIdentifier: &datastream.StreamRuleSetObjectFilterSourceObjectIdentifierArgs{
MysqlIdentifier: &datastream.StreamRuleSetObjectFilterSourceObjectIdentifierMysqlIdentifierArgs{
Database: pulumi.String("test_database"),
Table: pulumi.String("test_table_1"),
},
},
},
CustomizationRules: datastream.StreamRuleSetCustomizationRuleArray{
&datastream.StreamRuleSetCustomizationRuleArgs{
BigqueryClustering: &datastream.StreamRuleSetCustomizationRuleBigqueryClusteringArgs{
Columns: pulumi.StringArray{
pulumi.String("user_id"),
},
},
},
&datastream.StreamRuleSetCustomizationRuleArgs{
BigqueryPartitioning: &datastream.StreamRuleSetCustomizationRuleBigqueryPartitioningArgs{
IngestionTimePartition: &datastream.StreamRuleSetCustomizationRuleBigqueryPartitioningIngestionTimePartitionArgs{},
},
},
},
},
&datastream.StreamRuleSetArgs{
ObjectFilter: &datastream.StreamRuleSetObjectFilterArgs{
SourceObjectIdentifier: &datastream.StreamRuleSetObjectFilterSourceObjectIdentifierArgs{
MysqlIdentifier: &datastream.StreamRuleSetObjectFilterSourceObjectIdentifierMysqlIdentifierArgs{
Database: pulumi.String("test_database"),
Table: pulumi.String("test_table_2"),
},
},
},
CustomizationRules: datastream.StreamRuleSetCustomizationRuleArray{
&datastream.StreamRuleSetCustomizationRuleArgs{
BigqueryClustering: &datastream.StreamRuleSetCustomizationRuleBigqueryClusteringArgs{
Columns: pulumi.StringArray{
pulumi.String("event_time"),
},
},
},
&datastream.StreamRuleSetCustomizationRuleArgs{
BigqueryPartitioning: &datastream.StreamRuleSetCustomizationRuleBigqueryPartitioningArgs{
TimeUnitPartition: &datastream.StreamRuleSetCustomizationRuleBigqueryPartitioningTimeUnitPartitionArgs{
Column: pulumi.String("event_time"),
PartitioningTimeGranularity: pulumi.String("PARTITIONING_TIME_GRANULARITY_DAY"),
},
},
},
},
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var project = Gcp.Organizations.GetProject.Invoke();
var stream = new Gcp.Datastream.Stream("stream", new()
{
StreamId = "rules-stream",
Location = "us-central1",
DisplayName = "BigQuery Stream with Rules",
SourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigArgs
{
SourceConnectionProfile = "rules-source-profile",
MysqlSourceConfig = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigArgs
{
IncludeObjects = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs
{
MysqlDatabases = new[]
{
new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArgs
{
Database = "my_database",
},
},
},
BinaryLogPosition = new Gcp.Datastream.Inputs.StreamSourceConfigMysqlSourceConfigBinaryLogPositionArgs(),
},
},
DestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigArgs
{
DestinationConnectionProfile = "rules-dest-profile",
BigqueryDestinationConfig = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigArgs
{
SingleTargetDataset = new Gcp.Datastream.Inputs.StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs
{
DatasetId = "rules-project:rules-dataset",
},
},
},
BackfillNone = new Gcp.Datastream.Inputs.StreamBackfillNoneArgs(),
RuleSets = new[]
{
new Gcp.Datastream.Inputs.StreamRuleSetArgs
{
ObjectFilter = new Gcp.Datastream.Inputs.StreamRuleSetObjectFilterArgs
{
SourceObjectIdentifier = new Gcp.Datastream.Inputs.StreamRuleSetObjectFilterSourceObjectIdentifierArgs
{
MysqlIdentifier = new Gcp.Datastream.Inputs.StreamRuleSetObjectFilterSourceObjectIdentifierMysqlIdentifierArgs
{
Database = "test_database",
Table = "test_table_1",
},
},
},
CustomizationRules = new[]
{
new Gcp.Datastream.Inputs.StreamRuleSetCustomizationRuleArgs
{
BigqueryClustering = new Gcp.Datastream.Inputs.StreamRuleSetCustomizationRuleBigqueryClusteringArgs
{
Columns = new[]
{
"user_id",
},
},
},
new Gcp.Datastream.Inputs.StreamRuleSetCustomizationRuleArgs
{
BigqueryPartitioning = new Gcp.Datastream.Inputs.StreamRuleSetCustomizationRuleBigqueryPartitioningArgs
{
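IngestionTimePartition = new Gcp.Datastream.Inputs.StreamRuleSetCustomizationRuleBigqueryPartitioningIngestionTimePartitionArgs(),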
},
},
},
},
new Gcp.Datastream.Inputs.StreamRuleSetArgs
{
ObjectFilter = new Gcp.Datastream.Inputs.StreamRuleSetObjectFilterArgs
{
SourceObjectIdentifier = new Gcp.Datastream.Inputs.StreamRuleSetObjectFilterSourceObjectIdentifierArgs
{
MysqlIdentifier = new Gcp.Datastream.Inputs.StreamRuleSetObjectFilterSourceObjectIdentifierMysqlIdentifierArgs
{
Database = "test_database",
Table = "test_table_2",
},
},
},
CustomizationRules = new[]
{
new Gcp.Datastream.Inputs.StreamRuleSetCustomizationRuleArgs
{
BigqueryClustering = new Gcp.Datastream.Inputs.StreamRuleSetCustomizationRuleBigqueryClusteringArgs
{
Columns = new[]
{
"event_time",
},
},
},
new Gcp.Datastream.Inputs.StreamRuleSetCustomizationRuleArgs
{
BigqueryPartitioning = new Gcp.Datastream.Inputs.StreamRuleSetCustomizationRuleBigqueryPartitioningArgs
{
TimeUnitPartition = new Gcp.Datastream.Inputs.StreamRuleSetCustomizationRuleBigqueryPartitioningTimeUnitPartitionArgs
{
Column = "event_time",
PartitioningTimeGranularity = "PARTITIONING_TIME_GRANULARITY_DAY",
},
},
},
},
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.datastream.Stream;
import com.pulumi.gcp.datastream.StreamArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigArgs;
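import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArgs;
import com.pulumi.gcp.datastream.inputs.StreamSourceConfigMysqlSourceConfigBinaryLogPositionArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigArgs;
import com.pulumi.gcp.datastream.inputs.StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs;
import com.pulumi.gcp.datastream.inputs.StreamBackfillNoneArgs;
import com.pulumi.gcp.datastream.inputs.StreamRuleSetArgs;
import com.pulumi.gcp.datastream.inputs.StreamRuleSetObjectFilterArgs;
import com.pulumi.gcp.datastream.inputs.StreamRuleSetObjectFilterSourceObjectIdentifierArgs;
import com.pulumi.gcp.datastream.inputs.StreamRuleSetObjectFilterSourceObjectIdentifierMysqlIdentifierArgs;
import com.pulumi.gcp.datastream.inputs.StreamRuleSetCustomizationRuleArgs;
import com.pulumi.gcp.datastream.inputs.StreamRuleSetCustomizationRuleBigqueryClusteringArgs;
import com.pulumi.gcp.datastream.inputs.StreamRuleSetCustomizationRuleBigqueryPartitioningArgs;
import com.pulumi.gcp.datastream.inputs.StreamRuleSetCustomizationRuleBigqueryPartitioningIngestionTimePartitionArgs;
import com.pulumi.gcp.datastream.inputs.StreamRuleSetCustomizationRuleBigqueryPartitioningTimeUnitPartitionArgs;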
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
.build());
var stream = new Stream("stream", StreamArgs.builder()
.streamId("rules-stream")
.location("us-central1")
.displayName("BigQuery Stream with Rules")
.sourceConfig(StreamSourceConfigArgs.builder()
.sourceConnectionProfile("rules-source-profile")
.mysqlSourceConfig(StreamSourceConfigMysqlSourceConfigArgs.builder()
.includeObjects(StreamSourceConfigMysqlSourceConfigIncludeObjectsArgs.builder()
.mysqlDatabases(StreamSourceConfigMysqlSourceConfigIncludeObjectsMysqlDatabaseArgs.builder()
.database("my_database")
.build())
.build())
.binaryLogPosition(StreamSourceConfigMysqlSourceConfigBinaryLogPositionArgs.builder()
.build())
.build())
.build())
.destinationConfig(StreamDestinationConfigArgs.builder()
.destinationConnectionProfile("rules-dest-profile")
.bigqueryDestinationConfig(StreamDestinationConfigBigqueryDestinationConfigArgs.builder()
.singleTargetDataset(StreamDestinationConfigBigqueryDestinationConfigSingleTargetDatasetArgs.builder()
.datasetId("rules-project:rules-dataset")
.build())
.build())
.build())
.backfillNone(StreamBackfillNoneArgs.builder()
.build())
.ruleSets(
StreamRuleSetArgs.builder()
.objectFilter(StreamRuleSetObjectFilterArgs.builder()
.sourceObjectIdentifier(StreamRuleSetObjectFilterSourceObjectIdentifierArgs.builder()
.mysqlIdentifier(StreamRuleSetObjectFilterSourceObjectIdentifierMysqlIdentifierArgs.builder()
.database("test_database")
.table("test_table_1")
.build())
.build())
.build())
.customizationRules(
StreamRuleSetCustomizationRuleArgs.builder()
.bigqueryClustering(StreamRuleSetCustomizationRuleBigqueryClusteringArgs.builder()
.columns("user_id")
.build())
.build(),
StreamRuleSetCustomizationRuleArgs.builder()
.bigqueryPartitioning(StreamRuleSetCustomizationRuleBigqueryPartitioningArgs.builder()
.ingestionTimePartition(StreamRuleSetCustomizationRuleBigqueryPartitioningIngestionTimePartitionArgs.builder()
.build())
.build())
.build())
.build(),
StreamRuleSetArgs.builder()
.objectFilter(StreamRuleSetObjectFilterArgs.builder()
.sourceObjectIdentifier(StreamRuleSetObjectFilterSourceObjectIdentifierArgs.builder()
.mysqlIdentifier(StreamRuleSetObjectFilterSourceObjectIdentifierMysqlIdentifierArgs.builder()
.database("test_database")
.table("test_table_2")
.build())
.build())
.build())
.customizationRules(
StreamRuleSetCustomizationRuleArgs.builder()
.bigqueryClustering(StreamRuleSetCustomizationRuleBigqueryClusteringArgs.builder()
.columns("event_time")
.build())
.build(),
StreamRuleSetCustomizationRuleArgs.builder()
.bigqueryPartitioning(StreamRuleSetCustomizationRuleBigqueryPartitioningArgs.builder()
.timeUnitPartition(StreamRuleSetCustomizationRuleBigqueryPartitioningTimeUnitPartitionArgs.builder()
.column("event_time")
.partitioningTimeGranularity("PARTITIONING_TIME_GRANULARITY_DAY")
.build())
.build())
.build())
.build())
.build());
}
}
resources:
stream:
type: gcp:datastream:Stream
properties:
streamId: rules-stream
location: us-central1
displayName: BigQuery Stream with Rules
sourceConfig:
sourceConnectionProfile: rules-source-profile
mysqlSourceConfig:
includeObjects:
mysqlDatabases:
- database: my_database
binaryLogPosition: {}
destinationConfig:
destinationConnectionProfile: rules-dest-profile
bigqueryDestinationConfig:
singleTargetDataset:
datasetId: rules-project:rules-dataset
backfillNone: {}
ruleSets:
- objectFilter:
sourceObjectIdentifier:
mysqlIdentifier:
database: test_database
table: test_table_1
customizationRules:
- bigqueryClustering:
columns:
- user_id
- bigqueryPartitioning:
ingestionTimePartition: {}
- objectFilter:
sourceObjectIdentifier:
mysqlIdentifier:
database: test_database
table: test_table_2
customizationRules:
- bigqueryClustering:
columns:
- event_time
- bigqueryPartitioning:
timeUnitPartition:
column: event_time
partitioningTimeGranularity: PARTITIONING_TIME_GRANULARITY_DAY
variables:
project:
fn::invoke:
function: gcp:organizations:getProject
arguments: {}
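The ruleSets property defines per-table customizations. Each rule set pairs an objectFilter with customizationRules. The objectFilter identifies a source table by database and table name. The customizationRules set that table's BigQuery clustering columns and partitioning strategy.

In this example:
- test_table_1 is clustered on user_id and partitioned by ingestion time.
- test_table_2 is clustered on event_time and partitioned by day on that same column.

Clustering speeds up queries that filter on the clustered columns. Partitioning lowers cost by limiting how much data a query scans.

Each objectFilter should name a table that the stream actually replicates. Here, includeObjects lists my_database, but the rule sets target test_database. Align both with your own schema.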
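When many tables need similar layouts, repeating the nested objectFilter and customizationRules objects gets verbose. The sketch below generates the same ruleSets structure as the example from a compact per-table list. TableLayout and buildRuleSets are hypothetical helpers. Treating a missing partitionColumn as "partition by ingestion time" is a design choice made for this sketch, not a Datastream default.

```typescript
// Hypothetical compact description of one MySQL table's BigQuery layout.
interface TableLayout {
  database: string;
  table: string;
  clusterBy: string[]; // BigQuery clustering columns
  partitionColumn?: string; // day-partitioned column; omit for ingestion time
}

// Expand the compact specs into plain objects with the same nesting as the
// ruleSets entries in the Stream example (objectFilter + customizationRules).
function buildRuleSets(layouts: TableLayout[]) {
  return layouts.map((layout) => ({
    objectFilter: {
      sourceObjectIdentifier: {
        mysqlIdentifier: { database: layout.database, table: layout.table },
      },
    },
    customizationRules: [
      { bigqueryClustering: { columns: layout.clusterBy } },
      {
        // Either ingestion-time partitioning or daily partitioning on a column.
        bigqueryPartitioning:
          layout.partitionColumn === undefined
            ? { ingestionTimePartition: {} }
            : {
                timeUnitPartition: {
                  column: layout.partitionColumn,
                  partitioningTimeGranularity: "PARTITIONING_TIME_GRANULARITY_DAY",
                },
              },
      },
    ],
  }));
}

// Reproduces the two rule sets from the Stream example.
const exampleRuleSets = buildRuleSets([
  { database: "test_database", table: "test_table_1", clusterBy: ["user_id"] },
  { database: "test_database", table: "test_table_2", clusterBy: ["event_time"], partitionColumn: "event_time" },
]);
```

In a Pulumi program, you could pass the result as ruleSets: exampleRuleSets when constructing gcp.datastream.Stream. Plain objects like these are normally accepted where Pulumi expects input types, but check that it compiles against your SDK version.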
Beyond these examples
These snippets focus on specific stream-level features: source database configuration for MySQL, PostgreSQL, SQL Server, Oracle, and MongoDB; destination configuration for Cloud Storage and BigQuery; and filtering, backfill strategies, and encryption. They’re intentionally minimal rather than full replication pipelines.
The examples reference pre-existing infrastructure such as connection profiles for sources and destinations, Cloud SQL instances or external databases with replication enabled, GCS buckets or BigQuery datasets, KMS keys for encryption, and IAM permissions for the Datastream service account. They focus on configuring the stream rather than provisioning the surrounding infrastructure.
To keep things focused, common stream patterns are omitted, including:
- Connection profile creation (referenced but not shown)
- Stream state management (desiredState property)
- Private connectivity configuration
- Stream validation (createWithoutValidation)
These omissions are intentional: the goal is to illustrate how each stream feature is wired, not to provide drop-in replication modules. See the Datastream Stream resource reference for all available configuration options.
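The IAM permissions mentioned above follow a fixed pattern. The sketch below lists the grants described in this guide's FAQ:
- three Cloud Storage roles for the Datastream service agent
- the Cloud KMS encrypter/decrypter role for the BigQuery encryption service account

datastreamIamGrants is a hypothetical helper. The two service-account address formats are assumptions based on Google's usual service-agent naming, so confirm them in your project's IAM page before relying on them.

```typescript
// One IAM grant: a role and the principal it is granted to.
interface IamGrant {
  role: string;
  member: string;
}

// Hypothetical helper that lists the grants a Datastream-to-GCS/BigQuery setup
// needs. The service-account formats below are assumptions to verify.
function datastreamIamGrants(projectNumber: string): { bucket: IamGrant[]; kmsKey: IamGrant[] } {
  // Assumed Datastream service agent address.
  const datastreamAgent = `serviceAccount:service-${projectNumber}@gcp-sa-datastream.iam.gserviceaccount.com`;
  // Assumed BigQuery encryption service account address (used for CMEK).
  const bigqueryAgent = `serviceAccount:bq-${projectNumber}@bigquery-encryption.iam.gserviceaccount.com`;

  // Roles the Datastream service agent needs on the destination bucket.
  const bucketRoles = [
    "roles/storage.objectViewer",
    "roles/storage.objectCreator",
    "roles/storage.legacyBucketReader",
  ];

  return {
    bucket: bucketRoles.map((role) => ({ role, member: datastreamAgent })),
    kmsKey: [{ role: "roles/cloudkms.cryptoKeyEncrypterDecrypter", member: bigqueryAgent }],
  };
}
```

Each bucket entry would typically become a gcp.storage.BucketIAMMember (bucket, role, member). The KMS entry would become a gcp.kms.CryptoKeyIAMMember (cryptoKeyId, role, member). List those resources in the stream's dependsOn option so Pulumi does not create the stream before the grants exist.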
Frequently Asked Questions
Immutability & Configuration Constraints
The following properties are immutable: location, streamId, project, customerManagedEncryptionKey, and createWithoutValidation. Changing any of them requires replacing the stream.
To create a stream without validating it first, set createWithoutValidation to true. This property is immutable and must be set at creation time.
Stream Lifecycle & State Management
Control the stream's state with desiredState. It accepts three values:
- NOT_STARTED: the default; creates the stream without starting it.
- RUNNING: starts the stream.
- PAUSED: pauses a RUNNING stream.
Backfill Strategies
backfillAll automatically backfills the stream's objects, with optional exclusions. backfillNone disables automatic backfill entirely.
Source Configuration
Each stream uses exactly one source configuration: mysqlSourceConfig, postgresqlSourceConfig, oracleSourceConfig, sqlServerSourceConfig, or mongodbSourceConfig.
For SQL Server, configure one CDC method in sqlServerSourceConfig. transactionLogs reads changes directly from the transaction log. changeTables reads them from the change tables that SQL Server change data capture (CDC) maintains.
For PostgreSQL, set publication and replicationSlot in postgresqlSourceConfig, for example publication: "publication" and replicationSlot: "replication_slot".
Destination Configuration
For BigQuery, configure one dataset option in bigqueryDestinationConfig. sourceHierarchyDatasets creates datasets that mirror the source hierarchy. singleTargetDataset writes all data to one dataset.
Use ruleSets to configure clustering and partitioning. Specify bigqueryClustering with columns, and bigqueryPartitioning with either ingestionTimePartition or timeUnitPartition.
Security & Permissions
For a Cloud Storage destination, grant the Datastream service account these roles on the destination bucket: roles/storage.objectViewer, roles/storage.objectCreator, and roles/storage.legacyBucketReader.
To encrypt with a customer-managed key, set customerManagedEncryptionKey on the stream and kmsKeyName in datasetTemplate. Grant roles/cloudkms.cryptoKeyEncrypterDecrypter to the BigQuery service account. Use dependsOn so that the IAM binding completes before the stream is created.