1.为什么要用ConcurrentHashMap
HashMap线程不安全,而Hashtable是线程安全,但是它使用了synchronized进行方法同步,插入、读取数据都使用了synchronized,当插入数据的时候不能进行读取(相当于把整个Hashtable都锁住了,全表锁),当多线程并发的情况下,都要竞争同一把锁,导致效率极其低下。而在JDK1.5后为了改进Hashtable的痛点,ConcurrentHashMap应运而生。
在学习ConcurrentHashMap之前,建议先学习HashMap,HashMap底层原理及源码分析(详细)(jdk1.7 && jdk 1.8)
2.ConcurrentHashMap为什么高效?
ConcurrentHashMap采用CAS和synchronized来保证并发安全,数据结构跟HashMap1.8的结构一样,数组+链表/红黑二叉树。
synchronized只锁定当前链表或红黑二叉树的首节点,这样只要hash不冲突,就不会产生并发,效率又提升N倍。
看ConcurrentHashMap 源码之前,必须要知道什么是CAS
JDK1.8的ConcurrentHashMap的结构图如下:
TreeBin 在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象,这是与HashMap的区别。另外这个类还带有了读写锁。在构造TreeBin节点时,仅仅指定了它的hash值为TREEBIN常量,里面有TreeNode属性,相当于是一个封装类。
Node 链表结点,实现了Map.Entry<K,V>接口。
3.ConcurrentHashMap 属性
- transient volatile Node<K,V>[] table
键值对桶数组;
- private transient volatile Node<K,V>[] nextTable
rehash扩容时用到的新键值对数组;
- private transient volatile long baseCount
记录当前键值对总数,通过CAS更新,对所有线程可见;
- private transient volatile int sizeCtl
sizeCtl表示键值对总数阈值,通过CAS更新, 对所有线程可见
当sizeCtl < 0时,表示多个线程在等待扩容,-1代表正在初始化tabl e,-N代表有N- 1 个线程正在 进行扩容 ;
当sizeCtl = 0时,默认值;
当sizeCtl > 0时,表示扩容的阈值;
在其他情况下,如果table还未初始化(table == null),sizeCtl表示table进行初始化的数组大小(所以从构造函数传入的initialCapacity在经过计算后会被赋给它)。如果table已经初始化过了,则表示下次触发扩容操作的阈值,算法stzeCtl = n - (n >>> 2),也就是n的75%,与默认负载因子(0.75)的HashMap一致。
- private transient volatile int cellBusy
锁标志;
- private transient volatile CounterCell[] counterCells
counter cell表,长度总为2的幂次,计算ConcurrentHashMap元素个数时使用;
- static final int MOVED = - 1
ForwardingNode的hash值。ForwardingNode,一个特殊的Node节点,hash值为-1,其中存储nextTable的引用,用于指向下一个table。
只有table发生扩容的时候,ForwardingNode才会发挥作用,作为一个占位符放在table中表示当前节点为null或者链表已经被移动
- static final int TREEBIN = - 2
树根节点的hash值 ;
- static final int RESERVED = - 3
ReservationNode的hash值
4.Unsafe与CAS
在ConcurrentHashMap中,随处可以看到U, 大量使用了U.compareAndSwapXXX的方法,这个方法是利用一个CAS算法实现无锁化的修改值的操作,他可以大大降低锁代理的性能消耗。这个算法的基本思想就是不断地去比较当前内存中的变量值与你指定的一个变量值是否相等,如果相等,则接受你指定的修改的值,否则拒绝你的操作。因为当前线程中的值已经不是最新的值,你的修改很可能会覆盖掉其他线程修改的结果。
三个核心方法
ConcurrentHashMap定义了三个原子操作,用于对指定位置的节点进行操作。正是这些原子操作保证了ConcurrentHashMap的线程安全。
@SuppressWarnings("unchecked")
//获得在i位置上的node结点
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
//利用CAS算法设置i位置上的Node节点。之所以能实现并发是因为他指定了原来这个节点的值是多少
//在CAS算法中,会比较内存中的值与你指定的这个值是否相等,如果相等才接受你的修改,否则拒绝你的修改
//因此当前线程中的值并不是最新的值,这种修改可能会覆盖掉其他线程的修改结果
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
//利用volatile方法设置节点位置的值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
5.ConcurrentHashMap的构造方法
1)无参的构造函数,使用默认的table(长度为16)创建一个新的map,这种构造方法经常使用。
public ConcurrentHashMap() {
}
2)有参的构造函数,将sizeCtl取为大于initialCapacity并且最接近initialCapacity的2的n次方。
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
6.table 的延迟初始化
创建ConcurrentHashMap对象之后,并没有初始化数组(table),只是初始化sizeCtl的值(有参的构造函数),只有当第一次进行put的时候,会根据sizeCtl的值进行初始化,put操作是并发执行的,怎么保证table只初始化一次呢?
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//进入自旋
while ((tab = table) == null || tab.length == 0) {
//开始的时候sizeCtl要么等于0,要么大于0,小于0说明正在初始化
if ((sc = sizeCtl) < 0)
Thread.yield(); // 让出cpu资源,直接自旋
//cas操作,将sc设置为-1,如果一个线程cas成功的话,会对table进行初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//判断table是否为空,如果为空,就进行初始化,双重检验
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//重新计算阈值,sc = 0.75n
sc = n - (n >>> 2);
}
} finally {
//将阈值赋给sizeCtl
sizeCtl = sc;
}
break;
}
}
return tab;
}
执行第一次put操作的线程会执行Unsafe.compareAndSwapInt方法修改sizeCtl为-1,有且只有一个线程能够修改成功,而其它线程只能通过Thread.yield()让出CPU时间片等待table初始化完成。
7.put操作
大致流程:
- 若数组空,则初始化,完成之后,转2
- 计算当前桶位是否有值
1)无,则 CAS 添加,失败后继续自旋,直到成功,结束自旋
2)有,转3 - 判断数组元素是否为转移节点(ForwardingNode)
1)是,说明正在扩容,则帮助扩容,之后再新增
2)否,转4 - 桶位有值,对当前桶位加synchronize锁,判断是链表还是红黑树
1) 链表,新增节点到链尾
2) 红黑树,红黑树版方法新增 - 新增完成之后,统计size,检查是否需要扩容
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
//计算hash值
int hash = spread(key.hashCode());
int binCount = 0;
//自旋保证新增可以成功
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果table为空,进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//不为空,就判断数组中索引为i的值是否为空
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//为空,进行CAS,添加新节点,成功之后,结束自旋
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//判断是否正在扩容,如果是的话,就帮忙扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//锁定当前节点(链表的头结点),保证只有一个线程能进行操作
synchronized (f) {
//再次判断节点是否被修改
if (tabAt(tab, i) == f) {
//fh,哈希值大于等于0,说明是链表中的结点
if (fh >= 0) {
binCount = 1;
//遍历链表,如果key相等的话进行覆盖
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
//没有相等的key,插入尾结点
Node<K,V> pred = e;
//判断是否为尾结点,并把结点放到尾部
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//哈希值是-2,说明是红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//binCount不等于0,说明添加成功,判断是否需要转换成红黑树,并且是否是覆盖的原有的值
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
//树化
treeifyBin(tab, i);
//oldVal不为空,说明,原来有一个key,返回oldVal
if (oldVal != null)
return oldVal;
// 槽点已经上锁,只有在红黑树或者链表新增失败的时候
// 才会走到这里,这两者新增都是自旋的,几乎不会失败
break;
}
}
}
//统计size,并检查是否需要扩容
addCount(1L, binCount);
return null;
}
8.扩容(transfer)
在 put 方法最后检查是否需要扩容,从 put 方法的 addCount 方法进入transfer 方法,主要就是新建新的空数组,然后移动拷贝每个元素到新数组.在扩容过程中,依然支持并发更新操作;也支持并发插入。
扩容原理
ConcurrentHashMap扩容的时候,如果有其他线程进行put操作,会帮助一起扩容(主要负责迁移数据),一个线程负责(按从后往前的顺序)一个stride部分,将数据迁移到新的table中 。
什么时候会触发扩容?
- 如果新增节点之后,所在的链表的元素个数大于等于8,则会调用treeifyBin把链表转换为红黑树。在转换结构时,若tab的长度小于MIN_TREEIFY_CAPACITY,默认值为64,则会将数组长度扩大到原来的两倍,并触发transfer,重新调整节点位置。(只有当tab.length >= 64, ConcurrentHashMap才会使用红黑树。)
- 新增节点后,addCount统计tab中的节点个数大于阈值(sizeCtl),会触发扩容。
addCount方法
addCount方法,两个功能:增加元素个数,检测是否需要扩容。这里主要分析扩容
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//利用CAS更新baseCount,这个下面会详细讲
......
//check就是结点数量,有新元素加入成功才检查是否要扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//s表示加入新元素后容量大小,计算已省略。
// s(元素个数)大于等于sizeCtl,触发扩容
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 扩容标志位,下面会详细讲
// n不同则返回值不同,它的返回值被当作是当前table的标识,扩容期间sizeCtl的高16为就为该值,
// 低16为等于当前扩容线程数加一
int rs = resizeStamp(n);
//sc<0表示已经有线程在进行扩容工作
if (sc < 0) {
// 扩容已经结束,中断循环
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 进行扩容,并设置sizeCtl,表示扩容线程 + 1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 触发扩容(第一个进行扩容的线程)
// 并设置sizeCtl为rs << RESIZE_STAMP_SHIFT) + 2告知其他线程
//sizeCtl之前代表阀值,更改后高16位为标识,低16位为扩容线程数加一
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
//第二个参数为null会初始化新数组nextTable,确保只有一个线程新建table
transfer(tab, null);
// 统计个数,用于循环检测是否还需要扩容
s = sumCount();
}
}
}
resizeStamp()
该函数返回一个用于数据校验的标志位,意思是对长度为n的table进行扩容。它将n的前导零(最高有效位之前的零的数量)和1 << 15做或运算,这时低16位的最高位为1,其他都为n的前导零。
private static int RESIZE_STAMP_BITS = 16;
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
初始化sizeCtl(扩容操作被第一个线程首次进行)的算法为(rs << RESIZE_STAMP_SHIFT) + 2(addCount方法里),首先RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS = 16,那么rs << 16等于将这个标志位移动到了高16位,这时最高位为1,所以sizeCtl此时是个负数,然后加二(至于为什么是2,还记得有关sizeCtl的说明吗?1代表初始化状态,所以实际的线程个数是要减去1的)代表当前有一个线程正在进行扩容,
这样sizeCtl就被分割成了两部分,高16位是一个对n的数据校验的标志位,低16位表示参与扩容操作的线程个数 + 1。
可能会有读者有所疑惑,更新进行扩容的线程数量的操作为什么是sc + 1而不是sc - 1,这是因为对sizeCtl的操作都是基于位运算的,所以不会关心它本身的数值是多少,只关心它在二进制上的数值,而sc + 1会在低16位上加1。
总结一下sizeCtl的变化
table 初始化:
1)根据你调用的构造函数的不同,比如无参则sizeCtl = 0,initTable中将数组初始化为16;
若传了大小,则先经tableSizeFor改变大小确保为2的n次幂,之后赋给sizeCtl,
initTable中将数组初始化为sizeCtl大小
2)=-1 在初始化数组期间,即initTable里为了保证只有一个线程能够初始化table数组,
线程会利用cas将sizeCtl改为-1,之后的线程检测到sizeCtl< 0会退回到就绪状态
3)数组初始化完成后sizeCtl变为为阀值,大小为0.75倍数组大小
table 扩容:
第一个执行扩容的线程会将sizeCtl变为< 0,扩容期间sizeCtl高16位代表本次扩容的标识,不同扩容数组大小标识不同,
低16位数大小代表扩容线程数加一,假设为N则代表有N-1个线程在执行扩容操作。
下面的源码分析可以看出,很多方法会判断sizeCtl的正负,<0则代表正在扩容,>0则代表阀值
transfer方法
当ConcurrentHashMap容量不足的时候,需要对table进行扩容。这个方法的基本思想跟HashMap是很像的,但是由于它是支持并发扩容的,所以要复杂的多。原因是它支持多线程进行扩容操作,而并没有加锁。我想这样做的目的不仅仅是为了满足concurrent的要求,而是希望利用并发处理去减少扩容带来的时间影响。因为在扩容的时候,总是会涉及到从一个“数组”到另一个“数组”拷贝的操作,如果这个操作能够并发进行,那真真是极好的了。
整个扩容操作分为两个部分
- 第一部分是构建一个nextTable,它的容量是原来的两倍,这个操作是扩容的第一个线程完成的。会保证第一个发起数据迁移的线程,nextTab 参数为 null,之后再调用此方法的时候,nextTab 不会为 null。
- 第二个部分就是将原来table中的元素复制到nextTable中,这里允许多线程进行操作。
先来看一下单线程是如何完成的:
它的大体思想就是遍历、复制的过程。首先根据运算得到需要遍历的次数i,然后利用tabAt方法获得i位置的元素:
-
如果这个位置为空,就在原table中的i位置放入forwardNode节点,这个也是触发并发扩容的关键点;
-
如果这个位置是Node节点(fh>=0),如果它是一个链表的头节点,就构造一个反序链表,把他们分别放在nextTable的i和i+n的位置上
-
如果这个位置是TreeBin节点(fh<0),也做一个反序处理,并且判断是否需要untreefi,把处理的结果分别放在nextTable的i和i+n的位置上
遍历过所有的节点以后就完成了复制工作,这时让nextTable作为新的table,并且更新sizeCtl为新容量的0.75倍 ,完成扩容。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//n为旧tab的长度,stride为步长(就是每个线程迁移的节点数)
int n = tab.length, stride;
// 根据当前机器的CPU数量来决定每个线程负责的bucket数
// 避免因为扩容线程过多,反而影响到性能
//单核步长为1,多核为(n>>>3)/ NCPU,最小值为16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//nextTab为空,则将table扩为原来的两倍
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//transferIndex也为全局属性,用于控制迁移位置
transferIndex = n;
}
int nextn = nextTab.length;
//ForwardingNode是正在被迁移的Node,它的key,value,next都为null
//hash为MOVED,其中有个nextTable属性指向新tab[]
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//advance为true,可以继续迁移下一个节点,false则停止迁移
boolean advance = true;
//是否结束迁移
boolean finishing = false; // to ensure sweep before committing nextTab
//i是当前迁移位置的索引,bound是迁移的边界,是从后往前的顺序
for (int i = 0, bound = 0;;) {
// 这个循环使用CAS不断尝试为当前线程分配任务
// 直到分配成功或任务队列已经被全部分配完毕
// 如果当前线程已经被分配过bucket区域
// 那么会通过--i指向下一个待处理bucket然后退出该循
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
// --i表示将i指向下一个待处理的bucket
// 如果--i >= bound,代表当前线程已经分配过bucket区域
// 并且还留有未处理的bucket
if (--i >= bound || finishing)
advance = false;
//transferIndex(上一次迁移的边界)赋值给nextIndex(必执行),这里transferIndex一旦小于等于0
//则说明原数组的所有位置的迁移都有相应的线程去处理了,该线程可以不用迁移了
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//将nextBound赋值给transferIndex,nextBound = nextIndex - stride(上一个边界减去步长)
//i = nextIndex - 1(上一个边界-1变成开始迁移的位置)
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
//确定当前线程每次分配的待迁移桶的范围为[bound, nextIndex)
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//i < 0说明所有迁移任务都已经分配给对应的线程了,
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//finishing为true,说明所有迁移完成,将nextTable设为空,sizeCtl为新tab.length * 0.75
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//该线程完成迁移,sizeCtl - 1,对应之前helpTransfer()中+1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// (resizeStamp << RESIZE_STAMP_SHIFT) + 2代表当前有一个扩容线程
// 相对的,(sc - 2) != resizeStamp << RESIZE_STAMP_SHIFT
// 表示当前还有其他线程正在进行扩容,所以直接返回
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
//如果相等,则说明所有线程都完成任务了,设置finish为true
finishing = advance = true;
i = n; // recheck before commit
}
}
//如果旧tab[i]为null,则放入ForwardingNode,以通知其他现程
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//如果该节点为ForwardingNode,则说明已经被迁移过了,就可以开始迁移下一个节点了
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//迁移开始加锁,这部分和1.8HashMap差不多,将一条链表拆分成两条,一条是正序,另外一条是反序
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
//找出最后一段完整的fh&n不变(扩容后位置不变)的链表,这样最后这一段链表就不用重新创建新结点了。
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// runBit == 0,说明位置没有变,不等于0,说明位置变化为oldLength + 原位置
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
//lastRun之前的结点因为fh&n不确定,所以全部需要重新迁移。
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 在新数组位置上放置拷贝的值
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// 在老数组位置上放上 ForwardingNode 节点
// put 时,发现是 ForwardingNode 节点,就不会再动这个节点的数据了
setTabAt(tab, i, fwd);
advance = true;
}
//树的迁移
。。。
}
}
}
}
}
扩容理解之后,再来看一下helpTransfer方法
helpTransfer方法
只有在桶的头节点的hash值为MOVED(这时为ForwardingNode)时才会调用helpTransfer方法,功能是帮助迁移节点,这个方法被调用的时候,当前ConcurrentHashMap一定已经有了nextTable对象,首先拿到这个nextTable对象,调用transfer方法。回看上面的transfer方法可以看到,当本线程进入扩容方法的时候会直接进入复制阶段。
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
//只有f的hash为MOVED,才会执行该方法,说明f节点是ForwardingNode
//如果nextTable为null,则表示迁移完成了,详见transfer()
//三个判断条件判断的是扩容是否结束,ForwardingNode再创建时持有nextTable数组的引用,
//nextTable会在扩容结束后被置为null。
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// 本次扩容的标识,数组大小不变则rs不变
int rs = resizeStamp(tab.length);
//循环的这些判断条件为tue的话表明扩容未结束,扩容时sizeCtl一定小于0
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 扩容已经结束,中断循环
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 每有一个线程来帮助迁移,sizeCtl就+1,初始值为(rs << RESIZE_STAMP_SHIFT) + 2)
//(ps:在tryPresize()设置),之后再transfer中会用到
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
9.get()方法
get方法比较简单,给定一个key来确定value的时候,必须满足两个条件 key相同 hash值相同,对于节点可能在链表或树上的情况,需要分别去查找.
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//计算hash值
int h = spread(key.hashCode());
//根据hash值计算结点的位置
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 先尝试判断链表头是否为目标,如果是就直接返回
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
// eh < 0代表这是一个特殊节点(TreeBin或ForwardingNode)
// 所以直接调用find()进行遍历查找
return (p = e.find(h, key)) != null ? p.val : null;
//遍历链表
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
10.Size
之前在解析addCount时有部分代码被省略,省略的那部分代码与ConcurrentHashMap的size操作有关。对于ConcurrentHashMap来说table中的节点数量是个不确定的值,你没法停下所有正在执行各种操作的线程们来统计准确数字,也没必要,所以折中一下返回个估计值。下面来看看如何统计出来的。
相关内部类与变量:
//CounterCell是一个简单的内部静态类,每个CounterCell都是一个用于记录数量的单元
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
//用于与CAS配合实现排他性,CAS从0改为1代表获取锁
//用于保护初始化CounterCell、初始化CounterCell数组以及对CounterCell数组进行扩容时的安全
private transient volatile int cellsBusy;
//初始大小为2,每次扩容翻倍,存储CounterCell对象,该对象有个value变量,用来存储个数
//该数组的大小上限与当前机器的CPU数量有关,它不会被主动初始化,
//只有在调用fullAddCount()函数时才会进行初始化
private transient volatile CounterCell[] counterCells;
//Java 8声明了一个volatile变量baseCount用于记录元素的个数,对这个变量的修改操作是基于CAS的,
//每当插入元素或删除元素时都会调用addCount()函数进行计数。
private transient volatile long baseCount;
注解@sun.misc.Contended用于解决伪共享问题。所谓伪共享,即是在同一缓存行(CPU缓存的基本单位)中存储了多个变量,当其中一个变量被修改时,就会影响到同一缓存行内的其他变量,导致它们也要跟着被标记为失效,其他变量的缓存命中率将会受到影响。解决伪共享问题的方法一般是对该变量填充一些无意义的占位数据,从而使它独享一个缓存行。
mappingCount与Size方法
这两个方法都是统计个数的,不同在于size返回int,mappingCount返回long,文档注释建议使用mappingCount
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
可以看出sumCount是关键。统计的方法就是遍历counterCells将每个位置存储的值相加再加上baseCount的值,和就是此时的个数估计值。
为了搞清一开始说的三个变量的用途,回到addCount里被我省略的部分:这前半部是为了得出此时的元素个数 s,在下半部代码中若 s 大于等于阀值sizeCtl会进行扩容。
首先若counterCells数组不为null,调用sumCount计算元素个数,赋给 s;否则CAS增加baseCount += x,并将其赋给变量 s
若是CAS失败再次判断counterCells数组是否已初始化,已初始化则获取当前线程的CounterCell,CAS增加其value值,最后调用sunCount计算个数赋给 s 。若是未初始化调用fullAddCount(x , true),若是CAS失败调用fullAddCount(x , false)
fullAddCount方法:该函数负责初始化CounterCells和更新计数。第二个参数wasUncontended意思是:是否不存在竞争。CAS失败调用该方法说明存在竞争所以传false。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//counterCells为空的话,就对baseCount进行CAS,加1
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
//counterCells未初始化,或者对baseCount进行CAS失败的话
//再次判断counterCells没有初始化
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 调用fullAddCount(),该函数负责初始化CounterCells和更新计数
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
//统计总数
s = sumCount();
}
//检查是否需要扩容,上面已经讲过
....
- ConcurrentHashMap的计数设计与LongAdder类似。在一个低并发的情况下,就只是简单地使用CAS操作来对baseCount进行更新,但只要这个CAS操作失败一次,就代表有多个线程正在竞争,那么就转而使用CounterCell数组进行计数,数组内的每个ConuterCell都是一个独立的计数单元。
- 每个线程都会通过ThreadLocalRandom.getProbe() & m寻址找到属于它的CounterCell(类似hashMap),然后进行计数。ThreadLocalRandom是一个线程私有的伪随机数生成器,每个线程的probe都是不同的(这点基于ThreadLocalRandom的内部实现,它在内部维护了一个probeGenerator,这是一个类型为AtomicInteger的静态常量,每当初始化一个ThreadLocalRandom时probeGenerator都会先自增一个常量然后返回的整数即为当前线程的probe,probe变量被维护在Thread对象中),可以认为每个线程的probe就是它在CounterCell数组中的hash code。
- 这种方法将竞争数据按照线程的粒度进行分离,相比所有竞争线程对一个共享变量使用CAS不断尝试在性能上要效率高多了,这也是为什么在高并发环境下LongAdder要优于AtomicInteger的原因。
- fullAddCount()函数根据当前线程的probe寻找对应的CounterCell进行计数,如果CounterCell数组未被初始化,则初始化CounterCell数组和CounterCell。该函数的实现与Striped64类(LongAdder的父类)的longAccumulate()函数是一样的,把CounterCell数组当成一个散列表,每个线程的probe就是hash code,散列函数也仅仅是简单的(n - 1) & probe。
- CounterCell数组的大小永远是一个2的n次方,初始容量为2,每次扩容的新容量都是之前容量乘以二,处于性能考虑,它的最大容量上限是机器的CPU数量。
- 所以说CounterCell数组的碰撞冲突是很严重的,因为它的bucket基数太小了。而发生碰撞就代表着一个CounterCell会被多个线程竞争,为了解决这个问题,Doug Lea使用无限循环加上CAS来模拟出一个自旋锁来保证线程安全,自旋锁的实现基于一个被volatile修饰的整数变量,该变量只会有两种状态:0和1,当它被设置为0时表示没有加锁,当它被设置为1时表示已被其他线程加锁。这个自旋锁用于保护初始化CounterCell、初始化CounterCell数组以及对CounterCell数组进行扩容时的安全。
- CounterCell更新计数是依赖于CAS的,每次循环都会尝试通过CAS进行更新,如果成功就退出无限循环,否则就调用ThreadLocalRandom.advanceProbe()函数为当前线程更新probe,然后重新开始循环,以期望下一次寻址到的CounterCell没有被其他线程竞争。
- 如果连着两次CAS更新都没有成功,那么会对CounterCell数组进行一次扩容,这个扩容操作只会在当前循环中触发一次,而且只能在容量小于上限时触发。
对于统计总数,只要能够理解CounterCell的思想,就很简单了。仔细想一想,每次计数的更新都会被分摊在baseCount和CounterCell数组中的某一CounterCell,想要获得总数,把它们统计相加就是了。
参考:
Map大家族的那点事儿
JUC源码解析-ConcurrentHashMap1.8
关于jdk1.8中ConcurrentHashMap的方方面面
ConcurrentHashMap源码分析(JDK8版本)
ConcurrentHashMap1.8源码tryPresize()和transfer()方法解析
看完这篇ConcurrentHashMap源码解析,我又觉得能手撕面试官了
在这里感谢上述博客大佬。
如有不足之处,欢迎指正,谢谢!