The gcp:datafusion/instance:Instance resource, part of the Pulumi GCP provider, provisions a Data Fusion instance: its tier, region, networking mode, and optional features like encryption and event publishing. This guide focuses on four capabilities: instance tier selection, private networking with VPC peering and Private Service Connect, customer-managed encryption keys, and event publishing to Pub/Sub.
Data Fusion instances require a region and tier selection. Private instances need VPC networks and subnets; CMEK requires KMS keys with IAM bindings; event publishing needs Pub/Sub topics. The examples are intentionally small. Combine them with your own networking, encryption, and monitoring infrastructure.
Create a basic instance with minimal configuration
Most deployments start with a basic instance that provides core data pipeline capabilities without additional networking or security features.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const basicInstance = new gcp.datafusion.Instance("basic_instance", {
name: "my-instance",
region: "us-central1",
type: "BASIC",
});
import pulumi
import pulumi_gcp as gcp
basic_instance = gcp.datafusion.Instance("basic_instance",
name="my-instance",
region="us-central1",
type="BASIC")
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datafusion"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := datafusion.NewInstance(ctx, "basic_instance", &datafusion.InstanceArgs{
Name: pulumi.String("my-instance"),
Region: pulumi.String("us-central1"),
Type: pulumi.String("BASIC"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var basicInstance = new Gcp.DataFusion.Instance("basic_instance", new()
{
Name = "my-instance",
Region = "us-central1",
Type = "BASIC",
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.datafusion.Instance;
import com.pulumi.gcp.datafusion.InstanceArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var basicInstance = new Instance("basicInstance", InstanceArgs.builder()
.name("my-instance")
.region("us-central1")
.type("BASIC")
.build());
}
}
resources:
basicInstance:
type: gcp:datafusion:Instance
name: basic_instance
properties:
name: my-instance
region: us-central1
type: BASIC
The type property determines the instance tier: BASIC provides point-and-click pipeline creation with some limitations, ENTERPRISE adds streaming pipelines and higher concurrency, and DEVELOPER includes all features with restrictive capacity for development use. The name and region properties define where the instance runs.
Deploy a private instance with VPC peering
Production deployments often isolate Data Fusion nodes from the public internet by running them within a VPC.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const _default = gcp.appengine.getDefaultServiceAccount({});
const network = new gcp.compute.Network("network", {name: "datafusion-full-network"});
const privateIpAlloc = new gcp.compute.GlobalAddress("private_ip_alloc", {
name: "datafusion-ip-alloc",
addressType: "INTERNAL",
purpose: "VPC_PEERING",
prefixLength: 22,
network: network.id,
});
const extendedInstance = new gcp.datafusion.Instance("extended_instance", {
name: "my-instance",
description: "My Data Fusion instance",
displayName: "My Data Fusion instance",
region: "us-central1",
type: "BASIC",
enableStackdriverLogging: true,
enableStackdriverMonitoring: true,
privateInstance: true,
dataprocServiceAccount: _default.then(_default => _default.email),
labels: {
example_key: "example_value",
},
networkConfig: {
network: "default",
ipAllocation: pulumi.interpolate`${privateIpAlloc.address}/${privateIpAlloc.prefixLength}`,
},
accelerators: [{
acceleratorType: "CDC",
state: "ENABLED",
}],
});
import pulumi
import pulumi_gcp as gcp
default = gcp.appengine.get_default_service_account()
network = gcp.compute.Network("network", name="datafusion-full-network")
private_ip_alloc = gcp.compute.GlobalAddress("private_ip_alloc",
name="datafusion-ip-alloc",
address_type="INTERNAL",
purpose="VPC_PEERING",
prefix_length=22,
network=network.id)
extended_instance = gcp.datafusion.Instance("extended_instance",
name="my-instance",
description="My Data Fusion instance",
display_name="My Data Fusion instance",
region="us-central1",
type="BASIC",
enable_stackdriver_logging=True,
enable_stackdriver_monitoring=True,
private_instance=True,
dataproc_service_account=default.email,
labels={
"example_key": "example_value",
},
network_config={
"network": "default",
"ip_allocation": pulumi.Output.all(
address=private_ip_alloc.address,
prefix_length=private_ip_alloc.prefix_length
).apply(lambda resolved_outputs: f"{resolved_outputs['address']}/{resolved_outputs['prefix_length']}")
,
},
accelerators=[{
"accelerator_type": "CDC",
"state": "ENABLED",
}])
package main
import (
"fmt"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/appengine"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/compute"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datafusion"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main provisions a private Data Fusion instance that uses VPC peering.
// It looks up the App Engine default service account, creates a VPC network
// and an internal /22 address range, and then creates the instance with
// logging, monitoring, and the CDC accelerator enabled.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// The email of this account is passed as the Dataproc service account.
		_default, err := appengine.GetDefaultServiceAccount(ctx, &appengine.GetDefaultServiceAccountArgs{}, nil)
		if err != nil {
			return err
		}
		network, err := compute.NewNetwork(ctx, "network", &compute.NetworkArgs{
			Name: pulumi.String("datafusion-full-network"),
		})
		if err != nil {
			return err
		}
		privateIpAlloc, err := compute.NewGlobalAddress(ctx, "private_ip_alloc", &compute.GlobalAddressArgs{
			Name:         pulumi.String("datafusion-ip-alloc"),
			AddressType:  pulumi.String("INTERNAL"),
			Purpose:      pulumi.String("VPC_PEERING"),
			PrefixLength: pulumi.Int(22),
			Network:      network.ID(),
		})
		if err != nil {
			return err
		}
		_, err = datafusion.NewInstance(ctx, "extended_instance", &datafusion.InstanceArgs{
			Name:                        pulumi.String("my-instance"),
			Description:                 pulumi.String("My Data Fusion instance"),
			DisplayName:                 pulumi.String("My Data Fusion instance"),
			Region:                      pulumi.String("us-central1"),
			Type:                        pulumi.String("BASIC"),
			EnableStackdriverLogging:    pulumi.Bool(true),
			EnableStackdriverMonitoring: pulumi.Bool(true),
			PrivateInstance:             pulumi.Bool(true),
			DataprocServiceAccount:      pulumi.String(_default.Email),
			Labels: pulumi.StringMap{
				"example_key": pulumi.String("example_value"),
			},
			NetworkConfig: &datafusion.InstanceNetworkConfigArgs{
				Network: pulumi.String("default"),
				// Join the reserved address and its prefix length into CIDR notation.
				IpAllocation: pulumi.All(privateIpAlloc.Address, privateIpAlloc.PrefixLength).ApplyT(func(_args []interface{}) (string, error) {
					address, ok := _args[0].(string)
					if !ok {
						return "", fmt.Errorf("unexpected address type %T", _args[0])
					}
					// NOTE(review): prefixLength is an optional output, so the SDK
					// may resolve it to *int rather than int. Accept both instead
					// of using an unchecked assertion that would panic.
					var prefixLength int
					switch v := _args[1].(type) {
					case int:
						prefixLength = v
					case *int:
						if v == nil {
							return "", fmt.Errorf("prefix length is not set on the allocated address")
						}
						prefixLength = *v
					default:
						return "", fmt.Errorf("unexpected prefix length type %T", _args[1])
					}
					return fmt.Sprintf("%v/%v", address, prefixLength), nil
				}).(pulumi.StringOutput),
			},
			Accelerators: datafusion.InstanceAcceleratorArray{
				&datafusion.InstanceAcceleratorArgs{
					AcceleratorType: pulumi.String("CDC"),
					State:           pulumi.String("ENABLED"),
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
// Provisions a private Data Fusion instance that uses VPC peering, together
// with the network and internal address range the example reserves for it.
return await Deployment.RunAsync(() =>
{
    // The email of this account is passed as the Dataproc service account.
    var @default = Gcp.AppEngine.GetDefaultServiceAccount.Invoke();

    var network = new Gcp.Compute.Network("network", new()
    {
        Name = "datafusion-full-network",
    });

    var privateIpAlloc = new Gcp.Compute.GlobalAddress("private_ip_alloc", new()
    {
        Name = "datafusion-ip-alloc",
        AddressType = "INTERNAL",
        Purpose = "VPC_PEERING",
        PrefixLength = 22,
        Network = network.Id,
    });

    var extendedInstance = new Gcp.DataFusion.Instance("extended_instance", new()
    {
        Name = "my-instance",
        Description = "My Data Fusion instance",
        DisplayName = "My Data Fusion instance",
        Region = "us-central1",
        Type = "BASIC",
        EnableStackdriverLogging = true,
        EnableStackdriverMonitoring = true,
        PrivateInstance = true,
        // A single Apply is enough: the invoke result is an Output, and the
        // value passed to the callback is the plain result object.
        DataprocServiceAccount = @default.Apply(getDefaultServiceAccountResult => getDefaultServiceAccountResult.Email),
        Labels =
        {
            { "example_key", "example_value" },
        },
        NetworkConfig = new Gcp.DataFusion.Inputs.InstanceNetworkConfigArgs
        {
            Network = "default",
            // Join the reserved address and its prefix length into CIDR notation.
            IpAllocation = Output.Tuple(privateIpAlloc.Address, privateIpAlloc.PrefixLength).Apply(values =>
            {
                var address = values.Item1;
                var prefixLength = values.Item2;
                return $"{address}/{prefixLength}";
            }),
        },
        Accelerators = new[]
        {
            new Gcp.DataFusion.Inputs.InstanceAcceleratorArgs
            {
                AcceleratorType = "CDC",
                State = "ENABLED",
            },
        },
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.appengine.AppengineFunctions;
import com.pulumi.gcp.appengine.inputs.GetDefaultServiceAccountArgs;
import com.pulumi.gcp.compute.Network;
import com.pulumi.gcp.compute.NetworkArgs;
import com.pulumi.gcp.compute.GlobalAddress;
import com.pulumi.gcp.compute.GlobalAddressArgs;
import com.pulumi.gcp.datafusion.Instance;
import com.pulumi.gcp.datafusion.InstanceArgs;
import com.pulumi.gcp.datafusion.inputs.InstanceNetworkConfigArgs;
import com.pulumi.gcp.datafusion.inputs.InstanceAcceleratorArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
final var default = AppengineFunctions.getDefaultServiceAccount(GetDefaultServiceAccountArgs.builder()
.build());
var network = new Network("network", NetworkArgs.builder()
.name("datafusion-full-network")
.build());
var privateIpAlloc = new GlobalAddress("privateIpAlloc", GlobalAddressArgs.builder()
.name("datafusion-ip-alloc")
.addressType("INTERNAL")
.purpose("VPC_PEERING")
.prefixLength(22)
.network(network.id())
.build());
var extendedInstance = new Instance("extendedInstance", InstanceArgs.builder()
.name("my-instance")
.description("My Data Fusion instance")
.displayName("My Data Fusion instance")
.region("us-central1")
.type("BASIC")
.enableStackdriverLogging(true)
.enableStackdriverMonitoring(true)
.privateInstance(true)
.dataprocServiceAccount(default_.email())
.labels(Map.of("example_key", "example_value"))
.networkConfig(InstanceNetworkConfigArgs.builder()
.network("default")
.ipAllocation(Output.tuple(privateIpAlloc.address(), privateIpAlloc.prefixLength()).applyValue(values -> {
var address = values.t1;
var prefixLength = values.t2;
return String.format("%s/%s", address,prefixLength);
}))
.build())
.accelerators(InstanceAcceleratorArgs.builder()
.acceleratorType("CDC")
.state("ENABLED")
.build())
.build());
}
}
resources:
extendedInstance:
type: gcp:datafusion:Instance
name: extended_instance
properties:
name: my-instance
description: My Data Fusion instance
displayName: My Data Fusion instance
region: us-central1
type: BASIC
enableStackdriverLogging: true
enableStackdriverMonitoring: true
privateInstance: true
dataprocServiceAccount: ${default.email}
labels:
example_key: example_value
networkConfig:
network: default
ipAllocation: ${privateIpAlloc.address}/${privateIpAlloc.prefixLength}
accelerators:
- acceleratorType: CDC
state: ENABLED
network:
type: gcp:compute:Network
properties:
name: datafusion-full-network
privateIpAlloc:
type: gcp:compute:GlobalAddress
name: private_ip_alloc
properties:
name: datafusion-ip-alloc
addressType: INTERNAL
purpose: VPC_PEERING
prefixLength: 22
network: ${network.id}
variables:
default:
fn::invoke:
function: gcp:appengine:getDefaultServiceAccount
arguments: {}
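Setting privateInstance to true creates an instance with private IP addresses only. The networkConfig block configures VPC peering: network specifies which VPC to peer with, and ipAllocation reserves a CIDR range for Data Fusion nodes. Note that the example reserves the range on datafusion-full-network but peers with the network named default; in your own deployment, reserve the range in the same VPC you reference in network. The accelerators array enables features like Change Data Capture (CDC). Stackdriver logging and monitoring capture pipeline execution metrics and logs.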
Connect via Private Service Connect interfaces
Private Service Connect provides an alternative to VPC peering, using network attachments to control subnet access.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const psc = new gcp.compute.Network("psc", {
name: "datafusion-psc-network",
autoCreateSubnetworks: false,
});
const pscSubnetwork = new gcp.compute.Subnetwork("psc", {
name: "datafusion-psc-subnet",
region: "us-central1",
network: psc.id,
ipCidrRange: "10.0.0.0/16",
});
const pscNetworkAttachment = new gcp.compute.NetworkAttachment("psc", {
name: "datafusion-psc-attachment",
region: "us-central1",
connectionPreference: "ACCEPT_AUTOMATIC",
subnetworks: [pscSubnetwork.selfLink],
});
const pscInstance = new gcp.datafusion.Instance("psc_instance", {
name: "psc-instance",
region: "us-central1",
type: "BASIC",
privateInstance: true,
networkConfig: {
connectionType: "PRIVATE_SERVICE_CONNECT_INTERFACES",
privateServiceConnectConfig: {
networkAttachment: pscNetworkAttachment.id,
unreachableCidrBlock: "192.168.0.0/25",
},
},
});
import pulumi
import pulumi_gcp as gcp
psc = gcp.compute.Network("psc",
name="datafusion-psc-network",
auto_create_subnetworks=False)
psc_subnetwork = gcp.compute.Subnetwork("psc",
name="datafusion-psc-subnet",
region="us-central1",
network=psc.id,
ip_cidr_range="10.0.0.0/16")
psc_network_attachment = gcp.compute.NetworkAttachment("psc",
name="datafusion-psc-attachment",
region="us-central1",
connection_preference="ACCEPT_AUTOMATIC",
subnetworks=[psc_subnetwork.self_link])
psc_instance = gcp.datafusion.Instance("psc_instance",
name="psc-instance",
region="us-central1",
type="BASIC",
private_instance=True,
network_config={
"connection_type": "PRIVATE_SERVICE_CONNECT_INTERFACES",
"private_service_connect_config": {
"network_attachment": psc_network_attachment.id,
"unreachable_cidr_block": "192.168.0.0/25",
},
})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/compute"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datafusion"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
psc, err := compute.NewNetwork(ctx, "psc", &compute.NetworkArgs{
Name: pulumi.String("datafusion-psc-network"),
AutoCreateSubnetworks: pulumi.Bool(false),
})
if err != nil {
return err
}
pscSubnetwork, err := compute.NewSubnetwork(ctx, "psc", &compute.SubnetworkArgs{
Name: pulumi.String("datafusion-psc-subnet"),
Region: pulumi.String("us-central1"),
Network: psc.ID(),
IpCidrRange: pulumi.String("10.0.0.0/16"),
})
if err != nil {
return err
}
pscNetworkAttachment, err := compute.NewNetworkAttachment(ctx, "psc", &compute.NetworkAttachmentArgs{
Name: pulumi.String("datafusion-psc-attachment"),
Region: pulumi.String("us-central1"),
ConnectionPreference: pulumi.String("ACCEPT_AUTOMATIC"),
Subnetworks: pulumi.StringArray{
pscSubnetwork.SelfLink,
},
})
if err != nil {
return err
}
_, err = datafusion.NewInstance(ctx, "psc_instance", &datafusion.InstanceArgs{
Name: pulumi.String("psc-instance"),
Region: pulumi.String("us-central1"),
Type: pulumi.String("BASIC"),
PrivateInstance: pulumi.Bool(true),
NetworkConfig: &datafusion.InstanceNetworkConfigArgs{
ConnectionType: pulumi.String("PRIVATE_SERVICE_CONNECT_INTERFACES"),
PrivateServiceConnectConfig: &datafusion.InstanceNetworkConfigPrivateServiceConnectConfigArgs{
NetworkAttachment: pscNetworkAttachment.ID(),
UnreachableCidrBlock: pulumi.String("192.168.0.0/25"),
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var psc = new Gcp.Compute.Network("psc", new()
{
Name = "datafusion-psc-network",
AutoCreateSubnetworks = false,
});
var pscSubnetwork = new Gcp.Compute.Subnetwork("psc", new()
{
Name = "datafusion-psc-subnet",
Region = "us-central1",
Network = psc.Id,
IpCidrRange = "10.0.0.0/16",
});
var pscNetworkAttachment = new Gcp.Compute.NetworkAttachment("psc", new()
{
Name = "datafusion-psc-attachment",
Region = "us-central1",
ConnectionPreference = "ACCEPT_AUTOMATIC",
Subnetworks = new[]
{
pscSubnetwork.SelfLink,
},
});
var pscInstance = new Gcp.DataFusion.Instance("psc_instance", new()
{
Name = "psc-instance",
Region = "us-central1",
Type = "BASIC",
PrivateInstance = true,
NetworkConfig = new Gcp.DataFusion.Inputs.InstanceNetworkConfigArgs
{
ConnectionType = "PRIVATE_SERVICE_CONNECT_INTERFACES",
PrivateServiceConnectConfig = new Gcp.DataFusion.Inputs.InstanceNetworkConfigPrivateServiceConnectConfigArgs
{
NetworkAttachment = pscNetworkAttachment.Id,
UnreachableCidrBlock = "192.168.0.0/25",
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.compute.Network;
import com.pulumi.gcp.compute.NetworkArgs;
import com.pulumi.gcp.compute.Subnetwork;
import com.pulumi.gcp.compute.SubnetworkArgs;
import com.pulumi.gcp.compute.NetworkAttachment;
import com.pulumi.gcp.compute.NetworkAttachmentArgs;
import com.pulumi.gcp.datafusion.Instance;
import com.pulumi.gcp.datafusion.InstanceArgs;
import com.pulumi.gcp.datafusion.inputs.InstanceNetworkConfigArgs;
import com.pulumi.gcp.datafusion.inputs.InstanceNetworkConfigPrivateServiceConnectConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var psc = new Network("psc", NetworkArgs.builder()
.name("datafusion-psc-network")
.autoCreateSubnetworks(false)
.build());
var pscSubnetwork = new Subnetwork("pscSubnetwork", SubnetworkArgs.builder()
.name("datafusion-psc-subnet")
.region("us-central1")
.network(psc.id())
.ipCidrRange("10.0.0.0/16")
.build());
var pscNetworkAttachment = new NetworkAttachment("pscNetworkAttachment", NetworkAttachmentArgs.builder()
.name("datafusion-psc-attachment")
.region("us-central1")
.connectionPreference("ACCEPT_AUTOMATIC")
.subnetworks(pscSubnetwork.selfLink())
.build());
var pscInstance = new Instance("pscInstance", InstanceArgs.builder()
.name("psc-instance")
.region("us-central1")
.type("BASIC")
.privateInstance(true)
.networkConfig(InstanceNetworkConfigArgs.builder()
.connectionType("PRIVATE_SERVICE_CONNECT_INTERFACES")
.privateServiceConnectConfig(InstanceNetworkConfigPrivateServiceConnectConfigArgs.builder()
.networkAttachment(pscNetworkAttachment.id())
.unreachableCidrBlock("192.168.0.0/25")
.build())
.build())
.build());
}
}
resources:
pscInstance:
type: gcp:datafusion:Instance
name: psc_instance
properties:
name: psc-instance
region: us-central1
type: BASIC
privateInstance: true
networkConfig:
connectionType: PRIVATE_SERVICE_CONNECT_INTERFACES
privateServiceConnectConfig:
networkAttachment: ${pscNetworkAttachment.id}
unreachableCidrBlock: 192.168.0.0/25
psc:
type: gcp:compute:Network
properties:
name: datafusion-psc-network
autoCreateSubnetworks: false
pscSubnetwork:
type: gcp:compute:Subnetwork
name: psc
properties:
name: datafusion-psc-subnet
region: us-central1
network: ${psc.id}
ipCidrRange: 10.0.0.0/16
pscNetworkAttachment:
type: gcp:compute:NetworkAttachment
name: psc
properties:
name: datafusion-psc-attachment
region: us-central1
connectionPreference: ACCEPT_AUTOMATIC
subnetworks:
- ${pscSubnetwork.selfLink}
The connectionType property switches from VPC peering to Private Service Connect. The privateServiceConnectConfig block references a network attachment, which determines the subnets in your VPC that Data Fusion connects through. The unreachableCidrBlock names a CIDR range (at least /25) in your VPC that the instance will not route traffic to; it must not overlap the primary range of any subnet used by the network attachment.
Encrypt data with customer-managed keys
Organizations with compliance requirements use customer-managed encryption keys (CMEK) to control data-at-rest encryption.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const keyRing = new gcp.kms.KeyRing("key_ring", {
name: "my-instance",
location: "us-central1",
});
const cryptoKey = new gcp.kms.CryptoKey("crypto_key", {
name: "my-instance",
keyRing: keyRing.id,
});
const project = gcp.organizations.getProject({});
const cryptoKeyMemberCdfSa = new gcp.kms.CryptoKeyIAMMember("crypto_key_member_cdf_sa", {
cryptoKeyId: cryptoKey.id,
role: "roles/cloudkms.cryptoKeyEncrypterDecrypter",
member: project.then(project => `serviceAccount:service-${project.number}@gcp-sa-datafusion.iam.gserviceaccount.com`),
});
const cryptoKeyMemberGcsSa = new gcp.kms.CryptoKeyIAMMember("crypto_key_member_gcs_sa", {
cryptoKeyId: cryptoKey.id,
role: "roles/cloudkms.cryptoKeyEncrypterDecrypter",
member: project.then(project => `serviceAccount:service-${project.number}@gs-project-accounts.iam.gserviceaccount.com`),
}, {
dependsOn: [cryptoKeyMemberCdfSa],
});
const cmek = new gcp.datafusion.Instance("cmek", {
name: "my-instance",
region: "us-central1",
type: "BASIC",
cryptoKeyConfig: {
keyReference: cryptoKey.id,
},
}, {
dependsOn: [cryptoKeyMemberGcsSa],
});
import pulumi
import pulumi_gcp as gcp
key_ring = gcp.kms.KeyRing("key_ring",
name="my-instance",
location="us-central1")
crypto_key = gcp.kms.CryptoKey("crypto_key",
name="my-instance",
key_ring=key_ring.id)
project = gcp.organizations.get_project()
crypto_key_member_cdf_sa = gcp.kms.CryptoKeyIAMMember("crypto_key_member_cdf_sa",
crypto_key_id=crypto_key.id,
role="roles/cloudkms.cryptoKeyEncrypterDecrypter",
member=f"serviceAccount:service-{project.number}@gcp-sa-datafusion.iam.gserviceaccount.com")
crypto_key_member_gcs_sa = gcp.kms.CryptoKeyIAMMember("crypto_key_member_gcs_sa",
crypto_key_id=crypto_key.id,
role="roles/cloudkms.cryptoKeyEncrypterDecrypter",
member=f"serviceAccount:service-{project.number}@gs-project-accounts.iam.gserviceaccount.com",
opts = pulumi.ResourceOptions(depends_on=[crypto_key_member_cdf_sa]))
cmek = gcp.datafusion.Instance("cmek",
name="my-instance",
region="us-central1",
type="BASIC",
crypto_key_config={
"key_reference": crypto_key.id,
},
opts = pulumi.ResourceOptions(depends_on=[crypto_key_member_gcs_sa]))
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datafusion"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/kms"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
keyRing, err := kms.NewKeyRing(ctx, "key_ring", &kms.KeyRingArgs{
Name: pulumi.String("my-instance"),
Location: pulumi.String("us-central1"),
})
if err != nil {
return err
}
cryptoKey, err := kms.NewCryptoKey(ctx, "crypto_key", &kms.CryptoKeyArgs{
Name: pulumi.String("my-instance"),
KeyRing: keyRing.ID(),
})
if err != nil {
return err
}
project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
if err != nil {
return err
}
cryptoKeyMemberCdfSa, err := kms.NewCryptoKeyIAMMember(ctx, "crypto_key_member_cdf_sa", &kms.CryptoKeyIAMMemberArgs{
CryptoKeyId: cryptoKey.ID(),
Role: pulumi.String("roles/cloudkms.cryptoKeyEncrypterDecrypter"),
Member: pulumi.Sprintf("serviceAccount:service-%v@gcp-sa-datafusion.iam.gserviceaccount.com", project.Number),
})
if err != nil {
return err
}
cryptoKeyMemberGcsSa, err := kms.NewCryptoKeyIAMMember(ctx, "crypto_key_member_gcs_sa", &kms.CryptoKeyIAMMemberArgs{
CryptoKeyId: cryptoKey.ID(),
Role: pulumi.String("roles/cloudkms.cryptoKeyEncrypterDecrypter"),
Member: pulumi.Sprintf("serviceAccount:service-%v@gs-project-accounts.iam.gserviceaccount.com", project.Number),
}, pulumi.DependsOn([]pulumi.Resource{
cryptoKeyMemberCdfSa,
}))
if err != nil {
return err
}
_, err = datafusion.NewInstance(ctx, "cmek", &datafusion.InstanceArgs{
Name: pulumi.String("my-instance"),
Region: pulumi.String("us-central1"),
Type: pulumi.String("BASIC"),
CryptoKeyConfig: &datafusion.InstanceCryptoKeyConfigArgs{
KeyReference: cryptoKey.ID(),
},
}, pulumi.DependsOn([]pulumi.Resource{
cryptoKeyMemberGcsSa,
}))
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
// Provisions a Data Fusion instance encrypted with a customer-managed KMS key.
// The Data Fusion and Cloud Storage service agents are granted access to the
// key before the instance is created.
return await Deployment.RunAsync(() =>
{
    var keyRing = new Gcp.Kms.KeyRing("key_ring", new()
    {
        Name = "my-instance",
        Location = "us-central1",
    });

    var cryptoKey = new Gcp.Kms.CryptoKey("crypto_key", new()
    {
        Name = "my-instance",
        KeyRing = keyRing.Id,
    });

    var project = Gcp.Organizations.GetProject.Invoke();

    var cryptoKeyMemberCdfSa = new Gcp.Kms.CryptoKeyIAMMember("crypto_key_member_cdf_sa", new()
    {
        CryptoKeyId = cryptoKey.Id,
        Role = "roles/cloudkms.cryptoKeyEncrypterDecrypter",
        // Build the member string inside Apply: embedding an Output directly in
        // an interpolated string would stringify the Output, not its value.
        Member = project.Apply(getProjectResult => $"serviceAccount:service-{getProjectResult.Number}@gcp-sa-datafusion.iam.gserviceaccount.com"),
    });

    var cryptoKeyMemberGcsSa = new Gcp.Kms.CryptoKeyIAMMember("crypto_key_member_gcs_sa", new()
    {
        CryptoKeyId = cryptoKey.Id,
        Role = "roles/cloudkms.cryptoKeyEncrypterDecrypter",
        Member = project.Apply(getProjectResult => $"serviceAccount:service-{getProjectResult.Number}@gs-project-accounts.iam.gserviceaccount.com"),
    }, new CustomResourceOptions
    {
        DependsOn =
        {
            cryptoKeyMemberCdfSa,
        },
    });

    // The instance waits for the second binding, which itself waits for the first.
    var cmek = new Gcp.DataFusion.Instance("cmek", new()
    {
        Name = "my-instance",
        Region = "us-central1",
        Type = "BASIC",
        CryptoKeyConfig = new Gcp.DataFusion.Inputs.InstanceCryptoKeyConfigArgs
        {
            KeyReference = cryptoKey.Id,
        },
    }, new CustomResourceOptions
    {
        DependsOn =
        {
            cryptoKeyMemberGcsSa,
        },
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.kms.KeyRing;
import com.pulumi.gcp.kms.KeyRingArgs;
import com.pulumi.gcp.kms.CryptoKey;
import com.pulumi.gcp.kms.CryptoKeyArgs;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.kms.CryptoKeyIAMMember;
import com.pulumi.gcp.kms.CryptoKeyIAMMemberArgs;
import com.pulumi.gcp.datafusion.Instance;
import com.pulumi.gcp.datafusion.InstanceArgs;
import com.pulumi.gcp.datafusion.inputs.InstanceCryptoKeyConfigArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Provisions a Data Fusion instance encrypted with a customer-managed KMS key.
 * The Data Fusion and Cloud Storage service agents are granted access to the
 * key before the instance is created.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares the resources for the stack.
     *
     * @param ctx the Pulumi program context
     */
    public static void stack(Context ctx) {
        var keyRing = new KeyRing("keyRing", KeyRingArgs.builder()
            .name("my-instance")
            .location("us-central1")
            .build());

        var cryptoKey = new CryptoKey("cryptoKey", CryptoKeyArgs.builder()
            .name("my-instance")
            .keyRing(keyRing.id())
            .build());

        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

        var cryptoKeyMemberCdfSa = new CryptoKeyIAMMember("cryptoKeyMemberCdfSa", CryptoKeyIAMMemberArgs.builder()
            .cryptoKeyId(cryptoKey.id())
            .role("roles/cloudkms.cryptoKeyEncrypterDecrypter")
            // The invoke result is an Output, so read the project number inside applyValue.
            .member(project.applyValue(getProjectResult -> String.format("serviceAccount:service-%s@gcp-sa-datafusion.iam.gserviceaccount.com", getProjectResult.number())))
            .build());

        var cryptoKeyMemberGcsSa = new CryptoKeyIAMMember("cryptoKeyMemberGcsSa", CryptoKeyIAMMemberArgs.builder()
            .cryptoKeyId(cryptoKey.id())
            .role("roles/cloudkms.cryptoKeyEncrypterDecrypter")
            .member(project.applyValue(getProjectResult -> String.format("serviceAccount:service-%s@gs-project-accounts.iam.gserviceaccount.com", getProjectResult.number())))
            .build(), CustomResourceOptions.builder()
                .dependsOn(cryptoKeyMemberCdfSa)
                .build());

        // The instance waits for the second binding, which itself waits for the first.
        var cmek = new Instance("cmek", InstanceArgs.builder()
            .name("my-instance")
            .region("us-central1")
            .type("BASIC")
            .cryptoKeyConfig(InstanceCryptoKeyConfigArgs.builder()
                .keyReference(cryptoKey.id())
                .build())
            .build(), CustomResourceOptions.builder()
                .dependsOn(cryptoKeyMemberGcsSa)
                .build());
    }
}
resources:
cmek:
type: gcp:datafusion:Instance
properties:
name: my-instance
region: us-central1
type: BASIC
cryptoKeyConfig:
keyReference: ${cryptoKey.id}
options:
dependsOn:
- ${cryptoKeyMemberGcsSa}
cryptoKey:
type: gcp:kms:CryptoKey
name: crypto_key
properties:
name: my-instance
keyRing: ${keyRing.id}
keyRing:
type: gcp:kms:KeyRing
name: key_ring
properties:
name: my-instance
location: us-central1
cryptoKeyMemberCdfSa:
type: gcp:kms:CryptoKeyIAMMember
name: crypto_key_member_cdf_sa
properties:
cryptoKeyId: ${cryptoKey.id}
role: roles/cloudkms.cryptoKeyEncrypterDecrypter
member: serviceAccount:service-${project.number}@gcp-sa-datafusion.iam.gserviceaccount.com
cryptoKeyMemberGcsSa:
type: gcp:kms:CryptoKeyIAMMember
name: crypto_key_member_gcs_sa
properties:
cryptoKeyId: ${cryptoKey.id}
role: roles/cloudkms.cryptoKeyEncrypterDecrypter
member: serviceAccount:service-${project.number}@gs-project-accounts.iam.gserviceaccount.com
options:
dependsOn:
- ${cryptoKeyMemberCdfSa}
variables:
project:
fn::invoke:
function: gcp:organizations:getProject
arguments: {}
The cryptoKeyConfig block references a Cloud KMS key for encryption. Before creating the instance, grant the Data Fusion and Cloud Storage service accounts the cryptoKeyEncrypterDecrypter role on the key. The example uses project-specific service account addresses that follow the pattern service-{PROJECT_NUMBER}@{SERVICE}.iam.gserviceaccount.com.
Enable role-based access control with Enterprise tier
Enterprise instances provide advanced features including role-based access control (RBAC) for fine-grained permissions.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const enterpriseInstance = new gcp.datafusion.Instance("enterprise_instance", {
name: "my-instance",
region: "us-central1",
type: "ENTERPRISE",
enableRbac: true,
});
import pulumi
import pulumi_gcp as gcp
enterprise_instance = gcp.datafusion.Instance("enterprise_instance",
name="my-instance",
region="us-central1",
type="ENTERPRISE",
enable_rbac=True)
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datafusion"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := datafusion.NewInstance(ctx, "enterprise_instance", &datafusion.InstanceArgs{
Name: pulumi.String("my-instance"),
Region: pulumi.String("us-central1"),
Type: pulumi.String("ENTERPRISE"),
EnableRbac: pulumi.Bool(true),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var enterpriseInstance = new Gcp.DataFusion.Instance("enterprise_instance", new()
{
Name = "my-instance",
Region = "us-central1",
Type = "ENTERPRISE",
EnableRbac = true,
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.datafusion.Instance;
import com.pulumi.gcp.datafusion.InstanceArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var enterpriseInstance = new Instance("enterpriseInstance", InstanceArgs.builder()
.name("my-instance")
.region("us-central1")
.type("ENTERPRISE")
.enableRbac(true)
.build());
}
}
resources:
enterpriseInstance:
type: gcp:datafusion:Instance
name: enterprise_instance
properties:
name: my-instance
region: us-central1
type: ENTERPRISE
enableRbac: true
Setting type to ENTERPRISE unlocks streaming pipeline support and higher concurrency limits. The enableRbac property enables granular role-based access control, allowing you to assign specific permissions to users and service accounts.
Publish pipeline events to Pub/Sub
Data Fusion can publish pipeline execution events to Pub/Sub topics, enabling downstream monitoring and orchestration.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
// Pub/Sub topic that the instance's event publishing points at.
const eventTopic = new gcp.pubsub.Topic("event", {
    name: "my-instance",
});

// Event publishing settings: enabled, targeting the topic above.
const publishConfig = {
    enabled: true,
    topic: eventTopic.id,
};

// BASIC-type Data Fusion instance with event publishing configured.
const event = new gcp.datafusion.Instance("event", {
    name: "my-instance",
    region: "us-central1",
    type: "BASIC",
    eventPublishConfig: publishConfig,
});
import pulumi
import pulumi_gcp as gcp
# Pub/Sub topic that the instance's event publishing points at.
event_topic = gcp.pubsub.Topic("event", name="my-instance")

# Event publishing settings: enabled, targeting the topic above.
publish_settings = {
    "enabled": True,
    "topic": event_topic.id,
}

# BASIC-type Data Fusion instance with event publishing configured.
event = gcp.datafusion.Instance(
    "event",
    name="my-instance",
    region="us-central1",
    type="BASIC",
    event_publish_config=publish_settings,
)
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datafusion"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main creates a Pub/Sub topic and a BASIC-type Data Fusion instance whose
// event publishing is enabled and pointed at that topic.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		topic, err := pubsub.NewTopic(ctx, "event", &pubsub.TopicArgs{
			Name: pulumi.String("my-instance"),
		})
		if err != nil {
			return err
		}

		// The publish config refers to the topic's ID, so the topic is declared first.
		publishCfg := &datafusion.InstanceEventPublishConfigArgs{
			Enabled: pulumi.Bool(true),
			Topic:   topic.ID(),
		}

		if _, err := datafusion.NewInstance(ctx, "event", &datafusion.InstanceArgs{
			Name:               pulumi.String("my-instance"),
			Region:             pulumi.String("us-central1"),
			Type:               pulumi.String("BASIC"),
			EventPublishConfig: publishCfg,
		}); err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
    // Pub/Sub topic that the instance's event publishing points at.
    var eventTopic = new Gcp.PubSub.Topic("event", new Gcp.PubSub.TopicArgs
    {
        Name = "my-instance",
    });

    // Event publishing settings: enabled, targeting the topic above.
    var publishConfig = new Gcp.DataFusion.Inputs.InstanceEventPublishConfigArgs
    {
        Enabled = true,
        Topic = eventTopic.Id,
    };

    // BASIC-type Data Fusion instance with event publishing configured.
    var eventInstance = new Gcp.DataFusion.Instance("event", new Gcp.DataFusion.InstanceArgs
    {
        Name = "my-instance",
        Region = "us-central1",
        Type = "BASIC",
        EventPublishConfig = publishConfig,
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.datafusion.Instance;
import com.pulumi.gcp.datafusion.InstanceArgs;
import com.pulumi.gcp.datafusion.inputs.InstanceEventPublishConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Pulumi program that declares a Pub/Sub topic and a BASIC-type Data Fusion
 * instance with event publishing enabled and pointed at that topic.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /** Declares the resources that make up this stack. */
    public static void stack(Context ctx) {
        var eventTopic = new Topic("eventTopic", TopicArgs.builder()
            .name("my-instance")
            .build());

        // Event publishing settings: enabled, targeting the topic above.
        final InstanceEventPublishConfigArgs publishConfig = InstanceEventPublishConfigArgs.builder()
            .enabled(true)
            .topic(eventTopic.id())
            .build();

        var event = new Instance("event", InstanceArgs.builder()
            .name("my-instance")
            .region("us-central1")
            .type("BASIC")
            .eventPublishConfig(publishConfig)
            .build());
    }
}
resources:
  # BASIC-type Data Fusion instance with event publishing enabled.
  event:
    type: gcp:datafusion:Instance
    properties:
      name: my-instance
      region: us-central1
      type: BASIC
      eventPublishConfig:
        enabled: true
        # Interpolates the ID of the eventTopic resource declared below.
        topic: ${eventTopic.id}
  # Pub/Sub topic referenced by the instance's eventPublishConfig.
  eventTopic:
    type: gcp:pubsub:Topic
    name: event
    properties:
      name: my-instance
The eventPublishConfig block enables event publishing. Set enabled to true and reference a Pub/Sub topic via its resource ID. Data Fusion publishes pipeline state changes and execution events to this topic.
Beyond these examples
These snippets focus on specific Data Fusion instance features: instance tiers and RBAC, private networking (VPC peering and Private Service Connect), customer-managed encryption, and event publishing. They’re intentionally minimal rather than full data pipeline deployments.
The examples may reference pre-existing infrastructure such as VPC networks and subnets for private instances, KMS keys and IAM bindings for CMEK, and Pub/Sub topics for event publishing. They focus on configuring the instance rather than provisioning everything around it.
To keep things focused, common instance patterns are omitted, including:
- Zone placement for DEVELOPER instances (zone property)
- Version and patch revision management
- Accelerator configuration details (CDC, other types)
- Custom options map for instance behavior tuning
These omissions are intentional: the goal is to illustrate how each Data Fusion feature is wired, not provide drop-in data platform modules. See the Data Fusion Instance resource reference for all available configuration options.
Let's deploy GCP Data Fusion Instances
Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.
Try Pulumi Cloud for FREE
Frequently Asked Questions
Instance Types & Configuration
name, type, region, zone, privateInstance, networkConfig, cryptoKeyConfig, dataprocServiceAccount, description, displayName, project, options, and tags.
Networking & Private Instances
Set privateInstance to true and configure networkConfig with your network details. Private instances have all nodes on private IPs without public internet access. For standard VPC peering, specify network and ipAllocation. For Private Service Connect, use connectionType: PRIVATE_SERVICE_CONNECT_INTERFACES with a networkAttachment.
Only DEVELOPER instances use the zone property. BASIC and ENTERPRISE instances don’t require zone specification.
Security & Encryption
Configure cryptoKeyConfig.keyReference with your Cloud KMS key ID. You must grant roles/cloudkms.cryptoKeyEncrypterDecrypter to both service accounts: service-PROJECT_NUMBER@gcp-sa-datafusion.iam.gserviceaccount.com (Data Fusion) and service-PROJECT_NUMBER@gs-project-accounts.iam.gserviceaccount.com (GCS). Use dependsOn to ensure IAM bindings complete before instance creation.
Set enableRbac to true. This feature enables granular role-based access control and is typically used with ENTERPRISE type instances.
State Management & Diffs
Configuring accelerators can create a permadiff with the options field. To resolve this, either manually update your state file to include the diffed options, or add options to a lifecycle ignore_changes block.
The labels field is non-authoritative and only manages labels present in your configuration. Use effectiveLabels to see all labels on the resource, including those set by other clients and services.
Use tenantProjectId to extract the tenant project ID. The serviceAccount output will be removed in a future major release.
Optional Features
Configure eventPublishConfig with enabled: true and specify your Pub/Sub topic ID in the topic field.
Using a different cloud?
Explore analytics guides for other cloud providers: