Create GCP Dataplex Tasks

The gcp:dataplex/task:Task resource, part of the Pulumi GCP provider, defines scheduled data processing jobs in Dataplex: their code, execution environment, and trigger configuration. This guide focuses on three capabilities: recurring schedule configuration, Spark job infrastructure, and notebook execution.

Tasks run within Dataplex lakes and reference GCS artifacts, VPC networks, and service accounts that must exist separately. The examples are intentionally small. Combine them with your own lake infrastructure, networking, and IAM configuration.
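For example, instead of the default Compute Engine service account used in the examples below, you might provision a dedicated identity for your tasks. The TypeScript sketch below is minimal and hedged; the account ID and the roles/dataplex.developer binding are illustrative assumptions to adapt to your own IAM model.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// Hypothetical dedicated service account for running Dataplex tasks
const taskSa = new gcp.serviceaccount.Account("dataplex-task-sa", {
    accountId: "dataplex-task-runner",
    displayName: "Dataplex task runner",
});

// Grant a Dataplex role at the project level; adjust the role to your environment
const taskSaBinding = new gcp.projects.IAMMember("dataplex-task-sa-developer", {
    project: "my-project-name",
    role: "roles/dataplex.developer",
    member: pulumi.interpolate`serviceAccount:${taskSa.email}`,
});

You would then pass taskSa.email as executionSpec.serviceAccount instead of the project-number address used below.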

Run Python scripts on a recurring schedule

Data pipelines often execute Python scripts at regular intervals for ETL, quality checks, or transformations.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const project = gcp.organizations.getProject({});
const example = new gcp.dataplex.Lake("example", {
    name: "tf-test-lake_56529",
    location: "us-central1",
    project: "my-project-name",
});
const exampleTask = new gcp.dataplex.Task("example", {
    taskId: "tf-test-task_75413",
    location: "us-central1",
    lake: example.name,
    description: "Test Task Basic",
    displayName: "task-basic",
    labels: {
        count: "3",
    },
    triggerSpec: {
        type: "RECURRING",
        disabled: false,
        maxRetries: 3,
        startTime: "2023-10-02T15:01:23Z",
        schedule: "1 * * * *",
    },
    executionSpec: {
        serviceAccount: project.then(project => `${project.number}-compute@developer.gserviceaccount.com`),
        project: "my-project-name",
        maxJobExecutionLifetime: "100s",
        kmsKey: "234jn2kjn42k3n423",
    },
    spark: {
        pythonScriptFile: "gs://dataproc-examples/pyspark/hello-world/hello-world.py",
    },
    project: "my-project-name",
});
import pulumi
import pulumi_gcp as gcp

project = gcp.organizations.get_project()
example = gcp.dataplex.Lake("example",
    name="tf-test-lake_56529",
    location="us-central1",
    project="my-project-name")
example_task = gcp.dataplex.Task("example",
    task_id="tf-test-task_75413",
    location="us-central1",
    lake=example.name,
    description="Test Task Basic",
    display_name="task-basic",
    labels={
        "count": "3",
    },
    trigger_spec={
        "type": "RECURRING",
        "disabled": False,
        "max_retries": 3,
        "start_time": "2023-10-02T15:01:23Z",
        "schedule": "1 * * * *",
    },
    execution_spec={
        "service_account": f"{project.number}-compute@developer.gserviceaccount.com",
        "project": "my-project-name",
        "max_job_execution_lifetime": "100s",
        "kms_key": "234jn2kjn42k3n423",
    },
    spark={
        "python_script_file": "gs://dataproc-examples/pyspark/hello-world/hello-world.py",
    },
    project="my-project-name")
package main

import (
	"fmt"

	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/dataplex"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		example, err := dataplex.NewLake(ctx, "example", &dataplex.LakeArgs{
			Name:     pulumi.String("tf-test-lake_56529"),
			Location: pulumi.String("us-central1"),
			Project:  pulumi.String("my-project-name"),
		})
		if err != nil {
			return err
		}
		_, err = dataplex.NewTask(ctx, "example", &dataplex.TaskArgs{
			TaskId:      pulumi.String("tf-test-task_75413"),
			Location:    pulumi.String("us-central1"),
			Lake:        example.Name,
			Description: pulumi.String("Test Task Basic"),
			DisplayName: pulumi.String("task-basic"),
			Labels: pulumi.StringMap{
				"count": pulumi.String("3"),
			},
			TriggerSpec: &dataplex.TaskTriggerSpecArgs{
				Type:       pulumi.String("RECURRING"),
				Disabled:   pulumi.Bool(false),
				MaxRetries: pulumi.Int(3),
				StartTime:  pulumi.String("2023-10-02T15:01:23Z"),
				Schedule:   pulumi.String("1 * * * *"),
			},
			ExecutionSpec: &dataplex.TaskExecutionSpecArgs{
				ServiceAccount:          pulumi.Sprintf("%v-compute@developer.gserviceaccount.com", project.Number),
				Project:                 pulumi.String("my-project-name"),
				MaxJobExecutionLifetime: pulumi.String("100s"),
				KmsKey:                  pulumi.String("234jn2kjn42k3n423"),
			},
			Spark: &dataplex.TaskSparkArgs{
				PythonScriptFile: pulumi.String("gs://dataproc-examples/pyspark/hello-world/hello-world.py"),
			},
			Project: pulumi.String("my-project-name"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var project = Gcp.Organizations.GetProject.Invoke();

    var example = new Gcp.DataPlex.Lake("example", new()
    {
        Name = "tf-test-lake_56529",
        Location = "us-central1",
        Project = "my-project-name",
    });

    var exampleTask = new Gcp.DataPlex.Task("example", new()
    {
        TaskId = "tf-test-task_75413",
        Location = "us-central1",
        Lake = example.Name,
        Description = "Test Task Basic",
        DisplayName = "task-basic",
        Labels = 
        {
            { "count", "3" },
        },
        TriggerSpec = new Gcp.DataPlex.Inputs.TaskTriggerSpecArgs
        {
            Type = "RECURRING",
            Disabled = false,
            MaxRetries = 3,
            StartTime = "2023-10-02T15:01:23Z",
            Schedule = "1 * * * *",
        },
        ExecutionSpec = new Gcp.DataPlex.Inputs.TaskExecutionSpecArgs
        {
            ServiceAccount = $"{project.Apply(getProjectResult => getProjectResult.Number)}-compute@developer.gserviceaccount.com",
            Project = "my-project-name",
            MaxJobExecutionLifetime = "100s",
            KmsKey = "234jn2kjn42k3n423",
        },
        Spark = new Gcp.DataPlex.Inputs.TaskSparkArgs
        {
            PythonScriptFile = "gs://dataproc-examples/pyspark/hello-world/hello-world.py",
        },
        Project = "my-project-name",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.dataplex.Lake;
import com.pulumi.gcp.dataplex.LakeArgs;
import com.pulumi.gcp.dataplex.Task;
import com.pulumi.gcp.dataplex.TaskArgs;
import com.pulumi.gcp.dataplex.inputs.TaskTriggerSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskExecutionSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskSparkArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

        var example = new Lake("example", LakeArgs.builder()
            .name("tf-test-lake_56529")
            .location("us-central1")
            .project("my-project-name")
            .build());

        var exampleTask = new Task("exampleTask", TaskArgs.builder()
            .taskId("tf-test-task_75413")
            .location("us-central1")
            .lake(example.name())
            .description("Test Task Basic")
            .displayName("task-basic")
            .labels(Map.of("count", "3"))
            .triggerSpec(TaskTriggerSpecArgs.builder()
                .type("RECURRING")
                .disabled(false)
                .maxRetries(3)
                .startTime("2023-10-02T15:01:23Z")
                .schedule("1 * * * *")
                .build())
            .executionSpec(TaskExecutionSpecArgs.builder()
                .serviceAccount(project.applyValue(getProjectResult -> String.format("%s-compute@developer.gserviceaccount.com", getProjectResult.number())))
                .project("my-project-name")
                .maxJobExecutionLifetime("100s")
                .kmsKey("234jn2kjn42k3n423")
                .build())
            .spark(TaskSparkArgs.builder()
                .pythonScriptFile("gs://dataproc-examples/pyspark/hello-world/hello-world.py")
                .build())
            .project("my-project-name")
            .build());

    }
}
resources:
  example:
    type: gcp:dataplex:Lake
    properties:
      name: tf-test-lake_56529
      location: us-central1
      project: my-project-name
  exampleTask:
    type: gcp:dataplex:Task
    name: example
    properties:
      taskId: tf-test-task_75413
      location: us-central1
      lake: ${example.name}
      description: Test Task Basic
      displayName: task-basic
      labels:
        count: '3'
      triggerSpec:
        type: RECURRING
        disabled: false
        maxRetries: 3
        startTime: 2023-10-02T15:01:23Z
        schedule: 1 * * * *
      executionSpec:
        serviceAccount: ${project.number}-compute@developer.gserviceaccount.com
        project: my-project-name
        maxJobExecutionLifetime: 100s
        kmsKey: 234jn2kjn42k3n423
      spark:
        pythonScriptFile: gs://dataproc-examples/pyspark/hello-world/hello-world.py
      project: my-project-name
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}

The triggerSpec defines when the task runs. Setting type to “RECURRING” enables cron-style scheduling via the schedule property (“1 * * * *” means hourly at minute 1). The maxRetries property controls automatic retry attempts on failure. The executionSpec specifies the service account that runs the job, while the spark block points to your Python script in GCS.
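As a hedged variant of the example above, you can change only the triggerSpec to move from hourly to daily runs. The TypeScript sketch below assumes the example lake and project lookup from the first program are in scope; the task ID and cron expression are illustrative, not part of the original example.

const dailyTask = new gcp.dataplex.Task("example-daily", {
    taskId: "tf-test-task-daily",   // hypothetical task ID
    location: "us-central1",
    lake: example.name,
    triggerSpec: {
        type: "RECURRING",
        schedule: "30 2 * * *",     // daily at 02:30 UTC
        maxRetries: 3,
    },
    executionSpec: {
        serviceAccount: project.then(p => `${p.number}-compute@developer.gserviceaccount.com`),
    },
    spark: {
        pythonScriptFile: "gs://dataproc-examples/pyspark/hello-world/hello-world.py",
    },
    project: "my-project-name",
});

For manual runs, set triggerSpec to { type: "ON_DEMAND" } and omit the schedule, as in the next example.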

Configure Spark infrastructure and dependencies

Production Spark jobs need control over executor counts, container images, and network placement.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// VPC network
const _default = new gcp.compute.Network("default", {
    name: "tf-test-workstation-cluster_55138",
    autoCreateSubnetworks: true,
});
const project = gcp.organizations.getProject({});
const exampleSpark = new gcp.dataplex.Lake("example_spark", {
    name: "tf-test-lake_37559",
    location: "us-central1",
    project: "my-project-name",
});
const exampleSparkTask = new gcp.dataplex.Task("example_spark", {
    taskId: "tf-test-task_91980",
    location: "us-central1",
    lake: exampleSpark.name,
    triggerSpec: {
        type: "ON_DEMAND",
    },
    description: "task-spark-terraform",
    executionSpec: {
        serviceAccount: project.then(project => `${project.number}-compute@developer.gserviceaccount.com`),
        args: {
            TASK_ARGS: "--output_location,gs://spark-job/task-result, --output_format, json",
        },
    },
    spark: {
        infrastructureSpec: {
            batch: {
                executorsCount: 2,
                maxExecutorsCount: 100,
            },
            containerImage: {
                image: "test-image",
                javaJars: ["test-java-jars.jar"],
                pythonPackages: ["gs://bucket-name/my/path/to/lib.tar.gz"],
                properties: {
                    name: "wrench",
                    mass: "1.3kg",
                    count: "3",
                },
            },
            vpcNetwork: {
                networkTags: ["test-network-tag"],
                subNetwork: _default.id,
            },
        },
        fileUris: ["gs://terrafrom-test/test.csv"],
        archiveUris: ["gs://terraform-test/test.csv"],
        sqlScript: "show databases",
    },
    project: "my-project-name",
});
import pulumi
import pulumi_gcp as gcp

# VPC network
default = gcp.compute.Network("default",
    name="tf-test-workstation-cluster_55138",
    auto_create_subnetworks=True)
project = gcp.organizations.get_project()
example_spark = gcp.dataplex.Lake("example_spark",
    name="tf-test-lake_37559",
    location="us-central1",
    project="my-project-name")
example_spark_task = gcp.dataplex.Task("example_spark",
    task_id="tf-test-task_91980",
    location="us-central1",
    lake=example_spark.name,
    trigger_spec={
        "type": "ON_DEMAND",
    },
    description="task-spark-terraform",
    execution_spec={
        "service_account": f"{project.number}-compute@developer.gserviceaccount.com",
        "args": {
            "TASK_ARGS": "--output_location,gs://spark-job/task-result, --output_format, json",
        },
    },
    spark={
        "infrastructure_spec": {
            "batch": {
                "executors_count": 2,
                "max_executors_count": 100,
            },
            "container_image": {
                "image": "test-image",
                "java_jars": ["test-java-jars.jar"],
                "python_packages": ["gs://bucket-name/my/path/to/lib.tar.gz"],
                "properties": {
                    "name": "wrench",
                    "mass": "1.3kg",
                    "count": "3",
                },
            },
            "vpc_network": {
                "network_tags": ["test-network-tag"],
                "sub_network": default.id,
            },
        },
        "file_uris": ["gs://terrafrom-test/test.csv"],
        "archive_uris": ["gs://terraform-test/test.csv"],
        "sql_script": "show databases",
    },
    project="my-project-name")
package main

import (
	"fmt"

	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/compute"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/dataplex"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// VPC network
		_default, err := compute.NewNetwork(ctx, "default", &compute.NetworkArgs{
			Name:                  pulumi.String("tf-test-workstation-cluster_55138"),
			AutoCreateSubnetworks: pulumi.Bool(true),
		})
		if err != nil {
			return err
		}
		project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		exampleSpark, err := dataplex.NewLake(ctx, "example_spark", &dataplex.LakeArgs{
			Name:     pulumi.String("tf-test-lake_37559"),
			Location: pulumi.String("us-central1"),
			Project:  pulumi.String("my-project-name"),
		})
		if err != nil {
			return err
		}
		_, err = dataplex.NewTask(ctx, "example_spark", &dataplex.TaskArgs{
			TaskId:   pulumi.String("tf-test-task_91980"),
			Location: pulumi.String("us-central1"),
			Lake:     exampleSpark.Name,
			TriggerSpec: &dataplex.TaskTriggerSpecArgs{
				Type: pulumi.String("ON_DEMAND"),
			},
			Description: pulumi.String("task-spark-terraform"),
			ExecutionSpec: &dataplex.TaskExecutionSpecArgs{
				ServiceAccount: pulumi.Sprintf("%v-compute@developer.gserviceaccount.com", project.Number),
				Args: pulumi.StringMap{
					"TASK_ARGS": pulumi.String("--output_location,gs://spark-job/task-result, --output_format, json"),
				},
			},
			Spark: &dataplex.TaskSparkArgs{
				InfrastructureSpec: &dataplex.TaskSparkInfrastructureSpecArgs{
					Batch: &dataplex.TaskSparkInfrastructureSpecBatchArgs{
						ExecutorsCount:    pulumi.Int(2),
						MaxExecutorsCount: pulumi.Int(100),
					},
					ContainerImage: &dataplex.TaskSparkInfrastructureSpecContainerImageArgs{
						Image: pulumi.String("test-image"),
						JavaJars: pulumi.StringArray{
							pulumi.String("test-java-jars.jar"),
						},
						PythonPackages: pulumi.StringArray{
							pulumi.String("gs://bucket-name/my/path/to/lib.tar.gz"),
						},
						Properties: pulumi.StringMap{
							"name":  pulumi.String("wrench"),
							"mass":  pulumi.String("1.3kg"),
							"count": pulumi.String("3"),
						},
					},
					VpcNetwork: &dataplex.TaskSparkInfrastructureSpecVpcNetworkArgs{
						NetworkTags: pulumi.StringArray{
							pulumi.String("test-network-tag"),
						},
						SubNetwork: _default.ID(),
					},
				},
				FileUris: pulumi.StringArray{
					pulumi.String("gs://terrafrom-test/test.csv"),
				},
				ArchiveUris: pulumi.StringArray{
					pulumi.String("gs://terraform-test/test.csv"),
				},
				SqlScript: pulumi.String("show databases"),
			},
			Project: pulumi.String("my-project-name"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    // VPC network
    var @default = new Gcp.Compute.Network("default", new()
    {
        Name = "tf-test-workstation-cluster_55138",
        AutoCreateSubnetworks = true,
    });

    var project = Gcp.Organizations.GetProject.Invoke();

    var exampleSpark = new Gcp.DataPlex.Lake("example_spark", new()
    {
        Name = "tf-test-lake_37559",
        Location = "us-central1",
        Project = "my-project-name",
    });

    var exampleSparkTask = new Gcp.DataPlex.Task("example_spark", new()
    {
        TaskId = "tf-test-task_91980",
        Location = "us-central1",
        Lake = exampleSpark.Name,
        TriggerSpec = new Gcp.DataPlex.Inputs.TaskTriggerSpecArgs
        {
            Type = "ON_DEMAND",
        },
        Description = "task-spark-terraform",
        ExecutionSpec = new Gcp.DataPlex.Inputs.TaskExecutionSpecArgs
        {
            ServiceAccount = $"{project.Apply(getProjectResult => getProjectResult.Number)}-compute@developer.gserviceaccount.com",
            Args = 
            {
                { "TASK_ARGS", "--output_location,gs://spark-job/task-result, --output_format, json" },
            },
        },
        Spark = new Gcp.DataPlex.Inputs.TaskSparkArgs
        {
            InfrastructureSpec = new Gcp.DataPlex.Inputs.TaskSparkInfrastructureSpecArgs
            {
                Batch = new Gcp.DataPlex.Inputs.TaskSparkInfrastructureSpecBatchArgs
                {
                    ExecutorsCount = 2,
                    MaxExecutorsCount = 100,
                },
                ContainerImage = new Gcp.DataPlex.Inputs.TaskSparkInfrastructureSpecContainerImageArgs
                {
                    Image = "test-image",
                    JavaJars = new[]
                    {
                        "test-java-jars.jar",
                    },
                    PythonPackages = new[]
                    {
                        "gs://bucket-name/my/path/to/lib.tar.gz",
                    },
                    Properties = 
                    {
                        { "name", "wrench" },
                        { "mass", "1.3kg" },
                        { "count", "3" },
                    },
                },
                VpcNetwork = new Gcp.DataPlex.Inputs.TaskSparkInfrastructureSpecVpcNetworkArgs
                {
                    NetworkTags = new[]
                    {
                        "test-network-tag",
                    },
                    SubNetwork = @default.Id,
                },
            },
            FileUris = new[]
            {
                "gs://terrafrom-test/test.csv",
            },
            ArchiveUris = new[]
            {
                "gs://terraform-test/test.csv",
            },
            SqlScript = "show databases",
        },
        Project = "my-project-name",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.compute.Network;
import com.pulumi.gcp.compute.NetworkArgs;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.dataplex.Lake;
import com.pulumi.gcp.dataplex.LakeArgs;
import com.pulumi.gcp.dataplex.Task;
import com.pulumi.gcp.dataplex.TaskArgs;
import com.pulumi.gcp.dataplex.inputs.TaskTriggerSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskExecutionSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskSparkArgs;
import com.pulumi.gcp.dataplex.inputs.TaskSparkInfrastructureSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskSparkInfrastructureSpecBatchArgs;
import com.pulumi.gcp.dataplex.inputs.TaskSparkInfrastructureSpecContainerImageArgs;
import com.pulumi.gcp.dataplex.inputs.TaskSparkInfrastructureSpecVpcNetworkArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        // VPC network
        var default_ = new Network("default", NetworkArgs.builder()
            .name("tf-test-workstation-cluster_55138")
            .autoCreateSubnetworks(true)
            .build());

        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

        var exampleSpark = new Lake("exampleSpark", LakeArgs.builder()
            .name("tf-test-lake_37559")
            .location("us-central1")
            .project("my-project-name")
            .build());

        var exampleSparkTask = new Task("exampleSparkTask", TaskArgs.builder()
            .taskId("tf-test-task_91980")
            .location("us-central1")
            .lake(exampleSpark.name())
            .triggerSpec(TaskTriggerSpecArgs.builder()
                .type("ON_DEMAND")
                .build())
            .description("task-spark-terraform")
            .executionSpec(TaskExecutionSpecArgs.builder()
                .serviceAccount(project.applyValue(getProjectResult -> String.format("%s-compute@developer.gserviceaccount.com", getProjectResult.number())))
                .args(Map.of("TASK_ARGS", "--output_location,gs://spark-job/task-result, --output_format, json"))
                .build())
            .spark(TaskSparkArgs.builder()
                .infrastructureSpec(TaskSparkInfrastructureSpecArgs.builder()
                    .batch(TaskSparkInfrastructureSpecBatchArgs.builder()
                        .executorsCount(2)
                        .maxExecutorsCount(100)
                        .build())
                    .containerImage(TaskSparkInfrastructureSpecContainerImageArgs.builder()
                        .image("test-image")
                        .javaJars("test-java-jars.jar")
                        .pythonPackages("gs://bucket-name/my/path/to/lib.tar.gz")
                        .properties(Map.ofEntries(
                            Map.entry("name", "wrench"),
                            Map.entry("mass", "1.3kg"),
                            Map.entry("count", "3")
                        ))
                        .build())
                    .vpcNetwork(TaskSparkInfrastructureSpecVpcNetworkArgs.builder()
                        .networkTags("test-network-tag")
                        .subNetwork(default_.id())
                        .build())
                    .build())
                .fileUris("gs://terrafrom-test/test.csv")
                .archiveUris("gs://terraform-test/test.csv")
                .sqlScript("show databases")
                .build())
            .project("my-project-name")
            .build());

    }
}
resources:
  # VPC network
  default:
    type: gcp:compute:Network
    properties:
      name: tf-test-workstation-cluster_55138
      autoCreateSubnetworks: true
  exampleSpark:
    type: gcp:dataplex:Lake
    name: example_spark
    properties:
      name: tf-test-lake_37559
      location: us-central1
      project: my-project-name
  exampleSparkTask:
    type: gcp:dataplex:Task
    name: example_spark
    properties:
      taskId: tf-test-task_91980
      location: us-central1
      lake: ${exampleSpark.name}
      triggerSpec:
        type: ON_DEMAND
      description: task-spark-terraform
      executionSpec:
        serviceAccount: ${project.number}-compute@developer.gserviceaccount.com
        args:
          TASK_ARGS: --output_location,gs://spark-job/task-result, --output_format, json
      spark:
        infrastructureSpec:
          batch:
            executorsCount: 2
            maxExecutorsCount: 100
          containerImage:
            image: test-image
            javaJars:
              - test-java-jars.jar
            pythonPackages:
              - gs://bucket-name/my/path/to/lib.tar.gz
            properties:
              name: wrench
              mass: 1.3kg
              count: '3'
          vpcNetwork:
            networkTags:
              - test-network-tag
            subNetwork: ${default.id}
        fileUris:
          - gs://terraform-test/test.csv
        archiveUris:
          - gs://terraform-test/test.csv
        sqlScript: show databases
      project: my-project-name
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}

The infrastructureSpec configures compute resources and runtime dependencies. The batch block sets executor counts (executorsCount for initial allocation, maxExecutorsCount for autoscaling limits). The containerImage block specifies a custom image with Java JARs and Python packages. The vpcNetwork block places executors in your VPC with network tags for firewall rules. The sqlScript property contains the SQL to execute.
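If you prefer an explicitly managed subnet over the auto-mode network shown above, a minimal TypeScript sketch (with hypothetical names and an assumed CIDR range) could look like this; the task's vpcNetwork.subNetwork would then reference the subnet ID.

import * as gcp from "@pulumi/gcp";

// Custom-mode VPC with an explicit subnet for Spark executors (names and range are assumptions)
const taskNetwork = new gcp.compute.Network("task-network", {
    name: "dataplex-task-network",
    autoCreateSubnetworks: false,
});

const taskSubnet = new gcp.compute.Subnetwork("task-subnet", {
    name: "dataplex-task-subnet",
    region: "us-central1",
    network: taskNetwork.id,
    ipCidrRange: "10.10.0.0/24",
    privateIpGoogleAccess: true, // lets executors reach Google APIs without external IPs
});

// In spark.infrastructureSpec, point the executors at the subnet:
//   vpcNetwork: {
//       networkTags: ["dataplex-task"],
//       subNetwork: taskSubnet.id,
//   },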

Execute Jupyter notebooks with scheduled runs

Data scientists often build workflows in Jupyter notebooks that later need to run on a production schedule.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// VPC network
const _default = new gcp.compute.Network("default", {
    name: "tf-test-workstation-cluster_37118",
    autoCreateSubnetworks: true,
});
const project = gcp.organizations.getProject({});
const exampleNotebook = new gcp.dataplex.Lake("example_notebook", {
    name: "tf-test-lake_80332",
    location: "us-central1",
    project: "my-project-name",
});
const exampleNotebookTask = new gcp.dataplex.Task("example_notebook", {
    taskId: "tf-test-task_13293",
    location: "us-central1",
    lake: exampleNotebook.name,
    triggerSpec: {
        type: "RECURRING",
        schedule: "1 * * * *",
    },
    executionSpec: {
        serviceAccount: project.then(project => `${project.number}-compute@developer.gserviceaccount.com`),
        args: {
            TASK_ARGS: "--output_location,gs://spark-job-jars-anrajitha/task-result, --output_format, json",
        },
    },
    notebook: {
        notebook: "gs://terraform-test/test-notebook.ipynb",
        infrastructureSpec: {
            batch: {
                executorsCount: 2,
                maxExecutorsCount: 100,
            },
            containerImage: {
                image: "test-image",
                javaJars: ["test-java-jars.jar"],
                pythonPackages: ["gs://bucket-name/my/path/to/lib.tar.gz"],
                properties: {
                    name: "wrench",
                    mass: "1.3kg",
                    count: "3",
                },
            },
            vpcNetwork: {
                networkTags: ["test-network-tag"],
                network: _default.id,
            },
        },
        fileUris: ["gs://terraform-test/test.csv"],
        archiveUris: ["gs://terraform-test/test.csv"],
    },
    project: "my-project-name",
});
import pulumi
import pulumi_gcp as gcp

# VPC network
default = gcp.compute.Network("default",
    name="tf-test-workstation-cluster_37118",
    auto_create_subnetworks=True)
project = gcp.organizations.get_project()
example_notebook = gcp.dataplex.Lake("example_notebook",
    name="tf-test-lake_80332",
    location="us-central1",
    project="my-project-name")
example_notebook_task = gcp.dataplex.Task("example_notebook",
    task_id="tf-test-task_13293",
    location="us-central1",
    lake=example_notebook.name,
    trigger_spec={
        "type": "RECURRING",
        "schedule": "1 * * * *",
    },
    execution_spec={
        "service_account": f"{project.number}-compute@developer.gserviceaccount.com",
        "args": {
            "TASK_ARGS": "--output_location,gs://spark-job-jars-anrajitha/task-result, --output_format, json",
        },
    },
    notebook={
        "notebook": "gs://terraform-test/test-notebook.ipynb",
        "infrastructure_spec": {
            "batch": {
                "executors_count": 2,
                "max_executors_count": 100,
            },
            "container_image": {
                "image": "test-image",
                "java_jars": ["test-java-jars.jar"],
                "python_packages": ["gs://bucket-name/my/path/to/lib.tar.gz"],
                "properties": {
                    "name": "wrench",
                    "mass": "1.3kg",
                    "count": "3",
                },
            },
            "vpc_network": {
                "network_tags": ["test-network-tag"],
                "network": default.id,
            },
        },
        "file_uris": ["gs://terraform-test/test.csv"],
        "archive_uris": ["gs://terraform-test/test.csv"],
    },
    project="my-project-name")
package main

import (
	"fmt"

	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/compute"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/dataplex"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// VPC network
		_default, err := compute.NewNetwork(ctx, "default", &compute.NetworkArgs{
			Name:                  pulumi.String("tf-test-workstation-cluster_37118"),
			AutoCreateSubnetworks: pulumi.Bool(true),
		})
		if err != nil {
			return err
		}
		project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		exampleNotebook, err := dataplex.NewLake(ctx, "example_notebook", &dataplex.LakeArgs{
			Name:     pulumi.String("tf-test-lake_80332"),
			Location: pulumi.String("us-central1"),
			Project:  pulumi.String("my-project-name"),
		})
		if err != nil {
			return err
		}
		_, err = dataplex.NewTask(ctx, "example_notebook", &dataplex.TaskArgs{
			TaskId:   pulumi.String("tf-test-task_13293"),
			Location: pulumi.String("us-central1"),
			Lake:     exampleNotebook.Name,
			TriggerSpec: &dataplex.TaskTriggerSpecArgs{
				Type:     pulumi.String("RECURRING"),
				Schedule: pulumi.String("1 * * * *"),
			},
			ExecutionSpec: &dataplex.TaskExecutionSpecArgs{
				ServiceAccount: pulumi.Sprintf("%v-compute@developer.gserviceaccount.com", project.Number),
				Args: pulumi.StringMap{
					"TASK_ARGS": pulumi.String("--output_location,gs://spark-job-jars-anrajitha/task-result, --output_format, json"),
				},
			},
			Notebook: &dataplex.TaskNotebookArgs{
				Notebook: pulumi.String("gs://terraform-test/test-notebook.ipynb"),
				InfrastructureSpec: &dataplex.TaskNotebookInfrastructureSpecArgs{
					Batch: &dataplex.TaskNotebookInfrastructureSpecBatchArgs{
						ExecutorsCount:    pulumi.Int(2),
						MaxExecutorsCount: pulumi.Int(100),
					},
					ContainerImage: &dataplex.TaskNotebookInfrastructureSpecContainerImageArgs{
						Image: pulumi.String("test-image"),
						JavaJars: pulumi.StringArray{
							pulumi.String("test-java-jars.jar"),
						},
						PythonPackages: pulumi.StringArray{
							pulumi.String("gs://bucket-name/my/path/to/lib.tar.gz"),
						},
						Properties: pulumi.StringMap{
							"name":  pulumi.String("wrench"),
							"mass":  pulumi.String("1.3kg"),
							"count": pulumi.String("3"),
						},
					},
					VpcNetwork: &dataplex.TaskNotebookInfrastructureSpecVpcNetworkArgs{
						NetworkTags: pulumi.StringArray{
							pulumi.String("test-network-tag"),
						},
						Network: _default.ID(),
					},
				},
				FileUris: pulumi.StringArray{
					pulumi.String("gs://terraform-test/test.csv"),
				},
				ArchiveUris: pulumi.StringArray{
					pulumi.String("gs://terraform-test/test.csv"),
				},
			},
			Project: pulumi.String("my-project-name"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    // VPC network
    var @default = new Gcp.Compute.Network("default", new()
    {
        Name = "tf-test-workstation-cluster_37118",
        AutoCreateSubnetworks = true,
    });

    var project = Gcp.Organizations.GetProject.Invoke();

    var exampleNotebook = new Gcp.DataPlex.Lake("example_notebook", new()
    {
        Name = "tf-test-lake_80332",
        Location = "us-central1",
        Project = "my-project-name",
    });

    var exampleNotebookTask = new Gcp.DataPlex.Task("example_notebook", new()
    {
        TaskId = "tf-test-task_13293",
        Location = "us-central1",
        Lake = exampleNotebook.Name,
        TriggerSpec = new Gcp.DataPlex.Inputs.TaskTriggerSpecArgs
        {
            Type = "RECURRING",
            Schedule = "1 * * * *",
        },
        ExecutionSpec = new Gcp.DataPlex.Inputs.TaskExecutionSpecArgs
        {
            ServiceAccount = $"{project.Apply(getProjectResult => getProjectResult.Number)}-compute@developer.gserviceaccount.com",
            Args = 
            {
                { "TASK_ARGS", "--output_location,gs://spark-job-jars-anrajitha/task-result, --output_format, json" },
            },
        },
        Notebook = new Gcp.DataPlex.Inputs.TaskNotebookArgs
        {
            Notebook = "gs://terraform-test/test-notebook.ipynb",
            InfrastructureSpec = new Gcp.DataPlex.Inputs.TaskNotebookInfrastructureSpecArgs
            {
                Batch = new Gcp.DataPlex.Inputs.TaskNotebookInfrastructureSpecBatchArgs
                {
                    ExecutorsCount = 2,
                    MaxExecutorsCount = 100,
                },
                ContainerImage = new Gcp.DataPlex.Inputs.TaskNotebookInfrastructureSpecContainerImageArgs
                {
                    Image = "test-image",
                    JavaJars = new[]
                    {
                        "test-java-jars.jar",
                    },
                    PythonPackages = new[]
                    {
                        "gs://bucket-name/my/path/to/lib.tar.gz",
                    },
                    Properties = 
                    {
                        { "name", "wrench" },
                        { "mass", "1.3kg" },
                        { "count", "3" },
                    },
                },
                VpcNetwork = new Gcp.DataPlex.Inputs.TaskNotebookInfrastructureSpecVpcNetworkArgs
                {
                    NetworkTags = new[]
                    {
                        "test-network-tag",
                    },
                    Network = @default.Id,
                },
            },
            FileUris = new[]
            {
                "gs://terraform-test/test.csv",
            },
            ArchiveUris = new[]
            {
                "gs://terraform-test/test.csv",
            },
        },
        Project = "my-project-name",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.compute.Network;
import com.pulumi.gcp.compute.NetworkArgs;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.dataplex.Lake;
import com.pulumi.gcp.dataplex.LakeArgs;
import com.pulumi.gcp.dataplex.Task;
import com.pulumi.gcp.dataplex.TaskArgs;
import com.pulumi.gcp.dataplex.inputs.TaskTriggerSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskExecutionSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskNotebookArgs;
import com.pulumi.gcp.dataplex.inputs.TaskNotebookInfrastructureSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskNotebookInfrastructureSpecBatchArgs;
import com.pulumi.gcp.dataplex.inputs.TaskNotebookInfrastructureSpecContainerImageArgs;
import com.pulumi.gcp.dataplex.inputs.TaskNotebookInfrastructureSpecVpcNetworkArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        // VPC network
        var default_ = new Network("default", NetworkArgs.builder()
            .name("tf-test-workstation-cluster_37118")
            .autoCreateSubnetworks(true)
            .build());

        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

        var exampleNotebook = new Lake("exampleNotebook", LakeArgs.builder()
            .name("tf-test-lake_80332")
            .location("us-central1")
            .project("my-project-name")
            .build());

        var exampleNotebookTask = new Task("exampleNotebookTask", TaskArgs.builder()
            .taskId("tf-test-task_13293")
            .location("us-central1")
            .lake(exampleNotebook.name())
            .triggerSpec(TaskTriggerSpecArgs.builder()
                .type("RECURRING")
                .schedule("1 * * * *")
                .build())
            .executionSpec(TaskExecutionSpecArgs.builder()
                .serviceAccount(project.applyValue(getProjectResult -> String.format("%s-compute@developer.gserviceaccount.com", getProjectResult.number())))
                .args(Map.of("TASK_ARGS", "--output_location,gs://spark-job-jars-anrajitha/task-result, --output_format, json"))
                .build())
            .notebook(TaskNotebookArgs.builder()
                .notebook("gs://terraform-test/test-notebook.ipynb")
                .infrastructureSpec(TaskNotebookInfrastructureSpecArgs.builder()
                    .batch(TaskNotebookInfrastructureSpecBatchArgs.builder()
                        .executorsCount(2)
                        .maxExecutorsCount(100)
                        .build())
                    .containerImage(TaskNotebookInfrastructureSpecContainerImageArgs.builder()
                        .image("test-image")
                        .javaJars("test-java-jars.jar")
                        .pythonPackages("gs://bucket-name/my/path/to/lib.tar.gz")
                        .properties(Map.ofEntries(
                            Map.entry("name", "wrench"),
                            Map.entry("mass", "1.3kg"),
                            Map.entry("count", "3")
                        ))
                        .build())
                    .vpcNetwork(TaskNotebookInfrastructureSpecVpcNetworkArgs.builder()
                        .networkTags("test-network-tag")
                        .network(default_.id())
                        .build())
                    .build())
                .fileUris("gs://terraform-test/test.csv")
                .archiveUris("gs://terraform-test/test.csv")
                .build())
            .project("my-project-name")
            .build());

    }
}
resources:
  # VPC network
  default:
    type: gcp:compute:Network
    properties:
      name: tf-test-workstation-cluster_37118
      autoCreateSubnetworks: true
  exampleNotebook:
    type: gcp:dataplex:Lake
    name: example_notebook
    properties:
      name: tf-test-lake_80332
      location: us-central1
      project: my-project-name
  exampleNotebookTask:
    type: gcp:dataplex:Task
    name: example_notebook
    properties:
      taskId: tf-test-task_13293
      location: us-central1
      lake: ${exampleNotebook.name}
      triggerSpec:
        type: RECURRING
        schedule: 1 * * * *
      executionSpec:
        serviceAccount: ${project.number}-compute@developer.gserviceaccount.com
        args:
          TASK_ARGS: --output_location,gs://spark-job-jars-anrajitha/task-result, --output_format, json
      notebook:
        notebook: gs://terraform-test/test-notebook.ipynb
        infrastructureSpec:
          batch:
            executorsCount: 2
            maxExecutorsCount: 100
          containerImage:
            image: test-image
            javaJars:
              - test-java-jars.jar
            pythonPackages:
              - gs://bucket-name/my/path/to/lib.tar.gz
            properties:
              name: wrench
              mass: 1.3kg
              count: '3'
          vpcNetwork:
            networkTags:
              - test-network-tag
            network: ${default.id}
        fileUris:
          - gs://terraform-test/test.csv
        archiveUris:
          - gs://terraform-test/test.csv
      project: my-project-name
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}

The notebook block replaces the spark block for notebook-based tasks. The notebook property points to your .ipynb file in GCS. The infrastructureSpec works identically to Spark tasks, configuring executors, container images, and VPC placement. The fileUris and archiveUris properties make additional files available to the notebook at runtime.
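In practice the notebook usually lives in a bucket you also manage. The TypeScript sketch below is a hedged example of uploading it with Pulumi and deriving the gs:// URI to pass to notebook.notebook; the bucket name and local file path are assumptions.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// Hypothetical bucket and notebook upload
const notebookBucket = new gcp.storage.Bucket("notebook-bucket", {
    name: "my-dataplex-notebooks",   // bucket names must be globally unique
    location: "US-CENTRAL1",
});

const notebookObject = new gcp.storage.BucketObject("test-notebook", {
    name: "notebooks/test-notebook.ipynb",
    bucket: notebookBucket.name,
    source: new pulumi.asset.FileAsset("./test-notebook.ipynb"),
});

// The task's notebook.notebook property can then reference the uploaded object:
const notebookUri = pulumi.interpolate`gs://${notebookBucket.name}/${notebookObject.name}`;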

Beyond these examples

These snippets focus on specific task-level features: scheduling and retry configuration, Spark and notebook execution, and infrastructure and dependency management. They’re intentionally minimal rather than full data pipeline deployments.

The examples reference pre-existing infrastructure such as Dataplex lakes, VPC networks and subnets, GCS buckets with scripts, notebooks, and dependencies, and service accounts with appropriate IAM permissions. They focus on task configuration rather than provisioning the surrounding infrastructure.

To keep things focused, common task patterns are omitted, including:

  • On-demand triggers without schedules (type: ON_DEMAND shown but not explained)
  • KMS encryption keys (kmsKey property)
  • Job execution lifetime limits (maxJobExecutionLifetime)
  • Task monitoring and execution status tracking

These omissions are intentional: the goal is to illustrate how each task feature is wired, not provide drop-in pipeline modules. See the Dataplex Task resource reference for all available configuration options.

Frequently Asked Questions

Task Types & Execution
What types of tasks can I create in Dataplex?
You can create three types: Basic Spark tasks using pythonScriptFile, Advanced Spark tasks with sqlScript and infrastructure configuration, or Notebook tasks that execute Jupyter notebooks from Cloud Storage.
What's the difference between spark and notebook tasks?
Both support similar infrastructure options (batch execution, container images, VPC networking), but Spark tasks run SQL scripts or Python files while Notebook tasks execute Jupyter notebooks. They use mutually exclusive configuration blocks.
Which service account does my task use?
You must specify a service account in executionSpec.serviceAccount. The examples use the default compute service account format: {project-number}-compute@developer.gserviceaccount.com.
Scheduling & Triggers
How do I schedule a task to run periodically?
Set triggerSpec.type to RECURRING and provide a cron schedule. For example, schedule: "1 * * * *" runs the task at 1 minute past every hour. You can also configure maxRetries, disabled, and startTime.
How do I run a task on-demand instead of on a schedule?
Set triggerSpec.type to ON_DEMAND. No schedule is required for on-demand tasks.
Infrastructure & Networking
How do I configure VPC networking for my task?
Add vpcNetwork to infrastructureSpec with networkTags and either network or subNetwork (mutually exclusive ways to choose where executors run). The Spark example above references a subnet via subNetwork, while the notebook example references the network via network; both take the corresponding resource ID.
How do I control the number of executors for my task?
Configure infrastructureSpec.batch with executorsCount (initial count) and maxExecutorsCount (maximum for scaling). For example, start with 2 executors and scale up to 100.
Can I use custom container images for my task?
Yes, specify containerImage in infrastructureSpec with the image name, and optionally include javaJars, pythonPackages, and custom properties.
Configuration & Lifecycle
What properties can't I change after creating a task?
The project, lake, location, and taskId properties are immutable. Changing any of these requires recreating the task.
Why aren't all my labels showing up in Pulumi state?
The labels field is non-authoritative and only manages labels defined in your configuration. To see all labels on the resource (including those added by other clients or services), use the effectiveLabels output property.
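For example, assuming the exampleTask resource from the first TypeScript program, you could surface the full label set as a stack output:

// Includes labels applied by other clients and services, not just those in this program
export const taskEffectiveLabels = exampleTask.effectiveLabels;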
