Create AWS SageMaker Flow Definitions

The aws:sagemaker/flowDefinition:FlowDefinition resource, part of the Pulumi AWS provider, defines a SageMaker AI human review workflow: how tasks are routed to reviewers, when reviews are triggered, and where results are stored. This guide focuses on three capabilities: private and public workteam configuration, automatic triggering from AWS AI services, and task pricing for crowdsourced reviews.

Flow definitions depend on SageMaker workteams, human task UIs, S3 buckets, and IAM roles. The examples are intentionally small. Combine them with your own workteam setup, task templates, and storage infrastructure.

Route human review tasks to a private workteam

Most human-in-the-loop workflows start by defining how tasks are presented to reviewers and where results are stored.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.sagemaker.FlowDefinition("example", {
    flowDefinitionName: "example",
    roleArn: exampleAwsIamRole.arn,
    humanLoopConfig: {
        humanTaskUiArn: exampleAwsSagemakerHumanTaskUi.arn,
        taskAvailabilityLifetimeInSeconds: 1,
        taskCount: 1,
        taskDescription: "example",
        taskTitle: "example",
        workteamArn: exampleAwsSagemakerWorkteam.arn,
    },
    outputConfig: {
        s3OutputPath: `s3://${exampleAwsS3Bucket.bucket}/`,
    },
});
import pulumi
import pulumi_aws as aws

example = aws.sagemaker.FlowDefinition("example",
    flow_definition_name="example",
    role_arn=example_aws_iam_role["arn"],
    human_loop_config={
        "human_task_ui_arn": example_aws_sagemaker_human_task_ui["arn"],
        "task_availability_lifetime_in_seconds": 1,
        "task_count": 1,
        "task_description": "example",
        "task_title": "example",
        "workteam_arn": example_aws_sagemaker_workteam["arn"],
    },
    output_config={
        "s3_output_path": f"s3://{example_aws_s3_bucket['bucket']}/",
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/sagemaker"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := sagemaker.NewFlowDefinition(ctx, "example", &sagemaker.FlowDefinitionArgs{
			FlowDefinitionName: pulumi.String("example"),
			RoleArn:            pulumi.Any(exampleAwsIamRole.Arn),
			HumanLoopConfig: &sagemaker.FlowDefinitionHumanLoopConfigArgs{
				HumanTaskUiArn:                    pulumi.Any(exampleAwsSagemakerHumanTaskUi.Arn),
				TaskAvailabilityLifetimeInSeconds: pulumi.Int(1),
				TaskCount:                         pulumi.Int(1),
				TaskDescription:                   pulumi.String("example"),
				TaskTitle:                         pulumi.String("example"),
				WorkteamArn:                       pulumi.Any(exampleAwsSagemakerWorkteam.Arn),
			},
			OutputConfig: &sagemaker.FlowDefinitionOutputConfigArgs{
				S3OutputPath: pulumi.Sprintf("s3://%v/", exampleAwsS3Bucket.Bucket),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.Sagemaker.FlowDefinition("example", new()
    {
        FlowDefinitionName = "example",
        RoleArn = exampleAwsIamRole.Arn,
        HumanLoopConfig = new Aws.Sagemaker.Inputs.FlowDefinitionHumanLoopConfigArgs
        {
            HumanTaskUiArn = exampleAwsSagemakerHumanTaskUi.Arn,
            TaskAvailabilityLifetimeInSeconds = 1,
            TaskCount = 1,
            TaskDescription = "example",
            TaskTitle = "example",
            WorkteamArn = exampleAwsSagemakerWorkteam.Arn,
        },
        OutputConfig = new Aws.Sagemaker.Inputs.FlowDefinitionOutputConfigArgs
        {
            S3OutputPath = $"s3://{exampleAwsS3Bucket.Bucket}/",
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.sagemaker.FlowDefinition;
import com.pulumi.aws.sagemaker.FlowDefinitionArgs;
import com.pulumi.aws.sagemaker.inputs.FlowDefinitionHumanLoopConfigArgs;
import com.pulumi.aws.sagemaker.inputs.FlowDefinitionOutputConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new FlowDefinition("example", FlowDefinitionArgs.builder()
            .flowDefinitionName("example")
            .roleArn(exampleAwsIamRole.arn())
            .humanLoopConfig(FlowDefinitionHumanLoopConfigArgs.builder()
                .humanTaskUiArn(exampleAwsSagemakerHumanTaskUi.arn())
                .taskAvailabilityLifetimeInSeconds(1)
                .taskCount(1)
                .taskDescription("example")
                .taskTitle("example")
                .workteamArn(exampleAwsSagemakerWorkteam.arn())
                .build())
            .outputConfig(FlowDefinitionOutputConfigArgs.builder()
                .s3OutputPath(String.format("s3://%s/", exampleAwsS3Bucket.bucket()))
                .build())
            .build());

    }
}
resources:
  example:
    type: aws:sagemaker:FlowDefinition
    properties:
      flowDefinitionName: example
      roleArn: ${exampleAwsIamRole.arn}
      humanLoopConfig:
        humanTaskUiArn: ${exampleAwsSagemakerHumanTaskUi.arn}
        taskAvailabilityLifetimeInSeconds: 1
        taskCount: 1
        taskDescription: example
        taskTitle: example
        workteamArn: ${exampleAwsSagemakerWorkteam.arn}
      outputConfig:
        s3OutputPath: s3://${exampleAwsS3Bucket.bucket}/

The humanLoopConfig property defines the review workflow: which workteam receives tasks (workteamArn), how tasks are presented (humanTaskUiArn), and how many reviews are needed per item (taskCount). The outputConfig property specifies where SageMaker writes review results. The roleArn grants SageMaker permission to write to S3 and manage the workflow.
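The example values above (a one-second task lifetime, for instance) are placeholders. Before deploying real settings, a small pre-flight check can catch obviously invalid values. The following is a minimal sketch, not part of the Pulumi SDK: the HumanLoopSettings class and validate function are hypothetical names, and the numeric limits are assumptions that should be confirmed against the resource reference.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class HumanLoopSettings:
    """Hypothetical container mirroring the humanLoopConfig fields used above."""
    task_title: str
    task_description: str
    task_count: int
    task_availability_lifetime_in_seconds: int


def validate(settings: HumanLoopSettings) -> List[str]:
    """Return a list of problems; an empty list means nothing was flagged."""
    problems = []
    # Assumed limits (1 to 3 reviewers, lifetime of 1 to 864000 seconds).
    # Verify these against the provider reference before relying on them.
    if not 1 <= settings.task_count <= 3:
        problems.append("task_count should be between 1 and 3")
    if not 1 <= settings.task_availability_lifetime_in_seconds <= 864000:
        problems.append("task_availability_lifetime_in_seconds should be between 1 and 864000")
    if not settings.task_title.strip():
        problems.append("task_title should not be empty")
    if not settings.task_description.strip():
        problems.append("task_description should not be empty")
    return problems
```

Calling validate before constructing the FlowDefinition lets you fail fast in your own program instead of waiting for the deployment to be rejected.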

Use Amazon Mechanical Turk for public crowdsourcing

When you need scale beyond your internal team, you can route tasks to the public workforce provided through Amazon Mechanical Turk.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.sagemaker.FlowDefinition("example", {
    flowDefinitionName: "example",
    roleArn: exampleAwsIamRole.arn,
    humanLoopConfig: {
        humanTaskUiArn: exampleAwsSagemakerHumanTaskUi.arn,
        taskAvailabilityLifetimeInSeconds: 1,
        taskCount: 1,
        taskDescription: "example",
        taskTitle: "example",
        workteamArn: `arn:aws:sagemaker:${current.region}:394669845002:workteam/public-crowd/default`,
        publicWorkforceTaskPrice: {
            amountInUsd: {
                cents: 1,
                tenthFractionsOfACent: 2,
            },
        },
    },
    outputConfig: {
        s3OutputPath: `s3://${exampleAwsS3Bucket.bucket}/`,
    },
});
import pulumi
import pulumi_aws as aws

example = aws.sagemaker.FlowDefinition("example",
    flow_definition_name="example",
    role_arn=example_aws_iam_role["arn"],
    human_loop_config={
        "human_task_ui_arn": example_aws_sagemaker_human_task_ui["arn"],
        "task_availability_lifetime_in_seconds": 1,
        "task_count": 1,
        "task_description": "example",
        "task_title": "example",
        "workteam_arn": f"arn:aws:sagemaker:{current['region']}:394669845002:workteam/public-crowd/default",
        "public_workforce_task_price": {
            "amount_in_usd": {
                "cents": 1,
                "tenth_fractions_of_a_cent": 2,
            },
        },
    },
    output_config={
        "s3_output_path": f"s3://{example_aws_s3_bucket['bucket']}/",
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/sagemaker"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := sagemaker.NewFlowDefinition(ctx, "example", &sagemaker.FlowDefinitionArgs{
			FlowDefinitionName: pulumi.String("example"),
			RoleArn:            pulumi.Any(exampleAwsIamRole.Arn),
			HumanLoopConfig: &sagemaker.FlowDefinitionHumanLoopConfigArgs{
				HumanTaskUiArn:                    pulumi.Any(exampleAwsSagemakerHumanTaskUi.Arn),
				TaskAvailabilityLifetimeInSeconds: pulumi.Int(1),
				TaskCount:                         pulumi.Int(1),
				TaskDescription:                   pulumi.String("example"),
				TaskTitle:                         pulumi.String("example"),
				WorkteamArn:                       pulumi.Sprintf("arn:aws:sagemaker:%v:394669845002:workteam/public-crowd/default", current.Region),
				PublicWorkforceTaskPrice: &sagemaker.FlowDefinitionHumanLoopConfigPublicWorkforceTaskPriceArgs{
					AmountInUsd: &sagemaker.FlowDefinitionHumanLoopConfigPublicWorkforceTaskPriceAmountInUsdArgs{
						Cents:                 pulumi.Int(1),
						TenthFractionsOfACent: pulumi.Int(2),
					},
				},
			},
			OutputConfig: &sagemaker.FlowDefinitionOutputConfigArgs{
				S3OutputPath: pulumi.Sprintf("s3://%v/", exampleAwsS3Bucket.Bucket),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.Sagemaker.FlowDefinition("example", new()
    {
        FlowDefinitionName = "example",
        RoleArn = exampleAwsIamRole.Arn,
        HumanLoopConfig = new Aws.Sagemaker.Inputs.FlowDefinitionHumanLoopConfigArgs
        {
            HumanTaskUiArn = exampleAwsSagemakerHumanTaskUi.Arn,
            TaskAvailabilityLifetimeInSeconds = 1,
            TaskCount = 1,
            TaskDescription = "example",
            TaskTitle = "example",
            WorkteamArn = $"arn:aws:sagemaker:{current.Region}:394669845002:workteam/public-crowd/default",
            PublicWorkforceTaskPrice = new Aws.Sagemaker.Inputs.FlowDefinitionHumanLoopConfigPublicWorkforceTaskPriceArgs
            {
                AmountInUsd = new Aws.Sagemaker.Inputs.FlowDefinitionHumanLoopConfigPublicWorkforceTaskPriceAmountInUsdArgs
                {
                    Cents = 1,
                    TenthFractionsOfACent = 2,
                },
            },
        },
        OutputConfig = new Aws.Sagemaker.Inputs.FlowDefinitionOutputConfigArgs
        {
            S3OutputPath = $"s3://{exampleAwsS3Bucket.Bucket}/",
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.sagemaker.FlowDefinition;
import com.pulumi.aws.sagemaker.FlowDefinitionArgs;
import com.pulumi.aws.sagemaker.inputs.FlowDefinitionHumanLoopConfigArgs;
import com.pulumi.aws.sagemaker.inputs.FlowDefinitionHumanLoopConfigPublicWorkforceTaskPriceArgs;
import com.pulumi.aws.sagemaker.inputs.FlowDefinitionHumanLoopConfigPublicWorkforceTaskPriceAmountInUsdArgs;
import com.pulumi.aws.sagemaker.inputs.FlowDefinitionOutputConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new FlowDefinition("example", FlowDefinitionArgs.builder()
            .flowDefinitionName("example")
            .roleArn(exampleAwsIamRole.arn())
            .humanLoopConfig(FlowDefinitionHumanLoopConfigArgs.builder()
                .humanTaskUiArn(exampleAwsSagemakerHumanTaskUi.arn())
                .taskAvailabilityLifetimeInSeconds(1)
                .taskCount(1)
                .taskDescription("example")
                .taskTitle("example")
                .workteamArn(String.format("arn:aws:sagemaker:%s:394669845002:workteam/public-crowd/default", current.region()))
                .publicWorkforceTaskPrice(FlowDefinitionHumanLoopConfigPublicWorkforceTaskPriceArgs.builder()
                    .amountInUsd(FlowDefinitionHumanLoopConfigPublicWorkforceTaskPriceAmountInUsdArgs.builder()
                        .cents(1)
                        .tenthFractionsOfACent(2)
                        .build())
                    .build())
                .build())
            .outputConfig(FlowDefinitionOutputConfigArgs.builder()
                .s3OutputPath(String.format("s3://%s/", exampleAwsS3Bucket.bucket()))
                .build())
            .build());

    }
}
resources:
  example:
    type: aws:sagemaker:FlowDefinition
    properties:
      flowDefinitionName: example
      roleArn: ${exampleAwsIamRole.arn}
      humanLoopConfig:
        humanTaskUiArn: ${exampleAwsSagemakerHumanTaskUi.arn}
        taskAvailabilityLifetimeInSeconds: 1
        taskCount: 1
        taskDescription: example
        taskTitle: example
        workteamArn: arn:aws:sagemaker:${current.region}:394669845002:workteam/public-crowd/default
        publicWorkforceTaskPrice:
          amountInUsd:
            cents: 1
            tenthFractionsOfACent: 2
      outputConfig:
        s3OutputPath: s3://${exampleAwsS3Bucket.bucket}/

The workteamArn points to AWS’s managed public workforce. The examples assume that current holds the result of a region lookup (for instance, from aws.getRegion), which is not shown. The publicWorkforceTaskPrice property sets compensation: cents and tenthFractionsOfACent combine to define the payment per task (here, 1 cent plus 2 tenths of a cent, or $0.012 per task). This configuration routes tasks to Mechanical Turk workers instead of a private team.
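To make the price arithmetic and the ARN format concrete, here is a small sketch in plain Python. The helper names (public_workteam_arn, task_price, price_in_usd) are hypothetical and not part of the Pulumi SDK, and the accepted ranges (0 to 99 cents, 0 to 9 tenths) are assumptions based on the field names.

```python
from decimal import Decimal


def public_workteam_arn(region: str) -> str:
    """Build the public workteam ARN in the format used in the example above."""
    return f"arn:aws:sagemaker:{region}:394669845002:workteam/public-crowd/default"


def task_price(cents: int, tenth_fractions_of_a_cent: int) -> dict:
    """Return a dict shaped like the public_workforce_task_price argument."""
    # Assumed ranges; check the resource reference for the authoritative limits.
    if not 0 <= cents <= 99:
        raise ValueError("cents should be between 0 and 99")
    if not 0 <= tenth_fractions_of_a_cent <= 9:
        raise ValueError("tenth_fractions_of_a_cent should be between 0 and 9")
    return {
        "amount_in_usd": {
            "cents": cents,
            "tenth_fractions_of_a_cent": tenth_fractions_of_a_cent,
        }
    }


def price_in_usd(price: dict) -> Decimal:
    """Convert the nested price structure into a dollar amount."""
    amount = price["amount_in_usd"]
    # One cent is 1/100 of a dollar; one tenth of a cent is 1/1000 of a dollar.
    return Decimal(amount["cents"]) / 100 + Decimal(amount["tenth_fractions_of_a_cent"]) / 1000
```

With these helpers, task_price(1, 2) yields the same structure as the example, and price_in_usd confirms that it amounts to $0.012 per task.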

Trigger reviews automatically from AWS AI services

SageMaker can automatically route predictions from Textract or Rekognition to human reviewers when confidence is low or sampling conditions are met.

import * as pulumi from "@pulumi/pulumi";
import * as aws from "@pulumi/aws";

const example = new aws.sagemaker.FlowDefinition("example", {
    flowDefinitionName: "example",
    roleArn: exampleAwsIamRole.arn,
    humanLoopConfig: {
        humanTaskUiArn: exampleAwsSagemakerHumanTaskUi.arn,
        taskAvailabilityLifetimeInSeconds: 1,
        taskCount: 1,
        taskDescription: "example",
        taskTitle: "example",
        workteamArn: exampleAwsSagemakerWorkteam.arn,
    },
    humanLoopRequestSource: {
        awsManagedHumanLoopRequestSource: "AWS/Textract/AnalyzeDocument/Forms/V1",
    },
    humanLoopActivationConfig: {
        humanLoopActivationConditionsConfig: {
            // Serialize the conditions object so the property receives valid JSON.
            humanLoopActivationConditions: JSON.stringify({
                Conditions: [{
                    ConditionType: "Sampling",
                    ConditionParameters: {
                        RandomSamplingPercentage: 5,
                    },
                }],
            }),
        },
    },
    outputConfig: {
        s3OutputPath: `s3://${exampleAwsS3Bucket.bucket}/`,
    },
});
import pulumi
import pulumi_aws as aws

example = aws.sagemaker.FlowDefinition("example",
    flow_definition_name="example",
    role_arn=example_aws_iam_role["arn"],
    human_loop_config={
        "human_task_ui_arn": example_aws_sagemaker_human_task_ui["arn"],
        "task_availability_lifetime_in_seconds": 1,
        "task_count": 1,
        "task_description": "example",
        "task_title": "example",
        "workteam_arn": example_aws_sagemaker_workteam["arn"],
    },
    human_loop_request_source={
        "aws_managed_human_loop_request_source": "AWS/Textract/AnalyzeDocument/Forms/V1",
    },
    human_loop_activation_config={
        "human_loop_activation_conditions_config": {
            "human_loop_activation_conditions": """{
  "Conditions": [
    {
      "ConditionType": "Sampling",
      "ConditionParameters": {
        "RandomSamplingPercentage": 5
      }
    }
  ]
}
""",
        },
    },
    output_config={
        "s3_output_path": f"s3://{example_aws_s3_bucket['bucket']}/",
    })
package main

import (
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/sagemaker"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := sagemaker.NewFlowDefinition(ctx, "example", &sagemaker.FlowDefinitionArgs{
			FlowDefinitionName: pulumi.String("example"),
			RoleArn:            pulumi.Any(exampleAwsIamRole.Arn),
			HumanLoopConfig: &sagemaker.FlowDefinitionHumanLoopConfigArgs{
				HumanTaskUiArn:                    pulumi.Any(exampleAwsSagemakerHumanTaskUi.Arn),
				TaskAvailabilityLifetimeInSeconds: pulumi.Int(1),
				TaskCount:                         pulumi.Int(1),
				TaskDescription:                   pulumi.String("example"),
				TaskTitle:                         pulumi.String("example"),
				WorkteamArn:                       pulumi.Any(exampleAwsSagemakerWorkteam.Arn),
			},
			HumanLoopRequestSource: &sagemaker.FlowDefinitionHumanLoopRequestSourceArgs{
				AwsManagedHumanLoopRequestSource: pulumi.String("AWS/Textract/AnalyzeDocument/Forms/V1"),
			},
			HumanLoopActivationConfig: &sagemaker.FlowDefinitionHumanLoopActivationConfigArgs{
				HumanLoopActivationConditionsConfig: &sagemaker.FlowDefinitionHumanLoopActivationConfigHumanLoopActivationConditionsConfigArgs{
					HumanLoopActivationConditions: pulumi.String(`{
  "Conditions": [
    {
      "ConditionType": "Sampling",
      "ConditionParameters": {
        "RandomSamplingPercentage": 5
      }
    }
  ]
}
`),
				},
			},
			OutputConfig: &sagemaker.FlowDefinitionOutputConfigArgs{
				S3OutputPath: pulumi.Sprintf("s3://%v/", exampleAwsS3Bucket.Bucket),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Aws = Pulumi.Aws;

return await Deployment.RunAsync(() => 
{
    var example = new Aws.Sagemaker.FlowDefinition("example", new()
    {
        FlowDefinitionName = "example",
        RoleArn = exampleAwsIamRole.Arn,
        HumanLoopConfig = new Aws.Sagemaker.Inputs.FlowDefinitionHumanLoopConfigArgs
        {
            HumanTaskUiArn = exampleAwsSagemakerHumanTaskUi.Arn,
            TaskAvailabilityLifetimeInSeconds = 1,
            TaskCount = 1,
            TaskDescription = "example",
            TaskTitle = "example",
            WorkteamArn = exampleAwsSagemakerWorkteam.Arn,
        },
        HumanLoopRequestSource = new Aws.Sagemaker.Inputs.FlowDefinitionHumanLoopRequestSourceArgs
        {
            AwsManagedHumanLoopRequestSource = "AWS/Textract/AnalyzeDocument/Forms/V1",
        },
        HumanLoopActivationConfig = new Aws.Sagemaker.Inputs.FlowDefinitionHumanLoopActivationConfigArgs
        {
            HumanLoopActivationConditionsConfig = new Aws.Sagemaker.Inputs.FlowDefinitionHumanLoopActivationConfigHumanLoopActivationConditionsConfigArgs
            {
                HumanLoopActivationConditions = @"{
  ""Conditions"": [
    {
      ""ConditionType"": ""Sampling"",
      ""ConditionParameters"": {
        ""RandomSamplingPercentage"": 5
      }
    }
  ]
}
",
            },
        },
        OutputConfig = new Aws.Sagemaker.Inputs.FlowDefinitionOutputConfigArgs
        {
            S3OutputPath = $"s3://{exampleAwsS3Bucket.Bucket}/",
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.aws.sagemaker.FlowDefinition;
import com.pulumi.aws.sagemaker.FlowDefinitionArgs;
import com.pulumi.aws.sagemaker.inputs.FlowDefinitionHumanLoopConfigArgs;
import com.pulumi.aws.sagemaker.inputs.FlowDefinitionHumanLoopRequestSourceArgs;
import com.pulumi.aws.sagemaker.inputs.FlowDefinitionHumanLoopActivationConfigArgs;
import com.pulumi.aws.sagemaker.inputs.FlowDefinitionHumanLoopActivationConfigHumanLoopActivationConditionsConfigArgs;
import com.pulumi.aws.sagemaker.inputs.FlowDefinitionOutputConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var example = new FlowDefinition("example", FlowDefinitionArgs.builder()
            .flowDefinitionName("example")
            .roleArn(exampleAwsIamRole.arn())
            .humanLoopConfig(FlowDefinitionHumanLoopConfigArgs.builder()
                .humanTaskUiArn(exampleAwsSagemakerHumanTaskUi.arn())
                .taskAvailabilityLifetimeInSeconds(1)
                .taskCount(1)
                .taskDescription("example")
                .taskTitle("example")
                .workteamArn(exampleAwsSagemakerWorkteam.arn())
                .build())
            .humanLoopRequestSource(FlowDefinitionHumanLoopRequestSourceArgs.builder()
                .awsManagedHumanLoopRequestSource("AWS/Textract/AnalyzeDocument/Forms/V1")
                .build())
            .humanLoopActivationConfig(FlowDefinitionHumanLoopActivationConfigArgs.builder()
                .humanLoopActivationConditionsConfig(FlowDefinitionHumanLoopActivationConfigHumanLoopActivationConditionsConfigArgs.builder()
                    .humanLoopActivationConditions("""
                        {
                          "Conditions": [
                            {
                              "ConditionType": "Sampling",
                              "ConditionParameters": {
                                "RandomSamplingPercentage": 5
                              }
                            }
                          ]
                        }
                        """)
                    .build())
                .build())
            .outputConfig(FlowDefinitionOutputConfigArgs.builder()
                .s3OutputPath(String.format("s3://%s/", exampleAwsS3Bucket.bucket()))
                .build())
            .build());

    }
}
resources:
  example:
    type: aws:sagemaker:FlowDefinition
    properties:
      flowDefinitionName: example
      roleArn: ${exampleAwsIamRole.arn}
      humanLoopConfig:
        humanTaskUiArn: ${exampleAwsSagemakerHumanTaskUi.arn}
        taskAvailabilityLifetimeInSeconds: 1
        taskCount: 1
        taskDescription: example
        taskTitle: example
        workteamArn: ${exampleAwsSagemakerWorkteam.arn}
      humanLoopRequestSource:
        awsManagedHumanLoopRequestSource: AWS/Textract/AnalyzeDocument/Forms/V1
      humanLoopActivationConfig:
        humanLoopActivationConditionsConfig:
          humanLoopActivationConditions: |
            {
              "Conditions": [
                {
                  "ConditionType": "Sampling",
                  "ConditionParameters": {
                    "RandomSamplingPercentage": 5
                  }
                }
              ]
            }
      outputConfig:
        s3OutputPath: s3://${exampleAwsS3Bucket.bucket}/

The humanLoopRequestSource property connects the flow to an AWS AI service (here, Textract document analysis). The humanLoopActivationConfig property defines when to trigger reviews: this example uses random sampling to send 5% of predictions to human reviewers. The humanLoopActivationConditions property contains a JSON configuration that specifies sampling percentage and condition type.
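Hand-writing the conditions JSON inside a string literal is easy to get wrong, so one option is to build it from a plain data structure and serialize it. The sketch below uses only the Python standard library; sampling_conditions is a hypothetical helper name, and the accepted percentage range is an assumption.

```python
import json


def sampling_conditions(percentage: float) -> str:
    """Return the activation-conditions JSON for a random sampling rule."""
    # Assumed range: greater than 0 and at most 100 percent.
    if not 0 < percentage <= 100:
        raise ValueError("percentage should be greater than 0 and at most 100")
    conditions = {
        "Conditions": [
            {
                "ConditionType": "Sampling",
                "ConditionParameters": {
                    "RandomSamplingPercentage": percentage,
                },
            }
        ]
    }
    # json.dumps guarantees well-formed JSON with correctly escaped quotes.
    return json.dumps(conditions)
```

The returned string can be passed as the human_loop_activation_conditions value, for example sampling_conditions(5) to reproduce the 5% sampling rule shown above.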

Beyond these examples

These snippets focus on specific flow definition features: private and public workteam configuration, automatic triggering from AWS AI services, task pricing, and sampling conditions. They’re intentionally minimal rather than full human review systems.

The examples rely on pre-existing infrastructure such as SageMaker workteams and human task UIs, S3 buckets for output storage, and IAM roles with SageMaker and S3 permissions. They focus on configuring the flow definition rather than provisioning the surrounding infrastructure.

To keep things focused, common flow definition patterns are omitted, including:

  • Task availability timeouts and expiration handling
  • Custom task templates and UI customization
  • Advanced activation conditions (confidence thresholds, multiple conditions)
  • KMS encryption for output data

These omissions are intentional: the goal is to illustrate how each flow definition feature is wired, not to provide drop-in human review systems. See the SageMaker Flow Definition resource reference for all available configuration options.

Frequently Asked Questions

Configuration & Immutability
What properties can't be changed after creating a flow definition?
The following properties are immutable and require resource replacement if changed: flowDefinitionName, humanLoopConfig, outputConfig, roleArn, humanLoopActivationConfig, and humanLoopRequestSource.
What's required to create a flow definition?
You must provide flowDefinitionName, roleArn, humanLoopConfig (with task UI, workteam, and task details), and outputConfig (with S3 output path).
Where are human review results stored?
Results are uploaded to the S3 path specified in outputConfig.s3OutputPath, which must be configured at creation time.
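As a rough illustration of the immutability rule above, the following sketch compares two property dictionaries and reports which changes would force a replacement. It is a plain-Python illustration, not how Pulumi computes its diff; replacement_triggers is a hypothetical helper, and the tags key in the usage note stands in for any property outside the immutable list.

```python
from typing import List

# Immutable properties as listed in the answer above.
IMMUTABLE_PROPERTIES = (
    "flowDefinitionName",
    "humanLoopConfig",
    "outputConfig",
    "roleArn",
    "humanLoopActivationConfig",
    "humanLoopRequestSource",
)


def replacement_triggers(old: dict, new: dict) -> List[str]:
    """Return the immutable properties whose values differ between old and new."""
    return [key for key in IMMUTABLE_PROPERTIES if old.get(key) != new.get(key)]
```

For example, changing roleArn together with tags would report only roleArn, since tags is not in the immutable list.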
Workteam Configuration
How do I use a public workteam instead of a private one?
Set workteamArn to the public workteam ARN format (arn:aws:sagemaker:{region}:394669845002:workteam/public-crowd/default) and configure publicWorkforceTaskPrice with amountInUsd containing cents and tenthFractionsOfACent.
What IAM permissions does the role need?
The role specified in roleArn must have permissions to call other AWS services on your behalf, including access to S3 for output storage and any integrated services like Textract or Rekognition.
Human Loop Activation
How do I integrate with AWS Textract or Rekognition?
Configure humanLoopRequestSource with awsManagedHumanLoopRequestSource set to the appropriate service path (e.g., AWS/Textract/AnalyzeDocument/Forms/V1).
How do I configure conditional human loop activation?
Use humanLoopActivationConfig with humanLoopActivationConditionsConfig, and set its humanLoopActivationConditions property to a JSON string that defines conditions such as a sampling percentage or confidence thresholds.
