This guide shows how to land GitHub events in Snowflake on AWS with Snowpipe auto-ingest.
It covers the storage, eventing, and Snowflake objects for this setup so you can get raw events flowing first and shape downstream tables later.
On AWS, this guide uses AWS Lambda for the public handler and Amazon S3 for staging or backup storage. From there the loading path for this variant (Snowpipe auto-ingest) carries records into Snowflake.
The Snowpipe auto-ingest path keeps object storage in the middle. Your webhook handler writes files to cloud storage, and Snowflake auto-ingests them. It is a good fit when you want a durable raw archive or need a cloud-native queueing and retry story before Snowflake loads the data. This blueprint provisions the public Lambda URL, the S3 landing zone, the Snowflake stage and table, and an auto-ingest pipe wired to S3 object-created notifications.
This guide provisions the GitHub webhook for you and points it at the deployed public endpoint so repository events land in Snowflake on the first deploy.
Quickstart
- Download the blueprint zip for your language below, or create a new Pulumi project with the same file layout shown in the Download section.
- Install dependencies for your selected language and configure Snowflake plus AWS.
- Deploy the stack to create the public Lambda URL, the S3 landing bucket, and the Snowflake loading objects.
- Register the webhook source and send a test event.
- Query the landing table in Snowflake to confirm the event arrived.
After the first test event, new rows usually appear in about two minutes.
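If you want to send a test event by hand instead of waiting for GitHub, note that the handler validates GitHub's X-Hub-Signature-256 header, which is an HMAC-SHA256 of the raw request body keyed by the shared webhook secret. A minimal sketch of computing that header (the secret and payload here are placeholders, substitute your stack's generated secret and function URL):

```python
import hashlib
import hmac
import json

def github_signature(secret: str, body: bytes) -> str:
    """Compute the X-Hub-Signature-256 header value GitHub sends for a payload."""
    digest = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return f"sha256={digest}"

# Hypothetical test payload; POST it to the deployed function URL with
# Content-Type: application/json and the X-Hub-Signature-256 header below.
body = json.dumps({"zen": "manual test event"}).encode()
print(github_signature("<your-webhook-secret>", body))
```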
Prerequisites
- a Pulumi account and the Pulumi CLI
- an AWS account where you can create the Lambda, S3, IAM, and loading-path resources this variant provisions
- a Snowflake account where you can create databases, schemas, and loading roles
- a GitHub personal access token or app credential that can manage repository webhooks
- the GitHub repository name you want to connect, set with
pulumi config set webhook-repo <repo>
Initialize your stack for AWS with:
pulumi stack init dev
pulumi config set aws:region us-west-2
Set up credentials with Pulumi ESC
This guide needs cloud credentials, Snowflake credentials, and any source-specific token required to provision the webhook. A single ESC environment is usually the smallest setup that still keeps secrets out of local files.
values:
  aws:
    login:
      fn::open::aws-login:
        oidc:
          roleArn: arn:aws:iam::123456789012:role/pulumi-esc-oidc
          sessionName: webhook-snowflake
  snowflake:
    login:
      fn::open::snowflake-login:
        oidc:
          account: <your-snowflake-account>
          user: ESC_SERVICE_USER
    organizationName: <your-org-name>
    accountName: <your-account-name>
  github:
    token:
      fn::secret: <your-github-token>
    owner: <your-github-org-or-user>
  environmentVariables:
    SNOWFLAKE_USER: ${snowflake.login.user}
    SNOWFLAKE_TOKEN: ${snowflake.login.token}
  pulumiConfig:
    snowflake:organizationName: ${snowflake.organizationName}
    snowflake:accountName: ${snowflake.accountName}
    snowflake:authenticator: OAUTH
    snowflake:role: PULUMI_DEPLOYER
    github:token: ${github.token}
    github:owner: ${github.owner}
Then reference it from your stack config:
environment:
  - <your-org>/<your-environment>
config:
  webhook-to-snowflake:database: LANDING_ZONE_WEBHOOKS
What you get in the download
The downloadable example zip includes:
- Pulumi.yaml, the Pulumi program, dependency files, cloud runtime support files, and reusable components for the language you pick below
- a README with a shorter quick start for this exact setup
For Python:
- __main__.py as the Pulumi entrypoint
- components/webhook_ingestion.py for the public webhook endpoint
- components/snowpipe_pipeline.py for the Snowflake loading path
- lambda/webhook_handler.py for request validation and writes into the selected ingestion path
- requirements.txt for the root Pulumi project
For TypeScript:
- index.ts as the Pulumi entrypoint
- components/webhook_ingestion.ts for the public webhook endpoint
- components/snowpipe_pipeline.ts for the Snowflake loading path
- lambda/webhook_handler.py for request validation and writes into the selected ingestion path
- package.json and tsconfig.json for the root Pulumi project
For Go:
- main.go as the Pulumi entrypoint
- components/webhook_ingestion.go for the public webhook endpoint
- components/snowpipe_pipeline.go for the Snowflake loading path
- lambda/webhook_handler.py for request validation and writes into the selected ingestion path
- go.mod for the root Pulumi project
The next sections show the same entrypoint and component files that ship in the download.
Blueprint Pulumi program
This blueprint shows the full resource wiring for the AWS Snowpipe auto-ingest path with a GitHub source. The downloadable repo uses the same entrypoint and component files shown below.
import pulumi
import pulumi_aws as aws
import pulumi_random as random
import pulumi_snowflake as snowflake
import pulumi_github as github
from components.snowpipe_pipeline import SnowpipePipeline
from components.webhook_ingestion import WebhookIngestion
config = pulumi.Config()
database_name = config.get("database") or "LANDING_ZONE_WEBHOOKS"
webhook_repo = config.require("webhook-repo")
bucket = aws.s3.Bucket("landing-bucket")
database = snowflake.Database("landing-db", name=database_name)
schema = snowflake.Schema("raw-schema", name="RAW", database=database.name)
webhook_secret = random.RandomPassword("webhook-secret", length=32, special=False)
ingestion = WebhookIngestion(
"source-webhooks",
bucket_name=bucket.bucket,
bucket_arn=bucket.arn,
webhook_secret=webhook_secret.result,
)
pipeline = SnowpipePipeline(
"source-events",
bucket_id=bucket.id,
bucket_name=bucket.bucket,
bucket_arn=bucket.arn,
database=database.name,
schema_name=schema.name,
)
endpoint_url = ingestion.function_url
github.RepositoryWebhook(
"source-webhook",
repository=webhook_repo,
configuration=github.RepositoryWebhookConfigurationArgs(
url=endpoint_url,
content_type="json",
secret=webhook_secret.result,
),
events=["push", "pull_request", "issues", "star"],
)
pulumi.export("landing_bucket_name", bucket.bucket)
pulumi.export("stage_name", pipeline.stage_name)
pulumi.export("table_name", pipeline.table_name)
import * as aws from "@pulumi/aws";
import * as pulumi from "@pulumi/pulumi";
import * as random from "@pulumi/random";
import * as snowflake from "@pulumi/snowflake";
import * as github from "@pulumi/github";
import { SnowpipePipeline } from "./components/snowpipe_pipeline";
import { WebhookIngestion } from "./components/webhook_ingestion";
const config = new pulumi.Config();
const databaseName = config.get("database") ?? "LANDING_ZONE_WEBHOOKS";
const webhookRepo = config.require("webhook-repo");
const bucket = new aws.s3.Bucket("landing-bucket");
const database = new snowflake.Database("landing-db", { name: databaseName });
const schema = new snowflake.Schema("raw-schema", {
name: "RAW",
database: database.name,
});
const sharedSecret = new random.RandomPassword("webhook-secret", {
length: 32,
special: false,
});
const ingestion = new WebhookIngestion("source-webhooks", {
bucketName: bucket.bucket,
bucketArn: bucket.arn,
webhookSecret: sharedSecret.result,
});
const pipeline = new SnowpipePipeline("source-events", {
bucketId: bucket.id,
bucketName: bucket.bucket,
bucketArn: bucket.arn,
database: database.name,
schemaName: schema.name,
});
const endpointUrl = ingestion.functionUrl;
new github.RepositoryWebhook("source-webhook", {
repository: webhookRepo,
configuration: {
url: endpointUrl,
contentType: "json",
secret: sharedSecret.result,
},
events: ["push", "pull_request", "issues", "star"],
});
export const landingBucketName = bucket.bucket;
export const stageName = pipeline.stageName;
export const tableName = pipeline.tableName;
package main
import (
"webhook-to-snowflake-aws-snowpipe-github/components"
"github.com/pulumi/pulumi-github/sdk/v6/go/github"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/s3"
"github.com/pulumi/pulumi-random/sdk/v4/go/random"
"github.com/pulumi/pulumi-snowflake/sdk/v2/go/snowflake"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi/config"
)
func main() {
pulumi.Run(Program)
}
func Program(ctx *pulumi.Context) error {
cfg := config.New(ctx, "")
databaseName := cfg.Get("database")
if databaseName == "" {
databaseName = "LANDING_ZONE_WEBHOOKS"
}
webhookRepo := cfg.Require("webhook-repo")
bucket, err := s3.NewBucket(ctx, "landing-bucket", nil)
if err != nil {
return err
}
database, err := snowflake.NewDatabase(ctx, "landing-db", &snowflake.DatabaseArgs{
Name: pulumi.String(databaseName),
})
if err != nil {
return err
}
schema, err := snowflake.NewSchema(ctx, "raw-schema", &snowflake.SchemaArgs{
Name: pulumi.String("RAW"),
Database: database.Name,
})
if err != nil {
return err
}
sharedSecret, err := random.NewRandomPassword(ctx, "webhook-secret", &random.RandomPasswordArgs{
Length: pulumi.Int(32),
Special: pulumi.Bool(false),
})
if err != nil {
return err
}
ingestion, err := components.NewWebhookIngestion(ctx, "source-webhooks", &components.WebhookIngestionArgs{
BucketName: bucket.Bucket,
BucketArn: bucket.Arn,
WebhookSecret: sharedSecret.Result,
})
if err != nil {
return err
}
pipeline, err := components.NewSnowpipePipeline(ctx, "source-events", &components.SnowpipePipelineArgs{
BucketId: bucket.ID().ToStringOutput(),
BucketName: bucket.Bucket,
BucketArn: bucket.Arn,
Database: database.Name,
SchemaName: schema.Name,
})
if err != nil {
return err
}
endpointUrl := ingestion.FunctionUrl
_, err = github.NewRepositoryWebhook(ctx, "source-webhook", &github.RepositoryWebhookArgs{
Repository: pulumi.String(webhookRepo),
Configuration: &github.RepositoryWebhookConfigurationArgs{
Url: endpointUrl,
ContentType: pulumi.String("json"),
Secret: sharedSecret.Result,
},
Events: pulumi.StringArray{
pulumi.String("push"),
pulumi.String("pull_request"),
pulumi.String("issues"),
pulumi.String("star"),
},
})
if err != nil {
return err
}
ctx.Export("landingBucketName", bucket.Bucket)
ctx.Export("stageName", pipeline.StageName)
ctx.Export("tableName", pipeline.TableName)
return nil
}
Reusable components
The entrypoint stays small because the real ingestion work lives in reusable modules. These are the same component files packaged in the downloadable blueprint for this setup.
components/webhook_ingestion.py
Accepts the public webhook request, validates the signature, normalizes the payload, and writes the raw event into the landing path for this setup.
from __future__ import annotations
from dataclasses import dataclass
import pulumi
import pulumi_aws as aws
@dataclass
class WebhookIngestion:
function_url: pulumi.Output[str]
function_name: pulumi.Output[str]
def __init__(
self,
name: str,
*,
bucket_name: pulumi.Input[str],
bucket_arn: pulumi.Input[str],
webhook_secret: pulumi.Input[str],
firehose_stream_name: pulumi.Input[str] | None = None,
firehose_stream_arn: pulumi.Input[str] | None = None,
) -> None:
policy_statements = [
aws.iam.GetPolicyDocumentStatementArgs(
effect="Allow",
actions=[
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
],
resources=["arn:aws:logs:*:*:*"],
),
aws.iam.GetPolicyDocumentStatementArgs(
effect="Allow",
actions=["s3:PutObject"],
resources=[pulumi.Output.from_input(bucket_arn).apply(lambda arn: f"{arn}/incoming/*")],
),
]
if firehose_stream_arn is not None:
policy_statements.append(
aws.iam.GetPolicyDocumentStatementArgs(
effect="Allow",
actions=["firehose:PutRecord", "firehose:PutRecordBatch"],
resources=[firehose_stream_arn],
)
)
assume_role_policy = aws.iam.get_policy_document_output(
statements=[
aws.iam.GetPolicyDocumentStatementArgs(
effect="Allow",
principals=[
aws.iam.GetPolicyDocumentStatementPrincipalArgs(
type="Service",
identifiers=["lambda.amazonaws.com"],
)
],
actions=["sts:AssumeRole"],
)
]
)
role = aws.iam.Role(
f"{name}-role",
assume_role_policy=assume_role_policy.json,
)
policy = aws.iam.get_policy_document_output(
statements=policy_statements
)
environment_variables = {
"LANDING_BUCKET": bucket_name,
"LANDING_PREFIX": "incoming",
"WEBHOOK_SECRET": webhook_secret,
}
if firehose_stream_name is not None:
environment_variables["FIREHOSE_STREAM_NAME"] = firehose_stream_name
aws.iam.RolePolicy(
f"{name}-policy",
role=role.id,
policy=policy.json,
)
function = aws.lambda_.Function(
f"{name}-function",
runtime="python3.11",
role=role.arn,
handler="webhook_handler.handler",
timeout=30,
memory_size=256,
code=pulumi.AssetArchive(
{"webhook_handler.py": pulumi.FileAsset("lambda/webhook_handler.py")}
),
environment=aws.lambda_.FunctionEnvironmentArgs(
variables=environment_variables
),
)
function_url = aws.lambda_.FunctionUrl(
f"{name}-url",
authorization_type="NONE",
function_name=function.name,
cors=aws.lambda_.FunctionUrlCorsArgs(
allow_methods=["POST"],
allow_origins=["*"],
),
)
self.function_url = function_url.function_url
self.function_name = function.name
components/snowpipe_pipeline.py
Creates the Snowflake-side loading resources for this setup: the landing stage, the destination table, and the Snowpipe auto-ingest loading path.
from __future__ import annotations
from dataclasses import dataclass
import pulumi
import pulumi_aws as aws
import pulumi_snowflake as snowflake
def _copy_into_statement(database: pulumi.Input[str], schema_name: pulumi.Input[str]) -> pulumi.Output[str]:
return pulumi.Output.all(database, schema_name).apply(
lambda args: (
f'COPY INTO "{args[0]}"."{args[1]}"."WEBHOOK_EVENTS" '
f'FROM (SELECT metadata$filename, metadata$file_last_modified, $1, sysdate() '
f'FROM @"{args[0]}"."{args[1]}"."WEBHOOK_EVENTS_STAGE") '
"FILE_FORMAT = (TYPE = JSON)"
)
)
@dataclass
class SnowpipePipeline:
stage_name: pulumi.Output[str]
table_name: pulumi.Output[str]
pipe_name: pulumi.Output[str]
def __init__(
self,
name: str,
*,
bucket_id: pulumi.Input[str],
bucket_name: pulumi.Input[str],
bucket_arn: pulumi.Input[str],
database: pulumi.Input[str],
schema_name: pulumi.Input[str],
) -> None:
loader_user = aws.iam.User(f"{name}-stage-user")
loader_key = aws.iam.AccessKey(f"{name}-stage-key", user=loader_user.name)
loader_policy = aws.iam.get_policy_document_output(
statements=[
aws.iam.GetPolicyDocumentStatementArgs(
effect="Allow",
actions=["s3:ListBucket"],
resources=[bucket_arn],
),
aws.iam.GetPolicyDocumentStatementArgs(
effect="Allow",
actions=["s3:GetObject"],
resources=[pulumi.Output.from_input(bucket_arn).apply(lambda arn: f"{arn}/incoming/*")],
),
]
)
aws.iam.UserPolicy(
f"{name}-stage-policy",
user=loader_user.name,
policy=loader_policy.json,
)
table = snowflake.Table(
f"{name}-table",
database=database,
schema=schema_name,
name="WEBHOOK_EVENTS",
columns=[
snowflake.TableColumnArgs(name="FILENAME", type="STRING", nullable=False),
snowflake.TableColumnArgs(
name="LAST_MODIFIED_AT",
type="TIMESTAMP_NTZ",
nullable=False,
),
snowflake.TableColumnArgs(name="CONTENT", type="VARIANT"),
snowflake.TableColumnArgs(name="LOADED_AT", type="TIMESTAMP_NTZ"),
],
)
stage = snowflake.StageExternalS3(
f"{name}-stage",
database=database,
schema=schema_name,
name="WEBHOOK_EVENTS_STAGE",
url=pulumi.Output.from_input(bucket_name).apply(lambda current: f"s3://{current}/incoming/"),
credentials=snowflake.StageExternalS3CredentialsArgs(
aws_key_id=loader_key.id,
aws_secret_key=loader_key.secret,
),
)
topic = aws.sns.Topic(f"{name}-events")
topic_policy_document = pulumi.Output.all(bucket_arn, topic.arn).apply(
lambda args: f'''{{
"Version": "2012-10-17",
"Statement": [
{{
"Effect": "Allow",
"Principal": {{ "Service": "s3.amazonaws.com" }},
"Action": "SNS:Publish",
"Resource": "{args[1]}",
"Condition": {{
"ArnLike": {{
"aws:SourceArn": "{args[0]}"
}}
}}
}}
]
}}'''
)
topic_policy = aws.sns.TopicPolicy(
f"{name}-events-policy",
arn=topic.arn,
policy=topic_policy_document,
)
pipe = snowflake.Pipe(
f"{name}-pipe",
database=database,
schema=schema_name,
name="WEBHOOK_EVENTS_PIPE",
auto_ingest=True,
aws_sns_topic_arn=topic.arn,
copy_statement=_copy_into_statement(database, schema_name),
)
aws.s3.BucketNotification(
f"{name}-notifications",
bucket=bucket_id,
topics=[
aws.s3.BucketNotificationTopicArgs(
topic_arn=topic.arn,
events=["s3:ObjectCreated:*"],
filter_prefix="incoming/",
)
],
opts=pulumi.ResourceOptions(depends_on=[topic_policy]),
)
self.stage_name = stage.fully_qualified_name
self.table_name = table.name
self.pipe_name = pipe.name
components/webhook_ingestion.ts
Accepts the public webhook request, validates the signature, normalizes the payload, and writes the raw event into the landing path for this setup.
import * as aws from "@pulumi/aws";
import * as pulumi from "@pulumi/pulumi";
export interface WebhookIngestionArgs {
bucketName: pulumi.Input<string>;
bucketArn: pulumi.Input<string>;
webhookSecret: pulumi.Input<string>;
firehoseStreamName?: pulumi.Input<string>;
firehoseStreamArn?: pulumi.Input<string>;
}
export class WebhookIngestion {
public readonly functionUrl: pulumi.Output<string>;
public readonly functionName: pulumi.Output<string>;
constructor(name: string, args: WebhookIngestionArgs) {
const assumeRole = aws.iam.getPolicyDocumentOutput({
statements: [
{
effect: "Allow",
principals: [
{
type: "Service",
identifiers: ["lambda.amazonaws.com"],
},
],
actions: ["sts:AssumeRole"],
},
],
});
const role = new aws.iam.Role(`${name}-role`, {
assumeRolePolicy: assumeRole.json,
});
const statements: aws.types.input.iam.GetPolicyDocumentStatementArgs[] = [
{
effect: "Allow",
actions: [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
],
resources: ["arn:aws:logs:*:*:*"],
},
{
effect: "Allow",
actions: ["s3:PutObject"],
resources: [pulumi.interpolate`${args.bucketArn}/incoming/*`],
},
];
if (args.firehoseStreamArn) {
statements.push({
effect: "Allow",
actions: ["firehose:PutRecord", "firehose:PutRecordBatch"],
resources: [args.firehoseStreamArn],
});
}
const policy = aws.iam.getPolicyDocumentOutput({
statements,
});
const environmentVariables: Record<string, pulumi.Input<string>> = {
LANDING_BUCKET: args.bucketName,
LANDING_PREFIX: "incoming",
WEBHOOK_SECRET: args.webhookSecret,
};
if (args.firehoseStreamName) {
environmentVariables.FIREHOSE_STREAM_NAME = args.firehoseStreamName;
}
new aws.iam.RolePolicy(`${name}-policy`, {
role: role.id,
policy: policy.json,
});
const fn = new aws.lambda.Function(`${name}-function`, {
runtime: "python3.11",
role: role.arn,
handler: "webhook_handler.handler",
timeout: 30,
memorySize: 256,
code: new pulumi.asset.AssetArchive({
"webhook_handler.py": new pulumi.asset.FileAsset("lambda/webhook_handler.py"),
}),
environment: {
variables: environmentVariables,
},
});
const url = new aws.lambda.FunctionUrl(`${name}-url`, {
authorizationType: "NONE",
functionName: fn.name,
cors: {
allowMethods: ["POST"],
allowOrigins: ["*"],
},
});
this.functionUrl = url.functionUrl;
this.functionName = fn.name;
}
}
components/snowpipe_pipeline.ts
Creates the Snowflake-side loading resources for this setup: the landing stage, the destination table, and the Snowpipe auto-ingest loading path.
import * as aws from "@pulumi/aws";
import * as pulumi from "@pulumi/pulumi";
import * as snowflake from "@pulumi/snowflake";
export interface SnowpipePipelineArgs {
bucketId: pulumi.Input<string>;
bucketName: pulumi.Input<string>;
bucketArn: pulumi.Input<string>;
database: pulumi.Input<string>;
schemaName: pulumi.Input<string>;
}
function copyIntoStatement(database: pulumi.Input<string>, schemaName: pulumi.Input<string>) {
return pulumi.all([database, schemaName]).apply(([currentDatabase, currentSchema]) =>
`COPY INTO "${currentDatabase}"."${currentSchema}"."WEBHOOK_EVENTS" ` +
`FROM (SELECT metadata$filename, metadata$file_last_modified, $1, sysdate() ` +
`FROM @"${currentDatabase}"."${currentSchema}"."WEBHOOK_EVENTS_STAGE") ` +
"FILE_FORMAT = (TYPE = JSON)",
);
}
export class SnowpipePipeline {
public readonly stageName: pulumi.Output<string>;
public readonly tableName: pulumi.Output<string>;
public readonly pipeName: pulumi.Output<string>;
constructor(name: string, args: SnowpipePipelineArgs) {
const loaderUser = new aws.iam.User(`${name}-stage-user`);
const loaderKey = new aws.iam.AccessKey(`${name}-stage-key`, {
user: loaderUser.name,
});
const loaderPolicy = aws.iam.getPolicyDocumentOutput({
statements: [
{
effect: "Allow",
actions: ["s3:ListBucket"],
resources: [args.bucketArn],
},
{
effect: "Allow",
actions: ["s3:GetObject"],
resources: [pulumi.interpolate`${args.bucketArn}/incoming/*`],
},
],
});
new aws.iam.UserPolicy(`${name}-stage-policy`, {
user: loaderUser.name,
policy: loaderPolicy.json,
});
const table = new snowflake.Table(`${name}-table`, {
database: args.database,
schema: args.schemaName,
name: "WEBHOOK_EVENTS",
columns: [
{ name: "FILENAME", type: "STRING", nullable: false },
{ name: "LAST_MODIFIED_AT", type: "TIMESTAMP_NTZ", nullable: false },
{ name: "CONTENT", type: "VARIANT" },
{ name: "LOADED_AT", type: "TIMESTAMP_NTZ" },
],
});
const stage = new snowflake.StageExternalS3(`${name}-stage`, {
database: args.database,
schema: args.schemaName,
name: "WEBHOOK_EVENTS_STAGE",
url: pulumi.interpolate`s3://${args.bucketName}/incoming/`,
credentials: {
awsKeyId: loaderKey.id,
awsSecretKey: loaderKey.secret,
},
});
const topic = new aws.sns.Topic(`${name}-events`);
const topicPolicy = new aws.sns.TopicPolicy(`${name}-events-policy`, {
arn: topic.arn,
policy: pulumi.all([args.bucketArn, topic.arn]).apply(([bucketArn, topicArn]) => `{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": { "Service": "s3.amazonaws.com" },
"Action": "SNS:Publish",
"Resource": "${topicArn}",
"Condition": {
"ArnLike": {
"aws:SourceArn": "${bucketArn}"
}
}
}
]
}`),
});
const pipe = new snowflake.Pipe(`${name}-pipe`, {
database: args.database,
schema: args.schemaName,
name: "WEBHOOK_EVENTS_PIPE",
autoIngest: true,
awsSnsTopicArn: topic.arn,
copyStatement: copyIntoStatement(args.database, args.schemaName),
});
new aws.s3.BucketNotification(`${name}-notifications`, {
bucket: args.bucketId,
topics: [
{
topicArn: topic.arn,
events: ["s3:ObjectCreated:*"],
filterPrefix: "incoming/",
},
],
}, { dependsOn: [topicPolicy] });
this.stageName = stage.fullyQualifiedName;
this.tableName = table.name;
this.pipeName = pipe.name;
}
}
components/webhook_ingestion.go
Accepts the public webhook request, validates the signature, normalizes the payload, and writes the raw event into the landing path for this setup.
package components
import (
"fmt"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/iam"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/lambda"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
type WebhookIngestionArgs struct {
BucketName pulumi.StringInput
BucketArn pulumi.StringInput
WebhookSecret pulumi.StringInput
}
type WebhookIngestion struct {
FunctionUrl pulumi.StringOutput
FunctionName pulumi.StringOutput
}
func NewWebhookIngestion(ctx *pulumi.Context, name string, args *WebhookIngestionArgs) (*WebhookIngestion, error) {
assumeRolePolicy := `{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": { "Service": "lambda.amazonaws.com" },
"Action": "sts:AssumeRole"
}
]
}`
role, err := iam.NewRole(ctx, fmt.Sprintf("%s-role", name), &iam.RoleArgs{
AssumeRolePolicy: pulumi.String(assumeRolePolicy),
})
if err != nil {
return nil, err
}
policy := pulumi.All(args.BucketArn).ApplyT(func(values []interface{}) (string, error) {
bucketArn := values[0].(string)
return fmt.Sprintf(`{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:*:*:*"
},
{
"Effect": "Allow",
"Action": ["s3:PutObject"],
"Resource": "%s/incoming/*"
}
]
}`, bucketArn), nil
}).(pulumi.StringOutput)
_, err = iam.NewRolePolicy(ctx, fmt.Sprintf("%s-policy", name), &iam.RolePolicyArgs{
Role: role.ID(),
Policy: policy,
})
if err != nil {
return nil, err
}
function, err := lambda.NewFunction(ctx, fmt.Sprintf("%s-function", name), &lambda.FunctionArgs{
Runtime: pulumi.String("python3.11"),
Role: role.Arn,
Handler: pulumi.String("webhook_handler.handler"),
Timeout: pulumi.Int(30),
MemorySize: pulumi.Int(256),
Code: pulumi.NewFileArchive("lambda"),
Environment: &lambda.FunctionEnvironmentArgs{
Variables: pulumi.StringMap{
"LANDING_BUCKET": args.BucketName,
"LANDING_PREFIX": pulumi.String("incoming"),
"WEBHOOK_SECRET": args.WebhookSecret,
},
},
})
if err != nil {
return nil, err
}
functionUrl, err := lambda.NewFunctionUrl(ctx, fmt.Sprintf("%s-url", name), &lambda.FunctionUrlArgs{
AuthorizationType: pulumi.String("NONE"),
FunctionName: function.Name,
Cors: &lambda.FunctionUrlCorsArgs{
AllowMethods: pulumi.StringArray{pulumi.String("POST")},
AllowOrigins: pulumi.StringArray{pulumi.String("*")},
},
})
if err != nil {
return nil, err
}
return &WebhookIngestion{
FunctionUrl: functionUrl.FunctionUrl,
FunctionName: function.Name,
}, nil
}
components/snowpipe_pipeline.go
Creates the Snowflake-side loading resources for this setup: the landing stage, the destination table, and the Snowpipe auto-ingest loading path.
package components
import (
"fmt"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/iam"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/s3"
"github.com/pulumi/pulumi-aws/sdk/v7/go/aws/sns"
"github.com/pulumi/pulumi-snowflake/sdk/v2/go/snowflake"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
)
type SnowpipePipelineArgs struct {
BucketId pulumi.StringInput
BucketName pulumi.StringInput
BucketArn pulumi.StringInput
Database pulumi.StringInput
SchemaName pulumi.StringInput
}
type SnowpipePipeline struct {
StageName pulumi.StringOutput
TableName pulumi.StringOutput
PipeName pulumi.StringOutput
}
func snowpipeCopyIntoStatement(database pulumi.StringInput, schemaName pulumi.StringInput) pulumi.StringOutput {
return pulumi.All(database, schemaName).ApplyT(func(values []interface{}) (string, error) {
currentDatabase := values[0].(string)
currentSchema := values[1].(string)
return fmt.Sprintf(
"COPY INTO \"%s\".\"%s\".\"WEBHOOK_EVENTS\" FROM (SELECT metadata$filename, metadata$file_last_modified, $1, sysdate() FROM @\"%s\".\"%s\".\"WEBHOOK_EVENTS_STAGE\") FILE_FORMAT = (TYPE = JSON)",
currentDatabase,
currentSchema,
currentDatabase,
currentSchema,
), nil
}).(pulumi.StringOutput)
}
func NewSnowpipePipeline(ctx *pulumi.Context, name string, args *SnowpipePipelineArgs) (*SnowpipePipeline, error) {
loaderUser, err := iam.NewUser(ctx, fmt.Sprintf("%s-stage-user", name), nil)
if err != nil {
return nil, err
}
loaderKey, err := iam.NewAccessKey(ctx, fmt.Sprintf("%s-stage-key", name), &iam.AccessKeyArgs{
User: loaderUser.Name,
})
if err != nil {
return nil, err
}
loaderPolicy := pulumi.All(args.BucketArn).ApplyT(func(values []interface{}) (string, error) {
bucketArn := values[0].(string)
return fmt.Sprintf(`{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["s3:ListBucket"],
"Resource": "%s"
},
{
"Effect": "Allow",
"Action": ["s3:GetObject"],
"Resource": "%s/incoming/*"
}
]
}`, bucketArn, bucketArn), nil
}).(pulumi.StringOutput)
_, err = iam.NewUserPolicy(ctx, fmt.Sprintf("%s-stage-policy", name), &iam.UserPolicyArgs{
User: loaderUser.Name,
Policy: loaderPolicy,
})
if err != nil {
return nil, err
}
table, err := snowflake.NewTable(ctx, fmt.Sprintf("%s-table", name), &snowflake.TableArgs{
Database: args.Database,
Schema: args.SchemaName,
Name: pulumi.String("WEBHOOK_EVENTS"),
Columns: snowflake.TableColumnArray{
&snowflake.TableColumnArgs{Name: pulumi.String("FILENAME"), Type: pulumi.String("STRING"), Nullable: pulumi.Bool(false)},
&snowflake.TableColumnArgs{Name: pulumi.String("LAST_MODIFIED_AT"), Type: pulumi.String("TIMESTAMP_NTZ"), Nullable: pulumi.Bool(false)},
&snowflake.TableColumnArgs{Name: pulumi.String("CONTENT"), Type: pulumi.String("VARIANT")},
&snowflake.TableColumnArgs{Name: pulumi.String("LOADED_AT"), Type: pulumi.String("TIMESTAMP_NTZ")},
},
})
if err != nil {
return nil, err
}
stage, err := snowflake.NewStageExternalS3(ctx, fmt.Sprintf("%s-stage", name), &snowflake.StageExternalS3Args{
Database: args.Database,
Schema: args.SchemaName,
Name: pulumi.String("WEBHOOK_EVENTS_STAGE"),
Url: pulumi.Sprintf("s3://%s/incoming/", args.BucketName),
Credentials: &snowflake.StageExternalS3CredentialsArgs{
AwsKeyId: loaderKey.ID().ToStringOutput(),
AwsSecretKey: loaderKey.Secret,
},
})
if err != nil {
return nil, err
}
topic, err := sns.NewTopic(ctx, fmt.Sprintf("%s-events", name), nil)
if err != nil {
return nil, err
}
topicPolicyDocument := pulumi.All(args.BucketArn, topic.Arn).ApplyT(func(values []interface{}) (string, error) {
bucketArn := values[0].(string)
topicArn := values[1].(string)
return fmt.Sprintf(`{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": { "Service": "s3.amazonaws.com" },
"Action": "SNS:Publish",
"Resource": "%s",
"Condition": {
"ArnLike": {
"aws:SourceArn": "%s"
}
}
}
]
}`, topicArn, bucketArn), nil
}).(pulumi.StringOutput)
topicPolicy, err := sns.NewTopicPolicy(ctx, fmt.Sprintf("%s-events-policy", name), &sns.TopicPolicyArgs{
Arn: topic.Arn,
Policy: topicPolicyDocument,
})
if err != nil {
return nil, err
}
pipe, err := snowflake.NewPipe(ctx, fmt.Sprintf("%s-pipe", name), &snowflake.PipeArgs{
Database: args.Database,
Schema: args.SchemaName,
Name: pulumi.String("WEBHOOK_EVENTS_PIPE"),
AutoIngest: pulumi.Bool(true),
AwsSnsTopicArn: topic.Arn,
CopyStatement: snowpipeCopyIntoStatement(args.Database, args.SchemaName),
})
if err != nil {
return nil, err
}
_, err = s3.NewBucketNotification(ctx, fmt.Sprintf("%s-notifications", name), &s3.BucketNotificationArgs{
Bucket: args.BucketId,
Topics: s3.BucketNotificationTopicArray{
&s3.BucketNotificationTopicArgs{
TopicArn: topic.Arn,
Events: pulumi.StringArray{pulumi.String("s3:ObjectCreated:*")},
FilterPrefix: pulumi.String("incoming/"),
},
},
}, pulumi.DependsOn([]pulumi.Resource{topicPolicy}))
if err != nil {
return nil, err
}
return &SnowpipePipeline{
StageName: stage.FullyQualifiedName,
TableName: table.Name,
PipeName: pipe.Name,
}, nil
}
Verify the data landed
After you send a test event, query Snowflake to confirm the records are visible:
SELECT FILENAME,
LAST_MODIFIED_AT,
CONTENT,
LOADED_AT
FROM LANDING_ZONE_WEBHOOKS.RAW.WEBHOOK_EVENTS
ORDER BY LOADED_AT DESC;
For this path, webhook payloads land in S3 first, then Snowflake auto-ingests new files. If rows do not appear after a few minutes, run SELECT SYSTEM$PIPE_STATUS('LANDING_ZONE_WEBHOOKS.RAW.WEBHOOK_EVENTS_PIPE') to see whether the pipe is receiving notifications and has files pending.
Operating notes
- Keep the first table as a raw landing zone. Flatten and model into downstream tables later.
- Rotate the shared webhook secret when you roll senders or suspect exposure.
- Watch the landing storage path and Snowflake copy history so failed loads and malformed payloads do not go unnoticed.
- Use a least-privilege Snowflake reader role for analysts instead of querying with the loading role.
- When you choose batch loading, tune taskIntervalMinutes to match how quickly you want new files copied into Snowflake and how much warehouse activity you want between loads.