Configure Azure IoT Operations Dataflows

The azure-native:iotoperations:Dataflow resource, part of the Pulumi Azure Native provider, defines data routing pipelines within Azure IoT Operations that move telemetry between MQTT brokers and cloud destinations, with optional transformation. This guide focuses on three capabilities: MQTT-to-cloud routing patterns, filtering and transformation expressions, and contextual enrichment and format conversion.

Dataflows belong to a DataflowProfile and reference DataflowEndpoint resources for sources and destinations. The examples are intentionally small. Combine them with your own endpoint configurations, schema registries, and transformation logic.

Route MQTT telemetry to Event Grid topics

IoT deployments often route device telemetry from MQTT brokers to Azure Event Grid for event-driven processing.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflow = new azure_native.iotoperations.Dataflow("dataflow", {
    dataflowName: "aio-to-event-grid",
    dataflowProfileName: "resource-name123",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        mode: azure_native.iotoperations.OperationalMode.Enabled,
        operations: [
            {
                name: "source1",
                operationType: azure_native.iotoperations.OperationType.Source,
                sourceSettings: {
                    dataSources: ["thermostats/+/telemetry/temperature/#"],
                    endpointRef: "aio-builtin-broker-endpoint",
                },
            },
            {
                destinationSettings: {
                    dataDestination: "factory/telemetry",
                    endpointRef: "event-grid-endpoint",
                },
                name: "destination1",
                operationType: azure_native.iotoperations.OperationType.Destination,
            },
        ],
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow = azure_native.iotoperations.Dataflow("dataflow",
    dataflow_name="aio-to-event-grid",
    dataflow_profile_name="resource-name123",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "mode": azure_native.iotoperations.OperationalMode.ENABLED,
        "operations": [
            {
                "name": "source1",
                "operation_type": azure_native.iotoperations.OperationType.SOURCE,
                "source_settings": {
                    "data_sources": ["thermostats/+/telemetry/temperature/#"],
                    "endpoint_ref": "aio-builtin-broker-endpoint",
                },
            },
            {
                "destination_settings": {
                    "data_destination": "factory/telemetry",
                    "endpoint_ref": "event-grid-endpoint",
                },
                "name": "destination1",
                "operation_type": azure_native.iotoperations.OperationType.DESTINATION,
            },
        ],
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflow(ctx, "dataflow", &iotoperations.DataflowArgs{
			DataflowName:        pulumi.String("aio-to-event-grid"),
			DataflowProfileName: pulumi.String("resource-name123"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowPropertiesArgs{
				Mode: pulumi.String(iotoperations.OperationalModeEnabled),
				Operations: iotoperations.DataflowOperationArray{
					&iotoperations.DataflowOperationArgs{
						Name:          pulumi.String("source1"),
						OperationType: pulumi.String(iotoperations.OperationTypeSource),
						SourceSettings: &iotoperations.DataflowSourceOperationSettingsArgs{
							DataSources: pulumi.StringArray{
								pulumi.String("thermostats/+/telemetry/temperature/#"),
							},
							EndpointRef: pulumi.String("aio-builtin-broker-endpoint"),
						},
					},
					&iotoperations.DataflowOperationArgs{
						DestinationSettings: &iotoperations.DataflowDestinationOperationSettingsArgs{
							DataDestination: pulumi.String("factory/telemetry"),
							EndpointRef:     pulumi.String("event-grid-endpoint"),
						},
						Name:          pulumi.String("destination1"),
						OperationType: pulumi.String(iotoperations.OperationTypeDestination),
					},
				},
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflow = new AzureNative.IoTOperations.Dataflow("dataflow", new()
    {
        DataflowName = "aio-to-event-grid",
        DataflowProfileName = "resource-name123",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowPropertiesArgs
        {
            Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
            Operations = new[]
            {
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    Name = "source1",
                    OperationType = AzureNative.IoTOperations.OperationType.Source,
                    SourceSettings = new AzureNative.IoTOperations.Inputs.DataflowSourceOperationSettingsArgs
                    {
                        DataSources = new[]
                        {
                            "thermostats/+/telemetry/temperature/#",
                        },
                        EndpointRef = "aio-builtin-broker-endpoint",
                    },
                },
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    DestinationSettings = new AzureNative.IoTOperations.Inputs.DataflowDestinationOperationSettingsArgs
                    {
                        DataDestination = "factory/telemetry",
                        EndpointRef = "event-grid-endpoint",
                    },
                    Name = "destination1",
                    OperationType = AzureNative.IoTOperations.OperationType.Destination,
                },
            },
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.Dataflow;
import com.pulumi.azurenative.iotoperations.DataflowArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflow = new Dataflow("dataflow", DataflowArgs.builder()
            .dataflowName("aio-to-event-grid")
            .dataflowProfileName("resource-name123")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowPropertiesArgs.builder()
                .mode("Enabled")
                .operations(                
                    DataflowOperationArgs.builder()
                        .name("source1")
                        .operationType("Source")
                        .sourceSettings(DataflowSourceOperationSettingsArgs.builder()
                            .dataSources("thermostats/+/telemetry/temperature/#")
                            .endpointRef("aio-builtin-broker-endpoint")
                            .build())
                        .build(),
                    DataflowOperationArgs.builder()
                        .destinationSettings(DataflowDestinationOperationSettingsArgs.builder()
                            .dataDestination("factory/telemetry")
                            .endpointRef("event-grid-endpoint")
                            .build())
                        .name("destination1")
                        .operationType("Destination")
                        .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflow:
    type: azure-native:iotoperations:Dataflow
    properties:
      dataflowName: aio-to-event-grid
      dataflowProfileName: resource-name123
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        mode: Enabled
        operations:
          - name: source1
            operationType: Source
            sourceSettings:
              dataSources:
                - thermostats/+/telemetry/temperature/#
              endpointRef: aio-builtin-broker-endpoint
          - destinationSettings:
              dataDestination: factory/telemetry
              endpointRef: event-grid-endpoint
            name: destination1
            operationType: Destination
      resourceGroupName: rgiotoperations

The operations array defines a pipeline: a source operation reads from MQTT topics via dataSources (supporting wildcards like + and #), and a destination operation writes to Event Grid via dataDestination. The endpointRef properties link to separately defined DataflowEndpoint resources that handle authentication and connection details.

Filter telemetry and republish to MQTT topics

Applications filter incoming telemetry based on threshold conditions before routing to specialized MQTT topics.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflow = new azure_native.iotoperations.Dataflow("dataflow", {
    dataflowName: "mqtt-filter-to-topic",
    dataflowProfileName: "resource-name123",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        mode: azure_native.iotoperations.OperationalMode.Enabled,
        operations: [
            {
                name: "source1",
                operationType: azure_native.iotoperations.OperationType.Source,
                sourceSettings: {
                    dataSources: ["azure-iot-operations/data/thermostat"],
                    endpointRef: "aio-builtin-broker-endpoint",
                },
            },
            {
                builtInTransformationSettings: {
                    filter: [{
                        description: "filter-datapoint",
                        expression: "$1 > 9000 && $2 >= 8000",
                        inputs: [
                            "temperature.Value",
                            "\"Tag 10\".Value",
                        ],
                        type: azure_native.iotoperations.FilterType.Filter,
                    }],
                    map: [{
                        inputs: ["*"],
                        output: "*",
                        type: azure_native.iotoperations.DataflowMappingType.PassThrough,
                    }],
                },
                name: "transformation1",
                operationType: azure_native.iotoperations.OperationType.BuiltInTransformation,
            },
            {
                destinationSettings: {
                    dataDestination: "data/filtered/thermostat",
                    endpointRef: "aio-builtin-broker-endpoint",
                },
                name: "destination1",
                operationType: azure_native.iotoperations.OperationType.Destination,
            },
        ],
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow = azure_native.iotoperations.Dataflow("dataflow",
    dataflow_name="mqtt-filter-to-topic",
    dataflow_profile_name="resource-name123",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "mode": azure_native.iotoperations.OperationalMode.ENABLED,
        "operations": [
            {
                "name": "source1",
                "operation_type": azure_native.iotoperations.OperationType.SOURCE,
                "source_settings": {
                    "data_sources": ["azure-iot-operations/data/thermostat"],
                    "endpoint_ref": "aio-builtin-broker-endpoint",
                },
            },
            {
                "built_in_transformation_settings": {
                    "filter": [{
                        "description": "filter-datapoint",
                        "expression": "$1 > 9000 && $2 >= 8000",
                        "inputs": [
                            "temperature.Value",
                            "\"Tag 10\".Value",
                        ],
                        "type": azure_native.iotoperations.FilterType.FILTER,
                    }],
                    "map": [{
                        "inputs": ["*"],
                        "output": "*",
                        "type": azure_native.iotoperations.DataflowMappingType.PASS_THROUGH,
                    }],
                },
                "name": "transformation1",
                "operation_type": azure_native.iotoperations.OperationType.BUILT_IN_TRANSFORMATION,
            },
            {
                "destination_settings": {
                    "data_destination": "data/filtered/thermostat",
                    "endpoint_ref": "aio-builtin-broker-endpoint",
                },
                "name": "destination1",
                "operation_type": azure_native.iotoperations.OperationType.DESTINATION,
            },
        ],
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflow(ctx, "dataflow", &iotoperations.DataflowArgs{
			DataflowName:        pulumi.String("mqtt-filter-to-topic"),
			DataflowProfileName: pulumi.String("resource-name123"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowPropertiesArgs{
				Mode: pulumi.String(iotoperations.OperationalModeEnabled),
				Operations: iotoperations.DataflowOperationArray{
					&iotoperations.DataflowOperationArgs{
						Name:          pulumi.String("source1"),
						OperationType: pulumi.String(iotoperations.OperationTypeSource),
						SourceSettings: &iotoperations.DataflowSourceOperationSettingsArgs{
							DataSources: pulumi.StringArray{
								pulumi.String("azure-iot-operations/data/thermostat"),
							},
							EndpointRef: pulumi.String("aio-builtin-broker-endpoint"),
						},
					},
					&iotoperations.DataflowOperationArgs{
						BuiltInTransformationSettings: &iotoperations.DataflowBuiltInTransformationSettingsArgs{
							Filter: iotoperations.DataflowBuiltInTransformationFilterArray{
								&iotoperations.DataflowBuiltInTransformationFilterArgs{
									Description: pulumi.String("filter-datapoint"),
									Expression:  pulumi.String("$1 > 9000 && $2 >= 8000"),
									Inputs: pulumi.StringArray{
										pulumi.String("temperature.Value"),
										pulumi.String("\"Tag 10\".Value"),
									},
									Type: pulumi.String(iotoperations.FilterTypeFilter),
								},
							},
							Map: iotoperations.DataflowBuiltInTransformationMapArray{
								&iotoperations.DataflowBuiltInTransformationMapArgs{
									Inputs: pulumi.StringArray{
										pulumi.String("*"),
									},
									Output: pulumi.String("*"),
									Type:   pulumi.String(iotoperations.DataflowMappingTypePassThrough),
								},
							},
						},
						Name:          pulumi.String("transformation1"),
						OperationType: pulumi.String(iotoperations.OperationTypeBuiltInTransformation),
					},
					&iotoperations.DataflowOperationArgs{
						DestinationSettings: &iotoperations.DataflowDestinationOperationSettingsArgs{
							DataDestination: pulumi.String("data/filtered/thermostat"),
							EndpointRef:     pulumi.String("aio-builtin-broker-endpoint"),
						},
						Name:          pulumi.String("destination1"),
						OperationType: pulumi.String(iotoperations.OperationTypeDestination),
					},
				},
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflow = new AzureNative.IoTOperations.Dataflow("dataflow", new()
    {
        DataflowName = "mqtt-filter-to-topic",
        DataflowProfileName = "resource-name123",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowPropertiesArgs
        {
            Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
            Operations = new[]
            {
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    Name = "source1",
                    OperationType = AzureNative.IoTOperations.OperationType.Source,
                    SourceSettings = new AzureNative.IoTOperations.Inputs.DataflowSourceOperationSettingsArgs
                    {
                        DataSources = new[]
                        {
                            "azure-iot-operations/data/thermostat",
                        },
                        EndpointRef = "aio-builtin-broker-endpoint",
                    },
                },
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    BuiltInTransformationSettings = new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationSettingsArgs
                    {
                        Filter = new[]
                        {
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationFilterArgs
                            {
                                Description = "filter-datapoint",
                                Expression = "$1 > 9000 && $2 >= 8000",
                                Inputs = new[]
                                {
                                    "temperature.Value",
                                    "\"Tag 10\".Value",
                                },
                                Type = AzureNative.IoTOperations.FilterType.Filter,
                            },
                        },
                        Map = new[]
                        {
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                            {
                                Inputs = new[]
                                {
                                    "*",
                                },
                                Output = "*",
                                Type = AzureNative.IoTOperations.DataflowMappingType.PassThrough,
                            },
                        },
                    },
                    Name = "transformation1",
                    OperationType = AzureNative.IoTOperations.OperationType.BuiltInTransformation,
                },
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    DestinationSettings = new AzureNative.IoTOperations.Inputs.DataflowDestinationOperationSettingsArgs
                    {
                        DataDestination = "data/filtered/thermostat",
                        EndpointRef = "aio-builtin-broker-endpoint",
                    },
                    Name = "destination1",
                    OperationType = AzureNative.IoTOperations.OperationType.Destination,
                },
            },
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.Dataflow;
import com.pulumi.azurenative.iotoperations.DataflowArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflow = new Dataflow("dataflow", DataflowArgs.builder()
            .dataflowName("mqtt-filter-to-topic")
            .dataflowProfileName("resource-name123")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowPropertiesArgs.builder()
                .mode("Enabled")
                .operations(                
                    DataflowOperationArgs.builder()
                        .name("source1")
                        .operationType("Source")
                        .sourceSettings(DataflowSourceOperationSettingsArgs.builder()
                            .dataSources("azure-iot-operations/data/thermostat")
                            .endpointRef("aio-builtin-broker-endpoint")
                            .build())
                        .build(),
                    DataflowOperationArgs.builder()
                        .builtInTransformationSettings(DataflowBuiltInTransformationSettingsArgs.builder()
                            .filter(DataflowBuiltInTransformationFilterArgs.builder()
                                .description("filter-datapoint")
                                .expression("$1 > 9000 && $2 >= 8000")
                                .inputs(                                
                                    "temperature.Value",
                                    "\"Tag 10\".Value")
                                .type("Filter")
                                .build())
                            .map(DataflowBuiltInTransformationMapArgs.builder()
                                .inputs("*")
                                .output("*")
                                .type("PassThrough")
                                .build())
                            .build())
                        .name("transformation1")
                        .operationType("BuiltInTransformation")
                        .build(),
                    DataflowOperationArgs.builder()
                        .destinationSettings(DataflowDestinationOperationSettingsArgs.builder()
                            .dataDestination("data/filtered/thermostat")
                            .endpointRef("aio-builtin-broker-endpoint")
                            .build())
                        .name("destination1")
                        .operationType("Destination")
                        .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflow:
    type: azure-native:iotoperations:Dataflow
    properties:
      dataflowName: mqtt-filter-to-topic
      dataflowProfileName: resource-name123
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        mode: Enabled
        operations:
          - name: source1
            operationType: Source
            sourceSettings:
              dataSources:
                - azure-iot-operations/data/thermostat
              endpointRef: aio-builtin-broker-endpoint
          - builtInTransformationSettings:
              filter:
                - description: filter-datapoint
                  expression: $1 > 9000 && $2 >= 8000
                  inputs:
                    - temperature.Value
                    - '"Tag 10".Value'
                  type: Filter
              map:
                - inputs:
                    - '*'
                  output: '*'
                  type: PassThrough
            name: transformation1
            operationType: BuiltInTransformation
          - destinationSettings:
              dataDestination: data/filtered/thermostat
              endpointRef: aio-builtin-broker-endpoint
            name: destination1
            operationType: Destination
      resourceGroupName: rgiotoperations

The builtInTransformationSettings block sits between source and destination operations. The filter array evaluates expressions against inputs (field references like temperature.Value), and only matching records proceed. The map array with PassThrough type forwards all fields unchanged. This pattern creates a filtered republish workflow within the same MQTT broker.

Transform telemetry with expressions and route to Event Hub

Complex scenarios require filtering, calculating derived metrics, and applying custom transformations before Event Hub delivery.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflow = new azure_native.iotoperations.Dataflow("dataflow", {
    dataflowName: "aio-to-event-hub-transformed",
    dataflowProfileName: "resource-name123",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        mode: azure_native.iotoperations.OperationalMode.Enabled,
        operations: [
            {
                name: "source1",
                operationType: azure_native.iotoperations.OperationType.Source,
                sourceSettings: {
                    dataSources: ["azure-iot-operations/data/thermostat"],
                    endpointRef: "aio-builtin-broker-endpoint",
                },
            },
            {
                builtInTransformationSettings: {
                    filter: [{
                        expression: "$1 > 9000 && $2 >= 8000",
                        inputs: [
                            "temperature.Value",
                            "\"Tag 10\".Value",
                        ],
                    }],
                    map: [
                        {
                            inputs: ["*"],
                            output: "*",
                        },
                        {
                            expression: "($1+$2)/2",
                            inputs: [
                                "temperature.Value",
                                "\"Tag 10\".Value",
                            ],
                            output: "AvgTemp.Value",
                        },
                        {
                            expression: "true",
                            inputs: [],
                            output: "dataflow-processed",
                        },
                        {
                            expression: "",
                            inputs: ["temperature.SourceTimestamp"],
                            output: "",
                        },
                        {
                            expression: "",
                            inputs: ["\"Tag 10\""],
                            output: "pressure",
                        },
                        {
                            expression: "cToF($1)",
                            inputs: ["temperature.Value"],
                            output: "temperatureF.Value",
                        },
                        {
                            expression: "scale ($1,0,10,0,100)",
                            inputs: ["\"Tag 10\".Value"],
                            output: "\"Scale Tag 10\".Value",
                        },
                    ],
                },
                operationType: azure_native.iotoperations.OperationType.BuiltInTransformation,
            },
            {
                destinationSettings: {
                    dataDestination: "myuniqueeventhub",
                    endpointRef: "event-hub-endpoint",
                },
                name: "destination1",
                operationType: azure_native.iotoperations.OperationType.Destination,
            },
        ],
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow = azure_native.iotoperations.Dataflow("dataflow",
    dataflow_name="aio-to-event-hub-transformed",
    dataflow_profile_name="resource-name123",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "mode": azure_native.iotoperations.OperationalMode.ENABLED,
        "operations": [
            {
                "name": "source1",
                "operation_type": azure_native.iotoperations.OperationType.SOURCE,
                "source_settings": {
                    "data_sources": ["azure-iot-operations/data/thermostat"],
                    "endpoint_ref": "aio-builtin-broker-endpoint",
                },
            },
            {
                "built_in_transformation_settings": {
                    "filter": [{
                        "expression": "$1 > 9000 && $2 >= 8000",
                        "inputs": [
                            "temperature.Value",
                            "\"Tag 10\".Value",
                        ],
                    }],
                    "map": [
                        {
                            "inputs": ["*"],
                            "output": "*",
                        },
                        {
                            "expression": "($1+$2)/2",
                            "inputs": [
                                "temperature.Value",
                                "\"Tag 10\".Value",
                            ],
                            "output": "AvgTemp.Value",
                        },
                        {
                            "expression": "true",
                            "inputs": [],
                            "output": "dataflow-processed",
                        },
                        {
                            "expression": "",
                            "inputs": ["temperature.SourceTimestamp"],
                            "output": "",
                        },
                        {
                            "expression": "",
                            "inputs": ["\"Tag 10\""],
                            "output": "pressure",
                        },
                        {
                            "expression": "cToF($1)",
                            "inputs": ["temperature.Value"],
                            "output": "temperatureF.Value",
                        },
                        {
                            "expression": "scale ($1,0,10,0,100)",
                            "inputs": ["\"Tag 10\".Value"],
                            "output": "\"Scale Tag 10\".Value",
                        },
                    ],
                },
                "operation_type": azure_native.iotoperations.OperationType.BUILT_IN_TRANSFORMATION,
            },
            {
                "destination_settings": {
                    "data_destination": "myuniqueeventhub",
                    "endpoint_ref": "event-hub-endpoint",
                },
                "name": "destination1",
                "operation_type": azure_native.iotoperations.OperationType.DESTINATION,
            },
        ],
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflow(ctx, "dataflow", &iotoperations.DataflowArgs{
			DataflowName:        pulumi.String("aio-to-event-hub-transformed"),
			DataflowProfileName: pulumi.String("resource-name123"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowPropertiesArgs{
				Mode: pulumi.String(iotoperations.OperationalModeEnabled),
				Operations: iotoperations.DataflowOperationArray{
					&iotoperations.DataflowOperationArgs{
						Name:          pulumi.String("source1"),
						OperationType: pulumi.String(iotoperations.OperationTypeSource),
						SourceSettings: &iotoperations.DataflowSourceOperationSettingsArgs{
							DataSources: pulumi.StringArray{
								pulumi.String("azure-iot-operations/data/thermostat"),
							},
							EndpointRef: pulumi.String("aio-builtin-broker-endpoint"),
						},
					},
					&iotoperations.DataflowOperationArgs{
						BuiltInTransformationSettings: &iotoperations.DataflowBuiltInTransformationSettingsArgs{
							Filter: iotoperations.DataflowBuiltInTransformationFilterArray{
								&iotoperations.DataflowBuiltInTransformationFilterArgs{
									Expression: pulumi.String("$1 > 9000 && $2 >= 8000"),
									Inputs: pulumi.StringArray{
										pulumi.String("temperature.Value"),
										pulumi.String("\"Tag 10\".Value"),
									},
								},
							},
							Map: iotoperations.DataflowBuiltInTransformationMapArray{
								&iotoperations.DataflowBuiltInTransformationMapArgs{
									Inputs: pulumi.StringArray{
										pulumi.String("*"),
									},
									Output: pulumi.String("*"),
								},
								&iotoperations.DataflowBuiltInTransformationMapArgs{
									Expression: pulumi.String("($1+$2)/2"),
									Inputs: pulumi.StringArray{
										pulumi.String("temperature.Value"),
										pulumi.String("\"Tag 10\".Value"),
									},
									Output: pulumi.String("AvgTemp.Value"),
								},
								&iotoperations.DataflowBuiltInTransformationMapArgs{
									Expression: pulumi.String("true"),
									Inputs:     pulumi.StringArray{},
									Output:     pulumi.String("dataflow-processed"),
								},
								&iotoperations.DataflowBuiltInTransformationMapArgs{
									Expression: pulumi.String(""),
									Inputs: pulumi.StringArray{
										pulumi.String("temperature.SourceTimestamp"),
									},
									Output: pulumi.String(""),
								},
								&iotoperations.DataflowBuiltInTransformationMapArgs{
									Expression: pulumi.String(""),
									Inputs: pulumi.StringArray{
										pulumi.String("\"Tag 10\""),
									},
									Output: pulumi.String("pressure"),
								},
								&iotoperations.DataflowBuiltInTransformationMapArgs{
									Expression: pulumi.String("cToF($1)"),
									Inputs: pulumi.StringArray{
										pulumi.String("temperature.Value"),
									},
									Output: pulumi.String("temperatureF.Value"),
								},
								&iotoperations.DataflowBuiltInTransformationMapArgs{
									Expression: pulumi.String("scale ($1,0,10,0,100)"),
									Inputs: pulumi.StringArray{
										pulumi.String("\"Tag 10\".Value"),
									},
									Output: pulumi.String("\"Scale Tag 10\".Value"),
								},
							},
						},
						OperationType: pulumi.String(iotoperations.OperationTypeBuiltInTransformation),
					},
					&iotoperations.DataflowOperationArgs{
						DestinationSettings: &iotoperations.DataflowDestinationOperationSettingsArgs{
							DataDestination: pulumi.String("myuniqueeventhub"),
							EndpointRef:     pulumi.String("event-hub-endpoint"),
						},
						Name:          pulumi.String("destination1"),
						OperationType: pulumi.String(iotoperations.OperationTypeDestination),
					},
				},
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflow = new AzureNative.IoTOperations.Dataflow("dataflow", new()
    {
        DataflowName = "aio-to-event-hub-transformed",
        DataflowProfileName = "resource-name123",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowPropertiesArgs
        {
            Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
            Operations = new[]
            {
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    Name = "source1",
                    OperationType = AzureNative.IoTOperations.OperationType.Source,
                    SourceSettings = new AzureNative.IoTOperations.Inputs.DataflowSourceOperationSettingsArgs
                    {
                        DataSources = new[]
                        {
                            "azure-iot-operations/data/thermostat",
                        },
                        EndpointRef = "aio-builtin-broker-endpoint",
                    },
                },
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    BuiltInTransformationSettings = new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationSettingsArgs
                    {
                        Filter = new[]
                        {
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationFilterArgs
                            {
                                Expression = "$1 > 9000 && $2 >= 8000",
                                Inputs = new[]
                                {
                                    "temperature.Value",
                                    "\"Tag 10\".Value",
                                },
                            },
                        },
                        Map = new[]
                        {
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                            {
                                Inputs = new[]
                                {
                                    "*",
                                },
                                Output = "*",
                            },
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                            {
                                Expression = "($1+$2)/2",
                                Inputs = new[]
                                {
                                    "temperature.Value",
                                    "\"Tag 10\".Value",
                                },
                                Output = "AvgTemp.Value",
                            },
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                            {
                                Expression = "true",
                                Inputs = new() { },
                                Output = "dataflow-processed",
                            },
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                            {
                                Expression = "",
                                Inputs = new[]
                                {
                                    "temperature.SourceTimestamp",
                                },
                                Output = "",
                            },
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                            {
                                Expression = "",
                                Inputs = new[]
                                {
                                    "\"Tag 10\"",
                                },
                                Output = "pressure",
                            },
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                            {
                                Expression = "cToF($1)",
                                Inputs = new[]
                                {
                                    "temperature.Value",
                                },
                                Output = "temperatureF.Value",
                            },
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                            {
                                Expression = "scale ($1,0,10,0,100)",
                                Inputs = new[]
                                {
                                    "\"Tag 10\".Value",
                                },
                                Output = "\"Scale Tag 10\".Value",
                            },
                        },
                    },
                    OperationType = AzureNative.IoTOperations.OperationType.BuiltInTransformation,
                },
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    DestinationSettings = new AzureNative.IoTOperations.Inputs.DataflowDestinationOperationSettingsArgs
                    {
                        DataDestination = "myuniqueeventhub",
                        EndpointRef = "event-hub-endpoint",
                    },
                    Name = "destination1",
                    OperationType = AzureNative.IoTOperations.OperationType.Destination,
                },
            },
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.Dataflow;
import com.pulumi.azurenative.iotoperations.DataflowArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflow = new Dataflow("dataflow", DataflowArgs.builder()
            .dataflowName("aio-to-event-hub-transformed")
            .dataflowProfileName("resource-name123")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowPropertiesArgs.builder()
                .mode("Enabled")
                .operations(                
                    DataflowOperationArgs.builder()
                        .name("source1")
                        .operationType("Source")
                        .sourceSettings(DataflowSourceOperationSettingsArgs.builder()
                            .dataSources("azure-iot-operations/data/thermostat")
                            .endpointRef("aio-builtin-broker-endpoint")
                            .build())
                        .build(),
                    DataflowOperationArgs.builder()
                        .builtInTransformationSettings(DataflowBuiltInTransformationSettingsArgs.builder()
                            .filter(DataflowBuiltInTransformationFilterArgs.builder()
                                .expression("$1 > 9000 && $2 >= 8000")
                                .inputs(                                
                                    "temperature.Value",
                                    "\"Tag 10\".Value")
                                .build())
                            .map(                            
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .inputs("*")
                                    .output("*")
                                    .build(),
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .expression("($1+$2)/2")
                                    .inputs(                                    
                                        "temperature.Value",
                                        "\"Tag 10\".Value")
                                    .output("AvgTemp.Value")
                                    .build(),
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .expression("true")
                                    .inputs()
                                    .output("dataflow-processed")
                                    .build(),
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .expression("")
                                    .inputs("temperature.SourceTimestamp")
                                    .output("")
                                    .build(),
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .expression("")
                                    .inputs("\"Tag 10\"")
                                    .output("pressure")
                                    .build(),
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .expression("cToF($1)")
                                    .inputs("temperature.Value")
                                    .output("temperatureF.Value")
                                    .build(),
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .expression("scale ($1,0,10,0,100)")
                                    .inputs("\"Tag 10\".Value")
                                    .output("\"Scale Tag 10\".Value")
                                    .build())
                            .build())
                        .operationType("BuiltInTransformation")
                        .build(),
                    DataflowOperationArgs.builder()
                        .destinationSettings(DataflowDestinationOperationSettingsArgs.builder()
                            .dataDestination("myuniqueeventhub")
                            .endpointRef("event-hub-endpoint")
                            .build())
                        .name("destination1")
                        .operationType("Destination")
                        .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflow:
    type: azure-native:iotoperations:Dataflow
    properties:
      dataflowName: aio-to-event-hub-transformed
      dataflowProfileName: resource-name123
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        mode: Enabled
        operations:
          - name: source1
            operationType: Source
            sourceSettings:
              dataSources:
                - azure-iot-operations/data/thermostat
              endpointRef: aio-builtin-broker-endpoint
          - builtInTransformationSettings:
              filter:
                - expression: $1 > 9000 && $2 >= 8000
                  inputs:
                    - temperature.Value
                    - '"Tag 10".Value'
              map:
                - inputs:
                    - '*'
                  output: '*'
                - expression: ($1+$2)/2
                  inputs:
                    - temperature.Value
                    - '"Tag 10".Value'
                  output: AvgTemp.Value
                - expression: 'true'
                  inputs: []
                  output: dataflow-processed
                - expression: ""
                  inputs:
                    - temperature.SourceTimestamp
                  output: ""
                - expression: ""
                  inputs:
                    - '"Tag 10"'
                  output: pressure
                - expression: cToF($1)
                  inputs:
                    - temperature.Value
                  output: temperatureF.Value
                - expression: scale ($1,0,10,0,100)
                  inputs:
                    - '"Tag 10".Value'
                  output: '"Scale Tag 10".Value'
            operationType: BuiltInTransformation
          - destinationSettings:
              dataDestination: myuniqueeventhub
              endpointRef: event-hub-endpoint
            name: destination1
            operationType: Destination
      resourceGroupName: rgiotoperations

Multiple map entries define a transformation pipeline: passthrough (* to *), calculated fields (expressions like ($1+$2)/2), constant additions (true to dataflow-processed), field renames (empty expression copies input to output), and function calls (cToF($1) for unit conversion, scale($1,0,10,0,100) for normalization). The filter operation runs first, then transformations apply to surviving records.

Enrich telemetry with contextual data from datasets

Industrial IoT applications join incoming telemetry with reference data to add context like location or quality indicators.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflow = new azure_native.iotoperations.Dataflow("dataflow", {
    dataflowName: "aio-to-adx-contexualized",
    dataflowProfileName: "resource-name123",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        mode: azure_native.iotoperations.OperationalMode.Enabled,
        operations: [
            {
                name: "source1",
                operationType: azure_native.iotoperations.OperationType.Source,
                sourceSettings: {
                    dataSources: ["azure-iot-operations/data/thermostat"],
                    endpointRef: "aio-builtin-broker-endpoint",
                },
            },
            {
                builtInTransformationSettings: {
                    datasets: [{
                        expression: "$1 == $2",
                        inputs: [
                            "$source.country",
                            "$context.country",
                        ],
                        key: "quality",
                    }],
                    map: [
                        {
                            inputs: ["*"],
                            output: "*",
                        },
                        {
                            inputs: ["$context(quality).*"],
                            output: "enriched.*",
                        },
                    ],
                },
                name: "transformation1",
                operationType: azure_native.iotoperations.OperationType.BuiltInTransformation,
            },
            {
                destinationSettings: {
                    dataDestination: "mytable",
                    endpointRef: "adx-endpoint",
                },
                name: "destination1",
                operationType: azure_native.iotoperations.OperationType.Destination,
            },
        ],
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow = azure_native.iotoperations.Dataflow("dataflow",
    dataflow_name="aio-to-adx-contexualized",
    dataflow_profile_name="resource-name123",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "mode": azure_native.iotoperations.OperationalMode.ENABLED,
        "operations": [
            {
                "name": "source1",
                "operation_type": azure_native.iotoperations.OperationType.SOURCE,
                "source_settings": {
                    "data_sources": ["azure-iot-operations/data/thermostat"],
                    "endpoint_ref": "aio-builtin-broker-endpoint",
                },
            },
            {
                "built_in_transformation_settings": {
                    "datasets": [{
                        "expression": "$1 == $2",
                        "inputs": [
                            "$source.country",
                            "$context.country",
                        ],
                        "key": "quality",
                    }],
                    "map": [
                        {
                            "inputs": ["*"],
                            "output": "*",
                        },
                        {
                            "inputs": ["$context(quality).*"],
                            "output": "enriched.*",
                        },
                    ],
                },
                "name": "transformation1",
                "operation_type": azure_native.iotoperations.OperationType.BUILT_IN_TRANSFORMATION,
            },
            {
                "destination_settings": {
                    "data_destination": "mytable",
                    "endpoint_ref": "adx-endpoint",
                },
                "name": "destination1",
                "operation_type": azure_native.iotoperations.OperationType.DESTINATION,
            },
        ],
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflow(ctx, "dataflow", &iotoperations.DataflowArgs{
			DataflowName:        pulumi.String("aio-to-adx-contexualized"),
			DataflowProfileName: pulumi.String("resource-name123"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowPropertiesArgs{
				Mode: pulumi.String(iotoperations.OperationalModeEnabled),
				Operations: iotoperations.DataflowOperationArray{
					&iotoperations.DataflowOperationArgs{
						Name:          pulumi.String("source1"),
						OperationType: pulumi.String(iotoperations.OperationTypeSource),
						SourceSettings: &iotoperations.DataflowSourceOperationSettingsArgs{
							DataSources: pulumi.StringArray{
								pulumi.String("azure-iot-operations/data/thermostat"),
							},
							EndpointRef: pulumi.String("aio-builtin-broker-endpoint"),
						},
					},
					&iotoperations.DataflowOperationArgs{
						BuiltInTransformationSettings: &iotoperations.DataflowBuiltInTransformationSettingsArgs{
							Datasets: iotoperations.DataflowBuiltInTransformationDatasetArray{
								&iotoperations.DataflowBuiltInTransformationDatasetArgs{
									Expression: pulumi.String("$1 == $2"),
									Inputs: pulumi.StringArray{
										pulumi.String("$source.country"),
										pulumi.String("$context.country"),
									},
									Key: pulumi.String("quality"),
								},
							},
							Map: iotoperations.DataflowBuiltInTransformationMapArray{
								&iotoperations.DataflowBuiltInTransformationMapArgs{
									Inputs: pulumi.StringArray{
										pulumi.String("*"),
									},
									Output: pulumi.String("*"),
								},
								&iotoperations.DataflowBuiltInTransformationMapArgs{
									Inputs: pulumi.StringArray{
										pulumi.String("$context(quality).*"),
									},
									Output: pulumi.String("enriched.*"),
								},
							},
						},
						Name:          pulumi.String("transformation1"),
						OperationType: pulumi.String(iotoperations.OperationTypeBuiltInTransformation),
					},
					&iotoperations.DataflowOperationArgs{
						DestinationSettings: &iotoperations.DataflowDestinationOperationSettingsArgs{
							DataDestination: pulumi.String("mytable"),
							EndpointRef:     pulumi.String("adx-endpoint"),
						},
						Name:          pulumi.String("destination1"),
						OperationType: pulumi.String(iotoperations.OperationTypeDestination),
					},
				},
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflow = new AzureNative.IoTOperations.Dataflow("dataflow", new()
    {
        DataflowName = "aio-to-adx-contexualized",
        DataflowProfileName = "resource-name123",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowPropertiesArgs
        {
            Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
            Operations = new[]
            {
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    Name = "source1",
                    OperationType = AzureNative.IoTOperations.OperationType.Source,
                    SourceSettings = new AzureNative.IoTOperations.Inputs.DataflowSourceOperationSettingsArgs
                    {
                        DataSources = new[]
                        {
                            "azure-iot-operations/data/thermostat",
                        },
                        EndpointRef = "aio-builtin-broker-endpoint",
                    },
                },
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    BuiltInTransformationSettings = new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationSettingsArgs
                    {
                        Datasets = new[]
                        {
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationDatasetArgs
                            {
                                Expression = "$1 == $2",
                                Inputs = new[]
                                {
                                    "$source.country",
                                    "$context.country",
                                },
                                Key = "quality",
                            },
                        },
                        Map = new[]
                        {
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                            {
                                Inputs = new[]
                                {
                                    "*",
                                },
                                Output = "*",
                            },
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                            {
                                Inputs = new[]
                                {
                                    "$context(quality).*",
                                },
                                Output = "enriched.*",
                            },
                        },
                    },
                    Name = "transformation1",
                    OperationType = AzureNative.IoTOperations.OperationType.BuiltInTransformation,
                },
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    DestinationSettings = new AzureNative.IoTOperations.Inputs.DataflowDestinationOperationSettingsArgs
                    {
                        DataDestination = "mytable",
                        EndpointRef = "adx-endpoint",
                    },
                    Name = "destination1",
                    OperationType = AzureNative.IoTOperations.OperationType.Destination,
                },
            },
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.Dataflow;
import com.pulumi.azurenative.iotoperations.DataflowArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflow = new Dataflow("dataflow", DataflowArgs.builder()
            .dataflowName("aio-to-adx-contexualized")
            .dataflowProfileName("resource-name123")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowPropertiesArgs.builder()
                .mode("Enabled")
                .operations(                
                    DataflowOperationArgs.builder()
                        .name("source1")
                        .operationType("Source")
                        .sourceSettings(DataflowSourceOperationSettingsArgs.builder()
                            .dataSources("azure-iot-operations/data/thermostat")
                            .endpointRef("aio-builtin-broker-endpoint")
                            .build())
                        .build(),
                    DataflowOperationArgs.builder()
                        .builtInTransformationSettings(DataflowBuiltInTransformationSettingsArgs.builder()
                            .datasets(DataflowBuiltInTransformationDatasetArgs.builder()
                                .expression("$1 == $2")
                                .inputs(                                
                                    "$source.country",
                                    "$context.country")
                                .key("quality")
                                .build())
                            .map(                            
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .inputs("*")
                                    .output("*")
                                    .build(),
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .inputs("$context(quality).*")
                                    .output("enriched.*")
                                    .build())
                            .build())
                        .name("transformation1")
                        .operationType("BuiltInTransformation")
                        .build(),
                    DataflowOperationArgs.builder()
                        .destinationSettings(DataflowDestinationOperationSettingsArgs.builder()
                            .dataDestination("mytable")
                            .endpointRef("adx-endpoint")
                            .build())
                        .name("destination1")
                        .operationType("Destination")
                        .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflow:
    type: azure-native:iotoperations:Dataflow
    properties:
      dataflowName: aio-to-adx-contexualized
      dataflowProfileName: resource-name123
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        mode: Enabled
        operations:
          - name: source1
            operationType: Source
            sourceSettings:
              dataSources:
                - azure-iot-operations/data/thermostat
              endpointRef: aio-builtin-broker-endpoint
          - builtInTransformationSettings:
              datasets:
                - expression: $1 == $2
                  inputs:
                    - $source.country
                    - $context.country
                  key: quality
              map:
                - inputs:
                    - '*'
                  output: '*'
                - inputs:
                    - $context(quality).*
                  output: enriched.*
            name: transformation1
            operationType: BuiltInTransformation
          - destinationSettings:
              dataDestination: mytable
              endpointRef: adx-endpoint
            name: destination1
            operationType: Destination
      resourceGroupName: rgiotoperations

The datasets array defines reusable expressions that compare source fields ($source.country) with context data ($context.country), storing results under a key (quality). The map operations then reference these datasets via $context(quality).* to enrich the output. This pattern separates data matching logic from field mapping, enabling complex contextualization workflows.

Convert telemetry to Parquet for Microsoft Fabric

Analytics workloads in Microsoft Fabric benefit from columnar formats like Parquet, which require schema-based serialization.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflow = new azure_native.iotoperations.Dataflow("dataflow", {
    dataflowName: "aio-to-fabric",
    dataflowProfileName: "resource-name123",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        mode: azure_native.iotoperations.OperationalMode.Enabled,
        operations: [
            {
                name: "source1",
                operationType: azure_native.iotoperations.OperationType.Source,
                sourceSettings: {
                    dataSources: ["azure-iot-operations/data/thermostat"],
                    endpointRef: "aio-builtin-broker-endpoint",
                },
            },
            {
                builtInTransformationSettings: {
                    schemaRef: "aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0",
                    serializationFormat: azure_native.iotoperations.TransformationSerializationFormat.Parquet,
                },
                operationType: azure_native.iotoperations.OperationType.BuiltInTransformation,
            },
            {
                destinationSettings: {
                    dataDestination: "telemetryTable",
                    endpointRef: "fabric-endpoint",
                },
                name: "destination1",
                operationType: azure_native.iotoperations.OperationType.Destination,
            },
        ],
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow = azure_native.iotoperations.Dataflow("dataflow",
    dataflow_name="aio-to-fabric",
    dataflow_profile_name="resource-name123",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "mode": azure_native.iotoperations.OperationalMode.ENABLED,
        "operations": [
            {
                "name": "source1",
                "operation_type": azure_native.iotoperations.OperationType.SOURCE,
                "source_settings": {
                    "data_sources": ["azure-iot-operations/data/thermostat"],
                    "endpoint_ref": "aio-builtin-broker-endpoint",
                },
            },
            {
                "built_in_transformation_settings": {
                    "schema_ref": "aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0",
                    "serialization_format": azure_native.iotoperations.TransformationSerializationFormat.PARQUET,
                },
                "operation_type": azure_native.iotoperations.OperationType.BUILT_IN_TRANSFORMATION,
            },
            {
                "destination_settings": {
                    "data_destination": "telemetryTable",
                    "endpoint_ref": "fabric-endpoint",
                },
                "name": "destination1",
                "operation_type": azure_native.iotoperations.OperationType.DESTINATION,
            },
        ],
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflow(ctx, "dataflow", &iotoperations.DataflowArgs{
			DataflowName:        pulumi.String("aio-to-fabric"),
			DataflowProfileName: pulumi.String("resource-name123"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowPropertiesArgs{
				Mode: pulumi.String(iotoperations.OperationalModeEnabled),
				Operations: iotoperations.DataflowOperationArray{
					&iotoperations.DataflowOperationArgs{
						Name:          pulumi.String("source1"),
						OperationType: pulumi.String(iotoperations.OperationTypeSource),
						SourceSettings: &iotoperations.DataflowSourceOperationSettingsArgs{
							DataSources: pulumi.StringArray{
								pulumi.String("azure-iot-operations/data/thermostat"),
							},
							EndpointRef: pulumi.String("aio-builtin-broker-endpoint"),
						},
					},
					&iotoperations.DataflowOperationArgs{
						BuiltInTransformationSettings: &iotoperations.DataflowBuiltInTransformationSettingsArgs{
							SchemaRef:           pulumi.String("aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0"),
							SerializationFormat: pulumi.String(iotoperations.TransformationSerializationFormatParquet),
						},
						OperationType: pulumi.String(iotoperations.OperationTypeBuiltInTransformation),
					},
					&iotoperations.DataflowOperationArgs{
						DestinationSettings: &iotoperations.DataflowDestinationOperationSettingsArgs{
							DataDestination: pulumi.String("telemetryTable"),
							EndpointRef:     pulumi.String("fabric-endpoint"),
						},
						Name:          pulumi.String("destination1"),
						OperationType: pulumi.String(iotoperations.OperationTypeDestination),
					},
				},
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflow = new AzureNative.IoTOperations.Dataflow("dataflow", new()
    {
        DataflowName = "aio-to-fabric",
        DataflowProfileName = "resource-name123",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowPropertiesArgs
        {
            Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
            Operations = new[]
            {
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    Name = "source1",
                    OperationType = AzureNative.IoTOperations.OperationType.Source,
                    SourceSettings = new AzureNative.IoTOperations.Inputs.DataflowSourceOperationSettingsArgs
                    {
                        DataSources = new[]
                        {
                            "azure-iot-operations/data/thermostat",
                        },
                        EndpointRef = "aio-builtin-broker-endpoint",
                    },
                },
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    BuiltInTransformationSettings = new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationSettingsArgs
                    {
                        SchemaRef = "aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0",
                        SerializationFormat = AzureNative.IoTOperations.TransformationSerializationFormat.Parquet,
                    },
                    OperationType = AzureNative.IoTOperations.OperationType.BuiltInTransformation,
                },
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    DestinationSettings = new AzureNative.IoTOperations.Inputs.DataflowDestinationOperationSettingsArgs
                    {
                        DataDestination = "telemetryTable",
                        EndpointRef = "fabric-endpoint",
                    },
                    Name = "destination1",
                    OperationType = AzureNative.IoTOperations.OperationType.Destination,
                },
            },
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.Dataflow;
import com.pulumi.azurenative.iotoperations.DataflowArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflow = new Dataflow("dataflow", DataflowArgs.builder()
            .dataflowName("aio-to-fabric")
            .dataflowProfileName("resource-name123")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowPropertiesArgs.builder()
                .mode("Enabled")
                .operations(                
                    DataflowOperationArgs.builder()
                        .name("source1")
                        .operationType("Source")
                        .sourceSettings(DataflowSourceOperationSettingsArgs.builder()
                            .dataSources("azure-iot-operations/data/thermostat")
                            .endpointRef("aio-builtin-broker-endpoint")
                            .build())
                        .build(),
                    DataflowOperationArgs.builder()
                        .builtInTransformationSettings(DataflowBuiltInTransformationSettingsArgs.builder()
                            .schemaRef("aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0")
                            .serializationFormat("Parquet")
                            .build())
                        .operationType("BuiltInTransformation")
                        .build(),
                    DataflowOperationArgs.builder()
                        .destinationSettings(DataflowDestinationOperationSettingsArgs.builder()
                            .dataDestination("telemetryTable")
                            .endpointRef("fabric-endpoint")
                            .build())
                        .name("destination1")
                        .operationType("Destination")
                        .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflow:
    type: azure-native:iotoperations:Dataflow
    properties:
      dataflowName: aio-to-fabric
      dataflowProfileName: resource-name123
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        mode: Enabled
        operations:
          - name: source1
            operationType: Source
            sourceSettings:
              dataSources:
                - azure-iot-operations/data/thermostat
              endpointRef: aio-builtin-broker-endpoint
          - builtInTransformationSettings:
              schemaRef: aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0
              serializationFormat: Parquet
            operationType: BuiltInTransformation
          - destinationSettings:
              dataDestination: telemetryTable
              endpointRef: fabric-endpoint
            name: destination1
            operationType: Destination
      resourceGroupName: rgiotoperations

The builtInTransformationSettings block specifies serializationFormat as Parquet and references a schema via schemaRef (using the aio-sr:// URI scheme). The transformation operation converts JSON telemetry to Parquet according to the schema before the destination operation writes to Fabric. This pattern enables efficient columnar storage for analytics.

Beyond these examples

These snippets focus on specific dataflow-level features: MQTT-to-cloud routing patterns, filtering and transformation expressions, and contextual enrichment and format conversion. They’re intentionally minimal rather than full IoT data pipelines.

The examples reference pre-existing infrastructure such as DataflowProfile parent resources, DataflowEndpoint resources for MQTT brokers and cloud services, Azure IoT Operations instances with Custom Locations, and schema registry entries for Parquet serialization. They focus on configuring the dataflow pipeline rather than provisioning the surrounding infrastructure.

To keep things focused, common dataflow patterns are omitted, including:

  • Operational mode configuration (mode property always ‘Enabled’ in examples)
  • Asset references (assetRef in sourceSettings)
  • Error handling and retry policies
  • Batching and performance tuning
  • Authentication and authorization configuration
  • Monitoring and observability setup

These omissions are intentional: the goal is to illustrate how each dataflow operation is wired, not provide drop-in IoT solutions. See the Dataflow resource reference for all available configuration options.

Let's configure Azure IoT Operations Dataflows

Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.

Try Pulumi Cloud for FREE

Frequently Asked Questions

Resource Configuration & Immutability
What properties can't be changed after creating a dataflow?
The extendedLocation, dataflowName, dataflowProfileName, instanceName, and resourceGroupName properties are immutable and cannot be changed after creation.
How do I use a different API version for this resource?
Generate a local SDK package using pulumi package add azure-native iotoperations [ApiVersion]. Available versions include 2024-08-15-preview through 2025-10-01.
Operations & Pipeline Structure
What operation types can I use in a dataflow?

You can use three operation types in the operations array:

  1. Source - Reads data from endpoints using sourceSettings
  2. BuiltInTransformation - Transforms data using builtInTransformationSettings
  3. Destination - Writes data to endpoints using destinationSettings
What order should operations be in?
Operations execute in array order. Typical pattern: Source operation first, optional BuiltInTransformation operations in the middle, Destination operation last.
How do I reference endpoints in operations?
Use endpointRef in sourceSettings or destinationSettings to reference endpoint names like aio-builtin-broker-endpoint or event-hub-endpoint.
Data Transformation
How do I filter data based on conditions?
Configure builtInTransformationSettings.filter with an expression (like $1 > 9000 && $2 >= 8000) and inputs array specifying the fields to evaluate.
What's the difference between PassThrough and NewProperties mapping?
PassThrough copies data unchanged (inputs: ["*"], output: "*"), while NewProperties creates new fields using expressions to transform input values.
How do I use contextualization in transformations?
Create datasets with expressions comparing source and context data. Use $source.field and $context.field in inputs, then reference the dataset with $context(datasetKey).* in map operations.
What transformation expressions and functions are supported?
Examples show arithmetic (($1+$2)/2), comparison ($1 > 9000), boolean (&&, >=), and custom functions (cToF($1), scale($1,0,10,0,100)).
Serialization & Schemas
What serialization formats are supported?
Three formats are shown: Json (source data), Delta (transformation output), and Parquet (for destinations like Fabric).
How do I reference a schema for serialization?
Use schemaRef with the aio-sr:// format: aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0.
Data Sources & Patterns
Can I use MQTT wildcards in data sources?
Yes, use + for single-level wildcards and # for multi-level wildcards in dataSources, like thermostats/+/telemetry/temperature/#.

Using a different cloud?

Explore integration guides for other cloud providers: