タスクのクラスが異なると、信頼性の要件も異なります。 訪問の統計を計算する際にいくつかのレコードを見逃すことは1つです。アカウントが数十万になり、特別な正確さは必要ありません。 そしてもう一つ-例えば、クライアントの支払いに関する情報を失うこと。
次に、Stormに実装されたデータ損失保護メカニズムを見ていきます。
基本的な例
注ぎ口
Tupleの処理中にエラーが発生したかどうかが重要でない場合、Spoutは、 emit(新しい値(...))メソッドを呼び出してTupleをSpoutOutputCollectorに送信します。Tupleが正常に処理されたかどうかを確認する場合、呼び出しはemit(new Values(...)、msgId)のようになります。msgIdは任意のクラスのオブジェクトです。 この場合、 ISpoutインターフェイスは次のメソッドを提供します。
- ack(Object msgId) -タプルが処理されると呼び出されます
- fail(オブジェクトmsgId) -タプルが処理されない場合に呼び出されます
FailAwareSpoutの例:
public class FailAwareSpout extends BaseRichSpout { private Message[] messages; // Skipped ... private static class Message implements Serializable { private String message; private int failCount; private Message(String message) { this.message = message; } } // Skipped ... @Override public void nextTuple() { // Skipped ... // Tuple c msgId outputCollector.emit(new Values(messages[messageId].message), messageId); } // Tuple @Override public void ack(Object msgId) { Message m = messages[(Integer) msgId]; System.out.println("IN>> [" + Thread.currentThread().getId() + "] message " + m.message + " processed successfully"); } // Tuple @Override public void fail(Object msgId) { Message m = messages[(Integer) msgId]; if(++m.failCount > MAX_RETRY_COUNT) { throw new IllegalStateException("Too many message processing errors"); } System.out.println("IN>> [" + Thread.currentThread().getId() + "] message " + m.message + " processing failed " + "[" + m.failCount + "]"); // sendQueue.addLast((Integer) msgId); } }
nextTuple、ack、およびfailメソッドは同じスレッドで呼び出され、Spoutフィールドにアクセスするときに追加の同期を必要としません。
ボルト
Boltが処理結果についてStormに通知するには、 IRichBoltインターフェイスを実装する必要があります。 これを行う最も簡単な方法は、 BaseRichBoltクラスを継承することです 。Boltは、 実行(タプル)メソッドでOutputCollectorクラスの以下のメソッドを呼び出すことにより、Stormに作業の結果を通知します。
FailingBoltの例:
public class FailingBolt extends BaseRichBolt { OutputCollector outputCollector; // Skipped ... @Override public void execute(Tuple tuple) { // Skipped ... outputCollector.ack(tuple); // } else { // Skipped ... outputCollector.fail(tuple); // } } // Skipped ... }
使用例: BasicFailApp 、 Spout FailAwareSpoutおよびBolt FailingBoltは、処理エラーをランダムに生成します。
BaseBasicBoltクラスから継承されたBoltsでは、 executeメソッドの終了後にack( Tuple )が自動的に呼び出されます。
固定
入力タプルを処理するとき、Boltは複数の出力タプルを生成できます。 Boltがemit(sourceTuple、resultTuple)を呼び出した場合、 DAGは元のタプルの形の頂点と生成されたタプルの形の子孫で形成されます。 Stormは、グラフ内のすべてのノードの処理エラーを追跡します。 階層内のいずれかのレベルでエラーが発生した場合、元のタプルを生成したSpoutは失敗呼び出しによって通知されます。 MultiplierBoltの例:
public class MultiplierBolt extends BaseRichBolt { // Skipped ... @Override public void execute(Tuple tuple) { // Tuple for(int i = 0; i < MULTI_COUNT; ++i) { // Anchoring, Tuple outputCollector.emit(tuple, new Values(tuple.getString(0) + " - " + i)); } outputCollector.ack(tuple); } // Skipped ... }
アンカーの例: TreeFailApp
BaseBasicBoltクラスから継承されたBolts では 、 executeメソッド(Tuple、BasicOutputCollector)がBasicOutputCollectorコレクターで呼び出されます。 BasicOutputCollectorの機能は、発行時に入力タプルにアンカーを自動的に作成することです。
Stormは分散システムであるため、Tupleは1つのクラスターノードから別のクラスターノードに転送できます。 この点で、Stormは処理タイムアウトの追跡を提供します。 デフォルトでは、グラフ全体を30秒で処理する必要があります。そうしないと、Stormはグラフを生成したスパウトでfailメソッドを呼び出します。 タイムアウトは変更できます。
コードはGitHubで入手できます 。
次のパートでは、トランザクションデータソースと組み合わせて使用されるトランザクショントポロジに焦点を当てます。
UPD。 記事の最後の部分が公開されています。