The azure-native:iotoperations:DataflowEndpoint resource, part of the Pulumi Azure Native provider, defines dataflow endpoints that specify where Azure IoT Operations sends or receives messages: cloud services, message brokers, or local storage. This guide focuses on four capabilities: connecting to the built-in MQTT broker, sending data to Azure storage and analytics services, bridging to Kafka and Event Hubs, and loading data into Microsoft Fabric.
Dataflow endpoints reference Azure services, Kubernetes resources, and managed identities with appropriate permissions. The examples are intentionally small. Combine them with your own IoT Operations instance, destination services, and authentication configuration.
Connect to the built-in Azure IoT Operations broker
IoT Operations deployments typically start by connecting dataflows to the built-in MQTT broker, which serves as the central message hub for device telemetry and commands.
// Dataflow endpoint targeting the built-in Azure IoT Operations MQTT broker.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";
const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
dataflowEndpointName: "aio-builtin-broker-endpoint",
// Arc custom location hosting the IoT Operations instance.
extendedLocation: {
name: "qmbrfwcpwwhggszhrdjv",
type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
},
instanceName: "resource-name123",
properties: {
endpointType: azure_native.iotoperations.EndpointType.Mqtt,
mqttSettings: {
// Authenticate with a Kubernetes service account token scoped to the broker's audience.
authentication: {
method: "Kubernetes",
serviceAccountTokenSettings: {
audience: "aio-internal",
},
},
// In-cluster service name and TLS port of the built-in broker.
host: "aio-broker:18883",
tls: {
mode: azure_native.iotoperations.OperationalMode.Enabled,
trustedCaCertificateConfigMapRef: "aio-ca-trust-bundle-test-only",
},
},
},
resourceGroupName: "rgiotoperations",
});
# Dataflow endpoint targeting the built-in Azure IoT Operations MQTT broker.
import pulumi
import pulumi_azure_native as azure_native
dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
dataflow_endpoint_name="aio-builtin-broker-endpoint",
# Arc custom location hosting the IoT Operations instance.
extended_location={
"name": "qmbrfwcpwwhggszhrdjv",
"type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
},
instance_name="resource-name123",
properties={
"endpoint_type": azure_native.iotoperations.EndpointType.MQTT,
"mqtt_settings": {
# Authenticate with a Kubernetes service account token scoped to the broker's audience.
"authentication": {
"method": "Kubernetes",
"service_account_token_settings": {
"audience": "aio-internal",
},
},
# In-cluster service name and TLS port of the built-in broker.
"host": "aio-broker:18883",
"tls": {
"mode": azure_native.iotoperations.OperationalMode.ENABLED,
"trusted_ca_certificate_config_map_ref": "aio-ca-trust-bundle-test-only",
},
},
},
resource_group_name="rgiotoperations")
// Dataflow endpoint targeting the built-in Azure IoT Operations MQTT broker.
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
DataflowEndpointName: pulumi.String("aio-builtin-broker-endpoint"),
// Arc custom location hosting the IoT Operations instance.
ExtendedLocation: &iotoperations.ExtendedLocationArgs{
Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
},
InstanceName: pulumi.String("resource-name123"),
Properties: &iotoperations.DataflowEndpointPropertiesArgs{
EndpointType: pulumi.String(iotoperations.EndpointTypeMqtt),
MqttSettings: &iotoperations.DataflowEndpointMqttArgs{
// Authenticate with a Kubernetes service account token scoped to the broker's audience.
Authentication: &iotoperations.DataflowEndpointMqttAuthenticationArgs{
Method: pulumi.String("Kubernetes"),
ServiceAccountTokenSettings: &iotoperations.DataflowEndpointAuthenticationServiceAccountTokenArgs{
Audience: pulumi.String("aio-internal"),
},
},
// In-cluster service name and TLS port of the built-in broker.
Host: pulumi.String("aio-broker:18883"),
Tls: &iotoperations.TlsPropertiesArgs{
Mode: pulumi.String(iotoperations.OperationalModeEnabled),
TrustedCaCertificateConfigMapRef: pulumi.String("aio-ca-trust-bundle-test-only"),
},
},
},
ResourceGroupName: pulumi.String("rgiotoperations"),
})
if err != nil {
return err
}
return nil
})
}
// Dataflow endpoint targeting the built-in Azure IoT Operations MQTT broker.
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;
return await Deployment.RunAsync(() =>
{
var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
{
DataflowEndpointName = "aio-builtin-broker-endpoint",
// Arc custom location hosting the IoT Operations instance.
ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
{
Name = "qmbrfwcpwwhggszhrdjv",
Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
},
InstanceName = "resource-name123",
Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
{
EndpointType = AzureNative.IoTOperations.EndpointType.Mqtt,
MqttSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointMqttArgs
{
// Authenticate with a Kubernetes service account token scoped to the broker's audience.
Authentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointMqttAuthenticationArgs
{
Method = "Kubernetes",
ServiceAccountTokenSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointAuthenticationServiceAccountTokenArgs
{
Audience = "aio-internal",
},
},
// In-cluster service name and TLS port of the built-in broker.
Host = "aio-broker:18883",
Tls = new AzureNative.IoTOperations.Inputs.TlsPropertiesArgs
{
Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
TrustedCaCertificateConfigMapRef = "aio-ca-trust-bundle-test-only",
},
},
},
ResourceGroupName = "rgiotoperations",
});
});
// Dataflow endpoint targeting the built-in Azure IoT Operations MQTT broker.
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointMqttArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointMqttAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationServiceAccountTokenArgs;
import com.pulumi.azurenative.iotoperations.inputs.TlsPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
.dataflowEndpointName("aio-builtin-broker-endpoint")
// Arc custom location hosting the IoT Operations instance.
.extendedLocation(ExtendedLocationArgs.builder()
.name("qmbrfwcpwwhggszhrdjv")
.type("CustomLocation")
.build())
.instanceName("resource-name123")
.properties(DataflowEndpointPropertiesArgs.builder()
.endpointType("Mqtt")
.mqttSettings(DataflowEndpointMqttArgs.builder()
// Authenticate with a Kubernetes service account token scoped to the broker's audience.
.authentication(DataflowEndpointMqttAuthenticationArgs.builder()
.method("Kubernetes")
.serviceAccountTokenSettings(DataflowEndpointAuthenticationServiceAccountTokenArgs.builder()
.audience("aio-internal")
.build())
.build())
// In-cluster service name and TLS port of the built-in broker.
.host("aio-broker:18883")
.tls(TlsPropertiesArgs.builder()
.mode("Enabled")
.trustedCaCertificateConfigMapRef("aio-ca-trust-bundle-test-only")
.build())
.build())
.build())
.resourceGroupName("rgiotoperations")
.build());
}
}
# Dataflow endpoint targeting the built-in Azure IoT Operations MQTT broker.
resources:
dataflowEndpoint:
type: azure-native:iotoperations:DataflowEndpoint
properties:
dataflowEndpointName: aio-builtin-broker-endpoint
# Arc custom location hosting the IoT Operations instance.
extendedLocation:
name: qmbrfwcpwwhggszhrdjv
type: CustomLocation
instanceName: resource-name123
properties:
endpointType: Mqtt
mqttSettings:
# Authenticate with a Kubernetes service account token scoped to the broker's audience.
authentication:
method: Kubernetes
serviceAccountTokenSettings:
audience: aio-internal
# In-cluster service name and TLS port of the built-in broker.
host: aio-broker:18883
tls:
mode: Enabled
trustedCaCertificateConfigMapRef: aio-ca-trust-bundle-test-only
resourceGroupName: rgiotoperations
The mqttSettings block configures MQTT protocol details. The authentication method uses Kubernetes service account tokens with the audience set to “aio-internal”. The host property points to the broker service at “aio-broker:18883”, and TLS is enabled with a reference to the CA trust bundle ConfigMap. This configuration allows dataflows to route messages through the local broker without leaving the cluster.
Send data to Azure Data Lake Storage Gen2
Teams building analytics pipelines often land IoT data in ADLS Gen2 for long-term storage and downstream processing.
// Dataflow endpoint that writes to Azure Data Lake Storage Gen2.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";
const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
dataflowEndpointName: "adlsv2-endpoint",
extendedLocation: {
name: "qmbrfwcpwwhggszhrdjv",
type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
},
instanceName: "resource-name123",
properties: {
dataLakeStorageSettings: {
// Authenticate with an access token read from the Kubernetes secret "my-secret".
authentication: {
accessTokenSettings: {
secretRef: "my-secret",
},
method: azure_native.iotoperations.DataLakeStorageAuthMethod.AccessToken,
},
// Blob endpoint of the target storage account.
host: "example.blob.core.windows.net",
},
endpointType: azure_native.iotoperations.EndpointType.DataLakeStorage,
},
resourceGroupName: "rgiotoperations",
});
# Dataflow endpoint that writes to Azure Data Lake Storage Gen2.
import pulumi
import pulumi_azure_native as azure_native
dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
dataflow_endpoint_name="adlsv2-endpoint",
extended_location={
"name": "qmbrfwcpwwhggszhrdjv",
"type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
},
instance_name="resource-name123",
properties={
"data_lake_storage_settings": {
# Authenticate with an access token read from the Kubernetes secret "my-secret".
"authentication": {
"access_token_settings": {
"secret_ref": "my-secret",
},
"method": azure_native.iotoperations.DataLakeStorageAuthMethod.ACCESS_TOKEN,
},
# Blob endpoint of the target storage account.
"host": "example.blob.core.windows.net",
},
"endpoint_type": azure_native.iotoperations.EndpointType.DATA_LAKE_STORAGE,
},
resource_group_name="rgiotoperations")
// Dataflow endpoint that writes to Azure Data Lake Storage Gen2.
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
DataflowEndpointName: pulumi.String("adlsv2-endpoint"),
ExtendedLocation: &iotoperations.ExtendedLocationArgs{
Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
},
InstanceName: pulumi.String("resource-name123"),
Properties: &iotoperations.DataflowEndpointPropertiesArgs{
DataLakeStorageSettings: &iotoperations.DataflowEndpointDataLakeStorageArgs{
// Authenticate with an access token read from the Kubernetes secret "my-secret".
Authentication: &iotoperations.DataflowEndpointDataLakeStorageAuthenticationArgs{
AccessTokenSettings: &iotoperations.DataflowEndpointAuthenticationAccessTokenArgs{
SecretRef: pulumi.String("my-secret"),
},
Method: pulumi.String(iotoperations.DataLakeStorageAuthMethodAccessToken),
},
// Blob endpoint of the target storage account.
Host: pulumi.String("example.blob.core.windows.net"),
},
EndpointType: pulumi.String(iotoperations.EndpointTypeDataLakeStorage),
},
ResourceGroupName: pulumi.String("rgiotoperations"),
})
if err != nil {
return err
}
return nil
})
}
// Dataflow endpoint that writes to Azure Data Lake Storage Gen2.
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;
return await Deployment.RunAsync(() =>
{
var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
{
DataflowEndpointName = "adlsv2-endpoint",
ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
{
Name = "qmbrfwcpwwhggszhrdjv",
Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
},
InstanceName = "resource-name123",
Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
{
DataLakeStorageSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointDataLakeStorageArgs
{
// Authenticate with an access token read from the Kubernetes secret "my-secret".
Authentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointDataLakeStorageAuthenticationArgs
{
AccessTokenSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointAuthenticationAccessTokenArgs
{
SecretRef = "my-secret",
},
Method = AzureNative.IoTOperations.DataLakeStorageAuthMethod.AccessToken,
},
// Blob endpoint of the target storage account.
Host = "example.blob.core.windows.net",
},
EndpointType = AzureNative.IoTOperations.EndpointType.DataLakeStorage,
},
ResourceGroupName = "rgiotoperations",
});
});
// Dataflow endpoint that writes to Azure Data Lake Storage Gen2.
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointDataLakeStorageArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointDataLakeStorageAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationAccessTokenArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
.dataflowEndpointName("adlsv2-endpoint")
.extendedLocation(ExtendedLocationArgs.builder()
.name("qmbrfwcpwwhggszhrdjv")
.type("CustomLocation")
.build())
.instanceName("resource-name123")
.properties(DataflowEndpointPropertiesArgs.builder()
.dataLakeStorageSettings(DataflowEndpointDataLakeStorageArgs.builder()
// Authenticate with an access token read from the Kubernetes secret "my-secret".
.authentication(DataflowEndpointDataLakeStorageAuthenticationArgs.builder()
.accessTokenSettings(DataflowEndpointAuthenticationAccessTokenArgs.builder()
.secretRef("my-secret")
.build())
.method("AccessToken")
.build())
// Blob endpoint of the target storage account.
.host("example.blob.core.windows.net")
.build())
.endpointType("DataLakeStorage")
.build())
.resourceGroupName("rgiotoperations")
.build());
}
}
# Dataflow endpoint that writes to Azure Data Lake Storage Gen2.
resources:
dataflowEndpoint:
type: azure-native:iotoperations:DataflowEndpoint
properties:
dataflowEndpointName: adlsv2-endpoint
extendedLocation:
name: qmbrfwcpwwhggszhrdjv
type: CustomLocation
instanceName: resource-name123
properties:
dataLakeStorageSettings:
# Authenticate with an access token read from the Kubernetes secret "my-secret".
authentication:
accessTokenSettings:
secretRef: my-secret
method: AccessToken
# Blob endpoint of the target storage account.
host: example.blob.core.windows.net
endpointType: DataLakeStorage
resourceGroupName: rgiotoperations
The dataLakeStorageSettings block specifies the storage account host and authentication method. When endpointType is set to “DataLakeStorage”, the dataflow writes telemetry to blob storage. The authentication uses the AccessToken method, reading an access token from the Kubernetes secret referenced by secretRef (“my-secret”); the token must grant write access — equivalent to Storage Blob Data Contributor permissions — on the target account.
Stream data into Azure Data Explorer
Real-time analytics workloads use Azure Data Explorer to query high-volume time-series data with low latency.
// Dataflow endpoint that streams telemetry into an Azure Data Explorer database.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";
const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
dataflowEndpointName: "adx-endpoint",
extendedLocation: {
name: "qmbrfwcpwwhggszhrdjv",
type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
},
instanceName: "resource-name123",
properties: {
dataExplorerSettings: {
// Authenticate with the instance's system-assigned managed identity.
authentication: {
method: "SystemAssignedManagedIdentity",
systemAssignedManagedIdentitySettings: {},
},
// Group messages before ingestion; these are sample values.
batching: {
latencySeconds: 9312,
maxMessages: 9028,
},
database: "example-database",
host: "example.westeurope.kusto.windows.net",
},
endpointType: azure_native.iotoperations.EndpointType.DataExplorer,
},
resourceGroupName: "rgiotoperations",
});
# Dataflow endpoint that streams telemetry into an Azure Data Explorer database.
import pulumi
import pulumi_azure_native as azure_native
dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
dataflow_endpoint_name="adx-endpoint",
extended_location={
"name": "qmbrfwcpwwhggszhrdjv",
"type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
},
instance_name="resource-name123",
properties={
"data_explorer_settings": {
# Authenticate with the instance's system-assigned managed identity.
"authentication": {
"method": "SystemAssignedManagedIdentity",
"system_assigned_managed_identity_settings": {},
},
# Group messages before ingestion; these are sample values.
"batching": {
"latency_seconds": 9312,
"max_messages": 9028,
},
"database": "example-database",
"host": "example.westeurope.kusto.windows.net",
},
"endpoint_type": azure_native.iotoperations.EndpointType.DATA_EXPLORER,
},
resource_group_name="rgiotoperations")
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
DataflowEndpointName: pulumi.String("adx-endpoint"),
ExtendedLocation: &iotoperations.ExtendedLocationArgs{
Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
},
InstanceName: pulumi.String("resource-name123"),
Properties: &iotoperations.DataflowEndpointPropertiesArgs{
DataExplorerSettings: &iotoperations.DataflowEndpointDataExplorerArgs{
Authentication: &iotoperations.DataflowEndpointDataExplorerAuthenticationArgs{
Method: pulumi.Any("SystemAssignedManagedIdentity"),
SystemAssignedManagedIdentitySettings: &iotoperations.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs{},
},
Batching: &iotoperations.BatchingConfigurationArgs{
LatencySeconds: pulumi.Int(9312),
MaxMessages: pulumi.Int(9028),
},
Database: pulumi.String("example-database"),
Host: pulumi.String("example.westeurope.kusto.windows.net"),
},
EndpointType: pulumi.String(iotoperations.EndpointTypeDataExplorer),
},
ResourceGroupName: pulumi.String("rgiotoperations"),
})
if err != nil {
return err
}
return nil
})
}
// Dataflow endpoint that streams telemetry into an Azure Data Explorer database,
// authenticating with the instance's system-assigned managed identity.
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() =>
{
    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "adx-endpoint",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            DataExplorerSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointDataExplorerArgs
            {
                Authentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointDataExplorerAuthenticationArgs
                {
                    Method = "SystemAssignedManagedIdentity",
                    // Send an empty settings object (not null) so the payload matches the
                    // other language examples, which emit "systemAssignedManagedIdentitySettings: {}".
                    SystemAssignedManagedIdentitySettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs(),
                },
                // Group messages before ingestion; these are sample values.
                Batching = new AzureNative.IoTOperations.Inputs.BatchingConfigurationArgs
                {
                    LatencySeconds = 9312,
                    MaxMessages = 9028,
                },
                Database = "example-database",
                Host = "example.westeurope.kusto.windows.net",
            },
            EndpointType = AzureNative.IoTOperations.EndpointType.DataExplorer,
        },
        ResourceGroupName = "rgiotoperations",
    });
});
// Dataflow endpoint that streams telemetry into an Azure Data Explorer database.
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointDataExplorerArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointDataExplorerAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs;
import com.pulumi.azurenative.iotoperations.inputs.BatchingConfigurationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
.dataflowEndpointName("adx-endpoint")
.extendedLocation(ExtendedLocationArgs.builder()
.name("qmbrfwcpwwhggszhrdjv")
.type("CustomLocation")
.build())
.instanceName("resource-name123")
.properties(DataflowEndpointPropertiesArgs.builder()
.dataExplorerSettings(DataflowEndpointDataExplorerArgs.builder()
// Authenticate with the instance's system-assigned managed identity.
.authentication(DataflowEndpointDataExplorerAuthenticationArgs.builder()
.method("SystemAssignedManagedIdentity")
.systemAssignedManagedIdentitySettings(DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs.builder()
.build())
.build())
// Group messages before ingestion; these are sample values.
.batching(BatchingConfigurationArgs.builder()
.latencySeconds(9312)
.maxMessages(9028)
.build())
.database("example-database")
.host("example.westeurope.kusto.windows.net")
.build())
.endpointType("DataExplorer")
.build())
.resourceGroupName("rgiotoperations")
.build());
}
}
# Dataflow endpoint that streams telemetry into an Azure Data Explorer database.
resources:
dataflowEndpoint:
type: azure-native:iotoperations:DataflowEndpoint
properties:
dataflowEndpointName: adx-endpoint
extendedLocation:
name: qmbrfwcpwwhggszhrdjv
type: CustomLocation
instanceName: resource-name123
properties:
dataExplorerSettings:
# Authenticate with the instance's system-assigned managed identity.
authentication:
method: SystemAssignedManagedIdentity
systemAssignedManagedIdentitySettings: {}
# Group messages before ingestion; these are sample values.
batching:
latencySeconds: 9312
maxMessages: 9028
database: example-database
host: example.westeurope.kusto.windows.net
endpointType: DataExplorer
resourceGroupName: rgiotoperations
The dataExplorerSettings block defines the ADX cluster host and target database. The batching configuration controls ingestion efficiency by grouping messages before sending them to ADX. The latencySeconds (9312) and maxMessages (9028) values shown here are arbitrary sample values; in practice, choose thresholds that balance throughput against data freshness for your workload. The authentication method uses system-assigned managed identity, which must have database ingestor permissions.
Forward messages to Azure Event Hubs
Event Hubs serves as a scalable ingestion layer for streaming architectures, connecting IoT data to downstream consumers.
// Dataflow endpoint that forwards messages to Azure Event Hubs via its Kafka-compatible endpoint.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";
const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
dataflowEndpointName: "event-hub-endpoint",
extendedLocation: {
name: "qmbrfwcpwwhggszhrdjv",
type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
},
instanceName: "resource-name123",
properties: {
endpointType: azure_native.iotoperations.EndpointType.Kafka,
kafkaSettings: {
// Authenticate with the instance's system-assigned managed identity.
authentication: {
method: azure_native.iotoperations.KafkaAuthMethod.SystemAssignedManagedIdentity,
systemAssignedManagedIdentitySettings: {},
},
consumerGroupId: "aiodataflows",
// Event Hubs namespace on the Kafka port (9093).
host: "example.servicebus.windows.net:9093",
tls: {
mode: azure_native.iotoperations.OperationalMode.Enabled,
},
},
},
resourceGroupName: "rgiotoperations",
});
# Dataflow endpoint that forwards messages to Azure Event Hubs via its Kafka-compatible endpoint.
import pulumi
import pulumi_azure_native as azure_native
dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
dataflow_endpoint_name="event-hub-endpoint",
extended_location={
"name": "qmbrfwcpwwhggszhrdjv",
"type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
},
instance_name="resource-name123",
properties={
"endpoint_type": azure_native.iotoperations.EndpointType.KAFKA,
"kafka_settings": {
# Authenticate with the instance's system-assigned managed identity.
"authentication": {
"method": azure_native.iotoperations.KafkaAuthMethod.SYSTEM_ASSIGNED_MANAGED_IDENTITY,
"system_assigned_managed_identity_settings": {},
},
"consumer_group_id": "aiodataflows",
# Event Hubs namespace on the Kafka port (9093).
"host": "example.servicebus.windows.net:9093",
"tls": {
"mode": azure_native.iotoperations.OperationalMode.ENABLED,
},
},
},
resource_group_name="rgiotoperations")
// Dataflow endpoint that forwards messages to Azure Event Hubs via its Kafka-compatible endpoint.
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
DataflowEndpointName: pulumi.String("event-hub-endpoint"),
ExtendedLocation: &iotoperations.ExtendedLocationArgs{
Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
},
InstanceName: pulumi.String("resource-name123"),
Properties: &iotoperations.DataflowEndpointPropertiesArgs{
EndpointType: pulumi.String(iotoperations.EndpointTypeKafka),
KafkaSettings: &iotoperations.DataflowEndpointKafkaArgs{
// Authenticate with the instance's system-assigned managed identity.
Authentication: &iotoperations.DataflowEndpointKafkaAuthenticationArgs{
Method: pulumi.String(iotoperations.KafkaAuthMethodSystemAssignedManagedIdentity),
SystemAssignedManagedIdentitySettings: &iotoperations.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs{},
},
ConsumerGroupId: pulumi.String("aiodataflows"),
// Event Hubs namespace on the Kafka port (9093).
Host: pulumi.String("example.servicebus.windows.net:9093"),
Tls: &iotoperations.TlsPropertiesArgs{
Mode: pulumi.String(iotoperations.OperationalModeEnabled),
},
},
},
ResourceGroupName: pulumi.String("rgiotoperations"),
})
if err != nil {
return err
}
return nil
})
}
// Dataflow endpoint that forwards messages to Azure Event Hubs via its
// Kafka-compatible endpoint, authenticating with the instance's
// system-assigned managed identity.
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() =>
{
    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "event-hub-endpoint",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            EndpointType = AzureNative.IoTOperations.EndpointType.Kafka,
            KafkaSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointKafkaArgs
            {
                Authentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointKafkaAuthenticationArgs
                {
                    Method = AzureNative.IoTOperations.KafkaAuthMethod.SystemAssignedManagedIdentity,
                    // Send an empty settings object (not null) so the payload matches the
                    // other language examples, which emit "systemAssignedManagedIdentitySettings: {}".
                    SystemAssignedManagedIdentitySettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs(),
                },
                ConsumerGroupId = "aiodataflows",
                // Event Hubs namespace on the Kafka port (9093).
                Host = "example.servicebus.windows.net:9093",
                Tls = new AzureNative.IoTOperations.Inputs.TlsPropertiesArgs
                {
                    Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
                },
            },
        },
        ResourceGroupName = "rgiotoperations",
    });
});
// Dataflow endpoint that forwards messages to Azure Event Hubs via its Kafka-compatible endpoint.
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointKafkaArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointKafkaAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs;
import com.pulumi.azurenative.iotoperations.inputs.TlsPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
.dataflowEndpointName("event-hub-endpoint")
.extendedLocation(ExtendedLocationArgs.builder()
.name("qmbrfwcpwwhggszhrdjv")
.type("CustomLocation")
.build())
.instanceName("resource-name123")
.properties(DataflowEndpointPropertiesArgs.builder()
.endpointType("Kafka")
.kafkaSettings(DataflowEndpointKafkaArgs.builder()
// Authenticate with the instance's system-assigned managed identity.
.authentication(DataflowEndpointKafkaAuthenticationArgs.builder()
.method("SystemAssignedManagedIdentity")
.systemAssignedManagedIdentitySettings(DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs.builder()
.build())
.build())
.consumerGroupId("aiodataflows")
// Event Hubs namespace on the Kafka port (9093).
.host("example.servicebus.windows.net:9093")
.tls(TlsPropertiesArgs.builder()
.mode("Enabled")
.build())
.build())
.build())
.resourceGroupName("rgiotoperations")
.build());
}
}
# Dataflow endpoint that forwards messages to Azure Event Hubs via its Kafka-compatible endpoint.
resources:
dataflowEndpoint:
type: azure-native:iotoperations:DataflowEndpoint
properties:
dataflowEndpointName: event-hub-endpoint
extendedLocation:
name: qmbrfwcpwwhggszhrdjv
type: CustomLocation
instanceName: resource-name123
properties:
endpointType: Kafka
kafkaSettings:
# Authenticate with the instance's system-assigned managed identity.
authentication:
method: SystemAssignedManagedIdentity
systemAssignedManagedIdentitySettings: {}
consumerGroupId: aiodataflows
# Event Hubs namespace on the Kafka port (9093).
host: example.servicebus.windows.net:9093
tls:
mode: Enabled
resourceGroupName: rgiotoperations
The kafkaSettings block configures Event Hubs using its Kafka-compatible endpoint. The host property points to the Event Hubs namespace at port 9093, and consumerGroupId specifies which consumer group receives the messages. TLS is enabled for secure communication. The authentication uses system-assigned managed identity, requiring Event Hubs Data Sender role on the namespace.
Integrate with external Kafka clusters
Organizations with existing Kafka infrastructure can route IoT data to on-premises or cloud-hosted clusters.
// Dataflow endpoint for an external (non-Azure) Kafka cluster using SASL/PLAIN over TLS.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";
const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
dataflowEndpointName: "generic-kafka-endpoint",
extendedLocation: {
name: "qmbrfwcpwwhggszhrdjv",
type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
},
instanceName: "resource-name123",
properties: {
endpointType: azure_native.iotoperations.EndpointType.Kafka,
kafkaSettings: {
// SASL/PLAIN credentials are read from the Kubernetes secret "my-secret".
authentication: {
method: azure_native.iotoperations.KafkaAuthMethod.Sasl,
saslSettings: {
saslType: azure_native.iotoperations.DataflowEndpointAuthenticationSaslType.Plain,
secretRef: "my-secret",
},
},
// Producer batching limits: flush on latency, byte, or message-count thresholds.
batching: {
latencyMs: 5,
maxBytes: 1000000,
maxMessages: 100000,
mode: azure_native.iotoperations.OperationalMode.Enabled,
},
cloudEventAttributes: azure_native.iotoperations.CloudEventAttributeType.Propagate,
compression: azure_native.iotoperations.DataflowEndpointKafkaCompression.Gzip,
consumerGroupId: "dataflows",
copyMqttProperties: azure_native.iotoperations.OperationalMode.Enabled,
host: "example.kafka.local:9093",
// Wait for acknowledgement from all replicas before considering a write durable.
kafkaAcks: azure_native.iotoperations.DataflowEndpointKafkaAcks.All,
partitionStrategy: azure_native.iotoperations.DataflowEndpointKafkaPartitionStrategy.Default,
tls: {
mode: azure_native.iotoperations.OperationalMode.Enabled,
trustedCaCertificateConfigMapRef: "ca-certificates",
},
},
},
resourceGroupName: "rgiotoperations",
});
# Dataflow endpoint for an external (non-Azure) Kafka cluster using SASL/PLAIN over TLS.
import pulumi
import pulumi_azure_native as azure_native
dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
dataflow_endpoint_name="generic-kafka-endpoint",
extended_location={
"name": "qmbrfwcpwwhggszhrdjv",
"type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
},
instance_name="resource-name123",
properties={
"endpoint_type": azure_native.iotoperations.EndpointType.KAFKA,
"kafka_settings": {
# SASL/PLAIN credentials are read from the Kubernetes secret "my-secret".
"authentication": {
"method": azure_native.iotoperations.KafkaAuthMethod.SASL,
"sasl_settings": {
"sasl_type": azure_native.iotoperations.DataflowEndpointAuthenticationSaslType.PLAIN,
"secret_ref": "my-secret",
},
},
# Producer batching limits: flush on latency, byte, or message-count thresholds.
"batching": {
"latency_ms": 5,
"max_bytes": 1000000,
"max_messages": 100000,
"mode": azure_native.iotoperations.OperationalMode.ENABLED,
},
"cloud_event_attributes": azure_native.iotoperations.CloudEventAttributeType.PROPAGATE,
"compression": azure_native.iotoperations.DataflowEndpointKafkaCompression.GZIP,
"consumer_group_id": "dataflows",
"copy_mqtt_properties": azure_native.iotoperations.OperationalMode.ENABLED,
"host": "example.kafka.local:9093",
# Wait for acknowledgement from all replicas before considering a write durable.
"kafka_acks": azure_native.iotoperations.DataflowEndpointKafkaAcks.ALL,
"partition_strategy": azure_native.iotoperations.DataflowEndpointKafkaPartitionStrategy.DEFAULT,
"tls": {
"mode": azure_native.iotoperations.OperationalMode.ENABLED,
"trusted_ca_certificate_config_map_ref": "ca-certificates",
},
},
},
resource_group_name="rgiotoperations")
// Dataflow endpoint for an external (non-Azure) Kafka cluster using SASL/PLAIN over TLS.
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
DataflowEndpointName: pulumi.String("generic-kafka-endpoint"),
ExtendedLocation: &iotoperations.ExtendedLocationArgs{
Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
},
InstanceName: pulumi.String("resource-name123"),
Properties: &iotoperations.DataflowEndpointPropertiesArgs{
EndpointType: pulumi.String(iotoperations.EndpointTypeKafka),
KafkaSettings: &iotoperations.DataflowEndpointKafkaArgs{
// SASL/PLAIN credentials are read from the Kubernetes secret "my-secret".
Authentication: &iotoperations.DataflowEndpointKafkaAuthenticationArgs{
Method: pulumi.String(iotoperations.KafkaAuthMethodSasl),
SaslSettings: &iotoperations.DataflowEndpointAuthenticationSaslArgs{
SaslType: pulumi.String(iotoperations.DataflowEndpointAuthenticationSaslTypePlain),
SecretRef: pulumi.String("my-secret"),
},
},
// Producer batching limits: flush on latency, byte, or message-count thresholds.
Batching: &iotoperations.DataflowEndpointKafkaBatchingArgs{
LatencyMs: pulumi.Int(5),
MaxBytes: pulumi.Int(1000000),
MaxMessages: pulumi.Int(100000),
Mode: pulumi.String(iotoperations.OperationalModeEnabled),
},
CloudEventAttributes: pulumi.String(iotoperations.CloudEventAttributeTypePropagate),
Compression: pulumi.String(iotoperations.DataflowEndpointKafkaCompressionGzip),
ConsumerGroupId: pulumi.String("dataflows"),
CopyMqttProperties: pulumi.String(iotoperations.OperationalModeEnabled),
Host: pulumi.String("example.kafka.local:9093"),
// Wait for acknowledgement from all replicas before considering a write durable.
KafkaAcks: pulumi.String(iotoperations.DataflowEndpointKafkaAcksAll),
PartitionStrategy: pulumi.String(iotoperations.DataflowEndpointKafkaPartitionStrategyDefault),
Tls: &iotoperations.TlsPropertiesArgs{
Mode: pulumi.String(iotoperations.OperationalModeEnabled),
TrustedCaCertificateConfigMapRef: pulumi.String("ca-certificates"),
},
},
},
ResourceGroupName: pulumi.String("rgiotoperations"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() =>
{
    // Dataflow endpoint for a generic Kafka broker over TLS with SASL/PLAIN
    // authentication, bound to the IoT Operations instance through its
    // Arc-enabled custom location.
    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "generic-kafka-endpoint",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            EndpointType = AzureNative.IoTOperations.EndpointType.Kafka,
            KafkaSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointKafkaArgs
            {
                // SASL/PLAIN credentials are read from a Kubernetes secret.
                Authentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointKafkaAuthenticationArgs
                {
                    Method = AzureNative.IoTOperations.KafkaAuthMethod.Sasl,
                    SaslSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointAuthenticationSaslArgs
                    {
                        SaslType = AzureNative.IoTOperations.DataflowEndpointAuthenticationSaslType.Plain,
                        SecretRef = "my-secret",
                    },
                },
                // Producer batching: flush after 5 ms, 1 MB, or 100k messages,
                // whichever limit is reached first.
                Batching = new AzureNative.IoTOperations.Inputs.DataflowEndpointKafkaBatchingArgs
                {
                    LatencyMs = 5,
                    MaxBytes = 1000000,
                    MaxMessages = 100000,
                    Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
                },
                CloudEventAttributes = AzureNative.IoTOperations.CloudEventAttributeType.Propagate,
                Compression = AzureNative.IoTOperations.DataflowEndpointKafkaCompression.Gzip,
                ConsumerGroupId = "dataflows",
                CopyMqttProperties = AzureNative.IoTOperations.OperationalMode.Enabled,
                Host = "example.kafka.local:9093",
                // All waits for every in-sync replica to acknowledge each write.
                KafkaAcks = AzureNative.IoTOperations.DataflowEndpointKafkaAcks.All,
                PartitionStrategy = AzureNative.IoTOperations.DataflowEndpointKafkaPartitionStrategy.Default,
                // TLS is enabled; the broker CA bundle comes from a ConfigMap.
                Tls = new AzureNative.IoTOperations.Inputs.TlsPropertiesArgs
                {
                    Mode = AzureNative.IoTOperations.OperationalMode.Enabled,
                    TrustedCaCertificateConfigMapRef = "ca-certificates",
                },
            },
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointKafkaArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointKafkaAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationSaslArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointKafkaBatchingArgs;
import com.pulumi.azurenative.iotoperations.inputs.TlsPropertiesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        // Dataflow endpoint for a generic Kafka broker over TLS with
        // SASL/PLAIN authentication, bound to the IoT Operations instance
        // through its Arc-enabled custom location.
        var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
            .dataflowEndpointName("generic-kafka-endpoint")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowEndpointPropertiesArgs.builder()
                .endpointType("Kafka")
                .kafkaSettings(DataflowEndpointKafkaArgs.builder()
                    // SASL/PLAIN credentials are read from a Kubernetes secret.
                    .authentication(DataflowEndpointKafkaAuthenticationArgs.builder()
                        .method("Sasl")
                        .saslSettings(DataflowEndpointAuthenticationSaslArgs.builder()
                            .saslType("Plain")
                            .secretRef("my-secret")
                            .build())
                        .build())
                    // Producer batching: flush after 5 ms, 1 MB, or 100k
                    // messages, whichever limit is reached first.
                    .batching(DataflowEndpointKafkaBatchingArgs.builder()
                        .latencyMs(5)
                        .maxBytes(1000000)
                        .maxMessages(100000)
                        .mode("Enabled")
                        .build())
                    .cloudEventAttributes("Propagate")
                    .compression("Gzip")
                    .consumerGroupId("dataflows")
                    .copyMqttProperties("Enabled")
                    .host("example.kafka.local:9093")
                    // "All" waits for every in-sync replica to acknowledge.
                    .kafkaAcks("All")
                    .partitionStrategy("Default")
                    // TLS is enabled; the broker CA bundle comes from a ConfigMap.
                    .tls(TlsPropertiesArgs.builder()
                        .mode("Enabled")
                        .trustedCaCertificateConfigMapRef("ca-certificates")
                        .build())
                    .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());
    }
}
resources:
  dataflowEndpoint:
    type: azure-native:iotoperations:DataflowEndpoint
    properties:
      dataflowEndpointName: generic-kafka-endpoint
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        endpointType: Kafka
        kafkaSettings:
          # SASL/PLAIN credentials are read from a Kubernetes secret.
          authentication:
            method: Sasl
            saslSettings:
              saslType: Plain
              secretRef: my-secret
          # Producer batching: flush after 5 ms, 1 MB, or 100k messages,
          # whichever limit is reached first.
          batching:
            latencyMs: 5
            # Integer byte count; "1e+06" was a float rendering of 1000000
            # and does not match the integer literal used by the other SDKs.
            maxBytes: 1000000
            maxMessages: 100000
            mode: Enabled
          cloudEventAttributes: Propagate
          compression: Gzip
          consumerGroupId: dataflows
          copyMqttProperties: Enabled
          host: example.kafka.local:9093
          # "All" waits for every in-sync replica to acknowledge each write.
          kafkaAcks: All
          partitionStrategy: Default
          # TLS is enabled; the broker CA bundle comes from a ConfigMap.
          tls:
            mode: Enabled
            trustedCaCertificateConfigMapRef: ca-certificates
      resourceGroupName: rgiotoperations
The kafkaSettings block includes comprehensive Kafka configuration. The authentication method uses SASL with credentials stored in a Kubernetes secret. The batching configuration optimizes throughput with latencyMs, maxBytes, and maxMessages settings. The compression property is set to “Gzip” to reduce network bandwidth. The kafkaAcks property controls durability, with “All” requiring acknowledgment from all in-sync replicas. The partitionStrategy determines how messages are distributed across Kafka partitions.
Load data into Microsoft Fabric OneLake
Microsoft Fabric provides unified analytics across data engineering, science, and business intelligence.
import * as pulumi from "@pulumi/pulumi";
import * as azure_native from "@pulumi/azure-native";

// Dataflow endpoint that writes into a Microsoft Fabric OneLake lakehouse
// using the instance's system-assigned managed identity.
const dataflowEndpoint = new azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint", {
    dataflowEndpointName: "fabric-endpoint",
    extendedLocation: {
        name: "qmbrfwcpwwhggszhrdjv",
        type: azure_native.iotoperations.ExtendedLocationType.CustomLocation,
    },
    instanceName: "resource-name123",
    properties: {
        endpointType: azure_native.iotoperations.EndpointType.FabricOneLake,
        fabricOneLakeSettings: {
            // NOTE(review): the identity presumably needs write access to the
            // Fabric workspace — confirm role assignments before deploying.
            authentication: {
                method: "SystemAssignedManagedIdentity",
                systemAssignedManagedIdentitySettings: {},
            },
            host: "onelake.dfs.fabric.microsoft.com",
            // Target workspace and lakehouse inside OneLake.
            names: {
                lakehouseName: "example-lakehouse",
                workspaceName: "example-workspace",
            },
            // Write into lakehouse Tables rather than Files.
            oneLakePathType: azure_native.iotoperations.DataflowEndpointFabricPathType.Tables,
        },
    },
    resourceGroupName: "rgiotoperations",
});
import pulumi
import pulumi_azure_native as azure_native

# Dataflow endpoint that writes into a Microsoft Fabric OneLake lakehouse
# using the instance's system-assigned managed identity.
dataflow_endpoint = azure_native.iotoperations.DataflowEndpoint("dataflowEndpoint",
    dataflow_endpoint_name="fabric-endpoint",
    extended_location={
        "name": "qmbrfwcpwwhggszhrdjv",
        "type": azure_native.iotoperations.ExtendedLocationType.CUSTOM_LOCATION,
    },
    instance_name="resource-name123",
    properties={
        "endpoint_type": azure_native.iotoperations.EndpointType.FABRIC_ONE_LAKE,
        "fabric_one_lake_settings": {
            # NOTE(review): the identity presumably needs write access to the
            # Fabric workspace -- confirm role assignments before deploying.
            "authentication": {
                "method": "SystemAssignedManagedIdentity",
                "system_assigned_managed_identity_settings": {},
            },
            "host": "onelake.dfs.fabric.microsoft.com",
            # Target workspace and lakehouse inside OneLake.
            "names": {
                "lakehouse_name": "example-lakehouse",
                "workspace_name": "example-workspace",
            },
            # Write into lakehouse Tables rather than Files.
            "one_lake_path_type": azure_native.iotoperations.DataflowEndpointFabricPathType.TABLES,
        },
    },
    resource_group_name="rgiotoperations")
package main
import (
iotoperations "github.com/pulumi/pulumi-azure-native-sdk/iotoperations/v3"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := iotoperations.NewDataflowEndpoint(ctx, "dataflowEndpoint", &iotoperations.DataflowEndpointArgs{
DataflowEndpointName: pulumi.String("fabric-endpoint"),
ExtendedLocation: &iotoperations.ExtendedLocationArgs{
Name: pulumi.String("qmbrfwcpwwhggszhrdjv"),
Type: pulumi.String(iotoperations.ExtendedLocationTypeCustomLocation),
},
InstanceName: pulumi.String("resource-name123"),
Properties: &iotoperations.DataflowEndpointPropertiesArgs{
EndpointType: pulumi.String(iotoperations.EndpointTypeFabricOneLake),
FabricOneLakeSettings: &iotoperations.DataflowEndpointFabricOneLakeArgs{
Authentication: &iotoperations.DataflowEndpointFabricOneLakeAuthenticationArgs{
Method: pulumi.Any("SystemAssignedManagedIdentity"),
SystemAssignedManagedIdentitySettings: &iotoperations.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs{},
},
Host: pulumi.String("onelake.dfs.fabric.microsoft.com"),
Names: &iotoperations.DataflowEndpointFabricOneLakeNamesArgs{
LakehouseName: pulumi.String("example-lakehouse"),
WorkspaceName: pulumi.String("example-workspace"),
},
OneLakePathType: pulumi.String(iotoperations.DataflowEndpointFabricPathTypeTables),
},
},
ResourceGroupName: pulumi.String("rgiotoperations"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using AzureNative = Pulumi.AzureNative;

return await Deployment.RunAsync(() =>
{
    // Dataflow endpoint that writes into a Microsoft Fabric OneLake lakehouse
    // using the instance's system-assigned managed identity.
    var dataflowEndpoint = new AzureNative.IoTOperations.DataflowEndpoint("dataflowEndpoint", new()
    {
        DataflowEndpointName = "fabric-endpoint",
        ExtendedLocation = new AzureNative.IoTOperations.Inputs.ExtendedLocationArgs
        {
            Name = "qmbrfwcpwwhggszhrdjv",
            Type = AzureNative.IoTOperations.ExtendedLocationType.CustomLocation,
        },
        InstanceName = "resource-name123",
        Properties = new AzureNative.IoTOperations.Inputs.DataflowEndpointPropertiesArgs
        {
            EndpointType = AzureNative.IoTOperations.EndpointType.FabricOneLake,
            FabricOneLakeSettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointFabricOneLakeArgs
            {
                Authentication = new AzureNative.IoTOperations.Inputs.DataflowEndpointFabricOneLakeAuthenticationArgs
                {
                    Method = "SystemAssignedManagedIdentity",
                    // Supply an empty settings object rather than null: the other
                    // language examples (TS/Python/Go/Java/YAML) all send {} so the
                    // settings block accompanies the chosen authentication method.
                    SystemAssignedManagedIdentitySettings = new AzureNative.IoTOperations.Inputs.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs(),
                },
                Host = "onelake.dfs.fabric.microsoft.com",
                // Target workspace and lakehouse inside OneLake.
                Names = new AzureNative.IoTOperations.Inputs.DataflowEndpointFabricOneLakeNamesArgs
                {
                    LakehouseName = "example-lakehouse",
                    WorkspaceName = "example-workspace",
                },
                // Write into lakehouse Tables rather than Files.
                OneLakePathType = AzureNative.IoTOperations.DataflowEndpointFabricPathType.Tables,
            },
        },
        ResourceGroupName = "rgiotoperations",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.azurenative.iotoperations.DataflowEndpoint;
import com.pulumi.azurenative.iotoperations.DataflowEndpointArgs;
import com.pulumi.azurenative.iotoperations.inputs.ExtendedLocationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointPropertiesArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointFabricOneLakeArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointFabricOneLakeAuthenticationArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs;
import com.pulumi.azurenative.iotoperations.inputs.DataflowEndpointFabricOneLakeNamesArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        // Dataflow endpoint that writes into a Microsoft Fabric OneLake
        // lakehouse using the instance's system-assigned managed identity.
        var dataflowEndpoint = new DataflowEndpoint("dataflowEndpoint", DataflowEndpointArgs.builder()
            .dataflowEndpointName("fabric-endpoint")
            .extendedLocation(ExtendedLocationArgs.builder()
                .name("qmbrfwcpwwhggszhrdjv")
                .type("CustomLocation")
                .build())
            .instanceName("resource-name123")
            .properties(DataflowEndpointPropertiesArgs.builder()
                .endpointType("FabricOneLake")
                .fabricOneLakeSettings(DataflowEndpointFabricOneLakeArgs.builder()
                    // NOTE(review): the identity presumably needs write access
                    // to the Fabric workspace — confirm role assignments.
                    .authentication(DataflowEndpointFabricOneLakeAuthenticationArgs.builder()
                        .method("SystemAssignedManagedIdentity")
                        .systemAssignedManagedIdentitySettings(DataflowEndpointAuthenticationSystemAssignedManagedIdentityArgs.builder()
                            .build())
                        .build())
                    .host("onelake.dfs.fabric.microsoft.com")
                    // Target workspace and lakehouse inside OneLake.
                    .names(DataflowEndpointFabricOneLakeNamesArgs.builder()
                        .lakehouseName("example-lakehouse")
                        .workspaceName("example-workspace")
                        .build())
                    // Write into lakehouse Tables rather than Files.
                    .oneLakePathType("Tables")
                    .build())
                .build())
            .resourceGroupName("rgiotoperations")
            .build());
    }
}
resources:
  dataflowEndpoint:
    type: azure-native:iotoperations:DataflowEndpoint
    properties:
      dataflowEndpointName: fabric-endpoint
      extendedLocation:
        name: qmbrfwcpwwhggszhrdjv
        type: CustomLocation
      instanceName: resource-name123
      properties:
        endpointType: FabricOneLake
        fabricOneLakeSettings:
          # Authenticate with the instance's system-assigned managed identity;
          # no extra settings are required, hence the empty object.
          authentication:
            method: SystemAssignedManagedIdentity
            systemAssignedManagedIdentitySettings: {}
          host: onelake.dfs.fabric.microsoft.com
          # Target workspace and lakehouse inside OneLake.
          names:
            lakehouseName: example-lakehouse
            workspaceName: example-workspace
          # Write into lakehouse Tables rather than Files.
          oneLakePathType: Tables
      resourceGroupName: rgiotoperations
The fabricOneLakeSettings block specifies the OneLake host and target workspace. The names property identifies the specific lakehouse and workspace where data will be written. The oneLakePathType property is set to “Tables”, directing the dataflow to write into lakehouse tables rather than files. The authentication uses system-assigned managed identity, which must have contributor permissions on the Fabric workspace.
Beyond these examples
These snippets focus on specific endpoint-level features: cloud destination endpoints, protocol bridges, and authentication methods. They’re intentionally minimal rather than full dataflow configurations.
The examples reference pre-existing infrastructure such as Azure IoT Operations instance and custom location, destination services, Kubernetes secrets and ConfigMaps for credentials and certificates, and managed identities with appropriate role assignments. They focus on configuring the endpoint rather than provisioning the surrounding infrastructure.
To keep things focused, common endpoint patterns are omitted, including:
- Local storage endpoints for edge buffering (localStorageSettings)
- Event Grid MQTT endpoints for pub/sub scenarios
- Generic MQTT broker endpoints with X.509 authentication
- CloudEvent attribute handling and MQTT property propagation
- Advanced batching tuning (latency vs throughput tradeoffs)
- Partition strategies and acknowledgment modes for Kafka
These omissions are intentional: the goal is to illustrate how each endpoint type is wired, not provide drop-in dataflow modules. See the DataflowEndpoint resource reference for all available configuration options.
Let's configure Azure IoT Operations Dataflow Endpoints
Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.
Try Pulumi Cloud for FREE
Frequently Asked Questions
Endpoint Types & Selection
The available endpoint types are DataExplorer (Azure Data Explorer), DataLakeStorage (ADLS Gen2), FabricOneLake (Microsoft Fabric), Kafka (Event Hub or generic Kafka), LocalStorage (persistent volumes), and Mqtt (Event Grid or generic MQTT brokers). Use Kafka for Azure Event Hub or Apache Kafka brokers with features like consumer groups, partitioning, and compression. Use Mqtt for Azure Event Grid MQTT broker, the built-in AIO broker, or generic MQTT brokers with QoS and retain settings. Note that endpointType and other core properties like dataflowEndpointName, instanceName, and extendedLocation are immutable and require resource replacement to change.
Authentication & Security
Authentication methods vary by endpoint type:
- Managed Identity:
SystemAssignedManagedIdentity or UserAssignedManagedIdentity (most endpoints)
- Token-based: AccessToken (Data Lake Storage), Kubernetes service account tokens (AIO broker)
- Certificate: X509Certificate (MQTT, Kafka)
- SASL: Sasl with Plain or ScramSha256/ScramSha512 (Kafka)
To enable TLS, set tls.mode to Enabled and optionally specify trustedCaCertificateConfigMapRef pointing to a ConfigMap with CA certificates. For the AIO broker, use aio-ca-trust-bundle-test-only. Set mode to Disabled only for local development. For Azure Event Hubs, use SystemAssignedManagedIdentity authentication with the Kafka endpoint type, set host to your Event Hub namespace (e.g., example.servicebus.windows.net:9093), enable TLS, and specify a consumerGroupId. For the built-in AIO broker, use the Mqtt endpoint type with the Kubernetes authentication method: configure serviceAccountTokenSettings with audience set to aio-internal, host as aio-broker:18883, and enable TLS with trustedCaCertificateConfigMapRef set to aio-ca-trust-bundle-test-only.
Performance & Batching
Batching configuration depends on endpoint type:
- Most endpoints: Use latencySeconds (delay before sending) and maxMessages (batch size)
- Kafka: Use latencyMs (milliseconds), maxBytes (size limit), maxMessages, and set mode to Enabled
Kafka tuning options include compression (None, Gzip, Snappy, Lz4), kafkaAcks (Zero, One, All for durability), partitionStrategy (Default, Static, Topic, Property), and copyMqttProperties to preserve MQTT metadata.
Endpoint-Specific Configuration
For Azure Data Explorer, set endpointType to DataExplorer, configure host as <cluster>.<region>.kusto.windows.net, specify the target database, and use managed identity authentication. Optionally configure batching with latencySeconds and maxMessages. For Microsoft Fabric, use the FabricOneLake endpoint type with host set to onelake.dfs.fabric.microsoft.com, specify names.workspaceName and names.lakehouseName, choose oneLakePathType (Files or Tables), and use managed identity authentication. MQTT endpoints support qos (0, 1, 2), retain (Keep or Never), keepAliveSeconds, sessionExpirySeconds, maxInflightMessages, protocol (Mqtt or WebSockets), and clientIdPrefix for connection identification. For local storage, set endpointType to LocalStorage and configure persistentVolumeClaimRef pointing to an existing Kubernetes PersistentVolumeClaim for storing dataflow data locally.
Using a different cloud?
Explore integration guides for other cloud providers: