Deploy GCP Data Fusion Instances

The gcp:datafusion/instance:Instance resource, part of the Pulumi GCP provider, provisions a Data Fusion instance: its tier, region, networking mode, and optional features like encryption and event publishing. This guide focuses on four capabilities: instance tier selection, private networking via VPC peering or Private Service Connect, customer-managed encryption keys, and event publishing to Pub/Sub.

Data Fusion instances require a region and tier selection. Private instances depend on VPC networks and IP allocations; CMEK requires KMS keys with IAM bindings; event publishing requires Pub/Sub topics. The examples are intentionally small. Combine them with your own networking, encryption, and monitoring infrastructure.

Create a basic instance with minimal configuration

Most deployments start with a basic instance that provides core pipeline capabilities without advanced features.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// A minimal BASIC-tier Data Fusion instance: only the name, region, and tier are set.
const instanceRegion = "us-central1";
const basicInstance = new gcp.datafusion.Instance("basic_instance", {
    type: "BASIC",
    region: instanceRegion,
    name: "my-instance",
});
import pulumi
import pulumi_gcp as gcp

# A minimal BASIC-tier Data Fusion instance: only the name, region, and tier are set.
instance_region = "us-central1"
basic_instance = gcp.datafusion.Instance(
    "basic_instance",
    type="BASIC",
    region=instance_region,
    name="my-instance",
)
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datafusion"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

// main registers a single BASIC-tier Data Fusion instance with only its
// name, region, and tier configured.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		args := &datafusion.InstanceArgs{
			Type:   pulumi.String("BASIC"),
			Region: pulumi.String("us-central1"),
			Name:   pulumi.String("my-instance"),
		}
		// Any registration error (or nil) is handed straight back to the engine.
		_, err := datafusion.NewInstance(ctx, "basic_instance", args)
		return err
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() =>
{
    // A minimal BASIC-tier Data Fusion instance: only the name, region, and tier are set.
    const string instanceRegion = "us-central1";

    var basicInstance = new Gcp.DataFusion.Instance("basic_instance", new Gcp.DataFusion.InstanceArgs
    {
        Type = "BASIC",
        Region = instanceRegion,
        Name = "my-instance",
    });
});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.datafusion.Instance;
import com.pulumi.gcp.datafusion.InstanceArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(ctx -> stack(ctx));
    }

    /**
     * Declares a single BASIC-tier Data Fusion instance with only its name,
     * region, and tier configured.
     */
    public static void stack(Context ctx) {
        final var instanceArgs = InstanceArgs.builder()
            .type("BASIC")
            .region("us-central1")
            .name("my-instance")
            .build();

        var basicInstance = new Instance("basicInstance", instanceArgs);
    }
}
resources:
  # A minimal BASIC-tier Data Fusion instance: only the name, region, and tier are set.
  basicInstance:
    type: gcp:datafusion:Instance
    name: basic_instance
    properties: { type: BASIC, region: us-central1, name: my-instance }

The type property determines the instance tier. BASIC provides point-and-click pipeline development with limitations on concurrent pipelines and no streaming support. The name property sets the instance's identifier, and the region property determines where the instance runs.

Deploy a private instance with VPC peering

Production deployments often isolate Data Fusion nodes within a VPC, preventing public internet access while enabling connections to internal data sources.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// Default App Engine service account; Dataproc uses it when Data Fusion launches jobs.
const defaultSa = gcp.appengine.getDefaultServiceAccount({});
// VPC that the private instance peers with.
const network = new gcp.compute.Network("network", {name: "datafusion-full-network"});
// Reserve an internal /22 range on that VPC for Data Fusion's VPC peering.
const privateIpAlloc = new gcp.compute.GlobalAddress("private_ip_alloc", {
    name: "datafusion-ip-alloc",
    addressType: "INTERNAL",
    purpose: "VPC_PEERING",
    prefixLength: 22,
    network: network.id,
});
const extendedInstance = new gcp.datafusion.Instance("extended_instance", {
    name: "my-instance",
    description: "My Data Fusion instance",
    displayName: "My Data Fusion instance",
    region: "us-central1",
    type: "BASIC",
    enableStackdriverLogging: true,
    enableStackdriverMonitoring: true,
    privateInstance: true,
    dataprocServiceAccount: defaultSa.then(sa => sa.email),
    labels: {
        example_key: "example_value",
    },
    networkConfig: {
        // Must be the same VPC the ipAllocation range below was reserved on.
        network: network.name,
        // "<address>/<prefixLength>", resolved once the range has been allocated.
        ipAllocation: pulumi.interpolate`${privateIpAlloc.address}/${privateIpAlloc.prefixLength}`,
    },
    accelerators: [{
        // Enable the Change Data Capture (CDC) accelerator.
        acceleratorType: "CDC",
        state: "ENABLED",
    }],
});
import pulumi
import pulumi_gcp as gcp

# Default App Engine service account; Dataproc uses it when Data Fusion launches jobs.
default = gcp.appengine.get_default_service_account()
# VPC that the private instance peers with.
network = gcp.compute.Network("network", name="datafusion-full-network")
# Reserve an internal /22 range on that VPC for Data Fusion's VPC peering.
private_ip_alloc = gcp.compute.GlobalAddress("private_ip_alloc",
    name="datafusion-ip-alloc",
    address_type="INTERNAL",
    purpose="VPC_PEERING",
    prefix_length=22,
    network=network.id)
extended_instance = gcp.datafusion.Instance("extended_instance",
    name="my-instance",
    description="My Data Fusion instance",
    display_name="My Data Fusion instance",
    region="us-central1",
    type="BASIC",
    enable_stackdriver_logging=True,
    enable_stackdriver_monitoring=True,
    private_instance=True,
    dataproc_service_account=default.email,
    labels={
        "example_key": "example_value",
    },
    network_config={
        # Must be the same VPC the ip_allocation range below was reserved on.
        "network": network.name,
        # "<address>/<prefix_length>", resolved once the range has been allocated.
        "ip_allocation": pulumi.Output.all(
            address=private_ip_alloc.address,
            prefix_length=private_ip_alloc.prefix_length,
        ).apply(lambda resolved: f"{resolved['address']}/{resolved['prefix_length']}"),
    },
    accelerators=[{
        # Enable the Change Data Capture (CDC) accelerator.
        "accelerator_type": "CDC",
        "state": "ENABLED",
    }])
package main

import (
	"fmt"

	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/appengine"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/compute"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datafusion"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

// main declares a private, BASIC-tier Data Fusion instance that peers with a
// dedicated VPC. An internal /22 range is reserved on that VPC (purpose
// VPC_PEERING) and passed to the instance as its IpAllocation.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Default App Engine service account, used as the Dataproc service account.
		_default, err := appengine.GetDefaultServiceAccount(ctx, &appengine.GetDefaultServiceAccountArgs{}, nil)
		if err != nil {
			return err
		}
		// VPC that the private instance peers with.
		network, err := compute.NewNetwork(ctx, "network", &compute.NetworkArgs{
			Name: pulumi.String("datafusion-full-network"),
		})
		if err != nil {
			return err
		}
		// Internal /22 range reserved on that VPC for VPC peering.
		privateIpAlloc, err := compute.NewGlobalAddress(ctx, "private_ip_alloc", &compute.GlobalAddressArgs{
			Name:         pulumi.String("datafusion-ip-alloc"),
			AddressType:  pulumi.String("INTERNAL"),
			Purpose:      pulumi.String("VPC_PEERING"),
			PrefixLength: pulumi.Int(22),
			Network:      network.ID(),
		})
		if err != nil {
			return err
		}
		_, err = datafusion.NewInstance(ctx, "extended_instance", &datafusion.InstanceArgs{
			Name:                        pulumi.String("my-instance"),
			Description:                 pulumi.String("My Data Fusion instance"),
			DisplayName:                 pulumi.String("My Data Fusion instance"),
			Region:                      pulumi.String("us-central1"),
			Type:                        pulumi.String("BASIC"),
			EnableStackdriverLogging:    pulumi.Bool(true),
			EnableStackdriverMonitoring: pulumi.Bool(true),
			PrivateInstance:             pulumi.Bool(true),
			DataprocServiceAccount:      pulumi.String(_default.Email),
			Labels: pulumi.StringMap{
				"example_key": pulumi.String("example_value"),
			},
			NetworkConfig: &datafusion.InstanceNetworkConfigArgs{
				// Must be the same VPC the IpAllocation range was reserved on.
				Network: network.Name,
				// "<address>/<prefixLength>", resolved once the range has been allocated.
				IpAllocation: pulumi.All(privateIpAlloc.Address, privateIpAlloc.PrefixLength).ApplyT(func(_args []interface{}) (string, error) {
					address := _args[0].(string)
					prefixLength := _args[1].(int)
					return fmt.Sprintf("%v/%v", address, prefixLength), nil
				}).(pulumi.StringOutput),
			},
			Accelerators: datafusion.InstanceAcceleratorArray{
				// Enable the Change Data Capture (CDC) accelerator.
				&datafusion.InstanceAcceleratorArgs{
					AcceleratorType: pulumi.String("CDC"),
					State:           pulumi.String("ENABLED"),
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() =>
{
    // Default App Engine service account; Dataproc uses it when Data Fusion launches jobs.
    var @default = Gcp.AppEngine.GetDefaultServiceAccount.Invoke();

    // VPC that the private instance peers with.
    var network = new Gcp.Compute.Network("network", new()
    {
        Name = "datafusion-full-network",
    });

    // Internal /22 range reserved on that VPC for VPC peering.
    var privateIpAlloc = new Gcp.Compute.GlobalAddress("private_ip_alloc", new()
    {
        Name = "datafusion-ip-alloc",
        AddressType = "INTERNAL",
        Purpose = "VPC_PEERING",
        PrefixLength = 22,
        Network = network.Id,
    });

    var extendedInstance = new Gcp.DataFusion.Instance("extended_instance", new()
    {
        Name = "my-instance",
        Description = "My Data Fusion instance",
        DisplayName = "My Data Fusion instance",
        Region = "us-central1",
        Type = "BASIC",
        EnableStackdriverLogging = true,
        EnableStackdriverMonitoring = true,
        PrivateInstance = true,
        // Invoke() yields an Output of the lookup result; a single Apply extracts the email.
        DataprocServiceAccount = @default.Apply(getDefaultServiceAccountResult => getDefaultServiceAccountResult.Email),
        Labels =
        {
            { "example_key", "example_value" },
        },
        NetworkConfig = new Gcp.DataFusion.Inputs.InstanceNetworkConfigArgs
        {
            // Must be the same VPC the IpAllocation range was reserved on.
            Network = network.Name,
            // "<address>/<prefixLength>", resolved once the range has been allocated.
            IpAllocation = Output.Tuple(privateIpAlloc.Address, privateIpAlloc.PrefixLength).Apply(values =>
            {
                var address = values.Item1;
                var prefixLength = values.Item2;
                return $"{address}/{prefixLength}";
            }),
        },
        Accelerators = new[]
        {
            // Enable the Change Data Capture (CDC) accelerator.
            new Gcp.DataFusion.Inputs.InstanceAcceleratorArgs
            {
                AcceleratorType = "CDC",
                State = "ENABLED",
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.appengine.AppengineFunctions;
import com.pulumi.gcp.appengine.inputs.GetDefaultServiceAccountArgs;
import com.pulumi.gcp.compute.Network;
import com.pulumi.gcp.compute.NetworkArgs;
import com.pulumi.gcp.compute.GlobalAddress;
import com.pulumi.gcp.compute.GlobalAddressArgs;
import com.pulumi.gcp.datafusion.Instance;
import com.pulumi.gcp.datafusion.InstanceArgs;
import com.pulumi.gcp.datafusion.inputs.InstanceNetworkConfigArgs;
import com.pulumi.gcp.datafusion.inputs.InstanceAcceleratorArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        final var default = AppengineFunctions.getDefaultServiceAccount(GetDefaultServiceAccountArgs.builder()
            .build());

        var network = new Network("network", NetworkArgs.builder()
            .name("datafusion-full-network")
            .build());

        var privateIpAlloc = new GlobalAddress("privateIpAlloc", GlobalAddressArgs.builder()
            .name("datafusion-ip-alloc")
            .addressType("INTERNAL")
            .purpose("VPC_PEERING")
            .prefixLength(22)
            .network(network.id())
            .build());

        var extendedInstance = new Instance("extendedInstance", InstanceArgs.builder()
            .name("my-instance")
            .description("My Data Fusion instance")
            .displayName("My Data Fusion instance")
            .region("us-central1")
            .type("BASIC")
            .enableStackdriverLogging(true)
            .enableStackdriverMonitoring(true)
            .privateInstance(true)
            .dataprocServiceAccount(default_.email())
            .labels(Map.of("example_key", "example_value"))
            .networkConfig(InstanceNetworkConfigArgs.builder()
                .network("default")
                .ipAllocation(Output.tuple(privateIpAlloc.address(), privateIpAlloc.prefixLength()).applyValue(values -> {
                    var address = values.t1;
                    var prefixLength = values.t2;
                    return String.format("%s/%s", address,prefixLength);
                }))
                .build())
            .accelerators(InstanceAcceleratorArgs.builder()
                .acceleratorType("CDC")
                .state("ENABLED")
                .build())
            .build());

    }
}
resources:
  extendedInstance:
    type: gcp:datafusion:Instance
    name: extended_instance
    properties:
      name: my-instance
      description: My Data Fusion instance
      displayName: My Data Fusion instance
      region: us-central1
      type: BASIC
      enableStackdriverLogging: true
      enableStackdriverMonitoring: true
      privateInstance: true
      # Default App Engine service account; Dataproc uses it when Data Fusion launches jobs.
      dataprocServiceAccount: ${default.email}
      labels:
        example_key: example_value
      networkConfig:
        # Must be the same VPC the ipAllocation range was reserved on.
        network: ${network.name}
        ipAllocation: ${privateIpAlloc.address}/${privateIpAlloc.prefixLength}
      accelerators:
        # Enable the Change Data Capture (CDC) accelerator.
        - acceleratorType: CDC
          state: ENABLED
  # VPC that the private instance peers with.
  network:
    type: gcp:compute:Network
    properties:
      name: datafusion-full-network
  # Internal /22 range reserved on that VPC for VPC peering.
  privateIpAlloc:
    type: gcp:compute:GlobalAddress
    name: private_ip_alloc
    properties:
      name: datafusion-ip-alloc
      addressType: INTERNAL
      purpose: VPC_PEERING
      prefixLength: 22
      network: ${network.id}
variables:
  default:
    fn::invoke:
      function: gcp:appengine:getDefaultServiceAccount
      arguments: {}

Setting privateInstance to true places all Data Fusion nodes on private IPs. The networkConfig block specifies the VPC network and IP allocation range for peering. The ipAllocation uses a GlobalAddress to reserve a CIDR block for Data Fusion’s internal services. The dataprocServiceAccount property sets which service account Dataproc uses when Data Fusion launches processing jobs. The accelerators array enables features like Change Data Capture (CDC).

Connect via Private Service Connect interfaces

Private Service Connect offers an alternative to VPC peering, using network attachments to control subnet access.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const pscRegion = "us-central1";

// Custom-mode VPC: its subnet is declared explicitly below.
const psc = new gcp.compute.Network("psc", {
    autoCreateSubnetworks: false,
    name: "datafusion-psc-network",
});
const pscSubnetwork = new gcp.compute.Subnetwork("psc", {
    ipCidrRange: "10.0.0.0/16",
    network: psc.id,
    region: pscRegion,
    name: "datafusion-psc-subnet",
});
// Network attachment exposing the subnet to the Data Fusion instance.
const pscNetworkAttachment = new gcp.compute.NetworkAttachment("psc", {
    subnetworks: [pscSubnetwork.selfLink],
    connectionPreference: "ACCEPT_AUTOMATIC",
    region: pscRegion,
    name: "datafusion-psc-attachment",
});
const pscConnectConfig = {
    networkAttachment: pscNetworkAttachment.id,
    unreachableCidrBlock: "192.168.0.0/25",
};
const pscInstance = new gcp.datafusion.Instance("psc_instance", {
    name: "psc-instance",
    region: pscRegion,
    type: "BASIC",
    privateInstance: true,
    networkConfig: {
        connectionType: "PRIVATE_SERVICE_CONNECT_INTERFACES",
        privateServiceConnectConfig: pscConnectConfig,
    },
});
import pulumi
import pulumi_gcp as gcp

psc_region = "us-central1"

# Custom-mode VPC: its subnet is declared explicitly below.
psc = gcp.compute.Network(
    "psc",
    auto_create_subnetworks=False,
    name="datafusion-psc-network",
)
psc_subnetwork = gcp.compute.Subnetwork(
    "psc",
    ip_cidr_range="10.0.0.0/16",
    network=psc.id,
    region=psc_region,
    name="datafusion-psc-subnet",
)
# Network attachment exposing the subnet to the Data Fusion instance.
psc_network_attachment = gcp.compute.NetworkAttachment(
    "psc",
    subnetworks=[psc_subnetwork.self_link],
    connection_preference="ACCEPT_AUTOMATIC",
    region=psc_region,
    name="datafusion-psc-attachment",
)
psc_connect_config = {
    "network_attachment": psc_network_attachment.id,
    "unreachable_cidr_block": "192.168.0.0/25",
}
psc_instance = gcp.datafusion.Instance(
    "psc_instance",
    name="psc-instance",
    region=psc_region,
    type="BASIC",
    private_instance=True,
    network_config={
        "connection_type": "PRIVATE_SERVICE_CONNECT_INTERFACES",
        "private_service_connect_config": psc_connect_config,
    },
)
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/compute"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datafusion"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

// main provisions a custom-mode VPC, a subnet, a network attachment over that
// subnet, and a private Data Fusion instance configured with
// PRIVATE_SERVICE_CONNECT_INTERFACES against that attachment.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		const region = "us-central1"

		vpc, err := compute.NewNetwork(ctx, "psc", &compute.NetworkArgs{
			AutoCreateSubnetworks: pulumi.Bool(false),
			Name:                  pulumi.String("datafusion-psc-network"),
		})
		if err != nil {
			return err
		}

		subnet, err := compute.NewSubnetwork(ctx, "psc", &compute.SubnetworkArgs{
			IpCidrRange: pulumi.String("10.0.0.0/16"),
			Network:     vpc.ID(),
			Region:      pulumi.String(region),
			Name:        pulumi.String("datafusion-psc-subnet"),
		})
		if err != nil {
			return err
		}

		attachment, err := compute.NewNetworkAttachment(ctx, "psc", &compute.NetworkAttachmentArgs{
			Subnetworks:          pulumi.StringArray{subnet.SelfLink},
			ConnectionPreference: pulumi.String("ACCEPT_AUTOMATIC"),
			Region:               pulumi.String(region),
			Name:                 pulumi.String("datafusion-psc-attachment"),
		})
		if err != nil {
			return err
		}

		pscConfig := &datafusion.InstanceNetworkConfigPrivateServiceConnectConfigArgs{
			NetworkAttachment:    attachment.ID(),
			UnreachableCidrBlock: pulumi.String("192.168.0.0/25"),
		}
		_, err = datafusion.NewInstance(ctx, "psc_instance", &datafusion.InstanceArgs{
			Name:            pulumi.String("psc-instance"),
			Region:          pulumi.String(region),
			Type:            pulumi.String("BASIC"),
			PrivateInstance: pulumi.Bool(true),
			NetworkConfig: &datafusion.InstanceNetworkConfigArgs{
				ConnectionType:              pulumi.String("PRIVATE_SERVICE_CONNECT_INTERFACES"),
				PrivateServiceConnectConfig: pscConfig,
			},
		})
		// Propagate the registration error (nil on success).
		return err
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() =>
{
    const string region = "us-central1";

    // Custom-mode VPC: its subnet is declared explicitly below.
    var vpc = new Gcp.Compute.Network("psc", new Gcp.Compute.NetworkArgs
    {
        AutoCreateSubnetworks = false,
        Name = "datafusion-psc-network",
    });

    var subnet = new Gcp.Compute.Subnetwork("psc", new Gcp.Compute.SubnetworkArgs
    {
        IpCidrRange = "10.0.0.0/16",
        Network = vpc.Id,
        Region = region,
        Name = "datafusion-psc-subnet",
    });

    // Network attachment exposing the subnet to the Data Fusion instance.
    var attachment = new Gcp.Compute.NetworkAttachment("psc", new Gcp.Compute.NetworkAttachmentArgs
    {
        Subnetworks = new[] { subnet.SelfLink },
        ConnectionPreference = "ACCEPT_AUTOMATIC",
        Region = region,
        Name = "datafusion-psc-attachment",
    });

    var pscConfig = new Gcp.DataFusion.Inputs.InstanceNetworkConfigPrivateServiceConnectConfigArgs
    {
        NetworkAttachment = attachment.Id,
        UnreachableCidrBlock = "192.168.0.0/25",
    };

    var pscInstance = new Gcp.DataFusion.Instance("psc_instance", new Gcp.DataFusion.InstanceArgs
    {
        Name = "psc-instance",
        Region = region,
        Type = "BASIC",
        PrivateInstance = true,
        NetworkConfig = new Gcp.DataFusion.Inputs.InstanceNetworkConfigArgs
        {
            ConnectionType = "PRIVATE_SERVICE_CONNECT_INTERFACES",
            PrivateServiceConnectConfig = pscConfig,
        },
    });
});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.compute.Network;
import com.pulumi.gcp.compute.NetworkArgs;
import com.pulumi.gcp.compute.Subnetwork;
import com.pulumi.gcp.compute.SubnetworkArgs;
import com.pulumi.gcp.compute.NetworkAttachment;
import com.pulumi.gcp.compute.NetworkAttachmentArgs;
import com.pulumi.gcp.datafusion.Instance;
import com.pulumi.gcp.datafusion.InstanceArgs;
import com.pulumi.gcp.datafusion.inputs.InstanceNetworkConfigArgs;
import com.pulumi.gcp.datafusion.inputs.InstanceNetworkConfigPrivateServiceConnectConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(ctx -> stack(ctx));
    }

    /**
     * Declares a custom-mode VPC, a subnet, a network attachment over that
     * subnet, and a private Data Fusion instance that connects through
     * Private Service Connect interfaces on that attachment.
     */
    public static void stack(Context ctx) {
        final String region = "us-central1";

        var psc = new Network("psc", NetworkArgs.builder()
            .autoCreateSubnetworks(false)
            .name("datafusion-psc-network")
            .build());

        var pscSubnetwork = new Subnetwork("pscSubnetwork", SubnetworkArgs.builder()
            .ipCidrRange("10.0.0.0/16")
            .network(psc.id())
            .region(region)
            .name("datafusion-psc-subnet")
            .build());

        var pscNetworkAttachment = new NetworkAttachment("pscNetworkAttachment", NetworkAttachmentArgs.builder()
            .subnetworks(pscSubnetwork.selfLink())
            .connectionPreference("ACCEPT_AUTOMATIC")
            .region(region)
            .name("datafusion-psc-attachment")
            .build());

        final var pscConfig = InstanceNetworkConfigPrivateServiceConnectConfigArgs.builder()
            .networkAttachment(pscNetworkAttachment.id())
            .unreachableCidrBlock("192.168.0.0/25")
            .build();

        final var networkConfig = InstanceNetworkConfigArgs.builder()
            .connectionType("PRIVATE_SERVICE_CONNECT_INTERFACES")
            .privateServiceConnectConfig(pscConfig)
            .build();

        var pscInstance = new Instance("pscInstance", InstanceArgs.builder()
            .name("psc-instance")
            .region(region)
            .type("BASIC")
            .privateInstance(true)
            .networkConfig(networkConfig)
            .build());
    }
}
resources:
  # Custom-mode VPC: its subnet is declared explicitly below.
  psc:
    type: gcp:compute:Network
    properties:
      autoCreateSubnetworks: false
      name: datafusion-psc-network
  pscSubnetwork:
    type: gcp:compute:Subnetwork
    name: psc
    properties:
      ipCidrRange: 10.0.0.0/16
      network: ${psc.id}
      region: us-central1
      name: datafusion-psc-subnet
  # Network attachment exposing the subnet to the Data Fusion instance.
  pscNetworkAttachment:
    type: gcp:compute:NetworkAttachment
    name: psc
    properties:
      subnetworks: ["${pscSubnetwork.selfLink}"]
      connectionPreference: ACCEPT_AUTOMATIC
      region: us-central1
      name: datafusion-psc-attachment
  pscInstance:
    type: gcp:datafusion:Instance
    name: psc_instance
    properties:
      name: psc-instance
      region: us-central1
      type: BASIC
      privateInstance: true
      networkConfig:
        connectionType: PRIVATE_SERVICE_CONNECT_INTERFACES
        privateServiceConnectConfig:
          networkAttachment: ${pscNetworkAttachment.id}
          unreachableCidrBlock: 192.168.0.0/25

The connectionType property switches from VPC peering to Private Service Connect. The privateServiceConnectConfig block references a NetworkAttachment, which determines the subnets in your VPC that the instance connects into. The unreachableCidrBlock is a range (at least /25) that the instance will not route traffic to in your VPC. It must not overlap the primary ranges of the subnets used by the network attachment.

Encrypt data with customer-managed keys

Organizations with strict governance requirements use customer-managed encryption keys (CMEK) to control data encryption.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// Key ring and key in us-central1, the same region as the instance below.
const keyRing = new gcp.kms.KeyRing("key_ring", {
    location: "us-central1",
    name: "my-instance",
});
const cryptoKey = new gcp.kms.CryptoKey("crypto_key", {
    keyRing: keyRing.id,
    name: "my-instance",
});
const projectInfo = gcp.organizations.getProject({});

// Builds the IAM member for this project's service agent under the given domain.
const serviceAgent = (domain: string) =>
    projectInfo.then(p => `serviceAccount:service-${p.number}@${domain}`);
const encrypterDecrypter = "roles/cloudkms.cryptoKeyEncrypterDecrypter";

const cryptoKeyMemberCdfSa = new gcp.kms.CryptoKeyIAMMember("crypto_key_member_cdf_sa", {
    cryptoKeyId: cryptoKey.id,
    role: encrypterDecrypter,
    member: serviceAgent("gcp-sa-datafusion.iam.gserviceaccount.com"),
});
const cryptoKeyMemberGcsSa = new gcp.kms.CryptoKeyIAMMember("crypto_key_member_gcs_sa", {
    cryptoKeyId: cryptoKey.id,
    role: encrypterDecrypter,
    member: serviceAgent("gs-project-accounts.iam.gserviceaccount.com"),
});

// The instance waits for both grants so the key is usable when it is created.
const cmek = new gcp.datafusion.Instance("cmek", {
    type: "BASIC",
    region: "us-central1",
    name: "my-instance",
    cryptoKeyConfig: {keyReference: cryptoKey.id},
}, {dependsOn: [cryptoKeyMemberCdfSa, cryptoKeyMemberGcsSa]});
import pulumi
import pulumi_gcp as gcp

# Key ring and key in us-central1, the same region as the instance below.
key_ring = gcp.kms.KeyRing(
    "key_ring",
    location="us-central1",
    name="my-instance",
)
crypto_key = gcp.kms.CryptoKey(
    "crypto_key",
    key_ring=key_ring.id,
    name="my-instance",
)
project = gcp.organizations.get_project()


def _service_agent(domain):
    """Return the IAM member string for this project's service agent under ``domain``."""
    return f"serviceAccount:service-{project.number}@{domain}"


ENCRYPTER_DECRYPTER = "roles/cloudkms.cryptoKeyEncrypterDecrypter"

crypto_key_member_cdf_sa = gcp.kms.CryptoKeyIAMMember(
    "crypto_key_member_cdf_sa",
    crypto_key_id=crypto_key.id,
    role=ENCRYPTER_DECRYPTER,
    member=_service_agent("gcp-sa-datafusion.iam.gserviceaccount.com"),
)
crypto_key_member_gcs_sa = gcp.kms.CryptoKeyIAMMember(
    "crypto_key_member_gcs_sa",
    crypto_key_id=crypto_key.id,
    role=ENCRYPTER_DECRYPTER,
    member=_service_agent("gs-project-accounts.iam.gserviceaccount.com"),
)

# The instance waits for both grants so the key is usable when it is created.
key_grants = [crypto_key_member_cdf_sa, crypto_key_member_gcs_sa]
cmek = gcp.datafusion.Instance(
    "cmek",
    type="BASIC",
    region="us-central1",
    name="my-instance",
    crypto_key_config={"key_reference": crypto_key.id},
    opts=pulumi.ResourceOptions(depends_on=key_grants),
)
package main

import (
	"fmt"

	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datafusion"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/kms"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

// main declares a KMS key ring and key and grants encrypt/decrypt on the key
// to two project service agents (Data Fusion and gs-project-accounts). It then
// creates a Data Fusion instance encrypted with that key, ordered after both
// grants.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		keyRing, err := kms.NewKeyRing(ctx, "key_ring", &kms.KeyRingArgs{
			Name:     pulumi.String("my-instance"),
			Location: pulumi.String("us-central1"),
		})
		if err != nil {
			return err
		}
		cryptoKey, err := kms.NewCryptoKey(ctx, "crypto_key", &kms.CryptoKeyArgs{
			Name:    pulumi.String("my-instance"),
			KeyRing: keyRing.ID(),
		})
		if err != nil {
			return err
		}
		project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		// project is a plain lookup result (not an Output), so fmt.Sprintf is
		// sufficient; it also keeps the imported "fmt" package in use, since
		// Go refuses to compile a file with an unused import.
		cdfServiceAgent := fmt.Sprintf("serviceAccount:service-%v@gcp-sa-datafusion.iam.gserviceaccount.com", project.Number)
		gcsServiceAgent := fmt.Sprintf("serviceAccount:service-%v@gs-project-accounts.iam.gserviceaccount.com", project.Number)
		cryptoKeyMemberCdfSa, err := kms.NewCryptoKeyIAMMember(ctx, "crypto_key_member_cdf_sa", &kms.CryptoKeyIAMMemberArgs{
			CryptoKeyId: cryptoKey.ID(),
			Role:        pulumi.String("roles/cloudkms.cryptoKeyEncrypterDecrypter"),
			Member:      pulumi.String(cdfServiceAgent),
		})
		if err != nil {
			return err
		}
		cryptoKeyMemberGcsSa, err := kms.NewCryptoKeyIAMMember(ctx, "crypto_key_member_gcs_sa", &kms.CryptoKeyIAMMemberArgs{
			CryptoKeyId: cryptoKey.ID(),
			Role:        pulumi.String("roles/cloudkms.cryptoKeyEncrypterDecrypter"),
			Member:      pulumi.String(gcsServiceAgent),
		})
		if err != nil {
			return err
		}
		// The instance is created only after both IAM grants exist.
		_, err = datafusion.NewInstance(ctx, "cmek", &datafusion.InstanceArgs{
			Name:   pulumi.String("my-instance"),
			Region: pulumi.String("us-central1"),
			Type:   pulumi.String("BASIC"),
			CryptoKeyConfig: &datafusion.InstanceCryptoKeyConfigArgs{
				KeyReference: cryptoKey.ID(),
			},
		}, pulumi.DependsOn([]pulumi.Resource{
			cryptoKeyMemberCdfSa,
			cryptoKeyMemberGcsSa,
		}))
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() =>
{
    // Key ring and key in us-central1, the same region as the instance below.
    var keyRing = new Gcp.Kms.KeyRing("key_ring", new()
    {
        Name = "my-instance",
        Location = "us-central1",
    });

    var cryptoKey = new Gcp.Kms.CryptoKey("crypto_key", new()
    {
        Name = "my-instance",
        KeyRing = keyRing.Id,
    });

    var project = Gcp.Organizations.GetProject.Invoke();

    // The member strings are built inside Apply. Interpolating an Output<T>
    // directly into a C# string calls Output.ToString(), which does not return
    // the resolved value.
    var cryptoKeyMemberCdfSa = new Gcp.Kms.CryptoKeyIAMMember("crypto_key_member_cdf_sa", new()
    {
        CryptoKeyId = cryptoKey.Id,
        Role = "roles/cloudkms.cryptoKeyEncrypterDecrypter",
        Member = project.Apply(getProjectResult => $"serviceAccount:service-{getProjectResult.Number}@gcp-sa-datafusion.iam.gserviceaccount.com"),
    });

    var cryptoKeyMemberGcsSa = new Gcp.Kms.CryptoKeyIAMMember("crypto_key_member_gcs_sa", new()
    {
        CryptoKeyId = cryptoKey.Id,
        Role = "roles/cloudkms.cryptoKeyEncrypterDecrypter",
        Member = project.Apply(getProjectResult => $"serviceAccount:service-{getProjectResult.Number}@gs-project-accounts.iam.gserviceaccount.com"),
    });

    // The instance is created only after both IAM grants exist.
    var cmek = new Gcp.DataFusion.Instance("cmek", new()
    {
        Name = "my-instance",
        Region = "us-central1",
        Type = "BASIC",
        CryptoKeyConfig = new Gcp.DataFusion.Inputs.InstanceCryptoKeyConfigArgs
        {
            KeyReference = cryptoKey.Id,
        },
    }, new CustomResourceOptions
    {
        DependsOn =
        {
            cryptoKeyMemberCdfSa,
            cryptoKeyMemberGcsSa,
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.kms.KeyRing;
import com.pulumi.gcp.kms.KeyRingArgs;
import com.pulumi.gcp.kms.CryptoKey;
import com.pulumi.gcp.kms.CryptoKeyArgs;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.kms.CryptoKeyIAMMember;
import com.pulumi.gcp.kms.CryptoKeyIAMMemberArgs;
import com.pulumi.gcp.datafusion.Instance;
import com.pulumi.gcp.datafusion.InstanceArgs;
import com.pulumi.gcp.datafusion.inputs.InstanceCryptoKeyConfigArgs;
import com.pulumi.resources.CustomResourceOptions;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * Stack program for a Data Fusion instance encrypted with a customer-managed key (CMEK).
 *
 * <p>Creates a KMS key ring and key, then grants encrypt/decrypt on that key to the
 * Data Fusion and Cloud Storage service agents. The instance is created only after
 * both IAM bindings exist.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var keyRing = new KeyRing("keyRing", KeyRingArgs.builder()
            .name("my-instance")
            .location("us-central1")
            .build());

        var cryptoKey = new CryptoKey("cryptoKey", CryptoKeyArgs.builder()
            .name("my-instance")
            .keyRing(keyRing.id())
            .build());

        // getProject yields an Output<GetProjectResult>: the project number is only
        // available inside applyValue. Formatting the Output itself would not embed
        // the number in the service-account email.
        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

        // Data Fusion service agent needs encrypt/decrypt on the key.
        var cryptoKeyMemberCdfSa = new CryptoKeyIAMMember("cryptoKeyMemberCdfSa", CryptoKeyIAMMemberArgs.builder()
            .cryptoKeyId(cryptoKey.id())
            .role("roles/cloudkms.cryptoKeyEncrypterDecrypter")
            .member(project.applyValue(p -> String.format(
                "serviceAccount:service-%s@gcp-sa-datafusion.iam.gserviceaccount.com", p.number())))
            .build());

        // Cloud Storage service agent needs encrypt/decrypt on the key as well.
        var cryptoKeyMemberGcsSa = new CryptoKeyIAMMember("cryptoKeyMemberGcsSa", CryptoKeyIAMMemberArgs.builder()
            .cryptoKeyId(cryptoKey.id())
            .role("roles/cloudkms.cryptoKeyEncrypterDecrypter")
            .member(project.applyValue(p -> String.format(
                "serviceAccount:service-%s@gs-project-accounts.iam.gserviceaccount.com", p.number())))
            .build());

        // dependsOn makes Pulumi wait for both IAM grants before creating the instance.
        var cmek = new Instance("cmek", InstanceArgs.builder()
            .name("my-instance")
            .region("us-central1")
            .type("BASIC")
            .cryptoKeyConfig(InstanceCryptoKeyConfigArgs.builder()
                .keyReference(cryptoKey.id())
                .build())
            .build(), CustomResourceOptions.builder()
                .dependsOn(
                    cryptoKeyMemberCdfSa,
                    cryptoKeyMemberGcsSa)
                .build());

    }
}
# Look up the current project so the service-agent emails can embed its number.
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}
resources:
  keyRing:
    type: gcp:kms:KeyRing
    name: key_ring
    properties:
      name: my-instance
      location: us-central1
  cryptoKey:
    type: gcp:kms:CryptoKey
    name: crypto_key
    properties:
      name: my-instance
      keyRing: ${keyRing.id}
  # Both the Data Fusion and Cloud Storage service agents need
  # encrypt/decrypt on the key.
  cryptoKeyMemberCdfSa:
    type: gcp:kms:CryptoKeyIAMMember
    name: crypto_key_member_cdf_sa
    properties:
      cryptoKeyId: ${cryptoKey.id}
      role: roles/cloudkms.cryptoKeyEncrypterDecrypter
      member: serviceAccount:service-${project.number}@gcp-sa-datafusion.iam.gserviceaccount.com
  cryptoKeyMemberGcsSa:
    type: gcp:kms:CryptoKeyIAMMember
    name: crypto_key_member_gcs_sa
    properties:
      cryptoKeyId: ${cryptoKey.id}
      role: roles/cloudkms.cryptoKeyEncrypterDecrypter
      member: serviceAccount:service-${project.number}@gs-project-accounts.iam.gserviceaccount.com
  cmek:
    type: gcp:datafusion:Instance
    properties:
      name: my-instance
      region: us-central1
      type: BASIC
      cryptoKeyConfig:
        keyReference: ${cryptoKey.id}
    options:
      # Wait for both IAM grants before creating the instance.
      dependsOn:
        - ${cryptoKeyMemberCdfSa}
        - ${cryptoKeyMemberGcsSa}

The cryptoKeyConfig block references a KMS CryptoKey for encrypting data at rest. Data Fusion requires IAM permissions on the key for both its own service account and the GCS service account. The dependsOn resource option ensures both IAM bindings exist before the instance is created. The service account emails embed the project number, which you retrieve with the getProject function.

Enable role-based access control with Enterprise tier

Enterprise instances provide streaming pipeline support and granular role-based access control for multi-team environments.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// ENTERPRISE edition with granular, role-based access control turned on.
const enterpriseArgs: gcp.datafusion.InstanceArgs = {
    name: "my-instance",
    region: "us-central1",
    type: "ENTERPRISE",
    enableRbac: true,
};
const enterpriseInstance = new gcp.datafusion.Instance("enterprise_instance", enterpriseArgs);
import pulumi
import pulumi_gcp as gcp

# ENTERPRISE edition with granular, role-based access control turned on.
enterprise_instance = gcp.datafusion.Instance(
    "enterprise_instance",
    name="my-instance",
    region="us-central1",
    type="ENTERPRISE",
    enable_rbac=True,
)
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datafusion"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

// main registers a single ENTERPRISE-edition Data Fusion instance with RBAC enabled.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		args := &datafusion.InstanceArgs{
			Name:       pulumi.String("my-instance"),
			Region:     pulumi.String("us-central1"),
			Type:       pulumi.String("ENTERPRISE"),
			EnableRbac: pulumi.Bool(true),
		}
		// A nil error means the instance was registered successfully.
		_, err := datafusion.NewInstance(ctx, "enterprise_instance", args)
		return err
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() =>
{
    // ENTERPRISE edition with granular, role-based access control turned on.
    var enterpriseArgs = new Gcp.DataFusion.InstanceArgs
    {
        Name = "my-instance",
        Region = "us-central1",
        Type = "ENTERPRISE",
        EnableRbac = true,
    };

    var enterpriseInstance = new Gcp.DataFusion.Instance("enterprise_instance", enterpriseArgs);
});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.datafusion.Instance;
import com.pulumi.gcp.datafusion.InstanceArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

/** Stack program for an ENTERPRISE-edition Data Fusion instance with RBAC enabled. */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        // ENTERPRISE edition with granular, role-based access control turned on.
        var instanceArgs = InstanceArgs.builder()
            .name("my-instance")
            .region("us-central1")
            .type("ENTERPRISE")
            .enableRbac(true)
            .build();

        var enterpriseInstance = new Instance("enterpriseInstance", instanceArgs);
    }
}
# ENTERPRISE edition with granular, role-based access control turned on.
resources:
  enterpriseInstance:
    name: enterprise_instance
    type: gcp:datafusion:Instance
    properties:
      type: ENTERPRISE
      enableRbac: true
      name: my-instance
      region: us-central1

The ENTERPRISE type unlocks advanced features including streaming pipelines and higher concurrent pipeline limits. Setting enableRbac to true enables granular permissions for different user roles within the Data Fusion UI.

Publish pipeline events to Pub/Sub

Teams monitoring pipeline execution can stream Data Fusion events to Pub/Sub for integration with alerting or cataloging systems.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// Pub/Sub topic that receives the instance's published events.
const eventTopic = new gcp.pubsub.Topic("event", {
    name: "my-instance",
});

const event = new gcp.datafusion.Instance("event", {
    name: "my-instance",
    region: "us-central1",
    type: "BASIC",
    // Publish events to the topic above, referenced by its ID.
    eventPublishConfig: {
        topic: eventTopic.id,
        enabled: true,
    },
});
import pulumi
import pulumi_gcp as gcp

# Pub/Sub topic that receives the instance's published events.
event_topic = gcp.pubsub.Topic("event", name="my-instance")

# Publish events to the topic above, referenced by its ID.
event_publish_config = {
    "enabled": True,
    "topic": event_topic.id,
}
event = gcp.datafusion.Instance(
    "event",
    name="my-instance",
    region="us-central1",
    type="BASIC",
    event_publish_config=event_publish_config,
)
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datafusion"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

// main creates a Pub/Sub topic and a BASIC Data Fusion instance that publishes
// its events to that topic.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		topic, err := pubsub.NewTopic(ctx, "event", &pubsub.TopicArgs{
			Name: pulumi.String("my-instance"),
		})
		if err != nil {
			return err
		}

		// Publish events to the topic above, referenced by its ID.
		publishConfig := &datafusion.InstanceEventPublishConfigArgs{
			Enabled: pulumi.Bool(true),
			Topic:   topic.ID(),
		}
		if _, err := datafusion.NewInstance(ctx, "event", &datafusion.InstanceArgs{
			Name:               pulumi.String("my-instance"),
			Region:             pulumi.String("us-central1"),
			Type:               pulumi.String("BASIC"),
			EventPublishConfig: publishConfig,
		}); err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() =>
{
    // Pub/Sub topic that receives the instance's published events.
    var eventTopic = new Gcp.PubSub.Topic("event", new Gcp.PubSub.TopicArgs
    {
        Name = "my-instance",
    });

    // Publish events to the topic above, referenced by its ID.
    var publishConfig = new Gcp.DataFusion.Inputs.InstanceEventPublishConfigArgs
    {
        Enabled = true,
        Topic = eventTopic.Id,
    };

    var @event = new Gcp.DataFusion.Instance("event", new Gcp.DataFusion.InstanceArgs
    {
        Name = "my-instance",
        Region = "us-central1",
        Type = "BASIC",
        EventPublishConfig = publishConfig,
    });
});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.datafusion.Instance;
import com.pulumi.gcp.datafusion.InstanceArgs;
import com.pulumi.gcp.datafusion.inputs.InstanceEventPublishConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

/** Stack program that routes a BASIC Data Fusion instance's events to a Pub/Sub topic. */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        // Pub/Sub topic that receives the instance's published events.
        var eventTopic = new Topic("eventTopic", TopicArgs.builder()
            .name("my-instance")
            .build());

        // Publish events to the topic above, referenced by its ID.
        var publishConfig = InstanceEventPublishConfigArgs.builder()
            .enabled(true)
            .topic(eventTopic.id())
            .build();

        var event = new Instance("event", InstanceArgs.builder()
            .name("my-instance")
            .region("us-central1")
            .type("BASIC")
            .eventPublishConfig(publishConfig)
            .build());
    }
}
resources:
  # Pub/Sub topic that receives the instance's published events.
  eventTopic:
    type: gcp:pubsub:Topic
    name: event
    properties:
      name: my-instance
  event:
    type: gcp:datafusion:Instance
    properties:
      name: my-instance
      region: us-central1
      type: BASIC
      # Publish events to the topic above, referenced by its ID.
      eventPublishConfig:
        topic: ${eventTopic.id}
        enabled: true

The eventPublishConfig block enables event publishing and specifies the destination Pub/Sub topic. Data Fusion publishes pipeline lifecycle events (start, success, failure) to this topic for downstream processing.

Pin a developer instance to a specific zone

Developer instances support zone placement for testing workflows that need to colocate with other zonal resources.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// DEVELOPER is the only edition that accepts a zone; pin it to us-central1-a.
const zone = new gcp.datafusion.Instance("zone", {
    type: "DEVELOPER",
    name: "my-instance",
    region: "us-central1",
    zone: "us-central1-a",
});
import pulumi
import pulumi_gcp as gcp

# DEVELOPER is the only edition that accepts a zone; pin it to us-central1-a.
zone = gcp.datafusion.Instance(
    "zone",
    type="DEVELOPER",
    name="my-instance",
    region="us-central1",
    zone="us-central1-a",
)
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/datafusion"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

// main registers a DEVELOPER-edition Data Fusion instance pinned to a single zone.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// DEVELOPER is the only edition that accepts a zone.
		args := &datafusion.InstanceArgs{
			Type:   pulumi.String("DEVELOPER"),
			Name:   pulumi.String("my-instance"),
			Region: pulumi.String("us-central1"),
			Zone:   pulumi.String("us-central1-a"),
		}
		_, err := datafusion.NewInstance(ctx, "zone", args)
		return err
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() =>
{
    // DEVELOPER is the only edition that accepts a zone; pin it to us-central1-a.
    var zoneArgs = new Gcp.DataFusion.InstanceArgs
    {
        Type = "DEVELOPER",
        Name = "my-instance",
        Region = "us-central1",
        Zone = "us-central1-a",
    };

    var zone = new Gcp.DataFusion.Instance("zone", zoneArgs);
});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.datafusion.Instance;
import com.pulumi.gcp.datafusion.InstanceArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

/** Stack program for a DEVELOPER-edition Data Fusion instance pinned to one zone. */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        // DEVELOPER is the only edition that accepts a zone; pin it to us-central1-a.
        var zoneArgs = InstanceArgs.builder()
            .type("DEVELOPER")
            .name("my-instance")
            .region("us-central1")
            .zone("us-central1-a")
            .build();

        var zone = new Instance("zone", zoneArgs);
    }
}
# DEVELOPER is the only edition that accepts a zone; pin it to us-central1-a.
resources:
  zone:
    type: gcp:datafusion:Instance
    properties:
      type: DEVELOPER
      name: my-instance
      region: us-central1
      zone: us-central1-a

The DEVELOPER type provides all features with restrictive capacity limits for low-cost development. The zone property pins the instance to a specific zone, which only DEVELOPER instances support.

Beyond these examples

These snippets focus on specific Data Fusion instance features: instance tiers, private networking (VPC peering and Private Service Connect), customer-managed encryption, and event publishing. They’re intentionally minimal rather than full data platform deployments.

The examples may reference pre-existing infrastructure such as VPC networks, subnets, and network attachments; KMS keys with IAM bindings; and Pub/Sub topics. They focus on configuring the instance rather than provisioning everything around it.

To keep things focused, common instance patterns are omitted, including:

  • Dataproc service account configuration (dataprocServiceAccount)
  • Stackdriver logging and monitoring (enableStackdriverLogging, enableStackdriverMonitoring)
  • Resource labels and tags (labels, tags)
  • Accelerators for CDC and other features

These omissions are intentional: the goal is to illustrate how each Data Fusion feature is wired, not provide drop-in data platform modules. See the Data Fusion Instance resource reference for all available configuration options.

Let's deploy GCP Data Fusion Instances

Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.

Try Pulumi Cloud for FREE

Frequently Asked Questions

Instance Types & Configuration
What's the difference between BASIC, ENTERPRISE, and DEVELOPER instance types?
BASIC supports point-and-click data pipelines with limitations (no streaming, fewer concurrent pipelines). ENTERPRISE adds streaming pipeline support and higher concurrency. DEVELOPER includes all features but with restrictive capacity, designed for low-cost development and testing.
What properties are immutable after instance creation?
These properties cannot be changed after creation: name, region, type, zone, project, options, cryptoKeyConfig, dataprocServiceAccount, description, displayName, networkConfig, privateInstance, and tags. Changing any of these requires recreating the instance.
When should I use the zone field?
The zone field only applies to DEVELOPER instance types. It specifies the zone where the instance will be created. BASIC and ENTERPRISE types don’t use this field.
Private Networking
How do I create a private Data Fusion instance?
Set privateInstance to true and configure networkConfig with your network and IP allocation. Private instances have private IP addresses only and cannot access the public internet.
How do I configure Private Service Connect for my instance?
Set networkConfig.connectionType to PRIVATE_SERVICE_CONNECT_INTERFACES and configure privateServiceConnectConfig with a networkAttachment and unreachableCidrBlock.
Security & Encryption
What IAM permissions are required for customer-managed encryption (CMEK)?

Grant roles/cloudkms.cryptoKeyEncrypterDecrypter to both service accounts:

  1. Data Fusion: service-PROJECT_NUMBER@gcp-sa-datafusion.iam.gserviceaccount.com
  2. Cloud Storage: service-PROJECT_NUMBER@gs-project-accounts.iam.gserviceaccount.com

Use dependsOn to ensure IAM bindings are created before the instance.

Can I enable RBAC on any instance type?
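The enableRbac property enables granular role-based access control. The examples in this guide only use it with ENTERPRISE instances; check the Cloud Data Fusion documentation to confirm which editions support RBAC before enabling it on other types.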
Common Issues & Gotchas
Why am I seeing a permanent diff when I enable accelerators?
Enabling accelerators may create a permanent diff with the options field. You can either manually update your stack state to include the diffed options, or add options to the resource's ignoreChanges resource option.
What's the difference between labels and effectiveLabels?
The labels field is non-authoritative and only manages labels present in your configuration. Use effectiveLabels to see all labels on the resource, including those set by other clients and services.
Why is serviceAccount deprecated?
The serviceAccount output is deprecated and will be removed in a future release. Use tenantProjectId instead to extract the tenant project ID.
How do I enable event publishing for my instance?
Configure eventPublishConfig with enabled set to true and specify a Pub/Sub topic ID for publishing events.

Using a different cloud?

Explore analytics guides for other cloud providers: