流沙河鎮

情報技術系のこと書きます。

PyIcebergを試す

導入

pip install --upgrade pip
pip install "pyiceberg[s3fs,hive]" 

用途に応じて以下を依存関係に加えられます.

キー 説明
hive Hiveメタストアのサポート
glue AWS Glueのサポート
dynamodb AWS DynamoDBのサポート
sql-postgres PostgreSQLによってバックアップされるSQLカタログのサポート
sql-sqlite SQLiteによってバックアップされるSQLカタログのサポート
pyarrow オブジェクトストアと対話するためのFileIO実装としてのPyArrow
pandas PyArrowとPandasの両方をインストール
duckdb PyArrowとDuckDBの両方をインストール
ray PyArrow、Pandas、およびRayをインストール
daft Daftをインストール
s3fs オブジェクトストアと対話するためのFileIO実装としてのS3FS
adlfs オブジェクトストアと対話するためのFileIO実装としてのADLFS
snappy snappy Avro圧縮のサポート
gcsfs オブジェクトストアと対話するためのFileIO実装としてのGCSFS

本記事では追加で以下を利用します。

sqlalchemy pyarrow

サンプルデータ準備

定番のTLC Trip Record Dataを使います。

mkdir data
curl https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -o data/yellow_tripdata_2023-01.parquet/yellow_tripdata_2023-01.parquet

Icebergカタログを作成する

今回は検証用にライトに使える、SQLite ベースの SQLCatalog を使用します。
Icebergカタログについて詳しく知りたい方は以下をご参照ください。

bering.hatenadiary.com

Icebergカタログ、テーブルに関する情報を保存するディレクトリを作成します。

mkdir warehouse

SQLCatalogを作成します。

from pyiceberg.catalog.sql import SqlCatalog

warehouse_path = "./warehouse"
catalog = SqlCatalog(
    "default",
    **{
        "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
        "warehouse": f"file://{warehouse_path}",
    },
)

warehouse ディレクトリに pyiceberg_catalog.db が作成されました。

pyiceberg_env ❯ ls warehouse 
pyiceberg_catalog.db

Icebergテーブルを作成する

サンプルデータをロードして taxi_dataset テーブルを作成します。

import pyarrow.parquet as pq

# Read a parquet file as a pyarrow table
df = pq.read_table("/tmp/yellow_tripdata_2023-01.parquet")

catalog.create_namespace("default")

table = catalog.create_table(
    "default.taxi_dataset",
    schema=df.schema,
)

warehouse ディレクトリにIceberg テーブルを構成するメタデータ、データファイルが作成されました。
Iceberg テーブルのアーキテクチャについて詳しく知りたい方は以下をご参照ください。

bering.hatenadiary.com

pyiceberg_env ❯ tree warehouse/default.db/taxi_dataset
warehouse/default.db/taxi_dataset
├── data
│   ├── 00000-0-8380a671-5f0e-4766-848b-e575944b87f5.parquet
│   └── 00000-0-9f292d06-4479-4a44-816c-1dc8742767ec.parquet
└── metadata
    ├── 00000-4a71b880-2fb2-4ee3-8334-9b7378891b01.metadata.json
    ├── 00001-b99b531e-a3f0-4e8c-9d49-a6043b372f7b.metadata.json
    ├── 00002-87324e52-945e-4fec-a716-eae47383c12f.metadata.json
    ├── 00003-ef08bd54-6ca8-4d5e-8b44-3e1c0e161b63.metadata.json
    ├── 6e5558ef-c56f-4914-a6cd-8b4d29804fa6-m0.avro
    ├── 8380a671-5f0e-4766-848b-e575944b87f5-m0.avro
    ├── 9f292d06-4479-4a44-816c-1dc8742767ec-m0.avro
    ├── snap-4809862839345096711-0-6e5558ef-c56f-4914-a6cd-8b4d29804fa6.avro
    ├── snap-6164888634227151801-0-8380a671-5f0e-4766-848b-e575944b87f5.avro
    └── snap-6534028523732720220-0-9f292d06-4479-4a44-816c-1dc8742767ec.avro

テーブルを pandas として参照してみます。

import pyarrow.parquet as pq
print(table.scan().to_pandas().head(10))
 VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  trip_distance  ...  improvement_surcharge total_amount  congestion_surcharge  airport_fee  tip_per_mile
0         2  2023-01-01 00:32:10  2023-01-01 00:40:36               1.0           0.97  ...                  1.0          14.30                  2.5          0.00      0.000000
1         2  2023-01-01 00:55:08  2023-01-01 01:01:27               1.0           1.10  ...                  1.0          16.90                  2.5          0.00      3.636364
2         2  2023-01-01 00:25:04  2023-01-01 00:37:49               1.0           2.51  ...                  1.0          34.90                  2.5          0.00      5.976096
3         1  2023-01-01 00:03:48  2023-01-01 00:13:25               0.0           1.90  ...                  1.0          20.85                  0.0          1.25      0.000000
4         2  2023-01-01 00:10:29  2023-01-01 00:21:19               1.0           1.43  ...                  1.0          19.68                  2.5          0.00      2.293706
5         2  2023-01-01 00:50:34  2023-01-01 01:02:52               1.0           1.84  ...                  1.0          27.80                  2.5          0.00      5.434783
6         2  2023-01-01 00:09:22  2023-01-01 00:19:49               1.0           1.66  ...                  1.0          20.52                  2.5          0.00      2.060241
7         2  2023-01-01 00:27:12  2023-01-01 00:49:56               1.0          11.70  ...                  1.0          64.44                  2.5          0.00      0.917949
8         2  2023-01-01 00:21:44  2023-01-01 00:36:40               1.0           2.95  ...                  1.0          28.38                  2.5          0.00      1.925424
9         2  2023-01-01 00:39:42  2023-01-01 00:50:36               1.0           3.01  ...                  1.0          19.90                  2.5          0.00      0.000000

[10 rows x 20 columns]

to_pandas以外にも、扱いたいデータ形式に応じてto_duckdb, to_ray, to_arrow などに変換できます。

Namespaceを参照

カタログ内のテーブルをリストします。

print(catalog.list_tables("default"))
[('default', 'taxi_dataset')]

テーブルをロードしてみる

table = catalog.load_table(
    "default.taxi_dataset",
)
print(table.scan().to_arrow())

実行結果

[('default', 'taxi_dataset')]
pyarrow.Table
VendorID: int64
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: double
trip_distance: double
RatecodeID: double
store_and_fwd_flag: large_string
PULocationID: int64
DOLocationID: int64
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
airport_fee: double
tip_per_mile: double
----
VendorID: [[2,2,2,1,2,...,2,2,1,1,1],[1,2,2,2,2,...,1,1,1,2,2],...,[2,2,2,2,2,...,2,2,2,2,2],[2,2,2,2,2,...,2,2,2,2,2]]
tpep_pickup_datetime: [[2023-01-01 00:32:10.000000,2023-01-01 00:55:08.000000,2023-01-01 00:25:04.000000,2023-01-01 00:03:48.000000,2023-01-01 00:10:29.000000,...,2023-01-02 21:16:11.000000,2023-01-02 21:56:02.000000,2023-01-02 21:04:31.000000,2023-01-02 21:13:09.000000,2023-01-02 21:45:30.000000],[2023-01-02 21:49:54.000000,2023-01-02 21:17:06.000000,2023-01-02 21:35:06.000000,2023-01-02 21:18:43.000000,2023-01-02 21:24:42.000000,...,2023-01-04 14:04:17.000000,2023-01-04 14:27:49.000000,2023-01-04 14:44:46.000000,2023-01-04 14:35:46.000000,2023-01-04 14:52:44.000000],...,[2023-01-30 20:07:47.000000,2023-01-30 20:28:57.000000,2023-01-30 19:59:53.000000,2023-01-30 20:21:42.000000,2023-01-30 20:09:59.000000,...,2023-01-10 08:10:07.000000,2023-01-10 08:51:52.000000,2023-01-10 08:13:34.000000,2023-01-10 08:29:03.000000,2023-01-10 08:49:00.000000],[2023-01-10 08:30:00.000000,2023-01-10 08:34:07.000000,2023-01-10 08:06:16.000000,2023-01-10 08:47:26.000000,2023-01-10 08:43:51.000000,...,2023-01-31 23:58:34.000000,2023-01-31 23:31:09.000000,2023-01-31 23:01:05.000000,2023-01-31 23:40:00.000000,2023-01-31 23:07:32.000000]]
[[2023-01-01 00:40:36.000000,2023-01-01 01:01:27.000000,2023-01-01 00:37:49.000000,2023-01-01 00:13:25.000000,2023-01-01 00:21:19.000000,...,2023-01-02 21:22:04.000000,2023-01-02 22:02:42.000000,2023-01-02 21:08:06.000000,2023-01-02 21:31:43.000000,2023-01-02 21:48:18.000000],[2023-01-02 22:23:48.000000,2023-01-02 21:41:59.000000,2023-01-02 22:00:39.000000,2023-01-02 21:24:23.000000,2023-01-02 21:51:41.000000,...,2023-01-04 14:07:51.000000,2023-01-04 14:40:33.000000,2023-01-04 15:13:24.000000,2023-01-04 14:41:59.000000,2023-01-04 15:18:58.000000],...,[2023-01-30 20:24:09.000000,2023-01-30 20:37:27.000000,2023-01-30 20:16:07.000000,2023-01-30 20:32:01.000000,2023-01-30 20:17:23.000000,...,2023-01-10 08:41:22.000000,2023-01-10 09:12:03.000000,2023-01-10 08:20:49.000000,2023-01-10 08:45:05.000000,2023-01-10 09:42:00.000000],[2023-01-10 08:38:00.000000,2023-01-10 08:41:48.000000,2023-01-10 08:24:17.000000,2023-01-10 09:08:22.000000,2023-01-10 09:18:55.000000,...,2023-02-01 00:12:33.000000,2023-01-31 23:50:36.000000,2023-01-31 23:25:36.000000,2023-01-31 23:53:00.000000,2023-01-31 23:21:56.000000]]
passenger_count: [[1,1,1,0,1,...,1,1,1,2,1],[1,1,2,1,2,...,0,2,2,1,1],...,[1,1,2,1,1,...,null,null,null,null,null],[null,null,null,null,null,...,null,null,null,null,null]]
trip_distance: [[0.97,1.1,2.51,1.9,1.43,...,1.59,0.74,0.9,4.3,0.5],[7.9,10.13,17.71,0.61,14.41,...,0.5,1.2,4.7,0.94,3.83],...,[1.95,1.53,3.25,2.23,1.16,...,5.84,2.66,1.22,1.7,20.95],[0.88,1.11,2.03,3.12,5.51,...,3.05,5.8,4.67,3.15,2.85]]
RatecodeID: [[1,1,1,1,1,...,1,1,1,1,1],[1,1,2,1,1,...,1,1,1,1,1],...,[1,1,1,1,1,...,null,null,null,null,null],[null,null,null,null,null,...,null,null,null,null,null]]
store_and_fwd_flag: [["N","N","N","N","N",...,"N","N","Y","N","N"],["N","N","N","N","N",...,"N","N","N","N","N"],...,["N","N","N","N","N",...,null,null,null,null,null],[null,null,null,null,null,...,null,null,null,null,null]]
PULocationID: [[161,43,48,138,107,...,233,79,68,237,234],[164,132,132,140,132,...,107,158,79,246,113],...,[263,239,237,50,237,...,37,113,246,50,141],[262,170,141,4,80,...,107,112,114,230,262]]
DOLocationID: [[141,237,238,7,79,...,141,211,158,114,164],[173,225,164,237,129,...,234,79,236,234,52],...,[238,237,68,238,229,...,195,161,100,170,132],[236,161,43,161,72,...,48,75,239,79,143]]
payment_type: [[2,1,1,1,1,...,1,1,1,1,1],[2,1,2,2,1,...,1,1,2,1,1],...,[1,1,1,1,1,...,0,0,0,0,0],[0,0,0,0,0,...,0,0,0,0,0]]
...

列を追加してみる

tip_amount と trip_distance から乗車距離に対するチップの比率を計算します。

import pyarrow.compute as pc
df = df.append_column("tip_per_mile", pc.divide(df["tip_amount"], df["trip_distance"]))

pyarrow の スキーマに tip_per_mile が追加されました。

VendorID: int64
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: double
trip_distance: double
RatecodeID: double
store_and_fwd_flag: string
PULocationID: int64
DOLocationID: int64
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
airport_fee: double
tip_per_mile: double
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [], "columns": [{"name":' + 2492

pyarrow のスキーマで Iceberg テーブルのスキーマを更新して、先ほど計算した df で上書きします 。

with table.update_schema() as update_schema:
    update_schema.union_by_name(df.schema)
table.overwrite(df)

スキーマが更新できました。

table.schema
<bound method Table.schema of taxi_dataset(
  1: VendorID: optional long,
  2: tpep_pickup_datetime: optional timestamp,
  3: tpep_dropoff_datetime: optional timestamp,
  4: passenger_count: optional double,
  5: trip_distance: optional double,
  6: RatecodeID: optional double,
  7: store_and_fwd_flag: optional string,
  8: PULocationID: optional long,
  9: DOLocationID: optional long,
  10: payment_type: optional long,
  11: fare_amount: optional double,
  12: extra: optional double,
  13: mta_tax: optional double,
  14: tip_amount: optional double,
  15: tolls_amount: optional double,
  16: improvement_surcharge: optional double,
  17: total_amount: optional double,
  18: congestion_surcharge: optional double,
  19: airport_fee: optional double,
  20: tip_per_mile: optional double
),
partition by: [],
sort order: [],
snapshot: Operation.APPEND: id=1392500968734037032, parent_id=8214272341143535658, schema_id=1>
print(table.scan().to_arrow().select(['tip_per_mile']).slice())
pyarrow.Table
tip_per_mile: double
----
tip_per_mile: [[0,3.6363636363636362,5.976095617529881,0,2.2937062937062938,...,2.138364779874214,3.4864864864864864,1.2666666666666666,0.6976744186046512,6.4],[0,1.085883514313919,0,0,0.8147120055517002,...,3.6,2.4166666666666665,0,1.0638297872340425,1.4255874673629243],...,[1.0256410256410258,2.5620915032679736,1.44,1.5964125560538116,2.4655172413793105,...,0.11986301369863013,0.9473684210526315,1.4098360655737705,2.4882352941176475,0.8357995226730311],[2.8977272727272725,2.6036036036036037,1.6354679802955665,0.641025641025641,0.061705989110707814,...,1.298360655737705,0.45517241379310347,1.1391862955032122,1.4063492063492062,0.7017543859649122]]