The aws:kinesis/firehoseDeliveryStream:FirehoseDeliveryStream resource, part of the Pulumi AWS provider, defines a Kinesis Firehose delivery stream that ingests real-time data and delivers it to destinations like S3, data warehouses, search engines, or third-party services. This guide focuses on three capabilities: S3 delivery with transformation and partitioning, data warehouse and search engine loading, and third-party observability platform integration.
A delivery stream doesn’t operate in isolation. It requires IAM roles for execution, destination resources like S3 buckets or Redshift clusters, and optionally Lambda functions for transformation. The examples are intentionally small and won’t run standalone. Combine them with your own IAM policies, destination infrastructure, and data sources.
Deliver to S3 with Lambda transformation
Many pipelines begin by delivering records into S3, either as the primary storage layer or as a central point for downstream analytics tools. Firehose can pass each batch through a Lambda function before writing to S3, allowing you to normalize formats or enrich events.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const bucket = new aws.s3.Bucket("bucket", {bucket: "tf-test-bucket"});
const firehoseAssumeRole = aws.iam.getPolicyDocument({
statements: [{
effect: "Allow",
principals: [{
type: "Service",
identifiers: ["firehose.amazonaws.com"],
}],
actions: ["sts:AssumeRole"],
}],
});
const firehoseRole = new aws.iam.Role("firehose_role", {
name: "firehose_test_role",
assumeRolePolicy: firehoseAssumeRole.then(firehoseAssumeRole => firehoseAssumeRole.json),
});
const lambdaAssumeRole = aws.iam.getPolicyDocument({
statements: [{
effect: "Allow",
principals: [{
type: "Service",
identifiers: ["lambda.amazonaws.com"],
}],
actions: ["sts:AssumeRole"],
}],
});
const lambdaIam = new aws.iam.Role("lambda_iam", {
name: "lambda_iam",
assumeRolePolicy: lambdaAssumeRole.then(lambdaAssumeRole => lambdaAssumeRole.json),
});
const lambdaProcessor = new aws.lambda.Function("lambda_processor", {
code: new pulumi.asset.FileArchive("lambda.zip"),
name: "firehose_lambda_processor",
role: lambdaIam.arn,
handler: "exports.handler",
runtime: aws.lambda.Runtime.NodeJS20dX,
});
const extendedS3Stream = new aws.kinesis.FirehoseDeliveryStream("extended_s3_stream", {
name: "kinesis-firehose-extended-s3-test-stream",
destination: "extended_s3",
extendedS3Configuration: {
roleArn: firehoseRole.arn,
bucketArn: bucket.arn,
processingConfiguration: {
enabled: true,
processors: [{
type: "Lambda",
parameters: [{
parameterName: "LambdaArn",
parameterValue: pulumi.interpolate`${lambdaProcessor.arn}:$LATEST`,
}],
}],
},
},
});
const bucketAcl = new aws.s3.BucketAcl("bucket_acl", {
bucket: bucket.id,
acl: "private",
});
import pulumi
import pulumi_aws as aws
bucket = aws.s3.Bucket("bucket", bucket="tf-test-bucket")
firehose_assume_role = aws.iam.get_policy_document(statements=[{
"effect": "Allow",
"principals": [{
"type": "Service",
"identifiers": ["firehose.amazonaws.com"],
}],
"actions": ["sts:AssumeRole"],
}])
firehose_role = aws.iam.Role("firehose_role",
name="firehose_test_role",
assume_role_policy=firehose_assume_role.json)
lambda_assume_role = aws.iam.get_policy_document(statements=[{
"effect": "Allow",
"principals": [{
"type": "Service",
"identifiers": ["lambda.amazonaws.com"],
}],
"actions": ["sts:AssumeRole"],
}])
lambda_iam = aws.iam.Role("lambda_iam",
name="lambda_iam",
assume_role_policy=lambda_assume_role.json)
lambda_processor = aws.lambda_.Function("lambda_processor",
code=pulumi.FileArchive("lambda.zip"),
name="firehose_lambda_processor",
role=lambda_iam.arn,
handler="exports.handler",
runtime=aws.lambda_.Runtime.NODE_JS20D_X)
extended_s3_stream = aws.kinesis.FirehoseDeliveryStream("extended_s3_stream",
name="kinesis-firehose-extended-s3-test-stream",
destination="extended_s3",
extended_s3_configuration={
"role_arn": firehose_role.arn,
"bucket_arn": bucket.arn,
"processing_configuration": {
"enabled": True,
"processors": [{
"type": "Lambda",
"parameters": [{
"parameter_name": "LambdaArn",
"parameter_value": lambda_processor.arn.apply(lambda arn: f"{arn}:$LATEST"),
}],
}],
},
})
bucket_acl = aws.s3.BucketAcl("bucket_acl",
bucket=bucket.id,
acl="private")
package main
import (
"fmt"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/iam"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesis"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/s3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
bucket, err := s3.NewBucket(ctx, "bucket", &s3.BucketArgs{
Bucket: pulumi.String("tf-test-bucket"),
})
if err != nil {
return err
}
firehoseAssumeRole, err := iam.GetPolicyDocument(ctx, &iam.GetPolicyDocumentArgs{
Statements: []iam.GetPolicyDocumentStatement{
{
Effect: pulumi.StringRef("Allow"),
Principals: []iam.GetPolicyDocumentStatementPrincipal{
{
Type: "Service",
Identifiers: []string{
"firehose.amazonaws.com",
},
},
},
Actions: []string{
"sts:AssumeRole",
},
},
},
}, nil)
if err != nil {
return err
}
firehoseRole, err := iam.NewRole(ctx, "firehose_role", &iam.RoleArgs{
Name: pulumi.String("firehose_test_role"),
AssumeRolePolicy: pulumi.String(firehoseAssumeRole.Json),
})
if err != nil {
return err
}
lambdaAssumeRole, err := iam.GetPolicyDocument(ctx, &iam.GetPolicyDocumentArgs{
Statements: []iam.GetPolicyDocumentStatement{
{
Effect: pulumi.StringRef("Allow"),
Principals: []iam.GetPolicyDocumentStatementPrincipal{
{
Type: "Service",
Identifiers: []string{
"lambda.amazonaws.com",
},
},
},
Actions: []string{
"sts:AssumeRole",
},
},
},
}, nil)
if err != nil {
return err
}
lambdaIam, err := iam.NewRole(ctx, "lambda_iam", &iam.RoleArgs{
Name: pulumi.String("lambda_iam"),
AssumeRolePolicy: pulumi.String(lambdaAssumeRole.Json),
})
if err != nil {
return err
}
lambdaProcessor, err := lambda.NewFunction(ctx, "lambda_processor", &lambda.FunctionArgs{
Code: pulumi.NewFileArchive("lambda.zip"),
Name: pulumi.String("firehose_lambda_processor"),
Role: lambdaIam.Arn,
Handler: pulumi.String("exports.handler"),
Runtime: pulumi.String(lambda.RuntimeNodeJS20dX),
})
if err != nil {
return err
}
_, err = kinesis.NewFirehoseDeliveryStream(ctx, "extended_s3_stream", &kinesis.FirehoseDeliveryStreamArgs{
Name: pulumi.String("kinesis-firehose-extended-s3-test-stream"),
Destination: pulumi.String("extended_s3"),
ExtendedS3Configuration: &kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationArgs{
RoleArn: firehoseRole.Arn,
BucketArn: bucket.Arn,
ProcessingConfiguration: &kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs{
Enabled: pulumi.Bool(true),
Processors: kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArray{
&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs{
Type: pulumi.String("Lambda"),
Parameters: kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArray{
&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs{
ParameterName: pulumi.String("LambdaArn"),
ParameterValue: lambdaProcessor.Arn.ApplyT(func(arn string) (string, error) {
return fmt.Sprintf("%v:$LATEST", arn), nil
}).(pulumi.StringOutput),
},
},
},
},
},
},
})
if err != nil {
return err
}
_, err = s3.NewBucketAcl(ctx, "bucket_acl", &s3.BucketAclArgs{
Bucket: bucket.ID(),
Acl: pulumi.String("private"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var bucket = new Aws.S3.Bucket("bucket", new()
{
BucketName = "tf-test-bucket",
});
var firehoseAssumeRole = Aws.Iam.GetPolicyDocument.Invoke(new()
{
Statements = new[]
{
new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
{
Effect = "Allow",
Principals = new[]
{
new Aws.Iam.Inputs.GetPolicyDocumentStatementPrincipalInputArgs
{
Type = "Service",
Identifiers = new[]
{
"firehose.amazonaws.com",
},
},
},
Actions = new[]
{
"sts:AssumeRole",
},
},
},
});
var firehoseRole = new Aws.Iam.Role("firehose_role", new()
{
Name = "firehose_test_role",
AssumeRolePolicy = firehoseAssumeRole.Apply(getPolicyDocumentResult => getPolicyDocumentResult.Json),
});
var lambdaAssumeRole = Aws.Iam.GetPolicyDocument.Invoke(new()
{
Statements = new[]
{
new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
{
Effect = "Allow",
Principals = new[]
{
new Aws.Iam.Inputs.GetPolicyDocumentStatementPrincipalInputArgs
{
Type = "Service",
Identifiers = new[]
{
"lambda.amazonaws.com",
},
},
},
Actions = new[]
{
"sts:AssumeRole",
},
},
},
});
var lambdaIam = new Aws.Iam.Role("lambda_iam", new()
{
Name = "lambda_iam",
AssumeRolePolicy = lambdaAssumeRole.Apply(getPolicyDocumentResult => getPolicyDocumentResult.Json),
});
var lambdaProcessor = new Aws.Lambda.Function("lambda_processor", new()
{
Code = new FileArchive("lambda.zip"),
Name = "firehose_lambda_processor",
Role = lambdaIam.Arn,
Handler = "exports.handler",
Runtime = Aws.Lambda.Runtime.NodeJS20dX,
});
var extendedS3Stream = new Aws.Kinesis.FirehoseDeliveryStream("extended_s3_stream", new()
{
Name = "kinesis-firehose-extended-s3-test-stream",
Destination = "extended_s3",
ExtendedS3Configuration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationArgs
{
RoleArn = firehoseRole.Arn,
BucketArn = bucket.Arn,
ProcessingConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs
{
Enabled = true,
Processors = new[]
{
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs
{
Type = "Lambda",
Parameters = new[]
{
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs
{
ParameterName = "LambdaArn",
ParameterValue = lambdaProcessor.Arn.Apply(arn => $"{arn}:$LATEST"),
},
},
},
},
},
},
});
var bucketAcl = new Aws.S3.BucketAcl("bucket_acl", new()
{
Bucket = bucket.Id,
Acl = "private",
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.s3.Bucket;
import com.pulumi.aws.s3.BucketArgs;
import com.pulumi.aws.iam.IamFunctions;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentArgs;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentStatementArgs;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentStatementPrincipalArgs;
import com.pulumi.aws.iam.Role;
import com.pulumi.aws.iam.RoleArgs;
import com.pulumi.aws.lambda.Function;
import com.pulumi.aws.lambda.FunctionArgs;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs;
import com.pulumi.aws.s3.BucketAcl;
import com.pulumi.aws.s3.BucketAclArgs;
import com.pulumi.asset.FileArchive;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var bucket = new Bucket("bucket", BucketArgs.builder()
.bucket("tf-test-bucket")
.build());
final var firehoseAssumeRole = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
.statements(GetPolicyDocumentStatementArgs.builder()
.effect("Allow")
.principals(GetPolicyDocumentStatementPrincipalArgs.builder()
.type("Service")
.identifiers("firehose.amazonaws.com")
.build())
.actions("sts:AssumeRole")
.build())
.build());
var firehoseRole = new Role("firehoseRole", RoleArgs.builder()
.name("firehose_test_role")
.assumeRolePolicy(firehoseAssumeRole.json())
.build());
final var lambdaAssumeRole = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
.statements(GetPolicyDocumentStatementArgs.builder()
.effect("Allow")
.principals(GetPolicyDocumentStatementPrincipalArgs.builder()
.type("Service")
.identifiers("lambda.amazonaws.com")
.build())
.actions("sts:AssumeRole")
.build())
.build());
var lambdaIam = new Role("lambdaIam", RoleArgs.builder()
.name("lambda_iam")
.assumeRolePolicy(lambdaAssumeRole.json())
.build());
var lambdaProcessor = new Function("lambdaProcessor", FunctionArgs.builder()
.code(new FileArchive("lambda.zip"))
.name("firehose_lambda_processor")
.role(lambdaIam.arn())
.handler("exports.handler")
.runtime("nodejs20.x")
.build());
var extendedS3Stream = new FirehoseDeliveryStream("extendedS3Stream", FirehoseDeliveryStreamArgs.builder()
.name("kinesis-firehose-extended-s3-test-stream")
.destination("extended_s3")
.extendedS3Configuration(FirehoseDeliveryStreamExtendedS3ConfigurationArgs.builder()
.roleArn(firehoseRole.arn())
.bucketArn(bucket.arn())
.processingConfiguration(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs.builder()
.enabled(true)
.processors(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs.builder()
.type("Lambda")
.parameters(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs.builder()
.parameterName("LambdaArn")
.parameterValue(lambdaProcessor.arn().applyValue(_arn -> String.format("%s:$LATEST", _arn)))
.build())
.build())
.build())
.build())
.build());
var bucketAcl = new BucketAcl("bucketAcl", BucketAclArgs.builder()
.bucket(bucket.id())
.acl("private")
.build());
}
}
resources:
extendedS3Stream:
type: aws:kinesis:FirehoseDeliveryStream
name: extended_s3_stream
properties:
name: kinesis-firehose-extended-s3-test-stream
destination: extended_s3
extendedS3Configuration:
roleArn: ${firehoseRole.arn}
bucketArn: ${bucket.arn}
processingConfiguration:
enabled: true
processors:
- type: Lambda
parameters:
- parameterName: LambdaArn
parameterValue: ${lambdaProcessor.arn}:$LATEST
bucket:
type: aws:s3:Bucket
properties:
bucket: tf-test-bucket
bucketAcl:
type: aws:s3:BucketAcl
name: bucket_acl
properties:
bucket: ${bucket.id}
acl: private
firehoseRole:
type: aws:iam:Role
name: firehose_role
properties:
name: firehose_test_role
assumeRolePolicy: ${firehoseAssumeRole.json}
lambdaIam:
type: aws:iam:Role
name: lambda_iam
properties:
name: lambda_iam
assumeRolePolicy: ${lambdaAssumeRole.json}
lambdaProcessor:
type: aws:lambda:Function
name: lambda_processor
properties:
code:
fn::FileArchive: lambda.zip
name: firehose_lambda_processor
role: ${lambdaIam.arn}
handler: exports.handler
runtime: nodejs20.x
variables:
firehoseAssumeRole:
fn::invoke:
function: aws:iam:getPolicyDocument
arguments:
statements:
- effect: Allow
principals:
- type: Service
identifiers:
- firehose.amazonaws.com
actions:
- sts:AssumeRole
lambdaAssumeRole:
fn::invoke:
function: aws:iam:getPolicyDocument
arguments:
statements:
- effect: Allow
principals:
- type: Service
identifiers:
- lambda.amazonaws.com
actions:
- sts:AssumeRole
In this configuration, Firehose sends incoming records to your Lambda function, waits for the transformed payload, and then writes the results to the S3 bucket. The processingConfiguration enables Lambda-based transformation, and the LambdaArn parameter points to the function version to invoke. The extendedS3Configuration block defines the destination bucket and IAM role that Firehose assumes to write objects.
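The function referenced by LambdaArn must implement Firehose's transformation contract: it receives a batch of base64-encoded records and must return every record with its original recordId, a result of "Ok", "Dropped", or "ProcessingFailed", and the transformed payload re-encoded as base64. A minimal handler sketch (TypeScript; the uppercasing step is a stand-in for your real transformation):
export const handler = async (event: { records: { recordId: string; data: string }[] }) => ({
    records: event.records.map(record => {
        // Decode the incoming payload, transform it, and re-encode it as base64.
        const payload = Buffer.from(record.data, "base64").toString("utf8");
        const transformed = payload.toUpperCase(); // placeholder transformation
        return {
            recordId: record.recordId, // must echo the incoming recordId
            result: "Ok",              // "Dropped" or "ProcessingFailed" are also valid
            data: Buffer.from(transformed, "utf8").toString("base64"),
        };
    }),
});
Note that the Firehose role must also be allowed to invoke the processor (lambda:InvokeFunction and lambda:GetFunctionConfiguration on the function's ARN); the examples assume such a policy is attached elsewhere.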
Partition S3 data by extracted fields
When storing large volumes of events, organizing data by time or business dimensions improves query performance and reduces costs. Dynamic partitioning extracts fields from your data and uses them to structure the S3 prefix.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const extendedS3Stream = new aws.kinesis.FirehoseDeliveryStream("extended_s3_stream", {
name: "kinesis-firehose-extended-s3-test-stream",
destination: "extended_s3",
extendedS3Configuration: {
roleArn: firehoseRole.arn,
bucketArn: bucket.arn,
bufferingSize: 64,
dynamicPartitioningConfiguration: {
enabled: true,
},
prefix: "data/customer_id=!{partitionKeyFromQuery:customer_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/",
errorOutputPrefix: "errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/",
processingConfiguration: {
enabled: true,
processors: [
{
type: "RecordDeAggregation",
parameters: [{
parameterName: "SubRecordType",
parameterValue: "JSON",
}],
},
{
type: "AppendDelimiterToRecord",
},
{
type: "MetadataExtraction",
parameters: [
{
parameterName: "JsonParsingEngine",
parameterValue: "JQ-1.6",
},
{
parameterName: "MetadataExtractionQuery",
parameterValue: "{customer_id:.customer_id}",
},
],
},
],
},
},
});
import pulumi
import pulumi_aws as aws
extended_s3_stream = aws.kinesis.FirehoseDeliveryStream("extended_s3_stream",
name="kinesis-firehose-extended-s3-test-stream",
destination="extended_s3",
extended_s3_configuration={
"role_arn": firehose_role["arn"],
"bucket_arn": bucket["arn"],
"buffering_size": 64,
"dynamic_partitioning_configuration": {
"enabled": True,
},
"prefix": "data/customer_id=!{partitionKeyFromQuery:customer_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/",
"error_output_prefix": "errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/",
"processing_configuration": {
"enabled": True,
"processors": [
{
"type": "RecordDeAggregation",
"parameters": [{
"parameter_name": "SubRecordType",
"parameter_value": "JSON",
}],
},
{
"type": "AppendDelimiterToRecord",
},
{
"type": "MetadataExtraction",
"parameters": [
{
"parameter_name": "JsonParsingEngine",
"parameter_value": "JQ-1.6",
},
{
"parameter_name": "MetadataExtractionQuery",
"parameter_value": "{customer_id:.customer_id}",
},
],
},
],
},
})
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesis"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := kinesis.NewFirehoseDeliveryStream(ctx, "extended_s3_stream", &kinesis.FirehoseDeliveryStreamArgs{
Name: pulumi.String("kinesis-firehose-extended-s3-test-stream"),
Destination: pulumi.String("extended_s3"),
ExtendedS3Configuration: &kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationArgs{
RoleArn: pulumi.Any(firehoseRole.Arn),
BucketArn: pulumi.Any(bucket.Arn),
BufferingSize: pulumi.Int(64),
DynamicPartitioningConfiguration: &kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationDynamicPartitioningConfigurationArgs{
Enabled: pulumi.Bool(true),
},
Prefix: pulumi.String("data/customer_id=!{partitionKeyFromQuery:customer_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"),
ErrorOutputPrefix: pulumi.String("errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/"),
ProcessingConfiguration: &kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs{
Enabled: pulumi.Bool(true),
Processors: kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArray{
&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs{
Type: pulumi.String("RecordDeAggregation"),
Parameters: kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArray{
&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs{
ParameterName: pulumi.String("SubRecordType"),
ParameterValue: pulumi.String("JSON"),
},
},
},
&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs{
Type: pulumi.String("AppendDelimiterToRecord"),
},
&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs{
Type: pulumi.String("MetadataExtraction"),
Parameters: kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArray{
&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs{
ParameterName: pulumi.String("JsonParsingEngine"),
ParameterValue: pulumi.String("JQ-1.6"),
},
&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs{
ParameterName: pulumi.String("MetadataExtractionQuery"),
ParameterValue: pulumi.String("{customer_id:.customer_id}"),
},
},
},
},
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var extendedS3Stream = new Aws.Kinesis.FirehoseDeliveryStream("extended_s3_stream", new()
{
Name = "kinesis-firehose-extended-s3-test-stream",
Destination = "extended_s3",
ExtendedS3Configuration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationArgs
{
RoleArn = firehoseRole.Arn,
BucketArn = bucket.Arn,
BufferingSize = 64,
DynamicPartitioningConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationDynamicPartitioningConfigurationArgs
{
Enabled = true,
},
Prefix = "data/customer_id=!{partitionKeyFromQuery:customer_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/",
ErrorOutputPrefix = "errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/",
ProcessingConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs
{
Enabled = true,
Processors = new[]
{
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs
{
Type = "RecordDeAggregation",
Parameters = new[]
{
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs
{
ParameterName = "SubRecordType",
ParameterValue = "JSON",
},
},
},
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs
{
Type = "AppendDelimiterToRecord",
},
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs
{
Type = "MetadataExtraction",
Parameters = new[]
{
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs
{
ParameterName = "JsonParsingEngine",
ParameterValue = "JQ-1.6",
},
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs
{
ParameterName = "MetadataExtractionQuery",
ParameterValue = "{customer_id:.customer_id}",
},
},
},
},
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationDynamicPartitioningConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var extendedS3Stream = new FirehoseDeliveryStream("extendedS3Stream", FirehoseDeliveryStreamArgs.builder()
.name("kinesis-firehose-extended-s3-test-stream")
.destination("extended_s3")
.extendedS3Configuration(FirehoseDeliveryStreamExtendedS3ConfigurationArgs.builder()
.roleArn(firehoseRole.arn())
.bucketArn(bucket.arn())
.bufferingSize(64)
.dynamicPartitioningConfiguration(FirehoseDeliveryStreamExtendedS3ConfigurationDynamicPartitioningConfigurationArgs.builder()
.enabled(true)
.build())
.prefix("data/customer_id=!{partitionKeyFromQuery:customer_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/")
.errorOutputPrefix("errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/")
.processingConfiguration(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs.builder()
.enabled(true)
.processors(
FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs.builder()
.type("RecordDeAggregation")
.parameters(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs.builder()
.parameterName("SubRecordType")
.parameterValue("JSON")
.build())
.build(),
FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs.builder()
.type("AppendDelimiterToRecord")
.build(),
FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs.builder()
.type("MetadataExtraction")
.parameters(
FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs.builder()
.parameterName("JsonParsingEngine")
.parameterValue("JQ-1.6")
.build(),
FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs.builder()
.parameterName("MetadataExtractionQuery")
.parameterValue("{customer_id:.customer_id}")
.build())
.build())
.build())
.build())
.build());
}
}
resources:
extendedS3Stream:
type: aws:kinesis:FirehoseDeliveryStream
name: extended_s3_stream
properties:
name: kinesis-firehose-extended-s3-test-stream
destination: extended_s3
extendedS3Configuration:
roleArn: ${firehoseRole.arn}
bucketArn: ${bucket.arn}
bufferingSize: 64
dynamicPartitioningConfiguration:
enabled: true
prefix: data/customer_id=!{partitionKeyFromQuery:customer_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
errorOutputPrefix: errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/
processingConfiguration:
enabled: true
processors:
- type: RecordDeAggregation
parameters:
- parameterName: SubRecordType
parameterValue: JSON
- type: AppendDelimiterToRecord
- type: MetadataExtraction
parameters:
- parameterName: JsonParsingEngine
parameterValue: JQ-1.6
- parameterName: MetadataExtractionQuery
parameterValue: '{customer_id:.customer_id}'
Dynamic partitioning reads fields from incoming records using JQ expressions and injects them into the S3 prefix template. The MetadataExtraction processor extracts customer_id from each record, and the prefix template references it with !{partitionKeyFromQuery:customer_id}. Firehose writes objects under the corresponding key prefixes as records arrive, so the partitioned layout appears automatically. The errorOutputPrefix routes failed records to a separate prefix for debugging.
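As a worked example, suppose a record like this arrives at 13:00 UTC on 2024-05-01 (illustrative values):
{"customer_id": "abc123", "event": "login"}
The MetadataExtraction query {customer_id:.customer_id} extracts abc123 as the partition key, so the batch containing this record is written under
data/customer_id=abc123/year=2024/month=05/day=01/hour=13/
while a record that fails processing lands under the errors/ prefix instead.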
Load data into Redshift with S3 staging
Analytics teams often load streaming data into Redshift for SQL-based analysis. Firehose stages data in S3, then executes COPY commands to load it into your Redshift tables.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const testCluster = new aws.redshift.Cluster("test_cluster", {
clusterIdentifier: "tf-redshift-cluster",
databaseName: "test",
masterUsername: "testuser",
masterPassword: "T3stPass",
nodeType: "dc1.large",
clusterType: "single-node",
});
const testStream = new aws.kinesis.FirehoseDeliveryStream("test_stream", {
name: "kinesis-firehose-test-stream",
destination: "redshift",
redshiftConfiguration: {
roleArn: firehoseRole.arn,
clusterJdbcurl: pulumi.interpolate`jdbc:redshift://${testCluster.endpoint}/${testCluster.databaseName}`,
username: "testuser",
password: "T3stPass",
dataTableName: "test-table",
copyOptions: "delimiter '|'",
dataTableColumns: "test-col",
s3BackupMode: "Enabled",
s3Configuration: {
roleArn: firehoseRole.arn,
bucketArn: bucket.arn,
bufferingSize: 10,
bufferingInterval: 400,
compressionFormat: "GZIP",
},
s3BackupConfiguration: {
roleArn: firehoseRole.arn,
bucketArn: bucket.arn,
bufferingSize: 15,
bufferingInterval: 300,
compressionFormat: "GZIP",
},
},
});
import pulumi
import pulumi_aws as aws
test_cluster = aws.redshift.Cluster("test_cluster",
cluster_identifier="tf-redshift-cluster",
database_name="test",
master_username="testuser",
master_password="T3stPass",
node_type="dc1.large",
cluster_type="single-node")
test_stream = aws.kinesis.FirehoseDeliveryStream("test_stream",
name="kinesis-firehose-test-stream",
destination="redshift",
redshift_configuration={
"role_arn": firehose_role["arn"],
"cluster_jdbcurl": pulumi.Output.all(
endpoint=test_cluster.endpoint,
database_name=test_cluster.database_name
).apply(lambda resolved_outputs: f"jdbc:redshift://{resolved_outputs['endpoint']}/{resolved_outputs['database_name']}")
,
"username": "testuser",
"password": "T3stPass",
"data_table_name": "test-table",
"copy_options": "delimiter '|'",
"data_table_columns": "test-col",
"s3_backup_mode": "Enabled",
"s3_configuration": {
"role_arn": firehose_role["arn"],
"bucket_arn": bucket["arn"],
"buffering_size": 10,
"buffering_interval": 400,
"compression_format": "GZIP",
},
"s3_backup_configuration": {
"role_arn": firehose_role["arn"],
"bucket_arn": bucket["arn"],
"buffering_size": 15,
"buffering_interval": 300,
"compression_format": "GZIP",
},
})
package main
import (
"fmt"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesis"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/redshift"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
testCluster, err := redshift.NewCluster(ctx, "test_cluster", &redshift.ClusterArgs{
ClusterIdentifier: pulumi.String("tf-redshift-cluster"),
DatabaseName: pulumi.String("test"),
MasterUsername: pulumi.String("testuser"),
MasterPassword: pulumi.String("T3stPass"),
NodeType: pulumi.String("dc1.large"),
ClusterType: pulumi.String("single-node"),
})
if err != nil {
return err
}
_, err = kinesis.NewFirehoseDeliveryStream(ctx, "test_stream", &kinesis.FirehoseDeliveryStreamArgs{
Name: pulumi.String("kinesis-firehose-test-stream"),
Destination: pulumi.String("redshift"),
RedshiftConfiguration: &kinesis.FirehoseDeliveryStreamRedshiftConfigurationArgs{
RoleArn: pulumi.Any(firehoseRole.Arn),
ClusterJdbcurl: pulumi.All(testCluster.Endpoint, testCluster.DatabaseName).ApplyT(func(_args []interface{}) (string, error) {
endpoint := _args[0].(string)
databaseName := _args[1].(string)
return fmt.Sprintf("jdbc:redshift://%v/%v", endpoint, databaseName), nil
}).(pulumi.StringOutput),
Username: pulumi.String("testuser"),
Password: pulumi.String("T3stPass"),
DataTableName: pulumi.String("test-table"),
CopyOptions: pulumi.String("delimiter '|'"),
DataTableColumns: pulumi.String("test-col"),
S3BackupMode: pulumi.String("Enabled"),
S3Configuration: &kinesis.FirehoseDeliveryStreamRedshiftConfigurationS3ConfigurationArgs{
RoleArn: pulumi.Any(firehoseRole.Arn),
BucketArn: pulumi.Any(bucket.Arn),
BufferingSize: pulumi.Int(10),
BufferingInterval: pulumi.Int(400),
CompressionFormat: pulumi.String("GZIP"),
},
S3BackupConfiguration: &kinesis.FirehoseDeliveryStreamRedshiftConfigurationS3BackupConfigurationArgs{
RoleArn: pulumi.Any(firehoseRole.Arn),
BucketArn: pulumi.Any(bucket.Arn),
BufferingSize: pulumi.Int(15),
BufferingInterval: pulumi.Int(300),
CompressionFormat: pulumi.String("GZIP"),
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var testCluster = new Aws.RedShift.Cluster("test_cluster", new()
{
ClusterIdentifier = "tf-redshift-cluster",
DatabaseName = "test",
MasterUsername = "testuser",
MasterPassword = "T3stPass",
NodeType = "dc1.large",
ClusterType = "single-node",
});
var testStream = new Aws.Kinesis.FirehoseDeliveryStream("test_stream", new()
{
Name = "kinesis-firehose-test-stream",
Destination = "redshift",
RedshiftConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamRedshiftConfigurationArgs
{
RoleArn = firehoseRole.Arn,
ClusterJdbcurl = Output.Tuple(testCluster.Endpoint, testCluster.DatabaseName).Apply(values =>
{
var endpoint = values.Item1;
var databaseName = values.Item2;
return $"jdbc:redshift://{endpoint}/{databaseName}";
}),
Username = "testuser",
Password = "T3stPass",
DataTableName = "test-table",
CopyOptions = "delimiter '|'",
DataTableColumns = "test-col",
S3BackupMode = "Enabled",
S3Configuration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamRedshiftConfigurationS3ConfigurationArgs
{
RoleArn = firehoseRole.Arn,
BucketArn = bucket.Arn,
BufferingSize = 10,
BufferingInterval = 400,
CompressionFormat = "GZIP",
},
S3BackupConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamRedshiftConfigurationS3BackupConfigurationArgs
{
RoleArn = firehoseRole.Arn,
BucketArn = bucket.Arn,
BufferingSize = 15,
BufferingInterval = 300,
CompressionFormat = "GZIP",
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.redshift.Cluster;
import com.pulumi.aws.redshift.ClusterArgs;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamRedshiftConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamRedshiftConfigurationS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamRedshiftConfigurationS3BackupConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var testCluster = new Cluster("testCluster", ClusterArgs.builder()
.clusterIdentifier("tf-redshift-cluster")
.databaseName("test")
.masterUsername("testuser")
.masterPassword("T3stPass")
.nodeType("dc1.large")
.clusterType("single-node")
.build());
var testStream = new FirehoseDeliveryStream("testStream", FirehoseDeliveryStreamArgs.builder()
.name("kinesis-firehose-test-stream")
.destination("redshift")
.redshiftConfiguration(FirehoseDeliveryStreamRedshiftConfigurationArgs.builder()
.roleArn(firehoseRole.arn())
.clusterJdbcurl(Output.tuple(testCluster.endpoint(), testCluster.databaseName()).applyValue(values -> {
var endpoint = values.t1;
var databaseName = values.t2;
return String.format("jdbc:redshift://%s/%s", endpoint,databaseName);
}))
.username("testuser")
.password("T3stPass")
.dataTableName("test-table")
.copyOptions("delimiter '|'")
.dataTableColumns("test-col")
.s3BackupMode("Enabled")
.s3Configuration(FirehoseDeliveryStreamRedshiftConfigurationS3ConfigurationArgs.builder()
.roleArn(firehoseRole.arn())
.bucketArn(bucket.arn())
.bufferingSize(10)
.bufferingInterval(400)
.compressionFormat("GZIP")
.build())
.s3BackupConfiguration(FirehoseDeliveryStreamRedshiftConfigurationS3BackupConfigurationArgs.builder()
.roleArn(firehoseRole.arn())
.bucketArn(bucket.arn())
.bufferingSize(15)
.bufferingInterval(300)
.compressionFormat("GZIP")
.build())
.build())
.build());
}
}
resources:
testCluster:
type: aws:redshift:Cluster
name: test_cluster
properties:
clusterIdentifier: tf-redshift-cluster
databaseName: test
masterUsername: testuser
masterPassword: T3stPass
nodeType: dc1.large
clusterType: single-node
testStream:
type: aws:kinesis:FirehoseDeliveryStream
name: test_stream
properties:
name: kinesis-firehose-test-stream
destination: redshift
redshiftConfiguration:
roleArn: ${firehoseRole.arn}
clusterJdbcurl: jdbc:redshift://${testCluster.endpoint}/${testCluster.databaseName}
username: testuser
password: T3stPass
dataTableName: test-table
copyOptions: delimiter '|'
dataTableColumns: test-col
s3BackupMode: Enabled
s3Configuration:
roleArn: ${firehoseRole.arn}
bucketArn: ${bucket.arn}
bufferingSize: 10
bufferingInterval: 400
compressionFormat: GZIP
s3BackupConfiguration:
roleArn: ${firehoseRole.arn}
bucketArn: ${bucket.arn}
bufferingSize: 15
bufferingInterval: 300
compressionFormat: GZIP
Firehose buffers records in S3, then issues COPY commands to load them into the Redshift table specified by dataTableName. The clusterJdbcurl constructs the JDBC connection string from your cluster endpoint and database name. The copyOptions property passes parameters to the COPY command (here, setting the delimiter). The s3BackupMode and s3BackupConfiguration define where to store records that fail to load.
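The credentials above are hardcoded for brevity. In a real deployment you would typically read them from Pulumi config as secrets so they never appear in plain text in code or state; a minimal sketch (TypeScript), assuming a config key named redshiftPassword:
import * as pulumi from "@pulumi/pulumi";

const config = new pulumi.Config();
// Set once with: pulumi config set --secret redshiftPassword <value>
const redshiftPassword = config.requireSecret("redshiftPassword");

// Pass the secret Output wherever a password is expected, e.g.
// masterPassword: redshiftPassword on the cluster, and
// password: redshiftPassword in redshiftConfiguration.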
Index records in OpenSearch for search and analytics
Applications that need full-text search or log analytics often stream data into OpenSearch. Firehose delivers records to an OpenSearch domain and index, with S3 as a backup destination.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const testCluster = new aws.opensearch.Domain("test_cluster", {domainName: "firehose-os-test"});
const testStream = new aws.kinesis.FirehoseDeliveryStream("test_stream", {
name: "kinesis-firehose-test-stream",
destination: "opensearch",
opensearchConfiguration: {
domainArn: testCluster.arn,
roleArn: firehoseRole.arn,
indexName: "test",
s3Configuration: {
roleArn: firehoseRole.arn,
bucketArn: bucket.arn,
bufferingSize: 10,
bufferingInterval: 400,
compressionFormat: "GZIP",
},
processingConfiguration: {
enabled: true,
processors: [{
type: "Lambda",
parameters: [{
parameterName: "LambdaArn",
parameterValue: pulumi.interpolate`${lambdaProcessor.arn}:$LATEST`,
}],
}],
},
},
});
import pulumi
import pulumi_aws as aws
test_cluster = aws.opensearch.Domain("test_cluster", domain_name="firehose-os-test")
test_stream = aws.kinesis.FirehoseDeliveryStream("test_stream",
name="kinesis-firehose-test-stream",
destination="opensearch",
opensearch_configuration={
"domain_arn": test_cluster.arn,
"role_arn": firehose_role["arn"],
"index_name": "test",
"s3_configuration": {
"role_arn": firehose_role["arn"],
"bucket_arn": bucket["arn"],
"buffering_size": 10,
"buffering_interval": 400,
"compression_format": "GZIP",
},
"processing_configuration": {
"enabled": True,
"processors": [{
"type": "Lambda",
"parameters": [{
"parameter_name": "LambdaArn",
"parameter_value": f"{lambda_processor['arn']}:$LATEST",
}],
}],
},
})
package main
import (
"fmt"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesis"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/opensearch"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
testCluster, err := opensearch.NewDomain(ctx, "test_cluster", &opensearch.DomainArgs{
DomainName: pulumi.String("firehose-os-test"),
})
if err != nil {
return err
}
_, err = kinesis.NewFirehoseDeliveryStream(ctx, "test_stream", &kinesis.FirehoseDeliveryStreamArgs{
Name: pulumi.String("kinesis-firehose-test-stream"),
Destination: pulumi.String("opensearch"),
OpensearchConfiguration: &kinesis.FirehoseDeliveryStreamOpensearchConfigurationArgs{
DomainArn: testCluster.Arn,
RoleArn: pulumi.Any(firehoseRole.Arn),
IndexName: pulumi.String("test"),
S3Configuration: &kinesis.FirehoseDeliveryStreamOpensearchConfigurationS3ConfigurationArgs{
RoleArn: pulumi.Any(firehoseRole.Arn),
BucketArn: pulumi.Any(bucket.Arn),
BufferingSize: pulumi.Int(10),
BufferingInterval: pulumi.Int(400),
CompressionFormat: pulumi.String("GZIP"),
},
ProcessingConfiguration: &kinesis.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationArgs{
Enabled: pulumi.Bool(true),
Processors: kinesis.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorArray{
&kinesis.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorArgs{
Type: pulumi.String("Lambda"),
Parameters: kinesis.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorParameterArray{
&kinesis.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorParameterArgs{
ParameterName: pulumi.String("LambdaArn"),
ParameterValue: pulumi.Sprintf("%v:$LATEST", lambdaProcessor.Arn),
},
},
},
},
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var testCluster = new Aws.OpenSearch.Domain("test_cluster", new()
{
DomainName = "firehose-os-test",
});
var testStream = new Aws.Kinesis.FirehoseDeliveryStream("test_stream", new()
{
Name = "kinesis-firehose-test-stream",
Destination = "opensearch",
OpensearchConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamOpensearchConfigurationArgs
{
DomainArn = testCluster.Arn,
RoleArn = firehoseRole.Arn,
IndexName = "test",
S3Configuration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamOpensearchConfigurationS3ConfigurationArgs
{
RoleArn = firehoseRole.Arn,
BucketArn = bucket.Arn,
BufferingSize = 10,
BufferingInterval = 400,
CompressionFormat = "GZIP",
},
ProcessingConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationArgs
{
Enabled = true,
Processors = new[]
{
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorArgs
{
Type = "Lambda",
Parameters = new[]
{
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorParameterArgs
{
ParameterName = "LambdaArn",
ParameterValue = lambdaProcessor.Arn.Apply(arn => $"{arn}:$LATEST"),
},
},
},
},
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.opensearch.Domain;
import com.pulumi.aws.opensearch.DomainArgs;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamOpensearchConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamOpensearchConfigurationS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorParameterArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var testCluster = new Domain("testCluster", DomainArgs.builder()
.domainName("firehose-os-test")
.build());
var testStream = new FirehoseDeliveryStream("testStream", FirehoseDeliveryStreamArgs.builder()
.name("kinesis-firehose-test-stream")
.destination("opensearch")
.opensearchConfiguration(FirehoseDeliveryStreamOpensearchConfigurationArgs.builder()
.domainArn(testCluster.arn())
.roleArn(firehoseRole.arn())
.indexName("test")
.s3Configuration(FirehoseDeliveryStreamOpensearchConfigurationS3ConfigurationArgs.builder()
.roleArn(firehoseRole.arn())
.bucketArn(bucket.arn())
.bufferingSize(10)
.bufferingInterval(400)
.compressionFormat("GZIP")
.build())
.processingConfiguration(FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationArgs.builder()
.enabled(true)
.processors(FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorArgs.builder()
.type("Lambda")
.parameters(FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorParameterArgs.builder()
.parameterName("LambdaArn")
.parameterValue(lambdaProcessor.arn().applyValue(arn -> String.format("%s:$LATEST", arn)))
.build())
.build())
.build())
.build())
.build());
}
}
resources:
testCluster:
type: aws:opensearch:Domain
name: test_cluster
properties:
domainName: firehose-os-test
testStream:
type: aws:kinesis:FirehoseDeliveryStream
name: test_stream
properties:
name: kinesis-firehose-test-stream
destination: opensearch
opensearchConfiguration:
domainArn: ${testCluster.arn}
roleArn: ${firehoseRole.arn}
indexName: test
s3Configuration:
roleArn: ${firehoseRole.arn}
bucketArn: ${bucket.arn}
bufferingSize: 10
bufferingInterval: 400
compressionFormat: GZIP
processingConfiguration:
enabled: true
processors:
- type: Lambda
parameters:
- parameterName: LambdaArn
parameterValue: ${lambdaProcessor.arn}:$LATEST
Firehose writes records to the OpenSearch index specified by indexName. The domainArn points to your OpenSearch domain, and the roleArn grants permissions to write to it. The s3Configuration block defines a backup destination for records that fail to index. The optional processingConfiguration can transform records before indexing.
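For the delivery to succeed, the role behind roleArn must be able to describe and write to the domain. A rough sketch of such a policy (TypeScript, reusing firehoseRole and testCluster from the example above; the action list follows the AWS-documented OpenSearch destination permissions and should be verified for your setup):
import * as aws from "@pulumi/aws";
import * as pulumi from "@pulumi/pulumi";

const firehoseOpensearchPolicy = new aws.iam.RolePolicy("firehose_opensearch_policy", {
    role: firehoseRole.id,
    policy: pulumi.jsonStringify({
        Version: "2012-10-17",
        Statement: [{
            Effect: "Allow",
            Action: [
                "es:DescribeDomain",
                "es:DescribeDomains",
                "es:DescribeDomainConfig",
                "es:ESHttpGet",
                "es:ESHttpPost",
                "es:ESHttpPut",
            ],
            // The domain itself, plus all index and document paths beneath it.
            Resource: [testCluster.arn, pulumi.interpolate`${testCluster.arn}/*`],
        }],
    }),
});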
Send events to Splunk HEC endpoint
Security and operations teams using Splunk can stream events directly to a Splunk HTTP Event Collector endpoint. Failed events are backed up to S3.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const testStream = new aws.kinesis.FirehoseDeliveryStream("test_stream", {
name: "kinesis-firehose-test-stream",
destination: "splunk",
splunkConfiguration: {
hecEndpoint: "https://http-inputs-mydomain.splunkcloud.com:443",
hecToken: "51D4DA16-C61B-4F5F-8EC7-ED4301342A4A",
hecAcknowledgmentTimeout: 600,
hecEndpointType: "Event",
s3BackupMode: "FailedEventsOnly",
s3Configuration: {
roleArn: firehose.arn,
bucketArn: bucket.arn,
bufferingSize: 10,
bufferingInterval: 400,
compressionFormat: "GZIP",
},
},
});
import pulumi
import pulumi_aws as aws
test_stream = aws.kinesis.FirehoseDeliveryStream("test_stream",
name="kinesis-firehose-test-stream",
destination="splunk",
splunk_configuration={
"hec_endpoint": "https://http-inputs-mydomain.splunkcloud.com:443",
"hec_token": "51D4DA16-C61B-4F5F-8EC7-ED4301342A4A",
"hec_acknowledgment_timeout": 600,
"hec_endpoint_type": "Event",
"s3_backup_mode": "FailedEventsOnly",
"s3_configuration": {
"role_arn": firehose["arn"],
"bucket_arn": bucket["arn"],
"buffering_size": 10,
"buffering_interval": 400,
"compression_format": "GZIP",
},
})
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesis"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := kinesis.NewFirehoseDeliveryStream(ctx, "test_stream", &kinesis.FirehoseDeliveryStreamArgs{
Name: pulumi.String("kinesis-firehose-test-stream"),
Destination: pulumi.String("splunk"),
SplunkConfiguration: &kinesis.FirehoseDeliveryStreamSplunkConfigurationArgs{
HecEndpoint: pulumi.String("https://http-inputs-mydomain.splunkcloud.com:443"),
HecToken: pulumi.String("51D4DA16-C61B-4F5F-8EC7-ED4301342A4A"),
HecAcknowledgmentTimeout: pulumi.Int(600),
HecEndpointType: pulumi.String("Event"),
S3BackupMode: pulumi.String("FailedEventsOnly"),
S3Configuration: &kinesis.FirehoseDeliveryStreamSplunkConfigurationS3ConfigurationArgs{
RoleArn: pulumi.Any(firehose.Arn),
BucketArn: pulumi.Any(bucket.Arn),
BufferingSize: pulumi.Int(10),
BufferingInterval: pulumi.Int(400),
CompressionFormat: pulumi.String("GZIP"),
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var testStream = new Aws.Kinesis.FirehoseDeliveryStream("test_stream", new()
{
Name = "kinesis-firehose-test-stream",
Destination = "splunk",
SplunkConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamSplunkConfigurationArgs
{
HecEndpoint = "https://http-inputs-mydomain.splunkcloud.com:443",
HecToken = "51D4DA16-C61B-4F5F-8EC7-ED4301342A4A",
HecAcknowledgmentTimeout = 600,
HecEndpointType = "Event",
S3BackupMode = "FailedEventsOnly",
S3Configuration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamSplunkConfigurationS3ConfigurationArgs
{
RoleArn = firehose.Arn,
BucketArn = bucket.Arn,
BufferingSize = 10,
BufferingInterval = 400,
CompressionFormat = "GZIP",
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamSplunkConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamSplunkConfigurationS3ConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var testStream = new FirehoseDeliveryStream("testStream", FirehoseDeliveryStreamArgs.builder()
.name("kinesis-firehose-test-stream")
.destination("splunk")
.splunkConfiguration(FirehoseDeliveryStreamSplunkConfigurationArgs.builder()
.hecEndpoint("https://http-inputs-mydomain.splunkcloud.com:443")
.hecToken("51D4DA16-C61B-4F5F-8EC7-ED4301342A4A")
.hecAcknowledgmentTimeout(600)
.hecEndpointType("Event")
.s3BackupMode("FailedEventsOnly")
.s3Configuration(FirehoseDeliveryStreamSplunkConfigurationS3ConfigurationArgs.builder()
.roleArn(firehose.arn())
.bucketArn(bucket.arn())
.bufferingSize(10)
.bufferingInterval(400)
.compressionFormat("GZIP")
.build())
.build())
.build());
}
}
resources:
testStream:
type: aws:kinesis:FirehoseDeliveryStream
name: test_stream
properties:
name: kinesis-firehose-test-stream
destination: splunk
splunkConfiguration:
hecEndpoint: https://http-inputs-mydomain.splunkcloud.com:443
hecToken: 51D4DA16-C61B-4F5F-8EC7-ED4301342A4A
hecAcknowledgmentTimeout: 600
hecEndpointType: Event
s3BackupMode: FailedEventsOnly
s3Configuration:
roleArn: ${firehose.arn}
bucketArn: ${bucket.arn}
bufferingSize: 10
bufferingInterval: 400
compressionFormat: GZIP
Firehose delivers records to the Splunk HEC endpoint specified by hecEndpoint. The hecToken authenticates requests, and hecAcknowledgmentTimeout controls how long Firehose waits for Splunk to confirm receipt. The s3BackupMode determines which records go to S3: FailedEventsOnly sends only rejected events, while AllEvents backs up everything.
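The HEC token shown is a placeholder. As with any credential, avoid hardcoding real tokens; a minimal sketch that reads it from Pulumi config as a secret (assuming a config key named splunkHecToken):
import * as pulumi from "@pulumi/pulumi";

const config = new pulumi.Config();
// Set once with: pulumi config set --secret splunkHecToken <token>
const hecToken = config.requireSecret("splunkHecToken");

// Then reference it in the stream: splunkConfiguration: { ..., hecToken: hecToken }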
Deliver to HTTP endpoints with custom headers
SaaS platforms often expose HTTP endpoints for streaming data ingestion. Firehose can deliver to any HTTP endpoint with custom headers and attributes.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
const testStream = new aws.kinesis.FirehoseDeliveryStream("test_stream", {
name: "kinesis-firehose-test-stream",
destination: "http_endpoint",
httpEndpointConfiguration: {
url: "https://aws-api.newrelic.com/firehose/v1",
name: "New Relic",
accessKey: "my-key",
bufferingSize: 15,
bufferingInterval: 600,
roleArn: firehose.arn,
s3BackupMode: "FailedDataOnly",
s3Configuration: {
roleArn: firehose.arn,
bucketArn: bucket.arn,
bufferingSize: 10,
bufferingInterval: 400,
compressionFormat: "GZIP",
},
requestConfiguration: {
contentEncoding: "GZIP",
commonAttributes: [
{
name: "testname",
value: "testvalue",
},
{
name: "testname2",
value: "testvalue2",
},
],
},
},
});
import pulumi
import pulumi_aws as aws
test_stream = aws.kinesis.FirehoseDeliveryStream("test_stream",
name="kinesis-firehose-test-stream",
destination="http_endpoint",
http_endpoint_configuration={
"url": "https://aws-api.newrelic.com/firehose/v1",
"name": "New Relic",
"access_key": "my-key",
"buffering_size": 15,
"buffering_interval": 600,
"role_arn": firehose["arn"],
"s3_backup_mode": "FailedDataOnly",
"s3_configuration": {
"role_arn": firehose["arn"],
"bucket_arn": bucket["arn"],
"buffering_size": 10,
"buffering_interval": 400,
"compression_format": "GZIP",
},
"request_configuration": {
"content_encoding": "GZIP",
"common_attributes": [
{
"name": "testname",
"value": "testvalue",
},
{
"name": "testname2",
"value": "testvalue2",
},
],
},
})
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesis"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := kinesis.NewFirehoseDeliveryStream(ctx, "test_stream", &kinesis.FirehoseDeliveryStreamArgs{
Name: pulumi.String("kinesis-firehose-test-stream"),
Destination: pulumi.String("http_endpoint"),
HttpEndpointConfiguration: &kinesis.FirehoseDeliveryStreamHttpEndpointConfigurationArgs{
Url: pulumi.String("https://aws-api.newrelic.com/firehose/v1"),
Name: pulumi.String("New Relic"),
AccessKey: pulumi.String("my-key"),
BufferingSize: pulumi.Int(15),
BufferingInterval: pulumi.Int(600),
RoleArn: pulumi.Any(firehose.Arn),
S3BackupMode: pulumi.String("FailedDataOnly"),
S3Configuration: &kinesis.FirehoseDeliveryStreamHttpEndpointConfigurationS3ConfigurationArgs{
RoleArn: pulumi.Any(firehose.Arn),
BucketArn: pulumi.Any(bucket.Arn),
BufferingSize: pulumi.Int(10),
BufferingInterval: pulumi.Int(400),
CompressionFormat: pulumi.String("GZIP"),
},
RequestConfiguration: &kinesis.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationArgs{
ContentEncoding: pulumi.String("GZIP"),
CommonAttributes: kinesis.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArray{
&kinesis.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs{
Name: pulumi.String("testname"),
Value: pulumi.String("testvalue"),
},
&kinesis.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs{
Name: pulumi.String("testname2"),
Value: pulumi.String("testvalue2"),
},
},
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
var testStream = new Aws.Kinesis.FirehoseDeliveryStream("test_stream", new()
{
Name = "kinesis-firehose-test-stream",
Destination = "http_endpoint",
HttpEndpointConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamHttpEndpointConfigurationArgs
{
Url = "https://aws-api.newrelic.com/firehose/v1",
Name = "New Relic",
AccessKey = "my-key",
BufferingSize = 15,
BufferingInterval = 600,
RoleArn = firehose.Arn,
S3BackupMode = "FailedDataOnly",
S3Configuration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamHttpEndpointConfigurationS3ConfigurationArgs
{
RoleArn = firehose.Arn,
BucketArn = bucket.Arn,
BufferingSize = 10,
BufferingInterval = 400,
CompressionFormat = "GZIP",
},
RequestConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationArgs
{
ContentEncoding = "GZIP",
CommonAttributes = new[]
{
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs
{
Name = "testname",
Value = "testvalue",
},
new Aws.Kinesis.Inputs.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs
{
Name = "testname2",
Value = "testvalue2",
},
},
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamHttpEndpointConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamHttpEndpointConfigurationS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var testStream = new FirehoseDeliveryStream("testStream", FirehoseDeliveryStreamArgs.builder()
.name("kinesis-firehose-test-stream")
.destination("http_endpoint")
.httpEndpointConfiguration(FirehoseDeliveryStreamHttpEndpointConfigurationArgs.builder()
.url("https://aws-api.newrelic.com/firehose/v1")
.name("New Relic")
.accessKey("my-key")
.bufferingSize(15)
.bufferingInterval(600)
.roleArn(firehose.arn())
.s3BackupMode("FailedDataOnly")
.s3Configuration(FirehoseDeliveryStreamHttpEndpointConfigurationS3ConfigurationArgs.builder()
.roleArn(firehose.arn())
.bucketArn(bucket.arn())
.bufferingSize(10)
.bufferingInterval(400)
.compressionFormat("GZIP")
.build())
.requestConfiguration(FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationArgs.builder()
.contentEncoding("GZIP")
.commonAttributes(
FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs.builder()
.name("testname")
.value("testvalue")
.build(),
FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs.builder()
.name("testname2")
.value("testvalue2")
.build())
.build())
.build())
.build());
}
}
resources:
testStream:
type: aws:kinesis:FirehoseDeliveryStream
name: test_stream
properties:
name: kinesis-firehose-test-stream
destination: http_endpoint
httpEndpointConfiguration:
url: https://aws-api.newrelic.com/firehose/v1
name: New Relic
accessKey: my-key
bufferingSize: 15
bufferingInterval: 600
roleArn: ${firehose.arn}
s3BackupMode: FailedDataOnly
s3Configuration:
roleArn: ${firehose.arn}
bucketArn: ${bucket.arn}
bufferingSize: 10
bufferingInterval: 400
compressionFormat: GZIP
requestConfiguration:
contentEncoding: GZIP
commonAttributes:
- name: testname
value: testvalue
- name: testname2
value: testvalue2
Firehose POSTs records to the URL specified in the httpEndpointConfiguration. The accessKey authenticates requests, and the requestConfiguration block adds custom headers via commonAttributes. The contentEncoding property compresses payloads before transmission. The s3BackupMode and s3Configuration define backup behavior for failed deliveries.
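On the wire, Firehose batches records into POST requests that follow its HTTP endpoint delivery request format: the access key and the common attributes travel as request headers rather than inside each record. Roughly (an illustrative sketch of the request shape, not an exact capture):
POST /firehose/v1 HTTP/1.1
Host: aws-api.newrelic.com
Content-Type: application/json
Content-Encoding: gzip
X-Amz-Firehose-Access-Key: my-key
X-Amz-Firehose-Common-Attributes: {"commonAttributes":{"testname":"testvalue","testname2":"testvalue2"}}

{"requestId":"...","timestamp":1700000000000,"records":[{"data":"<base64 payload>"}]}
The endpoint must acknowledge with a response echoing the requestId; otherwise Firehose retries the batch and eventually backs it up to S3 per s3BackupMode.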
Beyond These Examples
These snippets focus on specific delivery stream features: S3 delivery with Lambda transformation and dynamic partitioning, data warehouse loading (Redshift), search and analytics (OpenSearch), and observability platforms (Splunk, HTTP endpoints). They’re intentionally minimal rather than full data pipeline deployments.
The examples may reference pre-existing infrastructure such as IAM execution roles for Firehose and destination access, S3 buckets for data delivery and error backup, destination resources (Redshift clusters, OpenSearch domains, Splunk HEC endpoints), and Lambda functions for data transformation. They focus on configuring the delivery stream rather than provisioning everything around it.
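For instance, the Firehose role used throughout the S3 examples needs the standard S3 delivery permissions on the destination bucket. A minimal sketch (TypeScript, reusing firehoseRole and bucket from the first example; the action list mirrors the S3 destination policy in the AWS documentation):
import * as aws from "@pulumi/aws";
import * as pulumi from "@pulumi/pulumi";

const firehoseS3Policy = new aws.iam.RolePolicy("firehose_s3_policy", {
    role: firehoseRole.id,
    policy: pulumi.jsonStringify({
        Version: "2012-10-17",
        Statement: [{
            Effect: "Allow",
            Action: [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject",
            ],
            // The bucket itself for listing, and its objects for reads and writes.
            Resource: [bucket.arn, pulumi.interpolate`${bucket.arn}/*`],
        }],
    }),
});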
To keep things focused, common delivery stream patterns are omitted, including:
- VPC configuration for private destination access
- Data format conversion (Parquet, ORC)
- CloudWatch logging and monitoring
- Encryption at rest (serverSideEncryption)
- Source configuration (Kinesis streams, MSK clusters)
- Buffering size and interval tuning
These omissions are intentional: the goal is to illustrate how each destination is wired, not provide drop-in pipeline modules. See the Kinesis Firehose Delivery Stream resource reference for all available configuration options.
Frequently Asked Questions
Destination Configuration & Deprecations
- The s3 destination is deprecated and does not support import operations. Use extended_s3 instead for new delivery streams.
- Supported destinations are extended_s3, redshift, elasticsearch, opensearch, opensearchserverless, splunk, http_endpoint, snowflake, and iceberg.
- Most non-S3 destinations also require an s3Configuration block for data staging and backup.
Sources & Encryption
- Server-side encryption cannot be enabled while kinesisSourceConfiguration is set.
- The destination, name, kinesisSourceConfiguration, and mskSourceConfiguration properties cannot be changed after creation.
VPC & Networking
- Delivering to a destination inside a VPC requires an IAM role with the ec2:DescribeVpcs, ec2:DescribeVpcAttribute, ec2:DescribeSubnets, ec2:DescribeSecurityGroups, ec2:DescribeNetworkInterfaces, ec2:CreateNetworkInterface, ec2:CreateNetworkInterfacePermission, and ec2:DeleteNetworkInterface permissions.
- Adding dependsOn ensures the IAM role policy is fully attached before creating the delivery stream, preventing permission errors.
Dynamic Partitioning
- Dynamic partitioning uses the RecordDeAggregation, AppendDelimiterToRecord, and MetadataExtraction processors in the processingConfiguration block.
- The partition key is extracted with a JQ expression supplied as the MetadataExtractionQuery parameter value.
WAF Integration & Naming
- Delivery streams that receive AWS WAF logs must have names beginning with aws-waf-logs-.
Import & Migration
- Import is not supported for the deprecated s3 destination. Migrate to extended_s3 for import support.