1. 概要
FastThreadLocal是Netty对JDK提供的ThreadLocal的一种改良替代实现。
2. 源码实现
直接进入源码实现品析环节。 理解FastThreadLocal需要看的源码涉及到以下几个类
- FastThreadLocal
- UnpaddedInternalThreadLocalMap InternalThreadLocalMap的父类
- InternalThreadLocalMap
- FastThreadLocalThread
- FastThreadLocalRunnable
2.1 FastThreadLocalThread
FastThreadLocalThread是Netty中与FastThreadLocal配套使用的,它继承了Thread类,并包含以下2个字段
1
2
3
4
// 用于标记该FastThreadLocalThread在run方法执行完毕后是否会清理FastThreadLocals
private final boolean cleanupFastThreadLocals;
// 每个FastThreadLocalThread都有一个类型为InternalThreadLocalMap的实例
private InternalThreadLocalMap threadLocalMap;
cleanupFastThreadLocals对于FastThreadLocalThread所有带有Runnable的构造方法都会设置为true,因为构造方法中会将参数中的Runnable实例包装为FastThreadLocalRunnable。 FastThreadLocalRunnable就是在内部被包装的Runnable实例的run方法完成后,再执行FastThreadLocal.removeAll()。
FastThreadLocal的很多API都会根据当前线程是否为FastThreadLocalThread来分类实现具体逻辑。
2.2 InternalThreadLocalMap
InternalThreadLocalMap可以视作Netty中对于JDK的ThreadLocalMap的替代品。 Netty中所有FastThreadLocal的读写最终都是调用InternalThreadLocalMap的。 那么这里很容易产生一个问题: 对于定制化的FastThreadLocalThread,因为其中包含了一个InternalThreadLocalMap的实例,我们可以直接获取到。 那么普通的Thread,我们怎么将其与InternalThreadLocalMap进行关联呢?
答案就是:用一个ThreadLocal维护普通Thread对应的InternalThreadLocalMap。
1
2
3
4
class UnpaddedInternalThreadLocalMap {
static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
// 省略
}
下面就来看一下如何获取当前线程对应的InternalThreadLocalMap。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
// 判断当前线程是否为FastThreadLocalThread及其子类
if (thread instanceof FastThreadLocalThread) {
return fastGet((FastThreadLocalThread) thread);
} else {
return slowGet();
}
}
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
if (threadLocalMap == null) {
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}
private static InternalThreadLocalMap slowGet() {
ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
// 从slowThreadLocalMap中获取当前线程对应的InternalThreadLocalMap,如果为null则初始化之。
InternalThreadLocalMap ret = slowThreadLocalMap.get();
if (ret == null) {
ret = new InternalThreadLocalMap();
slowThreadLocalMap.set(ret);
}
return ret;
}
2.2.1 存储方式
先来回顾一下JDK的ThreadLocal的实现:ThreadLocalMap采用开放地址法,线性探测定位元素位置。每个ThreadLocal的编号的间隔为0x61c88647,尽可能保证ThreadLocal能够均匀分布在哈希表中。而Netty则采用了一种简单粗暴的方式,直接为每个FastThreadLocal顺序编号。直接根据编号在数组中获取某个FastThreadLocal存储的值。 这个用于保存值的数组,定义在UnpaddedInternalThreadLocalMap中,UnpaddedInternalThreadLocalMap是InternalThreadLocalMap的父类
1
Object[] indexedVariables;
2.2.2 垃圾清理
JDK的ThreadLocal针对无效ThreadLocal的清理,做法是ThreadLocalMap中键的类型使用弱引用,那么对于某个ThreadLocal成为垃圾对象的时候,ThreadLocal的API如get/set/remove可以探测到这种情况进行回收。 这类回收方式相对来说比较被动,如果用户代码不去主动调用get/set/remove,则可能有大对象会一直持有在ThreadLocalMap的Entry中。
我们可以根据线程类型分两类进行分析:
- FastThreadLocalThread 这类线程由于对参数的Runnable进行了包装,在线程执行完任务后,会调用FastThreadLocal#removeAll来释放该线程所有FastThreadLocal。
- 普通Thread 这类线程调用set或者get触发initialize操作时,都会调用registerCleaner方法将清理该FastThreadLocal的任务注册到对象收集器ObjectCleaner中。当然,为了避免同一个FastThreadLocal反复注册清理任务,每个FastThreadLocal都会有一个index(cleanerFlagIndex)来标记该FastThreadLocal是否已经注册到了对象收集器ObjectCleaner中。举例来说,假设某个FastThreadLocal的cleanerFlagIndex为97,则每个线程取出自己的InternalThreadLocalMap后,只要看第97号位置元素,就可以知道收集该FastThreadLocal的任务是否已经注册到ObjectCleaner中。
ObjectCleaner用低优先级的后台守护线程轮询reference queue是否有线程对象被回收了,如果有则调用FastThreadLocal#remove方法释放该FastThreadLocal。
不同于JDK,Netty采用了一种相对主动的垃圾清理机制。对于FastThreadLocalThread则采用任务执行完毕后主动清理该线程所有的FastThreadLocal对应的值,对于普通Thread,则通过注册垃圾收集任务,在Thread对象被GC后,由ObjectCleaner轮询到后,调用清理任务清理FastThreadLocal。
下面就是清理方法的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final void remove(InternalThreadLocalMap threadLocalMap) {
if (threadLocalMap == null) {
return;
}
Object v = threadLocalMap.removeIndexedVariable(index);
removeFromVariablesToRemove(threadLocalMap, this);
if (v != InternalThreadLocalMap.UNSET) {
try {
onRemoval((V) v);
} catch (Exception e) {
PlatformDependent.throwException(e);
}
}
}
private static void removeFromVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
if (v == InternalThreadLocalMap.UNSET || v == null) {
return;
}
@SuppressWarnings("unchecked")
Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
variablesToRemove.remove(variable);
}
2.2.3 扩容机制
当InternalThreadLocalMap内部的indexedVariables不够放新的FastThreadLocal的值的时候,会进行扩容。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public boolean setIndexedVariable(int index, Object value) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object oldValue = lookup[index];
lookup[index] = value;
return oldValue == UNSET;
} else {
expandIndexedVariableTableAndSet(index, value);
return true;
}
}
/**
* 返回大于原table size的最小的2的幂。
*/
private void expandIndexedVariableTableAndSet(int index, Object value) {
Object[] oldArray = indexedVariables;
final int oldCapacity = oldArray.length;
int newCapacity = index;
newCapacity |= newCapacity >>> 1;
newCapacity |= newCapacity >>> 2;
newCapacity |= newCapacity >>> 4;
newCapacity |= newCapacity >>> 8;
newCapacity |= newCapacity >>> 16;
newCapacity ++;
Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
newArray[index] = value;
indexedVariables = newArray;
}
expandIndexedVariableTableAndSet的作用是得出大于原table size的最小的2的幂,具体做法是采用位运算,将原size最左边的1后面的位全部置为1,此时再+1即为大于原size最小的2的幂。 这与JDK中的HashMap的tableSizeFor类似,不同在于tableSizeFor求的是大于等于入参的最小的2的幂,所以tableSizeFor会先将入参减去1,将大于等于参数化为大于参数减1来计算。