Create AWS Kinesis Firehose Delivery Streams

The aws:kinesis/firehoseDeliveryStream:FirehoseDeliveryStream resource, part of the Pulumi AWS provider, defines a Kinesis Firehose delivery stream that buffers and delivers real-time data to destinations like S3, Redshift, OpenSearch, and third-party endpoints. This guide focuses on three capabilities: S3 delivery with transformation and partitioning, data warehouse loading, and search indexing and observability integration.

Delivery streams require IAM roles, destination resources, and optionally Lambda functions for transformation. The examples are intentionally small. Combine them with your own IAM policies, VPC configuration, and monitoring setup.
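
For example, the Firehose role created in the first example below still needs permission to write into the destination bucket. The following is a minimal sketch of such a policy attachment in TypeScript; the resource name firehose_s3_policy is an assumption, and the action list is the commonly documented minimum for an S3 destination, so tighten it for your own use case.

// Sketch: grant the Firehose role write access to the destination bucket.
// Assumes `firehoseRole` and `bucket` are the resources defined in the first example.
const firehoseS3Policy = new aws.iam.RolePolicy("firehose_s3_policy", {
    role: firehoseRole.id,
    policy: bucket.arn.apply(arn => JSON.stringify({
        Version: "2012-10-17",
        Statement: [{
            Effect: "Allow",
            Action: [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject",
            ],
            Resource: [arn, `${arn}/*`],
        }],
    })),
});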

Deliver to S3 with Lambda transformation

Many pipelines begin by delivering records into S3, either as the primary storage layer or as a central point for downstream analytics tools. Firehose can pass each batch through a Lambda function before writing to S3, allowing you to normalize formats or enrich events.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const bucket = new aws.s3.Bucket("bucket", {bucket: "tf-test-bucket"});
const firehoseAssumeRole = aws.iam.getPolicyDocument({
    statements: [{
        effect: "Allow",
        principals: [{
            type: "Service",
            identifiers: ["firehose.amazonaws.com"],
        }],
        actions: ["sts:AssumeRole"],
    }],
});
const firehoseRole = new aws.iam.Role("firehose_role", {
    name: "firehose_test_role",
    assumeRolePolicy: firehoseAssumeRole.then(firehoseAssumeRole => firehoseAssumeRole.json),
});
const lambdaAssumeRole = aws.iam.getPolicyDocument({
    statements: [{
        effect: "Allow",
        principals: [{
            type: "Service",
            identifiers: ["lambda.amazonaws.com"],
        }],
        actions: ["sts:AssumeRole"],
    }],
});
const lambdaIam = new aws.iam.Role("lambda_iam", {
    name: "lambda_iam",
    assumeRolePolicy: lambdaAssumeRole.then(lambdaAssumeRole => lambdaAssumeRole.json),
});
const lambdaProcessor = new aws.lambda.Function("lambda_processor", {
    code: new pulumi.asset.FileArchive("lambda.zip"),
    name: "firehose_lambda_processor",
    role: lambdaIam.arn,
    handler: "exports.handler",
    runtime: aws.lambda.Runtime.NodeJS20dX,
});
const extendedS3Stream = new aws.kinesis.FirehoseDeliveryStream("extended_s3_stream", {
    name: "kinesis-firehose-extended-s3-test-stream",
    destination: "extended_s3",
    extendedS3Configuration: {
        roleArn: firehoseRole.arn,
        bucketArn: bucket.arn,
        processingConfiguration: {
            enabled: true,
            processors: [{
                type: "Lambda",
                parameters: [{
                    parameterName: "LambdaArn",
                    parameterValue: pulumi.interpolate`${lambdaProcessor.arn}:$LATEST`,
                }],
            }],
        },
    },
});
const bucketAcl = new aws.s3.BucketAcl("bucket_acl", {
    bucket: bucket.id,
    acl: "private",
});
import pulumi
import pulumi_aws as aws

bucket = aws.s3.Bucket("bucket", bucket="tf-test-bucket")
firehose_assume_role = aws.iam.get_policy_document(statements=[{
    "effect": "Allow",
    "principals": [{
        "type": "Service",
        "identifiers": ["firehose.amazonaws.com"],
    }],
    "actions": ["sts:AssumeRole"],
}])
firehose_role = aws.iam.Role("firehose_role",
    name="firehose_test_role",
    assume_role_policy=firehose_assume_role.json)
lambda_assume_role = aws.iam.get_policy_document(statements=[{
    "effect": "Allow",
    "principals": [{
        "type": "Service",
        "identifiers": ["lambda.amazonaws.com"],
    }],
    "actions": ["sts:AssumeRole"],
}])
lambda_iam = aws.iam.Role("lambda_iam",
    name="lambda_iam",
    assume_role_policy=lambda_assume_role.json)
lambda_processor = aws.lambda_.Function("lambda_processor",
    code=pulumi.FileArchive("lambda.zip"),
    name="firehose_lambda_processor",
    role=lambda_iam.arn,
    handler="exports.handler",
    runtime=aws.lambda_.Runtime.NODE_JS20D_X)
extended_s3_stream = aws.kinesis.FirehoseDeliveryStream("extended_s3_stream",
    name="kinesis-firehose-extended-s3-test-stream",
    destination="extended_s3",
    extended_s3_configuration={
        "role_arn": firehose_role.arn,
        "bucket_arn": bucket.arn,
        "processing_configuration": {
            "enabled": True,
            "processors": [{
                "type": "Lambda",
                "parameters": [{
                    "parameter_name": "LambdaArn",
                    "parameter_value": lambda_processor.arn.apply(lambda arn: f"{arn}:$LATEST"),
                }],
            }],
        },
    })
bucket_acl = aws.s3.BucketAcl("bucket_acl",
    bucket=bucket.id,
    acl="private")
package main

import (
	"fmt"

	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/iam"
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesis"
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/s3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		bucket, err := s3.NewBucket(ctx, "bucket", &s3.BucketArgs{
			Bucket: pulumi.String("tf-test-bucket"),
		})
		if err != nil {
			return err
		}
		firehoseAssumeRole, err := iam.GetPolicyDocument(ctx, &iam.GetPolicyDocumentArgs{
			Statements: []iam.GetPolicyDocumentStatement{
				{
					Effect: pulumi.StringRef("Allow"),
					Principals: []iam.GetPolicyDocumentStatementPrincipal{
						{
							Type: "Service",
							Identifiers: []string{
								"firehose.amazonaws.com",
							},
						},
					},
					Actions: []string{
						"sts:AssumeRole",
					},
				},
			},
		}, nil)
		if err != nil {
			return err
		}
		firehoseRole, err := iam.NewRole(ctx, "firehose_role", &iam.RoleArgs{
			Name:             pulumi.String("firehose_test_role"),
			AssumeRolePolicy: pulumi.String(firehoseAssumeRole.Json),
		})
		if err != nil {
			return err
		}
		lambdaAssumeRole, err := iam.GetPolicyDocument(ctx, &iam.GetPolicyDocumentArgs{
			Statements: []iam.GetPolicyDocumentStatement{
				{
					Effect: pulumi.StringRef("Allow"),
					Principals: []iam.GetPolicyDocumentStatementPrincipal{
						{
							Type: "Service",
							Identifiers: []string{
								"lambda.amazonaws.com",
							},
						},
					},
					Actions: []string{
						"sts:AssumeRole",
					},
				},
			},
		}, nil)
		if err != nil {
			return err
		}
		lambdaIam, err := iam.NewRole(ctx, "lambda_iam", &iam.RoleArgs{
			Name:             pulumi.String("lambda_iam"),
			AssumeRolePolicy: pulumi.String(lambdaAssumeRole.Json),
		})
		if err != nil {
			return err
		}
		lambdaProcessor, err := lambda.NewFunction(ctx, "lambda_processor", &lambda.FunctionArgs{
			Code:    pulumi.NewFileArchive("lambda.zip"),
			Name:    pulumi.String("firehose_lambda_processor"),
			Role:    lambdaIam.Arn,
			Handler: pulumi.String("exports.handler"),
			Runtime: pulumi.String(lambda.RuntimeNodeJS20dX),
		})
		if err != nil {
			return err
		}
		_, err = kinesis.NewFirehoseDeliveryStream(ctx, "extended_s3_stream", &kinesis.FirehoseDeliveryStreamArgs{
			Name:        pulumi.String("kinesis-firehose-extended-s3-test-stream"),
			Destination: pulumi.String("extended_s3"),
			ExtendedS3Configuration: &kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationArgs{
				RoleArn:   firehoseRole.Arn,
				BucketArn: bucket.Arn,
				ProcessingConfiguration: &kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs{
					Enabled: pulumi.Bool(true),
					Processors: kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArray{
						&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs{
							Type: pulumi.String("Lambda"),
							Parameters: kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArray{
								&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs{
									ParameterName: pulumi.String("LambdaArn"),
									ParameterValue: lambdaProcessor.Arn.ApplyT(func(arn string) (string, error) {
										return fmt.Sprintf("%v:$LATEST", arn), nil
									}).(pulumi.StringOutput),
								},
							},
						},
					},
				},
			},
		})
		if err != nil {
			return err
		}
		_, err = s3.NewBucketAcl(ctx, "bucket_acl", &s3.BucketAclArgs{
			Bucket: bucket.ID(),
			Acl:    pulumi.String("private"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var bucket = new Aws.S3.Bucket("bucket", new()
    {
        BucketName = "tf-test-bucket",
    });

    var firehoseAssumeRole = Aws.Iam.GetPolicyDocument.Invoke(new()
    {
        Statements = new[]
        {
            new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
            {
                Effect = "Allow",
                Principals = new[]
                {
                    new Aws.Iam.Inputs.GetPolicyDocumentStatementPrincipalInputArgs
                    {
                        Type = "Service",
                        Identifiers = new[]
                        {
                            "firehose.amazonaws.com",
                        },
                    },
                },
                Actions = new[]
                {
                    "sts:AssumeRole",
                },
            },
        },
    });

    var firehoseRole = new Aws.Iam.Role("firehose_role", new()
    {
        Name = "firehose_test_role",
        AssumeRolePolicy = firehoseAssumeRole.Apply(getPolicyDocumentResult => getPolicyDocumentResult.Json),
    });

    var lambdaAssumeRole = Aws.Iam.GetPolicyDocument.Invoke(new()
    {
        Statements = new[]
        {
            new Aws.Iam.Inputs.GetPolicyDocumentStatementInputArgs
            {
                Effect = "Allow",
                Principals = new[]
                {
                    new Aws.Iam.Inputs.GetPolicyDocumentStatementPrincipalInputArgs
                    {
                        Type = "Service",
                        Identifiers = new[]
                        {
                            "lambda.amazonaws.com",
                        },
                    },
                },
                Actions = new[]
                {
                    "sts:AssumeRole",
                },
            },
        },
    });

    var lambdaIam = new Aws.Iam.Role("lambda_iam", new()
    {
        Name = "lambda_iam",
        AssumeRolePolicy = lambdaAssumeRole.Apply(getPolicyDocumentResult => getPolicyDocumentResult.Json),
    });

    var lambdaProcessor = new Aws.Lambda.Function("lambda_processor", new()
    {
        Code = new FileArchive("lambda.zip"),
        Name = "firehose_lambda_processor",
        Role = lambdaIam.Arn,
        Handler = "exports.handler",
        Runtime = Aws.Lambda.Runtime.NodeJS20dX,
    });

    var extendedS3Stream = new Aws.Kinesis.FirehoseDeliveryStream("extended_s3_stream", new()
    {
        Name = "kinesis-firehose-extended-s3-test-stream",
        Destination = "extended_s3",
        ExtendedS3Configuration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationArgs
        {
            RoleArn = firehoseRole.Arn,
            BucketArn = bucket.Arn,
            ProcessingConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs
            {
                Enabled = true,
                Processors = new[]
                {
                    new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs
                    {
                        Type = "Lambda",
                        Parameters = new[]
                        {
                            new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs
                            {
                                ParameterName = "LambdaArn",
                                ParameterValue = lambdaProcessor.Arn.Apply(arn => $"{arn}:$LATEST"),
                            },
                        },
                    },
                },
            },
        },
    });

    var bucketAcl = new Aws.S3.BucketAcl("bucket_acl", new()
    {
        Bucket = bucket.Id,
        Acl = "private",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.s3.Bucket;
import com.pulumi.aws.s3.BucketArgs;
import com.pulumi.aws.iam.IamFunctions;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentArgs;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentStatementArgs;
import com.pulumi.aws.iam.inputs.GetPolicyDocumentStatementPrincipalArgs;
import com.pulumi.aws.iam.Role;
import com.pulumi.aws.iam.RoleArgs;
import com.pulumi.aws.lambda.Function;
import com.pulumi.aws.lambda.FunctionArgs;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs;
import com.pulumi.aws.s3.BucketAcl;
import com.pulumi.aws.s3.BucketAclArgs;
import com.pulumi.asset.FileArchive;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var bucket = new Bucket("bucket", BucketArgs.builder()
            .bucket("tf-test-bucket")
            .build());

        final var firehoseAssumeRole = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
            .statements(GetPolicyDocumentStatementArgs.builder()
                .effect("Allow")
                .principals(GetPolicyDocumentStatementPrincipalArgs.builder()
                    .type("Service")
                    .identifiers("firehose.amazonaws.com")
                    .build())
                .actions("sts:AssumeRole")
                .build())
            .build());

        var firehoseRole = new Role("firehoseRole", RoleArgs.builder()
            .name("firehose_test_role")
            .assumeRolePolicy(firehoseAssumeRole.json())
            .build());

        final var lambdaAssumeRole = IamFunctions.getPolicyDocument(GetPolicyDocumentArgs.builder()
            .statements(GetPolicyDocumentStatementArgs.builder()
                .effect("Allow")
                .principals(GetPolicyDocumentStatementPrincipalArgs.builder()
                    .type("Service")
                    .identifiers("lambda.amazonaws.com")
                    .build())
                .actions("sts:AssumeRole")
                .build())
            .build());

        var lambdaIam = new Role("lambdaIam", RoleArgs.builder()
            .name("lambda_iam")
            .assumeRolePolicy(lambdaAssumeRole.json())
            .build());

        var lambdaProcessor = new Function("lambdaProcessor", FunctionArgs.builder()
            .code(new FileArchive("lambda.zip"))
            .name("firehose_lambda_processor")
            .role(lambdaIam.arn())
            .handler("exports.handler")
            .runtime("nodejs20.x")
            .build());

        var extendedS3Stream = new FirehoseDeliveryStream("extendedS3Stream", FirehoseDeliveryStreamArgs.builder()
            .name("kinesis-firehose-extended-s3-test-stream")
            .destination("extended_s3")
            .extendedS3Configuration(FirehoseDeliveryStreamExtendedS3ConfigurationArgs.builder()
                .roleArn(firehoseRole.arn())
                .bucketArn(bucket.arn())
                .processingConfiguration(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs.builder()
                    .enabled(true)
                    .processors(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs.builder()
                        .type("Lambda")
                        .parameters(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs.builder()
                            .parameterName("LambdaArn")
                            .parameterValue(lambdaProcessor.arn().applyValue(_arn -> String.format("%s:$LATEST", _arn)))
                            .build())
                        .build())
                    .build())
                .build())
            .build());

        var bucketAcl = new BucketAcl("bucketAcl", BucketAclArgs.builder()
            .bucket(bucket.id())
            .acl("private")
            .build());

    }
}
resources:
  extendedS3Stream:
    type: aws:kinesis:FirehoseDeliveryStream
    name: extended_s3_stream
    properties:
      name: kinesis-firehose-extended-s3-test-stream
      destination: extended_s3
      extendedS3Configuration:
        roleArn: ${firehoseRole.arn}
        bucketArn: ${bucket.arn}
        processingConfiguration:
          enabled: 'true'
          processors:
            - type: Lambda
              parameters:
                - parameterName: LambdaArn
                  parameterValue: ${lambdaProcessor.arn}:$LATEST
  bucket:
    type: aws:s3:Bucket
    properties:
      bucket: tf-test-bucket
  bucketAcl:
    type: aws:s3:BucketAcl
    name: bucket_acl
    properties:
      bucket: ${bucket.id}
      acl: private
  firehoseRole:
    type: aws:iam:Role
    name: firehose_role
    properties:
      name: firehose_test_role
      assumeRolePolicy: ${firehoseAssumeRole.json}
  lambdaIam:
    type: aws:iam:Role
    name: lambda_iam
    properties:
      name: lambda_iam
      assumeRolePolicy: ${lambdaAssumeRole.json}
  lambdaProcessor:
    type: aws:lambda:Function
    name: lambda_processor
    properties:
      code:
        fn::FileArchive: lambda.zip
      name: firehose_lambda_processor
      role: ${lambdaIam.arn}
      handler: exports.handler
      runtime: nodejs20.x
variables:
  firehoseAssumeRole:
    fn::invoke:
      function: aws:iam:getPolicyDocument
      arguments:
        statements:
          - effect: Allow
            principals:
              - type: Service
                identifiers:
                  - firehose.amazonaws.com
            actions:
              - sts:AssumeRole
  lambdaAssumeRole:
    fn::invoke:
      function: aws:iam:getPolicyDocument
      arguments:
        statements:
          - effect: Allow
            principals:
              - type: Service
                identifiers:
                  - lambda.amazonaws.com
            actions:
              - sts:AssumeRole

When records arrive, Firehose invokes the Lambda function with batches of records. The function returns transformed records, which Firehose then writes to the S3 bucket. The processingConfiguration enables Lambda-based transformation, while bucketArn and roleArn specify where data lands and which permissions Firehose uses. The Lambda function must return records in the format Firehose expects: each record carries its original recordId, a result of Ok, Dropped, or ProcessingFailed, and base64-encoded data.
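
As a reference point, a transformation handler packaged in the lambda.zip archive above might look like the following sketch (Node.js/TypeScript, matching the nodejs20.x runtime in the example; the uppercasing step is a placeholder assumption, not part of the original example).

// exports.ts (compiled to exports.js inside lambda.zip), matching the "exports.handler" setting above.
// Firehose sends base64-encoded records and expects each one back with its
// recordId, a result of "Ok", "Dropped", or "ProcessingFailed", and base64 data.
export const handler = async (event: { records: { recordId: string; data: string }[] }) => {
    const records = event.records.map(record => {
        const payload = Buffer.from(record.data, "base64").toString("utf8");
        // Placeholder transformation: uppercase the payload.
        const transformed = payload.toUpperCase();
        return {
            recordId: record.recordId,
            result: "Ok",
            data: Buffer.from(transformed, "utf8").toString("base64"),
        };
    });
    return { records };
};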

Partition S3 data by extracted fields

Applications often need to organize S3 data by customer ID, timestamp, or other dimensions to optimize query performance and reduce costs. Dynamic partitioning extracts fields from records and uses them to construct S3 prefixes automatically.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const extendedS3Stream = new aws.kinesis.FirehoseDeliveryStream("extended_s3_stream", {
    name: "kinesis-firehose-extended-s3-test-stream",
    destination: "extended_s3",
    extendedS3Configuration: {
        roleArn: firehoseRole.arn,
        bucketArn: bucket.arn,
        bufferingSize: 64,
        dynamicPartitioningConfiguration: {
            enabled: true,
        },
        prefix: "data/customer_id=!{partitionKeyFromQuery:customer_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/",
        errorOutputPrefix: "errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/",
        processingConfiguration: {
            enabled: true,
            processors: [
                {
                    type: "RecordDeAggregation",
                    parameters: [{
                        parameterName: "SubRecordType",
                        parameterValue: "JSON",
                    }],
                },
                {
                    type: "AppendDelimiterToRecord",
                },
                {
                    type: "MetadataExtraction",
                    parameters: [
                        {
                            parameterName: "JsonParsingEngine",
                            parameterValue: "JQ-1.6",
                        },
                        {
                            parameterName: "MetadataExtractionQuery",
                            parameterValue: "{customer_id:.customer_id}",
                        },
                    ],
                },
            ],
        },
    },
});
import pulumi
import pulumi_aws as aws

extended_s3_stream = aws.kinesis.FirehoseDeliveryStream("extended_s3_stream",
    name="kinesis-firehose-extended-s3-test-stream",
    destination="extended_s3",
    extended_s3_configuration={
        "role_arn": firehose_role["arn"],
        "bucket_arn": bucket["arn"],
        "buffering_size": 64,
        "dynamic_partitioning_configuration": {
            "enabled": True,
        },
        "prefix": "data/customer_id=!{partitionKeyFromQuery:customer_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/",
        "error_output_prefix": "errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/",
        "processing_configuration": {
            "enabled": True,
            "processors": [
                {
                    "type": "RecordDeAggregation",
                    "parameters": [{
                        "parameter_name": "SubRecordType",
                        "parameter_value": "JSON",
                    }],
                },
                {
                    "type": "AppendDelimiterToRecord",
                },
                {
                    "type": "MetadataExtraction",
                    "parameters": [
                        {
                            "parameter_name": "JsonParsingEngine",
                            "parameter_value": "JQ-1.6",
                        },
                        {
                            "parameter_name": "MetadataExtractionQuery",
                            "parameter_value": "{customer_id:.customer_id}",
                        },
                    ],
                },
            ],
        },
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesis"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := kinesis.NewFirehoseDeliveryStream(ctx, "extended_s3_stream", &kinesis.FirehoseDeliveryStreamArgs{
			Name:        pulumi.String("kinesis-firehose-extended-s3-test-stream"),
			Destination: pulumi.String("extended_s3"),
			ExtendedS3Configuration: &kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationArgs{
				RoleArn:       pulumi.Any(firehoseRole.Arn),
				BucketArn:     pulumi.Any(bucket.Arn),
				BufferingSize: pulumi.Int(64),
				DynamicPartitioningConfiguration: &kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationDynamicPartitioningConfigurationArgs{
					Enabled: pulumi.Bool(true),
				},
				Prefix:            pulumi.String("data/customer_id=!{partitionKeyFromQuery:customer_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"),
				ErrorOutputPrefix: pulumi.String("errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/"),
				ProcessingConfiguration: &kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs{
					Enabled: pulumi.Bool(true),
					Processors: kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArray{
						&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs{
							Type: pulumi.String("RecordDeAggregation"),
							Parameters: kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArray{
								&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs{
									ParameterName:  pulumi.String("SubRecordType"),
									ParameterValue: pulumi.String("JSON"),
								},
							},
						},
						&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs{
							Type: pulumi.String("AppendDelimiterToRecord"),
						},
						&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs{
							Type: pulumi.String("MetadataExtraction"),
							Parameters: kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArray{
								&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs{
									ParameterName:  pulumi.String("JsonParsingEngine"),
									ParameterValue: pulumi.String("JQ-1.6"),
								},
								&kinesis.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs{
									ParameterName:  pulumi.String("MetadataExtractionQuery"),
									ParameterValue: pulumi.String("{customer_id:.customer_id}"),
								},
							},
						},
					},
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var extendedS3Stream = new Aws.Kinesis.FirehoseDeliveryStream("extended_s3_stream", new()
    {
        Name = "kinesis-firehose-extended-s3-test-stream",
        Destination = "extended_s3",
        ExtendedS3Configuration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationArgs
        {
            RoleArn = firehoseRole.Arn,
            BucketArn = bucket.Arn,
            BufferingSize = 64,
            DynamicPartitioningConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationDynamicPartitioningConfigurationArgs
            {
                Enabled = true,
            },
            Prefix = "data/customer_id=!{partitionKeyFromQuery:customer_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/",
            ErrorOutputPrefix = "errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/",
            ProcessingConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs
            {
                Enabled = true,
                Processors = new[]
                {
                    new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs
                    {
                        Type = "RecordDeAggregation",
                        Parameters = new[]
                        {
                            new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs
                            {
                                ParameterName = "SubRecordType",
                                ParameterValue = "JSON",
                            },
                        },
                    },
                    new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs
                    {
                        Type = "AppendDelimiterToRecord",
                    },
                    new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs
                    {
                        Type = "MetadataExtraction",
                        Parameters = new[]
                        {
                            new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs
                            {
                                ParameterName = "JsonParsingEngine",
                                ParameterValue = "JQ-1.6",
                            },
                            new Aws.Kinesis.Inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs
                            {
                                ParameterName = "MetadataExtractionQuery",
                                ParameterValue = "{customer_id:.customer_id}",
                            },
                        },
                    },
                },
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationDynamicPartitioningConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var extendedS3Stream = new FirehoseDeliveryStream("extendedS3Stream", FirehoseDeliveryStreamArgs.builder()
            .name("kinesis-firehose-extended-s3-test-stream")
            .destination("extended_s3")
            .extendedS3Configuration(FirehoseDeliveryStreamExtendedS3ConfigurationArgs.builder()
                .roleArn(firehoseRole.arn())
                .bucketArn(bucket.arn())
                .bufferingSize(64)
                .dynamicPartitioningConfiguration(FirehoseDeliveryStreamExtendedS3ConfigurationDynamicPartitioningConfigurationArgs.builder()
                    .enabled(true)
                    .build())
                .prefix("data/customer_id=!{partitionKeyFromQuery:customer_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/")
                .errorOutputPrefix("errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/")
                .processingConfiguration(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationArgs.builder()
                    .enabled(true)
                    .processors(                    
                        FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs.builder()
                            .type("RecordDeAggregation")
                            .parameters(FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs.builder()
                                .parameterName("SubRecordType")
                                .parameterValue("JSON")
                                .build())
                            .build(),
                        FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs.builder()
                            .type("AppendDelimiterToRecord")
                            .build(),
                        FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorArgs.builder()
                            .type("MetadataExtraction")
                            .parameters(                            
                                FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs.builder()
                                    .parameterName("JsonParsingEngine")
                                    .parameterValue("JQ-1.6")
                                    .build(),
                                FirehoseDeliveryStreamExtendedS3ConfigurationProcessingConfigurationProcessorParameterArgs.builder()
                                    .parameterName("MetadataExtractionQuery")
                                    .parameterValue("{customer_id:.customer_id}")
                                    .build())
                            .build())
                    .build())
                .build())
            .build());

    }
}
resources:
  extendedS3Stream:
    type: aws:kinesis:FirehoseDeliveryStream
    name: extended_s3_stream
    properties:
      name: kinesis-firehose-extended-s3-test-stream
      destination: extended_s3
      extendedS3Configuration:
        roleArn: ${firehoseRole.arn}
        bucketArn: ${bucket.arn}
        bufferingSize: 64
        dynamicPartitioningConfiguration:
          enabled: 'true'
        prefix: data/customer_id=!{partitionKeyFromQuery:customer_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
        errorOutputPrefix: errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/
        processingConfiguration:
          enabled: 'true'
          processors:
            - type: RecordDeAggregation
              parameters:
                - parameterName: SubRecordType
                  parameterValue: JSON
            - type: AppendDelimiterToRecord
            - type: MetadataExtraction
              parameters:
                - parameterName: JsonParsingEngine
                  parameterValue: JQ-1.6
                - parameterName: MetadataExtractionQuery
                  parameterValue: '{customer_id:.customer_id}'

The dynamicPartitioningConfiguration block enables partitioning without a Lambda function: Firehose uses built-in processors to extract metadata from JSON records. The MetadataExtraction processor runs a JQ query (here, {customer_id:.customer_id}) to pull the customer_id field out of each record, and the prefix property uses template syntax like !{partitionKeyFromQuery:customer_id} to construct S3 paths dynamically. Dynamic partitioning also requires a larger minimum buffer, which is why bufferingSize is set to 64 (MB) here. Records that fail processing or delivery go to the errorOutputPrefix location for debugging.
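
To make the prefix template concrete, a hypothetical record such as

{"customer_id": "42", "event": "login"}

arriving around 2024-05-01 10:15 UTC (the year, month, day, and hour values come from the record's approximate arrival timestamp in Firehose, not from fields inside the record) would be written under a key shaped like

s3://tf-test-bucket/data/customer_id=42/year=2024/month=05/day=01/hour=10/<object name>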

Load data into Redshift via S3 staging

Data warehouses like Redshift require batch loading from S3. Firehose buffers records, writes them to S3, and then issues COPY commands to load data into Redshift tables.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const testCluster = new aws.redshift.Cluster("test_cluster", {
    clusterIdentifier: "tf-redshift-cluster",
    databaseName: "test",
    masterUsername: "testuser",
    masterPassword: "T3stPass",
    nodeType: "dc1.large",
    clusterType: "single-node",
});
const testStream = new aws.kinesis.FirehoseDeliveryStream("test_stream", {
    name: "kinesis-firehose-test-stream",
    destination: "redshift",
    redshiftConfiguration: {
        roleArn: firehoseRole.arn,
        clusterJdbcurl: pulumi.interpolate`jdbc:redshift://${testCluster.endpoint}/${testCluster.databaseName}`,
        username: "testuser",
        password: "T3stPass",
        dataTableName: "test-table",
        copyOptions: "delimiter '|'",
        dataTableColumns: "test-col",
        s3BackupMode: "Enabled",
        s3Configuration: {
            roleArn: firehoseRole.arn,
            bucketArn: bucket.arn,
            bufferingSize: 10,
            bufferingInterval: 400,
            compressionFormat: "GZIP",
        },
        s3BackupConfiguration: {
            roleArn: firehoseRole.arn,
            bucketArn: bucket.arn,
            bufferingSize: 15,
            bufferingInterval: 300,
            compressionFormat: "GZIP",
        },
    },
});
import pulumi
import pulumi_aws as aws

test_cluster = aws.redshift.Cluster("test_cluster",
    cluster_identifier="tf-redshift-cluster",
    database_name="test",
    master_username="testuser",
    master_password="T3stPass",
    node_type="dc1.large",
    cluster_type="single-node")
test_stream = aws.kinesis.FirehoseDeliveryStream("test_stream",
    name="kinesis-firehose-test-stream",
    destination="redshift",
    redshift_configuration={
        "role_arn": firehose_role["arn"],
        "cluster_jdbcurl": pulumi.Output.all(
            endpoint=test_cluster.endpoint,
            database_name=test_cluster.database_name
).apply(lambda resolved_outputs: f"jdbc:redshift://{resolved_outputs['endpoint']}/{resolved_outputs['database_name']}")
,
        "username": "testuser",
        "password": "T3stPass",
        "data_table_name": "test-table",
        "copy_options": "delimiter '|'",
        "data_table_columns": "test-col",
        "s3_backup_mode": "Enabled",
        "s3_configuration": {
            "role_arn": firehose_role["arn"],
            "bucket_arn": bucket["arn"],
            "buffering_size": 10,
            "buffering_interval": 400,
            "compression_format": "GZIP",
        },
        "s3_backup_configuration": {
            "role_arn": firehose_role["arn"],
            "bucket_arn": bucket["arn"],
            "buffering_size": 15,
            "buffering_interval": 300,
            "compression_format": "GZIP",
        },
    })
package main

import (
	"fmt"

	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesis"
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/redshift"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		testCluster, err := redshift.NewCluster(ctx, "test_cluster", &redshift.ClusterArgs{
			ClusterIdentifier: pulumi.String("tf-redshift-cluster"),
			DatabaseName:      pulumi.String("test"),
			MasterUsername:    pulumi.String("testuser"),
			MasterPassword:    pulumi.String("T3stPass"),
			NodeType:          pulumi.String("dc1.large"),
			ClusterType:       pulumi.String("single-node"),
		})
		if err != nil {
			return err
		}
		_, err = kinesis.NewFirehoseDeliveryStream(ctx, "test_stream", &kinesis.FirehoseDeliveryStreamArgs{
			Name:        pulumi.String("kinesis-firehose-test-stream"),
			Destination: pulumi.String("redshift"),
			RedshiftConfiguration: &kinesis.FirehoseDeliveryStreamRedshiftConfigurationArgs{
				RoleArn: pulumi.Any(firehoseRole.Arn),
				ClusterJdbcurl: pulumi.All(testCluster.Endpoint, testCluster.DatabaseName).ApplyT(func(_args []interface{}) (string, error) {
					endpoint := _args[0].(string)
					databaseName := _args[1].(string)
					return fmt.Sprintf("jdbc:redshift://%v/%v", endpoint, databaseName), nil
				}).(pulumi.StringOutput),
				Username:         pulumi.String("testuser"),
				Password:         pulumi.String("T3stPass"),
				DataTableName:    pulumi.String("test-table"),
				CopyOptions:      pulumi.String("delimiter '|'"),
				DataTableColumns: pulumi.String("test-col"),
				S3BackupMode:     pulumi.String("Enabled"),
				S3Configuration: &kinesis.FirehoseDeliveryStreamRedshiftConfigurationS3ConfigurationArgs{
					RoleArn:           pulumi.Any(firehoseRole.Arn),
					BucketArn:         pulumi.Any(bucket.Arn),
					BufferingSize:     pulumi.Int(10),
					BufferingInterval: pulumi.Int(400),
					CompressionFormat: pulumi.String("GZIP"),
				},
				S3BackupConfiguration: &kinesis.FirehoseDeliveryStreamRedshiftConfigurationS3BackupConfigurationArgs{
					RoleArn:           pulumi.Any(firehoseRole.Arn),
					BucketArn:         pulumi.Any(bucket.Arn),
					BufferingSize:     pulumi.Int(15),
					BufferingInterval: pulumi.Int(300),
					CompressionFormat: pulumi.String("GZIP"),
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var testCluster = new Aws.RedShift.Cluster("test_cluster", new()
    {
        ClusterIdentifier = "tf-redshift-cluster",
        DatabaseName = "test",
        MasterUsername = "testuser",
        MasterPassword = "T3stPass",
        NodeType = "dc1.large",
        ClusterType = "single-node",
    });

    var testStream = new Aws.Kinesis.FirehoseDeliveryStream("test_stream", new()
    {
        Name = "kinesis-firehose-test-stream",
        Destination = "redshift",
        RedshiftConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamRedshiftConfigurationArgs
        {
            RoleArn = firehoseRole.Arn,
            ClusterJdbcurl = Output.Tuple(testCluster.Endpoint, testCluster.DatabaseName).Apply(values =>
            {
                var endpoint = values.Item1;
                var databaseName = values.Item2;
                return $"jdbc:redshift://{endpoint}/{databaseName}";
            }),
            Username = "testuser",
            Password = "T3stPass",
            DataTableName = "test-table",
            CopyOptions = "delimiter '|'",
            DataTableColumns = "test-col",
            S3BackupMode = "Enabled",
            S3Configuration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamRedshiftConfigurationS3ConfigurationArgs
            {
                RoleArn = firehoseRole.Arn,
                BucketArn = bucket.Arn,
                BufferingSize = 10,
                BufferingInterval = 400,
                CompressionFormat = "GZIP",
            },
            S3BackupConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamRedshiftConfigurationS3BackupConfigurationArgs
            {
                RoleArn = firehoseRole.Arn,
                BucketArn = bucket.Arn,
                BufferingSize = 15,
                BufferingInterval = 300,
                CompressionFormat = "GZIP",
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.redshift.Cluster;
import com.pulumi.aws.redshift.ClusterArgs;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamRedshiftConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamRedshiftConfigurationS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamRedshiftConfigurationS3BackupConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var testCluster = new Cluster("testCluster", ClusterArgs.builder()
            .clusterIdentifier("tf-redshift-cluster")
            .databaseName("test")
            .masterUsername("testuser")
            .masterPassword("T3stPass")
            .nodeType("dc1.large")
            .clusterType("single-node")
            .build());

        var testStream = new FirehoseDeliveryStream("testStream", FirehoseDeliveryStreamArgs.builder()
            .name("kinesis-firehose-test-stream")
            .destination("redshift")
            .redshiftConfiguration(FirehoseDeliveryStreamRedshiftConfigurationArgs.builder()
                .roleArn(firehoseRole.arn())
                .clusterJdbcurl(Output.tuple(testCluster.endpoint(), testCluster.databaseName()).applyValue(values -> {
                    var endpoint = values.t1;
                    var databaseName = values.t2;
                    return String.format("jdbc:redshift://%s/%s", endpoint,databaseName);
                }))
                .username("testuser")
                .password("T3stPass")
                .dataTableName("test-table")
                .copyOptions("delimiter '|'")
                .dataTableColumns("test-col")
                .s3BackupMode("Enabled")
                .s3Configuration(FirehoseDeliveryStreamRedshiftConfigurationS3ConfigurationArgs.builder()
                    .roleArn(firehoseRole.arn())
                    .bucketArn(bucket.arn())
                    .bufferingSize(10)
                    .bufferingInterval(400)
                    .compressionFormat("GZIP")
                    .build())
                .s3BackupConfiguration(FirehoseDeliveryStreamRedshiftConfigurationS3BackupConfigurationArgs.builder()
                    .roleArn(firehoseRole.arn())
                    .bucketArn(bucket.arn())
                    .bufferingSize(15)
                    .bufferingInterval(300)
                    .compressionFormat("GZIP")
                    .build())
                .build())
            .build());

    }
}
resources:
  testCluster:
    type: aws:redshift:Cluster
    name: test_cluster
    properties:
      clusterIdentifier: tf-redshift-cluster
      databaseName: test
      masterUsername: testuser
      masterPassword: T3stPass
      nodeType: dc1.large
      clusterType: single-node
  testStream:
    type: aws:kinesis:FirehoseDeliveryStream
    name: test_stream
    properties:
      name: kinesis-firehose-test-stream
      destination: redshift
      redshiftConfiguration:
        roleArn: ${firehoseRole.arn}
        clusterJdbcurl: jdbc:redshift://${testCluster.endpoint}/${testCluster.databaseName}
        username: testuser
        password: T3stPass
        dataTableName: test-table
        copyOptions: delimiter '|'
        dataTableColumns: test-col
        s3BackupMode: Enabled
        s3Configuration:
          roleArn: ${firehoseRole.arn}
          bucketArn: ${bucket.arn}
          bufferingSize: 10
          bufferingInterval: 400
          compressionFormat: GZIP
        s3BackupConfiguration:
          roleArn: ${firehoseRole.arn}
          bucketArn: ${bucket.arn}
          bufferingSize: 15
          bufferingInterval: 300
          compressionFormat: GZIP

Firehose first stages records in the bucket defined by s3Configuration, then issues a Redshift COPY command to load them into the table named by dataTableName; the table must already exist, since Firehose does not create it. The clusterJdbcurl points at your Redshift cluster, and copyOptions passes extra parameters, such as the delimiter, to the COPY command. Setting s3BackupMode to Enabled makes Firehose keep a backup copy of the source records in the bucket defined by s3BackupConfiguration, which in this example is the same bucket with its own buffering and compression settings.

Index records in OpenSearch with Lambda processing

Search and analytics workloads often index streaming data into OpenSearch for real-time queries. Firehose can transform records via Lambda before indexing them.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const testCluster = new aws.opensearch.Domain("test_cluster", {domainName: "firehose-os-test"});
const testStream = new aws.kinesis.FirehoseDeliveryStream("test_stream", {
    name: "kinesis-firehose-test-stream",
    destination: "opensearch",
    opensearchConfiguration: {
        domainArn: testCluster.arn,
        roleArn: firehoseRole.arn,
        indexName: "test",
        s3Configuration: {
            roleArn: firehoseRole.arn,
            bucketArn: bucket.arn,
            bufferingSize: 10,
            bufferingInterval: 400,
            compressionFormat: "GZIP",
        },
        processingConfiguration: {
            enabled: true,
            processors: [{
                type: "Lambda",
                parameters: [{
                    parameterName: "LambdaArn",
                    parameterValue: `${lambdaProcessor.arn}:$LATEST`,
                }],
            }],
        },
    },
});
import pulumi
import pulumi_aws as aws

test_cluster = aws.opensearch.Domain("test_cluster", domain_name="firehose-os-test")
test_stream = aws.kinesis.FirehoseDeliveryStream("test_stream",
    name="kinesis-firehose-test-stream",
    destination="opensearch",
    opensearch_configuration={
        "domain_arn": test_cluster.arn,
        "role_arn": firehose_role["arn"],
        "index_name": "test",
        "s3_configuration": {
            "role_arn": firehose_role["arn"],
            "bucket_arn": bucket["arn"],
            "buffering_size": 10,
            "buffering_interval": 400,
            "compression_format": "GZIP",
        },
        "processing_configuration": {
            "enabled": True,
            "processors": [{
                "type": "Lambda",
                "parameters": [{
                    "parameter_name": "LambdaArn",
                    "parameter_value": f"{lambda_processor['arn']}:$LATEST",
                }],
            }],
        },
    })
package main

import (
	"fmt"

	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesis"
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/opensearch"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		testCluster, err := opensearch.NewDomain(ctx, "test_cluster", &opensearch.DomainArgs{
			DomainName: pulumi.String("firehose-os-test"),
		})
		if err != nil {
			return err
		}
		_, err = kinesis.NewFirehoseDeliveryStream(ctx, "test_stream", &kinesis.FirehoseDeliveryStreamArgs{
			Name:        pulumi.String("kinesis-firehose-test-stream"),
			Destination: pulumi.String("opensearch"),
			OpensearchConfiguration: &kinesis.FirehoseDeliveryStreamOpensearchConfigurationArgs{
				DomainArn: testCluster.Arn,
				RoleArn:   pulumi.Any(firehoseRole.Arn),
				IndexName: pulumi.String("test"),
				S3Configuration: &kinesis.FirehoseDeliveryStreamOpensearchConfigurationS3ConfigurationArgs{
					RoleArn:           pulumi.Any(firehoseRole.Arn),
					BucketArn:         pulumi.Any(bucket.Arn),
					BufferingSize:     pulumi.Int(10),
					BufferingInterval: pulumi.Int(400),
					CompressionFormat: pulumi.String("GZIP"),
				},
				ProcessingConfiguration: &kinesis.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationArgs{
					Enabled: pulumi.Bool(true),
					Processors: kinesis.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorArray{
						&kinesis.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorArgs{
							Type: pulumi.String("Lambda"),
							Parameters: kinesis.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorParameterArray{
								&kinesis.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorParameterArgs{
									ParameterName:  pulumi.String("LambdaArn"),
									ParameterValue: pulumi.Sprintf("%v:$LATEST", lambdaProcessor.Arn),
								},
							},
						},
					},
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var testCluster = new Aws.OpenSearch.Domain("test_cluster", new()
    {
        DomainName = "firehose-os-test",
    });

    var testStream = new Aws.Kinesis.FirehoseDeliveryStream("test_stream", new()
    {
        Name = "kinesis-firehose-test-stream",
        Destination = "opensearch",
        OpensearchConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamOpensearchConfigurationArgs
        {
            DomainArn = testCluster.Arn,
            RoleArn = firehoseRole.Arn,
            IndexName = "test",
            S3Configuration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamOpensearchConfigurationS3ConfigurationArgs
            {
                RoleArn = firehoseRole.Arn,
                BucketArn = bucket.Arn,
                BufferingSize = 10,
                BufferingInterval = 400,
                CompressionFormat = "GZIP",
            },
            ProcessingConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationArgs
            {
                Enabled = true,
                Processors = new[]
                {
                    new Aws.Kinesis.Inputs.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorArgs
                    {
                        Type = "Lambda",
                        Parameters = new[]
                        {
                            new Aws.Kinesis.Inputs.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorParameterArgs
                            {
                                ParameterName = "LambdaArn",
                                ParameterValue = $"{lambdaProcessor.Arn}:$LATEST",
                            },
                        },
                    },
                },
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.opensearch.Domain;
import com.pulumi.aws.opensearch.DomainArgs;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamOpensearchConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamOpensearchConfigurationS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorParameterArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var testCluster = new Domain("testCluster", DomainArgs.builder()
            .domainName("firehose-os-test")
            .build());

        var testStream = new FirehoseDeliveryStream("testStream", FirehoseDeliveryStreamArgs.builder()
            .name("kinesis-firehose-test-stream")
            .destination("opensearch")
            .opensearchConfiguration(FirehoseDeliveryStreamOpensearchConfigurationArgs.builder()
                .domainArn(testCluster.arn())
                .roleArn(firehoseRole.arn())
                .indexName("test")
                .s3Configuration(FirehoseDeliveryStreamOpensearchConfigurationS3ConfigurationArgs.builder()
                    .roleArn(firehoseRole.arn())
                    .bucketArn(bucket.arn())
                    .bufferingSize(10)
                    .bufferingInterval(400)
                    .compressionFormat("GZIP")
                    .build())
                .processingConfiguration(FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationArgs.builder()
                    .enabled(true)
                    .processors(FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorArgs.builder()
                        .type("Lambda")
                        .parameters(FirehoseDeliveryStreamOpensearchConfigurationProcessingConfigurationProcessorParameterArgs.builder()
                            .parameterName("LambdaArn")
                            .parameterValue(String.format("%s:$LATEST", lambdaProcessor.arn()))
                            .build())
                        .build())
                    .build())
                .build())
            .build());

    }
}
resources:
  testCluster:
    type: aws:opensearch:Domain
    name: test_cluster
    properties:
      domainName: firehose-os-test
  testStream:
    type: aws:kinesis:FirehoseDeliveryStream
    name: test_stream
    properties:
      name: kinesis-firehose-test-stream
      destination: opensearch
      opensearchConfiguration:
        domainArn: ${testCluster.arn}
        roleArn: ${firehoseRole.arn}
        indexName: test
        s3Configuration:
          roleArn: ${firehoseRole.arn}
          bucketArn: ${bucket.arn}
          bufferingSize: 10
          bufferingInterval: 400
          compressionFormat: GZIP
        processingConfiguration:
          enabled: 'true'
          processors:
            - type: Lambda
              parameters:
                - parameterName: LambdaArn
                  parameterValue: ${lambdaProcessor.arn}:$LATEST

Records flow through your Lambda function, then Firehose indexes the transformed output into the OpenSearch domain specified by domainArn. The indexName determines which OpenSearch index receives the data. The s3Configuration provides a backup destination for records that fail to index. OpenSearch requires specific document formats, so the Lambda function typically shapes records to match your index mapping.

Send events to Splunk HEC endpoint

Observability platforms like Splunk ingest streaming data via HTTP Event Collector (HEC) endpoints. Firehose delivers batches to HEC and backs up failed events to S3.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const testStream = new aws.kinesis.FirehoseDeliveryStream("test_stream", {
    name: "kinesis-firehose-test-stream",
    destination: "splunk",
    splunkConfiguration: {
        hecEndpoint: "https://http-inputs-mydomain.splunkcloud.com:443",
        hecToken: "51D4DA16-C61B-4F5F-8EC7-ED4301342A4A",
        hecAcknowledgmentTimeout: 600,
        hecEndpointType: "Event",
        s3BackupMode: "FailedEventsOnly",
        s3Configuration: {
            roleArn: firehose.arn,
            bucketArn: bucket.arn,
            bufferingSize: 10,
            bufferingInterval: 400,
            compressionFormat: "GZIP",
        },
    },
});
import pulumi
import pulumi_aws as aws

test_stream = aws.kinesis.FirehoseDeliveryStream("test_stream",
    name="kinesis-firehose-test-stream",
    destination="splunk",
    splunk_configuration={
        "hec_endpoint": "https://http-inputs-mydomain.splunkcloud.com:443",
        "hec_token": "51D4DA16-C61B-4F5F-8EC7-ED4301342A4A",
        "hec_acknowledgment_timeout": 600,
        "hec_endpoint_type": "Event",
        "s3_backup_mode": "FailedEventsOnly",
        "s3_configuration": {
            "role_arn": firehose["arn"],
            "bucket_arn": bucket["arn"],
            "buffering_size": 10,
            "buffering_interval": 400,
            "compression_format": "GZIP",
        },
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesis"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := kinesis.NewFirehoseDeliveryStream(ctx, "test_stream", &kinesis.FirehoseDeliveryStreamArgs{
			Name:        pulumi.String("kinesis-firehose-test-stream"),
			Destination: pulumi.String("splunk"),
			SplunkConfiguration: &kinesis.FirehoseDeliveryStreamSplunkConfigurationArgs{
				HecEndpoint:              pulumi.String("https://http-inputs-mydomain.splunkcloud.com:443"),
				HecToken:                 pulumi.String("51D4DA16-C61B-4F5F-8EC7-ED4301342A4A"),
				HecAcknowledgmentTimeout: pulumi.Int(600),
				HecEndpointType:          pulumi.String("Event"),
				S3BackupMode:             pulumi.String("FailedEventsOnly"),
				S3Configuration: &kinesis.FirehoseDeliveryStreamSplunkConfigurationS3ConfigurationArgs{
					RoleArn:           pulumi.Any(firehose.Arn),
					BucketArn:         pulumi.Any(bucket.Arn),
					BufferingSize:     pulumi.Int(10),
					BufferingInterval: pulumi.Int(400),
					CompressionFormat: pulumi.String("GZIP"),
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var testStream = new Aws.Kinesis.FirehoseDeliveryStream("test_stream", new()
    {
        Name = "kinesis-firehose-test-stream",
        Destination = "splunk",
        SplunkConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamSplunkConfigurationArgs
        {
            HecEndpoint = "https://http-inputs-mydomain.splunkcloud.com:443",
            HecToken = "51D4DA16-C61B-4F5F-8EC7-ED4301342A4A",
            HecAcknowledgmentTimeout = 600,
            HecEndpointType = "Event",
            S3BackupMode = "FailedEventsOnly",
            S3Configuration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamSplunkConfigurationS3ConfigurationArgs
            {
                RoleArn = firehose.Arn,
                BucketArn = bucket.Arn,
                BufferingSize = 10,
                BufferingInterval = 400,
                CompressionFormat = "GZIP",
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamSplunkConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamSplunkConfigurationS3ConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var testStream = new FirehoseDeliveryStream("testStream", FirehoseDeliveryStreamArgs.builder()
            .name("kinesis-firehose-test-stream")
            .destination("splunk")
            .splunkConfiguration(FirehoseDeliveryStreamSplunkConfigurationArgs.builder()
                .hecEndpoint("https://http-inputs-mydomain.splunkcloud.com:443")
                .hecToken("51D4DA16-C61B-4F5F-8EC7-ED4301342A4A")
                .hecAcknowledgmentTimeout(600)
                .hecEndpointType("Event")
                .s3BackupMode("FailedEventsOnly")
                .s3Configuration(FirehoseDeliveryStreamSplunkConfigurationS3ConfigurationArgs.builder()
                    .roleArn(firehose.arn())
                    .bucketArn(bucket.arn())
                    .bufferingSize(10)
                    .bufferingInterval(400)
                    .compressionFormat("GZIP")
                    .build())
                .build())
            .build());

    }
}
resources:
  testStream:
    type: aws:kinesis:FirehoseDeliveryStream
    name: test_stream
    properties:
      name: kinesis-firehose-test-stream
      destination: splunk
      splunkConfiguration:
        hecEndpoint: https://http-inputs-mydomain.splunkcloud.com:443
        hecToken: 51D4DA16-C61B-4F5F-8EC7-ED4301342A4A
        hecAcknowledgmentTimeout: 600
        hecEndpointType: Event
        s3BackupMode: FailedEventsOnly
        s3Configuration:
          roleArn: ${firehose.arn}
          bucketArn: ${bucket.arn}
          bufferingSize: 10
          bufferingInterval: 400
          compressionFormat: GZIP

Firehose sends batches to the Splunk HEC endpoint specified by hecEndpoint, authenticating with hecToken. The hecAcknowledgmentTimeout controls how long Firehose waits for Splunk to acknowledge receipt before treating the batch as failed. The hecEndpointType determines whether events go to the Event or Raw endpoint. With s3BackupMode set to FailedEventsOnly, only events that Splunk rejects or fails to acknowledge are written to S3.
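
The examples above hard-code the HEC token for readability. In practice you would usually keep it out of source by reading it from Pulumi configuration as a secret. A minimal sketch, assuming a config key named splunkHecToken and the same pre-existing IAM role and backup bucket as the example above:

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const config = new pulumi.Config();
// Set with: pulumi config set --secret splunkHecToken <token>
const hecToken = config.requireSecret("splunkHecToken");

const splunkStream = new aws.kinesis.FirehoseDeliveryStream("splunk_stream", {
    name: "kinesis-firehose-splunk-stream",
    destination: "splunk",
    splunkConfiguration: {
        hecEndpoint: "https://http-inputs-mydomain.splunkcloud.com:443",
        hecToken: hecToken,
        hecEndpointType: "Event",
        s3BackupMode: "FailedEventsOnly",
        s3Configuration: {
            roleArn: firehose.arn, // pre-existing IAM role, as in the example above
            bucketArn: bucket.arn, // pre-existing backup bucket, as in the example above
            compressionFormat: "GZIP",
        },
    },
});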

Deliver to HTTP endpoints with custom headers

Third-party observability and analytics platforms often provide HTTP endpoints for data ingestion. Firehose can deliver batches with custom headers and request attributes.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const testStream = new aws.kinesis.FirehoseDeliveryStream("test_stream", {
    name: "kinesis-firehose-test-stream",
    destination: "http_endpoint",
    httpEndpointConfiguration: {
        url: "https://aws-api.newrelic.com/firehose/v1",
        name: "New Relic",
        accessKey: "my-key",
        bufferingSize: 15,
        bufferingInterval: 600,
        roleArn: firehose.arn,
        s3BackupMode: "FailedDataOnly",
        s3Configuration: {
            roleArn: firehose.arn,
            bucketArn: bucket.arn,
            bufferingSize: 10,
            bufferingInterval: 400,
            compressionFormat: "GZIP",
        },
        requestConfiguration: {
            contentEncoding: "GZIP",
            commonAttributes: [
                {
                    name: "testname",
                    value: "testvalue",
                },
                {
                    name: "testname2",
                    value: "testvalue2",
                },
            ],
        },
    },
});
import pulumi
import pulumi_aws as aws

test_stream = aws.kinesis.FirehoseDeliveryStream("test_stream",
    name="kinesis-firehose-test-stream",
    destination="http_endpoint",
    http_endpoint_configuration={
        "url": "https://aws-api.newrelic.com/firehose/v1",
        "name": "New Relic",
        "access_key": "my-key",
        "buffering_size": 15,
        "buffering_interval": 600,
        "role_arn": firehose["arn"],
        "s3_backup_mode": "FailedDataOnly",
        "s3_configuration": {
            "role_arn": firehose["arn"],
            "bucket_arn": bucket["arn"],
            "buffering_size": 10,
            "buffering_interval": 400,
            "compression_format": "GZIP",
        },
        "request_configuration": {
            "content_encoding": "GZIP",
            "common_attributes": [
                {
                    "name": "testname",
                    "value": "testvalue",
                },
                {
                    "name": "testname2",
                    "value": "testvalue2",
                },
            ],
        },
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesis"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := kinesis.NewFirehoseDeliveryStream(ctx, "test_stream", &kinesis.FirehoseDeliveryStreamArgs{
			Name:        pulumi.String("kinesis-firehose-test-stream"),
			Destination: pulumi.String("http_endpoint"),
			HttpEndpointConfiguration: &kinesis.FirehoseDeliveryStreamHttpEndpointConfigurationArgs{
				Url:               pulumi.String("https://aws-api.newrelic.com/firehose/v1"),
				Name:              pulumi.String("New Relic"),
				AccessKey:         pulumi.String("my-key"),
				BufferingSize:     pulumi.Int(15),
				BufferingInterval: pulumi.Int(600),
				RoleArn:           pulumi.Any(firehose.Arn),
				S3BackupMode:      pulumi.String("FailedDataOnly"),
				S3Configuration: &kinesis.FirehoseDeliveryStreamHttpEndpointConfigurationS3ConfigurationArgs{
					RoleArn:           pulumi.Any(firehose.Arn),
					BucketArn:         pulumi.Any(bucket.Arn),
					BufferingSize:     pulumi.Int(10),
					BufferingInterval: pulumi.Int(400),
					CompressionFormat: pulumi.String("GZIP"),
				},
				RequestConfiguration: &kinesis.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationArgs{
					ContentEncoding: pulumi.String("GZIP"),
					CommonAttributes: kinesis.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArray{
						&kinesis.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs{
							Name:  pulumi.String("testname"),
							Value: pulumi.String("testvalue"),
						},
						&kinesis.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs{
							Name:  pulumi.String("testname2"),
							Value: pulumi.String("testvalue2"),
						},
					},
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var testStream = new Aws.Kinesis.FirehoseDeliveryStream("test_stream", new()
    {
        Name = "kinesis-firehose-test-stream",
        Destination = "http_endpoint",
        HttpEndpointConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamHttpEndpointConfigurationArgs
        {
            Url = "https://aws-api.newrelic.com/firehose/v1",
            Name = "New Relic",
            AccessKey = "my-key",
            BufferingSize = 15,
            BufferingInterval = 600,
            RoleArn = firehose.Arn,
            S3BackupMode = "FailedDataOnly",
            S3Configuration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamHttpEndpointConfigurationS3ConfigurationArgs
            {
                RoleArn = firehose.Arn,
                BucketArn = bucket.Arn,
                BufferingSize = 10,
                BufferingInterval = 400,
                CompressionFormat = "GZIP",
            },
            RequestConfiguration = new Aws.Kinesis.Inputs.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationArgs
            {
                ContentEncoding = "GZIP",
                CommonAttributes = new[]
                {
                    new Aws.Kinesis.Inputs.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs
                    {
                        Name = "testname",
                        Value = "testvalue",
                    },
                    new Aws.Kinesis.Inputs.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs
                    {
                        Name = "testname2",
                        Value = "testvalue2",
                    },
                },
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.kinesis.FirehoseDeliveryStream;
import com.pulumi.aws.kinesis.FirehoseDeliveryStreamArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamHttpEndpointConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamHttpEndpointConfigurationS3ConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationArgs;
import com.pulumi.aws.kinesis.inputs.FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var testStream = new FirehoseDeliveryStream("testStream", FirehoseDeliveryStreamArgs.builder()
            .name("kinesis-firehose-test-stream")
            .destination("http_endpoint")
            .httpEndpointConfiguration(FirehoseDeliveryStreamHttpEndpointConfigurationArgs.builder()
                .url("https://aws-api.newrelic.com/firehose/v1")
                .name("New Relic")
                .accessKey("my-key")
                .bufferingSize(15)
                .bufferingInterval(600)
                .roleArn(firehose.arn())
                .s3BackupMode("FailedDataOnly")
                .s3Configuration(FirehoseDeliveryStreamHttpEndpointConfigurationS3ConfigurationArgs.builder()
                    .roleArn(firehose.arn())
                    .bucketArn(bucket.arn())
                    .bufferingSize(10)
                    .bufferingInterval(400)
                    .compressionFormat("GZIP")
                    .build())
                .requestConfiguration(FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationArgs.builder()
                    .contentEncoding("GZIP")
                    .commonAttributes(                    
                        FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs.builder()
                            .name("testname")
                            .value("testvalue")
                            .build(),
                        FirehoseDeliveryStreamHttpEndpointConfigurationRequestConfigurationCommonAttributeArgs.builder()
                            .name("testname2")
                            .value("testvalue2")
                            .build())
                    .build())
                .build())
            .build());

    }
}
resources:
  testStream:
    type: aws:kinesis:FirehoseDeliveryStream
    name: test_stream
    properties:
      name: kinesis-firehose-test-stream
      destination: http_endpoint
      httpEndpointConfiguration:
        url: https://aws-api.newrelic.com/firehose/v1
        name: New Relic
        accessKey: my-key
        bufferingSize: 15
        bufferingInterval: 600
        roleArn: ${firehose.arn}
        s3BackupMode: FailedDataOnly
        s3Configuration:
          roleArn: ${firehose.arn}
          bucketArn: ${bucket.arn}
          bufferingSize: 10
          bufferingInterval: 400
          compressionFormat: GZIP
        requestConfiguration:
          contentEncoding: GZIP
          commonAttributes:
            - name: testname
              value: testvalue
            - name: testname2
              value: testvalue2

Firehose POSTs batches to the url, authenticating with the accessKey. The requestConfiguration lets you pass custom key-value attributes to the endpoint via commonAttributes and control contentEncoding. The s3BackupMode determines whether all data or only failed requests are backed up to S3. This configuration works with any HTTPS endpoint that implements Firehose's request and response format, including vendor endpoints such as New Relic and Datadog as well as custom webhooks.
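
If the destination is your own webhook rather than a vendor endpoint, the receiver has to speak Firehose's HTTP endpoint delivery format: each POST carries a JSON body with a requestId, a timestamp, and base64-encoded records, and the response must be JSON that echoes the requestId back. A rough sketch of such a receiver follows; it uses Node's built-in http module, and the port and logging are illustrative. A real endpoint must be served over HTTPS.

import * as http from "http";

// Minimal illustration of the request/response exchange Firehose expects from
// an HTTP endpoint destination.
const server = http.createServer((req, res) => {
    let body = "";
    req.on("data", chunk => (body += chunk));
    req.on("end", () => {
        // Body shape: { requestId, timestamp, records: [{ data: "<base64>" }] }
        const batch = JSON.parse(body);

        // The access key configured on the delivery stream arrives as a header;
        // verify it before trusting the payload.
        // const key = req.headers["x-amz-firehose-access-key"];

        for (const record of batch.records ?? []) {
            console.log("received:", Buffer.from(record.data, "base64").toString("utf8"));
        }

        // Firehose treats the batch as delivered when the response echoes the requestId.
        res.writeHead(200, { "Content-Type": "application/json" });
        res.end(JSON.stringify({ requestId: batch.requestId, timestamp: Date.now() }));
    });
});

server.listen(8080);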

Beyond these examples

These snippets focus on specific delivery stream features: S3 delivery with Lambda transformation and dynamic partitioning, data warehouse loading, and search indexing and observability integration. They’re intentionally minimal rather than full data pipeline implementations.

The examples may reference pre-existing infrastructure such as IAM roles with appropriate permissions, S3 buckets, Lambda functions, Redshift clusters, OpenSearch domains, and VPC infrastructure for private endpoint access. They focus on configuring the delivery stream rather than provisioning everything around it.

To keep things focused, common delivery stream patterns are omitted, including:

  • VPC configuration for private endpoint access (vpcConfig)
  • CloudWatch logging and monitoring (cloudwatchLoggingOptions)
  • Retry and buffering tuning (retryOptions, bufferingSize, bufferingInterval)
  • Server-side encryption (serverSideEncryption)
  • Source configuration (kinesisSourceConfiguration, mskSourceConfiguration)
  • Iceberg and Snowflake destinations

These omissions are intentional: the goal is to illustrate how each delivery stream feature is wired, not provide drop-in data pipeline modules. See the Kinesis Firehose Delivery Stream resource reference for all available configuration options.

Let's create AWS Kinesis Firehose Delivery Streams

Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.


Frequently Asked Questions

Encryption & Sources
Why am I getting errors when enabling encryption with a Kinesis stream source?
Server-side encryption is incompatible with Kinesis stream sources. Don’t configure serverSideEncryption when using kinesisSourceConfiguration.
Can I change the source configuration after creating the stream?
No, both kinesisSourceConfiguration and mskSourceConfiguration are immutable and will force resource replacement if changed.
Destinations & Configuration
What destinations are available, and should I use s3 or extended_s3?
Available destinations include extended_s3, redshift, elasticsearch, opensearch, opensearchserverless, splunk, http_endpoint, snowflake, and iceberg. The s3 destination is deprecated and doesn’t support import; use extended_s3 instead.
Which destinations require an s3Configuration block?
The redshift and http_endpoint destinations require you to specify an s3Configuration block in addition to their destination-specific configuration.
VPC & Networking
How do I configure VPC access for Elasticsearch or OpenSearch destinations?
Configure vpcConfig with subnetIds, securityGroupIds, and roleArn. The IAM role must include EC2 permissions (ec2:DescribeVpcs, ec2:CreateNetworkInterface, ec2:DeleteNetworkInterface, etc.) and domain access (es:*). Use dependsOn to ensure the IAM policy exists before creating the stream.
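
A rough sketch of that wiring in TypeScript; the domain, role, subnets, security group, and policy attachment below are placeholders for your own resources:

import * as aws from "@pulumi/aws";

const vpcStream = new aws.kinesis.FirehoseDeliveryStream("vpc_stream", {
    name: "kinesis-firehose-vpc-stream",
    destination: "opensearch",
    opensearchConfiguration: {
        domainArn: domain.arn,     // pre-existing OpenSearch domain in the VPC
        roleArn: firehoseRole.arn, // role with es:* and the EC2 permissions listed above
        indexName: "test",
        s3Configuration: {
            roleArn: firehoseRole.arn,
            bucketArn: bucket.arn,
        },
        vpcConfig: {
            subnetIds: [subnetA.id, subnetB.id],
            securityGroupIds: [firehoseSecurityGroup.id],
            roleArn: firehoseRole.arn,
        },
    },
}, { dependsOn: [firehoseVpcPolicyAttachment] }); // ensure the IAM policy exists first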
Dynamic Partitioning
How many dynamic partitioning keys can I use, and how do I specify multiple keys?
You can use up to 50 dynamic partitioning keys. Specify multiple keys by comma-separating them in the MetadataExtractionQuery parameter, for example: {store_id:.store_id,customer_id:.customer_id}.
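
A sketch of how multiple keys fit into an extended_s3 stream; the prefix layout and the store_id/customer_id fields are illustrative, and the role and bucket are placeholders:

import * as aws from "@pulumi/aws";

const partitionedStream = new aws.kinesis.FirehoseDeliveryStream("partitioned_stream", {
    name: "kinesis-firehose-partitioned-stream",
    destination: "extended_s3",
    extendedS3Configuration: {
        roleArn: firehoseRole.arn,
        bucketArn: bucket.arn,
        bufferingSize: 64, // dynamic partitioning requires a buffer of at least 64 MB
        dynamicPartitioningConfiguration: {
            enabled: true,
        },
        // Each extracted key becomes available under the partitionKeyFromQuery namespace.
        prefix: "data/store_id=!{partitionKeyFromQuery:store_id}/customer_id=!{partitionKeyFromQuery:customer_id}/",
        errorOutputPrefix: "errors/!{firehose:error-output-type}/",
        processingConfiguration: {
            enabled: true,
            processors: [{
                type: "MetadataExtraction",
                parameters: [
                    {
                        parameterName: "MetadataExtractionQuery",
                        parameterValue: "{store_id:.store_id,customer_id:.customer_id}",
                    },
                    {
                        parameterName: "JsonParsingEngine",
                        parameterValue: "JQ-1.6",
                    },
                ],
            }],
        },
    },
});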
Immutability & Constraints
What properties can't I change after creating the stream?
The destination, name, kinesisSourceConfiguration, and mskSourceConfiguration properties are immutable and will force resource replacement if modified.
Why is my WAF logging stream failing to create?
When using Firehose for WAF logging, the stream name must be prefixed with aws-waf-logs-. Without this prefix, WAF integration will fail.
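
A minimal sketch of the naming requirement and how the stream is referenced from a WAFv2 logging configuration; the web ACL, role, and bucket are placeholders:

import * as aws from "@pulumi/aws";

const wafLogStream = new aws.kinesis.FirehoseDeliveryStream("waf_log_stream", {
    name: "aws-waf-logs-example", // WAF logging requires the aws-waf-logs- prefix
    destination: "extended_s3",
    extendedS3Configuration: {
        roleArn: firehoseRole.arn,
        bucketArn: bucket.arn,
    },
});

const wafLogging = new aws.wafv2.WebAclLoggingConfiguration("waf_logging", {
    resourceArn: webAcl.arn, // pre-existing WAFv2 web ACL
    logDestinationConfigs: [wafLogStream.arn],
});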

Using a different cloud?

Explore analytics guides for other cloud providers: