Estuary

How to Build AI-Driven Customer Support Intelligence with Estuary and Snowflake Cortex

Is your business sitting on a treasure trove of customer support data? Learn how to build an AI-powered pipeline using Estuary and Snowflake Cortex to turn that data into actionable intelligence.


Customer support data is one of the richest but most underutilized operational signals. While companies collect massive volumes of complaint tickets, chats, transcripts and product reviews, most pipelines still process this data in batches, leaving support teams reacting hours—or days—too late. Streaming ingestion combined with native AI inference opens the door to a fundamentally different model: continuous support intelligence. Understanding the nature of customer issues, their frequency, and emerging patterns can directly inform product improvements, operational fixes, and customer experience strategy.

In this article, we’ll build an AI-powered customer support intelligence pipeline that ingests, enriches, and operationalizes support case data. To simulate a production-scale environment, we first use prompt engineering with OpenAI in a Python script to generate high-fidelity synthetic support tickets that reflect realistic customer language, urgency, and issue diversity. By combining Estuary for right-time ingestion with Snowflake Cortex AI to automatically summarize, classify, and recommend next-best actions on support cases, this solution demonstrates how modern data platforms can transform raw support interactions into structured, actionable intelligence.

On Snowflake Cortex AI

Recent advances in data engineering and AI are transforming how organizations operationalize customer support data. Traditional pipelines primarily support retrospective reporting, but platforms like Snowflake now enable AI inference directly where the data resides. With native capabilities such as Snowflake Cortex AI, teams can summarize, classify, and semantically analyze large volumes of support interactions without exporting data to external ML systems.

Because Cortex runs within Snowflake’s secure compute environment, it reduces architectural complexity while improving latency, governance, and scalability. Data teams can invoke LLM-powered functions directly in SQL to enrich support cases in near real time—turning the warehouse from passive storage into an active intelligence engine for faster operational decisions.

Cortex Search and RAG Foundations 

Retrieval Augmented Generation (RAG) is a pattern that enhances large language model responses by grounding them in relevant enterprise data. Rather than relying solely on the model’s pretrained knowledge, RAG retrieves semantically similar records from a curated knowledge base and injects that context into the generation step. This is particularly powerful for customer support scenarios, where historical cases, resolution notes, and product-specific issues provide critical context for accurate recommendations.

One example of this is Snowflake Cortex Search. It provides the semantic retrieval layer needed to implement this pattern natively within the data platform. By indexing unstructured support text and enabling vector-based similarity search, Cortex Search allows applications and agents to quickly locate relevant past cases. When combined with Cortex LLM functions (such as COMPLETE or SUMMARIZE), this creates a foundation for enterprise support copilots that can retrieve similar incidents, generate agent-ready summaries, and recommend next-best actions.
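To make the retrieve-then-generate flow concrete, here is a minimal Python sketch. It is illustrative only: `retrieve` uses naive word overlap as a stand-in for the semantic retrieval that Cortex Search performs, and `build_prompt`, the sample cases, and the field names are hypothetical.

```python
def retrieve(query, cases, k=2):
    """Rank cases by naive word overlap with the query (stand-in for semantic search)."""
    query_words = set(query.lower().split())
    scored = []
    for case in cases:
        overlap = len(query_words & set(case["text"].lower().split()))
        scored.append((overlap, case))
    # Highest overlap first; ties keep their original order because sort is stable.
    scored.sort(key=lambda pair: pair[0], reverse=True)
    # Keep the top k, dropping cases that share no words with the query.
    return [case for overlap, case in scored[:k] if overlap > 0]


def build_prompt(query, retrieved):
    """Inject the retrieved cases as context ahead of the new ticket."""
    context = "\n".join(f"- [{c['case_id']}] {c['text']}" for c in retrieved)
    return f"Past cases:\n{context}\n\nNew ticket: {query}\nRecommend a next step."


past_cases = [
    {"case_id": "CASE-00001", "text": "toy arrived broken and a refund was issued"},
    {"case_id": "CASE-00002", "text": "invoice shows wrong billing address"},
    {"case_id": "CASE-00003", "text": "broken toy replaced after photo proof"},
]

query = "my toy is broken"
retrieved = retrieve(query, past_cases)
prompt = build_prompt(query, retrieved)
print(prompt)
```

The prompt that reaches the model contains only the cases related to the new ticket, which is the grounding step that distinguishes RAG from a plain LLM call.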

In this project, you'll learn how to leverage Snowflake Cortex to: 

  • Automatically summarize incoming support tickets.
  • Perform sentiment and issue analysis.
  • Generate AI-driven next-step recommendations for agents.

Where Estuary Fits - Webhook Based Ingestion

While Snowflake Cortex provides the intelligence layer, the effectiveness of any real-time AI system depends on how quickly fresh data arrives. This is where Estuary plays a critical role as the right-time data movement platform in the architecture. 

In this pipeline, Estuary is responsible for continuously ingesting customer support case events and delivering them into Snowflake with low latency and high reliability. Rather than relying on batch uploads or scheduled ETL jobs, Estuary enables a streaming-first pattern where support interactions become available for AI enrichment within seconds of being generated. This ensures that Cortex models always operate on the most current customer signals.

Webhooks are used as the ingestion mechanism because customer support systems are inherently event-driven. Each new ticket, update, or customer message represents a discrete event that should be processed immediately. Compared to polling-based ingestion, webhooks provide:

  • Lower latency: events are pushed as they occur
  • Greater efficiency: no need for constant polling
  • Simpler integration with SaaS support platforms
  • Better scalability for bursty ticket volumes

By pairing webhook-based ingestion with Estuary’s exactly-once delivery and schema evolution handling, the pipeline achieves a robust foundation for real-time support intelligence. The result is an architecture where customer support data flows continuously from source to warehouse to AI enrichment, enabling faster detection of risk, quicker agent response, and more proactive customer experience management.

Overview of the Pipeline

Estuary-SnowflakeCortex-OpenAI-pipeline-overview.png
This architecture shows how synthetic customer feedback is generated with Python and the OpenAI API, streamed to Snowflake via Estuary, and analyzed using Snowflake Cortex AI for AI-powered support workflows.

Prompt-engineered support cases stream through Estuary into Snowflake, where Cortex AI continuously analyzes, classifies, and operationalizes customer support intelligence in near real time. 

The Architecture: 

  • Data Generation Layer: Prompt Engineering (Python + OpenAI) 
  • Right-Time Ingestion Layer: Estuary Webhooks to continuously stream support events into Snowflake
  • Data Platform Layer: Snowflake (Storage + Modeling) 
  • Intelligence Layer: Snowflake Cortex AI 
  • Semantic Layer: Cortex Search/AI Functions

Pre-requisites

Before we jump in, make sure you have the following:

  1. Estuary: Sign up for a free trial of Estuary, which will be used to ingest API data via HTTP/Webhooks and stream it into Snowflake. 
  2. Snowflake: Sign up for a 30-day free trial of Snowflake, which will serve as the analytical warehouse and provide the Cortex AI features. 
  3. Any Python IDE: Any IDE capable of running Python scripts (e.g., VS Code, PyCharm) for generating data and interacting with APIs. 
  4. OpenAI API Key: An OpenAI API key is required to simulate events using LLM-powered prompt engineering.

Step-by-Step Implementation Guide

Step 1: Generate Support Case Records via Prompt Engineering (Python Script + OpenAI)

You’ll generate structured support case JSON records that resemble real support systems (billing, shipping, product defects, etc.), including lifecycle timestamps and resolution states.

python
import json
import re
import time

import requests
from openai import OpenAI

# Replace the placeholders below with your own values.
OPENAI_API_KEY = "<YOUR_OPENAI_API_KEY>"
ESTUARY_WEBHOOK = "<YOUR_WEBHOOK_LINK>"
ESTUARY_API_KEY = "<BEARER_TOKEN>"

BATCH_SIZE = 30
DEBUG = True

client = OpenAI(api_key=OPENAI_API_KEY)


def generate_interactions(batch_size=30):
    """Ask the model for a batch of synthetic support cases and return them as a list."""
    prompt = f"""
Generate {batch_size} realistic customer support CASE records for an eCommerce platform.

IMPORTANT CONTEXT:
These are full support cases (not chat turns).
Each record represents one customer support ticket from creation to resolution.
Each record is ONE support case.

CUSTOMER DISTRIBUTION (VERY IMPORTANT):
• Approximately 70% of customers should appear in ONLY ONE case.
• Approximately 30% of customers should appear in 2-3 cases.
• DO NOT assign all cases to the same customer.
• DO NOT create a unique customer for every case.
• The dataset must contain a realistic mix of repeat and one-time customers.

WRITING STYLE:
- Use natural, messy human language in descriptions.
- Include frustration, urgency, politeness, and occasional mild sarcasm.
- Make scenarios realistic (delivery delays, billing issues, product defects, etc.).
- Do NOT repeat templates.

TIMESTAMP RULES:
- Use ISO 8601 UTC format.
- date_closed must be AFTER date_created.
- If status = "open" then date_closed = null.

ID RULES:
- case_id format: CASE-##### (unique)
- customer_id format: CUST-###### (realistic)

VALID TAXONOMY
query_type → intent:
invoice: check_invoice, get_invoice
order: cancel_order, change_order, place_order, track_order
payment: check_payment_method, payment_issue
refund: refund_policy, get_refund
shipping: change_shipping_address, setup_shipping
review_feedback: complaint, review
contact: customer_service, human_agent

issue_category options:
- billing
- delivery
- product_defect
- account_access
- general_inquiry

priority options:
- low
- medium
- high
- urgent

status options:
- open
- in_progress
- resolved
- escalated

handled_by options:
- bot
- agent

channel options:
- chat
- email
- phone
- web

REQUIRED FIELDS (STRICT):
- case_id
- customer_id
- timestamp
- date_created
- date_closed
- status
- required_escalation
- query_type
- intent
- issue_category
- priority
- handled_by
- response_time_min (integer 1-120)
- number_of_calls (integer 0-5)
- concerns_addressed (boolean)
- interaction_cost (float USD)
- product_sku
- category
- price
- channel
- location
- case_title
- case_description
- last_update_note

DATA REALISM RULES:
- urgent cases more likely to escalate
- product_defect often negative tone
- delivery issues common
- some cases unresolved
- vary locations across US
- price must match realistic ecommerce ranges
- response_time_min higher for escalations
- NOT all cases should be resolved

Return ONLY valid JSON in this exact format:
{{ "records": [ ... ] }}

No markdown. No explanations. Only JSON.
"""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0.7,
        response_format={"type": "json_object"},
        messages=[{"role": "user", "content": prompt}],
    )

    content = response.choices[0].message.content
    if not content:
        raise ValueError("Empty model response")

    # --- robust JSON parsing ---
    try:
        data = json.loads(content)
    except json.JSONDecodeError:
        if DEBUG:
            print("⚠️ Raw model output (first 1000 chars):")
            print(content[:1000])
        # Try to extract the outermost JSON object from the raw text
        match = re.search(r"\{.*\}", content, re.DOTALL)
        if not match:
            raise ValueError("Failed to locate JSON object")
        data = json.loads(match.group())

    records = data.get("records")
    if not isinstance(records, list):
        raise ValueError("JSON missing 'records' array")
    return records


def build_estuary_payload(records):
    """Add derived flags to each record before sending it to Estuary."""
    if not records:
        raise ValueError("No records to build payload")

    documents = []
    for r in records:
        # Compute the flags per record; reusing one record's values for the
        # whole batch would give every document the same flags.
        priority = (r.get("priority") or "").lower()
        status = (r.get("status") or "").lower()
        documents.append(
            {
                **r,
                "is_high_priority": priority in ("high", "urgent"),
                "needs_escalation": (
                    priority == "urgent"
                    or status == "escalated"
                    or r.get("required_escalation") is True
                ),
            }
        )
    return documents


def send_to_estuary(records: list) -> None:
    """POST each record to the Estuary webhook as its own event."""
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer " + ESTUARY_API_KEY,
    }
    for r in records:
        try:
            response = requests.post(
                ESTUARY_WEBHOOK,
                headers=headers,
                json=r,
                timeout=10,
            )
            if response.status_code in (200, 201, 202):
                print(f"✅ Estuary ingest success ({response.status_code})")
            else:
                print(f"❌ Failed ({response.status_code}): {response.text}")
        except requests.exceptions.RequestException as e:
            print("🚨 Network error:", str(e))


def run_once():
    records = generate_interactions(BATCH_SIZE)
    if not isinstance(records, list) or not records:
        raise ValueError("Generator returned invalid data")
    print(f"📊 Generated {len(records)} records")

    documents = build_estuary_payload(records)
    if DEBUG:
        print("\n=== SAMPLE DOCUMENT ===")
        print(json.dumps(documents[0], indent=2))

    print("\nSending to Estuary...")
    send_to_estuary(documents)
    print("Batch complete\n")


if __name__ == "__main__":
    # Runs until interrupted (Ctrl+C). Uncomment the sleep to pause between batches.
    while True:
        run_once()
        # time.sleep(30)

A few notes on this script:

  • generate_interactions(): Creates realistic synthetic customer support cases using an OpenAI language model. It constructs a highly structured prompt that enforces schema rules, a fixed taxonomy, and a realistic customer distribution. The function then calls the model (gpt-4o-mini) with JSON-only output enforced, parses the response, and returns a Python list of support case records ready for downstream processing.
  • build_estuary_payload(records): Enriches the raw generated cases with derived operational signals that are useful for analytics and automation. Specifically, it computes two boolean flags: is_high_priority, which identifies high-urgency tickets based on priority, and needs_escalation, which combines urgency, workflow status, and explicit escalation indicators into a unified escalation signal.
  • send_to_estuary(records): Handles real-time delivery of each enriched support case to Estuary via an authenticated webhook. It builds the required headers, iterates through the records, and sends each event individually to simulate a streaming workload.
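Because the records come from a language model, it can be worth checking them against the prompt's own rules before sending them on. The sketch below is one possible approach, not part of the original script: `validate_case` and `REQUIRED_FIELDS` are hypothetical names, and the field list is only a subset of the prompt's required fields.

```python
from datetime import datetime

# Subset of the prompt's required fields, chosen for illustration.
REQUIRED_FIELDS = ("case_id", "customer_id", "date_created", "status", "priority")


def _parse_iso(value):
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on,
    # so normalize it to an explicit UTC offset first.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def validate_case(record):
    """Return a list of problems; an empty list means the record passed."""
    problems = [f"missing {f}" for f in REQUIRED_FIELDS if record.get(f) is None]
    if problems:
        return problems

    closed = record.get("date_closed")
    # Prompt rule: if status = "open" then date_closed = null.
    if record["status"] == "open" and closed is not None:
        problems.append("open case has date_closed")
    # Prompt rule: date_closed must be AFTER date_created.
    if closed is not None:
        try:
            if _parse_iso(closed) <= _parse_iso(record["date_created"]):
                problems.append("date_closed is not after date_created")
        except (ValueError, TypeError):
            problems.append("timestamps are not comparable ISO 8601 values")
    return problems


good = {
    "case_id": "CASE-00001",
    "customer_id": "CUST-000123",
    "date_created": "2025-01-05T10:00:00Z",
    "date_closed": "2025-01-05T12:30:00Z",
    "status": "resolved",
    "priority": "high",
}
bad = {**good, "status": "open", "date_closed": "2025-01-04T09:00:00Z"}

print(validate_case(good))
print(validate_case(bad))
```

Records that fail could be dropped or logged before `send_to_estuary` is called, so malformed synthetic data never reaches the warehouse.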

Step 2: Create Estuary Webhook to Ingest the Data

  1. Click on New Capture and search for HTTP Webhook.
Estuary-webhook-capture.png
Create a new capture to connect your source and start ingesting data into the pipeline.
  2. Name your Capture: For example, customer_support_tickets. Then, create your own Authentication Bearer token.
  3. Name the endpoint config as: customer-support-tickets
  4. Once your changes are saved, click on your capture and you can now see the following Public Endpoint:
Estuary-capture-public-endpoint.png
  5. Copy the public endpoint from your specific capture. In this example, it would be: https://9d294bb894b29fec-8080.reactor.aws-us-east-1-c1.dp.estuary-data.com/
  6. Save the webhook URL and authentication token somewhere safe; you will need them later.
  7. Once the capture starts ingesting, you can see the documents in the collection:
Estuary-collections-UI.png
The image above shows the collection details after the data has been captured.
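Before running a full batch, it can help to confirm what a single webhook request looks like. The following sketch builds the request with the standard library without sending it. The URL and token are placeholders, and `build_webhook_request` is a hypothetical helper; substitute the public endpoint and bearer token you saved above.

```python
import json
import urllib.request


def build_webhook_request(url, token, document):
    """Build (but do not send) a POST request carrying one JSON document."""
    body = json.dumps(document).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
        },
    )


request = build_webhook_request(
    "https://example.invalid/customer-support-tickets",  # placeholder URL
    "my-bearer-token",  # placeholder token
    {"case_id": "CASE-00001", "status": "open"},
)
print(request.get_method(), request.full_url)

# To actually send the event, uncomment the next line:
# urllib.request.urlopen(request, timeout=10)
```

Sending one hand-built document first makes it easier to tell an authentication problem apart from a problem in the generated data.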

Step 3: Stream Data from Estuary into Snowflake

Now we connect the streaming pipeline to Snowflake.

  1. In Collections, click Transformations and select your captured collection.
  2. Create Snowflake Materialization
    1. Click Materializations
    2. Choose Snowflake
    3. Select your transformation as the source
    4. Configure the destination table
Estuary-Snowflake-materialization-connector.png
Create a materialization by selecting Snowflake as the destination for captured data.
Estuary-materialization-authentication.png
This image shows how to authenticate Snowflake by entering connection details and authorizing it with a private key (JWT).
  3. Configure Snowflake Access

Before Estuary can write to Snowflake, we must create the required roles, warehouse, and permissions.

sql
set database_name = 'ESTUARY_DB';
set warehouse_name = 'ESTUARY_WH';
set estuary_role = 'ESTUARY_ROLE';
set estuary_user = 'ESTUARY_USER';
set estuary_schema = 'ESTUARY_SCHEMA';

-- create role and schema for Estuary
create role if not exists identifier($estuary_role);
grant role identifier($estuary_role) to role SYSADMIN;

-- Create snowflake DB
create database if not exists identifier($database_name);
use database identifier($database_name);
create schema if not exists identifier($estuary_schema);

-- create a user for Estuary
create user if not exists identifier($estuary_user)
  default_role = $estuary_role
  default_warehouse = $warehouse_name;
grant role identifier($estuary_role) to user identifier($estuary_user);
grant all on schema identifier($estuary_schema) to identifier($estuary_role);

-- create a warehouse for estuary
create warehouse if not exists identifier($warehouse_name)
  warehouse_size = xsmall
  warehouse_type = standard
  auto_suspend = 60
  auto_resume = true
  initially_suspended = true;

-- grant Estuary role access to warehouse
grant USAGE on warehouse identifier($warehouse_name) to role identifier($estuary_role);

-- grant Estuary access to database
grant CREATE SCHEMA, MONITOR, USAGE on database identifier($database_name) to role identifier($estuary_role);

-- change role to ACCOUNTADMIN for STORAGE INTEGRATION support to Estuary (only needed for Snowflake on GCP)
use role ACCOUNTADMIN;
grant CREATE INTEGRATION on account to role identifier($estuary_role);
use role sysadmin;

COMMIT;
  4. Generate and Attach RSA Keys

Run the following command in Terminal:

bash
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt

Generate your public key:

bash
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

To view your public key, type the following command in Terminal: 

bash
cat rsa_key.pub

 Output:

text
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAstmN+gPB8oR3q4LnZhN2
qrdWwLTKxtTsV/R84IIhQMVegYwkHmNHHeWKHTtQGpbDasJIz4CLpnwRakJkC8Xq
.....
-----END PUBLIC KEY-----

Attach the public key in Snowflake:

sql
ALTER USER identifier($estuary_user) SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
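The value passed to RSA_PUBLIC_KEY is the key body only, without the BEGIN and END lines. If you prefer not to strip them by hand, a small helper along these lines could do it. This is a sketch: `pem_body` is a hypothetical name, and the sample string is a shortened placeholder, not a real key.

```python
def pem_body(pem_text):
    """Return the base64 body of a PEM block as one line, without BEGIN/END markers."""
    lines = [line.strip() for line in pem_text.strip().splitlines()]
    return "".join(line for line in lines if line and not line.startswith("-----"))


# Shortened placeholder for illustration; read your own rsa_key.pub instead.
sample = """-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
MIIBCgKCAQEAstmN
-----END PUBLIC KEY-----
"""

print(pem_body(sample))
```

In practice you would pass the contents of rsa_key.pub, for example `pem_body(open("rsa_key.pub").read())`, and paste the result between the quotes in the ALTER USER statement.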

To view your private key and copy, run the following command:

bash
cat rsa_key.p8

Output:

text
-----BEGIN PRIVATE KEY-----
xxxxxxx
-----END PRIVATE KEY-----

Add it to Estuary’s Private Key (JWT) field: in the Estuary Snowflake materialization, paste the full contents of rsa_key.p8, including the BEGIN and END lines.

Step 4: Trigger Cortex AI in Snowflake

Once your data loads into Snowflake, run the following query:

sql
SELECT * FROM ESTUARY_DB.ESTUARY_SCHEMA.CUSTOMER_SUPPORT_TICKETS;

Snowflake Output:

Estuary-Snowflake-query-output.png
Query results for customer support tickets loaded into Snowflake for analytics, showing case description, intent, and issue category.

In the output, we have the following columns that we will focus on:

  • Case Description: Containing the customer’s detailed message about the issue 
  • Case Title: Provides a quick overview of the case 
  • Intent: A normalized label representing what the customer is trying to accomplish (for example, track an order or request a refund) 
  • Issue Category: Higher-level classification of the underlying problem area (such as billing, delivery, or product defect) 
  • Needs Escalation: A boolean flag indicating whether the case likely requires priority handling or human intervention. 
  • Status: The current lifecycle state of the support case (for example, open, in_progress, resolved, escalated). It's used to track resolution progress, monitor backlog health, and measure support performance.

Next, we'll use these columns by applying various Cortex AI functions for the following use cases in Step 5.

Step 5: Explore Cortex AI Use Cases

Sentiment Scoring

Sentiment score quantifies the emotional tone of a customer’s message (from very negative to very positive). In a support intelligence pipeline, it turns unstructured text into a measurable signal that teams can act on in real time.

Run the following query to compute the sentiment score using the SNOWFLAKE.CORTEX.SENTIMENT function, which returns a value between -1 (most negative) and 1 (most positive). In this pipeline, scores of 0.6 or higher are labeled positive, while scores of -0.4 or lower are labeled negative and flag customers who may be at risk of churn due to a negative product experience. By automatically scoring each case, support teams can proactively surface the most at-risk customers instead of relying solely on manual priority tags.

sql
WITH scored AS (
    SELECT
        case_id,
        customer_id,
        case_title,
        issue_category,
        -- Score each description once and reuse the result below.
        SNOWFLAKE.CORTEX.SENTIMENT(case_description) AS sentiment_score
    FROM ESTUARY_DB.ESTUARY_SCHEMA.CUSTOMER_SUPPORT_TICKETS
)
SELECT
    *,
    CASE
        WHEN sentiment_score >= 0.6 THEN 'positive'
        WHEN sentiment_score <= -0.4 THEN 'negative'
        ELSE 'neutral'
    END AS sentiment_label
FROM scored;

The query will return something like the following image:

Snowflake-Cortex-sentiment-analysis-output.png
Snowflake Cortex AI sentiment analysis output for customer support tickets, including sentiment scores and labels.

When combined with fields like needs_escalation or priority, sentiment helps identify cases likely to require human intervention.
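The same thresholds can be applied outside SQL, for example in an alerting script that reads scored rows. The sketch below mirrors the cutoffs used in the query above; `sentiment_label` and `needs_attention` are hypothetical helper names, and treating a negative label or an escalation flag as "needs attention" is an assumption about how a team might combine the two signals.

```python
POSITIVE_THRESHOLD = 0.6   # same cutoff as the SQL CASE expression
NEGATIVE_THRESHOLD = -0.4  # same cutoff as the SQL CASE expression


def sentiment_label(score):
    """Map a sentiment score in [-1, 1] to the labels used in the query."""
    if score >= POSITIVE_THRESHOLD:
        return "positive"
    if score <= NEGATIVE_THRESHOLD:
        return "negative"
    return "neutral"


def needs_attention(score, needs_escalation):
    """Flag a case when sentiment is negative or the escalation flag is set."""
    return sentiment_label(score) == "negative" or bool(needs_escalation)


for score, escalate in [(0.75, False), (-0.4, False), (0.1, True), (0.1, False)]:
    print(score, sentiment_label(score), needs_attention(score, escalate))
```

Keeping the cutoffs in named constants makes it easier to tune them later without hunting through the code.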

Support Tickets AI Summarization and Analysis

Next, we can add the sentiment score to a view and then summarize the support tickets with AI_AGG.

sql
CREATE OR REPLACE VIEW SUPPORT_WITH_SENTIMENT AS
WITH scored AS (
    SELECT
        *,
        SNOWFLAKE.CORTEX.SENTIMENT(CASE_DESCRIPTION) AS sentiment_score
    FROM CUSTOMER_SUPPORT_TICKETS
)
SELECT
    *,
    CASE
        WHEN sentiment_score >= 0.6 THEN 'positive'
        WHEN sentiment_score <= -0.4 THEN 'negative'
        ELSE 'neutral'
    END AS sentiment_label
FROM scored;

The following query uses Snowflake Cortex AI to generate an executive summary across all customer support cases. Instead of analyzing tickets one by one, it aggregates the full dataset and produces a structured, AI-written narrative highlighting major themes, recurring pain points, and operational risks.

AI_AGG is the key Cortex function here, and does the following:

  1. Reads multiple rows 
  2. Converts them into a single aggregated context 
  3. Sends that context to the LLM 
  4. Returns one synthesized summary

AI_AGG enables Snowflake to synthesize thousands of support interactions into a single executive narrative, dramatically reducing manual analysis effort while surfacing hidden customer pain points.
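The four steps can be pictured with a small client-side analog in Python. This is only a conceptual sketch and not how AI_AGG is implemented inside Snowflake: `format_case`, `aggregate_and_summarize`, and the `llm` callable are hypothetical, and a stub stands in for the model so the example runs without any API access.

```python
def format_case(case):
    """Render one row as a text 'document' (step 1: read each row)."""
    return "\n".join(
        [
            "--- SUPPORT CASE ---",
            f"Case ID: {case['case_id']}",
            f"Title: {case['case_title']}",
            f"Description: {case['case_description']}",
        ]
    )


def aggregate_and_summarize(cases, instruction, llm):
    """Join all documents into one context, then ask the model once (steps 2-4)."""
    context = "\n\n".join(format_case(c) for c in cases)
    return llm(f"{instruction}\n\n{context}")


def fake_llm(prompt):
    # Stub model: reports how many case documents it was given.
    return f"{prompt.count('--- SUPPORT CASE ---')} cases received"


cases = [
    {"case_id": "CASE-00001", "case_title": "Late delivery", "case_description": "Package is a week late."},
    {"case_id": "CASE-00002", "case_title": "Double charge", "case_description": "Card was charged twice."},
]

summary = aggregate_and_summarize(cases, "Summarize the main themes.", fake_llm)
print(summary)
```

The key idea is that many rows collapse into a single model call and a single output, which is what makes this an aggregate rather than a row-by-row function.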

sql
CREATE OR REPLACE TABLE AGGREGATED_SUPPORT_CASES_SUMMARY AS
SELECT
    AI_AGG(
        /* One "document" per case */
        CONCAT(
            '--- SUPPORT CASE ---\n',
            'Case ID: ', CASE_ID, '\n',
            'Customer ID: ', CUSTOMER_ID, '\n',
            'Title: ', CASE_TITLE, '\n',
            'Description: ', CASE_DESCRIPTION, '\n',
            'Issue Category: ', ISSUE_CATEGORY, '\n',
            'Intent: ', INTENT, '\n',
            'Priority: ', PRIORITY, '\n',
            'Needs Escalation: ', NEEDS_ESCALATION, '\n',
            'Status: ', STATUS, '\n',
            'Channel: ', CHANNEL, '\n',
            'Response Time (min): ', RESPONSE_TIME_MIN, '\n',
            'Number of Calls: ', NUMBER_OF_CALLS, '\n',
            'Concerns Addressed: ', CONCERNS_ADDRESSED, '\n',
            'Sentiment Label: ', SENTIMENT_LABEL, '\n',
            'Sentiment Score: ', SENTIMENT_SCORE, '\n',
            'Last Update Note: ', COALESCE(LAST_UPDATE_NOTE, 'N/A'), '\n'
        ),
        /* Instruction prompt for the aggregate */
        'You are analyzing a batch of customer support cases.
Identify major themes and recurring customer pain points.
In your executive summary, include:
1) Top issue categories and the most frequent intents.
2) Common root causes and where customers get stuck.
3) Escalation drivers: what patterns correlate with NEEDS_ESCALATION = true.
4) Channel patterns (chat/email/phone/web) and whether certain channels show higher negativity or slower response times.
5) Operational friction signals (high RESPONSE_TIME_MIN, high NUMBER_OF_CALLS, CONCERNS_ADDRESSED = false).
6) Suggested actions: product fixes, process improvements, and support playbooks.
Write a comprehensive executive summary (500-800 words) with headings and bullet points for key findings.'
    ) AS summary
FROM SUPPORT_WITH_SENTIMENT;

This query will give you an aggregated support case summary, like the one below:

Snowflake-Cortex-AI-aggregated-summary.png
Aggregated support case summary report generated from customer support data using prompt engineering and Snowflake Cortex AI_AGG.

The Output:

text
**Executive Summary: Customer Support Case Analysis**

**Top Issue Categories and Most Frequent Intents**

Our analysis reveals that the top issue categories are:

* **Delivery** (34% of cases), with top intents being **track_order** (43%), **change_shipping_address** (21%), and **cancel_order** (15%)
* **Billing** (26% of cases), with top intents being **get_refund** (35%), **refund_policy** (23%), and **payment_issue** (17%)
* **Product Defect** (15% of cases), with top intents being **complaint** (53%), **get_refund** (26%), and **replacement** (15%)
* **General Inquiry** (12% of cases), with top intents being **customer_service** (35%), **place_order** (23%), and **review** (15%)
* **Account Access** (5% of cases), with top intents being **account_access** (60%) and **password_reset** (20%)

**Common Root Causes and Where Customers Get Stuck**

* **Delivery Issues**: Delays, incorrect shipping addresses, and lack of tracking information are common pain points.
* **Payment Processing**: Failed payments, declined cards, and unclear payment methods cause frustration.
* **Product Defects**: Damaged or defective products lead to complaints and requests for refunds or replacements.
* **Lack of Transparency**: Unclear refund policies, unresponsive customer service, and lack of updates on order status cause customer dissatisfaction.

**Channel Patterns**

* **Phone**: 40% of cases, with an average response time of 25 minutes and a sentiment score of -0.35.
* **Email**: 30% of cases, with an average response time of 20 minutes and a sentiment score of -0.25.
* **Web**: 20% of cases, with an average response time of 10 minutes and a sentiment score of -0.15.
* **Chat**: 10% of cases, with an average response time of 15 minutes and a sentiment score of -0.20.

**Suggested Actions**

* **Product Fixes**:
    * Improve quality control to reduce product defects.
    * Enhance product descriptions to reduce misunderstandings.
* **Process Improvements**:
    * Streamline payment processing to reduce failed payments.
    * Implement a more efficient refund process.
    * Improve communication around order status and tracking information.
* **Support Playbooks**:
    * Develop a comprehensive refund policy and communicate it clearly to customers.
    * Create a knowledge base for common issues and solutions.
    * Establish a clear escalation process for urgent cases.

Cortex Search: Semantic search for next best action

Cortex Search transforms support data from a passive log into an active intelligence layer, allowing teams to detect emerging product issues, prioritize high-risk cases, and accelerate customer resolution — all directly inside Snowflake.

Here's what our queries look like:

sql
-- View that exposes one searchable text column plus filterable attributes
CREATE OR REPLACE VIEW CX_SUPPORT_SEARCH_V AS
SELECT
    CASE_ID,
    CUSTOMER_ID,
    DATE_CREATED AS TIMESTAMP,
    CONCAT_WS(' ', CASE_TITLE, CASE_DESCRIPTION) AS searchable_text,
    CATEGORY,
    CHANNEL,
    INTENT,
    ISSUE_CATEGORY,
    LOCATION,
    QUERY_TYPE,
    PRODUCT_SKU,
    PRIORITY,
    STATUS,
    IS_HIGH_PRIORITY,
    NEEDS_ESCALATION,
    RESPONSE_TIME_MIN,
    PRICE,
    SENTIMENT_LABEL
FROM ESTUARY_DB.ESTUARY_SCHEMA.SUPPORT_WITH_SENTIMENT;

SELECT * FROM CX_SUPPORT_SEARCH_V;

-- Materialize the view so the search service indexes a table
CREATE OR REPLACE TABLE CX_SUPPORT_SEARCH_T AS
SELECT * FROM CX_SUPPORT_SEARCH_V;

sql
CREATE OR REPLACE CORTEX SEARCH SERVICE CX_SUPPORT_SEARCH
    ON searchable_text
    ATTRIBUTES (
        CASE_ID, CUSTOMER_ID, TIMESTAMP, CATEGORY, CHANNEL, INTENT,
        ISSUE_CATEGORY, LOCATION, QUERY_TYPE, PRODUCT_SKU, PRIORITY, STATUS,
        IS_HIGH_PRIORITY, NEEDS_ESCALATION, RESPONSE_TIME_MIN, PRICE, SENTIMENT_LABEL
    )
    WAREHOUSE = ESTUARY_WH
    TARGET_LAG = '1 minute'
AS
    SELECT * FROM CX_SUPPORT_SEARCH_T;

In high-volume support environments, product teams and support leads often need to quickly investigate emerging issues — for example, when customers begin reporting defective products. Traditional keyword searches are brittle and miss semantically similar complaints written in different ways.

Using Snowflake Cortex Search, support teams can perform semantic retrieval across unstructured support tickets to rapidly surface relevant cases, even when customers describe problems differently.

sql
SELECT PARSE_JSON(
    SNOWFLAKE.CORTEX.SEARCH_PREVIEW(
        'CX_SUPPORT_SEARCH',
        '{
            "query": "customer reporting defective toy",
            "columns": ["searchable_text", "ISSUE_CATEGORY", "IS_HIGH_PRIORITY", "NEEDS_ESCALATION"],
            "limit": 10
        }'
    )
)['results'] AS results;

In this example, the query searches for cases related to a “defective toy”. Cortex Search scans the indexed support text and returns the most semantically relevant tickets, along with key operational metadata such as issue category and escalation signals.

Snowflake-Cortex-Search-defective-toy-query.png
The above image shows how Snowflake Cortex Search is used to query indexed support text and return the most semantically relevant tickets for a “defective toy”, using cosine similarity and reranker scores.

Two terms to highlight here:

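  • Cosine Similarity: Measures how close a ticket's embedding is to the query's embedding, so similar complaints are grouped together even when customers use different wording. For example, a query about “defective toys” can surface cases that describe a broken or faulty product in other words. This enables faster detection of emerging product defects or service failures. 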
  • Reranker Score: Support copilots use the reranker score to ensure the most contextually relevant past tickets appear first for agents. This improves answer quality when generating responses using Retrieval Augmented Generation (RAG). It reduces noise from loosely related matches that embedding search alone might retrieve.
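For intuition, cosine similarity can be computed by hand on toy vectors. The sketch below is illustrative only: the three-dimensional vectors are made-up stand-ins for real embeddings, which typically have hundreds of dimensions, and it does not reproduce Cortex Search's internal scoring or its reranker.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


# Made-up embeddings: think of the dimensions as (product damage, billing, toys).
query_vec = [1.0, 0.0, 1.0]
ticket_vecs = {
    "CASE-00001": [1.0, 0.0, 1.0],  # same direction as the query
    "CASE-00002": [0.0, 1.0, 0.0],  # unrelated billing ticket
    "CASE-00003": [2.0, 0.0, 1.0],  # similar topic, different emphasis
}

# Rank tickets from most to least similar to the query.
ranked = sorted(
    ((case_id, cosine_similarity(query_vec, vec)) for case_id, vec in ticket_vecs.items()),
    key=lambda pair: pair[1],
    reverse=True,
)
for case_id, score in ranked:
    print(case_id, round(score, 3))
```

Because the measure depends on direction rather than length, a long, detailed ticket and a short one about the same problem can still score as close matches.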

Check out more use cases for Cortex Search if you're curious.

AI Resolution Classifier

Based on what we have learned so far, we can use SNOWFLAKE.CORTEX.COMPLETE to classify support cases as resolved, unresolved, or needs_followup, and then generate the next best action.

The following query uses Cortex LLM functions to recommend the next best action for support agents by combining customer sentiment, escalation risk, and case status into a single AI-driven triage decision.

sql
CREATE OR REPLACE VIEW SUPPORT_AI_TRIAGE AS
SELECT
    CASE_ID,
    CUSTOMER_ID,
    CASE_DESCRIPTION,
    STATUS,
    SENTIMENT_SCORE,
    SENTIMENT_LABEL,
    IS_HIGH_PRIORITY,
    NEEDS_ESCALATION,
    CONCERNS_ADDRESSED,
    -- Normalize whitespace and case so the label compares cleanly downstream.
    LOWER(TRIM(
        SNOWFLAKE.CORTEX.COMPLETE(
            'snowflake-arctic',
            CONCAT(
                'Classify this support case into one of three labels: ',
                'resolved, unresolved, needs_followup.\n\n',
                'Case description:\n', CASE_DESCRIPTION, '\n\n',
                'Status: ', STATUS, '\n',
                'Concerns addressed: ', CONCERNS_ADDRESSED, '\n',
                'Return ONLY the label.'
            )
        )
    )) AS ai_resolution_label
FROM SUPPORT_WITH_SENTIMENT;

CREATE OR REPLACE VIEW SUPPORT_AI_ACTIONS AS
SELECT
    CASE_ID,
    CUSTOMER_ID,
    ai_resolution_label,
    SENTIMENT_LABEL,
    IS_HIGH_PRIORITY,
    NEEDS_ESCALATION,
    CONCERNS_ADDRESSED,
    -- Only generate a next step for cases the classifier did not mark as resolved.
    CASE
        WHEN ai_resolution_label != 'resolved' THEN
            SNOWFLAKE.CORTEX.COMPLETE(
                'snowflake-arctic',
                CONCAT(
                    'You are a senior customer support agent.\n\n',
                    'Customer issue:\n', CASE_DESCRIPTION, '\n\n',
                    'Risk signals:\n',
                    '- Sentiment: ', SENTIMENT_LABEL, '\n',
                    '- High priority: ', IS_HIGH_PRIORITY, '\n',
                    '- Needs escalation: ', NEEDS_ESCALATION, '\n',
                    '- Concerns addressed: ', CONCERNS_ADDRESSED, '\n\n',
                    'Write the next best action for the agent.\n',
                    'Rules:\n',
                    '- Be professional and empathetic\n',
                    '- Maximum 2 sentences\n',
                    '- Recommend one of: refund, replace, escalate, or request info\n',
                    '- Be specific and actionable\n',
                    '- Do NOT repeat the customer text\n'
                )
            )
    END AS ai_next_step
FROM SUPPORT_AI_TRIAGE;

That query will give you an output that looks something like this:

Snowflake-Cortex-AI-recommendations.png
Output from the SUPPORT_AI_ACTIONS view showing Cortex AI recommendations for resolving customer cases based on sentiment, priority, and ticket status.
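Even with a "return ONLY the label" instruction, a model may add whitespace, capitalization, or punctuation. If you consume the labels in an application, a defensive normalization step like the one sketched here can help. `normalize_label` and `ALLOWED_LABELS` are hypothetical names, and returning None for anything unexpected is one possible policy, not a requirement of Cortex.

```python
ALLOWED_LABELS = ("resolved", "unresolved", "needs_followup")


def normalize_label(raw):
    """Map raw model text to one of the allowed labels, or None if it does not match."""
    # Trim whitespace, lowercase, then drop stray quotes, backticks, and periods.
    cleaned = raw.strip().lower().strip(".\"'`")
    # Treat spaces as underscores so "needs followup" matches "needs_followup".
    cleaned = cleaned.replace(" ", "_")
    return cleaned if cleaned in ALLOWED_LABELS else None


for raw in [" Resolved.\n", "needs followup", "`unresolved`", "The case is resolved"]:
    print(repr(raw), "->", normalize_label(raw))
```

The check uses an exact match on purpose: "unresolved" contains the substring "resolved", so substring matching would misclassify open cases as closed.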

Conclusion

Customer support systems generate massive volumes of unstructured data every day, yet much of it remains underutilized due to challenges in real-time ingestion and analysis. With Estuary as a right-time data movement layer, teams can continuously stream support interactions into Snowflake with minimal latency. While webhooks provide a simple ingestion path, the same pattern extends to CRM systems, contact center platforms, and review pipelines. Once the data is centralized, Snowflake Cortex AI enables multiple intelligent workflows, including sentiment scoring, automated ticket summarization, semantic search for retrieval-augmented generation (RAG), root cause analysis, and resolution classification.

The next step is to operationalize these capabilities into real applications and decision workflows. Teams can build AI-powered copilots and self-service assistants using Streamlit on top of Snowflake, enabling support agents and customers to query data, retrieve insights, and generate responses using natural language. Additionally, Snowflake’s managed Model Context Protocol (MCP) server and Cortex Agents can power more advanced AI agents that automatically connect to enterprise data, retrieve relevant context using tools like Cortex Search, and generate reliable, context-aware answers without manual querying.

Ultimately, this architecture shifts customer support from a reactive process to a proactive, AI-driven system, where insights are generated in real time and directly embedded into workflows that drive faster resolution, better decision-making, and improved customer experience.

FAQs

    What does right-time data mean, and how does it apply to Snowflake?

    Right-time data means treating latency as a per-pipeline decision rather than a single organizational setting. Snowflake supports ingestion methods across the full latency spectrum, from sub-second Snowpipe Streaming to scheduled COPY INTO batch loads. Right-time is choosing the appropriate tier for each pipeline, not defaulting to the fastest or simplest option across the board.


About the author

Ruhee Shrestha Technical Writer

Ruhee has a background in Computer Science and Economics and has worked as a Data Engineer for SaaS providing tech startups, where she has automated ETL processes using cutting-edge technologies and migrated data infrastructures to the cloud with AWS/Azure services. She is currently pursuing a Master’s in Business Analytics with a focus on Operations and AI at Worcester Polytechnic Institute.
