Jdk1.6 JUC源码解析(25)-ConcurrentHashMap
作者:大飞
- ConcurrentHashMap是一种线程安全的HashMap。相对于HashTable和Collections.synchronizedMap(),ConcurrentHashMap具有更好的性能和伸缩性,是由于其使用了分段锁的策略,将内部数据分为多个段,每个段单独加锁,而不是整个HashMap加锁,这样能减少很多不必要的锁争用。
- ConcurrentHashMap实现了ConcurrentMap接口,先简单了解下这个接口:
public interface ConcurrentMap<K, V> extends Map<K, V> { /** * 如果map中已经存在给定的key,返回map中key对应的value; * 如果不存在给定的key,插入给定的key和value。 * 这个是一个原子操作,逻辑相当于: * if (!map.containsKey(key)) * return map.put(key, value); * else * return map.get(key); */ V putIfAbsent(K key, V value); /** * 如果map中存在给定的key,并且map中对应的value也等于给定的value, * 那么删除这个key和value。 * 这是一个原子操作,逻辑相当于: * if (map.containsKey(key) && map.get(key).equals(value)) { * map.remove(key); * return true; * } else return false; */ boolean remove(Object key, Object value); /** * 如果map中存在给定的key,并且map中对应的value也等于给定的oldValue, * 那么将这个key对应的value替换成newValue。 * 这是一个原子操作,逻辑相当于: * if (map.containsKey(key) && map.get(key).equals(oldValue)) { * map.put(key, newValue); * return true; * } else return false; */ boolean replace(K key, V oldValue, V newValue); /** * 如果map中已经存在给定的key, * 那么将这个key对应的value替换成给定的value。 * 这是一个原子操作,逻辑相当于: * if (map.containsKey(key)) { * return map.put(key, value); * } else return null; */ V replace(K key, V value); }ConcurrentMap扩展了Map接口,定义了上面4个原子操作方法。
- 接下来看下ConcurrentHashMap的内部结构:
/** * segment数组, 每一个segment都是一个hash table。 */ final Segment<K,V>[] segments;重点看下segment的实现吧,首先看数据结构:
static final class Segment<K,V> extends ReentrantLock implements Serializable { private static final long serialVersionUID = 2249069246763182397L; /** * 记录segment(哈希表)中的元素数量。 * 另一个重要角色就是其他操作会利用count的volatile读写来保证可见性,避免使用锁。 */ transient volatile int count; /** * 统计跟踪修改,用来保证一些批量操作的一致性。 * 比如统计所有segment元素个数时,如果统计过程发现modCount变化 * 那么需要重试。 */ transient int modCount; /** * 当哈希表的容量超过了这个阀值,表会扩容,里面的元素会重新散列。 * 这个值一般是:capacity * loadFactor */ transient int threshold; /** * 存放数组的哈希表。 */ transient volatile HashEntry<K,V>[] table; /** * 哈希表的加载因子。 * @serial */ final float loadFactor;
再看下segment的构造方法:
Segment(int initialCapacity, float lf) { loadFactor = lf; setTable(HashEntry.<K,V>newArray(initialCapacity)); } void setTable(HashEntry<K,V>[] newTable) { threshold = (int)(newTable.length * loadFactor); table = newTable; } static final class HashEntry<K,V> { final K key; final int hash; volatile V value; final HashEntry<K,V> next; HashEntry(K key, int hash, HashEntry<K,V> next, V value) { this.key = key; this.hash = hash; this.next = next; this.value = value; } @SuppressWarnings("unchecked") static final <K,V> HashEntry<K,V>[] newArray(int i) { return new HashEntry[i]; } }
/* ---------------- Constants -------------- */ //默认segment中hashTable长度。 static final int DEFAULT_INITIAL_CAPACITY = 16; //默认加载因子。 static final float DEFAULT_LOAD_FACTOR = 0.75f; //默认table的并发级别,其实就是segment数组长度。 static final int DEFAULT_CONCURRENCY_LEVEL = 16; //table的最大容量。 static final int MAXIMUM_CAPACITY = 1 << 30; //允许的最大的segment数组长度。 static final int MAX_SEGMENTS = 1 << 16; /** * 在size和containsValue方法中,加锁之前的尝试操作次数。 */ static final int RETRIES_BEFORE_LOCK = 2; /* ---------------- Fields -------------- */ /** * 计算segment下标的掩码。一个key的hash code高位(由segmentShift确定)用来确定segment下标。 */ final int segmentMask; /** * segment下标的位移值。 */ final int segmentShift; public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; //concurrencyLevel不能超过最大值 // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; //ssize最后是比concurrencyLevel大的最小的2的幂。 } /* * 假设传入的concurrencyLevel是50, * 那么ssize就是64,sshift就是6,segmentMask就是 00000000 00000000 00000000 00111111*/ segmentShift = 32 - sshift; segmentMask = ssize - 1; this.segments = Segment.newArray(ssize); if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = 1; while (cap < c) cap <<= 1; //cap其实就是比总体容量平均分到每个segment的数量大的最小的2的幂...有点绕, for (int i = 0; i < this.segments.length; ++i) this.segments[i] = new Segment<K,V>(cap, loadFactor); //把segment都初始化一下。 } public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL); } public ConcurrentHashMap(int initialCapacity) { this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); }可以看到构造方法中计算了几个重要的数:segment掩码、segment位移值、segment数组长度和segment内部哈希表容量,注意后两个都是2的幂,想到了什么? a & (b - 1)吧哈哈。
- 现在从插入和获取操作切入,来理解源码。先看下插入操作:
public V put(K key, V value) { if (value == null) throw new NullPointerException(); int hash = hash(key.hashCode()); return segmentFor(hash).put(key, hash, value, false); }
注意到,put过程中,首先要根据key的hashCode,再次算一个hash值出来;其次是要根据这个hash值来确定一个segment,然后把key-value存到这个segment里面。
/** * Applies a supplemental hash function to a given hashCode, which * defends against poor quality hash functions. This is critical * because ConcurrentHashMap uses power-of-two length hash tables, * that otherwise encounter collisions for hashCodes that do not * differ in lower or upper bits. */ private static int hash(int h) { // Spread bits to regularize both segment and index locations, // using variant of single-word Wang/Jenkins hash. h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16); } /** * Returns the segment that should be used for key with given hash * @param hash the hash code for the key * @return the segment */ final Segment<K,V> segmentFor(int hash) { return segments[(hash >>> segmentShift) & segmentMask]; }
首先看下这个hash算法,它相当于在key本身的hashCode上做了加强,再次hash一次,使得hash值更加散列。这样做的原因是因为ConcurrentHashMap中哈希表的长度都是2的幂,会增加一些冲突几率,比如两个hashCode高位不同但低位相同,对哈希表长度取模时正好忽略了这些高位,造成冲突。这里是采用了Wang/Jenkins哈希算法的一个变种,更多相关信息可以google之。
V put(K key, int hash, V value, boolean onlyIfAbsent) { lock();// 加锁 try { int c = count; if (c++ > threshold) // ensure capacity rehash(); // 如果添加一个元素后,超过扩容阀值,那么进行rehash。 HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); // 对hash取模算出key对应的哈希表的桶的下标。 HashEntry<K,V> first = tab[index]; // 找出桶的第一个节点 HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; // 遍历一下桶里的单链表,看看有没有相同的。 V oldValue; if (e != null) { // 如果找到了相同的,记录旧值 oldValue = e.value; if (!onlyIfAbsent) // 并且有覆盖标识 e.value = value; // 那么覆盖这个值 } else { // 如果没找到相同的。 oldValue = null; ++modCount; // 因为会改变哈希表元素个数,所以modCount累加。 // 将元素设置为桶内新的第一个节点。 tab[index] = new HashEntry<K,V>(key, hash, first, value); count = c; // 注意这里做了一个volatile写。 } return oldValue; // 返回旧值。 } finally { unlock(); // 解锁。 } }代码也比较容易理解,注意有rehash的情况,看下rehash方法:
void rehash() { HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; if (oldCapacity >= MAXIMUM_CAPACITY) return; //不能超过最大容量。 /* * 将哈希表中所有桶里的节点重新分配到新哈希表中。 * 由于使用的容量是2的幂,所有一部分节点会分配到新哈希表中相同 * 下标的桶里,这样我们就可以重用这些节点,而无需重新创建。 * 按照统计数据,在默认的加载因子下,大约只有六分之一的节点在 * 哈希表扩容的时候需要拷贝(重新创建对象)。 */ HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1); threshold = (int)(newTable.length * loadFactor); int sizeMask = newTable.length - 1; for (int i = 0; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; int idx = e.hash & sizeMask; if (next == null) newTable[idx] = e; // 链表上唯一的节点,直接复制到新table。 else { // 重用从尾部往前能定位到新table中同一个桶的,最长的连续节点。 HashEntry<K,V> lastRun = e; int lastIdx = idx; for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; // 其他的节点就copy过去了。 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { int k = p.hash & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(p.key, p.hash, n, p.value); } } } } table = newTable; }
put的实现看完了,继续看下从ConcurrentHashMap中get的实现:
public V get(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).get(key, hash); }
流程还是先算hash值,然后确定到segment,然后调用segment的get方法:
V get(Object key, int hash) { if (count != 0) { // 这里做一个volatile读 HashEntry<K,V> e = getFirst(hash); //获取相应桶的第一个节点。 while (e != null) { if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null) return v; //如果是相同的key,返回value。 return readValueUnderLock(e); // recheck } e = e.next; } } return null; } HashEntry<K,V> getFirst(int hash) { HashEntry<K,V>[] tab = table; return tab[hash & (tab.length - 1)]; } /** * 加锁读value。如果value为nul的情况下调用这个方法。 * 只有在编译器将HashEntry的初始化和其赋值给table的指令重排才会 * 出现这种情况,这在内存模型下是合法的,但从没发生过。 */ V readValueUnderLock(HashEntry<K,V> e) { lock(); try { return e.value; } finally { unlock(); } }
- 理解了put和get过程,其他方法也很好理解了:
boolean containsKey(Object key, int hash) { if (count != 0) { // read-volatile HashEntry<K,V> e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) return true; e = e.next; } } return false; }
V remove(Object key, int hash, Object value) { lock(); try { int c = count - 1; HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null; if (e != null) { V v = e.value; if (value == null || value.equals(v)) { oldValue = v; // All entries following removed node can stay // in list, but all preceding ones need to be // cloned. ++modCount; HashEntry<K,V> newFirst = e.next; for (HashEntry<K,V> p = first; p != e; p = p.next) newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value); tab[index] = newFirst; count = c; // write-volatile } } return oldValue; } finally { unlock(); } }
我们会发现所有的写操作最后都会写一下count,而且所有的读操作最前面都会读一下count,由于count是volatile修饰的,所以这样相当于加了内存屏障(volatile写和后面的volatile读不能重排),保证了读操作能够看到最新的写的变化。
以上理解有偏差,感谢@不待人亲指正。
仔细看源码注释发现:
All (synchronized) write operations should write to the "count" field after structurally changing any bin.
也就是说只有bin(HashEntry链)的结构变化之后才会写count(覆盖的情况不会写count)。
所以这里纠正一下:所有改变bin结构的写操作都会写一下count,可以保证HashEntry的可见性(因为无论是添加还是删除,bin起始的HashEntry都会发生变化,由于HashEntry的next域是不变的,所以删除时需要将目标HashEntry之前的Entry都拷贝一下)。
而覆盖旧值的情况下不会写count,因为HashEntry的value本身也是volatile的,可以保证自身的可见性。
- 我们上面还看到了有一个RETRIES_BEFORE_LOCK值,看看这个值起什么作用:
public int size() { final Segment<K,V>[] segments = this.segments; long sum = 0; long check = 0; int[] mc = new int[segments.length]; // Try a few times to get accurate count. On failure due to // continuous async changes in table, resort to locking. for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) { check = 0; sum = 0; int mcsum = 0; for (int i = 0; i < segments.length; ++i) { sum += segments[i].count; mcsum += mc[i] = segments[i].modCount; } if (mcsum != 0) { for (int i = 0; i < segments.length; ++i) { check += segments[i].count; if (mc[i] != segments[i].modCount) { check = -1; // force retry break; } } } if (check == sum) break; } if (check != sum) { // Resort to locking all segments sum = 0; for (int i = 0; i < segments.length; ++i) segments[i].lock(); for (int i = 0; i < segments.length; ++i) sum += segments[i].count; for (int i = 0; i < segments.length; ++i) segments[i].unlock(); } if (sum > Integer.MAX_VALUE) return Integer.MAX_VALUE; else return (int)sum; }
上面size的过程就是,累加所有segment中的count,如果过程中segment中元素数量发生了变化,那么重试。如果重试了RETRIES_BEFORE_LOCK次(默认是2)都不行,那么将所有segment加锁,然后累加count,然后再解锁。在containsValue里面也是这么玩儿的,代码就不贴了。
- 其他代码也很容易看懂了,就分析到这里。最后注意下,ConcurrentHashMap也提供了Key和Value的集合视图,它们和ConcurrentHashMap共享一份数据,它们的迭代器是弱一致的。