How to Stream Data from Postgres to Snowflake: Full Guide
Streaming data from Postgres to Snowflake is a great way to support transactional and analytical workloads. And it’s easier than it seems.

Introduction

Data is a vital part of any organization. With the rise of different data sources, the Internet of Things (IoT), social media, and various transactional and analytical databases, many organizations want to harness this data explosion to make informed, data-driven decisions.

Today, we’ll focus on a popular source and destination: streaming data from Postgres to Snowflake. This pairing of a relational database and a data warehouse is great for supporting transactional and analytical workloads, but it’s not always easy to sync them in real time.

Data streaming is the process of transmitting a continuous flow of data from a source to a destination. The source can be a transactional database, an IoT sensor, log files from an application, or file storage systems, amongst others. The data in these sources are then continuously transmitted to a destination. This destination could be a data warehouse, a message broker, cloud storage systems, real-time dashboards or monitoring tools, machine learning pipelines, an analytical database, or something else.

Below, I include a tutorial on how to do this quickly and easily. But first, let’s cover the basics.

Why is Data Streaming Important?

Data streaming enables real-time data processing, allowing organizations to make fast, data-driven, actionable decisions. It also helps companies make predictions and take proactive measures to prevent issues before they occur.

Data streaming has a lot of beneficial use cases. These include:

  • Fraud detection: A bank or an e-commerce application needs to catch fraud and block a fraudulent transaction the moment it happens. Detecting a fraudulent transaction three days after it happens is of little use.

    With data streaming, fraudulent or suspicious transactions are either blocked or flagged the moment they occur.
  • Healthcare: With data streaming, you can monitor patients in real time and alert staff to any changes in their condition. Wearable devices such as smartwatches can also monitor vital signs in real time and alert the wearer to any change in condition.
  • Logistics: With data streaming, users can monitor their goods and shipments in real time. This can also drive them to make informed decisions about the best routes to take, inventory management, and so on. 

    This helps produce accurate estimated arrival times for goods and boosts customer satisfaction.

From these examples, you can see how important data streaming has become in the modern era. It matters in healthcare, finance, electronics, retail, construction, and logistics, amongst other industries.

Since data streaming is paramount for organizations, data engineers need streaming tools that ensure low latency and high throughput. In this article, you will learn how to stream data from Postgres to Snowflake using Estuary Flow.

What is Estuary Flow?

Speed is vital for organizations across all industries. You want your destination systems to receive data from your sources as soon as it appears.

Estuary Flow is a real-time data operations platform that lets you set up real-time pipelines with low latency and high throughput, without provisioning any infrastructure.

Estuary Flow is highly scalable. Flow is a distributed system that scales with your data. It performs efficiently whether your upstream data inflow is 100MB/s or 5GB/s. You can also backfill huge volumes of data from your source systems in minutes.

Due to its scalability, low latency, and high throughput, Estuary Flow is trusted when it comes to real-time data streaming, data monitoring, and live reporting. 

Getting Started with Estuary Flow to Load Data from Postgres to Snowflake

Learn how to effortlessly stream data from Postgres to Snowflake with these easy steps using Estuary Flow:

Step 1: Create an Estuary Flow account
Step 2: Set up Amazon RDS
Step 3: Set up Postgres Locally
Step 4: Create a table and populate it
Step 5: Set up Snowflake
Step 6: Create a Capture
Step 7: Create a Materialization
Step 8: Verify the results

Let's explore each step of streaming a PostgreSQL database to Snowflake.

Prerequisites

  • An Estuary Flow account
  • A Postgres database
  • A Snowflake account

Step 1: Create an Estuary Flow account

You can sign up with either your GitHub account or your Google account. After logging in, you will see a welcome page as shown in the image below.

Blog Post Image

Congratulations! You have successfully created an Estuary Flow account and can now set up real-time, low-latency data streaming.

Step 2: Set up Amazon RDS

  • Create an Amazon RDS instance to host the Postgres database

In this tutorial, Amazon RDS will be used to host your Postgres database in the cloud. Apart from Amazon RDS, there are a number of alternatives for hosting your database in the cloud, such as Google Cloud SQL, Azure Database for PostgreSQL, and Amazon Aurora.

To create an Amazon RDS instance, log in to your AWS account and search for RDS in the search bar.

Blog Post Image

You then go to Databases > Create database.

Blog Post Image

  • Configure the Postgres database instance remotely with necessary parameters
Blog Post Image

Blog Post Image

Blog Post Image

Blog Post Image

As shown in the images above, configure a Postgres database instance remotely with:

  1. The username estuary_username
  2. Public access enabled 
  3. Database name set to estuary_db

With your configurations set, you then proceed to create the database. This may take some time because RDS needs to allocate compute resources.

Blog Post Image

Once the database has finished creating, you can see the endpoint, which you will use to connect to the instance from your local Postgres client.

  • Create a Parameter Group and Enable Logical Replication

Next, you need to create a parameter group and associate it with your database. A parameter group is a set of configuration values that you can apply to your database.

Navigate to RDS > Parameter groups and create a new parameter group. Name it estuary-parameter-group (RDS parameter group names may only contain letters, digits, and hyphens).

Blog Post Image

After creating the parameter group, you have to configure it. To use it with Flow, the only setting you need to change is to enable logical replication. This allows data streaming using change data capture (CDC).

Edit your parameter group and set rds.logical_replication=1.

Blog Post Image
  • Associate the parameter group with the database

Go to RDS > Databases > Modify.

Under the Additional configuration section, change the DB parameter group from the default to estuary-parameter-group.

Blog Post Image

Reboot your database and continue.
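
Once the reboot completes, you can optionally confirm that the setting took effect. Run the check below from any SQL client connected to the instance; it assumes the parameter group has been applied and the reboot has finished:

plaintext
-- wal_level should report 'logical' once rds.logical_replication=1 is in effect
SHOW wal_level;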

Step 3: Set up Postgres Locally

  • Install the Postgres client locally

After signing up for Estuary Flow and creating your remote RDS instance, you’ll need to run a few queries to complete the database setup. To do that, you must first install a Postgres client locally, such as pgAdmin or psql.

  • Connect to the remote RDS instance through the Postgres client
  • Open the Postgres client and register a new server with connection details.

Blog Post Image

In the Connection tab, you specify your database: 

  1. Hostname 
  2. Port number
  3. Username
  4. Database name
  5. Password

Recall that these parameters were configured when you created your remote Amazon RDS database instance. Once all the parameters have been specified, your local Postgres client will connect successfully to the remote Postgres instance.
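
If you prefer the psql terminal to a GUI client, you can make the same connection from the command line. A minimal sketch, where the host value is a placeholder for the endpoint shown on your RDS console:

plaintext
psql -h <your-rds-endpoint>.rds.amazonaws.com -p 5432 -U estuary_username -d estuary_db

psql will prompt for the password you set when creating the RDS instance.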

Step 4: Create a table and populate it

  • Create a schema and table for the monitoring system

Now, let’s populate the database with some sample data.

Imagine you are building a monitoring system. For this to work, you need to monitor and reflect real-time changes in temperature values from various data sources. For this tutorial, the data used can be found here.

You can create a schema and table by executing the code block below in your Postgres query editor or psql terminal.

plaintext
create schema iot_schema;

create table iot_schema.iot_table(
    id varchar(100) PRIMARY KEY,
    room_id varchar(100),
    noted_date varchar(100),
    temp integer,
    "out/in" varchar(50)
);
  • Populate the table with sample data from a CSV file

To populate the table with data, execute the code below.

plaintext
\copy iot_schema.iot_table(id, room_id, noted_date, temp, "out/in") FROM '/path-to-file/IOT-temp.csv' with (format csv, header true, delimiter ',');
  • Confirm data insertion and count records

After executing this command, the data will be successfully loaded into your Postgres table. You can confirm by extracting the first ten records.

plaintext
select * from iot_schema.iot_table limit 10;

The output of this query is shown in the image below.

Blog Post Image

You can also get the total number of records in the table by executing the command:

plaintext
select count(*) from iot_schema.iot_table;

Blog Post Image

The output shows that there are 97,606 records in our table. Now, you want to stream all these records to your Snowflake destination for real-time analysis.

Step 5: Set up Snowflake

  • Create a Snowflake database and schema

If you are new to Snowflake, follow Snowflake’s guide to get started. On the Snowflake classic console, you can create a Snowflake database by running the command:

plaintext
create database estuary_snowflake_db;

You can also create a schema in Snowflake by running the command:

plaintext
use "ESTUARY_SNOWFLAKE_DB";
create schema estuary_snowflake_schema;

You can get your Snowflake account name by executing the command:

plaintext
SELECT current_account();
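
You will need this account identifier again when you configure the Host URL for the materialization, which typically takes the form <account_identifier>.snowflakecomputing.com, sometimes with a region segment. If you are unsure which region your account runs in, a quick check:

plaintext
-- returns the account locator and the cloud region it runs in
SELECT current_account(), current_region();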
  • Create a user for Estuary and grant necessary permissions

You also need to create a user for Estuary and grant the user data warehouse permissions. You can do that by executing the block of code below.

plaintext
-- assumes a warehouse named estuary_wh already exists; create one first if it does not
set database_name = 'ESTUARY_SNOWFLAKE_DB';
set estuary_schema = 'ESTUARY_SNOWFLAKE_SCHEMA';
set estuary_role = 'ESTUARY_ROLE';
set estuary_user = 'ESTUARY_USER';
set estuary_password = 'password';

create database if not exists identifier($database_name);
use database identifier($database_name);
create schema if not exists identifier($estuary_schema);

create role if not exists identifier($estuary_role);
grant role identifier($estuary_role) to role SYSADMIN;

create user if not exists identifier($estuary_user)
  password = $estuary_password
  default_role = $estuary_role;
grant role identifier($estuary_role) to user identifier($estuary_user);

grant all on schema estuary_snowflake_schema to role identifier($estuary_role);
grant USAGE on warehouse estuary_wh to role identifier($estuary_role);

-- grant Estuary access to the database
grant CREATE SCHEMA, MONITOR, USAGE on database estuary_snowflake_db to role identifier($estuary_role);

-- change role to ACCOUNTADMIN for STORAGE INTEGRATION support (only needed for Snowflake on GCP)
use role ACCOUNTADMIN;
grant CREATE INTEGRATION on account to role identifier($estuary_role);

use role identifier($estuary_role);
COMMIT;
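
Before moving on, you can optionally sanity-check the role and user created by the script. The names below assume the values set above (ESTUARY_ROLE and ESTUARY_USER):

plaintext
-- confirm the role received the expected privileges
SHOW GRANTS TO ROLE ESTUARY_ROLE;
-- confirm the user exists and has ESTUARY_ROLE as its default role
DESCRIBE USER ESTUARY_USER;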

Step 6: Create a Capture

In the previous section, you created an Amazon RDS instance and a Snowflake destination. The next step is to create a capture in Estuary.  

  • Grant permissions to Estuary Flow for data capture in Postgres

Before creating a capture, you need to grant Estuary Flow the necessary permissions. You can achieve this by running the code below in your Postgres query editor.

plaintext
CREATE USER flow_capture WITH PASSWORD 'password';
GRANT rds_replication TO flow_capture;
GRANT SELECT ON ALL TABLES IN SCHEMA iot_schema TO flow_capture;
GRANT ALL ON ALL TABLES IN SCHEMA iot_schema TO flow_capture;
ALTER DEFAULT PRIVILEGES IN SCHEMA iot_schema GRANT SELECT ON TABLES TO flow_capture;
GRANT USAGE ON SCHEMA iot_schema TO flow_capture;
CREATE TABLE IF NOT EXISTS public.flow_watermarks (slot TEXT PRIMARY KEY, watermark TEXT);
GRANT ALL PRIVILEGES ON TABLE public.flow_watermarks TO flow_capture;
GRANT ALL PRIVILEGES ON TABLE iot_schema.iot_table TO flow_capture;
CREATE PUBLICATION flow_publication FOR ALL TABLES;
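
If you want to confirm this setup before configuring the connector, you can inspect the publication you just created and, once the capture is running, the logical replication slot it uses. A minimal sketch:

plaintext
-- the flow_publication created above should be listed here
SELECT pubname, puballtables FROM pg_publication;
-- after the capture is running, an active logical replication slot should appear here
SELECT slot_name, plugin, active FROM pg_replication_slots;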

You can now create a capture by clicking on New Capture.

Blog Post Image

After clicking on New Capture, you then select your source. In this case, your source connector is Postgres. 

  • Create a capture in Estuary Flow, specifying source details

When you create a capture, specify your:

  1. Capture name
  2. Server address
  3. Username
  4. Database password
  5. Database name

These configurations were set when you created your Amazon RDS instance in the previous section.

When you click NEXT, you will see the schema and table you created in the previous section.

Blog Post Image

You can now save and publish the capture.

Data is being ingested from Postgres. Now, you need to connect it to Snowflake.

Step 7: Create a Materialization

  • To create a materialization, navigate to the Materializations tab in the Flow UI and click on New Materialization.
  • Specify Snowflake as the destination and configure connection details.

Blog Post Image

When you select your destination, you will have to configure it by specifying the:

  1. Host URL
  2. Snowflake database name
  3. Warehouse name
  4. Account name
  5. Password

Blog Post Image
  • Specify the data collection from Postgres as the source

Blog Post Image
  • Enable the materialization to start data streaming.

When you enable the materialization, you will notice that data gets streamed from your source to your destination. Let’s take a look at the number of records that landed in the destination.

Blog Post Image

From the image above, you can see that all 97,606 records were streamed from Postgres to Snowflake.
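
You can also confirm the count directly in Snowflake. The query below assumes the materialization created the table as IOT_TABLE inside estuary_snowflake_schema; adjust the name if you chose a different one for the binding:

plaintext
-- should match the 97,606 rows counted in Postgres earlier
SELECT COUNT(*) FROM ESTUARY_SNOWFLAKE_DB.ESTUARY_SNOWFLAKE_SCHEMA.IOT_TABLE;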

Step 8: Verify the results

In this section, you will append new data to your Postgres source and verify the results in your Snowflake destination.

  • Append new data to the Postgres source.

From your local Postgres client, you can run the command:

plaintext
insert into iot_schema.iot_table values ('__export__new_id_28282', 'Room admin', '2023-02-01', 67, 'IN');

This will insert a new record into your Postgres database. 

  • Verify data stream in the Snowflake destination

Blog Post Image

You can see the newly inserted record in your Snowflake destination, and Estuary Flow delivers it with minimal latency.
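
To check for the specific row rather than scanning the table, you can query for the key you just inserted. Again, this assumes the materialized table is named IOT_TABLE; adjust the table and column names to match what your materialization created:

plaintext
-- the freshly inserted record should appear shortly after the insert commits
SELECT * FROM ESTUARY_SNOWFLAKE_DB.ESTUARY_SNOWFLAKE_SCHEMA.IOT_TABLE WHERE id = '__export__new_id_28282';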

Looking to replicate your success with PostgreSQL to BigQuery? Check out our comprehensive guide on seamlessly moving data from PostgreSQL to BigQuery.

Conclusion

In this tutorial, you have learned about the importance of real-time data streaming. You have also learned how to stream data from PostgreSQL to Snowflake with minimal latency using Estuary Flow.

Questions about this tutorial? Reach out to us on Slack.
