Event-Driven Data Ingestion for Machine Learning Pipelines
Event-driven data ingestion is a key component of many machine learning pipelines. It lets a system react automatically as new data arrives, processing it and feeding it into machine learning models. To show how to build an event-driven ingestion system on AWS with Pulumi, I'll walk you through a pipeline that triggers an AWS Lambda function whenever a new file is uploaded to an S3 bucket.
Here's the outline of what we're going to build:
- AWS S3 Bucket: This is the storage location where our data will land. We'll configure this bucket to trigger an event whenever a new file is uploaded.
- AWS Lambda Function: This serverless function is invoked by the S3 upload event. It can process the data itself or hand it off to another service for further processing or analysis.
- IAM Role and Policy: The Lambda function needs permission to read from the S3 bucket. We'll create a role and an attached policy that grant only the permissions it requires.
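One way to keep that policy minimal is to build it as a plain Python dictionary and serialize it with `json.dumps`, which guarantees well-formed JSON. The sketch below is an assumption-laden illustration: `read_only_bucket_policy` is a hypothetical helper, and it scopes each action to the resource it actually applies to (`s3:ListBucket` to the bucket ARN, `s3:GetObject` to the objects under `<arn>/*`).

```python
import json


def read_only_bucket_policy(bucket_arn: str) -> str:
    """Return an IAM policy document (as JSON) granting read-only access to one bucket.

    Hypothetical helper: s3:ListBucket is scoped to the bucket ARN itself,
    while s3:GetObject is scoped to the objects under it ("<arn>/*").
    """
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "s3:ListBucket",
                "Resource": bucket_arn,
            },
            {
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": f"{bucket_arn}/*",
            },
        ],
    }
    return json.dumps(policy, indent=2)


if __name__ == "__main__":
    print(read_only_bucket_policy("arn:aws:s3:::example-data-bucket"))
```

In a Pulumi program, a helper like this would be called inside `data_bucket.arn.apply(...)`, because the bucket ARN is only known once the bucket has been created.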
Let's write the code for our setup using Pulumi's Python SDK:
```python
import json

import pulumi
import pulumi_aws as aws

# Create a private, versioned S3 bucket to receive incoming data files
data_bucket = aws.s3.Bucket(
    "dataBucket",
    acl="private",
    versioning=aws.s3.BucketVersioningArgs(
        enabled=True,
    ),
)

# Create an IAM role that the Lambda function will assume
lambda_role = aws.iam.Role(
    "lambdaRole",
    assume_role_policy=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Action": "sts:AssumeRole",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Effect": "Allow",
            "Sid": "",
        }],
    }),
)

# Let the function write its logs to CloudWatch Logs
basic_execution = aws.iam.RolePolicyAttachment(
    "lambdaBasicExecution",
    role=lambda_role.name,
    policy_arn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
)

# Attach an inline policy that allows the Lambda function to read from the S3 bucket
policy = aws.iam.RolePolicy(
    "lambdaPolicy",
    role=lambda_role.id,
    policy=data_bucket.arn.apply(lambda arn: json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:ListBucket"],
            "Resource": [arn, f"{arn}/*"],
        }],
    })),
)

# Create a Lambda function that will be triggered when new files are added to the S3 bucket
data_processor = aws.lambda_.Function(
    "dataProcessor",
    code=pulumi.FileArchive("./data_processor"),  # Assumes a local "data_processor" directory containing index.py
    handler="index.handler",  # <module>.<function> entry point inside the archive
    role=lambda_role.arn,
    runtime="python3.12",  # python3.8 is deprecated on AWS Lambda
    environment=aws.lambda_.FunctionEnvironmentArgs(
        variables={
            "BUCKET_NAME": data_bucket.bucket,
        },
    ),
)

# Grant S3 permission to invoke the function; without it the notification cannot trigger Lambda
allow_s3_invoke = aws.lambda_.Permission(
    "allowS3Invoke",
    action="lambda:InvokeFunction",
    function=data_processor.name,
    principal="s3.amazonaws.com",
    source_arn=data_bucket.arn,
)

# Set up the S3 bucket notification to invoke the Lambda function
notification = aws.s3.BucketNotification(
    "bucketNotification",
    bucket=data_bucket.id,
    lambda_functions=[aws.s3.BucketNotificationLambdaFunctionArgs(
        lambda_function_arn=data_processor.arn,
        events=["s3:ObjectCreated:*"],
        filter_prefix="incoming/",  # Only keys under this prefix trigger the function
        filter_suffix=".json",  # Only keys with this suffix trigger the function
    )],
    # The invoke permission (and therefore the function) must exist before the notification is configured
    opts=pulumi.ResourceOptions(depends_on=[allow_s3_invoke]),
)

# Export the name of the bucket
pulumi.export("data_bucket_name", data_bucket.bucket)
```
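The program assumes a `data_processor` directory whose `index.py` defines `handler`, but that code is not shown. Below is a minimal sketch of such a handler, assuming each uploaded file is JSON; `iter_s3_objects`, the optional `s3_client` parameter, and the returned summary are illustrative choices, not part of any AWS API. S3 delivers object keys URL-encoded in the event payload, so the sketch decodes them with `urllib.parse.unquote_plus` before fetching the object.

```python
import json
import urllib.parse
from typing import Any, Iterator


def iter_s3_objects(event: dict) -> Iterator[tuple[str, str]]:
    """Yield (bucket, key) pairs from an S3 event notification payload."""
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Keys arrive URL-encoded (spaces become '+'), so decode before use
        key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])
        yield bucket, key


def handler(event: dict, context: Any, s3_client: Any = None) -> dict:
    """Lambda entry point ("index.handler"): read each newly created JSON object.

    s3_client is an extra optional parameter (an assumption of this sketch)
    so the function can be exercised locally with a stub instead of boto3.
    """
    if s3_client is None:
        import boto3  # boto3 ships with the AWS Lambda Python runtime

        s3_client = boto3.client("s3")

    processed = []
    for bucket, key in iter_s3_objects(event):
        response = s3_client.get_object(Bucket=bucket, Key=key)
        payload = json.loads(response["Body"].read())
        # Placeholder "processing": record which object was read and its top-level length
        processed.append({"bucket": bucket, "key": key, "items": len(payload)})
    return {"processed": processed}
```

Lambda calls `handler(event, context)` with two arguments, so the default `s3_client=None` path is what runs in AWS; passing a stub client is only for local testing.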
In this setup:
- We define an `aws.s3.Bucket` resource, which is the S3 bucket that receives our data files.
- We create an `aws.iam.Role` and an `aws.iam.RolePolicy` to grant the Lambda function read access to the S3 bucket.
- We define an `aws.lambda_.Function` as our data processing unit. It runs each time a new object matching the notification filter is created in the bucket.
- The `aws.s3.BucketNotification` resource connects the Lambda function to the bucket's object-creation events. This is what makes the pipeline event-driven.
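The `filter_prefix` and `filter_suffix` settings decide which uploads reach the function: S3 sends the event only when the object key starts with the prefix and ends with the suffix. The hypothetical helper below mirrors that rule locally, using the `incoming/` and `.json` values from the program as defaults, which can help when checking which keys an upload job will actually trigger on. It illustrates the matching rule; it is not code that S3 runs.

```python
def matches_notification_filter(key: str, prefix: str = "incoming/", suffix: str = ".json") -> bool:
    """Return True if an object key would pass the prefix/suffix notification filter."""
    # Both rules must hold; an empty prefix or suffix matches every key
    return key.startswith(prefix) and key.endswith(suffix)


if __name__ == "__main__":
    for key in ["incoming/day1.json", "incoming/day1.csv", "archive/day1.json"]:
        print(key, matches_notification_filter(key))
```

Under these defaults, only `incoming/day1.json` would trigger the function; files uploaded elsewhere or with another extension land in the bucket silently.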
This setup is simple, yet it processes data as soon as it arrives. The Lambda function can be extended to validate data, start a transformation job, or feed records into a stream for real-time analysis. The approach also scales with the upload rate: Lambda runs additional concurrent invocations as events arrive, and you pay per request and for the compute time consumed, with no charge while the pipeline is idle.
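As an example of the validation step, suppose each uploaded JSON file holds a list of records and that `id` and `timestamp` are required fields (both assumptions for illustration; substitute your own schema). A minimal sketch, using a hypothetical `split_valid_records` helper, partitions the records before any downstream step:

```python
from typing import Any

# Hypothetical schema: every record must carry non-null values for these fields
REQUIRED_FIELDS = ("id", "timestamp")


def split_valid_records(records: list[Any]) -> tuple[list[dict], list[Any]]:
    """Partition records into (valid, rejected) based on REQUIRED_FIELDS."""
    valid: list[dict] = []
    rejected: list[Any] = []
    for record in records:
        # A record is valid only if it is a JSON object with every required field set
        if isinstance(record, dict) and all(record.get(f) is not None for f in REQUIRED_FIELDS):
            valid.append(record)
        else:
            rejected.append(record)
    return valid, rejected
```

Inside the handler, valid records could be forwarded to a transformation job or stream, while rejected ones are logged for inspection. If you instead write rejects back to S3, use a prefix other than `incoming/` so the upload does not re-trigger the same function, and note that the role would also need `s3:PutObject`, which the policy above does not grant.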