Introduction

Overview of Apache Iceberg

Apache Iceberg is an open table format designed for large-scale data lake environments. It was developed to address the limitations of existing table formats, such as Hive, and to provide a more robust and scalable solution for managing big data. Iceberg delivers high performance, reliability, and straightforward data management, making it an attractive choice for organizations dealing with vast amounts of data.

Importance of Data Loading in Data Lakes

Efficient data loading is critical for data lakes, as it directly impacts the usability, performance, and reliability of the stored data. Proper data loading practices ensure that data is accessible, consistent, and can be quickly queried and analyzed. This process involves transforming raw data into a structured format, handling schema changes, and optimizing for performance.

This article aims to provide a comprehensive guide on loading data into Apache Iceberg tables, covering the necessary prerequisites, data preparation steps, loading techniques, optimization strategies, and troubleshooting tips. It is intended for professionals with a background in data management and engineering who seek to leverage Apache Iceberg in their data lake architectures.

Understanding Apache Iceberg

What are Data Lakes, Lakehouses, and Table Formats?

For a detailed explanation of the difference between these concepts, take a look at this article.

Here’s a super succinct summary:

  • Data Lakes: Centralized repositories that store large volumes of raw data in various formats. They offer flexibility and scalability but can suffer from data management challenges.
  • Lakehouses: An evolution of data lakes that combine the best features of data lakes and data warehouses, offering structured data management and querying capabilities on top of raw data storage.
  • Table Formats: Define how data is stored, organized, and accessed within data lakes. They provide a schema, partitioning, and metadata management to facilitate efficient data querying and processing.

What is Apache Iceberg?

[Figure: Apache Iceberg]

Apache Iceberg is an open-source table format for organizing data in data lakes. It offers several key features, including:

  • Schema Evolution: Supports changes to the table schema without requiring expensive rewrites.
  • Partitioning: Efficiently handles partitioning to optimize query performance.
  • ACID Transactions: Ensures data consistency and reliability.
  • Time Travel: Allows querying historical data states.
  • Hidden Partitioning: Derives partition values from column transforms, so queries benefit from partition pruning without referencing partition columns explicitly.

Key Features and Benefits

  • Scalability: Designed to handle petabyte-scale data efficiently.
  • Flexibility: Supports multiple data processing engines like Apache Spark, Flink, and others.
  • Performance: Optimized for fast data reads and writes through file-level statistics and partition pruning.
  • Data Governance: Ensures data integrity and compliance with robust schema and metadata management.

Comparisons with Other Table Formats

Compared to other table formats like Apache Hive, Hudi, and Delta Lake, Apache Iceberg offers:

  • Better schema evolution: Handling complex schema changes without data rewrites.
  • Hidden partitioning: Reduces the complexity of managing partitions.
  • Enhanced compatibility: Works seamlessly with various data processing engines.

Use Cases in Modern Data Infrastructure

  • Real-Time Analytics: Supports low-latency data processing for real-time insights.
  • Batch Processing: Efficiently handles large-scale batch jobs with optimized data access.
  • Data Warehousing: Integrates with modern data warehouses to provide a robust data storage layer.
  • Machine Learning: Facilitates the preparation and processing of large datasets for machine learning workflows.

What does it mean to load data into Iceberg?

Loading data into Apache Iceberg involves a structured process of ingesting and managing data files within Iceberg tables, leveraging its efficient storage and query capabilities.

Here’s a detailed explanation of what this process entails:

Writing Data Files

The process begins with writing data files into Apache Iceberg tables, which are typically stored in cloud object storage systems like Amazon S3. These data files can be in various formats supported by Iceberg, such as Parquet or Avro, ensuring efficient storage and optimized query performance.

Updating Metadata

Alongside the data files, Iceberg maintains metadata that describes the structure and location of these files. This metadata includes information such as schema definition, partitioning details, file locations, and transactional information. When loading data, Iceberg updates this metadata to reflect the new data files and their properties.
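
To make the relationship between data files and metadata more concrete, here is a deliberately simplified in-memory model. The class names (DataFile, Snapshot, TableMetadata) and the s3://bucket/... paths are illustrative assumptions, not Iceberg's actual classes or layout, and real Iceberg metadata places manifest lists and manifest files between a snapshot and its data files.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class DataFile:
    path: str          # location of the data file in object storage (hypothetical)
    record_count: int  # number of rows in the file


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    parent_id: Optional[int]
    files: Tuple[DataFile, ...]  # every data file visible in this table state


@dataclass
class TableMetadata:
    snapshots: List[Snapshot] = field(default_factory=list)

    def current(self) -> Optional[Snapshot]:
        return self.snapshots[-1] if self.snapshots else None

    def append(self, new_files: List[DataFile]) -> Snapshot:
        """Record new data files by adding a snapshot; older snapshots stay untouched."""
        parent = self.current()
        existing = parent.files if parent else ()
        snap = Snapshot(
            snapshot_id=len(self.snapshots) + 1,
            parent_id=parent.snapshot_id if parent else None,
            files=existing + tuple(new_files),
        )
        self.snapshots.append(snap)
        return snap


meta = TableMetadata()
meta.append([DataFile("s3://bucket/tbl/data/a.parquet", 100)])
meta.append([DataFile("s3://bucket/tbl/data/b.parquet", 50)])
print(sum(f.record_count for f in meta.current().files))  # 150
```

Because each load adds a new snapshot instead of mutating the previous one, earlier table states remain readable, which is the idea behind time travel.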

Role of the Catalog

The Iceberg catalog is the entry point for finding and managing tables. For each table it tracks a pointer to the current metadata file, which in turn records the schema, partition spec, snapshots, and locations of data files. Common catalog implementations include AWS Glue, the Hive Metastore, and REST catalogs, each providing a unified view of your Iceberg tables.
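
One way to picture the catalog's role during a load is as a compare-and-swap on a pointer to the table's latest metadata. The sketch below is a toy model under that assumption: InMemoryCatalog, its methods, and the metadata file names are hypothetical and only mimic the idea, not any real catalog's API.

```python
import threading
from typing import Dict, Optional


class InMemoryCatalog:
    """Toy catalog: maps a table name to the location of its current metadata file."""

    def __init__(self) -> None:
        self._pointers: Dict[str, str] = {}
        self._lock = threading.Lock()

    def current_metadata(self, table: str) -> Optional[str]:
        return self._pointers.get(table)

    def commit(self, table: str, expected: Optional[str], new: str) -> bool:
        """Swap the pointer only if nobody else committed since `expected` was read."""
        with self._lock:
            if self._pointers.get(table) != expected:
                return False  # a concurrent writer won; the caller must refresh and retry
            self._pointers[table] = new
            return True


toy_catalog = InMemoryCatalog()
base = toy_catalog.current_metadata("db.events")  # None: the table has no commits yet
first = toy_catalog.commit("db.events", base, "v1.metadata.json")
stale = toy_catalog.commit("db.events", base, "v1-other.metadata.json")  # outdated base
print(first, stale, toy_catalog.current_metadata("db.events"))  # True False v1.metadata.json
```

The second commit fails because it was prepared against a table state that is no longer current, which is how a writer learns it has to rebase its changes before trying again.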

Process Overview

  1. Schema Definition: Before loading data, define the schema for the Iceberg table. This includes specifying column names, data types, and any nested structures. Iceberg supports schema evolution, so columns can be added or modified later without disrupting existing data.
  2. Data Loading: Data loading can occur through batch processing or real-time streaming, depending on the use case. Batch processing involves loading large volumes of data at scheduled intervals, while streaming supports continuous data ingestion for real-time analytics.
  3. Metadata Management: As data files are written into Iceberg tables, Iceberg writes new metadata describing them, and the catalog is pointed at the updated table state. This metadata is crucial for efficient query execution, as it enables Iceberg to skip irrelevant data files during query processing based on predicates like partition values or file statistics.
  4. Transactional Integrity: Iceberg ensures transactional integrity by supporting atomic commits and isolation levels. This guarantees that operations within a transaction either complete successfully or are rolled back entirely, maintaining data consistency and reliability.
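
The file skipping mentioned in the metadata management step can be sketched with per-file minimum and maximum values for a single column. This is a minimal illustration assuming hypothetical file names and a made-up FileStats record; Iceberg keeps comparable bounds in its manifests, but the code below is not its implementation.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class FileStats:
    path: str
    min_value: int  # lower bound of the column's values in this file
    max_value: int  # upper bound of the column's values in this file


def files_to_scan(files: List[FileStats], lo: int, hi: int) -> List[str]:
    """Keep only files whose [min, max] range overlaps the predicate lo <= column <= hi."""
    return [f.path for f in files if f.max_value >= lo and f.min_value <= hi]


manifest = [
    FileStats("data/file-a.parquet", 1, 1000),
    FileStats("data/file-b.parquet", 1001, 2000),
    FileStats("data/file-c.parquet", 2001, 3000),
]
print(files_to_scan(manifest, 1500, 2100))  # ['data/file-b.parquet', 'data/file-c.parquet']
```

The first file is never opened because its value range cannot contain a match, so the query planner can discard it from metadata alone.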

Pre-requisites for Loading Data

Build or Buy Data Integration Solution?

Before loading data into Apache Iceberg, decide whether to build a custom data integration solution or use an existing tool. Consider factors like:

  • Complexity: Custom solutions may offer more control but require significant development effort.
  • Scalability: Off-the-shelf tools often provide built-in scalability and reliability.
  • Maintenance: Evaluate the long-term maintenance and support requirements.

Tools and Technologies

Several tools and technologies can be used to load data into Apache Iceberg tables:

  • Apache Spark: A popular data processing engine with native Iceberg support.
  • Apache Flink: Real-time stream processing engine compatible with Iceberg.
  • Python: Using the pyiceberg library for loading data programmatically.
  • Estuary Flow: A real-time data integration platform that supports Iceberg.

Data Preparation

Underlying Data Formats Supported by Apache Iceberg

Apache Iceberg supports various underlying data formats, including Parquet, Avro, and ORC. Parquet is highly recommended due to its efficient columnar storage format and superior query performance.

[Figure: Column-based structure of Parquet]

If you would like to learn more about why Parquet is the best choice for data storage, check out our comprehensive guide on it here.

Data Cleaning and Transformation Best Practices

Why take extra care with data cleaning and transformation? There are a few reasons, but the most important is that downstream data consumers in the organization need to trust the data they read for analysis or machine learning.

  • Consistency: Ensure data consistency by validating and cleansing the data before loading.
  • Normalization: Normalize data to remove redundancies and improve query performance.
  • Enrichment: Enhance data quality by adding relevant information during the transformation process.

Example Scenarios for Data Preparation

  1. Log Data Processing: Clean and transform raw log data into structured records.
  2. Sales Data Aggregation: Aggregate sales data from multiple sources and normalize it for analysis.
  3. User Data Enrichment: Enrich user profiles with additional demographic information.

Handling Schema Evolution and Partitioning

  • Schema Evolution: Iceberg supports adding, dropping, and renaming columns without expensive data rewrites. Ensure your data processing pipelines can handle schema changes gracefully.
  • Partitioning: Use hidden partitioning to improve query performance without exposing the partitioning logic to end-users.
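
Hidden partitioning works by deriving partition values from ordinary columns with transforms. The sketch below reimplements three of those transforms (day, month, and integer truncate) in plain Python to show what values they produce. The function names are my own, and the code is an illustration of the idea rather than Iceberg's code.

```python
from datetime import date, datetime

EPOCH = date(1970, 1, 1)


def day_transform(ts: datetime) -> int:
    """Days since 1970-01-01, derived from a timestamp column."""
    return (ts.date() - EPOCH).days


def month_transform(ts: datetime) -> int:
    """Months since 1970-01, derived from a timestamp column."""
    return (ts.year - 1970) * 12 + (ts.month - 1)


def truncate_transform(value: int, width: int) -> int:
    """Round an integer down to a multiple of `width`."""
    return value - (value % width)


created_at = datetime(2023, 7, 1, 8, 30)
print(day_transform(created_at))      # 19539
print(month_transform(created_at))    # 642
print(truncate_transform(1234, 100))  # 1200
```

Since the partition value is computed from the column itself, a filter on the timestamp can be translated into a filter on the partition without users ever writing a separate partition column into their queries.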

Loading Data into Apache Iceberg Tables

Alright, with the knowledge out of the way, let’s take a look at actual examples of data ingestion into Iceberg!

Spark & Flink are some of the most common ways of loading data into Apache Iceberg tables. Both use well-tested underlying systems, so if you are already familiar with either, implementing a pipeline for Iceberg should be a breeze.

Load data into Apache Iceberg using Apache Spark

python
from pyspark.sql import SparkSession

# Requires the Iceberg Spark runtime JAR on the Spark classpath
spark = (
    SparkSession.builder
    .appName("IcebergExample")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hadoop")
    .config("spark.sql.catalog.spark_catalog.warehouse", "path/to/warehouse")
    .getOrCreate()
)

# Load data into a Spark DataFrame
df = spark.read.parquet("path/to/parquet/files")

# Write data to the Iceberg table (append expects the table to exist already)
df.write.format("iceberg").mode("append").save("spark_catalog.db.table_name")

Load data into Apache Iceberg using Apache Flink

java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// useBlinkPlanner() is omitted: Blink has been the only planner since Flink 1.14
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Define the source and Iceberg sink table
String sourceDDL = "CREATE TABLE source_table (...) WITH (...)";
String sinkDDL = "CREATE TABLE iceberg_table (...) WITH ("
    + "'connector' = 'iceberg', "
    + "'catalog-name' = 'my_catalog', "
    + "'catalog-database' = 'my_db', "
    + "'catalog-table' = 'my_table')";
tableEnv.executeSql(sourceDDL);
tableEnv.executeSql(sinkDDL);

// Insert data from source to sink
tableEnv.executeSql("INSERT INTO iceberg_table SELECT * FROM source_table");

If you don’t want the complexity of Spark or Flink, you still have a few options. PyIceberg is a relatively new Python library that you can use to interact with Apache Iceberg.

Let’s take a look at how that’s done in a bit more detail.

Load data into Apache Iceberg Using Python via pyiceberg

As of PyIceberg 0.6.0, write support is available through Apache Arrow. Below is an example of how to use PyIceberg to write data to an Iceberg table using mock customer support request data.

Step-by-Step Example

Step 1: Import Required Libraries

First, we need to import the necessary libraries. We'll use PyArrow to create an Arrow Table and PyIceberg to interact with the Iceberg table.

python
import pyarrow as pa
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, NestedField, StringType, TimestampType

Step 2: Create an Arrow Table

We create an Arrow Table from a list of dictionaries, each representing a customer support request. Arrow would otherwise infer 64-bit integers and plain strings, so we cast the table to types that match the Iceberg schema defined in Step 4.

python
# Arrow types mirroring the Iceberg schema from Step 4; depending on the
# PyIceberg version, append may reject a table whose types do not match.
arrow_schema = pa.schema(
    [
        pa.field("ticket_id", pa.int32(), nullable=False),
        pa.field("customer_id", pa.int32(), nullable=False),
        pa.field("issue", pa.string()),
        pa.field("created_at", pa.timestamp("us"), nullable=False),
    ]
)

df = pa.Table.from_pylist(
    [
        {"ticket_id": 1, "customer_id": 123, "issue": "Login issue", "created_at": "2023-07-01 08:30:00"},
        {"ticket_id": 2, "customer_id": 456, "issue": "Payment failure", "created_at": "2023-07-01 09:00:00"},
        {"ticket_id": 3, "customer_id": 789, "issue": "Account locked", "created_at": "2023-07-01 09:30:00"},
        {"ticket_id": 4, "customer_id": 101, "issue": "Unable to update profile", "created_at": "2023-07-01 10:00:00"},
    ],
).cast(arrow_schema)

Step 3: Load the Iceberg Catalog

Next, we load the Iceberg catalog. The catalog provides a namespace for organizing tables.

python
catalog = load_catalog("default")

Step 4: Define the Table Schema

We define the schema for our Iceberg table, specifying the fields and their data types.

python
schema = Schema(
    NestedField(1, "ticket_id", IntegerType(), required=True),
    NestedField(2, "customer_id", IntegerType(), required=True),
    NestedField(3, "issue", StringType(), required=False),
    NestedField(4, "created_at", TimestampType(), required=True),
)

Step 5: Create the Iceberg Table

Using the schema, we create a new Iceberg table in the catalog.

python
tbl = catalog.create_table("default.customer_support_requests", schema=schema)
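
Both load_catalog("default") in Step 3 and the create_table call above assume that a catalog named default has already been configured. For local experiments, one option is a SQLite-backed SQL catalog. The sketch below only builds the properties for such a catalog. The helper name local_catalog_properties and the /tmp/warehouse path are assumptions for illustration, and the commented lines show how the result might be passed to PyIceberg if it is installed with its SQL catalog support.

```python
from pathlib import PurePosixPath
from typing import Dict


def local_catalog_properties(warehouse_dir: str) -> Dict[str, str]:
    """Build properties for a SQLite-backed catalog that keeps data on the local disk."""
    root = PurePosixPath(warehouse_dir)
    return {
        "type": "sql",
        "uri": f"sqlite:///{root / 'pyiceberg_catalog.db'}",
        "warehouse": f"file://{root}",
    }


props = local_catalog_properties("/tmp/warehouse")
print(props["uri"])  # sqlite:////tmp/warehouse/pyiceberg_catalog.db

# With pyiceberg installed, these properties could replace a configuration file:
#   catalog = load_catalog("default", **props)
#   catalog.create_namespace("default")  # the namespace must exist before create_table
```

Production setups would point the same call at a shared catalog such as AWS Glue or a REST catalog instead of a local SQLite file.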

Step 6: Write Data to the Iceberg Table

We can write data to the table using the append method. PyIceberg defaults to fast append, which minimizes the amount of data written and allows for quick writes.

python
tbl.append(df)

# Alternatively, to overwrite the data:
# tbl.overwrite(df)

Step 7: Verify Data in the Iceberg Table

To verify that the data has been written correctly, we can read the table and convert it to an Arrow Table.

python
result = tbl.scan().to_arrow()
print(result)

The output will be:

python
pyarrow.Table
ticket_id: int32
customer_id: int32
issue: string
created_at: timestamp[us]
----
ticket_id: [[1, 2, 3, 4]]
customer_id: [[123, 456, 789, 101]]
issue: [["Login issue", "Payment failure", "Account locked", "Unable to update profile"]]
created_at: [[2023-07-01 08:30:00, 2023-07-01 09:00:00, 2023-07-01 09:30:00, 2023-07-01 10:00:00]]

Step 8: Append More Data

If we want to add more data to the table, we can use the append method again.

python
df_new = pa.Table.from_pylist(
    [
        {"ticket_id": 5, "customer_id": 112, "issue": "Refund request", "created_at": "2023-07-01 11:00:00"},
    ],
).cast(df.schema)  # reuse the first table's Arrow types so both appends match

tbl.append(df_new)

Reading the table again will include the new data:

python
result = tbl.scan().to_arrow()
print(result)

The output will be:

python
pyarrow.Table
ticket_id: int32
customer_id: int32
issue: string
created_at: timestamp[us]
----
ticket_id: [[1, 2, 3, 4], [5]]
customer_id: [[123, 456, 789, 101], [112]]
issue: [["Login issue", "Payment failure", "Account locked", "Unable to update profile"], ["Refund request"]]
created_at: [[2023-07-01 08:30:00, 2023-07-01 09:00:00, 2023-07-01 09:30:00, 2023-07-01 10:00:00], [2023-07-01 11:00:00]]

And that’s all the basics you need to know to use pyiceberg! If that feels like a lot of implementation work, keep reading: Estuary Flow is here to make your life easier.

Using Estuary Flow

[Figure: Estuary Flow and Apache Iceberg]

Estuary Flow is the best way to stream real-time data into Iceberg tables. Flow provides a graphical interface and pre-built connectors that allow you to get started in minutes.

Take a look at the official documentation to see the full feature set of the connector. 

Here’s how you can get up and running in just a few minutes:

Prerequisites

Before you begin, ensure you have the following:

  • Amazon S3 Bucket: An S3 bucket to write files to. (Guide to setting up a new S3 bucket)
  • AWS Glue Access: An AWS root or IAM user with access to AWS Glue. Ensure the user has read and write access to your S3 bucket. (Guide to setting up IAM permissions for AWS Glue)
  • AWS Credentials: Access key and secret access key for the AWS user. (Finding AWS credentials)

Steps to Set Up the Resources and Connector

Step 1: Set Up the S3 Bucket

Follow the AWS documentation to create a new S3 bucket:

Go to the S3 service in the AWS Management Console.

Click "Create bucket" and follow the prompts to configure the bucket settings.

Note the bucket name, as you'll need it later.

Step 2: Set Up IAM User with Glue Access

Ensure you have an IAM user with the necessary permissions:

Go to the IAM service in the AWS Management Console.

Create a new user or select an existing user.

Attach the following policies to the user:

  • AmazonS3FullAccess
  • AWSGlueServiceRole

Generate access keys for the user and note them down.

Step 3: Configure AWS Glue

Navigate to the AWS Glue service in the AWS Management Console.

Create a new Glue database if you don't have one:

Go to the "Databases" section and click "Add database".

Provide a name and description for the database.

Step 4: Configure the Connector in Estuary Flow

[Figure: Create Iceberg connector]

Within the Estuary Flow dashboard:

Go to the Connectors section.

Add a new Materialization connector and select "S3 Iceberg" from the list.

Provide the necessary configuration:

  • S3 Bucket Name
  • Glue Database Name
  • AWS Access Key ID
  • AWS Secret Access Key

Set the data batching interval as per your requirements.

[Figure: Configure Iceberg connector]

Step 5: Start Data Loading

Once the connector is set up and configured, it will start materializing delta updates from your Flow collections into the Apache Iceberg tables at the specified intervals.

Optimizing Data Loading

Apache Iceberg Performance Best Practices

Data Partitioning

Proper partitioning is crucial for optimizing query performance in Apache Iceberg tables. By strategically partitioning your data, you can significantly reduce query times by limiting the amount of data scanned. For example, partitioning by date can help narrow down queries to specific time periods, while partitioning by geographic region can optimize location-based queries.

File Sizes

Aim for optimal file sizes, typically between 128MB and 1GB, to balance between read performance and metadata management. Smaller files can lead to excessive metadata overhead, while larger files can improve scan efficiency but may impact write performance. Monitoring and adjusting file sizes based on workload patterns is essential for maintaining optimal performance.
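
Monitoring file sizes can start with a simple summary of how many data files fall below the lower end of the target range. The sketch below assumes you have already collected file sizes in bytes from your table's metadata. The function name, the report keys, and the sample sizes are illustrative.

```python
from typing import Dict, List

MB = 1024 * 1024


def file_size_report(sizes_bytes: List[int], small_threshold: int = 128 * MB) -> Dict[str, float]:
    """Summarise how many data files fall below the lower end of the target range."""
    small = [s for s in sizes_bytes if s < small_threshold]
    count = len(sizes_bytes)
    return {
        "file_count": count,
        "small_file_count": len(small),
        "small_file_ratio": len(small) / count if count else 0.0,
        "avg_size_mb": sum(sizes_bytes) / count / MB if count else 0.0,
    }


sizes = [4 * MB, 8 * MB, 256 * MB, 512 * MB]
report = file_size_report(sizes)
print(report["small_file_count"], report["small_file_ratio"], report["avg_size_mb"])  # 2 0.5 195.0
```

A rising small-file ratio after a period of frequent writes is a common signal that the table is due for compaction.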

Managing Metadata and Ensuring Data Consistency

Regularly rewriting manifests and expiring old snapshots are essential practices for maintaining performance in Apache Iceberg tables. Rewriting manifests reduces the overhead of tracking many small metadata files, while expiring old snapshots lowers storage costs and keeps query planning fast. Iceberg's built-in features, such as atomic commits and snapshot-based isolation, ensure data consistency and ACID compliance, making it easier to manage concurrent data operations without compromising data integrity.
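
A snapshot clean-up policy usually combines an age limit with a minimum number of snapshots to keep. The sketch below models that selection in plain Python. The SnapshotEntry tuple and the function name are assumptions for illustration, and a real table would be maintained with the engine's own tooling, such as the expire_snapshots procedure in Spark.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

SnapshotEntry = Tuple[int, datetime]  # (snapshot id, commit time)


def snapshots_to_expire(
    snapshots: List[SnapshotEntry], now: datetime, max_age: timedelta, retain_last: int = 1
) -> List[int]:
    """Return ids of snapshots older than `max_age`, always keeping the newest `retain_last`."""
    ordered = sorted(snapshots, key=lambda s: s[1])
    protected = {sid for sid, _ in ordered[-retain_last:]} if retain_last > 0 else set()
    cutoff = now - max_age
    return [sid for sid, ts in ordered if ts < cutoff and sid not in protected]


now = datetime(2024, 1, 10)
history = [
    (1, datetime(2024, 1, 1)),
    (2, datetime(2024, 1, 4)),
    (3, datetime(2024, 1, 9)),
]
print(snapshots_to_expire(history, now, max_age=timedelta(days=5)))  # [1, 2]
```

Keeping at least the most recent snapshot guarantees the current table state is never removed, even if no writes have happened within the retention window.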

Compaction of Iceberg Tables

Compaction is a critical process in Apache Iceberg tables that helps optimize storage and query performance. Here’s why compaction is important and how to manage it effectively:

Importance of Compaction:

  • Metadata Management: Iceberg maintains metadata files that describe the structure and location of data files. Over time, these metadata files can accumulate, leading to increased overhead and potentially slower query performance.
  • Storage Efficiency: Compaction consolidates smaller data files into larger files, reducing the number of files Iceberg needs to manage. This optimization improves storage efficiency and reduces the amount of metadata read during queries.
  • Query Performance: By reducing the number of metadata operations and data files, compaction can significantly improve query performance, especially for large-scale data sets.
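
The consolidation step can be pictured as a bin-packing problem: group small files so that each rewritten file lands near a target size. The following sketch uses a simple greedy first-fit-decreasing heuristic. The function name, the 512 MB target, and the sample sizes are assumptions, and the code is a simplified model rather than Iceberg's own planner.

```python
from typing import List

MB = 1024 * 1024


def plan_compaction(sizes_bytes: List[int], target_bytes: int = 512 * MB) -> List[List[int]]:
    """Greedy first-fit-decreasing: group small files so each group stays within the target."""
    groups: List[List[int]] = []
    for size in sorted((s for s in sizes_bytes if s < target_bytes), reverse=True):
        for group in groups:
            if sum(group) + size <= target_bytes:
                group.append(size)
                break
        else:
            groups.append([size])
    # A group with a single file gains nothing from being rewritten.
    return [g for g in groups if len(g) > 1]


small_files = [300 * MB, 200 * MB, 150 * MB, 100 * MB, 60 * MB, 600 * MB]
for group in plan_compaction(small_files):
    print([s // MB for s in group])
# [300, 200]
# [150, 100, 60]
```

In this example six files become three: the 600 MB file is left alone because it already exceeds the target, and the five smaller files are rewritten as two larger ones.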

Managing Compaction:

  • Automatic vs. Manual: Iceberg itself does not compact tables in the background. You trigger compaction explicitly, for example with the rewrite_data_files procedure in Spark, or rely on a scheduler or managed service that runs it for you based on thresholds like file count or size.
  • Compaction Policies: Decide when and how often compaction runs. Typical triggers are thresholds for file size or file count, or a fixed time interval such as a nightly job.
  • Performance Impact: Consider the performance impact of compaction on your system. While compaction improves query performance, it requires additional compute resources. Readers keep seeing the previous snapshot until the rewrite commits, but a long-running rewrite can conflict with concurrent writes that touch the same files.

Best Practices:

  • Regular Maintenance: Schedule regular compaction to prevent metadata and storage inefficiencies from accumulating over time.
  • Monitor and Tune: Monitor compaction activities and tune policies based on workload patterns and performance metrics. Adjust compaction thresholds as needed to balance between query performance and resource utilization.
  • Documentation and Training: Stay informed about Iceberg’s compaction capabilities through documentation and training resources. Engage with the Iceberg community to exchange best practices and insights on optimizing compaction for your use case.

Streaming vs. Batch Data Loading in Apache Iceberg

Batch Processing

Batch processing is suitable for large, infrequent data loads that involve complex transformations. This approach is ideal for scenarios where data can be aggregated and processed in chunks, such as end-of-day reports or monthly analytics. Batch processing allows for thorough data validation and transformation before loading into Iceberg tables, ensuring data quality and consistency.

Streaming Processing

Streaming processing is ideal for real-time data ingestion and continuous data updates. This approach supports use cases where data needs to be available for querying and analysis almost immediately after it is generated. Streaming processing ensures low-latency data updates, making it suitable for real-time analytics, monitoring dashboards, and other applications that require up-to-the-minute information.
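
In practice, streaming writers rarely commit every record on its own, because each commit produces new files and metadata. A common compromise is micro-batching: buffer records and commit them in small groups. The sketch below shows the buffering logic only. MicroBatcher and its commit callback are hypothetical names, and the callback stands in for whatever actually writes to the table.

```python
from typing import Callable, List


class MicroBatcher:
    """Buffer incoming records and hand them to `commit` in groups of `max_records`."""

    def __init__(self, commit: Callable[[List[dict]], None], max_records: int) -> None:
        self._commit = commit
        self._max_records = max_records
        self._buffer: List[dict] = []

    def add(self, record: dict) -> None:
        self._buffer.append(record)
        if len(self._buffer) >= self._max_records:
            self.flush()

    def flush(self) -> None:
        if self._buffer:
            self._commit(self._buffer)
            self._buffer = []  # start a fresh list so committed batches are not mutated


commits: List[List[dict]] = []
batcher = MicroBatcher(commit=commits.append, max_records=3)
for i in range(7):
    batcher.add({"event_id": i})
batcher.flush()  # write whatever is left, for example at shutdown
print([len(c) for c in commits])  # [3, 3, 1]
```

A larger batch size means fewer, bigger files and less compaction work later, at the cost of data becoming visible slightly later.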

Incremental Data Loading for Apache Iceberg

Implementing incremental data loading can significantly improve efficiency and reduce load times by avoiding full data refreshes. Techniques like change data capture (CDC) allow you to track and load only the data that has changed since the last update. This approach minimizes the amount of data processed and transferred, reducing the load on your system and ensuring faster data availability.
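
A simpler cousin of log-based CDC is watermark-based extraction: remember the latest change timestamp you have loaded and fetch only rows newer than that. The sketch below assumes each source row carries an updated_at value as an ISO 8601 string. The function and field names are illustrative.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]


def incremental_batch(rows: List[Row], last_watermark: str) -> Tuple[List[Row], str]:
    """Select rows changed after `last_watermark` and return them with the new watermark."""
    changed = [r for r in rows if str(r["updated_at"]) > last_watermark]
    new_watermark = max((str(r["updated_at"]) for r in changed), default=last_watermark)
    return changed, new_watermark


source = [
    {"ticket_id": 1, "updated_at": "2023-07-01T08:30:00"},
    {"ticket_id": 2, "updated_at": "2023-07-01T09:00:00"},
    {"ticket_id": 3, "updated_at": "2023-07-01T09:30:00"},
]
batch, watermark = incremental_batch(source, last_watermark="2023-07-01T08:30:00")
print([r["ticket_id"] for r in batch], watermark)  # [2, 3] 2023-07-01T09:30:00
```

Unlike true CDC, this approach cannot see deleted rows and depends on the source reliably maintaining the timestamp column, so it suits append-heavy sources best.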

Troubleshooting and Common Issues

Common Errors in Apache Iceberg Data Loading and How to Fix Them

Schema Mismatch

Schema mismatches can occur when the data schema does not match the Iceberg table schema. To resolve this, ensure that the data schema is aligned with the table schema before loading data. This may involve transforming or mapping fields to match the expected schema.
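
A lightweight pre-load check can catch many mismatches before the write fails. The sketch below compares a record against a hypothetical table schema expressed as plain Python types. TABLE_SCHEMA and schema_problems are illustrative names, and a real pipeline would derive the expected schema from the table itself.

```python
from typing import Dict, List

# Hypothetical table schema: column name -> (Python type, required?)
TABLE_SCHEMA = {
    "ticket_id": (int, True),
    "customer_id": (int, True),
    "issue": (str, False),
}


def schema_problems(record: Dict[str, object]) -> List[str]:
    """List every way a record disagrees with TABLE_SCHEMA; an empty list means it fits."""
    problems = []
    for name, (py_type, required) in TABLE_SCHEMA.items():
        value = record.get(name)
        if value is None:
            if required:
                problems.append(f"missing required field '{name}'")
        elif not isinstance(value, py_type):
            problems.append(
                f"field '{name}' expected {py_type.__name__}, got {type(value).__name__}"
            )
    for name in record.keys() - TABLE_SCHEMA.keys():
        problems.append(f"unexpected field '{name}'")
    return problems


print(schema_problems({"ticket_id": "7", "issue": "Login issue", "priority": "high"}))
```

Here the record is reported as having a string where an integer is expected, a missing required customer_id, and an extra priority field that the table does not define.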

Partitioning Issues

Partitioning issues can arise if the partitioning strategy is not optimized for the data or query patterns. Reviewing and adjusting the partitioning strategy can help improve performance. For example, if queries frequently filter by date, consider partitioning by date to enhance query efficiency.

File Format Compatibility

File format compatibility issues can occur if the data files are not in a format supported by Iceberg. Ensure that the data files are in a compatible format, such as Parquet or Avro, before loading them into Iceberg tables. This ensures smooth integration and optimal performance.

Debugging and Optimization Tips for Apache Iceberg Data Loading

Enable Logging

Enabling detailed logging can help track data loading processes and identify bottlenecks. Logs provide insights into the data loading workflow, highlighting areas where performance may be lagging and allowing for targeted optimizations. 

Make sure you research your catalog implementation as well and see what logging options it supports.
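
On the pipeline side, even basic step timing in the logs makes bottlenecks easier to spot. The sketch below uses Python's standard logging module and a small context manager. The logger name, the step names, and the stand-in work inside each step are assumptions for illustration.

```python
import logging
import time
from contextlib import contextmanager
from typing import Dict, Iterator

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
log = logging.getLogger("iceberg_load")


@contextmanager
def timed_step(name: str, timings: Dict[str, float]) -> Iterator[None]:
    """Log how long a pipeline step takes and record it in `timings`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[name] = time.perf_counter() - start
        log.info("step=%s seconds=%.3f", name, timings[name])


timings: Dict[str, float] = {}
with timed_step("read_source", timings):
    rows = [{"ticket_id": i} for i in range(1000)]
with timed_step("write_table", timings):
    written = len(rows)  # stand-in for the actual write call
```

Comparing the recorded durations across runs shows whether reading, transforming, or committing is the step that slows a load down.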

Profile Queries

Using query profiling tools can help understand performance issues and optimize accordingly. Profiling queries provides detailed information on query execution, including time spent on different operations, resource utilization, and potential bottlenecks. This information can guide optimizations to improve query performance.

Conclusion

Summary of Key Points

This article provided an in-depth guide on loading data into Apache Iceberg tables, covering the necessary prerequisites, data preparation, loading techniques, optimization strategies, and troubleshooting tips.

By following these guidelines, you can efficiently manage and query large-scale data in your data lake using Apache Iceberg.
