Bering Note – formerly 流沙河鎮

情報技術系のこと書きます。

OpenSearchCon North America 2025「Uberが開発したOpenSearchのPull-Based Ingestionを深く理解する」- Varun Bharadwaj, Uber & Yupeng Fu, Uber

OpenSearchCon North America 2025のセッション「Deep Dive Into OpenSearch Pull-Based Ingestion at Uber」をまとめます。

www.youtube.com

資料

https://static.sched.com/hosted_files/opensearchconna2025/95/%5BExternal%5D%5BOpenSearchCon%2025%20US%5D%5BPull-based%20Ingestion%5D.pdf

スピーカー

このセッションは、Uberでオンラインデータシステムを担当するエンジニアのYupeng Fu氏と、同じくUberのSearch Platform TeamでSoftware Engineerを務めるVarun Bharadwaj氏によって行われました。
Yupeng Fu氏はFeature Store、データベース、Kafkaストリーミング、検索といった複数のオンラインデータシステムに携わっており、OpenSearchのTSC(Technical Steering Committee)メンバーであり、コアメンテナーでもあります。Varun Bharadwaj氏はYupeng氏とともに、本セッションのテーマであるPull-Based Ingestion機能の開発に貢献してきました。この機能はUberから提案され、コミュニティとAWSの支援を受けながらOpenSearch 3.0で利用可能になったものです。

リアルタイムが本質のUberと検索基盤への要求


OpenSearchの新機能Pull-Based IngestionをUberの事例から掘り下げるセッションです。

Pull-Based Ingestionは、私たちUberから提案し、今年のOpenSearch 3.0で利用可能になった機能です。アイデアは昨年に着想し、複数の企業との対話やAWSからの支援・助言を受けながら開発を進めてきました。なぜこの機能がUberの運用にとって重要だったのか、その背景から説明します。

Uberにとって検索は中核機能を支える柱の一つです。最も特徴的なのは、Uberのビジネスとデータが本質的にリアルタイムであることです。タクシーを呼ぶ、オンラインで食事を注文するといった行動はリアルタイムに発生します。同時に、ドライバーの車が移動し、店舗で扱う料理が在庫切れになったり再び提供可能になったりと、大量の情報がリアルタイムに生まれます。私たちはこうしたリアルタイムな情報を絶えずプラットフォームに取り込む必要があります。

マップ検索という最も身近なユースケース


Uber RidesではMapが機能の中心にあり、ここでも検索が大きな役割を果たします。ユーザーが目的地を入力すると、リアルタイムなオートコンプリートが返ってくることを期待します。この補完にはフルテキスト検索とセマンティック検索の両方が絡みます。現在地や時間帯といったシグナルを取り込み、より精度の高い推薦につなげる仕組みです。
図はRider、Driver、Eaterそれぞれの画面に、Geocoding、Reverse Geocoding、オートコンプリート&フルテキスト検索、カテゴリ検索、予測、パーソナライゼーションといった検索機能がどう組み込まれているかを示しています。Geocodingは住所と座標を相互変換する処理で、その逆方向がReverse Geocodingにあたります。こうしたマップ検索だけでも、グローバルで月間数十億件のクエリが処理されています。

RAGやGenAIを活用する新しいユースケースも生まれており、ここでも検索は基盤の重要な構成要素になっています。RAG(Retrieval-Augmented Generation)は、外部から取得した情報を補ってLLMの回答を生成する手法です。リアルタイム性とストリーミングデータが効いてくるのは、最新の知識と情報を取り込むためです。
図はカスタマーサポートの体験を、Predict/Understand、Resolve、Learn and Improveというユーザーのサポートジャーニーとして描いています。「ドキュメントに問題があってオンラインにできないのか」と困るドライバー、「ドライバーが進んでいないのにキャンセル料を請求されるのは納得できない」と訴える利用者。こうした問い合わせに対し、問題が起きる前後のユーザーイベントを取り込んで予測・理解し、エージェントによる自動・半自動の対応へつなげ、対応結果を分析して改善に回します。サンフランシスコ空港に降り立った利用者に手荷物の状況を返したり、着地を検知してより良い推薦を出したりするには、リアルタイムな情報が欠かせません。これが検索プラットフォームに新たな要求を突きつけることになります。

検索プラットフォームに課される要求


以上を踏まえると、Uberの検索プラットフォームには複数の要求が課されます。データは絶え間なくストリーミングで流れ込みます。これにはユーザーがアプリをクリックしたりコンテンツにカーソルを合わせたりといったユーザー由来のアクションに加え、ログやメトリクスのような社内サービスが生成する情報も含まれます。
クエリ性能の速さは顧客体験のために当然求められます。鮮度については、ユースケースによって要求が異なります。配車では、ボタンを押して車を待つ利用者がマッチング結果をできるだけ早く見たいと考え、ドライバーの位置も刻々と変わるため、データが古いとマッチング体験が損なわれます。一方で、レストランのカタログのように頻繁には更新されない情報は、ニアリアルタイムで十分です。私たちはこうした異なる要求のバランスを取り、最良の体験とシステムの単純さを両立させようとしています。
機能要件以外にも、水平方向のスケーラビリティ、高可用性とall-activeな構成、そしてコスト効率が挙げられます。all-activeは複数の拠点をすべて稼働状態で運用する形を指します。これらがなぜ重要なのかは、後段の構成で具体的に見えてきます。

Delivery向けアーキテクチャとオールアクティブ構成を支えるKafka


Delivery(配達)向けの検索プラットフォームは、Kafkaからのストリーミングインジェスト、Reader/Writerの分離、フェデレーションとアクセス制御を担うGateway、並列度を高めるVirtual Shard、セグメントを共有するRemote Storage、ブートストラップやバックフィルのためのOffline Buildという要素で構成されています。
データの流れは図のとおりです。アプリからの書き込みはGatewayを経由してKafkaへ送られ、Ingesterがそれを読み取ってインデックスを生成します。Ingesterはここでドキュメントをソートし、一定の順序を保ったうえでインデックスをRemote Storageへアップロードします。検索側のSearcherはそのインデックスをダウンロードしてクエリに応答します。読み取りはGateway経由でSearcherに届く形です。このように書き込みと読み取りの経路を明確に分けているのが、この設計の特徴になります。
ここでの分離は、reader(検索ノード)とwriter(インジェストノード)を独立させる構成を指します。私たちはこれらを別々に動かすことで、インデックス生成と検索処理が互いの負荷に影響されないようにしています。インデックス内の親子関係(parent-child)の管理については別セッションで掘り下げており、本セッションではストリーミングの部分に重点を置いて説明します。

Uberのインフラは検索に限らず、全体が複数リージョンでのall-active構成で動いています。これは2つのリージョンをどちらも稼働状態に保ち、片方のリージョンで障害が起きたらもう片方へフェイルオーバーできるようにする設計です。空港へ向かう人がいつでもUberで配車を呼べると期待するように、サービスが常に利用できることが前提になっているため、all-activeは私たちのインフラの土台に深く組み込まれています。
長年このインフラに投資してきて得た重要な洞察は、all-active構成がKafkaに大きく依存しているということです。Kafkaがストリーミングデータを保持しているため、Kafka自体にクロスリージョンのレプリケーションを持たせれば、リージョンをまたいだサービスやデータベース間の整合性管理にまつわる複雑さの多くを解消できます。
図の構成では、各リージョンのプロデューサーがローカルのRegional Kafkaへ書き込みます。レプリケーションサービスがリージョン間でデータを複製し、グローバルな全体像を持つAggregated Kafkaへ集約します。検索をはじめとする各システムはこのAggregated Kafkaを読み取るため、両リージョンのSearchクラスターは結果整合(eventually consistent)な状態を保ちます。

Project Sunrise


検索のような最先端の領域では、業界の進化やイノベーションに歩調を合わせ続ける必要があります。私たちは内部の検索プラットフォームで培ってきたモダンなアーキテクチャを、オープンソースコミュニティと協働しながらプロジェクトへ還元していく道を選びました。この取り組みがProject Sunriseです。
背景には、製品の反復が速く、プラットフォーム要件が次々に立ち上がるという課題があります。range queryのサポート、セマンティック検索、マルチモーダル検索といった新機能の要望に加え、検索結果の関連性やクエリ理解、taxonomy(分類体系)といった検索品質の向上も求められます。GPUによるアクセラレーションのように、検索技術そのものも業界で速く進化しています。
これらに対する答えとして、私たちはOpenSearchという業界のオープンスタンダードを採用しました。数百万のユーザーに広く使われている事実上の標準であること、デバッグやツール、監視・アラート・セキュリティ・機械学習向けの拡張プラグインが揃っていること、開発者やコントリビューターによる活発なコミュニティと豊富なドキュメントがあることが決め手になっています。

Uberは2024年9月16日にLinux Foundationが発表したOpenSearch Software Foundationの創設メンバーとして参加しました。これは検索とアナリティクスにおけるオープンな協働を促進する財団です。
この参加に合わせて、私たちはUberとAWSの間でOpenSearch開発に関する戦略的パートナーシップを結び、OpenSearch 2025のロードマップを共同で策定しました。その後も継続的にコントリビューションを重ねており、本カンファレンス初日にはgRPC関連のコントリビューションを発表しています。本セッションで扱うPull-Based Ingestionも、こうした貢献の一例です。

Why Pull-based Ingestion


Pull-Based Ingestionは、図の破線で囲まれたKafkaとIngesterの部分、つまりインジェスト経路に関わる仕組みです。私たちはこの方式から複数の利点を得ています。
プロデューサーとコンシューマーを分離できるため、インジェストのスパイク(急増)を平滑化できます。コンシューマー側はKafka上のメッセージを自分のペース、自分のスループットで引き取れるので、レイテンシを犠牲にせずに済みます。また従来のpushベースのインジェストと異なり、translog(トランザクションログ。書き込みを永続化するためにOpenSearchが持つ追記ログ)をスキップできるため、インジェストのスループットが向上します。Kafkaがメッセージを保持していることから、メッセージのリプレイによる復旧も可能になります。そして先に触れたとおり、OpenSearchクラスターのall-active構成を支える基盤にもなります。
この機能の提案と設計はGitHub issue #16930にまとめられています。

ここからの技術的な解説は、Pull-Based Ingestionの機能(Features)、高レベルのアーキテクチャ、サンプルのKafkaメッセージが各コンポーネントをどう流れるかというデータフロー、Uberでの実運用、そしてロードマップという流れで進みます。詳細な設計の議論はissue #16930に対応しています。

Pull-Based Ingestionの全体アーキテクチャと用語


Pull-Based IngestionはOpenSearch 3.0の時点でKafkaとKinesisの2つのプラグインを備えており、どちらもセグメントレプリケーションとリモートストアの構成で動作します。セグメントレプリケーションは、プライマリシャードで生成したLuceneセグメントをレプリカへそのまま配布する方式で、ドキュメントを各レプリカで再インデックスする必要がありません。
データの取り込みは、ドキュメント全体を入れ替えるfull-document upsertに対応します。各ドキュメントが外部から付与されたバージョンを持つexternal versioningにも対応しており、ストリーム側で管理されたバージョンを使って新旧の判定を行えます。加えて、取り込みを制御するためのIngestion management APIも用意されています。

KafkaとKinesis、そしてOpenSearch APIは似た概念を別々の名前で呼んでいるため、対応関係を押さえておくと以降の説明が読みやすくなります。
イベントの集まりをKafkaではTopic、KinesisではStreamと呼び、OpenSearch APIではSourceにあたります。これがKafkaではPartition、KinesisではShardに分割され、OpenSearch側でもShardという単位に対応します。OpenSearchの各シャードは、1つのKafkaパーティションまたはKinesisシャードから取り込む形です。データの1件はKafkaのMessage、KinesisのRecordで、OpenSearchではMessageと呼びます。パーティション内で各データを一意に指す識別子は、KafkaではOffset、KinesisではSequence Number、OpenSearch側ではPointerです。なお書き込み側のProducerに相当する役割はOpenSearch APIには存在せず、OpenSearchは読み取り側のConsumerとして動作します。

アーキテクチャの全体像では、緑が既存のOpenSearchコンポーネント、紫がPull-Based Ingestionで新たに導入したコンポーネントを表します。図の上にあるStreaming SourceがKafkaまたはKinesisで、ノードはそこからデータをPullします。
ノード内のエンジンには、Pull-Based Ingestion向けの新しい派生としてIngestion Engineを導入しました。このエンジンはStreamPollerを保持します。StreamPollerはコンシューマの実装で、ストリーミングソースを読み取ってLuceneへインデックスする役割を担います。取り込み元を切り替えられるよう、拡張点となるIngestion Pluginインターフェースも新設しました。KafkaとKinesisの各プラグインはこのインターフェースを実装したものです。従来のtranslogを使わないため、translogマネージャはNoopTranslogMgr(何もしない実装)に置き換わっています。
インデックスされたデータには、通常のフィールドに加えてオフセット情報などのメタデータを付与します。コミットデータにはバッチの読み取り再開位置を示すbatch start pointer(図中のingestion_batch_start)を保存し、シャードがどこまで取り込んだかをコミットとともに記録します。
インデックス作成時の指定方法は、左側のPUT /my-indexリクエストに表れています。settings内に新設のingestion_sourceを置き、typekafkaparamにtopic名やbootstrap_serversを指定します。replication.typeSEGMENTを指定する形です。CreateIndexリクエストを受け取るとIndicesServiceがIngestion Engineを生成し、StreamPollerがソースからのPullを開始します。設計の詳細はissue #16495にまとまっています。

Kafkaメッセージが流れる各コンポーネント


Pull-Based Ingestionでは、Kafka/KinesisのメッセージはStream Consumer、In-memory blocking queue、Message Processor、Ingestion Engineという順で流れ、最終的にLucene Indexに書き込まれます。入口となるStreaming Sourceは、KafkaまたはKinesisです。これらは耐久性のあるメッセージングシステムであり、リカバリの観点でtranslogの役割を肩代わりします。translogは通常のOpenSearchが書き込み内容を一時記録して障害回復に使う追記ログですが、Pull-Based Ingestionはこれに依存しません。
シャードが何らかの理由でダウンして復旧が必要になった場合は、既存のインデックスを調べてどのoffsetから再生すべきかを特定し、そこまで巻き戻してメッセージをリプレイすることで直近の取り込みを取り戻します。OpenSearchの各シャードはKafkaのパーティションと1対1の静的マッピングを持ちます。2シャードのインデックスであれば、シャード0はパーティション0から、シャード1はパーティション1から読み取るという形です。
右側のKafkaメッセージのサンプルは {"_id":"1", "_source":{"name": "n1", "age": 25}} という構造です。ドキュメントIDと、インデックス対象のフィールドをまとめた _source を定義しています。

Stream Consumerは、KafkaやKinesisといったソースごとのコンシューマ実装を保持するコンポーネントです。内部に無限ループを持ち、定期的にデータをPullします。メッセージはバッチ単位で消費し、取り込む前にいくつかの検証を行います。
検証の一環として、そのメッセージが既に処理済みかどうかを確認します。処理済みのメッセージであればスキップし、重複した計算を避けます。これによってイベントのexactly-once処理、つまり1件のイベントを過不足なく一度だけ処理することを保証しようとしています。検証を終えたメッセージのバッチは、In-memory blocking queueへ書き込みます。

In-memory blocking queueは、ConsumerスレッドとProcessorスレッドを切り離すために置かれています。両者を分離することで、Consumer側とWriter側のスレッドを独立してスケールできます。
さらに、流入するメッセージをパーティション分割し、複数のWriterスレッドを持たせることも可能になります。図の下段はWriterスレッドを2本持つ構成です。blocking queueのインスタンスを2つ用意し、それぞれのキューが自分専用のWriterスレッドを持ちます。各Writerスレッドはキューからメッセージを読み取り、Lucene Indexへ書き込むという形です。

Message Processorは、イベントを処理するメインのWriterスレッドです。In-memory queueからメッセージを読み取り、イベントをパースしてLuceneドキュメントを組み立て、そこにメタデータを付加します。
ここで付加するメタデータは主にoffset情報です。保存する各ドキュメントには、対応するメッセージのoffsetをPointFieldとStoredFieldの両方で持たせます。PointFieldは、シャード復旧時に取り込み開始地点より先のoffsetをすべて取得するレンジクエリに使います。StoredFieldは値のルックアップに使うという役割分担です。

Ingestion Engineは、Lucene Indexに対するインターフェースです。push型のモデルと同様に、create・index・deleteの3モードに対応します。
translogを持たない方針のため、Ingestion EngineはNoopTranslogManager、すなわち何もしないtranslog実装を使ってtranslog操作をスキップします。Ingestion EngineはWriterスレッドから呼び出されるため、正常に処理し終えた最新メッセージを追跡しておく必要があります。これがシャード復旧時の開始地点になります。この最新のoffsetはbatch start pointerと呼ばれ、live commit dataに記録されます。シャードを復旧する際にはここを起点にリプレイします。

パーティション分割されたblocking queueの動き


Writerスレッドを2本持つ具体例で、blocking queueの挙動を見ると仕組みが分かりやすくなります。In-memory blocking queueのインスタンスが2つあり、Kafkaトピックにはドキュメント1から8のメッセージが順番に並んでいるとします。Stream Consumerは検証を終えると、メッセージからドキュメントIDを取り出してパーティション分割し、いずれかのキューへ書き込みます。パーティション数が2のこの例では、ドキュメントIDのmod2に相当する振り分けになり、奇数のドキュメントが一方のキュー、偶数のドキュメントがもう一方のキューに入ります。
ここで重要な点が2つあります。1つ目は、パーティション分割をドキュメントIDで行っていることです。同じドキュメントに対する複数のイベントは必ず同じキューに入るため、ドキュメントレベルでの逐次処理が保証され、競合状態に陥りません。2つ目は、2つのキューを2本のWriterスレッドが独立して処理するため、進み具合が揃わない点です。図ではWriterスレッド1はドキュメント1を処理し終えて3に取りかかっており、Writterスレッド2は2・4・6まで処理を終えて8に進んでいます。
このように進捗がずれる構成では、batch start pointerをWriterスレッドをまたいだ最小のコミット済みoffsetとして取ります。最も遅れているスレッドの地点を起点にすればリプレイで取りこぼしが生じないためです。シャードが落ちて復旧する際は、この地点からメッセージを再生することになります。

シャードリカバリとレンジクエリによるオフセット復元


Pull-Based Ingestionでは、Kafkaのoffsetがインデックスの中に保存されているため、シャードの回復に他のシステムと違った仕組みを使えます。各ドキュメントには処理元メッセージのoffsetが埋め込まれており、コミットのlive commit dataには最後に処理したoffsetがbatch start pointerとして残ります。シャードが初期化されるときには、batch start pointerより先のoffsetをインデックスから取得するレンジクエリを実行します。これによって、すでに処理済みのメッセージを再び取り込んでしまう重複を避けられるのです。
図は、Kafka Partition 0にM1(offset=1)、M2(offset=2)、M3(offset=3)の3つのメッセージがあり、プライマリのShard 0がダウンした場面を示しています。M1はRemote storeへ永続化されており、レプリカが新しいプライマリへ昇格します。新しいプライマリのshard pointerと、ダウンした旧プライマリのshard pointerの違いがどう吸収されるかを、続く例で順を追って確かめます。

具体例として、3つのメッセージを持つ1つのKafkaパーティションと、プライマリ・レプリカ各1つのシャードを考えます。レプリケーションはRemote store経由で行われ、ingestion engineを使うのはプライマリ側です。
最初にM1を処理します。M1の取り込みに成功した後、flushが走ると、M1がセグメントとしてRemote storeへアップロードされて永続化されます。この時点でM1はレプリカからも見える状態になります。図ではM1がオレンジ色で処理中を示し、Remote storeはまだ「No data」から「M1 persisted」へ移る直前です。

プライマリはそのままM2、M3へと取り込みを進めます。ただしここで注意したいのは、M2とM3はまだflushされておらず、Remote storeには反映されていない点です。図ではM1とM2が緑色(処理済み)、M3がオレンジ色(処理中)になっていますが、Remote storeに永続化されているのは依然としてM1だけです。プライマリのメモリ上にはM2・M3が存在しても、レプリカや永続ストアの観点ではM1までしか確定していません。

ここでM3を処理している最中にプライマリのShard 0がダウンします。図ではプライマリが赤色になり、障害発生を示しています。Remote storeにはM1だけが永続化された状態のままで、M2とM3はプライマリの消失とともに失われた格好です。失われたメッセージをどう取り戻すかが、この後の回復処理の焦点になります。

プライマリが失われたため、レプリカを新しいプライマリへ昇格させます。図ではShard 0(Replica -> Primary)が青色になり、昇格したことを示しています。レプリカが持っているのはRemote store経由で受け取ったM1までです。昇格しただけでは失われたM2・M3は復元されないため、昇格処理の中でingestionを再開する仕組みが必要になります。

レプリカの昇格処理では、エンジンを初期化し直します。レプリカは取り込みを行わないため別のエンジンを使っていますが、プライマリになる際にこれをingestion engineへ切り替えます。
ingestion engineの初期化では、最新のコミットを参照してbatch start pointerを取り出し、その値より先のoffsetをインデックスから取得するレンジクエリを実行します。今回はM1より先のデータが存在しないため、永続化済みoffsetの集合は空です。そのうえで取り込みを再開します。batch start pointerがM2を指しているので、新しいプライマリはM2からメッセージをリプレイし、M2を処理し直します。図右側に整理されたとおり、昇格は「ingestion engineの初期化」「コミットからのbatch start pointer取得」「batch start pointerより先の永続化offsetをレンジクエリで取得」「取り込み再開」の4ステップで構成されます。

M2の処理が終わると、続いてM3を取り込み、残りのメッセージを処理します。図ではM1・M2が緑色、M3がオレンジ色の処理中となり、旧プライマリ(赤)は切り離され、新プライマリ(青)がパーティションから取り込みを続けています。ダウン前に失われていたM2・M3が、Kafkaからのリプレイによって新しいプライマリ上で再構築されていく流れです。

最後にflushが走ると、M1・M2・M3のすべてがRemote storeへアップロードされ、永続化されます。図のRemote storeは「M1, M2, M3 persisted」となり、障害前の状態が完全に復元されたことを示します。Remote storeにデータが残っていなくても、Kafka側にメッセージが保持されている限りbatch start pointerからリプレイして取り戻せる、というのがPull-Based Ingestionの回復モデルです。

レンジクエリによる重複スキップ


レンジクエリの役割をもう一段はっきりさせるため、offsetを巻き戻すKafka rewindの例を見ます。1つのパーティションにM1からM4の4メッセージがあり、取り込みをM2から始めたとします。batch start pointerは2で、エンジン初期化時にレンジクエリ(batch start pointer以上のoffset)を実行します。図1ではインデックスがまだ空なので永続化済みoffsetも空です。M2・M3を処理してflushすると、コミット済みデータは{M2, M3}になり、ここで取り込みを一時停止します。
図2では、offsetを1まで巻き戻してM1へ戻ります。巻き戻しに伴ってエンジンが再初期化され、batch start pointerは1になります。再びレンジクエリを実行すると、batch start pointer=1より先でインデックスに存在するoffsetは{2, 3}と判明します。これが「すでに処理済み」の集合になります。
図3で取り込みを再開すると、M1を処理した後にM2へ進みますが、M2は永続化済みoffsetに含まれるためスキップします。M3も同様にスキップし、まだ処理されていないM4にたどり着きます。コミット済みデータは{M1, M2, M3}となり、巻き戻しても重複処理が起きないことが分かります。レンジクエリがexactly-onceを支える要として働いているのです。

External Versioningと運用管理API

External Versioning


Pull-Based Ingestionには、exactly-onceの取り込み以外にもいくつかの機能があります。その一つがドキュメント単位のexternal versioning(外部バージョニング)です。これは順序の乱れによる不整合を防ぐためのものです。
Kafkaのようなストリーミングシステムでは、複数のレイヤーで順序の入れ替わりが起こり得ます。たとえばプロデューサーがメッセージのバッチを書き込む際、一部が失敗してリトライされると、その分だけKafkaトピック上で順序が前後します。さらに私たちはクロスリージョンレプリケーションを行っているため、リージョンをまたぐ過程でも順序が乱れるコーナーケースが生じます。external versioningはこうした状況を吸収します。メッセージのバージョンが古い場合、その更新は適用されません。
バージョンはKafkaメッセージのversionフィールドで指定します。図の例では {"_id":"1", "_version":"1", "_source":{"name": "n1", "age": 25}} のように、ドキュメントごとに _version を持たせています。同じ _id に対して、より新しいバージョンだけが反映される形です。
エラーハンドリングはDROPとBLOCKの2つのポリシーに対応しています。DROPは取り込みに失敗したメッセージをそのまま破棄し、次のメッセージへ進みます。メッセージを欠落させても支障のないユースケース向けです。一方、メッセージを落とせないユースケースにはBLOCKを使います。BLOCKではエラーが発生したメッセージを成功するまで無期限にリトライし、それが片付くまで次のメッセージには進みません。なお一時的な失敗(transient failures)についてはどちらのポリシーでもリトライが行われます。

Ingestion management API


運用のためのIngestion management APIも用意しています。取り込みを一時停止するpause API、再開するresume API、そして現在の取り込み状態を取得するget current ingestion state APIです。
resumeにはoffsetをリセットするオプションを付けられます。過去のoffsetへ戻すことも、未来のoffsetへ飛ばすこともできます。このリセット機能が運用上とくに役立ちます。たとえば障害が発生して大きなラグ(遅延)が積み上がってしまい、その遅延分を捨ててでも追いつきたい場合、このAPIで最新のoffsetまでスキップすればラグを素早く解消できます。
これらのAPIの詳細はPull-based ingestion management API のドキュメントにまとまっています。

Uberでの位置づけとロードマップ、試し方


Pull-Based IngestionをUberが採用する最大の理由は、レジリエンシーの向上にあります。プロデューサーとコンシューマーを分離できるため、OpenSearchクラスターが何らかの理由でダウンしたり、書き込みブロックがかかってwriteを受け付けられなくなったりしても、データはKafkaトピックに書き込まれ続けます。クラスターが復帰したら、そこから追いつけばよいという形です。
スライドの3つのダッシュボードがこの効果を示しています。左のグラフでは書き込みトラフィックが13:50付近で50K req/sから100K req/sへスパイクしています。従来のpush型では、このスパイクに対してスレッドプールが飽和し、中央のグラフのようにFailure Rateが100%近くまで跳ね上がります。一方pull型では、右のグラフのIngestion lagが一時的に最大1分強まで増えるだけで、失敗は起きません。ラグはその後解消され、最終的に追いつきます。スパイクが失敗ではなく遅延として吸収される、これがbackpressureを自然にこなせるという意味になります。

今後のロードマップには複数の項目が並んでいます。現在はシンプルさを優先してfull-document upsertのみをサポートしていますが、partial updates(一部フィールドだけの更新)への対応を予定しています。
スループットの観点で大きいのがconcurrent poller supportです。Writer側はすでにマルチライターに対応している一方、コンシューマー側は依然としてシングルコンシューマーであり、ここがボトルネックになり得ます。これを並列化し、さらにflexible shard to partition mappingsを組み合わせることで、複数のKafkaトピックから取り込んだり、1つのシャードが複数パーティションから消費したりできるようになります。priority-aware ingestion(優先度に応じた取り込み)も、この複数トピック対応を前提とした機能です。
offline index buildも検討中です。これはSparkジョブなどでsource of truthを読み取り、クラスターの外でOpenSearchのインデックスを直接構築するという発想です。構築済みのインデックスをクラスターにデプロイすると、Pull-Based Ingestionの仕組みによってどのoffsetから取り込みを再開すればよいかを正確に把握できます。これにindex rolloverも加わります。

Cloud-native OpenSearch


ロードマップの中でも特に方向性を示すのがcloud-native OpenSearchです。これはリーダーを持たないall-active構成の取り込みモードで、shared-nothingなデータノード群、つまりノード間で状態を共有しない構成を前提とします。Pull-Based Ingestionはこの構成を素直に成立させます。
スライドの図では、上段にWriterとなるデータノード群、下段にReaderとなるノード群が分かれて配置され、Kafka TopicからのWritesは上段が受け、ClientからのReadsは下段が処理します。両者はetcdとRemote storeを介してメタデータとデータをやり取りします。ここでの肝は、同一シャードに対して複数のWriterを持てる点です。各Writerは同じKafkaパーティションから消費できます。
マルチライターが効くのは安全なrolling upgradeとrestartです。新しいバイナリを1つのWriterノードに展開してそのノードが落ちても、もう一方のノードが取り込みを続けるため、fresh dataの提供が途切れません。アップグレードが終わってノードが戻れば、どのoffsetから再開すべきかを正確に知っているので、追いついてすぐに最新データの提供に復帰します。設計はissue #17957で議論されています。

手元で試す


Pull-Based Ingestionは手元のラップトップで手軽に試せます。テスト用に用意されたfile-basedプラグインを使えば、KafkaやKinesisを立ち上げる必要はありません。
手順は単純です。{"_id":"1", "_source":{"name": "n1", "age": 25}} のようなメッセージをファイルに数件書き込み、./gradlew run -PinstalledPlugins="['ingestion-fs']" を実行するだけです。これでpull型の取り込みモードの挙動を一通り確かめられます。詳しい手順はingestion-fsプラグインのREADMEにまとまっています。

より深く触れたい場合の参照先として、OpenSearch公式ドキュメントに加え、コードリファレンスとしてingestion-kafkaプラグインとingestion-kinesisプラグインが挙げられています。この機能へのコントリビューションに関心がある人にとって、これらが出発点になります。

Q&A


質疑では、性能数値、特にindexing latencyに関する質問が出ました。
ベンチマークについては、OSB(OpenSearch Benchmark)フレームワークが持つevent logデータで計測しています。マルチライター構成では、単一パーティションから1シャードあたりおよそ35,000〜40,000メッセージ/秒を処理できるという結果でした。ただし前述のとおりWriterは並列でもコンシューマーはシングルスレッドのままなので、ボトルネックは現状コンシューマー側にあります。この上限に達したときのために、concurrent poller supportを計画しており、複数パーティション・複数トピックを活用できるようにする狙いです。
indexing latencyの具体的な数値は手元にないとしつつ、translogを持たない分だけ原理的には速くなるはずだという回答でした。Luceneのインデックスライターに渡してデータをインデックスするだけの軽量な処理になるためです。一方push型はスレッド数に制限がないので、スレッドプールが飽和するまで並行リクエストを送り続けられ、スループットの上限という点では性質が異なります。

Q&A

質問1: インデックスのレイテンシなど、Pull-Based Ingestion を導入した場合としない場合の性能数値はありますか。
回答: OSB(OpenSearch Benchmark)フレームワーク上のイベントログデータでベンチマークを行いました。複数の writer を使った場合、単一パーティションから 1 シャードあたり毎秒約 35,000〜40,000 メッセージを処理できています。ただし複数 writer をサポートしていても、consumer は依然としてシングルスレッドであるため、ボトルネックは consumer 側にあります。この上限に達した場合に向けて、複数パーティションや複数トピックを活用できる concurrent poller のサポートを計画しています。

レイテンシの具体的な数値は持ち合わせていませんが、translog がなくなったぶん理論上は速くなるはずです。データを Lucene の index writer に渡してインデックスするだけの軽量な処理になります。一方、Push-Based はスレッド数に制限がなく、スレッドプールが飽和するまで並行リクエストを送れるため、スループットでは現状 Push-Based の方が高くなります。Pull-Based の利点はリソースをより効率的に使える点にあります。

質問2: Kafka のメッセージスキーマと Lucene のインデックスマッピングは完全に一致させる必要がありますか。
回答: 現状は静的マッピングで、動的マッピングはサポートしていません。インデックスマッピングで定義したフィールドは Kafka のメッセージソースに含める必要があります。追加フィールドを与えた場合、インデックス自体は行われますが、インデックスマッピングのクラスタステート更新は行わないため、それらのフィールドはクエリできません。また、API フォーマットは Push-Based と同じ形式に揃えており、ペイロードに update・delete・insert といった operations フィールドを並べてバッチ処理を指定できます。

質問3: 既存インデックスに新しいフィールドを追加するようなスキーマの進化(schema evolution)には対応できますか。
回答: Pull-Based と Push-Based はその点で疎結合になっており、Push-Based と同じ手順でマッピングを更新でき、Kafka トピック上のメッセージは正しく解釈されます。内部的にはスキーマの概念を持ち、ガードレールとしてのバリデーションを OpenSearch の外側で行っています。Kafka トピックへ publish する前に、保存しているスキーマと突き合わせて検証します。たとえばベクトルを publish する際は、データの次元数がスキーマ定義の次元数と一致するかを確認します。こうした高度なバリデーションを将来的に OpenSearch コアへコントリビュートすることも検討事項に挙げています。

質問4: スキーマのバージョニングはどのように扱っていますか。
回答: 内部ではスキーマのバージョニングという概念を持ち、複数のモードを用意しています。特定のプロデューサーに対しては、produce されたメッセージのバージョン番号が定義済みスキーマと一致するかをチェックする厳格なモードを使います。一方で、後方互換性のある変更を許容するより緩やかなモードもあり、新しいフィールドが導入されてもスキーマが捕捉していなければ無視します。社内のユースケースに応じて、アプリケーションオーナーがどちらを使うか選べる柔軟性を提供しています。

質問5: 順序が乱れたメッセージ向けの OCC(楽観的同時実行制御)で使うバージョン情報は、Kafka ネイティブの仕組みですか、それとも OpenSearch 側で処理しているのですか。プロデューサー側で前処理しているのでしょうか。
回答: バージョンはメッセージペイロードの一部であり、Kafka ネイティブのバージョニングではありません。ユーザーはタイムスタンプなど任意のフィールドをバージョンとして使えます。サンプルペイロードでは document ID と source に加えて version フィールドを設定するだけです。OpenSearch はメッセージを consume すると parse し、そこから version を取り出してインデックスを参照し、新しいバージョンか古いバージョンかを判定します。

実運用では、現状フルアップサートのみのサポートであるため、最新の状態をそのまま全体の状態として解釈します。そのため順序の乱れはあまり問題になりません。将来的に部分アップサートに対応すると、たとえば特定フィールドを 5 だけインクリメントするようなケースでフィールドレベルのバージョニングが必要になり、その整合性の解決を検討する必要が出てきます。これはロードマップ上の項目です。

質問6: ドキュメント単位のバージョニングだけで十分なのでしょうか。
回答: バージョニングはドキュメント自体に付与されており、フルアップサートのシステムにとってはこれで十分です。

質問7: パーティションに使うキーは、シャードのプロパティと一致させる必要がありますか。同じノードに振り分けてノード間のルーティングを不要にするためです。
回答: 現状は静的マッピングです。シャード 0 は常にパーティション 0 から読み取ります。シャーディングは外部で行う形になっており、自分でシャードを決めて対応するパーティションへ書き込みます。これにより、対応するシャードがそのパーティションから consume することが保証されます。

質問8: スケールアップしてシャードを分割する必要がある場合はどう扱いますか。
回答: 現状ではシャード分割はサポートしていません。分割が必要な場合は、別のパーティションからの consume を開始する必要があります。リシャーディングはより破壊的な操作で、アプリケーションオーナーとの調整も必要になります。パーティション上で動的マッピングをよりうまく許容する方法は、ロードマップ上の検討項目です。