The aws:ec2/flowLog:FlowLog resource, part of the Pulumi AWS provider, captures IP traffic metadata for VPCs, subnets, network interfaces, Transit Gateways, or NAT Gateways and routes it to CloudWatch Logs, S3, or Firehose. This guide focuses on three capabilities: CloudWatch Logs integration, S3 storage with Parquet optimization, and cross-account Firehose delivery.
Flow logs attach to existing network resources and require IAM roles, destination buckets, log groups, or Firehose streams. The examples are intentionally small. Combine them with your own VPC infrastructure and log analysis tools.
Send flow logs to CloudWatch Logs
Most VPC monitoring starts by capturing network traffic to CloudWatch Logs, where you can query connection patterns using CloudWatch Insights.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const exampleLogGroup = new aws.cloudwatch.LogGroup("example", {name: "example"});
const assumeRole = aws.iam.getPolicyDocument({
statements: [{
effect: "Allow",
principals: [{
type: "Service",
identifiers: ["vpc-flow-logs.amazonaws.com"],
}],
actions: ["sts:AssumeRole"],
}],
});
const exampleRole = new aws.iam.Role("example", {
name: "example",
assumeRolePolicy: assumeRole.then(assumeRole => assumeRole.json),
});
const exampleFlowLog = new aws.ec2.FlowLog("example", {
iamRoleArn: exampleRole.arn,
logDestination: exampleLogGroup.arn,
trafficType: "ALL",
vpcId: exampleAwsVpc.id,
});
const example = aws.iam.getPolicyDocument({
statements: [{
effect: "Allow",
actions: [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
"logs:DescribeLogGroups",
"logs:DescribeLogStreams",
],
resources: ["*"],
}],
});
const exampleRolePolicy = new aws.iam.RolePolicy("example", {
name: "example",
role: exampleRole.id,
policy: example.then(example => example.json),
});
import pulumi
import pulumi_aws as aws
example_log_group = aws.cloudwatch.LogGroup("example", name="example")
assume_role = aws.iam.get_policy_document(statements=[{
"effect": "Allow",
"principals": [{
"type": "Service",
"identifiers": ["vpc-flow-logs.amazonaws.com"],
}],
"actions": ["sts:AssumeRole"],
}])
example_role = aws.iam.Role("example",
name="example",
assume_role_policy=assume_role.json)
example_flow_log = aws.ec2.FlowLog("example",
iam_role_arn=example_role.arn,
log_destination=example_log_group.arn,
traffic_type="ALL",
vpc_id=example_aws_vpc["id"])
example = aws.iam.get_policy_document(statements=[{
"effect": "Allow",
"actions": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
"logs:DescribeLogGroups",
"logs:DescribeLogStreams",
],
"resources": ["*"],
}])
example_role_policy = aws.iam.RolePolicy("example",
name="example",
role=example_role.id,
policy=example.json)
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/cloudwatch"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/ec2"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/iam"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
exampleLogGroup, err := cloudwatch.NewLogGroup(ctx, "example", &cloudwatch.LogGroupArgs{
Name: pulumi.String("example"),
})
if err != nil {
return err
}
assumeRole, err := iam.GetPolicyDocument(ctx, &iam.GetPolicyDocumentArgs{
Statements: []iam.GetPolicyDocumentStatement{
{
Effect: pulumi.StringRef("Allow"),
Principals: []iam.GetPolicyDocumentStatementPrincipal{
{
Type: "Service",
Identifiers: []string{
"vpc-flow-logs.amazonaws.com",
},
},
},
Actions: []string{
"sts:AssumeRole",
},
},
},
}, nil)
if err != nil {
return err
}
exampleRole, err := iam.NewRole(ctx, "example", &iam.RoleArgs{
Name: pulumi.String("example"),
AssumeRolePolicy: pulumi.String(assumeRole.Json),
})
if err != nil {
return err
}
_, err = ec2.NewFlowLog(ctx, "example", &ec2.FlowLogArgs{
IamRoleArn: exampleRole.Arn,
LogDestination: exampleLogGroup.Arn,
TrafficType: pulumi.String("ALL"),
VpcId: pulumi.Any(exampleAwsVpc.Id),
})
if err != nil {
return err
}
example, err := iam.GetPolicyDocument(ctx, &iam.GetPolicyDocumentArgs{
Statements: []iam.GetPolicyDocumentStatement{
{
Effect: pulumi.StringRef("Allow"),
Actions: []string{
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
"logs:DescribeLogGroups",
"logs:DescribeLogStreams",
},
Resources: []string{
"*",
},
},
},
}, nil)
if err != nil {
return err
}
_, err = iam.NewRolePolicy(ctx, "example", &iam.RolePolicyArgs{
Name: pulumi.String("example"),
Role: exampleRole.ID(),
Policy: pulumi.String(example.Json),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var exampleLogGroup = new Aws.CloudWatch.LogGroup("example", new()
{
Name = "example",
});
var assumeRole = Aws.Iam.GetPolicyDocument.Invoke(new()
{
Statements = new[]
{
new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
{
Effect = "Allow",
Principals = new[]
{
new Aws.Iam.Inputs.GetPolicyDocumentStatementPrincipalInputArgs
{
Type = "Service",
Identifiers = new[]
{
"vpc-flow-logs.amazonaws.com",
},
},
},
Actions = new[]
{
"sts:AssumeRole",
},
},
},
});
var exampleRole = new Aws.Iam.Role("example", new()
{
Name = "example",
AssumeRolePolicy = assumeRole.Apply(getPolicyDocumentResult => getPolicyDocumentResult.Json),
});
var exampleFlowLog = new Aws.Ec2.FlowLog("example", new()
{
IamRoleArn = exampleRole.Arn,
LogDestination = exampleLogGroup.Arn,
TrafficType = "ALL",
VpcId = exampleAwsVpc.Id,
});
var example = Aws.Iam.GetPolicyDocument.Invoke(new()
{
Statements = new[]
{
new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
{
Effect = "Allow",
Actions = new[]
{
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
"logs:DescribeLogGroups",
"logs:DescribeLogStreams",
},
Resources = new[]
{
"*",
},
},
},
});
var exampleRolePolicy = new Aws.Iam.RolePolicy("example", new()
{
Name = "example",
Role = exampleRole.Id,
Policy = example.Apply(getPolicyDocumentResult => getPolicyDocumentResult.Json),
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.cloudwatch.LogGroup;
import com.pulumi.aws.cloudwatch.LogGroupArgs;
import com.pulumi.aws.iam.IamFunctions;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentArgs;
import com.pulumi.aws.iam.Role;
import com.pulumi.aws.iam.RoleArgs;
import com.pulumi.aws.ec2.FlowLog;
import com.pulumi.aws.ec2.FlowLogArgs;
import com.pulumi.aws.iam.RolePolicy;
import com.pulumi.aws.iam.RolePolicyArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var exampleLogGroup = new LogGroup("exampleLogGroup", LogGroupArgs.builder()
.name("example")
.build());
final var assumeRole = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
.statements(GetPolicyDocumentStatementArgs.builder()
.effect("Allow")
.principals(GetPolicyDocumentStatementPrincipalArgs.builder()
.type("Service")
.identifiers("vpc-flow-logs.amazonaws.com")
.build())
.actions("sts:AssumeRole")
.build())
.build());
var exampleRole = new Role("exampleRole", RoleArgs.builder()
.name("example")
.assumeRolePolicy(assumeRole.json())
.build());
var exampleFlowLog = new FlowLog("exampleFlowLog", FlowLogArgs.builder()
.iamRoleArn(exampleRole.arn())
.logDestination(exampleLogGroup.arn())
.trafficType("ALL")
.vpcId(exampleAwsVpc.id())
.build());
final var example = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
.statements(GetPolicyDocumentStatementArgs.builder()
.effect("Allow")
.actions(
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
"logs:DescribeLogGroups",
"logs:DescribeLogStreams")
.resources("*")
.build())
.build());
var exampleRolePolicy = new RolePolicy("exampleRolePolicy", RolePolicyArgs.builder()
.name("example")
.role(exampleRole.id())
.policy(example.json())
.build());
}
}
resources:
exampleFlowLog:
type: aws:ec2:FlowLog
name: example
properties:
iamRoleArn: ${exampleRole.arn}
logDestination: ${exampleLogGroup.arn}
trafficType: ALL
vpcId: ${exampleAwsVpc.id}
exampleLogGroup:
type: aws:cloudwatch:LogGroup
name: example
properties:
name: example
exampleRole:
type: aws:iam:Role
name: example
properties:
name: example
assumeRolePolicy: ${assumeRole.json}
exampleRolePolicy:
type: aws:iam:RolePolicy
name: example
properties:
name: example
role: ${exampleRole.id}
policy: ${example.json}
variables:
assumeRole:
fn::invoke:
function: aws:iam:getPolicyDocument
arguments:
statements:
- effect: Allow
principals:
- type: Service
identifiers:
- vpc-flow-logs.amazonaws.com
actions:
- sts:AssumeRole
example:
fn::invoke:
function: aws:iam:getPolicyDocument
arguments:
statements:
- effect: Allow
actions:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
- logs:DescribeLogGroups
- logs:DescribeLogStreams
resources:
- '*'
The flow log captures all traffic (ACCEPT, REJECT, or ALL) from the specified VPC and sends records to the CloudWatch Log Group. The iamRoleArn grants permissions to create log streams and write events. The IAM role requires logs:CreateLogGroup, logs:CreateLogStream, and logs:PutLogEvents permissions.
Store flow logs in S3 buckets
Teams building audit trails or analytics pipelines often route flow logs to S3 for cost-effective storage and integration with Athena.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const exampleBucket = new aws.s3.Bucket("example", {bucket: "example"});
const example = new aws.ec2.FlowLog("example", {
logDestination: exampleBucket.arn,
logDestinationType: "s3",
trafficType: "ALL",
vpcId: exampleAwsVpc.id,
});
import pulumi
import pulumi_aws as aws
example_bucket = aws.s3.Bucket("example", bucket="example")
example = aws.ec2.FlowLog("example",
log_destination=example_bucket.arn,
log_destination_type="s3",
traffic_type="ALL",
vpc_id=example_aws_vpc["id"])
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/ec2"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/s3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
exampleBucket, err := s3.NewBucket(ctx, "example", &s3.BucketArgs{
Bucket: pulumi.String("example"),
})
if err != nil {
return err
}
_, err = ec2.NewFlowLog(ctx, "example", &ec2.FlowLogArgs{
LogDestination: exampleBucket.Arn,
LogDestinationType: pulumi.String("s3"),
TrafficType: pulumi.String("ALL"),
VpcId: pulumi.Any(exampleAwsVpc.Id),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var exampleBucket = new Aws.S3.Bucket("example", new()
{
BucketName = "example",
});
var example = new Aws.Ec2.FlowLog("example", new()
{
LogDestination = exampleBucket.Arn,
LogDestinationType = "s3",
TrafficType = "ALL",
VpcId = exampleAwsVpc.Id,
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.s3.Bucket;
import com.pulumi.aws.s3.BucketArgs;
import com.pulumi.aws.ec2.FlowLog;
import com.pulumi.aws.ec2.FlowLogArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var exampleBucket = new Bucket("exampleBucket", BucketArgs.builder()
.bucket("example")
.build());
var example = new FlowLog("example", FlowLogArgs.builder()
.logDestination(exampleBucket.arn())
.logDestinationType("s3")
.trafficType("ALL")
.vpcId(exampleAwsVpc.id())
.build());
}
}
resources:
example:
type: aws:ec2:FlowLog
properties:
logDestination: ${exampleBucket.arn}
logDestinationType: s3
trafficType: ALL
vpcId: ${exampleAwsVpc.id}
exampleBucket:
type: aws:s3:Bucket
name: example
properties:
bucket: example
Setting logDestinationType to “s3” routes traffic records to the S3 bucket. The logDestination property points to the bucket ARN. Unlike CloudWatch destinations, S3 logging doesn’t require an IAM role in the flow log configuration; the Flow Logs service uses a service-linked role with PutObject permissions.
Optimize S3 storage with Parquet and partitioning
Analytics workloads benefit from columnar formats and time-based partitioning, which reduce query costs in Athena and Redshift Spectrum.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const exampleBucket = new aws.s3.Bucket("example", {bucket: "example"});
const example = new aws.ec2.FlowLog("example", {
logDestination: exampleBucket.arn,
logDestinationType: "s3",
trafficType: "ALL",
vpcId: exampleAwsVpc.id,
destinationOptions: {
fileFormat: "parquet",
perHourPartition: true,
},
});
import pulumi
import pulumi_aws as aws
example_bucket = aws.s3.Bucket("example", bucket="example")
example = aws.ec2.FlowLog("example",
log_destination=example_bucket.arn,
log_destination_type="s3",
traffic_type="ALL",
vpc_id=example_aws_vpc["id"],
destination_options={
"file_format": "parquet",
"per_hour_partition": True,
})
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/ec2"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/s3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
exampleBucket, err := s3.NewBucket(ctx, "example", &s3.BucketArgs{
Bucket: pulumi.String("example"),
})
if err != nil {
return err
}
_, err = ec2.NewFlowLog(ctx, "example", &ec2.FlowLogArgs{
LogDestination: exampleBucket.Arn,
LogDestinationType: pulumi.String("s3"),
TrafficType: pulumi.String("ALL"),
VpcId: pulumi.Any(exampleAwsVpc.Id),
DestinationOptions: &ec2.FlowLogDestinationOptionsArgs{
FileFormat: pulumi.String("parquet"),
PerHourPartition: pulumi.Bool(true),
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var exampleBucket = new Aws.S3.Bucket("example", new()
{
BucketName = "example",
});
var example = new Aws.Ec2.FlowLog("example", new()
{
LogDestination = exampleBucket.Arn,
LogDestinationType = "s3",
TrafficType = "ALL",
VpcId = exampleAwsVpc.Id,
DestinationOptions = new Aws.Ec2.Inputs.FlowLogDestinationOptionsArgs
{
FileFormat = "parquet",
PerHourPartition = true,
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.s3.Bucket;
import com.pulumi.aws.s3.BucketArgs;
import com.pulumi.aws.ec2.FlowLog;
import com.pulumi.aws.ec2.FlowLogArgs;
import com.pulumi.aws.ec2.inputs.FlowLogDestinationOptionsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var exampleBucket = new Bucket("exampleBucket", BucketArgs.builder()
.bucket("example")
.build());
var example = new FlowLog("example", FlowLogArgs.builder()
.logDestination(exampleBucket.arn())
.logDestinationType("s3")
.trafficType("ALL")
.vpcId(exampleAwsVpc.id())
.destinationOptions(FlowLogDestinationOptionsArgs.builder()
.fileFormat("parquet")
.perHourPartition(true)
.build())
.build());
}
}
resources:
example:
type: aws:ec2:FlowLog
properties:
logDestination: ${exampleBucket.arn}
logDestinationType: s3
trafficType: ALL
vpcId: ${exampleAwsVpc.id}
destinationOptions:
fileFormat: parquet
perHourPartition: true
exampleBucket:
type: aws:s3:Bucket
name: example
properties:
bucket: example
The destinationOptions block controls file format and partitioning. Setting fileFormat to “parquet” converts logs to a columnar format that compresses better and queries faster than plain text. The perHourPartition option organizes files by hour, reducing the data scanned per query.
Route logs across AWS accounts via Firehose
Organizations with centralized logging send flow logs from workload accounts to a dedicated logging account, where Firehose delivers them to S3, Redshift, or other destinations.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
// For source account
const src = new aws.ec2.Vpc("src", {});
const srcAssumeRolePolicy = aws.iam.getPolicyDocument({
statements: [{
actions: ["sts:AssumeRole"],
effect: "Allow",
principals: [{
type: "Service",
identifiers: ["delivery.logs.amazonaws.com"],
}],
}],
});
const srcRole = new aws.iam.Role("src", {
name: "tf-example-mySourceRole",
assumeRolePolicy: srcAssumeRolePolicy.then(srcAssumeRolePolicy => srcAssumeRolePolicy.json),
});
// For destination account
const dstAssumeRolePolicy = aws.iam.getPolicyDocumentOutput({
statements: [{
actions: ["sts:AssumeRole"],
effect: "Allow",
principals: [{
type: "AWS",
identifiers: [srcRole.arn],
}],
}],
});
const dst = new aws.iam.Role("dst", {
name: "AWSLogDeliveryFirehoseCrossAccountRole",
assumeRolePolicy: dstAssumeRolePolicy.apply(dstAssumeRolePolicy => dstAssumeRolePolicy.json),
});
const srcRolePolicy = aws.iam.getPolicyDocumentOutput({
statements: [
{
effect: "Allow",
actions: ["iam:PassRole"],
resources: [srcRole.arn],
conditions: [
{
test: "StringEquals",
variable: "iam:PassedToService",
values: ["delivery.logs.amazonaws.com"],
},
{
test: "StringLike",
variable: "iam:AssociatedResourceARN",
values: [src.arn],
},
],
},
{
effect: "Allow",
actions: [
"logs:CreateLogDelivery",
"logs:DeleteLogDelivery",
"logs:ListLogDeliveries",
"logs:GetLogDelivery",
],
resources: ["*"],
},
{
effect: "Allow",
actions: ["sts:AssumeRole"],
resources: [dst.arn],
},
],
});
const srcPolicy = new aws.iam.RolePolicy("src_policy", {
name: "tf-example-mySourceRolePolicy",
role: srcRole.name,
policy: srcRolePolicy.apply(srcRolePolicy => srcRolePolicy.json),
});
const dstFirehoseDeliveryStream = new aws.kinesis.FirehoseDeliveryStream("dst", {tags: {
LogDeliveryEnabled: "true",
}});
const srcFlowLog = new aws.ec2.FlowLog("src", {
logDestinationType: "kinesis-data-firehose",
logDestination: dstFirehoseDeliveryStream.arn,
trafficType: "ALL",
vpcId: src.id,
iamRoleArn: srcRole.arn,
deliverCrossAccountRole: dst.arn,
});
const dstRolePolicy = aws.iam.getPolicyDocument({
statements: [{
effect: "Allow",
actions: [
"iam:CreateServiceLinkedRole",
"firehose:TagDeliveryStream",
],
resources: ["*"],
}],
});
const dstRolePolicy2 = new aws.iam.RolePolicy("dst", {
name: "AWSLogDeliveryFirehoseCrossAccountRolePolicy",
role: dst.name,
policy: dstRolePolicy.then(dstRolePolicy => dstRolePolicy.json),
});
import pulumi
import pulumi_aws as aws
# For source account
src = aws.ec2.Vpc("src")
src_assume_role_policy = aws.iam.get_policy_document(statements=[{
"actions": ["sts:AssumeRole"],
"effect": "Allow",
"principals": [{
"type": "Service",
"identifiers": ["delivery.logs.amazonaws.com"],
}],
}])
src_role = aws.iam.Role("src",
name="tf-example-mySourceRole",
assume_role_policy=src_assume_role_policy.json)
# For destination account
dst_assume_role_policy = aws.iam.get_policy_document_output(statements=[{
"actions": ["sts:AssumeRole"],
"effect": "Allow",
"principals": [{
"type": "AWS",
"identifiers": [src_role.arn],
}],
}])
dst = aws.iam.Role("dst",
name="AWSLogDeliveryFirehoseCrossAccountRole",
assume_role_policy=dst_assume_role_policy.json)
src_role_policy = aws.iam.get_policy_document_output(statements=[
{
"effect": "Allow",
"actions": ["iam:PassRole"],
"resources": [src_role.arn],
"conditions": [
{
"test": "StringEquals",
"variable": "iam:PassedToService",
"values": ["delivery.logs.amazonaws.com"],
},
{
"test": "StringLike",
"variable": "iam:AssociatedResourceARN",
"values": [src.arn],
},
],
},
{
"effect": "Allow",
"actions": [
"logs:CreateLogDelivery",
"logs:DeleteLogDelivery",
"logs:ListLogDeliveries",
"logs:GetLogDelivery",
],
"resources": ["*"],
},
{
"effect": "Allow",
"actions": ["sts:AssumeRole"],
"resources": [dst.arn],
},
])
src_policy = aws.iam.RolePolicy("src_policy",
name="tf-example-mySourceRolePolicy",
role=src_role.name,
policy=src_role_policy.json)
dst_firehose_delivery_stream = aws.kinesis.FirehoseDeliveryStream("dst", tags={
"LogDeliveryEnabled": "true",
})
src_flow_log = aws.ec2.FlowLog("src",
log_destination_type="kinesis-data-firehose",
log_destination=dst_firehose_delivery_stream.arn,
traffic_type="ALL",
vpc_id=src.id,
iam_role_arn=src_role.arn,
deliver_cross_account_role=dst.arn)
dst_role_policy = aws.iam.get_policy_document(statements=[{
"effect": "Allow",
"actions": [
"iam:CreateServiceLinkedRole",
"firehose:TagDeliveryStream",
],
"resources": ["*"],
}])
dst_role_policy2 = aws.iam.RolePolicy("dst",
name="AWSLogDeliveryFirehoseCrossAccountRolePolicy",
role=dst.name,
policy=dst_role_policy.json)
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/ec2"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/iam"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesis"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
// For source account
src, err := ec2.NewVpc(ctx, "src", nil)
if err != nil {
return err
}
srcAssumeRolePolicy, err := iam.GetPolicyDocument(ctx, &iam.GetPolicyDocumentArgs{
Statements: []iam.GetPolicyDocumentStatement{
{
Actions: []string{
"sts:AssumeRole",
},
Effect: pulumi.StringRef("Allow"),
Principals: []iam.GetPolicyDocumentStatementPrincipal{
{
Type: "Service",
Identifiers: []string{
"delivery.logs.amazonaws.com",
},
},
},
},
},
}, nil)
if err != nil {
return err
}
srcRole, err := iam.NewRole(ctx, "src", &iam.RoleArgs{
Name: pulumi.String("tf-example-mySourceRole"),
AssumeRolePolicy: pulumi.String(srcAssumeRolePolicy.Json),
})
if err != nil {
return err
}
// For destination account
dstAssumeRolePolicy := iam.GetPolicyDocumentOutput(ctx, iam.GetPolicyDocumentOutputArgs{
Statements: iam.GetPolicyDocumentStatementArray{
&iam.GetPolicyDocumentStatementArgs{
Actions: pulumi.StringArray{
pulumi.String("sts:AssumeRole"),
},
Effect: pulumi.String("Allow"),
Principals: iam.GetPolicyDocumentStatementPrincipalArray{
&iam.GetPolicyDocumentStatementPrincipalArgs{
Type: pulumi.String("AWS"),
Identifiers: pulumi.StringArray{
srcRole.Arn,
},
},
},
},
},
}, nil)
dst, err := iam.NewRole(ctx, "dst", &iam.RoleArgs{
Name: pulumi.String("AWSLogDeliveryFirehoseCrossAccountRole"),
AssumeRolePolicy: pulumi.String(dstAssumeRolePolicy.ApplyT(func(dstAssumeRolePolicy iam.GetPolicyDocumentResult) (*string, error) {
return &dstAssumeRolePolicy.Json, nil
}).(pulumi.StringPtrOutput)),
})
if err != nil {
return err
}
srcRolePolicy := iam.GetPolicyDocumentOutput(ctx, iam.GetPolicyDocumentOutputArgs{
Statements: iam.GetPolicyDocumentStatementArray{
&iam.GetPolicyDocumentStatementArgs{
Effect: pulumi.String("Allow"),
Actions: pulumi.StringArray{
pulumi.String("iam:PassRole"),
},
Resources: pulumi.StringArray{
srcRole.Arn,
},
Conditions: iam.GetPolicyDocumentStatementConditionArray{
&iam.GetPolicyDocumentStatementConditionArgs{
Test: pulumi.String("StringEquals"),
Variable: pulumi.String("iam:PassedToService"),
Values: pulumi.StringArray{
pulumi.String("delivery.logs.amazonaws.com"),
},
},
&iam.GetPolicyDocumentStatementConditionArgs{
Test: pulumi.String("StringLike"),
Variable: pulumi.String("iam:AssociatedResourceARN"),
Values: pulumi.StringArray{
src.Arn,
},
},
},
},
&iam.GetPolicyDocumentStatementArgs{
Effect: pulumi.String("Allow"),
Actions: pulumi.StringArray{
pulumi.String("logs:CreateLogDelivery"),
pulumi.String("logs:DeleteLogDelivery"),
pulumi.String("logs:ListLogDeliveries"),
pulumi.String("logs:GetLogDelivery"),
},
Resources: pulumi.StringArray{
pulumi.String("*"),
},
},
&iam.GetPolicyDocumentStatementArgs{
Effect: pulumi.String("Allow"),
Actions: pulumi.StringArray{
pulumi.String("sts:AssumeRole"),
},
Resources: pulumi.StringArray{
dst.Arn,
},
},
},
}, nil)
_, err = iam.NewRolePolicy(ctx, "src_policy", &iam.RolePolicyArgs{
Name: pulumi.String("tf-example-mySourceRolePolicy"),
Role: srcRole.Name,
Policy: pulumi.String(srcRolePolicy.ApplyT(func(srcRolePolicy iam.GetPolicyDocumentResult) (*string, error) {
return &srcRolePolicy.Json, nil
}).(pulumi.StringPtrOutput)),
})
if err != nil {
return err
}
dstFirehoseDeliveryStream, err := kinesis.NewFirehoseDeliveryStream(ctx, "dst", &kinesis.FirehoseDeliveryStreamArgs{
Tags: pulumi.StringMap{
"LogDeliveryEnabled": pulumi.String("true"),
},
})
if err != nil {
return err
}
_, err = ec2.NewFlowLog(ctx, "src", &ec2.FlowLogArgs{
LogDestinationType: pulumi.String("kinesis-data-firehose"),
LogDestination: dstFirehoseDeliveryStream.Arn,
TrafficType: pulumi.String("ALL"),
VpcId: src.ID(),
IamRoleArn: srcRole.Arn,
DeliverCrossAccountRole: dst.Arn,
})
if err != nil {
return err
}
dstRolePolicy, err := iam.GetPolicyDocument(ctx, &iam.GetPolicyDocumentArgs{
Statements: []iam.GetPolicyDocumentStatement{
{
Effect: pulumi.StringRef("Allow"),
Actions: []string{
"iam:CreateServiceLinkedRole",
"firehose:TagDeliveryStream",
},
Resources: []string{
"*",
},
},
},
}, nil)
if err != nil {
return err
}
_, err = iam.NewRolePolicy(ctx, "dst", &iam.RolePolicyArgs{
Name: pulumi.String("AWSLogDeliveryFirehoseCrossAccountRolePolicy"),
Role: dst.Name,
Policy: pulumi.String(dstRolePolicy.Json),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
// For source account
var src = new Aws.Ec2.Vpc("src");
var srcAssumeRolePolicy = Aws.Iam.GetPolicyDocument.Invoke(new()
{
Statements = new[]
{
new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
{
Actions = new[]
{
"sts:AssumeRole",
},
Effect = "Allow",
Principals = new[]
{
new Aws.Iam.Inputs.GetPolicyDocumentStatementPrincipalInputArgs
{
Type = "Service",
Identifiers = new[]
{
"delivery.logs.amazonaws.com",
},
},
},
},
},
});
var srcRole = new Aws.Iam.Role("src", new()
{
Name = "tf-example-mySourceRole",
AssumeRolePolicy = srcAssumeRolePolicy.Apply(getPolicyDocumentResult => getPolicyDocumentResult.Json),
});
// For destination account
var dstAssumeRolePolicy = Aws.Iam.GetPolicyDocument.Invoke(new()
{
Statements = new[]
{
new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
{
Actions = new[]
{
"sts:AssumeRole",
},
Effect = "Allow",
Principals = new[]
{
new Aws.Iam.Inputs.GetPolicyDocumentStatementPrincipalInputArgs
{
Type = "AWS",
Identifiers = new[]
{
srcRole.Arn,
},
},
},
},
},
});
var dst = new Aws.Iam.Role("dst", new()
{
Name = "AWSLogDeliveryFirehoseCrossAccountRole",
AssumeRolePolicy = dstAssumeRolePolicy.Apply(getPolicyDocumentResult => getPolicyDocumentResult.Json),
});
var srcRolePolicy = Aws.Iam.GetPolicyDocument.Invoke(new()
{
Statements = new[]
{
new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
{
Effect = "Allow",
Actions = new[]
{
"iam:PassRole",
},
Resources = new[]
{
srcRole.Arn,
},
Conditions = new[]
{
new Aws.Iam.Inputs.GetPolicyDocumentStatementConditionInputArgs
{
Test = "StringEquals",
Variable = "iam:PassedToService",
Values = new[]
{
"delivery.logs.amazonaws.com",
},
},
new Aws.Iam.Inputs.GetPolicyDocumentStatementConditionInputArgs
{
Test = "StringLike",
Variable = "iam:AssociatedResourceARN",
Values = new[]
{
src.Arn,
},
},
},
},
new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
{
Effect = "Allow",
Actions = new[]
{
"logs:CreateLogDelivery",
"logs:DeleteLogDelivery",
"logs:ListLogDeliveries",
"logs:GetLogDelivery",
},
Resources = new[]
{
"*",
},
},
new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
{
Effect = "Allow",
Actions = new[]
{
"sts:AssumeRole",
},
Resources = new[]
{
dst.Arn,
},
},
},
});
var srcPolicy = new Aws.Iam.RolePolicy("src_policy", new()
{
Name = "tf-example-mySourceRolePolicy",
Role = srcRole.Name,
Policy = srcRolePolicy.Apply(getPolicyDocumentResult => getPolicyDocumentResult.Json),
});
var dstFirehoseDeliveryStream = new Aws.Kinesis.FirehoseDeliveryStream("dst", new()
{
Tags =
{
{ "LogDeliveryEnabled", "true" },
},
});
var srcFlowLog = new Aws.Ec2.FlowLog("src", new()
{
LogDestinationType = "kinesis-data-firehose",
LogDestination = dstFirehoseDeliveryStream.Arn,
TrafficType = "ALL",
VpcId = src.Id,
IamRoleArn = srcRole.Arn,
DeliverCrossAccountRole = dst.Arn,
});
var dstRolePolicy = Aws.Iam.GetPolicyDocument.Invoke(new()
{
Statements = new[]
{
new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
{
Effect = "Allow",
Actions = new[]
{
"iam:CreateServiceLinkedRole",
"firehose:TagDeliveryStream",
},
Resources = new[]
{
"*",
},
},
},
});
var dstRolePolicy2 = new Aws.Iam.RolePolicy("dst", new()
{
Name = "AWSLogDeliveryFirehoseCrossAccountRolePolicy",
Role = dst.Name,
Policy = dstRolePolicy.Apply(getPolicyDocumentResult => getPolicyDocumentResult.Json),
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.ec2.Vpc;
import com.pulumi.aws.iam.IamFunctions;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentArgs;
import com.pulumi.aws.iam.Role;
import com.pulumi.aws.iam.RoleArgs;
import com.pulumi.aws.iam.RolePolicy;
import com.pulumi.aws.iam.RolePolicyArgs;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.ec2.FlowLog;
import com.pulumi.aws.ec2.FlowLogArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
// For source account
var src = new Vpc("src");
final var srcAssumeRolePolicy = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
.statements(GetPolicyDocumentStatementArgs.builder()
.actions("sts:AssumeRole")
.effect("Allow")
.principals(GetPolicyDocumentStatementPrincipalArgs.builder()
.type("Service")
.identifiers("delivery.logs.amazonaws.com")
.build())
.build())
.build());
var srcRole = new Role("srcRole", RoleArgs.builder()
.name("tf-example-mySourceRole")
.assumeRolePolicy(srcAssumeRolePolicy.json())
.build());
// For destination account
final var dstAssumeRolePolicy = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
.statements(GetPolicyDocumentStatementArgs.builder()
.actions("sts:AssumeRole")
.effect("Allow")
.principals(GetPolicyDocumentStatementPrincipalArgs.builder()
.type("AWS")
.identifiers(srcRole.arn())
.build())
.build())
.build());
var dst = new Role("dst", RoleArgs.builder()
.name("AWSLogDeliveryFirehoseCrossAccountRole")
.assumeRolePolicy(dstAssumeRolePolicy.applyValue(_dstAssumeRolePolicy -> _dstAssumeRolePolicy.json()))
.build());
final var srcRolePolicy = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
.statements(
GetPolicyDocumentStatementArgs.builder()
.effect("Allow")
.actions("iam:PassRole")
.resources(srcRole.arn())
.conditions(
GetPolicyDocumentStatementConditionArgs.builder()
.test("StringEquals")
.variable("iam:PassedToService")
.values("delivery.logs.amazonaws.com")
.build(),
GetPolicyDocumentStatementConditionArgs.builder()
.test("StringLike")
.variable("iam:AssociatedResourceARN")
.values(src.arn())
.build())
.build(),
GetPolicyDocumentStatementArgs.builder()
.effect("Allow")
.actions(
"logs:CreateLogDelivery",
"logs:DeleteLogDelivery",
"logs:ListLogDeliveries",
"logs:GetLogDelivery")
.resources("*")
.build(),
GetPolicyDocumentStatementArgs.builder()
.effect("Allow")
.actions("sts:AssumeRole")
.resources(dst.arn())
.build())
.build());
var srcPolicy = new RolePolicy("srcPolicy", RolePolicyArgs.builder()
.name("tf-example-mySourceRolePolicy")
.role(srcRole.name())
.policy(srcRolePolicy.applyValue(_srcRolePolicy -> _srcRolePolicy.json()))
.build());
var dstFirehoseDeliveryStream = new FirehoseDeliveryStream("dstFirehoseDeliveryStream", FirehoseDeliveryStreamArgs.builder()
.tags(Map.of("LogDeliveryEnabled", "true"))
.build());
var srcFlowLog = new FlowLog("srcFlowLog", FlowLogArgs.builder()
.logDestinationType("kinesis-data-firehose")
.logDestination(dstFirehoseDeliveryStream.arn())
.trafficType("ALL")
.vpcId(src.id())
.iamRoleArn(srcRole.arn())
.deliverCrossAccountRole(dst.arn())
.build());
final var dstRolePolicy = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
.statements(GetPolicyDocumentStatementArgs.builder()
.effect("Allow")
.actions(
"iam:CreateServiceLinkedRole",
"firehose:TagDeliveryStream")
.resources("*")
.build())
.build());
var dstRolePolicy2 = new RolePolicy("dstRolePolicy2", RolePolicyArgs.builder()
.name("AWSLogDeliveryFirehoseCrossAccountRolePolicy")
.role(dst.name())
.policy(dstRolePolicy.json())
.build());
}
}
resources:
# For source account
src:
type: aws:ec2:Vpc
srcRole:
type: aws:iam:Role
name: src
properties:
name: tf-example-mySourceRole
assumeRolePolicy: ${srcAssumeRolePolicy.json}
srcPolicy:
type: aws:iam:RolePolicy
name: src_policy
properties:
name: tf-example-mySourceRolePolicy
role: ${srcRole.name}
policy: ${srcRolePolicy.json}
srcFlowLog:
type: aws:ec2:FlowLog
name: src
properties:
logDestinationType: kinesis-data-firehose
logDestination: ${dstFirehoseDeliveryStream.arn}
trafficType: ALL
vpcId: ${src.id}
iamRoleArn: ${srcRole.arn}
deliverCrossAccountRole: ${dst.arn}
dst:
type: aws:iam:Role
properties:
name: AWSLogDeliveryFirehoseCrossAccountRole
assumeRolePolicy: ${dstAssumeRolePolicy.json}
dstRolePolicy2:
type: aws:iam:RolePolicy
name: dst
properties:
name: AWSLogDeliveryFirehoseCrossAccountRolePolicy
role: ${dst.name}
policy: ${dstRolePolicy.json}
dstFirehoseDeliveryStream:
type: aws:kinesis:FirehoseDeliveryStream
name: dst
properties:
tags:
LogDeliveryEnabled: 'true'
variables:
srcAssumeRolePolicy:
fn::invoke:
function: aws:iam:getPolicyDocument
arguments:
statements:
- actions:
- sts:AssumeRole
effect: Allow
principals:
- type: Service
identifiers:
- delivery.logs.amazonaws.com
srcRolePolicy:
fn::invoke:
function: aws:iam:getPolicyDocument
arguments:
statements:
- effect: Allow
actions:
- iam:PassRole
resources:
- ${srcRole.arn}
conditions:
- test: StringEquals
variable: iam:PassedToService
values:
- delivery.logs.amazonaws.com
- test: StringLike
variable: iam:AssociatedResourceARN
values:
- ${src.arn}
- effect: Allow
actions:
- logs:CreateLogDelivery
- logs:DeleteLogDelivery
- logs:ListLogDeliveries
- logs:GetLogDelivery
resources:
- '*'
- effect: Allow
actions:
- sts:AssumeRole
resources:
- ${dst.arn}
# For destination account
dstAssumeRolePolicy:
fn::invoke:
function: aws:iam:getPolicyDocument
arguments:
statements:
- actions:
- sts:AssumeRole
effect: Allow
principals:
- type: AWS
identifiers:
- ${srcRole.arn}
dstRolePolicy:
fn::invoke:
function: aws:iam:getPolicyDocument
arguments:
statements:
- effect: Allow
actions:
- iam:CreateServiceLinkedRole
- firehose:TagDeliveryStream
resources:
- '*'
Cross-account delivery requires IAM roles in both accounts. The source account’s iamRoleArn grants permissions to create log deliveries and assume the destination role. The deliverCrossAccountRole points to the destination account’s role, which has permissions to write to Firehose. The logDestinationType must be “kinesis-data-firehose”, and the logDestination references the Firehose delivery stream ARN in the destination account.
Beyond these examples
These snippets focus on specific flow log features: CloudWatch, S3, and Firehose destinations, Parquet format and time-based partitioning, and cross-account delivery. They’re intentionally minimal rather than full network monitoring solutions.
The examples reference pre-existing infrastructure such as VPCs, subnets, or network interfaces to monitor, IAM roles with appropriate log delivery permissions, and S3 buckets, CloudWatch Log Groups, or Firehose delivery streams. They focus on configuring the flow log rather than provisioning everything around it.
To keep things focused, common flow log patterns are omitted, including:
- Custom log formats (logFormat property)
- Aggregation intervals (maxAggregationInterval)
- Traffic filtering (trafficType: ACCEPT or REJECT only)
- Subnet, ENI, Transit Gateway, or NAT Gateway scopes
These omissions are intentional: the goal is to illustrate how each flow log feature is wired, not provide drop-in monitoring modules. See the VPC Flow Log resource reference for all available configuration options.
Let's configure AWS VPC Flow Logs
Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.
Try Pulumi Cloud for FREEFrequently Asked Questions
Configuration & Resource Scope
eniId, regionalNatGatewayId, subnetId, transitGatewayId, transitGatewayAttachmentId, or vpcId. This determines the scope of traffic captured.trafficType is required when monitoring an ENI, NAT Gateway, Subnet, or VPC. Valid values are ACCEPT, REJECT, or ALL.logDestination, logDestinationType, trafficType, maxAggregationInterval, iamRoleArn, destinationOptions, and all resource identifiers (vpcId, subnetId, etc.). You must recreate the flow log to change these.Destinations & Formats
Flow logs support three destinations via logDestinationType:
- CloudWatch Logs -
cloud-watch-logs(default) - S3 -
s3 - Kinesis Data Firehose -
kinesis-data-firehose
destinationOptions with fileFormat set to parquet. You can also enable perHourPartition for hourly partitioning.IAM & Permissions
logs:CreateLogGroup, logs:CreateLogStream, logs:PutLogEvents, logs:DescribeLogGroups, and logs:DescribeLogStreams.deliverCrossAccountRole to the destination account’s IAM role ARN. The source account role needs sts:AssumeRole permission for that destination role, and the destination role must trust the source role.Timing & Aggregation
maxAggregationInterval accepts 60 seconds (1 minute) or 600 seconds (10 minutes). The default is 600 seconds.transitGatewayId or transitGatewayAttachmentId is specified, maxAggregationInterval must be set to 60 seconds. The default 600-second interval is not supported for Transit Gateway resources.Using a different cloud?
Explore networking guides for other cloud providers: