Real-time Data Processing with AWS Lambda and Serverless Application Repository
Real-time data processing on AWS can be achieved by combining AWS Lambda, which runs code in response to events, with Amazon Kinesis Data Streams, which carries the real-time data. For deploying serverless applications, the AWS Serverless Application Repository offers a collection of prebuilt applications that can be deployed into your account.
In a real-time data processing scenario using AWS Lambda, you set up a Kinesis stream to ingest the data, then create a Lambda function that uses this stream as its event source. Lambda polls the stream and invokes the function with batches of new records as they arrive, so the function processes data shortly after it is written.
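To make the ingestion side concrete, here is a minimal sketch of a producer that writes one JSON event to the stream. The helper name `put_event` is hypothetical, and the `client` argument is assumed to behave like `boto3.client("kinesis")`; it is passed in so the function can be tried without AWS credentials.

```python
import json
from typing import Any


def put_event(client: Any, stream_name: str, payload: dict, partition_key: str) -> dict:
    """Serialize one event as JSON and write it to a Kinesis stream.

    `client` is assumed to expose put_record() like boto3.client("kinesis").
    """
    data = json.dumps(payload).encode("utf-8")
    # Records that share a partition key go to the same shard,
    # which keeps them in order relative to each other.
    return client.put_record(
        StreamName=stream_name,
        Data=data,
        PartitionKey=partition_key,
    )
```

With boto3 installed and credentials configured, a call might look like `put_event(boto3.client("kinesis"), stream_name, {"sensor": "a1", "value": 42}, "a1")`, where `stream_name` is the physical name of your stream and the payload is purely illustrative.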
In this program, we will create the AWS Kinesis stream and the AWS Lambda function. We will also create the necessary roles and permissions required for the Lambda function to access the Kinesis stream.
The `aws.kinesis.Stream` resource creates a Kinesis stream that you can use to ingest real-time data that the Lambda function will process. The `aws.lambda_.Function` resource represents the Lambda function. For the Lambda function to access the Kinesis stream, we create an IAM role with the necessary permissions using `aws.iam.Role` and its related policies.

Here's a Pulumi program in Python that demonstrates how to set this up:
```python
import pulumi
import pulumi_aws as aws

# Create an IAM role that allows the Lambda function to access the Kinesis stream and CloudWatch Logs.
lambda_role = aws.iam.Role("lambdaRole",
    assume_role_policy="""{
        "Version": "2012-10-17",
        "Statement": [{
            "Action": "sts:AssumeRole",
            "Effect": "Allow",
            "Principal": {
                "Service": "lambda.amazonaws.com"
            }
        }]
    }"""
)

# Attach the AWSLambdaKinesisExecutionRole managed policy to the IAM role.
lambda_policy_attachment = aws.iam.RolePolicyAttachment("lambdaPolicyAttachment",
    role=lambda_role.name,
    policy_arn="arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole"
)

# Create a Kinesis stream to ingest real-time data.
kinesis_stream = aws.kinesis.Stream("kinesisStream",
    shard_count=1,
    retention_period=24
)

# Create the Lambda function that will process the real-time data.
lambda_function = aws.lambda_.Function("lambdaFunction",
    # python3.8 is past end of support on Lambda; any currently supported Python runtime works.
    runtime="python3.12",
    code=pulumi.FileArchive("./lambda"),
    handler="main.handler",  # Assuming 'main' is the filename and 'handler' is the function within that file.
    role=lambda_role.arn,
    environment={
        "variables": {
            "KINESIS_STREAM": kinesis_stream.name
        }
    }
)

# Connect the stream to the function. aws.lambda_.Function has no
# event_source_mappings argument, so the trigger is a separate resource.
kinesis_trigger = aws.lambda_.EventSourceMapping("kinesisTrigger",
    event_source_arn=kinesis_stream.arn,
    function_name=lambda_function.arn,
    starting_position="LATEST",
    # Wait for the read permissions to be attached before creating the mapping.
    opts=pulumi.ResourceOptions(depends_on=[lambda_policy_attachment])
)

# Create a CloudWatch Log Group for the Lambda function logs.
log_group = aws.cloudwatch.LogGroup("logGroup",
    name=pulumi.Output.concat("/aws/lambda/", lambda_function.name)
)

# Export the name of the Kinesis stream and the Lambda function.
pulumi.export("kinesis_stream_name", kinesis_stream.name)
pulumi.export("lambda_function_name", lambda_function.name)
```
In the above code:
- We start by creating an IAM role that our Lambda function will assume. Its trust policy allows the Lambda service (`lambda.amazonaws.com`) to assume the role.
- We attach the `AWSLambdaKinesisExecutionRole` managed policy to the IAM role, which grants the permissions the Lambda function needs to read from the Kinesis stream and to write logs to CloudWatch Logs.
- A Kinesis stream with the Pulumi resource name `kinesisStream` is created with a single shard and a retention period of 24 hours (the minimum).
- An AWS Lambda function with the resource name `lambdaFunction` is defined. Its code is the contents of the `./lambda` directory, which Pulumi packages as a .zip archive on upload, and `handler` names the entry point that Lambda invokes. The function is associated with the IAM role we created and configured to read events from the Kinesis stream.
- We create a CloudWatch Log Group (resource name `logGroup`) to store logs generated by our Lambda function.
- Finally, we export the names of the Kinesis stream and the Lambda function as stack outputs, providing an easy way to query these resources' names with the Pulumi CLI.
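As an aside on the trust policy from the first bullet: instead of an inline JSON string, the same document can be built from a Python dict and serialized, which avoids quoting mistakes. This is only a sketch of that alternative; the function name `lambda_trust_policy` is hypothetical, while the policy contents are the ones used in the program.

```python
import json


def lambda_trust_policy() -> str:
    """Return the trust policy that lets the Lambda service assume a role."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Action": "sts:AssumeRole",
                "Effect": "Allow",
                "Principal": {"Service": "lambda.amazonaws.com"},
            }
        ],
    }
    # IAM expects the policy as a JSON string, not a dict.
    return json.dumps(policy)
```

The result could then be passed as `assume_role_policy=lambda_trust_policy()` when creating the `aws.iam.Role`.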
Before running the Pulumi program, make sure you have the AWS CLI configured with your credentials and the Pulumi CLI installed. You should also have a directory named `lambda` at the root of your Pulumi project containing a file named `main.py` with the Lambda function code. The function in that file must be named `handler`, so that it matches the `main.handler` setting in the program.

This Pulumi program will provision the necessary AWS resources for real-time data processing using AWS Lambda and Kinesis. The integration allows your Lambda function to be automatically triggered to process data as it arrives in the data stream.
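For the `lambda/main.py` file mentioned above, a minimal handler might look like the following sketch. It assumes producers write JSON payloads (falling back to plain text otherwise) and simply prints each record; the return value and log format are illustrative choices, not something the original program prescribes.

```python
import base64
import json
import os


def handler(event, context):
    """Decode and log each Kinesis record in the batch passed by Lambda."""
    # Set by the Pulumi program through the function's environment variables.
    stream = os.environ.get("KINESIS_STREAM", "unknown")
    processed = 0
    for record in event.get("Records", []):
        kinesis = record["kinesis"]
        # Kinesis payloads arrive base64-encoded in the event.
        raw = base64.b64decode(kinesis["data"])
        try:
            payload = json.loads(raw)
        except ValueError:
            # Not valid JSON (or not valid UTF-8): keep it as text instead.
            payload = raw.decode("utf-8", errors="replace")
        print(f"[{stream}] key={kinesis.get('partitionKey')} payload={payload}")
        processed += 1
    return {"processed": processed}
```

Output from `print` ends up in the function's CloudWatch log group, which is a quick way to confirm that records are flowing from the stream to the function.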