• 你还不知道ThreadLocal线程本地存储吗


    ThreadLocal线程本地存储

    当访问共享数据时,通常需要使用同步来控制并发程序的访问。那么有没有别的方法来解决呢?当然有,那就是使得共享数据不共享了,ThreadLocal就实现了该操作。该类提供了线程局部变量,为每一个线程创建一个单独的变量副本,使得每个线程都可以独立的改变自己所拥有的变量副本,而不会影响其他线程所对应的副本,消除了竞争条件。

    采用的以空间换时间的做法,在每个Thread里维护一个以开地址法实现的ThreadLocal.ThreadLocalMap,把数据隔离
    线程本地存储根除了对变量的共享,每当线程访问threadLocals变量时,访问的都是各自线程自己的threadLocals变量。

    public class ThreadLocalVariableHolder {
        private static ThreadLocal myThreadLocal = new ThreadLocal() {
            // 初始值默认为null 设置初始值为0
            protected Integer initialValue() {
                return 0;
            }
        };

        public static void increment() {
            myThreadLocal.set(myThreadLocal.get() + 1);
        }

        public static int get(){
            return myThreadLocal.get();
        }

        public static void main(String[] args) {
            // 线程池
            ExecutorService executorService = Executors.newCachedThreadPool();
            for(int i=0;i<3;i++){
                executorService.execute(new MyThread());
            }
            executorService.shutdown();
        }
    }

    public class MyThread implements Runnable {

        @Override
        public void run() {
            for (int i = 0; i < 3; i++) {
                ThreadLocalVariableHolder.increment();
                System.out.println(Thread.currentThread().getName() + ":" + ThreadLocalVariableHolder.get());
                Thread.yield();
            }
        }
    }
    • 1

    输出结果

    pool-1-thread-1:1
    pool-1-thread-2:1
    pool-1-thread-3:1
    pool-1-thread-1:2
    pool-1-thread-2:2
    pool-1-thread-3:2
    pool-1-thread-1:3
    pool-1-thread-2:3
    pool-1-thread-3:3
    • 1

    可以看到ThreadLocal把不同线程的数据进行了隔离,互不影响

    来看一下ThreadLocal是如何实现的吧

    Thread类

    // 线程的本地变量存储在线程的threadLocals变量中,并不是存储在ThreadLocal实例中
    // 每一个线程都有一个自己的ThreadLocalMap,ThreadLocalMap类似于Map,key为ThreadLocal对象,value为存储的值,所以一个ThreadLocalMap可以存储多个ThreadLocal

    ThreadLocal.ThreadLocalMap threadLocals = null;

    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
    • 1

    ThreadLocal源码逻辑

    • ThreadLocal对象通过Thread.currentThread()获取当前Thread对象
    • 当前Thread获取对象内部持有的ThreadLocalMap容器
    • 从ThreadLocalMap容器中用ThreadLocal对象作为key,操作当前Thread中的变量副本

    提供了四个方法:

    • get() 返回此线程局部变量的当前线程副本中的值

      public T get() {
         // 获取当前线程
          Thread t = Thread.currentThread();
         // 获取当前线程实例的threadLocals
          ThreadLocalMap map = getMap(t);
         // 不为空
          if (map != null) {
             // 根据当前的ThreadLocal对象引用来取值
              ThreadLocalMap.Entry e = map.getEntry(this);
              if (e != null) {
                  @SuppressWarnings("unchecked")
                  T result = (T)e.value;
                  return result;
              }
          }
         // 为空则设置key为当前的ThreadLocal对象,value为initialValue设置的初始值
          return setInitialValue();
      }
      • 1
    • initialValue() 返回此线程局部变量当前线程的初始值,当线程第一次调用get()或set()方法时调用,并且只调用一次

    • remove() 移除此线程局部变量当前线程的值

    public void remove() {
        ThreadLocalMap m = getMap(Thread.currentThread());
        if (m != null)
            m.remove(this);
    }
    • 1
    • set(T value) 将此线程局部变量的当前线程副本中的值设置为指定值

      public void set(T value) {
        // 获取当前线程
        Thread t = Thread.currentThread();
        // 获取该线程实例对象的threadLocals变量   getMap方法 return t.threadLocals;
        ThreadLocalMap map = getMap(t);
        // 不为空,key为当前的ThreadLocal对象引用,value为所存储的值
        if (map != null)
          map.set(this, value);
        else
          // 为空,则为threadLocals实例化对象
          createMap(t, value);
      }

      ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
      }

      void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
      }
      • 1
    • 还有一个静态内部类 ThreadLocalMap 提供了一种用键值对方式存储每一个线程的变量副本的方法,key为当前的ThreadLocal对象,value为对象线程的变量副本

    public class MyThread implements Runnable {

        @Override
        public void run() {
            for (int i = 0; i < 3; i++) {
                ThreadLocalVariableHolder.increment();
                System.out.println(Thread.currentThread().getName() + ":" + ThreadLocalVariableHolder.get());
                Thread.yield();
            }
        }
    }

    public class ThreadLocalVariableHolder {
        private static ThreadLocal myThreadLocal = new ThreadLocal() {
            // 初始值默认为null 设置初始值为0
            protected Integer initialValue() {
                return 0;
            }
        };

        public static void increment() {
            myThreadLocal.set(myThreadLocal.get() + 1);
        }

        public static int get(){
            return myThreadLocal.get();
        }

        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            for(int i=0;i<5;i++){
                executorService.execute(new MyThread());
            }
            executorService.shutdown();
        }
    }
    • 1

    执行结果

    pool-1-thread-1:1
    pool-1-thread-2:1
    pool-1-thread-3:1
    pool-1-thread-3:2
    pool-1-thread-2:2
    pool-1-thread-1:2
    pool-1-thread-2:3
    pool-1-thread-3:3
    pool-1-thread-4:1
    pool-1-thread-1:3
    pool-1-thread-4:2
    pool-1-thread-4:3
    pool-1-thread-5:1
    pool-1-thread-5:2
    pool-1-thread-5:3
    • 1

    存在内存泄露问题,每次使用完ThreadLocal,都调用它的remove()方法,清除数据

    ThreadLocalMap源码逻辑

    作为数据真正存储的位置,ThreadLocalMap对于ThreadLocal还是相当重要的

    // 初始容量,该容量必须是2的次幂
    private static final int INITIAL_CAPACITY = 16;
    // 存储数据的Entry数组
    private Entry[] table;
    // entry的数量
    private int size = 0;
    // 阈值
    private int threshold; 

    // Entry中key为ThreadLocal,value为值
    // key是一个弱引用
    // 每个线程在往ThreadLocal中放值的时候,都是放入线程本身的ThreadLocalMap中,key是ThreadLocal,从而实现了线程隔离
    // 只要发生了垃圾回收,且该对象没有强引用存在的话,弱引用就会被回收
    static class Entry extends WeakReference<ThreadLocal{
      /** The value associated with this ThreadLocal. */
      Object value;

      Entry(ThreadLocal k, Object v) {
        super(k);
        value = v;
      }
    }
    • 1
    构造函数
    ThreadLocalMap(ThreadLocal firstKey, Object firstValue) {
      // 初始化
        table = new Entry[INITIAL_CAPACITY];
      // 使用key的哈希值与上15 计算数组下标
        int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
      // 初始化该节点
        table[i] = new Entry(firstKey, firstValue);
        size = 1;
      // 设置阈值
        setThreshold(INITIAL_CAPACITY);
    }
    • 1
    重要方法
    set方法

    存储数据

    private void set(ThreadLocal key, Object value) {

        // We don't use a fast path as with get() because it is at
        // least as common to use set() to create new entries as
        // it is to replace existing ones, in which case, a fast
        // path would fail more often than not.

        Entry[] tab = table;
        int len = tab.length;
      // 计算数组下标
        int i = key.threadLocalHashCode & (len-1);
    // 如果出现hash冲突,会采用线性探测,如果当前位置有值,则会获取下一个索引来进行存储,该结构是一个环形
      // 直到找到空闲位置为止
        for (Entry e = tab[i];
             e != null;
             e = tab[i = nextIndex(i, len)]) { // 槽位不为空,进行线性探测
            ThreadLocal k = e.get();
       // key值与当前key值相等,直接进行覆盖
            if (k == key) {
                e.value = value;
                return;
            }
       // Entry不为null,但是key为null,当前位置的key已经为null(失效了),则进行替换过期数据
            if (k == null) {
                replaceStaleEntry(key, value, i);
                return;
            }
          // 该循环完成,说明既没有找到原本存储key的位置,也没有找到失效的位置
        }
    // 执行到这里说明查找到了entry为null的位置
        tab[i] = new Entry(key, value);
        int sz = ++size;
      // 没有清除掉失效的槽位,并且当前数量已经达到了阈值,则进行扩容
        if (!cleanSomeSlots(i, sz) && sz >= threshold)
            rehash();
    }

    // 获取下一个数组下标位置
     private static int nextIndex(int i, int len) {
       return ((i + 1 < len) ? i + 1 : 0);
     }

    // 替换掉失效的值  staleSlot为失效的槽位
    private void replaceStaleEntry(ThreadLocal key, Object value,
                                           int staleSlot)
     
    {
      Entry[] tab = table;
      int len = tab.length;
      Entry e;

      // Back up to check for prior stale entry in current run.
      // We clean out whole runs at a time to avoid continual
      // incremental rehashing due to garbage collector freeing
      // up refs in bunches (i.e., whenever the collector runs).
      int slotToExpunge = staleSlot;
      // 向前扫描,直到找到无效的槽位
      for (int i = prevIndex(staleSlot, len);
           (e = tab[i]) != null;
           i = prevIndex(i, len))
        // 找到无效的槽位,slotToExpunge复为当前位置
        if (e.get() == null)
          slotToExpunge = i;

      // Find either the key or trailing null slot of run, whichever
      // occurs first
      // 向后扫描,找到失效的槽位
      for (int i = nextIndex(staleSlot, len);
           (e = tab[i]) != null;
           i = nextIndex(i, len)) {
        ThreadLocal k = e.get();

        // If we find key, then we need to swap it
        // with the stale entry to maintain hash table order.
        // The newly stale slot, or any other stale slot
        // encountered above it, can then be sent to expungeStaleEntry
        // to remove or rehash all of the other entries in run.
        // 找到key,替换掉无效的槽位中的值
        if (k == key) {
          e.value = value;

          tab[i] = tab[staleSlot];
          tab[staleSlot] = e;

          // Start expunge at preceding stale entry if it exists
          // 向前扫描时没有找到无效的槽位,将slotToExpunge设为当前位置
          if (slotToExpunge == staleSlot)
            slotToExpunge = i;
          // 从slotToExpunge开始清理槽位
          cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
          return;
        }

        // If we didn't find stale entry on backward scan, the
        // first stale entry seen while scanning for key is the
        // first still present in the run.
        // 如果当前的槽位已经无效,并且向前扫描时没有找到无效的槽位,将slotToExpunge设为当前位置
        if (k == null && slotToExpunge == staleSlot)
          slotToExpunge = i;
      }

      // 没有找到对应的key新增加一个entry
      // If key not found, put new entry in stale slot
      tab[staleSlot].value = null;
      tab[staleSlot] = new Entry(key, value);

      // If there are any other stale entries in run, expunge them
      // 扫描过程中存在无效的槽位,进行清理
      if (slotToExpunge != staleSlot)
        cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
    }

    // 清理无效的槽位
    private boolean cleanSomeSlots(int i, int n) {
      boolean removed = false;
      Entry[] tab = table;
      int len = tab.length;
      do {
        i = nextIndex(i, len);
        Entry e = tab[i];
        // key已失效
        if (e != null && e.get() == null) {
          n = len;
          removed = true;
          i = expungeStaleEntry(i);
        }
      } while ( (n >>>= 1) != 0);
      return removed;
    }

    private int expungeStaleEntry(int staleSlot) {
      Entry[] tab = table;
      int len = tab.length;

      // expunge entry at staleSlot
      // 去掉对value的引用
      tab[staleSlot].value = null;
      tab[staleSlot] = null;
      size--;

      // Rehash until we encounter null
      Entry e;
      int i;
      for (i = nextIndex(staleSlot, len);
           (e = tab[i]) != null;
           i = nextIndex(i, len)) {
        ThreadLocal k = e.get();
        // key为null,去掉对value的引用
        if (k == null) {
          e.value = null;
          tab[i] = null;
          size--;
        } else {
          int h = k.threadLocalHashCode & (len - 1);
          if (h != i) {
            tab[i] = null;

            // Unlike Knuth 6.4 Algorithm R, we must scan until
            // null because multiple entries could have been stale.
            while (tab[h] != null)
              h = nextIndex(h, len);
            tab[h] = e;
          }
        }
      }
      return i;
    }

    // 进行扩容操作
    private void rehash() {
      expungeStaleEntries();

      // Use lower threshold for doubling to avoid hysteresis
      if (size >= threshold - threshold / 4)
        resize();
    }
    // 扩容扩大2倍
    private void resize() {
      Entry[] oldTab = table;
      int oldLen = oldTab.length;
      int newLen = oldLen * 2;
      Entry[] newTab = new Entry[newLen];
      int count = 0;

      for (int j = 0; j < oldLen; ++j) {
        Entry e = oldTab[j];
        if (e != null) {
          ThreadLocal k = e.get();
          if (k == null) {
            e.value = null// Help the GC
          } else {
            int h = k.threadLocalHashCode & (newLen - 1);
            while (newTab[h] != null)
              h = nextIndex(h, newLen);
            newTab[h] = e;
            count++;
          }
        }
      }

      setThreshold(newLen);
      size = count;
      table = newTab;
    }
    • 1
    getEntry方法

    获取Entry值

    private Entry getEntry(ThreadLocal key) {
      // 计算数组下标
        int i = key.threadLocalHashCode & (table.length - 1);
        Entry e = table[i];
        if (e != null && e.get() == key)
            return e;
        else
          // 没有命中
          // 在存放数据的时候采用的是开方定址法,所以可能存在当前key的散列值和元素所在索引并不完全对应的情况
            return getEntryAfterMiss(key, i, e);
    }

    private Entry getEntryAfterMiss(ThreadLocal key, int i, Entry e) {
      Entry[] tab = table;
      int len = tab.length;

      while (e != null) {
        ThreadLocal k = e.get();
        if (k == key)
          return e;
        if (k == null// key等于null,说明该key已经失效了,进行回收,可以有效的避免内存泄漏
          expungeStaleEntry(i);
        else
          i = nextIndex(i, len);
        e = tab[i];
      }
      return null;
    }
    • 1
    remove方法
    private void remove(ThreadLocal key) {
        Entry[] tab = table;
        int len = tab.length;
      // 计算数组下标
        int i = key.threadLocalHashCode & (len-1);
      
        for (Entry e = tab[i];
             e != null;
             e = tab[i = nextIndex(i, len)]) {
          // 找到对应的key
            if (e.get() == key) {
              // 清除对ThreadLocal的弱引用
                e.clear();
              // 清理key为null的元素
                expungeStaleEntry(i);
                return;
            }
        }
    }
    • 1

    InheritableThreadLocal类解决ThreadLocal继承性问题

    InheritableThreadLocal是ThreadLocal的子类,该类提供了一个特性,可以让子线程访问在父线程中设置的本地变量

    public class InheritableThreadLocal<Textends ThreadLocal<T{
        
        protected T childValue(T parentValue) {
            return parentValue;
        }

        // 获取ThreadLocalMap时获取的是该线程中的inheritableThreadLocals变量
        ThreadLocalMap getMap(Thread t) {
           return t.inheritableThreadLocals;
        }

        // 创建ThreadLocalMap时,使用的是inheritableThreadLocals变量而不是threadLocals变量
        void createMap(Thread t, T firstValue) {
            t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
        }
    }
    • 1

    如果一个线程是从其他某个线程中创建的,这个类将提供继承的值,如果一个线程A在线程局部变量中已有值,那么当线程A创建其他某个线程B时,线程B的线程局部变量将跟线程A是一样的,可以重写childValue()方法,该方法用来初始化子线程在线程局部变量中的值,使用父线程在线程局部变量中的值作为传入参数

    // Thread实例化时的初始化过程
    private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc,
                      boolean inheritThreadLocals)
     
    {
        // 省略无关代码
      
       // 当前线程
        Thread parent = currentThread();
        
        // inheritThreadLocals为true 且 父线程的inheritableThreadLocals不为null
        if (inheritThreadLocals && parent.inheritableThreadLocals != null)
           // 则设置子线程的inheritableThreadLocals,值为父线程的inheritableThreadLocals的值复制而来
            this.inheritableThreadLocals =
                ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
        
    }
    • 1

    javaweb中就是使用这种方式来使得子线程可以获取请求信息的

    RequestContextHolder.setRequestAttributes(requestAttributes,true);
    • 1

    内存泄漏问题

    // ThreadLocal中的Entry
    static class Entry extends WeakReference<ThreadLocal{
        /** The value associated with this ThreadLocal. */
        Object value;

        Entry(ThreadLocal k, Object v) {
            super(k);
            value = v;
        }
    }
    • 1
    ThreadLocal内存泄漏
    ThreadLocal内存泄漏

    在ThreadLocal.ThreadLocalMap中的key为ThreadLocal对象实例,这个Map中的key是一个弱引用,当把ThreadLocal对象实例置为null后,没有任何强引用指向ThreadLocal对象实例,所以ThreadLocal对象实例会被gc回收,但是Map中的value却不会被回收(此时entry是一个key为null,但是value不为null),因为存在一条从当前thread连接过来的强引用,只有当前thread结束之后,当前thread的强引用才会断开,此时Map中的value才会被gc回收。

    但是在使用线程池的时候,由于线程是重复利用的,不会被回收,所以就可能出现内存泄漏

    当然JDK的设计中也有考虑到这点,所以在get()、set()、remove()中会扫描key为null的Entry,将对应的value也设置为null,这样value就会被回收

    所以当使用完之后,需要调用ThreadLocal对象的remove()方法来删除掉

    https://zhhll.icu/2020/多线程/基础/9.ThreadLocal/

    本文由 mdnice 多平台发布

  • 相关阅读:
    汽车电子行业入门指南「知识体系」
    数学建模 导航
    理解多态,让不同的“人”做同一件事情会产生不同的结果
    浅谈安防视频监控平台EasyCVR视频汇聚平台对于夏季可视化智能溺水安全告警平台的重要性
    MySQL——索引
    java后端开发热部署工具Springloaded
    数据结构【二叉搜索树、leetcode刷题】
    Rails 中的布局和渲染
    99%的时间里使用的14个git命令
    centos7平台安装部署mysql8(.tar包形式)
  • 原文地址:https://blog.csdn.net/Lxn2zh/article/details/136700468