Using Snowpipe for Continuous Data Ingestion
❄️ What is Snowpipe?
Snowpipe is Snowflake's continuous data ingestion service. It allows you to automatically load data into Snowflake tables as soon as new files appear in a cloud storage location (e.g., AWS S3, Azure Blob Storage, or Google Cloud Storage).
Key Benefits of Snowpipe
Near real-time ingestion
Automatic loading of new files
Serverless & scalable
Supports schema evolution and data transformation
Pay-per-use billing based on the compute used to load data (no running warehouse required)
Components Involved
Cloud Storage (e.g., S3 bucket)
Stage (external or internal storage reference)
Table (destination table in Snowflake)
File Format (defines how data is parsed)
Snowpipe (the ingestion pipeline)
Notification Integration (optional but recommended for automation)
Step-by-Step: Set Up Snowpipe with AWS S3
✅ Step 1: Create a Target Table
CREATE TABLE logs (
id INT,
message STRING,
timestamp TIMESTAMP
);
✅ Step 2: Create a File Format
CREATE OR REPLACE FILE FORMAT my_csv_format
TYPE = 'CSV'
FIELD_OPTIONALLY_ENCLOSED_BY = '"'
SKIP_HEADER = 1;
✅ Step 3: Create an External Stage (to S3)
CREATE OR REPLACE STAGE my_s3_stage
URL = 's3://your-bucket-name/data/'
STORAGE_INTEGRATION = my_s3_integration
FILE_FORMAT = my_csv_format;
Note: You must create and configure the STORAGE_INTEGRATION (my_s3_integration above) first so that Snowflake can securely access the S3 bucket.
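A minimal sketch of that integration, assuming a hypothetical IAM role ARN and reusing the bucket path from the stage above (replace both with your own values):

CREATE OR REPLACE STORAGE INTEGRATION my_s3_integration
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789012:role/snowflake-access-role'
STORAGE_ALLOWED_LOCATIONS = ('s3://your-bucket-name/data/');

After creating it, run DESC INTEGRATION my_s3_integration to retrieve the Snowflake-side IAM user ARN and external ID, which must be added to the IAM role's trust policy in AWS.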
✅ Step 4: Create the Snowpipe
CREATE OR REPLACE PIPE my_snowpipe
AUTO_INGEST = TRUE
AS
COPY INTO logs
FROM @my_s3_stage
FILE_FORMAT = (FORMAT_NAME = my_csv_format);
AUTO_INGEST = TRUE lets the pipe load files automatically from the cloud event notifications configured in Step 5; omit it if you plan to trigger loads only via the Snowpipe REST API.
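A pipe's COPY statement can also apply simple transformations while loading (the data-transformation benefit noted earlier). A hypothetical variant, my_transforming_pipe, that trims the message field and parses the timestamp explicitly:

CREATE OR REPLACE PIPE my_transforming_pipe
AUTO_INGEST = TRUE
AS
COPY INTO logs (id, message, timestamp)
FROM (
  SELECT $1, TRIM($2), TO_TIMESTAMP($3)
  FROM @my_s3_stage
)
FILE_FORMAT = (FORMAT_NAME = my_csv_format);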
✅ Step 5: Enable Event Notifications (Optional but Recommended)
For auto-triggering Snowpipe, configure event notifications in your cloud provider:
AWS: Configure S3 event notifications to the Snowflake-managed SQS queue for the pipe (directly, or via SNS if the bucket's notifications are already in use)
Azure: Use Event Grid
GCP: Use Pub/Sub
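On AWS, the SQS queue to notify is the one Snowflake creates for the pipe; its ARN appears in the notification_channel column of either of the following (assuming the pipe from Step 4):

SHOW PIPES LIKE 'my_snowpipe';
-- or, equivalently:
DESC PIPE my_snowpipe;

Point the S3 bucket's object-created event notifications at that ARN.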
Once the notifications are configured, make sure the pipe is not paused:
ALTER PIPE my_snowpipe SET PIPE_EXECUTION_PAUSED = FALSE;
✅ Step 6: Load Data Manually (Optional)
If you want to load manually instead of waiting for automation:
ALTER PIPE my_snowpipe REFRESH;
This tells Snowpipe to scan the stage and queue any files (staged within the last 7 days) that have not already been loaded.
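To see whether the pipe is running and how many files are queued, you can also call SYSTEM$PIPE_STATUS:

SELECT SYSTEM$PIPE_STATUS('my_snowpipe');

It returns a JSON string with fields such as executionState and pendingFileCount.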
Monitor Snowpipe Activity
Check credits used and files loaded per pipe with:
SELECT * FROM SNOWFLAKE.ACCOUNT_USAGE.PIPE_USAGE_HISTORY
WHERE PIPE_NAME = 'MY_SNOWPIPE';
Or check per-file load results (including errors) for the target table:
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'LOGS',
  START_TIME => DATEADD(HOUR, -24, CURRENT_TIMESTAMP())
))
WHERE PIPE_NAME = 'MY_SNOWPIPE';
Best Practices
Best Practice | Why It Matters
Use file naming conventions | Helps Snowpipe track what has already been loaded
Monitor load errors with COPY_HISTORY | Diagnose ingestion issues quickly
Limit file size to 10–250 MB | Optimizes performance and cost
Use compressed files (e.g., .gz, .snappy); see the example below | Reduces storage and speeds up ingestion
Set retention policies on cloud storage | Avoid reprocessing and clutter
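To follow the compressed-files recommendation, the Step 2 file format could declare gzip compression explicitly (a hypothetical variant named my_csv_gz_format; with the default COMPRESSION = 'AUTO', Snowflake also detects gzip from the .gz extension):

CREATE OR REPLACE FILE FORMAT my_csv_gz_format
TYPE = 'CSV'
COMPRESSION = 'GZIP'
FIELD_OPTIONALLY_ENCLOSED_BY = '"'
SKIP_HEADER = 1;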
Bonus: Automate Everything with Python (Optional)
Use the Snowflake Python Connector or Snowpark to:
Trigger Snowpipe programmatically
Automate table creation and ingestion workflows
Monitor and log errors
✅ Summary
Step | Description
1. Create table | Target for ingested data
2. Create file format | Define CSV, JSON, etc.
3. Create stage | Reference to cloud storage
4. Create Snowpipe | Data ingestion definition
5. Automate via events | Trigger loading as files arrive
6. Monitor loads | Track ingestion success/failure