Bering Note – formerly 流沙河鎮

情報技術系のこと書きます。

Iceberg Summit 2026「Rust が拓く Iceberg の未来: Rust データエコシステムへの Iceberg 統合」- Sean (Yexiangchang), AWS

Iceberg Summit 2026のセッション「A Rusty Future: Bringing Iceberg to the Rust Data Ecosystem」をまとめます。

A Rusty Future: Bringing Iceberg to the Rust Data Ecosystem - YouTube

スピーカー

このセッションは、AWS EMR で約 4 年にわたり活動する Sean 氏(Yexiangchang)によって行われました。もともと Apache Hudi の開発に携わり、現在も Hudi のコミッターを務めています。近年は Iceberg Rust の開発に最も力を注いでおり、本セッションではその実装の最前線を語っています。

なぜ Iceberg Rust なのか


Iceberg Rust を選ぶ理由は、Iceberg という形式の性質と、Rust という言語の特性の二つに分けて考えられます。Iceberg を語るとき、開発者の多くはメタデータのツリー構造や、その下にデータファイルがどう並んでいるかを思い浮かべます。しかし一歩引いて見れば、Iceberg はストレージ上に存在するバイト列にすぎません。本来、特定のエンジンや言語に縛られない、エンジン非依存・言語非依存の形式です。だからこそ実装の多様性を受け入れる余地があります。

もう一方の Rust という言語そのものにも利点があります。最も分かりやすいのは性能です。GC(ガベージコレクション)を持たないため、メモリ管理に起因する問題に遭遇しにくくなります。加えて、Rust は強力なメモリ安全性と並行処理のサポートを言語レベルで備えています。

Iceberg Rust を語るうえで、PyIceberg の存在は外せません。両者は密接に結びついて機能します。Python は AI の台頭とともにますます支配的な言語になっていますが、大規模なデータ処理では性能と並行処理に限界を抱えています。この弱点は、PyIceberg から Iceberg Rust のバインディングを呼び出すことで緩和できます。バインディングは PyO3(Rust と Python を相互に呼び出すためのライブラリ)を使って実現しています。

現在、PyIceberg からいくつかの操作を Iceberg Rust 側に委譲できます。マニフェストの読み取りと partition transform はどちらも計算負荷の高い処理で、Rust 側で実行する価値があります。さらに DataFusion の TableProvider も委譲先になります。これは PyIceberg の読み取りを DataFusion に肩代わりさせる仕組みで、読み取りのワークロードを実際には DataFusion 内の Rust コードで動かせるという形です。

この 1 年のコミュニティの進展


この1年で Iceberg Rust コミュニティは多くの機能を積み上げてきました。挙げられている進展のひとつが V3 metadata support で、Iceberg のテーブル仕様 V3 のメタデータを読み解けるようになっています。

Storage Trait は、ストレージとの I/O 実装を差し替え可能にする仕組みです。Iceberg Java の FileIO に近く、ストレージバックエンドとどうやり取りするかを利用者が独自実装としてプラグインできます。これに関連して Catalog Builder も用意されました。Rust にはリフレクションが無いため、Java のように設定文字列でカタログ種別や FileIO 実装を渡して動的に組み立てるやり方が使えません。そこで私たちは API を明示的に公開し、利用者が自前のカタログやストレージを Iceberg Rust に注入できるようにしています。Catalog Builder は、そのカタログ構築を手軽に行うための API です。

マニフェストリーダーの Python binding は、PyIceberg からマニフェスト読み取りを Rust 側に委譲する経路として既に触れたとおりです。さらに delete file scanning support が加わったことで、削除ファイルを伴うスキャンに対応し、Iceberg V2 の読み取りを一通り満たせる状態になりました。これに加えて性能改善や DataFusion との互換性向上も進んでいます。

この1年の取り組みのなかで、私たちが力を入れたのが Iceberg Rust の書き込み機能です。昨年までの Iceberg Rust には、まともな writer の仕組みがほとんどありませんでした。

書き込みを実現するために、4 つの機能が同じ目標に向けて積み上げられています。リトライ可能なトランザクション(Retryable transactions)、カタログ側の Catalog::update_table 実装、パーティション単位で書き出す Partitioning writer、そして DataFusion の INSERT INTO サポートです。最終的なゴールは 4 番目で、DataFusion の DML を使ってパーティションテーブルへ INSERT INTO できるようにすることにあります。これらは複雑に見えても、Iceberg Rust コミュニティが課題にどう取り組んでいるかを示す一連の流れになっています。

リトライ可能なトランザクションの設計


書き込みを支える最初の構成要素が、リトライ可能なトランザクションです。トランザクションは複数の Iceberg アクションの集まりで、アクションとは set location、統計情報の更新、ファイルの fast append といった個々の操作を指します。トランザクションをリトライ可能にするには、その構成要素であるアクション自体もリトライ可能でなければなりません。

従来の Iceberg Rust の Transaction 構造体は4つのフィールドを持っていました。参照を保持する base_table、ステージ用の current_table、そして updates: Vec<TableUpdate>requirements: Vec<TableRequirement> です。ここで注目したいのは、updatesrequirements がアクションを base_table に適用した結果として生成される点です。つまりこれらは base_table に対する差分(delta)でしかありません。アクションを base_table に適用して states(updates + requirements)が得られる、という関係になっています。

差分だけを保持する設計が、リトライの妨げになります。トランザクションが他の書き込みと競合して失敗した場合、base_table をリフレッシュして最新の状態を取り込む必要があります。ところが構造体が保持しているのは古い base_table に対する差分だけなので、新しい base_table に対してその差分を再適用しても意味のある結果になりません。

ここから導かれる観察は、アクションを states として保存するのではなく、トランザクションがアクションそのものを直接保持すべきだ、というものです。トランザクションにアクションを定義して格納しておけば、競合が起きても base_table を更新したうえでアクションを最初から適用し直せます。残る問題は、そのアクションをどう定義するかです。

trait TransactionAction の導入


この問題に対して、私たちは TransactionAction という新しい trait を定義しました。新しい Transaction 構造体は差分の保持をやめ、table: Tableactions: Vec<BoxedTransactionAction> だけを持ちます。以前の updatesrequirements は捨て去り、適用すべきアクションのリストだけを抱える形です。

この設計の利点は競合時の挙動に表れます。トランザクションが衝突したら base_table をリフレッシュし、保持しているアクションのリストを丸ごと再適用するだけで済みます。状態の差分ではなく、適用したアクションそのものを持っているからです。ここで重要な前提として、これが成り立つのは TransactionAction の実装がテーブルの情報に依存しない不変な値(immutables)だけを保持する場合に限られます。アクションとテーブルを組み合わせて初めて states が得られる(Action + Table = States)という関係です。

例: FastAppendAction


この考え方は FastAppendAction を例にすると具体的になります。左側の構造体は check_duplicatecommit_uuidkey_metadatasnapshot_propertiesadded_data_files といった状態を持ちます。これらはいずれもテーブルの状態に依存しません。そのため base_table をリフレッシュしても値はそのまま有効で、後から再適用できます。

右側が TransactionAction trait の実装です。commit の中では、テーブルの状態とアクションが抱える状態を組み合わせ、それらを SnapshotProducer に渡します。SnapshotProducerActionCommit、すなわち updates と requirements を返します。左下のフローチャートが示すとおり、FastAppendAction と Table を commit に通すと ActionCommit(updates, requirements)が生成される、という流れです。なお FastAppendAction はアクションの中でも複雑な部類で、set location のような単純なアクションであれば commit の実装はさらに簡潔になりますが、論理の流れは同じフローチャートに従います。

カタログ更新とアトミックスワップ


書き込みを成立させる二つ目のステップが、Catalog::update_table の実装です。API のシグネチャは async fn update_table(&self, commit: TableCommit) -> Result<Table>; で、引数の TableCommit を受け取り、更新後の Table を返します。

この API が担うのは、Iceberg のアトミックスワップそのものです。図にあるように、カタログはメタデータファイルへのポインタを持っています。新しいメタデータ(Version N+1)を生成してストレージに書き込んだあと、カタログが指すポインタを旧バージョン(Version N)から新バージョンへ切り替えます。ポインタの差し替えが原子的に行われるため、次に読み取るときは常に最新のメタデータが返ります。この差し替えの仕組みは、これまで Iceberg Rust のカタログ実装には組み込まれていませんでした。

update_table の中身は5つの処理に分解できます。最初にカタログから既存のテーブル(base table)をロードします。次に引数で渡された TableCommit を適用します。TableCommit は複数の ActionCommit を集約したもので、これまで説明してきた updates と requirements を束ねたものです。これをロード済みのテーブルに適用すると、更新後のメタデータをメモリ上に持った staged table が得られます。

staged table ができた時点で、その新しいメタデータをストレージに書き込みます。コード上は staged_table.metadata().write_to(file_io, metadata_location) の形で、テーブルが保持する file_io とメタデータの書き込み先パスを使って永続化します。最後にメタデータポインタを差し替え、次回以降の読み取りで正しいバージョンへ誘導されるようにします。

この例は in-memory catalog という最も単純な実装に基づいています。root_namespace_state.lock() でロックを取り、ロック下でテーブルをロードし、最終的に commit_table_update(staged_table) でポインタを切り替える流れです。カタログの実装は数多くありますが、いずれもこの同じロジックに従うことになります。

パーティショニングに対応したライタ層


ライタ層は内側ほど低レベルになる入れ子構造で組み立てています。最も内側の FileWriter(実体は ParquetWriter)が Parquet ファイルを直接書き出し、その外側を RollingFileWriter が包んでファイルサイズに応じた切り替えを担います。さらに外側に DataWriter または DeleteWriter があり、データファイルか delete ファイルかという書き込み対象の違いを吸収します。これらをまとめて IcebergWriter と呼んでいます。

ここまでの層はいずれもパーティション値を意識しません。つまり従来はパーティションのないデータしか書けませんでした。新しく追加したのが、その外側を包む PartitioningWriter 層です。エンジン固有の TaskWriter から渡されたデータを、パーティション値に応じて適切な内側のライタへ振り分ける役割を持ちます。

PartitioningWriter には FanoutWriter と ClusteredWriter の2種類を用意しています。FanoutWriter は入力データをパーティション値で分割し、パーティションごとにライタを起動して並行に書き込みます。ClusteredWriter は単一スレッドのライタで、入力データがパーティション値でソート済みであることを前提とします。ソートされていないデータを ClusteredWriter に流すと、複数のパーティションの間を行ったり来たりしてライタの開閉を繰り返すため、効率が落ちるうえに小さなファイルが大量に生成されてしまいます。

INSERT INTO を実現するため、Iceberg Rust 側から DataFusion 向けの実行ノードを提供しています。論理プラン・実行プランのオペレータとして組み込み、それらを組み合わせて書き込みの一連の処理を構成します。

デフォルトで用意しているノードは6つです。Input Node は DataFusion 側から渡される入力データをそのまま受け取ります。Project Node はパーティション値を計算します。Iceberg には partition transform があり、パーティション値をデータの一フィールドとしてそのまま持つとは限らないため、ここで値を算出する必要があります。Repartition Node は複数マシン環境でデータを均等に分散させ、最適な並列度を決めます。Sort Node は入力データをソートします。これは前述の ClusteredWriter がソート済みデータを前提とすることに対応しており、ソートが必要な場合のための場所です。

Writer Node は TaskWriter を起動して入力データを Parquet ファイルとしてディスクへ書き出します。最後の Commit Node が最も重要な役割を担い、書き込んだデータを Iceberg のトランザクション API とカタログを使ってコミットします。

DataFusion との INSERT INTO 統合


先に挙げた6つのノードが実際に組み上がるのは、DataFusion の TableProvider trait です。スライドのコードは、この trait が公開する insert_into メソッドのシグネチャを示しています。async fn insert_into(&self, _state: &dyn Session, _input: Arc<dyn ExecutionPlan>, _insert_op: InsertOp) -> Result<Arc<dyn ExecutionPlan>> という形で、入力の ExecutionPlan と挿入操作の種別 InsertOp を受け取り、書き込みを実行する ExecutionPlan を返します。デフォルト実装は not_impl_err!("Insert into not implemented for this table") を返すだけなので、Iceberg 側はこのメソッドを実装し、Input/Project/Repartition/Sort/Writer/Commit の各オペレータをここで束ねています。

この設計の利点は、DataFusion が柔軟な API を提供している点と、Iceberg Rust がオペレータ単体を公開している点が噛み合うところにあります。デフォルトの TableProvider をそのまま使う必要はありません。利用者が自分のユースケースに合わせて独自の TableProvider を実装し、提供されたオペレータを好きなように組み替えることもできます。

実際の挙動は INSERT INTO default.default.query_test_table VALUES (1, 'Alice', 95.5, 'A'), (2, 'Bob', 87.0, 'B'); のような単純な SQL で確認できます。この一文を投げると、これまで積み上げてきたトランザクションのリトライ機構、Catalog::update_table によるアトミックスワップ、PartitioningWriter までの書き込みパイプラインがすべて連動し、パーティションテーブルへの行追加として完結します。

今後のロードマップ


Iceberg Rust には公式なロードマップは存在しません。ここで示す計画は、コミュニティや実際の利用者から拾い上げたシグナルをもとにまとめたものです。大きく「Core Infrastructure」と「Integrations」の二つに分けて整理しています。

Core Infrastructure の一つ目は Storage Trait の拡張です。Storage Trait はカスタムストレージを差し込むための仕組みでした。最近はオブジェクトストアの利用が広がっており、DataFusion 自身も object_store をハードコードで使っています。私たちはストレージ層で中立を保つために、既存の OpenDAL ベースの実装に加えて iceberg-storage-object_store という公式のオブジェクトストア実装を提供することを検討しています。

二つ目はトランザクションの拡充です。現状でサポートしているのは FastAppendAction だけで、これは数あるアクションの中の一つにすぎません。Merge append、Delete、Row delta、Rewrite といったアクションを順次実装してスペックに近づけていきます。個人的に最も関心が高いのは Rewrite トランザクションです。これが入れば Iceberg Rust を使った compaction の道が開けます。コミュニティからの要望も多い領域です。

三つ目は Core API の整備です。テーブル暗号化(Table Encryption)は別の開発者が現在進行中で取り組んでいます。あわせて Runtime API と FileFormat API の追加も計画しています。Integrations 側では DataFusion との連携として、INSERT INTO に続く DataFusion DML サポートの拡大を予定しています。

まとめ

Iceberg がエンジン非依存・言語非依存の形式である以上、Rust による実装はその多様性を素直に受け入れる選択になります。この1年で Iceberg Rust は読み取りに加えて書き込みの基盤を整え、リトライ可能なトランザクション、Catalog::update_table によるアトミックスワップ、PartitioningWriter、そして DataFusion の INSERT INTO までを一本のパイプラインとしてつなぎました。今後は Storage Trait の拡張やトランザクションの拡充、なかでも compaction につながる Rewrite アクションの実装が焦点になります。PyIceberg や DataFusion との連携を通じて、Iceberg Rust は Rust データエコシステムの土台として育ちつつあります。