The azure-native:iotoperations:Dataflow resource, part of the Pulumi Azure Native provider, defines data routing pipelines within Azure IoT Operations that move telemetry between MQTT brokers and cloud destinations, with optional transformation. This guide focuses on three capabilities: MQTT-to-cloud routing patterns, filtering and transformation expressions, and contextual enrichment and format conversion.
Dataflows belong to a DataflowProfile and reference DataflowEndpoint resources for sources and destinations. The examples are intentionally small. Combine them with your own endpoint configurations, schema registries, and transformation logic.
Route MQTT telemetry to Event Grid topics
IoT deployments often route device telemetry from MQTT brokers to Azure Event Grid for event-driven processing.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";
// Source stage: subscribes to thermostat temperature topics on the built-in
// MQTT broker. The topic filter uses the MQTT `+` and `#` wildcards.
const sourceOperation = {
    name: "source1",
    operationType: azure_native.iotoperations.OperationType.Source,
    sourceSettings: {
        endpointRef: "aio-builtin-broker-endpoint",
        dataSources: ["thermostats/+/telemetry/temperature/#"],
    },
};

// Destination stage: forwards every message to the Event Grid endpoint.
// `endpointRef` names a DataflowEndpoint resource that is defined separately.
const destinationOperation = {
    name: "destination1",
    operationType: azure_native.iotoperations.OperationType.Destination,
    destinationSettings: {
        endpointRef: "event-grid-endpoint",
        dataDestination: "factory/telemetry",
    },
};

// The dataflow itself: an enabled two-stage pipeline (source -> destination).
// Resource group, instance, profile and location values are sample
// placeholders; substitute the ones from your own deployment.
const dataflow = new azure_native.iotoperations.Dataflow("dataflow", {
    resourceGroupName: "rgiotoperations",
    instanceName: "resource-name123",
    dataflowProfileName: "resource-name123",
    dataflowName: "aio-to-event-grid",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    properties: {
        mode: azure_native.iotoperations.OperationalMode.Enabled,
        // Order matters: operations run as listed.
        operations: [sourceOperation, destinationOperation],
    },
});
import pulumi
import pulumi_azure_native as azure_native
# Source stage: subscribes to thermostat temperature topics on the built-in
# MQTT broker. The topic filter uses the MQTT `+` and `#` wildcards.
source_operation = {
    "name": "source1",
    "operation_type": azure_native.iotoperations.OperationType.SOURCE,
    "source_settings": {
        "endpoint_ref": "aio-builtin-broker-endpoint",
        "data_sources": ["thermostats/+/telemetry/temperature/#"],
    },
}

# Destination stage: forwards every message to the Event Grid endpoint.
# `endpoint_ref` names a DataflowEndpoint resource that is defined separately.
destination_operation = {
    "name": "destination1",
    "operation_type": azure_native.iotoperations.OperationType.DESTINATION,
    "destination_settings": {
        "endpoint_ref": "event-grid-endpoint",
        "data_destination": "factory/telemetry",
    },
}

# The dataflow itself: an enabled two-stage pipeline (source -> destination).
# Resource group, instance, profile and location values are sample
# placeholders; substitute the ones from your own deployment.
dataflow = azure_native.iotoperations.Dataflow(
    "dataflow",
    resource_group_name="rgiotoperations",
    instance_name="resource-name123",
    dataflow_profile_name="resource-name123",
    dataflow_name="aio-to-event-grid",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    properties={
        "mode": azure_native.iotoperations.OperationalMode.ENABLED,
        # Order matters: operations run as listed.
        "operations": [source_operation, destination_operation],
    },
)
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main registers a single Dataflow that routes MQTT thermostat telemetry
// from the built-in broker to an Event Grid endpoint.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Source stage: subscribes to thermostat temperature topics on the
		// built-in MQTT broker. The filter uses the MQTT + and # wildcards.
		sourceOp := &iotoperations.DataflowOperationArgs{
			Name:          pulumi.String("source1"),
			OperationType: pulumi.String(iotoperations.OperationTypeSource),
			SourceSettings: &iotoperations.DataflowSourceOperationSettingsArgs{
				EndpointRef: pulumi.String("aio-builtin-broker-endpoint"),
				DataSources: pulumi.StringArray{
					pulumi.String("thermostats/+/telemetry/temperature/#"),
				},
			},
		}

		// Destination stage: forwards every message to the Event Grid
		// endpoint. EndpointRef names a separately defined DataflowEndpoint.
		destinationOp := &iotoperations.DataflowOperationArgs{
			Name:          pulumi.String("destination1"),
			OperationType: pulumi.String(iotoperations.OperationTypeDestination),
			DestinationSettings: &iotoperations.DataflowDestinationOperationSettingsArgs{
				EndpointRef:     pulumi.String("event-grid-endpoint"),
				DataDestination: pulumi.String("factory/telemetry"),
			},
		}

		// Resource group, instance, profile and location values are sample
		// placeholders; substitute the ones from your own deployment.
		_, err := iotoperations.NewDataflow(ctx, "dataflow", &iotoperations.DataflowArgs{
			ResourceGroupName:   pulumi.String("rgiotoperations"),
			InstanceName:        pulumi.String("resource-name123"),
			DataflowProfileName: pulumi.String("resource-name123"),
			DataflowName:        pulumi.String("aio-to-event-grid"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			Properties: &iotoperations.DataflowPropertiesArgs{
				Mode: pulumi.String(iotoperations.OperationalModeEnabled),
				// Order matters: operations run as listed.
				Operations: iotoperations.DataflowOperationArray{
					sourceOp,
					destinationOp,
				},
			},
		})
		// err is nil on success, so it can be returned directly.
		return err
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;
return await Deployment.RunAsync(() =>
{
    // Source stage: subscribes to thermostat temperature topics on the
    // built-in MQTT broker. The filter uses the MQTT + and # wildcards.
    var sourceOperation = new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
    {
        Name = "source1",
        OperationType = AzureNative.IoTOperations.OperationType.Source,
        SourceSettings = new AzureNative.IoTOperations.Inputs.DataflowSourceOperationSettingsArgs
        {
            EndpointRef = "aio-builtin-broker-endpoint",
            DataSources = new[]
            {
                "thermostats/+/telemetry/temperature/#",
            },
        },
    };

    // Destination stage: forwards every message to the Event Grid endpoint.
    // EndpointRef names a DataflowEndpoint resource that is defined separately.
    var destinationOperation = new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
    {
        Name = "destination1",
        OperationType = AzureNative.IoTOperations.OperationType.Destination,
        DestinationSettings = new AzureNative.IoTOperations.Inputs.DataflowDestinationOperationSettingsArgs
        {
            EndpointRef = "event-grid-endpoint",
            DataDestination = "factory/telemetry",
        },
    };

    // Resource group, instance, profile and location values are sample
    // placeholders; substitute the ones from your own deployment.
    var dataflow = new AzureNative.IoTOperations.Dataflow("dataflow", new AzureNative.IoTOperations.DataflowArgs
    {
        ResourceGroupName = "rgiotoperations",
        InstanceName = "resource-name123",
        DataflowProfileName = "resource-name123",
        DataflowName = "aio-to-event-grid",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        Properties = new AzureNative.IoTOperations.Inputs.DataflowPropertiesArgs
        {
            Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
            // Order matters: operations run as listed.
            Operations = new[]
            {
                sourceOperation,
                destinationOperation,
            },
        },
    });
});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.Dataflow;
import com.pulumi.azurenative.iotoperations.DataflowArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowPropertiesArgs;
// The three input types below are used in stack() but were not imported,
// so the example did not compile as written.
import com.pulumi.azurenative.iotoperations.inputs.DataflowOperationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowSourceOperationSettingsArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowDestinationOperationSettingsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * Pulumi program that declares one Dataflow routing MQTT thermostat
 * telemetry from the built-in broker to an Event Grid endpoint.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares the "aio-to-event-grid" dataflow.
     *
     * Resource group, instance, profile and location values are sample
     * placeholders; substitute the ones from your own deployment.
     *
     * @param ctx the Pulumi deployment context supplied by {@code Pulumi.run}
     */
    public static void stack(Context ctx) {
        var dataflow = new Dataflow("dataflow", DataflowArgs.builder()
            .dataflowName("aio-to-event-grid")
            .dataflowProfileName("resource-name123")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowPropertiesArgs.builder()
                .mode("Enabled")
                // Order matters: operations run as listed.
                .operations(
                    // Source stage: MQTT topic filter (with + and # wildcards)
                    // read from the built-in broker endpoint.
                    DataflowOperationArgs.builder()
                        .name("source1")
                        .operationType("Source")
                        .sourceSettings(DataflowSourceOperationSettingsArgs.builder()
                            .dataSources("thermostats/+/telemetry/temperature/#")
                            .endpointRef("aio-builtin-broker-endpoint")
                            .build())
                        .build(),
                    // Destination stage: endpointRef names a separately
                    // defined DataflowEndpoint resource.
                    DataflowOperationArgs.builder()
                        .destinationSettings(DataflowDestinationOperationSettingsArgs.builder()
                            .dataDestination("factory/telemetry")
                            .endpointRef("event-grid-endpoint")
                            .build())
                        .name("destination1")
                        .operationType("Destination")
                        .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());
    }
}
# Pulumi YAML program: one Dataflow that routes MQTT thermostat telemetry
# from the built-in broker to an Event Grid endpoint.
# Nesting is significant in YAML; each level below is indented two spaces.
resources:
  dataflow:
    type: azure-native:iotoperations:Dataflow
    properties:
      # Sample placeholder names; substitute the ones from your deployment.
      dataflowName: aio-to-event-grid
      dataflowProfileName: resource-name123
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        mode: Enabled
        # Order matters: operations run as listed.
        operations:
          # Source stage: MQTT topic filter read from the built-in broker.
          - name: source1
            operationType: Source
            sourceSettings:
              dataSources:
                # Quoted so the trailing '#' can never be read as a comment.
                - 'thermostats/+/telemetry/temperature/#'
              endpointRef: aio-builtin-broker-endpoint
          # Destination stage: endpointRef names a separately defined
          # DataflowEndpoint resource.
          - destinationSettings:
              dataDestination: factory/telemetry
              endpointRef: event-grid-endpoint
            name: destination1
            operationType: Destination
      resourceGroupName: rgiotoperations
The operations array defines a pipeline: a source operation reads from MQTT topics via dataSources (supporting wildcards like + and #), and a destination operation writes to Event Grid via dataDestination. The endpointRef properties link to separately defined DataflowEndpoint resources that handle authentication and connection details.
Filter telemetry and republish to MQTT topics
Applications filter incoming telemetry based on threshold conditions before routing to specialized MQTT topics.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";
// Source stage: reads thermostat data from the built-in MQTT broker.
const sourceOperation = {
    name: "source1",
    operationType: azure_native.iotoperations.OperationType.Source,
    sourceSettings: {
        endpointRef: "aio-builtin-broker-endpoint",
        dataSources: ["azure-iot-operations/data/thermostat"],
    },
};

// Transformation stage: one filter rule plus a pass-through mapping.
const filterOperation = {
    name: "transformation1",
    operationType: azure_native.iotoperations.OperationType.BuiltInTransformation,
    builtInTransformationSettings: {
        filter: [{
            type: azure_native.iotoperations.FilterType.Filter,
            description: "filter-datapoint",
            // $1 and $2 in the expression refer to these inputs by position.
            // "Tag 10" contains a space, so the field name is double-quoted.
            inputs: [
                "temperature.Value",
                "\"Tag 10\".Value",
            ],
            expression: "$1 > 9000 && $2 >= 8000",
        }],
        // PassThrough with "*" forwards all fields unchanged.
        map: [{
            type: azure_native.iotoperations.DataflowMappingType.PassThrough,
            inputs: ["*"],
            output: "*",
        }],
    },
};

// Destination stage: republishes matching records to another topic on the
// same broker endpoint the source reads from.
const destinationOperation = {
    name: "destination1",
    operationType: azure_native.iotoperations.OperationType.Destination,
    destinationSettings: {
        endpointRef: "aio-builtin-broker-endpoint",
        dataDestination: "data/filtered/thermostat",
    },
};

// Resource group, instance, profile and location values are sample
// placeholders; substitute the ones from your own deployment.
const dataflow = new azure_native.iotoperations.Dataflow("dataflow", {
    resourceGroupName: "rgiotoperations",
    instanceName: "resource-name123",
    dataflowProfileName: "resource-name123",
    dataflowName: "mqtt-filter-to-topic",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    properties: {
        mode: azure_native.iotoperations.OperationalMode.Enabled,
        // Order matters: source -> transformation -> destination.
        operations: [sourceOperation, filterOperation, destinationOperation],
    },
});
import pulumi
import pulumi_azure_native as azure_native
# Source stage: reads thermostat data from the built-in MQTT broker.
source_operation = {
    "name": "source1",
    "operation_type": azure_native.iotoperations.OperationType.SOURCE,
    "source_settings": {
        "endpoint_ref": "aio-builtin-broker-endpoint",
        "data_sources": ["azure-iot-operations/data/thermostat"],
    },
}

# Transformation stage: one filter rule plus a pass-through mapping.
filter_operation = {
    "name": "transformation1",
    "operation_type": azure_native.iotoperations.OperationType.BUILT_IN_TRANSFORMATION,
    "built_in_transformation_settings": {
        "filter": [{
            "type": azure_native.iotoperations.FilterType.FILTER,
            "description": "filter-datapoint",
            # $1 and $2 in the expression refer to these inputs by position.
            # "Tag 10" contains a space, so the field name is double-quoted.
            "inputs": [
                "temperature.Value",
                "\"Tag 10\".Value",
            ],
            "expression": "$1 > 9000 && $2 >= 8000",
        }],
        # PASS_THROUGH with "*" forwards all fields unchanged.
        "map": [{
            "type": azure_native.iotoperations.DataflowMappingType.PASS_THROUGH,
            "inputs": ["*"],
            "output": "*",
        }],
    },
}

# Destination stage: republishes matching records to another topic on the
# same broker endpoint the source reads from.
destination_operation = {
    "name": "destination1",
    "operation_type": azure_native.iotoperations.OperationType.DESTINATION,
    "destination_settings": {
        "endpoint_ref": "aio-builtin-broker-endpoint",
        "data_destination": "data/filtered/thermostat",
    },
}

# Resource group, instance, profile and location values are sample
# placeholders; substitute the ones from your own deployment.
dataflow = azure_native.iotoperations.Dataflow(
    "dataflow",
    resource_group_name="rgiotoperations",
    instance_name="resource-name123",
    dataflow_profile_name="resource-name123",
    dataflow_name="mqtt-filter-to-topic",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    properties={
        "mode": azure_native.iotoperations.OperationalMode.ENABLED,
        # Order matters: source -> transformation -> destination.
        "operations": [source_operation, filter_operation, destination_operation],
    },
)
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main registers a single Dataflow that filters thermostat telemetry and
// republishes matching records to another topic on the same MQTT broker.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Source stage: reads thermostat data from the built-in MQTT broker.
		sourceOp := &iotoperations.DataflowOperationArgs{
			Name:          pulumi.String("source1"),
			OperationType: pulumi.String(iotoperations.OperationTypeSource),
			SourceSettings: &iotoperations.DataflowSourceOperationSettingsArgs{
				EndpointRef: pulumi.String("aio-builtin-broker-endpoint"),
				DataSources: pulumi.StringArray{
					pulumi.String("azure-iot-operations/data/thermostat"),
				},
			},
		}

		// Filter rule: $1 and $2 in the expression refer to Inputs by
		// position. "Tag 10" contains a space, so it is double-quoted.
		thresholdFilter := &iotoperations.DataflowBuiltInTransformationFilterArgs{
			Type:        pulumi.String(iotoperations.FilterTypeFilter),
			Description: pulumi.String("filter-datapoint"),
			Inputs: pulumi.StringArray{
				pulumi.String("temperature.Value"),
				pulumi.String("\"Tag 10\".Value"),
			},
			Expression: pulumi.String("$1 > 9000 && $2 >= 8000"),
		}

		// PassThrough with "*" forwards all fields unchanged.
		passThroughMap := &iotoperations.DataflowBuiltInTransformationMapArgs{
			Type: pulumi.String(iotoperations.DataflowMappingTypePassThrough),
			Inputs: pulumi.StringArray{
				pulumi.String("*"),
			},
			Output: pulumi.String("*"),
		}

		// Transformation stage: combines the filter and the mapping.
		filterOp := &iotoperations.DataflowOperationArgs{
			Name:          pulumi.String("transformation1"),
			OperationType: pulumi.String(iotoperations.OperationTypeBuiltInTransformation),
			BuiltInTransformationSettings: &iotoperations.DataflowBuiltInTransformationSettingsArgs{
				Filter: iotoperations.DataflowBuiltInTransformationFilterArray{
					thresholdFilter,
				},
				Map: iotoperations.DataflowBuiltInTransformationMapArray{
					passThroughMap,
				},
			},
		}

		// Destination stage: republishes to another topic on the same
		// broker endpoint the source reads from.
		destinationOp := &iotoperations.DataflowOperationArgs{
			Name:          pulumi.String("destination1"),
			OperationType: pulumi.String(iotoperations.OperationTypeDestination),
			DestinationSettings: &iotoperations.DataflowDestinationOperationSettingsArgs{
				EndpointRef:     pulumi.String("aio-builtin-broker-endpoint"),
				DataDestination: pulumi.String("data/filtered/thermostat"),
			},
		}

		// Resource group, instance, profile and location values are sample
		// placeholders; substitute the ones from your own deployment.
		_, err := iotoperations.NewDataflow(ctx, "dataflow", &iotoperations.DataflowArgs{
			ResourceGroupName:   pulumi.String("rgiotoperations"),
			InstanceName:        pulumi.String("resource-name123"),
			DataflowProfileName: pulumi.String("resource-name123"),
			DataflowName:        pulumi.String("mqtt-filter-to-topic"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			Properties: &iotoperations.DataflowPropertiesArgs{
				Mode: pulumi.String(iotoperations.OperationalModeEnabled),
				// Order matters: source -> transformation -> destination.
				Operations: iotoperations.DataflowOperationArray{
					sourceOp,
					filterOp,
					destinationOp,
				},
			},
		})
		// err is nil on success, so it can be returned directly.
		return err
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;
return await Deployment.RunAsync(() =>
{
    // Source stage: reads thermostat data from the built-in MQTT broker.
    var sourceOperation = new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
    {
        Name = "source1",
        OperationType = AzureNative.IoTOperations.OperationType.Source,
        SourceSettings = new AzureNative.IoTOperations.Inputs.DataflowSourceOperationSettingsArgs
        {
            EndpointRef = "aio-builtin-broker-endpoint",
            DataSources = new[]
            {
                "azure-iot-operations/data/thermostat",
            },
        },
    };

    // Transformation stage: one filter rule plus a pass-through mapping.
    var filterOperation = new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
    {
        Name = "transformation1",
        OperationType = AzureNative.IoTOperations.OperationType.BuiltInTransformation,
        BuiltInTransformationSettings = new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationSettingsArgs
        {
            Filter = new[]
            {
                new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationFilterArgs
                {
                    Type = AzureNative.IoTOperations.FilterType.Filter,
                    Description = "filter-datapoint",
                    // $1 and $2 in the expression refer to these inputs by
                    // position. "Tag 10" contains a space, so it is quoted.
                    Inputs = new[]
                    {
                        "temperature.Value",
                        "\"Tag 10\".Value",
                    },
                    Expression = "$1 > 9000 && $2 >= 8000",
                },
            },
            // PassThrough with "*" forwards all fields unchanged.
            Map = new[]
            {
                new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                {
                    Type = AzureNative.IoTOperations.DataflowMappingType.PassThrough,
                    Inputs = new[]
                    {
                        "*",
                    },
                    Output = "*",
                },
            },
        },
    };

    // Destination stage: republishes matching records to another topic on
    // the same broker endpoint the source reads from.
    var destinationOperation = new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
    {
        Name = "destination1",
        OperationType = AzureNative.IoTOperations.OperationType.Destination,
        DestinationSettings = new AzureNative.IoTOperations.Inputs.DataflowDestinationOperationSettingsArgs
        {
            EndpointRef = "aio-builtin-broker-endpoint",
            DataDestination = "data/filtered/thermostat",
        },
    };

    // Resource group, instance, profile and location values are sample
    // placeholders; substitute the ones from your own deployment.
    var dataflow = new AzureNative.IoTOperations.Dataflow("dataflow", new AzureNative.IoTOperations.DataflowArgs
    {
        ResourceGroupName = "rgiotoperations",
        InstanceName = "resource-name123",
        DataflowProfileName = "resource-name123",
        DataflowName = "mqtt-filter-to-topic",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        Properties = new AzureNative.IoTOperations.Inputs.DataflowPropertiesArgs
        {
            Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
            // Order matters: source -> transformation -> destination.
            Operations = new[]
            {
                sourceOperation,
                filterOperation,
                destinationOperation,
            },
        },
    });
});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.Dataflow;
import com.pulumi.azurenative.iotoperations.DataflowArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowPropertiesArgs;
// The six input types below are used in stack() but were not imported,
// so the example did not compile as written.
import com.pulumi.azurenative.iotoperations.inputs.DataflowOperationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowSourceOperationSettingsArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowBuiltInTransformationSettingsArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowBuiltInTransformationFilterArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowBuiltInTransformationMapArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowDestinationOperationSettingsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * Pulumi program that declares one Dataflow which filters thermostat
 * telemetry and republishes matching records to another MQTT topic.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares the "mqtt-filter-to-topic" dataflow.
     *
     * Resource group, instance, profile and location values are sample
     * placeholders; substitute the ones from your own deployment.
     *
     * @param ctx the Pulumi deployment context supplied by {@code Pulumi.run}
     */
    public static void stack(Context ctx) {
        var dataflow = new Dataflow("dataflow", DataflowArgs.builder()
            .dataflowName("mqtt-filter-to-topic")
            .dataflowProfileName("resource-name123")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowPropertiesArgs.builder()
                .mode("Enabled")
                // Order matters: source -> transformation -> destination.
                .operations(
                    // Source stage: reads from the built-in MQTT broker.
                    DataflowOperationArgs.builder()
                        .name("source1")
                        .operationType("Source")
                        .sourceSettings(DataflowSourceOperationSettingsArgs.builder()
                            .dataSources("azure-iot-operations/data/thermostat")
                            .endpointRef("aio-builtin-broker-endpoint")
                            .build())
                        .build(),
                    // Transformation stage: filter rule plus pass-through map.
                    DataflowOperationArgs.builder()
                        .builtInTransformationSettings(DataflowBuiltInTransformationSettingsArgs.builder()
                            // $1 and $2 refer to the inputs by position;
                            // "Tag 10" has a space, so it is double-quoted.
                            .filter(DataflowBuiltInTransformationFilterArgs.builder()
                                .description("filter-datapoint")
                                .expression("$1 > 9000 && $2 >= 8000")
                                .inputs(
                                    "temperature.Value",
                                    "\"Tag 10\".Value")
                                .type("Filter")
                                .build())
                            // PassThrough with "*" forwards all fields unchanged.
                            .map(DataflowBuiltInTransformationMapArgs.builder()
                                .inputs("*")
                                .output("*")
                                .type("PassThrough")
                                .build())
                            .build())
                        .name("transformation1")
                        .operationType("BuiltInTransformation")
                        .build(),
                    // Destination stage: republishes to another topic on the
                    // same broker endpoint the source reads from.
                    DataflowOperationArgs.builder()
                        .destinationSettings(DataflowDestinationOperationSettingsArgs.builder()
                            .dataDestination("data/filtered/thermostat")
                            .endpointRef("aio-builtin-broker-endpoint")
                            .build())
                        .name("destination1")
                        .operationType("Destination")
                        .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());
    }
}
# Pulumi YAML program: one Dataflow that filters thermostat telemetry and
# republishes matching records to another topic on the same MQTT broker.
# Nesting is significant in YAML; each level below is indented two spaces.
resources:
  dataflow:
    type: azure-native:iotoperations:Dataflow
    properties:
      # Sample placeholder names; substitute the ones from your deployment.
      dataflowName: mqtt-filter-to-topic
      dataflowProfileName: resource-name123
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        mode: Enabled
        # Order matters: source -> transformation -> destination.
        operations:
          # Source stage: reads from the built-in MQTT broker.
          - name: source1
            operationType: Source
            sourceSettings:
              dataSources:
                - azure-iot-operations/data/thermostat
              endpointRef: aio-builtin-broker-endpoint
          # Transformation stage: filter rule plus pass-through mapping.
          - builtInTransformationSettings:
              filter:
                - description: filter-datapoint
                  # $1 and $2 refer to the inputs below by position.
                  # Quoted so the operators are always read as plain text.
                  expression: '$1 > 9000 && $2 >= 8000'
                  inputs:
                    - temperature.Value
                    # "Tag 10" has a space, so the field name is double-quoted.
                    - '"Tag 10".Value'
                  type: Filter
              # PassThrough with '*' forwards all fields unchanged.
              map:
                - inputs:
                    - '*'
                  output: '*'
                  type: PassThrough
            name: transformation1
            operationType: BuiltInTransformation
          # Destination stage: republishes to another topic on the same
          # broker endpoint the source reads from.
          - destinationSettings:
              dataDestination: data/filtered/thermostat
              endpointRef: aio-builtin-broker-endpoint
            name: destination1
            operationType: Destination
      resourceGroupName: rgiotoperations
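The builtInTransformationSettings block sits between the source and destination operations. Each entry in the filter array evaluates its expression against the listed inputs, which are field references such as temperature.Value; within the expression, $1 and $2 refer to the first and second inputs by position. Only records for which the expression is true proceed. Field names that contain spaces, such as "Tag 10", must be wrapped in escaped double quotes. The map array with PassThrough type forwards all fields unchanged. This pattern creates a filtered republish workflow within the same MQTT broker.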
Transform telemetry with expressions and route to Event Hub
More complex scenarios combine filtering, derived metrics, and custom transformations before delivering the data to Azure Event Hubs.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";
// Source stage: reads thermostat data from the built-in MQTT broker.
const sourceOperation = {
    name: "source1",
    operationType: azure_native.iotoperations.OperationType.Source,
    sourceSettings: {
        endpointRef: "aio-builtin-broker-endpoint",
        dataSources: ["azure-iot-operations/data/thermostat"],
    },
};

// Destination stage: delivers the transformed records to the Event Hub
// endpoint. `endpointRef` names a separately defined DataflowEndpoint.
const destinationOperation = {
    name: "destination1",
    operationType: azure_native.iotoperations.OperationType.Destination,
    destinationSettings: {
        endpointRef: "event-hub-endpoint",
        dataDestination: "myuniqueeventhub",
    },
};

// Resource group, instance, profile and location values are sample
// placeholders; substitute the ones from your own deployment.
const dataflow = new azure_native.iotoperations.Dataflow("dataflow", {
    resourceGroupName: "rgiotoperations",
    instanceName: "resource-name123",
    dataflowProfileName: "resource-name123",
    dataflowName: "aio-to-event-hub-transformed",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    properties: {
        mode: azure_native.iotoperations.OperationalMode.Enabled,
        // Order matters: source -> transformation -> destination.
        operations: [
            sourceOperation,
            // Transformation stage (unnamed in this example). It is kept
            // inline so the rule objects are typed by the Dataflow arguments.
            {
                operationType: azure_native.iotoperations.OperationType.BuiltInTransformation,
                builtInTransformationSettings: {
                    // $1 and $2 refer to the inputs by position.
                    filter: [{
                        inputs: [
                            "temperature.Value",
                            "\"Tag 10\".Value",
                        ],
                        expression: "$1 > 9000 && $2 >= 8000",
                    }],
                    map: [
                        // Forward every field as-is.
                        {
                            inputs: ["*"],
                            output: "*",
                        },
                        // Mean of the two inputs, written to AvgTemp.Value.
                        {
                            inputs: [
                                "temperature.Value",
                                "\"Tag 10\".Value",
                            ],
                            expression: "($1+$2)/2",
                            output: "AvgTemp.Value",
                        },
                        // Constant marker field; takes no inputs.
                        {
                            inputs: [],
                            expression: "true",
                            output: "dataflow-processed",
                        },
                        // Empty expression and empty output.
                        // NOTE(review): presumably drops this field from the
                        // result; confirm against the mapping docs.
                        {
                            inputs: ["temperature.SourceTimestamp"],
                            expression: "",
                            output: "",
                        },
                        // Empty expression: presumably copies "Tag 10" to
                        // `pressure` unchanged (confirm).
                        {
                            inputs: ["\"Tag 10\""],
                            expression: "",
                            output: "pressure",
                        },
                        // cToF: Celsius-to-Fahrenheit judging by the name
                        // and output field (confirm).
                        {
                            inputs: ["temperature.Value"],
                            expression: "cToF($1)",
                            output: "temperatureF.Value",
                        },
                        // scale: presumably rescales from 0-10 to 0-100
                        // (confirm the argument order).
                        {
                            inputs: ["\"Tag 10\".Value"],
                            expression: "scale ($1,0,10,0,100)",
                            output: "\"Scale Tag 10\".Value",
                        },
                    ],
                },
            },
            destinationOperation,
        ],
    },
});
import pulumi
import pulumi_azure_native as azure_native
# Source stage: reads thermostat data from the built-in MQTT broker.
source_operation = {
    "name": "source1",
    "operation_type": azure_native.iotoperations.OperationType.SOURCE,
    "source_settings": {
        "endpoint_ref": "aio-builtin-broker-endpoint",
        "data_sources": ["azure-iot-operations/data/thermostat"],
    },
}

# Filter rule: $1 and $2 in the expression refer to the inputs by position.
threshold_filter = {
    "inputs": [
        "temperature.Value",
        "\"Tag 10\".Value",
    ],
    "expression": "$1 > 9000 && $2 >= 8000",
}

# Mapping rules, applied in the order listed.
map_rules = [
    # Forward every field as-is.
    {
        "inputs": ["*"],
        "output": "*",
    },
    # Mean of the two inputs, written to AvgTemp.Value.
    {
        "inputs": [
            "temperature.Value",
            "\"Tag 10\".Value",
        ],
        "expression": "($1+$2)/2",
        "output": "AvgTemp.Value",
    },
    # Constant marker field; takes no inputs.
    {
        "inputs": [],
        "expression": "true",
        "output": "dataflow-processed",
    },
    # Empty expression and empty output.
    # NOTE(review): presumably drops this field from the result; confirm
    # against the mapping docs.
    {
        "inputs": ["temperature.SourceTimestamp"],
        "expression": "",
        "output": "",
    },
    # Empty expression: presumably copies "Tag 10" to `pressure` unchanged
    # (confirm).
    {
        "inputs": ["\"Tag 10\""],
        "expression": "",
        "output": "pressure",
    },
    # cToF: Celsius-to-Fahrenheit judging by the name and output field
    # (confirm).
    {
        "inputs": ["temperature.Value"],
        "expression": "cToF($1)",
        "output": "temperatureF.Value",
    },
    # scale: presumably rescales from 0-10 to 0-100 (confirm argument order).
    {
        "inputs": ["\"Tag 10\".Value"],
        "expression": "scale ($1,0,10,0,100)",
        "output": "\"Scale Tag 10\".Value",
    },
]

# Transformation stage (unnamed in this example).
transform_operation = {
    "operation_type": azure_native.iotoperations.OperationType.BUILT_IN_TRANSFORMATION,
    "built_in_transformation_settings": {
        "filter": [threshold_filter],
        "map": map_rules,
    },
}

# Destination stage: delivers the transformed records to the Event Hub
# endpoint. `endpoint_ref` names a separately defined DataflowEndpoint.
destination_operation = {
    "name": "destination1",
    "operation_type": azure_native.iotoperations.OperationType.DESTINATION,
    "destination_settings": {
        "endpoint_ref": "event-hub-endpoint",
        "data_destination": "myuniqueeventhub",
    },
}

# Resource group, instance, profile and location values are sample
# placeholders; substitute the ones from your own deployment.
dataflow = azure_native.iotoperations.Dataflow(
    "dataflow",
    resource_group_name="rgiotoperations",
    instance_name="resource-name123",
    dataflow_profile_name="resource-name123",
    dataflow_name="aio-to-event-hub-transformed",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    properties={
        "mode": azure_native.iotoperations.OperationalMode.ENABLED,
        # Order matters: source -> transformation -> destination.
        "operations": [source_operation, transform_operation, destination_operation],
    },
)
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main registers a single Dataflow that filters and transforms thermostat
// telemetry before delivering it to an Event Hub endpoint.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Source stage: reads thermostat data from the built-in MQTT broker.
		sourceOp := &iotoperations.DataflowOperationArgs{
			Name:          pulumi.String("source1"),
			OperationType: pulumi.String(iotoperations.OperationTypeSource),
			SourceSettings: &iotoperations.DataflowSourceOperationSettingsArgs{
				EndpointRef: pulumi.String("aio-builtin-broker-endpoint"),
				DataSources: pulumi.StringArray{
					pulumi.String("azure-iot-operations/data/thermostat"),
				},
			},
		}

		// Filter rule: $1 and $2 refer to Inputs by position.
		filterRules := iotoperations.DataflowBuiltInTransformationFilterArray{
			&iotoperations.DataflowBuiltInTransformationFilterArgs{
				Inputs: pulumi.StringArray{
					pulumi.String("temperature.Value"),
					pulumi.String("\"Tag 10\".Value"),
				},
				Expression: pulumi.String("$1 > 9000 && $2 >= 8000"),
			},
		}

		// Mapping rules, applied in the order listed.
		mapRules := iotoperations.DataflowBuiltInTransformationMapArray{
			// Forward every field as-is.
			&iotoperations.DataflowBuiltInTransformationMapArgs{
				Inputs: pulumi.StringArray{
					pulumi.String("*"),
				},
				Output: pulumi.String("*"),
			},
			// Mean of the two inputs, written to AvgTemp.Value.
			&iotoperations.DataflowBuiltInTransformationMapArgs{
				Inputs: pulumi.StringArray{
					pulumi.String("temperature.Value"),
					pulumi.String("\"Tag 10\".Value"),
				},
				Expression: pulumi.String("($1+$2)/2"),
				Output:     pulumi.String("AvgTemp.Value"),
			},
			// Constant marker field; takes no inputs.
			&iotoperations.DataflowBuiltInTransformationMapArgs{
				Inputs:     pulumi.StringArray{},
				Expression: pulumi.String("true"),
				Output:     pulumi.String("dataflow-processed"),
			},
			// Empty expression and empty output.
			// NOTE(review): presumably drops this field from the result;
			// confirm against the mapping docs.
			&iotoperations.DataflowBuiltInTransformationMapArgs{
				Inputs: pulumi.StringArray{
					pulumi.String("temperature.SourceTimestamp"),
				},
				Expression: pulumi.String(""),
				Output:     pulumi.String(""),
			},
			// Empty expression: presumably copies "Tag 10" to pressure
			// unchanged (confirm).
			&iotoperations.DataflowBuiltInTransformationMapArgs{
				Inputs: pulumi.StringArray{
					pulumi.String("\"Tag 10\""),
				},
				Expression: pulumi.String(""),
				Output:     pulumi.String("pressure"),
			},
			// cToF: Celsius-to-Fahrenheit judging by the name and output
			// field (confirm).
			&iotoperations.DataflowBuiltInTransformationMapArgs{
				Inputs: pulumi.StringArray{
					pulumi.String("temperature.Value"),
				},
				Expression: pulumi.String("cToF($1)"),
				Output:     pulumi.String("temperatureF.Value"),
			},
			// scale: presumably rescales from 0-10 to 0-100 (confirm the
			// argument order).
			&iotoperations.DataflowBuiltInTransformationMapArgs{
				Inputs: pulumi.StringArray{
					pulumi.String("\"Tag 10\".Value"),
				},
				Expression: pulumi.String("scale ($1,0,10,0,100)"),
				Output:     pulumi.String("\"Scale Tag 10\".Value"),
			},
		}

		// Transformation stage (unnamed in this example).
		transformOp := &iotoperations.DataflowOperationArgs{
			OperationType: pulumi.String(iotoperations.OperationTypeBuiltInTransformation),
			BuiltInTransformationSettings: &iotoperations.DataflowBuiltInTransformationSettingsArgs{
				Filter: filterRules,
				Map:    mapRules,
			},
		}

		// Destination stage: delivers the transformed records to the Event
		// Hub endpoint. EndpointRef names a separately defined DataflowEndpoint.
		destinationOp := &iotoperations.DataflowOperationArgs{
			Name:          pulumi.String("destination1"),
			OperationType: pulumi.String(iotoperations.OperationTypeDestination),
			DestinationSettings: &iotoperations.DataflowDestinationOperationSettingsArgs{
				EndpointRef:     pulumi.String("event-hub-endpoint"),
				DataDestination: pulumi.String("myuniqueeventhub"),
			},
		}

		// Resource group, instance, profile and location values are sample
		// placeholders; substitute the ones from your own deployment.
		_, err := iotoperations.NewDataflow(ctx, "dataflow", &iotoperations.DataflowArgs{
			ResourceGroupName:   pulumi.String("rgiotoperations"),
			InstanceName:        pulumi.String("resource-name123"),
			DataflowProfileName: pulumi.String("resource-name123"),
			DataflowName:        pulumi.String("aio-to-event-hub-transformed"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			Properties: &iotoperations.DataflowPropertiesArgs{
				Mode: pulumi.String(iotoperations.OperationalModeEnabled),
				// Order matters: source -> transformation -> destination.
				Operations: iotoperations.DataflowOperationArray{
					sourceOp,
					transformOp,
					destinationOp,
				},
			},
		})
		// err is nil on success, so it can be returned directly.
		return err
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;
return await Deployment.RunAsync(() =>
{
    // Source stage: reads thermostat data from the built-in MQTT broker.
    var sourceOperation = new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
    {
        Name = "source1",
        OperationType = AzureNative.IoTOperations.OperationType.Source,
        SourceSettings = new AzureNative.IoTOperations.Inputs.DataflowSourceOperationSettingsArgs
        {
            EndpointRef = "aio-builtin-broker-endpoint",
            DataSources = new[]
            {
                "azure-iot-operations/data/thermostat",
            },
        },
    };

    // Transformation stage (unnamed in this example): one filter rule
    // followed by seven mapping rules, applied in the order listed.
    var transformOperation = new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
    {
        OperationType = AzureNative.IoTOperations.OperationType.BuiltInTransformation,
        BuiltInTransformationSettings = new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationSettingsArgs
        {
            Filter = new[]
            {
                // $1 and $2 in the expression refer to Inputs by position.
                new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationFilterArgs
                {
                    Inputs = new[]
                    {
                        "temperature.Value",
                        "\"Tag 10\".Value",
                    },
                    Expression = "$1 > 9000 && $2 >= 8000",
                },
            },
            Map = new[]
            {
                // Forward every field as-is.
                new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                {
                    Inputs = new[]
                    {
                        "*",
                    },
                    Output = "*",
                },
                // Mean of the two inputs, written to AvgTemp.Value.
                new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                {
                    Inputs = new[]
                    {
                        "temperature.Value",
                        "\"Tag 10\".Value",
                    },
                    Expression = "($1+$2)/2",
                    Output = "AvgTemp.Value",
                },
                // Constant marker field; takes no inputs.
                new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                {
                    Inputs = new() { },
                    Expression = "true",
                    Output = "dataflow-processed",
                },
                // Empty expression and empty output.
                // NOTE(review): presumably drops this field from the result;
                // confirm against the mapping docs.
                new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                {
                    Inputs = new[]
                    {
                        "temperature.SourceTimestamp",
                    },
                    Expression = "",
                    Output = "",
                },
                // Empty expression: presumably copies "Tag 10" to pressure
                // unchanged (confirm).
                new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                {
                    Inputs = new[]
                    {
                        "\"Tag 10\"",
                    },
                    Expression = "",
                    Output = "pressure",
                },
                // cToF: Celsius-to-Fahrenheit judging by the name and
                // output field (confirm).
                new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                {
                    Inputs = new[]
                    {
                        "temperature.Value",
                    },
                    Expression = "cToF($1)",
                    Output = "temperatureF.Value",
                },
                // scale: presumably rescales from 0-10 to 0-100 (confirm
                // the argument order).
                new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                {
                    Inputs = new[]
                    {
                        "\"Tag 10\".Value",
                    },
                    Expression = "scale ($1,0,10,0,100)",
                    Output = "\"Scale Tag 10\".Value",
                },
            },
        },
    };

    // Destination stage: delivers the transformed records to the Event Hub
    // endpoint. EndpointRef names a separately defined DataflowEndpoint.
    var destinationOperation = new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
    {
        Name = "destination1",
        OperationType = AzureNative.IoTOperations.OperationType.Destination,
        DestinationSettings = new AzureNative.IoTOperations.Inputs.DataflowDestinationOperationSettingsArgs
        {
            EndpointRef = "event-hub-endpoint",
            DataDestination = "myuniqueeventhub",
        },
    };

    // Resource group, instance, profile and location values are sample
    // placeholders; substitute the ones from your own deployment.
    var dataflow = new AzureNative.IoTOperations.Dataflow("dataflow", new AzureNative.IoTOperations.DataflowArgs
    {
        ResourceGroupName = "rgiotoperations",
        InstanceName = "resource-name123",
        DataflowProfileName = "resource-name123",
        DataflowName = "aio-to-event-hub-transformed",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        Properties = new AzureNative.IoTOperations.Inputs.DataflowPropertiesArgs
        {
            Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
            // Order matters: source -> transformation -> destination.
            Operations = new[]
            {
                sourceOperation,
                transformOperation,
                destinationOperation,
            },
        },
    });
});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.Dataflow;
import com.pulumi.azurenative.iotoperations.DataflowArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowPropertiesArgs;
// The nested argument builders used below live in the same `inputs` package;
// without these imports the program does not compile.
import com.pulumi.azurenative.iotoperations.inputs.DataflowOperationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowSourceOperationSettingsArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowBuiltInTransformationSettingsArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowBuiltInTransformationFilterArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowBuiltInTransformationMapArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowDestinationOperationSettingsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * Declares the "aio-to-event-hub-transformed" dataflow: a source operation,
 * a built-in transformation (one filter plus seven map entries) and a
 * destination operation.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Registers the Dataflow resource with the Pulumi engine.
     *
     * @param ctx the Pulumi stack context supplied by {@code Pulumi.run}
     */
    public static void stack(Context ctx) {
        var dataflow = new Dataflow("dataflow", DataflowArgs.builder()
            .dataflowName("aio-to-event-hub-transformed")
            .dataflowProfileName("resource-name123")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowPropertiesArgs.builder()
                .mode("Enabled")
                .operations(
                    // Source operation: thermostat topic on the built-in broker endpoint.
                    DataflowOperationArgs.builder()
                        .name("source1")
                        .operationType("Source")
                        .sourceSettings(DataflowSourceOperationSettingsArgs.builder()
                            .dataSources("azure-iot-operations/data/thermostat")
                            .endpointRef("aio-builtin-broker-endpoint")
                            .build())
                        .build(),
                    // Built-in transformation: filter followed by the map entries.
                    DataflowOperationArgs.builder()
                        .builtInTransformationSettings(DataflowBuiltInTransformationSettingsArgs.builder()
                            // $1 and $2 refer to the inputs in the order listed.
                            .filter(DataflowBuiltInTransformationFilterArgs.builder()
                                .expression("$1 > 9000 && $2 >= 8000")
                                .inputs(
                                    "temperature.Value",
                                    "\"Tag 10\".Value")
                                .build())
                            .map(
                                // Wildcard passthrough.
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .inputs("*")
                                    .output("*")
                                    .build(),
                                // Calculated field from two inputs.
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .expression("($1+$2)/2")
                                    .inputs(
                                        "temperature.Value",
                                        "\"Tag 10\".Value")
                                    .output("AvgTemp.Value")
                                    .build(),
                                // Constant expression with no inputs.
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .expression("true")
                                    .inputs()
                                    .output("dataflow-processed")
                                    .build(),
                                // Empty expression and empty output for the source timestamp.
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .expression("")
                                    .inputs("temperature.SourceTimestamp")
                                    .output("")
                                    .build(),
                                // Empty expression: "Tag 10" is written out as "pressure".
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .expression("")
                                    .inputs("\"Tag 10\"")
                                    .output("pressure")
                                    .build(),
                                // Function call on a single input.
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .expression("cToF($1)")
                                    .inputs("temperature.Value")
                                    .output("temperatureF.Value")
                                    .build(),
                                // Function call with constant arguments.
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .expression("scale ($1,0,10,0,100)")
                                    .inputs("\"Tag 10\".Value")
                                    .output("\"Scale Tag 10\".Value")
                                    .build())
                            .build())
                        .operationType("BuiltInTransformation")
                        .build(),
                    // Destination operation: Event Hub endpoint.
                    DataflowOperationArgs.builder()
                        .destinationSettings(DataflowDestinationOperationSettingsArgs.builder()
                            .dataDestination("myuniqueeventhub")
                            .endpointRef("event-hub-endpoint")
                            .build())
                        .name("destination1")
                        .operationType("Destination")
                        .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());
    }
}
# Pulumi YAML program declaring the "aio-to-event-hub-transformed" dataflow.
resources:
  dataflow:
    type: azure-native:iotoperations:Dataflow
    properties:
      dataflowName: aio-to-event-hub-transformed
      dataflowProfileName: resource-name123
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        mode: Enabled
        operations:
          # Source operation: thermostat topic on the built-in broker endpoint.
          - name: source1
            operationType: Source
            sourceSettings:
              dataSources:
                - azure-iot-operations/data/thermostat
              endpointRef: aio-builtin-broker-endpoint
          # Built-in transformation: one filter plus seven map entries.
          - builtInTransformationSettings:
              filter:
                # $1 and $2 refer to the inputs in the order listed.
                - expression: $1 > 9000 && $2 >= 8000
                  inputs:
                    - temperature.Value
                    - '"Tag 10".Value'
              map:
                # Wildcard passthrough.
                - inputs:
                    - '*'
                  output: '*'
                # Calculated field from two inputs.
                - expression: ($1+$2)/2
                  inputs:
                    - temperature.Value
                    - '"Tag 10".Value'
                  output: AvgTemp.Value
                # Constant expression with no inputs.
                - expression: 'true'
                  inputs: []
                  output: dataflow-processed
                # Empty expression and empty output for the source timestamp.
                - expression: ""
                  inputs:
                    - temperature.SourceTimestamp
                  output: ""
                # Empty expression: "Tag 10" is written out as "pressure".
                - expression: ""
                  inputs:
                    - '"Tag 10"'
                  output: pressure
                # Function call on a single input.
                - expression: cToF($1)
                  inputs:
                    - temperature.Value
                  output: temperatureF.Value
                # Function call with constant arguments.
                - expression: scale ($1,0,10,0,100)
                  inputs:
                    - '"Tag 10".Value'
                  output: '"Scale Tag 10".Value'
            operationType: BuiltInTransformation
          # Destination operation: Event Hub endpoint.
          - destinationSettings:
              dataDestination: myuniqueeventhub
              endpointRef: event-hub-endpoint
            name: destination1
            operationType: Destination
      resourceGroupName: rgiotoperations
Multiple map entries define a transformation pipeline: passthrough (* to *), calculated fields (expressions like ($1+$2)/2), constant additions (true to dataflow-processed), field renames (an empty expression copies the input to a differently named output, such as "Tag 10" to pressure), field removal (an empty output, as used for temperature.SourceTimestamp, drops the field), and function calls (cToF($1) for unit conversion, scale($1,0,10,0,100) for normalization). The filter operation runs first, then transformations apply to surviving records.
Enrich telemetry with contextual data from datasets
Industrial IoT applications join incoming telemetry with reference data to add context like location or quality indicators.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";
// Source operation: thermostat topic on the built-in broker endpoint.
const thermostatSource = {
    name: "source1",
    operationType: azure_native.iotoperations.OperationType.Source,
    sourceSettings: {
        dataSources: ["azure-iot-operations/data/thermostat"],
        endpointRef: "aio-builtin-broker-endpoint",
    },
};

// Built-in transformation: the "quality" dataset compares $source.country with
// $context.country; the second map entry copies $context(quality).* under enriched.*.
const contextualization = {
    builtInTransformationSettings: {
        datasets: [
            {
                expression: "$1 == $2",
                inputs: ["$source.country", "$context.country"],
                key: "quality",
            },
        ],
        map: [
            { inputs: ["*"], output: "*" },
            { inputs: ["$context(quality).*"], output: "enriched.*" },
        ],
    },
    name: "transformation1",
    operationType: azure_native.iotoperations.OperationType.BuiltInTransformation,
};

// Destination operation: table "mytable" on the ADX endpoint.
const adxDestination = {
    destinationSettings: {
        dataDestination: "mytable",
        endpointRef: "adx-endpoint",
    },
    name: "destination1",
    operationType: azure_native.iotoperations.OperationType.Destination,
};

// Dataflow resource wiring the three operations together in order.
const dataflow = new azure_native.iotoperations.Dataflow("dataflow", {
    dataflowName: "aio-to-adx-contexualized",
    dataflowProfileName: "resource-name123",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        mode: azure_native.iotoperations.OperationalMode.Enabled,
        operations: [thermostatSource, contextualization, adxDestination],
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native
# Source operation: thermostat topic on the built-in broker endpoint.
source_operation = {
    "name": "source1",
    "operation_type": azure_native.iotoperations.OperationType.SOURCE,
    "source_settings": {
        "data_sources": ["azure-iot-operations/data/thermostat"],
        "endpoint_ref": "aio-builtin-broker-endpoint",
    },
}

# Built-in transformation: the "quality" dataset compares $source.country with
# $context.country; the second map entry copies $context(quality).* under enriched.*.
contextualization_operation = {
    "built_in_transformation_settings": {
        "datasets": [
            {
                "expression": "$1 == $2",
                "inputs": ["$source.country", "$context.country"],
                "key": "quality",
            },
        ],
        "map": [
            {"inputs": ["*"], "output": "*"},
            {"inputs": ["$context(quality).*"], "output": "enriched.*"},
        ],
    },
    "name": "transformation1",
    "operation_type": azure_native.iotoperations.OperationType.BUILT_IN_TRANSFORMATION,
}

# Destination operation: table "mytable" on the ADX endpoint.
destination_operation = {
    "destination_settings": {
        "data_destination": "mytable",
        "endpoint_ref": "adx-endpoint",
    },
    "name": "destination1",
    "operation_type": azure_native.iotoperations.OperationType.DESTINATION,
}

# Dataflow resource wiring the three operations together in order.
dataflow = azure_native.iotoperations.Dataflow(
    "dataflow",
    dataflow_name="aio-to-adx-contexualized",
    dataflow_profile_name="resource-name123",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "mode": azure_native.iotoperations.OperationalMode.ENABLED,
        "operations": [
            source_operation,
            contextualization_operation,
            destination_operation,
        ],
    },
    resource_group_name="rgiotoperations",
)
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main declares the "aio-to-adx-contexualized" dataflow: a source operation,
// a built-in transformation with one dataset and two map entries, and a
// destination operation.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Source operation: thermostat topic on the built-in broker endpoint.
		sourceOp := &iotoperations.DataflowOperationArgs{
			Name:          pulumi.String("source1"),
			OperationType: pulumi.String(iotoperations.OperationTypeSource),
			SourceSettings: &iotoperations.DataflowSourceOperationSettingsArgs{
				DataSources: pulumi.StringArray{
					pulumi.String("azure-iot-operations/data/thermostat"),
				},
				EndpointRef: pulumi.String("aio-builtin-broker-endpoint"),
			},
		}

		// Built-in transformation: the "quality" dataset compares $source.country
		// with $context.country; the second map entry copies $context(quality).*
		// under enriched.*.
		transformOp := &iotoperations.DataflowOperationArgs{
			BuiltInTransformationSettings: &iotoperations.DataflowBuiltInTransformationSettingsArgs{
				Datasets: iotoperations.DataflowBuiltInTransformationDatasetArray{
					&iotoperations.DataflowBuiltInTransformationDatasetArgs{
						Expression: pulumi.String("$1 == $2"),
						Inputs: pulumi.StringArray{
							pulumi.String("$source.country"),
							pulumi.String("$context.country"),
						},
						Key: pulumi.String("quality"),
					},
				},
				Map: iotoperations.DataflowBuiltInTransformationMapArray{
					&iotoperations.DataflowBuiltInTransformationMapArgs{
						Inputs: pulumi.StringArray{pulumi.String("*")},
						Output: pulumi.String("*"),
					},
					&iotoperations.DataflowBuiltInTransformationMapArgs{
						Inputs: pulumi.StringArray{pulumi.String("$context(quality).*")},
						Output: pulumi.String("enriched.*"),
					},
				},
			},
			Name:          pulumi.String("transformation1"),
			OperationType: pulumi.String(iotoperations.OperationTypeBuiltInTransformation),
		}

		// Destination operation: table "mytable" on the ADX endpoint.
		destinationOp := &iotoperations.DataflowOperationArgs{
			DestinationSettings: &iotoperations.DataflowDestinationOperationSettingsArgs{
				DataDestination: pulumi.String("mytable"),
				EndpointRef:     pulumi.String("adx-endpoint"),
			},
			Name:          pulumi.String("destination1"),
			OperationType: pulumi.String(iotoperations.OperationTypeDestination),
		}

		// Register the resource; any creation error is returned to pulumi.Run.
		_, err := iotoperations.NewDataflow(ctx, "dataflow", &iotoperations.DataflowArgs{
			DataflowName:        pulumi.String("aio-to-adx-contexualized"),
			DataflowProfileName: pulumi.String("resource-name123"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowPropertiesArgs{
				Mode: pulumi.String(iotoperations.OperationalModeEnabled),
				Operations: iotoperations.DataflowOperationArray{
					sourceOp,
					transformOp,
					destinationOp,
				},
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		return err
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;
// Declares the "aio-to-adx-contexualized" dataflow: a source operation, a
// built-in transformation with one dataset and two map entries, and a
// destination operation.
return await Deployment.RunAsync(() =>
{
    // Source operation: thermostat topic on the built-in broker endpoint.
    var sourceOperation = new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
    {
        Name = "source1",
        OperationType = AzureNative.IoTOperations.OperationType.Source,
        SourceSettings = new AzureNative.IoTOperations.Inputs.DataflowSourceOperationSettingsArgs
        {
            DataSources = new[]
            {
                "azure-iot-operations/data/thermostat",
            },
            EndpointRef = "aio-builtin-broker-endpoint",
        },
    };

    // Built-in transformation: the "quality" dataset compares $source.country
    // with $context.country; the second map entry copies $context(quality).*
    // under enriched.*.
    var transformationOperation = new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
    {
        BuiltInTransformationSettings = new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationSettingsArgs
        {
            Datasets = new[]
            {
                new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationDatasetArgs
                {
                    Expression = "$1 == $2",
                    Inputs = new[]
                    {
                        "$source.country",
                        "$context.country",
                    },
                    Key = "quality",
                },
            },
            Map = new[]
            {
                new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                {
                    Inputs = new[]
                    {
                        "*",
                    },
                    Output = "*",
                },
                new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
                {
                    Inputs = new[]
                    {
                        "$context(quality).*",
                    },
                    Output = "enriched.*",
                },
            },
        },
        Name = "transformation1",
        OperationType = AzureNative.IoTOperations.OperationType.BuiltInTransformation,
    };

    // Destination operation: table "mytable" on the ADX endpoint.
    var destinationOperation = new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
    {
        DestinationSettings = new AzureNative.IoTOperations.Inputs.DataflowDestinationOperationSettingsArgs
        {
            DataDestination = "mytable",
            EndpointRef = "adx-endpoint",
        },
        Name = "destination1",
        OperationType = AzureNative.IoTOperations.OperationType.Destination,
    };

    // Dataflow resource wiring the three operations together in order.
    var dataflow = new AzureNative.IoTOperations.Dataflow("dataflow", new()
    {
        DataflowName = "aio-to-adx-contexualized",
        DataflowProfileName = "resource-name123",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowPropertiesArgs
        {
            Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
            Operations = new[]
            {
                sourceOperation,
                transformationOperation,
                destinationOperation,
            },
        },
        ResourceGroupName = "rgiotoperations",
    });
});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.Dataflow;
import com.pulumi.azurenative.iotoperations.DataflowArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowPropertiesArgs;
// The nested argument builders used below live in the same `inputs` package;
// without these imports the program does not compile.
import com.pulumi.azurenative.iotoperations.inputs.DataflowOperationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowSourceOperationSettingsArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowBuiltInTransformationSettingsArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowBuiltInTransformationDatasetArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowBuiltInTransformationMapArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowDestinationOperationSettingsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * Declares the "aio-to-adx-contexualized" dataflow: a source operation, a
 * built-in transformation with one dataset and two map entries, and a
 * destination operation.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Registers the Dataflow resource with the Pulumi engine.
     *
     * @param ctx the Pulumi stack context supplied by {@code Pulumi.run}
     */
    public static void stack(Context ctx) {
        var dataflow = new Dataflow("dataflow", DataflowArgs.builder()
            .dataflowName("aio-to-adx-contexualized")
            .dataflowProfileName("resource-name123")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowPropertiesArgs.builder()
                .mode("Enabled")
                .operations(
                    // Source operation: thermostat topic on the built-in broker endpoint.
                    DataflowOperationArgs.builder()
                        .name("source1")
                        .operationType("Source")
                        .sourceSettings(DataflowSourceOperationSettingsArgs.builder()
                            .dataSources("azure-iot-operations/data/thermostat")
                            .endpointRef("aio-builtin-broker-endpoint")
                            .build())
                        .build(),
                    // Built-in transformation: dataset plus map entries.
                    DataflowOperationArgs.builder()
                        .builtInTransformationSettings(DataflowBuiltInTransformationSettingsArgs.builder()
                            // Dataset keyed "quality": compares $source.country with $context.country.
                            .datasets(DataflowBuiltInTransformationDatasetArgs.builder()
                                .expression("$1 == $2")
                                .inputs(
                                    "$source.country",
                                    "$context.country")
                                .key("quality")
                                .build())
                            .map(
                                // Wildcard passthrough.
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .inputs("*")
                                    .output("*")
                                    .build(),
                                // Copies the "quality" dataset fields under enriched.*.
                                DataflowBuiltInTransformationMapArgs.builder()
                                    .inputs("$context(quality).*")
                                    .output("enriched.*")
                                    .build())
                            .build())
                        .name("transformation1")
                        .operationType("BuiltInTransformation")
                        .build(),
                    // Destination operation: table "mytable" on the ADX endpoint.
                    DataflowOperationArgs.builder()
                        .destinationSettings(DataflowDestinationOperationSettingsArgs.builder()
                            .dataDestination("mytable")
                            .endpointRef("adx-endpoint")
                            .build())
                        .name("destination1")
                        .operationType("Destination")
                        .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());
    }
}
# Pulumi YAML program declaring the "aio-to-adx-contexualized" dataflow.
resources:
  dataflow:
    type: azure-native:iotoperations:Dataflow
    properties:
      dataflowName: aio-to-adx-contexualized
      dataflowProfileName: resource-name123
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        mode: Enabled
        operations:
          # Source operation: thermostat topic on the built-in broker endpoint.
          - name: source1
            operationType: Source
            sourceSettings:
              dataSources:
                - azure-iot-operations/data/thermostat
              endpointRef: aio-builtin-broker-endpoint
          # Built-in transformation: dataset plus map entries.
          - builtInTransformationSettings:
              datasets:
                # Dataset keyed "quality": compares $source.country with $context.country.
                - expression: $1 == $2
                  inputs:
                    - $source.country
                    - $context.country
                  key: quality
              map:
                # Wildcard passthrough.
                - inputs:
                    - '*'
                  output: '*'
                # Copies the "quality" dataset fields under enriched.*.
                - inputs:
                    - $context(quality).*
                  output: enriched.*
            name: transformation1
            operationType: BuiltInTransformation
          # Destination operation: table "mytable" on the ADX endpoint.
          - destinationSettings:
              dataDestination: mytable
              endpointRef: adx-endpoint
            name: destination1
            operationType: Destination
      resourceGroupName: rgiotoperations
The datasets array defines reusable expressions that compare source fields ($source.country) with context data ($context.country), storing results under a key (quality). The map operations then reference these datasets via $context(quality).* to enrich the output. This pattern separates data matching logic from field mapping, enabling complex contextualization workflows.
Convert telemetry to Parquet for Microsoft Fabric
Analytics workloads in Microsoft Fabric benefit from columnar formats like Parquet, which require schema-based serialization.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";
// Source operation: thermostat topic on the built-in broker endpoint.
const thermostatSource = {
    name: "source1",
    operationType: azure_native.iotoperations.OperationType.Source,
    sourceSettings: {
        dataSources: ["azure-iot-operations/data/thermostat"],
        endpointRef: "aio-builtin-broker-endpoint",
    },
};

// Built-in transformation: Parquet serialization against a schema referenced by schemaRef.
const parquetSerialization = {
    builtInTransformationSettings: {
        schemaRef: "aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0",
        serializationFormat: azure_native.iotoperations.TransformationSerializationFormat.Parquet,
    },
    operationType: azure_native.iotoperations.OperationType.BuiltInTransformation,
};

// Destination operation: "telemetryTable" on the Fabric endpoint.
const fabricDestination = {
    destinationSettings: {
        dataDestination: "telemetryTable",
        endpointRef: "fabric-endpoint",
    },
    name: "destination1",
    operationType: azure_native.iotoperations.OperationType.Destination,
};

// Dataflow resource wiring the three operations together in order.
const dataflow = new azure_native.iotoperations.Dataflow("dataflow", {
    dataflowName: "aio-to-fabric",
    dataflowProfileName: "resource-name123",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        mode: azure_native.iotoperations.OperationalMode.Enabled,
        operations: [thermostatSource, parquetSerialization, fabricDestination],
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native
# Source operation: thermostat topic on the built-in broker endpoint.
source_operation = {
    "name": "source1",
    "operation_type": azure_native.iotoperations.OperationType.SOURCE,
    "source_settings": {
        "data_sources": ["azure-iot-operations/data/thermostat"],
        "endpoint_ref": "aio-builtin-broker-endpoint",
    },
}

# Built-in transformation: Parquet serialization against a schema referenced by schema_ref.
parquet_operation = {
    "built_in_transformation_settings": {
        "schema_ref": "aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0",
        "serialization_format": azure_native.iotoperations.TransformationSerializationFormat.PARQUET,
    },
    "operation_type": azure_native.iotoperations.OperationType.BUILT_IN_TRANSFORMATION,
}

# Destination operation: "telemetryTable" on the Fabric endpoint.
destination_operation = {
    "destination_settings": {
        "data_destination": "telemetryTable",
        "endpoint_ref": "fabric-endpoint",
    },
    "name": "destination1",
    "operation_type": azure_native.iotoperations.OperationType.DESTINATION,
}

# Dataflow resource wiring the three operations together in order.
dataflow = azure_native.iotoperations.Dataflow(
    "dataflow",
    dataflow_name="aio-to-fabric",
    dataflow_profile_name="resource-name123",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "mode": azure_native.iotoperations.OperationalMode.ENABLED,
        "operations": [
            source_operation,
            parquet_operation,
            destination_operation,
        ],
    },
    resource_group_name="rgiotoperations",
)
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main declares the "aio-to-fabric" dataflow: a source operation, a built-in
// transformation that sets Parquet serialization with a schema reference, and
// a destination operation.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Source operation: thermostat topic on the built-in broker endpoint.
		sourceOp := &iotoperations.DataflowOperationArgs{
			Name:          pulumi.String("source1"),
			OperationType: pulumi.String(iotoperations.OperationTypeSource),
			SourceSettings: &iotoperations.DataflowSourceOperationSettingsArgs{
				DataSources: pulumi.StringArray{
					pulumi.String("azure-iot-operations/data/thermostat"),
				},
				EndpointRef: pulumi.String("aio-builtin-broker-endpoint"),
			},
		}

		// Built-in transformation: serialization format and schema reference only.
		parquetOp := &iotoperations.DataflowOperationArgs{
			BuiltInTransformationSettings: &iotoperations.DataflowBuiltInTransformationSettingsArgs{
				SchemaRef:           pulumi.String("aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0"),
				SerializationFormat: pulumi.String(iotoperations.TransformationSerializationFormatParquet),
			},
			OperationType: pulumi.String(iotoperations.OperationTypeBuiltInTransformation),
		}

		// Destination operation: "telemetryTable" on the Fabric endpoint.
		destinationOp := &iotoperations.DataflowOperationArgs{
			DestinationSettings: &iotoperations.DataflowDestinationOperationSettingsArgs{
				DataDestination: pulumi.String("telemetryTable"),
				EndpointRef:     pulumi.String("fabric-endpoint"),
			},
			Name:          pulumi.String("destination1"),
			OperationType: pulumi.String(iotoperations.OperationTypeDestination),
		}

		// Register the resource; any creation error is returned to pulumi.Run.
		_, err := iotoperations.NewDataflow(ctx, "dataflow", &iotoperations.DataflowArgs{
			DataflowName:        pulumi.String("aio-to-fabric"),
			DataflowProfileName: pulumi.String("resource-name123"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowPropertiesArgs{
				Mode: pulumi.String(iotoperations.OperationalModeEnabled),
				Operations: iotoperations.DataflowOperationArray{
					sourceOp,
					parquetOp,
					destinationOp,
				},
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		return err
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;
// Declares the "aio-to-fabric" dataflow: a source operation, a built-in
// transformation that sets Parquet serialization with a schema reference, and
// a destination operation.
return await Deployment.RunAsync(() =>
{
    // Source operation: thermostat topic on the built-in broker endpoint.
    var sourceOperation = new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
    {
        Name = "source1",
        OperationType = AzureNative.IoTOperations.OperationType.Source,
        SourceSettings = new AzureNative.IoTOperations.Inputs.DataflowSourceOperationSettingsArgs
        {
            DataSources = new[]
            {
                "azure-iot-operations/data/thermostat",
            },
            EndpointRef = "aio-builtin-broker-endpoint",
        },
    };

    // Built-in transformation: serialization format and schema reference only.
    var parquetOperation = new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
    {
        BuiltInTransformationSettings = new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationSettingsArgs
        {
            SchemaRef = "aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0",
            SerializationFormat = AzureNative.IoTOperations.TransformationSerializationFormat.Parquet,
        },
        OperationType = AzureNative.IoTOperations.OperationType.BuiltInTransformation,
    };

    // Destination operation: "telemetryTable" on the Fabric endpoint.
    var destinationOperation = new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
    {
        DestinationSettings = new AzureNative.IoTOperations.Inputs.DataflowDestinationOperationSettingsArgs
        {
            DataDestination = "telemetryTable",
            EndpointRef = "fabric-endpoint",
        },
        Name = "destination1",
        OperationType = AzureNative.IoTOperations.OperationType.Destination,
    };

    // Dataflow resource wiring the three operations together in order.
    var dataflow = new AzureNative.IoTOperations.Dataflow("dataflow", new()
    {
        DataflowName = "aio-to-fabric",
        DataflowProfileName = "resource-name123",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowPropertiesArgs
        {
            Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
            Operations = new[]
            {
                sourceOperation,
                parquetOperation,
                destinationOperation,
            },
        },
        ResourceGroupName = "rgiotoperations",
    });
});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.Dataflow;
import com.pulumi.azurenative.iotoperations.DataflowArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowPropertiesArgs;
// The nested argument builders used below live in the same `inputs` package;
// without these imports the program does not compile.
import com.pulumi.azurenative.iotoperations.inputs.DataflowOperationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowSourceOperationSettingsArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowBuiltInTransformationSettingsArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowDestinationOperationSettingsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * Declares the "aio-to-fabric" dataflow: a source operation, a built-in
 * transformation that sets Parquet serialization with a schema reference, and
 * a destination operation.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Registers the Dataflow resource with the Pulumi engine.
     *
     * @param ctx the Pulumi stack context supplied by {@code Pulumi.run}
     */
    public static void stack(Context ctx) {
        var dataflow = new Dataflow("dataflow", DataflowArgs.builder()
            .dataflowName("aio-to-fabric")
            .dataflowProfileName("resource-name123")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowPropertiesArgs.builder()
                .mode("Enabled")
                .operations(
                    // Source operation: thermostat topic on the built-in broker endpoint.
                    DataflowOperationArgs.builder()
                        .name("source1")
                        .operationType("Source")
                        .sourceSettings(DataflowSourceOperationSettingsArgs.builder()
                            .dataSources("azure-iot-operations/data/thermostat")
                            .endpointRef("aio-builtin-broker-endpoint")
                            .build())
                        .build(),
                    // Built-in transformation: serialization format and schema reference only.
                    DataflowOperationArgs.builder()
                        .builtInTransformationSettings(DataflowBuiltInTransformationSettingsArgs.builder()
                            .schemaRef("aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0")
                            .serializationFormat("Parquet")
                            .build())
                        .operationType("BuiltInTransformation")
                        .build(),
                    // Destination operation: "telemetryTable" on the Fabric endpoint.
                    DataflowOperationArgs.builder()
                        .destinationSettings(DataflowDestinationOperationSettingsArgs.builder()
                            .dataDestination("telemetryTable")
                            .endpointRef("fabric-endpoint")
                            .build())
                        .name("destination1")
                        .operationType("Destination")
                        .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());
    }
}
# Pulumi YAML program declaring the "aio-to-fabric" dataflow.
resources:
  dataflow:
    type: azure-native:iotoperations:Dataflow
    properties:
      dataflowName: aio-to-fabric
      dataflowProfileName: resource-name123
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        mode: Enabled
        operations:
          # Source operation: thermostat topic on the built-in broker endpoint.
          - name: source1
            operationType: Source
            sourceSettings:
              dataSources:
                - azure-iot-operations/data/thermostat
              endpointRef: aio-builtin-broker-endpoint
          # Built-in transformation: serialization format and schema reference only.
          - builtInTransformationSettings:
              schemaRef: aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0
              serializationFormat: Parquet
            operationType: BuiltInTransformation
          # Destination operation: "telemetryTable" on the Fabric endpoint.
          - destinationSettings:
              dataDestination: telemetryTable
              endpointRef: fabric-endpoint
            name: destination1
            operationType: Destination
      resourceGroupName: rgiotoperations
The builtInTransformationSettings block specifies serializationFormat as Parquet and references a schema via schemaRef (using the aio-sr:// URI scheme). The transformation operation converts JSON telemetry to Parquet according to the schema before the destination operation writes to Fabric. This pattern enables efficient columnar storage for analytics.
Beyond these examples
These snippets focus on specific dataflow-level features: MQTT-to-cloud routing patterns, filtering and transformation expressions, and contextual enrichment and format conversion. They’re intentionally minimal rather than full IoT data pipelines.
The examples reference pre-existing infrastructure such as DataflowProfile parent resources, DataflowEndpoint resources for MQTT brokers and cloud services, Azure IoT Operations instances with Custom Locations, and schema registry entries for Parquet serialization. They focus on configuring the dataflow pipeline rather than provisioning the surrounding infrastructure.
To keep things focused, common dataflow patterns are omitted, including:
- Operational mode configuration (mode property always ‘Enabled’ in examples)
- Asset references (assetRef in sourceSettings)
- Error handling and retry policies
- Batching and performance tuning
- Authentication and authorization configuration
- Monitoring and observability setup
These omissions are intentional: the goal is to illustrate how each dataflow operation is wired, not provide drop-in IoT solutions. See the Dataflow resource reference for all available configuration options.
Let's configure Azure IoT Operations Dataflows
Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.
Try Pulumi Cloud for FREE
Frequently Asked Questions
Resource Configuration & Immutability
The extendedLocation, dataflowName, dataflowProfileName, instanceName, and resourceGroupName properties are immutable and cannot be changed after creation.
To target a specific API version, generate a local SDK with pulumi package add azure-native iotoperations [ApiVersion]. Available versions include 2024-08-15-preview through 2025-10-01.
Operations & Pipeline Structure
You can use three operation types in the operations array:
- Source - Reads data from endpoints using sourceSettings
- BuiltInTransformation - Transforms data using builtInTransformationSettings
- Destination - Writes data to endpoints using destinationSettings
Use endpointRef in sourceSettings or destinationSettings to reference endpoint names like aio-builtin-broker-endpoint or event-hub-endpoint.
Data Transformation
Use builtInTransformationSettings.filter with an expression (like $1 > 9000 && $2 >= 8000) and an inputs array specifying the fields to evaluate.
PassThrough copies data unchanged (inputs: ["*"], output: "*"), while NewProperties creates new fields using expressions to transform input values.
Define a dataset whose inputs use $source.field and $context.field, then reference the dataset with $context(datasetKey).* in map operations.
Expressions support arithmetic (($1+$2)/2), comparison ($1 > 9000, >=), logical (&&), and custom functions (cToF($1), scale($1,0,10,0,100)).
Serialization & Schemas
The serialization formats are Json (source data), Delta (transformation output), and Parquet (for destinations like Fabric).
Reference a schema using schemaRef with the aio-sr:// format: aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0.
Data Sources & Patterns
Use + for single-level wildcards and # for multi-level wildcards in dataSources, like thermostats/+/telemetry/temperature/#.
Using a different cloud?
Explore integration guides for other cloud providers: