Bering Note – formerly 流沙河鎮

情報技術系のこと書きます。

OpenSearchCon North America 2025「Uberが開発したOpenSearchのPull-Based Ingestionを深く理解する」- Varun Bharadwaj, Uber & Yupeng Fu, Uber

OpenSearchCon North America 2025のセッション「Deep Dive Into OpenSearch Pull-Based Ingestion at Uber」をまとめます。

www.youtube.com
資料
https://static.sched.com/hosted_files/opensearchconna2025/95/%5BExternal%5D%5BOpenSearchCon%2025%20US%5D%5BPull-based%20Ingestion%5D.pdf

スピーカー


このセッションは、UberでOnline Data Platformを担当しているYupeng Fu氏と、Search Platform TeamのSoftware EngineerであるVarun Bharadwaj氏によって行われました。
Yupeng氏は、Uberでデータベース、Kafkaストリーミング、検索などのオンラインデータシステムを担当するエンジニアであり、OpenSearchのTechnical Steering Committeeメンバーおよびコアメンテナーとしても活動しています。Varun氏は、今回紹介されるPull-Based Ingestion機能の開発に直接貢献したエンジニアです。

Uberにおける検索技術とオープンソースへの貢献

Uberにおける検索の重要性とリアルタイムなデータ特性



Uberの検索機能は、同社サービスを支える柱として極めて重要な役割を果たしています。
特にUber Eatsでは、100万を超えるレストランがプラットフォーム上に存在し、平均的なユーザーがアプリやウェブサイトを開いた際に、1000以上のレストランや料理から選択することになります。ユーザーは必ずしも何を求めているか明確ではない場合が多く、料理の種類、レストラン、価格、配送速度、信頼性、食事制限、パーティーサイズ、好みなど、複雑な意思決定プロセスを経る必要があります。そのため、検索機能はユーザーが何を食べたいか、何を購入したいかを決定する上で重要な支援を提供しています。
Uberのビジネスとデータの最もユニークで興味深い特性は、それらが本質的にリアルタイムであることです。タクシーを呼んだり、オンラインで食べ物を注文したりする行為は、すべてリアルタイムのアクションとして実行されます。ドライバーの車両が移動している間や、レストランのメニューが利用可能になったり利用できなくなったりする際など、大量の情報がリアルタイムで生成され続けます。この特性により、プラットフォームには常に最新のリアルタイム情報を組み込む必要があります。データの鮮度を保ちながら、同時に顧客に低遅延の体験を提供することのバランスを取ることが、このプラットフォームの核心的な課題となっています。
Uber Ridesについても、地理空間に関わる検索がコアになります。どこかに行きたいとき、目的地を入力すると、リアルタイムのオートコンプリートが期待できます。これは、現在の場所、時間帯からの信号を考慮した完全な検索とセマンティック検索の両方をトリガーし、より良い推奨を提供しています。

RAGとGenAIにおける検索の役割


GenAI時代の到来により、RAGやエージェントプラットフォームを活用した新しいユースケースが数多く登場しています。これらの新しい技術においても、検索はコンポーネントとして重要な役割を果たしています。ユーザーサポートの改善において、検索機能は「Learn and Improve(学習と改善)」「Predict/Understand(予測と理解)」「Resolve(解決)」という循環的なプロセスの中心に位置しており、ユーザーエクスペリエンスの継続的な向上を支えています。
リアルタイムおよびストリーミングデータは、最新の知識と情報を取り込む上で極めて重要です。例えば、サンフランシスコ空港にいるユーザーが自分の荷物の所在を知りたい場合、航空会社からのリアルタイム情報が必要となります。同様に、Uberにとってもユーザーがサンフランシスコに着陸したことを把握できれば、より適切な推奨を提供することが可能になります。このような要求は、検索プラットフォームに対して新たな技術的要件を課すことになり、リアルタイムデータの処理と活用がますます重要になっています。

Uberにおける検索プラットフォームの要件


Uberの検索プラットフォームには、複雑で多様な要件が存在します。まず、データは常にストリーミングされ続けており、これにはユーザーがアプリケーションをクリックしたり、コンテンツの上にカーソルを置いたりする際に生成されるユーザー生成アクションが含まれます。さらに、ログやメトリクスなど、内部的にシステムによって生成される情報も絶えずフローインし、プラットフォームへの重要なシグナルとして機能しています。
より良い顧客体験を提供するためには、高速なクエリパフォーマンスが不可欠です。一部のユースケースではリアルタイムの鮮度が求められる一方で、レストランのカタログのように頻繁に更新されないようなユースケースでは、ニアリアルタイムで十分な場合もあります。このような異なる要件のバランスを取り、最適化されたエクスペリエンスとシステムのシンプルさを同時に実現することが求められています。
さらに、水平スケーラビリティ、高可用性とオールアクティブネス、そしてコスト効率性という非機能要件も重要です。これらの要件は、後述するPull-Based Ingestionを検討し採用する上で非常に重要な要因となっています。これらすべての要件を満たすことが、Uberの規模で検索プラットフォームを運用する上での大きな挑戦となっています。

Uber検索プラットフォームのデリバリー向けアーキテクチャ


Uberのデリバリーユースケース向けに構築された検索プラットフォームアーキテクチャは、読み取りと書き込みの分離というアプローチを採用しています。このアーキテクチャでは、IngesterがKafkaからデータをプルし、デリバリーチームによって構築および維持されている個別のインジェスションパイプラインと連携して動作します。Ingesterはインデックスを作成し、データをソートして特定の順序を維持した後、インデックスをRemote Storageにアップロードします。そして、Searcherがこれらのインデックスをダウンロードして検索処理を実行する仕組みになっています。
このアーキテクチャには、システムを効率的でスケーラブルにするための重要な技術要素が複数組み込まれています。Kafkaからのストリーミングインジェスション、Reader/Writer分離、フェデレーションとACL制御のためのGateway、より高い並列処理を実現するVirtual Shards、セグメントレプリケーションのためのRemote Storage、そしてブートストラップとバックフィルのためのOffline Buildという機能が統合されています。これらの技術により、システムは効率的でスケーラブルとなり、デリバリーアプリケーションの厳しい要件に合わせて最適化されています。

高可用な検索プラットフォーム


Uberの検索プラットフォームは、2つのリージョンにわたるアクティブ・アクティブ構成で運用されています。この設計により、1つのリージョンでインシデントが発生した場合でも、もう一方のリージョンへのフェイルオーバーが可能となっています。このアーキテクチャは検索だけでなく、Uberのインフラストラクチャ全体に適用されている重要な設計思想です。空港に向かう顧客がUberのサービスを常に利用できることを期待するように、このアクティブ・アクティブ設計はインフラストラクチャの基盤に深く根ざしています。
このアーキテクチャの実現において、Kafkaが重要な役割を果たしています。各リージョンには独自のKafkaクラスターがあり、プロデューサーはローカルのKafkaクラスターにデータを送信します。その後、レプリケーションサービスがクロスリージョンレプリケーションを実行し、データをAggregated Kafkaクラスターに集約してグローバルビューを作成します。検索システムやその他のシステムは、このKafkaストリームを直接のソースとして利用し、グローバルビューを提供してサービングに使用できます。
この設計により、2つのリージョン間で最終的な整合性が保証され、インシデント発生時にはアプリケーションの観点から矛盾したデータビューを見ることなく、非常にシームレスな体験が提供されます。Kafkaがストリーミングデータを維持し、クロスリージョンレプリケーションを設定できることで、異なるサービスやデータベース間での整合性管理の複雑さの多くが解決されています。このクロスリージョン整合性ソリューションを構築するための強力な前提条件となるKafkaを活用し、Kafkaデータから読み取るためのネイティブ統合と機能を備えるように検索技術をカスタマイズすることが、Pull-Based Ingestionの重要性につながっています。

Project Sunrise:オープンソースエコシステムとの協業


検索システムのような最先端の技術領域では、業界のイノベーションのペースに遅れを取らないことが極めて重要です。Uberが長年にわたって学んだ教訓の一つは、持続可能性とより速いイノベーションを推進するために、オープンソースエコシステムとの協業が不可欠であるということです。
Uberは、オープンソースコミュニティと協力してイノベーションオープンソースプロジェクトに貢献することを重視しています。この方針に基づき、OpenSearchに内部検索プラットフォームからのアーキテクチャの革新を貢献するという決定を下しました。

UberOpenSearchのパートナーシップ


昨年、UberOpenSearch Software Foundationに創設メンバーとして参加することを発表しました。UberAWSの間でOpenSearch開発における戦略的パートナーシップが確立され、OpenSearch 2025ロードマップを共同で作成しています。
この1年間で、UberOpenSearchに複数の貢献を行ってきました。gRPCのサポートや、本セッションで紹介されているPull-Based Ingestionがその一例です。

Pull-based Ingestion

github.com

Pull-based Ingestionの利点


Pull-based Ingestionにより、複数の利点が得られます。
まず、アクティブ・アクティブ設定をサポートできるようになります。
また、プロデューサーとコンシューマーを分離できるため、インジェスションスパイクを緩和できます。Kafkaを介して大量のインジェスチョンが発生した場合でも、コンシューマーはレイテンシを損なうことなく、独自のペースで、独自の処理能力でメッセージをプルできます。
さらに、トランザクションログをスキップすることでより高いインジェスションスループットを実現すると共に、メッセージのリカバリが可能となります。
これらの機能により、OpenSearchクラスターのアクティブアクティブセットアップが実現されています。

Pull-based Ingestion概要


今日、Pull-based IngestionはKafkaとKinesisをサポートしており、セグメントレプリケーションとリモートストアで動作し、外部バージョニングによるドキュメントのアップサートを実現しています。

概念と用語の整理


ストリーミングシステムとOpenSearch API全体の概念と用語について整理しておきます。
イベントのコレクションは、Kafkaでは「Topic」、Kinesisでは「Stream」と呼ばれ、OpenSearchでは「Source」と呼ばれます。
これらはKafkaではPartitionに分割され、同様の概念がKinesisではシャードと呼ばれ、これらはOpenSearchでもシャードとして扱われます。すべてのOpenSearchのシャードは1つのKafka PartitionまたはKinesis シャードからコンシュームします。
データの単位はKafkaではMessage、KinesisではRecordと呼ばれ、OpenSearchでもMessageとして扱われます。Partition内の固有の識別子については、Kafkaでは「Offset」、Kinesisでは「Sequence Number」、OpenSearchでは「Pointer」と呼びます。

アーキテクチャ


緑色のボックスが既存のOpenSearchコンポーネント、紫色のボックスがPull-based Ingestion用に新たに導入されたコンポーネントを示しています。上部にはKafkaまたはKinesisのストリーミングソースが配置されています。
OpenSearchノード内にPull-based Ingestion用に「IngestionEngine」と呼ばれる新しい仕組みが導入されました。このエンジンはStreamPollerを保持しています。これはストリーミングソースを読み取り、データをLuceneにインデックス化できるコンシューマの実装です。
また、IngestionPluginインターフェースも追加されました。これにより、さまざまなストリーミングソースをサポートするように拡張でき、現在KafkaおよびKinesis Ingestionプラグインがこのインターフェースを拡張しています。
データ側については、indexにはオフセット情報を主とする追加のメタデータが含まれます。また、「ingestion_batch_start」と呼ばれるポインタがあり、これはコミット情報として保存されます。
indexを作成する際は、左側に示す方法で定義します。index設定内には新しく導入された「ingestion_source」があり、ストリーミングソースのタイプ(ここではKafka)、トピック名、bootstrap serverを定義します。
このリクエストはOpenSearchノードに送信され、ノード内のIndexServiceがリクエストを受け入れて処理します。IndexServiceはindexとシャードを作成し、シャード作成処理の一部でエンジンを初期化します。ここでIngestionEngineが作成され、それがStreamPollerを作成し、ストリーミングソースからデータをプルし、Luceneへのインデックス化を開始します。 

RFC
github.com

データフロー

ここからは、Kafkaのメッセージが処理される流れを詳しく解説します。Streaming Source, Stream Consumer, In-memory blocking queue, Message Processor, Ingestion Engine, Lucene Indexの順序で見ていきます。

Streaming Source


Streaming Sourceは現時点ではKafkaまたはKinesisであり、耐久性のあるメッセージングシステムです。
これらのメッセージングシステムがトランザクションログの役割を果たすため、従来のOpenSearchとは異なり、Pull-based Ingestionはトランザクションログに依存しません。
何らかの理由でシャードがダウンし、回復したい場合、既存のindexを見て、再生を開始したいoffsetを特定し、失われたメッセージを回復するために巻き戻して処理(rewind)します。
すべてのOpenSearchのシャードは、Kafka partitionまたはKinesis シャードと1対1の静的なマッピングを持っています。例えば、2つのシャードを持つindexがある場合、シャード0はKafka partition0から読み取り、シャード1はpartition1から読み取ります。

Stream Consumer


Stream Consumerはコンシューマの実装(KafkaまたはKinesis)を保持し、データを定期的にプルする無限ループを行います。メッセージをバッチで消費し、いくつかの検証を行います。
検証の一部として、メッセージが以前に処理されたことがあるかどうかをチェックしようとします。もしすでに処理されたメッセージがある場合、それをスキップして重複した計算を避けます。そのようにして、イベントのexsactly-onceな処理を保証しようとします。検証が完了すると、イベントを In-memory blocking queueに書き込みます。

In-memory blocking queue


In-memory blocking queueは、コンシューマースレッドとプロセッサースレッドを分離するために主に役立ちます。これにより、コンシューマーとライタースレッドを独立してスケーリングできます。また、受信メッセージをパーティション化し、複数のライタースレッドを持つこともできます。
例えば、図の下部に示されているように、2つのライタースレッドを持ちたい場合、2つのIn-memory blocking queueのそれぞれに独自のライタースレッドがあり、そこからメッセージを読み取り、Luceneにインデックス化します。この設計により、マルチスレッドでの書き込みのためのパーティション化されたIn-memory blocking queueが可能になります。

Message Processor


Message Processorはイベントを処理するメインのライタースレッドです。In-memory blocking queueからメッセージを読み取り、イベントをパースし、Luceneドキュメントを形成してメタデータを追加します。このメタデータは主にoffset情報です。
documentとともに、対応するメッセージのoffsetをPointFieldとStoredFieldとして保存します。PointFieldはシャードリカバリ中にRange queryを実行するために使用され、StoredFieldは値の検索のために使用されます。

Ingestion Engine


Ingestion EngineはLuceneインデックスのインターフェースです。プッシュベースモデルと同様に、create、index、deleteの3つの操作モードをサポートしています。
トランザクションログが必要ないため、Ingestion Engineはトランザクションログ操作をスキップするためにno-opバージョンのトランザクションログマネージャーを使用します。
コミット情報として最新の成功したイベントのoffset「batch start pointer」を追跡しており、シャードを回復したい場合のリカバリポイントとして用いられます。

blocking queueの詳細


2つのライタースレッドと、2つのblocking queueのインスタンスがある状態を考えます。
Kafka topicにdocument 1から8までのメッセージが順番にあると仮定します。Stream Consumerは、検証が完了すると、メッセージからdocument IDを抽出し、パーティション化して、これらのブロッキングキューのいずれかに書き込みます。
ここで注目したいのは、document IDに基づいてパーティション化されている点です。
これにより、同じdocumentに対して複数のイベントがある場合、すべてのメッセージが同じblocking queueに入ることを保証します。従ってシーケンシャル処理が保証され、競合状態のシナリオに陥ることはありません。
もう1つ重要な点は、各ライタースレッドがこれらの2つのblocking queueからメッセージを独立して処理する点です。そのため、異なるペースでメッセージを処理させることができます。この例では、ライタースレッド1はドキュメント3を処理中で、ライタースレッド2はドキュメント8を処理中です。
何らかの理由でシャードがダウンし、回復したい場合、以前に消費した最新のメッセージはドキュメント8ですが、ドキュメント3も処理されていないため、8から再開することはできません。そこで、すべてのライタースレッドは、処理された最新の成功したメッセージと、現在処理中のメッセージを追跡します。Ingestion Engineがコミットを作成するたびに、すべてのライタースレッドと調整し、現在処理中のメッセージを取得し、その最小値を取得して、コミットとともにbatch start pointerとして保存します。この場合、コミットに保存されるbatch start pointerは3になります。

シャードリカバリ
シャードリカバリの詳細

シャードのリカバリについてさらに詳しく見ていきます。

各documentには処理された最新のメッセージのKafka offsetをshard pointerとして格納します。そしてライブコミットデータとともに、batch start pointerを格納します。
シャードが初期化されると、Range queryを実行し、index内のbatch start pointer以降のすべての永続化されたoffsetを取得しようとします。これは重複処理を避けるために使用されます。
図では、Kafka partitionにM1、M2、M3の3つのメッセージがあり、プライマリシャードがM1を処理してRemote Storeに永続化した後、M2とM3を処理中にダウンした場合、レプリカが新しいプライマリに昇格し、batch start pointerから再開することで、データの整合性を保ちながらリカバリを実行できることを示しています。

シャードリカバリの流れ

シャードリカバリのプロセスを詳しく説明します。
3つのメッセージを持つKafkaパーティションがあり、プライマリシャードとレプリカがある状況を考えます。レプリケーションはRemote Storeを介して行われます。

まず、メッセージM1の処理を開始し、メッセージM1を正常に処理したと仮定します。その後、フラッシュ操作が行われ、M1はRemote Storeにアップロードされ、永続化され、レプリカでも表示可能になります。


次にM2、そしてM3の処理に移動しますが、この段階ではM2とM3はまだRemote Storeで利用できません。



ここでプライマリシャードがダウンした場合、レプリカを新しいプライマリに昇格させる必要があります。このレプリカ昇格プロセスには以下の手順が含まれます:

  • Ingestion Engineを初期化する
  • コミットから最新のBatchStartPointerを取得する
  • BatchStartPointer以降の永続化されたポインタを取得するためにRange queryを実行する
  • インジェスションを再開する



レプリカはIngestion Engineを再初期化し、使用を開始します。Ingestion Engineの初期化の一部として、最新のコミットを検索し、batch start pointerを取得します。次に、batch start pointer以降のインデックス内のすべてのオフセットを取得するためにrange queryを実行します。
新しいプライマリがオンラインに戻ると、メッセージM2を再生します。なぜなら、それがbatch start pointerであり、M2を処理し、次にM3に移動して残りのメッセージを処理するからです。最後にフラッシュが行われると、すべてのデータはRemote Storeで利用可能になります。

Kafkaのoffset再処理(rewind)


kafkaのoffsetを巻き戻して再処理するシナリオを考えてみましょう。
図1では、1つのパーティションに4つのメッセージがあり、M2からインジェスチョンを開始すると仮定します。ここでのBatchStartPointerは2です。エンジンが初期化されると、BatchStartPointer以上のrange queryを実行します。
ここでindexは空なので、永続化されたoffsetは空です。M2とM3を処理し、この時点でフラッシュ操作があると、M2とM3はコミットされます。ここでインジェスチョンを一時停止します。
図2に進み、offsetをoffset=1に巻き戻します。ここで、巻き戻しの一部としてエンジンを再初期化します。ここで、BatchStartPointerは1になります。range queryを再度実行すると、BatchStartPointerは1なので、インデックスで利用可能な1以降のオフセットは、この場合{2, 3}となります。
図3に進み、インジェスチョンを再開します。M1を処理し、M2に進むと、M2はすでに永続化されたオフセットにあることに気づくので、M2をスキップします。同様にM3もスキップし、処理されていないM4に到達して処理を行います。この仕組みにより、メッセージの重複処理を避けながら、必要に応じてKafkaのオフセットを巻き戻して再処理することが可能になります。

その他の機能


Pull-based Ingestionは、その他にも重要な機能を提供しています。
まず、documentレベルでの外部バージョニングをサポートしており、順序外の不整合を防止するのに役立ちます。Kafkaのようなストリーミングシステムを使用している場合、プロデューサーの種類や再試行ポリシーによっては、複数のレイヤーで順序外のシナリオが発生する可能性があります。
例えば、メッセージのバッチを書き込んでいるときに一部が失敗し、再試行することになり、Kafkaトピックで順序外のイベントが発生する可能性があります。さらに、クロスリージョンレプリケーションを行っているため、順序外のイベントが発生する可能性のあるコーナーケースがあるかもしれません。
これをPull-based Ingestionと外部バージョニングを使用することで解決します。Kafkaメッセージのバージョンフィールドを設定することで、外部バージョニングを設定できます。
エラー処理については、現在DROPとBLOCKの2つのエラーポリシーをサポートしています。DROPポリシーは、失敗があった場合にメッセージをドロップします。メッセージを気にしない場合は、このポリシーを使用してドロップし、次のメッセージに進むことができます。一方、ユースケースによってはそれができない場合、BLOCKポリシーがあります。エラーが発生した場合、成功するかドロップされるまで、そのメッセージを無期限に再試行します。

インジェスチョンを管理するためのAPIも豊富に用意されています。
インジェスチョンの一時停止API、そして再開APIがあります。再開操作の一部として、オプションでコンシューマーをリセットすることもでき、過去または未来の新しいoffsetを設定できます。また、現在のインジェスチョン状態を取得するためのAPIも用意しています。
ここでのリセットオプションは非常に便利です。例えば、停電が発生し、何らかの理由で大きなラグが蓄積してしまい、そのラグをスキップしたい場合、このAPIを使用して最新のオフセットにスキップし、ラグをより早く追いつくことができます。
docs.opensearch.org

UberにおけるPull-based Ingestion

UberにとってPull-based Ingestionの主なポイントは、回復力の向上です。
メッセージの再実行と回復機能が必要であり、プロデューサーとコンシューマーを分離することに大きな価値があります。何らかの理由でOpenSearchクラスターがダウンした場合、書き込みブロックが発生し、書き込みを受け入れられない場合でも、Pull-basedモデルでは書き込みをKafkaトピックで受け入れ続けることができ、クラスターがオンラインに戻ったらいつでも追いつくことができます。
スクリーンショットに示されているように、従来のプッシュベースモデルでは書き込みが急増すると、スレッドプールが飽和状態になり、障害が発生しますが、Pull-basedモデルでの唯一の影響はラグが増加することであり、最終的には追いつきます。これにより、バックプレッシャーの処理も適切に行われ、システム全体の安定性が向上します。

ロードマップ


Uberにおける開発のロードマップにあるいくつかの項目について説明します。まず、部分的な更新をサポートしたいと考えています。現在、シンプルにするために完全なドキュメントのアップサートのみをサポートしています。
以前議論したように、マルチライター機能がありますが、コンシューマー側はまだシングルコンシューマーであり、より高いスループットが必要な場合はボトルネックになる可能性があります。そこで、同時ポーラーのサポートも検討しています。また、柔軟なシャードからパーティションへのマッピングも検討しており、これにより複数のKafkaトピックから消費し、ユースケースに役立つ場合は1つのシャードが複数のパーティションから消費できるようになります。
さらに、優先度を考慮したインジェスチョンにも期待しており、これにはやはり複数のKafkaトピックのサポートが必要です。インデックスロールオーバー機能も計画されています。
オフラインインデックス構築機能も重要な項目です。オフラインインデックス構築のアイデアは、Sparkジョブのようなものを使用して、Source of Truthを読み取り、クラスター外で直接OpenSearchインデックスを構築することです。Pull-based Ingestionを使用すると、このインデックスをクラスターにデプロイした際に、どこからインジェスチョンを再開すればよいかを正確に知ることができます。

クラウドネイティブなOpenSearch」におけるPull-Based Ingestionの役割


次世代のOpenSearchの構想として、「クラウドネイティブなOpenSearch」があります。これについて、Pull-based Indexの観点から説明します。
クラウドネイティブはリーダーレスのオールアクティブインジェスチョンモードであり、複数のデータノードがあり、これらのデータノードは互いに状態を共有しません。また、複数のライターをサポートできます。同じシャードに対して複数のライターを持つことができ、これらのライターのそれぞれが同じKafkaパーティションから消費できます。Pull-based Indexは、ここでアーキテクチャを簡素化します。
単一のシャードに対して複数のライターサポートがある場合の利点として、安全なローリングリスタートとアップグレードを行うことができます。新しいバイナリをロールアウトしたいと仮定します。これらのライターノードの1つにロールアウトし、それがダウンした場合、他のノードは更新を消費し続け、最新のデータを提供できます。アップグレードが完了し、ノードがオンラインに戻ると、どこからインジェスチョンを再開すればよいかを正確に知り、追いついて最新のデータを提供し始めます。

github.com

試してみよう



テスト目的のために、ファイルベースのプラグインをPull-based Ingestion用に作成しました。ラップトップでKafkaやKinesisを設定する必要はありません。ファイルにメッセージを追加し、このプラグインでGradleを実行するだけで、Pull-based Indexモードを試すことができます。
github.com
ドキュメントとプラグインへのクイックリンクも用意しました。ingestion-kafkaプラグインとingestion-kinesisプラグインのコードリファレンスが提供されており、この機能に貢献することに興味がある方にとって、これが開始点となるでしょう。
docs.opensearch.org
github.com
github.com

QA

Q&Aセッション

質問1:パフォーマンスについて、特にインデックス作成のレイテンシについて、Pull-basedと非Pull-basedのベンチマーク結果があれば教えていただけますか?

回答:OSBフレームワークのイベントログデータでベンチマークを試しました。複数のライターを使用すると、現在単一のパーティションから1シャードあたり約35,000〜40,000メッセージ/秒を処理できます。ただし、コンシューマーはまだシングルスレッドであるため、ボトルネックはコンシューマー側にあります。更なるスケールのため、同時ポーラーサポートを導入する予定です。レイテンシについては、トランザクションログがなくなったため理想的には高速になるはずです。

質問2:Kafkaメッセージとインデックスについて、これらは正確に一致する必要がありますか、それともインデックスに何らかの変換がありますか?

回答:現在、静的マッピングをサポートしており、動的マッピングはサポートしていません。インデックスマッピングで定義したフィールドはすべてKafkaメッセージソースの下に与える必要があります。追加のフィールドを指定した場合、それらをindex化しますが、マッピングの更新は行わないため、クエリすることはできません。プッシュベースと同じAPI形式に従っており、更新、削除、挿入といった同じ操作フィールドを定義します。

質問3:フィールドの進化が必要な場合、新しいフィールドを追加するにはどうすればよいですか?

回答:プッシュベースと同じ手順でマッピングを更新できます。内部的にはスキーマの概念があり、Kafkaトピックにパブリッシュする前にバリデーションを行います。スキーマのバージョニングという概念もあり、厳格なモードと寛容なモードがあり、内部のユースケースに応じてアプリケーションオーナーが選択できる柔軟性を提供しています。

質問4:Kafkaの順序外メッセージについて、バージョンを使用するとのことですが、Kafkaレコードにこのバージョン情報を提供するカスタムロジックがあるのですか、それともOpenSearchにネイティブに組み込まれて処理されるのですか?

回答:バージョンはメッセージペイロードの一部として含まれており、Kafkaネイティブのバージョニングではありません。OpenSearchは、メッセージを消費するとそれをパースし、バージョンを抽出し、インデックスを検索して新しいバージョンか古いバージョンかを判断します。一般的にはタイムスタンプを使用します。現在は完全なアップサートのみを受け入れているため、順序外の問題はあまり心配ありませんが、部分的なアップサートが導入された場合は、フィールドレベルのバージョニングが必要になります。

質問5:パーティションに使用するキーは、シャードのキーと一致する必要がありますか?

回答:現在は静的マッピングです。シャード0は常にパーティション0から読み取ります。従来のプッシュベースとは異なり、シャーディングは外部で行われます。シャード分割が必要な場合は、別のパーティションから消費を開始する必要があります。リシャーディングは、アプリケーションオーナーとも調整する必要がある、より破壊的な操作になります。動的なパーティションマッピングについてもロードマップに載っています。