Netty版本为4.1,
1. 存储结构InternalThreadLocalMap以及他的父类UnpaddedInternalThreadLocalMap,并且改用数组来存储线程相关数据,而JDK的ThreadLocal是用类似Map的形式,数组下标形式扩容和增删改查很迅速,nextIndex主要作为全局计数,也就是线程存储的数据具体在线程InternalThreadLocalMap属性数组中的下标。slowThreadLocalMap也提供了JDK原生的给用户选择,用来存储自身InternalThreadLocalMap。indexedVariables是存储线程数据的数组。
class UnpaddedInternalThreadLocalMap { static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>(); static final AtomicInteger nextIndex = new AtomicInteger(); /** Used by {@link FastThreadLocal} */ Object[] indexedVariables; // Core thread-locals int futureListenerStackDepth; int localChannelReaderStackDepth; Map<Class<?>, Boolean> handlerSharableCache; IntegerHolder counterHashCode; ThreadLocalRandom random; Map<Class<?>, TypeParameterMatcher> typeParameterMatcherGetCache; Map<Class<?>, Map<String, TypeParameterMatcher>> typeParameterMatcherFindCache; // String-related thread-locals StringBuilder stringBuilder; Map<Charset, CharsetEncoder> charsetEncoderCache; Map<Charset, CharsetDecoder> charsetDecoderCache; // ArrayList-related thread-locals ArrayList<Object> arrayList; UnpaddedInternalThreadLocalMap(Object[] indexedVariables) { this.indexedVariables = indexedVariables; } }
当InternalThreadLocalMap初始化时会先创建数组元素个数为32的充满Object UNSET = new Object()对象的数据,也是以后找数据的一个默认填充。
private InternalThreadLocalMap() { super(newIndexedVariableTable()); } private static Object[] newIndexedVariableTable() { Object[] array = new Object[32]; Arrays.fill(array, UNSET); return array; }
获取数组对应的下标
public static int nextVariableIndex() { int index = nextIndex.getAndIncrement(); if (index < 0) { nextIndex.decrementAndGet(); throw new IllegalStateException("too many thread-local indexed variables"); } return index; }
2. FastThreadLocal,静态变量variablesToRemoveIndex从类加载初始化时已经确定了该变量的值,也意味着存储着该线程所有FastThreadLocal数据的数组下标
private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
初始化FastThreadLocal,确定该对象处于线程数组的下标位置index
public FastThreadLocal() { index = InternalThreadLocalMap.nextVariableIndex(); }
3. set方法,当设置的值为UNSET时是删除,否则是增加。根据线程类型获取不同的ThreadLocal,这个时候就会初始化数组indexedVariables
public final void set(V value) { if (value != InternalThreadLocalMap.UNSET) { InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); if (setKnownNotUnset(threadLocalMap, value)) { registerCleaner(threadLocalMap); } } else { remove(); } }
public static InternalThreadLocalMap get() { Thread thread = Thread.currentThread(); if (thread instanceof FastThreadLocalThread) { return fastGet((FastThreadLocalThread) thread); } else { return slowGet(); } } private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) { InternalThreadLocalMap threadLocalMap = thread.threadLocalMap(); if (threadLocalMap == null) { thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap()); } return threadLocalMap; } private static InternalThreadLocalMap slowGet() { ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap; InternalThreadLocalMap ret = slowThreadLocalMap.get(); if (ret == null) { ret = new InternalThreadLocalMap(); slowThreadLocalMap.set(ret); } return ret; }
增加数据,根据FastThreadLocal对象的index来设置到数组相应的位置上,如果index超出会进行扩容。
private boolean setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) { if (threadLocalMap.setIndexedVariable(index, value)) { addToVariablesToRemove(threadLocalMap, this); return true; } return false; }
public boolean setIndexedVariable(int index, Object value) { Object[] lookup = indexedVariables; if (index < lookup.length) { Object oldValue = lookup[index]; lookup[index] = value; return oldValue == UNSET; } else { expandIndexedVariableTableAndSet(index, value); return true; } } private void expandIndexedVariableTableAndSet(int index, Object value) { Object[] oldArray = indexedVariables; final int oldCapacity = oldArray.length; int newCapacity = index; newCapacity |= newCapacity >>> 1; newCapacity |= newCapacity >>> 2; newCapacity |= newCapacity >>> 4; newCapacity |= newCapacity >>> 8; newCapacity |= newCapacity >>> 16; newCapacity ++; Object[] newArray = Arrays.copyOf(oldArray, newCapacity); Arrays.fill(newArray, oldCapacity, newArray.length, UNSET); newArray[index] = value; indexedVariables = newArray; }
最后把该FastThreadLocal设置到数组variablesToRemoveIndex下标的集合中,以待后续集中删除等等,只有这个下标是存储的Set集合,其他的下标存储的是FastThreadLocal或者UNSET对象。
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) { Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); Set<FastThreadLocal<?>> variablesToRemove; if (v == InternalThreadLocalMap.UNSET || v == null) { variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>()); threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); } else { variablesToRemove = (Set<FastThreadLocal<?>>) v; } variablesToRemove.add(variable); }
删除数据,getIfSet不需要默认值,
public final void remove() { remove(InternalThreadLocalMap.getIfSet()); }
public static InternalThreadLocalMap getIfSet() { Thread thread = Thread.currentThread(); if (thread instanceof FastThreadLocalThread) { return ((FastThreadLocalThread) thread).threadLocalMap(); } return slowThreadLocalMap.get(); }
删除对应的数组下标位置的数据,也就是把该数据置为UNSET,这里提供一个删除后处理扩展onRemoval
public final void remove(InternalThreadLocalMap threadLocalMap) { if (threadLocalMap == null) { return; } Object v = threadLocalMap.removeIndexedVariable(index); removeFromVariablesToRemove(threadLocalMap, this); if (v != InternalThreadLocalMap.UNSET) { try { onRemoval((V) v); } catch (Exception e) { PlatformDependent.throwException(e); } } }
public Object removeIndexedVariable(int index) { Object[] lookup = indexedVariables; if (index < lookup.length) { Object v = lookup[index]; lookup[index] = UNSET; return v; } else { return UNSET; } }
最后再从那个集合中删除掉该FastThreadLocal。
private static void removeFromVariablesToRemove( InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) { Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); if (v == InternalThreadLocalMap.UNSET || v == null) { return; } @SuppressWarnings("unchecked") Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v; variablesToRemove.remove(variable); }4. get方法,获取数组对应下标的数据,当数据为默认值UNSET时,执行初始化操作initialize,用户可以覆盖该方法initialValue,最后把该数据放到数组对应的下标index,并且加入到set集合中。
public final V get() { InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); Object v = threadLocalMap.indexedVariable(index); if (v != InternalThreadLocalMap.UNSET) { return (V) v; } V value = initialize(threadLocalMap); registerCleaner(threadLocalMap); return value; }
private V initialize(InternalThreadLocalMap threadLocalMap) { V v = null; try { v = initialValue(); } catch (Exception e) { PlatformDependent.throwException(e); } threadLocalMap.setIndexedVariable(index, v); addToVariablesToRemove(threadLocalMap, this); return v; }