導入
pip install --upgrade pip pip install "pyiceberg[s3fs,hive]"
用途に応じて以下を依存関係に加えられます.
キー | 説明 |
---|---|
hive | Hiveメタストアのサポート |
glue | AWS Glueのサポート |
dynamodb | AWS DynamoDBのサポート |
sql-postgres | PostgreSQLによってバックアップされるSQLカタログのサポート |
sql-sqlite | SQLiteによってバックアップされるSQLカタログのサポート |
pyarrow | オブジェクトストアと対話するためのFileIO実装としてのPyArrow |
pandas | PyArrowとPandasの両方をインストール |
duckdb | PyArrowとDuckDBの両方をインストール |
ray | PyArrow、Pandas、およびRayをインストール |
daft | Daftをインストール |
s3fs | オブジェクトストアと対話するためのFileIO実装としてのS3FS |
adlfs | オブジェクトストアと対話するためのFileIO実装としてのADLFS |
snappy | snappy Avro圧縮のサポート |
gcsfs | オブジェクトストアと対話するためのFileIO実装としてのGCSFS |
本記事では追加で以下を利用します。
sqlalchemy pyarrow
サンプルデータ準備
定番のTLC Trip Record Data
を使います。
mkdir data curl https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -o data/yellow_tripdata_2023-01.parquet/yellow_tripdata_2023-01.parquet
Icebergカタログを作成する
今回は検証用にライトに使える、SQLite ベースの SQLCatalog を使用します。
Icebergカタログについて詳しく知りたい方は以下をご参照ください。
Icebergカタログ、テーブルに関する情報を保存するディレクトリを作成します。
mkdir warehouse
SQLCatalogを作成します。
from pyiceberg.catalog.sql import SqlCatalog warehouse_path = "./warehouse" catalog = SqlCatalog( "default", **{ "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db", "warehouse": f"file://{warehouse_path}", }, )
warehouse ディレクトリに pyiceberg_catalog.db が作成されました。
pyiceberg_env ❯ ls warehouse
pyiceberg_catalog.db
Icebergテーブルを作成する
サンプルデータをロードして taxi_dataset テーブルを作成します。
import pyarrow.parquet as pq # Read a parquet file as a pyarrow table df = pq.read_table("/tmp/yellow_tripdata_2023-01.parquet") catalog.create_namespace("default") table = catalog.create_table( "default.taxi_dataset", schema=df.schema, )
warehouse ディレクトリにIceberg テーブルを構成するメタデータ、データファイルが作成されました。
Iceberg テーブルのアーキテクチャについて詳しく知りたい方は以下をご参照ください。
pyiceberg_env ❯ tree warehouse/default.db/taxi_dataset warehouse/default.db/taxi_dataset ├── data │ ├── 00000-0-8380a671-5f0e-4766-848b-e575944b87f5.parquet │ └── 00000-0-9f292d06-4479-4a44-816c-1dc8742767ec.parquet └── metadata ├── 00000-4a71b880-2fb2-4ee3-8334-9b7378891b01.metadata.json ├── 00001-b99b531e-a3f0-4e8c-9d49-a6043b372f7b.metadata.json ├── 00002-87324e52-945e-4fec-a716-eae47383c12f.metadata.json ├── 00003-ef08bd54-6ca8-4d5e-8b44-3e1c0e161b63.metadata.json ├── 6e5558ef-c56f-4914-a6cd-8b4d29804fa6-m0.avro ├── 8380a671-5f0e-4766-848b-e575944b87f5-m0.avro ├── 9f292d06-4479-4a44-816c-1dc8742767ec-m0.avro ├── snap-4809862839345096711-0-6e5558ef-c56f-4914-a6cd-8b4d29804fa6.avro ├── snap-6164888634227151801-0-8380a671-5f0e-4766-848b-e575944b87f5.avro └── snap-6534028523732720220-0-9f292d06-4479-4a44-816c-1dc8742767ec.avro
テーブルを pandas として参照してみます。
import pyarrow.parquet as pq print(table.scan().to_pandas().head(10))
VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count trip_distance ... improvement_surcharge total_amount congestion_surcharge airport_fee tip_per_mile 0 2 2023-01-01 00:32:10 2023-01-01 00:40:36 1.0 0.97 ... 1.0 14.30 2.5 0.00 0.000000 1 2 2023-01-01 00:55:08 2023-01-01 01:01:27 1.0 1.10 ... 1.0 16.90 2.5 0.00 3.636364 2 2 2023-01-01 00:25:04 2023-01-01 00:37:49 1.0 2.51 ... 1.0 34.90 2.5 0.00 5.976096 3 1 2023-01-01 00:03:48 2023-01-01 00:13:25 0.0 1.90 ... 1.0 20.85 0.0 1.25 0.000000 4 2 2023-01-01 00:10:29 2023-01-01 00:21:19 1.0 1.43 ... 1.0 19.68 2.5 0.00 2.293706 5 2 2023-01-01 00:50:34 2023-01-01 01:02:52 1.0 1.84 ... 1.0 27.80 2.5 0.00 5.434783 6 2 2023-01-01 00:09:22 2023-01-01 00:19:49 1.0 1.66 ... 1.0 20.52 2.5 0.00 2.060241 7 2 2023-01-01 00:27:12 2023-01-01 00:49:56 1.0 11.70 ... 1.0 64.44 2.5 0.00 0.917949 8 2 2023-01-01 00:21:44 2023-01-01 00:36:40 1.0 2.95 ... 1.0 28.38 2.5 0.00 1.925424 9 2 2023-01-01 00:39:42 2023-01-01 00:50:36 1.0 3.01 ... 1.0 19.90 2.5 0.00 0.000000 [10 rows x 20 columns]
to_pandas以外にも、扱いたいデータ形式に応じてto_duckdb, to_ray, to_arrow などに変換できます。
Namespaceを参照
カタログ内のテーブルをリストします。
print(catalog.list_tables("default"))
[('default', 'taxi_dataset')]
テーブルをロードしてみる
table = catalog.load_table( "default.taxi_dataset", ) print(table.scan().to_arrow())
実行結果
[('default', 'taxi_dataset')] pyarrow.Table VendorID: int64 tpep_pickup_datetime: timestamp[us] tpep_dropoff_datetime: timestamp[us] passenger_count: double trip_distance: double RatecodeID: double store_and_fwd_flag: large_string PULocationID: int64 DOLocationID: int64 payment_type: int64 fare_amount: double extra: double mta_tax: double tip_amount: double tolls_amount: double improvement_surcharge: double total_amount: double congestion_surcharge: double airport_fee: double tip_per_mile: double ---- VendorID: [[2,2,2,1,2,...,2,2,1,1,1],[1,2,2,2,2,...,1,1,1,2,2],...,[2,2,2,2,2,...,2,2,2,2,2],[2,2,2,2,2,...,2,2,2,2,2]] tpep_pickup_datetime: [[2023-01-01 00:32:10.000000,2023-01-01 00:55:08.000000,2023-01-01 00:25:04.000000,2023-01-01 00:03:48.000000,2023-01-01 00:10:29.000000,...,2023-01-02 21:16:11.000000,2023-01-02 21:56:02.000000,2023-01-02 21:04:31.000000,2023-01-02 21:13:09.000000,2023-01-02 21:45:30.000000],[2023-01-02 21:49:54.000000,2023-01-02 21:17:06.000000,2023-01-02 21:35:06.000000,2023-01-02 21:18:43.000000,2023-01-02 21:24:42.000000,...,2023-01-04 14:04:17.000000,2023-01-04 14:27:49.000000,2023-01-04 14:44:46.000000,2023-01-04 14:35:46.000000,2023-01-04 14:52:44.000000],...,[2023-01-30 20:07:47.000000,2023-01-30 20:28:57.000000,2023-01-30 19:59:53.000000,2023-01-30 20:21:42.000000,2023-01-30 20:09:59.000000,...,2023-01-10 08:10:07.000000,2023-01-10 08:51:52.000000,2023-01-10 08:13:34.000000,2023-01-10 08:29:03.000000,2023-01-10 08:49:00.000000],[2023-01-10 08:30:00.000000,2023-01-10 08:34:07.000000,2023-01-10 08:06:16.000000,2023-01-10 08:47:26.000000,2023-01-10 08:43:51.000000,...,2023-01-31 23:58:34.000000,2023-01-31 23:31:09.000000,2023-01-31 23:01:05.000000,2023-01-31 23:40:00.000000,2023-01-31 23:07:32.000000]] [[2023-01-01 00:40:36.000000,2023-01-01 01:01:27.000000,2023-01-01 00:37:49.000000,2023-01-01 00:13:25.000000,2023-01-01 00:21:19.000000,...,2023-01-02 21:22:04.000000,2023-01-02 22:02:42.000000,2023-01-02 21:08:06.000000,2023-01-02 21:31:43.000000,2023-01-02 21:48:18.000000],[2023-01-02 22:23:48.000000,2023-01-02 21:41:59.000000,2023-01-02 22:00:39.000000,2023-01-02 21:24:23.000000,2023-01-02 21:51:41.000000,...,2023-01-04 14:07:51.000000,2023-01-04 14:40:33.000000,2023-01-04 15:13:24.000000,2023-01-04 14:41:59.000000,2023-01-04 15:18:58.000000],...,[2023-01-30 20:24:09.000000,2023-01-30 20:37:27.000000,2023-01-30 20:16:07.000000,2023-01-30 20:32:01.000000,2023-01-30 20:17:23.000000,...,2023-01-10 08:41:22.000000,2023-01-10 09:12:03.000000,2023-01-10 08:20:49.000000,2023-01-10 08:45:05.000000,2023-01-10 09:42:00.000000],[2023-01-10 08:38:00.000000,2023-01-10 08:41:48.000000,2023-01-10 08:24:17.000000,2023-01-10 09:08:22.000000,2023-01-10 09:18:55.000000,...,2023-02-01 00:12:33.000000,2023-01-31 23:50:36.000000,2023-01-31 23:25:36.000000,2023-01-31 23:53:00.000000,2023-01-31 23:21:56.000000]] passenger_count: [[1,1,1,0,1,...,1,1,1,2,1],[1,1,2,1,2,...,0,2,2,1,1],...,[1,1,2,1,1,...,null,null,null,null,null],[null,null,null,null,null,...,null,null,null,null,null]] trip_distance: [[0.97,1.1,2.51,1.9,1.43,...,1.59,0.74,0.9,4.3,0.5],[7.9,10.13,17.71,0.61,14.41,...,0.5,1.2,4.7,0.94,3.83],...,[1.95,1.53,3.25,2.23,1.16,...,5.84,2.66,1.22,1.7,20.95],[0.88,1.11,2.03,3.12,5.51,...,3.05,5.8,4.67,3.15,2.85]] RatecodeID: [[1,1,1,1,1,...,1,1,1,1,1],[1,1,2,1,1,...,1,1,1,1,1],...,[1,1,1,1,1,...,null,null,null,null,null],[null,null,null,null,null,...,null,null,null,null,null]] store_and_fwd_flag: [["N","N","N","N","N",...,"N","N","Y","N","N"],["N","N","N","N","N",...,"N","N","N","N","N"],...,["N","N","N","N","N",...,null,null,null,null,null],[null,null,null,null,null,...,null,null,null,null,null]] PULocationID: [[161,43,48,138,107,...,233,79,68,237,234],[164,132,132,140,132,...,107,158,79,246,113],...,[263,239,237,50,237,...,37,113,246,50,141],[262,170,141,4,80,...,107,112,114,230,262]] DOLocationID: [[141,237,238,7,79,...,141,211,158,114,164],[173,225,164,237,129,...,234,79,236,234,52],...,[238,237,68,238,229,...,195,161,100,170,132],[236,161,43,161,72,...,48,75,239,79,143]] payment_type: [[2,1,1,1,1,...,1,1,1,1,1],[2,1,2,2,1,...,1,1,2,1,1],...,[1,1,1,1,1,...,0,0,0,0,0],[0,0,0,0,0,...,0,0,0,0,0]] ...
列を追加してみる
tip_amount と trip_distance から乗車距離に対するチップの比率を計算します。
import pyarrow.compute as pc df = df.append_column("tip_per_mile", pc.divide(df["tip_amount"], df["trip_distance"]))
pyarrow の スキーマに tip_per_mile が追加されました。
VendorID: int64 tpep_pickup_datetime: timestamp[us] tpep_dropoff_datetime: timestamp[us] passenger_count: double trip_distance: double RatecodeID: double store_and_fwd_flag: string PULocationID: int64 DOLocationID: int64 payment_type: int64 fare_amount: double extra: double mta_tax: double tip_amount: double tolls_amount: double improvement_surcharge: double total_amount: double congestion_surcharge: double airport_fee: double tip_per_mile: double -- schema metadata -- pandas: '{"index_columns": [], "column_indexes": [], "columns": [{"name":' + 2492
pyarrow のスキーマで Iceberg テーブルのスキーマを更新して、先ほど計算した df で上書きします 。
with table.update_schema() as update_schema: update_schema.union_by_name(df.schema) table.overwrite(df)
スキーマが更新できました。
table.schema
<bound method Table.schema of taxi_dataset( 1: VendorID: optional long, 2: tpep_pickup_datetime: optional timestamp, 3: tpep_dropoff_datetime: optional timestamp, 4: passenger_count: optional double, 5: trip_distance: optional double, 6: RatecodeID: optional double, 7: store_and_fwd_flag: optional string, 8: PULocationID: optional long, 9: DOLocationID: optional long, 10: payment_type: optional long, 11: fare_amount: optional double, 12: extra: optional double, 13: mta_tax: optional double, 14: tip_amount: optional double, 15: tolls_amount: optional double, 16: improvement_surcharge: optional double, 17: total_amount: optional double, 18: congestion_surcharge: optional double, 19: airport_fee: optional double, 20: tip_per_mile: optional double ), partition by: [], sort order: [], snapshot: Operation.APPEND: id=1392500968734037032, parent_id=8214272341143535658, schema_id=1>
print(table.scan().to_arrow().select(['tip_per_mile']).slice())
pyarrow.Table tip_per_mile: double ---- tip_per_mile: [[0,3.6363636363636362,5.976095617529881,0,2.2937062937062938,...,2.138364779874214,3.4864864864864864,1.2666666666666666,0.6976744186046512,6.4],[0,1.085883514313919,0,0,0.8147120055517002,...,3.6,2.4166666666666665,0,1.0638297872340425,1.4255874673629243],...,[1.0256410256410258,2.5620915032679736,1.44,1.5964125560538116,2.4655172413793105,...,0.11986301369863013,0.9473684210526315,1.4098360655737705,2.4882352941176475,0.8357995226730311],[2.8977272727272725,2.6036036036036037,1.6354679802955665,0.641025641025641,0.061705989110707814,...,1.298360655737705,0.45517241379310347,1.1391862955032122,1.4063492063492062,0.7017543859649122]]