1、JDK7中数据结构
ConcurrentHashMap是由Segment数组与HashEntry数组构成。每个ConcurrentHashMap持有一个Segement数组。而每个Segment对象由一个HashEntry数组构成。HashEntr数组其实就是一个小的哈希表。如果研究过HashMap的内部结构,你就应该知道HashMap内部是数组加链表的数据结构,Segment正如HashMap的结构类似,可以说每段Segment就是一个小的HashMap。
这两张图意思是一样的,那个你更理解就理解那个。
2、形象理解 ConcurrentHashMap
把线程比作人,把segment数组比作银行者联盟,那么单个的segment就是一个银行。
现在有两个人,比如盖伦和亚索一起去银行存他们的大宝剑,这个「银行者联盟」一顿操作,然后对盖伦说,1号银行现在没人,你可以去那存,不用排队,然后盖伦就去1号银行存他的大宝剑,1号银行把盖伦接进门,马上拉闸,一顿操作,然后把盖伦的大宝剑放在第x行第x个保险箱,等盖伦办妥离开后,再开闸;同样「银行者联盟」对亚索说,2号银行现在没人,你可以去那存,不用排队,然后亚索去2号银行存他的大宝剑,2号银行把亚索接进门,马上拉闸,一顿操作把亚索的大宝剑放在第x行第x号保险箱,等亚索离开后再开闸,此时不管盖伦和亚索在各自银行里面待多久都不会影响到彼此,不用担心自己的大宝剑被人偷换了。
从这个例子中我们可以知道,每当一个线程占用锁访问一个Segment时,不会影响其他的Segment,就是说如果容量为16,那么他的并发度就是16,可以同时允许16个线程操作16个Segment,而且还是线程安全的。
3、核心源码分析
构造函数
看下面的源码,在这个构造函数中,最重要的是初始化了内部的Segment[]数组Segments与创建Segment对象。
// create segments and segments[0]
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
//省略其它健壮性判断
//创建第一段Segment
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
//创建Segment[]数组Segments
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
//赋值给ConcurrentHashMap内部的对象
this.segments = ss;
}
Segment 的结构
Segment可以视为一个小的HashMap,但是因为继承了ReentrantLock,所以可以保证线程安全。
HashEntry可以视为HashMap中的节点,用于封装键值对。
static final class Segment<K,V> extends ReentrantLock implements Serializable {
//加载因子
final float loadFactor;
//阙值:达到多少个元素的时候需要扩容
transient int threshold;
//内部的哈希表,节点就是HashEntry
transient volatile HashEntry<K,V>[] table;
//添加键值对到内部数组+链表中
final V put(K key, int hash, V value, boolean onlyIfAbsent){..};
}
HashEntry结构
HashEntry是用来存储键值对的,类似与HashMap内部的Entry节点。
但是不同点是,他使用volatile修饰了他的数据value还有下一个节点next。
关于volatile的讲解在此篇博客。
static final class HashEntry<K,V> {
//此节点的hash值
final int hash;
//此节点的键
final K key;
//此节点的值,使用volatile修饰,保证立马对其它线程可见
volatile V value;
//因为它是一个链表数据结构,所以要保存下一个节点的引用,volatile保证立马对其它线程可见。
volatile HashEntry<K,V> next;
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
//设置下一个节点的引用
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
//省略其它
}
在这里稍微说一下:
volatile
- 可见性。即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。
- 禁止进行指令重排。(实现有序性)
- 不保证原子性。(i++这种操作不能保证原子性)
put方法,添加键值对,加锁。
public V put(K key, V value) {
//创建一个Segment引用
Segment<K,V> s;
//如果值为空,则抛异常
if (value == null)
throw new NullPointerException();
//第一次计算hash值:确定数据在哪个具体的Segment中
int hash = hash(key);
//j索引:指在哪个具体的Segment中。即数据应该存放在Segment[j]段中
int j = (hash >>> segmentShift) & segmentMask;
//第一次调用的时候,创建指定的Segment段,以及Segment对应的HashEntry[]数组
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
//若指定Segment段不存在,则创建它
s = ensureSegment(j);
//将数据存放到指定段中
return s.put(key, hash, value, false);
}
//嵌套类Segment的put方法,相当于小HashMap的put方法
//static class Segment extends ReentrantLock
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//获取同步锁
HashEntry<K,V> node = tryLock() ? null :
//如果key没有找到,则返回一个新节点,并加锁。否则返回null
//如果没有获取到同步锁,那么也不能闲着。
//说是预热代码:循环获取同步锁,当然了有重试次数的限制,这个代码很有启发
//次数到了之后,还没有获取到同步锁,那么就进入阻塞状态,随时争夺同步锁
//如果没有,返回一个新的节点,省的下面再创建。
//既然返回了,也说明当前线程已经获取到同步锁了。
scanAndLockForPut(key, hash, value);
//旧值引用
V oldValue;
try {
//把这个Segment中的哈希表 HashEntry[] table 赋给 tab
HashEntry<K,V>[] tab = table;
//第二次计算hash值,获取这个key在这个Segment的哪个链表上
int index = (tab.length - 1) & hash;
//获取这个链表的头节点
HashEntry<K,V> first = entryAt(tab, index);
//从头节点开始,循环遍历
for (HashEntry<K,V> e = first;;) {
//从头节点开始找,是不是有相同key的键值对
if (e != null) {
K k;
//如果有相同key的键值对
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
//那么就用新值替换旧值,并退出循环
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
//如果没有找到,那么就继续遍历这个链表,直到到链表末尾,执行下面的else的代码
e = e.next;
}
//如果遍历完了整个链表都没有发现有相同的key的元素
//那么put的目的就是把新这个新元素加入的concurrentHashMap中
//效果是:创建最新的节点到tab[index]链表的头位置,以前的节点挂在这个节点的next引用上
else {
//之前新创建的节点不为空
//那么就把这个新节点的下一个节点引用指向以前的头节点
if (node != null)
node.setNext(first);
else
//若新创建的节点为空,那么就新创建一个节点
node = new HashEntry<K,V>(hash, key, value, first);
//个数+1
int c = count + 1;
//若个数超过限制那么就扩容,不然就直接插入到链表中
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
//tab == HashEntry[] table
//在这段Hash表中的指定索引的插入最新的node
//此node的下一个节点是之前头节点
setEntryAt(tab, index, node);
++modCount;
count = c;
//新插入的节点没有旧值,那么就方法的返回值就是null
oldValue = null;
//退出循环
break;
}
}
} finally {
//释放同步锁,因为上述方法一直都在加锁的情况下进行
unlock();
}
return oldValue;
}
首先第一步的时候会尝试获取锁,如果获取失败肯定就有其他线程存在竞争,则利用scanAndLockForPut()
自旋获取锁。
- 尝试自旋获取锁。
- 如果重试次数达到了
MAX_SCAN_RETRIES
,则改为阻塞锁获取,保证能获取成功。
get方法,不加锁
get方法很简单。
1、确定键值对在哪个段。
2、确定键值对在哪个小的链表上 tab[index]。
3、遍历链表,找到指定的key。
说白了,就是比HashMap多一个确定段的操作。从代码上看,它是没有使用同步锁的,所以可以多个线程同时访问。另外值得一提的是,这个HashEntry内部的value属性,使用的是volatile修饰符,所以立即对其它所以线程可见。
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
//计算此key的哈希值,确定在哪个段中
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
//找到某个段,再确定这个元素在哪个tab[index]上,并确定头节点e
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
//循环遍历头节点是e这个链表,找到指定的key对应的值,并返回
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
在查询的时候,还是得去遍历链表,会导致效率很低,这个跟1.7的HashMap是存在一样的问题,那么在1.8中就优化了。
4、JDK8的改进
数据结构的改变
抛弃了原来的Segment分段锁,采用了CAS+synchronized
来保证并发安全性。
跟HashMap很像,也把之前的HashEntry改成了Node,但是作用不变,把值和next用volatile去修饰,保证了可见性,并且也引入了红黑树,在链表大于一定值的时候会转换(默认是8)。
ConcurrentHashMap 在jdk8的时候是怎么进行存取操作的。
再简单的看一下ConcurrentHashMap 在jdk8的时候是怎么进行存取操作的。
ConcurrentHashMap 在进行put操作的时候还是比较复杂的。大致可以分为以下几个步骤:
- 根据 key 计算出 hashcode 。
- 判断是否需要进行初始化。
- 即为当前 key 定位出的 Node,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功。
- 如果当前位置的
hashcode == MOVED == -1
,则需要进行扩容。 - 如果都不满足,则利用 synchronized 锁写入数据。
- 如果数量大于
TREEIFY_THRESHOLD
则要转换为红黑树。
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
//1. 根据key算出hashcode
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//2. 判断是否需要进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//3.即为当前key定位出的Node,如果为空表示当前位置可以写入数据,利用CAS尝试写入,失败则自旋保证成功
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//4.如果当前位置的hashcode==MOVED==-1,则需要进行扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
//5.如果都不满足,synchronized 锁写入数据
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//6. 如果数量大于TREEIFY_THRESHOLD,则要转换为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
ConcurrentHashMap的get操作
- 根据计算出来的 hashcode 寻址,如果就在桶上那么直接返回值。
- 如果是红黑树那就按照树的方式获取值。
- 就不满足那就按照链表的方式遍历获取值。
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//根据计算出来的hashcode 寻址,如果就在桶上那么直接返回值
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//如果是红黑树那就按照树的方式获取值。
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//不满足那就按照链表的方式遍历获取值。
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
5、涉及的其他问题
CAS是什么?
CAS是乐观锁的一种实现方式,是一种轻量级锁,JUC中很多工具类的实现都是基于CAS的。
关于具体的可以看这篇博客。
CAS 操作的流程如下图所示,线程在读取数据时不进行加锁,在准备写回数据时,比较原值是否修改,若未被其他线程修改则写回,若已被修改,则重新执行读取流程。
这是一种乐观策略,认为并发操作并不总会发生。
CAS还是有一定的问题的,比如经典的ABA问题,详细讲解看此篇博客。
什么是ABA问题?
就是说来了一个线程把值改回了B,又来了一个线程把值又改回了A,对于这个时候判断的线程,就发现他的值还是A,所以他就不知道这个值到底有没有被人改过,其实很多场景如果只追求最后结果正确,这是没关系的。
但是实际过程中还是需要记录修改过程的,比如资金修改什么的,你每次修改的都应该有记录,方便回溯。
怎么解决ABA问题?
用版本号去保证就好了,就比如说,我在修改前去查询他原来的值的时候再带一个版本号,每次判断就连值和版本号一起判断,判断成功就给版本号加1。
当然还有别的方式,比如时间戳也可以,查询的时候把时间戳一起查出来,对的上才修改并且更新值的时候一起修改更新时间,这样也能保证,方法很多但是跟版本号都是异曲同工之妙。
参考文章: https://www.jianshu.com/p/d38886f98d12