
PyIceberg Tutorial: Manage Apache Iceberg Tables in Python

Learn how to use PyIceberg, a lightweight Python API for Apache Iceberg. Set up catalogs, create tables, and query data without Spark or Trino.


Apache Iceberg is an open table format designed for big data workloads. It enables ACID transactions, schema evolution, and efficient querying for large datasets. Working with Iceberg tables traditionally required a heavy setup, such as a Spark cluster or a Trino engine. PyIceberg changes that by providing a lightweight Python-native API for working with Iceberg tables without needing Spark or Flink for basic operations.

This guide walks you through setting up PyIceberg, configuring catalogs, creating tables, querying data, and managing schema evolution.

Why Use PyIceberg?

As Python is one of the most popular programming languages for data engineering, PyIceberg offers a convenient and accessible way to work with Iceberg. It provides a simple and intuitive API that allows developers to create, read, update, and delete data in Iceberg tables and perform schema evolution and partitioning operations.

With PyIceberg, you can leverage the power of Iceberg's features without the complexity of setting up and managing a distributed processing cluster. This makes it ideal for Python developers who want to integrate Iceberg easily into their data pipelines and applications.

Advantages of PyIceberg

  • Lightweight and Modular: Unlike Spark or Flink, PyIceberg does not require a distributed processing engine to interact with Iceberg tables.
  • Pythonic API: Designed for Python developers, PyIceberg provides a straightforward API to manage Iceberg tables.
  • Interoperability: Works seamlessly with query engines like DuckDB, making ad-hoc querying much easier (see the short sketch after this list).
  • Schema Evolution and Partitioning: PyIceberg fully supports schema evolution and flexible partitioning strategies, allowing incremental changes without table recreation.
  • Decoupled Storage and Compute: Iceberg's design allows storage and compute to scale independently, making it more cost-efficient than traditional data warehouses.
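
As a quick preview of that DuckDB interoperability, here is a minimal sketch of handing an Iceberg scan to DuckDB for ad-hoc SQL. It assumes the local catalog and the transactions.sales_data table that we set up later in this guide, and that PyIceberg was installed with the duckdb extra (pip install "pyiceberg[duckdb]").

python
from pyiceberg.catalog import load_catalog

# Load the locally configured catalog and the table to query
catalog = load_catalog(name='local')
table = catalog.load_table('transactions.sales_data')

# Register the scan result as an in-memory DuckDB table and run ad-hoc SQL against it
con = table.scan().to_duckdb(table_name='sales_data')
print(con.execute("SELECT category, SUM(amount) FROM sales_data GROUP BY category").fetchall())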

Comparing PyIceberg to Alternatives

| Feature | PyIceberg | Apache Spark | Trino/Presto | DuckDB |
| --- | --- | --- | --- | --- |
| Lightweight | ✅ Yes | ❌ No | ❌ No | ✅ Yes |
| Schema Evolution | ✅ Yes | ✅ Yes | ✅ Yes | ❌ No |
| ACID Transactions | ✅ Yes | ✅ Yes | ✅ Yes | ❌ No |
| Query Engine Dependency | ❌ No | ✅ Yes | ✅ Yes | ❌ No |
| Supports Iceberg Tables | ✅ Yes | ✅ Yes | ✅ Yes | ❌ No (extension) |

1. Setting Up PyIceberg

Before we start querying Iceberg tables, we need to install and configure PyIceberg.

Installing PyIceberg

We can install PyIceberg using pip, including necessary extras for SQL-based catalogs and local file storage:

bash
pip install "pyiceberg[sql-sqlite,pyarrow]"
  • sql-sqlite enables using SQLite as a lightweight metadata catalog for Iceberg tables.
  • pyarrow provides support for working with Iceberg tables using Apache Arrow, enabling efficient in-memory data operations.

2. Configuring an Iceberg Catalog

Creating a Storage Directory

Iceberg tables require a catalog to store metadata. For this example, we’ll create a local SQLite-backed Iceberg catalog and persist data on the filesystem.

bash
mkdir -p iceberg_catalog

Configuring PyIceberg

Next, we define the catalog configuration in a .pyiceberg.yaml file:

yaml
catalog:
  local:
    uri: sqlite:///iceberg_catalog/catalog.db
    warehouse: file://iceberg_catalog
  • uri specifies the SQLite database for storing Iceberg metadata.
  • warehouse defines where Iceberg table data will be stored.

3. Loading the Catalog in Python

Once the configuration is in place, we need to load the Iceberg catalog inside Python.

Setting the Environment

First, we tell PyIceberg where to look for the configuration file:

python
import os

os.environ['PYICEBERG_HOME'] = os.getcwd()

Initializing the Iceberg Catalog

Now, we load the catalog using PyIceberg’s API:

python
from pyiceberg.catalog import load_catalog

catalog = load_catalog(name='local')

To verify that everything is set up correctly, print the catalog properties:

python
print(catalog.properties)

Expected Output

python
{'uri': 'sqlite:///iceberg_catalog/catalog.db', 'warehouse': 'file://iceberg_catalog'}

This confirms that PyIceberg is correctly configured and ready to use.
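
As an extra sanity check, you can also ask the catalog which namespaces it currently knows about; on a freshly created catalog this simply returns an empty list:

python
# A brand-new catalog has no namespaces yet, so this prints []
print(catalog.list_namespaces())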

4. Creating an Iceberg Table

Now that our catalog is set up, let's create an Iceberg table to store structured data.

Defining a Schema

We define a schema with three fields:

  • id: A unique identifier (integer, required).
  • category: A string describing the data category (required).
  • amount: A numerical value (float, required).
python
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, IntegerType, StringType, FloatType

schema = Schema(
    NestedField(field_id=1, name="id", field_type=IntegerType(), required=True),
    NestedField(field_id=2, name="category", field_type=StringType(), required=True),
    NestedField(field_id=3, name="amount", field_type=FloatType(), required=True),
)

Creating a Namespace

We create a namespace (database) inside the catalog:

python
catalog.create_namespace_if_not_exists('transactions')

Creating the Iceberg Table

Now, we create a table inside the transactions namespace:

python
iceberg_table = catalog.create_table_if_not_exists(
    identifier='transactions.sales_data',
    schema=schema
)

Inspecting the Table

To confirm that the table was created successfully, we print its schema:

python
print(iceberg_table.schema())

Expected Output

python
table {
  1: id: required int
  2: category: required string
  3: amount: required float
}

Now, we’re ready to insert and query data from our Iceberg table.

5. Adding Data to the Iceberg Table

Let’s insert sample transaction data into our sales_data table.

Creating a PyArrow Table

We use Apache Arrow to create a structured in-memory table before appending it to Iceberg.

python
import pyarrow as pa

# Create a PyArrow table with some sample transactions
pa_table_data = pa.Table.from_pylist([
    {'id': 1, 'category': 'electronics', 'amount': 299.99},
    {'id': 2, 'category': 'clothing', 'amount': 79.99},
    {'id': 3, 'category': 'groceries', 'amount': 45.50},
    {'id': 4, 'category': 'electronics', 'amount': 999.99},
    {'id': 5, 'category': 'clothing', 'amount': 120.00},
], schema=iceberg_table.schema().as_arrow())

Appending Data to the Iceberg Table

Now, we append the data to our Iceberg table:

python
iceberg_table.append(df=pa_table_data)

Verifying the Inserted Data

To confirm the data was inserted successfully, we query the table:

python
print(iceberg_table.scan().to_arrow().to_string(preview_cols=10))

Expected Output

python
pyarrow.Table
id: int32 not null
category: large_string not null
amount: float not null
----
id: [[1,2,3,4,5]]
category: [["electronics","clothing","groceries","electronics","clothing"]]
amount: [[299.99,79.99,45.5,999.99,120]]

Querying Iceberg Tables with PyIceberg

PyIceberg provides powerful querying capabilities for Iceberg tables, enabling flexible data exploration using predicate filtering and column selection. Let’s walk through some practical examples.

1. Query By Name (String Matching)

Suppose we want to retrieve all records where the name column starts with "metric". We can use the StartsWith expression.

python
from pyiceberg.expressions import StartsWith

# Query rows where the 'name' column starts with 'metric'
result = iceberg_table \
    .scan(row_filter=StartsWith('name', 'metric')) \
    .to_arrow()

print(result.to_string(preview_cols=10))

Output (Example)

python
pyarrow.Table
id: int32 not null
name: large_string not null
value: int32 not null
----
id: [[1,2,3]]
name: [["metric_1","metric_2","metric_3"]]
value: [[5,12,18]]

2. Query Latest Entries (Filtering by Timestamp)

Let's assume our table has a column created_at that stores timestamps. We can retrieve all records created in the last 7 days using GreaterThan.

python
from pyiceberg.expressions import GreaterThan
import datetime

# Get the timestamp for 7 days ago
seven_days_ago = (datetime.datetime.utcnow() - datetime.timedelta(days=7)).isoformat()

# Query rows where 'created_at' is greater than seven days ago
result = iceberg_table \
    .scan(row_filter=GreaterThan('created_at', seven_days_ago)) \
    .to_arrow()

print(result.to_string(preview_cols=10))

3. Query with Multiple Conditions (Category & Value Filtering)

Imagine we have a category column, and we want to retrieve all records where category is "A" and value is greater than 20.

python
from pyiceberg.expressions import And, EqualTo, GreaterThan

# Query rows where 'category' is "A" and 'value' is greater than 20
result = iceberg_table \
    .scan(
        row_filter=And(
            EqualTo('category', 'A'),
            GreaterThan('value', 20)
        )
    ) \
    .to_arrow()

print(result.to_string(preview_cols=10))

4. Retrieve Specific Columns for Performance Optimization

Instead of scanning the entire dataset, select only the necessary columns to improve performance.

python
from pyiceberg.expressions import EqualTo

# Retrieve only 'id' and 'value' for records where 'id' = 3
result = iceberg_table \
    .scan(
        row_filter=EqualTo('id', 3),
        selected_fields=['id', 'value']
    ) \
    .to_arrow()

print(result.to_string(preview_cols=10))

5. Find Null or Missing Values

If a dataset has missing or null values in a column, we can use IsNull to find them.

python
from pyiceberg.expressions import IsNull

# Find rows where 'value' is NULL
result = iceberg_table \
    .scan(row_filter=IsNull('value')) \
    .to_arrow()

print(result.to_string(preview_cols=10))

6. Query Using a List of IDs (IN Clause Alternative)

To retrieve records matching multiple IDs at once, use In.

python
from pyiceberg.expressions import In

# Query rows where 'id' is in [2, 4, 6]
result = iceberg_table \
    .scan(row_filter=In('id', [2, 4, 6])) \
    .to_arrow()

print(result.to_string(preview_cols=10))

Learn more about Iceberg catalog options in this Polaris vs. Unity Catalog comparison.

Updating and Deleting Data

Updating Data in Iceberg

This example updates the updated_at column for the row where id = 4. The new value is the current UTC timestamp in ISO 8601 format.

We use pa.string() to ensure compatibility if the column is stored as a string. If it's stored as a timestamp, we would use pa.timestamp('us').

python
import pyarrow as pa
from pyiceberg.expressions import EqualTo
import datetime

# Query for the row to update
filtered_table = iceberg_table \
    .scan(row_filter=EqualTo('id', 4)) \
    .to_arrow()

# Get the index of the 'updated_at' column and retrieve its field
updated_at_col_index = filtered_table.column_names.index('updated_at')
updated_at_col_field = filtered_table.field(updated_at_col_index)

# Generate the current timestamp in ISO 8601 format
current_utc_timestamp = datetime.datetime.utcnow().isoformat()

# Replace the 'updated_at' column with the new timestamp value
updated_table = filtered_table.set_column(
    updated_at_col_index,
    updated_at_col_field,
    pa.array([current_utc_timestamp], type=pa.string())  # Ensure the data type aligns
)

# Overwrite the existing row in the Iceberg table
iceberg_table.overwrite(
    df=updated_table,
    overwrite_filter=EqualTo('id', 4)
)

Deleting Data from an Iceberg Table

Let's take a look at how we can delete a record where the id field is equal to 1.

python
from pyiceberg.expressions import EqualTo

iceberg_table.delete(delete_filter=EqualTo('id', 1))

Schema Evolution in Iceberg

Schema changes are a common requirement, and Iceberg supports safe, in-place schema evolution. For example, to add a new column to the sales_data table we created earlier:

python
from pyiceberg.types import StringType

# Add a new optional 'email' column to the existing table
with iceberg_table.update_schema() as update:
    update.add_column("email", StringType())

# Confirm the change
print(iceberg_table.schema())

Partitioning Data in Iceberg

Partitioning in Iceberg is dynamic and supports transformations like bucket, truncate, and identity. For example, to partition the table by hashing id into buckets:

python
from pyiceberg.transforms import BucketTransform

# Partition the table by hashing 'id' into 4 buckets
with iceberg_table.update_spec() as update:
    update.add_field("id", BucketTransform(4), "id_bucket")

Conclusion

PyIceberg simplifies working with Apache Iceberg by providing a Python-native API that doesn't require Spark or Trino. You can manage Iceberg tables, insert data with PyArrow, and query efficiently with engines like DuckDB. Compared to querying plain files with DuckDB alone, Iceberg adds schema evolution, ACID guarantees, and better support for large-scale data processing, making it ideal for long-term data storage and processing needs.

About the author

Dani Pálma, Head of Data Engineering Marketing

Dani is a data professional with a rich background in data engineering and real-time data platforms. At Estuary, Daniel focuses on promoting cutting-edge streaming solutions, helping to bridge the gap between technical innovation and developer adoption. With deep expertise in cloud-native and streaming technologies, Dani has successfully supported startups and enterprises in building robust data solutions.
