Much ink has been spilled about GPT’s impressive capabilities, generally framed around a problem definition, a toy static data set, and novel prompting techniques. This post is not that.

This post addresses how you can productionize a custom, always-on, GPT-enabled transformation pipeline with your actual data. As a demonstration, I’ll use Estuary’s Slack instance to:

  • Capture Slack conversations as they’re happening
  • Derive a continuous GPT summarization of each thread’s content thus far
  • Materialize evolving threads and summaries into Google Sheets

 


This post started only as a technical demonstration – summarization is essentially the “hello world” of GPT prompting techniques – but I’ve found the end product to be really quite useful, as a bird’s eye view of the conversations within our organization!
 

gpt real time pipeline - spreadsheet of slack thread summaries
Our end result. New threads become spreadsheet rows that update as the conversation unfolds.

Context Is Everything

AI pipelines have a context problem.

A critical fact to understand about GPT (and all trained AI models, really) is that they’re pure functions of their training set – which in the case of ChatGPT cuts off after 2021 – and a current prompt. That’s it. It’s an intelligent, touchingly immature assistant that has great recall of its training set but starts every interaction with horrible amnesia.

This property is a poor match for the way we represent and work with data records, which in the case of a threaded Slack message look like:

gpt real time pipeline - Estuary document for a slack message
A captured Slack thread document in all its glory.

 

This API response is structured this way for Very Good Reasons. It’s a representation designed for a machine. But as a human? I can’t make heads or tails of it. “Lol me too” about what? Which channel was this in? Who’s user U015FJTKTB4?

GPT is definitely not going to know either. It’s not magic. Which means that, before we can ask it, we need to piece together the broader task context – joining with other thread messages, user metadata, and the channel to arrive at a useful prompt:

Slack thread in #general, beginning 2023-06-01T19:27:03.944Z:
    Olivia Iannone aka <@U027YJMEEUC>:
        Forgot to mention at standup: I will be out of office Friday
        and Monday!
    Olivia Iannone:
        (As a rule, any time I'm gonna be out, I come to standup and
        tell myself, "Ok Olivia, _this time_ try and remember to tell
        everyone you're going to be out," and the thought inevitably
        leaves my brain before it's my turn)
    Phil aka <@U015FJTKTB4>: lol me too

Summarize into one terse sentence for a busy executive.


With this rich context, GPT produces a very sensible summary in our output sheet:

gpt real time pipeline - summary of one slack thread
Sheets row for this conversation.

Sizing the Problem

Conversations are happening all the time, and I’d like my spreadsheet to update along with them. I can tolerate some latency but I’d like it to be minimal.

I know I’ll need to perform a meaningful amount of transformation before I can hand off threads to GPT for summarization. I can smell a multiway SQL join.

At the same time, GPT has practical limitations: OpenAI offers an ergonomic REST API but it has fairly restrictive rate limits, which bound the overall API throughput I can utilize. It’s also far more expensive than your average API, so I only want to invoke it when a thread is changing due to new or edited messages, and perhaps only after first rolling up any closely-arriving messages.
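
One way to roll up closely-arriving messages is to wait until a thread has been quiet for a short window before summarizing it. The sketch below is only an illustration of that idea, not how Flow does it: the class name `QuietPeriodBuffer`, its methods, and the millisecond clock passed in by the caller are all hypothetical.

```typescript
// Hypothetical helper: tracks when each thread last changed and releases it
// for summarization only after it has been quiet for `quietMs` milliseconds.
class QuietPeriodBuffer {
  private lastSeen = new Map<string, number>();
  private quietMs: number;

  constructor(quietMs: number) {
    this.quietMs = quietMs;
  }

  // Record that a thread received a new or edited message at time `now`.
  touch(threadId: string, now: number): void {
    this.lastSeen.set(threadId, now);
  }

  // Return, and stop tracking, every thread that has been quiet long enough.
  drain(now: number): string[] {
    const ready: string[] = [];
    this.lastSeen.forEach((seenAt, threadId) => {
      if (now - seenAt >= this.quietMs) ready.push(threadId);
    });
    for (const id of ready) this.lastSeen.delete(id);
    return ready;
  }
}
```

A burst of five replies in one thread then costs a single GPT call instead of five, at the price of up to `quietMs` of added latency.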

At its core, this is a pipeline with distinct “capture,” “transform,” and “materialize” phases:

  • Capture incremental updates from Slack’s API.
  • Transform new Slack records into a full thread context and summarize with GPT.
  • Materialize thread summaries back out into a Google Sheet.

If I were to build this using other common tooling, I would probably use:

  • Capture: Fivetran is the dominant tool for extract & load into a warehouse (the “EL” of “ELT”).
  • Transform: A custom script, perhaps scheduled with Airflow, that looks for threads updated since the last invocation, assembles their full context via a SELECT, calls out to OpenAI’s API for completions, and then upserts rows into a summaries table. Or, dbt’s Python models could be an option, but beware of limitations.
  • Materialize: Hightouch is a common tool for reverse-ETL, which can sync my summaries table to a Google Sheet.

This stack could certainly work. It’s also (at least) three different tools, plus a warehouse and something to run my Python script. Everything is on its own periodic schedule, and I’ll need to be careful to minimize latency. It’s also incumbent upon me to make sure everything is properly incremental: if I accidentally do a full refresh, that could mean an expensive OpenAI bill.
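
To make "properly incremental" concrete, here is a minimal sketch of the high-water-mark pattern such a script would typically rely on. The `ThreadRow` shape and the `selectUpdatedSince` function are hypothetical names for illustration, and the array stands in for a warehouse table.

```typescript
// Hypothetical row shape for a table of threads in the warehouse.
interface ThreadRow {
  threadId: string;
  updatedAt: number; // milliseconds since epoch
}

// Pick only the threads modified after the stored cursor, and advance the cursor.
// Caveat: a strict ">" can miss rows that share the cursor's exact timestamp.
function selectUpdatedSince(
  rows: ThreadRow[],
  cursor: number,
): { due: ThreadRow[]; nextCursor: number } {
  const due = rows.filter((r) => r.updatedAt > cursor);
  const nextCursor = due.reduce((max, r) => Math.max(max, r.updatedAt), cursor);
  return { due, nextCursor };
}
```

Losing or resetting the cursor is exactly the accidental full refresh: every thread looks new again, and every one of them is sent back to OpenAI.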

But this is a post that’s not-so-subtly pitching Estuary Flow, so… let’s see what that looks like?

There's a Connector for That

Capturing from Slack is the easy part. I use Estuary Flow’s UI to create a Capture with the Slack connector. I even authenticate with OAuth, no API keys required 👍.

gpt real time pipeline - Estuary slack connector
Using Flow to capture from Slack.

My capture builds out a few data collections, of which I’ll need users, channels, channel_messages, and threads:

gpt real time pipeline - running estuary slack capture
The running capture and its output collections.

 

If you want to build this yourself, there is a variety of OSS connector options to choose from.

Building Up Context

I tackle the transformation phase by composing a pair of SQLite and TypeScript derivations, which are collections that build themselves through transformation of other collections.

The first SQLite derivation is responsible for indexing thread messages along with user and channel metadata. When a message is added or updated, the thread’s denormalized content is published into the derived collection:

gpt real time pipeline - structured rollup of slack thread
A structured roll-up of a current thread state.
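
The actual derivation does this indexing in SQLite. Purely to illustrate the shape of the join, here is an in-memory TypeScript sketch of the same idea: keep users, channels, and messages indexed, and emit the thread's denormalized state whenever a message is added or edited. Every type and field name below is a simplified assumption, not the schema of the captured Slack documents.

```typescript
// Hypothetical, simplified record shapes; real Slack documents carry many more fields.
interface SlackUser { id: string; name: string; }
interface SlackChannel { id: string; name: string; }
interface SlackMessage {
  channelId: string;
  threadTs: string; // timestamp of the thread's first message
  ts: string;       // timestamp of this message
  userId: string;
  text: string;
}
interface ThreadRollUp {
  channel: string;
  threadTs: string;
  messages: { ts: string; user: string; text: string }[];
}

class ThreadIndex {
  private users = new Map<string, SlackUser>();
  private channels = new Map<string, SlackChannel>();
  // Messages keyed by thread, then by message timestamp, so edits overwrite in place.
  private threads = new Map<string, Map<string, SlackMessage>>();

  upsertUser(u: SlackUser): void { this.users.set(u.id, u); }
  upsertChannel(c: SlackChannel): void { this.channels.set(c.id, c); }

  // Index a new or edited message and return the thread's current denormalized state.
  upsertMessage(m: SlackMessage): ThreadRollUp {
    const key = `${m.channelId}/${m.threadTs}`;
    let thread = this.threads.get(key);
    if (thread === undefined) {
      thread = new Map<string, SlackMessage>();
      this.threads.set(key, thread);
    }
    thread.set(m.ts, m);

    const ordered = Array.from(thread.values()).sort((a, b) => Number(a.ts) - Number(b.ts));
    return {
      // Fall back to raw IDs when metadata has not been seen yet.
      channel: this.channels.get(m.channelId)?.name ?? m.channelId,
      threadTs: m.threadTs,
      messages: ordered.map((msg) => ({
        ts: msg.ts,
        user: this.users.get(msg.userId)?.name ?? msg.userId,
        text: msg.text,
      })),
    };
  }
}
```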

A second TypeScript derivation takes these structured roll-ups and formats a text string for the entire thread, which is handed to GPT for summarization (called a “completion”):

gpt real time pipeline - summarized completion Flow document
A Flow document carrying the thread’s GPT completion.
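
The formatting step is plain string assembly. As a rough sketch of how a roll-up could be rendered into a prompt shaped like the one shown earlier, consider the function below. The `PromptThread` and `PromptMessage` types and the `formatPrompt` name are assumptions for illustration; the published specifications may structure this differently.

```typescript
// Hypothetical, simplified view of a thread roll-up.
interface PromptMessage { user: string; userId: string; text: string; }
interface PromptThread { channel: string; startedAt: string; messages: PromptMessage[]; }

// Render a thread into indented plain text followed by the summarization instruction.
function formatPrompt(thread: PromptThread): string {
  const introduced = new Set<string>();
  const lines = [`Slack thread in #${thread.channel}, beginning ${thread.startedAt}:`];
  for (const m of thread.messages) {
    // Show each user's Slack ID once, so later <@U...> mentions can be resolved by the model.
    const speaker = introduced.has(m.userId) ? m.user : `${m.user} aka <@${m.userId}>`;
    introduced.add(m.userId);
    lines.push(`    ${speaker}:`);
    for (const textLine of m.text.split("\n")) {
      lines.push(`        ${textLine}`);
    }
  }
  lines.push("", "Summarize into one terse sentence for a busy executive.");
  return lines.join("\n");
}
```

The resulting string is what gets sent as the prompt. Swapping the final instruction line is all it takes to turn the same roll-up into a different GPT task.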

Derivations have a few properties that make them well-suited for this task:

  • They run continuously, reacting to new Slack messages with sub-second latency.
  • They’re incremental. Derivation lambdas are invoked with new data as it arrives.
  • They’re transactional and pipelined. I can achieve more OpenAI throughput by starting multiple API calls, only waiting for them to complete once the transaction starts to close.
  • They reduce data volumes: multiple thread updates processed in the same transaction are automatically combined into a single output, which means a smaller OpenAI bill.
  • They’re fully managed. I don’t need to provision any infrastructure to run them.
  • They’re composable. I can create future GPT tasks which draw from the same collection of structured roll-ups.
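
The "pipelined" and "reduce data volumes" properties above can be sketched in a few lines. This is an illustration of the pattern rather than Flow's actual derivation API: `processTransaction`, `CompleteFn`, and the update shape are hypothetical, and `complete` stands in for whatever function calls OpenAI.

```typescript
// Hypothetical stand-in for a function that sends a prompt to GPT and returns its completion.
type CompleteFn = (prompt: string) => Promise<string>;

interface ThreadSummary { threadId: string; summary: string; }

async function processTransaction(
  updates: { threadId: string; prompt: string }[],
  complete: CompleteFn,
): Promise<ThreadSummary[]> {
  // Combine: a later update to the same thread replaces the earlier one.
  const latest = new Map<string, string>();
  for (const u of updates) latest.set(u.threadId, u.prompt);

  // Pipeline: start every API call before waiting on any of them.
  const pending: Promise<ThreadSummary>[] = [];
  latest.forEach((prompt, threadId) => {
    pending.push(complete(prompt).then((summary) => ({ threadId, summary })));
  });

  // Close: wait for all in-flight calls, then emit one output per thread.
  return Promise.all(pending);
}
```

Three updates spread across two threads produce two API calls, both in flight at once, and the summary for each thread reflects only its latest state.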

I’ve published the full Flow specifications for this pipeline in a Git repository.

The World Runs on Spreadsheets

Materializations are like captures in reverse: they maintain continuous views of collections in an endpoint system, like a database or Pub/Sub topic or a Google Sheet. I use Estuary Flow’s UI to create a Materialization with the Google Sheets connector:

gpt real time pipeline - creating Estuary google sheets materialization

Once configured, the materialization keeps my Sheet up-to-date with sub-second latency.

Next Steps

I went with a deliberately basic GPT prompting technique for this post. Summarization, while useful, barely scratches the surface of what a pipeline like this can do. Other use cases include:

  • Tracking work streams discussed in a thread, with descriptions and owners.
  • Updating a CRM system with relevant customer discussions in Slack.
  • Materializing whole Slack threads into Pinecone with semantic embeddings. Then, build question-answering applications on top of your organization’s institutional knowledge.
  • Proactively identifying when a discussion in Slack could be informed by the existing body of institutional knowledge.
  • Monitoring for security or regulatory compliance.

Try out this pipeline using Flow today (it’s free to start), and let us know what you think and how Flow can better serve other use cases you have in mind. Thanks for reading!
