Bering Note – formerly 流沙河鎮

情報技術系のこと書きます。

Icebergで時間に基づくパーティションを設定する前に読む記事

Apache Icebergのパーティショニングは、従来のHiveテーブルとは大きく異なる設計思想を持っています。この記事では、Icebergの「Transform」と「Hidden Partitioning」の仕組みを詳しく解説します。
その上で、時間に関わるカラム(date, timestamp, timestamptz, timestamp_ns, timestamptz_ns )にパーティションを設定する際、なぜカラムごとにパーティションに使用したい時間の最小単位のみを指定すれば良いのか、理由を説明します。

以下ではなく

CREATE TABLE events (
  event_id bigint,
  event_time timestamp,
  ...
)
USING iceberg
PARTITIONED BY (year(event_time), month(event_time), day(event_time));

以下が正しい

CREATE TABLE events (
  event_id bigint,
  event_time timestamp,
  ...
)
USING iceberg
PARTITIONED BY (day(event_time));

TL;DR

  • Icebergでdate, timestamp, timestamptz, timestamp_ns, timestamptz_ns 型のカラムへのtransform / hidden partitioningを設定する際は、パーティションに使用したい時間の最小単位のみを指定すれば良い
  • 1つのdate, timestamp, timestamptz, timestamp_ns, timestamptz_ns カラムに対してyear, month, day,hour transformを複数設定する必要はない(ValidationExceptionとなる)
  • 例えばdayパーティションすれば、yearmonthでの絞り込みも自動的に最適化される
  • つまり、Icebergにおけるtimestamp列に対するパーティションは日付カラムを「どの単位で分割するか」ではなく「どこまで分割するか」を指定するもの
  • Icebergのyearmonthdayhour transformは、タイムスタンプ列をUnix Epochからのオフセット(整数値)に変換してメタデータ内で管理する

パーティショニングとは

大規模なデータを扱うテーブルでは、パフォーマンスを向上させるためにパーティショニングが重要な役割を果たします。パーティショニングとは、テーブルのデータを特定のキー(列)に基づいて論理的に分割し、関連する行をグループ化する仕組みです。
例えば、数年分のログデータが格納されたテーブルから「2024年3月のデータ」を検索する場合を考えてみましょう。パーティショニングされていないテーブルでは、すべてのデータファイルをスキャンする必要があります。(ソートなどの他の最適化手法を使わない限り)
一方、ログデータの日付を記録した列でパーティショニングされている場合、2024年3月に該当するデータが格納されたファイルだけを効率的に読み込むことができます。これにより、クエリの実行時間が大幅に短縮されます。

Hiveテーブルにおけるパーティション構造

Hiveテーブルでは、パーティショニングは物理的なディレクトリ構造と密接に結びついています。例えば「年」「月」「日付」でパーティションを切る場合、以下のようなディレクトリ構造でデータファイルが配置されます。

/table/year=2024/month=3/day=30/data.parquet
/table/year=2024/month=3/day=31/data.parquet

これにより、テーブルへのクエリがyear, month, dayをフィルタ条件に含んでいれば(例:WHERE year=2024 AND month=3 AND day=2)、クエリエンジンは各パスの階層を段階的に絞り込むことで、効率的に必要なデータを探すことができます。
この物理的な構造により、Hiveテーブルのパーティションは以下のような特徴/課題を持ちます。

  • パーティション列(上記の例ではyearmonthday)はテーブルスキーマの一部として定義される必要がある
  • 元のデータにパーティションに用いるのに適切な単位の列(yearmonthdayなど)が存在しない場合、これらの列を物理的に追加する必要がある(timestamp型の列からyearの部分だけを切り出して列を追加するなど)
  • データの挿入時に、これらのパーティション列の値を明示的に指定または計算して設定する
  • パーティション方式を変更する場合、事実上のテーブルの作り直しが発生する
  • クエリを実行するユーザーは、テーブルがどのようにパーティショニングされているかを明示的に把握しておく必要がある(これは後述するIcebergでも本質的には同じですが、テーブルに元からある列をそのまま活用できるため、ユーザーはより直感的に列を指定できる可能性が高いです)

IcebergのTransformとHidden Partitioning

Icebergでは、Transform(変換)という仕組みを導入することで、より洗練されたパーティショニングを実現しています。

Transformの仕組み

Transformは、テーブルのメタデータ内部でテーブルの列の値を変換してパーティション値を生成する関数です。Transformは実際のレコードデータを変更するのではなく、Icebergのメタデータマニフェストファイル)内でパーティション情報を管理するために使用されるということです。

CREATE TABLE events (
  event_id bigint,
  event_time timestamp,
  ...
)
USING iceberg
PARTITIONED BY (day(event_time));

例えば、2024-03-30 15:30:00というタイムスタンプを持つカラムがあったとします。day(event_time)というTransformを適用すると、以下のような処理が行われます。

  1. データ書き込み時:Icebergはevent_timeの値のUnix Epochからの経過時間を計算してメタデータに記録する。ここで、日付以降の時間情報は切り捨てられる。
  2. クエリ実行時:ユーザーがevent_timeに対してフィルタ条件を指定すると、Icebergはメタデータ内の変換値を参照して、該当するパーティションを効率的に選択する。

日付・時刻に関するTransformとして、以下の4つが用意されています。

Transform名 説明 ソースタイプ 結果タイプ
year 日付またはタイムスタンプから年を抽出(1970年からの年数) date, timestamp, timestamptz, timestamp_ns, timestamptz_ns int
month 日付またはタイムスタンプから月を抽出(1970-01-01からの月数) date, timestamp, timestamptz, timestamp_ns, timestamptz_ns int
day 日付またはタイムスタンプから日を抽出(1970-01-01からの日数) date, timestamp, timestamptz, timestamp_ns, timestamptz_ns int
hour タイムスタンプから時間を抽出(1970-01-01 00:00:00からの時間数) timestamp, timestamptz, timestamp_ns, timestamptz_ns int

つまり、パーティション定義と実際のデータが分離されるため、Hiveテーブルのように実際のデータにyearmonthday列を追加するのではなく、メタデータのみで管理できるようになります。また、パーティション定義を変更する際も、データの再書き込みが不要で、Icebergのパーティション進化機能により柔軟に対応できます。

こうしたTransformを活用したパーティショニングの仕組みを、Icebergでは「Hidden Partitioning」と呼びます。
Transformには、ここで紹介した日付変換以外にも、文字列のハッシュ化やバケット分割など、さまざまな変換が用意されています。
詳細はIcebergのドキュメントを参照してください。

iceberg.apache.org

時間に関わるカラムへのTransformをどのように指定するべきか

Icebergでの時間に関わるカラムへのTransformによるパーティションを考える際、先ほどご紹介した「Unix Epochからの経過時間」による管理が重要な意味を持ちます。
Hive テーブルの延長で考えると、日付単位のパーティションは以下のように設定したくなるのではないかと思います。

CREATE TABLE events (
  event_id bigint,
  event_time timestamp,
  ...
)
USING iceberg
PARTITIONED BY (year(event_time), month(event_time), day(event_time));

しかし、Icebergでは、パーティションを適用したい最小単位に対してTransformを1つだけ指定するだけで十分です。例えば、day(event_time)を指定すれば、年単位や月単位のフィルタリングも自動的に最適化されます。以下のように指定するのが正しい使い方です。

CREATE TABLE events (
  event_id bigint,
  event_time timestamp,
  ...
)
USING iceberg
PARTITIONED BY (day(event_time));

このように、Icebergでは日付カラムに対してTransformを複数指定する必要はありません。なぜなら、Icebergの設計思想により、日付Transformは階層的な時間単位を自然に扱えるようになっているからです。

なぜ複数のTransformを指定する必要がないのか

Icebergの日付Transformの内部実装を見ると、すべての時間単位がUnix Epoch(1970年1月1日)からの経過時間として統一的に表現されていることがわかります。

// DateTimeUtil.javaより
private static int convertDays(int days, ChronoUnit granularity) {
    if (days >= 0) {
        LocalDate date = EPOCH_DAY.plusDays(days);
        return (int) granularity.between(EPOCH_DAY, date);
    } else {
        LocalDate date = EPOCH_DAY.plusDays(days + 1);
        return (int) granularity.between(EPOCH_DAY, date) - 1;
    }
}

github.com

各Transformは以下のような整数値を生成します。

// Years.java, Months.java, Days.java, Hours.java すべてで同じ実装
@Override
public Type getResultType(Type sourceType) {
    return Types.IntegerType.get(); // 32ビット整数を返す
}
  • year:Epochからの経過年数
  • month:Epochからの経過月数
  • day:Epochからの経過日数
  • hour:Epochからの経過時間数

これらは単純な経過時間を表す整数値であり、より細かい粒度の値には、より粗い粒度の情報が自然に含まれています。

Icebergメタデータ構造とパーティション管理

これらの整数値として表現されたパーティション値は、Icebergの階層的なメタデータ構造の中で管理されます。Icebergのメタデータは以下の3層構造になっています:

  1. メタデータファイル - テーブルの特定時点の状態を表す
  2. マニフェストリスト - スナップショットに含まれるマニフェストファイルの一覧と、各マニフェストの統計情報サマリー
  3. マニフェストファイル - 実際のデータファイルとその詳細なメタデータを記録

この階層構造の各レベルでパーティションの統計情報を保持することで、クエリ実行時により効率的にデータをフィルタリングできます。

マニフェストリストとマニフェストファイルの役割

マニフェストリストには、各マニフェストファイルのパーティション統計情報のサマリーが含まれます:

  • パーティションフィールドの最小値(lower_bound)と最大値(upper_bound)
  • null値やNaN値を含むかどうかの情報

マニフェストファイルは、複数のデータファイルの詳細情報を管理します:

  • 各データファイルのパス(Parquet、ORC等)
  • 各データファイルが属するパーティション
  • ファイルサイズ、レコード数などのメタデータ

例えば、day(event_time)パーティションされたテーブルでは:

マニフェストリスト:
  マニフェスト1: パーティション値の範囲 = 19,807〜19,810(2024年3月30日〜4月2日)
  マニフェスト2: パーティション値の範囲 = 19,811〜19,815(2024年4月3日〜4月7日)
  マニフェスト3: パーティション値の範囲 = 19,500〜19,550(2023年12月〜2024年2月)

マニフェストファイル1の中身:
  - data-001.parquet: パーティション値 = 19,807、レコード数 = 100,000
  - data-002.parquet: パーティション値 = 19,807、レコード数 = 85,000
  - data-003.parquet: パーティション値 = 19,808、レコード数 = 92,000
  ...

このように、1つのマニフェストファイルは複数の日付にまたがるデータファイルを含むことができ、クエリプランニング時は、まずマニフェストリストレベルで範囲をチェックし、該当する可能性のあるマニフェストファイルのみを読み込むことで、効率的なスキャンを実現しています。

クエリ実行時、Icebergはこれらの統計情報を使用して、読み込む必要のあるデータファイルを効率的に特定します。以下は、その範囲比較の実装例です:

// ManifestEvaluator.javaでの範囲比較の実装例
@Override
public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
    // パーティションフィールドの位置を取得
    int pos = Accessors.toPosition(ref.accessor());
    
    // そのフィールドの上限値(最大値)を取得
    ByteBuffer upperBound = stats.get(pos).upperBound();
    if (upperBound == null) {
        return ROWS_CANNOT_MATCH; // すべての値がnullの場合
    }
    
    // バイト列から実際の値(整数)に変換
    T upper = Conversions.fromByteBuffer(ref.type(), upperBound);
    
    // フィルタ条件の値と比較
    int cmp = lit.comparator().compare(upper, lit.value());
    if (cmp < 0) {
        // 上限値がフィルタ条件より小さい場合、このマニフェストには該当データなし
        return ROWS_CANNOT_MATCH;
    }
    
    // このマニフェストに該当データが含まれる可能性あり
    return ROWS_MIGHT_MATCH;
}

github.com

階層的な時間単位の自然な扱い

この実装のポイントは、パーティション値が整数として格納されているため、数値の大小比較だけで範囲判定ができることです。文字列や複雑な日付フォーマットの解析は不要で、単純な整数比較により高速なフィルタリングが可能になります。 また、年月日時の階層関係が自然に保持される点も重要な特徴です。

例えば、day(event_time)パーティションされたテーブルに対して「2024年3月のデータ」を検索する場合:

  1. クエリの変換
    WHERE event_time >= '2024-03-01' AND event_time < '2024-04-01'

    Unix Epochからの経過日数に変換:19,783日目〜19,812日目

  2. マニフェストファイルのスキャン
    マニフェストの統計情報(上限値・下限値)と比較:

  3. 効率的なファイル選択
    該当する可能性のあるマニフェストのみを読み込み

このように、日単位でパーティションを作成していても、月単位や年単位のクエリが自動的に最適化されます。19,807という一つの整数値から、必要に応じて年(2024)、月(3)、日(30)の情報をすべて導き出せるためです。

この性質により、Icebergでは一つのTransformを指定するだけで、それより粗い粒度での検索も効率的に実行できます。dayパーティションを定義すれば、年単位や月単位のフィルタリングも自動的に最適化されるのです。

Sparkでは、同一のtimestampカラムに対して複数のTransformを指定しようとするとValidationExceptionが発生しますが、これは設計上の制約というよりも、「そもそも必要ないから」という理由によるものです。

まとめ

IcebergのHidden Partitioningにおける日付変換は洗練された設計になっており、Unix Epochからの経過時間という統一的な表現により、階層的な時間単位を自然に扱うことができるようになっています。