
Streaming SQL Full Guide: How to Transform Real-Time Data

Transforming real-time data has always been hard, but streaming SQL tools make it easy. Get up and running with 3 practical tutorials.


Processing real-time data in motion has historically been challenging and expensive. The perceived difficulty put it out of the question for many, despite numerous use cases and benefits. 

In this article, we will dive deep into a new form of SQL that makes processing data in motion much easier, as well as a SaaS solution that makes streaming data processing much cheaper.

We will provide 3 tutorials that show you how to process data in motion with SQL and Estuary Flow, which is a low-code ETL platform that streamlines the creation and maintenance of data pipelines, ingesting and transforming data in real time from sources to destinations. 

But first, let's talk about why the status quo of SQL is limited. 

A Brief History of SQL

streaming sql - SQL History Timeline

SQL was developed over half a century ago at IBM. It has since become the industry-standard language for database creation and manipulation. 

What has changed since then?

SQL was developed at a time when data was scarce and expensive to store, analyze, and use. Real-time data was barely available, if at all. Historically, SQL has primarily been used to query static data, or data at rest.

Although fast access to insights has always been a north star for analytics and SQL, in the past this was almost impossible.

But, this has changed.

Over the years, the Big Data era has changed how businesses interact with data. In 2013, the global market for Big Data reached $10 billion. By 2016, the world had experienced unprecedented data growth: 90% of the world's data at that time had been created in the previous two years alone. With this amount of data becoming available, it's no longer sufficient to query static data only. 

The need is clearly there. Now, we just need a solution.

In 2009, LinkedIn began developing Apache Kafka, which made large-scale stream processing practical and was a big step toward bringing it to the mainstream. However, managing the infrastructure and tooling still required significant development resources. 

In the following years, several SaaS platforms such as Estuary began to offer managed solutions, dramatically reducing implementation complexity and making real-time streaming data more accessible and approachable.[1]

In the last few years, some of these SaaS platforms have added the capability to apply plain SQL to data in motion, not just data at rest. This is what we call Streaming SQL.

Streaming SQL has raised the bar for what data-driven organizations can do with the vast amounts of data available to them in order to gain a competitive edge. 

For example, instead of analyzing month-to-date app usage as of yesterday, the rise of Streaming SQL means analyzing month-to-date usage as of seconds ago.

Allied Market Research estimates that the data analytics market will grow to $420.98 billion by 2027.[2] Those who make the most of this opportunity will be the ones leveraging powerful tools such as Streaming SQL. 

What is Streaming SQL?

As already mentioned, in recent years the enormous growth in data volumes and the increasing need for real-time data analysis have made SQL a crucial component of data management and business analytics. 

However, traditional SQL solutions that operate on stored data in databases cannot effectively support real-time stream processing requirements. 

As a result, there is a need for a new type of SQL that can process continuous data streams. This is where Streaming SQL comes in.

Before we dive deeper, let us examine what exactly we mean by “streaming” in this context.

"Streaming" refers to the handling of data as a continuous flow of events or messages through message brokers such as KafkaGazetteAmazon Kinesis, or Pulsar

streaming sql - Brokers Producer Consumer

These event streams can include various types of data, from user actions on websites or mobile apps, IoT sensor data, and server metrics to traditional database activities captured using change data capture (CDC).
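To make this concrete, here is what a single event on such a stream might look like, in this case a hypothetical change data capture document for an order record (the field names and values are purely illustrative):

plaintext
{"op": "update", "table": "orders", "ts": "2024-01-15T12:00:00Z", "before": {"status": "pending"}, "after": {"status": "shipped"}}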

Traditional SQL runs on databases, while Streaming SQL runs on streams of live data. Running SQL against a database returns a static set of results from a single point in time.

With Streaming SQL, on the other hand, you can run the exact same query on a stream of real-time data and get an answer that stays up to date as new events arrive. 

In short, Streaming SQL is designed to process subsets of data quickly and deliver results in real time. 

Streaming SQL can transform, filter, aggregate, and enrich data in flight, making it a powerful tool for organizations to extract maximum value from constantly streaming data. 
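As a minimal sketch of what processing data "in flight" looks like, the query below filters and projects each event as it arrives rather than scanning a stored table. The stream and column names are hypothetical, and exact syntax varies by engine:

plaintext
SELECT user_id, page, event_time
FROM pageviews_stream
WHERE country = 'US';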

Also, Streaming SQL can work with a wide range of data sources and environments, from public cloud platforms to Kafka, Hadoop, NoSQL, and relational databases.

Streaming SQL has become an essential part of effective real-time stream processing solutions. It enables data-driven organizations to analyze and act on streaming data and to quickly identify the value in new data. With Streaming SQL, organizations can quickly adopt a modern data architecture and create streaming data pipelines to support their business needs.

Streaming SQL vs. Traditional SQL

Streaming SQL can be understood as a variant of traditional SQL that is specifically designed to process data streams in real time. 

There are two major differences between traditional SQL and Streaming SQL. 

  1. Static vs Continuous Stream: As mentioned, the primary difference is that traditional SQL operates on static data stored in databases, whereas Streaming SQL works with continuously flowing data streams as they are generated, potentially by multiple sources. This continuous nature is what makes Streaming SQL solutions valuable compared to traditional SQL solutions. 
streaming sql - Traditional vs Streaming SQL

Also, Streaming SQL solutions use windows and event tables to trigger actions when data changes, and they let you write SQL queries against streaming data without writing application code.

In a nutshell, Streaming SQL is best for processing data streams that are constantly changing, while traditional SQL is best for querying and analyzing data that is already stored in a database.

  2. Sliding Window Approach: Streaming SQL uses a sliding window approach, which involves breaking the data stream into small, discrete segments or windows and processing each window separately. This approach enables Streaming SQL to handle data streams that are too large to be processed as a single entity.
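For example, a windowed aggregation can count events per user over one-minute windows. The sketch below uses a generic tumbling-window style; the exact windowing syntax differs between streaming engines, and the stream and column names are hypothetical:

plaintext
SELECT user_id, COUNT(*) AS clicks_per_minute
FROM clicks_stream
GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' MINUTE);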

The Need for Streaming SQL

While several CDC tools allow you to ingest data from a source to a target, few offer SQL transformation capabilities. Often, replicating data as-is from one place to another may not be sufficient for your needs. 

For example, you may want to do some filtering, apply certain calculations to your source data, or aggregate data from multiple documents before the data arrives at the destination. 

Other common use cases include merging across several collections using a common key and applying business logic to the source data. 

Using derivations in Flow, you can perform a variety of transformations, from simple remapping to highly complex stateful transaction processing. 

A derivation is a data collection that is derived from applying transformations to one or more source collections. Derivations work continuously, ensuring they stay in sync with any updates made to the source collections in real time. 

Flow enables you to write derivations using either SQLite or TypeScript in three simple steps.

The Flow Derivations doc here walks through a tutorial that illustrates a complex stateful transaction processing use case. However, often you may just need to apply a simple transformation to your data, which is what this tutorial aims to show.

Four Common Use Cases for Streaming SQL

SQL transformation can be extremely useful during data replication as it allows data to be transformed and modified as it is being replicated from one source to another. 

The following are a few ways in which SQL transformation can be used during data replication.

streaming sql - Use Cases
  1. Data cleansing: Data replication often involves moving data from one system to another, which can result in data quality issues. You can use SQL transformation to clean and standardize data during replication, ensuring that the data is consistent across systems.
  2. Data mapping: When replicating data between two systems, it is often necessary to map fields from one system to another. You can use SQL transformation to map fields and transform data types during replication, ensuring that the data is properly mapped and formatted.
  3. Data filtering: During replication, it may be necessary to filter out certain records or data elements based on specific criteria. You can use SQL transformation to filter out data during replication, ensuring that only the relevant data is replicated to the target system.
  4. Data aggregation: During replication, it may be necessary to aggregate data from multiple sources or to create summary data for reporting purposes. You can use SQL transformation to aggregate and summarize data during replication, making it easier to analyze and report on.

Overall, SQL transformation is useful during data replication because it allows you to transform, clean, and format data as it is being moved from one system to another. This helps ensure that the data is consistent and accurate across systems, and that it is properly mapped and formatted for its intended use.
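As a small illustration, a single transformation query can combine several of these use cases at once. The sketch below cleans, maps, and filters records as they are replicated; the table and column names are hypothetical:

plaintext
SELECT
  UPPER(TRIM(country_code)) AS country,    -- cleansing and mapping
  CAST(amount AS REAL) AS amount_usd       -- type conversion
FROM orders_stream
WHERE amount IS NOT NULL;                  -- filtering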

Using flowctl

Each of the following transformation examples uses Estuary's flowctl CLI to transform data collections.

If you have Homebrew, you can install flowctl simply with:

plaintext
brew tap estuary/flowctl
brew install flowctl

You can see our docs for additional installation options and more information on the tool.

To authenticate your session, run flowctl auth login.
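For reference, the command looks like this (it will prompt you for the access token described below):

plaintext
flowctl auth login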

streaming sql - Auth Token

You can get your access token for flowctl from the UI. Navigate to the Admin page, CLI-API tab:

streaming sql - Get Access Token from UI

Paste your access token into the Terminal.

With flowctl installed and authenticated, you're ready to transform your data.

Tutorial Example 1: Stateless Transform

In this tutorial, we will walk through a few SQL transformation use cases, showing you how and where to put your SQL queries to transform your data collection using Estuary Flow.

Scenario

Suppose you have a table of employees, with each employee's name, address, and region:

streaming sql - Employees Table Original

You want to filter by region and derive a target collection that includes only employees in the US:

streaming sql - Employees Table Red Box

Suppose this is the end result you want in your destination:

 streaming sql - End Result Table

Prerequisites

Before following the steps below, you should already have a collection you want to apply the transformation to. You will also need an environment with Estuary's flowctl CLI.

  1. Ensure you have flowctl installed and authenticated.
  2. Create an employees table in any database of your choice.
  3. Create a Capture in Estuary Flow. If you’re new to Flow, you can get started with a free account.
  4. Name the Capture: employees

Tutorial Steps

A transformation in Estuary requires creating a new collection which will be derived from an existing collection. To do so, we'll describe a new collection specification in a flow.yaml file and submit that to Estuary.

If you'd like a local copy of your source collection for reference, you can run:

plaintext
flowctl catalog pull-specs --name your/full/collection/name

For the full collection name, we'll be using a source collection called JennyPoc/sqlserversource/employees throughout our example.

This command creates a new file structure with folders for each /-delimited section of the collection name.

You can add your derived collection schema directly to the innermost nested flow.yaml file or work in a new flow.yaml file. Follow the tutorial steps to format your specification.

Step 1: Basic YAML structure

Whether working off of an existing file or a newly-created one, open up your flow.yaml file.

Under a collections stanza, add a basic specification to describe your desired derived collection:

plaintext
collections:
  JennyPoc/sqldemo:
    schema:
      properties:
        your_key:
          type: string
      required:
        - your_key
      type: object
    key:
      - /your_key
    derive:
      using:
        sqlite: {}
      transforms:
        - name: employees
          source: JennyPoc/sqlserversource/employees
          lambda: sqldemo.lambda.employees.sql
streaming sql - initial derivation spec

We'll be modifying this spec further soon, but first take a moment to note several interesting aspects:

  • Derived collection name: JennyPoc/sqldemo will be the name of the resulting collection
  • Transformation language: our transformation will use SQL rather than TypeScript
  • Source collection: we provide JennyPoc/sqlserversource/employees as our source collection for our transformation

Step 2: Modify derivation specification

Once you're comfortable with the basic specification structure, make the following updates:

  1. Update the schema specs to include the fields you want to see in your derived collection.

    Note: All Flow collections have a JSON schema, which defines their structure, representation, and constraints. You can learn more about collection schemas here.

You can either manually update the JSON schema or use --infer-schema to get your schema specs automatically populated. See below for further details on how to run the command.

  2. Designate a key. (For our example, we will key our collection on EmployeeID.)
  3. While we can reference a separate SQL file for more involved transformations, we're using a fairly simple one that can be written in-line. Replace the lambda .sql filename with a SQL select statement that filters on Region:
plaintext
select $EmployeeID, $LastName, $FirstName, $Address, $Region where $Region = 'US';
streaming sql - Update Lambda in flow yaml
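Putting those changes together, the updated specification might look roughly like the sketch below. The field types are assumptions based on the example employees table, and your collection names will differ:

plaintext
collections:
  JennyPoc/sqldemo:
    schema:
      type: object
      properties:
        EmployeeID:
          type: integer
        LastName:
          type: string
        FirstName:
          type: string
        Address:
          type: string
        Region:
          type: string
      required:
        - EmployeeID
    key:
      - /EmployeeID
    derive:
      using:
        sqlite: {}
      transforms:
        - name: employees
          source: JennyPoc/sqlserversource/employees
          lambda: select $EmployeeID, $LastName, $FirstName, $Address, $Region where $Region = 'US';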

Step 3: Preview and publish your transformation

To see a preview, run the following flowctl command in a Terminal:

plaintext
$ flowctl preview --source flow.yaml --interval 200ms | jq -c 'del(._meta)'
streaming sql - flowctl preview

To use schema inference mentioned above, run the following flowctl command with the --infer-schema flag:

plaintext
$ flowctl preview --source flow.yaml --infer-schema --interval 200ms | jq -c 'del(._meta)'

To stop the preview, press Ctrl+Z or Ctrl+C:

streaming sql - Stop the preview

Once you've confirmed your output results are as expected, you can publish your derivation to Estuary:

plaintext
flowctl catalog publish --source flow.yaml

If we then publish a materialization on this data, the resulting table looks like this:

streaming sql -  Materialization table

This tutorial video walks through the above steps live.

In our next example, we will show how to use migrations and lambda files, as well as how to publish the materialization.

Tutorial Example 2: Stateful Transform

Scenario: 

For our next two examples, we will create a transformation off of our Wikipedia live demo, which captures change events on Wikipedia edits from the Wikipedia API and then ingests the collection to a Google Sheet in real time.

The original materialized collection includes the following fields:

  • The date of the Wikipedia edit
  • Whether the edit was made by a bot or a human
  • Total edits
  • Total new lines

For Example 2, we will apply a simple SQL transformation to retain the first and last time we saw a user, and their lifetime edits.

For Example 3, we will apply another SQL transformation on the same dataset to show aggregations.

Prerequisites: 

Again, as a prerequisite before you add a new transformation, you should already have a collection you want to apply the transformation to. 

For the next two examples in this tutorial, we will apply the transformation to this collection in the demo prefix: demo/wikipedia/recentchange-sampled

streaming sql - demo wikipedia recentchange sampled

This collection is a 3% sample of the enormous demo/wikipedia/recentchange collection which contains millions of documents. Since the purpose of this tutorial is to demonstrate a proof of concept, we avoid publishing a derivation that processes hundreds of gigabytes of data. 

Tutorial Steps

As in the stateless transformation example, we'll create our derivation by describing a new collection in a flow.yaml file. You can create a new file or add your derived collection to an existing file.

To pull a local copy of the source collection spec for reference, you can run:

plaintext
flowctl catalog pull-specs --name demo/wikipedia/recentchange-sampled

The collection specification will be in the resulting demo/wikipedia/recentchange-sampled directory created in your working directory, in the deepest-nested flow.yaml file.

Step 1: Basic YAML structure

Whether working off of an existing file or a newly-created one, open up your flow.yaml file.

Under a collections stanza, add a basic specification to describe your desired derived collection:

plaintext
collections:
  Estuary-New/sqltransformdemo:
    schema:
      type: object
      properties:
        user:
          type: string
      required:
        - user
    key:
      - /user
    derive:
      using:
        sqlite:
          migrations:
            - sqltransformdemo.migration.0.sql
      transforms:
        - name: recentchange
          source: demo/wikipedia/recentchange-sampled
          lambda: sqltransformdemo.lambda.recentchange.sql
streaming sql - Flow yaml

There are a few components in the flow.yaml. Since a derivation is a collection, it has a schema. The top part of flow.yaml is a JSON schema of the collection, including its properties and key.

You need to update your schema specs and key accordingly. This user guide is also a good reference.

Because this is a SQLite derivation, it can have one or more migrations that it can use, as well as one or more transforms. The migrations and transforms are listed under the derive section of the flow.yaml file.

A transform works like this: every time a document is available from the source collection, it reads that document and evaluates the lambda function, which in the case of a SQLite transform is a block of SQL statements.

Depending on the complexity of your SQL statements, you can either put them directly in flow.yaml under the derive section, or use the migrations and lambda .sql files to hold your SQL, in which case the flow.yaml holds the filenames of the migrations and lambda .sql files.

Step 2: Create SQL files

This transformation example is not written fully in-line like the previous one. We reference separate .sql files in the specification that don't exist yet.

Instead of creating associated files from scratch, we can generate stub files using flowctl:

plaintext
flowctl generate --source flow.yaml

This should create both the sqltransformdemo.migration.0.sql migration file as well as the sqltransformdemo.lambda.recentchange.sql transformation file.

Open up the migration file first.

Step 3: Update migration

The migrations .sql file contains the new table that your derivation will use. 

You can use migrations to create or alter tables. Each migration is run only once, and new migrations will be applied as needed. 

Note that not all derivations require creating a new table; Examples 1 and 3, for instance, do not. 

By default, the <derivation name>.migration.0.sql is populated with the following example:

streaming sql - migration sql

For our tutorial, we will create a new table users as follows:

plaintext
create table users (
  user text primary key not null,
  first_seen text not null,
  last_seen text not null,
  total_edits integer not null,
  total_lines integer not null
);

Step 4: Write transformations

Next, the lambda file is where you put your SQL transformation statements. By default, it is populated with the following example, showing you examples of fields you can leverage:

streaming sql - lambda recentchange sql

For this tutorial, we will use the following SQL statements to track usernames, first-seen and last-seen timestamps, total edits, and total new lines.

Put the following in your SQL lambda file:

plaintext
insert into users (user, first_seen, last_seen, total_edits, total_lines)
select $user, $meta$dt, $meta$dt, 1, coalesce($length$new - $length$old, 0)
where $type = 'edit' and $meta$dt
on conflict do update set
  last_seen = $meta$dt,
  total_edits = total_edits + 1,
  total_lines = total_lines + coalesce($length$new - $length$old, 0);

select
  user,
  first_seen,
  last_seen,
  total_edits,
  total_lines
from users
where $type = 'edit' and user = $user;

Let’s dissect these SQL statements a bit. 

You may be wondering why we are setting both first_seen and last_seen to $meta$dt - where are we minimizing or maximizing those values?

We are maximizing the last_seen value through this upsert statement in the lambda:

plaintext
on conflict do update set
  last_seen = $meta$dt,
  total_edits = total_edits + 1,
  total_lines = total_lines + coalesce($length$new - $length$old, 0)

The upsert sets last_seen to the latest $meta$dt whenever the user makes an edit.

The upsert also aggregates total_edits and total_lines. No schema annotation is required for these, because the values are accumulated in the database itself.

Lastly, the reason we insert data into a table and then select it back out is that we want to track state for this use case. This is only necessary for a stateful transform: in order to keep track of when we first and last saw a user across events, we need to store that information somewhere.

Step 5: Preview and publish

To preview the derivation, run the following flowctl command in a Terminal:

plaintext
flowctl preview --source flow.yaml --interval 200ms | jq -c 'del(._meta)'

When the flowctl preview command runs, you should see the live updates from the derivation as follows:

streaming sql - flowctl preview

To stop the preview, press Ctrl+Z or Ctrl+C:

streaming sql - stop preview

To publish the derived collection, run the flowctl catalog publish command as follows. See this doc for a complete flowctl usage guide. 

streaming sql - catalog publish

You should now see your published collection in the Flow UI:

streaming sql - collection specs

Note that SQL statements are applied on a go-forward basis only, so you will see updates reflected in your destination whenever new source documents arrive at your source.

Tutorial Example 3: Aggregations

In this example, we will use the same dataset from our Wikipedia demo collection to show aggregations. 

We will look at the total number of lines per edit and include the date of the edit and whether the edit was made by a bot or a human. 

To do that, first update the flow.yaml file with the following schema specs and remove the <derivation name>.migration.0.sql reference, as we will not be creating a new table for this derivation. 

The properties in our schema include:

  1. date
  2. bot (boolean field indicating whether the edit was made by a bot or a human)
  3. total_lines
  4. total_edits

We are keying our collection on the date field this time.

streaming sql - example 3 flow yaml

Notice we are adding a merge annotation to our schema specs:

streaming sql - reduce strategy

When multiple documents get added to collections that share a common key, Flow opportunistically merges all such documents into a single representative document for that key through a strategy called reduction.

The above reduce annotation provides Flow with the specific reduction strategy to use at your document locations. You can learn more about reductions here.
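For reference, the full derivation spec for this example might look roughly like the sketch below. The collection and lambda file names are carried over from Example 2 for illustration; the important parts are the key on /date and the reduce annotations on the numeric fields:

plaintext
collections:
  Estuary-New/sqltransformdemo:
    schema:
      type: object
      reduce: { strategy: merge }
      properties:
        date:
          type: string
        bot:
          type: boolean
        total_lines:
          type: integer
          reduce: { strategy: sum }
        total_edits:
          type: integer
          reduce: { strategy: sum }
      required:
        - date
    key:
      - /date
    derive:
      using:
        sqlite: {}
      transforms:
        - name: recentchange
          source: demo/wikipedia/recentchange-sampled
          lambda: sqltransformdemo.lambda.recentchange.sql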

Next, populate the lambda file with the following SQL statement:

plaintext
select
  date($meta$dt) as date,
  $bot,
  coalesce($length$new - $length$old, 0) as total_lines,
  1 as total_edits
where $type = 'edit';

The coalesce expression extracts the net change in lines for the particular source document.

Now run the preview to take a look at the output:

plaintext
flowctl preview --source flow.yaml --interval 200ms | jq -c 'del(._meta)'
streaming sql - flowctl preview

As before, to publish the derived collection, run the flowctl catalog publish command. 

See this doc for a complete flowctl usage guide. 

See this tutorial video for a deeper dive and another example on how to apply SQL transformations to Wikipedia live data.

Conclusion

In this tutorial, we have walked through how to use simple SQL statements to create derivations in Flow. 

Using Streaming SQL, you can easily apply a wide range of transformations to your data before they arrive at the destination in your data pipeline.

We have walked through three examples. In the first example, we applied a simple SQL query to filter streaming data on a specific column. In the second example, using our Wikipedia live demo, we applied a simple SQL transformation to retain the first and last time we saw a user and their lifetime edits. In the third example, we applied another SQL transformation to show aggregations of the raw data.

SQL transformation is a powerful capability during data replication because it allows data to be transformed, cleaned, and formatted as it is being moved from one system to another. This can help ensure the data is consistent and accurate across systems, and that it is properly mapped and formatted for its intended use.

Try the SQL transformation capability in Flow today! If you’re new to Flow, you can register here for free.

References

  1. https://www.scalevp.com/blog/stream-processing-becomes-mainstream
  2. https://www.techtarget.com/whatis/feature/A-history-and-timeline-of-big-data
