• 深入理解 Netty FastThreadLocal


    作者:vivo 互联网服务器团队- Jiang Zhu

    本文以线上诡异问题为切入点,通过对比JDK ThreadLocal和Netty FastThreadLocal实现逻辑以及优缺点,并深入解读源码,由浅入深理解Netty FastThreadLocal。

    一、前言

    最近在学习Netty相关的知识,在看到Netty FastThreadLocal章节中,回想起一起线上诡异问题。

    • 问题描述:外销业务获取用户信息判断是否支持https场景下,获取的用户信息有时候竟然是错乱的。

    • 问题分析:使用ThreadLocal保存用户信息时,未能及时进行remove()操作,而Tomcat工作线程是基于线程池的,会出现线程重用情况,所以获取的用户信息可能是之前线程遗留下来的。

    • 问题修复:ThreadLocal使用完之后及时remove()、ThreadLocal使用之前也进行remove()双重保险操作。

    接下来,我们继续深入了解下JDK ThreadLocal和Netty FastThreadLocal吧。

    二、JDK ThreadLocal介绍

    ThreadLocal是JDK提供的一个方便对象在本线程内不同方法中传递、获取的类。用它定义的变量,仅在本线程中可见,不受其他线程的影响,与其他线程相互隔离

    那具体是如何实现的呢?如图1所示,每个线程都会有个ThreadLocalMap实例变量,其采用懒加载的方式进行创建,当线程第一次访问此变量时才会去创建。

    ThreadLocalMap使用线性探测法存储ThreadLocal对象及其维护的数据,具体操作逻辑如下:

    • 假设有一个新的ThreadLocal对象,通过hash计算它应存储的位置下标为x。

    • 此时发现下标x对应位置已经存储了其他的ThreadLocal对象,则它会往后寻找,步长为1,下标变更为x+1。

    • 接下来发现下标x+1对应位置也已经存储了其他的ThreadLocal对象,同理则它会继续往后寻找,下标变更为x+2。

    • 直到寻找到下标为x+3时发现是空闲的,然后将该ThreadLocal对象及其维护的数据构建一个entry对象存储在x+3位置。

    在ThreadLocalMap中数据很多的情况下,很容易出现hash冲突,解决冲突需要不断的向下遍历,该操作的时间复杂度为O(n),效率较低

    图片

    图1

    从下面的代码中可以看出:

    Entry 的 key 是弱引用,value 是强引用。在 JVM 垃圾回收时,只要发现弱引用的对象,不管内存是否充足,都会被回收。

    但是当 ThreadLocal 不再使用被 GC 回收后,ThreadLocalMap 中可能出现 Entry 的 key 为 NULL,那么 Entry 的 value 一直会强引用数据而得不到释放,只能等待线程销毁,从而造成内存泄漏

    static class ThreadLocalMap {
        // 弱引用,在资源紧张的时候可以回收部分不再引用的ThreadLocal变量
        static class Entry extends WeakReference> {
            // 当前ThreadLocal对象所维护的数据
            Object value;
     
            Entry(ThreadLocal k, Object v) {
                super(k);
                value = v;
            }
        }
        // 省略其他代码
    }

    综上所述,既然JDK提供的ThreadLocal可能存在效率较低和内存泄漏的问题,为啥不做相应的优化和改造呢?

    1. 从ThreadLocal类注释看,它是JDK1.2版本引入的,早期可能不太关注程序的性能。

    2. 大部分多线程场景下,线程中的ThreadLocal变量较少,因此出现hash冲突的概率相对较小,及时偶尔出现了hash冲突,对程序的性能影响也相对较小。

    3. 对于内存泄漏问题,ThreadLocal本身已经做了一定的保护措施。作为使用者,在线程中某个ThreadLocal对象不再使用或出现异常时,立即调用 remove() 方法删除 Entry 对象,养成良好的编码习惯。

    三、Netty FastThreadLocal介绍

    FastThreadLocal是Netty中对JDK提供的ThreadLocal优化改造版本,从名称上来看,它应该比ThreadLocal更快了,以应对Netty处理并发量大、数据吞吐量大的场景。

    那具体是如何实现的呢?如图2所示,每个线程都会有个InternalThreadLocalMap实例变量。

    每个FastThreadLocal实例创建时,都会采用AtomicInteger保证顺序递增生成一个不重复的下标index,它是该FastThreadLocal对象维护的数据应该存储的位置。

    读写数据的时候通过FastThreadLocal的下标 index 直接定位到该FastThreadLocal的位置,时间复杂度为 O(1),效率较高。

    如果该下标index递增到特别大,InternalThreadLocalMap维护的数组也会特别大,所以FastThreadLocal是通过空间换时间来提升读写性能的。

    图片

    图2

    四、Netty FastThreadLocal源码分析

    4.1 构造方法

    public class FastThreadLocal {
        // FastThreadLocal中的index是记录了该它维护的数据应该存储的位置
        // InternalThreadLocalMap数组中的下标, 它是在构造函数中确定的
        private final int index;
     
        public InternalThreadLocal() {
            index = InternalThreadLocalMap.nextVariableIndex();
        }
        // 省略其他代码
    }

     

    public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
        // 自增索引, ⽤于计算下次存储到Object数组中的位置
        private static final AtomicInteger nextIndex = new AtomicInteger();
     
        private static final int ARRAY_LIST_CAPACITY_MAX_SIZE = Integer.MAX_VALUE - 8;
     
        public static int nextVariableIndex() {
            int index = nextIndex.getAndIncrement();
            if (index >= ARRAY_LIST_CAPACITY_MAX_SIZE || index < 0) {
                nextIndex.set(ARRAY_LIST_CAPACITY_MAX_SIZE);
                throw new IllegalStateException("too many thread-local indexed variables");
            }
            return index;
        }
        // 省略其他代码
    }

    上面这两段代码在Netty FastThreadLocal介绍中已经讲解过,这边就不再重复介绍了。

    4.2 get 方法

    public class FastThreadLocal {
        // FastThreadLocal中的index是记录了该它维护的数据应该存储的位置
        private final int index;
     
        public final V get() {
            // 获取当前线程的InternalThreadLocalMap
            InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
            // 根据当前线程的index从InternalThreadLocalMap中获取其绑定的数据
            Object v = threadLocalMap.indexedVariable(index);
            // 如果获取当前线程绑定的数据不为缺省值UNSET,则直接返回;否则进行初始化
            if (v != InternalThreadLocalMap.UNSET) {
                return (V) v;
            }
     
            return initialize(threadLocalMap);
        }
        // 省略其他代码
    }

     

    public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
        private static final int INDEXED_VARIABLE_TABLE_INITIAL_SIZE = 32;
     
        // 未赋值的Object变量(缺省值),当⼀个与线程绑定的值被删除之后,会被设置为UNSET
        public static final Object UNSET = new Object();
     
        // 存储绑定到当前线程的数据的数组
        private Object[] indexedVariables;
     
        // slowThreadLocalMap为JDK ThreadLocal存储InternalThreadLocalMap
        private static final ThreadLocal slowThreadLocalMap =
                new ThreadLocal();
     
        // 从绑定到当前线程的数据的数组中取出index位置的元素
        public Object indexedVariable(int index) {
            Object[] lookup = indexedVariables;
            return index < lookup.length? lookup[index] : UNSET;
        }
     
        public static InternalThreadLocalMap get() {
            Thread thread = Thread.currentThread();
            // 判断当前线程是否是FastThreadLocalThread类型
            if (thread instanceof FastThreadLocalThread) {
                return fastGet((FastThreadLocalThread) thread);
            } else {
                return slowGet();
            }
        }
     
        private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
            // 直接获取当前线程的InternalThreadLocalMap
            InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
            // 如果当前线程的InternalThreadLocalMap还未创建,则创建并赋值
            if (threadLocalMap == null) {
                thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
            }
            return threadLocalMap;
        }
     
        private static InternalThreadLocalMap slowGet() {
            // 使用JDK ThreadLocal获取InternalThreadLocalMap
            InternalThreadLocalMap ret = slowThreadLocalMap.get();
            if (ret == null) {
                ret = new InternalThreadLocalMap();
                slowThreadLocalMap.set(ret);
            }
            return ret;
        }
     
        private InternalThreadLocalMap() {
            indexedVariables = newIndexedVariableTable();
        }
     
        // 初始化一个32位长度的Object数组,并将其元素全部设置为缺省值UNSET
        private static Object[] newIndexedVariableTable() {
            Object[] array = new Object[INDEXED_VARIABLE_TABLE_INITIAL_SIZE];
            Arrays.fill(array, UNSET);
            return array;
        }
        // 省略其他代码
    }

    源码中 get() 方法主要分为下面3个步骤处理:

    1. 通过InternalThreadLocalMap.get()方法获取当前线程的InternalThreadLocalMap。

    2. 根据当前线程的index 从InternalThreadLocalMap中获取其绑定的数据。

    3. 如果不是缺省值UNSET,直接返回;如果是缺省值,则执行initialize方法进行初始化。

    下面我们继续分析一下InternalThreadLocalMap.get()方法的实现逻辑。

    1. 首先判断当前线程是否是FastThreadLocalThread类型,如果是FastThreadLocalThread类型则直接使用fastGet方法获取InternalThreadLocalMap,如果不是FastThreadLocalThread类型则使用slowGet方法获取InternalThreadLocalMap兜底处理。

    2. 兜底处理中的slowGet方法会退化成JDK原生的ThreadLocal获取InternalThreadLocalMap。

    3. 获取InternalThreadLocalMap时,如果为null,则会直接创建一个InternalThreadLocalMap返回。其创建过过程中初始化一个32位长度的Object数组,并将其元素全部设置为缺省值UNSET。

    4.3 set 方法

    public class FastThreadLocal {
        // FastThreadLocal初始化时variablesToRemoveIndex被赋值为0
        private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
     
        public final void set(V value) {
            // 判断value值是否是未赋值的Object变量(缺省值)
            if (value != InternalThreadLocalMap.UNSET) {
                // 获取当前线程对应的InternalThreadLocalMap
                InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
                // 将InternalThreadLocalMap中数据替换为新的value
                // 并将FastThreadLocal对象保存到待清理的Set中
                setKnownNotUnset(threadLocalMap, value);
            } else {
                remove();
            }
        }
     
        private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
            // 将InternalThreadLocalMap中数据替换为新的value
            if (threadLocalMap.setIndexedVariable(index, value)) {
                // 并将当前的FastThreadLocal对象保存到待清理的Set中
                addToVariablesToRemove(threadLocalMap, this);
            }
        }
     
        private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal variable) {
            // 取下标index为0的数据,用于存储待清理的FastThreadLocal对象Set集合中
            Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
            Set> variablesToRemove;
            if (v == InternalThreadLocalMap.UNSET || v == null) {
                // 下标index为0的数据为空,则创建FastThreadLocal对象Set集合
                variablesToRemove = Collections.newSetFromMap(new IdentityHashMap, Boolean>());
                // 将InternalThreadLocalMap中下标为0的数据,设置成FastThreadLocal对象Set集合
                threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
            } else {
                variablesToRemove = (Set>) v;
            }
            // 将FastThreadLocal对象保存到待清理的Set中
            variablesToRemove.add(variable);
        }
        // 省略其他代码
    }

     

    public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
        // 未赋值的Object变量(缺省值),当⼀个与线程绑定的值被删除之后,会被设置为UNSET
        public static final Object UNSET = new Object();
        // 存储绑定到当前线程的数据的数组
        private Object[] indexedVariables;
        // 绑定到当前线程的数据的数组能再次采用x2扩容的最大量
        private static final int ARRAY_LIST_CAPACITY_EXPAND_THRESHOLD = 1 << 30;
        private static final int ARRAY_LIST_CAPACITY_MAX_SIZE = Integer.MAX_VALUE - 8;
     
        // 将InternalThreadLocalMap中数据替换为新的value
        public boolean setIndexedVariable(int index, Object value) {
            Object[] lookup = indexedVariables;
            if (index < lookup.length) {
                Object oldValue = lookup[index];
                // 直接将数组 index 位置设置为 value,时间复杂度为 O(1)
                lookup[index] = value;
                return oldValue == UNSET;
            } else { // 绑定到当前线程的数据的数组需要扩容,则扩容数组并数组设置新value
                expandIndexedVariableTableAndSet(index, value);
                return true;
            }
        }
     
        private void expandIndexedVariableTableAndSet(int index, Object value) {
            Object[] oldArray = indexedVariables;
            final int oldCapacity = oldArray.length;
            int newCapacity;
            // 判断可进行x2方式进行扩容
            if (index < ARRAY_LIST_CAPACITY_EXPAND_THRESHOLD) {
                newCapacity = index;
                // 位操作,提升扩容效率
                newCapacity |= newCapacity >>>  1;
                newCapacity |= newCapacity >>>  2;
                newCapacity |= newCapacity >>>  4;
                newCapacity |= newCapacity >>>  8;
                newCapacity |= newCapacity >>> 16;
                newCapacity ++;
            } else { // 不支持x2方式扩容,则设置绑定到当前线程的数据的数组容量为最大值
                newCapacity = ARRAY_LIST_CAPACITY_MAX_SIZE;
            }
            // 按扩容后的大小创建新数组,并将老数组数据copy到新数组
            Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
            // 新数组扩容后的部分赋UNSET缺省值
            Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
            // 新数组的index位置替换成新的value
            newArray[index] = value;
            // 绑定到当前线程的数据的数组用新数组替换
            indexedVariables = newArray;
        }
        // 省略其他代码
    }

    源码中 set() 方法主要分为下面3个步骤处理:

    1. 判断value是否是缺省值UNSET,如果value不等于缺省值,则会通过InternalThreadLocalMap.get()方法获取当前线程的InternalThreadLocalMap,具体实现3.2小节中get()方法已做讲解。

    2. 通过FastThreadLocal中的setKnownNotUnset()方法将InternalThreadLocalMap中数据替换为新的value,并将当前的FastThreadLocal对象保存到待清理的Set中。

    3. 如果等于缺省值UNSET或null(else的逻辑),会调用remove()方法,remove()具体见后面的代码分析。

    接下来我们看下InternalThreadLocalMap.setIndexedVariable方法的实现逻辑。

    1. 判断index是否超出存储绑定到当前线程的数据的数组indexedVariables的长度,如果没有超出,则获取index位置的数据,并将该数组index位置数据设置新value。

    2. 如果超出了,绑定到当前线程的数据的数组需要扩容,则扩容该数组并将它index位置的数据设置新value。

    3. 扩容数组以index 为基准进行扩容,将数组扩容后的容量向上取整为 2 的次幂。然后将原数组内容拷贝到新的数组中,空余部分填充缺省值UNSET,最终把新数组赋值给 indexedVariables。

    下面我们再继续看下FastThreadLocal.addToVariablesToRemove方法的实现逻辑。

    1. 取下标index为0的数据(用于存储待清理的FastThreadLocal对象Set集合中),如果该数据是缺省值UNSET或null,则会创建FastThreadLocal对象Set集合,并将该Set集合填充到下标index为0的数组位置。

    2. 如果该数据不是缺省值UNSET,说明Set集合已金被填充,直接强转获取该Set集合。

    3. 最后将FastThreadLocal对象保存到待清理的Set集合中。

    4.4 remove、removeAll方法

    public class FastThreadLocal {
        // FastThreadLocal初始化时variablesToRemoveIndex被赋值为0
        private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
     
        public final void remove() {
            // 获取当前线程的InternalThreadLocalMap
            // 删除当前的FastThreadLocal对象及其维护的数据
            remove(InternalThreadLocalMap.getIfSet());
        }
     
        public final void remove(InternalThreadLocalMap threadLocalMap) {
            if (threadLocalMap == null) {
                return;
            }
     
            // 根据当前线程的index,并将该数组下标index位置对应的值设置为缺省值UNSET
            Object v = threadLocalMap.removeIndexedVariable(index);
            // 存储待清理的FastThreadLocal对象Set集合中删除当前FastThreadLocal对象
            removeFromVariablesToRemove(threadLocalMap, this);
     
            if (v != InternalThreadLocalMap.UNSET) {
                try {
                    // 空方法,用户可以继承实现
                    onRemoval((V) v);
                } catch (Exception e) {
                    PlatformDependent.throwException(e);
                }
            }
        }
     
        public static void removeAll() {
            InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
            if (threadLocalMap == null) {
                return;
            }
     
            try {
                // 取下标index为0的数据,用于存储待清理的FastThreadLocal对象Set集合中
                Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
                if (v != null && v != InternalThreadLocalMap.UNSET) {
                    @SuppressWarnings("unchecked")
                    Set> variablesToRemove = (Set>) v;
                    // 遍历所有的FastThreadLocal对象并删除它们以及它们维护的数据
                    FastThreadLocal[] variablesToRemoveArray =
                            variablesToRemove.toArray(new FastThreadLocal[0]);
                    for (FastThreadLocal tlv: variablesToRemoveArray) {
                        tlv.remove(threadLocalMap);
                    }
                }
            } finally {
                // 删除InternalThreadLocalMap中threadLocalMap和slowThreadLocalMap数据
                InternalThreadLocalMap.remove();
            }
        }
     
        private static void removeFromVariablesToRemove(
                InternalThreadLocalMap threadLocalMap, FastThreadLocal variable) {
            // 取下标index为0的数据,用于存储待清理的FastThreadLocal对象Set集合中
            Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
     
            if (v == InternalThreadLocalMap.UNSET || v == null) {
                return;
            }
     
            @SuppressWarnings("unchecked")
            // 存储待清理的FastThreadLocal对象Set集合中删除该FastThreadLocal对象
            Set> variablesToRemove = (Set>) v;
            variablesToRemove.remove(variable);
        }
     
        // 省略其他代码
    }

     

    public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
     
        // 根据当前线程获取InternalThreadLocalMap
           public static InternalThreadLocalMap getIfSet() {
            Thread thread = Thread.currentThread();
            if (thread instanceof FastThreadLocalThread) {
                return ((FastThreadLocalThread) thread).threadLocalMap();
            }
            return slowThreadLocalMap.get();
        }
     
        // 数组下标index位置对应的值设置为缺省值UNSET
        public Object removeIndexedVariable(int index) {
            Object[] lookup = indexedVariables;
            if (index < lookup.length) {
                Object v = lookup[index];
                lookup[index] = UNSET;
                return v;
            } else {
                return UNSET;
            }
        }
     
        // 删除threadLocalMap和slowThreadLocalMap数据
        public static void remove() {
            Thread thread = Thread.currentThread();
            if (thread instanceof FastThreadLocalThread) {
                ((FastThreadLocalThread) thread).setThreadLocalMap(null);
            } else {
                slowThreadLocalMap.remove();
            }
        }
        // 省略其他代码
    }

    源码中 remove() 方法主要分为下面2个步骤处理:

    1. 通过InternalThreadLocalMap.getIfSet()获取当前线程的InternalThreadLocalMap。具体和3.2小节get()方法里面获取当前线程的InternalThreadLocalMap相似,这里就不再重复介绍了。

    2. 删除当前的FastThreadLocal对象及其维护的数据。

    源码中 removeAll() 方法主要分为下面3个步骤处理:

    1. 通过InternalThreadLocalMap.getIfSet()获取当前线程的InternalThreadLocalMap。

    2. 取下标index为0的数据(用于存储待清理的FastThreadLocal对象Set集合),然后遍历所有的FastThreadLocal对象并删除它们以及它们维护的数据。

    3. 最后会将InternalThreadLocalMap本身从线程中移除。

    五、总结

    那么使用ThreadLocal时最佳实践又如何呢?

    每次使用完ThreadLocal实例,在线程运行结束之前的finally代码块中主动调用它的remove()方法,清除Entry中的数据,避免操作不当导致的内存泄漏。

    使⽤Netty的FastThreadLocal一定比JDK原生的ThreadLocal更快吗?

    不⼀定。当线程是FastThreadLocalThread,则添加、获取FastThreadLocal所维护数据的时间复杂度是 O(1),⽽使⽤ThreadLocal可能存在哈希冲突,相对来说使⽤FastThreadLocal更⾼效。但如果是普通线程则可能更慢。

    使⽤FastThreadLocal有哪些优点?

    正如文章开头介绍JDK原生ThreadLocal存在的缺点,FastThreadLocal全部优化了,它更⾼效、而且如果使⽤的是FastThreadLocal,它会在任务执⾏完成后主动调⽤removeAll⽅法清除数据,避免潜在的内存泄露。

  • 相关阅读:
    解决MySQL 8.0以上版本设置大小写不敏感的问题
    无向图的双连通分量算法详解 + 模板题
    RabbitMQ消息队列
    【目标检测】YOLOX训练王者荣耀数据集
    【Luogu】UVA1205 Color a Tree / SP3912 MTREECOL - Color a tree
    PHP-PHPstorm配置自动编译less
    探索古彝文的秘密,AI实现古籍传承
    记一次 .NET 某工控软件 内存泄露分析
    数据库与缓存更新一致性四种策略
    flink源码编译-job提交
  • 原文地址:https://www.cnblogs.com/vivotech/p/17774377.html