The gcp:bigquery/routine:Routine resource, part of the Pulumi GCP provider, defines user-defined functions and stored procedures within BigQuery datasets: their language, execution logic, and integration points. This guide focuses on four capabilities: SQL stored procedures and JavaScript scalar functions, table-valued functions, Spark procedures (PySpark and Scala), and remote functions and data masking.
Routines belong to datasets and may reference BigQuery connections, Cloud Storage files, container images, or external HTTP endpoints. The examples are intentionally small. Combine them with your own datasets, connections, and external resources.
Create a SQL stored procedure
Data teams often need to encapsulate multi-step SQL logic into reusable procedures that can be called from queries or scheduled jobs.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
const sproc = new gcp.bigquery.Routine("sproc", {
datasetId: test.datasetId,
routineId: "routine_id",
routineType: "PROCEDURE",
language: "SQL",
securityMode: "INVOKER",
definitionBody: "CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);",
});
import pulumi
import pulumi_gcp as gcp
test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
sproc = gcp.bigquery.Routine("sproc",
dataset_id=test.dataset_id,
routine_id="routine_id",
routine_type="PROCEDURE",
language="SQL",
security_mode="INVOKER",
definition_body="CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);")
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
DatasetId: pulumi.String("dataset_id"),
})
if err != nil {
return err
}
_, err = bigquery.NewRoutine(ctx, "sproc", &bigquery.RoutineArgs{
DatasetId: test.DatasetId,
RoutineId: pulumi.String("routine_id"),
RoutineType: pulumi.String("PROCEDURE"),
Language: pulumi.String("SQL"),
SecurityMode: pulumi.String("INVOKER"),
DefinitionBody: pulumi.String("CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var test = new Gcp.BigQuery.Dataset("test", new()
{
DatasetId = "dataset_id",
});
var sproc = new Gcp.BigQuery.Routine("sproc", new()
{
DatasetId = test.DatasetId,
RoutineId = "routine_id",
RoutineType = "PROCEDURE",
Language = "SQL",
SecurityMode = "INVOKER",
DefinitionBody = "CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);",
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var test = new Dataset("test", DatasetArgs.builder()
.datasetId("dataset_id")
.build());
var sproc = new Routine("sproc", RoutineArgs.builder()
.datasetId(test.datasetId())
.routineId("routine_id")
.routineType("PROCEDURE")
.language("SQL")
.securityMode("INVOKER")
.definitionBody("CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);")
.build());
}
}
resources:
test:
type: gcp:bigquery:Dataset
properties:
datasetId: dataset_id
sproc:
type: gcp:bigquery:Routine
properties:
datasetId: ${test.datasetId}
routineId: routine_id
routineType: PROCEDURE
language: SQL
securityMode: INVOKER
definitionBody: CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);
The routineType property set to PROCEDURE defines this as a stored procedure rather than a function. The definitionBody contains the SQL logic to execute. The securityMode property controls whether the routine runs with the caller’s permissions (INVOKER) or the definer’s permissions (DEFINER).
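To run the procedure as part of a deployment or on a schedule, you can submit a query job that executes a CALL statement. The following is a minimal TypeScript sketch, not part of the upstream example, that extends the program above (test and sproc are in scope); the gcp.bigquery.Job settings shown reflect common practice and may need adjustment for your provider version.
// A minimal sketch: invoke the stored procedure with a BigQuery query job.
// Assumes the `test` dataset and `sproc` routine from the program above.
const callSproc = new gcp.bigquery.Job("call_sproc", {
    jobId: "call_sproc_job", // job IDs must be unique within the project
    query: {
        // CALL is a GoogleSQL statement, so legacy SQL must be disabled.
        query: pulumi.interpolate`CALL \`${test.project}.${test.datasetId}.${sproc.routineId}\`();`,
        useLegacySql: false,
        // Scripts take no destination table; clearing the disposition defaults
        // avoids conflicts on some provider versions.
        createDisposition: "",
        writeDisposition: "",
    },
});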
Define a JavaScript scalar function with typed arguments
When SQL expressions aren’t sufficient, JavaScript functions provide custom logic with explicit type contracts for inputs and outputs.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
const sproc = new gcp.bigquery.Routine("sproc", {
datasetId: test.datasetId,
routineId: "routine_id",
routineType: "SCALAR_FUNCTION",
language: "JAVASCRIPT",
definitionBody: "CREATE FUNCTION multiplyInputs return x*y;",
arguments: [
{
name: "x",
dataType: "{\"typeKind\" : \"FLOAT64\"}",
},
{
name: "y",
dataType: "{\"typeKind\" : \"FLOAT64\"}",
},
],
returnType: "{\"typeKind\" : \"FLOAT64\"}",
});
import pulumi
import pulumi_gcp as gcp
test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
sproc = gcp.bigquery.Routine("sproc",
dataset_id=test.dataset_id,
routine_id="routine_id",
routine_type="SCALAR_FUNCTION",
language="JAVASCRIPT",
definition_body="CREATE FUNCTION multiplyInputs return x*y;",
arguments=[
{
"name": "x",
"data_type": "{\"typeKind\" : \"FLOAT64\"}",
},
{
"name": "y",
"data_type": "{\"typeKind\" : \"FLOAT64\"}",
},
],
return_type="{\"typeKind\" : \"FLOAT64\"}")
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
DatasetId: pulumi.String("dataset_id"),
})
if err != nil {
return err
}
_, err = bigquery.NewRoutine(ctx, "sproc", &bigquery.RoutineArgs{
DatasetId: test.DatasetId,
RoutineId: pulumi.String("routine_id"),
RoutineType: pulumi.String("SCALAR_FUNCTION"),
Language: pulumi.String("JAVASCRIPT"),
DefinitionBody: pulumi.String("CREATE FUNCTION multiplyInputs return x*y;"),
Arguments: bigquery.RoutineArgumentArray{
&bigquery.RoutineArgumentArgs{
Name: pulumi.String("x"),
DataType: pulumi.String("{\"typeKind\" : \"FLOAT64\"}"),
},
&bigquery.RoutineArgumentArgs{
Name: pulumi.String("y"),
DataType: pulumi.String("{\"typeKind\" : \"FLOAT64\"}"),
},
},
ReturnType: pulumi.String("{\"typeKind\" : \"FLOAT64\"}"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var test = new Gcp.BigQuery.Dataset("test", new()
{
DatasetId = "dataset_id",
});
var sproc = new Gcp.BigQuery.Routine("sproc", new()
{
DatasetId = test.DatasetId,
RoutineId = "routine_id",
RoutineType = "SCALAR_FUNCTION",
Language = "JAVASCRIPT",
DefinitionBody = "CREATE FUNCTION multiplyInputs return x*y;",
Arguments = new[]
{
new Gcp.BigQuery.Inputs.RoutineArgumentArgs
{
Name = "x",
DataType = "{\"typeKind\" : \"FLOAT64\"}",
},
new Gcp.BigQuery.Inputs.RoutineArgumentArgs
{
Name = "y",
DataType = "{\"typeKind\" : \"FLOAT64\"}",
},
},
ReturnType = "{\"typeKind\" : \"FLOAT64\"}",
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineArgumentArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var test = new Dataset("test", DatasetArgs.builder()
.datasetId("dataset_id")
.build());
var sproc = new Routine("sproc", RoutineArgs.builder()
.datasetId(test.datasetId())
.routineId("routine_id")
.routineType("SCALAR_FUNCTION")
.language("JAVASCRIPT")
.definitionBody("CREATE FUNCTION multiplyInputs return x*y;")
.arguments(
RoutineArgumentArgs.builder()
.name("x")
.dataType("{\"typeKind\" : \"FLOAT64\"}")
.build(),
RoutineArgumentArgs.builder()
.name("y")
.dataType("{\"typeKind\" : \"FLOAT64\"}")
.build())
.returnType("{\"typeKind\" : \"FLOAT64\"}")
.build());
}
}
resources:
test:
type: gcp:bigquery:Dataset
properties:
datasetId: dataset_id
sproc:
type: gcp:bigquery:Routine
properties:
datasetId: ${test.datasetId}
routineId: routine_id
routineType: SCALAR_FUNCTION
language: JAVASCRIPT
definitionBody: CREATE FUNCTION multiplyInputs return x*y;
arguments:
- name: x
dataType: '{"typeKind" : "FLOAT64"}'
- name: y
dataType: '{"typeKind" : "FLOAT64"}'
returnType: '{"typeKind" : "FLOAT64"}'
The language property switches to JAVASCRIPT, and the definitionBody contains JavaScript code. The arguments array defines input parameters with explicit dataType specifications in JSON format. The returnType property specifies the output type, also in JSON format. BigQuery validates these types at query time.
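Hand-escaping the JSON type strings is error-prone. Below is a small TypeScript sketch of a similar function that builds dataType and returnType with JSON.stringify instead; the multiply_inputs routine name and the plain JavaScript body are illustrative, not part of the example above.
// A small sketch: build the JSON type strings with JSON.stringify rather than
// hand-escaped literals. Resource and routine names here are illustrative.
const float64Type = JSON.stringify({ typeKind: "FLOAT64" });
const multiply = new gcp.bigquery.Routine("multiply", {
    datasetId: test.datasetId,
    routineId: "multiply_inputs",
    routineType: "SCALAR_FUNCTION",
    language: "JAVASCRIPT",
    definitionBody: "return x*y;", // JavaScript function body only
    arguments: [
        { name: "x", dataType: float64Type },
        { name: "y", dataType: float64Type },
    ],
    returnType: float64Type,
});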
Return table results from a function
Table-valued functions transform input parameters into result sets, enabling reusable query logic that returns multiple rows and columns.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
const sproc = new gcp.bigquery.Routine("sproc", {
datasetId: test.datasetId,
routineId: "routine_id",
routineType: "TABLE_VALUED_FUNCTION",
language: "SQL",
definitionBody: "SELECT 1 + value AS value\n",
arguments: [{
name: "value",
argumentKind: "FIXED_TYPE",
dataType: JSON.stringify({
typeKind: "INT64",
}),
}],
returnTableType: JSON.stringify({
columns: [{
name: "value",
type: {
typeKind: "INT64",
},
}],
}),
});
import pulumi
import json
import pulumi_gcp as gcp
test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
sproc = gcp.bigquery.Routine("sproc",
dataset_id=test.dataset_id,
routine_id="routine_id",
routine_type="TABLE_VALUED_FUNCTION",
language="SQL",
definition_body="SELECT 1 + value AS value\n",
arguments=[{
"name": "value",
"argument_kind": "FIXED_TYPE",
"data_type": json.dumps({
"typeKind": "INT64",
}),
}],
return_table_type=json.dumps({
"columns": [{
"name": "value",
"type": {
"typeKind": "INT64",
},
}],
}))
package main
import (
"encoding/json"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
DatasetId: pulumi.String("dataset_id"),
})
if err != nil {
return err
}
tmpJSON0, err := json.Marshal(map[string]interface{}{
"typeKind": "INT64",
})
if err != nil {
return err
}
json0 := string(tmpJSON0)
tmpJSON1, err := json.Marshal(map[string]interface{}{
"columns": []map[string]interface{}{
map[string]interface{}{
"name": "value",
"type": map[string]interface{}{
"typeKind": "INT64",
},
},
},
})
if err != nil {
return err
}
json1 := string(tmpJSON1)
_, err = bigquery.NewRoutine(ctx, "sproc", &bigquery.RoutineArgs{
DatasetId: test.DatasetId,
RoutineId: pulumi.String("routine_id"),
RoutineType: pulumi.String("TABLE_VALUED_FUNCTION"),
Language: pulumi.String("SQL"),
DefinitionBody: pulumi.String("SELECT 1 + value AS value\n"),
Arguments: bigquery.RoutineArgumentArray{
&bigquery.RoutineArgumentArgs{
Name: pulumi.String("value"),
ArgumentKind: pulumi.String("FIXED_TYPE"),
DataType: pulumi.String(json0),
},
},
ReturnTableType: pulumi.String(json1),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var test = new Gcp.BigQuery.Dataset("test", new()
{
DatasetId = "dataset_id",
});
var sproc = new Gcp.BigQuery.Routine("sproc", new()
{
DatasetId = test.DatasetId,
RoutineId = "routine_id",
RoutineType = "TABLE_VALUED_FUNCTION",
Language = "SQL",
DefinitionBody = @"SELECT 1 + value AS value
",
Arguments = new[]
{
new Gcp.BigQuery.Inputs.RoutineArgumentArgs
{
Name = "value",
ArgumentKind = "FIXED_TYPE",
DataType = JsonSerializer.Serialize(new Dictionary<string, object?>
{
["typeKind"] = "INT64",
}),
},
},
ReturnTableType = JsonSerializer.Serialize(new Dictionary<string, object?>
{
["columns"] = new[]
{
new Dictionary<string, object?>
{
["name"] = "value",
["type"] = new Dictionary<string, object?>
{
["typeKind"] = "INT64",
},
},
},
}),
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineArgumentArgs;
import static com.pulumi.codegen.internal.Serialization.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var test = new Dataset("test", DatasetArgs.builder()
.datasetId("dataset_id")
.build());
var sproc = new Routine("sproc", RoutineArgs.builder()
.datasetId(test.datasetId())
.routineId("routine_id")
.routineType("TABLE_VALUED_FUNCTION")
.language("SQL")
.definitionBody("""
SELECT 1 + value AS value
""")
.arguments(RoutineArgumentArgs.builder()
.name("value")
.argumentKind("FIXED_TYPE")
.dataType(serializeJson(
jsonObject(
jsonProperty("typeKind", "INT64")
)))
.build())
.returnTableType(serializeJson(
jsonObject(
jsonProperty("columns", jsonArray(jsonObject(
jsonProperty("name", "value"),
jsonProperty("type", jsonObject(
jsonProperty("typeKind", "INT64")
))
)))
)))
.build());
}
}
resources:
test:
type: gcp:bigquery:Dataset
properties:
datasetId: dataset_id
sproc:
type: gcp:bigquery:Routine
properties:
datasetId: ${test.datasetId}
routineId: routine_id
routineType: TABLE_VALUED_FUNCTION
language: SQL
definitionBody: |
SELECT 1 + value AS value
arguments:
- name: value
argumentKind: FIXED_TYPE
dataType:
fn::toJSON:
typeKind: INT64
returnTableType:
fn::toJSON:
columns:
- name: value
type:
typeKind: INT64
Setting routineType to TABLE_VALUED_FUNCTION changes the return behavior from a single value to a table. The returnTableType property defines the output schema as JSON, specifying column names and types. The definitionBody contains a SQL query that produces the result set.
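Callers query a table-valued function in a FROM clause by its fully qualified project.dataset.routine name. A minimal TypeScript sketch, assuming the program above, that exports the name for downstream queries:
// A minimal sketch: export the fully qualified name of the table-valued
// function so callers can run, e.g., SELECT * FROM `<name>`(41).
// Assumes the `test` dataset and `sproc` routine from the program above.
export const tvfName = pulumi.interpolate`${test.project}.${test.datasetId}.${sproc.routineId}`;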
Run PySpark jobs with inline code
Large-scale data transformations often require Spark’s distributed processing, which BigQuery can orchestrate through stored procedures.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
const testConnection = new gcp.bigquery.Connection("test", {
connectionId: "connection_id",
location: "US",
spark: {},
});
const pyspark = new gcp.bigquery.Routine("pyspark", {
datasetId: test.datasetId,
routineId: "routine_id",
routineType: "PROCEDURE",
language: "PYTHON",
definitionBody: `from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(\\"spark-bigquery-demo\\").getOrCreate()
# Load data from BigQuery.
words = spark.read.format(\\"bigquery\\") \\\\
.option(\\"table\\", \\"bigquery-public-data:samples.shakespeare\\") \\\\
.load()
words.createOrReplaceTempView(\\"words\\")
# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed(\\"sum(word_count)\\", \\"sum_word_count\\")
word_count.show()
word_count.printSchema()
# Saving the data to BigQuery
word_count.write.format(\\"bigquery\\") \\\\
.option(\\"writeMethod\\", \\"direct\\") \\\\
.save(\\"wordcount_dataset.wordcount_output\\")
`,
sparkOptions: {
connection: testConnection.name,
runtimeVersion: "2.1",
},
});
import pulumi
import pulumi_gcp as gcp
test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
test_connection = gcp.bigquery.Connection("test",
connection_id="connection_id",
location="US",
spark={})
pyspark = gcp.bigquery.Routine("pyspark",
dataset_id=test.dataset_id,
routine_id="routine_id",
routine_type="PROCEDURE",
language="PYTHON",
definition_body="""from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(\"spark-bigquery-demo\").getOrCreate()
# Load data from BigQuery.
words = spark.read.format(\"bigquery\") \\
.option(\"table\", \"bigquery-public-data:samples.shakespeare\") \\
.load()
words.createOrReplaceTempView(\"words\")
# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed(\"sum(word_count)\", \"sum_word_count\")
word_count.show()
word_count.printSchema()
# Saving the data to BigQuery
word_count.write.format(\"bigquery\") \\
.option(\"writeMethod\", \"direct\") \\
.save(\"wordcount_dataset.wordcount_output\")
""",
spark_options={
"connection": test_connection.name,
"runtime_version": "2.1",
})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
DatasetId: pulumi.String("dataset_id"),
})
if err != nil {
return err
}
testConnection, err := bigquery.NewConnection(ctx, "test", &bigquery.ConnectionArgs{
ConnectionId: pulumi.String("connection_id"),
Location: pulumi.String("US"),
Spark: &bigquery.ConnectionSparkArgs{},
})
if err != nil {
return err
}
_, err = bigquery.NewRoutine(ctx, "pyspark", &bigquery.RoutineArgs{
DatasetId: test.DatasetId,
RoutineId: pulumi.String("routine_id"),
RoutineType: pulumi.String("PROCEDURE"),
Language: pulumi.String("PYTHON"),
DefinitionBody: pulumi.String(`from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(\"spark-bigquery-demo\").getOrCreate()
# Load data from BigQuery.
words = spark.read.format(\"bigquery\") \\
.option(\"table\", \"bigquery-public-data:samples.shakespeare\") \\
.load()
words.createOrReplaceTempView(\"words\")
# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed(\"sum(word_count)\", \"sum_word_count\")
word_count.show()
word_count.printSchema()
# Saving the data to BigQuery
word_count.write.format(\"bigquery\") \\
.option(\"writeMethod\", \"direct\") \\
.save(\"wordcount_dataset.wordcount_output\")
`),
SparkOptions: &bigquery.RoutineSparkOptionsArgs{
Connection: testConnection.Name,
RuntimeVersion: pulumi.String("2.1"),
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var test = new Gcp.BigQuery.Dataset("test", new()
{
DatasetId = "dataset_id",
});
var testConnection = new Gcp.BigQuery.Connection("test", new()
{
ConnectionId = "connection_id",
Location = "US",
Spark = null,
});
var pyspark = new Gcp.BigQuery.Routine("pyspark", new()
{
DatasetId = test.DatasetId,
RoutineId = "routine_id",
RoutineType = "PROCEDURE",
Language = "PYTHON",
DefinitionBody = @"from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(\""spark-bigquery-demo\"").getOrCreate()
# Load data from BigQuery.
words = spark.read.format(\""bigquery\"") \\
.option(\""table\"", \""bigquery-public-data:samples.shakespeare\"") \\
.load()
words.createOrReplaceTempView(\""words\"")
# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed(\""sum(word_count)\"", \""sum_word_count\"")
word_count.show()
word_count.printSchema()
# Saving the data to BigQuery
word_count.write.format(\""bigquery\"") \\
.option(\""writeMethod\"", \""direct\"") \\
.save(\""wordcount_dataset.wordcount_output\"")
",
SparkOptions = new Gcp.BigQuery.Inputs.RoutineSparkOptionsArgs
{
Connection = testConnection.Name,
RuntimeVersion = "2.1",
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Connection;
import com.pulumi.gcp.bigquery.ConnectionArgs;
import com.pulumi.gcp.bigquery.inputs.ConnectionSparkArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineSparkOptionsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var test = new Dataset("test", DatasetArgs.builder()
.datasetId("dataset_id")
.build());
var testConnection = new Connection("testConnection", ConnectionArgs.builder()
.connectionId("connection_id")
.location("US")
.spark(ConnectionSparkArgs.builder()
.build())
.build());
var pyspark = new Routine("pyspark", RoutineArgs.builder()
.datasetId(test.datasetId())
.routineId("routine_id")
.routineType("PROCEDURE")
.language("PYTHON")
.definitionBody("""
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(\"spark-bigquery-demo\").getOrCreate()
# Load data from BigQuery.
words = spark.read.format(\"bigquery\") \\
.option(\"table\", \"bigquery-public-data:samples.shakespeare\") \\
.load()
words.createOrReplaceTempView(\"words\")
# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed(\"sum(word_count)\", \"sum_word_count\")
word_count.show()
word_count.printSchema()
# Saving the data to BigQuery
word_count.write.format(\"bigquery\") \\
.option(\"writeMethod\", \"direct\") \\
.save(\"wordcount_dataset.wordcount_output\")
""")
.sparkOptions(RoutineSparkOptionsArgs.builder()
.connection(testConnection.name())
.runtimeVersion("2.1")
.build())
.build());
}
}
resources:
test:
type: gcp:bigquery:Dataset
properties:
datasetId: dataset_id
testConnection:
type: gcp:bigquery:Connection
name: test
properties:
connectionId: connection_id
location: US
spark: {}
pyspark:
type: gcp:bigquery:Routine
properties:
datasetId: ${test.datasetId}
routineId: routine_id
routineType: PROCEDURE
language: PYTHON
definitionBody: "from pyspark.sql import SparkSession\n\nspark = SparkSession.builder.appName(\\\"spark-bigquery-demo\\\").getOrCreate()\n \n# Load data from BigQuery.\nwords = spark.read.format(\\\"bigquery\\\") \\\\\n .option(\\\"table\\\", \\\"bigquery-public-data:samples.shakespeare\\\") \\\\\n .load()\nwords.createOrReplaceTempView(\\\"words\\\")\n \n# Perform word count.\nword_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed(\\\"sum(word_count)\\\", \\\"sum_word_count\\\")\nword_count.show()\nword_count.printSchema()\n \n# Saving the data to BigQuery\nword_count.write.format(\\\"bigquery\\\") \\\\\n .option(\\\"writeMethod\\\", \\\"direct\\\") \\\\\n .save(\\\"wordcount_dataset.wordcount_output\\\")\n"
sparkOptions:
connection: ${testConnection.name}
runtimeVersion: '2.1'
The sparkOptions block enables Spark execution by referencing a BigQuery connection configured for Spark and specifying a runtimeVersion. The definitionBody contains PySpark code that reads from BigQuery, performs transformations, and writes results back. BigQuery manages the Spark cluster lifecycle automatically.
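The Spark connection runs as a Google-managed service account, which needs access to whatever the procedure reads and writes (here, the public Shakespeare table and the wordcount_dataset destination). Below is a hedged TypeScript sketch granting project-level BigQuery access; it assumes the connection's spark block exposes a serviceAccountId output, and the role and scope should be tightened for real workloads.
// A hedged sketch: grant the Spark connection's service account BigQuery
// access so the procedure can read and write tables. Project-wide role for
// brevity; scope to specific datasets in real setups. Assumes the spark block
// exposes a serviceAccountId output.
const sparkSa = testConnection.spark.apply(s => s?.serviceAccountId ?? "");
const sparkBqAccess = new gcp.projects.IAMMember("spark_bq_access", {
    project: test.project,
    role: "roles/bigquery.dataEditor",
    member: pulumi.interpolate`serviceAccount:${sparkSa}`,
});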
Reference external PySpark files from Cloud Storage
Production Spark jobs typically live in version-controlled files rather than inline code, requiring references to GCS-hosted scripts and dependencies.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
const testConnection = new gcp.bigquery.Connection("test", {
connectionId: "connection_id",
location: "US",
spark: {},
});
const pysparkMainfile = new gcp.bigquery.Routine("pyspark_mainfile", {
datasetId: test.datasetId,
routineId: "routine_id",
routineType: "PROCEDURE",
language: "PYTHON",
definitionBody: "",
sparkOptions: {
connection: testConnection.name,
runtimeVersion: "2.1",
mainFileUri: "gs://test-bucket/main.py",
pyFileUris: ["gs://test-bucket/lib.py"],
fileUris: ["gs://test-bucket/distribute_in_executor.json"],
archiveUris: ["gs://test-bucket/distribute_in_executor.tar.gz"],
},
});
import pulumi
import pulumi_gcp as gcp
test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
test_connection = gcp.bigquery.Connection("test",
connection_id="connection_id",
location="US",
spark={})
pyspark_mainfile = gcp.bigquery.Routine("pyspark_mainfile",
dataset_id=test.dataset_id,
routine_id="routine_id",
routine_type="PROCEDURE",
language="PYTHON",
definition_body="",
spark_options={
"connection": test_connection.name,
"runtime_version": "2.1",
"main_file_uri": "gs://test-bucket/main.py",
"py_file_uris": ["gs://test-bucket/lib.py"],
"file_uris": ["gs://test-bucket/distribute_in_executor.json"],
"archive_uris": ["gs://test-bucket/distribute_in_executor.tar.gz"],
})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
DatasetId: pulumi.String("dataset_id"),
})
if err != nil {
return err
}
testConnection, err := bigquery.NewConnection(ctx, "test", &bigquery.ConnectionArgs{
ConnectionId: pulumi.String("connection_id"),
Location: pulumi.String("US"),
Spark: &bigquery.ConnectionSparkArgs{},
})
if err != nil {
return err
}
_, err = bigquery.NewRoutine(ctx, "pyspark_mainfile", &bigquery.RoutineArgs{
DatasetId: test.DatasetId,
RoutineId: pulumi.String("routine_id"),
RoutineType: pulumi.String("PROCEDURE"),
Language: pulumi.String("PYTHON"),
DefinitionBody: pulumi.String(""),
SparkOptions: &bigquery.RoutineSparkOptionsArgs{
Connection: testConnection.Name,
RuntimeVersion: pulumi.String("2.1"),
MainFileUri: pulumi.String("gs://test-bucket/main.py"),
PyFileUris: pulumi.StringArray{
pulumi.String("gs://test-bucket/lib.py"),
},
FileUris: pulumi.StringArray{
pulumi.String("gs://test-bucket/distribute_in_executor.json"),
},
ArchiveUris: pulumi.StringArray{
pulumi.String("gs://test-bucket/distribute_in_executor.tar.gz"),
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var test = new Gcp.BigQuery.Dataset("test", new()
{
DatasetId = "dataset_id",
});
var testConnection = new Gcp.BigQuery.Connection("test", new()
{
ConnectionId = "connection_id",
Location = "US",
Spark = null,
});
var pysparkMainfile = new Gcp.BigQuery.Routine("pyspark_mainfile", new()
{
DatasetId = test.DatasetId,
RoutineId = "routine_id",
RoutineType = "PROCEDURE",
Language = "PYTHON",
DefinitionBody = "",
SparkOptions = new Gcp.BigQuery.Inputs.RoutineSparkOptionsArgs
{
Connection = testConnection.Name,
RuntimeVersion = "2.1",
MainFileUri = "gs://test-bucket/main.py",
PyFileUris = new[]
{
"gs://test-bucket/lib.py",
},
FileUris = new[]
{
"gs://test-bucket/distribute_in_executor.json",
},
ArchiveUris = new[]
{
"gs://test-bucket/distribute_in_executor.tar.gz",
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Connection;
import com.pulumi.gcp.bigquery.ConnectionArgs;
import com.pulumi.gcp.bigquery.inputs.ConnectionSparkArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineSparkOptionsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var test = new Dataset("test", DatasetArgs.builder()
.datasetId("dataset_id")
.build());
var testConnection = new Connection("testConnection", ConnectionArgs.builder()
.connectionId("connection_id")
.location("US")
.spark(ConnectionSparkArgs.builder()
.build())
.build());
var pysparkMainfile = new Routine("pysparkMainfile", RoutineArgs.builder()
.datasetId(test.datasetId())
.routineId("routine_id")
.routineType("PROCEDURE")
.language("PYTHON")
.definitionBody("")
.sparkOptions(RoutineSparkOptionsArgs.builder()
.connection(testConnection.name())
.runtimeVersion("2.1")
.mainFileUri("gs://test-bucket/main.py")
.pyFileUris("gs://test-bucket/lib.py")
.fileUris("gs://test-bucket/distribute_in_executor.json")
.archiveUris("gs://test-bucket/distribute_in_executor.tar.gz")
.build())
.build());
}
}
resources:
test:
type: gcp:bigquery:Dataset
properties:
datasetId: dataset_id
testConnection:
type: gcp:bigquery:Connection
name: test
properties:
connectionId: connection_id
location: US
spark: {}
pysparkMainfile:
type: gcp:bigquery:Routine
name: pyspark_mainfile
properties:
datasetId: ${test.datasetId}
routineId: routine_id
routineType: PROCEDURE
language: PYTHON
definitionBody: ""
sparkOptions:
connection: ${testConnection.name}
runtimeVersion: '2.1'
mainFileUri: gs://test-bucket/main.py
pyFileUris:
- gs://test-bucket/lib.py
fileUris:
- gs://test-bucket/distribute_in_executor.json
archiveUris:
- gs://test-bucket/distribute_in_executor.tar.gz
The mainFileUri property points to the primary Python script in Cloud Storage. The pyFileUris, fileUris, and archiveUris properties reference additional dependencies that Spark distributes to executors. When definitionBody is empty, BigQuery executes the main file directly.
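Instead of hard-coding gs:// paths, you can manage the bucket and script with Pulumi and feed the resulting URIs into sparkOptions; the same pattern applies to jarUris in the Scala example below. This is a minimal TypeScript sketch; the bucket, routine names, and local ./spark/main.py path are placeholders.
// A minimal sketch: upload the PySpark entry point with Pulumi and wire its
// URI into sparkOptions rather than hard-coding a gs:// path. Names and the
// local file path are placeholders.
const sparkBucket = new gcp.storage.Bucket("spark_assets", {
    location: "US",
    uniformBucketLevelAccess: true,
});
const mainPy = new gcp.storage.BucketObject("main_py", {
    bucket: sparkBucket.name,
    name: "main.py",
    source: new pulumi.asset.FileAsset("./spark/main.py"),
});
const pysparkFromBucket = new gcp.bigquery.Routine("pyspark_from_bucket", {
    datasetId: test.datasetId,
    routineId: "pyspark_from_bucket",
    routineType: "PROCEDURE",
    language: "PYTHON",
    definitionBody: "",
    sparkOptions: {
        connection: testConnection.name,
        runtimeVersion: "2.1",
        mainFileUri: pulumi.interpolate`gs://${sparkBucket.name}/${mainPy.name}`,
    },
});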
Execute Scala Spark jobs from JAR files
Teams with existing Scala Spark applications can run them in BigQuery by referencing compiled JARs and specifying the main class.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
const testConnection = new gcp.bigquery.Connection("test", {
connectionId: "connection_id",
location: "US",
spark: {},
});
const sparkJar = new gcp.bigquery.Routine("spark_jar", {
datasetId: test.datasetId,
routineId: "routine_id",
routineType: "PROCEDURE",
language: "SCALA",
definitionBody: "",
sparkOptions: {
connection: testConnection.name,
runtimeVersion: "2.1",
containerImage: "gcr.io/my-project-id/my-spark-image:latest",
mainClass: "com.google.test.jar.MainClass",
jarUris: ["gs://test-bucket/uberjar_spark_spark3.jar"],
properties: {
"spark.dataproc.scaling.version": "2",
"spark.reducer.fetchMigratedShuffle.enabled": "true",
},
},
});
import pulumi
import pulumi_gcp as gcp
test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
test_connection = gcp.bigquery.Connection("test",
connection_id="connection_id",
location="US",
spark={})
spark_jar = gcp.bigquery.Routine("spark_jar",
dataset_id=test.dataset_id,
routine_id="routine_id",
routine_type="PROCEDURE",
language="SCALA",
definition_body="",
spark_options={
"connection": test_connection.name,
"runtime_version": "2.1",
"container_image": "gcr.io/my-project-id/my-spark-image:latest",
"main_class": "com.google.test.jar.MainClass",
"jar_uris": ["gs://test-bucket/uberjar_spark_spark3.jar"],
"properties": {
"spark.dataproc.scaling.version": "2",
"spark.reducer.fetchMigratedShuffle.enabled": "true",
},
})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
DatasetId: pulumi.String("dataset_id"),
})
if err != nil {
return err
}
testConnection, err := bigquery.NewConnection(ctx, "test", &bigquery.ConnectionArgs{
ConnectionId: pulumi.String("connection_id"),
Location: pulumi.String("US"),
Spark: &bigquery.ConnectionSparkArgs{},
})
if err != nil {
return err
}
_, err = bigquery.NewRoutine(ctx, "spark_jar", &bigquery.RoutineArgs{
DatasetId: test.DatasetId,
RoutineId: pulumi.String("routine_id"),
RoutineType: pulumi.String("PROCEDURE"),
Language: pulumi.String("SCALA"),
DefinitionBody: pulumi.String(""),
SparkOptions: &bigquery.RoutineSparkOptionsArgs{
Connection: testConnection.Name,
RuntimeVersion: pulumi.String("2.1"),
ContainerImage: pulumi.String("gcr.io/my-project-id/my-spark-image:latest"),
MainClass: pulumi.String("com.google.test.jar.MainClass"),
JarUris: pulumi.StringArray{
pulumi.String("gs://test-bucket/uberjar_spark_spark3.jar"),
},
Properties: pulumi.StringMap{
"spark.dataproc.scaling.version": pulumi.String("2"),
"spark.reducer.fetchMigratedShuffle.enabled": pulumi.String("true"),
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var test = new Gcp.BigQuery.Dataset("test", new()
{
DatasetId = "dataset_id",
});
var testConnection = new Gcp.BigQuery.Connection("test", new()
{
ConnectionId = "connection_id",
Location = "US",
Spark = null,
});
var sparkJar = new Gcp.BigQuery.Routine("spark_jar", new()
{
DatasetId = test.DatasetId,
RoutineId = "routine_id",
RoutineType = "PROCEDURE",
Language = "SCALA",
DefinitionBody = "",
SparkOptions = new Gcp.BigQuery.Inputs.RoutineSparkOptionsArgs
{
Connection = testConnection.Name,
RuntimeVersion = "2.1",
ContainerImage = "gcr.io/my-project-id/my-spark-image:latest",
MainClass = "com.google.test.jar.MainClass",
JarUris = new[]
{
"gs://test-bucket/uberjar_spark_spark3.jar",
},
Properties =
{
{ "spark.dataproc.scaling.version", "2" },
{ "spark.reducer.fetchMigratedShuffle.enabled", "true" },
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Connection;
import com.pulumi.gcp.bigquery.ConnectionArgs;
import com.pulumi.gcp.bigquery.inputs.ConnectionSparkArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineSparkOptionsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var test = new Dataset("test", DatasetArgs.builder()
.datasetId("dataset_id")
.build());
var testConnection = new Connection("testConnection", ConnectionArgs.builder()
.connectionId("connection_id")
.location("US")
.spark(ConnectionSparkArgs.builder()
.build())
.build());
var sparkJar = new Routine("sparkJar", RoutineArgs.builder()
.datasetId(test.datasetId())
.routineId("routine_id")
.routineType("PROCEDURE")
.language("SCALA")
.definitionBody("")
.sparkOptions(RoutineSparkOptionsArgs.builder()
.connection(testConnection.name())
.runtimeVersion("2.1")
.containerImage("gcr.io/my-project-id/my-spark-image:latest")
.mainClass("com.google.test.jar.MainClass")
.jarUris("gs://test-bucket/uberjar_spark_spark3.jar")
.properties(Map.ofEntries(
Map.entry("spark.dataproc.scaling.version", "2"),
Map.entry("spark.reducer.fetchMigratedShuffle.enabled", "true")
))
.build())
.build());
}
}
resources:
test:
type: gcp:bigquery:Dataset
properties:
datasetId: dataset_id
testConnection:
type: gcp:bigquery:Connection
name: test
properties:
connectionId: connection_id
location: US
spark: {}
sparkJar:
type: gcp:bigquery:Routine
name: spark_jar
properties:
datasetId: ${test.datasetId}
routineId: routine_id
routineType: PROCEDURE
language: SCALA
definitionBody: ""
sparkOptions:
connection: ${testConnection.name}
runtimeVersion: '2.1'
containerImage: gcr.io/my-project-id/my-spark-image:latest
mainClass: com.google.test.jar.MainClass
jarUris:
- gs://test-bucket/uberjar_spark_spark3.jar
properties:
spark.dataproc.scaling.version: '2'
spark.reducer.fetchMigratedShuffle.enabled: 'true'
The language switches to SCALA, and the mainClass property identifies the entry point in your JAR. The jarUris property lists JARs to load. The containerImage property specifies a custom container for the Spark runtime. The properties map passes Spark configuration options.
Create custom data masking functions
Data governance policies often require custom masking logic beyond BigQuery’s built-in functions, such as format-preserving transformations.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const test = new gcp.bigquery.Dataset("test", {datasetId: "tf_test_dataset_id_81126"});
const customMaskingRoutine = new gcp.bigquery.Routine("custom_masking_routine", {
datasetId: test.datasetId,
routineId: "custom_masking_routine",
routineType: "SCALAR_FUNCTION",
language: "SQL",
dataGovernanceType: "DATA_MASKING",
definitionBody: "SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')",
arguments: [{
name: "ssn",
dataType: "{\"typeKind\" : \"STRING\"}",
}],
returnType: "{\"typeKind\" : \"STRING\"}",
});
import pulumi
import pulumi_gcp as gcp
test = gcp.bigquery.Dataset("test", dataset_id="tf_test_dataset_id_81126")
custom_masking_routine = gcp.bigquery.Routine("custom_masking_routine",
dataset_id=test.dataset_id,
routine_id="custom_masking_routine",
routine_type="SCALAR_FUNCTION",
language="SQL",
data_governance_type="DATA_MASKING",
definition_body="SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')",
arguments=[{
"name": "ssn",
"data_type": "{\"typeKind\" : \"STRING\"}",
}],
return_type="{\"typeKind\" : \"STRING\"}")
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
DatasetId: pulumi.String("tf_test_dataset_id_81126"),
})
if err != nil {
return err
}
_, err = bigquery.NewRoutine(ctx, "custom_masking_routine", &bigquery.RoutineArgs{
DatasetId: test.DatasetId,
RoutineId: pulumi.String("custom_masking_routine"),
RoutineType: pulumi.String("SCALAR_FUNCTION"),
Language: pulumi.String("SQL"),
DataGovernanceType: pulumi.String("DATA_MASKING"),
DefinitionBody: pulumi.String("SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')"),
Arguments: bigquery.RoutineArgumentArray{
&bigquery.RoutineArgumentArgs{
Name: pulumi.String("ssn"),
DataType: pulumi.String("{\"typeKind\" : \"STRING\"}"),
},
},
ReturnType: pulumi.String("{\"typeKind\" : \"STRING\"}"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var test = new Gcp.BigQuery.Dataset("test", new()
{
DatasetId = "tf_test_dataset_id_81126",
});
var customMaskingRoutine = new Gcp.BigQuery.Routine("custom_masking_routine", new()
{
DatasetId = test.DatasetId,
RoutineId = "custom_masking_routine",
RoutineType = "SCALAR_FUNCTION",
Language = "SQL",
DataGovernanceType = "DATA_MASKING",
DefinitionBody = "SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')",
Arguments = new[]
{
new Gcp.BigQuery.Inputs.RoutineArgumentArgs
{
Name = "ssn",
DataType = "{\"typeKind\" : \"STRING\"}",
},
},
ReturnType = "{\"typeKind\" : \"STRING\"}",
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineArgumentArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var test = new Dataset("test", DatasetArgs.builder()
.datasetId("tf_test_dataset_id_81126")
.build());
var customMaskingRoutine = new Routine("customMaskingRoutine", RoutineArgs.builder()
.datasetId(test.datasetId())
.routineId("custom_masking_routine")
.routineType("SCALAR_FUNCTION")
.language("SQL")
.dataGovernanceType("DATA_MASKING")
.definitionBody("SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')")
.arguments(RoutineArgumentArgs.builder()
.name("ssn")
.dataType("{\"typeKind\" : \"STRING\"}")
.build())
.returnType("{\"typeKind\" : \"STRING\"}")
.build());
}
}
resources:
test:
type: gcp:bigquery:Dataset
properties:
datasetId: tf_test_dataset_id_81126
customMaskingRoutine:
type: gcp:bigquery:Routine
name: custom_masking_routine
properties:
datasetId: ${test.datasetId}
routineId: custom_masking_routine
routineType: SCALAR_FUNCTION
language: SQL
dataGovernanceType: DATA_MASKING
definitionBody: SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')
arguments:
- name: ssn
dataType: '{"typeKind" : "STRING"}'
returnType: '{"typeKind" : "STRING"}'
Setting dataGovernanceType to DATA_MASKING registers the function for use in column-level security policies. The definitionBody implements the masking logic using SQL functions. BigQuery validates that masking functions meet security requirements before allowing them in policies.
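To apply the function, attach it to a data policy on a policy tag. Below is a hedged TypeScript sketch, assuming an existing Data Catalog policy tag (the policyTag value is a placeholder) and a provider version whose gcp.bigquerydatapolicy.DataPolicy resource accepts a custom routine in dataMaskingPolicy.
// A hedged sketch: back a data-masking policy with the custom routine.
// The policy tag resource name is a placeholder for an existing policy tag.
const maskingPolicy = new gcp.bigquerydatapolicy.DataPolicy("ssn_masking_policy", {
    location: "us",
    dataPolicyId: "ssn_masking_policy",
    policyTag: "projects/my-project/locations/us/taxonomies/123/policyTags/456",
    dataPolicyType: "DATA_MASKING_POLICY",
    dataMaskingPolicy: {
        routine: customMaskingRoutine.id, // full routine resource ID
    },
});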
Call external APIs from BigQuery queries
Queries sometimes need to enrich data by calling external services, such as geocoding APIs or machine learning endpoints hosted outside BigQuery.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
const testConnection = new gcp.bigquery.Connection("test", {
connectionId: "connection_id",
location: "US",
cloudResource: {},
});
const remoteFunction = new gcp.bigquery.Routine("remote_function", {
datasetId: test.datasetId,
routineId: "routine_id",
routineType: "SCALAR_FUNCTION",
definitionBody: "",
returnType: "{\"typeKind\" : \"STRING\"}",
remoteFunctionOptions: {
endpoint: "https://us-east1-my_gcf_project.cloudfunctions.net/remote_add",
connection: testConnection.name,
maxBatchingRows: "10",
userDefinedContext: {
z: "1.5",
},
},
});
import pulumi
import pulumi_gcp as gcp
test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
test_connection = gcp.bigquery.Connection("test",
connection_id="connection_id",
location="US",
cloud_resource={})
remote_function = gcp.bigquery.Routine("remote_function",
dataset_id=test.dataset_id,
routine_id="routine_id",
routine_type="SCALAR_FUNCTION",
definition_body="",
return_type="{\"typeKind\" : \"STRING\"}",
remote_function_options={
"endpoint": "https://us-east1-my_gcf_project.cloudfunctions.net/remote_add",
"connection": test_connection.name,
"max_batching_rows": "10",
"user_defined_context": {
"z": "1.5",
},
})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
DatasetId: pulumi.String("dataset_id"),
})
if err != nil {
return err
}
testConnection, err := bigquery.NewConnection(ctx, "test", &bigquery.ConnectionArgs{
ConnectionId: pulumi.String("connection_id"),
Location: pulumi.String("US"),
CloudResource: &bigquery.ConnectionCloudResourceArgs{},
})
if err != nil {
return err
}
_, err = bigquery.NewRoutine(ctx, "remote_function", &bigquery.RoutineArgs{
DatasetId: test.DatasetId,
RoutineId: pulumi.String("routine_id"),
RoutineType: pulumi.String("SCALAR_FUNCTION"),
DefinitionBody: pulumi.String(""),
ReturnType: pulumi.String("{\"typeKind\" : \"STRING\"}"),
RemoteFunctionOptions: &bigquery.RoutineRemoteFunctionOptionsArgs{
Endpoint: pulumi.String("https://us-east1-my_gcf_project.cloudfunctions.net/remote_add"),
Connection: testConnection.Name,
MaxBatchingRows: pulumi.String("10"),
UserDefinedContext: pulumi.StringMap{
"z": pulumi.String("1.5"),
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var test = new Gcp.BigQuery.Dataset("test", new()
{
DatasetId = "dataset_id",
});
var testConnection = new Gcp.BigQuery.Connection("test", new()
{
ConnectionId = "connection_id",
Location = "US",
CloudResource = null,
});
var remoteFunction = new Gcp.BigQuery.Routine("remote_function", new()
{
DatasetId = test.DatasetId,
RoutineId = "routine_id",
RoutineType = "SCALAR_FUNCTION",
DefinitionBody = "",
ReturnType = "{\"typeKind\" : \"STRING\"}",
RemoteFunctionOptions = new Gcp.BigQuery.Inputs.RoutineRemoteFunctionOptionsArgs
{
Endpoint = "https://us-east1-my_gcf_project.cloudfunctions.net/remote_add",
Connection = testConnection.Name,
MaxBatchingRows = "10",
UserDefinedContext =
{
{ "z", "1.5" },
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Connection;
import com.pulumi.gcp.bigquery.ConnectionArgs;
import com.pulumi.gcp.bigquery.inputs.ConnectionCloudResourceArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineRemoteFunctionOptionsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var test = new Dataset("test", DatasetArgs.builder()
.datasetId("dataset_id")
.build());
var testConnection = new Connection("testConnection", ConnectionArgs.builder()
.connectionId("connection_id")
.location("US")
.cloudResource(ConnectionCloudResourceArgs.builder()
.build())
.build());
var remoteFunction = new Routine("remoteFunction", RoutineArgs.builder()
.datasetId(test.datasetId())
.routineId("routine_id")
.routineType("SCALAR_FUNCTION")
.definitionBody("")
.returnType("{\"typeKind\" : \"STRING\"}")
.remoteFunctionOptions(RoutineRemoteFunctionOptionsArgs.builder()
.endpoint("https://us-east1-my_gcf_project.cloudfunctions.net/remote_add")
.connection(testConnection.name())
.maxBatchingRows("10")
.userDefinedContext(Map.of("z", "1.5"))
.build())
.build());
}
}
resources:
test:
type: gcp:bigquery:Dataset
properties:
datasetId: dataset_id
testConnection:
type: gcp:bigquery:Connection
name: test
properties:
connectionId: connection_id
location: US
cloudResource: {}
remoteFunction:
type: gcp:bigquery:Routine
name: remote_function
properties:
datasetId: ${test.datasetId}
routineId: routine_id
routineType: SCALAR_FUNCTION
definitionBody: ""
returnType: '{"typeKind" : "STRING"}'
remoteFunctionOptions:
endpoint: https://us-east1-my_gcf_project.cloudfunctions.net/remote_add
connection: ${testConnection.name}
maxBatchingRows: '10'
userDefinedContext:
z: '1.5'
The remoteFunctionOptions block configures external HTTP calls. The endpoint property specifies the URL to invoke. The connection property references a BigQuery Cloud Resource connection that handles authentication. The maxBatchingRows property controls how many rows BigQuery sends per request. The userDefinedContext map passes static parameters to the endpoint.
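BigQuery calls the endpoint as the connection's service account, so that account must be allowed to invoke the target. Below is a hedged TypeScript sketch for a 1st-gen Cloud Function named remote_add in us-east1 (project, region, and function name are placeholders; 2nd-gen functions need Cloud Run invoker permissions instead).
// A hedged sketch: let the Cloud Resource connection's service account invoke
// the Cloud Function behind the remote function. Project, region, and function
// name are placeholders for existing resources.
const connectionSa = testConnection.cloudResource.apply(cr => cr?.serviceAccountId ?? "");
const allowInvoke = new gcp.cloudfunctions.FunctionIamMember("allow_invoke", {
    project: "my_gcf_project",
    region: "us-east1",
    cloudFunction: "remote_add",
    role: "roles/cloudfunctions.invoker",
    member: pulumi.interpolate`serviceAccount:${connectionSa}`,
});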
Beyond these examples
These snippets focus on specific routine-level features: SQL, JavaScript, Python, and Scala routine languages, Spark integration (inline code, external files, JAR execution), and remote function calls and data masking. They’re intentionally minimal rather than full data processing pipelines.
The examples may reference pre-existing infrastructure such as BigQuery datasets, BigQuery connections (Spark, Cloud Resource), Cloud Storage files (PySpark scripts, JARs, dependencies), Cloud Functions or external HTTP endpoints, and container images in GCR. They focus on configuring the routine rather than provisioning everything around it.
To keep things focused, common routine patterns are omitted, including:
- Determinism levels for JavaScript UDFs (determinismLevel)
- Imported JavaScript libraries (importedLibraries)
- Security modes (DEFINER vs INVOKER)
- External runtime options for Python UDFs (containerMemory, containerCpu)
These omissions are intentional: the goal is to illustrate how each routine feature is wired, not provide drop-in data processing modules. See the BigQuery Routine resource reference for all available configuration options.
Frequently Asked Questions
Routine Types & Languages
A routine's routineType can be SCALAR_FUNCTION (returns a single value), PROCEDURE (performs actions), or TABLE_VALUED_FUNCTION (returns a table). Supported languages are SQL, JAVASCRIPT, PYTHON, JAVA, and SCALA.
Data Types & Return Values
Types are expressed as JSON strings in the dataType and returnType fields, such as {"typeKind": "FLOAT64"} or {"typeKind": "STRING"}. Because returnType expects a JSON string, any formatting change creates a diff even when the content is semantically identical; the API may return different formatting (for example, switched field order or STRUCT versus RECORD), so use the schema as returned by the API to avoid recurring diffs. returnType is optional when the language is SQL (it is inferred from definitionBody) but required for all other languages.
Spark & External Runtimes
To run Spark workloads, set language to PYTHON, JAVA, or SCALA and configure sparkOptions with a connection (a BigQuery Connection resource) and a runtimeVersion; code can be provided inline via definitionBody or referenced through mainFileUri. To call external services, set routineType to SCALAR_FUNCTION and configure remoteFunctionOptions with an endpoint (a Cloud Function URL) and a connection (a BigQuery Connection resource). externalRuntimeOptions configures the runtime for Python UDFs, including containerMemory, containerCpu, and runtimeVersion (for example, python-3.11).
Security & Data Governance
DEFINER runs the routine with the creator's permissions, while INVOKER runs it with the caller's permissions; if securityMode is not specified, it is determined automatically from the routine's configuration. To create a masking function, set dataGovernanceType to DATA_MASKING with routineType set to SCALAR_FUNCTION; the function is validated and made available as a masking function.
Immutability & Updates
The datasetId, routineId, project, and routineType properties are immutable; changing any of them forces resource replacement.