The aws:kinesisanalyticsv2/application:Application resource, part of the Pulumi AWS provider, defines a Kinesis Data Analytics application that processes streaming data using either Apache Flink or SQL. This guide focuses on three capabilities: Flink JAR deployment from S3, SQL-based stream processing, and VPC networking for private resource access.
Applications require IAM execution roles and reference external resources like Kinesis streams, S3 buckets, Lambda functions, and optionally VPC infrastructure. The examples are intentionally small. Combine them with your own IAM roles, data sources, and output destinations.
Deploy a Flink application from S3
Apache Flink lets you transform and analyze streaming data in real time; running it as a Kinesis Data Analytics application means you get that capability without managing servers.
// Deploy an Apache Flink application whose code is packaged as a JAR in S3.
// NOTE(review): `exampleAwsIamRole` is assumed to be defined elsewhere in the
// program (the service execution role the application runs as) — confirm.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
// Bucket that stores the compiled application JAR.
const example = new aws.s3.Bucket("example", {bucket: "example-flink-application"});
// Upload the local JAR; the application references this object by bucket + key.
const exampleBucketObjectv2 = new aws.s3.BucketObjectv2("example", {
bucket: example.id,
key: "example-flink-application",
source: new pulumi.asset.FileAsset("flink-app.jar"),
});
const exampleApplication = new aws.kinesisanalyticsv2.Application("example", {
name: "example-flink-application",
runtimeEnvironment: "FLINK-1_8",
serviceExecutionRole: exampleAwsIamRole.arn,
applicationConfiguration: {
// Points the application at the JAR uploaded above.
applicationCodeConfiguration: {
codeContent: {
s3ContentLocation: {
bucketArn: example.arn,
fileKey: exampleBucketObjectv2.key,
},
},
codeContentType: "ZIPFILE",
},
// Key/value properties exposed to the Flink job at runtime, grouped by ID.
environmentProperties: {
propertyGroups: [
{
propertyGroupId: "PROPERTY-GROUP-1",
propertyMap: {
Key1: "Value1",
},
},
{
propertyGroupId: "PROPERTY-GROUP-2",
propertyMap: {
KeyA: "ValueA",
KeyB: "ValueB",
},
},
],
},
// Flink-specific tuning: checkpointing, logging/metrics, parallelism.
flinkApplicationConfiguration: {
checkpointConfiguration: {
configurationType: "DEFAULT",
},
monitoringConfiguration: {
configurationType: "CUSTOM",
logLevel: "DEBUG",
metricsLevel: "TASK",
},
// CUSTOM lets us set explicit parallelism; auto scaling may raise it.
parallelismConfiguration: {
autoScalingEnabled: true,
configurationType: "CUSTOM",
parallelism: 10,
parallelismPerKpu: 4,
},
},
},
tags: {
Environment: "test",
},
});
# Deploy an Apache Flink application whose code is packaged as a JAR in S3.
# NOTE(review): `example_aws_iam_role` is assumed to be defined elsewhere in
# the program (the service execution role) — confirm before running.
import pulumi
import pulumi_aws as aws
# Bucket that stores the compiled application JAR.
example = aws.s3.Bucket("example", bucket="example-flink-application")
# Upload the local JAR; the application references this object by bucket + key.
example_bucket_objectv2 = aws.s3.BucketObjectv2("example",
bucket=example.id,
key="example-flink-application",
source=pulumi.FileAsset("flink-app.jar"))
example_application = aws.kinesisanalyticsv2.Application("example",
name="example-flink-application",
runtime_environment="FLINK-1_8",
service_execution_role=example_aws_iam_role["arn"],
application_configuration={
# Points the application at the JAR uploaded above.
"application_code_configuration": {
"code_content": {
"s3_content_location": {
"bucket_arn": example.arn,
"file_key": example_bucket_objectv2.key,
},
},
"code_content_type": "ZIPFILE",
},
# Key/value properties exposed to the Flink job at runtime, grouped by ID.
"environment_properties": {
"property_groups": [
{
"property_group_id": "PROPERTY-GROUP-1",
"property_map": {
"Key1": "Value1",
},
},
{
"property_group_id": "PROPERTY-GROUP-2",
"property_map": {
"KeyA": "ValueA",
"KeyB": "ValueB",
},
},
],
},
# Flink-specific tuning: checkpointing, logging/metrics, parallelism.
"flink_application_configuration": {
"checkpoint_configuration": {
"configuration_type": "DEFAULT",
},
"monitoring_configuration": {
"configuration_type": "CUSTOM",
"log_level": "DEBUG",
"metrics_level": "TASK",
},
# CUSTOM lets us set explicit parallelism; auto scaling may raise it.
"parallelism_configuration": {
"auto_scaling_enabled": True,
"configuration_type": "CUSTOM",
"parallelism": 10,
"parallelism_per_kpu": 4,
},
},
},
tags={
"Environment": "test",
})
// Deploy an Apache Flink application whose code is packaged as a JAR in S3.
// NOTE(review): `exampleAwsIamRole` is assumed to be defined elsewhere in the
// program (the service execution role) — confirm before running.
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesisanalyticsv2"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/s3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
// Bucket that stores the compiled application JAR.
example, err := s3.NewBucket(ctx, "example", &s3.BucketArgs{
Bucket: pulumi.String("example-flink-application"),
})
if err != nil {
return err
}
// Upload the local JAR; the application references it by bucket + key.
exampleBucketObjectv2, err := s3.NewBucketObjectv2(ctx, "example", &s3.BucketObjectv2Args{
Bucket: example.ID(),
Key: pulumi.String("example-flink-application"),
Source: pulumi.NewFileAsset("flink-app.jar"),
})
if err != nil {
return err
}
_, err = kinesisanalyticsv2.NewApplication(ctx, "example", &kinesisanalyticsv2.ApplicationArgs{
Name: pulumi.String("example-flink-application"),
RuntimeEnvironment: pulumi.String("FLINK-1_8"),
ServiceExecutionRole: pulumi.Any(exampleAwsIamRole.Arn),
ApplicationConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationArgs{
// Points the application at the JAR uploaded above.
ApplicationCodeConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs{
CodeContent: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs{
S3ContentLocation: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs{
BucketArn: example.Arn,
FileKey: exampleBucketObjectv2.Key,
},
},
CodeContentType: pulumi.String("ZIPFILE"),
},
// Key/value properties exposed to the Flink job at runtime, grouped by ID.
EnvironmentProperties: &kinesisanalyticsv2.ApplicationApplicationConfigurationEnvironmentPropertiesArgs{
PropertyGroups: kinesisanalyticsv2.ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArray{
&kinesisanalyticsv2.ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs{
PropertyGroupId: pulumi.String("PROPERTY-GROUP-1"),
PropertyMap: pulumi.StringMap{
"Key1": pulumi.String("Value1"),
},
},
&kinesisanalyticsv2.ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs{
PropertyGroupId: pulumi.String("PROPERTY-GROUP-2"),
PropertyMap: pulumi.StringMap{
"KeyA": pulumi.String("ValueA"),
"KeyB": pulumi.String("ValueB"),
},
},
},
},
// Flink-specific tuning: checkpointing, logging/metrics, parallelism.
FlinkApplicationConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationFlinkApplicationConfigurationArgs{
CheckpointConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationFlinkApplicationConfigurationCheckpointConfigurationArgs{
ConfigurationType: pulumi.String("DEFAULT"),
},
MonitoringConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationFlinkApplicationConfigurationMonitoringConfigurationArgs{
ConfigurationType: pulumi.String("CUSTOM"),
LogLevel: pulumi.String("DEBUG"),
MetricsLevel: pulumi.String("TASK"),
},
// CUSTOM lets us set explicit parallelism; auto scaling may raise it.
ParallelismConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationFlinkApplicationConfigurationParallelismConfigurationArgs{
AutoScalingEnabled: pulumi.Bool(true),
ConfigurationType: pulumi.String("CUSTOM"),
Parallelism: pulumi.Int(10),
ParallelismPerKpu: pulumi.Int(4),
},
},
},
Tags: pulumi.StringMap{
"Environment": pulumi.String("test"),
},
})
if err != nil {
return err
}
return nil
})
}
// Deploy an Apache Flink application whose code is packaged as a JAR in S3.
// NOTE(review): `exampleAwsIamRole` is assumed to be defined elsewhere in the
// program (the service execution role) — confirm before running.
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
// Bucket that stores the compiled application JAR.
var example = new Aws.S3.Bucket("example", new()
{
BucketName = "example-flink-application",
});
// Upload the local JAR; the application references it by bucket + key.
var exampleBucketObjectv2 = new Aws.S3.BucketObjectv2("example", new()
{
Bucket = example.Id,
Key = "example-flink-application",
Source = new FileAsset("flink-app.jar"),
});
var exampleApplication = new Aws.KinesisAnalyticsV2.Application("example", new()
{
Name = "example-flink-application",
RuntimeEnvironment = "FLINK-1_8",
ServiceExecutionRole = exampleAwsIamRole.Arn,
ApplicationConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationArgs
{
// Points the application at the JAR uploaded above.
ApplicationCodeConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs
{
CodeContent = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs
{
S3ContentLocation = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs
{
BucketArn = example.Arn,
FileKey = exampleBucketObjectv2.Key,
},
},
CodeContentType = "ZIPFILE",
},
// Key/value properties exposed to the Flink job at runtime, grouped by ID.
EnvironmentProperties = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationEnvironmentPropertiesArgs
{
PropertyGroups = new[]
{
new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs
{
PropertyGroupId = "PROPERTY-GROUP-1",
PropertyMap =
{
{ "Key1", "Value1" },
},
},
new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs
{
PropertyGroupId = "PROPERTY-GROUP-2",
PropertyMap =
{
{ "KeyA", "ValueA" },
{ "KeyB", "ValueB" },
},
},
},
},
// Flink-specific tuning: checkpointing, logging/metrics, parallelism.
FlinkApplicationConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationArgs
{
CheckpointConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationCheckpointConfigurationArgs
{
ConfigurationType = "DEFAULT",
},
MonitoringConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationMonitoringConfigurationArgs
{
ConfigurationType = "CUSTOM",
LogLevel = "DEBUG",
MetricsLevel = "TASK",
},
// CUSTOM lets us set explicit parallelism; auto scaling may raise it.
ParallelismConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationParallelismConfigurationArgs
{
AutoScalingEnabled = true,
ConfigurationType = "CUSTOM",
Parallelism = 10,
ParallelismPerKpu = 4,
},
},
},
Tags =
{
{ "Environment", "test" },
},
});
});
// Deploy an Apache Flink application whose code is packaged as a JAR in S3.
// NOTE(review): `exampleAwsIamRole` is assumed to be defined elsewhere in the
// program (the service execution role) — confirm before running.
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.s3.Bucket;
import com.pulumi.aws.s3.BucketArgs;
import com.pulumi.aws.s3.BucketObjectv2;
import com.pulumi.aws.s3.BucketObjectv2Args;
import com.pulumi.aws.kinesisanalyticsv2.Application;
import com.pulumi.aws.kinesisanalyticsv2.ApplicationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationEnvironmentPropertiesArgs;
// FIX: this import was missing although the class is used in propertyGroups below.
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationCheckpointConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationMonitoringConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationParallelismConfigurationArgs;
import com.pulumi.asset.FileAsset;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        // Bucket that stores the compiled application JAR.
        var example = new Bucket("example", BucketArgs.builder()
            .bucket("example-flink-application")
            .build());

        // Upload the local JAR; the application references it by bucket + key.
        var exampleBucketObjectv2 = new BucketObjectv2("exampleBucketObjectv2", BucketObjectv2Args.builder()
            .bucket(example.id())
            .key("example-flink-application")
            .source(new FileAsset("flink-app.jar"))
            .build());

        var exampleApplication = new Application("exampleApplication", ApplicationArgs.builder()
            .name("example-flink-application")
            .runtimeEnvironment("FLINK-1_8")
            .serviceExecutionRole(exampleAwsIamRole.arn())
            .applicationConfiguration(ApplicationApplicationConfigurationArgs.builder()
                // Points the application at the JAR uploaded above.
                .applicationCodeConfiguration(ApplicationApplicationConfigurationApplicationCodeConfigurationArgs.builder()
                    .codeContent(ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs.builder()
                        .s3ContentLocation(ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs.builder()
                            .bucketArn(example.arn())
                            .fileKey(exampleBucketObjectv2.key())
                            .build())
                        .build())
                    .codeContentType("ZIPFILE")
                    .build())
                // Key/value properties exposed to the Flink job at runtime, grouped by ID.
                .environmentProperties(ApplicationApplicationConfigurationEnvironmentPropertiesArgs.builder()
                    .propertyGroups(
                        ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs.builder()
                            .propertyGroupId("PROPERTY-GROUP-1")
                            .propertyMap(Map.of("Key1", "Value1"))
                            .build(),
                        ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs.builder()
                            .propertyGroupId("PROPERTY-GROUP-2")
                            .propertyMap(Map.ofEntries(
                                Map.entry("KeyA", "ValueA"),
                                Map.entry("KeyB", "ValueB")
                            ))
                            .build())
                    .build())
                // Flink-specific tuning: checkpointing, logging/metrics, parallelism.
                .flinkApplicationConfiguration(ApplicationApplicationConfigurationFlinkApplicationConfigurationArgs.builder()
                    .checkpointConfiguration(ApplicationApplicationConfigurationFlinkApplicationConfigurationCheckpointConfigurationArgs.builder()
                        .configurationType("DEFAULT")
                        .build())
                    .monitoringConfiguration(ApplicationApplicationConfigurationFlinkApplicationConfigurationMonitoringConfigurationArgs.builder()
                        .configurationType("CUSTOM")
                        .logLevel("DEBUG")
                        .metricsLevel("TASK")
                        .build())
                    // CUSTOM lets us set explicit parallelism; auto scaling may raise it.
                    .parallelismConfiguration(ApplicationApplicationConfigurationFlinkApplicationConfigurationParallelismConfigurationArgs.builder()
                        .autoScalingEnabled(true)
                        .configurationType("CUSTOM")
                        .parallelism(10)
                        .parallelismPerKpu(4)
                        .build())
                    .build())
                .build())
            .tags(Map.of("Environment", "test"))
            .build());
    }
}
# Flink application deployed from a JAR stored in S3.
# FIX: restored the nesting that was lost when indentation was stripped —
# without it this document is not valid Pulumi YAML.
# NOTE(review): ${exampleAwsIamRole.arn} refers to a resource defined elsewhere.
resources:
  # Bucket that stores the compiled application JAR.
  example:
    type: aws:s3:Bucket
    properties:
      bucket: example-flink-application
  # Upload the local JAR; the application references it by bucket + key.
  exampleBucketObjectv2:
    type: aws:s3:BucketObjectv2
    name: example
    properties:
      bucket: ${example.id}
      key: example-flink-application
      source:
        fn::FileAsset: flink-app.jar
  exampleApplication:
    type: aws:kinesisanalyticsv2:Application
    name: example
    properties:
      name: example-flink-application
      runtimeEnvironment: FLINK-1_8
      serviceExecutionRole: ${exampleAwsIamRole.arn}
      applicationConfiguration:
        # Points the application at the JAR uploaded above.
        applicationCodeConfiguration:
          codeContent:
            s3ContentLocation:
              bucketArn: ${example.arn}
              fileKey: ${exampleBucketObjectv2.key}
          codeContentType: ZIPFILE
        # Key/value properties exposed to the Flink job at runtime.
        environmentProperties:
          propertyGroups:
            - propertyGroupId: PROPERTY-GROUP-1
              propertyMap:
                Key1: Value1
            - propertyGroupId: PROPERTY-GROUP-2
              propertyMap:
                KeyA: ValueA
                KeyB: ValueB
        # Flink-specific tuning: checkpointing, logging/metrics, parallelism.
        flinkApplicationConfiguration:
          checkpointConfiguration:
            configurationType: DEFAULT
          monitoringConfiguration:
            configurationType: CUSTOM
            logLevel: DEBUG
            metricsLevel: TASK
          parallelismConfiguration:
            autoScalingEnabled: true
            configurationType: CUSTOM
            parallelism: 10
            parallelismPerKpu: 4
      tags:
        Environment: test
The runtimeEnvironment property specifies the Flink version. The applicationCodeConfiguration block points to your JAR file in S3 via s3ContentLocation. The flinkApplicationConfiguration controls checkpointing, parallelism, and monitoring. Environment properties pass configuration to your application code at runtime through propertyGroups.
Process streams with SQL queries
Teams familiar with SQL can analyze streaming data by writing queries that read from Kinesis streams and write to destinations — no stream-processing framework required.
// SQL-based stream processing: read from a Kinesis stream, fan out to Lambda
// and Firehose, enrich from an S3 reference table, and log to CloudWatch.
// NOTE(review): `exampleAwsIamRole`, `exampleAwsKinesisStream`,
// `exampleAwsLambdaFunction`, `exampleAwsKinesisFirehoseDeliveryStream`, and
// `exampleAwsS3Bucket` are assumed to be defined elsewhere — confirm.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
// CloudWatch log group/stream that receives application logs.
const example = new aws.cloudwatch.LogGroup("example", {name: "example-sql-application"});
const exampleLogStream = new aws.cloudwatch.LogStream("example", {
name: "example-sql-application",
logGroupName: example.name,
});
const exampleApplication = new aws.kinesisanalyticsv2.Application("example", {
name: "example-sql-application",
runtimeEnvironment: "SQL-1_0",
serviceExecutionRole: exampleAwsIamRole.arn,
applicationConfiguration: {
// SQL applications embed their query text directly (PLAINTEXT).
applicationCodeConfiguration: {
codeContent: {
textContent: "SELECT 1;\n",
},
codeContentType: "PLAINTEXT",
},
sqlApplicationConfiguration: {
// Input: CSV records from a Kinesis stream, mapped onto two columns.
input: {
namePrefix: "PREFIX_1",
inputParallelism: {
count: 3,
},
inputSchema: {
recordColumns: [
{
name: "COLUMN_1",
sqlType: "VARCHAR(8)",
mapping: "MAPPING-1",
},
{
name: "COLUMN_2",
sqlType: "DOUBLE",
},
],
recordEncoding: "UTF-8",
recordFormat: {
recordFormatType: "CSV",
mappingParameters: {
csvMappingParameters: {
recordColumnDelimiter: ",",
recordRowDelimiter: "\n",
},
},
},
},
kinesisStreamsInput: {
resourceArn: exampleAwsKinesisStream.arn,
},
},
// Outputs: JSON to a Lambda function and CSV to a Firehose stream.
outputs: [
{
name: "OUTPUT_1",
destinationSchema: {
recordFormatType: "JSON",
},
lambdaOutput: {
resourceArn: exampleAwsLambdaFunction.arn,
},
},
{
name: "OUTPUT_2",
destinationSchema: {
recordFormatType: "CSV",
},
kinesisFirehoseOutput: {
resourceArn: exampleAwsKinesisFirehoseDeliveryStream.arn,
},
},
],
// Reference data: a JSON file in S3 joined into queries as TABLE-1.
referenceDataSource: {
tableName: "TABLE-1",
referenceSchema: {
recordColumns: [{
name: "COLUMN_1",
sqlType: "INTEGER",
}],
recordFormat: {
recordFormatType: "JSON",
mappingParameters: {
jsonMappingParameters: {
recordRowPath: "$",
},
},
},
},
s3ReferenceDataSource: {
bucketArn: exampleAwsS3Bucket.arn,
fileKey: "KEY-1",
},
},
},
},
cloudwatchLoggingOptions: {
logStreamArn: exampleLogStream.arn,
},
});
# SQL-based stream processing: read from a Kinesis stream, fan out to Lambda
# and Firehose, enrich from an S3 reference table, and log to CloudWatch.
# NOTE(review): the `example_aws_*` references are assumed to be defined
# elsewhere in the program — confirm before running.
import pulumi
import pulumi_aws as aws
# CloudWatch log group/stream that receives application logs.
example = aws.cloudwatch.LogGroup("example", name="example-sql-application")
example_log_stream = aws.cloudwatch.LogStream("example",
name="example-sql-application",
log_group_name=example.name)
example_application = aws.kinesisanalyticsv2.Application("example",
name="example-sql-application",
runtime_environment="SQL-1_0",
service_execution_role=example_aws_iam_role["arn"],
application_configuration={
# SQL applications embed their query text directly (PLAINTEXT).
"application_code_configuration": {
"code_content": {
"text_content": "SELECT 1;\n",
},
"code_content_type": "PLAINTEXT",
},
"sql_application_configuration": {
# Input: CSV records from a Kinesis stream, mapped onto two columns.
"input": {
"name_prefix": "PREFIX_1",
"input_parallelism": {
"count": 3,
},
"input_schema": {
"record_columns": [
{
"name": "COLUMN_1",
"sql_type": "VARCHAR(8)",
"mapping": "MAPPING-1",
},
{
"name": "COLUMN_2",
"sql_type": "DOUBLE",
},
],
"record_encoding": "UTF-8",
"record_format": {
"record_format_type": "CSV",
"mapping_parameters": {
"csv_mapping_parameters": {
"record_column_delimiter": ",",
"record_row_delimiter": "\n",
},
},
},
},
"kinesis_streams_input": {
"resource_arn": example_aws_kinesis_stream["arn"],
},
},
# Outputs: JSON to a Lambda function and CSV to a Firehose stream.
"outputs": [
{
"name": "OUTPUT_1",
"destination_schema": {
"record_format_type": "JSON",
},
"lambda_output": {
"resource_arn": example_aws_lambda_function["arn"],
},
},
{
"name": "OUTPUT_2",
"destination_schema": {
"record_format_type": "CSV",
},
"kinesis_firehose_output": {
"resource_arn": example_aws_kinesis_firehose_delivery_stream["arn"],
},
},
],
# Reference data: a JSON file in S3 joined into queries as TABLE-1.
"reference_data_source": {
"table_name": "TABLE-1",
"reference_schema": {
"record_columns": [{
"name": "COLUMN_1",
"sql_type": "INTEGER",
}],
"record_format": {
"record_format_type": "JSON",
"mapping_parameters": {
"json_mapping_parameters": {
"record_row_path": "$",
},
},
},
},
"s3_reference_data_source": {
"bucket_arn": example_aws_s3_bucket["arn"],
"file_key": "KEY-1",
},
},
},
},
cloudwatch_logging_options={
"log_stream_arn": example_log_stream.arn,
})
// SQL-based stream processing: read from a Kinesis stream, fan out to Lambda
// and Firehose, enrich from an S3 reference table, and log to CloudWatch.
// NOTE(review): the `exampleAws*` references are assumed to be defined
// elsewhere in the program — confirm before running.
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/cloudwatch"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesisanalyticsv2"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
// CloudWatch log group/stream that receives application logs.
example, err := cloudwatch.NewLogGroup(ctx, "example", &cloudwatch.LogGroupArgs{
Name: pulumi.String("example-sql-application"),
})
if err != nil {
return err
}
exampleLogStream, err := cloudwatch.NewLogStream(ctx, "example", &cloudwatch.LogStreamArgs{
Name: pulumi.String("example-sql-application"),
LogGroupName: example.Name,
})
if err != nil {
return err
}
_, err = kinesisanalyticsv2.NewApplication(ctx, "example", &kinesisanalyticsv2.ApplicationArgs{
Name: pulumi.String("example-sql-application"),
RuntimeEnvironment: pulumi.String("SQL-1_0"),
ServiceExecutionRole: pulumi.Any(exampleAwsIamRole.Arn),
ApplicationConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationArgs{
// SQL applications embed their query text directly (PLAINTEXT).
ApplicationCodeConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs{
CodeContent: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs{
TextContent: pulumi.String("SELECT 1;\n"),
},
CodeContentType: pulumi.String("PLAINTEXT"),
},
SqlApplicationConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationArgs{
// Input: CSV records from a Kinesis stream, mapped onto two columns.
Input: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputTypeArgs{
NamePrefix: pulumi.String("PREFIX_1"),
InputParallelism: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputParallelismArgs{
Count: pulumi.Int(3),
},
InputSchema: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaArgs{
RecordColumns: kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArray{
&kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs{
Name: pulumi.String("COLUMN_1"),
SqlType: pulumi.String("VARCHAR(8)"),
Mapping: pulumi.String("MAPPING-1"),
},
&kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs{
Name: pulumi.String("COLUMN_2"),
SqlType: pulumi.String("DOUBLE"),
},
},
RecordEncoding: pulumi.String("UTF-8"),
RecordFormat: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatArgs{
RecordFormatType: pulumi.String("CSV"),
MappingParameters: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersArgs{
CsvMappingParameters: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersCsvMappingParametersArgs{
RecordColumnDelimiter: pulumi.String(","),
RecordRowDelimiter: pulumi.String("\n"),
},
},
},
},
KinesisStreamsInput: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputKinesisStreamsInputArgs{
ResourceArn: pulumi.Any(exampleAwsKinesisStream.Arn),
},
},
// Outputs: JSON to a Lambda function and CSV to a Firehose stream.
Outputs: kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputTypeArray{
&kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputTypeArgs{
Name: pulumi.String("OUTPUT_1"),
DestinationSchema: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs{
RecordFormatType: pulumi.String("JSON"),
},
LambdaOutput: kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputLambdaOutputArgs{
ResourceArn: pulumi.Any(exampleAwsLambdaFunction.Arn),
},
},
&kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputTypeArgs{
Name: pulumi.String("OUTPUT_2"),
DestinationSchema: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs{
RecordFormatType: pulumi.String("CSV"),
},
KinesisFirehoseOutput: kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputKinesisFirehoseOutputArgs{
ResourceArn: pulumi.Any(exampleAwsKinesisFirehoseDeliveryStream.Arn),
},
},
},
// Reference data: a JSON file in S3 joined into queries as TABLE-1.
ReferenceDataSource: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceArgs{
TableName: pulumi.String("TABLE-1"),
ReferenceSchema: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaArgs{
RecordColumns: kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordColumnArray{
&kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordColumnArgs{
Name: pulumi.String("COLUMN_1"),
SqlType: pulumi.String("INTEGER"),
},
},
RecordFormat: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatArgs{
RecordFormatType: pulumi.String("JSON"),
MappingParameters: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersArgs{
JsonMappingParameters: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersJsonMappingParametersArgs{
RecordRowPath: pulumi.String("$"),
},
},
},
},
S3ReferenceDataSource: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceS3ReferenceDataSourceArgs{
BucketArn: pulumi.Any(exampleAwsS3Bucket.Arn),
FileKey: pulumi.String("KEY-1"),
},
},
},
},
// Ship application logs to the CloudWatch log stream created above.
CloudwatchLoggingOptions: &kinesisanalyticsv2.ApplicationCloudwatchLoggingOptionsArgs{
LogStreamArn: exampleLogStream.Arn,
},
})
if err != nil {
return err
}
return nil
})
}
// SQL-based stream processing: read from a Kinesis stream, fan out to Lambda
// and Firehose, enrich from an S3 reference table, and log to CloudWatch.
// NOTE(review): the `exampleAws*` references are assumed to be defined
// elsewhere in the program — confirm before running.
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
return await Deployment.RunAsync(() =>
{
// CloudWatch log group/stream that receives application logs.
var example = new Aws.CloudWatch.LogGroup("example", new()
{
Name = "example-sql-application",
});
var exampleLogStream = new Aws.CloudWatch.LogStream("example", new()
{
Name = "example-sql-application",
LogGroupName = example.Name,
});
var exampleApplication = new Aws.KinesisAnalyticsV2.Application("example", new()
{
Name = "example-sql-application",
RuntimeEnvironment = "SQL-1_0",
ServiceExecutionRole = exampleAwsIamRole.Arn,
ApplicationConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationArgs
{
// SQL applications embed their query text directly (PLAINTEXT).
ApplicationCodeConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs
{
CodeContent = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs
{
TextContent = @"SELECT 1;
",
},
CodeContentType = "PLAINTEXT",
},
SqlApplicationConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationArgs
{
// Input: CSV records from a Kinesis stream, mapped onto two columns.
Input = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputArgs
{
NamePrefix = "PREFIX_1",
InputParallelism = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputParallelismArgs
{
Count = 3,
},
InputSchema = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaArgs
{
RecordColumns = new[]
{
new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs
{
Name = "COLUMN_1",
SqlType = "VARCHAR(8)",
Mapping = "MAPPING-1",
},
new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs
{
Name = "COLUMN_2",
SqlType = "DOUBLE",
},
},
RecordEncoding = "UTF-8",
RecordFormat = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatArgs
{
RecordFormatType = "CSV",
MappingParameters = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersArgs
{
CsvMappingParameters = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersCsvMappingParametersArgs
{
RecordColumnDelimiter = ",",
RecordRowDelimiter = @"
",
},
},
},
},
KinesisStreamsInput = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputKinesisStreamsInputArgs
{
ResourceArn = exampleAwsKinesisStream.Arn,
},
},
// Outputs: JSON to a Lambda function and CSV to a Firehose stream.
Outputs = new[]
{
new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputArgs
{
Name = "OUTPUT_1",
DestinationSchema = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs
{
RecordFormatType = "JSON",
},
LambdaOutput = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputLambdaOutputArgs
{
ResourceArn = exampleAwsLambdaFunction.Arn,
},
},
new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputArgs
{
Name = "OUTPUT_2",
DestinationSchema = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs
{
RecordFormatType = "CSV",
},
KinesisFirehoseOutput = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputKinesisFirehoseOutputArgs
{
ResourceArn = exampleAwsKinesisFirehoseDeliveryStream.Arn,
},
},
},
// Reference data: a JSON file in S3 joined into queries as TABLE-1.
ReferenceDataSource = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceArgs
{
TableName = "TABLE-1",
ReferenceSchema = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaArgs
{
RecordColumns = new[]
{
new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordColumnArgs
{
Name = "COLUMN_1",
SqlType = "INTEGER",
},
},
RecordFormat = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatArgs
{
RecordFormatType = "JSON",
MappingParameters = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersArgs
{
JsonMappingParameters = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersJsonMappingParametersArgs
{
RecordRowPath = "$",
},
},
},
},
S3ReferenceDataSource = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceS3ReferenceDataSourceArgs
{
BucketArn = exampleAwsS3Bucket.Arn,
FileKey = "KEY-1",
},
},
},
},
// Ship application logs to the CloudWatch log stream created above.
CloudwatchLoggingOptions = new Aws.KinesisAnalyticsV2.Inputs.ApplicationCloudwatchLoggingOptionsArgs
{
LogStreamArn = exampleLogStream.Arn,
},
});
});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.cloudwatch.LogGroup;
import com.pulumi.aws.cloudwatch.LogGroupArgs;
import com.pulumi.aws.cloudwatch.LogStream;
import com.pulumi.aws.cloudwatch.LogStreamArgs;
import com.pulumi.aws.kinesisanalyticsv2.Application;
import com.pulumi.aws.kinesisanalyticsv2.ApplicationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputParallelismArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersCsvMappingParametersArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputKinesisStreamsInputArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputKinesisFirehoseOutputArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputLambdaOutputArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordColumnArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersJsonMappingParametersArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceS3ReferenceDataSourceArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationCloudwatchLoggingOptionsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
// Example: SQL-runtime Kinesis Data Analytics v2 application.
// Reads from a Kinesis stream, routes results to a Lambda function and a
// Firehose delivery stream, joins against an S3 reference table, and logs to
// CloudWatch.
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    // NOTE(review): exampleAwsIamRole, exampleAwsKinesisStream,
    // exampleAwsLambdaFunction, exampleAwsKinesisFirehoseDeliveryStream and
    // exampleAwsS3Bucket are not defined in this snippet; they stand in for
    // pre-existing resources the reader must supply.
    public static void stack(Context ctx) {
        // CloudWatch log group and stream that will receive application logs.
        var example = new LogGroup("example", LogGroupArgs.builder()
            .name("example-sql-application")
            .build());

        var exampleLogStream = new LogStream("exampleLogStream", LogStreamArgs.builder()
            .name("example-sql-application")
            .logGroupName(example.name())
            .build());

        var exampleApplication = new Application("exampleApplication", ApplicationArgs.builder()
            .name("example-sql-application")
            .runtimeEnvironment("SQL-1_0")
            // Role the service assumes to reach the stream, Lambda, Firehose and S3.
            .serviceExecutionRole(exampleAwsIamRole.arn())
            .applicationConfiguration(ApplicationApplicationConfigurationArgs.builder()
                // Inline SQL code (PLAINTEXT), as opposed to a JAR stored in S3.
                .applicationCodeConfiguration(ApplicationApplicationConfigurationApplicationCodeConfigurationArgs.builder()
                    .codeContent(ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs.builder()
                        .textContent("""
                            SELECT 1;
                            """)
                        .build())
                    .codeContentType("PLAINTEXT")
                    .build())
                .sqlApplicationConfiguration(ApplicationApplicationConfigurationSqlApplicationConfigurationArgs.builder()
                    // Input: a Kinesis stream with 3 in-application input streams
                    // and a CSV record schema.
                    .input(ApplicationApplicationConfigurationSqlApplicationConfigurationInputArgs.builder()
                        .namePrefix("PREFIX_1")
                        .inputParallelism(ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputParallelismArgs.builder()
                            .count(3)
                            .build())
                        .inputSchema(ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaArgs.builder()
                            .recordColumns(
                                ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs.builder()
                                    .name("COLUMN_1")
                                    .sqlType("VARCHAR(8)")
                                    .mapping("MAPPING-1")
                                    .build(),
                                ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs.builder()
                                    .name("COLUMN_2")
                                    .sqlType("DOUBLE")
                                    .build())
                            .recordEncoding("UTF-8")
                            .recordFormat(ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatArgs.builder()
                                .recordFormatType("CSV")
                                .mappingParameters(ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersArgs.builder()
                                    .csvMappingParameters(ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersCsvMappingParametersArgs.builder()
                                        .recordColumnDelimiter(",")
                                        // NOTE(review): this text block is empty ("");
                                        // a newline row delimiter ("\n") appears to be
                                        // intended — verify against the YAML example.
                                        .recordRowDelimiter("""
                                            """)
                                        .build())
                                    .build())
                                .build())
                            .build())
                        .kinesisStreamsInput(ApplicationApplicationConfigurationSqlApplicationConfigurationInputKinesisStreamsInputArgs.builder()
                            .resourceArn(exampleAwsKinesisStream.arn())
                            .build())
                        .build())
                    // Two outputs: JSON records to Lambda, CSV records to Firehose.
                    .outputs(
                        ApplicationApplicationConfigurationSqlApplicationConfigurationOutputArgs.builder()
                            .name("OUTPUT_1")
                            .destinationSchema(ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs.builder()
                                .recordFormatType("JSON")
                                .build())
                            .lambdaOutput(ApplicationApplicationConfigurationSqlApplicationConfigurationOutputLambdaOutputArgs.builder()
                                .resourceArn(exampleAwsLambdaFunction.arn())
                                .build())
                            .build(),
                        ApplicationApplicationConfigurationSqlApplicationConfigurationOutputArgs.builder()
                            .name("OUTPUT_2")
                            .destinationSchema(ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs.builder()
                                .recordFormatType("CSV")
                                .build())
                            .kinesisFirehoseOutput(ApplicationApplicationConfigurationSqlApplicationConfigurationOutputKinesisFirehoseOutputArgs.builder()
                                .resourceArn(exampleAwsKinesisFirehoseDeliveryStream.arn())
                                .build())
                            .build())
                    // Static lookup table loaded from an S3 object (JSON records).
                    .referenceDataSource(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceArgs.builder()
                        .tableName("TABLE-1")
                        .referenceSchema(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaArgs.builder()
                            .recordColumns(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordColumnArgs.builder()
                                .name("COLUMN_1")
                                .sqlType("INTEGER")
                                .build())
                            .recordFormat(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatArgs.builder()
                                .recordFormatType("JSON")
                                .mappingParameters(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersArgs.builder()
                                    .jsonMappingParameters(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersJsonMappingParametersArgs.builder()
                                        .recordRowPath("$")
                                        .build())
                                    .build())
                                .build())
                            .build())
                        .s3ReferenceDataSource(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceS3ReferenceDataSourceArgs.builder()
                            .bucketArn(exampleAwsS3Bucket.arn())
                            .fileKey("KEY-1")
                            .build())
                        .build())
                    .build())
                .build())
            // Ship application logs to the CloudWatch stream created above.
            .cloudwatchLoggingOptions(ApplicationCloudwatchLoggingOptionsArgs.builder()
                .logStreamArn(exampleLogStream.arn())
                .build())
            .build());
    }
}
# SQL-runtime Kinesis Data Analytics v2 application (YAML form of the example
# above): Kinesis stream input, Lambda + Firehose outputs, S3 reference table,
# CloudWatch logging.
# NOTE(review): ${exampleAwsIamRole}, ${exampleAwsKinesisStream},
# ${exampleAwsLambdaFunction}, ${exampleAwsKinesisFirehoseDeliveryStream} and
# ${exampleAwsS3Bucket} refer to pre-existing resources not defined here.
resources:
  example:
    type: aws:cloudwatch:LogGroup
    properties:
      name: example-sql-application
  exampleLogStream:
    type: aws:cloudwatch:LogStream
    name: example
    properties:
      name: example-sql-application
      logGroupName: ${example.name}
  exampleApplication:
    type: aws:kinesisanalyticsv2:Application
    name: example
    properties:
      name: example-sql-application
      runtimeEnvironment: SQL-1_0
      serviceExecutionRole: ${exampleAwsIamRole.arn}
      applicationConfiguration:
        applicationCodeConfiguration:
          codeContent:
            # Inline SQL source; pairs with codeContentType PLAINTEXT.
            textContent: |
              SELECT 1;
          codeContentType: PLAINTEXT
        sqlApplicationConfiguration:
          input:
            namePrefix: PREFIX_1
            inputParallelism:
              count: 3
            inputSchema:
              recordColumns:
                - name: COLUMN_1
                  sqlType: VARCHAR(8)
                  mapping: MAPPING-1
                - name: COLUMN_2
                  sqlType: DOUBLE
              recordEncoding: UTF-8
              recordFormat:
                recordFormatType: CSV
                mappingParameters:
                  csvMappingParameters:
                    recordColumnDelimiter: ','
                    # Block scalar holding a single newline ("\n") row delimiter.
                    recordRowDelimiter: |2+

            kinesisStreamsInput:
              resourceArn: ${exampleAwsKinesisStream.arn}
          outputs:
            # OUTPUT_1: JSON records delivered to a Lambda function.
            - name: OUTPUT_1
              destinationSchema:
                recordFormatType: JSON
              lambdaOutput:
                resourceArn: ${exampleAwsLambdaFunction.arn}
            # OUTPUT_2: CSV records delivered to a Firehose delivery stream.
            - name: OUTPUT_2
              destinationSchema:
                recordFormatType: CSV
              kinesisFirehoseOutput:
                resourceArn: ${exampleAwsKinesisFirehoseDeliveryStream.arn}
          # Static lookup table loaded from an S3 object (JSON records).
          referenceDataSource:
            tableName: TABLE-1
            referenceSchema:
              recordColumns:
                - name: COLUMN_1
                  sqlType: INTEGER
              recordFormat:
                recordFormatType: JSON
                mappingParameters:
                  jsonMappingParameters:
                    recordRowPath: $
            s3ReferenceDataSource:
              bucketArn: ${exampleAwsS3Bucket.arn}
              fileKey: KEY-1
      cloudwatchLoggingOptions:
        logStreamArn: ${exampleLogStream.arn}
The sqlApplicationConfiguration defines inputs, outputs, and reference data. The input block specifies a Kinesis stream source with an inputSchema that describes record structure. Outputs route transformed data to Lambda functions or Firehose delivery streams. The referenceDataSource joins streaming data with static lookup tables stored in S3.
Connect to private VPC resources
Applications that need to access RDS databases, ElastiCache clusters, or internal APIs require VPC configuration to join private subnets.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

// Bucket and object holding the packaged Flink application JAR.
const example = new aws.s3.Bucket("example", {bucket: "example-flink-application"});
const exampleBucketObjectv2 = new aws.s3.BucketObjectv2("example", {
    bucket: example.id,
    key: "example-flink-application",
    source: new pulumi.asset.FileAsset("flink-app.jar"),
});

// Flink application deployed from the S3 object, attached to a VPC so it can
// reach private resources.
// NOTE(review): exampleAwsIamRole, exampleAwsSecurityGroup and exampleAwsSubnet
// are placeholders for pre-existing resources not defined in this snippet.
const exampleApplication = new aws.kinesisanalyticsv2.Application("example", {
    name: "example-flink-application",
    runtimeEnvironment: "FLINK-1_8",
    serviceExecutionRole: exampleAwsIamRole.arn,
    applicationConfiguration: {
        applicationCodeConfiguration: {
            codeContent: {
                // JAR location in S3; pairs with codeContentType ZIPFILE.
                s3ContentLocation: {
                    bucketArn: example.arn,
                    fileKey: exampleBucketObjectv2.key,
                },
            },
            codeContentType: "ZIPFILE",
        },
        // Places the application's managed network interfaces in these
        // subnets/security groups.
        vpcConfiguration: {
            securityGroupIds: [
                exampleAwsSecurityGroup[0].id,
                exampleAwsSecurityGroup[1].id,
            ],
            subnetIds: [exampleAwsSubnet.id],
        },
    },
});
import pulumi
import pulumi_aws as aws

# Bucket and object holding the packaged Flink application JAR.
example = aws.s3.Bucket("example", bucket="example-flink-application")
example_bucket_objectv2 = aws.s3.BucketObjectv2("example",
    bucket=example.id,
    key="example-flink-application",
    source=pulumi.FileAsset("flink-app.jar"))

# Flink application deployed from the S3 object, attached to a VPC so it can
# reach private resources.
# NOTE(review): example_aws_iam_role, example_aws_security_group and
# example_aws_subnet are placeholders for pre-existing resources not defined
# in this snippet.
example_application = aws.kinesisanalyticsv2.Application("example",
    name="example-flink-application",
    runtime_environment="FLINK-1_8",
    service_execution_role=example_aws_iam_role["arn"],
    application_configuration={
        "application_code_configuration": {
            "code_content": {
                # JAR location in S3; pairs with code_content_type ZIPFILE.
                "s3_content_location": {
                    "bucket_arn": example.arn,
                    "file_key": example_bucket_objectv2.key,
                },
            },
            "code_content_type": "ZIPFILE",
        },
        # Places the application's managed network interfaces in these
        # subnets/security groups.
        "vpc_configuration": {
            "security_group_ids": [
                example_aws_security_group[0]["id"],
                example_aws_security_group[1]["id"],
            ],
            "subnet_ids": [example_aws_subnet["id"]],
        },
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesisanalyticsv2"
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/s3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

// Deploys a FLINK-1_8 Kinesis Data Analytics v2 application whose JAR lives in
// S3, attached to a VPC so it can reach private resources.
// NOTE(review): exampleAwsIamRole, exampleAwsSecurityGroup and exampleAwsSubnet
// are placeholders for pre-existing resources not defined in this snippet.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Bucket and object holding the packaged Flink application JAR.
		example, err := s3.NewBucket(ctx, "example", &s3.BucketArgs{
			Bucket: pulumi.String("example-flink-application"),
		})
		if err != nil {
			return err
		}
		exampleBucketObjectv2, err := s3.NewBucketObjectv2(ctx, "example", &s3.BucketObjectv2Args{
			Bucket: example.ID(),
			Key:    pulumi.String("example-flink-application"),
			Source: pulumi.NewFileAsset("flink-app.jar"),
		})
		if err != nil {
			return err
		}
		_, err = kinesisanalyticsv2.NewApplication(ctx, "example", &kinesisanalyticsv2.ApplicationArgs{
			Name:                 pulumi.String("example-flink-application"),
			RuntimeEnvironment:   pulumi.String("FLINK-1_8"),
			ServiceExecutionRole: pulumi.Any(exampleAwsIamRole.Arn),
			ApplicationConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationArgs{
				ApplicationCodeConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs{
					CodeContent: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs{
						// JAR location in S3; pairs with CodeContentType ZIPFILE.
						S3ContentLocation: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs{
							BucketArn: example.Arn,
							FileKey:   exampleBucketObjectv2.Key,
						},
					},
					CodeContentType: pulumi.String("ZIPFILE"),
				},
				// Places the application's managed network interfaces in these
				// subnets/security groups.
				VpcConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationVpcConfigurationArgs{
					SecurityGroupIds: pulumi.StringArray{
						exampleAwsSecurityGroup[0].Id,
						exampleAwsSecurityGroup[1].Id,
					},
					SubnetIds: pulumi.StringArray{
						exampleAwsSubnet.Id,
					},
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

// Deploys a FLINK-1_8 Kinesis Data Analytics v2 application whose JAR lives in
// S3, attached to a VPC so it can reach private resources.
// NOTE(review): exampleAwsIamRole, exampleAwsSecurityGroup and exampleAwsSubnet
// are placeholders for pre-existing resources not defined in this snippet.
return await Deployment.RunAsync(() =>
{
    // Bucket and object holding the packaged Flink application JAR.
    var example = new Aws.S3.Bucket("example", new()
    {
        BucketName = "example-flink-application",
    });

    var exampleBucketObjectv2 = new Aws.S3.BucketObjectv2("example", new()
    {
        Bucket = example.Id,
        Key = "example-flink-application",
        Source = new FileAsset("flink-app.jar"),
    });

    var exampleApplication = new Aws.KinesisAnalyticsV2.Application("example", new()
    {
        Name = "example-flink-application",
        RuntimeEnvironment = "FLINK-1_8",
        ServiceExecutionRole = exampleAwsIamRole.Arn,
        ApplicationConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationArgs
        {
            ApplicationCodeConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs
            {
                CodeContent = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs
                {
                    // JAR location in S3; pairs with CodeContentType ZIPFILE.
                    S3ContentLocation = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs
                    {
                        BucketArn = example.Arn,
                        FileKey = exampleBucketObjectv2.Key,
                    },
                },
                CodeContentType = "ZIPFILE",
            },
            // Places the application's managed network interfaces in these
            // subnets/security groups.
            VpcConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationVpcConfigurationArgs
            {
                SecurityGroupIds = new[]
                {
                    exampleAwsSecurityGroup[0].Id,
                    exampleAwsSecurityGroup[1].Id,
                },
                SubnetIds = new[]
                {
                    exampleAwsSubnet.Id,
                },
            },
        },
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.s3.Bucket;
import com.pulumi.aws.s3.BucketArgs;
import com.pulumi.aws.s3.BucketObjectv2;
import com.pulumi.aws.s3.BucketObjectv2Args;
import com.pulumi.aws.kinesisanalyticsv2.Application;
import com.pulumi.aws.kinesisanalyticsv2.ApplicationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationVpcConfigurationArgs;
import com.pulumi.asset.FileAsset;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
// Example: FLINK-1_8 Kinesis Data Analytics v2 application whose JAR lives in
// S3, attached to a VPC so it can reach private resources.
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    // NOTE(review): exampleAwsIamRole, exampleAwsSecurityGroup and
    // exampleAwsSubnet are placeholders for pre-existing resources not defined
    // in this snippet.
    public static void stack(Context ctx) {
        // Bucket and object holding the packaged Flink application JAR.
        var example = new Bucket("example", BucketArgs.builder()
            .bucket("example-flink-application")
            .build());

        var exampleBucketObjectv2 = new BucketObjectv2("exampleBucketObjectv2", BucketObjectv2Args.builder()
            .bucket(example.id())
            .key("example-flink-application")
            .source(new FileAsset("flink-app.jar"))
            .build());

        var exampleApplication = new Application("exampleApplication", ApplicationArgs.builder()
            .name("example-flink-application")
            .runtimeEnvironment("FLINK-1_8")
            .serviceExecutionRole(exampleAwsIamRole.arn())
            .applicationConfiguration(ApplicationApplicationConfigurationArgs.builder()
                .applicationCodeConfiguration(ApplicationApplicationConfigurationApplicationCodeConfigurationArgs.builder()
                    .codeContent(ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs.builder()
                        // JAR location in S3; pairs with codeContentType ZIPFILE.
                        .s3ContentLocation(ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs.builder()
                            .bucketArn(example.arn())
                            .fileKey(exampleBucketObjectv2.key())
                            .build())
                        .build())
                    .codeContentType("ZIPFILE")
                    .build())
                // Places the application's managed network interfaces in these
                // subnets/security groups.
                .vpcConfiguration(ApplicationApplicationConfigurationVpcConfigurationArgs.builder()
                    .securityGroupIds(
                        exampleAwsSecurityGroup[0].id(),
                        exampleAwsSecurityGroup[1].id())
                    .subnetIds(exampleAwsSubnet.id())
                    .build())
                .build())
            .build());
    }
}
# Flink (FLINK-1_8) Kinesis Data Analytics v2 application (YAML form of the
# example above): JAR deployed from S3, attached to a VPC.
# NOTE(review): ${exampleAwsIamRole}, ${exampleAwsSecurityGroup} and
# ${exampleAwsSubnet} refer to pre-existing resources not defined here.
resources:
  example:
    type: aws:s3:Bucket
    properties:
      bucket: example-flink-application
  exampleBucketObjectv2:
    type: aws:s3:BucketObjectv2
    name: example
    properties:
      bucket: ${example.id}
      key: example-flink-application
      source:
        fn::FileAsset: flink-app.jar
  exampleApplication:
    type: aws:kinesisanalyticsv2:Application
    name: example
    properties:
      name: example-flink-application
      runtimeEnvironment: FLINK-1_8
      serviceExecutionRole: ${exampleAwsIamRole.arn}
      applicationConfiguration:
        applicationCodeConfiguration:
          codeContent:
            # JAR location in S3; pairs with codeContentType ZIPFILE.
            s3ContentLocation:
              bucketArn: ${example.arn}
              fileKey: ${exampleBucketObjectv2.key}
          codeContentType: ZIPFILE
        # Places the application's managed network interfaces in these
        # subnets/security groups.
        vpcConfiguration:
          securityGroupIds:
            - ${exampleAwsSecurityGroup[0].id}
            - ${exampleAwsSecurityGroup[1].id}
          subnetIds:
            - ${exampleAwsSubnet.id}
The vpcConfiguration property places your application in specified subnets with attached security groups, enabling access to private resources. Kinesis Data Analytics manages the network interfaces automatically. Your execution role needs EC2 permissions to create and manage these interfaces.
Beyond these examples
These snippets focus on specific application-level features: Flink and SQL runtime environments, stream inputs and multiple output destinations, and VPC networking for private resource access. They’re intentionally minimal rather than full stream processing pipelines.
The examples rely on pre-existing infrastructure such as IAM execution roles with appropriate permissions, S3 buckets for application code and reference data, Kinesis streams, Lambda functions, Firehose delivery streams, and VPC subnets and security groups for the VPC example. They focus on configuring the application rather than provisioning everything around it.
To keep things focused, common application patterns are omitted, including:
- Application lifecycle (startApplication, forceStop)
- Monitoring configuration (metricsLevel, logLevel tuning)
- Snapshot and restore configuration
- Auto-scaling parameters beyond basic parallelism
These omissions are intentional: the goal is to illustrate how each application feature is wired, not provide drop-in stream processing modules. See the Kinesis Analytics v2 Application resource reference for all available configuration options.
Let's deploy AWS Kinesis Data Analytics Applications
Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.
Try Pulumi Cloud for FREE.
Frequently Asked Questions
Application Types & Console Visibility
aws.kinesisanalyticsv2.Application cannot be viewed in the AWS Console. Use aws.kinesis.AnalyticsApplication instead if you need console visibility. The resource supports two application modes: STREAMING and INTERACTIVE. The applicationMode property is immutable and cannot be changed after creation.
Configuration & Deployment
Supported runtime environments are SQL-1_0, FLINK-1_6, FLINK-1_8, FLINK-1_11, FLINK-1_13, FLINK-1_15, FLINK-1_18, FLINK-1_19, and FLINK-1_20. To deploy a Flink JAR, set applicationCodeConfiguration.codeContent.s3ContentLocation with bucketArn and fileKey pointing to your JAR file in S3, and set codeContentType to ZIPFILE. To run inline SQL, set applicationCodeConfiguration.codeContent.textContent with your SQL code and set codeContentType to PLAINTEXT. Set serviceExecutionRole to an IAM role ARN that grants access to Kinesis data streams, Kinesis Data Firehose delivery streams, Amazon S3 objects, and other external resources your application uses.
Immutability & Updates
The applicationMode, name, and description properties are immutable. Changing them requires replacing the entire application resource. The provider increments versionId each time you update the application configuration.
Networking & VPC
Configure vpcConfiguration with securityGroupIds and subnetIds to connect your application to VPC resources.
Lifecycle Management
Set startApplication to true to start the application or false to stop it. For unresponsive Flink applications, use forceStop to force termination.
Using a different cloud?
Explore analytics guides for other cloud providers: