The azure-native:iotoperations:Dataflow resource, part of the Pulumi Azure Native provider, defines data movement pipelines within Azure IoT Operations. It orchestrates how telemetry flows from source endpoints through transformation logic to destination endpoints. This guide focuses on three capabilities: MQTT-to-cloud routing patterns, built-in transformation operations, and format conversion for analytics destinations.
Dataflows belong to a DataflowProfile and reference DataflowEndpoint resources that define connectivity to MQTT brokers, Event Grid, Event Hub, Azure Data Explorer, and Microsoft Fabric. The examples are intentionally small. Combine them with your own endpoint configurations and IoT Operations infrastructure.
Route MQTT telemetry to Event Grid topics
IoT deployments often route device telemetry from MQTT brokers to Azure Event Grid for event-driven processing.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";
const dataflow = new azure_native.iotoperations.Dataflow("dataflow", {
dataflowName: "aio-to-event-grid",
dataflowProfileName: "resource-name123",
extendedLocation: {
name: "qmbrfwcpwwhggszhrdjv",
type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
},
instanceName: "resource-name123",
properties: {
mode: azure_native.iotoperations.OperationalMode.Enabled,
operations: [
{
name: "source1",
operationType: azure_native.iotoperations.OperationType.Source,
sourceSettings: {
dataSources: ["thermostats/+/telemetry/temperature/#"],
endpointRef: "aio-builtin-broker-endpoint",
},
},
{
destinationSettings: {
dataDestination: "factory/telemetry",
endpointRef: "event-grid-endpoint",
},
name: "destination1",
operationType: azure_native.iotoperations.OperationType.Destination,
},
],
},
resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native
dataflow = azure_native.iotoperations.Dataflow("dataflow",
dataflow_name="aio-to-event-grid",
dataflow_profile_name="resource-name123",
extended_location={
"name": "qmbrfwcpwwhggszhrdjv",
"type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
},
instance_name="resource-name123",
properties={
"mode": azure_native.iotoperations.OperationalMode.ENABLED,
"operations": [
{
"name": "source1",
"operation_type": azure_native.iotoperations.OperationType.SOURCE,
"source_settings": {
"data_sources": ["thermostats/+/telemetry/temperature/#"],
"endpoint_ref": "aio-builtin-broker-endpoint",
},
},
{
"destination_settings": {
"data_destination": "factory/telemetry",
"endpoint_ref": "event-grid-endpoint",
},
"name": "destination1",
"operation_type": azure_native.iotoperations.OperationType.DESTINATION,
},
],
},
resource_group_name="rgiotoperations")
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := iotoperations.NewDataflow(ctx, "dataflow", &iotoperations.DataflowArgs{
DataflowName: pulumi.String("aio-to-event-grid"),
DataflowProfileName: pulumi.String("resource-name123"),
ExtendedLocation: &iotoperations.ExtendedLocationArgs{
Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
},
InstanceName: pulumi.String("resource-name123"),
Properties: &iotoperations.DataflowPropertiesArgs{
Mode: pulumi.String(iotoperations.OperationalModeEnabled),
Operations: iotoperations.DataflowOperationArray{
&iotoperations.DataflowOperationArgs{
Name: pulumi.String("source1"),
OperationType: pulumi.String(iotoperations.OperationTypeSource),
SourceSettings: &iotoperations.DataflowSourceOperationSettingsArgs{
DataSources: pulumi.StringArray{
pulumi.String("thermostats/+/telemetry/temperature/#"),
},
EndpointRef: pulumi.String("aio-builtin-broker-endpoint"),
},
},
&iotoperations.DataflowOperationArgs{
DestinationSettings: &iotoperations.DataflowDestinationOperationSettingsArgs{
DataDestination: pulumi.String("factory/telemetry"),
EndpointRef: pulumi.String("event-grid-endpoint"),
},
Name: pulumi.String("destination1"),
OperationType: pulumi.String(iotoperations.OperationTypeDestination),
},
},
},
ResourceGroupName: pulumi.String("rgiotoperations"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;
return await Deployment.RunAsync(() =>
{
var dataflow = new AzureNative.IoTOperations.Dataflow("dataflow", new()
{
DataflowName = "aio-to-event-grid",
DataflowProfileName = "resource-name123",
ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
{
Name = "qmbrfwcpwwhggszhrdjv",
Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
},
InstanceName = "resource-name123",
Properties = new AzureNative.IoTOperations.Inputs.DataflowPropertiesArgs
{
Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
Operations = new[]
{
new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
{
Name = "source1",
OperationType = AzureNative.IoTOperations.OperationType.Source,
SourceSettings = new AzureNative.IoTOperations.Inputs.DataflowSourceOperationSettingsArgs
{
DataSources = new[]
{
"thermostats/+/telemetry/temperature/#",
},
EndpointRef = "aio-builtin-broker-endpoint",
},
},
new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
{
DestinationSettings = new AzureNative.IoTOperations.Inputs.DataflowDestinationOperationSettingsArgs
{
DataDestination = "factory/telemetry",
EndpointRef = "event-grid-endpoint",
},
Name = "destination1",
OperationType = AzureNative.IoTOperations.OperationType.Destination,
},
},
},
ResourceGroupName = "rgiotoperations",
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.Dataflow;
import com.pulumi.azurenative.iotoperations.DataflowArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var dataflow = new Dataflow("dataflow", DataflowArgs.builder()
.dataflowName("aio-to-event-grid")
.dataflowProfileName("resource-name123")
.extendedLocation(ExtendedLocationArgs.builder()
.name("qmbrfwcpwwhggszhrdjv")
.type("CustomLocation")
.build())
.instanceName("resource-name123")
.properties(DataflowPropertiesArgs.builder()
.mode("Enabled")
.operations(
DataflowOperationArgs.builder()
.name("source1")
.operationType("Source")
.sourceSettings(DataflowSourceOperationSettingsArgs.builder()
.dataSources("thermostats/+/telemetry/temperature/#")
.endpointRef("aio-builtin-broker-endpoint")
.build())
.build(),
DataflowOperationArgs.builder()
.destinationSettings(DataflowDestinationOperationSettingsArgs.builder()
.dataDestination("factory/telemetry")
.endpointRef("event-grid-endpoint")
.build())
.name("destination1")
.operationType("Destination")
.build())
.build())
.resourceGroupName("rgiotoperations")
.build());
}
}
resources:
dataflow:
type: azure-native:iotoperations:Dataflow
properties:
dataflowName: aio-to-event-grid
dataflowProfileName: resource-name123
extendedLocation:
name: qmbrfwcpwwhggszhrdjv
type: CustomLocation
instanceName: resource-name123
properties:
mode: Enabled
operations:
- name: source1
operationType: Source
sourceSettings:
dataSources:
- thermostats/+/telemetry/temperature/#
endpointRef: aio-builtin-broker-endpoint
- destinationSettings:
dataDestination: factory/telemetry
endpointRef: event-grid-endpoint
name: destination1
operationType: Destination
resourceGroupName: rgiotoperations
The operations array defines a pipeline: the source operation reads from MQTT topics matching the wildcard pattern, and the destination operation writes to an Event Grid topic. The endpointRef properties point to DataflowEndpoint resources that handle authentication and connectivity. The dataDestination specifies the Event Grid topic name where messages land.
Filter telemetry and republish to MQTT topics
Applications filter incoming telemetry based on threshold conditions before routing to specialized MQTT topics.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";
const dataflow = new azure_native.iotoperations.Dataflow("dataflow", {
dataflowName: "mqtt-filter-to-topic",
dataflowProfileName: "resource-name123",
extendedLocation: {
name: "qmbrfwcpwwhggszhrdjv",
type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
},
instanceName: "resource-name123",
properties: {
mode: azure_native.iotoperations.OperationalMode.Enabled,
operations: [
{
name: "source1",
operationType: azure_native.iotoperations.OperationType.Source,
sourceSettings: {
dataSources: ["azure-iot-operations/data/thermostat"],
endpointRef: "aio-builtin-broker-endpoint",
},
},
{
builtInTransformationSettings: {
filter: [{
description: "filter-datapoint",
expression: "$1 > 9000 && $2 >= 8000",
inputs: [
"temperature.Value",
"\"Tag 10\".Value",
],
type: azure_native.iotoperations.FilterType.Filter,
}],
map: [{
inputs: ["*"],
output: "*",
type: azure_native.iotoperations.DataflowMappingType.PassThrough,
}],
},
name: "transformation1",
operationType: azure_native.iotoperations.OperationType.BuiltInTransformation,
},
{
destinationSettings: {
dataDestination: "data/filtered/thermostat",
endpointRef: "aio-builtin-broker-endpoint",
},
name: "destination1",
operationType: azure_native.iotoperations.OperationType.Destination,
},
],
},
resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native
dataflow = azure_native.iotoperations.Dataflow("dataflow",
dataflow_name="mqtt-filter-to-topic",
dataflow_profile_name="resource-name123",
extended_location={
"name": "qmbrfwcpwwhggszhrdjv",
"type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
},
instance_name="resource-name123",
properties={
"mode": azure_native.iotoperations.OperationalMode.ENABLED,
"operations": [
{
"name": "source1",
"operation_type": azure_native.iotoperations.OperationType.SOURCE,
"source_settings": {
"data_sources": ["azure-iot-operations/data/thermostat"],
"endpoint_ref": "aio-builtin-broker-endpoint",
},
},
{
"built_in_transformation_settings": {
"filter": [{
"description": "filter-datapoint",
"expression": "$1 > 9000 && $2 >= 8000",
"inputs": [
"temperature.Value",
"\"Tag 10\".Value",
],
"type": azure_native.iotoperations.FilterType.FILTER,
}],
"map": [{
"inputs": ["*"],
"output": "*",
"type": azure_native.iotoperations.DataflowMappingType.PASS_THROUGH,
}],
},
"name": "transformation1",
"operation_type": azure_native.iotoperations.OperationType.BUILT_IN_TRANSFORMATION,
},
{
"destination_settings": {
"data_destination": "data/filtered/thermostat",
"endpoint_ref": "aio-builtin-broker-endpoint",
},
"name": "destination1",
"operation_type": azure_native.iotoperations.OperationType.DESTINATION,
},
],
},
resource_group_name="rgiotoperations")
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := iotoperations.NewDataflow(ctx, "dataflow", &iotoperations.DataflowArgs{
DataflowName: pulumi.String("mqtt-filter-to-topic"),
DataflowProfileName: pulumi.String("resource-name123"),
ExtendedLocation: &iotoperations.ExtendedLocationArgs{
Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
},
InstanceName: pulumi.String("resource-name123"),
Properties: &iotoperations.DataflowPropertiesArgs{
Mode: pulumi.String(iotoperations.OperationalModeEnabled),
Operations: iotoperations.DataflowOperationArray{
&iotoperations.DataflowOperationArgs{
Name: pulumi.String("source1"),
OperationType: pulumi.String(iotoperations.OperationTypeSource),
SourceSettings: &iotoperations.DataflowSourceOperationSettingsArgs{
DataSources: pulumi.StringArray{
pulumi.String("azure-iot-operations/data/thermostat"),
},
EndpointRef: pulumi.String("aio-builtin-broker-endpoint"),
},
},
&iotoperations.DataflowOperationArgs{
BuiltInTransformationSettings: &iotoperations.DataflowBuiltInTransformationSettingsArgs{
Filter: iotoperations.DataflowBuiltInTransformationFilterArray{
&iotoperations.DataflowBuiltInTransformationFilterArgs{
Description: pulumi.String("filter-datapoint"),
Expression: pulumi.String("$1 > 9000 && $2 >= 8000"),
Inputs: pulumi.StringArray{
pulumi.String("temperature.Value"),
pulumi.String("\"Tag 10\".Value"),
},
Type: pulumi.String(iotoperations.FilterTypeFilter),
},
},
Map: iotoperations.DataflowBuiltInTransformationMapArray{
&iotoperations.DataflowBuiltInTransformationMapArgs{
Inputs: pulumi.StringArray{
pulumi.String("*"),
},
Output: pulumi.String("*"),
Type: pulumi.String(iotoperations.DataflowMappingTypePassThrough),
},
},
},
Name: pulumi.String("transformation1"),
OperationType: pulumi.String(iotoperations.OperationTypeBuiltInTransformation),
},
&iotoperations.DataflowOperationArgs{
DestinationSettings: &iotoperations.DataflowDestinationOperationSettingsArgs{
DataDestination: pulumi.String("data/filtered/thermostat"),
EndpointRef: pulumi.String("aio-builtin-broker-endpoint"),
},
Name: pulumi.String("destination1"),
OperationType: pulumi.String(iotoperations.OperationTypeDestination),
},
},
},
ResourceGroupName: pulumi.String("rgiotoperations"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;
return await Deployment.RunAsync(() =>
{
var dataflow = new AzureNative.IoTOperations.Dataflow("dataflow", new()
{
DataflowName = "mqtt-filter-to-topic",
DataflowProfileName = "resource-name123",
ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
{
Name = "qmbrfwcpwwhggszhrdjv",
Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
},
InstanceName = "resource-name123",
Properties = new AzureNative.IoTOperations.Inputs.DataflowPropertiesArgs
{
Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
Operations = new[]
{
new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
{
Name = "source1",
OperationType = AzureNative.IoTOperations.OperationType.Source,
SourceSettings = new AzureNative.IoTOperations.Inputs.DataflowSourceOperationSettingsArgs
{
DataSources = new[]
{
"azure-iot-operations/data/thermostat",
},
EndpointRef = "aio-builtin-broker-endpoint",
},
},
new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
{
BuiltInTransformationSettings = new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationSettingsArgs
{
Filter = new[]
{
new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationFilterArgs
{
Description = "filter-datapoint",
Expression = "$1 > 9000 && $2 >= 8000",
Inputs = new[]
{
"temperature.Value",
"\"Tag 10\".Value",
},
Type = AzureNative.IoTOperations.FilterType.Filter,
},
},
Map = new[]
{
new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
{
Inputs = new[]
{
"*",
},
Output = "*",
Type = AzureNative.IoTOperations.DataflowMappingType.PassThrough,
},
},
},
Name = "transformation1",
OperationType = AzureNative.IoTOperations.OperationType.BuiltInTransformation,
},
new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
{
DestinationSettings = new AzureNative.IoTOperations.Inputs.DataflowDestinationOperationSettingsArgs
{
DataDestination = "data/filtered/thermostat",
EndpointRef = "aio-builtin-broker-endpoint",
},
Name = "destination1",
OperationType = AzureNative.IoTOperations.OperationType.Destination,
},
},
},
ResourceGroupName = "rgiotoperations",
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.Dataflow;
import com.pulumi.azurenative.iotoperations.DataflowArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var dataflow = new Dataflow("dataflow", DataflowArgs.builder()
.dataflowName("mqtt-filter-to-topic")
.dataflowProfileName("resource-name123")
.extendedLocation(ExtendedLocationArgs.builder()
.name("qmbrfwcpwwhggszhrdjv")
.type("CustomLocation")
.build())
.instanceName("resource-name123")
.properties(DataflowPropertiesArgs.builder()
.mode("Enabled")
.operations(
DataflowOperationArgs.builder()
.name("source1")
.operationType("Source")
.sourceSettings(DataflowSourceOperationSettingsArgs.builder()
.dataSources("azure-iot-operations/data/thermostat")
.endpointRef("aio-builtin-broker-endpoint")
.build())
.build(),
DataflowOperationArgs.builder()
.builtInTransformationSettings(DataflowBuiltInTransformationSettingsArgs.builder()
.filter(DataflowBuiltInTransformationFilterArgs.builder()
.description("filter-datapoint")
.expression("$1 > 9000 && $2 >= 8000")
.inputs(
"temperature.Value",
"\"Tag 10\".Value")
.type("Filter")
.build())
.map(DataflowBuiltInTransformationMapArgs.builder()
.inputs("*")
.output("*")
.type("PassThrough")
.build())
.build())
.name("transformation1")
.operationType("BuiltInTransformation")
.build(),
DataflowOperationArgs.builder()
.destinationSettings(DataflowDestinationOperationSettingsArgs.builder()
.dataDestination("data/filtered/thermostat")
.endpointRef("aio-builtin-broker-endpoint")
.build())
.name("destination1")
.operationType("Destination")
.build())
.build())
.resourceGroupName("rgiotoperations")
.build());
}
}
resources:
dataflow:
type: azure-native:iotoperations:Dataflow
properties:
dataflowName: mqtt-filter-to-topic
dataflowProfileName: resource-name123
extendedLocation:
name: qmbrfwcpwwhggszhrdjv
type: CustomLocation
instanceName: resource-name123
properties:
mode: Enabled
operations:
- name: source1
operationType: Source
sourceSettings:
dataSources:
- azure-iot-operations/data/thermostat
endpointRef: aio-builtin-broker-endpoint
- builtInTransformationSettings:
filter:
- description: filter-datapoint
expression: $1 > 9000 && $2 >= 8000
inputs:
- temperature.Value
- '"Tag 10".Value'
type: Filter
map:
- inputs:
- '*'
output: '*'
type: PassThrough
name: transformation1
operationType: BuiltInTransformation
- destinationSettings:
dataDestination: data/filtered/thermostat
endpointRef: aio-builtin-broker-endpoint
name: destination1
operationType: Destination
resourceGroupName: rgiotoperations
The builtInTransformationSettings block sits between source and destination operations. The filter array evaluates expressions against input fields; only messages where both temperature.Value exceeds 9000 and “Tag 10”.Value exceeds 8000 pass through. The map array with PassThrough type forwards all fields unchanged. Filtered messages republish to a different MQTT topic on the same broker.
Transform telemetry with expressions and send to Event Hub
Complex pipelines combine filtering, field mapping, and custom expressions to reshape telemetry for Event Hub analytics.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";
const dataflow = new azure_native.iotoperations.Dataflow("dataflow", {
dataflowName: "aio-to-event-hub-transformed",
dataflowProfileName: "resource-name123",
extendedLocation: {
name: "qmbrfwcpwwhggszhrdjv",
type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
},
instanceName: "resource-name123",
properties: {
mode: azure_native.iotoperations.OperationalMode.Enabled,
operations: [
{
name: "source1",
operationType: azure_native.iotoperations.OperationType.Source,
sourceSettings: {
dataSources: ["azure-iot-operations/data/thermostat"],
endpointRef: "aio-builtin-broker-endpoint",
},
},
{
builtInTransformationSettings: {
filter: [{
expression: "$1 > 9000 && $2 >= 8000",
inputs: [
"temperature.Value",
"\"Tag 10\".Value",
],
}],
map: [
{
inputs: ["*"],
output: "*",
},
{
expression: "($1+$2)/2",
inputs: [
"temperature.Value",
"\"Tag 10\".Value",
],
output: "AvgTemp.Value",
},
{
expression: "true",
inputs: [],
output: "dataflow-processed",
},
{
expression: "",
inputs: ["temperature.SourceTimestamp"],
output: "",
},
{
expression: "",
inputs: ["\"Tag 10\""],
output: "pressure",
},
{
expression: "cToF($1)",
inputs: ["temperature.Value"],
output: "temperatureF.Value",
},
{
expression: "scale ($1,0,10,0,100)",
inputs: ["\"Tag 10\".Value"],
output: "\"Scale Tag 10\".Value",
},
],
},
operationType: azure_native.iotoperations.OperationType.BuiltInTransformation,
},
{
destinationSettings: {
dataDestination: "myuniqueeventhub",
endpointRef: "event-hub-endpoint",
},
name: "destination1",
operationType: azure_native.iotoperations.OperationType.Destination,
},
],
},
resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native
dataflow = azure_native.iotoperations.Dataflow("dataflow",
dataflow_name="aio-to-event-hub-transformed",
dataflow_profile_name="resource-name123",
extended_location={
"name": "qmbrfwcpwwhggszhrdjv",
"type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
},
instance_name="resource-name123",
properties={
"mode": azure_native.iotoperations.OperationalMode.ENABLED,
"operations": [
{
"name": "source1",
"operation_type": azure_native.iotoperations.OperationType.SOURCE,
"source_settings": {
"data_sources": ["azure-iot-operations/data/thermostat"],
"endpoint_ref": "aio-builtin-broker-endpoint",
},
},
{
"built_in_transformation_settings": {
"filter": [{
"expression": "$1 > 9000 && $2 >= 8000",
"inputs": [
"temperature.Value",
"\"Tag 10\".Value",
],
}],
"map": [
{
"inputs": ["*"],
"output": "*",
},
{
"expression": "($1+$2)/2",
"inputs": [
"temperature.Value",
"\"Tag 10\".Value",
],
"output": "AvgTemp.Value",
},
{
"expression": "true",
"inputs": [],
"output": "dataflow-processed",
},
{
"expression": "",
"inputs": ["temperature.SourceTimestamp"],
"output": "",
},
{
"expression": "",
"inputs": ["\"Tag 10\""],
"output": "pressure",
},
{
"expression": "cToF($1)",
"inputs": ["temperature.Value"],
"output": "temperatureF.Value",
},
{
"expression": "scale ($1,0,10,0,100)",
"inputs": ["\"Tag 10\".Value"],
"output": "\"Scale Tag 10\".Value",
},
],
},
"operation_type": azure_native.iotoperations.OperationType.BUILT_IN_TRANSFORMATION,
},
{
"destination_settings": {
"data_destination": "myuniqueeventhub",
"endpoint_ref": "event-hub-endpoint",
},
"name": "destination1",
"operation_type": azure_native.iotoperations.OperationType.DESTINATION,
},
],
},
resource_group_name="rgiotoperations")
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := iotoperations.NewDataflow(ctx, "dataflow", &iotoperations.DataflowArgs{
DataflowName: pulumi.String("aio-to-event-hub-transformed"),
DataflowProfileName: pulumi.String("resource-name123"),
ExtendedLocation: &iotoperations.ExtendedLocationArgs{
Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
},
InstanceName: pulumi.String("resource-name123"),
Properties: &iotoperations.DataflowPropertiesArgs{
Mode: pulumi.String(iotoperations.OperationalModeEnabled),
Operations: iotoperations.DataflowOperationArray{
&iotoperations.DataflowOperationArgs{
Name: pulumi.String("source1"),
OperationType: pulumi.String(iotoperations.OperationTypeSource),
SourceSettings: &iotoperations.DataflowSourceOperationSettingsArgs{
DataSources: pulumi.StringArray{
pulumi.String("azure-iot-operations/data/thermostat"),
},
EndpointRef: pulumi.String("aio-builtin-broker-endpoint"),
},
},
&iotoperations.DataflowOperationArgs{
BuiltInTransformationSettings: &iotoperations.DataflowBuiltInTransformationSettingsArgs{
Filter: iotoperations.DataflowBuiltInTransformationFilterArray{
&iotoperations.DataflowBuiltInTransformationFilterArgs{
Expression: pulumi.String("$1 > 9000 && $2 >= 8000"),
Inputs: pulumi.StringArray{
pulumi.String("temperature.Value"),
pulumi.String("\"Tag 10\".Value"),
},
},
},
Map: iotoperations.DataflowBuiltInTransformationMapArray{
&iotoperations.DataflowBuiltInTransformationMapArgs{
Inputs: pulumi.StringArray{
pulumi.String("*"),
},
Output: pulumi.String("*"),
},
&iotoperations.DataflowBuiltInTransformationMapArgs{
Expression: pulumi.String("($1+$2)/2"),
Inputs: pulumi.StringArray{
pulumi.String("temperature.Value"),
pulumi.String("\"Tag 10\".Value"),
},
Output: pulumi.String("AvgTemp.Value"),
},
&iotoperations.DataflowBuiltInTransformationMapArgs{
Expression: pulumi.String("true"),
Inputs: pulumi.StringArray{},
Output: pulumi.String("dataflow-processed"),
},
&iotoperations.DataflowBuiltInTransformationMapArgs{
Expression: pulumi.String(""),
Inputs: pulumi.StringArray{
pulumi.String("temperature.SourceTimestamp"),
},
Output: pulumi.String(""),
},
&iotoperations.DataflowBuiltInTransformationMapArgs{
Expression: pulumi.String(""),
Inputs: pulumi.StringArray{
pulumi.String("\"Tag 10\""),
},
Output: pulumi.String("pressure"),
},
&iotoperations.DataflowBuiltInTransformationMapArgs{
Expression: pulumi.String("cToF($1)"),
Inputs: pulumi.StringArray{
pulumi.String("temperature.Value"),
},
Output: pulumi.String("temperatureF.Value"),
},
&iotoperations.DataflowBuiltInTransformationMapArgs{
Expression: pulumi.String("scale ($1,0,10,0,100)"),
Inputs: pulumi.StringArray{
pulumi.String("\"Tag 10\".Value"),
},
Output: pulumi.String("\"Scale Tag 10\".Value"),
},
},
},
OperationType: pulumi.String(iotoperations.OperationTypeBuiltInTransformation),
},
&iotoperations.DataflowOperationArgs{
DestinationSettings: &iotoperations.DataflowDestinationOperationSettingsArgs{
DataDestination: pulumi.String("myuniqueeventhub"),
EndpointRef: pulumi.String("event-hub-endpoint"),
},
Name: pulumi.String("destination1"),
OperationType: pulumi.String(iotoperations.OperationTypeDestination),
},
},
},
ResourceGroupName: pulumi.String("rgiotoperations"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;
return await Deployment.RunAsync(() =>
{
var dataflow = new AzureNative.IoTOperations.Dataflow("dataflow", new()
{
DataflowName = "aio-to-event-hub-transformed",
DataflowProfileName = "resource-name123",
ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
{
Name = "qmbrfwcpwwhggszhrdjv",
Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
},
InstanceName = "resource-name123",
Properties = new AzureNative.IoTOperations.Inputs.DataflowPropertiesArgs
{
Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
Operations = new[]
{
new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
{
Name = "source1",
OperationType = AzureNative.IoTOperations.OperationType.Source,
SourceSettings = new AzureNative.IoTOperations.Inputs.DataflowSourceOperationSettingsArgs
{
DataSources = new[]
{
"azure-iot-operations/data/thermostat",
},
EndpointRef = "aio-builtin-broker-endpoint",
},
},
new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
{
BuiltInTransformationSettings = new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationSettingsArgs
{
Filter = new[]
{
new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationFilterArgs
{
Expression = "$1 > 9000 && $2 >= 8000",
Inputs = new[]
{
"temperature.Value",
"\"Tag 10\".Value",
},
},
},
Map = new[]
{
new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
{
Inputs = new[]
{
"*",
},
Output = "*",
},
new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
{
Expression = "($1+$2)/2",
Inputs = new[]
{
"temperature.Value",
"\"Tag 10\".Value",
},
Output = "AvgTemp.Value",
},
new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
{
Expression = "true",
Inputs = new() { },
Output = "dataflow-processed",
},
new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
{
Expression = "",
Inputs = new[]
{
"temperature.SourceTimestamp",
},
Output = "",
},
new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
{
Expression = "",
Inputs = new[]
{
"\"Tag 10\"",
},
Output = "pressure",
},
new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
{
Expression = "cToF($1)",
Inputs = new[]
{
"temperature.Value",
},
Output = "temperatureF.Value",
},
new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
{
Expression = "scale ($1,0,10,0,100)",
Inputs = new[]
{
"\"Tag 10\".Value",
},
Output = "\"Scale Tag 10\".Value",
},
},
},
OperationType = AzureNative.IoTOperations.OperationType.BuiltInTransformation,
},
new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
{
DestinationSettings = new AzureNative.IoTOperations.Inputs.DataflowDestinationOperationSettingsArgs
{
DataDestination = "myuniqueeventhub",
EndpointRef = "event-hub-endpoint",
},
Name = "destination1",
OperationType = AzureNative.IoTOperations.OperationType.Destination,
},
},
},
ResourceGroupName = "rgiotoperations",
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.Dataflow;
import com.pulumi.azurenative.iotoperations.DataflowArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var dataflow = new Dataflow("dataflow", DataflowArgs.builder()
.dataflowName("aio-to-event-hub-transformed")
.dataflowProfileName("resource-name123")
.extendedLocation(ExtendedLocationArgs.builder()
.name("qmbrfwcpwwhggszhrdjv")
.type("CustomLocation")
.build())
.instanceName("resource-name123")
.properties(DataflowPropertiesArgs.builder()
.mode("Enabled")
.operations(
DataflowOperationArgs.builder()
.name("source1")
.operationType("Source")
.sourceSettings(DataflowSourceOperationSettingsArgs.builder()
.dataSources("azure-iot-operations/data/thermostat")
.endpointRef("aio-builtin-broker-endpoint")
.build())
.build(),
DataflowOperationArgs.builder()
.builtInTransformationSettings(DataflowBuiltInTransformationSettingsArgs.builder()
.filter(DataflowBuiltInTransformationFilterArgs.builder()
.expression("$1 > 9000 && $2 >= 8000")
.inputs(
"temperature.Value",
"\"Tag 10\".Value")
.build())
.map(
DataflowBuiltInTransformationMapArgs.builder()
.inputs("*")
.output("*")
.build(),
DataflowBuiltInTransformationMapArgs.builder()
.expression("($1+$2)/2")
.inputs(
"temperature.Value",
"\"Tag 10\".Value")
.output("AvgTemp.Value")
.build(),
DataflowBuiltInTransformationMapArgs.builder()
.expression("true")
.inputs()
.output("dataflow-processed")
.build(),
DataflowBuiltInTransformationMapArgs.builder()
.expression("")
.inputs("temperature.SourceTimestamp")
.output("")
.build(),
DataflowBuiltInTransformationMapArgs.builder()
.expression("")
.inputs("\"Tag 10\"")
.output("pressure")
.build(),
DataflowBuiltInTransformationMapArgs.builder()
.expression("cToF($1)")
.inputs("temperature.Value")
.output("temperatureF.Value")
.build(),
DataflowBuiltInTransformationMapArgs.builder()
.expression("scale ($1,0,10,0,100)")
.inputs("\"Tag 10\".Value")
.output("\"Scale Tag 10\".Value")
.build())
.build())
.operationType("BuiltInTransformation")
.build(),
DataflowOperationArgs.builder()
.destinationSettings(DataflowDestinationOperationSettingsArgs.builder()
.dataDestination("myuniqueeventhub")
.endpointRef("event-hub-endpoint")
.build())
.name("destination1")
.operationType("Destination")
.build())
.build())
.resourceGroupName("rgiotoperations")
.build());
}
}
resources:
dataflow:
type: azure-native:iotoperations:Dataflow
properties:
dataflowName: aio-to-event-hub-transformed
dataflowProfileName: resource-name123
extendedLocation:
name: qmbrfwcpwwhggszhrdjv
type: CustomLocation
instanceName: resource-name123
properties:
mode: Enabled
operations:
- name: source1
operationType: Source
sourceSettings:
dataSources:
- azure-iot-operations/data/thermostat
endpointRef: aio-builtin-broker-endpoint
- builtInTransformationSettings:
filter:
- expression: $1 > 9000 && $2 >= 8000
inputs:
- temperature.Value
- '"Tag 10".Value'
map:
- inputs:
- '*'
output: '*'
- expression: ($1+$2)/2
inputs:
- temperature.Value
- '"Tag 10".Value'
output: AvgTemp.Value
- expression: 'true'
inputs: []
output: dataflow-processed
- expression: ""
inputs:
- temperature.SourceTimestamp
output: ""
- expression: ""
inputs:
- '"Tag 10"'
output: pressure
- expression: cToF($1)
inputs:
- temperature.Value
output: temperatureF.Value
- expression: scale ($1,0,10,0,100)
inputs:
- '"Tag 10".Value'
output: '"Scale Tag 10".Value'
operationType: BuiltInTransformation
- destinationSettings:
dataDestination: myuniqueeventhub
endpointRef: event-hub-endpoint
name: destination1
operationType: Destination
resourceGroupName: rgiotoperations
The transformation operation chains multiple map entries. The first passes all fields through with “*”. Subsequent entries calculate derived values: expression “($1+$2)/2” computes an average, “cToF($1)” converts Celsius to Fahrenheit, and “scale ($1,0,10,0,100)” normalizes values. Each map entry specifies inputs (field references) and output (target field name). The filter operation runs first, dropping messages that don’t meet threshold criteria.
Enrich telemetry with contextual data lookups
Industrial scenarios enrich telemetry by matching fields against reference datasets.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";
const dataflow = new azure_native.iotoperations.Dataflow("dataflow", {
dataflowName: "aio-to-adx-contexualized",
dataflowProfileName: "resource-name123",
extendedLocation: {
name: "qmbrfwcpwwhggszhrdjv",
type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
},
instanceName: "resource-name123",
properties: {
mode: azure_native.iotoperations.OperationalMode.Enabled,
operations: [
{
name: "source1",
operationType: azure_native.iotoperations.OperationType.Source,
sourceSettings: {
dataSources: ["azure-iot-operations/data/thermostat"],
endpointRef: "aio-builtin-broker-endpoint",
},
},
{
builtInTransformationSettings: {
datasets: [{
expression: "$1 == $2",
inputs: [
"$source.country",
"$context.country",
],
key: "quality",
}],
map: [
{
inputs: ["*"],
output: "*",
},
{
inputs: ["$context(quality).*"],
output: "enriched.*",
},
],
},
name: "transformation1",
operationType: azure_native.iotoperations.OperationType.BuiltInTransformation,
},
{
destinationSettings: {
dataDestination: "mytable",
endpointRef: "adx-endpoint",
},
name: "destination1",
operationType: azure_native.iotoperations.OperationType.Destination,
},
],
},
resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native
dataflow = azure_native.iotoperations.Dataflow("dataflow",
dataflow_name="aio-to-adx-contexualized",
dataflow_profile_name="resource-name123",
extended_location={
"name": "qmbrfwcpwwhggszhrdjv",
"type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
},
instance_name="resource-name123",
properties={
"mode": azure_native.iotoperations.OperationalMode.ENABLED,
"operations": [
{
"name": "source1",
"operation_type": azure_native.iotoperations.OperationType.SOURCE,
"source_settings": {
"data_sources": ["azure-iot-operations/data/thermostat"],
"endpoint_ref": "aio-builtin-broker-endpoint",
},
},
{
"built_in_transformation_settings": {
"datasets": [{
"expression": "$1 == $2",
"inputs": [
"$source.country",
"$context.country",
],
"key": "quality",
}],
"map": [
{
"inputs": ["*"],
"output": "*",
},
{
"inputs": ["$context(quality).*"],
"output": "enriched.*",
},
],
},
"name": "transformation1",
"operation_type": azure_native.iotoperations.OperationType.BUILT_IN_TRANSFORMATION,
},
{
"destination_settings": {
"data_destination": "mytable",
"endpoint_ref": "adx-endpoint",
},
"name": "destination1",
"operation_type": azure_native.iotoperations.OperationType.DESTINATION,
},
],
},
resource_group_name="rgiotoperations")
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := iotoperations.NewDataflow(ctx, "dataflow", &iotoperations.DataflowArgs{
DataflowName: pulumi.String("aio-to-adx-contexualized"),
DataflowProfileName: pulumi.String("resource-name123"),
ExtendedLocation: &iotoperations.ExtendedLocationArgs{
Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
},
InstanceName: pulumi.String("resource-name123"),
Properties: &iotoperations.DataflowPropertiesArgs{
Mode: pulumi.String(iotoperations.OperationalModeEnabled),
Operations: iotoperations.DataflowOperationArray{
&iotoperations.DataflowOperationArgs{
Name: pulumi.String("source1"),
OperationType: pulumi.String(iotoperations.OperationTypeSource),
SourceSettings: &iotoperations.DataflowSourceOperationSettingsArgs{
DataSources: pulumi.StringArray{
pulumi.String("azure-iot-operations/data/thermostat"),
},
EndpointRef: pulumi.String("aio-builtin-broker-endpoint"),
},
},
&iotoperations.DataflowOperationArgs{
BuiltInTransformationSettings: &iotoperations.DataflowBuiltInTransformationSettingsArgs{
Datasets: iotoperations.DataflowBuiltInTransformationDatasetArray{
&iotoperations.DataflowBuiltInTransformationDatasetArgs{
Expression: pulumi.String("$1 == $2"),
Inputs: pulumi.StringArray{
pulumi.String("$source.country"),
pulumi.String("$context.country"),
},
Key: pulumi.String("quality"),
},
},
Map: iotoperations.DataflowBuiltInTransformationMapArray{
&iotoperations.DataflowBuiltInTransformationMapArgs{
Inputs: pulumi.StringArray{
pulumi.String("*"),
},
Output: pulumi.String("*"),
},
&iotoperations.DataflowBuiltInTransformationMapArgs{
Inputs: pulumi.StringArray{
pulumi.String("$context(quality).*"),
},
Output: pulumi.String("enriched.*"),
},
},
},
Name: pulumi.String("transformation1"),
OperationType: pulumi.String(iotoperations.OperationTypeBuiltInTransformation),
},
&iotoperations.DataflowOperationArgs{
DestinationSettings: &iotoperations.DataflowDestinationOperationSettingsArgs{
DataDestination: pulumi.String("mytable"),
EndpointRef: pulumi.String("adx-endpoint"),
},
Name: pulumi.String("destination1"),
OperationType: pulumi.String(iotoperations.OperationTypeDestination),
},
},
},
ResourceGroupName: pulumi.String("rgiotoperations"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;
return await Deployment.RunAsync(() =>
{
var dataflow = new AzureNative.IoTOperations.Dataflow("dataflow", new()
{
DataflowName = "aio-to-adx-contexualized",
DataflowProfileName = "resource-name123",
ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
{
Name = "qmbrfwcpwwhggszhrdjv",
Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
},
InstanceName = "resource-name123",
Properties = new AzureNative.IoTOperations.Inputs.DataflowPropertiesArgs
{
Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
Operations = new[]
{
new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
{
Name = "source1",
OperationType = AzureNative.IoTOperations.OperationType.Source,
SourceSettings = new AzureNative.IoTOperations.Inputs.DataflowSourceOperationSettingsArgs
{
DataSources = new[]
{
"azure-iot-operations/data/thermostat",
},
EndpointRef = "aio-builtin-broker-endpoint",
},
},
new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
{
BuiltInTransformationSettings = new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationSettingsArgs
{
Datasets = new[]
{
new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationDatasetArgs
{
Expression = "$1 == $2",
Inputs = new[]
{
"$source.country",
"$context.country",
},
Key = "quality",
},
},
Map = new[]
{
new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
{
Inputs = new[]
{
"*",
},
Output = "*",
},
new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationMapArgs
{
Inputs = new[]
{
"$context(quality).*",
},
Output = "enriched.*",
},
},
},
Name = "transformation1",
OperationType = AzureNative.IoTOperations.OperationType.BuiltInTransformation,
},
new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
{
DestinationSettings = new AzureNative.IoTOperations.Inputs.DataflowDestinationOperationSettingsArgs
{
DataDestination = "mytable",
EndpointRef = "adx-endpoint",
},
Name = "destination1",
OperationType = AzureNative.IoTOperations.OperationType.Destination,
},
},
},
ResourceGroupName = "rgiotoperations",
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.Dataflow;
import com.pulumi.azurenative.iotoperations.DataflowArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var dataflow = new Dataflow("dataflow", DataflowArgs.builder()
.dataflowName("aio-to-adx-contexualized")
.dataflowProfileName("resource-name123")
.extendedLocation(ExtendedLocationArgs.builder()
.name("qmbrfwcpwwhggszhrdjv")
.type("CustomLocation")
.build())
.instanceName("resource-name123")
.properties(DataflowPropertiesArgs.builder()
.mode("Enabled")
.operations(
DataflowOperationArgs.builder()
.name("source1")
.operationType("Source")
.sourceSettings(DataflowSourceOperationSettingsArgs.builder()
.dataSources("azure-iot-operations/data/thermostat")
.endpointRef("aio-builtin-broker-endpoint")
.build())
.build(),
DataflowOperationArgs.builder()
.builtInTransformationSettings(DataflowBuiltInTransformationSettingsArgs.builder()
.datasets(DataflowBuiltInTransformationDatasetArgs.builder()
.expression("$1 == $2")
.inputs(
"$source.country",
"$context.country")
.key("quality")
.build())
.map(
DataflowBuiltInTransformationMapArgs.builder()
.inputs("*")
.output("*")
.build(),
DataflowBuiltInTransformationMapArgs.builder()
.inputs("$context(quality).*")
.output("enriched.*")
.build())
.build())
.name("transformation1")
.operationType("BuiltInTransformation")
.build(),
DataflowOperationArgs.builder()
.destinationSettings(DataflowDestinationOperationSettingsArgs.builder()
.dataDestination("mytable")
.endpointRef("adx-endpoint")
.build())
.name("destination1")
.operationType("Destination")
.build())
.build())
.resourceGroupName("rgiotoperations")
.build());
}
}
resources:
dataflow:
type: azure-native:iotoperations:Dataflow
properties:
dataflowName: aio-to-adx-contexualized
dataflowProfileName: resource-name123
extendedLocation:
name: qmbrfwcpwwhggszhrdjv
type: CustomLocation
instanceName: resource-name123
properties:
mode: Enabled
operations:
- name: source1
operationType: Source
sourceSettings:
dataSources:
- azure-iot-operations/data/thermostat
endpointRef: aio-builtin-broker-endpoint
- builtInTransformationSettings:
datasets:
- expression: $1 == $2
inputs:
- $source.country
- $context.country
key: quality
map:
- inputs:
- '*'
output: '*'
- inputs:
- $context(quality).*
output: enriched.*
name: transformation1
operationType: BuiltInTransformation
- destinationSettings:
dataDestination: mytable
endpointRef: adx-endpoint
name: destination1
operationType: Destination
resourceGroupName: rgiotoperations
The datasets array defines lookups: the expression “$1 == $2” compares $source.country from incoming telemetry against $context.country from a contextual dataset. Matches populate a “quality” key. The map operations then reference this dataset with “$context(quality).” to copy enrichment fields into the output under “enriched.”. This pattern joins streaming telemetry with static reference data before writing to Azure Data Explorer.
Convert telemetry to Parquet for Microsoft Fabric
Analytics workloads in Microsoft Fabric benefit from Parquet format for efficient columnar storage.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";
const dataflow = new azure_native.iotoperations.Dataflow("dataflow", {
dataflowName: "aio-to-fabric",
dataflowProfileName: "resource-name123",
extendedLocation: {
name: "qmbrfwcpwwhggszhrdjv",
type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
},
instanceName: "resource-name123",
properties: {
mode: azure_native.iotoperations.OperationalMode.Enabled,
operations: [
{
name: "source1",
operationType: azure_native.iotoperations.OperationType.Source,
sourceSettings: {
dataSources: ["azure-iot-operations/data/thermostat"],
endpointRef: "aio-builtin-broker-endpoint",
},
},
{
builtInTransformationSettings: {
schemaRef: "aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0",
serializationFormat: azure_native.iotoperations.TransformationSerializationFormat.Parquet,
},
operationType: azure_native.iotoperations.OperationType.BuiltInTransformation,
},
{
destinationSettings: {
dataDestination: "telemetryTable",
endpointRef: "fabric-endpoint",
},
name: "destination1",
operationType: azure_native.iotoperations.OperationType.Destination,
},
],
},
resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native
dataflow = azure_native.iotoperations.Dataflow("dataflow",
dataflow_name="aio-to-fabric",
dataflow_profile_name="resource-name123",
extended_location={
"name": "qmbrfwcpwwhggszhrdjv",
"type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
},
instance_name="resource-name123",
properties={
"mode": azure_native.iotoperations.OperationalMode.ENABLED,
"operations": [
{
"name": "source1",
"operation_type": azure_native.iotoperations.OperationType.SOURCE,
"source_settings": {
"data_sources": ["azure-iot-operations/data/thermostat"],
"endpoint_ref": "aio-builtin-broker-endpoint",
},
},
{
"built_in_transformation_settings": {
"schema_ref": "aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0",
"serialization_format": azure_native.iotoperations.TransformationSerializationFormat.PARQUET,
},
"operation_type": azure_native.iotoperations.OperationType.BUILT_IN_TRANSFORMATION,
},
{
"destination_settings": {
"data_destination": "telemetryTable",
"endpoint_ref": "fabric-endpoint",
},
"name": "destination1",
"operation_type": azure_native.iotoperations.OperationType.DESTINATION,
},
],
},
resource_group_name="rgiotoperations")
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := iotoperations.NewDataflow(ctx, "dataflow", &iotoperations.DataflowArgs{
DataflowName: pulumi.String("aio-to-fabric"),
DataflowProfileName: pulumi.String("resource-name123"),
ExtendedLocation: &iotoperations.ExtendedLocationArgs{
Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
},
InstanceName: pulumi.String("resource-name123"),
Properties: &iotoperations.DataflowPropertiesArgs{
Mode: pulumi.String(iotoperations.OperationalModeEnabled),
Operations: iotoperations.DataflowOperationArray{
&iotoperations.DataflowOperationArgs{
Name: pulumi.String("source1"),
OperationType: pulumi.String(iotoperations.OperationTypeSource),
SourceSettings: &iotoperations.DataflowSourceOperationSettingsArgs{
DataSources: pulumi.StringArray{
pulumi.String("azure-iot-operations/data/thermostat"),
},
EndpointRef: pulumi.String("aio-builtin-broker-endpoint"),
},
},
&iotoperations.DataflowOperationArgs{
BuiltInTransformationSettings: &iotoperations.DataflowBuiltInTransformationSettingsArgs{
SchemaRef: pulumi.String("aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0"),
SerializationFormat: pulumi.String(iotoperations.TransformationSerializationFormatParquet),
},
OperationType: pulumi.String(iotoperations.OperationTypeBuiltInTransformation),
},
&iotoperations.DataflowOperationArgs{
DestinationSettings: &iotoperations.DataflowDestinationOperationSettingsArgs{
DataDestination: pulumi.String("telemetryTable"),
EndpointRef: pulumi.String("fabric-endpoint"),
},
Name: pulumi.String("destination1"),
OperationType: pulumi.String(iotoperations.OperationTypeDestination),
},
},
},
ResourceGroupName: pulumi.String("rgiotoperations"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;
return await Deployment.RunAsync(() =>
{
var dataflow = new AzureNative.IoTOperations.Dataflow("dataflow", new()
{
DataflowName = "aio-to-fabric",
DataflowProfileName = "resource-name123",
ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
{
Name = "qmbrfwcpwwhggszhrdjv",
Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
},
InstanceName = "resource-name123",
Properties = new AzureNative.IoTOperations.Inputs.DataflowPropertiesArgs
{
Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
Operations = new[]
{
new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
{
Name = "source1",
OperationType = AzureNative.IoTOperations.OperationType.Source,
SourceSettings = new AzureNative.IoTOperations.Inputs.DataflowSourceOperationSettingsArgs
{
DataSources = new[]
{
"azure-iot-operations/data/thermostat",
},
EndpointRef = "aio-builtin-broker-endpoint",
},
},
new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
{
BuiltInTransformationSettings = new AzureNative.IoTOperations.Inputs.DataflowBuiltInTransformationSettingsArgs
{
SchemaRef = "aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0",
SerializationFormat = AzureNative.IoTOperations.TransformationSerializationFormat.Parquet,
},
OperationType = AzureNative.IoTOperations.OperationType.BuiltInTransformation,
},
new AzureNative.IoTOperations.Inputs.DataflowOperationArgs
{
DestinationSettings = new AzureNative.IoTOperations.Inputs.DataflowDestinationOperationSettingsArgs
{
DataDestination = "telemetryTable",
EndpointRef = "fabric-endpoint",
},
Name = "destination1",
OperationType = AzureNative.IoTOperations.OperationType.Destination,
},
},
},
ResourceGroupName = "rgiotoperations",
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.Dataflow;
import com.pulumi.azurenative.iotoperations.DataflowArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var dataflow = new Dataflow("dataflow", DataflowArgs.builder()
.dataflowName("aio-to-fabric")
.dataflowProfileName("resource-name123")
.extendedLocation(ExtendedLocationArgs.builder()
.name("qmbrfwcpwwhggszhrdjv")
.type("CustomLocation")
.build())
.instanceName("resource-name123")
.properties(DataflowPropertiesArgs.builder()
.mode("Enabled")
.operations(
DataflowOperationArgs.builder()
.name("source1")
.operationType("Source")
.sourceSettings(DataflowSourceOperationSettingsArgs.builder()
.dataSources("azure-iot-operations/data/thermostat")
.endpointRef("aio-builtin-broker-endpoint")
.build())
.build(),
DataflowOperationArgs.builder()
.builtInTransformationSettings(DataflowBuiltInTransformationSettingsArgs.builder()
.schemaRef("aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0")
.serializationFormat("Parquet")
.build())
.operationType("BuiltInTransformation")
.build(),
DataflowOperationArgs.builder()
.destinationSettings(DataflowDestinationOperationSettingsArgs.builder()
.dataDestination("telemetryTable")
.endpointRef("fabric-endpoint")
.build())
.name("destination1")
.operationType("Destination")
.build())
.build())
.resourceGroupName("rgiotoperations")
.build());
}
}
resources:
dataflow:
type: azure-native:iotoperations:Dataflow
properties:
dataflowName: aio-to-fabric
dataflowProfileName: resource-name123
extendedLocation:
name: qmbrfwcpwwhggszhrdjv
type: CustomLocation
instanceName: resource-name123
properties:
mode: Enabled
operations:
- name: source1
operationType: Source
sourceSettings:
dataSources:
- azure-iot-operations/data/thermostat
endpointRef: aio-builtin-broker-endpoint
- builtInTransformationSettings:
schemaRef: aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0
serializationFormat: Parquet
operationType: BuiltInTransformation
- destinationSettings:
dataDestination: telemetryTable
endpointRef: fabric-endpoint
name: destination1
operationType: Destination
resourceGroupName: rgiotoperations
The builtInTransformationSettings specifies serializationFormat as Parquet and references a schema via schemaRef. The schema URI “aio-sr://exampleNamespace/exmapleParquetSchema:1.0.0” points to a registered Parquet schema that defines column types and structure. The transformation converts JSON telemetry to Parquet batches before writing to the Fabric table specified in dataDestination.
Beyond these examples
These snippets focus on specific dataflow-level features: MQTT source and destination routing, filtering and field mapping transformations, contextual data enrichment, and format conversion. They’re intentionally minimal rather than full data integration pipelines.
The examples reference pre-existing infrastructure such as DataflowProfile parent resources, DataflowEndpoint resources for MQTT brokers and cloud services, Azure IoT Operations instances with custom locations, and schema registries with Parquet schemas. They focus on configuring the dataflow rather than provisioning the surrounding infrastructure.
To keep things focused, common dataflow patterns are omitted, including:
- Error handling and retry configuration
- Batching and throughput tuning
- Authentication and authorization setup for endpoints
- Schema validation and evolution
- Monitoring and observability configuration
- Multi-stage transformation pipelines with intermediate topics
These omissions are intentional: the goal is to illustrate how each dataflow feature is wired, not provide drop-in data integration modules. See the Dataflow resource reference for all available configuration options.
Let's configure Azure IoT Operations Dataflows
Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.
Try Pulumi Cloud for FREEFrequently Asked Questions
Configuration & Lifecycle
dataflowName, dataflowProfileName, instanceName, resourceGroupName, and extendedLocation properties are immutable. Changes to these require resource replacement.extendedLocation (with name and type CustomLocation), properties with at least one Source operation and one Destination operation, and the resource identifiers (dataflowName, dataflowProfileName, instanceName, resourceGroupName).Operations & Pipeline
Three operation types are available:
- Source - Ingests data from endpoints
- BuiltInTransformation - Processes data with filters, maps, and datasets
- Destination - Sends data to endpoints
endpointRef in sourceSettings or destinationSettings to reference a pre-configured endpoint by name (e.g., endpointRef: "aio-builtin-broker-endpoint").Data Transformation
filter in builtInTransformationSettings with an expression like $1 > 9000 && $2 >= 8000 and specify inputs as field paths (e.g., ["temperature.Value", "\"Tag 10\".Value"]).Use map in builtInTransformationSettings with expressions:
- Calculations:
($1+$2)/2 - Conversions:
cToF($1) - Scaling:
scale ($1,0,10,0,100)Specifyinputsas field paths andoutputas the target field name.
inputs: ["*"] and output: "*" to pass through all fields unchanged.serializationFormat in builtInTransformationSettings: Delta, Parquet, and Json.datasets in builtInTransformationSettings with an expression (e.g., $1 == $2), inputs from source and context fields, and a key for the enriched data. Then reference the dataset in map operations using $context(key).*.Using a different cloud?
Explore integration guides for other cloud providers: