Java并发包——线程安全的Map相关类
摘要:本文主要学习了Java并发包下线程安全的Map相关的类。
部分内容来自以下博客:
https://blog.csdn.net/bill_xiang_/article/details/81122044
https://www.cnblogs.com/zhaojj/p/8942647.html
分类
参照之前在学习集合时候的分类,可以将JUC下有关Map相关的类进行分类。
ConcurrentHashMap:继承于AbstractMap类,相当于线程安全的HashMap,是线程安全的哈希表。JDK1.7之前使用分段锁机制实现,JDK1.8则使用数组+链表+红黑树数据结构和CAS原子操作实现。
ConcurrentSkipListMap:继承于AbstractMap类,相当于线程安全的TreeMap,是线程安全的有序的哈希表。通过“跳表”来实现的。
ConcurrentHashMap
JDK1.7的分段锁机制
Hashtable之所以效率低下主要是因为其实现使用了synchronized关键字对put等操作进行加锁,而synchronized关键字加锁是对整个对象进行加锁,也就是说在进行put等修改Hash表的操作时,锁住了整个Hash表,从而使得其表现的效率低下。
因此,在JDK1.5到1.7版本,Java使用了分段锁机制实现ConcurrentHashMap。
简而言之,ConcurrentHashMap在对象中保存了一个Segment数组,即将整个Hash表划分为多个分段。而每个Segment元素,即每个分段则类似于一个Hashtable。这样,在执行put操作时首先根据hash算法定位到元素属于哪个Segment,然后对该Segment加锁即可。因此,ConcurrentHashMap在多线程并发编程中可是实现多线程put操作。
Segment是ConcurrentHashMap中的内部类,它就是ConcurrentHashMap中的“锁分段”对应的存储结构。ConcurrentHashMap与Segment是组合关系,一个ConcurrentHashMap对象包含若干个Segment对象。在代码中,这表现为ConcurrentHashMap类中存在“Segment数组”成员。
Segment类继承于ReentrantLock类,所以Segment本质上是一个可重入的互斥锁。
HashEntry也是ConcurrentHashMap的内部类,是单向链表节点,存储着key-value键值对。Segment与HashEntry是组合关系,Segment类中存在“HashEntry数组”成员,“HashEntry数组”中的每个HashEntry就是一个单向链表。
JDK1.8的改进
在JDK1.7的版本,ConcurrentHashMap是通过分段锁机制来实现的,所以其最大并发度受Segment的个数限制。因此,在JDK1.8中,ConcurrentHashMap的实现原理摒弃了这种设计,而是选择了与HashMap类似的数组+链表+红黑树的方式实现,而加锁则采用CAS原子更新、volatile关键字、synchronized可重入锁实现。
重要属性
sizeCtl:标志控制符。这个参数非常重要,出现在ConcurrentHashMap的各个阶段,不同的值也表示不同情况和不同功能:
负数代表正在进行初始化或扩容操作。-1表示正在进行初始化操作。-N表示有N-1个线程正在进行扩容操作。
其为0时,表示hash表还未初始化。
正数表示下一次进行扩容的大小,类似于扩容阈值。它的值始终是当前容量的0.75倍,如果hash表的实际大小>=sizeCtl,则进行扩容。
构造方法
需要说明的是,在构造方法里并没有对集合进行初始化操作,而是等到了添加元素的时候才进行初始化,属于懒汉式的加载方式。
而且loadFactor参数在JDK1.8中也不再有加载因子的意义,仅为了兼容以前的版本,加载因子由sizeCtl来替代。
同样,concurrencyLevel参数在JDK1.8中也不再有多线程运行的并发度的意义,仅为了兼容以前的版本。
1 // 空参构造器。 2 public ConcurrentHashMap() { 3 } 4 5 // 指定初始容量的构造器。 6 public ConcurrentHashMap(int initialCapacity) { 7 // 参数有效性判断。 8 if (initialCapacity < 0) 9 throw new IllegalArgumentException(); 10 int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? 11 MAXIMUM_CAPACITY : 12 tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); 13 // 设置标志控制符。 14 this.sizeCtl = cap; 15 } 16 17 // 指定初始容量,加载因子的构造器。 18 public ConcurrentHashMap(int initialCapacity, float loadFactor) { 19 this(initialCapacity, loadFactor, 1); 20 } 21 22 // 指定初始容量,加载因子,并发度的构造器。 23 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { 24 // 参数有效性判断。 25 if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) 26 throw new IllegalArgumentException(); 27 // 比较初始容量和并发度的大小,取最大值作为初始容量。 28 if (initialCapacity < concurrencyLevel) 29 initialCapacity = concurrencyLevel; 30 // 计算最大容量。 31 long size = (long)(1.0 + (long)initialCapacity / loadFactor); 32 int cap = (size >= (long)MAXIMUM_CAPACITY) ? 33 MAXIMUM_CAPACITY : tableSizeFor((int)size); 34 // 设置标志控制符。 35 this.sizeCtl = cap; 36 } 37 38 // 包含指定Map集合的构造器。 39 public ConcurrentHashMap(Map<? extends K, ? extends V> m) { 40 // 设置标志控制符。 41 this.sizeCtl = DEFAULT_CAPACITY; 42 // 放置指定的集合。 43 putAll(m); 44 }
初始化方法
集合并不会在构造方法里进行初始化,而是在用到集合的时候才进行初始化,在初始化的同时会设置集合的阈值。
在初始化的过程中,使用volatile保证顺序和可见性,使用CAS原子操作保证线程安全。
1 // 初始化集合,使用CAS原子更新保证线程安全,使用volatile保证顺序和可见性。 2 private final Node<K,V>[] initTable() { 3 Node<K,V>[] tab; int sc; 4 // 死循环以完成初始化。 5 while ((tab = table) == null || tab.length == 0) { 6 // 如果sizeCtl小于0则表示正在初始化,当前线程让步。 7 if ((sc = sizeCtl) < 0) 8 Thread.yield(); 9 // 如果需要初始化,并且使用CAS原子更新。判断SIZECTL保存的sizeCtl值是否和sc一致,一致则将sizeCtl更新为-1。 10 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { 11 try { 12 // 第一个线程初始化之后,第二个线程还会进来所以需要重新判断。类似于线程同步的二次判断。 13 if ((tab = table) == null || tab.length == 0) { 14 // 如果没有指定容量则使用默认容量16。 15 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; 16 // 初始化一个指定容量的节点数组。 17 @SuppressWarnings("unchecked") 18 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; 19 // 将节点数组指向集合。 20 table = tab = nt; 21 // 扩容阀值,获取容量的0.75倍的值,写法略叼更高端比直接乘高效。 22 sc = n - (n >>> 2); 23 } 24 } finally { 25 // 将sizeCtl的值设为阈值。 26 sizeCtl = sc; 27 } 28 break; 29 } 30 } 31 return tab; 32 }
添加方法
在添加元素时判断是否进行初始化,如果没有初始化就去执行集合初始化操作。
如果添加的位置上没有元,则直接添加到数组中。
如果添加的位置上的节点正在扩容,则帮助扩容。
如果添加的位置上有元素,并且没有扩容,则尝试根据节点类型是链表还是红黑树进行添加。
判断是否需要将链表转为红黑树。
最后增加集合长度,并且判断是否需要扩容。
1 // 添加元素。 2 public V put(K key, V value) { 3 return putVal(key, value, false); 4 } 5 6 // 添加元素。 7 final V putVal(K key, V value, boolean onlyIfAbsent) { 8 // 排除null的数据。 9 if (key == null || value == null) throw new NullPointerException(); 10 // 计算hash,并保证hash一定大于零,负数表示在扩容或者是树节点。 11 int hash = spread(key.hashCode()); 12 // 节点个数。0表示未加入新结点,2表示TreeBin或链表结点数,其它值表示链表结点数。主要用于每次加入结点后查看是否要由链表转为红黑树。 13 int binCount = 0; 14 // CAS经典写法,不成功无限重试。 15 for (Node<K,V>[] tab = table;;) { 16 // 声明节点、集合长度、对应的数组下标、节点的hash值。 17 Node<K,V> f; int n, i, fh; 18 // 如果没有初始化则进行初始化。除非构造时指定集合,否则默认构造不初始化,添加时检查是否初始化,属于懒汉模式初始化。 19 if (tab == null || (n = tab.length) == 0) 20 // 初始化集合。 21 tab = initTable(); 22 // 如果已经初始化了,并且使用CAS根据hash获取到的节点为null。 23 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 24 // 使用CAS比较该索引处是否为null防止其它线程已改变该值,null则插入。 25 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) 26 // 添加成功,跳出循环。 27 break; 28 } 29 // 如果获取到节点不为null,并且节点的hash为-1,则表示节点在扩容。 30 else if ((fh = f.hash) == MOVED) 31 // 帮助扩容。 32 tab = helpTransfer(tab, f); 33 // 产生hash碰撞,并且没有扩容操作。 34 else { 35 V oldVal = null; 36 // 锁住节点。 37 synchronized (f) { 38 // 这里volatile获取首节点与节点对比判断节点还是不是首节点。 39 if (tabAt(tab, i) == f) { 40 // 判断是否是链表节点。 41 if (fh >= 0) { 42 // 记录节点个数。 43 binCount = 1; 44 // 循环完成添加节点到链表。 45 for (Node<K,V> e = f;; ++binCount) { 46 K ek; 47 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { 48 oldVal = e.val; 49 if (!onlyIfAbsent) 50 e.val = value; 51 break; 52 } 53 Node<K,V> pred = e; 54 if ((e = e.next) == null) { 55 pred.next = new Node<K,V>(hash, key, value, null); 56 break; 57 } 58 } 59 } 60 // 如果是红黑树节点。 61 else if (f instanceof TreeBin) { 62 Node<K,V> p; 63 binCount = 2; 64 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { 65 oldVal = p.val; 66 if (!onlyIfAbsent) 67 p.val = value; 68 } 69 } 70 } 71 } 72 // 如果添加到了链表节点,需要进一步判断是否需要转为红黑树。 73 if (binCount != 0) { 74 // 如果链表上的节点数大于等于8。 75 if (binCount >= TREEIFY_THRESHOLD) 76 // 尝试转为红黑树。 77 treeifyBin(tab, i); 78 if (oldVal != null) 79 // 返回原值。 80 return oldVal; 81 break; 82 } 83 } 84 } 85 // 集合容量加一并判断是否要扩容。 86 addCount(1L, binCount); 87 return null; 88 }
修改容量并判断是否需要扩容
尝试对baseCount和CounterCell进行增加的操作,这些操作基于CAS原子操作,同时使用volatile保证顺序和可见性。
1 // 修改容量并判断是否要扩容。 2 private final void addCount(long x, int check) { 3 CounterCell[] as; long b, s; 4 // counterCells不为null,或者使用CAS对baseCount增加失败了,说明产生了并发,需要进一步处理。 5 // counterCells初始为null,如果不为null,说明产生了并发。 6 // 如果counterCells仍然为null,但是在使用CAS对baseCount增加的时候失败,表示产生了并发。 7 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { 8 CounterCell a; long v; int m; 9 boolean uncontended = true; 10 // 如果counterCells是null的,或者counterCells的个数小于0。 11 // 或者counterCells的每一个元素都是null。 12 // 或者用counterCells数组中随机位置的值进行累加也失败了。 13 if (as == null || (m = as.length - 1) < 0 || 14 (a = as[ThreadLocalRandom.getProbe() & m]) == null || 15 !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { 16 // 继续更新counterCells和baseCount。 17 fullAddCount(x, uncontended); 18 return; 19 } 20 // 删除或清理节点时是-1,插入索引首节点0,第二个节点是1。 21 if (check <= 1) 22 return; 23 // 计算map元素个数。 24 s = sumCount(); 25 } 26 // 如果check的值大于等于0,需要检查是否要扩容。删除或清理节点时是-1,此时不检查。 27 if (check >= 0) { 28 Node<K,V>[] tab, nt; int n, sc; 29 // 当元素个数大于阈值,并且集合不为空,并且元素个数小于最大值。循环判断,防止多线程同时扩容跳过if判断。 30 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { 31 // 生成与n有关的标记,且n不变的情况下生成的一定是一样的。 32 int rs = resizeStamp(n); 33 // sc在单线程时是大于等于0的,如果小于0说明有其他线程正在扩容。 34 // 如果小于0说明有线程执行了else里面的判断,导致rs左移16位并在低位+2赋值给sc。 35 if (sc < 0) { 36 // 在第一次左移16位的sc,经过第二次右移16位之后,还和rs相同,说明已经扩容完成。 37 // 线程执行扩容,会使用CAS让sc自增,如果sc和右移并累加后的rs相等,说明已经扩容完成。 38 // 线程执行扩容,会使用CAS让sc自增,如果sc和右移并累加最大值后的rs相等,说明已经扩容完成。 39 // 如果下个节点是null,说明已经扩容完成。 40 // 如果transferIndex小于等于0,说明集合已完成扩容,无法再分配任务。 41 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || 42 sc == rs + 1 ||// 此处应为 sc == (rs << RESIZE_STAMP_SHIFT) + 1 43 sc == rs + MAX_RESIZERS ||// 此处应为 sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS 44 (nt = nextTable) == null || 45 transferIndex <= 0) 46 // 跳出循环。 47 break; 48 // 使用CAS原子累加sc的值。 49 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) 50 // 扩容。 51 transfer(tab, nt); 52 } 53 // 如果sizeCtl大于或等于0,说明第一次扩容,并且使用CAS设置sizeCtl为rs左移后的负数,并且低位+2表示有2-1个线程正在扩容。 54 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) 55 // 进行扩容操作。 56 transfer(tab, null); 57 // 计算map元素个数,baseCount和counterCells数组存的总和。 58 s = sumCount(); 59 } 60 } 61 }
帮助扩容方法
1 // 帮助扩容。 2 final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { 3 Node<K,V>[] nextTab; int sc; 4 // 如果表不为null,并且不是fwd类型的节点,并且节点的子节点也不为null。 5 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { 6 // 得到标识符。 7 int rs = resizeStamp(tab.length); 8 // 如果nextTab没有被并发修改,并且tab也没有被并发修改,并且sizeCtl小于0说明还在扩容。 9 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { 10 // 在第一次左移16位的sc,经过第二次右移16位之后,还和rs相同,说明已经扩容完成。 11 // 线程执行扩容,会使用CAS让sc自增,如果sc和右移并累加后的rs相等,说明已经扩容完成。 12 // 线程执行扩容,会使用CAS让sc自增,如果sc和右移并累加最大值后的rs相等,说明已经扩容完成。 13 // 如果transferIndex小于等于0,说明集合已完成扩容,无法再分配任务。 14 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || 15 sc == rs + 1 ||// 此处应为 sc == (rs << RESIZE_STAMP_SHIFT) + 1 16 sc == rs + MAX_RESIZERS ||// 此处应为 sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS 17 transferIndex <= 0) 18 // 跳出循环。 19 break; 20 // 使用CAS原子累加sc的值。 21 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { 22 // 扩容。 23 transfer(tab, nextTab); 24 break; 25 } 26 } 27 return nextTab; 28 } 29 return table; 30 }
扩容方法
1 // 进行扩容操作。 2 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { 3 int n = tab.length, stride; 4 // 根据cpu个数找出扩容时的最小分组,最小是16。 5 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) 6 stride = MIN_TRANSFER_STRIDE; 7 // 表示第一次扩容,因为在addCount()方法中,第一次扩容的时候传入的nextTab的值是null。 8 if (nextTab == null) { 9 try { 10 // 创建新的扩容后的节点数组。 11 @SuppressWarnings("unchecked") 12 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; 13 // 将新的数组赋值给nextTab。 14 nextTab = nt; 15 } catch (Throwable ex) { 16 // 扩容失败,设置sizeCtl为最大值。 17 sizeCtl = Integer.MAX_VALUE; 18 return; 19 } 20 // 将新的数组赋值给nextTable。 21 nextTable = nextTab; 22 // 记录要扩容的区间最大值,说明是逆序迁移,从高位向低位迁移。 23 transferIndex = n; 24 } 25 // 设置扩容后的容量。 26 int nextn = nextTab.length; 27 // 创建一个fwd节点,用于占位,fwd节点的hash默认为-1。当别的线程发现这个槽位中是fwd类型的节点,则跳过这个节点。 28 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); 29 // 如果是false,需要处理区间上的当前位置,如果是true,说明需要处理区间上的下一个位置。 30 boolean advance = true; 31 // 完成状态,如果是true,就结束此方法。 32 boolean finishing = false; 33 // 死循环,i表示最大下标,bound表示最小下标。 34 for (int i = 0, bound = 0;;) { 35 Node<K,V> f; int fh; 36 // 循环判断是否要处理区间上的下一个位置,每个线程都会在这个循环里获取区间。 37 while (advance) { 38 int nextIndex, nextBound; 39 // i自减一并判断是否大于等于bound,以及是否已经完成了扩容。 40 // 如果i自减后大于等于bound并且未完成扩容,说明需要处理当前i位置上的节点,跳出while循环。 41 // 如果i自减后小于bound并且未完成扩容,说明区间上没有节点需要处理,在while循环里继续判读。 42 // 如果已经完成扩容,跳出while循环。 43 if (--i >= bound || finishing) 44 // 跳出while循环。 45 advance = false; 46 // 如果要扩容的区间最大值小于等于0,说明没有区间需要扩容了。 47 else if ((nextIndex = transferIndex) <= 0) { 48 // i会在下面的if块里判断,从而进入完成状态判断。 49 i = -1; 50 // 跳出while循环。 51 advance = false; 52 } 53 // 首次while循环进入,CAS判断transferIndex和nextIndex是否一致,将transferIndex修改为最大值。 54 else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, 55 nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { 56 // 当前线程处理区间的最小下标。 57 bound = nextBound; 58 // 初次对i赋值,当前线程处理区间的最大下标。 59 i = nextIndex - 1; 60 // 跳出while循环。 61 advance = false; 62 } 63 } 64 // 判读是否完成扩容。 65 // 如果i小于0,表示已经处理了最后一段空间。 66 // 如果i大于等于原容量,表示超过下标最大值。 67 // 如果i加上原容量大于等于新容量,表示超过下标最大值。 68 if (i < 0 || i >= n || i + n >= nextn) { 69 int sc; 70 // 如果完成扩容,finishing为true,表示最后一个线程完成了扩容。 71 if (finishing) { 72 // 删除成员变量。 73 nextTable = null; 74 // 更新集合。 75 table = nextTab; 76 // 更新阈值。 77 sizeCtl = (n << 1) - (n >>> 1); 78 return; 79 } 80 // 如果没完成扩容,当前线程完成这段区间的扩容,将sc的低16位减1。 81 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { 82 // 如果判断是否是最后一个扩容线程,如果不等于,说明还有其他线程在扩容,当前线程返回。 83 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) 84 return; 85 // 如果相等,说明当前最后一个线程完成扩容,扩容结束,并再次进入while循环检查一次。 86 finishing = advance = true; 87 // 再次循环检查一下整张表。 88 i = n; 89 } 90 } 91 // 正常处理区间,如果原数组i位置是null,就使用fwd占位。 92 else if ((f = tabAt(tab, i)) == null) 93 // 如果成功写入fwd占位,进入while循环,继续处理区间的下一个节点。 94 advance = casTabAt(tab, i, null, fwd); 95 // 正常处理区间,如果原数组i位置不是null,并且hash值是-1,说明别的线程已经处理过了。 96 else if ((fh = f.hash) == MOVED) 97 // 进入while循环,继续处理区间的下一个节点。 98 advance = true; 99 // 到这里,说明这个位置有实际值了,且不是占位符。 100 else { 101 // 对这个节点上锁,防止添加元素的时候向链表插入数据。 102 synchronized (f) { 103 // 判断i下标处的桶节点是否和f相同,二次校验。 104 if (tabAt(tab, i) == f) { 105 // 声明高位桶和低位桶。 106 Node<K,V> ln, hn; 107 // 如果f的hash值大于0,表示是链表结构。红黑树的hash默认是-2。 108 if (fh >= 0) { 109 // 获取原容量最高位同节点hash值的与运算结果,用来判断将该节点放到高位还是低位。 110 int runBit = fh & n; 111 // 定义尾节点,暂时取f节点,后面会更新。 112 Node<K,V> lastRun = f; 113 // 遍历这个节点。 114 for (Node<K,V> p = f.next; p != null; p = p.next) { 115 // 获取原容量最高位同节点hash值的与运算结果,用来判断将该节点放到高位还是低位。 116 int b = p.hash & n; 117 // 如果节点的hash值和首节点的hash值,同原容量最高位与运算的结果不同。 118 if (b != runBit) { 119 // 更新runBit,用于下面判断lastRun该赋值给ln还是hn。 120 runBit = b; 121 // 更新lastRun,保证后面的节点与自己的取于值相同,避免后面没有必要的循环。 122 lastRun = p; 123 } 124 } 125 // 如果最后更新的runBit是0,设置低位节点。 126 if (runBit == 0) { 127 ln = lastRun; 128 hn = null; 129 } 130 // 如果最后更新的runBit是1,设置高位节点。 131 else { 132 hn = lastRun; 133 ln = null; 134 } 135 // 再次循环,生成两个链表,lastRun作为停止条件,这样就是避免无谓的循环。 136 for (Node<K,V> p = f; p != lastRun; p = p.next) { 137 int ph = p.hash; K pk = p.key; V pv = p.val; 138 // 如果与运算结果是0,那么创建低位节点。 139 if ((ph & n) == 0) 140 ln = new Node<K,V>(ph, pk, pv, ln); 141 // 如果与运算结果是1,那么创建高位节点。 142 else 143 hn = new Node<K,V>(ph, pk, pv, hn); 144 } 145 // 设置低位链表,放在新数组的i位置。 146 setTabAt(nextTab, i, ln); 147 // 设置高位链表,放在新数组的i+n位置。 148 setTabAt(nextTab, i + n, hn); 149 // 将旧的链表设置成fwd占位符。 150 setTabAt(tab, i, fwd); 151 // 继续处理区间的下一个节点。 152 advance = true; 153 } 154 // 如果是红黑树结构。 155 else if (f instanceof TreeBin) { 156 TreeBin<K,V> t = (TreeBin<K,V>)f; 157 TreeNode<K,V> lo = null, loTail = null; 158 TreeNode<K,V> hi = null, hiTail = null; 159 int lc = 0, hc = 0; 160 // 遍历。 161 for (Node<K,V> e = t.first; e != null; e = e.next) { 162 int h = e.hash; 163 TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null); 164 // 与运算结果为0的放在低位。 165 if ((h & n) == 0) { 166 if ((p.prev = loTail) == null) 167 lo = p; 168 else 169 loTail.next = p; 170 loTail = p; 171 ++lc; 172 } 173 // 与运算结果为1的放在高位。 174 else { 175 if ((p.prev = hiTail) == null) 176 hi = p; 177 else 178 hiTail.next = p; 179 hiTail = p; 180 ++hc; 181 } 182 } 183 // 如果树的节点数小于等于6,那么转成链表,反之,创建一个新的树。 184 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; 185 // 如果树的节点数小于等于6,那么转成链表,反之,创建一个新的树。 186 hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; 187 // 设置低位树,放在新数组的i位置。 188 setTabAt(nextTab, i, ln); 189 // 设置高位数,放在新数组的i+n位置。 190 setTabAt(nextTab, i + n, hn); 191 // 将旧的树设置成fwd占位符。 192 setTabAt(tab, i, fwd); 193 // 继续处理区间的下一个节点。 194 advance = true; 195 } 196 } 197 } 198 } 199 } 200 }
获取方法
根据指定的键,返回对应的键值对,由于是读操作,所以不涉及到并发问题。
1 // 获取元素。 2 public V get(Object key) { 3 Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; 4 // 计算hash,并保证hash一定大于零,负数表示在扩容或者是树节点。 5 int h = spread(key.hashCode()); 6 // 如果集合不为null,并且集合长度大于0,并且指定位置上的元素不为null。 7 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { 8 // 如果hash相等。 9 if ((eh = e.hash) == h) { 10 // 如果首节点是要找的元素。 11 if ((ek = e.key) == key || (ek != null && key.equals(ek))) 12 return e.val; 13 } 14 // 如果正在扩容或者是树节点。 15 else if (eh < 0) 16 // 尝试查找元素,找到返回元素,找不到返回null。 17 return (p = e.find(h, key)) != null ? p.val : null; 18 // 如果不是首节点,则遍历集合查找。 19 while ((e = e.next) != null) { 20 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) 21 return e.val; 22 } 23 } 24 return null; 25 }
删除方法
删除操作,可以看成是用null替代原来的节点,因此合并在这个方法中,由这个方法一起实现删除操作和替换操作。
replaceNode()方法中的三个参数,key表示想要删除的键,value表示想要替换的元素,cv表示想要删除的key对应的值。
1 // 删除元素。 2 public V remove(Object key) { 3 return replaceNode(key, null, null); 4 } 5 6 // 删除元素 7 final V replaceNode(Object key, V value, Object cv) { 8 // 计算hash,并保证hash一定大于零,负数表示在扩容或者是树节点。 9 int hash = spread(key.hashCode()); 10 // CAS经典写法,不成功无限重试。 11 for (Node<K,V>[] tab = table;;) { 12 Node<K,V> f; int n, i, fh; 13 // 如果集合是null,或者集合长度是0,或者指定位置上的元素是null。 14 if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null) 15 // 跳出循环。 16 break; 17 // 如果获取到节点不为null,并且节点的hash为-1,则表示节点在扩容。 18 else if ((fh = f.hash) == MOVED) 19 // 帮助扩容。 20 tab = helpTransfer(tab, f); 21 // 产生hash碰撞,并且没有扩容操作。 22 else { 23 V oldVal = null; 24 // 是否进入了同步代码。 25 boolean validated = false; 26 // 锁住节点。 27 synchronized (f) { 28 // 这里volatile获取首节点与节点对比判断节点还是不是首节点。 29 if (tabAt(tab, i) == f) { 30 // 判断是否是链表节点。 31 if (fh >= 0) { 32 validated = true; 33 // 循环查找指定元素。 34 for (Node<K,V> e = f, pred = null;;) { 35 K ek; 36 // 找到元素了。 37 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { 38 V ev = e.val; 39 // 如果cv为null,或者cv不为null时cv和指定元素上的值相同,才更新或者删除节点。 40 if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { 41 oldVal = ev; 42 // 如果新值不为null,替换。 43 if (value != null) 44 e.val = value; 45 // 如果新值是null,并且当前节点非首结点,删除。 46 else if (pred != null) 47 pred.next = e.next; 48 // 如果新值是null,并且当前节点是首结点,删除。 49 else 50 setTabAt(tab, i, e.next); 51 } 52 break; 53 } 54 pred = e; 55 // 如果遍历集合也没有找到。 56 if ((e = e.next) == null) 57 // 跳出循环。 58 break; 59 } 60 } 61 // 如果是红黑树节点。 62 else if (f instanceof TreeBin) { 63 validated = true; 64 TreeBin<K,V> t = (TreeBin<K,V>)f; 65 TreeNode<K,V> r, p; 66 // 找到元素了。 67 if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { 68 V pv = p.val; 69 // 如果cv为null,或者cv不为null时cv和指定元素上的值相同,才更新或者删除节点。 70 if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { 71 oldVal = pv; 72 if (value != null) 73 p.val = value; 74 else if (t.removeTreeNode(p)) 75 setTabAt(tab, i, untreeify(t.first)); 76 } 77 } 78 } 79 } 80 } 81 // 如果进入了同步代码。 82 if (validated) { 83 // 如果更新或者删除了节点。 84 if (oldVal != null) { 85 // 如果value为null,说明是删除操作。 86 if (value == null) 87 // 将数组长度减一。 88 addCount(-1L, -1); 89 return oldVal; 90 } 91 break; 92 } 93 } 94 } 95 return null; 96 }
计算集合容量
ConcurrentHashMap中baseCount用于保存tab中元素总数,但是并不准确,因为多线程同时增删改,会导致baseCount修改失败,此时会将元素变动存储于counterCells数组内。
当需要统计当前的size的时候,除了要统计baseCount之外,还需要统计counterCells中的元素变化。
值得一提的是即使如此,统计出来的依旧不是当前tab中元素的准确值,在多线程环境下统计前后并不能暂停线程操作,因此无法保证准确性。
1 // 计算集合容量。 2 public int size() { 3 long n = sumCount(); 4 return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); 5 } 6 7 // 计算集合容量,baseCount和counterCells数组存的总和。 8 final long sumCount() { 9 CounterCell[] as = counterCells; CounterCell a; 10 long sum = baseCount; 11 if (as != null) { 12 for (int i = 0; i < as.length; ++i) { 13 if ((a = as[i]) != null) 14 sum += a.value; 15 } 16 } 17 return sum; 18 }
HashMap,Hashtable,ConcurrentHashMap之间的关联
HashMap是非线程安全的哈希表,常用于单线程程序中。
Hashtable是线程安全的哈希表,它是通过synchronized来保证线程安全的;即,多线程通过同一个“对象的同步锁”来实现并发控制。Hashtable在线程竞争激烈时,效率比较低(此时建议使用ConcurrentHashMap)。因为当一个线程访问Hashtable的同步方法时,其它线程在访问Hashtable的同步方法时,可能会进入阻塞状态。
ConcurrentHashMap是线程安全的哈希表,它是通过“锁分段”来保证线程安全的。ConcurrentHashMap将哈希表分成许多片段(Segment),每一个片段除了保存哈希表之外,本质上也是一个“可重入的互斥锁”(ReentrantLock)。多线程对同一个片段的访问,是互斥的;但是,对于不同片段的访问,却是可以同步进行的。