ØMQおよびJZMQを使用してJavaにファイルを転送する

あいさつ、私たちは志を同じくする人々の小さな会社であり、フォーマットや保存方法に関係なくデータを管理するために設計された製品を開発しています-ArkStore 、私たちのブログでは、ほぼ2年間の開発の過程で得た経験を共有しようとします。 IO層とØMQ(またはZeroMQ)と呼ばれる製品を含む最初の記事を強調することにしました。 ØMQの使用を開始する方法と、それを使用してかなり大量のデータを転送する方法を説明しようとします。



ØMQを選ぶ理由


製品の開発中に、「大量のデータを確実に転送する方法」という課題に直面しました。IO層全体が完全に非同期であり、大量のメモリを消費せず、非常に単純であることが望ましいのです。 当初、すべてのアーキテクチャはAkkaを使用して構築されていたため、Spray IO(またはAkka IO)を使用しました。 しかし、適切な解決策がなかった多くの問題が発生しました。たとえば、発見したバグにより、追加のハートビートメッセージの作成や大量のサービス情報の送信が必要になりました。



最後に、メッセージブローカーに注目することにしました。 ActiveMQRabbitMQおよびØMQ 原則として、すべてのブローカーが私たちのために設定されたタスクを解決しましたが、ØMQに決めました。 ActiveMQは重すぎたようで、RabbitMQは最初の分散アーキテクチャにマスターノードを導入しました(明示的なリーダーなし)。



ØMQは、次の3つの基本的なデータ転送パターンをサポートしています。



これらのテンプレートとØMQの詳細については、公式Webサイトにあるすばらしいガイドをご覧ください。



この記事では、最初のテンプレート(Request-Reply)のみを検討します。



ØMQおよびJZMQを収集します


コードに入る前に、ØMQ自体とJavaバインディングを構築する必要があります。



Linux


CentOSの場合、プロセスは次のようになります(他の* nix OSの場合は若干異なるはずです)。



必要なライブラリがすべて揃っていることを確認してください。

yum install libtool autoconf automake gcc-c++ make e2fsprogs
      
      





開発者のサイトからØMQライブラリの最新の安定バージョンを取得して解凍します (3.2.4執筆時点)。

 wget http://download.zeromq.org/zeromq-3.2.4.tar.gz tar -xzvf zeromq-3.2.4.tar.gz cd zeromq-3.2.4
      
      





ØMQを収集してインストールします。ライブラリは/ usr / local / libディレクトリにありますが、今後必要になります。

 ./configure make sudo make install sudo ldconfig
      
      





ØMQを組み立てた後、JZMQを組み立てる必要があります。 これを行うために、GITリポジトリーから最新バージョンを取り出します(マスターまたはタグ、執筆時の最後のタグは2.2.2です)。

 wget https://github.com/zeromq/jzmq/archive/v2.2.2.zip unzip jzmq-2.2.2.zip cd jzmq-2.2.2 ./autogen.sh ./configure make sudo make install sudo ldconfig
      
      





ライブラリも/ usr / lib / localディレクトリにあります。 なぜこれが重要なのですか? 事実、ネイティブライブラリを使用するには、Javaがそれらを見つける場所を知っている必要があるため、プログラムを起動するときにjava.library.pathパラメータを指定する必要があります。 これを行う方法はいくつかあります。アプリケーションの起動時に指定することができます-Djava.library.path = "/ usr / lib / local" 、またはプログラムの実行中に直接インストールします。 デフォルトで設定されるjava.library.path値を使用することもできます。 デフォルトで設定されている値を確認するには、次のコマンドを実行する必要があります。



 java -XshowSettings:properties
      
      





私の場合、これは次のとおりです。

 java.library.path = /usr/java/packages/lib/amd64 /usr/lib64 /lib64 /lib /usr/lib
      
      





Javaがネイティブライブラリを見つけることができるようにするには、ライブラリをこれらのアドレスのいずれかに転送するか、リンクするだけで十分です。 どのアプローチを選択するかは、個人的に選択します。



make install後にライブラリがインストールされた場所を確認するには、次のコマンドを実行します。

 whereis libzmq whereis libjzmq
      
      







libzmq用に収集されたdllファイルは、公式サイトからダウンロードできます。 ここでは、Windows用のJZMQのコンパイルに関するガイドを見つけることができます。 残念ながら、 CMAKEを使用してライブラリを構築することはできませんでした。VisualStudio 2013を使用してlibzmqおよびjzmqを構築する必要がありました。同時に、ライブラリ自体がJVM(32または64ビット)に適したアーキテクチャ用に構築されることが重要です、



libzmq.dllとjzmq.dllがPATHに追加されている場合、JVMはそれらを自動的に見つける必要があります。



プログラム


ようやく、コンピューターにØMQとJZMQをインストールして構成することができました! それを実行する時が来ました。 例として、 マニュアルに記載されているファイル転送プロトコルを使用してみて、少し改善します。



まず、プロトコルの要件を説明します。



1ギガバイトの「保証されたランダム」データを準備します。

 dd if=/dev/urandom of=testdata bs=1M count=1024
      
      





OSがデータをコピーするのにかかる時間を測定しましょう。 これらの数字はかなり近似していますが、少なくとも何らかの比較ポイントがあります。

 echo 3 > /proc/sys/vm/drop_caches time cp testdata testdata2 real 0m7.745s user 0m0.011s sys 0m1.223s
      
      





コードに取りかかりましょう。 顧客:

 package com.coldsnipe.example; import org.zeromq.ZFrame; import org.zeromq.ZMQ; public class ZMQClient implements Runnable { //      "" private static final int PIPELINE = 10; //  1  , 250. private static final int CHUNK_SIZE = 250000; private final ZMQ.Context zmqContext; public ZMQClient(ZMQ.Context zmqContext) { this.zmqContext = zmqContext; } @Override public void run() { //  ZMQ.Socket,     DEALER socket try (ZMQ.Socket socket = zmqContext.socket(ZMQ.DEALER)) { //            //  ,   1      (drop)   socket.setLinger(1000); //     TCP  socket.connect("tcp://127.0.0.1:6000"); //  ,  Chunks     int credit = PIPELINE; //    long total = 0; //    (Chunks) int chunks = 0; //      long offset = 0; while (!Thread.currentThread().isInterrupted()) { //     "",     //   while (credit > 0) { socket.sendMore("fetch"); socket.sendMore(Long.toString(offset)); socket.send(Integer.toString(CHUNK_SIZE)); offset += CHUNK_SIZE; credit--; } //       ZFrame zFrame = ZFrame.recvFrame(socket); //   ,   if (zFrame == null) { break; } chunks++; credit++; int size = zFrame.getData().length; total += size; zFrame.destroy(); //         //       if (size < CHUNK_SIZE) { break; } } System.out.format("%d chunks received, %d bytes %n", chunks, total); } } }
      
      





クライアントはサーバーに接続し、データの要求を送信します。 各メッセージはいくつかの部分で構成されています。

  1. コマンドはサーバーに必要なものです。この場合、コマンドは1つだけで、 フェッチはデータを取得することです。
  2. コマンドパラメータ(存在する場合)- フェッチの場合これはファイルの先頭からのインデントとデータのサイズです。


クライアントはこれらのコマンドを「クレジット」で送信します。つまり、クライアントは「クレジット」が残っているのと同じ数のフェッチチームを送信します。 クレジットは、クライアントが受信したデータを正常に処理した場合にのみ増加します。 この例では、クライアントはデータに対して何も行いませんが、ハンドラーを追加して、データを保存したり、スリープを使用して作業をシミュレートしたりできます。 したがって、クライアントはサーバーからの新しいデータを待機するアイドル状態になりません。



 package com.coldsnipe.example; import org.zeromq.ZFrame; import org.zeromq.ZMQ; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Path; public class ZMQServer implements Runnable { private final ZMQ.Context zmqContext; //         private final Path filePath; public ZMQServer(ZMQ.Context zmqContext, Path filePath) { this.zmqContext = zmqContext; this.filePath = filePath; } @Override public void run() { try { File file = filePath.toFile(); if (!file.exists()) { throw new RuntimeException("File does not exists: " + filePath); } //    , Router   identity , //         try (FileChannel fileChannel = FileChannel.open(filePath)) { try (ZMQ.Socket socket = zmqContext.socket(ZMQ.ROUTER)) { //      localhost  6000  socket.bind("tcp://*:6000"); socket.setLinger(1000); while (!Thread.currentThread().isInterrupted()) { //   -  identity frame ZFrame identity = ZFrame.recvFrame(socket); assert identity != null; //    String command = socket.recvStr(); if (command.equals("fetch")) { //     fetch,   offset     String offsetString = socket.recvStr(); long offset = Long.parseLong(offsetString); String chunkSizeString = socket.recvStr(); int chunkSize = Integer.parseInt(chunkSizeString); int currentChunkSize = chunkSize; //   offset +       //     ,      offset    if (file.length() < (offset + chunkSize)) { currentChunkSize = (int) (file.length() - offset); } if (currentChunkSize > 0) { ByteBuffer byteBuffer = ByteBuffer.allocate(currentChunkSize); fileChannel.read(byteBuffer, offset); byteBuffer.flip(); byte[] bytes = new byte[currentChunkSize]; byteBuffer.get(bytes); //     ZFrame frameToSend = new ZFrame(bytes); //        identity // ,     identity.send(socket, ZFrame.MORE); frameToSend.send(socket, 0); } } } } } } catch (IOException e) { throw new RuntimeException(e); } } }
      
      





サーバーは、1つのファイル(起動時に受信したリンク)のみを転送し、1つのコマンド-fetchのみに応答できます。 彼はクライアントを区別する方法を知っていますが、クライアントは1つのファイルしか取得できません。 これをどのように改善するかはもう少し書きますが、ここではテストと測定結果について説明します。



 package com.coldsnipe.example; import org.junit.Test; import org.zeromq.ZMQ; import java.net.URL; import java.nio.file.Path; import java.nio.file.Paths; public class ZMQExampleTest { @Test public void testDataExchange() throws Exception { //  ZMQ.Context,    1!      //          ZMQ.Context zmqContext = ZMQ.context(1); //     final URL fileUrl = ZMQExampleTest.class.getClassLoader().getResource("testdata"); assert fileUrl != null; Path filePath = Paths.get(fileUrl.toURI()); //        ,     //       long startTime = System.nanoTime(); Thread clientThread = new Thread(new ZMQClient(zmqContext)); Thread serverThread = new Thread(new ZMQServer(zmqContext, filePath)); clientThread.start(); serverThread.start(); clientThread.join(); long estimatedTime = System.nanoTime() - startTime; float timeForTest = estimatedTime / 1000000000F; System.out.format("Elapsed time: %fs%n", timeForTest); //    ,     //          . zmqContext.term(); } }
      
      





このテストでは、ZMQ.Contextを起動し、クライアントとサーバーを起動し、データの転送にかかる時間を測定します。 コンテキストについては別に言いたい。 プロセス内でソケットを制御し、データを送信する方法とタイミングを決定するのは、隠されたコンダクターであるコンテキストです。 したがって、ここから単純なルールが続きます- プロセスごとに1つのコンテキスト



テストを実行し、結果を確認します。

 4295 chunks received, 1073741824 bytes Elapsed time: 1.429522s
      
      





1ギガバイトの読み取りには1.42秒かかりました。 このインジケーターがどれほど優れているかを言うのは難しいですが、同じスプレーIOと比較して、ØMQは30〜40%速くなり、IOの負荷は100%に近く(スプレー85-90)、CPU負荷はほぼ3分の1になります。



プロトコルの改善


プロトコルはコマンドを1つしか認識していませんが、テストには十分ですが、実際の状況では、サーバーが多くの異なるファイルを転送し、クライアントにサービス情報を提供できるようにする必要があります。 これを行うために、2つの新しいコマンドを導入します。



この場合のメッセージハンドラは次のようになります。

 else if(command.equals("get")) { // ID  String id = socket.recvStr(); //       BigInteger identityString = new BigInteger(identity.getData()); DataBlob dataBlob = blobMap.get(identityString); if (dataBlob!= null){ //      ,   DataBlob dataBlob.closeSource(); blobMap.remove(identityString); } //     DataBlob dataBlob = dataProvider.getBlob(id); if (dataBlob == null) { log.error("Received wrong get call on server socket, ID [{}]", id); } else { //   ,        DataHeader dataHeader = new DataHeader(id, dataBlob.getSize()); byte[] bytesToSend = FrameHeaderEncoder.encode(dataHeader); ZFrame frameToSend = new ZFrame(bytesToSend); //   DataBlob   blobMap.put(new BigInteger(identity.getData()), dataBlob); //    identity.send(socket, ZFrame.MORE); frameToSend.send(socket, 0); } } else if (command.equals("end")){ BigInteger identityString = new BigInteger(identity.getData()); //  DataBlob    DataBlob dataBlob = blobMap.remove(identityString); if (dataBlob != null) { dataBlob.closeSource(); } }
      
      





多くのファイルを操作するために、サーバーに新しいオブジェクトを追加しました。

 public interface DataProvider { public DataBlob getBlob(String dataId); } public abstract class DataBlob { private final long size; private final String dataId; protected DataBlob(long size, String dataId) { this.size = size; this.dataId= dataId; } public abstract byte[] getData(long position, int length); public abstract void closeSource(); public long getSize() { return size; } public String getDataId() { return getDataId; } }
      
      





DataProviderを実装するクラスはデータの受信を管理し、getBlobメソッドは、基本的にリソースへの参照である新しいDataBlobを返します。



ファイルのDataBlob実装は次のようになります。

 public class FileDataBlob extends DataBlob { private final FileChannel fileChannel; public FileDataBlob(long size, String dataId, Path filePath) { super(size, dataId); try { this.fileChannel = FileChannel.open(filePath); } catch (IOException e) { throw new DataBlobException(e); } } @Override public byte[] getData(long position, int length) { ByteBuffer byteBuffer = ByteBuffer.allocate(length); try { fileChannel.read(byteBuffer, position); byteBuffer.flip(); byte[] bytes = new byte[length]; byteBuffer.get(bytes); return bytes; } catch (IOException e) { throw new DataBlobException(e); } } @Override public void closeSource() { try { fileChannel.close(); } catch (IOException e) { throw new DataBlobException(e); } } }
      
      





これらの2つのメソッドを追加することにより、クライアントが受信するデータの種類を選択でき、要求されたデータのサイズをクライアントに通知します。 プロトコルをさらに改善できます。たとえば、複数のファイルを連続して送信する機能を追加したり、Heartbeatを追加して、死んだクライアントや空きリソースを特定したりできます。



おわりに


この記事では、JavaでØMQを使用する方法を示したいと思いました。 現時点では、プロジェクトでは、ファイルだけでなくメタデータのメインメッセージブローカーであるØMQがかなり良い結果を示しています(まだ問題はありません)。



以下の記事では、 AkkaとSemantic Webが順番に使用されている間にArkStoreで使用される他のテクノロジーについてお話します。 ご清聴ありがとうございました。少なくともこの読み物が誰かにとって有益であることを願っています!



All Articles