明日は、ほぼ記念日ストリームの「Java開発者」コースをスムーズに開始します-昨年4月以来6回連続です。 そしてこれは、私たちがあなたと共有する最も興味深い資料を再び選択し、翻訳したことを意味します。
行こう!
このガイドは、マルチスレッドプログラムを使用するJava開発者が、並行性の基本概念とその使用方法を理解するのに役立ちます。 標準ライブラリへのリンクを使用して、Java言語の重要な側面について学習します。
セクション1
エントリー
Javaは当初から、スレッドやロックなどの主要な並行性の概念をサポートしています。 このガイドは、マルチスレッドプログラムを使用するJava開発者が、並行性の基本概念とその使用方法を理解するのに役立ちます。
セクション2
コンセプトの
コンセプト | 説明 |
---|---|
| アトミック操作は、完全に実行されるかまったく実行されない操作であり、部分的な実行は不可能です。 |
可視性 | あるスレッドが別のスレッドによって行われた変更を見る条件 |
表1:同時実行性の概念

競合状態
競合状態は、同じリソースが複数のスレッドで同時に使用される場合に発生し、各スレッドのアクションの順序によっては、いくつかの結果が得られる場合があります。 以下のコードはスレッドセーフではなく、フィールドを遅延初期化する
check-then-act
(
null
を確認してから初期化する)はアトミックではないため、
value
変数は複数回初期化できます。
class Lazy <T> { private volatile T value; T get() { if (value == null) value = initialize(); return value; } }
データ競合
データの競合は、2つ以上のスレッドが同期せずに同じ非最終変数にアクセスしようとしたときに発生します。 同期の欠如は、他のスレッドには見えない変更をもたらし、古いデータの読み取りを引き起こし、無限ループ、データ構造の損傷、または不正確な計算を引き起こす可能性があります。 このコードは、読み取りストリームがスレッドの書き換えによる変更に気付かない場合があるため、無限ループにつながる可能性があります。
class Waiter implements Runnable { private boolean shouldFinish; void finish() { shouldFinish = true; } public void run() { long iteration = 0; while (!shouldFinish) { iteration++; } System.out.println("Finished after: " + iteration); } } class DataRace { public static void main(String[] args) throws InterruptedException { Waiter waiter = new Waiter(); Thread waiterThread = new Thread(waiter); waiterThread.start(); waiter.finish(); waiterThread.join(); } }
セクション3
Javaメモリモデル:偶然の関係
Javaメモリモデルは、フィールドの読み取り/書き込みやモニターでの同期などのアクションに関して定義されます。 アクションは、事前に発生する関係(以前に実行される)を使用して順序付けられます。これは、スレッドが別のスレッドのアクションの結果を確認したタイミングと、正しく同期されたプログラムを説明するために使用できます。
HAPPENS-BEFOREリレーションズには以下のプロパティがあります:
- スレッド#startの呼び出しは、このスレッドのアクションの前に発生します。
- モニターは、同じモニターの後続のキャプチャの前に戻ります。
- 揮発性変数への書き込みは、その後の揮発性変数の読み取りの前に発生します。
- 最後の変数への書き込みは、オブジェクトリンクの発行前に発生します。
- スレッド内のすべてのアクションは、このスレッドのスレッド#joinから戻るまで実行されます。
イメージ1では、
Action X
は
Action Y
前に発生するため、
Thread 2
では、
Action Y
の右側にあるすべての操作に、
Thread 1
Action X
の左側にあるすべての操作が表示されます。

画像1:前の例
セクション4
標準同期機能
synchronized
キーワード
synchronized
、異なるスレッドが同じコードブロックを同時に実行しないようにするために使用されます。 (同期ブロックに入ることで)ロックを受け取った場合、このロックが適用されるデータは排他モードで処理されるため、操作はアトミックと見なされます。 また、同じロックを受け取った後、他のスレッドが操作の結果を確認できるようにします。
class AtomicOperation { private int counter0; private int counter1; void increment() { synchronized (this) { counter0++; counter1++; } } }
synchronizedキーワードは、メソッドレベルで展開することもできます。
| モニターとして使用されるリンク |
---|---|
静的 | Classオブジェクトへの参照<?> |
非静的 | このリンク |
表2:メソッド全体が同期されるときに使用されるモニター
ロックは再入可能であるため、スレッドにすでにロックが含まれている場合は、再びロックを正常に取得できます。
class Reentrantcy { synchronized void doAll() { doFirst(); doSecond(); } synchronized void doFirst() { System.out.println("First operation is successful."); } synchronized void doSecond() { System.out.println("Second operation is successful."); } }
競合のレベルは、モニターのキャプチャ方法に影響します。
| 説明 |
---|---|
初期化 | 誰も捕獲されなくなるまで作成されました。 |
偏った | 戦いはなく、ロックによって保護されたコードは1つのスレッドのみによって実行されます。 キャプチャするのに最も安い。 |
薄い | モニターは、戦いなしで複数のスレッドによってキャプチャされます。 ロックには比較的安価なCASが使用されます。 |
太い | 闘争があります。 JVMはOSミューテックスを要求し、OSスケジューラがスレッドパーキングとウェイクアップを処理できるようにします。 |
表3:モニターステータス
wait/notify
wait/notify/notifyAll
は、
Object
クラスで宣言されています。
wait
、スレッドを強制的に
WAITING
または
TIMED_WAITING
に移行させるために使用されます(タイムアウト値が渡された場合)。 スレッドを起動するには、次のアクションのいずれかを実行できます。
- 別のスレッドがnotifyを呼び出すと、モニターで待機している任意のスレッドが起動します。
- 別のスレッドがnotifyAllを呼び出し、モニターで待機しているすべてのスレッドを起動します。
- スレッド#割り込みが呼び出されます。 この場合、InterruptedExceptionがスローされます。
最も一般的な例は条件付きループです。
class ConditionLoop { private boolean condition; synchronized void waitForCondition() throws InterruptedException { while (!condition) { wait(); } } synchronized void satisfyCondition() { condition = true; notifyAll(); } }
- オブジェクトに対して
wait/notify/notifyAll
を使用するには、最初にオブジェクトをロックする必要があることに注意してwait/notify/notifyAll
。 - ループ内で常に待機して、完了が予想される状態を確認してください。 これは、待機の開始前に別のスレッドが条件を満たしている場合の同期の問題に関係します。 さらに、これにより、発生する可能性がある(および発生する)付随的な目覚めからコードが保護されます。
- notify / notifyAllを呼び出す前に、常に待機条件を満たしていることを確認してください。 そうしないと、通知が発生しますが、スレッドは待機ループをエスケープできません。
キーワード
volatile
volatile
は可視性の問題を解決し、値の変更をアトミックにします。これは、発生前の関係があるためです。volatile変数への書き込みは、volatile変数の後続の読み取りの前に発生します。 したがって、次回フィールドが読み取られるときに、最新のレコードによって設定された値が表示されるようになります。
class VolatileFlag implements Runnable { private volatile boolean shouldStop; public void run() { while (!shouldStop) { //do smth } System.out.println("Stopped."); } void stop() { shouldStop = true; } public static void main(String[] args) throws InterruptedException { VolatileFlag flag = new VolatileFlag(); Thread thread = new Thread(flag); thread.start(); flag.stop(); thread.join(); } }
原子性
java.util.concurrent.atomic
パッケージには、
volatile
ように、ロックせずに単一の値で複合アトミックアクションをサポートするクラスのセットが含まれています。
AtomicXXXクラスを使用して、アトミック
check-then-act
ザント
check-then-act
操作を実装できます。
class CheckThenAct { private final AtomicReference<String> value = new AtomicReference<>(); void initialize() { if (value.compareAndSet(null, "Initialized value")) { System.out.println("Initialized only once."); } } }
AtomicInteger
と
AtomicLong
両方にアトミックなインクリメント/デクリメント操作があります:
class Increment { private final AtomicInteger state = new AtomicInteger(); void advance() { int oldState = state.getAndIncrement(); System.out.println("Advanced: '" + oldState + "' -> '" + (oldState + 1) + "'."); } }
カウンターが必要で、アトミックに値を取得する必要がない場合は、
AtomicLong/AtomicInteger
代わりに
AtomicLong/AtomicInteger
使用を検討して
AtomicLong/AtomicInteger
。
LongAdder
は、いくつかのセルの値を処理し、必要に応じてその数を増やします。したがって、競争が激しい場合に効果的です。
ThreadLocal
データをストリームに保存し、ロックをオプションにする1つの方法は、
ThreadLocal
ストレージを使用することです。 概念的に、
ThreadLocal
は、各スレッドが独自のバージョンの変数を持っているかのように機能します。
ThreadLocal
通常、「現在のトランザクション」などの各スレッドの値やその他のリソースをキャプチャするために使用されます。 さらに、フローカウンタ、統計、または識別子ジェネレータを維持するために使用されます。
class TransactionManager { private final ThreadLocal<Transaction> currentTransaction = ThreadLocal.withInitial(NullTransaction::new); Transaction currentTransaction() { Transaction current = currentTransaction.get(); if (current.isNull()) { current = new TransactionImpl(); currentTransaction.set(current); } return current; } }
セクション5
安全な公開
オブジェクトを公開すると、現在のスコープ外でリンクが利用可能になります(たとえば、ゲッターからリンクを返す)。 オブジェクトを安全に公開するには(完全に作成された場合のみ)、同期が必要になる場合があります。 パブリケーションのセキュリティは、次を使用して実現できます。
- 静的初期化子。 クラスの初期化は排他ロック下で実行されるため、静的変数を初期化できるスレッドは1つだけです。
class StaticInitializer { // public static final Year year = Year.of(2017); public static final Set<String> keywords; // static { // Set<String> keywordsSet = new HashSet<>(); // keywordsSet.add("java"); keywordsSet.add("concurrency"); // keywords = Collections.unmodifiableSet(keywordsSet); } }
- 揮発性フィールド。 読み取りストリームは常に最後の値を読み取ります。これは、揮発性変数への書き込みが後続の読み取りの前(発生前)に発生するためです。
class Volatile { private volatile String state; void setState(String state) { this.state = state; } String getState() { return state; } }
- 原子性。 たとえば、
AtomicInteger
は値をvolatileフィールドに格納するため、volatile変数の規則もここで適用できます。
class Atomics { private final AtomicInteger state = new AtomicInteger(); void initializeState(int state) { this.state.compareAndSet(0, state); } int getState() { return state.get(); } }
- 最終フィールド。
class Final { private final String state; Final(String state) { this.state = state; } String getState() { return state; } }
このリンクが作成中に蒸発しないことを確認してください。
class ThisEscapes { private final String name; ThisEscapes(String name) { Cache.putIntoCache(this); this.name = name; } String getName() { return name; } } class Cache { private static final Map<String, ThisEscapes> CACHE = new ConcurrentHashMap<>(); static void putIntoCache(ThisEscapes thisEscapes) { // 'this' , . CACHE.putIfAbsent(thisEscapes.getName(), thisEscapes); } }
- 正しく同期されたフィールド。
class Synchronization { private String state; synchronized String getState() { if (state == null) state = "Initial"; return state; } }
セクション6
不変オブジェクト
不変オブジェクトの最も注目すべき特性の1つはスレッドの安全性です 。したがって、オブジェクトの同期は不要です。 不変オブジェクトの要件:
- すべてのフィールドは最終フィールドです。
- すべてのフィールドは、可変または不変のオブジェクトである必要がありますが、オブジェクトの境界を超えてはならないため、作成後にオブジェクトの状態を変更することはできません。
- このリンクは作成時に消えません。
- クラスは最終クラスなので、サブクラスでその動作を再定義することはできません。
不変オブジェクトの例:
// final - public final class Artist { // , final private final String name; // , final private final List<Track> tracks; public Artist(String name, List<Track> tracks) { this.name = name; // List<Track> copy = new ArrayList<>(tracks); // this.tracks = Collections.unmodifiableList(copy); // 'this' } // Getters, equals, hashCode, toString } // final - public final class Track { // , final private final String title; public Track(String title) { this.title = title; } // Getters, equals, hashCode, toString }
セクション7
ストリーム
java.lang.Thread
クラスは、アプリケーションまたはJVMスレッドを表すために使用されます。 コードは、常にいくつかのThreadクラスのコンテキストで実行されます(現在のスレッドを取得するには、
Thread#currentThread()).
使用できます
Thread#currentThread()).
| 説明 |
---|---|
新品 | 開始しませんでした。 |
| 稼働しています。 |
ブロックされた | モニターで待機中-彼はロックを取得して、クリティカルセクションに入ろうとしています。 |
待っています | 特定のアクションが別のスレッドによって実行されるのを待っています(notify / notifyAll、LockSupport#unpark)。 |
| WAITINGと同じですが、タイムアウトがあります。 |
終了しました | 止まった |
表4:ストリームの状態
| 説明 |
---|---|
始める | Threadクラスのインスタンスを開始し、run()メソッドを実行します。 |
参加する | ストリームの終わりまでブロックします。 |
割り込む | ストリームを中断します。 割り込みに応答するメソッドでスレッドがブロックされている場合、別のスレッドでInterruptedExceptionがスローされます。そうでない場合は、割り込みステータスが設定されます。 |
停止、一時停止、再開、破棄 | これらの方法はすべて時代遅れです。 問題のフローの状態に応じて、危険な操作を実行します。 代わりに、Thread#interrupt()またはvolatileフラグを使用して、スレッドに何をすべきかを伝えます |
表5:スレッド調整方法スレッド調整方法
InterruptedExceptionを処理する方法は?
- 可能であれば、現在のレベルですべてのリソースをクリアし、スレッドを終了します。
- 現在のメソッドがInterruptedExceptionをスローすることを宣言します。
- メソッドがInterruptedExceptionを発生させない場合、Thread.currentThread()。Interrupt()を呼び出して、割り込みフラグをtrueにリセットし、このレベルでより適切な例外をスローする必要があります。 より高いレベルで割り込みを処理できるようにするには、trueフラグを返すことが非常に重要です。
予期しない例外の処理
UncaughtExceptionHandler
で
UncaughtExceptionHandler
が示される場合があり、スレッドが中断されるため、キャッチされない例外の通知を受け取ります。
Thread thread = new Thread(runnable); thread.setUncaughtExceptionHandler((failedThread, exception) -> { logger.error("Caught unexpected exception in thread '{}'.", failedThread.getName(), exception); }); thread.start();
セクション8
活力(活力)
Deadlock
Deadlock
、またはデッドロックは、複数のスレッドがあり、それぞれが別のスレッドに属するリソースを予期している場合に発生します。そのため、リソースとそれらを待機しているスレッドからループが形成されます。 最も明白なタイプのリソースはオブジェクトモニターですが、ロックを引き起こすリソース(たとえば、
wait/notify
)も適切です。
潜在的なデッドロックの例:
class Account { private long amount; void plus(long amount) { this.amount += amount; } void minus(long amount) { if (this.amount < amount) throw new IllegalArgumentException(); else this.amount -= amount; } static void transferWithDeadlock(long amount, Account first, Account second){ synchronized (first) { synchronized (second) { first.minus(amount); second.plus(amount); } } } }
同時に次の場合、相互ロックが発生します。
- 1つのスレッドが1つのアカウントから別のアカウントにデータを転送しようとしており、すでに最初のアカウントにロックをかけています。
- 別のスレッドが2番目のアカウントから最初のアカウントにデータを転送しようとしており、すでに2番目のアカウントにロックをかけています。
デッドロックを防ぐ方法:
- ロック順序-常に同じ順序でロックします。
class Account { private long id; private long amount; // static void transferWithLockOrdering(long amount, Account first, Account second){ boolean lockOnFirstAccountFirst = first.id < second.id; Account firstLock = lockOnFirstAccountFirst ? first : second; Account secondLock = lockOnFirstAccountFirst ? second : first; synchronized (firstLock) { synchronized (secondLock) { first.minus(amount); second.plus(amount); } } } }
- タイムアウト付きのブロック-ロックを適用するときに無期限にブロックしないでください。すべてのロックをできるだけ早く削除して再試行することをお勧めします。
class Account { private long amount; // static void transferWithTimeout( long amount, Account first, Account second, int retries, long timeoutMillis ) throws InterruptedException { for (int attempt = 0; attempt < retries; attempt++) { if (first.lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) { try { if (second.lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) { try { first.minus(amount); second.plus(amount); } finally { second.lock.unlock(); } } } finally { first.lock.unlock(); } } } } }
JVMはモニターの相互ロックを検出し、モニターに関する情報をストリームダンプに表示できます。
LivelockおよびStream Fasting
ライブロックは、スレッドがリソースへのアクセスのネゴシエーションに全時間を費やすか、スレッドが実際に前進しないようにデッドロックを発見して回避するときに発生します。 絶食は、スレッドが長期間ブロックし続けると発生するため、一部のスレッドは進行せずに「star死」します。
セクション9
java.util.concurrent
スレッドプール
スレッドプールのメインインターフェイスは
ExecutorService.java.util.concurrent
また、最も一般的な構成でスレッドプールを作成するためのファクトリメソッドを含む静的Executorsファクトリも提供します。
方法 | 説明 |
---|---|
newSingleThreadExecutor | 1つのスレッドのみを持つExecutorServiceを返します。 |
newFixedThreadPool | スレッドの数が固定されたExecutorServiceを返します。 |
newCachedThreadPool | さまざまなサイズのスレッドのプールを持つExecutorServiceを返します。 |
| 単一のスレッドでScheduledExecutorServiceを返します。 |
newScheduledThreadPool | スレッドのメインセットを含むScheduledExecutorServiceを返します。 |
newWorkStealingPool | キャッシングタスクExecutorServiceを返します。 |
表6:静的ファクトリメソッド
スレッドプールのサイズを決定するとき、アプリケーションが実行されているマシンの論理コアの数のサイズを決定することはしばしば有用です。
Runtime.getRuntime().AvailableProcessors()
呼び出すことにより、Javaでこの値を取得できます。
実装 | 説明 |
---|---|
ThreadPoolExecutor | デフォルトの実装では、サイズ変更スレッドプール、1つの作業キュー、拒否されたタスク(RejectedExecutionHandler経由)およびスレッド作成(ThreadFactory経由)のカスタムポリシーが使用されます。 |
| 定期的なタスクをスケジュールする機能を提供する拡張ThreadPoolExecutor。 |
フォークジョインプール | タスクを盗むプール:プール内のすべてのスレッドは、割り当てられたタスクまたは他のアクティブなタスクによって作成されたタスクを見つけて実行しようとします。 |
表7:スレッドプールの実装
タスクは、
ExecutorService#submit
、
ExecutorService#invokeAll
または
ExecutorService#invokeAny
を使用して送信されます。これらには、さまざまなタイプのタスク用のオーバーロードがいくつかあります。
| 説明 |
---|---|
実行可能 | 戻り値のないタスクを表します。 |
呼び出し可能 | 戻り値を持つ計算を表します。 また、元の例外をスローするため、チェック済み例外のラッパーは必要ありません。 |
表8:機能タスクインターフェイス
Future
Future
は非同期コンピューティングの抽象化です。 計算結果または例外のいずれかで、いつでも利用可能な計算の結果を表します。 ほとんどの
ExecutorService
メソッドは、
Future
を戻り型として使用します。 将来の現在の状態を調べるためのメソッドを提供するか、結果が利用可能になるまでブロックします。
ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<String> future = executorService.submit(() -> "result"); try { String result = future.get(1L, TimeUnit.SECONDS); System.out.println("Result is '" + result + "'."); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e.getCause()); } catch (TimeoutException e) { throw new RuntimeException(e); } assert future.isDone();
ロック
Lock
java.util.concurrent.locks
パッケージには、標準の
Lock
インターフェイスがあります。
ReentrantLock
実装は、synchronizedキーワードの機能を複製しますが、ロックステータス情報の取得、非ブロッキング
tryLock()
ロックの中断などの追加機能も提供します。 ReentrantLockの明示的なインスタンスの使用例:
class Counter { private final Lock lock = new ReentrantLock(); private int value; int increment() { lock.lock(); try { return ++value; } finally { lock.unlock(); } } }
ReadWriteLock
java.util.concurrent.locks
パッケージには、ReadWriteLockインターフェイス(およびReentrantReadWriteLock実装)も含まれています。これは、読み取りと書き込みのロックのペアによって決定され、通常は複数のリーダーが同時に読み取りを許可しますが、ライターは1人のみ許可します。
class Statistic { private final ReadWriteLock lock = new ReentrantReadWriteLock(); private int value; void increment() { lock.writeLock().lock(); try { value++; } finally { lock.writeLock().unlock(); } } int current() { lock.readLock().lock(); try { return value; } finally { lock.readLock().unlock(); } } }
CountDownLatch
CountDownLatch
.
await()
, , 0. ( )
countDown()
, . , 0. , .
CompletableFuture
CompletableFuture
. Future, — , , , . (
CompletableFuture#supplyAsync/runAsync
), (
*async
) , (
ForkJoinPool#commonPool
).
,
CompletableFuture
, ,
*async
, .
future
,
CompletableFuture#allOf
,
future
, ,
future
,
CompletableFuture#anyOf
, , -
future
.
ExecutorService executor0 = Executors.newWorkStealingPool(); ExecutorService executor1 = Executors.newWorkStealingPool(); //, future CompletableFuture<String> waitingForAll = CompletableFuture .allOf( CompletableFuture.supplyAsync(() -> "first"), CompletableFuture.supplyAsync(() -> "second", executor1) ) .thenApply(ignored -> " is completed."); CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Concurrency Refcard", executor0) // .thenApply(result -> "Java " + result) // .thenApplyAsync(result -> "Dzone " + result, executor1) //, future .thenCombine(waitingForAll, (first, second) -> first + second) // ForkJoinPool#commonPool .thenAcceptAsync(result -> { System.out.println("Result is '" + result + "'."); }) // .whenComplete((ignored, exception) -> { if (exception != null) exception.printStackTrace(); }); // - , . future.join(); future // ( ). .thenRun(() -> System.out.println("Current thread is '" + Thread.currentThread().getName() + "'.")) // ForkJoinPool#commonPool .thenRunAsync(() -> System.out.println("Current thread is '" + Thread.currentThread().getName() + "'."));
—
Collections#synchronized*
. ,
java.util.concurrent
, .
実装 | 説明 |
---|---|
| , ( , ). . |
9:
java.util.concurrent
| 説明 |
---|---|
| -. , , . CAS- ( ), ( ). |
| Map, TreeMap. TreeMap, , . |
10:
java.util.concurrent
| 説明 |
---|---|
| CopyOnWriteArrayList, copy-on-write Set. |
| ConcurrentSkipListMap, Set. |
11:
java.util.concurrent
Map:
Set<T> concurrentSet = Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>());
«» «». « , » (FIFO).
BlockingQueue
Queue
, , , ( ) ( ).
BlockingQueue
, , , - .
実装 | 説明 |
---|---|
| , . |
LinkedBlockingQueue | , . |
| , . , Comparator, ( FIFO). |
DelayQueue | , . , . |
SynchronousQueue | -, , . , . . |
12:
java.util.concurrent
終わり
.
ありがとう