当访问共享数据时,通常需要使用同步来控制并发程序的访问。那么有没有别的方法来解决呢?当然有,那就是使得共享数据不共享了,ThreadLocal就实现了该操作。该类提供了线程局部变量,为每一个线程创建一个单独的变量副本,使得每个线程都可以独立的改变自己所拥有的变量副本,而不会影响其他线程所对应的副本,消除了竞争条件。
采用的以空间换时间的做法,在每个Thread里维护一个以开地址法实现的ThreadLocal.ThreadLocalMap,把数据隔离
线程本地存储根除了对变量的共享,每当线程访问threadLocals变量时,访问的都是各自线程自己的threadLocals变量。
public class ThreadLocalVariableHolder {
private static ThreadLocal myThreadLocal = new ThreadLocal() {
// 初始值默认为null 设置初始值为0
protected Integer initialValue() {
return 0;
}
};
public static void increment() {
myThreadLocal.set(myThreadLocal.get() + 1);
}
public static int get(){
return myThreadLocal.get();
}
public static void main(String[] args) {
// 线程池
ExecutorService executorService = Executors.newCachedThreadPool();
for(int i=0;i<3;i++){
executorService.execute(new MyThread());
}
executorService.shutdown();
}
}
public class MyThread implements Runnable {
@Override
public void run() {
for (int i = 0; i < 3; i++) {
ThreadLocalVariableHolder.increment();
System.out.println(Thread.currentThread().getName() + ":" + ThreadLocalVariableHolder.get());
Thread.yield();
}
}
}
输出结果
pool-1-thread-1:1
pool-1-thread-2:1
pool-1-thread-3:1
pool-1-thread-1:2
pool-1-thread-2:2
pool-1-thread-3:2
pool-1-thread-1:3
pool-1-thread-2:3
pool-1-thread-3:3
可以看到ThreadLocal把不同线程的数据进行了隔离,互不影响
来看一下ThreadLocal是如何实现的吧
Thread类
// 线程的本地变量存储在线程的threadLocals变量中,并不是存储在ThreadLocal实例中
// 每一个线程都有一个自己的ThreadLocalMap,ThreadLocalMap类似于Map,key为ThreadLocal对象,value为存储的值,所以一个ThreadLocalMap可以存储多个ThreadLocal
ThreadLocal.ThreadLocalMap threadLocals = null;
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
提供了四个方法:
get() 返回此线程局部变量的当前线程副本中的值
public T get() {
// 获取当前线程
Thread t = Thread.currentThread();
// 获取当前线程实例的threadLocals
ThreadLocalMap map = getMap(t);
// 不为空
if (map != null) {
// 根据当前的ThreadLocal对象引用来取值
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// 为空则设置key为当前的ThreadLocal对象,value为initialValue设置的初始值
return setInitialValue();
}
initialValue() 返回此线程局部变量当前线程的初始值,当线程第一次调用get()或set()方法时调用,并且只调用一次
remove() 移除此线程局部变量当前线程的值
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
m.remove(this);
}
set(T value) 将此线程局部变量的当前线程副本中的值设置为指定值
public void set(T value) {
// 获取当前线程
Thread t = Thread.currentThread();
// 获取该线程实例对象的threadLocals变量 getMap方法 return t.threadLocals;
ThreadLocalMap map = getMap(t);
// 不为空,key为当前的ThreadLocal对象引用,value为所存储的值
if (map != null)
map.set(this, value);
else
// 为空,则为threadLocals实例化对象
createMap(t, value);
}
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
还有一个静态内部类 ThreadLocalMap 提供了一种用键值对方式存储每一个线程的变量副本的方法,key为当前的ThreadLocal对象,value为对象线程的变量副本
public class MyThread implements Runnable {
@Override
public void run() {
for (int i = 0; i < 3; i++) {
ThreadLocalVariableHolder.increment();
System.out.println(Thread.currentThread().getName() + ":" + ThreadLocalVariableHolder.get());
Thread.yield();
}
}
}
public class ThreadLocalVariableHolder {
private static ThreadLocal myThreadLocal = new ThreadLocal() {
// 初始值默认为null 设置初始值为0
protected Integer initialValue() {
return 0;
}
};
public static void increment() {
myThreadLocal.set(myThreadLocal.get() + 1);
}
public static int get(){
return myThreadLocal.get();
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for(int i=0;i<5;i++){
executorService.execute(new MyThread());
}
executorService.shutdown();
}
}
执行结果
pool-1-thread-1:1
pool-1-thread-2:1
pool-1-thread-3:1
pool-1-thread-3:2
pool-1-thread-2:2
pool-1-thread-1:2
pool-1-thread-2:3
pool-1-thread-3:3
pool-1-thread-4:1
pool-1-thread-1:3
pool-1-thread-4:2
pool-1-thread-4:3
pool-1-thread-5:1
pool-1-thread-5:2
pool-1-thread-5:3
存在内存泄露问题,每次使用完ThreadLocal,都调用它的remove()方法,清除数据
作为数据真正存储的位置,ThreadLocalMap对于ThreadLocal还是相当重要的
// 初始容量,该容量必须是2的次幂
private static final int INITIAL_CAPACITY = 16;
// 存储数据的Entry数组
private Entry[] table;
// entry的数量
private int size = 0;
// 阈值
private int threshold;
// Entry中key为ThreadLocal,value为值
// key是一个弱引用
// 每个线程在往ThreadLocal中放值的时候,都是放入线程本身的ThreadLocalMap中,key是ThreadLocal,从而实现了线程隔离
// 只要发生了垃圾回收,且该对象没有强引用存在的话,弱引用就会被回收
static class Entry extends WeakReference<ThreadLocal>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal> k, Object v) {
super(k);
value = v;
}
}
ThreadLocalMap(ThreadLocal> firstKey, Object firstValue) {
// 初始化
table = new Entry[INITIAL_CAPACITY];
// 使用key的哈希值与上15 计算数组下标
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
// 初始化该节点
table[i] = new Entry(firstKey, firstValue);
size = 1;
// 设置阈值
setThreshold(INITIAL_CAPACITY);
}
存储数据
private void set(ThreadLocal> key, Object value) {
// We don't use a fast path as with get() because it is at
// least as common to use set() to create new entries as
// it is to replace existing ones, in which case, a fast
// path would fail more often than not.
Entry[] tab = table;
int len = tab.length;
// 计算数组下标
int i = key.threadLocalHashCode & (len-1);
// 如果出现hash冲突,会采用线性探测,如果当前位置有值,则会获取下一个索引来进行存储,该结构是一个环形
// 直到找到空闲位置为止
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) { // 槽位不为空,进行线性探测
ThreadLocal> k = e.get();
// key值与当前key值相等,直接进行覆盖
if (k == key) {
e.value = value;
return;
}
// Entry不为null,但是key为null,当前位置的key已经为null(失效了),则进行替换过期数据
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
// 该循环完成,说明既没有找到原本存储key的位置,也没有找到失效的位置
}
// 执行到这里说明查找到了entry为null的位置
tab[i] = new Entry(key, value);
int sz = ++size;
// 没有清除掉失效的槽位,并且当前数量已经达到了阈值,则进行扩容
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
// 获取下一个数组下标位置
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
// 替换掉失效的值 staleSlot为失效的槽位
private void replaceStaleEntry(ThreadLocal> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
// Back up to check for prior stale entry in current run.
// We clean out whole runs at a time to avoid continual
// incremental rehashing due to garbage collector freeing
// up refs in bunches (i.e., whenever the collector runs).
int slotToExpunge = staleSlot;
// 向前扫描,直到找到无效的槽位
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
// 找到无效的槽位,slotToExpunge复为当前位置
if (e.get() == null)
slotToExpunge = i;
// Find either the key or trailing null slot of run, whichever
// occurs first
// 向后扫描,找到失效的槽位
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal> k = e.get();
// If we find key, then we need to swap it
// with the stale entry to maintain hash table order.
// The newly stale slot, or any other stale slot
// encountered above it, can then be sent to expungeStaleEntry
// to remove or rehash all of the other entries in run.
// 找到key,替换掉无效的槽位中的值
if (k == key) {
e.value = value;
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
// Start expunge at preceding stale entry if it exists
// 向前扫描时没有找到无效的槽位,将slotToExpunge设为当前位置
if (slotToExpunge == staleSlot)
slotToExpunge = i;
// 从slotToExpunge开始清理槽位
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
// If we didn't find stale entry on backward scan, the
// first stale entry seen while scanning for key is the
// first still present in the run.
// 如果当前的槽位已经无效,并且向前扫描时没有找到无效的槽位,将slotToExpunge设为当前位置
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}
// 没有找到对应的key新增加一个entry
// If key not found, put new entry in stale slot
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
// If there are any other stale entries in run, expunge them
// 扫描过程中存在无效的槽位,进行清理
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
// 清理无效的槽位
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
i = nextIndex(i, len);
Entry e = tab[i];
// key已失效
if (e != null && e.get() == null) {
n = len;
removed = true;
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);
return removed;
}
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// expunge entry at staleSlot
// 去掉对value的引用
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
// Rehash until we encounter null
Entry e;
int i;
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal> k = e.get();
// key为null,去掉对value的引用
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
int h = k.threadLocalHashCode & (len - 1);
if (h != i) {
tab[i] = null;
// Unlike Knuth 6.4 Algorithm R, we must scan until
// null because multiple entries could have been stale.
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}
// 进行扩容操作
private void rehash() {
expungeStaleEntries();
// Use lower threshold for doubling to avoid hysteresis
if (size >= threshold - threshold / 4)
resize();
}
// 扩容扩大2倍
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;
for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal> k = e.get();
if (k == null) {
e.value = null; // Help the GC
} else {
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}
setThreshold(newLen);
size = count;
table = newTab;
}
获取Entry值
private Entry getEntry(ThreadLocal> key) {
// 计算数组下标
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
// 没有命中
// 在存放数据的时候采用的是开方定址法,所以可能存在当前key的散列值和元素所在索引并不完全对应的情况
return getEntryAfterMiss(key, i, e);
}
private Entry getEntryAfterMiss(ThreadLocal> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
while (e != null) {
ThreadLocal> k = e.get();
if (k == key)
return e;
if (k == null) // key等于null,说明该key已经失效了,进行回收,可以有效的避免内存泄漏
expungeStaleEntry(i);
else
i = nextIndex(i, len);
e = tab[i];
}
return null;
}
private void remove(ThreadLocal> key) {
Entry[] tab = table;
int len = tab.length;
// 计算数组下标
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
// 找到对应的key
if (e.get() == key) {
// 清除对ThreadLocal的弱引用
e.clear();
// 清理key为null的元素
expungeStaleEntry(i);
return;
}
}
}
InheritableThreadLocal是ThreadLocal的子类,该类提供了一个特性,可以让子线程访问在父线程中设置的本地变量
public class InheritableThreadLocal<T> extends ThreadLocal<T> {
protected T childValue(T parentValue) {
return parentValue;
}
// 获取ThreadLocalMap时获取的是该线程中的inheritableThreadLocals变量
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}
// 创建ThreadLocalMap时,使用的是inheritableThreadLocals变量而不是threadLocals变量
void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
}
如果一个线程是从其他某个线程中创建的,这个类将提供继承的值,如果一个线程A在线程局部变量中已有值,那么当线程A创建其他某个线程B时,线程B的线程局部变量将跟线程A是一样的,可以重写childValue()方法,该方法用来初始化子线程在线程局部变量中的值,使用父线程在线程局部变量中的值作为传入参数
// Thread实例化时的初始化过程
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
// 省略无关代码
// 当前线程
Thread parent = currentThread();
// inheritThreadLocals为true 且 父线程的inheritableThreadLocals不为null
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
// 则设置子线程的inheritableThreadLocals,值为父线程的inheritableThreadLocals的值复制而来
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
}
javaweb中就是使用这种方式来使得子线程可以获取请求信息的
RequestContextHolder.setRequestAttributes(requestAttributes,true);
// ThreadLocal中的Entry
static class Entry extends WeakReference<ThreadLocal>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal> k, Object v) {
super(k);
value = v;
}
}
在ThreadLocal.ThreadLocalMap中的key为ThreadLocal对象实例,这个Map中的key是一个弱引用,当把ThreadLocal对象实例置为null后,没有任何强引用指向ThreadLocal对象实例,所以ThreadLocal对象实例会被gc回收,但是Map中的value却不会被回收(此时entry是一个key为null,但是value不为null),因为存在一条从当前thread连接过来的强引用,只有当前thread结束之后,当前thread的强引用才会断开,此时Map中的value才会被gc回收。
但是在使用线程池的时候,由于线程是重复利用的,不会被回收,所以就可能出现内存泄漏
当然JDK的设计中也有考虑到这点,所以在get()、set()、remove()中会扫描key为null的Entry,将对应的value也设置为null,这样value就会被回收
所以当使用完之后,需要调用ThreadLocal对象的remove()方法来删除掉
本文由 mdnice 多平台发布