Configure AWS Lambda Event Source Mappings

The aws:lambda/eventSourceMapping:EventSourceMapping resource, part of the Pulumi AWS provider, connects Lambda functions to event sources, defining how records are batched, filtered, and delivered for processing. This guide focuses on four categories of event sources: stream sources (DynamoDB, Kinesis), queue sources (SQS), Kafka sources (MSK and self-managed), and message brokers (ActiveMQ, RabbitMQ).

Event source mappings reference existing Lambda functions and event sources. Some configurations require VPC infrastructure or Secrets Manager credentials. The examples are intentionally small. Combine them with your own Lambda functions, IAM roles, and event source infrastructure.

Process DynamoDB table changes with Lambda

Applications that need to react to database changes connect Lambda functions to DynamoDB Streams, enabling real-time processing of inserts, updates, and deletes.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.lambda.EventSourceMapping("example", {
    eventSourceArn: exampleAwsDynamodbTable.streamArn,
    functionName: exampleAwsLambdaFunction.arn,
    startingPosition: "LATEST",
    tags: {
        Name: "dynamodb-stream-mapping",
    },
});
import pulumi
import pulumi_aws as aws

example = aws.lambda_.EventSourceMapping("example",
    event_source_arn=example_aws_dynamodb_table["streamArn"],
    function_name=example_aws_lambda_function["arn"],
    starting_position="LATEST",
    tags={
        "Name": "dynamodb-stream-mapping",
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
			EventSourceArn:   pulumi.Any(exampleAwsDynamodbTable.StreamArn),
			FunctionName:     pulumi.Any(exampleAwsLambdaFunction.Arn),
			StartingPosition: pulumi.String("LATEST"),
			Tags: pulumi.StringMap{
				"Name": pulumi.String("dynamodb-stream-mapping"),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.Lambda.EventSourceMapping("example", new()
    {
        EventSourceArn = exampleAwsDynamodbTable.StreamArn,
        FunctionName = exampleAwsLambdaFunction.Arn,
        StartingPosition = "LATEST",
        Tags = 
        {
            { "Name", "dynamodb-stream-mapping" },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
            .eventSourceArn(exampleAwsDynamodbTable.streamArn())
            .functionName(exampleAwsLambdaFunction.arn())
            .startingPosition("LATEST")
            .tags(Map.of("Name", "dynamodb-stream-mapping"))
            .build());

    }
}
resources:
  example:
    type: aws:lambda:EventSourceMapping
    properties:
      eventSourceArn: ${exampleAwsDynamodbTable.streamArn}
      functionName: ${exampleAwsLambdaFunction.arn}
      startingPosition: LATEST
      tags:
        Name: dynamodb-stream-mapping

The eventSourceArn points to the DynamoDB table’s stream. The startingPosition determines where Lambda begins reading: LATEST processes only new records, while TRIM_HORIZON processes all available records from the stream’s retention window.

Consume Kinesis streams with batching and error handling

Real-time data pipelines use Kinesis to buffer high-throughput events. Lambda processes these streams in batches, with configurable parallelization and dead-letter queues for failed records.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.lambda.EventSourceMapping("example", {
    eventSourceArn: exampleAwsKinesisStream.arn,
    functionName: exampleAwsLambdaFunction.arn,
    startingPosition: "LATEST",
    batchSize: 100,
    maximumBatchingWindowInSeconds: 5,
    parallelizationFactor: 2,
    destinationConfig: {
        onFailure: {
            destinationArn: dlq.arn,
        },
    },
});
import pulumi
import pulumi_aws as aws

example = aws.lambda_.EventSourceMapping("example",
    event_source_arn=example_aws_kinesis_stream["arn"],
    function_name=example_aws_lambda_function["arn"],
    starting_position="LATEST",
    batch_size=100,
    maximum_batching_window_in_seconds=5,
    parallelization_factor=2,
    destination_config={
        "on_failure": {
            "destination_arn": dlq["arn"],
        },
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
			EventSourceArn:                 pulumi.Any(exampleAwsKinesisStream.Arn),
			FunctionName:                   pulumi.Any(exampleAwsLambdaFunction.Arn),
			StartingPosition:               pulumi.String("LATEST"),
			BatchSize:                      pulumi.Int(100),
			MaximumBatchingWindowInSeconds: pulumi.Int(5),
			ParallelizationFactor:          pulumi.Int(2),
			DestinationConfig: &lambda.EventSourceMappingDestinationConfigArgs{
				OnFailure: &lambda.EventSourceMappingDestinationConfigOnFailureArgs{
					DestinationArn: pulumi.Any(dlq.Arn),
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.Lambda.EventSourceMapping("example", new()
    {
        EventSourceArn = exampleAwsKinesisStream.Arn,
        FunctionName = exampleAwsLambdaFunction.Arn,
        StartingPosition = "LATEST",
        BatchSize = 100,
        MaximumBatchingWindowInSeconds = 5,
        ParallelizationFactor = 2,
        DestinationConfig = new Aws.Lambda.Inputs.EventSourceMappingDestinationConfigArgs
        {
            OnFailure = new Aws.Lambda.Inputs.EventSourceMappingDestinationConfigOnFailureArgs
            {
                DestinationArn = dlq.Arn,
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingDestinationConfigArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingDestinationConfigOnFailureArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
            .eventSourceArn(exampleAwsKinesisStream.arn())
            .functionName(exampleAwsLambdaFunction.arn())
            .startingPosition("LATEST")
            .batchSize(100)
            .maximumBatchingWindowInSeconds(5)
            .parallelizationFactor(2)
            .destinationConfig(EventSourceMappingDestinationConfigArgs.builder()
                .onFailure(EventSourceMappingDestinationConfigOnFailureArgs.builder()
                    .destinationArn(dlq.arn())
                    .build())
                .build())
            .build());

    }
}
resources:
  example:
    type: aws:lambda:EventSourceMapping
    properties:
      eventSourceArn: ${exampleAwsKinesisStream.arn}
      functionName: ${exampleAwsLambdaFunction.arn}
      startingPosition: LATEST
      batchSize: 100
      maximumBatchingWindowInSeconds: 5
      parallelizationFactor: 2
      destinationConfig:
        onFailure:
          destinationArn: ${dlq.arn}

Lambda retrieves up to batchSize records and waits up to maximumBatchingWindowInSeconds before invoking your function. The parallelizationFactor controls how many batches Lambda processes concurrently from each shard. When processing fails, destinationConfig routes failed records to a dead-letter queue for later analysis.

Poll SQS queues with concurrency scaling

Message-driven architectures use SQS to decouple producers from consumers. Lambda polls queues and scales the number of concurrent function invocations based on queue depth.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.lambda.EventSourceMapping("example", {
    eventSourceArn: exampleAwsSqsQueue.arn,
    functionName: exampleAwsLambdaFunction.arn,
    batchSize: 10,
    scalingConfig: {
        maximumConcurrency: 100,
    },
});
import pulumi
import pulumi_aws as aws

example = aws.lambda_.EventSourceMapping("example",
    event_source_arn=example_aws_sqs_queue["arn"],
    function_name=example_aws_lambda_function["arn"],
    batch_size=10,
    scaling_config={
        "maximum_concurrency": 100,
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
			EventSourceArn: pulumi.Any(exampleAwsSqsQueue.Arn),
			FunctionName:   pulumi.Any(exampleAwsLambdaFunction.Arn),
			BatchSize:      pulumi.Int(10),
			ScalingConfig: &lambda.EventSourceMappingScalingConfigArgs{
				MaximumConcurrency: pulumi.Int(100),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.Lambda.EventSourceMapping("example", new()
    {
        EventSourceArn = exampleAwsSqsQueue.Arn,
        FunctionName = exampleAwsLambdaFunction.Arn,
        BatchSize = 10,
        ScalingConfig = new Aws.Lambda.Inputs.EventSourceMappingScalingConfigArgs
        {
            MaximumConcurrency = 100,
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingScalingConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
            .eventSourceArn(exampleAwsSqsQueue.arn())
            .functionName(exampleAwsLambdaFunction.arn())
            .batchSize(10)
            .scalingConfig(EventSourceMappingScalingConfigArgs.builder()
                .maximumConcurrency(100)
                .build())
            .build());

    }
}
resources:
  example:
    type: aws:lambda:EventSourceMapping
    properties:
      eventSourceArn: ${exampleAwsSqsQueue.arn}
      functionName: ${exampleAwsLambdaFunction.arn}
      batchSize: 10
      scalingConfig:
        maximumConcurrency: 100

The scalingConfig property sets maximumConcurrency, which controls how many function instances Lambda runs simultaneously. As queue depth increases, Lambda automatically scales up to this limit. The batchSize determines how many messages each invocation receives.

Filter SQS messages before invoking Lambda

High-volume queues often contain messages that don’t require processing. Event filtering lets Lambda skip irrelevant messages without invoking your function, reducing costs and latency.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.lambda.EventSourceMapping("example", {
    eventSourceArn: exampleAwsSqsQueue.arn,
    functionName: exampleAwsLambdaFunction.arn,
    filterCriteria: {
        filters: [{
            pattern: JSON.stringify({
                body: {
                    Temperature: [{
                        numeric: [
                            ">",
                            0,
                            "<=",
                            100,
                        ],
                    }],
                    Location: ["New York"],
                },
            }),
        }],
    },
});
import pulumi
import json
import pulumi_aws as aws

example = aws.lambda_.EventSourceMapping("example",
    event_source_arn=example_aws_sqs_queue["arn"],
    function_name=example_aws_lambda_function["arn"],
    filter_criteria={
        "filters": [{
            "pattern": json.dumps({
                "body": {
                    "Temperature": [{
                        "numeric": [
                            ">",
                            0,
                            "<=",
                            100,
                        ],
                    }],
                    "Location": ["New York"],
                },
            }),
        }],
    })
package main

import (
	"encoding/json"

	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		tmpJSON0, err := json.Marshal(map[string]interface{}{
			"body": map[string]interface{}{
				"Temperature": []map[string]interface{}{
					map[string]interface{}{
						"numeric": []interface{}{
							">",
							0,
							"<=",
							100,
						},
					},
				},
				"Location": []string{
					"New York",
				},
			},
		})
		if err != nil {
			return err
		}
		json0 := string(tmpJSON0)
		_, err = lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
			EventSourceArn: pulumi.Any(exampleAwsSqsQueue.Arn),
			FunctionName:   pulumi.Any(exampleAwsLambdaFunction.Arn),
			FilterCriteria: &lambda.EventSourceMappingFilterCriteriaArgs{
				Filters: lambda.EventSourceMappingFilterCriteriaFilterArray{
					&lambda.EventSourceMappingFilterCriteriaFilterArgs{
						Pattern: pulumi.String(json0),
					},
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.Lambda.EventSourceMapping("example", new()
    {
        EventSourceArn = exampleAwsSqsQueue.Arn,
        FunctionName = exampleAwsLambdaFunction.Arn,
        FilterCriteria = new Aws.Lambda.Inputs.EventSourceMappingFilterCriteriaArgs
        {
            Filters = new[]
            {
                new Aws.Lambda.Inputs.EventSourceMappingFilterCriteriaFilterArgs
                {
                    Pattern = JsonSerializer.Serialize(new Dictionary<string, object?>
                    {
                        ["body"] = new Dictionary<string, object?>
                        {
                            ["Temperature"] = new[]
                            {
                                new Dictionary<string, object?>
                                {
                                    ["numeric"] = new object?[]
                                    {
                                        ">",
                                        0,
                                        "<=",
                                        100,
                                    },
                                },
                            },
                            ["Location"] = new[]
                            {
                                "New York",
                            },
                        },
                    }),
                },
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingFilterCriteriaArgs;
import static com.pulumi.codegen.internal.Serialization.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
            .eventSourceArn(exampleAwsSqsQueue.arn())
            .functionName(exampleAwsLambdaFunction.arn())
            .filterCriteria(EventSourceMappingFilterCriteriaArgs.builder()
                .filters(EventSourceMappingFilterCriteriaFilterArgs.builder()
                    .pattern(serializeJson(
                        jsonObject(
                            jsonProperty("body", jsonObject(
                                jsonProperty("Temperature", jsonArray(jsonObject(
                                    jsonProperty("numeric", jsonArray(
                                        ">", 
                                        0, 
                                        "<=", 
                                        100
                                    ))
                                ))),
                                jsonProperty("Location", jsonArray("New York"))
                            ))
                        )))
                    .build())
                .build())
            .build());

    }
}
resources:
  example:
    type: aws:lambda:EventSourceMapping
    properties:
      eventSourceArn: ${exampleAwsSqsQueue.arn}
      functionName: ${exampleAwsLambdaFunction.arn}
      filterCriteria:
        filters:
          - pattern:
              fn::toJSON:
                body:
                  Temperature:
                    - numeric:
                        - '>'
                        - 0
                        - <=
                        - 100
                  Location:
                    - New York

The filterCriteria property defines a JSON pattern that Lambda evaluates against each message body. Only messages matching the pattern trigger function invocations. In this configuration, Lambda processes only messages with Temperature between 0 and 100 and Location set to “New York”.

Consume Kafka topics from managed MSK clusters

Event-driven systems built on Kafka use MSK for managed cluster operations. Lambda subscribes to topics and processes records in batches, with consumer group coordination handled automatically.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.lambda.EventSourceMapping("example", {
    eventSourceArn: exampleAwsMskCluster.arn,
    functionName: exampleAwsLambdaFunction.arn,
    topics: [
        "orders",
        "inventory",
    ],
    startingPosition: "TRIM_HORIZON",
    batchSize: 100,
    amazonManagedKafkaEventSourceConfig: {
        consumerGroupId: "lambda-consumer-group",
    },
});
import pulumi
import pulumi_aws as aws

example = aws.lambda_.EventSourceMapping("example",
    event_source_arn=example_aws_msk_cluster["arn"],
    function_name=example_aws_lambda_function["arn"],
    topics=[
        "orders",
        "inventory",
    ],
    starting_position="TRIM_HORIZON",
    batch_size=100,
    amazon_managed_kafka_event_source_config={
        "consumer_group_id": "lambda-consumer-group",
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
			EventSourceArn: pulumi.Any(exampleAwsMskCluster.Arn),
			FunctionName:   pulumi.Any(exampleAwsLambdaFunction.Arn),
			Topics: pulumi.StringArray{
				pulumi.String("orders"),
				pulumi.String("inventory"),
			},
			StartingPosition: pulumi.String("TRIM_HORIZON"),
			BatchSize:        pulumi.Int(100),
			AmazonManagedKafkaEventSourceConfig: &lambda.EventSourceMappingAmazonManagedKafkaEventSourceConfigArgs{
				ConsumerGroupId: pulumi.String("lambda-consumer-group"),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.Lambda.EventSourceMapping("example", new()
    {
        EventSourceArn = exampleAwsMskCluster.Arn,
        FunctionName = exampleAwsLambdaFunction.Arn,
        Topics = new[]
        {
            "orders",
            "inventory",
        },
        StartingPosition = "TRIM_HORIZON",
        BatchSize = 100,
        AmazonManagedKafkaEventSourceConfig = new Aws.Lambda.Inputs.EventSourceMappingAmazonManagedKafkaEventSourceConfigArgs
        {
            ConsumerGroupId = "lambda-consumer-group",
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingAmazonManagedKafkaEventSourceConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
            .eventSourceArn(exampleAwsMskCluster.arn())
            .functionName(exampleAwsLambdaFunction.arn())
            .topics(            
                "orders",
                "inventory")
            .startingPosition("TRIM_HORIZON")
            .batchSize(100)
            .amazonManagedKafkaEventSourceConfig(EventSourceMappingAmazonManagedKafkaEventSourceConfigArgs.builder()
                .consumerGroupId("lambda-consumer-group")
                .build())
            .build());

    }
}
resources:
  example:
    type: aws:lambda:EventSourceMapping
    properties:
      eventSourceArn: ${exampleAwsMskCluster.arn}
      functionName: ${exampleAwsLambdaFunction.arn}
      topics:
        - orders
        - inventory
      startingPosition: TRIM_HORIZON
      batchSize: 100
      amazonManagedKafkaEventSourceConfig:
        consumerGroupId: lambda-consumer-group

The topics property lists Kafka topics to consume. The amazonManagedKafkaEventSourceConfig block sets the consumerGroupId, which Lambda uses for offset management and coordination. The startingPosition determines whether Lambda reads from the beginning (TRIM_HORIZON) or only new records (LATEST).

Connect to self-hosted Kafka clusters

Organizations running their own Kafka infrastructure can connect Lambda to self-managed clusters by providing bootstrap servers and VPC access configuration.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.lambda.EventSourceMapping("example", {
    functionName: exampleAwsLambdaFunction.arn,
    topics: ["orders"],
    startingPosition: "TRIM_HORIZON",
    selfManagedEventSource: {
        endpoints: {
            KAFKA_BOOTSTRAP_SERVERS: "kafka1.example.com:9092,kafka2.example.com:9092",
        },
    },
    selfManagedKafkaEventSourceConfig: {
        consumerGroupId: "lambda-consumer-group",
    },
    sourceAccessConfigurations: [
        {
            type: "VPC_SUBNET",
            uri: `subnet:${example1.id}`,
        },
        {
            type: "VPC_SUBNET",
            uri: `subnet:${example2.id}`,
        },
        {
            type: "VPC_SECURITY_GROUP",
            uri: `security_group:${exampleAwsSecurityGroup.id}`,
        },
    ],
    provisionedPollerConfig: {
        maximumPollers: 100,
        minimumPollers: 10,
    },
});
import pulumi
import pulumi_aws as aws

example = aws.lambda_.EventSourceMapping("example",
    function_name=example_aws_lambda_function["arn"],
    topics=["orders"],
    starting_position="TRIM_HORIZON",
    self_managed_event_source={
        "endpoints": {
            "KAFKA_BOOTSTRAP_SERVERS": "kafka1.example.com:9092,kafka2.example.com:9092",
        },
    },
    self_managed_kafka_event_source_config={
        "consumer_group_id": "lambda-consumer-group",
    },
    source_access_configurations=[
        {
            "type": "VPC_SUBNET",
            "uri": f"subnet:{example1['id']}",
        },
        {
            "type": "VPC_SUBNET",
            "uri": f"subnet:{example2['id']}",
        },
        {
            "type": "VPC_SECURITY_GROUP",
            "uri": f"security_group:{example_aws_security_group['id']}",
        },
    ],
    provisioned_poller_config={
        "maximum_pollers": 100,
        "minimum_pollers": 10,
    })
package main

import (
	"fmt"

	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
			FunctionName: pulumi.Any(exampleAwsLambdaFunction.Arn),
			Topics: pulumi.StringArray{
				pulumi.String("orders"),
			},
			StartingPosition: pulumi.String("TRIM_HORIZON"),
			SelfManagedEventSource: &lambda.EventSourceMappingSelfManagedEventSourceArgs{
				Endpoints: pulumi.StringMap{
					"KAFKA_BOOTSTRAP_SERVERS": pulumi.String("kafka1.example.com:9092,kafka2.example.com:9092"),
				},
			},
			SelfManagedKafkaEventSourceConfig: &lambda.EventSourceMappingSelfManagedKafkaEventSourceConfigArgs{
				ConsumerGroupId: pulumi.String("lambda-consumer-group"),
			},
			SourceAccessConfigurations: lambda.EventSourceMappingSourceAccessConfigurationArray{
				&lambda.EventSourceMappingSourceAccessConfigurationArgs{
					Type: pulumi.String("VPC_SUBNET"),
					Uri:  pulumi.Sprintf("subnet:%v", example1.Id),
				},
				&lambda.EventSourceMappingSourceAccessConfigurationArgs{
					Type: pulumi.String("VPC_SUBNET"),
					Uri:  pulumi.Sprintf("subnet:%v", example2.Id),
				},
				&lambda.EventSourceMappingSourceAccessConfigurationArgs{
					Type: pulumi.String("VPC_SECURITY_GROUP"),
					Uri:  pulumi.Sprintf("security_group:%v", exampleAwsSecurityGroup.Id),
				},
			},
			ProvisionedPollerConfig: &lambda.EventSourceMappingProvisionedPollerConfigArgs{
				MaximumPollers: pulumi.Int(100),
				MinimumPollers: pulumi.Int(10),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.Lambda.EventSourceMapping("example", new()
    {
        FunctionName = exampleAwsLambdaFunction.Arn,
        Topics = new[]
        {
            "orders",
        },
        StartingPosition = "TRIM_HORIZON",
        SelfManagedEventSource = new Aws.Lambda.Inputs.EventSourceMappingSelfManagedEventSourceArgs
        {
            Endpoints = 
            {
                { "KAFKA_BOOTSTRAP_SERVERS", "kafka1.example.com:9092,kafka2.example.com:9092" },
            },
        },
        SelfManagedKafkaEventSourceConfig = new Aws.Lambda.Inputs.EventSourceMappingSelfManagedKafkaEventSourceConfigArgs
        {
            ConsumerGroupId = "lambda-consumer-group",
        },
        SourceAccessConfigurations = new[]
        {
            new Aws.Lambda.Inputs.EventSourceMappingSourceAccessConfigurationArgs
            {
                Type = "VPC_SUBNET",
                Uri = $"subnet:{example1.Id}",
            },
            new Aws.Lambda.Inputs.EventSourceMappingSourceAccessConfigurationArgs
            {
                Type = "VPC_SUBNET",
                Uri = $"subnet:{example2.Id}",
            },
            new Aws.Lambda.Inputs.EventSourceMappingSourceAccessConfigurationArgs
            {
                Type = "VPC_SECURITY_GROUP",
                Uri = $"security_group:{exampleAwsSecurityGroup.Id}",
            },
        },
        ProvisionedPollerConfig = new Aws.Lambda.Inputs.EventSourceMappingProvisionedPollerConfigArgs
        {
            MaximumPollers = 100,
            MinimumPollers = 10,
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingSelfManagedEventSourceArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingSelfManagedKafkaEventSourceConfigArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingSourceAccessConfigurationArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingProvisionedPollerConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
            .functionName(exampleAwsLambdaFunction.arn())
            .topics("orders")
            .startingPosition("TRIM_HORIZON")
            .selfManagedEventSource(EventSourceMappingSelfManagedEventSourceArgs.builder()
                .endpoints(Map.of("KAFKA_BOOTSTRAP_SERVERS", "kafka1.example.com:9092,kafka2.example.com:9092"))
                .build())
            .selfManagedKafkaEventSourceConfig(EventSourceMappingSelfManagedKafkaEventSourceConfigArgs.builder()
                .consumerGroupId("lambda-consumer-group")
                .build())
            .sourceAccessConfigurations(            
                EventSourceMappingSourceAccessConfigurationArgs.builder()
                    .type("VPC_SUBNET")
                    .uri(String.format("subnet:%s", example1.id()))
                    .build(),
                EventSourceMappingSourceAccessConfigurationArgs.builder()
                    .type("VPC_SUBNET")
                    .uri(String.format("subnet:%s", example2.id()))
                    .build(),
                EventSourceMappingSourceAccessConfigurationArgs.builder()
                    .type("VPC_SECURITY_GROUP")
                    .uri(String.format("security_group:%s", exampleAwsSecurityGroup.id()))
                    .build())
            .provisionedPollerConfig(EventSourceMappingProvisionedPollerConfigArgs.builder()
                .maximumPollers(100)
                .minimumPollers(10)
                .build())
            .build());

    }
}
resources:
  example:
    type: aws:lambda:EventSourceMapping
    properties:
      functionName: ${exampleAwsLambdaFunction.arn}
      topics:
        - orders
      startingPosition: TRIM_HORIZON
      selfManagedEventSource:
        endpoints:
          KAFKA_BOOTSTRAP_SERVERS: kafka1.example.com:9092,kafka2.example.com:9092
      selfManagedKafkaEventSourceConfig:
        consumerGroupId: lambda-consumer-group
      sourceAccessConfigurations:
        - type: VPC_SUBNET
          uri: subnet:${example1.id}
        - type: VPC_SUBNET
          uri: subnet:${example2.id}
        - type: VPC_SECURITY_GROUP
          uri: security_group:${exampleAwsSecurityGroup.id}
      provisionedPollerConfig:
        maximumPollers: 100
        minimumPollers: 10

The selfManagedEventSource block specifies Kafka bootstrap servers via the KAFKA_BOOTSTRAP_SERVERS endpoint. The sourceAccessConfigurations array provides VPC subnet and security group references, enabling Lambda to reach the cluster. The provisionedPollerConfig controls how many pollers Lambda maintains for consuming records.

Process ActiveMQ messages with authentication

Message brokers like ActiveMQ require authentication credentials. Lambda retrieves these from Secrets Manager when establishing connections to broker queues.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.lambda.EventSourceMapping("example", {
    eventSourceArn: exampleAwsMqBroker.arn,
    functionName: exampleAwsLambdaFunction.arn,
    queues: "orders",
    batchSize: 10,
    sourceAccessConfigurations: [{
        type: "BASIC_AUTH",
        uri: exampleAwsSecretsmanagerSecretVersion.arn,
    }],
});
import pulumi
import pulumi_aws as aws

example = aws.lambda_.EventSourceMapping("example",
    event_source_arn=example_aws_mq_broker["arn"],
    function_name=example_aws_lambda_function["arn"],
    queues="orders",
    batch_size=10,
    source_access_configurations=[{
        "type": "BASIC_AUTH",
        "uri": example_aws_secretsmanager_secret_version["arn"],
    }])
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
			EventSourceArn: pulumi.Any(exampleAwsMqBroker.Arn),
			FunctionName:   pulumi.Any(exampleAwsLambdaFunction.Arn),
			Queues:         pulumi.String("orders"),
			BatchSize:      pulumi.Int(10),
			SourceAccessConfigurations: lambda.EventSourceMappingSourceAccessConfigurationArray{
				&lambda.EventSourceMappingSourceAccessConfigurationArgs{
					Type: pulumi.String("BASIC_AUTH"),
					Uri:  pulumi.Any(exampleAwsSecretsmanagerSecretVersion.Arn),
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.Lambda.EventSourceMapping("example", new()
    {
        EventSourceArn = exampleAwsMqBroker.Arn,
        FunctionName = exampleAwsLambdaFunction.Arn,
        Queues = "orders",
        BatchSize = 10,
        SourceAccessConfigurations = new[]
        {
            new Aws.Lambda.Inputs.EventSourceMappingSourceAccessConfigurationArgs
            {
                Type = "BASIC_AUTH",
                Uri = exampleAwsSecretsmanagerSecretVersion.Arn,
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingSourceAccessConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
            .eventSourceArn(exampleAwsMqBroker.arn())
            .functionName(exampleAwsLambdaFunction.arn())
            .queues("orders")
            .batchSize(10)
            .sourceAccessConfigurations(EventSourceMappingSourceAccessConfigurationArgs.builder()
                .type("BASIC_AUTH")
                .uri(exampleAwsSecretsmanagerSecretVersion.arn())
                .build())
            .build());

    }
}
resources:
  example:
    type: aws:lambda:EventSourceMapping
    properties:
      eventSourceArn: ${exampleAwsMqBroker.arn}
      functionName: ${exampleAwsLambdaFunction.arn}
      queues: orders
      batchSize: 10
      sourceAccessConfigurations:
        - type: BASIC_AUTH
          uri: ${exampleAwsSecretsmanagerSecretVersion.arn}

The queues property specifies which ActiveMQ queue to consume. The sourceAccessConfigurations array includes a BASIC_AUTH entry pointing to a Secrets Manager secret containing broker credentials. Lambda retrieves these credentials automatically when connecting to the broker.

Beyond these examples

These snippets focus on specific event source mapping features: stream and queue event sources (DynamoDB, Kinesis, SQS), Kafka integration (MSK and self-managed), and message broker connectivity (ActiveMQ, RabbitMQ, DocumentDB). They’re intentionally minimal rather than full event-driven applications.

The examples reference pre-existing infrastructure such as Lambda functions, event sources (streams, queues, Kafka clusters, message brokers), VPC subnets and security groups (for self-managed Kafka), and Secrets Manager secrets (for authenticated sources). They focus on configuring the event source mapping rather than provisioning the surrounding infrastructure.

To keep things focused, common event source mapping patterns are omitted, including:

  • Tumbling windows for streaming analytics (tumblingWindowInSeconds)
  • Batch splitting on errors (bisectBatchOnFunctionError)
  • Record age and retry limits (maximumRecordAgeInSeconds, maximumRetryAttempts)
  • CloudWatch metrics configuration (metricsConfig)
  • KMS encryption for filter criteria (kmsKeyArn)

These omissions are intentional: the goal is to illustrate how each event source type is wired, not provide drop-in event processing modules. See the Lambda EventSourceMapping resource reference for all available configuration options.

Let's configure AWS Lambda Event Source Mappings

Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.

Try Pulumi Cloud for FREE

Frequently Asked Questions

Configuration & Compatibility
What properties can't I change after creating an event source mapping?
Several properties are immutable: eventSourceArn, queues, topics, startingPosition, startingPositionTimestamp, amazonManagedKafkaEventSourceConfig, and selfManagedKafkaEventSourceConfig. Changes to these require recreating the resource.
Can I use both Amazon MSK and self-managed Kafka configurations together?
No, amazonManagedKafkaEventSourceConfig is incompatible with selfManagedEventSource and selfManagedKafkaEventSourceConfig. Use amazonManagedKafkaEventSourceConfig for MSK, selfManagedKafkaEventSourceConfig for self-managed Kafka.
Why can't I set startingPosition for my SQS event source?
SQS event sources don’t support startingPosition. This property is only required for stream sources like Kinesis, DynamoDB, MSK, and self-managed Kafka.
Which features work with which event sources?

Feature availability varies by source:

  • Stream sources only (DynamoDB/Kinesis): bisectBatchOnFunctionError, maximumRecordAgeInSeconds, maximumRetryAttempts, parallelizationFactor, tumblingWindowInSeconds
  • Stream sources and Kafka: destinationConfig
  • SQS only: scalingConfig
  • MSK/self-managed Kafka only: provisionedPollerConfig
  • Stream sources and SQS: maximumBatchingWindowInSeconds, functionResponseTypes, metricsConfig
Event Sources & Setup
How do I configure a self-managed Kafka event source?
You need three components: selfManagedEventSource with KAFKA_BOOTSTRAP_SERVERS endpoints, selfManagedKafkaEventSourceConfig with consumerGroupId, and sourceAccessConfigurations for VPC access (subnets and security groups).
How do I authenticate to Amazon MQ brokers?
Use sourceAccessConfigurations with type BASIC_AUTH pointing to a Secrets Manager secret ARN. For RabbitMQ, also include a VIRTUAL_HOST configuration.
What's the difference between functionName and functionArn?
functionName is your input (can be a name or ARN), while functionArn is a computed output showing the actual ARN that Lambda uses for the event source mapping.
Batch Processing & Performance
What are the default batch sizes for different event sources?
Batch size defaults to 100 for DynamoDB, Kinesis, MQ, and MSK sources. For SQS, it defaults to 10.
How do I process batches from each shard concurrently?
Set parallelizationFactor (1-10) for stream sources (DynamoDB and Kinesis). The default is 1, meaning sequential processing per shard.
How do I control how long Lambda waits before invoking my function?
Use maximumBatchingWindowInSeconds (0-300) for stream sources and SQS standard queues. Records buffer until this timeout expires or batchSize is reached.
Error Handling & Retries
What are the retry and record age limits for stream sources?
For DynamoDB and Kinesis streams, maximumRetryAttempts defaults to -1 (forever) with a max of 10000. maximumRecordAgeInSeconds defaults to -1 (forever) or can be set between 60 and 604800 seconds.
How do I handle failed records from my event source?
Configure destinationConfig with an SQS queue, SNS topic, or S3 bucket (Kafka only) for failed records. This is available for stream sources (DynamoDB/Kinesis) and Kafka sources (MSK/self-managed).
Advanced Features
How do I filter events before Lambda processes them?
Use filterCriteria with a JSON pattern for Kinesis, DynamoDB, or SQS sources. The pattern can match on message body fields, as shown in the SQS filtering example with Temperature and Location filters.
How do I control SQS concurrency for my Lambda function?
Use scalingConfig with maximumConcurrency to limit how many concurrent Lambda invocations process messages from the SQS queue.

Using a different cloud?

Explore serverless guides for other cloud providers: