流沙河鎮

情報技術系のこと書きます。

Apache Iceberg 1.5 アップデートまとめ

2024/3/11、Iceberg 1.5がリリースされた。
https://iceberg.apache.org/releases/
本記事ではIceberg 1.5のアップデート内容を紹介する。 記事内容はリリースノートと、各アップデートのPR、コードを基に作成した。

Specの仕様変更

Icebergの根幹であるIceberg Table Specに関わる変更。

テーブルメタデータパーティション統計情報を追加し、統計情報ファイルの仕様を定義
概要

テーブルメタデータ新しいフィールド「partition-statistics」が追加され、各テーブルスナップショットに関連付けられたパーティション統計情報ファイルを示すようになった。
パーティション統計情報ファイルの仕様も追加され、各パーティションのデータに関する統計情報をParquet、ORCなどのテーブルと同じデータファイル形式で保存するように定義された。
ファイルのスキーマには、パーティションキー、スペックID、レコード数、ファイル数、ファイルサイズ、削除レコード数などの情報が含まれる。今回の仕様追加は、コミット時に同期的に統計情報ファイルを書き込むのではなく、オンデマンドで非同期に生成することを想定しているが、将来的には同期書き込みの要望も検討する可能性がある模様。

関連PR

Spec: Add partition stats spec Track partition statistics in TableMetadata

パーティション統計情報のトラッキング機能を追加
概要

先述のSpec変更の実装。パーティションレベルの統計情報をトラッキングし、読み取りをより効率的に行えるようになった。統計情報は参考情報であり、必須ではない。
これに伴って、パーティションの統計情報を保持するTableMetadataPartitionStatisticsFile(パーティション統計情報ファイル)が導入された。パーティション統計情報ファイルの挙動は以下の通り。

  • パーティションレベルの統計情報の保持
  • スナップショットとの関連付け
    • パーティション統計情報ファイルは、特定のスナップショットIDに関連付けられる
    • スナップショットが削除されると、関連する統計情報ファイルも削除される
  • 統計情報の更新
    • 新しいスナップショットが作成されると、対応する統計情報ファイルが生成または更新される
    • UpdatePartitionStatistics APIを通じて、統計情報ファイルの追加・削除が行われる
  • オプショナルな参照情報として扱われる
    • 統計情報はデータ読み取りの最適化に使用されるが、必須ではない
    • 統計情報が存在しない場合でも、データの読み取りは正常に行われる
  • TableMetadataでの管理
    • 各テーブルのTableMetadataオブジェクトが、関連するパーティション統計情報ファイルのリストを保持
    • partitionStatisticsFiles()メソッドで統計情報ファイルのリストを取得できる
関連PR

Track partition statistics in TableMetadata
Spec: Add partition stats spec

ナノ秒精度のタイムスタンプ型を追加
概要

Icebergの仕様にナノ秒精度のタイムスタンプ型timestamp_nstimestamptz_nsが新たに追加された。これらの型はv3仕様で導入され、既存のtimestamptimestamptzはマイクロ秒精度のまま維持される。
Avroスキーマでは、timestamp_nstimestamptz_nsに新しい論理型timestamp-nanosが使用され、パーティショニングのハッシュ関数においてはnanoseconds from Unix epochに基づいて計算される。
ORCのマッピングでは、timestamptimestamp_instantナノ秒精度で格納されることが明記され、Icebergから書き出す際はナノ秒がマイクロ秒に切り捨てられる必要がある。
また、v3仕様では構造体のフィールドにデフォルト値を設定できるようになった。この変更はforward-compatibleである。
Design Docはこちら。

関連PR

Spec: add nanosecond timestamp types

パーティショニングとソートにおいて複数カラムを組み合わせたトランスフォームをサポート
概要

v2までのIcebergでは、パーティショニングとソートに使用するトランスフォームは単一のソースカラムを引数として受け取るが、特にBucketトランスフォームでは複数カラムを引数として使用することが一般的である。そこで、以下の変更を行うことで、複数カラムを組み合わせた柔軟なパーティショニングとソートを実現する。

  1. パーティションフィールドとソートフィールドの定義を拡張し、単一のソースカラムだけでなく複数のソースカラムを指定したトランスフォームを可能にする。
  2. パーティション進化の動作を単一引数と複数引数の両方に対応するよう変更する。
  3. JSONリアライゼーションにおいて、パーティションフィールドとソートフィールドの "source-ids" フィールドでソースカラムIDのリストを表現する。下位互換性のため、単一引数の場合は "source-id" フィールドを従来通り使用する。
  4. 複数引数のBucketトランスフォームにおいて、入力値をstructとして扱い、各フィールドの値を順番にハッシュ計算に使用することで、最終的なハッシュ値を計算する。

これらの変更は、バージョン3で導入される非互換の変更となる。
Design Docはこちら。

関連PR

Multi-arg transform support in Iceberg

カタログ機能の拡張

viewサポート
概要

REST catalogとJDBC catalog,NessieCatalogでviewがサポートされた。これに伴い、docにview specが追加されている。また、Iceberg views in Sparkなど個別エンジンのDocにもview操作に関する記述が追加されている。
ただし、viewサポートの有無、挙動は使用するcatalogの種類、クエリエンジンに依存する点は留意が必要だ。(無条件に使えるわけではない)

viewの詳細は以下の記事を参照して欲しい。

bering.hatenadiary.com

関連PR

Add view support for REST catalog
Add view support for JDBC catalog
Nessie: Support views for NessieCatalog
Core: Support view metadata compression
Spark: Support creating views via SQL
Spark: Support dropping views
Spark: Support renaming views
Spark: Add support for describing/showing views
Spark: Support altering view properties

テーブルごとの認証セッションのキャッシュを追加
概要

RESTSessionCatalogtableSessionsフィールドを追加し、各テーブルの認証セッションをキャッシュするようにした。これにより、使用されなくなったセッションのリフレッシュを中止し、パフォーマンスとリソース使用量を改善できる。

関連PR

Core: Add table session cache

カタログ実装指定方式の追加(glue,jdbc,nessie)
概要

カタログ実装にglue,jdbc,nessieを指定する際、従来は以下のようにcatalog-implに実装クラスを指定する必要があった。

# start Spark SQL client shell
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.5.0,org.apache.iceberg:iceberg-aws-bundle:1.5.0 \
    --conf spark.sql.defaultCatalog=my_catalog \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket/my/key/prefix \
    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
    --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO

今回のアップデートにより、以下のようにtypeで指定できるようになった。(後方互換性は確保されており、従来の指定方法も問題なく機能する)

# start Spark SQL client shell
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.5.0,org.apache.iceberg:iceberg-aws-bundle:1.5.0 \
    --conf spark.sql.defaultCatalog=my_catalog \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket/my/key/prefix \
    --conf spark.sql.catalog.my_catalog.type=glue \
    --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
関連PR

Core: Add catalog type for glue,jdbc,nessie

Spark

テーブルスキャン時に、エグゼキュータキャッシュのローカリティを有効にする機能を追加
概要

プロパティ spark.sql.iceberg.executor-cache.locality.enabled が導入された。(デフォルトはオフ)
有効化するとdeleteが存在する場合に同じpartitionのtaskを同じexecutorに割り当てるようになる。これにより、キャッシュの再利用率を高めることができる。

関連PR

Spark 3.5: Support executor cache locality

SparkからIcebergのデータ削除を行うためのマニフェストファイル書き換え処理を拡張
概要

ContentFileインターフェースを実装する抽象クラスSparkContentFileが追加された。このクラスは、SparkのRowオブジェクトからIcebergのContentFileオブジェクトにデータをラップするためのユーティリティクラスとして機能する。これにより、SparkからIcebergのテーブルに対してデータの削除を行う際に、削除されたデータをマニフェストファイルに反映するための処理が可能になった。

関連PR

Spark 3.5: Extend action for rewriting manifests to support deletes

Spark UIにIcebergスキャンのメトリクスを追加
概要

SparkScanにデータファイル、削除ファイル、マニフェストファイルのスキャン結果を集計する新しいメトリクスが追加された。これにより、Spark UIとドライバーメトリクスでクエリのパフォーマンスをより詳細に分析できるようになった。

関連PR

Add Spark UI metrics from Iceberg scan metrics #8717

Sparkテーブルのインポート時のファイルリスティングを並列化
概要

add_filesプロシージャにおいて、Sparkテーブルをインポートする際のファイルリスティングを並列化するための機能が追加された。parallelism引数を指定することで、ファイルの読み込みに使用するスレッド数を制御できるようになった。

関連PR

Spark 3.5: Parallelize reading files in add_files procedure

ファイル単位とパーティション単位のdelete粒度をサポート
概要

ファイル単位とパーティション単位の2種類のデリート粒度をサポートし、ユーザーが選択できるようになった。パーティション単位ではdeleteファイル総数を減らせるが無関係なdeleteが割り当てられる可能性があり、ファイル単位では無関係なdeleteの割り当てを防げるがdeleteファイル総数が増加する。現在はposition deletesにのみ適用可能。

関連PR

Core, Data, Spark 3.5: Support file and partition delete granularity

Spark 3.5で暗号化された出力ファイルを直接サポート
概要

SparkAppenderFactoryEncryptedOutputFileを引数に取るnewAppenderメソッドのオーバーロードを追加し、暗号化された出力ファイルへの直接書き込みをサポート。newDataWriternewEqDeleteWriternewPosDeleteWriterメソッドも変更され、EncryptedOutputFileを直接使用するようになっている。

関連PR

Encrypted output file in Spark 3.5

IcebergSourceにwatermark発行機能を追加
概要

SerializableRecordEmitterインターフェースとSplitWatermarkExtractorインターフェースを追加することで、レコードの発行とwatermarkの発行を行えるようになった。OrderedSplitAssignerFactoryを使用してsplitを順序付けし、ユーザーがwatermarkColumnを指定できるようにIcebergSourceを拡張した。

関連PR

Flink: Emit watermarks from the IcebergSource #8553

Flinkコネクタにwatermarkカラムを指定するオプションを追加
概要

FlinkのIcebergコネクタに、watermarkに使用するカラムを指定するwatermark-columnと、そのカラムの時間単位を指定するwatermark-column-time-unitの読み取りオプションが追加された。これによりSQLステートメントでこれらのパラメータを渡せるようになった。

select * from t /*+ OPTIONS('watermark-column'='t2','watermark-column-time-unit'='MILLISECONDS')*/
関連PR

Flink: Watermark read options

セキュリティとプライバシー

Avroの暗号化サポートを追加
概要

Apache IcebergにAvroの暗号化サポートを追加し、暗号化されたAvroファイルを適切に分割して読み取ることができるようになった。

関連PR

Avro data encryption

ParquetファイルのKMS暗号化をサポート
概要

Apache Iceberg プロジェクトにParquetファイルのネイティブ暗号化をサポートするための基本的な暗号化機能(EncryptionManager)が実装された。KMSと連携して鍵管理を行う仕組み(StandardEncryptionManager`)が提供されている。

関連PR

Core: Add StandardEncryptionManager

暗号化されたメタデータファイルの操作を改善
概要

FileIOをImplementするEncryptingFileIOによって、メタデータファイルの暗号化有無に関わらず互換性を維持して操作できるようになった。

関連PR

Extend FileIO and add EncryptingFileIO.

サポート

Spark 3.2のサポートを削除
概要

GitHub Actions、ドキュメント、Gradleの設定ファイル、ソースコードなどからSpark 3.2関連の記述を削除し、Spark 3.2のサポートを終了した。

関連PR

Spark: Remove support for Spark 3.2

概要

GitHub Actions、ドキュメント、Gradleの設定ファイル、ソースコードなどからFlink 1.15関連の記述を削除し、Flink 1.15のサポートを終了した。同時にFlink 1.18のサポートが追加された。

ベンダ製品との統合

AWS
Glue CatalogのDescriptionとの統合
概要

Apache IcebergとAWS Glue Catalog間で、データベースとテーブルの説明(Description)をシームレスに連携できるようになった。Glue側で設定した説明をIcebergから参照でき、Icebergで設定した説明もGlueカタログに反映される。

関連PR

AWS: Support setting description for Glue table

Glue Catalogのdefault db location末尾のスラッシュを削除
概要

Glue CatalogクラスのdefaultWarehouseLocationメソッドとloadNamespaceMetadataメソッドにおいて、データベースのlocationUriから末尾のスラッシュを削除するように修正された。これにより、テーブルの場所を正しく構築でき、パスの問題を防ぐことができる。

関連PR

AWS: Glue catalog strip trailing slash on DB URI #8870

S3 Access Grantsのサポートを追加
概要

S3FileIOにS3 Access Grants(S3AG)のサポートを追加。カタログでS3AGが有効になっている場合のみ、S3クライアントにS3AG用のSDKプラグインが追加されるようになった。

関連PR

AWS: Add S3 Access Grants Integration

S3FileIOのテストを改善
概要

S3FileIOのテストを更新し、CLIENT_FACTORYが設定されていない場合でも動作することを確認。これにより、DefaultAwsClientFactoryDefaultS3FileIOAwsClientFactoryの両方でテストが可能になった。

関連PR

AWS: Update S3FileIO test to run when CLIENT_FACTORY is not set

Azure
Azure Data Lake Storage Gen2 用の FileIO を追加
概要

Azure Data Lake Storage Gen2に対応した新しいFileIOの実装が追加された。abfsabfssスキームの URI をサポートし、レンジリードやプレフィックス、バルク操作のインターフェースを備える。認証は現在、Azure の資格情報チェーン、SAS トークン、接続文字列をサポート。SparkとFlinkのランタイムの依存関係にも追加され、必要な依存関係を含む Azureバンドルプロジェクトも作成された。

関連PR

Azure: Add FileIO that supports ADLSv2 storage
Azure: Make ADLSFileIO implement DelegateFileIO

Nessie
NessieCatalogのwarehouseのパス操作を改善
概要

NessieCatalogにおいて、warehouseのパスから末尾のスラッシュを取り除くように変更し、他のカタログとの互換性を維持しつつ、ダブルスラッシュの問題を解消した。

関連PR

Nessie: Strip trailing slash for warehouse location

URIからデフォルトのAPIバージョンを推測するように変更
概要

これまではデフォルトのAPIバージョンがv1に固定されていたが、今回の変更によりURIの末尾からデフォルトのバージョンが自動的に推測されるようになった。これにより、v2を使用する際にはv2のURIを設定するだけで済むようになった。ただし、カスタムURIの末尾にバージョン番号がない特殊なケースでは、引き続き client-api-version を使用してバージョンを明示的に指定できる。

関連PR

Nessie: Infer default API version from URI

その他の改善

列統計データの取り扱いを改善
概要

Scan.includeColumnStatsメソッドとContentFile.copyWithSpecificStatsメソッドの追加により、ユーザーが必要な列の統計データのみを保持し、メモリ使用量を削減できるようになった。

関連PR

Core: Enable column statistics filtering after planning

Iceberg Kafka Connect Sink connectorの初期実装
概要

Iceberg Kafka Connect Sink connectorの初期実装。Tabularが開発したiceberg-kafka-connectが元になっている。まだ完成形ではない模様(筆者がKafkaに明るくないので完成度合いがよく分からないが...)

関連PR

Initial project setup and event data structures
Kafka Connect: Sink connector with data writers and converters

Parquetの INT96 型カラムのフィルタリングをサポート
概要

Parquetの INT96 型カラムに対して、行グループレベルのフィルタリングができるようになった。INT96型のデコードロジックが追加され、ディクショナリエンコードされたINT96型カラムのフィルタリングもサポートされた。

関連PR

Parquet: Support reading INT96 column in row group filter

avroのカラムの論理名と物理名を分離するマッピングを導入
概要

ApplyNameMappingによって、AvroファイルのフィールドとIcebergのフィールドの名前を分離して扱うことができるようになった。

関連PR

Core: Add ApplyNameMapping for Avro

SQLのdialectに対応するビューのrepresentationを解決するAPIを追加
概要

指定されたデータベース方言(dialect)に基づいてビューの表現(representation)を解決する ViewRepresentation sqlFor(String dialect) APIが追加。

関連PR

API, Core: Add sqlFor API to views to handle basic resolution of dialect

不安全なParquet IDフォールバックの設定追加
概要

Netflixのシステム独自の挙動である不安全なParquet IDフォールバック(列IDを位置に基づいて割り当てる)を有効または無効にする設定が追加された。この動作は公開前のIcebergに由来し、マッピングを使用できなかったNetflixのデータセットにのみ適用される。

関連PR

Parquet: Add system config for unsafe Parquet ID fallback.

依存関係の変更

Icebergが利用する各種ライブラリが以下の通り更新された

  • Nessie to 0.77.1
  • ORC to 1.9.2
  • Arrow to 15.0.0
  • AWS Java SDK to 2.24.5
  • Azure Java SDK to 1.2.20
  • Google cloud libraries to 26.28.0