![](https://habrastorage.org/files/872/58f/66a/87258f66a0ee4ed79cf1a648ac6a8f72.jpg)
この春、HP Verticaデータベースがイベントパターンマッチングを使用してクエリを作成する機能について学びました。 いわゆるイベントパターンマッチングは、ivi.ru製品でのユーザーの行動を分析するタスクの下にありました。 支払いファネルに対処し、デバイス上の問題のある領域を検索し、トラフィック分析をさらに掘り下げることにしました。 私たちのチームは、MixpanelとLocalytics(イベントとそのプロパティに基づいています)の分析の実装方法が非常に気に入っているため、それらから多くのアイデアが取り入れられました。
何が起こっているの?
従来、分析には、他のほとんどのプロジェクトと同様に、Google Analyticsを使用していました。 ある時点で、私たちのボリュームでは、データサンプリングが想像を超える割合に達しました-サンプルはオーディエンスの0.5%未満で構築されました。 これにより、小さなサンプルを扱うことができなくなりました-サンプルがまったく見えないか、エラーが壊滅的でした。 さらに、GAでは内部コンテンツデータを大量に処理することは不可能であったため、詳細な分析は不可能でした。この事実は、独自のシステムの開発を開始する機会となりました。 Grootが生まれました-内部分析ivi.ru。
Grootが満たさなければならない要件のリストから始めました。
- サンプリングが不足しているため、すべてのデータを未加工で保存する必要があります。
- クロスプレート性。 サイトに加えて、モバイルプラットフォームとスマートテレビ用の非常に人気のあるアプリケーションがあるため、インターネットに接続され、アプリケーションがインストールされていれば、システムはアイロンからでもデータを収集できるはずです。
- 迅速にスケーリングする機能。
- SPOFの欠如;
- 簡単なセットアップと展開。
建築
HP Verticaコラムベースに加えて、Apache KafkaとApache Stormを使用することを決定しました。これにより、Javaの素晴らしくて恐ろしい世界を発見しました。
Apache Kafka-パブ/サブシステム。 通常のpub / sub実装との主な違いは、サブスクライバーがメッセージを最後からではなく、最初または途中から読み始めることができることです。 このソリューションにより、加入者が働いていないときにデータ損失を心配する必要がなくなります。
Apache Stormは、大量のデータを計算するための分散システムです。 一般に、Stormについて長い間話すことができます。 すぐに使えるkafkaとの統合、システムを水平方向に拡張できること、そしてかなり速い速度が気に入っています。
平面図
一般に、システムは次のように機能します。
- クライアントは、イベントに関するJSON情報を含むリクエストを送信します。
- フラスコWebサーバーは、非同期でイベントのパケットをkafkaに送信します。
- ストームはkafkaからの新しいメッセージを常にピックアップします。
- ストームでは、トポロジはイベントを解析、解析し、Verticaでバッチリクエストを構築し、データベースを保存します。
最初の厄介なステップ
![](https://habrastorage.org/files/112/293/0c0/1122930c097147f1b31a9ab063761ec1.jpg)
最初のバージョンは非常にうまく機能しませんでした。 より正確には、データをkafkaに送信するのに問題はありませんでした(すべてがそのまま動作します)。 また、Apache Stormをいじる必要がありました。これは、トポロジをJavaで記述する必要があったためです。
ストームのトポロジは、次の部分で構成されています。
- 注ぎ口 -常にデータが到着するかどうかのタップ。 私たちの場合、これは標準のKafkaSpoutです。
- ボルトはデータハンドラーそのものです。 「ボルト」では、データを操作する魔法がすべて発生します。
- tupleは標準のデータ構造です。 タプルでは、素数からオブジェクトまで何でも保存できます。
イベントを受信し、jsonが解析してデータベースにパケットを送信する最も単純なボルトを実装しました。 最初のテストでは、次の問題が明らかになりました。
- Verticaは記録中にテーブルをロックします。
- トポロジ内の問題領域を追跡することは非常に困難です。
- データベースへの挿入を伴うスレッドは、1レコードを送信し、その後すぐに100レコードを送信できます。これが発生する理由は理解できませんでした。
最初のバージョンは非常にシンプルでした。id、name、subsite_id、user_id、ivi_id、tsの列があります。 同時に、Verticaのテーブルでも困難が生じました。
ご覧のとおり、これ以上データを記録していません。 しかし、その後、別のブラウザー、オペレーティングシステム、ブラウザーウィンドウサイズ、フラッシュプレーヤーのバージョンを記録することにしました。 「ハ!」、私たちは考えて、このようなテーブルを作りました。
| id | event_id | name | int_value | string_value | double_value | datetime_value | added |
JSONから追加のパラメーターを抽出し、タイプをチェックして、すべてを新しいプレートに書き込む2つ目のボルトを作成しました。
すべてが完璧でした。実装するのがとてもクールだったことが嬉しかったです。アナリストは、イベントにパラメーターを追加してレポートを作成できることを嬉しく思いました。 当時、私たちにとってイベントの主なソースはivi.ruサイト自体でしたが、モバイルアプリケーションはまだ何も送信していません。 彼らが送信を始めたとき、私たちはすべてが非常に悪いことに気づきました。
まず、Chromeブラウザーの単純な「クリック」->「購入」ファネルのリクエストを見てみましょう。
WITH groupped_events AS ( SELECT MIN(e.ts) as added, MIN(e.user_id) as user_id, e.name, MIN(CASE WHEN ep.name = 'browser' THEN string_value ELSE NULL END) as browser from events.events as e LEFT JOIN events.event_properties as ep ON ep.event_id = e.id WHERE e.added >= '2014-07-28' and e.added < '2014-07-29' and e.subsite_id = '10' GROUP BY e.id, e.name ) SELECT COUNT(q.match_id) as count, name FROM ( SELECT event_name() as name, user_id, match_id() as match_id FROM groupped_events as e WHERE e.name IN ('click', 'buy') MATCH ( PARTITION BY user_id ORDER BY e.added ASC DEFINE click as e.name = 'click' and e.browser = 'Chrome', buy as e.name = 'buy' PATTERN P as (click buy | click) ) ) as q GROUP BY q.match_id, q.name;
キャッチを参照してください? プレートに参加し(現在10億を超えるレコードがあります)、グループ化し、CASEを介して目的の値を引き出します。 もちろん、多くのイベントがあったとき、これはすべて遅くなり始めました。 リクエストは数分間機能しましたが、私たちには不向きでした。 アナリストは30分でリクエストに不満を述べ、製品の専門家は私を暗くしたかったです。
なんで?
また、HP Verticaは依然として列データベースであるという事実を明確にしたいと思います。 大量のデータを非常にコンパクトに列に格納し、たとえば、すべてのデータをシャベルすることなく、その場で新しい列を追加できます。 垂直は、オールインワンプレートでは非常にうまく対処できませんでした-彼女は、このヒープを最適化する方法を理解していませんでした。
次に、メインパラメーターを別の列のイベントテーブルにドラッグし、クエリでよく使用されるパラメーターのリストを作成することにしました。 この手順を2回実行しました。 最初に30列のテーブルがあり、2回目は既に50でした。これらすべての操作の後、すべてのクエリの平均実行時間は6〜8倍減少しました。
すべての操作の後、前のリクエストは単純なリクエストに変わりました。
SELECT COUNT(q.match_id) as count, name FROM ( SELECT event_name() as name, user_id, match_id() as match_id FROM events.events as e WHERE e.name IN ('click', 'buy') MATCH ( PARTITION BY user_id ORDER BY e.added ASC DEFINE click as e.name = 'click' and e.browser = 'Chrome', buy as e.name = 'buy' PATTERN P as (click buy | click) ) ) as q GROUP BY q.match_id, q.name;
これについて、私たちが止めた基地に対する苦痛は、この形で誰もが約3ヶ月間生き続けており、私たちはそれについて主張していませんでした。
とにかく、メインテーブルの構造が更新されるのを待たずに、アプリケーションをより速く開発できるように、event_propertiesテーブルを残しました。
アパッチストーム
HP Verticaに対処した後、Apache Stormに対処し始めました。作業を安定させ、別のスレッドを削除し、重い負荷に対応する必要がありました。
嵐の中でバッチ処理を行うには、少なくとも2つの方法があります。
- 生成されたリストでスレッドを分離します。
- tickTupleを受け入れる標準機能を使用します。
まず、最初のオプションを試し、それを破棄しました-動作は不安定で、リクエストはほとんどアイドル状態になりました。 2番目のオプションは、Stormのすべての魅力を示しています。
トポロジを作成するときの簡単なセットアップで、tickTupleを取得するタイミングを指定できます(10秒あります)。 TickTupleは、10秒ごとにメインスレッドに送信される空のエントリです。 このようなレコードを安全に追跡し、キューまたはレコード内のデータベースにすべてを追加できます。
private static boolean isTickTuple(Tuple tuple) { return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID); } @Override public void execute(Tuple tuple) { if( isTickTuple(tuple) ) { executeTickTuple(tuple); } else { executeTuple(tuple); } }
executeTuple
、イベントを
LinkedBlockingQueue
に保存し、それに応じて、
executeTickTuple
ではキューを通過し、パケットをデータベースに挿入します。
トポロジをいくつかの
Bolt
分割しました。
- KafkaRecieverBolt-KafkaSpoutからデータを取得し、JSONを解析してPropertiesParserBoltに送信します。
- PropertiesParserBolt-非標準パラメーターを解析し、それらをEventPropertiesBatchBoltに送信し、イベント全体をさらにEventsBatchBoltに送信します
- EventsBatchBolt-メインテーブルにデータを保存します。
- EventPropertiesBatchBolt-追加パラメーターのテーブルにデータを保存します
これで、どの「ボルト」が遅くなり、どのデータが追跡されるかを確認できます。StormUIのトポロジパフォーマンス統計
あとがき
次の記事では、これを管理および監視する方法を説明します。