Estuary

Streaming Joins Are Hard

Why are streaming joins so tricky? Discover the hidden complexities of real-time data, and learn how to tame the infinite stream!

Dani Pálma
Phil Fried
They are

When working with batch processing in databases, joining tables is straightforward and familiar. However, when you try to apply the same principles to real-time streaming data, the complexity increases significantly.

Streaming data isn't static like tables in databases—it's unbounded, constantly updating, and poses significant challenges in managing state.

In this article, we’ll explore why streaming joins are difficult, and provide real-world examples to illustrate how they work in practice.

Traditional Joins: A Refresher

To understand why streaming joins are complex, let’s briefly review how joins work in traditional database systems. Suppose we have two tables—authors and books—and we want to join them on the author_id field:

| author_id | name | genre | country | birth_year |
|---|---|---|---|---|
| 1 | J.K. Rowling | Fantasy | United Kingdom | 1965 |
| 2 | George Orwell | Dystopian | United Kingdom | 1903 |
| 3 | Agatha Christie | Mystery | United Kingdom | 1890 |

Authors table

And the other one:

| book_id | author_id | title | publication_date |
|---|---|---|---|
| 1 | 1 | Harry Potter | 1997-06-26 |
| 2 | 2 | 1984 | 1949-06-08 |
| 3 | 3 | Murder on the Orient Express | 1934-01-01 |

Books table

To join these tables on the author_id, we would write an SQL query:

sql
SELECT * FROM authors JOIN books ON authors.author_id = books.author_id;

The output might look like this:

| author_id | name | genre | country | book_id | title | publication_date |
|---|---|---|---|---|---|---|
| 1 | J.K. Rowling | Fantasy | United Kingdom | 1 | Harry Potter | 1997-06-26 |
| 2 | George Orwell | Dystopian | United Kingdom | 2 | 1984 | 1949-06-08 |
| 3 | Agatha Christie | Mystery | United Kingdom | 3 | Murder on the Orient Express | 1934-01-01 |

Join results

While this is conceptually very simple, the query planner actually has a number of different execution strategies that it'll use depending on the scenario. We can ignore most of those details and just talk illustratively about the hash join algorithm.

The basic hash join is really quite simple. Iterate through all the records of either authors or books (usually whichever has fewer rows) and build a hash map of the id to the row value. Here's a trivial example in Python:

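python
# Build side: index every author row by its join key.
author_map = {}
for author_row in authors_table:
    author_map[author_row['author_id']] = author_row

# Probe side: look up each book's author in the hash map.
for books_row in books_table:
    # This is an inner join, so only output a row if there's a matching author.
    # .get() returns None for a missing key instead of raising KeyError.
    author_row = author_map.get(books_row['author_id'])
    if author_row is not None:
        output_joined_row(author_row, books_row)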

This is simple and effective in batch processing because the data is finite, and all rows are accessible at the time of the query.

Why doesn't this translate into streaming contexts?

The simplest explanation is that, with streaming data, all data sets are implicitly unbounded.

Let's unpack that.

Unlike a database, which is framed in terms of "tables", "rows", and "queries", streaming systems have "streams", "events", and "consumers" (in Flow we call them Collections, Documents, and Tasks). A "table" is a store containing a finite number of "rows", which is why you can easily iterate them in a query. But a "stream" is an infinite sequence of "events", where each event represents a modification of one or more entities. And instead of short-lived ad-hoc "queries", you process streaming data with "consumers" which are long-lived processes that observe each event.

In other words, you can't "just" iterate all the rows in authors_table because you don't know what they are.

Now you may be thinking, well, wait a minute, can't we just build our author_map inside a consumer process? And couldn't we just do a lookup in that author_map whenever we observe a new books event?

And you'd of course be right. You can do that, as long as you have enough memory to store every single author you've ever seen. In many cases, this can be a perfectly reasonable approach to joining data in streaming systems, Flow included.
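As a rough illustration of that idea, here is a minimal sketch of a consumer that keeps an in-memory author_map and probes it for every book event. The event shape (plain dicts with a `type` field) and the `lookup_join` name are assumptions made for this example, not part of Flow or any other streaming system.

```python
from typing import Iterable, Iterator


def lookup_join(events: Iterable[dict]) -> Iterator[dict]:
    """Join each book event against every author seen so far."""
    author_map: dict[int, dict] = {}  # grows with every distinct author observed
    for event in events:
        if event["type"] == "author":
            author_map[event["author_id"]] = event
        elif event["type"] == "book":
            author = author_map.get(event["author_id"])
            if author is not None:  # inner join: skip books with no known author
                yield {"name": author["name"], "title": event["title"]}


# Hypothetical interleaved stream of author and book events.
events = [
    {"type": "author", "author_id": 2, "name": "George Orwell"},
    {"type": "book", "author_id": 2, "title": "1984"},
    {"type": "book", "author_id": 3, "title": "Murder on the Orient Express"},
    {"type": "author", "author_id": 3, "name": "Agatha Christie"},
]
joined = list(lookup_join(events))
```

In this sketch only the "1984" row is emitted. The third event arrives before its author and is silently skipped, which hints at the out-of-order problem covered later in the article.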

But recall that streams are conceptually infinite. You don't know how many authors you might observe in the lifetime of a given stream and consumer, and it may not be possible to store them all. Consider if we instead tried to join user_sessions to audit_logs. There is no upper bound on the number of user sessions or audit logs. If our app has many users over the course of a long period, it will quickly become impractical (or impossible) to store data on every single user session in a hash map.

So this is the essence of what makes "streaming joins" difficult, and it's really just the same thing that makes everything difficult in computers: state. In the context of streaming joins, the most important thing is to ensure that your state (e.g. author_map) doesn't exhibit unbounded growth.

But fear not, because even in cases where there's an infinite number of distinct entities, there are ways to prevent the state from growing too large. You can also use an external system for storing state, which allows it to grow well beyond what fits in the memory of your consumer.
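One possible way to cap in-memory state is to evict the least recently used keys once a size limit is reached. The sketch below is only an assumption about how that could look: the `BoundedState` class and its `max_entries` parameter are made up for illustration, and an evicted key simply means a later event for it will find no match.

```python
from collections import OrderedDict


class BoundedState:
    """Keeps at most max_entries keys, evicting the least recently used."""

    def __init__(self, max_entries: int) -> None:
        self.max_entries = max_entries
        self._rows: OrderedDict = OrderedDict()

    def put(self, key, row) -> None:
        self._rows[key] = row
        self._rows.move_to_end(key)         # mark as most recently used
        while len(self._rows) > self.max_entries:
            self._rows.popitem(last=False)  # drop the least recently used key

    def get(self, key):
        row = self._rows.get(key)
        if row is not None:
            self._rows.move_to_end(key)     # a lookup also counts as a use
        return row

    def __len__(self) -> int:
        return len(self._rows)


state = BoundedState(max_entries=2)
state.put(1, {"name": "J.K. Rowling"})
state.put(2, {"name": "George Orwell"})
state.get(1)                               # touch author 1
state.put(3, {"name": "Agatha Christie"})  # evicts author 2
```

The trade-off is correctness for memory: a book for an evicted author would no longer join, so this only suits cases where old keys are unlikely to be referenced again.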

We'll discuss a number of different approaches, and help you figure out which ones are most appropriate for different use cases.

So, to summarize the challenges:

  1. Unbounded Data: Unlike batch tables, streams are infinite. You can't "just wait" for all the rows to arrive before performing a join.
  2. State Management: You need to store the intermediate state (i.e., the partial join results) while waiting for more data to arrive.
  3. Memory Constraints: Streams can generate an unbounded number of events, and you can't store everything in memory.
  4. Latency: Joins in a streaming context need to be performed in real-time, and you must consider how long to wait for matching events from the other stream.
  5. Out-of-order Data: Events in streams might arrive late or out of sequence, complicating the joining logic.

Let’s take a look at how we can tackle these in a streaming environment.

Joining Two Datasets in Streaming Context

Let’s take a practical example using two streams, artists and albums:

Artists Table Stream:

| artist_id | name | genre | country | formed_year | monthly_listeners |
|---|---|---|---|---|---|
| 1 | The Melodics | Pop | USA | 2010 | 500000 |
| 2 | Rhythm Riders | Rock | UK | 2005 | 750000 |

Artists

Albums Table Stream:

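| album_id | artist_id | title | release_date | total_tracks | album_type | label | total_plays |
|---|---|---|---|---|---|---|---|
| 1 | 1 | Harmonic Waves | 2022-05-10 | 12 | Studio | Melody Records | 1000000 |
| 2 | 1 | Acoustic Dreams | 2020-03-15 | 10 | EP | Melody Records | 750000 |

Albums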

Now, let’s say you want to join the artist data with album data in real-time. Here's the SQL query you would use in theory:

sql
SELECT artists.artist_id, artists.name, albums.title, albums.total_plays
FROM artists
JOIN albums ON artists.artist_id = albums.artist_id;

In a traditional database, you would expect an immediate result, like this:

| artist_id | name | title | total_plays |
|---|---|---|---|
| 1 | The Melodics | Harmonic Waves | 1000000 |
| 1 | The Melodics | Acoustic Dreams | 750000 |

Join results

However, in a streaming scenario, the results don't materialize all at once. Instead, you would see events trickle in over time as albums are released or new artist data arrives.

Handling Out-of-Order Data

Sometimes, an album might arrive before the corresponding artist information, or vice versa. In streaming, this is called out-of-order data, and it's essential to manage late arrivals effectively. One common technique is to use windowing strategies, where you define a time window (e.g., 1 hour) to wait for matching data before considering the data incomplete.

For example, you might wait up to 1 hour for an artist event after receiving an album event. If no matching artist event is received within that time, the album data might be considered "incomplete" or handled separately.

In this scenario, you would need to store intermediate data. When an album event arrives, but the corresponding artist event hasn’t, you need to temporarily store the album data until the matching artist data arrives. This introduces state management complexities, as streams can grow indefinitely.

For example:

  • Albums stream receives "Harmonic Waves" with artist_id=1
  • Artists stream hasn’t received the artist information for artist_id=1 yet

At this point, we store the album event in memory and wait for the corresponding artist event. Once both events are present, we can output the join result:

json
{"artist_id": 1, "name": "The Melodics", "title": "Harmonic Waves", "total_plays": 1000000}
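The buffering described above could be sketched roughly as follows. This is an illustration under stated assumptions rather than how any particular engine implements windowing: the `WindowedJoin` class, its method names, and the explicit `now` timestamps (in seconds) are all hypothetical. Note that only the buffered albums are bounded by the window here; the artist side is still kept indefinitely.

```python
WINDOW_SECONDS = 3600  # the 1-hour wait used in the example above


class WindowedJoin:
    def __init__(self, window: float = WINDOW_SECONDS) -> None:
        self.window = window
        self.artists: dict[int, dict] = {}
        # artist_id -> list of (arrival_time, album) still waiting for an artist
        self.pending: dict[int, list[tuple[float, dict]]] = {}

    @staticmethod
    def _join(artist: dict, album: dict) -> dict:
        return {"artist_id": artist["artist_id"], "name": artist["name"],
                "title": album["title"], "total_plays": album["total_plays"]}

    def on_album(self, album: dict, now: float) -> list[dict]:
        artist = self.artists.get(album["artist_id"])
        if artist is not None:
            return [self._join(artist, album)]  # artist already known: join now
        self.pending.setdefault(album["artist_id"], []).append((now, album))
        return []

    def on_artist(self, artist: dict, now: float) -> list[dict]:
        self.artists[artist["artist_id"]] = artist
        waiting = self.pending.pop(artist["artist_id"], [])
        # Albums that waited longer than the window are treated as incomplete.
        return [self._join(artist, album) for arrived, album in waiting
                if now - arrived <= self.window]

    def expire(self, now: float) -> list[dict]:
        """Remove and return albums that waited longer than the window."""
        expired = []
        for artist_id in list(self.pending):
            fresh = []
            for arrived, album in self.pending[artist_id]:
                if now - arrived > self.window:
                    expired.append(album)
                else:
                    fresh.append((arrived, album))
            if fresh:
                self.pending[artist_id] = fresh
            else:
                del self.pending[artist_id]
        return expired


join = WindowedJoin()
album = {"album_id": 1, "artist_id": 1, "title": "Harmonic Waves", "total_plays": 1000000}
artist = {"artist_id": 1, "name": "The Melodics"}
first = join.on_album(album, now=0)       # no artist yet, so the album is buffered
second = join.on_artist(artist, now=600)  # artist arrives 10 minutes later
```

Calling `expire` periodically is what keeps the buffer from growing without bound; what to do with the expired albums (drop them, route them to a separate output) depends on the use case.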

A few more techniques for efficient streaming joins:

  1. State Limiting: Avoid unbounded memory usage by limiting how much intermediate state (e.g., unmatched albums) you store. Once a threshold is reached, decide how to handle excess data.
  2. Materialized Joins: Offload the results of streaming joins into materialized views or tables in a downstream system, allowing you to handle larger datasets than can fit in memory.
  3. Bloom Filters: Use probabilistic data structures like bloom filters to efficiently test whether a matching key exists in a dataset, reducing the need for full in-memory storage.
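To make the Bloom filter idea from the list above more concrete, here is a small, unoptimized sketch built on Python's standard hashlib. The `BloomFilter` class, its default sizes, and the way hash positions are derived are assumptions chosen for readability; a production system would more likely use a tuned library.

```python
import hashlib


class BloomFilter:
    def __init__(self, num_bits: int = 1024, num_hashes: int = 3) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key):
        # Derive num_hashes bit positions by salting the key with a seed.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key) -> None:
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key) -> bool:
        # False means "definitely never added"; True means "probably added".
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(key))


seen_artists = BloomFilter()
for artist_id in (1, 2):
    seen_artists.add(artist_id)
```

When `might_contain` returns False for an album's artist_id, the artist has definitely not been observed, so the consumer can skip a more expensive lookup. A True answer can be a false positive, so it still needs to be confirmed against the real state.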

Visualizing Streaming Joins

Let’s look at an intermediate result after processing several streaming events. For instance:

Before Join (Separate Streams):

json
{"artist_id": 1, "name": "The Melodics", "monthly_listeners": 500000}
{"album_id": 1, "artist_id": 1, "title": "Harmonic Waves", "total_plays": 1000000}

After Join (Combined):

json
{"artist_id": 1, "name": "The Melodics", "title": "Harmonic Waves", "total_plays": 1000000, "monthly_listeners": 500000}

In real-time, this process happens continuously as new artist and album data flows in, with the system managing state to ensure that when an album event is received, it either immediately joins with existing artist data or waits for that data to arrive.

Estuary Flow’s Approaches To Streaming Joins

With all this theory out of the way, let’s take a look at how Flow thinks about streaming joins.

Let’s say you want to implement the following join in Estuary Flow:

sql
select * from authors left outer join books on authors.id = books.author_id

Because Flow uses a continuous map-reduce architecture for streaming transformations, a join transformation can be broken down into a map phase and a reduce phase as well.

In the first, map phase, Flow selects the fields you wish to be present in the final collection.

In the second phase, during reduction, Flow will utilize the target system’s merge capabilities to condense all documents that belong together, using the target as state.

Figure: Streaming MapReduce

Note: the join must use equality conditions (e.g. where a.thing = b.other_thing). Range predicates like where a.thing > b.other_thing won't work.
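The two phases can be loosely illustrated in plain Python. This is an analogy only and not Flow's actual API: the `map_author`, `map_book`, and `reduce_into` functions are hypothetical, and a dict stands in for the destination table that holds the state. It also shows why the join key must match by equality, since the merge is a keyed lookup.

```python
def map_author(author: dict) -> tuple[int, dict]:
    # Map phase: keep only the wanted fields, keyed by the join key.
    return author["id"], {"name": author["name"]}


def map_book(book: dict) -> tuple[int, dict]:
    return book["author_id"], {"books": [book["title"]]}


def reduce_into(target: dict, key: int, partial: dict) -> None:
    """Reduce phase: merge a partial document into the target row for its key."""
    row = target.setdefault(key, {"id": key, "name": None, "books": []})
    if partial.get("name") is not None:
        row["name"] = partial["name"]
    row["books"].extend(partial.get("books", []))


target: dict[int, dict] = {}  # stand-in for the destination table
reduce_into(target, *map_book({"author_id": 2, "title": "1984"}))       # book first
reduce_into(target, *map_author({"id": 2, "name": "George Orwell"}))    # author later
reduce_into(target, *map_author({"id": 3, "name": "Agatha Christie"}))  # no books yet
```

Because each partial document is merged by key, the result does not depend on whether the author or the book arrives first. In this sketch a row whose name is still None would be a book without a known author, which a left outer join from authors would leave out.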

If you’re interested in getting started with Derivations in Flow, check out this article for a step-by-step guide.

What about inner joins?

Fair question. Keep in mind that it's typically pretty trivial to turn an outer join into an inner join, just by filtering out rows where either side of the join is null. The state is all offloaded into the destination table, which allows it to be much bigger than would fit in the memory of the consumer.
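As a small sketch of that filtering step, assuming outer-join rows are represented as dicts with hypothetical author_name and book_title fields:

```python
def to_inner_join(rows: list[dict]) -> list[dict]:
    """Keep only rows where both sides of the join are present."""
    return [row for row in rows
            if row["author_name"] is not None and row["book_title"] is not None]


# Hypothetical outer-join output: the second author has no book event yet.
outer_rows = [
    {"author_id": 1, "author_name": "J.K. Rowling", "book_title": "Harry Potter"},
    {"author_id": 3, "author_name": "Agatha Christie", "book_title": None},
]
inner_rows = to_inner_join(outer_rows)
```

In practice the same filter would more likely be a WHERE clause in the destination system, since that is where the joined state lives.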

Storing state internally

SQL derivations can store state in their own SQLite tables, which gives a lot more flexibility in the types of joins that this approach can accommodate. But in order to do this, it must be practical to store all the required state in the derivation's SQLite database.

:::note

A derivation is a collection that is constructed by applying transformations to one or more sourced collections. Derivations operate continuously, keeping up with updates to the source collections as they happen.
:::

We don't define, much less enforce, any particular limit on how much data you can put into a SQLite database. But of course, there's such a thing as too much. This is the primary limitation on what you can and can't do in a streaming join. We can define an arbitrary limit on how much data a SQLite database can store, but then we'd still need a way to estimate the amount of space required for various joins.

A simplifying assumption for the purposes of this estimation is to assume that we must store a complete materialization of all collection data used in the join. For example, in order to approximate the required amount of space to accomplish something like select * from a join b on a.id = b.a_id join c on a.id = c.a_id, we could assume that the SQLite database would need to store a row per distinct key per table.

You "materialize" your data into the derivation SQLite database, and then you can run whatever join queries you want to, albeit with huge caveats about handling deletions and reductions.
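The general pattern can be tried out with Python's built-in sqlite3 module. This is a stand-in for illustration and not Flow's derivation syntax: the table layout and the `on_author`, `on_book`, and `joined_rows` helpers are assumptions, and deletions and reductions are deliberately left out.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE authors (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE books (id INTEGER PRIMARY KEY, author_id INTEGER, title TEXT);
""")


def on_author(doc: dict) -> None:
    # Upsert: one row per distinct key, so state grows with keys, not events.
    db.execute("INSERT OR REPLACE INTO authors (id, name) VALUES (?, ?)",
               (doc["id"], doc["name"]))


def on_book(doc: dict) -> None:
    db.execute("INSERT OR REPLACE INTO books (id, author_id, title) VALUES (?, ?, ?)",
               (doc["id"], doc["author_id"], doc["title"]))


def joined_rows() -> list[tuple]:
    return db.execute(
        "SELECT a.name, b.title FROM authors a "
        "LEFT OUTER JOIN books b ON a.id = b.author_id "
        "ORDER BY a.id, b.id"
    ).fetchall()


on_book({"id": 2, "author_id": 2, "title": "1984"})  # book arrives first
on_author({"id": 2, "name": "George Orwell"})
on_author({"id": 3, "name": "Agatha Christie"})      # author with no book yet
rows = joined_rows()
```

Since every event is first stored as a row, arrival order stops mattering and any SQL join can be run over the stored state. The cost is that the database holds a row per distinct key per table, which is the space estimate described above.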

But of course, a derivation is not the same as a materialization. There's no way to use reduce annotations when adding data to a derivation SQLite database. So if you wanted to do any sort of rollup of one of the source collections, you'd need to implement that manually using SQLite lambdas. That includes handling deletion events by both deleting from the derivation SQLite table and emitting some sort of deletion or update event for the joined row. I don't think there are any generic instructions on how to do that. It depends on the desired semantics of the join query and the data model of the source and derived collections.

Windowing is one of the main strategies that's used to bound the amount of state in a streaming join. This is a whole topic of its own, and there are lots of important details and different ways to do it. Suffice to say for now that if we want users to do windowing, then we'd probably want to have a windowing guide of some sort that can explain how to do it.

So to summarize, keeping state in a SQL derivation allows for more types of joins, but we'd probably need to break this approach down further into some guides that make some simplifying assumptions about the data and the type of join queries you can do.

Wrapping up

In summary, streaming joins are difficult because they deal with unbounded data, require efficient state management, and must operate under strict memory and latency constraints. Fortunately, with the right strategies—like windowing, state limiting, and leveraging external storage systems—many of these challenges can be mitigated.

Whether you’re dealing with high-volume streams or complex multi-way joins, the right approach and platform make all the difference.

If you are interested in Estuary Flow, sign up for a free trial here.

Join our community Slack channel: https://go.estuary.dev/slack


About the authors

Dani Pálma

Dani is a data professional with a rich background in data engineering and real-time data platforms. At Estuary, Daniel focuses on promoting cutting-edge streaming solutions, helping to bridge the gap between technical innovation and developer adoption. With deep expertise in cloud-native and streaming technologies, Dani has successfully supported startups and enterprises in building robust data solutions.

Phil Fried
