Estuary

Building Real-Time ELT Pipelines with Estuary and Coalesce

Stream PostgreSQL CDC data into Snowflake with Estuary and transform it with Coalesce. Learn how to build a real time ELT pipeline with staging, dynamic tables, and Cortex AI.


Key Takeaways

  • Stream PostgreSQL CDC into Snowflake in near real time without scripts, schedulers, or batch jobs.

  • Use Estuary to power dependable right-time data movement with predictable performance and reliability.

  • Transform streaming data at scale with Coalesce using Dynamic Tables, incremental processing, and persistent staging.

  • Apply Snowflake Cortex AI directly in ELT pipelines for real-time sentiment analysis and advanced analytics.

Introduction

Relational databases such as PostgreSQL excel at OLTP workloads but often struggle with large-scale analytical workloads. In contrast, cloud-based data warehouses such as Snowflake are built for complex analytics, offering high performance and scalability. However, moving real-time or CDC data from PostgreSQL to Snowflake is often challenging, especially when relying on manual ETL scripts, batch jobs, or tools that introduce latency and operational overhead.

Estuary, the right-time data platform, simplifies this process by enabling continuous and dependable CDC streaming from PostgreSQL to Snowflake with minimal setup. Data teams can choose when data moves, whether sub-second, near real time, or batch, while maintaining predictable reliability and unified pipeline operations.

Once raw data lands in Snowflake, transforming it into high-quality, analytics-ready datasets can be time-consuming. Many existing data transformation tools are expensive and difficult to scale, which often limits adoption. Without standardization and automation, environments become fragmented, manual deployments are difficult to maintain, and data lineage is hard to trace.

Coalesce addresses these challenges with a fully managed SaaS data transformation platform that combines low-code development, version control, governance, and automation—giving teams granular visibility into costs and performance down to the query level.

In this article, we walk through how to build a real time ELT pipeline by streaming simulated ecommerce CDC data from PostgreSQL into Snowflake using Estuary, and then use Coalesce to transform that data, build Dynamic Tables, and integrate Snowflake Cortex ML for real-time sentiment analysis—all within a modern, cloud-native ELT workflow.

Architecture Overview

connecting PostgreSQL, Estuary, Snowflake, and Coalesce.png

This pipeline continuously streams data from PostgreSQL, lands it in Snowflake in near real time, and transforms it into analytics-ready tables using Coalesce. Estuary handles right time data movement by continuously capturing CDC events from PostgreSQL and delivering them into Snowflake as they occur. Once the raw data is available, Coalesce applies standardized transformations, joins, dynamic refresh logic, incremental processing, and optional AI enhancements through Snowflake Cortex.

The result is a unified, cloud native workflow that keeps data fresh, reliable, and analytics ready with minimal operational effort.

This pipeline streams PostgreSQL CDC directly into Snowflake in near real time without batch jobs or schedulers. Estuary handles dependable right-time ingestion, while Coalesce manages transformations using Dynamic Tables and incremental processing to keep data fresh, scalable, and analytics ready.

Pre-requisites

Before building the real time ELT pipeline, make sure you have the following tools and accounts set up:

  1. PostgreSQL: A running PostgreSQL database. This walkthrough uses a locally hosted instance.
  2. Estuary Account: Sign up for a 30-day free trial.
  3. OpenAI Account (optional): The project uses OpenAI’s API (GPT-4 Mini) to simulate product reviews. Without an API key, the data generator falls back to static review text.
  4. GitHub Repository: Clone or refer to this repository to see how we simulate the Postgres CDC data using a Python script.
  5. Snowflake Account: Required as the cloud data warehouse where raw CDC data lands and is transformed.
  6. Coalesce Account: Sign up for a 2-week free trial.

These prerequisites ensure you have everything needed to simulate CDC events, stream them into Snowflake with Estuary, and build transformations in Coalesce.

On Coalesce

Coalesce is a modern, cloud-native data transformation and governance platform designed to simplify and accelerate the “T” in ELT through low-code development, automation, and AI-assisted workflows. While it integrates with major cloud warehouses such as Snowflake, Databricks, Redshift, and BigQuery, this article focuses on its deep integration with Snowflake for scalable, AI-driven analytics.

Key Features of Coalesce:

  • Low-Code, Visual Interface: Build and manage data transformations through an intuitive drag-and-drop UI that automatically generates clean, performant SQL behind the scenes.
  • Reusable Templates & Version Control: Standardize ELT development with modular templates, reusable logic, and built-in version tracking for consistent, governed workflows.
  • AI-Assisted Development: Use AI translators to convert mappings and legacy logic into maintainable, cloud-native SQL patterns, reducing manual coding and refactoring time.
  • Data Governance Built-In: Enforce naming conventions, schema standards, and data contracts as you build, ensuring compliance and consistency across teams.
  • Automated Testing & Validation: Auto-generate tests, compare outputs, and validate transformations to improve reliability and speed up deployment cycles.
  • AI-Powered Documentation with Coalesce Copilot: Automatically generate lineage diagrams, metadata, and documentation to improve data transparency and quality.

Why Coalesce works well with Snowflake:

  • Agile Snowflake Data Modeling: Build and deploy complex Snowflake data models rapidly using Coalesce’s pre-built Node types (Work, Dimension, Fact, View) and Marketplace templates.
  • Native Snowflake Integration: Automatically generate Snowflake-specific SQL, code templates, and transformation logic optimized for Snowflake’s architecture.
  • Snowflake Cortex Compatibility: Seamlessly integrate with Snowflake Cortex to operationalize AI and ML workloads, including time-series forecasting, anomaly detection, and root-cause analysis.
  • Dynamic Tables, Streams & Tasks Automation: Manage and automate advanced Snowflake features such as Dynamic Tables, Streams, Tasks, and Iceberg Tables directly within Coalesce.

Step-by-Step Implementation

Step 1 – Set up Postgres Data

a. Create and Insert Product Data

Add the following schema definitions to your PostgreSQL environment: 

plaintext
-- Products table and 50 seed rows.
-- The table is created in the retail schema so that the foreign keys
-- in the order_detail and reviews tables (retail.products) resolve.
CREATE SCHEMA IF NOT EXISTS retail;

CREATE TABLE IF NOT EXISTS retail.products (
  product_id SERIAL PRIMARY KEY,
  sku VARCHAR(32) UNIQUE NOT NULL,
  name VARCHAR(120) NOT NULL,
  category VARCHAR(60) NOT NULL,
  gender VARCHAR(12) NOT NULL,
  color VARCHAR(40) NOT NULL,
  size_label VARCHAR(12) NOT NULL,
  price NUMERIC(10,2) NOT NULL CHECK (price >= 0),
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

INSERT INTO retail.products (sku, name, category, gender, color, size_label, price) VALUES
  ('TSH-0001','Classic Cotton T-Shirt','Tops','Unisex','White','M',19.99),
  ('TSH-0002','Classic Cotton T-Shirt','Tops','Unisex','Black','L',19.99),
  ('TSH-0003','Classic Cotton T-Shirt','Tops','Unisex','Navy','S',19.99),
  ('TSH-0004','Graphic Tee','Tops','Unisex','Grey','M',24.99),
  ('TSH-0005','Oversized Tee','Tops','Unisex','Beige','L',24.99),
  ('BLU-0006','Linen Blouse','Tops','Women','White','M',39.99),
  ('SWP-0007','Fleece Sweatshirt','Tops','Unisex','Olive','M',49.99),
  ('CRD-0008','Knit Cardigan','Tops','Women','Navy','S',59.99),
  ('PLS-0009','Polo Shirt','Tops','Men','Blue','L',34.99),
  ('TNK-0010','Ribbed Tank Top','Tops','Women','Black','M',14.99),
  ('JNS-0011','Slim Fit Jeans','Bottoms','Men','Blue','32',49.99),
  ('JNS-0012','High-Rise Jeans','Bottoms','Women','Black','28',59.99),
  ('CHN-0013','Chino Pants','Bottoms','Men','Beige','34',44.99),
  ('SKT-0014','A-Line Skirt','Bottoms','Women','Navy','M',39.99),
  ('SRT-0015','Casual Shorts','Bottoms','Unisex','Khaki','M',29.99),
  ('DRS-0016','Floral Summer Dress','Dresses','Women','Red','M',69.99),
  ('DRS-0017','Wrap Midi Dress','Dresses','Women','Green','S',79.99),
  ('DRS-0018','Slip Dress','Dresses','Women','Black','M',74.99),
  ('DRS-0019','Shirt Dress','Dresses','Women','Blue','L',64.99),
  ('JMP-0020','Denim Jumpsuit','Dresses','Women','Blue','M',89.99),
  ('JKT-0021','Denim Jacket','Outerwear','Unisex','Blue','L',79.99),
  ('COA-0022','Light Puffer Jacket','Outerwear','Unisex','Black','M',99.99),
  ('TRN-0023','Trench Coat','Outerwear','Women','Beige','M',129.99),
  ('BLZ-0024','Tailored Blazer','Outerwear','Men','Navy','L',119.99),
  ('RNC-0025','Raincoat','Outerwear','Unisex','Olive','M',89.99),
  ('SNK-0026','Running Sneakers','Footwear','Unisex','Black','9',89.99),
  ('SNK-0027','Running Sneakers','Footwear','Unisex','White','8',89.99),
  ('LOF-0028','Leather Loafers','Footwear','Men','Brown','10',129.99),
  ('BTN-0029','Chelsea Boots','Footwear','Unisex','Black','9',149.99),
  ('SDL-0030','Strappy Sandals','Footwear','Women','Beige','7',69.99),
  ('BLT-0031','Leather Belt','Accessories','Unisex','Brown','M',24.99),
  ('SCF-0032','Wool Scarf','Accessories','Unisex','Grey','L',29.99),
  ('CAP-0033','Cotton Cap','Accessories','Unisex','Navy','M',19.99),
  ('TTE-0034','Canvas Tote Bag','Accessories','Unisex','Natural','L',24.99),
  ('BNI-0035','Knit Beanie','Accessories','Unisex','Black','M',14.99),
  ('HOD-0036','Fleece Hoodie','Tops','Unisex','Charcoal','L',49.99),
  ('RGN-0037','Raglan Tee','Tops','Men','White','M',22.99),
  ('BTN-0038','Button-Down Shirt','Tops','Men','Light Blue','L',44.99),
  ('BLS-0039','Silk Blend Blouse','Tops','Women','Ivory','M',64.99),
  ('CRO-0040','Cropped Tee','Tops','Women','Pink','S',21.99),
  ('LGS-0041','High-Waist Leggings','Bottoms','Women','Black','M',39.99),
  ('TRK-0042','Track Pants','Bottoms','Unisex','Grey','M',34.99),
  ('SHR-0043','Biker Shorts','Bottoms','Women','Black','S',24.99),
  ('CAR-0044','Cargo Pants','Bottoms','Men','Olive','34',59.99),
  ('WDE-0045','Wide-Leg Trousers','Bottoms','Women','Beige','M',69.99),
  ('CRD-0046','Corduroy Jacket','Outerwear','Men','Tan','L',99.99),
  ('PRK-0047','Parka Jacket','Outerwear','Unisex','Navy','M',129.99),
  ('GIL-0048','Lightweight Gilet','Outerwear','Unisex','Black','M',69.99),
  ('HIK-0049','Hiking Boots','Footwear','Unisex','Brown','10',139.99),
  ('SLP-0050','Slip-On Sneakers','Footwear','Unisex','White','9',74.99);

b. Define Customers, Orders, and Reviews Schema in Postgres 

plaintext
-- =========================================
-- Customers table
-- =========================================
CREATE TABLE IF NOT EXISTS retail.customers (
  customer_id INT PRIMARY KEY,
  first_name VARCHAR(100),
  last_name VARCHAR(100),
  email VARCHAR(255) UNIQUE NOT NULL,
  phone_number VARCHAR(50),
  date_of_birth DATE,
  address VARCHAR(255),
  city VARCHAR(100),
  state VARCHAR(50),
  postal_code VARCHAR(20),
  country VARCHAR(50) DEFAULT 'USA',
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- =========================================
-- Orders table
-- =========================================
CREATE TABLE IF NOT EXISTS retail.orders (
  order_id INT PRIMARY KEY,
  order_detail_id INT,
  customer_id INT REFERENCES retail.customers(customer_id),
  total_amount NUMERIC(12,2),
  order_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  status VARCHAR(50),
  payment_method VARCHAR(50),
  shipping_address VARCHAR(255),
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- =========================================
-- Order Detail table
-- =========================================
CREATE TABLE IF NOT EXISTS retail.order_detail (
  order_detail_id SERIAL PRIMARY KEY,
  order_id INT REFERENCES retail.orders(order_id) ON DELETE CASCADE,
  product_id INT REFERENCES retail.products(product_id),
  name VARCHAR(255),
  quantity INT CHECK (quantity > 0),
  price NUMERIC(10,2) CHECK (price >= 0),
  discount_amount NUMERIC(10,2) DEFAULT 0 CHECK (discount_amount >= 0),
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- =========================================
-- Reviews table
-- =========================================
CREATE TABLE IF NOT EXISTS retail.reviews (
  review_id SERIAL PRIMARY KEY,
  user_id INT REFERENCES retail.customers(customer_id),
  product_id INT REFERENCES retail.products(product_id),
  rating INT CHECK (rating BETWEEN 1 AND 5),
  review_text TEXT,
  review_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

c. Simulate Orders and Reviews Data with Python Script

Clone this repository and run it to continuously generate synthetic data for your e-commerce system. The script uses the Faker library to create customers, orders, and reviews, and integrates with OpenAI’s API to generate realistic review text.

Once running, the script randomly inserts new customers, orders, and product reviews into your PostgreSQL database every few seconds, simulating a live transactional system. The generate_review_text() function uses GPT-4 Mini to create short, natural-sounding reviews — approximately 80% positive and 20% negative — to reflect realistic user sentiment patterns.

This data will later be captured in real time by Estuary.
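As a rough illustration of the 80/20 sentiment split described above, the sketch below builds review rows shaped like the retail.reviews table using only the standard library. It is not the repository's code: the `make_review` helper, the canned phrases, and the rating ranges are assumptions made for this example, and the real script obtains its review text from the OpenAI API.

```python
import random
from datetime import datetime, timezone

# Hypothetical canned phrases; the real script asks an OpenAI model for the text.
POSITIVE = ["Great fit and quality.", "Exactly as described, love it."]
NEGATIVE = ["Runs small and the fabric feels cheap.", "Arrived late and the color was off."]


def make_review(rng, customer_ids, product_ids, positive_share=0.8):
    """Build one synthetic review row shaped like retail.reviews."""
    is_positive = rng.random() < positive_share
    return {
        "user_id": rng.choice(customer_ids),
        "product_id": rng.choice(product_ids),
        # Assumed mapping: positive reviews get 4-5 stars, negative ones 1-2.
        "rating": rng.randint(4, 5) if is_positive else rng.randint(1, 2),
        "review_text": rng.choice(POSITIVE if is_positive else NEGATIVE),
        "review_time": datetime.now(timezone.utc),
    }


# Seeded generator so the sample is reproducible.
rng = random.Random(42)
reviews = [make_review(rng, [1, 2, 3], list(range(1, 51))) for _ in range(1000)]
positive_ratio = sum(r["rating"] >= 4 for r in reviews) / len(reviews)
print(f"positive share: {positive_ratio:.2f}")
```

Over a large sample the positive share settles near 0.8, which gives the later sentiment step a realistic mix of scores.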

Output of datagen.py file:

Terminal output showing Postgres data generation.png

a. Install Dependencies

plaintext
pip install -r requirements.txt

b. Configure Environment Variables

Create a .env file and add the following:

plaintext
# .env
# --- PostgreSQL Connection ---
PG_DSN=postgresql://<username>:<password>@<host>:<port>/<database>
# --- OpenAI API ---
OPENAI_API_KEY=sk-your-openai-api-key-here

  • PG_DSN: This is the connection string that the script uses to connect to your Postgres instance.
  • OPENAI_API_KEY: Optional. Enables realistic product reviews via GPT models. If you don’t include this, the script will fall back to a static text generator.
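To make the two settings concrete, here is a minimal sketch of how a script could read them. The `parse_env` and `describe_dsn` helpers and the sample credentials are hypothetical and exist only for this illustration; the repository may load its configuration differently.

```python
from urllib.parse import urlsplit


def parse_env(text):
    """Parse KEY=VALUE lines, skipping blank lines and # comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def describe_dsn(dsn):
    """Split a postgresql:// connection string into its parts."""
    parts = urlsplit(dsn)
    return {
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/"),
    }


# Placeholder values for illustration only.
sample = """
# --- PostgreSQL Connection ---
PG_DSN=postgresql://demo_user:demo_pass@localhost:5432/retail_db
# --- OpenAI API ---
OPENAI_API_KEY=
"""

env = parse_env(sample)
dsn_info = describe_dsn(env["PG_DSN"])
# An empty or missing key means the static text generator would be used.
use_openai = bool(env.get("OPENAI_API_KEY"))
print(dsn_info, use_openai)
```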

c. Configure PostgreSQL for CDC

In PostgreSQL, enable change data capture so Estuary can read new inserts and updates:

plaintext
-- init.sql
-- Connect to your instance and create a new user and password:
CREATE USER flow_capture WITH PASSWORD 'secret' REPLICATION;

-- Assign the appropriate role.
GRANT pg_read_all_data TO flow_capture;

-- Create the watermarks table, grant privileges, and create publication:
CREATE TABLE IF NOT EXISTS public.flow_watermarks (slot TEXT PRIMARY KEY, watermark TEXT);
GRANT ALL PRIVILEGES ON TABLE public.flow_watermarks TO flow_capture;
CREATE PUBLICATION flow_publication;
ALTER PUBLICATION flow_publication SET (publish_via_partition_root = true);
ALTER PUBLICATION flow_publication ADD TABLE public.flow_watermarks, <other_tables>;

-- Set WAL level to logical (takes effect after a PostgreSQL restart):
ALTER SYSTEM SET wal_level = logical;
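The `<other_tables>` placeholder stands for the tables you want captured. As a small sketch, the statement for this walkthrough's tables could be assembled as follows. The `publication_sql` helper is hypothetical, and it assumes the tables live in the retail schema, as in the DDL from Step 1.

```python
import re

# Accept only plain lowercase identifiers so the generated SQL stays predictable.
IDENTIFIER = re.compile(r"^[a-z_][a-z0-9_]*$")


def publication_sql(tables, schema="retail", publication="flow_publication"):
    """Build the ALTER PUBLICATION statement for the given tables."""
    names = ["public.flow_watermarks"]
    for table in tables:
        if not IDENTIFIER.match(table):
            raise ValueError(f"unexpected table name: {table!r}")
        names.append(f"{schema}.{table}")
    return f"ALTER PUBLICATION {publication} ADD TABLE {', '.join(names)};"


statement = publication_sql(
    ["customers", "orders", "order_detail", "products", "reviews"]
)
print(statement)
```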

d. Expose the database to the internet via ngrok 

If you are running PostgreSQL locally, use ngrok to create a secure tunnel that Estuary can connect to:

plaintext
ngrok tcp 5432
Ngrok session displaying generated TCP forwarding address for PostgreSQL

Copy the forwarding address (e.g., tcp://0.tcp.ngrok.io:12345)—you’ll use this in the next step.
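The forwarding address carries both the host and the port that the capture form asks for. A minimal sketch of splitting it, using the example address above (the `split_forwarding_address` helper is hypothetical):

```python
from urllib.parse import urlsplit


def split_forwarding_address(address):
    """Return (host, port) from an address such as tcp://0.tcp.ngrok.io:12345."""
    parts = urlsplit(address)
    if parts.scheme != "tcp" or parts.hostname is None or parts.port is None:
        raise ValueError(f"not a tcp forwarding address: {address!r}")
    return parts.hostname, parts.port


host, port = split_forwarding_address("tcp://0.tcp.ngrok.io:12345")
print(host, port)
```

Note that the port is the one ngrok assigns, not PostgreSQL's local 5432.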

Step 2 – Set Up Estuary Capture

  1. Log in to Estuary and create a New Capture
  2. Select PostgreSQL as the source.
  3. Enter your connection details:
    1. Host: ngrok address or public endpoint
    2. Database name
    3. Username and password
    4. Port: 5432, or the port from the ngrok forwarding address if you are tunneling a local database
PostgreSQL capture configuration in Estuary with connection details.png
  4. Choose the desired tables (orders, order_detail, customers, products, reviews) for capture.
  5. Save and start the capture to begin streaming data into your Estuary collections.

Your data is now continuously replicated in near real-time from PostgreSQL into Estuary’s managed pipeline.

Capture Details:

Estuary capture dashboard for PostgreSQL with usage and connector details.png

The captured data appears in your collections:

Captured retail collections and document metrics in Estuary.png

Step 3 – Materialize Data to Snowflake

  1. In the Estuary Dashboard, go to Destinations and Create Materializations. Choose Snowflake as the destination connector.
Estuary materialization creation screen with Snowflake connector selected
  2. Add your credentials to connect to Snowflake.
Estuary materialization settings for Snowflake connection and warehouse.png
  3. Select the Estuary collections you created earlier as the source datasets. Choose Delta Updates to enable Snowpipe Streaming to load data into Snowflake in near real time. This ingestion method writes rows directly into Snowflake tables and automatically scales compute resources based on the data load.
Snowflake materialization configuration in Estuary with delta updates enabled.png
  4. Test the connection and click Deploy. Estuary will now continuously materialize PostgreSQL changes into Snowflake, maintaining CDC consistency.
Snowflake materialization status in the Estuary Destinations dashboard.png


Step 4 – Connect Snowflake to Coalesce

Once your raw data resides in Snowflake, the next step is to connect it to Coalesce for transformation.

  1. Log in to your Snowflake account and copy your Account/Server URL. Example: If your URL is ABC123-1234567.snowflakecomputing.com, use ABC123-1234567 as the server ID.
Coalesce workspace setup screen prompting for Snowflake account URL
  2. In Coalesce, navigate to Build Settings → Environments or Workspaces.
  3. Click Edit on your target environment and choose Authentication Type: Username & Password.
Snowflake connection credentials configured in the Estuary workspace.png
  4. Enter your Snowflake username and password credentials.
  5. Save the configuration and test the connection to verify integration.

Step 5 – Set Up Storage Locations and Mapping

In Coalesce, Storage Locations define where database objects (tables, views, or stages) are stored and how they map to physical schemas in Snowflake.

  1. Open Build Settings → Storage Locations.
  2. Configure logical mappings as follows:
    • Source Database: ESTUARY_DEMO_DB
    • Source Schema: RETAIL
    • Target Database: ESTUARY_DEMO_DB
    • Target Schema: RETAIL_STG (staging layer)
  3. Apply and save these mappings.
  4. Once configured, add the source tables (from Estuary-streamed collections) to your SQL Pipeline within Coalesce.
Coalesce storage mapping for Snowflake schemas.png
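One way to picture the mapping is as a lookup from a logical location name to a physical database and schema. The sketch below is only a mental model, not Coalesce's internal representation. The location name `SRC` is an assumption for this example, while `TARGET` matches the storage location used later in the incremental load step.

```python
# Logical storage locations mapped to (database, schema) pairs from the list above.
STORAGE_LOCATIONS = {
    "SRC": ("ESTUARY_DEMO_DB", "RETAIL"),         # raw tables landed by Estuary
    "TARGET": ("ESTUARY_DEMO_DB", "RETAIL_STG"),  # staging layer built in Coalesce
}


def qualified_name(location, table):
    """Resolve a logical location and table name to a fully qualified name."""
    database, schema = STORAGE_LOCATIONS[location]
    return f"{database}.{schema}.{table}"


source_table = qualified_name("SRC", "PRODUCTS")
staging_table = qualified_name("TARGET", "STG_PRODUCTS")
print(source_table, "->", staging_table)
```

Because nodes refer to the logical name, pointing an environment at a different schema only requires changing the mapping, not the transformation logic.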

Next, add sources to SQL Pipeline:

Source table selection dialog in Coalesce for the SQL pipeline.png

You’re now ready to begin data modeling and transformations.

Step 6 – Perform Transformations in Coalesce

With the sources in place, the next stage is transforming the raw data into analytics-ready models in Coalesce. This section outlines the end-to-end steps and shows how each transformation technique (Dynamic Tables, Incremental Processing, Persistent Staging, and Cortex AI) contributes to building a modern, scalable ELT workflow.

Stage Table Transformation

Staging tables help prepare and clean raw data before further transformation or modeling. They are critical for maintaining standardized logic and enforcing data quality early in the pipeline.

Steps:

  1. In the Coalesce interface, navigate to your imported source tables (e.g., PRODUCTS).
  2. Right-click the PRODUCTS node and select “Add Node → Stage.”
  3. Open the SQL Editor for STG_PRODUCTS.
  4. Apply transformations such as trimming empty categories and replacing nulls:
plaintext
COALESCE(NULLIF(TRIM("PRODUCTS"."CATEGORY"), ''), 'General')
Coalesce transformation for standardizing category values.png
  5. Click Create and Run to apply the changes and materialize the staging table.
STG_PRODUCTS staging transformation in Coalesce.png
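The expression in step 4 reads from the inside out: TRIM strips surrounding spaces, NULLIF turns an empty result into NULL, and COALESCE substitutes 'General' for NULL. A minimal Python sketch of the same logic, with a hypothetical `normalize_category` helper, shows the outcome for typical inputs:

```python
def normalize_category(value, default="General"):
    """Mirror COALESCE(NULLIF(TRIM(value), ''), 'General')."""
    if value is None:              # NULL stays NULL through TRIM and NULLIF
        return default
    trimmed = value.strip(" ")     # TRIM: remove leading and trailing spaces
    return trimmed if trimmed else default  # NULLIF(..., '') then COALESCE


examples = [normalize_category(v) for v in ["Tops", "  Footwear ", "", "   ", None]]
print(examples)
```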

Dynamic Tables for Real-Time Data 

Dynamic Tables in Snowflake automate data refreshes without manual scheduling, enabling real-time orchestration of pipelines. Coalesce supports this natively through its Dynamic Table Package, making it ideal for CDC pipelines that continuously process new records. 

Steps: 

  1. Install the Dynamic Tables Package from the Coalesce Marketplace (@coalesce/dynamic-tables): on the Build page, click Build Settings, select Packages, then click Install.
Coalesce package installation dialog for adding the dynamic tables package
  2. In the package configuration, specify your Snowflake warehouse:
{
  "targetDynamicTableWarehouse": "COMPUTE_WH"
}
  3. Select the ORDERS and ORDER_DETAIL source nodes (hold Shift to select both), then right-click → Join Nodes → Dynamic Table → Dynamic Table Work.
  4. In the Dynamic Table Options panel, set:
    • Warehouse: COMPUTE_WH
    • Lag Specification: 5 minutes
    • Refresh Mode: AUTO
    • Initialize: ON_SCHEDULE
  5. Run the node to create continuously updating tables in Snowflake.
Staging and work node pipeline for orders data in Coalesce
  6. Join the Dynamic Table nodes by selecting DT_WRK_ORDERS and DT_WRK_ORDER_DETAIL, then right-click and select Join Nodes → Dynamic Tables → Dynamic Table Work.
Coalesce join logic connecting order detail and order tables.png
Staging, dynamic table, and master order pipeline in Coalesce.png

The visual pipeline shows ORDERS and ORDER_DETAIL nodes connected to a Dynamic Table Work node, representing continuous joins.

In Snowflake, you can view the Dynamic Table graph, which shows each table's refresh status and the 5-minute target lag:

Snowflake dynamic table graph with upstream refresh status.png
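The options set in the Dynamic Table Options panel correspond to parameters of Snowflake's CREATE DYNAMIC TABLE statement. The sketch below renders an approximation of such a statement for the joined orders table. It is not the SQL Coalesce actually generates: the `dynamic_table_ddl` helper and the selected columns are assumptions for illustration, with the join key taken from the order_id foreign key in the Postgres schema.

```python
def dynamic_table_ddl(name, warehouse, target_lag, select_sql,
                      refresh_mode="AUTO", initialize="ON_SCHEDULE"):
    """Render a CREATE DYNAMIC TABLE statement from the panel options."""
    return (
        f"CREATE OR REPLACE DYNAMIC TABLE {name}\n"
        f"  TARGET_LAG = '{target_lag}'\n"
        f"  WAREHOUSE = {warehouse}\n"
        f"  REFRESH_MODE = {refresh_mode}\n"
        f"  INITIALIZE = {initialize}\n"
        f"AS\n{select_sql};"
    )


# Assumed column selection; adjust to the columns your nodes expose.
join_sql = (
    "SELECT o.ORDER_ID, o.CUSTOMER_ID, o.STATUS,\n"
    "       d.ORDER_DETAIL_ID, d.PRODUCT_ID, d.QUANTITY, d.PRICE\n"
    "FROM DT_WRK_ORDERS o\n"
    "JOIN DT_WRK_ORDER_DETAIL d ON d.ORDER_ID = o.ORDER_ID"
)

ddl = dynamic_table_ddl("DT_WRK_ORDER_MASTER", "COMPUTE_WH", "5 minutes", join_sql)
print(ddl)
```

The target lag is an upper bound on staleness that Snowflake works to honor, which is why no task or schedule appears in the statement.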

Incremental Processing in Coalesce

When data updates in real time, transactional tables such as orders can grow very large, and reprocessing the entire table on every update becomes impractical. In these situations, you can use incremental data loading. This section shows how to incrementally process the data from your Dynamic Table pipeline.

Incremental nodes optimize performance by processing only new or changed data, rather than reprocessing entire datasets each time. This is essential for CDC workloads and ensures timely updates with minimal resource usage. As with Dynamic Tables, you need to install a package from the Coalesce Marketplace for incremental processing.

Steps:

  1. Right-click your Dynamic Table (e.g., DT_WRK_ORDER_MASTER) → Add Node → Incremental → Incremental Load.
Coalesce pipeline graph menu for adding, copying, and configuring nodes.png
  2. Add ORDER_ID and ORDER_DETAIL_ID as the business key.
Business key and test settings configured in a Coalesce table node.png
  3. Set a persistent table location:
Incremental load configuration in Coalesce using FLOW_PUBLISHED_AT.png
    • Storage Location: TARGET
    • Table Name: PSTG_ORDER_MASTER
    • Incremental Column: FLOW_PUBLISHED_AT
  4. Open the configuration options of the node on the right side of the screen and toggle on Filter data based on the Persistent table. This lets you configure the node to incrementally filter and process data.
Coalesce incremental load settings using a persistent table for orders.png
  5. Open the SQL Editor and click Generate Join to auto-create SQL logic for incremental loads.
Order data filtered incrementally in Coalesce using FLOW_PUBLISHED_AT.png
  6. Select Copy to Editor to copy the code into the SQL editor, where you can reconfigure the node's dependency and supply its incremental logic.
  7. Create a table to persist the data from the incremental node: right-click the INC_ORDER_MASTER node in the Browser and select Add Node → Persistent Stage.

(The Persistent Stage Node stores the historical results of incremental runs, ensuring that your downstream models always have access to a complete dataset. This is particularly useful for auditability, debugging, and long-term trend analysis.)

  8. Assign the Persistent Stage to the same target schema as your incremental node (e.g., RETAIL_STG).
Persistent staging configuration in Coalesce for PSTG_ORDERS_MASTER.png
  9. Run the node to materialize historical data in Snowflake.
End-to-end Coalesce data model with staging, fact, and dimension layers.png

You've now configured a pipeline of order data updating every 5 minutes.
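Conceptually, the incremental node and the persistent table implement a high-water-mark pattern: read the latest FLOW_PUBLISHED_AT already persisted, keep only newer source rows, and upsert them by business key. The sketch below models that pattern in memory. It is a simplification rather than the SQL Coalesce generates, and the `incremental_load` helper and sample rows are hypothetical.

```python
from datetime import datetime


def incremental_load(persistent, source,
                     key_columns=("ORDER_ID", "ORDER_DETAIL_ID"),
                     incremental_column="FLOW_PUBLISHED_AT"):
    """Upsert rows newer than the persisted high-water mark; return rows processed."""
    high_water_mark = max(
        (row[incremental_column] for row in persistent.values()), default=None
    )
    processed = 0
    for row in source:
        # Skip rows already covered by an earlier run.
        if high_water_mark is not None and row[incremental_column] <= high_water_mark:
            continue
        key = tuple(row[column] for column in key_columns)
        persistent[key] = row  # insert a new key or overwrite an existing one
        processed += 1
    return processed


def at(minute):
    """Timestamp helper for the sample rows."""
    return datetime(2025, 1, 1, 12, minute)


source = [
    {"ORDER_ID": 1, "ORDER_DETAIL_ID": 10, "STATUS": "placed", "FLOW_PUBLISHED_AT": at(0)},
    {"ORDER_ID": 2, "ORDER_DETAIL_ID": 20, "STATUS": "placed", "FLOW_PUBLISHED_AT": at(1)},
]
persistent = {}
first_run = incremental_load(persistent, source)

# Later, one existing order changes and one new order arrives.
source.append({"ORDER_ID": 1, "ORDER_DETAIL_ID": 10, "STATUS": "shipped", "FLOW_PUBLISHED_AT": at(6)})
source.append({"ORDER_ID": 3, "ORDER_DETAIL_ID": 30, "STATUS": "placed", "FLOW_PUBLISHED_AT": at(7)})
second_run = incremental_load(persistent, source)
print(first_run, second_run, len(persistent))
```

A strict greater-than comparison can skip rows that share the exact timestamp of the high-water mark, so a production filter may need a tie-breaker or a small overlap window.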

Cortex AI for Sentiment Analysis

Coalesce supports the integration of AI and ML directly into your transformation pipelines. For example, you can use Snowflake Cortex functions to classify text or forecast values without writing any code. The Coalesce guide on Operationalizing ML with Snowflake Cortex walks you through embedding these capabilities into production pipelines.

We will use Cortex to analyze and classify customer review sentiment in real time. First, install the Cortex package from the Coalesce Marketplace, as you did for Dynamic Tables and incremental loading.

  1. Right-click the REVIEWS node and click Cortex Functions.
Coalesce context menu for adding Cortex Functions to the LLM_REVIEWS node
  2. Duplicate the review_text column and name the copy review_sentiment.
  3. In the Cortex package options in the right-hand panel, toggle on the Sentiment Analysis function.
Coalesce node applying Cortex sentiment analysis to review text.png
  4. Configure the function parameters. The function calls Snowflake Cortex’s built-in sentiment model, which returns a score indicating whether each review is positive, negative, or neutral.
  5. Save and run the node to populate sentiment results across the table.

In Snowflake, the table now includes the new REVIEW_SENTIMENT column:

Snowflake table preview displaying product reviews with sentiment scores
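Snowflake's Cortex SENTIMENT function returns a numeric score between -1 and 1 rather than a label. If downstream reports need categories, a thin mapping like the sketch below could be applied. The `sentiment_label` helper and the 0.2 cutoff are arbitrary assumptions for illustration, not values defined by Snowflake or Coalesce.

```python
def sentiment_label(score, threshold=0.2):
    """Map a sentiment score in [-1, 1] to a coarse label."""
    if not -1.0 <= score <= 1.0:
        raise ValueError(f"score out of range: {score}")
    if score >= threshold:
        return "positive"
    if score <= -threshold:
        return "negative"
    return "neutral"


labels = [sentiment_label(score) for score in (0.85, -0.6, 0.05)]
print(labels)
```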

With sentiment attached to each review, marketing and product teams can see which products draw positive or negative feedback as new reviews arrive.

Beyond sentiment analysis, Coalesce’s integration with Snowflake Cortex opens the door to a range of machine learning (ML) and AI-powered analytics use cases—all operationalized directly within your transformation workflows. By leveraging Cortex functions inside Coalesce nodes, teams can deploy predictive and intelligent models at scale without leaving the data warehouse environment.

Extend with advanced ML use cases in Cortex AI:

  • Sales forecasting (daily or weekly GMV prediction) using the FORECAST function.
  • Anomaly detection (inventory spikes or drops) using the ANOMALY_DETECTION function.

By chaining together Dynamic Tables, Incremental Processing, Persistent Staging, and Cortex AI, Coalesce delivers an end-to-end framework for managing real-time, intelligent transformations. These components work together to ensure that data remains fresh, cost-efficient, and analytics-ready, allowing data teams to focus on insights rather than infrastructure.

Conclusion 

Building a modern, real-time ELT pipeline no longer requires managing complex scripts or multiple disconnected tools. By combining Estuary, Snowflake, and Coalesce, data teams can achieve a seamless flow from data ingestion to transformation and AI-driven analytics—all within a governed, scalable, and cloud-native architecture.

Estuary simplifies real-time and CDC ingestion from PostgreSQL into Snowflake, ensuring continuous and reliable data availability. Once data lands in Snowflake, Coalesce takes over to transform it into high-quality, analytics-ready models through its low-code interface, AI-assisted logic, and reusable frameworks. Its integration with Snowflake Cortex further enhances this ecosystem by embedding machine learning and AI functions—from sentiment analysis to sales forecasting, anomaly detection, and customer segmentation—directly into data pipelines.

Together, these technologies enable organizations to:

  • Streamline data workflows and reduce manual maintenance.
  • Optimize processing efficiency with incremental and dynamic table automation.
  • Leverage AI and ML through Snowflake Cortex for intelligent analytics.
  • Empower both technical and non-technical users to collaborate on a unified platform.

In essence, Estuary and Coalesce empower data teams to move beyond pipeline management and focus on what truly matters—transforming real-time data into actionable insights that drive innovation and business growth.


FAQs

    Can this pipeline handle real-time CDC at scale?

    Yes. Estuary continuously streams PostgreSQL CDC data in right time while preserving consistency and order. Snowflake Snowpipe Streaming automatically scales ingestion, and Coalesce processes only new or changed data using Dynamic Tables and incremental logic, allowing the pipeline to scale without performance degradation.

    Do I need an external orchestrator or scheduler?

    No. This architecture eliminates the need for external orchestrators. Estuary continuously streams data based on CDC events, Snowflake handles ingestion scaling, and Coalesce manages refresh logic through Dynamic Tables and Incremental nodes. The entire pipeline operates continuously without manual scheduling.

    How quickly do PostgreSQL changes appear in Snowflake?

    PostgreSQL changes typically appear in Snowflake within seconds to a few minutes. Estuary streams CDC events in near real time, Snowpipe Streaming ingests them immediately, and Coalesce refreshes downstream models automatically based on the configured lag.

    Can I apply AI functions within the pipeline?

    Yes. Coalesce integrates directly with Snowflake Cortex, allowing teams to apply sentiment analysis, forecasting, anomaly detection, and other AI functions directly within transformation workflows, without moving data outside Snowflake.


About the author

Ruhee Shrestha Technical Writer

Ruhee has a background in Computer Science and Economics and has worked as a Data Engineer for SaaS providing tech startups, where she has automated ETL processes using cutting-edge technologies and migrated data infrastructures to the cloud with AWS/Azure services. She is currently pursuing a Master’s in Business Analytics with a focus on Operations and AI at Worcester Polytechnic Institute.
