The aws:kinesisanalyticsv2/application:Application resource, part of the Pulumi AWS provider, defines Kinesis Data Analytics applications that run Apache Flink or SQL-based stream processing jobs. This guide focuses on three capabilities: Flink JAR deployment from S3, SQL-based stream transformation, and VPC networking for private resource access.
Applications require IAM execution roles with permissions to read from sources and write to destinations. They reference S3 buckets for code artifacts, Kinesis streams for input, and optionally VPC infrastructure for private resource access. The examples are intentionally small. Combine them with your own IAM roles, streams, and networking.
Deploy a Flink application from S3
Stream processing workloads often use Apache Flink to transform data in real time. Running the Flink job as a Kinesis Data Analytics application means you do not manage servers or clusters yourself.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
// S3 bucket that holds the packaged Flink application.
const example = new aws.s3.Bucket("example", {bucket: "example-flink-application"});

// Upload the local JAR into the bucket so the application can load it from S3.
const exampleBucketObjectv2 = new aws.s3.BucketObjectv2("example", {
    bucket: example.id,
    key: "example-flink-application",
    source: new pulumi.asset.FileAsset("flink-app.jar"),
});

// Code location: the uploaded object, addressed by bucket ARN and object key.
const flinkCodeConfiguration = {
    codeContent: {
        s3ContentLocation: {
            bucketArn: example.arn,
            fileKey: exampleBucketObjectv2.key,
        },
    },
    codeContentType: "ZIPFILE",
};

// Named property groups made available to the application code.
const flinkEnvironmentProperties = {
    propertyGroups: [
        {
            propertyGroupId: "PROPERTY-GROUP-1",
            propertyMap: {
                Key1: "Value1",
            },
        },
        {
            propertyGroupId: "PROPERTY-GROUP-2",
            propertyMap: {
                KeyA: "ValueA",
                KeyB: "ValueB",
            },
        },
    ],
};

// Flink runtime settings: checkpointing, monitoring and parallelism.
const flinkRuntimeConfiguration = {
    checkpointConfiguration: {
        configurationType: "DEFAULT",
    },
    monitoringConfiguration: {
        configurationType: "CUSTOM",
        logLevel: "DEBUG",
        metricsLevel: "TASK",
    },
    parallelismConfiguration: {
        autoScalingEnabled: true,
        configurationType: "CUSTOM",
        parallelism: 10,
        parallelismPerKpu: 4,
    },
};

// The application itself. `exampleAwsIamRole` is not declared in this snippet;
// supply your own IAM execution role.
const exampleApplication = new aws.kinesisanalyticsv2.Application("example", {
    name: "example-flink-application",
    runtimeEnvironment: "FLINK-1_8",
    serviceExecutionRole: exampleAwsIamRole.arn,
    applicationConfiguration: {
        applicationCodeConfiguration: flinkCodeConfiguration,
        environmentProperties: flinkEnvironmentProperties,
        flinkApplicationConfiguration: flinkRuntimeConfiguration,
    },
    tags: {
        Environment: "test",
    },
});
import pulumi
import pulumi_aws as aws
# S3 bucket that holds the packaged Flink application.
example = aws.s3.Bucket("example", bucket="example-flink-application")

# Upload the local JAR into the bucket so the application can load it from S3.
example_bucket_objectv2 = aws.s3.BucketObjectv2(
    "example",
    bucket=example.id,
    key="example-flink-application",
    source=pulumi.FileAsset("flink-app.jar"),
)

# Code location: the uploaded object, addressed by bucket ARN and object key.
flink_code_configuration = {
    "code_content": {
        "s3_content_location": {
            "bucket_arn": example.arn,
            "file_key": example_bucket_objectv2.key,
        },
    },
    "code_content_type": "ZIPFILE",
}

# Named property groups made available to the application code.
flink_environment_properties = {
    "property_groups": [
        {
            "property_group_id": "PROPERTY-GROUP-1",
            "property_map": {
                "Key1": "Value1",
            },
        },
        {
            "property_group_id": "PROPERTY-GROUP-2",
            "property_map": {
                "KeyA": "ValueA",
                "KeyB": "ValueB",
            },
        },
    ],
}

# Flink runtime settings: checkpointing, monitoring and parallelism.
flink_runtime_configuration = {
    "checkpoint_configuration": {
        "configuration_type": "DEFAULT",
    },
    "monitoring_configuration": {
        "configuration_type": "CUSTOM",
        "log_level": "DEBUG",
        "metrics_level": "TASK",
    },
    "parallelism_configuration": {
        "auto_scaling_enabled": True,
        "configuration_type": "CUSTOM",
        "parallelism": 10,
        "parallelism_per_kpu": 4,
    },
}

# The application itself. `example_aws_iam_role` is not defined in this snippet;
# supply your own IAM execution role.
example_application = aws.kinesisanalyticsv2.Application(
    "example",
    name="example-flink-application",
    runtime_environment="FLINK-1_8",
    service_execution_role=example_aws_iam_role["arn"],
    application_configuration={
        "application_code_configuration": flink_code_configuration,
        "environment_properties": flink_environment_properties,
        "flink_application_configuration": flink_runtime_configuration,
    },
    tags={
        "Environment": "test",
    },
)
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesisanalyticsv2"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/s3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main declares three resources: an S3 bucket, an object uploaded from the
// local file flink-app.jar, and a Kinesis Analytics v2 application whose code
// is loaded from that object.
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
// Bucket that holds the packaged Flink application.
example, err := s3.NewBucket(ctx, "example", &s3.BucketArgs{
Bucket: pulumi.String("example-flink-application"),
})
if err != nil {
return err
}
// Upload the local JAR into the bucket.
exampleBucketObjectv2, err := s3.NewBucketObjectv2(ctx, "example", &s3.BucketObjectv2Args{
Bucket: example.ID(),
Key: pulumi.String("example-flink-application"),
Source: pulumi.NewFileAsset("flink-app.jar"),
})
if err != nil {
return err
}
// NOTE(review): exampleAwsIamRole is not declared in this snippet; it must
// be defined elsewhere before this program compiles.
_, err = kinesisanalyticsv2.NewApplication(ctx, "example", &kinesisanalyticsv2.ApplicationArgs{
Name: pulumi.String("example-flink-application"),
RuntimeEnvironment: pulumi.String("FLINK-1_8"),
ServiceExecutionRole: pulumi.Any(exampleAwsIamRole.Arn),
ApplicationConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationArgs{
// Code location: the uploaded object, addressed by bucket ARN and key.
ApplicationCodeConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs{
CodeContent: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs{
S3ContentLocation: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs{
BucketArn: example.Arn,
FileKey: exampleBucketObjectv2.Key,
},
},
CodeContentType: pulumi.String("ZIPFILE"),
},
// Two named property groups of string key/value pairs.
EnvironmentProperties: &kinesisanalyticsv2.ApplicationApplicationConfigurationEnvironmentPropertiesArgs{
PropertyGroups: kinesisanalyticsv2.ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArray{
&kinesisanalyticsv2.ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs{
PropertyGroupId: pulumi.String("PROPERTY-GROUP-1"),
PropertyMap: pulumi.StringMap{
"Key1": pulumi.String("Value1"),
},
},
&kinesisanalyticsv2.ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs{
PropertyGroupId: pulumi.String("PROPERTY-GROUP-2"),
PropertyMap: pulumi.StringMap{
"KeyA": pulumi.String("ValueA"),
"KeyB": pulumi.String("ValueB"),
},
},
},
},
// Flink settings: checkpointing, monitoring and parallelism.
FlinkApplicationConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationFlinkApplicationConfigurationArgs{
CheckpointConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationFlinkApplicationConfigurationCheckpointConfigurationArgs{
ConfigurationType: pulumi.String("DEFAULT"),
},
MonitoringConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationFlinkApplicationConfigurationMonitoringConfigurationArgs{
ConfigurationType: pulumi.String("CUSTOM"),
LogLevel: pulumi.String("DEBUG"),
MetricsLevel: pulumi.String("TASK"),
},
ParallelismConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationFlinkApplicationConfigurationParallelismConfigurationArgs{
AutoScalingEnabled: pulumi.Bool(true),
ConfigurationType: pulumi.String("CUSTOM"),
Parallelism: pulumi.Int(10),
ParallelismPerKpu: pulumi.Int(4),
},
},
},
Tags: pulumi.StringMap{
"Environment": pulumi.String("test"),
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
// Declares three resources: an S3 bucket, an object uploaded from the local
// file flink-app.jar, and a Kinesis Analytics v2 application whose code is
// loaded from that object.
return await Deployment.RunAsync(() =>
{
// Bucket that holds the packaged Flink application.
var example = new Aws.S3.Bucket("example", new()
{
BucketName = "example-flink-application",
});
// Upload the local JAR into the bucket.
var exampleBucketObjectv2 = new Aws.S3.BucketObjectv2("example", new()
{
Bucket = example.Id,
Key = "example-flink-application",
Source = new FileAsset("flink-app.jar"),
});
// NOTE(review): exampleAwsIamRole is not declared in this snippet; it must
// be defined elsewhere before this program compiles.
var exampleApplication = new Aws.KinesisAnalyticsV2.Application("example", new()
{
Name = "example-flink-application",
RuntimeEnvironment = "FLINK-1_8",
ServiceExecutionRole = exampleAwsIamRole.Arn,
ApplicationConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationArgs
{
// Code location: the uploaded object, addressed by bucket ARN and key.
ApplicationCodeConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs
{
CodeContent = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs
{
S3ContentLocation = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs
{
BucketArn = example.Arn,
FileKey = exampleBucketObjectv2.Key,
},
},
CodeContentType = "ZIPFILE",
},
// Two named property groups of string key/value pairs.
EnvironmentProperties = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationEnvironmentPropertiesArgs
{
PropertyGroups = new[]
{
new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs
{
PropertyGroupId = "PROPERTY-GROUP-1",
PropertyMap =
{
{ "Key1", "Value1" },
},
},
new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs
{
PropertyGroupId = "PROPERTY-GROUP-2",
PropertyMap =
{
{ "KeyA", "ValueA" },
{ "KeyB", "ValueB" },
},
},
},
},
// Flink settings: checkpointing, monitoring and parallelism.
FlinkApplicationConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationArgs
{
CheckpointConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationCheckpointConfigurationArgs
{
ConfigurationType = "DEFAULT",
},
MonitoringConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationMonitoringConfigurationArgs
{
ConfigurationType = "CUSTOM",
LogLevel = "DEBUG",
MetricsLevel = "TASK",
},
ParallelismConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationParallelismConfigurationArgs
{
AutoScalingEnabled = true,
ConfigurationType = "CUSTOM",
Parallelism = 10,
ParallelismPerKpu = 4,
},
},
},
Tags =
{
{ "Environment", "test" },
},
});
});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.s3.Bucket;
import com.pulumi.aws.s3.BucketArgs;
import com.pulumi.aws.s3.BucketObjectv2;
import com.pulumi.aws.s3.BucketObjectv2Args;
import com.pulumi.aws.kinesisanalyticsv2.Application;
import com.pulumi.aws.kinesisanalyticsv2.ApplicationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationEnvironmentPropertiesArgs;
// Required by the propertyGroups(...) builders below; without this import the class does not compile.
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationCheckpointConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationMonitoringConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationFlinkApplicationConfigurationParallelismConfigurationArgs;
import com.pulumi.asset.FileAsset;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * Pulumi program that declares an S3 bucket, uploads flink-app.jar into it,
 * and creates a Kinesis Analytics v2 application that loads its code from
 * the uploaded object.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares the stack's resources.
     *
     * @param ctx the Pulumi context supplied by {@link Pulumi#run}
     */
    public static void stack(Context ctx) {
        // Bucket that holds the packaged Flink application.
        var example = new Bucket("example", BucketArgs.builder()
            .bucket("example-flink-application")
            .build());

        // Upload the local JAR into the bucket.
        var exampleBucketObjectv2 = new BucketObjectv2("exampleBucketObjectv2", BucketObjectv2Args.builder()
            .bucket(example.id())
            .key("example-flink-application")
            .source(new FileAsset("flink-app.jar"))
            .build());

        // NOTE(review): exampleAwsIamRole is not declared in this snippet; it
        // must be defined elsewhere (your own IAM execution role).
        var exampleApplication = new Application("exampleApplication", ApplicationArgs.builder()
            .name("example-flink-application")
            .runtimeEnvironment("FLINK-1_8")
            .serviceExecutionRole(exampleAwsIamRole.arn())
            .applicationConfiguration(ApplicationApplicationConfigurationArgs.builder()
                // Code location: the uploaded object, addressed by bucket ARN and key.
                .applicationCodeConfiguration(ApplicationApplicationConfigurationApplicationCodeConfigurationArgs.builder()
                    .codeContent(ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs.builder()
                        .s3ContentLocation(ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs.builder()
                            .bucketArn(example.arn())
                            .fileKey(exampleBucketObjectv2.key())
                            .build())
                        .build())
                    .codeContentType("ZIPFILE")
                    .build())
                // Two named property groups of string key/value pairs.
                .environmentProperties(ApplicationApplicationConfigurationEnvironmentPropertiesArgs.builder()
                    .propertyGroups(
                        ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs.builder()
                            .propertyGroupId("PROPERTY-GROUP-1")
                            .propertyMap(Map.of("Key1", "Value1"))
                            .build(),
                        ApplicationApplicationConfigurationEnvironmentPropertiesPropertyGroupArgs.builder()
                            .propertyGroupId("PROPERTY-GROUP-2")
                            .propertyMap(Map.ofEntries(
                                Map.entry("KeyA", "ValueA"),
                                Map.entry("KeyB", "ValueB")
                            ))
                            .build())
                    .build())
                // Flink settings: checkpointing, monitoring and parallelism.
                .flinkApplicationConfiguration(ApplicationApplicationConfigurationFlinkApplicationConfigurationArgs.builder()
                    .checkpointConfiguration(ApplicationApplicationConfigurationFlinkApplicationConfigurationCheckpointConfigurationArgs.builder()
                        .configurationType("DEFAULT")
                        .build())
                    .monitoringConfiguration(ApplicationApplicationConfigurationFlinkApplicationConfigurationMonitoringConfigurationArgs.builder()
                        .configurationType("CUSTOM")
                        .logLevel("DEBUG")
                        .metricsLevel("TASK")
                        .build())
                    .parallelismConfiguration(ApplicationApplicationConfigurationFlinkApplicationConfigurationParallelismConfigurationArgs.builder()
                        .autoScalingEnabled(true)
                        .configurationType("CUSTOM")
                        .parallelism(10)
                        .parallelismPerKpu(4)
                        .build())
                    .build())
                .build())
            .tags(Map.of("Environment", "test"))
            .build());
    }
}
# Declares an S3 bucket, an object uploaded from the local file flink-app.jar,
# and a Kinesis Analytics v2 application that loads its code from that object.
resources:
  # Bucket that holds the packaged Flink application.
  example:
    type: aws:s3:Bucket
    properties:
      bucket: example-flink-application
  # Upload the local JAR into the bucket.
  exampleBucketObjectv2:
    type: aws:s3:BucketObjectv2
    name: example
    properties:
      bucket: ${example.id}
      key: example-flink-application
      source:
        fn::FileAsset: flink-app.jar
  # exampleAwsIamRole is not declared in this snippet; supply your own IAM execution role.
  exampleApplication:
    type: aws:kinesisanalyticsv2:Application
    name: example
    properties:
      name: example-flink-application
      runtimeEnvironment: FLINK-1_8
      serviceExecutionRole: ${exampleAwsIamRole.arn}
      applicationConfiguration:
        # Code location: the uploaded object, addressed by bucket ARN and key.
        applicationCodeConfiguration:
          codeContent:
            s3ContentLocation:
              bucketArn: ${example.arn}
              fileKey: ${exampleBucketObjectv2.key}
          codeContentType: ZIPFILE
        # Two named property groups of string key/value pairs.
        environmentProperties:
          propertyGroups:
            - propertyGroupId: PROPERTY-GROUP-1
              propertyMap:
                Key1: Value1
            - propertyGroupId: PROPERTY-GROUP-2
              propertyMap:
                KeyA: ValueA
                KeyB: ValueB
        # Flink settings: checkpointing, monitoring and parallelism.
        flinkApplicationConfiguration:
          checkpointConfiguration:
            configurationType: DEFAULT
          monitoringConfiguration:
            configurationType: CUSTOM
            logLevel: DEBUG
            metricsLevel: TASK
          parallelismConfiguration:
            autoScalingEnabled: true
            configurationType: CUSTOM
            parallelism: 10
            parallelismPerKpu: 4
      tags:
        Environment: test
The runtimeEnvironment specifies the Flink version. The applicationCodeConfiguration points to your JAR in S3 via s3ContentLocation. The flinkApplicationConfiguration controls runtime behavior: checkpointConfiguration selects the checkpointing settings, monitoringConfiguration defines the log level and metrics level, and parallelismConfiguration sets how many parallel tasks run. Alongside it, within applicationConfiguration, environmentProperties passes configuration to your application code as named property groups.
Process streams with SQL queries
Teams familiar with SQL can analyze streaming data without learning a programming framework. SQL applications read from Kinesis streams, apply transformations, and write to multiple destinations.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
// Log group and log stream that receive the application's CloudWatch logs.
const example = new aws.cloudwatch.LogGroup("example", {name: "example-sql-application"});
const exampleLogStream = new aws.cloudwatch.LogStream("example", {
    name: "example-sql-application",
    logGroupName: example.name,
});

// The SQL code is supplied inline as plain text.
const sqlCodeConfiguration = {
    codeContent: {
        textContent: "SELECT 1;\n",
    },
    codeContentType: "PLAINTEXT",
};

// Streaming input: a Kinesis stream read as CSV with a two-column schema.
// `exampleAwsKinesisStream` is not declared in this snippet.
const sqlInput = {
    namePrefix: "PREFIX_1",
    inputParallelism: {
        count: 3,
    },
    inputSchema: {
        recordColumns: [
            {
                name: "COLUMN_1",
                sqlType: "VARCHAR(8)",
                mapping: "MAPPING-1",
            },
            {
                name: "COLUMN_2",
                sqlType: "DOUBLE",
            },
        ],
        recordEncoding: "UTF-8",
        recordFormat: {
            recordFormatType: "CSV",
            mappingParameters: {
                csvMappingParameters: {
                    recordColumnDelimiter: ",",
                    recordRowDelimiter: "\n",
                },
            },
        },
    },
    kinesisStreamsInput: {
        resourceArn: exampleAwsKinesisStream.arn,
    },
};

// Two destinations: a Lambda function (JSON) and a Firehose delivery stream (CSV).
// `exampleAwsLambdaFunction` and `exampleAwsKinesisFirehoseDeliveryStream`
// are not declared in this snippet.
const sqlOutputs = [
    {
        name: "OUTPUT_1",
        destinationSchema: {
            recordFormatType: "JSON",
        },
        lambdaOutput: {
            resourceArn: exampleAwsLambdaFunction.arn,
        },
    },
    {
        name: "OUTPUT_2",
        destinationSchema: {
            recordFormatType: "CSV",
        },
        kinesisFirehoseOutput: {
            resourceArn: exampleAwsKinesisFirehoseDeliveryStream.arn,
        },
    },
];

// Reference table loaded from a JSON object in S3.
// `exampleAwsS3Bucket` is not declared in this snippet.
const sqlReferenceDataSource = {
    tableName: "TABLE-1",
    referenceSchema: {
        recordColumns: [{
            name: "COLUMN_1",
            sqlType: "INTEGER",
        }],
        recordFormat: {
            recordFormatType: "JSON",
            mappingParameters: {
                jsonMappingParameters: {
                    recordRowPath: "$",
                },
            },
        },
    },
    s3ReferenceDataSource: {
        bucketArn: exampleAwsS3Bucket.arn,
        fileKey: "KEY-1",
    },
};

// The application itself. `exampleAwsIamRole` is not declared in this snippet;
// supply your own IAM execution role.
const exampleApplication = new aws.kinesisanalyticsv2.Application("example", {
    name: "example-sql-application",
    runtimeEnvironment: "SQL-1_0",
    serviceExecutionRole: exampleAwsIamRole.arn,
    applicationConfiguration: {
        applicationCodeConfiguration: sqlCodeConfiguration,
        sqlApplicationConfiguration: {
            input: sqlInput,
            outputs: sqlOutputs,
            referenceDataSource: sqlReferenceDataSource,
        },
    },
    cloudwatchLoggingOptions: {
        logStreamArn: exampleLogStream.arn,
    },
});
import pulumi
import pulumi_aws as aws
# Log group and log stream that receive the application's CloudWatch logs.
example = aws.cloudwatch.LogGroup("example", name="example-sql-application")
example_log_stream = aws.cloudwatch.LogStream(
    "example",
    name="example-sql-application",
    log_group_name=example.name,
)

# The SQL code is supplied inline as plain text.
sql_code_configuration = {
    "code_content": {
        "text_content": "SELECT 1;\n",
    },
    "code_content_type": "PLAINTEXT",
}

# Streaming input: a Kinesis stream read as CSV with a two-column schema.
# `example_aws_kinesis_stream` is not defined in this snippet.
sql_input = {
    "name_prefix": "PREFIX_1",
    "input_parallelism": {
        "count": 3,
    },
    "input_schema": {
        "record_columns": [
            {
                "name": "COLUMN_1",
                "sql_type": "VARCHAR(8)",
                "mapping": "MAPPING-1",
            },
            {
                "name": "COLUMN_2",
                "sql_type": "DOUBLE",
            },
        ],
        "record_encoding": "UTF-8",
        "record_format": {
            "record_format_type": "CSV",
            "mapping_parameters": {
                "csv_mapping_parameters": {
                    "record_column_delimiter": ",",
                    "record_row_delimiter": "\n",
                },
            },
        },
    },
    "kinesis_streams_input": {
        "resource_arn": example_aws_kinesis_stream["arn"],
    },
}

# Two destinations: a Lambda function (JSON) and a Firehose delivery stream (CSV).
# `example_aws_lambda_function` and `example_aws_kinesis_firehose_delivery_stream`
# are not defined in this snippet.
sql_outputs = [
    {
        "name": "OUTPUT_1",
        "destination_schema": {
            "record_format_type": "JSON",
        },
        "lambda_output": {
            "resource_arn": example_aws_lambda_function["arn"],
        },
    },
    {
        "name": "OUTPUT_2",
        "destination_schema": {
            "record_format_type": "CSV",
        },
        "kinesis_firehose_output": {
            "resource_arn": example_aws_kinesis_firehose_delivery_stream["arn"],
        },
    },
]

# Reference table loaded from a JSON object in S3.
# `example_aws_s3_bucket` is not defined in this snippet.
sql_reference_data_source = {
    "table_name": "TABLE-1",
    "reference_schema": {
        "record_columns": [{
            "name": "COLUMN_1",
            "sql_type": "INTEGER",
        }],
        "record_format": {
            "record_format_type": "JSON",
            "mapping_parameters": {
                "json_mapping_parameters": {
                    "record_row_path": "$",
                },
            },
        },
    },
    "s3_reference_data_source": {
        "bucket_arn": example_aws_s3_bucket["arn"],
        "file_key": "KEY-1",
    },
}

# The application itself. `example_aws_iam_role` is not defined in this snippet;
# supply your own IAM execution role.
example_application = aws.kinesisanalyticsv2.Application(
    "example",
    name="example-sql-application",
    runtime_environment="SQL-1_0",
    service_execution_role=example_aws_iam_role["arn"],
    application_configuration={
        "application_code_configuration": sql_code_configuration,
        "sql_application_configuration": {
            "input": sql_input,
            "outputs": sql_outputs,
            "reference_data_source": sql_reference_data_source,
        },
    },
    cloudwatch_logging_options={
        "log_stream_arn": example_log_stream.arn,
    },
)
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/cloudwatch"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesisanalyticsv2"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main declares a CloudWatch log group and log stream, then a Kinesis
// Analytics v2 SQL application with one Kinesis stream input, two outputs,
// an S3 reference data source, and logging to the log stream.
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
// Log group and log stream referenced by CloudwatchLoggingOptions below.
example, err := cloudwatch.NewLogGroup(ctx, "example", &cloudwatch.LogGroupArgs{
Name: pulumi.String("example-sql-application"),
})
if err != nil {
return err
}
exampleLogStream, err := cloudwatch.NewLogStream(ctx, "example", &cloudwatch.LogStreamArgs{
Name: pulumi.String("example-sql-application"),
LogGroupName: example.Name,
})
if err != nil {
return err
}
// NOTE(review): exampleAwsIamRole, exampleAwsKinesisStream,
// exampleAwsLambdaFunction, exampleAwsKinesisFirehoseDeliveryStream and
// exampleAwsS3Bucket are not declared in this snippet; they must be
// defined elsewhere before this program compiles.
_, err = kinesisanalyticsv2.NewApplication(ctx, "example", &kinesisanalyticsv2.ApplicationArgs{
Name: pulumi.String("example-sql-application"),
RuntimeEnvironment: pulumi.String("SQL-1_0"),
ServiceExecutionRole: pulumi.Any(exampleAwsIamRole.Arn),
ApplicationConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationArgs{
// The SQL code is supplied inline as plain text.
ApplicationCodeConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs{
CodeContent: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs{
TextContent: pulumi.String("SELECT 1;\n"),
},
CodeContentType: pulumi.String("PLAINTEXT"),
},
SqlApplicationConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationArgs{
// Streaming input: a Kinesis stream read as CSV with a two-column schema.
Input: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputTypeArgs{
NamePrefix: pulumi.String("PREFIX_1"),
InputParallelism: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputParallelismArgs{
Count: pulumi.Int(3),
},
InputSchema: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaArgs{
RecordColumns: kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArray{
&kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs{
Name: pulumi.String("COLUMN_1"),
SqlType: pulumi.String("VARCHAR(8)"),
Mapping: pulumi.String("MAPPING-1"),
},
&kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs{
Name: pulumi.String("COLUMN_2"),
SqlType: pulumi.String("DOUBLE"),
},
},
RecordEncoding: pulumi.String("UTF-8"),
RecordFormat: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatArgs{
RecordFormatType: pulumi.String("CSV"),
MappingParameters: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersArgs{
CsvMappingParameters: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersCsvMappingParametersArgs{
RecordColumnDelimiter: pulumi.String(","),
RecordRowDelimiter: pulumi.String("\n"),
},
},
},
},
KinesisStreamsInput: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationInputKinesisStreamsInputArgs{
ResourceArn: pulumi.Any(exampleAwsKinesisStream.Arn),
},
},
// Two destinations: a Lambda function (JSON) and a Firehose delivery stream (CSV).
Outputs: kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputTypeArray{
&kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputTypeArgs{
Name: pulumi.String("OUTPUT_1"),
DestinationSchema: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs{
RecordFormatType: pulumi.String("JSON"),
},
LambdaOutput: kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputLambdaOutputArgs{
ResourceArn: pulumi.Any(exampleAwsLambdaFunction.Arn),
},
},
&kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputTypeArgs{
Name: pulumi.String("OUTPUT_2"),
DestinationSchema: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs{
RecordFormatType: pulumi.String("CSV"),
},
KinesisFirehoseOutput: kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputKinesisFirehoseOutputArgs{
ResourceArn: pulumi.Any(exampleAwsKinesisFirehoseDeliveryStream.Arn),
},
},
},
// Reference table loaded from a JSON object in S3.
ReferenceDataSource: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceArgs{
TableName: pulumi.String("TABLE-1"),
ReferenceSchema: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaArgs{
RecordColumns: kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordColumnArray{
&kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordColumnArgs{
Name: pulumi.String("COLUMN_1"),
SqlType: pulumi.String("INTEGER"),
},
},
RecordFormat: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatArgs{
RecordFormatType: pulumi.String("JSON"),
MappingParameters: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersArgs{
JsonMappingParameters: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersJsonMappingParametersArgs{
RecordRowPath: pulumi.String("$"),
},
},
},
},
S3ReferenceDataSource: &kinesisanalyticsv2.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceS3ReferenceDataSourceArgs{
BucketArn: pulumi.Any(exampleAwsS3Bucket.Arn),
FileKey: pulumi.String("KEY-1"),
},
},
},
},
// Send application logs to the log stream created above.
CloudwatchLoggingOptions: &kinesisanalyticsv2.ApplicationCloudwatchLoggingOptionsArgs{
LogStreamArn: exampleLogStream.Arn,
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
// Declares a CloudWatch log group and log stream, then a Kinesis Analytics v2
// SQL application with one Kinesis stream input, two outputs, an S3 reference
// data source, and logging to the log stream.
return await Deployment.RunAsync(() =>
{
// Log group and log stream referenced by CloudwatchLoggingOptions below.
var example = new Aws.CloudWatch.LogGroup("example", new()
{
Name = "example-sql-application",
});
var exampleLogStream = new Aws.CloudWatch.LogStream("example", new()
{
Name = "example-sql-application",
LogGroupName = example.Name,
});
// NOTE(review): exampleAwsIamRole, exampleAwsKinesisStream,
// exampleAwsLambdaFunction, exampleAwsKinesisFirehoseDeliveryStream and
// exampleAwsS3Bucket are not declared in this snippet; they must be
// defined elsewhere before this program compiles.
var exampleApplication = new Aws.KinesisAnalyticsV2.Application("example", new()
{
Name = "example-sql-application",
RuntimeEnvironment = "SQL-1_0",
ServiceExecutionRole = exampleAwsIamRole.Arn,
ApplicationConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationArgs
{
// The SQL code is supplied inline as plain text.
ApplicationCodeConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs
{
CodeContent = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs
{
// Verbatim string: the line break before the closing quote is part of the value.
TextContent = @"SELECT 1;
",
},
CodeContentType = "PLAINTEXT",
},
SqlApplicationConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationArgs
{
// Streaming input: a Kinesis stream read as CSV with a two-column schema.
Input = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputArgs
{
NamePrefix = "PREFIX_1",
InputParallelism = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputParallelismArgs
{
Count = 3,
},
InputSchema = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaArgs
{
RecordColumns = new[]
{
new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs
{
Name = "COLUMN_1",
SqlType = "VARCHAR(8)",
Mapping = "MAPPING-1",
},
new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs
{
Name = "COLUMN_2",
SqlType = "DOUBLE",
},
},
RecordEncoding = "UTF-8",
RecordFormat = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatArgs
{
RecordFormatType = "CSV",
MappingParameters = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersArgs
{
CsvMappingParameters = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersCsvMappingParametersArgs
{
RecordColumnDelimiter = ",",
// Verbatim string holding only a line break as the row delimiter.
RecordRowDelimiter = @"
",
},
},
},
},
KinesisStreamsInput = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputKinesisStreamsInputArgs
{
ResourceArn = exampleAwsKinesisStream.Arn,
},
},
// Two destinations: a Lambda function (JSON) and a Firehose delivery stream (CSV).
Outputs = new[]
{
new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputArgs
{
Name = "OUTPUT_1",
DestinationSchema = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs
{
RecordFormatType = "JSON",
},
LambdaOutput = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputLambdaOutputArgs
{
ResourceArn = exampleAwsLambdaFunction.Arn,
},
},
new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputArgs
{
Name = "OUTPUT_2",
DestinationSchema = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs
{
RecordFormatType = "CSV",
},
KinesisFirehoseOutput = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputKinesisFirehoseOutputArgs
{
ResourceArn = exampleAwsKinesisFirehoseDeliveryStream.Arn,
},
},
},
// Reference table loaded from a JSON object in S3.
ReferenceDataSource = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceArgs
{
TableName = "TABLE-1",
ReferenceSchema = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaArgs
{
RecordColumns = new[]
{
new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordColumnArgs
{
Name = "COLUMN_1",
SqlType = "INTEGER",
},
},
RecordFormat = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatArgs
{
RecordFormatType = "JSON",
MappingParameters = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersArgs
{
JsonMappingParameters = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersJsonMappingParametersArgs
{
RecordRowPath = "$",
},
},
},
},
S3ReferenceDataSource = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceS3ReferenceDataSourceArgs
{
BucketArn = exampleAwsS3Bucket.Arn,
FileKey = "KEY-1",
},
},
},
},
// Send application logs to the log stream created above.
CloudwatchLoggingOptions = new Aws.KinesisAnalyticsV2.Inputs.ApplicationCloudwatchLoggingOptionsArgs
{
LogStreamArn = exampleLogStream.Arn,
},
});
});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.cloudwatch.LogGroup;
import com.pulumi.aws.cloudwatch.LogGroupArgs;
import com.pulumi.aws.cloudwatch.LogStream;
import com.pulumi.aws.cloudwatch.LogStreamArgs;
import com.pulumi.aws.kinesisanalyticsv2.Application;
import com.pulumi.aws.kinesisanalyticsv2.ApplicationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputParallelismArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersCsvMappingParametersArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationInputKinesisStreamsInputArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputLambdaOutputArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationOutputKinesisFirehoseOutputArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordColumnArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersJsonMappingParametersArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceS3ReferenceDataSourceArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationCloudwatchLoggingOptionsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * Pulumi program that declares a CloudWatch log group, a log stream inside it,
 * and a SQL-1_0 Kinesis Data Analytics application that logs to that stream.
 *
 * <p>The identifiers {@code exampleAwsIamRole}, {@code exampleAwsKinesisStream},
 * {@code exampleAwsLambdaFunction}, {@code exampleAwsKinesisFirehoseDeliveryStream}
 * and {@code exampleAwsS3Bucket} are not declared in this snippet; they stand for
 * resources that must already exist in the surrounding program.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares the resources of the stack.
     *
     * @param ctx the Pulumi deployment context supplied by {@link Pulumi#run}
     */
    public static void stack(Context ctx) {
        var example = new LogGroup("example", LogGroupArgs.builder()
            .name("example-sql-application")
            .build());

        var exampleLogStream = new LogStream("exampleLogStream", LogStreamArgs.builder()
            .name("example-sql-application")
            .logGroupName(example.name())
            .build());

        var exampleApplication = new Application("exampleApplication", ApplicationArgs.builder()
            .name("example-sql-application")
            .runtimeEnvironment("SQL-1_0")
            .serviceExecutionRole(exampleAwsIamRole.arn())
            .applicationConfiguration(ApplicationApplicationConfigurationArgs.builder()
                // Inline SQL code, hence the PLAINTEXT content type.
                .applicationCodeConfiguration(ApplicationApplicationConfigurationApplicationCodeConfigurationArgs.builder()
                    .codeContent(ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs.builder()
                        .textContent("""
                            SELECT 1;
                            """)
                        .build())
                    .codeContentType("PLAINTEXT")
                    .build())
                .sqlApplicationConfiguration(ApplicationApplicationConfigurationSqlApplicationConfigurationArgs.builder()
                    // Streaming input: a Kinesis stream parsed as CSV records.
                    .input(ApplicationApplicationConfigurationSqlApplicationConfigurationInputArgs.builder()
                        .namePrefix("PREFIX_1")
                        .inputParallelism(ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputParallelismArgs.builder()
                            .count(3)
                            .build())
                        .inputSchema(ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaArgs.builder()
                            .recordColumns(
                                ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs.builder()
                                    .name("COLUMN_1")
                                    .sqlType("VARCHAR(8)")
                                    .mapping("MAPPING-1")
                                    .build(),
                                ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordColumnArgs.builder()
                                    .name("COLUMN_2")
                                    .sqlType("DOUBLE")
                                    .build())
                            .recordEncoding("UTF-8")
                            .recordFormat(ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatArgs.builder()
                                .recordFormatType("CSV")
                                .mappingParameters(ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersArgs.builder()
                                    .csvMappingParameters(ApplicationApplicationConfigurationSqlApplicationConfigurationInputInputSchemaRecordFormatMappingParametersCsvMappingParametersArgs.builder()
                                        .recordColumnDelimiter(",")
                                        // Explicit escape: a text block holding no content lines
                                        // evaluates to "" rather than a newline.
                                        .recordRowDelimiter("\n")
                                        .build())
                                    .build())
                                .build())
                            .build())
                        .kinesisStreamsInput(ApplicationApplicationConfigurationSqlApplicationConfigurationInputKinesisStreamsInputArgs.builder()
                            .resourceArn(exampleAwsKinesisStream.arn())
                            .build())
                        .build())
                    // Two destinations: a Lambda function (JSON) and a Firehose delivery stream (CSV).
                    .outputs(
                        ApplicationApplicationConfigurationSqlApplicationConfigurationOutputArgs.builder()
                            .name("OUTPUT_1")
                            .destinationSchema(ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs.builder()
                                .recordFormatType("JSON")
                                .build())
                            .lambdaOutput(ApplicationApplicationConfigurationSqlApplicationConfigurationOutputLambdaOutputArgs.builder()
                                .resourceArn(exampleAwsLambdaFunction.arn())
                                .build())
                            .build(),
                        ApplicationApplicationConfigurationSqlApplicationConfigurationOutputArgs.builder()
                            .name("OUTPUT_2")
                            .destinationSchema(ApplicationApplicationConfigurationSqlApplicationConfigurationOutputDestinationSchemaArgs.builder()
                                .recordFormatType("CSV")
                                .build())
                            .kinesisFirehoseOutput(ApplicationApplicationConfigurationSqlApplicationConfigurationOutputKinesisFirehoseOutputArgs.builder()
                                .resourceArn(exampleAwsKinesisFirehoseDeliveryStream.arn())
                                .build())
                            .build())
                    // Reference table read from a JSON object in S3.
                    .referenceDataSource(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceArgs.builder()
                        .tableName("TABLE-1")
                        .referenceSchema(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaArgs.builder()
                            .recordColumns(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordColumnArgs.builder()
                                .name("COLUMN_1")
                                .sqlType("INTEGER")
                                .build())
                            .recordFormat(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatArgs.builder()
                                .recordFormatType("JSON")
                                .mappingParameters(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersArgs.builder()
                                    .jsonMappingParameters(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceReferenceSchemaRecordFormatMappingParametersJsonMappingParametersArgs.builder()
                                        .recordRowPath("$")
                                        .build())
                                    .build())
                                .build())
                            .build())
                        .s3ReferenceDataSource(ApplicationApplicationConfigurationSqlApplicationConfigurationReferenceDataSourceS3ReferenceDataSourceArgs.builder()
                            .bucketArn(exampleAwsS3Bucket.arn())
                            .fileKey("KEY-1")
                            .build())
                        .build())
                    .build())
                .build())
            .cloudwatchLoggingOptions(ApplicationCloudwatchLoggingOptionsArgs.builder()
                .logStreamArn(exampleLogStream.arn())
                .build())
            .build());
    }
}
# Pulumi YAML program: a CloudWatch log group and log stream, plus a SQL-1_0
# Kinesis Data Analytics application that logs to that stream.
# ${exampleAwsIamRole}, ${exampleAwsKinesisStream}, ${exampleAwsLambdaFunction},
# ${exampleAwsKinesisFirehoseDeliveryStream} and ${exampleAwsS3Bucket} are not
# declared here and must be defined elsewhere in the program.
resources:
  example:
    type: aws:cloudwatch:LogGroup
    properties:
      name: example-sql-application
  exampleLogStream:
    type: aws:cloudwatch:LogStream
    name: example
    properties:
      name: example-sql-application
      logGroupName: ${example.name}
  exampleApplication:
    type: aws:kinesisanalyticsv2:Application
    name: example
    properties:
      name: example-sql-application
      runtimeEnvironment: SQL-1_0
      serviceExecutionRole: ${exampleAwsIamRole.arn}
      applicationConfiguration:
        # Inline SQL code, hence the PLAINTEXT content type.
        applicationCodeConfiguration:
          codeContent:
            textContent: |
              SELECT 1;
          codeContentType: PLAINTEXT
        sqlApplicationConfiguration:
          # Streaming input: a Kinesis stream parsed as CSV records.
          input:
            namePrefix: PREFIX_1
            inputParallelism:
              count: 3
            inputSchema:
              recordColumns:
                - name: COLUMN_1
                  sqlType: VARCHAR(8)
                  mapping: MAPPING-1
                - name: COLUMN_2
                  sqlType: DOUBLE
              recordEncoding: UTF-8
              recordFormat:
                recordFormatType: CSV
                mappingParameters:
                  csvMappingParameters:
                    recordColumnDelimiter: ','
                    # Double-quoted so the escape yields a real newline; a block
                    # scalar with no content lines would not carry one.
                    recordRowDelimiter: "\n"
            kinesisStreamsInput:
              resourceArn: ${exampleAwsKinesisStream.arn}
          # Two destinations: a Lambda function (JSON) and a Firehose delivery stream (CSV).
          outputs:
            - name: OUTPUT_1
              destinationSchema:
                recordFormatType: JSON
              lambdaOutput:
                resourceArn: ${exampleAwsLambdaFunction.arn}
            - name: OUTPUT_2
              destinationSchema:
                recordFormatType: CSV
              kinesisFirehoseOutput:
                resourceArn: ${exampleAwsKinesisFirehoseDeliveryStream.arn}
          # Reference table read from a JSON object in S3.
          referenceDataSource:
            tableName: TABLE-1
            referenceSchema:
              recordColumns:
                - name: COLUMN_1
                  sqlType: INTEGER
              recordFormat:
                recordFormatType: JSON
                mappingParameters:
                  jsonMappingParameters:
                    recordRowPath: $
            s3ReferenceDataSource:
              bucketArn: ${exampleAwsS3Bucket.arn}
              fileKey: KEY-1
      cloudwatchLoggingOptions:
        logStreamArn: ${exampleLogStream.arn}
The sqlApplicationConfiguration defines the entire data flow. The input block specifies the Kinesis stream source and schema (column names, types, delimiters). The outputs array routes transformed data to Lambda functions or Firehose delivery streams. The referenceDataSource joins streaming data with static lookup tables stored in S3.
Connect to private resources in a VPC
Applications that need to access RDS databases, ElastiCache clusters, or internal APIs require VPC configuration to join private subnets.
import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";
// Bucket that holds the packaged Flink application.
const example = new aws.s3.Bucket("example", {
    bucket: "example-flink-application",
});

// Upload the local JAR into the bucket.
const exampleBucketObjectv2 = new aws.s3.BucketObjectv2("example", {
    bucket: example.id,
    key: "example-flink-application",
    source: new pulumi.asset.FileAsset("flink-app.jar"),
});

// Where the application code lives: the object uploaded above.
const applicationCodeConfiguration = {
    codeContent: {
        s3ContentLocation: {
            bucketArn: example.arn,
            fileKey: exampleBucketObjectv2.key,
        },
    },
    codeContentType: "ZIPFILE",
};

// Security groups and subnet for the application. `exampleAwsSecurityGroup`
// and `exampleAwsSubnet` are not declared in this snippet and must exist in
// the surrounding program.
const vpcConfiguration = {
    securityGroupIds: [
        exampleAwsSecurityGroup[0].id,
        exampleAwsSecurityGroup[1].id,
    ],
    subnetIds: [exampleAwsSubnet.id],
};

// The Flink application itself. `exampleAwsIamRole` is likewise expected to be
// defined elsewhere.
const exampleApplication = new aws.kinesisanalyticsv2.Application("example", {
    name: "example-flink-application",
    runtimeEnvironment: "FLINK-1_8",
    serviceExecutionRole: exampleAwsIamRole.arn,
    applicationConfiguration: {
        applicationCodeConfiguration: applicationCodeConfiguration,
        vpcConfiguration: vpcConfiguration,
    },
});
import pulumi
import pulumi_aws as aws
# Bucket that holds the packaged Flink application.
example = aws.s3.Bucket(
    "example",
    bucket="example-flink-application",
)

# Upload the local JAR into the bucket.
example_bucket_objectv2 = aws.s3.BucketObjectv2(
    "example",
    bucket=example.id,
    key="example-flink-application",
    source=pulumi.FileAsset("flink-app.jar"),
)

# Where the application code lives: the object uploaded above.
code_configuration = {
    "code_content": {
        "s3_content_location": {
            "bucket_arn": example.arn,
            "file_key": example_bucket_objectv2.key,
        },
    },
    "code_content_type": "ZIPFILE",
}

# Security groups and subnet for the application. example_aws_security_group
# and example_aws_subnet are not defined in this snippet and must exist in the
# surrounding program.
vpc_configuration = {
    "security_group_ids": [
        example_aws_security_group[0]["id"],
        example_aws_security_group[1]["id"],
    ],
    "subnet_ids": [example_aws_subnet["id"]],
}

# The Flink application itself. example_aws_iam_role is likewise expected to be
# defined elsewhere.
example_application = aws.kinesisanalyticsv2.Application(
    "example",
    name="example-flink-application",
    runtime_environment="FLINK-1_8",
    service_execution_role=example_aws_iam_role["arn"],
    application_configuration={
        "application_code_configuration": code_configuration,
        "vpc_configuration": vpc_configuration,
    },
)
package main
import (
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/kinesisanalyticsv2"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/s3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main declares an S3 bucket, uploads the Flink JAR into it, and creates a
// Kinesis Data Analytics application with a VPC configuration.
//
// exampleAwsIamRole, exampleAwsSecurityGroup and exampleAwsSubnet are not
// declared in this snippet and must exist in the surrounding program.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Bucket that holds the packaged Flink application.
		example, err := s3.NewBucket(ctx, "example", &s3.BucketArgs{
			Bucket: pulumi.String("example-flink-application"),
		})
		if err != nil {
			return err
		}

		// Upload the local JAR into the bucket.
		exampleBucketObjectv2, err := s3.NewBucketObjectv2(ctx, "example", &s3.BucketObjectv2Args{
			Bucket: example.ID(),
			Key:    pulumi.String("example-flink-application"),
			Source: pulumi.NewFileAsset("flink-app.jar"),
		})
		if err != nil {
			return err
		}

		// Where the application code lives: the object uploaded above.
		codeConfiguration := &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs{
			CodeContent: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs{
				S3ContentLocation: &kinesisanalyticsv2.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs{
					BucketArn: example.Arn,
					FileKey:   exampleBucketObjectv2.Key,
				},
			},
			CodeContentType: pulumi.String("ZIPFILE"),
		}

		// Security groups and subnet for the application.
		vpcConfiguration := &kinesisanalyticsv2.ApplicationApplicationConfigurationVpcConfigurationArgs{
			SecurityGroupIds: pulumi.StringArray{
				exampleAwsSecurityGroup[0].Id,
				exampleAwsSecurityGroup[1].Id,
			},
			SubnetIds: pulumi.StringArray{
				exampleAwsSubnet.Id,
			},
		}

		_, err = kinesisanalyticsv2.NewApplication(ctx, "example", &kinesisanalyticsv2.ApplicationArgs{
			Name:                 pulumi.String("example-flink-application"),
			RuntimeEnvironment:   pulumi.String("FLINK-1_8"),
			ServiceExecutionRole: pulumi.Any(exampleAwsIamRole.Arn),
			ApplicationConfiguration: &kinesisanalyticsv2.ApplicationApplicationConfigurationArgs{
				ApplicationCodeConfiguration: codeConfiguration,
				VpcConfiguration:             vpcConfiguration,
			},
		})
		// err is nil on success, so returning it covers both outcomes.
		return err
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;
// Declares an S3 bucket, uploads the Flink JAR into it, and creates a Kinesis
// Data Analytics application with a VPC configuration.
// exampleAwsIamRole, exampleAwsSecurityGroup and exampleAwsSubnet are not
// declared in this snippet and must exist in the surrounding program.
return await Deployment.RunAsync(() =>
{
    // Bucket that holds the packaged Flink application.
    var example = new Aws.S3.Bucket("example", new()
    {
        BucketName = "example-flink-application",
    });

    // Upload the local JAR into the bucket.
    var exampleBucketObjectv2 = new Aws.S3.BucketObjectv2("example", new()
    {
        Bucket = example.Id,
        Key = "example-flink-application",
        Source = new FileAsset("flink-app.jar"),
    });

    // Where the application code lives: the object uploaded above.
    var codeConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs
    {
        CodeContent = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs
        {
            S3ContentLocation = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs
            {
                BucketArn = example.Arn,
                FileKey = exampleBucketObjectv2.Key,
            },
        },
        CodeContentType = "ZIPFILE",
    };

    // Security groups and subnet for the application.
    var vpcConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationVpcConfigurationArgs
    {
        SecurityGroupIds = new[]
        {
            exampleAwsSecurityGroup[0].Id,
            exampleAwsSecurityGroup[1].Id,
        },
        SubnetIds = new[]
        {
            exampleAwsSubnet.Id,
        },
    };

    var exampleApplication = new Aws.KinesisAnalyticsV2.Application("example", new()
    {
        Name = "example-flink-application",
        RuntimeEnvironment = "FLINK-1_8",
        ServiceExecutionRole = exampleAwsIamRole.Arn,
        ApplicationConfiguration = new Aws.KinesisAnalyticsV2.Inputs.ApplicationApplicationConfigurationArgs
        {
            ApplicationCodeConfiguration = codeConfiguration,
            VpcConfiguration = vpcConfiguration,
        },
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.s3.Bucket;
import com.pulumi.aws.s3.BucketArgs;
import com.pulumi.aws.s3.BucketObjectv2;
import com.pulumi.aws.s3.BucketObjectv2Args;
import com.pulumi.aws.kinesisanalyticsv2.Application;
import com.pulumi.aws.kinesisanalyticsv2.ApplicationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs;
import com.pulumi.aws.kinesisanalyticsv2.inputs.ApplicationApplicationConfigurationVpcConfigurationArgs;
import com.pulumi.asset.FileAsset;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Pulumi program that declares an S3 bucket, uploads the Flink JAR into it, and
 * creates a Kinesis Data Analytics application with a VPC configuration.
 *
 * <p>{@code exampleAwsIamRole}, {@code exampleAwsSecurityGroup} and
 * {@code exampleAwsSubnet} are not declared in this snippet and must exist in
 * the surrounding program.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares the resources of the stack.
     *
     * @param ctx the Pulumi deployment context supplied by {@link Pulumi#run}
     */
    public static void stack(Context ctx) {
        // Bucket that holds the packaged Flink application.
        var example = new Bucket("example", BucketArgs.builder()
            .bucket("example-flink-application")
            .build());

        // Upload the local JAR into the bucket.
        var exampleBucketObjectv2 = new BucketObjectv2("exampleBucketObjectv2", BucketObjectv2Args.builder()
            .bucket(example.id())
            .key("example-flink-application")
            .source(new FileAsset("flink-app.jar"))
            .build());

        // Where the application code lives: the object uploaded above.
        var codeLocation = ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentS3ContentLocationArgs.builder()
            .bucketArn(example.arn())
            .fileKey(exampleBucketObjectv2.key())
            .build();

        var codeConfiguration = ApplicationApplicationConfigurationApplicationCodeConfigurationArgs.builder()
            .codeContent(ApplicationApplicationConfigurationApplicationCodeConfigurationCodeContentArgs.builder()
                .s3ContentLocation(codeLocation)
                .build())
            .codeContentType("ZIPFILE")
            .build();

        // Security groups and subnet for the application.
        var vpcConfiguration = ApplicationApplicationConfigurationVpcConfigurationArgs.builder()
            .securityGroupIds(
                exampleAwsSecurityGroup[0].id(),
                exampleAwsSecurityGroup[1].id())
            .subnetIds(exampleAwsSubnet.id())
            .build();

        var exampleApplication = new Application("exampleApplication", ApplicationArgs.builder()
            .name("example-flink-application")
            .runtimeEnvironment("FLINK-1_8")
            .serviceExecutionRole(exampleAwsIamRole.arn())
            .applicationConfiguration(ApplicationApplicationConfigurationArgs.builder()
                .applicationCodeConfiguration(codeConfiguration)
                .vpcConfiguration(vpcConfiguration)
                .build())
            .build());
    }
}
# Pulumi YAML program: an S3 bucket, the Flink JAR uploaded into it, and a
# Kinesis Data Analytics application with a VPC configuration.
# ${exampleAwsIamRole}, ${exampleAwsSecurityGroup} and ${exampleAwsSubnet} are
# not declared here and must be defined elsewhere in the program.
resources:
  example:
    type: aws:s3:Bucket
    properties:
      bucket: example-flink-application
  exampleBucketObjectv2:
    type: aws:s3:BucketObjectv2
    name: example
    properties:
      bucket: ${example.id}
      key: example-flink-application
      source:
        fn::FileAsset: flink-app.jar
  exampleApplication:
    type: aws:kinesisanalyticsv2:Application
    name: example
    properties:
      name: example-flink-application
      runtimeEnvironment: FLINK-1_8
      serviceExecutionRole: ${exampleAwsIamRole.arn}
      applicationConfiguration:
        # Where the application code lives: the object uploaded above.
        applicationCodeConfiguration:
          codeContent:
            s3ContentLocation:
              bucketArn: ${example.arn}
              fileKey: ${exampleBucketObjectv2.key}
          codeContentType: ZIPFILE
        # Security groups and subnet for the application.
        vpcConfiguration:
          securityGroupIds:
            - ${exampleAwsSecurityGroup[0].id}
            - ${exampleAwsSecurityGroup[1].id}
          subnetIds:
            - ${exampleAwsSubnet.id}
The vpcConfiguration property places your application in specified subnets with attached security groups, enabling access to private resources. Kinesis Data Analytics manages the network interfaces automatically. This extends the basic Flink configuration with VPC networking.
Beyond these examples
These snippets focus on specific application-level features: Flink and SQL runtime environments, stream processing with custom parallelism and monitoring, and VPC networking for private resource access. They’re intentionally minimal rather than full stream processing pipelines.
The examples may reference pre-existing infrastructure such as IAM execution roles with appropriate permissions, S3 buckets for code artifacts and reference data, Kinesis streams, Lambda functions, Firehose delivery streams (for SQL applications), and VPC subnets and security groups (for VPC configuration). They focus on configuring the application rather than provisioning everything around it.
To keep things focused, common application patterns are omitted, including:
- Application snapshots and restore configuration
- Auto-scaling policies (parallelismConfiguration.autoScalingEnabled shown but not tuned)
- CloudWatch logging for Flink applications
- Application lifecycle management (startApplication, forceStop)
These omissions are intentional: the goal is to illustrate how each application feature is wired, not provide drop-in stream processing modules. See the Kinesis Analytics v2 Application resource reference for all available configuration options.
Let's deploy AWS Kinesis Data Analytics Applications
Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.
Try Pulumi Cloud for FREE
Frequently Asked Questions
Application Types & Console Visibility
Use aws.kinesis.AnalyticsApplication instead if you need console management.
SQL applications use the SQL-1_0 runtime and process data with SQL queries. Flink applications use FLINK-* runtimes and run compiled Java/Scala code for complex stream processing.
Valid runtimeEnvironment values are SQL-1_0 for SQL applications, and FLINK-1_6, FLINK-1_8, FLINK-1_11, FLINK-1_13, FLINK-1_15, FLINK-1_18, FLINK-1_19, FLINK-1_20 for Flink applications.
Configuration & Deployment
For Flink applications, use s3ContentLocation with bucketArn and fileKey pointing to your JAR file. For SQL applications, use textContent with inline SQL code and set codeContentType to PLAINTEXT.
The serviceExecutionRole must have permissions to access Kinesis data streams, Kinesis Data Firehose delivery streams, Amazon S3 objects, and other external resources your application uses.
SQL application outputs can be sent to Lambda functions using lambdaOutput or Kinesis Firehose delivery streams using kinesisFirehoseOutput.
Immutability & Updates
The name, applicationMode, and description properties are immutable and require resource replacement if changed.
The versionId changes each time the application is updated. Pulumi uses this to detect configuration drift.
Lifecycle Management
Set the startApplication property to true to start or false to stop the application.
Use the forceStop property to force stop an unresponsive Flink-based application.
Networking & VPC
Configure vpcConfiguration with securityGroupIds and subnetIds to run your application within a VPC.
Using a different cloud?
Explore analytics guides for other cloud providers: