Deploy AWS Managed Workflows for Apache Airflow

The aws:mwaa/environment:Environment resource, part of the Pulumi AWS provider, provisions a managed Apache Airflow environment, wiring together its S3 DAG storage, execution role, VPC placement, and runtime configuration. This guide focuses on three capabilities: S3-based DAG storage, Airflow configuration overrides, and CloudWatch logging setup.

MWAA environments require an IAM execution role, a versioned S3 bucket, and VPC infrastructure: two private subnets with outbound internet access, plus security groups. The examples below are intentionally small; combine them with your own IAM policies, VPC setup, and monitoring infrastructure.
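
For orientation, here is a minimal TypeScript sketch of those supporting resources. The names match the placeholders referenced by the examples below; the trust-policy principals follow AWS's documented MWAA service principals, and the role would still need a policy granting S3 read and CloudWatch write access:

import * as aws from "@pulumi/aws";

// DAG storage -- MWAA requires versioning to be enabled on the source bucket.
const exampleAwsS3Bucket = new aws.s3.Bucket("example", {
    versioning: {
        enabled: true,
    },
});

// Execution role that the environment assumes to read DAGs and write logs.
const exampleAwsIamRole = new aws.iam.Role("example", {
    assumeRolePolicy: JSON.stringify({
        Version: "2012-10-17",
        Statement: [{
            Effect: "Allow",
            Principal: {
                Service: ["airflow.amazonaws.com", "airflow-env.amazonaws.com"],
            },
            Action: "sts:AssumeRole",
        }],
    }),
});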

Create an environment with DAGs from S3

Most deployments point to a versioned S3 bucket containing DAG files, specify the execution role, and connect to private subnets for network isolation.

TypeScript:

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.mwaa.Environment("example", {
    dagS3Path: "dags/",
    executionRoleArn: exampleAwsIamRole.arn,
    name: "example",
    networkConfiguration: {
        securityGroupIds: [exampleAwsSecurityGroup.id],
        subnetIds: _private.map(subnet => subnet.id),
    },
    sourceBucketArn: exampleAwsS3Bucket.arn,
});

Python:

import pulumi
import pulumi_aws as aws

example = aws.mwaa.Environment("example",
    dag_s3_path="dags/",
    execution_role_arn=example_aws_iam_role["arn"],
    name="example",
    network_configuration={
        "security_group_ids": [example_aws_security_group["id"]],
        "subnet_ids": [__item["id"] for __item in private],
    },
    source_bucket_arn=example_aws_s3_bucket["arn"])

Go:

package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/mwaa"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
    pulumi.Run(func(ctx *pulumi.Context) error {
        // Collect the IDs of the private subnets defined elsewhere.
        var subnetIds pulumi.StringArray
        for _, subnet := range private {
            subnetIds = append(subnetIds, subnet.ID())
        }
        _, err := mwaa.NewEnvironment(ctx, "example", &mwaa.EnvironmentArgs{
            DagS3Path:        pulumi.String("dags/"),
            ExecutionRoleArn: pulumi.Any(exampleAwsIamRole.Arn),
            Name:             pulumi.String("example"),
            NetworkConfiguration: &mwaa.EnvironmentNetworkConfigurationArgs{
                SecurityGroupIds: pulumi.StringArray{
                    exampleAwsSecurityGroup.Id,
                },
                SubnetIds: subnetIds,
            },
            SourceBucketArn: pulumi.Any(exampleAwsS3Bucket.Arn),
        })
        if err != nil {
            return err
        }
        return nil
    })
}

C#:

using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.Mwaa.Environment("example", new()
    {
        DagS3Path = "dags/",
        ExecutionRoleArn = exampleAwsIamRole.Arn,
        Name = "example",
        NetworkConfiguration = new Aws.Mwaa.Inputs.EnvironmentNetworkConfigurationArgs
        {
            SecurityGroupIds = new[]
            {
                exampleAwsSecurityGroup.Id,
            },
            SubnetIds = @private.Select(subnet => subnet.Id).ToList(),
        },
        SourceBucketArn = exampleAwsS3Bucket.Arn,
    });

});

Java:

package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.mwaa.Environment;
import com.pulumi.aws.mwaa.EnvironmentArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentNetworkConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import static java.util.stream.Collectors.toList;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Environment("example", EnvironmentArgs.builder()
            .dagS3Path("dags/")
            .executionRoleArn(exampleAwsIamRole.arn())
            .name("example")
            .networkConfiguration(EnvironmentNetworkConfigurationArgs.builder()
                .securityGroupIds(exampleAwsSecurityGroup.id())
                .subnetIds(private_.stream().map(element -> element.id()).collect(toList()))
                .build())
            .sourceBucketArn(exampleAwsS3Bucket.arn())
            .build());

    }
}

At runtime, MWAA reads DAG definitions from the S3 path specified in dagS3Path. The sourceBucketArn identifies the versioned bucket, and executionRoleArn grants permissions to read from S3 and write logs. The networkConfiguration places Airflow components in your private subnets; these subnets must have internet connectivity (via NAT gateway or similar) for MWAA to function.
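
To get a DAG into that path, upload it as an object under the dags/ prefix of the source bucket. A short TypeScript sketch (the local file dags/hello_dag.py is hypothetical):

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

// MWAA polls the dags/ prefix and loads any DAG definitions it finds there.
const helloDag = new aws.s3.BucketObject("hello-dag", {
    bucket: exampleAwsS3Bucket.id,
    key: "dags/hello_dag.py",
    source: new pulumi.asset.FileAsset("./dags/hello_dag.py"),
});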

Override Airflow runtime settings

Teams often tune Airflow’s behavior for their workload patterns, adjusting parallelism or retry policies.

TypeScript:

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.mwaa.Environment("example", {
    airflowConfigurationOptions: {
        "core.default_task_retries": "16",
        "core.parallelism": "1",
    },
    dagS3Path: "dags/",
    executionRoleArn: exampleAwsIamRole.arn,
    name: "example",
    networkConfiguration: {
        securityGroupIds: [exampleAwsSecurityGroup.id],
        subnetIds: _private.map(subnet => subnet.id),
    },
    sourceBucketArn: exampleAwsS3Bucket.arn,
});

Python:

import pulumi
import pulumi_aws as aws

example = aws.mwaa.Environment("example",
    airflow_configuration_options={
        "core.default_task_retries": "16",
        "core.parallelism": "1",
    },
    dag_s3_path="dags/",
    execution_role_arn=example_aws_iam_role["arn"],
    name="example",
    network_configuration={
        "security_group_ids": [example_aws_security_group["id"]],
        "subnet_ids": [__item["id"] for __item in private],
    },
    source_bucket_arn=example_aws_s3_bucket["arn"])

Go:

package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/mwaa"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
    pulumi.Run(func(ctx *pulumi.Context) error {
        // Collect the IDs of the private subnets defined elsewhere.
        var subnetIds pulumi.StringArray
        for _, subnet := range private {
            subnetIds = append(subnetIds, subnet.ID())
        }
        _, err := mwaa.NewEnvironment(ctx, "example", &mwaa.EnvironmentArgs{
            AirflowConfigurationOptions: pulumi.StringMap{
                "core.default_task_retries": pulumi.String("16"),
                "core.parallelism":          pulumi.String("1"),
            },
            DagS3Path:        pulumi.String("dags/"),
            ExecutionRoleArn: pulumi.Any(exampleAwsIamRole.Arn),
            Name:             pulumi.String("example"),
            NetworkConfiguration: &mwaa.EnvironmentNetworkConfigurationArgs{
                SecurityGroupIds: pulumi.StringArray{
                    exampleAwsSecurityGroup.Id,
                },
                SubnetIds: subnetIds,
            },
            SourceBucketArn: pulumi.Any(exampleAwsS3Bucket.Arn),
        })
        if err != nil {
            return err
        }
        return nil
    })
}

C#:

using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.Mwaa.Environment("example", new()
    {
        AirflowConfigurationOptions = 
        {
            { "core.default_task_retries", "16" },
            { "core.parallelism", "1" },
        },
        DagS3Path = "dags/",
        ExecutionRoleArn = exampleAwsIamRole.Arn,
        Name = "example",
        NetworkConfiguration = new Aws.Mwaa.Inputs.EnvironmentNetworkConfigurationArgs
        {
            SecurityGroupIds = new[]
            {
                exampleAwsSecurityGroup.Id,
            },
            SubnetIds = @private.Select(subnet => subnet.Id).ToList(),
        },
        SourceBucketArn = exampleAwsS3Bucket.Arn,
    });

});

Java:

package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.mwaa.Environment;
import com.pulumi.aws.mwaa.EnvironmentArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentNetworkConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import static java.util.stream.Collectors.toList;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Environment("example", EnvironmentArgs.builder()
            .airflowConfigurationOptions(Map.ofEntries(
                Map.entry("core.default_task_retries", "16"),
                Map.entry("core.parallelism", "1")
            ))
            .dagS3Path("dags/")
            .executionRoleArn(exampleAwsIamRole.arn())
            .name("example")
            .networkConfiguration(EnvironmentNetworkConfigurationArgs.builder()
                .securityGroupIds(exampleAwsSecurityGroup.id())
                .subnetIds(private_.stream().map(element -> element.id()).collect(toList()))
                .build())
            .sourceBucketArn(exampleAwsS3Bucket.arn())
            .build());

    }
}

The airflowConfigurationOptions property passes key-value pairs that override Airflow's core settings. Here, core.parallelism caps the number of task instances that can run concurrently across the environment at 1, and core.default_task_retries raises the default retry count for tasks to 16. These overrides apply at the Airflow configuration level and affect every DAG in the environment.
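
Because the options are plain string key-value pairs, one common pattern is to drive them from stack configuration rather than hard-coding them. A TypeScript sketch (the config keys parallelism and taskRetries are hypothetical):

import * as pulumi from "@pulumi/pulumi";

const config = new pulumi.Config();

// Per-stack overrides, with fallback values when a stack sets nothing.
const airflowConfigurationOptions = {
    "core.parallelism": config.get("parallelism") ?? "32",
    "core.default_task_retries": config.get("taskRetries") ?? "3",
};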

Configure CloudWatch logging for Airflow components

Production environments enable detailed logging across Airflow’s components to troubleshoot DAG processing, scheduling, and task execution.

TypeScript:

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.mwaa.Environment("example", {
    dagS3Path: "dags/",
    executionRoleArn: exampleAwsIamRole.arn,
    loggingConfiguration: {
        dagProcessingLogs: {
            enabled: true,
            logLevel: "DEBUG",
        },
        schedulerLogs: {
            enabled: true,
            logLevel: "INFO",
        },
        taskLogs: {
            enabled: true,
            logLevel: "WARNING",
        },
        webserverLogs: {
            enabled: true,
            logLevel: "ERROR",
        },
        workerLogs: {
            enabled: true,
            logLevel: "CRITICAL",
        },
    },
    name: "example",
    networkConfiguration: {
        securityGroupIds: [exampleAwsSecurityGroup.id],
        subnetIds: _private.map(subnet => subnet.id),
    },
    sourceBucketArn: exampleAwsS3Bucket.arn,
});

Python:

import pulumi
import pulumi_aws as aws

example = aws.mwaa.Environment("example",
    dag_s3_path="dags/",
    execution_role_arn=example_aws_iam_role["arn"],
    logging_configuration={
        "dag_processing_logs": {
            "enabled": True,
            "log_level": "DEBUG",
        },
        "scheduler_logs": {
            "enabled": True,
            "log_level": "INFO",
        },
        "task_logs": {
            "enabled": True,
            "log_level": "WARNING",
        },
        "webserver_logs": {
            "enabled": True,
            "log_level": "ERROR",
        },
        "worker_logs": {
            "enabled": True,
            "log_level": "CRITICAL",
        },
    },
    name="example",
    network_configuration={
        "security_group_ids": [example_aws_security_group["id"]],
        "subnet_ids": [__item["id"] for __item in private],
    },
    source_bucket_arn=example_aws_s3_bucket["arn"])

Go:

package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/mwaa"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
    pulumi.Run(func(ctx *pulumi.Context) error {
        // Collect the IDs of the private subnets defined elsewhere.
        var subnetIds pulumi.StringArray
        for _, subnet := range private {
            subnetIds = append(subnetIds, subnet.ID())
        }
        _, err := mwaa.NewEnvironment(ctx, "example", &mwaa.EnvironmentArgs{
            DagS3Path:        pulumi.String("dags/"),
            ExecutionRoleArn: pulumi.Any(exampleAwsIamRole.Arn),
            LoggingConfiguration: &mwaa.EnvironmentLoggingConfigurationArgs{
                DagProcessingLogs: &mwaa.EnvironmentLoggingConfigurationDagProcessingLogsArgs{
                    Enabled:  pulumi.Bool(true),
                    LogLevel: pulumi.String("DEBUG"),
                },
                SchedulerLogs: &mwaa.EnvironmentLoggingConfigurationSchedulerLogsArgs{
                    Enabled:  pulumi.Bool(true),
                    LogLevel: pulumi.String("INFO"),
                },
                TaskLogs: &mwaa.EnvironmentLoggingConfigurationTaskLogsArgs{
                    Enabled:  pulumi.Bool(true),
                    LogLevel: pulumi.String("WARNING"),
                },
                WebserverLogs: &mwaa.EnvironmentLoggingConfigurationWebserverLogsArgs{
                    Enabled:  pulumi.Bool(true),
                    LogLevel: pulumi.String("ERROR"),
                },
                WorkerLogs: &mwaa.EnvironmentLoggingConfigurationWorkerLogsArgs{
                    Enabled:  pulumi.Bool(true),
                    LogLevel: pulumi.String("CRITICAL"),
                },
            },
            Name: pulumi.String("example"),
            NetworkConfiguration: &mwaa.EnvironmentNetworkConfigurationArgs{
                SecurityGroupIds: pulumi.StringArray{
                    exampleAwsSecurityGroup.Id,
                },
                SubnetIds: subnetIds,
            },
            SourceBucketArn: pulumi.Any(exampleAwsS3Bucket.Arn),
        })
        if err != nil {
            return err
        }
        return nil
    })
}

C#:

using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.Mwaa.Environment("example", new()
    {
        DagS3Path = "dags/",
        ExecutionRoleArn = exampleAwsIamRole.Arn,
        LoggingConfiguration = new Aws.Mwaa.Inputs.EnvironmentLoggingConfigurationArgs
        {
            DagProcessingLogs = new Aws.Mwaa.Inputs.EnvironmentLoggingConfigurationDagProcessingLogsArgs
            {
                Enabled = true,
                LogLevel = "DEBUG",
            },
            SchedulerLogs = new Aws.Mwaa.Inputs.EnvironmentLoggingConfigurationSchedulerLogsArgs
            {
                Enabled = true,
                LogLevel = "INFO",
            },
            TaskLogs = new Aws.Mwaa.Inputs.EnvironmentLoggingConfigurationTaskLogsArgs
            {
                Enabled = true,
                LogLevel = "WARNING",
            },
            WebserverLogs = new Aws.Mwaa.Inputs.EnvironmentLoggingConfigurationWebserverLogsArgs
            {
                Enabled = true,
                LogLevel = "ERROR",
            },
            WorkerLogs = new Aws.Mwaa.Inputs.EnvironmentLoggingConfigurationWorkerLogsArgs
            {
                Enabled = true,
                LogLevel = "CRITICAL",
            },
        },
        Name = "example",
        NetworkConfiguration = new Aws.Mwaa.Inputs.EnvironmentNetworkConfigurationArgs
        {
            SecurityGroupIds = new[]
            {
                exampleAwsSecurityGroup.Id,
            },
            SubnetIds = @private.Select(__item => __item.Id).ToList(),
        },
        SourceBucketArn = exampleAwsS3Bucket.Arn,
    });

});

Java:

package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.mwaa.Environment;
import com.pulumi.aws.mwaa.EnvironmentArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentLoggingConfigurationArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentLoggingConfigurationDagProcessingLogsArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentLoggingConfigurationSchedulerLogsArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentLoggingConfigurationTaskLogsArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentLoggingConfigurationWebserverLogsArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentLoggingConfigurationWorkerLogsArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentNetworkConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import static java.util.stream.Collectors.toList;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Environment("example", EnvironmentArgs.builder()
            .dagS3Path("dags/")
            .executionRoleArn(exampleAwsIamRole.arn())
            .loggingConfiguration(EnvironmentLoggingConfigurationArgs.builder()
                .dagProcessingLogs(EnvironmentLoggingConfigurationDagProcessingLogsArgs.builder()
                    .enabled(true)
                    .logLevel("DEBUG")
                    .build())
                .schedulerLogs(EnvironmentLoggingConfigurationSchedulerLogsArgs.builder()
                    .enabled(true)
                    .logLevel("INFO")
                    .build())
                .taskLogs(EnvironmentLoggingConfigurationTaskLogsArgs.builder()
                    .enabled(true)
                    .logLevel("WARNING")
                    .build())
                .webserverLogs(EnvironmentLoggingConfigurationWebserverLogsArgs.builder()
                    .enabled(true)
                    .logLevel("ERROR")
                    .build())
                .workerLogs(EnvironmentLoggingConfigurationWorkerLogsArgs.builder()
                    .enabled(true)
                    .logLevel("CRITICAL")
                    .build())
                .build())
            .name("example")
            .networkConfiguration(EnvironmentNetworkConfigurationArgs.builder()
                .securityGroupIds(exampleAwsSecurityGroup.id())
                .subnetIds(private_.stream().map(element -> element.id()).collect(toList()))
                .build())
            .sourceBucketArn(exampleAwsS3Bucket.arn())
            .build());

    }
}

The loggingConfiguration property controls which Airflow components send logs to CloudWatch and at what verbosity. Each component (dagProcessingLogs, schedulerLogs, taskLogs, webserverLogs, workerLogs) can be enabled independently with its own log level (DEBUG, INFO, WARNING, ERROR, CRITICAL). Task logs are enabled by default at INFO level; other components require explicit configuration.
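
Once a component's logging is enabled, the environment exports the ARN of its CloudWatch log group, which you can surface as a stack output for dashboards or alarms. A TypeScript sketch, assuming example is the environment from the snippet above:

// Export the task log group ARN so alarms and dashboards can reference it.
export const taskLogGroupArn = example.loggingConfiguration.apply(
    lc => lc?.taskLogs?.cloudWatchLogGroupArn);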

Beyond these examples

These snippets focus on specific environment-level features: S3-based DAG storage and execution, Airflow runtime configuration overrides, and CloudWatch logging for Airflow components. They’re intentionally minimal rather than full Airflow deployments.

The examples rely on pre-existing infrastructure such as IAM execution roles with MWAA permissions, versioned S3 buckets for DAG storage, VPC with private subnets and internet connectivity, and security groups for environment access. They focus on configuring the environment rather than provisioning everything around it.

To keep things focused, common environment patterns are omitted, including:

  • Environment sizing (environmentClass, minWorkers, maxWorkers)
  • Scheduler and webserver scaling (schedulers, minWebservers, maxWebservers)
  • Python dependencies and plugins (requirementsS3Path, pluginsS3Path)
  • Startup scripts for environment initialization
  • KMS encryption and webserver access modes

These omissions are intentional: the goal is to illustrate how each environment feature is wired, not provide drop-in Airflow modules. See the MWAA Environment resource reference for all available configuration options.

Frequently Asked Questions

Prerequisites & Setup
What resources do I need to create an MWAA environment?
You need three components: an IAM role (executionRoleArn), two private subnets in your VPC (networkConfiguration), and a versioned S3 bucket (sourceBucketArn).
Why is my MWAA environment deployment failing?
Deployment fails if the subnets in networkConfiguration lack internet connectivity. Ensure both private subnets have internet access, typically through a NAT gateway.
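
For reference, a minimal sketch of that routing setup in TypeScript (vpc and publicSubnet are assumed to exist elsewhere, and each private subnet still needs a RouteTableAssociation pointing at this table):

import * as aws from "@pulumi/aws";

// Elastic IP for the NAT gateway.
const natEip = new aws.ec2.Eip("nat", { domain: "vpc" });

// The NAT gateway lives in a public subnet and serves the private ones.
const nat = new aws.ec2.NatGateway("nat", {
    allocationId: natEip.id,
    subnetId: publicSubnet.id,
});

// Send all outbound traffic from the private subnets through the NAT.
const privateRouteTable = new aws.ec2.RouteTable("private", {
    vpcId: vpc.id,
    routes: [{
        cidrBlock: "0.0.0.0/0",
        natGatewayId: nat.id,
    }],
});
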
Deployment & Configuration
What properties can't I change after creating the environment?
Three properties are immutable: name (environment name), endpointManagement (VPC endpoint management mode), and kmsKey (encryption key ARN).
Do I need to specify a version when using plugins or requirements files?
Yes. If you provide pluginsS3Path, requirementsS3Path, or startupScriptS3Path, you must also specify the corresponding ObjectVersion parameter (pluginsS3ObjectVersion, requirementsS3ObjectVersion, or startupScriptS3ObjectVersion).
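
In practice you can let Pulumi thread the version through for you by referencing the uploaded object's versionId. A TypeScript sketch reusing the placeholder resources from the examples above (the local requirements.txt file is hypothetical):

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

// Upload requirements.txt to the versioned source bucket.
const requirements = new aws.s3.BucketObject("requirements", {
    bucket: exampleAwsS3Bucket.id,
    key: "requirements.txt",
    source: new pulumi.asset.FileAsset("./requirements.txt"),
});

const env = new aws.mwaa.Environment("example", {
    dagS3Path: "dags/",
    executionRoleArn: exampleAwsIamRole.arn,
    networkConfiguration: {
        securityGroupIds: [exampleAwsSecurityGroup.id],
        subnetIds: _private.map(subnet => subnet.id),
    },
    sourceBucketArn: exampleAwsS3Bucket.arn,
    // Pair the path with the exact object version MWAA should install.
    requirementsS3Path: "requirements.txt",
    requirementsS3ObjectVersion: requirements.versionId,
});
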
Can I use a startup script with any Airflow version?
No, startup scripts (startupScriptS3Path) are only supported for environment versions 2.x and later.
Scaling & Performance
What are the worker scaling limits?
Workers can scale between 1 and 25. The default minimum is 1 worker (minWorkers) and the default maximum is 10 workers (maxWorkers).
How many webservers can I run?
For most environment classes, you can run 2 to 5 webservers. For mw1.micro, only 1 webserver is allowed.
How many schedulers can I configure?
For Airflow v2.0.2 and above, you can run 2 to 5 schedulers (default is 2). For v1.10.12, only 1 scheduler is supported.
What environment classes are available?
Four classes are available: mw1.micro, mw1.small (default), mw1.medium, and mw1.large. The class affects worker and webserver scaling limits.
Logging & Monitoring
What's the default log level for Airflow tasks?
Airflow task logs are enabled by default with the INFO log level.
Access & Security
Can I access the webserver from the internet?
Yes, by setting webserverAccessMode to PUBLIC_ONLY. The default is PRIVATE_ONLY, which restricts access to your VPC.
Who manages the VPC endpoints for my environment?
By default (SERVICE), AWS creates and manages the VPC endpoints. Set endpointManagement to CUSTOMER if you want to create and manage them yourself. This setting is immutable.
