MongoDB Atlas v3.38.0 published on Thursday, Dec 4, 2025 by Pulumi
# Data Source: mongodbatlas.StreamProcessor
mongodbatlas.StreamProcessor describes a stream processor.
## Example Usage

TypeScript:
import * as pulumi from "@pulumi/pulumi";
import * as mongodbatlas from "@pulumi/mongodbatlas";
const example = new mongodbatlas.StreamInstance("example", {
projectId: projectId,
instanceName: "InstanceName",
dataProcessRegion: {
region: "VIRGINIA_USA",
cloudProvider: "AWS",
},
});
const example_sample = new mongodbatlas.StreamConnection("example-sample", {
projectId: projectId,
instanceName: example.instanceName,
connectionName: "sample_stream_solar",
type: "Sample",
});
const example_cluster = new mongodbatlas.StreamConnection("example-cluster", {
projectId: projectId,
instanceName: example.instanceName,
connectionName: "ClusterConnection",
type: "Cluster",
clusterName: clusterName,
dbRoleToExecute: {
role: "atlasAdmin",
type: "BUILT_IN",
},
});
const example_kafka = new mongodbatlas.StreamConnection("example-kafka", {
projectId: projectId,
instanceName: example.instanceName,
connectionName: "KafkaPlaintextConnection",
type: "Kafka",
authentication: {
mechanism: "PLAIN",
username: kafkaUsername,
password: kafkaPassword,
},
bootstrapServers: "localhost:9092,localhost:9092",
config: {
"auto.offset.reset": "earliest",
},
security: {
protocol: "SASL_PLAINTEXT",
},
});
const stream_processor_sample_example = new mongodbatlas.StreamProcessor("stream-processor-sample-example", {
projectId: projectId,
instanceName: example.instanceName,
processorName: "sampleProcessorName",
pipeline: JSON.stringify([
{
$source: {
connectionName: "sample_stream_solar",
},
},
{
$emit: {
connectionName: "ClusterConnection",
db: "sample",
coll: "solar",
timeseries: {
timeField: "_ts",
},
},
},
]),
state: "STARTED",
});
const stream_processor_cluster_to_kafka_example = new mongodbatlas.StreamProcessor("stream-processor-cluster-to-kafka-example", {
projectId: projectId,
instanceName: example.instanceName,
processorName: "clusterProcessorName",
pipeline: JSON.stringify([
{
$source: {
connectionName: "ClusterConnection",
},
},
{
$emit: {
connectionName: "KafkaPlaintextConnection",
topic: "topic_from_cluster",
},
},
]),
state: "CREATED",
});
const stream_processor_kafka_to_cluster_example = new mongodbatlas.StreamProcessor("stream-processor-kafka-to-cluster-example", {
projectId: projectId,
instanceName: example.instanceName,
processorName: "kafkaProcessorName",
pipeline: JSON.stringify([
{
$source: {
connectionName: "KafkaPlaintextConnection",
topic: "topic_source",
},
},
{
$emit: {
connectionName: "ClusterConnection",
db: "kafka",
coll: "topic_source",
timeseries: {
timeField: "ts",
},
},
},
]),
state: "CREATED",
options: {
dlq: {
coll: "exampleColumn",
connectionName: "ClusterConnection",
db: "exampleDb",
},
},
});
const example_stream_processors = mongodbatlas.getStreamProcessorsOutput({
    projectId: projectId,
    instanceName: example.instanceName,
});
const example_stream_processor = mongodbatlas.getStreamProcessorOutput({
    projectId: projectId,
    instanceName: example.instanceName,
    processorName: stream_processor_sample_example.processorName,
});
export const streamProcessorsState = example_stream_processor.state;
export const streamProcessorsResults = example_stream_processors.results;
Python:

import pulumi
import json
import pulumi_mongodbatlas as mongodbatlas
example = mongodbatlas.StreamInstance("example",
project_id=project_id,
instance_name="InstanceName",
data_process_region={
"region": "VIRGINIA_USA",
"cloud_provider": "AWS",
})
example_sample = mongodbatlas.StreamConnection("example-sample",
project_id=project_id,
instance_name=example.instance_name,
connection_name="sample_stream_solar",
type="Sample")
example_cluster = mongodbatlas.StreamConnection("example-cluster",
project_id=project_id,
instance_name=example.instance_name,
connection_name="ClusterConnection",
type="Cluster",
cluster_name=cluster_name,
db_role_to_execute={
"role": "atlasAdmin",
"type": "BUILT_IN",
})
example_kafka = mongodbatlas.StreamConnection("example-kafka",
project_id=project_id,
instance_name=example.instance_name,
connection_name="KafkaPlaintextConnection",
type="Kafka",
authentication={
"mechanism": "PLAIN",
"username": kafka_username,
"password": kafka_password,
},
bootstrap_servers="localhost:9092,localhost:9092",
config={
"auto.offset.reset": "earliest",
},
security={
"protocol": "SASL_PLAINTEXT",
})
stream_processor_sample_example = mongodbatlas.StreamProcessor("stream-processor-sample-example",
project_id=project_id,
instance_name=example.instance_name,
processor_name="sampleProcessorName",
pipeline=json.dumps([
{
"$source": {
"connectionName": mongodbatlas_stream_connection["example-sample"]["connectionName"],
},
},
{
"$emit": {
"connectionName": mongodbatlas_stream_connection["example-cluster"]["connectionName"],
"db": "sample",
"coll": "solar",
"timeseries": {
"timeField": "_ts",
},
},
},
]),
state="STARTED")
stream_processor_cluster_to_kafka_example = mongodbatlas.StreamProcessor("stream-processor-cluster-to-kafka-example",
project_id=project_id,
instance_name=example.instance_name,
processor_name="clusterProcessorName",
pipeline=json.dumps([
{
"$source": {
"connectionName": mongodbatlas_stream_connection["example-cluster"]["connectionName"],
},
},
{
"$emit": {
"connectionName": mongodbatlas_stream_connection["example-kafka"]["connectionName"],
"topic": "topic_from_cluster",
},
},
]),
state="CREATED")
stream_processor_kafka_to_cluster_example = mongodbatlas.StreamProcessor("stream-processor-kafka-to-cluster-example",
project_id=project_id,
instance_name=example.instance_name,
processor_name="kafkaProcessorName",
pipeline=json.dumps([
{
"$source": {
"connectionName": mongodbatlas_stream_connection["example-kafka"]["connectionName"],
"topic": "topic_source",
},
},
{
"$emit": {
"connectionName": mongodbatlas_stream_connection["example-cluster"]["connectionName"],
"db": "kafka",
"coll": "topic_source",
"timeseries": {
"timeField": "ts",
},
},
},
]),
state="CREATED",
options={
"dlq": {
"coll": "exampleColumn",
"connection_name": mongodbatlas_stream_connection["example-cluster"]["connectionName"],
"db": "exampleDb",
},
})
example_stream_processors = mongodbatlas.get_stream_processors_output(project_id=project_id,
    instance_name=example.instance_name)
example_stream_processor = mongodbatlas.get_stream_processor_output(project_id=project_id,
    instance_name=example.instance_name,
    processor_name=stream_processor_sample_example.processor_name)
pulumi.export("streamProcessorsState", example_stream_processor.state)
pulumi.export("streamProcessorsResults", example_stream_processors.results)
Go:

package main
import (
"encoding/json"
"github.com/pulumi/pulumi-mongodbatlas/sdk/v3/go/mongodbatlas"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
example, err := mongodbatlas.NewStreamInstance(ctx, "example", &mongodbatlas.StreamInstanceArgs{
ProjectId: pulumi.Any(projectId),
InstanceName: pulumi.String("InstanceName"),
DataProcessRegion: &mongodbatlas.StreamInstanceDataProcessRegionArgs{
Region: pulumi.String("VIRGINIA_USA"),
CloudProvider: pulumi.String("AWS"),
},
})
if err != nil {
return err
}
_, err = mongodbatlas.NewStreamConnection(ctx, "example-sample", &mongodbatlas.StreamConnectionArgs{
ProjectId: pulumi.Any(projectId),
InstanceName: example.InstanceName,
ConnectionName: pulumi.String("sample_stream_solar"),
Type: pulumi.String("Sample"),
})
if err != nil {
return err
}
_, err = mongodbatlas.NewStreamConnection(ctx, "example-cluster", &mongodbatlas.StreamConnectionArgs{
ProjectId: pulumi.Any(projectId),
InstanceName: example.InstanceName,
ConnectionName: pulumi.String("ClusterConnection"),
Type: pulumi.String("Cluster"),
ClusterName: pulumi.Any(clusterName),
DbRoleToExecute: &mongodbatlas.StreamConnectionDbRoleToExecuteArgs{
Role: pulumi.String("atlasAdmin"),
Type: pulumi.String("BUILT_IN"),
},
})
if err != nil {
return err
}
_, err = mongodbatlas.NewStreamConnection(ctx, "example-kafka", &mongodbatlas.StreamConnectionArgs{
ProjectId: pulumi.Any(projectId),
InstanceName: example.InstanceName,
ConnectionName: pulumi.String("KafkaPlaintextConnection"),
Type: pulumi.String("Kafka"),
Authentication: &mongodbatlas.StreamConnectionAuthenticationArgs{
Mechanism: pulumi.String("PLAIN"),
Username: pulumi.Any(kafkaUsername),
Password: pulumi.Any(kafkaPassword),
},
BootstrapServers: pulumi.String("localhost:9092,localhost:9092"),
Config: pulumi.StringMap{
"auto.offset.reset": pulumi.String("earliest"),
},
Security: &mongodbatlas.StreamConnectionSecurityArgs{
Protocol: pulumi.String("SASL_PLAINTEXT"),
},
})
if err != nil {
return err
}
tmpJSON0, err := json.Marshal([]interface{}{
map[string]interface{}{
"$source": map[string]interface{}{
"connectionName": mongodbatlasStreamConnection.ExampleSample.ConnectionName,
},
},
map[string]interface{}{
"$emit": map[string]interface{}{
"connectionName": mongodbatlasStreamConnection.ExampleCluster.ConnectionName,
"db": "sample",
"coll": "solar",
"timeseries": map[string]interface{}{
"timeField": "_ts",
},
},
},
})
if err != nil {
return err
}
json0 := string(tmpJSON0)
stream_processor_sample_example, err := mongodbatlas.NewStreamProcessor(ctx, "stream-processor-sample-example", &mongodbatlas.StreamProcessorArgs{
ProjectId: pulumi.Any(projectId),
InstanceName: example.InstanceName,
ProcessorName: pulumi.String("sampleProcessorName"),
Pipeline: pulumi.String(json0),
State: pulumi.String("STARTED"),
})
if err != nil {
return err
}
tmpJSON1, err := json.Marshal([]interface{}{
map[string]interface{}{
"$source": map[string]interface{}{
"connectionName": mongodbatlasStreamConnection.ExampleCluster.ConnectionName,
},
},
map[string]interface{}{
"$emit": map[string]interface{}{
"connectionName": mongodbatlasStreamConnection.ExampleKafka.ConnectionName,
"topic": "topic_from_cluster",
},
},
})
if err != nil {
return err
}
json1 := string(tmpJSON1)
_, err = mongodbatlas.NewStreamProcessor(ctx, "stream-processor-cluster-to-kafka-example", &mongodbatlas.StreamProcessorArgs{
ProjectId: pulumi.Any(projectId),
InstanceName: example.InstanceName,
ProcessorName: pulumi.String("clusterProcessorName"),
Pipeline: pulumi.String(json1),
State: pulumi.String("CREATED"),
})
if err != nil {
return err
}
tmpJSON2, err := json.Marshal([]interface{}{
map[string]interface{}{
"$source": map[string]interface{}{
"connectionName": mongodbatlasStreamConnection.ExampleKafka.ConnectionName,
"topic": "topic_source",
},
},
map[string]interface{}{
"$emit": map[string]interface{}{
"connectionName": mongodbatlasStreamConnection.ExampleCluster.ConnectionName,
"db": "kafka",
"coll": "topic_source",
"timeseries": map[string]interface{}{
"timeField": "ts",
},
},
},
})
if err != nil {
return err
}
json2 := string(tmpJSON2)
_, err = mongodbatlas.NewStreamProcessor(ctx, "stream-processor-kafka-to-cluster-example", &mongodbatlas.StreamProcessorArgs{
ProjectId: pulumi.Any(projectId),
InstanceName: example.InstanceName,
ProcessorName: pulumi.String("kafkaProcessorName"),
Pipeline: pulumi.String(json2),
State: pulumi.String("CREATED"),
Options: &mongodbatlas.StreamProcessorOptionsArgs{
Dlq: &mongodbatlas.StreamProcessorOptionsDlqArgs{
Coll: pulumi.String("exampleColumn"),
ConnectionName: pulumi.String("ClusterConnection"),
Db: pulumi.String("exampleDb"),
},
},
})
if err != nil {
return err
}
example_stream_processors := mongodbatlas.LookupStreamProcessorsOutput(ctx, mongodbatlas.LookupStreamProcessorsOutputArgs{
    ProjectId:    pulumi.String(projectId),
    InstanceName: example.InstanceName,
})
example_stream_processor := mongodbatlas.LookupStreamProcessorOutput(ctx, mongodbatlas.LookupStreamProcessorOutputArgs{
    ProjectId:     pulumi.String(projectId),
    InstanceName:  example.InstanceName,
    ProcessorName: stream_processor_sample_example.ProcessorName,
})
ctx.Export("streamProcessorsState", example_stream_processor.State())
ctx.Export("streamProcessorsResults", example_stream_processors.Results())
return nil
})
}
C#:

using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using Pulumi;
using Mongodbatlas = Pulumi.Mongodbatlas;
return await Deployment.RunAsync(() =>
{
var example = new Mongodbatlas.StreamInstance("example", new()
{
ProjectId = projectId,
InstanceName = "InstanceName",
DataProcessRegion = new Mongodbatlas.Inputs.StreamInstanceDataProcessRegionArgs
{
Region = "VIRGINIA_USA",
CloudProvider = "AWS",
},
});
var example_sample = new Mongodbatlas.StreamConnection("example-sample", new()
{
ProjectId = projectId,
InstanceName = example.InstanceName,
ConnectionName = "sample_stream_solar",
Type = "Sample",
});
var example_cluster = new Mongodbatlas.StreamConnection("example-cluster", new()
{
ProjectId = projectId,
InstanceName = example.InstanceName,
ConnectionName = "ClusterConnection",
Type = "Cluster",
ClusterName = clusterName,
DbRoleToExecute = new Mongodbatlas.Inputs.StreamConnectionDbRoleToExecuteArgs
{
Role = "atlasAdmin",
Type = "BUILT_IN",
},
});
var example_kafka = new Mongodbatlas.StreamConnection("example-kafka", new()
{
ProjectId = projectId,
InstanceName = example.InstanceName,
ConnectionName = "KafkaPlaintextConnection",
Type = "Kafka",
Authentication = new Mongodbatlas.Inputs.StreamConnectionAuthenticationArgs
{
Mechanism = "PLAIN",
Username = kafkaUsername,
Password = kafkaPassword,
},
BootstrapServers = "localhost:9092,localhost:9092",
Config =
{
{ "auto.offset.reset", "earliest" },
},
Security = new Mongodbatlas.Inputs.StreamConnectionSecurityArgs
{
Protocol = "SASL_PLAINTEXT",
},
});
var stream_processor_sample_example = new Mongodbatlas.StreamProcessor("stream-processor-sample-example", new()
{
ProjectId = projectId,
InstanceName = example.InstanceName,
ProcessorName = "sampleProcessorName",
Pipeline = JsonSerializer.Serialize(new[]
{
new Dictionary<string, object?>
{
["$source"] = new Dictionary<string, object?>
{
["connectionName"] = mongodbatlasStreamConnection.Example_sample.ConnectionName,
},
},
new Dictionary<string, object?>
{
["$emit"] = new Dictionary<string, object?>
{
["connectionName"] = mongodbatlasStreamConnection.Example_cluster.ConnectionName,
["db"] = "sample",
["coll"] = "solar",
["timeseries"] = new Dictionary<string, object?>
{
["timeField"] = "_ts",
},
},
},
}),
State = "STARTED",
});
var stream_processor_cluster_to_kafka_example = new Mongodbatlas.StreamProcessor("stream-processor-cluster-to-kafka-example", new()
{
ProjectId = projectId,
InstanceName = example.InstanceName,
ProcessorName = "clusterProcessorName",
Pipeline = JsonSerializer.Serialize(new[]
{
new Dictionary<string, object?>
{
["$source"] = new Dictionary<string, object?>
{
["connectionName"] = mongodbatlasStreamConnection.Example_cluster.ConnectionName,
},
},
new Dictionary<string, object?>
{
["$emit"] = new Dictionary<string, object?>
{
["connectionName"] = mongodbatlasStreamConnection.Example_kafka.ConnectionName,
["topic"] = "topic_from_cluster",
},
},
}),
State = "CREATED",
});
var stream_processor_kafka_to_cluster_example = new Mongodbatlas.StreamProcessor("stream-processor-kafka-to-cluster-example", new()
{
ProjectId = projectId,
InstanceName = example.InstanceName,
ProcessorName = "kafkaProcessorName",
Pipeline = JsonSerializer.Serialize(new[]
{
new Dictionary<string, object?>
{
["$source"] = new Dictionary<string, object?>
{
["connectionName"] = mongodbatlasStreamConnection.Example_kafka.ConnectionName,
["topic"] = "topic_source",
},
},
new Dictionary<string, object?>
{
["$emit"] = new Dictionary<string, object?>
{
["connectionName"] = mongodbatlasStreamConnection.Example_cluster.ConnectionName,
["db"] = "kafka",
["coll"] = "topic_source",
["timeseries"] = new Dictionary<string, object?>
{
["timeField"] = "ts",
},
},
},
}),
State = "CREATED",
Options = new Mongodbatlas.Inputs.StreamProcessorOptionsArgs
{
Dlq = new Mongodbatlas.Inputs.StreamProcessorOptionsDlqArgs
{
Coll = "exampleColumn",
ConnectionName = "ClusterConnection",
Db = "exampleDb",
},
},
});
var example_stream_processors = Mongodbatlas.GetStreamProcessors.Invoke(new()
{
ProjectId = projectId,
InstanceName = example.InstanceName,
});
var example_stream_processor = Mongodbatlas.GetStreamProcessor.Invoke(new()
{
ProjectId = projectId,
InstanceName = example.InstanceName,
ProcessorName = stream_processor_sample_example.ProcessorName,
});
return new Dictionary<string, object?>
{
["streamProcessorsState"] = example_stream_processor.Apply(example_stream_processor => example_stream_processor.Apply(getStreamProcessorResult => getStreamProcessorResult.State)),
["streamProcessorsResults"] = example_stream_processors.Apply(example_stream_processors => example_stream_processors.Apply(getStreamProcessorsResult => getStreamProcessorsResult.Results)),
};
});
Java:

package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.mongodbatlas.StreamInstance;
import com.pulumi.mongodbatlas.StreamInstanceArgs;
import com.pulumi.mongodbatlas.inputs.StreamInstanceDataProcessRegionArgs;
import com.pulumi.mongodbatlas.StreamConnection;
import com.pulumi.mongodbatlas.StreamConnectionArgs;
import com.pulumi.mongodbatlas.inputs.StreamConnectionDbRoleToExecuteArgs;
import com.pulumi.mongodbatlas.inputs.StreamConnectionAuthenticationArgs;
import com.pulumi.mongodbatlas.inputs.StreamConnectionSecurityArgs;
import com.pulumi.mongodbatlas.StreamProcessor;
import com.pulumi.mongodbatlas.StreamProcessorArgs;
import com.pulumi.mongodbatlas.inputs.StreamProcessorOptionsArgs;
import com.pulumi.mongodbatlas.inputs.StreamProcessorOptionsDlqArgs;
import com.pulumi.mongodbatlas.MongodbatlasFunctions;
import com.pulumi.mongodbatlas.inputs.GetStreamProcessorsArgs;
import com.pulumi.mongodbatlas.inputs.GetStreamProcessorArgs;
import static com.pulumi.codegen.internal.Serialization.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new StreamInstance("example", StreamInstanceArgs.builder()
.projectId(projectId)
.instanceName("InstanceName")
.dataProcessRegion(StreamInstanceDataProcessRegionArgs.builder()
.region("VIRGINIA_USA")
.cloudProvider("AWS")
.build())
.build());
var example_sample = new StreamConnection("example-sample", StreamConnectionArgs.builder()
.projectId(projectId)
.instanceName(example.instanceName())
.connectionName("sample_stream_solar")
.type("Sample")
.build());
var example_cluster = new StreamConnection("example-cluster", StreamConnectionArgs.builder()
.projectId(projectId)
.instanceName(example.instanceName())
.connectionName("ClusterConnection")
.type("Cluster")
.clusterName(clusterName)
.dbRoleToExecute(StreamConnectionDbRoleToExecuteArgs.builder()
.role("atlasAdmin")
.type("BUILT_IN")
.build())
.build());
var example_kafka = new StreamConnection("example-kafka", StreamConnectionArgs.builder()
.projectId(projectId)
.instanceName(example.instanceName())
.connectionName("KafkaPlaintextConnection")
.type("Kafka")
.authentication(StreamConnectionAuthenticationArgs.builder()
.mechanism("PLAIN")
.username(kafkaUsername)
.password(kafkaPassword)
.build())
.bootstrapServers("localhost:9092,localhost:9092")
.config(Map.of("auto.offset.reset", "earliest"))
.security(StreamConnectionSecurityArgs.builder()
.protocol("SASL_PLAINTEXT")
.build())
.build());
var stream_processor_sample_example = new StreamProcessor("stream-processor-sample-example", StreamProcessorArgs.builder()
.projectId(projectId)
.instanceName(example.instanceName())
.processorName("sampleProcessorName")
.pipeline(serializeJson(
jsonArray(
jsonObject(
jsonProperty("$source", jsonObject(
jsonProperty("connectionName", mongodbatlasStreamConnection.example-sample().connectionName())
))
),
jsonObject(
jsonProperty("$emit", jsonObject(
jsonProperty("connectionName", mongodbatlasStreamConnection.example-cluster().connectionName()),
jsonProperty("db", "sample"),
jsonProperty("coll", "solar"),
jsonProperty("timeseries", jsonObject(
jsonProperty("timeField", "_ts")
))
))
)
)))
.state("STARTED")
.build());
var stream_processor_cluster_to_kafka_example = new StreamProcessor("stream-processor-cluster-to-kafka-example", StreamProcessorArgs.builder()
.projectId(projectId)
.instanceName(example.instanceName())
.processorName("clusterProcessorName")
.pipeline(serializeJson(
jsonArray(
jsonObject(
jsonProperty("$source", jsonObject(
jsonProperty("connectionName", mongodbatlasStreamConnection.example-cluster().connectionName())
))
),
jsonObject(
jsonProperty("$emit", jsonObject(
jsonProperty("connectionName", mongodbatlasStreamConnection.example-kafka().connectionName()),
jsonProperty("topic", "topic_from_cluster")
))
)
)))
.state("CREATED")
.build());
var stream_processor_kafka_to_cluster_example = new StreamProcessor("stream-processor-kafka-to-cluster-example", StreamProcessorArgs.builder()
.projectId(projectId)
.instanceName(example.instanceName())
.processorName("kafkaProcessorName")
.pipeline(serializeJson(
jsonArray(
jsonObject(
jsonProperty("$source", jsonObject(
jsonProperty("connectionName", mongodbatlasStreamConnection.example-kafka().connectionName()),
jsonProperty("topic", "topic_source")
))
),
jsonObject(
jsonProperty("$emit", jsonObject(
jsonProperty("connectionName", mongodbatlasStreamConnection.example-cluster().connectionName()),
jsonProperty("db", "kafka"),
jsonProperty("coll", "topic_source"),
jsonProperty("timeseries", jsonObject(
jsonProperty("timeField", "ts")
))
))
)
)))
.state("CREATED")
.options(StreamProcessorOptionsArgs.builder()
.dlq(StreamProcessorOptionsDlqArgs.builder()
.coll("exampleColumn")
.connectionName("ClusterConnection")
.db("exampleDb")
.build())
.build())
.build());
final var exampleStreamProcessors = MongodbatlasFunctions.getStreamProcessors(GetStreamProcessorsArgs.builder()
    .projectId(projectId)
    .instanceName(example.instanceName())
    .build());
final var exampleStreamProcessor = MongodbatlasFunctions.getStreamProcessor(GetStreamProcessorArgs.builder()
    .projectId(projectId)
    .instanceName(example.instanceName())
    .processorName(stream_processor_sample_example.processorName())
    .build());
ctx.export("streamProcessorsState", exampleStreamProcessor.applyValue(result -> result.state()));
ctx.export("streamProcessorsResults", exampleStreamProcessors.applyValue(result -> result.results()));
}
}
YAML:

resources:
example:
type: mongodbatlas:StreamInstance
properties:
projectId: ${projectId}
instanceName: InstanceName
dataProcessRegion:
region: VIRGINIA_USA
cloudProvider: AWS
example-sample:
type: mongodbatlas:StreamConnection
properties:
projectId: ${projectId}
instanceName: ${example.instanceName}
connectionName: sample_stream_solar
type: Sample
example-cluster:
type: mongodbatlas:StreamConnection
properties:
projectId: ${projectId}
instanceName: ${example.instanceName}
connectionName: ClusterConnection
type: Cluster
clusterName: ${clusterName}
dbRoleToExecute:
role: atlasAdmin
type: BUILT_IN
example-kafka:
type: mongodbatlas:StreamConnection
properties:
projectId: ${projectId}
instanceName: ${example.instanceName}
connectionName: KafkaPlaintextConnection
type: Kafka
authentication:
mechanism: PLAIN
username: ${kafkaUsername}
password: ${kafkaPassword}
bootstrapServers: localhost:9092,localhost:9092
config:
auto.offset.reset: earliest
security:
protocol: SASL_PLAINTEXT
stream-processor-sample-example:
type: mongodbatlas:StreamProcessor
properties:
projectId: ${projectId}
instanceName: ${example.instanceName}
processorName: sampleProcessorName
pipeline:
fn::toJSON:
- $source:
connectionName: ${mongodbatlasStreamConnection"example-sample"[%!s(MISSING)].connectionName}
- $emit:
connectionName: ${mongodbatlasStreamConnection"example-cluster"[%!s(MISSING)].connectionName}
db: sample
coll: solar
timeseries:
timeField: _ts
state: STARTED
stream-processor-cluster-to-kafka-example:
type: mongodbatlas:StreamProcessor
properties:
projectId: ${projectId}
instanceName: ${example.instanceName}
processorName: clusterProcessorName
pipeline:
fn::toJSON:
- $source:
connectionName: ${mongodbatlasStreamConnection"example-cluster"[%!s(MISSING)].connectionName}
- $emit:
connectionName: ${mongodbatlasStreamConnection"example-kafka"[%!s(MISSING)].connectionName}
topic: topic_from_cluster
state: CREATED
stream-processor-kafka-to-cluster-example:
type: mongodbatlas:StreamProcessor
properties:
projectId: ${projectId}
instanceName: ${example.instanceName}
processorName: kafkaProcessorName
pipeline:
fn::toJSON:
- $source:
connectionName: ${mongodbatlasStreamConnection"example-kafka"[%!s(MISSING)].connectionName}
topic: topic_source
- $emit:
connectionName: ${mongodbatlasStreamConnection"example-cluster"[%!s(MISSING)].connectionName}
db: kafka
coll: topic_source
timeseries:
timeField: ts
state: CREATED
options:
dlq:
coll: exampleColumn
connectionName: ${mongodbatlasStreamConnection"example-cluster"[%!s(MISSING)].connectionName}
db: exampleDb
variables:
example-stream-processors:
fn::invoke:
function: mongodbatlas:getStreamProcessors
arguments:
projectId: ${projectId}
instanceName: ${example.instanceName}
example-stream-processor:
fn::invoke:
function: mongodbatlas:getStreamProcessor
arguments:
projectId: ${projectId}
instanceName: ${example.instanceName}
processorName: ${["stream-processor-sample-example"].processorName}
outputs:
# example making use of data sources
streamProcessorsState: ${["example-stream-processor"].state}
streamProcessorsResults: ${["example-stream-processors"].results}
## Using getStreamProcessor
Two invocation forms are available. The direct form accepts plain arguments and either blocks until the result value is available, or returns a Promise-wrapped result. The output form accepts Input-wrapped arguments and returns an Output-wrapped result.
TypeScript:

function getStreamProcessor(args: GetStreamProcessorArgs, opts?: InvokeOptions): Promise<GetStreamProcessorResult>
function getStreamProcessorOutput(args: GetStreamProcessorOutputArgs, opts?: InvokeOptions): Output<GetStreamProcessorResult>

Python:

def get_stream_processor(instance_name: Optional[str] = None,
                         processor_name: Optional[str] = None,
                         project_id: Optional[str] = None,
                         opts: Optional[InvokeOptions] = None) -> GetStreamProcessorResult
def get_stream_processor_output(instance_name: Optional[pulumi.Input[str]] = None,
                                processor_name: Optional[pulumi.Input[str]] = None,
                                project_id: Optional[pulumi.Input[str]] = None,
                                opts: Optional[InvokeOptions] = None) -> Output[GetStreamProcessorResult]

Go:

func LookupStreamProcessor(ctx *Context, args *LookupStreamProcessorArgs, opts ...InvokeOption) (*LookupStreamProcessorResult, error)
func LookupStreamProcessorOutput(ctx *Context, args *LookupStreamProcessorOutputArgs, opts ...InvokeOption) LookupStreamProcessorResultOutput

> Note: This function is named LookupStreamProcessor in the Go SDK.

C#:

public static class GetStreamProcessor
{
    public static Task<GetStreamProcessorResult> InvokeAsync(GetStreamProcessorArgs args, InvokeOptions? opts = null)
    public static Output<GetStreamProcessorResult> Invoke(GetStreamProcessorInvokeArgs args, InvokeOptions? opts = null)
}

Java:

public static CompletableFuture<GetStreamProcessorResult> getStreamProcessor(GetStreamProcessorArgs args, InvokeOptions options)
public static Output<GetStreamProcessorResult> getStreamProcessor(GetStreamProcessorArgs args, InvokeOptions options)

YAML:
fn::invoke:
function: mongodbatlas:index/getStreamProcessor:getStreamProcessor
arguments:
# arguments dictionary
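For example, in TypeScript the direct form returns a Promise that can be awaited, while the output form accepts Input-wrapped arguments (such as resource outputs) and returns an Output that composes with other resources. A minimal sketch, assuming a project, stream instance, and processor with these hypothetical names already exist:

import * as mongodbatlas from "@pulumi/mongodbatlas";

// Direct form: plain arguments, Promise-wrapped result.
mongodbatlas.getStreamProcessor({
    projectId: "0123456789abcdef01234567", // hypothetical project ID
    instanceName: "InstanceName",
    processorName: "sampleProcessorName",
}).then(processor => console.log(processor.state));

// Output form: Input-wrapped arguments, Output-wrapped result.
const processor = mongodbatlas.getStreamProcessorOutput({
    projectId: "0123456789abcdef01234567",
    instanceName: "InstanceName",
    processorName: "sampleProcessorName",
});
export const processorState = processor.state;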
The following arguments are supported:

- instanceName (string) - Human-readable label that identifies the stream instance.
- processorName (string) - Human-readable label that identifies the stream processor.
- projectId (string) - Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.

Property names follow each SDK's casing conventions (for example, instance_name in Python).
## getStreamProcessor Result
The following output properties are available:
- id (string)
- instanceName (string) - Human-readable label that identifies the stream instance.
- options (GetStreamProcessorOptions)
- pipeline (string)
- processorName (string) - Human-readable label that identifies the stream processor.
- projectId (string) - Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
- state (string)
- stats (string)
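Because pipeline and stats are surfaced as strings, programs that need to inspect them can parse them after the lookup. A short TypeScript sketch, assuming both fields carry serialized JSON (the string types and the pipelines in Example Usage suggest this, but treat it as an assumption):

import * as mongodbatlas from "@pulumi/mongodbatlas";

const processor = mongodbatlas.getStreamProcessorOutput({
    projectId: "0123456789abcdef01234567", // hypothetical project ID
    instanceName: "InstanceName",
    processorName: "sampleProcessorName",
});

// Parse the JSON-encoded fields back into structured values (assumes valid JSON).
export const pipelineStages = processor.pipeline.apply(p => JSON.parse(p));
export const processorStats = processor.stats.apply(s => (s ? JSON.parse(s) : undefined));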
## Supporting Types
### GetStreamProcessorOptions
- dlq (GetStreamProcessorOptionsDlq) - Dead letter queue for the stream processor. Refer to the MongoDB Atlas Docs for more information.
### GetStreamProcessorOptionsDlq
- coll (string) - Name of the collection to use for the DLQ.
- connectionName (string) - Name of the connection to write DLQ messages to. Must be an Atlas connection.
- db (string) - Name of the database to use for the DLQ.
## Package Details
- Repository: MongoDB Atlas pulumi/pulumi-mongodbatlas
- License: Apache-2.0
- Notes: This Pulumi package is based on the mongodbatlas Terraform Provider.
