Create GCP BigQuery Routines

The gcp:bigquery/routine:Routine resource, part of the Pulumi GCP provider, defines user-defined functions and stored procedures within BigQuery datasets: their language, execution logic, and integration points. This guide focuses on four capabilities: SQL procedures and JavaScript scalar functions, table-valued functions, Spark procedures (PySpark and Scala), and remote functions and data masking.

Routines belong to datasets and may reference BigQuery connections, Cloud Storage files, container images, or external HTTP endpoints. The examples are intentionally small. Combine them with your own datasets, connections, and external resources.

Create a SQL stored procedure

Data teams often need to encapsulate multi-step SQL logic into reusable procedures that can be called from queries or scheduled jobs.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
const sproc = new gcp.bigquery.Routine("sproc", {
    datasetId: test.datasetId,
    routineId: "routine_id",
    routineType: "PROCEDURE",
    language: "SQL",
    securityMode: "INVOKER",
    definitionBody: "CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);",
});
import pulumi
import pulumi_gcp as gcp

test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
sproc = gcp.bigquery.Routine("sproc",
    dataset_id=test.dataset_id,
    routine_id="routine_id",
    routine_type="PROCEDURE",
    language="SQL",
    security_mode="INVOKER",
    definition_body="CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);")
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
			DatasetId: pulumi.String("dataset_id"),
		})
		if err != nil {
			return err
		}
		_, err = bigquery.NewRoutine(ctx, "sproc", &bigquery.RoutineArgs{
			DatasetId:      test.DatasetId,
			RoutineId:      pulumi.String("routine_id"),
			RoutineType:    pulumi.String("PROCEDURE"),
			Language:       pulumi.String("SQL"),
			SecurityMode:   pulumi.String("INVOKER"),
			DefinitionBody: pulumi.String("CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var test = new Gcp.BigQuery.Dataset("test", new()
    {
        DatasetId = "dataset_id",
    });

    var sproc = new Gcp.BigQuery.Routine("sproc", new()
    {
        DatasetId = test.DatasetId,
        RoutineId = "routine_id",
        RoutineType = "PROCEDURE",
        Language = "SQL",
        SecurityMode = "INVOKER",
        DefinitionBody = "CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var test = new Dataset("test", DatasetArgs.builder()
            .datasetId("dataset_id")
            .build());

        var sproc = new Routine("sproc", RoutineArgs.builder()
            .datasetId(test.datasetId())
            .routineId("routine_id")
            .routineType("PROCEDURE")
            .language("SQL")
            .securityMode("INVOKER")
            .definitionBody("CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);")
            .build());

    }
}
resources:
  test:
    type: gcp:bigquery:Dataset
    properties:
      datasetId: dataset_id
  sproc:
    type: gcp:bigquery:Routine
    properties:
      datasetId: ${test.datasetId}
      routineId: routine_id
      routineType: PROCEDURE
      language: SQL
      securityMode: INVOKER
      definitionBody: CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);

Setting routineType to PROCEDURE defines this as a stored procedure rather than a function. The definitionBody contains the SQL logic to execute when the procedure is called. The securityMode property controls whether the routine runs with the caller’s permissions (INVOKER) or the definer’s permissions (DEFINER).
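For genuinely multi-step logic, the definitionBody can hold a full script rather than a single statement. Here is a minimal TypeScript sketch that reuses the test dataset from the example above; the raw_events and audit_log tables and the retention window are hypothetical placeholders, not part of the original example.

import * as gcp from "@pulumi/gcp";

// Hypothetical multi-statement procedure body; table names are placeholders.
const cleanup = new gcp.bigquery.Routine("cleanup", {
    datasetId: test.datasetId,          // dataset declared in the example above
    routineId: "cleanup_routine",
    routineType: "PROCEDURE",
    language: "SQL",
    securityMode: "INVOKER",
    definitionBody: `
BEGIN
  DELETE FROM \`dataset_id.raw_events\`
  WHERE event_ts < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 90 DAY);

  INSERT INTO \`dataset_id.audit_log\` (ran_at) VALUES (CURRENT_TIMESTAMP());
END`,
});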

Define a JavaScript scalar function with typed arguments

When SQL expressions aren’t sufficient, JavaScript functions provide custom logic with explicit type contracts for inputs and outputs.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
const sproc = new gcp.bigquery.Routine("sproc", {
    datasetId: test.datasetId,
    routineId: "routine_id",
    routineType: "SCALAR_FUNCTION",
    language: "JAVASCRIPT",
    definitionBody: "CREATE FUNCTION multiplyInputs return x*y;",
    arguments: [
        {
            name: "x",
            dataType: "{\"typeKind\" :  \"FLOAT64\"}",
        },
        {
            name: "y",
            dataType: "{\"typeKind\" :  \"FLOAT64\"}",
        },
    ],
    returnType: "{\"typeKind\" :  \"FLOAT64\"}",
});
import pulumi
import pulumi_gcp as gcp

test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
sproc = gcp.bigquery.Routine("sproc",
    dataset_id=test.dataset_id,
    routine_id="routine_id",
    routine_type="SCALAR_FUNCTION",
    language="JAVASCRIPT",
    definition_body="CREATE FUNCTION multiplyInputs return x*y;",
    arguments=[
        {
            "name": "x",
            "data_type": "{\"typeKind\" :  \"FLOAT64\"}",
        },
        {
            "name": "y",
            "data_type": "{\"typeKind\" :  \"FLOAT64\"}",
        },
    ],
    return_type="{\"typeKind\" :  \"FLOAT64\"}")
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
			DatasetId: pulumi.String("dataset_id"),
		})
		if err != nil {
			return err
		}
		_, err = bigquery.NewRoutine(ctx, "sproc", &bigquery.RoutineArgs{
			DatasetId:      test.DatasetId,
			RoutineId:      pulumi.String("routine_id"),
			RoutineType:    pulumi.String("SCALAR_FUNCTION"),
			Language:       pulumi.String("JAVASCRIPT"),
			DefinitionBody: pulumi.String("CREATE FUNCTION multiplyInputs return x*y;"),
			Arguments: bigquery.RoutineArgumentArray{
				&bigquery.RoutineArgumentArgs{
					Name:     pulumi.String("x"),
					DataType: pulumi.String("{\"typeKind\" :  \"FLOAT64\"}"),
				},
				&bigquery.RoutineArgumentArgs{
					Name:     pulumi.String("y"),
					DataType: pulumi.String("{\"typeKind\" :  \"FLOAT64\"}"),
				},
			},
			ReturnType: pulumi.String("{\"typeKind\" :  \"FLOAT64\"}"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var test = new Gcp.BigQuery.Dataset("test", new()
    {
        DatasetId = "dataset_id",
    });

    var sproc = new Gcp.BigQuery.Routine("sproc", new()
    {
        DatasetId = test.DatasetId,
        RoutineId = "routine_id",
        RoutineType = "SCALAR_FUNCTION",
        Language = "JAVASCRIPT",
        DefinitionBody = "CREATE FUNCTION multiplyInputs return x*y;",
        Arguments = new[]
        {
            new Gcp.BigQuery.Inputs.RoutineArgumentArgs
            {
                Name = "x",
                DataType = "{\"typeKind\" :  \"FLOAT64\"}",
            },
            new Gcp.BigQuery.Inputs.RoutineArgumentArgs
            {
                Name = "y",
                DataType = "{\"typeKind\" :  \"FLOAT64\"}",
            },
        },
        ReturnType = "{\"typeKind\" :  \"FLOAT64\"}",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineArgumentArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var test = new Dataset("test", DatasetArgs.builder()
            .datasetId("dataset_id")
            .build());

        var sproc = new Routine("sproc", RoutineArgs.builder()
            .datasetId(test.datasetId())
            .routineId("routine_id")
            .routineType("SCALAR_FUNCTION")
            .language("JAVASCRIPT")
            .definitionBody("CREATE FUNCTION multiplyInputs return x*y;")
            .arguments(            
                RoutineArgumentArgs.builder()
                    .name("x")
                    .dataType("{\"typeKind\" :  \"FLOAT64\"}")
                    .build(),
                RoutineArgumentArgs.builder()
                    .name("y")
                    .dataType("{\"typeKind\" :  \"FLOAT64\"}")
                    .build())
            .returnType("{\"typeKind\" :  \"FLOAT64\"}")
            .build());

    }
}
resources:
  test:
    type: gcp:bigquery:Dataset
    properties:
      datasetId: dataset_id
  sproc:
    type: gcp:bigquery:Routine
    properties:
      datasetId: ${test.datasetId}
      routineId: routine_id
      routineType: SCALAR_FUNCTION
      language: JAVASCRIPT
      definitionBody: CREATE FUNCTION multiplyInputs return x*y;
      arguments:
        - name: x
          dataType: '{"typeKind" :  "FLOAT64"}'
        - name: y
          dataType: '{"typeKind" :  "FLOAT64"}'
      returnType: '{"typeKind" :  "FLOAT64"}'

The language property switches to JAVASCRIPT, and the definitionBody contains JavaScript code. The arguments array defines input parameters with explicit dataType specifications in JSON format. The returnType property specifies the output type, also in JSON format. BigQuery validates these types at query time.
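Hand-escaping the JSON type descriptors is error-prone. In TypeScript you can build them with JSON.stringify instead, as the table-valued example in the next section also does. A minimal sketch of the same two-argument function, reusing the test dataset from above; the multiply names are placeholders and the body here is the bare JavaScript expression form.

import * as gcp from "@pulumi/gcp";

// Build the type descriptor once instead of repeating an escaped string.
const float64 = JSON.stringify({ typeKind: "FLOAT64" });

const multiply = new gcp.bigquery.Routine("multiply", {
    datasetId: test.datasetId,          // dataset declared in the example above
    routineId: "multiply_routine",
    routineType: "SCALAR_FUNCTION",
    language: "JAVASCRIPT",
    definitionBody: "return x * y;",    // JavaScript function body only
    arguments: [
        { name: "x", dataType: float64 },
        { name: "y", dataType: float64 },
    ],
    returnType: float64,
});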

Return table results from a function

Table-valued functions transform input parameters into result sets, enabling reusable query logic that returns multiple rows and columns.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
const sproc = new gcp.bigquery.Routine("sproc", {
    datasetId: test.datasetId,
    routineId: "routine_id",
    routineType: "TABLE_VALUED_FUNCTION",
    language: "SQL",
    definitionBody: "SELECT 1 + value AS value\n",
    arguments: [{
        name: "value",
        argumentKind: "FIXED_TYPE",
        dataType: JSON.stringify({
            typeKind: "INT64",
        }),
    }],
    returnTableType: JSON.stringify({
        columns: [{
            name: "value",
            type: {
                typeKind: "INT64",
            },
        }],
    }),
});
import pulumi
import json
import pulumi_gcp as gcp

test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
sproc = gcp.bigquery.Routine("sproc",
    dataset_id=test.dataset_id,
    routine_id="routine_id",
    routine_type="TABLE_VALUED_FUNCTION",
    language="SQL",
    definition_body="SELECT 1 + value AS value\n",
    arguments=[{
        "name": "value",
        "argument_kind": "FIXED_TYPE",
        "data_type": json.dumps({
            "typeKind": "INT64",
        }),
    }],
    return_table_type=json.dumps({
        "columns": [{
            "name": "value",
            "type": {
                "typeKind": "INT64",
            },
        }],
    }))
package main

import (
	"encoding/json"

	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
			DatasetId: pulumi.String("dataset_id"),
		})
		if err != nil {
			return err
		}
		tmpJSON0, err := json.Marshal(map[string]interface{}{
			"typeKind": "INT64",
		})
		if err != nil {
			return err
		}
		json0 := string(tmpJSON0)
		tmpJSON1, err := json.Marshal(map[string]interface{}{
			"columns": []map[string]interface{}{
				map[string]interface{}{
					"name": "value",
					"type": map[string]interface{}{
						"typeKind": "INT64",
					},
				},
			},
		})
		if err != nil {
			return err
		}
		json1 := string(tmpJSON1)
		_, err = bigquery.NewRoutine(ctx, "sproc", &bigquery.RoutineArgs{
			DatasetId:      test.DatasetId,
			RoutineId:      pulumi.String("routine_id"),
			RoutineType:    pulumi.String("TABLE_VALUED_FUNCTION"),
			Language:       pulumi.String("SQL"),
			DefinitionBody: pulumi.String("SELECT 1 + value AS value\n"),
			Arguments: bigquery.RoutineArgumentArray{
				&bigquery.RoutineArgumentArgs{
					Name:         pulumi.String("value"),
					ArgumentKind: pulumi.String("FIXED_TYPE"),
					DataType:     pulumi.String(json0),
				},
			},
			ReturnTableType: pulumi.String(json1),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var test = new Gcp.BigQuery.Dataset("test", new()
    {
        DatasetId = "dataset_id",
    });

    var sproc = new Gcp.BigQuery.Routine("sproc", new()
    {
        DatasetId = test.DatasetId,
        RoutineId = "routine_id",
        RoutineType = "TABLE_VALUED_FUNCTION",
        Language = "SQL",
        DefinitionBody = @"SELECT 1 + value AS value
",
        Arguments = new[]
        {
            new Gcp.BigQuery.Inputs.RoutineArgumentArgs
            {
                Name = "value",
                ArgumentKind = "FIXED_TYPE",
                DataType = JsonSerializer.Serialize(new Dictionary<string, object?>
                {
                    ["typeKind"] = "INT64",
                }),
            },
        },
        ReturnTableType = JsonSerializer.Serialize(new Dictionary<string, object?>
        {
            ["columns"] = new[]
            {
                new Dictionary<string, object?>
                {
                    ["name"] = "value",
                    ["type"] = new Dictionary<string, object?>
                    {
                        ["typeKind"] = "INT64",
                    },
                },
            },
        }),
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineArgumentArgs;
import static com.pulumi.codegen.internal.Serialization.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var test = new Dataset("test", DatasetArgs.builder()
            .datasetId("dataset_id")
            .build());

        var sproc = new Routine("sproc", RoutineArgs.builder()
            .datasetId(test.datasetId())
            .routineId("routine_id")
            .routineType("TABLE_VALUED_FUNCTION")
            .language("SQL")
            .definitionBody("""
SELECT 1 + value AS value
            """)
            .arguments(RoutineArgumentArgs.builder()
                .name("value")
                .argumentKind("FIXED_TYPE")
                .dataType(serializeJson(
                    jsonObject(
                        jsonProperty("typeKind", "INT64")
                    )))
                .build())
            .returnTableType(serializeJson(
                jsonObject(
                    jsonProperty("columns", jsonArray(jsonObject(
                        jsonProperty("name", "value"),
                        jsonProperty("type", jsonObject(
                            jsonProperty("typeKind", "INT64")
                        ))
                    )))
                )))
            .build());

    }
}
resources:
  test:
    type: gcp:bigquery:Dataset
    properties:
      datasetId: dataset_id
  sproc:
    type: gcp:bigquery:Routine
    properties:
      datasetId: ${test.datasetId}
      routineId: routine_id
      routineType: TABLE_VALUED_FUNCTION
      language: SQL
      definitionBody: |
        SELECT 1 + value AS value        
      arguments:
        - name: value
          argumentKind: FIXED_TYPE
          dataType:
            fn::toJSON:
              typeKind: INT64
      returnTableType:
        fn::toJSON:
          columns:
            - name: value
              type:
                typeKind: INT64

Setting routineType to TABLE_VALUED_FUNCTION changes the return behavior from a single value to a table. The returnTableType property defines the output schema as JSON, specifying column names and types. The definitionBody contains a SQL query that produces the result set.
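Queries call the function as dataset.routine_id(...). A small sketch that exports that qualified name from the stack, assuming the test dataset and sproc routine declared in the TypeScript example above:

import * as pulumi from "@pulumi/pulumi";

// Fully qualified name for calling the function from SQL, e.g.:
//   SELECT * FROM `dataset_id.routine_id`(5);
export const tvfReference = pulumi.interpolate`${test.datasetId}.${sproc.routineId}`;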

Run PySpark jobs with inline code

Large-scale data transformations often require Spark’s distributed processing, which BigQuery can orchestrate through stored procedures.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
const testConnection = new gcp.bigquery.Connection("test", {
    connectionId: "connection_id",
    location: "US",
    spark: {},
});
const pyspark = new gcp.bigquery.Routine("pyspark", {
    datasetId: test.datasetId,
    routineId: "routine_id",
    routineType: "PROCEDURE",
    language: "PYTHON",
    definitionBody: `from pyspark.sql import SparkSession

spark = SparkSession.builder.appName(\\"spark-bigquery-demo\\").getOrCreate()
    
# Load data from BigQuery.
words = spark.read.format(\\"bigquery\\") \\\\
  .option(\\"table\\", \\"bigquery-public-data:samples.shakespeare\\") \\\\
  .load()
words.createOrReplaceTempView(\\"words\\")
    
# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed(\\"sum(word_count)\\", \\"sum_word_count\\")
word_count.show()
word_count.printSchema()
    
# Saving the data to BigQuery
word_count.write.format(\\"bigquery\\") \\\\
  .option(\\"writeMethod\\", \\"direct\\") \\\\
  .save(\\"wordcount_dataset.wordcount_output\\")
`,
    sparkOptions: {
        connection: testConnection.name,
        runtimeVersion: "2.1",
    },
});
import pulumi
import pulumi_gcp as gcp

test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
test_connection = gcp.bigquery.Connection("test",
    connection_id="connection_id",
    location="US",
    spark={})
pyspark = gcp.bigquery.Routine("pyspark",
    dataset_id=test.dataset_id,
    routine_id="routine_id",
    routine_type="PROCEDURE",
    language="PYTHON",
    definition_body="""from pyspark.sql import SparkSession

spark = SparkSession.builder.appName(\"spark-bigquery-demo\").getOrCreate()
    
# Load data from BigQuery.
words = spark.read.format(\"bigquery\") \\
  .option(\"table\", \"bigquery-public-data:samples.shakespeare\") \\
  .load()
words.createOrReplaceTempView(\"words\")
    
# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed(\"sum(word_count)\", \"sum_word_count\")
word_count.show()
word_count.printSchema()
    
# Saving the data to BigQuery
word_count.write.format(\"bigquery\") \\
  .option(\"writeMethod\", \"direct\") \\
  .save(\"wordcount_dataset.wordcount_output\")
""",
    spark_options={
        "connection": test_connection.name,
        "runtime_version": "2.1",
    })
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
			DatasetId: pulumi.String("dataset_id"),
		})
		if err != nil {
			return err
		}
		testConnection, err := bigquery.NewConnection(ctx, "test", &bigquery.ConnectionArgs{
			ConnectionId: pulumi.String("connection_id"),
			Location:     pulumi.String("US"),
			Spark:        &bigquery.ConnectionSparkArgs{},
		})
		if err != nil {
			return err
		}
		_, err = bigquery.NewRoutine(ctx, "pyspark", &bigquery.RoutineArgs{
			DatasetId:   test.DatasetId,
			RoutineId:   pulumi.String("routine_id"),
			RoutineType: pulumi.String("PROCEDURE"),
			Language:    pulumi.String("PYTHON"),
			DefinitionBody: pulumi.String(`from pyspark.sql import SparkSession

spark = SparkSession.builder.appName(\"spark-bigquery-demo\").getOrCreate()
    
# Load data from BigQuery.
words = spark.read.format(\"bigquery\") \\
  .option(\"table\", \"bigquery-public-data:samples.shakespeare\") \\
  .load()
words.createOrReplaceTempView(\"words\")
    
# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed(\"sum(word_count)\", \"sum_word_count\")
word_count.show()
word_count.printSchema()
    
# Saving the data to BigQuery
word_count.write.format(\"bigquery\") \\
  .option(\"writeMethod\", \"direct\") \\
  .save(\"wordcount_dataset.wordcount_output\")
`),
			SparkOptions: &bigquery.RoutineSparkOptionsArgs{
				Connection:     testConnection.Name,
				RuntimeVersion: pulumi.String("2.1"),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var test = new Gcp.BigQuery.Dataset("test", new()
    {
        DatasetId = "dataset_id",
    });

    var testConnection = new Gcp.BigQuery.Connection("test", new()
    {
        ConnectionId = "connection_id",
        Location = "US",
        Spark = null,
    });

    var pyspark = new Gcp.BigQuery.Routine("pyspark", new()
    {
        DatasetId = test.DatasetId,
        RoutineId = "routine_id",
        RoutineType = "PROCEDURE",
        Language = "PYTHON",
        DefinitionBody = @"from pyspark.sql import SparkSession

spark = SparkSession.builder.appName(\""spark-bigquery-demo\"").getOrCreate()
    
# Load data from BigQuery.
words = spark.read.format(\""bigquery\"") \\
  .option(\""table\"", \""bigquery-public-data:samples.shakespeare\"") \\
  .load()
words.createOrReplaceTempView(\""words\"")
    
# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed(\""sum(word_count)\"", \""sum_word_count\"")
word_count.show()
word_count.printSchema()
    
# Saving the data to BigQuery
word_count.write.format(\""bigquery\"") \\
  .option(\""writeMethod\"", \""direct\"") \\
  .save(\""wordcount_dataset.wordcount_output\"")
",
        SparkOptions = new Gcp.BigQuery.Inputs.RoutineSparkOptionsArgs
        {
            Connection = testConnection.Name,
            RuntimeVersion = "2.1",
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Connection;
import com.pulumi.gcp.bigquery.ConnectionArgs;
import com.pulumi.gcp.bigquery.inputs.ConnectionSparkArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineSparkOptionsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var test = new Dataset("test", DatasetArgs.builder()
            .datasetId("dataset_id")
            .build());

        var testConnection = new Connection("testConnection", ConnectionArgs.builder()
            .connectionId("connection_id")
            .location("US")
            .spark(ConnectionSparkArgs.builder()
                .build())
            .build());

        var pyspark = new Routine("pyspark", RoutineArgs.builder()
            .datasetId(test.datasetId())
            .routineId("routine_id")
            .routineType("PROCEDURE")
            .language("PYTHON")
            .definitionBody("""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName(\"spark-bigquery-demo\").getOrCreate()
    
# Load data from BigQuery.
words = spark.read.format(\"bigquery\") \\
  .option(\"table\", \"bigquery-public-data:samples.shakespeare\") \\
  .load()
words.createOrReplaceTempView(\"words\")
    
# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed(\"sum(word_count)\", \"sum_word_count\")
word_count.show()
word_count.printSchema()
    
# Saving the data to BigQuery
word_count.write.format(\"bigquery\") \\
  .option(\"writeMethod\", \"direct\") \\
  .save(\"wordcount_dataset.wordcount_output\")
            """)
            .sparkOptions(RoutineSparkOptionsArgs.builder()
                .connection(testConnection.name())
                .runtimeVersion("2.1")
                .build())
            .build());

    }
}
resources:
  test:
    type: gcp:bigquery:Dataset
    properties:
      datasetId: dataset_id
  testConnection:
    type: gcp:bigquery:Connection
    name: test
    properties:
      connectionId: connection_id
      location: US
      spark: {}
  pyspark:
    type: gcp:bigquery:Routine
    properties:
      datasetId: ${test.datasetId}
      routineId: routine_id
      routineType: PROCEDURE
      language: PYTHON
      definitionBody: "from pyspark.sql import SparkSession\n\nspark = SparkSession.builder.appName(\\\"spark-bigquery-demo\\\").getOrCreate()\n    \n# Load data from BigQuery.\nwords = spark.read.format(\\\"bigquery\\\") \\\\\n  .option(\\\"table\\\", \\\"bigquery-public-data:samples.shakespeare\\\") \\\\\n  .load()\nwords.createOrReplaceTempView(\\\"words\\\")\n    \n# Perform word count.\nword_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed(\\\"sum(word_count)\\\", \\\"sum_word_count\\\")\nword_count.show()\nword_count.printSchema()\n    \n# Saving the data to BigQuery\nword_count.write.format(\\\"bigquery\\\") \\\\\n  .option(\\\"writeMethod\\\", \\\"direct\\\") \\\\\n  .save(\\\"wordcount_dataset.wordcount_output\\\")\n"
      sparkOptions:
        connection: ${testConnection.name}
        runtimeVersion: '2.1'

The sparkOptions block enables Spark execution by referencing a Spark-enabled BigQuery connection and a runtimeVersion. The definitionBody contains PySpark code that reads from BigQuery, performs transformations, and writes results back. BigQuery manages the Spark cluster lifecycle automatically.
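The Spark connection runs as its own service account, which typically needs BigQuery access before the procedure can read or write tables. A hedged sketch of granting that access, assuming the connection exposes the account under spark.serviceAccountId and that roles/bigquery.dataEditor fits your needs; the project ID is a placeholder and you should scope the grant to the datasets the job actually touches.

import * as gcp from "@pulumi/gcp";

// Service account created for the Spark connection declared above.
const sparkSa = testConnection.spark.apply(spark => spark?.serviceAccountId ?? "");

// Allow that identity to read and write BigQuery data (role is an assumption).
const sparkDataAccess = new gcp.projects.IAMMember("spark-data-access", {
    project: "my-project-id",                       // placeholder project ID
    role: "roles/bigquery.dataEditor",
    member: sparkSa.apply(sa => `serviceAccount:${sa}`),
});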

Reference external PySpark files from Cloud Storage

Production Spark jobs typically live in version-controlled files rather than inline code, requiring references to GCS-hosted scripts and dependencies.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
const testConnection = new gcp.bigquery.Connection("test", {
    connectionId: "connection_id",
    location: "US",
    spark: {},
});
const pysparkMainfile = new gcp.bigquery.Routine("pyspark_mainfile", {
    datasetId: test.datasetId,
    routineId: "routine_id",
    routineType: "PROCEDURE",
    language: "PYTHON",
    definitionBody: "",
    sparkOptions: {
        connection: testConnection.name,
        runtimeVersion: "2.1",
        mainFileUri: "gs://test-bucket/main.py",
        pyFileUris: ["gs://test-bucket/lib.py"],
        fileUris: ["gs://test-bucket/distribute_in_executor.json"],
        archiveUris: ["gs://test-bucket/distribute_in_executor.tar.gz"],
    },
});
import pulumi
import pulumi_gcp as gcp

test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
test_connection = gcp.bigquery.Connection("test",
    connection_id="connection_id",
    location="US",
    spark={})
pyspark_mainfile = gcp.bigquery.Routine("pyspark_mainfile",
    dataset_id=test.dataset_id,
    routine_id="routine_id",
    routine_type="PROCEDURE",
    language="PYTHON",
    definition_body="",
    spark_options={
        "connection": test_connection.name,
        "runtime_version": "2.1",
        "main_file_uri": "gs://test-bucket/main.py",
        "py_file_uris": ["gs://test-bucket/lib.py"],
        "file_uris": ["gs://test-bucket/distribute_in_executor.json"],
        "archive_uris": ["gs://test-bucket/distribute_in_executor.tar.gz"],
    })
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
			DatasetId: pulumi.String("dataset_id"),
		})
		if err != nil {
			return err
		}
		testConnection, err := bigquery.NewConnection(ctx, "test", &bigquery.ConnectionArgs{
			ConnectionId: pulumi.String("connection_id"),
			Location:     pulumi.String("US"),
			Spark:        &bigquery.ConnectionSparkArgs{},
		})
		if err != nil {
			return err
		}
		_, err = bigquery.NewRoutine(ctx, "pyspark_mainfile", &bigquery.RoutineArgs{
			DatasetId:      test.DatasetId,
			RoutineId:      pulumi.String("routine_id"),
			RoutineType:    pulumi.String("PROCEDURE"),
			Language:       pulumi.String("PYTHON"),
			DefinitionBody: pulumi.String(""),
			SparkOptions: &bigquery.RoutineSparkOptionsArgs{
				Connection:     testConnection.Name,
				RuntimeVersion: pulumi.String("2.1"),
				MainFileUri:    pulumi.String("gs://test-bucket/main.py"),
				PyFileUris: pulumi.StringArray{
					pulumi.String("gs://test-bucket/lib.py"),
				},
				FileUris: pulumi.StringArray{
					pulumi.String("gs://test-bucket/distribute_in_executor.json"),
				},
				ArchiveUris: pulumi.StringArray{
					pulumi.String("gs://test-bucket/distribute_in_executor.tar.gz"),
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var test = new Gcp.BigQuery.Dataset("test", new()
    {
        DatasetId = "dataset_id",
    });

    var testConnection = new Gcp.BigQuery.Connection("test", new()
    {
        ConnectionId = "connection_id",
        Location = "US",
        Spark = null,
    });

    var pysparkMainfile = new Gcp.BigQuery.Routine("pyspark_mainfile", new()
    {
        DatasetId = test.DatasetId,
        RoutineId = "routine_id",
        RoutineType = "PROCEDURE",
        Language = "PYTHON",
        DefinitionBody = "",
        SparkOptions = new Gcp.BigQuery.Inputs.RoutineSparkOptionsArgs
        {
            Connection = testConnection.Name,
            RuntimeVersion = "2.1",
            MainFileUri = "gs://test-bucket/main.py",
            PyFileUris = new[]
            {
                "gs://test-bucket/lib.py",
            },
            FileUris = new[]
            {
                "gs://test-bucket/distribute_in_executor.json",
            },
            ArchiveUris = new[]
            {
                "gs://test-bucket/distribute_in_executor.tar.gz",
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Connection;
import com.pulumi.gcp.bigquery.ConnectionArgs;
import com.pulumi.gcp.bigquery.inputs.ConnectionSparkArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineSparkOptionsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var test = new Dataset("test", DatasetArgs.builder()
            .datasetId("dataset_id")
            .build());

        var testConnection = new Connection("testConnection", ConnectionArgs.builder()
            .connectionId("connection_id")
            .location("US")
            .spark(ConnectionSparkArgs.builder()
                .build())
            .build());

        var pysparkMainfile = new Routine("pysparkMainfile", RoutineArgs.builder()
            .datasetId(test.datasetId())
            .routineId("routine_id")
            .routineType("PROCEDURE")
            .language("PYTHON")
            .definitionBody("")
            .sparkOptions(RoutineSparkOptionsArgs.builder()
                .connection(testConnection.name())
                .runtimeVersion("2.1")
                .mainFileUri("gs://test-bucket/main.py")
                .pyFileUris("gs://test-bucket/lib.py")
                .fileUris("gs://test-bucket/distribute_in_executor.json")
                .archiveUris("gs://test-bucket/distribute_in_executor.tar.gz")
                .build())
            .build());

    }
}
resources:
  test:
    type: gcp:bigquery:Dataset
    properties:
      datasetId: dataset_id
  testConnection:
    type: gcp:bigquery:Connection
    name: test
    properties:
      connectionId: connection_id
      location: US
      spark: {}
  pysparkMainfile:
    type: gcp:bigquery:Routine
    name: pyspark_mainfile
    properties:
      datasetId: ${test.datasetId}
      routineId: routine_id
      routineType: PROCEDURE
      language: PYTHON
      definitionBody: ""
      sparkOptions:
        connection: ${testConnection.name}
        runtimeVersion: '2.1'
        mainFileUri: gs://test-bucket/main.py
        pyFileUris:
          - gs://test-bucket/lib.py
        fileUris:
          - gs://test-bucket/distribute_in_executor.json
        archiveUris:
          - gs://test-bucket/distribute_in_executor.tar.gz

The mainFileUri property points to the primary Python script in Cloud Storage. The pyFileUris, fileUris, and archiveUris properties reference additional dependencies that Spark distributes to executors. When definitionBody is empty, BigQuery executes the main file directly.
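The gs:// URIs must point at objects that already exist. A minimal sketch of uploading the main script and its library with Pulumi, assuming local files at ./spark/main.py and ./spark/lib.py; the bucket and object names are placeholders, and in practice you would likely reuse an existing bucket.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// Bucket for the Spark sources (names are placeholders).
const bucket = new gcp.storage.Bucket("spark-sources", {
    name: "test-bucket",
    location: "US",
});

const mainPy = new gcp.storage.BucketObject("main-py", {
    bucket: bucket.name,
    name: "main.py",
    source: new pulumi.asset.FileAsset("./spark/main.py"),
});

const libPy = new gcp.storage.BucketObject("lib-py", {
    bucket: bucket.name,
    name: "lib.py",
    source: new pulumi.asset.FileAsset("./spark/lib.py"),
});

// mainFileUri can then be built from the uploaded objects, e.g.:
//   pulumi.interpolate`gs://${bucket.name}/${mainPy.name}`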

Execute Scala Spark jobs from JAR files

Teams with existing Scala Spark applications can run them in BigQuery by referencing compiled JARs and specifying the main class.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
const testConnection = new gcp.bigquery.Connection("test", {
    connectionId: "connection_id",
    location: "US",
    spark: {},
});
const sparkJar = new gcp.bigquery.Routine("spark_jar", {
    datasetId: test.datasetId,
    routineId: "routine_id",
    routineType: "PROCEDURE",
    language: "SCALA",
    definitionBody: "",
    sparkOptions: {
        connection: testConnection.name,
        runtimeVersion: "2.1",
        containerImage: "gcr.io/my-project-id/my-spark-image:latest",
        mainClass: "com.google.test.jar.MainClass",
        jarUris: ["gs://test-bucket/uberjar_spark_spark3.jar"],
        properties: {
            "spark.dataproc.scaling.version": "2",
            "spark.reducer.fetchMigratedShuffle.enabled": "true",
        },
    },
});
import pulumi
import pulumi_gcp as gcp

test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
test_connection = gcp.bigquery.Connection("test",
    connection_id="connection_id",
    location="US",
    spark={})
spark_jar = gcp.bigquery.Routine("spark_jar",
    dataset_id=test.dataset_id,
    routine_id="routine_id",
    routine_type="PROCEDURE",
    language="SCALA",
    definition_body="",
    spark_options={
        "connection": test_connection.name,
        "runtime_version": "2.1",
        "container_image": "gcr.io/my-project-id/my-spark-image:latest",
        "main_class": "com.google.test.jar.MainClass",
        "jar_uris": ["gs://test-bucket/uberjar_spark_spark3.jar"],
        "properties": {
            "spark.dataproc.scaling.version": "2",
            "spark.reducer.fetchMigratedShuffle.enabled": "true",
        },
    })
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
			DatasetId: pulumi.String("dataset_id"),
		})
		if err != nil {
			return err
		}
		testConnection, err := bigquery.NewConnection(ctx, "test", &bigquery.ConnectionArgs{
			ConnectionId: pulumi.String("connection_id"),
			Location:     pulumi.String("US"),
			Spark:        &bigquery.ConnectionSparkArgs{},
		})
		if err != nil {
			return err
		}
		_, err = bigquery.NewRoutine(ctx, "spark_jar", &bigquery.RoutineArgs{
			DatasetId:      test.DatasetId,
			RoutineId:      pulumi.String("routine_id"),
			RoutineType:    pulumi.String("PROCEDURE"),
			Language:       pulumi.String("SCALA"),
			DefinitionBody: pulumi.String(""),
			SparkOptions: &bigquery.RoutineSparkOptionsArgs{
				Connection:     testConnection.Name,
				RuntimeVersion: pulumi.String("2.1"),
				ContainerImage: pulumi.String("gcr.io/my-project-id/my-spark-image:latest"),
				MainClass:      pulumi.String("com.google.test.jar.MainClass"),
				JarUris: pulumi.StringArray{
					pulumi.String("gs://test-bucket/uberjar_spark_spark3.jar"),
				},
				Properties: pulumi.StringMap{
					"spark.dataproc.scaling.version":             pulumi.String("2"),
					"spark.reducer.fetchMigratedShuffle.enabled": pulumi.String("true"),
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var test = new Gcp.BigQuery.Dataset("test", new()
    {
        DatasetId = "dataset_id",
    });

    var testConnection = new Gcp.BigQuery.Connection("test", new()
    {
        ConnectionId = "connection_id",
        Location = "US",
        Spark = null,
    });

    var sparkJar = new Gcp.BigQuery.Routine("spark_jar", new()
    {
        DatasetId = test.DatasetId,
        RoutineId = "routine_id",
        RoutineType = "PROCEDURE",
        Language = "SCALA",
        DefinitionBody = "",
        SparkOptions = new Gcp.BigQuery.Inputs.RoutineSparkOptionsArgs
        {
            Connection = testConnection.Name,
            RuntimeVersion = "2.1",
            ContainerImage = "gcr.io/my-project-id/my-spark-image:latest",
            MainClass = "com.google.test.jar.MainClass",
            JarUris = new[]
            {
                "gs://test-bucket/uberjar_spark_spark3.jar",
            },
            Properties = 
            {
                { "spark.dataproc.scaling.version", "2" },
                { "spark.reducer.fetchMigratedShuffle.enabled", "true" },
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Connection;
import com.pulumi.gcp.bigquery.ConnectionArgs;
import com.pulumi.gcp.bigquery.inputs.ConnectionSparkArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineSparkOptionsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var test = new Dataset("test", DatasetArgs.builder()
            .datasetId("dataset_id")
            .build());

        var testConnection = new Connection("testConnection", ConnectionArgs.builder()
            .connectionId("connection_id")
            .location("US")
            .spark(ConnectionSparkArgs.builder()
                .build())
            .build());

        var sparkJar = new Routine("sparkJar", RoutineArgs.builder()
            .datasetId(test.datasetId())
            .routineId("routine_id")
            .routineType("PROCEDURE")
            .language("SCALA")
            .definitionBody("")
            .sparkOptions(RoutineSparkOptionsArgs.builder()
                .connection(testConnection.name())
                .runtimeVersion("2.1")
                .containerImage("gcr.io/my-project-id/my-spark-image:latest")
                .mainClass("com.google.test.jar.MainClass")
                .jarUris("gs://test-bucket/uberjar_spark_spark3.jar")
                .properties(Map.ofEntries(
                    Map.entry("spark.dataproc.scaling.version", "2"),
                    Map.entry("spark.reducer.fetchMigratedShuffle.enabled", "true")
                ))
                .build())
            .build());

    }
}
resources:
  test:
    type: gcp:bigquery:Dataset
    properties:
      datasetId: dataset_id
  testConnection:
    type: gcp:bigquery:Connection
    name: test
    properties:
      connectionId: connection_id
      location: US
      spark: {}
  sparkJar:
    type: gcp:bigquery:Routine
    name: spark_jar
    properties:
      datasetId: ${test.datasetId}
      routineId: routine_id
      routineType: PROCEDURE
      language: SCALA
      definitionBody: ""
      sparkOptions:
        connection: ${testConnection.name}
        runtimeVersion: '2.1'
        containerImage: gcr.io/my-project-id/my-spark-image:latest
        mainClass: com.google.test.jar.MainClass
        jarUris:
          - gs://test-bucket/uberjar_spark_spark3.jar
        properties:
          spark.dataproc.scaling.version: '2'
          spark.reducer.fetchMigratedShuffle.enabled: 'true'

The language switches to SCALA, and the mainClass property identifies the entry point in your JAR. The jarUris property lists JARs to load. The containerImage property specifies a custom container for the Spark runtime. The properties map passes Spark configuration options.
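As with the PySpark files, the JAR referenced by jarUris must already exist in Cloud Storage. A brief sketch of uploading a locally built uber JAR with Pulumi; the bucket name and local path are placeholders.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

// Upload the uber JAR so jarUris can reference it.
const sparkJarObject = new gcp.storage.BucketObject("spark-uberjar", {
    bucket: "test-bucket",                                   // existing bucket (placeholder)
    name: "uberjar_spark_spark3.jar",
    source: new pulumi.asset.FileAsset("./target/scala-2.13/uberjar_spark_spark3.jar"),
});

// jarUris entry built from the uploaded object:
export const sparkJarUri = pulumi.interpolate`gs://${sparkJarObject.bucket}/${sparkJarObject.name}`;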

Create custom data masking functions

Data governance policies often require custom masking logic beyond BigQuery’s built-in functions, such as format-preserving transformations.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const test = new gcp.bigquery.Dataset("test", {datasetId: "tf_test_dataset_id_81126"});
const customMaskingRoutine = new gcp.bigquery.Routine("custom_masking_routine", {
    datasetId: test.datasetId,
    routineId: "custom_masking_routine",
    routineType: "SCALAR_FUNCTION",
    language: "SQL",
    dataGovernanceType: "DATA_MASKING",
    definitionBody: "SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')",
    arguments: [{
        name: "ssn",
        dataType: "{\"typeKind\" :  \"STRING\"}",
    }],
    returnType: "{\"typeKind\" :  \"STRING\"}",
});
import pulumi
import pulumi_gcp as gcp

test = gcp.bigquery.Dataset("test", dataset_id="tf_test_dataset_id_81126")
custom_masking_routine = gcp.bigquery.Routine("custom_masking_routine",
    dataset_id=test.dataset_id,
    routine_id="custom_masking_routine",
    routine_type="SCALAR_FUNCTION",
    language="SQL",
    data_governance_type="DATA_MASKING",
    definition_body="SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')",
    arguments=[{
        "name": "ssn",
        "data_type": "{\"typeKind\" :  \"STRING\"}",
    }],
    return_type="{\"typeKind\" :  \"STRING\"}")
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
			DatasetId: pulumi.String("tf_test_dataset_id_81126"),
		})
		if err != nil {
			return err
		}
		_, err = bigquery.NewRoutine(ctx, "custom_masking_routine", &bigquery.RoutineArgs{
			DatasetId:          test.DatasetId,
			RoutineId:          pulumi.String("custom_masking_routine"),
			RoutineType:        pulumi.String("SCALAR_FUNCTION"),
			Language:           pulumi.String("SQL"),
			DataGovernanceType: pulumi.String("DATA_MASKING"),
			DefinitionBody:     pulumi.String("SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')"),
			Arguments: bigquery.RoutineArgumentArray{
				&bigquery.RoutineArgumentArgs{
					Name:     pulumi.String("ssn"),
					DataType: pulumi.String("{\"typeKind\" :  \"STRING\"}"),
				},
			},
			ReturnType: pulumi.String("{\"typeKind\" :  \"STRING\"}"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var test = new Gcp.BigQuery.Dataset("test", new()
    {
        DatasetId = "tf_test_dataset_id_81126",
    });

    var customMaskingRoutine = new Gcp.BigQuery.Routine("custom_masking_routine", new()
    {
        DatasetId = test.DatasetId,
        RoutineId = "custom_masking_routine",
        RoutineType = "SCALAR_FUNCTION",
        Language = "SQL",
        DataGovernanceType = "DATA_MASKING",
        DefinitionBody = "SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')",
        Arguments = new[]
        {
            new Gcp.BigQuery.Inputs.RoutineArgumentArgs
            {
                Name = "ssn",
                DataType = "{\"typeKind\" :  \"STRING\"}",
            },
        },
        ReturnType = "{\"typeKind\" :  \"STRING\"}",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineArgumentArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var test = new Dataset("test", DatasetArgs.builder()
            .datasetId("tf_test_dataset_id_81126")
            .build());

        var customMaskingRoutine = new Routine("customMaskingRoutine", RoutineArgs.builder()
            .datasetId(test.datasetId())
            .routineId("custom_masking_routine")
            .routineType("SCALAR_FUNCTION")
            .language("SQL")
            .dataGovernanceType("DATA_MASKING")
            .definitionBody("SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')")
            .arguments(RoutineArgumentArgs.builder()
                .name("ssn")
                .dataType("{\"typeKind\" :  \"STRING\"}")
                .build())
            .returnType("{\"typeKind\" :  \"STRING\"}")
            .build());

    }
}
resources:
  test:
    type: gcp:bigquery:Dataset
    properties:
      datasetId: tf_test_dataset_id_81126
  customMaskingRoutine:
    type: gcp:bigquery:Routine
    name: custom_masking_routine
    properties:
      datasetId: ${test.datasetId}
      routineId: custom_masking_routine
      routineType: SCALAR_FUNCTION
      language: SQL
      dataGovernanceType: DATA_MASKING
      definitionBody: SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')
      arguments:
        - name: ssn
          dataType: '{"typeKind" :  "STRING"}'
      returnType: '{"typeKind" :  "STRING"}'

Setting dataGovernanceType to DATA_MASKING registers the function for use in column-level security policies. The definitionBody implements the masking logic using SQL functions. BigQuery validates that masking functions meet security requirements before allowing them in policies.
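To put the routine to work, you attach it to a policy tag through a data policy. A hedged sketch using the Data Catalog taxonomy/policy-tag resources and gcp.bigquerydatapolicy.DataPolicy, assuming the dataMaskingPolicy routine field accepts the routine's full resource path and that the "us" location matches your dataset; check the DataPolicy resource reference for the exact field shapes before relying on this.

import * as gcp from "@pulumi/gcp";

// Taxonomy and policy tag that the masked column will be tagged with.
const taxonomy = new gcp.datacatalog.Taxonomy("masking-taxonomy", {
    displayName: "masking_taxonomy",
    region: "us",
    activatedPolicyTypes: ["FINE_GRAINED_ACCESS_CONTROL"],
});

const policyTag = new gcp.datacatalog.PolicyTag("ssn-tag", {
    taxonomy: taxonomy.id,
    displayName: "ssn",
});

// Data policy that masks tagged columns with the custom routine defined above.
const maskingPolicy = new gcp.bigquerydatapolicy.DataPolicy("ssn-masking", {
    location: "us",
    dataPolicyId: "ssn_masking_policy",
    policyTag: policyTag.name,
    dataPolicyType: "DATA_MASKING_POLICY",
    dataMaskingPolicy: {
        routine: customMaskingRoutine.id,   // assumed to resolve to the routine's full resource path
    },
});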

Call external APIs from BigQuery queries

Queries sometimes need to enrich data by calling external services, such as geocoding APIs or machine learning endpoints hosted outside BigQuery.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
const testConnection = new gcp.bigquery.Connection("test", {
    connectionId: "connection_id",
    location: "US",
    cloudResource: {},
});
const remoteFunction = new gcp.bigquery.Routine("remote_function", {
    datasetId: test.datasetId,
    routineId: "routine_id",
    routineType: "SCALAR_FUNCTION",
    definitionBody: "",
    returnType: "{\"typeKind\" :  \"STRING\"}",
    remoteFunctionOptions: {
        endpoint: "https://us-east1-my_gcf_project.cloudfunctions.net/remote_add",
        connection: testConnection.name,
        maxBatchingRows: "10",
        userDefinedContext: {
            z: "1.5",
        },
    },
});
import pulumi
import pulumi_gcp as gcp

test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
test_connection = gcp.bigquery.Connection("test",
    connection_id="connection_id",
    location="US",
    cloud_resource={})
remote_function = gcp.bigquery.Routine("remote_function",
    dataset_id=test.dataset_id,
    routine_id="routine_id",
    routine_type="SCALAR_FUNCTION",
    definition_body="",
    return_type="{\"typeKind\" :  \"STRING\"}",
    remote_function_options={
        "endpoint": "https://us-east1-my_gcf_project.cloudfunctions.net/remote_add",
        "connection": test_connection.name,
        "max_batching_rows": "10",
        "user_defined_context": {
            "z": "1.5",
        },
    })
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
			DatasetId: pulumi.String("dataset_id"),
		})
		if err != nil {
			return err
		}
		testConnection, err := bigquery.NewConnection(ctx, "test", &bigquery.ConnectionArgs{
			ConnectionId:  pulumi.String("connection_id"),
			Location:      pulumi.String("US"),
			CloudResource: &bigquery.ConnectionCloudResourceArgs{},
		})
		if err != nil {
			return err
		}
		_, err = bigquery.NewRoutine(ctx, "remote_function", &bigquery.RoutineArgs{
			DatasetId:      test.DatasetId,
			RoutineId:      pulumi.String("routine_id"),
			RoutineType:    pulumi.String("SCALAR_FUNCTION"),
			DefinitionBody: pulumi.String(""),
			ReturnType:     pulumi.String("{\"typeKind\" :  \"STRING\"}"),
			RemoteFunctionOptions: &bigquery.RoutineRemoteFunctionOptionsArgs{
				Endpoint:        pulumi.String("https://us-east1-my_gcf_project.cloudfunctions.net/remote_add"),
				Connection:      testConnection.Name,
				MaxBatchingRows: pulumi.String("10"),
				UserDefinedContext: pulumi.StringMap{
					"z": pulumi.String("1.5"),
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var test = new Gcp.BigQuery.Dataset("test", new()
    {
        DatasetId = "dataset_id",
    });

    var testConnection = new Gcp.BigQuery.Connection("test", new()
    {
        ConnectionId = "connection_id",
        Location = "US",
        CloudResource = null,
    });

    var remoteFunction = new Gcp.BigQuery.Routine("remote_function", new()
    {
        DatasetId = test.DatasetId,
        RoutineId = "routine_id",
        RoutineType = "SCALAR_FUNCTION",
        DefinitionBody = "",
        ReturnType = "{\"typeKind\" :  \"STRING\"}",
        RemoteFunctionOptions = new Gcp.BigQuery.Inputs.RoutineRemoteFunctionOptionsArgs
        {
            Endpoint = "https://us-east1-my_gcf_project.cloudfunctions.net/remote_add",
            Connection = testConnection.Name,
            MaxBatchingRows = "10",
            UserDefinedContext = 
            {
                { "z", "1.5" },
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Connection;
import com.pulumi.gcp.bigquery.ConnectionArgs;
import com.pulumi.gcp.bigquery.inputs.ConnectionCloudResourceArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineRemoteFunctionOptionsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var test = new Dataset("test", DatasetArgs.builder()
            .datasetId("dataset_id")
            .build());

        var testConnection = new Connection("testConnection", ConnectionArgs.builder()
            .connectionId("connection_id")
            .location("US")
            .cloudResource(ConnectionCloudResourceArgs.builder()
                .build())
            .build());

        var remoteFunction = new Routine("remoteFunction", RoutineArgs.builder()
            .datasetId(test.datasetId())
            .routineId("routine_id")
            .routineType("SCALAR_FUNCTION")
            .definitionBody("")
            .returnType("{\"typeKind\" :  \"STRING\"}")
            .remoteFunctionOptions(RoutineRemoteFunctionOptionsArgs.builder()
                .endpoint("https://us-east1-my_gcf_project.cloudfunctions.net/remote_add")
                .connection(testConnection.name())
                .maxBatchingRows("10")
                .userDefinedContext(Map.of("z", "1.5"))
                .build())
            .build());

    }
}
resources:
  test:
    type: gcp:bigquery:Dataset
    properties:
      datasetId: dataset_id
  testConnection:
    type: gcp:bigquery:Connection
    name: test
    properties:
      connectionId: connection_id
      location: US
      cloudResource: {}
  remoteFunction:
    type: gcp:bigquery:Routine
    name: remote_function
    properties:
      datasetId: ${test.datasetId}
      routineId: routine_id
      routineType: SCALAR_FUNCTION
      definitionBody: ""
      returnType: '{"typeKind" :  "STRING"}'
      remoteFunctionOptions:
        endpoint: https://us-east1-my_gcf_project.cloudfunctions.net/remote_add
        connection: ${testConnection.name}
        maxBatchingRows: '10'
        userDefinedContext:
          z: '1.5'

The remoteFunctionOptions block configures external HTTP calls. The endpoint property specifies the URL to invoke. The connection property references a BigQuery Cloud Resource connection that handles authentication. The maxBatchingRows property controls how many rows BigQuery sends per request. The userDefinedContext map passes static parameters to the endpoint.
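BigQuery invokes the endpoint as the connection's service account, so that account typically needs permission to call the function. A hedged sketch for a 1st-gen Cloud Function named remote_add in us-east1, assuming the connection exposes cloudResource.serviceAccountId; for a 2nd-gen function you would instead grant roles/run.invoker on the underlying Cloud Run service. The project ID is a placeholder matching the endpoint URL above.

import * as gcp from "@pulumi/gcp";

// Identity BigQuery uses when calling the remote endpoint.
const connectionSa = testConnection.cloudResource.apply(cr => cr?.serviceAccountId ?? "");

// Allow that identity to invoke the Cloud Function behind the remote function.
const invoker = new gcp.cloudfunctions.FunctionIamMember("remote-add-invoker", {
    project: "my_gcf_project",                 // placeholder, matches the endpoint above
    region: "us-east1",
    cloudFunction: "remote_add",
    role: "roles/cloudfunctions.invoker",
    member: connectionSa.apply(sa => `serviceAccount:${sa}`),
});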

Beyond these examples

These snippets focus on specific routine-level features: SQL, JavaScript, Python, and Scala routine languages; Spark integration (inline code, external files, JAR execution); and remote function calls and data masking. They’re intentionally minimal rather than full data processing pipelines.

The examples may reference pre-existing infrastructure such as BigQuery datasets, BigQuery connections (Spark, Cloud Resource), Cloud Storage files (PySpark scripts, JARs, dependencies), Cloud Functions or external HTTP endpoints, and container images in GCR. They focus on configuring the routine rather than provisioning everything around it.

To keep things focused, common routine patterns are omitted, including:

  • Determinism levels for JavaScript UDFs (determinismLevel)
  • Imported JavaScript libraries (importedLibraries)
  • Security modes (DEFINER vs INVOKER)
  • External runtime options for Python UDFs (containerMemory, containerCpu)

These omissions are intentional: the goal is to illustrate how each routine feature is wired, not provide drop-in data processing modules. See the BigQuery Routine resource reference for all available configuration options.


Frequently Asked Questions

Routine Types & Languages
What types of routines can I create in BigQuery?
You can create three types: SCALAR_FUNCTION (returns a single value), PROCEDURE (performs actions), or TABLE_VALUED_FUNCTION (returns a table).
What languages are supported for BigQuery routines?
BigQuery routines support SQL, JAVASCRIPT, PYTHON, JAVA, and SCALA.
Data Types & Return Values
Why am I seeing perpetual diffs on my routine's returnType?
The returnType field expects a JSON string, so any formatting changes create diffs even if the content is semantically identical. The API may return different formatting (e.g., switched field order or STRUCT vs RECORD). Use the schema as returned by the API to avoid recurring diffs.
When is returnType required?
returnType is optional if language is SQL (inferred from definitionBody), but required for all other languages.
How do I specify data types for arguments and return values?
Use JSON format for dataType and returnType fields, like {"typeKind": "FLOAT64"} or {"typeKind": "STRING"}.
Spark & External Runtimes
How do I create a Spark routine in BigQuery?
Set language to PYTHON, JAVA, or SCALA, and configure sparkOptions with a connection (BigQuery Connection resource) and runtimeVersion. You can provide code inline via definitionBody or reference files using mainFileUri.
How do I create a remote function that calls an external API?
Set routineType to SCALAR_FUNCTION and configure remoteFunctionOptions with an endpoint (Cloud Function URL) and connection (BigQuery Connection resource).
What are externalRuntimeOptions used for?
externalRuntimeOptions configure the runtime for Python UDFs, including containerMemory, containerCpu, and runtimeVersion (e.g., python-3.11).
Security & Data Governance
What's the difference between DEFINER and INVOKER security modes?
DEFINER runs the routine with the creator’s permissions, while INVOKER runs it with the caller’s permissions. If not specified, the security mode is automatically determined from the routine’s configuration.
How do I create a custom data masking function?
Set dataGovernanceType to DATA_MASKING with routineType as SCALAR_FUNCTION. The function will be validated and made available as a masking function.
Immutability & Updates
What properties can't I change after creating a routine?
The datasetId, routineId, project, and routineType properties are immutable. Changing any of these forces resource replacement.
