Create GCP Eventarc Pipelines

The gcp:eventarc/pipeline:Pipeline resource, part of the Pulumi GCP provider, defines an Eventarc pipeline that routes events to destinations with optional transformation, authentication, and format conversion. This guide focuses on three capabilities: routing to Pub/Sub, HTTP endpoints, and Workflows; authentication with OIDC and OAuth tokens; and payload transformation and format conversion.

Pipelines reference existing infrastructure like Pub/Sub topics, Workflows, network attachments, service accounts, and KMS keys. The examples are intentionally small. Combine them with your own event sources, authentication configuration, and destination infrastructure.

Route events to a Pub/Sub topic

Event-driven architectures often begin by routing events to Pub/Sub topics, where multiple subscribers can process messages independently.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const topic = new gcp.pubsub.Topic("topic", {name: "some-topic"});
const primary = new gcp.eventarc.Pipeline("primary", {
    location: "us-central1",
    pipelineId: "some-pipeline",
    destinations: [{
        topic: topic.id,
    }],
    labels: {
        test_label: "test-eventarc-label",
    },
    annotations: {
        test_annotation: "test-eventarc-annotation",
    },
    displayName: "Testing Pipeline",
});
import pulumi
import pulumi_gcp as gcp

topic = gcp.pubsub.Topic("topic", name="some-topic")
primary = gcp.eventarc.Pipeline("primary",
    location="us-central1",
    pipeline_id="some-pipeline",
    destinations=[{
        "topic": topic.id,
    }],
    labels={
        "test_label": "test-eventarc-label",
    },
    annotations={
        "test_annotation": "test-eventarc-annotation",
    },
    display_name="Testing Pipeline")
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/eventarc"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/pubsub"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		topic, err := pubsub.NewTopic(ctx, "topic", &pubsub.TopicArgs{
			Name: pulumi.String("some-topic"),
		})
		if err != nil {
			return err
		}
		_, err = eventarc.NewPipeline(ctx, "primary", &eventarc.PipelineArgs{
			Location:   pulumi.String("us-central1"),
			PipelineId: pulumi.String("some-pipeline"),
			Destinations: eventarc.PipelineDestinationArray{
				&eventarc.PipelineDestinationArgs{
					Topic: topic.ID(),
				},
			},
			Labels: pulumi.StringMap{
				"test_label": pulumi.String("test-eventarc-label"),
			},
			Annotations: pulumi.StringMap{
				"test_annotation": pulumi.String("test-eventarc-annotation"),
			},
			DisplayName: pulumi.String("Testing Pipeline"),
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var topic = new Gcp.PubSub.Topic("topic", new()
    {
        Name = "some-topic",
    });

    var primary = new Gcp.Eventarc.Pipeline("primary", new()
    {
        Location = "us-central1",
        PipelineId = "some-pipeline",
        Destinations = new[]
        {
            new Gcp.Eventarc.Inputs.PipelineDestinationArgs
            {
                Topic = topic.Id,
            },
        },
        Labels = 
        {
            { "test_label", "test-eventarc-label" },
        },
        Annotations = 
        {
            { "test_annotation", "test-eventarc-annotation" },
        },
        DisplayName = "Testing Pipeline",
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.pubsub.Topic;
import com.pulumi.gcp.pubsub.TopicArgs;
import com.pulumi.gcp.eventarc.Pipeline;
import com.pulumi.gcp.eventarc.PipelineArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var topic = new Topic("topic", TopicArgs.builder()
            .name("some-topic")
            .build());

        var primary = new Pipeline("primary", PipelineArgs.builder()
            .location("us-central1")
            .pipelineId("some-pipeline")
            .destinations(PipelineDestinationArgs.builder()
                .topic(topic.id())
                .build())
            .labels(Map.of("test_label", "test-eventarc-label"))
            .annotations(Map.of("test_annotation", "test-eventarc-annotation"))
            .displayName("Testing Pipeline")
            .build());

    }
}
resources:
  topic:
    type: gcp:pubsub:Topic
    properties:
      name: some-topic
  primary:
    type: gcp:eventarc:Pipeline
    properties:
      location: us-central1
      pipelineId: some-pipeline
      destinations:
        - topic: ${topic.id}
      labels:
        test_label: test-eventarc-label
      annotations:
        test_annotation: test-eventarc-annotation
      displayName: Testing Pipeline

The destinations property defines where events are sent. When you specify a topic, the pipeline forwards all incoming events to that Pub/Sub topic. The location and pipelineId properties identify the pipeline within your GCP project.

Deliver events to an HTTP endpoint

Applications that expose HTTP APIs can receive events directly through pipeline delivery.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const primary = new gcp.eventarc.Pipeline("primary", {
    location: "us-central1",
    pipelineId: "some-pipeline",
    destinations: [{
        httpEndpoint: {
            uri: "https://10.77.0.0:80/route",
        },
        networkConfig: {
            networkAttachment: "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
        },
    }],
});
import pulumi
import pulumi_gcp as gcp

primary = gcp.eventarc.Pipeline("primary",
    location="us-central1",
    pipeline_id="some-pipeline",
    destinations=[{
        "http_endpoint": {
            "uri": "https://10.77.0.0:80/route",
        },
        "network_config": {
            "network_attachment": "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
        },
    }])
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/eventarc"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := eventarc.NewPipeline(ctx, "primary", &eventarc.PipelineArgs{
			Location:   pulumi.String("us-central1"),
			PipelineId: pulumi.String("some-pipeline"),
			Destinations: eventarc.PipelineDestinationArray{
				&eventarc.PipelineDestinationArgs{
					HttpEndpoint: &eventarc.PipelineDestinationHttpEndpointArgs{
						Uri: pulumi.String("https://10.77.0.0:80/route"),
					},
					NetworkConfig: &eventarc.PipelineDestinationNetworkConfigArgs{
						NetworkAttachment: pulumi.String("projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment"),
					},
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var primary = new Gcp.Eventarc.Pipeline("primary", new()
    {
        Location = "us-central1",
        PipelineId = "some-pipeline",
        Destinations = new[]
        {
            new Gcp.Eventarc.Inputs.PipelineDestinationArgs
            {
                HttpEndpoint = new Gcp.Eventarc.Inputs.PipelineDestinationHttpEndpointArgs
                {
                    Uri = "https://10.77.0.0:80/route",
                },
                NetworkConfig = new Gcp.Eventarc.Inputs.PipelineDestinationNetworkConfigArgs
                {
                    NetworkAttachment = "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
                },
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.eventarc.Pipeline;
import com.pulumi.gcp.eventarc.PipelineArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationHttpEndpointArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationNetworkConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var primary = new Pipeline("primary", PipelineArgs.builder()
            .location("us-central1")
            .pipelineId("some-pipeline")
            .destinations(PipelineDestinationArgs.builder()
                .httpEndpoint(PipelineDestinationHttpEndpointArgs.builder()
                    .uri("https://10.77.0.0:80/route")
                    .build())
                .networkConfig(PipelineDestinationNetworkConfigArgs.builder()
                    .networkAttachment("projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment")
                    .build())
                .build())
            .build());

    }
}
resources:
  primary:
    type: gcp:eventarc:Pipeline
    properties:
      location: us-central1
      pipelineId: some-pipeline
      destinations:
        - httpEndpoint:
            uri: https://10.77.0.0:80/route
          networkConfig:
            networkAttachment: projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment

The httpEndpoint property sets the target URI. The networkConfig property enables private connectivity through a network attachment, allowing you to reach endpoints that aren’t publicly accessible. This configuration routes events to your HTTP service without exposing it to the internet.

Trigger Workflows from events

Workflows orchestrate multi-step processes across Google Cloud services. Pipelines can invoke workflows in response to events.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const workflow = new gcp.workflows.Workflow("workflow", {
    name: "some-workflow",
    deletionProtection: false,
    region: "us-central1",
    sourceContents: `# This is a sample workflow, feel free to replace it with your source code
#
# This workflow does the following:
# - reads current time and date information from an external API and stores
#   the response in CurrentDateTime variable
# - retrieves a list of Wikipedia articles related to the day of the week
#   from CurrentDateTime
# - returns the list of articles as an output of the workflow
# FYI, In terraform you need to escape the  or it will cause errors.

- getCurrentTime:
    call: http.get
    args:
        url: \${sys.get_env(\\"url\\")}
    result: CurrentDateTime
- readWikipedia:
    call: http.get
    args:
        url: https://en.wikipedia.org/w/api.php
        query:
            action: opensearch
            search: \${CurrentDateTime.body.dayOfTheWeek}
    result: WikiResult
- returnOutput:
    return: \${WikiResult.body[1]}
`,
});
const primary = new gcp.eventarc.Pipeline("primary", {
    location: "us-central1",
    pipelineId: "some-pipeline",
    destinations: [{
        workflow: workflow.id,
    }],
});
import pulumi
import pulumi_gcp as gcp

workflow = gcp.workflows.Workflow("workflow",
    name="some-workflow",
    deletion_protection=False,
    region="us-central1",
    source_contents="""# This is a sample workflow, feel free to replace it with your source code
#
# This workflow does the following:
# - reads current time and date information from an external API and stores
#   the response in CurrentDateTime variable
# - retrieves a list of Wikipedia articles related to the day of the week
#   from CurrentDateTime
# - returns the list of articles as an output of the workflow
# FYI, In terraform you need to escape the $$ or it will cause errors.

- getCurrentTime:
    call: http.get
    args:
        url: ${sys.get_env(\"url\")}
    result: CurrentDateTime
- readWikipedia:
    call: http.get
    args:
        url: https://en.wikipedia.org/w/api.php
        query:
            action: opensearch
            search: ${CurrentDateTime.body.dayOfTheWeek}
    result: WikiResult
- returnOutput:
    return: ${WikiResult.body[1]}
""")
primary = gcp.eventarc.Pipeline("primary",
    location="us-central1",
    pipeline_id="some-pipeline",
    destinations=[{
        "workflow": workflow.id,
    }])
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/eventarc"
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/workflows"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		workflow, err := workflows.NewWorkflow(ctx, "workflow", &workflows.WorkflowArgs{
			Name:               pulumi.String("some-workflow"),
			DeletionProtection: pulumi.Bool(false),
			Region:             pulumi.String("us-central1"),
			SourceContents: pulumi.String(`# This is a sample workflow, feel free to replace it with your source code
#
# This workflow does the following:
# - reads current time and date information from an external API and stores
#   the response in CurrentDateTime variable
# - retrieves a list of Wikipedia articles related to the day of the week
#   from CurrentDateTime
# - returns the list of articles as an output of the workflow
# FYI, In terraform you need to escape the $$ or it will cause errors.

- getCurrentTime:
    call: http.get
    args:
        url: ${sys.get_env(\"url\")}
    result: CurrentDateTime
- readWikipedia:
    call: http.get
    args:
        url: https://en.wikipedia.org/w/api.php
        query:
            action: opensearch
            search: ${CurrentDateTime.body.dayOfTheWeek}
    result: WikiResult
- returnOutput:
    return: ${WikiResult.body[1]}
`),
		})
		if err != nil {
			return err
		}
		_, err = eventarc.NewPipeline(ctx, "primary", &eventarc.PipelineArgs{
			Location:   pulumi.String("us-central1"),
			PipelineId: pulumi.String("some-pipeline"),
			Destinations: eventarc.PipelineDestinationArray{
				&eventarc.PipelineDestinationArgs{
					Workflow: workflow.ID(),
				},
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var workflow = new Gcp.Workflows.Workflow("workflow", new()
    {
        Name = "some-workflow",
        DeletionProtection = false,
        Region = "us-central1",
        SourceContents = @"# This is a sample workflow, feel free to replace it with your source code
#
# This workflow does the following:
# - reads current time and date information from an external API and stores
#   the response in CurrentDateTime variable
# - retrieves a list of Wikipedia articles related to the day of the week
#   from CurrentDateTime
# - returns the list of articles as an output of the workflow
# FYI, In terraform you need to escape the $$ or it will cause errors.

- getCurrentTime:
    call: http.get
    args:
        url: ${sys.get_env(\""url\"")}
    result: CurrentDateTime
- readWikipedia:
    call: http.get
    args:
        url: https://en.wikipedia.org/w/api.php
        query:
            action: opensearch
            search: ${CurrentDateTime.body.dayOfTheWeek}
    result: WikiResult
- returnOutput:
    return: ${WikiResult.body[1]}
",
    });

    var primary = new Gcp.Eventarc.Pipeline("primary", new()
    {
        Location = "us-central1",
        PipelineId = "some-pipeline",
        Destinations = new[]
        {
            new Gcp.Eventarc.Inputs.PipelineDestinationArgs
            {
                Workflow = workflow.Id,
            },
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.workflows.Workflow;
import com.pulumi.gcp.workflows.WorkflowArgs;
import com.pulumi.gcp.eventarc.Pipeline;
import com.pulumi.gcp.eventarc.PipelineArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var workflow = new Workflow("workflow", WorkflowArgs.builder()
            .name("some-workflow")
            .deletionProtection(false)
            .region("us-central1")
            .sourceContents("""
# This is a sample workflow, feel free to replace it with your source code
#
# This workflow does the following:
# - reads current time and date information from an external API and stores
#   the response in CurrentDateTime variable
# - retrieves a list of Wikipedia articles related to the day of the week
#   from CurrentDateTime
# - returns the list of articles as an output of the workflow
# FYI, In terraform you need to escape the $$ or it will cause errors.

- getCurrentTime:
    call: http.get
    args:
        url: ${sys.get_env(\"url\")}
    result: CurrentDateTime
- readWikipedia:
    call: http.get
    args:
        url: https://en.wikipedia.org/w/api.php
        query:
            action: opensearch
            search: ${CurrentDateTime.body.dayOfTheWeek}
    result: WikiResult
- returnOutput:
    return: ${WikiResult.body[1]}
            """)
            .build());

        var primary = new Pipeline("primary", PipelineArgs.builder()
            .location("us-central1")
            .pipelineId("some-pipeline")
            .destinations(PipelineDestinationArgs.builder()
                .workflow(workflow.id())
                .build())
            .build());

    }
}
resources:
  workflow:
    type: gcp:workflows:Workflow
    properties:
      name: some-workflow
      deletionProtection: false
      region: us-central1
      sourceContents: |
        # This is a sample workflow, feel free to replace it with your source code
        #
        # This workflow does the following:
        # - reads current time and date information from an external API and stores
        #   the response in CurrentDateTime variable
        # - retrieves a list of Wikipedia articles related to the day of the week
        #   from CurrentDateTime
        # - returns the list of articles as an output of the workflow
        # FYI, In terraform you need to escape the $$ or it will cause errors.

        - getCurrentTime:
            call: http.get
            args:
                url: $${sys.get_env(\"url\")}
            result: CurrentDateTime
        - readWikipedia:
            call: http.get
            args:
                url: https://en.wikipedia.org/w/api.php
                query:
                    action: opensearch
                    search: $${CurrentDateTime.body.dayOfTheWeek}
            result: WikiResult
        - returnOutput:
            return: $${WikiResult.body[1]}        
  primary:
    type: gcp:eventarc:Pipeline
    properties:
      location: us-central1
      pipelineId: some-pipeline
      destinations:
        - workflow: ${workflow.id}

When you set the workflow property in destinations, the pipeline starts a new workflow execution for each incoming event. The workflow receives the event data as input and can orchestrate complex multi-step processes.

Authenticate with OIDC and transform JSON payloads

Secure HTTP endpoints often require authentication tokens. Pipelines can generate OIDC tokens automatically and transform event payloads before delivery.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const primary = new gcp.eventarc.Pipeline("primary", {
    location: "us-central1",
    pipelineId: "some-pipeline",
    destinations: [{
        httpEndpoint: {
            uri: "https://10.77.0.0:80/route",
            messageBindingTemplate: "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
        },
        networkConfig: {
            networkAttachment: "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
        },
        authenticationConfig: {
            googleOidc: {
                serviceAccount: "my@service-account.com",
                audience: "http://www.example.com",
            },
        },
        outputPayloadFormat: {
            json: {},
        },
    }],
    inputPayloadFormat: {
        json: {},
    },
    retryPolicy: {
        maxRetryDelay: "50s",
        maxAttempts: 2,
        minRetryDelay: "40s",
    },
    mediations: [{
        transformation: {
            transformationTemplate: `{
\\"id\\": message.id,
\\"datacontenttype\\": \\"application/json\\",
\\"data\\": \\"{ \\\\\\"scrubbed\\\\\\": \\\\\\"true\\\\\\" }\\"
}
`,
        },
    }],
    loggingConfig: {
        logSeverity: "DEBUG",
    },
});
import pulumi
import pulumi_gcp as gcp

primary = gcp.eventarc.Pipeline("primary",
    location="us-central1",
    pipeline_id="some-pipeline",
    destinations=[{
        "http_endpoint": {
            "uri": "https://10.77.0.0:80/route",
            "message_binding_template": "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
        },
        "network_config": {
            "network_attachment": "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
        },
        "authentication_config": {
            "google_oidc": {
                "service_account": "my@service-account.com",
                "audience": "http://www.example.com",
            },
        },
        "output_payload_format": {
            "json": {},
        },
    }],
    input_payload_format={
        "json": {},
    },
    retry_policy={
        "max_retry_delay": "50s",
        "max_attempts": 2,
        "min_retry_delay": "40s",
    },
    mediations=[{
        "transformation": {
            "transformation_template": """{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
""",
        },
    }],
    logging_config={
        "log_severity": "DEBUG",
    })
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/eventarc"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := eventarc.NewPipeline(ctx, "primary", &eventarc.PipelineArgs{
			Location:   pulumi.String("us-central1"),
			PipelineId: pulumi.String("some-pipeline"),
			Destinations: eventarc.PipelineDestinationArray{
				&eventarc.PipelineDestinationArgs{
					HttpEndpoint: &eventarc.PipelineDestinationHttpEndpointArgs{
						Uri:                    pulumi.String("https://10.77.0.0:80/route"),
						MessageBindingTemplate: pulumi.String("{\"headers\":{\"new-header-key\": \"new-header-value\"}}"),
					},
					NetworkConfig: &eventarc.PipelineDestinationNetworkConfigArgs{
						NetworkAttachment: pulumi.String("projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment"),
					},
					AuthenticationConfig: &eventarc.PipelineDestinationAuthenticationConfigArgs{
						GoogleOidc: &eventarc.PipelineDestinationAuthenticationConfigGoogleOidcArgs{
							ServiceAccount: pulumi.String("my@service-account.com"),
							Audience:       pulumi.String("http://www.example.com"),
						},
					},
					OutputPayloadFormat: &eventarc.PipelineDestinationOutputPayloadFormatArgs{
						Json: &eventarc.PipelineDestinationOutputPayloadFormatJsonArgs{},
					},
				},
			},
			InputPayloadFormat: &eventarc.PipelineInputPayloadFormatArgs{
				Json: &eventarc.PipelineInputPayloadFormatJsonArgs{},
			},
			RetryPolicy: &eventarc.PipelineRetryPolicyArgs{
				MaxRetryDelay: pulumi.String("50s"),
				MaxAttempts:   pulumi.Int(2),
				MinRetryDelay: pulumi.String("40s"),
			},
			Mediations: eventarc.PipelineMediationArray{
				&eventarc.PipelineMediationArgs{
					Transformation: &eventarc.PipelineMediationTransformationArgs{
						TransformationTemplate: pulumi.String(`{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
`),
					},
				},
			},
			LoggingConfig: &eventarc.PipelineLoggingConfigArgs{
				LogSeverity: pulumi.String("DEBUG"),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var primary = new Gcp.Eventarc.Pipeline("primary", new()
    {
        Location = "us-central1",
        PipelineId = "some-pipeline",
        Destinations = new[]
        {
            new Gcp.Eventarc.Inputs.PipelineDestinationArgs
            {
                HttpEndpoint = new Gcp.Eventarc.Inputs.PipelineDestinationHttpEndpointArgs
                {
                    Uri = "https://10.77.0.0:80/route",
                    MessageBindingTemplate = "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
                },
                NetworkConfig = new Gcp.Eventarc.Inputs.PipelineDestinationNetworkConfigArgs
                {
                    NetworkAttachment = "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
                },
                AuthenticationConfig = new Gcp.Eventarc.Inputs.PipelineDestinationAuthenticationConfigArgs
                {
                    GoogleOidc = new Gcp.Eventarc.Inputs.PipelineDestinationAuthenticationConfigGoogleOidcArgs
                    {
                        ServiceAccount = "my@service-account.com",
                        Audience = "http://www.example.com",
                    },
                },
                OutputPayloadFormat = new Gcp.Eventarc.Inputs.PipelineDestinationOutputPayloadFormatArgs
                {
                    Json = null,
                },
            },
        },
        InputPayloadFormat = new Gcp.Eventarc.Inputs.PipelineInputPayloadFormatArgs
        {
            Json = null,
        },
        RetryPolicy = new Gcp.Eventarc.Inputs.PipelineRetryPolicyArgs
        {
            MaxRetryDelay = "50s",
            MaxAttempts = 2,
            MinRetryDelay = "40s",
        },
        Mediations = new[]
        {
            new Gcp.Eventarc.Inputs.PipelineMediationArgs
            {
                Transformation = new Gcp.Eventarc.Inputs.PipelineMediationTransformationArgs
                {
                    TransformationTemplate = @"{
\""id\"": message.id,
\""datacontenttype\"": \""application/json\"",
\""data\"": \""{ \\\""scrubbed\\\"": \\\""true\\\"" }\""
}
",
                },
            },
        },
        LoggingConfig = new Gcp.Eventarc.Inputs.PipelineLoggingConfigArgs
        {
            LogSeverity = "DEBUG",
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.eventarc.Pipeline;
import com.pulumi.gcp.eventarc.PipelineArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationHttpEndpointArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationNetworkConfigArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationAuthenticationConfigArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationAuthenticationConfigGoogleOidcArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationOutputPayloadFormatArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationOutputPayloadFormatJsonArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineInputPayloadFormatArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineInputPayloadFormatJsonArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineRetryPolicyArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineMediationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineMediationTransformationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineLoggingConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var primary = new Pipeline("primary", PipelineArgs.builder()
            .location("us-central1")
            .pipelineId("some-pipeline")
            .destinations(PipelineDestinationArgs.builder()
                .httpEndpoint(PipelineDestinationHttpEndpointArgs.builder()
                    .uri("https://10.77.0.0:80/route")
                    .messageBindingTemplate("{\"headers\":{\"new-header-key\": \"new-header-value\"}}")
                    .build())
                .networkConfig(PipelineDestinationNetworkConfigArgs.builder()
                    .networkAttachment("projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment")
                    .build())
                .authenticationConfig(PipelineDestinationAuthenticationConfigArgs.builder()
                    .googleOidc(PipelineDestinationAuthenticationConfigGoogleOidcArgs.builder()
                        .serviceAccount("my@service-account.com")
                        .audience("http://www.example.com")
                        .build())
                    .build())
                .outputPayloadFormat(PipelineDestinationOutputPayloadFormatArgs.builder()
                    .json(PipelineDestinationOutputPayloadFormatJsonArgs.builder()
                        .build())
                    .build())
                .build())
            .inputPayloadFormat(PipelineInputPayloadFormatArgs.builder()
                .json(PipelineInputPayloadFormatJsonArgs.builder()
                    .build())
                .build())
            .retryPolicy(PipelineRetryPolicyArgs.builder()
                .maxRetryDelay("50s")
                .maxAttempts(2)
                .minRetryDelay("40s")
                .build())
            .mediations(PipelineMediationArgs.builder()
                .transformation(PipelineMediationTransformationArgs.builder()
                    .transformationTemplate("""
{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
                    """)
                    .build())
                .build())
            .loggingConfig(PipelineLoggingConfigArgs.builder()
                .logSeverity("DEBUG")
                .build())
            .build());

    }
}
resources:
  primary:
    type: gcp:eventarc:Pipeline
    properties:
      location: us-central1
      pipelineId: some-pipeline
      destinations:
        - httpEndpoint:
            uri: https://10.77.0.0:80/route
            messageBindingTemplate: '{"headers":{"new-header-key": "new-header-value"}}'
          networkConfig:
            networkAttachment: projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment
          authenticationConfig:
            googleOidc:
              serviceAccount: my@service-account.com
              audience: http://www.example.com
          outputPayloadFormat:
            json: {}
      inputPayloadFormat:
        json: {}
      retryPolicy:
        maxRetryDelay: 50s
        maxAttempts: 2
        minRetryDelay: 40s
      mediations:
        - transformation:
            transformationTemplate: |
              {
              \"id\": message.id,
              \"datacontenttype\": \"application/json\",
              \"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
              }              
      loggingConfig:
        logSeverity: DEBUG

The authenticationConfig property configures how the pipeline authenticates to the destination. The googleOidc block generates OpenID Connect tokens using the specified service account. The mediations property defines transformations applied to events; the transformationTemplate uses CEL (Common Expression Language) to reshape the event structure. The inputPayloadFormat and outputPayloadFormat properties control serialization; setting both to json maintains JSON throughout the pipeline.

Use OAuth tokens with Protocol Buffer schemas

Some APIs require OAuth 2.0 tokens instead of OIDC. Pipelines support both authentication methods and can convert between formats.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const primary = new gcp.eventarc.Pipeline("primary", {
    location: "us-central1",
    pipelineId: "some-pipeline",
    destinations: [{
        httpEndpoint: {
            uri: "https://10.77.0.0:80/route",
            messageBindingTemplate: "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
        },
        networkConfig: {
            networkAttachment: "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
        },
        authenticationConfig: {
            oauthToken: {
                serviceAccount: "my@service-account.com",
                scope: "https://www.googleapis.com/auth/cloud-platform",
            },
        },
        outputPayloadFormat: {
            protobuf: {
                schemaDefinition: `syntax = \\"proto3\\";
message schema {
string name = 1;
string severity = 2;
}
`,
            },
        },
    }],
    inputPayloadFormat: {
        protobuf: {
            schemaDefinition: `syntax = \\"proto3\\";
message schema {
string name = 1;
string severity = 2;
}
`,
        },
    },
    retryPolicy: {
        maxRetryDelay: "50s",
        maxAttempts: 2,
        minRetryDelay: "40s",
    },
    mediations: [{
        transformation: {
            transformationTemplate: `{
\\"id\\": message.id,
\\"datacontenttype\\": \\"application/json\\",
\\"data\\": \\"{ \\\\\\"scrubbed\\\\\\": \\\\\\"true\\\\\\" }\\"
}
`,
        },
    }],
    loggingConfig: {
        logSeverity: "DEBUG",
    },
});
import pulumi
import pulumi_gcp as gcp

primary = gcp.eventarc.Pipeline("primary",
    location="us-central1",
    pipeline_id="some-pipeline",
    destinations=[{
        "http_endpoint": {
            "uri": "https://10.77.0.0:80/route",
            "message_binding_template": "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
        },
        "network_config": {
            "network_attachment": "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
        },
        "authentication_config": {
            "oauth_token": {
                "service_account": "my@service-account.com",
                "scope": "https://www.googleapis.com/auth/cloud-platform",
            },
        },
        "output_payload_format": {
            "protobuf": {
                "schema_definition": """syntax = \"proto3\";
message schema {
string name = 1;
string severity = 2;
}
""",
            },
        },
    }],
    input_payload_format={
        "protobuf": {
            "schema_definition": """syntax = \"proto3\";
message schema {
string name = 1;
string severity = 2;
}
""",
        },
    },
    retry_policy={
        "max_retry_delay": "50s",
        "max_attempts": 2,
        "min_retry_delay": "40s",
    },
    mediations=[{
        "transformation": {
            "transformation_template": """{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
""",
        },
    }],
    logging_config={
        "log_severity": "DEBUG",
    })
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/eventarc"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := eventarc.NewPipeline(ctx, "primary", &eventarc.PipelineArgs{
			Location:   pulumi.String("us-central1"),
			PipelineId: pulumi.String("some-pipeline"),
			Destinations: eventarc.PipelineDestinationArray{
				&eventarc.PipelineDestinationArgs{
					HttpEndpoint: &eventarc.PipelineDestinationHttpEndpointArgs{
						Uri:                    pulumi.String("https://10.77.0.0:80/route"),
						MessageBindingTemplate: pulumi.String("{\"headers\":{\"new-header-key\": \"new-header-value\"}}"),
					},
					NetworkConfig: &eventarc.PipelineDestinationNetworkConfigArgs{
						NetworkAttachment: pulumi.String("projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment"),
					},
					AuthenticationConfig: &eventarc.PipelineDestinationAuthenticationConfigArgs{
						OauthToken: &eventarc.PipelineDestinationAuthenticationConfigOauthTokenArgs{
							ServiceAccount: pulumi.String("my@service-account.com"),
							Scope:          pulumi.String("https://www.googleapis.com/auth/cloud-platform"),
						},
					},
					OutputPayloadFormat: &eventarc.PipelineDestinationOutputPayloadFormatArgs{
						Protobuf: &eventarc.PipelineDestinationOutputPayloadFormatProtobufArgs{
							SchemaDefinition: pulumi.String(`syntax = \"proto3\";
message schema {
string name = 1;
string severity = 2;
}
`),
						},
					},
				},
			},
			InputPayloadFormat: &eventarc.PipelineInputPayloadFormatArgs{
				Protobuf: &eventarc.PipelineInputPayloadFormatProtobufArgs{
					SchemaDefinition: pulumi.String(`syntax = \"proto3\";
message schema {
string name = 1;
string severity = 2;
}
`),
				},
			},
			RetryPolicy: &eventarc.PipelineRetryPolicyArgs{
				MaxRetryDelay: pulumi.String("50s"),
				MaxAttempts:   pulumi.Int(2),
				MinRetryDelay: pulumi.String("40s"),
			},
			Mediations: eventarc.PipelineMediationArray{
				&eventarc.PipelineMediationArgs{
					Transformation: &eventarc.PipelineMediationTransformationArgs{
						TransformationTemplate: pulumi.String(`{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
`),
					},
				},
			},
			LoggingConfig: &eventarc.PipelineLoggingConfigArgs{
				LogSeverity: pulumi.String("DEBUG"),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var primary = new Gcp.Eventarc.Pipeline("primary", new()
    {
        Location = "us-central1",
        PipelineId = "some-pipeline",
        Destinations = new[]
        {
            new Gcp.Eventarc.Inputs.PipelineDestinationArgs
            {
                HttpEndpoint = new Gcp.Eventarc.Inputs.PipelineDestinationHttpEndpointArgs
                {
                    Uri = "https://10.77.0.0:80/route",
                    MessageBindingTemplate = "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
                },
                NetworkConfig = new Gcp.Eventarc.Inputs.PipelineDestinationNetworkConfigArgs
                {
                    NetworkAttachment = "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
                },
                AuthenticationConfig = new Gcp.Eventarc.Inputs.PipelineDestinationAuthenticationConfigArgs
                {
                    OauthToken = new Gcp.Eventarc.Inputs.PipelineDestinationAuthenticationConfigOauthTokenArgs
                    {
                        ServiceAccount = "my@service-account.com",
                        Scope = "https://www.googleapis.com/auth/cloud-platform",
                    },
                },
                OutputPayloadFormat = new Gcp.Eventarc.Inputs.PipelineDestinationOutputPayloadFormatArgs
                {
                    Protobuf = new Gcp.Eventarc.Inputs.PipelineDestinationOutputPayloadFormatProtobufArgs
                    {
                        SchemaDefinition = @"syntax = \""proto3\"";
message schema {
string name = 1;
string severity = 2;
}
",
                    },
                },
            },
        },
        InputPayloadFormat = new Gcp.Eventarc.Inputs.PipelineInputPayloadFormatArgs
        {
            Protobuf = new Gcp.Eventarc.Inputs.PipelineInputPayloadFormatProtobufArgs
            {
                SchemaDefinition = @"syntax = \""proto3\"";
message schema {
string name = 1;
string severity = 2;
}
",
            },
        },
        RetryPolicy = new Gcp.Eventarc.Inputs.PipelineRetryPolicyArgs
        {
            MaxRetryDelay = "50s",
            MaxAttempts = 2,
            MinRetryDelay = "40s",
        },
        Mediations = new[]
        {
            new Gcp.Eventarc.Inputs.PipelineMediationArgs
            {
                Transformation = new Gcp.Eventarc.Inputs.PipelineMediationTransformationArgs
                {
                    TransformationTemplate = @"{
\""id\"": message.id,
\""datacontenttype\"": \""application/json\"",
\""data\"": \""{ \\\""scrubbed\\\"": \\\""true\\\"" }\""
}
",
                },
            },
        },
        LoggingConfig = new Gcp.Eventarc.Inputs.PipelineLoggingConfigArgs
        {
            LogSeverity = "DEBUG",
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.eventarc.Pipeline;
import com.pulumi.gcp.eventarc.PipelineArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationHttpEndpointArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationNetworkConfigArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationAuthenticationConfigArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationAuthenticationConfigOauthTokenArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationOutputPayloadFormatArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationOutputPayloadFormatProtobufArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineInputPayloadFormatArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineInputPayloadFormatProtobufArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineRetryPolicyArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineMediationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineMediationTransformationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineLoggingConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var primary = new Pipeline("primary", PipelineArgs.builder()
            .location("us-central1")
            .pipelineId("some-pipeline")
            .destinations(PipelineDestinationArgs.builder()
                .httpEndpoint(PipelineDestinationHttpEndpointArgs.builder()
                    .uri("https://10.77.0.0:80/route")
                    .messageBindingTemplate("{\"headers\":{\"new-header-key\": \"new-header-value\"}}")
                    .build())
                .networkConfig(PipelineDestinationNetworkConfigArgs.builder()
                    .networkAttachment("projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment")
                    .build())
                .authenticationConfig(PipelineDestinationAuthenticationConfigArgs.builder()
                    .oauthToken(PipelineDestinationAuthenticationConfigOauthTokenArgs.builder()
                        .serviceAccount("my@service-account.com")
                        .scope("https://www.googleapis.com/auth/cloud-platform")
                        .build())
                    .build())
                .outputPayloadFormat(PipelineDestinationOutputPayloadFormatArgs.builder()
                    .protobuf(PipelineDestinationOutputPayloadFormatProtobufArgs.builder()
                        .schemaDefinition("""
syntax = \"proto3\";
message schema {
string name = 1;
string severity = 2;
}
                        """)
                        .build())
                    .build())
                .build())
            .inputPayloadFormat(PipelineInputPayloadFormatArgs.builder()
                .protobuf(PipelineInputPayloadFormatProtobufArgs.builder()
                    .schemaDefinition("""
syntax = \"proto3\";
message schema {
string name = 1;
string severity = 2;
}
                    """)
                    .build())
                .build())
            .retryPolicy(PipelineRetryPolicyArgs.builder()
                .maxRetryDelay("50s")
                .maxAttempts(2)
                .minRetryDelay("40s")
                .build())
            .mediations(PipelineMediationArgs.builder()
                .transformation(PipelineMediationTransformationArgs.builder()
                    .transformationTemplate("""
{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
                    """)
                    .build())
                .build())
            .loggingConfig(PipelineLoggingConfigArgs.builder()
                .logSeverity("DEBUG")
                .build())
            .build());

    }
}
resources:
  primary:
    type: gcp:eventarc:Pipeline
    properties:
      location: us-central1
      pipelineId: some-pipeline
      destinations:
        - httpEndpoint:
            uri: https://10.77.0.0:80/route
            messageBindingTemplate: '{"headers":{"new-header-key": "new-header-value"}}'
          networkConfig:
            networkAttachment: projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment
          authenticationConfig:
            oauthToken:
              serviceAccount: my@service-account.com
              scope: https://www.googleapis.com/auth/cloud-platform
          outputPayloadFormat:
            protobuf:
              schemaDefinition: |
                syntax = \"proto3\";
                message schema {
                string name = 1;
                string severity = 2;
                }                
      inputPayloadFormat:
        protobuf:
          schemaDefinition: |
            syntax = \"proto3\";
            message schema {
            string name = 1;
            string severity = 2;
            }            
      retryPolicy:
        maxRetryDelay: 50s
        maxAttempts: 2
        minRetryDelay: 40s
      mediations:
        - transformation:
            transformationTemplate: |
              {
              \"id\": message.id,
              \"datacontenttype\": \"application/json\",
              \"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
              }              
      loggingConfig:
        logSeverity: DEBUG

The oauthToken block generates OAuth 2.0 access tokens with the specified scope. The protobuf format requires a schemaDefinition that defines the message structure. When you set inputPayloadFormat and outputPayloadFormat to protobuf, the pipeline deserializes incoming events and serializes outgoing events according to your schema.

Encrypt events with customer-managed keys

Compliance requirements often mandate customer-managed encryption keys. Pipelines can encrypt event data using Cloud KMS keys.

import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";

const primary = new gcp.eventarc.Pipeline("primary", {
    location: "us-central1",
    pipelineId: "some-pipeline",
    cryptoKeyName: "some-key",
    destinations: [{
        httpEndpoint: {
            uri: "https://10.77.0.0:80/route",
            messageBindingTemplate: "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
        },
        networkConfig: {
            networkAttachment: "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
        },
        outputPayloadFormat: {
            avro: {
                schemaDefinition: "{\"type\": \"record\", \"name\": \"my_record\", \"fields\": [{\"name\": \"my_field\", \"type\": \"string\"}]}",
            },
        },
    }],
    inputPayloadFormat: {
        avro: {
            schemaDefinition: "{\"type\": \"record\", \"name\": \"my_record\", \"fields\": [{\"name\": \"my_field\", \"type\": \"string\"}]}",
        },
    },
    retryPolicy: {
        maxRetryDelay: "50s",
        maxAttempts: 2,
        minRetryDelay: "40s",
    },
    mediations: [{
        transformation: {
            transformationTemplate: `{
\\"id\\": message.id,
\\"datacontenttype\\": \\"application/json\\",
\\"data\\": \\"{ \\\\\\"scrubbed\\\\\\": \\\\\\"true\\\\\\" }\\"
}
`,
        },
    }],
    loggingConfig: {
        logSeverity: "DEBUG",
    },
});
import pulumi
import pulumi_gcp as gcp

primary = gcp.eventarc.Pipeline("primary",
    location="us-central1",
    pipeline_id="some-pipeline",
    crypto_key_name="some-key",
    destinations=[{
        "http_endpoint": {
            "uri": "https://10.77.0.0:80/route",
            "message_binding_template": "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
        },
        "network_config": {
            "network_attachment": "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
        },
        "output_payload_format": {
            "avro": {
                "schema_definition": "{\"type\": \"record\", \"name\": \"my_record\", \"fields\": [{\"name\": \"my_field\", \"type\": \"string\"}]}",
            },
        },
    }],
    input_payload_format={
        "avro": {
            "schema_definition": "{\"type\": \"record\", \"name\": \"my_record\", \"fields\": [{\"name\": \"my_field\", \"type\": \"string\"}]}",
        },
    },
    retry_policy={
        "max_retry_delay": "50s",
        "max_attempts": 2,
        "min_retry_delay": "40s",
    },
    mediations=[{
        "transformation": {
            "transformation_template": """{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
""",
        },
    }],
    logging_config={
        "log_severity": "DEBUG",
    })
package main

import (
	"github.com/pulumi/pulumi-gcp/sdk/v9/go/gcp/eventarc"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

func main() {
	pulumi.Run(func(ctx *pulumi.Context) error {
		_, err := eventarc.NewPipeline(ctx, "primary", &eventarc.PipelineArgs{
			Location:      pulumi.String("us-central1"),
			PipelineId:    pulumi.String("some-pipeline"),
			CryptoKeyName: pulumi.String("some-key"),
			Destinations: eventarc.PipelineDestinationArray{
				&eventarc.PipelineDestinationArgs{
					HttpEndpoint: &eventarc.PipelineDestinationHttpEndpointArgs{
						Uri:                    pulumi.String("https://10.77.0.0:80/route"),
						MessageBindingTemplate: pulumi.String("{\"headers\":{\"new-header-key\": \"new-header-value\"}}"),
					},
					NetworkConfig: &eventarc.PipelineDestinationNetworkConfigArgs{
						NetworkAttachment: pulumi.String("projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment"),
					},
					OutputPayloadFormat: &eventarc.PipelineDestinationOutputPayloadFormatArgs{
						Avro: &eventarc.PipelineDestinationOutputPayloadFormatAvroArgs{
							SchemaDefinition: pulumi.String("{\"type\": \"record\", \"name\": \"my_record\", \"fields\": [{\"name\": \"my_field\", \"type\": \"string\"}]}"),
						},
					},
				},
			},
			InputPayloadFormat: &eventarc.PipelineInputPayloadFormatArgs{
				Avro: &eventarc.PipelineInputPayloadFormatAvroArgs{
					SchemaDefinition: pulumi.String("{\"type\": \"record\", \"name\": \"my_record\", \"fields\": [{\"name\": \"my_field\", \"type\": \"string\"}]}"),
				},
			},
			RetryPolicy: &eventarc.PipelineRetryPolicyArgs{
				MaxRetryDelay: pulumi.String("50s"),
				MaxAttempts:   pulumi.Int(2),
				MinRetryDelay: pulumi.String("40s"),
			},
			Mediations: eventarc.PipelineMediationArray{
				&eventarc.PipelineMediationArgs{
					Transformation: &eventarc.PipelineMediationTransformationArgs{
						TransformationTemplate: pulumi.String(`{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
`),
					},
				},
			},
			LoggingConfig: &eventarc.PipelineLoggingConfigArgs{
				LogSeverity: pulumi.String("DEBUG"),
			},
		})
		if err != nil {
			return err
		}
		return nil
	})
}
using System.Collections.Generic;
using System.Linq;
using Pulumi;
using Gcp = Pulumi.Gcp;

return await Deployment.RunAsync(() => 
{
    var primary = new Gcp.Eventarc.Pipeline("primary", new()
    {
        Location = "us-central1",
        PipelineId = "some-pipeline",
        CryptoKeyName = "some-key",
        Destinations = new[]
        {
            new Gcp.Eventarc.Inputs.PipelineDestinationArgs
            {
                HttpEndpoint = new Gcp.Eventarc.Inputs.PipelineDestinationHttpEndpointArgs
                {
                    Uri = "https://10.77.0.0:80/route",
                    MessageBindingTemplate = "{\"headers\":{\"new-header-key\": \"new-header-value\"}}",
                },
                NetworkConfig = new Gcp.Eventarc.Inputs.PipelineDestinationNetworkConfigArgs
                {
                    NetworkAttachment = "projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment",
                },
                OutputPayloadFormat = new Gcp.Eventarc.Inputs.PipelineDestinationOutputPayloadFormatArgs
                {
                    Avro = new Gcp.Eventarc.Inputs.PipelineDestinationOutputPayloadFormatAvroArgs
                    {
                        SchemaDefinition = "{\"type\": \"record\", \"name\": \"my_record\", \"fields\": [{\"name\": \"my_field\", \"type\": \"string\"}]}",
                    },
                },
            },
        },
        InputPayloadFormat = new Gcp.Eventarc.Inputs.PipelineInputPayloadFormatArgs
        {
            Avro = new Gcp.Eventarc.Inputs.PipelineInputPayloadFormatAvroArgs
            {
                SchemaDefinition = "{\"type\": \"record\", \"name\": \"my_record\", \"fields\": [{\"name\": \"my_field\", \"type\": \"string\"}]}",
            },
        },
        RetryPolicy = new Gcp.Eventarc.Inputs.PipelineRetryPolicyArgs
        {
            MaxRetryDelay = "50s",
            MaxAttempts = 2,
            MinRetryDelay = "40s",
        },
        Mediations = new[]
        {
            new Gcp.Eventarc.Inputs.PipelineMediationArgs
            {
                Transformation = new Gcp.Eventarc.Inputs.PipelineMediationTransformationArgs
                {
                    TransformationTemplate = @"{
\""id\"": message.id,
\""datacontenttype\"": \""application/json\"",
\""data\"": \""{ \\\""scrubbed\\\"": \\\""true\\\"" }\""
}
",
                },
            },
        },
        LoggingConfig = new Gcp.Eventarc.Inputs.PipelineLoggingConfigArgs
        {
            LogSeverity = "DEBUG",
        },
    });

});
package generated_program;

import com.pulumi.Context;
import com.pulumi.Pulumi;
import com.pulumi.core.Output;
import com.pulumi.gcp.eventarc.Pipeline;
import com.pulumi.gcp.eventarc.PipelineArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationHttpEndpointArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationNetworkConfigArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationOutputPayloadFormatArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineDestinationOutputPayloadFormatAvroArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineInputPayloadFormatArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineInputPayloadFormatAvroArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineRetryPolicyArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineMediationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineMediationTransformationArgs;
import com.pulumi.gcp.eventarc.inputs.PipelineLoggingConfigArgs;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

public class App {
    public static void main(String[] args) {
        Pulumi.run(App::stack);
    }

    public static void stack(Context ctx) {
        var primary = new Pipeline("primary", PipelineArgs.builder()
            .location("us-central1")
            .pipelineId("some-pipeline")
            .cryptoKeyName("some-key")
            .destinations(PipelineDestinationArgs.builder()
                .httpEndpoint(PipelineDestinationHttpEndpointArgs.builder()
                    .uri("https://10.77.0.0:80/route")
                    .messageBindingTemplate("{\"headers\":{\"new-header-key\": \"new-header-value\"}}")
                    .build())
                .networkConfig(PipelineDestinationNetworkConfigArgs.builder()
                    .networkAttachment("projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment")
                    .build())
                .outputPayloadFormat(PipelineDestinationOutputPayloadFormatArgs.builder()
                    .avro(PipelineDestinationOutputPayloadFormatAvroArgs.builder()
                        .schemaDefinition("{\"type\": \"record\", \"name\": \"my_record\", \"fields\": [{\"name\": \"my_field\", \"type\": \"string\"}]}")
                        .build())
                    .build())
                .build())
            .inputPayloadFormat(PipelineInputPayloadFormatArgs.builder()
                .avro(PipelineInputPayloadFormatAvroArgs.builder()
                    .schemaDefinition("{\"type\": \"record\", \"name\": \"my_record\", \"fields\": [{\"name\": \"my_field\", \"type\": \"string\"}]}")
                    .build())
                .build())
            .retryPolicy(PipelineRetryPolicyArgs.builder()
                .maxRetryDelay("50s")
                .maxAttempts(2)
                .minRetryDelay("40s")
                .build())
            .mediations(PipelineMediationArgs.builder()
                .transformation(PipelineMediationTransformationArgs.builder()
                    .transformationTemplate("""
{
\"id\": message.id,
\"datacontenttype\": \"application/json\",
\"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
}
                    """)
                    .build())
                .build())
            .loggingConfig(PipelineLoggingConfigArgs.builder()
                .logSeverity("DEBUG")
                .build())
            .build());

    }
}
resources:
  primary:
    type: gcp:eventarc:Pipeline
    properties:
      location: us-central1
      pipelineId: some-pipeline
      cryptoKeyName: some-key
      destinations:
        - httpEndpoint:
            uri: https://10.77.0.0:80/route
            messageBindingTemplate: '{"headers":{"new-header-key": "new-header-value"}}'
          networkConfig:
            networkAttachment: projects/my-project-name/regions/us-central1/networkAttachments/some-network-attachment
          outputPayloadFormat:
            avro:
              schemaDefinition: '{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}'
      inputPayloadFormat:
        avro:
          schemaDefinition: '{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}'
      retryPolicy:
        maxRetryDelay: 50s
        maxAttempts: 2
        minRetryDelay: 40s
      mediations:
        - transformation:
            transformationTemplate: |
              {
              \"id\": message.id,
              \"datacontenttype\": \"application/json\",
              \"data\": \"{ \\\"scrubbed\\\": \\\"true\\\" }\"
              }              
      loggingConfig:
        logSeverity: DEBUG

The cryptoKeyName property specifies a KMS crypto key for encrypting event data at rest. The avro format uses a JSON schema definition to describe the record structure. This configuration ensures events are encrypted with your own keys while maintaining structured data serialization.

Beyond these examples

These snippets focus on specific pipeline features: destination types (Pub/Sub, HTTP, Workflows), authentication (OIDC, OAuth), and payload formats and transformations. They’re intentionally minimal rather than full event processing systems.

The examples may reference pre-existing infrastructure such as Pub/Sub topics, Workflows resources, network attachments for private connectivity, service accounts with appropriate IAM permissions, and KMS crypto keys for encryption. They focus on configuring the pipeline rather than provisioning everything around it.

To keep things focused, common pipeline patterns are omitted, including:

  • Retry policy tuning (maxAttempts, minRetryDelay, maxRetryDelay)
  • Logging configuration (logSeverity)
  • Message binding templates for header manipulation
  • Labels and annotations for resource organization

These omissions are intentional: the goal is to illustrate how each pipeline feature is wired, not provide drop-in event processing modules. See the Eventarc Pipeline resource reference for all available configuration options.

Let's create GCP Eventarc Pipelines

Get started with Pulumi Cloud, then follow our quick setup guide to deploy this infrastructure.

Try Pulumi Cloud for FREE

Frequently Asked Questions

Destinations & Routing
Can I configure multiple destinations for a pipeline?
No, exactly one destination is supported per Pipeline. Configure a single destination in the destinations array.
What destination types are available?
Three types: Pub/Sub topic, HTTP httpEndpoint (requires networkAttachment), or Workflows workflow.
How do I authenticate to an HTTP endpoint destination?
Use authenticationConfig with either googleOidc (service account and audience) or oauthToken (service account and scope).
Retry & Error Handling
What's the default retry behavior for pipelines?
Retries start with a 5-second delay and double after each failed attempt (10s, 20s, 40s, etc.), capped at 60 seconds by default.
How do I make retries happen at constant intervals?
Set minRetryDelay and maxRetryDelay to the same value in retryPolicy. This makes the duration between retries constant.
Payload Formats & Transformation
What payload formats are supported?
JSON, Protobuf, and Avro for both inputPayloadFormat and outputPayloadFormat.
Do I need to provide a schema for all payload formats?
No. JSON format doesn’t require a schema, but Protobuf and Avro require schemaDefinition.
Can I add multiple transformation operations to a pipeline?
No, only one Transformation operation is allowed per Pipeline. Configure a single transformation in the mediations array.
Security & Encryption
How do I encrypt pipeline event data?
Use cryptoKeyName with a KMS crypto key. If not set, an internal Google-owned key is used to encrypt messages.
Resource Configuration
What properties can't I change after creating a pipeline?
The location, pipelineId, and project properties are immutable. Changing them will force resource recreation.
Why don't my labels and annotations show all values on the resource?
The labels and annotations fields are non-authoritative and only manage values in your configuration. Use effectiveLabels and effectiveAnnotations to see all values present on the resource.

Using a different cloud?

Explore messaging guides for other cloud providers: