Load LaunchDarkly events into Snowflake on AWS with batch COPY INTO

Switch variant

Choose a different cloud, loading mode, or webhook source.

Build a AWS-based webhook ingestion path that lands LaunchDarkly events in Snowflake with batch COPY INTO, plus blueprint downloads, reusable component code, and operating notes.

Download blueprint

Get this AWS + Batch COPY INTO + LaunchDarkly blueprint project as a zip. Switch Pulumi language here to keep the download aligned with the install commands and blueprint program on the page.

Download the Python blueprint with the matching Pulumi program, dependency files, and README.

Download Python blueprint

Download the TypeScript blueprint with the matching Pulumi program, dependency files, and README.

Download TypeScript blueprint

Download the Go blueprint with the matching Pulumi program, dependency files, and README.

Download Go blueprint

This guide shows how to land LaunchDarkly events in Snowflake on AWS with batch COPY INTO.

It covers the storage, eventing, and Snowflake objects for this setup so you can get raw events flowing first and shape downstream tables later.

On AWS, this guide uses AWS Lambda for the public handler and Amazon S3 for staging or backup storage. From there the loading path for this variant (batch COPY INTO) carries records into Snowflake.

The batch COPY INTO path stages files first and leaves loading under your control. Choose it when you want to run COPY INTO on a predictable cadence and keep the raw payloads in object storage before Snowflake loads them. This blueprint provisions the public Lambda URL, the S3 landing zone, the Snowflake stage and table, an X-Small warehouse, and a Snowflake task that runs COPY INTO every five minutes.

In the blueprint, the top-level program creates the Snowflake database, schema, and warehouse, then passes those names into the reusable batch component. That means you can keep the blueprint-created objects, rename them, or swap them for your own existing database, schema, and warehouse without rewriting the loading logic. The same entrypoint also sets taskIntervalMinutes, which controls how often the Snowflake task runs COPY INTO.

If you want to change those defaults before deploying, set stack config like this:

pulumi config set database LANDING_ZONE_WEBHOOKS
pulumi config set schema RAW
pulumi config set warehouse WEBHOOK_BATCH_LOADER
pulumi config set taskIntervalMinutes 60

This guide provisions a LaunchDarkly webhook for you and points it at the deployed public endpoint. The handler validates the X-LD-Signature HMAC before forwarding each payload into the selected ingestion path.

Quickstart

  1. Download the blueprint zip for your language below, or create a new Pulumi project with the same file layout shown in the Download section.
  2. Install dependencies for your selected language and configure Snowflake plus AWS.
  3. For batch setups, decide whether you want to keep the blueprint database, schema, and warehouse names or point the program at names you already use. If needed, also change taskIntervalMinutes to the cadence you want.
  4. Deploy the stack to create the public Lambda URL, the S3 landing bucket, and the Snowflake loading objects.
  5. Register the webhook source and send a test event.
  6. Query the landing table in Snowflake to confirm the event arrived.

After the first test event, new rows usually appear within an hour because the Snowflake task runs on an hourly cadence.

Prerequisites

  • a Pulumi account and the Pulumi CLI
  • an AWS account where you can create the Lambda, S3, IAM, and loading-path resources this variant provisions
  • a Snowflake account where you can create databases, schemas, and loading roles
  • a LaunchDarkly access token that can create webhooks and read the target resources you want to emit
  • no extra Pulumi config is required unless you want to narrow which LaunchDarkly events the webhook emits by editing the statements array on the webhook resource later

For the Pulumi language you selected:

Python 3.11 or newer and a virtual environment tool
Node.js 20 or newer and npm
Go 1.23 or newer

Initialize your stack for AWS with:

pulumi stack init dev
pulumi config set aws:region us-west-2

Set up credentials with Pulumi ESC

This guide needs cloud credentials, Snowflake credentials, and any source-specific token required to provision the webhook. A single ESC environment is usually the smallest setup that still keeps secrets out of local files.

values:
  aws:
    login:
      fn::open::aws-login:
        oidc:
          roleArn: arn:aws:iam::123456789012:role/pulumi-esc-oidc
          sessionName: webhook-snowflake
  snowflake:
    login:
      fn::open::snowflake-login:
        oidc:
          account: <your-snowflake-account>
          user: ESC_SERVICE_USER
    organizationName: <your-org-name>
    accountName: <your-account-name>
  launchdarkly:
    accessToken:
      fn::secret: <your-launchdarkly-access-token>
  environmentVariables:
    SNOWFLAKE_USER: ${snowflake.login.user}
    SNOWFLAKE_TOKEN: ${snowflake.login.token}
  pulumiConfig:
    snowflake:organizationName: ${snowflake.organizationName}
    snowflake:accountName: ${snowflake.accountName}
    snowflake:authenticator: OAUTH
    snowflake:role: PULUMI_DEPLOYER


    launchdarkly:accessToken: ${launchdarkly.accessToken}

Then reference it from your stack config:

environment:
  - <your-org>/<your-environment>
config:
  webhook-to-snowflake:database: LANDING_ZONE_WEBHOOKS

What you get in the download

The downloadable example zip includes:

  • Pulumi.yaml
  • the Pulumi program, dependency files, cloud runtime support files, and reusable components for the language you pick below
  • a README with a shorter quick start for this exact setup

For batch setups, the top-level program is where you choose the Snowflake database, schema, warehouse, and task cadence. The reusable batch component then builds the stage, destination table, and scheduled load inside those objects.

  • __main__.py as the Pulumi entrypoint
  • components/webhook_ingestion.py for the public webhook endpoint
  • components/batch_pipeline.py for the Snowflake loading path
  • lambda/webhook_handler.py for request validation and writes into the selected ingestion path
  • requirements.txt for the root Pulumi project
  • index.ts as the Pulumi entrypoint
  • components/webhook_ingestion.ts for the public webhook endpoint
  • components/batch_pipeline.ts for the Snowflake loading path
  • lambda/webhook_handler.py for request validation and writes into the selected ingestion path
  • package.json and tsconfig.json for the root Pulumi project
  • main.go as the Pulumi entrypoint
  • components/webhook_ingestion.go for the public webhook endpoint
  • components/batch_pipeline.go for the Snowflake loading path
  • lambda/webhook_handler.py for request validation and writes into the selected ingestion path
  • go.mod for the root Pulumi project

The next sections show the same entrypoint and component files that ship in the download.

Blueprint Pulumi program

This blueprint shows the full resource wiring for the AWS batch COPY INTO path with a LaunchDarkly source. The downloadable repo uses the same entrypoint and component files shown below.

import pulumi
import pulumi_aws as aws
import pulumi_random as random
import pulumi_snowflake as snowflake
import lbrlabs_pulumi_launchdarkly as launchdarkly

from components.batch_pipeline import BatchPipeline
from components.webhook_ingestion import WebhookIngestion

config = pulumi.Config()
database_name = config.get("database") or "LANDING_ZONE_WEBHOOKS"
schema_name = config.get("schema") or "RAW"
warehouse_name = config.get("warehouse") or "WEBHOOK_BATCH_LOADER"
task_interval_minutes = config.get_int("taskIntervalMinutes") or 60


bucket = aws.s3.Bucket("landing-bucket")
database = snowflake.Database("landing-db", name=database_name)
schema = snowflake.Schema("raw-schema", name=schema_name, database=database.name)
warehouse = snowflake.Warehouse(
    "batch-loader-warehouse",
    name=warehouse_name,
    warehouse_size="XSMALL",
    auto_resume="true",
    auto_suspend=60,
    initially_suspended=False,
)
webhook_secret = random.RandomPassword("webhook-secret", length=32, special=False)

ingestion = WebhookIngestion(
    "source-webhooks",
    bucket_name=bucket.bucket,
    bucket_arn=bucket.arn,
    webhook_secret=webhook_secret.result,
)

pipeline = BatchPipeline(
    "source-events",
    bucket_name=bucket.bucket,
    bucket_arn=bucket.arn,
    database=database.name,
    schema_name=schema.name,
    warehouse_name=warehouse.name,
    task_interval_minutes=task_interval_minutes,
)

endpoint_url = ingestion.function_url

launchdarkly.Webhook(
    "source-webhook",
    url=endpoint_url,
    name="snowflake-webhook",
    on=True,
    secret=webhook_secret.result,
    statements=[
        launchdarkly.WebhookStatementArgs(
            effect="allow",
            actions=["*"],
            resources=["proj/*:env/*:flag/*"],
        )
    ],
    tags=["pulumi", "snowflake"],
)

pulumi.export("landing_bucket_name", bucket.bucket)
pulumi.export("stage_name", pipeline.stage_name)
pulumi.export("table_name", pipeline.table_name)
pulumi.export("warehouse_name", warehouse.name)
pulumi.export("task_name", pipeline.task_name)
import * as aws from "@pulumi/aws";
import * as pulumi from "@pulumi/pulumi";
import * as random from "@pulumi/random";
import * as snowflake from "@pulumi/snowflake";
import * as launchdarkly from "@lbrlabs/pulumi-launchdarkly";

import { BatchPipeline } from "./components/batch_pipeline";
import { WebhookIngestion } from "./components/webhook_ingestion";

const config = new pulumi.Config();
const databaseName = config.get("database") ?? "LANDING_ZONE_WEBHOOKS";
const schemaNameValue = config.get("schema") ?? "RAW";
const warehouseNameValue = config.get("warehouse") ?? "WEBHOOK_BATCH_LOADER";
const taskIntervalMinutes = config.getNumber("taskIntervalMinutes") ?? 60;


const bucket = new aws.s3.Bucket("landing-bucket");
const database = new snowflake.Database("landing-db", { name: databaseName });
const schema = new snowflake.Schema("raw-schema", {
    name: schemaNameValue,
    database: database.name,
});
const warehouse = new snowflake.Warehouse("batch-loader-warehouse", {
    name: warehouseNameValue,
    warehouseSize: "XSMALL",
    autoResume: "true",
    autoSuspend: 60,
    initiallySuspended: false,
});
const sharedSecret = new random.RandomPassword("webhook-secret", {
    length: 32,
    special: false,
});

const ingestion = new WebhookIngestion("source-webhooks", {
    bucketName: bucket.bucket,
    bucketArn: bucket.arn,
    webhookSecret: sharedSecret.result,
});

const pipeline = new BatchPipeline("source-events", {
    bucketName: bucket.bucket,
    bucketArn: bucket.arn,
    database: database.name,
    schemaName: schema.name,
    warehouseName: warehouse.name,
    taskIntervalMinutes,
});

const endpointUrl = ingestion.functionUrl;

new launchdarkly.Webhook("source-webhook", {
    url: endpointUrl,
    name: "snowflake-webhook",
    on: true,
    secret: sharedSecret.result,
    statements: [
        {
            effect: "allow",
            actions: ["*"],
            resources: ["proj/*:env/*:flag/*"],
        },
    ],
    tags: ["pulumi", "snowflake"],
});

export const landingBucketName = bucket.bucket;
export const stageName = pipeline.stageName;
export const tableName = pipeline.tableName;
export const warehouseName = warehouse.name;
export const taskName = pipeline.taskName;
package main

import (
	"webhook-to-snowflake-aws-batch-launchdarkly/components"
	"github.com/lbrlabs/pulumi-launchdarkly/sdk/go/launchdarkly"
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/s3"
	"github.com/pulumi/pulumi-random/sdk/v4/go/random"
	"github.com/pulumi/pulumi-snowflake/sdk/v2/go/snowflake"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi/config"
)

func main() {
	pulumi.Run(Program)
}

func Program(ctx *pulumi.Context) error {
	cfg := config.New(ctx, "")
	databaseName := cfg.Get("database")
	if databaseName == "" {
		databaseName = "LANDING_ZONE_WEBHOOKS"
	}
	schemaName := cfg.Get("schema")
	if schemaName == "" {
		schemaName = "RAW"
	}
	warehouseName := cfg.Get("warehouse")
	if warehouseName == "" {
		warehouseName = "WEBHOOK_BATCH_LOADER"
	}
	taskIntervalMinutes := cfg.GetInt("taskIntervalMinutes")
	if taskIntervalMinutes == 0 {
		taskIntervalMinutes = 60
	}
	

	bucket, err := s3.NewBucket(ctx, "landing-bucket", nil)
	if err != nil {
		return err
	}

	database, err := snowflake.NewDatabase(ctx, "landing-db", &snowflake.DatabaseArgs{
		Name: pulumi.String(databaseName),
	})
	if err != nil {
		return err
	}

	schema, err := snowflake.NewSchema(ctx, "raw-schema", &snowflake.SchemaArgs{
		Name:     pulumi.String(schemaName),
		Database: database.Name,
	})
	if err != nil {
		return err
	}

	warehouse, err := snowflake.NewWarehouse(ctx, "batch-loader-warehouse", &snowflake.WarehouseArgs{
		Name:               pulumi.String(warehouseName),
		WarehouseSize:      pulumi.String("XSMALL"),
		AutoResume:         pulumi.String("true"),
		AutoSuspend:        pulumi.Int(60),
		InitiallySuspended: pulumi.Bool(false),
	})
	if err != nil {
		return err
	}

	sharedSecret, err := random.NewRandomPassword(ctx, "webhook-secret", &random.RandomPasswordArgs{
		Length:  pulumi.Int(32),
		Special: pulumi.Bool(false),
	})
	if err != nil {
		return err
	}

	ingestion, err := components.NewWebhookIngestion(ctx, "source-webhooks", &components.WebhookIngestionArgs{
		BucketName:    bucket.Bucket,
		BucketArn:     bucket.Arn,
		WebhookSecret: sharedSecret.Result,
	})
	if err != nil {
		return err
	}

	pipeline, err := components.NewBatchPipeline(ctx, "source-events", &components.BatchPipelineArgs{
		BucketName:          bucket.Bucket,
		BucketArn:           bucket.Arn,
		Database:            database.Name,
		SchemaName:          schema.Name,
		WarehouseName:       warehouse.Name,
		TaskIntervalMinutes: taskIntervalMinutes,
	})
	if err != nil {
		return err
	}

	endpointUrl := ingestion.FunctionUrl

	_, err = launchdarkly.NewWebhook(ctx, "source-webhook", &launchdarkly.WebhookArgs{
    Url:    endpointUrl,
    Name:   pulumi.String("snowflake-webhook"),
    On:     pulumi.Bool(true),
    Secret: sharedSecret.Result.ToStringPtrOutput(),
    Statements: launchdarkly.WebhookStatementArray{
        &launchdarkly.WebhookStatementArgs{
            Effect: pulumi.String("allow"),
            Actions: pulumi.StringArray{
                pulumi.String("*"),
            },
            Resources: pulumi.StringArray{
                pulumi.String("proj/*:env/*:flag/*"),
            },
        },
    },
    Tags: pulumi.StringArray{
        pulumi.String("pulumi"),
        pulumi.String("snowflake"),
    },
})
if err != nil {
    return err
}

	ctx.Export("landingBucketName", bucket.Bucket)
	ctx.Export("stageName", pipeline.StageName)
	ctx.Export("tableName", pipeline.TableName)
	ctx.Export("warehouseName", warehouse.Name)
	ctx.Export("taskName", pipeline.TaskName)
	return nil
}

Reusable components

The entrypoint stays small because the real ingestion work lives in reusable modules. These are the same component files packaged in the downloadable blueprint for this setup.

components/webhook_ingestion.py

Accepts the public webhook request, validates the signature, normalizes the payload, and writes the raw event into the landing path for this setup.

from __future__ import annotations

from dataclasses import dataclass

import pulumi
import pulumi_aws as aws


@dataclass
class WebhookIngestion:
    function_url: pulumi.Output[str]
    function_name: pulumi.Output[str]

    def __init__(
        self,
        name: str,
        *,
        bucket_name: pulumi.Input[str],
        bucket_arn: pulumi.Input[str],
        webhook_secret: pulumi.Input[str],
        firehose_stream_name: pulumi.Input[str] | None = None,
        firehose_stream_arn: pulumi.Input[str] | None = None,
    ) -> None:
        policy_statements = [
            aws.iam.GetPolicyDocumentStatementArgs(
                effect="Allow",
                actions=[
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                ],
                resources=["arn:aws:logs:*:*:*"],
            ),
            aws.iam.GetPolicyDocumentStatementArgs(
                effect="Allow",
                actions=["s3:PutObject"],
                resources=[pulumi.Output.from_input(bucket_arn).apply(lambda arn: f"{arn}/incoming/*")],
            ),
        ]
        if firehose_stream_arn is not None:
            policy_statements.append(
                aws.iam.GetPolicyDocumentStatementArgs(
                    effect="Allow",
                    actions=["firehose:PutRecord", "firehose:PutRecordBatch"],
                    resources=[firehose_stream_arn],
                )
            )

        assume_role_policy = aws.iam.get_policy_document_output(
            statements=[
                aws.iam.GetPolicyDocumentStatementArgs(
                    effect="Allow",
                    principals=[
                        aws.iam.GetPolicyDocumentStatementPrincipalArgs(
                            type="Service",
                            identifiers=["lambda.amazonaws.com"],
                        )
                    ],
                    actions=["sts:AssumeRole"],
                )
            ]
        )

        role = aws.iam.Role(
            f"{name}-role",
            assume_role_policy=assume_role_policy.json,
        )

        policy = aws.iam.get_policy_document_output(
            statements=policy_statements
        )

        environment_variables = {
            "LANDING_BUCKET": bucket_name,
            "LANDING_PREFIX": "incoming",
            "WEBHOOK_SECRET": webhook_secret,
        }
        if firehose_stream_name is not None:
            environment_variables["FIREHOSE_STREAM_NAME"] = firehose_stream_name

        aws.iam.RolePolicy(
            f"{name}-policy",
            role=role.id,
            policy=policy.json,
        )

        function = aws.lambda_.Function(
            f"{name}-function",
            runtime="python3.11",
            role=role.arn,
            handler="webhook_handler.handler",
            timeout=30,
            memory_size=256,
            code=pulumi.AssetArchive(
                {"webhook_handler.py": pulumi.FileAsset("lambda/webhook_handler.py")}
            ),
            environment=aws.lambda_.FunctionEnvironmentArgs(
                variables=environment_variables
            ),
        )

        function_url = aws.lambda_.FunctionUrl(
            f"{name}-url",
            authorization_type="NONE",
            function_name=function.name,
            cors=aws.lambda_.FunctionUrlCorsArgs(
                allow_methods=["POST"],
                allow_origins=["*"],
            ),
        )

        self.function_url = function_url.function_url
        self.function_name = function.name

components/batch_pipeline.py

Creates the Snowflake-side loading resources for this setup: the landing stage, the destination table, and the batch COPY INTO loading path.

from __future__ import annotations

from dataclasses import dataclass

import pulumi
import pulumi_aws as aws
import pulumi_snowflake as snowflake


def _copy_into_statement(database: pulumi.Input[str], schema_name: pulumi.Input[str]) -> pulumi.Output[str]:
    return pulumi.Output.all(database, schema_name).apply(
        lambda args: (
            f'COPY INTO "{args[0]}"."{args[1]}"."WEBHOOK_EVENTS" '
            f'FROM (SELECT metadata$filename, metadata$file_last_modified, $1, sysdate() '
            f'FROM @"{args[0]}"."{args[1]}"."WEBHOOK_EVENTS_STAGE") '
            "FILE_FORMAT = (TYPE = JSON)"
        )
    )


@dataclass
class BatchPipeline:
    stage_name: pulumi.Output[str]
    table_name: pulumi.Output[str]
    task_name: pulumi.Output[str]

    def __init__(
        self,
        name: str,
        *,
        bucket_name: pulumi.Input[str],
        bucket_arn: pulumi.Input[str],
        database: pulumi.Input[str],
        schema_name: pulumi.Input[str],
        warehouse_name: pulumi.Input[str],
        task_interval_minutes: int,
    ) -> None:
        loader_user = aws.iam.User(f"{name}-stage-user")
        loader_key = aws.iam.AccessKey(f"{name}-stage-key", user=loader_user.name)

        loader_policy = aws.iam.get_policy_document_output(
            statements=[
                aws.iam.GetPolicyDocumentStatementArgs(
                    effect="Allow",
                    actions=["s3:ListBucket"],
                    resources=[bucket_arn],
                ),
                aws.iam.GetPolicyDocumentStatementArgs(
                    effect="Allow",
                    actions=["s3:GetObject"],
                    resources=[pulumi.Output.from_input(bucket_arn).apply(lambda arn: f"{arn}/incoming/*")],
                ),
            ]
        )

        aws.iam.UserPolicy(
            f"{name}-stage-policy",
            user=loader_user.name,
            policy=loader_policy.json,
        )

        table = snowflake.Table(
            f"{name}-table",
            database=database,
            schema=schema_name,
            name="WEBHOOK_EVENTS",
            columns=[
                snowflake.TableColumnArgs(name="FILENAME", type="STRING", nullable=False),
                snowflake.TableColumnArgs(
                    name="LAST_MODIFIED_AT",
                    type="TIMESTAMP_NTZ",
                    nullable=False,
                ),
                snowflake.TableColumnArgs(name="CONTENT", type="VARIANT"),
                snowflake.TableColumnArgs(name="LOADED_AT", type="TIMESTAMP_NTZ"),
            ],
        )

        stage = snowflake.StageExternalS3(
            f"{name}-stage",
            database=database,
            schema=schema_name,
            name="WEBHOOK_EVENTS_STAGE",
            url=pulumi.Output.from_input(bucket_name).apply(lambda current: f"s3://{current}/incoming/"),
            credentials=snowflake.StageExternalS3CredentialsArgs(
                aws_key_id=loader_key.id,
                aws_secret_key=loader_key.secret,
            ),
        )

        task = snowflake.Task(
            f"{name}-task",
            database=database,
            schema=schema_name,
            name="WEBHOOK_EVENTS_TASK",
            warehouse=warehouse_name,
            started=True,
            schedule={"minutes": task_interval_minutes},
            sql_statement=_copy_into_statement(database, schema_name),
        )

        self.stage_name = stage.fully_qualified_name
        self.table_name = table.name
        self.warehouse_name = pulumi.Output.from_input(warehouse_name)
        self.task_name = task.name

components/webhook_ingestion.ts

Accepts the public webhook request, validates the signature, normalizes the payload, and writes the raw event into the landing path for this setup.

import * as aws from "@pulumi/aws";
import * as pulumi from "@pulumi/pulumi";

export interface WebhookIngestionArgs {
    bucketName: pulumi.Input<string>;
    bucketArn: pulumi.Input<string>;
    webhookSecret: pulumi.Input<string>;
    firehoseStreamName?: pulumi.Input<string>;
    firehoseStreamArn?: pulumi.Input<string>;
}

export class WebhookIngestion {
    public readonly functionUrl: pulumi.Output<string>;
    public readonly functionName: pulumi.Output<string>;

    constructor(name: string, args: WebhookIngestionArgs) {
        const assumeRole = aws.iam.getPolicyDocumentOutput({
            statements: [
                {
                    effect: "Allow",
                    principals: [
                        {
                            type: "Service",
                            identifiers: ["lambda.amazonaws.com"],
                        },
                    ],
                    actions: ["sts:AssumeRole"],
                },
            ],
        });

        const role = new aws.iam.Role(`${name}-role`, {
            assumeRolePolicy: assumeRole.json,
        });

        const statements: aws.types.input.iam.GetPolicyDocumentStatementArgs[] = [
            {
                effect: "Allow",
                actions: [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                ],
                resources: ["arn:aws:logs:*:*:*"],
            },
            {
                effect: "Allow",
                actions: ["s3:PutObject"],
                resources: [pulumi.interpolate`${args.bucketArn}/incoming/*`],
            },
        ];
        if (args.firehoseStreamArn) {
            statements.push({
                effect: "Allow",
                actions: ["firehose:PutRecord", "firehose:PutRecordBatch"],
                resources: [args.firehoseStreamArn],
            });
        }

        const policy = aws.iam.getPolicyDocumentOutput({
            statements,
        });

        const environmentVariables: Record<string, pulumi.Input<string>> = {
            LANDING_BUCKET: args.bucketName,
            LANDING_PREFIX: "incoming",
            WEBHOOK_SECRET: args.webhookSecret,
        };
        if (args.firehoseStreamName) {
            environmentVariables.FIREHOSE_STREAM_NAME = args.firehoseStreamName;
        }

        new aws.iam.RolePolicy(`${name}-policy`, {
            role: role.id,
            policy: policy.json,
        });

        const fn = new aws.lambda.Function(`${name}-function`, {
            runtime: "python3.11",
            role: role.arn,
            handler: "webhook_handler.handler",
            timeout: 30,
            memorySize: 256,
            code: new pulumi.asset.AssetArchive({
                "webhook_handler.py": new pulumi.asset.FileAsset("lambda/webhook_handler.py"),
            }),
            environment: {
                variables: environmentVariables,
            },
        });

        const url = new aws.lambda.FunctionUrl(`${name}-url`, {
            authorizationType: "NONE",
            functionName: fn.name,
            cors: {
                allowMethods: ["POST"],
                allowOrigins: ["*"],
            },
        });

        this.functionUrl = url.functionUrl;
        this.functionName = fn.name;
    }
}

components/batch_pipeline.ts

Creates the Snowflake-side loading resources for this setup: the landing stage, the destination table, and the batch COPY INTO loading path.

import * as aws from "@pulumi/aws";
import * as pulumi from "@pulumi/pulumi";
import * as snowflake from "@pulumi/snowflake";

export interface BatchPipelineArgs {
    bucketName: pulumi.Input<string>;
    bucketArn: pulumi.Input<string>;
    database: pulumi.Input<string>;
    schemaName: pulumi.Input<string>;
    warehouseName: pulumi.Input<string>;
    taskIntervalMinutes: number;
}

function copyIntoStatement(database: pulumi.Input<string>, schemaName: pulumi.Input<string>) {
    return pulumi.all([database, schemaName]).apply(([currentDatabase, currentSchema]) =>
        `COPY INTO "${currentDatabase}"."${currentSchema}"."WEBHOOK_EVENTS" ` +
        `FROM (SELECT metadata$filename, metadata$file_last_modified, $1, sysdate() ` +
        `FROM @"${currentDatabase}"."${currentSchema}"."WEBHOOK_EVENTS_STAGE") ` +
        "FILE_FORMAT = (TYPE = JSON)",
    );
}

export class BatchPipeline {
    public readonly stageName: pulumi.Output<string>;
    public readonly tableName: pulumi.Output<string>;
    public readonly warehouseName: pulumi.Output<string>;
    public readonly taskName: pulumi.Output<string>;

    constructor(name: string, args: BatchPipelineArgs) {
        const loaderUser = new aws.iam.User(`${name}-stage-user`);
        const loaderKey = new aws.iam.AccessKey(`${name}-stage-key`, {
            user: loaderUser.name,
        });

        const loaderPolicy = aws.iam.getPolicyDocumentOutput({
            statements: [
                {
                    effect: "Allow",
                    actions: ["s3:ListBucket"],
                    resources: [args.bucketArn],
                },
                {
                    effect: "Allow",
                    actions: ["s3:GetObject"],
                    resources: [pulumi.interpolate`${args.bucketArn}/incoming/*`],
                },
            ],
        });

        new aws.iam.UserPolicy(`${name}-stage-policy`, {
            user: loaderUser.name,
            policy: loaderPolicy.json,
        });

        const table = new snowflake.Table(`${name}-table`, {
            database: args.database,
            schema: args.schemaName,
            name: "WEBHOOK_EVENTS",
            columns: [
                { name: "FILENAME", type: "STRING", nullable: false },
                { name: "LAST_MODIFIED_AT", type: "TIMESTAMP_NTZ", nullable: false },
                { name: "CONTENT", type: "VARIANT" },
                { name: "LOADED_AT", type: "TIMESTAMP_NTZ" },
            ],
        });

        const stage = new snowflake.StageExternalS3(`${name}-stage`, {
            database: args.database,
            schema: args.schemaName,
            name: "WEBHOOK_EVENTS_STAGE",
            url: pulumi.interpolate`s3://${args.bucketName}/incoming/`,
            credentials: {
                awsKeyId: loaderKey.id,
                awsSecretKey: loaderKey.secret,
            },
        });

        const task = new snowflake.Task(`${name}-task`, {
            database: args.database,
            schema: args.schemaName,
            name: "WEBHOOK_EVENTS_TASK",
            warehouse: args.warehouseName,
            started: true,
            schedule: { minutes: args.taskIntervalMinutes },
            sqlStatement: copyIntoStatement(args.database, args.schemaName),
        });

        this.stageName = stage.fullyQualifiedName;
        this.tableName = table.name;
        this.warehouseName = pulumi.output(args.warehouseName);
        this.taskName = task.name;
    }
}

components/webhook_ingestion.go

Accepts the public webhook request, validates the signature, normalizes the payload, and writes the raw event into the landing path for this setup.

package components

import (
	"fmt"

	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/iam"
	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

type WebhookIngestionArgs struct {
	BucketName    pulumi.StringInput
	BucketArn     pulumi.StringInput
	WebhookSecret pulumi.StringInput
}

type WebhookIngestion struct {
	FunctionUrl  pulumi.StringOutput
	FunctionName pulumi.StringOutput
}

func NewWebhookIngestion(ctx *pulumi.Context, name string, args *WebhookIngestionArgs) (*WebhookIngestion, error) {
	assumeRolePolicy := `{
	  "Version": "2012-10-17",
	  "Statement": [
	    {
	      "Effect": "Allow",
	      "Principal": { "Service": "lambda.amazonaws.com" },
	      "Action": "sts:AssumeRole"
	    }
	  ]
	}`

	role, err := iam.NewRole(ctx, fmt.Sprintf("%s-role", name), &iam.RoleArgs{
		AssumeRolePolicy: pulumi.String(assumeRolePolicy),
	})
	if err != nil {
		return nil, err
	}

	policy := pulumi.All(args.BucketArn).ApplyT(func(values []interface{}) (string, error) {
		bucketArn := values[0].(string)
		return fmt.Sprintf(`{
	  "Version": "2012-10-17",
	  "Statement": [
	    {
	      "Effect": "Allow",
	      "Action": [
	        "logs:CreateLogGroup",
	        "logs:CreateLogStream",
	        "logs:PutLogEvents"
	      ],
	      "Resource": "arn:aws:logs:*:*:*"
	    },
	    {
	      "Effect": "Allow",
	      "Action": ["s3:PutObject"],
	      "Resource": "%s/incoming/*"
	    }
	  ]
	}`, bucketArn), nil
	}).(pulumi.StringOutput)

	_, err = iam.NewRolePolicy(ctx, fmt.Sprintf("%s-policy", name), &iam.RolePolicyArgs{
		Role:   role.ID(),
		Policy: policy,
	})
	if err != nil {
		return nil, err
	}

	function, err := lambda.NewFunction(ctx, fmt.Sprintf("%s-function", name), &lambda.FunctionArgs{
		Runtime:    pulumi.String("python3.11"),
		Role:       role.Arn,
		Handler:    pulumi.String("webhook_handler.handler"),
		Timeout:    pulumi.Int(30),
		MemorySize: pulumi.Int(256),
		Code:       pulumi.NewFileArchive("lambda"),
		Environment: &lambda.FunctionEnvironmentArgs{
			Variables: pulumi.StringMap{
				"LANDING_BUCKET":  args.BucketName,
				"LANDING_PREFIX":  pulumi.String("incoming"),
				"WEBHOOK_SECRET": args.WebhookSecret,
			},
		},
	})
	if err != nil {
		return nil, err
	}

	functionUrl, err := lambda.NewFunctionUrl(ctx, fmt.Sprintf("%s-url", name), &lambda.FunctionUrlArgs{
		AuthorizationType: pulumi.String("NONE"),
		FunctionName:      function.Name,
		Cors: &lambda.FunctionUrlCorsArgs{
			AllowMethods: pulumi.StringArray{pulumi.String("POST")},
			AllowOrigins: pulumi.StringArray{pulumi.String("*")},
		},
	})
	if err != nil {
		return nil, err
	}

	return &WebhookIngestion{
		FunctionUrl:  functionUrl.FunctionUrl,
		FunctionName: function.Name,
	}, nil
}

components/batch_pipeline.go

Creates the Snowflake-side loading resources for this setup: the landing stage, the destination table, and the batch COPY INTO loading path.

package components

import (
	"fmt"

	"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/iam"
	"github.com/pulumi/pulumi-snowflake/sdk/v2/go/snowflake"
	"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)

type BatchPipelineArgs struct {
	BucketName pulumi.StringInput
	BucketArn  pulumi.StringInput
	Database   pulumi.StringInput
	SchemaName pulumi.StringInput
	WarehouseName pulumi.StringInput
	TaskIntervalMinutes int
}

type BatchPipeline struct {
	StageName     pulumi.StringOutput
	TableName     pulumi.StringOutput
	WarehouseName pulumi.StringOutput
	TaskName      pulumi.StringOutput
}

func batchCopyIntoStatement(database pulumi.StringInput, schemaName pulumi.StringInput) pulumi.StringOutput {
	return pulumi.All(database, schemaName).ApplyT(func(values []interface{}) (string, error) {
		currentDatabase := values[0].(string)
		currentSchema := values[1].(string)
		return fmt.Sprintf(
			"COPY INTO \"%s\".\"%s\".\"WEBHOOK_EVENTS\" FROM (SELECT metadata$filename, metadata$file_last_modified, $1, sysdate() FROM @\"%s\".\"%s\".\"WEBHOOK_EVENTS_STAGE\") FILE_FORMAT = (TYPE = JSON)",
			currentDatabase,
			currentSchema,
			currentDatabase,
			currentSchema,
		), nil
	}).(pulumi.StringOutput)
}

func NewBatchPipeline(ctx *pulumi.Context, name string, args *BatchPipelineArgs) (*BatchPipeline, error) {
	loaderUser, err := iam.NewUser(ctx, fmt.Sprintf("%s-stage-user", name), nil)
	if err != nil {
		return nil, err
	}

	loaderKey, err := iam.NewAccessKey(ctx, fmt.Sprintf("%s-stage-key", name), &iam.AccessKeyArgs{
		User: loaderUser.Name,
	})
	if err != nil {
		return nil, err
	}

	loaderPolicy := pulumi.All(args.BucketArn).ApplyT(func(values []interface{}) (string, error) {
		bucketArn := values[0].(string)
		return fmt.Sprintf(`{
	  "Version": "2012-10-17",
	  "Statement": [
	    {
	      "Effect": "Allow",
	      "Action": ["s3:ListBucket"],
	      "Resource": "%s"
	    },
	    {
	      "Effect": "Allow",
	      "Action": ["s3:GetObject"],
	      "Resource": "%s/incoming/*"
	    }
	  ]
	}`, bucketArn, bucketArn), nil
	}).(pulumi.StringOutput)

	_, err = iam.NewUserPolicy(ctx, fmt.Sprintf("%s-stage-policy", name), &iam.UserPolicyArgs{
		User:   loaderUser.Name,
		Policy: loaderPolicy,
	})
	if err != nil {
		return nil, err
	}

	table, err := snowflake.NewTable(ctx, fmt.Sprintf("%s-table", name), &snowflake.TableArgs{
		Database: args.Database,
		Schema:   args.SchemaName,
		Name:     pulumi.String("WEBHOOK_EVENTS"),
		Columns: snowflake.TableColumnArray{
			&snowflake.TableColumnArgs{Name: pulumi.String("FILENAME"), Type: pulumi.String("STRING"), Nullable: pulumi.Bool(false)},
			&snowflake.TableColumnArgs{Name: pulumi.String("LAST_MODIFIED_AT"), Type: pulumi.String("TIMESTAMP_NTZ"), Nullable: pulumi.Bool(false)},
			&snowflake.TableColumnArgs{Name: pulumi.String("CONTENT"), Type: pulumi.String("VARIANT")},
			&snowflake.TableColumnArgs{Name: pulumi.String("LOADED_AT"), Type: pulumi.String("TIMESTAMP_NTZ")},
		},
	})
	if err != nil {
		return nil, err
	}

	stage, err := snowflake.NewStageExternalS3(ctx, fmt.Sprintf("%s-stage", name), &snowflake.StageExternalS3Args{
		Database: args.Database,
		Schema:   args.SchemaName,
		Name:     pulumi.String("WEBHOOK_EVENTS_STAGE"),
		Url:      pulumi.Sprintf("s3://%s/incoming/", args.BucketName),
		Credentials: &snowflake.StageExternalS3CredentialsArgs{
			AwsKeyId:     loaderKey.ID().ToStringOutput(),
			AwsSecretKey: loaderKey.Secret,
		},
	})
	if err != nil {
		return nil, err
	}

	task, err := snowflake.NewTask(ctx, fmt.Sprintf("%s-task", name), &snowflake.TaskArgs{
		Database:     args.Database,
		Schema:       args.SchemaName,
		Name:         pulumi.String("WEBHOOK_EVENTS_TASK"),
		Warehouse:    args.WarehouseName,
		Started:      pulumi.Bool(true),
		Schedule:     &snowflake.TaskScheduleArgs{Minutes: pulumi.Int(args.TaskIntervalMinutes)},
		SqlStatement: batchCopyIntoStatement(args.Database, args.SchemaName),
	})
	if err != nil {
		return nil, err
	}

	return &BatchPipeline{
		StageName:     stage.FullyQualifiedName,
		TableName:     table.Name,
		WarehouseName: args.WarehouseName.ToStringOutput(),
		TaskName:      task.Name,
	}, nil
}

Verify the data landed

After you send a test event, query Snowflake to confirm the records are visible:

SELECT FILENAME,
       LAST_MODIFIED_AT,
       CONTENT,
       LOADED_AT
FROM LANDING_ZONE_WEBHOOKS.RAW.WEBHOOK_EVENTS
ORDER BY LOADED_AT DESC;

For this path, payloads stay in S3 until the Snowflake task runs COPY INTO against the external stage.

Operating notes

  • Keep the first table as a raw landing zone. Flatten and model into downstream tables later.
  • Rotate the shared webhook secret when you roll senders or suspect exposure.
  • Watch the landing storage path and Snowflake task history so failed loads and malformed payloads do not go unnoticed.
  • Use a least-privilege Snowflake reader role for analysts instead of querying with the loading role.
  • When you choose batch loading, tune taskIntervalMinutes to match how quickly you want new files copied into Snowflake and how much warehouse activity you want between loads.

Frequently asked questions

When should I choose batch loading?
Choose batch loading when you want predictable load windows, lower always-on activity, or tighter control over when COPY INTO runs. This blueprint provisions a Snowflake task that runs once an hour so the path is still end to end. Tune taskIntervalMinutes if you want a tighter or looser cadence.
Can I keep the raw payloads in cloud storage?
Yes. Every path writes the raw payloads to cloud storage before Snowflake loads them (S3 on AWS, Blob Storage on Azure, Cloud Storage on GCP). See the variant page you picked for specifics on how the loading path reads from or retains those objects.