Home Netty的FastThreadLocal实现源码解析
Post
Cancel

Netty的FastThreadLocal实现源码解析

1. 概要

FastThreadLocal是Netty对JDK提供的ThreadLocal的一种改良替代实现。

2. 源码实现

直接进入源码实现品析环节。 理解FastThreadLocal需要看的源码涉及到以下几个类

  • FastThreadLocal
  • UnpaddedInternalThreadLocalMap InternalThreadLocalMap的父类
  • InternalThreadLocalMap
  • FastThreadLocalThread
  • FastThreadLocalRunnable

2.1 FastThreadLocalThread

FastThreadLocalThread是Netty中与FastThreadLocal配套使用的,它继承了Thread类,并包含以下2个字段

1
2
3
4
// 用于标记该FastThreadLocalThread在run方法执行完毕后是否会清理FastThreadLocals
private final boolean cleanupFastThreadLocals;
// 每个FastThreadLocalThread都有一个类型为InternalThreadLocalMap的实例
private InternalThreadLocalMap threadLocalMap;

cleanupFastThreadLocals对于FastThreadLocalThread所有带有Runnable的构造方法都会设置为true,因为构造方法中会将参数中的Runnable实例包装为FastThreadLocalRunnable。 FastThreadLocalRunnable就是在内部被包装的Runnable实例的run方法完成后,再执行FastThreadLocal.removeAll()

FastThreadLocal的很多API都会根据当前线程是否为FastThreadLocalThread来分类实现具体逻辑。

2.2 InternalThreadLocalMap

InternalThreadLocalMap可以视作Netty中对于JDK的ThreadLocalMap的替代品。 Netty中所有FastThreadLocal的读写最终都是调用InternalThreadLocalMap的。 那么这里很容易产生一个问题: 对于定制化的FastThreadLocalThread,因为其中包含了一个InternalThreadLocalMap的实例,我们可以直接获取到。 那么普通的Thread,我们怎么将其与InternalThreadLocalMap进行关联呢?

答案就是:用一个ThreadLocal维护普通Thread对应的InternalThreadLocalMap。

1
2
3
4
class UnpaddedInternalThreadLocalMap {
    static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
    // 省略
}

下面就来看一下如何获取当前线程对应的InternalThreadLocalMap。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
 public static InternalThreadLocalMap get() {
    Thread thread = Thread.currentThread();
    // 判断当前线程是否为FastThreadLocalThread及其子类
    if (thread instanceof FastThreadLocalThread) {
        return fastGet((FastThreadLocalThread) thread);
    } else {
        return slowGet();
    }
}

private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
    InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
    if (threadLocalMap == null) {
        thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
    }
    return threadLocalMap;
}

private static InternalThreadLocalMap slowGet() {
    ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
    // 从slowThreadLocalMap中获取当前线程对应的InternalThreadLocalMap,如果为null则初始化之。
    InternalThreadLocalMap ret = slowThreadLocalMap.get();
    if (ret == null) {
        ret = new InternalThreadLocalMap();
        slowThreadLocalMap.set(ret);
    }
    return ret;
}

2.2.1 存储方式

先来回顾一下JDK的ThreadLocal的实现:ThreadLocalMap采用开放地址法,线性探测定位元素位置。每个ThreadLocal的编号的间隔为0x61c88647,尽可能保证ThreadLocal能够均匀分布在哈希表中。而Netty则采用了一种简单粗暴的方式,直接为每个FastThreadLocal顺序编号。直接根据编号在数组中获取某个FastThreadLocal存储的值。 这个用于保存值的数组,定义在UnpaddedInternalThreadLocalMap中,UnpaddedInternalThreadLocalMap是InternalThreadLocalMap的父类

1
Object[] indexedVariables;

2.2.2 垃圾清理

JDK的ThreadLocal针对无效ThreadLocal的清理,做法是ThreadLocalMap中键的类型使用弱引用,那么对于某个ThreadLocal成为垃圾对象的时候,ThreadLocal的API如get/set/remove可以探测到这种情况进行回收。 这类回收方式相对来说比较被动,如果用户代码不去主动调用get/set/remove,则可能有大对象会一直持有在ThreadLocalMap的Entry中。

我们可以根据线程类型分两类进行分析:

  • FastThreadLocalThread 这类线程由于对参数的Runnable进行了包装,在线程执行完任务后,会调用FastThreadLocal#removeAll来释放该线程所有FastThreadLocal。
  • 普通Thread 这类线程调用set或者get触发initialize操作时,都会调用registerCleaner方法将清理该FastThreadLocal的任务注册到对象收集器ObjectCleaner中。当然,为了避免同一个FastThreadLocal反复注册清理任务,每个FastThreadLocal都会有一个index(cleanerFlagIndex)来标记该FastThreadLocal是否已经注册到了对象收集器ObjectCleaner中。举例来说,假设某个FastThreadLocal的cleanerFlagIndex为97,则每个线程取出自己的InternalThreadLocalMap后,只要看第97号位置元素,就可以知道收集该FastThreadLocal的任务是否已经注册到ObjectCleaner中。

ObjectCleaner用低优先级的后台守护线程轮询reference queue是否有线程对象被回收了,如果有则调用FastThreadLocal#remove方法释放该FastThreadLocal。

不同于JDK,Netty采用了一种相对主动的垃圾清理机制。对于FastThreadLocalThread则采用任务执行完毕后主动清理该线程所有的FastThreadLocal对应的值,对于普通Thread,则通过注册垃圾收集任务,在Thread对象被GC后,由ObjectCleaner轮询到后,调用清理任务清理FastThreadLocal。

下面就是清理方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final void remove(InternalThreadLocalMap threadLocalMap) {
    if (threadLocalMap == null) {
        return;
    }

    Object v = threadLocalMap.removeIndexedVariable(index);
    removeFromVariablesToRemove(threadLocalMap, this);

    if (v != InternalThreadLocalMap.UNSET) {
        try {
            onRemoval((V) v);
        } catch (Exception e) {
            PlatformDependent.throwException(e);
        }
    }
}

private static void removeFromVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {

    Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);

    if (v == InternalThreadLocalMap.UNSET || v == null) {
        return;
    }

    @SuppressWarnings("unchecked")
    Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
    variablesToRemove.remove(variable);
}

2.2.3 扩容机制

当InternalThreadLocalMap内部的indexedVariables不够放新的FastThreadLocal的值的时候,会进行扩容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public boolean setIndexedVariable(int index, Object value) {
    Object[] lookup = indexedVariables;
    if (index < lookup.length) {
        Object oldValue = lookup[index];
        lookup[index] = value;
        return oldValue == UNSET;
    } else {
        expandIndexedVariableTableAndSet(index, value);
        return true;
    }
}

/**
 * 返回大于原table size的最小的2的幂。
 */
private void expandIndexedVariableTableAndSet(int index, Object value) {
    Object[] oldArray = indexedVariables;
    final int oldCapacity = oldArray.length;
    int newCapacity = index;
    newCapacity |= newCapacity >>>  1;
    newCapacity |= newCapacity >>>  2;
    newCapacity |= newCapacity >>>  4;
    newCapacity |= newCapacity >>>  8;
    newCapacity |= newCapacity >>> 16;
    newCapacity ++;

    Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
    Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
    newArray[index] = value;
    indexedVariables = newArray;
}

expandIndexedVariableTableAndSet的作用是得出大于原table size的最小的2的幂,具体做法是采用位运算,将原size最左边的1后面的位全部置为1,此时再+1即为大于原size最小的2的幂。 这与JDK中的HashMap的tableSizeFor类似,不同在于tableSizeFor求的是大于等于入参的最小的2的幂,所以tableSizeFor会先将入参减去1,将大于等于参数化为大于参数减1来计算。

This post is licensed under CC BY 4.0 by the author.
Contents

记录线上一次线程hang住问题

GC Ergonomics间接引发的锁等待超时问题排查分析