我们常用的Java并发容器类是由java.util.concurrent包为我们提供的
java.util.concurrent包提供的并发容器主要分为三类:Concurrent*、CopyOnWrite*、Blocking*
其中Concurrent*
的特点大部分通过CAS+synchronized
实现的,CopyOnWrite*
则是通过复制一份原数据来实现的,而Blocking*
是通过AQS
实现的
面试常见的并发容器如ConcurrentHashMap
、CopyOnWriteArrayList
、BlockQueue的实现类
等均是来自juc包,我们只是简单的知道它们是线程安全的是完全不够的,所以,让我们一起来从底层认识下Java并发容器吧!
本文会从常见问题,源码分析,面试题总结三个部分来展开
CopyOnWriteArrayList
常见问题
诞生的历史和原因
- 代替Vector和SynchronizedList,就像ConcurrentHashMap代替SynchronizedMap一样
- Vector和SynchronizedList的锁的粒度太大了,并发效率相对较低,并且迭代时无法编辑
- Copy-On-Right并发容器还包括CopyOnWriteArray用来替代SynchronizedSet
整体架构
从整体架构上来说,CopyOnWriteArrayList 数据结构和 ArrayList 是一致的,底层是个数组,只不过 CopyOnWriteArrayList 在对数组进行操作的时候,基本会分四步走:
- 加锁
- 从原数组中拷贝出新数组
- 在新数组上进行操作,并把新数组赋值给数组容器
- 解锁。
适用场景
- 读操作快,写就算慢一点也太大问题
- 读操作多,写操作少
如:
黑名单,每日一次更新就够了
监听器,监听迭代操作次数远高于修改操作
读写规则
对比读写锁的规则:读读共享、读写互斥、写读互斥、写写互斥
CopyOnWriteArrayList的读写规则为:
- 读取不需要加锁(读读共享)
- 写入不会阻塞读取操作(读写共享、写读共享)
- 写入与写入之间需要同步等待(写写互斥)
特征
- 线程安全的,多线程环境下可以直接使用,无需加锁;
- 通过锁 + 数组拷贝 + volatile 关键字保证了线程安全;
- 每次数组操作,都会把数组拷贝一份出来,在新数组上进行操作,操作成功之后再赋值回去
- 修改过程中:读取的数据是原来的数据,不存在线程安全;迭代的数据是迭代器生成时的数据,之后的修改不可见
缺点
- 数据不一致问题:CopyOnWrite容器只能保证数据最终一致性,不能保证数据实时一致性。所以希望写入数据马上看到就不要用CopyOnWrite容器
- 内存占用问题:因为CopyOnWrite的写是复制机制,所以进行写操作时,内存会同时有两个对象,如果对象较大,会占用较大内存
案例演示
案例一:
演示下使用ArrayList和CopyOnWriteArrayList迭代时进行修改操作
首先使用ArrayList
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
list.add("4");
list.add("5");
Iterator<String> iterator = list.iterator();
while (iterator.hasNext()){
System.out.println("list is: "+list);
String next = iterator.next();
System.out.println("cur is: "+next);
if (next.equals("2")){
list.remove("5");
}
if (next.equals("3")){
list.add("find 3");
}
}
}
运行抛出异常
list is: [1, 2, 3, 4, 5]
cur is: 1
list is: [1, 2, 3, 4, 5]
cur is: 2
list is: [1, 2, 3, 4]
Exception in thread "main" java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
at java.util.ArrayList$Itr.next(ArrayList.java:859)
at collection.CopyOnWriteArrayListDemo.main(CopyOnWriteArrayListDemo.java:27)
将ArrayList修改为CopyOnWriteArrayList
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
运行结果如图
list is: [1, 2, 3, 4, 5]
cur is: 1
list is: [1, 2, 3, 4, 5]
cur is: 2
list is: [1, 2, 3, 4]
cur is: 3
list is: [1, 2, 3, 4, find 3]
cur is: 4
list is: [1, 2, 3, 4, find 3]
cur is: 5
可以很惊奇的发现,最后一个是cur is:5
而不是预想的find 3
CopyOnWriteArrayList在迭代的时候如果有修改是不可见的,会保持开始迭代时的内容
案例二:演示迭代时迭代数据的确定时间
创建一个迭代器之后对容器进行修改,然后再创建一个迭代器,打印两个迭代器的数据
public static void main(String[] args) {
CopyOnWriteArrayList<Integer> list =
new CopyOnWriteArrayList<>(new Integer[]{1, 2, 3});
Iterator<Integer> iterator1 = list.iterator();
list.add(4);
Iterator<Integer> iterator2 = list.iterator();
iterator1.forEachRemaining(System.out::print);
System.out.println();
iterator2.forEachRemaining(System.out::print);
}
打印结果如下:
123
1234
从结果可以得知,迭代器的数据在迭代器生成时就已经确定了,对生成迭代器之后的数据修改时不可见的
源码分析
1. 新增
新增包括新增到数组尾部,新增到数组某一个索引位置,批量新增等等,操作的思路都是那四步:加锁、拷贝、操作后赋值、解锁
新增到数组尾部的源码:
// 添加元素到数组尾部
public boolean add(E e) {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 得到所有的原数组
Object[] elements = getArray();
int len = elements.length;
// 拷贝到新数组里面,新数组的长度是 + 1 的,因为新增会多一个元素
Object[] newElements = Arrays.copyOf(elements, len + 1);
// 在新数组中进行赋值,新元素直接放在数组的尾部
newElements[len] = e;
// 替换掉原来的数组
setArray(newElements);
return true;
// finally 里面释放锁,保证即使 try 发生了异常,仍然能够释放锁
} finally {
lock.unlock();
}
}
从源码中可以看出,整个add过程都在持有锁的状态下进行的,通过锁保证了只能有一个线程同时对一个数组进行add操作
add过程中会创建一个老数组长度+1的新数组,然后把老数组的值拷贝到新数组内,再添加值到尾部
question:为什么加锁了不在原数组直接操作呢?
- volatile关键字修饰的是数组的引用,如果只是修改数组内元素的值是无法触发可见性的,必须修改数组的地址,也就是对数组进行重新赋值才能使修改内容对其他线程可见
- 在新数组上进行拷贝,对老数组没有影响,保证了修改过程中,其他线程可以访问原数据
新增到指定下标位置的源码:
// len:数组的长度、index:插入的位置
int numMoved = len - index;
// 如果要插入的位置正好等于数组的末尾,直接拷贝数组即可
if (numMoved == 0)
newElements = Arrays.copyOf(elements, len + 1);
else {
// 如果要插入的位置在数组的中间,就需要拷贝 2 次
// 第一次从 0 拷贝到 index。
// 第二次从 index+1 拷贝到末尾。
newElements = new Object[len + 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index, newElements, index + 1,
numMoved);
}
// index 索引位置的值是空的,直接赋值即可。
newElements[index] = element;
// 把新数组的值赋值给数组的容器中
setArray(newElements);
从源码可以看出,如果插入的位置是数组末尾,只需要拷贝一次。当插入的位置是中间,就会把原数组分成两部分进行复制,然后添加新值到新数组
2. 删除
指定数组索引位置删除的源码:
// 删除某个索引位置的数据
public E remove(int index) {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 先得到老值
E oldValue = get(elements, index);
int numMoved = len - index - 1;
// 如果要删除的数据正好是数组的尾部,直接删除
if (numMoved == 0)
setArray(Arrays.copyOf(elements, len - 1));
else {
// 如果删除的数据在数组的中间,分三步走
// 1. 设置新数组的长度减一,因为是减少一个元素
// 2. 从 0 拷贝到数组新位置
// 3. 从新位置拷贝到数组尾部
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
setArray(newElements);
}
return oldValue;
} finally {
lock.unlock();
}
}
步骤分为三步:
-
加锁
-
判断索引位置
- 如果删除在数组尾部,直接复制长度为
len-1
的数组返回 - 如果删除数据在中间,创建长度为
len-1
的新数组,分两段复制到新数组
- 如果删除在数组尾部,直接复制长度为
-
解锁
批量删除的源码:
// 批量删除包含在 c 中的元素
public boolean removeAll(Collection<?> c) {
if (c == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 说明数组有值,数组无值直接返回 false
if (len != 0) {
// newlen 表示新数组的索引位置,新数组中存在不包含在 c 中的元素
int newlen = 0;
Object[] temp = new Object[len];
// 循环,把不包含在 c 里面的元素,放到新数组中
for (int i = 0; i < len; ++i) {
Object element = elements[i];
// 不包含在 c 中的元素,从 0 开始放到新数组中
if (!c.contains(element))
temp[newlen++] = element;
}
// 拷贝新数组,变相的删除了不包含在 c 中的元素
if (newlen != len) {
setArray(Arrays.copyOf(temp, newlen));
return true;
}
}
return false;
} finally {
lock.unlock();
}
}
批量删除并不会对数组中的数据挨个删除,而是对老数组的值进行遍历,如果值在传入集合c
中存在,就放入新数组,最后返回的新数组就是不包含待删除数组的数组了
3.indexOf
indexOf正向搜索源码:
// o:我们需要搜索的元素
// elements:我们搜索的目标数组
// index:搜索的开始位置
// fence:搜索的结束位置
private static int indexOf(Object o, Object[] elements,
int index, int fence) {
// 支持对 null 的搜索
if (o == null) {
for (int i = index; i < fence; i++)
// 找到第一个 null 值,返回下标索引的位置
if (elements[i] == null)
return i;
} else {
// 通过 equals 方法来判断元素是否相等
// 如果相等,返回元素的下标位置
for (int i = index; i < fence; i++)
if (o.equals(elements[i]))
return i;
}
return -1;
}
indexOf方法主要用于查找元素在数组中第一次出现的下标位置,如果元素不存在就返回-1,并且支持对null值的搜索
步骤:
- 判断是否为空值,如果为空值遍历数组判断是否为空,找到第一个null返回下标
- 如果不为空值,遍历数组,比较值是否相同,找到第一个值相同的返回下标
- 如果找不到就返回-1
4. 迭代
CopyOnWriteArrayList 在迭代过程中,即使原数组的值发生了改变也不会抛出ConcurrentModificationException 异常,因为每次改动都会生成新数组,不会影响老数组
CopyOnWriteArrayList 迭代持有的是老数组的引用,而 CopyOnWriteArrayList 每次的数据变动,都会产生新的数组,对老数组的值不会产生影响,所以迭代也可以正常进行。
面试题
- CopyOnWriteArrayList 与ArrayList相比有哪些异同?
- 相同点:
- 底层数据结构相同,都为数组
- 提供的API基本相同,方便使用
- 不同点:
- CopyOnWriteArrayList 线程安全,多线程环境下使用无需加锁
- CopyOnWriteArrayList 通过哪些手段实现了线程安全?
- 数组容器被volatile关键字修饰,保证了数组内存地址修改后,修改内容其他线程可见
- 对数组的所有修改操作都进行了加锁,并且所有的修改操作都是使用的同一把锁,保证同一时刻只能有一个线程进行修改
- 修改过程对原数组进行了赋值,修改操作在新数组上,修改过程中,不会对原数组造成任何影响
- 在add方法中,对数组进行加锁后,线程安全了为什么还要对老数组进行拷贝
- volatile修饰的的数组这个对象地址,如果不拷贝修改内存地址,就无法触发volatile的可见性效果,其他线程就无法感知修改
- 对老数组进行拷贝会有性能损耗,使用中有哪些注意点?
- 批量操作时尽量使用addAll、removeAll方法,而不要循环的使用add、remove方法,使用*All方法时只进行一次拷贝,而循环的调用单体方法时会拷贝调用的次数,当调用次数较多时,对性能影响就非常明显
- 为什么 CopyOnWriteArrayList 迭代过程中,数组结构变动,不会抛出ConcurrentModificationException ?
- CopyOnWriteArrayList 每次修改操作时都会产生新数组,而迭代时,持有的是老数组的引用,所以对数组结构变动不可见,就不会抛出异常了
- 在list的中间插入一个数据,ArrayList和CopyOnWriteArrayList 分别会拷贝几次数组
- ArrayList只会拷贝一次,然后把插入位置及后面的数据都往后移一位
- CopyOnWriteArrayList 拷贝两次将数据分为两部分,分别拷入到新数组,然后再空的位置添加新数据
concurrentHashMap
常见问题
为什么需要ConcurrentHashMap?
Hashtable
线程安全,但各种方法操作时都直接使用了synchronized
锁住了整个结构
HashMap
虽然效率高,但是在多线程环境下不安全
需要一个中和了Hashtable
和HashMap
的类在多线程下高效的使用
ConcurrentHashMap的构造方法有哪些
- 无参数的
- 传入初始化容量的
- 传入map的
- 传入初始化容量和阈值的
- 传入初始化容量、阈值和并发级别的
//无参构造函数
public ConcurrentHashMap() {
}
//可传初始容器大小的构造函数
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
//可传入map的构造函数
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
//可设置阈值和初始容量
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
//可设置初始容量和阈值和并发级别
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
ConcurrentHashMap使用什么技术来保证线程安全?
- JDK1.7中采用
segment + ReentrantLock
实现 - JDK1.8中采用
node + CAS + synchronized
实现
JDK1.7中
- concurrentHashMap最外层是多个
segment
,每个segment
的底层实现和HashMap
类似,任然是数组加链表组成的拉链法 - 每个
segment
单独上一个ReentrantLock
锁,每个segment
之间互不影响,提高了并发效率 - concurrentHashMap默认有16个
Segment
,所以最多可以同时支持16个线程并发写,其默认值可以在初始化时设置,一旦初始化完成不可以扩容
JDK1.8中
错误的使用concurrentHashMap依然会造成线程安全问题
案例:
构建两个线程,对concurrentHashMap进行读取,修改,重新写入的操作
/**
* 〈组合操作不能保证concurrentHashMap线程安全〉
*
* @author Chkl
* @create 2020/3/28
* @since 1.0.0
*/
public class OptionNotSafe implements Runnable {
private static ConcurrentHashMap<String, Integer> scores
= new ConcurrentHashMap<>();
public static void main(String[] args) throws InterruptedException {
scores.put("张三", 0);
Thread thread1 = new Thread(new OptionNotSafe());
Thread thread2 = new Thread(new OptionNotSafe());
thread1.start();;
thread2.start();
thread1.join();
thread2.join();
System.out.println(scores.get("张三"));
}
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
Integer score = scores.get("张三");
int newScore = score + 1;
scores.put("张三", newScore);
}
}
}
如果是线程安全的,预期结果应该是2000,而实际运行结果不等于2000
虽然concurrentHashMap可以保证并发下的单个操作是安全的,但是不能保证组合操作的安全,这样使用是错误的用法
正确的用法
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
while (true) {
Integer score = scores.get("张三");
int newScore = score + 1;
boolean b = scores.replace("张三", score, newScore);
if (b) break;
}
}
}
concurrentHashMap针对这种情况有相应的解决措施,调用replace方法,参数列表为key,oldVal,newVal
,进行修改时会判断值是否为oldVal,如果不是则修改失败返回false,所以需要不断的进行重试,如果修改成功再退出。这里应用的就是CAS的思想
concurrentHashMap提供 的组合操作方法:
- replace(key,oldVal,newVal)
- putIfAbsent(key,value)
等价代码为:if (!map.containsKey(key)) return map.put(key,value); else { return map.get(key); }
源码分析
put value的过程
- 如果数组为空,进行初始化
- 计算当前槽点有无值,如果没有值就采用
CAS
创建,失败后自旋直到创建成功, - 如果槽点是转移节点(正在扩容),自旋等待扩容完成后新增
- 新增的三种情况
- 如果槽点有值锁定槽点,其他线程不能操作
- 如果是链表,新增值到链表的尾部
- 如果是红黑树,使用红黑树新增方法新增
- 新增完成检查链表是否需要转换为红黑树
- 最后检查是否需要扩容
具体源码如下:
public V put(K key, V value) {
return this.putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
// key 或 value 不允许为 null
if (key == null || value == null) throw new NullPointerException();
// 计算 key 的哈希码
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K, V>[] tab = table; ; ) {
Node<K, V> f;
int n, i, fh;
// 1. 如果 table 数组为空,则进行初始化
if (tab == null || (n = tab.length) == 0) {
// 基于 CAS 策略初始化 table,初始化大小为 16
tab = this.initTable();
}
// 2. 否则,计算 hash 值对应的下标,获取 table 上对应下标的头结点
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
/*
* table 对应下标的头结点为 null
* 基于 CAS 设置结点,如果成功则本次 put 操作完成,
* 如果失败则说明期间有并发操作,需要进入一轮新的循环
*/
if (casTabAt(tab, i, null, new Node<>(hash, key, value, null))) {
// 设置结点成功,put 操作完成
break;
}
}
// 3. 否则,如果 Map 正在执行扩容操作(MOVED 哈希值表示正在扩容),则帮助扩容
else if ((fh = f.hash) == MOVED) {
tab = this.helpTransfer(tab, f);
}
// 4. 否则,获取到 hash 值对应下标的头结点,且结点不为 null
else {
V oldVal = null;
synchronized (f) { // 加锁
if (tabAt(tab, i) == f) { // 再次校验头结点为 f
// 头结点的哈希值大于等于 0,说明是链表,如果是树的话应该是 -2
if (fh >= 0) {
binCount = 1;
for (Node<K, V> e = f; ; ++binCount) {
K ek;
// 如果是已经存在的 key,则在允许覆盖的前提下直接覆盖已有的值
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent) {
e.val = value;
}
break;
}
// 如果是不存在的 key,则直接在链表尾部插入一个新的结点
Node<K, V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<>(hash, key, value, null);
break;
}
}
}
// 红黑树
else if (f instanceof TreeBin) {
Node<K, V> p;
binCount = 2;
// 调用红黑树的方法获取到修改的结点,并插入或更新结点(如果允许)
if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent) {
p.val = value;
}
}
}
}
} // end synchronized
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD) {
/*
* 结点数目大于等于 8,对链表执行转换操作
* - 如果 table 长度小于 64,则执行扩容
* - 如果 table 长度大于等于 64,则转换成红黑树
*/
this.treeifyBin(tab, i);
}
if (oldVal != null) {
return oldVal;
}
break;
}
}
}
// size 加 1
this.addCount(1L, binCount);
return null;
}
数组初始化的线程安全保证
- 通过自旋保证初始化一定能成功
- 通过CAS设置sizeCtl遍历值保证同时只有一个线程进行初始化
- CAS成功后再次判断数组是否完成初始化,如果未完成再次初始化
通过自旋+CAS+双中检查保证了数组初始化的线程安全
具体源码如下:
//初始化 table,通过对 sizeCtl 的变量赋值来保证数组只能被初始化一次
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//通过自旋保证初始化成功
while ((tab = table) == null || tab.length == 0) {
// 小于 0 代表有线程正在初始化,释放当前 CPU 的调度权,重新发起锁的竞争
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// CAS 赋值保证当前只有一个线程在初始化,-1 代表当前只有一个线程能初始化
// 保证了数组的初始化的安全性
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 很有可能执行到这里的时候,table 已经不为空了,这里是双重 check
if ((tab = table) == null || tab.length == 0) {
// 进行初始化
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
新增槽点值时的线程安全保障
- 通过自旋保证一定能新增成功
- 当前槽点为空时,通过CAS新增
- 当前槽点有值时,锁住当前槽点(发生hash冲突了)
- 红黑树旋转时,锁住红黑树根节点,保证同一时刻当前红黑树只能被一个线程旋转
通过自旋 + CAS + synchronized保证了新增槽点值的线程安全
扩容时的线程安全保证
ConcurrentHashMap 的扩容时机和HashMap一致,都是在put方法的最后一步检查是否需要扩容,但是扩容的过程完全不同。
ConcurrentHashMap 的扩容方法叫做transfer
,实现思路如下
- 将老数组的值拷贝到扩容后的新数组上,从数组的队尾开始拷贝
- 拷贝数组的槽点时,先把原数组的槽点锁住,保证原数组槽点不能被操作,成功拷贝后将原数组这个槽点设置为转移节点
- 此时如果有新数据需要put到此槽点时,发现槽点为转移节点,就会自旋等待。所以扩容完成前数据不会发生改变
- 直到所有数组数据都拷贝到新数组时,把新数组赋值给数组容器,拷贝完成
关键源码如下:
// 扩容主要分 2 步,第一新建新的空数组,第二移动拷贝每个元素到新数组中去
// tab:原数组,nextTab:新数组
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// 老数组的长度
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 如果新数组为空,初始化,大小为原数组的两倍,n << 1
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
// 新数组的长度
int nextn = nextTab.length;
// 代表转移节点,如果原数组上是转移节点,说明该节点正在被扩容
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
// 无限自旋,i 的值会从原数组的最大值开始,慢慢递减到 0
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
// 结束循环的标志
if (--i >= bound || finishing)
advance = false;
// 已经拷贝完成
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 每次减少 i 的值
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// if 任意条件满足说明拷贝结束了
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 拷贝结束,直接赋值,因为每次拷贝完一个节点,都在原数组上放转移节点,所以拷贝完成的节点的数据一定不会再发生变化。
// 原数组发现是转移节点,是不会操作的,会一直等待转移节点消失之后在进行操作。
// 也就是说数组节点一旦被标记为转移节点,是不会再发生任何变动的,所以不会有任何线程安全的问题
// 所以此处直接赋值,没有任何问题。
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
// 进行节点的拷贝
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// 如果节点只有单个数据,直接拷贝,如果是链表,循环多次组成链表拷贝
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 在新数组位置上放置拷贝的值
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// 在老数组位置上放上 ForwardingNode 节点
// put 时,发现是 ForwardingNode 节点,就不会再动这个节点的数据了
setTabAt(tab, i, fwd);
advance = true;
}
// 红黑树的拷贝
else if (f instanceof TreeBin) {
// 红黑树的拷贝工作,同 HashMap 的内容,代码忽略
…………
// 在老数组位置上放上 ForwardingNode 节点
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
get value的过程
- 计算hash值获取数组下标
- 找到对应的位置,根据情况取值
- 直接取值
- 红黑树取值
- 遍历链表取值
- 返回找到的结果
具体源码如下:
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//计算hashcode
int h = spread(key.hashCode());
//不是空的数组 && 并且当前索引的槽点数据不是空的
//否则该key对应的值不存在,返回null
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//槽点第一个值和key相等,直接返回
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//如果是红黑树或者转移节点,使用对应的find方法
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//如果是链表,遍历查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
面试题
- ConcurrentHashMap 和 HashMap 的相同点和不同点
-
相同之处:
- 都是数组 +链表+红黑树的数据结构(JDK8之后),所以基本操作的思想一致
- 都实现了Map接口,继承了AbstractMap 操作类,所以方法大都相似,可以相互切换
-
不同之处:
- ConcurrentHashMap 是线程安全的,多线程环境下,无需加锁直接使用
- ConcurrentHashMap 多了转移节点,主要用户保证扩容时的线程安全
- ConcurrentHashMap 通过哪些手段保证线程安全
- 储存Map数据的数组时被volatile关键字修饰,一旦被修改,其他线程就可见修改。因为是数组存储,所以只有改变数组内存值是才会触发volatile的可见性
- 如果put操作时hash计算出的槽点内没有值,采用自旋+CAS保证put一定成功,且不会覆盖其他线程put的值
- 如果put操作时节点正在扩容,即发现槽点为转移节点,会等待扩容完成后再进行put操作,保证扩容时老数组不会变化
- 对槽点进行操作时会锁住槽点,保证只有当前线程能对槽点上的链表或红黑树进行操作
- 红黑树旋转时会锁住根节点,保证旋转时线程安全
- 描述一下 CAS 算法在 ConcurrentHashMap 中的应用
- CAS是一种乐观锁,在执行操作时会判断内存中的值是否和准备修改前获取的值相同,如果相同,把新值赋值给对象,否则赋值失败,整个过程都是原子性操作,无线程安全问题
- ConcurrentHashMap 的put操作是结合自旋用到了CAS,如果hash计算出的位置的槽点值为空,就采用CAS+自旋进行赋值,如果赋值是检查值为空,就赋值,如果不为空说明有其他线程先赋值了,放弃本次操作,进入下一轮循环
- ConcurrentHashMap 是如何发现当前槽点正在扩容的?
- ConcurrentHashMap 新增了一个节点类型,叫做转移节点,当我们发现当前槽点是转移节点时(转移节点的 hash 值是 -1),即表示 Map 正在进行扩容
- 发现槽点正在扩容时,put 操作会怎么办?
- 无限 for 循环,或者走到扩容方法中去,帮助扩容,一直等待扩容完成之后,再执行 put 操作
- ConcurrentHashMap 和HashMap的扩容有什么不同?
- HashMap的扩容是创建一个新数组,将值直接放入新数组中,JDK7采用头链接法,会出现死循环,JDK8采用尾链接法,不会造成死循环
- ConcurrentHashMap 扩容是从数组队尾开始拷贝,拷贝槽点时会锁住槽点,拷贝完成后将槽点设置为转移节点。所以槽点拷贝完成后将新数组赋值给容器
- ConcurrentHashMap 在 Java 7 和 8 中关于线程安全的做法有啥不同?
- 两者实现差距很大
- Java7中采用分段锁,默认为16个segment,操作时最多能满足16个的并发
- Java8中采用自旋锁+CAS+synchronized,锁住的是某个槽点,并发效率高
- JDK1.7和JDK1.8中的concurrentHashMap的区别
- 数据结构不同了,JDK1.8几乎全部重写了concurrentHashMap的结构,不再使用segment,而是让hash的每一个节点都是一个node,都是独立的,提高了并发度
- Hash碰撞的处理不同了,变更同HashMap,除了拉链法之外还增加红黑树
- 保证并发安全的方式不同了,1.7中通过分段锁实现,而1.8中采用CAS+synchronized
- 查询复杂度不同,1.7中链表时间可能很长查询时间
0(n)
,1.8中超过阈值转为红黑树查询时间变为n(logn)
- 为什么超过冲突超过8才将链表转为红黑树而不直接用红黑树
- 默认使用链表, 链表占用的内存更小
- 正常情况下,想要达到冲突为8的几率非常小(泊松分布计算为千万分之几的概率),如果真的发生了转为红黑树可以保证极端情况下的效率
阻塞队列
常见问题
为什么要使用队列?
- 使用了队列可以在线程间传递数据:生产者消费者模式,银行转账
- 队列可以将线程安全问题交给队列解决
阻塞队列
- 什么是阻塞队列
- 具有阻塞功能的队列
- 通常,一端给生产者放数据,另一个端给消费者来拿数据。
- 阻塞队列是线程安全的队列
- 阻塞队列是线程池的重要组成部分
- 主要方法
- take,put:
- put放数据,如果队列满了put阻塞住,直到有空闲空间。
- take拿数据 , 如果队列空了take阻塞住 ,直到有数据
- add、remove、element
- add放数据,类似于put,如果队列满了抛出异常
- remove拿数据,类似于take,如果队列空了抛出异常
- element返回队列头元素,如果队列为空抛出异常
- offer、poll、peek
- offer放数据,类似于put,如果队列满了返回false
- poll拿数据,类似于take,如果队列空了返回null
- peek返回队列头元素,如果队列为空返回null
- ArrayBlockingQueue
- 有界
- 能指定容量
- 可以指定公平或者非公平
- LinkedBlockingQueue
- 无界
- 容量Integer.MAX_VALUE
- 内部结构:
- Node
- 两把锁 :takeLock、putLock
- PriorityBlockingQueue
- 支持优先级
- 自然排序(不是先进先出)
- 无界队列(不够了可以扩容)
- PriorityQueue的线程安全版本(传入内容必须是可比较的,可重写比较规则)
- SynchronousQueue
- 容量为0,不需要存储,直接传递,效率很高
- 无peek等函数,因为容量为0不存在头结点概念
- 极好的直接传递的并发数据结构
- SynchronousQueue是newCachedThreadPool的阻塞队列
- DelayQueue
- 延迟队列,根据延迟时间排序
- 元素需要实现Delayed接口,规定排序顺序
- 无界队列
非阻塞队列
- ConcurrentLinkedQueue
- JUC中只有这一种非阻塞队列
- 链表作为数据结构
- 采用CAS实现线程安全
- 适合对性能要求高的并发场景
如何选择适合自己的队列
- 是否需要边界
- 是否需要容量
- 吞吐量
源码分析
1. LinkedBlockingQueue
类图:
从类图可以看出,直接继承了AbstractQueue类 并实现了BlockingQueue 接口
那么LinkedBlockingQueue应该具有集合的相关方法和Queue接口相关的方法
其主要方法可以总结为:
内部构成源码:
// 链表结构 begin
//链表的元素
static class Node<E> {
E item;
//当前元素的下一个,为空表示当前节点是最后一个
Node<E> next;
Node(E x) { item = x; }
}
//链表的容量,默认 Integer.MAX_VALUE
private final int capacity;
//链表已有元素大小,使用 AtomicInteger,所以是线程安全的
private final AtomicInteger count = new AtomicInteger();
//链表头
transient Node<E> head;
//链表尾
private transient Node<E> last;
// 链表结构 end
// 锁 begin
//take 时的锁
private final ReentrantLock takeLock = new ReentrantLock();
// take 的条件队列,condition 可以简单理解为基于 ASQ 同步机制建立的条件队列
private final Condition notEmpty = takeLock.newCondition();
// put 时的锁,设计两把锁的目的,主要为了 take 和 put 可以同时进行
private final ReentrantLock putLock = new ReentrantLock();
// put 的条件队列
private final Condition notFull = putLock.newCondition();
// 锁 end
// 迭代器
// 实现了自己的迭代器
private class Itr implements Iterator<E> {
………………
}
内部构成主要分为三个部分:链表 + 两个锁 + 迭代器
其中两把锁为take锁和put锁,为了保证线程安全设计了两把锁,保证了take和put可以同时进行,互不影响
构造方法源码:
// 不指定容量,默认 Integer 的最大值
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
// 指定链表容量大小,链表头尾相等,节点值(item)都是 null
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
// 已有集合数据进行初始化
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
// 集合内的元素不能为空
if (e == null)
throw new NullPointerException();
// capacity 代表链表的大小,在这里是 Integer 的最大值
// 如果集合类的大小大于 Integer 的最大值,就会报错
// 其实这个判断完全可以放在 for 循环外面,这样可以减少 Integer 的最大值次循环(最坏情况)
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
构造方法有三种:
- 指定链表容量大小
- 不指定链表容量大小(默认Integer.MAX_VALUE)
- 对已有集合数据进行初始化
新增(入队)源码:
入队有put、offer、add三种方法,都差不多,以put为例
// 把e新增到队列的尾部。
// 如果有可以新增的空间的话,直接新增成功,否则当前线程陷入等待
public void put(E e) throws InterruptedException {
// e 为空,抛出异常
if (e == null) throw new NullPointerException();
// 预先设置 c 为 -1,约定负数为新增失败
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 设置可中断锁
putLock.lockInterruptibly();
try {
// 队列满了
// 当前线程阻塞,等待其他线程的唤醒(其他线程 take 成功后就会唤醒此处被阻塞的线程)
while (count.get() == capacity) {
// await 无限等待
notFull.await();
}
// 队列没有满,直接新增到队列的尾部
enqueue(node);
// 新增计数赋值,注意这里 getAndIncrement 返回的是旧值
// 这里的 c 是比真实的 count 小 1 的
c = count.getAndIncrement();
// 如果链表现在的大小 小于链表的容量,说明队列未满
// 可以尝试唤醒一个 put 的等待线程
if (c + 1 < capacity)
notFull.signal();
} finally {
// 释放锁
putLock.unlock();
}
// c==0,代表队列里面有一个元素
// 会尝试唤醒一个take的等待线程
if (c == 0)
signalNotEmpty();
}
// 入队,把新元素放到队尾
private void enqueue(Node<E> node) {
last = last.next = node;
}
步骤:
- 上一个可中断put锁
- 如果队列不为满追加到链表尾部
- 如果队列满了,线程阻塞
- 新增数据完成后
- 如果队列不为满,唤醒一个put等待线程
- 如果对列有一个元素时,唤醒一个take等待线程
offer与put只有一点点不同,会自旋尝试,超时了会中断返回false
删除(出队)源码:
以take为例说明删除的原理
// 阻塞拿数据
public E take() throws InterruptedException {
E x;
// 默认负数,代表失败
int c = -1;
// count 代表当前链表数据的真实大小
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 空队列时,阻塞,等待其他线程唤醒
while (count.get() == 0) {
notEmpty.await();
}
// 非空队列,从队列的头部拿一个出来
x = dequeue();
// 减一计算,注意 getAndDecrement 返回的值是旧值
// c 比真实的 count 大1
c = count.getAndDecrement();
// 如果队列里面有值,从 take 的等待线程里面唤醒一个。
// 意思是队列里面有值啦,唤醒之前被阻塞的线程
if (c > 1)
notEmpty.signal();
} finally {
// 释放锁
takeLock.unlock();
}
// 如果队列空闲还剩下一个,尝试从 put 的等待线程中唤醒一个
if (c == capacity)
signalNotFull();
return x;
}
// 队头中取数据
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;// 头节点指向 null,删除
return x;
}
步骤:
- 上一个可中断take锁
- 如果队列不为空从队列头部取出节点
- 如果队列为空,线程阻塞
- 删除数据完成后
- 如果队列不为空,唤醒一个take等待线程
- 如果对列只剩下一个空闲了,唤醒一个put等待线程
查看队首元素源码:
以peek为例
// 查看并不删除元素,如果队列为空,返回 null
public E peek() {
// count 代表队列实际大小,队列为空,直接返回 null
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 拿到队列头
Node<E> first = head.next;
// 判断队列头是否为空,并返回
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
步骤:
- 加可中断take锁
- 获取队首值
- 关闭锁
**2. ArrayBlockingQueue **
数据结构:
// 队列存放在 object 的数组里面
// 数组大小必须在初始化的时候手动设置,没有默认大小
final Object[] items;
// 下次拿数据的时候的索引位置
int takeIndex;
// 下次放数据的索引位置
int putIndex;
// 当前已有元素的大小
int count;
// 可重入的锁
final ReentrantLock lock;
// take的队列
private final Condition notEmpty;
// put的队列
private final Condition notFull;
其中有两个很重要的变量,takeIndex和putIndex,分别表示下次拿数据和放数据的索引位置,只要维护好这两个指针,每次操作就不需要进行计算
初始化:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
// 队列不为空 Condition,在 put 成功时使用
notEmpty = lock.newCondition();
// 队列不满 Condition,在 take 成功时使用
notFull = lock.newCondition();
}
初始化时有两个参数:数组的大小和是否公平
如果是公平锁,锁竞争时就会按先来后到顺序
如果是不公平锁,锁竞争是随机的
新增
// 新增,如果队列满,无限阻塞
public void put(E e) throws InterruptedException {
// 元素不能为空
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 队列如果是满的,就无限等待
// 一直等待队列中有数据被拿走时,自己被唤醒
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1; 同一时刻只能一个线程进行操作此方法
// assert items[putIndex] == null;
final Object[] items = this.items;
// putIndex 为本次插入的位置
items[putIndex] = x;
// ++ putIndex 计算下次插入的位置
// 如果下次插入的位置,正好等于队尾,下次插入就从 0 开始
if (++putIndex == items.length)
putIndex = 0;
count++;
// 唤醒因为队列空导致的等待线程
notEmpty.signal();
}
新增是存在两种情况
- 本次新增数据在中间,可以直接新增,然后更新下一次新增的putindex
- 本次新增数据在数组尾部,更新下次新增的putindex为数组头
拿数据:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列为空,无限等待
// 直到队列中有数据被 put 后,自己被唤醒
while (count == 0)
notEmpty.await();
// 从队列中拿数据
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
// takeIndex 代表本次拿数据的位置,是上一次拿数据时计算好的
E x = (E) items[takeIndex];
// 帮助 gc
items[takeIndex] = null;
// ++ takeIndex 计算下次拿数据的位置
// 如果正好等于队尾的话,下次就从 0 开始拿数据
if (++takeIndex == items.length)
takeIndex = 0;
// 队列实际大小减 1
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒被队列满所阻塞的线程
notFull.signal();
return x;
}
从源码可以看出,每次拿数据的位置是takeIndex的位置,拿到数据后更新takeIndex的位置,如果拿的数据在中间部分,takeIndex+1,如果位于数组尾部,将takeIndex指针指向数组头部
面试题
- 说一说你对队列的理解,队列和集合的区别
- 对队列的理解
- 队列本身也是一个容器,底层可以采用不同的数据结构,如LinkedBlockingQueue 底层是链表,ArrayBlockingQueue 底层是数组
- 有的队列具有存储功能,有的队列不具有存储功能,如SynchronousQueue不具有存储空间
- 队列可以使入队出队两端解耦合
- 队列能提供阻塞功能,能实现线程安全
- 队列和集合的区别
- 虽然底层数据结构相似,但是实现的接口不用,提供的API不同
- 队列提供了阻塞功能,能实现生产者消费者关系
- 队列能实现数据解耦合
- 哪些队列具有阻塞的功能,大概是如何阻塞的?
- LinkedBlockingQueue 和ArrayBlockingQueue 是阻塞队列,前者容量为Integer.MAX_VALUE,后者的可以指定。进行put和take操作时会锁住队列
- SynchronousQueue 是同步队列,本身不能存储数据,如果生成了数据没有被消费就会阻塞住,如果消费了没有生成的也会阻塞住
- LinkedBlockingQueue 和 ArrayBlockingQueue 的区别?
- 底层实现不同,LinkedBlockingQueue 为链表,ArrayBlockingQueue 为数组
- 数据容量不同,LinkedBlockingQueue 默认Integer.MAX_VALUE,ArrayBlockingQueue 必须手动指定
- LinkedBlockingQueue 使用了take锁和put锁,ArrayBlockingQueue 都使用的同一个锁
- 往队列里 put 数据和take数据是线程安全的么?为什么?
- 是线程安全的
- put操作之前,队列会加锁,put操作完之后才释放
- take操作之前,队列会加锁,take操作完之后才释放
- take和put方法是不是同一时间只能运行其中一个。
- 这要视底层实现决定
- LinkedBlockingQueue 的take 和put有单独的锁,可以同时进行
- ArrayBlockingQueue take和put是同一个锁,同一时刻只能运行一个方法
- 工作中经常使用队列的 put、take 方法有什么危害,如何避免?
- put和take 如果发生阻塞之后会永久等待,直到满足条件为止
- 大流量时采用offer和poll来代替,只要设置好超时时间会自动返回false,避免被阻塞
- DelayQueue 对元素有什么要求么,我把 String 放到队列中去可以么?
- DelayQueue 要求元素必须实现Delayed接口,String类没有实现不能放入
- DelayQueue 如何让快过期的元素先执行的?
- DelayQueue 实现了Comparable 接口并重写了排序方法,过期时间与当前时间差小的在前面被执行
- 如何查看 SynchronousQueue 队列的大小?
- 此题是个陷进题,题目首先设定了 SynchronousQueue 是可以查看大小的,实际上 SynchronousQueue 本身是没有容量的,所以也无法查看其容量的大小,其内部的 size 方法都是写死的返回 0。
- SynchronousQueue 底层有几种数据结构,两者有何不同?
- 有两种,分别是队列和堆栈
- 队列维护了先入先出的顺序,是公平的,而堆栈则是先入后出的,是不公平的
- 假设 SynchronousQueue 底层使用的是堆栈,线程 1 执行 take 操作阻塞住了,然后有线程 2 执行 put 操作,问此时线程 2 是如何把 put 的数据传递给 take 的?
- 线程1倍阻塞住了,堆栈头是线程1,而线程2执行put操作,会把put的数据赋值给堆栈头的match属性,并唤醒线程1,线程1醒了之后获取堆栈头中的数据
- 如果想使用固定大小的队列,有几种队列可以选择,有何不同?
- 可以使用 LinkedBlockingQueue 和 ArrayBlockingQueue 两种队列。
- 前者是链表,后者是数组,链表新增时,只要建立起新增数据和链尾数据之间的关联即可,数组新增时,需要考虑到索引的位置(takeIndex 和 putIndex 分别记录着下次拿数据、放数据的索引位置),如果增加到了数组最后一个位置,下次就要重头开始新增
- ArrayBlockingQueue 可以动态扩容么?用到数组最后一个位置时怎么办?
- ArrayBlockingQueue 虽然底层是数组实现,但是不支持动态扩容
- 如果使用到了数组尾部,下一次操作会指向数组头
- ArrayBlockingQueue 的take 和 put 都是怎么找到索引位置的?是利用 hash 算法计算得到的么?
- ArrayBlockingQueue 维护了两个指针:takeIndex和putIndex,分别标识下一次相关操作的位置,不需要通过hash算法获取
参考:
慕课网《面试官系统精讲Java源码及大厂真题》
极客时间《Java核心技术面试精讲》
慕课网《玩转Java并发工具,精通JUC,成为并发多面手》
更多Java面试复习笔记和总结可访问我的面试复习专栏《Java面试复习笔记》,或者访问我另一篇博客《Java面试核心知识点汇总》查看目录和直达链接