The azure-native:iotoperations:DataflowEndpoint resource, part of the Pulumi Azure Native provider, defines dataflow endpoints that specify where messages are sent or received: MQTT brokers, Kafka clusters, Azure services, or local storage. This guide focuses on four capabilities: MQTT broker connections, Kafka and Event Hub streaming, Azure storage and analytics services, and local persistent storage.
Dataflow endpoints reference existing Azure IoT Operations instances, Kubernetes secrets and ConfigMaps, Azure resources, and persistent volumes. The examples are intentionally small. Combine them with your own authentication configuration, batching policies, and network settings.
Connect to the built-in Azure IoT Operations broker
IoT Operations deployments typically start by connecting dataflows to the built-in MQTT broker, which serves as the central message hub for device telemetry and commands.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";
// Custom location that hosts the Azure IoT Operations instance.
const customLocation = {
    name: "qmbrfwcpwwhggszhrdjv",
    type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
};

// Service account token authentication scoped to the "aio-internal" audience.
// NOTE(review): "Kubernetes" is passed as a raw string rather than an enum
// member — confirm the target API version accepts it.
const brokerAuthentication = {
    method: "Kubernetes",
    serviceAccountTokenSettings: {
        audience: "aio-internal",
    },
};

// TLS is enabled; the trusted CA bundle is read from the named ConfigMap.
const brokerTls = {
    mode: azure_native.iotoperations.OperationalMode.Enabled,
    trustedCaCertificateConfigMapRef: "aio-ca-trust-bundle-test-only",
};

// MQTT dataflow endpoint that targets the built-in broker at aio-broker:18883.
const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
    resourceGroupName: "rgiotoperations",
    instanceName: "resource-name123",
    dataflowEndpointName: "aio-builtin-broker-endpoint",
    extendedLocation: customLocation,
    properties: {
        endpointType: azure_native.iotoperations.EndpointType.Mqtt,
        mqttSettings: {
            host: "aio-broker:18883",
            authentication: brokerAuthentication,
            tls: brokerTls,
        },
    },
});
import pulumi
import pulumi_azure_native as azure_native
# MQTT dataflow endpoint that targets the built-in broker at aio-broker:18883.
dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint(
    "dataflowEndpoint",
    resource_group_name="rgiotoperations",
    instance_name="resource-name123",
    dataflow_endpoint_name="aio-builtin-broker-endpoint",
    # Custom location that hosts the Azure IoT Operations instance.
    extended_location={
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
        "name": "qmbrfwcpwwhggszhrdjv",
    },
    properties={
        "endpoint_type": azure_native.iotoperations.EndpointType.MQTT,
        "mqtt_settings": {
            "host": "aio-broker:18883",
            # Service account token authentication for the "aio-internal" audience.
            # NOTE(review): "Kubernetes" is a raw string rather than an enum
            # member — confirm the target API version accepts it.
            "authentication": {
                "method": "Kubernetes",
                "service_account_token_settings": {
                    "audience": "aio-internal",
                },
            },
            # TLS is enabled; the trusted CA bundle comes from the named ConfigMap.
            "tls": {
                "mode": azure_native.iotoperations.OperationalMode.ENABLED,
                "trusted_ca_certificate_config_map_ref": "aio-ca-trust-bundle-test-only",
            },
        },
    },
)
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main registers an MQTT dataflow endpoint that targets the built-in broker
// at aio-broker:18883.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Custom location that hosts the Azure IoT Operations instance.
		customLocation := &iotoperations.ExtendedLocationArgs{
			Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
			Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
		}

		// Service account token authentication for the "aio-internal" audience.
		// NOTE(review): "Kubernetes" is a raw string rather than an SDK
		// constant — confirm the target API version accepts it.
		brokerAuth := &iotoperations.DataflowEndpointMqttAuthenticationArgs{
			Method: pulumi.String("Kubernetes"),
			ServiceAccountTokenSettings: &iotoperations.DataflowEndpointAuthenticationServiceAccountTokenArgs{
				Audience: pulumi.String("aio-internal"),
			},
		}

		// TLS is enabled; the trusted CA bundle comes from the named ConfigMap.
		brokerTLS := &iotoperations.TlsPropertiesArgs{
			Mode:                             pulumi.String(iotoperations.OperationalModeEnabled),
			TrustedCaCertificateConfigMapRef: pulumi.String("aio-ca-trust-bundle-test-only"),
		}

		mqttSettings := &iotoperations.DataflowEndpointMqttArgs{
			Authentication: brokerAuth,
			Host:           pulumi.String("aio-broker:18883"),
			Tls:            brokerTLS,
		}

		_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
			DataflowEndpointName: pulumi.String("aio-builtin-broker-endpoint"),
			ExtendedLocation:     customLocation,
			InstanceName:         pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowEndpointPropertiesArgs{
				EndpointType: pulumi.String(iotoperations.EndpointTypeMqtt),
				MqttSettings: mqttSettings,
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		// err is nil on success, so it can be returned directly.
		return err
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;
// Registers an MQTT dataflow endpoint that targets the built-in broker at aio-broker:18883.
return await Deployment.RunAsync(() =>
{
    // Custom location that hosts the Azure IoT Operations instance.
    var customLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
    {
        Name = "qmbrfwcpwwhggszhrdjv",
        Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
    };

    // Service account token authentication for the "aio-internal" audience.
    // NOTE(review): "Kubernetes" is a raw string rather than an enum member —
    // confirm the target API version accepts it.
    var brokerAuthentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointMqttAuthenticationArgs
    {
        Method = "Kubernetes",
        ServiceAccountTokenSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointAuthenticationServiceAccountTokenArgs
        {
            Audience = "aio-internal",
        },
    };

    // TLS is enabled; the trusted CA bundle comes from the named ConfigMap.
    var brokerTls = new AzureNative.IoTOperations.Inputs.TlsPropertiesArgs
    {
        Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
        TrustedCaCertificateConfigMapRef = "aio-ca-trust-bundle-test-only",
    };

    var mqttSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointMqttArgs
    {
        Authentication = brokerAuthentication,
        Host = "aio-broker:18883",
        Tls = brokerTls,
    };

    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "aio-builtin-broker-endpoint",
        ExtendedLocation = customLocation,
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            EndpointType = AzureNative.IoTOperations.EndpointType.Mqtt,
            MqttSettings = mqttSettings,
        },
        ResourceGroupName = "rgiotoperations",
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointMqttArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointMqttAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationServiceAccountTokenArgs;
import com.pulumi.azurenative.iotoperations.inputs.TlsPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Pulumi program that registers an MQTT dataflow endpoint targeting the
 * built-in broker at aio-broker:18883.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares the dataflow endpoint resource.
     *
     * @param ctx the Pulumi program context supplied by {@code Pulumi.run}
     */
    public static void stack(Context ctx) {
        // Custom location that hosts the Azure IoT Operations instance.
        ExtendedLocationArgs customLocation = ExtendedLocationArgs.builder()
            .name("qmbrfwcpwwhggszhrdjv")
            .type("CustomLocation")
            .build();

        // Service account token authentication for the "aio-internal" audience.
        DataflowEndpointMqttAuthenticationArgs brokerAuthentication = DataflowEndpointMqttAuthenticationArgs.builder()
            .method("Kubernetes")
            .serviceAccountTokenSettings(DataflowEndpointAuthenticationServiceAccountTokenArgs.builder()
                .audience("aio-internal")
                .build())
            .build();

        // TLS is enabled; the trusted CA bundle comes from the named ConfigMap.
        TlsPropertiesArgs brokerTls = TlsPropertiesArgs.builder()
            .mode("Enabled")
            .trustedCaCertificateConfigMapRef("aio-ca-trust-bundle-test-only")
            .build();

        DataflowEndpointMqttArgs mqttSettings = DataflowEndpointMqttArgs.builder()
            .authentication(brokerAuthentication)
            .host("aio-broker:18883")
            .tls(brokerTls)
            .build();

        DataflowEndpointPropertiesArgs endpointProperties = DataflowEndpointPropertiesArgs.builder()
            .endpointType("Mqtt")
            .mqttSettings(mqttSettings)
            .build();

        var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
            .dataflowEndpointName("aio-builtin-broker-endpoint")
            .extendedLocation(customLocation)
            .instanceName("resource-name123")
            .properties(endpointProperties)
            .resourceGroupName("rgiotoperations")
            .build());
    }
}
# MQTT dataflow endpoint that targets the built-in broker at aio-broker:18883.
resources:
  dataflowEndpoint:
    type: azure-native:iotoperations:DataflowEndpoint
    properties:
      dataflowEndpointName: aio-builtin-broker-endpoint
      # Custom location that hosts the Azure IoT Operations instance.
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        endpointType: Mqtt
        mqttSettings:
          # Service account token authentication for the "aio-internal" audience.
          authentication:
            method: Kubernetes
            serviceAccountTokenSettings:
              audience: aio-internal
          host: aio-broker:18883
          # TLS is enabled; the trusted CA bundle comes from the named ConfigMap.
          tls:
            mode: Enabled
            trustedCaCertificateConfigMapRef: aio-ca-trust-bundle-test-only
      resourceGroupName: rgiotoperations
The mqttSettings block configures MQTT connectivity. The authentication method uses Kubernetes service account tokens with the audience set to “aio-internal” for cluster-internal communication. The host property points to the built-in broker service at “aio-broker:18883”, and TLS is enabled with a trusted CA certificate from a ConfigMap.
Connect to external MQTT brokers with X.509 authentication
Factory gateways and edge deployments often need to bridge messages to external MQTT brokers using certificate-based authentication.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";
// Custom location that hosts the Azure IoT Operations instance.
const customLocation = {
    name: "qmbrfwcpwwhggszhrdjv",
    type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
};

// X.509 client-certificate authentication; the certificate is referenced by secret name.
const certificateAuthentication = {
    method: azure_native.iotoperations.MqttAuthMethod.X509Certificate,
    x509CertificateSettings: {
        secretRef: "example-secret",
    },
};

// TLS is disabled in this sample.
// NOTE(review): confirm Disabled is intended alongside X.509 authentication.
const brokerTls = {
    mode: azure_native.iotoperations.OperationalMode.Disabled,
};

// MQTT dataflow endpoint for an external broker reached over WebSockets.
const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
    resourceGroupName: "rgiotoperations",
    instanceName: "resource-name123",
    dataflowEndpointName: "generic-mqtt-broker-endpoint",
    extendedLocation: customLocation,
    properties: {
        endpointType: azure_native.iotoperations.EndpointType.Mqtt,
        mqttSettings: {
            host: "example.broker.local:1883",
            protocol: azure_native.iotoperations.BrokerProtocolType.WebSockets,
            authentication: certificateAuthentication,
            tls: brokerTls,
            // Session and delivery tuning.
            clientIdPrefix: "factory-gateway",
            keepAliveSeconds: 60,
            sessionExpirySeconds: 3600,
            maxInflightMessages: 100,
            qos: 1,
            retain: azure_native.iotoperations.MqttRetainType.Keep,
        },
    },
});
import pulumi
import pulumi_azure_native as azure_native
# MQTT dataflow endpoint for an external broker reached over WebSockets.
dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint(
    "dataflowEndpoint",
    resource_group_name="rgiotoperations",
    instance_name="resource-name123",
    dataflow_endpoint_name="generic-mqtt-broker-endpoint",
    # Custom location that hosts the Azure IoT Operations instance.
    extended_location={
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
        "name": "qmbrfwcpwwhggszhrdjv",
    },
    properties={
        "endpoint_type": azure_native.iotoperations.EndpointType.MQTT,
        "mqtt_settings": {
            "host": "example.broker.local:1883",
            "protocol": azure_native.iotoperations.BrokerProtocolType.WEB_SOCKETS,
            # X.509 client-certificate authentication; the certificate is
            # referenced by secret name.
            "authentication": {
                "method": azure_native.iotoperations.MqttAuthMethod.X509_CERTIFICATE,
                "x509_certificate_settings": {
                    "secret_ref": "example-secret",
                },
            },
            # TLS is disabled in this sample.
            # NOTE(review): confirm DISABLED is intended alongside X.509 authentication.
            "tls": {
                "mode": azure_native.iotoperations.OperationalMode.DISABLED,
            },
            # Session and delivery tuning.
            "client_id_prefix": "factory-gateway",
            "keep_alive_seconds": 60,
            "session_expiry_seconds": 3600,
            "max_inflight_messages": 100,
            "qos": 1,
            "retain": azure_native.iotoperations.MqttRetainType.KEEP,
        },
    },
)
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main registers an MQTT dataflow endpoint for an external broker reached
// over WebSockets, authenticating with an X.509 client certificate.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Custom location that hosts the Azure IoT Operations instance.
		customLocation := &iotoperations.ExtendedLocationArgs{
			Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
			Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
		}

		// The client certificate is referenced by secret name.
		certAuth := &iotoperations.DataflowEndpointMqttAuthenticationArgs{
			Method: pulumi.String(iotoperations.MqttAuthMethodX509Certificate),
			X509CertificateSettings: &iotoperations.DataflowEndpointAuthenticationX509Args{
				SecretRef: pulumi.String("example-secret"),
			},
		}

		// TLS is disabled in this sample.
		// NOTE(review): confirm Disabled is intended alongside X.509 authentication.
		brokerTLS := &iotoperations.TlsPropertiesArgs{
			Mode: pulumi.String(iotoperations.OperationalModeDisabled),
		}

		mqttSettings := &iotoperations.DataflowEndpointMqttArgs{
			Authentication:       certAuth,
			ClientIdPrefix:       pulumi.String("factory-gateway"),
			Host:                 pulumi.String("example.broker.local:1883"),
			KeepAliveSeconds:     pulumi.Int(60),
			MaxInflightMessages:  pulumi.Int(100),
			Protocol:             pulumi.String(iotoperations.BrokerProtocolTypeWebSockets),
			Qos:                  pulumi.Int(1),
			Retain:               pulumi.String(iotoperations.MqttRetainTypeKeep),
			SessionExpirySeconds: pulumi.Int(3600),
			Tls:                  brokerTLS,
		}

		_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
			DataflowEndpointName: pulumi.String("generic-mqtt-broker-endpoint"),
			ExtendedLocation:     customLocation,
			InstanceName:         pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowEndpointPropertiesArgs{
				EndpointType: pulumi.String(iotoperations.EndpointTypeMqtt),
				MqttSettings: mqttSettings,
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		// err is nil on success, so it can be returned directly.
		return err
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;
// Registers an MQTT dataflow endpoint for an external broker reached over
// WebSockets, authenticating with an X.509 client certificate.
return await Deployment.RunAsync(() =>
{
    // Custom location that hosts the Azure IoT Operations instance.
    var customLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
    {
        Name = "qmbrfwcpwwhggszhrdjv",
        Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
    };

    // The client certificate is referenced by secret name.
    var certificateAuthentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointMqttAuthenticationArgs
    {
        Method = AzureNative.IoTOperations.MqttAuthMethod.X509Certificate,
        X509CertificateSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointAuthenticationX509Args
        {
            SecretRef = "example-secret",
        },
    };

    // TLS is disabled in this sample.
    // NOTE(review): confirm Disabled is intended alongside X.509 authentication.
    var brokerTls = new AzureNative.IoTOperations.Inputs.TlsPropertiesArgs
    {
        Mode = AzureNative.IoTOperations.OperationalMode.Disabled,
    };

    var mqttSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointMqttArgs
    {
        Authentication = certificateAuthentication,
        ClientIdPrefix = "factory-gateway",
        Host = "example.broker.local:1883",
        KeepAliveSeconds = 60,
        MaxInflightMessages = 100,
        Protocol = AzureNative.IoTOperations.BrokerProtocolType.WebSockets,
        Qos = 1,
        Retain = AzureNative.IoTOperations.MqttRetainType.Keep,
        SessionExpirySeconds = 3600,
        Tls = brokerTls,
    };

    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "generic-mqtt-broker-endpoint",
        ExtendedLocation = customLocation,
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            EndpointType = AzureNative.IoTOperations.EndpointType.Mqtt,
            MqttSettings = mqttSettings,
        },
        ResourceGroupName = "rgiotoperations",
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointMqttArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointMqttAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationX509Args;
import com.pulumi.azurenative.iotoperations.inputs.TlsPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Pulumi program that registers an MQTT dataflow endpoint for an external
 * broker reached over WebSockets, authenticating with an X.509 client certificate.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares the dataflow endpoint resource.
     *
     * @param ctx the Pulumi program context supplied by {@code Pulumi.run}
     */
    public static void stack(Context ctx) {
        // Custom location that hosts the Azure IoT Operations instance.
        ExtendedLocationArgs customLocation = ExtendedLocationArgs.builder()
            .name("qmbrfwcpwwhggszhrdjv")
            .type("CustomLocation")
            .build();

        // The client certificate is referenced by secret name.
        DataflowEndpointMqttAuthenticationArgs certificateAuthentication = DataflowEndpointMqttAuthenticationArgs.builder()
            .method("X509Certificate")
            .x509CertificateSettings(DataflowEndpointAuthenticationX509Args.builder()
                .secretRef("example-secret")
                .build())
            .build();

        // TLS is disabled in this sample.
        // NOTE(review): confirm Disabled is intended alongside X.509 authentication.
        TlsPropertiesArgs brokerTls = TlsPropertiesArgs.builder()
            .mode("Disabled")
            .build();

        DataflowEndpointMqttArgs mqttSettings = DataflowEndpointMqttArgs.builder()
            .authentication(certificateAuthentication)
            .clientIdPrefix("factory-gateway")
            .host("example.broker.local:1883")
            .keepAliveSeconds(60)
            .maxInflightMessages(100)
            .protocol("WebSockets")
            .qos(1)
            .retain("Keep")
            .sessionExpirySeconds(3600)
            .tls(brokerTls)
            .build();

        DataflowEndpointPropertiesArgs endpointProperties = DataflowEndpointPropertiesArgs.builder()
            .endpointType("Mqtt")
            .mqttSettings(mqttSettings)
            .build();

        var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
            .dataflowEndpointName("generic-mqtt-broker-endpoint")
            .extendedLocation(customLocation)
            .instanceName("resource-name123")
            .properties(endpointProperties)
            .resourceGroupName("rgiotoperations")
            .build());
    }
}
# MQTT dataflow endpoint for an external broker reached over WebSockets.
resources:
  dataflowEndpoint:
    type: azure-native:iotoperations:DataflowEndpoint
    properties:
      dataflowEndpointName: generic-mqtt-broker-endpoint
      # Custom location that hosts the Azure IoT Operations instance.
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        endpointType: Mqtt
        mqttSettings:
          # X.509 client-certificate authentication; the certificate is
          # referenced by secret name.
          authentication:
            method: X509Certificate
            x509CertificateSettings:
              secretRef: example-secret
          clientIdPrefix: factory-gateway
          host: example.broker.local:1883
          keepAliveSeconds: 60
          maxInflightMessages: 100
          protocol: WebSockets
          qos: 1
          retain: Keep
          sessionExpirySeconds: 3600
          # TLS is disabled in this sample.
          # NOTE(review): confirm Disabled is intended alongside X.509 authentication.
          tls:
            mode: Disabled
      resourceGroupName: rgiotoperations
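The authentication method switches to X.509 certificates, with the secretRef pointing to a Kubernetes secret containing the client certificate. The protocol property selects WebSockets transport, and connection parameters like keepAliveSeconds, maxInflightMessages, and qos control reliability and throughput. The retain property, set to Keep here, preserves the MQTT retain flag on the messages the endpoint forwards, so the remote broker can retain them for new subscribers. Note that this example sets tls.mode to Disabled; X.509 client certificates are presented during the TLS handshake, so enable TLS when connecting to a real broker.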
Stream to Azure Event Hubs via Kafka protocol
Cloud analytics pipelines commonly ingest IoT telemetry through Event Hubs, which exposes a Kafka-compatible endpoint.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";
// Custom location that hosts the Azure IoT Operations instance.
const customLocation = {
    name: "qmbrfwcpwwhggszhrdjv",
    type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
};

// System-assigned managed identity; the settings object is intentionally empty.
const managedIdentityAuthentication = {
    method: azure_native.iotoperations.KafkaAuthMethod.SystemAssignedManagedIdentity,
    systemAssignedManagedIdentitySettings: {},
};

// Kafka dataflow endpoint pointing at an Event Hubs namespace, with TLS enabled.
const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
    resourceGroupName: "rgiotoperations",
    instanceName: "resource-name123",
    dataflowEndpointName: "event-hub-endpoint",
    extendedLocation: customLocation,
    properties: {
        endpointType: azure_native.iotoperations.EndpointType.Kafka,
        kafkaSettings: {
            host: "example.servicebus.windows.net:9093",
            consumerGroupId: "aiodataflows",
            authentication: managedIdentityAuthentication,
            tls: {
                mode: azure_native.iotoperations.OperationalMode.Enabled,
            },
        },
    },
});
import pulumi
import pulumi_azure_native as azure_native
# Kafka dataflow endpoint pointing at an Event Hubs namespace, with TLS enabled.
dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint(
    "dataflowEndpoint",
    resource_group_name="rgiotoperations",
    instance_name="resource-name123",
    dataflow_endpoint_name="event-hub-endpoint",
    # Custom location that hosts the Azure IoT Operations instance.
    extended_location={
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
        "name": "qmbrfwcpwwhggszhrdjv",
    },
    properties={
        "endpoint_type": azure_native.iotoperations.EndpointType.KAFKA,
        "kafka_settings": {
            "host": "example.servicebus.windows.net:9093",
            "consumer_group_id": "aiodataflows",
            # System-assigned managed identity; the settings dict is
            # intentionally empty.
            "authentication": {
                "method": azure_native.iotoperations.KafkaAuthMethod.SYSTEM_ASSIGNED_MANAGED_IDENTITY,
                "system_assigned_managed_identity_settings": {},
            },
            "tls": {
                "mode": azure_native.iotoperations.OperationalMode.ENABLED,
            },
        },
    },
)
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main registers a Kafka dataflow endpoint pointing at an Event Hubs
// namespace, authenticating with the system-assigned managed identity.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Custom location that hosts the Azure IoT Operations instance.
		customLocation := &iotoperations.ExtendedLocationArgs{
			Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
			Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
		}

		// The managed identity settings struct is intentionally empty.
		identityAuth := &iotoperations.DataflowEndpointKafkaAuthenticationArgs{
			Method:                                pulumi.String(iotoperations.KafkaAuthMethodSystemAssignedManagedIdentity),
			SystemAssignedManagedIdentitySettings: &iotoperations.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs{},
		}

		kafkaSettings := &iotoperations.DataflowEndpointKafkaArgs{
			Authentication:  identityAuth,
			ConsumerGroupId: pulumi.String("aiodataflows"),
			Host:            pulumi.String("example.servicebus.windows.net:9093"),
			Tls: &iotoperations.TlsPropertiesArgs{
				Mode: pulumi.String(iotoperations.OperationalModeEnabled),
			},
		}

		_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
			DataflowEndpointName: pulumi.String("event-hub-endpoint"),
			ExtendedLocation:     customLocation,
			InstanceName:         pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowEndpointPropertiesArgs{
				EndpointType:  pulumi.String(iotoperations.EndpointTypeKafka),
				KafkaSettings: kafkaSettings,
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		// err is nil on success, so it can be returned directly.
		return err
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;
// Registers a Kafka dataflow endpoint pointing at an Event Hubs namespace,
// authenticating with the system-assigned managed identity.
return await Deployment.RunAsync(() =>
{
    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "event-hub-endpoint",
        // Custom location that hosts the Azure IoT Operations instance.
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            EndpointType = AzureNative.IoTOperations.EndpointType.Kafka,
            KafkaSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointKafkaArgs
            {
                Authentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointKafkaAuthenticationArgs
                {
                    Method = AzureNative.IoTOperations.KafkaAuthMethod.SystemAssignedManagedIdentity,
                    // Send an empty settings object rather than null: null omits the
                    // property entirely, whereas the other language versions of this
                    // example pass an explicit empty object.
                    SystemAssignedManagedIdentitySettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs(),
                },
                ConsumerGroupId = "aiodataflows",
                Host = "example.servicebus.windows.net:9093",
                Tls = new AzureNative.IoTOperations.Inputs.TlsPropertiesArgs
                {
                    Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
                },
            },
        },
        ResourceGroupName = "rgiotoperations",
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointKafkaArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointKafkaAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs;
import com.pulumi.azurenative.iotoperations.inputs.TlsPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Pulumi program that registers a Kafka dataflow endpoint pointing at an
 * Event Hubs namespace, authenticating with the system-assigned managed identity.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares the dataflow endpoint resource.
     *
     * @param ctx the Pulumi program context supplied by {@code Pulumi.run}
     */
    public static void stack(Context ctx) {
        // Custom location that hosts the Azure IoT Operations instance.
        ExtendedLocationArgs customLocation = ExtendedLocationArgs.builder()
            .name("qmbrfwcpwwhggszhrdjv")
            .type("CustomLocation")
            .build();

        // The managed identity settings object is intentionally built empty.
        DataflowEndpointKafkaAuthenticationArgs identityAuthentication = DataflowEndpointKafkaAuthenticationArgs.builder()
            .method("SystemAssignedManagedIdentity")
            .systemAssignedManagedIdentitySettings(DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs.builder()
                .build())
            .build();

        TlsPropertiesArgs kafkaTls = TlsPropertiesArgs.builder()
            .mode("Enabled")
            .build();

        DataflowEndpointKafkaArgs kafkaSettings = DataflowEndpointKafkaArgs.builder()
            .authentication(identityAuthentication)
            .consumerGroupId("aiodataflows")
            .host("example.servicebus.windows.net:9093")
            .tls(kafkaTls)
            .build();

        DataflowEndpointPropertiesArgs endpointProperties = DataflowEndpointPropertiesArgs.builder()
            .endpointType("Kafka")
            .kafkaSettings(kafkaSettings)
            .build();

        var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
            .dataflowEndpointName("event-hub-endpoint")
            .extendedLocation(customLocation)
            .instanceName("resource-name123")
            .properties(endpointProperties)
            .resourceGroupName("rgiotoperations")
            .build());
    }
}
# Kafka dataflow endpoint pointing at an Event Hubs namespace, with TLS enabled.
resources:
  dataflowEndpoint:
    type: azure-native:iotoperations:DataflowEndpoint
    properties:
      dataflowEndpointName: event-hub-endpoint
      # Custom location that hosts the Azure IoT Operations instance.
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        endpointType: Kafka
        kafkaSettings:
          # System-assigned managed identity; the settings mapping is
          # intentionally empty.
          authentication:
            method: SystemAssignedManagedIdentity
            systemAssignedManagedIdentitySettings: {}
          consumerGroupId: aiodataflows
          host: example.servicebus.windows.net:9093
          tls:
            mode: Enabled
      resourceGroupName: rgiotoperations
Event Hubs is reached through the kafkaSettings block, authenticating with the system-assigned managed identity (method SystemAssignedManagedIdentity with an empty systemAssignedManagedIdentitySettings object). The consumerGroupId identifies the consumer group for reading messages, and the host property points to the Event Hubs namespace’s Kafka endpoint on port 9093. TLS is enabled for secure communication.
Send to generic Kafka clusters with SASL authentication
On-premises or third-party Kafka deployments require SASL authentication and often need batching and compression to optimize network usage.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";
// Custom location that hosts the Azure IoT Operations instance.
const customLocation = {
    name: "qmbrfwcpwwhggszhrdjv",
    type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
};

// SASL (Plain) authentication; credentials are referenced by secret name.
const saslAuthentication = {
    method: azure_native.iotoperations.KafkaAuthMethod.Sasl,
    saslSettings: {
        saslType: azure_native.iotoperations.DataflowEndpointAuthenticationSaslType.Plain,
        secretRef: "my-secret",
    },
};

// Batching is enabled with latency, byte and message-count thresholds.
const producerBatching = {
    mode: azure_native.iotoperations.OperationalMode.Enabled,
    latencyMs: 5,
    maxBytes: 1000000,
    maxMessages: 100000,
};

// TLS is enabled; the trusted CA bundle is read from the named ConfigMap.
const kafkaTls = {
    mode: azure_native.iotoperations.OperationalMode.Enabled,
    trustedCaCertificateConfigMapRef: "ca-certificates",
};

// Kafka dataflow endpoint for a generic cluster at example.kafka.local:9093.
const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
    resourceGroupName: "rgiotoperations",
    instanceName: "resource-name123",
    dataflowEndpointName: "generic-kafka-endpoint",
    extendedLocation: customLocation,
    properties: {
        endpointType: azure_native.iotoperations.EndpointType.Kafka,
        kafkaSettings: {
            host: "example.kafka.local:9093",
            consumerGroupId: "dataflows",
            authentication: saslAuthentication,
            tls: kafkaTls,
            batching: producerBatching,
            compression: azure_native.iotoperations.DataflowEndpointKafkaCompression.Gzip,
            kafkaAcks: azure_native.iotoperations.DataflowEndpointKafkaAcks.All,
            partitionStrategy: azure_native.iotoperations.DataflowEndpointKafkaPartitionStrategy.Default,
            cloudEventAttributes: azure_native.iotoperations.CloudEventAttributeType.Propagate,
            copyMqttProperties: azure_native.iotoperations.OperationalMode.Enabled,
        },
    },
});
import pulumi
import pulumi_azure_native as azure_native
# Kafka dataflow endpoint for a generic cluster at example.kafka.local:9093.
dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint(
    "dataflowEndpoint",
    resource_group_name="rgiotoperations",
    instance_name="resource-name123",
    dataflow_endpoint_name="generic-kafka-endpoint",
    # Custom location that hosts the Azure IoT Operations instance.
    extended_location={
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
        "name": "qmbrfwcpwwhggszhrdjv",
    },
    properties={
        "endpoint_type": azure_native.iotoperations.EndpointType.KAFKA,
        "kafka_settings": {
            "host": "example.kafka.local:9093",
            "consumer_group_id": "dataflows",
            # SASL (Plain) authentication; credentials are referenced by secret name.
            "authentication": {
                "method": azure_native.iotoperations.KafkaAuthMethod.SASL,
                "sasl_settings": {
                    "sasl_type": azure_native.iotoperations.DataflowEndpointAuthenticationSaslType.PLAIN,
                    "secret_ref": "my-secret",
                },
            },
            # TLS is enabled; the trusted CA bundle comes from the named ConfigMap.
            "tls": {
                "mode": azure_native.iotoperations.OperationalMode.ENABLED,
                "trusted_ca_certificate_config_map_ref": "ca-certificates",
            },
            # Batching is enabled with latency, byte and message-count thresholds.
            "batching": {
                "mode": azure_native.iotoperations.OperationalMode.ENABLED,
                "latency_ms": 5,
                "max_bytes": 1000000,
                "max_messages": 100000,
            },
            "compression": azure_native.iotoperations.DataflowEndpointKafkaCompression.GZIP,
            "kafka_acks": azure_native.iotoperations.DataflowEndpointKafkaAcks.ALL,
            "partition_strategy": azure_native.iotoperations.DataflowEndpointKafkaPartitionStrategy.DEFAULT,
            "cloud_event_attributes": azure_native.iotoperations.CloudEventAttributeType.PROPAGATE,
            "copy_mqtt_properties": azure_native.iotoperations.OperationalMode.ENABLED,
        },
    },
)
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main registers a Kafka dataflow endpoint for a generic cluster at
// example.kafka.local:9093 using SASL (Plain) authentication.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Custom location that hosts the Azure IoT Operations instance.
		customLocation := &iotoperations.ExtendedLocationArgs{
			Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
			Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
		}

		// Credentials are referenced by secret name.
		saslAuth := &iotoperations.DataflowEndpointKafkaAuthenticationArgs{
			Method: pulumi.String(iotoperations.KafkaAuthMethodSasl),
			SaslSettings: &iotoperations.DataflowEndpointAuthenticationSaslArgs{
				SaslType:  pulumi.String(iotoperations.DataflowEndpointAuthenticationSaslTypePlain),
				SecretRef: pulumi.String("my-secret"),
			},
		}

		// Batching is enabled with latency, byte and message-count thresholds.
		batching := &iotoperations.DataflowEndpointKafkaBatchingArgs{
			LatencyMs:   pulumi.Int(5),
			MaxBytes:    pulumi.Int(1000000),
			MaxMessages: pulumi.Int(100000),
			Mode:        pulumi.String(iotoperations.OperationalModeEnabled),
		}

		// TLS is enabled; the trusted CA bundle comes from the named ConfigMap.
		kafkaTLS := &iotoperations.TlsPropertiesArgs{
			Mode:                             pulumi.String(iotoperations.OperationalModeEnabled),
			TrustedCaCertificateConfigMapRef: pulumi.String("ca-certificates"),
		}

		kafkaSettings := &iotoperations.DataflowEndpointKafkaArgs{
			Authentication:       saslAuth,
			Batching:             batching,
			CloudEventAttributes: pulumi.String(iotoperations.CloudEventAttributeTypePropagate),
			Compression:          pulumi.String(iotoperations.DataflowEndpointKafkaCompressionGzip),
			ConsumerGroupId:      pulumi.String("dataflows"),
			CopyMqttProperties:   pulumi.String(iotoperations.OperationalModeEnabled),
			Host:                 pulumi.String("example.kafka.local:9093"),
			KafkaAcks:            pulumi.String(iotoperations.DataflowEndpointKafkaAcksAll),
			PartitionStrategy:    pulumi.String(iotoperations.DataflowEndpointKafkaPartitionStrategyDefault),
			Tls:                  kafkaTLS,
		}

		_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
			DataflowEndpointName: pulumi.String("generic-kafka-endpoint"),
			ExtendedLocation:     customLocation,
			InstanceName:         pulumi.String("resource-name123"),
			Properties: &iotoperations.DataflowEndpointPropertiesArgs{
				EndpointType:  pulumi.String(iotoperations.EndpointTypeKafka),
				KafkaSettings: kafkaSettings,
			},
			ResourceGroupName: pulumi.String("rgiotoperations"),
		})
		// err is nil on success, so it can be returned directly.
		return err
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;
// Registers a Kafka dataflow endpoint for a generic cluster at
// example.kafka.local:9093 using SASL (Plain) authentication.
return await Deployment.RunAsync(() =>
{
    // Custom location that hosts the Azure IoT Operations instance.
    var customLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
    {
        Name = "qmbrfwcpwwhggszhrdjv",
        Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
    };

    // Credentials are referenced by secret name.
    var saslAuthentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointKafkaAuthenticationArgs
    {
        Method = AzureNative.IoTOperations.KafkaAuthMethod.Sasl,
        SaslSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointAuthenticationSaslArgs
        {
            SaslType = AzureNative.IoTOperations.DataflowEndpointAuthenticationSaslType.Plain,
            SecretRef = "my-secret",
        },
    };

    // Batching is enabled with latency, byte and message-count thresholds.
    var producerBatching = new AzureNative.IoTOperations.Inputs.DataflowEndpointKafkaBatchingArgs
    {
        LatencyMs = 5,
        MaxBytes = 1000000,
        MaxMessages = 100000,
        Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
    };

    // TLS is enabled; the trusted CA bundle comes from the named ConfigMap.
    var kafkaTls = new AzureNative.IoTOperations.Inputs.TlsPropertiesArgs
    {
        Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
        TrustedCaCertificateConfigMapRef = "ca-certificates",
    };

    var kafkaSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointKafkaArgs
    {
        Authentication = saslAuthentication,
        Batching = producerBatching,
        CloudEventAttributes = AzureNative.IoTOperations.CloudEventAttributeType.Propagate,
        Compression = AzureNative.IoTOperations.DataflowEndpointKafkaCompression.Gzip,
        ConsumerGroupId = "dataflows",
        CopyMqttProperties = AzureNative.IoTOperations.OperationalMode.Enabled,
        Host = "example.kafka.local:9093",
        KafkaAcks = AzureNative.IoTOperations.DataflowEndpointKafkaAcks.All,
        PartitionStrategy = AzureNative.IoTOperations.DataflowEndpointKafkaPartitionStrategy.Default,
        Tls = kafkaTls,
    };

    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "generic-kafka-endpoint",
        ExtendedLocation = customLocation,
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            EndpointType = AzureNative.IoTOperations.EndpointType.Kafka,
            KafkaSettings = kafkaSettings,
        },
        ResourceGroupName = "rgiotoperations",
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointKafkaArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointKafkaAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationSaslArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointKafkaBatchingArgs;
import com.pulumi.azurenative.iotoperations.inputs.TlsPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Pulumi program that registers a Kafka dataflow endpoint for a generic
 * cluster at example.kafka.local:9093 using SASL (Plain) authentication.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares the dataflow endpoint resource.
     *
     * @param ctx the Pulumi program context supplied by {@code Pulumi.run}
     */
    public static void stack(Context ctx) {
        // Custom location that hosts the Azure IoT Operations instance.
        ExtendedLocationArgs customLocation = ExtendedLocationArgs.builder()
            .name("qmbrfwcpwwhggszhrdjv")
            .type("CustomLocation")
            .build();

        // Credentials are referenced by secret name.
        DataflowEndpointKafkaAuthenticationArgs saslAuthentication = DataflowEndpointKafkaAuthenticationArgs.builder()
            .method("Sasl")
            .saslSettings(DataflowEndpointAuthenticationSaslArgs.builder()
                .saslType("Plain")
                .secretRef("my-secret")
                .build())
            .build();

        // Batching is enabled with latency, byte and message-count thresholds.
        DataflowEndpointKafkaBatchingArgs producerBatching = DataflowEndpointKafkaBatchingArgs.builder()
            .latencyMs(5)
            .maxBytes(1000000)
            .maxMessages(100000)
            .mode("Enabled")
            .build();

        // TLS is enabled; the trusted CA bundle comes from the named ConfigMap.
        TlsPropertiesArgs kafkaTls = TlsPropertiesArgs.builder()
            .mode("Enabled")
            .trustedCaCertificateConfigMapRef("ca-certificates")
            .build();

        DataflowEndpointKafkaArgs kafkaSettings = DataflowEndpointKafkaArgs.builder()
            .authentication(saslAuthentication)
            .batching(producerBatching)
            .cloudEventAttributes("Propagate")
            .compression("Gzip")
            .consumerGroupId("dataflows")
            .copyMqttProperties("Enabled")
            .host("example.kafka.local:9093")
            .kafkaAcks("All")
            .partitionStrategy("Default")
            .tls(kafkaTls)
            .build();

        DataflowEndpointPropertiesArgs endpointProperties = DataflowEndpointPropertiesArgs.builder()
            .endpointType("Kafka")
            .kafkaSettings(kafkaSettings)
            .build();

        var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
            .dataflowEndpointName("generic-kafka-endpoint")
            .extendedLocation(customLocation)
            .instanceName("resource-name123")
            .properties(endpointProperties)
            .resourceGroupName("rgiotoperations")
            .build());
    }
}
# Pulumi YAML program declaring a generic Kafka dataflow endpoint.
resources:
  dataflowEndpoint:
    type: azure-native:iotoperations:DataflowEndpoint
    properties:
      dataflowEndpointName: generic-kafka-endpoint
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        endpointType: Kafka
        kafkaSettings:
          # SASL/PLAIN authentication; secretRef names the secret holding the credentials.
          authentication:
            method: Sasl
            saslSettings:
              saslType: Plain
              secretRef: my-secret
          batching:
            latencyMs: 5
            # Written as a plain integer (not 1e+06) so it matches the value
            # used by the other language examples of this same endpoint.
            maxBytes: 1000000
            maxMessages: 100000
            mode: Enabled
          cloudEventAttributes: Propagate
          compression: Gzip
          consumerGroupId: dataflows
          copyMqttProperties: Enabled
          host: example.kafka.local:9093
          kafkaAcks: All
          partitionStrategy: Default
          tls:
            mode: Enabled
            trustedCaCertificateConfigMapRef: ca-certificates
      resourceGroupName: rgiotoperations
The authentication method uses SASL/PLAIN with credentials stored in a Kubernetes secret. The batching block controls message aggregation with latencyMs, maxBytes, and maxMessages thresholds. The compression property enables Gzip compression to reduce network bandwidth. The kafkaAcks property set to “All” ensures messages are replicated before acknowledgment.
Write to Azure Data Lake Storage Gen2
Data lake architectures store raw telemetry in ADLS Gen2 for long-term retention and batch analytics.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";
const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
dataflowEndpointName: "adlsv2-endpoint",
extendedLocation: {
name: "qmbrfwcpwwhggszhrdjv",
type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
},
instanceName: "resource-name123",
properties: {
dataLakeStorageSettings: {
authentication: {
accessTokenSettings: {
secretRef: "my-secret",
},
method: azure_native.iotoperations.DataLakeStorageAuthMethod.AccessToken,
},
host: "example.blob.core.windows.net",
},
endpointType: azure_native.iotoperations.EndpointType.DataLakeStorage,
},
resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native
dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
dataflow_endpoint_name="adlsv2-endpoint",
extended_location={
"name": "qmbrfwcpwwhggszhrdjv",
"type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
},
instance_name="resource-name123",
properties={
"data_lake_storage_settings": {
"authentication": {
"access_token_settings": {
"secret_ref": "my-secret",
},
"method": azure_native.iotoperations.DataLakeStorageAuthMethod.ACCESS_TOKEN,
},
"host": "example.blob.core.windows.net",
},
"endpoint_type": azure_native.iotoperations.EndpointType.DATA_LAKE_STORAGE,
},
resource_group_name="rgiotoperations")
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
DataflowEndpointName: pulumi.String("adlsv2-endpoint"),
ExtendedLocation: &iotoperations.ExtendedLocationArgs{
Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
},
InstanceName: pulumi.String("resource-name123"),
Properties: &iotoperations.DataflowEndpointPropertiesArgs{
DataLakeStorageSettings: &iotoperations.DataflowEndpointDataLakeStorageArgs{
Authentication: &iotoperations.DataflowEndpointDataLakeStorageAuthenticationArgs{
AccessTokenSettings: &iotoperations.DataflowEndpointAuthenticationAccessTokenArgs{
SecretRef: pulumi.String("my-secret"),
},
Method: pulumi.String(iotoperations.DataLakeStorageAuthMethodAccessToken),
},
Host: pulumi.String("example.blob.core.windows.net"),
},
EndpointType: pulumi.String(iotoperations.EndpointTypeDataLakeStorage),
},
ResourceGroupName: pulumi.String("rgiotoperations"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;
return await Deployment.RunAsync(() =>
{
var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
{
DataflowEndpointName = "adlsv2-endpoint",
ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
{
Name = "qmbrfwcpwwhggszhrdjv",
Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
},
InstanceName = "resource-name123",
Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
{
DataLakeStorageSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointDataLakeStorageArgs
{
Authentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointDataLakeStorageAuthenticationArgs
{
AccessTokenSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointAuthenticationAccessTokenArgs
{
SecretRef = "my-secret",
},
Method = AzureNative.IoTOperations.DataLakeStorageAuthMethod.AccessToken,
},
Host = "example.blob.core.windows.net",
},
EndpointType = AzureNative.IoTOperations.EndpointType.DataLakeStorage,
},
ResourceGroupName = "rgiotoperations",
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointDataLakeStorageArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointDataLakeStorageAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationAccessTokenArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
.dataflowEndpointName("adlsv2-endpoint")
.extendedLocation(ExtendedLocationArgs.builder()
.name("qmbrfwcpwwhggszhrdjv")
.type("CustomLocation")
.build())
.instanceName("resource-name123")
.properties(DataflowEndpointPropertiesArgs.builder()
.dataLakeStorageSettings(DataflowEndpointDataLakeStorageArgs.builder()
.authentication(DataflowEndpointDataLakeStorageAuthenticationArgs.builder()
.accessTokenSettings(DataflowEndpointAuthenticationAccessTokenArgs.builder()
.secretRef("my-secret")
.build())
.method("AccessToken")
.build())
.host("example.blob.core.windows.net")
.build())
.endpointType("DataLakeStorage")
.build())
.resourceGroupName("rgiotoperations")
.build());
}
}
resources:
dataflowEndpoint:
type: azure-native:iotoperations:DataflowEndpoint
properties:
dataflowEndpointName: adlsv2-endpoint
extendedLocation:
name: qmbrfwcpwwhggszhrdjv
type: CustomLocation
instanceName: resource-name123
properties:
dataLakeStorageSettings:
authentication:
accessTokenSettings:
secretRef: my-secret
method: AccessToken
host: example.blob.core.windows.net
endpointType: DataLakeStorage
resourceGroupName: rgiotoperations
The dataLakeStorageSettings block configures ADLS Gen2 connectivity. The authentication method uses access tokens stored in a Kubernetes secret. The host property points to the storage account’s blob endpoint.
Load into Azure Data Explorer for real-time analytics
Real-time analytics workloads use Azure Data Explorer to query streaming telemetry with low latency.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";
const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
dataflowEndpointName: "adx-endpoint",
extendedLocation: {
name: "qmbrfwcpwwhggszhrdjv",
type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
},
instanceName: "resource-name123",
properties: {
dataExplorerSettings: {
authentication: {
method: "SystemAssignedManagedIdentity",
systemAssignedManagedIdentitySettings: {},
},
batching: {
latencySeconds: 9312,
maxMessages: 9028,
},
database: "example-database",
host: "example.westeurope.kusto.windows.net",
},
endpointType: azure_native.iotoperations.EndpointType.DataExplorer,
},
resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native
dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
dataflow_endpoint_name="adx-endpoint",
extended_location={
"name": "qmbrfwcpwwhggszhrdjv",
"type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
},
instance_name="resource-name123",
properties={
"data_explorer_settings": {
"authentication": {
"method": "SystemAssignedManagedIdentity",
"system_assigned_managed_identity_settings": {},
},
"batching": {
"latency_seconds": 9312,
"max_messages": 9028,
},
"database": "example-database",
"host": "example.westeurope.kusto.windows.net",
},
"endpoint_type": azure_native.iotoperations.EndpointType.DATA_EXPLORER,
},
resource_group_name="rgiotoperations")
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
DataflowEndpointName: pulumi.String("adx-endpoint"),
ExtendedLocation: &iotoperations.ExtendedLocationArgs{
Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
},
InstanceName: pulumi.String("resource-name123"),
Properties: &iotoperations.DataflowEndpointPropertiesArgs{
DataExplorerSettings: &iotoperations.DataflowEndpointDataExplorerArgs{
Authentication: &iotoperations.DataflowEndpointDataExplorerAuthenticationArgs{
Method: pulumi.Any("SystemAssignedManagedIdentity"),
SystemAssignedManagedIdentitySettings: &iotoperations.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs{},
},
Batching: &iotoperations.BatchingConfigurationArgs{
LatencySeconds: pulumi.Int(9312),
MaxMessages: pulumi.Int(9028),
},
Database: pulumi.String("example-database"),
Host: pulumi.String("example.westeurope.kusto.windows.net"),
},
EndpointType: pulumi.String(iotoperations.EndpointTypeDataExplorer),
},
ResourceGroupName: pulumi.String("rgiotoperations"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;
// Pulumi program declaring an Azure Data Explorer dataflow endpoint.
return await Deployment.RunAsync(() =>
{
    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "adx-endpoint",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            DataExplorerSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointDataExplorerArgs
            {
                Authentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointDataExplorerAuthenticationArgs
                {
                    Method = "SystemAssignedManagedIdentity",
                    // Pass an empty settings object rather than null so this example
                    // matches the empty `{}` / empty args object used by the
                    // TypeScript, Python, Go, Java, and YAML versions.
                    SystemAssignedManagedIdentitySettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs(),
                },
                Batching = new AzureNative.IoTOperations.Inputs.BatchingConfigurationArgs
                {
                    LatencySeconds = 9312,
                    MaxMessages = 9028,
                },
                Database = "example-database",
                Host = "example.westeurope.kusto.windows.net",
            },
            EndpointType = AzureNative.IoTOperations.EndpointType.DataExplorer,
        },
        ResourceGroupName = "rgiotoperations",
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointDataExplorerArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointDataExplorerAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs;
import com.pulumi.azurenative.iotoperations.inputs.BatchingConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
.dataflowEndpointName("adx-endpoint")
.extendedLocation(ExtendedLocationArgs.builder()
.name("qmbrfwcpwwhggszhrdjv")
.type("CustomLocation")
.build())
.instanceName("resource-name123")
.properties(DataflowEndpointPropertiesArgs.builder()
.dataExplorerSettings(DataflowEndpointDataExplorerArgs.builder()
.authentication(DataflowEndpointDataExplorerAuthenticationArgs.builder()
.method("SystemAssignedManagedIdentity")
.systemAssignedManagedIdentitySettings(DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs.builder()
.build())
.build())
.batching(BatchingConfigurationArgs.builder()
.latencySeconds(9312)
.maxMessages(9028)
.build())
.database("example-database")
.host("example.westeurope.kusto.windows.net")
.build())
.endpointType("DataExplorer")
.build())
.resourceGroupName("rgiotoperations")
.build());
}
}
resources:
dataflowEndpoint:
type: azure-native:iotoperations:DataflowEndpoint
properties:
dataflowEndpointName: adx-endpoint
extendedLocation:
name: qmbrfwcpwwhggszhrdjv
type: CustomLocation
instanceName: resource-name123
properties:
dataExplorerSettings:
authentication:
method: SystemAssignedManagedIdentity
systemAssignedManagedIdentitySettings: {}
batching:
latencySeconds: 9312
maxMessages: 9028
database: example-database
host: example.westeurope.kusto.windows.net
endpointType: DataExplorer
resourceGroupName: rgiotoperations
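The dataExplorerSettings block configures ADX connectivity with systemAssignedManagedIdentitySettings for authentication. The database property specifies the target database name, and the host property points to the ADX cluster endpoint. The batching block aggregates messages before ingestion to improve throughput; its latencySeconds and maxMessages properties set the batching thresholds, and the values shown are placeholders that you should tune to your own latency requirements.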
Write to Microsoft Fabric OneLake
Microsoft Fabric deployments centralize data in OneLake, where lakehouses provide unified storage for analytics and AI workloads.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";
const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
dataflowEndpointName: "fabric-endpoint",
extendedLocation: {
name: "qmbrfwcpwwhggszhrdjv",
type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
},
instanceName: "resource-name123",
properties: {
endpointType: azure_native.iotoperations.EndpointType.FabricOneLake,
fabricOneLakeSettings: {
authentication: {
method: "SystemAssignedManagedIdentity",
systemAssignedManagedIdentitySettings: {},
},
host: "onelake.dfs.fabric.microsoft.com",
names: {
lakehouseName: "example-lakehouse",
workspaceName: "example-workspace",
},
oneLakePathType: azure_native.iotoperations.DataflowEndpointFabricPathType.Tables,
},
},
resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native
dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
dataflow_endpoint_name="fabric-endpoint",
extended_location={
"name": "qmbrfwcpwwhggszhrdjv",
"type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
},
instance_name="resource-name123",
properties={
"endpoint_type": azure_native.iotoperations.EndpointType.FABRIC_ONE_LAKE,
"fabric_one_lake_settings": {
"authentication": {
"method": "SystemAssignedManagedIdentity",
"system_assigned_managed_identity_settings": {},
},
"host": "onelake.dfs.fabric.microsoft.com",
"names": {
"lakehouse_name": "example-lakehouse",
"workspace_name": "example-workspace",
},
"one_lake_path_type": azure_native.iotoperations.DataflowEndpointFabricPathType.TABLES,
},
},
resource_group_name="rgiotoperations")
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
DataflowEndpointName: pulumi.String("fabric-endpoint"),
ExtendedLocation: &iotoperations.ExtendedLocationArgs{
Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
},
InstanceName: pulumi.String("resource-name123"),
Properties: &iotoperations.DataflowEndpointPropertiesArgs{
EndpointType: pulumi.String(iotoperations.EndpointTypeFabricOneLake),
FabricOneLakeSettings: &iotoperations.DataflowEndpointFabricOneLakeArgs{
Authentication: &iotoperations.DataflowEndpointFabricOneLakeAuthenticationArgs{
Method: pulumi.Any("SystemAssignedManagedIdentity"),
SystemAssignedManagedIdentitySettings: &iotoperations.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs{},
},
Host: pulumi.String("onelake.dfs.fabric.microsoft.com"),
Names: &iotoperations.DataflowEndpointFabricOneLakeNamesArgs{
LakehouseName: pulumi.String("example-lakehouse"),
WorkspaceName: pulumi.String("example-workspace"),
},
OneLakePathType: pulumi.String(iotoperations.DataflowEndpointFabricPathTypeTables),
},
},
ResourceGroupName: pulumi.String("rgiotoperations"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;
// Pulumi program declaring a Microsoft Fabric OneLake dataflow endpoint.
return await Deployment.RunAsync(() =>
{
    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "fabric-endpoint",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            EndpointType = AzureNative.IoTOperations.EndpointType.FabricOneLake,
            FabricOneLakeSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointFabricOneLakeArgs
            {
                Authentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointFabricOneLakeAuthenticationArgs
                {
                    Method = "SystemAssignedManagedIdentity",
                    // Pass an empty settings object rather than null so this example
                    // matches the empty `{}` / empty args object used by the
                    // TypeScript, Python, Go, Java, and YAML versions.
                    SystemAssignedManagedIdentitySettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs(),
                },
                Host = "onelake.dfs.fabric.microsoft.com",
                Names = new AzureNative.IoTOperations.Inputs.DataflowEndpointFabricOneLakeNamesArgs
                {
                    LakehouseName = "example-lakehouse",
                    WorkspaceName = "example-workspace",
                },
                OneLakePathType = AzureNative.IoTOperations.DataflowEndpointFabricPathType.Tables,
            },
        },
        ResourceGroupName = "rgiotoperations",
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointFabricOneLakeArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointFabricOneLakeAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointFabricOneLakeNamesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
.dataflowEndpointName("fabric-endpoint")
.extendedLocation(ExtendedLocationArgs.builder()
.name("qmbrfwcpwwhggszhrdjv")
.type("CustomLocation")
.build())
.instanceName("resource-name123")
.properties(DataflowEndpointPropertiesArgs.builder()
.endpointType("FabricOneLake")
.fabricOneLakeSettings(DataflowEndpointFabricOneLakeArgs.builder()
.authentication(DataflowEndpointFabricOneLakeAuthenticationArgs.builder()
.method("SystemAssignedManagedIdentity")
.systemAssignedManagedIdentitySettings(DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs.builder()
.build())
.build())
.host("onelake.dfs.fabric.microsoft.com")
.names(DataflowEndpointFabricOneLakeNamesArgs.builder()
.lakehouseName("example-lakehouse")
.workspaceName("example-workspace")
.build())
.oneLakePathType("Tables")
.build())
.build())
.resourceGroupName("rgiotoperations")
.build());
}
}
resources:
dataflowEndpoint:
type: azure-native:iotoperations:DataflowEndpoint
properties:
dataflowEndpointName: fabric-endpoint
extendedLocation:
name: qmbrfwcpwwhggszhrdjv
type: CustomLocation
instanceName: resource-name123
properties:
endpointType: FabricOneLake
fabricOneLakeSettings:
authentication:
method: SystemAssignedManagedIdentity
systemAssignedManagedIdentitySettings: {}
host: onelake.dfs.fabric.microsoft.com
names:
lakehouseName: example-lakehouse
workspaceName: example-workspace
oneLakePathType: Tables
resourceGroupName: rgiotoperations
The fabricOneLakeSettings block configures Fabric connectivity. The names property specifies the workspace and lakehouse names, and the oneLakePathType property determines whether data is written to tables or files. The host property points to the Fabric OneLake endpoint.
Buffer to local persistent storage
Edge deployments with intermittent connectivity buffer messages to local storage before forwarding to cloud endpoints.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";
const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
dataflowEndpointName: "local-storage-endpoint",
extendedLocation: {
name: "qmbrfwcpwwhggszhrdjv",
type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
},
instanceName: "resource-name123",
properties: {
endpointType: azure_native.iotoperations.EndpointType.LocalStorage,
localStorageSettings: {
persistentVolumeClaimRef: "example-pvc",
},
},
resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native
dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
dataflow_endpoint_name="local-storage-endpoint",
extended_location={
"name": "qmbrfwcpwwhggszhrdjv",
"type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
},
instance_name="resource-name123",
properties={
"endpoint_type": azure_native.iotoperations.EndpointType.LOCAL_STORAGE,
"local_storage_settings": {
"persistent_volume_claim_ref": "example-pvc",
},
},
resource_group_name="rgiotoperations")
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
DataflowEndpointName: pulumi.String("local-storage-endpoint"),
ExtendedLocation: &iotoperations.ExtendedLocationArgs{
Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
},
InstanceName: pulumi.String("resource-name123"),
Properties: &iotoperations.DataflowEndpointPropertiesArgs{
EndpointType: pulumi.String(iotoperations.EndpointTypeLocalStorage),
LocalStorageSettings: &iotoperations.DataflowEndpointLocalStorageArgs{
PersistentVolumeClaimRef: pulumi.String("example-pvc"),
},
},
ResourceGroupName: pulumi.String("rgiotoperations"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;
return await Deployment.RunAsync(() =>
{
var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
{
DataflowEndpointName = "local-storage-endpoint",
ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
{
Name = "qmbrfwcpwwhggszhrdjv",
Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
},
InstanceName = "resource-name123",
Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
{
EndpointType = AzureNative.IoTOperations.EndpointType.LocalStorage,
LocalStorageSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointLocalStorageArgs
{
PersistentVolumeClaimRef = "example-pvc",
},
},
ResourceGroupName = "rgiotoperations",
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointLocalStorageArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Pulumi program that declares a single dataflow endpoint of type
 * LocalStorage, referencing a persistent volume claim by name.
 */
public class App {
    /** Hands the stack definition to the Pulumi runtime. */
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Declares the resources of this stack.
     *
     * @param ctx the Pulumi context supplied by the runtime (not read here)
     */
    public static void stack(Context ctx) {
        // Extended location (of type CustomLocation) the endpoint is created in.
        final var customLocation = ExtendedLocationArgs.builder()
            .type("CustomLocation")
            .name("qmbrfwcpwwhggszhrdjv")
            .build();

        // Local storage settings: the persistent volume claim is referenced by name.
        final var localStorage = DataflowEndpointLocalStorageArgs.builder()
            .persistentVolumeClaimRef("example-pvc")
            .build();

        // Endpoint properties: the endpoint type plus its matching settings block.
        final var endpointProperties = DataflowEndpointPropertiesArgs.builder()
            .endpointType("LocalStorage")
            .localStorageSettings(localStorage)
            .build();

        final var endpointArgs = DataflowEndpointArgs.builder()
            .resourceGroupName("rgiotoperations")
            .instanceName("resource-name123")
            .dataflowEndpointName("local-storage-endpoint")
            .extendedLocation(customLocation)
            .properties(endpointProperties)
            .build();

        // Constructing the resource registers it with the Pulumi engine.
        final var endpoint = new DataflowEndpoint("dataflowEndpoint", endpointArgs);
    }
}
# Declares a single dataflow endpoint of type LocalStorage that references a
# persistent volume claim by name. Nesting matters: the outer `properties`
# holds the resource inputs, the inner `properties` holds the endpoint settings.
resources:
  dataflowEndpoint:
    type: azure-native:iotoperations:DataflowEndpoint
    properties:
      dataflowEndpointName: local-storage-endpoint
      # Extended location the endpoint is created in.
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        endpointType: LocalStorage
        localStorageSettings:
          persistentVolumeClaimRef: example-pvc
      resourceGroupName: rgiotoperations
The localStorageSettings block references a Kubernetes PersistentVolumeClaim by name. Messages are written to the persistent volume, providing offline resilience when cloud connectivity is unavailable.
Beyond these examples
These snippets focus on specific endpoint-level features: MQTT and Kafka protocol endpoints, Azure service integrations, local persistent storage, and authentication methods. They’re intentionally minimal rather than full dataflow pipelines.
The examples reference pre-existing infrastructure such as Azure IoT Operations instances and custom locations, Kubernetes secrets and ConfigMaps for credentials and certificates, Azure resources like storage accounts and Event Hub namespaces, and persistent volume claims for local storage. They focus on configuring the endpoint rather than provisioning the surrounding infrastructure.
To keep things focused, common endpoint patterns are omitted, including:
- CloudEvent attribute handling (cloudEventAttributes property)
- MQTT message retention and QoS tuning
- Kafka partition strategies and acknowledgment modes
- User-assigned managed identity configuration
These omissions are intentional: the goal is to illustrate how each endpoint type is wired, not provide drop-in dataflow modules. See the DataflowEndpoint resource reference for all available configuration options.
Let's configure Azure IoT Operations Dataflow Endpoints
Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.
Try Pulumi Cloud for FREE
Frequently Asked Questions
Endpoint Types & Configuration
Supported endpoint types: DataExplorer (Azure Data Explorer/Kusto), DataLakeStorage (Azure Data Lake Storage Gen2), FabricOneLake (Microsoft Fabric), Kafka (including Azure Event Hub), LocalStorage (persistent volume claims), and Mqtt (MQTT brokers including the built-in AIO broker).
The extendedLocation property is immutable after creation. While the schema doesn’t explicitly state endpoint type immutability, changing fundamental endpoint configuration typically requires recreating the resource.
Set endpointType to Mqtt, configure host as aio-broker:18883, use Kubernetes authentication with serviceAccountTokenSettings.audience set to aio-internal, and enable TLS with trustedCaCertificateConfigMapRef pointing to aio-ca-trust-bundle-test-only.
Use an endpointType of Kafka with kafkaSettings. Set host to your Event Hub namespace (e.g., example.servicebus.windows.net:9093), configure SystemAssignedManagedIdentity authentication, specify a consumerGroupId, and enable TLS.
Set endpointType to DataExplorer and configure dataExplorerSettings with host (your cluster URL like example.westeurope.kusto.windows.net), database name, and SystemAssignedManagedIdentity authentication with optional batching settings.
Use an endpointType of FabricOneLake with fabricOneLakeSettings. Configure host as onelake.dfs.fabric.microsoft.com, specify workspaceName and lakehouseName in the names object, set oneLakePathType (Tables or Files), and use SystemAssignedManagedIdentity authentication.
Authentication & Security
Authentication methods vary by endpoint type:
- MQTT: SystemAssignedManagedIdentity, UserAssignedManagedIdentity, Kubernetes (service account tokens), X509Certificate
- Kafka: SystemAssignedManagedIdentity, UserAssignedManagedIdentity, Sasl (Plain, ScramSha256, ScramSha512), X509Certificate
- Data Explorer: SystemAssignedManagedIdentity, UserAssignedManagedIdentity
- Data Lake Storage: SystemAssignedManagedIdentity, UserAssignedManagedIdentity, AccessToken
- Fabric OneLake: SystemAssignedManagedIdentity, UserAssignedManagedIdentity
Configure the tls object with mode set to Enabled or Disabled. For custom CA certificates, set trustedCaCertificateConfigMapRef to reference a ConfigMap containing your trusted certificates. TLS is available for MQTT and Kafka endpoints.
Set authentication.method to Sasl and configure saslSettings with saslType (Plain, ScramSha256, or ScramSha512) and secretRef pointing to a Kubernetes secret containing your credentials.
Performance & Batching
Kafka endpoints configure batching with latencyMs (milliseconds), maxBytes, maxMessages, and a mode setting (Enabled/Disabled). Other endpoints (Data Explorer, Data Lake Storage, Fabric OneLake) use only latencySeconds and maxMessages without a mode setting.
Kafka-specific settings include compression (None, Gzip, Snappy, Lz4), kafkaAcks acknowledgment levels (Zero, One, All), partitionStrategy (Default, Static, Topic, Property), and copyMqttProperties to preserve MQTT metadata.
Protocol-Specific Settings
MQTT-specific settings include qos (quality of service level 0-2), retain (Keep or Never), keepAliveSeconds, maxInflightMessages, protocol (Mqtt or WebSockets), sessionExpirySeconds, and clientIdPrefix for client identification.
Using a different cloud?
Explore integration guides for other cloud providers: