Stormフレームワークの学習。 パートI

2011年、TwitterはEclipse Public Licenseの下で、 Storm Distributed Computingプロジェクトを開始しました。 StormはBackTypeで作成され、購入後にTwitterに切り替えられました。



Stormは、 Apache Hadoopに似ていますが、リアルタイムの大規模なデータストリームの分散処理に焦点を当てたシステムです。



Stormの主な機能:



最初の部分では、Stormバージョン0.8.2を使用してアプリケーションを作成する基本概念と基本について説明します。



嵐の要素



タプル

データ表示要素。 デフォルトでは、Long、Integer、Short、Byte、String、Double、Float、Boolean、byte []フィールドを含めることができます。 Tupleで使用されるカスタム型はシリアル化可能でなければなりません。



ストリーム

タプルのシーケンス。 Tupleのフィールドの命名スキームが含まれています。



注ぎ口

ストリームのデータプロバイダー。 外部ソースからデータを受信し、それらからタプルを形成し、それをストリームに送信します。 Tupleを複数の異なるストリームに送信できます。 RabbitMQ / AMQPKestrelJMSKafkaなどの一般的なメッセージングシステムに対応しています。



ボルト

データハンドラ。 入力はTupleです。 0個以上のTupleを出力に送信します。



トポロジー

関係の説明を持つ要素のコレクション。 HadoopのMapReduceジョブに類似しています。 MapReduceジョブとは異なり、入力データストリームが使い果たされた後も停止しません。 注ぎ口要素とボルト要素の間でタプル輸送を実行します。 ローカルで起動するか、Stormクラスターにロードできます。



使用例



挑戦する



Cdr電話データストリームがあります。 ソース番号に基づいて、クライアントIDが決定されます。 宛先番号と顧客IDに基づいて、料金が決定され、通話コストが考慮されます。 各ステージは複数のスレッドで動作するはずです。

この例はローカルマシンで実行されます。



実装



開始するには、 BasicApp入力を印刷するだけです。



新しいトポロジを作成します。

TopologyBuilder builder = new TopologyBuilder();
      
      





入力を生成するスパウトCdrSpoutを追加します。

 builder.setSpout("CdrReader", new CdrSpout());
      
      





2つのストリームでBoltを追加し、出力ストリームがCdrReaderであることを示します。 shuffleGroupingは、CdrReaderからのデータがランダムに選択されたPrintOutBoltに供給されることを意味します。

 builder.setBolt("PrintOutBolt", new PrintOutBolt(), 2).shuffleGrouping("CdrReader");
      
      





ローカルStormクラスターを構成して開始します。

 Config config = new Config(); //     config.setDebug(false); LocalCluster cluster = new LocalCluster(); //   Storm  cluster.submitTopology("T1", config, builder.createTopology()); //  Topology Thread.sleep(1000*10); cluster.shutdown(); //  
      
      





出力はおよそ次のとおりです。

非表示のテキスト
 OUT >> [80] Cdr {callSource = '78119990005'、callDestination = '8313610698077174239'、 
 callTime = 7631、clientId = 0、price = 0}
 OUT >> [78] Cdr {callSource = '78119990006'、callDestination = '2238707710336895468'、 
 callTime = 20738、clientId = 0、price = 0}
 OUT >> [78] Cdr {callSource = '78119990007'、callDestination = '579372726495390920'、 
 callTime = 31544、clientId = 0、price = 0}
 OUT >> [80] Cdr {callSource = '78119990006'、callDestination = '2010724447342634423'、 
 callTime = 10268、clientId = 0、price = 0}


角括弧内の数字はスレッドIDです。処理が並行して実行されていることがわかります。



さらなる実験のために、いくつかのハンドラー間での入力データの分布に対処する必要があります。

上記の例では、ランダムなアプローチが使用されました。 しかし、実際の使用では、Boltはおそらく外部ヘルプシステムとデータベースを使用します。 この場合、各Boltが入力データの独自のサブセットを処理することが望ましいです。 その後、外部システムからのデータの効果的なキャッシュを整理することが可能になります。



これを行うために、StormはCustomStreamGroupingインターフェイスを提供します。

CdrGrouperをプロジェクトに追加します。 そのタスクは、同じソース番号のタプルを同じボルトに送信することです。 これを行うために、CustomStreamGroupingは2つの呼び出しを提供します。

prepare-最初に使用する前に呼び出されます:

 @Override public void prepare(WorkerTopologyContext workerTopologyContext, GlobalStreamId globalStreamId, List<Integer> integers) { tasks = new ArrayList<>(integers); //   Bolts }
      
      





そしてchooseTasks - Tupleからのリストが入力され、 Tupleリスト内の各位置のボルト番号で構成されるリストが返されます:

 @Override public List<Integer> chooseTasks(int i, List<Object> objects) { List<Integer> rvalue = new ArrayList<>(objects.size()); for(Object o: objects) { Cdr cdr = (Cdr) o; rvalue.add(tasks.get(Math.abs(cdr.getCallSource().hashCode()) % tasks.size())); } return rvalue; }
      
      





shuffleGroupingをCdrGrouper BasicGroupAppに置き換えます。

 builder.setBolt("PrintOutBolt", new PrintOutBolt(), 2). customGrouping("CdrReader", new CdrGrouper());
      
      





実行して、意図したとおりに機能することを確認します。

非表示のテキスト
 OUT >> [80] Cdr {callSource = '78119990007'、callDestination = '3314931472251135073'、 
 callTime = 17632、clientId = 0、price = 0}
 OUT >> [80] Cdr {callSource = '78119990007'、callDestination = '4182885669941386786'、 
 callTime = 31533、clientId = 0、price = 0}




次に、プロジェクトに追加します。

ClientIdBolt-ソース番号によってクライアントIDを識別します。

ClientIdGrouper-クライアントIDでグループ化します。

RaterBolt-請求に従事しています。

CalcAppはプログラムの最終バージョンです。



トピックが興味深い場合は、次のパートで、データ損失と実際のクラスターでの実行に対する保護メカニズムについてお話したいと思います。 コードはGitHubで入手できます



PS。 もちろん、曲から言葉を放り出すことはありませんが、データプロセッサの名前「ボルト」はやや紛らわしいです:)



UPD。 記事の2番目の部分が公開されています。




All Articles