
Real-time data movement from a database to an event-driven platform is an essential part of modern data workflows. With it, you can instantly trigger actions, such as sending notifications or updating dashboards, in response to database changes. For example, a user survey application that stores responses in a database can automatically trigger an email notification whenever a new submission is recorded.
PostgreSQL is a widely used relational database that stores data in rows and columns. Apache Kafka, on the other hand, is an event-streaming platform that facilitates real-time processing, storage, and transmission of data streams. Moving data from Postgres to Kafka enables you to automate data workflow tasks.
This guide explores three methods to stream data from Postgres to Kafka: Estuary Flow (no-code, real-time CDC), Debezium with Kafka Connect (open-source CDC), and Kafka JDBC Source Connector.
Need to reverse the flow? See how to move Kafka data into PostgreSQL!
Understanding the Components
Before you start streaming data from Postgres to Kafka, it is essential to understand the key components that will help you perform this migration.
PostgreSQL Overview
PostgreSQL is an open-source Object-Relational Database Management System (ORDBMS). Like any other RDBMS, it stores data in a tabular format, but it also supports non-relational data types such as JSON.
The non-relational data support feature makes it a popular option for storing semi-structured and unstructured data. Postgres also offers an extension, `pgvector`, to facilitate the storage of data in the form of vector embeddings. You can transform unstructured data into vectors and store these embeddings in Postgres along with relational data.
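For example, here is a minimal SQL sketch, assuming the pgvector extension is installed and using a hypothetical documents table with three-dimensional embeddings for brevity:

-- Enable the extension (installed separately on the server)
CREATE EXTENSION IF NOT EXISTS vector;

-- Relational columns and a vector embedding side by side
CREATE TABLE documents (
  id SERIAL PRIMARY KEY,
  title TEXT NOT NULL,
  embedding VECTOR(3)  -- real embeddings typically have hundreds of dimensions
);

INSERT INTO documents (title, embedding) VALUES ('welcome email', '[0.12, -0.05, 0.33]');

-- Nearest-neighbor lookup by cosine distance
SELECT title FROM documents ORDER BY embedding <=> '[0.10, 0.00, 0.30]' LIMIT 1;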
Apache Kafka Overview
Apache Kafka is an open-source, distributed event-streaming platform. It enables you to distribute event-driven messages across multiple nodes—brokers. If one broker fails, another can take over to ensure continuity in operations. This architecture supports parallel processing, making Kafka a good choice for scalable applications.
With Kafka, you can develop high-performance mission-critical applications that rely on message brokers to produce and consume data streams. Kafka’s scalable architecture allows it to manage millions of messages per second, supporting real-time analytics and enterprise-level data integration.
Here are some of the key features of Apache Kafka:
- Stream Processing: Kafka supports real-time stream processing with built-in features for filtering, aggregating, and transforming event streams.
- Scalability: Kafka is horizontally scalable, allowing the addition of multiple brokers to help distribute the load without compromising performance. This makes it an ideal platform for powering an application’s server-side architecture, as it can accommodate growing user requirements.
- Disk-Based Retention: Kafka uses a disk-based retention mechanism to store messages for a particular duration and automatically eliminates obsolete data based on the retention time limit. This feature enables you to retain information for an extended period of time, which can be advantageous for analysis.
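As a quick illustration of the produce/consume model described above, the sketch below uses Kafka's standard command-line tools against a hypothetical local broker on localhost:9092 and a hypothetical survey-responses topic; adjust script names and paths to your installation:

# Create a topic for survey responses
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic survey-responses --partitions 3 --replication-factor 1

# Produce events by typing one message per line
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic survey-responses

# In another terminal, consume the events in real time
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic survey-responses --from-beginning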
Change Data Capture (CDC)
Change Data Capture, or CDC, is a software design pattern that assists in identifying and replicating source data changes to a target system in real time. By instantly propagating the updates to downstream applications, CDC ensures data consistency between various platforms within your workflow.
With CDC, the original database remains intact, and you do not need to add triggers at the source to track changes. This helps minimize performance overhead since additional actions, such as creating triggers or full table scans, can significantly impact the database’s efficiency.
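To make this concrete, here is a simplified, hypothetical change event of the kind a log-based CDC tool (such as Debezium, covered in Method 2) emits when a row is updated; the exact field names vary by tool, so treat it as illustrative only:

{
  "op": "u",
  "ts_ms": 1700000000000,
  "source": { "db": "postgres", "schema": "public", "table": "customers" },
  "before": { "id": 42, "name": "James" },
  "after": { "id": 42, "name": "James T." }
}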
Method 1: Streaming PostgreSQL to Kafka with Estuary Flow (No-Code Approach)
Estuary Flow is a low- and no-code, high-performance, easy-to-use SaaS platform that lets you streamline your data integration tasks. It offers an extensive library of 200+ pre-built connectors to simplify the development of robust ETL/ELT pipelines.
You can implement custom transformation logic with SQLite and TypeScript to modify the source data and make it compatible with the destination system. With this feature, you can perform complex transformations, including mapping, filtering, and aggregations.
Let’s explore the steps to stream data from Postgres to Kafka effortlessly using Estuary Flow.
Prerequisites
- An Estuary Flow account.
- A Postgres database with logical replication enabled, a user role with the REPLICATION attribute, a replication slot, a publication, and a watermarks table (a sample setup script follows this list).
- A Kafka cluster with bootstrap.servers configured, an authentication mechanism, and connection security enabled with TLS.
- If using the AVRO message format with a schema registry, you must have the schema registry endpoint, along with the username and password for authentication.
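If the database still needs to be prepared, the Postgres-side prerequisites can be set up with SQL along the following lines. This is a hedged sketch: the names flow_capture, flow_publication, flow_slot, and flow_watermarks are illustrative, so follow the Estuary PostgreSQL connector documentation for the exact requirements of your setup.

-- Requires wal_level = logical (set via ALTER SYSTEM or postgresql.conf, then restart)
CREATE USER flow_capture WITH PASSWORD 'secret' REPLICATION;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO flow_capture;

-- Watermarks table used by the capture connector
CREATE TABLE IF NOT EXISTS public.flow_watermarks (slot TEXT PRIMARY KEY, watermark TEXT);
GRANT ALL PRIVILEGES ON TABLE public.flow_watermarks TO flow_capture;

-- Publication and logical replication slot
CREATE PUBLICATION flow_publication FOR ALL TABLES;
SELECT pg_create_logical_replication_slot('flow_slot', 'pgoutput');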
Step 1: Configure Postgres as a Source
- Log in to your Estuary account.
- Click on the Sources tab on the left-hand panel of the dashboard.
- On the Sources page, click the + NEW CAPTURE button, and in the Search connectors box, enter PostgreSQL.
- From the available options, select PostgreSQL Real-time (100ms) by clicking on the Capture button of the connector.
- On the connector configuration page, enter all the necessary details, including:
- Name: Provide a unique name for your capture.
- Server Address: The host or host:port to connect to a database.
- User: The username for database authentication.
- Password: The password for the provided username.
- After populating all the fields, click NEXT > SAVE AND PUBLISH.
Following the above steps, you can configure the Postgres source connector in Estuary Flow. This connector utilizes CDC to continuously capture Postgres database updates into one or more Flow collections.
Step 2: Configure Apache Kafka as a Destination
- After setting up the source connector, a pop-up will summarize the details of the capture. You can click the MATERIALIZE COLLECTIONS button to start configuring Apache Kafka as a destination.
Another way is to click on the Destinations tab on the left-hand panel of the dashboard. This will redirect you to the Destinations page, where you can click the + NEW MATERIALIZATION button.
- On the Create Materialization page, enter Kafka in the Search connectors box.
- Click on the Materialization button of the Kafka connector to proceed.
- On the Create Materialization page, enter all the mandatory fields, including:
- Name: A unique name for your materialization.
- Bootstrap Servers: The initial servers in the Kafka cluster to connect to.
- For authentication, navigate to the Credentials section and specify the Auth Type, SASL Mechanism, Username, and Password.
- The Postgres data collections added to the capture will be automatically added to the materialization. However, you can also manually choose a capture to link to your materialization by navigating to the Source Collections section. Click on the SOURCE FROM CAPTURE button and select your Postgres data collection.
- Finally, click NEXT > SAVE AND PUBLISH to set up a real-time Postgres to Kafka pipeline.
The Apache Kafka connector supports delta updates to streamline data movement from Postgres to Kafka.
Note: In Estuary Flow, materializations describe the process of loading data into a destination. Flow supports two mechanisms: standard materializations and delta updates. The standard mechanism queries the target system to check the existing data state, then updates the data by merging in new information.
For working with standard materialization updates, the endpoint must be a stateful system, like a relational database. However, for systems that don’t provide state representation, Flow uses a delta-updates mode.
In the delta-updates mode, Flow reduces documents locally within each transaction. It then propagates only the changes, or deltas, per key to the endpoint. The endpoints are responsible for further reductions.
Set up real-time Postgres-to-Kafka streaming in minutes with Estuary Flow. Start now or contact us for expert support.
Why Choose Estuary Flow?
- Many-to-Many Connections: Estuary Flow supports many-to-many connections, allowing you to integrate various data sources and destinations in a single data pipeline. The pipeline can consolidate data from multiple platforms into a unified source of truth, enhancing data accessibility within your organization.
- Real-Time Data Synchronization: With built-in CDC functionality, Estuary Flow identifies and copies updates in the source system to the destination in real time, synchronizing changes between source and sink with sub-100 ms latency.
- Time Travel: Estuary Flow's time travel feature lets you define a date range to restrict the data materialization process. Data values are materialized only when they fall within the specified date and time window, giving you more control over the replication process.
- Deployment Options: Estuary offers flexible deployment options based on your specific needs. The Public version is fully managed, while the Private edition supports executing Estuary’s data infrastructure within your private environment. Bring Your Own Cloud is preferable for organizations requiring more control, as it enables you to deploy Estuary in your own cloud environment.
Method 2: Using Debezium with Kafka Connect (Open-Source Approach)
Debezium is a distributed platform that provides CDC functionality. It offers a growing range of source connectors for prominent databases, including PostgreSQL, MySQL, and MongoDB. Using these connectors, you can record database changes from the write-ahead log (WAL) and publish them to an event-streaming platform like Apache Kafka.
Whenever you add new data to the Postgres database, Debezium captures the change and forwards it to Kafka, which publishes it to a specific topic. Kafka consumers can then execute tasks based on the messages stored in that topic.
Before proceeding, you must activate logical replication in PostgreSQL. To achieve this, you can set wal_level = logical in the postgresql.conf file.
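For example, a minimal sketch run as a superuser (a server restart is required for the change to take effect):

ALTER SYSTEM SET wal_level = logical;
-- After restarting PostgreSQL, verify the setting
SHOW wal_level;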
Here are the steps to perform Postgres CDC to Kafka using Debezium:
Step 1: Setting up the Environment
To set up the required services, you can create a docker-compose.yml file and copy-paste the following code:
version: '3'
services:
  zookeeper:
    image: debezium/zookeeper:2.3
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: debezium/kafka:2.3
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - ADVERTISED_LISTENERS=PLAINTEXT://${HOST_IP}:9092
      - LISTENERS=PLAINTEXT://0.0.0.0:9092
  postgres:
    image: debezium/example-postgres:2.3
    ports:
      - 5433:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  postgres-connector:
    image: debezium/connect:2.3
    ports:
      - 8083:8083
    links:
      - kafka
      - postgres
      - zookeeper
    environment:
      - BOOTSTRAP_SERVERS=${HOST_IP}:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=pg_connect_configs
      - OFFSET_STORAGE_TOPIC=pg_connect_offsets
      - STATUS_STORAGE_TOPIC=pg_connect_statuses
The above template sets up four services: ZooKeeper, Kafka, Postgres, and a Kafka Connect worker running Debezium (postgres-connector). ZooKeeper is a coordination service that tracks Kafka cluster metadata, topics, and partitions, acting as a shared configuration store for the brokers.
Step 2: Running the Necessary Services
- Open the terminal and create a new directory for this project.
- Copy the docker-compose.yml file into the directory.
- Set the HOST_IP environment variable referenced in the docker-compose.yml file:
export HOST_IP=$(ifconfig | grep -E "([0-9]{1,3}\.){3}[0-9]{1,3}" | grep -v 127.0.0.1 | awk '{print $2}' | cut -f2 -d: | head -n1)
Note that the above command is only applicable to UNIX-based operating systems.
- To start the containers, run the following:
docker-compose up -d
You can verify the state of your docker container with:
docker ps
Step 3: Configure the Postgres Database
- Connect to the Postgres instance and log in to psql, Postgres’ interactive terminal.
docker-compose exec postgres bash
psql -U postgres
- If prompted, enter your Postgres password (postgres in this setup).
- In the shell, create a new table by executing the following sample code:
CREATE TABLE customers (
  id SERIAL PRIMARY KEY,
  name VARCHAR(255) NOT NULL
);

INSERT INTO customers (name) VALUES ('James');
INSERT INTO customers (name) VALUES ('John');
INSERT INTO customers (name) VALUES ('Jim');
- After creating the table, exit the PostgreSQL shell and container.
Step 4: Initialize Debezium
- You can use the REST endpoints Kafka Connect provides to check the connectors registered with the Connect worker. In your terminal, run:
curl -X GET http://localhost:8083/connectors
This command should return an empty array ([]), indicating that no connectors are registered yet.
- Create a configuration JSON file for the Debezium Postgres connector. To do this, create a new file named pg-source-config.json with the following content:
{
  "name": "pg-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "topic.prefix": "myserver",
    "schema.include.list": "public",
    "table.include.list": "public.customers",
    "plugin.name": "pgoutput"
  }
}
In the above configuration, adjust the values to match your own database credentials. Note that Debezium 2.x uses topic.prefix (in place of the older database.server.name property) to name the Kafka topics it writes to.
- To register the Debezium connector with Kafka Connect, run the following:
curl -X POST -H "Content-Type: application/json" --data @pg-source-config.json http://localhost:8083/connectors
If registration succeeds, Kafka Connect returns a JSON object echoing the configuration from pg-source-config.json, indicating that the pipeline is active.
Each change made to the customers table will now be written to the topic myserver.public.customers (following Debezium's <topic.prefix>.<schema>.<table> naming convention). Finally, you can create a consumer application that uses the data in this topic to perform specific tasks.
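To sanity-check the stream, you can tail the topic with a console consumer. Here is a hedged sketch that assumes the Docker Compose setup above and that the debezium/kafka image ships Kafka's console-consumer script under /kafka/bin (adjust the path for your image):

docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --topic myserver.public.customers \
  --from-beginning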
Challenges
- Risk of Errors: Configuring a Postgres-to-Kafka connection using Debezium requires constant monitoring. Even a minor misconfiguration or bug in the setup can lead to inconsistencies or incorrect event propagation.
- Effort-Intensive: Creating a CDC data pipeline using Debezium requires extensive technical expertise in configuring Kafka, Postgres, and Debezium connectors. As this process lacks automation, it can be time-consuming and effort-intensive.
Method 3: Using Kafka Connect JDBC Source Connector (Postgres to Kafka Connector)
Another method to stream data from Postgres to Kafka is the Postgres JDBC source connector for Confluent. This method captures an initial snapshot of the database and then monitors row-level changes in the tables you select. Each table's events are written to a separate Kafka topic.
To perform this procedure, you must have access to the Confluent CLI and the Kafka cluster credentials for authentication.
Let’s explore the steps to use the Postgres to Kafka connector:
Step 1: Set up the Connector Configuration Properties
To display the connector configuration properties, execute the following command:
confluent connect plugin describe PostgresSource
Step 2: Create Connector Configuration File
You can define the connector configuration properties by creating a new JSON file. For example, create a file with the name postgres-source.json and copy the following properties:
{
  "name": "confluent-postgresql-source",
  "connector.class": "PostgresSource",
  "kafka.api.key": "<my-kafka-api-key>",
  "kafka.auth.mode": "KAFKA_API_KEY",
  "kafka.api.secret": "<my-kafka-api-secret>",
  "topic.prefix": "postgresql_",
  "ssl.mode": "prefer",
  "connection.host": "<my-database-endpoint>",
  "connection.port": "5432",
  "connection.user": "postgres",
  "connection.password": "<my-database-password>",
  "db.name": "postgres",
  "table.whitelist": "passengers",
  "timestamp.column.name": "created_at",
  "output.data.format": "JSON",
  "db.timezone": "UTC",
  "tasks.max": "1"
}
In the above configuration, replace the placeholders with your own values. The key properties are:
- name: Provide a name for the new connector.
- connector.class: Identifies the connector plugin name.
- kafka.auth.mode: Specifies the authentication mode, which can be SERVICE_ACCOUNT or KAFKA_API_KEY. For an API key, provide the key and secret in the kafka.api.key and kafka.api.secret properties; for a service account, set the resource ID in the kafka.service.account.id property.
- topic.prefix: Enter a topic prefix. Kafka automatically creates topics using the <topic.prefix><tableName> naming convention, with topic.creation.default.partitions=1 and topic.creation.default.replication.factor=3. To create a topic with different properties, define it before running the connector.
- ssl.mode: Controls whether an encrypted connection to the database is used.
- timestamp.column.name: Detects changes made to the database using a timestamp column. You can combine this property with incrementing.column.name to handle updates using a unique ID assigned to a specific stream offset.
- output.data.format: Specifies the Kafka message format. Commonly used formats include AVRO, JSON_SR, PROTOBUF, or JSON.
- db.timezone: Sets the valid database timezone.
Step 3: Create the Connector Using the Configuration File
To load the configuration file and start the connector, use the following command:
confluent connect cluster create --config-file postgres-source.json
The command should return a success message.
Step 4: Evaluate the Connector Status
After creating the Postgres to Kafka connector, you can check its status using:
confluent connect cluster list
The output should list the connector with a unique ID, its name, and a status of Running, confirming that the Postgres-to-Kafka connector was created successfully.
Limitations
- DDL Challenges: The connector captures row-level data only; it cannot detect or propagate DDL changes, such as table modifications or column deletions.
- Manual Intervention Required: In the case of a connection failure, the connector attempts to reconnect using an exponential backoff algorithm. After 16 failed attempts, the task is marked as failed, and you may have to restart it manually.
- Data-Type Compatibility Issues: The JDBC connector does not support PostgreSQL-specific data types, such as arrays or geometric types. Unsupported data types are not streamed to the Kafka topic.
Conclusion
Streaming data from Postgres to Kafka enhances operational efficiency, enables real-time data processing, and ensures seamless integration with modern data workflows. Kafka can act as a buffer between Postgres and downstream applications, which is especially useful when backend processes rely heavily on PostgreSQL.
All three methods covered in this article can effectively stream data from Postgres to Kafka. However, the JDBC Source connector and the Debezium connector with Kafka Connect have certain drawbacks; the primary one is the manual effort required, which can introduce errors that are time-consuming to resolve. To automate this migration, use Estuary Flow.
Or, if your ultimate goal is to stream data to a Kafka consumer rather than your own Kafka broker, consider using Estuary’s Dekaf connector for your materialization. In this scenario, Estuary hosts the Kafka broker and schema registry so you can streamline your own data architecture.
You can connect with the experts to learn more about the benefits of incorporating Estuary into your workflow.
About the author
With over 15 years in data engineering, the author is a seasoned expert in driving growth for early-stage data companies, focusing on strategies that attract customers and users. Their extensive writing provides insights to help companies scale efficiently and effectively in an evolving data landscape.