Deploy GCP Managed Service for Apache Kafka Clusters

The gcp:managedkafka/cluster:Cluster resource, part of the Pulumi GCP provider, provisions a Managed Service for Apache Kafka cluster: its compute capacity, network placement, and security configuration. This guide focuses on three capabilities: capacity and network configuration, mTLS authentication, and customer-managed encryption.

Kafka clusters require VPC subnets for private network access and may reference Certificate Authority pools or Cloud KMS keys for security features. The examples are intentionally small. Combine them with your own VPC infrastructure, security policies, and topic management.

Create a cluster with capacity and network configuration

Most deployments start by defining cluster capacity and connecting to a VPC subnet for private network access.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// Look up the current project; its number is used in the subnet path below.
const project = gcp.organizations.getProject({});
const example = new gcp.managedkafka.Cluster("example", {
    clusterId: "my-cluster",
    location: "us-central1",
    // Compute capacity: 3 vCPUs and 3 GiB (3221225472 bytes) of memory.
    capacityConfig: {
        vcpuCount: "3",
        memoryBytes: "3221225472",
    },
    // Attach the cluster to the "default" subnet for private VPC access.
    gcpConfig: {
        accessConfig: {
            networkConfigs: [{
                subnet: project.then(project => `projects/${project.number}/regions/us-central1/subnetworks/default`),
            }],
        },
    },
    // Automatically rebalance partitions when capacity is scaled up.
    rebalanceConfig: {
        mode: "AUTO_REBALANCE_ON_SCALE_UP",
    },
    labels: {
        key: "value",
    },
});
import pulumi
import pulumi_gcp as gcp

# Look up the current project; its number is used in the subnet path below.
project = gcp.organizations.get_project()
example = gcp.managedkafka.Cluster("example",
    cluster_id="my-cluster",
    location="us-central1",
    # Compute capacity: 3 vCPUs and 3 GiB (3221225472 bytes) of memory.
    capacity_config={
        "vcpu_count": "3",
        "memory_bytes": "3221225472",
    },
    # Attach the cluster to the "default" subnet for private VPC access.
    gcp_config={
        "access_config": {
            "network_configs": [{
                "subnet": f"projects/{project.number}/regions/us-central1/subnetworks/default",
            }],
        },
    },
    # Automatically rebalance partitions when capacity is scaled up.
    rebalance_config={
        "mode": "AUTO_REBALANCE_ON_SCALE_UP",
    },
    labels={
        "key": "value",
    })
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/managedkafka"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Look up the current project; its number is used in the subnet path below.
		project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		_, err = managedkafka.NewCluster(ctx, "example", &managedkafka.ClusterArgs{
			ClusterId: pulumi.String("my-cluster"),
			Location:  pulumi.String("us-central1"),
			// Compute capacity: 3 vCPUs and 3 GiB (3221225472 bytes) of memory.
			CapacityConfig: &managedkafka.ClusterCapacityConfigArgs{
				VcpuCount:   pulumi.String("3"),
				MemoryBytes: pulumi.String("3221225472"),
			},
			// Attach the cluster to the "default" subnet for private VPC access.
			GcpConfig: &managedkafka.ClusterGcpConfigArgs{
				AccessConfig: &managedkafka.ClusterGcpConfigAccessConfigArgs{
					NetworkConfigs: managedkafka.ClusterGcpConfigAccessConfigNetworkConfigArray{
						&managedkafka.ClusterGcpConfigAccessConfigNetworkConfigArgs{
							Subnet: pulumi.Sprintf("projects/%v/regions/us-central1/subnetworks/default", project.Number),
						},
					},
				},
			},
			// Automatically rebalance partitions when capacity is scaled up.
			RebalanceConfig: &managedkafka.ClusterRebalanceConfigArgs{
				Mode: pulumi.String("AUTO_REBALANCE_ON_SCALE_UP"),
			},
			Labels: pulumi.StringMap{
				"key": pulumi.String("value"),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    // Look up the current project; its number is used in the subnet path below.
    var project = Gcp.Organizations.GetProject.Invoke();

    var example = new Gcp.ManagedKafka.Cluster("example", new()
    {
        ClusterId = "my-cluster",
        Location = "us-central1",
        // Compute capacity: 3 vCPUs and 3 GiB (3221225472 bytes) of memory.
        CapacityConfig = new Gcp.ManagedKafka.Inputs.ClusterCapacityConfigArgs
        {
            VcpuCount = "3",
            MemoryBytes = "3221225472",
        },
        // Attach the cluster to the "default" subnet for private VPC access.
        GcpConfig = new Gcp.ManagedKafka.Inputs.ClusterGcpConfigArgs
        {
            AccessConfig = new Gcp.ManagedKafka.Inputs.ClusterGcpConfigAccessConfigArgs
            {
                NetworkConfigs = new[]
                {
                    new Gcp.ManagedKafka.Inputs.ClusterGcpConfigAccessConfigNetworkConfigArgs
                    {
                        Subnet = $"projects/{project.Apply(getProjectResult => getProjectResult.Number)}/regions/us-central1/subnetworks/default",
                    },
                },
            },
        },
        // Automatically rebalance partitions when capacity is scaled up.
        RebalanceConfig = new Gcp.ManagedKafka.Inputs.ClusterRebalanceConfigArgs
        {
            Mode = "AUTO_REBALANCE_ON_SCALE_UP",
        },
        Labels = 
        {
            { "key", "value" },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.managedkafka.Cluster;
import com.pulumi.gcp.managedkafka.ClusterArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterCapacityConfigArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterGcpConfigArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterGcpConfigAccessConfigArgs;
// Added: required by the networkConfigs(...) builder below; the program does
// not compile without this import.
import com.pulumi.gcp.managedkafka.inputs.ClusterGcpConfigAccessConfigNetworkConfigArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterRebalanceConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Provisions a Managed Kafka cluster with explicit compute capacity,
     * private VPC networking, and automatic rebalancing on scale-up.
     */
    public static void stack(Context ctx) {
        // Look up the current project; its number is used in the subnet path below.
        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

        var example = new Cluster("example", ClusterArgs.builder()
            .clusterId("my-cluster")
            .location("us-central1")
            // Compute capacity: 3 vCPUs and 3 GiB (3221225472 bytes) of memory.
            .capacityConfig(ClusterCapacityConfigArgs.builder()
                .vcpuCount("3")
                .memoryBytes("3221225472")
                .build())
            // Attach the cluster to the "default" subnet for private VPC access.
            .gcpConfig(ClusterGcpConfigArgs.builder()
                .accessConfig(ClusterGcpConfigAccessConfigArgs.builder()
                    .networkConfigs(ClusterGcpConfigAccessConfigNetworkConfigArgs.builder()
                        .subnet(String.format("projects/%s/regions/us-central1/subnetworks/default", project.number()))
                        .build())
                    .build())
                .build())
            // Automatically rebalance partitions when capacity is scaled up.
            .rebalanceConfig(ClusterRebalanceConfigArgs.builder()
                .mode("AUTO_REBALANCE_ON_SCALE_UP")
                .build())
            .labels(Map.of("key", "value"))
            .build());

    }
}
resources:
  example:
    type: gcp:managedkafka:Cluster
    properties:
      clusterId: my-cluster
      location: us-central1
      capacityConfig:
        vcpuCount: 3
        # 3 GiB as an exact integer byte count; avoids the scientific-notation
        # float form, which can serialize imprecisely for this int64 field.
        memoryBytes: 3221225472
      gcpConfig:
        # Attach the cluster to the "default" subnet for private VPC access.
        accessConfig:
          networkConfigs:
            - subnet: projects/${project.number}/regions/us-central1/subnetworks/default
      # Automatically rebalance partitions when capacity is scaled up.
      rebalanceConfig:
        mode: AUTO_REBALANCE_ON_SCALE_UP
      labels:
        key: value
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}

The capacityConfig property sets compute resources: vcpuCount defines the number of virtual CPUs, and memoryBytes allocates memory in bytes. The gcpConfig.accessConfig.networkConfigs array specifies which VPC subnet the cluster joins, enabling private connectivity to your applications. The rebalanceConfig property controls how Kafka redistributes partitions when you scale the cluster; AUTO_REBALANCE_ON_SCALE_UP automatically rebalances when adding capacity.

Enable mutual TLS authentication with Certificate Authority

Applications requiring client certificate authentication use mTLS to verify both client and server identities.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// CA pool that issues the client certificates the cluster will trust for mTLS.
const caPool = new gcp.certificateauthority.CaPool("ca_pool", {
    name: "my-ca-pool",
    location: "us-central1",
    tier: "ENTERPRISE",
    publishingOptions: {
        publishCaCert: true,
        publishCrl: true,
    },
});
// Look up the current project; its number is used in the subnet path below.
const project = gcp.organizations.getProject({});
const example = new gcp.managedkafka.Cluster("example", {
    clusterId: "my-cluster",
    location: "us-central1",
    capacityConfig: {
        vcpuCount: "3",
        memoryBytes: "3221225472",
    },
    gcpConfig: {
        accessConfig: {
            networkConfigs: [{
                subnet: project.then(project => `projects/${project.number}/regions/us-central1/subnetworks/default`),
            }],
        },
    },
    // mTLS: trust certificates issued by the CA pool above, and map certificate
    // distinguished names to Kafka principals via sslPrincipalMappingRules.
    tlsConfig: {
        trustConfig: {
            casConfigs: [{
                caPool: caPool.id,
            }],
        },
        sslPrincipalMappingRules: "RULE:pattern/replacement/L,DEFAULT",
    },
});
import pulumi
import pulumi_gcp as gcp

# CA pool that issues the client certificates the cluster will trust for mTLS.
ca_pool = gcp.certificateauthority.CaPool("ca_pool",
    name="my-ca-pool",
    location="us-central1",
    tier="ENTERPRISE",
    publishing_options={
        "publish_ca_cert": True,
        "publish_crl": True,
    })
# Look up the current project; its number is used in the subnet path below.
project = gcp.organizations.get_project()
example = gcp.managedkafka.Cluster("example",
    cluster_id="my-cluster",
    location="us-central1",
    capacity_config={
        "vcpu_count": "3",
        "memory_bytes": "3221225472",
    },
    gcp_config={
        "access_config": {
            "network_configs": [{
                "subnet": f"projects/{project.number}/regions/us-central1/subnetworks/default",
            }],
        },
    },
    # mTLS: trust certificates issued by the CA pool above, and map certificate
    # distinguished names to Kafka principals via ssl_principal_mapping_rules.
    tls_config={
        "trust_config": {
            "cas_configs": [{
                "ca_pool": ca_pool.id,
            }],
        },
        "ssl_principal_mapping_rules": "RULE:pattern/replacement/L,DEFAULT",
    })
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/certificateauthority"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/managedkafka"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// CA pool that issues the client certificates the cluster will trust for mTLS.
		caPool, err := certificateauthority.NewCaPool(ctx, "ca_pool", &certificateauthority.CaPoolArgs{
			Name:     pulumi.String("my-ca-pool"),
			Location: pulumi.String("us-central1"),
			Tier:     pulumi.String("ENTERPRISE"),
			PublishingOptions: &certificateauthority.CaPoolPublishingOptionsArgs{
				PublishCaCert: pulumi.Bool(true),
				PublishCrl:    pulumi.Bool(true),
			},
		})
		if err != nil {
			return err
		}
		// Look up the current project; its number is used in the subnet path below.
		project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		_, err = managedkafka.NewCluster(ctx, "example", &managedkafka.ClusterArgs{
			ClusterId: pulumi.String("my-cluster"),
			Location:  pulumi.String("us-central1"),
			CapacityConfig: &managedkafka.ClusterCapacityConfigArgs{
				VcpuCount:   pulumi.String("3"),
				MemoryBytes: pulumi.String("3221225472"),
			},
			GcpConfig: &managedkafka.ClusterGcpConfigArgs{
				AccessConfig: &managedkafka.ClusterGcpConfigAccessConfigArgs{
					NetworkConfigs: managedkafka.ClusterGcpConfigAccessConfigNetworkConfigArray{
						&managedkafka.ClusterGcpConfigAccessConfigNetworkConfigArgs{
							Subnet: pulumi.Sprintf("projects/%v/regions/us-central1/subnetworks/default", project.Number),
						},
					},
				},
			},
			// mTLS: trust certificates issued by the CA pool above, and map
			// certificate distinguished names to Kafka principals.
			TlsConfig: &managedkafka.ClusterTlsConfigArgs{
				TrustConfig: &managedkafka.ClusterTlsConfigTrustConfigArgs{
					CasConfigs: managedkafka.ClusterTlsConfigTrustConfigCasConfigArray{
						&managedkafka.ClusterTlsConfigTrustConfigCasConfigArgs{
							CaPool: caPool.ID(),
						},
					},
				},
				SslPrincipalMappingRules: pulumi.String("RULE:pattern/replacement/L,DEFAULT"),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    // CA pool that issues the client certificates the cluster will trust for mTLS.
    var caPool = new Gcp.CertificateAuthority.CaPool("ca_pool", new()
    {
        Name = "my-ca-pool",
        Location = "us-central1",
        Tier = "ENTERPRISE",
        PublishingOptions = new Gcp.CertificateAuthority.Inputs.CaPoolPublishingOptionsArgs
        {
            PublishCaCert = true,
            PublishCrl = true,
        },
    });

    // Look up the current project; its number is used in the subnet path below.
    var project = Gcp.Organizations.GetProject.Invoke();

    var example = new Gcp.ManagedKafka.Cluster("example", new()
    {
        ClusterId = "my-cluster",
        Location = "us-central1",
        CapacityConfig = new Gcp.ManagedKafka.Inputs.ClusterCapacityConfigArgs
        {
            VcpuCount = "3",
            MemoryBytes = "3221225472",
        },
        GcpConfig = new Gcp.ManagedKafka.Inputs.ClusterGcpConfigArgs
        {
            AccessConfig = new Gcp.ManagedKafka.Inputs.ClusterGcpConfigAccessConfigArgs
            {
                NetworkConfigs = new[]
                {
                    new Gcp.ManagedKafka.Inputs.ClusterGcpConfigAccessConfigNetworkConfigArgs
                    {
                        Subnet = $"projects/{project.Apply(getProjectResult => getProjectResult.Number)}/regions/us-central1/subnetworks/default",
                    },
                },
            },
        },
        // mTLS: trust certificates issued by the CA pool above, and map
        // certificate distinguished names to Kafka principals.
        TlsConfig = new Gcp.ManagedKafka.Inputs.ClusterTlsConfigArgs
        {
            TrustConfig = new Gcp.ManagedKafka.Inputs.ClusterTlsConfigTrustConfigArgs
            {
                CasConfigs = new[]
                {
                    new Gcp.ManagedKafka.Inputs.ClusterTlsConfigTrustConfigCasConfigArgs
                    {
                        CaPool = caPool.Id,
                    },
                },
            },
            SslPrincipalMappingRules = "RULE:pattern/replacement/L,DEFAULT",
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.certificateauthority.CaPool;
import com.pulumi.gcp.certificateauthority.CaPoolArgs;
import com.pulumi.gcp.certificateauthority.inputs.CaPoolPublishingOptionsArgs;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.managedkafka.Cluster;
import com.pulumi.gcp.managedkafka.ClusterArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterCapacityConfigArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterGcpConfigArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterGcpConfigAccessConfigArgs;
// Added: required by the networkConfigs(...) builder below; missing from the
// original import list.
import com.pulumi.gcp.managedkafka.inputs.ClusterGcpConfigAccessConfigNetworkConfigArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterTlsConfigArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterTlsConfigTrustConfigArgs;
// Added: required by the casConfigs(...) builder below; missing from the
// original import list.
import com.pulumi.gcp.managedkafka.inputs.ClusterTlsConfigTrustConfigCasConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Provisions a CA pool and a Managed Kafka cluster configured for mutual
     * TLS: clients must present certificates issued by the CA pool.
     */
    public static void stack(Context ctx) {
        // CA pool that issues the client certificates the cluster will trust for mTLS.
        var caPool = new CaPool("caPool", CaPoolArgs.builder()
            .name("my-ca-pool")
            .location("us-central1")
            .tier("ENTERPRISE")
            .publishingOptions(CaPoolPublishingOptionsArgs.builder()
                .publishCaCert(true)
                .publishCrl(true)
                .build())
            .build());

        // Look up the current project; its number is used in the subnet path below.
        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

        var example = new Cluster("example", ClusterArgs.builder()
            .clusterId("my-cluster")
            .location("us-central1")
            .capacityConfig(ClusterCapacityConfigArgs.builder()
                .vcpuCount("3")
                .memoryBytes("3221225472")
                .build())
            .gcpConfig(ClusterGcpConfigArgs.builder()
                .accessConfig(ClusterGcpConfigAccessConfigArgs.builder()
                    .networkConfigs(ClusterGcpConfigAccessConfigNetworkConfigArgs.builder()
                        .subnet(String.format("projects/%s/regions/us-central1/subnetworks/default", project.number()))
                        .build())
                    .build())
                .build())
            // mTLS: trust certificates issued by the CA pool above, and map
            // certificate distinguished names to Kafka principals.
            .tlsConfig(ClusterTlsConfigArgs.builder()
                .trustConfig(ClusterTlsConfigTrustConfigArgs.builder()
                    .casConfigs(ClusterTlsConfigTrustConfigCasConfigArgs.builder()
                        .caPool(caPool.id())
                        .build())
                    .build())
                .sslPrincipalMappingRules("RULE:pattern/replacement/L,DEFAULT")
                .build())
            .build());

    }
}
resources:
  example:
    type: gcp:managedkafka:Cluster
    properties:
      clusterId: my-cluster
      location: us-central1
      capacityConfig:
        vcpuCount: 3
        # 3 GiB as an exact integer byte count; avoids the scientific-notation
        # float form, which can serialize imprecisely for this int64 field.
        memoryBytes: 3221225472
      gcpConfig:
        accessConfig:
          networkConfigs:
            - subnet: projects/${project.number}/regions/us-central1/subnetworks/default
      # mTLS: trust certificates issued by the CA pool below, and map
      # certificate distinguished names to Kafka principals.
      tlsConfig:
        trustConfig:
          casConfigs:
            - caPool: ${caPool.id}
        sslPrincipalMappingRules: RULE:pattern/replacement/L,DEFAULT
  # CA pool that issues the client certificates the cluster will trust.
  caPool:
    type: gcp:certificateauthority:CaPool
    name: ca_pool
    properties:
      name: my-ca-pool
      location: us-central1
      tier: ENTERPRISE
      publishingOptions:
        publishCaCert: true
        publishCrl: true
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}

The tlsConfig.trustConfig.casConfigs array references Certificate Authority pools that issue client certificates. The sslPrincipalMappingRules property defines how certificate distinguished names map to Kafka principals, controlling authorization. This configuration requires clients to present valid certificates signed by the specified CA pool before connecting to the cluster.

Encrypt cluster data with customer-managed keys

Organizations with strict data governance use customer-managed encryption keys to control encryption at rest.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// Look up the current project; its number and ID are used below.
const project = gcp.organizations.getProject({});
const example = new gcp.managedkafka.Cluster("example", {
    clusterId: "my-cluster",
    location: "us-central1",
    capacityConfig: {
        vcpuCount: "3",
        memoryBytes: "3221225472",
    },
    gcpConfig: {
        accessConfig: {
            networkConfigs: [{
                subnet: project.then(project => `projects/${project.number}/regions/us-central1/subnetworks/default`),
            }],
        },
        // Customer-managed encryption key (CMEK) used to encrypt cluster data at rest.
        kmsKey: "example-key",
    },
});
// Provision the Managed Kafka service identity so the service can be granted
// access to the KMS key.
const kafkaServiceIdentity = new gcp.projects.ServiceIdentity("kafka_service_identity", {
    project: project.then(project => project.projectId),
    service: "managedkafka.googleapis.com",
});
import pulumi
import pulumi_gcp as gcp

# Look up the current project; its number and ID are used below.
project = gcp.organizations.get_project()
example = gcp.managedkafka.Cluster("example",
    cluster_id="my-cluster",
    location="us-central1",
    capacity_config={
        "vcpu_count": "3",
        "memory_bytes": "3221225472",
    },
    gcp_config={
        "access_config": {
            "network_configs": [{
                "subnet": f"projects/{project.number}/regions/us-central1/subnetworks/default",
            }],
        },
        # Customer-managed encryption key (CMEK) used to encrypt cluster data at rest.
        "kms_key": "example-key",
    })
# Provision the Managed Kafka service identity so the service can be granted
# access to the KMS key.
kafka_service_identity = gcp.projects.ServiceIdentity("kafka_service_identity",
    project=project.project_id,
    service="managedkafka.googleapis.com")
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/managedkafka"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/projects"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Look up the current project; its number and ID are used below.
		project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		_, err = managedkafka.NewCluster(ctx, "example", &managedkafka.ClusterArgs{
			ClusterId: pulumi.String("my-cluster"),
			Location:  pulumi.String("us-central1"),
			CapacityConfig: &managedkafka.ClusterCapacityConfigArgs{
				VcpuCount:   pulumi.String("3"),
				MemoryBytes: pulumi.String("3221225472"),
			},
			GcpConfig: &managedkafka.ClusterGcpConfigArgs{
				AccessConfig: &managedkafka.ClusterGcpConfigAccessConfigArgs{
					NetworkConfigs: managedkafka.ClusterGcpConfigAccessConfigNetworkConfigArray{
						&managedkafka.ClusterGcpConfigAccessConfigNetworkConfigArgs{
							Subnet: pulumi.Sprintf("projects/%v/regions/us-central1/subnetworks/default", project.Number),
						},
					},
				},
				// Customer-managed encryption key (CMEK) used to encrypt cluster data at rest.
				KmsKey: pulumi.String("example-key"),
			},
		})
		if err != nil {
			return err
		}
		// Provision the Managed Kafka service identity so the service can be
		// granted access to the KMS key.
		_, err = projects.NewServiceIdentity(ctx, "kafka_service_identity", &projects.ServiceIdentityArgs{
			Project: pulumi.String(project.ProjectId),
			Service: pulumi.String("managedkafka.googleapis.com"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    // Look up the current project; its number and ID are used below.
    var project = Gcp.Organizations.GetProject.Invoke();

    var example = new Gcp.ManagedKafka.Cluster("example", new()
    {
        ClusterId = "my-cluster",
        Location = "us-central1",
        CapacityConfig = new Gcp.ManagedKafka.Inputs.ClusterCapacityConfigArgs
        {
            VcpuCount = "3",
            MemoryBytes = "3221225472",
        },
        GcpConfig = new Gcp.ManagedKafka.Inputs.ClusterGcpConfigArgs
        {
            AccessConfig = new Gcp.ManagedKafka.Inputs.ClusterGcpConfigAccessConfigArgs
            {
                NetworkConfigs = new[]
                {
                    new Gcp.ManagedKafka.Inputs.ClusterGcpConfigAccessConfigNetworkConfigArgs
                    {
                        Subnet = $"projects/{project.Apply(getProjectResult => getProjectResult.Number)}/regions/us-central1/subnetworks/default",
                    },
                },
            },
            // Customer-managed encryption key (CMEK) used to encrypt cluster data at rest.
            KmsKey = "example-key",
        },
    });

    // Provision the Managed Kafka service identity so the service can be
    // granted access to the KMS key.
    var kafkaServiceIdentity = new Gcp.Projects.ServiceIdentity("kafka_service_identity", new()
    {
        Project = project.Apply(getProjectResult => getProjectResult.ProjectId),
        Service = "managedkafka.googleapis.com",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.managedkafka.Cluster;
import com.pulumi.gcp.managedkafka.ClusterArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterCapacityConfigArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterGcpConfigArgs;
import com.pulumi.gcp.managedkafka.inputs.ClusterGcpConfigAccessConfigArgs;
// Added: required by the networkConfigs(...) builder below; the program does
// not compile without this import.
import com.pulumi.gcp.managedkafka.inputs.ClusterGcpConfigAccessConfigNetworkConfigArgs;
import com.pulumi.gcp.projects.ServiceIdentity;
import com.pulumi.gcp.projects.ServiceIdentityArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Provisions a Managed Kafka cluster encrypted with a customer-managed
     * KMS key, plus the service identity the service uses to access the key.
     */
    public static void stack(Context ctx) {
        // Look up the current project; its number and ID are used below.
        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

        var example = new Cluster("example", ClusterArgs.builder()
            .clusterId("my-cluster")
            .location("us-central1")
            .capacityConfig(ClusterCapacityConfigArgs.builder()
                .vcpuCount("3")
                .memoryBytes("3221225472")
                .build())
            .gcpConfig(ClusterGcpConfigArgs.builder()
                .accessConfig(ClusterGcpConfigAccessConfigArgs.builder()
                    .networkConfigs(ClusterGcpConfigAccessConfigNetworkConfigArgs.builder()
                        .subnet(String.format("projects/%s/regions/us-central1/subnetworks/default", project.number()))
                        .build())
                    .build())
                // Customer-managed encryption key (CMEK) for cluster data at rest.
                .kmsKey("example-key")
                .build())
            .build());

        // Provision the Managed Kafka service identity so the service can be
        // granted access to the KMS key.
        var kafkaServiceIdentity = new ServiceIdentity("kafkaServiceIdentity", ServiceIdentityArgs.builder()
            .project(project.projectId())
            .service("managedkafka.googleapis.com")
            .build());

    }
}
resources:
  example:
    type: gcp:managedkafka:Cluster
    properties:
      clusterId: my-cluster
      location: us-central1
      capacityConfig:
        vcpuCount: 3
        # 3 GiB as an exact integer byte count; avoids the scientific-notation
        # float form, which can serialize imprecisely for this int64 field.
        memoryBytes: 3221225472
      gcpConfig:
        accessConfig:
          networkConfigs:
            - subnet: projects/${project.number}/regions/us-central1/subnetworks/default
        # Customer-managed encryption key (CMEK) for cluster data at rest.
        kmsKey: example-key
  # Service identity the Managed Kafka service uses to access the KMS key.
  kafkaServiceIdentity:
    type: gcp:projects:ServiceIdentity
    name: kafka_service_identity
    properties:
      project: ${project.projectId}
      service: managedkafka.googleapis.com
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}

The gcpConfig.kmsKey property references a Cloud KMS key that encrypts cluster data at rest. The ServiceIdentity resource grants the Managed Kafka service permission to use your KMS key for encryption operations. This gives you control over key rotation, access policies, and audit logging for encryption activities.

Beyond these examples

These snippets focus on specific cluster-level features: capacity configuration and VPC networking, mTLS authentication with Certificate Authority, and customer-managed encryption keys. They’re intentionally minimal rather than full Kafka deployments.

The examples may reference pre-existing infrastructure such as VPC subnets in the target region, Certificate Authority pools for mTLS, and Cloud KMS keys for CMEK. They focus on configuring the cluster rather than provisioning the surrounding infrastructure.

To keep things focused, common cluster patterns are omitted, including:

  • Per-broker capacity tuning (brokerCapacityConfig)
  • Rebalancing behavior configuration (rebalanceConfig)
  • Custom labels for organization and cost tracking
  • Topic and consumer group management

These omissions are intentional: the goal is to illustrate how each cluster feature is wired, not provide drop-in Kafka modules. See the Managed Kafka Cluster resource reference for all available configuration options.

Let's deploy GCP Managed Service for Apache Kafka Clusters

Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.

Try Pulumi Cloud for FREE

Frequently Asked Questions

Configuration & Setup
What are the naming requirements for clusterId?
The clusterId must be 1-63 characters long and match the regex `[a-z]([-a-z0-9]*[a-z0-9])?` (start with a lowercase letter; contain only lowercase letters, numbers, and hyphens; not end with a hyphen) to comply with RFC 1035.
What's the minimum configuration needed to create a Kafka cluster?
You need clusterId, location, capacityConfig (with vcpuCount and memoryBytes), and gcpConfig with a subnet specified in accessConfig.networkConfigs.
What properties can't I change after creating a cluster?
The clusterId, location, and project properties are immutable. Changing any of these requires replacing the cluster.
Security & Encryption
How do I enable mTLS authentication for my cluster?
Configure tlsConfig with a trustConfig containing casConfigs pointing to your Certificate Authority pool, and set sslPrincipalMappingRules for principal mapping.
How do I remove TLS configuration from my cluster?
Explicitly add an empty tlsConfig block to your configuration. Simply removing the block won’t clear the TLS settings.
How do I use customer-managed encryption keys (CMEK)?
Set gcpConfig.kmsKey to your KMS key identifier and create a gcp.projects.ServiceIdentity resource for the managedkafka.googleapis.com service.
Resource Management
Why aren't all my labels showing up in Pulumi state?
The labels field is non-authoritative and only manages labels defined in your configuration. Use the effectiveLabels output property to see all labels on the resource, including those set by other clients.
What does AUTO_REBALANCE_ON_SCALE_UP do?
This rebalanceConfig.mode setting automatically rebalances partitions across brokers when you scale up the cluster capacity.

Using a different cloud?

Explore messaging guides for other cloud providers: