The aws:mwaa/environment:Environment resource, part of the Pulumi AWS provider, provisions a managed Apache Airflow environment: its S3 DAG storage, execution role, VPC placement, and runtime configuration. This guide focuses on three capabilities: S3-based DAG storage, Airflow configuration overrides, and CloudWatch logging setup.
MWAA environments require an IAM execution role, a versioned S3 bucket, and VPC infrastructure (two private subnets with internet access and security groups). The examples are intentionally small. Combine them with your own IAM policies, VPC setup, and monitoring infrastructure.
Create an environment with DAGs from S3
Most deployments point to a versioned S3 bucket containing DAG files, specify the execution role, and connect to private subnets for network isolation.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
// IDs of the private subnets (declared elsewhere in the program) that host the environment.
const privateSubnetIds = _private.map(subnet => subnet.id);

// Minimal MWAA environment: DAG files are read from the "dags/" prefix of the source bucket.
const example = new aws.mwaa.Environment("example", {
    name: "example",
    dagS3Path: "dags/",
    sourceBucketArn: exampleAwsS3Bucket.arn,
    executionRoleArn: exampleAwsIamRole.arn,
    networkConfiguration: {
        securityGroupIds: [exampleAwsSecurityGroup.id],
        subnetIds: privateSubnetIds,
    },
});
import pulumi
import pulumi_aws as aws
# IDs of the private subnets (declared elsewhere in the program) that host the environment.
private_subnet_ids = [subnet["id"] for subnet in private]

# Minimal MWAA environment: DAG files are read from the "dags/" prefix of the source bucket.
example = aws.mwaa.Environment(
    "example",
    name="example",
    dag_s3_path="dags/",
    source_bucket_arn=example_aws_s3_bucket["arn"],
    execution_role_arn=example_aws_iam_role["arn"],
    network_configuration={
        "security_group_ids": [example_aws_security_group["id"]],
        "subnet_ids": private_subnet_ids,
    },
)
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/mwaa"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := mwaa.NewEnvironment(ctx, "example", &mwaa.EnvironmentArgs{
DagS3Path: pulumi.String("dags/"),
ExecutionRoleArn: pulumi.Any(exampleAwsIamRole.Arn),
Name: pulumi.String("example"),
NetworkConfiguration: &mwaa.EnvironmentNetworkConfigurationArgs{
SecurityGroupIds: pulumi.StringArray{
exampleAwsSecurityGroup.Id,
},
SubnetIds: []pulumi.String(%!v(PANIC=Format method: fatal: A failure has occurred: unlowered splat expression @ example.pp:6,24-37)),
},
SourceBucketArn: pulumi.Any(exampleAwsS3Bucket.Arn),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
    // IDs of the private subnets (declared elsewhere in the program) that host the environment.
    var privateSubnetIds = @private.Select(subnet => subnet.Id).ToList();

    // Minimal MWAA environment: DAG files are read from the "dags/" prefix of the source bucket.
    var example = new Aws.Mwaa.Environment("example", new()
    {
        Name = "example",
        DagS3Path = "dags/",
        SourceBucketArn = exampleAwsS3Bucket.Arn,
        ExecutionRoleArn = exampleAwsIamRole.Arn,
        NetworkConfiguration = new Aws.Mwaa.Inputs.EnvironmentNetworkConfigurationArgs
        {
            SecurityGroupIds = new[] { exampleAwsSecurityGroup.Id },
            SubnetIds = privateSubnetIds,
        },
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.mwaa.Environment;
import com.pulumi.aws.mwaa.EnvironmentArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentNetworkConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Provisions a minimal MWAA environment whose DAG files are read from the
 * "dags/" prefix of the source bucket.
 *
 * exampleAwsIamRole, exampleAwsSecurityGroup, exampleAwsS3Bucket and private_
 * are not declared in this snippet; they stand for resources defined elsewhere.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Environment("example", EnvironmentArgs.builder()
            .dagS3Path("dags/")
            .executionRoleArn(exampleAwsIamRole.arn())
            .name("example")
            .networkConfiguration(EnvironmentNetworkConfigurationArgs.builder()
                .securityGroupIds(exampleAwsSecurityGroup.id())
                // Collectors is fully qualified because the file has no static import for toList().
                .subnetIds(private_.stream().map(element -> element.id()).collect(java.util.stream.Collectors.toList()))
                .build())
            .sourceBucketArn(exampleAwsS3Bucket.arn())
            .build());
    }
}
At runtime, MWAA reads DAG definitions from the S3 path specified in dagS3Path. The sourceBucketArn identifies the versioned bucket, and executionRoleArn grants permissions to read from S3 and write logs. The networkConfiguration places Airflow components in your private subnets; these subnets must have internet connectivity (via NAT gateway or similar) for MWAA to function.
Override Airflow runtime settings
Teams often tune Airflow’s behavior for their workload patterns, adjusting parallelism or retry policies.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
// IDs of the private subnets (declared elsewhere in the program) that host the environment.
const privateSubnetIds = _private.map(subnet => subnet.id);

// MWAA environment with two Airflow configuration overrides applied.
const example = new aws.mwaa.Environment("example", {
    name: "example",
    dagS3Path: "dags/",
    sourceBucketArn: exampleAwsS3Bucket.arn,
    executionRoleArn: exampleAwsIamRole.arn,
    airflowConfigurationOptions: {
        "core.default_task_retries": "16",
        "core.parallelism": "1",
    },
    networkConfiguration: {
        securityGroupIds: [exampleAwsSecurityGroup.id],
        subnetIds: privateSubnetIds,
    },
});
import pulumi
import pulumi_aws as aws
# IDs of the private subnets (declared elsewhere in the program) that host the environment.
private_subnet_ids = [subnet["id"] for subnet in private]

# MWAA environment with two Airflow configuration overrides applied.
example = aws.mwaa.Environment(
    "example",
    name="example",
    dag_s3_path="dags/",
    source_bucket_arn=example_aws_s3_bucket["arn"],
    execution_role_arn=example_aws_iam_role["arn"],
    airflow_configuration_options={
        "core.default_task_retries": "16",
        "core.parallelism": "1",
    },
    network_configuration={
        "security_group_ids": [example_aws_security_group["id"]],
        "subnet_ids": private_subnet_ids,
    },
)
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/mwaa"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := mwaa.NewEnvironment(ctx, "example", &mwaa.EnvironmentArgs{
AirflowConfigurationOptions: pulumi.StringMap{
"core.default_task_retries": pulumi.String("16"),
"core.parallelism": pulumi.String("1"),
},
DagS3Path: pulumi.String("dags/"),
ExecutionRoleArn: pulumi.Any(exampleAwsIamRole.Arn),
Name: pulumi.String("example"),
NetworkConfiguration: &mwaa.EnvironmentNetworkConfigurationArgs{
SecurityGroupIds: pulumi.StringArray{
exampleAwsSecurityGroup.Id,
},
SubnetIds: []pulumi.String(%!v(PANIC=Format method: fatal: A failure has occurred: unlowered splat expression @ example.pp:10,24-37)),
},
SourceBucketArn: pulumi.Any(exampleAwsS3Bucket.Arn),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
    // IDs of the private subnets (declared elsewhere in the program) that host the environment.
    var privateSubnetIds = @private.Select(subnet => subnet.Id).ToList();

    // MWAA environment with two Airflow configuration overrides applied.
    var example = new Aws.Mwaa.Environment("example", new()
    {
        Name = "example",
        DagS3Path = "dags/",
        SourceBucketArn = exampleAwsS3Bucket.Arn,
        ExecutionRoleArn = exampleAwsIamRole.Arn,
        AirflowConfigurationOptions =
        {
            { "core.default_task_retries", "16" },
            { "core.parallelism", "1" },
        },
        NetworkConfiguration = new Aws.Mwaa.Inputs.EnvironmentNetworkConfigurationArgs
        {
            SecurityGroupIds = new[] { exampleAwsSecurityGroup.Id },
            SubnetIds = privateSubnetIds,
        },
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.mwaa.Environment;
import com.pulumi.aws.mwaa.EnvironmentArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentNetworkConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Provisions an MWAA environment with two Airflow configuration overrides
 * (core.default_task_retries and core.parallelism).
 *
 * exampleAwsIamRole, exampleAwsSecurityGroup, exampleAwsS3Bucket and private_
 * are not declared in this snippet; they stand for resources defined elsewhere.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Environment("example", EnvironmentArgs.builder()
            .airflowConfigurationOptions(Map.ofEntries(
                Map.entry("core.default_task_retries", "16"),
                Map.entry("core.parallelism", "1")
            ))
            .dagS3Path("dags/")
            .executionRoleArn(exampleAwsIamRole.arn())
            .name("example")
            .networkConfiguration(EnvironmentNetworkConfigurationArgs.builder()
                .securityGroupIds(exampleAwsSecurityGroup.id())
                // Collectors is fully qualified because the file has no static import for toList().
                .subnetIds(private_.stream().map(element -> element.id()).collect(java.util.stream.Collectors.toList()))
                .build())
            .sourceBucketArn(exampleAwsS3Bucket.arn())
            .build());
    }
}
The airflowConfigurationOptions property passes key-value pairs that override Airflow’s core settings. Here, core.parallelism limits concurrent task execution to 1, and core.default_task_retries sets the retry count to 16. These settings apply at the Airflow configuration level, affecting all DAGs in the environment.
Configure CloudWatch logging for Airflow components
Production environments enable detailed logging across Airflow’s components to troubleshoot DAG processing, scheduling, and task execution.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
// IDs of the private subnets (declared elsewhere in the program) that host the environment.
const privateSubnetIds = _private.map(subnet => subnet.id);

// Builds the settings object for one enabled log stream at the given level.
const enabledAt = (logLevel: string) => ({ enabled: true, logLevel });

// MWAA environment with every Airflow log stream enabled, each at its own level.
const example = new aws.mwaa.Environment("example", {
    name: "example",
    dagS3Path: "dags/",
    sourceBucketArn: exampleAwsS3Bucket.arn,
    executionRoleArn: exampleAwsIamRole.arn,
    loggingConfiguration: {
        dagProcessingLogs: enabledAt("DEBUG"),
        schedulerLogs: enabledAt("INFO"),
        taskLogs: enabledAt("WARNING"),
        webserverLogs: enabledAt("ERROR"),
        workerLogs: enabledAt("CRITICAL"),
    },
    networkConfiguration: {
        securityGroupIds: [exampleAwsSecurityGroup.id],
        subnetIds: privateSubnetIds,
    },
});
import pulumi
import pulumi_aws as aws
# IDs of the private subnets (declared elsewhere in the program) that host the environment.
private_subnet_ids = [subnet["id"] for subnet in private]


def _enabled_at(log_level):
    """Return the settings dict for one enabled log stream at the given level."""
    return {"enabled": True, "log_level": log_level}


# MWAA environment with every Airflow log stream enabled, each at its own level.
example = aws.mwaa.Environment(
    "example",
    name="example",
    dag_s3_path="dags/",
    source_bucket_arn=example_aws_s3_bucket["arn"],
    execution_role_arn=example_aws_iam_role["arn"],
    logging_configuration={
        "dag_processing_logs": _enabled_at("DEBUG"),
        "scheduler_logs": _enabled_at("INFO"),
        "task_logs": _enabled_at("WARNING"),
        "webserver_logs": _enabled_at("ERROR"),
        "worker_logs": _enabled_at("CRITICAL"),
    },
    network_configuration={
        "security_group_ids": [example_aws_security_group["id"]],
        "subnet_ids": private_subnet_ids,
    },
)
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/mwaa"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := mwaa.NewEnvironment(ctx, "example", &mwaa.EnvironmentArgs{
DagS3Path: pulumi.String("dags/"),
ExecutionRoleArn: pulumi.Any(exampleAwsIamRole.Arn),
LoggingConfiguration: &mwaa.EnvironmentLoggingConfigurationArgs{
DagProcessingLogs: &mwaa.EnvironmentLoggingConfigurationDagProcessingLogsArgs{
Enabled: pulumi.Bool(true),
LogLevel: pulumi.String("DEBUG"),
},
SchedulerLogs: &mwaa.EnvironmentLoggingConfigurationSchedulerLogsArgs{
Enabled: pulumi.Bool(true),
LogLevel: pulumi.String("INFO"),
},
TaskLogs: &mwaa.EnvironmentLoggingConfigurationTaskLogsArgs{
Enabled: pulumi.Bool(true),
LogLevel: pulumi.String("WARNING"),
},
WebserverLogs: &mwaa.EnvironmentLoggingConfigurationWebserverLogsArgs{
Enabled: pulumi.Bool(true),
LogLevel: pulumi.String("ERROR"),
},
WorkerLogs: &mwaa.EnvironmentLoggingConfigurationWorkerLogsArgs{
Enabled: pulumi.Bool(true),
LogLevel: pulumi.String("CRITICAL"),
},
},
Name: pulumi.String("example"),
NetworkConfiguration: &mwaa.EnvironmentNetworkConfigurationArgs{
SecurityGroupIds: pulumi.StringArray{
exampleAwsSecurityGroup.Id,
},
SubnetIds: []pulumi.String(%!v(PANIC=Format method: fatal: A failure has occurred: unlowered splat expression @ example.pp:28,24-37)),
},
SourceBucketArn: pulumi.Any(exampleAwsS3Bucket.Arn),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
    // IDs of the private subnets (declared elsewhere in the program) that host the environment.
    var privateSubnetIds = @private.Select(subnet => subnet.Id).ToList();

    // Every Airflow log stream is enabled, each at its own level.
    var loggingConfiguration = new Aws.Mwaa.Inputs.EnvironmentLoggingConfigurationArgs
    {
        DagProcessingLogs = new Aws.Mwaa.Inputs.EnvironmentLoggingConfigurationDagProcessingLogsArgs
        {
            Enabled = true,
            LogLevel = "DEBUG",
        },
        SchedulerLogs = new Aws.Mwaa.Inputs.EnvironmentLoggingConfigurationSchedulerLogsArgs
        {
            Enabled = true,
            LogLevel = "INFO",
        },
        TaskLogs = new Aws.Mwaa.Inputs.EnvironmentLoggingConfigurationTaskLogsArgs
        {
            Enabled = true,
            LogLevel = "WARNING",
        },
        WebserverLogs = new Aws.Mwaa.Inputs.EnvironmentLoggingConfigurationWebserverLogsArgs
        {
            Enabled = true,
            LogLevel = "ERROR",
        },
        WorkerLogs = new Aws.Mwaa.Inputs.EnvironmentLoggingConfigurationWorkerLogsArgs
        {
            Enabled = true,
            LogLevel = "CRITICAL",
        },
    };

    var example = new Aws.Mwaa.Environment("example", new()
    {
        Name = "example",
        DagS3Path = "dags/",
        SourceBucketArn = exampleAwsS3Bucket.Arn,
        ExecutionRoleArn = exampleAwsIamRole.Arn,
        LoggingConfiguration = loggingConfiguration,
        NetworkConfiguration = new Aws.Mwaa.Inputs.EnvironmentNetworkConfigurationArgs
        {
            SecurityGroupIds = new[] { exampleAwsSecurityGroup.Id },
            SubnetIds = privateSubnetIds,
        },
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.mwaa.Environment;
import com.pulumi.aws.mwaa.EnvironmentArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentLoggingConfigurationArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentLoggingConfigurationDagProcessingLogsArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentLoggingConfigurationSchedulerLogsArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentLoggingConfigurationTaskLogsArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentLoggingConfigurationWebserverLogsArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentLoggingConfigurationWorkerLogsArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentNetworkConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Provisions an MWAA environment with every Airflow log stream enabled,
 * each at its own log level.
 *
 * exampleAwsIamRole, exampleAwsSecurityGroup, exampleAwsS3Bucket and private_
 * are not declared in this snippet; they stand for resources defined elsewhere.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Environment("example", EnvironmentArgs.builder()
            .dagS3Path("dags/")
            .executionRoleArn(exampleAwsIamRole.arn())
            .loggingConfiguration(EnvironmentLoggingConfigurationArgs.builder()
                .dagProcessingLogs(EnvironmentLoggingConfigurationDagProcessingLogsArgs.builder()
                    .enabled(true)
                    .logLevel("DEBUG")
                    .build())
                .schedulerLogs(EnvironmentLoggingConfigurationSchedulerLogsArgs.builder()
                    .enabled(true)
                    .logLevel("INFO")
                    .build())
                .taskLogs(EnvironmentLoggingConfigurationTaskLogsArgs.builder()
                    .enabled(true)
                    .logLevel("WARNING")
                    .build())
                .webserverLogs(EnvironmentLoggingConfigurationWebserverLogsArgs.builder()
                    .enabled(true)
                    .logLevel("ERROR")
                    .build())
                .workerLogs(EnvironmentLoggingConfigurationWorkerLogsArgs.builder()
                    .enabled(true)
                    .logLevel("CRITICAL")
                    .build())
                .build())
            .name("example")
            .networkConfiguration(EnvironmentNetworkConfigurationArgs.builder()
                .securityGroupIds(exampleAwsSecurityGroup.id())
                // Collectors is fully qualified because the file has no static import for toList().
                .subnetIds(private_.stream().map(element -> element.id()).collect(java.util.stream.Collectors.toList()))
                .build())
            .sourceBucketArn(exampleAwsS3Bucket.arn())
            .build());
    }
}
The loggingConfiguration property controls which Airflow components send logs to CloudWatch and at what verbosity. Each component (dagProcessingLogs, schedulerLogs, taskLogs, webserverLogs, workerLogs) can be enabled independently with its own log level (DEBUG, INFO, WARNING, ERROR, CRITICAL). Task logs are enabled by default at INFO level; other components require explicit configuration.
Beyond these examples
These snippets focus on specific environment-level features: S3-based DAG storage and execution, Airflow runtime configuration overrides, and CloudWatch logging for Airflow components. They’re intentionally minimal rather than full Airflow deployments.
The examples rely on pre-existing infrastructure such as IAM execution roles with MWAA permissions, versioned S3 buckets for DAG storage, VPC with private subnets and internet connectivity, and security groups for environment access. They focus on configuring the environment rather than provisioning everything around it.
To keep things focused, common environment patterns are omitted, including:
- Environment sizing (environmentClass, minWorkers, maxWorkers)
- Scheduler and webserver scaling (schedulers, minWebservers, maxWebservers)
- Python dependencies and plugins (requirementsS3Path, pluginsS3Path)
- Startup scripts for environment initialization
- KMS encryption and webserver access modes
These omissions are intentional: the goal is to illustrate how each environment feature is wired, not provide drop-in Airflow modules. See the MWAA Environment resource reference for all available configuration options.
Let's deploy AWS Managed Workflows for Apache Airflow
Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.
Try Pulumi Cloud for FREE
Frequently Asked Questions
Prerequisites & Setup
An MWAA environment requires an IAM execution role (executionRoleArn), two private subnets in your VPC (networkConfiguration), and a versioned S3 bucket (sourceBucketArn).
Environment creation can fail when the subnets in networkConfiguration lack internet connectivity. Ensure both private subnets have internet access, typically through a NAT gateway.
Deployment & Configuration
The following properties are immutable: name (environment name), endpointManagement (VPC endpoint management mode), and kmsKey (encryption key ARN).
When you set pluginsS3Path, requirementsS3Path, or startupScriptS3Path, you must also specify the corresponding ObjectVersion parameter (pluginsS3ObjectVersion, requirementsS3ObjectVersion, or startupScriptS3ObjectVersion).
Startup scripts (startupScriptS3Path) are only supported for environment versions 2.x and later.
Scaling & Performance
The default minimum is 1 worker (minWorkers) and the default maximum is 10 workers (maxWorkers).
For the mw1.micro environment class, only 1 webserver is allowed.
The available environment classes are mw1.micro, mw1.small (default), mw1.medium, and mw1.large. The class affects worker and webserver scaling limits.
Logging & Monitoring
Task logs are enabled by default at the INFO log level.
Access & Security
To make the Airflow webserver reachable over the internet, set webserverAccessMode to PUBLIC_ONLY. The default is PRIVATE_ONLY, which restricts access to your VPC.
By default (SERVICE), AWS creates and manages the VPC endpoints. Set endpointManagement to CUSTOMER if you want to create and manage them yourself. This setting is immutable.
Using a different cloud?
Explore analytics guides for other cloud providers: