Using AWS MWAA with event-driven pipelines
AWS Managed Workflows for Apache Airflow (MWAA) is a managed orchestration service for Apache Airflow that makes it easier to set up and operate end-to-end data pipelines in the cloud at scale. With Pulumi, you can create and configure an MWAA environment and use Amazon EventBridge (formerly known as AWS CloudWatch Events) to trigger tasks or workflows in response to specific events or conditions.
Below is a Pulumi program written in TypeScript that demonstrates how to create an MWAA environment and set up a simple event-driven pipeline using Amazon EventBridge and Lambda functions. Here's what the setup will do:
- Set up an MWAA environment to orchestrate your data workflows using Apache Airflow.
- Create an IAM role and policy that MWAA will assume to access AWS resources.
- Set up a Lambda function that will act as a task within your workflow.
- Create an EventBridge rule that triggers the Lambda function in response to a specified event pattern.
This program doesn't cover the creation of Airflow DAGs (Directed Acyclic Graphs), which would be written in Python and uploaded to the MWAA environment's associated Amazon S3 bucket. Rather, it focuses on provisioning the infrastructure required to get MWAA and EventBridge up and running.
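Before looking at the full program, it helps to see the IAM trust relationship behind the role-creation step as plain JSON. The object below is a sketch of the standard IAM trust-policy shape for a service principal, roughly what `aws.iam.assumeRolePolicyForPrincipal` produces for MWAA (the helper's exact output may differ in formatting):

```typescript
// Sketch: a trust policy letting the MWAA service assume the role.
// This mirrors the standard IAM trust-policy document shape; it is an
// illustration, not the literal output of assumeRolePolicyForPrincipal.
const mwaaTrustPolicy = {
    Version: "2012-10-17",
    Statement: [
        {
            Action: "sts:AssumeRole",
            Effect: "Allow",
            Principal: { Service: "airflow.amazonaws.com" },
        },
    ],
};

console.log(JSON.stringify(mwaaTrustPolicy, null, 2));
```

The same shape, with a different `Service` value, backs the Lambda role created later in the program.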
```typescript
import * as aws from "@pulumi/aws";
import * as pulumi from "@pulumi/pulumi";

// Create an IAM role for MWAA with the necessary trust relationship.
const mwaaRole = new aws.iam.Role("mwaaRole", {
    assumeRolePolicy: aws.iam.assumeRolePolicyForPrincipal({
        Service: "airflow.amazonaws.com",
    }),
});

// Attach an execution policy to the role. In practice, MWAA usually needs a
// custom execution policy scoped to your bucket, logs, and queues; a broad
// managed policy is shown here only to keep the example short.
const mwaaPolicyAttach = new aws.iam.PolicyAttachment("mwaaPolicyAttach", {
    roles: [mwaaRole],
    policyArn: "arn:aws:iam::aws:policy/AmazonMWAAFullAccess",
});

// Create an S3 bucket to store the Airflow DAGs.
// MWAA requires versioning to be enabled on the source bucket.
const dagsBucket = new aws.s3.Bucket("dagsBucket", {
    forceDestroy: true,
    versioning: { enabled: true },
});

// Create the MWAA environment.
const mwaaEnvironment = new aws.mwaa.Environment("mwaaEnvironment", {
    name: "my-mwaa-environment",
    airflowVersion: "2.0.2", // specify the Airflow version
    executionRoleArn: mwaaRole.arn,
    // Reference to the source bucket created above.
    sourceBucketArn: dagsBucket.arn,
    // Path within the bucket where DAG files will be uploaded.
    dagS3Path: "dags",
    // Sample network configuration; replace with security groups and subnets
    // that actually exist in your VPC.
    networkConfiguration: {
        securityGroupIds: ["sg-xxxxxxxx"],
        subnetIds: ["subnet-xxxxxxxx", "subnet-xxxxxxxx"],
    },
    // Tags can be useful for cost allocation, access control, etc.
    tags: {
        "Environment": "Production",
    },
}, { dependsOn: [mwaaPolicyAttach] }); // Ensure the policy is attached first.

// IAM role for the Lambda function.
const lambdaRole = new aws.iam.Role("lambdaRole", {
    assumeRolePolicy: aws.iam.assumeRolePolicyForPrincipal({
        Service: "lambda.amazonaws.com",
    }),
});

// Inline policy that allows the Lambda function to log to CloudWatch.
const lambdaLoggingPolicy = new aws.iam.RolePolicy("lambdaLogging", {
    role: lambdaRole.id,
    policy: {
        Version: "2012-10-17",
        Statement: [{
            Action: [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
            ],
            Effect: "Allow",
            Resource: "arn:aws:logs:*:*:*",
        }],
    },
});

// Lambda function that will be triggered by the EventBridge rule.
const lambda = new aws.lambda.Function("myLambda", {
    role: lambdaRole.arn,
    runtime: aws.lambda.Runtime.Python3d8, // Python 3.8 runtime
    handler: "index.handler", // handler file is `index.py`, function is `handler`
    code: new pulumi.asset.AssetArchive({
        "index.py": new pulumi.asset.StringAsset(
`import json

def handler(event, context):
    print("Received event: " + json.dumps(event, indent=2))
    return event
`),
    }),
}, { dependsOn: [lambdaLoggingPolicy] }); // Ensure the logging policy exists first.

// Define the EventBridge rule that triggers the Lambda function.
// Note that eventPattern is a JSON string in EventBridge's native syntax.
const rule = new aws.cloudwatch.EventRule("myRule", {
    eventPattern: JSON.stringify({
        source: ["aws.ec2"],
        "detail-type": ["EC2 Instance State-change Notification"],
        detail: {
            state: ["running"],
        },
    }),
});

// Create an EventBridge target based on the rule.
const target = new aws.cloudwatch.EventTarget("myTarget", {
    rule: rule.name,
    arn: lambda.arn,
});

// Grant the EventBridge service permission to invoke the Lambda function.
const permission = new aws.lambda.Permission("myPermission", {
    action: "lambda:InvokeFunction",
    function: lambda.name,
    principal: "events.amazonaws.com",
    sourceArn: rule.arn,
}, { dependsOn: [target] }); // Ensure the rule and target exist first.

// Export the MWAA environment name and the Lambda function ARN.
export const mwaaEnvironmentName = mwaaEnvironment.name;
export const lambdaFunctionArn = lambda.arn;
```
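To see what the Lambda function will receive, you can exercise the handler logic locally with a sample event. The handler below mirrors the inline `index.py` from the program; the event is a sketch of an EC2 Instance State-change Notification, with illustrative field values:

```python
import json

# Mirror of the inline index.py handler defined in the Pulumi program above.
def handler(event, context):
    print("Received event: " + json.dumps(event, indent=2))
    return event

# A sample EventBridge event for an EC2 instance entering the "running" state.
# The instance id and field values are illustrative, not from a real account.
sample_event = {
    "version": "0",
    "source": "aws.ec2",
    "detail-type": "EC2 Instance State-change Notification",
    "detail": {"instance-id": "i-0123456789abcdef0", "state": "running"},
}

result = handler(sample_event, None)
```

Because the handler simply logs and echoes its input, `result` is the same dictionary that was passed in, which makes this kind of local smoke test straightforward.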
In this program:
- We use the `aws.mwaa.Environment` resource to create an MWAA environment.
- We provide the MWAA environment with a role that has the `AmazonMWAAFullAccess` policy attached.
- An S3 bucket is created to store the Airflow DAGs (workflow definitions).
- We use the `aws.lambda.Function` resource to create a Lambda function which will be triggered by the EventBridge rule.
- The `aws.cloudwatch.EventRule` resource defines the conditions under which EventBridge will trigger the Lambda function.
- The `aws.cloudwatch.EventTarget` resource connects the EventBridge rule to the Lambda function.
- Lastly, the `aws.lambda.Permission` resource grants EventBridge permission to invoke the Lambda function.
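One detail worth calling out: the provider's `eventPattern` property expects the pattern as a JSON string in EventBridge's native syntax, where the key is `detail-type` (hyphenated), not `detailType`. A standalone sketch of building and sanity-checking such a pattern string:

```typescript
// Build the event-pattern string that aws.cloudwatch.EventRule expects.
// EventBridge's native syntax uses hyphenated keys like "detail-type".
const eventPattern = JSON.stringify({
    source: ["aws.ec2"],
    "detail-type": ["EC2 Instance State-change Notification"],
    detail: { state: ["running"] },
});

// Round-trip it to confirm it is valid JSON with the expected keys.
const parsed = JSON.parse(eventPattern);
console.log(parsed["detail-type"][0]);
```

Constructing the string with `JSON.stringify` keeps the pattern readable in source while still satisfying the string-typed property.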
This setup illustrates how you can use Pulumi to put together a serverless data processing workflow using AWS MWAA and an event-driven architecture. Remember to upload your Airflow DAGs to the S3 bucket and to update the `networkConfiguration` property to match your AWS environment's actual VPC, subnets, and security groups.