The gcp:eventarc/pipeline:Pipeline resource, part of the Pulumi GCP provider, defines an Eventarc pipeline that routes events from sources to destinations with optional transformation and authentication. This guide focuses on three capabilities: routing to Pub/Sub, HTTP endpoints, and Workflows; OIDC and OAuth authentication; and payload transformation and format conversion.
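Pipelines deliver to Pub/Sub topics, Workflows, and HTTP endpoints, and can reference network attachments and service accounts. The Pub/Sub and Workflows examples below create their destination resource inline; the network attachment and service account named in the HTTP examples are referenced by name and must already exist in your project. The examples are intentionally small. Combine them with your own event sources, authentication configuration, and transformation logic.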
Route events to a Pub/Sub topic
Event-driven architectures often begin by routing events to Pub/Sub topics, where multiple subscribers can process messages independently.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const topic = new gcp.pubsub.Topic("topic", {name: "some-topic"});
const primary = new gcp.eventarc.Pipeline("primary", {
location: "us-central1",
pipelineId: "some-pipeline",
destinations: [{
topic: topic.id,
}],
labels: {
test_label: "test-eventarc-label",
},
annotations: {
test_annotation: "test-eventarc-annotation",
},
displayName: "Testing Pipeline",
});
import pulumi
import pulumi_gcp as gcp
topic = gcp.pubsub.Topic("topic", name="some-topic")
primary = gcp.eventarc.Pipeline("primary",
location="us-central1",
pipeline_id="some-pipeline",
destinations=[{
"topic": topic.id,
}],
labels={
"test_label": "test-eventarc-label",
},
annotations={
"test_annotation": "test-eventarc-annotation",
},
display_name="Testing Pipeline")
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/eventarc"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main provisions a Pub/Sub topic and an Eventarc pipeline whose only
// destination is that topic.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// The topic is created first so the pipeline can reference its ID.
		destTopic, err := pubsub.NewTopic(ctx, "topic", &pubsub.TopicArgs{
			Name: pulumi.String("some-topic"),
		})
		if err != nil {
			return err
		}

		pipelineArgs := &eventarc.PipelineArgs{
			Location:   pulumi.String("us-central1"),
			PipelineId: pulumi.String("some-pipeline"),
			Destinations: eventarc.PipelineDestinationArray{
				&eventarc.PipelineDestinationArgs{
					Topic: destTopic.ID(),
				},
			},
			Labels: pulumi.StringMap{
				"test_label": pulumi.String("test-eventarc-label"),
			},
			Annotations: pulumi.StringMap{
				"test_annotation": pulumi.String("test-eventarc-annotation"),
			},
			DisplayName: pulumi.String("Testing Pipeline"),
		}

		// The pipeline resource itself is not needed afterwards, only the error.
		if _, err := eventarc.NewPipeline(ctx, "primary", pipelineArgs); err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var topic = new Gcp.PubSub.Topic("topic", new()
{
Name = "some-topic",
});
var primary = new Gcp.Eventarc.Pipeline("primary", new()
{
Location = "us-central1",
PipelineId = "some-pipeline",
Destinations = new[]
{
new Gcp.Eventarc.Inputs.PipelineDestinationArgs
{
Topic = topic.Id,
},
},
Labels =
{
{ "test_label", "test-eventarc-label" },
},
Annotations =
{
{ "test_annotation", "test-eventarc-annotation" },
},
DisplayName = "Testing Pipeline",
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.eventarc.Pipeline;
import com.pulumi.gcp.eventarc.PipelineArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var topic = new Topic("topic", TopicArgs.builder()
.name("some-topic")
.build());
var primary = new Pipeline("primary", PipelineArgs.builder()
.location("us-central1")
.pipelineId("some-pipeline")
.destinations(PipelineDestinationArgs.builder()
.topic(topic.id())
.build())
.labels(Map.of("test_label", "test-eventarc-label"))
.annotations(Map.of("test_annotation", "test-eventarc-annotation"))
.displayName("Testing Pipeline")
.build());
}
}
resources:
topic:
type: gcp:pubsub:Topic
properties:
name: some-topic
primary:
type: gcp:eventarc:Pipeline
properties:
location: us-central1
pipelineId: some-pipeline
destinations:
- topic: ${topic.id}
labels:
test_label: test-eventarc-label
annotations:
test_annotation: test-eventarc-annotation
displayName: Testing Pipeline
The destinations property defines where events are delivered. Setting the topic field routes events to a Pub/Sub topic, enabling fan-out processing. The location and pipelineId properties identify the pipeline within your GCP project.
Deliver events to an HTTP endpoint
Applications that expose HTTP APIs can receive events directly through pipeline delivery.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const primary = new gcp.eventarc.Pipeline("primary", {
location: "us-central1",
pipelineId: "some-pipeline",
destinations: [{
httpEndpoint: {
uri: "https://10.77.0.0:80/route",
},
networkConfig: {
networkAttachment: "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
},
}],
});
import pulumi
import pulumi_gcp as gcp
primary = gcp.eventarc.Pipeline("primary",
location="us-central1",
pipeline_id="some-pipeline",
destinations=[{
"http_endpoint": {
"uri": "https://10.77.0.0:80/route",
},
"network_config": {
"network_attachment": "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
},
}])
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/eventarc"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// main declares an Eventarc pipeline that delivers events to an HTTP endpoint
// reached through a network attachment.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		// Single destination: the endpoint URI plus the network attachment
		// used to reach it.
		httpDestination := &eventarc.PipelineDestinationArgs{
			HttpEndpoint: &eventarc.PipelineDestinationHttpEndpointArgs{
				Uri: pulumi.String("https://10.77.0.0:80/route"),
			},
			NetworkConfig: &eventarc.PipelineDestinationNetworkConfigArgs{
				NetworkAttachment: pulumi.String("projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment"),
			},
		}

		if _, err := eventarc.NewPipeline(ctx, "primary", &eventarc.PipelineArgs{
			Location:     pulumi.String("us-central1"),
			PipelineId:   pulumi.String("some-pipeline"),
			Destinations: eventarc.PipelineDestinationArray{httpDestination},
		}); err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var primary = new Gcp.Eventarc.Pipeline("primary", new()
{
Location = "us-central1",
PipelineId = "some-pipeline",
Destinations = new[]
{
new Gcp.Eventarc.Inputs.PipelineDestinationArgs
{
HttpEndpoint = new Gcp.Eventarc.Inputs.PipelineDestinationHttpEndpointArgs
{
Uri = "https://10.77.0.0:80/route",
},
NetworkConfig = new Gcp.Eventarc.Inputs.PipelineDestinationNetworkConfigArgs
{
NetworkAttachment = "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
},
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.eventarc.Pipeline;
import com.pulumi.gcp.eventarc.PipelineArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationHttpEndpointArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationNetworkConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var primary = new Pipeline("primary", PipelineArgs.builder()
.location("us-central1")
.pipelineId("some-pipeline")
.destinations(PipelineDestinationArgs.builder()
.httpEndpoint(PipelineDestinationHttpEndpointArgs.builder()
.uri("https://10.77.0.0:80/route")
.build())
.networkConfig(PipelineDestinationNetworkConfigArgs.builder()
.networkAttachment("projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment")
.build())
.build())
.build());
}
}
resources:
primary:
type: gcp:eventarc:Pipeline
properties:
location: us-central1
pipelineId: some-pipeline
destinations:
- httpEndpoint:
uri: https://10.77.0.0:80/route
networkConfig:
networkAttachment: projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment
The httpEndpoint property specifies the target URI. The networkConfig property with networkAttachment enables private connectivity to endpoints that aren’t publicly accessible, routing traffic through a VPC network attachment.
Trigger Workflows from events
Workflows orchestrate multi-step processes across Google Cloud services. Pipelines can invoke workflows in response to events.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
// Workflow that the pipeline below targets as its destination.
const workflow = new gcp.workflows.Workflow("workflow", {
    name: "some-workflow",
    deletionProtection: false,
    region: "us-central1",
    // Workflows source in YAML. `\${` keeps each Workflows expression literal
    // instead of letting the template literal interpolate it. The quotes around
    // "url" are plain: a backslash here would end up inside the deployed
    // expression. Step bodies are indented under their step name so the
    // document is valid YAML.
    sourceContents: `# This is a sample workflow, feel free to replace it with your source code
#
# This workflow does the following:
# - reads current time and date information from an external API and stores
# the response in CurrentDateTime variable
# - retrieves a list of Wikipedia articles related to the day of the week
# from CurrentDateTime
# - returns the list of articles as an output of the workflow
# FYI, In terraform you need to escape the $$ or it will cause errors.
- getCurrentTime:
    call: http.get
    args:
        url: \${sys.get_env("url")}
    result: CurrentDateTime
- readWikipedia:
    call: http.get
    args:
        url: https://en.wikipedia.org/w/api.php
        query:
            action: opensearch
            search: \${CurrentDateTime.body.dayOfTheWeek}
    result: WikiResult
- returnOutput:
    return: \${WikiResult.body[1]}
`,
});
const primary = new gcp.eventarc.Pipeline("primary", {
location: "us-central1",
pipelineId: "some-pipeline",
destinations: [{
workflow: workflow.id,
}],
});
import pulumi
import pulumi_gcp as gcp
workflow = gcp.workflows.Workflow("workflow",
name="some-workflow",
deletion_protection=False,
region="us-central1",
source_contents="""# This is a sample workflow, feel free to replace it with your source code
#
# This workflow does the following:
# - reads current time and date information from an external API and stores
# the response in CurrentDateTime variable
# - retrieves a list of Wikipedia articles related to the day of the week
# from CurrentDateTime
# - returns the list of articles as an output of the workflow
# FYI, In terraform you need to escape the $$ or it will cause errors.
- getCurrentTime:
call: http.get
args:
url: ${sys.get_env(\"url\")}
result: CurrentDateTime
- readWikipedia:
call: http.get
args:
url: https://en.wikipedia.org/w/api.php
query:
action: opensearch
search: ${CurrentDateTime.body.dayOfTheWeek}
result: WikiResult
- returnOutput:
return: ${WikiResult.body[1]}
""")
primary = gcp.eventarc.Pipeline("primary",
location="us-central1",
pipeline_id="some-pipeline",
destinations=[{
"workflow": workflow.id,
}])
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/eventarc"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/workflows"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// workflowSource is the Workflows definition (YAML) deployed by main.
//
// It is a raw string, so every character is sent as written: the quotes around
// "url" must not be backslash-escaped, and each step body must be indented
// under its step name for the document to be valid YAML.
const workflowSource = `# This is a sample workflow, feel free to replace it with your source code
#
# This workflow does the following:
# - reads current time and date information from an external API and stores
# the response in CurrentDateTime variable
# - retrieves a list of Wikipedia articles related to the day of the week
# from CurrentDateTime
# - returns the list of articles as an output of the workflow
# FYI, In terraform you need to escape the $$ or it will cause errors.
- getCurrentTime:
    call: http.get
    args:
        url: ${sys.get_env("url")}
    result: CurrentDateTime
- readWikipedia:
    call: http.get
    args:
        url: https://en.wikipedia.org/w/api.php
        query:
            action: opensearch
            search: ${CurrentDateTime.body.dayOfTheWeek}
    result: WikiResult
- returnOutput:
    return: ${WikiResult.body[1]}
`

// main creates a workflow from workflowSource and an Eventarc pipeline whose
// only destination is that workflow.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		workflow, err := workflows.NewWorkflow(ctx, "workflow", &workflows.WorkflowArgs{
			Name:               pulumi.String("some-workflow"),
			DeletionProtection: pulumi.Bool(false),
			Region:             pulumi.String("us-central1"),
			SourceContents:     pulumi.String(workflowSource),
		})
		if err != nil {
			return err
		}

		// The pipeline references the workflow by ID, so it is created second.
		_, err = eventarc.NewPipeline(ctx, "primary", &eventarc.PipelineArgs{
			Location:   pulumi.String("us-central1"),
			PipelineId: pulumi.String("some-pipeline"),
			Destinations: eventarc.PipelineDestinationArray{
				&eventarc.PipelineDestinationArgs{
					Workflow: workflow.ID(),
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var workflow = new Gcp.Workflows.Workflow("workflow", new()
{
Name = "some-workflow",
DeletionProtection = false,
Region = "us-central1",
SourceContents = @"# This is a sample workflow, feel free to replace it with your source code
#
# This workflow does the following:
# - reads current time and date information from an external API and stores
# the response in CurrentDateTime variable
# - retrieves a list of Wikipedia articles related to the day of the week
# from CurrentDateTime
# - returns the list of articles as an output of the workflow
# FYI, In terraform you need to escape the $$ or it will cause errors.
- getCurrentTime:
call: http.get
args:
url: ${sys.get_env(\""url\"")}
result: CurrentDateTime
- readWikipedia:
call: http.get
args:
url: https://en.wikipedia.org/w/api.php
query:
action: opensearch
search: ${CurrentDateTime.body.dayOfTheWeek}
result: WikiResult
- returnOutput:
return: ${WikiResult.body[1]}
",
});
var primary = new Gcp.Eventarc.Pipeline("primary", new()
{
Location = "us-central1",
PipelineId = "some-pipeline",
Destinations = new[]
{
new Gcp.Eventarc.Inputs.PipelineDestinationArgs
{
Workflow = workflow.Id,
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.workflows.Workflow;
import com.pulumi.gcp.workflows.WorkflowArgs;
import com.pulumi.gcp.eventarc.Pipeline;
import com.pulumi.gcp.eventarc.PipelineArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var workflow = new Workflow("workflow", WorkflowArgs.builder()
.name("some-workflow")
.deletionProtection(false)
.region("us-central1")
.sourceContents("""
# This is a sample workflow, feel free to replace it with your source code
#
# This workflow does the following:
# - reads current time and date information from an external API and stores
# the response in CurrentDateTime variable
# - retrieves a list of Wikipedia articles related to the day of the week
# from CurrentDateTime
# - returns the list of articles as an output of the workflow
# FYI, In terraform you need to escape the $$ or it will cause errors.
- getCurrentTime:
call: http.get
args:
url: ${sys.get_env(\"url\")}
result: CurrentDateTime
- readWikipedia:
call: http.get
args:
url: https://en.wikipedia.org/w/api.php
query:
action: opensearch
search: ${CurrentDateTime.body.dayOfTheWeek}
result: WikiResult
- returnOutput:
return: ${WikiResult.body[1]}
""")
.build());
var primary = new Pipeline("primary", PipelineArgs.builder()
.location("us-central1")
.pipelineId("some-pipeline")
.destinations(PipelineDestinationArgs.builder()
.workflow(workflow.id())
.build())
.build());
}
}
resources:
workflow:
type: gcp:workflows:Workflow
properties:
name: some-workflow
deletionProtection: false
region: us-central1
sourceContents: |
# This is a sample workflow, feel free to replace it with your source code
#
# This workflow does the following:
# - reads current time and date information from an external API and stores
# the response in CurrentDateTime variable
# - retrieves a list of Wikipedia articles related to the day of the week
# from CurrentDateTime
# - returns the list of articles as an output of the workflow
# FYI, In terraform you need to escape the $$ or it will cause errors.
- getCurrentTime:
call: http.get
args:
url: $${sys.get_env(\"url\")}
result: CurrentDateTime
- readWikipedia:
call: http.get
args:
url: https://en.wikipedia.org/w/api.php
query:
action: opensearch
search: $${CurrentDateTime.body.dayOfTheWeek}
result: WikiResult
- returnOutput:
return: $${WikiResult.body[1]}
primary:
type: gcp:eventarc:Pipeline
properties:
location: us-central1
pipelineId: some-pipeline
destinations:
- workflow: ${workflow.id}
Setting the workflow field in destinations triggers workflow executions when events arrive. The pipeline passes event data to the workflow, which can then orchestrate multi-step processes across Google Cloud services.
Authenticate with OIDC and transform JSON payloads
Secure HTTP endpoints often require OIDC authentication. Pipelines can authenticate requests and transform event payloads before delivery.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const primary = new gcp.eventarc.Pipeline("primary", {
location: "us-central1",
pipelineId: "some-pipeline",
destinations: [{
httpEndpoint: {
uri: "https://10.77.0.0:80/route",
messageBindingTemplate: "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
},
networkConfig: {
networkAttachment: "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
},
authenticationConfig: {
googleOidc: {
serviceAccount: "my@service-account.com",
audience: "http://www.example.com",
},
},
outputPayloadFormat: {
json: {},
},
}],
inputPayloadFormat: {
json: {},
},
retryPolicy: {
maxRetryDelay: "50s",
maxAttempts: 2,
minRetryDelay: "40s",
},
mediations: [{
transformation: {
transformationTemplate: `{
\\"id\\": message.id,
\\"datacontenttype\\": \\"application/json\\",
\\"data\\": \\"{ \\\\\\"scrubbed\\\\\\": \\\\\\"true\\\\\\" }\\"
}
`,
},
}],
loggingConfig: {
logSeverity: "DEBUG",
},
});
import pulumi
import pulumi_gcp as gcp
primary = gcp.eventarc.Pipeline("primary",
location="us-central1",
pipeline_id="some-pipeline",
destinations=[{
"http_endpoint": {
"uri": "https://10.77.0.0:80/route",
"message_binding_template": "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
},
"network_config": {
"network_attachment": "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
},
"authentication_config": {
"google_oidc": {
"service_account": "my@service-account.com",
"audience": "http://www.example.com",
},
},
"output_payload_format": {
"json": {},
},
}],
input_payload_format={
"json": {},
},
retry_policy={
"max_retry_delay": "50s",
"max_attempts": 2,
"min_retry_delay": "40s",
},
mediations=[{
"transformation": {
"transformation_template": """{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
""",
},
}],
logging_config={
"log_severity": "DEBUG",
})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/eventarc"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// scrubTemplate is the transformation template applied by the pipeline's
// mediation step.
//
// It is a raw string, so the outer quotes are written plainly. Only the quotes
// nested inside the "data" string value keep a backslash, which is part of the
// template text itself.
const scrubTemplate = `{
"id": message.id,
"datacontenttype": "application/json",
"data": "{ \"scrubbed\": \"true\" }"
}
`

// main declares an Eventarc pipeline that delivers JSON events to an HTTP
// endpoint using Google OIDC authentication, with a transformation mediation,
// a retry policy, and DEBUG logging.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := eventarc.NewPipeline(ctx, "primary", &eventarc.PipelineArgs{
			Location:   pulumi.String("us-central1"),
			PipelineId: pulumi.String("some-pipeline"),
			Destinations: eventarc.PipelineDestinationArray{
				&eventarc.PipelineDestinationArgs{
					HttpEndpoint: &eventarc.PipelineDestinationHttpEndpointArgs{
						Uri:                    pulumi.String("https://10.77.0.0:80/route"),
						MessageBindingTemplate: pulumi.String("{\"headers\":{\"new-header-key\": \"new-header-value\"}}"),
					},
					NetworkConfig: &eventarc.PipelineDestinationNetworkConfigArgs{
						NetworkAttachment: pulumi.String("projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment"),
					},
					AuthenticationConfig: &eventarc.PipelineDestinationAuthenticationConfigArgs{
						GoogleOidc: &eventarc.PipelineDestinationAuthenticationConfigGoogleOidcArgs{
							ServiceAccount: pulumi.String("my@service-account.com"),
							Audience:       pulumi.String("http://www.example.com"),
						},
					},
					// An empty Json block selects JSON as the output format.
					OutputPayloadFormat: &eventarc.PipelineDestinationOutputPayloadFormatArgs{
						Json: &eventarc.PipelineDestinationOutputPayloadFormatJsonArgs{},
					},
				},
			},
			// An empty Json block selects JSON as the input format.
			InputPayloadFormat: &eventarc.PipelineInputPayloadFormatArgs{
				Json: &eventarc.PipelineInputPayloadFormatJsonArgs{},
			},
			RetryPolicy: &eventarc.PipelineRetryPolicyArgs{
				MaxRetryDelay: pulumi.String("50s"),
				MaxAttempts:   pulumi.Int(2),
				MinRetryDelay: pulumi.String("40s"),
			},
			Mediations: eventarc.PipelineMediationArray{
				&eventarc.PipelineMediationArgs{
					Transformation: &eventarc.PipelineMediationTransformationArgs{
						TransformationTemplate: pulumi.String(scrubTemplate),
					},
				},
			},
			LoggingConfig: &eventarc.PipelineLoggingConfigArgs{
				LogSeverity: pulumi.String("DEBUG"),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
// Declares an Eventarc pipeline that delivers JSON events to an HTTP endpoint
// using Google OIDC authentication, with a transformation mediation, a retry
// policy, and DEBUG logging.
return await Deployment.RunAsync(() =>
{
    var primary = new Gcp.Eventarc.Pipeline("primary", new()
    {
        Location = "us-central1",
        PipelineId = "some-pipeline",
        Destinations = new[]
        {
            new Gcp.Eventarc.Inputs.PipelineDestinationArgs
            {
                HttpEndpoint = new Gcp.Eventarc.Inputs.PipelineDestinationHttpEndpointArgs
                {
                    Uri = "https://10.77.0.0:80/route",
                    MessageBindingTemplate = "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
                },
                NetworkConfig = new Gcp.Eventarc.Inputs.PipelineDestinationNetworkConfigArgs
                {
                    NetworkAttachment = "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
                },
                AuthenticationConfig = new Gcp.Eventarc.Inputs.PipelineDestinationAuthenticationConfigArgs
                {
                    GoogleOidc = new Gcp.Eventarc.Inputs.PipelineDestinationAuthenticationConfigGoogleOidcArgs
                    {
                        ServiceAccount = "my@service-account.com",
                        Audience = "http://www.example.com",
                    },
                },
                OutputPayloadFormat = new Gcp.Eventarc.Inputs.PipelineDestinationOutputPayloadFormatArgs
                {
                    // An empty args object selects JSON; assigning null would
                    // leave the output format unspecified.
                    Json = new Gcp.Eventarc.Inputs.PipelineDestinationOutputPayloadFormatJsonArgs(),
                },
            },
        },
        InputPayloadFormat = new Gcp.Eventarc.Inputs.PipelineInputPayloadFormatArgs
        {
            // An empty args object selects JSON; assigning null would leave
            // the input format unspecified.
            Json = new Gcp.Eventarc.Inputs.PipelineInputPayloadFormatJsonArgs(),
        },
        RetryPolicy = new Gcp.Eventarc.Inputs.PipelineRetryPolicyArgs
        {
            MaxRetryDelay = "50s",
            MaxAttempts = 2,
            MinRetryDelay = "40s",
        },
        Mediations = new[]
        {
            new Gcp.Eventarc.Inputs.PipelineMediationArgs
            {
                Transformation = new Gcp.Eventarc.Inputs.PipelineMediationTransformationArgs
                {
                    // Verbatim string: "" is one quote character and a
                    // backslash is literal. Only the quotes nested inside the
                    // "data" value carry a backslash.
                    TransformationTemplate = @"{
""id"": message.id,
""datacontenttype"": ""application/json"",
""data"": ""{ \""scrubbed\"": \""true\"" }""
}
",
                },
            },
        },
        LoggingConfig = new Gcp.Eventarc.Inputs.PipelineLoggingConfigArgs
        {
            LogSeverity = "DEBUG",
        },
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.eventarc.Pipeline;
import com.pulumi.gcp.eventarc.PipelineArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationHttpEndpointArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationNetworkConfigArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationAuthenticationConfigArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationAuthenticationConfigGoogleOidcArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationOutputPayloadFormatArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationOutputPayloadFormatJsonArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineInputPayloadFormatArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineInputPayloadFormatJsonArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineRetryPolicyArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineMediationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineMediationTransformationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineLoggingConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var primary = new Pipeline("primary", PipelineArgs.builder()
.location("us-central1")
.pipelineId("some-pipeline")
.destinations(PipelineDestinationArgs.builder()
.httpEndpoint(PipelineDestinationHttpEndpointArgs.builder()
.uri("https://10.77.0.0:80/route")
.messageBindingTemplate("{\"headers\":{\"new-header-key\": \"new-header-value\"}}")
.build())
.networkConfig(PipelineDestinationNetworkConfigArgs.builder()
.networkAttachment("projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment")
.build())
.authenticationConfig(PipelineDestinationAuthenticationConfigArgs.builder()
.googleOidc(PipelineDestinationAuthenticationConfigGoogleOidcArgs.builder()
.serviceAccount("my@service-account.com")
.audience("http://www.example.com")
.build())
.build())
.outputPayloadFormat(PipelineDestinationOutputPayloadFormatArgs.builder()
.json(PipelineDestinationOutputPayloadFormatJsonArgs.builder()
.build())
.build())
.build())
.inputPayloadFormat(PipelineInputPayloadFormatArgs.builder()
.json(PipelineInputPayloadFormatJsonArgs.builder()
.build())
.build())
.retryPolicy(PipelineRetryPolicyArgs.builder()
.maxRetryDelay("50s")
.maxAttempts(2)
.minRetryDelay("40s")
.build())
.mediations(PipelineMediationArgs.builder()
.transformation(PipelineMediationTransformationArgs.builder()
.transformationTemplate("""
{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
""")
.build())
.build())
.loggingConfig(PipelineLoggingConfigArgs.builder()
.logSeverity("DEBUG")
.build())
.build());
}
}
resources:
primary:
type: gcp:eventarc:Pipeline
properties:
location: us-central1
pipelineId: some-pipeline
destinations:
- httpEndpoint:
uri: https://10.77.0.0:80/route
messageBindingTemplate: '{"headers":{"new-header-key": "new-header-value"}}'
networkConfig:
networkAttachment: projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment
authenticationConfig:
googleOidc:
serviceAccount: my@service-account.com
audience: http://www.example.com
outputPayloadFormat:
json: {}
inputPayloadFormat:
json: {}
retryPolicy:
maxRetryDelay: 50s
maxAttempts: 2
minRetryDelay: 40s
mediations:
- transformation:
transformationTemplate: |
{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
loggingConfig:
logSeverity: DEBUG
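The authenticationConfig property with googleOidc enables OIDC authentication using a service account. The inputPayloadFormat and outputPayloadFormat properties declare JSON as the payload format on both sides of the pipeline. The mediations property with transformation applies a transformationTemplate to modify event payloads. The retryPolicy controls exponential backoff behavior through maxAttempts, minRetryDelay, and maxRetryDelay, and loggingConfig sets the severity level for pipeline logs.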
Use OAuth tokens with Protocol Buffer schemas
Some APIs require OAuth 2.0 tokens instead of OIDC. Protocol Buffer schemas provide strongly-typed message formats.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const primary = new gcp.eventarc.Pipeline("primary", {
location: "us-central1",
pipelineId: "some-pipeline",
destinations: [{
httpEndpoint: {
uri: "https://10.77.0.0:80/route",
messageBindingTemplate: "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
},
networkConfig: {
networkAttachment: "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
},
authenticationConfig: {
oauthToken: {
serviceAccount: "my@service-account.com",
scope: "https://www.googleapis.com/auth/cloud-platform",
},
},
outputPayloadFormat: {
protobuf: {
schemaDefinition: `syntax = \\"proto3\\";
message schema {
string name = 1;
string severity = 2;
}
`,
},
},
}],
inputPayloadFormat: {
protobuf: {
schemaDefinition: `syntax = \\"proto3\\";
message schema {
string name = 1;
string severity = 2;
}
`,
},
},
retryPolicy: {
maxRetryDelay: "50s",
maxAttempts: 2,
minRetryDelay: "40s",
},
mediations: [{
transformation: {
transformationTemplate: `{
\\"id\\": message.id,
\\"datacontenttype\\": \\"application/json\\",
\\"data\\": \\"{ \\\\\\"scrubbed\\\\\\": \\\\\\"true\\\\\\" }\\"
}
`,
},
}],
loggingConfig: {
logSeverity: "DEBUG",
},
});
import pulumi
import pulumi_gcp as gcp
primary = gcp.eventarc.Pipeline("primary",
location="us-central1",
pipeline_id="some-pipeline",
destinations=[{
"http_endpoint": {
"uri": "https://10.77.0.0:80/route",
"message_binding_template": "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
},
"network_config": {
"network_attachment": "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
},
"authentication_config": {
"oauth_token": {
"service_account": "my@service-account.com",
"scope": "https://www.googleapis.com/auth/cloud-platform",
},
},
"output_payload_format": {
"protobuf": {
"schema_definition": """syntax = \"proto3\";
message schema {
string name = 1;
string severity = 2;
}
""",
},
},
}],
input_payload_format={
"protobuf": {
"schema_definition": """syntax = \"proto3\";
message schema {
string name = 1;
string severity = 2;
}
""",
},
},
retry_policy={
"max_retry_delay": "50s",
"max_attempts": 2,
"min_retry_delay": "40s",
},
mediations=[{
"transformation": {
"transformation_template": """{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
""",
},
}],
logging_config={
"log_severity": "DEBUG",
})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/eventarc"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
// protoSchema is the Protocol Buffers schema used for both the pipeline's
// input payload format and the destination's output payload format.
//
// It is a raw string, so the quotes around proto3 are written plainly; a
// backslash here would become part of the schema text.
const protoSchema = `syntax = "proto3";
message schema {
string name = 1;
string severity = 2;
}
`

// scrubTemplate is the transformation template applied by the pipeline's
// mediation step. Only the quotes nested inside the "data" string value keep a
// backslash, which is part of the template text itself.
const scrubTemplate = `{
"id": message.id,
"datacontenttype": "application/json",
"data": "{ \"scrubbed\": \"true\" }"
}
`

// main declares an Eventarc pipeline that delivers protobuf-formatted events
// to an HTTP endpoint using an OAuth token, with a transformation mediation, a
// retry policy, and DEBUG logging.
func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := eventarc.NewPipeline(ctx, "primary", &eventarc.PipelineArgs{
			Location:   pulumi.String("us-central1"),
			PipelineId: pulumi.String("some-pipeline"),
			Destinations: eventarc.PipelineDestinationArray{
				&eventarc.PipelineDestinationArgs{
					HttpEndpoint: &eventarc.PipelineDestinationHttpEndpointArgs{
						Uri:                    pulumi.String("https://10.77.0.0:80/route"),
						MessageBindingTemplate: pulumi.String("{\"headers\":{\"new-header-key\": \"new-header-value\"}}"),
					},
					NetworkConfig: &eventarc.PipelineDestinationNetworkConfigArgs{
						NetworkAttachment: pulumi.String("projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment"),
					},
					AuthenticationConfig: &eventarc.PipelineDestinationAuthenticationConfigArgs{
						OauthToken: &eventarc.PipelineDestinationAuthenticationConfigOauthTokenArgs{
							ServiceAccount: pulumi.String("my@service-account.com"),
							Scope:          pulumi.String("https://www.googleapis.com/auth/cloud-platform"),
						},
					},
					OutputPayloadFormat: &eventarc.PipelineDestinationOutputPayloadFormatArgs{
						Protobuf: &eventarc.PipelineDestinationOutputPayloadFormatProtobufArgs{
							SchemaDefinition: pulumi.String(protoSchema),
						},
					},
				},
			},
			InputPayloadFormat: &eventarc.PipelineInputPayloadFormatArgs{
				Protobuf: &eventarc.PipelineInputPayloadFormatProtobufArgs{
					SchemaDefinition: pulumi.String(protoSchema),
				},
			},
			RetryPolicy: &eventarc.PipelineRetryPolicyArgs{
				MaxRetryDelay: pulumi.String("50s"),
				MaxAttempts:   pulumi.Int(2),
				MinRetryDelay: pulumi.String("40s"),
			},
			Mediations: eventarc.PipelineMediationArray{
				&eventarc.PipelineMediationArgs{
					Transformation: &eventarc.PipelineMediationTransformationArgs{
						TransformationTemplate: pulumi.String(scrubTemplate),
					},
				},
			},
			LoggingConfig: &eventarc.PipelineLoggingConfigArgs{
				LogSeverity: pulumi.String("DEBUG"),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
// Declares an Eventarc pipeline that delivers events to an HTTP endpoint reached
// through a network attachment, authenticates with an OAuth token, and uses
// Protobuf schemas for both the input and output payload formats.
return await Deployment.RunAsync(() =>
{
    var primary = new Gcp.Eventarc.Pipeline("primary", new()
    {
        Location = "us-central1",
        PipelineId = "some-pipeline",
        Destinations = new[]
        {
            new Gcp.Eventarc.Inputs.PipelineDestinationArgs
            {
                HttpEndpoint = new Gcp.Eventarc.Inputs.PipelineDestinationHttpEndpointArgs
                {
                    Uri = "https://10.77.0.0:80/route",
                    // Regular (non-verbatim) string: \" is an escaped double quote here.
                    MessageBindingTemplate = "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
                },
                NetworkConfig = new Gcp.Eventarc.Inputs.PipelineDestinationNetworkConfigArgs
                {
                    NetworkAttachment = "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
                },
                AuthenticationConfig = new Gcp.Eventarc.Inputs.PipelineDestinationAuthenticationConfigArgs
                {
                    OauthToken = new Gcp.Eventarc.Inputs.PipelineDestinationAuthenticationConfigOauthTokenArgs
                    {
                        ServiceAccount = "my@service-account.com",
                        Scope = "https://www.googleapis.com/auth/cloud-platform",
                    },
                },
                OutputPayloadFormat = new Gcp.Eventarc.Inputs.PipelineDestinationOutputPayloadFormatArgs
                {
                    Protobuf = new Gcp.Eventarc.Inputs.PipelineDestinationOutputPayloadFormatProtobufArgs
                    {
                        // Verbatim strings do not process backslash escapes; a double quote
                        // is written as "". Continuation lines start at column 0 because
                        // leading whitespace would become part of the schema text.
                        SchemaDefinition = @"syntax = ""proto3"";
message schema {
string name = 1;
string severity = 2;
}
",
                    },
                },
            },
        },
        InputPayloadFormat = new Gcp.Eventarc.Inputs.PipelineInputPayloadFormatArgs
        {
            Protobuf = new Gcp.Eventarc.Inputs.PipelineInputPayloadFormatProtobufArgs
            {
                // Same schema text as the destination's output payload format.
                SchemaDefinition = @"syntax = ""proto3"";
message schema {
string name = 1;
string severity = 2;
}
",
            },
        },
        RetryPolicy = new Gcp.Eventarc.Inputs.PipelineRetryPolicyArgs
        {
            MaxRetryDelay = "50s",
            MaxAttempts = 2,
            MinRetryDelay = "40s",
        },
        Mediations = new[]
        {
            new Gcp.Eventarc.Inputs.PipelineMediationArgs
            {
                Transformation = new Gcp.Eventarc.Inputs.PipelineMediationTransformationArgs
                {
                    // The \"" sequences on the "data" line are intentional: each is a
                    // literal backslash followed by a quote, i.e. an escaped quote inside
                    // the string value assigned to "data".
                    TransformationTemplate = @"{
""id"": message.id,
""datacontenttype"": ""application/json"",
""data"": ""{ \""scrubbed\"": \""true\"" }""
}
",
                },
            },
        },
        LoggingConfig = new Gcp.Eventarc.Inputs.PipelineLoggingConfigArgs
        {
            LogSeverity = "DEBUG",
        },
    });
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.eventarc.Pipeline;
import com.pulumi.gcp.eventarc.PipelineArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationHttpEndpointArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationNetworkConfigArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationAuthenticationConfigArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationAuthenticationConfigOauthTokenArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationOutputPayloadFormatArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationOutputPayloadFormatProtobufArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineInputPayloadFormatArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineInputPayloadFormatProtobufArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineRetryPolicyArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineMediationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineMediationTransformationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineLoggingConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
 * Pulumi program declaring an Eventarc pipeline that delivers to an HTTP endpoint
 * with OAuth token authentication and Protobuf input/output payload formats.
 */
public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    /**
     * Registers the "primary" pipeline. Each nested argument object is built into a
     * named local first so the final {@code PipelineArgs} reads as a flat list.
     */
    public static void stack(Context ctx) {
        // The same proto3 schema text is used for the inbound and outbound payload formats.
        // Inside a text block \" is an escape sequence for a plain double quote.
        var protoSchema = """
            syntax = \"proto3\";
            message schema {
            string name = 1;
            string severity = 2;
            }
            """;

        var transformationTemplate = """
            {
            \"id\": message.id,
            \"datacontenttype\": \"application/json\",
            \"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
            }
            """;

        var httpEndpoint = PipelineDestinationHttpEndpointArgs.builder()
            .uri("https://10.77.0.0:80/route")
            .messageBindingTemplate("{\"headers\":{\"new-header-key\": \"new-header-value\"}}")
            .build();

        var networkConfig = PipelineDestinationNetworkConfigArgs.builder()
            .networkAttachment("projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment")
            .build();

        var oauthToken = PipelineDestinationAuthenticationConfigOauthTokenArgs.builder()
            .serviceAccount("my@service-account.com")
            .scope("https://www.googleapis.com/auth/cloud-platform")
            .build();

        var authenticationConfig = PipelineDestinationAuthenticationConfigArgs.builder()
            .oauthToken(oauthToken)
            .build();

        var outputPayloadFormat = PipelineDestinationOutputPayloadFormatArgs.builder()
            .protobuf(PipelineDestinationOutputPayloadFormatProtobufArgs.builder()
                .schemaDefinition(protoSchema)
                .build())
            .build();

        var destination = PipelineDestinationArgs.builder()
            .httpEndpoint(httpEndpoint)
            .networkConfig(networkConfig)
            .authenticationConfig(authenticationConfig)
            .outputPayloadFormat(outputPayloadFormat)
            .build();

        var inputPayloadFormat = PipelineInputPayloadFormatArgs.builder()
            .protobuf(PipelineInputPayloadFormatProtobufArgs.builder()
                .schemaDefinition(protoSchema)
                .build())
            .build();

        var retryPolicy = PipelineRetryPolicyArgs.builder()
            .maxRetryDelay("50s")
            .maxAttempts(2)
            .minRetryDelay("40s")
            .build();

        var mediation = PipelineMediationArgs.builder()
            .transformation(PipelineMediationTransformationArgs.builder()
                .transformationTemplate(transformationTemplate)
                .build())
            .build();

        var loggingConfig = PipelineLoggingConfigArgs.builder()
            .logSeverity("DEBUG")
            .build();

        var primary = new Pipeline("primary", PipelineArgs.builder()
            .location("us-central1")
            .pipelineId("some-pipeline")
            .destinations(destination)
            .inputPayloadFormat(inputPayloadFormat)
            .retryPolicy(retryPolicy)
            .mediations(mediation)
            .loggingConfig(loggingConfig)
            .build());
    }
}
# Eventarc pipeline that delivers events to an HTTP endpoint through a network
# attachment, authenticates with an OAuth token, and uses Protobuf schemas for
# both the input and output payload formats.
resources:
  primary:
    type: gcp:eventarc:Pipeline
    properties:
      location: us-central1
      pipelineId: some-pipeline
      destinations:
        - httpEndpoint:
            uri: https://10.77.0.0:80/route
            messageBindingTemplate: '{"headers":{"new-header-key": "new-header-value"}}'
          networkConfig:
            networkAttachment: projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment
          authenticationConfig:
            oauthToken:
              serviceAccount: my@service-account.com
              scope: https://www.googleapis.com/auth/cloud-platform
          outputPayloadFormat:
            protobuf:
              # Literal block scalars (|) do not process backslash escapes,
              # so quotes are written as-is.
              schemaDefinition: |
                syntax = "proto3";
                message schema {
                string name = 1;
                string severity = 2;
                }
      inputPayloadFormat:
        protobuf:
          # Same schema text as the destination's output payload format.
          schemaDefinition: |
            syntax = "proto3";
            message schema {
            string name = 1;
            string severity = 2;
            }
      retryPolicy:
        maxRetryDelay: 50s
        maxAttempts: 2
        minRetryDelay: 40s
      mediations:
        - transformation:
            # The \" sequences on the "data" line are intentional: they are
            # escaped quotes inside the string value assigned to "data".
            transformationTemplate: |
              {
              "id": message.id,
              "datacontenttype": "application/json",
              "data": "{ \"scrubbed\": \"true\" }"
              }
      loggingConfig:
        logSeverity: DEBUG
The oauthToken property authenticates with OAuth 2.0, specifying a scope for API access. The protobuf property in both inputPayloadFormat and outputPayloadFormat defines Protocol Buffer schemas for structured message validation. The schemaDefinition contains the proto3 schema text.
Beyond these examples
These snippets focus on specific pipeline features: destination types (Pub/Sub, HTTP, Workflows), authentication methods (OIDC, OAuth), and payload formats and transformation. They’re intentionally minimal rather than full event processing systems.
The examples may reference pre-existing infrastructure such as Pub/Sub topics, Workflows, network attachments, and service accounts with appropriate IAM permissions. They focus on configuring the pipeline rather than provisioning everything around it.
To keep things focused, several pipeline features are either left out entirely or appear in the example code without a detailed explanation, including:
- Custom retry timing (minRetryDelay, maxRetryDelay, maxAttempts)
- Message binding templates for header manipulation
- Customer-managed encryption keys (cryptoKeyName)
- Avro schema format (supported by the resource, but not covered in this guide)
These omissions are intentional: the goal is to illustrate how each pipeline feature is wired, not provide drop-in event processing modules. See the Eventarc Pipeline resource reference for all available configuration options.
Let's create GCP Eventarc Pipelines
Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.
Try Pulumi Cloud for FREE
Frequently Asked Questions
Destinations & Routing
Pipelines can route events to a Pub/Sub topic (topic), HTTP endpoint (httpEndpoint), or Workflow (workflow). You can configure exactly one destination per Pipeline.
Configure authenticationConfig with either googleOidc (requires serviceAccount and audience) or oauthToken (requires serviceAccount and scope).
Configure networkConfig.networkAttachment with a network attachment resource path (e.g., projects/my-project/regions/us-central1/networkAttachments/some-attachment).
Retry & Error Handling
Set minRetryDelay and maxRetryDelay to the same value in retryPolicy. This makes the duration between retries constant.
Message Transformation & Formats
Pipelines support JSON ({}), Protobuf (with schemaDefinition), and Avro (with schemaDefinition) for both input and output formats.
Configure mediations with a transformation object containing transformationTemplate. Currently, only one Transformation operation is allowed per Pipeline.
Resource Management & Security
The location, pipelineId, and project properties are immutable. Changing any of these forces resource replacement.
The labels and annotations fields are non-authoritative and only manage values in your configuration. Use effectiveLabels and effectiveAnnotations to see all values present on the resource.
Set cryptoKeyName to your KMS crypto key path (format: projects/{project}/locations/{location}/keyRings/{keyring}/cryptoKeys/{key}). If not set, a Google-owned key is used.
The pipelineId must match the pattern ^[a-z]([a-z0-9-]{0,61}[a-z0-9])?$ (lowercase letters, numbers, and hyphens).
Using a different cloud?
Explore messaging guides for other cloud providers: