流沙河鎮

情報技術系のこと書きます。

【Iceberg 1.5新機能】viewの紹介 - 共通メタデータ形式とバージョン管理が実現する新たな可能性

はじめに

2024/3/11、Iceberg 1.5がリリースされた。Apache Icebergは、大規模なデータセットを管理するためのOpen Table Formatの一種である。Icebergを使用することで、ハイパフォーマンスで運用管理性と信頼性に優れたデータレイクを構築できる。Open Table FormatやIcebergの詳細は以下を参照して欲しい。

Iceberg 1.5で追加された重要な機能の一つがviewのサポートだ。Icebergにおけるviewの仕様を定めるIceberg View Specが定義され、いくつかのCatalog実装がviewの操作をサポートした。Iceberg viewによって、データ基盤上の複数のエンジン間で共通のフォーマットによってviewを管理できる。また、viewの変更を追跡し、バージョンを管理して必要に応じてロールバックすることも可能になる。
加えて、Iceberg 1.5では、viewを活かしたSparkのStored Prodecureとしてcreate_changelog_viewも追加された。このProdecureを使用すると、指定した範囲内にテーブルで発生した行レベルの変更(挿入、更新、削除)をキャプチャできるため、CDCを実装する上で役に立つ。
本記事では、Iceberg viewの仕組みと機能を紹介しつつ、実践的な活用例としてcreate_changelog_viewについて解説する。

Iceberg view概要

一般的なクエリエンジンにおけるviewの役割

多くのクエリエンジンでは、テーブルを単位としてデータを管理して、それに対して検索や集計を行う。一方で、要件によっては複数のテーブルを結合したり、必要な項目だけに絞り込んだりした結果を新たな「テーブル」として扱いたい場面がある。このような、他のテーブルから作られる仮想的なテーブルを「view」と呼ぶ。
viewは例えば以下のような目的で使われる。

  • 複雑なクエリを簡略化: 結合や集約を事前に行っておくことで、ユーザは簡単なSELECT文でデータを取得できる
  • セキュリティ向上: テーブルの特定のカラムだけをviewで公開することで、不要な情報へのアクセスを制限できる(何らかのアクセス制御機構の存在が前提)
  • データの独立性確保: テーブル定義が変更されてもアプリケーション側はviewを使用することでデータ構造の独立性が高まる(場合がある)
Iceberg viewを使ってみる

内部構造の説明に立ち入る前に、まずは大まかな挙動を観察してみよう。
ニューヨーク市が配布する、ニューヨーク市タクシー・リムジン委員会の移動記録データを格納するテーブル「taxis」があるとする。

taxis.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
taxis.show(3,vertical=True)

-RECORD 0------------------------------------
 VendorID              | 1                   
 tpep_pickup_datetime  | 2021-04-01 00:00:18 
 tpep_dropoff_datetime | 2021-04-01 00:21:54 
 passenger_count       | 1.0                 
 trip_distance         | 8.4                 
 RatecodeID            | 1.0                 
 store_and_fwd_flag    | N                   
 PULocationID          | 79                  
 DOLocationID          | 116                 
 payment_type          | 1                   
 fare_amount           | 25.5                
 extra                 | 3.0                 
 mta_tax               | 0.5                 
 tip_amount            | 5.85                
 tolls_amount          | 0.0                 
 improvement_surcharge | 0.3                 
 total_amount          | 35.15               
 congestion_surcharge  | 2.5                 
 airport_fee           | 0.0                 
-RECORD 1------------------------------------
 VendorID              | 1                   
 tpep_pickup_datetime  | 2021-04-01 00:42:37 
 tpep_dropoff_datetime | 2021-04-01 00:46:23 
 passenger_count       | 1.0                 
 trip_distance         | 0.9                 
 RatecodeID            | 1.0                 
 store_and_fwd_flag    | N                   
 PULocationID          | 75                  
 DOLocationID          | 236                 
 payment_type          | 2                   
 fare_amount           | 5.0                 
 extra                 | 3.0                 
 mta_tax               | 0.5                 
 tip_amount            | 0.0                 
 tolls_amount          | 0.0                 
 improvement_surcharge | 0.3                 
 total_amount          | 8.8                 
 congestion_surcharge  | 2.5                 
 airport_fee           | 0.0                 
-RECORD 2------------------------------------
 VendorID              | 1                   
 tpep_pickup_datetime  | 2021-04-01 00:57:56 
 tpep_dropoff_datetime | 2021-04-01 01:08:22 
 passenger_count       | 1.0                 
 trip_distance         | 3.4                 
 RatecodeID            | 1.0                 
 store_and_fwd_flag    | N                   
 PULocationID          | 236                 
 DOLocationID          | 168                 
 payment_type          | 2                   
 fare_amount           | 11.5                
 extra                 | 3.0                 
 mta_tax               | 0.5                 
 tip_amount            | 0.0                 
 tolls_amount          | 0.0                 
 improvement_surcharge | 0.3                 
 total_amount          | 15.3                
 congestion_surcharge  | 2.5                 
 airport_fee           | 0.0                 
only showing top 3 rows

ここで、1日ごとの移動数を集計するview「nyc.pickup_agg」を定義する。

spark.sql('''
CREATE OR REPLACE VIEW nyc.pickup_agg (
    pickup_count COMMENT 'Count of pickup',
    pickup_date
)
COMMENT 'Daily pickup counts'
AS
SELECT
    COUNT(1), CAST(tpep_pickup_datetime AS DATE)
FROM nyc.taxis
GROUP BY CAST(tpep_pickup_datetime AS DATE)
''')

pickup_dateごとの乗車数を集計するviewが出来た。

spark.sql('''
SELECT * from nyc.pickup_agg limit 3
''').show()

+------------+-----------+
|pickup_count|pickup_date|
+------------+-----------+
|       82210| 2021-04-29|
|       78858| 2021-04-24|
|           5| 2009-01-01|
+------------+-----------+
Iceberg viewのコンセプト
メタデータ形式の共有

従来、クエリエンジンはview定義を独自の形式でメタストアに保存してきたため、他のエンジンから参照・変更することが難しかった。そこで、Iceberg viewはtableと同様にviewのメタデータを共通フォーマットとして保存することで、エンジン間でのviewの共有を可能にする。

viewのバージョン管理

viewの変更履歴を追跡して、必要に応じて過去のバージョンにロールバックできるようにする。(バージョン管理はテーブルと同様の仕組みで、各バージョンのメタデータをatomicに入れ替えることで実現する)

Iceberg viewの構成要素と仕組み

Iceberg viewのメタデータは、Icebergテーブルのメタデータと同様にview専用のmetadata fileに保存される。Iceberg Catalogはviewの最新のmetadata fileのロケーションをポイントしており、各エンジンはCatalogを起点としてviewを参照できる。

先ほど作成したview「pickup_agg」についてREST catalogに問い合わせると、metadata fileのロケーションを示す"metadata-location": "s3://warehouse/nyc/pickup_agg/metadata/00000-89feab5c-a8ab-4c53-8497-ab6ba11a6f4a.gz.metadata.json"を含む情報が得られる。

curl -X GET "http://rest:8181/v1/namespaces/nyc/views/pickup_agg"

{
 "metadata-location": "s3://warehouse/nyc/pickup_agg/metadata/00000-89feab5c-a8ab-4c53-8497-ab6ba11a6f4a.gz.metadata.json",
 "metadata": {
   "view-uuid": "6891e4cd-ebac-4e4a-b511-dd3bcb3b6798",
   "format-version": 1,
   "location": "s3://warehouse/nyc/pickup_agg",
   "properties": {
     "comment": "Daily pickup counts",
     "engine_version": "Spark 3.5.1",
     "create_engine_version": "Spark 3.5.1",
     "spark.query-column-names": "count(1),tpep_pickup_datetime"
   },
   "schemas": [
     ...
   ],
   "current-version-id": 1,
   "versions": [
     ...
   ],
   "version-log": [
     {
       "timestamp-ms": 1711842668070,
       "version-id": 1
     }
   ]
 }
}

viewに対する変更は、新しいmetadata fileを作成し、current-version-idをatomicに入れ替えることで反映される。これにより、他のリーダーに影響を与えずに変更を適用でき、必要に応じて古いバージョンに戻すこともできるようになる。
つまり、Iceberg Catalogはテーブルと同様にviewを管理(作成、削除、リネーム、リスト)する責務を負う。viewを管理するinterfaceはViewCatalogが定義しており、それぞれのCatalog実装がクライアントからのview操作を受け付ける(REST CatalogであればRESTCatalog)。
従って、view機能のサポート有無は各Catalog実装に依存しており、2024/3/31時点ではJDBCとREST, NESSIE catalogのみがviewをサポートしている。

View Metadata

viewのmetadata fileには以下の情報が含まれる。

  • view-uuid: ビューを識別するUUID
  • format-version: view formatのversion
  • location: viewのベースロケーション
  • schemas: スキーマの履歴情報
  • current-version-id: ビューの現在のバージョンのID (version-id)
  • versions: ビューのversionの配列(事項で詳述)
  • version-log: viewに対する変更を追跡する配列。current-version-idが変更される度に作成され、そのtimestamp-msversion-idが格納される
  • properties: viewのプロパティ(文字列から文字列へのマップ)

先ほど作成したview「pickup_agg」のmetadata files3://warehouse/nyc/pickup_agg/metadata/00000-89feab5c-a8ab-4c53-8497-ab6ba11a6f4a.gz.metadata.jsonより、実際のView Metadataの中身は以下のようになっている。

{
 "view-uuid": "5b68cab8-be9b-457d-9fbf-7796e8289770",
 "format-version": 1,
 "location": "s3://warehouse/nyc/pickup_agg",
 "properties": {
   "comment": "Daily pickup counts",
   "engine_version": "Spark 3.5.1",
   "create_engine_version": "Spark 3.5.1",
   "spark.query-column-names": "count(1),tpep_pickup_datetime"
 },
 "schemas": [
   {
     "type": "struct",
     "schema-id": 0,
     "fields": [
       {
         "id": 0,
         "name": "count(1)",
         "required": true,
         "type": "long"
       },
       {
         "id": 1,
         "name": "tpep_pickup_datetime",
         "required": false,
         "type": "date"
       }
     ]
   },
   {
     "type": "struct",
     "schema-id": 1,
     "fields": [
       {
         "id": 0,
         "name": "pickup_count",
         "required": true,
         "type": "long",
         "doc": "Count of pickup"
       },
       {
         "id": 1,
         "name": "pickup_date",
         "required": false,
         "type": "date"
       }
     ]
   }
 ],
 "current-version-id": 3,
 "versions": [...],
 "version-log": [
   {
     "timestamp-ms": 1711814793006,
     "version-id": 1
   },
   {
     "timestamp-ms": 1711815139224,
     "version-id": 2
   },
   {
     "timestamp-ms": 1711816668205,
     "version-id": 3
   }
 ]
}
versionsフィールド

view metadata内の配列として、ビューの各バージョンに関する情報が格納される。各バージョンは以下の情報を持つ。

  • version-id: バージョンのID
  • schema-id: スキーマのID
  • timestamp-ms: バージョンが作成された時のタイムスタンプ (エポックからのミリ秒)
  • summary: バージョンに関するメタデータ。文字列から文字列へのマップ。viewを作成したengine-nameengine-versionなどを格納する
  • representations: ビュー定義の表現のリスト(事項で詳述)
  • default-catalog: SELECTの参照がカタログを含まない場合に使用するカタログ名
  • default-namespace: SELECTの参照が単一の識別子である場合に使用する名前空間

先ほど作成したview「pickup_agg」のmetadata files3://warehouse/nyc/pickup_agg/metadata/00000-89feab5c-a8ab-4c53-8497-ab6ba11a6f4a.gz.metadata.jsonより、実際のversionsフィールドの中身は以下のようになっている。

"versions": [
 {
   "version-id": 1,
   "timestamp-ms": 1711842668070,
   "schema-id": 0,
   "summary": {
     "iceberg-version": "Apache Iceberg 1.5.0 (commit 2519ab43d654927802cc02e19c917ce90e8e0265)"
   },
   "default-namespace": [
     "nyc" 
   ],
   "representations": [
     ...
   ]
 }
]
representationsフィールド

representationsは、view定義を表現するドキュメントである。1つのviewバージョンは複数のrepresentationを持つことができ、エンジンはどのrepresentationsを使用するかを選べる。これにより、異なるエンジンがそれぞれの方法でビュー定義を解釈できるようになる。

現在サポートされているrepresentationは以下の通り。(sql representationのみ)

  • sql representation: viewを定義するSELECT文とそのメタデータを格納する。1つのviewバージョンは、異なる方言の複数のSQL representationを持つことができるが、1つの方言につき1つのSQL representationしか持てない。
    • type: sql representationでは常に"sql"
    • sql: ビューを定義するSELECT文の文字列
    • dialect: SELECT文の方言を示す文字列(例: "spark"や"trino")

先ほど作成したview「pickup_agg」のmetadata files3://warehouse/nyc/pickup_agg/metadata/00000-89feab5c-a8ab-4c53-8497-ab6ba11a6f4a.gz.metadata.jsonより、実際のrepresentationsフィールドの中身は以下のようになっている。

"representations": [
 {
   "type": "sql",
   "sql": "SELECT\n COUNT(1), CAST(tpep_pickup_datetime AS DATE)\nFROM nyc.taxis\nGROUP BY CAST(tpep_pickup_datetime AS DATE)",
   "dialect": "spark"
 }
]

representationsフィールドの機構により、例えばSparkで作成されたviewのメタデータを、Trinoなど別のSQLエンジンから読み取ってviewを再現することができるようになる。つまり、SQLをベースとするエンジン間であれば、Iceberg viewを介してview定義を共有できるようになり、相互運用性が高まる。

「create_changelog_view」プロシージャによるIcebergのCDC

create_changelog_view

create_changelog_viewは、Iceberg 1.5で導入されたSpark用のストアドプロシージャである。指定したicebergテーブルで発生した行レベルの変更(挿入、更新、削除)を含むviewを作成する機能を提供する。これにより、Change Data Capture(CDC)、つまりデータの変更を継続的にキャプチャして追跡することが可能になる。
CDCは、データの監査、外部システムとの同期、増分処理などに活用される。create_changelog_viewは、Icebergのスナップショット機構と新機能のviewを活用し、指定された期間内の変更を効率的に取得できる。
なお、ここでのCDCというのは、データ基盤(iceberg)のデータソース(非Iceberg)からのCDC(data ingestion)ではなく、Icebergベースのデータ基盤自体のCDCを指している点に注意して欲しい。

create_changelog_viewの使い方

引数とアウトプットの概要を紹介する。詳細はドキュメントとコードを参照して欲しい。

引数
引数名 必須 タイプ 説明
table ✔️ 文字列 変更のキャプチャ元となるテーブルの名前
changelog_view 文字列 作成するビューの名前(省略した場合、デフォルトの名前が使用される)
options マップ Sparkのreadオプションを指定する。よく使用されるオプションは以下の通り
- start-snapshot-id:開始スナップショットID(exclusive)デフォルトはテーブルの最初のスナップショット
- end-snapshot-id:終了スナップショットID(inclusive)デフォルトはテーブルの最新のスナップショット
- start-timestamp:開始タイムスタンプ(exclusive)デフォルトはテーブルの最初のスナップショット
- end-timestamp:終了タイムスタンプ(inclusive)デフォルトはテーブルの最新のスナップショット
net_changes ブール値 中間の変更を除去し、最終的な変更のみを出力するかどうかを指定する。デフォルトはfalse
compute_updates ブール値 更新前後のイメージを計算するかどうかを指定。デフォルトはfalse
identifier_columns 配列 更新の計算に使用する識別子カラムのリストを指定。compute_updatesがtrueに設定されていて、identifier_columnsが指定されていない場合、テーブルの現在の識別子フィールドが使用される。識別子カラムは、挿入レコードと削除レコードが同じ行を参照しているかどうかを判断するために使用される
アウトプット
出力名 タイプ 説明
changelog_view 文字列 作成されたビューの名前

作成されたビューには、CDC対象のテーブルのカラムに加えて、CDCメタデータカラムが含まれる。

カラム名 説明
_change_type 変更の種類。INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTERのいずれか
_change_ordinal 変更の順序
_commit_snapshot_id 変更が発生したスナップショットID
create_changelog_viewの実行例

nyc.taxisテーブルのchangelog viewを作成する。ここではスナップショットの範囲を指定していないため、テーブルの最初のスナップショットから最新のスナップショットまでのviewが作成される。
また、viewの名前を指定していないため、view名はデフォルト名称である[CDC対象テーブル]_changesになる。

spark.sql("""CALL demo.system.create_changelog_view(
  table => 'nyc.taxis'
)""")

view「taxis_changes」が作成された

spark.sql("""
SHOW VIEWS;
""").show()

+---------+-------------+-----------+
|namespace|     viewName|isTemporary|
+---------+-------------+-----------+
|      nyc|   pickup_agg|      false|
|         |taxis_changes|       true|
+---------+-------------+-----------+

spark.sql("""
DESC taxis_changes;
""").show(n=30,truncate=False)

+---------------------+-------------+-------+
|col_name             |data_type    |comment|
+---------------------+-------------+-------+
|VendorID             |bigint       |NULL   |
|tpep_pickup_datetime |timestamp_ntz|NULL   |
|tpep_dropoff_datetime|timestamp_ntz|NULL   |
|passenger_count      |double       |NULL   |
|trip_distance        |double       |NULL   |
|RatecodeID           |double       |NULL   |
|store_and_fwd_flag   |string       |NULL   |
|PULocationID         |bigint       |NULL   |
|DOLocationID         |bigint       |NULL   |
|payment_type         |bigint       |NULL   |
|fare_amount          |double       |NULL   |
|extra                |double       |NULL   |
|mta_tax              |double       |NULL   |
|tip_amount           |double       |NULL   |
|tolls_amount         |double       |NULL   |
|improvement_surcharge|double       |NULL   |
|total_amount         |double       |NULL   |
|congestion_surcharge |double       |NULL   |
|airport_fee          |double       |NULL   |
|_change_type         |string       |NULL   |
|_change_ordinal      |int          |NULL   |
|_commit_snapshot_id  |bigint       |NULL   |
+---------------------+-------------+-------+

taxis_changesには、各レコードに対する変更が発生したスナップショットID、順序、変更の種類が格納される。

spark.sql("""
SELECT VendorID, tpep_pickup_datetime, passenger_count, _change_type, _change_ordinal, _commit_snapshot_id FROM taxis_changes limit 5;
""").show()

+--------+--------------------+---------------+------------+---------------+-------------------+
|VendorID|tpep_pickup_datetime|passenger_count|_change_type|_change_ordinal|_commit_snapshot_id|
+--------+--------------------+---------------+------------+---------------+-------------------+
|       1| 2021-04-01 00:03:58|            1.0|      INSERT|              0|3838051412002077211|
|       1| 2021-04-01 00:05:27|            2.0|      INSERT|              0|3838051412002077211|
|       1| 2021-04-01 00:05:47|            1.0|      INSERT|              0|3838051412002077211|
|       1| 2021-04-01 00:07:47|            1.0|      INSERT|              0|3838051412002077211|
|       1| 2021-04-01 00:08:44|            1.0|      INSERT|              0|3838051412002077211|
+--------+--------------------+---------------+------------+---------------+-------------------+
Tips
Carry-over Rows

Carry-over rowsは、copy-on-writeを使用する際の、行レベル操作(MERGE、UPDATE、DELETE)の結果として生じる行である。
例えば、row1(id=1, name='Alice')とrow2(id=2, name='Bob')を含むファイルがあるとする。row2のcopy-on-write削除では、このファイルを消去し、row1を新しいファイルに保持する必要がある。changelog tableは、これを以下のような一対の行として報告する。

id name _change_type
1 Alice DELETE
1 Alice INSERT

多くの場合、carry-over rowsはCDCにとって関心の対象ではないため、create_changelog_viewプロシージャは、生成するviewからcarry-over rowsを除外し、実際のデータ変更のみを含むように振る舞う。

Pre/Post Update Images

create_changelog_viewプロシージャは、設定されている場合、更新前後のイメージ(pre/post update images)を計算する。更新前後のイメージは、削除行と挿入行のペアから変換される。識別子カラム(identifier columns)は、挿入レコードと削除レコードが同じ行を参照しているかどうかを判断するために使用される。2つのレコードが識別子カラムで同じ値を共有している場合、それらは同じ行の更新前後の状態と見なされる。識別子フィールドは、テーブルスキーマで設定するか、プロシージャのパラメータとして入力できる。

以下の例は、識別子カラム(id)を使用した更新前後のイメージの計算を示している。idが同じ行の削除と挿入は、単一の更新操作として扱われる。具体的には、以下のような一対の行があるとする。

id name _change_type
3 Robert DELETE
3 Dan INSERT

この場合、プロシージャは更新前の行をUPDATE_BEFOREイメージ、更新後の行をUPDATE_AFTERイメージとしてマークし、以下のような更新前後のイメージを生成する。

id name _change_type
3 Robert UPDATE_BEFORE
3 Dan UPDATE_AFTER
ユースケースのアイデア

create_changelog_viewのユースケースとして、以下のようなアイデアが考えられる。

  • データのオーディット(監査):create_changelog_viewを使って、重要なデータの変更履歴をキャプチャし、監査証跡を作成できる。これにより、いつ、誰が、どのようなデータを変更したかを追跡できる
  • 外部システムとのデータ同期:CDCを利用して、データレイクの変更を外部システムにリアルタイムに反映させることができる
  • データのバージョン管理:スナップショット間の変更を追跡することで、データのバージョン管理が簡単になる
  • 異常検知:create_changelog_viewで取得した変更パターンを分析することで、通常とは異なる異常な変更を検出できる。これにより、データ品質の問題やセキュリティ上の脅威を早期に発見できる

おわりに

本記事では、Iceberg 1.5で導入されたIceberg viewの概要と、その実践的な活用方法の一つである「create_changelog_view」プロシージャについて解説した。 Iceberg viewは、様々なSQLエンジン間でviewを共有し、viewのバージョン管理を可能にする仕組みである。これにより、データ基盤上の複数のエンジン間での相互運用性が高まり、より柔軟で効率的なデータ分析基盤の構築が可能になる。

また、「create_changelog_view」プロシージャを活用することで、Icebergテーブル内の行レベルの変更をキャプチャし、CDCを実現できる。データの監査、外部システムとの同期、増分処理など、様々な用途に応用が広がっていくことが期待される。

Icebergのviewサポートについては、今後より幅広いcatalog実装でのサポートが期待される。またマテリアライズド・ビューについてもproposalの検討が進行中だ。今後もIcebergの動向とその活用方法に注目していきたい。

データ基盤の設計・運用に携わる方々にとって、本記事がIceberg viewとCDCの理解と活用の一助となることを願っている。

Appendix: Viewサポートに関連するPR

Add view support for REST catalog
Add view support for JDBC catalog
Nessie: Support views for NessieCatalog
Core: Support view metadata compression
Spark: Support creating views via SQL
Spark: Support dropping views
Spark: Support renaming views
Spark: Add support for describing/showing views
Spark: Support altering view properties