
Real-Time Flight Traffic Analytics With Estuary and StarTree

Learn how to stream real-time flight data with Estuary Flow, Kafka API, and StarTree. Capture & analyze air traffic data efficiently.


Real-time streaming data often arrives unstructured from many sources, whether sensor and IoT data from devices or social media feeds, yet it needs to reach the data warehouse quickly, in tabular form, for end users. Capturing such events can be time-consuming. Traditionally, tools like Kafka have been used to stream and consume real-time data; however, setting up a Kafka cluster and managing its ecosystem dependencies, such as ZooKeeper and Debezium, can be quite challenging.

Estuary Flow simplifies this process through Dekaf, its Kafka API compatibility layer. In this blog, we explore a Change Data Capture (CDC) pipeline from MongoDB using Estuary: we capture real-time air traffic data from the OpenSky API and stream it to StarTree, a real-time analytics platform, via a Kafka-compatible endpoint. For real-time analytics on change data, StarTree serves as an ideal data sink. The data can then be queried to monitor current air traffic conditions and detect anomalies, such as flight delays, diversions, or unexpected altitude changes, helping air traffic controllers and aviation analysts make data-driven decisions in real time.

TL;DR – Quick Highlights

  • Stream flight data from the OpenSky API through MongoDB and Estuary Flow into StarTree (Apache Pinot) for real-time flight analytics.
  • No Kafka cluster required – Use Estuary Flow’s Kafka API (Dekaf) for seamless real-time data streaming.
  • Monitor air traffic instantly – Track flight departures, aircraft positions, and velocity changes with sub-second queries.
  • Step-by-step guide – Learn how to set up MongoDB CDC, Estuary Flow, and StarTree for live flight insights.

Why Use Change Data Capture (CDC) for Real-Time MongoDB Flight Data Streaming?

Change Data Capture (CDC) is a powerful pattern that captures data changes and propagates them between systems in real time or near real time. Using MongoDB as the source, changes are captured through its operations log (oplog) and streamed to destinations like Kafka and StarTree using Estuary’s connector.

Estuary Flow's CDC functionality integrates seamlessly with MongoDB. It captures and tracks changes in your MongoDB data in real time and ensures that your downstream systems are always updated with the most recent changes. This approach is particularly useful for real-time flight analytics.

Overview of StarTree

StarTree Cloud is a real-time analytics platform for user-facing applications, powered by Apache Pinot. It can ingest millions of events per second and deliver fast results on massively concurrent queries against petabytes of data without performance bottlenecks. It supports various real-time and batch data sources, from AWS Kinesis and Kafka to batch sources stored in AWS S3, BigQuery, and Delta Lake. It can detect anomalies in real time, monitor your most important business metrics, and perform root cause analysis.

In this blog, we will use StarTree Data Manager, a highly scalable and elastic ingestion framework that supports low-code, workflow-based ingestion, together with a Kafka-based connection backed by Estuary Flow’s Dekaf endpoint. By integrating Estuary with StarTree, developers can ingest, process, and analyze high-velocity data streams without the operational burden of managing Kafka.

Why Integrate Estuary Flow with StarTree for Real-Time Flight Data Analysis?

  • Easy integration – Capture data via Estuary from hundreds of real-time sources and analyze it in StarTree without standing up Kafka or any other piece of infrastructure.
  • Lightning-fast queries – Sub-second response times on live event streams, with no batch delays.
  • Enterprise-scale throughput – Handle millions of events per second with a fully managed solution.

The Architecture

Architecture Diagram

After seeing this diagram, you might be asking why we designed the pipeline this way:

Architecture Alternatives

From the OpenSky API, we ingest flight departure information and track each aircraft’s state, storing it in MongoDB. We then continuously capture changes using Estuary Flow and stream the data via the Estuary Dekaf Connection into StarTree, where it can be queried in real time.

Instead of dual writes, we use a sequential approach to ensure reliability, consistency, and flexibility.

  • Why MongoDB first? It acts as an operational data store, structuring raw flight data before downstream processing. It also provides a queryable source of truth and prevents inconsistencies that can arise from parallel writes.
  • Why Estuary Flow? It enables real-time Change Data Capture (CDC) from MongoDB to StarTree without requiring a Kafka cluster. Flow ensures exactly-once processing, preventing duplicates and maintaining order.
  • Why not write directly to StarTree? StarTree (Apache Pinot) excels at real-time analytics but isn’t designed as an operational data store. Using MongoDB first allows better data structuring and reprocessing flexibility.

Why not dual writes? Writing to both MongoDB and Pinot in parallel could introduce inconsistencies, race conditions, and added operational overhead. This architecture ensures reliable ingestion, historical storage, and real-time analytics, without managing Kafka or complex streaming infrastructure.

Building a Real-Time Flight Data Pipeline: Steps & Setup

Prerequisites:

  • OpenSky API: We ingest two types of data:
    • Flight departures (/api/flights/departure) to analyze airport activity and optimize scheduling.
    • Aircraft state vectors (/api/states/all) to monitor real-time air traffic conditions.
  • MongoDB: A MongoDB Atlas instance is required to store two collections within the flight_data database: one for flight departures and another for live aircraft state updates.
  • Estuary Flow: Captures changes from MongoDB via Change Data Capture (CDC) and streams the data in real time. Additionally, we need to configure the Estuary Dekaf Connection in Estuary Flow.
  • StarTree: Ingests the real-time data and enables analytical queries for monitoring and insights.

Step 1: Ingesting OpenSky API Data into MongoDB

To analyze real-time air traffic, we first need a MongoDB server. Create a MongoDB Atlas instance on the free tier with AWS as the cloud provider. After successfully creating a cluster, go to Databases -> Clusters and click Connect, choose the Connect with MongoDB for VS Code option, and save the connection string for later use in your Python script.

Connection Details to MongoDB
 

Next, you can write a Python script in your chosen IDE, like VS Code, importing libraries such as requests for calling the API and pymongo for interacting with the MongoDB client connection. I ingest two endpoints: one for aircraft state data and another for tracking flight departures.

The /api/states/all endpoint returns a snapshot of all currently airborne aircraft.  

This API call retrieves real-time state vectors for aircraft. In simple terms, it returns the current status of all aircraft, including details such as their position (latitude, longitude), altitude, velocity, callsign, and other flight parameters. This endpoint is ideal for applications that require real-time situational awareness, such as air traffic control, flight tracking systems, aviation research, or any operational intelligence tool. It enables users to monitor live aircraft movements and status updates, helping in decision-making, safety analysis, and performance monitoring.

python
import requests
import pandas as pd
from datetime import datetime, timedelta
from pymongo import MongoClient

# OpenSky API URL
state_of_aircraft_url = "https://opensky-network.org/api/states/all"

# MongoDB Connection
db_password = 'YOURPASSWORD'
client = MongoClient(f"mongodb+srv://ruhee-shrestha:{db_password}@ruhee-cluster.buwp4.mongodb.net/")
db = client["flight_data"]
collection = db["realtime_flights"]

# CSV file for a local copy of the cleaned data
csv_filename = "realtime_flights.csv"

def get_state_vectors():
    """Fetch real-time flight data, store in MongoDB, and save to CSV."""
    response = requests.get(state_of_aircraft_url)
    if response.status_code == 200:
        data = response.json()
        if "states" in data and data["states"]:
            # Clear existing data in MongoDB
            collection.delete_many({})
            flights = []
            for state in data["states"]:
                flight_data = {
                    "icao24": str(state[0]),
                    "callsign": state[1].strip() if state[1] else None,
                    "origin_country": str(state[2]),
                    "last_contact": int(state[4]) if state[4] else None,
                    "longitude": float(state[5]) if state[5] else None,
                    "latitude": float(state[6]) if state[6] else None,
                    "altitude": int(state[7]) if state[7] else None,
                    "velocity": float(state[9]) if state[9] else None,
                    "heading": float(state[10]) if state[10] else None,
                    "on_ground": bool(state[8]),
                    "spi": bool(state[15]),
                    "position_source": int(state[16]) if state[16] else None,
                    "timestamp": datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
                }
                flights.append(flight_data)

            # Convert to DataFrame
            df = pd.DataFrame(flights)
            # Drop columns where all values are null
            df.dropna(axis=1, how="all", inplace=True)
            # Convert cleaned data back to a list of dictionaries
            cleaned_flights = df.to_dict(orient="records")

            # Insert into MongoDB only if valid data exists
            if cleaned_flights:
                collection.insert_many(cleaned_flights)
                # Save cleaned DataFrame as CSV
                df.to_csv(csv_filename, index=False)
                print(f"Saved {len(cleaned_flights)} valid records to {csv_filename}")
            else:
                print("No valid flight data found after dropping null columns.")
    else:
        print(f"Failed to fetch data. HTTP Status Code: {response.status_code}")

get_state_vectors()

Calling this function creates a database called flight_data and a collection called realtime_flights, which holds the flight state vector data. Here is the resulting output in MongoDB:

realtime_flights collection in mongo
 

Another OpenSky API endpoint I looked at was /api/flights/departure, which lets you analyze the number and timing of departures from an airport. By examining these trends, you can optimize gate scheduling, assess delays, or coordinate with ground transit systems to improve overall efficiency.

This endpoint requires two parameters: the airport code (the airport’s ICAO identifier), to track departures from a specific airport, and the time range for which to retrieve flights. I looked at flights from the last 24 hours and ingested the data into a new MongoDB collection called flight_departures under the same flight_data database.

python
# Continues from the previous script: requests, datetime, timedelta, and db are already defined
collection_departures = db["flight_departures"]

def get_departure_flights(airport, hours=24):
    """
    Retrieves departure flights for a given airport over the past 'hours' hours.
    Returns a list of flight records.
    """
    departure_url = "https://opensky-network.org/api/flights/departure"
    now = datetime.utcnow()  # Use UTC for consistency
    begin_time = int((now - timedelta(hours=hours)).timestamp())
    end_time = int(now.timestamp())
    params = {
        "airport": airport,
        "begin": begin_time,
        "end": end_time
    }
    try:
        response = requests.get(departure_url, params=params)
        response.raise_for_status()
        data = response.json()
        if data:
            flights = []
            for flight in data:
                flight_data = {
                    "icao24": flight.get("icao24"),
                    "firstSeen": flight.get("firstSeen"),
                    "lastSeen": flight.get("lastSeen"),
                    "estDepartureAirport": flight.get("estDepartureAirport"),
                    "estArrivalAirport": flight.get("estArrivalAirport"),
                    "callsign": flight.get("callsign").strip() if flight.get("callsign") else None,
                    "departureAirportCandidatesCount": flight.get("departureAirportCandidatesCount"),
                    "arrivalAirportCandidatesCount": flight.get("arrivalAirportCandidatesCount"),
                    "timestamp": datetime.now()
                }
                flights.append(flight_data)
            print(f"Fetched {len(flights)} flights for airport {airport}.")
            return flights
        else:
            print(f"No recent departures found for {airport}.")
            return []
    except requests.exceptions.RequestException as e:
        print(f"Error fetching departures for airport {airport}: {e}")
        return []

# Mapping of countries to representative airport ICAO codes
airport_codes_by_country = {
    "Germany": ["EDDF", "EDDM", "EDDH"],
    "United States": ["KJFK", "KLAX", "KORD"],
    "France": ["LFPG", "LFPO", "LFPB"],
    "Spain": ["LEMD", "LEBL", "LEMG"],
    "Thailand": ["VTBS", "VTBD"],
    "Australia": ["YSSY", "YMML"],
    "Argentina": ["SAEZ"]
}

def get_departures_for_countries(hours=24):
    """
    Iterates over the airport mapping, retrieves departures for each airport
    over the past 'hours' hours, and aggregates all flight records into a single list.

    Returns:
        A tuple (all_flights, departures_by_country) where:
        - all_flights: list of all flight departure records.
        - departures_by_country: dictionary keyed by country name with lists of flights as values.
    """
    all_flights = []
    departures_by_country = {}
    for country, airports in airport_codes_by_country.items():
        departures_by_country[country] = []
        for airport in airports:
            print(f"Fetching departures for {airport} in {country}...")
            flights = get_departure_flights(airport, hours)
            departures_by_country[country].extend(flights)
            all_flights.extend(flights)
    return all_flights, departures_by_country

# Aggregate departure flights for the last 24 hours from all airports
all_flights, departures_by_country = get_departures_for_countries(hours=24)
if all_flights:
    # Clear existing data once before inserting aggregated data
    collection_departures.delete_many({})
    # Insert aggregated flights into MongoDB
    collection_departures.insert_many(all_flights)
    print(f"Inserted {len(all_flights)} valid flight departure records into MongoDB.")
else:
    print("No flight departure data available for the specified time window.")

This script provides a foundation for retrieving and analyzing 24-hour departure data from multiple countries, which can be useful for air traffic control, public transit optimization, and other operational insights.

Here is the resulting MongoDB collection containing the flight departures data:

Departures collection in Mongo

Step 2: Setting up MongoDB Capture in Estuary Flow

Now that MongoDB has populated data collections, we can create a CDC pipeline from MongoDB using Estuary Flow. This will capture changes as they happen in MongoDB: inserts, updates, and deletes on each collection.

Prerequisites:

  • Credentials for connecting to your MongoDB instance and database.
  • Read access to your MongoDB database(s); see Role-Based Access Control for more information.
  • If you are using MongoDB Atlas or your MongoDB provider requires allowlisting of IPs, you need to allowlist the Estuary IP addresses.

Navigate to Estuary and click on Sources from the left navigation menu. Click on the New Capture button at the top. 

Search for “MongoDB” and click on the Capture button on the MongoDB tile.

Searching for the Mongo connector

On the Create Capture page, put an appropriate name in the Name field, such as flight_data. 

In the Address field, enter the address where your MongoDB server is hosted, using the mongodb+srv://<cluster-host> notation.

Configure Capture

You should now see a listing of the MongoDB collections for aircraft state and flight departures. Next, click Save and Publish at the top of the page. Once the publish is successful, you should see the newly created source on the Sources page.

Collection binding configuration

You can also navigate to Collections from the left navigation menu and notice the new collections corresponding to the flight_departures and realtime_flights captures.

collection list in estuary

You can view the collection details here and see that each time data changes in MongoDB, Estuary Flow automatically captures the change.

Collection Details and preview

Step 3: Creating StarTree Materialization using Dekaf: Estuary’s Kafka API Compatibility Layer

Dekaf is Estuary Flow's Kafka API compatibility layer, allowing consumers to read data from Estuary Flow collections as if they were Kafka topics. Additionally, Dekaf provides a schema registry API for managing schemas. In this tutorial, we will use the Estuary Dekaf configuration as the Kafka source connection for StarTree.

There is no extra configuration required to enable Dekaf; all you need to do is create a new Materialization.

Head over to the Destinations page and search for the “StarTree” connector.

Search for the StarTree connector

Configure the connector with a name and assign an auth token to it. Make note of this token, as it will be used on the StarTree side to authenticate to Estuary.

Configure StarTree connector

As a final step, bind the collections from the MongoDB capture connector.

Bind collections to StarTree materialization

When you’re done, click Save and Publish and provision the connector.

This will allow incoming connections from Kafka consumers to ingest data from the selected collections as if they were Kafka topics.

You can now head over to the StarTree Cloud console and configure the other side of the data flow!

Step 4: Streaming Real-Time Flight Data from Estuary Flow to StarTree

Use the StarTree Data Manager to easily connect and ingest data from sources like Kafka, Kinesis, or other pipelines. Configure the ingestion, and your data will be ready for analyzing and monitoring key metrics.

In StarTree, click Go to Data Manager, then Create a Dataset. Choose Kafka as your Connection Type, click Create Connection, and enter the following connection details:

Configure Kafka connection in StarTree
  • Bootstrap Servers: dekaf.estuary-data.com:9092
  • Security Protocol: SASL_SSL
  • SASL Mechanism: PLAIN
  • SASL Username: The name of the materialization task (connector) in Estuary; in my case, it was ruhee/flights/startree
  • SASL Password: The Auth Token you configured in the connector
  • Configure Schema Registry: To decode Avro messages, enable schema registry settings:
    • Schema Registry URL: https://dekaf.estuary-data.com
    • Schema Registry Username: same as SASL Username
    • Schema Registry Password: The same Auth Token as above

Next, after successfully creating the connection (named EstuaryStreams here), choose it, create a dataset, and set the Data Format to AVRO:

Configure input format in StarTree

StarTree will retrieve available topics from Dekaf. Select the topic corresponding to your Estuary collection (e.g., flight_departures or flights).

Next, define the schema and transformations and map incoming fields to your StarTree schema. 

Schema mapping

Add any necessary transformations or calculated fields; I converted the timestamp to the appropriate datetime format.

Transformations for types

Review and adjust field data types as needed, and StarTree will begin ingesting data from Estuary.

In “Data Manager,” check the status of your data source. Ensure the ingestion status is “Healthy” or “Active”.

Verify ingestion in data manager

Performing Real-Time Aviation Analytics in StarTree (Apache Pinot)

We can now query a table called flights, which contains aircraft status information such as velocity, whether the aircraft is on the ground, its origin country, its altitude, and its current position based on latitude and longitude.
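
The screenshot below shows the table in the StarTree console. As a rough equivalent, a preview query along these lines (assuming the dataset was named flights, with the column names from the ingestion script above) returns the latest state vectors:

sql
SELECT icao24, callsign, origin_country, altitude, velocity, on_ground, latitude, longitude
FROM flights
LIMIT 10;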

Query results: preview of the flights table

The following query groups aircraft by their reported country and counts how many records exist per country.
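
A sketch of that query, using the origin_country column captured by the ingestion script:

sql
-- Count aircraft records per origin country
SELECT origin_country, COUNT(*) AS aircraft_count
FROM flights
GROUP BY origin_country
ORDER BY aircraft_count DESC;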

Query results: aircraft counts per country

We can retrieve the top five aircraft with the highest ground speeds (velocity). By ordering on velocity, this query shows which aircraft are moving fastest, which can be useful for identifying high-speed jets or potential anomalies.
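
A sketch of that query:

sql
-- Top five aircraft by ground speed (OpenSky reports velocity in m/s)
SELECT icao24, callsign, origin_country, velocity
FROM flights
ORDER BY velocity DESC
LIMIT 5;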

Query results: top five aircraft by velocity

The query below groups aircraft by their true track, rounded into 10-degree bins, to understand the distribution of flight directions. By categorizing headings into bins (e.g., 0–9°, 10–19°, etc.), you can see which flight directions are most common, information that can support air traffic flow analysis.
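
One way to express the binning (a sketch; the heading column holds the true track in degrees, and Pinot accepts expressions in GROUP BY):

sql
-- Bucket headings into 10-degree bins and count aircraft per bin
SELECT FLOOR(heading / 10) * 10 AS heading_bin, COUNT(*) AS aircraft_count
FROM flights
GROUP BY FLOOR(heading / 10) * 10
ORDER BY heading_bin;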

Query results: flight headings in 10-degree bins

The second table we can query is flight_departures, which contains the total departures per airport across different countries, along with when each aircraft was first and last seen.
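
A quick preview along these lines (assuming the dataset was named flight_departures) shows the shape of the data:

sql
SELECT icao24, callsign, estDepartureAirport, estArrivalAirport, firstSeen, lastSeen
FROM flight_departures
LIMIT 10;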

Preview of the flight_departures table
Query results: airports ranked by departures

This query ranks airports by the total number of departures in the dataset, helping identify which airports have the highest departure volumes to support capacity planning and resource allocation.
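
A sketch of that ranking query:

sql
-- Airports ranked by total departures
SELECT estDepartureAirport, COUNT(*) AS total_departures
FROM flight_departures
GROUP BY estDepartureAirport
ORDER BY total_departures DESC;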

Query results: departures by airline callsign prefix

This query provides insight into airline activity, revealing which airlines have the highest number of departures based on the callsign prefixes.
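
A sketch of the idea, treating the first three characters of the callsign as the airline code (exact string-function semantics may vary by Pinot version):

sql
-- Departures per airline, using the 3-letter ICAO callsign prefix
SELECT SUBSTR(callsign, 0, 2) AS airline_code, COUNT(*) AS departures
FROM flight_departures
GROUP BY SUBSTR(callsign, 0, 2)
ORDER BY departures DESC;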

By joining the two tables, we can combine historical or scheduled departure information with real-time state data for a more comprehensive view of flight operations. Joining the departure records (which include details such as departure times, airports, and callsigns) with the current state vectors (position, altitude, velocity, and so on) on the shared icao24 key gives you a complete picture of a flight's journey, from departure to its current state.

By correlating departure data with real-time state data, you can monitor if flights are deviating from expected profiles. For example, you can track whether flights are maintaining proper cruising altitudes or if there are unexpected changes in velocity or heading that might indicate safety concerns.

Query results: joining flight departures with real-time state data

This query joins the flight_departures table (d) with the flights table (s) on the icao24 field to retrieve flight information for aircraft that departed from a U.S. airport (estDepartureAirport) and are still airborne, based on the real-time state vector data. We can monitor flight status after departure, such as the aircraft’s last known location (longitude, latitude), velocity, and direction, to confirm that departing flights are operating smoothly. Air traffic controllers could use this to cross-check departure records against real-time tracking data, and analysts can identify potential delays or anomalies by comparing firstSeen (the departure timestamp) with last_contact (the latest flight update).
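
A sketch of that join (cross-table joins require Pinot's multi-stage query engine; the LIKE 'K%' filter is a stand-in for U.S. airports, whose ICAO codes begin with K):

sql
-- Departures from U.S. airports joined to their latest state vectors
SELECT d.callsign, d.estDepartureAirport, d.firstSeen,
       s.last_contact, s.longitude, s.latitude, s.velocity, s.heading
FROM flight_departures AS d
JOIN flights AS s
  ON d.icao24 = s.icao24
WHERE d.estDepartureAirport LIKE 'K%'
  AND s.on_ground = false;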

By leveraging this integration for flight data, airlines, air traffic controllers, and aviation analysts can:

  • Monitor Live Air Traffic: Track aircraft positions, speeds, and altitudes in real time.
  • Optimize Airport Operations: Analyze departure and arrival patterns to reduce congestion and delays.
  • Enhance Passenger Experience: Provide real-time flight status updates and predictive arrival times.
  • Improve Safety & Compliance: Detect anomalies such as unexpected altitude changes or deviations from flight paths.

Conclusion

By integrating real-time API data, such as flight traffic data, from MongoDB into StarTree Cloud using Estuary Flow’s Dekaf endpoint, we can perform real-time flight analytics with high efficiency. Estuary’s ability to automatically execute Change Data Capture (CDC) on MongoDB and seamlessly connect it to StarTree through its no-code, Kafka-compatible Dekaf solution simplifies real-time pipeline management for developers. Powered by Apache Pinot, StarTree enables sub-second query responses, allowing organizations to build scalable, user-facing analytics applications with ease. This integration not only reduces operational complexity and costs but also accelerates the ability to derive actionable insights in real time.

Next Steps

Now that you’ve explored how Estuary Flow and StarTree simplify real-time flight analytics, it’s time to put this into action.

  1. Set up your pipeline – Connect OpenSky API, MongoDB, Estuary Flow, and StarTree to start streaming real-time air traffic data.
  2. Analyze in real-time – Run sub-second queries to monitor flight departures, aircraft velocity, and anomalies without Kafka overhead.
  3. Optimize operations – Use real-time insights to enhance air traffic monitoring, airport scheduling, and airline performance.

Get started today with Estuary Flow and StarTree to transform your flight data into actionable insights! Need help? Contact us


About the author

Ruhee Shrestha, Technical Writer

Ruhee has a background in Computer Science and Economics and has worked as a Data Engineer for SaaS tech startups, where she automated ETL processes using cutting-edge technologies and migrated data infrastructures to the cloud with AWS/Azure services. She is currently pursuing a Master’s in Business Analytics with a focus on Operations and AI at Worcester Polytechnic Institute.
