发送数据会先被缓存在ChannelOutboundBuffer类中。该类有一个Entry内部类,每次调用write写数据会构造一个Entry放入Buffer类中。使用链表结构进行存储多个Entry,每次加入到链表tail。Entry的创建使用了对象池进行循环使用。
主要属性
private static final ObjectPool RECYCLER = ObjectPool.newPool(new ObjectCreator() {
@Override
public Entry newObject(Handle handle) {
return new Entry(handle);
}
});
private final Handle handle;
Entry next;
Object msg;
ByteBuffer[] bufs;
ByteBuffer buf;
ChannelPromise promise;
long progress;
long total;
int pendingSize;//大小
int count = -1;
boolean cancelled;
RECYCLER:对象池,重写了newObject方法,对象池新创建对象会使用该方法,然后将handle传给entry。handle主要在对象回收的时候进行回收操作。
next:链表下一个元素指针
msg:要写的原始数据对象
promise:发送方法的promise
主要方法
1、newInstance(Object msg, int size, long total, ChannelPromise promise)
创建entry,会调用对象池RECYCLER.get()方法获取一个实例,然后将入参设置到entry属性。
2、recyle
回收当前对象,当前对象不会被销毁。只是属性清理,然后调用handle.recycle(this)放入对象池可用队列中,供下次使用。也不会被垃圾回收调。
ChannelOutboundBuffer{
//链表标记的flush开始位置
private Entry flushedEntry;
//链表标记未flush的元素位置
private Entry unflushedEntry;
//链表尾部
private Entry tailEntry;
//标记未flush但是还未写出的entry梳理
private int flushed;
private int nioBufferCount;
private long nioBufferSize;
}
flushedEntry和unflushedEntry是两个头指针。flushedEntry标识flush开始位置,只是标记,数据还未发送。unflushedEntry标记未flush的元素位置。tailEntry当前entry链表的尾指针。
1、addMessage(Object msg, int size, ChannelPromise promise)
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
} else {
Entry tail = tailEntry;
tail.next = entry;
}
tailEntry = entry;
if (unflushedEntry == null) {
unflushedEntry = entry;
}
//累加缓冲区待写数据大小
incrementPendingOutboundBytes(entry.pendingSize, false);
}
添加消息,在数据写入第一步会调用该方法。这里addMessage就是构造一个entry,然后将entry放到链表尾部。如果unflushedEntry指针为空,设置unflushedEntry指向该entry。
2、addFlush()
public void addFlush() {
//将flushedEntry指向unflushedEntry
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
flushedEntry = entry;
}
do {//循环设置每个entry的promise。我已准备要write该数据,现在不能取消了
flushed ++;
if (!entry.promise.setUncancellable()) {//设置失败,标记该entry为cancel
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
// All flushed so reset unflushedEntry
unflushedEntry = null;
}
}
3、nioBuffers(int maxCount, long maxBytes)
//maxCount 最大buffer数量,maxBytes单个buffer最大字节数
public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
long nioBufferSize = 0;
int nioBufferCount = 0;//buffer数量
//从当前线程threadlocal获取nioBuffers
final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
Entry entry = flushedEntry;//已flushed未写出头指针
while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
if (!entry.cancelled) {//未被取消
ByteBuf buf = (ByteBuf) entry.msg;
final int readerIndex = buf.readerIndex();
//数据可读大小
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes > 0) {
if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
// 超过单个最大buffer最大字节数并且buffer数不为0
// See also:
// - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
// - https://linux.die.net//man/2/writev
break;
}
nioBufferSize += readableBytes;//累计buffer字节数大小
int count = entry.count;//默认值-1
if (count == -1) {
//buf是PooledByteBuf类型,默认返回1
entry.count = count = buf.nioBufferCount();
}
//是否需要扩充nioBuffers
int neededSpace = min(maxCount, nioBufferCount + count);
if (neededSpace > nioBuffers.length) {
nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
NIO_BUFFERS.set(threadLocalMap, nioBuffers);
}
if (count == 1) {
//将buf放入nioBuffers中
ByteBuffer nioBuf = entry.buf;
if (nioBuf == null) {
entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
}
nioBuffers[nioBufferCount++] = nioBuf;
} else {
nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount);
}
if (nioBufferCount >= maxCount) {//buffer数量超过最大数量
break;
}
}
}
entry = entry.next;
}
this.nioBufferCount = nioBufferCount;
this.nioBufferSize = nioBufferSize;
return nioBuffers;
}
4、removeBytes(long writtenBytes)
//writtenBytes 已经写出数据大小
public void removeBytes(long writtenBytes) {
for (;;) {
//current()= flushedEntry.msg
Object msg = current();
if (!(msg instanceof ByteBuf)) {
assert writtenBytes == 0;
break;
}
final ByteBuf buf = (ByteBuf) msg;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;//当前buf字节大小
if (readableBytes <= writtenBytes) {//当前buf不够写出数据量
if (writtenBytes != 0) {
//更新写出进度 entry.progress = entry.progress + readableBytes
//如果promise是ChannelProgressivePromise,调用tryProgress方法。
progress(readableBytes);
writtenBytes -= readableBytes;
}
/**
移除当前entry
1、entry的promise设置为成功
2、flushedEntry 指向当前entry.next
3、对象池回收当前entry
*/
remove();
} else { // 当前buf够写出数量
if (writtenBytes != 0) {
buf.readerIndex(readerIndex + (int) writtenBytes);//更新buf已读下标
progress(writtenBytes);
}
break;
}
}
//根据nioBufferCount数量,清空NIO_BUFFERS数组对应下标元素,for gc
clearNioBuffers();
}
写出后删除对应的buf