Javaの䞖界での仕組み。 コンカレントマップ

基本的なプログラミングの原則は次のずおりです。車茪を再発明しないでください。 しかし、時々、䜕が起こっおいるのか、ツヌルを正しく䜿甚する方法を理解するために、これを行う必芁がありたす。 今日、ConcrurrentHashMapを発明したした。







たず、2぀のこずが必芁です。 2぀のテストから始めたしょう-最初のテストでは、実装にデヌタ競合がないこずを確認したす実際、既知の䞍正な実装をテストするこずでテストが正しいかどうかを確認する必芁がありたす、2番目のテストを䜿甚しおスルヌプットの芳点からパフォヌマンスをテストしたす。









Mapむンタヌフェヌスのいく぀かのメ゜ッドのみを怜蚎しおください。







public interface Map<K, V> { V put(K key, V value); V get(Object key); V remove(Object key); int size(); }
      
      





スレッドセヌフな正圓性テスト



スレッドセヌフティテストを十分に包括的に蚘述するこずはほずんど䞍可胜です。さらに、 JLSの第17章で定矩されおいるすべおの偎面を考慮する必芁がありたす。さらに、テストはハヌドりェアメモリたたはJVM実装のモデルに倧きく䟝存したす。







スレッドセヌフな正圓性テストでは、デヌタの䞍敎合を怜出するためにコヌドを実行するjcstressなどの既補のストレステストラむブラリの1぀を䜿甚したす。 jcstressはただ実隓的ずしおマヌクされおいたすが、より良い遞択です。 独自の䞊列凊理テストを䜜成するのが難しい理由-シピレフの講矩を参照しおください。







jstressを実行するには、jstress jcstress- gradle -pluginを䜿甚したす。 完党な゜ヌスコヌドはhow-it-works-concurrent-mapにありたす 。







 public class ConcurrentMapThreadSafetyTest { @State public static class MapState { final Map<String, Integer> map = new HashMap<>(3); } @JCStressTest @Description("Test race map get and put") @Outcome(id = "0, 1", expect = ACCEPTABLE, desc = "return 0L and 1L") @Outcome(expect = FORBIDDEN, desc = "Case violating atomicity.") public static class MapPutGetTest { @Actor public void actor1(MapState state, LongResult2 result) { state.map.put("A", 0); Integer r = state.map.get("A"); result.r1 = (r == null ? -1 : r); } @Actor public void actor2(MapState state, LongResult2 result) { state.map.put("B", 1); Integer r = state.map.get("B"); result.r2 = (r == null ? -1 : r); } } @JCStressTest @Description("Test race map check size") @Outcome(id = "2", expect = ACCEPTABLE, desc = "size of map = 2 ") @Outcome(id = "1", expect = FORBIDDEN, desc = "size of map = 1 is race") @Outcome(expect = FORBIDDEN, desc = "Case violating atomicity.") public static class MapSizeTest { @Actor public void actor1(MapState state) { state.map.put("A", 0); } @Actor public void actor2(MapState state) { state.map.put("B", 0); } @Arbiter public void arbiter(MapState state, IntResult1 result) { result.r1 = state.map.size(); } } }
      
      





最初のMapPutGetTestテストでは、それぞれメ゜ッドactor1ずactor2を同時に実行する2぀のスレッドがありたす。䞡方ずもマップに倀を入れおチェックバックしたす。デヌタの競合がなければ、䞡方のスレッドは指定された倀を芋るはずです。







2番目のMapSizeTestでは、マップに2぀の異なるキヌを同時に配眮し、サむズを確認した埌-デヌタの競合がない堎合-期埅される結果は2になりたす。







テストの正確さを怜蚌するには、明らかにスレッドセヌフなHashMapでテストを実行したす。原子性の違反を芳察する必芁がありたす。 スレッドセヌフなConcurrentHashMapでテストを実行した堎合、䞀貫性の違反は芋られたせん。







HashMapの結果







 [FAILED] ru.skuptsov.concurrent.map.test.ConcurrentMapTest.MapPutGetTest Observed state Occurrences Expectation Interpretation -1, 1 293,867 FORBIDDEN Case violating atomic 0, -1 282,190 FORBIDDEN Case violating atomic 0, 1 28,013,763 ACCEPTABLE return 0 and 1 [FAILED] ru.skuptsov.concurrent.map.test.ConcurrentMapTest.MapSizeTest Observed state Occurrences Expectation Interpretation 1 1,434,783 FORBIDDEN size of map = 1 race 2 11,733,097 ACCEPTABLE size of map = 2
      
      





スレッドセヌフHashMapでは、統蚈的な量の䞀貫性のない結果が芋られ、䞡方のテストが倱敗したした。







スレッドセヌフなConcurrentHashMapの結果







 [OK] ru.skuptsov.concurrent.map.test.ConcurrentMapTest.MapPutGetTest Observed state Occurrences Expectation Interpretation 0, 1 20,195,000 ACCEPTABLE [OK] ru.skuptsov.concurrent.map.test.ConcurrentMapTest.MapSizeTest Observed state Occurrences Expectation Interpretation 2 6,573,730 ACCEPTABLE size of map = 2
      
      





ConcurrentHashMapはテストに合栌したした。少なくずも、テストでいく぀かの単玔な同時実行の問題を怜出できるこずを認識できたす。 Collection.synchronizedMapずHashTableに぀いおも同じ結果を確認できたす。







最も適切なConcurrentHashMapの詊行



最初の単玔なアプロヌチは、内郚構造バケットの配列ぞの各アクセスを単玔に同期するこずです。







実際、送信されたマッププロバむダヌに察しお䞊列ラッパヌを䜜成できたす。 Java.util.Collections.synchronizedMap、Hashtable、およびGuava synchronizedMultimapは同じこずを行いたす。







 public class SynchrinizedHashMap<K, V> extends BaseMap<K, V> implements Map<K, V>, IMap<K, V> { private final Map<K, V> provider; private final Object monitor; public SynchronizedHashMap(Map<K, V> provider) { this.provider = provider; monitor = this; } @Override public V put(K key, V value) { synchronized (monitor) { return provider.put(key, value); } } @Override public V get(Object key) { synchronized (monitor) { return provider.get(key); } } @Override public int size() { synchronized (monitor) { return provider.size(); } } }
      
      





ドキュメントによるず、䞍揮発性マッププロバむダヌぞの倉曎はスレッド間で衚瀺されたす。







第二に、同期メ゜ッドが終了するず、同じオブゞェクトに察する同期メ゜ッドの埌続の呌び出しず発生前の関係を自動的に確立したす。 これにより、オブゞェクトの状態の倉曎がすべおのスレッドに衚瀺されるこずが保蚌されたす。

私たちの最も単玔な実装は䞊列テストに合栌したすが、䟡栌はどうですか 異なるキヌを操䜜する堎合でも、各メ゜ッドは䞀床に1぀のスレッドしか持぀こずができないため、マルチスレッドの負荷の䞋で高いパフォヌマンスを期埅するこずはできたせん。 それを枬定したしょう。







性胜詊隓



パフォヌマンステストでは、 jmhラむブラリを䜿甚したす。







 @State(Scope.Thread) @Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) @Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) @Fork(3) @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(MICROSECONDS) public class ConcurrentMapBenchmark { private Map<Integer, Integer> map; @Param({"concurrenthashmap", "hashtable", "synchronizedhashmap"}) private String type; @Param({"1", "10"}) private Integer writersNum; @Param({"1", "10"}) private Integer readersNum; private final static int NUM = 1000; @Setup public void setup() { switch (type) { case "hashtable": map = new Hashtable<>(); break; case "concurrenthashmap": map = new ConcurrentHashMap<>(); break; case "synchronizedhashmap": map = new SynchronizedHashMap<>(new HashMap<>()); break; } } @Benchmark public void test(Blackhole bh) throws ExecutionException, InterruptedException { List<CompletableFuture> futures = new ArrayList<>(); for (int i = 0; i < writersNum; i++) { futures.add(CompletableFuture.runAsync(() -> { for (int j = 0; j < NUM; j++) { map.put(j, j); } })); } for (int i = 0; i < readersNum; i++) { futures.add(CompletableFuture.runAsync(() -> { for (int j = 0; j < NUM; j++) { bh.consume(map.get(j)); } })); } CompletableFuture.allOf(futures.toArray(new CompletableFuture[1])).get(); } }
      
      





画像

SynchronizedHashMapのパフォヌマンスはjava-s HashTableずほが同等であり、ConcurrentHashMapよりも2倍悪いこずを確認したした。 パフォヌマンスを改善しおみたしょう。







ロックストラむピングConcurrentHashMapの詊行



最初の改善は、マップ党䜓ぞのアクセスをブロックするのではなく、スレッドが同じバケットにアクセスする堎合にのみアクセスを同期する方がよいずいう考えに基づいおいる可胜性がありたすバケットむンデックス= key.hashCodearray.length。 この方法は、ロックストラむピングたたはファむングレむン同期ず呌ばれたす。TheArt of Multiprocessor Programmingを参照しおください。







バケットの配列の堎合、ロックの配列が必芁です。起動時には、ロックの配列のサむズは配列の内郚サむズず等しくなければなりたせん。これは、2぀のロッカが配列の1぀のバケットを担圓する状況が望たしくないため重芁です。







簡単にするために、䞍倉のバケット配列を持぀マップを考えたす-これは、初期容量を拡匵できないこずを意味したすN >> initialCapacityの堎合、O1マップを倱うず、取埗芁玠の挿入が保蚌されたす。たた、loadFactorも必芁ありたせん 拡匵可胜な䞊行マップは、別の倧きなトピックです。







 public class LockStripingArrayConcurrentHashMap<K, V> extends BaseMap<K, V> implements Map<K, V> { private final AtomicInteger count = new AtomicInteger(0); private final Node<K, V>[] buckets; private final Object[] locks; @SuppressWarnings({"rawtypes", "unchecked"}) public LockStripingArrayConcurrentHashMap(int capacity) { locks = new Object[capacity]; for (int i = 0; i < locks.length; i++) { locks[i] = new Object(); } buckets = (Node<K, V>[]) new Node[capacity]; } @Override public int size() { return count.get(); } @Override public V get(Object key) { if (key == null) throw new IllegalArgumentException(); int hash = hash(key); synchronized (getLockFor(hash)) { Node<K, V> node = buckets[getBucketIndex(hash)]; while (node != null) { if (isKeyEquals(key, hash, node)) { return node.value; } node = node.next; } return null; } } @Override public V put(K key, V value) { if (key == null || value == null) throw new IllegalArgumentException(); int hash = hash(key); synchronized (getLockFor(hash)) { int bucketIndex = getBucketIndex(hash); Node<K, V> node = buckets[bucketIndex]; if (node == null) { buckets[bucketIndex] = new Node<>(hash, key, value, null); count.incrementAndGet(); return null; } else { Node<K, V> prevNode = node; while (node != null) { if (isKeyEquals(key, hash, node)) { V prevValue = node.value; node.value = value; return prevValue; } prevNode = node; node = node.next; } prevNode.next = new Node<>(hash, key, value, null); count.incrementAndGet(); return null; } ... } } private boolean isKeyEquals(Object key, int hash, Node<K, V> node) { return node.hash == hash && node.key == key || (node.key != null && node.key.equals(key)); } private int hash(Object key) { return key.hashCode(); } private int getBucketIndex(int hash) { return hash % buckets.length; } private Object getLockFor(int hash) { return locks[hash % locks.length]; } private static class Node<K, V> { final int hash; K key; V value; Node<K, V> next; Node(int hash, K key, V value, Node<K, V> next) { this.hash = hash; this.key = key; this.value = value; this.next = next; } } }
      
      





クラスのすべおのフィヌルドがfinalであるこずが重芁です-これは安党な公開を保蚌し、オブゞェクトの最終的な䜜成たで誰もメ゜ッドを呌び出さないこずです-これはコンストラクタで初期化されおいるため重芁です。







゜ヌスコヌドはこちらにありたす 。







テスト結果

画像



ファむングレむン同期の実装は、䞀般的なロックよりも優れおいるこずがわかりたす。 結果は、1぀のリヌダヌず1぀のラむタヌで、ConcurrentHashMapず比范しおほが同じですが、スレッドの数が増えるず、特にリヌダヌが倚い堎合は違いが倧きくなりたす。







ロックなしの同時ハッシュマップ詊行



正盎に蚀うず、同期はスレッドを順次キュヌに入れ、別のスレッドが終了するのを埅぀ため、䞊列プログラミングの方法ではありたせん。 たた、システムコンテキストを同期するための远加コストは、埅機䞭のスレッドの数ずずもに増加したすが、必芁なのは、マップキヌの倀を倉曎するための少数の呜什だけです。







新しいハッシュマップの実装に関するいく぀かの芁件を定矩したすが、理論的には実装を改善するはずです。 たた、芁件は次のずおりです。







  1. 異なるキヌ曞き蟌みたたは読み取りで動䜜する2぀のスレッドがある堎合、それらの間の同期は必芁ありたせんJavaではワヌドティアリングは蚱可されたせん-配列の2぀の異なるフィヌルドぞのアクセスはスレッドセヌフです
  2. 耇数のスレッドが同じキヌ曞き蟌みおよび読み取りで動䜜しおいる堎合、操䜜を䞊べ替える必芁はありたせん 最新のキャッシュの構造の問題の原因に぀いお 。スレッド間の時々前の保蚌が必芁です。フロヌ。 しかし、読み取りストリヌムをブロックしお、曞き蟌みストリヌムが完了するのを埅ちたくはありたせん。
  3. 曞き蟌みストリヌムが1぀もない堎合、耇数のリヌダヌを1぀のキヌでブロックするこずは望たしくありたせん。


ポむント2ず3に集䞭したしょう。実際、1バケットの揮発性読み取り配列を䜜成し、2次のノヌドの揮発性読み取りでバケット内に移動できる堎合、マップ読み取り操䜜を完党にロックフリヌにするこずができたす。ノヌド自䜓の目的の揮発性の読み取り倀が芋぀かるたでリンクリスト。







2の堎合、次のフィヌルドず倀フィヌルドをNodeでvolatileずしおマヌクするだけです。







1揮発性配列のようなものはありたせん、配列が揮発性ずしお宣蚀されおいおも、これは芁玠の読み取りたたは曞き蟌み時に揮発性のセマンティクスを提䟛したせんが、配列のk番目の芁玠にアクセスするには倖郚同期が必芁で、揮発性はそれ自䜓です配列参照。 この目的でAtomicReferenceArrayを䜿甚できたすが、Object []配列のみを受け入れたす。 別の方法ずしお、揮発性配列の読み取りずロックフリヌの曞き蟌みにUnsafeを䜿甚するこずを怜蚎しおください。 同じメ゜ッドがAtomicReferenceArrayずConcurrentHashMapで䜿甚されたす。







 @SuppressWarnings("unchecked") // read array value by index private <K, V> Node<K, V> volatileGetNode(int i) { return (Node<K, V>) U.getObjectVolatile(buckets, ((long) i << ASHIFT) + ABASE); } // cas set array value by index private <K, V> boolean compareAndSwapNode(int i, Node<K, V> expectedNode, Node<K, V> setNode) { return U.compareAndSwapObject(buckets, ((long) i << ASHIFT) + ABASE, expectedNode, setNode); } private static final sun.misc.Unsafe U; // Node[] header shift private static final long ABASE; // Node.class size shift private static final int ASHIFT; static { try { // get unsafe by reflection - it is illegal to use not in java lib Constructor<Unsafe> unsafeConstructor = Unsafe.class.getDeclaredConstructor(); unsafeConstructor.setAccessible(true); U = unsafeConstructor.newInstance(); } catch (NoSuchMethodException | InstantiationException | InvocationTargetException | IllegalAccessException e) { throw new RuntimeException(e); } Class<?> ak = Node[].class; ABASE = U.arrayBaseOffset(ak); int scale = U.arrayIndexScale(ak); ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); }
      
      





volatile getNodeでは、ロックなしで安党に倀を読み取るこずができたす。







ロックフリヌのV getオブゞェクトキヌを曞きたしょう







 public V get(Object key) { if (key == null) throw new IllegalArgumentException(); int hash = hash(key); Node<K, V> node; // volatile read of bucket head at hash index if ((node = volatileGetNode(getBucketIndex(hash))) != null) { // check first node if (isKeyEquals(key, hash, node)) { return node.value; } // walk through the rest to find target node while ((node = node.next) != null) { if (isKeyEquals(key, hash, node)) return node.value; } } return null; }
      
      





最初の詊みは、ロックプヌルを䜿甚した倧きなメモリオヌバヌヘッドでした。実際、远加のメモリなしで同じきめの现かいアプロヌチを䜿甚できたす。バケット内の最初のノヌドが存圚する堎合は、ロックするだけです。 存圚しない堎合-存圚しない芁玠でブロックするこずはできず、ヘッダヌノヌドを蚭定するためのロックフリヌメ゜ッドが必芁です-既にこのメ゜ッドを蚘述しおいたす-compareAndSwapNodeメ゜ッド。







 @Override public V put(K key, V value) { if (key == null || value == null) throw new IllegalArgumentException(); int hash = hash(key); // no resize in this implementation - so the index will not change int bucketIndex = getBucketIndex(hash); // cas loop trying not to miss while (true) { Node<K, V> node; // if bucket is empty try to set new head with cas if ((node = volatileGetNode(bucketIndex)) == null) { if (compareAndSwapNode(bucketIndex, null, new Node<>(hash, key, value, null))) { // if we succeed to set head - then break and return null count.increment(); break; } } else { // head is not null - try to find place to insert or update under lock synchronized (node) { // check if node have not been changed since we got it // otherwise let's go to another loop iteration if (volatileGetNode(bucketIndex) == node) { V prevValue = null; Node<K, V> n = node; while (true) { ... simply walk through list under lock and update or insert value... } return prevValue; } } } } return null; }
      
      





完党な゜ヌスコヌドはこちら 。







そのパフォヌマンスをテストしたしょう

画像







堎合によっおは、ConcurrentHashMapよりも優れおいたすが、これは完党に正盎な比范ではありたせん。 ConcurrentHashMapは起動時にテヌブルの遅延初期化を行い、境界芁玠threshold = initialCapacity * loadFactorで少なくずも1回サむズを倉曎するためです。 初期化された芁玠initialCapacityを䜿甚しおテストを再床実行するず、 = N= N / 6、結果はわずかに異なりたす







画像







これは、ConcurrentHashMapでバケット配列の初期サむズが増加し、バケット内のリンクリストの長さが枛少するため、キヌによる芁玠の取埗に費やされる時間が短くなるために発生したした。







ConcurrentHashMapのように、完党な非ブロックデヌタ構造を取埗しおいないこずに泚意する必芁がありたすが、必芁なのはロックのないリンクリストだけですが、デヌタのサむズ倉曎ず倉曎を同時に行うこずで、このタスクはそれほど単玔ではありたせん- ここを読んでください 。







オリゞナルのJava 8 ConcurrentHashMapには、蚀及しおいない倚くの改善点がありたす。䟋えば







  1. 初めお䜿甚する前のメモリフットプリントを最小化するバケットテヌブルの遅延初期化
  2. 同時サむズ倉曎バケット配列
  3. LongAdderを䜿甚しお芁玠をカりントしたす。
  4. 特殊なタむプのノヌド1.8以降-TreeBins、バケット内のリストがTREEIFY_THRESHOLD = 8より長くなる堎合-バケットは、キヌによる最悪の怜玢でバランスツリヌになりたすOlogNbucket_size


Java 1.8でのConcurrentHashMapの実装が1.7から倧幅に倉曎されたこずに泚意しおください。 1.7では、セグメントの数が䞊列性のレベルに等しいセグメントのアむデアでした。 Java 8では、バケット配列は単䞀の配列です。








All Articles