Deploy AWS Kinesis Data Analytics Applications

The aws:kinesisanalyticsv2/application:Application resource, part of the Pulumi AWS provider, defines a Kinesis Data Analytics application that processes streaming data using either Apache Flink or SQL. This guide focuses on three capabilities: Flink JAR deployment from S3, SQL-based stream processing, and VPC networking for private resource access.

Applications require IAM execution roles and reference external resources like Kinesis streams, S3 buckets, Lambda functions, and optionally VPC infrastructure. The examples are intentionally small. Combine them with your own IAM roles, data sources, and output destinations.

Stream processing workloads often use Apache Flink to transform and analyze data in real time without managing servers.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.s3.Bucket("example", {bucket: "example-flink-application"});
const exampleBucketObjectv2 = new aws.s3.BucketObjectv2("example", {
    bucket: example.id,
    key: "example-flink-application",
    source: new pulumi.asset.FileAsset("flink-app.jar"),
});
const exampleApplication = new aws.kinesisanalyticsv2.Application("example", {
    name: "example-flink-application",
    runtimeEnvironment: "FLINK-1_8",
    serviceExecutionRole: exampleAwsIamRole.arn,
    applicationConfiguration: {
        applicationCodeConfiguration: {
            codeContent: {
                s3ContentLocation: {
                    bucketArn: example.arn,
                    fileKey: exampleBucketObjectv2.key,
                },
            },
            codeContentType: "ZIPFILE",
        },
        environmentProperties: {
            propertyGroups: [
                {
                    propertyGroupId: "PROPERTY-GROUP-1",
                    propertyMap: {
                        Key1: "Value1",
                    },
                },
                {
                    propertyGroupId: "PROPERTY-GROUP-2",
                    propertyMap: {
                        KeyA: "ValueA",
                        KeyB: "ValueB",
                    },
                },
            ],
        },
        flinkApplicationConfiguration: {
            checkpointConfiguration: {
                configurationType: "DEFAULT",
            },
            monitoringConfiguration: {
                configurationType: "CUSTOM",
                logLevel: "DEBUG",
                metricsLevel: "TASK",
            },
            parallelismConfiguration: {
                autoScalingEnabled: true,
                configurationType: "CUSTOM",
                parallelism: 10,
                parallelismPerKpu: 4,
            },
        },
    },
    tags: {
        Environment: "test",
    },
});
import pulumi
import pulumi_aws as aws

example = aws.s3.Bucket("example", bucket="example-flink-application")
example_bucket_objectv2 = aws.s3.BucketObjectv2("example",
    bucket=example.id,
    key="example-flink-application",
    source=pulumi.FileAsset("flink-app.jar"))
example_application = aws.kinesisanalyticsv2.Application("example",
    name="example-flink-application",
    runtime_environment="FLINK-1_8",
    service_execution_role=example_aws_iam_role["arn"],
    application_configuration={
        "application_code_configuration": {
            "code_content": {
                "s3_content_location": {
                    "bucket_arn": example.arn,
                    "file_key": example_bucket_objectv2.key,
                },
            },
            "code_content_type": "ZIPFILE",
        },
        "environment_properties": {
            "property_groups": [
                {
                    "property_group_id": "PROPERTY-GROUP-1",
                    "property_map": {
                        "Key1": "Value1",
                    },
                },
                {
                    "property_group_id": "PROPERTY-GROUP-2",
                    "property_map": {
                        "KeyA": "ValueA",
                        "KeyB": "ValueB",
                    },
                },
            ],
        },
        "flink_application_configuration": {
            "checkpoint_configuration": {
                "configuration_type": "DEFAULT",
            },
            "monitoring_configuration": {
                "configuration_type": "CUSTOM",
                "log_level": "DEBUG",
                "metrics_level": "TASK",
            },
            "parallelism_configuration": {
                "auto_scaling_enabled": True,
                "configuration_type": "CUSTOM",
                "parallelism": 10,
                "parallelism_per_kpu": 4,
            },
        },
    },
    tags={
        "Environment": "test",
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesisanalyticsv2"
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/s3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		example, err := s3.NewBucket(ctx, "example", &s3.BucketArgs{
			Bucket: pulumi.String("example-flink-application"),
		})
		if err != nil {
			return err
		}
		exampleBucketObjectv2, err := s3.NewBucketObjectv2(ctx, "example", &s3.BucketObjectv2Args{
			Bucket: example.ID(),
			Key:    pulumi.String("example-flink-application"),
			Source: pulumi.NewFileAsset("flink-app.jar"),
		})
		if err != nil {
			return err
		}
		_, err = kinesisanalyticsv2.NewApplication(ctx, "example", &kinesisanalyticsv2.ApplicationArgs{
			Name:                 pulumi.String("example-flink-application"),
			RuntimeEnvironment:   pulumi.String("FLINK-1_8"),
			ServiceExecutionRole: pulumi.Any(exampleAwsIamRole.Arn),
			ApplicationConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationArgs{
				ApplicationCodeConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs{
					CodeContent: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs{
						S3ContentLocation: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs{
							BucketArn: example.Arn,
							FileKey:   exampleBucketObjectv2.Key,
						},
					},
					CodeContentType: pulumi.String("ZIPFILE"),
				},
				EnvironmentProperties: &kinesisanalyticsv2.ApplicationApplicationConfigurationEnvironmentPropertiesArgs{
					PropertyGroups: kinesisanalyticsv2.ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArray{
						&kinesisanalyticsv2.ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs{
							PropertyGroupId: pulumi.String("PROPERTY-GROUP-1"),
							PropertyMap: pulumi.StringMap{
								"Key1": pulumi.String("Value1"),
							},
						},
						&kinesisanalyticsv2.ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs{
							PropertyGroupId: pulumi.String("PROPERTY-GROUP-2"),
							PropertyMap: pulumi.StringMap{
								"KeyA": pulumi.String("ValueA"),
								"KeyB": pulumi.String("ValueB"),
							},
						},
					},
				},
				FlinkApplicationConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationFlinkApplicationConfigurationArgs{
					CheckpointConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationFlinkApplicationConfigurationCheckpointConfigurationArgs{
						ConfigurationType: pulumi.String("DEFAULT"),
					},
					MonitoringConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationFlinkApplicationConfigurationMonitoringConfigurationArgs{
						ConfigurationType: pulumi.String("CUSTOM"),
						LogLevel:          pulumi.String("DEBUG"),
						MetricsLevel:      pulumi.String("TASK"),
					},
					ParallelismConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationFlinkApplicationConfigurationParallelismConfigurationArgs{
						AutoScalingEnabled: pulumi.Bool(true),
						ConfigurationType:  pulumi.String("CUSTOM"),
						Parallelism:        pulumi.Int(10),
						ParallelismPerKpu:  pulumi.Int(4),
					},
				},
			},
			Tags: pulumi.StringMap{
				"Environment": pulumi.String("test"),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.S3.Bucket("example", new()
    {
        BucketName = "example-flink-application",
    });

    var exampleBucketObjectv2 = new Aws.S3.BucketObjectv2("example", new()
    {
        Bucket = example.Id,
        Key = "example-flink-application",
        Source = new FileAsset("flink-app.jar"),
    });

    var exampleApplication = new Aws.KinesisAnalyticsV2.Application("example", new()
    {
        Name = "example-flink-application",
        RuntimeEnvironment = "FLINK-1_8",
        ServiceExecutionRole = exampleAwsIamRole.Arn,
        ApplicationConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationArgs
        {
            ApplicationCodeConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs
            {
                CodeContent = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs
                {
                    S3ContentLocation = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs
                    {
                        BucketArn = example.Arn,
                        FileKey = exampleBucketObjectv2.Key,
                    },
                },
                CodeContentType = "ZIPFILE",
            },
            EnvironmentProperties = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationEnvironmentPropertiesArgs
            {
                PropertyGroups = new[]
                {
                    new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs
                    {
                        PropertyGroupId = "PROPERTY-GROUP-1",
                        PropertyMap = 
                        {
                            { "Key1", "Value1" },
                        },
                    },
                    new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs
                    {
                        PropertyGroupId = "PROPERTY-GROUP-2",
                        PropertyMap = 
                        {
                            { "KeyA", "ValueA" },
                            { "KeyB", "ValueB" },
                        },
                    },
                },
            },
            FlinkApplicationConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationArgs
            {
                CheckpointConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationCheckpointConfigurationArgs
                {
                    ConfigurationType = "DEFAULT",
                },
                MonitoringConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationMonitoringConfigurationArgs
                {
                    ConfigurationType = "CUSTOM",
                    LogLevel = "DEBUG",
                    MetricsLevel = "TASK",
                },
                ParallelismConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationParallelismConfigurationArgs
                {
                    AutoScalingEnabled = true,
                    ConfigurationType = "CUSTOM",
                    Parallelism = 10,
                    ParallelismPerKpu = 4,
                },
            },
        },
        Tags = 
        {
            { "Environment", "test" },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.s3.Bucket;
import com.pulumi.aws.s3.BucketArgs;
import com.pulumi.aws.s3.BucketObjectv2;
import com.pulumi.aws.s3.BucketObjectv2Args;
import com.pulumi.aws.kinesisanalyticsv2.Application;
import com.pulumi.aws.kinesisanalyticsv2.ApplicationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationEnvironmentPropertiesArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationCheckpointConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationMonitoringConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationParallelismConfigurationArgs;
import com.pulumi.asset.FileAsset;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Bucket("example", BucketArgs.builder()
            .bucket("example-flink-application")
            .build());

        var exampleBucketObjectv2 = new BucketObjectv2("exampleBucketObjectv2", BucketObjectv2Args.builder()
            .bucket(example.id())
            .key("example-flink-application")
            .source(new FileAsset("flink-app.jar"))
            .build());

        var exampleApplication = new Application("exampleApplication", ApplicationArgs.builder()
            .name("example-flink-application")
            .runtimeEnvironment("FLINK-1_8")
            .serviceExecutionRole(exampleAwsIamRole.arn())
            .applicationConfiguration(ApplicationApplicationConfigurationArgs.builder()
                .applicationCodeConfiguration(ApplicationApplicationConfigurationApplicationCodeConfigurationArgs.builder()
                    .codeContent(ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs.builder()
                        .s3ContentLocation(ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs.builder()
                            .bucketArn(example.arn())
                            .fileKey(exampleBucketObjectv2.key())
                            .build())
                        .build())
                    .codeContentType("ZIPFILE")
                    .build())
                .environmentProperties(ApplicationApplicationConfigurationEnvironmentPropertiesArgs.builder()
                    .propertyGroups(                    
                        ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs.builder()
                            .propertyGroupId("PROPERTY-GROUP-1")
                            .propertyMap(Map.of("Key1", "Value1"))
                            .build(),
                        ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs.builder()
                            .propertyGroupId("PROPERTY-GROUP-2")
                            .propertyMap(Map.ofEntries(
                                Map.entry("KeyA", "ValueA"),
                                Map.entry("KeyB", "ValueB")
                            ))
                            .build())
                    .build())
                .flinkApplicationConfiguration(ApplicationApplicationConfigurationFlinkApplicationConfigurationArgs.builder()
                    .checkpointConfiguration(ApplicationApplicationConfigurationFlinkApplicationConfigurationCheckpointConfigurationArgs.builder()
                        .configurationType("DEFAULT")
                        .build())
                    .monitoringConfiguration(ApplicationApplicationConfigurationFlinkApplicationConfigurationMonitoringConfigurationArgs.builder()
                        .configurationType("CUSTOM")
                        .logLevel("DEBUG")
                        .metricsLevel("TASK")
                        .build())
                    .parallelismConfiguration(ApplicationApplicationConfigurationFlinkApplicationConfigurationParallelismConfigurationArgs.builder()
                        .autoScalingEnabled(true)
                        .configurationType("CUSTOM")
                        .parallelism(10)
                        .parallelismPerKpu(4)
                        .build())
                    .build())
                .build())
            .tags(Map.of("Environment", "test"))
            .build());

    }
}
resources:
  example:
    type: aws:s3:Bucket
    properties:
      bucket: example-flink-application
  exampleBucketObjectv2:
    type: aws:s3:BucketObjectv2
    name: example
    properties:
      bucket: ${example.id}
      key: example-flink-application
      source:
        fn::FileAsset: flink-app.jar
  exampleApplication:
    type: aws:kinesisanalyticsv2:Application
    name: example
    properties:
      name: example-flink-application
      runtimeEnvironment: FLINK-1_8
      serviceExecutionRole: ${exampleAwsIamRole.arn}
      applicationConfiguration:
        applicationCodeConfiguration:
          codeContent:
            s3ContentLocation:
              bucketArn: ${example.arn}
              fileKey: ${exampleBucketObjectv2.key}
          codeContentType: ZIPFILE
        environmentProperties:
          propertyGroups:
            - propertyGroupId: PROPERTY-GROUP-1
              propertyMap:
                Key1: Value1
            - propertyGroupId: PROPERTY-GROUP-2
              propertyMap:
                KeyA: ValueA
                KeyB: ValueB
        flinkApplicationConfiguration:
          checkpointConfiguration:
            configurationType: DEFAULT
          monitoringConfiguration:
            configurationType: CUSTOM
            logLevel: DEBUG
            metricsLevel: TASK
          parallelismConfiguration:
            autoScalingEnabled: true
            configurationType: CUSTOM
            parallelism: 10
            parallelismPerKpu: 4
      tags:
        Environment: test

The runtimeEnvironment property specifies the Flink version. The applicationCodeConfiguration block points to your JAR file in S3 via s3ContentLocation. The flinkApplicationConfiguration controls checkpointing, parallelism, and monitoring. Environment properties pass configuration to your application code at runtime through propertyGroups.

Process streams with SQL queries

Teams familiar with SQL can analyze streaming data without learning a programming framework by writing SQL queries that read from Kinesis streams and write to destinations.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.cloudwatch.LogGroup("example", {name: "example-sql-application"});
const exampleLogStream = new aws.cloudwatch.LogStream("example", {
    name: "example-sql-application",
    logGroupName: example.name,
});
const exampleApplication = new aws.kinesisanalyticsv2.Application("example", {
    name: "example-sql-application",
    runtimeEnvironment: "SQL-1_0",
    serviceExecutionRole: exampleAwsIamRole.arn,
    applicationConfiguration: {
        applicationCodeConfiguration: {
            codeContent: {
                textContent: "SELECT 1;\n",
            },
            codeContentType: "PLAINTEXT",
        },
        sqlApplicationConfiguration: {
            input: {
                namePrefix: "PREFIX_1",
                inputParallelism: {
                    count: 3,
                },
                inputSchema: {
                    recordColumns: [
                        {
                            name: "COLUMN_1",
                            sqlType: "VARCHAR(8)",
                            mapping: "MAPPING-1",
                        },
                        {
                            name: "COLUMN_2",
                            sqlType: "DOUBLE",
                        },
                    ],
                    recordEncoding: "UTF-8",
                    recordFormat: {
                        recordFormatType: "CSV",
                        mappingParameters: {
                            csvMappingParameters: {
                                recordColumnDelimiter: ",",
                                recordRowDelimiter: "\n",
                            },
                        },
                    },
                },
                kinesisStreamsInput: {
                    resourceArn: exampleAwsKinesisStream.arn,
                },
            },
            outputs: [
                {
                    name: "OUTPUT_1",
                    destinationSchema: {
                        recordFormatType: "JSON",
                    },
                    lambdaOutput: {
                        resourceArn: exampleAwsLambdaFunction.arn,
                    },
                },
                {
                    name: "OUTPUT_2",
                    destinationSchema: {
                        recordFormatType: "CSV",
                    },
                    kinesisFirehoseOutput: {
                        resourceArn: exampleAwsKinesisFirehoseDeliveryStream.arn,
                    },
                },
            ],
            referenceDataSource: {
                tableName: "TABLE-1",
                referenceSchema: {
                    recordColumns: [{
                        name: "COLUMN_1",
                        sqlType: "INTEGER",
                    }],
                    recordFormat: {
                        recordFormatType: "JSON",
                        mappingParameters: {
                            jsonMappingParameters: {
                                recordRowPath: "$",
                            },
                        },
                    },
                },
                s3ReferenceDataSource: {
                    bucketArn: exampleAwsS3Bucket.arn,
                    fileKey: "KEY-1",
                },
            },
        },
    },
    cloudwatchLoggingOptions: {
        logStreamArn: exampleLogStream.arn,
    },
});
import pulumi
import pulumi_aws as aws

example = aws.cloudwatch.LogGroup("example", name="example-sql-application")
example_log_stream = aws.cloudwatch.LogStream("example",
    name="example-sql-application",
    log_group_name=example.name)
example_application = aws.kinesisanalyticsv2.Application("example",
    name="example-sql-application",
    runtime_environment="SQL-1_0",
    service_execution_role=example_aws_iam_role["arn"],
    application_configuration={
        "application_code_configuration": {
            "code_content": {
                "text_content": "SELECT 1;\n",
            },
            "code_content_type": "PLAINTEXT",
        },
        "sql_application_configuration": {
            "input": {
                "name_prefix": "PREFIX_1",
                "input_parallelism": {
                    "count": 3,
                },
                "input_schema": {
                    "record_columns": [
                        {
                            "name": "COLUMN_1",
                            "sql_type": "VARCHAR(8)",
                            "mapping": "MAPPING-1",
                        },
                        {
                            "name": "COLUMN_2",
                            "sql_type": "DOUBLE",
                        },
                    ],
                    "record_encoding": "UTF-8",
                    "record_format": {
                        "record_format_type": "CSV",
                        "mapping_parameters": {
                            "csv_mapping_parameters": {
                                "record_column_delimiter": ",",
                                "record_row_delimiter": "\n",
                            },
                        },
                    },
                },
                "kinesis_streams_input": {
                    "resource_arn": example_aws_kinesis_stream["arn"],
                },
            },
            "outputs": [
                {
                    "name": "OUTPUT_1",
                    "destination_schema": {
                        "record_format_type": "JSON",
                    },
                    "lambda_output": {
                        "resource_arn": example_aws_lambda_function["arn"],
                    },
                },
                {
                    "name": "OUTPUT_2",
                    "destination_schema": {
                        "record_format_type": "CSV",
                    },
                    "kinesis_firehose_output": {
                        "resource_arn": example_aws_kinesis_firehose_delivery_stream["arn"],
                    },
                },
            ],
            "reference_data_source": {
                "table_name": "TABLE-1",
                "reference_schema": {
                    "record_columns": [{
                        "name": "COLUMN_1",
                        "sql_type": "INTEGER",
                    }],
                    "record_format": {
                        "record_format_type": "JSON",
                        "mapping_parameters": {
                            "json_mapping_parameters": {
                                "record_row_path": "$",
                            },
                        },
                    },
                },
                "s3_reference_data_source": {
                    "bucket_arn": example_aws_s3_bucket["arn"],
                    "file_key": "KEY-1",
                },
            },
        },
    },
    cloudwatch_logging_options={
        "log_stream_arn": example_log_stream.arn,
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/cloudwatch"
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesisanalyticsv2"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		example, err := cloudwatch.NewLogGroup(ctx, "example", &cloudwatch.LogGroupArgs{
			Name: pulumi.String("example-sql-application"),
		})
		if err != nil {
			return err
		}
		exampleLogStream, err := cloudwatch.NewLogStream(ctx, "example", &cloudwatch.LogStreamArgs{
			Name:         pulumi.String("example-sql-application"),
			LogGroupName: example.Name,
		})
		if err != nil {
			return err
		}
		_, err = kinesisanalyticsv2.NewApplication(ctx, "example", &kinesisanalyticsv2.ApplicationArgs{
			Name:                 pulumi.String("example-sql-application"),
			RuntimeEnvironment:   pulumi.String("SQL-1_0"),
			ServiceExecutionRole: pulumi.Any(exampleAwsIamRole.Arn),
			ApplicationConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationArgs{
				ApplicationCodeConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs{
					CodeContent: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs{
						TextContent: pulumi.String("SELECT 1;\n"),
					},
					CodeContentType: pulumi.String("PLAINTEXT"),
				},
				SqlApplicationConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationArgs{
					Input: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputTypeArgs{
						NamePrefix: pulumi.String("PREFIX_1"),
						InputParallelism: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputParallelismArgs{
							Count: pulumi.Int(3),
						},
						InputSchema: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaArgs{
							RecordColumns: kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArray{
								&kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs{
									Name:    pulumi.String("COLUMN_1"),
									SqlType: pulumi.String("VARCHAR(8)"),
									Mapping: pulumi.String("MAPPING-1"),
								},
								&kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs{
									Name:    pulumi.String("COLUMN_2"),
									SqlType: pulumi.String("DOUBLE"),
								},
							},
							RecordEncoding: pulumi.String("UTF-8"),
							RecordFormat: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatArgs{
								RecordFormatType: pulumi.String("CSV"),
								MappingParameters: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersArgs{
									CsvMappingParameters: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersCsvMappingParametersArgs{
										RecordColumnDelimiter: pulumi.String(","),
										RecordRowDelimiter:    pulumi.String("\n"),
									},
								},
							},
						},
						KinesisStreamsInput: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputKinesisStreamsInputArgs{
							ResourceArn: pulumi.Any(exampleAwsKinesisStream.Arn),
						},
					},
					Outputs: kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputTypeArray{
						&kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputTypeArgs{
							Name: pulumi.String("OUTPUT_1"),
							DestinationSchema: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs{
								RecordFormatType: pulumi.String("JSON"),
							},
							LambdaOutput: kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputLambdaOutputArgs{
								ResourceArn: pulumi.Any(exampleAwsLambdaFunction.Arn),
							},
						},
						&kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputTypeArgs{
							Name: pulumi.String("OUTPUT_2"),
							DestinationSchema: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs{
								RecordFormatType: pulumi.String("CSV"),
							},
							KinesisFirehoseOutput: kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputKinesisFirehoseOutputArgs{
								ResourceArn: pulumi.Any(exampleAwsKinesisFirehoseDeliveryStream.Arn),
							},
						},
					},
					ReferenceDataSource: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceArgs{
						TableName: pulumi.String("TABLE-1"),
						ReferenceSchema: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaArgs{
							RecordColumns: kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordColumnArray{
								&kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordColumnArgs{
									Name:    pulumi.String("COLUMN_1"),
									SqlType: pulumi.String("INTEGER"),
								},
							},
							RecordFormat: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatArgs{
								RecordFormatType: pulumi.String("JSON"),
								MappingParameters: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersArgs{
									JsonMappingParameters: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersJsonMappingParametersArgs{
										RecordRowPath: pulumi.String("$"),
									},
								},
							},
						},
						S3ReferenceDataSource: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceS3ReferenceDataSourceArgs{
							BucketArn: pulumi.Any(exampleAwsS3Bucket.Arn),
							FileKey:   pulumi.String("KEY-1"),
						},
					},
				},
			},
			CloudwatchLoggingOptions: &kinesisanalyticsv2.ApplicationCloudwatchLoggingOptionsArgs{
				LogStreamArn: exampleLogStream.Arn,
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.CloudWatch.LogGroup("example", new()
    {
        Name = "example-sql-application",
    });

    var exampleLogStream = new Aws.CloudWatch.LogStream("example", new()
    {
        Name = "example-sql-application",
        LogGroupName = example.Name,
    });

    var exampleApplication = new Aws.KinesisAnalyticsV2.Application("example", new()
    {
        Name = "example-sql-application",
        RuntimeEnvironment = "SQL-1_0",
        ServiceExecutionRole = exampleAwsIamRole.Arn,
        ApplicationConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationArgs
        {
            ApplicationCodeConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs
            {
                CodeContent = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs
                {
                    TextContent = @"SELECT 1;
",
                },
                CodeContentType = "PLAINTEXT",
            },
            SqlApplicationConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationArgs
            {
                Input = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputArgs
                {
                    NamePrefix = "PREFIX_1",
                    InputParallelism = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputParallelismArgs
                    {
                        Count = 3,
                    },
                    InputSchema = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaArgs
                    {
                        RecordColumns = new[]
                        {
                            new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs
                            {
                                Name = "COLUMN_1",
                                SqlType = "VARCHAR(8)",
                                Mapping = "MAPPING-1",
                            },
                            new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs
                            {
                                Name = "COLUMN_2",
                                SqlType = "DOUBLE",
                            },
                        },
                        RecordEncoding = "UTF-8",
                        RecordFormat = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatArgs
                        {
                            RecordFormatType = "CSV",
                            MappingParameters = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersArgs
                            {
                                CsvMappingParameters = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersCsvMappingParametersArgs
                                {
                                    RecordColumnDelimiter = ",",
                                    RecordRowDelimiter = @"
",
                                },
                            },
                        },
                    },
                    KinesisStreamsInput = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputKinesisStreamsInputArgs
                    {
                        ResourceArn = exampleAwsKinesisStream.Arn,
                    },
                },
                Outputs = new[]
                {
                    new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputArgs
                    {
                        Name = "OUTPUT_1",
                        DestinationSchema = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs
                        {
                            RecordFormatType = "JSON",
                        },
                        LambdaOutput = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputLambdaOutputArgs
                        {
                            ResourceArn = exampleAwsLambdaFunction.Arn,
                        },
                    },
                    new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputArgs
                    {
                        Name = "OUTPUT_2",
                        DestinationSchema = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs
                        {
                            RecordFormatType = "CSV",
                        },
                        KinesisFirehoseOutput = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputKinesisFirehoseOutputArgs
                        {
                            ResourceArn = exampleAwsKinesisFirehoseDeliveryStream.Arn,
                        },
                    },
                },
                ReferenceDataSource = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceArgs
                {
                    TableName = "TABLE-1",
                    ReferenceSchema = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaArgs
                    {
                        RecordColumns = new[]
                        {
                            new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordColumnArgs
                            {
                                Name = "COLUMN_1",
                                SqlType = "INTEGER",
                            },
                        },
                        RecordFormat = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatArgs
                        {
                            RecordFormatType = "JSON",
                            MappingParameters = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersArgs
                            {
                                JsonMappingParameters = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersJsonMappingParametersArgs
                                {
                                    RecordRowPath = "$",
                                },
                            },
                        },
                    },
                    S3ReferenceDataSource = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceS3ReferenceDataSourceArgs
                    {
                        BucketArn = exampleAwsS3Bucket.Arn,
                        FileKey = "KEY-1",
                    },
                },
            },
        },
        CloudwatchLoggingOptions = new Aws.KinesisAnalyticsV2.Inputs.ApplicationCloudwatchLoggingOptionsArgs
        {
            LogStreamArn = exampleLogStream.Arn,
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.cloudwatch.LogGroup;
import com.pulumi.aws.cloudwatch.LogGroupArgs;
import com.pulumi.aws.cloudwatch.LogStream;
import com.pulumi.aws.cloudwatch.LogStreamArgs;
import com.pulumi.aws.kinesisanalyticsv2.Application;
import com.pulumi.aws.kinesisanalyticsv2.ApplicationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputParallelismArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersCsvMappingParametersArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputKinesisStreamsInputArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersJsonMappingParametersArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceS3ReferenceDataSourceArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationCloudwatchLoggingOptionsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new LogGroup("example", LogGroupArgs.builder()
            .name("example-sql-application")
            .build());

        var exampleLogStream = new LogStream("exampleLogStream", LogStreamArgs.builder()
            .name("example-sql-application")
            .logGroupName(example.name())
            .build());

        var exampleApplication = new Application("exampleApplication", ApplicationArgs.builder()
            .name("example-sql-application")
            .runtimeEnvironment("SQL-1_0")
            .serviceExecutionRole(exampleAwsIamRole.arn())
            .applicationConfiguration(ApplicationApplicationConfigurationArgs.builder()
                .applicationCodeConfiguration(ApplicationApplicationConfigurationApplicationCodeConfigurationArgs.builder()
                    .codeContent(ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs.builder()
                        .textContent("""
SELECT 1;
                        """)
                        .build())
                    .codeContentType("PLAINTEXT")
                    .build())
                .sqlApplicationConfiguration(ApplicationApplicationConfigurationSqlApplicationConfigurationArgs.builder()
                    .input(ApplicationApplicationConfigurationSqlApplicationConfigurationInputArgs.builder()
                        .namePrefix("PREFIX_1")
                        .inputParallelism(ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputParallelismArgs.builder()
                            .count(3)
                            .build())
                        .inputSchema(ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaArgs.builder()
                            .recordColumns(                            
                                ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs.builder()
                                    .name("COLUMN_1")
                                    .sqlType("VARCHAR(8)")
                                    .mapping("MAPPING-1")
                                    .build(),
                                ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs.builder()
                                    .name("COLUMN_2")
                                    .sqlType("DOUBLE")
                                    .build())
                            .recordEncoding("UTF-8")
                            .recordFormat(ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatArgs.builder()
                                .recordFormatType("CSV")
                                .mappingParameters(ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersArgs.builder()
                                    .csvMappingParameters(ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersCsvMappingParametersArgs.builder()
                                        .recordColumnDelimiter(",")
                                        .recordRowDelimiter("""

                                        """)
                                        .build())
                                    .build())
                                .build())
                            .build())
                        .kinesisStreamsInput(ApplicationApplicationConfigurationSqlApplicationConfigurationInputKinesisStreamsInputArgs.builder()
                            .resourceArn(exampleAwsKinesisStream.arn())
                            .build())
                        .build())
                    .outputs(                    
                        ApplicationApplicationConfigurationSqlApplicationConfigurationOutputArgs.builder()
                            .name("OUTPUT_1")
                            .destinationSchema(ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs.builder()
                                .recordFormatType("JSON")
                                .build())
                            .lambdaOutput(ApplicationApplicationConfigurationSqlApplicationConfigurationOutputLambdaOutputArgs.builder()
                                .resourceArn(exampleAwsLambdaFunction.arn())
                                .build())
                            .build(),
                        ApplicationApplicationConfigurationSqlApplicationConfigurationOutputArgs.builder()
                            .name("OUTPUT_2")
                            .destinationSchema(ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs.builder()
                                .recordFormatType("CSV")
                                .build())
                            .kinesisFirehoseOutput(ApplicationApplicationConfigurationSqlApplicationConfigurationOutputKinesisFirehoseOutputArgs.builder()
                                .resourceArn(exampleAwsKinesisFirehoseDeliveryStream.arn())
                                .build())
                            .build())
                    .referenceDataSource(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceArgs.builder()
                        .tableName("TABLE-1")
                        .referenceSchema(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaArgs.builder()
                            .recordColumns(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordColumnArgs.builder()
                                .name("COLUMN_1")
                                .sqlType("INTEGER")
                                .build())
                            .recordFormat(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatArgs.builder()
                                .recordFormatType("JSON")
                                .mappingParameters(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersArgs.builder()
                                    .jsonMappingParameters(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersJsonMappingParametersArgs.builder()
                                        .recordRowPath("$")
                                        .build())
                                    .build())
                                .build())
                            .build())
                        .s3ReferenceDataSource(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceS3ReferenceDataSourceArgs.builder()
                            .bucketArn(exampleAwsS3Bucket.arn())
                            .fileKey("KEY-1")
                            .build())
                        .build())
                    .build())
                .build())
            .cloudwatchLoggingOptions(ApplicationCloudwatchLoggingOptionsArgs.builder()
                .logStreamArn(exampleLogStream.arn())
                .build())
            .build());

    }
}
resources:
  example:
    type: aws:cloudwatch:LogGroup
    properties:
      name: example-sql-application
  exampleLogStream:
    type: aws:cloudwatch:LogStream
    name: example
    properties:
      name: example-sql-application
      logGroupName: ${example.name}
  exampleApplication:
    type: aws:kinesisanalyticsv2:Application
    name: example
    properties:
      name: example-sql-application
      runtimeEnvironment: SQL-1_0
      serviceExecutionRole: ${exampleAwsIamRole.arn}
      applicationConfiguration:
        applicationCodeConfiguration:
          codeContent:
            textContent: |
              SELECT 1;              
          codeContentType: PLAINTEXT
        sqlApplicationConfiguration:
          input:
            namePrefix: PREFIX_1
            inputParallelism:
              count: 3
            inputSchema:
              recordColumns:
                - name: COLUMN_1
                  sqlType: VARCHAR(8)
                  mapping: MAPPING-1
                - name: COLUMN_2
                  sqlType: DOUBLE
              recordEncoding: UTF-8
              recordFormat:
                recordFormatType: CSV
                mappingParameters:
                  csvMappingParameters:
                    recordColumnDelimiter: ','
                    recordRowDelimiter: |2+
            kinesisStreamsInput:
              resourceArn: ${exampleAwsKinesisStream.arn}
          outputs:
            - name: OUTPUT_1
              destinationSchema:
                recordFormatType: JSON
              lambdaOutput:
                resourceArn: ${exampleAwsLambdaFunction.arn}
            - name: OUTPUT_2
              destinationSchema:
                recordFormatType: CSV
              kinesisFirehoseOutput:
                resourceArn: ${exampleAwsKinesisFirehoseDeliveryStream.arn}
          referenceDataSource:
            tableName: TABLE-1
            referenceSchema:
              recordColumns:
                - name: COLUMN_1
                  sqlType: INTEGER
              recordFormat:
                recordFormatType: JSON
                mappingParameters:
                  jsonMappingParameters:
                    recordRowPath: $
            s3ReferenceDataSource:
              bucketArn: ${exampleAwsS3Bucket.arn}
              fileKey: KEY-1
      cloudwatchLoggingOptions:
        logStreamArn: ${exampleLogStream.arn}

The sqlApplicationConfiguration defines inputs, outputs, and reference data. The input block specifies a Kinesis stream source with an inputSchema that describes record structure. Outputs route transformed data to Lambda functions or Firehose delivery streams. The referenceDataSource joins streaming data with static lookup tables stored in S3.

Connect to private VPC resources

Applications that need to access RDS databases, ElastiCache clusters, or internal APIs require VPC configuration to join private subnets.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.s3.Bucket("example", {bucket: "example-flink-application"});
const exampleBucketObjectv2 = new aws.s3.BucketObjectv2("example", {
    bucket: example.id,
    key: "example-flink-application",
    source: new pulumi.asset.FileAsset("flink-app.jar"),
});
const exampleApplication = new aws.kinesisanalyticsv2.Application("example", {
    name: "example-flink-application",
    runtimeEnvironment: "FLINK-1_8",
    serviceExecutionRole: exampleAwsIamRole.arn,
    applicationConfiguration: {
        applicationCodeConfiguration: {
            codeContent: {
                s3ContentLocation: {
                    bucketArn: example.arn,
                    fileKey: exampleBucketObjectv2.key,
                },
            },
            codeContentType: "ZIPFILE",
        },
        vpcConfiguration: {
            securityGroupIds: [
                exampleAwsSecurityGroup[0].id,
                exampleAwsSecurityGroup[1].id,
            ],
            subnetIds: [exampleAwsSubnet.id],
        },
    },
});
import pulumi
import pulumi_aws as aws

example = aws.s3.Bucket("example", bucket="example-flink-application")
example_bucket_objectv2 = aws.s3.BucketObjectv2("example",
    bucket=example.id,
    key="example-flink-application",
    source=pulumi.FileAsset("flink-app.jar"))
example_application = aws.kinesisanalyticsv2.Application("example",
    name="example-flink-application",
    runtime_environment="FLINK-1_8",
    service_execution_role=example_aws_iam_role["arn"],
    application_configuration={
        "application_code_configuration": {
            "code_content": {
                "s3_content_location": {
                    "bucket_arn": example.arn,
                    "file_key": example_bucket_objectv2.key,
                },
            },
            "code_content_type": "ZIPFILE",
        },
        "vpc_configuration": {
            "security_group_ids": [
                example_aws_security_group[0]["id"],
                example_aws_security_group[1]["id"],
            ],
            "subnet_ids": [example_aws_subnet["id"]],
        },
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesisanalyticsv2"
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/s3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		example, err := s3.NewBucket(ctx, "example", &s3.BucketArgs{
			Bucket: pulumi.String("example-flink-application"),
		})
		if err != nil {
			return err
		}
		exampleBucketObjectv2, err := s3.NewBucketObjectv2(ctx, "example", &s3.BucketObjectv2Args{
			Bucket: example.ID(),
			Key:    pulumi.String("example-flink-application"),
			Source: pulumi.NewFileAsset("flink-app.jar"),
		})
		if err != nil {
			return err
		}
		_, err = kinesisanalyticsv2.NewApplication(ctx, "example", &kinesisanalyticsv2.ApplicationArgs{
			Name:                 pulumi.String("example-flink-application"),
			RuntimeEnvironment:   pulumi.String("FLINK-1_8"),
			ServiceExecutionRole: pulumi.Any(exampleAwsIamRole.Arn),
			ApplicationConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationArgs{
				ApplicationCodeConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs{
					CodeContent: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs{
						S3ContentLocation: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs{
							BucketArn: example.Arn,
							FileKey:   exampleBucketObjectv2.Key,
						},
					},
					CodeContentType: pulumi.String("ZIPFILE"),
				},
				VpcConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationVpcConfigurationArgs{
					SecurityGroupIds: pulumi.StringArray{
						exampleAwsSecurityGroup[0].Id,
						exampleAwsSecurityGroup[1].Id,
					},
					SubnetIds: pulumi.StringArray{
						exampleAwsSubnet.Id,
					},
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.S3.Bucket("example", new()
    {
        BucketName = "example-flink-application",
    });

    var exampleBucketObjectv2 = new Aws.S3.BucketObjectv2("example", new()
    {
        Bucket = example.Id,
        Key = "example-flink-application",
        Source = new FileAsset("flink-app.jar"),
    });

    var exampleApplication = new Aws.KinesisAnalyticsV2.Application("example", new()
    {
        Name = "example-flink-application",
        RuntimeEnvironment = "FLINK-1_8",
        ServiceExecutionRole = exampleAwsIamRole.Arn,
        ApplicationConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationArgs
        {
            ApplicationCodeConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs
            {
                CodeContent = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs
                {
                    S3ContentLocation = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs
                    {
                        BucketArn = example.Arn,
                        FileKey = exampleBucketObjectv2.Key,
                    },
                },
                CodeContentType = "ZIPFILE",
            },
            VpcConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationVpcConfigurationArgs
            {
                SecurityGroupIds = new[]
                {
                    exampleAwsSecurityGroup[0].Id,
                    exampleAwsSecurityGroup[1].Id,
                },
                SubnetIds = new[]
                {
                    exampleAwsSubnet.Id,
                },
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.s3.Bucket;
import com.pulumi.aws.s3.BucketArgs;
import com.pulumi.aws.s3.BucketObjectv2;
import com.pulumi.aws.s3.BucketObjectv2Args;
import com.pulumi.aws.kinesisanalyticsv2.Application;
import com.pulumi.aws.kinesisanalyticsv2.ApplicationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationVpcConfigurationArgs;
import com.pulumi.asset.FileAsset;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new Bucket("example", BucketArgs.builder()
            .bucket("example-flink-application")
            .build());

        var exampleBucketObjectv2 = new BucketObjectv2("exampleBucketObjectv2", BucketObjectv2Args.builder()
            .bucket(example.id())
            .key("example-flink-application")
            .source(new FileAsset("flink-app.jar"))
            .build());

        var exampleApplication = new Application("exampleApplication", ApplicationArgs.builder()
            .name("example-flink-application")
            .runtimeEnvironment("FLINK-1_8")
            .serviceExecutionRole(exampleAwsIamRole.arn())
            .applicationConfiguration(ApplicationApplicationConfigurationArgs.builder()
                .applicationCodeConfiguration(ApplicationApplicationConfigurationApplicationCodeConfigurationArgs.builder()
                    .codeContent(ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs.builder()
                        .s3ContentLocation(ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs.builder()
                            .bucketArn(example.arn())
                            .fileKey(exampleBucketObjectv2.key())
                            .build())
                        .build())
                    .codeContentType("ZIPFILE")
                    .build())
                .vpcConfiguration(ApplicationApplicationConfigurationVpcConfigurationArgs.builder()
                    .securityGroupIds(                    
                        exampleAwsSecurityGroup[0].id(),
                        exampleAwsSecurityGroup[1].id())
                    .subnetIds(exampleAwsSubnet.id())
                    .build())
                .build())
            .build());

    }
}
resources:
  example:
    type: aws:s3:Bucket
    properties:
      bucket: example-flink-application
  exampleBucketObjectv2:
    type: aws:s3:BucketObjectv2
    name: example
    properties:
      bucket: ${example.id}
      key: example-flink-application
      source:
        fn::FileAsset: flink-app.jar
  exampleApplication:
    type: aws:kinesisanalyticsv2:Application
    name: example
    properties:
      name: example-flink-application
      runtimeEnvironment: FLINK-1_8
      serviceExecutionRole: ${exampleAwsIamRole.arn}
      applicationConfiguration:
        applicationCodeConfiguration:
          codeContent:
            s3ContentLocation:
              bucketArn: ${example.arn}
              fileKey: ${exampleBucketObjectv2.key}
          codeContentType: ZIPFILE
        vpcConfiguration:
          securityGroupIds:
            - ${exampleAwsSecurityGroup[0].id}
            - ${exampleAwsSecurityGroup[1].id}
          subnetIds:
            - ${exampleAwsSubnet.id}

The vpcConfiguration property places your application in specified subnets with attached security groups, enabling access to private resources. Kinesis Data Analytics manages the network interfaces automatically. Your execution role needs EC2 permissions to create and manage these interfaces.

Beyond these examples

These snippets focus on specific application-level features: Flink and SQL runtime environments, stream inputs and multiple output destinations, and VPC networking for private resource access. They’re intentionally minimal rather than full stream processing pipelines.

The examples rely on pre-existing infrastructure such as IAM execution roles with appropriate permissions, S3 buckets for application code and reference data, Kinesis streams, Lambda functions, Firehose delivery streams, and VPC subnets and security groups for the VPC example. They focus on configuring the application rather than provisioning everything around it.

To keep things focused, common application patterns are omitted, including:

  • Application lifecycle (startApplication, forceStop)
  • Monitoring configuration (metricsLevel, logLevel tuning)
  • Snapshot and restore configuration
  • Auto-scaling parameters beyond basic parallelism

These omissions are intentional: the goal is to illustrate how each application feature is wired, not provide drop-in stream processing modules. See the Kinesis Analytics v2 Application resource reference for all available configuration options.

Let's deploy AWS Kinesis Data Analytics Applications

Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.

Try Pulumi Cloud for FREE

Frequently Asked Questions

Application Types & Console Visibility
Can I view my SQL application in the AWS Console?
No, SQL applications created with aws.kinesisanalyticsv2.Application cannot be viewed in the AWS Console. Use aws.kinesis.AnalyticsApplication instead if you need console visibility.
What application modes are available and can I change them?
Two modes are available: STREAMING and INTERACTIVE. The applicationMode property is immutable and cannot be changed after creation.
Configuration & Deployment
What runtime environments are supported?
Valid runtime environments are: SQL-1_0, FLINK-1_6, FLINK-1_8, FLINK-1_11, FLINK-1_13, FLINK-1_15, FLINK-1_18, FLINK-1_19, FLINK-1_20.
How do I deploy Flink application code?
Use applicationCodeConfiguration.codeContent.s3ContentLocation with bucketArn and fileKey pointing to your JAR file in S3, and set codeContentType to ZIPFILE.
How do I deploy SQL application code?
Use applicationCodeConfiguration.codeContent.textContent with your SQL code and set codeContentType to PLAINTEXT.
What IAM permissions does my application need?
Configure serviceExecutionRole with an IAM role ARN that grants access to Kinesis data streams, Kinesis Data Firehose delivery streams, Amazon S3 objects, and other external resources your application uses.
Immutability & Updates
What properties can't I change after creating my application?
The applicationMode, name, and description properties are immutable. Changing them requires replacing the entire application resource.
What happens to the version when I update my application?
Kinesis Data Analytics automatically increments the versionId each time you update the application configuration.
Networking & VPC
Can I connect my application to a VPC?
Yes, configure vpcConfiguration with securityGroupIds and subnetIds to connect your application to VPC resources.
Lifecycle Management
How do I start or stop my application?
Set startApplication to true to start the application or false to stop it. For unresponsive Flink applications, use forceStop to force termination.

Using a different cloud?

Explore analytics guides for other cloud providers: