并发环境下为什么使用ConcurrentHashMap
1. HashMap在高并发的环境下,执行put操作会导致HashMap的Entry链表形成环形数据结构,从而导致Entry的next节点始终不为空,因此产生死循环获取Entry
2. HashTable虽然是线程安全的,但是效率低下,当一个线程访问HashTable的同步方法时,其他线程如果也访问HashTable的同步方法,那么会进入阻塞或者轮训状态。
3. 在jdk1.6中ConcurrentHashMap使用锁分段技术提高并发访问效率。首先将数据分成一段一段地存储,然后给每一段数据配一个锁,当一个线程占用锁访问其中一段数据时,其他段的数据也能被其他线程访问。然而在jdk1.8中的实现已经抛弃了Segment分段锁机制,利用CAS+Synchronized来保证并发更新的安全,底层依然采用数组+链表+红黑树的存储结构。
JDK1.6分析
ConcurrentHashMap采用 分段锁的机制,实现并发的更新操作,底层由Segment数组和HashEntry数组组成。Segment继承ReentrantLock用来充当锁的角色,每个 Segment 对象守护每个散列映射表的若干个桶。HashEntry 用来封装映射表的键 / 值对;每个桶是由若干个 HashEntry 对象链接起来的链表。一个 ConcurrentHashMap 实例中包含由若干个 Segment 对象组成的数组,下面我们通过一个图来演示一下 ConcurrentHashMap 的结构:
JDK1.8分析
改进一:取消segments字段,直接采用transient volatile HashEntry<K,V> table
保存数据,采用table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。
改进二:将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构。对于hash表来说,最核心的能力在于将key hash之后能均匀的分布在数组中。如果hash之后散列的很均匀,那么table数组中的每个队列长度主要为0或者1。但实际情况并非总是如此理想,虽然ConcurrentHashMap类默认的加载因子为0.75,但是在数据量过大或者运气不佳的情况下,还是会存在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为O(n);因此,对于个数超过8(默认值)的列表,jdk1.8中采用了红黑树的结构,那么查询的时间复杂度可以降低到O(logN),可以改进性能。
ConcurrentHashMap的重要属性
<span style="color:#000000"><code><span style="color:#880000">/**
* races. Updated via CAS.
* 记录容器的容量大小,通过CAS更新
*/</span>
<span style="color:#000088">private</span> <span style="color:#000088">static</span> <span style="color:#000088">final</span> <span style="color:#000088">long</span> BASECOUNT;
<span style="color:#880000">/**
* 这个sizeCtl是volatile的,那么他是线程可见的,一个思考:它是所有修改都在CAS中进行,但是sizeCtl为什么不设计成LongAdder(jdk8出现的)类型呢?
* 或者设计成AtomicLong(在高并发的情况下比LongAdder低效),这样就能减少自己操作CAS了。
*
* 默认为0,用来控制table的初始化和扩容操作,具体应用在后续会体现出来。
* -1 代表table正在初始化
* -N 表示有N-1个线程正在进行扩容操作
* 其余情况:
*1、如果table未初始化,表示table需要初始化的大小。
*2、如果table初始化完成,表示table的容量,默认是table大小的0.75 倍,居然用这个公式算0.75(n - (n >>> 2))。
**/</span>
<span style="color:#000088">private</span> <span style="color:#000088">static</span> <span style="color:#000088">final</span> <span style="color:#000088">long</span> SIZECTL;
<span style="color:#880000">/**
* 自旋锁 (锁定通过 CAS) 在调整大小和/或创建 CounterCells 时使用。 在CounterCell类更新value中会使用,功能类似显示锁和内置锁,性能更好
* 在Striped64类也有应用
*/</span>
<span style="color:#000088">private</span> <span style="color:#000088">static</span> <span style="color:#000088">final</span> <span style="color:#000088">long</span> CELLSBUSY;</code></span>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
Node:保存key,value及key的hash值的数据结构。其中value和next都用volatile修饰,保证并发的可见性。
<span style="color:#000000"><code> <span style="color:#000088">static</span> <span style="color:#000088">class</span> <span style="color:#4f4f4f">Node</span><<span style="color:#4f4f4f">K</span>,<span style="color:#4f4f4f">V</span>> <span style="color:#000088">implements</span> <span style="color:#4f4f4f">Map</span>.<span style="color:#4f4f4f">Entry</span><<span style="color:#4f4f4f">K</span>,<span style="color:#4f4f4f">V</span>> {
<span style="color:#000088">final</span> <span style="color:#000088">int</span> hash;
<span style="color:#000088">final</span> K key;
volatile V val;<span style="color:#880000">//volatile类型的</span>
volatile Node<K,V> next;<span style="color:#880000">//volatile类型的</span>
Node(<span style="color:#000088">int</span> hash, K key, V val, Node<K,V> next) {
<span style="color:#000088">this</span>.hash = hash;
<span style="color:#000088">this</span>.key = key;
<span style="color:#000088">this</span>.val = val;
<span style="color:#000088">this</span>.next = next;
}
<span style="color:#880000">//省略部分代码</span>
}</code></span>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
ForwardingNode:一个特殊的Node节点,hash值为-1,其中存储nextTable的引用。
<span style="color:#000000"><code> <span style="color:#000088">static</span> <span style="color:#000088">final</span> <span style="color:#000088">class</span> <span style="color:#4f4f4f">ForwardingNode</span><<span style="color:#4f4f4f">K</span>,<span style="color:#4f4f4f">V</span>> <span style="color:#000088">extends</span> <span style="color:#4f4f4f">Node</span><<span style="color:#4f4f4f">K</span>,<span style="color:#4f4f4f">V</span>> {
<span style="color:#000088">final</span> Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
<span style="color:#000088">super</span>(MOVED, <span style="color:#000088">null</span>, <span style="color:#000088">null</span>, <span style="color:#000088">null</span>);
<span style="color:#000088">this</span>.nextTable = tab;
}
<span style="color:#880000">//省略部分代码</span>
}</code></span>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
ConcurrentHashMap的构造函数
<span style="color:#000000"><code> <span style="color:#880000">//默认的构造函数</span>
<span style="color:#000088">public</span> <span style="color:#009900">ConcurrentHashMap</span>(){}
<span style="color:#880000">/**
*initialCapacity 初始化容量
**/</span>
<span style="color:#000088">public</span> <span style="color:#009900">ConcurrentHashMap</span>(<span style="color:#000088">int</span> initialCapacity) {}
<span style="color:#880000">/**
*
*创建与给定map具有相同映射的新map
**/</span>
<span style="color:#000088">public</span> <span style="color:#009900">ConcurrentHashMap</span>(Map<? extends K, ? extends V> m){}
<span style="color:#880000">/**
*initialCapacity 初始容量
*loadFactor 负载因子,当容量达到initialCapacity*loadFactor时,执行扩容
*concurrencyLevel 预估的并发更新线程数
**/</span>
<span style="color:#000088">public</span> <span style="color:#009900">ConcurrentHashMap</span>(<span style="color:#000088">int</span> initialCapacity, <span style="color:#000088">float</span> loadFactor) {}
<span style="color:#880000">/**
*initialCapacity 初始容量
*loadFactor 负载因子
*concurrencyLevel 预估的并发更新线程数
**/</span>
<span style="color:#000088">public</span> <span style="color:#009900">ConcurrentHashMap</span>(<span style="color:#000088">int</span> initialCapacity,
<span style="color:#000088">float</span> loadFactor, <span style="color:#000088">int</span> concurrencyLevel) {}</code></span>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
接下来具体看看第四个构造函数的具体实现:
<span style="color:#000000"><code> <span style="color:#000088">public</span> <span style="color:#009900">ConcurrentHashMap</span>(<span style="color:#000088">int</span> initialCapacity,
<span style="color:#000088">float</span> loadFactor, <span style="color:#000088">int</span> concurrencyLevel) {
<span style="color:#000088">if</span> (!(loadFactor > <span style="color:#006666">0.0</span>f) || initialCapacity < <span style="color:#006666">0</span> || concurrencyLevel <= <span style="color:#006666">0</span>)
<span style="color:#000088">throw</span> <span style="color:#000088">new</span> IllegalArgumentException();
<span style="color:#000088">if</span> (initialCapacity < concurrencyLevel) <span style="color:#880000">//至少使用尽可能多的bin</span>
initialCapacity = concurrencyLevel; <span style="color:#880000">//作为估计线程</span>
<span style="color:#000088">long</span> size = (<span style="color:#000088">long</span>)(<span style="color:#006666">1.0</span> + (<span style="color:#000088">long</span>)initialCapacity / loadFactor);
<span style="color:#000088">int</span> cap = (size >= (<span style="color:#000088">long</span>)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((<span style="color:#000088">int</span>)size);
<span style="color:#000088">this</span>.sizeCtl = cap;<span style="color:#880000">//初始化sizeCtl</span>
}
<span style="color:#880000">/**
*返回给定所需容量,table的大小总是2的幂次方
**/</span>
<span style="color:#000088">private</span> <span style="color:#000088">static</span> <span style="color:#000088">final</span> <span style="color:#000088">int</span> <span style="color:#009900">tableSizeFor</span>(<span style="color:#000088">int</span> c) {
<span style="color:#000088">int</span> n = c - <span style="color:#006666">1</span>;
n |= n >>> <span style="color:#006666">1</span>;
n |= n >>> <span style="color:#006666">2</span>;
n |= n >>> <span style="color:#006666">4</span>;
n |= n >>> <span style="color:#006666">8</span>;
n |= n >>> <span style="color:#006666">16</span>;
<span style="color:#000088">return</span> (n < <span style="color:#006666">0</span>) ? <span style="color:#006666">1</span> : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + <span style="color:#006666">1</span>;
}</code></span>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
ConcurrentHashMap在构造函数中只会初始化sizeCtl值,并不会直接初始化table,而是延缓到第一次put操作
put()方法的实现
<span style="color:#000000"><code> <span style="color:#000088">public</span> V <span style="color:#009900">put</span>(K key, V <span style="color:#000088">value</span>) {
<span style="color:#000088">return</span> putVal(key, <span style="color:#000088">value</span>, <span style="color:#000088">false</span>);
}
final V putVal(K key, V <span style="color:#000088">value</span>, boolean onlyIfAbsent) {
<span style="color:#000088">if</span> (key == <span style="color:#000088">null</span> || <span style="color:#000088">value</span> == <span style="color:#000088">null</span>) <span style="color:#000088">throw</span> <span style="color:#000088">new</span> NullPointerException();
<span style="color:#000088">int</span> hash = spread(key.hashCode());<span style="color:#880000">//对hashCode进行再散列,算法为(h ^ (h >>> 16)) & HASH_BITS</span>
<span style="color:#000088">int</span> binCount = <span style="color:#006666">0</span>;
<span style="color:#880000">//这边加了一个循环,就是不断的尝试,因为在table的初始化和casTabAt用到了compareAndSwapInt、compareAndSwapObject</span>
<span style="color:#880000">//因为如果其他线程正在修改tab,那么尝试就会失败,所以这边要加一个for循环,不断的尝试</span>
<span style="color:#000088">for</span> (Node<K,V>[] tab = table;;) {
Node<K,V> f; <span style="color:#000088">int</span> n, i, fh;
<span style="color:#880000">// 如果table为空,初始化;否则,根据hash值计算得到数组索引i,如果tab[i]为空,直接新建节点Node即可。注:tab[i]实质为链表或者红黑树的首节点。</span>
<span style="color:#000088">if</span> (tab == <span style="color:#000088">null</span> || (n = tab.length) == <span style="color:#006666">0</span>)
tab = initTable();
<span style="color:#000088">else</span> <span style="color:#000088">if</span> ((f = tabAt(tab, i = (n - <span style="color:#006666">1</span>) & hash)) == <span style="color:#000088">null</span>) {
<span style="color:#000088">if</span> (casTabAt(tab, i, <span style="color:#000088">null</span>,
<span style="color:#000088">new</span> Node<K,V>(hash, key, <span style="color:#000088">value</span>, <span style="color:#000088">null</span>)))
<span style="color:#000088">break</span>; <span style="color:#880000">// no lock when adding to empty bin</span>
}
<span style="color:#880000">// 如果tab[i]不为空并且hash值为MOVED(-1),说明该链表正在进行transfer操作,返回扩容完成后的table。</span>
<span style="color:#000088">else</span> <span style="color:#000088">if</span> ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
<span style="color:#000088">else</span> {
V oldVal = <span style="color:#000088">null</span>;
<span style="color:#880000">// 针对首个节点进行加锁操作,而不是segment,进一步减少线程冲突</span>
synchronized (f) {
<span style="color:#000088">if</span> (tabAt(tab, i) == f) {
<span style="color:#000088">if</span> (fh >= <span style="color:#006666">0</span>) {
binCount = <span style="color:#006666">1</span>;
<span style="color:#000088">for</span> (Node<K,V> e = f;; ++binCount) {
K ek;
<span style="color:#880000">// 如果在链表中找到值为key的节点e,直接设置e.val = value即可。</span>
<span style="color:#000088">if</span> (e.hash == hash &&
((ek = e.key) == key ||
(ek != <span style="color:#000088">null</span> && key.equals(ek)))) {
oldVal = e.val;
<span style="color:#000088">if</span> (!onlyIfAbsent)
e.val = <span style="color:#000088">value</span>;
<span style="color:#000088">break</span>;
}
<span style="color:#880000">// 如果没有找到值为key的节点,直接新建Node并加入链表即可。</span>
Node<K,V> pred = e;
<span style="color:#000088">if</span> ((e = e.next) == <span style="color:#000088">null</span>) {
pred.next = <span style="color:#000088">new</span> Node<K,V>(hash, key,
<span style="color:#000088">value</span>, <span style="color:#000088">null</span>);
<span style="color:#000088">break</span>;
}
}
}
<span style="color:#880000">// 如果首节点为TreeBin类型,说明为红黑树结构,执行putTreeVal操作。</span>
<span style="color:#000088">else</span> <span style="color:#000088">if</span> (f instanceof TreeBin) {
Node<K,V> p;
binCount = <span style="color:#006666">2</span>;
<span style="color:#000088">if</span> ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
<span style="color:#000088">value</span>)) != <span style="color:#000088">null</span>) {
oldVal = p.val;
<span style="color:#000088">if</span> (!onlyIfAbsent)
p.val = <span style="color:#000088">value</span>;
}
}
}
}
<span style="color:#000088">if</span> (binCount != <span style="color:#006666">0</span>) {
<span style="color:#880000">// 如果节点数>=8,那么转换链表结构为红黑树结构。</span>
<span style="color:#000088">if</span> (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
<span style="color:#000088">if</span> (oldVal != <span style="color:#000088">null</span>)
<span style="color:#000088">return</span> oldVal;
<span style="color:#000088">break</span>;
}
}
}
<span style="color:#880000">// 计数增加1,有可能触发transfer操作(扩容)。</span>
addCount(<span style="color:#006666">1</span>L, binCount);
<span style="color:#000088">return</span> <span style="color:#000088">null</span>;
}
@SuppressWarnings(<span style="color:#009900">"unchecked"</span>)
<span style="color:#000088">static</span> final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, <span style="color:#000088">int</span> i) {
<span style="color:#000088">return</span> (Node<K,V>)U.getObjectVolatile(tab, ((<span style="color:#000088">long</span>)i << ASHIFT) + ABASE);
}
<span style="color:#880000">/*
*但是这边为什么i要等于((long)i << ASHIFT) + ABASE呢,计算偏移量
*ASHIFT是指tab[i]中第i个元素在相对于数组第一个元素的偏移量,而ABASE就算第一数组的内存素的偏移地址
*所以呢,((long)i << ASHIFT) + ABASE就算i最后的地址
* 那么compareAndSwapObject的作用就算tab[i]和c比较,如果相等就tab[i]=v否则tab[i]=c;
*/</span>
<span style="color:#000088">static</span> final <K,V> boolean casTabAt(Node<K,V>[] tab, <span style="color:#000088">int</span> i,
Node<K,V> c, Node<K,V> v) {
<span style="color:#000088">return</span> U.compareAndSwapObject(tab, ((<span style="color:#000088">long</span>)i << ASHIFT) + ABASE, c, v);
}
<span style="color:#000088">static</span> final <K,V> <span style="color:#000088">void</span> setTabAt(Node<K,V>[] tab, <span style="color:#000088">int</span> i, Node<K,V> v) {
U.putObjectVolatile(tab, ((<span style="color:#000088">long</span>)i << ASHIFT) + ABASE, v);
}</code></span>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
我们还是继续一步步看代码,看inputVal的注释a,这个方法helpTransfer,如果线程进入到这边说明已经有其他线程正在做扩容操作,这个是一个辅助方法
<span style="color:#000000"><code><span style="color:#880000">/**
* Helps transfer if a resize is in progress.
*/</span>
<span style="color:#000088">final</span> Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; <span style="color:#000088">int</span> sc;
<span style="color:#000088">if</span> (tab != <span style="color:#000088">null</span> && (f <span style="color:#000088">instanceof</span> ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != <span style="color:#000088">null</span>) {
<span style="color:#000088">int</span> rs = resizeStamp(tab.length);
<span style="color:#000088">while</span> (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < <span style="color:#006666">0</span>) {
<span style="color:#880000">//下面几种情况和addCount的方法一样,请参考addCount的备注</span>
<span style="color:#000088">if</span> ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + <span style="color:#006666">1</span> ||
sc == rs + MAX_RESIZERS || transferIndex <= <span style="color:#006666">0</span>)
<span style="color:#000088">break</span>;
<span style="color:#000088">if</span> (U.compareAndSwapInt(<span style="color:#000088">this</span>, SIZECTL, sc, sc + <span style="color:#006666">1</span>)) {
transfer(tab, nextTab);
<span style="color:#000088">break</span>;
}
}
<span style="color:#000088">return</span> nextTab;
}
<span style="color:#000088">return</span> table;
}</code></span>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
当我们的putVal执行到addCount的时候
<span style="color:#000000"><code><span style="color:#000088">private</span> final <span style="color:#000088">void</span> <span style="color:#009900">addCount</span>(<span style="color:#000088">long</span> x, <span style="color:#000088">int</span> check) {
CounterCell[] <span style="color:#000088">as</span>; <span style="color:#000088">long</span> b, s;
<span style="color:#880000">//U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) 每次竟来都baseCount都加1因为x=1</span>
<span style="color:#000088">if</span> ((<span style="color:#000088">as</span> = counterCells) != <span style="color:#000088">null</span> ||
!U.compareAndSwapLong(<span style="color:#000088">this</span>, BASECOUNT, b = baseCount, s = b + x)) {<span style="color:#880000">//1</span>
CounterCell a; <span style="color:#000088">long</span> v; <span style="color:#000088">int</span> m;
boolean uncontended = <span style="color:#000088">true</span>;
<span style="color:#000088">if</span> (<span style="color:#000088">as</span> == <span style="color:#000088">null</span> || (m = <span style="color:#000088">as</span>.length - <span style="color:#006666">1</span>) < <span style="color:#006666">0</span> ||
(a = <span style="color:#000088">as</span>[ThreadLocalRandom.getProbe() & m]) == <span style="color:#000088">null</span> ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.<span style="color:#000088">value</span>, v + x))) {
<span style="color:#880000">//多线程CAS发生失败的时候执行</span>
fullAddCount(x, uncontended);<span style="color:#880000">//2</span>
<span style="color:#000088">return</span>;
}
<span style="color:#000088">if</span> (check <= <span style="color:#006666">1</span>)
<span style="color:#000088">return</span>;
s = sumCount();
}
<span style="color:#000088">if</span> (check >= <span style="color:#006666">0</span>) {
Node<K,V>[] tab, nt; <span style="color:#000088">int</span> n, sc;
<span style="color:#880000">//当条件满足开始扩容</span>
<span style="color:#000088">while</span> (s >= (<span style="color:#000088">long</span>)(sc = sizeCtl) && (tab = table) != <span style="color:#000088">null</span> &&
(n = tab.length) < MAXIMUM_CAPACITY) {
<span style="color:#000088">int</span> rs = resizeStamp(n);
<span style="color:#000088">if</span> (sc < <span style="color:#006666">0</span>) {<span style="color:#880000">//如果小于0说明已经有线程在进行扩容操作了</span>
<span style="color:#880000">//一下的情况说明已经有在扩容或者多线程进行了扩容,其他线程直接break不要进入扩容操作</span>
<span style="color:#000088">if</span> ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + <span style="color:#006666">1</span> ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == <span style="color:#000088">null</span> ||
transferIndex <= <span style="color:#006666">0</span>)
<span style="color:#000088">break</span>;
<span style="color:#000088">if</span> (U.compareAndSwapInt(<span style="color:#000088">this</span>, SIZECTL, sc, sc + <span style="color:#006666">1</span>))<span style="color:#880000">//如果相等说明扩容已经完成,可以继续扩容</span>
transfer(tab, nt);
}
<span style="color:#880000">//这个时候sizeCtl已经等于(rs << RESIZE_STAMP_SHIFT) + 2等于一个大的负数,这边加上2很巧妙,因为transfer后面对sizeCtl--操作的时候,最多只能减两次就结束</span>
<span style="color:#000088">else</span> <span style="color:#000088">if</span> (U.compareAndSwapInt(<span style="color:#000088">this</span>, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + <span style="color:#006666">2</span>))
transfer(tab, <span style="color:#000088">null</span>);
s = sumCount();
}
}
}</code></span>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
看上面注释1,每次都会对baseCount 加1,如果并发竞争太大,那么可能导致U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) 失败,那么为了提高高并发的时候baseCount可见性失败的问题,又避免一直重试,这样性能会有很大的影响,那么在jdk8的时候是有引入一个类Striped64,其中LongAdder和DoubleAdder就是对这个类的实现。这两个方法都是为解决高并发场景而生的,是AtomicLong的加强版,AtomicLong在高并发场景性能会比LongAdder差。但是LongAdder的空间复杂度会高点。
我们每次进来都对baseCount进行加1当达到一定的容量时,就需要对table进行扩容。扩容方法就是transfer,这个方法稍微复杂一点,大部分的代码我都做了注释
<span style="color:#000000"><code><span style="color:#000088">private</span> <span style="color:#000088">final</span> <span style="color:#000088">void</span> <span style="color:#009900">transfer</span>(Node<K,V>[] tab, Node<K,V>[] nextTab) {
<span style="color:#000088">int</span> n = tab.length, stride;
<span style="color:#000088">if</span> ((stride = (NCPU > <span style="color:#006666">1</span>) ? (n >>> <span style="color:#006666">3</span>) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; <span style="color:#880000">// subdivide range</span>
<span style="color:#000088">if</span> (nextTab == <span style="color:#000088">null</span>) { <span style="color:#880000">// initiating</span>
<span style="color:#000088">try</span> {
<span style="color:#9b859d">@SuppressWarnings</span>(<span style="color:#009900">"unchecked"</span>)
Node<K,V>[] nt = (Node<K,V>[])<span style="color:#000088">new</span> Node<?,?>[n << <span style="color:#006666">1</span>];
nextTab = nt;
} <span style="color:#000088">catch</span> (Throwable ex) { <span style="color:#880000">// try to cope with OOME</span>
sizeCtl = Integer.MAX_VALUE;
<span style="color:#000088">return</span>;
}
nextTable = nextTab;
transferIndex = n;
}
<span style="color:#000088">int</span> nextn = nextTab.length;
<span style="color:#880000">//构建一个连节点的指针,用于标识位</span>
ForwardingNode<K,V> fwd = <span style="color:#000088">new</span> ForwardingNode<K,V>(nextTab);
<span style="color:#000088">boolean</span> advance = <span style="color:#000088">true</span>;
<span style="color:#880000">//循环的关键变量,判断是否已经扩容完成,完成就return,退出循环</span>
<span style="color:#000088">boolean</span> finishing = <span style="color:#000088">false</span>; <span style="color:#880000">// to ensure sweep before committing nextTab</span>
<span style="color:#000088">for</span> (<span style="color:#000088">int</span> i = <span style="color:#006666">0</span>, bound = <span style="color:#006666">0</span>;;) {
Node<K,V> f; <span style="color:#000088">int</span> fh;
<span style="color:#880000">//循环的关键i,i--操作保证了倒序遍历数组</span>
<span style="color:#000088">while</span> (advance) {
<span style="color:#000088">int</span> nextIndex, nextBound;
<span style="color:#000088">if</span> (--i >= bound || finishing)
advance = <span style="color:#000088">false</span>;
<span style="color:#000088">else</span> <span style="color:#000088">if</span> ((nextIndex = transferIndex) <= <span style="color:#006666">0</span>) {<span style="color:#880000">//nextIndex=transferIndex=n=tab.length(默认16)</span>
i = -<span style="color:#006666">1</span>;
advance = <span style="color:#000088">false</span>;
}
<span style="color:#000088">else</span> <span style="color:#000088">if</span> (U.compareAndSwapInt
(<span style="color:#000088">this</span>, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : <span style="color:#006666">0</span>))) {
bound = nextBound;
i = nextIndex - <span style="color:#006666">1</span>;
advance = <span style="color:#000088">false</span>;
}
}
<span style="color:#880000">//i<0说明已经遍历完旧的数组tab;i>=n什么时候有可能呢?在下面看到i=n,所以目前i最大应该是n吧。</span>
<span style="color:#880000">//i+n>=nextn,nextn=nextTab.length,所以如果满足i+n>=nextn说明已经扩容完成</span>
<span style="color:#000088">if</span> (i < <span style="color:#006666">0</span> || i >= n || i + n >= nextn) {
<span style="color:#000088">int</span> sc;
<span style="color:#000088">if</span> (finishing) {<span style="color:#880000">// a</span>
nextTable = <span style="color:#000088">null</span>;
table = nextTab;
sizeCtl = (n << <span style="color:#006666">1</span>) - (n >>> <span style="color:#006666">1</span>);
<span style="color:#000088">return</span>;
}
<span style="color:#880000">//利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作,参考sizeCtl的注释</span>
<span style="color:#000088">if</span> (U.compareAndSwapInt(<span style="color:#000088">this</span>, SIZECTL, sc = sizeCtl, sc - <span style="color:#006666">1</span>)) {
<span style="color:#880000">//如果有多个线程进行扩容,那么这个值在第二个线程以后就不会相等,因为sizeCtl已经被减1了,所以后面的线程就只能直接返回,始终保证只有一个线程执行了 a(上面注释a)</span>
<span style="color:#000088">if</span> ((sc - <span style="color:#006666">2</span>) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
<span style="color:#000088">return</span>;
finishing = advance = <span style="color:#000088">true</span>;<span style="color:#880000">//finishing和advance保证线程已经扩容完成了可以退出循环</span>
i = n; <span style="color:#880000">// recheck before commit</span>
}
}
<span style="color:#000088">else</span> <span style="color:#000088">if</span> ((f = tabAt(tab, i)) == <span style="color:#000088">null</span>)<span style="color:#880000">//如果tab[i]为null,那么就把fwd插入到tab[i],表明这个节点已经处理过了</span>
advance = casTabAt(tab, i, <span style="color:#000088">null</span>, fwd);
<span style="color:#000088">else</span> <span style="color:#000088">if</span> ((fh = f.hash) == MOVED)<span style="color:#880000">//那么如果f.hash=-1的话说明该节点为ForwardingNode,说明该节点已经处理过了</span>
advance = <span style="color:#000088">true</span>; <span style="color:#880000">// already processed</span>
<span style="color:#000088">else</span> {
<span style="color:#000088">synchronized</span> (f) {
<span style="color:#000088">if</span> (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
<span style="color:#000088">if</span> (fh >= <span style="color:#006666">0</span>) {
<span style="color:#000088">int</span> runBit = fh & n;
Node<K,V> lastRun = f;
<span style="color:#880000">//这边还对链表进行遍历,这边的的算法和hashmap的算法又不一样了,这班是有点对半拆分的感觉</span>
<span style="color:#880000">//把链表分表拆分为,hash&n等于0和不等于0的,然后分别放在新表的i和i+n位置</span>
<span style="color:#880000">//次方法同hashmap的resize</span>
<span style="color:#000088">for</span> (Node<K,V> p = f.next; p != <span style="color:#000088">null</span>; p = p.next) {
<span style="color:#000088">int</span> b = p.hash & n;
<span style="color:#000088">if</span> (b != runBit) {
runBit = b;
lastRun = p;
}
}
<span style="color:#000088">if</span> (runBit == <span style="color:#006666">0</span>) {
ln = lastRun;
hn = <span style="color:#000088">null</span>;
}
<span style="color:#000088">else</span> {
hn = lastRun;
ln = <span style="color:#000088">null</span>;
}
<span style="color:#000088">for</span> (Node<K,V> p = f; p != lastRun; p = p.next) {
<span style="color:#000088">int</span> ph = p.hash; K pk = p.key; V pv = p.val;
<span style="color:#000088">if</span> ((ph & n) == <span style="color:#006666">0</span>)
ln = <span style="color:#000088">new</span> Node<K,V>(ph, pk, pv, ln);
<span style="color:#000088">else</span>
hn = <span style="color:#000088">new</span> Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
<span style="color:#880000">//把已经替换的节点的旧tab的i的位置用fwd替换,fwd包含nextTab</span>
setTabAt(tab, i, fwd);
advance = <span style="color:#000088">true</span>;
}<span style="color:#880000">//下面红黑树基本和链表差不多</span>
<span style="color:#000088">else</span> <span style="color:#000088">if</span> (f <span style="color:#000088">instanceof</span> TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = <span style="color:#000088">null</span>, loTail = <span style="color:#000088">null</span>;
TreeNode<K,V> hi = <span style="color:#000088">null</span>, hiTail = <span style="color:#000088">null</span>;
<span style="color:#000088">int</span> lc = <span style="color:#006666">0</span>, hc = <span style="color:#006666">0</span>;
<span style="color:#000088">for</span> (Node<K,V> e = t.first; e != <span style="color:#000088">null</span>; e = e.next) {
<span style="color:#000088">int</span> h = e.hash;
TreeNode<K,V> p = <span style="color:#000088">new</span> TreeNode<K,V>
(h, e.key, e.val, <span style="color:#000088">null</span>, <span style="color:#000088">null</span>);
<span style="color:#000088">if</span> ((h & n) == <span style="color:#006666">0</span>) {
<span style="color:#000088">if</span> ((p.prev = loTail) == <span style="color:#000088">null</span>)
lo = p;
<span style="color:#000088">else</span>
loTail.next = p;
loTail = p;
++lc;
}
<span style="color:#000088">else</span> {
<span style="color:#000088">if</span> ((p.prev = hiTail) == <span style="color:#000088">null</span>)
hi = p;
<span style="color:#000088">else</span>
hiTail.next = p;
hiTail = p;
++hc;
}
}
<span style="color:#880000">//判断扩容后是否还需要红黑树结构</span>
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != <span style="color:#006666">0</span>) ? <span style="color:#000088">new</span> TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != <span style="color:#006666">0</span>) ? <span style="color:#000088">new</span> TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = <span style="color:#000088">true</span>;
}
}
}
}
}
}</code></span>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
值得细细品味的是,transfer的for循环是倒叙的,说明对table的遍历是从table.length-1开始到0的。我觉得这段代码写得太牛逼了,特别是
<span style="color:#000000"><code><span style="color:#880000">//利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作,参考sizeCtl的注释</span>
<span style="color:#000088">if</span> (U.compareAndSwapInt(<span style="color:#000088">this</span>, SIZECTL, sc = sizeCtl, sc - <span style="color:#006666">1</span>)) {
<span style="color:#880000">//如果有多个线程进行扩容,那么这个值在第二个线程以后就不会相等,因为sizeCtl已经被减1了,所以后面的线程就只能直接返回,始终保证只有一个线程执行了 a(上面注释a)</span>
<span style="color:#000088">if</span> ((sc - <span style="color:#006666">2</span>) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
<span style="color:#000088">return</span>;
finishing = advance = <span style="color:#000088">true</span>;<span style="color:#880000">//finishing和advance保证线程已经扩容完成了可以退出循环</span>
i = n; <span style="color:#880000">// recheck before commit</span>
}</code></span>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
注意:如果链表结构中元素超过TREEIFY_THRESHOLD阈值,默认为8个,则把链表转化为红黑树,提高遍历查询效率.接下来我们看看如何构造树结构,代码如下:
<span style="color:#000000"><code><span style="color:#000088">private</span> <span style="color:#000088">final</span> <span style="color:#000088">void</span> treeifyBin(Node<K,V>[] tab, <span style="color:#000088">int</span> <span style="color:#000088">index</span>) {
Node<K,V> b; <span style="color:#000088">int</span> n, sc;
<span style="color:#000088">if</span> (tab != <span style="color:#000088">null</span>) {
<span style="color:#000088">if</span> ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << <span style="color:#006666">1</span>);
<span style="color:#000088">else</span> <span style="color:#000088">if</span> ((b = tabAt(tab, <span style="color:#000088">index</span>)) != <span style="color:#000088">null</span> && b.hash >= <span style="color:#006666">0</span>) {
synchronized (b) {
<span style="color:#000088">if</span> (tabAt(tab, <span style="color:#000088">index</span>) == b) {
TreeNode<K,V> hd = <span style="color:#000088">null</span>, tl = <span style="color:#000088">null</span>;
<span style="color:#000088">for</span> (Node<K,V> e = b; e != <span style="color:#000088">null</span>; e = e.next) {
TreeNode<K,V> p =
<span style="color:#000088">new</span> TreeNode<K,V>(e.hash, e.key, e.val,
<span style="color:#000088">null</span>, <span style="color:#000088">null</span>);
<span style="color:#000088">if</span> ((p.prev = tl) == <span style="color:#000088">null</span>)
hd = p;
<span style="color:#000088">else</span>
tl.next = p;
tl = p;
}
setTabAt(tab, <span style="color:#000088">index</span>, <span style="color:#000088">new</span> TreeBin<K,V>(hd));
}
}
}
}
}</code></span>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
可以看出,生成树节点的代码块是同步的,进入同步代码块之后,再次验证table中index位置元素是否被修改过。
1、根据table中index位置Node链表,重新生成一个hd为头结点的TreeNode链表。
2、根据hd头结点,生成TreeBin树结构,并把树结构的root节点写到table的index位置的内存中,具体实现如下:
<span style="color:#000000"><code>TreeBin(TreeNode<K,V> b) {
super(TREEBIN, <span style="color:#006666">null</span>, <span style="color:#006666">null</span>, <span style="color:#006666">null</span>);
this.first = b;
TreeNode<K,V> r = <span style="color:#006666">null</span>;
<span style="color:#000088">for</span> (TreeNode<K,V> x = b, <span style="color:#000088">next</span>; x != <span style="color:#006666">null</span>; x = <span style="color:#000088">next</span>) {
<span style="color:#000088">next</span> = (TreeNode<K,V>)x.<span style="color:#000088">next</span>;
x.<span style="color:#4f4f4f">left</span> = x.<span style="color:#4f4f4f">right</span> = <span style="color:#006666">null</span>;
<span style="color:#000088">if</span> (r == <span style="color:#006666">null</span>) {
x.parent = <span style="color:#006666">null</span>;
x.red = <span style="color:#006666">false</span>;
r = x;
}
<span style="color:#000088">else</span> {
K k = x.key;
<span style="color:#4f4f4f">int</span> h = x.hash;
<span style="color:#000088">Class</span><?> kc = <span style="color:#006666">null</span>;
<span style="color:#000088">for</span> (TreeNode<K,V> p = r;;) {
<span style="color:#4f4f4f">int</span> dir, ph;
K pk = p.key;
<span style="color:#000088">if</span> ((ph = p.hash) > h)
dir = -<span style="color:#006666">1</span>;
<span style="color:#000088">else</span> <span style="color:#000088">if</span> (ph < h)
dir = <span style="color:#006666">1</span>;
<span style="color:#000088">else</span> <span style="color:#000088">if</span> ((kc == <span style="color:#006666">null</span> &&
(kc = comparableClassFor(k)) == <span style="color:#006666">null</span>) ||
(dir = compareComparables(kc, k, pk)) == <span style="color:#006666">0</span>)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
<span style="color:#000088">if</span> ((p = (dir <= <span style="color:#006666">0</span>) ? p.<span style="color:#4f4f4f">left</span> : p.<span style="color:#4f4f4f">right</span>) == <span style="color:#006666">null</span>) {
x.parent = xp;
<span style="color:#000088">if</span> (dir <= <span style="color:#006666">0</span>)
xp.<span style="color:#4f4f4f">left</span> = x;
<span style="color:#000088">else</span>
xp.<span style="color:#4f4f4f">right</span> = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);
}</code></span>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
get()方法
<span style="color:#000000"><code><span style="color:#000088">public</span> V <span style="color:#009900">get</span>(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; <span style="color:#000088">int</span> n, eh; K ek;
<span style="color:#000088">int</span> h = spread(key.hashCode());
<span style="color:#000088">if</span> ((tab = table) != <span style="color:#000088">null</span> && (n = tab.length) > <span style="color:#006666">0</span> &&
(e = tabAt(tab, (n - <span style="color:#006666">1</span>) & h)) != <span style="color:#000088">null</span>) {
<span style="color:#000088">if</span> ((eh = e.hash) == h) { <span style="color:#000088">if</span> ((ek = e.key) == key || (ek != <span style="color:#000088">null</span> && key.equals(ek)))
<span style="color:#000088">return</span> e.val;
}
<span style="color:#000088">else</span> <span style="color:#000088">if</span> (eh < <span style="color:#006666">0</span>)<span style="color:#880000">//如果eh=-1就说明e节点为ForWordingNode,这说明什么,说明这个节点已经不存在了,被另一个线程正则扩容</span>
<span style="color:#880000">//所以要查找key对应的值的话,直接到新newtable找</span>
<span style="color:#000088">return</span> (p = e.find(h, key)) != <span style="color:#000088">null</span> ? p.val : <span style="color:#000088">null</span>;
<span style="color:#000088">while</span> ((e = e.next) != <span style="color:#000088">null</span>) {
<span style="color:#000088">if</span> (e.hash == h &&
((ek = e.key) == key || (ek != <span style="color:#000088">null</span> && key.equals(ek))))
<span style="color:#000088">return</span> e.val;
}
}
<span style="color:#000088">return</span> <span style="color:#000088">null</span>;
}</code></span>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
这个get请求,我们需要cas来保证变量的原子性。如果tab[i]正被锁住,那么CAS就会失败,失败之后就会不断的重试。这也保证了get在高并发情况下不会出错。
我们来分析下到底有多少种情况会导致get在并发的情况下可能取不到值。1、一个线程在get的时候,另一个线程在对同一个key的node进行remove操作;2、一个线程在get的时候,另一个线程正则重排table。可能导致旧table取不到值。
那么本质是,我在get的时候,有其他线程在对同一桶的链表或树进行修改。那么get是怎么保证同步性的呢?我们看到e = tabAt(tab, (n - 1) & h)) != null,在看下tablAt到底是干嘛的:
<span style="color:#000000"><code><span style="color:#000088">static</span> <span style="color:#000088">final</span> <K,V> Node<K,V> tabAt(Node<K,V>[] tab, <span style="color:#000088">int</span> i) {
<span style="color:#000088">return</span> (Node<K,V>)U.getObjectVolatile(tab, ((<span style="color:#000088">long</span>)i << ASHIFT) + ABASE);
}</code></span>
- 1
- 2
- 3
它是对tab[i]进行原子性的读取,因为我们知道putVal等对table的桶操作是有加锁的,那么一般情况下我们对桶的读也是要加锁的,但是我们这边为什么不需要加锁呢?因为我们用了Unsafe的getObjectVolatile,因为table是volatile类型,所以对tab[i]的原子请求也是可见的。因为如果同步正确的情况下,根据happens-before原则,对volatile域的写入操作happens-before于每一个后续对同一域的读操作。所以不管其他线程对table链表或树的修改,都对get读取可见。
参考
深入浅出ConcurrentHashMap(1.8) 作者 占小狼
探索jdk8之ConcurrentHashMap 的实现机制 作者 淮左
Java并发编程总结4——ConcurrentHashMap在jdk1.8中的改进 作者 everSeeker
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/fjse51/article/details/55260493