Configure Azure IoT Operations Dataflows

The azure-native:iotoperations:Dataflow resource, part of the Pulumi Azure Native provider, defines data movement pipelines within Azure IoT Operations. It orchestrates how telemetry flows from source endpoints through transformation logic to destination endpoints. This guide focuses on three capabilities: MQTT-to-cloud routing patterns, built-in transformation operations, and format conversion for analytics destinations.

Dataflows belong to a DataflowProfile and reference DataflowEndpoint resources that define connectivity to MQTT brokers, Event Grid, Event Hub, Azure Data Explorer, and Microsoft Fabric. The examples are intentionally small. Combine them with your own endpoint configurations and IoT Operations infrastructure.

Route MQTT telemetry to Event Grid topics

IoT deployments often route device telemetry from MQTT brokers to Azure Event Grid for event-driven processing.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflow = new azure_native.iotoperations.Dataflow("dataflow", {
    dataflowName: "aio-to-event-grid",
    dataflowProfileName: "resource-name123",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        mode: azure_native.iotoperations.OperationalMode.Enabled,
        operations: [
            {
                name: "source1",
                operationType: azure_native.iotoperations.OperationType.Source,
                sourceSettings: {
                    dataSources: ["thermostats/+/telemetry/temperature/#"],
                    endpointRef: "aio-builtin-broker-endpoint",
                },
            },
            {
                destinationSettings: {
                    dataDestination: "factory/telemetry",
                    endpointRef: "event-grid-endpoint",
                },
                name: "destination1",
                operationType: azure_native.iotoperations.OperationType.Destination,
            },
        ],
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow = azure_native.iotoperations.Dataflow("dataflow",
    dataflow_name="aio-to-event-grid",
    dataflow_profile_name="resource-name123",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "mode": azure_native.iotoperations.OperationalMode.ENABLED,
        "operations": [
            {
                "name": "source1",
                "operation_type": azure_native.iotoperations.OperationType.SOURCE,
                "source_settings": {
                    "data_sources": ["thermostats/+/telemetry/temperature/#"],
                    "endpoint_ref": "aio-builtin-broker-endpoint",
                },
            },
            {
                "destination_settings": {
                    "data_destination": "factory/telemetry",
                    "endpoint_ref": "event-grid-endpoint",
                },
                "name": "destination1",
                "operation_type": azure_native.iotoperations.OperationType.DESTINATION,
            },
        ],
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflow(ctx, "dataflow", &iotoperations.DataflowArgs{
			DataflowName:        pulumi.String("aio-to-event-grid"),
			DataflowProfileName: pulumi.String("resource-name123"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowPropertiesArgs{
				Mode: pulumi.String(iotoperations.OperationalModeEnabled),
				Operations: iotoperations.DataflowOperationArray{
					&iotoperations.DataflowOperationArgs{
						Name:          pulumi.String("source1"),
						OperationType: pulumi.String(iotoperations.OperationTypeSource),
						SourceSettings: &iotoperations.DataflowSourceOperationSettingsArgs{
							DataSources: pulumi.StringArray{
								pulumi.String("thermostats/+/telemetry/temperature/#"),
							},
							EndpointRef: pulumi.String("aio-builtin-broker-endpoint"),
						},
					},
					&iotoperations.DataflowOperationArgs{
						DestinationSettings: &iotoperations.DataflowDestinationOperationSettingsArgs{
							DataDestination: pulumi.String("factory/telemetry"),
							EndpointRef:     pulumi.String("event-grid-endpoint"),
						},
						Name:          pulumi.String("destination1"),
						OperationType: pulumi.String(iotoperations.OperationTypeDestination),
					},
				},
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflow = new AzureNative.IoTOperations.Dataflow("dataflow", new()
    {
        DataflowName = "aio-to-event-grid",
        DataflowProfileName = "resource-name123",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowPropertiesArgs
        {
            Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
            Operations = new[]
            {
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    Name = "source1",
                    OperationType = AzureNative.IoTOperations.OperationType.Source,
                    SourceSettings = new AzureNative.IoTOperations.Inputs.DataflowSourceOperationSettingsArgs
                    {
                        DataSources = new[]
                        {
                            "thermostats/+/telemetry/temperature/#",
                        },
                        EndpointRef = "aio-builtin-broker-endpoint",
                    },
                },
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    DestinationSettings = new AzureNative.IoTOperations.Inputs.DataflowDestinationOperationSettingsArgs
                    {
                        DataDestination = "factory/telemetry",
                        EndpointRef = "event-grid-endpoint",
                    },
                    Name = "destination1",
                    OperationType = AzureNative.IoTOperations.OperationType.Destination,
                },
            },
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.Dataflow;
import com.pulumi.azurenative.iotoperations.DataflowArgs;
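import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowOperationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowSourceOperationSettingsArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowDestinationOperationSettingsArgs;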
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflow = new Dataflow("dataflow", DataflowArgs.builder()
            .dataflowName("aio-to-event-grid")
            .dataflowProfileName("resource-name123")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowPropertiesArgs.builder()
                .mode("Enabled")
                .operations(                
                    DataflowOperationArgs.builder()
                        .name("source1")
                        .operationType("Source")
                        .sourceSettings(DataflowSourceOperationSettingsArgs.builder()
                            .dataSources("thermostats/+/telemetry/temperature/#")
                            .endpointRef("aio-builtin-broker-endpoint")
                            .build())
                        .build(),
                    DataflowOperationArgs.builder()
                        .destinationSettings(DataflowDestinationOperationSettingsArgs.builder()
                            .dataDestination("factory/telemetry")
                            .endpointRef("event-grid-endpoint")
                            .build())
                        .name("destination1")
                        .operationType("Destination")
                        .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflow:
    type: azure-native:iotoperations:Dataflow
    properties:
      dataflowName: aio-to-event-grid
      dataflowProfileName: resource-name123
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        mode: Enabled
        operations:
          - name: source1
            operationType: Source
            sourceSettings:
              dataSources:
                - thermostats/+/telemetry/temperature/#
              endpointRef: aio-builtin-broker-endpoint
          - destinationSettings:
              dataDestination: factory/telemetry
              endpointRef: event-grid-endpoint
            name: destination1
            operationType: Destination
      resourceGroupName: rgiotoperations

The operations array defines a pipeline: the source operation subscribes to MQTT topics matching the filter thermostats/+/telemetry/temperature/#, where + matches exactly one topic level and # matches any remaining levels, and the destination operation writes to an Event Grid topic. The endpointRef properties point to DataflowEndpoint resources that handle authentication and connectivity. The dataDestination property names the Event Grid topic where messages land.
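
To make the wildcard pattern concrete, here is a minimal Python sketch of MQTT topic-filter matching as the MQTT specification describes it. The function name topic_matches is hypothetical and is not part of Pulumi or Azure IoT Operations; the broker performs the real matching, and this sketch ignores special cases such as topics that begin with $.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a topic filter (illustrative only)."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below it.
            return True
        if i >= len(topic_levels):
            # The filter still has levels left but the topic has run out.
            return False
        if level != "+" and level != topic_levels[i]:
            # A literal level must match exactly; "+" accepts any single level.
            return False

    # Without a trailing "#", the topic must not have extra levels.
    return len(filter_levels) == len(topic_levels)
```

Under these rules, the filter from the example would accept a topic such as thermostats/t1/telemetry/temperature/zone1 but not thermostats/t1/telemetry/humidity, because the fourth level is a literal that must match.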

Filter telemetry and republish to MQTT topics

Some applications need only the readings that cross a threshold. A dataflow can filter incoming telemetry on such conditions and republish the matching messages to a dedicated MQTT topic.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflow = new azure_native.iotoperations.Dataflow("dataflow", {
    dataflowName: "mqtt-filter-to-topic",
    dataflowProfileName: "resource-name123",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        mode: azure_native.iotoperations.OperationalMode.Enabled,
        operations: [
            {
                name: "source1",
                operationType: azure_native.iotoperations.OperationType.Source,
                sourceSettings: {
                    dataSources: ["azure-iot-operations/data/thermostat"],
                    endpointRef: "aio-builtin-broker-endpoint",
                },
            },
            {
                builtInTransformationSettings: {
                    filter: [{
                        description: "filter-datapoint",
                        expression: "$1 > 9000 && $2 >= 8000",
                        inputs: [
                            "temperature.Value",
                            "\"Tag 10\".Value",
                        ],
                        type: azure_native.iotoperations.FilterType.Filter,
                    }],
                    map: [{
                        inputs: ["*"],
                        output: "*",
                        type: azure_native.iotoperations.DataflowMappingType.PassThrough,
                    }],
                },
                name: "transformation1",
                operationType: azure_native.iotoperations.OperationType.BuiltInTransformation,
            },
            {
                destinationSettings: {
                    dataDestination: "data/filtered/thermostat",
                    endpointRef: "aio-builtin-broker-endpoint",
                },
                name: "destination1",
                operationType: azure_native.iotoperations.OperationType.Destination,
            },
        ],
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow = azure_native.iotoperations.Dataflow("dataflow",
    dataflow_name="mqtt-filter-to-topic",
    dataflow_profile_name="resource-name123",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "mode": azure_native.iotoperations.OperationalMode.ENABLED,
        "operations": [
            {
                "name": "source1",
                "operation_type": azure_native.iotoperations.OperationType.SOURCE,
                "source_settings": {
                    "data_sources": ["azure-iot-operations/data/thermostat"],
                    "endpoint_ref": "aio-builtin-broker-endpoint",
                },
            },
            {
                "built_in_transformation_settings": {
                    "filter": [{
                        "description": "filter-datapoint",
                        "expression": "$1 > 9000 && $2 >= 8000",
                        "inputs": [
                            "temperature.Value",
                            "\"Tag 10\".Value",
                        ],
                        "type": azure_native.iotoperations.FilterType.FILTER,
                    }],
                    "map": [{
                        "inputs": ["*"],
                        "output": "*",
                        "type": azure_native.iotoperations.DataflowMappingType.PASS_THROUGH,
                    }],
                },
                "name": "transformation1",
                "operation_type": azure_native.iotoperations.OperationType.BUILT_IN_TRANSFORMATION,
            },
            {
                "destination_settings": {
                    "data_destination": "data/filtered/thermostat",
                    "endpoint_ref": "aio-builtin-broker-endpoint",
                },
                "name": "destination1",
                "operation_type": azure_native.iotoperations.OperationType.DESTINATION,
            },
        ],
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflow(ctx, "dataflow", &iotoperations.DataflowArgs{
			DataflowName:        pulumi.String("mqtt-filter-to-topic"),
			DataflowProfileName: pulumi.String("resource-name123"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowPropertiesArgs{
				Mode: pulumi.String(iotoperations.OperationalModeEnabled),
				Operations: iotoperations.DataflowOperationArray{
					&iotoperations.DataflowOperationArgs{
						Name:          pulumi.String("source1"),
						OperationType: pulumi.String(iotoperations.OperationTypeSource),
						SourceSettings: &iotoperations.DataflowSourceOperationSettingsArgs{
							DataSources: pulumi.StringArray{
								pulumi.String("azure-iot-operations/data/thermostat"),
							},
							EndpointRef: pulumi.String("aio-builtin-broker-endpoint"),
						},
					},
					&iotoperations.DataflowOperationArgs{
						BuiltInTransformationSettings: &iotoperations.DataflowBuiltInTransformationSettingsArgs{
							Filter: iotoperations.DataflowBuiltInTransformationFilterArray{
								&iotoperations.DataflowBuiltInTransformationFilterArgs{
									Description: pulumi.String("filter-datapoint"),
									Expression:  pulumi.String("$1 > 9000 && $2 >= 8000"),
									Inputs: pulumi.StringArray{
										pulumi.String("temperature.Value"),
										pulumi.String("\"Tag 10\".Value"),
									},
									Type: pulumi.String(iotoperations.FilterTypeFilter),
								},
							},
							Map: iotoperations.DataflowBuiltInTransformationMapArray{
								&iotoperations.DataflowBuiltInTransformationMapArgs{
									Inputs: pulumi.StringArray{
										pulumi.String("*"),
									},
									Output: pulumi.String("*"),
									Type:   pulumi.String(iotoperations.DataflowMappingTypePassThrough),
								},
							},
						},
						Name:          pulumi.String("transformation1"),
						OperationType: pulumi.String(iotoperations.OperationTypeBuiltInTransformation),
					},
					&iotoperations.DataflowOperationArgs{
						DestinationSettings: &iotoperations.DataflowDestinationOperationSettingsArgs{
							DataDestination: pulumi.String("data/filtered/thermostat"),
							EndpointRef:     pulumi.String("aio-builtin-broker-endpoint"),
						},
						Name:          pulumi.String("destination1"),
						OperationType: pulumi.String(iotoperations.OperationTypeDestination),
					},
				},
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflow = new AzureNative.IoTOperations.Dataflow("dataflow", new()
    {
        DataflowName = "mqtt-filter-to-topic",
        DataflowProfileName = "resource-name123",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowPropertiesArgs
        {
            Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
            Operations = new[]
            {
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    Name = "source1",
                    OperationType = AzureNative.IoTOperations.OperationType.Source,
                    SourceSettings = new AzureNative.IoTOperations.Inputs.DataflowSourceOperationSettingsArgs
                    {
                        DataSources = new[]
                        {
                            "azure-iot-operations/data/thermostat",
                        },
                        EndpointRef = "aio-builtin-broker-endpoint",
                    },
                },
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    BuiltInTransformationSettings = new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationSettingsArgs
                    {
                        Filter = new[]
                        {
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationFilterArgs
                            {
                                Description = "filter-datapoint",
                                Expression = "$1 > 9000 && $2 >= 8000",
                                Inputs = new[]
                                {
                                    "temperature.Value",
                                    "\"Tag 10\".Value",
                                },
                                Type = AzureNative.IoTOperations.FilterType.Filter,
                            },
                        },
                        Map = new[]
                        {
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                            {
                                Inputs = new[]
                                {
                                    "*",
                                },
                                Output = "*",
                                Type = AzureNative.IoTOperations.DataflowMappingType.PassThrough,
                            },
                        },
                    },
                    Name = "transformation1",
                    OperationType = AzureNative.IoTOperations.OperationType.BuiltInTransformation,
                },
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    DestinationSettings = new AzureNative.IoTOperations.Inputs.DataflowDestinationOperationSettingsArgs
                    {
                        DataDestination = "data/filtered/thermostat",
                        EndpointRef = "aio-builtin-broker-endpoint",
                    },
                    Name = "destination1",
                    OperationType = AzureNative.IoTOperations.OperationType.Destination,
                },
            },
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.Dataflow;
import com.pulumi.azurenative.iotoperations.DataflowArgs;
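import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowOperationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowSourceOperationSettingsArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowBuiltInTransformationSettingsArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowBuiltInTransformationFilterArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowBuiltInTransformationMapArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowDestinationOperationSettingsArgs;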
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflow = new Dataflow("dataflow", DataflowArgs.builder()
            .dataflowName("mqtt-filter-to-topic")
            .dataflowProfileName("resource-name123")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowPropertiesArgs.builder()
                .mode("Enabled")
                .operations(                
                    DataflowOperationArgs.builder()
                        .name("source1")
                        .operationType("Source")
                        .sourceSettings(DataflowSourceOperationSettingsArgs.builder()
                            .dataSources("azure-iot-operations/data/thermostat")
                            .endpointRef("aio-builtin-broker-endpoint")
                            .build())
                        .build(),
                    DataflowOperationArgs.builder()
                        .builtInTransformationSettings(DataflowBuiltInTransformationSettingsArgs.builder()
                            .filter(DataflowBuiltInTransformationFilterArgs.builder()
                                .description("filter-datapoint")
                                .expression("$1 > 9000 && $2 >= 8000")
                                .inputs(                                
                                    "temperature.Value",
                                    "\"Tag 10\".Value")
                                .type("Filter")
                                .build())
                            .map(DataflowBuiltInTransformationMapArgs.builder()
                                .inputs("*")
                                .output("*")
                                .type("PassThrough")
                                .build())
                            .build())
                        .name("transformation1")
                        .operationType("BuiltInTransformation")
                        .build(),
                    DataflowOperationArgs.builder()
                        .destinationSettings(DataflowDestinationOperationSettingsArgs.builder()
                            .dataDestination("data/filtered/thermostat")
                            .endpointRef("aio-builtin-broker-endpoint")
                            .build())
                        .name("destination1")
                        .operationType("Destination")
                        .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflow:
    type: azure-native:iotoperations:Dataflow
    properties:
      dataflowName: mqtt-filter-to-topic
      dataflowProfileName: resource-name123
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        mode: Enabled
        operations:
          - name: source1
            operationType: Source
            sourceSettings:
              dataSources:
                - azure-iot-operations/data/thermostat
              endpointRef: aio-builtin-broker-endpoint
          - builtInTransformationSettings:
              filter:
                - description: filter-datapoint
                  expression: $1 > 9000 && $2 >= 8000
                  inputs:
                    - temperature.Value
                    - '"Tag 10".Value'
                  type: Filter
              map:
                - inputs:
                    - '*'
                  output: '*'
                  type: PassThrough
            name: transformation1
            operationType: BuiltInTransformation
          - destinationSettings:
              dataDestination: data/filtered/thermostat
              endpointRef: aio-builtin-broker-endpoint
            name: destination1
            operationType: Destination
      resourceGroupName: rgiotoperations

The builtInTransformationSettings block sits between the source and destination operations. The filter array evaluates expressions against input fields, which the expression references by position ($1 for the first input, $2 for the second); only messages where temperature.Value exceeds 9000 and "Tag 10".Value is at least 8000 pass through. The map array with the PassThrough type forwards all fields unchanged. Filtered messages are republished to a different MQTT topic on the same broker.
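
The filter's logic can be mirrored in a few lines of Python to show how the inputs bind to $1 and $2. This is only a sketch: the helper names resolve and passes_filter are hypothetical, the nested payload shape is an assumption based on the field paths in the example, and the dataflow runtime evaluates the real expression, not this code.

```python
import re

# A path segment is either a quoted name ("Tag 10") or a bare name (Value).
_SEGMENT = re.compile(r'"([^"]+)"|([^."]+)')


def resolve(message: dict, path: str):
    """Walk a dotted field path such as '"Tag 10".Value' through a nested dict."""
    value = message
    for quoted, bare in _SEGMENT.findall(path):
        value = value[quoted or bare]
    return value


def passes_filter(message: dict) -> bool:
    """Mirror the expression '$1 > 9000 && $2 >= 8000' from the example."""
    inputs = ["temperature.Value", '"Tag 10".Value']
    # Inputs bind by position: the first path is $1, the second is $2.
    first, second = (resolve(message, path) for path in inputs)
    return first > 9000 and second >= 8000


# Assumed payload shape: each tag is an object with a Value field.
sample = {"temperature": {"Value": 9001}, "Tag 10": {"Value": 8000}}
keep = passes_filter(sample)
```

Note the asymmetry in the comparison operators: a temperature of exactly 9000 is dropped, while a "Tag 10" value of exactly 8000 is kept.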

Transform telemetry with expressions and send to Event Hub

Complex pipelines combine filtering, field mapping, and custom expressions to reshape telemetry for Event Hub analytics.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflow = new azure_native.iotoperations.Dataflow("dataflow", {
    dataflowName: "aio-to-event-hub-transformed",
    dataflowProfileName: "resource-name123",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        mode: azure_native.iotoperations.OperationalMode.Enabled,
        operations: [
            {
                name: "source1",
                operationType: azure_native.iotoperations.OperationType.Source,
                sourceSettings: {
                    dataSources: ["azure-iot-operations/data/thermostat"],
                    endpointRef: "aio-builtin-broker-endpoint",
                },
            },
            {
                builtInTransformationSettings: {
                    filter: [{
                        expression: "$1 > 9000 && $2 >= 8000",
                        inputs: [
                            "temperature.Value",
                            "\"Tag 10\".Value",
                        ],
                    }],
                    map: [
                        {
                            inputs: ["*"],
                            output: "*",
                        },
                        {
                            expression: "($1+$2)/2",
                            inputs: [
                                "temperature.Value",
                                "\"Tag 10\".Value",
                            ],
                            output: "AvgTemp.Value",
                        },
                        {
                            expression: "true",
                            inputs: [],
                            output: "dataflow-processed",
                        },
                        {
                            expression: "",
                            inputs: ["temperature.SourceTimestamp"],
                            output: "",
                        },
                        {
                            expression: "",
                            inputs: ["\"Tag 10\""],
                            output: "pressure",
                        },
                        {
                            expression: "cToF($1)",
                            inputs: ["temperature.Value"],
                            output: "temperatureF.Value",
                        },
                        {
                            expression: "scale ($1,0,10,0,100)",
                            inputs: ["\"Tag 10\".Value"],
                            output: "\"Scale Tag 10\".Value",
                        },
                    ],
                },
                operationType: azure_native.iotoperations.OperationType.BuiltInTransformation,
            },
            {
                destinationSettings: {
                    dataDestination: "myuniqueeventhub",
                    endpointRef: "event-hub-endpoint",
                },
                name: "destination1",
                operationType: azure_native.iotoperations.OperationType.Destination,
            },
        ],
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow = azure_native.iotoperations.Dataflow("dataflow",
    dataflow_name="aio-to-event-hub-transformed",
    dataflow_profile_name="resource-name123",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "mode": azure_native.iotoperations.OperationalMode.ENABLED,
        "operations": [
            {
                "name": "source1",
                "operation_type": azure_native.iotoperations.OperationType.SOURCE,
                "source_settings": {
                    "data_sources": ["azure-iot-operations/data/thermostat"],
                    "endpoint_ref": "aio-builtin-broker-endpoint",
                },
            },
            {
                "built_in_transformation_settings": {
                    "filter": [{
                        "expression": "$1 > 9000 && $2 >= 8000",
                        "inputs": [
                            "temperature.Value",
                            "\"Tag 10\".Value",
                        ],
                    }],
                    "map": [
                        {
                            "inputs": ["*"],
                            "output": "*",
                        },
                        {
                            "expression": "($1+$2)/2",
                            "inputs": [
                                "temperature.Value",
                                "\"Tag 10\".Value",
                            ],
                            "output": "AvgTemp.Value",
                        },
                        {
                            "expression": "true",
                            "inputs": [],
                            "output": "dataflow-processed",
                        },
                        {
                            "expression": "",
                            "inputs": ["temperature.SourceTimestamp"],
                            "output": "",
                        },
                        {
                            "expression": "",
                            "inputs": ["\"Tag 10\""],
                            "output": "pressure",
                        },
                        {
                            "expression": "cToF($1)",
                            "inputs": ["temperature.Value"],
                            "output": "temperatureF.Value",
                        },
                        {
                            "expression": "scale ($1,0,10,0,100)",
                            "inputs": ["\"Tag 10\".Value"],
                            "output": "\"Scale Tag 10\".Value",
                        },
                    ],
                },
                "operation_type": azure_native.iotoperations.OperationType.BUILT_IN_TRANSFORMATION,
            },
            {
                "destination_settings": {
                    "data_destination": "myuniqueeventhub",
                    "endpoint_ref": "event-hub-endpoint",
                },
                "name": "destination1",
                "operation_type": azure_native.iotoperations.OperationType.DESTINATION,
            },
        ],
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflow(ctx, "dataflow", &iotoperations.DataflowArgs{
			DataflowName:        pulumi.String("aio-to-event-hub-transformed"),
			DataflowProfileName: pulumi.String("resource-name123"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowPropertiesArgs{
				Mode: pulumi.String(iotoperations.OperationalModeEnabled),
				Operations: iotoperations.DataflowOperationArray{
					&iotoperations.DataflowOperationArgs{
						Name:          pulumi.String("source1"),
						OperationType: pulumi.String(iotoperations.OperationTypeSource),
						SourceSettings: &iotoperations.DataflowSourceOperationSettingsArgs{
							DataSources: pulumi.StringArray{
								pulumi.String("azure-iot-operations/data/thermostat"),
							},
							EndpointRef: pulumi.String("aio-builtin-broker-endpoint"),
						},
					},
					&iotoperations.DataflowOperationArgs{
						BuiltInTransformationSettings: &iotoperations.DataflowBuiltInTransformationSettingsArgs{
							Filter: iotoperations.DataflowBuiltInTransformationFilterArray{
								&iotoperations.DataflowBuiltInTransformationFilterArgs{
									Expression: pulumi.String("$1 > 9000 && $2 >= 8000"),
									Inputs: pulumi.StringArray{
										pulumi.String("temperature.Value"),
										pulumi.String("\"Tag 10\".Value"),
									},
								},
							},
							Map: iotoperations.DataflowBuiltInTransformationMapArray{
								&iotoperations.DataflowBuiltInTransformationMapArgs{
									Inputs: pulumi.StringArray{
										pulumi.String("*"),
									},
									Output: pulumi.String("*"),
								},
								&iotoperations.DataflowBuiltInTransformationMapArgs{
									Expression: pulumi.String("($1+$2)/2"),
									Inputs: pulumi.StringArray{
										pulumi.String("temperature.Value"),
										pulumi.String("\"Tag 10\".Value"),
									},
									Output: pulumi.String("AvgTemp.Value"),
								},
								&iotoperations.DataflowBuiltInTransformationMapArgs{
									Expression: pulumi.String("true"),
									Inputs:     pulumi.StringArray{},
									Output:     pulumi.String("dataflow-processed"),
								},
								&iotoperations.DataflowBuiltInTransformationMapArgs{
									Expression: pulumi.String(""),
									Inputs: pulumi.StringArray{
										pulumi.String("temperature.SourceTimestamp"),
									},
									Output: pulumi.String(""),
								},
								&iotoperations.DataflowBuiltInTransformationMapArgs{
									Expression: pulumi.String(""),
									Inputs: pulumi.StringArray{
										pulumi.String("\"Tag 10\""),
									},
									Output: pulumi.String("pressure"),
								},
								&iotoperations.DataflowBuiltInTransformationMapArgs{
									Expression: pulumi.String("cToF($1)"),
									Inputs: pulumi.StringArray{
										pulumi.String("temperature.Value"),
									},
									Output: pulumi.String("temperatureF.Value"),
								},
								&iotoperations.DataflowBuiltInTransformationMapArgs{
									Expression: pulumi.String("scale ($1,0,10,0,100)"),
									Inputs: pulumi.StringArray{
										pulumi.String("\"Tag 10\".Value"),
									},
									Output: pulumi.String("\"Scale Tag 10\".Value"),
								},
							},
						},
						OperationType: pulumi.String(iotoperations.OperationTypeBuiltInTransformation),
					},
					&iotoperations.DataflowOperationArgs{
						DestinationSettings: &iotoperations.DataflowDestinationOperationSettingsArgs{
							DataDestination: pulumi.String("myuniqueeventhub"),
							EndpointRef:     pulumi.String("event-hub-endpoint"),
						},
						Name:          pulumi.String("destination1"),
						OperationType: pulumi.String(iotoperations.OperationTypeDestination),
					},
				},
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflow = new AzureNative.IoTOperations.Dataflow("dataflow", new()
    {
        DataflowName = "aio-to-event-hub-transformed",
        DataflowProfileName = "resource-name123",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowPropertiesArgs
        {
            Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
            Operations = new[]
            {
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    Name = "source1",
                    OperationType = AzureNative.IoTOperations.OperationType.Source,
                    SourceSettings = new AzureNative.IoTOperations.Inputs.DataflowSourceOperationSettingsArgs
                    {
                        DataSources = new[]
                        {
                            "azure-iot-operations/data/thermostat",
                        },
                        EndpointRef = "aio-builtin-broker-endpoint",
                    },
                },
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    BuiltInTransformationSettings = new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationSettingsArgs
                    {
                        Filter = new[]
                        {
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationFilterArgs
                            {
                                Expression = "$1 > 9000 && $2 >= 8000",
                                Inputs = new[]
                                {
                                    "temperature.Value",
                                    "\"Tag 10\".Value",
                                },
                            },
                        },
                        Map = new[]
                        {
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                            {
                                Inputs = new[]
                                {
                                    "*",
                                },
                                Output = "*",
                            },
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                            {
                                Expression = "($1+$2)/2",
                                Inputs = new[]
                                {
                                    "temperature.Value",
                                    "\"Tag 10\".Value",
                                },
                                Output = "AvgTemp.Value",
                            },
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                            {
                                Expression = "true",
                                Inputs = new() { },
                                Output = "dataflow-processed",
                            },
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                            {
                                Expression = "",
                                Inputs = new[]
                                {
                                    "temperature.SourceTimestamp",
                                },
                                Output = "",
                            },
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                            {
                                Expression = "",
                                Inputs = new[]
                                {
                                    "\"Tag 10\"",
                                },
                                Output = "pressure",
                            },
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                            {
                                Expression = "cToF($1)",
                                Inputs = new[]
                                {
                                    "temperature.Value",
                                },
                                Output = "temperatureF.Value",
                            },
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                            {
                                Expression = "scale ($1,0,10,0,100)",
                                Inputs = new[]
                                {
                                    "\"Tag 10\".Value",
                                },
                                Output = "\"Scale Tag 10\".Value",
                            },
                        },
                    },
                    OperationType = AzureNative.IoTOperations.OperationType.BuiltInTransformation,
                },
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    DestinationSettings = new AzureNative.IoTOperations.Inputs.DataflowDestinationOperationSettingsArgs
                    {
                        DataDestination = "myuniqueeventhub",
                        EndpointRef = "event-hub-endpoint",
                    },
                    Name = "destination1",
                    OperationType = AzureNative.IoTOperations.OperationType.Destination,
                },
            },
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.Dataflow;
import com.pulumi.azurenative.iotoperations.DataflowArgs;
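import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowPropertiesArgs;
// Input types used by the nested builders below.
import com.pulumi.azurenative.iotoperations.inputs.DataflowOperationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowSourceOperationSettingsArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowBuiltInTransformationSettingsArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowBuiltInTransformationFilterArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowBuiltInTransformationMapArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowDestinationOperationSettingsArgs;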
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflow = new Dataflow("dataflow", DataflowArgs.builder()
            .dataflowName("aio-to-event-hub-transformed")
            .dataflowProfileName("resource-name123")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowPropertiesArgs.builder()
                .mode("Enabled")
                .operations(                
                    DataflowOperationArgs.builder()
                        .name("source1")
                        .operationType("Source")
                        .sourceSettings(DataflowSourceOperationSettingsArgs.builder()
                            .dataSources("azure-iot-operations/data/thermostat")
                            .endpointRef("aio-builtin-broker-endpoint")
                            .build())
                        .build(),
                    DataflowOperationArgs.builder()
                        .builtInTransformationSettings(DataflowBuiltInTransformationSettingsArgs.builder()
                            .filter(DataflowBuiltInTransformationFilterArgs.builder()
                                .expression("$1 > 9000 && $2 >= 8000")
                                .inputs(                                
                                    "temperature.Value",
                                    "\"Tag 10\".Value")
                                .build())
                            .map(                            
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .inputs("*")
                                    .output("*")
                                    .build(),
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .expression("($1+$2)/2")
                                    .inputs(                                    
                                        "temperature.Value",
                                        "\"Tag 10\".Value")
                                    .output("AvgTemp.Value")
                                    .build(),
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .expression("true")
                                    .inputs()
                                    .output("dataflow-processed")
                                    .build(),
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .expression("")
                                    .inputs("temperature.SourceTimestamp")
                                    .output("")
                                    .build(),
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .expression("")
                                    .inputs("\"Tag 10\"")
                                    .output("pressure")
                                    .build(),
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .expression("cToF($1)")
                                    .inputs("temperature.Value")
                                    .output("temperatureF.Value")
                                    .build(),
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .expression("scale ($1,0,10,0,100)")
                                    .inputs("\"Tag 10\".Value")
                                    .output("\"Scale Tag 10\".Value")
                                    .build())
                            .build())
                        .operationType("BuiltInTransformation")
                        .build(),
                    DataflowOperationArgs.builder()
                        .destinationSettings(DataflowDestinationOperationSettingsArgs.builder()
                            .dataDestination("myuniqueeventhub")
                            .endpointRef("event-hub-endpoint")
                            .build())
                        .name("destination1")
                        .operationType("Destination")
                        .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflow:
    type: azure-native:iotoperations:Dataflow
    properties:
      dataflowName: aio-to-event-hub-transformed
      dataflowProfileName: resource-name123
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        mode: Enabled
        operations:
          - name: source1
            operationType: Source
            sourceSettings:
              dataSources:
                - azure-iot-operations/data/thermostat
              endpointRef: aio-builtin-broker-endpoint
          - builtInTransformationSettings:
              filter:
                - expression: $1 > 9000 && $2 >= 8000
                  inputs:
                    - temperature.Value
                    - '"Tag 10".Value'
              map:
                - inputs:
                    - '*'
                  output: '*'
                - expression: ($1+$2)/2
                  inputs:
                    - temperature.Value
                    - '"Tag 10".Value'
                  output: AvgTemp.Value
                - expression: 'true'
                  inputs: []
                  output: dataflow-processed
                - expression: ""
                  inputs:
                    - temperature.SourceTimestamp
                  output: ""
                - expression: ""
                  inputs:
                    - '"Tag 10"'
                  output: pressure
                - expression: cToF($1)
                  inputs:
                    - temperature.Value
                  output: temperatureF.Value
                - expression: scale ($1,0,10,0,100)
                  inputs:
                    - '"Tag 10".Value'
                  output: '"Scale Tag 10".Value'
            operationType: BuiltInTransformation
          - destinationSettings:
              dataDestination: myuniqueeventhub
              endpointRef: event-hub-endpoint
            name: destination1
            operationType: Destination
      resourceGroupName: rgiotoperations

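The transformation operation chains multiple map entries, each of which specifies inputs (field references) and an output (target field name). The first entry passes all fields through with “*”. Later entries derive new values: “($1+$2)/2” averages its two inputs, “cToF($1)” converts Celsius to Fahrenheit, and “scale ($1,0,10,0,100)” rescales a value from the 0 to 10 range onto 0 to 100. The entry with expression “true” and no inputs writes a constant flag to “dataflow-processed”. Entries with an empty expression copy a field unchanged: mapping “Tag 10” to “pressure” renames it, while the empty output for temperature.SourceTimestamp excludes that field from the result. The filter runs before the map entries and drops any message that fails its expression, here “$1 > 9000 && $2 >= 8000”.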
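
To make the arithmetic behind these expressions concrete, the following is a minimal plain-Python sketch of what the filter and the three derived map entries compute for one message. It is an illustration only, not the dataflow engine: the helper names (c_to_f, scale, passes_filter, derived_fields) and the sample message are hypothetical, and scale is assumed to be a linear rescale from the source range to the destination range.

```python
def c_to_f(celsius):
    """Convert Celsius to Fahrenheit (what cToF($1) is expected to compute)."""
    return celsius * 9 / 5 + 32


def scale(value, src_lo, src_hi, dst_lo, dst_hi):
    """Linearly rescale value from [src_lo, src_hi] to [dst_lo, dst_hi].

    Assumption: this mirrors scale($1,0,10,0,100) as a plain linear mapping.
    """
    return (value - src_lo) * (dst_hi - dst_lo) / (src_hi - src_lo) + dst_lo


def passes_filter(message):
    """Mirror of the filter expression "$1 > 9000 && $2 >= 8000"."""
    temperature = message["temperature"]["Value"]  # $1
    tag10 = message["Tag 10"]["Value"]             # $2
    return temperature > 9000 and tag10 >= 8000


def derived_fields(message):
    """Compute the outputs of the three expression-based map entries."""
    temperature = message["temperature"]["Value"]
    tag10 = message["Tag 10"]["Value"]
    return {
        "AvgTemp": {"Value": (temperature + tag10) / 2},
        "temperatureF": {"Value": c_to_f(temperature)},
        "Scale Tag 10": {"Value": scale(tag10, 0, 10, 0, 100)},
    }


# Hypothetical message shaped like the field paths used in the example.
sample = {"temperature": {"Value": 9500}, "Tag 10": {"Value": 8000}}

if passes_filter(sample):
    print(derived_fields(sample))
```

Working through a sample message this way is a quick check that the thresholds and ranges in the expressions match the units your devices actually publish.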

Enrich telemetry with contextual data lookups

Industrial scenarios often enrich telemetry by matching message fields against reference datasets.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflow = new azure_native.iotoperations.Dataflow("dataflow", {
    dataflowName: "aio-to-adx-contexualized",
    dataflowProfileName: "resource-name123",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        mode: azure_native.iotoperations.OperationalMode.Enabled,
        operations: [
            {
                name: "source1",
                operationType: azure_native.iotoperations.OperationType.Source,
                sourceSettings: {
                    dataSources: ["azure-iot-operations/data/thermostat"],
                    endpointRef: "aio-builtin-broker-endpoint",
                },
            },
            {
                builtInTransformationSettings: {
                    datasets: [{
                        expression: "$1 == $2",
                        inputs: [
                            "$source.country",
                            "$context.country",
                        ],
                        key: "quality",
                    }],
                    map: [
                        {
                            inputs: ["*"],
                            output: "*",
                        },
                        {
                            inputs: ["$context(quality).*"],
                            output: "enriched.*",
                        },
                    ],
                },
                name: "transformation1",
                operationType: azure_native.iotoperations.OperationType.BuiltInTransformation,
            },
            {
                destinationSettings: {
                    dataDestination: "mytable",
                    endpointRef: "adx-endpoint",
                },
                name: "destination1",
                operationType: azure_native.iotoperations.OperationType.Destination,
            },
        ],
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow = azure_native.iotoperations.Dataflow("dataflow",
    dataflow_name="aio-to-adx-contexualized",
    dataflow_profile_name="resource-name123",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "mode": azure_native.iotoperations.OperationalMode.ENABLED,
        "operations": [
            {
                "name": "source1",
                "operation_type": azure_native.iotoperations.OperationType.SOURCE,
                "source_settings": {
                    "data_sources": ["azure-iot-operations/data/thermostat"],
                    "endpoint_ref": "aio-builtin-broker-endpoint",
                },
            },
            {
                "built_in_transformation_settings": {
                    "datasets": [{
                        "expression": "$1 == $2",
                        "inputs": [
                            "$source.country",
                            "$context.country",
                        ],
                        "key": "quality",
                    }],
                    "map": [
                        {
                            "inputs": ["*"],
                            "output": "*",
                        },
                        {
                            "inputs": ["$context(quality).*"],
                            "output": "enriched.*",
                        },
                    ],
                },
                "name": "transformation1",
                "operation_type": azure_native.iotoperations.OperationType.BUILT_IN_TRANSFORMATION,
            },
            {
                "destination_settings": {
                    "data_destination": "mytable",
                    "endpoint_ref": "adx-endpoint",
                },
                "name": "destination1",
                "operation_type": azure_native.iotoperations.OperationType.DESTINATION,
            },
        ],
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflow(ctx, "dataflow", &iotoperations.DataflowArgs{
			DataflowName:        pulumi.String("aio-to-adx-contexualized"),
			DataflowProfileName: pulumi.String("resource-name123"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowPropertiesArgs{
				Mode: pulumi.String(iotoperations.OperationalModeEnabled),
				Operations: iotoperations.DataflowOperationArray{
					&iotoperations.DataflowOperationArgs{
						Name:          pulumi.String("source1"),
						OperationType: pulumi.String(iotoperations.OperationTypeSource),
						SourceSettings: &iotoperations.DataflowSourceOperationSettingsArgs{
							DataSources: pulumi.StringArray{
								pulumi.String("azure-iot-operations/data/thermostat"),
							},
							EndpointRef: pulumi.String("aio-builtin-broker-endpoint"),
						},
					},
					&iotoperations.DataflowOperationArgs{
						BuiltInTransformationSettings: &iotoperations.DataflowBuiltInTransformationSettingsArgs{
							Datasets: iotoperations.DataflowBuiltInTransformationDatasetArray{
								&iotoperations.DataflowBuiltInTransformationDatasetArgs{
									Expression: pulumi.String("$1 == $2"),
									Inputs: pulumi.StringArray{
										pulumi.String("$source.country"),
										pulumi.String("$context.country"),
									},
									Key: pulumi.String("quality"),
								},
							},
							Map: iotoperations.DataflowBuiltInTransformationMapArray{
								&iotoperations.DataflowBuiltInTransformationMapArgs{
									Inputs: pulumi.StringArray{
										pulumi.String("*"),
									},
									Output: pulumi.String("*"),
								},
								&iotoperations.DataflowBuiltInTransformationMapArgs{
									Inputs: pulumi.StringArray{
										pulumi.String("$context(quality).*"),
									},
									Output: pulumi.String("enriched.*"),
								},
							},
						},
						Name:          pulumi.String("transformation1"),
						OperationType: pulumi.String(iotoperations.OperationTypeBuiltInTransformation),
					},
					&iotoperations.DataflowOperationArgs{
						DestinationSettings: &iotoperations.DataflowDestinationOperationSettingsArgs{
							DataDestination: pulumi.String("mytable"),
							EndpointRef:     pulumi.String("adx-endpoint"),
						},
						Name:          pulumi.String("destination1"),
						OperationType: pulumi.String(iotoperations.OperationTypeDestination),
					},
				},
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflow = new AzureNative.IoTOperations.Dataflow("dataflow", new()
    {
        DataflowName = "aio-to-adx-contexualized",
        DataflowProfileName = "resource-name123",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowPropertiesArgs
        {
            Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
            Operations = new[]
            {
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    Name = "source1",
                    OperationType = AzureNative.IoTOperations.OperationType.Source,
                    SourceSettings = new AzureNative.IoTOperations.Inputs.DataflowSourceOperationSettingsArgs
                    {
                        DataSources = new[]
                        {
                            "azure-iot-operations/data/thermostat",
                        },
                        EndpointRef = "aio-builtin-broker-endpoint",
                    },
                },
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    BuiltInTransformationSettings = new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationSettingsArgs
                    {
                        Datasets = new[]
                        {
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationDatasetArgs
                            {
                                Expression = "$1 == $2",
                                Inputs = new[]
                                {
                                    "$source.country",
                                    "$context.country",
                                },
                                Key = "quality",
                            },
                        },
                        Map = new[]
                        {
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                            {
                                Inputs = new[]
                                {
                                    "*",
                                },
                                Output = "*",
                            },
                            new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                            {
                                Inputs = new[]
                                {
                                    "$context(quality).*",
                                },
                                Output = "enriched.*",
                            },
                        },
                    },
                    Name = "transformation1",
                    OperationType = AzureNative.IoTOperations.OperationType.BuiltInTransformation,
                },
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    DestinationSettings = new AzureNative.IoTOperations.Inputs.DataflowDestinationOperationSettingsArgs
                    {
                        DataDestination = "mytable",
                        EndpointRef = "adx-endpoint",
                    },
                    Name = "destination1",
                    OperationType = AzureNative.IoTOperations.OperationType.Destination,
                },
            },
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.Dataflow;
import com.pulumi.azurenative.iotoperations.DataflowArgs;
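import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowPropertiesArgs;
// Input types used by the nested builders below.
import com.pulumi.azurenative.iotoperations.inputs.DataflowOperationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowSourceOperationSettingsArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowBuiltInTransformationSettingsArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowBuiltInTransformationDatasetArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowBuiltInTransformationMapArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowDestinationOperationSettingsArgs;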
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflow = new Dataflow("dataflow", DataflowArgs.builder()
            .dataflowName("aio-to-adx-contexualized")
            .dataflowProfileName("resource-name123")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowPropertiesArgs.builder()
                .mode("Enabled")
                .operations(                
                    DataflowOperationArgs.builder()
                        .name("source1")
                        .operationType("Source")
                        .sourceSettings(DataflowSourceOperationSettingsArgs.builder()
                            .dataSources("azure-iot-operations/data/thermostat")
                            .endpointRef("aio-builtin-broker-endpoint")
                            .build())
                        .build(),
                    DataflowOperationArgs.builder()
                        .builtInTransformationSettings(DataflowBuiltInTransformationSettingsArgs.builder()
                            .datasets(DataflowBuiltInTransformationDatasetArgs.builder()
                                .expression("$1 == $2")
                                .inputs(                                
                                    "$source.country",
                                    "$context.country")
                                .key("quality")
                                .build())
                            .map(                            
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .inputs("*")
                                    .output("*")
                                    .build(),
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .inputs("$context(quality).*")
                                    .output("enriched.*")
                                    .build())
                            .build())
                        .name("transformation1")
                        .operationType("BuiltInTransformation")
                        .build(),
                    DataflowOperationArgs.builder()
                        .destinationSettings(DataflowDestinationOperationSettingsArgs.builder()
                            .dataDestination("mytable")
                            .endpointRef("adx-endpoint")
                            .build())
                        .name("destination1")
                        .operationType("Destination")
                        .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflow:
    type: azure-native:iotoperations:Dataflow
    properties:
      dataflowName: aio-to-adx-contexualized
      dataflowProfileName: resource-name123
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        mode: Enabled
        operations:
          - name: source1
            operationType: Source
            sourceSettings:
              dataSources:
                - azure-iot-operations/data/thermostat
              endpointRef: aio-builtin-broker-endpoint
          - builtInTransformationSettings:
              datasets:
                - expression: $1 == $2
                  inputs:
                    - $source.country
                    - $context.country
                  key: quality
              map:
                - inputs:
                    - '*'
                  output: '*'
                - inputs:
                    - $context(quality).*
                  output: enriched.*
            name: transformation1
            operationType: BuiltInTransformation
          - destinationSettings:
              dataDestination: mytable
              endpointRef: adx-endpoint
            name: destination1
            operationType: Destination
      resourceGroupName: rgiotoperations

The datasets array defines lookups: the expression “$1 == $2” compares $source.country from incoming telemetry against $context.country from a contextual dataset, and matching records become available under the “quality” key. The map operations then apply two rules: the first copies every source field through unchanged (“*” to “*”), and the second references the dataset with “$context(quality).*” to copy its fields into the output under “enriched.*”. This pattern joins streaming telemetry with static reference data before writing to Azure Data Explorer.
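
To make the join concrete, here is a minimal TypeScript sketch of the semantics described above, written as plain functions rather than Pulumi code. It is only a mental model: the lookup and enrich functions, the record shapes, and the sample rows are hypothetical, and the real dataflow runtime may differ in details such as how multiple or missing matches are handled.

```typescript
// Mental model only: a plain-TypeScript imitation of the dataset lookup and
// the two map rules. Function names and sample data are hypothetical.
type Row = Record<string, unknown>;

// Dataset rule: pick the first context row where
// $source.country == $context.country (the "$1 == $2" expression).
function lookup(source: Row, context: Row[]): Row | undefined {
    return context.find((row) => row["country"] === source["country"]);
}

// Map rules: "*" -> "*" copies every source field unchanged, and
// "$context(quality).*" -> "enriched.*" copies every field of the matched
// row under an `enriched` object.
function enrich(source: Row, context: Row[]): Row {
    const output: Row = { ...source };
    const match = lookup(source, context);
    if (match !== undefined) {
        output["enriched"] = { ...match };
    }
    return output;
}

// Stand-in for the reference data stored under the "quality" key.
const qualityDataset: Row[] = [
    { country: "DE", grade: "A" },
    { country: "US", grade: "B" },
];

const enrichedRecord = enrich({ country: "US", temperature: 21.5 }, qualityDataset);
console.log(JSON.stringify(enrichedRecord));
```

In this sketch a record with no matching country simply passes through without an enriched object; whether the service behaves the same way is an assumption worth verifying against your own data.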

Convert telemetry to Parquet for Microsoft Fabric

Analytics workloads in Microsoft Fabric benefit from Parquet format for efficient columnar storage.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflow = new azure_native.iotoperations.Dataflow("dataflow", {
    dataflowName: "aio-to-fabric",
    dataflowProfileName: "resource-name123",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        mode: azure_native.iotoperations.OperationalMode.Enabled,
        operations: [
            {
                name: "source1",
                operationType: azure_native.iotoperations.OperationType.Source,
                sourceSettings: {
                    dataSources: ["azure-iot-operations/data/thermostat"],
                    endpointRef: "aio-builtin-broker-endpoint",
                },
            },
            {
                builtInTransformationSettings: {
                    schemaRef: "aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0",
                    serializationFormat: azure_native.iotoperations.TransformationSerializationFormat.Parquet,
                },
                operationType: azure_native.iotoperations.OperationType.BuiltInTransformation,
            },
            {
                destinationSettings: {
                    dataDestination: "telemetryTable",
                    endpointRef: "fabric-endpoint",
                },
                name: "destination1",
                operationType: azure_native.iotoperations.OperationType.Destination,
            },
        ],
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow = azure_native.iotoperations.Dataflow("dataflow",
    dataflow_name="aio-to-fabric",
    dataflow_profile_name="resource-name123",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "mode": azure_native.iotoperations.OperationalMode.ENABLED,
        "operations": [
            {
                "name": "source1",
                "operation_type": azure_native.iotoperations.OperationType.SOURCE,
                "source_settings": {
                    "data_sources": ["azure-iot-operations/data/thermostat"],
                    "endpoint_ref": "aio-builtin-broker-endpoint",
                },
            },
            {
                "built_in_transformation_settings": {
                    "schema_ref": "aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0",
                    "serialization_format": azure_native.iotoperations.TransformationSerializationFormat.PARQUET,
                },
                "operation_type": azure_native.iotoperations.OperationType.BUILT_IN_TRANSFORMATION,
            },
            {
                "destination_settings": {
                    "data_destination": "telemetryTable",
                    "endpoint_ref": "fabric-endpoint",
                },
                "name": "destination1",
                "operation_type": azure_native.iotoperations.OperationType.DESTINATION,
            },
        ],
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflow(ctx, "dataflow", &iotoperations.DataflowArgs{
			DataflowName:        pulumi.String("aio-to-fabric"),
			DataflowProfileName: pulumi.String("resource-name123"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowPropertiesArgs{
				Mode: pulumi.String(iotoperations.OperationalModeEnabled),
				Operations: iotoperations.DataflowOperationArray{
					&iotoperations.DataflowOperationArgs{
						Name:          pulumi.String("source1"),
						OperationType: pulumi.String(iotoperations.OperationTypeSource),
						SourceSettings: &iotoperations.DataflowSourceOperationSettingsArgs{
							DataSources: pulumi.StringArray{
								pulumi.String("azure-iot-operations/data/thermostat"),
							},
							EndpointRef: pulumi.String("aio-builtin-broker-endpoint"),
						},
					},
					&iotoperations.DataflowOperationArgs{
						BuiltInTransformationSettings: &iotoperations.DataflowBuiltInTransformationSettingsArgs{
							SchemaRef:           pulumi.String("aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0"),
							SerializationFormat: pulumi.String(iotoperations.TransformationSerializationFormatParquet),
						},
						OperationType: pulumi.String(iotoperations.OperationTypeBuiltInTransformation),
					},
					&iotoperations.DataflowOperationArgs{
						DestinationSettings: &iotoperations.DataflowDestinationOperationSettingsArgs{
							DataDestination: pulumi.String("telemetryTable"),
							EndpointRef:     pulumi.String("fabric-endpoint"),
						},
						Name:          pulumi.String("destination1"),
						OperationType: pulumi.String(iotoperations.OperationTypeDestination),
					},
				},
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflow = new AzureNative.IoTOperations.Dataflow("dataflow", new()
    {
        DataflowName = "aio-to-fabric",
        DataflowProfileName = "resource-name123",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowPropertiesArgs
        {
            Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
            Operations = new[]
            {
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    Name = "source1",
                    OperationType = AzureNative.IoTOperations.OperationType.Source,
                    SourceSettings = new AzureNative.IoTOperations.Inputs.DataflowSourceOperationSettingsArgs
                    {
                        DataSources = new[]
                        {
                            "azure-iot-operations/data/thermostat",
                        },
                        EndpointRef = "aio-builtin-broker-endpoint",
                    },
                },
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    BuiltInTransformationSettings = new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationSettingsArgs
                    {
                        SchemaRef = "aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0",
                        SerializationFormat = AzureNative.IoTOperations.TransformationSerializationFormat.Parquet,
                    },
                    OperationType = AzureNative.IoTOperations.OperationType.BuiltInTransformation,
                },
                new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
                {
                    DestinationSettings = new AzureNative.IoTOperations.Inputs.DataflowDestinationOperationSettingsArgs
                    {
                        DataDestination = "telemetryTable",
                        EndpointRef = "fabric-endpoint",
                    },
                    Name = "destination1",
                    OperationType = AzureNative.IoTOperations.OperationType.Destination,
                },
            },
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.azurenative.iotoperations.Dataflow;
import com.pulumi.azurenative.iotoperations.DataflowArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowPropertiesArgs;
// Input types used by the operations builder chain below.
import com.pulumi.azurenative.iotoperations.inputs.DataflowOperationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowSourceOperationSettingsArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowBuiltInTransformationSettingsArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowDestinationOperationSettingsArgs;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflow = new Dataflow("dataflow", DataflowArgs.builder()
            .dataflowName("aio-to-fabric")
            .dataflowProfileName("resource-name123")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowPropertiesArgs.builder()
                .mode("Enabled")
                .operations(                
                    DataflowOperationArgs.builder()
                        .name("source1")
                        .operationType("Source")
                        .sourceSettings(DataflowSourceOperationSettingsArgs.builder()
                            .dataSources("azure-iot-operations/data/thermostat")
                            .endpointRef("aio-builtin-broker-endpoint")
                            .build())
                        .build(),
                    DataflowOperationArgs.builder()
                        .builtInTransformationSettings(DataflowBuiltInTransformationSettingsArgs.builder()
                            .schemaRef("aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0")
                            .serializationFormat("Parquet")
                            .build())
                        .operationType("BuiltInTransformation")
                        .build(),
                    DataflowOperationArgs.builder()
                        .destinationSettings(DataflowDestinationOperationSettingsArgs.builder()
                            .dataDestination("telemetryTable")
                            .endpointRef("fabric-endpoint")
                            .build())
                        .name("destination1")
                        .operationType("Destination")
                        .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflow:
    type: azure-native:iotoperations:Dataflow
    properties:
      dataflowName: aio-to-fabric
      dataflowProfileName: resource-name123
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        mode: Enabled
        operations:
          - name: source1
            operationType: Source
            sourceSettings:
              dataSources:
                - azure-iot-operations/data/thermostat
              endpointRef: aio-builtin-broker-endpoint
          - builtInTransformationSettings:
              schemaRef: aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0
              serializationFormat: Parquet
            operationType: BuiltInTransformation
          - destinationSettings:
              dataDestination: telemetryTable
              endpointRef: fabric-endpoint
            name: destination1
            operationType: Destination
      resourceGroupName: rgiotoperations

The builtInTransformationSettings specifies serializationFormat as Parquet and references a schema via schemaRef. The schema URI “aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0” points to a registered Parquet schema that defines column types and structure. The transformation converts JSON telemetry to Parquet batches before writing to the Fabric table specified in dataDestination.
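
If several dataflows share schemas, it can help to assemble the schemaRef string in one place. The sketch below is a hypothetical helper, not part of the provider: the schemaRef and parquetTransformation functions are invented for illustration, and the URI layout (namespace, schema name, version) is inferred only from the single example above.

```typescript
// Hypothetical helper; the URI layout is inferred from the example
// "aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0" and nothing else.
interface SchemaCoordinates {
    namespace: string;
    name: string;
    version: string;
}

function schemaRef({ namespace, name, version }: SchemaCoordinates): string {
    return `aio-sr://${namespace}/${name}:${version}`;
}

// Plain-object form of the transformation operation. The string values mirror
// the enum member names used in the examples above.
function parquetTransformation(schema: SchemaCoordinates) {
    return {
        operationType: "BuiltInTransformation",
        builtInTransformationSettings: {
            schemaRef: schemaRef(schema),
            serializationFormat: "Parquet",
        },
    };
}

const parquetOperation = parquetTransformation({
    namespace: "exampleNamespace",
    name: "exmapleParquetSchema",
    version: "1.0.0",
});
console.log(parquetOperation.builtInTransformationSettings.schemaRef);
```

The returned object has the same shape as the middle entry of the operations array in the TypeScript example, so it should be usable there in place of the inline literal.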

Beyond these examples

These snippets focus on specific dataflow-level features: MQTT source and destination routing, filtering and field mapping transformations, contextual data enrichment, and format conversion. They’re intentionally minimal rather than full data integration pipelines.

The examples reference pre-existing infrastructure such as DataflowProfile parent resources, DataflowEndpoint resources for MQTT brokers and cloud services, Azure IoT Operations instances with custom locations, and schema registries with Parquet schemas. They focus on configuring the dataflow rather than provisioning the surrounding infrastructure.

To keep things focused, common dataflow patterns are omitted, including:

  • Error handling and retry configuration
  • Batching and throughput tuning
  • Authentication and authorization setup for endpoints
  • Schema validation and evolution
  • Monitoring and observability configuration
  • Multi-stage transformation pipelines with intermediate topics

These omissions are intentional: the goal is to illustrate how each dataflow feature is wired, not provide drop-in data integration modules. See the Dataflow resource reference for all available configuration options.


Frequently Asked Questions

Configuration & Lifecycle
What properties can't I change after creating a dataflow?
The dataflowName, dataflowProfileName, instanceName, resourceGroupName, and extendedLocation properties are immutable. Changes to these require resource replacement.
What's required to create a dataflow?
You must specify extendedLocation (with name and type CustomLocation), properties with at least one Source operation and one Destination operation, and the resource identifiers (dataflowName, dataflowProfileName, instanceName, resourceGroupName).
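
The source-and-destination requirement is easy to check before deployment. The following is a minimal sketch, assuming you build the operations array as plain objects first; checkOperations and OperationLike are hypothetical names, and the check covers only the rule stated above, not everything the service validates.

```typescript
// Hypothetical pre-deployment check for the minimum pipeline shape.
type OperationKind = "Source" | "BuiltInTransformation" | "Destination";

interface OperationLike {
    name?: string;
    operationType: OperationKind;
}

// Returns a list of problems; an empty list means the minimum shape is met.
function checkOperations(operations: OperationLike[]): string[] {
    const problems: string[] = [];
    if (!operations.some((op) => op.operationType === "Source")) {
        problems.push("missing a Source operation");
    }
    if (!operations.some((op) => op.operationType === "Destination")) {
        problems.push("missing a Destination operation");
    }
    return problems;
}

const pipeline: OperationLike[] = [
    { name: "source1", operationType: "Source" },
    { operationType: "BuiltInTransformation" },
    { name: "destination1", operationType: "Destination" },
];
console.log(checkOperations(pipeline));
```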
Operations & Pipeline
What operation types can I configure in a dataflow?

Three operation types are available:

  1. Source - Ingests data from endpoints
  2. BuiltInTransformation - Processes data with filters, maps, and datasets
  3. Destination - Sends data to endpoints
How do I reference an endpoint in my dataflow?
Use endpointRef in sourceSettings or destinationSettings to reference a pre-configured endpoint by name (e.g., endpointRef: "aio-builtin-broker-endpoint").
What destinations are supported?
Supported destinations include Event Hub, Azure Data Explorer (ADX), Event Grid, Microsoft Fabric, and MQTT topics via the built-in broker endpoint.
Data Transformation
How do I filter data based on field values?
Use filter in builtInTransformationSettings with an expression like $1 > 9000 && $2 >= 8000 and specify inputs as field paths (e.g., ["temperature.Value", "\"Tag 10\".Value"]).
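
Because $1, $2, and so on refer to entries of inputs by position, a placeholder with no matching input is an easy mistake to make. The sketch below is a hypothetical local check, not a provider feature; FilterRule, highestPlaceholder, and placeholdersCovered are illustrative names, and the positional interpretation is assumed from the examples in this guide.

```typescript
// Hypothetical sanity check: every $N in the expression should have an input.
interface FilterRule {
    expression: string;
    inputs: string[];
}

// Highest $N placeholder referenced in the expression (0 if there are none).
function highestPlaceholder(expression: string): number {
    const matches: string[] = expression.match(/\$(\d+)/g) ?? [];
    return matches.reduce((max, m) => Math.max(max, Number(m.slice(1))), 0);
}

function placeholdersCovered(rule: FilterRule): boolean {
    return highestPlaceholder(rule.expression) <= rule.inputs.length;
}

const temperatureFilter: FilterRule = {
    expression: "$1 > 9000 && $2 >= 8000",
    inputs: ["temperature.Value", "\"Tag 10\".Value"],
};
console.log(placeholdersCovered(temperatureFilter));
```

Note the escaped quotes in the second input: field names containing spaces are quoted inside the path string.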
How do I transform field values?

Use map in builtInTransformationSettings with expressions:

  • Calculations: ($1+$2)/2
  • Conversions: cToF($1)
  • Scaling: scale ($1,0,10,0,100)

Specify inputs as field paths and output as the target field name.
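
As a rough guide to what these expressions compute, the sketch below restates them as ordinary TypeScript functions. These are assumptions for illustration, not the service's implementation: cToF is taken to be the standard Celsius-to-Fahrenheit conversion, and scale is assumed to take its arguments as (value, source low, source high, target low, target high) and rescale linearly.

```typescript
// Assumed semantics, written as ordinary functions for illustration only.

// ($1+$2)/2: the mean of two inputs.
function average(a: number, b: number): number {
    return (a + b) / 2;
}

// cToF($1): assumed to be the standard Celsius-to-Fahrenheit conversion.
function cToF(celsius: number): number {
    return (celsius * 9) / 5 + 32;
}

// scale($1,0,10,0,100): assumed to map a value linearly from the source
// range [srcLow, srcHigh] onto the target range [dstLow, dstHigh].
function scale(
    value: number,
    srcLow: number,
    srcHigh: number,
    dstLow: number,
    dstHigh: number,
): number {
    return dstLow + ((value - srcLow) * (dstHigh - dstLow)) / (srcHigh - srcLow);
}

console.log(average(20, 30), cToF(100), scale(5, 0, 10, 0, 100));
```

Checking a few known values this way before deploying can catch a swapped argument or a unit mix-up in the expression string.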
Can I use wildcard mapping to pass through all fields?
Yes, configure a map with inputs: ["*"] and output: "*" to pass through all fields unchanged.
What serialization formats can I use for output?
Three formats are supported via serializationFormat in builtInTransformationSettings: Delta, Parquet, and Json.
How do I set up contextualization with datasets?
Define datasets in builtInTransformationSettings with an expression (e.g., $1 == $2), inputs from source and context fields, and a key for the enriched data. Then reference the dataset in map operations using $context(key).*.
