Introduction
Data is a vital part of any organization. With the rise of data sources such as the Internet of Things (IoT), social media, and various transactional and analytical databases, many organizations want to harness this data explosion to make informed, data-driven decisions.
Today, we’ll focus on a popular source and destination: how to stream data from Postgres to Snowflake. This pairing of a relational database and a data warehouse is great for supporting transactional and analytical workloads, but it’s not always easy to sync them in real time.
Data streaming is the process of transmitting a continuous flow of data from a source to a destination. The source can be a transactional database, an IoT sensor, log files from an application, or a file storage system, among others. The data from these sources is continuously transmitted to a destination, which could be a data warehouse, a message broker, a cloud storage system, a real-time dashboard or monitoring tool, a machine learning pipeline, an analytical database, or something else.
Below, I include a tutorial on how to do this quickly and easily. But first, let’s cover the basics.
Why is Data Streaming Important?
Data streaming enables real-time data processing, allowing organizations to make fast, data-driven, and actionable decisions. It also helps companies make predictions and take proactive measures to prevent issues before they occur.
Data streaming has a lot of beneficial use cases. These include:
- Fraud detection: A bank or an e-commerce application needs to catch fraud and block a fraudulent transaction the moment it happens; detecting it three days later is of little use. With data streaming, fraudulent or suspicious transactions are blocked or flagged the moment they occur.
- Healthcare: With data streaming, you can monitor patients in real time and alert staff to any change in their condition. Wearable devices such as smartwatches can also monitor vital signs in real time and alert the wearer if something changes.
- Logistics: With data streaming, users can monitor their goods and shipments in real time, which helps them make informed decisions about routes, inventory management, and more. It also enables accurate estimated arrival times and boosts customer satisfaction.
From these examples, you can see that data streaming is essential in the modern era, spanning healthcare, finance, electronics, retail, construction, logistics, and many other industries.
Since data streaming is paramount for organizations, data engineers need streaming tools that deliver low latency and high throughput. In this article, you will learn how to stream data from Postgres to Snowflake using Estuary Flow.
What is Estuary Flow?
Speed is vital for organizations in every industry. You want your destination systems to receive data from your sources as soon as it appears.
Estuary Flow is a real-time data operations platform that lets you set up real-time pipelines with low latency and high throughput, without provisioning any infrastructure yourself.
Estuary Flow is highly scalable. Flow is a distributed system that scales with your data. It performs efficiently whether your upstream data inflow is 100MB/s or 5GB/s. You can also backfill huge volumes of data from your source systems in minutes.
Due to its scalability, low latency, and high throughput, Estuary Flow is trusted when it comes to real-time data streaming, data monitoring, and live reporting.
Getting Started with Estuary Flow to Stream Data From Postgres to Snowflake
To get started with Estuary Flow, create a free account in the web app.
You can sign up with either your GitHub or your Google account. After logging in, you will see a welcome page as shown in the image below.
Congratulations! You have successfully created an Estuary Flow account and can now set up real-time, low-latency data streaming.
Set up Amazon RDS
In this tutorial, Amazon RDS will be used to host your Postgres database in the cloud. There are quite a number of alternatives for hosting a database in the cloud as well, such as Google Cloud SQL, Azure Database for PostgreSQL, and Amazon Aurora.
To create an Amazon RDS instance, log in to your AWS account and search for Amazon RDS in the search bar.
You then go to Databases > Create database.
The next step is to configure your remote Postgres database.
As shown in the images above, configure a Postgres database instance remotely with:
- The username estuary_username.
- Public access enabled.
- Database name set to estuary_db.
With your configurations set, you then proceed to create the database. This may take some time because RDS needs to allocate compute resources.
After the database has been created, you can see the endpoint, which you will use to connect to this remote Postgres instance.
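If you prefer scripting over the console, the same instance can be created with the AWS CLI. This is a minimal sketch with hypothetical identifiers and sizing; adjust the instance class, storage, engine version, and networking to your needs.

```bash
aws rds create-db-instance \
  --db-instance-identifier estuary-postgres \
  --engine postgres \
  --db-instance-class db.t3.micro \
  --allocated-storage 20 \
  --master-username estuary_username \
  --master-user-password '<choose-a-strong-password>' \
  --db-name estuary_db \
  --publicly-accessible
```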
Create a Parameter Group and Enable Logical Replication
Next, you need to create a parameter group and associate it with your database. A parameter group is a set of configurations that you can set for your database.
Navigate to RDS > Parameter groups > Create parameter group. Name the parameter group estuary-parameter-group (RDS parameter group names may only contain letters, digits, and hyphens).
After creating the parameter group, you have to configure it. To use it with Flow, the only setting you need to change is enabling logical replication, which allows data streaming using change data capture.
Edit your parameter group and set rds.logical_replication=1.
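The same steps can be scripted with the AWS CLI. A sketch, assuming a PostgreSQL 15 instance (pick the parameter group family that matches your engine version):

```bash
# Create the parameter group (the family must match your Postgres major version)
aws rds create-db-parameter-group \
  --db-parameter-group-name estuary-parameter-group \
  --db-parameter-group-family postgres15 \
  --description "Enable logical replication for Estuary Flow"

# Turn on logical replication; as a static parameter, it only applies after a reboot
aws rds modify-db-parameter-group \
  --db-parameter-group-name estuary-parameter-group \
  --parameters "ParameterName=rds.logical_replication,ParameterValue=1,ApplyMethod=pending-reboot"
```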
Next, associate your parameter group with your database.
Go to RDS > Databases > Modify.
Under the Additional configuration setting, change the DB parameter group from default to estuary-parameter-group.
Reboot your database and continue.
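Once the reboot finishes and you can connect to the instance (connecting is covered in the next section), you can confirm that the change took effect:

```sql
SHOW wal_level;  -- should return 'logical' once the new parameter group is active
```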
Set up Postgres Locally
After signing up for Estuary Flow and creating your remote RDS instance, you’ll need to run a few queries to complete the database setup. To do that, first install a Postgres client (such as pgAdmin or psql) locally.
Now you can connect to your remote RDS instance. Open the Postgres client and register a new server.
In the Connection tab, you specify your database:
- Hostname
- Port number
- Username
- Database name
- Password
Recall that these parameters were configured when you created your remote Amazon RDS database instance. Once all the parameters have been specified, your local Postgres client will connect to the remote Postgres instance.
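If you prefer the command line over a GUI client, you can make the same connection with psql. A sketch, where the host is the endpoint shown on your RDS instance page (you will be prompted for the password):

```bash
psql -h <your-rds-endpoint> -p 5432 -U estuary_username -d estuary_db
```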
Create a table and populate it
Now, let’s populate the database with some sample data.
Imagine you are building a monitoring system. For this to work, you need to monitor and reflect real-time changes in temperature values from various data sources. For this tutorial, the data used can be found here.
You can create a schema and table by executing the code block below in your Postgres query editor or psql terminal.
```sql
create schema iot_schema;

create table iot_schema.iot_table(
    id varchar(100) PRIMARY KEY,
    room_id varchar(100),
    noted_date varchar(100),
    temp integer,
    "out/in" varchar(50)
);
```
To populate the table with data, execute the code below.
```sql
\copy iot_schema.iot_table(id, room_id, noted_date, temp, "out/in") FROM '/path-to-file/IOT-temp.csv' WITH (format csv, header true, delimiter ',');
```
After executing this command, the data will be successfully loaded into your Postgres table. You can confirm by extracting the first ten records.
```sql
select * from iot_schema.iot_table limit 10;
```
The output of this query is shown in the image below.
You can also get the total records in the table by executing the command:
```sql
select count(*) from iot_schema.iot_table;
```
The output shows that there are 97,606 records in our table. Now, you want to stream all these records to your Snowflake destination for real-time analysis.
Set up Snowflake
If you are new to Snowflake, follow Snowflake’s guide to get started. On the Snowflake classic console, you can create a Snowflake database by running the command:
```sql
create database estuary_snowflake_db;
```
You can also create a schema in Snowflake by running the command:
plaintextuse "ESTUARY_SNOWFLAKE_DB";
create schema estuary_snowflake_schema;
You can get your Snowflake account name by executing the command:
```sql
SELECT current_account();
```
You also need to create a user for Estuary and grant the user data warehouse permissions. You can do that by executing the block of code below.
```sql
-- variables for the database, schema, role, and user created above
set database_name = 'ESTUARY_SNOWFLAKE_DB';
set estuary_schema = 'ESTUARY_SNOWFLAKE_SCHEMA';
set estuary_role = 'ESTUARY_ROLE';
set estuary_user = 'ESTUARY_USER';
set estuary_password = 'password';
create database if not exists identifier($database_name);
use database identifier($database_name);
create schema if not exists identifier($estuary_schema);
create role if not exists identifier($estuary_role);
grant role identifier($estuary_role) to role SYSADMIN;
create user if not exists identifier($estuary_user)
  password = $estuary_password
  default_role = $estuary_role;
grant role identifier($estuary_role) to user identifier($estuary_user);
grant all on schema identifier($estuary_schema) to role identifier($estuary_role);
-- estuary_wh is assumed to be an existing warehouse; create one first if you don't have one
grant USAGE on warehouse estuary_wh to role identifier($estuary_role);
-- grant Estuary access to the database
grant CREATE SCHEMA, MONITOR, USAGE on database identifier($database_name) to role identifier($estuary_role);
-- change role to ACCOUNTADMIN for STORAGE INTEGRATION support (only needed for Snowflake on GCP)
use role ACCOUNTADMIN;
grant CREATE INTEGRATION on account to role identifier($estuary_role);
use role identifier($estuary_role);
COMMIT;
```
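As a quick sanity check that the script did what you expect, you can inspect the grants on the new role and user:

```sql
SHOW GRANTS TO ROLE ESTUARY_ROLE;
SHOW GRANTS TO USER ESTUARY_USER;
```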
Create a Capture
In the previous section, you created an Amazon RDS instance and a Snowflake destination. The next step is to create a capture in Estuary.
Before creating a capture, you need to grant Estuary Flow the necessary permissions. You can achieve this by running the code below in your Postgres query editor.
```sql
CREATE USER flow_capture WITH PASSWORD 'password';
GRANT rds_replication TO flow_capture;
GRANT SELECT ON ALL TABLES IN SCHEMA iot_schema TO flow_capture;
GRANT ALL ON ALL TABLES IN SCHEMA iot_schema TO flow_capture;
ALTER DEFAULT PRIVILEGES IN SCHEMA iot_schema GRANT SELECT ON TABLES TO flow_capture;
GRANT USAGE ON SCHEMA iot_schema TO flow_capture;
CREATE TABLE IF NOT EXISTS public.flow_watermarks (slot TEXT PRIMARY KEY, watermark TEXT);
GRANT ALL PRIVILEGES ON TABLE public.flow_watermarks TO flow_capture;
GRANT ALL PRIVILEGES ON TABLE iot_schema.iot_table TO flow_capture;
CREATE PUBLICATION flow_publication FOR ALL TABLES;
```
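Before heading back to the Flow web app, you can confirm that the publication covers your IoT table by querying the Postgres catalogs:

```sql
SELECT * FROM pg_publication_tables WHERE pubname = 'flow_publication';
```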
You can now create a capture by clicking on New Capture.
After clicking on New Capture, you then select your source. In this case, your source connector is Postgres.
When you create a capture, specify your:
- Capture name
- Server address
- Username
- Database password
- Database name
These configurations were set when you created your Amazon RDS instance in the previous section.
When you click NEXT, you will see the schema and tables you created in the previous section.
You can now save and publish the capture.
Data is being ingested from Postgres. Now, you need to connect it to Snowflake.
Create a Materialization
To create a materialization, navigate to the Materializations tab in the Flow UI and click on New Materialization.
You then specify your destination. The destination in this case is Snowflake Data Cloud.
When you select your destination, you will have to configure it by specifying the:
- Host URL
- Snowflake database name
- Warehouse name
- Account name
- Password
You will also have to specify the collection that represents the data captured from Postgres.
When you enable the materialization, you will notice that data gets streamed from your source to your destination. Let’s take a look at the number of records that landed in the destination.
From the image above, you notice that all the data got streamed from Postgres to Snowflake.
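You can also confirm this with a query in Snowflake. Assuming the materialization created a table named IOT_TABLE in the schema you configured, a count like the following should match the 97,606 records in Postgres:

```sql
SELECT COUNT(*) FROM ESTUARY_SNOWFLAKE_DB.ESTUARY_SNOWFLAKE_SCHEMA.IOT_TABLE;
```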
Verify the results
In this section, you will append new data to your Postgres source and verify the results in your Snowflake destination.
From your local Postgres client, run the command:
```sql
insert into iot_schema.iot_table
values ('__export__new_id_28282', 'Room admin', '2023-02-01', 67, 'IN');
```
This will insert a new record into your Postgres database. You can verify this stream in your Snowflake data warehouse by executing the command shown below.
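For example, a lookup by the new key (again assuming the materialized table is named IOT_TABLE; adjust identifier quoting to match how the connector created the columns):

```sql
SELECT *
FROM ESTUARY_SNOWFLAKE_DB.ESTUARY_SNOWFLAKE_SCHEMA.IOT_TABLE
WHERE id = '__export__new_id_28282';
```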
You can see the newly inserted record in your Snowflake destination, and you can see that Estuary Flow delivers it with very low latency.
Conclusion
In this tutorial, you have learned about the importance of real-time data streaming. You have also learned how to stream data from your source to your destination with minimal latency using Estuary Flow.
Questions about this tutorial? Reach out to us on Slack.