• zlMediaKit 3 socket模块--怎么封装socket,怎么connect listen/bind write read


    socket.cpp socket.h

    socket

    在这里插入图片描述

    SockInfo类,有四个获取四元组信息的虚函数+一个获取自身标识符的虚函数

    shared_from_this

    原理关于boost中enable_shared_from_this类的原理分析 - 阿玛尼迪迪 - 博客园 (cnblogs.com)

    shared_ptr shared_from_this() { return shared_ptr(M_weak_this); }

    从上面的说明来看,需要小心的是shared_from_this()仅在shared_ptr的构造函数被调用之后才能使用,原因是enable_shared_from_this::weak_this_并不在构造函数中设置,而是在shared_ptr的构造函数中设置

    #include 
    #include 
    #include 
    #include
    #include 
    #include 
    using namespace std;
    
    class A : public std::enable_shared_from_this
    {
    public:
        int i{1};
    };
    
    int main()
    {
        
        //auto p = new A;
        auto p = std::make_shared();
        auto a = p->shared_from_this();// auto p = new A;  --> what():  bad_weak_ptr 没有给m_weak_ptr赋值
        printf("i:%d",p->i);
        return 0;   
    }
    

    Socket

    结构

    //和send_l部分关联
    //一级发送缓存, socket可写时,会把一级缓存批量送入到二级缓存
    List<std::pair<Buffer::Ptr, bool> > _send_buf_waiting;
    //一级发送缓存锁
    MutexWrapper<std::recursive_mutex> _mtx_send_buf_waiting;
    //二级发送缓存, socket可写时,会把二级缓存批量写入到socket
    List<BufferList::Ptr> _send_buf_sending;
    //二级发送缓存锁
    MutexWrapper<std::recursive_mutex> _mtx_send_buf_sending;
    //发送buffer结果回调
    BufferList::SendResult _send_result;
    //对象个数统计
    ObjectStatistic<Socket> _statistic;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    构造函数

    //接收数据回调
    using onReadCB = std::function<void(const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len)>;
    //发生错误回调
    using onErrCB = std::function<void(const SockException &err)>; //只有错误码,socket可能已经断开了
    //tcp监听接收到连接请求
    using onAcceptCB = std::function<void(Socket::Ptr &sock, std::shared_ptr<void> &complete)>;
    //socket发送缓存清空事件,返回true代表下次继续监听该事件,否则停止
    using onFlush = std::function<bool()>;
    //在接收到连接请求前,拦截Socket默认生成方式
    using onCreateSocket = std::function<Ptr(const EventPoller::Ptr &poller)>;
    //发送buffer成功与否回调
    using onSendResult = BufferList::SendResult;
    
    //类内部的静态方法,设置socket的poller和锁属性
    static Socket::Ptr toolkit::Socket::createSocket(const EventPoller::Ptr &poller, bool enable_mutex)
    
    Socket(const toolkit::EventPoller::Ptr &poller, bool enable_mutex)
    {
        //1 绑定poller
        //2 使能锁  _mtx_sock_fd|_mtx_event|_mtx_send_buf_waiting|_mtx_send_buf_sending
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    封装可控制开关的递归锁

    template<class Mtx = std::recursive_mutex>
    class MutexWrapper {
    public:
        MutexWrapper(bool enable) {
            _enable = enable;
        }
    
        ~MutexWrapper() = default;
    
        inline void lock() {
            if (_enable) {
                _mtx.lock();
            }
        }
    
        inline void unlock() {
            if (_enable) {
                _mtx.unlock();
            }
        }
    
    private:
        bool _enable;
        Mtx _mtx;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    lock_guard任意类型的锁的宏

    #define LOCK_GUARD(mtx) lock_guard<decltype(mtx)> lck(mtx)
    
    • 1

    connect

    使用weak_ptr作为一个观察指针,在执行异步时不确定socket是否还在,所以要用weak_ptr.lock()再判断下

    con_cb 执行成功后,释放async_con_cb/_con_timer对象,如果con失败,则释放自己对应的文件描述符

    async_con_cb:con_cb的声明周期由async_con_cb保证

    lambda函数本质:
    遵循了类的特征,生命周期和类是一样的
    如果有参数传入好比类的成员变量
    传入引用就是引用本身的生命周期

    为什么要有con_cb和async_con_cb,后面的if-else中可知,如果时直接给出ip,直接连接,连接的结果作为con_cb的参数传入

    如果需要DNS解析,那么这个操作是阻塞的,异步连接。无论同步异步,连接操作完毕之后,poll该fd上的可写事件,可写后调用onConnected事件

    todo

    • 定时器的实现
    • 工作线程分配任务的实现
    • poller添加事件的实现
    void Socket::connect(const string &url, uint16_t port, onErrCB con_cb_in, float timeout_sec, const string &local_ip, uint16_t local_port)
    {
            closeSock();
    
        weak_ptr<Socket> weak_self = shared_from_this();
        auto con_cb = [con_cb_in, weak_self](const SockException &err) {
            auto strong_self = weak_self.lock();
            if (!strong_self) {
                return;
            }
            strong_self->_async_con_cb = nullptr;//
            strong_self->_con_timer = nullptr;//释放智能指针指向的对象
            if (err) {
                LOCK_GUARD(strong_self->_mtx_sock_fd);
                strong_self->_sock_fd = nullptr;
            }
            con_cb_in(err);
        };
    
        auto async_con_cb = std::make_shared<function<void(int)> >([weak_self, con_cb](int sock) {
            //con_cb的生命周期由async_con_cb保证
            auto strong_self = weak_self.lock();
            if (sock == -1 || !strong_self) {
                if (!strong_self) {
                    CLOSE_SOCK(sock);
                } else {
                    con_cb(SockException(Err_dns, get_uv_errmsg(true)));
                }
                return;
            }
    
            auto sock_fd = strong_self->makeSock(sock, SockNum::Sock_TCP);
            weak_ptr<SockFD> weak_sock_fd = sock_fd;
    
            //监听该socket是否可写,可写表明已经连接服务器成功
            int result = strong_self->_poller->addEvent(sock, EventPoller::Event_Write, [weak_self, weak_sock_fd, con_cb](int event) {
                auto strong_sock_fd = weak_sock_fd.lock();
                auto strong_self = weak_self.lock();
                if (strong_sock_fd && strong_self) {
                    //socket可写事件,说明已经连接服务器成功
                    strong_self->onConnected(strong_sock_fd, con_cb);
                }
            });
    
            if (result == -1) {
                con_cb(SockException(Err_other, "add event to poller failed when start connect"));
                return;
            }
    
            //保存fd
            LOCK_GUARD(strong_self->_mtx_sock_fd);
            strong_self->_sock_fd = sock_fd;
        });
    
        if (isIP(url.data())) {
            (*async_con_cb)(SockUtil::connect(url.data(), port, true, local_ip.data(), local_port));
        } else {
            auto poller = _poller;
            weak_ptr<function<void(int)>> weak_task = async_con_cb;
            WorkThreadPool::Instance().getExecutor()->async([url, port, local_ip, local_port, weak_task, poller]() {
                //阻塞式dns解析放在后台线程执行
                int sock = SockUtil::connect(url.data(), port, true, local_ip.data(), local_port);
                poller->async([sock, weak_task]() {
                    auto strong_task = weak_task.lock();
                    if (strong_task) {
                        (*strong_task)(sock);
                    } else {
                        CLOSE_SOCK(sock);
                    }
                });
            });
            _async_con_cb = async_con_cb;//async_con_cb的生命周期由类成员_async_con_cb保证(续命)
        }
    
        //连接超时定时器 定时器堆上分配,生命周期由类成员_con_timer保证(续命)
        _con_timer = std::make_shared<Timer>(timeout_sec, [weak_self, con_cb]() {
            con_cb(SockException(Err_timeout, uv_strerror(UV_ETIMEDOUT)));
            return false;
        }, _poller);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80

    onConnected函数由connect函数调用,先删除可写事件监听,再调用attachEvent(正式进入工作状态了),初始化所有线程下共享的读缓存(_read_buffer,默认256KB),连接时的可写事件表明socket是否连接服务器成功。连接后的可读可写事件表明socket上确实有事件到来了,要调用onRead|onWrite|emitErr等方法

    onRead

    使用socket共享的读buffer调用recvfrom,调用_on_read,即同步触发onReadCB回调:接受到了哪个地址的多少数据

    send/send_l

    send(Buffer::Ptr buf, struct sockaddr *addr, socklen_t addr_len, bool try_flush)
    {
        if (!addr || !addr_len) {
          	return send_l(std::move(buf), false, try_flush); //没有sock地址的buf
        }
        return send_l(std::make_shared<BufferSock>(std::move(buf), addr, addr_len), true, try_flush);
    }
    
    send_l(Buffer::Ptr buf, bool is_buf_sock, bool try_flush)
    // 
    {
        flushData
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    flushData

    //一级缓存是多个buffer
    //二级缓存是多个bufferlist,一个bufferlist是一个一级缓存,在创建时已经绑定对应的发送类型了BufferSendMsg/BufferSendMMsg
    
    • 1
    • 2

    消费sock二级缓存中的数据,如果二级缓存为空就消费一级的,一级缓存清空是通过move操作

    ​ 如果一级缓存也为空,那么说明所有数据均写入socket了,poller停止监听sock可写事件,即使不可写也可以往多级缓存中写,返回

    ​ 需要发数据,while循环中把二级缓存中的BufferList依次发送,同时设置sock的可写事件,有空间写了。udp发送失败,丢弃(pop_front),tcp触发异常,

    ​ 回滚未发送出去的数据,写回二级缓存中

    如果是poller线程就再flushData一次

    startWriteAbleEvent 开始监听可写事件: send失败了,缓冲区不够了,监听以便下次接着写

    stopWriteAbleEvent 停止监听可写事件: 多级buffer空了,即使不可写也能写buffer

    onWriteAble: 多级缓存空了就停止监听可写事件,否则多级缓存写入socket缓冲区

    listen/bindUdpSock

    监听读事件,对应的回调是onAccept

    onAccept

    SockUtil::setNoSigpipe(fd);
    SockUtil::setNoBlocked(fd);
    SockUtil::setNoDelay(fd);
    SockUtil::setSendBuf(fd);
    SockUtil::setRecvBuf(fd);
    SockUtil::setCloseWait(fd);
    SockUtil::setCloExec(fd);
    
    // tcp监听收到accept请求,自定义创建peer Socket事件(可以控制子Socket绑定到其他poller线程)
    peer_sock = _on_before_accept(_poller);//?
    
    // 监听都时间  利用自己传入deleter析构的时候执行 completed
    //这样的目的是 用户处理完onAccept时间后 才能再收到onRead事件
    //因为onAccept是可能异步处理的 所以不能触发事件后立即加入epoll监听onRead事件
    shared_ptr completed(nullptr, [peer_sock, peer_sock_fd](void *) {
        try {
            //然后把该fd加入poll监听(确保先触发onAccept事件然后再触发onRead等事件)
            if (!peer_sock->attachEvent(peer_sock_fd, false)) {
                //加入poll监听失败,触发onErr事件,通知该Socket无效
                peer_sock->emitErr(SockException(Err_eof, "add event to poller failed when accept a socket"));
            }
        } catch (std::exception &ex) {
            ErrorL << ex.what();
        }
    });
    
    
    _socket->setOnAccept([this](Socket::Ptr &sock, shared_ptr &complete) {
        auto ptr = sock->getPoller().get();
        auto server = getServer(ptr);
        ptr->async([server, sock, complete]() {
            //该tcp客户端派发给对应线程的TcpServer服务器
            server->onAcceptConnection(sock);
        });
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    bindUdpSock

    udp没有listen_fd。就是直接监听这个端口的读写事件就行

    bool Socket::bindUdpSock(uint16_t port, const string &local_ip, bool enable_reuse) {
        closeSock();
        int fd = SockUtil::bindUdpSock(port, local_ip.data(), enable_reuse);
        if (fd == -1) {
            return false;
        }
        auto sock = makeSock(fd, SockNum::Sock_UDP);
        if (!attachEvent(sock, true)) {
            return false;
        }
        LOCK_GUARD(_mtx_sock_fd);
        _sock_fd = sock;
        return true;
    }
    
    
    bool Socket::attachEvent(const SockFD::Ptr &sock, bool is_udp) {
        weak_ptr weak_self = shared_from_this();
        weak_ptr weak_sock = sock;
        _enable_recv = true;
        _read_buffer = _poller->getSharedBuffer();
        int result = _poller->addEvent(sock->rawFd(), EventPoller::Event_Read | EventPoller::Event_Error | EventPoller::Event_Write, [weak_self,weak_sock,is_udp](int event) {
            auto strong_self = weak_self.lock();
            auto strong_sock = weak_sock.lock();
            if (!strong_self || !strong_sock) {
                return;
            }
    
            if (event & EventPoller::Event_Read) {
                strong_self->onRead(strong_sock, is_udp);
            }
            if (event & EventPoller::Event_Write) {
                strong_self->onWriteAble(strong_sock);
            }
            if (event & EventPoller::Event_Error) {
                strong_self->emitErr(getSockErr(strong_sock));
            }
        });
    
        return -1 != result;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    SockSender

    子类send的实现!

    virtual ssize_t send(Buffer::Ptr buf) = 0;
    virtual void shutdown(const SockException &ex = SockException(Err_shutdown, "self shutdown")) = 0;
    
    //这么多发送类型,都依赖send(Buffer::Ptr buf)的重写实现!
    //发送char *
    SockSender &operator << (const char *buf);
    ssize_t send(const char *buf, size_t size = 0);
    //发送字符串
    SockSender &operator << (std::string buf);
    ssize_t send(std::string buf);
    //发送Buffer对象
    SockSender &operator << (Buffer::Ptr buf);
    //发送其他类型是数据
    template<typename T>
    SockSender &operator << (T &&buf) {
        std::ostringstream ss;
        ss << std::forward<T>(buf);
        send(ss.str());
        return *this;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    test_tcpClient.cpp 这样也可以发数据

    auto buf = BufferRaw::create();
    if(buf){
        buf->assign("[BufferRaw]\0");
        (*this) << _nTick++ << " "
                << 3.14 << " "
                << string("string") << " "
                <<(Buffer::Ptr &)buf;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    SocketHelper

    在这里插入图片描述

    has a sock

    对于send的实现,是对Socket::send的封装

    ssize_t SocketHelper::send(Buffer::Ptr buf) {
        if (!_sock) {
            return -1;
        }
        return _sock->send(std::move(buf), nullptr, 0, _try_flush);
    }//nullptr判断是否是一个bufSock getBufferSockPtr,给sendmmsg指定地址用
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    对于async的实现,是对EventPoller::async的封装

    总结

    • weak_ptr|shared_from_this|的使用场景,异步操作时,判断对象生命周期

    • decltype常用在<>中,不像auto一样需要定义,只是声明

    • lock_guard的使用,传入一个可以lock和unlock的对象

    • recursive_mutex的使用场景

    • 二级缓存发送数据时的作用,以及回滚的实现

    • 写事件的监听的目的时机

    • shared xxx(nullptr, [](){} ) 用法,自己传入deleter

  • 相关阅读:
    [一篇读懂]C语言八讲:数据结构概述
    16.live555mediaserver-保活机制
    二进制安全虚拟机Protostar靶场 安装,基础知识讲解,破解STACK ZERO
    spark插入动态分区代码报错
    【AGC】如何创建自定义应用内消息
    vue面试题集(四)
    JavaFx学习问题1--图片不显示问题
    CSS中的浮动float(元素怎样浮动以及浮动元素的特点--脱标)
    力扣每日一题:895. 最大频率栈【哈希表和队列】
    009:字符串中的整数求和
  • 原文地址:https://blog.csdn.net/qq_41565920/article/details/127718224