
How to Stream MongoDB to Kafka: 3 Best Methods Explained

Learn the three best methods to stream MongoDB to Kafka using Estuary Flow, Debezium, and the MongoDB Kafka Connector. Compare ease of setup, scalability, and real-time integration.


Choosing a suitable data storage system enables you to store and manage data effectively for different business operations within your organization. MongoDB is popular among the available options due to its document-based data model, flexible schema, and robust querying capabilities. However, its high memory usage and 16 MB document size limit can introduce latency when processing high-volume data. Such drawbacks can slow service delivery in today's fast-paced business environment, resulting in lost revenue.

To overcome these challenges, you can use real-time data streaming tools. They help you process data as it arrives, so you can gain instant insights and automate workflows. Apache Kafka, a high-performance event-streaming platform, is one such solution. By streaming your enterprise data, you can put it to work for diverse use cases like fraud detection and website traffic monitoring.

Let’s examine three effective methods for streaming data from MongoDB to Kafka that facilitate reliable data integration and workflow optimization.

Understanding the Integration Landscape of MongoDB to Kafka

Data integration is critical to ensuring operational efficiency and informed decision-making in data-driven businesses. Multiple data integration strategies exist, including batch processing, real-time integration, Change Data Capture (CDC), and API-based integration. 

Among these, CDC is a real-time technique that keeps your source and target systems in sync. Let’s take a brief look at this mechanism.

Change Data Capture (CDC): How It Enables Real-time Streaming

CDC is a technique for detecting changes made to a source data system and replicating them in the destination in real-time. It ensures the continuous transfer of changes from the source to the target data system. Instead of transferring entire datasets, CDC tracks only modified data records in sources and instantly replicates those changes to the destination. This approach helps reduce data redundancy and optimize performance.

Depending on your organizational strategy, you can choose from different types of CDC mechanisms:

  • Log-based CDC: This method captures changes by reading the database’s transaction log. Its main advantage is minimal impact on the source system, since it does not require additional queries.
  • Query-based CDC: Here, you periodically run queries against source tables to find newly added, updated, or deleted records. Tracking changes this way requires specific columns, such as a timestamp or an incrementing ID, to query against (see the sketch after this list), which makes it resource-intensive and can affect system performance.
  • Trigger-based CDC: In this approach, you define database triggers that fire whenever data changes and record those changes in a separate table known as the change table. Setting up many triggers, however, can result in high overhead.
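
To make the query-based approach concrete, here is a minimal mongosh sketch of a single polling pass. It assumes a hypothetical orders collection with an updatedAt timestamp field; both names are placeholders and not part of any tool discussed below.

javascript
// Hedged sketch of query-based CDC in mongosh. The "orders" collection and
// "updatedAt" field are placeholders; your schema will differ.
let lastSyncTime = ISODate("2024-01-01T00:00:00Z"); // in practice, persisted between polls

const changes = db.orders
  .find({ updatedAt: { $gt: lastSyncTime } })
  .sort({ updatedAt: 1 })
  .toArray();

changes.forEach((doc) => {
  // Forward each changed document to the downstream system here.
  printjson(doc);
});

// Advance the high-water mark so the next poll only picks up newer changes.
if (changes.length > 0) {
  lastSyncTime = changes[changes.length - 1].updatedAt;
}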

Top Tools for Streaming MongoDB to Kafka

Several tools offer data streaming services to support real-time data integration. Let’s have a look at some solutions:

Estuary Flow

Estuary Flow Architecture

Estuary Flow is a robust data integration platform with streaming and batch-processing capabilities. It offers an extensive library of 200+ pre-built connectors, enabling you to collect data from various sources, including real-time streams, files, databases, and cloud services. You can then transform and load this data to a destination of your choice. Such effective data pipelines are helpful for diverse use cases, such as data analytics and creating AI-based applications.

Leveraging Estuary’s log-based CDC can help you build reliable data pipelines with end-to-end latency under 100 milliseconds, enabling real-time data streaming.

MongoDB Kafka Connector

MongoDB Kafka Connector


The MongoDB Connector for Apache Kafka is a Confluent-verified connector. Confluent is a cloud-based platform built on Apache Kafka, which allows you to integrate diverse data sources with Kafka to develop scalable streaming pipelines.

In Kafka, topics are storage units for data streams. Using the MongoDB Kafka source connector, you can stream changes from MongoDB to Kafka topics. Alternatively, you can configure the MongoDB Kafka sink connector to transfer changes from Kafka topics back to MongoDB.

Debezium

Debezium logo


Debezium is an open-source CDC solution built on Apache Kafka. Using Debezium connectors, you can track row-level changes in your source database and stream these changes to Kafka topics. An essential feature of Debezium is its fault tolerance; in the event of a system failure, it ensures data capture resumes from the last recorded change. This helps prevent data loss.

To record changes from MongoDB to Kafka topics, you can use the Debezium MongoDB source connector. It allows you to monitor changes in the MongoDB replica set or sharded cluster. With this, you gain reliable data synchronization across distributed systems.

Method 1: Streaming MongoDB Data With Estuary Flow

Estuary Flow is an effective data streaming solution with advanced data pipeline development features. The platform facilitates data movement between multiple sources and destinations through its many-to-many functionality.

Let’s look into some additional reasons why Estuary Flow is a suitable data streaming solution.

Why Choose Estuary Flow?

  • Robust Data Integration: Estuary Flow allows you to build ETL and ELT pipelines. The platform provides numerous batch and streaming connectors to facilitate data retrieval and transfer between various systems. During ETL integration, you can transform data with SQL or TypeScript; for ELT, you can use dbt transformations.
  • Kafka-API Compatibility: Estuary offers Dekaf, a Kafka API-compatibility layer. You can use it to connect to and read data from Estuary Flow collections as if they were Kafka topics (see the sketch after this list). Collections in Estuary Flow are the datasets into which data is captured from source systems. Dekaf also provides a schema registry API to manage schemas.
  • Multiple Deployment Options: Estuary Flow supports three deployment options: Public, Private, and Bring Your Own Cloud (BYOC). The Public deployment is fully managed and requires minimal configuration, while the Private plan runs within your own infrastructure. Lastly, the BYOC option lets you deploy Estuary with a cloud service provider of your choice for greater flexibility and customization.
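
To illustrate the Dekaf point above, here is a minimal sketch (not an official example) that reads a Flow collection through Dekaf using the kafkajs client. The broker address, materialization name, collection name, and token are placeholders; take the exact endpoint, SASL settings, and credentials from Estuary’s Dekaf documentation.

javascript
// Minimal sketch: consume an Estuary Flow collection through Dekaf as if it were a Kafka topic.
// Assumptions: Dekaf endpoint, SASL/PLAIN auth, and an Estuary refresh token as the password —
// verify these connection details in Estuary's Dekaf documentation.
const { Kafka } = require("kafkajs");

const kafka = new Kafka({
  clientId: "dekaf-example",
  brokers: ["dekaf.estuary-data.com:9092"], // placeholder endpoint
  ssl: true,
  sasl: {
    mechanism: "plain",
    username: "your/dekaf/materialization-name", // placeholder task name
    password: "ESTUARY_REFRESH_TOKEN",           // placeholder token
  },
});

async function run() {
  const consumer = kafka.consumer({ groupId: "dekaf-example-group" });
  await consumer.connect();
  // A Flow collection is exposed under a topic-like name.
  await consumer.subscribe({ topic: "your/flow/collection-name", fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ topic, message }) => {
      // Depending on the materialization's configuration, values may be JSON or Avro-encoded.
      console.log(topic, message.value.toString());
    },
  });
}

run().catch(console.error);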

Here are the steps to build a MongoDB-to-Kafka CDC streaming pipeline using Estuary Flow. Before you start, make sure you fulfill the following requirements:

Prerequisites

  • An Estuary account
  • Credentials to connect to your MongoDB instance and database
  • A Kafka cluster with bootstrap.servers configured, an authentication mechanism set up, and connection security enabled with TLS

Step 1: Configure the MongoDB Source Connector

  • Sign in to your Estuary account.
  • Select the Sources tab from the left navigation pane on the main dashboard.
  • Click the + NEW CAPTURE button on the Sources page and search for MongoDB connector in the Search connectors field.
MongoDB Search Connector Page
  • When you see the connector in the search results, click its Capture button.
MongoDB Create Capture page
  • You will be directed to the Create Capture page. Here, you need to enter the necessary details, such as:
    • Name: Enter the name of your capture.
    • Address: Provide the connection URI for your MongoDB database.
    • User: Enter the username of your MongoDB database.
    • Password: Specify the password for the specified user.
  • Click NEXT and then SAVE AND PUBLISH.

These steps complete the MongoDB source connector configuration. The connector helps capture and convert data from MongoDB collections into Flow collections.

Step 2: Configure the Kafka Destination Connector

  • To configure Kafka as the destination end of the pipeline, click the MATERIALIZE COLLECTIONS button on the pop-up that appears after a successful capture. Alternatively, select the Destinations tab from the dashboard's left pane. You will be directed to the Destinations page, where you must click + NEW MATERIALIZATION.
Search For Kafka Connector
  • Enter Apache Kafka in the Search connectors field. Click the connector’s Materialization button to proceed.
Kafka Materialization config page
  • On the Create Materialization page, enter the following mandatory details:
    • Name: Give a unique name for your materialization.
    • Bootstrap Servers: The initial Kafka broker(s) the connector contacts to establish a connection to the cluster.
    • Auth Type: Select the authentication method you want to use.
    • SASL Mechanism: Choose the SASL mechanism used to authenticate clients with the brokers.
    • Username: Provide the username for the chosen authentication mechanism.
    • Password: Enter the password for the specified authentication method.
  • The MongoDB collection added to your capture will likely be automatically added to your materialization. If not, click the SOURCE FROM CAPTURE button in the Source Collections section to manually link the capture to your materialization.
  • Finally, click NEXT > SAVE AND PUBLISH.

The connector materializes Flow collections of your MongoDB data into Kafka topics. This concludes the process of streaming MongoDB data to Kafka using Estuary Flow.

Start streaming MongoDB to Kafka in real-time with Estuary Flow—Get Started for Free, or Contact Us for expert support!

Method 2: Using the MongoDB Kafka Connector

In this method, you use the MongoDB Kafka Connector together with a CDC handler to stream data. A CDC handler is an application that translates change events into MongoDB write operations: the source connector publishes change events from MongoDB to Kafka topics, and a sink connector configured with a CDC handler can apply those events back to a MongoDB collection (see the sketch below).
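
For reference, here is a hedged sketch of what such a sink configuration with the MongoDB change stream CDC handler might look like. The name, topic, connection URI, database, and collection values are placeholders; confirm the property names against the MongoDB Kafka Connector documentation.

javascript
{
  "name": "mongo-cdc-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "CDCTutorial.Source",
    "connection.uri": "mongodb://mongo1",
    "database": "CDCTutorial",
    "collection": "Destination",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
  }
}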

Here are the steps for this process:

Step 1: Set Up Confluent Kafka Connect and MongoDB Environment

First, set up the Docker-based development environment used to run the MongoDB Connector for Apache Kafka.

Ensure you have a Docker account, a GitHub account, a terminal app, and a shell. To start the setup, clone the git repository using the following command:

plaintext
git clone https://github.com/mongodb-university/kafka-edu.git

Depending on your OS, navigate to the ‘mongodb-kafka-base’ tutorial directory in the repository. Then start the Docker environment using the following command:

plaintext
docker-compose -p mongo-kafka up -d --force-recreate

This creates Docker containers with all the services you need to stream data. You can then verify that the development environment is configured correctly using the command below:

plaintext
docker exec mongo1 status

Step 2: Start Interactive Shells

Open two interactive shells, CDCShell1 and CDCShell2, inside the Docker container by running the following command in each of two separate terminal windows:

plaintext
docker exec -it mongo1 /bin/bash

Step 3: Configure the Source Connector

In CDCShell1, create a configuration file named cdc-source.json, for example using nano:

plaintext
nano cdc-source.json

Enter the given code in the configuration file:

javascript
{  "name": "mongo-cdc-source",  "config": {    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",    "connection.uri": "mongodb://mongo1",    "database": "CDCTutorial",    "collection": "Source"  } }

Here:

  • CDCTutorial is the MongoDB database whose changes are read and published to the Kafka topic. You can replace it with your own database name.
  • Source is the MongoDB collection whose changes are tracked. You can replace it with your own collection name.

Then, run the following command to start the MongoDB Kafka source connector:

plaintext
cx cdc-source.json

The cx command is a custom script included in the tutorial environment; it sends the following request to the Kafka Connect REST API to create a new connector:

plaintext
curl -X POST -H "Content-Type: application/json" -d @cdc-source.json http://connect:8083/connectors -w "\n"

To check the status of the connector, use:

plaintext
status

On successful streaming of changed data records, you will see the following output:

plaintext
Kafka topics:
...
The status of the connectors:

source  |  mongo-cdc-source  |  RUNNING  |  RUNNING  |  com.mongodb.kafka.connect.MongoSourceConnector

Currently configured connectors
[
  "mongo-cdc-source"
]
...

Step 4: Monitor the Kafka Topic

In CDCShell1, execute the command below to monitor the data stream published to the Kafka topic (using kcat):

plaintext
kcat -b localhost:9092 -t CDCTutorial.Source

Step 5: Insert Data into MongoDB and Watch the Data Stream

In CDCShell2, connect to MongoDB using mongosh, the MongoDB shell, as shown here:

plaintext
mongosh "mongodb://mongo1"

You will see the following prompt:

plaintext
rs0 [direct: primary] test>

Here, type the following command to insert a new document into your MongoDB namespace:

javascript
use CDCTutorial
db.Source.insertOne({ proclaim: "Hello World!" });

After completion, you will get the acknowledgment message as follows:

javascript
{
  acknowledged: true,
  insertedId: ObjectId("600b38ad...")
}

You should see the following JSON output in CDCShell1:

javascript
{  "schema": { "type": "string", "optional": false },  "payload": {    "_id": { "_data": "8260..." },    "operationType": "insert",    "clusterTime": { "$timestamp": { "t": 1611..., "i": 2 } },    "wallTime": { "$date": "..." },    "fullDocument": {      "_id": { "$oid": "600b38ad..." },      "proclaim": "Hello World!"    },    "ns": { "db": "CDCTutorial", "coll": "Source" },    "documentKey": { "_id": { "$oid": "600b38a..." } }  } }

You can then repeat this step with additional inserts and updates; each change is captured from MongoDB and published to the Kafka topic, keeping both in sync. This completes the data streaming process using the MongoDB Kafka source connector.
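
For example, running a simple update in CDCShell2 (a sketch reusing the sample document inserted above) produces an event with "operationType": "update" on the same topic:

javascript
// Update the previously inserted document; the source connector publishes the
// corresponding change event to the CDCTutorial.Source topic.
db.Source.updateOne(
  { proclaim: "Hello World!" },
  { $set: { proclaim: "Hello again!" } }
);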

Limitations  

  • Complexity: Setting up the MongoDB Kafka connector requires technical expertise. You must understand connector configurations, Kafka topics, and CDC handlers, which can be time-consuming.
  • Limited Scalability: While the MongoDB Kafka source connector can stream large data volumes, scaling it requires careful configuration. The copy.existing setting helps migrate an initial dataset but is not itself CDC, and fine-tuning the connector for growing data volumes can slow the overall migration process.

Method 3: Streaming With Debezium

You can use the Debezium MongoDB connector to stream data from MongoDB to Kafka topics. To capture changes, older versions of the connector tail the oplog, MongoDB’s capped collection of write operations, while current versions use MongoDB change streams; in both cases, write operations are captured in the order they occur.

Steps to Stream Data from MongoDB to Kafka Using Debezium

Here are the steps for streaming data from MongoDB to Kafka using Debezium:

Step 1: Install the MongoDB Connector

Install the latest version of the Debezium MongoDB connector on the machine running Kafka Connect. The command for it is as follows:

plaintext
confluent connect plugin install debezium/debezium-connector-mongodb:latest

For a specific version, say 2.3, you can use the command below:

plaintext
confluent connect plugin install debezium/debezium-connector-mongodb:2.3

Step 2: Configure a Replication Mechanism in MongoDB

To enable Debezium CDC, MongoDB must be running a replica set. You can verify or initiate a replica set with the following command:

plaintext
docker run -it --name mongo-init --rm -e REPLICASET=rs0 --link data1:mongo1 --link data2:mongo2 --link data3:mongo3 debezium/mongo-initiator

Here:

  • REPLICASET is the name of the replica set (e.g., rs0).
  • data1, data2, and data3 are MongoDB instances. You can replace them with your own instance names.

This helps ensure that Debezium can capture changes from MongoDB Oplog.

You can also initiate a MongoDB sharded cluster instead. A sharded cluster consists of a separate replica set that serves as the cluster’s configuration server, one or more replica sets that act as shards, and routers that direct your queries to the appropriate shard.

To configure a sharded cluster, execute the below command:

plaintext
docker run -it --name mongo-init --rm -e REPLICASET=shardA --link shardA1:mongo1 --link shardA2:mongo2 --link shardA3:mongo3 --link router1 --link router2 debezium/mongo-initiator

Here, shardA1, shardA2, and shardA3 are the MongoDB instances that make up the shardA replica set, which forms one shard of the sharded cluster.

Step 3: Create the Connector Configuration File

Next, you can create a JSON configuration file (register-mongodb.json) to store the connector settings. It may contain the following contents:

javascript
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "1",
    "mongodb.hosts": "debezium/localhost:27017",
    "mongodb.name": "dbserver1",
    "mongodb.user": "debezium",
    "mongodb.password": "dbz"
  }
}

Here:

  • connector.class specifies the Debezium MongoDB connector.
  • tasks.max defines the maximum number of connector tasks.
  • mongodb.hosts lists the replica set name and the address of a MongoDB instance the connector connects to.
  • mongodb.name is the logical name of the MongoDB server, used as the prefix for the Kafka topics to which the connector streams data.
  • mongodb.user and mongodb.password are MongoDB credentials. You can replace them with your own credentials.

Now, register the connector in Kafka Connect using this code:

plaintext
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mongodb.json

This uploads the configuration in register-mongodb.json to Kafka Connect and starts the MongoDB connector.

Step 4: Start the Kafka Consumer

Start a Kafka consumer in a new terminal session to receive the streamed data from MongoDB. The command for it is as follows:

plaintext
confluent local services kafka consume dbserver1.inventory.customers --from-beginning

Here, dbserver1.inventory.customers is the Kafka topic that receives the MongoDB change events; Debezium names topics as <mongodb.name>.<database>.<collection>.

Step 5: Verify by Inserting Data in MongoDB

To verify that data streams from MongoDB to Kafka, open a MongoDB shell and execute insert queries. As soon as you insert new data, the Kafka consumer receives the corresponding change events.

Here is a sample command that inserts a record into a MongoDB collection called ‘customers’:

javascript
db.customers.insert([ { _id : 1008, first_name : 'Jim', last_name : 'Colbert', email : 'thejim@example.com' } ]);

After the execution, MongoDB should acknowledge the insertion. This concludes the process of replicating the MongoDB stream to Kafka using Debezium.

Limitations

  • Technical Complexity: Understanding Kafka Connect, Zookeeper, and Debezium connectors can be challenging, especially if you are a beginner. The learning curve can slow down initial deployment.
  • Complex Schema Evolution: While Debezium supports schema evolution using Avro and Schema Registry, handling primary key modifications, column type changes, or table alterations can still be complex. Improper schema management may lead to data inconsistency and integrity issues.

Comparing MongoDB Kafka Connectors, Debezium, and Estuary Flow

| Features | Estuary Flow | MongoDB Kafka Connector | Debezium |
| --- | --- | --- | --- |
| Data Sources | With 200+ pre-built connectors, Estuary supports multiple data sources. | The source is limited to MongoDB. | Debezium offers a wide range of source connectors for several databases. |
| Ease of Configuration | You can configure Estuary with minimal or no code, which makes it user-friendly. | It requires extensive coding for configuration. | You need significant coding expertise for Debezium configuration. |
| Schema Evolution | Estuary facilitates automated schema evolution, downstream updates, and real-time data validation. | You must manually manage schema changes. | Debezium offers the Avro mechanism and a schema registry for efficient schema evolution. |
| Use Case | You can use Estuary to build real-time pipelines for multiple sources and destinations, supporting analytics and data warehousing. | It is suitable mainly for applications that require MongoDB-to-Kafka streaming. | You can use Debezium for reliable CDC between various databases, supporting database replication or microservices communication. |

Conclusion

Real-time MongoDB Kafka integration can help you create effective data pipelines for event-driven applications. You can also leverage CDC to ensure continuous data movement for real-time analytics.

This blog covered three methods to stream data from MongoDB to Kafka. One method uses the MongoDB Kafka source connector, another uses Debezium; both approaches are complex and time-consuming to set up. Instead, you can use Estuary Flow, a powerful data integration platform, to overcome these drawbacks. It helps you build a real-time MongoDB-to-Kafka streaming pipeline for use cases such as stock market analysis and live traffic monitoring.

Get started with Estuary Flow now and effortlessly stream data from MongoDB to Kafka in real-time. Sign up for a free trial!

FAQs

1. What data security mechanisms are offered by MongoDB Kafka Connector?

While using the MongoDB Kafka connector, you can ensure data security by implementing encryption via SSL/TLS. The connector also supports authentication mechanisms, including SCRAM-SHA-256, SCRAM-SHA-1, and MONGODB-CR, to help you protect sensitive data.
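
For example (a hedged sketch with placeholder host and credentials), TLS and SCRAM authentication are typically enabled through the connection URI you pass to the connector:

javascript
{
  "connection.uri": "mongodb://appUser:appPassword@mongo-host:27017/?authSource=admin&authMechanism=SCRAM-SHA-256&tls=true"
}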

2. What is the best method to stream MongoDB CDC to Kafka?

The best method is Estuary Flow, offering real-time, low-latency streaming (<100ms) with log-based CDC and an easy no-code setup. It automates schema evolution and integrates seamlessly with Kafka.

3. What databases can Debezium monitor?

You can use Debezium to monitor various databases, including MySQL, MongoDB, PostgreSQL, SQL Server, Oracle, and Cassandra, as well as managed versions of these databases on services such as Amazon RDS. Debezium captures row-level changes from these databases so you can reliably track and react to data as it changes.


About the author

Jeffrey Richman

With over 15 years in data engineering, Jeffrey is a seasoned expert in driving growth for early-stage data companies, focusing on strategies that attract customers and users. His writing provides insights to help companies scale efficiently and effectively in an evolving data landscape.
