ConcurrentSkipListMap高并发原理探究和源码分析

TreeMap、ConcurrentSkipListMap的关系

TreeMap是支持key有序排列的一个key-value数据结构,不过是在单线程情况下使用,并发下不是线程安全的。ConcurrentSkipListMap是基于跳表的实现,也是支持key有序排列的一个key-value数据结构,在并发情况下表现很好,是一种空间换时间的实现,ConcurrentSkipListMap是基于一种乐观锁的方式去实现高并发。



(这里借用一张网上的图片)

ConcurrentSkipListMap中用到了Node数据节点和Index索引节点的存储方式,通过volatile关键字实现了并发的操作。

static final class Node<K,V> {  
        final K key;  
        volatile Object value;//value值  
        volatile Node<K,V> next;//next引用  
        ……  
}  
static class Index<K,V> {  
        final Node<K,V> node;  
        final Index<K,V> down;//down引用  
       volatile Index<K,V> right;//right引用  
       ……  
}  

我们看Node数据,维护了一个next的Node节点,实现中全部的数据节点Node是一种链表的实现方式。Index节点则维护了当前Index节点的Node数据,以及Index的右引用和下引用。查找时Index节点当查到level=1时,即down引用为null时,就会根据当前的Index节点对应的Node节点的链表去查找key对应的数据。


下面从插入数据去看如何实现高并发

public V put(K key, V value) {  
    if (value == null)  
        throw new NullPointerException();  
    return doPut(key, value, false);  
}  
private V doPut(K kkey, V value, boolean onlyIfAbsent) {  
    Comparable<? super K> key = comparable(kkey);  
    for (;;) {  
        Node<K,V> b = findPredecessor(key);//查找key的前驱Node数据节点  
        Node<K,V> n = b.next;  
        for (;;) {  
            if (n != null) {  
                Node<K,V> f = n.next;  
                //1、b的next节点两次读取不一致,跳出第二层for循环重试(这里可能是其他现成在b和n之间执行了插入)  
                if (n != b.next)               // inconsistent read  
                    break;  
                Object v = n.value;  
                //2、数据节点的值为null,表示该数据节点标记为已删除,移除该数据节点并重试。  
                if (v == null) {               // n is deleted  
                    n.helpDelete(b, f);  
                    break;  
                }  
                //3、b节点被标记为已删除,重试  
                if (v == n || b.value == null) // b is deleted  
                    break;  
                int c = key.compareTo(n.key);  
                if (c > 0) {//给定key大于当前可以,继续寻找合适的插入点  
                    b = n;  
                    n = f;  
                    continue;  
                }  
                if (c == 0) {//找到了key  
                    if (onlyIfAbsent || n.casValue(v, value))  
                        return (V)v;  
                    else  
                        break; // restart if lost race to replace value  
                }  
                // else c < 0; fall through  
            }  
            //没有找到,新建数据节点  
            Node<K,V> z = new Node<K,V>(kkey, value, n);  
            if (!b.casNext(n, z))  
                break;         // restart if lost race to append to b  
            int level = randomLevel();//随机一个索引级别,这是skiplist随机性的一个体现  
            if (level > 0)  
                insertIndex(z, level);  
            return null;  
        }  
    }  
}  
[java] view plain copy
/** 
 * Creates and adds index nodes for the given node. 
 * @param z the node 
 * @param level the level of the index 
 */  
private void insertIndex(Node<K,V> z, int level) {  
    HeadIndex<K,V> h = head;  
    int max = h.level;  
  
    if (level <= max) {//索引级别已经存在
        Index<K,V> idx = null;  
        for (int i = 1; i <= level; ++i)//首先得到一个包含1~level个索引级别的down关系的链表,最后的inx为最高level索引   
            idx = new Index<K,V>(z, idx, null);  
        addIndex(idx, h, level);//Adds given index nodes from given level down to 1.新增索引  
    } else { // Add a new level 新增索引级别  
        /* To reduce interference by other threads checking for empty levels in tryReduceLevel, new levels are added 
         * with initialized right pointers. Which in turn requires keeping levels in an array to access them while 
         * creating new head index nodes from the opposite direction. */  
        level = max + 1;  
        Index<K,V>[] idxs = (Index<K,V>[])new Index[level+1];  
        Index<K,V> idx = null;  
        for (int i = 1; i <= level; ++i)  
            idxs[i] = idx = new Index<K,V>(z, idx, null);  
  
        HeadIndex<K,V> oldh;  
        int k;  
        for (;;) {  
            oldh = head;  
            int oldLevel = oldh.level;//更新head  
            if (level <= oldLevel) { // lost race to add level  
                k = level;  
                break;  
            }  
            HeadIndex<K,V> newh = oldh;  
            Node<K,V> oldbase = oldh.node;  
            for (int j = oldLevel+1; j <= level; ++j)  
                newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);  
            if (casHead(oldh, newh)) {  
                k = oldLevel;  
                break;  
            }  
        }  
        addIndex(idxs[k], oldh, k);  
    }  
}  

 private void addIndex(Index<K,V> idx, HeadIndex<K,V> h, int indexLevel) {
        //  insertionLevel 代表要插入的level,该值会在indexLevel~1间遍历一遍
        int insertionLevel = indexLevel;
        Comparable<? super K> key = comparable(idx.node.key);
        if (key == null) throw new NullPointerException();
        // 和get操作类似,不同的就是查找的同时在各个level上加入了对应的key
        for (;;) {
            int j = h.level;
            Index<K,V> q = h;
            Index<K,V> r = q.right;
            Index<K,V> t = idx;
            for (;;) {
                if (r != null) {
                    Node<K,V> n = r.node;
                    // compare before deletion check avoids needing recheck
                    int c = key.compareTo(n.key);
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;
                        r = q.right;
                        continue;
                    }
                    if (c > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }
                if (j == insertionLevel) {//在该层level中执行插入操作
                    // Don't insert index if node already deleted
                    if (t.indexesDeletedNode()) {
                        findNode(key); // cleans up
                        return;
                    }
                    if (!q.link(r, t))//执行link操作,其实就是inset的实现部分
                        break; // restart
                    if (--insertionLevel == 0) {
                        // need final deletion check before return
                        if (t.indexesDeletedNode())
                            findNode(key);
                        return;
                    }
                }
                if (--j >= insertionLevel && j < indexLevel)//key移动到下一层level
                    t = t.down;
                q = q.down;
                r = q.right;
            }
        }
    }


猜你喜欢

转载自blog.csdn.net/codingtu/article/details/79411609