ØMQを選ぶ理由
製品の開発中に、「大量のデータを確実に転送する方法」という課題に直面しました。IO層全体が完全に非同期であり、大量のメモリを消費せず、非常に単純であることが望ましいのです。 当初、すべてのアーキテクチャはAkkaを使用して構築されていたため、Spray IO(またはAkka IO)を使用しました。 しかし、適切な解決策がなかった多くの問題が発生しました。たとえば、発見したバグにより、追加のハートビートメッセージの作成や大量のサービス情報の送信が必要になりました。
最後に、メッセージブローカーに注目することにしました。 ActiveMQ 、 RabbitMQおよびØMQ 原則として、すべてのブローカーが私たちのために設定されたタスクを解決しましたが、ØMQに決めました。 ActiveMQは重すぎたようで、RabbitMQは最初の分散アーキテクチャにマスターノードを導入しました(明示的なリーダーなし)。
ØMQは、次の3つの基本的なデータ転送パターンをサポートしています。
- Request-Replyは最も単純な共通テンプレートであり、サーバーにリクエストを送信し、レスポンスを受信しました。 古典的なクライアントサーバーモデル。
- パブリッシュ/サブスクライブ-この場合、サーバー(パブリッシャー)は定期的にサブスクライバーの情報を公開します。 例は、関心のあるすべての顧客の測定データを常に公開するセンサーです。
- Push-Pull-またはParallel Pipelineを使用すると、並列コンピューティングを実行できます。 サーバーは、作業を含むメッセージ(プッシュ)を計算を実行できるマシン(ワーカー)に均等に送信し、マシンはこれらのメッセージを取得(プル)し、計算を実行し、計算の結果に関心があるクライアント(プッシュ)に結果を与えます(プル)。 写真
これらのテンプレートとØMQの詳細については、公式Webサイトにあるすばらしいガイドをご覧ください。
この記事では、最初のテンプレート(Request-Reply)のみを検討します。
ØMQおよびJZMQを収集します
コードに入る前に、ØMQ自体とJavaバインディングを構築する必要があります。
Linux
CentOSの場合、プロセスは次のようになります(他の* nix OSの場合は若干異なるはずです)。
必要なライブラリがすべて揃っていることを確認してください。
yum install libtool autoconf automake gcc-c++ make e2fsprogs
開発者のサイトからØMQライブラリの最新の安定バージョンを取得して解凍します (3.2.4執筆時点)。
wget http://download.zeromq.org/zeromq-3.2.4.tar.gz tar -xzvf zeromq-3.2.4.tar.gz cd zeromq-3.2.4
ØMQを収集してインストールします。ライブラリは/ usr / local / libディレクトリにありますが、今後必要になります。
./configure make sudo make install sudo ldconfig
ØMQを組み立てた後、JZMQを組み立てる必要があります。 これを行うために、GITリポジトリーから最新バージョンを取り出します(マスターまたはタグ、執筆時の最後のタグは2.2.2です)。
wget https://github.com/zeromq/jzmq/archive/v2.2.2.zip unzip jzmq-2.2.2.zip cd jzmq-2.2.2 ./autogen.sh ./configure make sudo make install sudo ldconfig
ライブラリも/ usr / lib / localディレクトリにあります。 なぜこれが重要なのですか? 事実、ネイティブライブラリを使用するには、Javaがそれらを見つける場所を知っている必要があるため、プログラムを起動するときにjava.library.pathパラメータを指定する必要があります。 これを行う方法はいくつかあります。アプリケーションの起動時に指定することができます-Djava.library.path = "/ usr / lib / local" 、またはプログラムの実行中に直接インストールします。 デフォルトで設定されるjava.library.path値を使用することもできます。 デフォルトで設定されている値を確認するには、次のコマンドを実行する必要があります。
java -XshowSettings:properties
私の場合、これは次のとおりです。
java.library.path = /usr/java/packages/lib/amd64 /usr/lib64 /lib64 /lib /usr/lib
Javaがネイティブライブラリを見つけることができるようにするには、ライブラリをこれらのアドレスのいずれかに転送するか、リンクするだけで十分です。 どのアプローチを選択するかは、個人的に選択します。
make install後にライブラリがインストールされた場所を確認するには、次のコマンドを実行します。
whereis libzmq whereis libjzmq
窓
libzmq用に収集されたdllファイルは、公式サイトからダウンロードできます。 ここでは、Windows用のJZMQのコンパイルに関するガイドを見つけることができます。 残念ながら、 CMAKEを使用してライブラリを構築することはできませんでした。VisualStudio 2013を使用してlibzmqおよびjzmqを構築する必要がありました。同時に、ライブラリ自体がJVM(32または64ビット)に適したアーキテクチャ用に構築されることが重要です、
libzmq.dllとjzmq.dllがPATHに追加されている場合、JVMはそれらを自動的に見つける必要があります。
プログラム
ようやく、コンピューターにØMQとJZMQをインストールして構成することができました! それを実行する時が来ました。 例として、 マニュアルに記載されているファイル転送プロトコルを使用してみて、少し改善します。
まず、プロトコルの要件を説明します。
- プロトコルは、データを受信することについてクライアントからの応答を毎回待つことなく、非同期でデータを送信する必要があります。
- サーバーは、多数のクライアントと連携できる必要があります(クライアントを識別できる)
- 同時に、プロトコルは、メモリに大量のデータを保持することなく、サーバーのメモリを慎重に処理する必要があります。これは、クライアントとサーバーの両方にとって重要です。
- プロトコルは、データ送信のキャンセルをサポートする必要があります。
- クライアントが受け入れるデータ(チャンク)のサイズを選択できるようにするため。
- データ転送の再開をサポートするため(たとえば、エラーが発生し、ファイル全体を再度転送せずに特定の場所からデータ転送を再開したい場合)。
1ギガバイトの「保証されたランダム」データを準備します。
dd if=/dev/urandom of=testdata bs=1M count=1024
OSがデータをコピーするのにかかる時間を測定しましょう。 これらの数字はかなり近似していますが、少なくとも何らかの比較ポイントがあります。
echo 3 > /proc/sys/vm/drop_caches time cp testdata testdata2 real 0m7.745s user 0m0.011s sys 0m1.223s
コードに取りかかりましょう。 顧客:
package com.coldsnipe.example; import org.zeromq.ZFrame; import org.zeromq.ZMQ; public class ZMQClient implements Runnable { // "" private static final int PIPELINE = 10; // 1 , 250. private static final int CHUNK_SIZE = 250000; private final ZMQ.Context zmqContext; public ZMQClient(ZMQ.Context zmqContext) { this.zmqContext = zmqContext; } @Override public void run() { // ZMQ.Socket, DEALER socket try (ZMQ.Socket socket = zmqContext.socket(ZMQ.DEALER)) { // // , 1 (drop) socket.setLinger(1000); // TCP socket.connect("tcp://127.0.0.1:6000"); // , Chunks int credit = PIPELINE; // long total = 0; // (Chunks) int chunks = 0; // long offset = 0; while (!Thread.currentThread().isInterrupted()) { // "", // while (credit > 0) { socket.sendMore("fetch"); socket.sendMore(Long.toString(offset)); socket.send(Integer.toString(CHUNK_SIZE)); offset += CHUNK_SIZE; credit--; } // ZFrame zFrame = ZFrame.recvFrame(socket); // , if (zFrame == null) { break; } chunks++; credit++; int size = zFrame.getData().length; total += size; zFrame.destroy(); // // if (size < CHUNK_SIZE) { break; } } System.out.format("%d chunks received, %d bytes %n", chunks, total); } } }
クライアントはサーバーに接続し、データの要求を送信します。 各メッセージはいくつかの部分で構成されています。
- コマンドはサーバーに必要なものです。この場合、コマンドは1つだけで、 フェッチはデータを取得することです。
- コマンドパラメータ(存在する場合)- フェッチの場合、これはファイルの先頭からのインデントとデータのサイズです。
クライアントはこれらのコマンドを「クレジット」で送信します。つまり、クライアントは「クレジット」が残っているのと同じ数のフェッチチームを送信します。 クレジットは、クライアントが受信したデータを正常に処理した場合にのみ増加します。 この例では、クライアントはデータに対して何も行いませんが、ハンドラーを追加して、データを保存したり、スリープを使用して作業をシミュレートしたりできます。 したがって、クライアントはサーバーからの新しいデータを待機するアイドル状態になりません。
package com.coldsnipe.example; import org.zeromq.ZFrame; import org.zeromq.ZMQ; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Path; public class ZMQServer implements Runnable { private final ZMQ.Context zmqContext; // private final Path filePath; public ZMQServer(ZMQ.Context zmqContext, Path filePath) { this.zmqContext = zmqContext; this.filePath = filePath; } @Override public void run() { try { File file = filePath.toFile(); if (!file.exists()) { throw new RuntimeException("File does not exists: " + filePath); } // , Router identity , // try (FileChannel fileChannel = FileChannel.open(filePath)) { try (ZMQ.Socket socket = zmqContext.socket(ZMQ.ROUTER)) { // localhost 6000 socket.bind("tcp://*:6000"); socket.setLinger(1000); while (!Thread.currentThread().isInterrupted()) { // - identity frame ZFrame identity = ZFrame.recvFrame(socket); assert identity != null; // String command = socket.recvStr(); if (command.equals("fetch")) { // fetch, offset String offsetString = socket.recvStr(); long offset = Long.parseLong(offsetString); String chunkSizeString = socket.recvStr(); int chunkSize = Integer.parseInt(chunkSizeString); int currentChunkSize = chunkSize; // offset + // , offset if (file.length() < (offset + chunkSize)) { currentChunkSize = (int) (file.length() - offset); } if (currentChunkSize > 0) { ByteBuffer byteBuffer = ByteBuffer.allocate(currentChunkSize); fileChannel.read(byteBuffer, offset); byteBuffer.flip(); byte[] bytes = new byte[currentChunkSize]; byteBuffer.get(bytes); // ZFrame frameToSend = new ZFrame(bytes); // identity // , identity.send(socket, ZFrame.MORE); frameToSend.send(socket, 0); } } } } } } catch (IOException e) { throw new RuntimeException(e); } } }
サーバーは、1つのファイル(起動時に受信したリンク)のみを転送し、1つのコマンド-fetchのみに応答できます。 彼はクライアントを区別する方法を知っていますが、クライアントは1つのファイルしか取得できません。 これをどのように改善するかはもう少し書きますが、ここではテストと測定結果について説明します。
package com.coldsnipe.example; import org.junit.Test; import org.zeromq.ZMQ; import java.net.URL; import java.nio.file.Path; import java.nio.file.Paths; public class ZMQExampleTest { @Test public void testDataExchange() throws Exception { // ZMQ.Context, 1! // ZMQ.Context zmqContext = ZMQ.context(1); // final URL fileUrl = ZMQExampleTest.class.getClassLoader().getResource("testdata"); assert fileUrl != null; Path filePath = Paths.get(fileUrl.toURI()); // , // long startTime = System.nanoTime(); Thread clientThread = new Thread(new ZMQClient(zmqContext)); Thread serverThread = new Thread(new ZMQServer(zmqContext, filePath)); clientThread.start(); serverThread.start(); clientThread.join(); long estimatedTime = System.nanoTime() - startTime; float timeForTest = estimatedTime / 1000000000F; System.out.format("Elapsed time: %fs%n", timeForTest); // , // . zmqContext.term(); } }
このテストでは、ZMQ.Contextを起動し、クライアントとサーバーを起動し、データの転送にかかる時間を測定します。 コンテキストについては別に言いたい。 プロセス内でソケットを制御し、データを送信する方法とタイミングを決定するのは、隠されたコンダクターであるコンテキストです。 したがって、ここから単純なルールが続きます- プロセスごとに1つのコンテキスト 。
テストを実行し、結果を確認します。
4295 chunks received, 1073741824 bytes Elapsed time: 1.429522s
1ギガバイトの読み取りには1.42秒かかりました。 このインジケーターがどれほど優れているかを言うのは難しいですが、同じスプレーIOと比較して、ØMQは30〜40%速くなり、IOの負荷は100%に近く(スプレー85-90)、CPU負荷はほぼ3分の1になります。
プロトコルの改善
プロトコルはコマンドを1つしか認識していませんが、テストには十分ですが、実際の状況では、サーバーが多くの異なるファイルを転送し、クライアントにサービス情報を提供できるようにする必要があります。 これを行うために、2つの新しいコマンドを導入します。
- get-クライアントがデータを受信したいというメッセージで、受信したいデータを含める必要があります
- end-クライアントがデータの受信を完了した後に送信されるメッセージは、サーバーが転送されたファイルに関連付けられたリソースを解放できることを示します。
この場合のメッセージハンドラは次のようになります。
else if(command.equals("get")) { // ID String id = socket.recvStr(); // BigInteger identityString = new BigInteger(identity.getData()); DataBlob dataBlob = blobMap.get(identityString); if (dataBlob!= null){ // , DataBlob dataBlob.closeSource(); blobMap.remove(identityString); } // DataBlob dataBlob = dataProvider.getBlob(id); if (dataBlob == null) { log.error("Received wrong get call on server socket, ID [{}]", id); } else { // , DataHeader dataHeader = new DataHeader(id, dataBlob.getSize()); byte[] bytesToSend = FrameHeaderEncoder.encode(dataHeader); ZFrame frameToSend = new ZFrame(bytesToSend); // DataBlob blobMap.put(new BigInteger(identity.getData()), dataBlob); // identity.send(socket, ZFrame.MORE); frameToSend.send(socket, 0); } } else if (command.equals("end")){ BigInteger identityString = new BigInteger(identity.getData()); // DataBlob DataBlob dataBlob = blobMap.remove(identityString); if (dataBlob != null) { dataBlob.closeSource(); } }
多くのファイルを操作するために、サーバーに新しいオブジェクトを追加しました。
public interface DataProvider { public DataBlob getBlob(String dataId); } public abstract class DataBlob { private final long size; private final String dataId; protected DataBlob(long size, String dataId) { this.size = size; this.dataId= dataId; } public abstract byte[] getData(long position, int length); public abstract void closeSource(); public long getSize() { return size; } public String getDataId() { return getDataId; } }
DataProviderを実装するクラスはデータの受信を管理し、getBlobメソッドは、基本的にリソースへの参照である新しいDataBlobを返します。
ファイルのDataBlob実装は次のようになります。
public class FileDataBlob extends DataBlob { private final FileChannel fileChannel; public FileDataBlob(long size, String dataId, Path filePath) { super(size, dataId); try { this.fileChannel = FileChannel.open(filePath); } catch (IOException e) { throw new DataBlobException(e); } } @Override public byte[] getData(long position, int length) { ByteBuffer byteBuffer = ByteBuffer.allocate(length); try { fileChannel.read(byteBuffer, position); byteBuffer.flip(); byte[] bytes = new byte[length]; byteBuffer.get(bytes); return bytes; } catch (IOException e) { throw new DataBlobException(e); } } @Override public void closeSource() { try { fileChannel.close(); } catch (IOException e) { throw new DataBlobException(e); } } }
これらの2つのメソッドを追加することにより、クライアントが受信するデータの種類を選択でき、要求されたデータのサイズをクライアントに通知します。 プロトコルをさらに改善できます。たとえば、複数のファイルを連続して送信する機能を追加したり、Heartbeatを追加して、死んだクライアントや空きリソースを特定したりできます。
おわりに
この記事では、JavaでØMQを使用する方法を示したいと思いました。 現時点では、プロジェクトでは、ファイルだけでなくメタデータのメインメッセージブローカーであるØMQがかなり良い結果を示しています(まだ問題はありません)。
以下の記事では、 AkkaとSemantic Webが順番に使用されている間に 、 ArkStoreで使用される他のテクノロジーについてお話します。 ご清聴ありがとうございました。少なくともこの読み物が誰かにとって有益であることを願っています!