Bering Note – formerly 流沙河鎮

情報技術系のこと書きます。

【Iceberg 1.5新機能】viewの紹介 - 共通メタデータ形式とバージョン管理が実現する新たな可能性

はじめに

2024/3/11、Iceberg 1.5がリリースされた。Apache Icebergは、大規模なデータセットを管理するためのOpen Table Formatの一種である。Icebergを使用することで、ハイパフォーマンスで運用管理性と信頼性に優れたデータレイクを構築できる。Open Table FormatやIcebergの詳細は以下を参照して欲しい。

Iceberg 1.5で追加された重要な機能の一つがviewのサポートだ。Icebergにおけるviewの仕様を定めるIceberg View Specが定義され、いくつかのCatalog実装がviewの操作をサポートした。Iceberg viewによって、データ基盤上の複数のエンジン間で共通のフォーマットによってviewを管理できる。また、viewの変更を追跡し、バージョンを管理して必要に応じてロールバックすることも可能になる。
加えて、Iceberg 1.5では、viewを活かしたSparkのStored Prodecureとしてcreate_changelog_viewも追加された。このProdecureを使用すると、指定した範囲内にテーブルで発生した行レベルの変更(挿入、更新、削除)をキャプチャできるため、CDCを実装する上で役に立つ。
本記事では、Iceberg viewの仕組みと機能を紹介しつつ、実践的な活用例としてcreate_changelog_viewについて解説する。

Iceberg view概要

一般的なクエリエンジンにおけるviewの役割

多くのクエリエンジンでは、テーブルを単位としてデータを管理して、それに対して検索や集計を行う。一方で、要件によっては複数のテーブルを結合したり、必要な項目だけに絞り込んだりした結果を新たな「テーブル」として扱いたい場面がある。このような、他のテーブルから作られる仮想的なテーブルを「view」と呼ぶ。
viewは例えば以下のような目的で使われる。

  • 複雑なクエリを簡略化: 結合や集約を事前に行っておくことで、ユーザは簡単なSELECT文でデータを取得できる
  • セキュリティ向上: テーブルの特定のカラムだけをviewで公開することで、不要な情報へのアクセスを制限できる(何らかのアクセス制御機構の存在が前提)
  • データの独立性確保: テーブル定義が変更されてもアプリケーション側はviewを使用することでデータ構造の独立性が高まる(場合がある)
Iceberg viewを使ってみる

内部構造の説明に立ち入る前に、まずは大まかな挙動を観察してみよう。
ニューヨーク市が配布する、ニューヨーク市タクシー・リムジン委員会の移動記録データを格納するテーブル「taxis」があるとする。

taxis.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
taxis.show(3,vertical=True)

-RECORD 0------------------------------------
 VendorID              | 1                   
 tpep_pickup_datetime  | 2021-04-01 00:00:18 
 tpep_dropoff_datetime | 2021-04-01 00:21:54 
 passenger_count       | 1.0                 
 trip_distance         | 8.4                 
 RatecodeID            | 1.0                 
 store_and_fwd_flag    | N                   
 PULocationID          | 79                  
 DOLocationID          | 116                 
 payment_type          | 1                   
 fare_amount           | 25.5                
 extra                 | 3.0                 
 mta_tax               | 0.5                 
 tip_amount            | 5.85                
 tolls_amount          | 0.0                 
 improvement_surcharge | 0.3                 
 total_amount          | 35.15               
 congestion_surcharge  | 2.5                 
 airport_fee           | 0.0                 
-RECORD 1------------------------------------
 VendorID              | 1                   
 tpep_pickup_datetime  | 2021-04-01 00:42:37 
 tpep_dropoff_datetime | 2021-04-01 00:46:23 
 passenger_count       | 1.0                 
 trip_distance         | 0.9                 
 RatecodeID            | 1.0                 
 store_and_fwd_flag    | N                   
 PULocationID          | 75                  
 DOLocationID          | 236                 
 payment_type          | 2                   
 fare_amount           | 5.0                 
 extra                 | 3.0                 
 mta_tax               | 0.5                 
 tip_amount            | 0.0                 
 tolls_amount          | 0.0                 
 improvement_surcharge | 0.3                 
 total_amount          | 8.8                 
 congestion_surcharge  | 2.5                 
 airport_fee           | 0.0                 
-RECORD 2------------------------------------
 VendorID              | 1                   
 tpep_pickup_datetime  | 2021-04-01 00:57:56 
 tpep_dropoff_datetime | 2021-04-01 01:08:22 
 passenger_count       | 1.0                 
 trip_distance         | 3.4                 
 RatecodeID            | 1.0                 
 store_and_fwd_flag    | N                   
 PULocationID          | 236                 
 DOLocationID          | 168                 
 payment_type          | 2                   
 fare_amount           | 11.5                
 extra                 | 3.0                 
 mta_tax               | 0.5                 
 tip_amount            | 0.0                 
 tolls_amount          | 0.0                 
 improvement_surcharge | 0.3                 
 total_amount          | 15.3                
 congestion_surcharge  | 2.5                 
 airport_fee           | 0.0                 
only showing top 3 rows

ここで、1日ごとの移動数を集計するview「nyc.pickup_agg」を定義する。

spark.sql('''
CREATE OR REPLACE VIEW nyc.pickup_agg (
    pickup_count COMMENT 'Count of pickup',
    pickup_date
)
COMMENT 'Daily pickup counts'
AS
SELECT
    COUNT(1), CAST(tpep_pickup_datetime AS DATE)
FROM nyc.taxis
GROUP BY CAST(tpep_pickup_datetime AS DATE)
''')

pickup_dateごとの乗車数を集計するviewが出来た。

spark.sql('''
SELECT * from nyc.pickup_agg limit 3
''').show()

+------------+-----------+
|pickup_count|pickup_date|
+------------+-----------+
|       82210| 2021-04-29|
|       78858| 2021-04-24|
|           5| 2009-01-01|
+------------+-----------+
Iceberg viewのコンセプト
メタデータ形式の共有

従来、クエリエンジンはview定義を独自の形式でメタストアに保存してきたため、他のエンジンから参照・変更することが難しかった。そこで、Iceberg viewはtableと同様にviewのメタデータを共通フォーマットとして保存することで、エンジン間でのviewの共有を可能にする。

viewのバージョン管理

viewの変更履歴を追跡して、必要に応じて過去のバージョンにロールバックできるようにする。(バージョン管理はテーブルと同様の仕組みで、各バージョンのメタデータをatomicに入れ替えることで実現する)

Iceberg viewの構成要素と仕組み

Iceberg viewのメタデータは、Icebergテーブルのメタデータと同様にview専用のmetadata fileに保存される。Iceberg Catalogはviewの最新のmetadata fileのロケーションをポイントしており、各エンジンはCatalogを起点としてviewを参照できる。

先ほど作成したview「pickup_agg」についてREST catalogに問い合わせると、metadata fileのロケーションを示す"metadata-location": "s3://warehouse/nyc/pickup_agg/metadata/00000-89feab5c-a8ab-4c53-8497-ab6ba11a6f4a.gz.metadata.json"を含む情報が得られる。

curl -X GET "http://rest:8181/v1/namespaces/nyc/views/pickup_agg"

{
 "metadata-location": "s3://warehouse/nyc/pickup_agg/metadata/00000-89feab5c-a8ab-4c53-8497-ab6ba11a6f4a.gz.metadata.json",
 "metadata": {
   "view-uuid": "6891e4cd-ebac-4e4a-b511-dd3bcb3b6798",
   "format-version": 1,
   "location": "s3://warehouse/nyc/pickup_agg",
   "properties": {
     "comment": "Daily pickup counts",
     "engine_version": "Spark 3.5.1",
     "create_engine_version": "Spark 3.5.1",
     "spark.query-column-names": "count(1),tpep_pickup_datetime"
   },
   "schemas": [
     ...
   ],
   "current-version-id": 1,
   "versions": [
     ...
   ],
   "version-log": [
     {
       "timestamp-ms": 1711842668070,
       "version-id": 1
     }
   ]
 }
}

viewに対する変更は、新しいmetadata fileを作成し、current-version-idをatomicに入れ替えることで反映される。これにより、他のリーダーに影響を与えずに変更を適用でき、必要に応じて古いバージョンに戻すこともできるようになる。
つまり、Iceberg Catalogはテーブルと同様にviewを管理(作成、削除、リネーム、リスト)する責務を負う。viewを管理するinterfaceはViewCatalogが定義しており、それぞれのCatalog実装がクライアントからのview操作を受け付ける(REST CatalogであればRESTCatalog)。
従って、view機能のサポート有無は各Catalog実装に依存しており、2024/3/31時点ではJDBCとREST, NESSIE catalogのみがviewをサポートしている。

View Metadata

viewのmetadata fileには以下の情報が含まれる。

  • view-uuid: ビューを識別するUUID
  • format-version: view formatのversion
  • location: viewのベースロケーション
  • schemas: スキーマの履歴情報
  • current-version-id: ビューの現在のバージョンのID (version-id)
  • versions: ビューのversionの配列(事項で詳述)
  • version-log: viewに対する変更を追跡する配列。current-version-idが変更される度に作成され、そのtimestamp-msversion-idが格納される
  • properties: viewのプロパティ(文字列から文字列へのマップ)

先ほど作成したview「pickup_agg」のmetadata files3://warehouse/nyc/pickup_agg/metadata/00000-89feab5c-a8ab-4c53-8497-ab6ba11a6f4a.gz.metadata.jsonより、実際のView Metadataの中身は以下のようになっている。

{
 "view-uuid": "5b68cab8-be9b-457d-9fbf-7796e8289770",
 "format-version": 1,
 "location": "s3://warehouse/nyc/pickup_agg",
 "properties": {
   "comment": "Daily pickup counts",
   "engine_version": "Spark 3.5.1",
   "create_engine_version": "Spark 3.5.1",
   "spark.query-column-names": "count(1),tpep_pickup_datetime"
 },
 "schemas": [
   {
     "type": "struct",
     "schema-id": 0,
     "fields": [
       {
         "id": 0,
         "name": "count(1)",
         "required": true,
         "type": "long"
       },
       {
         "id": 1,
         "name": "tpep_pickup_datetime",
         "required": false,
         "type": "date"
       }
     ]
   },
   {
     "type": "struct",
     "schema-id": 1,
     "fields": [
       {
         "id": 0,
         "name": "pickup_count",
         "required": true,
         "type": "long",
         "doc": "Count of pickup"
       },
       {
         "id": 1,
         "name": "pickup_date",
         "required": false,
         "type": "date"
       }
     ]
   }
 ],
 "current-version-id": 3,
 "versions": [...],
 "version-log": [
   {
     "timestamp-ms": 1711814793006,
     "version-id": 1
   },
   {
     "timestamp-ms": 1711815139224,
     "version-id": 2
   },
   {
     "timestamp-ms": 1711816668205,
     "version-id": 3
   }
 ]
}
versionsフィールド

view metadata内の配列として、ビューの各バージョンに関する情報が格納される。各バージョンは以下の情報を持つ。

  • version-id: バージョンのID
  • schema-id: スキーマのID
  • timestamp-ms: バージョンが作成された時のタイムスタンプ (エポックからのミリ秒)
  • summary: バージョンに関するメタデータ。文字列から文字列へのマップ。viewを作成したengine-nameengine-versionなどを格納する
  • representations: ビュー定義の表現のリスト(事項で詳述)
  • default-catalog: SELECTの参照がカタログを含まない場合に使用するカタログ名
  • default-namespace: SELECTの参照が単一の識別子である場合に使用する名前空間

先ほど作成したview「pickup_agg」のmetadata files3://warehouse/nyc/pickup_agg/metadata/00000-89feab5c-a8ab-4c53-8497-ab6ba11a6f4a.gz.metadata.jsonより、実際のversionsフィールドの中身は以下のようになっている。

"versions": [
 {
   "version-id": 1,
   "timestamp-ms": 1711842668070,
   "schema-id": 0,
   "summary": {
     "iceberg-version": "Apache Iceberg 1.5.0 (commit 2519ab43d654927802cc02e19c917ce90e8e0265)"
   },
   "default-namespace": [
     "nyc" 
   ],
   "representations": [
     ...
   ]
 }
]
representationsフィールド

representationsは、view定義を表現するドキュメントである。1つのviewバージョンは複数のrepresentationを持つことができ、エンジンはどのrepresentationsを使用するかを選べる。これにより、異なるエンジンがそれぞれの方法でビュー定義を解釈できるようになる。

現在サポートされているrepresentationは以下の通り。(sql representationのみ)

  • sql representation: viewを定義するSELECT文とそのメタデータを格納する。1つのviewバージョンは、異なる方言の複数のSQL representationを持つことができるが、1つの方言につき1つのSQL representationしか持てない。
    • type: sql representationでは常に"sql"
    • sql: ビューを定義するSELECT文の文字列
    • dialect: SELECT文の方言を示す文字列(例: "spark"や"trino")

先ほど作成したview「pickup_agg」のmetadata files3://warehouse/nyc/pickup_agg/metadata/00000-89feab5c-a8ab-4c53-8497-ab6ba11a6f4a.gz.metadata.jsonより、実際のrepresentationsフィールドの中身は以下のようになっている。

"representations": [
 {
   "type": "sql",
   "sql": "SELECT\n COUNT(1), CAST(tpep_pickup_datetime AS DATE)\nFROM nyc.taxis\nGROUP BY CAST(tpep_pickup_datetime AS DATE)",
   "dialect": "spark"
 }
]

representationsフィールドの機構により、例えばSparkで作成されたviewのメタデータを、Trinoなど別のSQLエンジンから読み取ってviewを再現することができるようになる。つまり、SQLをベースとするエンジン間であれば、Iceberg viewを介してview定義を共有できるようになり、相互運用性が高まる。

Appendix: Viewサポートに関連するPR

Add view support for REST catalog
Add view support for JDBC catalog
Nessie: Support views for NessieCatalog
Core: Support view metadata compression
Spark: Support creating views via SQL
Spark: Support dropping views
Spark: Support renaming views
Spark: Add support for describing/showing views
Spark: Support altering view properties