Estuary

How To Join Two Collections in Estuary Flow using SQL

Learn how to join two collections in Estuary Flow using SQL using this quick guide.

How To Join Two Collections in Estuary Flow using SQL
Share this article

This guide will teach you how to write and publish a SQL derivation in Estuary Flow, which will join two collections together on a common key.

Setting up your development environment

The data sources used in this tutorial are two tables in a PostgreSQL database. Check out the repository here for all the code needed and instructions on how to seed a demo environment for yourself using only Docker. 

You can also just read through the rest of the guide!

Artists table sample:

artist_id

name

genre

country

formed_year

monthly_listeners

1The MelodicsPopUnited States2010500000
2Rhythm RidersRockUnited Kingdom2005750000
3Jazzy JointsJazzFrance1998250000
Artists

 

 

 

 

 

 

Albums table sample:

album_id

artist_id

title

release_date

total_tracks

album_type

label

total_plays

11Harmonic Waves2022-05-1012StudioMelody Records1000000
21Acoustic Dreams2020-03-1510EPMelody Records750000
32Electric Fury2021-11-2014StudioRock On Ltd2000000
43Smooth Sax2019-07-058LiveJazz Co500000
Albums

 

 

 

 

 

 

 

 

 

As you can see, both tables contain a field called artist_id. This is what we're going to use as the key in our join operation. One artist can have multiple albums, but one album can only belong to one artist. There may also be some artists without any albums.

In order to transform join these tables in Flow, the first step is to start capturing them. For this, you can quickly spin up a PostgreSQL capture.

You can take a look into each via the data preview window on the Collections page to verify that the sample data has already landed in Flow.

Writing the derivation

Set up your folder structure so you can organize the resources required for the derivation. Create a working directory to follow along, and inside, create a flow.yaml file.

Inside your flow.yaml file, add the following contents:

plaintext
collections:  dani-demo/demo-derivations1/artist_total_plays:    schema:      description: A document that represents the click count for each ad platform      type: object      properties:        artist_id:          type: integer        artist_name:          type: string          default: ""          reduce: { strategy: maximize }        total_plays:          type: integer          default: 0          reduce: { strategy: sum }      required:        - artist_id      reduce:        strategy: merge    key:      - /artist_id    derive:      using:        sqlite:          migrations: []      transforms:        - name: fromAlbums          source:            name: dani-demo/demo-music/albums          shuffle:            key:              - /artist_id          lambda: |            select $artist_id, $total_plays;        - name: fromArtists          source:            name: dani-demo/demo-music/artists          shuffle:            key:              - /artist_id          lambda: |            select $artist_id, $name as artist_name;

Let's take a look at this in more detail. Essentially, we define one collection which is a derivation that is the result of two transformations.

In the schema definition, we specify what structure we want the documents of the result collection to take on. The outermost reduce strategy is set to merge, which means that documents with the same key (in this case, artist_id) will be combined.

The derivation details are defined in the derive section of the YAML:

  • We're using SQLite as the engine for our transformations.
  • There are two transformations: fromAlbums and fromArtists.
  • Each transformation shuffles data on the artist_id key.
  • The lambda sections contain the SQL queries that will be executed for each transformation.

Understanding the SQL Transformations

Image #1.png
Derivation flow

Let's break down the SQL queries in the derivation:

  1. fromAlbums transformation: select $artist_id, "" as artist_name, $total_plays;
    This query selects the artist_id and total_plays from each album. The $ syntax is used to reference fields from the source document.
  2. fromArtists transformation: select $artist_id, $name as artist_name, 0 as total_plays;
    This query selects the artist_id and name (aliased as artist_name) from each artist and adds a total_plays column initialized to 0.

The Flow runtime will then merge these results based on the artist_id, summing up the total_plays for each artist and selecting the “largest” value in each group for the artist_name, which is essentially a comparison between an empty string and an actual value.

Previewing the Derivation results

To preview the derivation results, you can use flowctl.

plaintext
flowctl preview --source flow.yaml --name dani-demo/demo-music/artist_total_play

The results will look something like this:

plaintext
{"artist_id":1034,"artist_name":"Elizabeth Johnson"} {"artist_id":1034,"total_plays":69242063} {"artist_id":1034,"total_plays":88384450} {"artist_id":1035,"artist_name":"Brittany Fernandez"} {"artist_id":1035,"total_plays":31324569} {"artist_id":1035,"total_plays":11181346} {"artist_id":1036,"artist_name":"Mary Higgins"} {"artist_id":1036,"total_plays":41909178} {"artist_id":1036,"total_plays":39340532} {"artist_id":1037,"artist_name":"Steven Flowers"} {"artist_id":1037,"total_plays":86347088} {"artist_id":1038,"artist_name":"Travis Thompson"} {"artist_id":1038,"total_plays":41095614}

As you can see, this is the not-yet-reduced version of the derived collection. This means the IDs that should be merged are still in separate records. 

If you take a look at the first three lines in the above example, you can see that they belong to the same artist_id, but are the results of the two separate transforms we defined in the derivation.

The first line {"artist_id":1034,"artist_name":"Elizabeth Johnson"} results from the artists table using the fromArtists lambda, while the other two come from the albums table using the fromAlbums lambda.

When Flow materializes this collection into a data warehouse, the expectation is to first, combine records with the same artist_id, then in the merged records, further reduce the total_plays fields using the sum strategy, as in adding the values together.

To help visualize this, the results of step 1 would look something like this internally to Flow:

plaintext
{"artist_id":1034,"artist_name":"Elizabeth Johnson","total_plays": [69242063, 88384450]}

And then, the final result would look like this:

plaintext
{"artist_id":1034,"artist_name":"Elizabeth Johnson","total_plays": 157626513}

Publishing the Derivation

To publish the derivation, use the following command:

plaintext
flowctl catalog publish --source flow.yaml

After it's successfully published, head over to the Flow dashboard to see the new collection.

Creating a Materialization

To see the results of your derivation, you'll need to create a materialization. Here's an example for Snowflake:

  1. Go to the materialization creation page in the Flow dashboard and search for Snowflake.
Image #2.png
Snowflake connector
  1. Configure the materialization with your Snowflake connection details.
Image #3.png
Snowflake connector configuration
  1. In the configuration step, select the derivation you created (in the example: dani-demo/demo-music/artist_total_plays) as the source collection.
  2. Save and publish your materialization.

This is what the results will look like:

Image #4.png
Results in Snowflake

As you can see, each artist only appears once, and the total_plays value is indeed the sum of the individual values from the respective albums.

Wrapping up

In this guide, you learned how to write a SQL derivation in Estuary Flow to join two collections and calculate aggregate data. This approach allows you to process data in real-time, ensuring that your analytics are always up-to-date.

Remember to clean up any resources you created for this tutorial if you no longer need them!

Start streaming your data for free

Build a Pipeline

About the author

Picture of Dani Pálma
Dani Pálma

Dani is a data professional with a rich background in data engineering and real-time data platforms. At Estuary, Daniel focuses on promoting cutting-edge streaming solutions, helping to bridge the gap between technical innovation and developer adoption. With deep expertise in cloud-native and streaming technologies, Dani has successfully supported startups and enterprises in building robust data solutions.

Popular Articles

Streaming Pipelines.
Simple to Deploy.
Simply Priced.
$0.50/GB of data moved + $.14/connector/hour;
50% less than competing ETL/ELT solutions;
<100ms latency on streaming sinks/sources.