The gcp:dataplex/task:Task resource, part of the Pulumi GCP provider, defines scheduled data processing jobs in Dataplex: their code, execution environment, and trigger configuration. This guide focuses on three capabilities: recurring schedule configuration, Spark job infrastructure, and notebook execution.
Tasks run within Dataplex lakes and reference GCS artifacts, VPC networks, and service accounts that must exist separately. The examples are intentionally small. Combine them with your own lake infrastructure, networking, and IAM configuration.
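The examples also assume the task's service account already holds the permissions it needs. The following is a minimal TypeScript sketch of the kind of IAM wiring you might add alongside them; it assumes the default compute service account runs the tasks, that roles/dataplex.developer is an appropriate task role for your workload, and that scripts live in a hypothetical my-scripts-bucket. Verify the roles against your own security requirements.
import * as gcp from "@pulumi/gcp";
const project = gcp.organizations.getProject({});
const member = project.then(p => `serviceAccount:${p.number}-compute@developer.gserviceaccount.com`);
// Assumption: roles/dataplex.developer is sufficient for running Dataplex tasks in this project.
const taskRunnerRole = new gcp.projects.IAMMember("task-runner-dataplex", {
    project: "my-project-name",
    role: "roles/dataplex.developer",
    member: member,
});
// Assumption: task scripts live in a bucket named my-scripts-bucket (hypothetical).
const scriptReadAccess = new gcp.storage.BucketIAMMember("task-runner-bucket-read", {
    bucket: "my-scripts-bucket",
    role: "roles/storage.objectViewer",
    member: member,
});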
Run Python scripts on a recurring schedule
Data pipelines often execute Python scripts at regular intervals for ETL, quality checks, or transformations.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const project = gcp.organizations.getProject({});
const example = new gcp.dataplex.Lake("example", {
name: "tf-test-lake_56529",
location: "us-central1",
project: "my-project-name",
});
const exampleTask = new gcp.dataplex.Task("example", {
taskId: "tf-test-task_75413",
location: "us-central1",
lake: example.name,
description: "Test Task Basic",
displayName: "task-basic",
labels: {
count: "3",
},
triggerSpec: {
type: "RECURRING",
disabled: false,
maxRetries: 3,
startTime: "2023-10-02T15:01:23Z",
schedule: "1 * * * *",
},
executionSpec: {
serviceAccount: project.then(project => `${project.number}-compute@developer.gserviceaccount.com`),
project: "my-project-name",
maxJobExecutionLifetime: "100s",
kmsKey: "234jn2kjn42k3n423",
},
spark: {
pythonScriptFile: "gs://dataproc-examples/pyspark/hello-world/hello-world.py",
},
project: "my-project-name",
});
import pulumi
import pulumi_gcp as gcp
project = gcp.organizations.get_project()
example = gcp.dataplex.Lake("example",
name="tf-test-lake_56529",
location="us-central1",
project="my-project-name")
example_task = gcp.dataplex.Task("example",
task_id="tf-test-task_75413",
location="us-central1",
lake=example.name,
description="Test Task Basic",
display_name="task-basic",
labels={
"count": "3",
},
trigger_spec={
"type": "RECURRING",
"disabled": False,
"max_retries": 3,
"start_time": "2023-10-02T15:01:23Z",
"schedule": "1 * * * *",
},
execution_spec={
"service_account": f"{project.number}-compute@developer.gserviceaccount.com",
"project": "my-project-name",
"max_job_execution_lifetime": "100s",
"kms_key": "234jn2kjn42k3n423",
},
spark={
"python_script_file": "gs://dataproc-examples/pyspark/hello-world/hello-world.py",
},
project="my-project-name")
package main
import (
"fmt"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/dataplex"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
if err != nil {
return err
}
example, err := dataplex.NewLake(ctx, "example", &dataplex.LakeArgs{
Name: pulumi.String("tf-test-lake_56529"),
Location: pulumi.String("us-central1"),
Project: pulumi.String("my-project-name"),
})
if err != nil {
return err
}
_, err = dataplex.NewTask(ctx, "example", &dataplex.TaskArgs{
TaskId: pulumi.String("tf-test-task_75413"),
Location: pulumi.String("us-central1"),
Lake: example.Name,
Description: pulumi.String("Test Task Basic"),
DisplayName: pulumi.String("task-basic"),
Labels: pulumi.StringMap{
"count": pulumi.String("3"),
},
TriggerSpec: &dataplex.TaskTriggerSpecArgs{
Type: pulumi.String("RECURRING"),
Disabled: pulumi.Bool(false),
MaxRetries: pulumi.Int(3),
StartTime: pulumi.String("2023-10-02T15:01:23Z"),
Schedule: pulumi.String("1 * * * *"),
},
ExecutionSpec: &dataplex.TaskExecutionSpecArgs{
ServiceAccount: pulumi.Sprintf("%v-compute@developer.gserviceaccount.com", project.Number),
Project: pulumi.String("my-project-name"),
MaxJobExecutionLifetime: pulumi.String("100s"),
KmsKey: pulumi.String("234jn2kjn42k3n423"),
},
Spark: &dataplex.TaskSparkArgs{
PythonScriptFile: pulumi.String("gs://dataproc-examples/pyspark/hello-world/hello-world.py"),
},
Project: pulumi.String("my-project-name"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var project = Gcp.Organizations.GetProject.Invoke();
var example = new Gcp.DataPlex.Lake("example", new()
{
Name = "tf-test-lake_56529",
Location = "us-central1",
Project = "my-project-name",
});
var exampleTask = new Gcp.DataPlex.Task("example", new()
{
TaskId = "tf-test-task_75413",
Location = "us-central1",
Lake = example.Name,
Description = "Test Task Basic",
DisplayName = "task-basic",
Labels =
{
{ "count", "3" },
},
TriggerSpec = new Gcp.DataPlex.Inputs.TaskTriggerSpecArgs
{
Type = "RECURRING",
Disabled = false,
MaxRetries = 3,
StartTime = "2023-10-02T15:01:23Z",
Schedule = "1 * * * *",
},
ExecutionSpec = new Gcp.DataPlex.Inputs.TaskExecutionSpecArgs
{
ServiceAccount = $"{project.Apply(getProjectResult => getProjectResult.Number)}-compute@developer.gserviceaccount.com",
Project = "my-project-name",
MaxJobExecutionLifetime = "100s",
KmsKey = "234jn2kjn42k3n423",
},
Spark = new Gcp.DataPlex.Inputs.TaskSparkArgs
{
PythonScriptFile = "gs://dataproc-examples/pyspark/hello-world/hello-world.py",
},
Project = "my-project-name",
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.dataplex.Lake;
import com.pulumi.gcp.dataplex.LakeArgs;
import com.pulumi.gcp.dataplex.Task;
import com.pulumi.gcp.dataplex.TaskArgs;
import com.pulumi.gcp.dataplex.inputs.TaskTriggerSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskExecutionSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskSparkArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
.build());
var example = new Lake("example", LakeArgs.builder()
.name("tf-test-lake_56529")
.location("us-central1")
.project("my-project-name")
.build());
var exampleTask = new Task("exampleTask", TaskArgs.builder()
.taskId("tf-test-task_75413")
.location("us-central1")
.lake(example.name())
.description("Test Task Basic")
.displayName("task-basic")
.labels(Map.of("count", "3"))
.triggerSpec(TaskTriggerSpecArgs.builder()
.type("RECURRING")
.disabled(false)
.maxRetries(3)
.startTime("2023-10-02T15:01:23Z")
.schedule("1 * * * *")
.build())
.executionSpec(TaskExecutionSpecArgs.builder()
.serviceAccount(String.format("%s-compute@developer.gserviceaccount.com", project.number()))
.project("my-project-name")
.maxJobExecutionLifetime("100s")
.kmsKey("234jn2kjn42k3n423")
.build())
.spark(TaskSparkArgs.builder()
.pythonScriptFile("gs://dataproc-examples/pyspark/hello-world/hello-world.py")
.build())
.project("my-project-name")
.build());
}
}
resources:
example:
type: gcp:dataplex:Lake
properties:
name: tf-test-lake_56529
location: us-central1
project: my-project-name
exampleTask:
type: gcp:dataplex:Task
name: example
properties:
taskId: tf-test-task_75413
location: us-central1
lake: ${example.name}
description: Test Task Basic
displayName: task-basic
labels:
count: '3'
triggerSpec:
type: RECURRING
disabled: false
maxRetries: 3
startTime: 2023-10-02T15:01:23Z
schedule: 1 * * * *
executionSpec:
serviceAccount: ${project.number}-compute@developer.gserviceaccount.com
project: my-project-name
maxJobExecutionLifetime: 100s
kmsKey: 234jn2kjn42k3n423
spark:
pythonScriptFile: gs://dataproc-examples/pyspark/hello-world/hello-world.py
project: my-project-name
variables:
project:
fn::invoke:
function: gcp:organizations:getProject
arguments: {}
The triggerSpec defines when the task runs. Setting type to "RECURRING" enables cron-style scheduling via the schedule property ("1 * * * *" means hourly at minute 1). The maxRetries property controls automatic retry attempts on failure. The executionSpec specifies the service account that runs the job, while the spark block points to your Python script in GCS.
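The schedule and service account do not have to be hard-coded. Here is a minimal TypeScript sketch that reads them from stack configuration instead, assuming hypothetical config keys taskSchedule and taskServiceAccount and reusing the example lake defined above:
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const config = new pulumi.Config();
// Hypothetical config keys; default to a nightly run at 03:00 UTC if no schedule is set.
const schedule = config.get("taskSchedule") ?? "0 3 * * *";
const serviceAccount = config.require("taskServiceAccount");
const nightlyTask = new gcp.dataplex.Task("nightly", {
    taskId: "nightly-etl",
    location: "us-central1",
    lake: example.name, // the lake from the example above
    triggerSpec: {
        type: "RECURRING",
        schedule: schedule,
        maxRetries: 3,
    },
    executionSpec: {
        serviceAccount: serviceAccount,
    },
    spark: {
        pythonScriptFile: "gs://my-scripts-bucket/etl/nightly.py", // hypothetical script path
    },
    project: "my-project-name",
});
Set the values per stack with pulumi config set taskSchedule "0 3 * * *" and pulumi config set taskServiceAccount <email>, so dev and prod stacks can run on different schedules without code changes.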
Configure Spark infrastructure and dependencies
Production Spark jobs need control over executor counts, container images, and network placement.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
// VPC network
const _default = new gcp.compute.Network("default", {
name: "tf-test-workstation-cluster_55138",
autoCreateSubnetworks: true,
});
const project = gcp.organizations.getProject({});
const exampleSpark = new gcp.dataplex.Lake("example_spark", {
name: "tf-test-lake_37559",
location: "us-central1",
project: "my-project-name",
});
const exampleSparkTask = new gcp.dataplex.Task("example_spark", {
taskId: "tf-test-task_91980",
location: "us-central1",
lake: exampleSpark.name,
triggerSpec: {
type: "ON_DEMAND",
},
description: "task-spark-terraform",
executionSpec: {
serviceAccount: project.then(project => `${project.number}-compute@developer.gserviceaccount.com`),
args: {
TASK_ARGS: "--output_location,gs://spark-job/task-result, --output_format, json",
},
},
spark: {
infrastructureSpec: {
batch: {
executorsCount: 2,
maxExecutorsCount: 100,
},
containerImage: {
image: "test-image",
javaJars: ["test-java-jars.jar"],
pythonPackages: ["gs://bucket-name/my/path/to/lib.tar.gz"],
properties: {
name: "wrench",
mass: "1.3kg",
count: "3",
},
},
vpcNetwork: {
networkTags: ["test-network-tag"],
subNetwork: _default.id,
},
},
fileUris: ["gs://terrafrom-test/test.csv"],
archiveUris: ["gs://terraform-test/test.csv"],
sqlScript: "show databases",
},
project: "my-project-name",
});
import pulumi
import pulumi_gcp as gcp
# VPC network
default = gcp.compute.Network("default",
name="tf-test-workstation-cluster_55138",
auto_create_subnetworks=True)
project = gcp.organizations.get_project()
example_spark = gcp.dataplex.Lake("example_spark",
name="tf-test-lake_37559",
location="us-central1",
project="my-project-name")
example_spark_task = gcp.dataplex.Task("example_spark",
task_id="tf-test-task_91980",
location="us-central1",
lake=example_spark.name,
trigger_spec={
"type": "ON_DEMAND",
},
description="task-spark-terraform",
execution_spec={
"service_account": f"{project.number}-compute@developer.gserviceaccount.com",
"args": {
"TASK_ARGS": "--output_location,gs://spark-job/task-result, --output_format, json",
},
},
spark={
"infrastructure_spec": {
"batch": {
"executors_count": 2,
"max_executors_count": 100,
},
"container_image": {
"image": "test-image",
"java_jars": ["test-java-jars.jar"],
"python_packages": ["gs://bucket-name/my/path/to/lib.tar.gz"],
"properties": {
"name": "wrench",
"mass": "1.3kg",
"count": "3",
},
},
"vpc_network": {
"network_tags": ["test-network-tag"],
"sub_network": default.id,
},
},
"file_uris": ["gs://terrafrom-test/test.csv"],
"archive_uris": ["gs://terraform-test/test.csv"],
"sql_script": "show databases",
},
project="my-project-name")
package main
import (
"fmt"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/compute"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/dataplex"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
// VPC network
_default, err := compute.NewNetwork(ctx, "default", &compute.NetworkArgs{
Name: pulumi.String("tf-test-workstation-cluster_55138"),
AutoCreateSubnetworks: pulumi.Bool(true),
})
if err != nil {
return err
}
project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
if err != nil {
return err
}
exampleSpark, err := dataplex.NewLake(ctx, "example_spark", &dataplex.LakeArgs{
Name: pulumi.String("tf-test-lake_37559"),
Location: pulumi.String("us-central1"),
Project: pulumi.String("my-project-name"),
})
if err != nil {
return err
}
_, err = dataplex.NewTask(ctx, "example_spark", &dataplex.TaskArgs{
TaskId: pulumi.String("tf-test-task_91980"),
Location: pulumi.String("us-central1"),
Lake: exampleSpark.Name,
TriggerSpec: &dataplex.TaskTriggerSpecArgs{
Type: pulumi.String("ON_DEMAND"),
},
Description: pulumi.String("task-spark-terraform"),
ExecutionSpec: &dataplex.TaskExecutionSpecArgs{
ServiceAccount: pulumi.Sprintf("%v-compute@developer.gserviceaccount.com", project.Number),
Args: pulumi.StringMap{
"TASK_ARGS": pulumi.String("--output_location,gs://spark-job/task-result, --output_format, json"),
},
},
Spark: &dataplex.TaskSparkArgs{
InfrastructureSpec: &dataplex.TaskSparkInfrastructureSpecArgs{
Batch: &dataplex.TaskSparkInfrastructureSpecBatchArgs{
ExecutorsCount: pulumi.Int(2),
MaxExecutorsCount: pulumi.Int(100),
},
ContainerImage: &dataplex.TaskSparkInfrastructureSpecContainerImageArgs{
Image: pulumi.String("test-image"),
JavaJars: pulumi.StringArray{
pulumi.String("test-java-jars.jar"),
},
PythonPackages: pulumi.StringArray{
pulumi.String("gs://bucket-name/my/path/to/lib.tar.gz"),
},
Properties: pulumi.StringMap{
"name": pulumi.String("wrench"),
"mass": pulumi.String("1.3kg"),
"count": pulumi.String("3"),
},
},
VpcNetwork: &dataplex.TaskSparkInfrastructureSpecVpcNetworkArgs{
NetworkTags: pulumi.StringArray{
pulumi.String("test-network-tag"),
},
SubNetwork: _default.ID(),
},
},
FileUris: pulumi.StringArray{
pulumi.String("gs://terrafrom-test/test.csv"),
},
ArchiveUris: pulumi.StringArray{
pulumi.String("gs://terraform-test/test.csv"),
},
SqlScript: pulumi.String("show databases"),
},
Project: pulumi.String("my-project-name"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
// VPC network
var @default = new Gcp.Compute.Network("default", new()
{
Name = "tf-test-workstation-cluster_55138",
AutoCreateSubnetworks = true,
});
var project = Gcp.Organizations.GetProject.Invoke();
var exampleSpark = new Gcp.DataPlex.Lake("example_spark", new()
{
Name = "tf-test-lake_37559",
Location = "us-central1",
Project = "my-project-name",
});
var exampleSparkTask = new Gcp.DataPlex.Task("example_spark", new()
{
TaskId = "tf-test-task_91980",
Location = "us-central1",
Lake = exampleSpark.Name,
TriggerSpec = new Gcp.DataPlex.Inputs.TaskTriggerSpecArgs
{
Type = "ON_DEMAND",
},
Description = "task-spark-terraform",
ExecutionSpec = new Gcp.DataPlex.Inputs.TaskExecutionSpecArgs
{
ServiceAccount = $"{project.Apply(getProjectResult => getProjectResult.Number)}-compute@developer.gserviceaccount.com",
Args =
{
{ "TASK_ARGS", "--output_location,gs://spark-job/task-result, --output_format, json" },
},
},
Spark = new Gcp.DataPlex.Inputs.TaskSparkArgs
{
InfrastructureSpec = new Gcp.DataPlex.Inputs.TaskSparkInfrastructureSpecArgs
{
Batch = new Gcp.DataPlex.Inputs.TaskSparkInfrastructureSpecBatchArgs
{
ExecutorsCount = 2,
MaxExecutorsCount = 100,
},
ContainerImage = new Gcp.DataPlex.Inputs.TaskSparkInfrastructureSpecContainerImageArgs
{
Image = "test-image",
JavaJars = new[]
{
"test-java-jars.jar",
},
PythonPackages = new[]
{
"gs://bucket-name/my/path/to/lib.tar.gz",
},
Properties =
{
{ "name", "wrench" },
{ "mass", "1.3kg" },
{ "count", "3" },
},
},
VpcNetwork = new Gcp.DataPlex.Inputs.TaskSparkInfrastructureSpecVpcNetworkArgs
{
NetworkTags = new[]
{
"test-network-tag",
},
SubNetwork = @default.Id,
},
},
FileUris = new[]
{
"gs://terrafrom-test/test.csv",
},
ArchiveUris = new[]
{
"gs://terraform-test/test.csv",
},
SqlScript = "show databases",
},
Project = "my-project-name",
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.compute.Network;
import com.pulumi.gcp.compute.NetworkArgs;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.dataplex.Lake;
import com.pulumi.gcp.dataplex.LakeArgs;
import com.pulumi.gcp.dataplex.Task;
import com.pulumi.gcp.dataplex.TaskArgs;
import com.pulumi.gcp.dataplex.inputs.TaskTriggerSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskExecutionSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskSparkArgs;
import com.pulumi.gcp.dataplex.inputs.TaskSparkInfrastructureSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskSparkInfrastructureSpecBatchArgs;
import com.pulumi.gcp.dataplex.inputs.TaskSparkInfrastructureSpecContainerImageArgs;
import com.pulumi.gcp.dataplex.inputs.TaskSparkInfrastructureSpecVpcNetworkArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
// VPC network
var default_ = new Network("default", NetworkArgs.builder()
.name("tf-test-workstation-cluster_55138")
.autoCreateSubnetworks(true)
.build());
final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
.build());
var exampleSpark = new Lake("exampleSpark", LakeArgs.builder()
.name("tf-test-lake_37559")
.location("us-central1")
.project("my-project-name")
.build());
var exampleSparkTask = new Task("exampleSparkTask", TaskArgs.builder()
.taskId("tf-test-task_91980")
.location("us-central1")
.lake(exampleSpark.name())
.triggerSpec(TaskTriggerSpecArgs.builder()
.type("ON_DEMAND")
.build())
.description("task-spark-terraform")
.executionSpec(TaskExecutionSpecArgs.builder()
.serviceAccount(String.format("%s-compute@developer.gserviceaccount.com", project.number()))
.args(Map.of("TASK_ARGS", "--output_location,gs://spark-job/task-result, --output_format, json"))
.build())
.spark(TaskSparkArgs.builder()
.infrastructureSpec(TaskSparkInfrastructureSpecArgs.builder()
.batch(TaskSparkInfrastructureSpecBatchArgs.builder()
.executorsCount(2)
.maxExecutorsCount(100)
.build())
.containerImage(TaskSparkInfrastructureSpecContainerImageArgs.builder()
.image("test-image")
.javaJars("test-java-jars.jar")
.pythonPackages("gs://bucket-name/my/path/to/lib.tar.gz")
.properties(Map.ofEntries(
Map.entry("name", "wrench"),
Map.entry("mass", "1.3kg"),
Map.entry("count", "3")
))
.build())
.vpcNetwork(TaskSparkInfrastructureSpecVpcNetworkArgs.builder()
.networkTags("test-network-tag")
.subNetwork(default_.id())
.build())
.build())
.fileUris("gs://terrafrom-test/test.csv")
.archiveUris("gs://terraform-test/test.csv")
.sqlScript("show databases")
.build())
.project("my-project-name")
.build());
}
}
resources:
# VPC network
default:
type: gcp:compute:Network
properties:
name: tf-test-workstation-cluster_55138
autoCreateSubnetworks: true
exampleSpark:
type: gcp:dataplex:Lake
name: example_spark
properties:
name: tf-test-lake_37559
location: us-central1
project: my-project-name
exampleSparkTask:
type: gcp:dataplex:Task
name: example_spark
properties:
taskId: tf-test-task_91980
location: us-central1
lake: ${exampleSpark.name}
triggerSpec:
type: ON_DEMAND
description: task-spark-terraform
executionSpec:
serviceAccount: ${project.number}-compute@developer.gserviceaccount.com
args:
TASK_ARGS: --output_location,gs://spark-job/task-result, --output_format, json
spark:
infrastructureSpec:
batch:
executorsCount: 2
maxExecutorsCount: 100
containerImage:
image: test-image
javaJars:
- test-java-jars.jar
pythonPackages:
- gs://bucket-name/my/path/to/lib.tar.gz
properties:
name: wrench
mass: 1.3kg
count: '3'
vpcNetwork:
networkTags:
- test-network-tag
subNetwork: ${default.id}
fileUris:
- gs://terraform-test/test.csv
archiveUris:
- gs://terraform-test/test.csv
sqlScript: show databases
project: my-project-name
variables:
project:
fn::invoke:
function: gcp:organizations:getProject
arguments: {}
The infrastructureSpec configures compute resources and runtime dependencies. The batch block sets executor counts (executorsCount for initial allocation, maxExecutorsCount for autoscaling limits). The containerImage block specifies a custom image with Java JARs and Python packages. The vpcNetwork block places executors in your VPC with network tags for firewall rules. The sqlScript property contains the SQL to execute.
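Network tags only take effect through firewall rules that reference them. Below is a minimal TypeScript sketch of one such rule, assuming executors tagged test-network-tag need to reach each other inside the example VPC; confirm the ports and protocols your jobs actually require before applying something this broad.
import * as gcp from "@pulumi/gcp";
// Allow traffic between instances carrying the tag used in the task's vpcNetwork block.
const sparkInternal = new gcp.compute.Firewall("spark-internal", {
    name: "allow-spark-internal",
    network: _default.id, // the network from the example above
    direction: "INGRESS",
    sourceTags: ["test-network-tag"],
    targetTags: ["test-network-tag"],
    allows: [
        { protocol: "tcp", ports: ["0-65535"] },
        { protocol: "udp", ports: ["0-65535"] },
        { protocol: "icmp" },
    ],
});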
Execute Jupyter notebooks with scheduled runs
Data scientists develop workflows in Jupyter notebooks that need production schedules.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
// VPC network
const _default = new gcp.compute.Network("default", {
name: "tf-test-workstation-cluster_37118",
autoCreateSubnetworks: true,
});
const project = gcp.organizations.getProject({});
const exampleNotebook = new gcp.dataplex.Lake("example_notebook", {
name: "tf-test-lake_80332",
location: "us-central1",
project: "my-project-name",
});
const exampleNotebookTask = new gcp.dataplex.Task("example_notebook", {
taskId: "tf-test-task_13293",
location: "us-central1",
lake: exampleNotebook.name,
triggerSpec: {
type: "RECURRING",
schedule: "1 * * * *",
},
executionSpec: {
serviceAccount: project.then(project => `${project.number}-compute@developer.gserviceaccount.com`),
args: {
TASK_ARGS: "--output_location,gs://spark-job-jars-anrajitha/task-result, --output_format, json",
},
},
notebook: {
notebook: "gs://terraform-test/test-notebook.ipynb",
infrastructureSpec: {
batch: {
executorsCount: 2,
maxExecutorsCount: 100,
},
containerImage: {
image: "test-image",
javaJars: ["test-java-jars.jar"],
pythonPackages: ["gs://bucket-name/my/path/to/lib.tar.gz"],
properties: {
name: "wrench",
mass: "1.3kg",
count: "3",
},
},
vpcNetwork: {
networkTags: ["test-network-tag"],
network: _default.id,
},
},
fileUris: ["gs://terraform-test/test.csv"],
archiveUris: ["gs://terraform-test/test.csv"],
},
project: "my-project-name",
});
import pulumi
import pulumi_gcp as gcp
# VPC network
default = gcp.compute.Network("default",
name="tf-test-workstation-cluster_37118",
auto_create_subnetworks=True)
project = gcp.organizations.get_project()
example_notebook = gcp.dataplex.Lake("example_notebook",
name="tf-test-lake_80332",
location="us-central1",
project="my-project-name")
example_notebook_task = gcp.dataplex.Task("example_notebook",
task_id="tf-test-task_13293",
location="us-central1",
lake=example_notebook.name,
trigger_spec={
"type": "RECURRING",
"schedule": "1 * * * *",
},
execution_spec={
"service_account": f"{project.number}-compute@developer.gserviceaccount.com",
"args": {
"TASK_ARGS": "--output_location,gs://spark-job-jars-anrajitha/task-result, --output_format, json",
},
},
notebook={
"notebook": "gs://terraform-test/test-notebook.ipynb",
"infrastructure_spec": {
"batch": {
"executors_count": 2,
"max_executors_count": 100,
},
"container_image": {
"image": "test-image",
"java_jars": ["test-java-jars.jar"],
"python_packages": ["gs://bucket-name/my/path/to/lib.tar.gz"],
"properties": {
"name": "wrench",
"mass": "1.3kg",
"count": "3",
},
},
"vpc_network": {
"network_tags": ["test-network-tag"],
"network": default.id,
},
},
"file_uris": ["gs://terraform-test/test.csv"],
"archive_uris": ["gs://terraform-test/test.csv"],
},
project="my-project-name")
package main
import (
"fmt"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/compute"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/dataplex"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/organizations"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
// VPC network
_default, err := compute.NewNetwork(ctx, "default", &compute.NetworkArgs{
Name: pulumi.String("tf-test-workstation-cluster_37118"),
AutoCreateSubnetworks: pulumi.Bool(true),
})
if err != nil {
return err
}
project, err := organizations.LookupProject(ctx, &organizations.LookupProjectArgs{}, nil)
if err != nil {
return err
}
exampleNotebook, err := dataplex.NewLake(ctx, "example_notebook", &dataplex.LakeArgs{
Name: pulumi.String("tf-test-lake_80332"),
Location: pulumi.String("us-central1"),
Project: pulumi.String("my-project-name"),
})
if err != nil {
return err
}
_, err = dataplex.NewTask(ctx, "example_notebook", &dataplex.TaskArgs{
TaskId: pulumi.String("tf-test-task_13293"),
Location: pulumi.String("us-central1"),
Lake: exampleNotebook.Name,
TriggerSpec: &dataplex.TaskTriggerSpecArgs{
Type: pulumi.String("RECURRING"),
Schedule: pulumi.String("1 * * * *"),
},
ExecutionSpec: &dataplex.TaskExecutionSpecArgs{
ServiceAccount: pulumi.Sprintf("%v-compute@developer.gserviceaccount.com", project.Number),
Args: pulumi.StringMap{
"TASK_ARGS": pulumi.String("--output_location,gs://spark-job-jars-anrajitha/task-result, --output_format, json"),
},
},
Notebook: &dataplex.TaskNotebookArgs{
Notebook: pulumi.String("gs://terraform-test/test-notebook.ipynb"),
InfrastructureSpec: &dataplex.TaskNotebookInfrastructureSpecArgs{
Batch: &dataplex.TaskNotebookInfrastructureSpecBatchArgs{
ExecutorsCount: pulumi.Int(2),
MaxExecutorsCount: pulumi.Int(100),
},
ContainerImage: &dataplex.TaskNotebookInfrastructureSpecContainerImageArgs{
Image: pulumi.String("test-image"),
JavaJars: pulumi.StringArray{
pulumi.String("test-java-jars.jar"),
},
PythonPackages: pulumi.StringArray{
pulumi.String("gs://bucket-name/my/path/to/lib.tar.gz"),
},
Properties: pulumi.StringMap{
"name": pulumi.String("wrench"),
"mass": pulumi.String("1.3kg"),
"count": pulumi.String("3"),
},
},
VpcNetwork: &dataplex.TaskNotebookInfrastructureSpecVpcNetworkArgs{
NetworkTags: pulumi.StringArray{
pulumi.String("test-network-tag"),
},
Network: _default.ID(),
},
},
FileUris: pulumi.StringArray{
pulumi.String("gs://terraform-test/test.csv"),
},
ArchiveUris: pulumi.StringArray{
pulumi.String("gs://terraform-test/test.csv"),
},
},
Project: pulumi.String("my-project-name"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
// VPC network
var @default = new Gcp.Compute.Network("default", new()
{
Name = "tf-test-workstation-cluster_37118",
AutoCreateSubnetworks = true,
});
var project = Gcp.Organizations.GetProject.Invoke();
var exampleNotebook = new Gcp.DataPlex.Lake("example_notebook", new()
{
Name = "tf-test-lake_80332",
Location = "us-central1",
Project = "my-project-name",
});
var exampleNotebookTask = new Gcp.DataPlex.Task("example_notebook", new()
{
TaskId = "tf-test-task_13293",
Location = "us-central1",
Lake = exampleNotebook.Name,
TriggerSpec = new Gcp.DataPlex.Inputs.TaskTriggerSpecArgs
{
Type = "RECURRING",
Schedule = "1 * * * *",
},
ExecutionSpec = new Gcp.DataPlex.Inputs.TaskExecutionSpecArgs
{
ServiceAccount = $"{project.Apply(getProjectResult => getProjectResult.Number)}-compute@developer.gserviceaccount.com",
Args =
{
{ "TASK_ARGS", "--output_location,gs://spark-job-jars-anrajitha/task-result, --output_format, json" },
},
},
Notebook = new Gcp.DataPlex.Inputs.TaskNotebookArgs
{
Notebook = "gs://terraform-test/test-notebook.ipynb",
InfrastructureSpec = new Gcp.DataPlex.Inputs.TaskNotebookInfrastructureSpecArgs
{
Batch = new Gcp.DataPlex.Inputs.TaskNotebookInfrastructureSpecBatchArgs
{
ExecutorsCount = 2,
MaxExecutorsCount = 100,
},
ContainerImage = new Gcp.DataPlex.Inputs.TaskNotebookInfrastructureSpecContainerImageArgs
{
Image = "test-image",
JavaJars = new[]
{
"test-java-jars.jar",
},
PythonPackages = new[]
{
"gs://bucket-name/my/path/to/lib.tar.gz",
},
Properties =
{
{ "name", "wrench" },
{ "mass", "1.3kg" },
{ "count", "3" },
},
},
VpcNetwork = new Gcp.DataPlex.Inputs.TaskNotebookInfrastructureSpecVpcNetworkArgs
{
NetworkTags = new[]
{
"test-network-tag",
},
Network = @default.Id,
},
},
FileUris = new[]
{
"gs://terraform-test/test.csv",
},
ArchiveUris = new[]
{
"gs://terraform-test/test.csv",
},
},
Project = "my-project-name",
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.compute.Network;
import com.pulumi.gcp.compute.NetworkArgs;
import com.pulumi.gcp.organizations.OrganizationsFunctions;
import com.pulumi.gcp.organizations.inputs.GetProjectArgs;
import com.pulumi.gcp.dataplex.Lake;
import com.pulumi.gcp.dataplex.LakeArgs;
import com.pulumi.gcp.dataplex.Task;
import com.pulumi.gcp.dataplex.TaskArgs;
import com.pulumi.gcp.dataplex.inputs.TaskTriggerSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskExecutionSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskNotebookArgs;
import com.pulumi.gcp.dataplex.inputs.TaskNotebookInfrastructureSpecArgs;
import com.pulumi.gcp.dataplex.inputs.TaskNotebookInfrastructureSpecBatchArgs;
import com.pulumi.gcp.dataplex.inputs.TaskNotebookInfrastructureSpecContainerImageArgs;
import com.pulumi.gcp.dataplex.inputs.TaskNotebookInfrastructureSpecVpcNetworkArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
// VPC network
var default_ = new Network("default", NetworkArgs.builder()
.name("tf-test-workstation-cluster_37118")
.autoCreateSubnetworks(true)
.build());
final var project = OrganizationsFunctions.getProject(GetProjectArgs.builder()
.build());
var exampleNotebook = new Lake("exampleNotebook", LakeArgs.builder()
.name("tf-test-lake_80332")
.location("us-central1")
.project("my-project-name")
.build());
var exampleNotebookTask = new Task("exampleNotebookTask", TaskArgs.builder()
.taskId("tf-test-task_13293")
.location("us-central1")
.lake(exampleNotebook.name())
.triggerSpec(TaskTriggerSpecArgs.builder()
.type("RECURRING")
.schedule("1 * * * *")
.build())
.executionSpec(TaskExecutionSpecArgs.builder()
.serviceAccount(String.format("%s-compute@developer.gserviceaccount.com", project.number()))
.args(Map.of("TASK_ARGS", "--output_location,gs://spark-job-jars-anrajitha/task-result, --output_format, json"))
.build())
.notebook(TaskNotebookArgs.builder()
.notebook("gs://terraform-test/test-notebook.ipynb")
.infrastructureSpec(TaskNotebookInfrastructureSpecArgs.builder()
.batch(TaskNotebookInfrastructureSpecBatchArgs.builder()
.executorsCount(2)
.maxExecutorsCount(100)
.build())
.containerImage(TaskNotebookInfrastructureSpecContainerImageArgs.builder()
.image("test-image")
.javaJars("test-java-jars.jar")
.pythonPackages("gs://bucket-name/my/path/to/lib.tar.gz")
.properties(Map.ofEntries(
Map.entry("name", "wrench"),
Map.entry("mass", "1.3kg"),
Map.entry("count", "3")
))
.build())
.vpcNetwork(TaskNotebookInfrastructureSpecVpcNetworkArgs.builder()
.networkTags("test-network-tag")
.network(default_.id())
.build())
.build())
.fileUris("gs://terraform-test/test.csv")
.archiveUris("gs://terraform-test/test.csv")
.build())
.project("my-project-name")
.build());
}
}
resources:
# VPC network
default:
type: gcp:compute:Network
properties:
name: tf-test-workstation-cluster_37118
autoCreateSubnetworks: true
exampleNotebook:
type: gcp:dataplex:Lake
name: example_notebook
properties:
name: tf-test-lake_80332
location: us-central1
project: my-project-name
exampleNotebookTask:
type: gcp:dataplex:Task
name: example_notebook
properties:
taskId: tf-test-task_13293
location: us-central1
lake: ${exampleNotebook.name}
triggerSpec:
type: RECURRING
schedule: 1 * * * *
executionSpec:
serviceAccount: ${project.number}-compute@developer.gserviceaccount.com
args:
TASK_ARGS: --output_location,gs://spark-job-jars-anrajitha/task-result, --output_format, json
notebook:
notebook: gs://terraform-test/test-notebook.ipynb
infrastructureSpec:
batch:
executorsCount: 2
maxExecutorsCount: 100
containerImage:
image: test-image
javaJars:
- test-java-jars.jar
pythonPackages:
- gs://bucket-name/my/path/to/lib.tar.gz
properties:
name: wrench
mass: 1.3kg
count: '3'
vpcNetwork:
networkTags:
- test-network-tag
network: ${default.id}
fileUris:
- gs://terraform-test/test.csv
archiveUris:
- gs://terraform-test/test.csv
project: my-project-name
variables:
project:
fn::invoke:
function: gcp:organizations:getProject
arguments: {}
The notebook block replaces the spark block for notebook-based tasks. The notebook property points to your .ipynb file in GCS. The infrastructureSpec works identically to Spark tasks, configuring executors, container images, and VPC placement. The fileUris and archiveUris properties make additional files available to the notebook at runtime.
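The notebook itself can also be managed by Pulumi rather than uploaded by hand, so a redeploy picks up the latest revision. A minimal TypeScript sketch, assuming a hypothetical bucket my-notebooks-bucket and a local ./notebooks/analysis.ipynb file:
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
// Upload the local notebook file to GCS.
const notebookObject = new gcp.storage.BucketObject("analysis-notebook", {
    bucket: "my-notebooks-bucket",
    name: "notebooks/analysis.ipynb",
    source: new pulumi.asset.FileAsset("./notebooks/analysis.ipynb"),
});
// Build the gs:// URI to pass as the task's notebook property.
const notebookUri = pulumi.interpolate`gs://${notebookObject.bucket}/${notebookObject.name}`;
Passing notebookUri as the task's notebook property also makes the task depend on the upload, so the object exists before the task references it.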
Beyond these examples
These snippets focus on specific task-level features: scheduling and retry configuration, Spark and notebook execution, and infrastructure and dependency management. They’re intentionally minimal rather than full data pipeline deployments.
The examples reference pre-existing infrastructure such as Dataplex lakes, VPC networks and subnets, GCS buckets with scripts, notebooks, and dependencies, and service accounts with appropriate IAM permissions. They focus on task configuration rather than provisioning the surrounding infrastructure.
To keep things focused, several task capabilities are shown in the examples without further explanation, or not covered at all:
- On-demand triggers without schedules (the ON_DEMAND trigger type used in the Spark example)
- Customer-managed encryption keys (the kmsKey property on executionSpec)
- Job execution lifetime limits (the maxJobExecutionLifetime property)
- Task monitoring and execution status tracking
These omissions are intentional: the goal is to illustrate how each task feature is wired, not provide drop-in pipeline modules. See the Dataplex Task resource reference for all available configuration options.
Frequently Asked Questions
Task Types & Execution
pythonScriptFile, Advanced Spark tasks with sqlScript and infrastructure configuration, or Notebook tasks that execute Jupyter notebooks from Cloud Storage.executionSpec.serviceAccount. The examples use the default compute service account format: {project-number}-compute@developer.gserviceaccount.com.Scheduling & Triggers
triggerSpec.type to RECURRING and provide a cron schedule. For example, schedule: "1 * * * *" runs the task at 1 minute past every hour. You can also configure maxRetries, disabled, and startTime.triggerSpec.type to ON_DEMAND. No schedule is required for on-demand tasks.Infrastructure & Networking
vpcNetwork to infrastructureSpec with networkTags and either subNetwork (for Spark tasks) or network (for Notebook tasks). Both reference the VPC network resource ID.infrastructureSpec.batch with executorsCount (initial count) and maxExecutorsCount (maximum for scaling). For example, start with 2 executors and scale up to 100.containerImage in infrastructureSpec with the image name, and optionally include javaJars, pythonPackages, and custom properties.Configuration & Lifecycle
project, lake, location, and taskId properties are immutable. Changing any of these requires recreating the task.labels field is non-authoritative and only manages labels defined in your configuration. To see all labels on the resource (including those added by other clients or services), use the effectiveLabels output property.Using a different cloud?