Load GitHub events into Snowflake on GCP with Snowpipe auto-ingest

Build a GCP-based webhook ingestion path that lands GitHub events in Snowflake with Snowpipe auto-ingest, plus blueprint downloads, reusable component code, and operating notes.

Download blueprint

Get this GCP + Snowpipe auto-ingest + GitHub blueprint project as a zip, in Python or TypeScript. Each download matches the install commands and blueprint program shown on this page for that language.

Download the Python blueprint with the matching Pulumi program, dependency files, and README.

Download Python blueprint

Download the TypeScript blueprint with the matching Pulumi program, dependency files, and README.

Download TypeScript blueprint

This guide covers the storage, eventing, and Snowflake objects for this setup so you can get raw events flowing first and shape downstream tables later.

On GCP, this guide uses Cloud Functions (2nd gen) for the public handler and Google Cloud Storage for the raw landing zone. The loading path for this variant (Snowpipe auto-ingest) carries those objects into Snowflake.

The Snowpipe auto-ingest path keeps object storage in the middle. Your webhook handler writes files to cloud storage, and Snowflake auto-ingests them. It is a good fit when you want a durable raw archive or need a cloud-native queueing and retry story before Snowflake loads the data. This blueprint provisions the public Cloud Function endpoint, the GCS landing zone, a Snowflake storage integration and external GCS stage, a Pub/Sub topic plus subscription, and an auto-ingest pipe wired to object-finalize notifications.
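
To make the object-finalize wiring concrete: Cloud Storage publishes one Pub/Sub message per bucket event and carries the event type, bucket, and object name as message attributes (eventType, bucketId, objectId). In this blueprint Snowflake consumes those messages through the subscription, so you never parse them yourself. The sketch below only illustrates which messages matter to the pipe, assuming the incoming/ prefix used by the stage; the helper name staged_file_from_notification is hypothetical and is not part of the download.

```python
from typing import Mapping, Optional


def staged_file_from_notification(
    attributes: Mapping[str, str], prefix: str = "incoming/"
) -> Optional[str]:
    """Return the gcs:// URL of a newly finalized object under the prefix, else None.

    Hypothetical helper for illustration; Snowflake does this filtering itself.
    """
    # Only object-finalize events describe a file that is ready to load.
    if attributes.get("eventType") != "OBJECT_FINALIZE":
        return None
    bucket_id = attributes.get("bucketId", "")
    object_id = attributes.get("objectId", "")
    # Ignore anything outside the landing prefix, such as deployment archives.
    if not bucket_id or not object_id.startswith(prefix):
        return None
    return f"gcs://{bucket_id}/{object_id}"
```

A message for deployments/cloudfunction-source.zip returns None here, which mirrors why the bucket notification and the stage are both scoped to incoming/.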

This guide provisions the GitHub webhook for you and points it at the deployed public endpoint so repository events land in Snowflake on the first deploy.
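
The shared secret configured on the webhook is what lets the handler reject requests that did not come from GitHub. GitHub signs each delivery with HMAC-SHA256 over the raw request body and sends the result in the X-Hub-Signature-256 header as sha256=<hex digest>. The shipped cloudfunction/main.py is not reproduced on this page, so the following is only a minimal sketch of that check; the function name verify_github_signature is hypothetical.

```python
import hashlib
import hmac
from typing import Optional


def verify_github_signature(secret: str, body: bytes, signature_header: Optional[str]) -> bool:
    """Return True when the header matches the HMAC-SHA256 of the raw body.

    Hypothetical helper: the name and signature are illustrative and are not
    taken from the blueprint's cloudfunction/main.py.
    """
    prefix = "sha256="
    if not signature_header or not signature_header.startswith(prefix):
        return False
    expected = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    received = signature_header[len(prefix):]
    # compare_digest runs in constant time, so it does not leak how many
    # leading characters of the signature matched.
    return hmac.compare_digest(expected.encode("utf-8"), received.encode("utf-8"))
```

The check has to run against the exact bytes GitHub sent, before any JSON parsing or re-serialization, because a re-encoded body will not produce the same digest.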

Quickstart

  1. Download the blueprint zip for your language from the Download blueprint section above, or create a new Pulumi project with the file layout listed under "What you get in the download".
  2. Install dependencies for your selected language and configure Snowflake plus GCP.
  3. Deploy the stack to create the public Cloud Function endpoint, the GCS landing bucket, and the Snowflake loading objects.
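  4. Trigger a test event in the connected repository, for example by pushing a commit or opening an issue. The stack registers the GitHub webhook for you.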
  5. Query the landing table in Snowflake to confirm the event arrived.

After the first test event, new rows usually appear in about two minutes.

Prerequisites

  • a Pulumi account and the Pulumi CLI
  • a Google Cloud project where you can create Cloud Functions, Cloud Storage buckets, Pub/Sub topics, and IAM bindings
  • a Snowflake account where you can create databases, schemas, and loading roles
  • a GitHub personal access token or app credential that can manage repository webhooks
  • the GitHub repository name you want to connect, set with pulumi config set webhook-repo <repo>

For the Pulumi language you selected:

  • Python: Python 3.11 or newer and a virtual environment tool
  • TypeScript: Node.js 20 or newer and npm

Initialize your stack for GCP with:

pulumi stack init dev
pulumi config set gcp:project 123456789012
pulumi config set gcp:region us-central1

Set up credentials with Pulumi ESC

This guide needs cloud credentials, Snowflake credentials, and any source-specific token required to provision the webhook. A single ESC environment is usually the smallest setup that still keeps secrets out of local files.

values:
  gcp:
    login:
      fn::open::gcp-login:
        project: 123456789012
        oidc:
          workloadPoolId: pulumi-esc
          providerId: pulumi-esc
          serviceAccount: pulumi-esc@example-project.iam.gserviceaccount.com
  snowflake:
    login:
      fn::open::snowflake-login:
        oidc:
          account: <your-snowflake-account>
          user: ESC_SERVICE_USER
    organizationName: <your-org-name>
    accountName: <your-account-name>
  github:
    token:
      fn::secret: <your-github-token>
    owner: <your-github-org-or-user>
  environmentVariables:
    SNOWFLAKE_USER: ${snowflake.login.user}
    SNOWFLAKE_TOKEN: ${snowflake.login.token}
  pulumiConfig:
    snowflake:organizationName: ${snowflake.organizationName}
    snowflake:accountName: ${snowflake.accountName}
    snowflake:authenticator: OAUTH
    snowflake:role: PULUMI_DEPLOYER
    gcp:project: ${gcp.login.project}
    gcp:region: us-central1
    gcp:accessToken: ${gcp.login.accessToken}

    github:token: ${github.token}
    github:owner: ${github.owner}

Then reference it from your stack config:

environment:
  - <your-org>/<your-environment>
config:
  webhook-to-snowflake:database: LANDING_ZONE_WEBHOOKS

What you get in the download

The downloadable example zip includes:

  • Pulumi.yaml
  • the Pulumi program, dependency files, cloud runtime support files, and reusable components for the language you pick
  • a README with a shorter quick start for this exact setup

Both languages:

  • cloudfunction/main.py for request validation and GCS writes
  • cloudfunction/requirements.txt for the Cloud Functions runtime dependencies

Python:

  • __main__.py as the Pulumi entrypoint
  • components/webhook_ingestion.py for the public webhook endpoint
  • components/snowpipe_pipeline.py for the Snowflake loading path
  • requirements.txt for the root Pulumi project

TypeScript:

  • index.ts as the Pulumi entrypoint
  • components/webhook_ingestion.ts for the public webhook endpoint
  • components/snowpipe_pipeline.ts for the Snowflake loading path
  • package.json and tsconfig.json for the root Pulumi project

The next sections show the same entrypoint and component files that ship in the download.

Blueprint Pulumi program

This blueprint shows the full resource wiring for the GCP Snowpipe auto-ingest path with a GitHub source. The downloadable repo uses the same entrypoint and component files shown below.

__main__.py

import pulumi
import pulumi_gcp as gcp
import pulumi_random as random
import pulumi_snowflake as snowflake
import pulumi_github as github

from components.snowpipe_pipeline import SnowpipePipeline
from components.webhook_ingestion import WebhookIngestion

config = pulumi.Config()
database_name = config.get("database") or "LANDING_ZONE_WEBHOOKS"
project = pulumi.Config("gcp").require("project")
region = pulumi.Config("gcp").require("region")
webhook_repo = config.require("webhook-repo")

landing_bucket = gcp.storage.Bucket(
    "landing-bucket",
    location=region,
    project=project,
    uniform_bucket_level_access=True,
    public_access_prevention="enforced",
)
database = snowflake.Database("landing-db", name=database_name)
schema = snowflake.Schema("raw-schema", name="RAW", database=database.name)
webhook_secret = random.RandomPassword("webhook-secret", length=32, special=False)

ingestion = WebhookIngestion(
    "source-webhooks",
    bucket_name=landing_bucket.name,
    project=project,
    region=region,
    webhook_secret=webhook_secret.result,
)

pipeline = SnowpipePipeline(
    "source-events",
    bucket_name=landing_bucket.name,
    project=project,
    database=database.name,
    schema_name=schema.name,
)

endpoint_url = ingestion.endpoint_url

github.RepositoryWebhook(
    "source-webhook",
    repository=webhook_repo,
    configuration=github.RepositoryWebhookConfigurationArgs(
        url=endpoint_url,
        content_type="json",
        secret=webhook_secret.result,
    ),
    events=["push", "pull_request", "issues", "star"],
)

pulumi.export("landing_bucket_name", landing_bucket.name)
pulumi.export("stage_name", pipeline.stage_name)
pulumi.export("table_name", pipeline.table_name)
pulumi.export("pipe_name", pipeline.pipe_name)

index.ts

import * as gcp from "@pulumi/gcp";
import * as pulumi from "@pulumi/pulumi";
import * as random from "@pulumi/random";
import * as snowflake from "@pulumi/snowflake";
import * as github from "@pulumi/github";

import { SnowpipePipeline } from "./components/snowpipe_pipeline";
import { WebhookIngestion } from "./components/webhook_ingestion";

const config = new pulumi.Config();
const databaseName = config.get("database") ?? "LANDING_ZONE_WEBHOOKS";
const project = new pulumi.Config("gcp").require("project");
const region = new pulumi.Config("gcp").require("region");
const webhookRepo = config.require("webhook-repo");

const landingBucket = new gcp.storage.Bucket("landing-bucket", {
    location: region,
    project,
    uniformBucketLevelAccess: true,
    publicAccessPrevention: "enforced",
});
const database = new snowflake.Database("landing-db", { name: databaseName });
const schema = new snowflake.Schema("raw-schema", {
    name: "RAW",
    database: database.name,
});
const sharedSecret = new random.RandomPassword("webhook-secret", {
    length: 32,
    special: false,
});

const ingestion = new WebhookIngestion("source-webhooks", {
    bucketName: landingBucket.name,
    project,
    region,
    webhookSecret: sharedSecret.result,
});

const pipeline = new SnowpipePipeline("source-events", {
    bucketName: landingBucket.name,
    project,
    database: database.name,
    schemaName: schema.name,
});

const endpointUrl = ingestion.endpointUrl;

new github.RepositoryWebhook("source-webhook", {
    repository: webhookRepo,
    configuration: {
        url: endpointUrl,
        contentType: "json",
        secret: sharedSecret.result,
    },
    events: ["push", "pull_request", "issues", "star"],
});

export const landingBucketName = landingBucket.name;
export const stageName = pipeline.stageName;
export const tableName = pipeline.tableName;
export const pipeName = pipeline.pipeName;

Reusable components

The entrypoint stays small because the real ingestion work lives in reusable modules. These are the same component files packaged in the downloadable blueprint for this setup.

components/webhook_ingestion.py

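Provisions the public webhook endpoint: it packages cloudfunction/ into a zip, uploads it, and deploys the Cloud Function with its service account and IAM bindings. The deployed handler in cloudfunction/main.py is what accepts the request, validates the signature, and writes the raw event into the landing path for this setup.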

from __future__ import annotations

import base64
import hashlib
import tempfile
from dataclasses import dataclass
from pathlib import Path
from zipfile import ZIP_DEFLATED, ZipFile

import pulumi
import pulumi_gcp as gcp
import pulumi_random as random


def _create_function_archive(name: str) -> tuple[str, str]:
    archive_path = Path(tempfile.gettempdir()) / f"{name}-cloudfunction.zip"
    with ZipFile(archive_path, "w", compression=ZIP_DEFLATED) as archive:
        archive.write(Path("cloudfunction/main.py"), arcname="main.py")
        archive.write(Path("cloudfunction/requirements.txt"), arcname="requirements.txt")

    archive_bytes = archive_path.read_bytes()
    source_md5hash = base64.b64encode(hashlib.md5(archive_bytes).digest()).decode("utf-8")
    return str(archive_path), source_md5hash


@dataclass
class WebhookIngestion:
    endpoint_url: pulumi.Output[str]
    function_name: pulumi.Output[str]

    def __init__(
        self,
        name: str,
        *,
        bucket_name: pulumi.Input[str],
        project: pulumi.Input[str],
        region: pulumi.Input[str],
        webhook_secret: pulumi.Input[str],
    ) -> None:
        archive_path, source_md5hash = _create_function_archive(name)

        service_account_suffix = random.RandomString(
            f"{name}-service-account-suffix",
            length=8,
            special=False,
            upper=False,
        )
        service_account = gcp.serviceaccount.Account(
            f"{name}-service-account",
            account_id=service_account_suffix.result.apply(lambda value: f"w2sf-{value}"),
            display_name="Webhook ingestion function",
        )

        gcp.storage.BucketIAMMember(
            f"{name}-bucket-writer",
            bucket=bucket_name,
            role="roles/storage.objectCreator",
            member=service_account.email.apply(lambda email: f"serviceAccount:{email}"),
        )

        source_object = gcp.storage.BucketObject(
            f"{name}-source-object",
            bucket=bucket_name,
            name="deployments/cloudfunction-source.zip",
            source=pulumi.FileAsset(archive_path),
            source_md5hash=source_md5hash,
        )

        function = gcp.cloudfunctionsv2.Function(
            f"{name}-function",
            name=f"webhook-to-snowflake-{name}",
            location=region,
            build_config=gcp.cloudfunctionsv2.FunctionBuildConfigArgs(
                runtime="python311",
                entry_point="webhook",
                source=gcp.cloudfunctionsv2.FunctionBuildConfigSourceArgs(
                    storage_source=gcp.cloudfunctionsv2.FunctionBuildConfigSourceStorageSourceArgs(
                        bucket=bucket_name,
                        object=source_object.name,
                    )
                ),
            ),
            service_config=gcp.cloudfunctionsv2.FunctionServiceConfigArgs(
                available_memory="256M",
                timeout_seconds=30,
                ingress_settings="ALLOW_ALL",
                all_traffic_on_latest_revision=True,
                service_account_email=service_account.email,
                environment_variables={
                    "LANDING_BUCKET": bucket_name,
                    "LANDING_PREFIX": "incoming",
                    "WEBHOOK_SECRET": webhook_secret,
                },
            ),
        )

        gcp.cloudfunctionsv2.FunctionIamMember(
            f"{name}-function-invoker",
            project=project,
            location=function.location,
            cloud_function=function.name,
            role="roles/cloudfunctions.invoker",
            member="allUsers",
        )

        gcp.cloudrun.IamMember(
            f"{name}-run-invoker",
            project=project,
            location=function.location,
            service=function.name,
            role="roles/run.invoker",
            member="allUsers",
        )

        self.endpoint_url = function.service_config.uri
        self.function_name = function.name

components/snowpipe_pipeline.py

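Creates the loading resources for this setup: the destination table, the storage integration and external GCS stage, the Pub/Sub topic, subscription, and bucket notification, and the auto-ingest pipe, along with the IAM bindings that let Snowflake read the bucket and the subscription.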

from __future__ import annotations

from dataclasses import dataclass

import pulumi
import pulumi_gcp as gcp
import pulumi_snowflake as snowflake


def _copy_into_statement(database: pulumi.Input[str], schema_name: pulumi.Input[str]) -> pulumi.Output[str]:
    return pulumi.Output.all(database, schema_name).apply(
        lambda args: (
            f'COPY INTO "{args[0]}"."{args[1]}"."WEBHOOK_EVENTS" '
            f'FROM (SELECT metadata$filename, metadata$file_last_modified, $1, sysdate() '
            f'FROM @"{args[0]}"."{args[1]}"."WEBHOOK_EVENTS_STAGE") '
            "FILE_FORMAT = (TYPE = JSON)"
        )
    )


@dataclass
class SnowpipePipeline:
    stage_name: pulumi.Output[str]
    table_name: pulumi.Output[str]
    pipe_name: pulumi.Output[str]

    def __init__(
        self,
        name: str,
        *,
        bucket_name: pulumi.Input[str],
        project: pulumi.Input[str],
        database: pulumi.Input[str],
        schema_name: pulumi.Input[str],
    ) -> None:
        preview_provider = snowflake.Provider(
            f"{name}-preview-provider",
            preview_features_enabled=[
                "snowflakeNotificationIntegrationResource",
                "snowflakePipeResource",
                "snowflakeStageExternalGcsResource",
                "snowflakeStorageIntegrationResource",
                "snowflakeTableResource",
            ],
        )
        preview_opts = pulumi.ResourceOptions(provider=preview_provider)

        table = snowflake.Table(
            f"{name}-table",
            database=database,
            schema=schema_name,
            name="WEBHOOK_EVENTS",
            columns=[
                snowflake.TableColumnArgs(name="FILENAME", type="STRING", nullable=False),
                snowflake.TableColumnArgs(
                    name="LAST_MODIFIED_AT",
                    type="TIMESTAMP_NTZ",
                    nullable=False,
                ),
                snowflake.TableColumnArgs(name="CONTENT", type="VARIANT"),
                snowflake.TableColumnArgs(name="LOADED_AT", type="TIMESTAMP_NTZ"),
            ],
            opts=preview_opts,
        )

        stage_url = pulumi.Output.from_input(bucket_name).apply(
            lambda current: f"gcs://{current}/incoming/"
        )
        storage_integration = snowflake.StorageIntegration(
            f"{name}-storage-integration",
            name="WEBHOOK_EVENTS_STORAGE_INTEGRATION",
            enabled=True,
            storage_provider="GCS",
            storage_allowed_locations=[stage_url],
            opts=preview_opts,
        )

        storage_member = storage_integration.storage_gcp_service_account.apply(
            lambda service_account: f"serviceAccount:{service_account}"
        )
        bucket_reader = gcp.storage.BucketIAMMember(
            f"{name}-bucket-reader",
            bucket=bucket_name,
            role="roles/storage.objectViewer",
            member=storage_member,
        )
        bucket_metadata_reader = gcp.storage.BucketIAMMember(
            f"{name}-bucket-metadata-reader",
            bucket=bucket_name,
            role="roles/storage.legacyBucketReader",
            member=storage_member,
        )

        stage = snowflake.StageExternalGcs(
            f"{name}-stage",
            database=database,
            schema=schema_name,
            name="WEBHOOK_EVENTS_STAGE",
            url=stage_url,
            storage_integration=storage_integration.name,
            opts=pulumi.ResourceOptions.merge(
                preview_opts,
                pulumi.ResourceOptions(depends_on=[bucket_reader, bucket_metadata_reader]),
            ),
        )

        topic = gcp.pubsub.Topic(f"{name}-events")
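        # Use the output form of the lookup so `project` may be a plain string
        # or a Pulumi Output, matching its pulumi.Input[str] annotation.
        gcs_account = gcp.storage.get_project_service_account_output(project=project)
        topic_publisher = gcp.pubsub.TopicIAMBinding(
            f"{name}-topic-publisher",
            topic=topic.id,
            role="roles/pubsub.publisher",
            members=[
                gcs_account.email_address.apply(lambda email: f"serviceAccount:{email}")
            ],
        )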

        subscription = gcp.pubsub.Subscription(
            f"{name}-subscription",
            topic=topic.id,
        )

        bucket_notification = gcp.storage.Notification(
            f"{name}-bucket-notification",
            bucket=bucket_name,
            payload_format="JSON_API_V1",
            topic=topic.id,
            event_types=["OBJECT_FINALIZE"],
            object_name_prefix="incoming/",
            opts=pulumi.ResourceOptions(depends_on=[topic_publisher]),
        )

        notification_integration = snowflake.NotificationIntegration(
            f"{name}-notification-integration",
            name="WEBHOOK_EVENTS_NOTIFICATION_INTEGRATION",
            enabled=True,
            notification_provider="GCP_PUBSUB",
            gcp_pubsub_subscription_name=subscription.id,
            opts=preview_opts,
        )

        notification_member = notification_integration.gcp_pubsub_service_account.apply(
            lambda service_account: f"serviceAccount:{service_account}"
        )
        subscription_reader = gcp.pubsub.SubscriptionIAMMember(
            f"{name}-subscription-reader",
            project=project,
            subscription=subscription.name,
            role="roles/pubsub.subscriber",
            member=notification_member,
        )
        monitoring_viewer = gcp.projects.IAMMember(
            f"{name}-monitoring-viewer",
            project=project,
            role="roles/monitoring.viewer",
            member=notification_member,
        )

        pipe = snowflake.Pipe(
            f"{name}-pipe",
            database=database,
            schema=schema_name,
            name="WEBHOOK_EVENTS_PIPE",
            auto_ingest=True,
            integration=notification_integration.name,
            copy_statement=_copy_into_statement(database, schema_name),
            opts=pulumi.ResourceOptions.merge(
                preview_opts,
                pulumi.ResourceOptions(
                    depends_on=[bucket_notification, subscription_reader, monitoring_viewer]
                ),
            ),
        )

        self.stage_name = stage.fully_qualified_name
        self.table_name = table.name
        self.pipe_name = pipe.name
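
For reference, it can help to see the statement the pipe ends up running once the database and schema outputs resolve. The sketch below mirrors _copy_into_statement with plain strings instead of Pulumi outputs; render_copy_statement is a hypothetical helper for inspection only and is not part of the component.

```python
def render_copy_statement(database: str, schema: str) -> str:
    """Plain-string equivalent of _copy_into_statement, for inspection only."""
    table = f'"{database}"."{schema}"."WEBHOOK_EVENTS"'
    stage = f'@"{database}"."{schema}"."WEBHOOK_EVENTS_STAGE"'
    return (
        f"COPY INTO {table} "
        # The four selected values line up with FILENAME, LAST_MODIFIED_AT,
        # CONTENT, and LOADED_AT in the table definition above.
        "FROM (SELECT metadata$filename, metadata$file_last_modified, $1, sysdate() "
        f"FROM {stage}) "
        "FILE_FORMAT = (TYPE = JSON)"
    )
```

Calling render_copy_statement("LANDING_ZONE_WEBHOOKS", "RAW") gives the statement for the default database and the RAW schema used by the entrypoint.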

components/webhook_ingestion.ts

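Provisions the public webhook endpoint: it packages cloudfunction/ into a zip, uploads it, and deploys the Cloud Function with its service account and IAM bindings. The deployed handler in cloudfunction/main.py is what accepts the request, validates the signature, and writes the raw event into the landing path for this setup.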

import AdmZip from "adm-zip";
import * as crypto from "crypto";
import * as fs from "fs";
import * as os from "os";
import * as path from "path";

import * as gcp from "@pulumi/gcp";
import * as pulumi from "@pulumi/pulumi";
import * as random from "@pulumi/random";

export interface WebhookIngestionArgs {
    bucketName: pulumi.Input<string>;
    project: pulumi.Input<string>;
    region: pulumi.Input<string>;
    webhookSecret: pulumi.Input<string>;
}

function createFunctionArchive(name: string): { archivePath: string; sourceMd5hash: string } {
    const zip = new AdmZip();
    zip.addLocalFile(path.join("cloudfunction", "main.py"), "", "main.py");
    zip.addLocalFile(path.join("cloudfunction", "requirements.txt"), "", "requirements.txt");

    const buffer = zip.toBuffer();
    const archivePath = path.join(os.tmpdir(), `${name}-cloudfunction.zip`);
    fs.writeFileSync(archivePath, buffer);

    return {
        archivePath,
        sourceMd5hash: crypto.createHash("md5").update(buffer).digest("base64"),
    };
}

export class WebhookIngestion {
    public readonly endpointUrl: pulumi.Output<string>;
    public readonly functionName: pulumi.Output<string>;

    constructor(name: string, args: WebhookIngestionArgs) {
        const { archivePath, sourceMd5hash } = createFunctionArchive(name);

        const serviceAccountSuffix = new random.RandomString(`${name}-service-account-suffix`, {
            length: 8,
            special: false,
            upper: false,
        });

        const serviceAccount = new gcp.serviceaccount.Account(`${name}-service-account`, {
            accountId: serviceAccountSuffix.result.apply((value) => `w2sf-${value}`),
            displayName: "Webhook ingestion function",
        });

        new gcp.storage.BucketIAMMember(`${name}-bucket-writer`, {
            bucket: args.bucketName,
            role: "roles/storage.objectCreator",
            member: serviceAccount.email.apply((email) => `serviceAccount:${email}`),
        });

        const sourceObject = new gcp.storage.BucketObject(`${name}-source-object`, {
            bucket: args.bucketName,
            name: "deployments/cloudfunction-source.zip",
            source: new pulumi.asset.FileAsset(archivePath),
            sourceMd5hash,
        });

        const fn = new gcp.cloudfunctionsv2.Function(`${name}-function`, {
            name: `webhook-to-snowflake-${name}`,
            location: args.region,
            buildConfig: {
                runtime: "python311",
                entryPoint: "webhook",
                source: {
                    storageSource: {
                        bucket: args.bucketName,
                        object: sourceObject.name,
                    },
                },
            },
            serviceConfig: {
                availableMemory: "256M",
                timeoutSeconds: 30,
                ingressSettings: "ALLOW_ALL",
                allTrafficOnLatestRevision: true,
                serviceAccountEmail: serviceAccount.email,
                environmentVariables: {
                    LANDING_BUCKET: args.bucketName,
                    LANDING_PREFIX: "incoming",
                    WEBHOOK_SECRET: args.webhookSecret,
                },
            },
        });

        new gcp.cloudfunctionsv2.FunctionIamMember(`${name}-function-invoker`, {
            project: args.project,
            location: fn.location,
            cloudFunction: fn.name,
            role: "roles/cloudfunctions.invoker",
            member: "allUsers",
        });

        new gcp.cloudrun.IamMember(`${name}-run-invoker`, {
            project: args.project,
            location: fn.location,
            service: fn.name,
            role: "roles/run.invoker",
            member: "allUsers",
        });

        this.endpointUrl = fn.serviceConfig.apply((sc) => sc?.uri ?? "");
        this.functionName = fn.name;
    }
}

components/snowpipe_pipeline.ts

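Creates the loading resources for this setup: the destination table, the storage integration and external GCS stage, the Pub/Sub topic, subscription, and bucket notification, and the auto-ingest pipe, along with the IAM bindings that let Snowflake read the bucket and the subscription.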

import * as gcp from "@pulumi/gcp";
import * as pulumi from "@pulumi/pulumi";
import * as snowflake from "@pulumi/snowflake";

export interface SnowpipePipelineArgs {
    bucketName: pulumi.Input<string>;
    project: pulumi.Input<string>;
    database: pulumi.Input<string>;
    schemaName: pulumi.Input<string>;
}

function copyIntoStatement(database: pulumi.Input<string>, schemaName: pulumi.Input<string>) {
    return pulumi.all([database, schemaName]).apply(([currentDatabase, currentSchema]) =>
        `COPY INTO "${currentDatabase}"."${currentSchema}"."WEBHOOK_EVENTS" ` +
        `FROM (SELECT metadata$filename, metadata$file_last_modified, $1, sysdate() ` +
        `FROM @"${currentDatabase}"."${currentSchema}"."WEBHOOK_EVENTS_STAGE") ` +
        "FILE_FORMAT = (TYPE = JSON)",
    );
}

export class SnowpipePipeline {
    public readonly stageName: pulumi.Output<string>;
    public readonly tableName: pulumi.Output<string>;
    public readonly pipeName: pulumi.Output<string>;

    constructor(name: string, args: SnowpipePipelineArgs) {
        const previewProvider = new snowflake.Provider(`${name}-preview-provider`, {
            previewFeaturesEnabled: [
                "snowflakeNotificationIntegrationResource",
                "snowflakePipeResource",
                "snowflakeStageExternalGcsResource",
                "snowflakeStorageIntegrationResource",
                "snowflakeTableResource",
            ],
        });
        const previewOpts = { provider: previewProvider };

        const table = new snowflake.Table(`${name}-table`, {
            database: args.database,
            schema: args.schemaName,
            name: "WEBHOOK_EVENTS",
            columns: [
                { name: "FILENAME", type: "STRING", nullable: false },
                { name: "LAST_MODIFIED_AT", type: "TIMESTAMP_NTZ", nullable: false },
                { name: "CONTENT", type: "VARIANT" },
                { name: "LOADED_AT", type: "TIMESTAMP_NTZ" },
            ],
        }, previewOpts);

        const stageUrl = pulumi.output(args.bucketName).apply((bucketName) => `gcs://${bucketName}/incoming/`);
        const storageIntegration = new snowflake.StorageIntegration(`${name}-storage-integration`, {
            name: "WEBHOOK_EVENTS_STORAGE_INTEGRATION",
            enabled: true,
            storageProvider: "GCS",
            storageAllowedLocations: [stageUrl],
        }, previewOpts);

        const storageMember = storageIntegration.storageGcpServiceAccount.apply((serviceAccount) => `serviceAccount:${serviceAccount}`);
        const bucketReader = new gcp.storage.BucketIAMMember(`${name}-bucket-reader`, {
            bucket: args.bucketName,
            role: "roles/storage.objectViewer",
            member: storageMember,
        });
        const bucketMetadataReader = new gcp.storage.BucketIAMMember(`${name}-bucket-metadata-reader`, {
            bucket: args.bucketName,
            role: "roles/storage.legacyBucketReader",
            member: storageMember,
        });

        const stage = new snowflake.StageExternalGcs(`${name}-stage`, {
            database: args.database,
            schema: args.schemaName,
            name: "WEBHOOK_EVENTS_STAGE",
            url: stageUrl,
            storageIntegration: storageIntegration.name,
        }, pulumi.mergeOptions(previewOpts, { dependsOn: [bucketReader, bucketMetadataReader] }));

        const topic = new gcp.pubsub.Topic(`${name}-events`);
        const gcsAccount = gcp.storage.getProjectServiceAccountOutput({ project: args.project });
        const topicPublisher = new gcp.pubsub.TopicIAMBinding(`${name}-topic-publisher`, {
            topic: topic.id,
            role: "roles/pubsub.publisher",
            members: [gcsAccount.emailAddress.apply((emailAddress) => `serviceAccount:${emailAddress}`)],
        });

        const subscription = new gcp.pubsub.Subscription(`${name}-subscription`, {
            topic: topic.id,
        });

        const bucketNotification = new gcp.storage.Notification(`${name}-bucket-notification`, {
            bucket: args.bucketName,
            payloadFormat: "JSON_API_V1",
            topic: topic.id,
            eventTypes: ["OBJECT_FINALIZE"],
            objectNamePrefix: "incoming/",
        }, { dependsOn: [topicPublisher] });

        const notificationIntegration = new snowflake.NotificationIntegration(`${name}-notification-integration`, {
            name: "WEBHOOK_EVENTS_NOTIFICATION_INTEGRATION",
            enabled: true,
            notificationProvider: "GCP_PUBSUB",
            gcpPubsubSubscriptionName: subscription.id,
        }, previewOpts);

        const notificationMember = notificationIntegration.gcpPubsubServiceAccount.apply((serviceAccount) => `serviceAccount:${serviceAccount}`);
        const subscriptionReader = new gcp.pubsub.SubscriptionIAMMember(`${name}-subscription-reader`, {
            project: args.project,
            subscription: subscription.name,
            role: "roles/pubsub.subscriber",
            member: notificationMember,
        });
        const monitoringViewer = new gcp.projects.IAMMember(`${name}-monitoring-viewer`, {
            project: args.project,
            role: "roles/monitoring.viewer",
            member: notificationMember,
        });

        const pipe = new snowflake.Pipe(`${name}-pipe`, {
            database: args.database,
            schema: args.schemaName,
            name: "WEBHOOK_EVENTS_PIPE",
            autoIngest: true,
            integration: notificationIntegration.name,
            copyStatement: copyIntoStatement(args.database, args.schemaName),
        }, pulumi.mergeOptions(previewOpts, { dependsOn: [bucketNotification, subscriptionReader, monitoringViewer] }));

        this.stageName = stage.fullyQualifiedName;
        this.tableName = table.name;
        this.pipeName = pipe.name;
    }
}

Verify the data landed

After you send a test event, query Snowflake to confirm the records are visible:

SELECT FILENAME,
       LAST_MODIFIED_AT,
       CONTENT,
       LOADED_AT
FROM LANDING_ZONE_WEBHOOKS.RAW.WEBHOOK_EVENTS
ORDER BY LOADED_AT DESC;

For this path, webhook payloads land in GCS first, then Snowflake auto-ingests new files from the Pub/Sub-backed notification path.
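
If rows do not show up, Snowflake's SYSTEM$PIPE_STATUS function returns a JSON string describing the pipe, including executionState and pendingFileCount. The sketch below interprets that string once you have fetched it with whatever Snowflake client you use, for example from SELECT SYSTEM$PIPE_STATUS('LANDING_ZONE_WEBHOOKS.RAW.WEBHOOK_EVENTS_PIPE'). The PipeHealth type and parse_pipe_status helper are hypothetical names for illustration.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class PipeHealth:
    """Small, illustrative view over the SYSTEM$PIPE_STATUS JSON."""

    execution_state: str
    pending_file_count: int

    @property
    def is_running(self) -> bool:
        return self.execution_state == "RUNNING"


def parse_pipe_status(raw: str) -> PipeHealth:
    """Parse the JSON string returned by SYSTEM$PIPE_STATUS."""
    status = json.loads(raw)
    return PipeHealth(
        # Fall back to placeholder values if a key is absent.
        execution_state=status.get("executionState", "UNKNOWN"),
        pending_file_count=int(status.get("pendingFileCount", 0)),
    )
```

A pipe that is running with a pending file count that keeps growing usually points at the load itself, while a pipe that is not running points at the pipe or its notification integration.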

Operating notes

  • Keep the first table as a raw landing zone. Flatten and model into downstream tables later.
  • Rotate the shared webhook secret when you roll senders or suspect exposure.
  • Watch the landing storage path and the pipe's load history (for example, SYSTEM$PIPE_STATUS and the COPY_HISTORY table function) so failed loads and malformed payloads do not go unnoticed.
  • Use a least-privilege Snowflake reader role for analysts instead of querying with the loading role.
  • If you switch to the batch loading variant, tune taskIntervalMinutes to match how quickly you want new files copied into Snowflake and how much warehouse activity you want between loads. This Snowpipe auto-ingest variant has no task interval to tune.
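
The raw-landing-zone note above can be made concrete with a small flattening step. The sketch assumes each CONTENT value holds one GitHub event payload unchanged; repository.full_name and sender.login are standard fields on GitHub webhook payloads, while the output keys and the flatten_event name are hypothetical. In practice you would more likely express the same projection in SQL against the VARIANT column.

```python
from typing import Any, Dict, Mapping, Optional


def flatten_event(content: Mapping[str, Any]) -> Dict[str, Optional[str]]:
    """Pick a few commonly queried fields out of a raw GitHub payload.

    Hypothetical helper: missing fields come back as None rather than raising,
    because not every event type carries every field (push has no action).
    """
    repository = content.get("repository") or {}
    sender = content.get("sender") or {}
    return {
        "repository": repository.get("full_name"),
        "sender": sender.get("login"),
        "action": content.get("action"),
    }
```

Keeping the raw row untouched and deriving a shape like this downstream means a new field can be added later without reloading anything.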

Frequently asked questions

When should I choose batch loading?

Choose batch loading when you want predictable load windows, lower always-on activity, or tighter control over when COPY INTO runs. The batch loading variant provisions a Snowflake task that runs once an hour so the path is still end to end; tune taskIntervalMinutes there if you want a tighter or looser cadence. This Snowpipe auto-ingest variant loads files as notifications arrive and has no task interval to tune.

Can I keep the raw payloads in cloud storage?

Yes. Every path writes the raw payloads to cloud storage before Snowflake loads them (S3 on AWS, Blob Storage on Azure, Cloud Storage on GCP). See the variant page you picked for specifics on how the loading path reads from or retains those objects.