前言
保险出单场景中,业务员出单时后台需要根据业务员所在机构来决定业务流转,审计字段记录出单员信息等,这些在后端处理中需要多次用到,每笔订单都是独立对应一个业务线程,可以使用ThreadLocal将业务员代码存储到线程中,方便后续取值。
概述
我们知道想要启动一个新的线程,最终需要用到Thread类,一个Thread对应一个独立的线程。所以我们是不是可以在Thread中定义一个Map集合,最后以key-value的形式将内容存储到里面;我们在线程中想要拿到当前线程的引用对象,只需要调用Thread.currentThread()即可拿到实例,拿到实例后从其属性Map集合中进行取值即可,由于每个Thread都是独立的,这样就能做到线程间的隔离了。
ThreadLocal往Thread容器中添加数据时,以自身对象作为存储的key,以自身对象左右取值的Key;
正文
案例
public static void main(String[] args) {
ThreadLocal<String > loginUserCode=new ThreadLocal<>();
new Thread(()->{
loginUserCode.set("100000");
System.out.println(Thread.currentThread().getName()+":"+loginUserCode.get());
}).start();
new Thread(()->{
System.out.println(Thread.currentThread().getName()+":"+loginUserCode.get());
loginUserCode.set("200000");
System.out.println(Thread.currentThread().getName()+":"+loginUserCode.get());
Thread thread = Thread.currentThread();
}).start();
}
通过上面的案例知道ThreadLocal的基本用法。
数据存储结构
从上述的步骤中我们知道,Thread中肯定存储一个缓存容器用于存储数据,所以先看下其缓存的实现
ThreadLocalMap的源码比较多,这里我就挑比较重点属性的出来讲解;
static class ThreadLocalMap {
private Entry[] table;
//默认Enrty[]容量
private static final int INITIAL_CAPACITY = 16;
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
//将key值包装成弱引用进行存储
super(k);
value = v;
}
}
}
看过HashMap源码的同学看这部分应该会比较容易点,这里的Entry继承了弱引用修饰的ThreadLocal。
ThreadLocal: set(T value)方法
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
map.set(this, value);
} else {
createMap(t, value);
}
}
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
- 获取当前线程
- 从当前线程中获取缓存对象
- 判断该缓存对象是否被初始化,没有则创建一个容器并赋值
- 将当前的ThreadLocal对象作为key,与value存入缓存中
private void set(ThreadLocal<?> key, Object value) {
// We don't use a fast path as with get() because it is at
// least as common to use set() to create new entries as
// it is to replace existing ones, in which case, a fast
// path would fail more often than not.
Entry[] tab = table;
int len = tab.length;
//计算出当前对象在数组中的下标,这里相当于取模效果
int i = key.threadLocalHashCode & (len-1);
//看是否出现Hash冲突的情况
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
//如果其k值对象一样,证明是修改操作,直接复制即可
if (k == key) {
e.value = value;
return;
}
//由于是弱引用,之前的key可能被垃圾回收,所以存储为null的,情况。
//所以这里需要重新选址和清理一些垃圾
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
//走到这里,证明数组中不存在hash冲突的情况,这里创建一个对象并存入数组中
tab[i] = new Entry(key, value);
//当前存储容量+1
int sz = ++size;
//如果没有清理垃圾的情况发生,并且容量超过阈值大小了,需要进行扩容
if (!cleanSomeSlots(i, sz) && sz >= threshold)
//扩容。容量为当前旧数组的2倍
rehash();
}
综上所述:
- 根据当前ThreadLocal计算Hash值,并进行与运算,得到数组下标
- 判断当前数组对应下标是否有值,没有则创建一个新的enrty对象进行存储,并尝试进行垃圾回收和扩容
- 如果数组对应下标已经有值了,则是hash冲突了,判断是否存在key值相等的情况,如果存在则直接覆盖
- 如果当前key值不相等,此时使用线性寻址法找出对应的下标位置
- 对key值为null的垃圾进行回收。
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
// Back up to check for prior stale entry in current run.
// We clean out whole runs at a time to avoid continual
// incremental rehashing due to garbage collector freeing
// up refs in bunches (i.e., whenever the collector runs).
//存储当前下标志
int slotToExpunge = staleSlot;
//往前找,找出被垃圾回收的ThreadLocal对应的下标值
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;
// Find either the key or trailing null slot of run, whichever
// occurs first
//在数组中一直往后寻找,看是否能找到key一致的位置
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// If we find key, then we need to swap it
// with the stale entry to maintain hash table order.
// The newly stale slot, or any other stale slot
// encountered above it, can then be sent to expungeStaleEntry
// to remove or rehash all of the other entries in run.
//找到位置,将该位置的值与之前ThreadLocal为null的位置进行互换
//这里主要让数据在数组中尽可能连城一块,减少碎片化
if (k == key) {
e.value = value;
//将当前key为null的位置与真正存储值的数据进行互换,减少碎片化
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
// Start expunge at preceding stale entry if it exists
//判断在hash冲突位置的前面是否存在垃圾数据,
//存在时 则两者不等,从最前面开始进行扫描清理垃圾
if (slotToExpunge == staleSlot)
slotToExpunge = i;
//垃圾清理
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
// If we didn't find stale entry on backward scan, the
// first stale entry seen while scanning for key is the
// first still present in the run.
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}
// If key not found, put new entry in stale slot
//如果key不存在,则创建一个新的,放入数组中
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
// If there are any other stale entries in run, expunge them
//判断前面位置是否存在垃圾数据,存在则从最前面开始清理,否则不需要处理
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
综上所述:
- 找出当前hash冲突位置前面是否存在key为null(被垃圾回收)的脏数据
- 一直往后找,看是否存在key相等的位置,有则将该位置的数据与hash冲突位置进行互换,减少内存空间的碎片化
- 垃圾清理,判断清理的位置是否在hash冲突位置的前面,如果是则从前面开始清理
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// expunge entry at staleSlot
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
// Rehash until we encounter null
Entry e;
int i;
//一直往后找,找到位置为null的数组下标
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
//如果是垃圾数据,则清理。
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
int h = k.threadLocalHashCode & (len - 1);
//看是否存在Hash冲突的情况
if (h != i) {
//将当前位置的值清理掉
//重新在hash冲突处往后寻找一个地址为null的数据进行存储
//因为前面可能存在清理过的位置,这样的好处是减少碎片化,将数据尽量往前面靠近
tab[i] = null;
// Unlike Knuth 6.4 Algorithm R, we must scan until
// null because multiple entries could have been stale.
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}
综上所述:
- 从参数下标处开始扫描,如果数组位置没有存储数据,则返回该下标
- 如果key已经被回收了,此时将该位置的空间进行清理
- 如果存在hash冲突的情况,将当前位置空间清理掉,重新在hash冲突处往后再寻找一个空的空间进行存储,这样的好处是让数据能连续,避免过度碎片化
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
i = nextIndex(i, len);
Entry e = tab[i];
if (e != null && e.get() == null) {
n = len;
removed = true;
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);
return removed;
}
由于整个可能存在碎片化过多的情况,而expungeStaleEntry()方法遇到位置为空时,就结束清理。所以需要配合cleanSomeSlots()方法进行指数次清理。我理解使用指数次清理的方法可能清理不是特别干净,但是相对整个数组的扫描效率要高得多,数组过大时,整组扫描清理的方法会过度消耗时间。
ThreadLocal:get()方法
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
- 获取当前线程
- 从当前线程中获取缓存对象
- 判断该缓存对象是否被初始化,没有则创建一个容器并赋默认值
- ThreadLocal对象作为key从缓存对象中获取值
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}
ThreadLocal:remove()方法
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null) {
m.remove(this);
}
}
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
e.clear();
//释放当前数组位置空间,并进行垃圾清理
expungeStaleEntry(i);
return;
}
}
}
总结
Thread中定义了ThreadLocalMap 容器用于存储当前线程数据,ThreadLocalMap 的数据结构采用数组的方法,不像HashMap采用数组+链表的方法,所以在产生hash冲突时,两者的处理方法不一样。ThreadLocalMap中Key为弱引用的ThreadLocal对象,当发生GC时,该对象会被清理,而此时key所对应的value为强引用,所以就存在key=null,value不为null的情况,如果线程持续时间长,value得不到回收就会存在内存泄漏,如果出现大量线程,可能还会引发内存溢出。
所以ThreadLocalMap中定义了一些回收策略,对key为null的垃圾进行清理后,可能会导致数组中存在大量不连续的null的空间,整个数组碎片化验证。所以在expungeStaleEntry()方法中,对垃圾清理的同时,也会有对应策略重新计算下标,将数组存储尽量连续性;