The aws:lambda/eventSourceMapping:EventSourceMapping resource, part of the Pulumi AWS provider, connects Lambda functions to event sources, defining how records are batched, filtered, and delivered for processing. This guide focuses on four capabilities: stream sources (DynamoDB, Kinesis), queue sources (SQS), Kafka sources (MSK and self-managed), and message filtering and batching.
Event source mappings reference existing Lambda functions and event sources such as streams, queues, Kafka clusters, and MQ brokers. Some configurations require VPC setup and Secrets Manager credentials. The examples are intentionally small. Combine them with your own Lambda functions, IAM roles, and event infrastructure.
Process DynamoDB table changes with Lambda
Applications that need to react to database changes connect Lambda functions to DynamoDB Streams, enabling real-time processing of inserts, updates, and deletes.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const example = new aws.lambda.EventSourceMapping("example", {
eventSourceArn: exampleAwsDynamodbTable.streamArn,
functionName: exampleAwsLambdaFunction.arn,
startingPosition: "LATEST",
tags: {
Name: "dynamodb-stream-mapping",
},
});
import pulumi
import pulumi_aws as aws
example = aws.lambda_.EventSourceMapping("example",
event_source_arn=example_aws_dynamodb_table["streamArn"],
function_name=example_aws_lambda_function["arn"],
starting_position="LATEST",
tags={
"Name": "dynamodb-stream-mapping",
})
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
EventSourceArn: pulumi.Any(exampleAwsDynamodbTable.StreamArn),
FunctionName: pulumi.Any(exampleAwsLambdaFunction.Arn),
StartingPosition: pulumi.String("LATEST"),
Tags: pulumi.StringMap{
"Name": pulumi.String("dynamodb-stream-mapping"),
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var example = new Aws.Lambda.EventSourceMapping("example", new()
{
EventSourceArn = exampleAwsDynamodbTable.StreamArn,
FunctionName = exampleAwsLambdaFunction.Arn,
StartingPosition = "LATEST",
Tags =
{
{ "Name", "dynamodb-stream-mapping" },
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
.eventSourceArn(exampleAwsDynamodbTable.streamArn())
.functionName(exampleAwsLambdaFunction.arn())
.startingPosition("LATEST")
.tags(Map.of("Name", "dynamodb-stream-mapping"))
.build());
}
}
resources:
example:
type: aws:lambda:EventSourceMapping
properties:
eventSourceArn: ${exampleAwsDynamodbTable.streamArn}
functionName: ${exampleAwsLambdaFunction.arn}
startingPosition: LATEST
tags:
Name: dynamodb-stream-mapping
The eventSourceArn points to your DynamoDB table’s stream. The startingPosition determines where Lambda begins reading: LATEST processes only new records, while TRIM_HORIZON processes all available records from the stream’s retention window.
Consume Kinesis streams with batching and error handling
Real-time data pipelines buffer events in Kinesis before Lambda processing. Batching and parallelization control throughput, while dead letter queues capture failed records.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const example = new aws.lambda.EventSourceMapping("example", {
eventSourceArn: exampleAwsKinesisStream.arn,
functionName: exampleAwsLambdaFunction.arn,
startingPosition: "LATEST",
batchSize: 100,
maximumBatchingWindowInSeconds: 5,
parallelizationFactor: 2,
destinationConfig: {
onFailure: {
destinationArn: dlq.arn,
},
},
});
import pulumi
import pulumi_aws as aws
example = aws.lambda_.EventSourceMapping("example",
event_source_arn=example_aws_kinesis_stream["arn"],
function_name=example_aws_lambda_function["arn"],
starting_position="LATEST",
batch_size=100,
maximum_batching_window_in_seconds=5,
parallelization_factor=2,
destination_config={
"on_failure": {
"destination_arn": dlq["arn"],
},
})
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
EventSourceArn: pulumi.Any(exampleAwsKinesisStream.Arn),
FunctionName: pulumi.Any(exampleAwsLambdaFunction.Arn),
StartingPosition: pulumi.String("LATEST"),
BatchSize: pulumi.Int(100),
MaximumBatchingWindowInSeconds: pulumi.Int(5),
ParallelizationFactor: pulumi.Int(2),
DestinationConfig: &lambda.EventSourceMappingDestinationConfigArgs{
OnFailure: &lambda.EventSourceMappingDestinationConfigOnFailureArgs{
DestinationArn: pulumi.Any(dlq.Arn),
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var example = new Aws.Lambda.EventSourceMapping("example", new()
{
EventSourceArn = exampleAwsKinesisStream.Arn,
FunctionName = exampleAwsLambdaFunction.Arn,
StartingPosition = "LATEST",
BatchSize = 100,
MaximumBatchingWindowInSeconds = 5,
ParallelizationFactor = 2,
DestinationConfig = new Aws.Lambda.Inputs.EventSourceMappingDestinationConfigArgs
{
OnFailure = new Aws.Lambda.Inputs.EventSourceMappingDestinationConfigOnFailureArgs
{
DestinationArn = dlq.Arn,
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingDestinationConfigArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingDestinationConfigOnFailureArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
.eventSourceArn(exampleAwsKinesisStream.arn())
.functionName(exampleAwsLambdaFunction.arn())
.startingPosition("LATEST")
.batchSize(100)
.maximumBatchingWindowInSeconds(5)
.parallelizationFactor(2)
.destinationConfig(EventSourceMappingDestinationConfigArgs.builder()
.onFailure(EventSourceMappingDestinationConfigOnFailureArgs.builder()
.destinationArn(dlq.arn())
.build())
.build())
.build());
}
}
resources:
example:
type: aws:lambda:EventSourceMapping
properties:
eventSourceArn: ${exampleAwsKinesisStream.arn}
functionName: ${exampleAwsLambdaFunction.arn}
startingPosition: LATEST
batchSize: 100
maximumBatchingWindowInSeconds: 5
parallelizationFactor: 2
destinationConfig:
onFailure:
destinationArn: ${dlq.arn}
Lambda retrieves up to batchSize records and waits up to maximumBatchingWindowInSeconds before invoking your function. The parallelizationFactor processes multiple batches from each shard concurrently. When processing fails, destinationConfig routes failed records to a dead letter queue for later analysis.
Poll SQS queues with concurrency scaling
Lambda polls SQS queues automatically, scaling the number of concurrent function invocations based on queue depth.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const example = new aws.lambda.EventSourceMapping("example", {
eventSourceArn: exampleAwsSqsQueue.arn,
functionName: exampleAwsLambdaFunction.arn,
batchSize: 10,
scalingConfig: {
maximumConcurrency: 100,
},
});
import pulumi
import pulumi_aws as aws
example = aws.lambda_.EventSourceMapping("example",
event_source_arn=example_aws_sqs_queue["arn"],
function_name=example_aws_lambda_function["arn"],
batch_size=10,
scaling_config={
"maximum_concurrency": 100,
})
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
EventSourceArn: pulumi.Any(exampleAwsSqsQueue.Arn),
FunctionName: pulumi.Any(exampleAwsLambdaFunction.Arn),
BatchSize: pulumi.Int(10),
ScalingConfig: &lambda.EventSourceMappingScalingConfigArgs{
MaximumConcurrency: pulumi.Int(100),
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var example = new Aws.Lambda.EventSourceMapping("example", new()
{
EventSourceArn = exampleAwsSqsQueue.Arn,
FunctionName = exampleAwsLambdaFunction.Arn,
BatchSize = 10,
ScalingConfig = new Aws.Lambda.Inputs.EventSourceMappingScalingConfigArgs
{
MaximumConcurrency = 100,
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingScalingConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
.eventSourceArn(exampleAwsSqsQueue.arn())
.functionName(exampleAwsLambdaFunction.arn())
.batchSize(10)
.scalingConfig(EventSourceMappingScalingConfigArgs.builder()
.maximumConcurrency(100)
.build())
.build());
}
}
resources:
example:
type: aws:lambda:EventSourceMapping
properties:
eventSourceArn: ${exampleAwsSqsQueue.arn}
functionName: ${exampleAwsLambdaFunction.arn}
batchSize: 10
scalingConfig:
maximumConcurrency: 100
The scalingConfig property controls how many function instances Lambda runs concurrently. As queue depth increases, Lambda scales up to maximumConcurrency, then scales down when the queue drains.
Filter SQS messages before Lambda invocation
When queues contain mixed message types, event filtering reduces Lambda invocations by evaluating message content before triggering the function.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const example = new aws.lambda.EventSourceMapping("example", {
eventSourceArn: exampleAwsSqsQueue.arn,
functionName: exampleAwsLambdaFunction.arn,
filterCriteria: {
filters: [{
pattern: JSON.stringify({
body: {
Temperature: [{
numeric: [
">",
0,
"<=",
100,
],
}],
Location: ["New York"],
},
}),
}],
},
});
import pulumi
import json
import pulumi_aws as aws
example = aws.lambda_.EventSourceMapping("example",
event_source_arn=example_aws_sqs_queue["arn"],
function_name=example_aws_lambda_function["arn"],
filter_criteria={
"filters": [{
"pattern": json.dumps({
"body": {
"Temperature": [{
"numeric": [
">",
0,
"<=",
100,
],
}],
"Location": ["New York"],
},
}),
}],
})
package main
import (
"encoding/json"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
tmpJSON0, err := json.Marshal(map[string]interface{}{
"body": map[string]interface{}{
"Temperature": []map[string]interface{}{
map[string]interface{}{
"numeric": []interface{}{
">",
0,
"<=",
100,
},
},
},
"Location": []string{
"New York",
},
},
})
if err != nil {
return err
}
json0 := string(tmpJSON0)
_, err = lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
EventSourceArn: pulumi.Any(exampleAwsSqsQueue.Arn),
FunctionName: pulumi.Any(exampleAwsLambdaFunction.Arn),
FilterCriteria: &lambda.EventSourceMappingFilterCriteriaArgs{
Filters: lambda.EventSourceMappingFilterCriteriaFilterArray{
&lambda.EventSourceMappingFilterCriteriaFilterArgs{
Pattern: pulumi.String(json0),
},
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var example = new Aws.Lambda.EventSourceMapping("example", new()
{
EventSourceArn = exampleAwsSqsQueue.Arn,
FunctionName = exampleAwsLambdaFunction.Arn,
FilterCriteria = new Aws.Lambda.Inputs.EventSourceMappingFilterCriteriaArgs
{
Filters = new[]
{
new Aws.Lambda.Inputs.EventSourceMappingFilterCriteriaFilterArgs
{
Pattern = JsonSerializer.Serialize(new Dictionary<string, object?>
{
["body"] = new Dictionary<string, object?>
{
["Temperature"] = new[]
{
new Dictionary<string, object?>
{
["numeric"] = new object?[]
{
">",
0,
"<=",
100,
},
},
},
["Location"] = new[]
{
"New York",
},
},
}),
},
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingFilterCriteriaArgs;
import static com.pulumi.codegen.internal.Serialization.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
.eventSourceArn(exampleAwsSqsQueue.arn())
.functionName(exampleAwsLambdaFunction.arn())
.filterCriteria(EventSourceMappingFilterCriteriaArgs.builder()
.filters(EventSourceMappingFilterCriteriaFilterArgs.builder()
.pattern(serializeJson(
jsonObject(
jsonProperty("body", jsonObject(
jsonProperty("Temperature", jsonArray(jsonObject(
jsonProperty("numeric", jsonArray(
">",
0,
"<=",
100
))
))),
jsonProperty("Location", jsonArray("New York"))
))
)))
.build())
.build())
.build());
}
}
resources:
example:
type: aws:lambda:EventSourceMapping
properties:
eventSourceArn: ${exampleAwsSqsQueue.arn}
functionName: ${exampleAwsLambdaFunction.arn}
filterCriteria:
filters:
- pattern:
fn::toJSON:
body:
Temperature:
- numeric:
- '>'
- 0
- <=
- 100
Location:
- New York
The filterCriteria property defines a JSON pattern that Lambda evaluates against each message body. Only messages matching the pattern trigger your function. Here, Lambda invokes only for messages with Temperature between 0 and 100 and Location set to “New York”.
Consume Kafka topics from Amazon MSK
Managed Kafka clusters in MSK trigger Lambda functions when new records arrive on specified topics, enabling event-driven processing of Kafka streams.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const example = new aws.lambda.EventSourceMapping("example", {
eventSourceArn: exampleAwsMskCluster.arn,
functionName: exampleAwsLambdaFunction.arn,
topics: [
"orders",
"inventory",
],
startingPosition: "TRIM_HORIZON",
batchSize: 100,
amazonManagedKafkaEventSourceConfig: {
consumerGroupId: "lambda-consumer-group",
},
});
import pulumi
import pulumi_aws as aws
example = aws.lambda_.EventSourceMapping("example",
event_source_arn=example_aws_msk_cluster["arn"],
function_name=example_aws_lambda_function["arn"],
topics=[
"orders",
"inventory",
],
starting_position="TRIM_HORIZON",
batch_size=100,
amazon_managed_kafka_event_source_config={
"consumer_group_id": "lambda-consumer-group",
})
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
EventSourceArn: pulumi.Any(exampleAwsMskCluster.Arn),
FunctionName: pulumi.Any(exampleAwsLambdaFunction.Arn),
Topics: pulumi.StringArray{
pulumi.String("orders"),
pulumi.String("inventory"),
},
StartingPosition: pulumi.String("TRIM_HORIZON"),
BatchSize: pulumi.Int(100),
AmazonManagedKafkaEventSourceConfig: &lambda.EventSourceMappingAmazonManagedKafkaEventSourceConfigArgs{
ConsumerGroupId: pulumi.String("lambda-consumer-group"),
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var example = new Aws.Lambda.EventSourceMapping("example", new()
{
EventSourceArn = exampleAwsMskCluster.Arn,
FunctionName = exampleAwsLambdaFunction.Arn,
Topics = new[]
{
"orders",
"inventory",
},
StartingPosition = "TRIM_HORIZON",
BatchSize = 100,
AmazonManagedKafkaEventSourceConfig = new Aws.Lambda.Inputs.EventSourceMappingAmazonManagedKafkaEventSourceConfigArgs
{
ConsumerGroupId = "lambda-consumer-group",
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingAmazonManagedKafkaEventSourceConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
.eventSourceArn(exampleAwsMskCluster.arn())
.functionName(exampleAwsLambdaFunction.arn())
.topics(
"orders",
"inventory")
.startingPosition("TRIM_HORIZON")
.batchSize(100)
.amazonManagedKafkaEventSourceConfig(EventSourceMappingAmazonManagedKafkaEventSourceConfigArgs.builder()
.consumerGroupId("lambda-consumer-group")
.build())
.build());
}
}
resources:
example:
type: aws:lambda:EventSourceMapping
properties:
eventSourceArn: ${exampleAwsMskCluster.arn}
functionName: ${exampleAwsLambdaFunction.arn}
topics:
- orders
- inventory
startingPosition: TRIM_HORIZON
batchSize: 100
amazonManagedKafkaEventSourceConfig:
consumerGroupId: lambda-consumer-group
The topics property lists Kafka topics to consume. The amazonManagedKafkaEventSourceConfig block sets the consumer group ID, which tracks read position across Lambda invocations. Lambda manages offset commits automatically.
Connect to self-managed Kafka clusters
Lambda can consume from Kafka clusters you manage outside AWS by providing bootstrap server endpoints and VPC access configuration.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const example = new aws.lambda.EventSourceMapping("example", {
functionName: exampleAwsLambdaFunction.arn,
topics: ["orders"],
startingPosition: "TRIM_HORIZON",
selfManagedEventSource: {
endpoints: {
KAFKA_BOOTSTRAP_SERVERS: "kafka1.example.com:9092,kafka2.example.com:9092",
},
},
selfManagedKafkaEventSourceConfig: {
consumerGroupId: "lambda-consumer-group",
},
sourceAccessConfigurations: [
{
type: "VPC_SUBNET",
uri: `subnet:${example1.id}`,
},
{
type: "VPC_SUBNET",
uri: `subnet:${example2.id}`,
},
{
type: "VPC_SECURITY_GROUP",
uri: `security_group:${exampleAwsSecurityGroup.id}`,
},
],
provisionedPollerConfig: {
maximumPollers: 100,
minimumPollers: 10,
pollerGroupName: "group-123",
},
});
import pulumi
import pulumi_aws as aws
example = aws.lambda_.EventSourceMapping("example",
function_name=example_aws_lambda_function["arn"],
topics=["orders"],
starting_position="TRIM_HORIZON",
self_managed_event_source={
"endpoints": {
"KAFKA_BOOTSTRAP_SERVERS": "kafka1.example.com:9092,kafka2.example.com:9092",
},
},
self_managed_kafka_event_source_config={
"consumer_group_id": "lambda-consumer-group",
},
source_access_configurations=[
{
"type": "VPC_SUBNET",
"uri": f"subnet:{example1['id']}",
},
{
"type": "VPC_SUBNET",
"uri": f"subnet:{example2['id']}",
},
{
"type": "VPC_SECURITY_GROUP",
"uri": f"security_group:{example_aws_security_group['id']}",
},
],
provisioned_poller_config={
"maximum_pollers": 100,
"minimum_pollers": 10,
"poller_group_name": "group-123",
})
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
FunctionName: pulumi.Any(exampleAwsLambdaFunction.Arn),
Topics: pulumi.StringArray{
pulumi.String("orders"),
},
StartingPosition: pulumi.String("TRIM_HORIZON"),
SelfManagedEventSource: &lambda.EventSourceMappingSelfManagedEventSourceArgs{
Endpoints: pulumi.StringMap{
"KAFKA_BOOTSTRAP_SERVERS": pulumi.String("kafka1.example.com:9092,kafka2.example.com:9092"),
},
},
SelfManagedKafkaEventSourceConfig: &lambda.EventSourceMappingSelfManagedKafkaEventSourceConfigArgs{
ConsumerGroupId: pulumi.String("lambda-consumer-group"),
},
SourceAccessConfigurations: lambda.EventSourceMappingSourceAccessConfigurationArray{
&lambda.EventSourceMappingSourceAccessConfigurationArgs{
Type: pulumi.String("VPC_SUBNET"),
Uri: pulumi.Sprintf("subnet:%v", example1.Id),
},
&lambda.EventSourceMappingSourceAccessConfigurationArgs{
Type: pulumi.String("VPC_SUBNET"),
Uri: pulumi.Sprintf("subnet:%v", example2.Id),
},
&lambda.EventSourceMappingSourceAccessConfigurationArgs{
Type: pulumi.String("VPC_SECURITY_GROUP"),
Uri: pulumi.Sprintf("security_group:%v", exampleAwsSecurityGroup.Id),
},
},
ProvisionedPollerConfig: &lambda.EventSourceMappingProvisionedPollerConfigArgs{
MaximumPollers: pulumi.Int(100),
MinimumPollers: pulumi.Int(10),
PollerGroupName: pulumi.String("group-123"),
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var example = new Aws.Lambda.EventSourceMapping("example", new()
{
FunctionName = exampleAwsLambdaFunction.Arn,
Topics = new[]
{
"orders",
},
StartingPosition = "TRIM_HORIZON",
SelfManagedEventSource = new Aws.Lambda.Inputs.EventSourceMappingSelfManagedEventSourceArgs
{
Endpoints =
{
{ "KAFKA_BOOTSTRAP_SERVERS", "kafka1.example.com:9092,kafka2.example.com:9092" },
},
},
SelfManagedKafkaEventSourceConfig = new Aws.Lambda.Inputs.EventSourceMappingSelfManagedKafkaEventSourceConfigArgs
{
ConsumerGroupId = "lambda-consumer-group",
},
SourceAccessConfigurations = new[]
{
new Aws.Lambda.Inputs.EventSourceMappingSourceAccessConfigurationArgs
{
Type = "VPC_SUBNET",
Uri = $"subnet:{example1.Id}",
},
new Aws.Lambda.Inputs.EventSourceMappingSourceAccessConfigurationArgs
{
Type = "VPC_SUBNET",
Uri = $"subnet:{example2.Id}",
},
new Aws.Lambda.Inputs.EventSourceMappingSourceAccessConfigurationArgs
{
Type = "VPC_SECURITY_GROUP",
Uri = $"security_group:{exampleAwsSecurityGroup.Id}",
},
},
ProvisionedPollerConfig = new Aws.Lambda.Inputs.EventSourceMappingProvisionedPollerConfigArgs
{
MaximumPollers = 100,
MinimumPollers = 10,
PollerGroupName = "group-123",
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingSelfManagedEventSourceArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingSelfManagedKafkaEventSourceConfigArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingSourceAccessConfigurationArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingProvisionedPollerConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
.functionName(exampleAwsLambdaFunction.arn())
.topics("orders")
.startingPosition("TRIM_HORIZON")
.selfManagedEventSource(EventSourceMappingSelfManagedEventSourceArgs.builder()
.endpoints(Map.of("KAFKA_BOOTSTRAP_SERVERS", "kafka1.example.com:9092,kafka2.example.com:9092"))
.build())
.selfManagedKafkaEventSourceConfig(EventSourceMappingSelfManagedKafkaEventSourceConfigArgs.builder()
.consumerGroupId("lambda-consumer-group")
.build())
.sourceAccessConfigurations(
EventSourceMappingSourceAccessConfigurationArgs.builder()
.type("VPC_SUBNET")
.uri(String.format("subnet:%s", example1.id()))
.build(),
EventSourceMappingSourceAccessConfigurationArgs.builder()
.type("VPC_SUBNET")
.uri(String.format("subnet:%s", example2.id()))
.build(),
EventSourceMappingSourceAccessConfigurationArgs.builder()
.type("VPC_SECURITY_GROUP")
.uri(String.format("security_group:%s", exampleAwsSecurityGroup.id()))
.build())
.provisionedPollerConfig(EventSourceMappingProvisionedPollerConfigArgs.builder()
.maximumPollers(100)
.minimumPollers(10)
.pollerGroupName("group-123")
.build())
.build());
}
}
resources:
example:
type: aws:lambda:EventSourceMapping
properties:
functionName: ${exampleAwsLambdaFunction.arn}
topics:
- orders
startingPosition: TRIM_HORIZON
selfManagedEventSource:
endpoints:
KAFKA_BOOTSTRAP_SERVERS: kafka1.example.com:9092,kafka2.example.com:9092
selfManagedKafkaEventSourceConfig:
consumerGroupId: lambda-consumer-group
sourceAccessConfigurations:
- type: VPC_SUBNET
uri: subnet:${example1.id}
- type: VPC_SUBNET
uri: subnet:${example2.id}
- type: VPC_SECURITY_GROUP
uri: security_group:${exampleAwsSecurityGroup.id}
provisionedPollerConfig:
maximumPollers: 100
minimumPollers: 10
pollerGroupName: group-123
The selfManagedEventSource block specifies Kafka bootstrap servers. The sourceAccessConfigurations array provides VPC subnet and security group references so Lambda can reach your cluster. The provisionedPollerConfig controls how many pollers Lambda allocates to read from Kafka partitions.
Process ActiveMQ messages with authentication
Amazon MQ brokers running ActiveMQ trigger Lambda functions when messages arrive in specified queues, using Secrets Manager for authentication.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const example = new aws.lambda.EventSourceMapping("example", {
eventSourceArn: exampleAwsMqBroker.arn,
functionName: exampleAwsLambdaFunction.arn,
queues: "orders",
batchSize: 10,
sourceAccessConfigurations: [{
type: "BASIC_AUTH",
uri: exampleAwsSecretsmanagerSecretVersion.arn,
}],
});
import pulumi
import pulumi_aws as aws
example = aws.lambda_.EventSourceMapping("example",
event_source_arn=example_aws_mq_broker["arn"],
function_name=example_aws_lambda_function["arn"],
queues="orders",
batch_size=10,
source_access_configurations=[{
"type": "BASIC_AUTH",
"uri": example_aws_secretsmanager_secret_version["arn"],
}])
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
EventSourceArn: pulumi.Any(exampleAwsMqBroker.Arn),
FunctionName: pulumi.Any(exampleAwsLambdaFunction.Arn),
Queues: pulumi.String("orders"),
BatchSize: pulumi.Int(10),
SourceAccessConfigurations: lambda.EventSourceMappingSourceAccessConfigurationArray{
&lambda.EventSourceMappingSourceAccessConfigurationArgs{
Type: pulumi.String("BASIC_AUTH"),
Uri: pulumi.Any(exampleAwsSecretsmanagerSecretVersion.Arn),
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var example = new Aws.Lambda.EventSourceMapping("example", new()
{
EventSourceArn = exampleAwsMqBroker.Arn,
FunctionName = exampleAwsLambdaFunction.Arn,
Queues = "orders",
BatchSize = 10,
SourceAccessConfigurations = new[]
{
new Aws.Lambda.Inputs.EventSourceMappingSourceAccessConfigurationArgs
{
Type = "BASIC_AUTH",
Uri = exampleAwsSecretsmanagerSecretVersion.Arn,
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingSourceAccessConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
.eventSourceArn(exampleAwsMqBroker.arn())
.functionName(exampleAwsLambdaFunction.arn())
.queues("orders")
.batchSize(10)
.sourceAccessConfigurations(EventSourceMappingSourceAccessConfigurationArgs.builder()
.type("BASIC_AUTH")
.uri(exampleAwsSecretsmanagerSecretVersion.arn())
.build())
.build());
}
}
resources:
example:
type: aws:lambda:EventSourceMapping
properties:
eventSourceArn: ${exampleAwsMqBroker.arn}
functionName: ${exampleAwsLambdaFunction.arn}
queues: orders
batchSize: 10
sourceAccessConfigurations:
- type: BASIC_AUTH
uri: ${exampleAwsSecretsmanagerSecretVersion.arn}
The queues property specifies which ActiveMQ queue to consume. The sourceAccessConfigurations array references a Secrets Manager secret containing broker credentials. Lambda retrieves credentials at runtime and authenticates to the broker.
Beyond these examples
These snippets focus on specific event source mapping features: stream and queue event sources (DynamoDB, Kinesis, SQS), Kafka integration (MSK and self-managed), and message filtering and batching controls. They’re intentionally minimal rather than full event-driven applications.
The examples reference pre-existing infrastructure such as Lambda functions with appropriate IAM permissions, event sources (DynamoDB tables, Kinesis streams, SQS queues, Kafka clusters, MQ brokers), VPC subnets and security groups (for Kafka and MQ), and Secrets Manager secrets (for authentication). They focus on configuring the event source mapping rather than provisioning the surrounding infrastructure.
To keep things focused, common mapping patterns are omitted, including:
- Error handling options (bisectBatchOnFunctionError, maximumRetryAttempts)
- Windowing for streaming analytics (tumblingWindowInSeconds)
- Metrics configuration (metricsConfig)
- DocumentDB change streams
These omissions are intentional: the goal is to illustrate how each event source mapping feature is wired, not provide drop-in event processing modules. See the Lambda EventSourceMapping resource reference for all available configuration options.
Let's configure AWS Lambda Event Source Mappings
Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.
Try Pulumi Cloud for FREEFrequently Asked Questions
Configuration & Source Types
eventSourceArn and amazonManagedKafkaEventSourceConfig, while self-managed Kafka uses selfManagedEventSource with bootstrap servers and requires sourceAccessConfigurations for VPC access. These configurations are mutually exclusive.selfManagedEventSource with KAFKA_BOOTSTRAP_SERVERS endpoints, configure selfManagedKafkaEventSourceConfig with a consumer group ID, and include sourceAccessConfigurations with VPC_SUBNET and VPC_SECURITY_GROUP entries.startingPosition parameter is only for stream sources (Kinesis, DynamoDB) and Kafka sources. SQS queues don’t use stream positions, so this parameter must not be provided for SQS event sources.Immutability & Updates
eventSourceArn, startingPosition, startingPositionTimestamp, topics, queues, selfManagedEventSource, amazonManagedKafkaEventSourceConfig, and selfManagedKafkaEventSourceConfig.functionName is the input property where you specify the Lambda function’s name or ARN. functionArn is a computed output property that always returns the actual ARN of the function.Batch Processing & Performance
batchSize is 100 for DynamoDB, Kinesis, MQ, and MSK sources. For SQS queues, the default is 10.maximumBatchingWindowInSeconds (0-300 seconds) to gather records before invoking the function. Records buffer until either the time window expires or batchSize is reached. This is available for stream sources (DynamoDB and Kinesis) and SQS standard queues.parallelizationFactor to process multiple batches from each shard concurrently. The value ranges from 1 (default) to 10, and is only available for stream sources.Error Handling & Retries
destinationConfig with an SQS queue, SNS topic, or S3 bucket (Kafka only) to receive failed records. This is available for stream sources (DynamoDB and Kinesis) and Kafka sources (MSK and self-managed).maximumRetryAttempts to control retries when the function returns an error. The default is -1 (retry forever), with a maximum of 10000 retries. This is only available for stream sources (DynamoDB and Kinesis).functionResponseTypes to ["ReportBatchItemFailures"] to enable AWS Lambda checkpointing. This allows your function to report which items in a batch failed, and is available for SQS and stream sources (DynamoDB and Kinesis).true, Lambda splits the batch in two and retries if the function returns an error. This is only available for stream sources (DynamoDB and Kinesis) and defaults to false.Event Filtering & Advanced Features
filterCriteria with a JSON pattern to filter events from Kinesis streams, DynamoDB streams, or SQS queues. The pattern matches against the event structure to determine which events to process.tumblingWindowInSeconds (1-900 seconds) to define a processing window for AWS Lambda streaming analytics. This is only available for stream sources (DynamoDB and Kinesis).Using a different cloud?
Explore serverless guides for other cloud providers: