The aws:ec2/flowLog:FlowLog resource, part of the Pulumi AWS provider, captures IP traffic metadata for VPCs, subnets, network interfaces, or Transit Gateways and routes it to CloudWatch Logs, S3, or Firehose. This guide focuses on three capabilities: CloudWatch Logs delivery, S3 storage with Parquet optimization, and cross-account Firehose delivery.
Flow logs require a network resource to monitor and a destination for the logs. The examples are intentionally small. Combine them with your own VPC infrastructure and logging destinations.
Send flow logs to CloudWatch Logs
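Most VPC monitoring starts by capturing network traffic to CloudWatch Logs, where you can query connection patterns using CloudWatch Logs Insights. The samples reference exampleAwsVpc, which is not declared in the snippets; it stands for a VPC that you define elsewhere in your program.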
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
// Log group that receives the flow log records.
const logGroup = new aws.cloudwatch.LogGroup("example", { name: "example" });

// Trust policy: the VPC Flow Logs service principal may assume the role.
const trustPolicy = aws.iam.getPolicyDocument({
    statements: [
        {
            effect: "Allow",
            principals: [
                {
                    type: "Service",
                    identifiers: ["vpc-flow-logs.amazonaws.com"],
                },
            ],
            actions: ["sts:AssumeRole"],
        },
    ],
});

// Role handed to the flow log through iamRoleArn.
const deliveryRole = new aws.iam.Role("example", {
    name: "example",
    assumeRolePolicy: trustPolicy.then(({ json }) => json),
});

// Flow log covering all traffic of the VPC. `exampleAwsVpc` is not declared in
// this snippet; it must refer to a VPC defined elsewhere in the program.
const vpcFlowLog = new aws.ec2.FlowLog("example", {
    iamRoleArn: deliveryRole.arn,
    logDestination: logGroup.arn,
    trafficType: "ALL",
    vpcId: exampleAwsVpc.id,
});

// CloudWatch Logs actions allowed for the delivery role.
const logsPermissions = aws.iam.getPolicyDocument({
    statements: [
        {
            effect: "Allow",
            actions: [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
                "logs:DescribeLogGroups",
                "logs:DescribeLogStreams",
            ],
            resources: ["*"],
        },
    ],
});

// Inline policy attaching those permissions to the role.
const deliveryRolePolicy = new aws.iam.RolePolicy("example", {
    name: "example",
    role: deliveryRole.id,
    policy: logsPermissions.then(({ json }) => json),
});
import pulumi
import pulumi_aws as aws
# Log group that receives the flow log records.
log_group = aws.cloudwatch.LogGroup("example", name="example")

# Trust policy: the VPC Flow Logs service principal may assume the role.
trust_policy = aws.iam.get_policy_document(
    statements=[
        {
            "effect": "Allow",
            "principals": [
                {
                    "type": "Service",
                    "identifiers": ["vpc-flow-logs.amazonaws.com"],
                },
            ],
            "actions": ["sts:AssumeRole"],
        },
    ],
)

# Role handed to the flow log through iam_role_arn.
delivery_role = aws.iam.Role(
    "example",
    name="example",
    assume_role_policy=trust_policy.json,
)

# Flow log covering all traffic of the VPC. `example_aws_vpc` is not declared
# in this snippet; it must refer to a VPC defined elsewhere in the program.
vpc_flow_log = aws.ec2.FlowLog(
    "example",
    iam_role_arn=delivery_role.arn,
    log_destination=log_group.arn,
    traffic_type="ALL",
    vpc_id=example_aws_vpc["id"],
)

# CloudWatch Logs actions allowed for the delivery role.
logs_permissions = aws.iam.get_policy_document(
    statements=[
        {
            "effect": "Allow",
            "actions": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
                "logs:DescribeLogGroups",
                "logs:DescribeLogStreams",
            ],
            "resources": ["*"],
        },
    ],
)

# Inline policy attaching those permissions to the role.
delivery_role_policy = aws.iam.RolePolicy(
    "example",
    name="example",
    role=delivery_role.id,
    policy=logs_permissions.json,
)
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/cloudwatch"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/ec2"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/iam"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main registers a log group, an IAM role with its inline policy, and a VPC
// flow log that delivers to the log group.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Log group that receives the flow log records.
		logGroup, err := cloudwatch.NewLogGroup(ctx, "example", &cloudwatch.LogGroupArgs{
			Name: pulumi.String("example"),
		})
		if err != nil {
			return err
		}

		// Trust policy: the VPC Flow Logs service principal may assume the role.
		trustDoc, err := iam.GetPolicyDocument(ctx, &iam.GetPolicyDocumentArgs{
			Statements: []iam.GetPolicyDocumentStatement{{
				Effect: pulumi.StringRef("Allow"),
				Principals: []iam.GetPolicyDocumentStatementPrincipal{{
					Type:        "Service",
					Identifiers: []string{"vpc-flow-logs.amazonaws.com"},
				}},
				Actions: []string{"sts:AssumeRole"},
			}},
		}, nil)
		if err != nil {
			return err
		}

		// Role handed to the flow log through IamRoleArn.
		role, err := iam.NewRole(ctx, "example", &iam.RoleArgs{
			Name:             pulumi.String("example"),
			AssumeRolePolicy: pulumi.String(trustDoc.Json),
		})
		if err != nil {
			return err
		}

		// Flow log covering all traffic of the VPC. exampleAwsVpc is not declared
		// in this snippet; it must refer to a VPC defined elsewhere.
		if _, err := ec2.NewFlowLog(ctx, "example", &ec2.FlowLogArgs{
			IamRoleArn:     role.Arn,
			LogDestination: logGroup.Arn,
			TrafficType:    pulumi.String("ALL"),
			VpcId:          pulumi.Any(exampleAwsVpc.Id),
		}); err != nil {
			return err
		}

		// CloudWatch Logs actions allowed for the delivery role.
		permissionsDoc, err := iam.GetPolicyDocument(ctx, &iam.GetPolicyDocumentArgs{
			Statements: []iam.GetPolicyDocumentStatement{{
				Effect: pulumi.StringRef("Allow"),
				Actions: []string{
					"logs:CreateLogGroup",
					"logs:CreateLogStream",
					"logs:PutLogEvents",
					"logs:DescribeLogGroups",
					"logs:DescribeLogStreams",
				},
				Resources: []string{"*"},
			}},
		}, nil)
		if err != nil {
			return err
		}

		// Inline policy attaching those permissions to the role.
		_, err = iam.NewRolePolicy(ctx, "example", &iam.RolePolicyArgs{
			Name:   pulumi.String("example"),
			Role:   role.ID(),
			Policy: pulumi.String(permissionsDoc.Json),
		})
		return err
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
    // Log group that receives the flow log records.
    var logGroup = new Aws.CloudWatch.LogGroup("example", new()
    {
        Name = "example",
    });

    // Trust policy: the VPC Flow Logs service principal may assume the role.
    var trustPolicy = Aws.Iam.GetPolicyDocument.Invoke(new()
    {
        Statements = new[]
        {
            new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
            {
                Effect = "Allow",
                Principals = new[]
                {
                    new Aws.Iam.Inputs.GetPolicyDocumentStatementPrincipalInputArgs
                    {
                        Type = "Service",
                        Identifiers = new[] { "vpc-flow-logs.amazonaws.com" },
                    },
                },
                Actions = new[] { "sts:AssumeRole" },
            },
        },
    });

    // Role handed to the flow log through IamRoleArn.
    var deliveryRole = new Aws.Iam.Role("example", new()
    {
        Name = "example",
        AssumeRolePolicy = trustPolicy.Apply(doc => doc.Json),
    });

    // Flow log covering all traffic of the VPC. exampleAwsVpc is not declared
    // in this snippet; it must refer to a VPC defined elsewhere.
    var vpcFlowLog = new Aws.Ec2.FlowLog("example", new()
    {
        IamRoleArn = deliveryRole.Arn,
        LogDestination = logGroup.Arn,
        TrafficType = "ALL",
        VpcId = exampleAwsVpc.Id,
    });

    // CloudWatch Logs actions allowed for the delivery role.
    var logsPermissions = Aws.Iam.GetPolicyDocument.Invoke(new()
    {
        Statements = new[]
        {
            new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
            {
                Effect = "Allow",
                Actions = new[]
                {
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                    "logs:DescribeLogGroups",
                    "logs:DescribeLogStreams",
                },
                Resources = new[] { "*" },
            },
        },
    });

    // Inline policy attaching those permissions to the role.
    var deliveryRolePolicy = new Aws.Iam.RolePolicy("example", new()
    {
        Name = "example",
        Role = deliveryRole.Id,
        Policy = logsPermissions.Apply(doc => doc.Json),
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.cloudwatch.LogGroup;
import com.pulumi.aws.cloudwatch.LogGroupArgs;
import com.pulumi.aws.iam.IamFunctions;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentArgs;
import com.pulumi.aws.iam.Role;
import com.pulumi.aws.iam.RoleArgs;
import com.pulumi.aws.ec2.FlowLog;
import com.pulumi.aws.ec2.FlowLogArgs;
import com.pulumi.aws.iam.RolePolicy;
import com.pulumi.aws.iam.RolePolicyArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Sends VPC flow logs to a CloudWatch log group using a dedicated IAM role.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares the log group, the delivery role with its inline policy, and the flow log.
     *
     * @param ctx the Pulumi stack context
     */
    public static void stack(Context ctx) {
        // Log group that receives the flow log records.
        var exampleLogGroup = new LogGroup("exampleLogGroup", LogGroupArgs.builder()
            .name("example")
            .build());

        // Trust policy: the VPC Flow Logs service principal may assume the role.
        // NOTE(review): GetPolicyDocumentStatementArgs and
        // GetPolicyDocumentStatementPrincipalArgs are not in the import list above;
        // they presumably live in com.pulumi.aws.iam.inputs - confirm and import.
        final var assumeRole = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
            .statements(GetPolicyDocumentStatementArgs.builder()
                .effect("Allow")
                .principals(GetPolicyDocumentStatementPrincipalArgs.builder()
                    .type("Service")
                    .identifiers("vpc-flow-logs.amazonaws.com")
                    .build())
                .actions("sts:AssumeRole")
                .build())
            .build());

        // getPolicyDocument yields an Output, so the JSON must be projected with
        // applyValue rather than read directly off the invoke result.
        var exampleRole = new Role("exampleRole", RoleArgs.builder()
            .name("example")
            .assumeRolePolicy(assumeRole.applyValue(doc -> doc.json()))
            .build());

        // Flow log covering all traffic of the VPC. exampleAwsVpc is not declared
        // in this snippet; it must refer to a VPC defined elsewhere.
        var exampleFlowLog = new FlowLog("exampleFlowLog", FlowLogArgs.builder()
            .iamRoleArn(exampleRole.arn())
            .logDestination(exampleLogGroup.arn())
            .trafficType("ALL")
            .vpcId(exampleAwsVpc.id())
            .build());

        // CloudWatch Logs actions allowed for the delivery role.
        final var example = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
            .statements(GetPolicyDocumentStatementArgs.builder()
                .effect("Allow")
                .actions(
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                    "logs:DescribeLogGroups",
                    "logs:DescribeLogStreams")
                .resources("*")
                .build())
            .build());

        // Inline policy attaching those permissions to the role.
        var exampleRolePolicy = new RolePolicy("exampleRolePolicy", RolePolicyArgs.builder()
            .name("example")
            .role(exampleRole.id())
            .policy(example.applyValue(doc -> doc.json()))
            .build());
    }
}
resources:
  # Log group that receives the flow log records.
  exampleLogGroup:
    type: aws:cloudwatch:LogGroup
    name: example
    properties:
      name: example
  # Role handed to the flow log through iamRoleArn.
  exampleRole:
    type: aws:iam:Role
    name: example
    properties:
      name: example
      assumeRolePolicy: ${assumeRole.json}
  # Inline policy attaching the CloudWatch Logs permissions to the role.
  exampleRolePolicy:
    type: aws:iam:RolePolicy
    name: example
    properties:
      name: example
      role: ${exampleRole.id}
      policy: ${example.json}
  # Flow log covering all traffic of the VPC. exampleAwsVpc is not declared in
  # this snippet; it must refer to a VPC defined elsewhere.
  exampleFlowLog:
    type: aws:ec2:FlowLog
    name: example
    properties:
      iamRoleArn: ${exampleRole.arn}
      logDestination: ${exampleLogGroup.arn}
      trafficType: ALL
      vpcId: ${exampleAwsVpc.id}
variables:
  # Trust policy: the VPC Flow Logs service principal may assume the role.
  assumeRole:
    fn::invoke:
      function: aws:iam:getPolicyDocument
      arguments:
        statements:
          - effect: Allow
            principals:
              - type: Service
                identifiers:
                  - vpc-flow-logs.amazonaws.com
            actions:
              - sts:AssumeRole
  # CloudWatch Logs actions allowed for the delivery role.
  example:
    fn::invoke:
      function: aws:iam:getPolicyDocument
      arguments:
        statements:
          - effect: Allow
            actions:
              - logs:CreateLogGroup
              - logs:CreateLogStream
              - logs:PutLogEvents
              - logs:DescribeLogGroups
              - logs:DescribeLogStreams
            resources:
              - '*'
When you send logs to CloudWatch, the flow log writes records to the specified Log Group. The iamRoleArn property names the IAM role that the VPC Flow Logs service assumes; the policy attached to that role grants permission to create log streams and write events. The trafficType property controls whether you capture accepted connections, rejected connections, or all traffic. Here, ALL captures both accepted and rejected connections for the entire VPC.
Store flow logs in S3 for long-term retention
Teams that need to retain network logs for compliance or cost analysis often send flow logs to S3, where storage costs are lower than CloudWatch.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
// Bucket that stores the flow log objects.
const logBucket = new aws.s3.Bucket("example", { bucket: "example" });

// Flow log delivered to S3; no iamRoleArn is set for this destination type.
// `exampleAwsVpc` must refer to a VPC defined elsewhere in the program.
const s3FlowLog = new aws.ec2.FlowLog("example", {
    logDestination: logBucket.arn,
    logDestinationType: "s3",
    trafficType: "ALL",
    vpcId: exampleAwsVpc.id,
});
import pulumi
import pulumi_aws as aws
# Bucket that stores the flow log objects.
log_bucket = aws.s3.Bucket("example", bucket="example")

# Flow log delivered to S3; no iam_role_arn is set for this destination type.
# `example_aws_vpc` must refer to a VPC defined elsewhere in the program.
s3_flow_log = aws.ec2.FlowLog(
    "example",
    log_destination=log_bucket.arn,
    log_destination_type="s3",
    traffic_type="ALL",
    vpc_id=example_aws_vpc["id"],
)
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/ec2"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/s3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main registers an S3 bucket and a VPC flow log that delivers to it.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Bucket that stores the flow log objects.
		logBucket, err := s3.NewBucket(ctx, "example", &s3.BucketArgs{
			Bucket: pulumi.String("example"),
		})
		if err != nil {
			return err
		}

		// Flow log delivered to S3. exampleAwsVpc is not declared in this
		// snippet; it must refer to a VPC defined elsewhere.
		_, err = ec2.NewFlowLog(ctx, "example", &ec2.FlowLogArgs{
			LogDestination:     logBucket.Arn,
			LogDestinationType: pulumi.String("s3"),
			TrafficType:        pulumi.String("ALL"),
			VpcId:              pulumi.Any(exampleAwsVpc.Id),
		})
		return err
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
    // Bucket that stores the flow log objects.
    var logBucket = new Aws.S3.Bucket("example", new()
    {
        BucketName = "example",
    });

    // Flow log delivered to S3. exampleAwsVpc is not declared in this
    // snippet; it must refer to a VPC defined elsewhere.
    var s3FlowLog = new Aws.Ec2.FlowLog("example", new()
    {
        LogDestination = logBucket.Arn,
        LogDestinationType = "s3",
        TrafficType = "ALL",
        VpcId = exampleAwsVpc.Id,
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.s3.Bucket;
import com.pulumi.aws.s3.BucketArgs;
import com.pulumi.aws.ec2.FlowLog;
import com.pulumi.aws.ec2.FlowLogArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Sends VPC flow logs to an S3 bucket.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares the bucket and the flow log that delivers to it.
     *
     * @param ctx the Pulumi stack context
     */
    public static void stack(Context ctx) {
        // Bucket that stores the flow log objects.
        final var bucketArgs = BucketArgs.builder()
            .bucket("example")
            .build();
        final var logBucket = new Bucket("exampleBucket", bucketArgs);

        // Flow log delivered to S3. exampleAwsVpc is not declared in this
        // snippet; it must refer to a VPC defined elsewhere.
        final var flowLogArgs = FlowLogArgs.builder()
            .logDestination(logBucket.arn())
            .logDestinationType("s3")
            .trafficType("ALL")
            .vpcId(exampleAwsVpc.id())
            .build();
        final var s3FlowLog = new FlowLog("example", flowLogArgs);
    }
}
resources:
  # Bucket that stores the flow log objects.
  exampleBucket:
    type: aws:s3:Bucket
    name: example
    properties:
      bucket: example
  # Flow log delivered to S3. exampleAwsVpc is not declared in this snippet;
  # it must refer to a VPC defined elsewhere.
  example:
    type: aws:ec2:FlowLog
    properties:
      logDestination: ${exampleBucket.arn}
      logDestinationType: s3
      trafficType: ALL
      vpcId: ${exampleAwsVpc.id}
Setting logDestinationType to "s3" switches the destination from CloudWatch Logs to S3. Unlike CloudWatch delivery, S3 doesn’t require an IAM role in the flow log configuration; instead, you configure bucket policies to allow the VPC Flow Logs service to write objects. The logDestination property points to your bucket ARN.
Optimize S3 logs with Parquet and partitioning
Analytics workloads that query flow logs with Athena benefit from Parquet’s columnar format and hourly partitions, which reduce query costs and improve performance.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
// Bucket that stores the flow log objects.
const logBucket = new aws.s3.Bucket("example", { bucket: "example" });

// Parquet output, partitioned per hour.
const parquetOptions = {
    fileFormat: "parquet",
    perHourPartition: true,
};

// Flow log delivered to S3 with the options above. `exampleAwsVpc` must
// refer to a VPC defined elsewhere in the program.
const parquetFlowLog = new aws.ec2.FlowLog("example", {
    logDestination: logBucket.arn,
    logDestinationType: "s3",
    trafficType: "ALL",
    vpcId: exampleAwsVpc.id,
    destinationOptions: parquetOptions,
});
import pulumi
import pulumi_aws as aws
# Bucket that stores the flow log objects.
log_bucket = aws.s3.Bucket("example", bucket="example")

# Parquet output, partitioned per hour.
parquet_options = {
    "file_format": "parquet",
    "per_hour_partition": True,
}

# Flow log delivered to S3 with the options above. `example_aws_vpc` must
# refer to a VPC defined elsewhere in the program.
parquet_flow_log = aws.ec2.FlowLog(
    "example",
    log_destination=log_bucket.arn,
    log_destination_type="s3",
    traffic_type="ALL",
    vpc_id=example_aws_vpc["id"],
    destination_options=parquet_options,
)
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/ec2"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/s3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main registers an S3 bucket and a VPC flow log that writes Parquet files
// into hourly partitions.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Bucket that stores the flow log objects.
		logBucket, err := s3.NewBucket(ctx, "example", &s3.BucketArgs{
			Bucket: pulumi.String("example"),
		})
		if err != nil {
			return err
		}

		// Parquet output, partitioned per hour.
		parquetOptions := &ec2.FlowLogDestinationOptionsArgs{
			FileFormat:       pulumi.String("parquet"),
			PerHourPartition: pulumi.Bool(true),
		}

		// exampleAwsVpc is not declared in this snippet; it must refer to a
		// VPC defined elsewhere.
		_, err = ec2.NewFlowLog(ctx, "example", &ec2.FlowLogArgs{
			LogDestination:     logBucket.Arn,
			LogDestinationType: pulumi.String("s3"),
			TrafficType:        pulumi.String("ALL"),
			VpcId:              pulumi.Any(exampleAwsVpc.Id),
			DestinationOptions: parquetOptions,
		})
		return err
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
    // Bucket that stores the flow log objects.
    var logBucket = new Aws.S3.Bucket("example", new()
    {
        BucketName = "example",
    });

    // Parquet output, partitioned per hour.
    var parquetOptions = new Aws.Ec2.Inputs.FlowLogDestinationOptionsArgs
    {
        FileFormat = "parquet",
        PerHourPartition = true,
    };

    // exampleAwsVpc is not declared in this snippet; it must refer to a VPC
    // defined elsewhere.
    var parquetFlowLog = new Aws.Ec2.FlowLog("example", new()
    {
        LogDestination = logBucket.Arn,
        LogDestinationType = "s3",
        TrafficType = "ALL",
        VpcId = exampleAwsVpc.Id,
        DestinationOptions = parquetOptions,
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.s3.Bucket;
import com.pulumi.aws.s3.BucketArgs;
import com.pulumi.aws.ec2.FlowLog;
import com.pulumi.aws.ec2.FlowLogArgs;
import com.pulumi.aws.ec2.inputs.FlowLogDestinationOptionsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Sends VPC flow logs to S3 as Parquet files in hourly partitions.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares the bucket and the flow log with its destination options.
     *
     * @param ctx the Pulumi stack context
     */
    public static void stack(Context ctx) {
        // Bucket that stores the flow log objects.
        final var bucketArgs = BucketArgs.builder()
            .bucket("example")
            .build();
        final var logBucket = new Bucket("exampleBucket", bucketArgs);

        // Parquet output, partitioned per hour.
        final var parquetOptions = FlowLogDestinationOptionsArgs.builder()
            .fileFormat("parquet")
            .perHourPartition(true)
            .build();

        // exampleAwsVpc is not declared in this snippet; it must refer to a
        // VPC defined elsewhere.
        final var flowLogArgs = FlowLogArgs.builder()
            .logDestination(logBucket.arn())
            .logDestinationType("s3")
            .trafficType("ALL")
            .vpcId(exampleAwsVpc.id())
            .destinationOptions(parquetOptions)
            .build();
        final var parquetFlowLog = new FlowLog("example", flowLogArgs);
    }
}
resources:
  # Bucket that stores the flow log objects.
  exampleBucket:
    type: aws:s3:Bucket
    name: example
    properties:
      bucket: example
  # Flow log delivered to S3 as Parquet, partitioned per hour. exampleAwsVpc
  # is not declared in this snippet; it must refer to a VPC defined elsewhere.
  example:
    type: aws:ec2:FlowLog
    properties:
      logDestination: ${exampleBucket.arn}
      logDestinationType: s3
      trafficType: ALL
      vpcId: ${exampleAwsVpc.id}
      destinationOptions:
        fileFormat: parquet
        perHourPartition: true
The destinationOptions block controls how flow logs are written to S3. Setting fileFormat to “parquet” converts logs from plain text to a columnar format that compresses better and queries faster. The perHourPartition option organizes logs into hourly folders, allowing Athena to scan only relevant time ranges.
Deliver logs across AWS accounts with Firehose
Organizations with centralized logging architectures send flow logs from workload accounts to a central logging account using Firehose and cross-account IAM roles.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
// ---- Source account ----
// VPC whose traffic is logged.
const src = new aws.ec2.Vpc("src", {});

// Trust policy: the log delivery service may assume the source role.
const srcTrust = aws.iam.getPolicyDocument({
    statements: [
        {
            actions: ["sts:AssumeRole"],
            effect: "Allow",
            principals: [
                {
                    type: "Service",
                    identifiers: ["delivery.logs.amazonaws.com"],
                },
            ],
        },
    ],
});
const srcRole = new aws.iam.Role("src", {
    name: "tf-example-mySourceRole",
    assumeRolePolicy: srcTrust.then(({ json }) => json),
});

// ---- Destination account ----
// Trust policy: the source role may assume the destination role.
const dstTrust = aws.iam.getPolicyDocumentOutput({
    statements: [
        {
            actions: ["sts:AssumeRole"],
            effect: "Allow",
            principals: [
                {
                    type: "AWS",
                    identifiers: [srcRole.arn],
                },
            ],
        },
    ],
});
const dst = new aws.iam.Role("dst", {
    name: "AWSLogDeliveryFirehoseCrossAccountRole",
    assumeRolePolicy: dstTrust.apply(({ json }) => json),
});

// Permissions of the source role: pass itself to the delivery service for this
// VPC, manage log deliveries, and assume the destination role.
const srcPermissions = aws.iam.getPolicyDocumentOutput({
    statements: [
        {
            effect: "Allow",
            actions: ["iam:PassRole"],
            resources: [srcRole.arn],
            conditions: [
                {
                    test: "StringEquals",
                    variable: "iam:PassedToService",
                    values: ["delivery.logs.amazonaws.com"],
                },
                {
                    test: "StringLike",
                    variable: "iam:AssociatedResourceARN",
                    values: [src.arn],
                },
            ],
        },
        {
            effect: "Allow",
            actions: [
                "logs:CreateLogDelivery",
                "logs:DeleteLogDelivery",
                "logs:ListLogDeliveries",
                "logs:GetLogDelivery",
            ],
            resources: ["*"],
        },
        {
            effect: "Allow",
            actions: ["sts:AssumeRole"],
            resources: [dst.arn],
        },
    ],
});
const srcPolicy = new aws.iam.RolePolicy("src_policy", {
    name: "tf-example-mySourceRolePolicy",
    role: srcRole.name,
    policy: srcPermissions.apply(({ json }) => json),
});

// Delivery stream tagged LogDeliveryEnabled=true; only tags are configured here.
const dstStream = new aws.kinesis.FirehoseDeliveryStream("dst", {
    tags: {
        LogDeliveryEnabled: "true",
    },
});

// Flow log that delivers to the stream through the cross-account role.
const srcFlowLog = new aws.ec2.FlowLog("src", {
    logDestinationType: "kinesis-data-firehose",
    logDestination: dstStream.arn,
    trafficType: "ALL",
    vpcId: src.id,
    iamRoleArn: srcRole.arn,
    deliverCrossAccountRole: dst.arn,
});

// Permissions of the destination role.
const dstPermissions = aws.iam.getPolicyDocument({
    statements: [
        {
            effect: "Allow",
            actions: [
                "iam:CreateServiceLinkedRole",
                "firehose:TagDeliveryStream",
            ],
            resources: ["*"],
        },
    ],
});
const dstPolicy = new aws.iam.RolePolicy("dst", {
    name: "AWSLogDeliveryFirehoseCrossAccountRolePolicy",
    role: dst.name,
    policy: dstPermissions.then(({ json }) => json),
});
import pulumi
import pulumi_aws as aws
# ---- Source account ----
# VPC whose traffic is logged.
src = aws.ec2.Vpc("src")

# Trust policy: the log delivery service may assume the source role.
src_trust = aws.iam.get_policy_document(
    statements=[
        {
            "actions": ["sts:AssumeRole"],
            "effect": "Allow",
            "principals": [
                {
                    "type": "Service",
                    "identifiers": ["delivery.logs.amazonaws.com"],
                },
            ],
        },
    ],
)
src_role = aws.iam.Role(
    "src",
    name="tf-example-mySourceRole",
    assume_role_policy=src_trust.json,
)

# ---- Destination account ----
# Trust policy: the source role may assume the destination role.
dst_trust = aws.iam.get_policy_document_output(
    statements=[
        {
            "actions": ["sts:AssumeRole"],
            "effect": "Allow",
            "principals": [
                {
                    "type": "AWS",
                    "identifiers": [src_role.arn],
                },
            ],
        },
    ],
)
dst = aws.iam.Role(
    "dst",
    name="AWSLogDeliveryFirehoseCrossAccountRole",
    assume_role_policy=dst_trust.json,
)

# Permissions of the source role: pass itself to the delivery service for this
# VPC, manage log deliveries, and assume the destination role.
src_permissions = aws.iam.get_policy_document_output(
    statements=[
        {
            "effect": "Allow",
            "actions": ["iam:PassRole"],
            "resources": [src_role.arn],
            "conditions": [
                {
                    "test": "StringEquals",
                    "variable": "iam:PassedToService",
                    "values": ["delivery.logs.amazonaws.com"],
                },
                {
                    "test": "StringLike",
                    "variable": "iam:AssociatedResourceARN",
                    "values": [src.arn],
                },
            ],
        },
        {
            "effect": "Allow",
            "actions": [
                "logs:CreateLogDelivery",
                "logs:DeleteLogDelivery",
                "logs:ListLogDeliveries",
                "logs:GetLogDelivery",
            ],
            "resources": ["*"],
        },
        {
            "effect": "Allow",
            "actions": ["sts:AssumeRole"],
            "resources": [dst.arn],
        },
    ],
)
src_policy = aws.iam.RolePolicy(
    "src_policy",
    name="tf-example-mySourceRolePolicy",
    role=src_role.name,
    policy=src_permissions.json,
)

# Delivery stream tagged LogDeliveryEnabled=true; only tags are configured here.
dst_stream = aws.kinesis.FirehoseDeliveryStream(
    "dst",
    tags={
        "LogDeliveryEnabled": "true",
    },
)

# Flow log that delivers to the stream through the cross-account role.
src_flow_log = aws.ec2.FlowLog(
    "src",
    log_destination_type="kinesis-data-firehose",
    log_destination=dst_stream.arn,
    traffic_type="ALL",
    vpc_id=src.id,
    iam_role_arn=src_role.arn,
    deliver_cross_account_role=dst.arn,
)

# Permissions of the destination role.
dst_permissions = aws.iam.get_policy_document(
    statements=[
        {
            "effect": "Allow",
            "actions": [
                "iam:CreateServiceLinkedRole",
                "firehose:TagDeliveryStream",
            ],
            "resources": ["*"],
        },
    ],
)
dst_policy = aws.iam.RolePolicy(
    "dst",
    name="AWSLogDeliveryFirehoseCrossAccountRolePolicy",
    role=dst.name,
    policy=dst_permissions.json,
)
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/ec2"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/iam"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesis"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main registers a source VPC with its delivery role, a destination role and
// Firehose delivery stream, and a flow log that delivers across the two roles.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// For source account
		src, err := ec2.NewVpc(ctx, "src", nil)
		if err != nil {
			return err
		}
		// Trust policy: the log delivery service may assume the source role.
		srcAssumeRolePolicy, err := iam.GetPolicyDocument(ctx, &iam.GetPolicyDocumentArgs{
			Statements: []iam.GetPolicyDocumentStatement{
				{
					Actions: []string{"sts:AssumeRole"},
					Effect:  pulumi.StringRef("Allow"),
					Principals: []iam.GetPolicyDocumentStatementPrincipal{
						{
							Type:        "Service",
							Identifiers: []string{"delivery.logs.amazonaws.com"},
						},
					},
				},
			},
		}, nil)
		if err != nil {
			return err
		}
		srcRole, err := iam.NewRole(ctx, "src", &iam.RoleArgs{
			Name:             pulumi.String("tf-example-mySourceRole"),
			AssumeRolePolicy: pulumi.String(srcAssumeRolePolicy.Json),
		})
		if err != nil {
			return err
		}

		// For destination account
		// Trust policy: the source role may assume the destination role.
		dstAssumeRolePolicy := iam.GetPolicyDocumentOutput(ctx, iam.GetPolicyDocumentOutputArgs{
			Statements: iam.GetPolicyDocumentStatementArray{
				&iam.GetPolicyDocumentStatementArgs{
					Actions: pulumi.StringArray{pulumi.String("sts:AssumeRole")},
					Effect:  pulumi.String("Allow"),
					Principals: iam.GetPolicyDocumentStatementPrincipalArray{
						&iam.GetPolicyDocumentStatementPrincipalArgs{
							Type:        pulumi.String("AWS"),
							Identifiers: pulumi.StringArray{srcRole.Arn},
						},
					},
				},
			},
		}, nil)
		dst, err := iam.NewRole(ctx, "dst", &iam.RoleArgs{
			Name: pulumi.String("AWSLogDeliveryFirehoseCrossAccountRole"),
			// The document is an Output, so its JSON is projected with ApplyT and
			// passed as a StringOutput; pulumi.String only converts plain strings.
			AssumeRolePolicy: dstAssumeRolePolicy.ApplyT(func(doc iam.GetPolicyDocumentResult) (string, error) {
				return doc.Json, nil
			}).(pulumi.StringOutput),
		})
		if err != nil {
			return err
		}

		// Permissions of the source role: pass itself to the delivery service for
		// this VPC, manage log deliveries, and assume the destination role.
		srcRolePolicy := iam.GetPolicyDocumentOutput(ctx, iam.GetPolicyDocumentOutputArgs{
			Statements: iam.GetPolicyDocumentStatementArray{
				&iam.GetPolicyDocumentStatementArgs{
					Effect:    pulumi.String("Allow"),
					Actions:   pulumi.StringArray{pulumi.String("iam:PassRole")},
					Resources: pulumi.StringArray{srcRole.Arn},
					Conditions: iam.GetPolicyDocumentStatementConditionArray{
						&iam.GetPolicyDocumentStatementConditionArgs{
							Test:     pulumi.String("StringEquals"),
							Variable: pulumi.String("iam:PassedToService"),
							Values:   pulumi.StringArray{pulumi.String("delivery.logs.amazonaws.com")},
						},
						&iam.GetPolicyDocumentStatementConditionArgs{
							Test:     pulumi.String("StringLike"),
							Variable: pulumi.String("iam:AssociatedResourceARN"),
							Values:   pulumi.StringArray{src.Arn},
						},
					},
				},
				&iam.GetPolicyDocumentStatementArgs{
					Effect: pulumi.String("Allow"),
					Actions: pulumi.StringArray{
						pulumi.String("logs:CreateLogDelivery"),
						pulumi.String("logs:DeleteLogDelivery"),
						pulumi.String("logs:ListLogDeliveries"),
						pulumi.String("logs:GetLogDelivery"),
					},
					Resources: pulumi.StringArray{pulumi.String("*")},
				},
				&iam.GetPolicyDocumentStatementArgs{
					Effect:    pulumi.String("Allow"),
					Actions:   pulumi.StringArray{pulumi.String("sts:AssumeRole")},
					Resources: pulumi.StringArray{dst.Arn},
				},
			},
		}, nil)
		_, err = iam.NewRolePolicy(ctx, "src_policy", &iam.RolePolicyArgs{
			Name: pulumi.String("tf-example-mySourceRolePolicy"),
			Role: srcRole.Name,
			// Same projection as for the destination role's trust policy above.
			Policy: srcRolePolicy.ApplyT(func(doc iam.GetPolicyDocumentResult) (string, error) {
				return doc.Json, nil
			}).(pulumi.StringOutput),
		})
		if err != nil {
			return err
		}

		// Delivery stream tagged LogDeliveryEnabled=true; only tags are configured here.
		dstFirehoseDeliveryStream, err := kinesis.NewFirehoseDeliveryStream(ctx, "dst", &kinesis.FirehoseDeliveryStreamArgs{
			Tags: pulumi.StringMap{
				"LogDeliveryEnabled": pulumi.String("true"),
			},
		})
		if err != nil {
			return err
		}

		// Flow log that delivers to the stream through the cross-account role.
		_, err = ec2.NewFlowLog(ctx, "src", &ec2.FlowLogArgs{
			LogDestinationType:      pulumi.String("kinesis-data-firehose"),
			LogDestination:          dstFirehoseDeliveryStream.Arn,
			TrafficType:             pulumi.String("ALL"),
			VpcId:                   src.ID(),
			IamRoleArn:              srcRole.Arn,
			DeliverCrossAccountRole: dst.Arn,
		})
		if err != nil {
			return err
		}

		// Permissions of the destination role.
		dstRolePolicy, err := iam.GetPolicyDocument(ctx, &iam.GetPolicyDocumentArgs{
			Statements: []iam.GetPolicyDocumentStatement{
				{
					Effect: pulumi.StringRef("Allow"),
					Actions: []string{
						"iam:CreateServiceLinkedRole",
						"firehose:TagDeliveryStream",
					},
					Resources: []string{"*"},
				},
			},
		}, nil)
		if err != nil {
			return err
		}
		_, err = iam.NewRolePolicy(ctx, "dst", &iam.RolePolicyArgs{
			Name:   pulumi.String("AWSLogDeliveryFirehoseCrossAccountRolePolicy"),
			Role:   dst.Name,
			Policy: pulumi.String(dstRolePolicy.Json),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
    // ---- Source account ----
    // VPC whose traffic is logged.
    var src = new Aws.Ec2.Vpc("src");

    // Trust policy: the log delivery service may assume the source role.
    var srcTrust = Aws.Iam.GetPolicyDocument.Invoke(new()
    {
        Statements = new[]
        {
            new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
            {
                Actions = new[] { "sts:AssumeRole" },
                Effect = "Allow",
                Principals = new[]
                {
                    new Aws.Iam.Inputs.GetPolicyDocumentStatementPrincipalInputArgs
                    {
                        Type = "Service",
                        Identifiers = new[] { "delivery.logs.amazonaws.com" },
                    },
                },
            },
        },
    });

    var srcRole = new Aws.Iam.Role("src", new()
    {
        Name = "tf-example-mySourceRole",
        AssumeRolePolicy = srcTrust.Apply(doc => doc.Json),
    });

    // ---- Destination account ----
    // Trust policy: the source role may assume the destination role.
    var dstTrust = Aws.Iam.GetPolicyDocument.Invoke(new()
    {
        Statements = new[]
        {
            new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
            {
                Actions = new[] { "sts:AssumeRole" },
                Effect = "Allow",
                Principals = new[]
                {
                    new Aws.Iam.Inputs.GetPolicyDocumentStatementPrincipalInputArgs
                    {
                        Type = "AWS",
                        Identifiers = new[] { srcRole.Arn },
                    },
                },
            },
        },
    });

    var dst = new Aws.Iam.Role("dst", new()
    {
        Name = "AWSLogDeliveryFirehoseCrossAccountRole",
        AssumeRolePolicy = dstTrust.Apply(doc => doc.Json),
    });

    // Permissions of the source role: pass itself to the delivery service for
    // this VPC, manage log deliveries, and assume the destination role.
    var srcPermissions = Aws.Iam.GetPolicyDocument.Invoke(new()
    {
        Statements = new[]
        {
            new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
            {
                Effect = "Allow",
                Actions = new[] { "iam:PassRole" },
                Resources = new[] { srcRole.Arn },
                Conditions = new[]
                {
                    new Aws.Iam.Inputs.GetPolicyDocumentStatementConditionInputArgs
                    {
                        Test = "StringEquals",
                        Variable = "iam:PassedToService",
                        Values = new[] { "delivery.logs.amazonaws.com" },
                    },
                    new Aws.Iam.Inputs.GetPolicyDocumentStatementConditionInputArgs
                    {
                        Test = "StringLike",
                        Variable = "iam:AssociatedResourceARN",
                        Values = new[] { src.Arn },
                    },
                },
            },
            new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
            {
                Effect = "Allow",
                Actions = new[]
                {
                    "logs:CreateLogDelivery",
                    "logs:DeleteLogDelivery",
                    "logs:ListLogDeliveries",
                    "logs:GetLogDelivery",
                },
                Resources = new[] { "*" },
            },
            new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
            {
                Effect = "Allow",
                Actions = new[] { "sts:AssumeRole" },
                Resources = new[] { dst.Arn },
            },
        },
    });

    var srcPolicy = new Aws.Iam.RolePolicy("src_policy", new()
    {
        Name = "tf-example-mySourceRolePolicy",
        Role = srcRole.Name,
        Policy = srcPermissions.Apply(doc => doc.Json),
    });

    // Delivery stream tagged LogDeliveryEnabled=true; only tags are configured here.
    var dstStream = new Aws.Kinesis.FirehoseDeliveryStream("dst", new()
    {
        Tags =
        {
            { "LogDeliveryEnabled", "true" },
        },
    });

    // Flow log that delivers to the stream through the cross-account role.
    var srcFlowLog = new Aws.Ec2.FlowLog("src", new()
    {
        LogDestinationType = "kinesis-data-firehose",
        LogDestination = dstStream.Arn,
        TrafficType = "ALL",
        VpcId = src.Id,
        IamRoleArn = srcRole.Arn,
        DeliverCrossAccountRole = dst.Arn,
    });

    // Permissions of the destination role.
    var dstPermissions = Aws.Iam.GetPolicyDocument.Invoke(new()
    {
        Statements = new[]
        {
            new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
            {
                Effect = "Allow",
                Actions = new[]
                {
                    "iam:CreateServiceLinkedRole",
                    "firehose:TagDeliveryStream",
                },
                Resources = new[] { "*" },
            },
        },
    });

    var dstPolicy = new Aws.Iam.RolePolicy("dst", new()
    {
        Name = "AWSLogDeliveryFirehoseCrossAccountRolePolicy",
        Role = dst.Name,
        Policy = dstPermissions.Apply(doc => doc.Json),
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.ec2.Vpc;
import com.pulumi.aws.iam.IamFunctions;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentArgs;
import com.pulumi.aws.iam.Role;
import com.pulumi.aws.iam.RoleArgs;
import com.pulumi.aws.iam.RolePolicy;
import com.pulumi.aws.iam.RolePolicyArgs;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.ec2.FlowLog;
import com.pulumi.aws.ec2.FlowLogArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Delivers VPC flow logs to a Firehose delivery stream through a source role
 * and a cross-account destination role.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares the source VPC and role, the destination role and delivery
     * stream, and the flow log that connects them.
     *
     * @param ctx the Pulumi stack context
     */
    public static void stack(Context ctx) {
        // For source account
        var src = new Vpc("src");

        // Trust policy: the log delivery service may assume the source role.
        // NOTE(review): the GetPolicyDocumentStatement*Args builders used below are
        // not in the import list above; they presumably live in
        // com.pulumi.aws.iam.inputs - confirm and import.
        final var srcAssumeRolePolicy = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
            .statements(GetPolicyDocumentStatementArgs.builder()
                .actions("sts:AssumeRole")
                .effect("Allow")
                .principals(GetPolicyDocumentStatementPrincipalArgs.builder()
                    .type("Service")
                    .identifiers("delivery.logs.amazonaws.com")
                    .build())
                .build())
            .build());

        // getPolicyDocument yields an Output, so the JSON is projected with
        // applyValue, matching the destination-role and source-policy usages below.
        var srcRole = new Role("srcRole", RoleArgs.builder()
            .name("tf-example-mySourceRole")
            .assumeRolePolicy(srcAssumeRolePolicy.applyValue(_srcAssumeRolePolicy -> _srcAssumeRolePolicy.json()))
            .build());

        // For destination account
        // Trust policy: the source role may assume the destination role.
        final var dstAssumeRolePolicy = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
            .statements(GetPolicyDocumentStatementArgs.builder()
                .actions("sts:AssumeRole")
                .effect("Allow")
                .principals(GetPolicyDocumentStatementPrincipalArgs.builder()
                    .type("AWS")
                    .identifiers(srcRole.arn())
                    .build())
                .build())
            .build());

        var dst = new Role("dst", RoleArgs.builder()
            .name("AWSLogDeliveryFirehoseCrossAccountRole")
            .assumeRolePolicy(dstAssumeRolePolicy.applyValue(_dstAssumeRolePolicy -> _dstAssumeRolePolicy.json()))
            .build());

        // Permissions of the source role: pass itself to the delivery service for
        // this VPC, manage log deliveries, and assume the destination role.
        final var srcRolePolicy = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
            .statements(
                GetPolicyDocumentStatementArgs.builder()
                    .effect("Allow")
                    .actions("iam:PassRole")
                    .resources(srcRole.arn())
                    .conditions(
                        GetPolicyDocumentStatementConditionArgs.builder()
                            .test("StringEquals")
                            .variable("iam:PassedToService")
                            .values("delivery.logs.amazonaws.com")
                            .build(),
                        GetPolicyDocumentStatementConditionArgs.builder()
                            .test("StringLike")
                            .variable("iam:AssociatedResourceARN")
                            .values(src.arn())
                            .build())
                    .build(),
                GetPolicyDocumentStatementArgs.builder()
                    .effect("Allow")
                    .actions(
                        "logs:CreateLogDelivery",
                        "logs:DeleteLogDelivery",
                        "logs:ListLogDeliveries",
                        "logs:GetLogDelivery")
                    .resources("*")
                    .build(),
                GetPolicyDocumentStatementArgs.builder()
                    .effect("Allow")
                    .actions("sts:AssumeRole")
                    .resources(dst.arn())
                    .build())
            .build());

        var srcPolicy = new RolePolicy("srcPolicy", RolePolicyArgs.builder()
            .name("tf-example-mySourceRolePolicy")
            .role(srcRole.name())
            .policy(srcRolePolicy.applyValue(_srcRolePolicy -> _srcRolePolicy.json()))
            .build());

        // Delivery stream tagged LogDeliveryEnabled=true; only tags are configured here.
        var dstFirehoseDeliveryStream = new FirehoseDeliveryStream("dstFirehoseDeliveryStream", FirehoseDeliveryStreamArgs.builder()
            .tags(Map.of("LogDeliveryEnabled", "true"))
            .build());

        // Flow log that delivers to the stream through the cross-account role.
        var srcFlowLog = new FlowLog("srcFlowLog", FlowLogArgs.builder()
            .logDestinationType("kinesis-data-firehose")
            .logDestination(dstFirehoseDeliveryStream.arn())
            .trafficType("ALL")
            .vpcId(src.id())
            .iamRoleArn(srcRole.arn())
            .deliverCrossAccountRole(dst.arn())
            .build());

        // Permissions of the destination role.
        final var dstRolePolicy = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
            .statements(GetPolicyDocumentStatementArgs.builder()
                .effect("Allow")
                .actions(
                    "iam:CreateServiceLinkedRole",
                    "firehose:TagDeliveryStream")
                .resources("*")
                .build())
            .build());

        var dstRolePolicy2 = new RolePolicy("dstRolePolicy2", RolePolicyArgs.builder()
            .name("AWSLogDeliveryFirehoseCrossAccountRolePolicy")
            .role(dst.name())
            .policy(dstRolePolicy.applyValue(_dstRolePolicy -> _dstRolePolicy.json()))
            .build());
    }
}
# Pulumi YAML program: VPC flow log delivered cross-account to a Firehose
# delivery stream. `resources` declares the VPC, the two IAM roles with their
# inline policies, the delivery stream, and the flow log; `variables` builds
# the IAM policy documents with aws:iam:getPolicyDocument.
resources:
  # For source account
  src:
    type: aws:ec2:Vpc
  # Role passed to the flow log as iamRoleArn.
  srcRole:
    type: aws:iam:Role
    name: src
    properties:
      name: tf-example-mySourceRole
      assumeRolePolicy: ${srcAssumeRolePolicy.json}
  # Inline policy attached to the source role.
  srcPolicy:
    type: aws:iam:RolePolicy
    name: src_policy
    properties:
      name: tf-example-mySourceRolePolicy
      role: ${srcRole.name}
      policy: ${srcRolePolicy.json}
  # Flow log on the source VPC, targeting the Firehose stream by ARN.
  srcFlowLog:
    type: aws:ec2:FlowLog
    name: src
    properties:
      logDestinationType: kinesis-data-firehose
      logDestination: ${dstFirehoseDeliveryStream.arn}
      trafficType: ALL
      vpcId: ${src.id}
      iamRoleArn: ${srcRole.arn}
      deliverCrossAccountRole: ${dst.arn}
  # Role passed to the flow log as deliverCrossAccountRole.
  dst:
    type: aws:iam:Role
    properties:
      name: AWSLogDeliveryFirehoseCrossAccountRole
      assumeRolePolicy: ${dstAssumeRolePolicy.json}
  # Inline policy attached to the destination role.
  dstRolePolicy2:
    type: aws:iam:RolePolicy
    name: dst
    properties:
      name: AWSLogDeliveryFirehoseCrossAccountRolePolicy
      role: ${dst.name}
      policy: ${dstRolePolicy.json}
  # NOTE(review): only tags are set here; the stream's destination settings
  # appear to be omitted from this example - confirm and fill in before deploying.
  dstFirehoseDeliveryStream:
    type: aws:kinesis:FirehoseDeliveryStream
    name: dst
    properties:
      tags:
        LogDeliveryEnabled: 'true'
variables:
  # Trust policy: lets delivery.logs.amazonaws.com assume the source role.
  srcAssumeRolePolicy:
    fn::invoke:
      function: aws:iam:getPolicyDocument
      arguments:
        statements:
          - actions:
              - sts:AssumeRole
            effect: Allow
            principals:
              - type: Service
                identifiers:
                  - delivery.logs.amazonaws.com
  # Permissions for the source role: pass itself to delivery.logs.amazonaws.com
  # (scoped to the source VPC ARN), manage log deliveries, and assume the
  # destination role.
  srcRolePolicy:
    fn::invoke:
      function: aws:iam:getPolicyDocument
      arguments:
        statements:
          - effect: Allow
            actions:
              - iam:PassRole
            resources:
              - ${srcRole.arn}
            conditions:
              - test: StringEquals
                variable: iam:PassedToService
                values:
                  - delivery.logs.amazonaws.com
              - test: StringLike
                variable: iam:AssociatedResourceARN
                values:
                  - ${src.arn}
          - effect: Allow
            actions:
              - logs:CreateLogDelivery
              - logs:DeleteLogDelivery
              - logs:ListLogDeliveries
              - logs:GetLogDelivery
            resources:
              - '*'
          - effect: Allow
            actions:
              - sts:AssumeRole
            resources:
              - ${dst.arn}
  # For destination account
  # Trust policy: lets the source role assume the destination role.
  dstAssumeRolePolicy:
    fn::invoke:
      function: aws:iam:getPolicyDocument
      arguments:
        statements:
          - actions:
              - sts:AssumeRole
            effect: Allow
            principals:
              - type: AWS
                identifiers:
                  - ${srcRole.arn}
  # Permissions granted to the destination role.
  dstRolePolicy:
    fn::invoke:
      function: aws:iam:getPolicyDocument
      arguments:
        statements:
          - effect: Allow
            actions:
              - iam:CreateServiceLinkedRole
              - firehose:TagDeliveryStream
            resources:
              - '*'
Cross-account delivery requires two IAM roles: a role in the source account (iamRoleArn) and a role in the destination account (deliverCrossAccountRole) that the source role is allowed to assume. The source role needs permissions to create log deliveries and to assume the destination role. In this example the destination role trusts the source role ARN, and its policy grants iam:CreateServiceLinkedRole and firehose:TagDeliveryStream. The logDestinationType switches to “kinesis-data-firehose”, and logDestination points to the Firehose stream ARN in the destination account.
Beyond these examples
These snippets focus on specific flow log features: CloudWatch, S3, and Firehose destinations, Parquet formatting and partitioning, and cross-account delivery. They’re intentionally minimal rather than full network monitoring solutions.
The examples may reference pre-existing infrastructure such as VPCs, subnets, or network interfaces to monitor, CloudWatch Log Groups, S3 buckets, or Firehose delivery streams, and IAM roles with appropriate trust relationships and permissions. They focus on configuring the flow log rather than provisioning everything around it.
To keep things focused, common flow log patterns are omitted, including:
- Custom log formats (logFormat property)
- Traffic filtering (trafficType: ACCEPT or REJECT only)
- Aggregation intervals (maxAggregationInterval)
- Subnet, ENI, Transit Gateway, or NAT Gateway scopes
These omissions are intentional: the goal is to illustrate how each flow log feature is wired, not provide drop-in monitoring modules. See the VPC Flow Log resource reference for all available configuration options.
Let's configure AWS VPC Flow Logs
Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.
Try Pulumi Cloud for FREE
Frequently Asked Questions
Configuration & Resource Attachment
Attach a flow log to one of eniId, regionalNatGatewayId, subnetId, transitGatewayId, transitGatewayAttachmentId, or vpcId. Choose based on your monitoring scope (VPC-wide, subnet-level, or specific interface).
Supported log destination types are cloud-watch-logs (default), s3, and kinesis-data-firehose. Set via logDestinationType.
trafficType accepts ACCEPT (accepted traffic), REJECT (rejected traffic), or ALL (both). This property is required when attaching to ENI, NAT Gateway, subnet, or VPC.
IAM & Permissions
For CloudWatch Logs delivery, the IAM role needs logs:CreateLogGroup, logs:CreateLogStream, logs:PutLogEvents, logs:DescribeLogGroups, and logs:DescribeLogStreams.
For cross-account delivery, set deliverCrossAccountRole with the destination account’s IAM role ARN. The source role needs sts:AssumeRole permission for the destination role, and the destination role must trust the source role ARN.
Advanced Features
To store flow logs in Parquet format, set logDestinationType to s3 and configure destinationOptions with fileFormat: "parquet". You can also enable perHourPartition: true for hourly partitions.
maxAggregationInterval accepts 60 seconds (1 minute) or 600 seconds (10 minutes). The default is 600 seconds.
When attaching to transitGatewayId or transitGatewayAttachmentId, maxAggregationInterval must be set to 60 seconds (1 minute).
Limitations & Immutability
The following properties are immutable: logDestination, logDestinationType, maxAggregationInterval, trafficType, destinationOptions, deliverCrossAccountRole, and all resource attachment IDs (eniId, subnetId, vpcId, etc.). Changing them forces resource replacement.
Resource attachment IDs (eniId, subnetId, vpcId, etc.) are immutable. You must create a new flow log to monitor a different resource.
Using a different cloud?
Explore networking guides for other cloud providers: