The aws:mwaa/environment:Environment resource, part of the Pulumi AWS provider, provisions a managed Apache Airflow environment: its S3 DAG source, execution role, network placement, and runtime configuration. This guide focuses on three capabilities: S3-based DAG storage, Airflow configuration overrides, and CloudWatch logging setup.
MWAA environments require an IAM execution role, a versioned S3 bucket for DAG files, and VPC infrastructure with private subnets that have internet connectivity. The examples are intentionally small. Combine them with your own IAM policies, VPC setup, and scaling configuration.
Create an environment with DAGs from S3
Most deployments start by pointing to a versioned S3 bucket containing DAG files, specifying the execution role, and connecting to private subnets.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
// Minimal MWAA environment: DAG files are read from the "dags/" prefix of a
// versioned S3 bucket, and Airflow components run in the listed private subnets.
// `exampleAwsIamRole`, `exampleAwsSecurityGroup`, `_private` (a list of subnets),
// and `exampleAwsS3Bucket` are assumed to be declared elsewhere in the program.
const example = new aws.mwaa.Environment("example", {
dagS3Path: "dags/",
executionRoleArn: exampleAwsIamRole.arn,
name: "example",
networkConfiguration: {
securityGroupIds: [exampleAwsSecurityGroup.id],
subnetIds: _private.map(__item => __item.id),
},
sourceBucketArn: exampleAwsS3Bucket.arn,
});
import pulumi
import pulumi_aws as aws
# Minimal MWAA environment: DAGs under "dags/" in a versioned S3 bucket,
# running in the listed private subnets. `example_aws_iam_role`,
# `example_aws_security_group`, `private` (a list of subnets), and
# `example_aws_s3_bucket` are assumed to be declared elsewhere in the program.
example = aws.mwaa.Environment("example",
dag_s3_path="dags/",
execution_role_arn=example_aws_iam_role["arn"],
name="example",
network_configuration={
"security_group_ids": [example_aws_security_group["id"]],
"subnet_ids": [__item["id"] for __item in private],
},
source_bucket_arn=example_aws_s3_bucket["arn"])
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/mwaa"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := mwaa.NewEnvironment(ctx, "example", &mwaa.EnvironmentArgs{
DagS3Path: pulumi.String("dags/"),
ExecutionRoleArn: pulumi.Any(exampleAwsIamRole.Arn),
Name: pulumi.String("example"),
NetworkConfiguration: &mwaa.EnvironmentNetworkConfigurationArgs{
SecurityGroupIds: pulumi.StringArray{
exampleAwsSecurityGroup.Id,
},
SubnetIds: []pulumi.String(%!v(PANIC=Format method: fatal: A failure has occurred: unlowered splat expression @ example.pp:6,24-37)),
},
SourceBucketArn: pulumi.Any(exampleAwsS3Bucket.Arn),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
// Minimal MWAA environment: DAG files are read from the "dags/" prefix of a
// versioned S3 bucket, and Airflow components run in the listed private subnets.
// exampleAwsIamRole, exampleAwsSecurityGroup, @private (a list of subnets), and
// exampleAwsS3Bucket are assumed to be declared elsewhere in the program.
return await Deployment.RunAsync(() =>
{
var example = new Aws.Mwaa.Environment("example", new()
{
DagS3Path = "dags/",
ExecutionRoleArn = exampleAwsIamRole.Arn,
Name = "example",
NetworkConfiguration = new Aws.Mwaa.Inputs.EnvironmentNetworkConfigurationArgs
{
SecurityGroupIds = new[]
{
exampleAwsSecurityGroup.Id,
},
SubnetIds = @private.Select(__item => __item.Id).ToList(),
},
SourceBucketArn = exampleAwsS3Bucket.Arn,
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.mwaa.Environment;
import com.pulumi.aws.mwaa.EnvironmentArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentNetworkConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
// Minimal MWAA environment: DAG files are read from the "dags/" prefix of a
// versioned S3 bucket, and Airflow components run in the listed private subnets.
// exampleAwsIamRole, exampleAwsSecurityGroup, private_ (a list of subnets), and
// exampleAwsS3Bucket are assumed to be declared elsewhere in the program.
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new Environment("example", EnvironmentArgs.builder()
.dagS3Path("dags/")
.executionRoleArn(exampleAwsIamRole.arn())
.name("example")
.networkConfiguration(EnvironmentNetworkConfigurationArgs.builder()
.securityGroupIds(exampleAwsSecurityGroup.id())
// NOTE(review): collect(toList()) relies on a static import of
// java.util.stream.Collectors.toList that is not shown here — confirm.
.subnetIds(private_.stream().map(element -> element.id()).collect(toList()))
.build())
.sourceBucketArn(exampleAwsS3Bucket.arn())
.build());
}
}
At runtime, MWAA reads DAG definitions from the S3 path specified in dagS3Path. The sourceBucketArn identifies the versioned bucket, and executionRoleArn grants permissions to read from S3 and write logs. The networkConfiguration places Airflow components in your private subnets with specified security groups; these subnets must have internet access via NAT gateway or similar for MWAA to function.
Override Airflow runtime settings
Teams often tune Airflow’s behavior for their workload patterns, adjusting parallelism or retry policies.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
// Same base environment as above, plus Airflow config overrides: retries raised
// to 16 and scheduler-level parallelism capped at 1 concurrent task.
// Referenced resources are assumed to be declared elsewhere in the program.
const example = new aws.mwaa.Environment("example", {
airflowConfigurationOptions: {
"core.default_task_retries": "16",
"core.parallelism": "1",
},
dagS3Path: "dags/",
executionRoleArn: exampleAwsIamRole.arn,
name: "example",
networkConfiguration: {
securityGroupIds: [exampleAwsSecurityGroup.id],
subnetIds: _private.map(__item => __item.id),
},
sourceBucketArn: exampleAwsS3Bucket.arn,
});
import pulumi
import pulumi_aws as aws
# Same base environment as above, plus Airflow config overrides: retries raised
# to 16 and scheduler-level parallelism capped at 1 concurrent task.
# Referenced resources are assumed to be declared elsewhere in the program.
example = aws.mwaa.Environment("example",
airflow_configuration_options={
"core.default_task_retries": "16",
"core.parallelism": "1",
},
dag_s3_path="dags/",
execution_role_arn=example_aws_iam_role["arn"],
name="example",
network_configuration={
"security_group_ids": [example_aws_security_group["id"]],
"subnet_ids": [__item["id"] for __item in private],
},
source_bucket_arn=example_aws_s3_bucket["arn"])
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/mwaa"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := mwaa.NewEnvironment(ctx, "example", &mwaa.EnvironmentArgs{
AirflowConfigurationOptions: pulumi.StringMap{
"core.default_task_retries": pulumi.String("16"),
"core.parallelism": pulumi.String("1"),
},
DagS3Path: pulumi.String("dags/"),
ExecutionRoleArn: pulumi.Any(exampleAwsIamRole.Arn),
Name: pulumi.String("example"),
NetworkConfiguration: &mwaa.EnvironmentNetworkConfigurationArgs{
SecurityGroupIds: pulumi.StringArray{
exampleAwsSecurityGroup.Id,
},
SubnetIds: []pulumi.String(%!v(PANIC=Format method: fatal: A failure has occurred: unlowered splat expression @ example.pp:10,24-37)),
},
SourceBucketArn: pulumi.Any(exampleAwsS3Bucket.Arn),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
// Same base environment as above, plus Airflow config overrides: retries raised
// to 16 and scheduler-level parallelism capped at 1 concurrent task.
// Referenced resources are assumed to be declared elsewhere in the program.
return await Deployment.RunAsync(() =>
{
var example = new Aws.Mwaa.Environment("example", new()
{
AirflowConfigurationOptions =
{
{ "core.default_task_retries", "16" },
{ "core.parallelism", "1" },
},
DagS3Path = "dags/",
ExecutionRoleArn = exampleAwsIamRole.Arn,
Name = "example",
NetworkConfiguration = new Aws.Mwaa.Inputs.EnvironmentNetworkConfigurationArgs
{
SecurityGroupIds = new[]
{
exampleAwsSecurityGroup.Id,
},
SubnetIds = @private.Select(__item => __item.Id).ToList(),
},
SourceBucketArn = exampleAwsS3Bucket.Arn,
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.mwaa.Environment;
import com.pulumi.aws.mwaa.EnvironmentArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentNetworkConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
// Same base environment as above, plus Airflow config overrides: retries raised
// to 16 and scheduler-level parallelism capped at 1 concurrent task.
// Referenced resources are assumed to be declared elsewhere in the program.
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new Environment("example", EnvironmentArgs.builder()
.airflowConfigurationOptions(Map.ofEntries(
Map.entry("core.default_task_retries", "16"),
Map.entry("core.parallelism", "1")
))
.dagS3Path("dags/")
.executionRoleArn(exampleAwsIamRole.arn())
.name("example")
.networkConfiguration(EnvironmentNetworkConfigurationArgs.builder()
.securityGroupIds(exampleAwsSecurityGroup.id())
// NOTE(review): collect(toList()) relies on a static import of
// java.util.stream.Collectors.toList that is not shown here — confirm.
.subnetIds(private_.stream().map(element -> element.id()).collect(toList()))
.build())
.sourceBucketArn(exampleAwsS3Bucket.arn())
.build());
}
}
The airflowConfigurationOptions property passes key-value pairs that override Airflow’s core settings. Here, core.parallelism limits concurrent task execution to 1, and core.default_task_retries sets the retry count to 16. These settings apply at the Airflow configuration level, affecting how the scheduler and workers behave.
Configure CloudWatch logging for Airflow components
Production environments typically enable detailed logging across Airflow’s components to troubleshoot DAG processing and task execution.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
// Same base environment as above, plus explicit CloudWatch logging for all five
// Airflow components, each enabled at a different level for illustration.
// Referenced resources are assumed to be declared elsewhere in the program.
const example = new aws.mwaa.Environment("example", {
dagS3Path: "dags/",
executionRoleArn: exampleAwsIamRole.arn,
loggingConfiguration: {
dagProcessingLogs: {
enabled: true,
logLevel: "DEBUG",
},
schedulerLogs: {
enabled: true,
logLevel: "INFO",
},
taskLogs: {
enabled: true,
logLevel: "WARNING",
},
webserverLogs: {
enabled: true,
logLevel: "ERROR",
},
workerLogs: {
enabled: true,
logLevel: "CRITICAL",
},
},
name: "example",
networkConfiguration: {
securityGroupIds: [exampleAwsSecurityGroup.id],
subnetIds: _private.map(__item => __item.id),
},
sourceBucketArn: exampleAwsS3Bucket.arn,
});
import pulumi
import pulumi_aws as aws
# Same base environment as above, plus explicit CloudWatch logging for all five
# Airflow components, each enabled at a different level for illustration.
# Referenced resources are assumed to be declared elsewhere in the program.
example = aws.mwaa.Environment("example",
dag_s3_path="dags/",
execution_role_arn=example_aws_iam_role["arn"],
logging_configuration={
"dag_processing_logs": {
"enabled": True,
"log_level": "DEBUG",
},
"scheduler_logs": {
"enabled": True,
"log_level": "INFO",
},
"task_logs": {
"enabled": True,
"log_level": "WARNING",
},
"webserver_logs": {
"enabled": True,
"log_level": "ERROR",
},
"worker_logs": {
"enabled": True,
"log_level": "CRITICAL",
},
},
name="example",
network_configuration={
"security_group_ids": [example_aws_security_group["id"]],
"subnet_ids": [__item["id"] for __item in private],
},
source_bucket_arn=example_aws_s3_bucket["arn"])
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/mwaa"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := mwaa.NewEnvironment(ctx, "example", &mwaa.EnvironmentArgs{
DagS3Path: pulumi.String("dags/"),
ExecutionRoleArn: pulumi.Any(exampleAwsIamRole.Arn),
LoggingConfiguration: &mwaa.EnvironmentLoggingConfigurationArgs{
DagProcessingLogs: &mwaa.EnvironmentLoggingConfigurationDagProcessingLogsArgs{
Enabled: pulumi.Bool(true),
LogLevel: pulumi.String("DEBUG"),
},
SchedulerLogs: &mwaa.EnvironmentLoggingConfigurationSchedulerLogsArgs{
Enabled: pulumi.Bool(true),
LogLevel: pulumi.String("INFO"),
},
TaskLogs: &mwaa.EnvironmentLoggingConfigurationTaskLogsArgs{
Enabled: pulumi.Bool(true),
LogLevel: pulumi.String("WARNING"),
},
WebserverLogs: &mwaa.EnvironmentLoggingConfigurationWebserverLogsArgs{
Enabled: pulumi.Bool(true),
LogLevel: pulumi.String("ERROR"),
},
WorkerLogs: &mwaa.EnvironmentLoggingConfigurationWorkerLogsArgs{
Enabled: pulumi.Bool(true),
LogLevel: pulumi.String("CRITICAL"),
},
},
Name: pulumi.String("example"),
NetworkConfiguration: &mwaa.EnvironmentNetworkConfigurationArgs{
SecurityGroupIds: pulumi.StringArray{
exampleAwsSecurityGroup.Id,
},
SubnetIds: []pulumi.String(%!v(PANIC=Format method: fatal: A failure has occurred: unlowered splat expression @ example.pp:28,24-37)),
},
SourceBucketArn: pulumi.Any(exampleAwsS3Bucket.Arn),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
// Same base environment as above, plus explicit CloudWatch logging for all five
// Airflow components, each enabled at a different level for illustration.
// Referenced resources are assumed to be declared elsewhere in the program.
return await Deployment.RunAsync(() =>
{
var example = new Aws.Mwaa.Environment("example", new()
{
DagS3Path = "dags/",
ExecutionRoleArn = exampleAwsIamRole.Arn,
LoggingConfiguration = new Aws.Mwaa.Inputs.EnvironmentLoggingConfigurationArgs
{
DagProcessingLogs = new Aws.Mwaa.Inputs.EnvironmentLoggingConfigurationDagProcessingLogsArgs
{
Enabled = true,
LogLevel = "DEBUG",
},
SchedulerLogs = new Aws.Mwaa.Inputs.EnvironmentLoggingConfigurationSchedulerLogsArgs
{
Enabled = true,
LogLevel = "INFO",
},
TaskLogs = new Aws.Mwaa.Inputs.EnvironmentLoggingConfigurationTaskLogsArgs
{
Enabled = true,
LogLevel = "WARNING",
},
WebserverLogs = new Aws.Mwaa.Inputs.EnvironmentLoggingConfigurationWebserverLogsArgs
{
Enabled = true,
LogLevel = "ERROR",
},
WorkerLogs = new Aws.Mwaa.Inputs.EnvironmentLoggingConfigurationWorkerLogsArgs
{
Enabled = true,
LogLevel = "CRITICAL",
},
},
Name = "example",
NetworkConfiguration = new Aws.Mwaa.Inputs.EnvironmentNetworkConfigurationArgs
{
SecurityGroupIds = new[]
{
exampleAwsSecurityGroup.Id,
},
SubnetIds = @private.Select(__item => __item.Id).ToList(),
},
SourceBucketArn = exampleAwsS3Bucket.Arn,
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.mwaa.Environment;
import com.pulumi.aws.mwaa.EnvironmentArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentLoggingConfigurationArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentLoggingConfigurationDagProcessingLogsArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentLoggingConfigurationSchedulerLogsArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentLoggingConfigurationTaskLogsArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentLoggingConfigurationWebserverLogsArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentLoggingConfigurationWorkerLogsArgs;
import com.pulumi.aws.mwaa.inputs.EnvironmentNetworkConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
// Same base environment as above, plus explicit CloudWatch logging for all five
// Airflow components, each enabled at a different level for illustration.
// Referenced resources are assumed to be declared elsewhere in the program.
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var example = new Environment("example", EnvironmentArgs.builder()
.dagS3Path("dags/")
.executionRoleArn(exampleAwsIamRole.arn())
.loggingConfiguration(EnvironmentLoggingConfigurationArgs.builder()
.dagProcessingLogs(EnvironmentLoggingConfigurationDagProcessingLogsArgs.builder()
.enabled(true)
.logLevel("DEBUG")
.build())
.schedulerLogs(EnvironmentLoggingConfigurationSchedulerLogsArgs.builder()
.enabled(true)
.logLevel("INFO")
.build())
.taskLogs(EnvironmentLoggingConfigurationTaskLogsArgs.builder()
.enabled(true)
.logLevel("WARNING")
.build())
.webserverLogs(EnvironmentLoggingConfigurationWebserverLogsArgs.builder()
.enabled(true)
.logLevel("ERROR")
.build())
.workerLogs(EnvironmentLoggingConfigurationWorkerLogsArgs.builder()
.enabled(true)
.logLevel("CRITICAL")
.build())
.build())
.name("example")
.networkConfiguration(EnvironmentNetworkConfigurationArgs.builder()
.securityGroupIds(exampleAwsSecurityGroup.id())
// NOTE(review): collect(toList()) relies on a static import of
// java.util.stream.Collectors.toList that is not shown here — confirm.
.subnetIds(private_.stream().map(element -> element.id()).collect(toList()))
.build())
.sourceBucketArn(exampleAwsS3Bucket.arn())
.build());
}
}
The loggingConfiguration property controls CloudWatch logging for five Airflow components: DAG processing, scheduler, tasks, webserver, and workers. Each component can be enabled independently with its own log level (DEBUG, INFO, WARNING, ERROR, CRITICAL). Task logs are enabled by default at INFO level; this example shows explicit configuration for all components.
Beyond these examples
These snippets focus on specific environment-level features: S3-based DAG storage and execution, Airflow runtime configuration overrides, and CloudWatch logging for Airflow components. They’re intentionally minimal rather than full Airflow deployments.
The examples rely on pre-existing infrastructure such as IAM execution roles with MWAA permissions, versioned S3 buckets for DAG storage, VPC with private subnets and internet connectivity, and security groups for environment access. They focus on configuring the environment rather than provisioning everything around it.
To keep things focused, common environment patterns are omitted, including:
- Environment sizing (environmentClass, minWorkers, maxWorkers)
- Scheduler and webserver scaling (schedulers, minWebservers, maxWebservers)
- Python dependencies and plugins (requirementsS3Path, pluginsS3Path)
- Startup scripts for custom initialization (startupScriptS3Path)
- KMS encryption and webserver access modes
These omissions are intentional: the goal is to illustrate how each environment feature is wired, not provide drop-in Airflow modules. See the MWAA Environment resource reference for all available configuration options.
Let's deploy AWS Managed Workflows for Apache Airflow
Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.
Try Pulumi Cloud for FREE
Frequently Asked Questions
Prerequisites & Setup
Before creating an environment you need an IAM execution role (executionRoleArn), two private subnets in your VPC (networkConfiguration), and a versioned S3 bucket (sourceBucketArn). The subnets referenced in networkConfiguration require internet connectivity; deployment fails if subnets lack internet access, typically requiring a NAT Gateway for private subnets. If you specify pluginsS3Path, you must also specify pluginsS3ObjectVersion. Similarly, requirementsS3Path requires requirementsS3ObjectVersion.
Networking & Access
Use webserverAccessMode to specify PRIVATE_ONLY (default, VPC-only access) or PUBLIC_ONLY (internet-accessible). endpointManagement set to SERVICE (default) means AWS creates and manages VPC endpoints; CUSTOMER means you create and manage your own VPC endpoints. This setting is immutable after creation.
Scaling & Performance
The smallest environment class is mw1.micro. Supported environment classes are mw1.micro, mw1.small, mw1.medium, and mw1.large. The default is mw1.small.
Configuration & Customization
Use airflowConfigurationOptions to specify overrides as key-value pairs, such as core.default_task_retries or core.parallelism. Logging can be configured per component via dagProcessingLogs, schedulerLogs, taskLogs, webserverLogs, and workerLogs. Task logs are enabled by default at the INFO level.
Immutability & Versioning
Properties that cannot be changed after creation include name (environment name), endpointManagement (VPC endpoint management mode), and kmsKey (encryption key ARN). Startup scripts (startupScriptS3Path) are only supported for environment versions 2.x and later.
Using a different cloud?
Explore analytics guides for other cloud providers: