Configure AWS Lambda Event Source Mappings

The aws:lambda/eventSourceMapping:EventSourceMapping resource, part of the Pulumi AWS provider, connects Lambda functions to event sources, defining how records are batched, filtered, and delivered for processing. This guide focuses on four areas: stream sources (DynamoDB, Kinesis), queue sources (SQS and ActiveMQ), Kafka sources (MSK and self-managed), and message filtering and batching.

Event source mappings reference existing Lambda functions and event sources such as streams, queues, Kafka clusters, and MQ brokers. Some configurations require VPC setup and Secrets Manager credentials. The examples are intentionally small. Combine them with your own Lambda functions, IAM roles, and event infrastructure.

Process DynamoDB table changes with Lambda

Applications that need to react to database changes connect Lambda functions to DynamoDB Streams, enabling real-time processing of inserts, updates, and deletes.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.lambda.EventSourceMapping("example", {
    eventSourceArn: exampleAwsDynamodbTable.streamArn,
    functionName: exampleAwsLambdaFunction.arn,
    startingPosition: "LATEST",
    tags: {
        Name: "dynamodb-stream-mapping",
    },
});
import pulumi
import pulumi_aws as aws

example = aws.lambda_.EventSourceMapping("example",
    event_source_arn=example_aws_dynamodb_table["streamArn"],
    function_name=example_aws_lambda_function["arn"],
    starting_position="LATEST",
    tags={
        "Name": "dynamodb-stream-mapping",
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
			EventSourceArn:   pulumi.Any(exampleAwsDynamodbTable.StreamArn),
			FunctionName:     pulumi.Any(exampleAwsLambdaFunction.Arn),
			StartingPosition: pulumi.String("LATEST"),
			Tags: pulumi.StringMap{
				"Name": pulumi.String("dynamodb-stream-mapping"),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.Lambda.EventSourceMapping("example", new()
    {
        EventSourceArn = exampleAwsDynamodbTable.StreamArn,
        FunctionName = exampleAwsLambdaFunction.Arn,
        StartingPosition = "LATEST",
        Tags = 
        {
            { "Name", "dynamodb-stream-mapping" },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
            .eventSourceArn(exampleAwsDynamodbTable.streamArn())
            .functionName(exampleAwsLambdaFunction.arn())
            .startingPosition("LATEST")
            .tags(Map.of("Name", "dynamodb-stream-mapping"))
            .build());

    }
}
resources:
  example:
    type: aws:lambda:EventSourceMapping
    properties:
      eventSourceArn: ${exampleAwsDynamodbTable.streamArn}
      functionName: ${exampleAwsLambdaFunction.arn}
      startingPosition: LATEST
      tags:
        Name: dynamodb-stream-mapping

The eventSourceArn points to your DynamoDB table’s stream. The startingPosition determines where Lambda begins reading: LATEST processes only new records, while TRIM_HORIZON processes all available records from the stream’s retention window.
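
The mapping assumes a table that already has streams enabled. Below is a minimal TypeScript sketch of that prerequisite; the table name, key schema, and view type are illustrative assumptions:

import * as aws from "@pulumi/aws";

// Hypothetical table; the mapping above consumes its stream via streamArn.
const exampleAwsDynamodbTable = new aws.dynamodb.Table("example", {
    attributes: [{ name: "id", type: "S" }],
    hashKey: "id",
    billingMode: "PAY_PER_REQUEST",
    streamEnabled: true,
    // NEW_AND_OLD_IMAGES delivers both the previous and updated item images.
    streamViewType: "NEW_AND_OLD_IMAGES",
});

The function's execution role also needs stream read permissions, for example via the AWSLambdaDynamoDBExecutionRole managed policy.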

Consume Kinesis streams with batching and error handling

Real-time data pipelines buffer events in Kinesis before Lambda processes them. Batching and parallelization control throughput, while an on-failure destination captures details of batches that could not be processed.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.lambda.EventSourceMapping("example", {
    eventSourceArn: exampleAwsKinesisStream.arn,
    functionName: exampleAwsLambdaFunction.arn,
    startingPosition: "LATEST",
    batchSize: 100,
    maximumBatchingWindowInSeconds: 5,
    parallelizationFactor: 2,
    destinationConfig: {
        onFailure: {
            destinationArn: dlq.arn,
        },
    },
});
import pulumi
import pulumi_aws as aws

example = aws.lambda_.EventSourceMapping("example",
    event_source_arn=example_aws_kinesis_stream["arn"],
    function_name=example_aws_lambda_function["arn"],
    starting_position="LATEST",
    batch_size=100,
    maximum_batching_window_in_seconds=5,
    parallelization_factor=2,
    destination_config={
        "on_failure": {
            "destination_arn": dlq["arn"],
        },
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
			EventSourceArn:                 pulumi.Any(exampleAwsKinesisStream.Arn),
			FunctionName:                   pulumi.Any(exampleAwsLambdaFunction.Arn),
			StartingPosition:               pulumi.String("LATEST"),
			BatchSize:                      pulumi.Int(100),
			MaximumBatchingWindowInSeconds: pulumi.Int(5),
			ParallelizationFactor:          pulumi.Int(2),
			DestinationConfig: &lambda.EventSourceMappingDestinationConfigArgs{
				OnFailure: &lambda.EventSourceMappingDestinationConfigOnFailureArgs{
					DestinationArn: pulumi.Any(dlq.Arn),
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.Lambda.EventSourceMapping("example", new()
    {
        EventSourceArn = exampleAwsKinesisStream.Arn,
        FunctionName = exampleAwsLambdaFunction.Arn,
        StartingPosition = "LATEST",
        BatchSize = 100,
        MaximumBatchingWindowInSeconds = 5,
        ParallelizationFactor = 2,
        DestinationConfig = new Aws.Lambda.Inputs.EventSourceMappingDestinationConfigArgs
        {
            OnFailure = new Aws.Lambda.Inputs.EventSourceMappingDestinationConfigOnFailureArgs
            {
                DestinationArn = dlq.Arn,
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingDestinationConfigArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingDestinationConfigOnFailureArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
            .eventSourceArn(exampleAwsKinesisStream.arn())
            .functionName(exampleAwsLambdaFunction.arn())
            .startingPosition("LATEST")
            .batchSize(100)
            .maximumBatchingWindowInSeconds(5)
            .parallelizationFactor(2)
            .destinationConfig(EventSourceMappingDestinationConfigArgs.builder()
                .onFailure(EventSourceMappingDestinationConfigOnFailureArgs.builder()
                    .destinationArn(dlq.arn())
                    .build())
                .build())
            .build());

    }
}
resources:
  example:
    type: aws:lambda:EventSourceMapping
    properties:
      eventSourceArn: ${exampleAwsKinesisStream.arn}
      functionName: ${exampleAwsLambdaFunction.arn}
      startingPosition: LATEST
      batchSize: 100
      maximumBatchingWindowInSeconds: 5
      parallelizationFactor: 2
      destinationConfig:
        onFailure:
          destinationArn: ${dlq.arn}

Lambda gathers up to batchSize records, waiting at most maximumBatchingWindowInSeconds, before invoking your function. The parallelizationFactor lets Lambda process multiple batches from each shard concurrently. When a batch repeatedly fails, destinationConfig sends a failure record (metadata identifying the batch, such as its shard and sequence number range) to the dead letter queue for later analysis.
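
The dlq referenced above is assumed to exist; for Kinesis and DynamoDB sources, the on-failure destination can be an SQS queue or an SNS topic. A minimal sketch of the queue (the retention value is an assumption):

import * as aws from "@pulumi/aws";

// Hypothetical dead letter queue for failure records.
const dlq = new aws.sqs.Queue("kinesis-dlq", {
    // 14 days, the SQS maximum, to leave time for investigation.
    messageRetentionSeconds: 1209600,
});

The function's execution role needs sqs:SendMessage on this queue so Lambda can deliver the failure records.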

Poll SQS queues with concurrency scaling

Lambda polls SQS queues automatically, scaling the number of concurrent function invocations based on queue depth.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.lambda.EventSourceMapping("example", {
    eventSourceArn: exampleAwsSqsQueue.arn,
    functionName: exampleAwsLambdaFunction.arn,
    batchSize: 10,
    scalingConfig: {
        maximumConcurrency: 100,
    },
});
import pulumi
import pulumi_aws as aws

example = aws.lambda_.EventSourceMapping("example",
    event_source_arn=example_aws_sqs_queue["arn"],
    function_name=example_aws_lambda_function["arn"],
    batch_size=10,
    scaling_config={
        "maximum_concurrency": 100,
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
			EventSourceArn: pulumi.Any(exampleAwsSqsQueue.Arn),
			FunctionName:   pulumi.Any(exampleAwsLambdaFunction.Arn),
			BatchSize:      pulumi.Int(10),
			ScalingConfig: &lambda.EventSourceMappingScalingConfigArgs{
				MaximumConcurrency: pulumi.Int(100),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.Lambda.EventSourceMapping("example", new()
    {
        EventSourceArn = exampleAwsSqsQueue.Arn,
        FunctionName = exampleAwsLambdaFunction.Arn,
        BatchSize = 10,
        ScalingConfig = new Aws.Lambda.Inputs.EventSourceMappingScalingConfigArgs
        {
            MaximumConcurrency = 100,
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingScalingConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
            .eventSourceArn(exampleAwsSqsQueue.arn())
            .functionName(exampleAwsLambdaFunction.arn())
            .batchSize(10)
            .scalingConfig(EventSourceMappingScalingConfigArgs.builder()
                .maximumConcurrency(100)
                .build())
            .build());

    }
}
resources:
  example:
    type: aws:lambda:EventSourceMapping
    properties:
      eventSourceArn: ${exampleAwsSqsQueue.arn}
      functionName: ${exampleAwsLambdaFunction.arn}
      batchSize: 10
      scalingConfig:
        maximumConcurrency: 100

The scalingConfig property controls how many function instances Lambda runs concurrently. As queue depth increases, Lambda scales up to maximumConcurrency, then scales down when the queue drains.
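
The queue itself is assumed to exist. When creating it, AWS recommends a visibility timeout of at least six times the function timeout so in-flight batches aren't received again while a function is still processing them. A sketch with placeholder values:

import * as aws from "@pulumi/aws";

// Placeholder; match this to your function's configured timeout.
const functionTimeoutSeconds = 30;

const exampleAwsSqsQueue = new aws.sqs.Queue("example", {
    // At least 6x the function timeout, per AWS guidance for Lambda consumers.
    visibilityTimeoutSeconds: functionTimeoutSeconds * 6,
});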

Filter SQS messages before Lambda invocation

When queues contain mixed message types, event filtering reduces Lambda invocations by evaluating message content before triggering the function.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.lambda.EventSourceMapping("example", {
    eventSourceArn: exampleAwsSqsQueue.arn,
    functionName: exampleAwsLambdaFunction.arn,
    filterCriteria: {
        filters: [{
            pattern: JSON.stringify({
                body: {
                    Temperature: [{
                        numeric: [
                            ">",
                            0,
                            "<=",
                            100,
                        ],
                    }],
                    Location: ["New York"],
                },
            }),
        }],
    },
});
import pulumi
import json
import pulumi_aws as aws

example = aws.lambda_.EventSourceMapping("example",
    event_source_arn=example_aws_sqs_queue["arn"],
    function_name=example_aws_lambda_function["arn"],
    filter_criteria={
        "filters": [{
            "pattern": json.dumps({
                "body": {
                    "Temperature": [{
                        "numeric": [
                            ">",
                            0,
                            "<=",
                            100,
                        ],
                    }],
                    "Location": ["New York"],
                },
            }),
        }],
    })
package main

import (
	"encoding/json"

	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		tmpJSON0, err := json.Marshal(map[string]interface{}{
			"body": map[string]interface{}{
				"Temperature": []map[string]interface{}{
					map[string]interface{}{
						"numeric": []interface{}{
							">",
							0,
							"<=",
							100,
						},
					},
				},
				"Location": []string{
					"New York",
				},
			},
		})
		if err != nil {
			return err
		}
		json0 := string(tmpJSON0)
		_, err = lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
			EventSourceArn: pulumi.Any(exampleAwsSqsQueue.Arn),
			FunctionName:   pulumi.Any(exampleAwsLambdaFunction.Arn),
			FilterCriteria: &lambda.EventSourceMappingFilterCriteriaArgs{
				Filters: lambda.EventSourceMappingFilterCriteriaFilterArray{
					&lambda.EventSourceMappingFilterCriteriaFilterArgs{
						Pattern: pulumi.String(json0),
					},
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.Lambda.EventSourceMapping("example", new()
    {
        EventSourceArn = exampleAwsSqsQueue.Arn,
        FunctionName = exampleAwsLambdaFunction.Arn,
        FilterCriteria = new Aws.Lambda.Inputs.EventSourceMappingFilterCriteriaArgs
        {
            Filters = new[]
            {
                new Aws.Lambda.Inputs.EventSourceMappingFilterCriteriaFilterArgs
                {
                    Pattern = JsonSerializer.Serialize(new Dictionary<string, object?>
                    {
                        ["body"] = new Dictionary<string, object?>
                        {
                            ["Temperature"] = new[]
                            {
                                new Dictionary<string, object?>
                                {
                                    ["numeric"] = new object?[]
                                    {
                                        ">",
                                        0,
                                        "<=",
                                        100,
                                    },
                                },
                            },
                            ["Location"] = new[]
                            {
                                "New York",
                            },
                        },
                    }),
                },
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingFilterCriteriaArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingFilterCriteriaFilterArgs;
import static com.pulumi.codegen.internal.Serialization.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
            .eventSourceArn(exampleAwsSqsQueue.arn())
            .functionName(exampleAwsLambdaFunction.arn())
            .filterCriteria(EventSourceMappingFilterCriteriaArgs.builder()
                .filters(EventSourceMappingFilterCriteriaFilterArgs.builder()
                    .pattern(serializeJson(
                        jsonObject(
                            jsonProperty("body", jsonObject(
                                jsonProperty("Temperature", jsonArray(jsonObject(
                                    jsonProperty("numeric", jsonArray(
                                        ">", 
                                        0, 
                                        "<=", 
                                        100
                                    ))
                                ))),
                                jsonProperty("Location", jsonArray("New York"))
                            ))
                        )))
                    .build())
                .build())
            .build());

    }
}
resources:
  example:
    type: aws:lambda:EventSourceMapping
    properties:
      eventSourceArn: ${exampleAwsSqsQueue.arn}
      functionName: ${exampleAwsLambdaFunction.arn}
      filterCriteria:
        filters:
          - pattern:
              fn::toJSON:
                body:
                  Temperature:
                    - numeric:
                        - '>'
                        - 0
                        - <=
                        - 100
                  Location:
                    - New York

The filterCriteria property defines a JSON pattern that Lambda evaluates against each message body. Only messages matching the pattern trigger your function. Here, Lambda invokes the function only for messages whose Temperature is greater than 0 and at most 100 and whose Location is “New York”.
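
To make the pattern concrete, here are two hypothetical message bodies and how the filter treats them:

// Matches: 0 < Temperature <= 100 and Location equals "New York".
const matchingBody = JSON.stringify({ Temperature: 72, Location: "New York" });

// Dropped: Temperature is outside (0, 100], so the function is never invoked.
const nonMatchingBody = JSON.stringify({ Temperature: 150, Location: "New York" });

Note that for SQS sources, Lambda deletes messages that don't match the filter rather than returning them to the queue.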

Consume Kafka topics from Amazon MSK

Managed Kafka clusters in MSK trigger Lambda functions when new records arrive on specified topics, enabling event-driven processing of Kafka streams.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.lambda.EventSourceMapping("example", {
    eventSourceArn: exampleAwsMskCluster.arn,
    functionName: exampleAwsLambdaFunction.arn,
    topics: [
        "orders",
        "inventory",
    ],
    startingPosition: "TRIM_HORIZON",
    batchSize: 100,
    amazonManagedKafkaEventSourceConfig: {
        consumerGroupId: "lambda-consumer-group",
    },
});
import pulumi
import pulumi_aws as aws

example = aws.lambda_.EventSourceMapping("example",
    event_source_arn=example_aws_msk_cluster["arn"],
    function_name=example_aws_lambda_function["arn"],
    topics=[
        "orders",
        "inventory",
    ],
    starting_position="TRIM_HORIZON",
    batch_size=100,
    amazon_managed_kafka_event_source_config={
        "consumer_group_id": "lambda-consumer-group",
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
			EventSourceArn: pulumi.Any(exampleAwsMskCluster.Arn),
			FunctionName:   pulumi.Any(exampleAwsLambdaFunction.Arn),
			Topics: pulumi.StringArray{
				pulumi.String("orders"),
				pulumi.String("inventory"),
			},
			StartingPosition: pulumi.String("TRIM_HORIZON"),
			BatchSize:        pulumi.Int(100),
			AmazonManagedKafkaEventSourceConfig: &lambda.EventSourceMappingAmazonManagedKafkaEventSourceConfigArgs{
				ConsumerGroupId: pulumi.String("lambda-consumer-group"),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.Lambda.EventSourceMapping("example", new()
    {
        EventSourceArn = exampleAwsMskCluster.Arn,
        FunctionName = exampleAwsLambdaFunction.Arn,
        Topics = new[]
        {
            "orders",
            "inventory",
        },
        StartingPosition = "TRIM_HORIZON",
        BatchSize = 100,
        AmazonManagedKafkaEventSourceConfig = new Aws.Lambda.Inputs.EventSourceMappingAmazonManagedKafkaEventSourceConfigArgs
        {
            ConsumerGroupId = "lambda-consumer-group",
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingAmazonManagedKafkaEventSourceConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
            .eventSourceArn(exampleAwsMskCluster.arn())
            .functionName(exampleAwsLambdaFunction.arn())
            .topics(            
                "orders",
                "inventory")
            .startingPosition("TRIM_HORIZON")
            .batchSize(100)
            .amazonManagedKafkaEventSourceConfig(EventSourceMappingAmazonManagedKafkaEventSourceConfigArgs.builder()
                .consumerGroupId("lambda-consumer-group")
                .build())
            .build());

    }
}
resources:
  example:
    type: aws:lambda:EventSourceMapping
    properties:
      eventSourceArn: ${exampleAwsMskCluster.arn}
      functionName: ${exampleAwsLambdaFunction.arn}
      topics:
        - orders
        - inventory
      startingPosition: TRIM_HORIZON
      batchSize: 100
      amazonManagedKafkaEventSourceConfig:
        consumerGroupId: lambda-consumer-group

The topics property lists Kafka topics to consume. The amazonManagedKafkaEventSourceConfig block sets the consumer group ID, which tracks read position across Lambda invocations. Lambda manages offset commits automatically.
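
The function's execution role must be able to read from the cluster; a common approach is attaching the AWS managed AWSLambdaMSKExecutionRole policy. A sketch, assuming the execution role is a hypothetical exampleRole:

import * as aws from "@pulumi/aws";

// Grants the MSK, EC2 networking, and CloudWatch Logs permissions
// Lambda needs to poll the cluster on the function's behalf.
const mskAccess = new aws.iam.RolePolicyAttachment("msk-access", {
    role: exampleRole.name,
    policyArn: "arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole",
});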

Connect to self-managed Kafka clusters

Lambda can consume from Kafka clusters you operate yourself, whether on EC2 or outside AWS, given bootstrap server endpoints and VPC access configuration.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.lambda.EventSourceMapping("example", {
    functionName: exampleAwsLambdaFunction.arn,
    topics: ["orders"],
    startingPosition: "TRIM_HORIZON",
    selfManagedEventSource: {
        endpoints: {
            KAFKA_BOOTSTRAP_SERVERS: "kafka1.example.com:9092,kafka2.example.com:9092",
        },
    },
    selfManagedKafkaEventSourceConfig: {
        consumerGroupId: "lambda-consumer-group",
    },
    sourceAccessConfigurations: [
        {
            type: "VPC_SUBNET",
            uri: `subnet:${example1.id}`,
        },
        {
            type: "VPC_SUBNET",
            uri: `subnet:${example2.id}`,
        },
        {
            type: "VPC_SECURITY_GROUP",
            uri: `security_group:${exampleAwsSecurityGroup.id}`,
        },
    ],
    provisionedPollerConfig: {
        maximumPollers: 100,
        minimumPollers: 10,
        pollerGroupName: "group-123",
    },
});
import pulumi
import pulumi_aws as aws

example = aws.lambda_.EventSourceMapping("example",
    function_name=example_aws_lambda_function["arn"],
    topics=["orders"],
    starting_position="TRIM_HORIZON",
    self_managed_event_source={
        "endpoints": {
            "KAFKA_BOOTSTRAP_SERVERS": "kafka1.example.com:9092,kafka2.example.com:9092",
        },
    },
    self_managed_kafka_event_source_config={
        "consumer_group_id": "lambda-consumer-group",
    },
    source_access_configurations=[
        {
            "type": "VPC_SUBNET",
            "uri": f"subnet:{example1['id']}",
        },
        {
            "type": "VPC_SUBNET",
            "uri": f"subnet:{example2['id']}",
        },
        {
            "type": "VPC_SECURITY_GROUP",
            "uri": f"security_group:{example_aws_security_group['id']}",
        },
    ],
    provisioned_poller_config={
        "maximum_pollers": 100,
        "minimum_pollers": 10,
        "poller_group_name": "group-123",
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
			FunctionName: pulumi.Any(exampleAwsLambdaFunction.Arn),
			Topics: pulumi.StringArray{
				pulumi.String("orders"),
			},
			StartingPosition: pulumi.String("TRIM_HORIZON"),
			SelfManagedEventSource: &lambda.EventSourceMappingSelfManagedEventSourceArgs{
				Endpoints: pulumi.StringMap{
					"KAFKA_BOOTSTRAP_SERVERS": pulumi.String("kafka1.example.com:9092,kafka2.example.com:9092"),
				},
			},
			SelfManagedKafkaEventSourceConfig: &lambda.EventSourceMappingSelfManagedKafkaEventSourceConfigArgs{
				ConsumerGroupId: pulumi.String("lambda-consumer-group"),
			},
			SourceAccessConfigurations: lambda.EventSourceMappingSourceAccessConfigurationArray{
				&lambda.EventSourceMappingSourceAccessConfigurationArgs{
					Type: pulumi.String("VPC_SUBNET"),
					Uri:  pulumi.Sprintf("subnet:%v", example1.Id),
				},
				&lambda.EventSourceMappingSourceAccessConfigurationArgs{
					Type: pulumi.String("VPC_SUBNET"),
					Uri:  pulumi.Sprintf("subnet:%v", example2.Id),
				},
				&lambda.EventSourceMappingSourceAccessConfigurationArgs{
					Type: pulumi.String("VPC_SECURITY_GROUP"),
					Uri:  pulumi.Sprintf("security_group:%v", exampleAwsSecurityGroup.Id),
				},
			},
			ProvisionedPollerConfig: &lambda.EventSourceMappingProvisionedPollerConfigArgs{
				MaximumPollers:  pulumi.Int(100),
				MinimumPollers:  pulumi.Int(10),
				PollerGroupName: pulumi.String("group-123"),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.Lambda.EventSourceMapping("example", new()
    {
        FunctionName = exampleAwsLambdaFunction.Arn,
        Topics = new[]
        {
            "orders",
        },
        StartingPosition = "TRIM_HORIZON",
        SelfManagedEventSource = new Aws.Lambda.Inputs.EventSourceMappingSelfManagedEventSourceArgs
        {
            Endpoints = 
            {
                { "KAFKA_BOOTSTRAP_SERVERS", "kafka1.example.com:9092,kafka2.example.com:9092" },
            },
        },
        SelfManagedKafkaEventSourceConfig = new Aws.Lambda.Inputs.EventSourceMappingSelfManagedKafkaEventSourceConfigArgs
        {
            ConsumerGroupId = "lambda-consumer-group",
        },
        SourceAccessConfigurations = new[]
        {
            new Aws.Lambda.Inputs.EventSourceMappingSourceAccessConfigurationArgs
            {
                Type = "VPC_SUBNET",
                Uri = $"subnet:{example1.Id}",
            },
            new Aws.Lambda.Inputs.EventSourceMappingSourceAccessConfigurationArgs
            {
                Type = "VPC_SUBNET",
                Uri = $"subnet:{example2.Id}",
            },
            new Aws.Lambda.Inputs.EventSourceMappingSourceAccessConfigurationArgs
            {
                Type = "VPC_SECURITY_GROUP",
                Uri = $"security_group:{exampleAwsSecurityGroup.Id}",
            },
        },
        ProvisionedPollerConfig = new Aws.Lambda.Inputs.EventSourceMappingProvisionedPollerConfigArgs
        {
            MaximumPollers = 100,
            MinimumPollers = 10,
            PollerGroupName = "group-123",
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingSelfManagedEventSourceArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingSelfManagedKafkaEventSourceConfigArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingSourceAccessConfigurationArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingProvisionedPollerConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
            .functionName(exampleAwsLambdaFunction.arn())
            .topics("orders")
            .startingPosition("TRIM_HORIZON")
            .selfManagedEventSource(EventSourceMappingSelfManagedEventSourceArgs.builder()
                .endpoints(Map.of("KAFKA_BOOTSTRAP_SERVERS", "kafka1.example.com:9092,kafka2.example.com:9092"))
                .build())
            .selfManagedKafkaEventSourceConfig(EventSourceMappingSelfManagedKafkaEventSourceConfigArgs.builder()
                .consumerGroupId("lambda-consumer-group")
                .build())
            .sourceAccessConfigurations(            
                EventSourceMappingSourceAccessConfigurationArgs.builder()
                    .type("VPC_SUBNET")
                    .uri(String.format("subnet:%s", example1.id()))
                    .build(),
                EventSourceMappingSourceAccessConfigurationArgs.builder()
                    .type("VPC_SUBNET")
                    .uri(String.format("subnet:%s", example2.id()))
                    .build(),
                EventSourceMappingSourceAccessConfigurationArgs.builder()
                    .type("VPC_SECURITY_GROUP")
                    .uri(String.format("security_group:%s", exampleAwsSecurityGroup.id()))
                    .build())
            .provisionedPollerConfig(EventSourceMappingProvisionedPollerConfigArgs.builder()
                .maximumPollers(100)
                .minimumPollers(10)
                .pollerGroupName("group-123")
                .build())
            .build());

    }
}
resources:
  example:
    type: aws:lambda:EventSourceMapping
    properties:
      functionName: ${exampleAwsLambdaFunction.arn}
      topics:
        - orders
      startingPosition: TRIM_HORIZON
      selfManagedEventSource:
        endpoints:
          KAFKA_BOOTSTRAP_SERVERS: kafka1.example.com:9092,kafka2.example.com:9092
      selfManagedKafkaEventSourceConfig:
        consumerGroupId: lambda-consumer-group
      sourceAccessConfigurations:
        - type: VPC_SUBNET
          uri: subnet:${example1.id}
        - type: VPC_SUBNET
          uri: subnet:${example2.id}
        - type: VPC_SECURITY_GROUP
          uri: security_group:${exampleAwsSecurityGroup.id}
      provisionedPollerConfig:
        maximumPollers: 100
        minimumPollers: 10
        pollerGroupName: group-123

The selfManagedEventSource block specifies Kafka bootstrap servers. The sourceAccessConfigurations array provides VPC subnet and security group references so Lambda can reach your cluster. The provisionedPollerConfig controls how many pollers Lambda allocates to read from Kafka partitions.
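
The example assumes a cluster that is reachable over the VPC without authentication. If the cluster enforces SASL/SCRAM, add another sourceAccessConfigurations entry that points at a Secrets Manager secret. A sketch (the secret is a placeholder and must hold username and password keys):

import * as aws from "@pulumi/aws";

// Hypothetical secret whose value is {"username": "...", "password": "..."}.
const kafkaCredentials = new aws.secretsmanager.Secret("kafka-credentials");

// Append this entry to sourceAccessConfigurations alongside the VPC entries.
const saslScramAuth = {
    type: "SASL_SCRAM_512_AUTH",
    uri: kafkaCredentials.arn,
};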

Process ActiveMQ messages with authentication

Amazon MQ brokers running ActiveMQ trigger Lambda functions when messages arrive in specified queues, using Secrets Manager for authentication.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.lambda.EventSourceMapping("example", {
    eventSourceArn: exampleAwsMqBroker.arn,
    functionName: exampleAwsLambdaFunction.arn,
    queues: "orders",
    batchSize: 10,
    sourceAccessConfigurations: [{
        type: "BASIC_AUTH",
        uri: exampleAwsSecretsmanagerSecretVersion.arn,
    }],
});
import pulumi
import pulumi_aws as aws

example = aws.lambda_.EventSourceMapping("example",
    event_source_arn=example_aws_mq_broker["arn"],
    function_name=example_aws_lambda_function["arn"],
    queues="orders",
    batch_size=10,
    source_access_configurations=[{
        "type": "BASIC_AUTH",
        "uri": example_aws_secretsmanager_secret_version["arn"],
    }])
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
			EventSourceArn: pulumi.Any(exampleAwsMqBroker.Arn),
			FunctionName:   pulumi.Any(exampleAwsLambdaFunction.Arn),
			Queues:         pulumi.String("orders"),
			BatchSize:      pulumi.Int(10),
			SourceAccessConfigurations: lambda.EventSourceMappingSourceAccessConfigurationArray{
				&lambda.EventSourceMappingSourceAccessConfigurationArgs{
					Type: pulumi.String("BASIC_AUTH"),
					Uri:  pulumi.Any(exampleAwsSecretsmanagerSecretVersion.Arn),
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.Lambda.EventSourceMapping("example", new()
    {
        EventSourceArn = exampleAwsMqBroker.Arn,
        FunctionName = exampleAwsLambdaFunction.Arn,
        Queues = "orders",
        BatchSize = 10,
        SourceAccessConfigurations = new[]
        {
            new Aws.Lambda.Inputs.EventSourceMappingSourceAccessConfigurationArgs
            {
                Type = "BASIC_AUTH",
                Uri = exampleAwsSecretsmanagerSecretVersion.Arn,
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingSourceAccessConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
            .eventSourceArn(exampleAwsMqBroker.arn())
            .functionName(exampleAwsLambdaFunction.arn())
            .queues("orders")
            .batchSize(10)
            .sourceAccessConfigurations(EventSourceMappingSourceAccessConfigurationArgs.builder()
                .type("BASIC_AUTH")
                .uri(exampleAwsSecretsmanagerSecretVersion.arn())
                .build())
            .build());

    }
}
resources:
  example:
    type: aws:lambda:EventSourceMapping
    properties:
      eventSourceArn: ${exampleAwsMqBroker.arn}
      functionName: ${exampleAwsLambdaFunction.arn}
      queues: orders
      batchSize: 10
      sourceAccessConfigurations:
        - type: BASIC_AUTH
          uri: ${exampleAwsSecretsmanagerSecretVersion.arn}

The queues property specifies which ActiveMQ queue to consume. The sourceAccessConfigurations array references a Secrets Manager secret containing broker credentials. Lambda retrieves credentials at runtime and authenticates to the broker.
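
For reference, Lambda expects the secret to be a JSON object with username and password keys. A minimal sketch of creating it (the credential values are placeholders; in practice, source them from configuration rather than hardcoding):

import * as aws from "@pulumi/aws";

const brokerSecret = new aws.secretsmanager.Secret("mq-broker-credentials");

// The shape Lambda expects for BASIC_AUTH sources.
const brokerSecretVersion = new aws.secretsmanager.SecretVersion("mq-broker-credentials", {
    secretId: brokerSecret.id,
    secretString: JSON.stringify({
        username: "mq-user",    // placeholder
        password: "change-me",  // placeholder
    }),
});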

Beyond these examples

These snippets focus on specific event source mapping features: stream and queue event sources (DynamoDB, Kinesis, SQS, ActiveMQ), Kafka integration (MSK and self-managed), and message filtering and batching controls. They’re intentionally minimal rather than full event-driven applications.

The examples reference pre-existing infrastructure such as Lambda functions with appropriate IAM permissions, event sources (DynamoDB tables, Kinesis streams, SQS queues, Kafka clusters, MQ brokers), VPC subnets and security groups (for Kafka and MQ), and Secrets Manager secrets (for authentication). They focus on configuring the event source mapping rather than provisioning the surrounding infrastructure.

To keep things focused, several common mapping options are omitted, including:

  • Error handling options (bisectBatchOnFunctionError, maximumRetryAttempts)
  • Windowing for streaming analytics (tumblingWindowInSeconds)
  • Metrics configuration (metricsConfig)
  • DocumentDB change streams

These omissions are intentional: the goal is to illustrate how each event source mapping feature is wired, not provide drop-in event processing modules. See the Lambda EventSourceMapping resource reference for all available configuration options.

Frequently Asked Questions

Configuration & Source Types

What event sources can I connect to Lambda functions?
You can connect Kinesis streams, DynamoDB streams, SQS queues, Amazon MQ brokers (ActiveMQ and RabbitMQ), Amazon MSK clusters, self-managed Apache Kafka, and DocumentDB change streams.

What's the difference between Amazon MSK and self-managed Kafka configuration?
Amazon MSK uses eventSourceArn and amazonManagedKafkaEventSourceConfig, while self-managed Kafka uses selfManagedEventSource with bootstrap servers and requires sourceAccessConfigurations for VPC access. These configurations are mutually exclusive.

How do I configure self-managed Kafka as an event source?
Set selfManagedEventSource with KAFKA_BOOTSTRAP_SERVERS endpoints, configure selfManagedKafkaEventSourceConfig with a consumer group ID, and include sourceAccessConfigurations with VPC_SUBNET and VPC_SECURITY_GROUP entries.

Why can't I use startingPosition with SQS?
The startingPosition parameter is only for stream sources (Kinesis, DynamoDB) and Kafka sources. SQS queues don’t use stream positions, so this parameter must not be provided for SQS event sources.

Immutability & Updates

What properties can't I change after creating an event source mapping?
The following properties are immutable and require resource replacement to change: eventSourceArn, startingPosition, startingPositionTimestamp, topics, queues, selfManagedEventSource, amazonManagedKafkaEventSourceConfig, and selfManagedKafkaEventSourceConfig.

What's the difference between functionName and functionArn?
functionName is the input property where you specify the Lambda function’s name or ARN. functionArn is a computed output property that always returns the actual ARN of the function.

Batch Processing & Performance

What are the default batch sizes for different event sources?
The default batchSize is 100 for DynamoDB, Kinesis, MQ, and MSK sources. For SQS queues, the default is 10.

How does batch windowing work?
Set maximumBatchingWindowInSeconds (0-300 seconds) to gather records before invoking the function. Records buffer until either the time window expires or batchSize is reached. This is available for stream sources (DynamoDB and Kinesis) and SQS standard queues.

How do I enable parallel processing for Kinesis or DynamoDB streams?
Set parallelizationFactor to process multiple batches from each shard concurrently. The value ranges from 1 (default) to 10, and is only available for stream sources.

Error Handling & Retries

How do I handle failed records from my event source?
Configure destinationConfig with an SQS queue, SNS topic, or S3 bucket (Kafka only) to receive failed records. This is available for stream sources (DynamoDB and Kinesis) and Kafka sources (MSK and self-managed).

What are the retry limits for stream sources?
Use maximumRetryAttempts to control retries when the function returns an error. The default is -1 (retry forever), with a maximum of 10000 retries. This is only available for stream sources (DynamoDB and Kinesis).

How do I enable batch failure reporting?
Set functionResponseTypes to ["ReportBatchItemFailures"] to enable AWS Lambda checkpointing. This allows your function to report which items in a batch failed, and is available for SQS and stream sources (DynamoDB and Kinesis).
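
A minimal sketch of a handler that honors this contract (TypeScript; the per-record processing is a placeholder):

// Placeholder for real per-record processing; throws on failure.
async function handleRecord(record: any): Promise<void> { /* ... */ }

// Returns the identifiers of failed records so Lambda retries only those.
export const handler = async (event: { Records: any[] }) => {
    const batchItemFailures: { itemIdentifier: string }[] = [];
    for (const record of event.Records) {
        try {
            await handleRecord(record);
        } catch {
            // For SQS the identifier is messageId; for Kinesis and DynamoDB
            // streams, use the record's sequence number instead.
            batchItemFailures.push({ itemIdentifier: record.messageId });
        }
    }
    return { batchItemFailures };
};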

What does bisectBatchOnFunctionError do?
When set to true, Lambda splits the batch in two and retries if the function returns an error. This is only available for stream sources (DynamoDB and Kinesis) and defaults to false.

Event Filtering & Advanced Features

How do I filter events before they reach my Lambda function?
Use filterCriteria with a JSON pattern to filter events from Kinesis streams, DynamoDB streams, or SQS queues. The pattern matches against the event structure to determine which events to process.

What is the tumbling window feature for stream analytics?
Set tumblingWindowInSeconds (1-900 seconds) to define a processing window for AWS Lambda streaming analytics. This is only available for stream sources (DynamoDB and Kinesis).
