YandexのClickHouseに基づいて、エラーログのリアルタイム全文検索システムを開発しています

この記事では、ClickHouseと呼ばれるYandexデータベース管理システムに基づいて、エラーログ(またはその他のログ)のインデックス作成および全文検索のシステムを開発する方法について説明します。 ヤンデックスは、最初に基地が閉鎖された とき 、そして彼らがそれを開いた ときに、 Habrの基地について書きました 。 このデータベースは、主に分析とYandex.Metricaサービスの実装を目的としていますが、バッチでデータをロードし、巨大なバッチでデータを削除し、個々の行を更新しない場合に実際に使用できます。



私たちは何をしますか



エラーログによるインデックス作成と検索のためのシステムを実装します。 同時に、すでに中央サーバー(または複数のサーバー)にログを配信し、メッセージテキストをデータベースに既に格納していると考えられます。つまり、次のタイプのデータベースに既にテーブルがあります。



CREATE TABLE Messages ( message_id BIGINT PRIMARY KEY AUTO_INCREMENT, created_ts DATETIME, message_text BLOB )
      
      







このようなログで検索結果をすばやく送信する(つまり、常に時間で並べ替える)方法と、リアルタイムでインデックスを作成する方法を学習します。





ElasticSearch / Sphinx / MySQL / other_solutionを試してみませんか?



ClickHouseが何であり、ClickHouseでどのタスクで解決できるのかを見るのは興味深いようです。 この記事の目的は、ターンキーソリューションを提供するのではなく、人々に概要と思考の糧を提供することです。 Elastic、Sphinxなどは既製の検索エンジンであり、ClickHouseは汎用データベースであり、そこから何でもブラインドできます。 また、ClickHouseに基づいた記事で紹介されている検索システムは、ログによる検索のタスクにSphinxよりもうまく対処し、同時に2種類のインデックス(リアルタイムと通常)を使用する必要がないという意見があります。 経験が異なる場合があるため、このようなシステムを実稼働環境に導入する前に、まずプロトタイプを作成することをお勧めします。



サーバーのインストール



ClickHouse( github )のインストールをシステム管理者に任せるか、何も解決したくない場合や怠け者の場合は、ドッカーから自分でインストールします 。 ソースコードから自分でビルドする場合、 最大30 GBのスペースが必要になります。これを念頭に置いてください。



クライアントのインストール



何らかの理由でシステムにcurlまたはphpがない場合は、それらをインストールします。 以降の例では、curlをベースAPIおよびPHPとして使用して、インデックス作成および検索システムを記述します。



インデックスのデータ構造の準備



原則として、検索エンジンでの全文検索の構造は非常に単純です。 この構造はInverted Indexと呼ばれ、少し簡略化された形式で実装します。 主キーと日付の両方を持つデータに推奨されるデフォルトのエンジンを使用します-MergeTree



 CREATE TABLE FT ( EventDate Date, word_id UInt32, message_id UInt64 ) ENGINE=MergeTree(EventDate, (word_id, message_id), 8192);
      
      







データベースにテーブルを作成するには、次のコマンドを使用できます。



 $ cat create.sql | curl 'http:/hostname:8123/?query=' --data-binary @-
      
      





このコマンドでは、create.sqlファイルに実行する必要があるリクエストが含まれている必要があり、hostnameはClickHouseが発生したホスト、8123はデフォルトのポートです。



上記の構造では、word_idは辞書内の単語のID(後で作成します。word_text=> word_idの対応は辞書に保存されます)、message_idはログテーブル内の対応するエントリのIDです(Sphinxのdocument_idのアナログ)。



MergeTreeエンジンのパラメーター:最初のフィールドEventDateはイベントの日付を持つ列の名前を意味し、2番目の列(word_id、message_id)は主キー(実際には通常のインデックス)を定義し、8192はインデックスの粒度に影響を与える設定です。デフォルトでそれを残します。



MergeTreeは主キーでデータをソートし、日付でデータを分割します。そのため、message_idでソートして特定の日と特定の単語で検索するのは非常に高速です。



辞書の構造を作成します



このインデックスを設定するには、文字列ではなくClickHouseに数値を保存するために必要な辞書型の構造が必要です。 辞書はデータベースに作成できます。MySQLの場合、構造は次のようになります。



 CREATE TABLE Words ( id int(11) unsigned NOT NULL AUTO_INCREMENT, word varchar(150) COLLATE ascii_bin NOT NULL DEFAULT '', PRIMARY KEY (id), UNIQUE KEY word (word) ) ENGINE=InnoDB DEFAULT CHARSET=ascii COLLATE=ascii_bin;
      
      





ASCII比較に注意してください。これにより、すべての単語が英語である場合にテキストインデックスのパフォーマンスを大幅に向上させることができます。 すべてのログが英語になっていない場合は、ビューを修正することお勧めますデフォルトで比較(utf8_unicode_ci)を残すことができます。



インデックス作成プロセス



インデックス作成プロセスを制御し、初期インデックス作成を開始するために、まだインデックスを作成していないメッセージのキューを使用して、MySQLに別のテーブルを作成できます。



 CREATE TABLE IndexQueue ( message_id bigint(20) unsigned NOT NULL DEFAULT '0', shard_id int(11) NOT NULL, PRIMARY KEY (shard_id,message_id) );
      
      







このテーブルに初めてデータを入力するには、次のクエリを使用できます。



 INSERT IGNORE INTO IndexQueue (message_id, shard_id) SELECT message_id, message_id % 4 FROM Messages
      
      







ここで、4は使用するインデクサースレッドの数です。 PHP7では、以下の例のコードは、プロセスあたり約3.5 mb / sのパフォーマンスを、それぞれ4ストリーム、14 mb / sで提供します。 14 mb / sよりも多くのエラーログを書き込む場合は、おそらく緊急にプロダクションを修正する必要があり、フルテキスト検索が少し遅れているという事実には至っていません:)。



インデクサーアルゴリズムは次のようになります。

  1. 指定されたシャードのキュー(IndexQueue)のエントリを表示する
  2. レコードの束を選択し、各メッセージの単語を選択して、message_id => array(word1、...、wordN)の形式の$インデックス配列に入れます
  3. 各単語について、辞書で対応するword_idを見つけ、そのような単語がない場合は追加します
  4. すべてのメッセージのすべての単語のClickHouseインデックスにレコードを挿入します




以下は、キューを解析してインデックスを作成するためのわずかに単純化されたコードです。自宅で使用する場合は、自分で変更する必要があります。



PHPでのインデクサーの実装の簡素化
 const CH_HOST = '<hostname>:8123'; const MAX_WORD_LEN = 150; //   ,    Words $mysqli = mysql_connect(...); //    $limit = 10000; //       $shard_id = intval($argv[1] ?? 0); //   (   ,   ,   0) echo "Indexing shard $shard_id\n"; while ($mysqli->query('SELECT MAX(message_id) FROM IndexQueue WHERE shard_id = ' . $shard_id)->fetch_row()[0]) { $index = ""; $start = microtime(true); $ids = []; foreach ($mysqli->query('SELECT message_id FROM IndexQueue WHERE shard_id = ' . $shard_id . ' ORDER BY message_id LIMIT ' . $limit)->fetch_all() as $row) { $ids[] = $row[0]; } if (empty($ids)) { break; } $message_texts = $mysqli->query('SELECT message_id, `message_text` FROM Messages WHERE message_id IN(' . implode(', ', $ids) . ')')->fetch_all(MYSQLI_ASSOC); $unknown_words = []; $msg_words = []; $total_length = 0; foreach ($message_texts as $msg) { $msg_id = $msg['message_id']; $text = $msg['message_text']; $total_length += strlen($text); $words = array_unique( array_filter( preg_split('/\W+/s', $text), function($a) { $len = strlen($a); return $len >= 2 && $len <= MAX_WORD_LEN; } ) ); foreach ($words as $word) { $unknown_words[$word] = true; } $msg_words[$msg_id] = $words; } if (!count($message_texts)) { $mysqli->query('DELETE FROM IndexQueue WHERE shard_id = ' . $shard_id . ' AND message_id IN(' . implode(', ', $ids) . ')'); continue; } if (!count($unknown_words)) { var_dump($message_texts); die("Empty message texts!\n"); } $words_res = $mysqli->query('SELECT word, id FROM Words WHERE word IN(' . INstr(array_keys($unknown_words)) . ')')->fetch_all(MYSQLI_ASSOC); $word_ids = []; foreach ($words_res as $row) { $word_ids[$row['word']] = $row['id']; unset($unknown_words[$row['word']]); } if (count($unknown_words)) { echo "Inserting " . count($unknown_words) . " words into dictionary\n"; $values = []; foreach ($unknown_words as $word => $_) { $values[] = "('" . $mysqli->escape_string($word) . "')"; } $mysqli->query('INSERT IGNORE INTO Words (word) VALUES ' . implode(',', $values)); $words_res = $mysqli->query('SELECT word, id FROM Words WHERE word IN(' . INstr(array_keys($unknown_words)) . ')')->fetch_all(MYSQLI_ASSOC)); foreach ($words_res as $row) { $word_ids[$row['word']] = $row['id']; unset($unknown_words[$row['word']]); } } if (count($unknown_words)) { die("Could not fill dictionary\n"); } foreach ($msg_words as $msg_id => $words) { foreach ($words as $word) { //   ,  unix timestamp  message_id      32  $index .= date('Ym-d', $msg_id >> 32) . "\t" . $word_ids[$word] . "\t" . $msg_id . "\n"; } } $ch = curl_init('http://' . CH_HOST . '/?query=' . rawurlencode('INSERT INTO FT FORMAT TabSeparated')); curl_setopt($ch, CURLOPT_POSTFIELDS, $index); curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1); $res = curl_exec($ch); if ($res !== "") { die($res . "\n"); } $mysqli->query('DELETE FROM IndexQueue WHERE shard_id = ' . $shard_id . ' AND message_id IN(' . implode(', ', $ids) . ')'); echo "Speed " . round($total_length / 1024 / (microtime(true) - $start), 2) . " KiB/sec\n"; } function INstr(array $values) { global $mysqli; $res = []; foreach ($values as $v) $res[] = "'" . $mysqli->escape_string($v) . "'"; return implode(',', $res); }
      
      









インデックス検索



Elastic、Sphinx、およびその他のソリューションが豊富な検索ランキングアルゴリズムは必要ありません。また、日付によるソートが必要なだけなので、検索は非常に簡単になります。 実際、「hello world 111」というクエリで何かを見つけるには、最初に辞書でword_idを見つけ(それぞれ1、2、3と仮定)、次のクエリを実行する必要があります。



 SELECT message_id FROM FT WHERE word_id IN(1, 2, 3) GROUP BY message_id HAVING uniq(word_id) = 3 ORDER BY message_id DESC LIMIT 50
      
      







探しているすべてのドキュメントにはクエリからのすべての単語が含まれている必要があるため、HAVING uniq(word_id)= 3(uniq(word_id)は通常のSQLデータベースのCOUNT(DISTINCT word_id)に類似している)ここで、3はクエリ内の異なる単語の数です。



message_idによるソートは時間によるソートを意味すると想定しています。 これは、message_idの最初の32ビットに秒単位でUNIX TIMESTAMPイベントを書き込み、後半にイベントのマイクロ秒(存在する場合)と乱数を書き込むことで実現できます。



結果



このソリューションのパフォーマンスをテストするために、3 GB(160万イベント)の容量を持つ開発サーバーからエラーログデータベースを取得し、インデックスを作成しました。 インデクサーは、ストリームごとに3.5 Mb / sのインデックス作成速度を示しましたが、これは私のケースでは十分すぎるほどでした。 現時点では、エラーログによる全文検索にSphinxを使用しているため、同じハードウェア上でほぼ同じ条件で動作するため、これら2つのソリューションのパフォーマンスを大まかに比較できます。 Sphinxのインデックス作成(少なくとも非リアルタイムインデックスの作成)はシングルコアあたり数倍高速ですが、SphinxのインデクサーはC ++で記述され、PHPで記述されていることに注意してください:)。



ClickHouseの(そして明らかにSphinxも)最も重いクエリを計算するために、インデックスで最も人気のある単語を見つけることにしました。

 $ echo 'SELECT word_id, count() AS cnt FROM FT GROUP BY word_id ORDER BY cnt DESC LIMIT 5' | curl 'http://hostname:8123/?query=' --data-binary @- 5 1669487 187 1253489 183 1217494 159 1216255 182 1199507
      
      







リクエストには130ミリ秒かかり、合計8,600万レコードを記録しました。 (テストマシン上の2つのコア)。



したがって、上位5つを取得してword_idを通常の単語に変換すると、実行のリクエストは「php wwwrun _packages ScriptFramework badoo」になります。 これらの単語はほとんどすべてのメッセージに含まれており、インデックスから安全に除外できますが、検索パフォーマンスをテストするために残しておきました。



ClickHouseでリクエストを実行します。

 SELECT message_id FROM FT WHERE word_id IN(189, 159, 187, 5, 183) GROUP BY message_id HAVING uniq(word_id) = 5 ORDER BY message_id DESC LIMIT 51
      
      







Sphinxの同様のクエリ:

 SELECT message_id FROM FT WHERE MATCH('php wwwrun _packages ScriptFramework badoo') ORDER BY message_id DESC LIMIT 51
      
      







リクエストの実行時間(両方のデーモンが両方のコアを使用してリクエストを実行でき、すべてがRAMに配置されます):



ClickHouse: 700ミリ秒

スフィンクス: 1500 ms



Sphinxは結果をランク付けする方法を知っていますが、私たちのシステムはそうではないので、Sphinxでの時間はかなり良いです。 リクエストの実行中に、両方のデーモンが〜600万ドキュメント(単語あたり120万ドキュメント)の結果を結合し、適度な2コアで実行しなければならなかったことを忘れないでください。 適切な構成では、この(わずかに合成的な)テストで示される時間が場所を変える可能性がありますが、それでも私は個人的に結果に非常に満足しており、ClickHouseはログによるリアルタイム検索の構築に非常に適していると安全に言えます。



記事を最後まで読んでくれてありがとう。楽しんでいただければ幸いです。



PS私はYandexの従業員ではなく、Yandexとは一切関係ありません。実際のタスクのためにデータベースを試してみたかっただけです:)。



参照資料



  1. ClickHouse Webサイト
  2. オープンソースの前のHabrの記事
  3. Habréのオープンソース記事
  4. Github
  5. ClickHouse Docker




* UPD:* uniqは近似値であるため、 uniqUpTo(N)関数を使用することをお勧めしますが、65536未満の要素数で非常に正確な結果が得られます。



All Articles