この記事は、 「Javaでのマルチコアプログラミング」コースを読むための準備として、Javaプログラマーが利用できるさまざまなアクターライブラリの研究中に書かれました。
オンライン教育プラットフォームudemy.comでScala for Java Developersコースも教えています(Coursera / EdXに似ています)。
これは、AkkaアクターのAPI、パフォーマンス、および実装を、モデルの問題に関する他のライブラリの実装と比較することを目的とした一連の記事の最初の記事です。 この記事では、GParsでこのような問題と解決策を提供します。
GParsはClojure用に作成されたライブラリで、さまざまな並列コンピューティングアプローチを幅広くサポートしています。
GParsの長所
- ソースコードはJavaで記述されています(AkkaはScalaで記述されています)。 「ネイティブ」プログラミング言語で「ボンネットの下にあるもの」を見るのは常に興味深い
- GParsはアプローチ(俳優、エージェント、STM、CSP、データフロー)の「動物園」です。
- GParsは、Javaで記述されたClojureランタイムライブラリのクラスを使用します。 臨検に興味がある
「インストール」GPar
Maven GParsとGroovyを接続する
<dependency> <groupId>org.codehaus.gpars</groupId> <artifactId>gpars</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>org.codehaus.groovy</groupId> <artifactId>groovy-all</artifactId> <version>2.2.2</version> </dependency>
Mavenがなければ、リポジトリからGPars-1.1.0 ( sources )とGroovy-2.2.2 ( sources )をダウンロードして、プロジェクトに接続します。
ステートレス俳優
簡単な例から始めましょう。
アクターにメッセージを送信しています。
import groovyx.gpars.actor.*; public class Demo { public static void main(String[] args) throws Exception { Actor actor = new DynamicDispatchActor() { public void onMessage(String msg) { System.out.println("receive: " + msg); } }.start(); actor.send("Hello!"); System.in.read(); } } >> receive: Hello!
メッセージを送信して応答を待つ
import groovyx.gpars.actor.*; public class Demo { public static void main(String[] args) throws Exception { Actor actor = new DynamicDispatchActor() { public void onMessage(String msg) { System.out.println("ping: " + msg); getSender().send(msg.toUpperCase()); } }.start(); System.out.println("pong: " + actor.sendAndWait("Hello!")); } } >> ping: Hello! >> pong: HELLO!
メッセージを送信し、非同期コールバックを切ります
import groovyx.gpars.MessagingRunnable; import groovyx.gpars.actor.*; public class Demo { public static void main(String[] args) throws Exception { Actor actor = new DynamicDispatchActor() { public void onMessage(String msg) { System.out.println("ping: " + msg); getSender().send(msg.toUpperCase()); } }.start(); actor.sendAndContinue("Hello!", new MessagingRunnable<String>() { protected void doRun(String msg) { System.out.println("pong: " + msg); } }); System.in.read(); } } >> ping: Hello! >> pong: HELLO!
受信メッセージの種類ごとにパターンマッチングを行う
import groovyx.gpars.actor.*; public class Demo { public static void main(String[] args) throws Exception { Actor actor = new DynamicDispatchActor() { public void onMessage(String arg) { getSender().send(arg.toUpperCase()); } public void onMessage(Long arg) { getSender().send(1000 + arg); } }.start(); System.out.println("42.0 -> " + actor.sendAndWait(42.0)); } } >> Hello! -> HELLO! >> 42 -> 1042
パターンマッチングで適切なハンドラが見つかりませんでした
import groovyx.gpars.actor.*; public class Demo { public static void main(String[] args) throws Exception { Actor actor = new DynamicDispatchActor() { public void onMessage(String arg) { getSender().send(arg.toUpperCase()); } public void onMessage(Long arg) { getSender().send(1000 + arg); } }.start(); System.out.println("42.0 -> " + actor.sendAndWait(42.0)); } } >> An exception occurred in the Actor thread Actor Thread 1 >> groovy.lang.MissingMethodException: No signature of method: >> net.golovach.Demo_4$1.onMessage() is applicable for argument types: (java.lang.Double) values: [42.0] >> Possible solutions: onMessage(java.lang.Long), onMessage(java.lang.String) >> at org.codehaus.groovy.runtime.ScriptBytecodeAdapter ... >> ...
目に見えるもの
-「パターンマッチング」は、適切なオーバーロードバージョンのonMessage(<one-arg>)メソッドを選択します。存在しない場合は、例外が発生します。
-アクターは「デーモン」スレッドのプールに基づいて動作するため、JVMが早期にシャットダウンしないように、何らかの方法でmain()メソッド(System.in.read()を使用)の操作を一時停止する必要があります。
-reply()メソッドの例では、DynamicDispatchActorから継承すると、多くのメソッドがアクターの「名前空間」に分類されることがわかります(reply、replyIfExists、getSender、terminate、...)
GParsの作成者はDynamicDispatchActorクラスのステートレスアクターの相続人を呼び出しますが、これらは変更フィールドを持つことができ、その中に状態を格納できるjavaクラスの通常のインスタンスです。 これを実証する
import groovyx.gpars.actor.*; import java.util.ArrayList; import java.util.List; public class StatelessActorTest { public static void main(String[] args) throws InterruptedException { Actor actor = new DynamicDispatchActor() { private final List<Double> state = new ArrayList<>(); public void onMessage(final Double msg) { state.add(msg); reply(state); } }.start(); System.out.println("answer: " + actor.sendAndWait(1.0)); System.out.println("answer: " + actor.sendAndWait(2.0)); System.out.println("answer: " + actor.sendAndWait(3.0)); System.out.println("answer: " + actor.sendAndWait(4.0)); System.out.println("answer: " + actor.sendAndWait(5.0)); } } >> answer: [1.0] >> answer: [1.0, 2.0] >> answer: [1.0, 2.0, 3.0] >> answer: [1.0, 2.0, 3.0, 4.0] >> answer: [1.0, 2.0, 3.0, 4.0, 5.0]
ステートフルアクター
ステートレス/ステートフル部門の紹介により、著者はステートフルアクターによりステートテンプレートの実装を有機的に作成できることを意味します。 簡単な例を見てみましょう(DefaultActorの子孫-ステートフルアクター)
import groovyx.gpars.MessagingRunnable; import groovyx.gpars.actor.*; import static java.util.Arrays.asList; public class StatefulActorTest { public static void main(String[] args) throws Exception { Actor actor = new MyStatefulActor().start(); actor.send("A"); actor.send(1.0); actor.send(Arrays.asList(1, 2, 3)); actor.send("B"); actor.send(2.0); actor.send(Arrays.asList(4, 5, 6)); System.in.read(); } private static class MyStatefulActor extends DefaultActor { protected void act() { loop(new Runnable() { public void run() { react(new MessagingRunnable<Object>(this) { protected void doRun(final Object msg) { System.out.println("react: " + msg); } }); } }); } } } >> react: A >> react: 1.0 >> react: [1, 2, 3] >> react: B >> react: 2.0 >> react: [4, 5, 6]
ただし、Stateテンプレートの約束された実装はまったく臭いがしません。 この方法で行こう(Javaはこのようなトリックに最適な言語ではありません。Clojure/ Scalaでは、このコードははるかにコンパクトに見えます)
import groovyx.gpars.MessagingRunnable; import groovyx.gpars.actor.*; import java.util.List; import static java.util.Arrays.asList; public class StatefulActorTest { public static void main(String[] args) throws Exception { Actor actor = new MyStatefulActor().start(); actor.send("A"); actor.send(1.0); actor.send(asList(1, 2, 3)); actor.send("B"); actor.send(2.0); actor.send(asList(4, 5, 6)); System.in.read(); } private static class MyStatefulActor extends DefaultActor { protected void act() { loop(new Runnable() { public void run() { react(new MessagingRunnable<String>(this) { protected void doRun(final String msg) { System.out.println("Stage #0: " + msg); react(new MessagingRunnable<Double>() { protected void doRun(final Double msg) { System.out.println(" Stage #1: " + msg); react(new MessagingRunnable<List<Integer>>() { protected void doRun(final List<Integer> msg) { System.out.println(" Stage #2: " + msg + "\n"); } }); } }); } }); } }); } } } >> Stage #0: A >> Stage #1: 1.0 >> Stage #2: [1, 2, 3] >> >> Stage #0: B >> Stage #1: 2.0 >> Stage #2: [4, 5, 6]
さて、またはこの匿名クラスのひどいネストを取り除き、「状態を具体化」しましょう
import groovyx.gpars.MessagingRunnable; import groovyx.gpars.actor.*; import java.util.List; import static java.util.Arrays.asList; public class StatefulActorTest { public static void main(String[] args) throws Exception { Actor actor = new MyStatefulActor().start(); actor.send("A"); actor.send(1.0); actor.send(asList(1, 2, 3)); actor.send("B"); actor.send(2.0); actor.send(asList(4, 5, 6)); System.in.read(); } private static class MyStatefulActor extends DefaultActor { protected void act() { loop(new Runnable() { public void run() { react(new Stage0(MyStatefulActor.this)); } }); } } private static class Stage0 extends MessagingRunnable<String> { private final DefaultActor owner; private Stage0(DefaultActor owner) {this.owner = owner;} protected void doRun(final String msg) { System.out.println("Stage #0: " + msg); owner.react(new Stage1(owner)); } } private static class Stage1 extends MessagingRunnable<Double> { private final DefaultActor owner; private Stage1(DefaultActor owner) {this.owner = owner;} protected void doRun(final Double msg) { System.out.println(" Stage #1: " + msg); owner.react(new Stage2()); } } private static class Stage2 extends MessagingRunnable<List<Integer>> { protected void doRun(final List<Integer> msg) { System.out.println(" Stage #2: " + msg + "\n"); } } }
はい、はい、私はあなたに完全に同意します、Javaは非常に冗長な言語です。
遷移図は次のようになります(引数を分岐しませんでした)
// START // ----- // | // | // | // | +--------+ // +->| Stage0 | ---String----+ // +--------+ | // ^ v // | +--------+ // | | Stage1 | // List<Integer> +--------+ // | | // | +--------+ Double // +--| Stage2 |<-------+ // +--------+
タイマー
私の問題を解決するには、タイマーが必要になります。タイマーは、一定期間の終了を通知するようにプログラムできます。 「通常の」Javaでは、最悪でもjava.util.concurrent.ScheduledThreadPoolExecutorまたはjava.util.Timerを使用します。 しかし、私たちは俳優の世界にいます!
これは、タイムアウト付きのreact()メソッドでメッセージを待ってハングするステートフルアクターです。 この期間中にメッセージが届かない場合、GParsインフラストラクチャはActor.TIMEOUTメッセージ(これは単なる「TIMEOUT」行です)を送信し、timeoutMsgコンストラクターから作成者にメッセージを「返します」。 タイマーを「オフ」にする場合は、他のメッセージを送信します(「KILL」という文字列を送信します)
import groovyx.gpars.MessagingRunnable; import groovyx.gpars.actor.*; import groovyx.gpars.actor.impl.MessageStream; import static java.util.concurrent.TimeUnit.MILLISECONDS; public class Timer<T> extends DefaultActor { private final long timeout; private final T timeoutMsg; private final MessageStream replyTo; public Timer(long timeout, T timeoutMsg, MessageStream replyTo) { this.timeout = timeout; this.timeoutMsg = timeoutMsg; this.replyTo = replyTo; } protected void act() { loop(new Runnable() { public void run() { react(timeout, MILLISECONDS, new MessagingRunnable() { protected void doRun(Object argument) { if (Actor.TIMEOUT.equals(argument)) { replyTo.send(timeoutMsg); } terminate(); } }); } }); } }
タイマーの使用例。
2つのタイマーtimerXとtimerYを作成します。これらはそれぞれ1000msの遅延でメッセージ「X」と「Y」を送信します。 しかし、500ms後に私は気が変わってtimerXを「釘付け」しました。
import groovyx.gpars.actor.Actor; import groovyx.gpars.actor.impl.MessageStream; public class TimerDemo { public static void main(String[] args) throws Exception { Actor timerX = new Timer<>(1000, "X", new MessageStream() { public MessageStream send(Object msg) { System.out.println("timerX send timeout message: '" + msg + "'"); return this; } }).start(); Actor timerY = new Timer<>(1000, "Y", new MessageStream() { public MessageStream send(Object msg) { System.out.println("timerY send timeout message: '" + msg + "'"); return this; } }).start(); Thread.sleep(500); timerX.send("KILL"); System.in.read(); } } >> timerY send timeout message: 'Y'
問題文と解決策
次の非常に一般的な問題を考慮してください。
1.かなりの頻度で何らかの機能を引き起こす多くのスレッドがあります。
2.この関数には2つのオプションがあります。1つの引数の処理と引数のリストの処理です。
3.この関数は、引数リストの処理が、個々の処理の合計より少ないシステムリソースを消費するようなものです。
4.タスクは、フローと関数の間にバッチャーを配置し、フローから引数を「バンドル」に収集し、関数に渡し、リストを処理し、バッチャーが送信者フローに結果を「分配」します。
5.バッチャーは、2つの場合に引数のリストを渡します。十分なサイズの「バンドル」を収集したか、完全な「バンドル」を収集できなかったタイムアウト期間の後、スレッドが結果を返すときです。
ソリューションスキームを見てみましょう。
タイムアウト100ミリ秒、「バンドル」の最大サイズ-3つの引数
時間0で、フローT-0は引数「A」を送信します。 バッチャーは「クリーン」状態、世代0です
//time:0 // // T-0 --"A"-----> +-------+ generationId=0 // T-1 |Batcher| argList=[] // T-2 +-------+ replyToList=[]
しばらくすると、Batcherは「A」を短くしてT-0に戻す必要があることを認識します。 世代0のタイマーが開始しました
// +-----+ timeoutMsg=0 // |Timer| timeout=100 //time:0.001 +-----+ // // T-0 +-------+ generationId=0 // T-1 |Batcher| argList=["A"] // T-2 +-------+ replyToList=[T-0]
時間25ミリ秒で、T-1ストリームは処理のために「B」を送信します
// +-----+ timeoutMsg=0 // |Timer| timeout=100 //time:25 +-----+ // // T-0 +-------+ generationId=0 // T-1 ---"B"----> |Batcher| argList=["A"] // T-2 +-------+ replyToList=[T-0]
しばらくすると、Batcherは、「A」と「B」を短くして、フローをT-0とT-1に戻す必要があることを認識します
// +-----+ timeoutMsg=0 // |Timer| timeout=100 //time:25.001 +-----+ // // T-0 +-------+ generationId=0 // T-1 |Batcher| argList=["A","B"] // T-2 +-------+ replyToList=[T-0,T-1]
50ミリ秒の時点で、T-2ストリームは処理のために「C」を送信します
// +-----+ timeoutMsg=0 // |Timer| timeout=100 //time:50 +-----+ // // T-0 +-------+ generationId=0 // T-1 |Batcher| argList=["A","B"] // T-2 ----"C"---> +-------+ replyToList=[T-0,T-1]
しばらくすると、Batcherは、「A」、「B」、および「C」を計算し、それをフローT-0、T-1、およびT-2に戻す必要があることを認識します。 「バンドル」が一杯で、タイマーが「殺す」ことがわかります
// +-----+ timeoutMsg=0 // +-"KILL"->|Timer| timeout=100 //time:50.001 | +-----+ // | // T-0 +-------+ generationId=0 // T-1 |Batcher| argList=["A","B","C"] // T-2 +-------+ replyToList=[T-0,T-1,T-2]
しばらくすると、Batcherは計算用のデータを別のアクター(匿名)に渡し、状態をクリアして、世代を0から1に変更します
//time:50.002 // // T-0 +-------+ generationId=1 // T-1 |Batcher| argList=[] // T-2 +-------+ replyToList=[] // // +---------+ argList=["A","B","C"] // |anonymous| replyToList=[T-0,T-1,T-2] // +---------+
しばらくすると(「ストーリーボード」の場合、計算は瞬時であると想定します)、匿名のアクターが引数のリストに対してアクションを実行します[「A」、「B」、「C」]-> [「res#A」、「res#B」、 res#C "]
//time:50.003 // // T-0 +-------+ generationId=1 // T-1 |Batcher| argList=[] // T-2 +-------+ replyToList=[] // // +---------+ resultList=["res#A","res#B","res#B"] // |anonymous| replyToList=[T-0,T-1,T-2] // +---------+
しばらくすると、匿名のアクターが計算結果をスレッドに配布します
//time:50.004 // // T-0 <-----------+ +-------+ generationId=1 // T-1 <---------+ | |Batcher| argList=[] // T-2 <-------+ | | +-------+ replyToList=[] // | | | // | | +---"res#A"--- +---------+ // | +---"res#B"----- |anonymous| // +--"res#C"-------- +---------+
しばらくすると、システムは元の「純粋な」状態に戻ります
//time:50.005 // // T-0 +-------+ generationId=1 // T-1 |Batcher| argList=[] // T-2 +-------+ replyToList=[]
その後、時間75で、T-2ストリームは「D」の計算に渡されます。
//time:75 // // T-0 +-------+ generationId=1 // T-1 |Batcher| argList=[] // T-2 ----"D"---> +-------+ replyToList=[]
しばらくすると、Batcherは「D」を短くしてT-2ストリームに戻す必要があることを認識し、さらに第1世代のタイマーが開始されました。
// +-----+ timeoutMsg=1 // |Timer| timeout=100 //time:75.001 +-----+ // // T-0 +-------+ generationId=1 // T-1 |Batcher| argList=["D"] // T-2 +-------+ replyToList=[T-2]
100ミリ秒後(175ミリ秒)、GParsインフラストラクチャはタイマーに待機期間の満了を通知します
// +--"TIMEOUT"-- // | // v // +-----+ timeoutMsg=1 // |Timer| timeout=100 //time:175 +-----+ // // T-0 +-------+ generationId=1 // T-1 |Batcher| argList=["D"] // T-2 +-------+ replyToList=[T-2]
しばらくすると、タイマーはBatcherに第1世代がタイムアウトしたことを通知し、terminate()を呼び出して自殺します。
// +-----+ timeoutMsg=1 // +----1-----|Timer| timeout=100 //time:175.001 | +-----+ // v // T-0 +-------+ generationId=1 // T-1 |Batcher| argList=["D"] // T-2 +-------+ replyToList=[T-2]
引数リスト(引数が1つしかない)で計算を実行する匿名アクターが作成されます。 ジェネレーション1から2への変更
//time:175.002 // // T-0 +-------+ generationId=2 // T-1 |Batcher| argList=[] // T-2 +-------+ replyToList=[] // // +---------+ argList=["D"] // |anonymous| replyToList=[T-2] // +---------+
俳優は仕事をしました
//time:175.003 // // T-0 +-------+ generationId=2 // T-1 |Batcher| argList=[] // T-2 +-------+ replyToList=[] // // +---------+ resultList=["res#D"] // |anonymous| replyToList=[T-2] // +---------+
俳優は結果を与える
//time:175.004 // // T-0 +-------+generationId=2 // T-1 |Batcher|argList=[] // T-2 <-------+ +-------+replyToList=[] // | // | +---------+ // +--"res#C"----- |anonymous| // +---------+
元の「純粋な」状態のシステム
//time:175.005 // // T-0 +-------+ generationId=2 // T-1 |Batcher| argList=[] // T-2 +-------+ replyToList=[]
問題解決
BatchProcessor-「関数」のインターフェース。 「バッチモード」処理
import java.util.List; public interface BatchProcessor<ARG, RES> { List<RES> onBatch(List<ARG> argList) throws Exception; }
Batcher-引数を「パック」するクラス。 コアソリューション
import groovyx.gpars.actor.*; import groovyx.gpars.actor.impl.MessageStream; import java.util.*; public class Batcher<ARG, RES> extends DynamicDispatchActor { // fixed parameters private final BatchProcessor<ARG, RES> processor; private final int maxBatchSize; private final long batchWaitTimeout; // current state private final List<ARG> argList = new ArrayList<>(); private final List<MessageStream> replyToList = new ArrayList<>(); private long generationId = 0; private Actor lastTimer; public Batcher(BatchProcessor<ARG, RES> processor, int maxBatchSize, long batchWaitTimeout) { this.processor = processor; this.maxBatchSize = maxBatchSize; this.batchWaitTimeout = batchWaitTimeout; } public void onMessage(final ARG elem) { argList.add(elem); replyToList.add(getSender()); if (argList.size() == 1) { lastTimer = new Timer<>(batchWaitTimeout, ++generationId, this).start(); } else if (argList.size() == maxBatchSize) { lastTimer.send("KILL"); lastTimer = null; nextGeneration(); } } public void onMessage(final long timeOutId) { if (generationId == timeOutId) {nextGeneration();} } private void nextGeneration() { new DynamicDispatchActor() { public void onMessage(final Work<ARG, RES> work) throws Exception { List<RES> resultList = work.batcher.onBatch(work.argList); for (int k = 0; k < resultList.size(); k++) { work.replyToList.get(k).send(resultList.get(k)); } terminate(); } }.start().send(new Work<>(processor, new ArrayList<>(argList), new ArrayList<>(replyToList))); argList.clear(); replyToList.clear(); generationId = generationId + 1; } private static class Work<ARG, RES> { public final BatchProcessor<ARG, RES> batcher; public final List<ARG> argList; public final List<MessageStream> replyToList; public Work(BatchProcessor<ARG, RES> batcher, List<ARG> argList, List<MessageStream> replyToList) { this.batcher = batcher; this.argList = argList; this.replyToList = replyToList; } } }
BatcherDemoは、Batcherクラスのデモです。 回路図と同じ
import groovyx.gpars.actor.Actor; import java.io.IOException; import java.util.*; import java.util.concurrent.*; import static java.util.concurrent.Executors.newCachedThreadPool; public class BatcherDemo { public static final int BATCH_SIZE = 3; public static final long BATCH_TIMEOUT = 100; public static void main(String[] args) throws InterruptedException, IOException { final Actor actor = new Batcher<>(new BatchProcessor<String, String>() { public List<String> onBatch(List<String> argList) { System.out.println("onBatch(" + argList + ")"); ArrayList<String> result = new ArrayList<>(argList.size()); for (String arg : argList) { result.add("res#" + arg); } return result; } }, BATCH_SIZE, BATCH_TIMEOUT).start(); ExecutorService exec = newCachedThreadPool(); exec.submit(new Callable<Void>() { // T-0 public Void call() throws Exception { System.out.println(actor.sendAndWait(("A"))); return null; } }); exec.submit(new Callable<Void>() { // T-1 public Void call() throws Exception { Thread.sleep(25); System.out.println(actor.sendAndWait(("B"))); return null; } }); exec.submit(new Callable<Void>() { // T-2 public Void call() throws Exception { Thread.sleep(50); System.out.println(actor.sendAndWait(("C"))); Thread.sleep(25); System.out.println(actor.sendAndWait(("D"))); return null; } }); exec.shutdown(); } } >> onBatch([A, B, C]) >> res#A >> res#B >> res#C >> onBatch([D]) >> res#D
おわりに
私の意見では、アクターはマルチスレッドプリミティブのプログラミングに適しています。これは、特に遷移引数に依存する複雑な遷移図を持つ有限状態マシンです。
この記事の例には、 gpars.org/guideなど、さまざまな場所でオンラインで見つかったコードのバリエーションがあります。
第二部では
- 提案されたソリューションの速度を測定します
- 個々のトランザクションからのさまざまなスレッドからの要求を1つの大きなRDBMSトランザクションに結合することにより、JDBCとの作業を加速します。 つまり、同じConnection内ではなく、異なるConnection間でバッチを実行します。
UPD
フューリーコメントをありがとう:
GParsはJava + Groovyのミックスで書かれています。
ソースコードは、Groovyパッケージが記述されていることを示しています
-groovyx.gpars.csp *
-groovyx.gpars.pa。*
-groovyx.gpars *(部分的に)
連絡先
Javaトレーニングをオンラインで行い( プログラミングコースはこちら) 、Java Coreコースの再設計の一環としてトレーニング資料の一部を公開しています 。 この記事では、視聴者の講義のビデオ録画をyoutubeチャンネルで見ることができます。おそらく、 チャンネルのビデオがより体系化されています 。
スカイプ:GolovachCourses
メール:GolovachCourses@gmail.com