目录
Java版本:8u261。之前我对HashMap的源码也进行过分析,感兴趣的话可以查看《较真儿学源码系列-HashMap(逐行源码带你分析作者思路)》(强烈建议看一下,因为我下面对ConcurrentHashMap源码的分析中会牵扯到我之前对HashMap的分析,以及分析其中的一些差异点)。
1 简介
HashMap虽然很高效,使用起来也很方便,但很遗憾它却不是线程安全的(比如各种++操作就不是线程安全的),阅读源码可知道HashMap并没有对并发做控制。但是无妨,Doug Lea大神为我们提供了ConcurrentHashMap这个工具类,一个并发版本的HashMap。相比于Hashtable只是简单地将方法加上synchronized关键字来控制并发(所有方法共用一个锁资源,同一时刻只能有一个线程在调用),ConcurrentHashMap做到了真正的并发调用。
和HashMap一样,ConcurrentHashMap在Java 7和Java 8中也发生了变化:在Java 7中是使用了一个继承ReentrantLock的分段锁Segment来实现并发的(我之前也写过对ReentrantLock进行源码分析的文章《较真儿学源码系列-AQS(逐行源码带你分析作者思路)》),每个Segment里面保存着一个类似于HashMap的结构。所以说一个Segment里面的操作和另一个Segment里面是互不干扰的,同时也就是在说ConcurrentHashMap的并发程度取决于Segment的数量(默认为16)。但是这样使用起来未免还是觉得并发粒度太粗了,所以在Java 8中做了改进。
Java 8中的ConcurrentHashMap完全摒弃了之前的数据结构,而是采用了和HashMap一样数组+链表+红黑树的结构,整体变得更加轻便。并发控制是通过CAS + synchronized + Unsafe类直接操作地址(volatile语义)的方式来实现,而且并发粒度也缩减到了桶上(即并发粒度是数组的长度。注意,说并发粒度是在Node节点上的这个说法是错误的。因为不可能存在一个线程修改链表上的一个节点,而另外一个线程同时修改同一个链表上的另一个节点。源码中锁住了链表上的第一个节点,这只是表面上的含义,真正的含义是锁住整个桶(链表))。其源码结构也是和HashMap类似的,只不过是在一些会出现并发问题的关键代码处改为了对并发的支持(在多线程计数和利用多线程来做扩容这两个方法中做了较大的改动)。两者结构类似也更方便我们来查看二者在并发实现上的差异。所以从某种意义上来说,Java 8中的ConcurrentHashMap是对HashMap在并发方面所做的改进版本。但是这并不意味着取代,因为在一些不需要考虑并发的场景下(比如局部变量),HashMap比ConcurrentHashMap有着更高的性能(CAS和synchronized多多少少会有点儿开销,而且还有其他需要考虑并发的代码)。
ReentrantLock源码也是Doug Lea写的。ConcurrentHashMap源码从Java 7中用ReentrantLock(Segment)来实现并发控制,到Java 8中改用CAS + synchronized + Unsafe类直接操作地址(volatile语义)的方式,可以看出在Java 8这个版本中synchronized的性能已经优化地很好了(偏向锁+轻量级锁+重量级锁,详见我的另一篇文章《synchronized锁的升级过程》)。其实synchronized可以不断地去优化它的性能,因为它是属于底层的实现。而ReentrantLock继承于AQS,还是属于代码层次上的阻塞与唤醒(依赖于底层操作系统的线程库),优化幅度不高。
还有一点需要提的是:在HashMap中是允许key和value为null的,而在ConcurrentHashMap中则是不允许的,会抛出空指针异常。这是为什么呢?首先来说明一下value不能为null的原因。其实我在分析HashMap中的get方法中就已经说过,通过get方法来获取键所对应的值,结果为null的话是具有二义性的。我分不清到底是因为存进去的就是map.put("key", null);,还是这个键值对本身就在HashMap中不存在,从而返回的null。但是在HashMap中,我可以通过containsKey方法来查看到底是属于上面哪种情况,因为HashMap本身就是假定在单线程中能正确执行,所以这样来做不会有问题。但是在ConcurrentHashMap中,假定说也是允许value为null的话,那么我也按照上面的方式来进行判断,可能会写出下面的代码:
if (map.containsKey("key")) {
return map.get("key");
} else {
//做一些其他的处理,比如说抛出一个没有key的异常
}
假定当前ConcurrentHashMap中就有一个"key"->null的键值对。而就在第一个线程执行完containsKey方法,返回true,而此时准备要执行get方法的时候,第二个线程将这个键值对删掉了,此时第一个线程get方法返回null就产生了二义性:我以为是当前有"key"->null这个键值对,get方法才返回的null,而实际上是因为这个键值对已经被删掉了才返回的null。
再来说说key不能为null的原因。其实说实话,我没能找到一种场景下能很好地解释出key不能为null的原因(就如同上面解释value不能为null那样),而下面是Doug Lea对于这个问题的解释:
他也只是解释了value不能为null的原因(就如同上面我说的那样),但是在倒数第二段中,他说到了一点就是检查key和value是null,这个是很困难的。其实看完他说的这段话,我有理由相信,key不能为null就是Doug Lea提前设定好的代码规范(既然你value已经不能为null了,key也别为null了),以此来避免没必要出现的麻烦(二义性或其他)。上面还说到了Doug Lea认为HashMap中的key和value也不能为null,同时给出了一种在多线程使用HashMap时,用一种包装NULL为空对象的方式,以此来区分出这两种差异(用Java 8中的Optional类应该也能做到)。有意思的是,这和Josh Bloch(另一位Java界大佬,HashMap的主要作者之一(Doug Lea也是)。可以认为HashMap主要是Josh Bloch写的,而ConcurrentHashMap是Doug Lea写的)之前的想法是相左的,但是后来,Josh Bloch似乎改口了:
他同意了key为null可能会造成错误,但不确定value是否应该不能为null。并且认为如果在JDK中加入这种包装NULL空对象的方式是需要慎重考虑的。这些问答发布于06年,但直到如今的Java 8u261,我在源码中还是没有找到类似的修改点。但不管怎样,我们知道有这么回事儿就行了。原文地址:http://cs.oswego.edu/pipermail/concurrency-interest/2006-May/002485.html,感兴趣的可以查看,后面几篇文章写了一些其他人实现包装NULL空对象的代码。
2 构造器
/**
* ConcurrentHashMap:
* 无参构造器
* 空实现,所有参数都是走默认的
*/
public ConcurrentHashMap() {
}
/**
* 有参构造器
*/
public ConcurrentHashMap(int initialCapacity) {
//initialCapacity非负校验
if (initialCapacity < 0)
throw new IllegalArgumentException();
/*
与HashMap不同的是,这里initialCapacity如果大于等于2的29次方的时候(HashMap这里为超过2的30次方),
就重置为2的30次方
tableSizeFor方法是用来求出大于等于指定值的最小2次幂的(我在HashMap源码分析中详细解释了该方法的执行过程),
有意思的是,注意在第26行代码处,在HashMap中仅仅就是对设定的数组容量取最小2次幂,而这里首先对设定值*1.5+1后,
再取最小2次幂,后面会解释为什么会这么做
*/
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
/*
sizeCtl是用来记录当前数组的状态的(类似于HashMap中的threshold):
1.如果为-1,代表当前数组正在被初始化
2.如果为其他负数,代表当前数组正在被扩容。取该负数的低16位,即(1 + n),n代表正在执行扩容操作的线程数量
(这里+1是为了错开-1这个值)
3.在调用有参构造器的时候存放的是需要初始化的容量
4.调用无参构造器的时候为0
5.当前数组已经不为空,此时存放的是下次需要扩容时的阈值
*/
this.sizeCtl = cap;
}
在上面的第26行代码处,首先会对设定值*1.5+1后(+1是对应*1.5后如果结果有小数的情况。因为最后是要取整(传进tableSizeFor方法中的参数必须是int)的,也就是将所有小数部分都截掉,所以+1是为了弥补这种差异),然后再取最小2次幂,这和HashMap中的实现有所不同(HashMap中是tableSizeFor(initialCapacity)),那么这到底是为什么呢?如果细心的人可以看到,在这个构造器中,已经没有了负载因子的操作,整个源码中也用得很少(仅有的一处使用也是遗留代码)。在ConcurrentHashMap中,似乎Doug Lea已经不想让我们去调整负载因子的大小了,而是采用默认的0.75(0.75可以用n - (n >>> 2)来表示)。也就是说,数组容量会自适应,传进来的容量实际上并不是存进去的桶的个数,而是需要扩容时的个数。举个例子就明白了:16 * 0.75 = 12,在HashMap中,我们传进来的其实是16,需要乘负载因子后才是实际需要扩容时的阈值点;而在ConcurrentHashMap中,传进来的值其实相当于12,也就是说我们传进来的就是需要扩容的阈值。所以在构造器阶段需要除以负载因子,以此来求出真正的桶的个数。所以在上面的第26行代码处,实际上就是在做自适应容量的工作。那么可能又会在想:不对啊,即使是在做自适应,那也应该是数组容量 / 0.75啊?*1.5是什么鬼(*1.5的扩容策略其实并不少见,在ArrayList中就是采用*1.5的方式)?我猜测可能是为了提高执行速度,其实/0.75就相当于*1.333333...,这样和*1.5来对比的话似乎差别也不太大,但是/0.75的方式毕竟是除法,又带小数,而*1.5可以优化为右移操作。但是这么做的话可能会使计算出的结果导向为另一个错误的值,下面来举个例子:比方说现在传进来的容量是22,那么/ 0.75的方式结果是29.3,+1后再tableSizeFor结果是:32;而*1.5的方式结果是33,+1后再tableSizeFor结果是:64。可以看到,*1.5方式最后计算出来的容量明显是不对的,相当于多扩容了一倍(负载因子相当于强制为0.75,所以22 / 0.75后+1再取2的幂,结果肯定是32而不是64)。而实际上的结果也确实如此,这里实际上是个bug。在OpenJDK的bug提交记录上可以看到如下的JDK-8202422:
从上面可以抓取到几个信息:这个bug从Java 8开始就已经有了,已经在Java 11.0.1中修复了。既然如此,我们就来看看这块改成了什么。以下是Java 14.0.2中的ConcurrentHashMap单参数构造器的源码:
public ConcurrentHashMap(int initialCapacity) {
this(initialCapacity, LOAD_FACTOR, 1);
}
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long) (1.0 + (long) initialCapacity / loadFactor);
int cap = (size >= (long) MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int) size);
this.sizeCtl = cap;
}
可以看到在第11行代码处,已经改为了initialCapacity / loadFactor的方式,解决了这个bug点。
还有一点需要明确:sizeCtl在为负数表示扩容的时候(不包括-1),严格的定义为取该负数的低16位,即(1 + n),n代表正在执行扩容操作的线程数量(这里+1是为了错开-1这个值)。低16位表示的才是扩容线程数量+1,而高16位为一个生成数组长度所对应的标志位(详见后面的示意图)。而在源码中的注释是这样写的:
可以看到并不准确。
3 put方法
/**
* ConcurrentHashMap:
*/
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
//注意,ConcurrentHashMap中的key和value是不允许为null的,但在HashMap中却可以
if (key == null || value == null) throw new NullPointerException();
//计算key的hash,注意,这里必须是一个非负数,详见spread方法中的注释
int hash = spread(key.hashCode());
//binCount表示添加当前节点前,这个桶上面的节点数
int binCount = 0;
//注意这里是个死循环
for (Node<K, V>[] tab = table; ; ) {
Node<K, V> f;
int n, i, fh;
if (tab == null || (n = tab.length) == 0)
//如果当前数组还没有初始化的话,就进行初始化的工作(延迟初始化至该方法中)。然后会跳到下一次循环,添加节点
tab = initTable();
/*
tabAt方法是Unsafe类中通过volatile方式获得指定地址所对应的值,方式是通过(n - 1) & hash
也就是通过(n - 1) & hash的方式来找到这个数据插入的桶位置,和HashMap是一样的
*/
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
/*
casTabAt方法是Unsafe类中通过CAS的方式设置值,这里的意思是如果这个桶上还没有数据存在的话,
就直接创建一个新的Node节点插入进这个桶就可以了,也就是快速判断。当然如果CAS失败了,会进入
到下一次循环中继续判断
*/
if (casTabAt(tab, i, null,
new Node<K, V>(hash, key, value, null)))
break;
} else if ((fh = f.hash) == MOVED)
/*
如果当前桶的第一个节点是ForwardingNode节点的时候(ForwardingNode节点的hash值为MOVED),
也就是说当前桶正在被迁移中,就去帮助一起去扩容。等扩容完成后,就更新一下tab,下次循环还是会去插入节点的
*/
tab = helpTransfer(tab, f);
else {
//走到这里说明当前这个桶上有节点
V oldVal = null;
/*
注意这里使用了synchronized来锁住了当前桶上的第一个节点,同时也就证明了在Java 8的
ConcurrentHashMap中,锁的粒度是在桶(锁住第一个节点也就是在锁住这个桶)这个级别
*/
synchronized (f) {
/*
双重检查,可能的一种情况是(我的个人猜测):如果此时有两个线程走到了第48行代码处。第一个线程进入到了
synchronized同步语句块中,并插入了新节点,最后触发了扩容操作,此时table已经是一个newTable了
然后第二个线程进来,下面的判断条件发现不等(tabAt方法是Unsafe类中直接拿的主内存值,而此时table
已经扩容成newTable了。所以此时会找到newTable中i位置处的第一个节点,以此和旧table中i位置处的
第一个节点对比(f是局部变量),发现不是同一个位置),于是就会退出同步语句块,进入到下一次循环中
不管最终是不是这种解释,在synchronized同步语句块中加上双重检查本身就是一个好的编程习惯
*/
if (tabAt(tab, i) == f) {
/*
如果节点是普通的Node节点的话(在spread方法中提到过,如果节点hash值>=0的话,
就是一个普通的Node节点)
*/
if (fh >= 0) {
//设置binCount=1
binCount = 1;
/*
其实从下面的循环可以看出,ConcurrentHashMap中去掉了HashMap中的快速判断模式
注意,在链表上每循环一个节点,binCount就+1(for循环运行机制:第一个节点不加)
*/
for (Node<K, V> e = f; ; ++binCount) {
K ek;
//如果桶上当前节点的hash值和要插入的hash值相同,并且key也是相同的话
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
//如果onlyIfAbsent为false,就新值覆盖旧值
e.val = value;
break;
}
Node<K, V> pred = e;
/*
e指向下一个节点,如果下一个节点为null,意味着已经循环到最后一个节点
还没有找到一样的,此时将要插入的新节点插到最后(pred指针指向当前节点的
上一个节点,因为e此时已经变成当前节点的下一个节点了)
*/
if ((e = e.next) == null) {
pred.next = new Node<K, V>(hash, key,
value, null);
break;
}
}
} else if (f instanceof TreeBin) {
//如果节点是红黑树的话
Node<K, V> p;
//设置binCount=2,后面会解释这里设置为2的意义
binCount = 2;
//执行红黑树的插入节点逻辑(红黑树的分析本文不做展开)
if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//binCount != 0说明要么已经在链表上添加了一个新节点,要么在红黑树中插入了一个节点
if (binCount != 0) {
//如果链表的数量已经达到了转成红黑树的阈值的时候,就进行转换
if (binCount >= TREEIFY_THRESHOLD)
/*
我在之前的HashMap源码分析中已经说过,是否真正会转成红黑树,
需要看当前数组的桶的个数是否大于等于MIN_TREEIFY_CAPACITY,小于就只是扩容
*/
treeifyBin(tab, i);
if (oldVal != null)
//返回旧值
return oldVal;
//如果上面是在链表尾新添加了一个节点的话,就跳出死循环,进入到下面的addCount方法中
break;
}
}
}
//添加节点后,计数器+1(在该方法中,也会有其他线程帮忙进行扩容迁移的逻辑)
addCount(1L, binCount);
return null;
}
/**
* 我在HashMap源码分析中已经解释了这里用高16位和低16位进行异或来作为最终的hash的原因了
* 但是在HashMap源码中这里是(key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16)
* 因为之前已经判断过key不能为null,所以这里不用再判断了。所以实际上这里和HashMap中计算的差异点仅仅在于
* 最后多了个“& HASH_BITS”的条件。HASH_BITS是一个有31个1的二进制数,也就是Integer.MAX_VALUE
* 那么按位与之后的结果是如何呢?如果是正数(包括0)的话,这样做不会有任何作用,但是如果h是一个负数呢?
* 要知道如果是负数的话,最高位是1(最高位是符号位),这样和HASH_BITS按位与之后就变成了一个正数
* 也就是说“& HASH_BITS”这个条件的添加是为了确保计算出来的hash值是非负的。但是为什么在HashMap源码中
* 不需要添加呢?因为在之后的判断桶的位置使用的是(table.length - 1) & hash,之前在HashMap构造器中分析过,
* table.length不可能为0,最小就是1(调用构造器时table.length确实为0。但是请注意,(table.length - 1) & hash
* 这个条件是在数组扩容(初始化)方法的后面调用的,此时数组已经有容量了),所以这里table.length - 1最小就是0,
* 是非负的。所以和hash按位与之后就能把最高位符号位1改成0,当然如果hash本身就是大于等于0的话,就无所谓了
* 也就是在说,在HashMap中,将hash值非负数化处理是延迟到了(table.length - 1) & hash这个操作上
* 但是其实ConcurrentHashMap中计算桶位置也是用的“(table.length - 1) & hash”这种方式,所以说在这里使用
* “& HASH_BITS”这个条件,以此来将hash值提前非负数化处理是有原因的。原因就在于:在ConcurrentHashMap中,
* hash值为负数是有特殊含义的:
* -1(MOVED):代表当前节点正在迁移
* -2(TREEBIN):代表当前节点是红黑树节点
* -3(RESERVED):代表当前节点是用在computeIfAbsent和compute方法中的占位节点
* 而在后面的源码中可以看到当判断当前节点是否是普通Node节点时,是通过判断节点的hash值是否>=0来实现的
* (如果<0则代表是红黑树节点,RESERVED只在computeIfAbsent和compute方法中有),如果现在计算出的hash值就有
* 负数的话,那我就分不清到底是普通Node节点还是红黑树节点了
*/
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
/**
* 第21行代码处:
* 注意,该方法只是做初始化数组用的,不像HashMap中的resize方法除了用来初始化也用来做扩容
* ConcurrentHashMap中的扩容方法是transfer
*/
private final Node<K, V>[] initTable() {
Node<K, V>[] tab;
int sc;
//如果当前数组已经不为空了,就可以退出了
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
/*
前面说明过,如果sizeCtl为-1,代表当前数组正在被别的线程做初始化工作
这里的sizeCtl不用想都知道肯定是被volatile修饰的,以确保内存可见性
既然现在已经有别的线程在初始化了,那么当前这个线程就不用再做一遍了,
只需要不断让渡本资源,跳进下一次循环,直到初始化工作完成就行了
*/
Thread.yield();
/*
这里利用Unsafe的CAS操作,将sizeCtl改为-1,代表着当前线程要去进行初始化数组的工作了,
其他线程只能在上面的if条件中让渡资源。当然如果CAS竞争失败,继续去循环就行了
*/
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
//走到这里,说明抢到了资源,准备开始做初始化工作
try {
/*
这里会再判断一次当前数组是否为空,避免重复的数组初始化工作。如果第一个线程已经走到第168行代码处,
然后此时被切出资源,注意此时sc还没有被赋值。这时候第二个线程进来了,完成了初始化工作后退出了
此时sizeCtl被赋值成*负载因子后的结果。而现在第一个线程又拿到资源,将sc赋值成第二个线程刚才已经
改过后的值,然后CAS成功了,那么此时又会开始进行初始化工作(之前已经初始化过了)。所以这里的再次判断
就是为了避免在高并发下,数组会被重复初始化的情况出现。这里的设计思路其实类似于单例的双重加锁模式
*/
if ((tab = table) == null || tab.length == 0) {
/*
前面分析过,如果sizeCtl=0说明当前调用的是无参构造器,那么此时改成初始值16
n此时就代表着数组应该要创建的容量(也就是桶的个数)
*/
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
//用上一行得出的n的容量来创建一个新的Node数组
@SuppressWarnings("unchecked")
Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n];
table = tab = nt;
/*
初始化工作完成后,此时计算实际的阈值:n*0.75(负载因子),然后在finally子句中赋值给sizeCtl
注意这里是n - (n >>> 2),实际上也就相当于n*0.75,那么为什么HashMap中不这么做呢?之前已经
分析过,在ConcurrentHashMap中虽然还是有负载因子,但是其值已经固定为0.75,不能修改
而在HashMap中负载因子仍然可以由用户来指定,不是一个固定值,自然不能用右移来优化了
*/
sc = n - (n >>> 2);
}
} finally {
/*
之所以要把下面这行代码放在finally子句中,是因为在上面第197行代码处,使用了@SuppressWarnings注解来
抑制异常,也就是说,这里可能会抛出异常(又或者是可能发生OOM)。如果抛出异常的话,sizeCtl就一直是-1了,
这样别的线程也不能完成初始化工作,就成为死循环了。所以sizeCtl赋值这行代码放在finally子句的意义就是:
确保即使发生异常的话,也要将sizeCtl赋成初始值sc,然后再让其他的线程完成初始化工作
*/
sizeCtl = sc;
}
break;
}
}
return tab;
}
/**
* 第40行代码处:
* 帮助做扩容工作
* <p>
* (注:看该方法之前建议先看addCount方法中的分析,不然可能看不懂)
*/
final Node<K, V>[] helpTransfer(Node<K, V>[] tab, Node<K, V> f) {
Node<K, V>[] nextTab;
int sc;
/*
如果此时数组不为空并且当前节点是ForwardingNode节点的时候(是ForwardingNode
节点就说明当前桶正在被迁移中)
*/
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K, V>) f).nextTable) != null) {
//此时会根据数组长度计算出一个标记位,详见resizeStamp方法的注释
int rs = resizeStamp(tab.length);
/*
nextTab == nextTable、table == tab和(sc = sizeCtl) < 0这三个条件
都是在说如果当前数组还没扩容完,也就是正在扩容中。while在此是保证一定要帮助扩容
*/
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
/*
退出扩容时的条件,也就意味着此时已经做完扩容了:
1.(sc >>> RESIZE_STAMP_SHIFT) != rs说明当前线程不在同一次扩容中(sc右移16位的结果理论上
应该和rs相同,但如果不同,说明此时的数组长度已经变了,可能是当前线程还在上一次扩容中,而其它
线程已经在下一次了(可能是sc赋值前触发了下一次扩容))
2.sc == rs + 1说明当前还有一个线程在做最后的检查工作(第一个线程初始为+2,但是最后是每个扩容线程
都会-1,实际上就相当于多减了一次,也就是这里+1的意思。而如果连检查也完成的话,sc会复位为一个正数
所以此时是最后一个线程正在做检查的时刻),那么本线程也不用帮忙了,直接等那个线程检查完就行了
(这里正确的判断条件应该为sc == (rs << RESIZE_STAMP_SHIFT) + 1,这里实际上是个bug,后面会说明)
3.sc == rs + MAX_RESIZERS和上面是一样的道理,MAX_RESIZERS表示最多可以帮助的线程数量+1(低16位
都为1,已经把低16位都占满了,不能再大了),也就是说现在已经有MAX_RESIZERS - 1个线程在帮忙做迁移了,
本线程就不掺和了(这里正确的判断条件应该为sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS,
这里实际上是个bug,后面会说明)
4.transferIndex <= 0说明bound区间已经都分配完了,那么本线程也不需要扩容了
注意:相比于在addCount方法中的相同此处的判断,该处代码少了一个判断,即判断nextTable
是否为空,可以想想为什么?因为上面的while循环中已经判断了nextTab == nextTable,
说明此时nextTable肯定不为空
*/
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
/*
此时将sizeCtl+1(注意,sizeCtl此时是负数),然后进入transfer方法帮忙做迁移
在transfer方法里面等该线程做完迁移工作后,会再将sizeCtl-1的。也就是说,在上面我对构造器
中sizeCtl所做的注释中的第2条:cizeCtl中低16位为(1+n)(高16位为标记位),这里n代表
正在执行扩容操作的线程数量
*/
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
//扩容完返回局部变量nextTab就行了,反正它本身就代表着下一次新的2倍容量的新数组。setTabAt方法保证内存可见性
return nextTab;
}
return table;
}
/**
* 该方法其实并没有什么实际意义,只是为了根据数组长度生成一个标记位,后续会拿这个标记位进行判断
*/
static final int resizeStamp(int n) {
/*
Integer.numberOfLeadingZeros方法是用来计算最高位为1之前的0的个数(包括符号位),而RESIZE_STAMP_BITS
是16,这里也就是说,将数组长度取最高位为1之前的0的个数和2的15次方做按位或的操作,得出来的数据
在低16位的最高位为1(后续再左移16位后符号位就为1了),剩下的就是0的个数了(后面会举示意图)
这里之所以会用Integer.numberOfLeadingZeros这个方法是为了确保最后计算出的结果只能在低16位上有值,
高16位上不能有值,后面会说明原因
*/
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
/**
* 第274行、第691行和第699行代码处:
* 该方法会利用多线程来分工执行扩容操作,会把迁移任务分成几个bound区间,每个bound区间中会有几个
* 桶,每个线程负责迁移本bound区间内的所有桶。因为只有在做完了本bound区间内的所有迁移工作后,才会
* 去CAS抢占下一次bound区间,在这期间不会有任何的CAS,所以多个线程之间可以并发地执行迁移工作
* 如果迁移工作都做完了的话,最后一个线程会再次检查一下所有的桶是否完成了迁移(后面有示意图)
* 当然了,如果只有一个线程,它就会完成全部的迁移工作(相当于每次都是它抢到资源)
* <p>
* (注:提前打下预防针,该方法的实现过程(尤其是前半部分)真的很不好理解,把它当作整个ConcurrentHashMap
* 源码中最难理解的内容也不为过。我也是debug了好几次才慢慢理解的,所以如果以下的注释看不懂的话,自己多
* 调试几次吧!)
*/
private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) {
int n = tab.length, stride;
/*
定义bound区间的长度单位stride
1.stride=1:如果当前JVM最大可用线程数为1
2.stride=数组容量/(8*当前JVM最大可用线程数):当前JVM最大可用线程数大于1
3.stride=16:如果上面计算出来的值小于16,也就是说如果当前JVM最大可用线程数大于1的话,stride最小为16
该处计算是为了根据数组长度大小来计算出合适的stride
*/
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
//如果nextTab是空的,意味着当前线程是第一个进来的扩容线程
if (nextTab == null) {
try {
//创建一个2倍旧容量的Node数组,最后旧数组上的数据会迁移到此数组中
@SuppressWarnings("unchecked")
Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1];
//将上面创建出来的新数组赋值给nextTab
nextTab = nt;
} catch (Throwable ex) {
//如果上面抛出异常的话(可能是OOM),就将sizeCtl设置为int的最大值,停止扩容操作
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//transferIndex指针初始指向旧数组容量
transferIndex = n;
}
int nextn = nextTab.length;
/*
创建一个新的ForwardingNode节点,注意这里将nextTab赋值进去了,此时还是一个空数组,
但是后续的setTabAt操作可以保证内存的可见性
*/
ForwardingNode<K, V> fwd = new ForwardingNode<K, V>(nextTab);
//advance表示是否完成了当前桶的迁移工作
boolean advance = true;
//finishing表示是否完成了所有的迁移工作(该参数是为了最后一次检查用的)
boolean finishing = false;
//i指向当前桶的位置,bound指向当前线程所分配的区间边界点
for (int i = 0, bound = 0; ; ) {
Node<K, V> f;
int fh;
//以下是在做分配bound区间,以及更新当前桶位置i的工作
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
/*
每次i都会减1,表示当前线程每迁移完一个桶就迁移下一个。--i >= bound表示当前线程分配过
bound区域,但是还没有完成这个区域内所有桶的迁移工作;finishing为true这个条件的添加
是为了保证在所有迁移工作都做完后,最后的一次检查也做完后,在此也能成功退出while循环
(其实我感觉不加也可以,跳到下面if条件中也能退出,此时transferIndex已经为0了
可能这样做是省了一次读取volatile变量的消耗(插入内存屏障)),然后会跳到第391行代码处
*/
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
//小于等于0说明此时所有bound区间都被分配完了
i = -1;
advance = false;
} else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
/*
为当前线程分配新的bound边界,如果CAS失败了,说明有其他的线程已经抢占到了本bound区间,
继续循环去抢下个bound区间就可以了
*/
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
/*
走到这里advance复位为false,下面这个if条件是用来判断当前线程是否迁移完了。i<0很好理解,i>=n我猜测是为了
防止数据溢出(一个线程在上面的CAS操作中一直是失败的,但是每循环一次i就-1,等减到-2147483648后
再-1就变成了2147483647(在这期间transferIndex一直大于0))。而i+n>=nextn这个条件看起来像是在判断
错次扩容的场景(nextn和n已经不是2倍的关系了),但是在本方法外面已经判断过了,而且传进来的tab和
nextTab都是局部变量,所以我猜测这里只是个安全性检查
*/
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//如果迁移工作都做完了的话(最后一次检查也做完了)
if (finishing) {
//nextTable赋值为null,也就是说,nextTable只在扩容时候有值
nextTable = null;
//table此时指向两倍容量,扩容后的数组
table = nextTab;
/*
设置新的sizeCtl阈值(迁移结束后该值将变为正数),n是原数组长度,这里的意思是
sizeCtl=n*1.5,也就是sizeCtl存放的是新数组长度*0.75(n*1.5=2*n*0.75)
*/
sizeCtl = (n << 1) - (n >>> 1);
return;
}
/*
注意:走到这里说明此时还没有走最后一次检查
每当一个线程做完迁移工作后,就将sizeCtl-1,注意在外面帮助线程调用本方法的时候,
是先+1的。也就是sizeCtl低16位(1 + n)的含义
*/
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
/*
见addCount方法中的解释,sc == (rs << RESIZE_STAMP_SHIFT) + 2表示
当前是第一个线程在执行扩容。而如果下面的if条件不等于,说明此时还有其他的线程
在进行扩容,而且此时所有的bound区间都分配完了,那么本线程就可以退出了(帮完忙了)
*/
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
/*
走到这里,说明上面的if条件是相等的,也就是说,当前线程是最后一个在执行着的迁移线程
注意,这里没有return,说明此时还会再次循环一遍旧数组,看其中桶头节点是否都变为了
FollowingNode节点。如果没有,就继续迁移。相当于最后会再做一遍检查工作,做收尾
*/
finishing = advance = true;
i = n;
}
} else if ((f = tabAt(tab, i)) == null)
/*
如果旧数组上该桶为null,也就是说该桶上没有数据,那就说明当前这个桶不需要做迁移
此时只需要将头节点设置为ForwardingNode节点就行了(ForwardingNode节点上的hash
值为MOVED,这样别的线程在拿到这个桶的时候,就不会操作了)
*/
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
/*
如果当前这个桶已经有别的线程在做迁移了(实际上是做完了迁移),就不需要本线程再做了,此时将
advance设置为true,进入下一次循环即可
*/
advance = true;
else {
//synchronized锁住当前链表上的第一个节点,也就是锁住了这个桶,以防其他线程操作
synchronized (f) {
//双重检查,同putVal方法中synchronized同步语句块中双重检查的解释
if (tabAt(tab, i) == f) {
Node<K, V> ln, hn;
/*
如果节点是普通的Node节点的话(在spread方法中提到过,如果节点hash值>=0的话,
就是一个普通的Node节点)
*/
if (fh >= 0) {
/*
其实下面的节点迁移的逻辑是和HashMap中是一样的,即将原桶上这个链表上每个节点hash值在数组
容量二进制数为1的那个位置处去按位与判断是0还是1,以此来拆分出两个链表。然后根据结果
如果为0的话最后就会插入到新数组的原位置,为1就插入到原位置+旧数组容量的位置(我在之前对
HashMap的分析中讲解了这里为什么是+旧数组容量)。但是在ConcurrentHashMap中做了进一步的
优化。可以试想一种情况:如果链表上所有节点计算出来的值都是0的话,那么如果还按照HashMap
中的方式来进行迁移,就还是会一个节点一个节点去遍历判断。其实这个时候我完全可以
不用去遍历,直接将原来的这个链表的头节点直接插入到新数组的原位置处就可以了,
在ConcurrentHashMap中就使用了这种优化思路
n是旧数组的容量,runBit记录的是最后一次发生计算变动的值,比如一个链表上每个节点
按位与计算出的结果分别是1 0 1 1 0 0,那么runBit最终记录的是倒数第二个节点的值:0
(因为最后一个是0,和前面这个0是一样的)
*/
int runBit = fh & n;
//如上面的解释,lastRun最终会记录到倒数第二个节点,现在记录的都是初始位置第一个节点处
Node<K, V> lastRun = f;
/*
知道了上面runBit和lastRun代表了什么,那么下面的操作其实就很明朗了,就是在找最后一个
计算值发生变动的节点
*/
for (Node<K, V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
//如果最后一个发生变动的节点是0(如果后面还有节点,就一定都为0),就将ln指针指向它
ln = lastRun;
hn = null;
} else {
//如果最后一个发生变动的节点是1(如果后面还有节点,就一定都为1),就将hn指针指向它
hn = lastRun;
ln = null;
}
/*
这里再次强调:ln或hn此时不一定代表的是原数组中最后一个节点,如果后面还有节点的话,
就跟lastRun节点的计算值是一样的
下面就是从第一个节点遍历到计算值发生变动的这个节点处(后面的节点不需要遍历了,
因为计算值都是和lastRun是一样的),逐渐去构建这两个链表的过程
*/
for (Node<K, V> p = f; p != lastRun; p = p.next) {
int ph = p.hash;
K pk = p.key;
V pv = p.val;
if ((ph & n) == 0)
/*
如果计算值是0,就插入到ln链表中。注意,这里使用的是头插法,不同于HashMap
中的尾插法。原因就在于lastRun节点(ln指向lastRun)后面可能还有节点,如果
用尾插法,值就会被覆盖了。同时也就意味着,HashMap中节点的迁移是稳定的算法,
而在ConcurrentHashMap中则是不稳定的,不是正序也不是逆序。而将创建结果
再赋值给ln也是为了更新一下ln指针的位置,使ln指针始终指向第一个节点处,这点
很重要,因为下面要用到它
*/
ln = new Node<K, V>(ph, pk, pv, ln);
else
//如果计算值是1,就使用头插法插入到hn链表中。
hn = new Node<K, V>(ph, pk, pv, hn);
}
/*
走到这里说明已经将原来的旧数组上的链表拆分完毕了,现在分成了两个链表,ln和hn。接下来需要
做的工作就很清楚了:将这两个链表分别插入到新数组的原位置和原位置+旧数组容量的位置就可以了
setTabAt方法是Unsafe类中通过volatile方式设置指定地址的值,这里将ln链表赋值在新数组
nextTab的i(原数组桶的位置)位置处
注意,这里不需要再像HashMap中将ln和hn链表中最后一个节点的next指针指向null了,可以想想
为什么?因为上面第492行代码处是循环到lastRun节点为止的,也就是说我不用去管lastRun的next
指针了,因为后面如果没有节点的话next指针肯定是null的,如果后面有节点,那next指针也都是指向
正确的
*/
setTabAt(nextTab, i, ln);
//这里将hn链表赋值在新数组nextTab的i(原数组桶的位置)+旧数组容量位置处
setTabAt(nextTab, i + n, hn);
/*
将旧数组上这个桶的头节点置为ForwardingNode节点,这样该节点的hash值就变为了MOVED
也就是说,旧数组上这个桶的迁移工作,当前线程已经做完了,不再需要别的线程再做了
对应于第431行代码处。需要注意的是,这里只是做完了旧数组上一个桶的迁移工作,
并没有做完全部工作。在HashMap中,所有桶的迁移工作都是由一个线程完成的,而在
ConcurrentHashMap中则是多线程(要看是哪个线程抢到了资源,极端条件下,由一个线程
全部完成(每次都是它抢到))来完成,充分利用了多线程的优势
*/
setTabAt(tab, i, fwd);
//advance设置为true,代表当前桶的迁移工作完成了
advance = true;
} else if (f instanceof TreeBin) {
//如果是红黑树,就执行红黑树的迁移逻辑(红黑树的分析本文不做展开)
TreeBin<K, V> t = (TreeBin<K, V>) f;
TreeNode<K, V> lo = null, loTail = null;
TreeNode<K, V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K, V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K, V> p = new TreeNode<K, V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
} else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K, V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K, V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
/**
* 第127行代码处:
* 在HashMap中,对size计数+1是很简单的(注意,在HashMap中,size表示的是桶的个数,而在这里,
* 需要计数的是所有节点的个数,两者不是同一个维度),直接加就行了;但在ConcurrentHashMap中却是
* 一件很难做到的事,因为要涉及到多线程的操作。在Java 7中,是等待获取到所有Segment的锁之后再进行
* 统计的。也就是说会把当前所有线程都停止,然后去计算现在所有的节点数,这样虽然能精确计算出来结果,
* 但效率却非常低
* <p>
* 下面说一下在Java 8中的计数过程:
* 1.在没有并发或者低并发的场景下:baseCount是用来记录当前节点个数的;
* 2.如果CAS设置baseCount+1失败,就代表着这里有多个线程在抢着计数,那么此时就会转而使用CounterCell
* 数组来进行计数。每个线程都会通过随机探测(ThreadLocalRandom.getProbe() & m)的方式来找到
* 属于它的CounterCell数组中的那个CounterCell槽位置(ThreadLocalRandom在并发的场景下性能更好),
* 在这个CounterCell上进行计数。最后计算出baseCount与counterCells数组中所有非空值的和就是最后的结果
* 也就是说,第一个线程会在baseCount上计数,而剩下的线程,会在CounterCell数组中计数。比如说现在baseCount为16,
* 第一个线程做完了加节点后,将baseCount+1变为了17,而剩下的三个线程CAS失败,会在CounterCell数组的
* 第1、3、7个位置处赋值为1(这个索引位置是我随便举的),此时数组中所有的节点数就是17+1+1+1=20个节点
* <p>
* 这里来说一下为什么采用随机探测槽,最后将所有结果汇总的方式来进行计数,而不是采用CAS抢占的方式?
* 在高并发下,相比于所有线程在同一位置CAS来进行尝试,失败的话就继续尝试的行为,分成多个数组位置来分别计算,
* 最后汇总的方式显然要更加高效,避免了失败后的自旋过程,同时也能在同一时刻让所有的线程都在做计数工作
*/
private final void addCount(long x, int check) {
CounterCell[] as;
long b, s;
/*
如上面注释所说:在基本上没什么并发的场景下,baseCount是用来做计数用的,只要CAS设置+1成功就完事了
但是如果CounterCell数组不为空,说明现在是有多个线程在同时计数。抑或是CAS设置失败,就进入到下面
的if条件中
*/
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a;
long v;
int m;
//uncontended表示没有发生竞争的标志位
boolean uncontended = true;
/*
如果CounterCell数组为空,或者随机探测的槽位置处为空,又或者尝试将其中探测到的CounterCell
槽位置处的值+1的时候也失败了(快速尝试),就会进入到fullAddCount方法中,以此来完成+1的操作
*/
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//在该方法中会完成最终的计数工作
fullAddCount(x, uncontended);
/*
可以看到在计完数后,这里就退出了,没有走到下面的帮助扩容的逻辑中。为什么?可以想想走到
这里的时候,上面经历了两次CAS失败,说明当前是在一个高并发的场景下。如果此时我还去帮助
扩容的话,多个线程之间的锁竞争、上下文切换的开销,都会被放大
*/
return;
}
/*
走到这里说明上面的CAS CounterCell操作成功了,check<=1(也就是传进来的binCount),
要么是桶里当前是空的,新加了一个节点,要么就是桶里面只有一个节点,在后面新加了一个节点
这两种情况下也不会走帮助扩容的逻辑,直接返回(我猜测是因为在这种情况下,节点数量并不多,
于是就不用帮着扩容了)。这里也揭示了在第98行代码处,为什么之前在插入红黑树节点的时候,
会设置binCount=2,如果设置一个小于2的数,那后面就不会走帮忙扩容的逻辑了(不走也无妨,
走了更好)
*/
if (check <= 1)
return;
//走到这里说明check > 1,计算一下此时实际的所有节点的值,赋值给局部变量s,以便下面扩容时用到
s = sumCount();
}
/*
如果check不是负数,就进入到下面的帮助扩容逻辑中。check表示添加当前节点前的这个桶上面的节点数,
不可能是负数。所以我猜测这里只是个安全性检查
*/
if (check >= 0) {
Node<K, V>[] tab, nt;
int n, sc;
/*
s记录的是当前ConcurrentHashMap中所有的节点数量,如果其大于设置的阈值sizeCtl,并且数组不为空,
并且数组的长度小于最大长度2的30次方的话,就执行扩容操作。否则不扩容。while在此是保证一定要帮助扩容
*/
while (s >= (long) (sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
//此时会根据数组长度计算出一个标记位,详见resizeStamp方法的注释
int rs = resizeStamp(n);
/*
sc小于0说明此时有别的线程正在扩容(不可能为-1,因为此时初始化操作已经结束了,
并且上面已经判断数组不为空了),那当前线程就来帮助一起做扩容
*/
if (sc < 0) {
/*
退出扩容时的条件,也就意味着此时已经做完扩容了:
1.(sc >>> RESIZE_STAMP_SHIFT) != rs说明当前线程不在同一次扩容中(sc右移16位的结果理论上
应该和rs相同,但如果不同,说明此时的数组长度已经变了,可能是当前线程还在上一次扩容中,而其它
线程已经在下一次了(可能是sc和tab赋值的间隙中触发了下一次扩容))
2.sc == rs + 1说明当前还有一个线程在做最后的检查工作(第一个线程初始为+2,但是最后是每个扩容线程
都会-1,实际上就相当于多减了一次,也就是这里+1的意思。而如果连检查也完成的话,sc会复位为一个正数
所以此时是最后一个线程正在做检查的时刻),那么本线程也不用帮忙了,直接等那个线程检查完就行了
(这里正确的判断条件应该为sc == (rs << RESIZE_STAMP_SHIFT) + 1,这里实际上是个bug,后面会说明)
3.sc == rs + MAX_RESIZERS和上面是一样的道理,MAX_RESIZERS表示最多可以帮助的线程数量+1(低16位
都为1,已经把低16位都占满了,不能再大了),也就是说现在已经有MAX_RESIZERS - 1个线程在帮忙做迁移了,
本线程就不掺和了(这里正确的判断条件应该为sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS,
这里实际上是个bug,后面会说明)
4.nextTable已经为空了(nextTable只在扩容时才有值)
5.transferIndex <= 0说明bound区间已经都分配完了,那么本线程也不需要扩容了
*/
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
/*
此时sc<0,说明本线程当前是要帮助做迁移的。将sizeCtl+1(注意,sizeCtl此时是负数),然后进入
transfer方法帮忙做迁移。在transfer方法里面等该线程做完迁移工作后,会再将sizeCtl-1的。也就是说,
在上面我对构造器中sizeCtl所做的注释中的第2条:cizeCtl中低16位为(1+n)(高16位为标记位),
这里n代表正在执行扩容操作的线程数量
*/
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
} else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
/*
否则sc>=0,说明当前线程是第一个进来要进行扩容的线程,将sizeCtl初始为
(rs << RESIZE_STAMP_SHIFT) + 2,左移16位后,会将之前的标记位移动到高16位处,然后
低16位为10(+2),这里+2是为了错开1这个值,因为它代表着初始化
*/
transfer(tab, null);
//重新计算一下此时的最新节点数,以便下一次循环时进行判断
s = sumCount();
}
}
}
/**
* 在该方法中完成最终的+1计数操作
*/
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
//如果ThreadLocalRandom还没有被初始化,就执行初始化的工作
if ((h = ThreadLocalRandom.getProbe()) == 0) {
//在初始化的过程中当前线程会被分配一个随机数probe(threadLocalRandomProbe)
ThreadLocalRandom.localInit();
h = ThreadLocalRandom.getProbe();
//未发生竞争标志位重置为true
wasUncontended = true;
}
//冲突标志位,当其值为true,说明此时CounterCell数组该扩容了
boolean collide = false;
for (; ; ) {
CounterCell[] as;
CounterCell a;
int n;
long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
/*
CounterCell数组已经初始化了的时候,找到随机探测的槽如果为null,那么此时就
新创建一个CounterCell
*/
if ((a = as[(n - 1) & h]) == null) {
/*
cellsBusy用来表示一个锁资源,0是无锁状态,1是上锁状态
当前为0表示此时没有线程在做数组放入CounterCell的过程,也没有正在扩容
*/
if (cellsBusy == 0) {
//创建一个新的CounterCell,将1传进去
CounterCell r = new CounterCell(x);
//CAS上锁,失败了后面会将冲突标志位collide置为true
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try {
CounterCell[] rs;
int m, j;
//双重检查(之前见过很多次了)
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
//上面新创建的CounterCell放在数组中
rs[j] = r;
created = true;
}
} finally {
/*
释放锁(放在finally子句中的意义在上面的initTable方法中
已经解释过了)
*/
cellsBusy = 0;
}
//如果创建成功了,就跳出死循环(也就是该方法结束了)
if (created)
break;
/*
走到这里说明该槽已经被别的线程设置进去了(注意上面的双重检查),
那么此时就重新循环,找下一个位置就行了
*/
continue;
}
}
//冲突标志位collide复位为false,避免之后可能会走到扩容逻辑中,而是继续下一次尝试
collide = false;
} else if (!wasUncontended)
/*
走到这里说明槽位置不为null,并且已经知道了上一次的CAS已经失败了(第621行代码处)
此时将wasUncontended重置为true,走下一遍循环即可
*/
wasUncontended = true;
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
//此时会再尝试一次将新值插入进去,插入成功就退出了,插入失败的话也无妨,找下一个位置
break;
else if (counterCells != as || n >= NCPU)
/*
counterCells != as说明此时CounterCell数组正在扩容中,n >= NCPU说明当前数组容量已经
达到或超过了当前JVM可用的最大线程数,就让collide置为false,避免走到下面的扩容逻辑中,
而是继续下一次尝试(从这里也说明了,CounterCell数组的长度不可能无限制增大,最多为
当前JVM可用的最大线程数(如果再继续增大的话,剩下的线程也是多余的,徒增消耗))
*/
collide = false;
else if (!collide)
/*
走到这里,说明上面条件都不满足。此时将冲突标志位collide由原来的false重新置为true,
等下次循环的时候如果前面还是不满足的话就会走到下面的扩容逻辑中去了
*/
collide = true;
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
//走到这里,说明上面条件都不满足。上锁,此时要进行CounterCell数组扩容的逻辑了
try {
//如果此时数组正在被别的线程扩容中,就不用本线程再扩容了
if (counterCells == as) {
//创建一个2倍容量的新数组
CounterCell[] rs = new CounterCell[n << 1];
/*
遍历的方式来进行数据迁移(毕竟数组的最大长度是当前JVM可用的最大线程数,不会
特别大,普通遍历足矣)
*/
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
//释放锁(在finally子句中释放锁的写法,之前见过很多次了)
cellsBusy = 0;
}
//冲突标志位collide复位为false
collide = false;
//扩容后重新循环,尝试添加数据(当然,如果上面条件还是都不满足的话,还是会走到这里扩容的)
continue;
}
//走到这里,会生成一个新的随机数probe,进行下一次尝试
h = ThreadLocalRandom.advanceProbe(h);
} else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
/*
此时CounterCell数组没有初始化,如果cellsBusy没有上锁,当前没有处于扩容中,那现在就CAS上锁
以此来执行CounterCell数组初始化的工作
*/
boolean init = false;
try {
//双重检查
if (counterCells == as) {
//创建一个初始容量为2的CounterCell数组
CounterCell[] rs = new CounterCell[2];
//在槽位置处创建一个新的CounterCell
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
//释放锁
cellsBusy = 0;
}
//如果创建CounterCell数组成功,就可以退出了(此时数据也放进去了),否则继续循环
if (init)
break;
} else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
/*
如果当前cellsBusy锁正在被上锁,就退而求其次,尝试对baseCount做更新
当然,如果也失败了,也还是会继续循环
*/
break;
}
}
/**
* 第642行和701行代码处:
* 计算baseCount与counterCells数组中所有非空值的和,
* 即当前ConcurrentHashMap中所有的节点数
* <p>
* 注意这里只是计算出一个近似值,如果该方法计算出结果后,
* 此时又有一个线程进来添加了节点,那么之前计算出来的
* 结果就不准了。这个是没有办法避免的,只能在后续的代码
* 中去考虑这种情况
*/
final long sumCount() {
CounterCell[] as = counterCells;
CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
addCount方法中标记位的示意图:
多线程在进行扩容时的示意图:
上面在addCount方法和helpTransfer方法中,我注释了两个地方是存在bug的:在判断扩容完成,准备跳出的这两个条件:sc == rs + 1和sc == rs + MAX_RESIZERS,应该改为sc == (rs << RESIZE_STAMP_SHIFT) + 1和sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS,这是为什么呢?可以想到,rs指向的是resizeStamp(n),也就是上面示意图演示的一个大于0的数,而sc指向sizeCtl,程序走到这里肯定是小于0的(注意上面一行代码:在addCount方法中是“sc < 0”,在helpTransfer方法中是“(sc = sizeCtl) < 0”,都是在sc小于0的前提下),那么如何才能做到一个大于0的数在+1或者+MAX_RESIZERS(65535)后,能变成一个小数呢?答案肯定是不可能的。数据溢出的情况也不可能出现,因为resizeStamp(n)方法保证数据只能放在低16位上(最大的情况也就是n为1的时候,此时前导0的个数也就是31而已,这也就是为什么在resizeStamp方法里面使用Integer.numberOfLeadingZeros方法的原因)。而上个判断迁移结束的条件是(sc >>> RESIZE_STAMP_SHIFT) != rs:将siztCtl右移16位后和resizeStamp(n)进行判断是否相等,能这么进行判断的前提也是因为resizeStamp方法计算出来的数据只能在低16位上。那么既然rs的值只能在低16位上,又何谈溢出一说呢?
所以现在造成的结果就是这两个条件永远都不会满足,相当于是个废条件,帮助线程的数量也就没有了上下界,可能会造成迁移过程中一些本不需要帮忙做迁移的线程错误地进入到transfer方法中的情况出现。这里Doug Lea原本的意思是将rs左移16位后再和sc进行判断,所以这里很明显是笔误了。在OpenJDK的bug提交记录上可以看到如下的JDK-8214427:
由上可以看到,这个bug在Java 12及之后的版本修复了,所以下面来看一下这块改成了什么。以下是Java 14.0.2中的addCount方法的部分源码:
private final void addCount(long x, int check) {
//...
if (check >= 0) {
Node<K,V>[] tab, nt;
int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;
if (sc < 0) {
if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
(nt = nextTable) == null || transferIndex <= 0)
break;
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
} else if (U.compareAndSetInt(this, SIZECTL, sc, rs + 2))
transfer(tab, null);
s = sumCount();
}
}
}
可以看到在上面第8行代码处,rs已经改为了左移16位的结果(helpTransfer方法也一并改掉了,这里不再看了)。但是令我不解的是:去掉了(sc >>> RESIZE_STAMP_SHIFT) != rs这个条件,该条件是为了确保当前线程和其他线程是在同一次扩容中,也就是判断标记位是否相同。如果这个条件还有的话,按上面的写法就应该变成了(sc >>> RESIZE_STAMP_SHIFT) != (rs >>> RESIZE_STAMP_SHIFT)。说实话我没有搞懂Doug Lea把这个条件去掉的原因:
我注意到了在OpenJDK bug提交记录上的另一个bug,JDK-8242464:
提交者的意思是说JDK-8214427最主要的问题是要考虑新数组容量变了的情况,而不是一个正数另一个负数的问题,Java 12中所做的更改并没有解决问题。应该将sc == rs + 1改为(sc >>>RESIZE_STAMP_SHIFT) == (rs>>>RESIZE_STAMP_SHIFT) + 1(两个标记位相差1,也就是说前导0差一位,也就意味着数组容量翻倍了,和我说的(sc >>> RESIZE_STAMP_SHIFT) != (rs >>> RESIZE_STAMP_SHIFT)是一个意思),目前Doug Lea并没有作出评论,该bug也是处于OPEN状态(疫情停工?)。其实要按照我的想法,在Java 12中应该改为(sc >>> RESIZE_STAMP_SHIFT) != (rs >>> RESIZE_STAMP_SHIFT) || sc == rs + 1,即把这两种情况都写上。
put方法分析到这里就算是分析完了,后续只能一直跟进该bug的状态和评论了。
4 get方法
/**
* ConcurrentHashMap:
*/
public V get(Object key) {
Node<K, V>[] tab;
Node<K, V> e, p;
int n, eh;
K ek;
/*
计算指定key的hash,注意,这里直接调用了key的hashCode方法,也就意味着如果传进来的
key为null的话,会抛出空指针异常
*/
int h = spread(key.hashCode());
//如果数组没有初始化,或者计算出来的桶的位置为null(说明找不到这个key),就直接返回null
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
/*
如果桶上第一个节点的hash值和要查找的hash值相同,并且key也是相同的话,
就直接返回(快速判断模式)
*/
return e.val;
} else if (eh < 0)
/*
eh < 0说明eh是一个特殊节点:正在迁移中的节点或树节点,又或者是RESERVED节点,
此时会走find方法进行查找。而不同的节点会重写find方法。也就是说,每种特殊节点
都有自己的寻找方式
*/
return (p = e.find(h, key)) != null ? p.val : null;
/*
走到这里说明eh >= 0,即当前桶是一个正常的Node链表,那么遍历链表上的每一个节点进行查找
(第一个节点不需要判断了,因为在第17和18行代码处已经判断过了)
*/
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
/**
* 最普通的Node节点的find方法,可以看出就是做个遍历查找,判断一下hash和key是否相同就行了
*/
Node<K, V> find(int h, Object k) {
Node<K, V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
/**
* ForwardingNode节点的find方法
*/
Node<K, V> find(int h, Object k) {
outer:
/*
注意,找迁移节点是在nextTable上找的,之所以没有在当前数组中进行遍历,
是因为当前就是要查找迁移中这种场景中的节点,而在迁移时setTabAt方法能保证
nextTable的内存可见性。如果nextTable上找不到也无所谓,再调一次get方法,
等扩容结束后就能找到了
*/
for (Node<K, V>[] tab = nextTable; ; ) {
Node<K, V> e;
int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
/*
如果key为null,新数组为null或者计算出来的新数组桶的位置为null
(说明找不到这个key),就直接返回null(快速判断模式)
注意:这里的tabAt取的是nextTable上的位置,所以说如果返回为null不代表着一定
就是找不到这个key,也可能是这个桶还没有做迁移。但是无妨,下次再调用一次get方法,
等迁移做完了就能找到了
值得一提的是:跳进该方法时是ForwardingNode节点,说明此时正在迁移中
而走到该处nextTable却可能为null,说明此时已经迁移完了,所以快速返回null
当然如果在下面的代码执行中,迁移才做完,那么这个时候的快速判断就不起作用了。但无妨,
后面会再次从头往下进行判断的
*/
return null;
for (; ; ) {
int eh;
K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
//如果当前节点的hash和key都和要找的节点相同,就返回它
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
/*
再次判断一下是否是ForwardingNode节点,走到这里说明当前还在迁移中(可能还是
这次迁移也可能是下一次迁移了),那么就继续从本方法的开头处再次往下判断(其实这里不去写这个
分支也是没问题的,直接走下面第126行代码处的ForwardingNode节点的find方法
就行了。但是这样就相当于递归了,后面会解释为什么这里不用递归)
这里想去解释一下上面说的下一次迁移的意思。如果此时正在遍历链表上的节点,突然发现某一个节点由
普通的Node节点变为了ForwardingNode节点,这是怎么发生的呢?我所做的一种猜测是:
比如说一个链表上有4个节点:0,1,2,3。我判断第一个节点的key和hash不是我想要的,
那么此时就会遍历到第二个节点处也就是节点1。就在此刻,这个链表发生了扩容迁移,
迁移结束后,节点1可能被放在了2倍容量新数组的桶的第一个位置处。而不久后,又发生了一次扩容迁移,
即第二次迁移(注意这里的e是局部变量,所以能一直循环下去),那么它就会被包装为ForwardingNode
节点(注意,虽然这里的e是局部变量,但是变成ForwardingNode节点的操作是通过Unsafe类中的
setTabAt方法来实现的(volatile语义,内存可见性),所以可以及时判断出来这个节点已经变为了
ForwardingNode节点)
此时将tab更新一下,以便下次循环时候使用,也就是在说,tab此时会指向最新的nextTable,去进行查找
(对应于上面所说的情况,即下一次迁移时,这个tab更新的动作才有意义)
*/
tab = ((ForwardingNode<K, V>) e).nextTable;
continue outer;
} else
/*
走到这里说明已经不是ForwardingNode节点了(本次迁移结束,
该节点已经变成其他的节点了),可能是红黑树节点也可能是
RESERVED节点,那么就调用它们各自的find方法进行查找
*/
return e.find(h, k);
}
/*
走到这里eh >= 0,说明此时本次迁移结束(注意:如上面所说,可能还会发生下一次迁移)。当然如果在遍历的过程中,
某个节点又变成了红黑树节点(其他线程添加节点触发转红黑树阈值)或者ForwardingNode节点(下一次扩容做迁移),
就又会去它们自己覆写的find方法中进行查找(ForwardingNode节点不会递归find查找)
这里就可以说明一下,为什么ForwardingNode节点不去走递归?其实这里更多的意义在于优化。如上面所说,如果扩容
非常频繁,在遍历链表上的节点的时候,就可能会有很多节点变为了ForwardingNode,如果用递归的话可能会造成
递归层次非常深的情况出现(这里也没有使用尾递归)。可能会出现StackOverflow,即使不出现,递归层次非常深
的话也不利于维护。所以为了避免这种情况的出现,就改用了标签的方式来重进
*/
if ((e = e.next) == null)
//遍历到底也没有找到,就直接返回null
return null;
}
}
}
由上可以看出,即使是get方法,在ConcurrentHashMap中也是很复杂的,尤其是ForwardingNode节点,需要考虑到各种情况。
在put方法中,我们会用到CAS + synchronized + Unsafe类直接操作地址(volatile语义)的方式来保证并发下的插入安全,但是在get方法中,却没有发现任何的锁资源或CAS的代码出现,那么它是如何保证线程安全的呢?其实上面也分析了,table属性是volatile修饰的,取桶位置也是用的tabAt方法(Unsafe类中直接拿取指定地址的数据(volatile语义,内存可见性)),这样的话是能保证拿取到的数据永远是当前这个时刻最新的数据的。同时get方法不用加锁或CAS自旋,提高了并发读的性能。
这里我想再提一点:不是说put方法加上了synchronized锁之后,get方法就会被阻塞了。只有get方法中也去synchronized这个节点,才会被阻塞。但是从上面的源码中可以看出,get方法没有使用任何的加锁机制,所以get方法是不会被阻塞的(如果get方法受put影响,从而会阻塞,那我就会怀疑Doug Lea的水平了。而且也不是所有的put操作都会synchronized,如源码所示:如果计算的桶的位置上没有节点的话,直接就CAS插入节点了。只有计算的桶的位置上有节点的话,才会synchronized)。