Stormは、 Apache Hadoopに似ていますが、リアルタイムの大規模なデータストリームの分散処理に焦点を当てたシステムです。
Stormの主な機能:
- スケーラビリティ 。 処理タスクは、クラスターノードと各ノードのスレッドに分散されます。
- データ損失に対する保証された保護。
- 展開と保守が簡単。
- クラッシュ回復。 ハンドラーのいずれかが失敗すると、タスクは他のハンドラーにリダイレクトされます。
- Javaだけでなくコンポーネントを作成する機能。 JSONオブジェクトを使用したシンプルなMultilangプロトコル 。 Python、Ruby、およびFancy言語用の既製のアダプターがあります。
最初の部分では、Stormバージョン0.8.2を使用してアプリケーションを作成する基本概念と基本について説明します。
嵐の要素
タプル
データ表示要素。 デフォルトでは、Long、Integer、Short、Byte、String、Double、Float、Boolean、byte []フィールドを含めることができます。 Tupleで使用されるカスタム型はシリアル化可能でなければなりません。
ストリーム
タプルのシーケンス。 Tupleのフィールドの命名スキームが含まれています。
注ぎ口
ストリームのデータプロバイダー。 外部ソースからデータを受信し、それらからタプルを形成し、それをストリームに送信します。 Tupleを複数の異なるストリームに送信できます。 RabbitMQ / AMQP 、 Kestrel 、 JMS 、 Kafkaなどの一般的なメッセージングシステムに対応しています。
ボルト
データハンドラ。 入力はTupleです。 0個以上のTupleを出力に送信します。
トポロジー
関係の説明を持つ要素のコレクション。 HadoopのMapReduceジョブに類似しています。 MapReduceジョブとは異なり、入力データストリームが使い果たされた後も停止しません。 注ぎ口要素とボルト要素の間でタプル輸送を実行します。 ローカルで起動するか、Stormクラスターにロードできます。
使用例
挑戦する
Cdr電話データストリームがあります。 ソース番号に基づいて、クライアントIDが決定されます。 宛先番号と顧客IDに基づいて、料金が決定され、通話コストが考慮されます。 各ステージは複数のスレッドで動作するはずです。
この例はローカルマシンで実行されます。
実装
開始するには、 BasicApp入力を印刷するだけです。
新しいトポロジを作成します。
TopologyBuilder builder = new TopologyBuilder();
入力を生成するスパウトCdrSpoutを追加します。
builder.setSpout("CdrReader", new CdrSpout());
2つのストリームでBoltを追加し、出力ストリームがCdrReaderであることを示します。 shuffleGroupingは、CdrReaderからのデータがランダムに選択されたPrintOutBoltに供給されることを意味します。
builder.setBolt("PrintOutBolt", new PrintOutBolt(), 2).shuffleGrouping("CdrReader");
ローカルStormクラスターを構成して開始します。
Config config = new Config(); // config.setDebug(false); LocalCluster cluster = new LocalCluster(); // Storm cluster.submitTopology("T1", config, builder.createTopology()); // Topology Thread.sleep(1000*10); cluster.shutdown(); //
出力はおよそ次のとおりです。
非表示のテキスト
OUT >> [80] Cdr {callSource = '78119990005'、callDestination = '8313610698077174239'、 callTime = 7631、clientId = 0、price = 0} OUT >> [78] Cdr {callSource = '78119990006'、callDestination = '2238707710336895468'、 callTime = 20738、clientId = 0、price = 0} OUT >> [78] Cdr {callSource = '78119990007'、callDestination = '579372726495390920'、 callTime = 31544、clientId = 0、price = 0} OUT >> [80] Cdr {callSource = '78119990006'、callDestination = '2010724447342634423'、 callTime = 10268、clientId = 0、price = 0}
角括弧内の数字はスレッドIDです。処理が並行して実行されていることがわかります。
さらなる実験のために、いくつかのハンドラー間での入力データの分布に対処する必要があります。
上記の例では、ランダムなアプローチが使用されました。 しかし、実際の使用では、Boltはおそらく外部ヘルプシステムとデータベースを使用します。 この場合、各Boltが入力データの独自のサブセットを処理することが望ましいです。 その後、外部システムからのデータの効果的なキャッシュを整理することが可能になります。
これを行うために、StormはCustomStreamGroupingインターフェイスを提供します。
CdrGrouperをプロジェクトに追加します。 そのタスクは、同じソース番号のタプルを同じボルトに送信することです。 これを行うために、CustomStreamGroupingは2つの呼び出しを提供します。
prepare-最初に使用する前に呼び出されます:
@Override public void prepare(WorkerTopologyContext workerTopologyContext, GlobalStreamId globalStreamId, List<Integer> integers) { tasks = new ArrayList<>(integers); // Bolts }
そしてchooseTasks - Tupleからのリストが入力され、 Tupleリスト内の各位置のボルト番号で構成されるリストが返されます:
@Override public List<Integer> chooseTasks(int i, List<Object> objects) { List<Integer> rvalue = new ArrayList<>(objects.size()); for(Object o: objects) { Cdr cdr = (Cdr) o; rvalue.add(tasks.get(Math.abs(cdr.getCallSource().hashCode()) % tasks.size())); } return rvalue; }
shuffleGroupingをCdrGrouper BasicGroupAppに置き換えます。
builder.setBolt("PrintOutBolt", new PrintOutBolt(), 2). customGrouping("CdrReader", new CdrGrouper());
実行して、意図したとおりに機能することを確認します。
非表示のテキスト
OUT >> [80] Cdr {callSource = '78119990007'、callDestination = '3314931472251135073'、 callTime = 17632、clientId = 0、price = 0} OUT >> [80] Cdr {callSource = '78119990007'、callDestination = '4182885669941386786'、 callTime = 31533、clientId = 0、price = 0}
次に、プロジェクトに追加します。
ClientIdBolt-ソース番号によってクライアントIDを識別します。
ClientIdGrouper-クライアントIDでグループ化します。
RaterBolt-請求に従事しています。
CalcAppはプログラムの最終バージョンです。
トピックが興味深い場合は、次のパートで、データ損失と実際のクラスターでの実行に対する保護メカニズムについてお話したいと思います。 コードはGitHubで入手できます 。
PS。 もちろん、曲から言葉を放り出すことはありませんが、データプロセッサの名前「ボルト」はやや紛らわしいです:)
UPD。 記事の2番目の部分が公開されています。