JDKコンカレントパッケージ

現在Javaに存在するメモリモデルは、このコードにスレッドレースがない場合に、マルチスレッドコードの予想される実行順序を保証します。 また、コードを競合から保護するために、コード間でデータを同期および交換するさまざまな方法が考案されました。



HotSpot JDKに含まれているjava.util.concurrent



パッケージは、マルチスレッドコードを記述するための次のツールを提供します。





アトミック


java.util.concurrent.atomic



子パッケージには、プリミティブ型のアトミックな作業のためのクラスのセットが含まれています。 これらのクラスのコントラクトは、「1プロセッサ時間単位」のcompare-and-set



操作のパフォーマンスを保証します。 この変数に新しい値を設定する場合、古い値も渡します(楽観的ロックのアプローチ)。 メソッドが呼び出された時点から、変数の値が予想された値と異なる場合、実行結果はfalse



になりfalse







たとえば、2つのlong



変数の配列[1,2,3,4,5]



および[-1,-2,-3,-4,-5]



ます。 各スレッドは配列に対して順次反復され、要素を1つの変数にまとめます。 悲観的なロックを使用したコード( groovy )は次のようになります。



 class Sum { static monitor = new Object() static volatile long sum = 0 } class Summer implements Callable { long[] data Object call() throws Exception { data.each { synchronized (Sum.monitor) { println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}") Sum.sum += it } } } } Executors.newFixedThreadPool(2).invokeAll([ new Summer(data: [1,2,3,4,5]), new Summer(data: [-1,-2,-3,-4,-5]) ]) print("Sum: ${Sum.sum}")
      
      







実行結果が期待されます。



 pool-1-thread-1: add 1 to 0 pool-1-thread-2: add -1 to 1 pool-1-thread-1: add 2 to 0 pool-1-thread-2: add -2 to 2 pool-1-thread-1: add 3 to 0 pool-1-thread-2: add -3 to 3 pool-1-thread-1: add 4 to 0 pool-1-thread-1: add 5 to 4 pool-1-thread-2: add -4 to 9 pool-1-thread-2: add -5 to 5 Sum: 0
      
      







ただし、このアプローチにはパフォーマンス上の重大な欠点があります。 この場合、役に立たない作業が増えると、役に立つよりも多くのリソースが必要になります。





同じ量を計算する場合、 AtomicLong



を使用して楽観的ロックを実装することを検討してください。



 class Sum { static volatile AtomicLong sum = new AtomicLong(0) } class Summer implements Callable { long[] data Object call() throws Exception { data.each { while(true) { long localSum = Sum.sum.get() if (Sum.sum.compareAndSet(localSum, localSum + it)) { println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}") break; } else { println("[MISS!] ${Thread.currentThread().name}: add ${it} to ${Sum.sum}") } } } } } Executors.newFixedThreadPool(2).invokeAll([ new Summer(data: [1,2,3,4,5]), new Summer(data: [-1,-2,-3,-4,-5]) ]) print("Sum: ${Sum.sum}")
      
      







「誤った」試みの結果からわかるように、それほど多くはありませんでした。



 [MISS!] pool-1-thread-1: add 1 to -1 pool-1-thread-2: add -1 to -1 pool-1-thread-2: add -2 to -3 [MISS!] pool-1-thread-1: add 1 to -3 pool-1-thread-2: add -3 to -6 pool-1-thread-1: add 1 to -5 [MISS!] pool-1-thread-2: add -4 to -5 pool-1-thread-1: add 2 to -7 pool-1-thread-2: add -4 to -7 pool-1-thread-1: add 3 to -9 pool-1-thread-2: add -5 to -9 pool-1-thread-1: add 4 to -5 pool-1-thread-1: add 5 to 0 Sum: 0
      
      







楽観的ロックを使用することを決定する場合、変更される変数を使用したアクションにあまり時間がかからないことが重要です。 このアクションが長ければ長いほど、誤ったcompare-and-set



が頻繁に発生し、このアクションを再度実行しなければならない頻度が高くなります。



compare-and-set



に基づいて、非ブロッキング読み取りロックも実装できます。 この場合、アトミック変数には、処理中のオブジェクトのバージョンが格納されます。 計算前にバージョン値を受け取ったので、計算後に検証できます。 通常のread-write



ロックは、バージョン検証が失敗した場合にのみ有効になります。



 class Transaction { long debit } class Account { AtomicLong version = new AtomicLong() ReadWriteLock readWriteLock = new ReentrantReadWriteLock() List<Transaction> transactions = new ArrayList<Transaction>() } long balance(Account account) { ReentrantReadWriteLock.ReadLock locked while(true) { long balance = 0 long version = account.version.get() account.transactions.each {balance += it.debit} //volatile write for JMM if (account.version.compareAndSet(version, version)) { if (locked) {locked.unlock()} return balance } else { locked = account.readWriteLock.readLock() } } } void modifyTransaction(Account account, int position, long newDebit) { def writeLock = account.readWriteLock.writeLock() account.version.incrementAndGet() account.transactions[position].debit = newDebit writeLock.unlock() }
      
      







ロック


再入可能ロック


同期ロックとは異なり、 ReentrantLock



使用ReentrantLock



と、ロックの取り外しと受け取りのReentrantLock



をより柔軟に選択できます。 通常のJava呼び出しを使用します。 また、 ReentrantLock



使用すると、ロックの現在の状態に関する情報を取得したり、一定時間ロックを「待機」したりできます。 単一のスレッドのロックの正しい再帰的な取得と解放をサポートします。 正直なロックが必要な場合(モニターのキャプチャ時に順序を維持する) ReentrantLock



もこのメカニズムが備わっています。



ReentrantLock



ロックとReentrantLock



ロックは非常に似ているという事実にもかかわらず、JVMレベルでの実装はまったく異なります。

JMMの詳細に入らない場合:JVMが提供する同期ロックの代わりにReentrantLock



を使用することは、モニターのスレッドの戦いが頻繁にある場合にのみ価値があります。 1つのスレッドのみが同期メソッドにアクセスする場合、ReentrantLockパフォーマンスReentrantLock



JVMロックメカニズムよりも劣ります。



ReentrantReadWriteLock


ReentrantLock



プロパティReentrantLock



補完ReentrantLock



多くの読み取りおよび書き込みロックをキャプチャReentrantLock



機能を追加します。 必要に応じて、読み取りロックの前に書き込みロックを「下げる」ことができます。



StampedLock _jdk 1.8_


楽観的および悲観的読み取り/書き込みロックを実装し、さらに増加または減少する可能性があります。 楽観的ロックは、ロックの「スタンプ」( javadoc )によって実装されます。



 double distanceFromOriginV1() { // A read-only method long stamp; if ((stamp = sl.tryOptimisticRead()) != 0L) { // optimistic double currentX = x; double currentY = y; if (sl.validate(stamp)) return Math.sqrt(currentX * currentX + currentY * currentY); } stamp = sl.readLock(); // fall back to read lock try { double currentX = x; double currentY = y; return Math.sqrt(currentX * currentX + currentY * currentY); } finally { sl.unlockRead(stamp); } }
      
      







コレクション


ArrayBlockingQueue


あるスレッドから別のスレッドにメッセージを送信するための正直なキュー。 ブロック( put()



take()



)および非ブロック( offer()



pool()



)メソッドをサポートします。 null値を禁止します。 キューの容量は、作成時に指定する必要があります。



並行ハッシュマップ


hash



関数に基づくキーと値の構造。 読み取りロックはありません。 記録時には、カードの一部(セグメント)のみがブロックされます。 セグメントの数は、 concurrencyLevel



最も近い次数2に制限されconcurrencyLevel







ConcurrentSkipListMap


バランスの取れたマルチスレッドのキーと値の構造(O(log n))。 検索はスキップされたリストに基づいています。 カードはキーを比較できる必要があります。



ConcurrentSkipListSet


値のないConcurrentSkipListMap







CopyOnWriteArrayList


書き込みブロック、非読み取りブロックのリスト。 変更すると、メモリ内に配列の新しいインスタンスが作成されます。



CopyOnWriteArraySet


値なしのCopyOnWriteArrayList







遅延キュー


PriorityBlockingQueue



、特定の遅延(オブジェクトのDelayed



インターフェイスを介して宣言された遅延)の後にのみ要素を受信できるようにします。 DelayQueue



を使用して、スケジューラを実装できます。 キューの容量は固定されていません。



LinkedBlockingDeque


接続性に基づいた双方向BlockingQueue



(キャッシュミスとキャッシュコヒーレンスのオーバーヘッド)。 キューの容量は固定されていません。



LinkedBlockingQueue


接続性に基づく単方向BlockingQueue



(キャッシュミスとキャッシュコヒーレンスのオーバーヘッド)。 キューの容量は固定されていません。



LinkedTransferQueue


接続性に基づく単方向の `BlockingQueue`(キャッシュミスとキャッシュコヒーレンスのオーバーヘッド)。 キューの容量は固定されていません。 このキューにより、ハンドラーが要素を取得するのを待つことができます。



PriorityBlockingQueue


(要素の比較を通じて)メッセージに優先順位を付けることができる、単方向の `BlockingQueue`。 null値を禁止します。



SynchronousQueue


put()



メソッドのtransfer()



ロジックを実装する単方向の `BlockingQueue`。



同期ポイント


CountDownLatch


countDown()



への特定の(またはそれ以上の)呼び出し回数を期待するバリア( await()



countDown()



。 バリアの状態はリセットできません。



CyclicBarrier


他のスレッドによる特定の数のawait()



呼び出しを期待するバリア( await()



)。 スレッドの数が指定に達すると、オプションのコールバックが呼び出され、ロックが解除されます。 バリアは、保留中のフローが解放されて再利用できるようになると、状態を初期状態にリセットします。



交換機


2つのスレッドを同期するためのバリア( `exchange()`)。 同期時には、スレッド間でオブジェクトを揮発的に転送できます。



フェイザー


拡張機能「CyclicBarrier」。バリアの各サイクルの参加者を登録および削除できます。



セマフォ


指定された数のスレッドのみがモニターをキャプチャできるようにするバリア。 基本的に、「ロック」の機能をブロック内の機能を複数のスレッドに拡張します。



執行者




ExecutorService



new Thread(runnable)



を置き換えて、スレッドでの作業を簡素化しました。 ExecutorService



は、解放されたスレッドを再利用し、スレッドプールのタスクからキューを編成し、タスクの結果をサブスクライブするのに役立ちます。 Runnable



インターフェースの代わりに、プールはCallable



インターフェースを使用します(結果を返し、エラーをスローできます)。



 ExecutorService pool = Executors.newFixedThreadPool(4) Future future = pool.submit(new Callable() { Object call() throws Exception { println("In thread") return "From thread" } }) println("From main") println(future.get()) try { pool.submit(new Callable() { Object call() throws Exception { throw new IllegalStateException() } }).get() } catch (ExecutionException e) {println("Got it: ${e.cause}")} pool.shutdown()
      
      







invokeAll



メソッドは、すべてのタスクの完了時にのみ呼び出しスレッドに制御を渡します。 invokeAny



メソッドは、最初に正常に完了したタスクの結果を返し、後続のタスクをすべてキャンセルします。



ThreadPoolExecutor


プール内の作業スレッドとスレッドの最大数、タスクのキューを指定する機能を備えたスレッドのプール。



ScheduledThreadPoolExecutor


ThreadPoolExecutor



の機能を拡張し、遅延または定期的にタスクを実行します。



ThreadPoolExecutor


自己複製タスク用の軽いスレッドプール。 プールは、親の子タスクの `fork()`および `join()`メソッドの呼び出しを期待しています。



 class LNode { List<LNode> childs = [] def object } class Finder extends RecursiveTask<LNode> { LNode node Object expect protected LNode compute() { if (node?.object?.equals(expect)) { return node } node?.childs?.collect { new Finder(node: it, expect: expect).fork() }?.collect { it.join() }?.find { it != null } } } ForkJoinPool es = new ForkJoinPool() def invoke = es.invoke(new Finder( node: new LNode( childs: [ new LNode(object: "ivalid"), new LNode( object: "ivalid", childs: [new LNode(object: "test")] ) ] ), expect: "test" )) print("${invoke?.object}")
      
      







アキュムレータ_jdk 1.8_


バッテリーを使用すると、CASを使用せずにマルチスレッド環境で数値要素に対してプリミティブ操作(合計/最大値の検索)を実行できます。



All Articles