マルチスレッドファイル圧縮にTPL Dataflowを使用する

小さな例を使用して、TPL Dataflowライブラリを使用して、15分でマルチスレッドファイル圧縮のかなり重要なタスクを解決する方法を説明します。



挑戦する



System.IO.Compression



名前空間にあるGZipStream



クラスを使用して、効果的なファイル圧縮を実装する必要があります。 大きなファイルを圧縮することを想定しており、RAMに完全には収まりません。



Tplデータフロー



TPL Dataflow(TDF)は 、.NET 4に含まれるTPL(The Task Parallel Library)ライブラリの上に構築され、ソースライブラリよりも複雑な問題を解決するための一連のプリミティブで補完します。 TPL Dataflowは、タスク、スレッドセーフコレクション、および.NET 4で導入されたその他の機能を使用して、データストリームの並列処理のサポートを追加します。 ライブラリの本質は、さまざまなブロックを接続してさまざまなデータ処理チェーンを編成することです。 さらに、データ処理は同期的および非同期的に発生する可能性があります。 ライブラリは、今後の.NET 4.5に含まれます。



解決策



この問題を解決するには、必要なブロックは3つだけです。

  1. データソースから読み取られたデータのバッファ:

     var buffer = new BufferBlock<byte[]>();
          
          





  2. データ圧縮ユニット:

     var compressor = new TransformBlock<byte[], byte[]>(bytes => Compress(bytes));
          
          





    圧縮機能:

     private static byte[] Compress(byte[] bytes) { using (var resultStream = new MemoryStream()) { using (var zipStream = new GZipStream(resultStream, CompressionMode.Compress)) { using (var writer = new BinaryWriter(zipStream)) { writer.Write(bytes); return resultStream.ToArray(); } } } }
          
          





  3. 圧縮データ記録ユニット:

     var writer = new ActionBlock<byte[]>(bytes => outputStream.Write(bytes, 0, bytes.Length));
          
          







ブロックを接続します:

 buffer.LinkTo(compressor); compressor.LinkTo(writer);
      
      





また、ブロックのデータが終了し、作業を完了することができる場合、ブロックに通知します。 これを行うには、ブロックのComplete



メソッドを呼び出します。

 buffer.Completion.ContinueWith(task => compressor.Complete()); compressor.Completion.ContinueWith(task => writer.Complete());
      
      





ファイルを読み取ると、バッファにデータを提供します。 これは、ブロックのPost



メソッドを呼び出すことで実行されます。

 while (!buffer.Post(bytes)) { }
      
      





ブロックがいっぱいになり、データを受信しなくなったときの状況を考慮するために、このような構造が必要です。



読み取りが完了すると、データがなくなったことをブロックに通知します。

 buffer.Complete();
      
      





ここで、圧縮データをストリームに書き込む責任があるブロックの終わりまで待つ必要があります。

 writer.Completion.Wait();
      
      







結果のメソッド:

 public static void Compress(Stream inputStream, Stream outputStream) { var buffer = new BufferBlock<byte[]>(); var compressor = new TransformBlock<byte[], byte[]>(bytes => Compress(bytes)); var writer = new ActionBlock<byte[]>(bytes => outputStream.Write(bytes, 0, bytes.Length)); buffer.LinkTo(compressor); buffer.Completion.ContinueWith(task => compressor.Complete()); compressor.LinkTo(writer); compressor.Completion.ContinueWith(task => writer.Complete()); var readBuffer = new byte[BufferSize]; while (true) { int readCount = inputStream.Read(readBuffer, 0, BufferSize); if (readCount > 0) { var bytes = new byte[readCount]; Buffer.BlockCopy(readBuffer, 0, bytes, 0, readCount); while (!buffer.Post(bytes)) { } } if (readCount != BufferSize) { buffer.Complete(); break; } } writer.Completion.Wait(); }
      
      





1つの「ではない」場合でも、これで終わる可能性があります。このコードは、完全に同期したコードと速度に違いはありません。 動作を高速化するには、圧縮操作を非同期で行う必要があることを示す必要があります。 これを行うには、必要な設定をブロックに追加します。

 var compressorOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }; var compressor = new TransformBlock<byte[], byte[]>(bytes => Compress(bytes), compressorOptions);
      
      





また、データが圧縮よりも速く読み取られるか、圧縮よりも遅く書き込まれる状況を予測する必要があります。 これを行うには、ブロックのBoundedCapacity



プロパティを変更します。

 var buffer = new BufferBlock<byte[]>(new DataflowBlockOptions { BoundedCapacity = 100 }); var compressorOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, BoundedCapacity = 100 }; var compressor = new TransformBlock<byte[], byte[]>(bytes => Compress(bytes), compressorOptions); var writerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 100, SingleProducerConstrained = true }; var writer = new ActionBlock<byte[]>(bytes => outputStream.Write(bytes, 0, bytes.Length), writerOptions);
      
      





最終的な方法は次のようになります。

 public static void Compress(Stream inputStream, Stream outputStream) { var buffer = new BufferBlock<byte[]>(new DataflowBlockOptions {BoundedCapacity = 100}); var compressorOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, BoundedCapacity = 100 }; var compressor = new TransformBlock<byte[], byte[]>(bytes => Compress(bytes), compressorOptions); var writerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 100, SingleProducerConstrained = true }; var writer = new ActionBlock<byte[]>(bytes => outputStream.Write(bytes, 0, bytes.Length), writerOptions); buffer.LinkTo(compressor); buffer.Completion.ContinueWith(task => compressor.Complete()); compressor.LinkTo(writer); compressor.Completion.ContinueWith(task => writer.Complete()); var readBuffer = new byte[BufferSize]; while (true) { int readCount = inputStream.Read(readBuffer, 0, BufferSize); if (readCount > 0) { var postData = new byte[readCount]; Buffer.BlockCopy(readBuffer, 0, postData, 0, readCount); while (!buffer.Post(postData)) { } } if (readCount != BufferSize) { buffer.Complete(); break; } } writer.Completion.Wait(); }
      
      





たとえば、このようなコンソールアプリケーションから呼び出すことができます。

 private const int BufferSize = 16384; static void Main(string[] args) { var stopwatch = Stopwatch.StartNew(); using (var inputStream = File.OpenRead(@"C:\file.bak")) { using (var outputStream = File.Create(@"E:\file.gz")) { Compress(inputStream, outputStream); } } stopwatch.Stop(); Console.WriteLine(); Console.WriteLine(string.Format("Time elapsed: {0}s", stopwatch.Elapsed.TotalSeconds)); Console.ReadKey(); }
      
      







おわりに



ご覧のとおり、TPL Dataflowを使用すると、マルチスレッドデータ処理タスクを大幅に簡素化できます。 私のテストでは、圧縮に必要な時間はほぼ3倍短縮されました。

このライブラリをダウンロードして、 公式ページで読むことができます



All Articles