
Real-Time Data Integration with Estuary Flow: From Postgres to S3 Data Lake

Learn how to move real-time data from Postgres to S3 and Google Sheets using Estuary Flow. Set up CDC, transformations, and analytics in minutes.


Operationalizing data is one of the biggest challenges organizations face these days. A significant component of that challenge is moving data between different systems in a reliable, accurate, and timely manner, a set of processes often lumped under the ETL/ELT umbrella.

In this example, we’ll show you how to use Estuary Flow’s real-time data integration capabilities to move data from your operational database (e.g., Postgres) into an S3 data lake to enable downstream analytics and BI use cases.

At the same time, we’ll also showcase how you can use Flow’s powerful data transformation engine to compute and serve always-up-to-date metrics to business users via Google Sheets.

Let’s get started!

Prerequisites

To follow the guide below, you’ll need:

  • An Estuary account. Go to the Flow web app at dashboard.estuary.dev to sign up for the free tier.
  • Docker. To ensure you can follow this tutorial, we’re providing a Docker Compose definition that simulates a stream of live data coming into an operational database.
  • An ngrok account. Since we’re creating our own database that’ll run locally on your machine, we need a way for Flow to connect to this database. Ngrok lets you expose the database to the internet.
    • While a free ngrok account is sufficient for this tutorial, you will need to add a payment method to your account to enable TCP forwarding, which is required for this tutorial.
  • An S3 bucket to write files to. See this guide for instructions on setting up a new S3 bucket.
  • An AWS root or IAM user with the s3:PutObject permission for the S3 bucket. For this user, you'll need the access key and secret access key. See the AWS blog for help finding these credentials.
  • The URL of a Google spreadsheet that does not contain the output of a prior Flow materialization.

Step 1: Set up PostgreSQL with Change Data Capture

We recommend using our change data capture (CDC) connector when pulling data from Postgres. It uses logical replication, which offers lower latency, captures updates to the source data more accurately (including deletions), and has a smaller performance impact on your database.

To showcase using Postgres as a data source in Estuary Flow, we’ll use the Docker Compose configuration defined in this repo. This will spin up a Postgres database for you with an orders table and simulate a stream of orders to showcase Flow’s real-time data integration capabilities.
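The Docker Compose setup in the repo already prepares the database for CDC. If you're pointing Flow at your own Postgres instance instead, the preparation typically looks something like the sketch below. The flow_capture user and flow_watermarks table match what this tutorial's database uses; treat the exact statements as an approximation and check the connector documentation for your Postgres version.

```sql
-- Sketch of typical CDC prerequisites for a self-managed Postgres instance.
-- The Docker Compose setup in this tutorial already does the equivalent for you.

-- Logical replication must be enabled (changing wal_level requires a server restart).
ALTER SYSTEM SET wal_level = logical;

-- A dedicated capture user with replication and read access.
CREATE USER flow_capture WITH REPLICATION PASSWORD 'secret';
GRANT pg_read_all_data TO flow_capture;  -- Postgres 14+

-- A watermarks table the connector uses to track backfill progress.
CREATE TABLE IF NOT EXISTS public.flow_watermarks (slot TEXT PRIMARY KEY, watermark TEXT);
GRANT ALL PRIVILEGES ON TABLE public.flow_watermarks TO flow_capture;

-- A publication covering the tables you want to capture.
CREATE PUBLICATION flow_publication FOR TABLE public.orders, public.flow_watermarks;
```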

To do so:

  1. Clone the repo and run the following command from the root of the repo in your terminal:

```plaintext
docker compose up
```
  2. This should start streaming some logs about the database and the data generation service into your terminal. The data generator will also create a new order every 5 seconds, with the logs showing each new record created, for example:

```plaintext
datagen  | Table: public.orders
datagen  | Record: {"id":38629,"user_id":"1db0a2a4-0644-4267-a485-b306c58ffd85","amount":132,"delivered":true,"created_at":"2024-10-09T20:36:03.631Z"}
```
  3. Leave this terminal as-is and open a new terminal session/window.

  4. In the new terminal, expose the Postgres database to the internet by running ngrok tcp 5432. This should start the ngrok tunnel and show you the following screen:

ngrok details

Take note of the first URL in the Forwarding section, the one that begins with tcp://. You’ll need it in the next step.

  5. Test that your database was exposed to the internet correctly by running the following command in a new terminal. Replace the host after -h below with the hostname from the ngrok URL in the previous step. The port (what comes after -p) is the portion of the URL that follows the colon. For example, in the ngrok screenshot above, the port is 16619. Note that the password is postgres by default.

```plaintext
$ docker run --rm -it postgres psql -h 0.tcp.ap.ngrok.io -p 16619 -U postgres -d postgres
Password for user postgres:
psql (17.0, server 16.4 (Debian 16.4-1.pgdg120+1))
Type "help" for help.

postgres=#
```
  6. This should open a prompt with a connection to Postgres. Next, run the following commands to confirm that you have the orders and flow_watermarks tables, as expected, and that the orders table is being updated with new rows every 5 seconds:

```plaintext
postgres=# \d
              List of relations
 Schema |      Name       | Type  |  Owner
--------+-----------------+-------+----------
 public | flow_watermarks | table | postgres
 public | orders          | table | postgres
(2 rows)

postgres=# select count(*) from orders;
 count
-------
  1026
(1 row)

postgres=# -- wait 5 seconds then run the command again
postgres=# select count(*) from orders;
 count
-------
  1029
(1 row)
```
  7. You can see that the table is being updated in real time, as the number of rows keeps increasing.
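Under the hood, each simulated order is just an ordinary insert into the orders table, which is exactly the kind of change the CDC connector picks up from the write-ahead log. Illustratively (the actual generator lives in the repo; the values here are made up to match the record shape shown earlier):

```sql
-- Illustrative only: the datagen service issues inserts along these lines every 5 seconds.
INSERT INTO public.orders (user_id, amount, delivered, created_at)
VALUES ('1db0a2a4-0644-4267-a485-b306c58ffd85', 132, true, now());
```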

Step 2: Add the Postgres database as a source in Estuary Flow

Now that the database is up and running, we’ll add it as a source in Flow. To do so:

  1. Go to the Sources page in Flow.

  2. Click on New Capture.

  3. Select PostgreSQL from the list of possible connectors.

  4. This should open a new page asking for some configuration details for your Postgres database, as shown below:

     
    Capture configuration


  5. Give the capture a name (I used raw-orders).

  6. Fill in the necessary Endpoint Config details from Step 1 above. If you ran the Docker Compose command without changing anything in the repo, then:

    • The Server Address is the host URL that we got after running ngrok tcp 5432. Don’t forget to include the port, in the format <host>:<port>. For example, I’d use 0.tcp.ap.ngrok.io:16619 in the example above.
    • The User is flow_capture
    • The Password is secret.
    • The Database is postgres.
  7. Press Next in the top-right of the page and wait for Flow to query the database for a list of tables.

  8. Under Output Collections, you should now see the orders table. You can see the schema that Estuary Flow has inferred for this table by clicking on the COLLECTION tab.

     
Output collections


  9. You can read more about the Schema Evolution settings (shown at the top of the page) in our docs. For now, we’ll leave all settings as they are.

  10. Press Next and then Save and publish on the top right of the screen.

  11. If you go to the Sources page in Estuary Flow, you should now see the new raw-orders source, as well as some statistics on the status of the connector and the amount of data Flow has read from this source.

     

Step 3: Set up your AWS S3 bucket as a destination

We’ll use S3 as our data lake for storing the raw data captured from Postgres. We’ll store the data as Parquet files, as it’s the de facto standard for data lake storage and also boasts significantly better compression and performance than, say, CSVs.

To set it up:

  1. Go to the Destinations page in Flow.

  2. Click on New Materialization.

  3. Select Amazon S3 Parquet from the list of possible connectors.

  4. This should open a new page asking for some configuration details, as shown below:

     
    Materialization configuration


  5. Give the materialization a name (e.g., I’ve called it orders-sink-s3 above).

  6. Put in the relevant details for your S3 bucket and credentials under Endpoint Config.

  7. Under the Source Collections tab, click on Source from capture and select the raw-orders/source-postgres source we added in Step 2 and press Continue.

     
    Source from capture config


  8. Press Next and Save and Publish in the top right section of the page.

  9. Estuary will now start populating your S3 bucket with the data. To check that it’s working, you can run aws s3 ls s3://your-bucket-name from the terminal. For example, here’s what I get when I run the command:

```plaintext
user@desktop:~/estuary$ aws s3 ls s3://estuary-parquet-sink/
                           PRE orders/
```
  10. As expected, there’s now a folder in S3 with the orders data.

You can now push the data from S3 into a data warehouse of your choice or run analytics over the Parquet files in S3 directly using, say, DuckDB.
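For example, here’s a minimal sketch of querying the Parquet files in place with DuckDB. The bucket name matches the example above, and it assumes the httpfs extension plus S3 credentials configured for DuckDB (e.g., via CREATE SECRET); adjust the glob to match the layout in your bucket.

```sql
-- Query the Parquet files Flow wrote to S3 without loading them anywhere first.
INSTALL httpfs;
LOAD httpfs;

-- Assumes S3 credentials are already configured for DuckDB.
SELECT
  date_trunc('day', created_at) AS day,
  sum(amount)                   AS revenue,
  count(*)                      AS num_orders
FROM read_parquet('s3://estuary-parquet-sink/orders/*.parquet')
GROUP BY 1
ORDER BY 1;
```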

Step 4: Set up a transformation for computing aggregate metrics

One of the most common analytics use cases when dealing with sales data is to compute aggregate metrics (e.g., a sum or mean) for revenue, number of orders, etc. over a period of time.

While a simple SQL query in a data warehouse can definitely get you that, re-running these queries every time your BI tool refreshes can result in wasted compute.

If you opt to compute these metrics as batch jobs instead, you keep your compute costs under control, but the metrics are now stale, preventing downstream stakeholders and systems from making real-time decisions with this data.

That’s where Estuary transformations come to the rescue. Once you’ve added a particular data stream to Estuary as a source (e.g., the orders table above), you can create derived datasets that can transform or aggregate data from that stream into a new collection (what we call a Derivation in Estuary).

We support defining transformations using both SQL and TypeScript, allowing you the flexibility to create powerful transformations customized to your use case.

In this case, we’ll run a simple reduce operation to compute the total sales and number of orders by day, which we will then make available to our non-technical stakeholders using Google Sheets.
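For reference, this is the metric a one-off warehouse query would compute by scanning the whole table on every run. The derivation we build next produces the same numbers, but maintains them incrementally as new orders arrive:

```sql
-- Batch equivalent of the derivation below: total sales and order count per day,
-- recomputed over the entire orders table each time it runs.
SELECT
  date(created_at) AS date,
  sum(amount)      AS daily_sales,
  count(*)         AS num_orders
FROM orders
GROUP BY date(created_at)
ORDER BY date;
```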

To create a new derivation:

  1. Go to the Collections page in Flow.

  2. Click the New Transformation button.

  3. On the new pane that opens, click the ADD button next to Collections, and select the raw-orders/orders/ collection.

     
    Create new derivation


  4. Select SQL as the language, and give the derived collection a new name — e.g., I called it daily-orders.

  5. Then press PROCEED TO GITPOD. This should open an IDE with some files, as shown below. Note that you may need to sign in to Gitpod using your GitHub account.


  6. Open the folder with the same name as your organization prefix (in my case, MJK_Corp, as shown above) and open the flow.yaml file in it.

  7. As seen in the screenshot above, it’s already been populated with some dummy code. Update the schema and key properties with the following values:

```yaml
schema:
  type: object
  properties:
    date:
      format: date
      type: string
    daily_sales:
      type: integer
      reduce:
        strategy: sum
    num_orders:
      type: integer
      reduce:
        strategy: sum
  reduce:
    strategy: merge
  required:
    - date
    - daily_sales
    - num_orders
key:
  - /date
```
  8. This denotes that we expect this collection to have three fields called date, daily_sales, and num_orders. The reduce properties denote that the daily_sales and num_orders values are reduced by summing the relevant values for each date. You can read more about the syntax for defining schemas and derivations in Estuary Flow in our docs here.

  9. Next, change the transforms property to the following:

```yaml
transforms:
  - name: daily_summary
    shuffle: any
    source: MJK_Corp/raw-orders/orders
    lambda: daily-orders.lambda.orders.sql
```
  10. Here, we’re giving the transformation a name (daily_summary) and defining the source collection this transformation requires. (Note: don’t forget to replace MJK_Corp above with your organization prefix.) We also point to the SQL file that contains the code defining how the raw data from the source collection is transformed. We’ll examine that next.

  11. Open the daily-orders.lambda.orders.sql file. This has some pre-populated template code. Delete everything and replace it with the following:

```sql
select
  date($created_at) as date,
  $amount as daily_sales,
  1 as num_orders;
```
  12. This defines the SQL transformation for the daily summary report we want to generate. Each row that comes in from the orders table will have its created_at field converted from a timestamp to a date.

     The counter for the new num_orders field is set to 1. When multiple rows are reduced, these values are summed together (i.e., we add all the 1’s together), giving us a count of the number of rows (i.e., orders) for that day. Similarly, we set the daily_sales field equal to the amount of the order, so when multiple orders are reduced, the output is the sum of the amount across all orders for that day.

  13. Test that the code is working as expected by running the following in the Gitpod terminal:

```plaintext
flowctl preview --source flow.yaml
```
  14. If everything’s working as expected, you’ll see the derived rows printing to your terminal. For example:

```plaintext
{"daily_sales":70,"date":"2023-12-30","num_orders":1}
```
  15. Once you’ve confirmed the result, press Ctrl + C to end the preview.
  16. Publish the derivation to Flow with the following command:

```plaintext
flowctl catalog publish --source flow.yaml
```
  17. Type in Y when asked to confirm and wait for the Publish successful message.
  18. You can now close the Gitpod workspace.
  19. Go to the Collections page in Flow and you should now find the daily-orders collection. Open the Overview for the collection and scroll down to the Data Preview section, where you will see sample documents for each date.

     
    Preview documents

We’ve now created a new collection that updates the total sales and number of orders per day in real time. Say goodbye to outdated metrics and clunky batch jobs in your data warehouse!

Step 5: Materialize the daily summary to Google Sheets

Now that we’ve created the derivation, let’s materialize it to Google Sheets. This way, the real-time, up-to-date summary is readily available for our less technical stakeholders to review and visualize.

To set it up:

  1. Create a new spreadsheet in Google Sheets and copy its URL.

  2. Go to the Destinations page in Flow.

  3. Click on New Materialization.

  4. Select Google Sheets from the list of possible connectors.

  5. Give the materialization a name (e.g., I’ve called it daily-orders-sink-sheets) and fill in the required details under the Endpoint Config section.

     
    configure google sheets materialization


    Paste the URL to the spreadsheet here. Remember to include the /edit at the end of the URL. Then, click the Sign in with Google button and give Estuary Flow permissions to access Google Sheets on your behalf.

  6. Under Source Collections, click the ADD button next to COLLECTIONS and select the daily-orders collection:

     
    Add new collection to materialization


  7. Press Next and then Save and Publish in the top right section of the page.

  8. Go to your spreadsheet and look for the new daily-orders sheet with the daily sales metrics.

How Estuary Enables Data-Driven Decision Making

To conclude, using Flow, we were able to seamlessly build an analytics pipeline moving data from our operational database into a data lake (or data warehouse) for downstream analytics and AI/ML use cases.

Specifically, Estuary enables data teams to move quickly, while offering an incredibly robust set of features, including:

  • Seamless data integration with an intuitive UI and without the need to write any code.
  • Lightning-fast data capture from Postgres with minimal configuration.
  • Real-time data syncs between sources and destinations, enabling true data-driven decision making in critical industries.
  • On-the-fly data transformations, empowering stakeholders and decision makers with real-time metrics.
  • Replicating data from the same source to multiple destinations, minimizing redundant data transfers.
  • A rich library of connectors, enabling you to move data across your systems of choice.
  • A performant, distributed architecture that can dynamically scale up to meet the demands of even the largest organizations.
  • Extremely competitive, predictable pricing based on the amount of data moved.

Conclusion

Copying data from a relational database into a data warehouse or data lake, especially in real-time, can be an arduous task requiring a dedicated in-house engineering team to set up a complex, distributed infrastructure with many moving parts.

Thankfully, modern tools like Estuary Flow have made this process as simple as possible, allowing your business to focus on operational concerns and freeing up your engineering team to focus on revenue-generating features, instead of toiling away at building the plumbing needed to move data from one system to another.

If you're interested in the product or want more information, start your free trial or get in touch with our sales team today.


