• zlMediaKit 4 buffer模块--buffer什么都能装&不只有send还有sendmsg/sendmmsg


    对应源码

    Buffer.h

    BufferSock.h

    Buffer

    UML

    在这里插入图片描述

    //禁止拷贝基类
    class noncopyable {
    protected:
        noncopyable() {}
        ~noncopyable() {}
    private:
        //禁止拷贝
        noncopyable(const noncopyable &that) = delete;
        noncopyable(noncopyable &&that) = delete;
        noncopyable &operator=(const noncopyable &that) = delete;
        noncopyable &operator=(noncopyable &&that) = delete;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    Buffer基类

    缓存抽象基类。有两个虚函数,data和size,前者获取数据指针,后者获取数据长度,派生类需要实现这两个函数。仅通过data拿到char时,不要直接就访问数据,因为数据可能是二进制的(不能以\0判断结尾)。std::string是可以存储二进制数据的,初始化时需要使用string(char,int count),指定长度后不会以\0结束。

    virtual char * 	data () const =0 
    virtual size_t 	size () const =0
    virtual std::string 	toString () const
    virtual size_t 	getCapacity () const
        
    ObjectStatistic<Buffer> _statistic;//用于计数的静态变量
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    BufferLikeString:对string进行封装

    size_t _erase_head;
    size_t _erase_tail;
    std::string _str;
    //对象个数统计
    ObjectStatistic<BufferLikeString> _statistic;
    
    • 1
    • 2
    • 3
    • 4
    • 5

    构造函数

    基类禁止拷贝操作对派生类的影响

    C++实现用来继承的noncopyable类

    C++11中的defalut和delete关键字

    • 派生类继承基类时,若派生类没有定义自己的拷贝构造函数和拷贝复制运算符,则基类的拷贝属性会传递到派生类。

    • 派生类继承基类时,若派生类已定义自己的拷贝构造函数和拷贝复制运算符,则基类的拷贝属性不会传递到派生类。(即使继承了nocopy类,也照样可以copy)

    //可以由string和char构造
    BufferLikeString() 
    BufferLikeString(const char *str)
    BufferLikeString(std::string str)
    BufferLikeString(BufferLikeString &&that)//移动构造函数(参数本来就是右值,资源转移)
    BufferLikeString(const BufferLikeString &that)//照样可以copy构造
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    BufferLikeString & 	operator= (std::string str)
    BufferLikeString & 	operator= (const char *str)
    BufferLikeString & 	operator= (BufferLikeString &&that)
    BufferLikeString & 	operator= (const BufferLikeString &that)//照样可以拷贝赋值
    
    • 1
    • 2
    • 3
    • 4

    结构

    |---erase_head---|XXXXXX|---erase_tail---|
    //一个掐头去尾的字符串,头部和尾部余量留出来了,为了封装头部和尾部的协议?    
        
    size_t 	_erase_head
    size_t 	_erase_tail
    std::string    _str
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    assign的目的是?可以方便的缩短字符串

    //如果原来是"123456",需要assign成"2345",只用变动头尾的两个指针,为了解析封装好的协议?容易的掐头去尾
    
    BufferLikeString &assign(const char *data, size_t len) {
    //...
       if (data >= _str.data() && data < _str.data() + _str.size()) {
           _erase_head = data - _str.data();
           if (data + len > _str.data() + _str.size()) {
               throw std::out_of_range("BufferLikeString::assign out_of_range");
           }
           _erase_tail = _str.data() + _str.size() - (data + len);
           return *this;
       }
    //...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    BufferRaw 指针式缓存对象

    结构

    size_t 	_size = 0
    size_t 	_capacity = 0
    char * 	_data = nullptr
    
    • 1
    • 2
    • 3
    //一个以"\0"结尾,但是可以存储"\0"的字符串数组(因为size被_size记录着),会随着assign开辟新的容量,_data指针会变化
    void setCapacity(size_t capacity)
    {
        _data = new char[capacity];
        _capacity = capacity;
    }
    void assign(const char *data, size_t size = 0) 
    {
        if (size <= 0) {
            size = strlen(data);
        }
        setCapacity(size + 1);
        memcpy(_data, data, size);
        _data[size] = '\0';
        setSize(size);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    BufferOffset

    结构

    C _data;
    size_t _size;
    size_t _offset;
    
    • 1
    • 2
    • 3
    template <typename C>//代码中看,C这泛型类要由data和size两个方法
    class BufferOffset : public  Buffer 
    {
        C _data;
    	size_t _size;
    	size_t _offset;
    }
    
    //该类最大的不同是,可以有offset
    char *data() const override {
        return const_cast<char *>(getPointer<C>(_data)->data()) + _offset;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    C可以是值,也可以是指针

    https://c-cpp.com/cpp/types/remove_cv.html cpp的traits技巧

    C++11 enable_if 的使用

    type traits相关技巧

    //定义了几个继承std::false_type 的 is_pointer类的特化。调用is_pointer时编译器会帮我们选择合适的类
    template <typename T> struct is_pointer : public std::false_type {};
    template <typename T> struct is_pointer<std::shared_ptr<T> > : public std::true_type {};
    template <typename T> struct is_pointer<std::shared_ptr<T const> > : public std::true_type {};
    template <typename T> struct is_pointer<T*> : public std::true_type {};
    template <typename T> struct is_pointer<const T*> : public std::true_type {};
    
    //配合std::enable_if使用判断解析模板参数的类型,决定返回值返回啥
    //也就是说如果这个T是is_pointer特化的后四类(即指针),那么就返回他本身
    template<typename T>
    static typename std::enable_if<::toolkit::is_pointer<T>::value, const T &>::type
    getPointer(const T &data) {
        return data;
    }
    //如果这个T是不是指针类型,就返回const T *,他的地址
    template<typename T>
    static typename std::enable_if<!::toolkit::is_pointer<T>::value, const T *>::type
    getPointer(const T &data) {
        return &data;
    }
    //这保证了
    //getPointer(_data)->data() 无论返回啥,都能成功调用->运算符
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    BufferSock

    结构

    继承自Buffer类,内部组合了Buffer父类的智能指针,多态可以有任意buffer子类的特性,同时附加了sockaddr的特性

    和其他的Buffer子类并不平行

    int _addr_len = 0;
    struct sockaddr *_addr = nullptr;
    Buffer::Ptr _buffer; //*****
    
    • 1
    • 2
    • 3

    BufferList

    结构

    很特殊,没有类成员

    有一个返回自己智能指针的静态函数create,给create传入的是

    一个装着Buffer::Ptr的list

    一个SendResult的cb,由SendResult的定义可知,基本上每个buffer的发送结果都会有回声,

    一个is_udp

    using Ptr = std::shared_ptr<BufferList>;
    using SendResult = std::function<void(const Buffer::Ptr &buffer, bool send_success)>;
    static Ptr create(List<std::pair<Buffer::Ptr, bool> > list, SendResult cb, bool is_udp);
    
    • 1
    • 2
    • 3

    create 构造函数

    create函数根据是否是udp返回BufferSendMMsgBufferSendMsg,所以这个两个类肯定是继承自BufferList,返回值可以向上转型,同时可知这个两个类都可以由list+sendcb初始化

    make_sharedhttps://www.jianshu.com/p/03eea8262c11 make_shared在于只分配一次内存,将计数器紧挨在对象后面,异常安全的,因为计数器紧挨对象,不会因为new对象和构造Ptr的顺序不一致而有内存泄露

    if (is_udp) {
        return std::make_shared<BufferSendMMsg>(std::move(list), std::move(cb));
    }
    return std::make_shared<BufferSendMsg>(std::move(list), std::move(cb));
    
    • 1
    • 2
    • 3
    • 4

    虚函数,需要这两个类重写的,解决怎么发,发了多少,发完了没的问题

    virtual bool empty() = 0;
    virtual size_t count() = 0;
    virtual ssize_t send(int fd, int flags) = 0;
    
    • 1
    • 2
    • 3

    UML

    在这里插入图片描述

    在这里插入图片描述

    BufferCallBack

    结构

    BufferList::SendResult _cb;
    List<std::pair<Buffer::Ptr, bool> > _pkt_list;
    
    • 1
    • 2
    void sendCompleted(bool flag) {
        if (_cb) {
            //全部发送成功或失败回调
            while (!_pkt_list.empty()) {
                _cb(_pkt_list.front().first, flag);
                _pkt_list.pop_front();
            }
        } else {
            _pkt_list.clear();
        }
    }
    void sendFrontSuccess() {
        if (_cb) {
            //发送成功回调
            _cb(_pkt_list.front().first, true);
        }
        _pkt_list.pop_front();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    *BufferSendMsg

    list中已经全部给了BufferCallBack保存,则这个类就是override send empty count三个方法

    结构

    size_t _iovec_off = 0;  //count _iovec.size() - _iovec_off; 发送了多少个buffer,即发送给哪个块了
    size_t _remain_size = 0;//?构造函数中表明这个不是有多少个buffer,是bufferlist中的sum(数据长度) 总字节数//为待发送的字节数
    std::vector<struct iovec> _iovec;//?
    
    • 1
    • 2
    • 3

    iovec是个啥 一个指向Buffer的结构,一个可以被readv,writev调用的结构

    struct iovec定义了一个向量元素。通常,这个结构用作一个多元素的数组。对于每一个传输的元素,指针成员iov_base指向一个缓冲区,这个缓冲区是存放的是readv所接收的数据或是writev将要发送的数据。成员iov_len在各种情况下分别确定了接收的最大长度以及实际写入的长度。

    readv(2)与writev(2)函数这些函数是作为read与write函数的衍生函数而被人所知的。他们以这样的方式进行设计是因为他们可以 在一个原子操作中读取或是写入多个缓冲区 。这些函数的原型如下:

    int readv(int fd, const struct iovec *vector, int count);
    int writev(int fd, const struct iovec *vector, int count);

    //关键函数send
    ssize_t BufferSendMsg::send(int fd, int flags) {
        auto remain_size = _remain_size;
        while (_remain_size && send_l(fd, flags) != -1);
    
        //通过返回值可以知道发送的多少
        ssize_t sent = remain_size - _remain_size;
        if (sent > 0) {
            //部分或全部发送成功
            return sent;
        }
        //一个字节都未发送成功
        return -1;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    !发包的过程sendmsg

    send_l中使能的是msghdr中的flags

    //关键函数的关键函数send_l
    ssize_t BufferSendMsg::send_l(int fd, int flags) {
        ssize_t n;
        do {
            struct msghdr msg;
            msg.msg_name = nullptr;  //发送地址可选为空,因为是TCP连接套接字,fd中已经指明往哪里发了
            msg.msg_namelen = 0;
            msg.msg_iov = &(_iovec[_iovec_off]);
            msg.msg_iovlen = _iovec.size() - _iovec_off;
            if (msg.msg_iovlen > IOV_MAX) {
                msg.msg_iovlen = IOV_MAX;
            }
            msg.msg_control = nullptr;
            msg.msg_controllen = 0;
            msg.msg_flags = flags;
            n = sendmsg(fd, &msg, flags); 
        } while (-1 == n && UV_EINTR == get_uv_error(true));
    
        if (n >= (ssize_t)_remain_size) {
            //全部写完了,发送完毕的回调
            _remain_size = 0;
            sendCompleted(true);
            return n;
        }
    
        if (n > 0) {
            //部分发送成功
            reOffset(n);//调整到下一个发包或者更新本次未发完的包的起始地址
            return n;
        }
    
        //一个字节都未发送
        return n;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    sendmsg https://linux.die.net/man/2/sendmsg 发送的结构中要包含iovec,成功返回发送的字节数,失败-1

    send只可用于基于连接的套接字,send 和 write唯一的不同点是标志的存在,当标志为0时,send等同于write。sendtosendmsg既可用于无连接的套接字,也可用于基于连接的套接字。基于连接套接字时msghdr中的address需要为空。

    falgs MSG_CONFIRM |MSG_DONTROUTE|MSG_DONTWAIT |MSG_EOR |MSG_MORE |MSG_NOSIGNAL|MSG_OOB

    ssize_t sendmsg(int  sockfd , const struct msghdr * msg , int  flags);
    
    struct msghdr {
        void         *msg_name;       /* optional address */
        socklen_t     msg_namelen;    /* size of address */
        struct iovec *msg_iov;        /* scatter/gather array */ //Buffer和iovec的对应是为了符合调用sendmsg
        size_t        msg_iovlen;     /* # elements in msg_iov */
        void         *msg_control;    /* ancillary data, see below */
        size_t        msg_controllen; /* ancillary data buffer len */
        int           msg_flags;      /* flags on received message */
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    |----------|----------|-----------|
    	  _iovec_off
        
    //因为组织形式是包的形式,发送返回的是字节数,需要更新目前发送到哪个包了
    //同时还有可能一个包发了一半没发玩,调整该包需要发送的起始地址
    void BufferSendMsg::reOffset(size_t n) {
        _remain_size -= n;//首先更新剩余的待发送总字节数
        size_t offset = 0; 
        for (auto i = _iovec_off; i != _iovec.size(); ++i) {
            auto &ref = _iovec[i];
            offset += ref.iov_len;
            if (offset < n) {
                //此包发送完毕
                sendFrontSuccess();
                continue;
            }
            _iovec_off = i;
            if (offset == n) {
                //这是末尾发送完毕的一个包
                ++_iovec_off;//下次就要发送新包了
                sendFrontSuccess();
                break;
            }
            //这是末尾发送部分成功的一个包
            size_t remain = offset - n; //这个包还有多少字节待发
            ref.iov_base = (char *)ref.iov_base + ref.iov_len - remain; //调整起始地址,下次从这里开始发
            ref.iov_len = remain;
            break;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    BufferSendMMsg

    结构

    用于发送udp的结构,相比于tcp多了_hdrvec这个结构,少了偏移量,所以udp的包都是一次发一个包?

    size_t _remain_size = 0;
    std::vector<struct iovec> _iovec;
    std::vector<struct mmsghdr> _hdrvec;
    
    • 1
    • 2
    • 3
    struct mmsghdr {
            struct msghdr   msg_hdr;
            unsigned        msg_len;
    };
    
    • 1
    • 2
    • 3
    • 4
    BufferSendMMsg::BufferSendMMsg(List<std::pair<Buffer::Ptr, bool>> list, SendResult cb)
        : BufferCallBack(std::move(list), std::move(cb))
        , _iovec(_pkt_list.size())
        , _hdrvec(_pkt_list.size()) {
        auto i = 0U;
        _pkt_list.for_each([&](std::pair<Buffer::Ptr, bool> &pr) {
            auto &io = _iovec[i];
            io.iov_base = pr.first->data();
            io.iov_len = pr.first->size();
            _remain_size += io.iov_len;
    // ---------------构造函数中比tcp多的部分,在调用send前已经初始化好msghdr结构了----------------------
            auto ptr = getBufferSockPtr(pr);
            auto &mmsg = _hdrvec[i];
            auto &msg = mmsg.msg_hdr;
            mmsg.msg_len = 0;
            msg.msg_name = ptr ? (void *)ptr->sockaddr() : nullptr;
            msg.msg_namelen = ptr ? ptr->socklen() : 0;
            msg.msg_iov = &io;
            msg.msg_iovlen = 1;
            msg.msg_control = nullptr;
            msg.msg_controllen = 0;
            msg.msg_flags = 0;
            ++i;
        });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    !发包的过程sendmmsg

    The sendmmsg() system call is an extension of sendmsg(2) that allows the caller to transmit multiple messages on a socket using a single system call. (This has performance benefits for some applications.)

    一个允许调用者单次系统调用发送多个messages的函数,是对sendmsg的扩展

    https://man7.org/linux/man-pages/man2/sendmmsg.2.html

    sendmsg(fd, &msg, flags);//sendmsg一次发一个msg
    sendmmsg(fd, &_hdrvec[0], _hdrvec.size(), flags);//sendmsg一次发多个msg,多出来的参数就是要一次发几个msg
    
    std::vector<struct mmsghdr> _hdrvec;
    struct mmsghdr {
        struct msghdr   msg_hdr;
        unsigned        msg_len;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    //在这里把sendmmsg系统调用搞成了内联的函数,少一次寻址
    static inline int sendmmsg(int fd, struct mmsghdr *mmsg, unsigned vlen, unsigned flags)
    {
        return syscall(__NR_sendmmsg, fd, mmsg, vlen, flags);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    对应的也要有调整动作reOffset,sendmmsg一次没发完就接着发

    //调整对应的vec中的iov的起始地址
    void BufferSendMMsg::reOffset(size_t n) {
        for (auto it = _hdrvec.begin(); it != _hdrvec.end();) {
            auto &hdr = *it;
            auto &io = *(hdr.msg_hdr.msg_iov);
            assert(hdr.msg_len <= io.iov_len);
            _remain_size -= hdr.msg_len;
            if (hdr.msg_len == io.iov_len) {
                //这个udp包全部发送成功
                it = _hdrvec.erase(it); //找到了!,没记录偏移量是因为把之前的都删了
                sendFrontSuccess();
                continue;
            }
            //部分发送成功
            io.iov_base = (char *)io.iov_base + hdr.msg_len; //调整起始地址
            io.iov_len -= hdr.msg_len;//调整对应的待发数据长度
            break;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    总结

    关键词

    • 系统调用 sendmsg sendmmsg 对应的底层结构 mmsghdr msghdr iovec

    • 怎样发包|怎样发多个包|一次没发完,怎么接着发

    • type traits相关技巧 std::enable_if std::is_pointer

    • make_shared好处

    • 继承noncopyable不一定不能copy

    既然看了源码,就争取看细致一些,趁着自己有时间和精力

    感谢https://blog.csdn.net/youlezhe/article/details/122269224?spm=1001.2014.3001.5502,指路

  • 相关阅读:
    解决Ubuntu系统字体太小的问题
    甲硝唑修饰二硫化钨MoS2/二硒化钨WSe2/碲化钨WTe2/二氧化锰MnO2/四氧化三铁Fe3O4/二氧化硅SiO2/二氧化钛TIO2纳米粒
    C++ 继承和多态
    kafka知识点汇总
    2022.11.06 洛谷 P6587 超超的序列
    iOS13新增SceneDelegate文件适配
    Spring Boot中如何实现bean加载
    【并发编程】-1. 计算机内存架构、JAVA内存模型、Volatile关键字
    LeetCode_位运算_困难_805.数组的均值分割
    拓端tecdat|r语言中对LASSO回归,Ridge岭回归和Elastic Net模型实现
  • 原文地址:https://blog.csdn.net/qq_41565920/article/details/127718369