OpenSearchCon North America 2025のセッション「Accelerating OpenSearch With Streaming: Apache Arrow, Flight, DataFusion and gRPC」をまとめます。

- スピーカー
- OpenSearchクエリエンジンの進化
- なぜストリーミングが必要なのか
- Apache Arrow:カラムナーデータ処理の基盤
- Apache Flight:高速データ転送の仕組み
- OpenSearch集計フレームワークの詳細
- Flightによるパフォーマンス改善の成果
- ストリーミングがどのように改善をもたらしたか
- 集計処理の制約と課題
- ストリーミング集計の実装
- カラムナーフォーマットの重要性:Arrow、Parquet、Vortex
- ベクトル化実行とSIMDの活用
- 将来に向けた構想
- 今すぐ試せる機能
- Q&Aセッション
スピーカー

Saurabh Singh氏はAWSのSoftware Development Managerで、OpenSearchプロジェクトのエンジニアリングリードを務めています。大規模分散システムにおける複雑な課題の解決に情熱を注いでおり、特にOpenSearchプロジェクトのコア検索、リリース、ベンチマーキング分野のエンジニアリング活動を統括しています。
Harsha Vamsi氏はAWSのSoftware Development Engineerで、コア検索チームに所属しています。検索パフォーマンスの向上、新しい検索機能の構築、そして大規模な検索インフラストラクチャを進化させるための新しいツールの統合に注力しています。
OpenSearchクエリエンジンの進化

従来のOpenSearchは、リクエストがコーディネーターノードに到達すると、個々のデータノードに分散され、それぞれのデータノードでシャード内のデータが処理されます。各データノードはヒットや集計バケットといった結果を生成し、すべての処理が完了してから、それらをまとめてコーディネーターノードに送り返していました。コーディネーターノードは受け取った結果をマージしてReduce操作を行い、最終的にクライアントに応答を返すという流れです。この方法は長い間うまく機能してきましたし、現在も機能しています。
しかし、この分散と収集のモデル、つまりバッチ処理と収集のモデルには問題がありました。分析システムでデータが増加すると、システムはメモリに関する問題を示し始め、コーディネーターでの最終的なマージ処理が計算上のボトルネックになる可能性があります。
そこで登場したのがストリーミングパラダイムです。この新しいアプローチでは、データノードが処理を完了するまですべての結果を蓄積するのではなく、処理中の部分的な結果を継続的にコーディネーターノードに送信します。コーディネーターノードは、受信したデータを逐次的にマージしていくことができるようになりました。
このストリーミング方式への移行を推進している主な要因は、集計のパフォーマンス改善です。また、このアプローチは結合のような他のユースケースを実現するための基盤としても機能しています。OpenSearch 3.2では、このストリーミング集計フレームワークが初めて導入され、ターム集計で実際に活用されました。
なぜストリーミングが必要なのか

従来のクエリ実行スタイルが存在する中で、なぜストリーミングという新しいアプローチを選択するのか、その理由を見ていきましょう。
カーディナリティの高いフィールドでターム集計を実行する場合、各データノードで実行中に巨大なメモリマップが作成されます。大量のメモリバケットに加えて、プライオリティキューなどの他の構造も作成されるため、メモリ使用量が大幅に増加します。
ストリーミングでは、このメモリを長時間保持する必要がなくなります。代わりに、より小さなデータのチャンクとしてバッチをコーディネーターノードに送り返すことができます。コーディネーターノードに滞留するデータが少なくなることで、ガベージコレクションがより効率的に行えるようになります。JVMコレクターが収集すべきオブジェクトの数が減少し、結果としてGCポーズが減少し、より安定したテールレイテンシを実現できます。
さらに重要な利点として、ワークロードの分離とより良いスケーリングが可能になります。データノードをよりスリムな構成にし、検索と取得に専念させることができます。一方、コーディネーターノードにはより効率的にリソースを割り当てることができます。これにより、コーディネーターをマージやメモリ集約型、CPU集約型の操作を行う専門的な役割に特化させることができます。
OpenSearch 3.2での実際の結果として、レイテンシが約30〜50%削減され、スループットが2倍に向上しました。
また、このアプローチの更なる利点として、クロスインデックス結合のような機能をプラグインとして実現する余地も生まれています。
Eコマース検索とパーソナライゼーションのユースケース

実際のユースケースを見てみましょう。ブラックフライデーセールを行っている小売Webサイトで、人々がワイヤレスヘッドホンを検索しているシナリオを考えます。
このユースケースでは、クエリを効率的に実行するために2つの要件があります。まず、カタログインデックスに基づいて先行入力の提案を取得する必要があります。そして、ユーザーのプロファイル動作にマッピングする必要があります。ユーザーは特定の地域にいて、特定の好みを持っている可能性があるためです。
現在のOpenSearchでこれを実現するのは困難です。カタログとユーザーの行動が2つの異なるインデックスに存在する場合、クロスインデックス結合を実行できないからです。SQLレベルで何らかの処理はできるかもしれませんが、それはコーディネーターノードで多くの結果を事前に計算し、それらをマージしてReduceしようとすることを意味します。これは非効率的で遅い処理になります。また、同時実行性や必要なメモリ量の面でも課題があります。
ファセットカウントなどを維持したい場合、ストリーミングによってより効率的に処理できます。データノードからストリーミングされる高速なオートコンプリート結果を、ユーザーの行動に関する他のインデックスからの応答と一緒にコーディネーターノードでストリーミングできるようになります。コーディネーターで起こっている作業を専用のワーカーに分散させることも可能です。
この結果、データノードをスケーリングすることなく、結合を効率的にスケールアウトできるようになります。最終的に効率が向上し、検索クエリのレイテンシが大幅に改善されます。このような負荷を処理する際にクラスターに過度な負荷をかけることもなくなります。
オブザーバビリティと分析のユースケース

分析側のユースケースについても同様の例を見ていきましょう。さまざまな地域にわたる独自のIPを分析し、24時間ウィンドウで1分ごとに視覚化ダッシュボードを取得しようとしているシナリオを考えます。もしUS East 1に10,000のIP、US West 2に10,000のIPがあった場合、従来の方法ではこれらすべてのメモリマップをデータノード上に作成し、すべての結果をまとめてコーディネーターに送り返す必要があります。これはデータノードとコーディネーターの両方で大きな配列を作成することを意味します。
ストリーミングでは、これらのオブジェクトをメモリに保持する必要がなくなります。準備ができたらすぐに送信でき、コーディネーターはオンザフライでマージできます。リアルタイムダッシュボードの体験においても重要な改善があります。ダッシュボードはクエリが完全に完了するのを待つ必要がなく、マージされた結果が到着するとすぐにオブジェクトのロードを開始できます。つまり、データが利用可能になり次第、早期に表示を開始できます。
もう1つの重要な側面は、分散環境でデータノードの負荷が異なる場合の対応です。1つのデータノードが遅い場合でも、後から結果を送信できます。その間、ダッシュボードは先に到着した結果の表示を開始でき、最後のピースが到着するのを待ちながら段階的に更新していくことができます。
これらはログとトレース間の結合を行いたいトレース分析のユースケースや、メタデータとメトリックの結合を行いたい場合に役立ちます。これらすべてのユースケースは、ストリーミングデータでこれらの結合をサポートできれば潜在的に解決できます。
Apache Arrow:カラムナーデータ処理の基盤

これらの核となる重要な要素がApache Arrowです。アナロジーで説明すると、データが音楽のようなものだとすれば、Arrowはすべてのミュージシャンや楽器が、それぞれに専用の記譜法を必要とせずに理解できる楽譜のようなものです。
Arrowの重要な側面は、相互運用性だけでなく、データがカラムナーのインメモリ形式で保存されることにあります。たとえば、100万個のログエントリがあり、ステータスコードというフィールドがある場合、これらの100万個のステータスコードはすべて連続したメモリ位置に格納されます。同様に、タイムスタンプやメッセージなどの他のフィールドも同じように格納されます。
なぜこれが重要なのでしょうか。データを順次保存することで、行ベースのストアで通常発生するようなポインターを追いかける処理よりも、桁違いに高速なデータ取得が可能になります。最新のCPUはこのような順次データの処理を得意としており、キャッシュプリフェッチ操作に役立ち、非常に効率的です。さらにSIMD命令というボーナスも得られます。このデータ配置により、一度に1つの値を読み取る代わりに、一度に8値または16値といった複数の値を同時に操作できるようになります。
Arrowはゼロコピーと相互運用性も提供します。OpenSearchがJava形式で計算した結果を、Pythonクライアントでそのまま消費できるようになります。これはDataFusionなどのライブラリに転送することも、Jupyterなどの他のツールで使用することも可能です。本質的に、これは普遍的で言語に依存しない形式であるため、データのシリアル化とデシリアル化の必要性を排除します。
これは私たちが導入しようとしている新しい概念ではありません。他のデータベースも採用し、成功を収めてきた既知の、実績のある概念です。Spark、Pandas、Snowflakeなどは、Arrowを非常に効率的に使用してきた例です。これにより、OpenSearchも他のビッグデータシステムと同じ土俵に立ち、外部エンジンとの連携の可能性が開かれます。
Apache Flight:高速データ転送の仕組み

インメモリ表現であるArrowについて説明しましたが、このインメモリオブジェクトを転送できるようにする側面も非常に重要です。
そこで登場するのがFlightです。これはArrowのネットワーク側の技術です。名前が示すように、Arrowがデータであるとすれば、Flightはそれを効率的に送信するための高速道路や空路のようなものです。
FlightはgRPC上に構築されており、Arrowバッチを送信するために最適化されています。重要な特徴は双方向性と並列ストリームのサポートです。先ほどの結合のユースケースで考えると、カタログインデックスに10個のシャード、ユーザーの行動インデックスにさらに10個のシャードがある場合、それらすべてを並列でストリーミングできます。双方向性により、コーディネーターはデータをフェッチする速度を制御でき、データノードに過度な負荷をかけることを防ぎます。
これまでの進化を見ると、従来のJDBCやODBCと比較して、データベース取得操作において最大10倍の速度で実行されることが多くの論文で示されています。具体的には、JSONで100MB/秒規模だった転送速度が、バイト形式で1GB/秒規模まで向上できるという話をしています。
今後、OpenSearchでFlightエンドポイントを公開することを予定しています。現在は内部ノード通信にのみ使用していますが、将来的に_flightのようなデータエンドポイントが提供されれば、クライアントはArrowデータを直接受け取ることができるようになります。JSON形式のシリアル化やデシリアル化を行う必要がなくなり、より効率的なデータ転送が可能になります。また、大規模な集計においても、各ノードが独自のマージを行い、Flightストリームを使用して単一のコーディネーターまたは直接クライアントに送信することで、水平スケーリングが実現できます。
OpenSearch集計フレームワークの詳細

OpenSearchがストリーミング形式、そしてFlightとArrowをどのように使用しているかの基礎を理解したところで、3.2でリリースされた具体的な集計のユースケース、特にターム集計について深く掘り下げていきましょう。まず、集計とは何か、そしてこのユースケースでターゲットとしている集計の種類について説明します。
OpenSearchには大きく分けて3つの主要な集計タイプがあります。
最初のタイプはバケット集計です。これにはターム集計、範囲集計、ヒストグラム、データヒストグラムなどが含まれます。
バケット集計は、キーに基づいてデータを個々のバケットにグループ化する仕組みです。例えば、自動車ディーラーが「異なる車種ごとのカウントを教えてください」と尋ねた場合、「ElantraがX台、SonataがX台」といった結果が返されます。バケット集計の強力な点は、他のバケット集計とネストできることで、多層的な集計が可能になることです。
メトリック集計は、SQLの世界でいう「このフィールドのカウントや最小値、最大値を教えてください」といった処理に相当します。メトリック集計はバケット集計と組み合わせることができ、「このバケット内での最小値と最大値は何か」といった分析が可能になります。
パイプライン集計は他の2つとは性質が異なりますが、複数の集計パイプラインを持つことができ、1つの集計の出力が次の集計の入力として流れる仕組みです。移動平均や移動導関数などの計算が可能になります。

OpenSearchがサポートする様々な集計タイプの中で、ターム集計は特に頻繁に使用されます。シンプルなターム集計の例として、response.keywordフィールドでの集計があります。ここではsizeを0に設定してヒットは取得せず、集計結果のみを取得し、トップ10のバケットをバケットカウント値の降順で返すように指定しています。
Flightによるパフォーマンス改善の成果

詳細について話す前に、達成した結果をお見せしたいと思います。
左側には従来の集計フレームワークのCPU使用率が表示されています。2つのデータノードと1つのコーディネーターノードがあり、集計が実行されると、集計はCPU集約型であるため、データノードのCPUが急上昇し始めます。クエリのワークロードが完了すると、それらは元に戻ります。右側のストリーミング集計では、データノードのCPUが50%少なくなっています。非常に印象的な結果です。

CPUだけでなく、他の面でも改善が見られました。JVMメトリックを見ると、これは多くのユーザーが気にする非常に人気のあるメトリックですが、従来の集計ではJVMは非常にスパイクしており、平均してJVM使用率の約75%を占めていました。一方、ストリーミングではJVMはもはやスパイクしておらず、より均一になっています。また、ヒープ使用率も約30%少なくなっています。

特に強調したい改善はレイテンシメトリック、つまり応答時間です。左側の従来の集計フレームワークでは、最初に長く大きなスパイクが見られます。なぜ最初にこのような大きなスパイクが見られるのかは後ほど説明します。時間が経つにつれて下がってきて、このクエリの平均は2.5秒程度になります。一方、ストリーミングでは、このクエリの平均レイテンシは230ミリ秒です。これはレイテンシが8倍改善されたことを意味します。
ストリーミングがどのように改善をもたらしたか
これらの結果をどのように達成したかを理解するには、まず今日の集計がどのように機能するか、特にターム集計がどのように機能するか、そしてボトルネックがどこにあるかを理解する必要があります。


先ほどと同じターム集計クエリの例に戻りましょう。クエリがコーディネーターノードに到達すると、シャードにファンアウトされ、各シャードはそのノードで使用可能なデータに対して集計を処理します。持っているアグリゲーターのタイプに応じて、どのような集計を使用するかが決定されます。
各シャードはローカルデータで集計を実行します。シャードによってデータ量が異なるため、より多くのデータを持つシャードもあれば、より少ないデータを持つシャードもあり、それに応じて異なるレイテンシが見られることがあります。各シャードのアグリゲーターは集計の実行を担当し、シャードがノード上のデータを処理すると、結果をコーディネーターノードに返します。コーディネーターノードでは、受け取った結果をマージし、一連のReduce操作を実行します。
Ordinalsの役割とメモリへの影響

この集計プロセスで重要な役割を果たしているのが、Ordinalsと呼ばれるメモリ集約型のオブジェクトです。Ordinalsはフィールドの値を数値データにマッピングします。
Ordinalsには2種類あります。Luceneセグメントレベルで機能するセグメントOrdinalsと、OpenSearchシャードレベルで機能するグローバルOrdinalsです。この区別が必要な理由は、各セグメントが独自の値のセットを持つためです。シャードは当然、そのシャード内のすべてのセグメントに対してより大きな値のセットを持ちます。そのため、Ordinalsのグローバルマッピングを持つ必要があります。
例えば、ステータスコード200はOrdinal 1に、404はOrdinal 2に、503はOrdinal 3にマッピングされます。これらの数値Ordinalsに基づいて集計を行う方が、値自体に基づいて集計を行うよりも大幅に高速です。この例では値は3桁のステータスコードにすぎませんが、例えばキーワードフィールドが30文字あるような場合、そのデータに対して集計を続けるのではなく、1桁の数値に減らすことができます。
デフォルトでは、グローバルOrdinalsは最初の集計リクエスト時に遅延して構築されます。先ほどお見せしたチャートで最初にレイテンシが急上昇したのは、そのリクエストでグローバルOrdinalsがまだ構築されていなかったためです。最初のリクエストがノードに到達したとき、グローバルOrdinalsマップを構築する必要があり、これは非常に時間のかかるプロセスです。その後、その値をフィールドデータキャッシュに保存し、後続のリクエストではそれを使用できます。そのためレイテンシは2.5秒程度に戻ります。それでもかなり遅いですが、最初の10秒のレイテンシよりははるかに優れています。
Big Arrays - メモリを大量消費するデータ構造

集計が使用するもう1つの重要なオブジェクトがBig Arraysです。OpenSearchのコードベースで作業したことがある方は、これらのBig Arraysが散在しているのを目にしたことがあるでしょう。
Big Arraysは集計のデータストアとして機能し、非常にメモリ集約的です。これらはデータを集約するためのコアデータ構造です。
Big Arraysは本質的に配列ですが、OpenSearch特有の実装になっています。通常の配列と異なる点は、動的かつ効率的に成長およびサイズ変更できることです。Big Arraysは特定のサイズに初期化され、必要なデータ量に応じて動的に成長します。また、ヒープを意識するように設計されており、通常の配列よりもメモリを効率的に管理できます。強参照、弱参照を保持でき、必要なときにガベージコレクションが行われます。Big Arraysは、少なくともターム集計が主に動作するコアデータ構造です。
シャードレベルのReduce - CPU集約的な処理

集計フレームワークのもう1つの重要な部分がシャードレベルのReduceです。集計はシャードレベルで行われ、シャードがデータを計算した後、その結果をコーディネーターに返す前にReduce処理を行います。これは優先度キューを保持するため、CPU集約型の操作になります。
例えば、10個の結果を要求している場合、現在使用している式はサイズ×1.5なので、実際には15個の結果を計算します。各シャードは15個の結果をコーディネーターに返します。しかし実際には、シャードには15個以上の結果が存在します。シャードには多くのデータがあり、集計は多くの結果を生成するからです。
これらすべての結果は優先度キューに保存され、新しい受信結果は優先度キューの最下位と比較されます。新しい結果がその優先度キューに入る価値があると判断された場合、必要に応じて挿入または更新されます。これにより、集計データスペースの大部分を排除できます。データノード上に数千もの集計バケットを作成していても、トップサイズ×1.5のバケットだけを保持すればよいのです。
この方法により、コーディネーターノードに送信するデータ量を削減できます。ただし、これには欠点もあります。すべての結果をコーディネーターに送信しているわけではなく、結果のごく一部しか送信していないため、精度が失われる可能性があります。
コーディネーターレベルのReduce - CPU集約的な処理

集計フレームワークの最後のステップは、コーディネーター自体でのReduceです。すべてのシャードが結果をコーディネーターに返すと、コーディネーターは興味深い状況に直面します。10個の結果しか要求していないのに、各シャードから15個の結果を受け取っているのです。そこで、これらすべての結果を取り、再び10個の結果にマージする必要があります。これがコーディネーターレベルのReduceです。
このReduce処理を行うと、一連のバケットが返されます。各バケットには、求めるタームであるキーと、そのタームを含むドキュメントのカウントが含まれています。デフォルトでは、集計フレームワークは10個の結果を返しますが、サイズを500と指定すれば500個の結果が返されます。
これで、集計フロー処理の全体像が見えてきました。
集計処理の制約と課題

それでは集計の改善すべき点、制約は何でしょうか。
まず、ドキュメントスペースが多いほど、計算量が多くなり、レイテンシも大きくなります。大量のデータをフィルターするクエリフィルターがあり、カーディナリティが非常に高いフィールドで集計クエリを実行している場合、多くの計算とレイテンシが発生するのは当然です。
私が示した例はシンプルなトップレベルのターム集計にすぎません。実際には、ほとんどのユーザーはシンプルなトップレベルの集計を実行するだけでなく、複数の集計、多層集計、サブ集計、ネストされた集計などを実行します。CPUスペースの複雑さを理解するには、これらすべてのカーディナリティを掛け合わせる必要があります。フィールドA、B、Cがある場合、それらすべてを掛け合わせる必要があり、非常に大きな集計スペースになります。
データがコーディネーターに到達してシャードに送信されると、シャードは値を計算します。しかし、シャードが作業をしている間、コーディネーターノードは実際には何もしていません。シャードが結果を返すのを待っているだけです。シャードは結果を返すのに数秒、場合によっては数十秒かかることもあります。そのため、コーディネーターノードはあまり多くの作業を行っていません。

結果として、たとえばCPUが200%に急上昇すると言ったことが起きます。多くの場合、これは高価な集計クエリが実行されているためです。また、ヒープ使用率も高くなります。
これらのOrdinals、これらのBig Arraysはすべてヒープメモリに格納されており、ヒープを消費しています。ドキュメントスペースが大きいほど、集計が複雑になるほど、Big Arraysのサイズが大きくなり、ヒープ使用量も大きくなります。
レイテンシの問題も顕著です。通常の検索クエリはミリ秒単位、数十ミリ秒単位、数百ミリ秒単位で実行されますが、集計クエリは突然数秒、場合によっては数十秒かかることもあります。数分かかるクエリを見たこともあります。

これらの厳しい制約に加えて、いくつかのソフトな制約もあります。ログに「too many buckets exception」のようなエラーが表示されることがあります。これは複数のネストされた集計を実行しようとしているためです。または「circuit breaker exception」が表示されることもあります。これはヒープサイズのしきい値(70%、80%、90%など)を超えたためで、Big Arraysがメモリ内で非常に大きく成長し、メモリを消費しすぎて、他のすべてに十分なメモリがなくなるためです。その結果、積極的なガベージコレクションが行われることになります。
ストリーミング集計の実装

これらの問題に対して私たちはストリーミング集計を導入しました。ストリーミング集計には2つの部分があります。
1つはカスタムストリーミングトランスポート、もう1つはカスタムストリーミングアグリゲーターです。
なぜストリーミングトランスポートが必要なのでしょうか。現在の転送はデフォルトのNettyプロトコルで動作します。Saurabhが話したように、新しい転送はFlightを使用し、gRPC上で動作します。そのため、Arrowベースのメッセージをワイヤー経由で送受信できる新しいドロップイン転送が必要です。すべてのシリアル化およびデシリアル化ロジックはArrowによって処理されます。
転送だけでは十分ではありません。フレームワークにも変更を加える必要がありました。そのため、新しいカスタムストリーミングアグリゲーターがあります。私たちはそれをStreamStringTermsAggregatorと呼んでいます。以前はGlobal Ordinals String Terms Aggregatorでした。これについては後ほど詳しく説明します。メモリを節約し、レイテンシを改善するために、いくつかの新しいアルゴリズム変更も組み込むことができました。

ストリーミングトランスポート自体は、新しいFlightサーバーです。OpenSearchには9200ポートで実行されているRESTサーバーがあり、9300ポートで実行されている転送サーバーがあり、そして今9400ポートで実行されているArrow Flightサーバーがあります。新しいArrow Flightクライアントもあります。サーバーがある場合、当然クライアントも必要です。
サーバーはArrowメッセージをクライアントに送信し、クライアントはそれらのメッセージを読み取り、デコードできます。Flightの素晴らしい点の1つは、双方向ストリームがあることです。これはNettyの動作とは少し異なります。Nettyでは、リクエストを送信し、応答を待ち、応答を受け取り、接続を閉じます。ここでは、ストリームを開始します。
ストリームはノード間で開かれ、「次のデータのバッチをください」と言い続け、ストリームが完全に尽きるまで新しいデータのバッチを要求し続けます。これにより、ストリームのバックプレッシャーが軽減されます。1つのノードで処理されているバッチが多すぎる場合、「待って、新しいバッチを要求したくない」と言うことができます。この新しいトランスポートは、将来的にデフォルトのトランスポートを置き換える、ドロップインの代替品として設計されています。

ストリーミングアグリゲーターでも、多くの重要な改善が行われています。まず大きな変更点として、グローバルOrdinalsを完全に廃止し、セグメントレベルのOrdinalsのみを使用するようにしました。これにより、グローバルOrdinalsを構築したり、セグメントOrdinalsをグローバルOrdinalsにマッピングしたりする高価な操作が不要になりました。
さらに重要な変更として、シャード単位ではなくセグメント単位でデータをフラッシュするようになりました。OpenSearchシャードは複数のセグメントで構成されていますが、従来はすべてのセグメントの処理が完了してからシャードレベルでReduceを行っていました。新しいアプローチでは、各セグメントが計算を終了し次第、即座にコーディネーターにデータを送信します。
優先度キューとシャードレベルのReduceも廃止しました。代わりに、すべての結果を直接コーディネーターに送信します。この変更には2つの大きなメリットがあります。まず、計算量が減少し、シャードの処理完了を待つ必要がなくなりました。次に、従来はサイズ×1.5の結果のみを送信していたため精度に問題がありましたが、現在はすべての結果を送信するため100%正確な結果が保証されます。

もう1つの決定的な改善は、Big Arraysのメモリ管理です。従来はメモリ内で際限なく成長していましたが、現在は一定サイズまで使用した後、すぐに破棄するようになりました。その結果、従来の集計では1つのクエリで4GBのヒープメモリを消費していたのに対し、ストリーミング集計では200MBに抑えられています。これは約20倍のメモリ効率改善を意味し、単一のターム集計でこの結果ですから、将来的にはさらなる改善が期待できます。
カラムナーフォーマットの重要性:Arrow、Parquet、Vortex

これらの改善全体がどこに向かっているのかを見てみましょう。Apache Arrowは長い間、デファクトスタンダードのカラムナーストアとして使用されてきました。データレイクを運用している場合は、おそらくすでに使用しているでしょう。カラム形式の利点は明確です。類似したデータを一緒に保存するため圧縮効率が高く、メタデータを使って不要な処理をスキップし、必要なデータに直接アクセスできます。
Apache Parquetはオンディスクのカラムナーファイル形式で、特にデータレイクで効率的なストレージを実現します。興味深いことに、Parquetは基本的に「Arrow-on-disk」と考えることができます。つまり、OpenSearchがParquetファイルをArrow経由で直接取り込めるようになれば、重いJSON再インデックス処理を回避できる可能性があります。
Vortexは次世代のフォーマットで、インメモリとオンディスクの両方の分析に最適化された最先端のフォーマットを目指しています。内部的にはArrow IPCを基盤としながら、さらに速度と拡張性を追求した実装になっています。OpenSearchがこの統合によってエコシステムに進化していることで、将来Vortexがより人気を得た場合でも、この基盤の上で容易に統合できるようになります。
現在のOpenSearchはLuceneのフォーマット(転置インデックスとdoc values)でデータを保存していますが、将来的には一部のデータの保存にParquetやVortexのようなフォーマットを活用する可能性があります。カラムナーフォーマットの利点は明確で、優れた圧縮、チャンクごとのmin/maxインデックスによるスキップスキャン、そして分析クエリパフォーマンスの大幅な向上が実現できます。
これは孤立した動きではありません。データ処理の世界全体がこの方向に向かっており、OpenSearchもこの流れに合わせて進化しています。より良い標準化とパフォーマンス向上のために、次のレベルへと進んでいるのです。
ベクトル化実行とSIMDの活用

ベクトル化クエリ実行は、一度に1つのドキュメントを処理するのではなく、バッチ(ベクトル)単位でデータを処理する手法です。Arrowのカラムチャンクを使用することで、エンジンは例えば1024個の値に対してフィルターや関数を一括で適用できます。これは1024回の個別の関数呼び出しよりもはるかに高速です。
SIMD(Single Instruction, Multiple Data)の実践的な例を見てみましょう。数値フィールドに対する範囲フィルターを考えた場合、従来は値を1つずつチェックしていましたが、SIMDを使用すると8個または16個の値をCPUレジスタにロードし、AVX2やAVX-512などの命令を使って1つの命令でチェックできます。
カラムナーフォーマットはSIMDに最適です。Arrowのフォーマットは、SIMD処理のためにデータを適切に整列させています。配列の途中に可変長のデータが現れることがないため、予測可能な処理が可能になります。
この最適化による利点は明確です。スループットの面では、100万レコードのスキャンがSIMD最適化により素朴なループと比較して3〜5倍高速になる可能性があります。さらに、ベクトル化されたコードでは分岐(if文)が最小限に抑えられるため、予測可能な分岐処理が実現します。実際の例として、DuckDBはベクトル化実行により70〜80%のメモリ使用量削減と大幅な速度向上を実現しています。
将来に向けた構想

ストリーミングクエリの実装は始まりに過ぎません。将来的には、より多くのOpenSearch機能がストリーミングパラダイムを採用することが期待されます。ストリーミングインデックスパイプラインや、継続的なクエリ更新など、様々な可能性を検討しています。ダッシュボードがリアルタイムでデータをストリーミングし続けることも可能になるでしょう。
SIMDの活用もクエリ時の高速化だけでなく、インデックス作成時にも適用できます。高速なトークン化やk-NNのベクトル計算などがその例です。よりハードウェアに特化した実行への移行により、新しい可能性が開かれます。
特に重要なのがマルチフォーマットインデックスです。数年後には、OpenSearchインデックスがLuceneセグメントだけでなく、様々なフォーマットをサポートする可能性があります。エンジンは用途に応じて適切なフォーマットエンジンを選択できるようになるでしょう。例えば、ホットなテキストデータにはLucene、コールドな分析データにはDataFusionやVeloxといった使い分けです。
ArrowとFlightにより、OpenSearchはかつてないほどエコシステムに開かれます。Jupyter notebookからArrow Flight経由で直接OpenSearchにクエリを実行できる未来を想像してみてください。シリアル化なしでデータフレームの結果を直接取得できるようになります。

この進化は、Amazon、Uber、Intel、Metaなどからのオープンソースコントリビューションによって推進されています。検索と分析の境界を押し広げる取り組みが続いています。関連する講演として、「Using OpenSearch as a Database with DataFusion」もぜひご覧ください。
今すぐ試せる機能

ストリーミングターム集計はOpenSearch 3.2でGAとなっています。使い方は非常に簡単です:
opensearch.experimental.feature.transport.stream.enabledをtrueに設定- リクエストヘッダーに
stream=trueを追加
これだけで、この講演で紹介したパフォーマンス改善を体験できます。ドロップインプラグインとしてもインストール可能で、特定のフローで使用して結果を確認できます。
Q&Aセッション

Q: 集計のパフォーマンス数値について教えてください。
A: CPU、メモリ、レイテンシの各側面で改善が見られています。平均で約8倍のレイテンシ改善を達成しました。CPU使用率、ガベージコレクション、JVMメモリなど、すべてのメトリックで大幅な改善が見られます。3.2では、メトリック集計を伴うターム集計またはトップレベルのターム集計のみをサポートしています。
Q: サブ集計はまだサポートされていないとのことですが、ロードマップにはありますか?
A: はい、ロードマップにあります。ターム集計はバケット集計ですが、現在は他の種類のバケット集計や、互いにネストされた複数のバケット集計はサポートしていません。3.3と3.4で、ほとんどの集計タイプに拡張する予定で、デフォルトで有効にする計画があります。今後の数回のリリースで、クエリパラメーターの変更やフラグの有効化なしに、すぐに使えるデフォルトの機能になります。
Q: ストリーミングの欠点について教えてください。
A: 従来はシャードがすべての結果を1回のホップでコーディネーターに返していましたが、今では各セグメントから個別にデータが返されるため、ネットワーク帯域幅の使用量が増加します。シャードに30〜40のセグメントがある場合、その影響は顕著です。また、ストリーミングがすべてのケースで有効とは限りません。カーディナリティが非常に低いフィールドや、複数のバケット集計がある場合は適切ではない可能性があります。従来はサイズ×1.5の結果のみを返していましたが、今はすべての結果をコーディネーターに返すため、コーディネーターの負荷が高くなるリスクがあります。そのため、ガードレールを追加し、トップレベルから始めて徐々に拡張していく段階的なアプローチを採用しています。