処理がエラーで失敗しました。次に何をしますか? クラスタノードの1つとの通信が失われたり、データベースが一時的に利用できなくなったりする可能性があります。 この場合、どの操作が正常に実行され、どの操作が失敗したかを確実に言うことはできません。 フラグを設定するなど、チェーン内のすべての操作が再適用可能(べき等 )である場合は、単純に処理を再開できます。 そうでない場合は、Stormトランザクションメカニズムが役立ちます。
トランザクションの特性について説明すると、 ACIDという用語がすぐに表示されます。
- トミシティ(原子性)。 トランザクション中にシステムで行われたすべての変更は、完全に適用されるか、まったく適用されません。
- 一貫性。 トランザクションは、システムをある一貫した状態から別の状態に移行します。
- 私は分離(分離)。 同時に実行されるトランザクションは、互いの作業の結果に影響しません。
- 耐久性(信頼性)。 トランザクションの変更はシステムに残ることが保証されています。
一貫性と耐久性はデータベースにより関連しています。 原子性と分離に興味があります。
バージョン0.8.0で、StormはApache Pigに類似したTridentサブシステムを導入しました。 トランザクショントポロジ機能が移行されました。
Stormのトランザクション
トミシティ
トポロジは、データベースとの作業をカプセル化するStateインターフェースを実装するオブジェクトを作成します。 Spoutへの入力はTupleに分割され、バッチパケットで収集されます。 バッチは一意のトランザクションIDに関連付けられています。 タプル形成バッチは並行して処理できます。処理チェーンの最後に、単一のトランザクションに関連するTupleのセットが 、 StateUpdaterインターフェイスを実装するクラスのupdateStateメソッドに渡され、State の変更が生成されます。 正常に完了すると、Spoutはバッチ処理の成功に関する通知を受け取ります。 エラーが発生した場合、Spoutは処理のためにバッチ全体を再度送信する必要があります。
したがって、Stormは、バッチが完全に1回だけデータベースにコミットされるようにします。
私はソレーション
Stormは、バッチがトランザクションIDの昇順で厳密に連続してStateUpdaterに送信されることを保証します。 つまり、バッチ#2は、バッチ#1が正常に修正された後にのみ修正されます。実装
トランザクションスパウトは、 ICommitterTridentSpout <TransactionMetadata>インターフェイスを実装する必要があります。 TransactionMetadata-Batchを生成し、次のトランザクションを生成するためのデータを含むクラス: TxMeta 。
非表示のテキスト
public class TxMeta { private int start; private int count; public TxMeta(int start, int count) { this.start = start; this.count = count; } // Skipped getters }
ITridentSpout.BatchCoordinator <TransactionMetadata>インターフェースを実装するクラスは、トランザクションを作成するときにTransactionMetadataを初期化し、次のトランザクションの準備ができている場合は要求に応答します: TridentTxSpout 。 トポロジごとに1つのコピーで作成されます。
非表示のテキスト
static class BCoordinator implements BatchCoordinator<TxMeta> { private static final int TRANSACTION_COUNT = 5; private static final int TRANSACTION_ELEMENT_COUNT = 5; //TxMeta - @Override public TxMeta initializeTransaction(long l, TxMeta txMeta) { if(txMeta != null) { System.out.println(String.format("Initializing transaction id: %08d, " + "start: %04d, count: %04d", l, txMeta.getStart() + txMeta.getCount(), txMeta.getCount())); return new TxMeta(txMeta.getStart() + txMeta.getCount(), TRANSACTION_ELEMENT_COUNT); } else { return new TxMeta(0, TRANSACTION_ELEMENT_COUNT); } } // @Override public boolean isReady(long l) { if(l <= TRANSACTION_COUNT) { System.out.println("ISREADY " + l); return true; } return false; } }
ICommitterTridentSpout.Emitterインターフェイスを実装するクラスは、バッチを形成します。 バッチ処理でエラーが発生した場合は、バッチを再度作成します。
重要-再構築されたバッチには、元のバッチとまったく同じタプルセットが含まれている必要があります。
非表示のテキスト
static class BEmitter implements Emitter { // Batch TransactionMetadata @Override public void emitBatch(TransactionAttempt transactionAttempt, Object coordinatorMeta, TridentCollector tridentCollector) { TxMeta txMeta = (TxMeta) coordinatorMeta; System.out.println("Emitting transaction id: " + transactionAttempt.getTransactionId() + " attempt:" + transactionAttempt.getAttemptId() ); for(int i = 0; i < txMeta.getCount(); ++i) { tridentCollector.emit(new Values("TRANS [" + transactionAttempt.getAttemptId() + "] [" + (txMeta.getStart() + i) + "]") ); } } // State @Override public void success(TransactionAttempt transactionAttempt) { System.out.println("BEmitter:Transaction success id:" + transactionAttempt.getTransactionId()); } // State @Override public void commit(TransactionAttempt transactionAttempt) { System.out.println("BEmitter:Transaction commit id:" + transactionAttempt.getTransactionId()); } }
この場合、 Stateインターフェースを実装するクラス、データベースドライバー: TxDatabase 。
非表示のテキスト
public class TxDatabase implements State { // @Override public void beginCommit(Long txId) { System.out.println("beginCommit [" + Thread.currentThread().getId() + "] " + txId); } // @Override public void commit(Long txId) { System.out.println("commit [" + Thread.currentThread().getId() + "] " + txId); } }
BaseStateUpdater <S extends State>を継承するクラスは、状態(DB)を変更します: TxDatabaseUpdater
非表示のテキスト
public class TxDatabaseUpdater extends BaseStateUpdater<TxDatabase> { int count; // @Override public void updateState(TxDatabase txDatabase, List<TridentTuple> tridentTuples, TridentCollector tridentCollector) { // if(++count == 2) throw new FailedException("YYYY"); for(TridentTuple t: tridentTuples) { System.out.println("Updating: " + t.getString(0)); } } }
StateFactoryインターフェースを実装するクラスは、State: TxDatabaseFactoryをインスタンス化します。
すべてまとめてTridentTransactionApp :
public class TridentTransactionApp { public static void main( String[] args ) throws Throwable { Logger.getRootLogger().setLevel(Level.ERROR); // TridentTopology tridentTopology = new TridentTopology(); // Spout tridentTopology.newStream("TridentTxSpout", new TridentTxSpout()). // Tuple - OpPrintout shuffle().each(new Fields("msg"), new OpPrintout()). parallelismHint(2). // global(). // State () partitionPersist(new TxDatabaseFactory(), new Fields("msg"), new TxDatabaseUpdater()); // Skipped LocalCluster cluster = new LocalCluster(); cluster.submitTopology("T2", config, tridentTopology.build()); Thread.sleep(1000*100); cluster.shutdown(); } }
Stormのトランザクション機能は、自明でない処理が必要なときに、あるシステムから別のシステムにデータを転送するために使用すると非常に便利です。 たとえば、1つのシステムがファイルを生成し、Stormがそれらをレコードに分割し、それらを並行して処理し、データベースに追加します。 処理エラーの場合、ファイルが削除されず、2回処理されないという保証があります。
PS。 記事の枠組みの中でStormの可能性をすべて明らかにすることは不可能であり、本全体に十分な資料があります。 フレームワークの主要な機能と実際のプロジェクトでのアプリケーションの可能性を示すことができたと思います。
クラスターの展開に関して、最近、すばらしい記事に出会いました。 繰り返す理由はない。 運用環境でStormを展開するのは本当に簡単です。
PPS Hadoopには、Stormのオンライン処理に類似したHadoop Streamingがありますが、Stormとは異なり、トランザクションをサポートしていません。