
Stateful Stream Processing: Concepts, Tools, & Challenges

Stateful data and stream processing: can you have both? The answer is "yes," with a few caveats.


Real-time data streaming and processing is now a key value-add for businesses. 

Unlike batch processing, real-time processing handles data as it is generated and received. Because the data is continuously produced, it brings a lot of complexity and pitfalls that we need to be aware of as Data Engineers.

Imagine you’re a Data Engineer working at a gigantic e-commerce site. Most of the business needs that you’ll be required to power will need data joins, aggregations, and many other complex operations. Imagine this data is generated in a streaming manner; how would you go about answering business questions?

In this article, we’ll look at how you can work with streaming data to perform these complex operations: using stateful streaming.

We’ll explore important concepts, tools that you can use, and challenges you might face!

Bounded and unbounded data

Data generated in a (streaming) system will mostly fall into one of two types:


Bounded vs unbounded data

Bounded Data

Bounded data has a defined start and a defined end.

This is the sort of data that can be ingested in full and then processed; basically, batch processing. It’s easier to work with, especially at low volumes, but as the velocity and size of data increase, bounded data can also be processed in a streaming manner.

Countries of the world and user demographics in a system are examples of bounded streams.

Unbounded Data

On the other hand, unbounded data has a defined start but no defined end. Essentially, this is an infinite stream of data.

Unbounded data is what we more commonly think of as streaming data. It’s more complex to process: we have to handle events continuously as they arrive, account for data that arrives late (which we probably don’t want to wait for), deal with events that arrive out of the expected order, and manage many other complexities.

Clickstream events and IoT device pings are examples of unbounded streams.

Stateless vs stateful stream processing


Stateful vs stateless streaming by Harleen

When working with both unbounded and bounded streams of data, there are generally two ways to work with the events received:

  1. Process each event as received
  2. Process each event as received, but taking into account history/context (other events already received and/or processed)

With the first workflow, we have no idea about other events; we receive an event and process it as is. But with the second, we store information about other events, i.e. the state, and use this information to process the current event!

Therefore, the first workflow is stateless streaming, while the second is stateful streaming!
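
To make the distinction concrete, here’s a minimal, framework-agnostic sketch in Python. The event shape and the running visit count are made up for illustration:

python
def process_stateless(event):
    # Stateless: the output depends only on the current event.
    return {"user": event["user"], "page": event["page"].lower()}

visit_counts = {}  # the "state": what we remember about past events

def process_stateful(event):
    # Stateful: the output depends on the current event AND prior events.
    visit_counts[event["user"]] = visit_counts.get(event["user"], 0) + 1
    return {"user": event["user"], "total_visits": visit_counts[event["user"]]}

stream = [{"user": "a", "page": "/Home"}, {"user": "a", "page": "/Cart"}]
for e in stream:
    print(process_stateless(e), process_stateful(e))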

The limitations of stateless stream processing

The lack of context when processing new events (that is, not being able to relate the current event to other events) means you lose quite a bit of capability:

  1. You can’t aggregate your data
  2. You can’t join or compare data streams
  3. You can’t identify patterns
  4. You’re limited in use cases

It’s clear that stateless streaming has limitations that might make it hard to answer your business questions. 

Similar to the backfill problem that we explored before, being able to process your events in the context of other events (or in our previous case, working with both real-time and historical data) proves to be very useful.

Example use cases of stateful stream processing

The ability to process the current event in the context of other events comes very much in handy in the following example cases:

Fraud and anomaly detection

In transactional systems, detecting fraudulent and anomalous behavior in real time is critical.

Anomalies (and fraud) are behaviors that are outside the norm… “the norm” being previous habits in the system!

Therefore, when processing the current transaction, you need to be able to compare its attributes in real time with previous habits!

ML/AI systems

In the age of immediate responses, we expect on-the-fly recommendations when making purchases from Amazon, customer support that answers our questions as quickly and accurately as possible, and social media feeds that always recommend the best content.

These systems need to infer the best response based on your current interactions vs previous interactions. For instance, Amazon will recommend other items to buy based on your current cart contents and items you’ve viewed before.

Device (IoT) monitoring

When working with IoT devices, monitoring their health becomes quite important.

Health of the device can be defined as follows: over a defined window of time, the device sends a number of pings that is no more than 5% fewer than in the previous window. Basically, if we expect our device to send 100 pings every hour and the following hour we receive fewer than 95, we have a problem.

In this case, we need to store the state of the previous hour to process the current hour!
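
As a rough illustration, here’s what that rule could look like in Python. This is a minimal sketch, not any particular framework’s API, and it assumes the per-window ping counts are computed upstream:

python
previous_count = None  # state carried across windows

def check_window(current_count):
    global previous_count
    healthy = True
    if previous_count is not None:
        # Unhealthy if we received more than 5% fewer pings than last window.
        healthy = current_count >= 0.95 * previous_count
    previous_count = current_count  # persist state for the next window
    return healthy

print(check_window(100))  # True  (no previous window to compare against yet)
print(check_window(94))   # False (94 < 95, i.e. more than a 5% drop)
print(check_window(96))   # True  (96 >= 0.95 * 94)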

Important stateful stream processing concepts

Timely stream processing

Let’s think through the scenario above where you are monitoring the health of IoT devices. 

If we’re processing this data in a batch manner, this monitoring would be relatively straightforward (with a lot of caveats, of course). We can run a query every hour that compares data from this hour vs. the previous one.

But, you’re very likely not going to be using batch processing here. You therefore need a way to compare a continuously flowing stream of pings across windows of time.

This is a common paradigm of stateful stream processing where time plays a key role, hence timely stream processing.

Let’s look at some important concepts.

Windowing

Windowing is the mechanism by which we take an infinite stream of data and create bounded batches, most commonly based on time (windows don’t always have to be time-bound; you can, for example, window by the number of events received, but time is the most common way).

Depending on our use case, we can define different types of windows to discretize our data. Some common ones are:

Tumbling windows

These are non-overlapping, strictly defined windows. For our use case above, you would go for a one-hour tumbling window.


Tumbling windows by Apache Flink
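
As a quick sketch of the mechanics, assigning an event to a one-hour tumbling window amounts to truncating its timestamp to the hour; each event lands in exactly one window. The sensor IDs and timestamps below are made up:

python
from collections import defaultdict

WINDOW_SECONDS = 3600
counts = defaultdict(int)  # state: ping count per (sensor, window)

def on_ping(sensor_id, event_ts):
    # Truncate the epoch timestamp down to the start of its hour.
    window_start = (event_ts // WINDOW_SECONDS) * WINDOW_SECONDS
    counts[(sensor_id, window_start)] += 1

on_ping("sensor-1", 1_700_000_100)
on_ping("sensor-1", 1_700_000_200)
print(dict(counts))  # both pings fall in the same hourly window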

Sliding windows

Say you instead wanted to compute a rolling count of pings received over the last 30 minutes. Sliding windows overlap: each window has a fixed size but begins at a regular slide interval, so a single event can fall into several windows.


Sliding windows by Apache Flink

If you’re familiar with window functions in SQL, this would be similar to doing something like:

sql
SELECT
  *,
  COUNT(ping_id) OVER (
    PARTITION BY sensor_id
    ORDER BY ping_time ASC
    RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW
  ) AS pings_received
FROM sensor_pings_table;

You can read more about Apache Flink windowing capabilities in their documentation.

Another important concept is the definition of “time.”

Ingestion time

This is the time when the event is received by the processor. 

Processing time

This is the time when the event is processed. This could be as soon as it’s ingested or much later.

For a 1-hour tumbling window configuration, pings are collected for exactly one hour and then processed.

Using processing time is simple but easily affected by system problems such as failures and scheduled downtimes. Especially with distributed sources, events arriving late and/or out of order also make this choice unsuitable for cases where the time the event occurred is what matters.

Event time

This is the time when the event was produced by the originating system. This timestamp will usually be part of the payload sent to the processor.

For a 1-hour tumbling window configuration, pings generated over an hour are collected and then processed.

Using event time, however, introduces more latency than processing time, since you might need to wait for late-arriving events.

Watermarks

Since event time isn’t the same as processing time, our processing system needs a way to track its progress through event time. A watermark is a timestamp that tells the system that no more events with an event time earlier than it are expected; once the watermark passes the end of a window, that window can be closed and processed.

There are different strategies for generating watermarks. You can read about the options available in Flink in its documentation.


Skew between event and processing time in reality (Tyler Akidau)

Late-arriving events

In reality, events may arrive with timestamps earlier than the current watermark. Apache Flink by default will discard these late events, but you can specify an allowed lateness; in that case, a late event is still processed even though its event time is behind the watermark.
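
Here’s a minimal sketch of a “bounded out-of-orderness” watermark, similar in spirit to Flink’s built-in strategy: the watermark trails the largest event time seen by a fixed delay, and anything older than the watermark is flagged as late. The numbers are made up:

python
MAX_OUT_OF_ORDERNESS = 10  # seconds we are willing to wait for stragglers
max_event_time = 0

def on_event(event_time):
    global max_event_time
    max_event_time = max(max_event_time, event_time)
    watermark = max_event_time - MAX_OUT_OF_ORDERNESS
    if event_time < watermark:
        return "late"      # discard, or route to a side output
    return "on time"

print(on_event(100))  # on time; watermark advances to 90
print(on_event(105))  # on time; watermark advances to 95
print(on_event(93))   # late: 93 < 95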

Considerations with timely stream processing

When working with timely streaming, there are several considerations to make:

  1. How big your windows should be

The size of the window will have an impact on the amount of storage you’re using (the bigger the window, the more storage). Your analytics will also be slower the bigger your window is. 

You need to strike a good balance between the size and type of the window, and your business needs.

  2. How to deal with late and out-of-order events

When working with event time, you need strategies to work with events arriving late or out-of-order. For instance, for late events, you could discard them altogether, allow for a window of lateness, or store them as a separate output to deal with separately.

  3. Should you use event time or processing time?

This decision mostly depends on what you’re looking to achieve with the stream. Processing time is easier to work with, but it may not be what you need.

  4. Implementing timely processing in a distributed manner

When working in a distributed system (multiple producers, parallel processors), the complexities of watermarking, dealing with late events, scalability, performance, etc. increase quite a bit.

State persistence

To do stateful streaming, we need to store state. We need to persist it in one way or another so that it’s available when processing each event. 

State will usually be stored in embedded storage in a key-based manner.

Embedded storage

State stores will usually be lightweight and local to the operators in the processor. This means that the operator can access the state store without performing slow and fragile network calls. 

In a distributed system, the state can also be easily replicated and shared with parallel operators.

Key-based

State will also be stored grouped by keys defined from your input stream. The key is the main way that the current event being processed is matched with other events (the state). Key-based storage also allows operators to work on different keyed streams in a parallel manner. The storage can also be partitioned based on the key.
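
A toy sketch of the idea in Python, with a dictionary standing in for a real state store (the sensor_id key and event shape are illustrative):

python
from collections import defaultdict

# Keyed state store: each key gets its own independent state entry.
state = defaultdict(lambda: {"count": 0, "last_seen": None})

def process(event):
    s = state[event["sensor_id"]]   # look up state by key
    s["count"] += 1
    s["last_seen"] = event["ping_time"]
    return s["count"]

process({"sensor_id": "s1", "ping_time": 100})
process({"sensor_id": "s2", "ping_time": 101})
print(process({"sensor_id": "s1", "ping_time": 102}))  # 2: only s1's state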

In-memory and persistent storage

Storage will also come in two main forms:

  1. In-memory storage
  2. Persistent storage

In-memory and persistent storage for state management by Cloudera


In-memory storage is usually the default way to store state because it’s lightweight and extremely fast to access. Flink, for instance, utilizes objects stored on the Java heap as its in-memory store. In-memory stores, however, trade off available size for speed.

State can also be stored in persistent storage. Different tools utilize various options: Apache Flink uses RocksDB, while Estuary Flow’s Gazette uses journals in blob storage such as S3. Persistent storage allows for large state storage capacity and better scaling, but suffers from slower access due to the necessary read and write overheads.

Checkpoints and savepoints

To ensure fault tolerance and recovery, stateful streaming tools will often implement some form of saving the state at a point in time.

Checkpoints are consistent snapshots of the state that are taken and stored asynchronously to event processing. Checkpoints will usually expire as newer checkpoints are saved. Savepoints are manually triggered snapshots that don’t expire.

Checkpoints and savepoints are used to recover from failures or downtimes. The processor can continue from the most recent snapshot of the state!
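
To make the idea tangible, here’s a deliberately simplified save/restore cycle in Python. Real engines snapshot asynchronously and coordinate snapshots with source offsets; the state contents and file name here are arbitrary:

python
import pickle

state = {"s1": 42}

def checkpoint(path="checkpoint.pkl"):
    # Consistent snapshot of the state at a point in time.
    with open(path, "wb") as f:
        pickle.dump(state, f)

def recover(path="checkpoint.pkl"):
    # After a failure, continue from the last saved state.
    with open(path, "rb") as f:
        return pickle.load(f)

checkpoint()
print(recover())  # {'s1': 42}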

Challenges of stateful streaming

As much as building your streaming systems to be stateful seems like the silver bullet for your business needs, there are several reasons you might think otherwise:

State-management complexity

The biggest challenges with implementing stateful streaming come from having to manage state.

In stateless streaming, you only need to care about how to process incoming events. In stateful streaming, you have the extra overhead of managing state. This will manifest in:

Fault tolerance

You need mechanisms to store state, replicate state, and perform checkpoints among others for fast and reliable recovery in case of failures and downtimes.

Scalability

Scaling from, say, one processing node to multiple processing nodes becomes complex when you need to take into account how you’re going to manage state with more nodes.

Data inconsistency

As with most distributed systems, keeping data consistent is a challenge. In a stateful streaming system you now also need to take into account issues such as late-arriving events.

Analysis complexity and latency

Data analysis becomes increasingly complex when you need to implement, for instance, complex windows to understand your unbounded streams. Couple this complexity with growing state storage and management requirements, and you can expect increased latency in your analyses.

Stateful stream processing tools

Apache Flink

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.


Data flow through Flink

Flink includes two core APIs: a DataStream API for bounded or unbounded streams of data and a DataSet API for bounded data sets. Flink also offers a Table API, which is a SQL-like expression language for relational stream and batch processing that can be easily embedded in Flink's DataStream and DataSet APIs. The highest-level language supported by Flink is SQL, but programs can be written in Java, Scala, or Python.
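
As a hedged illustration of what the hourly ping count might look like in Flink, here’s a sketch using PyFlink’s Table API. The table definition and the datagen connector settings are assumptions made to keep the example self-contained; a real pipeline would read from Kafka, Kinesis, etc.:

python
from pyflink.table import EnvironmentSettings, TableEnvironment

# A streaming-mode Table environment running locally.
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# A synthetic 'datagen' source so the sketch is self-contained.
t_env.execute_sql("""
    CREATE TABLE sensor_pings (
        sensor_id INT,
        ping_time TIMESTAMP(3),
        WATERMARK FOR ping_time AS ping_time - INTERVAL '5' SECOND
    ) WITH ('connector' = 'datagen', 'rows-per-second' = '5')
""")

# Hourly tumbling-window ping counts per sensor; runs until cancelled,
# emitting each window's result as the watermark passes its end.
t_env.execute_sql("""
    SELECT sensor_id,
           TUMBLE_START(ping_time, INTERVAL '1' HOUR) AS window_start,
           COUNT(*) AS pings_received
    FROM sensor_pings
    GROUP BY sensor_id, TUMBLE(ping_time, INTERVAL '1' HOUR)
""").print()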

Apache Kafka Streams

Apache Kafka is a distributed event store and stream processing platform that was built at LinkedIn and made open source in 2011.


Kafka architecture including Connect and Streams

Kafka Streams is a library that was added to the Kafka project for building streaming applications, specifically applications that transform input Kafka topics into output. Kafka Streams addresses the following and more:

  • Event-at-a-time processing (not microbatch) with millisecond latency
  • Stateful processing including distributed joins and aggregations
  • Windowing with out-of-order data using a DataFlow-like model
  • Distributed processing and fault-tolerance with fast failover

Apache Spark Structured Streaming

Apache Spark is an open-source, distributed processing system used for big data workloads. It provides development APIs in Java, Scala, Python, and R, and supports batch processing, interactive queries, real-time analytics, machine learning, and graph processing.


Streaming data represented as an unbounded table on Spark

Structured Streaming is a stream processing engine built on the Spark SQL engine. Developers can express streaming computation the same way they would express a batch computation on static data (basically run analysis on a never-ending table). The Spark SQL engine will then take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive.

Streams can be processed either in a micro-batch manner or in the Continuous Processing mode introduced in Spark 2.3.
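
Here’s a minimal sketch of the same hourly ping count in PySpark Structured Streaming. The file-based JSON source, path, and schema are stand-ins for illustration; Kafka would be a more typical source:

python
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col

spark = SparkSession.builder.appName("sensor-pings").getOrCreate()

# Read a stream of JSON files as they land in a directory.
pings = (spark.readStream
         .schema("sensor_id STRING, ping_time TIMESTAMP")
         .json("/tmp/pings"))

# Stateful hourly counts per sensor; the watermark bounds how long Spark
# keeps window state around while waiting for late events.
counts = (pings
          .withWatermark("ping_time", "10 minutes")
          .groupBy(window(col("ping_time"), "1 hour"), col("sensor_id"))
          .count())

query = counts.writeStream.outputMode("update").format("console").start()
query.awaitTermination()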

Amazon Kinesis Data Analytics

AWS’ Kinesis Data Analytics provides a managed service to run Apache Flink or Apache Beam applications.


Kinesis Data Analytics example use case

Estuary Flow


Built on Gazette (a Go-based service that makes it easy to build platforms flexibly mixing SQL, batch, and millisecond-latency streaming processing paradigms), Estuary Flow is a real-time data operations platform that allows you to set up real-time pipelines with low latency and high throughput without provisioning any infrastructure.

Flow avoids complex windowing implementations and instead utilizes the journal abstraction provided by Gazette to power both batch and streaming analytics!

See also: how Flow's design compares to other solutions discussed above.

While supporting stateful streaming, Flow has little technical overhead and can be used through an intuitive web UI designed for engineers and business users alike. To get started, create a free trial account on Flow’s website.

Conclusion

In this article, we’ve explored the paradigms of processing streaming data, i.e., stateless and stateful streaming.

We’ve then explored the reasons to consider (or not consider) using stateful streaming for your business needs, and the important concepts you should understand before embarking on this journey.

Finally, we looked at some of the popular tools that you can use to implement stateful streaming.

Whether to go stateless or stateful is a decision that we leave to you as a business.

Which factors are most important to your organization, and what would you choose? Connect with us on LinkedIn or Slack and let us know!

