The gcp:bigquery/routine:Routine resource, part of the Pulumi GCP provider, defines user-defined functions and stored procedures within BigQuery datasets. This guide focuses on four capabilities: SQL and JavaScript scalar functions, table-valued functions, PySpark and Scala Spark procedures, and remote function and data masking integration.
Routines belong to datasets and may reference BigQuery Connections for Spark or remote functions, Cloud Storage files, or external endpoints. The examples are intentionally small. Combine them with your own datasets, connections, and infrastructure.
Create a SQL stored procedure
Data teams often need to encapsulate multi-step SQL logic into reusable procedures that can be called from queries or scheduled jobs.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
const sproc = new gcp.bigquery.Routine("sproc", {
datasetId: test.datasetId,
routineId: "routine_id",
routineType: "PROCEDURE",
language: "SQL",
securityMode: "INVOKER",
definitionBody: "CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);",
});
import pulumi
import pulumi_gcp as gcp
test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
sproc = gcp.bigquery.Routine("sproc",
dataset_id=test.dataset_id,
routine_id="routine_id",
routine_type="PROCEDURE",
language="SQL",
security_mode="INVOKER",
definition_body="CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);")
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
// Create the dataset that will contain the routine.
test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
DatasetId: pulumi.String("dataset_id"),
})
if err != nil {
return err
}
// Register a SQL stored procedure. SecurityMode "INVOKER" makes the
// procedure execute with the caller's permissions rather than the definer's.
_, err = bigquery.NewRoutine(ctx, "sproc", &bigquery.RoutineArgs{
DatasetId: test.DatasetId,
RoutineId: pulumi.String("routine_id"),
RoutineType: pulumi.String("PROCEDURE"),
Language: pulumi.String("SQL"),
SecurityMode: pulumi.String("INVOKER"),
DefinitionBody: pulumi.String("CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var test = new Gcp.BigQuery.Dataset("test", new()
{
DatasetId = "dataset_id",
});
var sproc = new Gcp.BigQuery.Routine("sproc", new()
{
DatasetId = test.DatasetId,
RoutineId = "routine_id",
RoutineType = "PROCEDURE",
Language = "SQL",
SecurityMode = "INVOKER",
DefinitionBody = "CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);",
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var test = new Dataset("test", DatasetArgs.builder()
.datasetId("dataset_id")
.build());
var sproc = new Routine("sproc", RoutineArgs.builder()
.datasetId(test.datasetId())
.routineId("routine_id")
.routineType("PROCEDURE")
.language("SQL")
.securityMode("INVOKER")
.definitionBody("CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);")
.build());
}
}
resources:
test:
type: gcp:bigquery:Dataset
properties:
datasetId: dataset_id
sproc:
type: gcp:bigquery:Routine
properties:
datasetId: ${test.datasetId}
routineId: routine_id
routineType: PROCEDURE
language: SQL
securityMode: INVOKER
definitionBody: CREATE FUNCTION Add(x FLOAT64, y FLOAT64) RETURNS FLOAT64 AS (x + y);
The routineType property set to PROCEDURE defines this as a stored procedure rather than a function. The definitionBody contains the SQL logic to execute. The securityMode property controls whether the routine runs with the caller’s permissions (INVOKER) or the definer’s permissions (DEFINER).
Define a JavaScript scalar function with typed arguments
When SQL expressions aren’t sufficient, JavaScript functions provide custom logic with explicit type contracts for inputs and outputs.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
const sproc = new gcp.bigquery.Routine("sproc", {
datasetId: test.datasetId,
routineId: "routine_id",
routineType: "SCALAR_FUNCTION",
language: "JAVASCRIPT",
definitionBody: "CREATE FUNCTION multiplyInputs return x*y;",
arguments: [
{
name: "x",
dataType: "{\"typeKind\" : \"FLOAT64\"}",
},
{
name: "y",
dataType: "{\"typeKind\" : \"FLOAT64\"}",
},
],
returnType: "{\"typeKind\" : \"FLOAT64\"}",
});
import pulumi
import pulumi_gcp as gcp
test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
sproc = gcp.bigquery.Routine("sproc",
dataset_id=test.dataset_id,
routine_id="routine_id",
routine_type="SCALAR_FUNCTION",
language="JAVASCRIPT",
definition_body="CREATE FUNCTION multiplyInputs return x*y;",
arguments=[
{
"name": "x",
"data_type": "{\"typeKind\" : \"FLOAT64\"}",
},
{
"name": "y",
"data_type": "{\"typeKind\" : \"FLOAT64\"}",
},
],
return_type="{\"typeKind\" : \"FLOAT64\"}")
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
// Create the dataset that will contain the routine.
test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
DatasetId: pulumi.String("dataset_id"),
})
if err != nil {
return err
}
// Define a JavaScript scalar (single-value) function. Each argument's
// DataType, and the ReturnType, are JSON-serialized type descriptors.
_, err = bigquery.NewRoutine(ctx, "sproc", &bigquery.RoutineArgs{
DatasetId: test.DatasetId,
RoutineId: pulumi.String("routine_id"),
RoutineType: pulumi.String("SCALAR_FUNCTION"),
Language: pulumi.String("JAVASCRIPT"),
DefinitionBody: pulumi.String("CREATE FUNCTION multiplyInputs return x*y;"),
Arguments: bigquery.RoutineArgumentArray{
&bigquery.RoutineArgumentArgs{
Name: pulumi.String("x"),
DataType: pulumi.String("{\"typeKind\" : \"FLOAT64\"}"),
},
&bigquery.RoutineArgumentArgs{
Name: pulumi.String("y"),
DataType: pulumi.String("{\"typeKind\" : \"FLOAT64\"}"),
},
},
ReturnType: pulumi.String("{\"typeKind\" : \"FLOAT64\"}"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var test = new Gcp.BigQuery.Dataset("test", new()
{
DatasetId = "dataset_id",
});
var sproc = new Gcp.BigQuery.Routine("sproc", new()
{
DatasetId = test.DatasetId,
RoutineId = "routine_id",
RoutineType = "SCALAR_FUNCTION",
Language = "JAVASCRIPT",
DefinitionBody = "CREATE FUNCTION multiplyInputs return x*y;",
Arguments = new[]
{
new Gcp.BigQuery.Inputs.RoutineArgumentArgs
{
Name = "x",
DataType = "{\"typeKind\" : \"FLOAT64\"}",
},
new Gcp.BigQuery.Inputs.RoutineArgumentArgs
{
Name = "y",
DataType = "{\"typeKind\" : \"FLOAT64\"}",
},
},
ReturnType = "{\"typeKind\" : \"FLOAT64\"}",
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineArgumentArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var test = new Dataset("test", DatasetArgs.builder()
.datasetId("dataset_id")
.build());
var sproc = new Routine("sproc", RoutineArgs.builder()
.datasetId(test.datasetId())
.routineId("routine_id")
.routineType("SCALAR_FUNCTION")
.language("JAVASCRIPT")
.definitionBody("CREATE FUNCTION multiplyInputs return x*y;")
.arguments(
RoutineArgumentArgs.builder()
.name("x")
.dataType("{\"typeKind\" : \"FLOAT64\"}")
.build(),
RoutineArgumentArgs.builder()
.name("y")
.dataType("{\"typeKind\" : \"FLOAT64\"}")
.build())
.returnType("{\"typeKind\" : \"FLOAT64\"}")
.build());
}
}
resources:
test:
type: gcp:bigquery:Dataset
properties:
datasetId: dataset_id
sproc:
type: gcp:bigquery:Routine
properties:
datasetId: ${test.datasetId}
routineId: routine_id
routineType: SCALAR_FUNCTION
language: JAVASCRIPT
definitionBody: CREATE FUNCTION multiplyInputs return x*y;
arguments:
- name: x
dataType: '{"typeKind" : "FLOAT64"}'
- name: y
dataType: '{"typeKind" : "FLOAT64"}'
returnType: '{"typeKind" : "FLOAT64"}'
The routineType SCALAR_FUNCTION returns a single value. The arguments array defines input parameters, each with a dataType expressed as a JSON-serialized type descriptor (the API's StandardSqlDataType format, e.g. {"typeKind": "FLOAT64"}). The returnType property defines the output type in the same JSON format. JavaScript functions execute in BigQuery’s JavaScript runtime.
Return table results from a function
Table-valued functions transform input parameters into result sets, enabling reusable query logic that returns multiple rows and columns.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
const sproc = new gcp.bigquery.Routine("sproc", {
datasetId: test.datasetId,
routineId: "routine_id",
routineType: "TABLE_VALUED_FUNCTION",
language: "SQL",
definitionBody: "SELECT 1 + value AS value\n",
arguments: [{
name: "value",
argumentKind: "FIXED_TYPE",
dataType: JSON.stringify({
typeKind: "INT64",
}),
}],
returnTableType: JSON.stringify({
columns: [{
name: "value",
type: {
typeKind: "INT64",
},
}],
}),
});
import pulumi
import json
import pulumi_gcp as gcp
test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
sproc = gcp.bigquery.Routine("sproc",
dataset_id=test.dataset_id,
routine_id="routine_id",
routine_type="TABLE_VALUED_FUNCTION",
language="SQL",
definition_body="SELECT 1 + value AS value\n",
arguments=[{
"name": "value",
"argument_kind": "FIXED_TYPE",
"data_type": json.dumps({
"typeKind": "INT64",
}),
}],
return_table_type=json.dumps({
"columns": [{
"name": "value",
"type": {
"typeKind": "INT64",
},
}],
}))
package main
import (
"encoding/json"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
// Create the dataset that will contain the routine.
test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
DatasetId: pulumi.String("dataset_id"),
})
if err != nil {
return err
}
// JSON type descriptor for the INT64 input argument.
tmpJSON0, err := json.Marshal(map[string]interface{}{
"typeKind": "INT64",
})
if err != nil {
return err
}
json0 := string(tmpJSON0)
// JSON schema for the table the function returns: a single INT64
// column named "value".
tmpJSON1, err := json.Marshal(map[string]interface{}{
"columns": []map[string]interface{}{
map[string]interface{}{
"name": "value",
"type": map[string]interface{}{
"typeKind": "INT64",
},
},
},
})
if err != nil {
return err
}
json1 := string(tmpJSON1)
// Define a table-valued function: it returns rows (per ReturnTableType)
// rather than a scalar. ArgumentKind FIXED_TYPE pins the argument's type.
_, err = bigquery.NewRoutine(ctx, "sproc", &bigquery.RoutineArgs{
DatasetId: test.DatasetId,
RoutineId: pulumi.String("routine_id"),
RoutineType: pulumi.String("TABLE_VALUED_FUNCTION"),
Language: pulumi.String("SQL"),
DefinitionBody: pulumi.String("SELECT 1 + value AS value\n"),
Arguments: bigquery.RoutineArgumentArray{
&bigquery.RoutineArgumentArgs{
Name: pulumi.String("value"),
ArgumentKind: pulumi.String("FIXED_TYPE"),
DataType: pulumi.String(json0),
},
},
ReturnTableType: pulumi.String(json1),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var test = new Gcp.BigQuery.Dataset("test", new()
{
DatasetId = "dataset_id",
});
var sproc = new Gcp.BigQuery.Routine("sproc", new()
{
DatasetId = test.DatasetId,
RoutineId = "routine_id",
RoutineType = "TABLE_VALUED_FUNCTION",
Language = "SQL",
DefinitionBody = @"SELECT 1 + value AS value
",
Arguments = new[]
{
new Gcp.BigQuery.Inputs.RoutineArgumentArgs
{
Name = "value",
ArgumentKind = "FIXED_TYPE",
DataType = JsonSerializer.Serialize(new Dictionary<string, object?>
{
["typeKind"] = "INT64",
}),
},
},
ReturnTableType = JsonSerializer.Serialize(new Dictionary<string, object?>
{
["columns"] = new[]
{
new Dictionary<string, object?>
{
["name"] = "value",
["type"] = new Dictionary<string, object?>
{
["typeKind"] = "INT64",
},
},
},
}),
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineArgumentArgs;
import static com.pulumi.codegen.internal.Serialization.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var test = new Dataset("test", DatasetArgs.builder()
.datasetId("dataset_id")
.build());
var sproc = new Routine("sproc", RoutineArgs.builder()
.datasetId(test.datasetId())
.routineId("routine_id")
.routineType("TABLE_VALUED_FUNCTION")
.language("SQL")
.definitionBody("""
SELECT 1 + value AS value
""")
.arguments(RoutineArgumentArgs.builder()
.name("value")
.argumentKind("FIXED_TYPE")
.dataType(serializeJson(
jsonObject(
jsonProperty("typeKind", "INT64")
)))
.build())
.returnTableType(serializeJson(
jsonObject(
jsonProperty("columns", jsonArray(jsonObject(
jsonProperty("name", "value"),
jsonProperty("type", jsonObject(
jsonProperty("typeKind", "INT64")
))
)))
)))
.build());
}
}
resources:
test:
type: gcp:bigquery:Dataset
properties:
datasetId: dataset_id
sproc:
type: gcp:bigquery:Routine
properties:
datasetId: ${test.datasetId}
routineId: routine_id
routineType: TABLE_VALUED_FUNCTION
language: SQL
definitionBody: |
SELECT 1 + value AS value
arguments:
- name: value
argumentKind: FIXED_TYPE
dataType:
fn::toJSON:
typeKind: INT64
returnTableType:
fn::toJSON:
columns:
- name: value
type:
typeKind: INT64
The TABLE_VALUED_FUNCTION routine type returns a table rather than a scalar value. The returnTableType property defines the output schema as a JSON structure with column names and types. The argumentKind property specifies whether an argument has a fixed, declared type (FIXED_TYPE, as shown here) or accepts any type (ANY_TYPE).
Run PySpark jobs with inline code
Large-scale data transformations often require Spark’s distributed processing, which BigQuery can orchestrate through PySpark procedures.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
const testConnection = new gcp.bigquery.Connection("test", {
connectionId: "connection_id",
location: "US",
spark: {},
});
const pyspark = new gcp.bigquery.Routine("pyspark", {
datasetId: test.datasetId,
routineId: "routine_id",
routineType: "PROCEDURE",
language: "PYTHON",
definitionBody: `from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(\\"spark-bigquery-demo\\").getOrCreate()
# Load data from BigQuery.
words = spark.read.format(\\"bigquery\\") \\\\
.option(\\"table\\", \\"bigquery-public-data:samples.shakespeare\\") \\\\
.load()
words.createOrReplaceTempView(\\"words\\")
# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed(\\"sum(word_count)\\", \\"sum_word_count\\")
word_count.show()
word_count.printSchema()
# Saving the data to BigQuery
word_count.write.format(\\"bigquery\\") \\\\
.option(\\"writeMethod\\", \\"direct\\") \\\\
.save(\\"wordcount_dataset.wordcount_output\\")
`,
sparkOptions: {
connection: testConnection.name,
runtimeVersion: "2.1",
},
});
import pulumi
import pulumi_gcp as gcp
test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
test_connection = gcp.bigquery.Connection("test",
connection_id="connection_id",
location="US",
spark={})
pyspark = gcp.bigquery.Routine("pyspark",
dataset_id=test.dataset_id,
routine_id="routine_id",
routine_type="PROCEDURE",
language="PYTHON",
definition_body="""from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(\"spark-bigquery-demo\").getOrCreate()
# Load data from BigQuery.
words = spark.read.format(\"bigquery\") \\
.option(\"table\", \"bigquery-public-data:samples.shakespeare\") \\
.load()
words.createOrReplaceTempView(\"words\")
# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed(\"sum(word_count)\", \"sum_word_count\")
word_count.show()
word_count.printSchema()
# Saving the data to BigQuery
word_count.write.format(\"bigquery\") \\
.option(\"writeMethod\", \"direct\") \\
.save(\"wordcount_dataset.wordcount_output\")
""",
spark_options={
"connection": test_connection.name,
"runtime_version": "2.1",
})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
// Create the dataset that will contain the routine.
test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
DatasetId: pulumi.String("dataset_id"),
})
if err != nil {
return err
}
// Spark-enabled BigQuery connection the procedure will execute through.
testConnection, err := bigquery.NewConnection(ctx, "test", &bigquery.ConnectionArgs{
ConnectionId: pulumi.String("connection_id"),
Location: pulumi.String("US"),
Spark: &bigquery.ConnectionSparkArgs{},
})
if err != nil {
return err
}
// PySpark stored procedure with the code supplied inline in
// DefinitionBody; SparkOptions binds it to the connection and runtime.
_, err = bigquery.NewRoutine(ctx, "pyspark", &bigquery.RoutineArgs{
DatasetId: test.DatasetId,
RoutineId: pulumi.String("routine_id"),
RoutineType: pulumi.String("PROCEDURE"),
Language: pulumi.String("PYTHON"),
DefinitionBody: pulumi.String(`from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(\"spark-bigquery-demo\").getOrCreate()
# Load data from BigQuery.
words = spark.read.format(\"bigquery\") \\
.option(\"table\", \"bigquery-public-data:samples.shakespeare\") \\
.load()
words.createOrReplaceTempView(\"words\")
# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed(\"sum(word_count)\", \"sum_word_count\")
word_count.show()
word_count.printSchema()
# Saving the data to BigQuery
word_count.write.format(\"bigquery\") \\
.option(\"writeMethod\", \"direct\") \\
.save(\"wordcount_dataset.wordcount_output\")
`),
SparkOptions: &bigquery.RoutineSparkOptionsArgs{
Connection: testConnection.Name,
RuntimeVersion: pulumi.String("2.1"),
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var test = new Gcp.BigQuery.Dataset("test", new()
{
DatasetId = "dataset_id",
});
var testConnection = new Gcp.BigQuery.Connection("test", new()
{
ConnectionId = "connection_id",
Location = "US",
Spark = null,
});
var pyspark = new Gcp.BigQuery.Routine("pyspark", new()
{
DatasetId = test.DatasetId,
RoutineId = "routine_id",
RoutineType = "PROCEDURE",
Language = "PYTHON",
DefinitionBody = @"from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(\""spark-bigquery-demo\"").getOrCreate()
# Load data from BigQuery.
words = spark.read.format(\""bigquery\"") \\
.option(\""table\"", \""bigquery-public-data:samples.shakespeare\"") \\
.load()
words.createOrReplaceTempView(\""words\"")
# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed(\""sum(word_count)\"", \""sum_word_count\"")
word_count.show()
word_count.printSchema()
# Saving the data to BigQuery
word_count.write.format(\""bigquery\"") \\
.option(\""writeMethod\"", \""direct\"") \\
.save(\""wordcount_dataset.wordcount_output\"")
",
SparkOptions = new Gcp.BigQuery.Inputs.RoutineSparkOptionsArgs
{
Connection = testConnection.Name,
RuntimeVersion = "2.1",
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Connection;
import com.pulumi.gcp.bigquery.ConnectionArgs;
import com.pulumi.gcp.bigquery.inputs.ConnectionSparkArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineSparkOptionsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var test = new Dataset("test", DatasetArgs.builder()
.datasetId("dataset_id")
.build());
var testConnection = new Connection("testConnection", ConnectionArgs.builder()
.connectionId("connection_id")
.location("US")
.spark(ConnectionSparkArgs.builder()
.build())
.build());
var pyspark = new Routine("pyspark", RoutineArgs.builder()
.datasetId(test.datasetId())
.routineId("routine_id")
.routineType("PROCEDURE")
.language("PYTHON")
.definitionBody("""
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(\"spark-bigquery-demo\").getOrCreate()
# Load data from BigQuery.
words = spark.read.format(\"bigquery\") \\
.option(\"table\", \"bigquery-public-data:samples.shakespeare\") \\
.load()
words.createOrReplaceTempView(\"words\")
# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed(\"sum(word_count)\", \"sum_word_count\")
word_count.show()
word_count.printSchema()
# Saving the data to BigQuery
word_count.write.format(\"bigquery\") \\
.option(\"writeMethod\", \"direct\") \\
.save(\"wordcount_dataset.wordcount_output\")
""")
.sparkOptions(RoutineSparkOptionsArgs.builder()
.connection(testConnection.name())
.runtimeVersion("2.1")
.build())
.build());
}
}
resources:
test:
type: gcp:bigquery:Dataset
properties:
datasetId: dataset_id
testConnection:
type: gcp:bigquery:Connection
name: test
properties:
connectionId: connection_id
location: US
spark: {}
pyspark:
type: gcp:bigquery:Routine
properties:
datasetId: ${test.datasetId}
routineId: routine_id
routineType: PROCEDURE
language: PYTHON
definitionBody: "from pyspark.sql import SparkSession\n\nspark = SparkSession.builder.appName(\\\"spark-bigquery-demo\\\").getOrCreate()\n \n# Load data from BigQuery.\nwords = spark.read.format(\\\"bigquery\\\") \\\\\n .option(\\\"table\\\", \\\"bigquery-public-data:samples.shakespeare\\\") \\\\\n .load()\nwords.createOrReplaceTempView(\\\"words\\\")\n \n# Perform word count.\nword_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed(\\\"sum(word_count)\\\", \\\"sum_word_count\\\")\nword_count.show()\nword_count.printSchema()\n \n# Saving the data to BigQuery\nword_count.write.format(\\\"bigquery\\\") \\\\\n .option(\\\"writeMethod\\\", \\\"direct\\\") \\\\\n .save(\\\"wordcount_dataset.wordcount_output\\\")\n"
sparkOptions:
connection: ${testConnection.name}
runtimeVersion: '2.1'
The language property set to PYTHON with sparkOptions enables PySpark execution. The connection property references a BigQuery Connection configured for Spark. The runtimeVersion specifies the Spark runtime. The definitionBody contains the PySpark code that reads from BigQuery, processes data, and writes results back.
Reference external PySpark files from Cloud Storage
Complex Spark jobs often span multiple files and dependencies stored in Cloud Storage rather than inline code.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
const testConnection = new gcp.bigquery.Connection("test", {
connectionId: "connection_id",
location: "US",
spark: {},
});
const pysparkMainfile = new gcp.bigquery.Routine("pyspark_mainfile", {
datasetId: test.datasetId,
routineId: "routine_id",
routineType: "PROCEDURE",
language: "PYTHON",
definitionBody: "",
sparkOptions: {
connection: testConnection.name,
runtimeVersion: "2.1",
mainFileUri: "gs://test-bucket/main.py",
pyFileUris: ["gs://test-bucket/lib.py"],
fileUris: ["gs://test-bucket/distribute_in_executor.json"],
archiveUris: ["gs://test-bucket/distribute_in_executor.tar.gz"],
},
});
import pulumi
import pulumi_gcp as gcp
test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
test_connection = gcp.bigquery.Connection("test",
connection_id="connection_id",
location="US",
spark={})
pyspark_mainfile = gcp.bigquery.Routine("pyspark_mainfile",
dataset_id=test.dataset_id,
routine_id="routine_id",
routine_type="PROCEDURE",
language="PYTHON",
definition_body="",
spark_options={
"connection": test_connection.name,
"runtime_version": "2.1",
"main_file_uri": "gs://test-bucket/main.py",
"py_file_uris": ["gs://test-bucket/lib.py"],
"file_uris": ["gs://test-bucket/distribute_in_executor.json"],
"archive_uris": ["gs://test-bucket/distribute_in_executor.tar.gz"],
})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
// Create the dataset that will contain the routine.
test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
DatasetId: pulumi.String("dataset_id"),
})
if err != nil {
return err
}
// Spark-enabled BigQuery connection the procedure will execute through.
testConnection, err := bigquery.NewConnection(ctx, "test", &bigquery.ConnectionArgs{
ConnectionId: pulumi.String("connection_id"),
Location: pulumi.String("US"),
Spark: &bigquery.ConnectionSparkArgs{},
})
if err != nil {
return err
}
// PySpark procedure whose code lives in Cloud Storage: DefinitionBody is
// empty and MainFileUri points at the entry-point script. PyFileUris,
// FileUris, and ArchiveUris distribute extra modules, data files, and
// archives to the Spark executors.
_, err = bigquery.NewRoutine(ctx, "pyspark_mainfile", &bigquery.RoutineArgs{
DatasetId: test.DatasetId,
RoutineId: pulumi.String("routine_id"),
RoutineType: pulumi.String("PROCEDURE"),
Language: pulumi.String("PYTHON"),
DefinitionBody: pulumi.String(""),
SparkOptions: &bigquery.RoutineSparkOptionsArgs{
Connection: testConnection.Name,
RuntimeVersion: pulumi.String("2.1"),
MainFileUri: pulumi.String("gs://test-bucket/main.py"),
PyFileUris: pulumi.StringArray{
pulumi.String("gs://test-bucket/lib.py"),
},
FileUris: pulumi.StringArray{
pulumi.String("gs://test-bucket/distribute_in_executor.json"),
},
ArchiveUris: pulumi.StringArray{
pulumi.String("gs://test-bucket/distribute_in_executor.tar.gz"),
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var test = new Gcp.BigQuery.Dataset("test", new()
{
DatasetId = "dataset_id",
});
var testConnection = new Gcp.BigQuery.Connection("test", new()
{
ConnectionId = "connection_id",
Location = "US",
Spark = null,
});
var pysparkMainfile = new Gcp.BigQuery.Routine("pyspark_mainfile", new()
{
DatasetId = test.DatasetId,
RoutineId = "routine_id",
RoutineType = "PROCEDURE",
Language = "PYTHON",
DefinitionBody = "",
SparkOptions = new Gcp.BigQuery.Inputs.RoutineSparkOptionsArgs
{
Connection = testConnection.Name,
RuntimeVersion = "2.1",
MainFileUri = "gs://test-bucket/main.py",
PyFileUris = new[]
{
"gs://test-bucket/lib.py",
},
FileUris = new[]
{
"gs://test-bucket/distribute_in_executor.json",
},
ArchiveUris = new[]
{
"gs://test-bucket/distribute_in_executor.tar.gz",
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Connection;
import com.pulumi.gcp.bigquery.ConnectionArgs;
import com.pulumi.gcp.bigquery.inputs.ConnectionSparkArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineSparkOptionsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var test = new Dataset("test", DatasetArgs.builder()
.datasetId("dataset_id")
.build());
var testConnection = new Connection("testConnection", ConnectionArgs.builder()
.connectionId("connection_id")
.location("US")
.spark(ConnectionSparkArgs.builder()
.build())
.build());
var pysparkMainfile = new Routine("pysparkMainfile", RoutineArgs.builder()
.datasetId(test.datasetId())
.routineId("routine_id")
.routineType("PROCEDURE")
.language("PYTHON")
.definitionBody("")
.sparkOptions(RoutineSparkOptionsArgs.builder()
.connection(testConnection.name())
.runtimeVersion("2.1")
.mainFileUri("gs://test-bucket/main.py")
.pyFileUris("gs://test-bucket/lib.py")
.fileUris("gs://test-bucket/distribute_in_executor.json")
.archiveUris("gs://test-bucket/distribute_in_executor.tar.gz")
.build())
.build());
}
}
resources:
test:
type: gcp:bigquery:Dataset
properties:
datasetId: dataset_id
testConnection:
type: gcp:bigquery:Connection
name: test
properties:
connectionId: connection_id
location: US
spark: {}
pysparkMainfile:
type: gcp:bigquery:Routine
name: pyspark_mainfile
properties:
datasetId: ${test.datasetId}
routineId: routine_id
routineType: PROCEDURE
language: PYTHON
definitionBody: ""
sparkOptions:
connection: ${testConnection.name}
runtimeVersion: '2.1'
mainFileUri: gs://test-bucket/main.py
pyFileUris:
- gs://test-bucket/lib.py
fileUris:
- gs://test-bucket/distribute_in_executor.json
archiveUris:
- gs://test-bucket/distribute_in_executor.tar.gz
The mainFileUri property points to the primary Python file in Cloud Storage. The pyFileUris, fileUris, and archiveUris properties reference additional Python modules, data files, and compressed archives. This approach separates code from infrastructure configuration and enables code reuse across routines.
Execute Scala Spark jobs from JAR files
JVM-based Spark applications written in Scala or Java run from compiled JARs with custom container images and runtime properties.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
const testConnection = new gcp.bigquery.Connection("test", {
connectionId: "connection_id",
location: "US",
spark: {},
});
const sparkJar = new gcp.bigquery.Routine("spark_jar", {
datasetId: test.datasetId,
routineId: "routine_id",
routineType: "PROCEDURE",
language: "SCALA",
definitionBody: "",
sparkOptions: {
connection: testConnection.name,
runtimeVersion: "2.1",
containerImage: "gcr.io/my-project-id/my-spark-image:latest",
mainClass: "com.google.test.jar.MainClass",
jarUris: ["gs://test-bucket/uberjar_spark_spark3.jar"],
properties: {
"spark.dataproc.scaling.version": "2",
"spark.reducer.fetchMigratedShuffle.enabled": "true",
},
},
});
import pulumi
import pulumi_gcp as gcp
test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
test_connection = gcp.bigquery.Connection("test",
connection_id="connection_id",
location="US",
spark={})
spark_jar = gcp.bigquery.Routine("spark_jar",
dataset_id=test.dataset_id,
routine_id="routine_id",
routine_type="PROCEDURE",
language="SCALA",
definition_body="",
spark_options={
"connection": test_connection.name,
"runtime_version": "2.1",
"container_image": "gcr.io/my-project-id/my-spark-image:latest",
"main_class": "com.google.test.jar.MainClass",
"jar_uris": ["gs://test-bucket/uberjar_spark_spark3.jar"],
"properties": {
"spark.dataproc.scaling.version": "2",
"spark.reducer.fetchMigratedShuffle.enabled": "true",
},
})
// Scala Spark procedure: the routine's code ships in a compiled JAR in Cloud Storage.
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
// Dataset that will contain the routine.
test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
DatasetId: pulumi.String("dataset_id"),
})
if err != nil {
return err
}
// BigQuery connection with an empty Spark block, enabling Spark workloads.
testConnection, err := bigquery.NewConnection(ctx, "test", &bigquery.ConnectionArgs{
ConnectionId: pulumi.String("connection_id"),
Location: pulumi.String("US"),
Spark: &bigquery.ConnectionSparkArgs{},
})
if err != nil {
return err
}
_, err = bigquery.NewRoutine(ctx, "spark_jar", &bigquery.RoutineArgs{
DatasetId: test.DatasetId,
RoutineId: pulumi.String("routine_id"),
RoutineType: pulumi.String("PROCEDURE"),
Language: pulumi.String("SCALA"),
// Empty body: the implementation lives in the JAR referenced below.
DefinitionBody: pulumi.String(""),
SparkOptions: &bigquery.RoutineSparkOptionsArgs{
Connection: testConnection.Name,
RuntimeVersion: pulumi.String("2.1"),
// Custom container image supplying runtime dependencies.
ContainerImage: pulumi.String("gcr.io/my-project-id/my-spark-image:latest"),
// Entry point class inside the JAR.
MainClass: pulumi.String("com.google.test.jar.MainClass"),
JarUris: pulumi.StringArray{
pulumi.String("gs://test-bucket/uberjar_spark_spark3.jar"),
},
// Spark configuration passed through to the runtime.
Properties: pulumi.StringMap{
"spark.dataproc.scaling.version": pulumi.String("2"),
"spark.reducer.fetchMigratedShuffle.enabled": pulumi.String("true"),
},
},
})
if err != nil {
return err
}
return nil
})
}
// Scala Spark procedure: the routine's code ships in a compiled JAR in Cloud Storage.
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
// Dataset that will contain the routine.
var test = new Gcp.BigQuery.Dataset("test", new()
{
DatasetId = "dataset_id",
});
// BigQuery connection; Spark = null emits an empty spark block, enabling Spark workloads.
var testConnection = new Gcp.BigQuery.Connection("test", new()
{
ConnectionId = "connection_id",
Location = "US",
Spark = null,
});
var sparkJar = new Gcp.BigQuery.Routine("spark_jar", new()
{
DatasetId = test.DatasetId,
RoutineId = "routine_id",
RoutineType = "PROCEDURE",
Language = "SCALA",
// Empty body: the implementation lives in the JAR referenced below.
DefinitionBody = "",
SparkOptions = new Gcp.BigQuery.Inputs.RoutineSparkOptionsArgs
{
Connection = testConnection.Name,
RuntimeVersion = "2.1",
// Custom container image supplying runtime dependencies.
ContainerImage = "gcr.io/my-project-id/my-spark-image:latest",
// Entry point class inside the JAR.
MainClass = "com.google.test.jar.MainClass",
JarUris = new[]
{
"gs://test-bucket/uberjar_spark_spark3.jar",
},
// Spark configuration passed through to the runtime.
Properties =
{
{ "spark.dataproc.scaling.version", "2" },
{ "spark.reducer.fetchMigratedShuffle.enabled", "true" },
},
},
});
});
// Scala Spark procedure: the routine's code ships in a compiled JAR in Cloud Storage.
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Connection;
import com.pulumi.gcp.bigquery.ConnectionArgs;
import com.pulumi.gcp.bigquery.inputs.ConnectionSparkArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineSparkOptionsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
// Dataset that will contain the routine.
var test = new Dataset("test", DatasetArgs.builder()
.datasetId("dataset_id")
.build());
// BigQuery connection with an empty spark block, enabling Spark workloads.
var testConnection = new Connection("testConnection", ConnectionArgs.builder()
.connectionId("connection_id")
.location("US")
.spark(ConnectionSparkArgs.builder()
.build())
.build());
var sparkJar = new Routine("sparkJar", RoutineArgs.builder()
.datasetId(test.datasetId())
.routineId("routine_id")
.routineType("PROCEDURE")
.language("SCALA")
// Empty body: the implementation lives in the JAR referenced below.
.definitionBody("")
.sparkOptions(RoutineSparkOptionsArgs.builder()
.connection(testConnection.name())
.runtimeVersion("2.1")
// Custom container image supplying runtime dependencies.
.containerImage("gcr.io/my-project-id/my-spark-image:latest")
// Entry point class inside the JAR.
.mainClass("com.google.test.jar.MainClass")
.jarUris("gs://test-bucket/uberjar_spark_spark3.jar")
// Spark configuration passed through to the runtime.
.properties(Map.ofEntries(
Map.entry("spark.dataproc.scaling.version", "2"),
Map.entry("spark.reducer.fetchMigratedShuffle.enabled", "true")
))
.build())
.build());
}
}
# Scala Spark procedure: the routine's code ships in a compiled JAR in Cloud Storage.
resources:
# Dataset that will contain the routine.
test:
type: gcp:bigquery:Dataset
properties:
datasetId: dataset_id
# BigQuery connection with an empty spark block, enabling Spark workloads.
testConnection:
type: gcp:bigquery:Connection
name: test
properties:
connectionId: connection_id
location: US
spark: {}
sparkJar:
type: gcp:bigquery:Routine
name: spark_jar
properties:
datasetId: ${test.datasetId}
routineId: routine_id
routineType: PROCEDURE
language: SCALA
# Empty body: the implementation lives in the JAR referenced below.
definitionBody: ""
sparkOptions:
connection: ${testConnection.name}
runtimeVersion: '2.1'
# Custom container image supplying runtime dependencies.
containerImage: gcr.io/my-project-id/my-spark-image:latest
# Entry point class inside the JAR.
mainClass: com.google.test.jar.MainClass
jarUris:
- gs://test-bucket/uberjar_spark_spark3.jar
# Spark configuration passed through to the runtime.
properties:
spark.dataproc.scaling.version: '2'
spark.reducer.fetchMigratedShuffle.enabled: 'true'
The language SCALA with jarUris points to compiled JAR files in Cloud Storage. The mainClass property specifies the entry point. The containerImage property references a custom container with required dependencies. The properties map passes Spark configuration options to the runtime.
Create custom data masking functions
Organizations implementing column-level security need custom masking logic that BigQuery can apply automatically to sensitive fields.
// Custom data masking function: a SQL scalar function registered for column-level security.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const test = new gcp.bigquery.Dataset("test", {datasetId: "tf_test_dataset_id_22811"});
const customMaskingRoutine = new gcp.bigquery.Routine("custom_masking_routine", {
datasetId: test.datasetId,
routineId: "custom_masking_routine",
routineType: "SCALAR_FUNCTION",
language: "SQL",
// DATA_MASKING registers this function for use in data masking policies.
dataGovernanceType: "DATA_MASKING",
// Masking logic: replaces every digit in the input with 'X'.
definitionBody: "SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')",
arguments: [{
name: "ssn",
// Argument and return types are JSON-encoded BigQuery type descriptors.
dataType: "{\"typeKind\" : \"STRING\"}",
}],
returnType: "{\"typeKind\" : \"STRING\"}",
});
# Custom data masking function: a SQL scalar function registered for column-level security.
import pulumi
import pulumi_gcp as gcp
test = gcp.bigquery.Dataset("test", dataset_id="tf_test_dataset_id_22811")
custom_masking_routine = gcp.bigquery.Routine("custom_masking_routine",
dataset_id=test.dataset_id,
routine_id="custom_masking_routine",
routine_type="SCALAR_FUNCTION",
language="SQL",
# DATA_MASKING registers this function for use in data masking policies.
data_governance_type="DATA_MASKING",
# Masking logic: replaces every digit in the input with 'X'.
definition_body="SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')",
arguments=[{
"name": "ssn",
# Argument and return types are JSON-encoded BigQuery type descriptors.
"data_type": "{\"typeKind\" : \"STRING\"}",
}],
return_type="{\"typeKind\" : \"STRING\"}")
// Custom data masking function: a SQL scalar function registered for column-level security.
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
DatasetId: pulumi.String("tf_test_dataset_id_22811"),
})
if err != nil {
return err
}
_, err = bigquery.NewRoutine(ctx, "custom_masking_routine", &bigquery.RoutineArgs{
DatasetId: test.DatasetId,
RoutineId: pulumi.String("custom_masking_routine"),
RoutineType: pulumi.String("SCALAR_FUNCTION"),
Language: pulumi.String("SQL"),
// DATA_MASKING registers this function for use in data masking policies.
DataGovernanceType: pulumi.String("DATA_MASKING"),
// Masking logic: replaces every digit in the input with 'X'.
DefinitionBody: pulumi.String("SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')"),
Arguments: bigquery.RoutineArgumentArray{
&bigquery.RoutineArgumentArgs{
Name: pulumi.String("ssn"),
// Argument and return types are JSON-encoded BigQuery type descriptors.
DataType: pulumi.String("{\"typeKind\" : \"STRING\"}"),
},
},
ReturnType: pulumi.String("{\"typeKind\" : \"STRING\"}"),
})
if err != nil {
return err
}
return nil
})
}
// Custom data masking function: a SQL scalar function registered for column-level security.
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var test = new Gcp.BigQuery.Dataset("test", new()
{
DatasetId = "tf_test_dataset_id_22811",
});
var customMaskingRoutine = new Gcp.BigQuery.Routine("custom_masking_routine", new()
{
DatasetId = test.DatasetId,
RoutineId = "custom_masking_routine",
RoutineType = "SCALAR_FUNCTION",
Language = "SQL",
// DATA_MASKING registers this function for use in data masking policies.
DataGovernanceType = "DATA_MASKING",
// Masking logic: replaces every digit in the input with 'X'.
DefinitionBody = "SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')",
Arguments = new[]
{
new Gcp.BigQuery.Inputs.RoutineArgumentArgs
{
Name = "ssn",
// Argument and return types are JSON-encoded BigQuery type descriptors.
DataType = "{\"typeKind\" : \"STRING\"}",
},
},
ReturnType = "{\"typeKind\" : \"STRING\"}",
});
});
// Custom data masking function: a SQL scalar function registered for column-level security.
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineArgumentArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var test = new Dataset("test", DatasetArgs.builder()
.datasetId("tf_test_dataset_id_22811")
.build());
var customMaskingRoutine = new Routine("customMaskingRoutine", RoutineArgs.builder()
.datasetId(test.datasetId())
.routineId("custom_masking_routine")
.routineType("SCALAR_FUNCTION")
.language("SQL")
// DATA_MASKING registers this function for use in data masking policies.
.dataGovernanceType("DATA_MASKING")
// Masking logic: replaces every digit in the input with 'X'.
.definitionBody("SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')")
.arguments(RoutineArgumentArgs.builder()
.name("ssn")
// Argument and return types are JSON-encoded BigQuery type descriptors.
.dataType("{\"typeKind\" : \"STRING\"}")
.build())
.returnType("{\"typeKind\" : \"STRING\"}")
.build());
}
}
# Custom data masking function: a SQL scalar function registered for column-level security.
resources:
test:
type: gcp:bigquery:Dataset
properties:
datasetId: tf_test_dataset_id_22811
customMaskingRoutine:
type: gcp:bigquery:Routine
name: custom_masking_routine
properties:
datasetId: ${test.datasetId}
routineId: custom_masking_routine
routineType: SCALAR_FUNCTION
language: SQL
# DATA_MASKING registers this function for use in data masking policies.
dataGovernanceType: DATA_MASKING
# Masking logic: replaces every digit in the input with 'X'.
definitionBody: SAFE.REGEXP_REPLACE(ssn, '[0-9]', 'X')
arguments:
- name: ssn
# Argument and return types are JSON-encoded BigQuery type descriptors.
dataType: '{"typeKind" : "STRING"}'
returnType: '{"typeKind" : "STRING"}'
The dataGovernanceType property set to DATA_MASKING registers this function for use in column-level security policies. BigQuery validates the function and makes it available for masking sensitive data. The definitionBody implements the masking logic, here using a regex to replace digits with 'X'.
Call external APIs from BigQuery queries
Queries sometimes need to enrich data by calling external services like Cloud Functions or external APIs during query execution.
// Remote function: BigQuery calls an external HTTP endpoint during query execution.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const test = new gcp.bigquery.Dataset("test", {datasetId: "dataset_id"});
// Cloud-resource connection used to authenticate calls to the endpoint.
const testConnection = new gcp.bigquery.Connection("test", {
connectionId: "connection_id",
location: "US",
cloudResource: {},
});
const remoteFunction = new gcp.bigquery.Routine("remote_function", {
datasetId: test.datasetId,
routineId: "routine_id",
routineType: "SCALAR_FUNCTION",
// Empty body: the implementation runs at the remote endpoint.
definitionBody: "",
returnType: "{\"typeKind\" : \"STRING\"}",
remoteFunctionOptions: {
// Cloud Function (or other HTTP) endpoint invoked per batch of rows.
endpoint: "https://us-east1-my_gcf_project.cloudfunctions.net/remote_add",
connection: testConnection.name,
// Upper bound on rows BigQuery sends per request.
maxBatchingRows: "10",
// Arbitrary key/value pairs forwarded to the endpoint with each request.
userDefinedContext: {
z: "1.5",
},
},
});
# Remote function: BigQuery calls an external HTTP endpoint during query execution.
import pulumi
import pulumi_gcp as gcp
test = gcp.bigquery.Dataset("test", dataset_id="dataset_id")
# Cloud-resource connection used to authenticate calls to the endpoint.
test_connection = gcp.bigquery.Connection("test",
connection_id="connection_id",
location="US",
cloud_resource={})
remote_function = gcp.bigquery.Routine("remote_function",
dataset_id=test.dataset_id,
routine_id="routine_id",
routine_type="SCALAR_FUNCTION",
# Empty body: the implementation runs at the remote endpoint.
definition_body="",
return_type="{\"typeKind\" : \"STRING\"}",
remote_function_options={
# Cloud Function (or other HTTP) endpoint invoked per batch of rows.
"endpoint": "https://us-east1-my_gcf_project.cloudfunctions.net/remote_add",
"connection": test_connection.name,
# Upper bound on rows BigQuery sends per request.
"max_batching_rows": "10",
# Arbitrary key/value pairs forwarded to the endpoint with each request.
"user_defined_context": {
"z": "1.5",
},
})
// Remote function: BigQuery calls an external HTTP endpoint during query execution.
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/bigquery"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
test, err := bigquery.NewDataset(ctx, "test", &bigquery.DatasetArgs{
DatasetId: pulumi.String("dataset_id"),
})
if err != nil {
return err
}
// Cloud-resource connection used to authenticate calls to the endpoint.
testConnection, err := bigquery.NewConnection(ctx, "test", &bigquery.ConnectionArgs{
ConnectionId: pulumi.String("connection_id"),
Location: pulumi.String("US"),
CloudResource: &bigquery.ConnectionCloudResourceArgs{},
})
if err != nil {
return err
}
_, err = bigquery.NewRoutine(ctx, "remote_function", &bigquery.RoutineArgs{
DatasetId: test.DatasetId,
RoutineId: pulumi.String("routine_id"),
RoutineType: pulumi.String("SCALAR_FUNCTION"),
// Empty body: the implementation runs at the remote endpoint.
DefinitionBody: pulumi.String(""),
ReturnType: pulumi.String("{\"typeKind\" : \"STRING\"}"),
RemoteFunctionOptions: &bigquery.RoutineRemoteFunctionOptionsArgs{
// Cloud Function (or other HTTP) endpoint invoked per batch of rows.
Endpoint: pulumi.String("https://us-east1-my_gcf_project.cloudfunctions.net/remote_add"),
Connection: testConnection.Name,
// Upper bound on rows BigQuery sends per request.
MaxBatchingRows: pulumi.String("10"),
// Arbitrary key/value pairs forwarded to the endpoint with each request.
UserDefinedContext: pulumi.StringMap{
"z": pulumi.String("1.5"),
},
},
})
if err != nil {
return err
}
return nil
})
}
// Remote function: BigQuery calls an external HTTP endpoint during query execution.
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var test = new Gcp.BigQuery.Dataset("test", new()
{
DatasetId = "dataset_id",
});
// Cloud-resource connection used to authenticate calls to the endpoint.
var testConnection = new Gcp.BigQuery.Connection("test", new()
{
ConnectionId = "connection_id",
Location = "US",
CloudResource = null,
});
var remoteFunction = new Gcp.BigQuery.Routine("remote_function", new()
{
DatasetId = test.DatasetId,
RoutineId = "routine_id",
RoutineType = "SCALAR_FUNCTION",
// Empty body: the implementation runs at the remote endpoint.
DefinitionBody = "",
ReturnType = "{\"typeKind\" : \"STRING\"}",
RemoteFunctionOptions = new Gcp.BigQuery.Inputs.RoutineRemoteFunctionOptionsArgs
{
// Cloud Function (or other HTTP) endpoint invoked per batch of rows.
Endpoint = "https://us-east1-my_gcf_project.cloudfunctions.net/remote_add",
Connection = testConnection.Name,
// Upper bound on rows BigQuery sends per request.
MaxBatchingRows = "10",
// Arbitrary key/value pairs forwarded to the endpoint with each request.
UserDefinedContext =
{
{ "z", "1.5" },
},
},
});
});
// Remote function: BigQuery calls an external HTTP endpoint during query execution.
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.bigquery.Dataset;
import com.pulumi.gcp.bigquery.DatasetArgs;
import com.pulumi.gcp.bigquery.Connection;
import com.pulumi.gcp.bigquery.ConnectionArgs;
import com.pulumi.gcp.bigquery.inputs.ConnectionCloudResourceArgs;
import com.pulumi.gcp.bigquery.Routine;
import com.pulumi.gcp.bigquery.RoutineArgs;
import com.pulumi.gcp.bigquery.inputs.RoutineRemoteFunctionOptionsArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var test = new Dataset("test", DatasetArgs.builder()
.datasetId("dataset_id")
.build());
// Cloud-resource connection used to authenticate calls to the endpoint.
var testConnection = new Connection("testConnection", ConnectionArgs.builder()
.connectionId("connection_id")
.location("US")
.cloudResource(ConnectionCloudResourceArgs.builder()
.build())
.build());
var remoteFunction = new Routine("remoteFunction", RoutineArgs.builder()
.datasetId(test.datasetId())
.routineId("routine_id")
.routineType("SCALAR_FUNCTION")
// Empty body: the implementation runs at the remote endpoint.
.definitionBody("")
.returnType("{\"typeKind\" : \"STRING\"}")
.remoteFunctionOptions(RoutineRemoteFunctionOptionsArgs.builder()
// Cloud Function (or other HTTP) endpoint invoked per batch of rows.
.endpoint("https://us-east1-my_gcf_project.cloudfunctions.net/remote_add")
.connection(testConnection.name())
// Upper bound on rows BigQuery sends per request.
.maxBatchingRows("10")
// Arbitrary key/value pairs forwarded to the endpoint with each request.
.userDefinedContext(Map.of("z", "1.5"))
.build())
.build());
}
}
# Remote function: BigQuery calls an external HTTP endpoint during query execution.
resources:
test:
type: gcp:bigquery:Dataset
properties:
datasetId: dataset_id
# Cloud-resource connection used to authenticate calls to the endpoint.
testConnection:
type: gcp:bigquery:Connection
name: test
properties:
connectionId: connection_id
location: US
cloudResource: {}
remoteFunction:
type: gcp:bigquery:Routine
name: remote_function
properties:
datasetId: ${test.datasetId}
routineId: routine_id
routineType: SCALAR_FUNCTION
# Empty body: the implementation runs at the remote endpoint.
definitionBody: ""
returnType: '{"typeKind" : "STRING"}'
remoteFunctionOptions:
# Cloud Function (or other HTTP) endpoint invoked per batch of rows.
endpoint: https://us-east1-my_gcf_project.cloudfunctions.net/remote_add
connection: ${testConnection.name}
# Upper bound on rows BigQuery sends per request.
maxBatchingRows: '10'
# Arbitrary key/value pairs forwarded to the endpoint with each request.
userDefinedContext:
z: '1.5'
The remoteFunctionOptions property configures external service integration. The endpoint property points to a Cloud Function or API endpoint. The connection property references a BigQuery Connection for Cloud Resource. The maxBatchingRows property controls how many rows BigQuery sends per request. The userDefinedContext property passes custom parameters to the endpoint.
Beyond these examples
These snippets focus on specific routine-level features: SQL, JavaScript, and Python function types, PySpark and Scala Spark procedures, and remote function integration and data masking. They’re intentionally minimal rather than full data processing pipelines.
The examples may reference pre-existing infrastructure such as BigQuery datasets, BigQuery Connections (for Spark and remote functions), Cloud Storage buckets (for Spark files), and container images and Cloud Functions (for Spark and remote functions). They focus on configuring the routine rather than provisioning everything around it.
To keep things focused, common routine patterns are omitted, including:
- Security mode configuration (DEFINER vs INVOKER)
- Determinism level for JavaScript functions
- Imported JavaScript libraries
- External runtime options for Python UDFs
These omissions are intentional: the goal is to illustrate how each routine feature is wired, not provide drop-in data processing modules. See the BigQuery Routine resource reference for all available configuration options.
Let's create GCP BigQuery Routines
Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.
Try Pulumi Cloud for FREE
Frequently Asked Questions
Common Issues & Limitations
The returnType field expects a JSON string, so any changes to the string create diffs even if the JSON structure is unchanged. The API may return different JSON formatting (e.g., switched order of values or STRUCT vs RECORD field types). Use the schema as returned by the API to avoid recurring diffs. The datasetId, routineId, project, and routineType properties are immutable; changing any of these requires recreating the routine.
Routine Types & Configuration
You can create three types:
- SCALAR_FUNCTION - Returns a single value
- PROCEDURE - Performs actions without returning a value
- TABLE_VALUED_FUNCTION - Returns a table
returnType is optional if language is SQL (inferred from definitionBody at query time), but required for all other languages (JAVASCRIPT, PYTHON, JAVA, SCALA). definitionBody can be an empty string for remote functions or Spark routines that use mainFileUri to specify the code location. securityMode determines whose permissions are used when executing the routine; if not specified, BigQuery automatically determines the security mode from the routine's configuration.
Advanced Features
For PySpark procedures, set language to PYTHON, routineType to PROCEDURE, and configure sparkOptions with a connection (to a BigQuery Connection resource) and runtimeVersion. You can either provide code in definitionBody or use mainFileUri to reference a GCS file. For JVM Spark procedures, set language to SCALA or JAVA, routineType to PROCEDURE, and configure sparkOptions with mainClass and jarUris pointing to your compiled JAR files in GCS. For remote functions, set routineType to SCALAR_FUNCTION, leave definitionBody empty, and configure remoteFunctionOptions with an endpoint (Cloud Function URL) and connection (BigQuery Connection resource). For custom data masking, set dataGovernanceType to DATA_MASKING, routineType to SCALAR_FUNCTION, and provide the masking logic in definitionBody; the function will be validated and made available as a masking function. The returnTableType property can only be set when routineType is TABLE_VALUED_FUNCTION: if absent, the return table type is inferred from definitionBody at query time; if present, the result columns are cast to match the specified types.
Using a different cloud?
Explore analytics guides for other cloud providers: