並列処理の主な概念の分析

みんなのコーヒー!



明日は、ほぼ記念日ストリームの「Java開発者」コースをスムーズに開始します-昨年4月以来6回連続です。 そしてこれは、私たちがあなたと共有する最も興味深い資料を再び選択し、翻訳したことを意味します。



行こう!



このガイドは、マルチスレッドプログラムを使用するJava開発者が、並行性の基本概念とその使用方法を理解するのに役立ちます。 標準ライブラリへのリンクを使用して、Java言語の重要な側面について学習します。



セクション1



エントリー



Javaは当初から、スレッドやロックなどの主要な並行性の概念をサポートしています。 このガイドは、マルチスレッドプログラムを使用するJava開発者が、並行性の基本概念とその使用方法を理解するのに役立ちます。



セクション2



コンセプトの



コンセプト 説明
原子性(原子性) アトミック操作は、完全に実行されるかまったく実行されない操作であり、部分的な実行は不可能です。
可視性 あるスレッドが別のスレッドによって行われた変更を見る条件


表1:同時実行性の概念







競合状態



競合状態は、同じリソースが複数のスレッドで同時に使用される場合に発生し、各スレッドのアクションの順序によっては、いくつかの結果が得られる場合があります。 以下のコードはスレッドセーフではなく、フィールドを遅延初期化するcheck-then-act



null



を確認してから初期化する)はアトミックではないため、 value



変数は複数回初期化できます。



 class Lazy <T> { private volatile T value; T get() { if (value == null) value = initialize(); return value; } }
      
      





データ競合



データの競合は、2つ以上のスレッドが同期せずに同じ非最終変数にアクセスしようとしたときに発生します。 同期の欠如は他のスレッドには見えない変更をもたらし、古いデータの読み取りを引き起こし、無限ループ、データ構造の損傷、または不正確な計算を引き起こす可能性があります。 このコードは、読み取りストリームがスレッドの書き換えによる変更に気付かない場合があるため、無限ループにつながる可能性があります。



 class Waiter implements Runnable { private boolean shouldFinish; void finish() { shouldFinish = true; } public void run() { long iteration = 0; while (!shouldFinish) { iteration++; } System.out.println("Finished after: " + iteration); } } class DataRace { public static void main(String[] args) throws InterruptedException { Waiter waiter = new Waiter(); Thread waiterThread = new Thread(waiter); waiterThread.start(); waiter.finish(); waiterThread.join(); } }
      
      





セクション3



Javaメモリモデル:偶然の関係



Javaメモリモデルは、フィールドの読み取り/書き込みやモニターでの同期などのアクションに関して定義されます。 アクションは、事前に発生する関係(以前に実行される)を使用して順序付けられます。これは、スレッドが別のスレッドのアクションの結果を確認したタイミングと、正しく同期されたプログラムを説明するために使用できます。



HAPPENS-BEFOREリレーションズには以下のプロパティがあります:





イメージ1では、 Action X



Action Y



前に発生するため、 Thread 2



では、 Action Y



の右側にあるすべての操作に、 Thread 1



Action X



の左側にあるすべての操作が表示されます。





画像1:前の例





セクション4



標準同期機能



synchronized



キーワード




synchronized



、異なるスレッドが同じコードブロックを同時に実行しないようにするために使用されます。 (同期ブロックに入ることで)ロックを受け取った場合、このロックが適用されるデータは排他モードで処理されるため、操作はアトミックと見なされます。 また、同じロックを受け取った後、他のスレッドが操作の結果を確認できるようにします。



 class AtomicOperation { private int counter0; private int counter1; void increment() { synchronized (this) { counter0++; counter1++; } } }
      
      





synchronizedキーワードは、メソッドレベルで展開することもできます。



方法の種類 モニターとして使用されるリンク
静的 Classオブジェクトへの参照<?>
非静的 このリンク


表2:メソッド全体が同期されるときに使用されるモニター



ロックは再入可能であるため、スレッドにすでにロックが含まれている場合は、再びロックを正常に取得できます。



 class Reentrantcy { synchronized void doAll() { doFirst(); doSecond(); } synchronized void doFirst() { System.out.println("First operation is successful."); } synchronized void doSecond() { System.out.println("Second operation is successful."); } }
      
      





競合のレベルは、モニターのキャプチャ方法に影響します。



状態 説明
初期化 誰も捕獲されなくなるまで作成されました。
偏った 戦いはなく、ロックによって保護されたコードは1つのスレッドのみによって実行されます。 キャプチャするのに最も安い。
薄い モニターは、戦いなしで複数のスレッドによってキャプチャされます。 ロックには比較的安価なCASが使用されます。
太い 闘争があります。 JVMはOSミューテックスを要求し、OSスケジューラがスレッドパーキングとウェイクアップを処理できるようにします。


表3:モニターステータス



wait/notify







wait/notify/notifyAll



は、 Object



クラスで宣言されています。 wait



、スレッドを強制的にWAITING



またはTIMED_WAITING



に移行させるために使用されます(タイムアウト値が渡された場合)。 スレッドを起動するには、次のアクションのいずれかを実行できます。





最も一般的な例は条件付きループです。



 class ConditionLoop { private boolean condition; synchronized void waitForCondition() throws InterruptedException { while (!condition) { wait(); } } synchronized void satisfyCondition() { condition = true; notifyAll(); } }
      
      







キーワードvolatile







volatile



可視性の問題を解決し、値の変更をアトミックにします。これは、発生前の関係があるためです。volatile変数への書き込みは、volatile変数の後続の読み取りの前に発生します。 したがって、次回フィールドが読み取られるときに、最新のレコードによって設定された値が表示されるようになります。



 class VolatileFlag implements Runnable { private volatile boolean shouldStop; public void run() { while (!shouldStop) { //do smth } System.out.println("Stopped."); } void stop() { shouldStop = true; } public static void main(String[] args) throws InterruptedException { VolatileFlag flag = new VolatileFlag(); Thread thread = new Thread(flag); thread.start(); flag.stop(); thread.join(); } }
      
      





原子性



java.util.concurrent.atomic



パッケージには、 volatile



ように、ロックせずに単一の値で複合アトミックアクションをサポートするクラスのセットが含まれています。



AtomicXXXクラスを使用して、アトミックcheck-then-act



ザントcheck-then-act



操作を実装できます。



 class CheckThenAct { private final AtomicReference<String> value = new AtomicReference<>(); void initialize() { if (value.compareAndSet(null, "Initialized value")) { System.out.println("Initialized only once."); } } }
      
      





AtomicInteger



AtomicLong



両方にアトミックなインクリメント/デクリメント操作があります:



 class Increment { private final AtomicInteger state = new AtomicInteger(); void advance() { int oldState = state.getAndIncrement(); System.out.println("Advanced: '" + oldState + "' -> '" + (oldState + 1) + "'."); } }
      
      





カウンターが必要で、アトミックに値を取得する必要がない場合は、 AtomicLong/AtomicInteger



代わりにAtomicLong/AtomicInteger



使用を検討してAtomicLong/AtomicInteger



LongAdder



は、いくつかのセルの値を処理し、必要に応じてその数を増やします。したがって、競争が激しい場合に効果的です。



ThreadLocal







データをストリームに保存し、ロックをオプションにする1つの方法は、 ThreadLocal



ストレージを使用することです。 概念的に、 ThreadLocal



は、各スレッドが独自のバージョンの変数を持っているかのように機能します。 ThreadLocal



通常、「現在のトランザクション」などの各スレッドの値やその他のリソースをキャプチャするために使用されます。 さらに、フローカウンタ、統計、または識別子ジェネレータを維持するために使用されます。



 class TransactionManager { private final ThreadLocal<Transaction> currentTransaction = ThreadLocal.withInitial(NullTransaction::new); Transaction currentTransaction() { Transaction current = currentTransaction.get(); if (current.isNull()) { current = new TransactionImpl(); currentTransaction.set(current); } return current; } }
      
      





セクション5



安全な公開



オブジェクトを公開すると、現在のスコープ外でリンクが利用可能になります(たとえば、ゲッターからリンクを返す)。 オブジェクトを安全に公開するには(完全に作成された場合のみ)、同期が必要になる場合があります。 パブリケーションのセキュリティは、次を使用して実現できます。





 class StaticInitializer { //       public static final Year year = Year.of(2017); public static final Set<String> keywords; //        static { //    Set<String> keywordsSet = new HashSet<>(); //   keywordsSet.add("java"); keywordsSet.add("concurrency"); //    keywords = Collections.unmodifiableSet(keywordsSet); } }
      
      







 class Volatile { private volatile String state; void setState(String state) { this.state = state; } String getState() { return state; } }
      
      







 class Atomics { private final AtomicInteger state = new AtomicInteger(); void initializeState(int state) { this.state.compareAndSet(0, state); } int getState() { return state.get(); } }
      
      







 class Final { private final String state; Final(String state) { this.state = state; } String getState() { return state; } }
      
      





このリンクが作成中に蒸発しないことを確認してください。



 class ThisEscapes { private final String name; ThisEscapes(String name) { Cache.putIntoCache(this); this.name = name; } String getName() { return name; } } class Cache { private static final Map<String, ThisEscapes> CACHE = new ConcurrentHashMap<>(); static void putIntoCache(ThisEscapes thisEscapes) { // 'this'   ,    . CACHE.putIfAbsent(thisEscapes.getName(), thisEscapes); } }
      
      







 class Synchronization { private String state; synchronized String getState() { if (state == null) state = "Initial"; return state; } }
      
      





セクション6



不変オブジェクト



不変オブジェクトの最も注目すべき特性の1つはスレッドの安全性です 。したがって、オブジェクトの同期は不要です。 不変オブジェクトの要件:





不変オブジェクトの例:



 //   final -   public final class Artist { //  ,  final private final String name; //   , final  private final List<Track> tracks; public Artist(String name, List<Track> tracks) { this.name = name; //   List<Track> copy = new ArrayList<>(tracks); //      this.tracks = Collections.unmodifiableList(copy); // 'this'       } // Getters, equals, hashCode, toString } //  final -   public final class Track { // ,  final private final String title; public Track(String title) { this.title = title; } // Getters, equals, hashCode, toString }
      
      





セクション7



ストリーム



java.lang.Thread



クラスは、アプリケーションまたはJVMスレッドを表すために使用されます。 コードは、常にいくつかのThreadクラスのコンテキストで実行されます(現在のスレッドを取得するには、 Thread#currentThread()).



使用できますThread#currentThread()).







状態 説明
新品 開始しませんでした。
実行可能 稼働しています。
ブロックされた モニターで待機中-彼はロックを取得して、クリティカルセクションに入ろうとしています。
待っています 特定のアクションが別のスレッドによって実行されるのを待っています(notify / notifyAll、LockSupport#unpark)。
TIMED_WAITING WAITINGと同じですが、タイムアウトがあります。
終了しました 止まった


表4:ストリームの状態



ストリーム方式 説明
始める Threadクラスのインスタンスを開始し、run()メソッドを実行します。
参加する ストリームの終わりまでブロックします。
割り込む ストリームを中断します。 割り込みに応答するメソッドでスレッドがブロックされている場合、別のスレッドでInterruptedExceptionがスローされます。そうでない場合は、割り込みステータスが設定されます。
停止、一時停止、再開、破棄 これらの方法はすべて時代遅れです。 問題のフローの状態に応じて、危険な操作を実行します。 代わりに、Thread#interrupt()またはvolatileフラグを使用して、スレッドに何をすべきかを伝えます


表5:スレッド調整方法スレッド調整方法



InterruptedExceptionを処理する方法は?





予期しない例外の処理



UncaughtExceptionHandler



UncaughtExceptionHandler



が示される場合があり、スレッドが中断されるため、キャッチされない例外の通知を受け取ります。



 Thread thread = new Thread(runnable); thread.setUncaughtExceptionHandler((failedThread, exception) -> { logger.error("Caught unexpected exception in thread '{}'.", failedThread.getName(), exception); }); thread.start();
      
      





セクション8



活力(活力)



Deadlock







Deadlock



、またはデッドロックは、複数のスレッドがあり、それぞれが別のスレッドに属するリソースを予期している場合に発生します。そのため、リソースとそれらを待機しているスレッドからループが形成されます。 最も明白なタイプのリソースはオブジェクトモニターですが、ロックを引き起こすリソース(たとえば、 wait/notify



)も適切です。



潜在的なデッドロックの例:



 class Account { private long amount; void plus(long amount) { this.amount += amount; } void minus(long amount) { if (this.amount < amount) throw new IllegalArgumentException(); else this.amount -= amount; } static void transferWithDeadlock(long amount, Account first, Account second){ synchronized (first) { synchronized (second) { first.minus(amount); second.plus(amount); } } } }
      
      





同時に次の場合、相互ロックが発生します。





デッドロックを防ぐ方法:





 class Account { private long id; private long amount; //    static void transferWithLockOrdering(long amount, Account first, Account second){ boolean lockOnFirstAccountFirst = first.id < second.id; Account firstLock = lockOnFirstAccountFirst ? first : second; Account secondLock = lockOnFirstAccountFirst ? second : first; synchronized (firstLock) { synchronized (secondLock) { first.minus(amount); second.plus(amount); } } } }
      
      







 class Account { private long amount; //    static void transferWithTimeout( long amount, Account first, Account second, int retries, long timeoutMillis ) throws InterruptedException { for (int attempt = 0; attempt < retries; attempt++) { if (first.lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) { try { if (second.lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) { try { first.minus(amount); second.plus(amount); } finally { second.lock.unlock(); } } } finally { first.lock.unlock(); } } } } }
      
      





JVMはモニターの相互ロックを検出し、モニターに関する情報をストリームダンプに表示できます。



LivelockおよびStream Fasting



ライブロックは、スレッドがリソースへのアクセスのネゴシエーションに全時間を費やすか、スレッドが実際に前進しないようにデッドロックを発見して回避するときに発生します。 絶食は、スレッドが長期間ブロックし続けると発生するため、一部のスレッドは進行せずに「star死」します。



セクション9



java.util.concurrent







スレッドプール



スレッドプールのメインインターフェイスはExecutorService.java.util.concurrent



また、最も一般的な構成でスレッドプールを作成するためのファクトリメソッドを含む静的Executorsファクトリも提供します。



方法 説明
newSingleThreadExecutor 1つのスレッドのみを持つExecutorServiceを返します。
newFixedThreadPool スレッドの数が固定されたExecutorServiceを返します。
newCachedThreadPool さまざまなサイズのスレッドのプールを持つExecutorServiceを返します。
newSingleThreadScheduledExecutor 単一のスレッドでScheduledExecutorServiceを返します。
newScheduledThreadPool スレッドのメインセットを含むScheduledExecutorServiceを返します。
newWorkStealingPool キャッシングタスクExecutorServiceを返します。


表6:静的ファクトリメソッド



スレッドプールのサイズを決定するとき、アプリケーションが実行されているマシンの論理コアの数のサイズを決定することはしばしば有用です。 Runtime.getRuntime().AvailableProcessors()



呼び出すことにより、Javaでこの値を取得できます。



実装 説明
ThreadPoolExecutor デフォルトの実装では、サイズ変更スレッドプール、1つの作業キュー、拒否されたタスク(RejectedExecutionHandler経由)およびスレッド作成(ThreadFactory経由)のカスタムポリシーが使用されます。
ScheduledThreadPoolExecutor 定期的なタスクをスケジュールする機能を提供する拡張ThreadPoolExecutor。
フォークジョインプール タスクを盗むプール:プール内のすべてのスレッドは、割り当てられたタスクまたは他のアクティブなタスクによって作成されたタスクを見つけて実行しようとします。


表7:スレッドプールの実装



タスクは、 ExecutorService#submit



ExecutorService#invokeAll



またはExecutorService#invokeAny



を使用して送信されます。これらには、さまざまなタイプのタスク用のオーバーロードがいくつかあります。



インターフェース 説明
実行可能 戻り値のないタスクを表します。
呼び出し可能 戻り値を持つ計算を表します。 また、元の例外をスローするため、チェック済み例外のラッパーは必要ありません。


表8:機能タスクインターフェイス



Future







Future



は非同期コンピューティングの抽象化です。 計算結果または例外のいずれかで、いつでも利用可能な計算の結果を表します。 ほとんどのExecutorService



メソッドは、 Future



を戻り型として使用します。 将来の現在の状態を調べるためのメソッドを提供するか、結果が利用可能になるまでブロックします。



 ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<String> future = executorService.submit(() -> "result"); try { String result = future.get(1L, TimeUnit.SECONDS); System.out.println("Result is '" + result + "'."); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e.getCause()); } catch (TimeoutException e) { throw new RuntimeException(e); } assert future.isDone();
      
      





ロック



Lock







java.util.concurrent.locks



パッケージには、標準のLock



インターフェイスがあります。 ReentrantLock



実装は、synchronizedキーワードの機能を複製しますが、ロックステータス情報の取得、非ブロッキングtryLock()



ロックの中断などの追加機能も提供します。 ReentrantLockの明示的なインスタンスの使用例:



 class Counter { private final Lock lock = new ReentrantLock(); private int value; int increment() { lock.lock(); try { return ++value; } finally { lock.unlock(); } } }
      
      





ReadWriteLock



java.util.concurrent.locks



パッケージには、ReadWriteLockインターフェイス(およびReentrantReadWriteLock実装)も含まれています。これは、読み取りと書き込みのロックのペアによって決定され、通常は複数のリーダーが同時に読み取りを許可しますが、ライターは1人のみ許可します。



 class Statistic { private final ReadWriteLock lock = new ReentrantReadWriteLock(); private int value; void increment() { lock.writeLock().lock(); try { value++; } finally { lock.writeLock().unlock(); } } int current() { lock.readLock().lock(); try { return value; } finally { lock.readLock().unlock(); } } }
      
      





CountDownLatch







CountDownLatch



. await()



, , 0. ( ) countDown()



, . , 0. , .



CompletableFuture







CompletableFuture



. Future, — , , , . ( CompletableFuture#supplyAsync/runAsync



), ( *async



) , ( ForkJoinPool#commonPool



).



, CompletableFuture



, , *async



, .



future



, CompletableFuture#allOf



, future



, , future



, CompletableFuture#anyOf



, , - future



.



 ExecutorService executor0 = Executors.newWorkStealingPool(); ExecutorService executor1 = Executors.newWorkStealingPool(); //,   future  CompletableFuture<String> waitingForAll = CompletableFuture .allOf( CompletableFuture.supplyAsync(() -> "first"), CompletableFuture.supplyAsync(() -> "second", executor1) ) .thenApply(ignored -> " is completed."); CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Concurrency Refcard", executor0) //    .thenApply(result -> "Java " + result) //   .thenApplyAsync(result -> "Dzone " + result, executor1) //,     future  .thenCombine(waitingForAll, (first, second) -> first + second) //  ForkJoinPool#commonPool   .thenAcceptAsync(result -> { System.out.println("Result is '" + result + "'."); }) //  .whenComplete((ignored, exception) -> { if (exception != null) exception.printStackTrace(); }); //   - ,     . future.join(); future //    (  ). .thenRun(() -> System.out.println("Current thread is '" + Thread.currentThread().getName() + "'.")) //  ForkJoinPool#commonPool   .thenRunAsync(() -> System.out.println("Current thread is '" + Thread.currentThread().getName() + "'."));
      
      









Collections#synchronized*



. , java.util.concurrent



, .







実装 説明
CopyOnWriteArrayList , ( , ). .


9: java.util.concurrent







説明
ConcurrentHashMap -. , , . CAS- ( ), ( ).
ConcurrentSkipListMap Map, TreeMap. TreeMap, , .


10: java.util.concurrent











説明
CopyOnWriteArraySet CopyOnWriteArrayList, copy-on-write Set.
ConcurrentSkipListSet ConcurrentSkipListMap, Set.


11: java.util.concurrent







Map:



 Set<T> concurrentSet = Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>());
      
      









«» «». « , » (FIFO). BlockingQueue



Queue



, , , ( ) ( ). BlockingQueue



, , , - .



実装 説明
ConcurrentLinkedQueue , .
LinkedBlockingQueue , .
PriorityBlockingQueue , . , Comparator, ( FIFO).
DelayQueue , . , .
SynchronousQueue -, , . , . .


12: java.util.concurrent







終わり



.



ありがとう



All Articles