Deploy AWS Kinesis Data Analytics Applications

The aws:kinesisanalyticsv2/application:Application resource, part of the Pulumi AWS provider, defines Kinesis Data Analytics applications that run Apache Flink or SQL-based stream processing jobs. This guide focuses on three capabilities: Flink JAR deployment from S3, SQL-based stream transformation, and VPC networking for private resource access.

Applications require IAM execution roles with permissions to read from sources and write to destinations. They reference S3 buckets for code artifacts, Kinesis streams for input, and optionally VPC infrastructure for private resource access. The examples are intentionally small. Combine them with your own IAM roles, streams, and networking.

Stream processing workloads often use Apache Flink to transform data in real time without managing servers or clusters.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.s3.Bucket("example", {bucket: "example-flink-application"});
const exampleBucketObjectv2 = new aws.s3.BucketObjectv2("example", {
    bucket: example.id,
    key: "example-flink-application",
    source: new pulumi.asset.FileAsset("flink-app.jar"),
});
const exampleApplication = new aws.kinesisanalyticsv2.Application("example", {
    name: "example-flink-application",
    runtimeEnvironment: "FLINK-1_8",
    serviceExecutionRole: exampleAwsIamRole.arn,
    applicationConfiguration: {
        applicationCodeConfiguration: {
            codeContent: {
                s3ContentLocation: {
                    bucketArn: example.arn,
                    fileKey: exampleBucketObjectv2.key,
                },
            },
            codeContentType: "ZIPFILE",
        },
        environmentProperties: {
            propertyGroups: [
                {
                    propertyGroupId: "PROPERTY-GROUP-1",
                    propertyMap: {
                        Key1: "Value1",
                    },
                },
                {
                    propertyGroupId: "PROPERTY-GROUP-2",
                    propertyMap: {
                        KeyA: "ValueA",
                        KeyB: "ValueB",
                    },
                },
            ],
        },
        flinkApplicationConfiguration: {
            checkpointConfiguration: {
                configurationType: "DEFAULT",
            },
            monitoringConfiguration: {
                configurationType: "CUSTOM",
                logLevel: "DEBUG",
                metricsLevel: "TASK",
            },
            parallelismConfiguration: {
                autoScalingEnabled: true,
                configurationType: "CUSTOM",
                parallelism: 10,
                parallelismPerKpu: 4,
            },
        },
    },
    tags: {
        Environment: "test",
    },
});
import pulumi
import pulumi_aws as aws

example = aws.s3.Bucket("example", bucket="example-flink-application")
example_bucket_objectv2 = aws.s3.BucketObjectv2("example",
    bucket=example.id,
    key="example-flink-application",
    source=pulumi.FileAsset("flink-app.jar"))
example_application = aws.kinesisanalyticsv2.Application("example",
    name="example-flink-application",
    runtime_environment="FLINK-1_8",
    service_execution_role=example_aws_iam_role["arn"],
    application_configuration={
        "application_code_configuration": {
            "code_content": {
                "s3_content_location": {
                    "bucket_arn": example.arn,
                    "file_key": example_bucket_objectv2.key,
                },
            },
            "code_content_type": "ZIPFILE",
        },
        "environment_properties": {
            "property_groups": [
                {
                    "property_group_id": "PROPERTY-GROUP-1",
                    "property_map": {
                        "Key1": "Value1",
                    },
                },
                {
                    "property_group_id": "PROPERTY-GROUP-2",
                    "property_map": {
                        "KeyA": "ValueA",
                        "KeyB": "ValueB",
                    },
                },
            ],
        },
        "flink_application_configuration": {
            "checkpoint_configuration": {
                "configuration_type": "DEFAULT",
            },
            "monitoring_configuration": {
                "configuration_type": "CUSTOM",
                "log_level": "DEBUG",
                "metrics_level": "TASK",
            },
            "parallelism_configuration": {
                "auto_scaling_enabled": True,
                "configuration_type": "CUSTOM",
                "parallelism": 10,
                "parallelism_per_kpu": 4,
            },
        },
    },
    tags={
        "Environment": "test",
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesisanalyticsv2"
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/s3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		example, err := s3.NewBucket(ctx, "example", &s3.BucketArgs{
			Bucket: pulumi.String("example-flink-application"),
		})
		if err != nil {
			return err
		}
		exampleBucketObjectv2, err := s3.NewBucketObjectv2(ctx, "example", &s3.BucketObjectv2Args{
			Bucket: example.ID(),
			Key:    pulumi.String("example-flink-application"),
			Source: pulumi.NewFileAsset("flink-app.jar"),
		})
		if err != nil {
			return err
		}
		_, err = kinesisanalyticsv2.NewApplication(ctx, "example", &kinesisanalyticsv2.ApplicationArgs{
			Name:                 pulumi.String("example-flink-application"),
			RuntimeEnvironment:   pulumi.String("FLINK-1_8"),
			ServiceExecutionRole: pulumi.Any(exampleAwsIamRole.Arn),
			ApplicationConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationArgs{
				ApplicationCodeConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs{
					CodeContent: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs{
						S3ContentLocation: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs{
							BucketArn: example.Arn,
							FileKey:   exampleBucketObjectv2.Key,
						},
					},
					CodeContentType: pulumi.String("ZIPFILE"),
				},
				EnvironmentProperties: &kinesisanalyticsv2.ApplicationApplicationConfigurationEnvironmentPropertiesArgs{
					PropertyGroups: kinesisanalyticsv2.ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArray{
						&kinesisanalyticsv2.ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs{
							PropertyGroupId: pulumi.String("PROPERTY-GROUP-1"),
							PropertyMap: pulumi.StringMap{
								"Key1": pulumi.String("Value1"),
							},
						},
						&kinesisanalyticsv2.ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs{
							PropertyGroupId: pulumi.String("PROPERTY-GROUP-2"),
							PropertyMap: pulumi.StringMap{
								"KeyA": pulumi.String("ValueA"),
								"KeyB": pulumi.String("ValueB"),
							},
						},
					},
				},
				FlinkApplicationConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationFlinkApplicationConfigurationArgs{
					CheckpointConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationFlinkApplicationConfigurationCheckpointConfigurationArgs{
						ConfigurationType: pulumi.String("DEFAULT"),
					},
					MonitoringConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationFlinkApplicationConfigurationMonitoringConfigurationArgs{
						ConfigurationType: pulumi.String("CUSTOM"),
						LogLevel:          pulumi.String("DEBUG"),
						MetricsLevel:      pulumi.String("TASK"),
					},
					ParallelismConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationFlinkApplicationConfigurationParallelismConfigurationArgs{
						AutoScalingEnabled: pulumi.Bool(true),
						ConfigurationType:  pulumi.String("CUSTOM"),
						Parallelism:        pulumi.Int(10),
						ParallelismPerKpu:  pulumi.Int(4),
					},
				},
			},
			Tags: pulumi.StringMap{
				"Environment": pulumi.String("test"),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.S3.Bucket("example", new()
    {
        BucketName = "example-flink-application",
    });

    var exampleBucketObjectv2 = new Aws.S3.BucketObjectv2("example", new()
    {
        Bucket = example.Id,
        Key = "example-flink-application",
        Source = new FileAsset("flink-app.jar"),
    });

    var exampleApplication = new Aws.KinesisAnalyticsV2.Application("example", new()
    {
        Name = "example-flink-application",
        RuntimeEnvironment = "FLINK-1_8",
        ServiceExecutionRole = exampleAwsIamRole.Arn,
        ApplicationConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationArgs
        {
            ApplicationCodeConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs
            {
                CodeContent = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs
                {
                    S3ContentLocation = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs
                    {
                        BucketArn = example.Arn,
                        FileKey = exampleBucketObjectv2.Key,
                    },
                },
                CodeContentType = "ZIPFILE",
            },
            EnvironmentProperties = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationEnvironmentPropertiesArgs
            {
                PropertyGroups = new[]
                {
                    new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs
                    {
                        PropertyGroupId = "PROPERTY-GROUP-1",
                        PropertyMap = 
                        {
                            { "Key1", "Value1" },
                        },
                    },
                    new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs
                    {
                        PropertyGroupId = "PROPERTY-GROUP-2",
                        PropertyMap = 
                        {
                            { "KeyA", "ValueA" },
                            { "KeyB", "ValueB" },
                        },
                    },
                },
            },
            FlinkApplicationConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationArgs
            {
                CheckpointConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationCheckpointConfigurationArgs
                {
                    ConfigurationType = "DEFAULT",
                },
                MonitoringConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationMonitoringConfigurationArgs
                {
                    ConfigurationType = "CUSTOM",
                    LogLevel = "DEBUG",
                    MetricsLevel = "TASK",
                },
                ParallelismConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationParallelismConfigurationArgs
                {
                    AutoScalingEnabled = true,
                    ConfigurationType = "CUSTOM",
                    Parallelism = 10,
                    ParallelismPerKpu = 4,
                },
            },
        },
        Tags = 
        {
            { "Environment", "test" },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.s3.Bucket;
import com.pulumi.aws.s3.BucketArgs;
import com.pulumi.aws.s3.BucketObjectv2;
import com.pulumi.aws.s3.BucketObjectv2Args;
import com.pulumi.aws.kinesisanalyticsv2.Application;
import com.pulumi.aws.kinesisanalyticsv2.ApplicationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationEnvironmentPropertiesArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationCheckpointConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationMonitoringConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationParallelismConfigurationArgs;
import com.pulumi.asset.FileAsset;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Bucket("example", BucketArgs.builder()
            .bucket("example-flink-application")
            .build());

        var exampleBucketObjectv2 = new BucketObjectv2("exampleBucketObjectv2", BucketObjectv2Args.builder()
            .bucket(example.id())
            .key("example-flink-application")
            .source(new FileAsset("flink-app.jar"))
            .build());

        var exampleApplication = new Application("exampleApplication", ApplicationArgs.builder()
            .name("example-flink-application")
            .runtimeEnvironment("FLINK-1_8")
            .serviceExecutionRole(exampleAwsIamRole.arn())
            .applicationConfiguration(ApplicationApplicationConfigurationArgs.builder()
                .applicationCodeConfiguration(ApplicationApplicationConfigurationApplicationCodeConfigurationArgs.builder()
                    .codeContent(ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs.builder()
                        .s3ContentLocation(ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs.builder()
                            .bucketArn(example.arn())
                            .fileKey(exampleBucketObjectv2.key())
                            .build())
                        .build())
                    .codeContentType("ZIPFILE")
                    .build())
                .environmentProperties(ApplicationApplicationConfigurationEnvironmentPropertiesArgs.builder()
                    .propertyGroups(                    
                        ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs.builder()
                            .propertyGroupId("PROPERTY-GROUP-1")
                            .propertyMap(Map.of("Key1", "Value1"))
                            .build(),
                        ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs.builder()
                            .propertyGroupId("PROPERTY-GROUP-2")
                            .propertyMap(Map.ofEntries(
                                Map.entry("KeyA", "ValueA"),
                                Map.entry("KeyB", "ValueB")
                            ))
                            .build())
                    .build())
                .flinkApplicationConfiguration(ApplicationApplicationConfigurationFlinkApplicationConfigurationArgs.builder()
                    .checkpointConfiguration(ApplicationApplicationConfigurationFlinkApplicationConfigurationCheckpointConfigurationArgs.builder()
                        .configurationType("DEFAULT")
                        .build())
                    .monitoringConfiguration(ApplicationApplicationConfigurationFlinkApplicationConfigurationMonitoringConfigurationArgs.builder()
                        .configurationType("CUSTOM")
                        .logLevel("DEBUG")
                        .metricsLevel("TASK")
                        .build())
                    .parallelismConfiguration(ApplicationApplicationConfigurationFlinkApplicationConfigurationParallelismConfigurationArgs.builder()
                        .autoScalingEnabled(true)
                        .configurationType("CUSTOM")
                        .parallelism(10)
                        .parallelismPerKpu(4)
                        .build())
                    .build())
                .build())
            .tags(Map.of("Environment", "test"))
            .build());

    }
}
resources:
  example:
    type: aws:s3:Bucket
    properties:
      bucket: example-flink-application
  exampleBucketObjectv2:
    type: aws:s3:BucketObjectv2
    name: example
    properties:
      bucket: ${example.id}
      key: example-flink-application
      source:
        fn::FileAsset: flink-app.jar
  exampleApplication:
    type: aws:kinesisanalyticsv2:Application
    name: example
    properties:
      name: example-flink-application
      runtimeEnvironment: FLINK-1_8
      serviceExecutionRole: ${exampleAwsIamRole.arn}
      applicationConfiguration:
        applicationCodeConfiguration:
          codeContent:
            s3ContentLocation:
              bucketArn: ${example.arn}
              fileKey: ${exampleBucketObjectv2.key}
          codeContentType: ZIPFILE
        environmentProperties:
          propertyGroups:
            - propertyGroupId: PROPERTY-GROUP-1
              propertyMap:
                Key1: Value1
            - propertyGroupId: PROPERTY-GROUP-2
              propertyMap:
                KeyA: ValueA
                KeyB: ValueB
        flinkApplicationConfiguration:
          checkpointConfiguration:
            configurationType: DEFAULT
          monitoringConfiguration:
            configurationType: CUSTOM
            logLevel: DEBUG
            metricsLevel: TASK
          parallelismConfiguration:
            autoScalingEnabled: true
            configurationType: CUSTOM
            parallelism: 10
            parallelismPerKpu: 4
      tags:
        Environment: test

The runtimeEnvironment specifies the Flink version. The applicationCodeConfiguration points to your JAR in S3 via s3ContentLocation. The flinkApplicationConfiguration controls runtime behavior: parallelismConfiguration sets how many parallel tasks run, monitoringConfiguration defines logging verbosity, and environmentProperties pass configuration to your application code.

Process streams with SQL queries

Teams familiar with SQL can analyze streaming data without learning a programming framework. SQL applications read from Kinesis streams, apply transformations, and write to multiple destinations.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.cloudwatch.LogGroup("example", {name: "example-sql-application"});
const exampleLogStream = new aws.cloudwatch.LogStream("example", {
    name: "example-sql-application",
    logGroupName: example.name,
});
const exampleApplication = new aws.kinesisanalyticsv2.Application("example", {
    name: "example-sql-application",
    runtimeEnvironment: "SQL-1_0",
    serviceExecutionRole: exampleAwsIamRole.arn,
    applicationConfiguration: {
        applicationCodeConfiguration: {
            codeContent: {
                textContent: "SELECT 1;\n",
            },
            codeContentType: "PLAINTEXT",
        },
        sqlApplicationConfiguration: {
            input: {
                namePrefix: "PREFIX_1",
                inputParallelism: {
                    count: 3,
                },
                inputSchema: {
                    recordColumns: [
                        {
                            name: "COLUMN_1",
                            sqlType: "VARCHAR(8)",
                            mapping: "MAPPING-1",
                        },
                        {
                            name: "COLUMN_2",
                            sqlType: "DOUBLE",
                        },
                    ],
                    recordEncoding: "UTF-8",
                    recordFormat: {
                        recordFormatType: "CSV",
                        mappingParameters: {
                            csvMappingParameters: {
                                recordColumnDelimiter: ",",
                                recordRowDelimiter: "\n",
                            },
                        },
                    },
                },
                kinesisStreamsInput: {
                    resourceArn: exampleAwsKinesisStream.arn,
                },
            },
            outputs: [
                {
                    name: "OUTPUT_1",
                    destinationSchema: {
                        recordFormatType: "JSON",
                    },
                    lambdaOutput: {
                        resourceArn: exampleAwsLambdaFunction.arn,
                    },
                },
                {
                    name: "OUTPUT_2",
                    destinationSchema: {
                        recordFormatType: "CSV",
                    },
                    kinesisFirehoseOutput: {
                        resourceArn: exampleAwsKinesisFirehoseDeliveryStream.arn,
                    },
                },
            ],
            referenceDataSource: {
                tableName: "TABLE-1",
                referenceSchema: {
                    recordColumns: [{
                        name: "COLUMN_1",
                        sqlType: "INTEGER",
                    }],
                    recordFormat: {
                        recordFormatType: "JSON",
                        mappingParameters: {
                            jsonMappingParameters: {
                                recordRowPath: "$",
                            },
                        },
                    },
                },
                s3ReferenceDataSource: {
                    bucketArn: exampleAwsS3Bucket.arn,
                    fileKey: "KEY-1",
                },
            },
        },
    },
    cloudwatchLoggingOptions: {
        logStreamArn: exampleLogStream.arn,
    },
});
import pulumi
import pulumi_aws as aws

example = aws.cloudwatch.LogGroup("example", name="example-sql-application")
example_log_stream = aws.cloudwatch.LogStream("example",
    name="example-sql-application",
    log_group_name=example.name)
example_application = aws.kinesisanalyticsv2.Application("example",
    name="example-sql-application",
    runtime_environment="SQL-1_0",
    service_execution_role=example_aws_iam_role["arn"],
    application_configuration={
        "application_code_configuration": {
            "code_content": {
                "text_content": "SELECT 1;\n",
            },
            "code_content_type": "PLAINTEXT",
        },
        "sql_application_configuration": {
            "input": {
                "name_prefix": "PREFIX_1",
                "input_parallelism": {
                    "count": 3,
                },
                "input_schema": {
                    "record_columns": [
                        {
                            "name": "COLUMN_1",
                            "sql_type": "VARCHAR(8)",
                            "mapping": "MAPPING-1",
                        },
                        {
                            "name": "COLUMN_2",
                            "sql_type": "DOUBLE",
                        },
                    ],
                    "record_encoding": "UTF-8",
                    "record_format": {
                        "record_format_type": "CSV",
                        "mapping_parameters": {
                            "csv_mapping_parameters": {
                                "record_column_delimiter": ",",
                                "record_row_delimiter": "\n",
                            },
                        },
                    },
                },
                "kinesis_streams_input": {
                    "resource_arn": example_aws_kinesis_stream["arn"],
                },
            },
            "outputs": [
                {
                    "name": "OUTPUT_1",
                    "destination_schema": {
                        "record_format_type": "JSON",
                    },
                    "lambda_output": {
                        "resource_arn": example_aws_lambda_function["arn"],
                    },
                },
                {
                    "name": "OUTPUT_2",
                    "destination_schema": {
                        "record_format_type": "CSV",
                    },
                    "kinesis_firehose_output": {
                        "resource_arn": example_aws_kinesis_firehose_delivery_stream["arn"],
                    },
                },
            ],
            "reference_data_source": {
                "table_name": "TABLE-1",
                "reference_schema": {
                    "record_columns": [{
                        "name": "COLUMN_1",
                        "sql_type": "INTEGER",
                    }],
                    "record_format": {
                        "record_format_type": "JSON",
                        "mapping_parameters": {
                            "json_mapping_parameters": {
                                "record_row_path": "$",
                            },
                        },
                    },
                },
                "s3_reference_data_source": {
                    "bucket_arn": example_aws_s3_bucket["arn"],
                    "file_key": "KEY-1",
                },
            },
        },
    },
    cloudwatch_logging_options={
        "log_stream_arn": example_log_stream.arn,
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/cloudwatch"
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesisanalyticsv2"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		example, err := cloudwatch.NewLogGroup(ctx, "example", &cloudwatch.LogGroupArgs{
			Name: pulumi.String("example-sql-application"),
		})
		if err != nil {
			return err
		}
		exampleLogStream, err := cloudwatch.NewLogStream(ctx, "example", &cloudwatch.LogStreamArgs{
			Name:         pulumi.String("example-sql-application"),
			LogGroupName: example.Name,
		})
		if err != nil {
			return err
		}
		_, err = kinesisanalyticsv2.NewApplication(ctx, "example", &kinesisanalyticsv2.ApplicationArgs{
			Name:                 pulumi.String("example-sql-application"),
			RuntimeEnvironment:   pulumi.String("SQL-1_0"),
			ServiceExecutionRole: pulumi.Any(exampleAwsIamRole.Arn),
			ApplicationConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationArgs{
				ApplicationCodeConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs{
					CodeContent: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs{
						TextContent: pulumi.String("SELECT 1;\n"),
					},
					CodeContentType: pulumi.String("PLAINTEXT"),
				},
				SqlApplicationConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationArgs{
					Input: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputTypeArgs{
						NamePrefix: pulumi.String("PREFIX_1"),
						InputParallelism: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputParallelismArgs{
							Count: pulumi.Int(3),
						},
						InputSchema: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaArgs{
							RecordColumns: kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArray{
								&kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs{
									Name:    pulumi.String("COLUMN_1"),
									SqlType: pulumi.String("VARCHAR(8)"),
									Mapping: pulumi.String("MAPPING-1"),
								},
								&kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs{
									Name:    pulumi.String("COLUMN_2"),
									SqlType: pulumi.String("DOUBLE"),
								},
							},
							RecordEncoding: pulumi.String("UTF-8"),
							RecordFormat: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatArgs{
								RecordFormatType: pulumi.String("CSV"),
								MappingParameters: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersArgs{
									CsvMappingParameters: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersCsvMappingParametersArgs{
										RecordColumnDelimiter: pulumi.String(","),
										RecordRowDelimiter:    pulumi.String("\n"),
									},
								},
							},
						},
						KinesisStreamsInput: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputKinesisStreamsInputArgs{
							ResourceArn: pulumi.Any(exampleAwsKinesisStream.Arn),
						},
					},
					Outputs: kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputTypeArray{
						&kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputTypeArgs{
							Name: pulumi.String("OUTPUT_1"),
							DestinationSchema: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs{
								RecordFormatType: pulumi.String("JSON"),
							},
							LambdaOutput: kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputLambdaOutputArgs{
								ResourceArn: pulumi.Any(exampleAwsLambdaFunction.Arn),
							},
						},
						&kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputTypeArgs{
							Name: pulumi.String("OUTPUT_2"),
							DestinationSchema: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs{
								RecordFormatType: pulumi.String("CSV"),
							},
							KinesisFirehoseOutput: kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputKinesisFirehoseOutputArgs{
								ResourceArn: pulumi.Any(exampleAwsKinesisFirehoseDeliveryStream.Arn),
							},
						},
					},
					ReferenceDataSource: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceArgs{
						TableName: pulumi.String("TABLE-1"),
						ReferenceSchema: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaArgs{
							RecordColumns: kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordColumnArray{
								&kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordColumnArgs{
									Name:    pulumi.String("COLUMN_1"),
									SqlType: pulumi.String("INTEGER"),
								},
							},
							RecordFormat: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatArgs{
								RecordFormatType: pulumi.String("JSON"),
								MappingParameters: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersArgs{
									JsonMappingParameters: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersJsonMappingParametersArgs{
										RecordRowPath: pulumi.String("$"),
									},
								},
							},
						},
						S3ReferenceDataSource: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceS3ReferenceDataSourceArgs{
							BucketArn: pulumi.Any(exampleAwsS3Bucket.Arn),
							FileKey:   pulumi.String("KEY-1"),
						},
					},
				},
			},
			CloudwatchLoggingOptions: &kinesisanalyticsv2.ApplicationCloudwatchLoggingOptionsArgs{
				LogStreamArn: exampleLogStream.Arn,
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.CloudWatch.LogGroup("example", new()
    {
        Name = "example-sql-application",
    });

    var exampleLogStream = new Aws.CloudWatch.LogStream("example", new()
    {
        Name = "example-sql-application",
        LogGroupName = example.Name,
    });

    var exampleApplication = new Aws.KinesisAnalyticsV2.Application("example", new()
    {
        Name = "example-sql-application",
        RuntimeEnvironment = "SQL-1_0",
        ServiceExecutionRole = exampleAwsIamRole.Arn,
        ApplicationConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationArgs
        {
            ApplicationCodeConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs
            {
                CodeContent = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs
                {
                    TextContent = @"SELECT 1;
",
                },
                CodeContentType = "PLAINTEXT",
            },
            SqlApplicationConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationArgs
            {
                Input = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputArgs
                {
                    NamePrefix = "PREFIX_1",
                    InputParallelism = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputParallelismArgs
                    {
                        Count = 3,
                    },
                    InputSchema = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaArgs
                    {
                        RecordColumns = new[]
                        {
                            new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs
                            {
                                Name = "COLUMN_1",
                                SqlType = "VARCHAR(8)",
                                Mapping = "MAPPING-1",
                            },
                            new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs
                            {
                                Name = "COLUMN_2",
                                SqlType = "DOUBLE",
                            },
                        },
                        RecordEncoding = "UTF-8",
                        RecordFormat = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatArgs
                        {
                            RecordFormatType = "CSV",
                            MappingParameters = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersArgs
                            {
                                CsvMappingParameters = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersCsvMappingParametersArgs
                                {
                                    RecordColumnDelimiter = ",",
                                    RecordRowDelimiter = @"
",
                                },
                            },
                        },
                    },
                    KinesisStreamsInput = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputKinesisStreamsInputArgs
                    {
                        ResourceArn = exampleAwsKinesisStream.Arn,
                    },
                },
                Outputs = new[]
                {
                    new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputArgs
                    {
                        Name = "OUTPUT_1",
                        DestinationSchema = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs
                        {
                            RecordFormatType = "JSON",
                        },
                        LambdaOutput = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputLambdaOutputArgs
                        {
                            ResourceArn = exampleAwsLambdaFunction.Arn,
                        },
                    },
                    new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputArgs
                    {
                        Name = "OUTPUT_2",
                        DestinationSchema = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs
                        {
                            RecordFormatType = "CSV",
                        },
                        KinesisFirehoseOutput = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputKinesisFirehoseOutputArgs
                        {
                            ResourceArn = exampleAwsKinesisFirehoseDeliveryStream.Arn,
                        },
                    },
                },
                ReferenceDataSource = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceArgs
                {
                    TableName = "TABLE-1",
                    ReferenceSchema = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaArgs
                    {
                        RecordColumns = new[]
                        {
                            new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordColumnArgs
                            {
                                Name = "COLUMN_1",
                                SqlType = "INTEGER",
                            },
                        },
                        RecordFormat = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatArgs
                        {
                            RecordFormatType = "JSON",
                            MappingParameters = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersArgs
                            {
                                JsonMappingParameters = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersJsonMappingParametersArgs
                                {
                                    RecordRowPath = "$",
                                },
                            },
                        },
                    },
                    S3ReferenceDataSource = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceS3ReferenceDataSourceArgs
                    {
                        BucketArn = exampleAwsS3Bucket.Arn,
                        FileKey = "KEY-1",
                    },
                },
            },
        },
        CloudwatchLoggingOptions = new Aws.KinesisAnalyticsV2.Inputs.ApplicationCloudwatchLoggingOptionsArgs
        {
            LogStreamArn = exampleLogStream.Arn,
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.cloudwatch.LogGroup;
import com.pulumi.aws.cloudwatch.LogGroupArgs;
import com.pulumi.aws.cloudwatch.LogStream;
import com.pulumi.aws.cloudwatch.LogStreamArgs;
import com.pulumi.aws.kinesisanalyticsv2.Application;
import com.pulumi.aws.kinesisanalyticsv2.ApplicationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputParallelismArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersCsvMappingParametersArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputKinesisStreamsInputArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersJsonMappingParametersArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceS3ReferenceDataSourceArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationCloudwatchLoggingOptionsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new LogGroup("example", LogGroupArgs.builder()
            .name("example-sql-application")
            .build());

        var exampleLogStream = new LogStream("exampleLogStream", LogStreamArgs.builder()
            .name("example-sql-application")
            .logGroupName(example.name())
            .build());

        var exampleApplication = new Application("exampleApplication", ApplicationArgs.builder()
            .name("example-sql-application")
            .runtimeEnvironment("SQL-1_0")
            .serviceExecutionRole(exampleAwsIamRole.arn())
            .applicationConfiguration(ApplicationApplicationConfigurationArgs.builder()
                .applicationCodeConfiguration(ApplicationApplicationConfigurationApplicationCodeConfigurationArgs.builder()
                    .codeContent(ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs.builder()
                        .textContent("""
SELECT 1;
                        """)
                        .build())
                    .codeContentType("PLAINTEXT")
                    .build())
                .sqlApplicationConfiguration(ApplicationApplicationConfigurationSqlApplicationConfigurationArgs.builder()
                    .input(ApplicationApplicationConfigurationSqlApplicationConfigurationInputArgs.builder()
                        .namePrefix("PREFIX_1")
                        .inputParallelism(ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputParallelismArgs.builder()
                            .count(3)
                            .build())
                        .inputSchema(ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaArgs.builder()
                            .recordColumns(                            
                                ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs.builder()
                                    .name("COLUMN_1")
                                    .sqlType("VARCHAR(8)")
                                    .mapping("MAPPING-1")
                                    .build(),
                                ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs.builder()
                                    .name("COLUMN_2")
                                    .sqlType("DOUBLE")
                                    .build())
                            .recordEncoding("UTF-8")
                            .recordFormat(ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatArgs.builder()
                                .recordFormatType("CSV")
                                .mappingParameters(ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersArgs.builder()
                                    .csvMappingParameters(ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersCsvMappingParametersArgs.builder()
                                        .recordColumnDelimiter(",")
                                        .recordRowDelimiter("""

                                        """)
                                        .build())
                                    .build())
                                .build())
                            .build())
                        .kinesisStreamsInput(ApplicationApplicationConfigurationSqlApplicationConfigurationInputKinesisStreamsInputArgs.builder()
                            .resourceArn(exampleAwsKinesisStream.arn())
                            .build())
                        .build())
                    .outputs(                    
                        ApplicationApplicationConfigurationSqlApplicationConfigurationOutputArgs.builder()
                            .name("OUTPUT_1")
                            .destinationSchema(ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs.builder()
                                .recordFormatType("JSON")
                                .build())
                            .lambdaOutput(ApplicationApplicationConfigurationSqlApplicationConfigurationOutputLambdaOutputArgs.builder()
                                .resourceArn(exampleAwsLambdaFunction.arn())
                                .build())
                            .build(),
                        ApplicationApplicationConfigurationSqlApplicationConfigurationOutputArgs.builder()
                            .name("OUTPUT_2")
                            .destinationSchema(ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs.builder()
                                .recordFormatType("CSV")
                                .build())
                            .kinesisFirehoseOutput(ApplicationApplicationConfigurationSqlApplicationConfigurationOutputKinesisFirehoseOutputArgs.builder()
                                .resourceArn(exampleAwsKinesisFirehoseDeliveryStream.arn())
                                .build())
                            .build())
                    .referenceDataSource(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceArgs.builder()
                        .tableName("TABLE-1")
                        .referenceSchema(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaArgs.builder()
                            .recordColumns(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordColumnArgs.builder()
                                .name("COLUMN_1")
                                .sqlType("INTEGER")
                                .build())
                            .recordFormat(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatArgs.builder()
                                .recordFormatType("JSON")
                                .mappingParameters(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersArgs.builder()
                                    .jsonMappingParameters(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersJsonMappingParametersArgs.builder()
                                        .recordRowPath("$")
                                        .build())
                                    .build())
                                .build())
                            .build())
                        .s3ReferenceDataSource(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceS3ReferenceDataSourceArgs.builder()
                            .bucketArn(exampleAwsS3Bucket.arn())
                            .fileKey("KEY-1")
                            .build())
                        .build())
                    .build())
                .build())
            .cloudwatchLoggingOptions(ApplicationCloudwatchLoggingOptionsArgs.builder()
                .logStreamArn(exampleLogStream.arn())
                .build())
            .build());

    }
}
resources:
  example:
    type: aws:cloudwatch:LogGroup
    properties:
      name: example-sql-application
  exampleLogStream:
    type: aws:cloudwatch:LogStream
    name: example
    properties:
      name: example-sql-application
      logGroupName: ${example.name}
  exampleApplication:
    type: aws:kinesisanalyticsv2:Application
    name: example
    properties:
      name: example-sql-application
      runtimeEnvironment: SQL-1_0
      serviceExecutionRole: ${exampleAwsIamRole.arn}
      applicationConfiguration:
        applicationCodeConfiguration:
          codeContent:
            textContent: |
              SELECT 1;              
          codeContentType: PLAINTEXT
        sqlApplicationConfiguration:
          input:
            namePrefix: PREFIX_1
            inputParallelism:
              count: 3
            inputSchema:
              recordColumns:
                - name: COLUMN_1
                  sqlType: VARCHAR(8)
                  mapping: MAPPING-1
                - name: COLUMN_2
                  sqlType: DOUBLE
              recordEncoding: UTF-8
              recordFormat:
                recordFormatType: CSV
                mappingParameters:
                  csvMappingParameters:
                    recordColumnDelimiter: ','
                    recordRowDelimiter: |2+
            kinesisStreamsInput:
              resourceArn: ${exampleAwsKinesisStream.arn}
          outputs:
            - name: OUTPUT_1
              destinationSchema:
                recordFormatType: JSON
              lambdaOutput:
                resourceArn: ${exampleAwsLambdaFunction.arn}
            - name: OUTPUT_2
              destinationSchema:
                recordFormatType: CSV
              kinesisFirehoseOutput:
                resourceArn: ${exampleAwsKinesisFirehoseDeliveryStream.arn}
          referenceDataSource:
            tableName: TABLE-1
            referenceSchema:
              recordColumns:
                - name: COLUMN_1
                  sqlType: INTEGER
              recordFormat:
                recordFormatType: JSON
                mappingParameters:
                  jsonMappingParameters:
                    recordRowPath: $
            s3ReferenceDataSource:
              bucketArn: ${exampleAwsS3Bucket.arn}
              fileKey: KEY-1
      cloudwatchLoggingOptions:
        logStreamArn: ${exampleLogStream.arn}

The sqlApplicationConfiguration defines the entire data flow. The input block specifies the Kinesis stream source and schema (column names, types, delimiters). The outputs array routes transformed data to Lambda functions or Firehose delivery streams. The referenceDataSource joins streaming data with static lookup tables stored in S3.

Connect to private resources in a VPC

Applications that need to access RDS databases, ElastiCache clusters, or internal APIs require VPC configuration to join private subnets.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.s3.Bucket("example", {bucket: "example-flink-application"});
const exampleBucketObjectv2 = new aws.s3.BucketObjectv2("example", {
    bucket: example.id,
    key: "example-flink-application",
    source: new pulumi.asset.FileAsset("flink-app.jar"),
});
const exampleApplication = new aws.kinesisanalyticsv2.Application("example", {
    name: "example-flink-application",
    runtimeEnvironment: "FLINK-1_8",
    serviceExecutionRole: exampleAwsIamRole.arn,
    applicationConfiguration: {
        applicationCodeConfiguration: {
            codeContent: {
                s3ContentLocation: {
                    bucketArn: example.arn,
                    fileKey: exampleBucketObjectv2.key,
                },
            },
            codeContentType: "ZIPFILE",
        },
        vpcConfiguration: {
            securityGroupIds: [
                exampleAwsSecurityGroup[0].id,
                exampleAwsSecurityGroup[1].id,
            ],
            subnetIds: [exampleAwsSubnet.id],
        },
    },
});
import pulumi
import pulumi_aws as aws

example = aws.s3.Bucket("example", bucket="example-flink-application")
example_bucket_objectv2 = aws.s3.BucketObjectv2("example",
    bucket=example.id,
    key="example-flink-application",
    source=pulumi.FileAsset("flink-app.jar"))
example_application = aws.kinesisanalyticsv2.Application("example",
    name="example-flink-application",
    runtime_environment="FLINK-1_8",
    service_execution_role=example_aws_iam_role["arn"],
    application_configuration={
        "application_code_configuration": {
            "code_content": {
                "s3_content_location": {
                    "bucket_arn": example.arn,
                    "file_key": example_bucket_objectv2.key,
                },
            },
            "code_content_type": "ZIPFILE",
        },
        "vpc_configuration": {
            "security_group_ids": [
                example_aws_security_group[0]["id"],
                example_aws_security_group[1]["id"],
            ],
            "subnet_ids": [example_aws_subnet["id"]],
        },
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesisanalyticsv2"
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/s3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		example, err := s3.NewBucket(ctx, "example", &s3.BucketArgs{
			Bucket: pulumi.String("example-flink-application"),
		})
		if err != nil {
			return err
		}
		exampleBucketObjectv2, err := s3.NewBucketObjectv2(ctx, "example", &s3.BucketObjectv2Args{
			Bucket: example.ID(),
			Key:    pulumi.String("example-flink-application"),
			Source: pulumi.NewFileAsset("flink-app.jar"),
		})
		if err != nil {
			return err
		}
		_, err = kinesisanalyticsv2.NewApplication(ctx, "example", &kinesisanalyticsv2.ApplicationArgs{
			Name:                 pulumi.String("example-flink-application"),
			RuntimeEnvironment:   pulumi.String("FLINK-1_8"),
			ServiceExecutionRole: pulumi.Any(exampleAwsIamRole.Arn),
			ApplicationConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationArgs{
				ApplicationCodeConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs{
					CodeContent: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs{
						S3ContentLocation: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs{
							BucketArn: example.Arn,
							FileKey:   exampleBucketObjectv2.Key,
						},
					},
					CodeContentType: pulumi.String("ZIPFILE"),
				},
				VpcConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationVpcConfigurationArgs{
					SecurityGroupIds: pulumi.StringArray{
						exampleAwsSecurityGroup[0].Id,
						exampleAwsSecurityGroup[1].Id,
					},
					SubnetIds: pulumi.StringArray{
						exampleAwsSubnet.Id,
					},
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.S3.Bucket("example", new()
    {
        BucketName = "example-flink-application",
    });

    var exampleBucketObjectv2 = new Aws.S3.BucketObjectv2("example", new()
    {
        Bucket = example.Id,
        Key = "example-flink-application",
        Source = new FileAsset("flink-app.jar"),
    });

    var exampleApplication = new Aws.KinesisAnalyticsV2.Application("example", new()
    {
        Name = "example-flink-application",
        RuntimeEnvironment = "FLINK-1_8",
        ServiceExecutionRole = exampleAwsIamRole.Arn,
        ApplicationConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationArgs
        {
            ApplicationCodeConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs
            {
                CodeContent = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs
                {
                    S3ContentLocation = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs
                    {
                        BucketArn = example.Arn,
                        FileKey = exampleBucketObjectv2.Key,
                    },
                },
                CodeContentType = "ZIPFILE",
            },
            VpcConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationVpcConfigurationArgs
            {
                SecurityGroupIds = new[]
                {
                    exampleAwsSecurityGroup[0].Id,
                    exampleAwsSecurityGroup[1].Id,
                },
                SubnetIds = new[]
                {
                    exampleAwsSubnet.Id,
                },
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.s3.Bucket;
import com.pulumi.aws.s3.BucketArgs;
import com.pulumi.aws.s3.BucketObjectv2;
import com.pulumi.aws.s3.BucketObjectv2Args;
import com.pulumi.aws.kinesisanalyticsv2.Application;
import com.pulumi.aws.kinesisanalyticsv2.ApplicationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationVpcConfigurationArgs;
import com.pulumi.asset.FileAsset;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Bucket("example", BucketArgs.builder()
            .bucket("example-flink-application")
            .build());

        var exampleBucketObjectv2 = new BucketObjectv2("exampleBucketObjectv2", BucketObjectv2Args.builder()
            .bucket(example.id())
            .key("example-flink-application")
            .source(new FileAsset("flink-app.jar"))
            .build());

        var exampleApplication = new Application("exampleApplication", ApplicationArgs.builder()
            .name("example-flink-application")
            .runtimeEnvironment("FLINK-1_8")
            .serviceExecutionRole(exampleAwsIamRole.arn())
            .applicationConfiguration(ApplicationApplicationConfigurationArgs.builder()
                .applicationCodeConfiguration(ApplicationApplicationConfigurationApplicationCodeConfigurationArgs.builder()
                    .codeContent(ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs.builder()
                        .s3ContentLocation(ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs.builder()
                            .bucketArn(example.arn())
                            .fileKey(exampleBucketObjectv2.key())
                            .build())
                        .build())
                    .codeContentType("ZIPFILE")
                    .build())
                .vpcConfiguration(ApplicationApplicationConfigurationVpcConfigurationArgs.builder()
                    .securityGroupIds(                    
                        exampleAwsSecurityGroup[0].id(),
                        exampleAwsSecurityGroup[1].id())
                    .subnetIds(exampleAwsSubnet.id())
                    .build())
                .build())
            .build());

    }
}
resources:
  example:
    type: aws:s3:Bucket
    properties:
      bucket: example-flink-application
  exampleBucketObjectv2:
    type: aws:s3:BucketObjectv2
    name: example
    properties:
      bucket: ${example.id}
      key: example-flink-application
      source:
        fn::FileAsset: flink-app.jar
  exampleApplication:
    type: aws:kinesisanalyticsv2:Application
    name: example
    properties:
      name: example-flink-application
      runtimeEnvironment: FLINK-1_8
      serviceExecutionRole: ${exampleAwsIamRole.arn}
      applicationConfiguration:
        applicationCodeConfiguration:
          codeContent:
            s3ContentLocation:
              bucketArn: ${example.arn}
              fileKey: ${exampleBucketObjectv2.key}
          codeContentType: ZIPFILE
        vpcConfiguration:
          securityGroupIds:
            - ${exampleAwsSecurityGroup[0].id}
            - ${exampleAwsSecurityGroup[1].id}
          subnetIds:
            - ${exampleAwsSubnet.id}

The vpcConfiguration property places your application in specified subnets with attached security groups, enabling access to private resources. Kinesis Data Analytics manages the network interfaces automatically. This extends the basic Flink configuration with VPC networking.

Beyond these examples

These snippets focus on specific application-level features: Flink and SQL runtime environments, stream processing with custom parallelism and monitoring, and VPC networking for private resource access. They’re intentionally minimal rather than full stream processing pipelines.

The examples may reference pre-existing infrastructure such as IAM execution roles with appropriate permissions, S3 buckets for code artifacts and reference data, Kinesis streams, Lambda functions, Firehose delivery streams (for SQL applications), and VPC subnets and security groups (for VPC configuration). They focus on configuring the application rather than provisioning everything around it.

To keep things focused, common application patterns are omitted, including:

  • Application snapshots and restore configuration
  • Auto-scaling policies (parallelismConfiguration.autoScalingEnabled shown but not tuned)
  • CloudWatch logging for Flink applications
  • Application lifecycle management (startApplication, forceStop)

These omissions are intentional: the goal is to illustrate how each application feature is wired, not provide drop-in stream processing modules. See the Kinesis Analytics v2 Application resource reference for all available configuration options.

Let's deploy AWS Kinesis Data Analytics Applications

Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.

Try Pulumi Cloud for FREE

Frequently Asked Questions

Application Types & Console Visibility
Why can't I see my SQL application in the AWS Console?
SQL applications created with this resource aren’t visible in the AWS Console. Use aws.kinesis.AnalyticsApplication instead if you need console management.
What's the difference between SQL and Flink applications?
SQL applications use SQL-1_0 runtime and process data with SQL queries. Flink applications use FLINK-* runtimes and run compiled Java/Scala code for complex stream processing.
What runtime environments are available?
Valid values are SQL-1_0 for SQL applications, and FLINK-1_6, FLINK-1_8, FLINK-1_11, FLINK-1_13, FLINK-1_15, FLINK-1_18, FLINK-1_19, FLINK-1_20 for Flink applications.
Configuration & Deployment
How do I deploy code to my application?
For Flink applications, use s3ContentLocation with bucketArn and fileKey pointing to your JAR file. For SQL applications, use textContent with inline SQL code and set codeContentType to PLAINTEXT.
What IAM permissions does the service execution role need?
The serviceExecutionRole must have permissions to access Kinesis data streams, Kinesis Data Firehose delivery streams, Amazon S3 objects, and other external resources your application uses.
What output destinations are supported for SQL applications?
SQL applications can output to Lambda functions using lambdaOutput or Kinesis Firehose delivery streams using kinesisFirehoseOutput.
Immutability & Updates
What properties can't be changed after creation?
The name, applicationMode, and description properties are immutable and require resource replacement if changed.
How does Pulumi track application updates?
Kinesis Data Analytics updates the versionId each time the application is updated. Pulumi uses this to detect configuration drift.
Lifecycle Management
How do I start or stop my application?
Set the startApplication property to true to start or false to stop the application.
What if my Flink application becomes unresponsive?
Use the forceStop property to force stop an unresponsive Flink-based application.
Networking & VPC
Can I run my application in a VPC?
Yes, configure vpcConfiguration with securityGroupIds and subnetIds to run your application within a VPC.

Using a different cloud?

Explore analytics guides for other cloud providers: