Create GCP Dataplex Tasks

The gcp:dataplex/task:Task resource, part of the Pulumi GCP provider, defines a Dataplex task that encapsulates code (Python scripts, Spark SQL, or Jupyter notebooks), execution parameters, and scheduling configuration. This guide focuses on three capabilities: scheduled Python script execution, Spark infrastructure configuration, and notebook execution with custom containers.

Tasks run within Dataplex lakes and reference GCS-hosted code, container images, and VPC networks that must exist separately. The examples are intentionally small. Combine them with your own lakes, VPC configuration, and code artifacts.

Run a Python script on a recurring schedule

Data pipelines often need to run Python scripts on a schedule to process data in Cloud Storage.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const project = gcp.organizations.getProject({});
const example = new gcp.dataplex.Lake("example", {
    name: "tf-test-lake_33052",
    location: "us-central1",
    project: "my-project-name",
});
const exampleTask = new gcp.dataplex.Task("example", {
    taskId: "tf-test-task_3684",
    location: "us-central1",
    lake: example.name,
    description: "Test Task Basic",
    displayName: "task-basic",
    labels: {
        count: "3",
    },
    triggerSpec: {
        type: "RECURRING",
        disabled: false,
        maxRetries: 3,
        startTime: "2023-10-02T15:01:23Z",
        schedule: "1 * * * *",
    },
    executionSpec: {
        serviceAccount: project.then(project => `${project.number}-compute@developer.gserviceaccount.com`),
        project: "my-project-name",
        maxJobExecutionLifetime: "100s",
        kmsKey: "234jn2kjn42k3n423",
    },
    spark: {
        pythonScriptFile: "gs://dataproc-examples/pyspark/hello-world/hello-world.py",
    },
    project: "my-project-name",
});
import pulumi
import pulumi_gcp as gcp

project = gcp.organizations.get_project()
example = gcp.dataplex.Lake("example",
    name="tf-test-lake_33052",
    location="us-central1",
    project="my-project-name")
example_task = gcp.dataplex.Task("example",
    task_id="tf-test-task_3684",
    location="us-central1",
    lake=example.name,
    description="Test Task Basic",
    display_name="task-basic",
    labels={
        "count": "3",
    },
    trigger_spec={
        "type": "RECURRING",
        "disabled": False,
        "max_retries": 3,
        "start_time": "2023-10-02T15:01:23Z",
        "schedule": "1 * * * *",
    },
    execution_spec={
        "service_account": f"{project.number}-compute@developer.gserviceaccount.com",
        "project": "my-project-name",
        "max_job_execution_lifetime": "100s",
        "kms_key": "234jn2kjn42k3n423",
    },
    spark={
        "python_script_file": "gs://dataproc-examples/pyspark/hello-world/hello-world.py",
    },
    project="my-project-name")
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/dataplex"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		example, err := dataplex.NewLake(ctx, "example", &dataplex.LakeArgs{
			Name:     pulumi.String("tf-test-lake_33052"),
			Location: pulumi.String("us-central1"),
			Project:  pulumi.String("my-project-name"),
		})
		if err != nil {
			return err
		}
		_, err = dataplex.NewTask(ctx, "example", &dataplex.TaskArgs{
			TaskId:      pulumi.String("tf-test-task_3684"),
			Location:    pulumi.String("us-central1"),
			Lake:        example.Name,
			Description: pulumi.String("Test Task Basic"),
			DisplayName: pulumi.String("task-basic"),
			Labels: pulumi.StringMap{
				"count": pulumi.String("3"),
			},
			TriggerSpec: &dataplex.TaskTriggerSpecArgs{
				Type:       pulumi.String("RECURRING"),
				Disabled:   pulumi.Bool(false),
				MaxRetries: pulumi.Int(3),
				StartTime:  pulumi.String("2023-10-02T15:01:23Z"),
				Schedule:   pulumi.String("1 * * * *"),
			},
			ExecutionSpec: &dataplex.TaskExecutionSpecArgs{
				ServiceAccount:          pulumi.Sprintf("%v-compute@developer.gserviceaccount.com", project.Number),
				Project:                 pulumi.String("my-project-name"),
				MaxJobExecutionLifetime: pulumi.String("100s"),
				KmsKey:                  pulumi.String("234jn2kjn42k3n423"),
			},
			Spark: &dataplex.TaskSparkArgs{
				PythonScriptFile: pulumi.String("gs://dataproc-examples/pyspark/hello-world/hello-world.py"),
			},
			Project: pulumi.String("my-project-name"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var project = Gcp.Organizations.GetProject.Invoke();

    var example = new Gcp.DataPlex.Lake("example", new()
    {
        Name = "tf-test-lake_33052",
        Location = "us-central1",
        Project = "my-project-name",
    });

    var exampleTask = new Gcp.DataPlex.Task("example", new()
    {
        TaskId = "tf-test-task_3684",
        Location = "us-central1",
        Lake = example.Name,
        Description = "Test Task Basic",
        DisplayName = "task-basic",
        Labels = 
        {
            { "count", "3" },
        },
        TriggerSpec = new Gcp.DataPlex.Inputs.TaskTriggerSpecArgs
        {
            Type = "RECURRING",
            Disabled = false,
            MaxRetries = 3,
            StartTime = "2023-10-02T15:01:23Z",
            Schedule = "1 * * * *",
        },
        ExecutionSpec = new Gcp.DataPlex.Inputs.TaskExecutionSpecArgs
        {
            ServiceAccount = $"{project.Apply(getProjectResult => getProjectResult.Number)}-compute@developer.gserviceaccount.com",
            Project = "my-project-name",
            MaxJobExecutionLifetime = "100s",
            KmsKey = "234jn2kjn42k3n423",
        },
        Spark = new Gcp.DataPlex.Inputs.TaskSparkArgs
        {
            PythonScriptFile = "gs://dataproc-examples/pyspark/hello-world/hello-world.py",
        },
        Project = "my-project-name",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.dataplex.Lake;
import com.pulumi.gcp.dataplex.LakeArgs;
import com.pulumi.gcp.dataplex.Task;
import com.pulumi.gcp.dataplex.TaskArgs;
import com.pulumi.gcp.dataplex.inputs.TaskTriggerSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskExecutionSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskSparkArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

        var example = new Lake("example", LakeArgs.builder()
            .name("tf-test-lake_33052")
            .location("us-central1")
            .project("my-project-name")
            .build());

        var exampleTask = new Task("exampleTask", TaskArgs.builder()
            .taskId("tf-test-task_3684")
            .location("us-central1")
            .lake(example.name())
            .description("Test Task Basic")
            .displayName("task-basic")
            .labels(Map.of("count", "3"))
            .triggerSpec(TaskTriggerSpecArgs.builder()
                .type("RECURRING")
                .disabled(false)
                .maxRetries(3)
                .startTime("2023-10-02T15:01:23Z")
                .schedule("1 * * * *")
                .build())
            .executionSpec(TaskExecutionSpecArgs.builder()
                .serviceAccount(String.format("%s-compute@developer.gserviceaccount.com", project.number()))
                .project("my-project-name")
                .maxJobExecutionLifetime("100s")
                .kmsKey("234jn2kjn42k3n423")
                .build())
            .spark(TaskSparkArgs.builder()
                .pythonScriptFile("gs://dataproc-examples/pyspark/hello-world/hello-world.py")
                .build())
            .project("my-project-name")
            .build());

    }
}
resources:
  example:
    type: gcp:dataplex:Lake
    properties:
      name: tf-test-lake_33052
      location: us-central1
      project: my-project-name
  exampleTask:
    type: gcp:dataplex:Task
    name: example
    properties:
      taskId: tf-test-task_3684
      location: us-central1
      lake: ${example.name}
      description: Test Task Basic
      displayName: task-basic
      labels:
        count: '3'
      triggerSpec:
        type: RECURRING
        disabled: false
        maxRetries: 3
        startTime: 2023-10-02T15:01:23Z
        schedule: 1 * * * *
      executionSpec:
        serviceAccount: ${project.number}-compute@developer.gserviceaccount.com
        project: my-project-name
        maxJobExecutionLifetime: 100s
        kmsKey: 234jn2kjn42k3n423
      spark:
        pythonScriptFile: gs://dataproc-examples/pyspark/hello-world/hello-world.py
      project: my-project-name
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}

The triggerSpec defines when the task runs. Setting type to “RECURRING” with a cron-style schedule runs the task hourly. The spark block points to your Python script in GCS via pythonScriptFile. The executionSpec specifies the service account that runs the job and handles retries via maxRetries.

Configure Spark infrastructure with custom containers and networking

Production Spark jobs often require custom container images, specific executor counts, and VPC networking to access private data sources.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// VPC network
const _default = new gcp.compute.Network("default", {
    name: "tf-test-workstation-cluster_10719",
    autoCreateSubnetworks: true,
});
const project = gcp.organizations.getProject({});
const exampleSpark = new gcp.dataplex.Lake("example_spark", {
    name: "tf-test-lake_1443",
    location: "us-central1",
    project: "my-project-name",
});
const exampleSparkTask = new gcp.dataplex.Task("example_spark", {
    taskId: "tf-test-task_26032",
    location: "us-central1",
    lake: exampleSpark.name,
    triggerSpec: {
        type: "ON_DEMAND",
    },
    description: "task-spark-terraform",
    executionSpec: {
        serviceAccount: project.then(project => `${project.number}-compute@developer.gserviceaccount.com`),
        args: {
            TASK_ARGS: "--output_location,gs://spark-job/task-result, --output_format, json",
        },
    },
    spark: {
        infrastructureSpec: {
            batch: {
                executorsCount: 2,
                maxExecutorsCount: 100,
            },
            containerImage: {
                image: "test-image",
                javaJars: ["test-java-jars.jar"],
                pythonPackages: ["gs://bucket-name/my/path/to/lib.tar.gz"],
                properties: {
                    name: "wrench",
                    mass: "1.3kg",
                    count: "3",
                },
            },
            vpcNetwork: {
                networkTags: ["test-network-tag"],
                subNetwork: _default.id,
            },
        },
        fileUris: ["gs://terrafrom-test/test.csv"],
        archiveUris: ["gs://terraform-test/test.csv"],
        sqlScript: "show databases",
    },
    project: "my-project-name",
});
import pulumi
import pulumi_gcp as gcp

# VPC network
default = gcp.compute.Network("default",
    name="tf-test-workstation-cluster_10719",
    auto_create_subnetworks=True)
project = gcp.organizations.get_project()
example_spark = gcp.dataplex.Lake("example_spark",
    name="tf-test-lake_1443",
    location="us-central1",
    project="my-project-name")
example_spark_task = gcp.dataplex.Task("example_spark",
    task_id="tf-test-task_26032",
    location="us-central1",
    lake=example_spark.name,
    trigger_spec={
        "type": "ON_DEMAND",
    },
    description="task-spark-terraform",
    execution_spec={
        "service_account": f"{project.number}-compute@developer.gserviceaccount.com",
        "args": {
            "TASK_ARGS": "--output_location,gs://spark-job/task-result, --output_format, json",
        },
    },
    spark={
        "infrastructure_spec": {
            "batch": {
                "executors_count": 2,
                "max_executors_count": 100,
            },
            "container_image": {
                "image": "test-image",
                "java_jars": ["test-java-jars.jar"],
                "python_packages": ["gs://bucket-name/my/path/to/lib.tar.gz"],
                "properties": {
                    "name": "wrench",
                    "mass": "1.3kg",
                    "count": "3",
                },
            },
            "vpc_network": {
                "network_tags": ["test-network-tag"],
                "sub_network": default.id,
            },
        },
        "file_uris": ["gs://terrafrom-test/test.csv"],
        "archive_uris": ["gs://terraform-test/test.csv"],
        "sql_script": "show databases",
    },
    project="my-project-name")
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/compute"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/dataplex"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// VPC network
		_default, err := compute.NewNetwork(ctx, "default", &compute.NetworkArgs{
			Name:                  pulumi.String("tf-test-workstation-cluster_10719"),
			AutoCreateSubnetworks: pulumi.Bool(true),
		})
		if err != nil {
			return err
		}
		project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		exampleSpark, err := dataplex.NewLake(ctx, "example_spark", &dataplex.LakeArgs{
			Name:     pulumi.String("tf-test-lake_1443"),
			Location: pulumi.String("us-central1"),
			Project:  pulumi.String("my-project-name"),
		})
		if err != nil {
			return err
		}
		_, err = dataplex.NewTask(ctx, "example_spark", &dataplex.TaskArgs{
			TaskId:   pulumi.String("tf-test-task_26032"),
			Location: pulumi.String("us-central1"),
			Lake:     exampleSpark.Name,
			TriggerSpec: &dataplex.TaskTriggerSpecArgs{
				Type: pulumi.String("ON_DEMAND"),
			},
			Description: pulumi.String("task-spark-terraform"),
			ExecutionSpec: &dataplex.TaskExecutionSpecArgs{
				ServiceAccount: pulumi.Sprintf("%v-compute@developer.gserviceaccount.com", project.Number),
				Args: pulumi.StringMap{
					"TASK_ARGS": pulumi.String("--output_location,gs://spark-job/task-result, --output_format, json"),
				},
			},
			Spark: &dataplex.TaskSparkArgs{
				InfrastructureSpec: &dataplex.TaskSparkInfrastructureSpecArgs{
					Batch: &dataplex.TaskSparkInfrastructureSpecBatchArgs{
						ExecutorsCount:    pulumi.Int(2),
						MaxExecutorsCount: pulumi.Int(100),
					},
					ContainerImage: &dataplex.TaskSparkInfrastructureSpecContainerImageArgs{
						Image: pulumi.String("test-image"),
						JavaJars: pulumi.StringArray{
							pulumi.String("test-java-jars.jar"),
						},
						PythonPackages: pulumi.StringArray{
							pulumi.String("gs://bucket-name/my/path/to/lib.tar.gz"),
						},
						Properties: pulumi.StringMap{
							"name":  pulumi.String("wrench"),
							"mass":  pulumi.String("1.3kg"),
							"count": pulumi.String("3"),
						},
					},
					VpcNetwork: &dataplex.TaskSparkInfrastructureSpecVpcNetworkArgs{
						NetworkTags: pulumi.StringArray{
							pulumi.String("test-network-tag"),
						},
						SubNetwork: _default.ID(),
					},
				},
				FileUris: pulumi.StringArray{
					pulumi.String("gs://terrafrom-test/test.csv"),
				},
				ArchiveUris: pulumi.StringArray{
					pulumi.String("gs://terraform-test/test.csv"),
				},
				SqlScript: pulumi.String("show databases"),
			},
			Project: pulumi.String("my-project-name"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    // VPC network
    var @default = new Gcp.Compute.Network("default", new()
    {
        Name = "tf-test-workstation-cluster_10719",
        AutoCreateSubnetworks = true,
    });

    var project = Gcp.Organizations.GetProject.Invoke();

    var exampleSpark = new Gcp.DataPlex.Lake("example_spark", new()
    {
        Name = "tf-test-lake_1443",
        Location = "us-central1",
        Project = "my-project-name",
    });

    var exampleSparkTask = new Gcp.DataPlex.Task("example_spark", new()
    {
        TaskId = "tf-test-task_26032",
        Location = "us-central1",
        Lake = exampleSpark.Name,
        TriggerSpec = new Gcp.DataPlex.Inputs.TaskTriggerSpecArgs
        {
            Type = "ON_DEMAND",
        },
        Description = "task-spark-terraform",
        ExecutionSpec = new Gcp.DataPlex.Inputs.TaskExecutionSpecArgs
        {
            ServiceAccount = $"{project.Apply(getProjectResult => getProjectResult.Number)}-compute@developer.gserviceaccount.com",
            Args = 
            {
                { "TASK_ARGS", "--output_location,gs://spark-job/task-result, --output_format, json" },
            },
        },
        Spark = new Gcp.DataPlex.Inputs.TaskSparkArgs
        {
            InfrastructureSpec = new Gcp.DataPlex.Inputs.TaskSparkInfrastructureSpecArgs
            {
                Batch = new Gcp.DataPlex.Inputs.TaskSparkInfrastructureSpecBatchArgs
                {
                    ExecutorsCount = 2,
                    MaxExecutorsCount = 100,
                },
                ContainerImage = new Gcp.DataPlex.Inputs.TaskSparkInfrastructureSpecContainerImageArgs
                {
                    Image = "test-image",
                    JavaJars = new[]
                    {
                        "test-java-jars.jar",
                    },
                    PythonPackages = new[]
                    {
                        "gs://bucket-name/my/path/to/lib.tar.gz",
                    },
                    Properties = 
                    {
                        { "name", "wrench" },
                        { "mass", "1.3kg" },
                        { "count", "3" },
                    },
                },
                VpcNetwork = new Gcp.DataPlex.Inputs.TaskSparkInfrastructureSpecVpcNetworkArgs
                {
                    NetworkTags = new[]
                    {
                        "test-network-tag",
                    },
                    SubNetwork = @default.Id,
                },
            },
            FileUris = new[]
            {
                "gs://terrafrom-test/test.csv",
            },
            ArchiveUris = new[]
            {
                "gs://terraform-test/test.csv",
            },
            SqlScript = "show databases",
        },
        Project = "my-project-name",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.compute.Network;
import com.pulumi.gcp.compute.NetworkArgs;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.dataplex.Lake;
import com.pulumi.gcp.dataplex.LakeArgs;
import com.pulumi.gcp.dataplex.Task;
import com.pulumi.gcp.dataplex.TaskArgs;
import com.pulumi.gcp.dataplex.inputs.TaskTriggerSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskExecutionSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskSparkArgs;
import com.pulumi.gcp.dataplex.inputs.TaskSparkInfrastructureSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskSparkInfrastructureSpecBatchArgs;
import com.pulumi.gcp.dataplex.inputs.TaskSparkInfrastructureSpecContainerImageArgs;
import com.pulumi.gcp.dataplex.inputs.TaskSparkInfrastructureSpecVpcNetworkArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        // VPC network
        var default_ = new Network("default", NetworkArgs.builder()
            .name("tf-test-workstation-cluster_10719")
            .autoCreateSubnetworks(true)
            .build());

        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

        var exampleSpark = new Lake("exampleSpark", LakeArgs.builder()
            .name("tf-test-lake_1443")
            .location("us-central1")
            .project("my-project-name")
            .build());

        var exampleSparkTask = new Task("exampleSparkTask", TaskArgs.builder()
            .taskId("tf-test-task_26032")
            .location("us-central1")
            .lake(exampleSpark.name())
            .triggerSpec(TaskTriggerSpecArgs.builder()
                .type("ON_DEMAND")
                .build())
            .description("task-spark-terraform")
            .executionSpec(TaskExecutionSpecArgs.builder()
                .serviceAccount(String.format("%s-compute@developer.gserviceaccount.com", project.number()))
                .args(Map.of("TASK_ARGS", "--output_location,gs://spark-job/task-result, --output_format, json"))
                .build())
            .spark(TaskSparkArgs.builder()
                .infrastructureSpec(TaskSparkInfrastructureSpecArgs.builder()
                    .batch(TaskSparkInfrastructureSpecBatchArgs.builder()
                        .executorsCount(2)
                        .maxExecutorsCount(100)
                        .build())
                    .containerImage(TaskSparkInfrastructureSpecContainerImageArgs.builder()
                        .image("test-image")
                        .javaJars("test-java-jars.jar")
                        .pythonPackages("gs://bucket-name/my/path/to/lib.tar.gz")
                        .properties(Map.ofEntries(
                            Map.entry("name", "wrench"),
                            Map.entry("mass", "1.3kg"),
                            Map.entry("count", "3")
                        ))
                        .build())
                    .vpcNetwork(TaskSparkInfrastructureSpecVpcNetworkArgs.builder()
                        .networkTags("test-network-tag")
                        .subNetwork(default_.id())
                        .build())
                    .build())
                .fileUris("gs://terrafrom-test/test.csv")
                .archiveUris("gs://terraform-test/test.csv")
                .sqlScript("show databases")
                .build())
            .project("my-project-name")
            .build());

    }
}
resources:
  # VPC network
  default:
    type: gcp:compute:Network
    properties:
      name: tf-test-workstation-cluster_10719
      autoCreateSubnetworks: true
  exampleSpark:
    type: gcp:dataplex:Lake
    name: example_spark
    properties:
      name: tf-test-lake_1443
      location: us-central1
      project: my-project-name
  exampleSparkTask:
    type: gcp:dataplex:Task
    name: example_spark
    properties:
      taskId: tf-test-task_26032
      location: us-central1
      lake: ${exampleSpark.name}
      triggerSpec:
        type: ON_DEMAND
      description: task-spark-terraform
      executionSpec:
        serviceAccount: ${project.number}-compute@developer.gserviceaccount.com
        args:
          TASK_ARGS: --output_location,gs://spark-job/task-result, --output_format, json
      spark:
        infrastructureSpec:
          batch:
            executorsCount: 2
            maxExecutorsCount: 100
          containerImage:
            image: test-image
            javaJars:
              - test-java-jars.jar
            pythonPackages:
              - gs://bucket-name/my/path/to/lib.tar.gz
            properties:
              name: wrench
              mass: 1.3kg
              count: '3'
          vpcNetwork:
            networkTags:
              - test-network-tag
            subNetwork: ${default.id}
        fileUris:
          - gs://terrafrom-test/test.csv
        archiveUris:
          - gs://terraform-test/test.csv
        sqlScript: show databases
      project: my-project-name
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}

The infrastructureSpec within the spark block controls compute resources. The batch section sets executor counts, while containerImage specifies your custom image and dependencies. The vpcNetwork block places executors in your VPC with network tags for firewall rules. The sqlScript property defines the Spark SQL to execute.

Execute Jupyter notebooks with managed infrastructure

Data scientists develop analysis workflows in Jupyter notebooks that need to run on a schedule with production-grade infrastructure.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// VPC network
const _default = new gcp.compute.Network("default", {
    name: "tf-test-workstation-cluster_8647",
    autoCreateSubnetworks: true,
});
const project = gcp.organizations.getProject({});
const exampleNotebook = new gcp.dataplex.Lake("example_notebook", {
    name: "tf-test-lake_50610",
    location: "us-central1",
    project: "my-project-name",
});
const exampleNotebookTask = new gcp.dataplex.Task("example_notebook", {
    taskId: "tf-test-task_77124",
    location: "us-central1",
    lake: exampleNotebook.name,
    triggerSpec: {
        type: "RECURRING",
        schedule: "1 * * * *",
    },
    executionSpec: {
        serviceAccount: project.then(project => `${project.number}-compute@developer.gserviceaccount.com`),
        args: {
            TASK_ARGS: "--output_location,gs://spark-job-jars-anrajitha/task-result, --output_format, json",
        },
    },
    notebook: {
        notebook: "gs://terraform-test/test-notebook.ipynb",
        infrastructureSpec: {
            batch: {
                executorsCount: 2,
                maxExecutorsCount: 100,
            },
            containerImage: {
                image: "test-image",
                javaJars: ["test-java-jars.jar"],
                pythonPackages: ["gs://bucket-name/my/path/to/lib.tar.gz"],
                properties: {
                    name: "wrench",
                    mass: "1.3kg",
                    count: "3",
                },
            },
            vpcNetwork: {
                networkTags: ["test-network-tag"],
                network: _default.id,
            },
        },
        fileUris: ["gs://terraform-test/test.csv"],
        archiveUris: ["gs://terraform-test/test.csv"],
    },
    project: "my-project-name",
});
import pulumi
import pulumi_gcp as gcp

# VPC network
default = gcp.compute.Network("default",
    name="tf-test-workstation-cluster_8647",
    auto_create_subnetworks=True)
project = gcp.organizations.get_project()
example_notebook = gcp.dataplex.Lake("example_notebook",
    name="tf-test-lake_50610",
    location="us-central1",
    project="my-project-name")
example_notebook_task = gcp.dataplex.Task("example_notebook",
    task_id="tf-test-task_77124",
    location="us-central1",
    lake=example_notebook.name,
    trigger_spec={
        "type": "RECURRING",
        "schedule": "1 * * * *",
    },
    execution_spec={
        "service_account": f"{project.number}-compute@developer.gserviceaccount.com",
        "args": {
            "TASK_ARGS": "--output_location,gs://spark-job-jars-anrajitha/task-result, --output_format, json",
        },
    },
    notebook={
        "notebook": "gs://terraform-test/test-notebook.ipynb",
        "infrastructure_spec": {
            "batch": {
                "executors_count": 2,
                "max_executors_count": 100,
            },
            "container_image": {
                "image": "test-image",
                "java_jars": ["test-java-jars.jar"],
                "python_packages": ["gs://bucket-name/my/path/to/lib.tar.gz"],
                "properties": {
                    "name": "wrench",
                    "mass": "1.3kg",
                    "count": "3",
                },
            },
            "vpc_network": {
                "network_tags": ["test-network-tag"],
                "network": default.id,
            },
        },
        "file_uris": ["gs://terraform-test/test.csv"],
        "archive_uris": ["gs://terraform-test/test.csv"],
    },
    project="my-project-name")
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/compute"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/dataplex"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// VPC network
		_default, err := compute.NewNetwork(ctx, "default", &compute.NetworkArgs{
			Name:                  pulumi.String("tf-test-workstation-cluster_8647"),
			AutoCreateSubnetworks: pulumi.Bool(true),
		})
		if err != nil {
			return err
		}
		project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
		if err != nil {
			return err
		}
		exampleNotebook, err := dataplex.NewLake(ctx, "example_notebook", &dataplex.LakeArgs{
			Name:     pulumi.String("tf-test-lake_50610"),
			Location: pulumi.String("us-central1"),
			Project:  pulumi.String("my-project-name"),
		})
		if err != nil {
			return err
		}
		_, err = dataplex.NewTask(ctx, "example_notebook", &dataplex.TaskArgs{
			TaskId:   pulumi.String("tf-test-task_77124"),
			Location: pulumi.String("us-central1"),
			Lake:     exampleNotebook.Name,
			TriggerSpec: &dataplex.TaskTriggerSpecArgs{
				Type:     pulumi.String("RECURRING"),
				Schedule: pulumi.String("1 * * * *"),
			},
			ExecutionSpec: &dataplex.TaskExecutionSpecArgs{
				ServiceAccount: pulumi.Sprintf("%v-compute@developer.gserviceaccount.com", project.Number),
				Args: pulumi.StringMap{
					"TASK_ARGS": pulumi.String("--output_location,gs://spark-job-jars-anrajitha/task-result, --output_format, json"),
				},
			},
			Notebook: &dataplex.TaskNotebookArgs{
				Notebook: pulumi.String("gs://terraform-test/test-notebook.ipynb"),
				InfrastructureSpec: &dataplex.TaskNotebookInfrastructureSpecArgs{
					Batch: &dataplex.TaskNotebookInfrastructureSpecBatchArgs{
						ExecutorsCount:    pulumi.Int(2),
						MaxExecutorsCount: pulumi.Int(100),
					},
					ContainerImage: &dataplex.TaskNotebookInfrastructureSpecContainerImageArgs{
						Image: pulumi.String("test-image"),
						JavaJars: pulumi.StringArray{
							pulumi.String("test-java-jars.jar"),
						},
						PythonPackages: pulumi.StringArray{
							pulumi.String("gs://bucket-name/my/path/to/lib.tar.gz"),
						},
						Properties: pulumi.StringMap{
							"name":  pulumi.String("wrench"),
							"mass":  pulumi.String("1.3kg"),
							"count": pulumi.String("3"),
						},
					},
					VpcNetwork: &dataplex.TaskNotebookInfrastructureSpecVpcNetworkArgs{
						NetworkTags: pulumi.StringArray{
							pulumi.String("test-network-tag"),
						},
						Network: _default.ID(),
					},
				},
				FileUris: pulumi.StringArray{
					pulumi.String("gs://terraform-test/test.csv"),
				},
				ArchiveUris: pulumi.StringArray{
					pulumi.String("gs://terraform-test/test.csv"),
				},
			},
			Project: pulumi.String("my-project-name"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    // VPC network
    var @default = new Gcp.Compute.Network("default", new()
    {
        Name = "tf-test-workstation-cluster_8647",
        AutoCreateSubnetworks = true,
    });

    var project = Gcp.Organizations.GetProject.Invoke();

    var exampleNotebook = new Gcp.DataPlex.Lake("example_notebook", new()
    {
        Name = "tf-test-lake_50610",
        Location = "us-central1",
        Project = "my-project-name",
    });

    var exampleNotebookTask = new Gcp.DataPlex.Task("example_notebook", new()
    {
        TaskId = "tf-test-task_77124",
        Location = "us-central1",
        Lake = exampleNotebook.Name,
        TriggerSpec = new Gcp.DataPlex.Inputs.TaskTriggerSpecArgs
        {
            Type = "RECURRING",
            Schedule = "1 * * * *",
        },
        ExecutionSpec = new Gcp.DataPlex.Inputs.TaskExecutionSpecArgs
        {
            ServiceAccount = $"{project.Apply(getProjectResult => getProjectResult.Number)}-compute@developer.gserviceaccount.com",
            Args = 
            {
                { "TASK_ARGS", "--output_location,gs://spark-job-jars-anrajitha/task-result, --output_format, json" },
            },
        },
        Notebook = new Gcp.DataPlex.Inputs.TaskNotebookArgs
        {
            Notebook = "gs://terraform-test/test-notebook.ipynb",
            InfrastructureSpec = new Gcp.DataPlex.Inputs.TaskNotebookInfrastructureSpecArgs
            {
                Batch = new Gcp.DataPlex.Inputs.TaskNotebookInfrastructureSpecBatchArgs
                {
                    ExecutorsCount = 2,
                    MaxExecutorsCount = 100,
                },
                ContainerImage = new Gcp.DataPlex.Inputs.TaskNotebookInfrastructureSpecContainerImageArgs
                {
                    Image = "test-image",
                    JavaJars = new[]
                    {
                        "test-java-jars.jar",
                    },
                    PythonPackages = new[]
                    {
                        "gs://bucket-name/my/path/to/lib.tar.gz",
                    },
                    Properties = 
                    {
                        { "name", "wrench" },
                        { "mass", "1.3kg" },
                        { "count", "3" },
                    },
                },
                VpcNetwork = new Gcp.DataPlex.Inputs.TaskNotebookInfrastructureSpecVpcNetworkArgs
                {
                    NetworkTags = new[]
                    {
                        "test-network-tag",
                    },
                    Network = @default.Id,
                },
            },
            FileUris = new[]
            {
                "gs://terraform-test/test.csv",
            },
            ArchiveUris = new[]
            {
                "gs://terraform-test/test.csv",
            },
        },
        Project = "my-project-name",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.compute.Network;
import com.pulumi.gcp.compute.NetworkArgs;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.dataplex.Lake;
import com.pulumi.gcp.dataplex.LakeArgs;
import com.pulumi.gcp.dataplex.Task;
import com.pulumi.gcp.dataplex.TaskArgs;
import com.pulumi.gcp.dataplex.inputs.TaskTriggerSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskExecutionSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskNotebookArgs;
import com.pulumi.gcp.dataplex.inputs.TaskNotebookInfrastructureSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskNotebookInfrastructureSpecBatchArgs;
import com.pulumi.gcp.dataplex.inputs.TaskNotebookInfrastructureSpecContainerImageArgs;
import com.pulumi.gcp.dataplex.inputs.TaskNotebookInfrastructureSpecVpcNetworkArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        // VPC network
        var default_ = new Network("default", NetworkArgs.builder()
            .name("tf-test-workstation-cluster_8647")
            .autoCreateSubnetworks(true)
            .build());

        final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
            .build());

        var exampleNotebook = new Lake("exampleNotebook", LakeArgs.builder()
            .name("tf-test-lake_50610")
            .location("us-central1")
            .project("my-project-name")
            .build());

        var exampleNotebookTask = new Task("exampleNotebookTask", TaskArgs.builder()
            .taskId("tf-test-task_77124")
            .location("us-central1")
            .lake(exampleNotebook.name())
            .triggerSpec(TaskTriggerSpecArgs.builder()
                .type("RECURRING")
                .schedule("1 * * * *")
                .build())
            .executionSpec(TaskExecutionSpecArgs.builder()
                .serviceAccount(String.format("%s-compute@developer.gserviceaccount.com", project.number()))
                .args(Map.of("TASK_ARGS", "--output_location,gs://spark-job-jars-anrajitha/task-result, --output_format, json"))
                .build())
            .notebook(TaskNotebookArgs.builder()
                .notebook("gs://terraform-test/test-notebook.ipynb")
                .infrastructureSpec(TaskNotebookInfrastructureSpecArgs.builder()
                    .batch(TaskNotebookInfrastructureSpecBatchArgs.builder()
                        .executorsCount(2)
                        .maxExecutorsCount(100)
                        .build())
                    .containerImage(TaskNotebookInfrastructureSpecContainerImageArgs.builder()
                        .image("test-image")
                        .javaJars("test-java-jars.jar")
                        .pythonPackages("gs://bucket-name/my/path/to/lib.tar.gz")
                        .properties(Map.ofEntries(
                            Map.entry("name", "wrench"),
                            Map.entry("mass", "1.3kg"),
                            Map.entry("count", "3")
                        ))
                        .build())
                    .vpcNetwork(TaskNotebookInfrastructureSpecVpcNetworkArgs.builder()
                        .networkTags("test-network-tag")
                        .network(default_.id())
                        .build())
                    .build())
                .fileUris("gs://terraform-test/test.csv")
                .archiveUris("gs://terraform-test/test.csv")
                .build())
            .project("my-project-name")
            .build());

    }
}
resources:
  # VPC network
  default:
    type: gcp:compute:Network
    properties:
      name: tf-test-workstation-cluster_8647
      autoCreateSubnetworks: true
  exampleNotebook:
    type: gcp:dataplex:Lake
    name: example_notebook
    properties:
      name: tf-test-lake_50610
      location: us-central1
      project: my-project-name
  exampleNotebookTask:
    type: gcp:dataplex:Task
    name: example_notebook
    properties:
      taskId: tf-test-task_77124
      location: us-central1
      lake: ${exampleNotebook.name}
      triggerSpec:
        type: RECURRING
        schedule: 1 * * * *
      executionSpec:
        serviceAccount: ${project.number}-compute@developer.gserviceaccount.com
        args:
          TASK_ARGS: --output_location,gs://spark-job-jars-anrajitha/task-result, --output_format, json
      notebook:
        notebook: gs://terraform-test/test-notebook.ipynb
        infrastructureSpec:
          batch:
            executorsCount: 2
            maxExecutorsCount: 100
          containerImage:
            image: test-image
            javaJars:
              - test-java-jars.jar
            pythonPackages:
              - gs://bucket-name/my/path/to/lib.tar.gz
            properties:
              name: wrench
              mass: 1.3kg
              count: '3'
          vpcNetwork:
            networkTags:
              - test-network-tag
            network: ${default.id}
        fileUris:
          - gs://terraform-test/test.csv
        archiveUris:
          - gs://terraform-test/test.csv
      project: my-project-name
variables:
  project:
    fn::invoke:
      function: gcp:organizations:getProject
      arguments: {}

The notebook block replaces the spark block for notebook-based tasks. The notebook property points to your .ipynb file in GCS. The infrastructureSpec works identically to Spark tasks, controlling executor counts and container configuration. The triggerSpec schedule runs the notebook hourly.

Beyond these examples

These snippets focus on specific task-level features: Python and Spark SQL execution, custom container images and VPC networking, and notebook execution with managed infrastructure. They’re intentionally minimal rather than full data pipeline deployments.

The examples reference pre-existing infrastructure such as Dataplex lakes, VPC networks and subnets, GCS buckets with scripts and notebooks, and container images in registries. They focus on configuring the task rather than provisioning the surrounding data platform.

To keep things focused, common task patterns are omitted, including:

  • On-demand triggers (type: ON_DEMAND without schedule)
  • KMS encryption keys (kmsKey in executionSpec)
  • Job execution lifetime limits (maxJobExecutionLifetime)
  • Custom arguments and environment variables (args in executionSpec)

These omissions are intentional: the goal is to illustrate how each task feature is wired, not provide drop-in data pipeline modules. See the Dataplex Task resource reference for all available configuration options.

Let's create GCP Dataplex Tasks

Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.

Try Pulumi Cloud for FREE

Frequently Asked Questions

Task Configuration & Types
What's the difference between Spark and Notebook tasks?
Spark tasks use the spark block for running Spark jobs (Python scripts, SQL scripts, or JARs), while Notebook tasks use the notebook block for running Jupyter notebooks. You must configure one or the other, not both.
What trigger types are available for Dataplex tasks?
Tasks support two trigger types: RECURRING for scheduled execution (requires a schedule field) and ON_DEMAND for manual triggering (no schedule needed).
Scheduling & Execution
How do I create a task that runs on a schedule?
Set triggerSpec.type to RECURRING and provide a schedule in cron format, such as 1 * * * * for hourly execution.
How do I create a task that runs on demand?
Set triggerSpec.type to ON_DEMAND. No schedule field is required for on-demand tasks.
What service account should I use for task execution?
Configure executionSpec.serviceAccount with an appropriate service account. Examples show using the compute service account: {project-number}-compute@developer.gserviceaccount.com.
Networking & Infrastructure
How do I configure VPC networking for my task?
Use the vpcNetwork block within infrastructureSpec (under spark or notebook), specifying network or subNetwork and optional networkTags.
How do I configure container images and dependencies?
Use the containerImage block within infrastructureSpec to specify a custom image, javaJars, pythonPackages, and additional properties.
Labels & Immutability
Why don't I see all labels in my Pulumi state?
The labels field is non-authoritative and only manages labels defined in your configuration. To see all labels on the resource (including those added by other clients), use effectiveLabels.
What properties can't be changed after creating a task?
The properties project, lake, location, and taskId are immutable. Changing any of these will force recreation of the task.
Can I change the trigger type from RECURRING to ON_DEMAND?
Yes, triggerSpec.type is not marked as immutable and can be updated. However, switching from RECURRING to ON_DEMAND means the schedule field will no longer be used.

Using a different cloud?

Explore analytics guides for other cloud providers: