《码出高效》学习:ConcurrentHashMap

目录

 

简介

基本认识

初始化:initTable方法

插入元素:实际为putVal方法

链表进化:treeifyBin方法

协助扩容:helpTransfer & Transfer

删除节点:实际为replaceNode方法

计数方法的改进


简介

 

ConcurrentHashMap主要应用于高并发环境下,使用了大量的lock-free技术来减轻锁竞争导致的性能下降。从JDK5开始引入,最初使用锁分段技术,即并发时不是对整个Map加锁,而是对数据所处的Segment进行加锁:(图片来自网络,侵删)

ConcurrentHashMap初始化时,计算出Segment数组的大小ssize和每个Segment中HashEntry数组的大小cap,并初始化Segment数组的0元素。Segment使用ReentrantLock可重入锁加锁。

JDK8开始,ConcurrentHashMap进行了以下三个主要改进:

  • 取消分段锁机制,改用CAS
  • 引入红黑树结构,元素数量达到阈值后自动进化
  • 引入mappingCount方法,能够统计更多数量的元素(2^{63}-1

基本认识

首先看成员变量:

    //实际存放数据的数组,默认null,大小总是2的幂
    //第一次插入数据时才会初始化
    transient volatile Node<K,V>[] table;

    //扩容时生成,大小时原数组的2倍
    private transient volatile Node<K,V>[] nextTable;

    //直接存储的元素数,通过CAS更新
    private transient volatile long baseCount;

    //用来控制table初始化和扩容
    //-1:代表正在初始化
    //-n:代表n-1个线程正在进行扩容
    //0:默认值,将使用默认容量进行初始化
    //>0:代表初始化或扩容中需要使用的容量
    private transient volatile int sizeCtl;

    //扩容时转移数据使用
    private transient volatile int transferIndex;

    //一个用于扩容的自旋锁对象
    private transient volatile int cellsBusy;

    //计数单元的数组,非空时数量是2的幂
    //Map存储的元素真实数量等于baseCount加上每个counterCell的值
    private transient volatile CounterCell[] counterCells;

    // 用来支持 keySet()、entrySet()、values()等方法的视图
    private transient KeySetView<K,V> keySet;
    private transient ValuesView<K,V> values;
    private transient EntrySetView<K,V> entrySet;

以及内部类:

  • Node<K,V>:继承了Map.Entry<K,V>,是数据节点,有四个子类,分别重写了find方法:
    • TreeBin:不实际存储数据,而是TreeNode的桶,维护了桶内红黑树的读写锁和节点引用,hash值固定为-2
    • TreeNode:实际存储数据的节点
    • ForwardingNode:扩容转发节点,,用来把原有哈希槽的操作转发到nextTable(内部记录了nextTable),hash值固定为-1。正在put的线程遇到它的find方法,就不得不协助扩容
    • ReservationNode:占位加锁节点,某些方法(computeIfAbsent)用它进行加锁,hash值固定为-3

还有静态变量:

    //单个table最大容量,为什么不是32呢?因为有两个bit用作哈希控制
    private static final int MAXIMUM_CAPACITY = 1 << 30;

    //table初始化大小
    private static final int DEFAULT_CAPACITY = 16;

    //toArray之后最大大小
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    //用来与旧版本的ConcurrentHashMap保持兼容性
    private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    //负载因子
    private static final float LOAD_FACTOR = 0.75f;

    //进化阈值,如果桶内节点数超过阈值则从链表进化为红黑树
    static final int TREEIFY_THRESHOLD = 8;

    //退化阈值,节点数少于这个值就退化为链表
    static final int UNTREEIFY_THRESHOLD = 6;

    //进化阈值,进化时必须保证table容量到达该值,否则只扩容不进化
    static final int MIN_TREEIFY_CAPACITY = 64;

    //要求每个线程至少调整多少个桶,用来控制大小调整时线程数量,防止过度竞争
    private static final int MIN_TRANSFER_STRIDE = 16;

    //用于扩容时生成一个戳,下面有用到
    private static int RESIZE_STAMP_BITS = 16;

    //扩容线程最多有多少个
    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

    //通过对RESIZE_STAMP_BITS移位,用来生成sizeCtl
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

存储结构如下:(图拍自书)

初始化:initTable方法

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    //如果表为空才初始化
    while ((tab = table) == null || tab.length == 0) {
        //sizeCtl<0说明已经有线程正在进行初始化操作,就不用本线程插手了
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        //否则修改sizeCtl,然后开工
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    //sc 大于零说明容量已经初始化了,否则使用默认容量
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    //计算sizeCtl,也就是初始化所需容量
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
} 

插入元素:实际为putVal方法

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //如果table为空,直接初始化
            //需要注意的是,ConcurrentHashMap使用懒加载机制,初始化的时候没有创建table
            //直到第一次插入数据才真正初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //如果要插入的bin是空的,也不需要锁
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;
            }
            //遇上了转发节点,于是将操作转发到nextTable,帮助扩容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            //剩余的情况
            else {
                V oldVal = null;
                synchronized (f) {
                    //如果是目标table,则继续;否则继续下一轮循环
                    //是为了保证锁住的是hash桶的第一个节点,阻止其他写操作进入
                    if (tabAt(tab, i) == f) {
                        //fh在上一个else if初始化为f.hash
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //对已有值进行修改
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                //插入新值
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //如果已经进化为红黑树
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            //红黑树保存时,固定为2,保证put后更改计数值时能够进行扩容检查,同时不触发红黑树化操作
                            binCount = 2;
                            //红黑树的节点插入方法
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                //检测是否要进化
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //CAS式修改元素数目
        addCount(1L, binCount);
        return null;
    }

插入操作比较简单,链表就是让前一个节点的next指针指向自己,红黑树请参见TreeMap:《码出高效》学习:TreeMap与红黑树 代码肯定不一样,但是原理大致是一样的

链表进化:treeifyBin方法

    private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n, sc;
        if (tab != null) {
            //在这里进行了判断,要求table数必须达到阈值,否则仅仅扩容一倍(使用CAS)
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
                tryPresize(n << 1);
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                //锁住槽
                synchronized (b) {
                    if (tabAt(tab, index) == b) {
                        TreeNode<K,V> hd = null, tl = null;
                        for (Node<K,V> e = b; e != null; e = e.next) {
                            //将链表节点构造为树节点
                            TreeNode<K,V> p =
                                new TreeNode<K,V>(e.hash, e.key, e.val,
                                                  null, null);
                            //判断是不是第一个节点(根节点)
                            if ((p.prev = tl) == null)
                                hd = p;
                            else
                                tl.next = p;
                            tl = p;
                        }
                        //把TreeBin插入
                        setTabAt(tab, index, new TreeBin<K,V>(hd));
                    }
                }
            }
        }
    }

进化方法很简单,就是把链表转化为树,插回哈希槽

真正复杂的是转化

协助扩容:helpTransfer & Transfer

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    //检测是不是转发节点
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        //返回一个 16 位长度的扩容校验标识
        int rs = resizeStamp(tab.length);
        //sizeCtl小于0,说明处于扩容状态
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            //前 16 位是数据校验标识,后 16 位是当前正在扩容的线程总数
            //这里判断校验标识是否相等,如果校验符不等或者扩容操作已经完成了,直接退出循环
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            //否则调用 transfer 帮助它们进行扩容
            //sc + 1 标识增加了一个线程进行扩容
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

helpTransfer主要是进行一些校验,确保需要协助扩容才去调用transfer方法

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        //要求每个线程都至少负责MIN_TRANSFER_STRIDE个桶
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            ……    //这段省略,就是初始化nextTable
        }
        int nextn = nextTab.length;
        //初始化转发节点
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; 
        //i 指向当前桶,bound 指向当前线程需要处理的桶结点的区间下限
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            //通过--i遍历桶
            while (advance) {
                int nextIndex, nextBound;
                //退出循环的条件:超出区间、完成标记为true
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                //没完成就为当前线程分配任务,处理区间:(nextBound,nextIndex)
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //当前线程所有任务完成
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                //如果迁移操作全部完成了,进行清理并退出
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }                
                //否则更新sizeCtl,并看看是不是还有别的线程在处理
                //没有就可以退出走人,不然就把自己标记为finishing,等下次循环再清理、退出
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            //没有待迁移桶,就标识该桶已处理
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            //如果该桶已处理,跳过
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            //链表迁移
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            //遍历桶,找到整个桶中最后连续的 runBit 不变的结点
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            //如果runbit都是0,则nextTab[i]内元素ln前逆序,ln及其之后顺序
                            //否则,nextTab[i+n]内元素全部相对原table逆序
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            //将ln、hn整体迁移,并标识已处理
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        //红黑树的复制
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

每个线程加入工作以后,会先领取自己的任务,然后开始遍历桶,逐个进行处理。

如果遇到桶空了,就标识该桶已经被处理完成。

如果遇到已经处理完成的桶,直接跳过。

如果是还没处理的桶,对桶首节点加锁,开始迁移,迁移结束后标识该桶已经处理。

如果整张表的迁移操作已经全部完成了,就重置 table 的引用并将 nextTable 置空,相当于打扫战场。否则,用CAS令sizeCtl减1,表示当前线程完成任务,要退出扩容了,然后看自己是不是最后一个线程,是则说明也可以直接退出,否则就要标记自己等下次循环再退出。

删除节点:实际为replaceNode方法

和插入方法代码比较像,就不贴了,流程也差不多。

首先遍历整张表的桶结点,如果表还未初始化或者无法找到目标桶结点,那么将返回 null。

如果桶结点类型是转发结点,则协助扩容。

否则根据链表和红黑树两种情况,删除节点。(插入操作那里是修改or插入节点,就这里算比较大的不同)

最后使用CAS更新baseCount。

计数方法的改进

在JDK7中,获取size的流程如下:

  • 遍历所有segment,计算它们的modCount和size
  • 如果segment发生变化,就重新遍历;如果遍历了三遍,还是出现变动,则对整个Map进行加锁
  • 然后统计size之和并返回

在高并发条件下,修改很频繁,很容易就会触发全段上锁的条件,导致性能下降

在JDK8后,计数值被分成了baseCount和CounterCell两部分:

  • 并发量很小时,直接CAS操作baseCount
  • 如果CAS都发生冲突,说明并发量够大,此时使用CounterCell以CAS方式记录数量更新
  • 如果CounterCell的操作都出现冲突,那么就进行CounterCell数组扩容
  • 由于扩容比较花时间,在这段时间里就可以尝试操作baseCount

统计数量时,用baseCount和每个CounterCell的value相加即可,虽然不准,但是也差不多,不需要上锁

猜你喜欢

转载自blog.csdn.net/u010670411/article/details/86012630