##1、CocurrentHashMap概念
CocurrentHashMap是jdk中的容器,是hashmap的一个提升,结构图:
这里对比在对比hashmap的结构:
可以看出CocurrentHashMap对比HashMap在HashEnty前面加了Segment段,因为HashMap不是线程安全的,并且在多线程同时写入的情况下会导致死循环,所以先多线程的环境下,一般不实用HashMap,而使用CocurrentHashMap,CocurrentHashMap通过分段锁的机制,实现了多线程写入时的线程安全,也提高了多线程情况下的访问效率。
##2、通过源码分析CocurrentHashMap的实现
首先看构造函数:
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
HashMap 的构造方法中有两个参数,而ConcurrentHashMap这里多了一个参数,三个参数分别是initialCapacity初始容量,loadFactor加载因子,concurrencyLevel并行度。而函数里面也有几个重要的参数,sshift,ssize ,segmentShift ,segmentMask ,cap。构造方法中首先判断参数的有效性,然后通过并行度计算segment段数组的大小ssize及sshift,段数组大小是2的幂次方,这里2的sshift等于ssize,同HashMap中数组的大小一样,在我的另一篇关于Hashmap的博客中有详细的解释。确定了segment段数组的大小后再确定HashEntry数组的大小cap,cap也是2的幂次方且capssize是不小于initialCapacity的2的幂次方的数,最后先创建一个段,然后将这个段放入段数组中创建出整个ConcurrentHashMap。从这里就可以看出,原来的HashMap是HashEntry的大小就是capssize,但没有分段,ConcurrentHashMap经过分段后容量也是cap*ssize,只是多了ssize个段,然后每个段里面HashEntry的大小是cap。然后看Segment段的构造方法:
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
可以看到这里面有三个参数,lf,threshold,tab,分别是加载因子,域和HashEntry表,可以看出其实每个segment里面相当于就是一个小型的HashMap结构的容器。接下来看put操作:
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
这个是ConcurrentHashMap对应的put操作,里面首先检查value是否为空,然后对key执行一次hash方法,利用hash值的高几位对segment段的长度减1取模得到segment段的索引,然后获取到对应的segment元素,最后执行segment的put方法。接着看segment的put方法。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
在这个方法中,首先尝试获取这个段的锁,获取到锁后然后利用上面获取到的hash值继续与段里面HashEntry长度减1相与得到HashEntry数组的索引值,接着获取到对应的HashEntry元素,如果这个HashEntry节点不为空,那么就找是否有键值相同的节点,如果有那么就替换,如果没有或者这个HashEntry节点为空,那么新建一个HashEntry节点,然后判断HashEntry数组不为空节点是否超过阈值,超过了就扩容这个段里面的hash表,没有的话就将这个节点查到对应HashEntry链表的头部。最后再释放锁。
然后来看get操作:
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
在get操作中,首先实现了也是获取hash值,然后找到对应的段,然后通过这个hash值再找到对应的HashEntry节点,在遍历这个链表看能不能找到键值相同的节点,有的话就返回节点的值,没有的话就返回空,注意get操作是没有加锁的,但是通过getObjectVolatile可以获取到已经更新的节点的最新的值。
最后再来看size操作:
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
size操作返回容器中保存的键值对的个数,在size操作中首先获取到所有段的锁,然后通过循环获取到每个每个段中键值对数量的累积值,最后释放锁。