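FastThreadLocal: as the name suggests, this is Netty's optimized ThreadLocal. So where is ThreadLocal slow enough to need a "fast" version? As the previous post showed, every Thread holds a ThreadLocalMap member. It acts as a map keyed by ThreadLocal, implemented as an array plus linear probing. Finding the value for a given ThreadLocal takes two steps. First, the ThreadLocal's hashCode is reduced modulo the array length to get an array index. Because collisions are resolved by linear probing, the entry at that index may belong to a different ThreadLocal, so the lookup then has to walk forward until it finds the right one. Netty uses a mechanism that makes this second step unnecessary.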
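To make the two steps concrete, here is a toy open-addressing table with linear probing, written in the spirit of ThreadLocalMap but not copied from it. ProbingTable and lastProbes are hypothetical names, and the hash is passed in explicitly (an assumption; the JDK computes threadLocalHashCode internally). Two keys whose hashes share the same low bits collide, and the second one costs an extra probe.

```java
// Hypothetical toy model of an identity-keyed table with linear probing,
// in the spirit of the JDK's ThreadLocalMap (not its actual code).
final class ProbingTable {
    private final Object[] keys;
    private final Object[] values;
    int lastProbes; // how many slots the last get() inspected

    ProbingTable(int capacity) { // capacity must be a power of two
        keys = new Object[capacity];
        values = new Object[capacity];
    }

    private int homeSlot(int hash) {
        return hash & (keys.length - 1); // "hash mod length" for power-of-two sizes
    }

    void put(Object key, int hash, Object value) {
        int i = homeSlot(hash);
        for (int probes = 0; probes < keys.length; probes++) {
            if (keys[i] == null || keys[i] == key) {
                keys[i] = key;
                values[i] = value;
                return;
            }
            i = (i + 1) & (keys.length - 1); // slot taken: try the next one
        }
        throw new IllegalStateException("table full");
    }

    Object get(Object key, int hash) {
        int i = homeSlot(hash); // step 1: jump to the home slot
        for (int probes = 1; probes <= keys.length; probes++) {
            lastProbes = probes;
            if (keys[i] == null) {
                return null; // hit an empty slot: key is absent
            }
            if (keys[i] == key) { // identity comparison, like key == yourThreadLocal
                return values[i];
            }
            i = (i + 1) & (keys.length - 1); // step 2: linear probing
        }
        return null;
    }
}
```

With FastThreadLocal's scheme, described below, the whole `get` collapses to a single `table[index]` read.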
Because ThreadLocal exposes no hook for this, Netty has to provide its own FastThreadLocalThread, which extends Thread and carries its own map:
public class FastThreadLocalThread extends Thread {
// This will be set to true if we have a chance to wrap the Runnable.
private final boolean cleanupFastThreadLocals;
private InternalThreadLocalMap threadLocalMap;
// ... constructors and accessors omitted ...
}
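As its comment says, cleanupFastThreadLocals is set to true when the thread gets the chance to wrap its Runnable, so that the thread's FastThreadLocals can be cleaned up automatically once the Runnable finishes.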
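The "wrap the Runnable" idea can be sketched as a decorator whose `finally` block clears per-thread state. CleanupOnExit, wrap and STATE are hypothetical names, and a plain JDK ThreadLocal stands in for the thread's FastThreadLocals. As far as I know, Netty's own wrapper (FastThreadLocalRunnable) follows this shape and calls FastThreadLocal.removeAll() at the end, but treat this as an illustration rather than Netty's code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: wrap a task so per-thread state is always cleared
// when it finishes, even if the task throws.
final class CleanupOnExit implements Runnable {
    // Stand-in for per-thread state; a real version would clear every
    // thread-local registered on this thread (cf. removeAll()).
    static final ThreadLocal<List<String>> STATE =
            ThreadLocal.withInitial(ArrayList::new);

    private final Runnable task;

    private CleanupOnExit(Runnable task) {
        this.task = task;
    }

    // Avoid double-wrapping an already wrapped task.
    static Runnable wrap(Runnable task) {
        return task instanceof CleanupOnExit ? task : new CleanupOnExit(task);
    }

    @Override
    public void run() {
        try {
            task.run();
        } finally {
            STATE.remove(); // cleanup runs on the same thread, after the task
        }
    }
}
```

Cleaning up in the thread's own `finally` means no other thread ever has to touch this thread's map.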
private InternalThreadLocalMap() {
super(newIndexedVariableTable());
}
private static Object[] newIndexedVariableTable() {
Object[] array = new Object[32];
Arrays.fill(array, UNSET);
return array;
}
The constructor shows that InternalThreadLocalMap also stores its values in an array: 32 slots, each pre-filled with the UNSET sentinel.
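Pre-filling with a sentinel lets the table tell "never set" apart from "explicitly set to null". A minimal sketch of that layout, where IndexedTable and its methods are hypothetical names and the table does not grow (an assumption to keep it short):

```java
import java.util.Arrays;

// Hypothetical sketch of an indexed table pre-filled with a sentinel,
// following the newIndexedVariableTable() idea.
final class IndexedTable {
    // Private sentinel: distinguishes "never set" from "set to null".
    static final Object UNSET = new Object();

    private final Object[] slots = newTable();

    private static Object[] newTable() {
        Object[] array = new Object[32];
        Arrays.fill(array, UNSET);
        return array;
    }

    Object get(int index) {
        return index < slots.length ? slots[index] : UNSET; // out of range reads as unset
    }

    boolean isSet(int index) {
        return index < slots.length && slots[index] != UNSET;
    }

    void set(int index, Object value) { // sketch only: index must be < 32
        slots[index] = value;
    }
}
```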
UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
this.indexedVariables = indexedVariables;
}
Next, the members of its parent class, UnpaddedInternalThreadLocalMap:
static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
static final AtomicInteger nextIndex = new AtomicInteger();
/** Used by {@link FastThreadLocal} */
Object[] indexedVariables;
// Core thread-locals
int futureListenerStackDepth;
int localChannelReaderStackDepth;
Map<Class<?>, Boolean> handlerSharableCache;
IntegerHolder counterHashCode;
ThreadLocalRandom random;
Map<Class<?>, TypeParameterMatcher> typeParameterMatcherGetCache;
Map<Class<?>, Map<String, TypeParameterMatcher>> typeParameterMatcherFindCache;
// String-related thread-locals
StringBuilder stringBuilder;
Map<Charset, CharsetEncoder> charsetEncoderCache;
Map<Charset, CharsetDecoder> charsetDecoderCache;
// ArrayList-related thread-locals
ArrayList<Object> arrayList;
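slowThreadLocalMap gives FastThreadLocal a fallback path built on the original JDK ThreadLocal: when the current thread is not a FastThreadLocalThread, its InternalThreadLocalMap is kept in this JDK ThreadLocal instead of in a thread field. nextIndex is a static AtomicInteger, so there is exactly one counter, shared by all threads. Now let's look at the index-related operations.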
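The fast/slow split can be sketched as a type check on the current thread. PerThreadTable, FastThread and callOnFastThread are hypothetical names. Netty's InternalThreadLocalMap.get() dispatches in a similar way as far as I know, but this is not its code.

```java
import java.util.function.Supplier;

// Hypothetical sketch: a Thread subclass keeps its table in a plain field
// (fast path); other threads fall back to a JDK ThreadLocal (slow path),
// which is the role slowThreadLocalMap plays.
final class PerThreadTable {
    static final class FastThread extends Thread {
        Object[] table; // only ever touched by this thread itself

        FastThread(Runnable r) {
            super(r);
        }
    }

    private static final ThreadLocal<Object[]> SLOW = new ThreadLocal<>();

    static Object[] get() {
        Thread current = Thread.currentThread();
        if (current instanceof FastThread) {
            FastThread ft = (FastThread) current;
            if (ft.table == null) {
                ft.table = new Object[32];
            }
            return ft.table; // fast path: a field read, no hashing or probing
        }
        Object[] table = SLOW.get(); // slow path: ThreadLocalMap lookup
        if (table == null) {
            table = new Object[32];
            SLOW.set(table);
        }
        return table;
    }

    static boolean onFastPath() {
        return Thread.currentThread() instanceof FastThread;
    }

    // Helper for demos: run work on a fresh FastThread and return its result.
    static <T> T callOnFastThread(Supplier<T> work) {
        Object[] box = new Object[1];
        FastThread t = new FastThread(() -> box[0] = work.get());
        t.start();
        try {
            t.join(); // join() also makes box[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        @SuppressWarnings("unchecked")
        T result = (T) box[0];
        return result;
    }
}
```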
// Fetch directly by index
public Object indexedVariable(int index) {
Object[] lookup = indexedVariables;
return index < lookup.length? lookup[index] : UNSET;
}
/**
* @return {@code true} if and only if a new thread-local variable has been created
*/
public boolean setIndexedVariable(int index, Object value) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object oldValue = lookup[index];
lookup[index] = value;
return oldValue == UNSET;
} else {
// index is beyond the current table: expand the table first, then set the value
expandIndexedVariableTableAndSet(index, value);
return true;
}
}
private void expandIndexedVariableTableAndSet(int index, Object value) {
Object[] oldArray = indexedVariables;
final int oldCapacity = oldArray.length;
// newCapacity -> next power of two above index: 64, 128, 256, 512, ...
int newCapacity = index;
newCapacity |= newCapacity >>> 1;
newCapacity |= newCapacity >>> 2;
newCapacity |= newCapacity >>> 4;
newCapacity |= newCapacity >>> 8;
newCapacity |= newCapacity >>> 16;
newCapacity ++;
Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
newArray[index] = value;
indexedVariables = newArray;
}
// remove simply writes UNSET back into the slot
public Object removeIndexedVariable(int index) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object v = lookup[index];
lookup[index] = UNSET;
return v;
} else {
return UNSET;
}
}
public boolean isIndexedVariableSet(int index) {
Object[] lookup = indexedVariables;
return index < lookup.length && lookup[index] != UNSET;
}
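The bit trick in expandIndexedVariableTableAndSet copies the highest set bit of index into every lower bit and then adds 1, which yields the smallest power of two strictly greater than index. Isolated into a hypothetical helper (CapacityMath is not a Netty class), it can be checked against Integer.highestOneBit:

```java
// Sketch isolating the rounding step of expandIndexedVariableTableAndSet.
// Valid for 0 < index < 2^30; larger values overflow int.
final class CapacityMath {
    static int nextCapacity(int index) {
        int n = index;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16; // every bit below the highest set bit is now 1
        return n + 1;  // smallest power of two strictly greater than index
    }
}
```

Since expansion only happens when index >= the old length (at least 32), the new capacity is always at least 64 and strictly greater than index, so `newArray[index] = value` is always in bounds.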
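The index-based methods of InternalThreadLocalMap all go straight to the array slot; in JDK ThreadLocal terms, they only ever perform step one. Why does one access always hit, with no linear probing? The key is that nextIndex is static. A process contains many ThreadLocal instances (the correct usage is to declare them static). The JDK's ThreadLocal identifies its entry by reference comparison (key == yourThreadLocal), but it locates the slot by taking the hashCode modulo the table length, and different ThreadLocals can land on the same slot. Netty instead uses a static, global sequence number: allocating indices from an AtomicInteger guarantees that each index is unique. Indices start at 0, and every FastThreadLocal is bound to its own unique index, the same one in every thread. A lookup therefore only needs to read the array at that index.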
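Putting the pieces together, here is a toy model of the global-index idea. IndexedLocal is a hypothetical name, and unlike Netty it keeps each thread's array in a JDK ThreadLocal (an assumption made only so the sketch works on any thread, which means it still pays one JDK lookup to find the array). What it does show is that, once the array is in hand, the value is a single indexed read with no probing, and that each thread sees its own values.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model: every instance takes a unique index from one static counter,
// and each thread owns an Object[] addressed by that index.
final class IndexedLocal<V> {
    private static final AtomicInteger NEXT_INDEX = new AtomicInteger();
    private static final Object UNSET = new Object();
    // Assumption for the sketch: per-thread array held in a JDK ThreadLocal.
    private static final ThreadLocal<Object[]> TABLE =
            ThreadLocal.withInitial(() -> newTable(32));

    private final int index = NEXT_INDEX.getAndIncrement(); // unique per instance

    private static Object[] newTable(int size) {
        Object[] a = new Object[size];
        Arrays.fill(a, UNSET);
        return a;
    }

    int index() {
        return index;
    }

    @SuppressWarnings("unchecked")
    V get() {
        Object[] t = TABLE.get();
        Object v = index < t.length ? t[index] : UNSET; // one array access, no probing
        return v == UNSET ? null : (V) v;
    }

    void set(V value) {
        Object[] t = TABLE.get();
        if (index >= t.length) {
            // Grow to the next power of two above index; new slots start as UNSET.
            int newSize = Integer.highestOneBit(index) << 1;
            Object[] grown = Arrays.copyOf(t, newSize);
            Arrays.fill(grown, t.length, newSize, UNSET);
            TABLE.set(grown);
            t = grown;
        }
        t[index] = value;
    }
}
```

One consequence of this design is that indices are handed out per instance and never reused, so every thread's array grows with the total number of instances ever created; that is one more reason to declare such variables static.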
private final int index;
public FastThreadLocal() {
index = InternalThreadLocalMap.nextVariableIndex();
}
Each FastThreadLocal instance corresponds to a globally unique index.
public static int nextVariableIndex() {
int index = nextIndex.getAndIncrement();
if (index < 0) {
nextIndex.decrementAndGet();
throw new IllegalStateException("too many thread-local indexed variables");
}
return index;
}
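If the counter overflows the int range and wraps to a negative value, the increment is undone and an exception is thrown.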
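The guard can be exercised directly if the counter is injected with a starting value near Integer.MAX_VALUE. IndexAllocator is a hypothetical class for this demo; the method body mirrors nextVariableIndex().

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the overflow guard in nextVariableIndex(), with an injectable
// starting value so the wrap-around can be triggered.
final class IndexAllocator {
    private final AtomicInteger next;

    IndexAllocator(int start) {
        next = new AtomicInteger(start);
    }

    int nextVariableIndex() {
        int index = next.getAndIncrement();
        if (index < 0) { // wrapped past Integer.MAX_VALUE
            // Undo the increment so the counter stays pinned here and every later
            // call keeps failing, instead of creeping back toward 0 and reusing indices.
            next.decrementAndGet();
            throw new IllegalStateException("too many thread-local indexed variables");
        }
        return index;
    }
}
```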
One more thing worth noting is a static field in FastThreadLocal:
private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
It is allocated an index too, so it also occupies a slot in the array.
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
Set<FastThreadLocal<?>> variablesToRemove;
if (v == InternalThreadLocalMap.UNSET || v == null) {
variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
} else {
variablesToRemove = (Set<FastThreadLocal<?>>) v;
}
variablesToRemove.add(variable);
}
So that slot holds a Set<FastThreadLocal<?>>, a set of FastThreadLocal instances. What exactly goes into it?
public final void set(InternalThreadLocalMap threadLocalMap, V value) {
if (value != InternalThreadLocalMap.UNSET) {
setKnownNotUnset(threadLocalMap, value);
} else {
remove(threadLocalMap);
}
}
private boolean setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
if (threadLocalMap.setIndexedVariable(index, value)) {
addToVariablesToRemove(threadLocalMap, this);
return true;
}
return false;
}
public boolean setIndexedVariable(int index, Object value) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object oldValue = lookup[index];
lookup[index] = value;
return oldValue == UNSET;
} else {
expandIndexedVariableTableAndSet(index, value);
return true;
}
}
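So whenever a slot that was previously UNSET receives a value (setIndexedVariable returns true), the corresponding FastThreadLocal is added to this set.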
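A reduced sketch of this bookkeeping, with hypothetical names (RegistrySketch, set): the variable is recorded only on the UNSET-to-value transition, so repeated sets do not re-register it, and storing null still counts as a value. The set is backed by an IdentityHashMap, presumably so that membership is decided by reference, as with ThreadLocal keys, regardless of any equals/hashCode a subclass might define.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Sketch: register a variable for later cleanup only when its slot goes
// from UNSET to a value (setIndexedVariable returning true).
final class RegistrySketch {
    static final Object UNSET = new Object();

    final Object[] slots = new Object[32];
    final Set<Object> variablesToRemove =
            Collections.newSetFromMap(new IdentityHashMap<Object, Boolean>());

    RegistrySketch() {
        Arrays.fill(slots, UNSET);
    }

    // True if this call created the entry (the old value was UNSET).
    boolean setIndexedVariable(int index, Object value) {
        Object old = slots[index];
        slots[index] = value;
        return old == UNSET;
    }

    void set(Object variable, int index, Object value) {
        if (setIndexedVariable(index, value)) {
            variablesToRemove.add(variable); // first set only
        }
    }
}
```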
public final void remove(InternalThreadLocalMap threadLocalMap) {
if (threadLocalMap == null) {
return;
}
Object v = threadLocalMap.removeIndexedVariable(index);
removeFromVariablesToRemove(threadLocalMap, this);
if (v != InternalThreadLocalMap.UNSET) {
try {
onRemoval((V) v);
} catch (Exception e) {
PlatformDependent.throwException(e);
}
}
}
private static void removeFromVariablesToRemove(
InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
if (v == InternalThreadLocalMap.UNSET || v == null) {
return;
}
@SuppressWarnings("unchecked")
Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
variablesToRemove.remove(variable);
}
public Object removeIndexedVariable(int index) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object v = lookup[index];
lookup[index] = UNSET;
return v;
} else {
return UNSET;
}
}
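When FastThreadLocal.remove is called, it also removes itself from the set. In addition, if the cleared slot actually held a value, remove calls the onRemoval() hook, which subclasses can override; any exception the hook throws is rethrown via PlatformDependent.throwException.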
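The hook pattern in isolation: clear the slot first, then call the hook only if something was stored. HookedLocal and ClosingLocal are hypothetical classes with a single slot and no threading, just enough to show when onRemoval() fires.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base class: remove() clears the slot and calls onRemoval()
// only if the slot actually held a value.
class HookedLocal<V> {
    private static final Object UNSET = new Object();
    private Object slot = UNSET; // single slot, single thread: sketch only

    void set(V value) {
        slot = value;
    }

    @SuppressWarnings("unchecked")
    final void remove() {
        Object v = slot;
        slot = UNSET;
        if (v != UNSET) {
            onRemoval((V) v);
        }
    }

    // Subclasses override this to release resources tied to the value.
    protected void onRemoval(V value) {
    }
}

final class ClosingLocal extends HookedLocal<String> {
    final List<String> removed = new ArrayList<>();

    @Override
    protected void onRemoval(String value) {
        removed.add(value); // e.g. close a connection or return a buffer
    }
}
```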
public static void removeAll() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
if (threadLocalMap == null) {
return;
}
try {
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
if (v != null && v != InternalThreadLocalMap.UNSET) {
@SuppressWarnings("unchecked")
Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
FastThreadLocal<?>[] variablesToRemoveArray =
variablesToRemove.toArray(new FastThreadLocal[variablesToRemove.size()]);
for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
tlv.remove(threadLocalMap);
}
}
} finally {
InternalThreadLocalMap.remove();
}
}
This set is what makes removeAll straightforward: it only has to call remove on every element of the set.
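Note that removeAll copies the set into an array before looping. Each remove call also deletes the variable from the same set (via removeFromVariablesToRemove), so iterating the live set while modifying it would risk a ConcurrentModificationException; that is presumably why the snapshot is taken. A reduced sketch with hypothetical names:

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Sketch: remove() mutates the tracking set, so removeAll() iterates over
// a snapshot array rather than over the set itself.
final class RemoveAllSketch {
    final Set<Object> variablesToRemove =
            Collections.newSetFromMap(new IdentityHashMap<Object, Boolean>());

    void remove(Object variable) {
        // (a real version would also clear the variable's slot here)
        variablesToRemove.remove(variable); // modifies the set
    }

    void removeAll() {
        Object[] snapshot = variablesToRemove.toArray(); // copy first
        for (Object v : snapshot) {
            remove(v); // safe: we are iterating the copy, not the set
        }
    }
}
```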
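InternalThreadLocalMap also uses a BitSet named cleanerFlags to record which indices have already been registered for cleanup, and a background daemon thread is responsible for the actual cleanup.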
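The daemon-thread cleanup can be illustrated with the general technique of a ReferenceQueue drained by a background thread: when an owner object (for example a Thread) becomes unreachable, its phantom reference is enqueued and the daemon runs the associated task. This is a minimal sketch under that assumption; DaemonCleaner, CleanupRef and register are hypothetical names, not Netty's cleaner API. In the test, Reference.enqueue() is called by hand to simulate the GC, so the result does not depend on when collection happens.

```java
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of a daemon-thread cleaner: tasks run after their owner's
// phantom reference is enqueued.
final class DaemonCleaner {
    private static final ReferenceQueue<Object> QUEUE = new ReferenceQueue<>();
    // References must stay strongly reachable until processed, otherwise
    // they could be collected before they are ever enqueued.
    private static final Set<CleanupRef> PENDING = ConcurrentHashMap.newKeySet();

    static final class CleanupRef extends PhantomReference<Object> {
        private final Runnable task;

        CleanupRef(Object owner, Runnable task) {
            super(owner, QUEUE);
            this.task = task;
        }
    }

    static {
        Thread worker = new Thread(() -> {
            while (true) {
                try {
                    CleanupRef ref = (CleanupRef) QUEUE.remove(); // blocks until enqueued
                    PENDING.remove(ref);
                    ref.task.run();
                } catch (InterruptedException e) {
                    return;
                } catch (RuntimeException e) {
                    // keep the cleaner alive if one task fails
                }
            }
        }, "cleaner-sketch");
        worker.setDaemon(true); // never keeps the JVM alive
        worker.start();
    }

    static CleanupRef register(Object owner, Runnable task) {
        CleanupRef ref = new CleanupRef(owner, task);
        PENDING.add(ref);
        return ref;
    }
}
```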
One more point: besides pre-filling each thread's table with 32 UNSET entries at construction, InternalThreadLocalMap is also deliberately padded with unused fields:
// Cache line padding (must be public)
// With CompressedOops enabled, an instance of this class should occupy at least 128 bytes.
public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;
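The comment says that with compressed oops enabled, an instance of this class should occupy at least 128 bytes. The book The Art of Java Concurrency Programming (《Java并发编程的艺术》) describes a similar trick in JDK 7's LinkedTransferQueue. The goal is to append padding so that the object fills whole cache lines on multiprocessor machines; this keeps unrelated hot data from sharing the same cache line, where a write by one core would invalidate the line for every other core (false sharing), and so improves concurrency.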
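The Unpadded/padded split in the source follows a common pattern: the real fields live in a base class and the padding longs are declared in a subclass. In HotSpot, superclass fields are typically laid out before subclass fields, so the padding ends up after the hot data, although field layout is implementation-defined and not guaranteed by the JVM specification. A hypothetical minimal example of that shape (UnpaddedCounter and PaddedCounter are illustrative names):

```java
// Hypothetical sketch of padding via a class hierarchy, the same shape as
// UnpaddedInternalThreadLocalMap -> InternalThreadLocalMap.
class UnpaddedCounter {
    volatile long value; // the hot field
}

final class PaddedCounter extends UnpaddedCounter {
    // Never read; they only occupy space after the hot field so that an
    // object allocated right after this one is less likely to share its
    // cache line. 7 longs = 56 bytes.
    public long p1, p2, p3, p4, p5, p6, p7;
}
```

Whether the padding pays off can only be judged by measuring under real contention; the sketch shows the layout idea, not a benchmark.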
In the end, it is simply the space-for-time trade-off taken one step further, which brings get down to O(1).