
はじめに
2024/3/11、Iceberg 1.5がリリースされた。Apache Icebergは、大規模なデータセットを管理するためのOpen Table Formatの一種である。Icebergを使用することで、ハイパフォーマンスで運用管理性と信頼性に優れたデータレイクを構築できる。Open Table FormatやIcebergの詳細は以下を参照して欲しい。
Iceberg 1.5で追加された重要な機能の一つがviewのサポートだ。Icebergにおけるviewの仕様を定めるIceberg View Specが定義され、いくつかのCatalog実装がviewの操作をサポートした。Iceberg viewによって、データ基盤上の複数のエンジン間で共通のフォーマットによってviewを管理できる。また、viewの変更を追跡し、バージョンを管理して必要に応じてロールバックすることも可能になる。
加えて、Iceberg 1.5では、viewを活かしたSparkのStored Prodecureとしてcreate_changelog_viewも追加された。このProdecureを使用すると、指定した範囲内にテーブルで発生した行レベルの変更(挿入、更新、削除)をキャプチャできるため、CDCを実装する上で役に立つ。
本記事では、Iceberg viewの仕組みと機能を紹介しつつ、実践的な活用例としてcreate_changelog_viewについて解説する。
Iceberg view概要
一般的なクエリエンジンにおけるviewの役割
多くのクエリエンジンでは、テーブルを単位としてデータを管理して、それに対して検索や集計を行う。一方で、要件によっては複数のテーブルを結合したり、必要な項目だけに絞り込んだりした結果を新たな「テーブル」として扱いたい場面がある。このような、他のテーブルから作られる仮想的なテーブルを「view」と呼ぶ。
viewは例えば以下のような目的で使われる。
- 複雑なクエリを簡略化: 結合や集約を事前に行っておくことで、ユーザは簡単なSELECT文でデータを取得できる
- セキュリティ向上: テーブルの特定のカラムだけをviewで公開することで、不要な情報へのアクセスを制限できる(何らかのアクセス制御機構の存在が前提)
- データの独立性確保: テーブル定義が変更されてもアプリケーション側はviewを使用することでデータ構造の独立性が高まる(場合がある)
Iceberg viewを使ってみる
内部構造の説明に立ち入る前に、まずは大まかな挙動を観察してみよう。
ニューヨーク市が配布する、ニューヨーク市タクシー・リムジン委員会の移動記録データを格納するテーブル「taxis」があるとする。
taxis.printSchema() root |-- VendorID: long (nullable = true) |-- tpep_pickup_datetime: timestamp_ntz (nullable = true) |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true) |-- passenger_count: double (nullable = true) |-- trip_distance: double (nullable = true) |-- RatecodeID: double (nullable = true) |-- store_and_fwd_flag: string (nullable = true) |-- PULocationID: long (nullable = true) |-- DOLocationID: long (nullable = true) |-- payment_type: long (nullable = true) |-- fare_amount: double (nullable = true) |-- extra: double (nullable = true) |-- mta_tax: double (nullable = true) |-- tip_amount: double (nullable = true) |-- tolls_amount: double (nullable = true) |-- improvement_surcharge: double (nullable = true) |-- total_amount: double (nullable = true) |-- congestion_surcharge: double (nullable = true) |-- airport_fee: double (nullable = true)
taxis.show(3,vertical=True) -RECORD 0------------------------------------ VendorID | 1 tpep_pickup_datetime | 2021-04-01 00:00:18 tpep_dropoff_datetime | 2021-04-01 00:21:54 passenger_count | 1.0 trip_distance | 8.4 RatecodeID | 1.0 store_and_fwd_flag | N PULocationID | 79 DOLocationID | 116 payment_type | 1 fare_amount | 25.5 extra | 3.0 mta_tax | 0.5 tip_amount | 5.85 tolls_amount | 0.0 improvement_surcharge | 0.3 total_amount | 35.15 congestion_surcharge | 2.5 airport_fee | 0.0 -RECORD 1------------------------------------ VendorID | 1 tpep_pickup_datetime | 2021-04-01 00:42:37 tpep_dropoff_datetime | 2021-04-01 00:46:23 passenger_count | 1.0 trip_distance | 0.9 RatecodeID | 1.0 store_and_fwd_flag | N PULocationID | 75 DOLocationID | 236 payment_type | 2 fare_amount | 5.0 extra | 3.0 mta_tax | 0.5 tip_amount | 0.0 tolls_amount | 0.0 improvement_surcharge | 0.3 total_amount | 8.8 congestion_surcharge | 2.5 airport_fee | 0.0 -RECORD 2------------------------------------ VendorID | 1 tpep_pickup_datetime | 2021-04-01 00:57:56 tpep_dropoff_datetime | 2021-04-01 01:08:22 passenger_count | 1.0 trip_distance | 3.4 RatecodeID | 1.0 store_and_fwd_flag | N PULocationID | 236 DOLocationID | 168 payment_type | 2 fare_amount | 11.5 extra | 3.0 mta_tax | 0.5 tip_amount | 0.0 tolls_amount | 0.0 improvement_surcharge | 0.3 total_amount | 15.3 congestion_surcharge | 2.5 airport_fee | 0.0 only showing top 3 rows
ここで、1日ごとの移動数を集計するview「nyc.pickup_agg」を定義する。
spark.sql(''' CREATE OR REPLACE VIEW nyc.pickup_agg ( pickup_count COMMENT 'Count of pickup', pickup_date ) COMMENT 'Daily pickup counts' AS SELECT COUNT(1), CAST(tpep_pickup_datetime AS DATE) FROM nyc.taxis GROUP BY CAST(tpep_pickup_datetime AS DATE) ''')
pickup_dateごとの乗車数を集計するviewが出来た。
spark.sql(''' SELECT * from nyc.pickup_agg limit 3 ''').show() +------------+-----------+ |pickup_count|pickup_date| +------------+-----------+ | 82210| 2021-04-29| | 78858| 2021-04-24| | 5| 2009-01-01| +------------+-----------+
Iceberg viewのコンセプト
メタデータ形式の共有
従来、クエリエンジンはview定義を独自の形式でメタストアに保存してきたため、他のエンジンから参照・変更することが難しかった。そこで、Iceberg viewはtableと同様にviewのメタデータを共通フォーマットとして保存することで、エンジン間でのviewの共有を可能にする。
viewのバージョン管理
viewの変更履歴を追跡して、必要に応じて過去のバージョンにロールバックできるようにする。(バージョン管理はテーブルと同様の仕組みで、各バージョンのメタデータをatomicに入れ替えることで実現する)
Iceberg viewの構成要素と仕組み
Iceberg viewのメタデータは、Icebergテーブルのメタデータと同様にview専用のmetadata fileに保存される。Iceberg Catalogはviewの最新のmetadata fileのロケーションをポイントしており、各エンジンはCatalogを起点としてviewを参照できる。
先ほど作成したview「pickup_agg」についてREST catalogに問い合わせると、metadata fileのロケーションを示す"metadata-location": "s3://warehouse/nyc/pickup_agg/metadata/00000-89feab5c-a8ab-4c53-8497-ab6ba11a6f4a.gz.metadata.json"を含む情報が得られる。
curl -X GET "http://rest:8181/v1/namespaces/nyc/views/pickup_agg" { "metadata-location": "s3://warehouse/nyc/pickup_agg/metadata/00000-89feab5c-a8ab-4c53-8497-ab6ba11a6f4a.gz.metadata.json", "metadata": { "view-uuid": "6891e4cd-ebac-4e4a-b511-dd3bcb3b6798", "format-version": 1, "location": "s3://warehouse/nyc/pickup_agg", "properties": { "comment": "Daily pickup counts", "engine_version": "Spark 3.5.1", "create_engine_version": "Spark 3.5.1", "spark.query-column-names": "count(1),tpep_pickup_datetime" }, "schemas": [ ... ], "current-version-id": 1, "versions": [ ... ], "version-log": [ { "timestamp-ms": 1711842668070, "version-id": 1 } ] } }
viewに対する変更は、新しいmetadata fileを作成し、current-version-idをatomicに入れ替えることで反映される。これにより、他のリーダーに影響を与えずに変更を適用でき、必要に応じて古いバージョンに戻すこともできるようになる。
つまり、Iceberg Catalogはテーブルと同様にviewを管理(作成、削除、リネーム、リスト)する責務を負う。viewを管理するinterfaceはViewCatalogが定義しており、それぞれのCatalog実装がクライアントからのview操作を受け付ける(REST CatalogであればRESTCatalog)。
従って、view機能のサポート有無は各Catalog実装に依存しており、2024/3/31時点ではJDBCとREST, NESSIE catalogのみがviewをサポートしている。
View Metadata
viewのmetadata fileには以下の情報が含まれる。
view-uuid: ビューを識別するUUIDformat-version: view formatのversionlocation: viewのベースロケーションschemas: スキーマの履歴情報current-version-id: ビューの現在のバージョンのID (version-id)versions: ビューのversionの配列(事項で詳述)version-log: viewに対する変更を追跡する配列。current-version-idが変更される度に作成され、そのtimestamp-msとversion-idが格納されるproperties: viewのプロパティ(文字列から文字列へのマップ)
先ほど作成したview「pickup_agg」のmetadata files3://warehouse/nyc/pickup_agg/metadata/00000-89feab5c-a8ab-4c53-8497-ab6ba11a6f4a.gz.metadata.jsonより、実際のView Metadataの中身は以下のようになっている。
{ "view-uuid": "5b68cab8-be9b-457d-9fbf-7796e8289770", "format-version": 1, "location": "s3://warehouse/nyc/pickup_agg", "properties": { "comment": "Daily pickup counts", "engine_version": "Spark 3.5.1", "create_engine_version": "Spark 3.5.1", "spark.query-column-names": "count(1),tpep_pickup_datetime" }, "schemas": [ { "type": "struct", "schema-id": 0, "fields": [ { "id": 0, "name": "count(1)", "required": true, "type": "long" }, { "id": 1, "name": "tpep_pickup_datetime", "required": false, "type": "date" } ] }, { "type": "struct", "schema-id": 1, "fields": [ { "id": 0, "name": "pickup_count", "required": true, "type": "long", "doc": "Count of pickup" }, { "id": 1, "name": "pickup_date", "required": false, "type": "date" } ] } ], "current-version-id": 3, "versions": [...], "version-log": [ { "timestamp-ms": 1711814793006, "version-id": 1 }, { "timestamp-ms": 1711815139224, "version-id": 2 }, { "timestamp-ms": 1711816668205, "version-id": 3 } ] }
versionsフィールド
view metadata内の配列として、ビューの各バージョンに関する情報が格納される。各バージョンは以下の情報を持つ。
version-id: バージョンのIDschema-id: スキーマのIDtimestamp-ms: バージョンが作成された時のタイムスタンプ (エポックからのミリ秒)summary: バージョンに関するメタデータ。文字列から文字列へのマップ。viewを作成したengine-nameやengine-versionなどを格納するrepresentations: ビュー定義の表現のリスト(事項で詳述)default-catalog: SELECTの参照がカタログを含まない場合に使用するカタログ名default-namespace: SELECTの参照が単一の識別子である場合に使用する名前空間
先ほど作成したview「pickup_agg」のmetadata files3://warehouse/nyc/pickup_agg/metadata/00000-89feab5c-a8ab-4c53-8497-ab6ba11a6f4a.gz.metadata.jsonより、実際のversionsフィールドの中身は以下のようになっている。
"versions": [ { "version-id": 1, "timestamp-ms": 1711842668070, "schema-id": 0, "summary": { "iceberg-version": "Apache Iceberg 1.5.0 (commit 2519ab43d654927802cc02e19c917ce90e8e0265)" }, "default-namespace": [ "nyc" ], "representations": [ ... ] } ]
representationsフィールド
representationsは、view定義を表現するドキュメントである。1つのviewバージョンは複数のrepresentationを持つことができ、エンジンはどのrepresentationsを使用するかを選べる。これにより、異なるエンジンがそれぞれの方法でビュー定義を解釈できるようになる。
現在サポートされているrepresentationは以下の通り。(sql representationのみ)
- sql representation: viewを定義するSELECT文とそのメタデータを格納する。1つのviewバージョンは、異なる方言の複数のSQL representationを持つことができるが、1つの方言につき1つのSQL representationしか持てない。
先ほど作成したview「pickup_agg」のmetadata files3://warehouse/nyc/pickup_agg/metadata/00000-89feab5c-a8ab-4c53-8497-ab6ba11a6f4a.gz.metadata.jsonより、実際のrepresentationsフィールドの中身は以下のようになっている。
"representations": [ { "type": "sql", "sql": "SELECT\n COUNT(1), CAST(tpep_pickup_datetime AS DATE)\nFROM nyc.taxis\nGROUP BY CAST(tpep_pickup_datetime AS DATE)", "dialect": "spark" } ]
representationsフィールドの機構により、例えばSparkで作成されたviewのメタデータを、Trinoなど別のSQLエンジンから読み取ってviewを再現することができるようになる。つまり、SQLをベースとするエンジン間であれば、Iceberg viewを介してview定義を共有できるようになり、相互運用性が高まる。
Appendix: Viewサポートに関連するPR
Add view support for REST catalog
Add view support for JDBC catalog
Nessie: Support views for NessieCatalog
Core: Support view metadata compression
Spark: Support creating views via SQL
Spark: Support dropping views
Spark: Support renaming views
Spark: Add support for describing/showing views
Spark: Support altering view properties