Configure Azure IoT Operations Dataflow Endpoints

The azure-native:iotoperations:DataflowEndpoint resource, part of the Pulumi Azure Native provider, defines dataflow endpoints that specify where messages are sent or received: MQTT brokers, Kafka clusters, Azure services, or local storage. This guide focuses on four capabilities: MQTT broker connections, Kafka and Event Hubs streaming, Azure storage and analytics services, and local persistent storage.

Dataflow endpoints reference existing Azure IoT Operations instances, Kubernetes secrets and ConfigMaps, Azure resources, and persistent volumes. The examples are intentionally small. Combine them with your own authentication configuration, batching policies, and network settings.

Connect to the built-in Azure IoT Operations broker

IoT Operations deployments typically start by connecting dataflows to the built-in MQTT broker, which serves as the central message hub for device telemetry and commands.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
    dataflowEndpointName: "aio-builtin-broker-endpoint",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        endpointType: azure_native.iotoperations.EndpointType.Mqtt,
        mqttSettings: {
            authentication: {
                method: "Kubernetes",
                serviceAccountTokenSettings: {
                    audience: "aio-internal",
                },
            },
            host: "aio-broker:18883",
            tls: {
                mode: azure_native.iotoperations.OperationalMode.Enabled,
                trustedCaCertificateConfigMapRef: "aio-ca-trust-bundle-test-only",
            },
        },
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
    dataflow_endpoint_name="aio-builtin-broker-endpoint",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "endpoint_type": azure_native.iotoperations.EndpointType.MQTT,
        "mqtt_settings": {
            "authentication": {
                "method": "Kubernetes",
                "service_account_token_settings": {
                    "audience": "aio-internal",
                },
            },
            "host": "aio-broker:18883",
            "tls": {
                "mode": azure_native.iotoperations.OperationalMode.ENABLED,
                "trusted_ca_certificate_config_map_ref": "aio-ca-trust-bundle-test-only",
            },
        },
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
			DataflowEndpointName: pulumi.String("aio-builtin-broker-endpoint"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowEndpointPropertiesArgs{
				EndpointType: pulumi.String(iotoperations.EndpointTypeMqtt),
				MqttSettings: &iotoperations.DataflowEndpointMqttArgs{
					Authentication: &iotoperations.DataflowEndpointMqttAuthenticationArgs{
						Method: pulumi.String("Kubernetes"),
						ServiceAccountTokenSettings: &iotoperations.DataflowEndpointAuthenticationServiceAccountTokenArgs{
							Audience: pulumi.String("aio-internal"),
						},
					},
					Host: pulumi.String("aio-broker:18883"),
					Tls: &iotoperations.TlsPropertiesArgs{
						Mode:                             pulumi.String(iotoperations.OperationalModeEnabled),
						TrustedCaCertificateConfigMapRef: pulumi.String("aio-ca-trust-bundle-test-only"),
					},
				},
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "aio-builtin-broker-endpoint",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            EndpointType = AzureNative.IoTOperations.EndpointType.Mqtt,
            MqttSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointMqttArgs
            {
                Authentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointMqttAuthenticationArgs
                {
                    Method = "Kubernetes",
                    ServiceAccountTokenSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointAuthenticationServiceAccountTokenArgs
                    {
                        Audience = "aio-internal",
                    },
                },
                Host = "aio-broker:18883",
                Tls = new AzureNative.IoTOperations.Inputs.TlsPropertiesArgs
                {
                    Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
                    TrustedCaCertificateConfigMapRef = "aio-ca-trust-bundle-test-only",
                },
            },
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointMqttArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointMqttAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationServiceAccountTokenArgs;
import com.pulumi.azurenative.iotoperations.inputs.TlsPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
            .dataflowEndpointName("aio-builtin-broker-endpoint")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowEndpointPropertiesArgs.builder()
                .endpointType("Mqtt")
                .mqttSettings(DataflowEndpointMqttArgs.builder()
                    .authentication(DataflowEndpointMqttAuthenticationArgs.builder()
                        .method("Kubernetes")
                        .serviceAccountTokenSettings(DataflowEndpointAuthenticationServiceAccountTokenArgs.builder()
                            .audience("aio-internal")
                            .build())
                        .build())
                    .host("aio-broker:18883")
                    .tls(TlsPropertiesArgs.builder()
                        .mode("Enabled")
                        .trustedCaCertificateConfigMapRef("aio-ca-trust-bundle-test-only")
                        .build())
                    .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflowEndpoint:
    type: azure-native:iotoperations:DataflowEndpoint
    properties:
      dataflowEndpointName: aio-builtin-broker-endpoint
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        endpointType: Mqtt
        mqttSettings:
          authentication:
            method: Kubernetes
            serviceAccountTokenSettings:
              audience: aio-internal
          host: aio-broker:18883
          tls:
            mode: Enabled
            trustedCaCertificateConfigMapRef: aio-ca-trust-bundle-test-only
      resourceGroupName: rgiotoperations

The mqttSettings block configures MQTT connectivity. The authentication method uses Kubernetes service account tokens with the audience set to “aio-internal” for cluster-internal communication. The host property points to the built-in broker service at “aio-broker:18883”, and TLS is enabled with a trusted CA certificate from a ConfigMap.
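
The example above hard-codes placeholder names. In a real stack you would more likely pull the instance name, resource group, and custom location from configuration. A minimal TypeScript sketch, assuming config keys named instanceName, resourceGroupName, and customLocationId that you define yourself:

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

// Hypothetical config keys; set them with `pulumi config set <key> <value>`.
const config = new pulumi.Config();
const instanceName = config.require("instanceName");
const resourceGroupName = config.require("resourceGroupName");
const customLocationId = config.require("customLocationId");

const builtinBrokerEndpoint = new azure_native.iotoperations.DataflowEndpoint("builtinBrokerEndpoint", {
    dataflowEndpointName: "aio-builtin-broker-endpoint",
    extendedLocation: {
        name: customLocationId,
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName,
    resourceGroupName,
    properties: {
        endpointType: azure_native.iotoperations.EndpointType.Mqtt,
        mqttSettings: {
            // Same settings as the example above: service account token auth
            // against the built-in broker, with TLS and the AIO trust bundle.
            authentication: {
                method: "Kubernetes",
                serviceAccountTokenSettings: { audience: "aio-internal" },
            },
            host: "aio-broker:18883",
            tls: {
                mode: azure_native.iotoperations.OperationalMode.Enabled,
                trustedCaCertificateConfigMapRef: "aio-ca-trust-bundle-test-only",
            },
        },
    },
});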

Connect to external MQTT brokers with X.509 authentication

Factory gateways and edge deployments often need to bridge messages to external MQTT brokers using certificate-based authentication.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
    dataflowEndpointName: "generic-mqtt-broker-endpoint",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        endpointType: azure_native.iotoperations.EndpointType.Mqtt,
        mqttSettings: {
            authentication: {
                method: azure_native.iotoperations.MqttAuthMethod.X509Certificate,
                x509CertificateSettings: {
                    secretRef: "example-secret",
                },
            },
            clientIdPrefix: "factory-gateway",
            host: "example.broker.local:1883",
            keepAliveSeconds: 60,
            maxInflightMessages: 100,
            protocol: azure_native.iotoperations.BrokerProtocolType.WebSockets,
            qos: 1,
            retain: azure_native.iotoperations.MqttRetainType.Keep,
            sessionExpirySeconds: 3600,
            tls: {
                mode: azure_native.iotoperations.OperationalMode.Disabled,
            },
        },
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
    dataflow_endpoint_name="generic-mqtt-broker-endpoint",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "endpoint_type": azure_native.iotoperations.EndpointType.MQTT,
        "mqtt_settings": {
            "authentication": {
                "method": azure_native.iotoperations.MqttAuthMethod.X509_CERTIFICATE,
                "x509_certificate_settings": {
                    "secret_ref": "example-secret",
                },
            },
            "client_id_prefix": "factory-gateway",
            "host": "example.broker.local:1883",
            "keep_alive_seconds": 60,
            "max_inflight_messages": 100,
            "protocol": azure_native.iotoperations.BrokerProtocolType.WEB_SOCKETS,
            "qos": 1,
            "retain": azure_native.iotoperations.MqttRetainType.KEEP,
            "session_expiry_seconds": 3600,
            "tls": {
                "mode": azure_native.iotoperations.OperationalMode.DISABLED,
            },
        },
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
			DataflowEndpointName: pulumi.String("generic-mqtt-broker-endpoint"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowEndpointPropertiesArgs{
				EndpointType: pulumi.String(iotoperations.EndpointTypeMqtt),
				MqttSettings: &iotoperations.DataflowEndpointMqttArgs{
					Authentication: &iotoperations.DataflowEndpointMqttAuthenticationArgs{
						Method: pulumi.String(iotoperations.MqttAuthMethodX509Certificate),
						X509CertificateSettings: &iotoperations.DataflowEndpointAuthenticationX509Args{
							SecretRef: pulumi.String("example-secret"),
						},
					},
					ClientIdPrefix:       pulumi.String("factory-gateway"),
					Host:                 pulumi.String("example.broker.local:1883"),
					KeepAliveSeconds:     pulumi.Int(60),
					MaxInflightMessages:  pulumi.Int(100),
					Protocol:             pulumi.String(iotoperations.BrokerProtocolTypeWebSockets),
					Qos:                  pulumi.Int(1),
					Retain:               pulumi.String(iotoperations.MqttRetainTypeKeep),
					SessionExpirySeconds: pulumi.Int(3600),
					Tls: &iotoperations.TlsPropertiesArgs{
						Mode: pulumi.String(iotoperations.OperationalModeDisabled),
					},
				},
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "generic-mqtt-broker-endpoint",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            EndpointType = AzureNative.IoTOperations.EndpointType.Mqtt,
            MqttSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointMqttArgs
            {
                Authentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointMqttAuthenticationArgs
                {
                    Method = AzureNative.IoTOperations.MqttAuthMethod.X509Certificate,
                    X509CertificateSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointAuthenticationX509Args
                    {
                        SecretRef = "example-secret",
                    },
                },
                ClientIdPrefix = "factory-gateway",
                Host = "example.broker.local:1883",
                KeepAliveSeconds = 60,
                MaxInflightMessages = 100,
                Protocol = AzureNative.IoTOperations.BrokerProtocolType.WebSockets,
                Qos = 1,
                Retain = AzureNative.IoTOperations.MqttRetainType.Keep,
                SessionExpirySeconds = 3600,
                Tls = new AzureNative.IoTOperations.Inputs.TlsPropertiesArgs
                {
                    Mode = AzureNative.IoTOperations.OperationalMode.Disabled,
                },
            },
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointMqttArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointMqttAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationX509Args;
import com.pulumi.azurenative.iotoperations.inputs.TlsPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
            .dataflowEndpointName("generic-mqtt-broker-endpoint")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowEndpointPropertiesArgs.builder()
                .endpointType("Mqtt")
                .mqttSettings(DataflowEndpointMqttArgs.builder()
                    .authentication(DataflowEndpointMqttAuthenticationArgs.builder()
                        .method("X509Certificate")
                        .x509CertificateSettings(DataflowEndpointAuthenticationX509Args.builder()
                            .secretRef("example-secret")
                            .build())
                        .build())
                    .clientIdPrefix("factory-gateway")
                    .host("example.broker.local:1883")
                    .keepAliveSeconds(60)
                    .maxInflightMessages(100)
                    .protocol("WebSockets")
                    .qos(1)
                    .retain("Keep")
                    .sessionExpirySeconds(3600)
                    .tls(TlsPropertiesArgs.builder()
                        .mode("Disabled")
                        .build())
                    .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflowEndpoint:
    type: azure-native:iotoperations:DataflowEndpoint
    properties:
      dataflowEndpointName: generic-mqtt-broker-endpoint
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        endpointType: Mqtt
        mqttSettings:
          authentication:
            method: X509Certificate
            x509CertificateSettings:
              secretRef: example-secret
          clientIdPrefix: factory-gateway
          host: example.broker.local:1883
          keepAliveSeconds: 60
          maxInflightMessages: 100
          protocol: WebSockets
          qos: 1
          retain: Keep
          sessionExpirySeconds: 3600
          tls:
            mode: Disabled
      resourceGroupName: rgiotoperations

The authentication method switches to X.509 certificates, with secretRef pointing to a Kubernetes secret that holds the client certificate and private key. The protocol property selects WebSockets transport, and connection parameters such as keepAliveSeconds, maxInflightMessages, and qos control reliability and throughput. The retain property set to Keep preserves the retain flag on forwarded messages rather than clearing it. Note that this example disables TLS; in practice X.509 client authentication happens during the TLS handshake, so you would normally enable TLS and supply a trusted CA certificate.
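
The secretRef value names a Kubernetes secret in the cluster; the endpoint resource does not create it for you. A hedged sketch using the Pulumi Kubernetes provider, assuming a kubernetes.io/tls secret in the azure-iot-operations namespace and PEM material supplied through stack configuration (the exact secret layout Azure IoT Operations expects may differ, so confirm it against the product documentation):

import * as pulumi from "@pulumi/pulumi";
import * as k8s from "@pulumi/kubernetes";

const config = new pulumi.Config();
// Hypothetical config values holding PEM-encoded certificate material.
const clientCertPem = config.requireSecret("clientCertPem");
const clientKeyPem = config.requireSecret("clientKeyPem");

// The secret referenced by secretRef: "example-secret" above.
const mqttClientCert = new k8s.core.v1.Secret("mqttClientCert", {
    metadata: {
        name: "example-secret",
        namespace: "azure-iot-operations", // assumed namespace of the IoT Operations deployment
    },
    type: "kubernetes.io/tls",
    stringData: {
        "tls.crt": clientCertPem, // client certificate chain
        "tls.key": clientKeyPem,  // private key
    },
});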

Stream to Azure Event Hubs via Kafka protocol

Cloud analytics pipelines commonly ingest IoT telemetry through Event Hubs, which exposes a Kafka-compatible endpoint.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
    dataflowEndpointName: "event-hub-endpoint",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        endpointType: azure_native.iotoperations.EndpointType.Kafka,
        kafkaSettings: {
            authentication: {
                method: azure_native.iotoperations.KafkaAuthMethod.SystemAssignedManagedIdentity,
                systemAssignedManagedIdentitySettings: {},
            },
            consumerGroupId: "aiodataflows",
            host: "example.servicebus.windows.net:9093",
            tls: {
                mode: azure_native.iotoperations.OperationalMode.Enabled,
            },
        },
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
    dataflow_endpoint_name="event-hub-endpoint",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "endpoint_type": azure_native.iotoperations.EndpointType.KAFKA,
        "kafka_settings": {
            "authentication": {
                "method": azure_native.iotoperations.KafkaAuthMethod.SYSTEM_ASSIGNED_MANAGED_IDENTITY,
                "system_assigned_managed_identity_settings": {},
            },
            "consumer_group_id": "aiodataflows",
            "host": "example.servicebus.windows.net:9093",
            "tls": {
                "mode": azure_native.iotoperations.OperationalMode.ENABLED,
            },
        },
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
			DataflowEndpointName: pulumi.String("event-hub-endpoint"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowEndpointPropertiesArgs{
				EndpointType: pulumi.String(iotoperations.EndpointTypeKafka),
				KafkaSettings: &iotoperations.DataflowEndpointKafkaArgs{
					Authentication: &iotoperations.DataflowEndpointKafkaAuthenticationArgs{
						Method:                                pulumi.String(iotoperations.KafkaAuthMethodSystemAssignedManagedIdentity),
						SystemAssignedManagedIdentitySettings: &iotoperations.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs{},
					},
					ConsumerGroupId: pulumi.String("aiodataflows"),
					Host:            pulumi.String("example.servicebus.windows.net:9093"),
					Tls: &iotoperations.TlsPropertiesArgs{
						Mode: pulumi.String(iotoperations.OperationalModeEnabled),
					},
				},
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "event-hub-endpoint",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            EndpointType = AzureNative.IoTOperations.EndpointType.Kafka,
            KafkaSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointKafkaArgs
            {
                Authentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointKafkaAuthenticationArgs
                {
                    Method = AzureNative.IoTOperations.KafkaAuthMethod.SystemAssignedManagedIdentity,
                    SystemAssignedManagedIdentitySettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs(),
                },
                ConsumerGroupId = "aiodataflows",
                Host = "example.servicebus.windows.net:9093",
                Tls = new AzureNative.IoTOperations.Inputs.TlsPropertiesArgs
                {
                    Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
                },
            },
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointKafkaArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointKafkaAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs;
import com.pulumi.azurenative.iotoperations.inputs.TlsPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
            .dataflowEndpointName("event-hub-endpoint")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowEndpointPropertiesArgs.builder()
                .endpointType("Kafka")
                .kafkaSettings(DataflowEndpointKafkaArgs.builder()
                    .authentication(DataflowEndpointKafkaAuthenticationArgs.builder()
                        .method("SystemAssignedManagedIdentity")
                        .systemAssignedManagedIdentitySettings(DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs.builder()
                            .build())
                        .build())
                    .consumerGroupId("aiodataflows")
                    .host("example.servicebus.windows.net:9093")
                    .tls(TlsPropertiesArgs.builder()
                        .mode("Enabled")
                        .build())
                    .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflowEndpoint:
    type: azure-native:iotoperations:DataflowEndpoint
    properties:
      dataflowEndpointName: event-hub-endpoint
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        endpointType: Kafka
        kafkaSettings:
          authentication:
            method: SystemAssignedManagedIdentity
            systemAssignedManagedIdentitySettings: {}
          consumerGroupId: aiodataflows
          host: example.servicebus.windows.net:9093
          tls:
            mode: Enabled
      resourceGroupName: rgiotoperations

Event Hubs connections use the kafkaSettings block with systemAssignedManagedIdentitySettings for authentication, so no secret is required. The consumerGroupId identifies the consumer group for reading messages, and the host property points to the Event Hubs namespace’s Kafka endpoint on port 9093. TLS is enabled for secure communication.
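
For managed identity authentication to work, the IoT Operations identity also needs a data-plane role on the Event Hubs namespace. A hedged sketch of that role assignment, assuming the identity’s principal ID and the namespace resource ID come from configuration and that the built-in Azure Event Hubs Data Sender role fits your dataflow direction (use Azure Event Hubs Data Receiver when the endpoint is a source); verify the role GUID for your cloud:

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const config = new pulumi.Config();
// Hypothetical config values: the principal (object) ID of the IoT Operations
// managed identity and the full resource ID of the Event Hubs namespace.
const aioPrincipalId = config.require("aioPrincipalId");
const eventHubsNamespaceId = config.require("eventHubsNamespaceId");

// Current subscription, used to build the role definition ID.
const current = pulumi.output(azure_native.authorization.getClientConfig());

// Built-in "Azure Event Hubs Data Sender" role GUID (verify for your cloud).
const dataSenderRoleId = "2b629674-e913-4c01-ae53-ef4638d8f975";

const sendGrant = new azure_native.authorization.RoleAssignment("aio-eventhubs-send", {
    principalId: aioPrincipalId,
    principalType: "ServicePrincipal",
    scope: eventHubsNamespaceId,
    roleDefinitionId: pulumi.interpolate`/subscriptions/${current.subscriptionId}/providers/Microsoft.Authorization/roleDefinitions/${dataSenderRoleId}`,
});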

Send to generic Kafka clusters with SASL authentication

On-premises or third-party Kafka deployments require SASL authentication and often need batching and compression to optimize network usage.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
    dataflowEndpointName: "generic-kafka-endpoint",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        endpointType: azure_native.iotoperations.EndpointType.Kafka,
        kafkaSettings: {
            authentication: {
                method: azure_native.iotoperations.KafkaAuthMethod.Sasl,
                saslSettings: {
                    saslType: azure_native.iotoperations.DataflowEndpointAuthenticationSaslType.Plain,
                    secretRef: "my-secret",
                },
            },
            batching: {
                latencyMs: 5,
                maxBytes: 1000000,
                maxMessages: 100000,
                mode: azure_native.iotoperations.OperationalMode.Enabled,
            },
            cloudEventAttributes: azure_native.iotoperations.CloudEventAttributeType.Propagate,
            compression: azure_native.iotoperations.DataflowEndpointKafkaCompression.Gzip,
            consumerGroupId: "dataflows",
            copyMqttProperties: azure_native.iotoperations.OperationalMode.Enabled,
            host: "example.kafka.local:9093",
            kafkaAcks: azure_native.iotoperations.DataflowEndpointKafkaAcks.All,
            partitionStrategy: azure_native.iotoperations.DataflowEndpointKafkaPartitionStrategy.Default,
            tls: {
                mode: azure_native.iotoperations.OperationalMode.Enabled,
                trustedCaCertificateConfigMapRef: "ca-certificates",
            },
        },
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
    dataflow_endpoint_name="generic-kafka-endpoint",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "endpoint_type": azure_native.iotoperations.EndpointType.KAFKA,
        "kafka_settings": {
            "authentication": {
                "method": azure_native.iotoperations.KafkaAuthMethod.SASL,
                "sasl_settings": {
                    "sasl_type": azure_native.iotoperations.DataflowEndpointAuthenticationSaslType.PLAIN,
                    "secret_ref": "my-secret",
                },
            },
            "batching": {
                "latency_ms": 5,
                "max_bytes": 1000000,
                "max_messages": 100000,
                "mode": azure_native.iotoperations.OperationalMode.ENABLED,
            },
            "cloud_event_attributes": azure_native.iotoperations.CloudEventAttributeType.PROPAGATE,
            "compression": azure_native.iotoperations.DataflowEndpointKafkaCompression.GZIP,
            "consumer_group_id": "dataflows",
            "copy_mqtt_properties": azure_native.iotoperations.OperationalMode.ENABLED,
            "host": "example.kafka.local:9093",
            "kafka_acks": azure_native.iotoperations.DataflowEndpointKafkaAcks.ALL,
            "partition_strategy": azure_native.iotoperations.DataflowEndpointKafkaPartitionStrategy.DEFAULT,
            "tls": {
                "mode": azure_native.iotoperations.OperationalMode.ENABLED,
                "trusted_ca_certificate_config_map_ref": "ca-certificates",
            },
        },
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
			DataflowEndpointName: pulumi.String("generic-kafka-endpoint"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowEndpointPropertiesArgs{
				EndpointType: pulumi.String(iotoperations.EndpointTypeKafka),
				KafkaSettings: &iotoperations.DataflowEndpointKafkaArgs{
					Authentication: &iotoperations.DataflowEndpointKafkaAuthenticationArgs{
						Method: pulumi.String(iotoperations.KafkaAuthMethodSasl),
						SaslSettings: &iotoperations.DataflowEndpointAuthenticationSaslArgs{
							SaslType:  pulumi.String(iotoperations.DataflowEndpointAuthenticationSaslTypePlain),
							SecretRef: pulumi.String("my-secret"),
						},
					},
					Batching: &iotoperations.DataflowEndpointKafkaBatchingArgs{
						LatencyMs:   pulumi.Int(5),
						MaxBytes:    pulumi.Int(1000000),
						MaxMessages: pulumi.Int(100000),
						Mode:        pulumi.String(iotoperations.OperationalModeEnabled),
					},
					CloudEventAttributes: pulumi.String(iotoperations.CloudEventAttributeTypePropagate),
					Compression:          pulumi.String(iotoperations.DataflowEndpointKafkaCompressionGzip),
					ConsumerGroupId:      pulumi.String("dataflows"),
					CopyMqttProperties:   pulumi.String(iotoperations.OperationalModeEnabled),
					Host:                 pulumi.String("example.kafka.local:9093"),
					KafkaAcks:            pulumi.String(iotoperations.DataflowEndpointKafkaAcksAll),
					PartitionStrategy:    pulumi.String(iotoperations.DataflowEndpointKafkaPartitionStrategyDefault),
					Tls: &iotoperations.TlsPropertiesArgs{
						Mode:                             pulumi.String(iotoperations.OperationalModeEnabled),
						TrustedCaCertificateConfigMapRef: pulumi.String("ca-certificates"),
					},
				},
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "generic-kafka-endpoint",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            EndpointType = AzureNative.IoTOperations.EndpointType.Kafka,
            KafkaSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointKafkaArgs
            {
                Authentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointKafkaAuthenticationArgs
                {
                    Method = AzureNative.IoTOperations.KafkaAuthMethod.Sasl,
                    SaslSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointAuthenticationSaslArgs
                    {
                        SaslType = AzureNative.IoTOperations.DataflowEndpointAuthenticationSaslType.Plain,
                        SecretRef = "my-secret",
                    },
                },
                Batching = new AzureNative.IoTOperations.Inputs.DataflowEndpointKafkaBatchingArgs
                {
                    LatencyMs = 5,
                    MaxBytes = 1000000,
                    MaxMessages = 100000,
                    Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
                },
                CloudEventAttributes = AzureNative.IoTOperations.CloudEventAttributeType.Propagate,
                Compression = AzureNative.IoTOperations.DataflowEndpointKafkaCompression.Gzip,
                ConsumerGroupId = "dataflows",
                CopyMqttProperties = AzureNative.IoTOperations.OperationalMode.Enabled,
                Host = "example.kafka.local:9093",
                KafkaAcks = AzureNative.IoTOperations.DataflowEndpointKafkaAcks.All,
                PartitionStrategy = AzureNative.IoTOperations.DataflowEndpointKafkaPartitionStrategy.Default,
                Tls = new AzureNative.IoTOperations.Inputs.TlsPropertiesArgs
                {
                    Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
                    TrustedCaCertificateConfigMapRef = "ca-certificates",
                },
            },
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointKafkaArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointKafkaAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationSaslArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointKafkaBatchingArgs;
import com.pulumi.azurenative.iotoperations.inputs.TlsPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
            .dataflowEndpointName("generic-kafka-endpoint")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowEndpointPropertiesArgs.builder()
                .endpointType("Kafka")
                .kafkaSettings(DataflowEndpointKafkaArgs.builder()
                    .authentication(DataflowEndpointKafkaAuthenticationArgs.builder()
                        .method("Sasl")
                        .saslSettings(DataflowEndpointAuthenticationSaslArgs.builder()
                            .saslType("Plain")
                            .secretRef("my-secret")
                            .build())
                        .build())
                    .batching(DataflowEndpointKafkaBatchingArgs.builder()
                        .latencyMs(5)
                        .maxBytes(1000000)
                        .maxMessages(100000)
                        .mode("Enabled")
                        .build())
                    .cloudEventAttributes("Propagate")
                    .compression("Gzip")
                    .consumerGroupId("dataflows")
                    .copyMqttProperties("Enabled")
                    .host("example.kafka.local:9093")
                    .kafkaAcks("All")
                    .partitionStrategy("Default")
                    .tls(TlsPropertiesArgs.builder()
                        .mode("Enabled")
                        .trustedCaCertificateConfigMapRef("ca-certificates")
                        .build())
                    .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflowEndpoint:
    type: azure-native:iotoperations:DataflowEndpoint
    properties:
      dataflowEndpointName: generic-kafka-endpoint
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        endpointType: Kafka
        kafkaSettings:
          authentication:
            method: Sasl
            saslSettings:
              saslType: Plain
              secretRef: my-secret
          batching:
            latencyMs: 5
            maxBytes: 1000000
            maxMessages: 100000
            mode: Enabled
          cloudEventAttributes: Propagate
          compression: Gzip
          consumerGroupId: dataflows
          copyMqttProperties: Enabled
          host: example.kafka.local:9093
          kafkaAcks: All
          partitionStrategy: Default
          tls:
            mode: Enabled
            trustedCaCertificateConfigMapRef: ca-certificates
      resourceGroupName: rgiotoperations

The authentication method uses SASL/PLAIN with credentials stored in a Kubernetes secret. The batching block controls message aggregation with latencyMs, maxBytes, and maxMessages thresholds. The compression property enables Gzip compression to reduce network bandwidth. The kafkaAcks property set to “All” ensures messages are replicated before acknowledgment.
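
SASL/PLAIN sends the credentials as-is, protected here only by TLS, so clusters that support it often prefer a SCRAM mechanism. Only the authentication block changes; a minimal sketch, assuming the provider exposes ScramSha256 alongside Plain in the same enum and that the referenced secret holds SCRAM credentials:

import * as azure_native from "@pulumi/azure-native";

// Drop-in replacement for the authentication block in the Kafka settings above.
const scramAuthentication = {
    method: azure_native.iotoperations.KafkaAuthMethod.Sasl,
    saslSettings: {
        saslType: azure_native.iotoperations.DataflowEndpointAuthenticationSaslType.ScramSha256,
        secretRef: "my-secret",
    },
};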

Write to Azure Data Lake Storage Gen2

Data lake architectures store raw telemetry in ADLS Gen2 for long-term retention and batch analytics.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
    dataflowEndpointName: "adlsv2-endpoint",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        dataLakeStorageSettings: {
            authentication: {
                accessTokenSettings: {
                    secretRef: "my-secret",
                },
                method: azure_native.iotoperations.DataLakeStorageAuthMethod.AccessToken,
            },
            host: "example.blob.core.windows.net",
        },
        endpointType: azure_native.iotoperations.EndpointType.DataLakeStorage,
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
    dataflow_endpoint_name="adlsv2-endpoint",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "data_lake_storage_settings": {
            "authentication": {
                "access_token_settings": {
                    "secret_ref": "my-secret",
                },
                "method": azure_native.iotoperations.DataLakeStorageAuthMethod.ACCESS_TOKEN,
            },
            "host": "example.blob.core.windows.net",
        },
        "endpoint_type": azure_native.iotoperations.EndpointType.DATA_LAKE_STORAGE,
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
			DataflowEndpointName: pulumi.String("adlsv2-endpoint"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowEndpointPropertiesArgs{
				DataLakeStorageSettings: &iotoperations.DataflowEndpointDataLakeStorageArgs{
					Authentication: &iotoperations.DataflowEndpointDataLakeStorageAuthenticationArgs{
						AccessTokenSettings: &iotoperations.DataflowEndpointAuthenticationAccessTokenArgs{
							SecretRef: pulumi.String("my-secret"),
						},
						Method: pulumi.String(iotoperations.DataLakeStorageAuthMethodAccessToken),
					},
					Host: pulumi.String("example.blob.core.windows.net"),
				},
				EndpointType: pulumi.String(iotoperations.EndpointTypeDataLakeStorage),
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "adlsv2-endpoint",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            DataLakeStorageSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointDataLakeStorageArgs
            {
                Authentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointDataLakeStorageAuthenticationArgs
                {
                    AccessTokenSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointAuthenticationAccessTokenArgs
                    {
                        SecretRef = "my-secret",
                    },
                    Method = AzureNative.IoTOperations.DataLakeStorageAuthMethod.AccessToken,
                },
                Host = "example.blob.core.windows.net",
            },
            EndpointType = AzureNative.IoTOperations.EndpointType.DataLakeStorage,
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointDataLakeStorageArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointDataLakeStorageAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationAccessTokenArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
            .dataflowEndpointName("adlsv2-endpoint")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowEndpointPropertiesArgs.builder()
                .dataLakeStorageSettings(DataflowEndpointDataLakeStorageArgs.builder()
                    .authentication(DataflowEndpointDataLakeStorageAuthenticationArgs.builder()
                        .accessTokenSettings(DataflowEndpointAuthenticationAccessTokenArgs.builder()
                            .secretRef("my-secret")
                            .build())
                        .method("AccessToken")
                        .build())
                    .host("example.blob.core.windows.net")
                    .build())
                .endpointType("DataLakeStorage")
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflowEndpoint:
    type: azure-native:iotoperations:DataflowEndpoint
    properties:
      dataflowEndpointName: adlsv2-endpoint
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        dataLakeStorageSettings:
          authentication:
            accessTokenSettings:
              secretRef: my-secret
            method: AccessToken
          host: example.blob.core.windows.net
        endpointType: DataLakeStorage
      resourceGroupName: rgiotoperations

The dataLakeStorageSettings block configures ADLS Gen2 connectivity. The authentication method uses access tokens stored in a Kubernetes secret. The host property points to the storage account’s blob endpoint.
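
Access tokens are rotated secrets; if the IoT Operations identity has blob data write access on the account (for example, Storage Blob Data Contributor), the endpoint can authenticate without any secret. A hedged sketch of the same endpoint switched to a system-assigned managed identity, assuming the Data Lake Storage settings accept the same systemAssignedManagedIdentitySettings shape used by the Kafka and Data Explorer examples in this guide:

import * as azure_native from "@pulumi/azure-native";

const adlsEndpointMsi = new azure_native.iotoperations.DataflowEndpoint("adlsEndpointMsi", {
    dataflowEndpointName: "adlsv2-endpoint-msi",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        endpointType: azure_native.iotoperations.EndpointType.DataLakeStorage,
        dataLakeStorageSettings: {
            authentication: {
                // Assumed method name, mirroring the other endpoint types above.
                method: "SystemAssignedManagedIdentity",
                systemAssignedManagedIdentitySettings: {},
            },
            host: "example.blob.core.windows.net",
        },
    },
    resourceGroupName: "rgiotoperations",
});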

Load into Azure Data Explorer for real-time analytics

Real-time analytics workloads use Azure Data Explorer to query streaming telemetry with low latency.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
    dataflowEndpointName: "adx-endpoint",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        dataExplorerSettings: {
            authentication: {
                method: "SystemAssignedManagedIdentity",
                systemAssignedManagedIdentitySettings: {},
            },
            batching: {
                latencySeconds: 9312,
                maxMessages: 9028,
            },
            database: "example-database",
            host: "example.westeurope.kusto.windows.net",
        },
        endpointType: azure_native.iotoperations.EndpointType.DataExplorer,
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
    dataflow_endpoint_name="adx-endpoint",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "data_explorer_settings": {
            "authentication": {
                "method": "SystemAssignedManagedIdentity",
                "system_assigned_managed_identity_settings": {},
            },
            "batching": {
                "latency_seconds": 9312,
                "max_messages": 9028,
            },
            "database": "example-database",
            "host": "example.westeurope.kusto.windows.net",
        },
        "endpoint_type": azure_native.iotoperations.EndpointType.DATA_EXPLORER,
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
			DataflowEndpointName: pulumi.String("adx-endpoint"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowEndpointPropertiesArgs{
				DataExplorerSettings: &iotoperations.DataflowEndpointDataExplorerArgs{
					Authentication: &iotoperations.DataflowEndpointDataExplorerAuthenticationArgs{
						Method:                                pulumi.Any("SystemAssignedManagedIdentity"),
						SystemAssignedManagedIdentitySettings: &iotoperations.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs{},
					},
					Batching: &iotoperations.BatchingConfigurationArgs{
						LatencySeconds: pulumi.Int(9312),
						MaxMessages:    pulumi.Int(9028),
					},
					Database: pulumi.String("example-database"),
					Host:     pulumi.String("example.westeurope.kusto.windows.net"),
				},
				EndpointType: pulumi.String(iotoperations.EndpointTypeDataExplorer),
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "adx-endpoint",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            DataExplorerSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointDataExplorerArgs
            {
                Authentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointDataExplorerAuthenticationArgs
                {
                    Method = "SystemAssignedManagedIdentity",
                    SystemAssignedManagedIdentitySettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs(),
                },
                Batching = new AzureNative.IoTOperations.Inputs.BatchingConfigurationArgs
                {
                    LatencySeconds = 9312,
                    MaxMessages = 9028,
                },
                Database = "example-database",
                Host = "example.westeurope.kusto.windows.net",
            },
            EndpointType = AzureNative.IoTOperations.EndpointType.DataExplorer,
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointDataExplorerArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointDataExplorerAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs;
import com.pulumi.azurenative.iotoperations.inputs.BatchingConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
            .dataflowEndpointName("adx-endpoint")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowEndpointPropertiesArgs.builder()
                .dataExplorerSettings(DataflowEndpointDataExplorerArgs.builder()
                    .authentication(DataflowEndpointDataExplorerAuthenticationArgs.builder()
                        .method("SystemAssignedManagedIdentity")
                        .systemAssignedManagedIdentitySettings(DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs.builder()
                            .build())
                        .build())
                    .batching(BatchingConfigurationArgs.builder()
                        .latencySeconds(9312)
                        .maxMessages(9028)
                        .build())
                    .database("example-database")
                    .host("example.westeurope.kusto.windows.net")
                    .build())
                .endpointType("DataExplorer")
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflowEndpoint:
    type: azure-native:iotoperations:DataflowEndpoint
    properties:
      dataflowEndpointName: adx-endpoint
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        dataExplorerSettings:
          authentication:
            method: SystemAssignedManagedIdentity
            systemAssignedManagedIdentitySettings: {}
          batching:
            latencySeconds: 9312
            maxMessages: 9028
          database: example-database
          host: example.westeurope.kusto.windows.net
        endpointType: DataExplorer
      resourceGroupName: rgiotoperations

The dataExplorerSettings block configures Azure Data Explorer (ADX) connectivity, using systemAssignedManagedIdentitySettings for authentication. The database property names the target database, and the host property points to the ADX cluster endpoint. The batching block aggregates messages before ingestion to improve throughput; the latencySeconds and maxMessages values shown here are placeholders, so tune them for your workload.
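
As a rough sketch, here is the same endpoint with more production-like batching values (the numbers are assumptions, not recommendations):

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

// Same ADX endpoint as above, with illustrative batching values:
// flush roughly once a minute or after 1,000 messages, whichever comes first.
const adxEndpointTuned = new azure_native.iotoperations.DataflowEndpoint("adxEndpointTuned", {
    dataflowEndpointName: "adx-endpoint",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        endpointType: azure_native.iotoperations.EndpointType.DataExplorer,
        dataExplorerSettings: {
            authentication: {
                method: "SystemAssignedManagedIdentity",
                systemAssignedManagedIdentitySettings: {},
            },
            batching: {
                latencySeconds: 60, // illustrative value, not a recommendation
                maxMessages: 1000,  // illustrative value, not a recommendation
            },
            database: "example-database",
            host: "example.westeurope.kusto.windows.net",
        },
    },
    resourceGroupName: "rgiotoperations",
});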

Write to Microsoft Fabric OneLake

Microsoft Fabric deployments centralize data in OneLake, where lakehouses provide unified storage for analytics and AI workloads.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
    dataflowEndpointName: "fabric-endpoint",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        endpointType: azure_native.iotoperations.EndpointType.FabricOneLake,
        fabricOneLakeSettings: {
            authentication: {
                method: "SystemAssignedManagedIdentity",
                systemAssignedManagedIdentitySettings: {},
            },
            host: "onelake.dfs.fabric.microsoft.com",
            names: {
                lakehouseName: "example-lakehouse",
                workspaceName: "example-workspace",
            },
            oneLakePathType: azure_native.iotoperations.DataflowEndpointFabricPathType.Tables,
        },
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
    dataflow_endpoint_name="fabric-endpoint",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "endpoint_type": azure_native.iotoperations.EndpointType.FABRIC_ONE_LAKE,
        "fabric_one_lake_settings": {
            "authentication": {
                "method": "SystemAssignedManagedIdentity",
                "system_assigned_managed_identity_settings": {},
            },
            "host": "onelake.dfs.fabric.microsoft.com",
            "names": {
                "lakehouse_name": "example-lakehouse",
                "workspace_name": "example-workspace",
            },
            "one_lake_path_type": azure_native.iotoperations.DataflowEndpointFabricPathType.TABLES,
        },
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
			DataflowEndpointName: pulumi.String("fabric-endpoint"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowEndpointPropertiesArgs{
				EndpointType: pulumi.String(iotoperations.EndpointTypeFabricOneLake),
				FabricOneLakeSettings: &iotoperations.DataflowEndpointFabricOneLakeArgs{
					Authentication: &iotoperations.DataflowEndpointFabricOneLakeAuthenticationArgs{
						Method:                                pulumi.Any("SystemAssignedManagedIdentity"),
						SystemAssignedManagedIdentitySettings: &iotoperations.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs{},
					},
					Host: pulumi.String("onelake.dfs.fabric.microsoft.com"),
					Names: &iotoperations.DataflowEndpointFabricOneLakeNamesArgs{
						LakehouseName: pulumi.String("example-lakehouse"),
						WorkspaceName: pulumi.String("example-workspace"),
					},
					OneLakePathType: pulumi.String(iotoperations.DataflowEndpointFabricPathTypeTables),
				},
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "fabric-endpoint",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            EndpointType = AzureNative.IoTOperations.EndpointType.FabricOneLake,
            FabricOneLakeSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointFabricOneLakeArgs
            {
                Authentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointFabricOneLakeAuthenticationArgs
                {
                    Method = "SystemAssignedManagedIdentity",
                    SystemAssignedManagedIdentitySettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs(),
                },
                Host = "onelake.dfs.fabric.microsoft.com",
                Names = new AzureNative.IoTOperations.Inputs.DataflowEndpointFabricOneLakeNamesArgs
                {
                    LakehouseName = "example-lakehouse",
                    WorkspaceName = "example-workspace",
                },
                OneLakePathType = AzureNative.IoTOperations.DataflowEndpointFabricPathType.Tables,
            },
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointFabricOneLakeArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointFabricOneLakeAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointFabricOneLakeNamesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
            .dataflowEndpointName("fabric-endpoint")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowEndpointPropertiesArgs.builder()
                .endpointType("FabricOneLake")
                .fabricOneLakeSettings(DataflowEndpointFabricOneLakeArgs.builder()
                    .authentication(DataflowEndpointFabricOneLakeAuthenticationArgs.builder()
                        .method("SystemAssignedManagedIdentity")
                        .systemAssignedManagedIdentitySettings(DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs.builder()
                            .build())
                        .build())
                    .host("onelake.dfs.fabric.microsoft.com")
                    .names(DataflowEndpointFabricOneLakeNamesArgs.builder()
                        .lakehouseName("example-lakehouse")
                        .workspaceName("example-workspace")
                        .build())
                    .oneLakePathType("Tables")
                    .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflowEndpoint:
    type: azure-native:iotoperations:DataflowEndpoint
    properties:
      dataflowEndpointName: fabric-endpoint
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        endpointType: FabricOneLake
        fabricOneLakeSettings:
          authentication:
            method: SystemAssignedManagedIdentity
            systemAssignedManagedIdentitySettings: {}
          host: onelake.dfs.fabric.microsoft.com
          names:
            lakehouseName: example-lakehouse
            workspaceName: example-workspace
          oneLakePathType: Tables
      resourceGroupName: rgiotoperations

The fabricOneLakeSettings block configures Fabric connectivity. The names property specifies the workspace and lakehouse names, the oneLakePathType property determines whether data lands in the lakehouse’s Tables or Files section, and the host property points to the Fabric OneLake endpoint.
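
If you want raw files rather than Delta tables, the only change is the path type. A minimal sketch, assuming the Files member of DataflowEndpointFabricPathType alongside the Tables member used above:

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

// Same Fabric endpoint shape as above, targeting the lakehouse Files section.
const fabricFilesEndpoint = new azure_native.iotoperations.DataflowEndpoint("fabricFilesEndpoint", {
    dataflowEndpointName: "fabric-files-endpoint",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        endpointType: azure_native.iotoperations.EndpointType.FabricOneLake,
        fabricOneLakeSettings: {
            authentication: {
                method: "SystemAssignedManagedIdentity",
                systemAssignedManagedIdentitySettings: {},
            },
            host: "onelake.dfs.fabric.microsoft.com",
            names: {
                lakehouseName: "example-lakehouse",
                workspaceName: "example-workspace",
            },
            // Files writes raw files instead of Delta tables.
            oneLakePathType: azure_native.iotoperations.DataflowEndpointFabricPathType.Files,
        },
    },
    resourceGroupName: "rgiotoperations",
});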

Buffer to local persistent storage

Edge deployments with intermittent connectivity buffer messages to local storage before forwarding to cloud endpoints.

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
    dataflowEndpointName: "local-storage-endpoint",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        endpointType: azure_native.iotoperations.EndpointType.LocalStorage,
        localStorageSettings: {
            persistentVolumeClaimRef: "example-pvc",
        },
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
    dataflow_endpoint_name="local-storage-endpoint",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "endpoint_type": azure_native.iotoperations.EndpointType.LOCAL_STORAGE,
        "local_storage_settings": {
            "persistent_volume_claim_ref": "example-pvc",
        },
    },
    resource_group_name="rgiotoperations")
package main

import (
	iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
			DataflowEndpointName: pulumi.String("local-storage-endpoint"),
			ExtendedLocation: &iotoperations.ExtendedLocationArgs{
				Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
				Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
			},
			InstanceName: pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowEndpointPropertiesArgs{
				EndpointType: pulumi.String(iotoperations.EndpointTypeLocalStorage),
				LocalStorageSettings: &iotoperations.DataflowEndpointLocalStorageArgs{
					PersistentVolumeClaimRef: pulumi.String("example-pvc"),
				},
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() => 
{
    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "local-storage-endpoint",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            EndpointType = AzureNative.IoTOperations.EndpointType.LocalStorage,
            LocalStorageSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointLocalStorageArgs
            {
                PersistentVolumeClaimRef = "example-pvc",
            },
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointLocalStorageArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
            .dataflowEndpointName("local-storage-endpoint")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowEndpointPropertiesArgs.builder()
                .endpointType("LocalStorage")
                .localStorageSettings(DataflowEndpointLocalStorageArgs.builder()
                    .persistentVolumeClaimRef("example-pvc")
                    .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());

    }
}
resources:
  dataflowEndpoint:
    type: azure-native:iotoperations:DataflowEndpoint
    properties:
      dataflowEndpointName: local-storage-endpoint
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        endpointType: LocalStorage
        localStorageSettings:
          persistentVolumeClaimRef: example-pvc
      resourceGroupName: rgiotoperations

The localStorageSettings block references a Kubernetes PersistentVolumeClaim (PVC) by name; the claim must already exist in the cluster running Azure IoT Operations. Messages are written to the backing persistent volume, providing offline resilience when cloud connectivity is unavailable.
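
The claim itself is outside the endpoint’s scope. If you also manage it with Pulumi, a minimal sketch using @pulumi/kubernetes might look like the following; the namespace, access mode, and size are assumptions to adapt to your cluster:

import * as k8s from "@pulumi/kubernetes";

// Hypothetical PVC that the local storage endpoint references by name ("example-pvc").
const bufferPvc = new k8s.core.v1.PersistentVolumeClaim("bufferPvc", {
    metadata: {
        name: "example-pvc",
        namespace: "azure-iot-operations", // assumed AIO namespace; match your deployment
    },
    spec: {
        accessModes: ["ReadWriteOnce"],
        resources: {
            requests: {
                storage: "10Gi", // size is illustrative
            },
        },
        // storageClassName: "your-storage-class", // optionally pin a storage class
    },
});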

Beyond these examples

These snippets focus on specific endpoint-level features: MQTT and Kafka protocol endpoints, Azure service integrations, and authentication methods. They’re intentionally minimal rather than full dataflow pipelines.

The examples reference pre-existing infrastructure such as Azure IoT Operations instances and custom locations, Kubernetes secrets and ConfigMaps for credentials and certificates, Azure resources like storage accounts and Event Hub namespaces, and persistent volume claims for local storage. They focus on configuring the endpoint rather than provisioning the surrounding infrastructure.

To keep things focused, common endpoint patterns are omitted, including:

  • CloudEvent attribute handling (cloudEventAttributes property)
  • MQTT message retention and QoS tuning
  • Kafka partition strategies and acknowledgment modes
  • User-assigned managed identity configuration

These omissions are intentional: the goal is to illustrate how each endpoint type is wired, not to provide drop-in dataflow modules. See the DataflowEndpoint resource reference for all available configuration options.


Frequently Asked Questions

Endpoint Types & Configuration
What types of dataflow endpoints can I configure?
You can configure six endpoint types: DataExplorer (Azure Data Explorer/Kusto), DataLakeStorage (Azure Data Lake Storage Gen2), FabricOneLake (Microsoft Fabric), Kafka (including Azure Event Hub), LocalStorage (persistent volume claims), and Mqtt (MQTT brokers including the built-in AIO broker).
Can I change the endpoint type after creation?
The extendedLocation property is immutable after creation. The schema doesn’t explicitly mark the endpoint type as immutable, but changing an endpoint’s fundamental configuration typically requires recreating the resource.
How do I connect to the built-in AIO MQTT broker?
Set endpointType to Mqtt, configure host as aio-broker:18883, use Kubernetes authentication with serviceAccountTokenSettings.audience set to aio-internal, and enable TLS with trustedCaCertificateConfigMapRef pointing to aio-ca-trust-bundle-test-only.
How do I connect to Azure Event Hub?
Set endpointType to Kafka and configure kafkaSettings: point host at your Event Hub namespace (e.g., example.servicebus.windows.net:9093), use SystemAssignedManagedIdentity authentication, specify a consumerGroupId, and enable TLS.
How do I connect to Azure Data Explorer?
Set endpointType to DataExplorer and configure dataExplorerSettings with host (your cluster URL like example.westeurope.kusto.windows.net), database name, and SystemAssignedManagedIdentity authentication with optional batching settings.
How do I connect to Microsoft Fabric OneLake?
Set endpointType to FabricOneLake and configure fabricOneLakeSettings: set host to onelake.dfs.fabric.microsoft.com, specify workspaceName and lakehouseName in the names object, choose a oneLakePathType (Tables or Files), and use SystemAssignedManagedIdentity authentication.
Authentication & Security
What authentication methods are supported for different endpoint types?

Authentication methods vary by endpoint type (a sketch of user-assigned managed identity configuration follows the list):

  • MQTT: SystemAssignedManagedIdentity, UserAssignedManagedIdentity, Kubernetes (service account tokens), X509Certificate
  • Kafka: SystemAssignedManagedIdentity, UserAssignedManagedIdentity, Sasl (Plain, ScramSha256, ScramSha512), X509Certificate
  • Data Explorer: SystemAssignedManagedIdentity, UserAssignedManagedIdentity
  • Data Lake Storage: SystemAssignedManagedIdentity, UserAssignedManagedIdentity, AccessToken
  • Fabric OneLake: SystemAssignedManagedIdentity, UserAssignedManagedIdentity
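
Several of these methods aren’t shown in the examples above. As one hedged illustration, here is what user-assigned managed identity might look like on the Data Explorer endpoint; the userAssignedManagedIdentitySettings field names and GUID values are assumptions, so confirm them against the DataflowEndpoint resource reference:

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

// Sketch: Data Explorer endpoint switched to a user-assigned managed identity.
// clientId/tenantId are placeholders and the field names are assumed from the
// user-assigned identity pattern; verify against the resource reference.
const adxUserAssigned = new azure_native.iotoperations.DataflowEndpoint("adxUserAssigned", {
    dataflowEndpointName: "adx-endpoint-uami",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        endpointType: azure_native.iotoperations.EndpointType.DataExplorer,
        dataExplorerSettings: {
            authentication: {
                method: "UserAssignedManagedIdentity",
                userAssignedManagedIdentitySettings: {
                    clientId: "00000000-0000-0000-0000-000000000000",
                    tenantId: "00000000-0000-0000-0000-000000000000",
                },
            },
            database: "example-database",
            host: "example.westeurope.kusto.windows.net",
        },
    },
    resourceGroupName: "rgiotoperations",
});
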
How do I configure TLS for my endpoint?
Configure the tls object with mode set to Enabled or Disabled. For custom CA certificates, set trustedCaCertificateConfigMapRef to reference a ConfigMap containing your trusted certificates. TLS is available for MQTT and Kafka endpoints.
How do I use SASL authentication with Kafka?
Set authentication.method to Sasl and configure saslSettings with saslType (Plain, ScramSha256, or ScramSha512) and secretRef pointing to a Kubernetes secret containing your credentials.
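
Putting that together, a minimal sketch of a SASL-authenticated Kafka endpoint; the broker host, consumer group, and secret name are placeholders:

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

// Sketch: Kafka endpoint using SASL SCRAM-SHA-256. The referenced Kubernetes secret is
// assumed to hold the username and password the broker expects.
const kafkaSaslEndpoint = new azure_native.iotoperations.DataflowEndpoint("kafkaSaslEndpoint", {
    dataflowEndpointName: "kafka-sasl-endpoint",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        endpointType: azure_native.iotoperations.EndpointType.Kafka,
        kafkaSettings: {
            authentication: {
                method: "Sasl",
                saslSettings: {
                    saslType: "ScramSha256",
                    secretRef: "kafka-sasl-credentials", // Kubernetes secret name (placeholder)
                },
            },
            consumerGroupId: "aio-dataflow",
            host: "kafka.example.com:9093",
            tls: {
                mode: azure_native.iotoperations.OperationalMode.Enabled,
            },
        },
    },
    resourceGroupName: "rgiotoperations",
});
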
Performance & Batching
What's the difference between Kafka batching and other endpoint batching?
Kafka batching uses latencyMs (milliseconds), maxBytes, maxMessages, and a mode setting (Enabled/Disabled). Other endpoints (Data Explorer, Data Lake Storage, Fabric OneLake) use only latencySeconds and maxMessages without a mode setting.
What Kafka-specific performance settings are available?
Kafka endpoints support compression (None, Gzip, Snappy, Lz4), kafkaAcks acknowledgment levels (Zero, One, All), partitionStrategy (Default, Static, Topic, Property), and copyMqttProperties to preserve MQTT metadata.
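
A combined sketch of those batching and performance settings on an Event Hub-style Kafka endpoint; the numeric values are illustrative, and copyMqttProperties is assumed to accept Enabled/Disabled:

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

// Sketch: Kafka endpoint with explicit batching and performance settings.
const kafkaTunedEndpoint = new azure_native.iotoperations.DataflowEndpoint("kafkaTunedEndpoint", {
    dataflowEndpointName: "kafka-tuned-endpoint",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        endpointType: azure_native.iotoperations.EndpointType.Kafka,
        kafkaSettings: {
            authentication: {
                method: "SystemAssignedManagedIdentity",
                systemAssignedManagedIdentitySettings: {},
            },
            host: "example.servicebus.windows.net:9093",
            consumerGroupId: "aio-dataflow",
            batching: {
                mode: azure_native.iotoperations.OperationalMode.Enabled,
                latencyMs: 100,    // flush at most every 100 ms (illustrative)
                maxBytes: 1000000, // or once roughly 1 MB is buffered (illustrative)
                maxMessages: 1000, // or after 1,000 messages (illustrative)
            },
            compression: "Gzip",
            kafkaAcks: "All",
            partitionStrategy: "Default",
            copyMqttProperties: "Enabled", // assumed to take Enabled/Disabled
            tls: {
                mode: azure_native.iotoperations.OperationalMode.Enabled,
            },
        },
    },
    resourceGroupName: "rgiotoperations",
});
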
Protocol-Specific Settings
What MQTT-specific settings can I configure?
MQTT endpoints support qos (quality of service level 0-2), retain (Keep or Never), keepAliveSeconds, maxInflightMessages, protocol (Mqtt or WebSockets), sessionExpirySeconds, and clientIdPrefix for client identification.
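
Building on the built-in broker endpoint shown at the start of this guide, here is a sketch with those protocol-level settings made explicit; the values are illustrative rather than recommendations:

import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

// Sketch: MQTT endpoint with protocol-level knobs spelled out; values are illustrative.
const mqttTunedEndpoint = new azure_native.iotoperations.DataflowEndpoint("mqttTunedEndpoint", {
    dataflowEndpointName: "mqtt-tuned-endpoint",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        endpointType: azure_native.iotoperations.EndpointType.Mqtt,
        mqttSettings: {
            authentication: {
                method: "Kubernetes",
                serviceAccountTokenSettings: {
                    audience: "aio-internal",
                },
            },
            host: "aio-broker:18883",
            clientIdPrefix: "dataflow",  // prefix for generated MQTT client IDs
            keepAliveSeconds: 60,
            maxInflightMessages: 100,
            protocol: "Mqtt",            // or "WebSockets"
            qos: 1,
            retain: "Keep",              // or "Never"
            sessionExpirySeconds: 3600,
            tls: {
                mode: azure_native.iotoperations.OperationalMode.Enabled,
                trustedCaCertificateConfigMapRef: "aio-ca-trust-bundle-test-only",
            },
        },
    },
    resourceGroupName: "rgiotoperations",
});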

Using a different cloud?

Explore integration guides for other cloud providers: