This guide shows how to land custom webhook events in Snowflake on Azure with Snowpipe auto-ingest.
It covers the storage, eventing, and Snowflake objects for this setup so you can get raw events flowing first and shape downstream tables later.
This guide uses Azure Functions for the public handler and Azure Blob Storage for the raw landing zone. The handler writes each payload to Blob Storage, and Snowpipe auto-ingest then loads the new files into Snowflake.
Keeping object storage in the middle is a good fit when you want a durable raw archive or need cloud-native queueing and retries before Snowflake loads the data.
This blueprint provisions:
- the public Azure Function endpoint
- the Blob landing container
- a SAS-backed external Azure stage
- an Event Grid subscription that forwards blob-created events into an Azure Storage queue
- a Snowflake auto-ingest pipe backed by a notification integration
After the first deploy, you finish a one-time setup: describe the notification integration in Snowflake, open the Azure consent URL it returns, and grant the Snowflake app a role on the queue.
This guide leaves sender registration in your hands, but the stack still creates the public endpoint and shared secret. Your sender posts JSON and signs the raw body with HMAC SHA256 in the X-Webhook-Signature header.
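As a rough illustration of the sender side, the sketch below signs a raw JSON body with HMAC SHA256 and builds the request headers. The hex encoding of the digest, the helper names, and the placeholder secret are assumptions; check the handler in functionapp/Webhook/__init__.py for the exact format it expects.

```python
import hashlib
import hmac
import json


def sign_body(secret: str, raw_body: bytes) -> str:
    """Return the HMAC SHA256 digest of the raw body (hex encoding is an assumption)."""
    return hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest()


def build_request(secret: str, event: dict) -> tuple[bytes, dict[str, str]]:
    """Serialize the event once and sign exactly the bytes that will be sent."""
    raw_body = json.dumps(event, separators=(",", ":")).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        "X-Webhook-Signature": sign_body(secret, raw_body),
    }
    return raw_body, headers


# Placeholder values; use the exported webhookSecret in practice.
body, headers = build_request("example-secret", {"type": "test", "id": 1})
```

POST the exact bytes in body to the exported webhookUrl. Re-serializing the JSON after signing can change the bytes and invalidate the signature.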
Quickstart
- Download the blueprint zip for your language below, or create a new Pulumi project with the same file layout shown in the Download section.
- Install dependencies for your selected language and configure Snowflake plus Azure.
- Deploy the stack to create the public Azure Function endpoint, the Blob landing container, the Event Grid backed storage queue, and the Snowflake loading objects.
- After pulumi up, run DESC NOTIFICATION INTEGRATION WEBHOOK_EVENTS_NOTIFICATION_INTEGRATION; in Snowflake, copy AZURE_CONSENT_URL and AZURE_MULTI_TENANT_APP_NAME, open the consent URL, then grant Storage Queue Data Contributor on the queue named by the exported notificationQueueName to that Snowflake app before you send a test event.
- Register your sender with the exported endpoint and shared secret, then send a test event.
- Query the landing table in Snowflake to confirm the event arrived.
After the first test event, new rows usually appear in about two minutes.
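The consent and role-grant step needs two derived values: the name of the Snowflake service principal and the Azure scope of the queue. The sketch below builds both as plain strings. It assumes the service principal name is the part of AZURE_MULTI_TENANT_APP_NAME before the underscore, as Snowflake's Azure auto-ingest documentation describes, and that the queue uses the standard Azure Resource Manager ID layout. The subscription, resource group, account, and app name values are placeholders.

```python
def snowflake_principal_name(multi_tenant_app_name: str) -> str:
    """Take the part before the first underscore (assumed naming convention)."""
    return multi_tenant_app_name.split("_", 1)[0]


def queue_scope(subscription_id: str, resource_group: str, account: str, queue: str) -> str:
    """Build the ARM resource ID used as the scope of the role assignment."""
    return (
        f"/subscriptions/{subscription_id}"
        f"/resourceGroups/{resource_group}"
        f"/providers/Microsoft.Storage/storageAccounts/{account}"
        f"/queueServices/default/queues/{queue}"
    )


# Placeholder inputs for illustration only.
principal = snowflake_principal_name("SnowflakePACInt1234_1234567890123")
scope = queue_scope(
    "00000000-0000-0000-0000-000000000000",
    "webhook-to-snowflake-dev",
    "w2sfexample",
    "webhook-events",
)
```

You can then assign Storage Queue Data Contributor to that principal at that scope in the Azure portal or with your usual tooling.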
Prerequisites
- a Pulumi account and the Pulumi CLI
- an Azure subscription where you can create resource groups, storage accounts, function apps, and RBAC assignments
- a Snowflake account where you can create databases, schemas, and loading roles
- a sender you control that can compute HMAC SHA256 for the raw body and POST JSON to the exported endpoint
- no extra Pulumi config is required. Your sender uses the exported webhookUrl and webhookSecret.
For the Pulumi language you selected, initialize your stack for Azure with:
pulumi stack init dev
pulumi config set azure-native:location eastus
Set up credentials with Pulumi ESC
This guide needs cloud credentials, Snowflake credentials, and any source-specific token required to provision the webhook. A single ESC environment is usually the smallest setup that still keeps secrets out of local files.
values:
  azure:
    login:
      fn::open::azure-login:
        oidc:
          tenantId: 00000000-0000-0000-0000-000000000000
          subscriptionId: 00000000-0000-0000-0000-000000000000
  snowflake:
    login:
      fn::open::snowflake-login:
        oidc:
          account: <your-snowflake-account>
          user: ESC_SERVICE_USER
    organizationName: <your-org-name>
    accountName: <your-account-name>
  environmentVariables:
    SNOWFLAKE_USER: ${snowflake.login.user}
    SNOWFLAKE_TOKEN: ${snowflake.login.token}
  pulumiConfig:
    snowflake:organizationName: ${snowflake.organizationName}
    snowflake:accountName: ${snowflake.accountName}
    snowflake:authenticator: OAUTH
    snowflake:role: PULUMI_DEPLOYER
Then reference it from your stack config:
environment:
  - <your-org>/<your-environment>
config:
  webhook-to-snowflake:database: LANDING_ZONE_WEBHOOKS
What you get in the download
The downloadable example zip includes:
- Pulumi.yaml
- the Pulumi program, dependency files, cloud runtime support files, and reusable components for the language you pick below
- a README with a shorter quick start for this exact setup
Azure Snowpipe setups export the queue name, queue URL, and notification integration name you need for the one-time consent and queue-role grant.
Python project:
- __main__.py as the Pulumi entrypoint
- components/webhook_ingestion.py for the public webhook endpoint
- components/snowpipe_pipeline.py for the Snowflake loading path
- functionapp/host.json for the Azure Functions host
- functionapp/requirements.txt for the Azure Functions Python dependencies
- functionapp/Webhook/__init__.py for request validation and blob writes
- functionapp/Webhook/function.json for the public HTTP trigger route
- requirements.txt for the root Pulumi project
TypeScript project:
- index.ts as the Pulumi entrypoint
- components/webhook_ingestion.ts for the public webhook endpoint
- components/snowpipe_pipeline.ts for the Snowflake loading path
- functionapp/host.json for the Azure Functions host
- functionapp/requirements.txt for the Azure Functions Python dependencies
- functionapp/Webhook/__init__.py for request validation and blob writes
- functionapp/Webhook/function.json for the public HTTP trigger route
- package.json and tsconfig.json for the root Pulumi project
The next sections show the same entrypoint and component files that ship in the download.
Blueprint Pulumi program
This blueprint shows the full resource wiring for the Azure Snowpipe auto-ingest path with a custom sender as the source. The downloadable repo uses the same entrypoint and component files shown below.
import pulumi
import pulumi_azure_native as azure_native
import pulumi_random as random
import pulumi_snowflake as snowflake
from components.snowpipe_pipeline import SnowpipePipeline
from components.webhook_ingestion import WebhookIngestion
config = pulumi.Config()
database_name = config.get("database") or "LANDING_ZONE_WEBHOOKS"
location = pulumi.Config("azure-native").require("location")
resource_group = azure_native.resources.ResourceGroup(
"webhook-rg",
resource_group_name=f"webhook-to-snowflake-{pulumi.get_stack()}",
location=location,
)
storage_suffix = random.RandomString(
"storage-suffix",
length=8,
special=False,
upper=False,
)
storage_account = azure_native.storage.StorageAccount(
"landingstorage",
account_name=storage_suffix.result.apply(lambda value: f"w2sf{value}"),
allow_blob_public_access=False,
allow_shared_key_access=True,
kind=azure_native.storage.Kind.STORAGE_V2,
location=location,
resource_group_name=resource_group.name,
sku=azure_native.storage.SkuArgs(name=azure_native.storage.SkuName.STANDARD_LRS),
)
landing_container = azure_native.storage.BlobContainer(
"landing-container",
account_name=storage_account.name,
container_name="incoming",
resource_group_name=resource_group.name,
)
storage_keys = azure_native.storage.list_storage_account_keys_output(
account_name=storage_account.name,
resource_group_name=resource_group.name,
)
storage_connection_string = pulumi.Output.all(storage_account.name, storage_keys.keys).apply(
lambda args: f"DefaultEndpointsProtocol=https;AccountName={args[0]};AccountKey={args[1][0].value};EndpointSuffix=core.windows.net"
)
database = snowflake.Database("landing-db", name=database_name)
schema = snowflake.Schema("raw-schema", name="RAW", database=database.name)
webhook_secret = random.RandomPassword("webhook-secret", length=32, special=False)
ingestion = WebhookIngestion(
"source-webhooks",
location=resource_group.location,
resource_group_name=resource_group.name,
storage_account_name=storage_account.name,
blob_endpoint=storage_account.primary_endpoints.blob,
landing_container_name=landing_container.name,
storage_connection_string=storage_connection_string,
webhook_secret=webhook_secret.result,
)
pipeline = SnowpipePipeline(
"source-events",
database=database.name,
schema_name=schema.name,
resource_group_name=resource_group.name,
storage_account_id=storage_account.id,
storage_account_name=storage_account.name,
landing_container_name=landing_container.name,
blob_endpoint=storage_account.primary_endpoints.blob,
)
endpoint_url = ingestion.endpoint_url
pulumi.export("webhookUrl", endpoint_url)
pulumi.export("webhookSecret", webhook_secret.result)
pulumi.export("publicEndpointUrl", ingestion.endpoint_url)
pulumi.export("landingContainerName", landing_container.name)
pulumi.export("notificationQueueName", pipeline.notification_queue_name)
pulumi.export("notificationQueueUrl", pipeline.notification_queue_url)
pulumi.export("notificationIntegrationName", pipeline.notification_integration_name)
pulumi.export("stageName", pipeline.stage_name)
pulumi.export("tableName", pipeline.table_name)
pulumi.export("pipeName", pipeline.pipe_name)
import * as azure_native from "@pulumi/azure-native";
import * as pulumi from "@pulumi/pulumi";
import * as random from "@pulumi/random";
import * as snowflake from "@pulumi/snowflake";
import { SnowpipePipeline } from "./components/snowpipe_pipeline";
import { WebhookIngestion } from "./components/webhook_ingestion";
const config = new pulumi.Config();
const databaseName = config.get("database") ?? "LANDING_ZONE_WEBHOOKS";
const location = new pulumi.Config("azure-native").require("location");
const resourceGroup = new azure_native.resources.ResourceGroup("webhook-rg", {
resourceGroupName: `webhook-to-snowflake-${pulumi.getStack()}`,
location,
});
const storageSuffix = new random.RandomString("storage-suffix", {
length: 8,
special: false,
upper: false,
});
const storageAccount = new azure_native.storage.StorageAccount("landingstorage", {
accountName: storageSuffix.result.apply((value) => `w2sf${value}`),
allowBlobPublicAccess: false,
allowSharedKeyAccess: true,
kind: azure_native.storage.Kind.StorageV2,
location,
resourceGroupName: resourceGroup.name,
sku: {
name: azure_native.storage.SkuName.Standard_LRS,
},
});
const landingContainer = new azure_native.storage.BlobContainer("landing-container", {
accountName: storageAccount.name,
containerName: "incoming",
resourceGroupName: resourceGroup.name,
});
const storageKeys = azure_native.storage.listStorageAccountKeysOutput({
accountName: storageAccount.name,
resourceGroupName: resourceGroup.name,
});
const storageConnectionString = pulumi.all([storageAccount.name, storageKeys.keys]).apply(([accountName, keys]) =>
`DefaultEndpointsProtocol=https;AccountName=${accountName};AccountKey=${keys[0].value};EndpointSuffix=core.windows.net`,
);
const database = new snowflake.Database("landing-db", { name: databaseName });
const schema = new snowflake.Schema("raw-schema", {
name: "RAW",
database: database.name,
});
const sharedSecret = new random.RandomPassword("webhook-secret", {
length: 32,
special: false,
});
const ingestion = new WebhookIngestion("source-webhooks", {
location: resourceGroup.location,
resourceGroupName: resourceGroup.name,
storageAccountName: storageAccount.name,
blobEndpoint: storageAccount.primaryEndpoints.blob,
landingContainerName: landingContainer.name,
storageConnectionString,
webhookSecret: sharedSecret.result,
});
const pipeline = new SnowpipePipeline("source-events", {
database: database.name,
schemaName: schema.name,
resourceGroupName: resourceGroup.name,
storageAccountId: storageAccount.id,
storageAccountName: storageAccount.name,
landingContainerName: landingContainer.name,
blobEndpoint: storageAccount.primaryEndpoints.blob,
});
const endpointUrl = ingestion.endpointUrl;
export const webhookUrl = endpointUrl;
export const webhookSecret = sharedSecret.result;
export const publicEndpointUrl = ingestion.endpointUrl;
export const landingContainerName = landingContainer.name;
export const notificationQueueName = pipeline.notificationQueueName;
export const notificationQueueUrl = pipeline.notificationQueueUrl;
export const notificationIntegrationName = pipeline.notificationIntegrationName;
export const stageName = pipeline.stageName;
export const tableName = pipeline.tableName;
export const pipeName = pipeline.pipeName;
Reusable components
The entrypoint stays small because the real ingestion work lives in reusable modules. These are the same component files packaged in the downloadable blueprint for this setup.
components/webhook_ingestion.py
Provisions the Azure Function app behind the public webhook endpoint. The packaged handler accepts the request, validates the signature, normalizes the payload, and writes the raw event into the landing path for this setup.
from __future__ import annotations

from dataclasses import dataclass

import pulumi
import pulumi_azure_native as azure_native


@dataclass
class WebhookIngestion:
    endpoint_url: pulumi.Output[str]
    app_name: pulumi.Output[str]

    def __init__(
        self,
        name: str,
        *,
        location: pulumi.Input[str],
        resource_group_name: pulumi.Input[str],
        storage_account_name: pulumi.Input[str],
        blob_endpoint: pulumi.Input[str],
        landing_container_name: pulumi.Input[str],
        storage_connection_string: pulumi.Input[str],
        webhook_secret: pulumi.Input[str],
    ) -> None:
        # Container that holds the zipped function package.
        deployment_container = azure_native.storage.BlobContainer(
            f"{name}-deployments",
            account_name=storage_account_name,
            container_name="deploymentpackage",
            resource_group_name=resource_group_name,
        )

        # Upload the function app files as a single zip archive.
        azure_native.storage.Blob(
            f"{name}-package",
            account_name=storage_account_name,
            blob_name="functionapp.zip",
            container_name=deployment_container.name,
            content_type="application/zip",
            resource_group_name=resource_group_name,
            source=pulumi.AssetArchive(
                {
                    "host.json": pulumi.FileAsset("functionapp/host.json"),
                    "requirements.txt": pulumi.FileAsset("functionapp/requirements.txt"),
                    "Webhook/function.json": pulumi.FileAsset(
                        "functionapp/Webhook/function.json"
                    ),
                    "Webhook/__init__.py": pulumi.FileAsset(
                        "functionapp/Webhook/__init__.py"
                    ),
                }
            ),
            type=azure_native.storage.BlobType.BLOCK,
        )

        # Flex Consumption plan for the Linux function app.
        plan = azure_native.web.AppServicePlan(
            f"{name}-plan",
            kind="functionapp",
            location=location,
            resource_group_name=resource_group_name,
            reserved=True,
            sku=azure_native.web.SkuDescriptionArgs(
                name="FC1",
                tier="FlexConsumption",
            ),
        )

        function_app = azure_native.web.WebApp(
            f"{name}-app",
            kind="functionapp,linux",
            location=location,
            https_only=True,
            public_network_access="Enabled",
            resource_group_name=resource_group_name,
            server_farm_id=plan.id,
            function_app_config=azure_native.web.FunctionAppConfigArgs(
                deployment=azure_native.web.FunctionsDeploymentArgs(
                    storage=azure_native.web.FunctionsDeploymentStorageArgs(
                        authentication=azure_native.web.FunctionsDeploymentAuthenticationArgs(
                            storage_account_connection_string_name="DEPLOYMENT_STORAGE_CONNECTION_STRING",
                            type=azure_native.web.AuthenticationType.STORAGE_ACCOUNT_CONNECTION_STRING,
                        ),
                        type=azure_native.web.FunctionsDeploymentStorageType.BLOB_CONTAINER,
                        value=pulumi.Output.format("{0}{1}", blob_endpoint, deployment_container.name),
                    )
                ),
                runtime=azure_native.web.FunctionsRuntimeArgs(
                    name=azure_native.web.RuntimeName.PYTHON,
                    version="3.11",
                ),
                scale_and_concurrency=azure_native.web.FunctionsScaleAndConcurrencyArgs(
                    instance_memory_mb=2048,
                    maximum_instance_count=40,
                ),
            ),
            site_config=azure_native.web.SiteConfigArgs(
                app_settings=[
                    azure_native.web.NameValuePairArgs(
                        name="AzureWebJobsStorage",
                        value=storage_connection_string,
                    ),
                    azure_native.web.NameValuePairArgs(
                        name="DEPLOYMENT_STORAGE_CONNECTION_STRING",
                        value=storage_connection_string,
                    ),
                    azure_native.web.NameValuePairArgs(
                        name="LANDING_CONNECTION_STRING",
                        value=storage_connection_string,
                    ),
                    azure_native.web.NameValuePairArgs(
                        name="LANDING_CONTAINER_NAME",
                        value=landing_container_name,
                    ),
                    azure_native.web.NameValuePairArgs(
                        name="WEBHOOK_SECRET",
                        value=webhook_secret,
                    ),
                    azure_native.web.NameValuePairArgs(
                        name="FUNCTIONS_EXTENSION_VERSION",
                        value="~4",
                    ),
                ]
            ),
        )

        # Public URL that senders post to.
        self.endpoint_url = function_app.default_host_name.apply(
            lambda host: f"https://{host}/api/webhook"
        )
        self.app_name = function_app.name
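The handler in functionapp/Webhook/__init__.py is not reproduced in this guide. As a hedged sketch of the validation and naming steps it is described as performing, the code below checks the signature in constant time, rejects bodies that are not JSON, and derives a blob name under the incoming/ prefix that the stage and Event Grid filter use. The hex digest format, the function names, the status codes, and the date-based blob layout are assumptions, not the shipped handler.

```python
import hashlib
import hmac
import json
import uuid
from datetime import datetime, timezone
from typing import Optional, Tuple


def is_valid_signature(secret: str, raw_body: bytes, signature: str) -> bool:
    """Compare the expected hex digest with the header value in constant time."""
    expected = hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected.encode("utf-8"), signature.encode("utf-8"))


def blob_name_for(received_at: datetime, event_id: str) -> str:
    """Hypothetical layout: incoming/YYYY/MM/DD/<event id>.json."""
    return f"incoming/{received_at:%Y/%m/%d}/{event_id}.json"


def handle(secret: str, raw_body: bytes, signature: str) -> Tuple[int, Optional[str]]:
    """Return an HTTP status and, on success, the blob name to write."""
    if not is_valid_signature(secret, raw_body, signature):
        return 401, None
    try:
        json.loads(raw_body)  # reject bodies that are not valid JSON
    except ValueError:
        return 400, None
    return 202, blob_name_for(datetime.now(timezone.utc), uuid.uuid4().hex)
```

Verifying against the raw bytes, before any parsing, keeps the check aligned with what the sender actually signed.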
components/snowpipe_pipeline.py
Creates the loading resources for this setup: the landing stage, the destination table, and the Snowpipe auto-ingest pipe in Snowflake, plus the Azure Storage queue, Event Grid subscription, and notification integration that trigger the pipe.
from __future__ import annotations

from dataclasses import dataclass

import pulumi
import pulumi_azure_native as azure_native
import pulumi_snowflake as snowflake


def _copy_into_statement(database: pulumi.Input[str], schema_name: pulumi.Input[str]) -> pulumi.Output[str]:
    # Load file metadata, the raw JSON document, and the load time into the landing table.
    return pulumi.Output.all(database, schema_name).apply(
        lambda args: (
            f'COPY INTO "{args[0]}"."{args[1]}"."WEBHOOK_EVENTS" '
            f'FROM (SELECT metadata$filename, metadata$file_last_modified, $1, sysdate() '
            f'FROM @"{args[0]}"."{args[1]}"."WEBHOOK_EVENTS_STAGE") '
            "FILE_FORMAT = (TYPE = JSON)"
        )
    )


@dataclass
class SnowpipePipeline:
    stage_name: pulumi.Output[str]
    table_name: pulumi.Output[str]
    pipe_name: pulumi.Output[str]
    notification_queue_name: pulumi.Output[str]
    notification_queue_url: pulumi.Output[str]
    notification_integration_name: pulumi.Output[str]

    def __init__(
        self,
        name: str,
        *,
        database: pulumi.Input[str],
        schema_name: pulumi.Input[str],
        resource_group_name: pulumi.Input[str],
        storage_account_id: pulumi.Input[str],
        storage_account_name: pulumi.Input[str],
        landing_container_name: pulumi.Input[str],
        blob_endpoint: pulumi.Input[str],
    ) -> None:
        # Provider instance with the preview resources used below enabled.
        preview_provider = snowflake.Provider(
            f"{name}-preview-provider",
            preview_features_enabled=[
                "snowflakeNotificationIntegrationResource",
                "snowflakePipeResource",
                "snowflakeStageExternalAzureResource",
                "snowflakeTableResource",
            ],
        )
        preview_opts = pulumi.ResourceOptions(provider=preview_provider)

        # Derived names: stage URL, queue URL, and the Event Grid subject filter.
        stage_url = pulumi.Output.all(blob_endpoint, landing_container_name).apply(
            lambda args: f"azure://{args[0].replace('https://', '').rstrip('/')}/{args[1]}/incoming/"
        )
        queue_url = pulumi.Output.from_input(storage_account_name).apply(
            lambda account_name: f"https://{account_name}.queue.core.windows.net/webhook-events"
        )
        subject_prefix = pulumi.Output.from_input(landing_container_name).apply(
            lambda container: f"/blobServices/default/containers/{container}/blobs/incoming/"
        )

        # Read-and-list SAS token on the landing container for the external stage.
        storage_keys = azure_native.storage.list_storage_account_keys_output(
            account_name=storage_account_name,
            resource_group_name=resource_group_name,
        )
        stage_sas = pulumi.Output.all(
            storage_account_name,
            landing_container_name,
            resource_group_name,
            storage_keys.keys,
        ).apply(
            lambda args: azure_native.storage.list_storage_account_service_sas(
                account_name=args[0],
                canonicalized_resource=f"/blob/{args[0]}/{args[1]}",
                key_to_sign=args[3][0].value,
                permissions="rl",
                protocols="https",
                resource="c",
                resource_group_name=args[2],
                shared_access_expiry_time="2035-01-01T00:00:00Z",
            ).service_sas_token
        )

        table = snowflake.Table(
            f"{name}-table",
            database=database,
            schema=schema_name,
            name="WEBHOOK_EVENTS",
            columns=[
                snowflake.TableColumnArgs(name="FILENAME", type="STRING", nullable=False),
                snowflake.TableColumnArgs(
                    name="LAST_MODIFIED_AT",
                    type="TIMESTAMP_NTZ",
                    nullable=False,
                ),
                snowflake.TableColumnArgs(name="CONTENT", type="VARIANT"),
                snowflake.TableColumnArgs(name="LOADED_AT", type="TIMESTAMP_NTZ"),
            ],
            opts=preview_opts,
        )

        stage = snowflake.StageExternalAzure(
            f"{name}-stage",
            name="WEBHOOK_EVENTS_STAGE",
            database=database,
            schema=schema_name,
            url=stage_url,
            credentials=snowflake.StageExternalAzureCredentialsArgs(
                azure_sas_token=stage_sas,
            ),
            opts=preview_opts,
        )

        # Queue that receives blob-created events from Event Grid.
        queue = azure_native.storage.Queue(
            f"{name}-queue",
            account_name=storage_account_name,
            queue_name="webhook-events",
            resource_group_name=resource_group_name,
        )

        event_subscription = azure_native.eventgrid.EventSubscription(
            f"{name}-subscription",
            destination=azure_native.eventgrid.StorageQueueEventSubscriptionDestinationArgs(
                endpoint_type="StorageQueue",
                queue_name=queue.name,
                resource_id=storage_account_id,
            ),
            event_subscription_name="webhook-events-created",
            filter=azure_native.eventgrid.EventSubscriptionFilterArgs(
                included_event_types=["Microsoft.Storage.BlobCreated"],
                is_subject_case_sensitive=False,
                subject_begins_with=subject_prefix,
            ),
            scope=storage_account_id,
            opts=pulumi.ResourceOptions(depends_on=[queue]),
        )

        tenant_id = azure_native.authorization.get_client_config_output().tenant_id

        notification_integration = snowflake.NotificationIntegration(
            f"{name}-notification-integration",
            name="WEBHOOK_EVENTS_NOTIFICATION_INTEGRATION",
            enabled=True,
            notification_provider="AZURE_STORAGE_QUEUE",
            azure_storage_queue_primary_uri=queue_url,
            azure_tenant_id=tenant_id,
            opts=pulumi.ResourceOptions.merge(
                preview_opts,
                pulumi.ResourceOptions(depends_on=[queue]),
            ),
        )

        # Auto-ingest pipe that runs the COPY statement when queue messages arrive.
        pipe = snowflake.Pipe(
            f"{name}-pipe",
            database=database,
            schema=schema_name,
            name="WEBHOOK_EVENTS_PIPE",
            auto_ingest=True,
            integration=notification_integration.name,
            copy_statement=_copy_into_statement(database, schema_name),
            opts=pulumi.ResourceOptions.merge(
                preview_opts,
                pulumi.ResourceOptions(depends_on=[event_subscription, notification_integration]),
            ),
        )

        self.stage_name = stage.fully_qualified_name
        self.table_name = table.name
        self.pipe_name = pipe.name
        self.notification_queue_name = queue.name
        self.notification_queue_url = queue_url
        self.notification_integration_name = notification_integration.name
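To make the naming in this component easier to check, the sketch below repeats its three string derivations as plain functions without Pulumi outputs. The function names and the example storage account are illustrative assumptions; the formats mirror the component above.

```python
def stage_url(blob_endpoint: str, container: str, prefix: str = "incoming") -> str:
    """Turn an https blob endpoint into the azure:// URL used by the stage."""
    host = blob_endpoint.replace("https://", "").rstrip("/")
    return f"azure://{host}/{container}/{prefix}/"


def subject_prefix(container: str, prefix: str = "incoming") -> str:
    """Event Grid subject filter for blobs under the same prefix."""
    return f"/blobServices/default/containers/{container}/blobs/{prefix}/"


def queue_url(account_name: str, queue_name: str = "webhook-events") -> str:
    """Primary URI of the storage queue used by the notification integration."""
    return f"https://{account_name}.queue.core.windows.net/{queue_name}"


# Example with a made-up storage account name.
print(stage_url("https://w2sfexample.blob.core.windows.net/", "incoming"))
print(subject_prefix("incoming"))
print(queue_url("w2sfexample"))
```

Because the container is also named incoming, the stage path contains incoming twice. Only blobs written under the incoming/ prefix inside that container match both the stage and the Event Grid filter.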
components/webhook_ingestion.ts
Provisions the Azure Function app behind the public webhook endpoint. The packaged handler accepts the request, validates the signature, normalizes the payload, and writes the raw event into the landing path for this setup.
import * as azure_native from "@pulumi/azure-native";
import * as pulumi from "@pulumi/pulumi";
export interface WebhookIngestionArgs {
location: pulumi.Input<string>;
resourceGroupName: pulumi.Input<string>;
storageAccountName: pulumi.Input<string>;
blobEndpoint: pulumi.Input<string>;
landingContainerName: pulumi.Input<string>;
storageConnectionString: pulumi.Input<string>;
webhookSecret: pulumi.Input<string>;
}
export class WebhookIngestion {
public readonly endpointUrl: pulumi.Output<string>;
public readonly appName: pulumi.Output<string>;
constructor(name: string, args: WebhookIngestionArgs) {
const deploymentContainer = new azure_native.storage.BlobContainer(`${name}-deployments`, {
accountName: args.storageAccountName,
containerName: "deploymentpackage",
resourceGroupName: args.resourceGroupName,
});
new azure_native.storage.Blob(`${name}-package`, {
accountName: args.storageAccountName,
blobName: "functionapp.zip",
containerName: deploymentContainer.name,
contentType: "application/zip",
resourceGroupName: args.resourceGroupName,
source: new pulumi.asset.AssetArchive({
"host.json": new pulumi.asset.FileAsset("functionapp/host.json"),
"requirements.txt": new pulumi.asset.FileAsset("functionapp/requirements.txt"),
"Webhook/function.json": new pulumi.asset.FileAsset("functionapp/Webhook/function.json"),
"Webhook/__init__.py": new pulumi.asset.FileAsset("functionapp/Webhook/__init__.py"),
}),
type: azure_native.storage.BlobType.Block,
});
const plan = new azure_native.web.AppServicePlan(`${name}-plan`, {
kind: "functionapp",
location: args.location,
resourceGroupName: args.resourceGroupName,
reserved: true,
sku: {
name: "FC1",
tier: "FlexConsumption",
},
});
const app = new azure_native.web.WebApp(`${name}-app`, {
kind: "functionapp,linux",
location: args.location,
httpsOnly: true,
publicNetworkAccess: "Enabled",
resourceGroupName: args.resourceGroupName,
serverFarmId: plan.id,
functionAppConfig: {
deployment: {
storage: {
authentication: {
storageAccountConnectionStringName: "DEPLOYMENT_STORAGE_CONNECTION_STRING",
type: azure_native.web.AuthenticationType.StorageAccountConnectionString,
},
type: azure_native.web.FunctionsDeploymentStorageType.BlobContainer,
value: pulumi.interpolate`${args.blobEndpoint}${deploymentContainer.name}`,
},
},
runtime: {
name: azure_native.web.RuntimeName.Python,
version: "3.11",
},
scaleAndConcurrency: {
instanceMemoryMB: 2048,
maximumInstanceCount: 40,
},
},
siteConfig: {
appSettings: [
{ name: "AzureWebJobsStorage", value: args.storageConnectionString },
{ name: "DEPLOYMENT_STORAGE_CONNECTION_STRING", value: args.storageConnectionString },
{ name: "LANDING_CONNECTION_STRING", value: args.storageConnectionString },
{ name: "LANDING_CONTAINER_NAME", value: args.landingContainerName },
{ name: "WEBHOOK_SECRET", value: args.webhookSecret },
{ name: "FUNCTIONS_EXTENSION_VERSION", value: "~4" },
],
},
});
this.endpointUrl = app.defaultHostName.apply((host) => `https://${host}/api/webhook`);
this.appName = app.name;
}
}
components/snowpipe_pipeline.ts
Creates the loading resources for this setup: the landing stage, the destination table, and the Snowpipe auto-ingest pipe in Snowflake, plus the Azure Storage queue, Event Grid subscription, and notification integration that trigger the pipe.
import * as azure_native from "@pulumi/azure-native";
import * as pulumi from "@pulumi/pulumi";
import * as snowflake from "@pulumi/snowflake";
export interface SnowpipePipelineArgs {
database: pulumi.Input<string>;
schemaName: pulumi.Input<string>;
resourceGroupName: pulumi.Input<string>;
storageAccountId: pulumi.Input<string>;
storageAccountName: pulumi.Input<string>;
landingContainerName: pulumi.Input<string>;
blobEndpoint: pulumi.Input<string>;
}
function copyIntoStatement(database: pulumi.Input<string>, schemaName: pulumi.Input<string>) {
return pulumi.all([database, schemaName]).apply(([currentDatabase, currentSchema]) =>
`COPY INTO "${currentDatabase}"."${currentSchema}"."WEBHOOK_EVENTS" ` +
`FROM (SELECT metadata$filename, metadata$file_last_modified, $1, sysdate() ` +
`FROM @"${currentDatabase}"."${currentSchema}"."WEBHOOK_EVENTS_STAGE") ` +
"FILE_FORMAT = (TYPE = JSON)",
);
}
export class SnowpipePipeline {
public readonly stageName: pulumi.Output<string>;
public readonly tableName: pulumi.Output<string>;
public readonly pipeName: pulumi.Output<string>;
public readonly notificationQueueName: pulumi.Output<string>;
public readonly notificationQueueUrl: pulumi.Output<string>;
public readonly notificationIntegrationName: pulumi.Output<string>;
constructor(name: string, args: SnowpipePipelineArgs) {
const previewProvider = new snowflake.Provider(`${name}-preview-provider`, {
previewFeaturesEnabled: [
"snowflakeNotificationIntegrationResource",
"snowflakePipeResource",
"snowflakeStageExternalAzureResource",
"snowflakeTableResource",
],
});
const previewOpts = { provider: previewProvider };
const stageUrl = pulumi
.all([args.blobEndpoint, args.landingContainerName])
.apply(([endpoint, container]) => `azure://${endpoint.replace("https://", "").replace(/\/$/, "")}/${container}/incoming/`);
const queueUrl = pulumi
.output(args.storageAccountName)
.apply((accountName) => `https://${accountName}.queue.core.windows.net/webhook-events`);
const subjectPrefix = pulumi
.output(args.landingContainerName)
.apply((containerName) => `/blobServices/default/containers/${containerName}/blobs/incoming/`);
const storageKeys = azure_native.storage.listStorageAccountKeysOutput({
accountName: args.storageAccountName,
resourceGroupName: args.resourceGroupName,
});
const stageSas = pulumi
.all([args.storageAccountName, args.landingContainerName, args.resourceGroupName, storageKeys.keys])
.apply(([accountName, containerName, resourceGroupName, keys]) =>
azure_native.storage.listStorageAccountServiceSAS({
accountName,
canonicalizedResource: `/blob/${accountName}/${containerName}`,
keyToSign: keys[0].value,
permissions: "rl",
protocols: "https",
resource: "c",
resourceGroupName,
sharedAccessExpiryTime: "2035-01-01T00:00:00Z",
}).then((result) => result.serviceSasToken),
);
const table = new snowflake.Table(`${name}-table`, {
database: args.database,
schema: args.schemaName,
name: "WEBHOOK_EVENTS",
columns: [
{ name: "FILENAME", type: "STRING", nullable: false },
{ name: "LAST_MODIFIED_AT", type: "TIMESTAMP_NTZ", nullable: false },
{ name: "CONTENT", type: "VARIANT" },
{ name: "LOADED_AT", type: "TIMESTAMP_NTZ" },
],
}, previewOpts);
const stage = new snowflake.StageExternalAzure(`${name}-stage`, {
name: "WEBHOOK_EVENTS_STAGE",
database: args.database,
schema: args.schemaName,
url: stageUrl,
credentials: {
azureSasToken: stageSas,
},
}, previewOpts);
const queue = new azure_native.storage.Queue(`${name}-queue`, {
accountName: args.storageAccountName,
queueName: "webhook-events",
resourceGroupName: args.resourceGroupName,
});
const eventSubscription = new azure_native.eventgrid.EventSubscription(`${name}-subscription`, {
destination: {
endpointType: "StorageQueue",
queueName: queue.name,
resourceId: args.storageAccountId,
},
eventSubscriptionName: "webhook-events-created",
filter: {
includedEventTypes: ["Microsoft.Storage.BlobCreated"],
isSubjectCaseSensitive: false,
subjectBeginsWith: subjectPrefix,
},
scope: args.storageAccountId,
}, { dependsOn: [queue] });
const tenantId = azure_native.authorization.getClientConfigOutput().tenantId;
const notificationIntegration = new snowflake.NotificationIntegration(`${name}-notification-integration`, {
name: "WEBHOOK_EVENTS_NOTIFICATION_INTEGRATION",
enabled: true,
notificationProvider: "AZURE_STORAGE_QUEUE",
azureStorageQueuePrimaryUri: queueUrl,
azureTenantId: tenantId,
}, pulumi.mergeOptions(previewOpts, { dependsOn: [queue] }));
const pipe = new snowflake.Pipe(`${name}-pipe`, {
database: args.database,
schema: args.schemaName,
name: "WEBHOOK_EVENTS_PIPE",
autoIngest: true,
integration: notificationIntegration.name,
copyStatement: copyIntoStatement(args.database, args.schemaName),
}, pulumi.mergeOptions(previewOpts, { dependsOn: [eventSubscription, notificationIntegration] }));
this.stageName = stage.fullyQualifiedName;
this.tableName = table.name;
this.pipeName = pipe.name;
this.notificationQueueName = queue.name;
this.notificationQueueUrl = queueUrl;
this.notificationIntegrationName = notificationIntegration.name;
}
}
Verify the data landed
After you send a test event, query Snowflake to confirm the records are visible:
SELECT FILENAME,
LAST_MODIFIED_AT,
CONTENT,
LOADED_AT
FROM LANDING_ZONE_WEBHOOKS.RAW.WEBHOOK_EVENTS
ORDER BY LOADED_AT DESC;
For this path, webhook payloads land in Azure Blob Storage first, then Snowflake auto-ingests new files from the Event Grid backed storage queue.
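If rows do not appear, two Snowflake diagnostics are usually the first things to check: the pipe status and the recent copy history for the landing table. The sketch below only builds the SQL text so you can paste it into a worksheet. The helper names are hypothetical, and the object names assume the defaults used in this guide.

```python
def pipe_status_sql(database: str, schema: str, pipe: str = "WEBHOOK_EVENTS_PIPE") -> str:
    """SQL that returns the pipe's execution state as JSON."""
    return f"SELECT SYSTEM$PIPE_STATUS('{database}.{schema}.{pipe}');"


def copy_history_sql(database: str, schema: str, table: str = "WEBHOOK_EVENTS", hours: int = 1) -> str:
    """SQL that lists recent load attempts for the landing table."""
    return (
        "SELECT FILE_NAME, STATUS, ROW_COUNT, FIRST_ERROR_MESSAGE "
        f"FROM TABLE({database}.INFORMATION_SCHEMA.COPY_HISTORY("
        f"TABLE_NAME => '{schema}.{table}', "
        f"START_TIME => DATEADD(hour, -{int(hours)}, CURRENT_TIMESTAMP())));"
    )


print(pipe_status_sql("LANDING_ZONE_WEBHOOKS", "RAW"))
print(copy_history_sql("LANDING_ZONE_WEBHOOKS", "RAW"))
```

An empty copy history together with a running pipe often points at the queue side, for example a missing consent or role grant, rather than at the files themselves.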
Operating notes
- Keep the first table as a raw landing zone. Flatten and model into downstream tables later.
- Rotate the shared webhook secret when you roll senders or suspect exposure.
- Watch the landing storage path and Snowflake load history (for this path, the pipe status and copy history rather than task history) so failed loads and malformed payloads do not go unnoticed.
- Use a least-privilege Snowflake reader role for analysts instead of querying with the loading role.
- When you choose batch loading, tune taskIntervalMinutes to match how quickly you want new files copied into Snowflake and how much warehouse activity you want between loads.
- Azure Snowpipe variants need a one-time Azure consent plus Storage Queue Data Contributor on the queue named by the exported notificationQueueName for the Snowflake app returned by DESC NOTIFICATION INTEGRATION before auto-ingest starts reading queue messages.