Map
Map中的集合,元素是成对存在的。每个元素由键与值两部分组成,通过键可以找到所对应的值。Map中常用的集合为HashMap集合、LinkedHashMap集合。
Map接口规定了Map的一些通用方法和默认方法实现。Map.Entry是Map声明的一个内部接口,此接口为泛型,定义为Entry<K,V>。它表示Map中的一个实体(一个key-value对)。接口中有getKey(),getValue方法。
由于Map没有继承Iterable接口,不能直接通过map.iterator来遍历(list,set实现了这个接口,可以直接这样遍历),所以就只能先转化为set类型,用entrySet()方法,其中set中的每一个元素值就是map中的一个键值对,也就是Map.Entry<K,V>,然后就可以遍历了。
public interface Map<K,V> {
int size();
boolean isEmpty();
boolean containsKey(Object key);
boolean containsValue(Object value);
V get(Object key);
V put(K key, V value);
V remove(Object key);
void putAll(Map<? extends K, ? extends V> m);
void clear();
Set<K> keySet();
Collection<V> values();
Set<Map.Entry<K, V>> entrySet();
interface Entry<K,V> {
K getKey();
V getValue();
V setValue(V value);
boolean equals(Object o);
int hashCode();
public static <K extends Comparable<? super K>, V> Comparator<Map.Entry<K,V>> comparingByKey() {
return (Comparator<Map.Entry<K, V>> & Serializable)
(c1, c2) -> c1.getKey().compareTo(c2.getKey());
}
public static <K, V extends Comparable<? super V>> Comparator<Map.Entry<K,V>> comparingByValue() {
return (Comparator<Map.Entry<K, V>> & Serializable)
(c1, c2) -> c1.getValue().compareTo(c2.getValue());
}
public static <K, V> Comparator<Map.Entry<K, V>> comparingByKey(Comparator<? super K> cmp) {
Objects.requireNonNull(cmp);
return (Comparator<Map.Entry<K, V>> & Serializable)
(c1, c2) -> cmp.compare(c1.getKey(), c2.getKey());
}
public static <K, V> Comparator<Map.Entry<K, V>> comparingByValue(Comparator<? super V> cmp) {
Objects.requireNonNull(cmp);
return (Comparator<Map.Entry<K, V>> & Serializable)
(c1, c2) -> cmp.compare(c1.getValue(), c2.getValue());
}
}
boolean equals(Object o);
int hashCode();
default V getOrDefault(Object key, V defaultValue) {
V v;
return (((v = get(key)) != null) || containsKey(key))
? v : defaultValue;
}
default void forEach(BiConsumer<? super K, ? super V> action) {
Objects.requireNonNull(action);
for (Map.Entry<K, V> entry : entrySet()) {
K k;
V v;
try {
k = entry.getKey();
v = entry.getValue();
} catch(IllegalStateException ise) {
// this usually means the entry is no longer in the map.
throw new ConcurrentModificationException(ise);
}
action.accept(k, v);
}
}
default void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
Objects.requireNonNull(function);
for (Map.Entry<K, V> entry : entrySet()) {
K k;
V v;
try {
k = entry.getKey();
v = entry.getValue();
} catch(IllegalStateException ise) {
// this usually means the entry is no longer in the map.
throw new ConcurrentModificationException(ise);
}
// ise thrown from function is not a cme.
v = function.apply(k, v);
try {
entry.setValue(v);
} catch(IllegalStateException ise) {
// this usually means the entry is no longer in the map.
throw new ConcurrentModificationException(ise);
}
}
}
default V putIfAbsent(K key, V value) {
V v = get(key);
if (v == null) {
v = put(key, value);
}
return v;
}
default boolean remove(Object key, Object value) {
Object curValue = get(key);
if (!Objects.equals(curValue, value) ||
(curValue == null && !containsKey(key))) {
return false;
}
remove(key);
return true;
}
default boolean replace(K key, V oldValue, V newValue) {
Object curValue = get(key);
if (!Objects.equals(curValue, oldValue) ||
(curValue == null && !containsKey(key))) {
return false;
}
put(key, newValue);
return true;
}
default V replace(K key, V value) {
V curValue;
if (((curValue = get(key)) != null) || containsKey(key)) {
curValue = put(key, value);
}
return curValue;
}
default V computeIfAbsent(K key,
Function<? super K, ? extends V> mappingFunction) {
Objects.requireNonNull(mappingFunction);
V v;
if ((v = get(key)) == null) {
V newValue;
if ((newValue = mappingFunction.apply(key)) != null) {
put(key, newValue);
return newValue;
}
}
return v;
}
default V computeIfPresent(K key,
BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
Objects.requireNonNull(remappingFunction);
V oldValue;
if ((oldValue = get(key)) != null) {
V newValue = remappingFunction.apply(key, oldValue);
if (newValue != null) {
put(key, newValue);
return newValue;
} else {
remove(key);
return null;
}
} else {
return null;
}
}
default V compute(K key,
BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
Objects.requireNonNull(remappingFunction);
V oldValue = get(key);
V newValue = remappingFunction.apply(key, oldValue);
if (newValue == null) {
// delete mapping
if (oldValue != null || containsKey(key)) {
// something to remove
remove(key);
return null;
} else {
// nothing to do. Leave things as they were.
return null;
}
} else {
// add or replace old mapping
put(key, newValue);
return newValue;
}
}
default V merge(K key, V value,
BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
Objects.requireNonNull(remappingFunction);
Objects.requireNonNull(value);
V oldValue = get(key);
V newValue = (oldValue == null) ? value :
remappingFunction.apply(oldValue, value);
if(newValue == null) {
remove(key);
} else {
put(key, newValue);
}
return newValue;
}
}
HashMap
1、存储结构
public class HashMap<K,V> extends AbstractMap<K,V>
implements Map<K,V>, Cloneable, Serializable
HashMap内部包含了一个 Node类型的数组 table。Node存储着键值对。它包含了四个字段,从 next 字段我们可以看出 Node是一个链表。即数组中的每个位置被当成一个桶,一个桶存放一个链表。HashMap 使用拉链法来解决冲突,同一个链表中存放哈希值(键哈希值异或值哈希值之后的值作为Node的哈希值)和散列桶取模运算结果相同的 Node。当链表长度大于8,会转换为红黑树(1.8)。
transient Node<K,V>[] table;
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
V value;
Node<K,V> next;
public final int hashCode() {
return Objects.hashCode(key) ^ Objects.hashCode(value);
}
public final boolean equals(Object o) {
if (o == this)
return true;
if (o instanceof Map.Entry) {
Map.Entry<?,?> e = (Map.Entry<?,?>)o;
if (Objects.equals(key, e.getKey()) &&
Objects.equals(value, e.getValue()))
return true;
}
return false;
}
}
2、get
public V get(Object key) {
Node<K,V> e;
return (e = getNode(hash(key), key)) == null ? null : e.value;
}
final Node<K,V> getNode(int hash, Object key) {
Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
//首先定位所在的桶:(n-1)&hash
//如果表不是空的,并且要查找索引处有值,就判断位于第一个的key是否是要查找的key
if ((tab = table) != null && (n = tab.length) > 0 &&
(first = tab[(n - 1) & hash]) != null) {
if (first.hash == hash && // always check first node
((k = first.key) == key || (key != null && key.equals(k))))
return first;
if ((e = first.next) != null) {
//判断链表是否是红黑树,如果是,就从树中取值
if (first instanceof TreeNode)
return ((TreeNode<K,V>)first).getTreeNode(hash, key);
//不是红黑树,遍历链表
do {
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
} while ((e = e.next) != null);
}
}
return null;
}
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
由源码可知,hashMap是通过key的hashCode和低16位异或后和桶的数量取模得到索引位置,即key.hashcode()^(hashcode>>>16)%length
。当length是2^n时,h&(length-1)运算等价于h%length,而&操作比%效率更高。而采用高16位和低16位进行异或,也可以让所有的位数都参与越算,使得在length比较小的时候也可以做到尽量的散列。(由于和(length-1)运算,length 绝大多数情况小于2的16次方。所以始终是hashcode 的低16位(甚至更低)参与运算。要是高16位也参与运算,会让得到的下标更加散列。)
为什么是2^n?
1、当数组长度为2的n次幂的时候,不同的key算得得index相同的几率较小,数据在数组上分布就比较均匀,也就是说碰撞的几率小。相对的,查询的时候就不用遍历某个位置上的链表,这样查询效率也就较高了。 例如,如果数组长度为15的时候,hashcode的值会与14(1110)进行“与”,那么最后一位永远是0,而0001,0011,0101,1001,1011,0111,1101这几个位置永远都不能存放元素了,空间浪费相当大,更糟的是这种情况中,数组可以使用的位置比数组长度小了很多,这意味着进一步增加了碰撞的几率,减慢了查询的效率!
2、在扩容的时候,如果length每次是2^n,那么重新计算出来的索引只有两种情况,一种是 old索引+table.length,另一种是索引不变,所以就不需要每次都重新计算索引,进而可以多线程扩容(ConcurrentHashMap)。
为什么?
(a):扩容前;(b):扩容后
因此,我们在扩充HashMap的时候,不需要像1.7重新计算hash,只需要看看原来的hash值新增的那个bit是1还是0就好了,是0的话索引没变,是1的话索引变成“原索引+oldCap”
3、当length是2^n时,h&(length-1)运算等价于h%length,而&操作比%效率更高。
3、 put 操作
public V put(K key, V value) {
return putVal(hash(key), key, value, false, true);
}
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
//判断table是否为空,如果是空的就创建一个table,并获取它的长度
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
//如果计算出来的索引位置之前没有放过数据,就直接放入
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
//判断put的数据和之前的数据是否重复,如果重复,覆盖value
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
//如果是红黑树就直接插入树中
else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
//如果是链表,就遍历每个节点,判断链表长度是否大于8,如果大于就转换为红黑树
else {
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
//如果e不是null,说明没有迭代到最后就跳出了循环,说明链表中有相同的key,因此只需要将value覆盖,并将oldValue返回即可
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
++modCount;
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}
1.8中插入数据使用尾插法,而1.7使用的是头插法
头插法:考虑到热点数据,即最近插入的元素也很可能最近会被使用到。所以为了缩短链表查找元素的时间,每次都会将新插入的元素放到表头。
但是头插法,在并发扩容时可能导致死循环。—见扩容部分。
4、扩容
final Node<K,V>[] resize() {
Node<K,V>[] oldTab = table;
int oldCap = (oldTab == null) ? 0 : oldTab.length;
int oldThr = threshold;
int newCap, newThr = 0;
if (oldCap > 0) {
if (oldCap >= MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return oldTab;
}
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
oldCap >= DEFAULT_INITIAL_CAPACITY)
newThr = oldThr << 1; // double threshold
}
else if (oldThr > 0) // initial capacity was placed in threshold
newCap = oldThr;
else { // zero initial threshold signifies using defaults
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}
if (newThr == 0) {
float ft = (float)newCap * loadFactor;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
(int)ft : Integer.MAX_VALUE);
}
threshold = newThr;
@SuppressWarnings({"rawtypes","unchecked"})
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
table = newTab;
// 如果原来的table有数据,则将数据复制到新的table中
if (oldTab != null) {
for (int j = 0; j < oldCap; ++j) {
Node<K,V> e;
// 获取数组的第j个元素
if ((e = oldTab[j]) != null) {
oldTab[j] = null;
if (e.next == null)
newTab[e.hash & (newCap - 1)] = e;
else if (e instanceof TreeNode)
((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
else {
//进行链表复制。
//方法比较特殊:没有重新计算元素在数组中的位置,而是采用了原始位置加原数组长度的方法计算得到位置
Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do {
next = e.next;
//(e.hash & oldCap) 用来确定元素的在数组中的位置是否需要移动
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}
}
return newTab;
}
1.7中,扩容时,由于采用头插法,扩容后链表会反转。
线程A交出时间片,线程B这时候接手转移并且完成了元素的转移,这个时候线程A又拿到时间片并接着执行:
5、与 Hashtable 的比较
- Hashtable 使用 synchronized 来进行同步。
- HashMap 可以插入键为 null 的 Entry。
- HashMap 的迭代器是 fail-fast 迭代器。
- HashMap 不能保证随着时间的推移 Map 中的元素次序是不变的。
ConcurrentHashMap
1、存储结构
使用CAS+Synchronized
,对每个数组元素加锁。
使用了 CAS
操作来支持更高的并发度,在 CAS
操作失败时使用内置锁 synchronized。
其中的 val、next
都用了 volatile
修饰,保证了可见性。
private static final int MAXIMUM_CAPACITY = 1 << 30;
private static final int DEFAULT_CAPACITY = 16;
//为什么阈值选择8?当hashCode离散性很好的时候,树型bin用到的概率非常小,因为数据均匀分布在每个bin中,几乎不会有bin中链表长度会达到阈值。但是在随机hashCode下,离散性可能会变差,然而JDK又不能阻止用户实现这种不好的hash算法,因此就可能导致不均匀的数据分布。不过理想情况下随机hashCode算法下所有bin中节点的分布频率会遵循泊松分布,一个bin中链表长度达到8个元素的概率为0.00000006,几乎是不可能事件。选择8是根据概率统计决定的。
static final int TREEIFY_THRESHOLD = 8;
static final int UNTREEIFY_THRESHOLD = 6;
// 当table数组的长度小于此值时,不会把链表转化为红黑树。
static final int MIN_TREEIFY_CAPACITY = 64;
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
5private static final float LOAD_FACTOR = 0.75f;
// 扩容操作中,transfer允许多线程,这个常量表示一个线程执行transfer时,最少要对连续的16个hash桶进行transfer,不足16就按16算
// 也就是单线程执行transfer时的最小任务量,单位为一个hash桶,这就是线程的transfer的步进(stride)
private static final int MIN_TRANSFER_STRIDE = 16;
// 用于生成每次扩容都唯一的生成戳的数。
private static int RESIZE_STAMP_BITS = 16;
// 最大的扩容线程的数量
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 移位量,把生成戳移位后保存在sizeCtl中当做扩容线程计数的基数,相反方向移位后能够反解出生成戳
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// 下面几个是特殊的节点的hash值,正常节点的hash值在hash函数中都处理过了,不会出现负数的情况,特殊节点在各自的实现类中有特殊的遍历方法
// ForwardingNode的hash值,ForwardingNode是一种临时节点,在扩容进行中才会出现,并且它不存储实际的数据
// 如果旧数组的一个hash桶中全部的节点都迁移到新数组中,旧数组就在这个hash桶中放置一个ForwardingNode
// 读操作或者迭代读时碰到ForwardingNode时,将操作转发到扩容后的新的table数组上去执行,写操作碰见它时,则尝试帮助扩容
static final int MOVED = -1;
// TreeBin的hash值,TreeBin是ConcurrentHashMap中用于代理操作TreeNode的特殊节点,持有存储实际数据的红黑树的根节点
// 因为红黑树进行写入操作,整个树的结构可能会有很大的变化,这个对读线程有很大的影响,
// 所以TreeBin还要维护一个简单读写锁,这是相对HashMap,这个类新引入这种特殊节点的重要原因
static final int TREEBIN = -2;
// ReservationNode的hash值,ReservationNode是一个保留节点,就是个占位符,不会保存实际的数据,正常情况是不会出现的
static final int RESERVED = -3;
// 用于和负数hash值进行 & 运算,将其转化为正数,Hashtable中定位hash桶也使用这种方式来进行负数转正数
static final int HASH_BITS = 0x7fffffff;
transient volatile Node<K,V>[] table;
// 扩容后的新的table数组,只有在扩容时才有用
// nextTable != null,说明扩容方法还没有真正退出,一般可以认为是此时还有线程正在进行扩容
private transient volatile Node<K,V>[] nextTable;
//控制table的初始化和扩容
//值为负时,表示正在进行初始化或扩容:
// sizeCtl = -1,表示有线程正在进行真正的初始化操作
// sizeCtl = -(1 + nThreads),表示有nThreads个线程正在进行扩容操作
// sizeCtl > 0,表示接下来的真正的初始化操作中使用的容量,或者初始化/扩容完成后的threshold
// sizeCtl = 0,默认值,此时在真正的初始化操作中使用默认容量
private transient volatile int sizeCtl;
// 下一个transfer任务的起始下标index 加上1 之后的值,transfer时下标index从length - 1开始往0走
// transfer时方向是倒过来的,迭代时是下标从小往大,二者方向相反,尽量减少扩容时transefer和迭代两者同时处理一个hash桶的情况
// 顺序相反时,二者相遇过后,迭代没处理的都是已经transfer的hash桶,transfer没处理的,都是已经迭代的hash桶,冲突会变少
// 下标在[nextIndex - 实际的stride (下界要 >= 0), nextIndex - 1]内的hash桶,就是每个transfer的任务区间
// 每次接受一个transfer任务,都要CAS执行 transferIndex = transferIndex - 实际的stride
// 保证一个transfer任务不会被几个线程同时获取(相当于任务队列的size减1)
// 当没有线程正在执行transfer任务时,一定有transferIndex <= 0,这是判断是否需要帮助扩容的重要条件(相当于任务队列为空)
private transient volatile int transferIndex;
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
}
//ConcurrentHashMap对此节点的操作,都会由TreeBin来代理执行
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
}
//ForwardingNode是一种临时节点,在扩容进行中才会出现,hash值固定为-1,并且它不存储实际的数据数据。
//如果旧数组的一个hash桶中全部的节点都迁移到新数组中,旧数组就在这个hash桶中放置一个ForwardingNode。
//读操作或者迭代读时碰到ForwardingNode时,将操作转发到扩容后的新的table数组上去执行,
//写操作碰见它时,则尝试帮助扩容。
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
}
//TreeBin的hash值固定为-2,它是ConcurrentHashMap中用于代理操作TreeNode的特殊节点,持有存储实际数据的红黑树的根节点。
//因为红黑树进行写入操作,整个树的结构可能会有很大的变化,这个对读线程有很大的影响,所以TreeBin还要维护一个简单读写锁,这是相对HashMap,这个类新引入这种特殊节点的重要原因。
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// 红黑树的读、写锁状态是互斥的,指的是以红黑树方式进行的读操作和写操作(只有部分的put/remove需要加写锁)是互斥的
// 但是当有线程持有红黑树的 写锁 时,读线程不会以红黑树方式进行读取操作,而是使用简单的链表方式进行读取,此时读操作和写操作可以并发执行
// 当有线程持有红黑树的 读锁 时,写线程可能会阻塞,不过因为红黑树的查找很快,写线程阻塞的时间很短
// 另外一点,ConcurrentHashMap的put/remove/replace方法本身就会锁住TreeBin节点,这里不会出现写-写竞争的情况,因此这里的读写锁可以实现得很简单
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
// 在hashCode相等并且不是Comparable类时才使用此方法进行判断大小
static int tieBreakOrder(Object a, Object b) {
int d;
if (a == null || b == null ||
(d = a.getClass().getName().compareTo(b.getClass().getName())) == 0)
d = (System.identityHashCode(a) <= System.identityHashCode(b) ?
-1 : 1);
return d;
}
// 对根节点加写锁,红黑树重构时需要加上 写锁
private final void lockRoot() {
if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER))
contendedLock(); // offload to separate method
}
//释放写锁
private final void unlockRoot() {
lockState = 0;
}
// 从根节点开始遍历查找,找到相等的节点就返回它,没找到就返回null
// 当有写线程加上写锁时,使用链表方式进行查找
final Node<K,V> find(int h, Object k) {
if (k != null) {
for (Node<K,V> e = first; e != null; ) {
int s; K ek;
// 两种特殊情况下以链表的方式进行查找
// 1、有线程正持有 写锁,这样做能够不阻塞读线程
// 2、WAITER时,不再继续加 读锁,能够让已经被阻塞的写线程尽快恢复运行,或者刚好让某个写线程不被阻塞
if (((s = lockState) & (WAITER|WRITER)) != 0) {
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
e = e.next;
}
else if (U.compareAndSwapInt(this, LOCKSTATE, s,
s + READER)) {
TreeNode<K,V> r, p;
try {
p = ((r = root) == null ? null :
r.findTreeNode(h, k, null));
} finally {
Thread w;
if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
(READER|WAITER) && (w = waiter) != null)
LockSupport.unpark(w);
}
return p;
}
}
}
return null;
}
}
2、一些基本操作
// 下面几个用于读写table数组,使用Unsafe提供的更强的功能(数组元素的volatile读写,CAS 更新)代替普通的读写,调用者预先进行参数控制
// volatile读取table[i]
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
// CAS更新table[i],也就是Node链表的头节点,或者TreeBin节点(它持有红黑树的根节点)
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
// volatile写入table[i]
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
// 满足变换为红黑树的两个条件时(链表长度这个条件调用者保证,这里只验证Map容量这个条件),将链表变为红黑树,否则只是进行一次扩容操作
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
// Map的容量不够时,只是进行一次扩容
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
// 规模不足时把红黑树转化为链表,此方法由调用者进行synchronized加锁,所以这里不加锁
static <K,V> Node<K,V> untreeify(Node<K,V> b) {
Node<K,V> hd = null, tl = null;
for (Node<K,V> q = b; q != null; q = q.next) {
Node<K,V> p = new Node<K,V>(q.hash, q.key, q.val, null);
if (tl == null)
hd = p;
else
tl.next = p;
tl = p;
}
return hd;
}
3、put方法
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
//1、根据 key 计算出 hashcode
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//2、判断是否需要进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//3、f即为当前key定位出的Node,如果为空表示当前位置可以写入数据,利用CAS尝试写入,失败则自旋保证成功。
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;
}
//4、如果当前位置的hashcode == MOVED == -1,则需要进行扩容。如果当前链表已经迁移完成,那么头节点会被设置成fwd节点,此时写线程会帮助扩容;如果扩容没有完成,当前链表的头节点会被锁住,所以写线程会被阻塞,直到扩容完成。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
/*
*如果在这个位置有元素的话,就采用synchronized的方式加锁
* 如果是链表的话(hash大于0),就对这个链表的所有元素进行遍历
* 如果找到了key和key的hash值都一样的节点,则把它的值替换
* 如果没找到的话,则添加在链表的最后面
* 否则,是树的话,则调用putTreeVal方法添加到树中去
* 在添加完之后,会对该节点上关联的的数目进行判断,
* 如果在8个以上的话则会调用treeifyBin方法,来尝试转化为树,或者是扩容
*/
V oldVal = null;
//5、如果都不满足,则利用synchronized锁写入数据。
synchronized (f) {
if (tabAt(tab, i) == f) {
//6、当前为链表,在链表中插入新的键值对
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//7、当前为红黑树,将新的键值对插入到红黑树中
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//8、插入完键值对后再根据实际大小看是否需要转换成红黑树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
4、get()方法
这里可以看到get方法是没有加锁的。Node中的val和next定义的时候用了volatile来保证可见性和有序性。
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//1、计算hashcode
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {//读取首节点的Node元素
//2、如果该节点就是首节点则返回
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//3、hash为负值代表正在扩容,如果当前链表已经迁移完成,那么头节点会被设置成fwd节点,此时get线程会帮助扩容。
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//4、既不是首节点也不是ForwardingNode,继续往下遍历
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
5、扩容
一般我们说的扩容,都包含两个步骤。
-
新建一个2倍大小的数组,这个过程要求单线程完成(多线程创建几个数组没有意义,容易出错)。
-
执行节点迁移,相当于把旧数组中所有的节点重新put到新数组中。在1.8的HashMap中,用了一个技巧,避免了重新根据hash值定位(见上)。根据这个,我们可以知道,进行 n -> 2n 的扩容时,扩容前节点所在的hash桶的索引为index,这个节点迁移到新数组中只会有两种情况:要么在还是在新数组的索引为index处,要么迁移到新数组的索引为 index + n 的地方。所以旧的table数组上各个hash桶中的节点的迁移是不会互相影响的,这一点对多线程扩容非常有利。根据这一点,可以知道,每个hash桶的迁移都可以作为一个线程在扩容时的一个transfer任务。
-
另外,每个线程扩容任务都不应该规模太小(最小16个桶),因为扩容并不是IO型操作,节点迁移的执行速度本身很快,太多的线程来执行节点迁移,线程调度开销占比变大,反而降低了吞吐量。
ConcurrentHashMap这里,会根据CPU的核心数目,来算出一个transfer任务包含的hash桶的数量。
在扩容时,ConcurrentHashMap支持多线程并发扩容,在扩容过程中同时支持get查数据,若有线程put数据,还会帮助一起扩容,这种无阻塞算法,将并行最大化的设计。
/**假设一个线程A正在复制节点1的数据,到table中,那么此时,它把节点1给锁住了,并且标识为ForwardingNode,也就是hash值为MOVED。
由于没有对整个ConcurrentHashMap加锁,当线程2也能访问这个table,来了后它也尝试扩容,当进行第一个时候,诶,发现有人在扩容,我得先帮帮它。
但是节点1有人在扩,进不去,那我就去节点2扩容,如果此时节点2也有人占了,那么就节点3依次往后。
由于每次获取获取某一个节点f只能为一个线程,所以最终保证了线程的安全,并且能让其他线程来辅助扩容。
*/
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// 计数本次扩容的生成戳
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
/**检查nextTable是否为null,如果是,则初始化nextTable,容量为table的两倍。
*自旋遍历节点,直到finished:节点从table复制到nextTable中,支持并发,思路如下:
*如果节点 f 为null,则插入ForwardingNode(采用Unsafe.compareAndSwapObjectf方法实现),从而别的线程不会走到这个节点。
*如果f为链表的头节点(fh >= 0),则先构造两个链表,通过hash值第n位不同区分(ph & n) == 0 ,然后把他们分*别放在nextTable的i和i + n位置,并将ForwardingNode 插入原节点位置,代表已经处理过了
*如果f为TreeBin节点,同样也是构造两课树,同时需要判断是否需要进行unTreeify()操作,并把处理的结果分别插入到nextTable的i 和i+nw位置,并插入ForwardingNode 节点
*所有节点复制完成后,则将table指向nextTable,同时更新sizeCtl = nextTable的0.75倍,完成扩容过程
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 计算每个transfer任务中要负责迁移多少个hash桶
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
//构造一个nextTable对象 它的容量是原来的两倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
// 转发节点,在旧数组的一个hash桶中所有节点都被迁移完后,放置在这个hash桶中,表明已经迁移完,对它的读操作会转发到新数组
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
// 扩容中收尾的线程把做个值设置为true,进行本轮扩容的收尾工作(两件事,重新检查一次所有hash桶,给属性赋新值)
boolean finishing = false;
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//如果所有的节点都已经完成复制工作 就把nextTable赋值给table
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
//扩容阈值设置为原来容量的1.5倍 依然相当于现在容量的0.75倍
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n;
}
}
//如果遍历到的节点为空 则放入ForwardingNode指针
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//如果遍历到ForwardingNode节点 说明这个点已经被处理过了 直接跳过,所以这就是能够并发的扩容!
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {// 判断下加锁的节点是hash桶中的第一个节点,加锁的是第一个节点才算加锁成功
Node<K,V> ln, hn;
if (fh >= 0) {
//因为n的值为数组的长度,且是power(2,x),所以,在&操作的结果只可能是0或者n.
//根据这个规则,0--> 放在新表的相同位置,n--> 放在新表的(n+原来位置)
int runBit = fh & n;
// lastRun 表示的是需要复制的最后一个节点
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// 构造两个链表,顺序大部分和原来是反的
//分别放到原来的位置和新增加的长度的相同位置(i/n+i)
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);//在nextTable的i位置上插入一个链表
setTabAt(nextTab, i + n, hn);//在nextTable的i+n的位置上插入另一个链表
setTabAt(tab, i, fwd);//在table的i位置上插入forwardNode节点 表示已经处理过该节点
advance = true;//设置advance为true 返回到上面的while循环中 就可以执行i--操作
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
6、总结
ConcurrentHashMap
是HashMap的线程安全版本。ConcurrentHashMap
底层数据结构为数组+链表+红黑树,默认容量为16,不允许[key,value]为null。ConcurrentHashMap
内部采用的并发机制有synchronized、自旋锁、volatile。- 通过sizeCtl变量来控制扩容、初始化等操作。
- 查询操作不加锁,因此ConcurrentHashMap不是强一致性。
ConcurrentHashMap如何保证线程安全?
CAS+对节点加锁(减小锁粒度)+volatile
-
Node节点中的value和next指针使用了volatile来保证其可见性;table变量使用了volatile来保证每次获取到的都是最新写入的值。
-
初始化时,防止多线程初始化:
-
volatile变量(sizeCtl):它是一个标记位,用来告诉其他线程这个坑位有没有人在,其线程间的可见性由volatile保证。
-
CAS操作:CAS操作保证了设置sizeCtl标记位的原子性,保证了只有一个线程能设置成功
//控制table的初始化和扩容
//值为负时,表示正在进行初始化或扩容:
// sizeCtl = -1,表示有线程正在进行真正的初始化操作
// sizeCtl = -(1 + nThreads),表示有nThreads个线程正在进行扩容操作
// sizeCtl > 0,表示接下来的真正的初始化操作中使用的容量,或者初始化/扩容完成后的threshold
// sizeCtl = 0,默认值,此时在真正的初始化操作中使用默认容量
private transient volatile int sizeCtl;
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//每次循环都获取最新的Node数组引用
while ((tab = table) == null || tab.length == 0) {
//sizeCtl是一个标记位,若为-1也就是小于0,代表有线程在进行初始化工作了
if ((sc = sizeCtl) < 0)
//让出CPU时间片
Thread.yield(); // lost initialization race; just spin
//CAS操作,将本实例的sizeCtl变量设置为-1
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
//如果CAS操作成功了,代表本线程将负责初始化工作
try {
//再检查一遍数组是否为空
if ((tab = table) == null || tab.length == 0) {
//在初始化Map时,sizeCtl代表数组大小,默认16
//所以此时n默认为16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//Node数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
//将其赋值给table变量
table = tab = nt;
//通过位运算,n减去n二进制右移2位,相当于乘以0.75
//例如16经过运算为12,与乘0.75一样,只不过位运算更快
sc = n - (n >>> 2);
}
} finally {
//将计算后的sc(12)直接赋值给sizeCtl,表示达到12长度就扩容
//由于这里只会有一个线程在执行,直接赋值即可,没有线程安全问题
//只需要保证可见性
sizeCtl = sc;
}
break;
}
}
return tab;
}
- put操作时:
采用tabAt(tab, i)方法,其使用Unsafe类volatile的操作volatile式地查看值,保证每次获取到的值都是最新的。
利用CAS尝试写入,失败则自旋保证成功。
TreeMap
1、存储结构
只是红黑树,不支持链表操作。 通过自定义 key 比较器或者默认的比较算法来进行定位红黑树节点的存储位置。
static final class Entry<K,V> implements Map.Entry<K,V> {
K key;
V value;
Entry<K,V> left;
Entry<K,V> right;
Entry<K,V> parent;
boolean color = BLACK;
public boolean equals(Object o) {
if (!(o instanceof Map.Entry))
return false;
Map.Entry<?,?> e = (Map.Entry<?,?>)o;
return valEquals(key,e.getKey()) && valEquals(value,e.getValue());
}
public int hashCode() {
int keyHash = (key==null ? 0 : key.hashCode());
int valueHash = (value==null ? 0 : value.hashCode());
return keyHash ^ valueHash;
}
}
2、比较
- HashMap
数组方式存储key/value,线程非安全,允许null作为key和value,key不可以重复,value允许重复,不保证元素迭代顺序是按照插入时的顺序,key的hash值是先计算key的hashcode值,然后再进行计算,每次容量扩容会重新计算所以key的hash值,会消耗资源,要求key必须重写equals和hashcode方法
默认初始容量16,加载因子0.75,扩容为旧容量乘2,查找元素快,如果key一样则比较value,如果value不一样,则按照链表结构存储value,就是一个key后面有多个value;
- TreeMap
基于红黑二叉树的NavigableMap的实现,线程非安全,不允许null,key不可以重复,value允许重复,存入TreeMap的元素应当实现Comparable接口或者实现Comparator接口,会按照排序后的顺序迭代元素。主要用于存入元素的时候对元素进行自动排序,迭代输出的时候就按排序顺序输出
LinkedHashMap
存储结构
继承自 HashMap,因此具有和 HashMap 一样的快速查找特性。
public class LinkedHashMap<K,V> extends HashMap<K,V> implements Map<K,V>
内部维护了一个双向链表,用来维护插入顺序或者 LRU 顺序。
/**
* The head (eldest) of the doubly linked list.
*/
transient LinkedHashMap.Entry<K,V> head;
/**
* The tail (youngest) of the doubly linked list.
*/
transient LinkedHashMap.Entry<K,V> tail;
accessOrder 决定了顺序,默认为 false,此时维护的是插入顺序。
final boolean accessOrder;
LinkedHashMap 最重要的是以下用于维护顺序的函数,它们会在 put、get 等方法中调用。
void afterNodeAccess(Node<K,V> p) { }
void afterNodeInsertion(boolean evict) { }
afterNodeAccess()
当一个节点被访问时,如果 accessOrder 为 true,则会将该节点移到链表尾部。也就是说指定为 LRU 顺序之后,在每次访问一个节点时,会将这个节点移到链表尾部,保证链表尾部是最近访问的节点,那么链表首部就是最近最久未使用的节点。
void afterNodeAccess(Node<K,V> e) { // move node to last
LinkedHashMap.Entry<K,V> last;
if (accessOrder && (last = tail) != e) {
LinkedHashMap.Entry<K,V> p =
(LinkedHashMap.Entry<K,V>)e, b = p.before, a = p.after;
p.after = null;
if (b == null)
head = a;
else
b.after = a;
if (a != null)
a.before = b;
else
last = b;
if (last == null)
head = p;
else {
p.before = last;
last.after = p;
}
tail = p;
++modCount;
}
}
afterNodeInsertion()
在 put 等操作之后执行,当 removeEldestEntry() 方法返回 true 时会移除最晚的节点,也就是链表首部节点 first。
evict 只有在构建 Map 的时候才为 false,在这里为 true。
void afterNodeInsertion(boolean evict) { // possibly remove eldest
LinkedHashMap.Entry<K,V> first;
if (evict && (first = head) != null && removeEldestEntry(first)) {
K key = first.key;
removeNode(hash(key), key, null, false, true);
}
}
removeEldestEntry() 默认为 false,如果需要让它为 true,需要继承 LinkedHashMap 并且覆盖这个方法的实现,这在实现 LRU 的缓存中特别有用,通过移除最近最久未使用的节点,从而保证缓存空间足够,并且缓存的数据都是热点数据。
protected boolean removeEldestEntry(Map.Entry<K,V> eldest) {
return false;
}
LRU 缓存
以下是使用 LinkedHashMap 实现的一个 LRU 缓存:
- 设定最大缓存空间 MAX_ENTRIES 为 3;
- 使用 LinkedHashMap 的构造函数将 accessOrder 设置为 true,开启 LRU 顺序;
- 覆盖 removeEldestEntry() 方法实现,在节点多于 MAX_ENTRIES 就会将最近最久未使用的数据移除。
class LRUCache<K, V> extends LinkedHashMap<K, V> {
private static final int MAX_ENTRIES = 3;
protected boolean removeEldestEntry(Map.Entry eldest) {
return size() > MAX_ENTRIES;
}
LRUCache() {
super(MAX_ENTRIES, 0.75f, true);
}
}
public static void main(String[] args) {
LRUCache<Integer, String> cache = new LRUCache<>();
cache.put(1, "a");
cache.put(2, "b");
cache.put(3, "c");
cache.get(1);
cache.put(4, "d");
System.out.println(cache.keySet());
}
[3, 1, 4]
WeakHashMap
存储结构
WeakHashMap 的 Entry 继承自 WeakReference,被 WeakReference 关联的对象在下一次垃圾回收时会被回收。
WeakHashMap 主要用来实现缓存,通过使用 WeakHashMap 来引用缓存对象,由 JVM 对这部分缓存进行回收。
private static class Entry<K,V> extends WeakReference<Object> implements Map.Entry<K,V>
ConcurrentCache
Tomcat 中的 ConcurrentCache 使用了 WeakHashMap 来实现缓存功能。
ConcurrentCache 采取的是分代缓存:
- 经常使用的对象放入 eden 中,eden 使用 ConcurrentHashMap 实现,不用担心会被回收(伊甸园);
- 不常用的对象放入 longterm,longterm 使用 WeakHashMap 实现,这些老对象会被垃圾收集器回收。
- 当调用 get() 方法时,会先从 eden 区获取,如果没有找到的话再到 longterm 获取,当从 longterm 获取到就把对象放入 eden 中,从而保证经常被访问的节点不容易被回收。
- 当调用 put() 方法时,如果 eden 的大小超过了 size,那么就将 eden 中的所有对象都放入 longterm 中,利用虚拟机回收掉一部分不经常使用的对象。
public final class ConcurrentCache<K, V> {
private final int size;
private final Map<K, V> eden;
private final Map<K, V> longterm;
public ConcurrentCache(int size) {
this.size = size;
this.eden = new ConcurrentHashMap<>(size);
this.longterm = new WeakHashMap<>(size);
}
public V get(K k) {
V v = this.eden.get(k);
if (v == null) {
v = this.longterm.get(k);
if (v != null)
this.eden.put(k, v);
}
return v;
}
public void put(K k, V v) {
if (this.eden.size() >= size) {
this.longterm.putAll(this.eden);
this.eden.clear();
}
this.eden.put(k, v);
}
}
ConcurrentSkipListMap
对于一个单链表,即使链表是有序的,如果我们想要在其中查找某个数据,也只能从头到尾遍历链表,这样效率自然就会很低。跳表是一种可以用来快速查找的数据结构,有点类似于平衡树。它们都可以对元素进行快速的查找。但一个重要的区别是:对平衡树的插入和删除往往很可能导致平衡树进行一次全局的调整。而对跳表的插入和删除只需要对整个数据结构的局部进行操作即可。这样带来的好处是:在高并发的情况下,你会需要一个全局锁来保证整个平衡树的线程安全。而对于跳表,你只需要部分锁即可。这样,在高并发环境下,你就可以拥有更好的性能。而就查询的性能而言,跳表的时间复杂度也是 O(logn) 所以在并发数据结构中,JDK 使用跳表来实现一个 Map。
跳表的本质是同时维护了多个链表,并且链表是分层的,
最低层的链表维护了跳表内所有的元素,每上面一层链表都是下面一层的子集。
跳表内的所有链表的元素都是排序的。查找时,可以从顶级链表开始找。一旦发现被查找的元素大于当前链表中的取值,就会转入下一层链表继续找。这也就是说在查找过程中,搜索是跳跃式的。如上图所示,在跳表中查找元素 18。
查找 18 的时候原来需要遍历 18 次,现在只需要 7 次即可。针对链表长度比较大的时候,构建索引查找效率的提升就会非常明显。
从上面很容易看出,跳表是一种利用空间换时间的算法。
使用跳表实现 Map 和使用哈希算法实现 Map 的另外一个不同之处是:哈希并不会保存元素的顺序,而跳表内所有的元素都是排序的。因此在对跳表进行遍历时,你会得到一个有序的结果。所以,如果你的应用需要有序性,那么跳表就是你不二的选择。JDK 中实现这一数据结构的类是 ConcurrentSkipListMap。