mongodbatlas.StreamProcessor
MongoDB Atlas v4.0.0 published on Tuesday, Dec 30, 2025 by Pulumi

    mongodbatlas.StreamProcessor provides a Stream Processor resource. The resource lets you create, delete, import, start and stop a stream processor in a stream instance.

    NOTE: When updating an Atlas Stream Processor, the following behavior applies:

    1. If the processor is in a STARTED state, it will automatically be stopped before the update is applied
    2. The update will be performed while the processor is in STOPPED state
    3. If the processor was originally in STARTED state, it will be restarted after the update

    Example Usage


    import * as pulumi from "@pulumi/pulumi";
    import * as mongodbatlas from "@pulumi/mongodbatlas";
    
    const example = new mongodbatlas.StreamInstance("example", {
        projectId: projectId,
        instanceName: "InstanceName",
        dataProcessRegion: {
            region: "VIRGINIA_USA",
            cloudProvider: "AWS",
        },
    });
    const example_sample = new mongodbatlas.StreamConnection("example-sample", {
        projectId: projectId,
        workspaceName: example.instanceName,
        connectionName: "sample_stream_solar",
        type: "Sample",
    });
    const example_cluster = new mongodbatlas.StreamConnection("example-cluster", {
        projectId: projectId,
        workspaceName: example.instanceName,
        connectionName: "ClusterConnection",
        type: "Cluster",
        clusterName: clusterName,
        dbRoleToExecute: {
            role: "atlasAdmin",
            type: "BUILT_IN",
        },
    });
    const example_kafka = new mongodbatlas.StreamConnection("example-kafka", {
        projectId: projectId,
        workspaceName: example.instanceName,
        connectionName: "KafkaPlaintextConnection",
        type: "Kafka",
        authentication: {
            mechanism: "PLAIN",
            username: kafkaUsername,
            password: kafkaPassword,
        },
        bootstrapServers: "localhost:9092,localhost:9092",
        config: {
            "auto.offset.reset": "earliest",
        },
        security: {
            protocol: "SASL_PLAINTEXT",
        },
    });
    const stream_processor_sample_example = new mongodbatlas.StreamProcessor("stream-processor-sample-example", {
        projectId: projectId,
        workspaceName: example.instanceName,
        processorName: "sampleProcessorName",
        pipeline: pulumi.jsonStringify([
            {
                $source: {
                    connectionName: mongodbatlasStreamConnection["example-sample"].connectionName,
                },
            },
            {
                $emit: {
                    connectionName: mongodbatlasStreamConnection["example-cluster"].connectionName,
                    db: "sample",
                    coll: "solar",
                    timeseries: {
                        timeField: "_ts",
                    },
                },
            },
        ]),
        state: "STARTED",
    });
    const stream_processor_cluster_to_kafka_example = new mongodbatlas.StreamProcessor("stream-processor-cluster-to-kafka-example", {
        projectId: projectId,
        workspaceName: example.instanceName,
        processorName: "clusterProcessorName",
        pipeline: pulumi.jsonStringify([
            {
                $source: {
                    connectionName: mongodbatlasStreamConnection["example-cluster"].connectionName,
                },
            },
            {
                $emit: {
                    connectionName: mongodbatlasStreamConnection["example-kafka"].connectionName,
                    topic: "topic_from_cluster",
                },
            },
        ]),
        state: "CREATED",
    });
    const stream_processor_kafka_to_cluster_example = new mongodbatlas.StreamProcessor("stream-processor-kafka-to-cluster-example", {
        projectId: projectId,
        workspaceName: example.instanceName,
        processorName: "kafkaProcessorName",
        pipeline: pulumi.jsonStringify([
            {
                $source: {
                    connectionName: mongodbatlasStreamConnection["example-kafka"].connectionName,
                    topic: "topic_source",
                },
            },
            {
                $emit: {
                    connectionName: mongodbatlasStreamConnection["example-cluster"].connectionName,
                    db: "kafka",
                    coll: "topic_source",
                    timeseries: {
                        timeField: "ts",
                    },
                },
            },
        ]),
        state: "CREATED",
        options: {
            dlq: {
                coll: "exampleColumn",
                connectionName: mongodbatlasStreamConnection["example-cluster"].connectionName,
                db: "exampleDb",
            },
        },
    });
    const example_stream_processors = example.instanceName.apply(instanceName => mongodbatlas.getStreamProcessorsOutput({
        projectId: projectId,
        workspaceName: instanceName,
    }));
    const example_stream_processor = pulumi.all([example.instanceName, stream_processor_sample_example.processorName]).apply(([instanceName, processorName]) => mongodbatlas.getStreamProcessorOutput({
        projectId: projectId,
        workspaceName: instanceName,
        processorName: processorName,
    }));
    export const streamProcessorsState = example_stream_processor.apply(example_stream_processor => example_stream_processor.state);
    export const streamProcessorsResults = example_stream_processors.apply(example_stream_processors => example_stream_processors.results);
    
    import pulumi
    import json
    import pulumi_mongodbatlas as mongodbatlas
    
    example = mongodbatlas.StreamInstance("example",
        project_id=project_id,
        instance_name="InstanceName",
        data_process_region={
            "region": "VIRGINIA_USA",
            "cloud_provider": "AWS",
        })
    example_sample = mongodbatlas.StreamConnection("example-sample",
        project_id=project_id,
        workspace_name=example.instance_name,
        connection_name="sample_stream_solar",
        type="Sample")
    example_cluster = mongodbatlas.StreamConnection("example-cluster",
        project_id=project_id,
        workspace_name=example.instance_name,
        connection_name="ClusterConnection",
        type="Cluster",
        cluster_name=cluster_name,
        db_role_to_execute={
            "role": "atlasAdmin",
            "type": "BUILT_IN",
        })
    example_kafka = mongodbatlas.StreamConnection("example-kafka",
        project_id=project_id,
        workspace_name=example.instance_name,
        connection_name="KafkaPlaintextConnection",
        type="Kafka",
        authentication={
            "mechanism": "PLAIN",
            "username": kafka_username,
            "password": kafka_password,
        },
        bootstrap_servers="localhost:9092,localhost:9092",
        config={
            "auto.offset.reset": "earliest",
        },
        security={
            "protocol": "SASL_PLAINTEXT",
        })
    stream_processor_sample_example = mongodbatlas.StreamProcessor("stream-processor-sample-example",
        project_id=project_id,
        workspace_name=example.instance_name,
        processor_name="sampleProcessorName",
        pipeline=pulumi.Output.json_dumps([
            {
                "$source": {
                    "connectionName": mongodbatlas_stream_connection["example-sample"]["connectionName"],
                },
            },
            {
                "$emit": {
                    "connectionName": mongodbatlas_stream_connection["example-cluster"]["connectionName"],
                    "db": "sample",
                    "coll": "solar",
                    "timeseries": {
                        "timeField": "_ts",
                    },
                },
            },
        ]),
        state="STARTED")
    stream_processor_cluster_to_kafka_example = mongodbatlas.StreamProcessor("stream-processor-cluster-to-kafka-example",
        project_id=project_id,
        workspace_name=example.instance_name,
        processor_name="clusterProcessorName",
        pipeline=pulumi.Output.json_dumps([
            {
                "$source": {
                    "connectionName": mongodbatlas_stream_connection["example-cluster"]["connectionName"],
                },
            },
            {
                "$emit": {
                    "connectionName": mongodbatlas_stream_connection["example-kafka"]["connectionName"],
                    "topic": "topic_from_cluster",
                },
            },
        ]),
        state="CREATED")
    stream_processor_kafka_to_cluster_example = mongodbatlas.StreamProcessor("stream-processor-kafka-to-cluster-example",
        project_id=project_id,
        workspace_name=example.instance_name,
        processor_name="kafkaProcessorName",
        pipeline=pulumi.Output.json_dumps([
            {
                "$source": {
                    "connectionName": mongodbatlas_stream_connection["example-kafka"]["connectionName"],
                    "topic": "topic_source",
                },
            },
            {
                "$emit": {
                    "connectionName": mongodbatlas_stream_connection["example-cluster"]["connectionName"],
                    "db": "kafka",
                    "coll": "topic_source",
                    "timeseries": {
                        "timeField": "ts",
                    },
                },
            },
        ]),
        state="CREATED",
        options={
            "dlq": {
                "coll": "exampleColumn",
                "connection_name": mongodbatlas_stream_connection["example-cluster"]["connectionName"],
                "db": "exampleDb",
            },
        })
    example_stream_processors = example.instance_name.apply(lambda instance_name: mongodbatlas.get_stream_processors_output(project_id=project_id,
        workspace_name=instance_name))
    example_stream_processor = pulumi.Output.all(
        instance_name=example.instance_name,
        processor_name=stream_processor_sample_example.processor_name
    ).apply(lambda resolved_outputs: mongodbatlas.get_stream_processor_output(project_id=project_id,
        workspace_name=resolved_outputs['instance_name'],
        processor_name=resolved_outputs['processor_name']))
    
    pulumi.export("streamProcessorsState", example_stream_processor.state)
    pulumi.export("streamProcessorsResults", example_stream_processors.results)
    
    package main
    
    import (
    	"github.com/pulumi/pulumi-mongodbatlas/sdk/v4/go/mongodbatlas"
    	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
    )
    
    func main() {
    	pulumi.Run(func(ctx *pulumi.Context) error {
    		example, err := mongodbatlas.NewStreamInstance(ctx, "example", &mongodbatlas.StreamInstanceArgs{
    			ProjectId:    pulumi.Any(projectId),
    			InstanceName: pulumi.String("InstanceName"),
    			DataProcessRegion: &mongodbatlas.StreamInstanceDataProcessRegionArgs{
    				Region:        pulumi.String("VIRGINIA_USA"),
    				CloudProvider: pulumi.String("AWS"),
    			},
    		})
    		if err != nil {
    			return err
    		}
    		_, err = mongodbatlas.NewStreamConnection(ctx, "example-sample", &mongodbatlas.StreamConnectionArgs{
    			ProjectId:      pulumi.Any(projectId),
    			WorkspaceName:  example.InstanceName,
    			ConnectionName: pulumi.String("sample_stream_solar"),
    			Type:           pulumi.String("Sample"),
    		})
    		if err != nil {
    			return err
    		}
    		_, err = mongodbatlas.NewStreamConnection(ctx, "example-cluster", &mongodbatlas.StreamConnectionArgs{
    			ProjectId:      pulumi.Any(projectId),
    			WorkspaceName:  example.InstanceName,
    			ConnectionName: pulumi.String("ClusterConnection"),
    			Type:           pulumi.String("Cluster"),
    			ClusterName:    pulumi.Any(clusterName),
    			DbRoleToExecute: &mongodbatlas.StreamConnectionDbRoleToExecuteArgs{
    				Role: pulumi.String("atlasAdmin"),
    				Type: pulumi.String("BUILT_IN"),
    			},
    		})
    		if err != nil {
    			return err
    		}
    		_, err = mongodbatlas.NewStreamConnection(ctx, "example-kafka", &mongodbatlas.StreamConnectionArgs{
    			ProjectId:      pulumi.Any(projectId),
    			WorkspaceName:  example.InstanceName,
    			ConnectionName: pulumi.String("KafkaPlaintextConnection"),
    			Type:           pulumi.String("Kafka"),
    			Authentication: &mongodbatlas.StreamConnectionAuthenticationArgs{
    				Mechanism: pulumi.String("PLAIN"),
    				Username:  pulumi.Any(kafkaUsername),
    				Password:  pulumi.Any(kafkaPassword),
    			},
    			BootstrapServers: pulumi.String("localhost:9092,localhost:9092"),
    			Config: pulumi.StringMap{
    				"auto.offset.reset": pulumi.String("earliest"),
    			},
    			Security: &mongodbatlas.StreamConnectionSecurityArgs{
    				Protocol: pulumi.String("SASL_PLAINTEXT"),
    			},
    		})
    		if err != nil {
    			return err
    		}
    		pipeline0 := pulumi.JSONMarshal([]interface{}{
    			map[string]interface{}{
    				"$source": map[string]interface{}{
    					"connectionName": exampleSample.ConnectionName,
    				},
    			},
    			map[string]interface{}{
    				"$emit": map[string]interface{}{
    					"connectionName": exampleCluster.ConnectionName,
    					"db":             "sample",
    					"coll":           "solar",
    					"timeseries": map[string]interface{}{
    						"timeField": "_ts",
    					},
    				},
    			},
    		})
    		stream_processor_sample_example, err := mongodbatlas.NewStreamProcessor(ctx, "stream-processor-sample-example", &mongodbatlas.StreamProcessorArgs{
    			ProjectId:     pulumi.Any(projectId),
    			WorkspaceName: example.InstanceName,
    			ProcessorName: pulumi.String("sampleProcessorName"),
    			Pipeline:      pipeline0,
    			State:         pulumi.String("STARTED"),
    		})
    		if err != nil {
    			return err
    		}
    		pipeline1 := pulumi.JSONMarshal([]interface{}{
    			map[string]interface{}{
    				"$source": map[string]interface{}{
    					"connectionName": exampleCluster.ConnectionName,
    				},
    			},
    			map[string]interface{}{
    				"$emit": map[string]interface{}{
    					"connectionName": exampleKafka.ConnectionName,
    					"topic":          "topic_from_cluster",
    				},
    			},
    		})
    		_, err = mongodbatlas.NewStreamProcessor(ctx, "stream-processor-cluster-to-kafka-example", &mongodbatlas.StreamProcessorArgs{
    			ProjectId:     pulumi.Any(projectId),
    			WorkspaceName: example.InstanceName,
    			ProcessorName: pulumi.String("clusterProcessorName"),
    			Pipeline:      pipeline1,
    			State:         pulumi.String("CREATED"),
    		})
    		if err != nil {
    			return err
    		}
    		pipeline2 := pulumi.JSONMarshal([]interface{}{
    			map[string]interface{}{
    				"$source": map[string]interface{}{
    					"connectionName": exampleKafka.ConnectionName,
    					"topic":          "topic_source",
    				},
    			},
    			map[string]interface{}{
    				"$emit": map[string]interface{}{
    					"connectionName": exampleCluster.ConnectionName,
    					"db":             "kafka",
    					"coll":           "topic_source",
    					"timeseries": map[string]interface{}{
    						"timeField": "ts",
    					},
    				},
    			},
    		})
    		_, err = mongodbatlas.NewStreamProcessor(ctx, "stream-processor-kafka-to-cluster-example", &mongodbatlas.StreamProcessorArgs{
    			ProjectId:     pulumi.Any(projectId),
    			WorkspaceName: example.InstanceName,
    			ProcessorName: pulumi.String("kafkaProcessorName"),
    			Pipeline:      pipeline2,
    			State:         pulumi.String("CREATED"),
    			Options: &mongodbatlas.StreamProcessorOptionsArgs{
    				Dlq: &mongodbatlas.StreamProcessorOptionsDlqArgs{
    					Coll:           pulumi.String("exampleColumn"),
    					ConnectionName: exampleCluster.ConnectionName,
    					Db:             pulumi.String("exampleDb"),
    				},
    			},
    		})
    		if err != nil {
    			return err
    		}
    		example_stream_processors := example.InstanceName.ApplyT(func(instanceName string) (mongodbatlas.GetStreamProcessorsResult, error) {
    			return mongodbatlas.GetStreamProcessorsResult(interface{}(mongodbatlas.LookupStreamProcessors(ctx, &mongodbatlas.LookupStreamProcessorsArgs{
    				ProjectId:     projectId,
    				WorkspaceName: pulumi.StringRef(instanceName),
    			}, nil))), nil
    		}).(mongodbatlas.GetStreamProcessorsResultOutput)
    		example_stream_processor := pulumi.All(example.InstanceName, stream_processor_sample_example.ProcessorName).ApplyT(func(_args []interface{}) (mongodbatlas.GetStreamProcessorResult, error) {
    			instanceName := _args[0].(string)
    			processorName := _args[1].(string)
    			return mongodbatlas.GetStreamProcessorResult(interface{}(mongodbatlas.LookupStreamProcessor(ctx, &mongodbatlas.LookupStreamProcessorArgs{
    				ProjectId:     projectId,
    				WorkspaceName: pulumi.StringRef(instanceName),
    				ProcessorName: processorName,
    			}, nil))), nil
    		}).(mongodbatlas.GetStreamProcessorResultOutput)
    		ctx.Export("streamProcessorsState", example_stream_processor.ApplyT(func(example_stream_processor mongodbatlas.GetStreamProcessorResult) (*string, error) {
    			return &example_stream_processor.State, nil
    		}).(pulumi.StringPtrOutput))
    		ctx.Export("streamProcessorsResults", example_stream_processors.ApplyT(func(example_stream_processors mongodbatlas.GetStreamProcessorsResult) ([]mongodbatlas.GetStreamProcessorsResult, error) {
    			return []mongodbatlas.GetStreamProcessorsResult(example_stream_processors.Results), nil
    		}).([]mongodbatlas.GetStreamProcessorsResultOutput))
    		return nil
    	})
    }
    
    using System.Collections.Generic;
    using System.Linq;
    using System.Text.Json;
    using Pulumi;
    using Mongodbatlas = Pulumi.Mongodbatlas;
    
    return await Deployment.RunAsync(() => 
    {
        var example = new Mongodbatlas.StreamInstance("example", new()
        {
            ProjectId = projectId,
            InstanceName = "InstanceName",
            DataProcessRegion = new Mongodbatlas.Inputs.StreamInstanceDataProcessRegionArgs
            {
                Region = "VIRGINIA_USA",
                CloudProvider = "AWS",
            },
        });
    
        var example_sample = new Mongodbatlas.StreamConnection("example-sample", new()
        {
            ProjectId = projectId,
            WorkspaceName = example.InstanceName,
            ConnectionName = "sample_stream_solar",
            Type = "Sample",
        });
    
        var example_cluster = new Mongodbatlas.StreamConnection("example-cluster", new()
        {
            ProjectId = projectId,
            WorkspaceName = example.InstanceName,
            ConnectionName = "ClusterConnection",
            Type = "Cluster",
            ClusterName = clusterName,
            DbRoleToExecute = new Mongodbatlas.Inputs.StreamConnectionDbRoleToExecuteArgs
            {
                Role = "atlasAdmin",
                Type = "BUILT_IN",
            },
        });
    
        var example_kafka = new Mongodbatlas.StreamConnection("example-kafka", new()
        {
            ProjectId = projectId,
            WorkspaceName = example.InstanceName,
            ConnectionName = "KafkaPlaintextConnection",
            Type = "Kafka",
            Authentication = new Mongodbatlas.Inputs.StreamConnectionAuthenticationArgs
            {
                Mechanism = "PLAIN",
                Username = kafkaUsername,
                Password = kafkaPassword,
            },
            BootstrapServers = "localhost:9092,localhost:9092",
            Config = 
            {
                { "auto.offset.reset", "earliest" },
            },
            Security = new Mongodbatlas.Inputs.StreamConnectionSecurityArgs
            {
                Protocol = "SASL_PLAINTEXT",
            },
        });
    
        var stream_processor_sample_example = new Mongodbatlas.StreamProcessor("stream-processor-sample-example", new()
        {
            ProjectId = projectId,
            WorkspaceName = example.InstanceName,
            ProcessorName = "sampleProcessorName",
        Pipeline = Output.JsonSerialize(Output.Create(new[]
            {
                new Dictionary<string, object?>
                {
                    ["$source"] = new Dictionary<string, object?>
                    {
                        ["connectionName"] = mongodbatlasStreamConnection.Example_sample.ConnectionName,
                    },
                },
                new Dictionary<string, object?>
                {
                    ["$emit"] = new Dictionary<string, object?>
                    {
                        ["connectionName"] = mongodbatlasStreamConnection.Example_cluster.ConnectionName,
                        ["db"] = "sample",
                        ["coll"] = "solar",
                        ["timeseries"] = new Dictionary<string, object?>
                        {
                            ["timeField"] = "_ts",
                        },
                    },
                },
        })),
            State = "STARTED",
        });
    
        var stream_processor_cluster_to_kafka_example = new Mongodbatlas.StreamProcessor("stream-processor-cluster-to-kafka-example", new()
        {
            ProjectId = projectId,
            WorkspaceName = example.InstanceName,
            ProcessorName = "clusterProcessorName",
        Pipeline = Output.JsonSerialize(Output.Create(new[]
            {
                new Dictionary<string, object?>
                {
                    ["$source"] = new Dictionary<string, object?>
                    {
                        ["connectionName"] = mongodbatlasStreamConnection.Example_cluster.ConnectionName,
                    },
                },
                new Dictionary<string, object?>
                {
                    ["$emit"] = new Dictionary<string, object?>
                    {
                        ["connectionName"] = mongodbatlasStreamConnection.Example_kafka.ConnectionName,
                        ["topic"] = "topic_from_cluster",
                    },
                },
        })),
            State = "CREATED",
        });
    
        var stream_processor_kafka_to_cluster_example = new Mongodbatlas.StreamProcessor("stream-processor-kafka-to-cluster-example", new()
        {
            ProjectId = projectId,
            WorkspaceName = example.InstanceName,
            ProcessorName = "kafkaProcessorName",
        Pipeline = Output.JsonSerialize(Output.Create(new[]
            {
                new Dictionary<string, object?>
                {
                    ["$source"] = new Dictionary<string, object?>
                    {
                        ["connectionName"] = mongodbatlasStreamConnection.Example_kafka.ConnectionName,
                        ["topic"] = "topic_source",
                    },
                },
                new Dictionary<string, object?>
                {
                    ["$emit"] = new Dictionary<string, object?>
                    {
                        ["connectionName"] = mongodbatlasStreamConnection.Example_cluster.ConnectionName,
                        ["db"] = "kafka",
                        ["coll"] = "topic_source",
                        ["timeseries"] = new Dictionary<string, object?>
                        {
                            ["timeField"] = "ts",
                        },
                    },
                },
        })),
            State = "CREATED",
            Options = new Mongodbatlas.Inputs.StreamProcessorOptionsArgs
            {
                Dlq = new Mongodbatlas.Inputs.StreamProcessorOptionsDlqArgs
                {
                    Coll = "exampleColumn",
                ConnectionName = example_cluster.ConnectionName,
                    Db = "exampleDb",
                },
            },
        });
    
        var example_stream_processors = Mongodbatlas.GetStreamProcessors.Invoke(new()
        {
            ProjectId = projectId,
            WorkspaceName = example.InstanceName,
        });
    
        var example_stream_processor = Mongodbatlas.GetStreamProcessor.Invoke(new()
        {
            ProjectId = projectId,
            WorkspaceName = example.InstanceName,
            ProcessorName = stream_processor_sample_example.ProcessorName,
        });
    
        return new Dictionary<string, object?>
        {
            ["streamProcessorsState"] = example_stream_processor.Apply(example_stream_processor => example_stream_processor.Apply(getStreamProcessorResult => getStreamProcessorResult.State)),
            ["streamProcessorsResults"] = example_stream_processors.Apply(example_stream_processors => example_stream_processors.Apply(getStreamProcessorsResult => getStreamProcessorsResult.Results)),
        };
    });
    
    package generated_program;
    
    import com.pulumi.Context;
    import com.pulumi.Pulumi;
    import com.pulumi.core.Output;
    import com.pulumi.mongodbatlas.StreamInstance;
    import com.pulumi.mongodbatlas.StreamInstanceArgs;
    import com.pulumi.mongodbatlas.inputs.StreamInstanceDataProcessRegionArgs;
    import com.pulumi.mongodbatlas.StreamConnection;
    import com.pulumi.mongodbatlas.StreamConnectionArgs;
    import com.pulumi.mongodbatlas.inputs.StreamConnectionDbRoleToExecuteArgs;
    import com.pulumi.mongodbatlas.inputs.StreamConnectionAuthenticationArgs;
    import com.pulumi.mongodbatlas.inputs.StreamConnectionSecurityArgs;
    import com.pulumi.mongodbatlas.StreamProcessor;
    import com.pulumi.mongodbatlas.StreamProcessorArgs;
    import com.pulumi.mongodbatlas.inputs.StreamProcessorOptionsArgs;
    import com.pulumi.mongodbatlas.inputs.StreamProcessorOptionsDlqArgs;
    import com.pulumi.mongodbatlas.MongodbatlasFunctions;
    import com.pulumi.mongodbatlas.inputs.GetStreamProcessorsArgs;
    import com.pulumi.mongodbatlas.inputs.GetStreamProcessorArgs;
    import static com.pulumi.codegen.internal.Serialization.*;
    import java.util.List;
    import java.util.ArrayList;
    import java.util.Map;
    import java.io.File;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    
    public class App {
        public static void main(String[] args) {
            Pulumi.run(App::stack);
        }
    
        public static void stack(Context ctx) {
            var example = new StreamInstance("example", StreamInstanceArgs.builder()
                .projectId(projectId)
                .instanceName("InstanceName")
                .dataProcessRegion(StreamInstanceDataProcessRegionArgs.builder()
                    .region("VIRGINIA_USA")
                    .cloudProvider("AWS")
                    .build())
                .build());
    
            var example_sample = new StreamConnection("example-sample", StreamConnectionArgs.builder()
                .projectId(projectId)
                .workspaceName(example.instanceName())
                .connectionName("sample_stream_solar")
                .type("Sample")
                .build());
    
            var example_cluster = new StreamConnection("example-cluster", StreamConnectionArgs.builder()
                .projectId(projectId)
                .workspaceName(example.instanceName())
                .connectionName("ClusterConnection")
                .type("Cluster")
                .clusterName(clusterName)
                .dbRoleToExecute(StreamConnectionDbRoleToExecuteArgs.builder()
                    .role("atlasAdmin")
                    .type("BUILT_IN")
                    .build())
                .build());
    
            var example_kafka = new StreamConnection("example-kafka", StreamConnectionArgs.builder()
                .projectId(projectId)
                .workspaceName(example.instanceName())
                .connectionName("KafkaPlaintextConnection")
                .type("Kafka")
                .authentication(StreamConnectionAuthenticationArgs.builder()
                    .mechanism("PLAIN")
                    .username(kafkaUsername)
                    .password(kafkaPassword)
                    .build())
                .bootstrapServers("localhost:9092,localhost:9092")
                .config(Map.of("auto.offset.reset", "earliest"))
                .security(StreamConnectionSecurityArgs.builder()
                    .protocol("SASL_PLAINTEXT")
                    .build())
                .build());
    
            var stream_processor_sample_example = new StreamProcessor("stream-processor-sample-example", StreamProcessorArgs.builder()
                .projectId(projectId)
                .workspaceName(example.instanceName())
                .processorName("sampleProcessorName")
                .pipeline(serializeJson(
                    jsonArray(
                        jsonObject(
                            jsonProperty("$source", jsonObject(
                                jsonProperty("connectionName", mongodbatlasStreamConnection.example-sample().connectionName())
                            ))
                        ), 
                        jsonObject(
                            jsonProperty("$emit", jsonObject(
                                jsonProperty("connectionName", mongodbatlasStreamConnection.example-cluster().connectionName()),
                                jsonProperty("db", "sample"),
                                jsonProperty("coll", "solar"),
                                jsonProperty("timeseries", jsonObject(
                                    jsonProperty("timeField", "_ts")
                                ))
                            ))
                        )
                    )))
                .state("STARTED")
                .build());
    
            var stream_processor_cluster_to_kafka_example = new StreamProcessor("stream-processor-cluster-to-kafka-example", StreamProcessorArgs.builder()
                .projectId(projectId)
                .workspaceName(example.instanceName())
                .processorName("clusterProcessorName")
                .pipeline(serializeJson(
                    jsonArray(
                        jsonObject(
                            jsonProperty("$source", jsonObject(
                                jsonProperty("connectionName", mongodbatlasStreamConnection.example-cluster().connectionName())
                            ))
                        ), 
                        jsonObject(
                            jsonProperty("$emit", jsonObject(
                                jsonProperty("connectionName", mongodbatlasStreamConnection.example-kafka().connectionName()),
                                jsonProperty("topic", "topic_from_cluster")
                            ))
                        )
                    )))
                .state("CREATED")
                .build());
    
            var stream_processor_kafka_to_cluster_example = new StreamProcessor("stream-processor-kafka-to-cluster-example", StreamProcessorArgs.builder()
                .projectId(projectId)
                .workspaceName(example.instanceName())
                .processorName("kafkaProcessorName")
                .pipeline(serializeJson(
                    jsonArray(
                        jsonObject(
                            jsonProperty("$source", jsonObject(
                                jsonProperty("connectionName", mongodbatlasStreamConnection.example-kafka().connectionName()),
                                jsonProperty("topic", "topic_source")
                            ))
                        ), 
                        jsonObject(
                            jsonProperty("$emit", jsonObject(
                                jsonProperty("connectionName", mongodbatlasStreamConnection.example-cluster().connectionName()),
                                jsonProperty("db", "kafka"),
                                jsonProperty("coll", "topic_source"),
                                jsonProperty("timeseries", jsonObject(
                                    jsonProperty("timeField", "ts")
                                ))
                            ))
                        )
                    )))
                .state("CREATED")
                .options(StreamProcessorOptionsArgs.builder()
                    .dlq(StreamProcessorOptionsDlqArgs.builder()
                        .coll("exampleColumn")
                        .connectionName(example_cluster.connectionName())
                        .db("exampleDb")
                        .build())
                    .build())
                .build());
    
        final var example_stream_processors = example.instanceName().applyValue(_instanceName -> MongodbatlasFunctions.getStreamProcessors(GetStreamProcessorsArgs.builder()
                .projectId(projectId)
                .workspaceName(_instanceName)
                .build()));
    
        final var example_stream_processor = Output.tuple(example.instanceName(), stream_processor_sample_example.processorName()).applyValue(values -> {
                var instanceName = values.t1;
                var processorName = values.t2;
                return MongodbatlasFunctions.getStreamProcessor(GetStreamProcessorArgs.builder()
                    .projectId(projectId)
                    .workspaceName(instanceName)
                    .processorName(processorName)
                    .build());
            });
    
            ctx.export("streamProcessorsState", example_stream_processor.applyValue(_example_stream_processor -> _example_stream_processor.state()));
            ctx.export("streamProcessorsResults", example_stream_processors.applyValue(_example_stream_processors -> _example_stream_processors.results()));
        }
    }
    
    resources:
      example:
        type: mongodbatlas:StreamInstance
        properties:
          projectId: ${projectId}
          instanceName: InstanceName
          dataProcessRegion:
            region: VIRGINIA_USA
            cloudProvider: AWS
      example-sample:
        type: mongodbatlas:StreamConnection
        properties:
          projectId: ${projectId}
          workspaceName: ${example.instanceName}
          connectionName: sample_stream_solar
          type: Sample
      example-cluster:
        type: mongodbatlas:StreamConnection
        properties:
          projectId: ${projectId}
          workspaceName: ${example.instanceName}
          connectionName: ClusterConnection
          type: Cluster
          clusterName: ${clusterName}
          dbRoleToExecute:
            role: atlasAdmin
            type: BUILT_IN
      example-kafka:
        type: mongodbatlas:StreamConnection
        properties:
          projectId: ${projectId}
          workspaceName: ${example.instanceName}
          connectionName: KafkaPlaintextConnection
          type: Kafka
          authentication:
            mechanism: PLAIN
            username: ${kafkaUsername}
            password: ${kafkaPassword}
          bootstrapServers: localhost:9092,localhost:9092
          config:
            auto.offset.reset: earliest
          security:
            protocol: SASL_PLAINTEXT
      stream-processor-sample-example:
        type: mongodbatlas:StreamProcessor
        properties:
          projectId: ${projectId}
          workspaceName: ${example.instanceName}
          processorName: sampleProcessorName
          pipeline:
            fn::toJSON:
              - $source:
                  connectionName: ${mongodbatlasStreamConnection"example-sample"[%!s(MISSING)].connectionName}
              - $emit:
                  connectionName: ${mongodbatlasStreamConnection"example-cluster"[%!s(MISSING)].connectionName}
                  db: sample
                  coll: solar
                  timeseries:
                    timeField: _ts
          state: STARTED
      stream-processor-cluster-to-kafka-example:
        type: mongodbatlas:StreamProcessor
        properties:
          projectId: ${projectId}
          workspaceName: ${example.instanceName}
          processorName: clusterProcessorName
          pipeline:
            fn::toJSON:
              - $source:
                  connectionName: ${mongodbatlasStreamConnection"example-cluster"[%!s(MISSING)].connectionName}
              - $emit:
                  connectionName: ${mongodbatlasStreamConnection"example-kafka"[%!s(MISSING)].connectionName}
                  topic: topic_from_cluster
          state: CREATED
      stream-processor-kafka-to-cluster-example:
        type: mongodbatlas:StreamProcessor
        properties:
          projectId: ${projectId}
          workspaceName: ${example.instanceName}
          processorName: kafkaProcessorName
          pipeline:
            fn::toJSON:
              - $source:
                  connectionName: ${mongodbatlasStreamConnection"example-kafka"[%!s(MISSING)].connectionName}
                  topic: topic_source
              - $emit:
                  connectionName: ${mongodbatlasStreamConnection"example-cluster"[%!s(MISSING)].connectionName}
                  db: kafka
                  coll: topic_source
                  timeseries:
                    timeField: ts
          state: CREATED
          options:
            dlq:
              coll: exampleColumn
              connectionName: ${mongodbatlasStreamConnection"example-cluster"[%!s(MISSING)].connectionName}
              db: exampleDb
    variables:
      example-stream-processors:
        fn::invoke:
          function: mongodbatlas:getStreamProcessors
          arguments:
            projectId: ${projectId}
            workspaceName: ${example.instanceName}
      example-stream-processor:
        fn::invoke:
          function: mongodbatlas:getStreamProcessor
          arguments:
            projectId: ${projectId}
            workspaceName: ${example.instanceName}
            processorName: ${["stream-processor-sample-example"].processorName}
    outputs:
      # example making use of data sources
      streamProcessorsState: ${["example-stream-processor"].state}
      streamProcessorsResults: ${["example-stream-processors"].results}
    

    Further Examples

    • Atlas Stream Processor

    Create StreamProcessor Resource

    Resources are created with functions called constructors. To learn more about declaring and configuring resources, see Resources.

    Constructor syntax

    new StreamProcessor(name: string, args: StreamProcessorArgs, opts?: CustomResourceOptions);
    @overload
    def StreamProcessor(resource_name: str,
                        args: StreamProcessorArgs,
                        opts: Optional[ResourceOptions] = None)
    
    @overload
    def StreamProcessor(resource_name: str,
                        opts: Optional[ResourceOptions] = None,
                        pipeline: Optional[str] = None,
                        processor_name: Optional[str] = None,
                        project_id: Optional[str] = None,
                        delete_on_create_timeout: Optional[bool] = None,
                        instance_name: Optional[str] = None,
                        options: Optional[StreamProcessorOptionsArgs] = None,
                        state: Optional[str] = None,
                        timeouts: Optional[StreamProcessorTimeoutsArgs] = None,
                        workspace_name: Optional[str] = None)
    func NewStreamProcessor(ctx *Context, name string, args StreamProcessorArgs, opts ...ResourceOption) (*StreamProcessor, error)
    public StreamProcessor(string name, StreamProcessorArgs args, CustomResourceOptions? opts = null)
    public StreamProcessor(String name, StreamProcessorArgs args)
    public StreamProcessor(String name, StreamProcessorArgs args, CustomResourceOptions options)
    
    type: mongodbatlas:StreamProcessor
    properties: # The arguments to resource properties.
    options: # Bag of options to control resource's behavior.
    
    

    Parameters

    name string
    The unique name of the resource.
    args StreamProcessorArgs
    The arguments to resource properties.
    opts CustomResourceOptions
    Bag of options to control resource's behavior.
    resource_name str
    The unique name of the resource.
    args StreamProcessorArgs
    The arguments to resource properties.
    opts ResourceOptions
    Bag of options to control resource's behavior.
    ctx Context
    Context object for the current deployment.
    name string
    The unique name of the resource.
    args StreamProcessorArgs
    The arguments to resource properties.
    opts ResourceOption
    Bag of options to control resource's behavior.
    name string
    The unique name of the resource.
    args StreamProcessorArgs
    The arguments to resource properties.
    opts CustomResourceOptions
    Bag of options to control resource's behavior.
    name String
    The unique name of the resource.
    args StreamProcessorArgs
    The arguments to resource properties.
    options CustomResourceOptions
    Bag of options to control resource's behavior.

    Constructor example

    The following reference example uses placeholder values for all input properties.

    var streamProcessorResource = new Mongodbatlas.StreamProcessor("streamProcessorResource", new()
    {
        Pipeline = "string",
        ProcessorName = "string",
        ProjectId = "string",
        DeleteOnCreateTimeout = false,
        Options = new Mongodbatlas.Inputs.StreamProcessorOptionsArgs
        {
            Dlq = new Mongodbatlas.Inputs.StreamProcessorOptionsDlqArgs
            {
                Coll = "string",
                ConnectionName = "string",
                Db = "string",
            },
        },
        State = "string",
        Timeouts = new Mongodbatlas.Inputs.StreamProcessorTimeoutsArgs
        {
            Create = "string",
        },
        WorkspaceName = "string",
    });
    
    example, err := mongodbatlas.NewStreamProcessor(ctx, "streamProcessorResource", &mongodbatlas.StreamProcessorArgs{
    	Pipeline:              pulumi.String("string"),
    	ProcessorName:         pulumi.String("string"),
    	ProjectId:             pulumi.String("string"),
    	DeleteOnCreateTimeout: pulumi.Bool(false),
    	Options: &mongodbatlas.StreamProcessorOptionsArgs{
    		Dlq: &mongodbatlas.StreamProcessorOptionsDlqArgs{
    			Coll:           pulumi.String("string"),
    			ConnectionName: pulumi.String("string"),
    			Db:             pulumi.String("string"),
    		},
    	},
    	State: pulumi.String("string"),
    	Timeouts: &mongodbatlas.StreamProcessorTimeoutsArgs{
    		Create: pulumi.String("string"),
    	},
    	WorkspaceName: pulumi.String("string"),
    })
    
    var streamProcessorResource = new StreamProcessor("streamProcessorResource", StreamProcessorArgs.builder()
        .pipeline("string")
        .processorName("string")
        .projectId("string")
        .deleteOnCreateTimeout(false)
        .options(StreamProcessorOptionsArgs.builder()
            .dlq(StreamProcessorOptionsDlqArgs.builder()
                .coll("string")
                .connectionName("string")
                .db("string")
                .build())
            .build())
        .state("string")
        .timeouts(StreamProcessorTimeoutsArgs.builder()
            .create("string")
            .build())
        .workspaceName("string")
        .build());
    
    stream_processor_resource = mongodbatlas.StreamProcessor("streamProcessorResource",
        pipeline="string",
        processor_name="string",
        project_id="string",
        delete_on_create_timeout=False,
        options={
            "dlq": {
                "coll": "string",
                "connection_name": "string",
                "db": "string",
            },
        },
        state="string",
        timeouts={
            "create": "string",
        },
        workspace_name="string")
    
    const streamProcessorResource = new mongodbatlas.StreamProcessor("streamProcessorResource", {
        pipeline: "string",
        processorName: "string",
        projectId: "string",
        deleteOnCreateTimeout: false,
        options: {
            dlq: {
                coll: "string",
                connectionName: "string",
                db: "string",
            },
        },
        state: "string",
        timeouts: {
            create: "string",
        },
        workspaceName: "string",
    });
    
    type: mongodbatlas:StreamProcessor
    properties:
        deleteOnCreateTimeout: false
        options:
            dlq:
                coll: string
                connectionName: string
                db: string
        pipeline: string
        processorName: string
        projectId: string
        state: string
        timeouts:
            create: string
        workspaceName: string
    

    StreamProcessor Resource Properties

    To learn more about resource properties and how to use them, see Inputs and Outputs in the Architecture and Concepts docs.

    Inputs

    In Python, inputs that are objects can be passed either as argument classes or as dictionary literals.
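
    For example, the options input can be written either way; the two forms below are equivalent (placeholder values only):

    import pulumi_mongodbatlas as mongodbatlas

    # Argument-class form:
    options_as_args = mongodbatlas.StreamProcessorOptionsArgs(
        dlq=mongodbatlas.StreamProcessorOptionsDlqArgs(
            coll="dlqColl",
            connection_name="ClusterConnection",
            db="dlqDb",
        ),
    )

    # Dictionary-literal form:
    options_as_dict = {
        "dlq": {
            "coll": "dlqColl",
            "connection_name": "ClusterConnection",
            "db": "dlqDb",
        },
    }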

    The StreamProcessor resource accepts the following input properties:

    Pipeline string
    Stream aggregation pipeline you want to apply to your streaming data. The MongoDB Atlas Docs contain more information. Encoding this value as a JSON string (for example with JSON.stringify, json.dumps, or an equivalent helper in your language) is recommended when setting this attribute. For more details see the Aggregation Pipelines Documentation
    ProcessorName string
    Label that identifies the stream processor.
    ProjectId string
    Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
    DeleteOnCreateTimeout bool
    Indicates whether to delete the resource being created if a timeout is reached when waiting for completion. When set to true and timeout occurs, it triggers the deletion and returns immediately without waiting for deletion to complete. When set to false, the timeout will not trigger resource deletion. If you suspect a transient error when the value is true, wait before retrying to allow resource deletion to finish. Default is true.
    InstanceName string
    Label that identifies the stream processing workspace.

    Deprecated: This parameter is deprecated. Please transition to workspace_name.

    Options StreamProcessorOptions
    Optional configuration for the stream processor.
    State string

    The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. Used to start or stop the Stream Processor. Valid values are CREATED, STARTED or STOPPED. When a Stream Processor is created without specifying the state, it will default to CREATED state. When a Stream Processor is updated without specifying the state, it will default to the Previous state.

    NOTE When a Stream Processor is updated without specifying the state, it is stopped and then restored to previous state upon update completion.

    Timeouts StreamProcessorTimeouts
    WorkspaceName string
    Label that identifies the stream processing workspace.
    Pipeline string
    Stream aggregation pipeline you want to apply to your streaming data. The MongoDB Atlas Docs contain more information. Encoding this value as a JSON string (for example with JSON.stringify, json.dumps, or an equivalent helper in your language) is recommended when setting this attribute. For more details see the Aggregation Pipelines Documentation
    ProcessorName string
    Label that identifies the stream processor.
    ProjectId string
    Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
    DeleteOnCreateTimeout bool
    Indicates whether to delete the resource being created if a timeout is reached when waiting for completion. When set to true and timeout occurs, it triggers the deletion and returns immediately without waiting for deletion to complete. When set to false, the timeout will not trigger resource deletion. If you suspect a transient error when the value is true, wait before retrying to allow resource deletion to finish. Default is true.
    InstanceName string
    Label that identifies the stream processing workspace.

    Deprecated: This parameter is deprecated. Please transition to workspace_name.

    Options StreamProcessorOptionsArgs
    Optional configuration for the stream processor.
    State string

    The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. Used to start or stop the Stream Processor. Valid values are CREATED, STARTED or STOPPED. When a Stream Processor is created without specifying the state, it will default to CREATED state. When a Stream Processor is updated without specifying the state, it will default to the Previous state.

    NOTE When a Stream Processor is updated without specifying the state, it is stopped and then restored to previous state upon update completion.

    Timeouts StreamProcessorTimeoutsArgs
    WorkspaceName string
    Label that identifies the stream processing workspace.
    pipeline String
    Stream aggregation pipeline you want to apply to your streaming data. The MongoDB Atlas Docs contain more information. Encoding this value as a JSON string (for example with JSON.stringify, json.dumps, or an equivalent helper in your language) is recommended when setting this attribute. For more details see the Aggregation Pipelines Documentation
    processorName String
    Label that identifies the stream processor.
    projectId String
    Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
    deleteOnCreateTimeout Boolean
    Indicates whether to delete the resource being created if a timeout is reached when waiting for completion. When set to true and timeout occurs, it triggers the deletion and returns immediately without waiting for deletion to complete. When set to false, the timeout will not trigger resource deletion. If you suspect a transient error when the value is true, wait before retrying to allow resource deletion to finish. Default is true.
    instanceName String
    Label that identifies the stream processing workspace.

    Deprecated: This parameter is deprecated. Please transition to workspace_name.

    options StreamProcessorOptions
    Optional configuration for the stream processor.
    state String

    The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. Used to start or stop the Stream Processor. Valid values are CREATED, STARTED or STOPPED. When a Stream Processor is created without specifying the state, it will default to CREATED state. When a Stream Processor is updated without specifying the state, it will default to the Previous state.

    NOTE When a Stream Processor is updated without specifying the state, it is stopped and then restored to previous state upon update completion.

    timeouts StreamProcessorTimeouts
    workspaceName String
    Label that identifies the stream processing workspace.
    pipeline string
    Stream aggregation pipeline you want to apply to your streaming data. The MongoDB Atlas Docs contain more information. Encoding this value as a JSON string (for example with JSON.stringify, json.dumps, or an equivalent helper in your language) is recommended when setting this attribute. For more details see the Aggregation Pipelines Documentation
    processorName string
    Label that identifies the stream processor.
    projectId string
    Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
    deleteOnCreateTimeout boolean
    Indicates whether to delete the resource being created if a timeout is reached when waiting for completion. When set to true and timeout occurs, it triggers the deletion and returns immediately without waiting for deletion to complete. When set to false, the timeout will not trigger resource deletion. If you suspect a transient error when the value is true, wait before retrying to allow resource deletion to finish. Default is true.
    instanceName string
    Label that identifies the stream processing workspace.

    Deprecated: This parameter is deprecated. Please transition to workspace_name.

    options StreamProcessorOptions
    Optional configuration for the stream processor.
    state string

    The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. Used to start or stop the Stream Processor. Valid values are CREATED, STARTED or STOPPED. When a Stream Processor is created without specifying the state, it will default to CREATED state. When a Stream Processor is updated without specifying the state, it will default to the Previous state.

    NOTE When a Stream Processor is updated without specifying the state, it is stopped and then restored to previous state upon update completion.

    timeouts StreamProcessorTimeouts
    workspaceName string
    Label that identifies the stream processing workspace.
    pipeline str
    Stream aggregation pipeline you want to apply to your streaming data. The MongoDB Atlas Docs contain more information. Encoding this value as a JSON string (for example with JSON.stringify, json.dumps, or an equivalent helper in your language) is recommended when setting this attribute. For more details see the Aggregation Pipelines Documentation
    processor_name str
    Label that identifies the stream processor.
    project_id str
    Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
    delete_on_create_timeout bool
    Indicates whether to delete the resource being created if a timeout is reached when waiting for completion. When set to true and timeout occurs, it triggers the deletion and returns immediately without waiting for deletion to complete. When set to false, the timeout will not trigger resource deletion. If you suspect a transient error when the value is true, wait before retrying to allow resource deletion to finish. Default is true.
    instance_name str
    Label that identifies the stream processing workspace.

    Deprecated: This parameter is deprecated. Please transition to workspace_name.

    options StreamProcessorOptionsArgs
    Optional configuration for the stream processor.
    state str

    The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED' and 'FAILED'. Used to start or stop the Stream Processor. Valid values are CREATED, STARTED or STOPPED. When a Stream Processor is created without specifying the state, it will default to CREATED state. When a Stream Processor is updated without specifying the state, it will default to the Previous state.

    NOTE When a Stream Processor is updated without specifying the state, it is stopped and then restored to previous state upon update completion.

    timeouts StreamProcessorTimeoutsArgs
    workspace_name str
    Label that identifies the stream processing workspace.
    pipeline String
    Stream aggregation pipeline you want to apply to your streaming data. The MongoDB Atlas Docs contain more information. Encoding this value as a JSON string (for example with JSON.stringify, json.dumps, or an equivalent helper in your language) is recommended when setting this attribute. For more details see the Aggregation Pipelines Documentation
    processorName String
    Label that identifies the stream processor.
    projectId String
    Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
    deleteOnCreateTimeout Boolean
    Indicates whether to delete the resource being created if a timeout is reached when waiting for completion. When set to true and timeout occurs, it triggers the deletion and returns immediately without waiting for deletion to complete. When set to false, the timeout will not trigger resource deletion. If you suspect a transient error when the value is true, wait before retrying to allow resource deletion to finish. Default is true.
    instanceName String
    Label that identifies the stream processing workspace.

    Deprecated: This parameter is deprecated. Please transition to workspace_name.

    options Property Map
    Optional configuration for the stream processor.
    state String

    The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED', and 'FAILED'. Used to start or stop the Stream Processor. Valid values are CREATED, STARTED, or STOPPED. When a Stream Processor is created without specifying the state, it defaults to CREATED. When a Stream Processor is updated without specifying the state, it keeps its previous state.

    NOTE: When a Stream Processor is updated without specifying the state, it is stopped and then restored to its previous state when the update completes.

    timeouts Property Map
    workspaceName String
    Label that identifies the stream processing workspace.
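
    For example, the inputs above compose roughly as follows in TypeScript; this is a minimal sketch in which the project ID, workspace name, and connection names are placeholders for values from your own configuration:

    import * as mongodbatlas from "@pulumi/mongodbatlas";

    const processor = new mongodbatlas.StreamProcessor("example-processor", {
        projectId: projectId, // assumed to come from your stack configuration
        workspaceName: "InstanceName",
        processorName: "exampleProcessorName",
        // The pipeline is passed as a JSON string; JSON.stringify keeps it readable in code.
        pipeline: JSON.stringify([
            { $source: { connectionName: "sample_stream_solar" } },
            { $emit: { connectionName: "ClusterConnection", db: "sample", coll: "solar" } },
        ]),
        // Omitting `state` leaves a new processor in CREATED; STARTED runs it immediately.
        state: "STARTED",
        // Delete a partially created processor if the create timeout is reached.
        deleteOnCreateTimeout: true,
    });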

    Outputs

    All input properties are implicitly available as output properties. Additionally, the StreamProcessor resource produces the following output properties:

    Id string
    The provider-assigned unique ID for this managed resource.
    Stats string
    The stats associated with the stream processor. Refer to the MongoDB Atlas Docs for more information.
    Id string
    The provider-assigned unique ID for this managed resource.
    Stats string
    The stats associated with the stream processor. Refer to the MongoDB Atlas Docs for more information.
    id String
    The provider-assigned unique ID for this managed resource.
    stats String
    The stats associated with the stream processor. Refer to the MongoDB Atlas Docs for more information.
    id string
    The provider-assigned unique ID for this managed resource.
    stats string
    The stats associated with the stream processor. Refer to the MongoDB Atlas Docs for more information.
    id str
    The provider-assigned unique ID for this managed resource.
    stats str
    The stats associated with the stream processor. Refer to the MongoDB Atlas Docs for more information.
    id String
    The provider-assigned unique ID for this managed resource.
    stats String
    The stats associated with the stream processor. Refer to the MongoDB Atlas Docs for more information.
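
    Both outputs can be exported from the program like any other Pulumi output; continuing the sketch above:

    // Export the provider-assigned ID and the processor's stats document.
    export const processorId = processor.id;
    export const processorStats = processor.stats;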

    Look up Existing StreamProcessor Resource

    Get an existing StreamProcessor resource’s state with the given name, ID, and optional extra properties used to qualify the lookup.

    public static get(name: string, id: Input<ID>, state?: StreamProcessorState, opts?: CustomResourceOptions): StreamProcessor
    @staticmethod
    def get(resource_name: str,
            id: str,
            opts: Optional[ResourceOptions] = None,
            delete_on_create_timeout: Optional[bool] = None,
            instance_name: Optional[str] = None,
            options: Optional[StreamProcessorOptionsArgs] = None,
            pipeline: Optional[str] = None,
            processor_name: Optional[str] = None,
            project_id: Optional[str] = None,
            state: Optional[str] = None,
            stats: Optional[str] = None,
            timeouts: Optional[StreamProcessorTimeoutsArgs] = None,
            workspace_name: Optional[str] = None) -> StreamProcessor
    func GetStreamProcessor(ctx *Context, name string, id IDInput, state *StreamProcessorState, opts ...ResourceOption) (*StreamProcessor, error)
    public static StreamProcessor Get(string name, Input<string> id, StreamProcessorState? state, CustomResourceOptions? opts = null)
    public static StreamProcessor get(String name, Output<String> id, StreamProcessorState state, CustomResourceOptions options)
    resources:
      _:
        type: mongodbatlas:StreamProcessor
        get:
          id: ${id}
    name
    The unique name of the resulting resource.
    id
    The unique provider ID of the resource to lookup.
    state
    Any extra arguments used during the lookup.
    opts
    A bag of options that control this resource's behavior.
    resource_name
    The unique name of the resulting resource.
    id
    The unique provider ID of the resource to lookup.
    name
    The unique name of the resulting resource.
    id
    The unique provider ID of the resource to lookup.
    state
    Any extra arguments used during the lookup.
    opts
    A bag of options that control this resource's behavior.
    name
    The unique name of the resulting resource.
    id
    The unique provider ID of the resource to lookup.
    state
    Any extra arguments used during the lookup.
    opts
    A bag of options that control this resource's behavior.
    name
    The unique name of the resulting resource.
    id
    The unique provider ID of the resource to lookup.
    state
    Any extra arguments used during the lookup.
    opts
    A bag of options that control this resource's behavior.
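
    As a sketch, the TypeScript overload can be used like this; the ID is assumed to follow the same INSTANCE_NAME-PROJECT_ID-PROCESSOR_NAME format used for import below, and the values shown are placeholders:

    import * as mongodbatlas from "@pulumi/mongodbatlas";

    // Look up an existing stream processor by its provider ID.
    const existing = mongodbatlas.StreamProcessor.get(
        "existing-processor",
        "yourInstanceName-6117ac2fe2a3d04ed27a987v-yourProcessorName");
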
    The following state arguments are supported:
    DeleteOnCreateTimeout bool
    Indicates whether to delete the resource being created if a timeout is reached while waiting for completion. When set to true and a timeout occurs, the deletion is triggered and the operation returns immediately without waiting for the deletion to complete. When set to false, a timeout will not trigger resource deletion. If you suspect a transient error when the value is true, wait before retrying to allow resource deletion to finish. Default is true.
    InstanceName string
    Label that identifies the stream processing workspace.

    Deprecated: This parameter is deprecated. Please transition to workspace_name.

    Options StreamProcessorOptions
    Optional configuration for the stream processor.
    Pipeline string
    Stream aggregation pipeline you want to apply to your streaming data. MongoDB Atlas Docs contain more information. Using jsonencode is recommended when setting this attribute. For more details see the Aggregation Pipelines Documentation
    ProcessorName string
    Label that identifies the stream processor.
    ProjectId string
    Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
    State string

    The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED', and 'FAILED'. Used to start or stop the Stream Processor. Valid values are CREATED, STARTED, or STOPPED. When a Stream Processor is created without specifying the state, it defaults to CREATED. When a Stream Processor is updated without specifying the state, it keeps its previous state.

    NOTE: When a Stream Processor is updated without specifying the state, it is stopped and then restored to its previous state when the update completes.

    Stats string
    The stats associated with the stream processor. Refer to the MongoDB Atlas Docs for more information.
    Timeouts StreamProcessorTimeouts
    WorkspaceName string
    Label that identifies the stream processing workspace.
    DeleteOnCreateTimeout bool
    Indicates whether to delete the resource being created if a timeout is reached while waiting for completion. When set to true and a timeout occurs, the deletion is triggered and the operation returns immediately without waiting for the deletion to complete. When set to false, a timeout will not trigger resource deletion. If you suspect a transient error when the value is true, wait before retrying to allow resource deletion to finish. Default is true.
    InstanceName string
    Label that identifies the stream processing workspace.

    Deprecated: This parameter is deprecated. Please transition to workspace_name.

    Options StreamProcessorOptionsArgs
    Optional configuration for the stream processor.
    Pipeline string
    Stream aggregation pipeline you want to apply to your streaming data. MongoDB Atlas Docs contain more information. Using jsonencode is recommended when setting this attribute. For more details see the Aggregation Pipelines Documentation
    ProcessorName string
    Label that identifies the stream processor.
    ProjectId string
    Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
    State string

    The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED', and 'FAILED'. Used to start or stop the Stream Processor. Valid values are CREATED, STARTED, or STOPPED. When a Stream Processor is created without specifying the state, it defaults to CREATED. When a Stream Processor is updated without specifying the state, it keeps its previous state.

    NOTE: When a Stream Processor is updated without specifying the state, it is stopped and then restored to its previous state when the update completes.

    Stats string
    The stats associated with the stream processor. Refer to the MongoDB Atlas Docs for more information.
    Timeouts StreamProcessorTimeoutsArgs
    WorkspaceName string
    Label that identifies the stream processing workspace.
    deleteOnCreateTimeout Boolean
    Indicates whether to delete the resource being created if a timeout is reached while waiting for completion. When set to true and a timeout occurs, the deletion is triggered and the operation returns immediately without waiting for the deletion to complete. When set to false, a timeout will not trigger resource deletion. If you suspect a transient error when the value is true, wait before retrying to allow resource deletion to finish. Default is true.
    instanceName String
    Label that identifies the stream processing workspace.

    Deprecated: This parameter is deprecated. Please transition to workspace_name.

    options StreamProcessorOptions
    Optional configuration for the stream processor.
    pipeline String
    Stream aggregation pipeline you want to apply to your streaming data. MongoDB Atlas Docs contain more information. Using jsonencode is recommended when setting this attribute. For more details see the Aggregation Pipelines Documentation
    processorName String
    Label that identifies the stream processor.
    projectId String
    Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
    state String

    The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED', and 'FAILED'. Used to start or stop the Stream Processor. Valid values are CREATED, STARTED, or STOPPED. When a Stream Processor is created without specifying the state, it defaults to CREATED. When a Stream Processor is updated without specifying the state, it keeps its previous state.

    NOTE: When a Stream Processor is updated without specifying the state, it is stopped and then restored to its previous state when the update completes.

    stats String
    The stats associated with the stream processor. Refer to the MongoDB Atlas Docs for more information.
    timeouts StreamProcessorTimeouts
    workspaceName String
    Label that identifies the stream processing workspace.
    deleteOnCreateTimeout boolean
    Indicates whether to delete the resource being created if a timeout is reached while waiting for completion. When set to true and a timeout occurs, the deletion is triggered and the operation returns immediately without waiting for the deletion to complete. When set to false, a timeout will not trigger resource deletion. If you suspect a transient error when the value is true, wait before retrying to allow resource deletion to finish. Default is true.
    instanceName string
    Label that identifies the stream processing workspace.

    Deprecated: This parameter is deprecated. Please transition to workspace_name.

    options StreamProcessorOptions
    Optional configuration for the stream processor.
    pipeline string
    Stream aggregation pipeline you want to apply to your streaming data. MongoDB Atlas Docs contain more information. Using jsonencode is recommended when setting this attribute. For more details see the Aggregation Pipelines Documentation
    processorName string
    Label that identifies the stream processor.
    projectId string
    Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
    state string

    The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED', and 'FAILED'. Used to start or stop the Stream Processor. Valid values are CREATED, STARTED, or STOPPED. When a Stream Processor is created without specifying the state, it defaults to CREATED. When a Stream Processor is updated without specifying the state, it keeps its previous state.

    NOTE: When a Stream Processor is updated without specifying the state, it is stopped and then restored to its previous state when the update completes.

    stats string
    The stats associated with the stream processor. Refer to the MongoDB Atlas Docs for more information.
    timeouts StreamProcessorTimeouts
    workspaceName string
    Label that identifies the stream processing workspace.
    delete_on_create_timeout bool
    Indicates whether to delete the resource being created if a timeout is reached while waiting for completion. When set to true and a timeout occurs, the deletion is triggered and the operation returns immediately without waiting for the deletion to complete. When set to false, a timeout will not trigger resource deletion. If you suspect a transient error when the value is true, wait before retrying to allow resource deletion to finish. Default is true.
    instance_name str
    Label that identifies the stream processing workspace.

    Deprecated: This parameter is deprecated. Please transition to workspace_name.

    options StreamProcessorOptionsArgs
    Optional configuration for the stream processor.
    pipeline str
    Stream aggregation pipeline you want to apply to your streaming data. MongoDB Atlas Docs contain more information. Using jsonencode is recommended when setting this attribute. For more details see the Aggregation Pipelines Documentation
    processor_name str
    Label that identifies the stream processor.
    project_id str
    Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
    state str

    The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED', and 'FAILED'. Used to start or stop the Stream Processor. Valid values are CREATED, STARTED, or STOPPED. When a Stream Processor is created without specifying the state, it defaults to CREATED. When a Stream Processor is updated without specifying the state, it keeps its previous state.

    NOTE: When a Stream Processor is updated without specifying the state, it is stopped and then restored to its previous state when the update completes.

    stats str
    The stats associated with the stream processor. Refer to the MongoDB Atlas Docs for more information.
    timeouts StreamProcessorTimeoutsArgs
    workspace_name str
    Label that identifies the stream processing workspace.
    deleteOnCreateTimeout Boolean
    Indicates whether to delete the resource being created if a timeout is reached while waiting for completion. When set to true and a timeout occurs, the deletion is triggered and the operation returns immediately without waiting for the deletion to complete. When set to false, a timeout will not trigger resource deletion. If you suspect a transient error when the value is true, wait before retrying to allow resource deletion to finish. Default is true.
    instanceName String
    Label that identifies the stream processing workspace.

    Deprecated: This parameter is deprecated. Please transition to workspace_name.

    options Property Map
    Optional configuration for the stream processor.
    pipeline String
    Stream aggregation pipeline you want to apply to your streaming data. MongoDB Atlas Docs contain more information. Using jsonencode is recommended when setting this attribute. For more details see the Aggregation Pipelines Documentation
    processorName String
    Label that identifies the stream processor.
    projectId String
    Unique 24-hexadecimal digit string that identifies your project. Use the /groups endpoint to retrieve all projects to which the authenticated user has access.
    state String

    The state of the stream processor. Commonly occurring states are 'CREATED', 'STARTED', 'STOPPED', and 'FAILED'. Used to start or stop the Stream Processor. Valid values are CREATED, STARTED, or STOPPED. When a Stream Processor is created without specifying the state, it defaults to CREATED. When a Stream Processor is updated without specifying the state, it keeps its previous state.

    NOTE: When a Stream Processor is updated without specifying the state, it is stopped and then restored to its previous state when the update completes.

    stats String
    The stats associated with the stream processor. Refer to the MongoDB Atlas Docs for more information.
    timeouts Property Map
    workspaceName String
    Label that identifies the stream processing workspace.

    Supporting Types

    StreamProcessorOptions, StreamProcessorOptionsArgs

    Dlq StreamProcessorOptionsDlq
    Dead letter queue for the stream processor. Refer to the MongoDB Atlas Docs for more information.
    Dlq StreamProcessorOptionsDlq
    Dead letter queue for the stream processor. Refer to the MongoDB Atlas Docs for more information.
    dlq StreamProcessorOptionsDlq
    Dead letter queue for the stream processor. Refer to the MongoDB Atlas Docs for more information.
    dlq StreamProcessorOptionsDlq
    Dead letter queue for the stream processor. Refer to the MongoDB Atlas Docs for more information.
    dlq StreamProcessorOptionsDlq
    Dead letter queue for the stream processor. Refer to the MongoDB Atlas Docs for more information.
    dlq Property Map
    Dead letter queue for the stream processor. Refer to the MongoDB Atlas Docs for more information.

    StreamProcessorOptionsDlq, StreamProcessorOptionsDlqArgs

    Coll string
    Name of the collection to use for the DLQ.
    ConnectionName string
    Name of the connection to write DLQ messages to. Must be an Atlas connection.
    Db string
    Name of the database to use for the DLQ.
    Coll string
    Name of the collection to use for the DLQ.
    ConnectionName string
    Name of the connection to write DLQ messages to. Must be an Atlas connection.
    Db string
    Name of the database to use for the DLQ.
    coll String
    Name of the collection to use for the DLQ.
    connectionName String
    Name of the connection to write DLQ messages to. Must be an Atlas connection.
    db String
    Name of the database to use for the DLQ.
    coll string
    Name of the collection to use for the DLQ.
    connectionName string
    Name of the connection to write DLQ messages to. Must be an Atlas connection.
    db string
    Name of the database to use for the DLQ.
    coll str
    Name of the collection to use for the DLQ.
    connection_name str
    Name of the connection to write DLQ messages to. Must be an Atlas connection.
    db str
    Name of the database to use for the DLQ.
    coll String
    Name of the collection to use for the DLQ.
    connectionName String
    Name of the connection to write DLQ messages to. Must be an Atlas connection.
    db String
    Name of the database to use for the DLQ.
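
    Put together, a sketch of a processor with a dead letter queue might look like the following; the DLQ connection must be an Atlas cluster connection, and all names here are placeholders:

    import * as mongodbatlas from "@pulumi/mongodbatlas";

    const processorWithDlq = new mongodbatlas.StreamProcessor("processor-with-dlq", {
        projectId: projectId, // assumed to come from your stack configuration
        workspaceName: "InstanceName",
        processorName: "dlqProcessorName",
        pipeline: JSON.stringify([
            { $source: { connectionName: "sample_stream_solar" } },
            { $emit: { connectionName: "ClusterConnection", db: "sample", coll: "solar" } },
        ]),
        options: {
            dlq: {
                // Documents that fail processing are written to this Atlas collection.
                connectionName: "ClusterConnection",
                db: "dlqDatabase",
                coll: "dlqCollection",
            },
        },
    });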

    StreamProcessorTimeouts, StreamProcessorTimeoutsArgs

    Create string
    A string that can be parsed as a duration consisting of numbers and unit suffixes, such as "30s" or "2h45m". Valid time units are "s" (seconds), "m" (minutes), "h" (hours).
    Create string
    A string that can be parsed as a duration consisting of numbers and unit suffixes, such as "30s" or "2h45m". Valid time units are "s" (seconds), "m" (minutes), "h" (hours).
    create String
    A string that can be parsed as a duration consisting of numbers and unit suffixes, such as "30s" or "2h45m". Valid time units are "s" (seconds), "m" (minutes), "h" (hours).
    create string
    A string that can be parsed as a duration consisting of numbers and unit suffixes, such as "30s" or "2h45m". Valid time units are "s" (seconds), "m" (minutes), "h" (hours).
    create str
    A string that can be parsed as a duration consisting of numbers and unit suffixes, such as "30s" or "2h45m". Valid time units are "s" (seconds), "m" (minutes), "h" (hours).
    create String
    A string that can be parsed as a duration consisting of numbers and unit suffixes, such as "30s" or "2h45m". Valid time units are "s" (seconds), "m" (minutes), "h" (hours).
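
    For instance, the create timeout pairs naturally with deleteOnCreateTimeout; this sketch uses an arbitrary 30-minute value and placeholder names:

    import * as mongodbatlas from "@pulumi/mongodbatlas";

    const processorWithTimeout = new mongodbatlas.StreamProcessor("processor-with-timeout", {
        projectId: projectId, // assumed to come from your stack configuration
        workspaceName: "InstanceName",
        processorName: "timeoutProcessorName",
        pipeline: JSON.stringify([
            { $source: { connectionName: "sample_stream_solar" } },
            { $emit: { connectionName: "ClusterConnection", db: "sample", coll: "solar" } },
        ]),
        state: "STARTED",
        // Wait up to 30 minutes for the processor to reach the requested state...
        timeouts: { create: "30m" },
        // ...and delete the partially created resource if that window is exceeded.
        deleteOnCreateTimeout: true,
    });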

    Import

    The Stream Processor resource can be imported using the Project ID, Stream Instance name, and Stream Processor name, in the format INSTANCE_NAME-PROJECT_ID-PROCESSOR_NAME, e.g.

    $ pulumi import mongodbatlas:index/streamProcessor:StreamProcessor test yourInstanceName-6117ac2fe2a3d04ed27a987v-yourProcessorName
    

    For more information see: MongoDB Atlas API - Stream Processor Documentation.

    To learn more about importing existing cloud resources, see Importing resources.

    Package Details

    Repository
    MongoDB Atlas pulumi/pulumi-mongodbatlas
    License
    Apache-2.0
    Notes
    This Pulumi package is based on the mongodbatlas Terraform Provider.