一、ConcurrentHashMap
(1)为什么使用ConcurrentHashMap?
- HashMap是非线程安全的,在并发环境中可能会引起死循环
- HashTable是线程安全的,但是全部使用悲观锁效率非常慢
- ConcurrentHashMap采用分段的技术(降低了锁粒度)、CAS(比较并交换)、volatile关键字等来有效的提高了并发环境下的效率。
二、ConcurrentHashMap原理
在ConcurrentHashMap中通过一个Node<K,V>[]数组来保存添加到map中的键值对,而在同一个数组位置是通过链表和红黑树的形式来保存的。但是这个数组只有在第一次添加元素的时候才会初始化,否则只是初始化一个ConcurrentHashMap对象的话,只是设定了一个sizeCtl变量,这个变量用来判断对象的一些状态和是否需要扩容。
第一次添加元素的时候,默认初期长度为16,当往map中继续添加元素的时候,通过hash值跟数组长度取与来决定放在数组的哪个位置,如果出现放在同一个位置的时候,优先以链表的形式存放,在同一个位置的个数又达到了8个以上,如果数组的长度还小于64的时候,则会扩容数组。如果数组的长度大于等于64了的话,在会将该节点的链表转换成树。
通过扩容数组的方式来把这些节点给分散开。然后将这些元素复制到扩容后的新的数组中,同一个链表中的元素通过hash值的数组长度位来区分,是还是放在原来的位置还是放到扩容的长度的相同位置去 。在扩容完成之后,如果某个节点的是树,同时现在该节点的个数又小于等于6个了,则会将该树转为链表。
取元素的时候,相对来说比较简单,通过计算hash来确定该元素在数组的哪个位置,然后在通过遍历链表或树来判断key和key的hash,取出value值。
三、源码
初始化
从ConcurrentHashMap部分代码中可以看到,整段代码只是设置了分段的长度(为2的整数次幂),并没有初始化segment内部的HashEntry[],采用的是懒汉方式进行初始化。
//入参为Segment数组的长度,默认为16
public ConcurrentHashMap(int initialCapacity) {
//初始化长度小于0,异常抛出
if (initialCapacity < 0)
throw new IllegalArgumentException();
//MAXIMUM_CAPACITY 为2的30次方,如果大于2的29次方则为最大值,否则为
//大于等于initialCapacity最小2的整数次幂
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
//设置初始化长度
this.sizeCtl = cap;
}
get方法
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
//如果当前e的key值是我们要找的,直接返回
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
//如果eh<0说明当前node已经不存在,有另一个线程在进行扩容,直接去newtable中寻找
return (p = e.find(h, key)) != null ? p.val : null;
//遍历链表获得Value值
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
putVal方法
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());//扰动函数
int binCount = 0;
for (Node<K,V>[] tab = table;;) {//死循环,不断重试,直至插入成功
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
//如果为空,进行初始化
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//根据hash值去除对应节点的数据f,如果为null,则不会发生碰撞
//直采用CAS方式进行赋值,成功后直接返回
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
//如果当前有线程正在进行扩容,则帮助其扩容完成
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//如果当前节点不为空,则加锁插入,hash值相同的链表的头结点
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
//遍历该链表,如果存在key值进行覆盖val,如果没有则添加在末尾
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
//如果当前节点为树节点则直接调用树方法插入
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//如果当前节点链表数量大于8则转化为shu
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//如果是新插入元素,则进行元素个数+1
addCount(1L, binCount);
return null;
}