Configure Azure Stream Analytics Inputs

The azure-native:streamanalytics:Input resource, part of the Pulumi Azure Native provider, defines input sources for Stream Analytics jobs: streaming event sources that deliver continuous data or reference data that enriches events through joins. This guide focuses on three capabilities: Event Hub and IoT Hub streaming inputs, Blob Storage for streaming and reference data, and serialization format configuration.

Inputs belong to Stream Analytics jobs and reference existing Event Hubs, IoT Hubs, or storage accounts. The examples are intentionally small. Combine them with your own Stream Analytics jobs and Azure infrastructure.

Ingest streaming events from Event Hub with JSON

Stream Analytics jobs commonly consume events from Event Hub, which acts as the ingestion layer for real-time pipelines.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const input = new azure_native.streamanalytics.Input("input", {
    inputName: "input7425",
    jobName: "sj197",
    properties: {
        datasource: {
            consumerGroupName: "sdkconsumergroup",
            eventHubName: "sdkeventhub",
            serviceBusNamespace: "sdktest",
            sharedAccessPolicyKey: "someSharedAccessPolicyKey==",
            sharedAccessPolicyName: "RootManageSharedAccessKey",
            type: "Microsoft.ServiceBus/EventHub",
        },
        serialization: {
            encoding: azure_native.streamanalytics.Encoding.UTF8,
            type: "Json",
        },
        type: "Stream",
    },
    resourceGroupName: "sjrg3139",
});
import pulumi
import pulumi_azure_native as azure_native

input = azure_native.streamanalytics.Input("input",
    input_name="input7425",
    job_name="sj197",
    properties={
        "datasource": {
            "consumer_group_name": "sdkconsumergroup",
            "event_hub_name": "sdkeventhub",
            "service_bus_namespace": "sdktest",
            "shared_access_policy_key": "someSharedAccessPolicyKey==",
            "shared_access_policy_name": "RootManageSharedAccessKey",
            "type": "Microsoft.ServiceBus/EventHub",
        },
        "serialization": {
            "encoding": azure_native.streamanalytics.Encoding.UTF8,
            "type": "Json",
        },
        "type": "Stream",
    },
    resource_group_name="sjrg3139")
package main

import (
	streamanalytics "github.com/pulumi/pulumi-azure-native-sdk/streamanalytics/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := streamanalytics.NewInput(ctx, "input", &streamanalytics.InputArgs{
			InputName: pulumi.String("input7425"),
			JobName:   pulumi.String("sj197"),
			Properties: &streamanalytics.StreamInputPropertiesArgs{
				Datasource: streamanalytics.EventHubStreamInputDataSource{
					ConsumerGroupName:      "sdkconsumergroup",
					EventHubName:           "sdkeventhub",
					ServiceBusNamespace:    "sdktest",
					SharedAccessPolicyKey:  "someSharedAccessPolicyKey==",
					SharedAccessPolicyName: "RootManageSharedAccessKey",
					Type:                   "Microsoft.ServiceBus/EventHub",
				},
				Serialization: streamanalytics.JsonSerialization{
					Encoding: streamanalytics.EncodingUTF8,
					Type:     "Json",
				},
				Type: pulumi.String("Stream"),
			},
			ResourceGroupName: pulumi.String("sjrg3139"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var input = new AzureNative.StreamAnalytics.Input("input", new()
    {
        InputName = "input7425",
        JobName = "sj197",
        Properties = new AzureNative.StreamAnalytics.Inputs.StreamInputPropertiesArgs
        {
            Datasource = new AzureNative.StreamAnalytics.Inputs.EventHubStreamInputDataSourceArgs
            {
                ConsumerGroupName = "sdkconsumergroup",
                EventHubName = "sdkeventhub",
                ServiceBusNamespace = "sdktest",
                SharedAccessPolicyKey = "someSharedAccessPolicyKey==",
                SharedAccessPolicyName = "RootManageSharedAccessKey",
                Type = "Microsoft.ServiceBus/EventHub",
            },
            Serialization = new AzureNative.StreamAnalytics.Inputs.JsonSerializationArgs
            {
                Encoding = AzureNative.StreamAnalytics.Encoding.UTF8,
                Type = "Json",
            },
            Type = "Stream",
        },
        ResourceGroupName = "sjrg3139",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.streamanalytics.Input;
import com.pulumi.azurenative.streamanalytics.InputArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var input = new Input("input", InputArgs.builder()
            .inputName("input7425")
            .jobName("sj197")
            .properties(StreamInputPropertiesArgs.builder()
                .datasource(EventHubStreamInputDataSourceArgs.builder()
                    .consumerGroupName("sdkconsumergroup")
                    .eventHubName("sdkeventhub")
                    .serviceBusNamespace("sdktest")
                    .sharedAccessPolicyKey("someSharedAccessPolicyKey==")
                    .sharedAccessPolicyName("RootManageSharedAccessKey")
                    .type("Microsoft.ServiceBus/EventHub")
                    .build())
                .serialization(JsonSerializationArgs.builder()
                    .encoding("UTF8")
                    .type("Json")
                    .build())
                .type("Stream")
                .build())
            .resourceGroupName("sjrg3139")
            .build());

    }
}
resources:
  input:
    type: azure-native:streamanalytics:Input
    properties:
      inputName: input7425
      jobName: sj197
      properties:
        datasource:
          consumerGroupName: sdkconsumergroup
          eventHubName: sdkeventhub
          serviceBusNamespace: sdktest
          sharedAccessPolicyKey: someSharedAccessPolicyKey==
          sharedAccessPolicyName: RootManageSharedAccessKey
          type: Microsoft.ServiceBus/EventHub
        serialization:
          encoding: UTF8
          type: Json
        type: Stream
      resourceGroupName: sjrg3139

The datasource property configures the Event Hub connection. The consumerGroupName isolates this job’s read position from other consumers. The serialization property tells Stream Analytics how to parse incoming bytes; JSON encoding with UTF8 handles text-based event payloads.

Consume device telemetry from IoT Hub with Avro

IoT scenarios route device telemetry through IoT Hub, which Stream Analytics can consume directly for real-time analytics.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const input = new azure_native.streamanalytics.Input("input", {
    inputName: "input7970",
    jobName: "sj9742",
    properties: {
        datasource: {
            consumerGroupName: "sdkconsumergroup",
            endpoint: "messages/events",
            iotHubNamespace: "iothub",
            sharedAccessPolicyKey: "sharedAccessPolicyKey=",
            sharedAccessPolicyName: "owner",
            type: "Microsoft.Devices/IotHubs",
        },
        serialization: {
            type: "Avro",
        },
        type: "Stream",
    },
    resourceGroupName: "sjrg3467",
});
import pulumi
import pulumi_azure_native as azure_native

input = azure_native.streamanalytics.Input("input",
    input_name="input7970",
    job_name="sj9742",
    properties={
        "datasource": {
            "consumer_group_name": "sdkconsumergroup",
            "endpoint": "messages/events",
            "iot_hub_namespace": "iothub",
            "shared_access_policy_key": "sharedAccessPolicyKey=",
            "shared_access_policy_name": "owner",
            "type": "Microsoft.Devices/IotHubs",
        },
        "serialization": {
            "type": "Avro",
        },
        "type": "Stream",
    },
    resource_group_name="sjrg3467")
package main

import (
	streamanalytics "github.com/pulumi/pulumi-azure-native-sdk/streamanalytics/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := streamanalytics.NewInput(ctx, "input", &streamanalytics.InputArgs{
			InputName: pulumi.String("input7970"),
			JobName:   pulumi.String("sj9742"),
			Properties: &streamanalytics.StreamInputPropertiesArgs{
				Datasource: streamanalytics.IoTHubStreamInputDataSource{
					ConsumerGroupName:      "sdkconsumergroup",
					Endpoint:               "messages/events",
					IotHubNamespace:        "iothub",
					SharedAccessPolicyKey:  "sharedAccessPolicyKey=",
					SharedAccessPolicyName: "owner",
					Type:                   "Microsoft.Devices/IotHubs",
				},
				Serialization: streamanalytics.AvroSerialization{
					Type: "Avro",
				},
				Type: pulumi.String("Stream"),
			},
			ResourceGroupName: pulumi.String("sjrg3467"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var input = new AzureNative.StreamAnalytics.Input("input", new()
    {
        InputName = "input7970",
        JobName = "sj9742",
        Properties = new AzureNative.StreamAnalytics.Inputs.StreamInputPropertiesArgs
        {
            Datasource = new AzureNative.StreamAnalytics.Inputs.IoTHubStreamInputDataSourceArgs
            {
                ConsumerGroupName = "sdkconsumergroup",
                Endpoint = "messages/events",
                IotHubNamespace = "iothub",
                SharedAccessPolicyKey = "sharedAccessPolicyKey=",
                SharedAccessPolicyName = "owner",
                Type = "Microsoft.Devices/IotHubs",
            },
            Serialization = new AzureNative.StreamAnalytics.Inputs.AvroSerializationArgs
            {
                Type = "Avro",
            },
            Type = "Stream",
        },
        ResourceGroupName = "sjrg3467",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.streamanalytics.Input;
import com.pulumi.azurenative.streamanalytics.InputArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var input = new Input("input", InputArgs.builder()
            .inputName("input7970")
            .jobName("sj9742")
            .properties(StreamInputPropertiesArgs.builder()
                .datasource(IoTHubStreamInputDataSourceArgs.builder()
                    .consumerGroupName("sdkconsumergroup")
                    .endpoint("messages/events")
                    .iotHubNamespace("iothub")
                    .sharedAccessPolicyKey("sharedAccessPolicyKey=")
                    .sharedAccessPolicyName("owner")
                    .type("Microsoft.Devices/IotHubs")
                    .build())
                .serialization(AvroSerializationArgs.builder()
                    .type("Avro")
                    .build())
                .type("Stream")
                .build())
            .resourceGroupName("sjrg3467")
            .build());

    }
}
resources:
  input:
    type: azure-native:streamanalytics:Input
    properties:
      inputName: input7970
      jobName: sj9742
      properties:
        datasource:
          consumerGroupName: sdkconsumergroup
          endpoint: messages/events
          iotHubNamespace: iothub
          sharedAccessPolicyKey: sharedAccessPolicyKey=
          sharedAccessPolicyName: owner
          type: Microsoft.Devices/IotHubs
        serialization:
          type: Avro
        type: Stream
      resourceGroupName: sjrg3467

The IoTHubStreamInputDataSource connects to the IoT Hub’s built-in Event Hub-compatible endpoint. The endpoint property specifies “messages/events” for device-to-cloud messages. Avro serialization provides compact binary encoding, reducing bandwidth for high-volume device data.

Read streaming data from Blob Storage with CSV

Some pipelines write events to Blob Storage in time-partitioned paths, which Stream Analytics reads as a continuous stream.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const input = new azure_native.streamanalytics.Input("input", {
    inputName: "input8899",
    jobName: "sj6695",
    properties: {
        datasource: {
            container: "state",
            dateFormat: "yyyy/MM/dd",
            pathPattern: "{date}/{time}",
            sourcePartitionCount: 16,
            storageAccounts: [{
                accountKey: "someAccountKey==",
                accountName: "someAccountName",
            }],
            timeFormat: "HH",
            type: "Microsoft.Storage/Blob",
        },
        serialization: {
            encoding: azure_native.streamanalytics.Encoding.UTF8,
            fieldDelimiter: ",",
            type: "Csv",
        },
        type: "Stream",
    },
    resourceGroupName: "sjrg8161",
});
import pulumi
import pulumi_azure_native as azure_native

input = azure_native.streamanalytics.Input("input",
    input_name="input8899",
    job_name="sj6695",
    properties={
        "datasource": {
            "container": "state",
            "date_format": "yyyy/MM/dd",
            "path_pattern": "{date}/{time}",
            "source_partition_count": 16,
            "storage_accounts": [{
                "account_key": "someAccountKey==",
                "account_name": "someAccountName",
            }],
            "time_format": "HH",
            "type": "Microsoft.Storage/Blob",
        },
        "serialization": {
            "encoding": azure_native.streamanalytics.Encoding.UTF8,
            "field_delimiter": ",",
            "type": "Csv",
        },
        "type": "Stream",
    },
    resource_group_name="sjrg8161")
package main

import (
	streamanalytics "github.com/pulumi/pulumi-azure-native-sdk/streamanalytics/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := streamanalytics.NewInput(ctx, "input", &streamanalytics.InputArgs{
			InputName: pulumi.String("input8899"),
			JobName:   pulumi.String("sj6695"),
			Properties: &streamanalytics.StreamInputPropertiesArgs{
				Datasource: streamanalytics.BlobStreamInputDataSource{
					Container:            "state",
					DateFormat:           "yyyy/MM/dd",
					PathPattern:          "{date}/{time}",
					SourcePartitionCount: 16,
					StorageAccounts: []streamanalytics.StorageAccount{
						{
							AccountKey:  "someAccountKey==",
							AccountName: "someAccountName",
						},
					},
					TimeFormat: "HH",
					Type:       "Microsoft.Storage/Blob",
				},
				Serialization: streamanalytics.CsvSerialization{
					Encoding:       streamanalytics.EncodingUTF8,
					FieldDelimiter: ",",
					Type:           "Csv",
				},
				Type: pulumi.String("Stream"),
			},
			ResourceGroupName: pulumi.String("sjrg8161"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var input = new AzureNative.StreamAnalytics.Input("input", new()
    {
        InputName = "input8899",
        JobName = "sj6695",
        Properties = new AzureNative.StreamAnalytics.Inputs.StreamInputPropertiesArgs
        {
            Datasource = new AzureNative.StreamAnalytics.Inputs.BlobStreamInputDataSourceArgs
            {
                Container = "state",
                DateFormat = "yyyy/MM/dd",
                PathPattern = "{date}/{time}",
                SourcePartitionCount = 16,
                StorageAccounts = new[]
                {
                    new AzureNative.StreamAnalytics.Inputs.StorageAccountArgs
                    {
                        AccountKey = "someAccountKey==",
                        AccountName = "someAccountName",
                    },
                },
                TimeFormat = "HH",
                Type = "Microsoft.Storage/Blob",
            },
            Serialization = new AzureNative.StreamAnalytics.Inputs.CsvSerializationArgs
            {
                Encoding = AzureNative.StreamAnalytics.Encoding.UTF8,
                FieldDelimiter = ",",
                Type = "Csv",
            },
            Type = "Stream",
        },
        ResourceGroupName = "sjrg8161",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.streamanalytics.Input;
import com.pulumi.azurenative.streamanalytics.InputArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var input = new Input("input", InputArgs.builder()
            .inputName("input8899")
            .jobName("sj6695")
            .properties(StreamInputPropertiesArgs.builder()
                .datasource(BlobStreamInputDataSourceArgs.builder()
                    .container("state")
                    .dateFormat("yyyy/MM/dd")
                    .pathPattern("{date}/{time}")
                    .sourcePartitionCount(16)
                    .storageAccounts(StorageAccountArgs.builder()
                        .accountKey("someAccountKey==")
                        .accountName("someAccountName")
                        .build())
                    .timeFormat("HH")
                    .type("Microsoft.Storage/Blob")
                    .build())
                .serialization(CsvSerializationArgs.builder()
                    .encoding("UTF8")
                    .fieldDelimiter(",")
                    .type("Csv")
                    .build())
                .type("Stream")
                .build())
            .resourceGroupName("sjrg8161")
            .build());

    }
}
resources:
  input:
    type: azure-native:streamanalytics:Input
    properties:
      inputName: input8899
      jobName: sj6695
      properties:
        datasource:
          container: state
          dateFormat: yyyy/MM/dd
          pathPattern: '{date}/{time}'
          sourcePartitionCount: 16
          storageAccounts:
            - accountKey: someAccountKey==
              accountName: someAccountName
          timeFormat: HH
          type: Microsoft.Storage/Blob
        serialization:
          encoding: UTF8
          fieldDelimiter: ','
          type: Csv
        type: Stream
      resourceGroupName: sjrg8161

The pathPattern uses {date} and {time} placeholders that Stream Analytics expands using dateFormat and timeFormat. The sourcePartitionCount controls parallelism for reading blobs. CSV serialization parses delimited text files; fieldDelimiter specifies the column separator.

Join streaming data with reference data from Blob Storage

Reference inputs provide static or slowly-changing lookup data that enriches streaming events through joins.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const input = new azure_native.streamanalytics.Input("input", {
    inputName: "input7225",
    jobName: "sj9597",
    properties: {
        datasource: {
            container: "state",
            dateFormat: "yyyy/MM/dd",
            pathPattern: "{date}/{time}",
            storageAccounts: [{
                accountKey: "someAccountKey==",
                accountName: "someAccountName",
            }],
            timeFormat: "HH",
            type: "Microsoft.Storage/Blob",
        },
        serialization: {
            encoding: azure_native.streamanalytics.Encoding.UTF8,
            fieldDelimiter: ",",
            type: "Csv",
        },
        type: "Reference",
    },
    resourceGroupName: "sjrg8440",
});
import pulumi
import pulumi_azure_native as azure_native

input = azure_native.streamanalytics.Input("input",
    input_name="input7225",
    job_name="sj9597",
    properties={
        "datasource": {
            "container": "state",
            "date_format": "yyyy/MM/dd",
            "path_pattern": "{date}/{time}",
            "storage_accounts": [{
                "account_key": "someAccountKey==",
                "account_name": "someAccountName",
            }],
            "time_format": "HH",
            "type": "Microsoft.Storage/Blob",
        },
        "serialization": {
            "encoding": azure_native.streamanalytics.Encoding.UTF8,
            "field_delimiter": ",",
            "type": "Csv",
        },
        "type": "Reference",
    },
    resource_group_name="sjrg8440")
package main

import (
	streamanalytics "github.com/pulumi/pulumi-azure-native-sdk/streamanalytics/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := streamanalytics.NewInput(ctx, "input", &streamanalytics.InputArgs{
			InputName: pulumi.String("input7225"),
			JobName:   pulumi.String("sj9597"),
			Properties: &streamanalytics.ReferenceInputPropertiesArgs{
				Datasource: streamanalytics.BlobReferenceInputDataSource{
					Container:   "state",
					DateFormat:  "yyyy/MM/dd",
					PathPattern: "{date}/{time}",
					StorageAccounts: []streamanalytics.StorageAccount{
						{
							AccountKey:  "someAccountKey==",
							AccountName: "someAccountName",
						},
					},
					TimeFormat: "HH",
					Type:       "Microsoft.Storage/Blob",
				},
				Serialization: streamanalytics.CsvSerialization{
					Encoding:       streamanalytics.EncodingUTF8,
					FieldDelimiter: ",",
					Type:           "Csv",
				},
				Type: pulumi.String("Reference"),
			},
			ResourceGroupName: pulumi.String("sjrg8440"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var input = new AzureNative.StreamAnalytics.Input("input", new()
    {
        InputName = "input7225",
        JobName = "sj9597",
        Properties = new AzureNative.StreamAnalytics.Inputs.ReferenceInputPropertiesArgs
        {
            Datasource = new AzureNative.StreamAnalytics.Inputs.BlobReferenceInputDataSourceArgs
            {
                Container = "state",
                DateFormat = "yyyy/MM/dd",
                PathPattern = "{date}/{time}",
                StorageAccounts = new[]
                {
                    new AzureNative.StreamAnalytics.Inputs.StorageAccountArgs
                    {
                        AccountKey = "someAccountKey==",
                        AccountName = "someAccountName",
                    },
                },
                TimeFormat = "HH",
                Type = "Microsoft.Storage/Blob",
            },
            Serialization = new AzureNative.StreamAnalytics.Inputs.CsvSerializationArgs
            {
                Encoding = AzureNative.StreamAnalytics.Encoding.UTF8,
                FieldDelimiter = ",",
                Type = "Csv",
            },
            Type = "Reference",
        },
        ResourceGroupName = "sjrg8440",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.streamanalytics.Input;
import com.pulumi.azurenative.streamanalytics.InputArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var input = new Input("input", InputArgs.builder()
            .inputName("input7225")
            .jobName("sj9597")
            .properties(ReferenceInputPropertiesArgs.builder()
                .datasource(BlobReferenceInputDataSourceArgs.builder()
                    .container("state")
                    .dateFormat("yyyy/MM/dd")
                    .pathPattern("{date}/{time}")
                    .storageAccounts(StorageAccountArgs.builder()
                        .accountKey("someAccountKey==")
                        .accountName("someAccountName")
                        .build())
                    .timeFormat("HH")
                    .type("Microsoft.Storage/Blob")
                    .build())
                .serialization(CsvSerializationArgs.builder()
                    .encoding("UTF8")
                    .fieldDelimiter(",")
                    .type("Csv")
                    .build())
                .type("Reference")
                .build())
            .resourceGroupName("sjrg8440")
            .build());

    }
}
resources:
  input:
    type: azure-native:streamanalytics:Input
    properties:
      inputName: input7225
      jobName: sj9597
      properties:
        datasource:
          container: state
          dateFormat: yyyy/MM/dd
          pathPattern: '{date}/{time}'
          storageAccounts:
            - accountKey: someAccountKey==
              accountName: someAccountName
          timeFormat: HH
          type: Microsoft.Storage/Blob
        serialization:
          encoding: UTF8
          fieldDelimiter: ','
          type: Csv
        type: Reference
      resourceGroupName: sjrg8440

Reference inputs differ from streaming inputs in their type property (“Reference” vs “Stream”). Stream Analytics reloads reference data periodically based on the pathPattern, allowing you to update lookup tables without restarting the job. The BlobReferenceInputDataSource points to CSV files that contain dimension tables or configuration data.

Beyond these examples

These snippets focus on specific input features: streaming inputs (Event Hub, IoT Hub, Blob Storage), reference data for enrichment, and serialization formats (JSON, Avro, CSV). They’re intentionally minimal rather than full Stream Analytics solutions.

The examples reference pre-existing infrastructure such as Event Hub namespaces and hubs, IoT Hub instances, storage accounts and blob containers, and Stream Analytics jobs. They focus on configuring the input rather than provisioning the surrounding Azure services.

To keep things focused, common input patterns are omitted, including:

  • Managed identity authentication (authenticationMode)
  • Partition keys for Event Hub
  • Compression settings
  • Gateway Message Bus and File inputs (edge scenarios)

These omissions are intentional: the goal is to illustrate how each input type is wired, not provide drop-in data ingestion modules. See the Stream Analytics Input resource reference for all available configuration options.

Let's configure Azure Stream Analytics Inputs

Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.

Try Pulumi Cloud for FREE

Frequently Asked Questions

Input Types & Configuration
What's the difference between Stream and Reference inputs?
Stream inputs provide continuous data from sources like Event Hub or IoT Hub, while Reference inputs provide static lookup data from sources like Blob Storage or File for enriching stream data.
What input sources can I use with Stream Analytics?
You can use five datasource types: GatewayMessageBus, Microsoft.Storage/Blob, File, Microsoft.ServiceBus/EventHub, and Microsoft.Devices/IotHubs.
What properties can't be changed after creating an input?
The inputName, jobName, and resourceGroupName properties are immutable and cannot be changed after creation.
Serialization & Data Formats
What data serialization formats are supported?
Stream Analytics inputs support three serialization formats: CSV (Csv), JSON (Json), and Avro (Avro).
How do I configure CSV serialization?
Set serialization.type to Csv, specify encoding (e.g., UTF8), and define the fieldDelimiter (e.g., ,).
Blob Storage Configuration
How do I use date and time patterns in Blob input paths?
Use placeholders like {date}/{time} in pathPattern, then specify dateFormat (e.g., yyyy/MM/dd) and timeFormat (e.g., HH) to match your blob naming structure.
What's the difference between Blob stream and reference inputs?
Blob stream inputs read continuous data and support sourcePartitionCount for parallelism, while Blob reference inputs provide static lookup data without partitioning.
What is sourcePartitionCount used for?
The sourcePartitionCount property specifies the number of partitions for Blob stream inputs, enabling parallel processing of blob data.
Event Hub & IoT Hub Configuration
How do I configure an Event Hub input?
Set datasource.type to Microsoft.ServiceBus/EventHub, then provide eventHubName, serviceBusNamespace, consumerGroupName, sharedAccessPolicyName, and sharedAccessPolicyKey.
How do I configure an IoT Hub input?
Set datasource.type to Microsoft.Devices/IotHubs, then provide iotHubNamespace, endpoint (e.g., messages/events), consumerGroupName, sharedAccessPolicyName, and sharedAccessPolicyKey.
API Versions & Advanced Features
How do I access preview API versions like 2021-10-01-preview?
Use the CLI command pulumi package add azure-native streamanalytics [ApiVersion] to generate a local SDK package with the desired API version.

Using a different cloud?

Explore analytics guides for other cloud providers: