Deploy GCP Managed Service for Apache Kafka Clusters

The gcp:managedkafka/cluster:Cluster resource, part of the Pulumi GCP provider, provisions a Managed Service for Apache Kafka cluster: its compute capacity, network placement, and security configuration. This guide focuses on three capabilities: capacity and network configuration, mutual TLS authentication, and customer-managed encryption keys.

Kafka clusters require VPC subnets for network access and may reference Certificate Authority pools or KMS keys for security features. The examples are intentionally small. Combine them with your own VPC infrastructure, security policies, and monitoring.

Create a cluster with capacity and network configuration

Most deployments start by defining cluster capacity and connecting to a VPC subnet for private network access.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// Look up the current project so the subnet path can reference its number.
const project = gcp.organizations.getProject({});

// Fully-qualified path of the default subnet in us-central1.
const subnetPath = project.then(p => `projects/${p.number}/regions/us-central1/subnetworks/default`);

const example = new gcp.managedkafka.Cluster("example", {
    clusterId: "my-cluster",
    location: "us-central1",
    // Cluster-wide compute: 3 vCPUs and 3 GiB of memory.
    capacityConfig: {
        vcpuCount: "3",
        memoryBytes: "3221225472",
    },
    // Attach the cluster to the subnet for private network access.
    gcpConfig: {
        accessConfig: {
            networkConfigs: [{
                subnet: subnetPath,
            }],
        },
    },
    // Automatically redistribute partitions when capacity is added.
    rebalanceConfig: {
        mode: "AUTO_REBALANCE_ON_SCALE_UP",
    },
    labels: {
        key: "value",
    },
});
import pulumi
import pulumi_gcp as gcp

# Look up the current project so the subnet path can reference its number.
project = gcp.organizations.get_project()

# Fully-qualified path of the default subnet in us-central1.
subnet_path = f"projects/{project.number}/regions/us-central1/subnetworks/default"

example = gcp.managedkafka.Cluster(
    "example",
    cluster_id="my-cluster",
    location="us-central1",
    # Cluster-wide compute: 3 vCPUs and 3 GiB of memory.
    capacity_config={
        "vcpu_count": "3",
        "memory_bytes": "3221225472",
    },
    # Attach the cluster to the subnet for private network access.
    gcp_config={
        "access_config": {
            "network_configs": [{
                "subnet": subnet_path,
            }],
        },
    },
    # Automatically redistribute partitions when capacity is added.
    rebalance_config={
        "mode": "AUTO_REBALANCE_ON_SCALE_UP",
    },
    labels={
        "key": "value",
    })
package main

import (
	"fmt"

	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/managedkafka"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		_, err = managedkafka.NewCluster(ctx, "example", &managedkafka.ClusterArgs{
			ClusterId: pulumi.String("my-cluster"),
			Location:  pulumi.String("us-central1"),
			CapacityConfig: &managedkafka.ClusterCapacityConfigArgs{
				VcpuCount:   pulumi.String("3"),
				MemoryBytes: pulumi.String("3221225472"),
			},
			GcpConfig: &managedkafka.ClusterGcpConfigArgs{
				AccessConfig: &managedkafka.ClusterGcpConfigAccessConfigArgs{
					NetworkConfigs: managedkafka.ClusterGcpConfigAccessConfigNetworkConfigArray{
						&managedkafka.ClusterGcpConfigAccessConfigNetworkConfigArgs{
							Subnet: pulumi.Sprintf("projects/%v/regions/us-central1/subnetworks/default", project.Number),
						},
					},
				},
			},
			RebalanceConfig: &managedkafka.ClusterRebalanceConfigArgs{
				Mode: pulumi.String("AUTO_REBALANCE_ON_SCALE_UP"),
			},
			Labels: pulumi.StringMap{
				"key": pulumi.String("value"),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var project = Gcp.Organizations.GetProject.Invoke();

    var example = new Gcp.ManagedKafka.Cluster("example", new()
    {
        ClusterId = "my-cluster",
        Location = "us-central1",
        // Cluster-wide compute: 3 vCPUs and 3 GiB of memory.
        CapacityConfig = new Gcp.ManagedKafka.Inputs.ClusterCapacityConfigArgs
        {
            VcpuCount = "3",
            MemoryBytes = "3221225472",
        },
        // Attach the cluster to the subnet for private network access.
        GcpConfig = new Gcp.ManagedKafka.Inputs.ClusterGcpConfigArgs
        {
            AccessConfig = new Gcp.ManagedKafka.Inputs.ClusterGcpConfigAccessConfigArgs
            {
                NetworkConfigs = new[]
                {
                    new Gcp.ManagedKafka.Inputs.ClusterGcpConfigAccessConfigNetworkConfigArgs
                    {
                        // Output.Format lifts the Output<string> project number into the
                        // interpolated string; plain $"..." would render the Output's
                        // type name instead of its value.
                        Subnet = Output.Format($"projects/{project.Apply(getProjectResult => getProjectResult.Number)}/regions/us-central1/subnetworks/default"),
                    },
                },
            },
        },
        // Automatically redistribute partitions when capacity is added.
        RebalanceConfig = new Gcp.ManagedKafka.Inputs.ClusterRebalanceConfigArgs
        {
            Mode = "AUTO_REBALANCE_ON_SCALE_UP",
        },
        Labels = 
        {
            { "key", "value" },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.managedkafka.Cluster;
import com.pulumi.gcp.managedkafka.ClusterArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterCapacityConfigArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterGcpConfigArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterGcpConfigAccessConfigArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterGcpConfigAccessConfigNetworkConfigArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterRebalanceConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

        var example = new Cluster("example", ClusterArgs.builder()
            .clusterId("my-cluster")
            .location("us-central1")
            // Cluster-wide compute: 3 vCPUs and 3 GiB of memory.
            .capacityConfig(ClusterCapacityConfigArgs.builder()
                .vcpuCount("3")
                .memoryBytes("3221225472")
                .build())
            // Attach the cluster to the subnet for private network access.
            .gcpConfig(ClusterGcpConfigArgs.builder()
                .accessConfig(ClusterGcpConfigAccessConfigArgs.builder()
                    .networkConfigs(ClusterGcpConfigAccessConfigNetworkConfigArgs.builder()
                        // applyValue lifts the Output<GetProjectResult> into the
                        // formatted subnet path; calling number() directly on the
                        // Output would not compile.
                        .subnet(project.applyValue(p -> String.format("projects/%s/regions/us-central1/subnetworks/default", p.number())))
                        .build())
                    .build())
                .build())
            // Automatically redistribute partitions when capacity is added.
            .rebalanceConfig(ClusterRebalanceConfigArgs.builder()
                .mode("AUTO_REBALANCE_ON_SCALE_UP")
                .build())
            .labels(Map.of("key", "value"))
            .build());

    }
}
resources:
  example:
    type: gcp:managedkafka:Cluster
    properties:
      clusterId: my-cluster
      location: us-central1
      capacityConfig:
        vcpuCount: 3
        # 3 GiB, written as a plain integer rather than lossy float notation.
        memoryBytes: 3221225472
      gcpConfig:
        accessConfig:
          networkConfigs:
            - subnet: projects/${project.number}/regions/us-central1/subnetworks/default
      rebalanceConfig:
        mode: AUTO_REBALANCE_ON_SCALE_UP
      labels:
        key: value
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}

The capacityConfig property sets compute resources: vcpuCount defines the number of virtual CPUs, and memoryBytes allocates memory in bytes. The gcpConfig.accessConfig.networkConfigs array specifies which VPC subnets the cluster joins. The rebalanceConfig.mode controls how Kafka redistributes partitions when you scale the cluster; AUTO_REBALANCE_ON_SCALE_UP automatically rebalances when adding capacity.

Enable mutual TLS authentication with certificate authority

Applications requiring client certificate authentication configure mTLS by connecting to a Certificate Authority pool.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// CA pool that issues the client certificates the cluster will trust.
const caPool = new gcp.certificateauthority.CaPool("ca_pool", {
    name: "my-ca-pool",
    location: "us-central1",
    tier: "ENTERPRISE",
    publishingOptions: {
        publishCaCert: true,
        publishCrl: true,
    },
});

const project = gcp.organizations.getProject({});

// Fully-qualified path of the default subnet in us-central1.
const subnetPath = project.then(p => `projects/${p.number}/regions/us-central1/subnetworks/default`);

const example = new gcp.managedkafka.Cluster("example", {
    clusterId: "my-cluster",
    location: "us-central1",
    capacityConfig: {
        vcpuCount: "3",
        memoryBytes: "3221225472",
    },
    gcpConfig: {
        accessConfig: {
            networkConfigs: [{
                subnet: subnetPath,
            }],
        },
    },
    // Require client certificates issued by the CA pool (mTLS) and map
    // certificate subject names to Kafka principals.
    tlsConfig: {
        trustConfig: {
            casConfigs: [{
                caPool: caPool.id,
            }],
        },
        sslPrincipalMappingRules: "RULE:pattern/replacement/L,DEFAULT",
    },
});
import pulumi
import pulumi_gcp as gcp

# CA pool that issues the client certificates the cluster will trust.
ca_pool = gcp.certificateauthority.CaPool(
    "ca_pool",
    name="my-ca-pool",
    location="us-central1",
    tier="ENTERPRISE",
    publishing_options={
        "publish_ca_cert": True,
        "publish_crl": True,
    })

project = gcp.organizations.get_project()

# Fully-qualified path of the default subnet in us-central1.
subnet_path = f"projects/{project.number}/regions/us-central1/subnetworks/default"

example = gcp.managedkafka.Cluster(
    "example",
    cluster_id="my-cluster",
    location="us-central1",
    capacity_config={
        "vcpu_count": "3",
        "memory_bytes": "3221225472",
    },
    gcp_config={
        "access_config": {
            "network_configs": [{
                "subnet": subnet_path,
            }],
        },
    },
    # Require client certificates issued by the CA pool (mTLS) and map
    # certificate subject names to Kafka principals.
    tls_config={
        "trust_config": {
            "cas_configs": [{
                "ca_pool": ca_pool.id,
            }],
        },
        "ssl_principal_mapping_rules": "RULE:pattern/replacement/L,DEFAULT",
    })
package main

import (
	"fmt"

	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/certificateauthority"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/managedkafka"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		caPool, err := certificateauthority.NewCaPool(ctx, "ca_pool", &certificateauthority.CaPoolArgs{
			Name:     pulumi.String("my-ca-pool"),
			Location: pulumi.String("us-central1"),
			Tier:     pulumi.String("ENTERPRISE"),
			PublishingOptions: &certificateauthority.CaPoolPublishingOptionsArgs{
				PublishCaCert: pulumi.Bool(true),
				PublishCrl:    pulumi.Bool(true),
			},
		})
		if err != nil {
			return err
		}
		project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		_, err = managedkafka.NewCluster(ctx, "example", &managedkafka.ClusterArgs{
			ClusterId: pulumi.String("my-cluster"),
			Location:  pulumi.String("us-central1"),
			CapacityConfig: &managedkafka.ClusterCapacityConfigArgs{
				VcpuCount:   pulumi.String("3"),
				MemoryBytes: pulumi.String("3221225472"),
			},
			GcpConfig: &managedkafka.ClusterGcpConfigArgs{
				AccessConfig: &managedkafka.ClusterGcpConfigAccessConfigArgs{
					NetworkConfigs: managedkafka.ClusterGcpConfigAccessConfigNetworkConfigArray{
						&managedkafka.ClusterGcpConfigAccessConfigNetworkConfigArgs{
							Subnet: pulumi.Sprintf("projects/%v/regions/us-central1/subnetworks/default", project.Number),
						},
					},
				},
			},
			TlsConfig: &managedkafka.ClusterTlsConfigArgs{
				TrustConfig: &managedkafka.ClusterTlsConfigTrustConfigArgs{
					CasConfigs: managedkafka.ClusterTlsConfigTrustConfigCasConfigArray{
						&managedkafka.ClusterTlsConfigTrustConfigCasConfigArgs{
							CaPool: caPool.ID(),
						},
					},
				},
				SslPrincipalMappingRules: pulumi.String("RULE:pattern/replacement/L,DEFAULT"),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    // CA pool that issues the client certificates the cluster will trust.
    var caPool = new Gcp.CertificateAuthority.CaPool("ca_pool", new()
    {
        Name = "my-ca-pool",
        Location = "us-central1",
        Tier = "ENTERPRISE",
        PublishingOptions = new Gcp.CertificateAuthority.Inputs.CaPoolPublishingOptionsArgs
        {
            PublishCaCert = true,
            PublishCrl = true,
        },
    });

    var project = Gcp.Organizations.GetProject.Invoke();

    var example = new Gcp.ManagedKafka.Cluster("example", new()
    {
        ClusterId = "my-cluster",
        Location = "us-central1",
        CapacityConfig = new Gcp.ManagedKafka.Inputs.ClusterCapacityConfigArgs
        {
            VcpuCount = "3",
            MemoryBytes = "3221225472",
        },
        GcpConfig = new Gcp.ManagedKafka.Inputs.ClusterGcpConfigArgs
        {
            AccessConfig = new Gcp.ManagedKafka.Inputs.ClusterGcpConfigAccessConfigArgs
            {
                NetworkConfigs = new[]
                {
                    new Gcp.ManagedKafka.Inputs.ClusterGcpConfigAccessConfigNetworkConfigArgs
                    {
                        // Output.Format lifts the Output<string> project number into the
                        // interpolated string; plain $"..." would render the Output's
                        // type name instead of its value.
                        Subnet = Output.Format($"projects/{project.Apply(getProjectResult => getProjectResult.Number)}/regions/us-central1/subnetworks/default"),
                    },
                },
            },
        },
        // Require client certificates issued by the CA pool (mTLS) and map
        // certificate subject names to Kafka principals.
        TlsConfig = new Gcp.ManagedKafka.Inputs.ClusterTlsConfigArgs
        {
            TrustConfig = new Gcp.ManagedKafka.Inputs.ClusterTlsConfigTrustConfigArgs
            {
                CasConfigs = new[]
                {
                    new Gcp.ManagedKafka.Inputs.ClusterTlsConfigTrustConfigCasConfigArgs
                    {
                        CaPool = caPool.Id,
                    },
                },
            },
            SslPrincipalMappingRules = "RULE:pattern/replacement/L,DEFAULT",
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.certificateauthority.CaPool;
import com.pulumi.gcp.certificateauthority.CaPoolArgs;
import com.pulumi.gcp.certificateauthority.inputs.CaPoolPublishingOptionsArgs;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.managedkafka.Cluster;
import com.pulumi.gcp.managedkafka.ClusterArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterCapacityConfigArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterGcpConfigArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterGcpConfigAccessConfigArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterGcpConfigAccessConfigNetworkConfigArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterTlsConfigArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterTlsConfigTrustConfigArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterTlsConfigTrustConfigCasConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        // CA pool that issues the client certificates the cluster will trust.
        var caPool = new CaPool("caPool", CaPoolArgs.builder()
            .name("my-ca-pool")
            .location("us-central1")
            .tier("ENTERPRISE")
            .publishingOptions(CaPoolPublishingOptionsArgs.builder()
                .publishCaCert(true)
                .publishCrl(true)
                .build())
            .build());

        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

        var example = new Cluster("example", ClusterArgs.builder()
            .clusterId("my-cluster")
            .location("us-central1")
            .capacityConfig(ClusterCapacityConfigArgs.builder()
                .vcpuCount("3")
                .memoryBytes("3221225472")
                .build())
            .gcpConfig(ClusterGcpConfigArgs.builder()
                .accessConfig(ClusterGcpConfigAccessConfigArgs.builder()
                    .networkConfigs(ClusterGcpConfigAccessConfigNetworkConfigArgs.builder()
                        // applyValue lifts the Output<GetProjectResult> into the
                        // formatted subnet path; calling number() directly on the
                        // Output would not compile.
                        .subnet(project.applyValue(p -> String.format("projects/%s/regions/us-central1/subnetworks/default", p.number())))
                        .build())
                    .build())
                .build())
            // Require client certificates issued by the CA pool (mTLS) and map
            // certificate subject names to Kafka principals.
            .tlsConfig(ClusterTlsConfigArgs.builder()
                .trustConfig(ClusterTlsConfigTrustConfigArgs.builder()
                    .casConfigs(ClusterTlsConfigTrustConfigCasConfigArgs.builder()
                        .caPool(caPool.id())
                        .build())
                    .build())
                .sslPrincipalMappingRules("RULE:pattern/replacement/L,DEFAULT")
                .build())
            .build());

    }
}
resources:
  example:
    type: gcp:managedkafka:Cluster
    properties:
      clusterId: my-cluster
      location: us-central1
      capacityConfig:
        vcpuCount: 3
        # 3 GiB, written as a plain integer rather than lossy float notation.
        memoryBytes: 3221225472
      gcpConfig:
        accessConfig:
          networkConfigs:
            - subnet: projects/${project.number}/regions/us-central1/subnetworks/default
      tlsConfig:
        trustConfig:
          casConfigs:
            - caPool: ${caPool.id}
        sslPrincipalMappingRules: RULE:pattern/replacement/L,DEFAULT
  caPool:
    type: gcp:certificateauthority:CaPool
    name: ca_pool
    properties:
      name: my-ca-pool
      location: us-central1
      tier: ENTERPRISE
      publishingOptions:
        publishCaCert: true
        publishCrl: true
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}

The tlsConfig.trustConfig.casConfigs array references Certificate Authority pools that issue client certificates. The sslPrincipalMappingRules property defines how certificate subject names map to Kafka principals; the example uses a pattern/replacement rule with a DEFAULT fallback. This configuration requires clients to present valid certificates from the specified CA pool.

Encrypt cluster data with customer-managed keys

Organizations with encryption requirements use customer-managed encryption keys to control data encryption at rest.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const project = gcp.organizations.getProject({});

// Fully-qualified path of the default subnet in us-central1.
const subnetPath = project.then(p => `projects/${p.number}/regions/us-central1/subnetworks/default`);

const example = new gcp.managedkafka.Cluster("example", {
    clusterId: "my-cluster",
    location: "us-central1",
    capacityConfig: {
        vcpuCount: "3",
        memoryBytes: "3221225472",
    },
    gcpConfig: {
        accessConfig: {
            networkConfigs: [{
                subnet: subnetPath,
            }],
        },
        // Customer-managed KMS key used to encrypt cluster data at rest.
        kmsKey: "example-key",
    },
});

// Provision the Managed Kafka service identity so it can be granted
// access to the KMS key.
const kafkaServiceIdentity = new gcp.projects.ServiceIdentity("kafka_service_identity", {
    project: project.then(p => p.projectId),
    service: "managedkafka.googleapis.com",
});
import pulumi
import pulumi_gcp as gcp

project = gcp.organizations.get_project()

# Fully-qualified path of the default subnet in us-central1.
subnet_path = f"projects/{project.number}/regions/us-central1/subnetworks/default"

example = gcp.managedkafka.Cluster(
    "example",
    cluster_id="my-cluster",
    location="us-central1",
    capacity_config={
        "vcpu_count": "3",
        "memory_bytes": "3221225472",
    },
    gcp_config={
        "access_config": {
            "network_configs": [{
                "subnet": subnet_path,
            }],
        },
        # Customer-managed KMS key used to encrypt cluster data at rest.
        "kms_key": "example-key",
    })

# Provision the Managed Kafka service identity so it can be granted
# access to the KMS key.
kafka_service_identity = gcp.projects.ServiceIdentity(
    "kafka_service_identity",
    project=project.project_id,
    service="managedkafka.googleapis.com")
package main

import (
	"fmt"

	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/managedkafka"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/projects"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		_, err = managedkafka.NewCluster(ctx, "example", &managedkafka.ClusterArgs{
			ClusterId: pulumi.String("my-cluster"),
			Location:  pulumi.String("us-central1"),
			CapacityConfig: &managedkafka.ClusterCapacityConfigArgs{
				VcpuCount:   pulumi.String("3"),
				MemoryBytes: pulumi.String("3221225472"),
			},
			GcpConfig: &managedkafka.ClusterGcpConfigArgs{
				AccessConfig: &managedkafka.ClusterGcpConfigAccessConfigArgs{
					NetworkConfigs: managedkafka.ClusterGcpConfigAccessConfigNetworkConfigArray{
						&managedkafka.ClusterGcpConfigAccessConfigNetworkConfigArgs{
							Subnet: pulumi.Sprintf("projects/%v/regions/us-central1/subnetworks/default", project.Number),
						},
					},
				},
				KmsKey: pulumi.String("example-key"),
			},
		})
		if err != nil {
			return err
		}
		_, err = projects.NewServiceIdentity(ctx, "kafka_service_identity", &projects.ServiceIdentityArgs{
			Project: pulumi.String(project.ProjectId),
			Service: pulumi.String("managedkafka.googleapis.com"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var project = Gcp.Organizations.GetProject.Invoke();

    var example = new Gcp.ManagedKafka.Cluster("example", new()
    {
        ClusterId = "my-cluster",
        Location = "us-central1",
        CapacityConfig = new Gcp.ManagedKafka.Inputs.ClusterCapacityConfigArgs
        {
            VcpuCount = "3",
            MemoryBytes = "3221225472",
        },
        GcpConfig = new Gcp.ManagedKafka.Inputs.ClusterGcpConfigArgs
        {
            AccessConfig = new Gcp.ManagedKafka.Inputs.ClusterGcpConfigAccessConfigArgs
            {
                NetworkConfigs = new[]
                {
                    new Gcp.ManagedKafka.Inputs.ClusterGcpConfigAccessConfigNetworkConfigArgs
                    {
                        // Output.Format lifts the Output<string> project number into the
                        // interpolated string; plain $"..." would render the Output's
                        // type name instead of its value.
                        Subnet = Output.Format($"projects/{project.Apply(getProjectResult => getProjectResult.Number)}/regions/us-central1/subnetworks/default"),
                    },
                },
            },
            // Customer-managed KMS key used to encrypt cluster data at rest.
            KmsKey = "example-key",
        },
    });

    // Provision the Managed Kafka service identity so it can be granted
    // access to the KMS key.
    var kafkaServiceIdentity = new Gcp.Projects.ServiceIdentity("kafka_service_identity", new()
    {
        Project = project.Apply(getProjectResult => getProjectResult.ProjectId),
        Service = "managedkafka.googleapis.com",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.managedkafka.Cluster;
import com.pulumi.gcp.managedkafka.ClusterArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterCapacityConfigArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterGcpConfigArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterGcpConfigAccessConfigArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterGcpConfigAccessConfigNetworkConfigArgs;
import com.pulumi.gcp.projects.ServiceIdentity;
import com.pulumi.gcp.projects.ServiceIdentityArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

        var example = new Cluster("example", ClusterArgs.builder()
            .clusterId("my-cluster")
            .location("us-central1")
            .capacityConfig(ClusterCapacityConfigArgs.builder()
                .vcpuCount("3")
                .memoryBytes("3221225472")
                .build())
            .gcpConfig(ClusterGcpConfigArgs.builder()
                .accessConfig(ClusterGcpConfigAccessConfigArgs.builder()
                    .networkConfigs(ClusterGcpConfigAccessConfigNetworkConfigArgs.builder()
                        // applyValue lifts the Output<GetProjectResult> into the
                        // formatted subnet path; calling number() directly on the
                        // Output would not compile.
                        .subnet(project.applyValue(p -> String.format("projects/%s/regions/us-central1/subnetworks/default", p.number())))
                        .build())
                    .build())
                // Customer-managed KMS key used to encrypt cluster data at rest.
                .kmsKey("example-key")
                .build())
            .build());

        // Provision the Managed Kafka service identity so it can be granted
        // access to the KMS key.
        var kafkaServiceIdentity = new ServiceIdentity("kafkaServiceIdentity", ServiceIdentityArgs.builder()
            .project(project.applyValue(p -> p.projectId()))
            .service("managedkafka.googleapis.com")
            .build());

    }
}
resources:
  example:
    type: gcp:managedkafka:Cluster
    properties:
      clusterId: my-cluster
      location: us-central1
      capacityConfig:
        vcpuCount: 3
        # 3 GiB, written as a plain integer rather than lossy float notation.
        memoryBytes: 3221225472
      gcpConfig:
        accessConfig:
          networkConfigs:
            - subnet: projects/${project.number}/regions/us-central1/subnetworks/default
        kmsKey: example-key
  kafkaServiceIdentity:
    type: gcp:projects:ServiceIdentity
    name: kafka_service_identity
    properties:
      project: ${project.projectId}
      service: managedkafka.googleapis.com
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}

The gcpConfig.kmsKey property references a Cloud KMS key that encrypts cluster data. The ServiceIdentity resource grants the Managed Kafka service permission to use your KMS key for encryption operations. Without this identity and permission, the cluster cannot access the encryption key.

Beyond these examples

These snippets focus on specific cluster-level features: capacity and network configuration, mTLS authentication, and customer-managed encryption. They’re intentionally minimal rather than full Kafka deployments.

The examples may reference pre-existing infrastructure such as VPC subnets (default subnet in us-central1) and KMS encryption keys (for CMEK example). They focus on configuring the cluster rather than provisioning the surrounding infrastructure.

To keep things focused, several common cluster patterns are omitted or only touched on briefly, including:

  • Broker-level capacity tuning (brokerCapacityConfig)
  • Rebalancing behavior (rebalanceConfig), shown only in the first example
  • Custom labels and metadata, shown only in the first example
  • Multi-region or multi-subnet configurations

These omissions are intentional: the goal is to illustrate how each cluster feature is wired, not provide drop-in Kafka modules. See the Managed Kafka Cluster resource reference for all available configuration options.

Let's deploy GCP Managed Service for Apache Kafka Clusters

Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.

Try Pulumi Cloud for FREE

Frequently Asked Questions

Configuration & Setup
What are the minimum required fields to create a Kafka cluster?
You need clusterId, location, capacityConfig (with vcpuCount and memoryBytes), and gcpConfig with at least one subnet in networkConfigs.
What are the naming requirements for clusterId?
The clusterId must be 1-63 characters long and match the regular expression `[a-z]([-a-z0-9]*[a-z0-9])?` to comply with RFC 1035: start with a lowercase letter, use only lowercase letters, numbers, and hyphens, and do not end with a hyphen.
What properties are immutable after cluster creation?
The clusterId, location, and project properties cannot be changed after the cluster is created.
Security & Encryption
How do I enable mTLS authentication for my Kafka cluster?
Configure tlsConfig with a trustConfig containing casConfigs pointing to a CA pool. You can optionally set sslPrincipalMappingRules for principal mapping.
How do I remove mTLS configuration from my cluster?
Explicitly add an empty tlsConfig block to your configuration. Simply removing the block won’t clear the TLS settings in GCP.
How do I use customer-managed encryption keys (CMEK) with my cluster?
Set kmsKey in the gcpConfig block and ensure the Kafka service identity has appropriate permissions to access the key.
Resource Management
Why don't my labels match what's shown in the GCP console?
The labels field is non-authoritative and only manages labels in your Pulumi configuration. Use the effectiveLabels output to see all labels present on the resource in GCP.
What locations are supported for Managed Kafka clusters?
See https://cloud.google.com/managed-kafka/docs/locations for the list of supported locations. The location property is immutable after creation.
Capacity & Performance
What's the difference between capacityConfig and brokerCapacityConfig?
capacityConfig sets cluster-wide capacity (vCPU and memory), while brokerCapacityConfig configures capacity at a per-broker level within the cluster.

Using a different cloud?

Explore messaging guides for other cloud providers: