参考书籍《并发编程的艺术》 -------方腾飞
博客: https://blog.csdn.net/anla_/article/details/78838860
锁的全面解析 (非常全)值得细细研究 (重要1.8)
在Java中,也有适用与并发情况下
Map的 HashTable类
Collections
包装的synchronizedMap
方法。
二者原理基本一直,都是在HashMap基础上,给方法加上synchronized
关键字,同时控制访问。
给方法加上synchronized,锁住的是整个集合对象,其他的线程都无法访问了,性能会受到很大的影响。
ConcurrentHashMap定义:
- Java7 利用了ReentrantLock的Segment数组实现,在ConcurrentHashMap里扮演锁的角色。
- Java8中,则是使用CAS+synchronized实现,而其上锁的,则是table数组的一个元素。
二者在分段锁的思想上基本是一致的,均是对节点加锁,Java7引入了一个Segment来作为控制访问的锁,Java8则直接使用优化后的synchronized作为锁。
另一个方面,Java8中的ConcurrentHashMap实现,在存储结构上,也和HashMap趋于一致。数组+链表+红黑树形式进行存储。
1.7的类图:属性&结构
基本的初始大小initialCapacity,加载因子loadFactor,数组和红黑树转化的阈值
(TREEIFY_THRESHOLD
= 8,UNTREEIFY_THRESHOLD
= 6)均与HashMap里面一致。
说说ConcurrentHashMap里面几个重要的属性字段:
//table数组 transient volatile Node<K,V>[] table; //扩容的下一个数组,最终会table=nextTable进行赋值。 private transient volatile Node<K,V>[] nextTable; //控制标识符,在不同的地方有不同用途,而且它的取值不同,也代表不同的含义。 private transient volatile int sizeCtl; //辅助计数,计算size大小 private transient volatile CounterCell[] counterCells;
sizeCtl:
- 负数代表正在进行初始化或扩容操作
- 1代表正在初始化
- N 表示有N-1个线程正在进行扩容操作
- 正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小
几个内部类:
- Node:继承自Map.Entry,存储键值对,数组的节点。
- TreeNode:继承自Node,用作红黑树的节点,当发生hash冲突时候,首先会转化为链表存储,当链表节点个数超过8时候,就会转化为红黑树存储。
- TreeBin:继承自该类并不负责key-value的键值对包装,它用于在链表转换为红黑树时包装TreeNode节点,也就是说ConcurrentHashMap红黑树存放是TreeBin,不是TreeNode。该类封装了一系列红黑树插入删除以及维持平衡的方法,包括putTreeVal、lookRoot、UNlookRoot、remove、balanceInsetion、balanceDeletion。
- ForwardingNode:该类仅仅只存活在ConcurrentHashMap扩容操作时。只是一个标志节点,并且指向nextTable,它提供find方法而已。该类也是继承Node节点,其hash为-1,key、value、next均为null。
ConcurrentHashMap的几个方法:
initTable()
初始化是发生在插入的时候,例如put、merge、compute、computeIfAbsent、computeIfPresent操作时。
/** * 初始化表的方法。 */ private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { //只有当表还尚未初始化的时候, if ((sc = sizeCtl) < 0) //sizeCtl<0,说明正在初始化或者扩容,那就先休眠,自旋等一会。 Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //成功把sizeCtl设为-1,即告诉别人正在扩容 try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; //设置容量 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; //初始化n大小数组。 table = tab = nt; //把其复制给table。 sc = n - (n >>> 2); //sc为n的一半。 } } finally { //重新复制给sizeCtl。 sizeCtl = sc; } break; } } return tab; }
put操作
put操作算是ConcurrentHashMap里面一个比较核心方法,并发不安全问题也主要出现在这里面,接下来仔细看看:
public V put(K key, V value) { return putVal(key, value, false); }
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); //kv,不能为null。 int hash = spread(key.hashCode()); //通过key,执行spread方法,获得hash值。 int binCount = 0; //用来标识是用哪种方法存储,冲突链表或者二叉树 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) //如果tab为null,就初始化。 tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //表已经初始化,但是特定位置为null,还没有节点。 if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null))) //cas方法,把一个新的node插入到tab里面,并且next为null。成功就退出循环 break; } else if ((fh = f.hash) == MOVED) //如果这个位置的hash值是MOVED,也就是-1;帮助它扩容。 tab = helpTransfer(tab, f); else { //另外一种情况,有竞争。,用synchronized,给tab[i]这个节点加锁。 V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { //再次验证下,这个位置这个节点是不是f。 if (fh >= 0) { //表示对链表操作。 binCount = 1; for (Node<K,V> e = f;; ++binCount) { //依次往下寻找 自旋状态,用e存储f K ek; if (e.hash == hash &&((ek = e.key) == key || (ek != null && key.equals(ek)))) { //key和内容均等,那么就是替换 oldVal = e.val; if (!onlyIfAbsent) //如果允许相等就替换的话,那么就替换。 e.val = value; break; } Node<K,V> pred = e; //把e换为e.next,那么就是往后面节点插一个。 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //说明是二叉树节点状态 else if (f instanceof TreeBin) { //看f是不是二叉树状态。 Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) { //转化为TreeBin后,插入二叉树节点。 oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) //检测是否需要把链表转化为二叉树操作。 treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //总数+1 addCount(1L, binCount); return null; }和HashMap插入思想在结构上一致,通过判定为二叉树还是链表节点,从而进行不同的插入。
当插入时,会用synchronized来锁着table[i]节点,也就是一串子节点都被锁住了。
接下来就会进行不同情况的插入。
size操作
上面put方法里面,涉及到两个方法helpTransfer
和addCount
方法,这两个方法,都是有关扩容操作
通过size得到的结果,是不准确的,即无法完全准确的获取ConcurrentHashMap的容量,
因为是多线程的,所以完全有可能发生变化。
看看size方法定义:public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); }
final long sumCount() { //把所有CounterCell里面的值都加一遍,那么就是它的大小了 CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
@sun.misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } }
CounterCell
,又是什么时候初始化,以及什么时候改变里面的值呢?
put方法里面有个addCount
方法。
这个方法做了2件事,一是增加当前put锁节点的CounterCell的值,第二个是判断需不需要扩容。
private final void addCount(long x, int check) { CounterCell[] as; long b, s; if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { //counterCells不为null时候,或者,当counterCells为null,并且将baseCount设置为baseCount+x失败。说明并发了 CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { //as不为null,并且CAS替换cellValue失败,就增加当前替换节点的CounterCell的value值 fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); //否则,s=sumCount() } //以下则为检测是否需要扩容的代码 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { //需要扩容 int rs = resizeStamp(n); //对n进行或运算 if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) //更改sizeCtl,进行扩容。别人已经开始了,则我同时共享这个nextTable一起扩。 transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }
首先尝试增加baseCount,否则利用fullAddCount进行添加,fullAddCount通过获得ThreadLocal中probe从而来增加本节点的CountCell值。而里面的具体操作则是线程安全的,利于原子性控制cellsBusy的状态来进行加锁解锁
扩容transfer
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // if (nextTab == null) { // 初始化操作。 try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; //构造一个nextTable对象 它的容量是原来的两倍 nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); ////构造一个连节点指针 用于标志位 boolean advance = true; //并发扩容的关键属性 如果等于true 说明这个节点已经处理过 boolean finishing = false; // 判断是否完全完成。 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { //这个while循环体的作用就是在控制i-- 通过i--可以依次遍历原hash表中的节点 int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { //如果所有的节点都已经完成复制工作 就把nextTable赋值给table nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); ////扩容阈值设置为原来容量的1.5倍 依然相当于现在容量的0.75倍 return; } //利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } //如果遍历到的节点为空 则放入ForwardingNode指针 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); ////如果遍历到ForwardingNode节点 说明这个点已经被处理过了 直接跳过,所以这就是能够并发的扩容! else if ((fh = f.hash) == MOVED) advance = true; // already processed else { synchronized (f) { //节点上锁 if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { //链表节点。 //以下的部分在完成的工作是构造两个链表 int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); //在nextTable的i位置上插入一个链表 setTabAt(nextTab, i + n, hn); //在nextTable的i+n的位置上插入另一个链表 setTabAt(tab, i, fwd); //在table的i位置上插入forwardNode节点 表示已经处理过该节点 advance = true; //设置advance为true 返回到上面的while循环中 就可以执行i--操作 } //二叉树节点。 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; //构造正序和反序两个链表 for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //检验是否需要转化为链表进行存储。 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
nextTable是全局变量,对于多线程,也是能够进行扩容的,可以这样理解,“多一个线程就多一份力量”,当然是在一定条件下。步骤如下:
- 检查nextTable是否为null,如果是,则初始化nextTable,容量为table的两倍。
- 自旋遍历节点,直到finished:节点从table复制到nextTable中,支持并发,思路如下:
- 如果节点 f 为null,则插入ForwardingNode(采用Unsafe.compareAndSwapObjectf方法实现),从而别的线程不会走到这个节点。
- 如果f为链表的头节点(fh >= 0),则先构造两个链表,通过hash值第n位不同区分
(ph & n) == 0
,然后把他们分别放在nextTable的i和i + n位置,并将ForwardingNode 插入原节点位置,代表已经处理过了 - 如果f为TreeBin节点,同样也是构造两课树 ,同时需要判断是否需要进行unTreeify()操作,并把处理的结果分别插入到nextTable的i 和i+nw位置,并插入ForwardingNode 节点
- 所有节点复制完成后,则将table指向nextTable,同时更新sizeCtl = nextTable的0.75倍,完成扩容过程
为什么能够并发的扩容,并且能够“帮助”别的线程一起扩容?
再看helpTransfer方法:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; //tab不为null,并且传进来这个f正在扩容,next不为null。 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); //或运算获得rs while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) //扩容完成了。 break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { //当前线程扩容大军。 transfer(tab, nextTab); break; } } return nextTab; } return table; }
可以这样理解,有个nextTable,用于指向下一个扩容之后的数组。
如下面的图:
- 假设一个线程A正在复制节点1的数据,到table中,那么此时,它把节点1给锁住了,并且标识为ForwardingNode,也就是hash值为MOVED。
- 由于没有对整个ConcurrentHashMap加锁,当线程2也能访问这个table,来了后它也尝试扩容,当进行第一个时候,诶,发现有人在扩容,我得先棒棒它。
- 但是节点1有人在扩,进不去,那我就去节点2扩容,如果此时节点2也有人占了,那么就节点3依次往后。
由于每次获取获取某一个节点f只能为一个线程,所以最终保证了线程的安全,并且能让其他线程来辅助扩容。
get方法
get方法并不涉及到线程安全,因为并没有对table数组结构造成修改,所以思想直接计算hash函数获得即可:
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //通过(n-1)&h 算法,发现这里有值。 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) //就是这个 return e.val; } else if (eh < 0) //eh小于0,那么就通过next去找,也就是链表方式。 return (p = e.find(h, key)) != null ? p.val : null; //正常eh>0时候,通过next去找。 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
hash算法
和HashMap类似,我觉得除了上述讲的基于HashMap结构,Doug Lea给我们展示了一个新的线程安全的容器,并且是基于synchronized!当然synchronized也是经过了很多的优化才能在ConcurrentHashMap中替代ReentrantLock。
ConcurrentHash的散列函数比较有意思:(n - 1) & h
,
主要是在扩容的时候,和HashMap一样,也是2的倍数。并且是支持并发扩容,利用hash&n
为0或者1来把节点上的链式存储一分为2存储。插入到i和i+n的位置。保证了正确性的同时也保证了效率。