• netty数据缓冲区之ChannelOutboundBuffer


    发送数据会先被缓存在ChannelOutboundBuffer类中。该类有一个Entry内部类,每次调用write写数据会构造一个Entry放入Buffer类中。使用链表结构进行存储多个Entry,每次加入到链表tail。Entry的创建使用了对象池进行循环使用。

    Entry

    主要属性

    private static final ObjectPool RECYCLER = ObjectPool.newPool(new ObjectCreator() {
        @Override
        public Entry newObject(Handle handle) {
            return new Entry(handle);
        }
    });
    
    private final Handle handle;
    Entry next;
    Object msg;
    ByteBuffer[] bufs;
    ByteBuffer buf;
    ChannelPromise promise;
    long progress;
    long total;
    int pendingSize;//大小
    int count = -1;
    boolean cancelled;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    RECYCLER:对象池,重写了newObject方法,对象池新创建对象会使用该方法,然后将handle传给entry。handle主要在对象回收的时候进行回收操作。

    next:链表下一个元素指针

    msg:要写的原始数据对象

    promise:发送方法的promise

    主要方法

    1、newInstance(Object msg, int size, long total, ChannelPromise promise)

    创建entry,会调用对象池RECYCLER.get()方法获取一个实例,然后将入参设置到entry属性。

    2、recyle

    回收当前对象,当前对象不会被销毁。只是属性清理,然后调用handle.recycle(this)放入对象池可用队列中,供下次使用。也不会被垃圾回收调。

    主要属性

    ChannelOutboundBuffer{
      //链表标记的flush开始位置
      private Entry flushedEntry;
      //链表标记未flush的元素位置
      private Entry unflushedEntry;
      //链表尾部
      private Entry tailEntry;
      //标记未flush但是还未写出的entry梳理
      private int flushed;
    
      private int nioBufferCount;
      private long nioBufferSize;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    flushedEntry和unflushedEntry是两个头指针。flushedEntry标识flush开始位置,只是标记,数据还未发送。unflushedEntry标记未flush的元素位置。tailEntry当前entry链表的尾指针。

    主要方法

    1、addMessage(Object msg, int size, ChannelPromise promise)

    public void addMessage(Object msg, int size, ChannelPromise promise) {
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }
        //累加缓冲区待写数据大小
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    添加消息,在数据写入第一步会调用该方法。这里addMessage就是构造一个entry,然后将entry放到链表尾部。如果unflushedEntry指针为空,设置unflushedEntry指向该entry。

    2、addFlush()

    public void addFlush() {
        //将flushedEntry指向unflushedEntry
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                flushedEntry = entry;
            }
            do {//循环设置每个entry的promise。我已准备要write该数据,现在不能取消了
                flushed ++;
                if (!entry.promise.setUncancellable()) {//设置失败,标记该entry为cancel
                    // Was cancelled so make sure we free up memory and notify about the freed bytes
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                entry = entry.next;
            } while (entry != null);
    
            // All flushed so reset unflushedEntry
            unflushedEntry = null;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    3、nioBuffers(int maxCount, long maxBytes)

    //maxCount 最大buffer数量,maxBytes单个buffer最大字节数
    public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
        long nioBufferSize = 0;
        int nioBufferCount = 0;//buffer数量
        //从当前线程threadlocal获取nioBuffers
        final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
        Entry entry = flushedEntry;//已flushed未写出头指针
        while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
            if (!entry.cancelled) {//未被取消
                ByteBuf buf = (ByteBuf) entry.msg;
                final int readerIndex = buf.readerIndex();
                //数据可读大小
                final int readableBytes = buf.writerIndex() - readerIndex;
    
                if (readableBytes > 0) {
                    if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
                        // 超过单个最大buffer最大字节数并且buffer数不为0
                        // See also:
                        // - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
                        // - https://linux.die.net//man/2/writev
                        break;
                    }
                    nioBufferSize += readableBytes;//累计buffer字节数大小
                    int count = entry.count;//默认值-1
                    if (count == -1) {
                        //buf是PooledByteBuf类型,默认返回1
                        entry.count = count = buf.nioBufferCount();
                    }
                    //是否需要扩充nioBuffers
                    int neededSpace = min(maxCount, nioBufferCount + count);
                    if (neededSpace > nioBuffers.length) {
                        nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
                        NIO_BUFFERS.set(threadLocalMap, nioBuffers);
                    }
                    if (count == 1) {
                      //将buf放入nioBuffers中
                        ByteBuffer nioBuf = entry.buf;
                        if (nioBuf == null) {
                            entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
                        }
                        nioBuffers[nioBufferCount++] = nioBuf;
                    } else {
                        nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount);
                    }
                    if (nioBufferCount >= maxCount) {//buffer数量超过最大数量
                        break;
                    }
                }
            }
            entry = entry.next;
        }
        this.nioBufferCount = nioBufferCount;
        this.nioBufferSize = nioBufferSize;
    
        return nioBuffers;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    4、removeBytes(long writtenBytes)

    //writtenBytes 已经写出数据大小
    public void removeBytes(long writtenBytes) {
        for (;;) {
            //current()= flushedEntry.msg 
            Object msg = current();
            if (!(msg instanceof ByteBuf)) {
                assert writtenBytes == 0;
                break;
            }
    
            final ByteBuf buf = (ByteBuf) msg;
            final int readerIndex = buf.readerIndex();
            final int readableBytes = buf.writerIndex() - readerIndex;//当前buf字节大小
    
            if (readableBytes <= writtenBytes) {//当前buf不够写出数据量
                if (writtenBytes != 0) {
                    //更新写出进度 entry.progress = entry.progress + readableBytes
                    //如果promise是ChannelProgressivePromise,调用tryProgress方法。
                    progress(readableBytes);
                    writtenBytes -= readableBytes;
                }
                /**
                 移除当前entry
                 1、entry的promise设置为成功
                 2、flushedEntry 指向当前entry.next
                 3、对象池回收当前entry
                */
                remove();
            } else { // 当前buf够写出数量
                if (writtenBytes != 0) {
                    buf.readerIndex(readerIndex + (int) writtenBytes);//更新buf已读下标
                    progress(writtenBytes);
                }
                break;
            }
        }
        //根据nioBufferCount数量,清空NIO_BUFFERS数组对应下标元素,for gc
        clearNioBuffers();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    写出后删除对应的buf

  • 相关阅读:
    【iMessage相册推位置推软件】实现 Greetbale 的类型Compiler Error编译器将返回错误Compiler Error
    经验分享:产品经理面试的5大技巧
    Java Swing 制作一个Pong小游戏
    UDP的可靠性传输
    css设置字体属性
    YUV空间-两张图片颜色匹配(颜色替换)
    抗疫逆行者网页作业 感动人物HTML网页代码成品 网页作业带JS下拉菜单 最美逆行者网页模板 致敬疫情感动人物网页设计制作
    C++【类的自动类型转换和强制类型转换】,总要了解一下
    Python匿名函数
    基于C++实现⾃然连接操作算法
  • 原文地址:https://blog.csdn.net/sinat_16493273/article/details/133079010