Estuary

How to Join Two Collections with a SQL Derivation in Estuary Flow

Learn to write and publish a SQL derivation in Estuary Flow that joins two collections on artist_id, previews results with flowctl, and materializes to Snowflake.


Key Takeaways

  • You will join two Flow collections with a SQL derivation keyed by artist_id.
  • The derivation emits artist_id, artist_name, and total_plays, then reduces by key.
  • You can preview locally with flowctl, publish to Flow, and materialize to Snowflake.
  • This works well for real-time use cases where new albums or artists arrive continuously.

Introduction

When working with streaming data, it’s common to need joins and aggregations across multiple sources. For example, you might want to connect artist details with their album performance to get real-time insights. Traditionally, this requires batch jobs or complex SQL inside a data warehouse.

In this tutorial, you’ll learn how to use Estuary Flow SQL derivations to join two collections (artists and albums) on a common key, artist_id. The derivation will calculate each artist’s total plays and ensure results stay continuously up to date. By the end, you’ll be able to preview results locally, publish the derivation to Flow, and materialize the output directly into Snowflake for analytics.

Setting up your development environment

The data sources used in this tutorial are two tables in a PostgreSQL database. Check out the repository here for all the code needed and instructions on how to seed a demo environment for yourself using only Docker. 

You can also just read through the rest of the guide!

Understanding the Sample Artists and Albums Tables

Artists table sample:

artist_id | name          | genre | country        | formed_year | monthly_listeners
1         | The Melodics  | Pop   | United States  | 2010        | 500000
2         | Rhythm Riders | Rock  | United Kingdom | 2005        | 750000
3         | Jazzy Joints  | Jazz  | France         | 1998        | 250000

Albums table sample:

album_id | artist_id | title           | release_date | total_tracks | album_type | label          | total_plays
1        | 1         | Harmonic Waves  | 2022-05-10   | 12           | Studio     | Melody Records | 1000000
2        | 1         | Acoustic Dreams | 2020-03-15   | 10           | EP         | Melody Records | 750000
3        | 2         | Electric Fury   | 2021-11-20   | 14           | Studio     | Rock On Ltd    | 2000000
4        | 3         | Smooth Sax      | 2019-07-05   | 8            | Live       | Jazz Co        | 500000

As you can see, both tables contain a field called artist_id. This is what we're going to use as the key in our join operation. One artist can have multiple albums, but one album can only belong to one artist. There may also be some artists without any albums.
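For reference, the end result we are after is what a batch LEFT JOIN with a SUM would produce in a warehouse. The sketch below is not Flow code: it loads the sample rows above into an in-memory SQLite database using Python's standard sqlite3 module and runs that equivalent query. Artist 4 ("No Albums Yet") is a hypothetical row, added only to show how an artist without albums behaves.

```python
import sqlite3

# Sample rows from the tables above (only the columns the join needs).
# Artist 4 is a hypothetical addition: an artist with no albums.
artists = [
    (1, "The Melodics"),
    (2, "Rhythm Riders"),
    (3, "Jazzy Joints"),
    (4, "No Albums Yet"),
]
albums = [
    (1, 1, 1000000),
    (2, 1, 750000),
    (3, 2, 2000000),
    (4, 3, 500000),
]

conn = sqlite3.connect(":memory:")
conn.execute("create table artists (artist_id integer primary key, name text)")
conn.execute(
    "create table albums (album_id integer primary key, artist_id integer, total_plays integer)"
)
conn.executemany("insert into artists values (?, ?)", artists)
conn.executemany("insert into albums values (?, ?, ?)", albums)

# LEFT JOIN keeps artists without albums; COALESCE turns their NULL sum into 0.
rows = conn.execute(
    """
    select a.artist_id, a.name as artist_name,
           coalesce(sum(b.total_plays), 0) as total_plays
    from artists a
    left join albums b on b.artist_id = a.artist_id
    group by a.artist_id, a.name
    order by a.artist_id
    """
).fetchall()

for row in rows:
    print(row)
```

The derivation built in the rest of this guide should converge on the same totals, with the difference that it updates continuously as new rows arrive instead of being recomputed in batch.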

To join these tables in Flow, the first step is to capture them. For this, you can quickly spin up a PostgreSQL capture.

To verify that the sample data has landed in Flow, inspect each collection in the data preview window on the Collections page.

Writing a SQL Derivation in flow.yaml

Set up your folder structure so you can organize the resources required for the derivation. Create a working directory to follow along, and inside, create a flow.yaml file.

Inside your flow.yaml file, add the following contents:

plaintext
collections:
  dani-demo/demo-music/artist_total_plays:
    schema:
      description: A document that represents the total plays for each artist
      type: object
      properties:
        artist_id:
          type: integer
        artist_name:
          type: string
          default: ""
          reduce: { strategy: maximize }
        total_plays:
          type: integer
          default: 0
          reduce: { strategy: sum }
      required:
        - artist_id
      reduce:
        strategy: merge
    key:
      - /artist_id
    derive:
      using:
        sqlite:
          migrations: []
      transforms:
        - name: fromAlbums
          source:
            name: dani-demo/demo-music/albums
          shuffle:
            key:
              - /artist_id
          lambda: |
            select $artist_id, $total_plays;
        - name: fromArtists
          source:
            name: dani-demo/demo-music/artists
          shuffle:
            key:
              - /artist_id
          lambda: |
            select $artist_id, $name as artist_name;

Let's take a look at this in more detail. The file defines a single collection: a derivation whose documents are produced by two transformations.

In the schema definition, we specify what structure we want the documents of the result collection to take on. The outermost reduce strategy is set to merge, which means that documents with the same key (in this case, artist_id) will be combined.

The derivation details are defined in the derive section of the YAML:

  • We're using SQLite as the engine for our transformations.
  • There are two transformations: fromAlbums and fromArtists.
  • Each transformation shuffles data on the artist_id key.
  • The lambda sections contain the SQL queries that will be executed for each transformation.
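To build intuition for what a lambda does, the sketch below imitates it outside of Flow with Python's standard sqlite3 module: each source document is bound as named parameters, and the query runs once per document, emitting one output document. This is only an analogy, not how the Flow runtime is implemented. The helper run_lambda is hypothetical, and Python's :name placeholders stand in for Flow's $name syntax.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # lets us read result columns by name


def run_lambda(sql, source_doc):
    """Hypothetical helper: run one query against one source document."""
    cursor = conn.execute(sql, source_doc)
    return [dict(row) for row in cursor.fetchall()]


# Stand-ins for the two lambdas in flow.yaml (":field" replaces "$field").
from_albums = "select :artist_id as artist_id, :total_plays as total_plays"
from_artists = "select :artist_id as artist_id, :name as artist_name"

# One sample document per source collection, taken from the tables above.
album = {"album_id": 1, "artist_id": 1, "title": "Harmonic Waves", "total_plays": 1000000}
artist = {"artist_id": 1, "name": "The Melodics", "genre": "Pop"}

print(run_lambda(from_albums, album))
print(run_lambda(from_artists, artist))
```

Each call emits a small document containing only the selected fields. Combining those documents by key is left to the reduce annotations in the schema.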

Understanding the SQL Transformations

Figure: Derivation flow. Joining artists and albums by artist_id and materializing total plays in Snowflake.

Let's break down the SQL queries in the derivation:

  1. fromAlbums transformation: select $artist_id, $total_plays;
    This query selects the artist_id and total_plays from each album. The $ syntax is used to reference fields from the source document.
  2. fromArtists transformation: select $artist_id, $name as artist_name;
    This query selects the artist_id and name (aliased as artist_name) from each artist. It emits no total_plays, so artist documents contribute nothing to the sum.

The Flow runtime then merges these documents by artist_id, summing total_plays for each artist and keeping the “largest” artist_name in each group. Because album documents carry no name (or only an empty one), the artist's actual name always wins.
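The merge behavior can be sketched in a few lines of Python. This is a simplified model of the three reduce annotations used in the schema (merge on the object, sum on total_plays, maximize on artist_name), not Flow's actual implementation. The names STRATEGIES and reduce_docs are hypothetical.

```python
from functools import reduce

# Per-field strategies, mirroring the reduce annotations in the schema.
STRATEGIES = {
    "total_plays": lambda lhs, rhs: lhs + rhs,  # reduce: { strategy: sum }
    "artist_name": max,                         # reduce: { strategy: maximize }
}


def reduce_docs(lhs, rhs):
    """Merge two documents that share the same key (artist_id)."""
    merged = dict(lhs)
    for field, value in rhs.items():
        if field in merged and field in STRATEGIES:
            # Field present on both sides: apply its reduce strategy.
            merged[field] = STRATEGIES[field](merged[field], value)
        else:
            # Field present on one side only, or no strategy: take the right-hand value.
            merged[field] = value
    return merged


# Documents emitted for artist_id 1 from the sample tables.
emitted = [
    {"artist_id": 1, "total_plays": 1000000},         # fromAlbums
    {"artist_id": 1, "artist_name": "The Melodics"},  # fromArtists
    {"artist_id": 1, "total_plays": 750000},          # fromAlbums
]

result = reduce(reduce_docs, emitted)
print(result)
```

Because sum and maximize give the same result regardless of the order in which documents are combined, the outcome does not depend on whether the artist document or the album documents arrive first.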

Previewing the Derivation results

To preview the derivation results, you can use flowctl.

plaintext
flowctl preview --source flow.yaml --name dani-demo/demo-music/artist_total_plays

The results will look something like this:

plaintext
{"artist_id":1034,"artist_name":"Elizabeth Johnson"}
{"artist_id":1034,"total_plays":69242063}
{"artist_id":1034,"total_plays":88384450}
{"artist_id":1035,"artist_name":"Brittany Fernandez"}
{"artist_id":1035,"total_plays":31324569}
{"artist_id":1035,"total_plays":11181346}
{"artist_id":1036,"artist_name":"Mary Higgins"}
{"artist_id":1036,"total_plays":41909178}
{"artist_id":1036,"total_plays":39340532}
{"artist_id":1037,"artist_name":"Steven Flowers"}
{"artist_id":1037,"total_plays":86347088}
{"artist_id":1038,"artist_name":"Travis Thompson"}
{"artist_id":1038,"total_plays":41095614}

As you can see, this is the not-yet-reduced version of the derived collection. This means the IDs that should be merged are still in separate records. 

If you take a look at the first three lines in the above example, you can see that they belong to the same artist_id, but are the results of the two separate transforms we defined in the derivation.

The first line {"artist_id":1034,"artist_name":"Elizabeth Johnson"} results from the artists table using the fromArtists lambda, while the other two come from the albums table using the fromAlbums lambda.

When Flow materializes this collection into a data warehouse, it first combines records with the same artist_id, and then reduces the total_plays values within each merged record using the sum strategy, that is, by adding them together.

To help visualize this, the results of step 1 would look something like this internally to Flow:

plaintext
{"artist_id":1034,"artist_name":"Elizabeth Johnson","total_plays": [69242063, 88384450]}

And then, the final result would look like this:

plaintext
{"artist_id":1034,"artist_name":"Elizabeth Johnson","total_plays": 157626513}
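The two steps can be reproduced by hand on the preview output. The sketch below is plain Python, not part of Flow: it parses the three preview lines for artist_id 1034, first groups them by key (collecting total_plays into a list, as in the intermediate form above), and then applies the sum. The variable names are illustrative.

```python
import json

# The three preview lines for artist_id 1034, copied from the preview output.
preview_lines = """
{"artist_id":1034,"artist_name":"Elizabeth Johnson"}
{"artist_id":1034,"total_plays":69242063}
{"artist_id":1034,"total_plays":88384450}
""".strip().splitlines()

# Step 1: combine documents that share the same artist_id.
grouped = {}
for line in preview_lines:
    doc = json.loads(line)
    entry = grouped.setdefault(
        doc["artist_id"],
        {"artist_id": doc["artist_id"], "artist_name": "", "total_plays": []},
    )
    if "artist_name" in doc:
        # maximize: keep the largest string seen so far
        entry["artist_name"] = max(entry["artist_name"], doc["artist_name"])
    if "total_plays" in doc:
        entry["total_plays"].append(doc["total_plays"])

# Step 2: reduce the collected total_plays values with the sum strategy.
final = [dict(entry, total_plays=sum(entry["total_plays"])) for entry in grouped.values()]

print(json.dumps(grouped[1034]))
print(json.dumps(final[0]))
```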

Publishing the Derivation

To publish the derivation, use the following command:

plaintext
flowctl catalog publish --source flow.yaml

After it's successfully published, head over to the Flow dashboard to see the new collection.

Creating a Materialization

To see the results of your derivation, you'll need to create a materialization. Here's an example for Snowflake:

  • Go to the materialization creation page in the Flow dashboard and search for Snowflake.
    (Screenshot: Snowflake connector)
  • Configure the materialization with your Snowflake connection details.
    (Screenshot: Snowflake connector configuration)
  • In the configuration step, select the derivation you created (in the example: dani-demo/demo-music/artist_total_plays) as the source collection.
  • Save and publish your materialization.

This is what the results will look like:

Figure: Results in Snowflake.

As you can see, each artist only appears once, and the total_plays value is indeed the sum of the individual values from the respective albums.

Wrapping up

In this tutorial, you learned how to use SQL derivations in Estuary Flow to join two collections on a shared key (artist_id) and calculate aggregated results in real time. By capturing data from PostgreSQL, defining a derivation in flow.yaml, and using SQL transformations, you were able to merge artists and albums into a single, continuously updated collection.

With just a few steps, you can preview results locally using flowctl, publish the derivation to Flow, and materialize the output directly into Snowflake. This approach removes the need for complex batch jobs or warehouse-only transformations, ensuring your analytics stay fresh, accurate, and ready for downstream use cases.

Ready to build more real-time joins and transformations?

👉 Get started with Estuary Flow for free and start streaming your data into Snowflake, BigQuery, Databricks, or any other destination today.


About the author

Dani Pálma, Head of Data & Marketing

Dani is a data professional with a rich background in data engineering and real-time data platforms. At Estuary, Daniel focuses on promoting cutting-edge streaming solutions, helping to bridge the gap between technical innovation and developer adoption. With deep expertise in cloud-native and streaming technologies, Dani has successfully supported startups and enterprises in building robust data solutions.
