Configure Azure IoT Operations Dataflow Endpoints

The azure-native:iotoperations:DataflowEndpoint resource, part of the Pulumi Azure Native provider, defines dataflow endpoints that specify where Azure IoT Operations sends or receives messages: cloud services, message brokers, or local storage. This guide focuses on four capabilities: connecting to the built-in MQTT broker, sending data to Azure storage and analytics services, bridging to Kafka and Event Hubs, and loading data into Microsoft Fabric.

Dataflow endpoints reference Azure services, Kubernetes resources, and managed identities with appropriate permissions. The examples are intentionally small. Combine them with your own IoT Operations instance, destination services, and authentication configuration.

Connect to the built-in Azure IoT Operations broker

IoT Operations deployments typically start by connecting dataflows to the built-in MQTT broker, which serves as the central message hub for device telemetry and commands.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
    dataflowEndpointName: "aio-builtin-broker-endpoint",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        endpointType: azure_native.iotoperations.EndpointType.Mqtt,
        mqttSettings: {
            authentication: {
                method: "Kubernetes",
                serviceAccountTokenSettings: {
                    audience: "aio-internal",
                },
            },
            host: "aio-broker:18883",
            tls: {
                mode: azure_native.iotoperations.OperationalMode.Enabled,
                trustedCaCertificateConfigMapRef: "aio-ca-trust-bundle-test-only",
            },
        },
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
    dataflow_endpoint_name="aio-builtin-broker-endpoint",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "endpoint_type": azure_native.iotoperations.EndpointType.MQTT,
        "mqtt_settings": {
            "authentication": {
                "method": "Kubernetes",
                "service_account_token_settings": {
                    "audience": "aio-internal",
                },
            },
            "host": "aio-broker:18883",
            "tls": {
                "mode": azure_native.iotoperations.OperationalMode.ENABLED,
                "trusted_ca_certificate_config_map_ref": "aio-ca-trust-bundle-test-only",
            },
        },
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
			DataflowEndpointName: pulumi.String("aio-builtin-broker-endpoint"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowEndpointPropertiesArgs{
				EndpointType: pulumi.String(iotoperations.EndpointTypeMqtt),
				MqttSettings: &iotoperations.DataflowEndpointMqttArgs{
					Authentication: &iotoperations.DataflowEndpointMqttAuthenticationArgs{
						Method: pulumi.String("Kubernetes"),
						ServiceAccountTokenSettings: &iotoperations.DataflowEndpointAuthenticationServiceAccountTokenArgs{
							Audience: pulumi.String("aio-internal"),
						},
					},
					Host: pulumi.String("aio-broker:18883"),
					Tls: &iotoperations.TlsPropertiesArgs{
						Mode:                             pulumi.String(iotoperations.OperationalModeEnabled),
						TrustedCaCertificateConfigMapRef: pulumi.String("aio-ca-trust-bundle-test-only"),
					},
				},
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "aio-builtin-broker-endpoint",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            EndpointType = AzureNative.IoTOperations.EndpointType.Mqtt,
            MqttSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointMqttArgs
            {
                Authentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointMqttAuthenticationArgs
                {
                    Method = "Kubernetes",
                    ServiceAccountTokenSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointAuthenticationServiceAccountTokenArgs
                    {
                        Audience = "aio-internal",
                    },
                },
                Host = "aio-broker:18883",
                Tls = new AzureNative.IoTOperations.Inputs.TlsPropertiesArgs
                {
                    Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
                    TrustedCaCertificateConfigMapRef = "aio-ca-trust-bundle-test-only",
                },
            },
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointMqttArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointMqttAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationServiceAccountTokenArgs;
import com.pulumi.azurenative.iotoperations.inputs.TlsPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
            .dataflowEndpointName("aio-builtin-broker-endpoint")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowEndpointPropertiesArgs.builder()
                .endpointType("Mqtt")
                .mqttSettings(DataflowEndpointMqttArgs.builder()
                    .authentication(DataflowEndpointMqttAuthenticationArgs.builder()
                        .method("Kubernetes")
                        .serviceAccountTokenSettings(DataflowEndpointAuthenticationServiceAccountTokenArgs.builder()
                            .audience("aio-internal")
                            .build())
                        .build())
                    .host("aio-broker:18883")
                    .tls(TlsPropertiesArgs.builder()
                        .mode("Enabled")
                        .trustedCaCertificateConfigMapRef("aio-ca-trust-bundle-test-only")
                        .build())
                    .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflowEndpoint:
    type: azure-native:iotoperations:DataflowEndpoint
    properties:
      dataflowEndpointName: aio-builtin-broker-endpoint
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        endpointType: Mqtt
        mqttSettings:
          authentication:
            method: Kubernetes
            serviceAccountTokenSettings:
              audience: aio-internal
          host: aio-broker:18883
          tls:
            mode: Enabled
            trustedCaCertificateConfigMapRef: aio-ca-trust-bundle-test-only
      resourceGroupName: rgiotoperations

The mqttSettings block configures MQTT protocol details. The authentication method uses Kubernetes service account tokens with the audience set to “aio-internal”. The host property points to the broker service at “aio-broker:18883”, and TLS is enabled with a reference to the CA trust bundle ConfigMap. This configuration allows dataflows to route messages through the local broker without leaving the cluster.

Send data to Azure Data Lake Storage Gen2

Teams building analytics pipelines often land IoT data in ADLS Gen2 for long-term storage and downstream processing.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
    dataflowEndpointName: "adlsv2-endpoint",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        dataLakeStorageSettings: {
            authentication: {
                accessTokenSettings: {
                    secretRef: "my-secret",
                },
                method: azure_native.iotoperations.DataLakeStorageAuthMethod.AccessToken,
            },
            host: "example.blob.core.windows.net",
        },
        endpointType: azure_native.iotoperations.EndpointType.DataLakeStorage,
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
    dataflow_endpoint_name="adlsv2-endpoint",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "data_lake_storage_settings": {
            "authentication": {
                "access_token_settings": {
                    "secret_ref": "my-secret",
                },
                "method": azure_native.iotoperations.DataLakeStorageAuthMethod.ACCESS_TOKEN,
            },
            "host": "example.blob.core.windows.net",
        },
        "endpoint_type": azure_native.iotoperations.EndpointType.DATA_LAKE_STORAGE,
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
			DataflowEndpointName: pulumi.String("adlsv2-endpoint"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowEndpointPropertiesArgs{
				DataLakeStorageSettings: &iotoperations.DataflowEndpointDataLakeStorageArgs{
					Authentication: &iotoperations.DataflowEndpointDataLakeStorageAuthenticationArgs{
						AccessTokenSettings: &iotoperations.DataflowEndpointAuthenticationAccessTokenArgs{
							SecretRef: pulumi.String("my-secret"),
						},
						Method: pulumi.String(iotoperations.DataLakeStorageAuthMethodAccessToken),
					},
					Host: pulumi.String("example.blob.core.windows.net"),
				},
				EndpointType: pulumi.String(iotoperations.EndpointTypeDataLakeStorage),
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "adlsv2-endpoint",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            DataLakeStorageSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointDataLakeStorageArgs
            {
                Authentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointDataLakeStorageAuthenticationArgs
                {
                    AccessTokenSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointAuthenticationAccessTokenArgs
                    {
                        SecretRef = "my-secret",
                    },
                    Method = AzureNative.IoTOperations.DataLakeStorageAuthMethod.AccessToken,
                },
                Host = "example.blob.core.windows.net",
            },
            EndpointType = AzureNative.IoTOperations.EndpointType.DataLakeStorage,
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointDataLakeStorageArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointDataLakeStorageAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationAccessTokenArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
            .dataflowEndpointName("adlsv2-endpoint")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowEndpointPropertiesArgs.builder()
                .dataLakeStorageSettings(DataflowEndpointDataLakeStorageArgs.builder()
                    .authentication(DataflowEndpointDataLakeStorageAuthenticationArgs.builder()
                        .accessTokenSettings(DataflowEndpointAuthenticationAccessTokenArgs.builder()
                            .secretRef("my-secret")
                            .build())
                        .method("AccessToken")
                        .build())
                    .host("example.blob.core.windows.net")
                    .build())
                .endpointType("DataLakeStorage")
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflowEndpoint:
    type: azure-native:iotoperations:DataflowEndpoint
    properties:
      dataflowEndpointName: adlsv2-endpoint
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        dataLakeStorageSettings:
          authentication:
            accessTokenSettings:
              secretRef: my-secret
            method: AccessToken
          host: example.blob.core.windows.net
        endpointType: DataLakeStorage
      resourceGroupName: rgiotoperations

The dataLakeStorageSettings block specifies the storage account host and authentication method. When endpointType is set to “DataLakeStorage”, the dataflow writes telemetry to blob storage. The authentication uses a system-assigned managed identity with an access token, requiring the identity to have Storage Blob Data Contributor permissions on the target account.

Stream data into Azure Data Explorer

Real-time analytics workloads use Azure Data Explorer to query high-volume time-series data with low latency.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
    dataflowEndpointName: "adx-endpoint",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        dataExplorerSettings: {
            authentication: {
                method: "SystemAssignedManagedIdentity",
                systemAssignedManagedIdentitySettings: {},
            },
            batching: {
                latencySeconds: 9312,
                maxMessages: 9028,
            },
            database: "example-database",
            host: "example.westeurope.kusto.windows.net",
        },
        endpointType: azure_native.iotoperations.EndpointType.DataExplorer,
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
    dataflow_endpoint_name="adx-endpoint",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "data_explorer_settings": {
            "authentication": {
                "method": "SystemAssignedManagedIdentity",
                "system_assigned_managed_identity_settings": {},
            },
            "batching": {
                "latency_seconds": 9312,
                "max_messages": 9028,
            },
            "database": "example-database",
            "host": "example.westeurope.kusto.windows.net",
        },
        "endpoint_type": azure_native.iotoperations.EndpointType.DATA_EXPLORER,
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
			DataflowEndpointName: pulumi.String("adx-endpoint"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowEndpointPropertiesArgs{
				DataExplorerSettings: &iotoperations.DataflowEndpointDataExplorerArgs{
					Authentication: &iotoperations.DataflowEndpointDataExplorerAuthenticationArgs{
						Method:                                pulumi.Any("SystemAssignedManagedIdentity"),
						SystemAssignedManagedIdentitySettings: &iotoperations.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs{},
					},
					Batching: &iotoperations.BatchingConfigurationArgs{
						LatencySeconds: pulumi.Int(9312),
						MaxMessages:    pulumi.Int(9028),
					},
					Database: pulumi.String("example-database"),
					Host:     pulumi.String("example.westeurope.kusto.windows.net"),
				},
				EndpointType: pulumi.String(iotoperations.EndpointTypeDataExplorer),
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "adx-endpoint",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            DataExplorerSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointDataExplorerArgs
            {
                Authentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointDataExplorerAuthenticationArgs
                {
                    Method = "SystemAssignedManagedIdentity",
                    SystemAssignedManagedIdentitySettings = null,
                },
                Batching = new AzureNative.IoTOperations.Inputs.BatchingConfigurationArgs
                {
                    LatencySeconds = 9312,
                    MaxMessages = 9028,
                },
                Database = "example-database",
                Host = "example.westeurope.kusto.windows.net",
            },
            EndpointType = AzureNative.IoTOperations.EndpointType.DataExplorer,
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointDataExplorerArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointDataExplorerAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs;
import com.pulumi.azurenative.iotoperations.inputs.BatchingConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
            .dataflowEndpointName("adx-endpoint")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowEndpointPropertiesArgs.builder()
                .dataExplorerSettings(DataflowEndpointDataExplorerArgs.builder()
                    .authentication(DataflowEndpointDataExplorerAuthenticationArgs.builder()
                        .method("SystemAssignedManagedIdentity")
                        .systemAssignedManagedIdentitySettings(DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs.builder()
                            .build())
                        .build())
                    .batching(BatchingConfigurationArgs.builder()
                        .latencySeconds(9312)
                        .maxMessages(9028)
                        .build())
                    .database("example-database")
                    .host("example.westeurope.kusto.windows.net")
                    .build())
                .endpointType("DataExplorer")
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflowEndpoint:
    type: azure-native:iotoperations:DataflowEndpoint
    properties:
      dataflowEndpointName: adx-endpoint
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        dataExplorerSettings:
          authentication:
            method: SystemAssignedManagedIdentity
            systemAssignedManagedIdentitySettings: {}
          batching:
            latencySeconds: 9312
            maxMessages: 9028
          database: example-database
          host: example.westeurope.kusto.windows.net
        endpointType: DataExplorer
      resourceGroupName: rgiotoperations

The dataExplorerSettings block defines the ADX cluster host and target database. The batching configuration controls ingestion efficiency by grouping messages before sending them to ADX. With latencySeconds set to 9312 and maxMessages to 9028, the endpoint balances throughput and freshness. The authentication method uses system-assigned managed identity, which must have database ingestor permissions.

Forward messages to Azure Event Hubs

Event Hubs serves as a scalable ingestion layer for streaming architectures, connecting IoT data to downstream consumers.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
    dataflowEndpointName: "event-hub-endpoint",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        endpointType: azure_native.iotoperations.EndpointType.Kafka,
        kafkaSettings: {
            authentication: {
                method: azure_native.iotoperations.KafkaAuthMethod.SystemAssignedManagedIdentity,
                systemAssignedManagedIdentitySettings: {},
            },
            consumerGroupId: "aiodataflows",
            host: "example.servicebus.windows.net:9093",
            tls: {
                mode: azure_native.iotoperations.OperationalMode.Enabled,
            },
        },
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
    dataflow_endpoint_name="event-hub-endpoint",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "endpoint_type": azure_native.iotoperations.EndpointType.KAFKA,
        "kafka_settings": {
            "authentication": {
                "method": azure_native.iotoperations.KafkaAuthMethod.SYSTEM_ASSIGNED_MANAGED_IDENTITY,
                "system_assigned_managed_identity_settings": {},
            },
            "consumer_group_id": "aiodataflows",
            "host": "example.servicebus.windows.net:9093",
            "tls": {
                "mode": azure_native.iotoperations.OperationalMode.ENABLED,
            },
        },
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
			DataflowEndpointName: pulumi.String("event-hub-endpoint"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowEndpointPropertiesArgs{
				EndpointType: pulumi.String(iotoperations.EndpointTypeKafka),
				KafkaSettings: &iotoperations.DataflowEndpointKafkaArgs{
					Authentication: &iotoperations.DataflowEndpointKafkaAuthenticationArgs{
						Method:                                pulumi.String(iotoperations.KafkaAuthMethodSystemAssignedManagedIdentity),
						SystemAssignedManagedIdentitySettings: &iotoperations.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs{},
					},
					ConsumerGroupId: pulumi.String("aiodataflows"),
					Host:            pulumi.String("example.servicebus.windows.net:9093"),
					Tls: &iotoperations.TlsPropertiesArgs{
						Mode: pulumi.String(iotoperations.OperationalModeEnabled),
					},
				},
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "event-hub-endpoint",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            EndpointType = AzureNative.IoTOperations.EndpointType.Kafka,
            KafkaSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointKafkaArgs
            {
                Authentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointKafkaAuthenticationArgs
                {
                    Method = AzureNative.IoTOperations.KafkaAuthMethod.SystemAssignedManagedIdentity,
                    SystemAssignedManagedIdentitySettings = null,
                },
                ConsumerGroupId = "aiodataflows",
                Host = "example.servicebus.windows.net:9093",
                Tls = new AzureNative.IoTOperations.Inputs.TlsPropertiesArgs
                {
                    Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
                },
            },
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointKafkaArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointKafkaAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs;
import com.pulumi.azurenative.iotoperations.inputs.TlsPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
            .dataflowEndpointName("event-hub-endpoint")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowEndpointPropertiesArgs.builder()
                .endpointType("Kafka")
                .kafkaSettings(DataflowEndpointKafkaArgs.builder()
                    .authentication(DataflowEndpointKafkaAuthenticationArgs.builder()
                        .method("SystemAssignedManagedIdentity")
                        .systemAssignedManagedIdentitySettings(DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs.builder()
                            .build())
                        .build())
                    .consumerGroupId("aiodataflows")
                    .host("example.servicebus.windows.net:9093")
                    .tls(TlsPropertiesArgs.builder()
                        .mode("Enabled")
                        .build())
                    .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflowEndpoint:
    type: azure-native:iotoperations:DataflowEndpoint
    properties:
      dataflowEndpointName: event-hub-endpoint
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        endpointType: Kafka
        kafkaSettings:
          authentication:
            method: SystemAssignedManagedIdentity
            systemAssignedManagedIdentitySettings: {}
          consumerGroupId: aiodataflows
          host: example.servicebus.windows.net:9093
          tls:
            mode: Enabled
      resourceGroupName: rgiotoperations

The kafkaSettings block configures Event Hubs using its Kafka-compatible endpoint. The host property points to the Event Hubs namespace at port 9093, and consumerGroupId specifies which consumer group receives the messages. TLS is enabled for secure communication. The authentication uses system-assigned managed identity, requiring Event Hubs Data Sender role on the namespace.

Integrate with external Kafka clusters

Organizations with existing Kafka infrastructure can route IoT data to on-premises or cloud-hosted clusters.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
    dataflowEndpointName: "generic-kafka-endpoint",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        endpointType: azure_native.iotoperations.EndpointType.Kafka,
        kafkaSettings: {
            authentication: {
                method: azure_native.iotoperations.KafkaAuthMethod.Sasl,
                saslSettings: {
                    saslType: azure_native.iotoperations.DataflowEndpointAuthenticationSaslType.Plain,
                    secretRef: "my-secret",
                },
            },
            batching: {
                latencyMs: 5,
                maxBytes: 1000000,
                maxMessages: 100000,
                mode: azure_native.iotoperations.OperationalMode.Enabled,
            },
            cloudEventAttributes: azure_native.iotoperations.CloudEventAttributeType.Propagate,
            compression: azure_native.iotoperations.DataflowEndpointKafkaCompression.Gzip,
            consumerGroupId: "dataflows",
            copyMqttProperties: azure_native.iotoperations.OperationalMode.Enabled,
            host: "example.kafka.local:9093",
            kafkaAcks: azure_native.iotoperations.DataflowEndpointKafkaAcks.All,
            partitionStrategy: azure_native.iotoperations.DataflowEndpointKafkaPartitionStrategy.Default,
            tls: {
                mode: azure_native.iotoperations.OperationalMode.Enabled,
                trustedCaCertificateConfigMapRef: "ca-certificates",
            },
        },
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
    dataflow_endpoint_name="generic-kafka-endpoint",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "endpoint_type": azure_native.iotoperations.EndpointType.KAFKA,
        "kafka_settings": {
            "authentication": {
                "method": azure_native.iotoperations.KafkaAuthMethod.SASL,
                "sasl_settings": {
                    "sasl_type": azure_native.iotoperations.DataflowEndpointAuthenticationSaslType.PLAIN,
                    "secret_ref": "my-secret",
                },
            },
            "batching": {
                "latency_ms": 5,
                "max_bytes": 1000000,
                "max_messages": 100000,
                "mode": azure_native.iotoperations.OperationalMode.ENABLED,
            },
            "cloud_event_attributes": azure_native.iotoperations.CloudEventAttributeType.PROPAGATE,
            "compression": azure_native.iotoperations.DataflowEndpointKafkaCompression.GZIP,
            "consumer_group_id": "dataflows",
            "copy_mqtt_properties": azure_native.iotoperations.OperationalMode.ENABLED,
            "host": "example.kafka.local:9093",
            "kafka_acks": azure_native.iotoperations.DataflowEndpointKafkaAcks.ALL,
            "partition_strategy": azure_native.iotoperations.DataflowEndpointKafkaPartitionStrategy.DEFAULT,
            "tls": {
                "mode": azure_native.iotoperations.OperationalMode.ENABLED,
                "trusted_ca_certificate_config_map_ref": "ca-certificates",
            },
        },
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
			DataflowEndpointName: pulumi.String("generic-kafka-endpoint"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowEndpointPropertiesArgs{
				EndpointType: pulumi.String(iotoperations.EndpointTypeKafka),
				KafkaSettings: &iotoperations.DataflowEndpointKafkaArgs{
					Authentication: &iotoperations.DataflowEndpointKafkaAuthenticationArgs{
						Method: pulumi.String(iotoperations.KafkaAuthMethodSasl),
						SaslSettings: &iotoperations.DataflowEndpointAuthenticationSaslArgs{
							SaslType:  pulumi.String(iotoperations.DataflowEndpointAuthenticationSaslTypePlain),
							SecretRef: pulumi.String("my-secret"),
						},
					},
					Batching: &iotoperations.DataflowEndpointKafkaBatchingArgs{
						LatencyMs:   pulumi.Int(5),
						MaxBytes:    pulumi.Int(1000000),
						MaxMessages: pulumi.Int(100000),
						Mode:        pulumi.String(iotoperations.OperationalModeEnabled),
					},
					CloudEventAttributes: pulumi.String(iotoperations.CloudEventAttributeTypePropagate),
					Compression:          pulumi.String(iotoperations.DataflowEndpointKafkaCompressionGzip),
					ConsumerGroupId:      pulumi.String("dataflows"),
					CopyMqttProperties:   pulumi.String(iotoperations.OperationalModeEnabled),
					Host:                 pulumi.String("example.kafka.local:9093"),
					KafkaAcks:            pulumi.String(iotoperations.DataflowEndpointKafkaAcksAll),
					PartitionStrategy:    pulumi.String(iotoperations.DataflowEndpointKafkaPartitionStrategyDefault),
					Tls: &iotoperations.TlsPropertiesArgs{
						Mode:                             pulumi.String(iotoperations.OperationalModeEnabled),
						TrustedCaCertificateConfigMapRef: pulumi.String("ca-certificates"),
					},
				},
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "generic-kafka-endpoint",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            EndpointType = AzureNative.IoTOperations.EndpointType.Kafka,
            KafkaSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointKafkaArgs
            {
                Authentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointKafkaAuthenticationArgs
                {
                    Method = AzureNative.IoTOperations.KafkaAuthMethod.Sasl,
                    SaslSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointAuthenticationSaslArgs
                    {
                        SaslType = AzureNative.IoTOperations.DataflowEndpointAuthenticationSaslType.Plain,
                        SecretRef = "my-secret",
                    },
                },
                Batching = new AzureNative.IoTOperations.Inputs.DataflowEndpointKafkaBatchingArgs
                {
                    LatencyMs = 5,
                    MaxBytes = 1000000,
                    MaxMessages = 100000,
                    Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
                },
                CloudEventAttributes = AzureNative.IoTOperations.CloudEventAttributeType.Propagate,
                Compression = AzureNative.IoTOperations.DataflowEndpointKafkaCompression.Gzip,
                ConsumerGroupId = "dataflows",
                CopyMqttProperties = AzureNative.IoTOperations.OperationalMode.Enabled,
                Host = "example.kafka.local:9093",
                KafkaAcks = AzureNative.IoTOperations.DataflowEndpointKafkaAcks.All,
                PartitionStrategy = AzureNative.IoTOperations.DataflowEndpointKafkaPartitionStrategy.Default,
                Tls = new AzureNative.IoTOperations.Inputs.TlsPropertiesArgs
                {
                    Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
                    TrustedCaCertificateConfigMapRef = "ca-certificates",
                },
            },
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointKafkaArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointKafkaAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationSaslArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointKafkaBatchingArgs;
import com.pulumi.azurenative.iotoperations.inputs.TlsPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
            .dataflowEndpointName("generic-kafka-endpoint")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowEndpointPropertiesArgs.builder()
                .endpointType("Kafka")
                .kafkaSettings(DataflowEndpointKafkaArgs.builder()
                    .authentication(DataflowEndpointKafkaAuthenticationArgs.builder()
                        .method("Sasl")
                        .saslSettings(DataflowEndpointAuthenticationSaslArgs.builder()
                            .saslType("Plain")
                            .secretRef("my-secret")
                            .build())
                        .build())
                    .batching(DataflowEndpointKafkaBatchingArgs.builder()
                        .latencyMs(5)
                        .maxBytes(1000000)
                        .maxMessages(100000)
                        .mode("Enabled")
                        .build())
                    .cloudEventAttributes("Propagate")
                    .compression("Gzip")
                    .consumerGroupId("dataflows")
                    .copyMqttProperties("Enabled")
                    .host("example.kafka.local:9093")
                    .kafkaAcks("All")
                    .partitionStrategy("Default")
                    .tls(TlsPropertiesArgs.builder()
                        .mode("Enabled")
                        .trustedCaCertificateConfigMapRef("ca-certificates")
                        .build())
                    .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflowEndpoint:
    type: azure-native:iotoperations:DataflowEndpoint
    properties:
      dataflowEndpointName: generic-kafka-endpoint
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        endpointType: Kafka
        kafkaSettings:
          authentication:
            method: Sasl
            saslSettings:
              saslType: Plain
              secretRef: my-secret
          batching:
            latencyMs: 5
            maxBytes: 1e+06
            maxMessages: 100000
            mode: Enabled
          cloudEventAttributes: Propagate
          compression: Gzip
          consumerGroupId: dataflows
          copyMqttProperties: Enabled
          host: example.kafka.local:9093
          kafkaAcks: All
          partitionStrategy: Default
          tls:
            mode: Enabled
            trustedCaCertificateConfigMapRef: ca-certificates
      resourceGroupName: rgiotoperations

The kafkaSettings block includes comprehensive Kafka configuration. The authentication method uses SASL with credentials stored in a Kubernetes secret. The batching configuration optimizes throughput with latencyMs, maxBytes, and maxMessages settings. The compression property is set to “Gzip” to reduce network bandwidth. The kafkaAcks property controls durability, with “All” requiring acknowledgment from all in-sync replicas. The partitionStrategy determines how messages are distributed across Kafka partitions.

Load data into Microsoft Fabric OneLake

Microsoft Fabric provides unified analytics across data engineering, science, and business intelligence.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
    dataflowEndpointName: "fabric-endpoint",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        endpointType: azure_native.iotoperations.EndpointType.FabricOneLake,
        fabricOneLakeSettings: {
            authentication: {
                method: "SystemAssignedManagedIdentity",
                systemAssignedManagedIdentitySettings: {},
            },
            host: "onelake.dfs.fabric.microsoft.com",
            names: {
                lakehouseName: "example-lakehouse",
                workspaceName: "example-workspace",
            },
            oneLakePathType: azure_native.iotoperations.DataflowEndpointFabricPathType.Tables,
        },
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
    dataflow_endpoint_name="fabric-endpoint",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "endpoint_type": azure_native.iotoperations.EndpointType.FABRIC_ONE_LAKE,
        "fabric_one_lake_settings": {
            "authentication": {
                "method": "SystemAssignedManagedIdentity",
                "system_assigned_managed_identity_settings": {},
            },
            "host": "onelake.dfs.fabric.microsoft.com",
            "names": {
                "lakehouse_name": "example-lakehouse",
                "workspace_name": "example-workspace",
            },
            "one_lake_path_type": azure_native.iotoperations.DataflowEndpointFabricPathType.TABLES,
        },
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
			DataflowEndpointName: pulumi.String("fabric-endpoint"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowEndpointPropertiesArgs{
				EndpointType: pulumi.String(iotoperations.EndpointTypeFabricOneLake),
				FabricOneLakeSettings: &iotoperations.DataflowEndpointFabricOneLakeArgs{
					Authentication: &iotoperations.DataflowEndpointFabricOneLakeAuthenticationArgs{
						Method:                                pulumi.Any("SystemAssignedManagedIdentity"),
						SystemAssignedManagedIdentitySettings: &iotoperations.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs{},
					},
					Host: pulumi.String("onelake.dfs.fabric.microsoft.com"),
					Names: &iotoperations.DataflowEndpointFabricOneLakeNamesArgs{
						LakehouseName: pulumi.String("example-lakehouse"),
						WorkspaceName: pulumi.String("example-workspace"),
					},
					OneLakePathType: pulumi.String(iotoperations.DataflowEndpointFabricPathTypeTables),
				},
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "fabric-endpoint",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            EndpointType = AzureNative.IoTOperations.EndpointType.FabricOneLake,
            FabricOneLakeSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointFabricOneLakeArgs
            {
                Authentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointFabricOneLakeAuthenticationArgs
                {
                    Method = "SystemAssignedManagedIdentity",
                    SystemAssignedManagedIdentitySettings = null,
                },
                Host = "onelake.dfs.fabric.microsoft.com",
                Names = new AzureNative.IoTOperations.Inputs.DataflowEndpointFabricOneLakeNamesArgs
                {
                    LakehouseName = "example-lakehouse",
                    WorkspaceName = "example-workspace",
                },
                OneLakePathType = AzureNative.IoTOperations.DataflowEndpointFabricPathType.Tables,
            },
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointFabricOneLakeArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointFabricOneLakeAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointFabricOneLakeNamesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
            .dataflowEndpointName("fabric-endpoint")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowEndpointPropertiesArgs.builder()
                .endpointType("FabricOneLake")
                .fabricOneLakeSettings(DataflowEndpointFabricOneLakeArgs.builder()
                    .authentication(DataflowEndpointFabricOneLakeAuthenticationArgs.builder()
                        .method("SystemAssignedManagedIdentity")
                        .systemAssignedManagedIdentitySettings(DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs.builder()
                            .build())
                        .build())
                    .host("onelake.dfs.fabric.microsoft.com")
                    .names(DataflowEndpointFabricOneLakeNamesArgs.builder()
                        .lakehouseName("example-lakehouse")
                        .workspaceName("example-workspace")
                        .build())
                    .oneLakePathType("Tables")
                    .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflowEndpoint:
    type: azure-native:iotoperations:DataflowEndpoint
    properties:
      dataflowEndpointName: fabric-endpoint
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        endpointType: FabricOneLake
        fabricOneLakeSettings:
          authentication:
            method: SystemAssignedManagedIdentity
            systemAssignedManagedIdentitySettings: {}
          host: onelake.dfs.fabric.microsoft.com
          names:
            lakehouseName: example-lakehouse
            workspaceName: example-workspace
          oneLakePathType: Tables
      resourceGroupName: rgiotoperations

The fabricOneLakeSettings block specifies the OneLake host and target workspace. The names property identifies the specific lakehouse and workspace where data will be written. The oneLakePathType property is set to “Tables”, directing the dataflow to write into lakehouse tables rather than files. The authentication uses system-assigned managed identity, which must have contributor permissions on the Fabric workspace.

Beyond these examples

These snippets focus on specific endpoint-level features: cloud destination endpoints, protocol bridges, and authentication methods. They’re intentionally minimal rather than full dataflow configurations.

The examples reference pre-existing infrastructure such as Azure IoT Operations instance and custom location, destination services, Kubernetes secrets and ConfigMaps for credentials and certificates, and managed identities with appropriate role assignments. They focus on configuring the endpoint rather than provisioning the surrounding infrastructure.

To keep things focused, common endpoint patterns are omitted, including:

  • Local storage endpoints for edge buffering (localStorageSettings)
  • Event Grid MQTT endpoints for pub/sub scenarios
  • Generic MQTT broker endpoints with X.509 authentication
  • CloudEvent attribute handling and MQTT property propagation
  • Advanced batching tuning (latency vs throughput tradeoffs)
  • Partition strategies and acknowledgment modes for Kafka

These omissions are intentional: the goal is to illustrate how each endpoint type is wired, not provide drop-in dataflow modules. See the DataflowEndpoint resource reference for all available configuration options.

Let's configure Azure IoT Operations Dataflow Endpoints

Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.

Try Pulumi Cloud for FREE

Frequently Asked Questions

Endpoint Types & Selection
What endpoint types can I use for dataflows?
Six endpoint types are available: DataExplorer (Azure Data Explorer), DataLakeStorage (ADLS Gen2), FabricOneLake (Microsoft Fabric), Kafka (Event Hub or generic Kafka), LocalStorage (persistent volumes), and Mqtt (Event Grid or generic MQTT brokers).
How do I choose between Kafka and MQTT endpoints?
Use Kafka for Azure Event Hub or Apache Kafka brokers with features like consumer groups, partitioning, and compression. Use Mqtt for Azure Event Grid MQTT broker, the built-in AIO broker, or generic MQTT brokers with QoS and retain settings.
Can I change the endpoint type after creation?
No, endpointType and other core properties like dataflowEndpointName, instanceName, and extendedLocation are immutable and require resource replacement to change.
Authentication & Security
What authentication methods are available?

Authentication methods vary by endpoint type:

  • Managed Identity: SystemAssignedManagedIdentity or UserAssignedManagedIdentity (most endpoints)
  • Token-based: AccessToken (Data Lake Storage), Kubernetes service account tokens (AIO broker)
  • Certificate: X509Certificate (MQTT, Kafka)
  • SASL: Sasl with Plain or ScramSha256/ScramSha512 (Kafka)
How do I configure TLS for secure connections?
Set tls.mode to Enabled and optionally specify trustedCaCertificateConfigMapRef pointing to a ConfigMap with CA certificates. For the AIO broker, use aio-ca-trust-bundle-test-only. Set mode to Disabled only for local development.
How do I authenticate to Azure Event Hub?
Use SystemAssignedManagedIdentity authentication with the Kafka endpoint type. Set host to your Event Hub namespace (e.g., example.servicebus.windows.net:9093), enable TLS, and specify a consumerGroupId.
How do I connect to the built-in AIO MQTT broker?
Use Mqtt endpoint type with Kubernetes authentication method. Configure serviceAccountTokenSettings with audience set to aio-internal, host as aio-broker:18883, and enable TLS with trustedCaCertificateConfigMapRef set to aio-ca-trust-bundle-test-only.
Performance & Batching
How do I configure batching for better performance?

Batching configuration depends on endpoint type:

  • Most endpoints: Use latencySeconds (delay before sending) and maxMessages (batch size)
  • Kafka: Use latencyMs (milliseconds), maxBytes (size limit), maxMessages, and set mode to Enabled
What Kafka-specific performance settings are available?
Kafka endpoints support compression (None, Gzip, Snappy, Lz4), kafkaAcks (Zero, One, All for durability), partitionStrategy (Default, Static, Topic, Property), and copyMqttProperties to preserve MQTT metadata.
Endpoint-Specific Configuration
How do I configure Azure Data Explorer endpoints?
Set endpointType to DataExplorer, configure host as <cluster>.<region>.kusto.windows.net, specify the target database, and use managed identity authentication. Optionally configure batching with latencySeconds and maxMessages.
How do I configure Microsoft Fabric OneLake endpoints?
Use FabricOneLake endpoint type with host set to onelake.dfs.fabric.microsoft.com. Specify names.workspaceName and names.lakehouseName, choose oneLakePathType (Files or Tables), and use managed identity authentication.
What MQTT-specific settings can I configure?
MQTT endpoints support qos (0, 1, 2), retain (Keep or Never), keepAliveSeconds, sessionExpirySeconds, maxInflightMessages, protocol (Mqtt or WebSockets), and clientIdPrefix for connection identification.
How do I use local storage as an endpoint?
Set endpointType to LocalStorage and configure persistentVolumeClaimRef pointing to an existing Kubernetes PersistentVolumeClaim for storing dataflow data locally.

Using a different cloud?

Explore integration guides for other cloud providers: