The aws:lambda/eventSourceMapping:EventSourceMapping resource, part of the Pulumi AWS provider, connects Lambda functions to event sources, defining how records are batched, filtered, and delivered for processing. This guide focuses on four categories of event sources: stream sources (DynamoDB, Kinesis), queue sources (SQS), Kafka sources (MSK and self-managed), and message brokers (ActiveMQ, RabbitMQ).
Event source mappings reference existing Lambda functions and event sources. Some configurations require VPC infrastructure or Secrets Manager credentials. The examples are intentionally small. Combine them with your own Lambda functions, IAM roles, and event source infrastructure.
Process DynamoDB table changes with Lambda
Applications that need to react to database changes connect Lambda functions to DynamoDB Streams, enabling real-time processing of inserts, updates, and deletes.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const example = new aws.lambda.EventSourceMapping("example", {
eventSourceArn: exampleAwsDynamodbTable.streamArn,
functionName: exampleAwsLambdaFunction.arn,
startingPosition: "LATEST",
tags: {
Name: "dynamodb-stream-mapping",
},
});
import pulumi
import pulumi_aws as aws
example = aws.lambda_.EventSourceMapping("example",
event_source_arn=example_aws_dynamodb_table["streamArn"],
function_name=example_aws_lambda_function["arn"],
starting_position="LATEST",
tags={
"Name": "dynamodb-stream-mapping",
})
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
EventSourceArn: pulumi.Any(exampleAwsDynamodbTable.StreamArn),
FunctionName: pulumi.Any(exampleAwsLambdaFunction.Arn),
StartingPosition: pulumi.String("LATEST"),
Tags: pulumi.StringMap{
"Name": pulumi.String("dynamodb-stream-mapping"),
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var example = new Aws.Lambda.EventSourceMapping("example", new()
{
EventSourceArn = exampleAwsDynamodbTable.StreamArn,
FunctionName = exampleAwsLambdaFunction.Arn,
StartingPosition = "LATEST",
Tags =
{
{ "Name", "dynamodb-stream-mapping" },
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
.eventSourceArn(exampleAwsDynamodbTable.streamArn())
.functionName(exampleAwsLambdaFunction.arn())
.startingPosition("LATEST")
.tags(Map.of("Name", "dynamodb-stream-mapping"))
.build());
}
}
resources:
example:
type: aws:lambda:EventSourceMapping
properties:
eventSourceArn: ${exampleAwsDynamodbTable.streamArn}
functionName: ${exampleAwsLambdaFunction.arn}
startingPosition: LATEST
tags:
Name: dynamodb-stream-mapping
The eventSourceArn points to the DynamoDB table’s stream. The startingPosition determines where Lambda begins reading: LATEST processes only new records, while TRIM_HORIZON processes all available records from the stream’s retention window.
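The mapping assumes a table that already has streams enabled; without streamEnabled, the table exposes no streamArn to consume. A minimal sketch of that prerequisite, with placeholder names and key schema:

import * as aws from "@pulumi/aws";

// Hypothetical prerequisite table; attribute names and billing mode are placeholders.
const exampleAwsDynamodbTable = new aws.dynamodb.Table("example", {
    attributes: [{ name: "id", type: "S" }],
    hashKey: "id",
    billingMode: "PAY_PER_REQUEST",
    streamEnabled: true,                  // required for the event source mapping
    streamViewType: "NEW_AND_OLD_IMAGES", // deliver both old and new item images
});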
Consume Kinesis streams with batching and error handling
Real-time data pipelines use Kinesis to buffer high-throughput events. Lambda processes these streams in batches, with configurable parallelization and dead-letter queues for failed records.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const example = new aws.lambda.EventSourceMapping("example", {
eventSourceArn: exampleAwsKinesisStream.arn,
functionName: exampleAwsLambdaFunction.arn,
startingPosition: "LATEST",
batchSize: 100,
maximumBatchingWindowInSeconds: 5,
parallelizationFactor: 2,
destinationConfig: {
onFailure: {
destinationArn: dlq.arn,
},
},
});
import pulumi
import pulumi_aws as aws
example = aws.lambda_.EventSourceMapping("example",
event_source_arn=example_aws_kinesis_stream["arn"],
function_name=example_aws_lambda_function["arn"],
starting_position="LATEST",
batch_size=100,
maximum_batching_window_in_seconds=5,
parallelization_factor=2,
destination_config={
"on_failure": {
"destination_arn": dlq["arn"],
},
})
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
EventSourceArn: pulumi.Any(exampleAwsKinesisStream.Arn),
FunctionName: pulumi.Any(exampleAwsLambdaFunction.Arn),
StartingPosition: pulumi.String("LATEST"),
BatchSize: pulumi.Int(100),
MaximumBatchingWindowInSeconds: pulumi.Int(5),
ParallelizationFactor: pulumi.Int(2),
DestinationConfig: &lambda.EventSourceMappingDestinationConfigArgs{
OnFailure: &lambda.EventSourceMappingDestinationConfigOnFailureArgs{
DestinationArn: pulumi.Any(dlq.Arn),
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var example = new Aws.Lambda.EventSourceMapping("example", new()
{
EventSourceArn = exampleAwsKinesisStream.Arn,
FunctionName = exampleAwsLambdaFunction.Arn,
StartingPosition = "LATEST",
BatchSize = 100,
MaximumBatchingWindowInSeconds = 5,
ParallelizationFactor = 2,
DestinationConfig = new Aws.Lambda.Inputs.EventSourceMappingDestinationConfigArgs
{
OnFailure = new Aws.Lambda.Inputs.EventSourceMappingDestinationConfigOnFailureArgs
{
DestinationArn = dlq.Arn,
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingDestinationConfigArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingDestinationConfigOnFailureArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
.eventSourceArn(exampleAwsKinesisStream.arn())
.functionName(exampleAwsLambdaFunction.arn())
.startingPosition("LATEST")
.batchSize(100)
.maximumBatchingWindowInSeconds(5)
.parallelizationFactor(2)
.destinationConfig(EventSourceMappingDestinationConfigArgs.builder()
.onFailure(EventSourceMappingDestinationConfigOnFailureArgs.builder()
.destinationArn(dlq.arn())
.build())
.build())
.build());
}
}
resources:
example:
type: aws:lambda:EventSourceMapping
properties:
eventSourceArn: ${exampleAwsKinesisStream.arn}
functionName: ${exampleAwsLambdaFunction.arn}
startingPosition: LATEST
batchSize: 100
maximumBatchingWindowInSeconds: 5
parallelizationFactor: 2
destinationConfig:
onFailure:
destinationArn: ${dlq.arn}
Lambda retrieves up to batchSize records and waits up to maximumBatchingWindowInSeconds before invoking your function. The parallelizationFactor controls how many batches Lambda processes concurrently from each shard. When processing fails, destinationConfig routes failed records to a dead-letter queue for later analysis.
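The dlq referenced above must exist before the mapping is created. A minimal sketch, assuming an SQS queue as the failure destination (SNS topics also work for stream sources):

import * as aws from "@pulumi/aws";

// Hypothetical dead-letter queue for failed Kinesis batches. For stream
// sources the destination receives invocation metadata (shard ID, sequence
// number range) rather than the raw records, so retention is sized to leave
// time for reprocessing from the stream itself.
const dlq = new aws.sqs.Queue("kinesis-dlq", {
    messageRetentionSeconds: 1209600, // 14 days, the SQS maximum
});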
Poll SQS queues with concurrency scaling
Message-driven architectures use SQS to decouple producers from consumers. Lambda polls queues and scales the number of concurrent function invocations based on queue depth.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const example = new aws.lambda.EventSourceMapping("example", {
eventSourceArn: exampleAwsSqsQueue.arn,
functionName: exampleAwsLambdaFunction.arn,
batchSize: 10,
scalingConfig: {
maximumConcurrency: 100,
},
});
import pulumi
import pulumi_aws as aws
example = aws.lambda_.EventSourceMapping("example",
event_source_arn=example_aws_sqs_queue["arn"],
function_name=example_aws_lambda_function["arn"],
batch_size=10,
scaling_config={
"maximum_concurrency": 100,
})
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
EventSourceArn: pulumi.Any(exampleAwsSqsQueue.Arn),
FunctionName: pulumi.Any(exampleAwsLambdaFunction.Arn),
BatchSize: pulumi.Int(10),
ScalingConfig: &lambda.EventSourceMappingScalingConfigArgs{
MaximumConcurrency: pulumi.Int(100),
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var example = new Aws.Lambda.EventSourceMapping("example", new()
{
EventSourceArn = exampleAwsSqsQueue.Arn,
FunctionName = exampleAwsLambdaFunction.Arn,
BatchSize = 10,
ScalingConfig = new Aws.Lambda.Inputs.EventSourceMappingScalingConfigArgs
{
MaximumConcurrency = 100,
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingScalingConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
.eventSourceArn(exampleAwsSqsQueue.arn())
.functionName(exampleAwsLambdaFunction.arn())
.batchSize(10)
.scalingConfig(EventSourceMappingScalingConfigArgs.builder()
.maximumConcurrency(100)
.build())
.build());
}
}
resources:
example:
type: aws:lambda:EventSourceMapping
properties:
eventSourceArn: ${exampleAwsSqsQueue.arn}
functionName: ${exampleAwsLambdaFunction.arn}
batchSize: 10
scalingConfig:
maximumConcurrency: 100
The scalingConfig property sets maximumConcurrency, which controls how many function instances Lambda runs simultaneously. As queue depth increases, Lambda automatically scales up to this limit. The batchSize determines how many messages each invocation receives.
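The source queue's settings interact with the mapping: AWS guidance is to set the queue's visibility timeout to at least six times the function timeout so an in-flight batch is not redelivered while the first invocation is still running. A hedged sketch of the placeholder queue, assuming a 30-second function timeout:

import * as aws from "@pulumi/aws";

// Hypothetical source queue sized for a 30-second function timeout.
const exampleAwsSqsQueue = new aws.sqs.Queue("example", {
    visibilityTimeoutSeconds: 6 * 30, // at least 6x the function timeout
});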
Filter SQS messages before invoking Lambda
High-volume queues often contain messages that don’t require processing. Event filtering lets Lambda skip irrelevant messages without invoking your function, reducing costs and latency.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const example = new aws.lambda.EventSourceMapping("example", {
eventSourceArn: exampleAwsSqsQueue.arn,
functionName: exampleAwsLambdaFunction.arn,
filterCriteria: {
filters: [{
pattern: JSON.stringify({
body: {
Temperature: [{
numeric: [
">",
0,
"<=",
100,
],
}],
Location: ["New York"],
},
}),
}],
},
});
import pulumi
import json
import pulumi_aws as aws
example = aws.lambda_.EventSourceMapping("example",
event_source_arn=example_aws_sqs_queue["arn"],
function_name=example_aws_lambda_function["arn"],
filter_criteria={
"filters": [{
"pattern": json.dumps({
"body": {
"Temperature": [{
"numeric": [
">",
0,
"<=",
100,
],
}],
"Location": ["New York"],
},
}),
}],
})
package main
import (
"encoding/json"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
tmpJSON0, err := json.Marshal(map[string]interface{}{
"body": map[string]interface{}{
"Temperature": []map[string]interface{}{
map[string]interface{}{
"numeric": []interface{}{
">",
0,
"<=",
100,
},
},
},
"Location": []string{
"New York",
},
},
})
if err != nil {
return err
}
json0 := string(tmpJSON0)
_, err = lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
EventSourceArn: pulumi.Any(exampleAwsSqsQueue.Arn),
FunctionName: pulumi.Any(exampleAwsLambdaFunction.Arn),
FilterCriteria: &lambda.EventSourceMappingFilterCriteriaArgs{
Filters: lambda.EventSourceMappingFilterCriteriaFilterArray{
&lambda.EventSourceMappingFilterCriteriaFilterArgs{
Pattern: pulumi.String(json0),
},
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var example = new Aws.Lambda.EventSourceMapping("example", new()
{
EventSourceArn = exampleAwsSqsQueue.Arn,
FunctionName = exampleAwsLambdaFunction.Arn,
FilterCriteria = new Aws.Lambda.Inputs.EventSourceMappingFilterCriteriaArgs
{
Filters = new[]
{
new Aws.Lambda.Inputs.EventSourceMappingFilterCriteriaFilterArgs
{
Pattern = JsonSerializer.Serialize(new Dictionary<string, object?>
{
["body"] = new Dictionary<string, object?>
{
["Temperature"] = new[]
{
new Dictionary<string, object?>
{
["numeric"] = new object?[]
{
">",
0,
"<=",
100,
},
},
},
["Location"] = new[]
{
"New York",
},
},
}),
},
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingFilterCriteriaArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingFilterCriteriaFilterArgs;
import static com.pulumi.codegen.internal.Serialization.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
.eventSourceArn(exampleAwsSqsQueue.arn())
.functionName(exampleAwsLambdaFunction.arn())
.filterCriteria(EventSourceMappingFilterCriteriaArgs.builder()
.filters(EventSourceMappingFilterCriteriaFilterArgs.builder()
.pattern(serializeJson(
jsonObject(
jsonProperty("body", jsonObject(
jsonProperty("Temperature", jsonArray(jsonObject(
jsonProperty("numeric", jsonArray(
">",
0,
"<=",
100
))
))),
jsonProperty("Location", jsonArray("New York"))
))
)))
.build())
.build())
.build());
}
}
resources:
example:
type: aws:lambda:EventSourceMapping
properties:
eventSourceArn: ${exampleAwsSqsQueue.arn}
functionName: ${exampleAwsLambdaFunction.arn}
filterCriteria:
filters:
- pattern:
fn::toJSON:
body:
Temperature:
- numeric:
- '>'
- 0
- <=
- 100
Location:
- New York
The filterCriteria property defines a JSON pattern that Lambda evaluates against each message body. Only messages matching the pattern trigger function invocations. In this configuration, Lambda processes only messages with Temperature between 0 and 100 and Location set to “New York”.
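To make the matching behavior concrete, here are two hypothetical message bodies evaluated against that pattern:

// Matches: Temperature is within (0, 100] and Location equals "New York",
// so this message is delivered to the function.
const delivered = { Temperature: 72, Location: "New York" };

// Dropped without invoking the function: Temperature fails the numeric
// range, even though Location matches.
const skipped = { Temperature: 150, Location: "New York" };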
Consume Kafka topics from managed MSK clusters
Event-driven systems built on Kafka use MSK for managed cluster operations. Lambda subscribes to topics and processes records in batches, with consumer group coordination handled automatically.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const example = new aws.lambda.EventSourceMapping("example", {
eventSourceArn: exampleAwsMskCluster.arn,
functionName: exampleAwsLambdaFunction.arn,
topics: [
"orders",
"inventory",
],
startingPosition: "TRIM_HORIZON",
batchSize: 100,
amazonManagedKafkaEventSourceConfig: {
consumerGroupId: "lambda-consumer-group",
},
});
import pulumi
import pulumi_aws as aws
example = aws.lambda_.EventSourceMapping("example",
event_source_arn=example_aws_msk_cluster["arn"],
function_name=example_aws_lambda_function["arn"],
topics=[
"orders",
"inventory",
],
starting_position="TRIM_HORIZON",
batch_size=100,
amazon_managed_kafka_event_source_config={
"consumer_group_id": "lambda-consumer-group",
})
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
EventSourceArn: pulumi.Any(exampleAwsMskCluster.Arn),
FunctionName: pulumi.Any(exampleAwsLambdaFunction.Arn),
Topics: pulumi.StringArray{
pulumi.String("orders"),
pulumi.String("inventory"),
},
StartingPosition: pulumi.String("TRIM_HORIZON"),
BatchSize: pulumi.Int(100),
AmazonManagedKafkaEventSourceConfig: &lambda.EventSourceMappingAmazonManagedKafkaEventSourceConfigArgs{
ConsumerGroupId: pulumi.String("lambda-consumer-group"),
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var example = new Aws.Lambda.EventSourceMapping("example", new()
{
EventSourceArn = exampleAwsMskCluster.Arn,
FunctionName = exampleAwsLambdaFunction.Arn,
Topics = new[]
{
"orders",
"inventory",
},
StartingPosition = "TRIM_HORIZON",
BatchSize = 100,
AmazonManagedKafkaEventSourceConfig = new Aws.Lambda.Inputs.EventSourceMappingAmazonManagedKafkaEventSourceConfigArgs
{
ConsumerGroupId = "lambda-consumer-group",
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingAmazonManagedKafkaEventSourceConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
.eventSourceArn(exampleAwsMskCluster.arn())
.functionName(exampleAwsLambdaFunction.arn())
.topics(
"orders",
"inventory")
.startingPosition("TRIM_HORIZON")
.batchSize(100)
.amazonManagedKafkaEventSourceConfig(EventSourceMappingAmazonManagedKafkaEventSourceConfigArgs.builder()
.consumerGroupId("lambda-consumer-group")
.build())
.build());
}
}
resources:
example:
type: aws:lambda:EventSourceMapping
properties:
eventSourceArn: ${exampleAwsMskCluster.arn}
functionName: ${exampleAwsLambdaFunction.arn}
topics:
- orders
- inventory
startingPosition: TRIM_HORIZON
batchSize: 100
amazonManagedKafkaEventSourceConfig:
consumerGroupId: lambda-consumer-group
The topics property lists Kafka topics to consume. The amazonManagedKafkaEventSourceConfig block sets the consumerGroupId, which Lambda uses for offset management and coordination. The startingPosition determines whether Lambda reads from the beginning (TRIM_HORIZON) or only new records (LATEST).
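The mapping also assumes the function's execution role can read from the cluster. A minimal sketch attaching the AWS managed policy for MSK access, where exampleRole is a placeholder for the function's existing IAM role:

import * as aws from "@pulumi/aws";

// Hypothetical role reference. The managed policy grants the kafka and EC2
// networking permissions Lambda needs to poll the cluster.
const mskAccess = new aws.iam.RolePolicyAttachment("msk-access", {
    role: exampleRole.name,
    policyArn: "arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole",
});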
Connect to self-hosted Kafka clusters
Organizations running their own Kafka infrastructure can connect Lambda to self-managed clusters by providing bootstrap servers and VPC access configuration.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const example = new aws.lambda.EventSourceMapping("example", {
functionName: exampleAwsLambdaFunction.arn,
topics: ["orders"],
startingPosition: "TRIM_HORIZON",
selfManagedEventSource: {
endpoints: {
KAFKA_BOOTSTRAP_SERVERS: "kafka1.example.com:9092,kafka2.example.com:9092",
},
},
selfManagedKafkaEventSourceConfig: {
consumerGroupId: "lambda-consumer-group",
},
sourceAccessConfigurations: [
{
type: "VPC_SUBNET",
uri: `subnet:${example1.id}`,
},
{
type: "VPC_SUBNET",
uri: `subnet:${example2.id}`,
},
{
type: "VPC_SECURITY_GROUP",
uri: `security_group:${exampleAwsSecurityGroup.id}`,
},
],
provisionedPollerConfig: {
maximumPollers: 100,
minimumPollers: 10,
},
});
import pulumi
import pulumi_aws as aws
example = aws.lambda_.EventSourceMapping("example",
function_name=example_aws_lambda_function["arn"],
topics=["orders"],
starting_position="TRIM_HORIZON",
self_managed_event_source={
"endpoints": {
"KAFKA_BOOTSTRAP_SERVERS": "kafka1.example.com:9092,kafka2.example.com:9092",
},
},
self_managed_kafka_event_source_config={
"consumer_group_id": "lambda-consumer-group",
},
source_access_configurations=[
{
"type": "VPC_SUBNET",
"uri": f"subnet:{example1['id']}",
},
{
"type": "VPC_SUBNET",
"uri": f"subnet:{example2['id']}",
},
{
"type": "VPC_SECURITY_GROUP",
"uri": f"security_group:{example_aws_security_group['id']}",
},
],
provisioned_poller_config={
"maximum_pollers": 100,
"minimum_pollers": 10,
})
package main
import (
"fmt"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
FunctionName: pulumi.Any(exampleAwsLambdaFunction.Arn),
Topics: pulumi.StringArray{
pulumi.String("orders"),
},
StartingPosition: pulumi.String("TRIM_HORIZON"),
SelfManagedEventSource: &lambda.EventSourceMappingSelfManagedEventSourceArgs{
Endpoints: pulumi.StringMap{
"KAFKA_BOOTSTRAP_SERVERS": pulumi.String("kafka1.example.com:9092,kafka2.example.com:9092"),
},
},
SelfManagedKafkaEventSourceConfig: &lambda.EventSourceMappingSelfManagedKafkaEventSourceConfigArgs{
ConsumerGroupId: pulumi.String("lambda-consumer-group"),
},
SourceAccessConfigurations: lambda.EventSourceMappingSourceAccessConfigurationArray{
&lambda.EventSourceMappingSourceAccessConfigurationArgs{
Type: pulumi.String("VPC_SUBNET"),
Uri: pulumi.Sprintf("subnet:%v", example1.Id),
},
&lambda.EventSourceMappingSourceAccessConfigurationArgs{
Type: pulumi.String("VPC_SUBNET"),
Uri: pulumi.Sprintf("subnet:%v", example2.Id),
},
&lambda.EventSourceMappingSourceAccessConfigurationArgs{
Type: pulumi.String("VPC_SECURITY_GROUP"),
Uri: pulumi.Sprintf("security_group:%v", exampleAwsSecurityGroup.Id),
},
},
ProvisionedPollerConfig: &lambda.EventSourceMappingProvisionedPollerConfigArgs{
MaximumPollers: pulumi.Int(100),
MinimumPollers: pulumi.Int(10),
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var example = new Aws.Lambda.EventSourceMapping("example", new()
{
FunctionName = exampleAwsLambdaFunction.Arn,
Topics = new[]
{
"orders",
},
StartingPosition = "TRIM_HORIZON",
SelfManagedEventSource = new Aws.Lambda.Inputs.EventSourceMappingSelfManagedEventSourceArgs
{
Endpoints =
{
{ "KAFKA_BOOTSTRAP_SERVERS", "kafka1.example.com:9092,kafka2.example.com:9092" },
},
},
SelfManagedKafkaEventSourceConfig = new Aws.Lambda.Inputs.EventSourceMappingSelfManagedKafkaEventSourceConfigArgs
{
ConsumerGroupId = "lambda-consumer-group",
},
SourceAccessConfigurations = new[]
{
new Aws.Lambda.Inputs.EventSourceMappingSourceAccessConfigurationArgs
{
Type = "VPC_SUBNET",
Uri = $"subnet:{example1.Id}",
},
new Aws.Lambda.Inputs.EventSourceMappingSourceAccessConfigurationArgs
{
Type = "VPC_SUBNET",
Uri = $"subnet:{example2.Id}",
},
new Aws.Lambda.Inputs.EventSourceMappingSourceAccessConfigurationArgs
{
Type = "VPC_SECURITY_GROUP",
Uri = $"security_group:{exampleAwsSecurityGroup.Id}",
},
},
ProvisionedPollerConfig = new Aws.Lambda.Inputs.EventSourceMappingProvisionedPollerConfigArgs
{
MaximumPollers = 100,
MinimumPollers = 10,
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingSelfManagedEventSourceArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingSelfManagedKafkaEventSourceConfigArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingSourceAccessConfigurationArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingProvisionedPollerConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
.functionName(exampleAwsLambdaFunction.arn())
.topics("orders")
.startingPosition("TRIM_HORIZON")
.selfManagedEventSource(EventSourceMappingSelfManagedEventSourceArgs.builder()
.endpoints(Map.of("KAFKA_BOOTSTRAP_SERVERS", "kafka1.example.com:9092,kafka2.example.com:9092"))
.build())
.selfManagedKafkaEventSourceConfig(EventSourceMappingSelfManagedKafkaEventSourceConfigArgs.builder()
.consumerGroupId("lambda-consumer-group")
.build())
.sourceAccessConfigurations(
EventSourceMappingSourceAccessConfigurationArgs.builder()
.type("VPC_SUBNET")
.uri(String.format("subnet:%s", example1.id()))
.build(),
EventSourceMappingSourceAccessConfigurationArgs.builder()
.type("VPC_SUBNET")
.uri(String.format("subnet:%s", example2.id()))
.build(),
EventSourceMappingSourceAccessConfigurationArgs.builder()
.type("VPC_SECURITY_GROUP")
.uri(String.format("security_group:%s", exampleAwsSecurityGroup.id()))
.build())
.provisionedPollerConfig(EventSourceMappingProvisionedPollerConfigArgs.builder()
.maximumPollers(100)
.minimumPollers(10)
.build())
.build());
}
}
resources:
example:
type: aws:lambda:EventSourceMapping
properties:
functionName: ${exampleAwsLambdaFunction.arn}
topics:
- orders
startingPosition: TRIM_HORIZON
selfManagedEventSource:
endpoints:
KAFKA_BOOTSTRAP_SERVERS: kafka1.example.com:9092,kafka2.example.com:9092
selfManagedKafkaEventSourceConfig:
consumerGroupId: lambda-consumer-group
sourceAccessConfigurations:
- type: VPC_SUBNET
uri: subnet:${example1.id}
- type: VPC_SUBNET
uri: subnet:${example2.id}
- type: VPC_SECURITY_GROUP
uri: security_group:${exampleAwsSecurityGroup.id}
provisionedPollerConfig:
maximumPollers: 100
minimumPollers: 10
The selfManagedEventSource block specifies Kafka bootstrap servers via the KAFKA_BOOTSTRAP_SERVERS endpoint. The sourceAccessConfigurations array provides VPC subnet and security group references, enabling Lambda to reach the cluster. The provisionedPollerConfig controls how many pollers Lambda maintains for consuming records.
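This example assumes a cluster reachable over the VPC without authentication. If the cluster enforces SASL/SCRAM, store credentials in Secrets Manager and add one more sourceAccessConfigurations entry; a hedged sketch with placeholder credentials:

import * as aws from "@pulumi/aws";

// Hypothetical credentials secret. Lambda expects a JSON object with
// "username" and "password" keys for SASL authentication.
const kafkaAuthSecret = new aws.secretsmanager.Secret("kafka-sasl-credentials");
new aws.secretsmanager.SecretVersion("kafka-sasl-credentials", {
    secretId: kafkaAuthSecret.id,
    secretString: JSON.stringify({ username: "app-user", password: "change-me" }),
});
// Then add to the mapping's sourceAccessConfigurations:
//   { type: "SASL_SCRAM_512_AUTH", uri: kafkaAuthSecret.arn }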
Process ActiveMQ messages with authentication
Message brokers like ActiveMQ require authentication credentials. Lambda retrieves these from Secrets Manager when establishing connections to broker queues.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const example = new aws.lambda.EventSourceMapping("example", {
eventSourceArn: exampleAwsMqBroker.arn,
functionName: exampleAwsLambdaFunction.arn,
queues: "orders",
batchSize: 10,
sourceAccessConfigurations: [{
type: "BASIC_AUTH",
uri: exampleAwsSecretsmanagerSecretVersion.arn,
}],
});
import pulumi
import pulumi_aws as aws
example = aws.lambda_.EventSourceMapping("example",
event_source_arn=example_aws_mq_broker["arn"],
function_name=example_aws_lambda_function["arn"],
queues="orders",
batch_size=10,
source_access_configurations=[{
"type": "BASIC_AUTH",
"uri": example_aws_secretsmanager_secret_version["arn"],
}])
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := lambda.NewEventSourceMapping(ctx, "example", &lambda.EventSourceMappingArgs{
EventSourceArn: pulumi.Any(exampleAwsMqBroker.Arn),
FunctionName: pulumi.Any(exampleAwsLambdaFunction.Arn),
Queues: pulumi.String("orders"),
BatchSize: pulumi.Int(10),
SourceAccessConfigurations: lambda.EventSourceMappingSourceAccessConfigurationArray{
&lambda.EventSourceMappingSourceAccessConfigurationArgs{
Type: pulumi.String("BASIC_AUTH"),
Uri: pulumi.Any(exampleAwsSecretsmanagerSecretVersion.Arn),
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var example = new Aws.Lambda.EventSourceMapping("example", new()
{
EventSourceArn = exampleAwsMqBroker.Arn,
FunctionName = exampleAwsLambdaFunction.Arn,
Queues = "orders",
BatchSize = 10,
SourceAccessConfigurations = new[]
{
new Aws.Lambda.Inputs.EventSourceMappingSourceAccessConfigurationArgs
{
Type = "BASIC_AUTH",
Uri = exampleAwsSecretsmanagerSecretVersion.Arn,
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.lambda.EventSourceMapping;
import com.pulumi.aws.lambda.EventSourceMappingArgs;
import com.pulumi.aws.lambda.inputs.EventSourceMappingSourceAccessConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new EventSourceMapping("example", EventSourceMappingArgs.builder()
.eventSourceArn(exampleAwsMqBroker.arn())
.functionName(exampleAwsLambdaFunction.arn())
.queues("orders")
.batchSize(10)
.sourceAccessConfigurations(EventSourceMappingSourceAccessConfigurationArgs.builder()
.type("BASIC_AUTH")
.uri(exampleAwsSecretsmanagerSecretVersion.arn())
.build())
.build());
}
}
resources:
example:
type: aws:lambda:EventSourceMapping
properties:
eventSourceArn: ${exampleAwsMqBroker.arn}
functionName: ${exampleAwsLambdaFunction.arn}
queues: orders
batchSize: 10
sourceAccessConfigurations:
- type: BASIC_AUTH
uri: ${exampleAwsSecretsmanagerSecretVersion.arn}
The queues property specifies which ActiveMQ queue to consume. The sourceAccessConfigurations array includes a BASIC_AUTH entry pointing to a Secrets Manager secret containing broker credentials. Lambda retrieves these credentials automatically when connecting to the broker.
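RabbitMQ brokers need one more entry than ActiveMQ: a virtual host. A hedged sketch of the RabbitMQ variant, reusing the placeholder broker, function, and secret references from above ("/production" is a hypothetical vhost name):

import * as aws from "@pulumi/aws";

// Hypothetical RabbitMQ mapping: same BASIC_AUTH credentials, plus a
// VIRTUAL_HOST entry naming the vhost to consume from.
const rabbitExample = new aws.lambda.EventSourceMapping("rabbit-example", {
    eventSourceArn: exampleAwsMqBroker.arn, // assumes a RabbitMQ aws.mq.Broker
    functionName: exampleAwsLambdaFunction.arn,
    queues: "orders",
    batchSize: 10,
    sourceAccessConfigurations: [
        { type: "BASIC_AUTH", uri: exampleAwsSecretsmanagerSecretVersion.arn },
        { type: "VIRTUAL_HOST", uri: "/production" },
    ],
});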
Beyond these examples
These snippets focus on specific event source mapping features: stream and queue event sources (DynamoDB, Kinesis, SQS), Kafka integration (MSK and self-managed), and message broker connectivity (ActiveMQ, RabbitMQ). They're intentionally minimal rather than full event-driven applications.
The examples reference pre-existing infrastructure such as Lambda functions, event sources (streams, queues, Kafka clusters, message brokers), VPC subnets and security groups (for self-managed Kafka), and Secrets Manager secrets (for authenticated sources). They focus on configuring the event source mapping rather than provisioning the surrounding infrastructure.
To keep things focused, common event source mapping patterns are omitted, including:
- Tumbling windows for streaming analytics (tumblingWindowInSeconds)
- Batch splitting on errors (bisectBatchOnFunctionError)
- Record age and retry limits (maximumRecordAgeInSeconds, maximumRetryAttempts)
- CloudWatch metrics configuration (metricsConfig)
- KMS encryption for filter criteria (kmsKeyArn)
These omissions are intentional: the goal is to illustrate how each event source type is wired, not provide drop-in event processing modules. See the Lambda EventSourceMapping resource reference for all available configuration options.
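For readers who need the omitted stream-hardening options, a hedged sketch combining several of them on the Kinesis mapping from earlier, with the same placeholder stream, function, and dlq references:

import * as aws from "@pulumi/aws";

// Hypothetical hardened Kinesis mapping showing the omitted options together.
const hardened = new aws.lambda.EventSourceMapping("hardened", {
    eventSourceArn: exampleAwsKinesisStream.arn,
    functionName: exampleAwsLambdaFunction.arn,
    startingPosition: "LATEST",
    tumblingWindowInSeconds: 60,      // one-minute aggregation windows
    bisectBatchOnFunctionError: true, // split failing batches to isolate bad records
    maximumRetryAttempts: 3,          // give up on a batch after three retries
    maximumRecordAgeInSeconds: 3600,  // skip records older than an hour
    destinationConfig: {
        onFailure: {
            destinationArn: dlq.arn,
        },
    },
});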
Frequently Asked Questions
Configuration & Compatibility
Several properties force replacement when changed: eventSourceArn, queues, topics, startingPosition, startingPositionTimestamp, amazonManagedKafkaEventSourceConfig, and selfManagedKafkaEventSourceConfig.
amazonManagedKafkaEventSourceConfig is incompatible with selfManagedEventSource and selfManagedKafkaEventSourceConfig: use amazonManagedKafkaEventSourceConfig for MSK and selfManagedKafkaEventSourceConfig for self-managed Kafka.
startingPosition is only required for stream sources such as Kinesis, DynamoDB, MSK, and self-managed Kafka.
Feature availability varies by source:
- Stream sources only (DynamoDB/Kinesis): bisectBatchOnFunctionError, maximumRecordAgeInSeconds, maximumRetryAttempts, parallelizationFactor, tumblingWindowInSeconds
- Stream sources and Kafka: destinationConfig
- SQS only: scalingConfig
- MSK/self-managed Kafka only: provisionedPollerConfig
- Stream sources and SQS: maximumBatchingWindowInSeconds, functionResponseTypes, metricsConfig
Event Sources & Setup
Self-managed Kafka clusters are configured through selfManagedEventSource with KAFKA_BOOTSTRAP_SERVERS endpoints, selfManagedKafkaEventSourceConfig with a consumerGroupId, and sourceAccessConfigurations for VPC access (subnets and security groups).
For broker authentication, add a sourceAccessConfigurations entry with type BASIC_AUTH pointing to a Secrets Manager secret ARN. For RabbitMQ, also include a VIRTUAL_HOST configuration.
functionName is your input (it accepts a name or an ARN), while functionArn is a computed output showing the actual ARN that Lambda uses for the event source mapping.
Batch Processing & Performance
Set parallelizationFactor (1-10) for stream sources (DynamoDB and Kinesis); the default is 1, meaning sequential processing per shard.
Set maximumBatchingWindowInSeconds (0-300) for stream sources and SQS standard queues; records buffer until this timeout expires or batchSize is reached.
Error Handling & Retries
maximumRetryAttempts defaults to -1 (retry forever) with a maximum of 10000. maximumRecordAgeInSeconds defaults to -1 (no age limit) or can be set between 60 and 604800 seconds.
Configure destinationConfig with an SQS queue, SNS topic, or S3 bucket (Kafka only) for failed records. This is available for stream sources (DynamoDB/Kinesis) and Kafka sources (MSK/self-managed).
Advanced Features
Define filterCriteria with a JSON pattern for Kinesis, DynamoDB, or SQS sources. The pattern can match on message body fields, as shown in the SQS filtering example with Temperature and Location filters.
Use scalingConfig with maximumConcurrency to limit how many concurrent Lambda invocations process messages from the SQS queue.