The gcp:eventarc/pipeline:Pipeline resource, part of the Pulumi GCP provider, defines an Eventarc pipeline that routes events to destinations with optional transformation, authentication, and format conversion. This guide focuses on three capabilities: routing to Pub/Sub, HTTP endpoints, and Workflows; authentication with OIDC and OAuth tokens; and payload transformation and format conversion.
Pipelines reference existing infrastructure like Pub/Sub topics, Workflows, network attachments, service accounts, and KMS keys. The examples are intentionally small. Combine them with your own event sources, authentication configuration, and destination infrastructure.
Route events to a Pub/Sub topic
Event-driven architectures often begin by routing events to Pub/Sub topics, where multiple subscribers can process messages independently.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const topic = new gcp.pubsub.Topic("topic", {name: "some-topic"});
const primary = new gcp.eventarc.Pipeline("primary", {
location: "us-central1",
pipelineId: "some-pipeline",
destinations: [{
topic: topic.id,
}],
labels: {
test_label: "test-eventarc-label",
},
annotations: {
test_annotation: "test-eventarc-annotation",
},
displayName: "Testing Pipeline",
});
import pulumi
import pulumi_gcp as gcp
topic = gcp.pubsub.Topic("topic", name="some-topic")
primary = gcp.eventarc.Pipeline("primary",
location="us-central1",
pipeline_id="some-pipeline",
destinations=[{
"topic": topic.id,
}],
labels={
"test_label": "test-eventarc-label",
},
annotations={
"test_annotation": "test-eventarc-annotation",
},
display_name="Testing Pipeline")
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/eventarc"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
topic, err := pubsub.NewTopic(ctx, "topic", &pubsub.TopicArgs{
Name: pulumi.String("some-topic"),
})
if err != nil {
return err
}
_, err = eventarc.NewPipeline(ctx, "primary", &eventarc.PipelineArgs{
Location: pulumi.String("us-central1"),
PipelineId: pulumi.String("some-pipeline"),
Destinations: eventarc.PipelineDestinationArray{
&eventarc.PipelineDestinationArgs{
Topic: topic.ID(),
},
},
Labels: pulumi.StringMap{
"test_label": pulumi.String("test-eventarc-label"),
},
Annotations: pulumi.StringMap{
"test_annotation": pulumi.String("test-eventarc-annotation"),
},
DisplayName: pulumi.String("Testing Pipeline"),
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var topic = new Gcp.PubSub.Topic("topic", new()
{
Name = "some-topic",
});
var primary = new Gcp.Eventarc.Pipeline("primary", new()
{
Location = "us-central1",
PipelineId = "some-pipeline",
Destinations = new[]
{
new Gcp.Eventarc.Inputs.PipelineDestinationArgs
{
Topic = topic.Id,
},
},
Labels =
{
{ "test_label", "test-eventarc-label" },
},
Annotations =
{
{ "test_annotation", "test-eventarc-annotation" },
},
DisplayName = "Testing Pipeline",
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.eventarc.Pipeline;
import com.pulumi.gcp.eventarc.PipelineArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var topic = new Topic("topic", TopicArgs.builder()
.name("some-topic")
.build());
var primary = new Pipeline("primary", PipelineArgs.builder()
.location("us-central1")
.pipelineId("some-pipeline")
.destinations(PipelineDestinationArgs.builder()
.topic(topic.id())
.build())
.labels(Map.of("test_label", "test-eventarc-label"))
.annotations(Map.of("test_annotation", "test-eventarc-annotation"))
.displayName("Testing Pipeline")
.build());
}
}
resources:
topic:
type: gcp:pubsub:Topic
properties:
name: some-topic
primary:
type: gcp:eventarc:Pipeline
properties:
location: us-central1
pipelineId: some-pipeline
destinations:
- topic: ${topic.id}
labels:
test_label: test-eventarc-label
annotations:
test_annotation: test-eventarc-annotation
displayName: Testing Pipeline
The destinations property defines where events are sent. When you specify a topic, the pipeline forwards all incoming events to that Pub/Sub topic. The location and pipelineId properties identify the pipeline within your GCP project.
Deliver events to an HTTP endpoint
Applications that expose HTTP APIs can receive events directly through pipeline delivery.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const primary = new gcp.eventarc.Pipeline("primary", {
location: "us-central1",
pipelineId: "some-pipeline",
destinations: [{
httpEndpoint: {
uri: "https://10.77.0.0:80/route",
},
networkConfig: {
networkAttachment: "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
},
}],
});
import pulumi
import pulumi_gcp as gcp
primary = gcp.eventarc.Pipeline("primary",
location="us-central1",
pipeline_id="some-pipeline",
destinations=[{
"http_endpoint": {
"uri": "https://10.77.0.0:80/route",
},
"network_config": {
"network_attachment": "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
},
}])
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/eventarc"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := eventarc.NewPipeline(ctx, "primary", &eventarc.PipelineArgs{
Location: pulumi.String("us-central1"),
PipelineId: pulumi.String("some-pipeline"),
Destinations: eventarc.PipelineDestinationArray{
&eventarc.PipelineDestinationArgs{
HttpEndpoint: &eventarc.PipelineDestinationHttpEndpointArgs{
Uri: pulumi.String("https://10.77.0.0:80/route"),
},
NetworkConfig: &eventarc.PipelineDestinationNetworkConfigArgs{
NetworkAttachment: pulumi.String("projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment"),
},
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var primary = new Gcp.Eventarc.Pipeline("primary", new()
{
Location = "us-central1",
PipelineId = "some-pipeline",
Destinations = new[]
{
new Gcp.Eventarc.Inputs.PipelineDestinationArgs
{
HttpEndpoint = new Gcp.Eventarc.Inputs.PipelineDestinationHttpEndpointArgs
{
Uri = "https://10.77.0.0:80/route",
},
NetworkConfig = new Gcp.Eventarc.Inputs.PipelineDestinationNetworkConfigArgs
{
NetworkAttachment = "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
},
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.eventarc.Pipeline;
import com.pulumi.gcp.eventarc.PipelineArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationHttpEndpointArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationNetworkConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var primary = new Pipeline("primary", PipelineArgs.builder()
.location("us-central1")
.pipelineId("some-pipeline")
.destinations(PipelineDestinationArgs.builder()
.httpEndpoint(PipelineDestinationHttpEndpointArgs.builder()
.uri("https://10.77.0.0:80/route")
.build())
.networkConfig(PipelineDestinationNetworkConfigArgs.builder()
.networkAttachment("projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment")
.build())
.build())
.build());
}
}
resources:
primary:
type: gcp:eventarc:Pipeline
properties:
location: us-central1
pipelineId: some-pipeline
destinations:
- httpEndpoint:
uri: https://10.77.0.0:80/route
networkConfig:
networkAttachment: projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment
The httpEndpoint property sets the target URI. The networkConfig property enables private connectivity through a network attachment, allowing you to reach endpoints that aren’t publicly accessible. This configuration routes events to your HTTP service without exposing it to the internet.
Trigger Workflows from events
Workflows orchestrate multi-step processes across Google Cloud services. Pipelines can invoke workflows in response to events.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const workflow = new gcp.workflows.Workflow("workflow", {
name: "some-workflow",
deletionProtection: false,
region: "us-central1",
sourceContents: `# This is a sample workflow, feel free to replace it with your source code
#
# This workflow does the following:
# - reads current time and date information from an external API and stores
# the response in CurrentDateTime variable
# - retrieves a list of Wikipedia articles related to the day of the week
# from CurrentDateTime
# - returns the list of articles as an output of the workflow
# FYI, In terraform you need to escape the or it will cause errors.
- getCurrentTime:
call: http.get
args:
url: \${sys.get_env(\\"url\\")}
result: CurrentDateTime
- readWikipedia:
call: http.get
args:
url: https://en.wikipedia.org/w/api.php
query:
action: opensearch
search: \${CurrentDateTime.body.dayOfTheWeek}
result: WikiResult
- returnOutput:
return: \${WikiResult.body[1]}
`,
});
const primary = new gcp.eventarc.Pipeline("primary", {
location: "us-central1",
pipelineId: "some-pipeline",
destinations: [{
workflow: workflow.id,
}],
});
import pulumi
import pulumi_gcp as gcp
workflow = gcp.workflows.Workflow("workflow",
name="some-workflow",
deletion_protection=False,
region="us-central1",
source_contents="""# This is a sample workflow, feel free to replace it with your source code
#
# This workflow does the following:
# - reads current time and date information from an external API and stores
# the response in CurrentDateTime variable
# - retrieves a list of Wikipedia articles related to the day of the week
# from CurrentDateTime
# - returns the list of articles as an output of the workflow
# FYI, In terraform you need to escape the $$ or it will cause errors.
- getCurrentTime:
call: http.get
args:
url: ${sys.get_env(\"url\")}
result: CurrentDateTime
- readWikipedia:
call: http.get
args:
url: https://en.wikipedia.org/w/api.php
query:
action: opensearch
search: ${CurrentDateTime.body.dayOfTheWeek}
result: WikiResult
- returnOutput:
return: ${WikiResult.body[1]}
""")
primary = gcp.eventarc.Pipeline("primary",
location="us-central1",
pipeline_id="some-pipeline",
destinations=[{
"workflow": workflow.id,
}])
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/eventarc"
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/workflows"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
workflow, err := workflows.NewWorkflow(ctx, "workflow", &workflows.WorkflowArgs{
Name: pulumi.String("some-workflow"),
DeletionProtection: pulumi.Bool(false),
Region: pulumi.String("us-central1"),
SourceContents: pulumi.String(`# This is a sample workflow, feel free to replace it with your source code
#
# This workflow does the following:
# - reads current time and date information from an external API and stores
# the response in CurrentDateTime variable
# - retrieves a list of Wikipedia articles related to the day of the week
# from CurrentDateTime
# - returns the list of articles as an output of the workflow
# FYI, In terraform you need to escape the $$ or it will cause errors.
- getCurrentTime:
call: http.get
args:
url: ${sys.get_env(\"url\")}
result: CurrentDateTime
- readWikipedia:
call: http.get
args:
url: https://en.wikipedia.org/w/api.php
query:
action: opensearch
search: ${CurrentDateTime.body.dayOfTheWeek}
result: WikiResult
- returnOutput:
return: ${WikiResult.body[1]}
`),
})
if err != nil {
return err
}
_, err = eventarc.NewPipeline(ctx, "primary", &eventarc.PipelineArgs{
Location: pulumi.String("us-central1"),
PipelineId: pulumi.String("some-pipeline"),
Destinations: eventarc.PipelineDestinationArray{
&eventarc.PipelineDestinationArgs{
Workflow: workflow.ID(),
},
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var workflow = new Gcp.Workflows.Workflow("workflow", new()
{
Name = "some-workflow",
DeletionProtection = false,
Region = "us-central1",
SourceContents = @"# This is a sample workflow, feel free to replace it with your source code
#
# This workflow does the following:
# - reads current time and date information from an external API and stores
# the response in CurrentDateTime variable
# - retrieves a list of Wikipedia articles related to the day of the week
# from CurrentDateTime
# - returns the list of articles as an output of the workflow
# FYI, In terraform you need to escape the $$ or it will cause errors.
- getCurrentTime:
call: http.get
args:
url: ${sys.get_env(\""url\"")}
result: CurrentDateTime
- readWikipedia:
call: http.get
args:
url: https://en.wikipedia.org/w/api.php
query:
action: opensearch
search: ${CurrentDateTime.body.dayOfTheWeek}
result: WikiResult
- returnOutput:
return: ${WikiResult.body[1]}
",
});
var primary = new Gcp.Eventarc.Pipeline("primary", new()
{
Location = "us-central1",
PipelineId = "some-pipeline",
Destinations = new[]
{
new Gcp.Eventarc.Inputs.PipelineDestinationArgs
{
Workflow = workflow.Id,
},
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.workflows.Workflow;
import com.pulumi.gcp.workflows.WorkflowArgs;
import com.pulumi.gcp.eventarc.Pipeline;
import com.pulumi.gcp.eventarc.PipelineArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var workflow = new Workflow("workflow", WorkflowArgs.builder()
.name("some-workflow")
.deletionProtection(false)
.region("us-central1")
.sourceContents("""
# This is a sample workflow, feel free to replace it with your source code
#
# This workflow does the following:
# - reads current time and date information from an external API and stores
# the response in CurrentDateTime variable
# - retrieves a list of Wikipedia articles related to the day of the week
# from CurrentDateTime
# - returns the list of articles as an output of the workflow
# FYI, In terraform you need to escape the $$ or it will cause errors.
- getCurrentTime:
call: http.get
args:
url: ${sys.get_env(\"url\")}
result: CurrentDateTime
- readWikipedia:
call: http.get
args:
url: https://en.wikipedia.org/w/api.php
query:
action: opensearch
search: ${CurrentDateTime.body.dayOfTheWeek}
result: WikiResult
- returnOutput:
return: ${WikiResult.body[1]}
""")
.build());
var primary = new Pipeline("primary", PipelineArgs.builder()
.location("us-central1")
.pipelineId("some-pipeline")
.destinations(PipelineDestinationArgs.builder()
.workflow(workflow.id())
.build())
.build());
}
}
resources:
workflow:
type: gcp:workflows:Workflow
properties:
name: some-workflow
deletionProtection: false
region: us-central1
sourceContents: |
# This is a sample workflow, feel free to replace it with your source code
#
# This workflow does the following:
# - reads current time and date information from an external API and stores
# the response in CurrentDateTime variable
# - retrieves a list of Wikipedia articles related to the day of the week
# from CurrentDateTime
# - returns the list of articles as an output of the workflow
# FYI, In terraform you need to escape the $$ or it will cause errors.
- getCurrentTime:
call: http.get
args:
url: $${sys.get_env(\"url\")}
result: CurrentDateTime
- readWikipedia:
call: http.get
args:
url: https://en.wikipedia.org/w/api.php
query:
action: opensearch
search: $${CurrentDateTime.body.dayOfTheWeek}
result: WikiResult
- returnOutput:
return: $${WikiResult.body[1]}
primary:
type: gcp:eventarc:Pipeline
properties:
location: us-central1
pipelineId: some-pipeline
destinations:
- workflow: ${workflow.id}
When you set the workflow property in destinations, the pipeline starts a new workflow execution for each incoming event. The workflow receives the event data as input and can orchestrate complex multi-step processes.
Authenticate with OIDC and transform JSON payloads
Secure HTTP endpoints often require authentication tokens. Pipelines can generate OIDC tokens automatically and transform event payloads before delivery.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const primary = new gcp.eventarc.Pipeline("primary", {
location: "us-central1",
pipelineId: "some-pipeline",
destinations: [{
httpEndpoint: {
uri: "https://10.77.0.0:80/route",
messageBindingTemplate: "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
},
networkConfig: {
networkAttachment: "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
},
authenticationConfig: {
googleOidc: {
serviceAccount: "my@service-account.com",
audience: "http://www.example.com",
},
},
outputPayloadFormat: {
json: {},
},
}],
inputPayloadFormat: {
json: {},
},
retryPolicy: {
maxRetryDelay: "50s",
maxAttempts: 2,
minRetryDelay: "40s",
},
mediations: [{
transformation: {
transformationTemplate: `{
\\"id\\": message.id,
\\"datacontenttype\\": \\"application/json\\",
\\"data\\": \\"{ \\\\\\"scrubbed\\\\\\": \\\\\\"true\\\\\\" }\\"
}
`,
},
}],
loggingConfig: {
logSeverity: "DEBUG",
},
});
import pulumi
import pulumi_gcp as gcp
primary = gcp.eventarc.Pipeline("primary",
location="us-central1",
pipeline_id="some-pipeline",
destinations=[{
"http_endpoint": {
"uri": "https://10.77.0.0:80/route",
"message_binding_template": "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
},
"network_config": {
"network_attachment": "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
},
"authentication_config": {
"google_oidc": {
"service_account": "my@service-account.com",
"audience": "http://www.example.com",
},
},
"output_payload_format": {
"json": {},
},
}],
input_payload_format={
"json": {},
},
retry_policy={
"max_retry_delay": "50s",
"max_attempts": 2,
"min_retry_delay": "40s",
},
mediations=[{
"transformation": {
"transformation_template": """{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
""",
},
}],
logging_config={
"log_severity": "DEBUG",
})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/eventarc"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := eventarc.NewPipeline(ctx, "primary", &eventarc.PipelineArgs{
Location: pulumi.String("us-central1"),
PipelineId: pulumi.String("some-pipeline"),
Destinations: eventarc.PipelineDestinationArray{
&eventarc.PipelineDestinationArgs{
HttpEndpoint: &eventarc.PipelineDestinationHttpEndpointArgs{
Uri: pulumi.String("https://10.77.0.0:80/route"),
MessageBindingTemplate: pulumi.String("{\"headers\":{\"new-header-key\": \"new-header-value\"}}"),
},
NetworkConfig: &eventarc.PipelineDestinationNetworkConfigArgs{
NetworkAttachment: pulumi.String("projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment"),
},
AuthenticationConfig: &eventarc.PipelineDestinationAuthenticationConfigArgs{
GoogleOidc: &eventarc.PipelineDestinationAuthenticationConfigGoogleOidcArgs{
ServiceAccount: pulumi.String("my@service-account.com"),
Audience: pulumi.String("http://www.example.com"),
},
},
OutputPayloadFormat: &eventarc.PipelineDestinationOutputPayloadFormatArgs{
Json: &eventarc.PipelineDestinationOutputPayloadFormatJsonArgs{},
},
},
},
InputPayloadFormat: &eventarc.PipelineInputPayloadFormatArgs{
Json: &eventarc.PipelineInputPayloadFormatJsonArgs{},
},
RetryPolicy: &eventarc.PipelineRetryPolicyArgs{
MaxRetryDelay: pulumi.String("50s"),
MaxAttempts: pulumi.Int(2),
MinRetryDelay: pulumi.String("40s"),
},
Mediations: eventarc.PipelineMediationArray{
&eventarc.PipelineMediationArgs{
Transformation: &eventarc.PipelineMediationTransformationArgs{
TransformationTemplate: pulumi.String(`{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
`),
},
},
},
LoggingConfig: &eventarc.PipelineLoggingConfigArgs{
LogSeverity: pulumi.String("DEBUG"),
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var primary = new Gcp.Eventarc.Pipeline("primary", new()
{
Location = "us-central1",
PipelineId = "some-pipeline",
Destinations = new[]
{
new Gcp.Eventarc.Inputs.PipelineDestinationArgs
{
HttpEndpoint = new Gcp.Eventarc.Inputs.PipelineDestinationHttpEndpointArgs
{
Uri = "https://10.77.0.0:80/route",
MessageBindingTemplate = "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
},
NetworkConfig = new Gcp.Eventarc.Inputs.PipelineDestinationNetworkConfigArgs
{
NetworkAttachment = "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
},
AuthenticationConfig = new Gcp.Eventarc.Inputs.PipelineDestinationAuthenticationConfigArgs
{
GoogleOidc = new Gcp.Eventarc.Inputs.PipelineDestinationAuthenticationConfigGoogleOidcArgs
{
ServiceAccount = "my@service-account.com",
Audience = "http://www.example.com",
},
},
OutputPayloadFormat = new Gcp.Eventarc.Inputs.PipelineDestinationOutputPayloadFormatArgs
{
Json = null,
},
},
},
InputPayloadFormat = new Gcp.Eventarc.Inputs.PipelineInputPayloadFormatArgs
{
Json = null,
},
RetryPolicy = new Gcp.Eventarc.Inputs.PipelineRetryPolicyArgs
{
MaxRetryDelay = "50s",
MaxAttempts = 2,
MinRetryDelay = "40s",
},
Mediations = new[]
{
new Gcp.Eventarc.Inputs.PipelineMediationArgs
{
Transformation = new Gcp.Eventarc.Inputs.PipelineMediationTransformationArgs
{
TransformationTemplate = @"{
\""id\"": message.id,
\""datacontenttype\"": \""application/json\"",
\""data\"": \""{ \\\""scrubbed\\\"": \\\""true\\\"" }\""
}
",
},
},
},
LoggingConfig = new Gcp.Eventarc.Inputs.PipelineLoggingConfigArgs
{
LogSeverity = "DEBUG",
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.eventarc.Pipeline;
import com.pulumi.gcp.eventarc.PipelineArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationHttpEndpointArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationNetworkConfigArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationAuthenticationConfigArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationAuthenticationConfigGoogleOidcArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationOutputPayloadFormatArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationOutputPayloadFormatJsonArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineInputPayloadFormatArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineInputPayloadFormatJsonArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineRetryPolicyArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineMediationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineMediationTransformationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineLoggingConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var primary = new Pipeline("primary", PipelineArgs.builder()
.location("us-central1")
.pipelineId("some-pipeline")
.destinations(PipelineDestinationArgs.builder()
.httpEndpoint(PipelineDestinationHttpEndpointArgs.builder()
.uri("https://10.77.0.0:80/route")
.messageBindingTemplate("{\"headers\":{\"new-header-key\": \"new-header-value\"}}")
.build())
.networkConfig(PipelineDestinationNetworkConfigArgs.builder()
.networkAttachment("projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment")
.build())
.authenticationConfig(PipelineDestinationAuthenticationConfigArgs.builder()
.googleOidc(PipelineDestinationAuthenticationConfigGoogleOidcArgs.builder()
.serviceAccount("my@service-account.com")
.audience("http://www.example.com")
.build())
.build())
.outputPayloadFormat(PipelineDestinationOutputPayloadFormatArgs.builder()
.json(PipelineDestinationOutputPayloadFormatJsonArgs.builder()
.build())
.build())
.build())
.inputPayloadFormat(PipelineInputPayloadFormatArgs.builder()
.json(PipelineInputPayloadFormatJsonArgs.builder()
.build())
.build())
.retryPolicy(PipelineRetryPolicyArgs.builder()
.maxRetryDelay("50s")
.maxAttempts(2)
.minRetryDelay("40s")
.build())
.mediations(PipelineMediationArgs.builder()
.transformation(PipelineMediationTransformationArgs.builder()
.transformationTemplate("""
{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
""")
.build())
.build())
.loggingConfig(PipelineLoggingConfigArgs.builder()
.logSeverity("DEBUG")
.build())
.build());
}
}
resources:
primary:
type: gcp:eventarc:Pipeline
properties:
location: us-central1
pipelineId: some-pipeline
destinations:
- httpEndpoint:
uri: https://10.77.0.0:80/route
messageBindingTemplate: '{"headers":{"new-header-key": "new-header-value"}}'
networkConfig:
networkAttachment: projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment
authenticationConfig:
googleOidc:
serviceAccount: my@service-account.com
audience: http://www.example.com
outputPayloadFormat:
json: {}
inputPayloadFormat:
json: {}
retryPolicy:
maxRetryDelay: 50s
maxAttempts: 2
minRetryDelay: 40s
mediations:
- transformation:
transformationTemplate: |
{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
loggingConfig:
logSeverity: DEBUG
The authenticationConfig property configures how the pipeline authenticates to the destination. The googleOidc block generates OpenID Connect tokens using the specified service account. The mediations property defines transformations applied to events; the transformationTemplate uses CEL (Common Expression Language) to reshape the event structure. The inputPayloadFormat and outputPayloadFormat properties control serialization; setting both to json maintains JSON throughout the pipeline.
Use OAuth tokens with Protocol Buffer schemas
Some APIs require OAuth 2.0 tokens instead of OIDC. Pipelines support both authentication methods and can convert between formats.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const primary = new gcp.eventarc.Pipeline("primary", {
location: "us-central1",
pipelineId: "some-pipeline",
destinations: [{
httpEndpoint: {
uri: "https://10.77.0.0:80/route",
messageBindingTemplate: "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
},
networkConfig: {
networkAttachment: "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
},
authenticationConfig: {
oauthToken: {
serviceAccount: "my@service-account.com",
scope: "https://www.googleapis.com/auth/cloud-platform",
},
},
outputPayloadFormat: {
protobuf: {
schemaDefinition: `syntax = \\"proto3\\";
message schema {
string name = 1;
string severity = 2;
}
`,
},
},
}],
inputPayloadFormat: {
protobuf: {
schemaDefinition: `syntax = \\"proto3\\";
message schema {
string name = 1;
string severity = 2;
}
`,
},
},
retryPolicy: {
maxRetryDelay: "50s",
maxAttempts: 2,
minRetryDelay: "40s",
},
mediations: [{
transformation: {
transformationTemplate: `{
\\"id\\": message.id,
\\"datacontenttype\\": \\"application/json\\",
\\"data\\": \\"{ \\\\\\"scrubbed\\\\\\": \\\\\\"true\\\\\\" }\\"
}
`,
},
}],
loggingConfig: {
logSeverity: "DEBUG",
},
});
import pulumi
import pulumi_gcp as gcp
primary = gcp.eventarc.Pipeline("primary",
location="us-central1",
pipeline_id="some-pipeline",
destinations=[{
"http_endpoint": {
"uri": "https://10.77.0.0:80/route",
"message_binding_template": "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
},
"network_config": {
"network_attachment": "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
},
"authentication_config": {
"oauth_token": {
"service_account": "my@service-account.com",
"scope": "https://www.googleapis.com/auth/cloud-platform",
},
},
"output_payload_format": {
"protobuf": {
"schema_definition": """syntax = \"proto3\";
message schema {
string name = 1;
string severity = 2;
}
""",
},
},
}],
input_payload_format={
"protobuf": {
"schema_definition": """syntax = \"proto3\";
message schema {
string name = 1;
string severity = 2;
}
""",
},
},
retry_policy={
"max_retry_delay": "50s",
"max_attempts": 2,
"min_retry_delay": "40s",
},
mediations=[{
"transformation": {
"transformation_template": """{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
""",
},
}],
logging_config={
"log_severity": "DEBUG",
})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/eventarc"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := eventarc.NewPipeline(ctx, "primary", &eventarc.PipelineArgs{
Location: pulumi.String("us-central1"),
PipelineId: pulumi.String("some-pipeline"),
Destinations: eventarc.PipelineDestinationArray{
&eventarc.PipelineDestinationArgs{
HttpEndpoint: &eventarc.PipelineDestinationHttpEndpointArgs{
Uri: pulumi.String("https://10.77.0.0:80/route"),
MessageBindingTemplate: pulumi.String("{\"headers\":{\"new-header-key\": \"new-header-value\"}}"),
},
NetworkConfig: &eventarc.PipelineDestinationNetworkConfigArgs{
NetworkAttachment: pulumi.String("projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment"),
},
AuthenticationConfig: &eventarc.PipelineDestinationAuthenticationConfigArgs{
OauthToken: &eventarc.PipelineDestinationAuthenticationConfigOauthTokenArgs{
ServiceAccount: pulumi.String("my@service-account.com"),
Scope: pulumi.String("https://www.googleapis.com/auth/cloud-platform"),
},
},
OutputPayloadFormat: &eventarc.PipelineDestinationOutputPayloadFormatArgs{
Protobuf: &eventarc.PipelineDestinationOutputPayloadFormatProtobufArgs{
SchemaDefinition: pulumi.String(`syntax = \"proto3\";
message schema {
string name = 1;
string severity = 2;
}
`),
},
},
},
},
InputPayloadFormat: &eventarc.PipelineInputPayloadFormatArgs{
Protobuf: &eventarc.PipelineInputPayloadFormatProtobufArgs{
SchemaDefinition: pulumi.String(`syntax = \"proto3\";
message schema {
string name = 1;
string severity = 2;
}
`),
},
},
RetryPolicy: &eventarc.PipelineRetryPolicyArgs{
MaxRetryDelay: pulumi.String("50s"),
MaxAttempts: pulumi.Int(2),
MinRetryDelay: pulumi.String("40s"),
},
Mediations: eventarc.PipelineMediationArray{
&eventarc.PipelineMediationArgs{
Transformation: &eventarc.PipelineMediationTransformationArgs{
TransformationTemplate: pulumi.String(`{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
`),
},
},
},
LoggingConfig: &eventarc.PipelineLoggingConfigArgs{
LogSeverity: pulumi.String("DEBUG"),
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var primary = new Gcp.Eventarc.Pipeline("primary", new()
{
Location = "us-central1",
PipelineId = "some-pipeline",
Destinations = new[]
{
new Gcp.Eventarc.Inputs.PipelineDestinationArgs
{
HttpEndpoint = new Gcp.Eventarc.Inputs.PipelineDestinationHttpEndpointArgs
{
Uri = "https://10.77.0.0:80/route",
MessageBindingTemplate = "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
},
NetworkConfig = new Gcp.Eventarc.Inputs.PipelineDestinationNetworkConfigArgs
{
NetworkAttachment = "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
},
AuthenticationConfig = new Gcp.Eventarc.Inputs.PipelineDestinationAuthenticationConfigArgs
{
OauthToken = new Gcp.Eventarc.Inputs.PipelineDestinationAuthenticationConfigOauthTokenArgs
{
ServiceAccount = "my@service-account.com",
Scope = "https://www.googleapis.com/auth/cloud-platform",
},
},
OutputPayloadFormat = new Gcp.Eventarc.Inputs.PipelineDestinationOutputPayloadFormatArgs
{
Protobuf = new Gcp.Eventarc.Inputs.PipelineDestinationOutputPayloadFormatProtobufArgs
{
SchemaDefinition = @"syntax = \""proto3\"";
message schema {
string name = 1;
string severity = 2;
}
",
},
},
},
},
InputPayloadFormat = new Gcp.Eventarc.Inputs.PipelineInputPayloadFormatArgs
{
Protobuf = new Gcp.Eventarc.Inputs.PipelineInputPayloadFormatProtobufArgs
{
SchemaDefinition = @"syntax = \""proto3\"";
message schema {
string name = 1;
string severity = 2;
}
",
},
},
RetryPolicy = new Gcp.Eventarc.Inputs.PipelineRetryPolicyArgs
{
MaxRetryDelay = "50s",
MaxAttempts = 2,
MinRetryDelay = "40s",
},
Mediations = new[]
{
new Gcp.Eventarc.Inputs.PipelineMediationArgs
{
Transformation = new Gcp.Eventarc.Inputs.PipelineMediationTransformationArgs
{
TransformationTemplate = @"{
\""id\"": message.id,
\""datacontenttype\"": \""application/json\"",
\""data\"": \""{ \\\""scrubbed\\\"": \\\""true\\\"" }\""
}
",
},
},
},
LoggingConfig = new Gcp.Eventarc.Inputs.PipelineLoggingConfigArgs
{
LogSeverity = "DEBUG",
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.eventarc.Pipeline;
import com.pulumi.gcp.eventarc.PipelineArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationHttpEndpointArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationNetworkConfigArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationAuthenticationConfigArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationAuthenticationConfigOauthTokenArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationOutputPayloadFormatArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationOutputPayloadFormatProtobufArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineInputPayloadFormatArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineInputPayloadFormatProtobufArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineRetryPolicyArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineMediationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineMediationTransformationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineLoggingConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var primary = new Pipeline("primary", PipelineArgs.builder()
.location("us-central1")
.pipelineId("some-pipeline")
.destinations(PipelineDestinationArgs.builder()
.httpEndpoint(PipelineDestinationHttpEndpointArgs.builder()
.uri("https://10.77.0.0:80/route")
.messageBindingTemplate("{\"headers\":{\"new-header-key\": \"new-header-value\"}}")
.build())
.networkConfig(PipelineDestinationNetworkConfigArgs.builder()
.networkAttachment("projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment")
.build())
.authenticationConfig(PipelineDestinationAuthenticationConfigArgs.builder()
.oauthToken(PipelineDestinationAuthenticationConfigOauthTokenArgs.builder()
.serviceAccount("my@service-account.com")
.scope("https://www.googleapis.com/auth/cloud-platform")
.build())
.build())
.outputPayloadFormat(PipelineDestinationOutputPayloadFormatArgs.builder()
.protobuf(PipelineDestinationOutputPayloadFormatProtobufArgs.builder()
.schemaDefinition("""
syntax = \"proto3\";
message schema {
string name = 1;
string severity = 2;
}
""")
.build())
.build())
.build())
.inputPayloadFormat(PipelineInputPayloadFormatArgs.builder()
.protobuf(PipelineInputPayloadFormatProtobufArgs.builder()
.schemaDefinition("""
syntax = \"proto3\";
message schema {
string name = 1;
string severity = 2;
}
""")
.build())
.build())
.retryPolicy(PipelineRetryPolicyArgs.builder()
.maxRetryDelay("50s")
.maxAttempts(2)
.minRetryDelay("40s")
.build())
.mediations(PipelineMediationArgs.builder()
.transformation(PipelineMediationTransformationArgs.builder()
.transformationTemplate("""
{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
""")
.build())
.build())
.loggingConfig(PipelineLoggingConfigArgs.builder()
.logSeverity("DEBUG")
.build())
.build());
}
}
resources:
primary:
type: gcp:eventarc:Pipeline
properties:
location: us-central1
pipelineId: some-pipeline
destinations:
- httpEndpoint:
uri: https://10.77.0.0:80/route
messageBindingTemplate: '{"headers":{"new-header-key": "new-header-value"}}'
networkConfig:
networkAttachment: projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment
authenticationConfig:
oauthToken:
serviceAccount: my@service-account.com
scope: https://www.googleapis.com/auth/cloud-platform
outputPayloadFormat:
protobuf:
schemaDefinition: |
syntax = \"proto3\";
message schema {
string name = 1;
string severity = 2;
}
inputPayloadFormat:
protobuf:
schemaDefinition: |
syntax = \"proto3\";
message schema {
string name = 1;
string severity = 2;
}
retryPolicy:
maxRetryDelay: 50s
maxAttempts: 2
minRetryDelay: 40s
mediations:
- transformation:
transformationTemplate: |
{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
loggingConfig:
logSeverity: DEBUG
The oauthToken block generates OAuth 2.0 access tokens with the specified scope. The protobuf format requires a schemaDefinition that defines the message structure. When you set inputPayloadFormat and outputPayloadFormat to protobuf, the pipeline deserializes incoming events and serializes outgoing events according to your schema.
Encrypt events with customer-managed keys
Compliance requirements often mandate customer-managed encryption keys. Pipelines can encrypt event data using Cloud KMS keys.
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
const primary = new gcp.eventarc.Pipeline("primary", {
location: "us-central1",
pipelineId: "some-pipeline",
cryptoKeyName: "some-key",
destinations: [{
httpEndpoint: {
uri: "https://10.77.0.0:80/route",
messageBindingTemplate: "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
},
networkConfig: {
networkAttachment: "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
},
outputPayloadFormat: {
avro: {
schemaDefinition: "{\"type\": \"record\", \"name\": \"my_record\", \"fields\": [{\"name\": \"my_field\", \"type\": \"string\"}]}",
},
},
}],
inputPayloadFormat: {
avro: {
schemaDefinition: "{\"type\": \"record\", \"name\": \"my_record\", \"fields\": [{\"name\": \"my_field\", \"type\": \"string\"}]}",
},
},
retryPolicy: {
maxRetryDelay: "50s",
maxAttempts: 2,
minRetryDelay: "40s",
},
mediations: [{
transformation: {
transformationTemplate: `{
\\"id\\": message.id,
\\"datacontenttype\\": \\"application/json\\",
\\"data\\": \\"{ \\\\\\"scrubbed\\\\\\": \\\\\\"true\\\\\\" }\\"
}
`,
},
}],
loggingConfig: {
logSeverity: "DEBUG",
},
});
import pulumi
import pulumi_gcp as gcp
primary = gcp.eventarc.Pipeline("primary",
location="us-central1",
pipeline_id="some-pipeline",
crypto_key_name="some-key",
destinations=[{
"http_endpoint": {
"uri": "https://10.77.0.0:80/route",
"message_binding_template": "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
},
"network_config": {
"network_attachment": "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
},
"output_payload_format": {
"avro": {
"schema_definition": "{\"type\": \"record\", \"name\": \"my_record\", \"fields\": [{\"name\": \"my_field\", \"type\": \"string\"}]}",
},
},
}],
input_payload_format={
"avro": {
"schema_definition": "{\"type\": \"record\", \"name\": \"my_record\", \"fields\": [{\"name\": \"my_field\", \"type\": \"string\"}]}",
},
},
retry_policy={
"max_retry_delay": "50s",
"max_attempts": 2,
"min_retry_delay": "40s",
},
mediations=[{
"transformation": {
"transformation_template": """{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
""",
},
}],
logging_config={
"log_severity": "DEBUG",
})
package main
import (
"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/eventarc"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
func main() {
pulumi.Run(func(ctx *pulumi.Context) error {
_, err := eventarc.NewPipeline(ctx, "primary", &eventarc.PipelineArgs{
Location: pulumi.String("us-central1"),
PipelineId: pulumi.String("some-pipeline"),
CryptoKeyName: pulumi.String("some-key"),
Destinations: eventarc.PipelineDestinationArray{
&eventarc.PipelineDestinationArgs{
HttpEndpoint: &eventarc.PipelineDestinationHttpEndpointArgs{
Uri: pulumi.String("https://10.77.0.0:80/route"),
MessageBindingTemplate: pulumi.String("{\"headers\":{\"new-header-key\": \"new-header-value\"}}"),
},
NetworkConfig: &eventarc.PipelineDestinationNetworkConfigArgs{
NetworkAttachment: pulumi.String("projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment"),
},
OutputPayloadFormat: &eventarc.PipelineDestinationOutputPayloadFormatArgs{
Avro: &eventarc.PipelineDestinationOutputPayloadFormatAvroArgs{
SchemaDefinition: pulumi.String("{\"type\": \"record\", \"name\": \"my_record\", \"fields\": [{\"name\": \"my_field\", \"type\": \"string\"}]}"),
},
},
},
},
InputPayloadFormat: &eventarc.PipelineInputPayloadFormatArgs{
Avro: &eventarc.PipelineInputPayloadFormatAvroArgs{
SchemaDefinition: pulumi.String("{\"type\": \"record\", \"name\": \"my_record\", \"fields\": [{\"name\": \"my_field\", \"type\": \"string\"}]}"),
},
},
RetryPolicy: &eventarc.PipelineRetryPolicyArgs{
MaxRetryDelay: pulumi.String("50s"),
MaxAttempts: pulumi.Int(2),
MinRetryDelay: pulumi.String("40s"),
},
Mediations: eventarc.PipelineMediationArray{
&eventarc.PipelineMediationArgs{
Transformation: &eventarc.PipelineMediationTransformationArgs{
TransformationTemplate: pulumi.String(`{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
`),
},
},
},
LoggingConfig: &eventarc.PipelineLoggingConfigArgs{
LogSeverity: pulumi.String("DEBUG"),
},
})
if err != nil {
return err
}
return nil
})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;
return await Deployment.RunAsync(() =>
{
var primary = new Gcp.Eventarc.Pipeline("primary", new()
{
Location = "us-central1",
PipelineId = "some-pipeline",
CryptoKeyName = "some-key",
Destinations = new[]
{
new Gcp.Eventarc.Inputs.PipelineDestinationArgs
{
HttpEndpoint = new Gcp.Eventarc.Inputs.PipelineDestinationHttpEndpointArgs
{
Uri = "https://10.77.0.0:80/route",
MessageBindingTemplate = "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
},
NetworkConfig = new Gcp.Eventarc.Inputs.PipelineDestinationNetworkConfigArgs
{
NetworkAttachment = "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
},
OutputPayloadFormat = new Gcp.Eventarc.Inputs.PipelineDestinationOutputPayloadFormatArgs
{
Avro = new Gcp.Eventarc.Inputs.PipelineDestinationOutputPayloadFormatAvroArgs
{
SchemaDefinition = "{\"type\": \"record\", \"name\": \"my_record\", \"fields\": [{\"name\": \"my_field\", \"type\": \"string\"}]}",
},
},
},
},
InputPayloadFormat = new Gcp.Eventarc.Inputs.PipelineInputPayloadFormatArgs
{
Avro = new Gcp.Eventarc.Inputs.PipelineInputPayloadFormatAvroArgs
{
SchemaDefinition = "{\"type\": \"record\", \"name\": \"my_record\", \"fields\": [{\"name\": \"my_field\", \"type\": \"string\"}]}",
},
},
RetryPolicy = new Gcp.Eventarc.Inputs.PipelineRetryPolicyArgs
{
MaxRetryDelay = "50s",
MaxAttempts = 2,
MinRetryDelay = "40s",
},
Mediations = new[]
{
new Gcp.Eventarc.Inputs.PipelineMediationArgs
{
Transformation = new Gcp.Eventarc.Inputs.PipelineMediationTransformationArgs
{
TransformationTemplate = @"{
\""id\"": message.id,
\""datacontenttype\"": \""application/json\"",
\""data\"": \""{ \\\""scrubbed\\\"": \\\""true\\\"" }\""
}
",
},
},
},
LoggingConfig = new Gcp.Eventarc.Inputs.PipelineLoggingConfigArgs
{
LogSeverity = "DEBUG",
},
});
});
package generated_program;
import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.eventarc.Pipeline;
import com.pulumi.gcp.eventarc.PipelineArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationHttpEndpointArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationNetworkConfigArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationOutputPayloadFormatArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationOutputPayloadFormatAvroArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineInputPayloadFormatArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineInputPayloadFormatAvroArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineRetryPolicyArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineMediationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineMediationTransformationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineLoggingConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
public class App {
public static void main(String[] args) {
Pulumi.run(App::stack);
}
public static void stack(Context ctx) {
var primary = new Pipeline("primary", PipelineArgs.builder()
.location("us-central1")
.pipelineId("some-pipeline")
.cryptoKeyName("some-key")
.destinations(PipelineDestinationArgs.builder()
.httpEndpoint(PipelineDestinationHttpEndpointArgs.builder()
.uri("https://10.77.0.0:80/route")
.messageBindingTemplate("{\"headers\":{\"new-header-key\": \"new-header-value\"}}")
.build())
.networkConfig(PipelineDestinationNetworkConfigArgs.builder()
.networkAttachment("projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment")
.build())
.outputPayloadFormat(PipelineDestinationOutputPayloadFormatArgs.builder()
.avro(PipelineDestinationOutputPayloadFormatAvroArgs.builder()
.schemaDefinition("{\"type\": \"record\", \"name\": \"my_record\", \"fields\": [{\"name\": \"my_field\", \"type\": \"string\"}]}")
.build())
.build())
.build())
.inputPayloadFormat(PipelineInputPayloadFormatArgs.builder()
.avro(PipelineInputPayloadFormatAvroArgs.builder()
.schemaDefinition("{\"type\": \"record\", \"name\": \"my_record\", \"fields\": [{\"name\": \"my_field\", \"type\": \"string\"}]}")
.build())
.build())
.retryPolicy(PipelineRetryPolicyArgs.builder()
.maxRetryDelay("50s")
.maxAttempts(2)
.minRetryDelay("40s")
.build())
.mediations(PipelineMediationArgs.builder()
.transformation(PipelineMediationTransformationArgs.builder()
.transformationTemplate("""
{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
""")
.build())
.build())
.loggingConfig(PipelineLoggingConfigArgs.builder()
.logSeverity("DEBUG")
.build())
.build());
}
}
resources:
primary:
type: gcp:eventarc:Pipeline
properties:
location: us-central1
pipelineId: some-pipeline
cryptoKeyName: some-key
destinations:
- httpEndpoint:
uri: https://10.77.0.0:80/route
messageBindingTemplate: '{"headers":{"new-header-key": "new-header-value"}}'
networkConfig:
networkAttachment: projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment
outputPayloadFormat:
avro:
schemaDefinition: '{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}'
inputPayloadFormat:
avro:
schemaDefinition: '{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}'
retryPolicy:
maxRetryDelay: 50s
maxAttempts: 2
minRetryDelay: 40s
mediations:
- transformation:
transformationTemplate: |
{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
loggingConfig:
logSeverity: DEBUG
The cryptoKeyName property specifies a KMS crypto key for encrypting event data at rest. The avro format uses a JSON schema definition to describe the record structure. This configuration ensures events are encrypted with your own keys while maintaining structured data serialization.
Beyond these examples
These snippets focus on specific pipeline features: destination types (Pub/Sub, HTTP, Workflows), authentication (OIDC, OAuth), and payload formats and transformations. They’re intentionally minimal rather than full event processing systems.
The examples may reference pre-existing infrastructure such as Pub/Sub topics, Workflows resources, network attachments for private connectivity, service accounts with appropriate IAM permissions, and KMS crypto keys for encryption. They focus on configuring the pipeline rather than provisioning everything around it.
To keep things focused, common pipeline patterns are omitted, including:
- Retry policy tuning (maxAttempts, minRetryDelay, maxRetryDelay)
- Logging configuration (logSeverity)
- Message binding templates for header manipulation
- Labels and annotations for resource organization
These omissions are intentional: the goal is to illustrate how each pipeline feature is wired, not provide drop-in event processing modules. See the Eventarc Pipeline resource reference for all available configuration options.
Let's create GCP Eventarc Pipelines
Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.
Try Pulumi Cloud for FREEFrequently Asked Questions
Destinations & Routing
destinations array.topic, HTTP httpEndpoint (requires networkAttachment), or Workflows workflow.authenticationConfig with either googleOidc (service account and audience) or oauthToken (service account and scope).Retry & Error Handling
minRetryDelay and maxRetryDelay to the same value in retryPolicy. This makes the duration between retries constant.Payload Formats & Transformation
inputPayloadFormat and outputPayloadFormat.schemaDefinition.mediations array.Security & Encryption
cryptoKeyName with a KMS crypto key. If not set, an internal Google-owned key is used to encrypt messages.Resource Configuration
location, pipelineId, and project properties are immutable. Changing them will force resource recreation.labels and annotations fields are non-authoritative and only manage values in your configuration. Use effectiveLabels and effectiveAnnotations to see all values present on the resource.Using a different cloud?
Explore messaging guides for other cloud providers: