Create GCP BigQuery Routines

The gcp:bigquery/routine:Routine resource, part of the Pulumi GCP provider, defines user-defined functions and stored procedures within BigQuery datasets. This guide focuses on four capabilities: SQL stored procedures and JavaScript scalar functions, table-valued functions, PySpark and Scala Spark procedures, and remote function and data masking integration.

Routines belong to datasets and may reference BigQuery Connections for Spark or remote functions, Cloud Storage files, or external endpoints. The examples are intentionally small. Combine them with your own datasets, connections, and infrastructure.

Create a SQL stored procedure

Data teams often need to encapsulate multi-step SQL logic into reusable procedures that can be called from queries or scheduled jobs.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// Dataset that holds the routine.
const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
// SQL routine registered as a PROCEDURE in that dataset, with securityMode INVOKER.
// NOTE(review): definitionBody is a CREATE FUNCTION statement even though routineType
// is PROCEDURE — confirm this is the intended procedure body.
const sproc = new gcp.bigquery.Routine("sproc", {
    datasetId: test.datasetId,
    routineId: "routine_id",
    routineType: "PROCEDURE",
    language: "SQL",
    securityMode: "INVOKER",
    definitionBody: "CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);",
});
import pulumi
import pulumi_gcp as gcp

# Dataset that holds the routine.
test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
# SQL routine registered as a PROCEDURE in that dataset, with security_mode INVOKER.
# NOTE(review): definition_body is a CREATE FUNCTION statement even though
# routine_type is PROCEDURE — confirm this is the intended procedure body.
sproc = gcp.bigquery.Routine("sproc",
    dataset_id=test.dataset_id,
    routine_id="routine_id",
    routine_type="PROCEDURE",
    language="SQL",
    security_mode="INVOKER",
    definition_body="CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);")
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

// main declares a dataset and a SQL PROCEDURE routine inside it.
// Any error from resource registration is returned to pulumi.Run.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Dataset that holds the routine.
		test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
			DatasetId: pulumi.String("dataset_id"),
		})
		if err != nil {
			return err
		}
		// NOTE(review): DefinitionBody is a CREATE FUNCTION statement even though
		// RoutineType is PROCEDURE — confirm this is the intended procedure body.
		_, err = bigquery.NewRoutine(ctx, "sproc", &bigquery.RoutineArgs{
			DatasetId:      test.DatasetId,
			RoutineId:      pulumi.String("routine_id"),
			RoutineType:    pulumi.String("PROCEDURE"),
			Language:       pulumi.String("SQL"),
			SecurityMode:   pulumi.String("INVOKER"),
			DefinitionBody: pulumi.String("CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

// Declares a dataset and a SQL PROCEDURE routine inside it.
return await Deployment.RunAsync(() => 
{
    // Dataset that holds the routine.
    var test = new Gcp.BigQuery.Dataset("test", new()
    {
        DatasetId = "dataset_id",
    });

    // NOTE(review): DefinitionBody is a CREATE FUNCTION statement even though
    // RoutineType is PROCEDURE — confirm this is the intended procedure body.
    var sproc = new Gcp.BigQuery.Routine("sproc", new()
    {
        DatasetId = test.DatasetId,
        RoutineId = "routine_id",
        RoutineType = "PROCEDURE",
        Language = "SQL",
        SecurityMode = "INVOKER",
        DefinitionBody = "CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

/** Pulumi program that declares a dataset and a SQL PROCEDURE routine inside it. */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /** Registers the dataset and the routine with the Pulumi engine. */
    public static void stack(Context ctx) {
        // Dataset that holds the routine.
        var test = new Dataset("test", DatasetArgs.builder()
            .datasetId("dataset_id")
            .build());

        // NOTE(review): definitionBody is a CREATE FUNCTION statement even though
        // routineType is PROCEDURE — confirm this is the intended procedure body.
        var sproc = new Routine("sproc", RoutineArgs.builder()
            .datasetId(test.datasetId())
            .routineId("routine_id")
            .routineType("PROCEDURE")
            .language("SQL")
            .securityMode("INVOKER")
            .definitionBody("CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);")
            .build());

    }
}
resources:
  # Dataset that holds the routine.
  test:
    type: gcp:bigquery:Dataset
    properties:
      datasetId: dataset_id
  # SQL routine registered as a PROCEDURE in that dataset, with securityMode INVOKER.
  # NOTE(review): definitionBody is a CREATE FUNCTION statement even though
  # routineType is PROCEDURE — confirm this is the intended procedure body.
  sproc:
    type: gcp:bigquery:Routine
    properties:
      datasetId: ${test.datasetId}
      routineId: routine_id
      routineType: PROCEDURE
      language: SQL
      securityMode: INVOKER
      definitionBody: CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);

The routineType property set to PROCEDURE defines this as a stored procedure rather than a function. The definitionBody contains the SQL logic to execute. The securityMode property controls whether the routine runs with the caller’s permissions (INVOKER) or the definer’s permissions (DEFINER).

Define a JavaScript scalar function with typed arguments

When SQL expressions aren’t sufficient, JavaScript functions provide custom logic with explicit type contracts for inputs and outputs.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// Dataset that holds the routine.
const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
// JavaScript scalar function taking two FLOAT64 arguments and returning FLOAT64.
// For JAVASCRIPT routines definitionBody is the function body itself (the text of
// the AS clause), not a CREATE FUNCTION statement; the signature comes from
// `arguments` and `returnType`.
const sproc = new gcp.bigquery.Routine("sproc", {
    datasetId: test.datasetId,
    routineId: "routine_id",
    routineType: "SCALAR_FUNCTION",
    language: "JAVASCRIPT",
    definitionBody: "return x*y;",
    arguments: [
        {
            name: "x",
            dataType: "{\"typeKind\" :  \"FLOAT64\"}",
        },
        {
            name: "y",
            dataType: "{\"typeKind\" :  \"FLOAT64\"}",
        },
    ],
    returnType: "{\"typeKind\" :  \"FLOAT64\"}",
});
import pulumi
import pulumi_gcp as gcp

# Dataset that holds the routine.
test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
# JavaScript scalar function taking two FLOAT64 arguments and returning FLOAT64.
# For JAVASCRIPT routines definition_body is the function body itself (the text of
# the AS clause), not a CREATE FUNCTION statement; the signature comes from
# `arguments` and `return_type`.
sproc = gcp.bigquery.Routine("sproc",
    dataset_id=test.dataset_id,
    routine_id="routine_id",
    routine_type="SCALAR_FUNCTION",
    language="JAVASCRIPT",
    definition_body="return x*y;",
    arguments=[
        {
            "name": "x",
            "data_type": "{\"typeKind\" :  \"FLOAT64\"}",
        },
        {
            "name": "y",
            "data_type": "{\"typeKind\" :  \"FLOAT64\"}",
        },
    ],
    return_type="{\"typeKind\" :  \"FLOAT64\"}")
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

// main declares a dataset and a JavaScript scalar function inside it.
// Any error from resource registration is returned to pulumi.Run.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Dataset that holds the routine.
		test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
			DatasetId: pulumi.String("dataset_id"),
		})
		if err != nil {
			return err
		}
		// For JAVASCRIPT routines DefinitionBody is the function body itself (the
		// text of the AS clause), not a CREATE FUNCTION statement; the signature
		// comes from Arguments and ReturnType.
		_, err = bigquery.NewRoutine(ctx, "sproc", &bigquery.RoutineArgs{
			DatasetId:      test.DatasetId,
			RoutineId:      pulumi.String("routine_id"),
			RoutineType:    pulumi.String("SCALAR_FUNCTION"),
			Language:       pulumi.String("JAVASCRIPT"),
			DefinitionBody: pulumi.String("return x*y;"),
			Arguments: bigquery.RoutineArgumentArray{
				&bigquery.RoutineArgumentArgs{
					Name:     pulumi.String("x"),
					DataType: pulumi.String("{\"typeKind\" :  \"FLOAT64\"}"),
				},
				&bigquery.RoutineArgumentArgs{
					Name:     pulumi.String("y"),
					DataType: pulumi.String("{\"typeKind\" :  \"FLOAT64\"}"),
				},
			},
			ReturnType: pulumi.String("{\"typeKind\" :  \"FLOAT64\"}"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

// Declares a dataset and a JavaScript scalar function inside it.
return await Deployment.RunAsync(() =>
{
    // Dataset that holds the routine.
    var test = new Gcp.BigQuery.Dataset("test", new()
    {
        DatasetId = "dataset_id",
    });

    // For JAVASCRIPT routines DefinitionBody is the function body itself (the text
    // of the AS clause), not a CREATE FUNCTION statement; the signature comes from
    // Arguments and ReturnType.
    var sproc = new Gcp.BigQuery.Routine("sproc", new()
    {
        DatasetId = test.DatasetId,
        RoutineId = "routine_id",
        RoutineType = "SCALAR_FUNCTION",
        Language = "JAVASCRIPT",
        DefinitionBody = "return x*y;",
        Arguments = new[]
        {
            new Gcp.BigQuery.Inputs.RoutineArgumentArgs
            {
                Name = "x",
                DataType = "{\"typeKind\" :  \"FLOAT64\"}",
            },
            new Gcp.BigQuery.Inputs.RoutineArgumentArgs
            {
                Name = "y",
                DataType = "{\"typeKind\" :  \"FLOAT64\"}",
            },
        },
        ReturnType = "{\"typeKind\" :  \"FLOAT64\"}",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineArgumentArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

/** Pulumi program that declares a dataset and a JavaScript scalar function inside it. */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /** Registers the dataset and the routine with the Pulumi engine. */
    public static void stack(Context ctx) {
        // Dataset that holds the routine.
        var test = new Dataset("test", DatasetArgs.builder()
            .datasetId("dataset_id")
            .build());

        // For JAVASCRIPT routines definitionBody is the function body itself (the
        // text of the AS clause), not a CREATE FUNCTION statement; the signature
        // comes from arguments and returnType.
        var sproc = new Routine("sproc", RoutineArgs.builder()
            .datasetId(test.datasetId())
            .routineId("routine_id")
            .routineType("SCALAR_FUNCTION")
            .language("JAVASCRIPT")
            .definitionBody("return x*y;")
            .arguments(
                RoutineArgumentArgs.builder()
                    .name("x")
                    .dataType("{\"typeKind\" :  \"FLOAT64\"}")
                    .build(),
                RoutineArgumentArgs.builder()
                    .name("y")
                    .dataType("{\"typeKind\" :  \"FLOAT64\"}")
                    .build())
            .returnType("{\"typeKind\" :  \"FLOAT64\"}")
            .build());

    }
}
resources:
  # Dataset that holds the routine.
  test:
    type: gcp:bigquery:Dataset
    properties:
      datasetId: dataset_id
  # JavaScript scalar function taking two FLOAT64 arguments and returning FLOAT64.
  # For JAVASCRIPT routines definitionBody is the function body itself (the text of
  # the AS clause), not a CREATE FUNCTION statement; the signature comes from
  # `arguments` and `returnType`.
  sproc:
    type: gcp:bigquery:Routine
    properties:
      datasetId: ${test.datasetId}
      routineId: routine_id
      routineType: SCALAR_FUNCTION
      language: JAVASCRIPT
      definitionBody: return x*y;
      arguments:
        - name: x
          dataType: '{"typeKind" :  "FLOAT64"}'
        - name: y
          dataType: '{"typeKind" :  "FLOAT64"}'
      returnType: '{"typeKind" :  "FLOAT64"}'

The routineType SCALAR_FUNCTION returns a single value. The arguments array defines the input parameters, with each dataType given as a JSON-encoded type such as {"typeKind": "FLOAT64"}. The returnType property defines the output type in the same JSON format. JavaScript functions execute in BigQuery’s JavaScript runtime.

Return table results from a function

Table-valued functions transform input parameters into result sets, enabling reusable query logic that returns multiple rows and columns.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// Dataset that holds the routine.
const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
// SQL table-valued function with one INT64 argument named "value".
// dataType and returnTableType are passed as JSON strings, so the object
// literals are serialized with JSON.stringify.
const sproc = new gcp.bigquery.Routine("sproc", {
    datasetId: test.datasetId,
    routineId: "routine_id",
    routineType: "TABLE_VALUED_FUNCTION",
    language: "SQL",
    definitionBody: "SELECT 1 + value AS value\n",
    arguments: [{
        name: "value",
        argumentKind: "FIXED_TYPE",
        dataType: JSON.stringify({
            typeKind: "INT64",
        }),
    }],
    // Result schema: a single INT64 column named "value".
    returnTableType: JSON.stringify({
        columns: [{
            name: "value",
            type: {
                typeKind: "INT64",
            },
        }],
    }),
});
import pulumi
import json
import pulumi_gcp as gcp

# Dataset that holds the routine.
test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
# SQL table-valued function with one INT64 argument named "value".
# data_type and return_table_type are passed as JSON strings, so the dicts
# are serialized with json.dumps.
sproc = gcp.bigquery.Routine("sproc",
    dataset_id=test.dataset_id,
    routine_id="routine_id",
    routine_type="TABLE_VALUED_FUNCTION",
    language="SQL",
    definition_body="SELECT 1 + value AS value\n",
    arguments=[{
        "name": "value",
        "argument_kind": "FIXED_TYPE",
        "data_type": json.dumps({
            "typeKind": "INT64",
        }),
    }],
    # Result schema: a single INT64 column named "value".
    return_table_type=json.dumps({
        "columns": [{
            "name": "value",
            "type": {
                "typeKind": "INT64",
            },
        }],
    }))
package main

import (
	"encoding/json"

	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

// main declares a dataset and a SQL table-valued function inside it.
// Any error from JSON marshalling or resource registration is returned to pulumi.Run.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Dataset that holds the routine.
		test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
			DatasetId: pulumi.String("dataset_id"),
		})
		if err != nil {
			return err
		}
		// JSON string for the argument's data type (INT64); used as DataType below.
		tmpJSON0, err := json.Marshal(map[string]interface{}{
			"typeKind": "INT64",
		})
		if err != nil {
			return err
		}
		json0 := string(tmpJSON0)
		// JSON string for the returned table schema: one INT64 column named "value".
		tmpJSON1, err := json.Marshal(map[string]interface{}{
			"columns": []map[string]interface{}{
				map[string]interface{}{
					"name": "value",
					"type": map[string]interface{}{
						"typeKind": "INT64",
					},
				},
			},
		})
		if err != nil {
			return err
		}
		json1 := string(tmpJSON1)
		// Table-valued function that takes the "value" argument declared below.
		_, err = bigquery.NewRoutine(ctx, "sproc", &bigquery.RoutineArgs{
			DatasetId:      test.DatasetId,
			RoutineId:      pulumi.String("routine_id"),
			RoutineType:    pulumi.String("TABLE_VALUED_FUNCTION"),
			Language:       pulumi.String("SQL"),
			DefinitionBody: pulumi.String("SELECT 1 + value AS value\n"),
			Arguments: bigquery.RoutineArgumentArray{
				&bigquery.RoutineArgumentArgs{
					Name:         pulumi.String("value"),
					ArgumentKind: pulumi.String("FIXED_TYPE"),
					DataType:     pulumi.String(json0),
				},
			},
			ReturnTableType: pulumi.String(json1),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using Pulumi;
using Gcp = Pulumi.Gcp;

// Declares a dataset and a SQL table-valued function inside it.
return await Deployment.RunAsync(() => 
{
    // Dataset that holds the routine.
    var test = new Gcp.BigQuery.Dataset("test", new()
    {
        DatasetId = "dataset_id",
    });

    // Table-valued function with one INT64 argument named "value".
    // DataType and ReturnTableType are passed as JSON strings, so the
    // dictionaries are serialized with JsonSerializer.Serialize.
    var sproc = new Gcp.BigQuery.Routine("sproc", new()
    {
        DatasetId = test.DatasetId,
        RoutineId = "routine_id",
        RoutineType = "TABLE_VALUED_FUNCTION",
        Language = "SQL",
        DefinitionBody = @"SELECT 1 + value AS value
",
        Arguments = new[]
        {
            new Gcp.BigQuery.Inputs.RoutineArgumentArgs
            {
                Name = "value",
                ArgumentKind = "FIXED_TYPE",
                DataType = JsonSerializer.Serialize(new Dictionary<string, object?>
                {
                    ["typeKind"] = "INT64",
                }),
            },
        },
        // Result schema: a single INT64 column named "value".
        ReturnTableType = JsonSerializer.Serialize(new Dictionary<string, object?>
        {
            ["columns"] = new[]
            {
                new Dictionary<string, object?>
                {
                    ["name"] = "value",
                    ["type"] = new Dictionary<string, object?>
                    {
                        ["typeKind"] = "INT64",
                    },
                },
            },
        }),
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineArgumentArgs;
import static com.pulumi.codegen.internal.Serialization.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

/** Pulumi program that declares a dataset and a SQL table-valued function inside it. */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /** Registers the dataset and the routine with the Pulumi engine. */
    public static void stack(Context ctx) {
        // Dataset that holds the routine.
        var test = new Dataset("test", DatasetArgs.builder()
            .datasetId("dataset_id")
            .build());

        // Table-valued function with one INT64 argument named "value".
        // dataType and returnTableType are passed as JSON strings, built here
        // with serializeJson.
        var sproc = new Routine("sproc", RoutineArgs.builder()
            .datasetId(test.datasetId())
            .routineId("routine_id")
            .routineType("TABLE_VALUED_FUNCTION")
            .language("SQL")
            .definitionBody("""
SELECT 1 + value AS value
            """)
            .arguments(RoutineArgumentArgs.builder()
                .name("value")
                .argumentKind("FIXED_TYPE")
                .dataType(serializeJson(
                    jsonObject(
                        jsonProperty("typeKind", "INT64")
                    )))
                .build())
            // Result schema: a single INT64 column named "value".
            .returnTableType(serializeJson(
                jsonObject(
                    jsonProperty("columns", jsonArray(jsonObject(
                        jsonProperty("name", "value"),
                        jsonProperty("type", jsonObject(
                            jsonProperty("typeKind", "INT64")
                        ))
                    )))
                )))
            .build());

    }
}
resources:
  # Dataset that holds the routine.
  test:
    type: gcp:bigquery:Dataset
    properties:
      datasetId: dataset_id
  # SQL table-valued function with one INT64 argument named "value".
  sproc:
    type: gcp:bigquery:Routine
    properties:
      datasetId: ${test.datasetId}
      routineId: routine_id
      routineType: TABLE_VALUED_FUNCTION
      language: SQL
      # Literal block scalar: the body is the single line below plus a trailing
      # newline, with no trailing spaces.
      definitionBody: |
        SELECT 1 + value AS value
      arguments:
        - name: value
          argumentKind: FIXED_TYPE
          # dataType is passed as a JSON string; fn::toJSON serializes the mapping.
          dataType:
            fn::toJSON:
              typeKind: INT64
      # Result schema: a single INT64 column named "value", also passed as JSON.
      returnTableType:
        fn::toJSON:
          columns:
            - name: value
              type:
                typeKind: INT64

The TABLE_VALUED_FUNCTION routine type returns a table rather than a scalar value. The returnTableType property defines the output schema as a JSON structure with column names and types. The argumentKind property specifies whether an argument has a fixed type (FIXED_TYPE) or accepts any type (ANY_TYPE).

Run PySpark jobs with inline code

Large-scale data transformations often require Spark’s distributed processing, which BigQuery can orchestrate through PySpark procedures.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// Dataset that holds the PySpark procedure.
const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
// Connection with an empty spark configuration; the routine references it
// through sparkOptions.connection.
const testConnection = new gcp.bigquery.Connection("test", {
    connectionId: "connection_id",
    location: "US",
    spark: {},
});
// PROCEDURE routine whose definitionBody is inline PySpark source.
// Inside the template literal only backslashes need escaping: "\\" yields the
// single backslash Python uses for line continuation, and double quotes are
// written as-is so the resulting Python is syntactically valid.
const pyspark = new gcp.bigquery.Routine("pyspark", {
    datasetId: test.datasetId,
    routineId: "routine_id",
    routineType: "PROCEDURE",
    language: "PYTHON",
    definitionBody: `from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
    
# Load data from BigQuery.
words = spark.read.format("bigquery") \\
  .option("table", "bigquery-public-data:samples.shakespeare") \\
  .load()
words.createOrReplaceTempView("words")
    
# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count")
word_count.show()
word_count.printSchema()
    
# Saving the data to BigQuery
word_count.write.format("bigquery") \\
  .option("writeMethod", "direct") \\
  .save("wordcount_dataset.wordcount_output")
`,
    sparkOptions: {
        connection: testConnection.name,
        runtimeVersion: "2.1",
    },
});
import pulumi
import pulumi_gcp as gcp

# Dataset that holds the PySpark procedure.
test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
# Connection with an empty spark configuration; the routine references it
# through spark_options["connection"].
test_connection = gcp.bigquery.Connection("test",
    connection_id="connection_id",
    location="US",
    spark={})
# PROCEDURE routine whose definition_body is inline PySpark source.
# The body is a regular (non-raw) triple-quoted string, so \" decodes to a
# double quote and \\ to a single backslash (Python line continuation).
pyspark = gcp.bigquery.Routine("pyspark",
    dataset_id=test.dataset_id,
    routine_id="routine_id",
    routine_type="PROCEDURE",
    language="PYTHON",
    definition_body="""from pyspark.sql import SparkSession

spark = SparkSession.builder.appName(\"spark-bigquery-demo\").getOrCreate()
    
# Load data from BigQuery.
words = spark.read.format(\"bigquery\") \\
  .option(\"table\", \"bigquery-public-data:samples.shakespeare\") \\
  .load()
words.createOrReplaceTempView(\"words\")
    
# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed(\"sum(word_count)\", \"sum_word_count\")
word_count.show()
word_count.printSchema()
    
# Saving the data to BigQuery
word_count.write.format(\"bigquery\") \\
  .option(\"writeMethod\", \"direct\") \\
  .save(\"wordcount_dataset.wordcount_output\")
""",
    spark_options={
        "connection": test_connection.name,
        "runtime_version": "2.1",
    })
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

// main declares a dataset, a Spark connection, and a PySpark PROCEDURE routine
// whose source is supplied inline. Any error from resource registration is
// returned to pulumi.Run.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Dataset that holds the PySpark procedure.
		test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
			DatasetId: pulumi.String("dataset_id"),
		})
		if err != nil {
			return err
		}
		// Connection with an empty spark configuration; the routine references
		// it through SparkOptions.Connection.
		testConnection, err := bigquery.NewConnection(ctx, "test", &bigquery.ConnectionArgs{
			ConnectionId: pulumi.String("connection_id"),
			Location:     pulumi.String("US"),
			Spark:        &bigquery.ConnectionSparkArgs{},
		})
		if err != nil {
			return err
		}
		// DefinitionBody is a Go raw string literal: nothing inside it is
		// unescaped, so quotes and the Python line-continuation backslash are
		// written exactly as they must appear in the PySpark source.
		_, err = bigquery.NewRoutine(ctx, "pyspark", &bigquery.RoutineArgs{
			DatasetId:   test.DatasetId,
			RoutineId:   pulumi.String("routine_id"),
			RoutineType: pulumi.String("PROCEDURE"),
			Language:    pulumi.String("PYTHON"),
			DefinitionBody: pulumi.String(`from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
    
# Load data from BigQuery.
words = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data:samples.shakespeare") \
  .load()
words.createOrReplaceTempView("words")
    
# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count")
word_count.show()
word_count.printSchema()
    
# Saving the data to BigQuery
word_count.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("wordcount_dataset.wordcount_output")
`),
			SparkOptions: &bigquery.RoutineSparkOptionsArgs{
				Connection:     testConnection.Name,
				RuntimeVersion: pulumi.String("2.1"),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

// Declares a dataset, a Spark connection, and a PySpark PROCEDURE routine whose
// source is supplied inline.
return await Deployment.RunAsync(() =>
{
    // Dataset that holds the PySpark procedure.
    var test = new Gcp.BigQuery.Dataset("test", new()
    {
        DatasetId = "dataset_id",
    });

    // An empty ConnectionSparkArgs (not null) is what marks this as a Spark
    // connection; a null value would omit the spark block entirely.
    var testConnection = new Gcp.BigQuery.Connection("test", new()
    {
        ConnectionId = "connection_id",
        Location = "US",
        Spark = new Gcp.BigQuery.Inputs.ConnectionSparkArgs(),
    });

    // DefinitionBody is a verbatim string: a double quote is written as "" and
    // backslashes are literal, so a single \ is the Python line continuation.
    var pyspark = new Gcp.BigQuery.Routine("pyspark", new()
    {
        DatasetId = test.DatasetId,
        RoutineId = "routine_id",
        RoutineType = "PROCEDURE",
        Language = "PYTHON",
        DefinitionBody = @"from pyspark.sql import SparkSession

spark = SparkSession.builder.appName(""spark-bigquery-demo"").getOrCreate()
    
# Load data from BigQuery.
words = spark.read.format(""bigquery"") \
  .option(""table"", ""bigquery-public-data:samples.shakespeare"") \
  .load()
words.createOrReplaceTempView(""words"")
    
# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed(""sum(word_count)"", ""sum_word_count"")
word_count.show()
word_count.printSchema()
    
# Saving the data to BigQuery
word_count.write.format(""bigquery"") \
  .option(""writeMethod"", ""direct"") \
  .save(""wordcount_dataset.wordcount_output"")
",
        SparkOptions = new Gcp.BigQuery.Inputs.RoutineSparkOptionsArgs
        {
            Connection = testConnection.Name,
            RuntimeVersion = "2.1",
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Connection;
import com.pulumi.gcp.bigquery.ConnectionArgs;
import com.pulumi.gcp.bigquery.inputs.ConnectionSparkArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineSparkOptionsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * Pulumi program that declares a dataset, a Spark connection, and a PySpark
 * PROCEDURE routine whose source is supplied inline.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /** Registers the dataset, the connection, and the routine with the Pulumi engine. */
    public static void stack(Context ctx) {
        // Dataset that holds the PySpark procedure.
        var test = new Dataset("test", DatasetArgs.builder()
            .datasetId("dataset_id")
            .build());

        // Connection with an empty spark configuration; the routine references it
        // through sparkOptions.connection.
        var testConnection = new Connection("testConnection", ConnectionArgs.builder()
            .connectionId("connection_id")
            .location("US")
            .spark(ConnectionSparkArgs.builder()
                .build())
            .build());

        // definitionBody is a text block: \" decodes to a double quote and \\ to a
        // single backslash (Python line continuation).
        var pyspark = new Routine("pyspark", RoutineArgs.builder()
            .datasetId(test.datasetId())
            .routineId("routine_id")
            .routineType("PROCEDURE")
            .language("PYTHON")
            .definitionBody("""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName(\"spark-bigquery-demo\").getOrCreate()
    
# Load data from BigQuery.
words = spark.read.format(\"bigquery\") \\
  .option(\"table\", \"bigquery-public-data:samples.shakespeare\") \\
  .load()
words.createOrReplaceTempView(\"words\")
    
# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed(\"sum(word_count)\", \"sum_word_count\")
word_count.show()
word_count.printSchema()
    
# Saving the data to BigQuery
word_count.write.format(\"bigquery\") \\
  .option(\"writeMethod\", \"direct\") \\
  .save(\"wordcount_dataset.wordcount_output\")
            """)
            .sparkOptions(RoutineSparkOptionsArgs.builder()
                .connection(testConnection.name())
                .runtimeVersion("2.1")
                .build())
            .build());

    }
}
resources:
  # Dataset that holds the PySpark procedure.
  test:
    type: gcp:bigquery:Dataset
    properties:
      datasetId: dataset_id
  # Connection with an empty spark configuration; the routine references it
  # through sparkOptions.connection.
  testConnection:
    type: gcp:bigquery:Connection
    name: test
    properties:
      connectionId: connection_id
      location: US
      spark: {}
  # PROCEDURE routine whose definitionBody is inline PySpark source.
  pyspark:
    type: gcp:bigquery:Routine
    properties:
      datasetId: ${test.datasetId}
      routineId: routine_id
      routineType: PROCEDURE
      language: PYTHON
      # Double-quoted scalar: \" is a double quote, \\ a single backslash (Python
      # line continuation) and \n a newline.
      definitionBody: "from pyspark.sql import SparkSession\n\nspark = SparkSession.builder.appName(\"spark-bigquery-demo\").getOrCreate()\n    \n# Load data from BigQuery.\nwords = spark.read.format(\"bigquery\") \\\n  .option(\"table\", \"bigquery-public-data:samples.shakespeare\") \\\n  .load()\nwords.createOrReplaceTempView(\"words\")\n    \n# Perform word count.\nword_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed(\"sum(word_count)\", \"sum_word_count\")\nword_count.show()\nword_count.printSchema()\n    \n# Saving the data to BigQuery\nword_count.write.format(\"bigquery\") \\\n  .option(\"writeMethod\", \"direct\") \\\n  .save(\"wordcount_dataset.wordcount_output\")\n"
      sparkOptions:
        connection: ${testConnection.name}
        runtimeVersion: '2.1'

The language property set to PYTHON with sparkOptions enables PySpark execution. The connection property references a BigQuery Connection configured for Spark. The runtimeVersion specifies the Spark runtime. The definitionBody contains the PySpark code that reads from BigQuery, processes data, and writes results back.

Reference external PySpark files from Cloud Storage

Complex Spark jobs often span multiple files and dependencies stored in Cloud Storage rather than inline code.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// Dataset that holds the PySpark procedure.
const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
// Connection with an empty spark configuration; the routine references it
// through sparkOptions.connection.
const testConnection = new gcp.bigquery.Connection("test", {
    connectionId: "connection_id",
    location: "US",
    spark: {},
});
// PROCEDURE routine with an empty definitionBody; sparkOptions instead points
// at files in Cloud Storage (gs:// URIs): a main file plus additional Python
// files, plain files, and archives.
const pysparkMainfile = new gcp.bigquery.Routine("pyspark_mainfile", {
    datasetId: test.datasetId,
    routineId: "routine_id",
    routineType: "PROCEDURE",
    language: "PYTHON",
    definitionBody: "",
    sparkOptions: {
        connection: testConnection.name,
        runtimeVersion: "2.1",
        mainFileUri: "gs://test-bucket/main.py",
        pyFileUris: ["gs://test-bucket/lib.py"],
        fileUris: ["gs://test-bucket/distribute_in_executor.json"],
        archiveUris: ["gs://test-bucket/distribute_in_executor.tar.gz"],
    },
});
import pulumi
import pulumi_gcp as gcp

# Dataset that holds the PySpark procedure.
test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
# Connection with an empty spark configuration; the routine references it
# through spark_options["connection"].
test_connection = gcp.bigquery.Connection("test",
    connection_id="connection_id",
    location="US",
    spark={})
# PROCEDURE routine with an empty definition_body; spark_options instead points
# at files in Cloud Storage (gs:// URIs): a main file plus additional Python
# files, plain files, and archives.
pyspark_mainfile = gcp.bigquery.Routine("pyspark_mainfile",
    dataset_id=test.dataset_id,
    routine_id="routine_id",
    routine_type="PROCEDURE",
    language="PYTHON",
    definition_body="",
    spark_options={
        "connection": test_connection.name,
        "runtime_version": "2.1",
        "main_file_uri": "gs://test-bucket/main.py",
        "py_file_uris": ["gs://test-bucket/lib.py"],
        "file_uris": ["gs://test-bucket/distribute_in_executor.json"],
        "archive_uris": ["gs://test-bucket/distribute_in_executor.tar.gz"],
    })
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

// main declares a dataset, a Spark connection, and a PySpark PROCEDURE routine
// that points at files in Cloud Storage instead of carrying inline code.
// Any error from resource registration is returned to pulumi.Run.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Dataset that holds the PySpark procedure.
		test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
			DatasetId: pulumi.String("dataset_id"),
		})
		if err != nil {
			return err
		}
		// Connection with an empty spark configuration; the routine references
		// it through SparkOptions.Connection.
		testConnection, err := bigquery.NewConnection(ctx, "test", &bigquery.ConnectionArgs{
			ConnectionId: pulumi.String("connection_id"),
			Location:     pulumi.String("US"),
			Spark:        &bigquery.ConnectionSparkArgs{},
		})
		if err != nil {
			return err
		}
		// DefinitionBody is empty; SparkOptions lists the gs:// URIs of the main
		// file and of the additional Python files, plain files, and archives.
		_, err = bigquery.NewRoutine(ctx, "pyspark_mainfile", &bigquery.RoutineArgs{
			DatasetId:      test.DatasetId,
			RoutineId:      pulumi.String("routine_id"),
			RoutineType:    pulumi.String("PROCEDURE"),
			Language:       pulumi.String("PYTHON"),
			DefinitionBody: pulumi.String(""),
			SparkOptions: &bigquery.RoutineSparkOptionsArgs{
				Connection:     testConnection.Name,
				RuntimeVersion: pulumi.String("2.1"),
				MainFileUri:    pulumi.String("gs://test-bucket/main.py"),
				PyFileUris: pulumi.StringArray{
					pulumi.String("gs://test-bucket/lib.py"),
				},
				FileUris: pulumi.StringArray{
					pulumi.String("gs://test-bucket/distribute_in_executor.json"),
				},
				ArchiveUris: pulumi.StringArray{
					pulumi.String("gs://test-bucket/distribute_in_executor.tar.gz"),
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

// Declares a dataset, a Spark connection, and a PySpark PROCEDURE routine that
// points at files in Cloud Storage instead of carrying inline code.
return await Deployment.RunAsync(() =>
{
    // Dataset that holds the PySpark procedure.
    var test = new Gcp.BigQuery.Dataset("test", new()
    {
        DatasetId = "dataset_id",
    });

    // An empty ConnectionSparkArgs (not null) is what marks this as a Spark
    // connection; a null value would omit the spark block entirely.
    var testConnection = new Gcp.BigQuery.Connection("test", new()
    {
        ConnectionId = "connection_id",
        Location = "US",
        Spark = new Gcp.BigQuery.Inputs.ConnectionSparkArgs(),
    });

    // DefinitionBody is empty; SparkOptions lists the gs:// URIs of the main file
    // and of the additional Python files, plain files, and archives.
    var pysparkMainfile = new Gcp.BigQuery.Routine("pyspark_mainfile", new()
    {
        DatasetId = test.DatasetId,
        RoutineId = "routine_id",
        RoutineType = "PROCEDURE",
        Language = "PYTHON",
        DefinitionBody = "",
        SparkOptions = new Gcp.BigQuery.Inputs.RoutineSparkOptionsArgs
        {
            Connection = testConnection.Name,
            RuntimeVersion = "2.1",
            MainFileUri = "gs://test-bucket/main.py",
            PyFileUris = new[]
            {
                "gs://test-bucket/lib.py",
            },
            FileUris = new[]
            {
                "gs://test-bucket/distribute_in_executor.json",
            },
            ArchiveUris = new[]
            {
                "gs://test-bucket/distribute_in_executor.tar.gz",
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Connection;
import com.pulumi.gcp.bigquery.ConnectionArgs;
import com.pulumi.gcp.bigquery.inputs.ConnectionSparkArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineSparkOptionsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * Pulumi program that declares a dataset, a Spark-type BigQuery connection,
 * and a PySpark stored procedure whose files are read from Cloud Storage.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /** Registers the three resources in dependency order. */
    public static void stack(Context ctx) {
        final var dataset = new Dataset("test", DatasetArgs.builder()
            .datasetId("dataset_id")
            .build());

        // Empty Spark settings: selects the Spark connection type.
        final var sparkSettings = ConnectionSparkArgs.builder().build();
        final var sparkConnection = new Connection("testConnection", ConnectionArgs.builder()
            .connectionId("connection_id")
            .location("US")
            .spark(sparkSettings)
            .build());

        // Entry point plus the extra modules, data files, and archives it needs.
        final var sparkOptions = RoutineSparkOptionsArgs.builder()
            .connection(sparkConnection.name())
            .runtimeVersion("2.1")
            .mainFileUri("gs://test-bucket/main.py")
            .pyFileUris("gs://test-bucket/lib.py")
            .fileUris("gs://test-bucket/distribute_in_executor.json")
            .archiveUris("gs://test-bucket/distribute_in_executor.tar.gz")
            .build();

        // definitionBody is empty because the code comes from mainFileUri.
        new Routine("pysparkMainfile", RoutineArgs.builder()
            .datasetId(dataset.datasetId())
            .routineId("routine_id")
            .routineType("PROCEDURE")
            .language("PYTHON")
            .definitionBody("")
            .sparkOptions(sparkOptions)
            .build());
    }
}
resources:
  # Dataset that contains the routine.
  test:
    type: gcp:bigquery:Dataset
    properties:
      datasetId: dataset_id
  # Spark-type BigQuery connection referenced by the routine.
  testConnection:
    type: gcp:bigquery:Connection
    name: test
    properties:
      connectionId: connection_id
      location: US
      spark: {}
  # PySpark stored procedure; definitionBody is empty because the code is
  # supplied through mainFileUri.
  pysparkMainfile:
    type: gcp:bigquery:Routine
    name: pyspark_mainfile
    properties:
      datasetId: ${test.datasetId}
      routineId: routine_id
      routineType: PROCEDURE
      language: PYTHON
      definitionBody: ""
      sparkOptions:
        connection: ${testConnection.name}
        runtimeVersion: "2.1"
        mainFileUri: "gs://test-bucket/main.py"
        pyFileUris: ["gs://test-bucket/lib.py"]
        fileUris: ["gs://test-bucket/distribute_in_executor.json"]
        archiveUris: ["gs://test-bucket/distribute_in_executor.tar.gz"]

The mainFileUri property points to the primary Python file in Cloud Storage. The pyFileUris, fileUris, and archiveUris properties reference additional Python modules, data files, and compressed archives. This approach separates code from infrastructure configuration and enables code reuse across routines.

Execute Scala Spark jobs from JAR files

JVM-based Spark applications written in Scala or Java run from compiled JARs with custom container images and runtime properties.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// Dataset that contains the routine.
const dataset = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});

// Spark-type BigQuery connection referenced by the routine.
const sparkConnection = new gcp.bigquery.Connection("test", {
    connectionId: "connection_id",
    location: "US",
    spark: {},
});

// Spark configuration entries passed through to the job.
const sparkProperties = {
    "spark.dataproc.scaling.version": "2",
    "spark.reducer.fetchMigratedShuffle.enabled": "true",
};

// Stored procedure backed by a compiled JAR; definitionBody stays empty
// because the entry point is named by mainClass.
const sparkJarRoutine = new gcp.bigquery.Routine("spark_jar", {
    datasetId: dataset.datasetId,
    routineId: "routine_id",
    routineType: "PROCEDURE",
    language: "SCALA",
    definitionBody: "",
    sparkOptions: {
        connection: sparkConnection.name,
        runtimeVersion: "2.1",
        containerImage: "gcr.io/my-project-id/my-spark-image:latest",
        mainClass: "com.google.test.jar.MainClass",
        jarUris: ["gs://test-bucket/uberjar_spark_spark3.jar"],
        properties: sparkProperties,
    },
});
import pulumi
import pulumi_gcp as gcp

# Dataset that contains the routine.
dataset = gcp.bigquery.Dataset("test", dataset_id="dataset_id")

# Spark-type BigQuery connection referenced by the routine.
spark_connection = gcp.bigquery.Connection(
    "test",
    connection_id="connection_id",
    location="US",
    spark={},
)

# Spark configuration entries passed through to the job.
spark_properties = {
    "spark.dataproc.scaling.version": "2",
    "spark.reducer.fetchMigratedShuffle.enabled": "true",
}

# Stored procedure backed by a compiled JAR; definition_body stays empty
# because the entry point is named by main_class.
spark_jar = gcp.bigquery.Routine(
    "spark_jar",
    dataset_id=dataset.dataset_id,
    routine_id="routine_id",
    routine_type="PROCEDURE",
    language="SCALA",
    definition_body="",
    spark_options={
        "connection": spark_connection.name,
        "runtime_version": "2.1",
        "container_image": "gcr.io/my-project-id/my-spark-image:latest",
        "main_class": "com.google.test.jar.MainClass",
        "jar_uris": ["gs://test-bucket/uberjar_spark_spark3.jar"],
        "properties": spark_properties,
    },
)
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

// main declares a dataset, a Spark-type BigQuery connection, and a Scala
// Spark stored procedure that runs from a compiled JAR.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		dataset, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
			DatasetId: pulumi.String("dataset_id"),
		})
		if err != nil {
			return err
		}

		// Empty Spark settings select the Spark connection type.
		sparkConn, err := bigquery.NewConnection(ctx, "test", &bigquery.ConnectionArgs{
			ConnectionId: pulumi.String("connection_id"),
			Location:     pulumi.String("US"),
			Spark:        &bigquery.ConnectionSparkArgs{},
		})
		if err != nil {
			return err
		}

		// JAR location, entry class, container image, and Spark settings.
		sparkOpts := &bigquery.RoutineSparkOptionsArgs{
			Connection:     sparkConn.Name,
			RuntimeVersion: pulumi.String("2.1"),
			ContainerImage: pulumi.String("gcr.io/my-project-id/my-spark-image:latest"),
			MainClass:      pulumi.String("com.google.test.jar.MainClass"),
			JarUris: pulumi.StringArray{
				pulumi.String("gs://test-bucket/uberjar_spark_spark3.jar"),
			},
			Properties: pulumi.StringMap{
				"spark.dataproc.scaling.version":             pulumi.String("2"),
				"spark.reducer.fetchMigratedShuffle.enabled": pulumi.String("true"),
			},
		}

		// DefinitionBody is empty because the entry point is named by MainClass.
		_, err = bigquery.NewRoutine(ctx, "spark_jar", &bigquery.RoutineArgs{
			DatasetId:      dataset.DatasetId,
			RoutineId:      pulumi.String("routine_id"),
			RoutineType:    pulumi.String("PROCEDURE"),
			Language:       pulumi.String("SCALA"),
			DefinitionBody: pulumi.String(""),
			SparkOptions:   sparkOpts,
		})
		return err
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

// Provisions a dataset, a Spark-type BigQuery connection, and a Scala Spark
// stored procedure that runs from a compiled JAR in Cloud Storage.
return await Deployment.RunAsync(() => 
{
    // Dataset that contains the routine.
    var test = new Gcp.BigQuery.Dataset("test", new()
    {
        DatasetId = "dataset_id",
    });

    var testConnection = new Gcp.BigQuery.Connection("test", new()
    {
        ConnectionId = "connection_id",
        Location = "US",
        // An empty Spark settings object (not null) is required here: it is the
        // C# equivalent of `spark: {}` and selects the Spark connection type.
        // Passing null would omit the block and leave the connection untyped.
        Spark = new Gcp.BigQuery.Inputs.ConnectionSparkArgs(),
    });

    var sparkJar = new Gcp.BigQuery.Routine("spark_jar", new()
    {
        DatasetId = test.DatasetId,
        RoutineId = "routine_id",
        RoutineType = "PROCEDURE",
        Language = "SCALA",
        // Left empty because the entry point is named by MainClass below.
        DefinitionBody = "",
        SparkOptions = new Gcp.BigQuery.Inputs.RoutineSparkOptionsArgs
        {
            Connection = testConnection.Name,
            RuntimeVersion = "2.1",
            ContainerImage = "gcr.io/my-project-id/my-spark-image:latest",
            MainClass = "com.google.test.jar.MainClass",
            JarUris = new[]
            {
                "gs://test-bucket/uberjar_spark_spark3.jar",
            },
            // Spark configuration entries passed through to the job.
            Properties = 
            {
                { "spark.dataproc.scaling.version", "2" },
                { "spark.reducer.fetchMigratedShuffle.enabled", "true" },
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Connection;
import com.pulumi.gcp.bigquery.ConnectionArgs;
import com.pulumi.gcp.bigquery.inputs.ConnectionSparkArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineSparkOptionsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * Pulumi program that declares a dataset, a Spark-type BigQuery connection,
 * and a Scala Spark stored procedure that runs from a compiled JAR.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /** Registers the three resources in dependency order. */
    public static void stack(Context ctx) {
        final var dataset = new Dataset("test", DatasetArgs.builder()
            .datasetId("dataset_id")
            .build());

        // Empty Spark settings: selects the Spark connection type.
        final var sparkSettings = ConnectionSparkArgs.builder().build();
        final var sparkConnection = new Connection("testConnection", ConnectionArgs.builder()
            .connectionId("connection_id")
            .location("US")
            .spark(sparkSettings)
            .build());

        // Spark configuration entries passed through to the job.
        final Map<String, String> sparkProperties = Map.of(
            "spark.dataproc.scaling.version", "2",
            "spark.reducer.fetchMigratedShuffle.enabled", "true");

        final var sparkOptions = RoutineSparkOptionsArgs.builder()
            .connection(sparkConnection.name())
            .runtimeVersion("2.1")
            .containerImage("gcr.io/my-project-id/my-spark-image:latest")
            .mainClass("com.google.test.jar.MainClass")
            .jarUris("gs://test-bucket/uberjar_spark_spark3.jar")
            .properties(sparkProperties)
            .build();

        // definitionBody is empty because the entry point is named by mainClass.
        new Routine("sparkJar", RoutineArgs.builder()
            .datasetId(dataset.datasetId())
            .routineId("routine_id")
            .routineType("PROCEDURE")
            .language("SCALA")
            .definitionBody("")
            .sparkOptions(sparkOptions)
            .build());
    }
}
resources:
  # Dataset that contains the routine.
  test:
    type: gcp:bigquery:Dataset
    properties:
      datasetId: dataset_id
  # Spark-type BigQuery connection referenced by the routine.
  testConnection:
    type: gcp:bigquery:Connection
    name: test
    properties:
      connectionId: connection_id
      location: US
      spark: {}
  # Scala Spark stored procedure; definitionBody is empty because the entry
  # point is named by mainClass.
  sparkJar:
    type: gcp:bigquery:Routine
    name: spark_jar
    properties:
      datasetId: ${test.datasetId}
      routineId: routine_id
      routineType: PROCEDURE
      language: SCALA
      definitionBody: ""
      sparkOptions:
        connection: ${testConnection.name}
        runtimeVersion: "2.1"
        containerImage: "gcr.io/my-project-id/my-spark-image:latest"
        mainClass: com.google.test.jar.MainClass
        jarUris: ["gs://test-bucket/uberjar_spark_spark3.jar"]
        properties:
          spark.dataproc.scaling.version: "2"
          spark.reducer.fetchMigratedShuffle.enabled: "true"

Setting language to SCALA and listing jarUris tells BigQuery to run the procedure from compiled JAR files in Cloud Storage. The mainClass property specifies the entry point. The containerImage property references a custom container with required dependencies. The properties map passes Spark configuration options to the runtime.

Create custom data masking functions

Organizations implementing column-level security need custom masking logic that BigQuery can apply automatically to sensitive fields.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// JSON type descriptor shared by the argument and the return value.
const stringType = '{"typeKind" :  "STRING"}';

// Dataset that contains the routine.
const dataset = new gcp.bigquery.Dataset("test", {datasetId: "tf_test_dataset_id_22811"});

// SQL scalar function registered as a data-masking routine: it replaces
// every digit of its `ssn` argument with 'X'.
const maskingRoutine = new gcp.bigquery.Routine("custom_masking_routine", {
    datasetId: dataset.datasetId,
    routineId: "custom_masking_routine",
    routineType: "SCALAR_FUNCTION",
    language: "SQL",
    dataGovernanceType: "DATA_MASKING",
    definitionBody: "SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')",
    arguments: [
        {name: "ssn", dataType: stringType},
    ],
    returnType: stringType,
});
import pulumi
import pulumi_gcp as gcp

# JSON type descriptor shared by the argument and the return value.
string_type = '{"typeKind" :  "STRING"}'

# Dataset that contains the routine.
dataset = gcp.bigquery.Dataset("test", dataset_id="tf_test_dataset_id_22811")

# SQL scalar function registered as a data-masking routine: it replaces
# every digit of its `ssn` argument with 'X'.
custom_masking_routine = gcp.bigquery.Routine(
    "custom_masking_routine",
    dataset_id=dataset.dataset_id,
    routine_id="custom_masking_routine",
    routine_type="SCALAR_FUNCTION",
    language="SQL",
    data_governance_type="DATA_MASKING",
    definition_body="SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')",
    arguments=[
        {"name": "ssn", "data_type": string_type},
    ],
    return_type=string_type,
)
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

// main declares a dataset and a SQL scalar function registered as a custom
// data-masking routine.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// JSON type descriptor shared by the argument and the return value.
		const stringType = `{"typeKind" :  "STRING"}`

		dataset, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
			DatasetId: pulumi.String("tf_test_dataset_id_22811"),
		})
		if err != nil {
			return err
		}

		ssnArg := &bigquery.RoutineArgumentArgs{
			Name:     pulumi.String("ssn"),
			DataType: pulumi.String(stringType),
		}

		// The body replaces every digit of ssn with 'X'.
		_, err = bigquery.NewRoutine(ctx, "custom_masking_routine", &bigquery.RoutineArgs{
			DatasetId:          dataset.DatasetId,
			RoutineId:          pulumi.String("custom_masking_routine"),
			RoutineType:        pulumi.String("SCALAR_FUNCTION"),
			Language:           pulumi.String("SQL"),
			DataGovernanceType: pulumi.String("DATA_MASKING"),
			DefinitionBody:     pulumi.String("SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')"),
			Arguments:          bigquery.RoutineArgumentArray{ssnArg},
			ReturnType:         pulumi.String(stringType),
		})
		return err
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

// Declares a dataset and a SQL scalar function registered as a custom
// data-masking routine.
return await Deployment.RunAsync(() =>
{
    // JSON type descriptor shared by the argument and the return value.
    const string stringType = "{\"typeKind\" :  \"STRING\"}";

    var dataset = new Gcp.BigQuery.Dataset("test", new()
    {
        DatasetId = "tf_test_dataset_id_22811",
    });

    var ssnArgument = new Gcp.BigQuery.Inputs.RoutineArgumentArgs
    {
        Name = "ssn",
        DataType = stringType,
    };

    // The body replaces every digit of ssn with 'X'.
    var maskingRoutine = new Gcp.BigQuery.Routine("custom_masking_routine", new()
    {
        DatasetId = dataset.DatasetId,
        RoutineId = "custom_masking_routine",
        RoutineType = "SCALAR_FUNCTION",
        Language = "SQL",
        DataGovernanceType = "DATA_MASKING",
        DefinitionBody = "SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')",
        Arguments = new[] { ssnArgument },
        ReturnType = stringType,
    });
});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineArgumentArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * Pulumi program that declares a dataset and a SQL scalar function registered
 * as a custom data-masking routine.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /** Registers the dataset, then the masking routine inside it. */
    public static void stack(Context ctx) {
        // JSON type descriptor shared by the argument and the return value.
        final String stringType = "{\"typeKind\" :  \"STRING\"}";

        final var dataset = new Dataset("test", DatasetArgs.builder()
            .datasetId("tf_test_dataset_id_22811")
            .build());

        final var ssnArgument = RoutineArgumentArgs.builder()
            .name("ssn")
            .dataType(stringType)
            .build();

        // The body replaces every digit of ssn with 'X'.
        new Routine("customMaskingRoutine", RoutineArgs.builder()
            .datasetId(dataset.datasetId())
            .routineId("custom_masking_routine")
            .routineType("SCALAR_FUNCTION")
            .language("SQL")
            .dataGovernanceType("DATA_MASKING")
            .definitionBody("SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')")
            .arguments(ssnArgument)
            .returnType(stringType)
            .build());
    }
}
resources:
  # Dataset that contains the routine.
  test:
    type: gcp:bigquery:Dataset
    properties:
      datasetId: tf_test_dataset_id_22811
  # SQL scalar function registered as a data-masking routine; the body
  # replaces every digit of ssn with 'X'.
  customMaskingRoutine:
    type: gcp:bigquery:Routine
    name: custom_masking_routine
    properties:
      datasetId: ${test.datasetId}
      routineId: custom_masking_routine
      routineType: SCALAR_FUNCTION
      language: SQL
      dataGovernanceType: DATA_MASKING
      definitionBody: "SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')"
      arguments:
        - name: ssn
          dataType: "{\"typeKind\" :  \"STRING\"}"
      returnType: "{\"typeKind\" :  \"STRING\"}"

The dataGovernanceType property set to DATA_MASKING registers this function for use in column-level security policies. BigQuery validates the function and makes it available for masking sensitive data. The definitionBody implements the masking logic, here using regex to replace digits with ‘X’.

Call external APIs from BigQuery queries

Queries sometimes need to enrich data by calling external services like Cloud Functions or external APIs during query execution.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// Dataset that contains the routine.
const dataset = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});

// Cloud-resource BigQuery connection referenced by the remote function.
const cloudResourceConnection = new gcp.bigquery.Connection("test", {
    connectionId: "connection_id",
    location: "US",
    cloudResource: {},
});

// Endpoint, connection, batch size, and user-defined context for the call.
const remoteOptions = {
    endpoint: "https://us-east1-my_gcf_project.cloudfunctions.net/remote_add",
    connection: cloudResourceConnection.name,
    maxBatchingRows: "10",
    userDefinedContext: {
        z: "1.5",
    },
};

// Scalar function whose definitionBody is empty and which is configured
// through remoteFunctionOptions instead.
const remoteFunctionRoutine = new gcp.bigquery.Routine("remote_function", {
    datasetId: dataset.datasetId,
    routineId: "routine_id",
    routineType: "SCALAR_FUNCTION",
    definitionBody: "",
    returnType: '{"typeKind" :  "STRING"}',
    remoteFunctionOptions: remoteOptions,
});
import pulumi
import pulumi_gcp as gcp

# Dataset that contains the routine.
dataset = gcp.bigquery.Dataset("test", dataset_id="dataset_id")

# Cloud-resource BigQuery connection referenced by the remote function.
cloud_resource_connection = gcp.bigquery.Connection(
    "test",
    connection_id="connection_id",
    location="US",
    cloud_resource={},
)

# Endpoint, connection, batch size, and user-defined context for the call.
remote_options = {
    "endpoint": "https://us-east1-my_gcf_project.cloudfunctions.net/remote_add",
    "connection": cloud_resource_connection.name,
    "max_batching_rows": "10",
    "user_defined_context": {"z": "1.5"},
}

# Scalar function whose definition_body is empty and which is configured
# through remote_function_options instead.
remote_function = gcp.bigquery.Routine(
    "remote_function",
    dataset_id=dataset.dataset_id,
    routine_id="routine_id",
    routine_type="SCALAR_FUNCTION",
    definition_body="",
    return_type='{"typeKind" :  "STRING"}',
    remote_function_options=remote_options,
)
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

// main declares a dataset, a cloud-resource BigQuery connection, and a remote
// scalar function that is configured with an HTTPS endpoint.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		dataset, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
			DatasetId: pulumi.String("dataset_id"),
		})
		if err != nil {
			return err
		}

		// Empty cloud-resource settings select that connection type.
		remoteConn, err := bigquery.NewConnection(ctx, "test", &bigquery.ConnectionArgs{
			ConnectionId:  pulumi.String("connection_id"),
			Location:      pulumi.String("US"),
			CloudResource: &bigquery.ConnectionCloudResourceArgs{},
		})
		if err != nil {
			return err
		}

		// Endpoint, connection, batch size, and user-defined context for the call.
		remoteOpts := &bigquery.RoutineRemoteFunctionOptionsArgs{
			Endpoint:        pulumi.String("https://us-east1-my_gcf_project.cloudfunctions.net/remote_add"),
			Connection:      remoteConn.Name,
			MaxBatchingRows: pulumi.String("10"),
			UserDefinedContext: pulumi.StringMap{
				"z": pulumi.String("1.5"),
			},
		}

		// DefinitionBody is empty; the routine is configured through
		// RemoteFunctionOptions instead.
		_, err = bigquery.NewRoutine(ctx, "remote_function", &bigquery.RoutineArgs{
			DatasetId:             dataset.DatasetId,
			RoutineId:             pulumi.String("routine_id"),
			RoutineType:           pulumi.String("SCALAR_FUNCTION"),
			DefinitionBody:        pulumi.String(""),
			ReturnType:            pulumi.String(`{"typeKind" :  "STRING"}`),
			RemoteFunctionOptions: remoteOpts,
		})
		return err
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

// Provisions a dataset, a cloud-resource BigQuery connection, and a remote
// scalar function that is configured with an HTTPS endpoint.
return await Deployment.RunAsync(() => 
{
    // Dataset that contains the routine.
    var test = new Gcp.BigQuery.Dataset("test", new()
    {
        DatasetId = "dataset_id",
    });

    var testConnection = new Gcp.BigQuery.Connection("test", new()
    {
        ConnectionId = "connection_id",
        Location = "US",
        // An empty settings object (not null) is required here: it is the C#
        // equivalent of `cloudResource: {}` and selects the cloud-resource
        // connection type. Passing null would omit the block and leave the
        // connection untyped.
        CloudResource = new Gcp.BigQuery.Inputs.ConnectionCloudResourceArgs(),
    });

    var remoteFunction = new Gcp.BigQuery.Routine("remote_function", new()
    {
        DatasetId = test.DatasetId,
        RoutineId = "routine_id",
        RoutineType = "SCALAR_FUNCTION",
        // Left empty; the routine is configured through RemoteFunctionOptions.
        DefinitionBody = "",
        ReturnType = "{\"typeKind\" :  \"STRING\"}",
        RemoteFunctionOptions = new Gcp.BigQuery.Inputs.RoutineRemoteFunctionOptionsArgs
        {
            Endpoint = "https://us-east1-my_gcf_project.cloudfunctions.net/remote_add",
            Connection = testConnection.Name,
            MaxBatchingRows = "10",
            // Custom key/value pairs attached to the remote function options.
            UserDefinedContext = 
            {
                { "z", "1.5" },
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Connection;
import com.pulumi.gcp.bigquery.ConnectionArgs;
import com.pulumi.gcp.bigquery.inputs.ConnectionCloudResourceArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineRemoteFunctionOptionsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * Pulumi program that declares a dataset, a cloud-resource BigQuery
 * connection, and a remote scalar function configured with an HTTPS endpoint.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /** Registers the three resources in dependency order. */
    public static void stack(Context ctx) {
        final var dataset = new Dataset("test", DatasetArgs.builder()
            .datasetId("dataset_id")
            .build());

        // Empty cloud-resource settings: selects that connection type.
        final var cloudResourceSettings = ConnectionCloudResourceArgs.builder().build();
        final var remoteConnection = new Connection("testConnection", ConnectionArgs.builder()
            .connectionId("connection_id")
            .location("US")
            .cloudResource(cloudResourceSettings)
            .build());

        // Endpoint, connection, batch size, and user-defined context for the call.
        final Map<String, String> userContext = Map.of("z", "1.5");
        final var remoteOptions = RoutineRemoteFunctionOptionsArgs.builder()
            .endpoint("https://us-east1-my_gcf_project.cloudfunctions.net/remote_add")
            .connection(remoteConnection.name())
            .maxBatchingRows("10")
            .userDefinedContext(userContext)
            .build();

        // definitionBody is empty; the routine is configured through
        // remoteFunctionOptions instead.
        new Routine("remoteFunction", RoutineArgs.builder()
            .datasetId(dataset.datasetId())
            .routineId("routine_id")
            .routineType("SCALAR_FUNCTION")
            .definitionBody("")
            .returnType("{\"typeKind\" :  \"STRING\"}")
            .remoteFunctionOptions(remoteOptions)
            .build());
    }
}
resources:
  # Dataset that contains the routine.
  test:
    type: gcp:bigquery:Dataset
    properties:
      datasetId: dataset_id
  # Cloud-resource BigQuery connection referenced by the remote function.
  testConnection:
    type: gcp:bigquery:Connection
    name: test
    properties:
      connectionId: connection_id
      location: US
      cloudResource: {}
  # Remote scalar function; definitionBody is empty and the routine is
  # configured through remoteFunctionOptions instead.
  remoteFunction:
    type: gcp:bigquery:Routine
    name: remote_function
    properties:
      datasetId: ${test.datasetId}
      routineId: routine_id
      routineType: SCALAR_FUNCTION
      definitionBody: ""
      returnType: "{\"typeKind\" :  \"STRING\"}"
      remoteFunctionOptions:
        endpoint: "https://us-east1-my_gcf_project.cloudfunctions.net/remote_add"
        connection: ${testConnection.name}
        maxBatchingRows: "10"
        userDefinedContext:
          z: "1.5"

The remoteFunctionOptions property configures external service integration. The endpoint property points to a Cloud Function or API endpoint. The connection property references a BigQuery Connection for Cloud Resource. The maxBatchingRows property controls how many rows BigQuery sends per request. The userDefinedContext property passes custom parameters to the endpoint.

Beyond these examples

These snippets focus on specific routine-level features: SQL, JavaScript, and Python function types; PySpark and Scala Spark procedures; and remote function integration and data masking. They’re intentionally minimal rather than full data processing pipelines.

The examples may reference pre-existing infrastructure such as BigQuery datasets, BigQuery Connections (for Spark and remote functions), Cloud Storage buckets (for Spark files), container images (for Spark), and Cloud Functions (for remote functions). They focus on configuring the routine rather than provisioning everything around it.

To keep things focused, common routine patterns are omitted, including:

  • Security mode configuration (DEFINER vs INVOKER)
  • Determinism level for JavaScript functions
  • Imported JavaScript libraries
  • External runtime options for Python UDFs

These omissions are intentional: the goal is to illustrate how each routine feature is wired, not provide drop-in data processing modules. See the BigQuery Routine resource reference for all available configuration options.

Let's create GCP BigQuery Routines

Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.

Try Pulumi Cloud for FREE

Frequently Asked Questions

Common Issues & Limitations
Why am I seeing perpetual diffs on my routine's returnType?
The returnType field expects a JSON string, so any changes to the string create diffs even if the JSON structure is unchanged. The API may return different JSON formatting (e.g., switched order of values or STRUCT vs RECORD field types). Use the schema as returned by the API to avoid recurring diffs.
What properties can't I change after creating a routine?
The datasetId, routineId, project, and routineType properties are immutable. Changing any of these requires recreating the routine.
Routine Types & Configuration
What routine types can I create in BigQuery?

You can create three types:

  1. SCALAR_FUNCTION - Returns a single value
  2. PROCEDURE - Performs actions without returning a value
  3. TABLE_VALUED_FUNCTION - Returns a table
What languages are supported for BigQuery routines?
BigQuery routines support SQL, JAVASCRIPT, PYTHON, JAVA, and SCALA. The language choice affects which features are available (e.g., Spark options require PYTHON, JAVA, or SCALA).
When is returnType required?
The returnType is optional if language is SQL (inferred from definitionBody at query time), but required for functions written in any other language (JAVASCRIPT, PYTHON, JAVA, SCALA). Procedures, such as the Spark examples above, do not declare a returnType.
When can definitionBody be empty?
The definitionBody can be an empty string for remote functions and for Spark routines whose code is located through mainFileUri (PySpark) or through mainClass and jarUris (Scala or Java).
What's the difference between DEFINER and INVOKER security modes?
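The securityMode determines whose permissions are used when executing the routine: DEFINER runs it with the privileges of the user who defined it, while INVOKER runs it with the privileges of the user who calls it. If not specified, BigQuery automatically determines the security mode from the routine’s configuration.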
Advanced Features
How do I create a PySpark routine in BigQuery?
Set language to PYTHON, routineType to PROCEDURE, and configure sparkOptions with a connection (to a BigQuery Connection resource) and runtimeVersion. You can either provide code in definitionBody or use mainFileUri to reference a GCS file.
How do I create a Spark routine using Scala or Java?
Set language to SCALA or JAVA, routineType to PROCEDURE, and configure sparkOptions with mainClass and jarUris pointing to your compiled JAR files in GCS.
How do I create a remote function?
Set routineType to SCALAR_FUNCTION, leave definitionBody empty, and configure remoteFunctionOptions with an endpoint (Cloud Function URL) and connection (BigQuery Connection resource).
How do I create a custom data masking function?
Set dataGovernanceType to DATA_MASKING, routineType to SCALAR_FUNCTION, and provide the masking logic in definitionBody. The function will be validated and made available as a masking function.
When do I need to specify returnTableType?
The returnTableType can only be set when routineType is TABLE_VALUED_FUNCTION. If absent, the return table type is inferred from definitionBody at query time. If present, the result columns are cast to match the specified types.

Using a different cloud?

Explore analytics guides for other cloud providers: