The aws:kinesis/firehoseDeliveryStream:FirehoseDeliveryStream resource, part of the Pulumi AWS provider, defines a Kinesis Firehose delivery stream that ingests real-time data and delivers it to destinations like S3, Redshift, OpenSearch, Splunk, or HTTP endpoints. This guide focuses on four capabilities: S3 delivery with Lambda transformation, dynamic partitioning for organized storage, data warehouse and search integration (Redshift and OpenSearch), and HTTP endpoint delivery (Splunk HEC and custom endpoints).
Delivery streams depend on IAM roles, destination resources, and optionally Lambda functions for transformation. The examples are intentionally small. Combine them with your own IAM policies, destination infrastructure, and monitoring configuration.
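For instance, the roles created in the first example below carry only trust policies; Firehose also needs resource permissions before it can deliver anything. A minimal sketch of an inline policy for that example's firehoseRole, bucket, and lambdaProcessor (the policy name is an assumption, and the action list covers only S3 delivery and Lambda invocation):
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
// Hypothetical inline policy for the firehoseRole, bucket, and lambdaProcessor
// resources defined in the first example below.
const firehoseDeliveryPolicy = new aws.iam.RolePolicy("firehose_delivery_policy", {
    role: firehoseRole.id,
    policy: pulumi.jsonStringify({
        Version: "2012-10-17",
        Statement: [
            {
                // Standard S3 actions Firehose uses when writing batches.
                Effect: "Allow",
                Action: [
                    "s3:AbortMultipartUpload",
                    "s3:GetBucketLocation",
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:PutObject",
                ],
                Resource: [bucket.arn, pulumi.interpolate`${bucket.arn}/*`],
            },
            {
                // Lets Firehose invoke the transformation Lambda.
                Effect: "Allow",
                Action: ["lambda:InvokeFunction", "lambda:GetFunctionConfiguration"],
                Resource: pulumi.interpolate`${lambdaProcessor.arn}:$LATEST`,
            },
        ],
    }),
});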
Deliver to S3 with Lambda processing
Many pipelines begin by delivering records into S3, either as the primary storage layer or as a central point for downstream analytics tools. Firehose can pass each batch through a Lambda function before writing to S3, allowing you to normalize formats or enrich events.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const bucket = new aws.s3.Bucket("bucket", {bucket: "tf-test-bucket"});
const firehoseAssumeRole = aws.iam.getPolicyDocument({
statements: [{
effect: "Allow",
principals: [{
type: "Service",
identifiers: ["firehose.amazonaws.com"],
}],
actions: ["sts:AssumeRole"],
}],
});
const firehoseRole = new aws.iam.Role("firehose_role", {
name: "firehose_test_role",
assumeRolePolicy: firehoseAssumeRole.then(firehoseAssumeRole => firehoseAssumeRole.json),
});
const lambdaAssumeRole = aws.iam.getPolicyDocument({
statements: [{
effect: "Allow",
principals: [{
type: "Service",
identifiers: ["lambda.amazonaws.com"],
}],
actions: ["sts:AssumeRole"],
}],
});
const lambdaIam = new aws.iam.Role("lambda_iam", {
name: "lambda_iam",
assumeRolePolicy: lambdaAssumeRole.then(lambdaAssumeRole => lambdaAssumeRole.json),
});
const lambdaProcessor = new aws.lambda.Function("lambda_processor", {
code: new pulumi.asset.FileArchive("lambda.zip"),
name: "firehose_lambda_processor",
role: lambdaIam.arn,
handler: "exports.handler",
runtime: aws.lambda.Runtime.NodeJS20dX,
});
const extendedS3Stream = new aws.kinesis.FirehoseDeliveryStream("extended_s3_stream", {
name: "kinesis-firehose-extended-s3-test-stream",
destination: "extended_s3",
extendedS3Configuration: {
roleArn: firehoseRole.arn,
bucketArn: bucket.arn,
processingConfiguration: {
enabled: true,
processors: [{
type: "Lambda",
parameters: [{
parameterName: "LambdaArn",
parameterValue: pulumi.interpolate`${lambdaProcessor.arn}:$LATEST`,
}],
}],
},
},
});
const bucketAcl = new aws.s3.BucketAcl("bucket_acl", {
bucket: bucket.id,
acl: "private",
});
import pulumi
import pulumi_aws as aws
bucket = aws.s3.Bucket("bucket", bucket="tf-test-bucket")
firehose_assume_role = aws.iam.get_policy_document(statements=[{
"effect": "Allow",
"principals": [{
"type": "Service",
"identifiers": ["firehose.amazonaws.com"],
}],
"actions": ["sts:AssumeRole"],
}])
firehose_role = aws.iam.Role("firehose_role",
name="firehose_test_role",
assume_role_policy=firehose_assume_role.json)
lambda_assume_role = aws.iam.get_policy_document(statements=[{
"effect": "Allow",
"principals": [{
"type": "Service",
"identifiers": ["lambda.amazonaws.com"],
}],
"actions": ["sts:AssumeRole"],
}])
lambda_iam = aws.iam.Role("lambda_iam",
name="lambda_iam",
assume_role_policy=lambda_assume_role.json)
lambda_processor = aws.lambda_.Function("lambda_processor",
code=pulumi.FileArchive("lambda.zip"),
name="firehose_lambda_processor",
role=lambda_iam.arn,
handler="exports.handler",
runtime=aws.lambda_.Runtime.NODE_JS20D_X)
extended_s3_stream = aws.kinesis.FirehoseDeliveryStream("extended_s3_stream",
name="kinesis-firehose-extended-s3-test-stream",
destination="extended_s3",
extended_s3_configuration={
"role_arn": firehose_role.arn,
"bucket_arn": bucket.arn,
"processing_configuration": {
"enabled": True,
"processors": [{
"type": "Lambda",
"parameters": [{
"parameter_name": "LambdaArn",
"parameter_value": lambda_processor.arn.apply(lambda arn: f"{arn}:$LATEST"),
}],
}],
},
})
bucket_acl = aws.s3.BucketAcl("bucket_acl",
bucket=bucket.id,
acl="private")
package main
import (
"fmt"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/iam"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesis"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/s3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
bucket, err := s3.NewBucket(ctx, "bucket", &s3.BucketArgs{
Bucket: pulumi.String("tf-test-bucket"),
})
if err != nil {
return err
}
firehoseAssumeRole, err := iam.GetPolicyDocument(ctx, &iam.GetPolicyDocumentArgs{
Statements: []iam.GetPolicyDocumentStatement{
{
Effect: pulumi.StringRef("Allow"),
Principals: []iam.GetPolicyDocumentStatementPrincipal{
{
Type: "Service",
Identifiers: []string{
"firehose.amazonaws.com",
},
},
},
Actions: []string{
"sts:AssumeRole",
},
},
},
}, nil)
if err != nil {
return err
}
firehoseRole, err := iam.NewRole(ctx, "firehose_role", &iam.RoleArgs{
Name: pulumi.String("firehose_test_role"),
AssumeRolePolicy: pulumi.String(firehoseAssumeRole.Json),
})
if err != nil {
return err
}
lambdaAssumeRole, err := iam.GetPolicyDocument(ctx, &iam.GetPolicyDocumentArgs{
Statements: []iam.GetPolicyDocumentStatement{
{
Effect: pulumi.StringRef("Allow"),
Principals: []iam.GetPolicyDocumentStatementPrincipal{
{
Type: "Service",
Identifiers: []string{
"lambda.amazonaws.com",
},
},
},
Actions: []string{
"sts:AssumeRole",
},
},
},
}, nil)
if err != nil {
return err
}
lambdaIam, err := iam.NewRole(ctx, "lambda_iam", &iam.RoleArgs{
Name: pulumi.String("lambda_iam"),
AssumeRolePolicy: pulumi.String(lambdaAssumeRole.Json),
})
if err != nil {
return err
}
lambdaProcessor, err := lambda.NewFunction(ctx, "lambda_processor", &lambda.FunctionArgs{
Code: pulumi.NewFileArchive("lambda.zip"),
Name: pulumi.String("firehose_lambda_processor"),
Role: lambdaIam.Arn,
Handler: pulumi.String("exports.handler"),
Runtime: pulumi.String(lambda.RuntimeNodeJS20dX),
})
if err != nil {
return err
}
_, err = kinesis.NewFirehoseDeliveryStream(ctx, "extended_s3_stream", &kinesis.FirehoseDeliveryStreamArgs{
Name: pulumi.String("kinesis-firehose-extended-s3-test-stream"),
Destination: pulumi.String("extended_s3"),
ExtendedS3Configuration: &kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationArgs{
RoleArn: firehoseRole.Arn,
BucketArn: bucket.Arn,
ProcessingConfiguration: &kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs{
Enabled: pulumi.Bool(true),
Processors: kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArray{
&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs{
Type: pulumi.String("Lambda"),
Parameters: kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArray{
&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs{
ParameterName: pulumi.String("LambdaArn"),
ParameterValue: lambdaProcessor.Arn.ApplyT(func(arn string) (string, error) {
return fmt.Sprintf("%v:$LATEST", arn), nil
}).(pulumi.StringOutput),
},
},
},
},
},
},
})
if err != nil {
return err
}
_, err = s3.NewBucketAcl(ctx, "bucket_acl", &s3.BucketAclArgs{
Bucket: bucket.ID(),
Acl: pulumi.String("private"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var bucket = new Aws.S3.Bucket("bucket", new()
{
BucketName = "tf-test-bucket",
});
var firehoseAssumeRole = Aws.Iam.GetPolicyDocument.Invoke(new()
{
Statements = new[]
{
new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
{
Effect = "Allow",
Principals = new[]
{
new Aws.Iam.Inputs.GetPolicyDocumentStatementPrincipalInputArgs
{
Type = "Service",
Identifiers = new[]
{
"firehose.amazonaws.com",
},
},
},
Actions = new[]
{
"sts:AssumeRole",
},
},
},
});
var firehoseRole = new Aws.Iam.Role("firehose_role", new()
{
Name = "firehose_test_role",
AssumeRolePolicy = firehoseAssumeRole.Apply(getPolicyDocumentResult => getPolicyDocumentResult.Json),
});
var lambdaAssumeRole = Aws.Iam.GetPolicyDocument.Invoke(new()
{
Statements = new[]
{
new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
{
Effect = "Allow",
Principals = new[]
{
new Aws.Iam.Inputs.GetPolicyDocumentStatementPrincipalInputArgs
{
Type = "Service",
Identifiers = new[]
{
"lambda.amazonaws.com",
},
},
},
Actions = new[]
{
"sts:AssumeRole",
},
},
},
});
var lambdaIam = new Aws.Iam.Role("lambda_iam", new()
{
Name = "lambda_iam",
AssumeRolePolicy = lambdaAssumeRole.Apply(getPolicyDocumentResult => getPolicyDocumentResult.Json),
});
var lambdaProcessor = new Aws.Lambda.Function("lambda_processor", new()
{
Code = new FileArchive("lambda.zip"),
Name = "firehose_lambda_processor",
Role = lambdaIam.Arn,
Handler = "exports.handler",
Runtime = Aws.Lambda.Runtime.NodeJS20dX,
});
var extendedS3Stream = new Aws.Kinesis.FirehoseDeliveryStream("extended_s3_stream", new()
{
Name = "kinesis-firehose-extended-s3-test-stream",
Destination = "extended_s3",
ExtendedS3Configuration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationArgs
{
RoleArn = firehoseRole.Arn,
BucketArn = bucket.Arn,
ProcessingConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs
{
Enabled = true,
Processors = new[]
{
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs
{
Type = "Lambda",
Parameters = new[]
{
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs
{
ParameterName = "LambdaArn",
ParameterValue = lambdaProcessor.Arn.Apply(arn => $"{arn}:$LATEST"),
},
},
},
},
},
},
});
var bucketAcl = new Aws.S3.BucketAcl("bucket_acl", new()
{
Bucket = bucket.Id,
Acl = "private",
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.s3.Bucket;
import com.pulumi.aws.s3.BucketArgs;
import com.pulumi.aws.iam.IamFunctions;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentArgs;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentStatementArgs;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentStatementPrincipalArgs;
import com.pulumi.aws.iam.Role;
import com.pulumi.aws.iam.RoleArgs;
import com.pulumi.aws.lambda.Function;
import com.pulumi.aws.lambda.FunctionArgs;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs;
import com.pulumi.aws.s3.BucketAcl;
import com.pulumi.aws.s3.BucketAclArgs;
import com.pulumi.asset.FileArchive;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var bucket = new Bucket("bucket", BucketArgs.builder()
.bucket("tf-test-bucket")
.build());
final var firehoseAssumeRole = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
.statements(GetPolicyDocumentStatementArgs.builder()
.effect("Allow")
.principals(GetPolicyDocumentStatementPrincipalArgs.builder()
.type("Service")
.identifiers("firehose.amazonaws.com")
.build())
.actions("sts:AssumeRole")
.build())
.build());
var firehoseRole = new Role("firehoseRole", RoleArgs.builder()
.name("firehose_test_role")
.assumeRolePolicy(firehoseAssumeRole.json())
.build());
final var lambdaAssumeRole = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
.statements(GetPolicyDocumentStatementArgs.builder()
.effect("Allow")
.principals(GetPolicyDocumentStatementPrincipalArgs.builder()
.type("Service")
.identifiers("lambda.amazonaws.com")
.build())
.actions("sts:AssumeRole")
.build())
.build());
var lambdaIam = new Role("lambdaIam", RoleArgs.builder()
.name("lambda_iam")
.assumeRolePolicy(lambdaAssumeRole.json())
.build());
var lambdaProcessor = new Function("lambdaProcessor", FunctionArgs.builder()
.code(new FileArchive("lambda.zip"))
.name("firehose_lambda_processor")
.role(lambdaIam.arn())
.handler("exports.handler")
.runtime("nodejs20.x")
.build());
var extendedS3Stream = new FirehoseDeliveryStream("extendedS3Stream", FirehoseDeliveryStreamArgs.builder()
.name("kinesis-firehose-extended-s3-test-stream")
.destination("extended_s3")
.extendedS3Configuration(FirehoseDeliveryStreamExtendedS3ConfigurationArgs.builder()
.roleArn(firehoseRole.arn())
.bucketArn(bucket.arn())
.processingConfiguration(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs.builder()
.enabled(true)
.processors(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs.builder()
.type("Lambda")
.parameters(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs.builder()
.parameterName("LambdaArn")
.parameterValue(lambdaProcessor.arn().applyValue(_arn -> String.format("%s:$LATEST", _arn)))
.build())
.build())
.build())
.build())
.build());
var bucketAcl = new BucketAcl("bucketAcl", BucketAclArgs.builder()
.bucket(bucket.id())
.acl("private")
.build());
}
}
resources:
extendedS3Stream:
type: aws:kinesis:FirehoseDeliveryStream
name: extended_s3_stream
properties:
name: kinesis-firehose-extended-s3-test-stream
destination: extended_s3
extendedS3Configuration:
roleArn: ${firehoseRole.arn}
bucketArn: ${bucket.arn}
processingConfiguration:
enabled: 'true'
processors:
- type: Lambda
parameters:
- parameterName: LambdaArn
parameterValue: ${lambdaProcessor.arn}:$LATEST
bucket:
type: aws:s3:Bucket
properties:
bucket: tf-test-bucket
bucketAcl:
type: aws:s3:BucketAcl
name: bucket_acl
properties:
bucket: ${bucket.id}
acl: private
firehoseRole:
type: aws:iam:Role
name: firehose_role
properties:
name: firehose_test_role
assumeRolePolicy: ${firehoseAssumeRole.json}
lambdaIam:
type: aws:iam:Role
name: lambda_iam
properties:
name: lambda_iam
assumeRolePolicy: ${lambdaAssumeRole.json}
lambdaProcessor:
type: aws:lambda:Function
name: lambda_processor
properties:
code:
fn::FileArchive: lambda.zip
name: firehose_lambda_processor
role: ${lambdaIam.arn}
handler: exports.handler
runtime: nodejs20.x
variables:
firehoseAssumeRole:
fn::invoke:
function: aws:iam:getPolicyDocument
arguments:
statements:
- effect: Allow
principals:
- type: Service
identifiers:
- firehose.amazonaws.com
actions:
- sts:AssumeRole
lambdaAssumeRole:
fn::invoke:
function: aws:iam:getPolicyDocument
arguments:
statements:
- effect: Allow
principals:
- type: Service
identifiers:
- lambda.amazonaws.com
actions:
- sts:AssumeRole
In this configuration, Firehose sends incoming records to your Lambda function, waits for the transformed payload, and then writes the results to the S3 bucket. The processingConfiguration enables Lambda-based transformation, and the parameters specify which Lambda function version to invoke. The roleArn grants Firehose permission to invoke Lambda and write to S3.
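The examples package the transformation code as lambda.zip without showing its contents. Firehose invokes the function with a batch of base64-encoded records and expects every record returned with a result of Ok, Dropped, or ProcessingFailed. A minimal pass-through handler, sketched in TypeScript (the compiled file name must match the exports.handler setting above, and the enrichment field is hypothetical):
// Hypothetical contents of lambda.zip (compiled to exports.js so that the
// "exports.handler" setting resolves). Firehose sends { records: [...] } and
// expects each recordId back with a result and re-encoded data.
interface FirehoseRecord {
    recordId: string;
    data: string; // base64-encoded payload
}

export const handler = async (event: { records: FirehoseRecord[] }) => ({
    records: event.records.map(record => {
        const payload = JSON.parse(Buffer.from(record.data, "base64").toString("utf8"));
        payload.processed_at = new Date().toISOString(); // example enrichment
        return {
            recordId: record.recordId,
            result: "Ok", // or "Dropped" / "ProcessingFailed"
            data: Buffer.from(JSON.stringify(payload) + "\n").toString("base64"),
        };
    }),
});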
Partition S3 data by extracted fields
Applications that store high-volume event streams often need to organize data by customer, region, or time for efficient querying. Dynamic partitioning extracts fields from records and uses them to construct S3 prefixes automatically.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const extendedS3Stream = new aws.kinesis.FirehoseDeliveryStream("extended_s3_stream", {
name: "kinesis-firehose-extended-s3-test-stream",
destination: "extended_s3",
extendedS3Configuration: {
roleArn: firehoseRole.arn,
bucketArn: bucket.arn,
bufferingSize: 64,
dynamicPartitioningConfiguration: {
enabled: true,
},
prefix: "data/customer_id=!{partitionKeyFromQuery:customer_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/",
errorOutputPrefix: "errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/",
processingConfiguration: {
enabled: true,
processors: [
{
type: "RecordDeAggregation",
parameters: [{
parameterName: "SubRecordType",
parameterValue: "JSON",
}],
},
{
type: "AppendDelimiterToRecord",
},
{
type: "MetadataExtraction",
parameters: [
{
parameterName: "JsonParsingEngine",
parameterValue: "JQ-1.6",
},
{
parameterName: "MetadataExtractionQuery",
parameterValue: "{customer_id:.customer_id}",
},
],
},
],
},
},
});
import pulumi
import pulumi_aws as aws
extended_s3_stream = aws.kinesis.FirehoseDeliveryStream("extended_s3_stream",
name="kinesis-firehose-extended-s3-test-stream",
destination="extended_s3",
extended_s3_configuration={
"role_arn": firehose_role["arn"],
"bucket_arn": bucket["arn"],
"buffering_size": 64,
"dynamic_partitioning_configuration": {
"enabled": True,
},
"prefix": "data/customer_id=!{partitionKeyFromQuery:customer_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/",
"error_output_prefix": "errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/",
"processing_configuration": {
"enabled": True,
"processors": [
{
"type": "RecordDeAggregation",
"parameters": [{
"parameter_name": "SubRecordType",
"parameter_value": "JSON",
}],
},
{
"type": "AppendDelimiterToRecord",
},
{
"type": "MetadataExtraction",
"parameters": [
{
"parameter_name": "JsonParsingEngine",
"parameter_value": "JQ-1.6",
},
{
"parameter_name": "MetadataExtractionQuery",
"parameter_value": "{customer_id:.customer_id}",
},
],
},
],
},
})
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesis"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := kinesis.NewFirehoseDeliveryStream(ctx, "extended_s3_stream", &kinesis.FirehoseDeliveryStreamArgs{
Name: pulumi.String("kinesis-firehose-extended-s3-test-stream"),
Destination: pulumi.String("extended_s3"),
ExtendedS3Configuration: &kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationArgs{
RoleArn: pulumi.Any(firehoseRole.Arn),
BucketArn: pulumi.Any(bucket.Arn),
BufferingSize: pulumi.Int(64),
DynamicPartitioningConfiguration: &kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationDynamicPartitioningConfigurationArgs{
Enabled: pulumi.Bool(true),
},
Prefix: pulumi.String("data/customer_id=!{partitionKeyFromQuery:customer_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"),
ErrorOutputPrefix: pulumi.String("errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/"),
ProcessingConfiguration: &kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs{
Enabled: pulumi.Bool(true),
Processors: kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArray{
&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs{
Type: pulumi.String("RecordDeAggregation"),
Parameters: kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArray{
&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs{
ParameterName: pulumi.String("SubRecordType"),
ParameterValue: pulumi.String("JSON"),
},
},
},
&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs{
Type: pulumi.String("AppendDelimiterToRecord"),
},
&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs{
Type: pulumi.String("MetadataExtraction"),
Parameters: kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArray{
&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs{
ParameterName: pulumi.String("JsonParsingEngine"),
ParameterValue: pulumi.String("JQ-1.6"),
},
&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs{
ParameterName: pulumi.String("MetadataExtractionQuery"),
ParameterValue: pulumi.String("{customer_id:.customer_id}"),
},
},
},
},
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var extendedS3Stream = new Aws.Kinesis.FirehoseDeliveryStream("extended_s3_stream", new()
{
Name = "kinesis-firehose-extended-s3-test-stream",
Destination = "extended_s3",
ExtendedS3Configuration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationArgs
{
RoleArn = firehoseRole.Arn,
BucketArn = bucket.Arn,
BufferingSize = 64,
DynamicPartitioningConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationDynamicPartitioningConfigurationArgs
{
Enabled = true,
},
Prefix = "data/customer_id=!{partitionKeyFromQuery:customer_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/",
ErrorOutputPrefix = "errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/",
ProcessingConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs
{
Enabled = true,
Processors = new[]
{
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs
{
Type = "RecordDeAggregation",
Parameters = new[]
{
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs
{
ParameterName = "SubRecordType",
ParameterValue = "JSON",
},
},
},
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs
{
Type = "AppendDelimiterToRecord",
},
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs
{
Type = "MetadataExtraction",
Parameters = new[]
{
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs
{
ParameterName = "JsonParsingEngine",
ParameterValue = "JQ-1.6",
},
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs
{
ParameterName = "MetadataExtractionQuery",
ParameterValue = "{customer_id:.customer_id}",
},
},
},
},
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationDynamicPartitioningConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var extendedS3Stream = new FirehoseDeliveryStream("extendedS3Stream", FirehoseDeliveryStreamArgs.builder()
.name("kinesis-firehose-extended-s3-test-stream")
.destination("extended_s3")
.extendedS3Configuration(FirehoseDeliveryStreamExtendedS3ConfigurationArgs.builder()
.roleArn(firehoseRole.arn())
.bucketArn(bucket.arn())
.bufferingSize(64)
.dynamicPartitioningConfiguration(FirehoseDeliveryStreamExtendedS3ConfigurationDynamicPartitioningConfigurationArgs.builder()
.enabled(true)
.build())
.prefix("data/customer_id=!{partitionKeyFromQuery:customer_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/")
.errorOutputPrefix("errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/")
.processingConfiguration(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs.builder()
.enabled(true)
.processors(
FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs.builder()
.type("RecordDeAggregation")
.parameters(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs.builder()
.parameterName("SubRecordType")
.parameterValue("JSON")
.build())
.build(),
FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs.builder()
.type("AppendDelimiterToRecord")
.build(),
FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs.builder()
.type("MetadataExtraction")
.parameters(
FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs.builder()
.parameterName("JsonParsingEngine")
.parameterValue("JQ-1.6")
.build(),
FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs.builder()
.parameterName("MetadataExtractionQuery")
.parameterValue("{customer_id:.customer_id}")
.build())
.build())
.build())
.build())
.build());
}
}
resources:
extendedS3Stream:
type: aws:kinesis:FirehoseDeliveryStream
name: extended_s3_stream
properties:
name: kinesis-firehose-extended-s3-test-stream
destination: extended_s3
extendedS3Configuration:
roleArn: ${firehoseRole.arn}
bucketArn: ${bucket.arn}
bufferingSize: 64
dynamicPartitioningConfiguration:
enabled: 'true'
prefix: data/customer_id=!{partitionKeyFromQuery:customer_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
errorOutputPrefix: errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/
processingConfiguration:
enabled: 'true'
processors:
- type: RecordDeAggregation
parameters:
- parameterName: SubRecordType
parameterValue: JSON
- type: AppendDelimiterToRecord
- type: MetadataExtraction
parameters:
- parameterName: JsonParsingEngine
parameterValue: JQ-1.6
- parameterName: MetadataExtractionQuery
parameterValue: '{customer_id:.customer_id}'
When dynamicPartitioningConfiguration is enabled, Firehose uses the MetadataExtraction processor to run JQ queries against each record. The query {customer_id:.customer_id} extracts the customer_id field, which then populates the !{partitionKeyFromQuery:customer_id} placeholder in the prefix. Records are automatically organized into folders like data/customer_id=12345/year=2024/month=01/. The errorOutputPrefix handles records that fail processing.
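As a concrete illustration (the record and timestamps are hypothetical), a single JSON record flows through this configuration as follows:
// Incoming record:
//   {"customer_id": "12345", "event": "page_view", "path": "/pricing"}
// MetadataExtraction evaluates {customer_id:.customer_id}, yielding
// customer_id=12345 for !{partitionKeyFromQuery:customer_id}. With an arrival
// time of 2024-01-15 10:22 UTC, the object is written under a key like:
//   data/customer_id=12345/year=2024/month=01/day=15/hour=10/<stream-name>-<version>-<timestamp>-<uuid>
// A record that fails processing would instead land under:
//   errors/year=2024/month=01/day=15/hour=10/<error-output-type>/...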
Load data into Redshift tables
Data warehouses like Redshift require structured data loading with specific table schemas and COPY commands. Firehose stages data in S3, then executes COPY operations to load it into Redshift tables.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const testCluster = new aws.redshift.Cluster("test_cluster", {
clusterIdentifier: "tf-redshift-cluster",
databaseName: "test",
masterUsername: "testuser",
masterPassword: "T3stPass",
nodeType: "dc1.large",
clusterType: "single-node",
});
const testStream = new aws.kinesis.FirehoseDeliveryStream("test_stream", {
name: "kinesis-firehose-test-stream",
destination: "redshift",
redshiftConfiguration: {
roleArn: firehoseRole.arn,
clusterJdbcurl: pulumi.interpolate`jdbc:redshift://${testCluster.endpoint}/${testCluster.databaseName}`,
username: "testuser",
password: "T3stPass",
dataTableName: "test-table",
copyOptions: "delimiter '|'",
dataTableColumns: "test-col",
s3BackupMode: "Enabled",
s3Configuration: {
roleArn: firehoseRole.arn,
bucketArn: bucket.arn,
bufferingSize: 10,
bufferingInterval: 400,
compressionFormat: "GZIP",
},
s3BackupConfiguration: {
roleArn: firehoseRole.arn,
bucketArn: bucket.arn,
bufferingSize: 15,
bufferingInterval: 300,
compressionFormat: "GZIP",
},
},
});
import pulumi
import pulumi_aws as aws
test_cluster = aws.redshift.Cluster("test_cluster",
cluster_identifier="tf-redshift-cluster",
database_name="test",
master_username="testuser",
master_password="T3stPass",
node_type="dc1.large",
cluster_type="single-node")
test_stream = aws.kinesis.FirehoseDeliveryStream("test_stream",
name="kinesis-firehose-test-stream",
destination="redshift",
redshift_configuration={
"role_arn": firehose_role["arn"],
"cluster_jdbcurl": pulumi.Output.all(
endpoint=test_cluster.endpoint,
database_name=test_cluster.database_name
).apply(lambda resolved_outputs: f"jdbc:redshift://{resolved_outputs['endpoint']}/{resolved_outputs['database_name']}")
,
"username": "testuser",
"password": "T3stPass",
"data_table_name": "test-table",
"copy_options": "delimiter '|'",
"data_table_columns": "test-col",
"s3_backup_mode": "Enabled",
"s3_configuration": {
"role_arn": firehose_role["arn"],
"bucket_arn": bucket["arn"],
"buffering_size": 10,
"buffering_interval": 400,
"compression_format": "GZIP",
},
"s3_backup_configuration": {
"role_arn": firehose_role["arn"],
"bucket_arn": bucket["arn"],
"buffering_size": 15,
"buffering_interval": 300,
"compression_format": "GZIP",
},
})
package main
import (
"fmt"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesis"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/redshift"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
testCluster, err := redshift.NewCluster(ctx, "test_cluster", &redshift.ClusterArgs{
ClusterIdentifier: pulumi.String("tf-redshift-cluster"),
DatabaseName: pulumi.String("test"),
MasterUsername: pulumi.String("testuser"),
MasterPassword: pulumi.String("T3stPass"),
NodeType: pulumi.String("dc1.large"),
ClusterType: pulumi.String("single-node"),
})
if err != nil {
return err
}
_, err = kinesis.NewFirehoseDeliveryStream(ctx, "test_stream", &kinesis.FirehoseDeliveryStreamArgs{
Name: pulumi.String("kinesis-firehose-test-stream"),
Destination: pulumi.String("redshift"),
RedshiftConfiguration: &kinesis.FirehoseDeliveryStreamRedshiftConfigurationArgs{
RoleArn: pulumi.Any(firehoseRole.Arn),
ClusterJdbcurl: pulumi.All(testCluster.Endpoint, testCluster.DatabaseName).ApplyT(func(_args []interface{}) (string, error) {
endpoint := _args[0].(string)
databaseName := _args[1].(string)
return fmt.Sprintf("jdbc:redshift://%v/%v", endpoint, databaseName), nil
}).(pulumi.StringOutput),
Username: pulumi.String("testuser"),
Password: pulumi.String("T3stPass"),
DataTableName: pulumi.String("test-table"),
CopyOptions: pulumi.String("delimiter '|'"),
DataTableColumns: pulumi.String("test-col"),
S3BackupMode: pulumi.String("Enabled"),
S3Configuration: &kinesis.FirehoseDeliveryStreamRedshiftConfigurationS3ConfigurationArgs{
RoleArn: pulumi.Any(firehoseRole.Arn),
BucketArn: pulumi.Any(bucket.Arn),
BufferingSize: pulumi.Int(10),
BufferingInterval: pulumi.Int(400),
CompressionFormat: pulumi.String("GZIP"),
},
S3BackupConfiguration: &kinesis.FirehoseDeliveryStreamRedshiftConfigurationS3BackupConfigurationArgs{
RoleArn: pulumi.Any(firehoseRole.Arn),
BucketArn: pulumi.Any(bucket.Arn),
BufferingSize: pulumi.Int(15),
BufferingInterval: pulumi.Int(300),
CompressionFormat: pulumi.String("GZIP"),
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var testCluster = new Aws.RedShift.Cluster("test_cluster", new()
{
ClusterIdentifier = "tf-redshift-cluster",
DatabaseName = "test",
MasterUsername = "testuser",
MasterPassword = "T3stPass",
NodeType = "dc1.large",
ClusterType = "single-node",
});
var testStream = new Aws.Kinesis.FirehoseDeliveryStream("test_stream", new()
{
Name = "kinesis-firehose-test-stream",
Destination = "redshift",
RedshiftConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamRedshiftConfigurationArgs
{
RoleArn = firehoseRole.Arn,
ClusterJdbcurl = Output.Tuple(testCluster.Endpoint, testCluster.DatabaseName).Apply(values =>
{
var endpoint = values.Item1;
var databaseName = values.Item2;
return $"jdbc:redshift://{endpoint}/{databaseName}";
}),
Username = "testuser",
Password = "T3stPass",
DataTableName = "test-table",
CopyOptions = "delimiter '|'",
DataTableColumns = "test-col",
S3BackupMode = "Enabled",
S3Configuration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamRedshiftConfigurationS3ConfigurationArgs
{
RoleArn = firehoseRole.Arn,
BucketArn = bucket.Arn,
BufferingSize = 10,
BufferingInterval = 400,
CompressionFormat = "GZIP",
},
S3BackupConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamRedshiftConfigurationS3BackupConfigurationArgs
{
RoleArn = firehoseRole.Arn,
BucketArn = bucket.Arn,
BufferingSize = 15,
BufferingInterval = 300,
CompressionFormat = "GZIP",
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.redshift.Cluster;
import com.pulumi.aws.redshift.ClusterArgs;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamRedshiftConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamRedshiftConfigurationS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamRedshiftConfigurationS3BackupConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var testCluster = new Cluster("testCluster", ClusterArgs.builder()
.clusterIdentifier("tf-redshift-cluster")
.databaseName("test")
.masterUsername("testuser")
.masterPassword("T3stPass")
.nodeType("dc1.large")
.clusterType("single-node")
.build());
var testStream = new FirehoseDeliveryStream("testStream", FirehoseDeliveryStreamArgs.builder()
.name("kinesis-firehose-test-stream")
.destination("redshift")
.redshiftConfiguration(FirehoseDeliveryStreamRedshiftConfigurationArgs.builder()
.roleArn(firehoseRole.arn())
.clusterJdbcurl(Output.tuple(testCluster.endpoint(), testCluster.databaseName()).applyValue(values -> {
var endpoint = values.t1;
var databaseName = values.t2;
return String.format("jdbc:redshift://%s/%s", endpoint,databaseName);
}))
.username("testuser")
.password("T3stPass")
.dataTableName("test-table")
.copyOptions("delimiter '|'")
.dataTableColumns("test-col")
.s3BackupMode("Enabled")
.s3Configuration(FirehoseDeliveryStreamRedshiftConfigurationS3ConfigurationArgs.builder()
.roleArn(firehoseRole.arn())
.bucketArn(bucket.arn())
.bufferingSize(10)
.bufferingInterval(400)
.compressionFormat("GZIP")
.build())
.s3BackupConfiguration(FirehoseDeliveryStreamRedshiftConfigurationS3BackupConfigurationArgs.builder()
.roleArn(firehoseRole.arn())
.bucketArn(bucket.arn())
.bufferingSize(15)
.bufferingInterval(300)
.compressionFormat("GZIP")
.build())
.build())
.build());
}
}
resources:
testCluster:
type: aws:redshift:Cluster
name: test_cluster
properties:
clusterIdentifier: tf-redshift-cluster
databaseName: test
masterUsername: testuser
masterPassword: T3stPass
nodeType: dc1.large
clusterType: single-node
testStream:
type: aws:kinesis:FirehoseDeliveryStream
name: test_stream
properties:
name: kinesis-firehose-test-stream
destination: redshift
redshiftConfiguration:
roleArn: ${firehoseRole.arn}
clusterJdbcurl: jdbc:redshift://${testCluster.endpoint}/${testCluster.databaseName}
username: testuser
password: T3stPass
dataTableName: test-table
copyOptions: delimiter '|'
dataTableColumns: test-col
s3BackupMode: Enabled
s3Configuration:
roleArn: ${firehoseRole.arn}
bucketArn: ${bucket.arn}
bufferingSize: 10
bufferingInterval: 400
compressionFormat: GZIP
s3BackupConfiguration:
roleArn: ${firehoseRole.arn}
bucketArn: ${bucket.arn}
bufferingSize: 15
bufferingInterval: 300
compressionFormat: GZIP
Firehose first writes records to S3, then issues a Redshift COPY command using the clusterJdbcurl connection string. The dataTableName specifies the target table, and copyOptions passes parameters to the COPY command (like delimiter characters). The s3BackupMode determines whether all data or only failed records are kept in S3. The s3BackupConfiguration defines a separate bucket for backup data.
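Note that the cluster and stream above embed the password as a literal, which lands in plaintext in your stack state. A common alternative, sketched here with a hypothetical config key, is to read it as a Pulumi secret (the same approach suits the Splunk hecToken and HTTP endpoint accessKey later in this guide):
import * as pulumi from "@pulumi/pulumi";

// Set once with: pulumi config set --secret redshiftPassword <value>
const config = new pulumi.Config();
const redshiftPassword = config.requireSecret("redshiftPassword");

// Pass redshiftPassword to masterPassword on the cluster and password on the
// redshiftConfiguration; Pulumi encrypts the value in state and masks it in logs.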
Index records in OpenSearch
Search and analytics workloads often use OpenSearch to index and query event data in near real-time. Firehose delivers records to an OpenSearch domain and index.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const testCluster = new aws.opensearch.Domain("test_cluster", {domainName: "firehose-os-test"});
const testStream = new aws.kinesis.FirehoseDeliveryStream("test_stream", {
name: "kinesis-firehose-test-stream",
destination: "opensearch",
opensearchConfiguration: {
domainArn: testCluster.arn,
roleArn: firehoseRole.arn,
indexName: "test",
s3Configuration: {
roleArn: firehoseRole.arn,
bucketArn: bucket.arn,
bufferingSize: 10,
bufferingInterval: 400,
compressionFormat: "GZIP",
},
processingConfiguration: {
enabled: true,
processors: [{
type: "Lambda",
parameters: [{
parameterName: "LambdaArn",
parameterValue: pulumi.interpolate`${lambdaProcessor.arn}:$LATEST`,
}],
}],
},
},
});
import pulumi
import pulumi_aws as aws
test_cluster = aws.opensearch.Domain("test_cluster", domain_name="firehose-os-test")
test_stream = aws.kinesis.FirehoseDeliveryStream("test_stream",
name="kinesis-firehose-test-stream",
destination="opensearch",
opensearch_configuration={
"domain_arn": test_cluster.arn,
"role_arn": firehose_role["arn"],
"index_name": "test",
"s3_configuration": {
"role_arn": firehose_role["arn"],
"bucket_arn": bucket["arn"],
"buffering_size": 10,
"buffering_interval": 400,
"compression_format": "GZIP",
},
"processing_configuration": {
"enabled": True,
"processors": [{
"type": "Lambda",
"parameters": [{
"parameter_name": "LambdaArn",
"parameter_value": f"{lambda_processor['arn']}:$LATEST",
}],
}],
},
})
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesis"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/opensearch"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
testCluster, err := opensearch.NewDomain(ctx, "test_cluster", &opensearch.DomainArgs{
DomainName: pulumi.String("firehose-os-test"),
})
if err != nil {
return err
}
_, err = kinesis.NewFirehoseDeliveryStream(ctx, "test_stream", &kinesis.FirehoseDeliveryStreamArgs{
Name: pulumi.String("kinesis-firehose-test-stream"),
Destination: pulumi.String("opensearch"),
OpensearchConfiguration: &kinesis.FirehoseDeliveryStreamOpensearchConfigurationArgs{
DomainArn: testCluster.Arn,
RoleArn: pulumi.Any(firehoseRole.Arn),
IndexName: pulumi.String("test"),
S3Configuration: &kinesis.FirehoseDeliveryStreamOpensearchConfigurationS3ConfigurationArgs{
RoleArn: pulumi.Any(firehoseRole.Arn),
BucketArn: pulumi.Any(bucket.Arn),
BufferingSize: pulumi.Int(10),
BufferingInterval: pulumi.Int(400),
CompressionFormat: pulumi.String("GZIP"),
},
ProcessingConfiguration: &kinesis.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationArgs{
Enabled: pulumi.Bool(true),
Processors: kinesis.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorArray{
&kinesis.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorArgs{
Type: pulumi.String("Lambda"),
Parameters: kinesis.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorParameterArray{
&kinesis.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorParameterArgs{
ParameterName: pulumi.String("LambdaArn"),
ParameterValue: pulumi.Sprintf("%v:$LATEST", lambdaProcessor.Arn),
},
},
},
},
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var testCluster = new Aws.OpenSearch.Domain("test_cluster", new()
{
DomainName = "firehose-os-test",
});
var testStream = new Aws.Kinesis.FirehoseDeliveryStream("test_stream", new()
{
Name = "kinesis-firehose-test-stream",
Destination = "opensearch",
OpensearchConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamOpensearchConfigurationArgs
{
DomainArn = testCluster.Arn,
RoleArn = firehoseRole.Arn,
IndexName = "test",
S3Configuration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamOpensearchConfigurationS3ConfigurationArgs
{
RoleArn = firehoseRole.Arn,
BucketArn = bucket.Arn,
BufferingSize = 10,
BufferingInterval = 400,
CompressionFormat = "GZIP",
},
ProcessingConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationArgs
{
Enabled = true,
Processors = new[]
{
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorArgs
{
Type = "Lambda",
Parameters = new[]
{
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorParameterArgs
{
ParameterName = "LambdaArn",
ParameterValue = lambdaProcessor.Arn.Apply(arn => $"{arn}:$LATEST"),
},
},
},
},
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.opensearch.Domain;
import com.pulumi.aws.opensearch.DomainArgs;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamOpensearchConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamOpensearchConfigurationS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorParameterArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var testCluster = new Domain("testCluster", DomainArgs.builder()
.domainName("firehose-os-test")
.build());
var testStream = new FirehoseDeliveryStream("testStream", FirehoseDeliveryStreamArgs.builder()
.name("kinesis-firehose-test-stream")
.destination("opensearch")
.opensearchConfiguration(FirehoseDeliveryStreamOpensearchConfigurationArgs.builder()
.domainArn(testCluster.arn())
.roleArn(firehoseRole.arn())
.indexName("test")
.s3Configuration(FirehoseDeliveryStreamOpensearchConfigurationS3ConfigurationArgs.builder()
.roleArn(firehoseRole.arn())
.bucketArn(bucket.arn())
.bufferingSize(10)
.bufferingInterval(400)
.compressionFormat("GZIP")
.build())
.processingConfiguration(FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationArgs.builder()
.enabled(true)
.processors(FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorArgs.builder()
.type("Lambda")
.parameters(FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorParameterArgs.builder()
.parameterName("LambdaArn")
.parameterValue(lambdaProcessor.arn().applyValue(arn -> String.format("%s:$LATEST", arn)))
.build())
.build())
.build())
.build())
.build());
}
}
resources:
testCluster:
type: aws:opensearch:Domain
name: test_cluster
properties:
domainName: firehose-os-test
testStream:
type: aws:kinesis:FirehoseDeliveryStream
name: test_stream
properties:
name: kinesis-firehose-test-stream
destination: opensearch
opensearchConfiguration:
domainArn: ${testCluster.arn}
roleArn: ${firehoseRole.arn}
indexName: test
s3Configuration:
roleArn: ${firehoseRole.arn}
bucketArn: ${bucket.arn}
bufferingSize: 10
bufferingInterval: 400
compressionFormat: GZIP
processingConfiguration:
enabled: 'true'
processors:
- type: Lambda
parameters:
- parameterName: LambdaArn
parameterValue: ${lambdaProcessor.arn}:$LATEST
Records flow to the OpenSearch domain specified by domainArn and are indexed into the indexName. The s3Configuration provides a backup destination for records that fail to index. Firehose automatically retries failed deliveries and falls back to S3 when retries are exhausted.
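Beyond the trust relationship shown in the first example, the role referenced by roleArn must be allowed to describe the domain and write to it. A minimal sketch (the action list follows AWS's Firehose-to-OpenSearch guidance; the policy name is an assumption):
const firehoseOpensearchAccess = new aws.iam.RolePolicy("firehose_opensearch_access", {
    role: firehoseRole.id,
    policy: pulumi.jsonStringify({
        Version: "2012-10-17",
        Statement: [{
            Effect: "Allow",
            Action: [
                "es:DescribeDomain",
                "es:DescribeDomains",
                "es:DescribeDomainConfig",
                "es:ESHttpGet",
                "es:ESHttpPost",
                "es:ESHttpPut",
            ],
            // The domain itself plus every index/path beneath it.
            Resource: [testCluster.arn, pulumi.interpolate`${testCluster.arn}/*`],
        }],
    }),
});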
Send events to Splunk HEC
Observability platforms like Splunk ingest logs and metrics through HTTP Event Collector (HEC) endpoints. Firehose delivers batches to HEC with configurable acknowledgment timeouts.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const testStream = new aws.kinesis.FirehoseDeliveryStream("test_stream", {
name: "kinesis-firehose-test-stream",
destination: "splunk",
splunkConfiguration: {
hecEndpoint: "https://http-inputs-mydomain.splunkcloud.com:443",
hecToken: "51D4DA16-C61B-4F5F-8EC7-ED4301342A4A",
hecAcknowledgmentTimeout: 600,
hecEndpointType: "Event",
s3BackupMode: "FailedEventsOnly",
s3Configuration: {
roleArn: firehose.arn,
bucketArn: bucket.arn,
bufferingSize: 10,
bufferingInterval: 400,
compressionFormat: "GZIP",
},
},
});
import pulumi
import pulumi_aws as aws
test_stream = aws.kinesis.FirehoseDeliveryStream("test_stream",
name="kinesis-firehose-test-stream",
destination="splunk",
splunk_configuration={
"hec_endpoint": "https://http-inputs-mydomain.splunkcloud.com:443",
"hec_token": "51D4DA16-C61B-4F5F-8EC7-ED4301342A4A",
"hec_acknowledgment_timeout": 600,
"hec_endpoint_type": "Event",
"s3_backup_mode": "FailedEventsOnly",
"s3_configuration": {
"role_arn": firehose["arn"],
"bucket_arn": bucket["arn"],
"buffering_size": 10,
"buffering_interval": 400,
"compression_format": "GZIP",
},
})
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesis"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := kinesis.NewFirehoseDeliveryStream(ctx, "test_stream", &kinesis.FirehoseDeliveryStreamArgs{
Name: pulumi.String("kinesis-firehose-test-stream"),
Destination: pulumi.String("splunk"),
SplunkConfiguration: &kinesis.FirehoseDeliveryStreamSplunkConfigurationArgs{
HecEndpoint: pulumi.String("https://http-inputs-mydomain.splunkcloud.com:443"),
HecToken: pulumi.String("51D4DA16-C61B-4F5F-8EC7-ED4301342A4A"),
HecAcknowledgmentTimeout: pulumi.Int(600),
HecEndpointType: pulumi.String("Event"),
S3BackupMode: pulumi.String("FailedEventsOnly"),
S3Configuration: &kinesis.FirehoseDeliveryStreamSplunkConfigurationS3ConfigurationArgs{
RoleArn: pulumi.Any(firehose.Arn),
BucketArn: pulumi.Any(bucket.Arn),
BufferingSize: pulumi.Int(10),
BufferingInterval: pulumi.Int(400),
CompressionFormat: pulumi.String("GZIP"),
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var testStream = new Aws.Kinesis.FirehoseDeliveryStream("test_stream", new()
{
Name = "kinesis-firehose-test-stream",
Destination = "splunk",
SplunkConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamSplunkConfigurationArgs
{
HecEndpoint = "https://http-inputs-mydomain.splunkcloud.com:443",
HecToken = "51D4DA16-C61B-4F5F-8EC7-ED4301342A4A",
HecAcknowledgmentTimeout = 600,
HecEndpointType = "Event",
S3BackupMode = "FailedEventsOnly",
S3Configuration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamSplunkConfigurationS3ConfigurationArgs
{
RoleArn = firehose.Arn,
BucketArn = bucket.Arn,
BufferingSize = 10,
BufferingInterval = 400,
CompressionFormat = "GZIP",
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamSplunkConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamSplunkConfigurationS3ConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var testStream = new FirehoseDeliveryStream("testStream", FirehoseDeliveryStreamArgs.builder()
.name("kinesis-firehose-test-stream")
.destination("splunk")
.splunkConfiguration(FirehoseDeliveryStreamSplunkConfigurationArgs.builder()
.hecEndpoint("https://http-inputs-mydomain.splunkcloud.com:443")
.hecToken("51D4DA16-C61B-4F5F-8EC7-ED4301342A4A")
.hecAcknowledgmentTimeout(600)
.hecEndpointType("Event")
.s3BackupMode("FailedEventsOnly")
.s3Configuration(FirehoseDeliveryStreamSplunkConfigurationS3ConfigurationArgs.builder()
.roleArn(firehose.arn())
.bucketArn(bucket.arn())
.bufferingSize(10)
.bufferingInterval(400)
.compressionFormat("GZIP")
.build())
.build())
.build());
}
}
resources:
testStream:
type: aws:kinesis:FirehoseDeliveryStream
name: test_stream
properties:
name: kinesis-firehose-test-stream
destination: splunk
splunkConfiguration:
hecEndpoint: https://http-inputs-mydomain.splunkcloud.com:443
hecToken: 51D4DA16-C61B-4F5F-8EC7-ED4301342A4A
hecAcknowledgmentTimeout: 600
hecEndpointType: Event
s3BackupMode: FailedEventsOnly
s3Configuration:
roleArn: ${firehose.arn}
bucketArn: ${bucket.arn}
bufferingSize: 10
bufferingInterval: 400
compressionFormat: GZIP
The hecEndpoint points to your Splunk Cloud or Enterprise HEC URL, and the hecToken authenticates the delivery. The hecAcknowledgmentTimeout controls how long Firehose waits for Splunk to acknowledge receipt before considering the delivery failed. The s3BackupMode determines whether all events or only failed events are backed up to S3.
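When HEC acknowledgments time out, the failure is visible only from the Firehose side, so it helps to turn on delivery logging. Like the other destination blocks, splunkConfiguration accepts cloudwatchLoggingOptions; a sketch with hypothetical names:
const firehoseLogGroup = new aws.cloudwatch.LogGroup("firehose_splunk_logs", {
    retentionInDays: 14,
});
const firehoseLogStream = new aws.cloudwatch.LogStream("splunk_delivery", {
    logGroupName: firehoseLogGroup.name,
});

// Added inside splunkConfiguration on the stream above:
//   cloudwatchLoggingOptions: {
//       enabled: true,
//       logGroupName: firehoseLogGroup.name,
//       logStreamName: firehoseLogStream.name,
//   },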
Deliver to HTTP endpoints with custom headers
Third-party observability and analytics platforms often provide HTTP endpoints for data ingestion. Firehose can deliver to these endpoints with custom headers and request encoding.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const testStream = new aws.kinesis.FirehoseDeliveryStream("test_stream", {
name: "kinesis-firehose-test-stream",
destination: "http_endpoint",
httpEndpointConfiguration: {
url: "https://aws-api.newrelic.com/firehose/v1",
name: "New Relic",
accessKey: "my-key",
bufferingSize: 15,
bufferingInterval: 600,
roleArn: firehose.arn,
s3BackupMode: "FailedDataOnly",
s3Configuration: {
roleArn: firehose.arn,
bucketArn: bucket.arn,
bufferingSize: 10,
bufferingInterval: 400,
compressionFormat: "GZIP",
},
requestConfiguration: {
contentEncoding: "GZIP",
commonAttributes: [
{
name: "testname",
value: "testvalue",
},
{
name: "testname2",
value: "testvalue2",
},
],
},
},
});
import pulumi
import pulumi_aws as aws
test_stream = aws.kinesis.FirehoseDeliveryStream("test_stream",
name="kinesis-firehose-test-stream",
destination="http_endpoint",
http_endpoint_configuration={
"url": "https://aws-api.newrelic.com/firehose/v1",
"name": "New Relic",
"access_key": "my-key",
"buffering_size": 15,
"buffering_interval": 600,
"role_arn": firehose["arn"],
"s3_backup_mode": "FailedDataOnly",
"s3_configuration": {
"role_arn": firehose["arn"],
"bucket_arn": bucket["arn"],
"buffering_size": 10,
"buffering_interval": 400,
"compression_format": "GZIP",
},
"request_configuration": {
"content_encoding": "GZIP",
"common_attributes": [
{
"name": "testname",
"value": "testvalue",
},
{
"name": "testname2",
"value": "testvalue2",
},
],
},
})
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesis"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := kinesis.NewFirehoseDeliveryStream(ctx, "test_stream", &kinesis.FirehoseDeliveryStreamArgs{
Name: pulumi.String("kinesis-firehose-test-stream"),
Destination: pulumi.String("http_endpoint"),
HttpEndpointConfiguration: &kinesis.FirehoseDeliveryStreamHttpEndpointConfigurationArgs{
Url: pulumi.String("https://aws-api.newrelic.com/firehose/v1"),
Name: pulumi.String("New Relic"),
AccessKey: pulumi.String("my-key"),
BufferingSize: pulumi.Int(15),
BufferingInterval: pulumi.Int(600),
RoleArn: pulumi.Any(firehose.Arn),
S3BackupMode: pulumi.String("FailedDataOnly"),
S3Configuration: &kinesis.FirehoseDeliveryStreamHttpEndpointConfigurationS3ConfigurationArgs{
RoleArn: pulumi.Any(firehose.Arn),
BucketArn: pulumi.Any(bucket.Arn),
BufferingSize: pulumi.Int(10),
BufferingInterval: pulumi.Int(400),
CompressionFormat: pulumi.String("GZIP"),
},
RequestConfiguration: &kinesis.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationArgs{
ContentEncoding: pulumi.String("GZIP"),
CommonAttributes: kinesis.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArray{
&kinesis.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs{
Name: pulumi.String("testname"),
Value: pulumi.String("testvalue"),
},
&kinesis.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs{
Name: pulumi.String("testname2"),
Value: pulumi.String("testvalue2"),
},
},
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var testStream = new Aws.Kinesis.FirehoseDeliveryStream("test_stream", new()
{
Name = "kinesis-firehose-test-stream",
Destination = "http_endpoint",
HttpEndpointConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamHttpEndpointConfigurationArgs
{
Url = "https://aws-api.newrelic.com/firehose/v1",
Name = "New Relic",
AccessKey = "my-key",
BufferingSize = 15,
BufferingInterval = 600,
RoleArn = firehose.Arn,
S3BackupMode = "FailedDataOnly",
S3Configuration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamHttpEndpointConfigurationS3ConfigurationArgs
{
RoleArn = firehose.Arn,
BucketArn = bucket.Arn,
BufferingSize = 10,
BufferingInterval = 400,
CompressionFormat = "GZIP",
},
RequestConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationArgs
{
ContentEncoding = "GZIP",
CommonAttributes = new[]
{
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs
{
Name = "testname",
Value = "testvalue",
},
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs
{
Name = "testname2",
Value = "testvalue2",
},
},
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamHttpEndpointConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamHttpEndpointConfigurationS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var testStream = new FirehoseDeliveryStream("testStream", FirehoseDeliveryStreamArgs.builder()
.name("kinesis-firehose-test-stream")
.destination("http_endpoint")
.httpEndpointConfiguration(FirehoseDeliveryStreamHttpEndpointConfigurationArgs.builder()
.url("https://aws-api.newrelic.com/firehose/v1")
.name("New Relic")
.accessKey("my-key")
.bufferingSize(15)
.bufferingInterval(600)
.roleArn(firehose.arn())
.s3BackupMode("FailedDataOnly")
.s3Configuration(FirehoseDeliveryStreamHttpEndpointConfigurationS3ConfigurationArgs.builder()
.roleArn(firehose.arn())
.bucketArn(bucket.arn())
.bufferingSize(10)
.bufferingInterval(400)
.compressionFormat("GZIP")
.build())
.requestConfiguration(FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationArgs.builder()
.contentEncoding("GZIP")
.commonAttributes(
FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs.builder()
.name("testname")
.value("testvalue")
.build(),
FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs.builder()
.name("testname2")
.value("testvalue2")
.build())
.build())
.build())
.build());
}
}
resources:
testStream:
type: aws:kinesis:FirehoseDeliveryStream
name: test_stream
properties:
name: kinesis-firehose-test-stream
destination: http_endpoint
httpEndpointConfiguration:
url: https://aws-api.newrelic.com/firehose/v1
name: New Relic
accessKey: my-key
bufferingSize: 15
bufferingInterval: 600
roleArn: ${firehose.arn}
s3BackupMode: FailedDataOnly
s3Configuration:
roleArn: ${firehose.arn}
bucketArn: ${bucket.arn}
bufferingSize: 10
bufferingInterval: 400
compressionFormat: GZIP
requestConfiguration:
contentEncoding: GZIP
commonAttributes:
- name: testname
value: testvalue
- name: testname2
value: testvalue2
The url specifies the HTTP endpoint, and the accessKey provides authentication. The requestConfiguration block controls how data is encoded (contentEncoding: "GZIP") and attaches custom metadata via commonAttributes; the attributes are delivered with each request as a JSON object in the X-Amz-Firehose-Common-Attributes header. The s3BackupMode ensures failed deliveries are preserved in S3.
Beyond these examples
These snippets focus on specific delivery stream features: S3 delivery with Lambda processing and dynamic partitioning, data warehouse loading, search and analytics indexing, and HTTP endpoint delivery with custom headers. They’re intentionally minimal rather than full data pipeline implementations.
The examples may reference pre-existing infrastructure such as S3 buckets for data and backup, IAM roles for Firehose and destination services, Lambda functions for transformation, and destination services like Redshift clusters, OpenSearch domains, or Splunk HEC endpoints. They focus on configuring the delivery stream rather than provisioning everything around it.
To keep things focused, common delivery stream patterns are omitted, including:
- VPC configuration for private OpenSearch/Elasticsearch access
- Server-side encryption (serverSideEncryption)
- CloudWatch logging and monitoring (cloudwatchLoggingOptions)
- Buffering and retry tuning (bufferingSize, bufferingInterval, retryOptions)
- Source configuration (Kinesis streams, MSK clusters)
- Iceberg and Snowflake destinations
These omissions are intentional: the goal is to illustrate how each delivery stream feature is wired, not provide drop-in data pipeline modules. See the Kinesis Firehose Delivery Stream resource reference for all available configuration options.
Frequently Asked Questions
Configuration & Immutability
- The destination, name, kinesisSourceConfiguration, and mskSourceConfiguration properties are immutable. Changing any of these requires replacing the stream.
- The s3 destination is deprecated and doesn’t support import operations. Use extended_s3 for enhanced features and full import support.
- Supported destinations are extended_s3, redshift, elasticsearch, opensearch, opensearchserverless, splunk, http_endpoint, snowflake, and iceberg.
Dynamic Partitioning
- Enable dynamicPartitioningConfiguration in your extendedS3Configuration and use built-in processors like MetadataExtraction with JQ queries to extract partition keys from your data. The JQ expression is supplied as the MetadataExtractionQuery parameter value.
VPC & Networking
- Delivering into a VPC requires the Firehose role to hold ec2:DescribeVpcs, ec2:DescribeVpcAttribute, ec2:DescribeSubnets, ec2:DescribeSecurityGroups, ec2:DescribeNetworkInterfaces, ec2:CreateNetworkInterface, ec2:CreateNetworkInterfacePermission, and ec2:DeleteNetworkInterface.
WAF Integration
- A stream that receives AWS WAF logs must have a name that begins with aws-waf-logs-.
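As a sketch of those VPC permissions attached to a Firehose role (the role reference and policy name are assumptions):
const firehoseVpcAccess = new aws.iam.RolePolicy("firehose_vpc_access", {
    role: firehoseRole.id,
    policy: JSON.stringify({
        Version: "2012-10-17",
        Statement: [{
            Effect: "Allow",
            Action: [
                "ec2:DescribeVpcs",
                "ec2:DescribeVpcAttribute",
                "ec2:DescribeSubnets",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeNetworkInterfaces",
                "ec2:CreateNetworkInterface",
                "ec2:CreateNetworkInterfacePermission",
                "ec2:DeleteNetworkInterface",
            ],
            Resource: "*",
        }],
    }),
});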