Estuary

How to Stream Data from Kafka to Apache Iceberg in Minutes

Learn how to stream data from Apache Kafka into Apache Iceberg efficiently. This guide will introduce you to the foundations of a streaming data lakehouse and showcase the best options for data integrations.


Is your data strategy with Apache Kafka struggling to scale for complex analytics? While Apache Kafka is excellent at handling high-throughput real-time data streaming, it isn’t built for advanced querying and analytics.

Moving your data into Apache Iceberg closes this gap. Iceberg is an open table format that helps you manage large datasets with efficient storage and strong query performance.

To transfer data from Kafka to Iceberg, you can use either an automated method with Estuary Flow or a manual method using Kafka Connect and the Apache Iceberg Sink Connector. Each method offers unique benefits for real-time, scalable integration.

This guide will explore both methods, their prerequisites, and step-by-step processes to help you choose the best option for your Kafka to Iceberg integration needs.

What Is Apache Kafka? 


Apache Kafka is an open-source distributed event streaming platform trusted and used by over 80% of all Fortune 100 companies. It helps you build high-performance streaming data pipelines, enabling many use cases, from asynchronous communication to operational analytics.

Kafka’s architecture uses a publish-subscribe model to provide high-throughput, fault-tolerant, distributed messaging. A Kafka cluster consists of multiple brokers that together handle large volumes of real-time data with low latency. Brokers are the servers that host topics, and each topic is a logical channel used to receive, store, and deliver data. Client applications called producers publish data to Kafka topics, and consumers subscribe to those topics and read the data.

Each topic is split into partitions, which are distributed across brokers for parallelism. Each partition is an ordered, immutable, append-only sequence of messages. Older Kafka versions rely on ZooKeeper, a separate centralized service, to store cluster metadata and coordinate the brokers. Newer versions replace ZooKeeper with KRaft, Kafka’s built-in Raft-based consensus protocol, so quorum is handled within Kafka itself.
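To make the partition model concrete, here is a minimal in-memory sketch of a keyed topic. It is a toy, not Kafka client code: the `Topic` class and its methods are hypothetical, and it hashes keys with `zlib.crc32` purely for illustration. Kafka's default partitioner hashes key bytes with murmur2, so partition numbers here will not match a real cluster. What carries over is the behavior: the same key always maps to the same partition, and an offset is simply a record's position within its partition.

```python
import zlib


# Hypothetical toy model of a topic; not a Kafka API.
class Topic:
    """A topic with a fixed number of append-only partitions."""

    def __init__(self, name: str, num_partitions: int):
        self.name = name
        # Each partition is an ordered, append-only list of (key, value) records.
        self.partitions = [[] for _ in range(num_partitions)]

    def partition_for(self, key: str) -> int:
        # Illustrative hash only; Kafka's default partitioner uses murmur2 instead.
        return zlib.crc32(key.encode("utf-8")) % len(self.partitions)

    def produce(self, key: str, value: str) -> tuple:
        """Append a record and return its (partition, offset)."""
        p = self.partition_for(key)
        self.partitions[p].append((key, value))
        return p, len(self.partitions[p]) - 1


topic = Topic("events", num_partitions=3)
for i in range(4):
    topic.produce("user-42", f"click-{i}")
topic.produce("user-7", "login")

# Every record keyed "user-42" landed in one partition, in production order.
p = topic.partition_for("user-42")
print(p, [v for k, v in topic.partitions[p] if k == "user-42"])
```

Kafka guarantees ordering only within a partition. That is why producers key related events, such as all events for one user, so that they stay in sequence.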

What Is Apache Iceberg?


Apache Iceberg is an open-source, high-performance table format for managing large analytic datasets. When you add Iceberg tables to your data lake, you can run large-scale analytics with compute engines such as Spark, Trino, and Hive working on the same data at the same time.

Iceberg also speeds up data access and query planning. Its scan planning is fast enough to run on a single machine and does not need a distributed SQL engine. Planning uses table metadata to quickly identify the files a query needs, even in multi-petabyte tables.

Iceberg tracks data files through two levels of metadata: manifest files and manifest lists. A manifest file lists data files along with their partition values and column-level statistics. A manifest list stores references to manifest files. For each manifest, it also records metadata such as the range of partition values the manifest covers and summary statistics like row counts and manifest size.

During scan planning, Iceberg first prunes manifests using the partition value ranges stored in the manifest list. It then reads only the remaining manifests and uses their file-level statistics to select the relevant data files. This two-level filtering speeds up query processing and saves compute resources.
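The two-level pruning can be sketched in a few lines. This is a simplified model, not Iceberg's implementation. It assumes a single integer partition value per file (for example, a day number) and min/max statistics for a single `amount` column. The names `DataFile`, `ManifestEntry`, and `plan_scan` are hypothetical.

```python
from dataclasses import dataclass


# Hypothetical, simplified stand-ins for Iceberg metadata; not Iceberg's API.
@dataclass
class DataFile:
    path: str
    partition: int       # partition value, e.g. a day number
    min_amount: float    # column-level stats for an "amount" column
    max_amount: float


@dataclass
class ManifestEntry:
    """One manifest as summarized in the manifest list."""
    lower_partition: int
    upper_partition: int
    files: list  # the DataFile entries stored in the manifest file itself


def plan_scan(manifest_list, day_lo, day_hi, amount_gt):
    """Return paths of files that may hold rows with day in [day_lo, day_hi] and amount > amount_gt."""
    selected = []
    for manifest in manifest_list:
        # Level 1: skip whole manifests whose partition range cannot overlap the filter.
        if manifest.upper_partition < day_lo or manifest.lower_partition > day_hi:
            continue
        # Level 2: read the manifest and filter files by partition value and column stats.
        for f in manifest.files:
            if day_lo <= f.partition <= day_hi and f.max_amount > amount_gt:
                selected.append(f.path)
    return selected


manifests = [
    ManifestEntry(1, 10, [DataFile("a.parquet", 3, 0, 50), DataFile("b.parquet", 9, 10, 500)]),
    ManifestEntry(11, 20, [DataFile("c.parquet", 15, 0, 900)]),
    ManifestEntry(21, 30, [DataFile("d.parquet", 25, 100, 200)]),
]
print(plan_scan(manifests, day_lo=5, day_hi=16, amount_gt=100))
# → ['b.parquet', 'c.parquet']
```

The third manifest is skipped without being opened. In a real table with thousands of manifests, that first level is where most of the planning time is saved.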

Here are some of the critical features of Apache Iceberg:

  • Schema Evolution: Iceberg evolves table schemas in place, including nested structures. It supports adding, dropping, renaming, updating (widening the type of), and reordering columns. These changes only update metadata, so you can modify the schema without rewriting data files or disrupting existing data.
  • Hidden Partitioning: Iceberg computes partition values from column values using transforms, such as deriving an hourly partition from a timestamp. Writers don't have to populate partition columns, and readers don't have to filter on them explicitly. Other formats require manually maintained partition columns. Hidden partitioning avoids common mistakes such as writing the wrong partition value or forgetting a partition filter, while queries still benefit from partition pruning.
  • Multiple Concurrent Writes: Iceberg supports multiple concurrent writers through optimistic concurrency. Each writer prepares new table metadata independently and then tries to commit by atomically swapping the table’s current metadata pointer. If another writer has committed first, the commit fails. The writer then retries by building a new metadata tree on top of the latest snapshot. This keeps the table consistent while resolving conflicts.
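The optimistic commit protocol from the last bullet can be modeled as a compare-and-swap on the pointer to the current metadata. This is a single-process sketch under simplifying assumptions. A lock stands in for the catalog's atomic swap, a snapshot is just a tuple of file names, and `Catalog` and `commit_append` are hypothetical names, not Iceberg APIs.

```python
import threading


# Hypothetical model of a catalog's metadata pointer; not an Iceberg API.
class Catalog:
    """Holds the pointer to the table's current metadata version."""

    def __init__(self):
        self._lock = threading.Lock()  # stands in for the catalog's atomic swap
        self.version = 0
        self.files = ()  # data files visible in the current snapshot

    def read(self):
        with self._lock:
            return self.version, self.files

    def compare_and_swap(self, expected_version, new_files):
        """Install new metadata only if no other writer committed in between."""
        with self._lock:
            if self.version != expected_version:
                return False
            self.version += 1
            self.files = new_files
            return True


def commit_append(catalog, new_file, max_retries=10):
    """Append a data file, rebuilding on the latest snapshot after each conflict."""
    for attempt in range(max_retries):
        base_version, base_files = catalog.read()
        # Build new metadata on top of the snapshot we just read.
        new_files = base_files + (new_file,)
        if catalog.compare_and_swap(base_version, new_files):
            return attempt  # number of retries that were needed
    raise RuntimeError("commit failed after retries")


catalog = Catalog()
threads = [threading.Thread(target=commit_append, args=(catalog, f"file-{i}.parquet")) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(catalog.version, sorted(catalog.files))
```

A writer's commit fails only when another writer's commit has succeeded. With eight writers, no writer can fail more than seven times, and every file ends up in the final snapshot.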

To dive deeper into how to load data into Apache Iceberg tables efficiently, explore this detailed guide: Loading Data into Apache Iceberg

Common Challenges of Kafka to Iceberg Integration

While integrating Kafka data streams into Iceberg tables offers multiple benefits, it’s also associated with challenges:

  • Efficient and Reliable Data Ingestion: Loading data from Kafka into Iceberg tables quickly and accurately involves balancing latency, reliability, and cost. Real-time streaming pipelines must efficiently convert Kafka’s unstructured or semi-structured data into Iceberg’s structured table format. A robust integration pipeline is essential to handle issues like exactly-once ingestion and schema evolution.
  • Table Optimization and Maintenance: Iceberg offers features such as transactional writes, snapshot isolation, and partition pruning. However, keeping tables healthy requires regular maintenance: compacting small files, managing growing metadata, and expiring old snapshots. Without a well-designed system, these tasks can drive up operational costs.
  • Securing Iceberg Tables: Iceberg abstracts data lake storage into tables, so traditional file- and directory-based security mechanisms are no longer sufficient. Designing a table-level access control system and integrating it with enterprise authentication and authorization systems is a significant challenge.
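Small-file compaction, mentioned in the maintenance bullet, is essentially a packing problem: group many small files into rewrite tasks whose total size stays under a target. Below is a first-fit-decreasing sketch. The 32 MB "small" threshold, the target sizes, and the file sizes are illustrative assumptions. `plan_compaction` is a hypothetical helper, not Iceberg's `rewrite_data_files` action, which has its own strategies and defaults.

```python
# Hypothetical compaction planner; sizes are in MB and chosen for illustration.
def plan_compaction(file_sizes_mb, target_mb=128, small_threshold_mb=32):
    """Group small files into bins whose total size stays at or below target_mb."""
    small = sorted((s for s in file_sizes_mb if s < small_threshold_mb), reverse=True)
    bins = []
    for size in small:
        # First-fit decreasing: place each file in the first bin that still has room.
        for b in bins:
            if sum(b) + size <= target_mb:
                b.append(size)
                break
        else:
            bins.append([size])
    # A bin holding a single file gains nothing from being rewritten.
    return [b for b in bins if len(b) > 1]


sizes = [4, 8, 30, 200, 16, 25, 2, 30, 12, 300]
for group in plan_compaction(sizes, target_mb=64):
    print(group, sum(group))
# → [30, 30, 4] 64
# → [25, 16, 12, 8, 2] 63
```

Eight small files become two rewrite tasks, and the 200 MB and 300 MB files are left alone. This is the same trade-off a real compaction job makes between read performance and rewrite cost.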

2 Methods for Moving Data from Kafka to Iceberg

To overcome these integration challenges, you need a reliable way to replicate data from Kafka to Iceberg. Below, we explore two approaches:

  • Automated Method: Kafka to Iceberg Integration Using Estuary Flow
  • Manual Method: Kafka to Iceberg Replication Using Kafka Connect Apache Iceberg Sink Connector

Method 1: Move Data from Kafka to Iceberg Using Estuary Flow

Estuary Flow is a real-time data integration platform that allows you to build an ETL/ELT data pipeline for quick data movement and transformation. With minimal-to-no coding, you can effortlessly automate complex data workflows. Estuary Flow lets you streamline reliable and scalable data integration across sources and preferred destinations within a few minutes.

By leveraging end-to-end Change Data Capture (CDC), Estuary Flow captures changes from the source, including schema changes, and replicates them to the target platform in real time. The entire CDC process has a total latency of less than 100 milliseconds.

Estuary Flow can facilitate Kafka to Iceberg integration through real-time streaming and automatic schema handling. Some key features of the platform include:

  • No-Code Connectors: Estuary Flow offers over 200 pre-built connectors, including batch, real-time, and streaming CDC options. These connectors streamline data extraction, transformation, and loading between various sources and destinations, and they reduce manual work during pipeline setup.
  • Custom Data Transformations: Estuary Flow facilitates transformations, from a simple remapping to complex self-referential and stateful transaction processing using Flow derivations. You can write derivations for your batch and streaming ETL pipelines using SQLite or TypeScript. Besides this, combining Estuary and dbt enables you to create and apply custom transformations on ELT pipelines immediately after the first data synchronization.
  • Multi-Cloud Deployment: Estuary Flow offers a hosted public deployment and a private deployment for complete control within your infrastructure. It also provides BYOC (Bring Your Own Cloud) for flexible cloud configurations. Each option offers a different level of security and customization to meet your organization's needs.

Now, let’s understand how to migrate data from Kafka to Iceberg using Estuary Flow. Before you start, ensure the following prerequisites are in place:

  • A Kafka cluster with configured bootstrap servers, an authentication mechanism like SASL/SCRAM or SASL/PLAIN, and connection security enabled with TLS.  
  • An Estuary Flow account.
  • An Amazon S3 bucket where the connector will write the files for your Apache Iceberg tables.
  • An AWS root or IAM user with read/write access to the S3 bucket.
  • The IAM user's access key ID and secret access key.
  • An Iceberg catalog, using either AWS Glue or a REST catalog.
    • If you use the AWS Glue catalog, you must configure the IAM user's permissions to access AWS Glue.
    • If using the REST catalog, get the URI, warehouse name, and credentials to connect to the catalog.   

Step 1: Configure Apache Kafka as the Source

  • Sign in to your Estuary Flow account.
  • Click the Sources menu from the left navigation pane of the dashboard.
  • Click the + NEW CAPTURE button on the Sources page and search for Apache Kafka using the Search connectors field.
  • When you see the Kafka connector in the search results, click the connector’s Capture button.
  • You will be redirected to the Kafka connector configuration page; provide a unique name for your capture in the Name field within the Capture Details section.
  • Expand the Endpoint Config section and specify the following mandatory fields:
    • Bootstrap Servers: Enter the initial Kafka cluster servers to connect to, separated by commas. 
    • Credentials: Under the Credentials section, choose either SASL (USER & PASSWORD) or AWS MSK IAM. Depending on your choice, fill out the required fields to authenticate with your Kafka cluster.
  • Once you enter all the necessary information, click NEXT > SAVE AND PUBLISH.
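Typos in the bootstrap server string are a common cause of connection failures. If you generate this value in a script, a small check like the one below can catch mistakes before you paste it into the form. It assumes each entry uses the usual Kafka `host:port` format. `parse_bootstrap_servers` is a hypothetical helper and is not part of Estuary Flow.

```python
# Hypothetical validation helper; not part of Estuary Flow or any Kafka client.
def parse_bootstrap_servers(value: str) -> list:
    """Split a comma-separated bootstrap server list into (host, port) pairs."""
    servers = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate stray or trailing commas
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        port_num = int(port)
        if not 0 < port_num < 65536:
            raise ValueError(f"port out of range in {entry!r}")
        servers.append((host, port_num))
    if not servers:
        raise ValueError("no bootstrap servers given")
    return servers


print(parse_bootstrap_servers("broker-1.example.com:9092, broker-2.example.com:9092"))
# → [('broker-1.example.com', 9092), ('broker-2.example.com', 9092)]
```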

This completes your Kafka source connector configuration, allowing the connector to capture streaming data from Kafka topics.

Step 2: Configure Amazon S3 Iceberg as the Destination

After you save and publish the Kafka capture, a pop-up window with the capture details appears. Click MATERIALIZE COLLECTIONS in this window to configure the destination.

Alternatively, go to the Estuary Flow dashboard and click Destinations > + NEW MATERIALIZATION. On the Create Materialization page, proceed with the following steps:

  • Enter Iceberg in the Search connectors field to find the Amazon S3 Iceberg connector. This connector materializes delta updates from Flow collections into Apache Iceberg tables, using Amazon S3 for object storage and either AWS Glue or a REST catalog as the Iceberg catalog.

Delta updates are the changes that arrive since the previous update. The connector does not merge them into rows already in the table. Instead, Flow batches them, converts them to Parquet files, and appends them to the Iceberg tables at the configured interval.

  • Click the Materialization button of the connector to continue with your destination configuration.
  • On the Create Materialization page, specify all the mandatory information, including:
    • Name: Provide a unique name for your materialization.
    • AWS Access Key ID: Enter the access key ID used to access AWS services.
    • AWS Secret Access Key: Fill in the secret access key to access AWS services.
    • Bucket: Enter the name of the S3 bucket into which you will write the data files. 
    • Region: Specify the AWS region where the S3 bucket resides.
    • Namespace: Enter a namespace for bound collection tables. 
  • For Catalog, choose REST or AWS Glue as the Iceberg catalog and enter the relevant information required to connect to it.
  • Expand the Source Collections section and check if the data you captured from the Kafka source is automatically added to your materialization. If not, you can manually link the capture by clicking the SOURCE FROM CAPTURE button.
  • Click on NEXT > SAVE AND PUBLISH to complete your destination configuration.

The Amazon S3 Iceberg connector will materialize data from your Kafka Flow collections into your Iceberg tables, completing your Kafka to Iceberg integration.
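To see how delta updates differ from a standard, merge-style materialization, consider a toy model where a table is a list of rows keyed by `id`. This is a conceptual sketch only. The function names are hypothetical, the merge side shows only the simplest case (the newer document replaces the old row), and the code does not reflect how the Amazon S3 Iceberg connector batches or writes Parquet files.

```python
# Hypothetical illustration of update semantics; not Estuary Flow code.
def apply_delta_updates(table, batch):
    """Delta updates: every incoming document is appended as a new row."""
    return table + batch


def apply_merge_updates(table, batch, key="id"):
    """Merge updates (simplest case): a newer document replaces the row with the same key."""
    merged = {row[key]: row for row in table}
    for doc in batch:
        merged[doc[key]] = doc
    return list(merged.values())


existing = [{"id": 1, "status": "created"}]
batch = [{"id": 1, "status": "shipped"}, {"id": 2, "status": "created"}]

print(len(apply_delta_updates(existing, batch)))  # → 3 (both versions of id 1 are kept)
print(apply_merge_updates(existing, batch))
# → [{'id': 1, 'status': 'shipped'}, {'id': 2, 'status': 'created'}]
```

Appending avoids reading existing rows, which keeps writes cheap. The cost is that queries needing the current state of each key must deduplicate or select the latest row per key.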

Method 2: Kafka to Iceberg Replication Using Kafka Connect Apache Iceberg Sink Connector

Kafka Connect is a popular framework for moving data into and out of Kafka through connectors. For Kafka to Iceberg integration, the Apache Iceberg project provides a Kafka Connect sink connector called the Apache Iceberg Sink Connector.

The Apache Iceberg Sink Connector provides exactly-once delivery semantics, so each record from Kafka is written to the Iceberg tables once, even across failures and retries. The connector also supports multi-table fan-out, which lets you route records from a single Kafka topic to multiple Iceberg tables.
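One common way to get exactly-once results from an at-least-once source is to store the consumed Kafka offsets atomically with the data they produced. After a restart, the sink skips anything below the stored offset. The sketch below models that idea with an in-memory table. It is a simplified illustration under that assumption, not the Iceberg Sink Connector's actual commit protocol, which coordinates commits through its control topic. `Table` and `sink_batch` are hypothetical names.

```python
# Hypothetical model of offset-tracked commits; not the Iceberg connector's code.
class Table:
    """Toy table whose commits store rows and consumer offsets together."""

    def __init__(self):
        self.rows = []
        self.committed_offsets = {}  # partition -> next offset to consume

    def commit(self, new_rows, offsets):
        # In a real system, this would be a single atomic metadata swap.
        self.rows.extend(new_rows)
        self.committed_offsets.update(offsets)


def sink_batch(table, partition, records):
    """Write (offset, value) records in offset order, skipping any already committed."""
    start = table.committed_offsets.get(partition, 0)
    fresh = [(off, val) for off, val in records if off >= start]
    if fresh:
        table.commit([val for _, val in fresh], {partition: fresh[-1][0] + 1})


table = Table()
log = [(0, "a"), (1, "b"), (2, "c")]
sink_batch(table, 0, log[:2])
# Simulate a crash and retry: the consumer re-reads the partition from offset 0.
sink_batch(table, 0, log)
print(table.rows)  # → ['a', 'b', 'c']
```

Kafka may redeliver records 0 and 1, but they appear only once in the table. The data and the offset move forward together in one commit.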

Here are the steps to create a Kafka to Iceberg data pipeline using Apache Iceberg sink connector:

Prerequisites:

  • Kafka 2.5 or higher, installed and running.
  • A source Kafka topic named “events” (this guide assumes it already exists).
  • A configured Iceberg catalog.
  • A Kafka topic to serve as the Iceberg Sink Connector’s control topic.

Step 1: Install the Apache Iceberg Connector in a Kafka Connect Instance

  1. Download the Apache Iceberg sink connector from Confluent Hub or the GitHub repository. Alternatively, build the connector yourself by running the following Gradle command from the root of the Apache Iceberg source repository:
plaintext
./gradlew -x test -x integrationTest clean build

This will generate a ZIP archive containing the connector.

  2. Once the build completes, the ZIP archive will be located at:
plaintext
./kafka-connect/kafka-connect-runtime/build/distributions/

This directory contains two versions of the archive: one that bundles the Hive Metastore client and its dependencies, and one without it. Select the version that suits your needs, extract it, and copy the extracted contents into the Kafka Connect plugin directory.

To do this, create a plugin directory with the following command, then copy the extracted archive into it.

plaintext
mkdir -p "$CONFLUENT_HOME/share/kafka/plugins"

Set the CONFLUENT_HOME environment variable to the path of your Confluent installation, or replace $CONFLUENT_HOME with that path directly.

  3. Once the connector is copied to the plugin directory, add that directory to the plugin.path property in your Kafka Connect worker configuration file. This is a properties file, such as connect-distributed.properties, not a JSON file. For example:

plugin.path=/usr/local/share/kafka/plugins

Kafka Connect scans the directories listed in plugin.path, which is a comma-separated list, to discover installed connectors. The example above assumes CONFLUENT_HOME is /usr/local, so use the path where you actually created the plugin directory.

  4. After updating the configuration, restart every Kafka Connect worker so that the Iceberg sink connector is loaded on all nodes.

Step 2: Create a Destination Table

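Use Spark SQL, with an Iceberg catalog configured, to create the destination tables. The connector configuration in Step 3 fans out records from the Kafka topic “events” to two tables, default.events_list and default.events_create. Create both tables with the same schema:

plaintext
CREATE TABLE default.events_list (
    id STRING,
    type STRING,
    ts TIMESTAMP,
    payload STRING)
PARTITIONED BY (hours(ts));

CREATE TABLE default.events_create (
    id STRING,
    type STRING,
    ts TIMESTAMP,
    payload STRING)
PARTITIONED BY (hours(ts));

You can modify these statements to meet your requirements.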
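The PARTITIONED BY (hours(ts)) clause uses Iceberg's hidden partitioning: the partition value is derived from ts, so producers never write a separate partition column. The Iceberg table spec defines the hour transform as the number of whole hours since 1970-01-01 00:00:00. The function below sketches that mapping for illustration only. `hour_transform` is a hypothetical name, and it treats naive datetimes as UTC, which is an assumption of this sketch.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


# Hypothetical re-implementation of the hour transform, for illustration only.
def hour_transform(ts: datetime) -> int:
    """Return whole hours since 1970-01-01 00:00:00 (naive values are treated as UTC)."""
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    # Floor division so that times before the epoch fall into negative hours.
    return int((ts - EPOCH).total_seconds() // 3600)


a = hour_transform(datetime(2024, 5, 1, 13, 5))
b = hour_transform(datetime(2024, 5, 1, 13, 59, 59))
c = hour_transform(datetime(2024, 5, 1, 14, 0))
print(a == b, c - a)  # → True 1
```

Every row written within the same hour gets the same partition value. A query that filters only on ts, such as WHERE ts >= '2024-05-01 13:00:00', can therefore be pruned to the matching hourly partitions without mentioning a partition column.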

Step 3: Configure the Iceberg Connector using Kafka Connect

Create a connector configuration like the following to connect the Kafka topic to an Iceberg REST catalog:

plaintext
{
  "name": "events-sink",
  "config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "events",
    "iceberg.tables": "default.events_list,default.events_create",
    "iceberg.tables.route-field": "type",
    "iceberg.table.default.events_list.route-regex": "list",
    "iceberg.table.default.events_create.route-regex": "create",
    "iceberg.catalog": "demo",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://localhost",
    "iceberg.catalog.credential": "<credential>",
    "iceberg.catalog.warehouse": "<warehouse name>"
  }
}

This configuration is tailored for a REST-based Iceberg catalog with S3 as the storage layer. Records from the events topic are routed by their type field. Records whose type matches list go to default.events_list, and records whose type matches create go to default.events_create. Adjust values such as the catalog URI, credential, and warehouse name for your environment, then save the configuration as events-sink.json.
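The three routing properties work together. For each record, the connector reads the field named by iceberg.tables.route-field (here, type) and sends the record to the tables whose route-regex matches that value. The sketch below mimics that behavior so you can sanity-check a routing configuration offline. It assumes the regex must match the whole field value and that records matching no table are not written; confirm both points against the connector documentation. `route_record` is a hypothetical helper.

```python
import re

# Routing properties copied from the events-sink.json example.
config = {
    "iceberg.tables": "default.events_list,default.events_create",
    "iceberg.tables.route-field": "type",
    "iceberg.table.default.events_list.route-regex": "list",
    "iceberg.table.default.events_create.route-regex": "create",
}


# Hypothetical offline check; not part of the Iceberg connector.
def route_record(record: dict, cfg: dict) -> list:
    """Return the tables a record would be routed to under regex-based routing."""
    value = record.get(cfg["iceberg.tables.route-field"])
    if value is None:
        return []
    targets = []
    for table in cfg["iceberg.tables"].split(","):
        pattern = cfg.get(f"iceberg.table.{table}.route-regex")
        # Assumption: the regex must match the entire field value.
        if pattern is not None and re.fullmatch(pattern, str(value)):
            targets.append(table)
    return targets


print(route_record({"id": "1", "type": "list"}, config))    # → ['default.events_list']
print(route_record({"id": "2", "type": "delete"}, config))  # → []
```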

Step 4: Launch the Connector

  1. After creating the configuration file, use the Kafka Connect REST API to launch the connector. Because events-sink.json contains both the connector name and its config object, send it to the /connectors endpoint with a POST request. This example assumes the REST API listens on localhost:8080. The Kafka Connect default port is 8083, so adjust the URL to match your setup:
plaintext
curl -i -X POST http://localhost:8080/connectors \
     -H "Content-Type: application/json" \
     -d @events-sink.json
  2. Then, verify that the connector is running by querying its status through the REST API:
plaintext
curl -s http://localhost:8080/connectors/events-sink/status | jq

This command uses jq to format the JSON output for better readability. If everything is working correctly, the response should look like this:

plaintext
{
  "name": "events-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect:8080"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "connect:8080"
    },
    ...
  ],
  "type": "sink"
}
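To script this check, for example in a deployment pipeline, you can parse the status response programmatically. The sketch below uses only the Python standard library. It assumes the REST API is reachable at the same URL used with curl above. `connector_is_healthy` and `fetch_status` are hypothetical helper names.

```python
import json
import urllib.request


# Hypothetical helpers; they only read the standard Kafka Connect status response.
def connector_is_healthy(status: dict) -> bool:
    """True when the connector and every one of its tasks report RUNNING."""
    tasks = status.get("tasks", [])
    return (
        status.get("connector", {}).get("state") == "RUNNING"
        and len(tasks) > 0
        and all(t.get("state") == "RUNNING" for t in tasks)
    )


def fetch_status(base_url: str, name: str) -> dict:
    # GET /connectors/<name>/status, the same endpoint queried with curl.
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


sample = json.loads("""
{"name": "events-sink",
 "connector": {"state": "RUNNING", "worker_id": "connect:8080"},
 "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "connect:8080"},
           {"id": 1, "state": "FAILED", "worker_id": "connect:8080"}],
 "type": "sink"}
""")
print(connector_is_healthy(sample))  # → False, because task 1 has FAILED
```

Against a live cluster, you would call connector_is_healthy(fetch_status("http://localhost:8080", "events-sink")). A connector can report RUNNING while individual tasks have failed, which is why the helper checks every task.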

Step 5: Query Data in Iceberg Using PySpark

To verify that data is flowing from Kafka to Iceberg, use PySpark, a Python API for Apache Spark, to query the data in the Iceberg table:

plaintext
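# Assumes `spark` is an existing SparkSession configured with an Iceberg catalog named "demo"
df = spark.table("demo.default.events_list")
df.show(5)  # print up to five rows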

This displays up to five rows from the events_list table in the demo catalog.

By following these five steps, you will have:

  • Set up an Iceberg Kafka Sink Connector.
  • Deployed it via Kafka Connect.
  • Confirmed that data is successfully flowing into your Iceberg tables.

Limitations of Kafka Connect to Iceberg Using Iceberg Sink Connector

  • Challenges in Schema Evolution: While Iceberg supports schema evolution, the Iceberg Sink Connector does not always manage complex schema changes well. If a column’s type is changed in Kafka, the connector might fail to map the changes properly, leading to data mismatches or write failures. Such issues are problematic in a streaming context where data is continuously ingested from Kafka into Iceberg.
  • Large Operational Overhead: Managing Kafka and Kafka Connect for integration with Iceberg involves operational complexities. Tasks such as setting up and monitoring Kafka clusters and configuring the connector require expertise as well as effort. Other challenges include schema compatibility and data transformations.
  • Debezium Change Event Format Issues: The Iceberg Sink Connector supports common data formats such as JSON and Avro. For CDC use cases, however, it expects Kafka events to follow the Debezium change event format. If your data source does not produce events in this format, you may need additional processing to make them compatible.

Best Practices for Kafka to Iceberg Integration

Here are some best practices to ensure efficient and reliable integration between Kafka and Apache Iceberg:

  • Prepare a Data Migration Plan: You must develop a comprehensive data migration plan by clearly identifying the data to be migrated, mapping out any dependencies, and assessing the resources required. This practice ensures efficient migration with minimal downtime.
  • Batch Data Writes: Instead of writing each Kafka message individually, group messages into larger batches to reduce the number of small files in Iceberg. Many small files slow down reads and increase metadata and storage overhead.
  • Perform Data Transformations: You must standardize, clean, or enrich data before or after writing from Kafka to Iceberg, ensuring that downstream users get high-quality data.
  • Manage Schema Evolution: If you build the pipeline manually, use a schema format such as Avro together with a schema registry such as Confluent Schema Registry. These tools help you manage schema updates between Kafka and Iceberg smoothly.
  • Benchmark Performance: Test the Kafka to Iceberg integration setup under different workloads. This will help evaluate throughput and latency, resource utilization, and failure recovery mechanisms.
  • Use Compatible Versions: Always use compatible and stable versions of Kafka, Iceberg, and other middleware. This involves regular updates to the latest releases for improved performance, bug fixes, and new features.
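The batching advice can be sketched as a buffer that flushes when it reaches either a record count or a maximum age, whichever comes first. The thresholds and the `flush` callback are illustrative assumptions, and `MicroBatcher` is a hypothetical class. With the Iceberg Sink Connector, the comparable setting is the commit interval (iceberg.control.commit.interval-ms).

```python
import time
from typing import Callable


# Hypothetical micro-batcher; thresholds are illustrative, not recommendations.
class MicroBatcher:
    """Buffers messages and flushes them as one write to avoid many small files."""

    def __init__(self, flush: Callable[[list], None], max_records=10_000,
                 max_age_s=300.0, clock: Callable[[], float] = time.monotonic):
        self.flush = flush
        self.max_records = max_records
        self.max_age_s = max_age_s
        self.clock = clock
        self.buffer = []
        self.first_at = None  # arrival time of the oldest buffered message

    def add(self, message) -> None:
        if not self.buffer:
            self.first_at = self.clock()
        self.buffer.append(message)
        too_big = len(self.buffer) >= self.max_records
        too_old = self.clock() - self.first_at >= self.max_age_s
        if too_big or too_old:
            self.drain()

    def drain(self) -> None:
        if self.buffer:
            self.flush(self.buffer)
            self.buffer = []
            self.first_at = None


# Usage with a fake clock so that the example runs instantly.
now = [0.0]
writes = []
batcher = MicroBatcher(writes.append, max_records=3, max_age_s=60, clock=lambda: now[0])
for i in range(7):
    batcher.add(i)
now[0] = 120.0
batcher.add(7)  # message 6 is now older than 60 s, so this add triggers a flush
batcher.drain()
print(writes)  # → [[0, 1, 2], [3, 4, 5], [6, 7]]
```

In this sketch, the age limit is checked only when a message arrives. A production writer would also flush on a timer, so that a quiet topic does not leave data sitting in the buffer.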

Conclusion

Integrating Apache Kafka with Apache Iceberg creates a robust pipeline for managing and analyzing real-time data streams in a structured, scalable manner. Kafka provides a reliable event-streaming platform for ingesting data at low latency. Iceberg, in turn, offers advanced table features such as schema evolution and partition management, which make it ideal for maintaining and querying large datasets over time.

To transfer data from Kafka to Iceberg tables, especially for analytical purposes, you can manually configure the Kafka Connect Apache Iceberg Sink Connector. However, for an easier and automated integration process, you can use Estuary Flow to transfer data from Kafka to Iceberg tables in real-time and batch processing modes. Ultimately, the choice between these options depends on your integration needs.

To explore how Estuary Flow automates data integration and boosts business productivity, connect with the Estuary experts.

FAQs

Why integrate Kafka with Apache Iceberg?

Integrating Kafka with Apache Iceberg allows you to utilize Iceberg’s scalable storage and querying capabilities. This integration enables your business to store, query, and process streaming data for real-time and batch analysis.

Can the Iceberg Sink Connector handle schema changes in Kafka?

Yes, the Iceberg Sink Connector supports schema evolution, such as adding new columns to the Iceberg table. However, it has limitations when handling more complex schema changes like column deletions or type changes. These limitations may cause issues during the data transfer from Kafka to Iceberg tables.


About the author

Dani Pálma

Dani is a data professional with a rich background in data engineering and real-time data platforms. At Estuary, Daniel focuses on promoting cutting-edge streaming solutions, helping to bridge the gap between technical innovation and developer adoption. With deep expertise in cloud-native and streaming technologies, Dani has successfully supported startups and enterprises in building robust data solutions.
