In a landscape where every moment counts, a single decision can make or break a company's trajectory, and ignoring data-driven insights can be the blow that sends a business into a tailspin. Amid this struggle for survival, Snowflake triggers offer a gateway to automated data processing.
Automated data management techniques like Snowflake triggers are not just useful; they are essential to keeping things running smoothly and efficiently. With a reported 207% growth in the number of organizations using data across all three major public clouds, triggers emerge as the true MVPs.
If that caught your attention, we’ve got some more insights for you. This guide will break down Snowflake triggers for you and show you exactly how to master Streams and Tasks. No more complexities – just straightforward and effective usage.
Understanding Snowflake Triggers: The Path To Data Excellence
When we refer to "Snowflake Triggers," we're actually talking about a unique approach Snowflake takes toward automating database actions. In most traditional databases, a trigger is a set of instructions that are executed when a specific event occurs, like an update to a data row.
However, Snowflake does not provide native support for triggers in this traditional sense. That doesn't limit its capabilities: it achieves similar outcomes by combining two features called Streams and Tasks.
What does this mean for you as a developer or data engineer? You get the functionality you would from a trigger but with more flexibility and control. You can automate specific actions based on changes in your data, much like traditional triggers, in a way that is built for the demands of big data and cloud computing.
Let's now see what these Streams and Tasks are.
What Are Snowflake Streams?
In Snowflake, a Stream is an object that tracks and records the changes made to a table, such as inserted, updated, and deleted rows, from a given point in time onward. Think of it as a tool that lets you see what has changed in your table data since you last processed it.
With Streams, developers can query exactly which rows changed and how they changed. This Change Data Capture (CDC) capability makes Streams a valuable resource for managing data changes efficiently.
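As a minimal sketch of what a Stream exposes, the statements below create a stream on a small table and read it. The `orders` table and `orders_stream` names are hypothetical and used only for illustration. Alongside the table's own columns, a stream query returns metadata columns such as `METADATA$ACTION` and `METADATA$ISUPDATE`.

```sql
-- Hypothetical table, used only for illustration.
CREATE OR REPLACE TABLE orders (
    order_id NUMBER,
    status   VARCHAR(20)
);

-- Start tracking changes made to the table from this point on.
CREATE OR REPLACE STREAM orders_stream ON TABLE orders;

INSERT INTO orders VALUES (1, 'NEW');

-- Read the captured change together with the stream's metadata columns.
-- A plain SELECT does not consume the stream; only a DML statement that
-- reads from the stream advances its offset.
SELECT order_id, status, metadata$action, metadata$isupdate
FROM orders_stream;
```

Note that `METADATA$ACTION` only ever takes the values `INSERT` and `DELETE`. An update to a row that existed before the stream's current offset shows up as a `DELETE` row and an `INSERT` row, both with `METADATA$ISUPDATE` set to `TRUE`.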
What Are Snowflake Tasks?
A Task, on the other hand, is an object that runs a SQL statement on a recurring schedule, including statements that call stored procedures. Tasks run without manual intervention, and several tasks can run at the same time, which makes them well suited to complex, periodic data processing. They provide an automated way to regularly carry out specific SQL operations to improve efficiency and consistency in data management.
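To make the idea of a scheduled Task concrete, here is a short sketch showing the two scheduling styles Snowflake accepts: a CRON expression and a fixed interval in minutes. Every object name here (`my_wh`, `audit_log`, `logged_at`, `refresh_sales_summary`, and both task names) is an assumption made up for the example.

```sql
-- Hypothetical task that runs one SQL statement every day at 02:00 UTC.
CREATE OR REPLACE TASK purge_old_audit_rows
    WAREHOUSE = my_wh
    SCHEDULE = 'USING CRON 0 2 * * * UTC'
AS
    DELETE FROM audit_log
    WHERE logged_at < DATEADD(day, -90, CURRENT_TIMESTAMP());

-- Hypothetical task that calls a stored procedure every 60 minutes.
CREATE OR REPLACE TASK refresh_summary
    WAREHOUSE = my_wh
    SCHEDULE = '60 MINUTE'
AS
    CALL refresh_sales_summary();

-- Tasks are created suspended, so each one has to be resumed explicitly.
ALTER TASK purge_old_audit_rows RESUME;
ALTER TASK refresh_summary RESUME;
```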
In most data pipelines, which are typically continuous, Tasks work together with Streams. A Task can check whether a Stream holds any change data for a table. If it does, the Task consumes the changes; if it doesn't, the Task skips the current run. This pairing of Tasks and Streams is how Snowflake mimics traditional triggers for efficient and scalable data processing.
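The pairing described above can be sketched in a single statement. This is an illustration under assumed names: `orders_stream`, `orders_audit`, and `my_wh` are hypothetical and would need to exist already. The `WHEN` clause is what makes the task skip a run when the stream is empty.

```sql
-- Hypothetical trigger-like task: every 5 minutes, check the stream and,
-- only if it holds change data, copy the changes into an audit table.
CREATE OR REPLACE TASK copy_order_changes
    WAREHOUSE = my_wh
    SCHEDULE = '5 MINUTE'
WHEN
    SYSTEM$STREAM_HAS_DATA('orders_stream')
AS
    -- Reading the stream inside a DML statement consumes the changes,
    -- so the same rows are not processed again on the next run.
    INSERT INTO orders_audit (order_id, status, change_type)
    SELECT order_id, status, metadata$action
    FROM orders_stream;
```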
Leveraging Snowflake's Streams & Tasks: A General Blueprint
Let’s talk about the steps and the general flow of the process.
Database & Schema Setup
First off, set up your database and schema. This ensures that all your subsequent operations - creating tables, streams, and tasks - are performed in the correct context.
Source Table Creation
Create the source table which holds the data you want to track changes for. This includes an update timestamp field that logs when each record was last updated.
History Table Creation
Establish a history table. This table serves as a record of changes to the source table. It includes fields for start time and end time for tracking the validity period of each record, and a current flag to indicate the current record for each key.
Stream Creation
Create a Stream on the source table. A Stream is a Snowflake object that logs changes (inserts, updates, and deletes) to a table. You can select these changes as if they were rows in a table.
Change Logic View
Set up a View that defines the logic for handling the changes captured by the Stream. This View generates the data to be inserted or updated in the history table and assigns a DML type ('I', 'U', or 'D') to each record based on the type of change.
Merge Changes Into The History Table
Implement a MERGE statement to integrate the changes defined in the View into the history table. This statement updates existing records, deletes records, or inserts new records based on the DML type of each record in the View.
Role Creation For Task Execution
Create a role for task administration and give it the EXECUTE TASK privilege. This role is used to execute tasks in Snowflake.
Warehouse Creation For Task
Establish a warehouse that will provide the computing resources for the task to run.
Task Creation
Construct a Task that runs the MERGE statement on a schedule. The task uses the warehouse you've set up and, through a WHEN condition, runs only if the Stream contains data at the scheduled time.
Task Activation
Finally, resume the task so it starts running. By default, tasks are created in a suspended state and must be resumed before they start running.
The best practice is to customize the table names and schemas to fit your requirements and then use this robust blueprint for all your data warehousing operations in Snowflake.
The real power of Snowflake's Streams and Tasks shines when we use them in real-life situations. Let's move beyond the theoretical understanding and see their application in different situations for a better understanding of their utility and effectiveness.
2 Examples Of Snowflake Streams & Tasks: A Practical Guide
Here are 2 examples that will show how Streams and Tasks can be used in different business contexts.
Example 1: Real-time Employee Data Management With Snowflake Streams & Tasks
Let’s consider the HR department of a firm. Here, we’ll specifically focus on an employees' table. We want to develop a real-time change-tracking mechanism for employee data, a common need across many organizations for monitoring new hires, tracking departures, or documenting departmental changes.
For this, we'll implement a stream on our ‘employees’ table to capture any data modifications. We'll also create an ‘employees_history’ table to store a detailed history of the employee records. A scheduled Task in Snowflake merges the changes that our stream captured into the history table to keep the history data current.
Let's walk through the SQL scripts that construct this robust real-time data tracking system in Snowflake.
Database & Schema Setup
```sql
CREATE DATABASE company;
USE DATABASE company;
CREATE SCHEMA hr;
USE SCHEMA hr;
```
This SQL command generates a new database named ‘company’ and a schema named ‘hr’. It then switches to this database and schema for subsequent operations.
Source Table Creation
```sql
CREATE OR REPLACE TABLE employees (
    emp_id           NUMBER,
    first_name       VARCHAR(50),
    last_name        VARCHAR(50),
    dept_id          NUMBER,
    hire_date        TIMESTAMP_NTZ,
    update_timestamp TIMESTAMP_NTZ
);
```
It creates the ‘employees’ table which is the source table holding the data we want to track changes for.
History Table Creation
```sql
CREATE OR REPLACE TABLE employees_history (
    emp_id       NUMBER,
    first_name   VARCHAR(50),
    last_name    VARCHAR(50),
    dept_id      NUMBER,
    hire_date    TIMESTAMP_NTZ,
    start_time   TIMESTAMP_NTZ,
    end_time     TIMESTAMP_NTZ,
    current_flag INT
);
```
It generates the ‘employees_history’ table which will hold a history of changes to the ‘employees’ table.
Stream Creation
```sql
CREATE OR REPLACE STREAM employees_table_changes ON TABLE employees;
```
This creates a stream named ‘employees_table_changes’ on the ‘employees' table.
Change Logic View
```sql
CREATE OR REPLACE VIEW employees_change_data AS
SELECT emp_id, first_name, last_name, dept_id, hire_date,
       start_time, end_time, current_flag, dml_type
FROM (
    -- New versions to insert: brand-new rows and the INSERT half of each update.
    SELECT emp_id, first_name, last_name, dept_id, hire_date,
           update_timestamp AS start_time,
           LAG(update_timestamp) OVER (PARTITION BY emp_id ORDER BY update_timestamp DESC) AS end_time_raw,
           CASE WHEN end_time_raw IS NULL THEN '9999-12-31'::TIMESTAMP_NTZ ELSE end_time_raw END AS end_time,
           CASE WHEN end_time_raw IS NULL THEN 1 ELSE 0 END AS current_flag,
           'I' AS dml_type
    FROM employees_table_changes
    WHERE metadata$action = 'INSERT'

    UNION ALL

    -- Updates: a stream reports an update as a DELETE/INSERT pair flagged with
    -- metadata$isupdate = TRUE. The window runs over both halves so the old
    -- version picks up the new version's timestamp as its end_time, then
    -- QUALIFY keeps only the old version (the DELETE half).
    -- Assumes every UPDATE also refreshes update_timestamp.
    SELECT emp_id, first_name, last_name, dept_id, hire_date,
           update_timestamp AS start_time,
           LAG(update_timestamp) OVER (PARTITION BY emp_id ORDER BY update_timestamp DESC) AS end_time_raw,
           CASE WHEN end_time_raw IS NULL THEN '9999-12-31'::TIMESTAMP_NTZ ELSE end_time_raw END AS end_time,
           CASE WHEN end_time_raw IS NULL THEN 1 ELSE 0 END AS current_flag,
           'U' AS dml_type
    FROM employees_table_changes
    WHERE metadata$isupdate = TRUE
    QUALIFY metadata$action = 'DELETE'

    UNION ALL

    -- Deletions: close the current history row of every deleted employee.
    -- The extra NULL keeps the column count equal across all three branches.
    SELECT emp_id, NULL, NULL, NULL, NULL,
           start_time,
           NULL AS end_time_raw,
           CURRENT_TIMESTAMP()::TIMESTAMP_NTZ,
           NULL,
           'D' AS dml_type
    FROM employees_history eh
    WHERE eh.emp_id IN (SELECT DISTINCT emp_id
                        FROM employees_table_changes
                        WHERE metadata$action = 'DELETE'
                          AND metadata$isupdate = FALSE)
      AND eh.current_flag = 1
);
```
This script will create a view named ‘employees_change_data’ that defines the logic for handling the changes captured by the ‘employees_table_changes’ stream. The logic includes handling new insertions, updates, and deletions.
The view generates a row for each change, specifying the type of change ('I' for insert, 'U' for update, 'D' for delete) in the ‘dml_type’ column. The ‘current_flag’ is set to 1 for new records and 0 for old records. The ‘start_time’ and ‘end_time’ define the time range during which each version of the record is valid.
Merge Changes Into The History Table
```sql
MERGE INTO employees_history eh
USING employees_change_data ecd
    ON eh.emp_id = ecd.emp_id
   AND eh.start_time = ecd.start_time
WHEN MATCHED AND ecd.dml_type = 'U' THEN
    UPDATE SET eh.end_time = ecd.end_time, eh.current_flag = 0
WHEN MATCHED AND ecd.dml_type = 'D' THEN
    UPDATE SET eh.end_time = ecd.end_time, eh.current_flag = 0
WHEN NOT MATCHED AND ecd.dml_type = 'I' THEN
    INSERT (emp_id, first_name, last_name, dept_id, hire_date, start_time, end_time, current_flag)
    VALUES (ecd.emp_id, ecd.first_name, ecd.last_name, ecd.dept_id, ecd.hire_date,
            ecd.start_time, ecd.end_time, ecd.current_flag);
```
This merges the changes defined in the ‘employees_change_data’ view into the ‘employees_history’ table. It checks the ‘dml_type’ field from the view to determine whether to update existing records or insert new records into the history table. The ‘current_flag’ is set to 0 for updated or deleted records to indicate that they are no longer the current version of the record.
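Once the history table is populated, the ‘start_time’, ‘end_time’, and ‘current_flag’ columns can be used to read it back. The queries below are a sketch of two common lookups; the timestamp literal is an arbitrary example value, not something taken from the data.

```sql
-- Current version of every employee.
SELECT emp_id, first_name, last_name, dept_id, start_time
FROM employees_history
WHERE current_flag = 1;

-- Version of each employee that was valid at a given moment
-- (the timestamp below is an arbitrary example value).
SELECT emp_id, first_name, last_name, dept_id, start_time, end_time
FROM employees_history
WHERE start_time <= '2024-01-01 00:00:00'::TIMESTAMP_NTZ
  AND end_time   >  '2024-01-01 00:00:00'::TIMESTAMP_NTZ;
```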
Role Creation For Task Execution
```sql
USE ROLE securityadmin;
CREATE ROLE taskadmin;

USE ROLE accountadmin;
GRANT EXECUTE TASK ON ACCOUNT TO ROLE taskadmin;

USE ROLE securityadmin;
GRANT ROLE taskadmin TO ROLE sysadmin;
```
This script creates a role named ‘taskadmin’ and grants the EXECUTE TASK privilege to it.
Warehouse Creation For Task
```sql
CREATE WAREHOUSE IF NOT EXISTS task_warehouse
    WITH WAREHOUSE_SIZE = 'XSMALL'
    AUTO_SUSPEND = 120;
```
This creates a warehouse named ‘task_warehouse’ that will provide the compute resources for the task to run.
Task Creation
```sql
CREATE OR REPLACE TASK update_employee_history
    WAREHOUSE = task_warehouse
    SCHEDULE = '1 minute'
WHEN
    SYSTEM$STREAM_HAS_DATA('employees_table_changes')
AS
    MERGE INTO employees_history eh
    USING employees_change_data ecd
        ON eh.emp_id = ecd.emp_id
       AND eh.start_time = ecd.start_time
    WHEN MATCHED AND ecd.dml_type = 'U' THEN
        UPDATE SET eh.end_time = ecd.end_time, eh.current_flag = 0
    WHEN MATCHED AND ecd.dml_type = 'D' THEN
        UPDATE SET eh.end_time = ecd.end_time, eh.current_flag = 0
    WHEN NOT MATCHED AND ecd.dml_type = 'I' THEN
        INSERT (emp_id, first_name, last_name, dept_id, hire_date, start_time, end_time, current_flag)
        VALUES (ecd.emp_id, ecd.first_name, ecd.last_name, ecd.dept_id, ecd.hire_date,
                ecd.start_time, ecd.end_time, ecd.current_flag);
```
This script creates a task named ‘update_employee_history’ that uses the ‘task_warehouse’ for its computation. The task is scheduled to run every minute when there's new data in the ‘employees_table_changes’ stream. The task's job is to execute the MERGE statement which updates the ‘employees_history’ table based on the changes captured in the ‘employees_change_data’ view.
Task Activation
```sql
ALTER TASK update_employee_history RESUME;
```
This will resume the ‘update_employee_history’ task so that it starts running.
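To check that the pipeline works end to end, one option is to insert a test row, trigger a single run by hand, and then look at the task's run history. The sketch below assumes the task is named ‘update_employee_history’, as in its CREATE TASK statement, and the employee values are made-up sample data.

```sql
-- Made-up sample row; INSERT ... SELECT is used so the timestamp
-- expressions are evaluated as ordinary expressions.
INSERT INTO employees
SELECT 1, 'Ada', 'Lovelace', 10,
       '2023-01-15'::TIMESTAMP_NTZ,
       CURRENT_TIMESTAMP()::TIMESTAMP_NTZ;

-- Request one run right away instead of waiting for the schedule.
EXECUTE TASK update_employee_history;

-- Inspect recent runs of the task, most recent first.
SELECT name, state, scheduled_time, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'UPDATE_EMPLOYEE_HISTORY'))
ORDER BY scheduled_time DESC;

-- After a successful run, the new employee should appear as a current row.
SELECT * FROM employees_history WHERE emp_id = 1;
```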
Example 2: Real-Time Tracking Of Product Sales Using Snowflake Streams & Tasks
Here, we will use Snowflake's powerful Streams and Tasks features to track real-time changes in product sales data.
Setting Up The Environment
```sql
-- set up the context
USE ROLE sysadmin;
USE DATABASE demo_db;
USE SCHEMA public;
USE WAREHOUSE compute_wh;
```
This sets up the necessary context for the following operations. It sets the active role to ‘sysadmin’, the active database to ‘demo_db’, the active schema to ‘public’, and the active warehouse to ‘compute_wh’.
Creating The Raw & Final Tables
```sql
-- create a raw table where change data capture will be triggered
CREATE OR REPLACE TABLE sales_raw_tbl (
    product_id NUMBER,
    sale_date  DATE,
    units_sold NUMBER
);

-- create the final table that holds the data after CDC processing
CREATE OR REPLACE TABLE sales_final_tbl (
    product_id NUMBER,
    sale_date  DATE,
    units_sold NUMBER
);
```
We’ll use this script to create 2 tables: ‘sales_raw_tbl’ and ‘sales_final_tbl’. The ‘sales_raw_tbl’ will capture the real-time sales data while ‘sales_final_tbl’ will hold the processed data.
Creating The Stream
```sql
-- define the stream on top of sales_raw_tbl
CREATE OR REPLACE STREAM sales_stream
    ON TABLE sales_raw_tbl
    APPEND_ONLY = TRUE;
```
With this script, we have created a stream called ‘sales_stream’ on ‘sales_raw_tbl’. Because the stream is declared with APPEND_ONLY = TRUE, it tracks only the rows inserted into ‘sales_raw_tbl’, not updates or deletes.
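The effect of the append-only setting can be seen with a quick experiment. This is a sketch with made-up sample rows; the comment on the final query reflects Snowflake's documented behavior for append-only streams rather than output captured from a real run.

```sql
-- Made-up sample rows.
INSERT INTO sales_raw_tbl VALUES
    (101, '2024-03-01', 5),
    (102, '2024-03-01', 2);

-- Delete one of the rows before the stream has been consumed.
DELETE FROM sales_raw_tbl WHERE product_id = 102;

-- An append-only stream records inserts only, so the delete is not
-- tracked and both inserted rows are still expected to be listed.
SELECT product_id, sale_date, units_sold, metadata$action
FROM sales_stream;
```

A standard (non-append-only) stream would instead report the net change, so the deleted row would not appear. Append-only fits this example because sales rows are only ever added to the raw table.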
Loading Initial Data Into The Final Table
```sql
-- 1st time data load from sales_raw_tbl to sales_final_tbl
INSERT INTO sales_final_tbl
SELECT * FROM sales_raw_tbl;
```
This script loads the initial data from ‘sales_raw_tbl’ into ‘sales_final_tbl’. This could represent an initial bulk load of historical sales data.
Creating The Task
```sql
-- create a task to load the new CDC data into the final table
CREATE OR REPLACE TASK sales_task
    WAREHOUSE = compute_wh
    SCHEDULE = '5 minute'
WHEN
    SYSTEM$STREAM_HAS_DATA('sales_stream')
AS
    -- list the columns explicitly: SELECT * on a stream also returns the
    -- METADATA$ columns, which do not exist in sales_final_tbl
    INSERT INTO sales_final_tbl
    SELECT product_id, sale_date, units_sold
    FROM sales_stream;
```
We’ll create a task named ‘sales_task’ that uses the ‘compute_wh’ warehouse for its computation. The task is scheduled every 5 minutes and runs only when there's new data in the ‘sales_stream’. The task's job is to execute the INSERT statement, which appends the rows captured in the ‘sales_stream’ to the ‘sales_final_tbl’.
Activating The Task
```sql
-- activate the task so it starts consuming the stream
USE ROLE accountadmin;
ALTER TASK sales_task RESUME;
```
This activates the ‘sales_task’. Once activated, the task runs according to its schedule, so ‘sales_final_tbl’ picks up the latest sales rows within about 5 minutes of their arrival in ‘sales_raw_tbl’.
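After activation, a few statements help confirm that the task is live and let you pause it when you're done experimenting. This is a brief sketch that uses only the objects created in this example.

```sql
-- The "state" column should read "started" once the task has been resumed.
SHOW TASKS LIKE 'sales_task';

-- Returns TRUE while the stream still holds rows the task has not consumed.
SELECT SYSTEM$STREAM_HAS_DATA('sales_stream');

-- Suspend the task when it is no longer needed, so it stops being scheduled.
ALTER TASK sales_task SUSPEND;
```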
Supercharge Your Data Operations With Estuary Flow and Snowflake Integration
While we explored the practical side of Snowflake triggers, we also suggest checking out Estuary Flow — an advanced data pipeline tool that optimizes data operations in synergy with Snowflake's capabilities.
This means that, in addition to having Snowflake triggers to automate database actions, using Estuary Flow to set up your data pipeline can simplify your data management tasks while supercharging your ability to handle real-time data processing and integration. By employing real-time change data capture (CDC), automated schema management, and data deduplication, Flow ensures streamlined and effective data integration.
Estuary Flow brings a new level of automation and resilience to your Snowflake data operations so you can capture and react to data changes in real time. It provides a robust, scalable, and fault-tolerant architecture for managing your data pipelines.
Whether it's dealing with evolving data models, ensuring data accuracy, or handling large volumes of data, Flow manages it all with finesse. Its capabilities align perfectly with Snowflake's Streams and Tasks, making it a great tool to enhance your real-time data processing needs.
Real-Time CDC & Data Deduplication
Flow's real-time CDC can capture changes from your data sources instantly so that your Snowflake Streams are always fed with the most recent data. Its automated data deduplication ensures that your Tasks always operate on unique records, reducing redundancy and improving efficiency.
Automated Schema Management & Built-In Testing
With automated schema management, Flow adapts to evolving data models to ensure that your Snowflake Tasks always work with up-to-date schema. The built-in testing capabilities provide confidence in the data accuracy to make your Snowflake operations more reliable.
Fault-Tolerant Architecture & Scalability
Flow's fault-tolerant architecture keeps your Snowflake Streams and Tasks running even in the face of failures. Its scalability with your data volume guarantees that your Snowflake operations can handle growing data loads without a hitch.
Today, staying competitive means having the ability to act quickly, and that's exactly what Snowflake triggers help you do. They give you the power to spot emerging trends, foresee potential disasters, and make decisions with lightning speed.
The value of Snowflake triggers is not restricted to any specific domain. From eCommerce giants optimizing product recommendations on the fly to healthcare systems responding to urgent patient needs, the impact stretches across industries.
Now the question arises: how can you amplify the benefits of these capabilities in your own operations? When you combine the power of Snowflake triggers with Estuary Flow, you’ll get the best of both worlds. It adds an extra layer of brilliance to your data management, elevating your data operations to a whole new level.