Kafka Provider
Installation
The Kafka provider is available as a package in all Pulumi languages:
- JavaScript/TypeScript:
@pulumi/kafka
- Python:
pulumi-kafka
- Go:
github.com/pulumi/pulumi-kafka/sdk/v3/go/kafka
- .NET:
Pulumi.Kafka
- Java:
com.pulumi/kafka
Overview
The Kafka provider is used to interact with Apache Kafka clusters. The provider allows you to manage Kafka topics, ACLs, quotas, and SCRAM credentials. It supports various authentication methods including TLS, SASL/PLAIN, SASL/SCRAM, AWS IAM, and OAuth.
Documentation
- Quick Start Guide - Get started quickly with common scenarios
- Authentication Guide - Detailed authentication configuration
- AWS MSK Integration - Complete MSK setup guide
- Migration Guide - Migrate between setups and versions
- Troubleshooting Guide - Common issues and solutions
Example Usage
Basic Configuration with TLS
# Pulumi.yaml provider configuration file
name: configuration-example
runtime: nodejs
config:
kafka:bootstrapServers:
value:
- localhost:9092
kafka:caCert:
value: 'TODO: file("../secrets/ca.crt")'
kafka:clientCert:
value: 'TODO: file("../secrets/client-cert.pem")'
kafka:clientKey:
value: 'TODO: file("../secrets/client-key.pem")'
kafka:tlsEnabled:
value: true
import * as pulumi from "@pulumi/pulumi";
# Pulumi.yaml provider configuration file
name: configuration-example
runtime: python
config:
kafka:bootstrapServers:
value:
- localhost:9092
kafka:caCert:
value: 'TODO: file("../secrets/ca.crt")'
kafka:clientCert:
value: 'TODO: file("../secrets/client-cert.pem")'
kafka:clientKey:
value: 'TODO: file("../secrets/client-key.pem")'
kafka:tlsEnabled:
value: true
import pulumi
# Pulumi.yaml provider configuration file
name: configuration-example
runtime: dotnet
config:
kafka:bootstrapServers:
value:
- localhost:9092
kafka:caCert:
value: 'TODO: file("../secrets/ca.crt")'
kafka:clientCert:
value: 'TODO: file("../secrets/client-cert.pem")'
kafka:clientKey:
value: 'TODO: file("../secrets/client-key.pem")'
kafka:tlsEnabled:
value: true
using System.Collections.Generic;
using System.Linq;
using Pulumi;
return await Deployment.RunAsync(() =>
{
});
# Pulumi.yaml provider configuration file
name: configuration-example
runtime: go
config:
kafka:bootstrapServers:
value:
- localhost:9092
kafka:caCert:
value: 'TODO: file("../secrets/ca.crt")'
kafka:clientCert:
value: 'TODO: file("../secrets/client-cert.pem")'
kafka:clientKey:
value: 'TODO: file("../secrets/client-key.pem")'
kafka:tlsEnabled:
value: true
package main
import (
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
return nil
})
}
# Pulumi.yaml provider configuration file
name: configuration-example
runtime: yaml
config:
kafka:bootstrapServers:
value:
- localhost:9092
kafka:caCert:
value: 'TODO: file("../secrets/ca.crt")'
kafka:clientCert:
value: 'TODO: file("../secrets/client-cert.pem")'
kafka:clientKey:
value: 'TODO: file("../secrets/client-key.pem")'
kafka:tlsEnabled:
value: true
{}
# Pulumi.yaml provider configuration file
name: configuration-example
runtime: java
config:
kafka:bootstrapServers:
value:
- localhost:9092
kafka:caCert:
value: 'TODO: file("../secrets/ca.crt")'
kafka:clientCert:
value: 'TODO: file("../secrets/client-cert.pem")'
kafka:clientKey:
value: 'TODO: file("../secrets/client-key.pem")'
kafka:tlsEnabled:
value: true
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
}
}
SASL/PLAIN Authentication
# Pulumi.yaml provider configuration file
name: configuration-example
runtime:
config:
kafka:bootstrapServers:
value:
- localhost:9092
kafka:saslMechanism:
value: plain
kafka:saslPassword:
value: 'TODO: var.kafka_password'
kafka:saslUsername:
value: pulumi
kafka:tlsEnabled:
value: true
SASL/SCRAM Authentication
# Pulumi.yaml provider configuration file
name: configuration-example
runtime:
config:
kafka:bootstrapServers:
value:
- localhost:9092
kafka:saslMechanism:
value: scram-sha256
kafka:saslPassword:
value: 'TODO: var.kafka_password'
kafka:saslUsername:
value: pulumi
kafka:tlsEnabled:
value: true
AWS MSK with IAM Authentication (Using AssumeRole)
# Pulumi.yaml provider configuration file
name: configuration-example
runtime:
config:
kafka:bootstrapServers:
value:
- b-1.msk-cluster.xxx.kafka.us-east-1.amazonaws.com:9098
kafka:saslAwsRegion:
value: us-east-1
kafka:saslAwsRoleArn:
value: arn:aws:iam::123456789012:role/kafka-pulumi-role
kafka:saslMechanism:
value: aws-iam
kafka:tlsEnabled:
value: true
AWS MSK with IAM Authentication (Using AWS Profile)
# Pulumi.yaml provider configuration file
name: configuration-example
runtime:
config:
kafka:bootstrapServers:
value:
- b-1.msk-cluster.xxx.kafka.us-east-1.amazonaws.com:9098
kafka:saslAwsProfile:
value: production
kafka:saslAwsRegion:
value: us-east-1
kafka:saslMechanism:
value: aws-iam
kafka:tlsEnabled:
value: true
AWS MSK with IAM Authentication (Using Static Credentials)
# Pulumi.yaml provider configuration file
name: configuration-example
runtime: nodejs
config:
kafka:bootstrapServers:
value:
- b-1.msk-cluster.xxx.kafka.us-east-1.amazonaws.com:9098
kafka:saslAwsAccessKey:
value: 'TODO: data.vault_aws_access_credentials.creds.access_key'
kafka:saslAwsRegion:
value: us-east-1
kafka:saslAwsSecretKey:
value: 'TODO: data.vault_aws_access_credentials.creds.secret_key'
kafka:saslAwsToken:
value: 'TODO: data.vault_aws_access_credentials.creds.security_token'
kafka:saslMechanism:
value: aws-iam
kafka:tlsEnabled:
value: true
import * as pulumi from "@pulumi/pulumi";
import * as vault from "@pulumi/vault";
const creds = vault.aws.getAccessCredentials({
backend: "aws",
type: "sts",
role: "kafka-access-role",
});
# Pulumi.yaml provider configuration file
name: configuration-example
runtime: python
config:
kafka:bootstrapServers:
value:
- b-1.msk-cluster.xxx.kafka.us-east-1.amazonaws.com:9098
kafka:saslAwsAccessKey:
value: 'TODO: data.vault_aws_access_credentials.creds.access_key'
kafka:saslAwsRegion:
value: us-east-1
kafka:saslAwsSecretKey:
value: 'TODO: data.vault_aws_access_credentials.creds.secret_key'
kafka:saslAwsToken:
value: 'TODO: data.vault_aws_access_credentials.creds.security_token'
kafka:saslMechanism:
value: aws-iam
kafka:tlsEnabled:
value: true
import pulumi
import pulumi_vault as vault
creds = vault.aws.get_access_credentials(backend="aws",
type="sts",
role="kafka-access-role")
# Pulumi.yaml provider configuration file
name: configuration-example
runtime: dotnet
config:
kafka:bootstrapServers:
value:
- b-1.msk-cluster.xxx.kafka.us-east-1.amazonaws.com:9098
kafka:saslAwsAccessKey:
value: 'TODO: data.vault_aws_access_credentials.creds.access_key'
kafka:saslAwsRegion:
value: us-east-1
kafka:saslAwsSecretKey:
value: 'TODO: data.vault_aws_access_credentials.creds.secret_key'
kafka:saslAwsToken:
value: 'TODO: data.vault_aws_access_credentials.creds.security_token'
kafka:saslMechanism:
value: aws-iam
kafka:tlsEnabled:
value: true
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Vault = Pulumi.Vault;
return await Deployment.RunAsync(() =>
{
var creds = Vault.Aws.GetAccessCredentials.Invoke(new()
{
Backend = "aws",
Type = "sts",
Role = "kafka-access-role",
});
});
# Pulumi.yaml provider configuration file
name: configuration-example
runtime: go
config:
kafka:bootstrapServers:
value:
- b-1.msk-cluster.xxx.kafka.us-east-1.amazonaws.com:9098
kafka:saslAwsAccessKey:
value: 'TODO: data.vault_aws_access_credentials.creds.access_key'
kafka:saslAwsRegion:
value: us-east-1
kafka:saslAwsSecretKey:
value: 'TODO: data.vault_aws_access_credentials.creds.secret_key'
kafka:saslAwsToken:
value: 'TODO: data.vault_aws_access_credentials.creds.security_token'
kafka:saslMechanism:
value: aws-iam
kafka:tlsEnabled:
value: true
package main
import (
"github.com/pulumi/pulumi-vault/sdk/v7/go/vault/aws"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := aws.GetAccessCredentials(ctx, &aws.GetAccessCredentialsArgs{
Backend: "aws",
Type: pulumi.StringRef("sts"),
Role: "kafka-access-role",
}, nil)
if err != nil {
return err
}
return nil
})
}
# Pulumi.yaml provider configuration file
name: configuration-example
runtime: yaml
config:
kafka:bootstrapServers:
value:
- b-1.msk-cluster.xxx.kafka.us-east-1.amazonaws.com:9098
kafka:saslAwsAccessKey:
value: 'TODO: data.vault_aws_access_credentials.creds.access_key'
kafka:saslAwsRegion:
value: us-east-1
kafka:saslAwsSecretKey:
value: 'TODO: data.vault_aws_access_credentials.creds.secret_key'
kafka:saslAwsToken:
value: 'TODO: data.vault_aws_access_credentials.creds.security_token'
kafka:saslMechanism:
value: aws-iam
kafka:tlsEnabled:
value: true
variables:
creds:
fn::invoke:
function: vault:aws:getAccessCredentials
arguments:
backend: aws
type: sts
role: kafka-access-role
# Pulumi.yaml provider configuration file
name: configuration-example
runtime: java
config:
kafka:bootstrapServers:
value:
- b-1.msk-cluster.xxx.kafka.us-east-1.amazonaws.com:9098
kafka:saslAwsAccessKey:
value: 'TODO: data.vault_aws_access_credentials.creds.access_key'
kafka:saslAwsRegion:
value: us-east-1
kafka:saslAwsSecretKey:
value: 'TODO: data.vault_aws_access_credentials.creds.secret_key'
kafka:saslAwsToken:
value: 'TODO: data.vault_aws_access_credentials.creds.security_token'
kafka:saslMechanism:
value: aws-iam
kafka:tlsEnabled:
value: true
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.vault.aws.AwsFunctions;
import com.pulumi.vault.aws.inputs.GetAccessCredentialsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
final var creds = AwsFunctions.getAccessCredentials(GetAccessCredentialsArgs.builder()
.backend("aws")
.type("sts")
.role("kafka-access-role")
.build());
}
}
OAuth2 Authentication
# Pulumi.yaml provider configuration file
name: configuration-example
runtime:
config:
kafka:bootstrapServers:
value:
- localhost:9092
kafka:saslMechanism:
value: oauthbearer
kafka:saslOauthScopes:
value:
- kafka:read
- kafka:write
kafka:saslTokenUrl:
value: https://oauth.example.com/oauth2/token
kafka:tlsEnabled:
value: true
Redpanda Compatibility
When using with Redpanda, you may need to force a specific Kafka protocol version:
# Pulumi.yaml provider configuration file
name: configuration-example
runtime:
config:
kafka:bootstrapServers:
value:
- localhost:9092
kafka:kafkaVersion:
value: 2.1.0
Configuration Reference
Required
bootstrapServers
(List of String) A list of kafka brokerscaCert
(String) CA certificate file to validate the server’s certificate.caCertFile
(String, Deprecated) Path to a CA certificate file to validate the server’s certificate.clientCert
(String) The client certificate.clientCertFile
(String, Deprecated) Path to a file containing the client certificate.clientKey
(String) The private key that the certificate was issued for.clientKeyFile
(String, Deprecated) Path to a file containing the private key that the certificate was issued for.clientKeyPassphrase
(String) The passphrase for the private key that the certificate was issued for.kafkaVersion
(String) The version of Kafka protocol to use in$MAJOR.$MINOR.$PATCH
format. Some features may not be available on older versions. Default is 2.7.0.saslAwsAccessKey
(String) The AWS access key.saslAwsContainerAuthorizationTokenFile
(String) Path to a file containing the AWS pod identity authorization tokensaslAwsContainerCredentialsFullUri
(String) URI to retrieve AWS credentials fromsaslAwsCredsDebug
(Boolean) Set this to true to turn AWS credentials debug.saslAwsExternalId
(String) External ID of the AWS IAM role to assumesaslAwsProfile
(String) AWS profile name to usesaslAwsRegion
(String) AWS region where MSK is deployed.saslAwsRoleArn
(String) Arn of an AWS IAM role to assumesaslAwsSecretKey
(String) The AWS secret key.saslAwsSharedConfigFiles
(List of String) List of paths to AWS shared config files.saslAwsToken
(String) The AWS session token. Only required if you are using temporary security credentials.saslMechanism
(String) SASL mechanism, can be plain, scram-sha512, scram-sha256, aws-iamsaslOauthScopes
(List of String) OAuth scopes to request when using the oauthbearer mechanismsaslPassword
(String) Password for SASL authentication.saslTokenUrl
(String) The url to retrieve oauth2 tokens from, when using sasl mechanism oauthbearersaslUsername
(String) Username for SASL authentication.skipTlsVerify
(Boolean) Set this to true only if the target Kafka server is an insecure development instance.timeout
(Number) Timeout in secondstlsEnabled
(Boolean) Enable communication with the Kafka Cluster over TLS.