Stormフレームワークの学習。 パートIII

記事の第2部では、処理中にエラーを検出するメカニズムについて説明しました。



処理がエラーで失敗しました。次に何をしますか? クラスタノードの1つとの通信が失われたり、データベースが一時的に利用できなくなったりする可能性があります。 この場合、どの操作が正常に実行され、どの操作が失敗したかを確実に言うことはできません。 フラグを設定するなど、チェーン内のすべての操作が再適用可能(べき )である場合は、単純に処理を再開できます。 そうでない場合は、Stormトランザクションメカニズムが役立ちます。



トランザクションの特性について説明すると、 ACIDという用語がすぐに表示されます。



一貫性と耐久性はデータベースにより関連しています。 原子性と分離に興味があります。



バージョン0.8.0で、StormはApache Pigに類似したTridentサブシステムを導入しました。 トランザクショントポロジ機能が移行されました。



Stormのトランザクション



トミシティ

トポロジは、データベースとの作業をカプセル化するStateインターフェースを実装するオブジェクトを作成します。 Spoutへの入力はTupleに分割され、バッチパケットで収集されます。 バッチは一意のトランザクションIDに関連付けられています。 タプル形成バッチは並行して処理できます。

処理チェーンの最後に、単一のトランザクションに関連するTupleのセットがStateUpdaterインターフェイスを実装するクラスのupdateStateメソッドに渡され、State 変更が生成されます。 正常に完了すると、Spoutはバッチ処理の成功に関する通知を受け取ります。 エラーが発生した場合、Spoutは処理のためにバッチ全体を再度送信する必要があります。

したがって、Stormは、バッチが完全に1回だけデータベースにコミットされるようにします。



私はソレーション

Stormは、バッチがトランザクションIDの昇順で厳密に連続してStateUpdaterに送信されることを保証します。 つまり、バッチ#2は、バッチ#1が正常に修正された後にのみ修正されます。



実装



トランザクションスパウトは、 ICommitterTridentSpout <TransactionMetadata>インターフェイスを実装する必要があります。 TransactionMetadata-Batchを生成し、次のトランザクションを生成するためのデータを含むクラス: TxMeta

非表示のテキスト
public class TxMeta { private int start; private int count; public TxMeta(int start, int count) { this.start = start; this.count = count; } // Skipped getters }
      
      







ITridentSpout.BatchCoordinator <TransactionMetadata>インターフェースを実装するクラスは、トランザクションを作成するときにTransactionMetadataを初期化し、次のトランザクションの準備ができている場合は要求に応答します: TridentTxSpout 。 トポロジごとに1つのコピーで作成されます。

非表示のテキスト
  static class BCoordinator implements BatchCoordinator<TxMeta> { private static final int TRANSACTION_COUNT = 5; private static final int TRANSACTION_ELEMENT_COUNT = 5; //TxMeta -    @Override public TxMeta initializeTransaction(long l, TxMeta txMeta) { if(txMeta != null) { System.out.println(String.format("Initializing transaction id: %08d, " + "start: %04d, count: %04d", l, txMeta.getStart() + txMeta.getCount(), txMeta.getCount())); return new TxMeta(txMeta.getStart() + txMeta.getCount(), TRANSACTION_ELEMENT_COUNT); } else { return new TxMeta(0, TRANSACTION_ELEMENT_COUNT); } } //       @Override public boolean isReady(long l) { if(l <= TRANSACTION_COUNT) { System.out.println("ISREADY " + l); return true; } return false; } }
      
      







ICommitterTridentSpout.Emitterインターフェイスを実装するクラスは、バッチを形成します。 バッチ処理でエラーが発生した場合は、バッチを再度作成します。

重要-再構築されたバッチには、元のバッチとまったく同じタプルセットが含まれている必要があります。

非表示のテキスト
 static class BEmitter implements Emitter { //  Batch    TransactionMetadata @Override public void emitBatch(TransactionAttempt transactionAttempt, Object coordinatorMeta, TridentCollector tridentCollector) { TxMeta txMeta = (TxMeta) coordinatorMeta; System.out.println("Emitting transaction id: " + transactionAttempt.getTransactionId() + " attempt:" + transactionAttempt.getAttemptId() ); for(int i = 0; i < txMeta.getCount(); ++i) { tridentCollector.emit(new Values("TRANS [" + transactionAttempt.getAttemptId() + "] [" + (txMeta.getStart() + i) + "]") ); } } //     State @Override public void success(TransactionAttempt transactionAttempt) { System.out.println("BEmitter:Transaction success id:" + transactionAttempt.getTransactionId()); } //     State @Override public void commit(TransactionAttempt transactionAttempt) { System.out.println("BEmitter:Transaction commit id:" + transactionAttempt.getTransactionId()); } }
      
      







この場合、 Stateインターフェースを実装するクラス、データベースドライバー: TxDatabase

非表示のテキスト
 public class TxDatabase implements State { //       @Override public void beginCommit(Long txId) { System.out.println("beginCommit [" + Thread.currentThread().getId() + "] " + txId); } //       @Override public void commit(Long txId) { System.out.println("commit [" + Thread.currentThread().getId() + "] " + txId); } }
      
      







BaseStateUpdater <S extends State>を継承するクラスは、状態(DB)を変更します: TxDatabaseUpdater

非表示のテキスト
 public class TxDatabaseUpdater extends BaseStateUpdater<TxDatabase> { int count; //     @Override public void updateState(TxDatabase txDatabase, List<TridentTuple> tridentTuples, TridentCollector tridentCollector) { //    if(++count == 2) throw new FailedException("YYYY"); for(TridentTuple t: tridentTuples) { System.out.println("Updating: " + t.getString(0)); } } }
      
      







StateFactoryインターフェースを実装するクラスは、State: TxDatabaseFactoryをインスタンス化します。



すべてまとめてTridentTransactionApp

 public class TridentTransactionApp { public static void main( String[] args ) throws Throwable { Logger.getRootLogger().setLevel(Level.ERROR); //   TridentTopology tridentTopology = new TridentTopology(); //   Spout tridentTopology.newStream("TridentTxSpout", new TridentTxSpout()). //  Tuple   - OpPrintout    shuffle().each(new Fields("msg"), new OpPrintout()). parallelismHint(2). //        global(). //    State () partitionPersist(new TxDatabaseFactory(), new Fields("msg"), new TxDatabaseUpdater()); // Skipped LocalCluster cluster = new LocalCluster(); cluster.submitTopology("T2", config, tridentTopology.build()); Thread.sleep(1000*100); cluster.shutdown(); } }
      
      





Stormのトランザクション機能は、自明でない処理が必要なときに、あるシステムから別のシステムにデータを転送するために使用すると非常に便利です。 たとえば、1つのシステムがファイルを生成し、Stormがそれらをレコードに分割し、それらを並行して処理し、データベースに追加します。 処理エラーの場合、ファイルが削除されず、2回処理されないという保証があります。



PS。 記事の枠組みの中でStormの可能性をすべて明らかにすることは不可能であり、本全体に十分な資料があります。 フレームワークの主要な機能と実際のプロジェクトでのアプリケーションの可能性を示すことができたと思います。

クラスターの展開に関して、最近、すばらしい記事に出会いました。 繰り返す理由はない。 運用環境でStormを展開するのは本当に簡単です。



PPS Hadoopには、Stormのオンライン処理に類似したHadoop Streamingがありますが、Stormとは異なり、トランザクションをサポートしていません。




All Articles