Data versioning with Iceberg

Apache Iceberg is an open table format for analytic datasets. Iceberg tables offer features such as schema evolution, hidden partitioning, partition layout evolution, time travel, and version rollback. This guide explains how to use Iceberg tables with Kedro. For this tutorial, we will use PyIceberg, a library that lets you interact with Iceberg tables from Python without the need for a JVM. Note that PyIceberg is a fast-evolving project and does not yet support the full range of features that Iceberg tables offer. You can use this tutorial as inspiration to extend the functionality with different compute engines such as Spark, or dataframe technologies such as Apache Arrow, DuckDB, and more, by creating your own custom datasets.

Prerequisites

You will need a Kedro project based on the spaceflights-pandas starter, which contains example pipelines to work with. In this tutorial the project is called kedro-iceberg. If you haven't created it already, you can generate a new Kedro project with the following command:

kedro new --starter spaceflights-pandas --name kedro-iceberg

To interact with Iceberg tables, you will also need the pyiceberg package installed. You can install it by adding the following line to your requirements.txt:

pyiceberg[pyarrow]~=0.8.0

Depending on your choice of storage, you may also need optional dependencies. Consult the PyIceberg installation guide and update the line above with any extras you need.
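For example, if you plan to store tables on S3, the line might look like the following (the s3fs extra is shown as an illustration; check the PyIceberg installation guide for the extras that match your storage):

pyiceberg[pyarrow,s3fs]~=0.8.0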

Now you can install the project requirements with the following command:

pip install -r requirements.txt

Set up the Iceberg catalog

Iceberg tables are managed by a catalog, which is responsible for the metadata of the tables. In production, this could be a Hive, Glue, or other catalog supported by Apache Iceberg. Iceberg also supports various storage options such as S3, HDFS, and more. There are multiple ways to configure the catalog, credentials, and object storage to suit your needs; refer to the PyIceberg configuration guide for details. For this tutorial, we will use the SqlCatalog, which stores the metadata in a local SQLite database and uses the local filesystem for storage.

Create a temporary location for Iceberg tables by running the following command:

mkdir -p /tmp/warehouse

There are multiple ways to configure the catalog; for this tutorial, use the ~/.pyiceberg.yaml file. By default, PyIceberg looks for a .pyiceberg.yaml file in your home directory. Create the file, or update it if it already exists, with the following content:

catalog:
  default:
    type: sql
    uri: sqlite:////tmp/warehouse/pyiceberg_catalog.db
    warehouse: file:///tmp/warehouse/warehouse

You can check that the configuration loads correctly by opening a Python shell with the ipython command and running the following code:

from pyiceberg.catalog import load_catalog
catalog = load_catalog(name="default")
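
If the configuration is found, the call returns without error. As an optional extra check (a small sketch using PyIceberg's catalog API), you can list the namespaces in the catalog, which will be empty for a freshly created warehouse:

catalog.list_namespaces()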

Define a custom dataset to use Iceberg tables

To use Iceberg tables with Kedro, you need to define a custom dataset that uses the PyIceberg library. Create a new file called pyiceberg_dataset.py in the src/kedro_iceberg/ directory of your project and copy the following code into it:

import pyarrow as pa
from kedro.io.core import AbstractDataset, DatasetError
from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import NoSuchTableError

DEFAULT_LOAD_ARGS = {"load_version": None}
DEFAULT_SAVE_ARGS = {"mode": "overwrite"}


class PyIcebergDataset(AbstractDataset):
    """Kedro dataset that reads and writes Iceberg tables through PyIceberg."""

    def __init__(
            self,
            catalog,
            namespace,
            table_name,
            load_args=DEFAULT_LOAD_ARGS,
            scan_args=None,
            save_args=DEFAULT_SAVE_ARGS,
    ):
        self.table_name = table_name
        self.namespace = namespace
        # Resolve the catalog by name from the PyIceberg configuration,
        # for example the "default" catalog defined in ~/.pyiceberg.yaml.
        self.catalog = load_catalog(catalog)
        self.load_args = load_args
        # The table may not exist yet; it is created on the first save().
        self.table = self._load_table(namespace, table_name)
        self.save_args = save_args
        self.scan_args = scan_args

    def load(self):
        """Load the table, optionally filtered by ``scan_args``, as a pandas DataFrame."""
        self.table = self.catalog.load_table((self.namespace, self.table_name))
        if self.scan_args:
            scan = self.table.scan(**self.scan_args)
        else:
            scan = self.table.scan()
        return scan.to_pandas()

    def _load_table(self, namespace, table_name):
        try:
            return self.catalog.load_table((namespace, table_name))
        except NoSuchTableError:
            return None

    def save(self, data) -> None:
        """Save a pandas DataFrame as an Iceberg table, creating the table if necessary."""
        arrow = pa.Table.from_pandas(data)
        if not self.table:
            self.catalog.create_namespace_if_not_exists(self.namespace)
            self.table = self.catalog.create_table(
                (self.namespace, self.table_name), schema=arrow.schema
            )
        if self.save_args.get("mode") == "overwrite":
            self.table.overwrite(arrow)
        elif self.save_args.get("mode") == "append":
            self.table.append(arrow)
        else:
            raise DatasetError("Mode not supported")

    def _describe(self) -> dict:
        return {"namespace": self.namespace, "table_name": self.table_name}

    def exists(self):
        return self.catalog.table_exists((self.namespace, self.table_name))

    def inspect(self):
        """Return the table's InspectTable object, which exposes table metadata."""
        return self.table.inspect

This dataset allows you to load Iceberg tables as pandas dataframes, and to load a subset of a table by passing the scan_args parameter. The save() method saves a dataframe as an Iceberg table; you can control whether the data overwrites or is appended to the table by passing the save_args parameter. The inspect() method returns an InspectTable object which contains metadata about the table. You can extend the dataset to support more features of Iceberg tables; refer to the PyIceberg API documentation to see what you can do with the Table object.
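
The snippet below is a minimal sketch of using the dataset directly, outside a pipeline. It assumes the default catalog configured above; the table name scratch_table and the example columns are purely illustrative:

import pandas as pd

from kedro_iceberg.pyiceberg_dataset import PyIcebergDataset

# Point the dataset at a (possibly not yet existing) Iceberg table
dataset = PyIcebergDataset(
    catalog="default",
    namespace="default",
    table_name="scratch_table",
    save_args={"mode": "overwrite"},
)

# Write a DataFrame; the namespace and table are created on the first save
dataset.save(pd.DataFrame({"id": [1, 2], "value": ["a", "b"]}))

# Read it back as a pandas DataFrame
df = dataset.load()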

Using Iceberg tables in the catalog

Save the dataset as an Iceberg table

Now update the entry for the model_input_table dataset in conf/base/catalog.yml to use the custom PyIcebergDataset:

model_input_table:
  type: kedro_iceberg.pyiceberg_dataset.PyIcebergDataset
  catalog: default
  namespace: default
  table_name: model_input_table

Now run your Kedro project with the following command:

kedro run

You can inspect the model_input_table dataset created as an Iceberg table by running the following command:

tree /tmp/warehouse

The output should look something like:

/tmp/warehouse
├── pyiceberg_catalog.db
└── warehouse
    └── default.db
        └── model_input_table
            ├── data
            │   ├── 00000-0-a3d0f3e6-a9b4-44e4-8dac-95d4b5c14b29.parquet
            │   └── 00000-0-baa30a43-2cad-4507-967e-84c744d69c9b.parquet
            └── metadata
                ├── 00000-e66a465e-cdfa-458e-aaf3-aed48ac49157.metadata.json
                ├── 00001-d1fa7797-ef6f-438e-83c0-bdaabf1bd8de.metadata.json
                ├── 00002-f0b5294e-a0be-450c-95fe-4cee96c9a311.metadata.json
                ├── 836ecf2e-e339-44d8-933a-bd978991ea3e-m0.avro
                ├── a3d0f3e6-a9b4-44e4-8dac-95d4b5c14b29-m0.avro
                ├── baa30a43-2cad-4507-967e-84c744d69c9b-m0.avro
                ├── snap-3087457244520966174-0-836ecf2e-e339-44d8-933a-bd978991ea3e.avro
                ├── snap-3885749350984242152-0-a3d0f3e6-a9b4-44e4-8dac-95d4b5c14b29.avro
                └── snap-7387825159950300388-0-baa30a43-2cad-4507-967e-84c744d69c9b.avro

Suppose the upstream datasets companies, shuttles, or reviews are updated. You can run the following command to generate a new version of the model_input_table dataset:

kedro run --to-outputs=model_input_table

You can use the tree /tmp/warehouse or find /tmp/warehouse commands again to inspect the new data and metadata files created for the updated dataset.
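
For example, to list only the Parquet data files (an illustrative invocation):

find /tmp/warehouse -name "*.parquet"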

Load a specific dataset version

To load a specific version of the dataset, specify the snapshot_id of the version you want in the scan_args of the dataset's catalog entry. You can get the snapshot_id from the history of the table; the section below explains how to inspect the history of the table in interactive mode.

model_input_table:
  type: kedro_iceberg.pyiceberg_dataset.PyIcebergDataset
  catalog: default
  namespace: default
  table_name: model_input_table
  scan_args:
    snapshot_id: <snapshot_id>
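
Because scan_args is forwarded directly to PyIceberg's Table.scan(), you can combine snapshot_id with other scan options. The following is a sketch assuming Table.scan()'s selected_fields and limit parameters; the column names are illustrative, so replace them with columns from your own table:

model_input_table:
  type: kedro_iceberg.pyiceberg_dataset.PyIcebergDataset
  catalog: default
  namespace: default
  table_name: model_input_table
  scan_args:
    snapshot_id: <snapshot_id>
    selected_fields: ["engine_type", "price"]
    limit: 1000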

Inspect the dataset in interactive mode

You can inspect the history, metadata, schema, and more of the Iceberg table in an interactive Python session. To start the IPython session with Kedro components loaded, run:

kedro ipython

Load the instance of the PyIcebergDataset using the catalog.datasets attribute:

In [1]: model_input_table = catalog.datasets['model_input_table']

You can inspect the history of the Iceberg table by accessing the InspectTable object with the inspect() method:

In [2]: inspect_table = model_input_table.inspect()

Now you can call the history() method on the InspectTable object to get the history of the table:

In [3]: inspect_table.history()
Out [3]:

pyarrow.Table
made_current_at: timestamp[ms] not null
snapshot_id: int64 not null
parent_id: int64
is_current_ancestor: bool not null
----
made_current_at: [[2025-02-26 11:42:36.871,2025-02-26 12:08:38.826,2025-02-26 12:08:38.848]]
snapshot_id: [[9089827653240705573,5091346767047746426,7107920212859354452]]
parent_id: [[null,9089827653240705573,5091346767047746426]]
is_current_ancestor: [[true,true,true]]

Alternatively, you can call the history() method on the pyiceberg.table.Table object directly, which gives a more concise output:

In [4]: model_input_table.table.history()
Out [4]:
[
    SnapshotLogEntry(snapshot_id=7387825159950300388, timestamp_ms=1741190825900),
    SnapshotLogEntry(snapshot_id=3087457244520966174, timestamp_ms=1741190833531),
    SnapshotLogEntry(snapshot_id=3885749350984242152, timestamp_ms=1741190833554)
]
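
To reuse one of these versions with the scan_args approach shown earlier, copy the snapshot_id of the entry you want from this history, for example:

In [5]: model_input_table.table.history()[0].snapshot_id
Out [5]: 7387825159950300388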

Similarly, you can call other methods on the InspectTable object, such as snapshots() and partitions(), to get more information about the table, or call schema() on the Table object to see its current schema.
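
For example, continuing the session above (output omitted for brevity):

In [6]: inspect_table.snapshots()         # one row per snapshot of the table
In [7]: inspect_table.partitions()        # partition-level statistics
In [8]: model_input_table.table.schema()  # the current Iceberg schema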