对象池和我们的连接池一样就是对象放入一个池中循环使用。特别是在netty创建ByteBuf的时候buf循环使用大大减小了频繁创建对象,垃圾收集的压力。特别是在使用直接内存的时候。
netty的对象池对象 RecyclerObjectPool extends ObjectPool。RecyclerObjectPool只是外层抽象,实际对象池由其内部的Recycler对象来维护管理。对象池有两个重要操作对象的获取和回收,下面就从Recycler看起对象池。
Recycler.get()
public final T get() {
//recycler.get-1
if (maxCapacityPerThread == 0) {//每个线程最大容量
return newObject((Handle) NOOP_HANDLE);
}
//recycler.get-2
LocalPool localPool = threadLocal.get();
DefaultHandle handle = localPool.claim();
T obj;
if (handle == null) {
handle = localPool.newHandle();
if (handle != null) {
obj = newObject(handle);
handle.set(obj);
} else {
obj = newObject((Handle) NOOP_HANDLE);
}
} else {
obj = handle.get();
}
return obj;
}
get方法首先判断maxCapacityPerThread 每个线程对象最大容量,这个值的初始化在Recycler静态块里
static {
// In the future, we might have different maxCapacity for different object types.
// e.g. io.netty.recycler.maxCapacity.writeTask
// io.netty.recycler.maxCapacity.outboundBuffer
int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
if (maxCapacityPerThread < 0) {
maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
}
}
会读取"io.netty.recycler.maxCapacityPerThread"和"io.netty.recycler.maxCapacity"配置,如果都没有才哦那个
DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD变量值,该静态变量初始值是4 * 1024。所以这里一般不为0.
通过threadLocal变量get获取localPool。
LocalPool localPool = threadLocal.get();
这里threadLocal变量不是直接jdk里的ThreadLocal实例,是进行了一层包装
private final FastThreadLocal> threadLocal = new FastThreadLocal>() {
@Override
protected LocalPool initialValue() {
return new LocalPool(maxCapacityPerThread, interval, chunkSize);
}
@Override
protected void onRemoval(LocalPool value) throws Exception {
super.onRemoval(value);
MessagePassingQueue> handles = value.pooledHandles;
value.pooledHandles = null;
handles.clear();
}
};
threadLocal是FastThreadLocal类型。其内部真正ThreadLocal是 ThreadLocal类型。InternalThreadLocalMap是存储线程变量实际对象。其内部有一个Object[] indexedVariables数组用来存储数据。
FastThreadLocal.get方法
public final V get() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
//按当前FastThreadLocal的index从InternalThreadLocalMap获取变量值
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
//初始化线程变量
return initialize(threadLocalMap);
}
首先从threadlocal里获取InternalThreadLocalMap,然后按照当前FastThreadLocal的索引获取其对应的数据对象。如果没有获取到则调用initialize方法初始化数据对象。这里initialize方法在上面new FastThreadLocal时候进行重写了,最后是执行new LocalPoo()新建了一个LocalPool。
FastThreadLocal.index变量是在构造方法里初始化的
public FastThreadLocal() {
index = InternalThreadLocalMap.nextVariableIndex();
}
InternalThreadLocalMap里有一个AtomicInteger类型的变量用来获取唯一的index。这样最后每个FastThreadLocal有唯一一个索引,InternalThreadLocalMap内有一个Object[] 用来存储每个FastThreadLocal对应的线程变量。
InternalThreadLocalMap类主要结构:
class InternalThreadLocalMap{
//真正的threadlocal变量
ThreadLocal slowThreadLocalMap =
new ThreadLocal();
//fastThreadlocal获取唯一index
AtomicInteger nextIndex = new AtomicInteger();
//存储每个fastThreadLocal.index对应的变量
Object[] indexedVariables;
}
看一下LocalPool的构造方法
LocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
this.ratioInterval = ratioInterval;
if (BLOCKING_POOL) {
pooledHandles = new BlockingMessageQueue>(maxCapacity);
} else {
pooledHandles = (MessagePassingQueue>) newMpscQueue(chunkSize, maxCapacity);
}
ratioCounter = ratioInterval; // Start at interval so the first one will be recycled.
}
这里看到pooledHandles是一个队列类型。根据是否是阻塞对象池创建BlockingMessageQueue或MessagePassingQueue类型队列实例。队列最大容量就是我们最开始看的maxCapacityPerThread。
然后掉用队列的claim方法去获取一个handle
claim方法:
DefaultHandle claim() {
MessagePassingQueue> handles = pooledHandles;
if (handles == null) {
return null;
}
DefaultHandle handle;
do {
handle = handles.relaxedPoll();
} while (handle != null && !handle.availableToClaim());
return handle;
}
循环从队列中获取一个handle,handle可用条件:handle不为空且当前状态也不是claim的。 这时候队列是空获取handle为空,接着localPool.newHandle()创建一个handle。没有什么特殊除了,就是创建了一个DefaultHandle实例,将localPool引用传给handle
DefaultHandle(LocalPool localPool) {
this.localPool = localPool;
}
然后会调用recycler.newObject()方法去创建对象。newObject是一个抽象方法,在创建recycler实例的时候要实现该方法。也就是我们在这个方法里定义自己对象实例化方法。其有一个入参handle,由handle可以获取localPool的引用,后面我们就可以当前对象实例执行对象回收。
通过newObject方法创建实例对象后,在返回该实例前,还会调用handle.set(obj)方法将对象放到handle里,方便下次直接从handle里get对象。
到这里对象创建了,但是对象池里好像还没有保留对象的引用,这个需要继续看对象的回收。
对象的回收通过handle.recylce()方法来执行。上面newObject方法创建对象实例的时候入参会传入handle,所以可以通过对象实例本身发起对象的回收。同时我们可以将当前必要的属性值进行清理以满足下次重复使用。
handle.recycle方法
public void recycle(Object object) {
if (object != value) {//判断当前对象和set的值是否相等
throw new IllegalArgumentException("object does not belong to handle");
}
//调用localPool release方法回收对象
localPool.release(this);
}
release方法:
void release(DefaultHandle handle) {
MessagePassingQueue> handles = pooledHandles;
handle.toAvailable();//修改状态标识
if (handles != null) {
//handle入队
handles.relaxedOffer(handle);
}
}
release方法真正将handle入队,也就是对象放入线程池。
这里看到和我们的数据库连接池稍微不同是,不是先初始化几个连接放到池子里。而是先创建使用,然后回收的时候才入池。
这里总结一下对象属性关系:
对象池存放在threadLocal变量里,是一个LocalPool对象实例。LocalPool内部使用一个Queue来保存对象。Queue不是直接保留的对象实例,是保留的Handle实例。
ThreadLocal>
}>
大致关系如上
1、定义一个简单对象
private static class User {
private String name;
private Recycler.Handle handle;
public User(Recycler.Handle handle){
this.handle = handle;
}
public void recycle(){
this.name = null;
//TODO 对象必要清理
handle.recycle(this);
}
}
这里我们用一个属性来保留handle的引用,主要是是在recycle里调起回收方法。
2、Recycler创建
static final Recycler recycler = new Recycler() {
@Override
protected User newObject(Handle handle) {
return new User(handle);
}
};
重写其newObject方法。
3、对象池使用
User user1 = recycler.get();
user1.recycle();
User user2 = recycler.get();
System.out.println(user1 == user2);
这里可以看到user1和user2是同一个实例引用。