• C++ 网络编程学习五


    网络结构的更新

    1. asio网络层,会使用io_context进行数据封装,底层的话,在linux中就是epoll模型,在windows就是iocp模型。
      在这里插入图片描述
    2. 当服务器的接受数据较多时,又要处理接收到的信息的逻辑处理,逻辑处理一般会放到一个逻辑处理队列中进行处理。因为有时候逻辑比较复杂。
    3. 通过一个队列,单独线程从队列中取出逻辑函数。从而实现网络线程和逻辑线程分开,由一个队列进行连接。极大地提升网络线程的收发能力,并且可以用多线程的方式管理网络层。
    4. asio的多线程模式:
      • 启动n个线程,每个线程都有一个iocontext,每个线程负责一部分的socket。
      • 一个ioconext由多个线程共享。也可以一定程度上减轻readhandler的负担。
    5. 逻辑处理一般都是单线程的,因为大量的用户同时处理一个逻辑过程的时候,频繁地加锁取消锁,还不如就单线程的来做。

    完善消息结构:
    消息 = 消息id + 消息长度 + 消息内容。 前两部分统一封装到消息头里tlv格式。消息id占2个字节,消息长度占2个字节,消息头共4个字节。

    更新消息节点:将收取消息的类和发送消息的类,继承自消息基类。

    //HEAD_TOTAL_LEN = 4 包含id 和 消息长度。
    SendNode::SendNode(const char* msg, short max_len, short msg_id):MsgNode(max_len + HEAD_TOTAL_LEN)
    , _msg_id(msg_id){
        //先发送id, 本机字节序转为网络字节序
        short msg_id_host = boost::asio::detail::socket_ops::host_to_network_short(msg_id);
        memcpy(_data, &msg_id_host, HEAD_ID_LEN); // 变量按照字节流的方式,写到_data的前两个字节。
        //消息长度和消息体本身  转为网络字节序
        short max_len_host = boost::asio::detail::socket_ops::host_to_network_short(max_len);
        // 消息体长度信息的拷贝。
        memcpy(_data + HEAD_ID_LEN, &max_len_host, HEAD_DATA_LEN);
        // 消息体本身的拷贝。
        memcpy(_data + HEAD_ID_LEN + HEAD_DATA_LEN, msg, max_len);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    单例模式

    懒汉单例模式

    1. 通过静态成员变量实现单例。存在隐患,对于多线程方式生成的实例可能是多个。
    class Single2 {
    private:
    	Single2() {}
    	Single2(const Single2&) = delete;
    	Single2& operator=(const Single2&) = delete;
    public:
    	// 静态局部变量实现单例
    	static Single2& GetInst() {
    		static Single2 single; // 生命周期和进程一样,函数的局部静态变量生命周期随着进程结束而结束。
    		return single;
    	}
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    饿汉单例模式

    class Single2Hungry
    {
    private:
        Single2Hungry()
        {
        }
        Single2Hungry(const Single2Hungry &) = delete;
        Single2Hungry &operator=(const Single2Hungry &) = delete;
    public:
        static Single2Hungry *GetInst()
        {
            if (single == nullptr)
            {
                single = new Single2Hungry();
            }
            return single;
        }
    private:
        static Single2Hungry *single;
    };
    // 初始化
    Single2Hungry *Single2Hungry::single = Single2Hungry::GetInst();
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    饿汉式是在程序启动时就进行单例的初始化,这种方式也可以通过懒汉式调用,无论饿汉式还是懒汉式都存在一个问题,就是什么时候释放内存?多线程情况下,释放内存就很难了,还有二次释放内存的风险。

    懒汉式指针

    单例模式的单例由指针存在,创建单例的时候,用加锁的方式进行判断。防止在加锁的过程中,出现单例类被创建的情况。

    class SinglePointer
    {
    private:
        SinglePointer()
        {
        }
        SinglePointer(const SinglePointer &) = delete;
        SinglePointer &operator=(const SinglePointer &) = delete;
    public:
        static SinglePointer *GetInst()
        {
            if (single != nullptr)
            {
                return single;
            }
            s_mutex.lock();
            if (single != nullptr)
            {
                s_mutex.unlock();
                return single;
            }
            single = new SinglePointer();
            s_mutex.unlock();
            return single;
        }
    private:
        static SinglePointer *single;
        static mutex s_mutex;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    智能指针设计单例类

    class SingleAuto
    {
    private:
    	SingleAuto()
    	{
    	}
    	SingleAuto(const SingleAuto&) = delete;
    	SingleAuto& operator=(const SingleAuto&) = delete;
    public:
    	~SingleAuto()
    	{
    		cout << "single auto delete success " << endl;
    	}
    	static std::shared_ptr<SingleAuto> GetInst()
    	{
    		if (single != nullptr)
    		{
    			return single;
    		}
    		s_mutex.lock();
    		if (single != nullptr)
    		{
    			s_mutex.unlock();
    			return single;
    		}
    		single = std::shared_ptr<SingleAuto>(new SingleAuto);
    		s_mutex.unlock();
    		return single;
    	}
    private:
    	static std::shared_ptr<SingleAuto> single;
    	static mutex s_mutex;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    服务器优雅退出

    • 服务器退出之前,要把服务器逻辑队列中的服务执行完成。
    • asio提供的信号建立的方式,利用signal_set 定义了一系列信号合集,并且绑定了一个匿名函数,匿名函数捕获了io_context的引用,并且函数中设置了停止操作,也就是说当捕获到SIGINT,SIGTERM等信号时,会调用io_context.stop
    int main()
    {
        try {
            boost::asio::io_context  io_context;
            // 绑定信号 想捕获的信号,都加进去就行了 捕获io_context的收到的信号
            boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
            // 异步等待方式,等待停止信号
            // 绑定了一个匿名函数,匿名函数捕获了io_context的引用,并且函数中设置了停止操作,也就是说当捕获到SIGINT,SIGTERM等信号时,会调用io_context.stop
            signals.async_wait([&io_context](auto, auto) {
                io_context.stop();
                });
            CServer s(io_context, 10086);
            io_context.run();
        }
        catch (std::exception& e) {
            std::cerr << "Exception: " << e.what() << endl;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    asio的多线程模型IOService

    1. 第一个是启动多个线程,每个线程管理一个iocontext。第二种是只启动一个iocontext,被多个线程共享。
      在这里插入图片描述

    2. 启动线程的个数,不要超过核数。

    3. 每个线程独立调用io_context,一个socket会被注册在同一个io_context里,它的回调函数也会被单独的一个线程回调:

      • 那么对于同一个socket,他的回调函数每次触发都是在同一个线程里,就不会有线程安全问题,网络io层面上的并发是线程安全的。
      • 如果两个socket对应的上层逻辑处理,如果有交互或者访问共享区,会存在线程安全问题。可以通过加锁或者逻辑队列的方式解决安全问题。
    4. 多线程的优势:提升了并发能力, 单线程仅有一个io_context服务用来监听读写事件,就绪后回调函数在一个线程里串行调用。如果一个回调函数的调用时间较长肯定会影响后续的函数调用。

    5. 通过逻辑队列的方式将网络线程和逻辑线程解耦合,不会出现前一个调用时间影响下一个回调触发的问题。

    class AsioIOServicePool :public Singleton<AsioIOServicePool>
    {
        friend Singleton<AsioIOServicePool>; //声明友元,用这个类访问构造函数
    public:
        using IOService = boost::asio::io_context; // 定义别名
        // 通常会将一些异步操作提交给io_context进行处理,然后该操作会被异步执行,而不会立即返回结果。
        // 如果没有其他任务需要执行,那么io_context就会停止工作,导致所有正在进行的异步操作都被取消。
        // 这时,我们需要使用boost::asio::io_context::work对象来防止io_context停止工作。
        using Work = boost::asio::io_context::work; // work,防止io_context在没有被注册事件的时候退出。
        using WorkPtr = std::unique_ptr<Work>; //work不被改变。
    
        ~AsioIOServicePool();
        AsioIOServicePool(const AsioIOServicePool&) = delete;//不加引用的话,会造成一个递归构造的危险,不允许拷贝构造
        AsioIOServicePool& operator=(const AsioIOServicePool&) = delete;
        // 使用 round-robin 的方式返回一个 io_service
        boost::asio::io_context& GetIOService();// 轮询的方式
        void Stop();// 停止ioservice
    private:
        // 不让外界直接调用
        AsioIOServicePool(std::size_t size = std::thread::hardware_concurrency()); //根据CPU核数去创建线程数量
        std::vector<IOService> _ioServices; //IOService的vector变量,用来存储初始化的多个IOService。
        std::vector<WorkPtr> _works;
        std::vector<std::thread> _threads;
        std::size_t   _nextIOService;//下标
    };
    
    // 构造函数,初始化size个ioservice,size个_works, _ioServices绑定到works
    // _ioServices放到不同的线程中。
    AsioIOServicePool::AsioIOServicePool(std::size_t size) :_ioServices(size),
    _works(size), _nextIOService(0) {
        for (std::size_t i = 0; i < size; ++i) {
            _works[i] = std::unique_ptr<Work>(new Work(_ioServices[i]));
        }
        //遍历多个ioservice,创建多个线程,每个线程内部启动ioservice
        for (std::size_t i = 0; i < _ioServices.size(); ++i) {
            _threads.emplace_back([this, i]() {
                _ioServices[i].run();// 如果不绑定work,run()就会直接返回了
                });
        }
    }
    
    AsioIOServicePool::~AsioIOServicePool() {
        std::cout << "destruct" << std::endl;
    }
    
    // 轮询的方式,每次取出来的ioservice都不同。
    boost::asio::io_context& AsioIOServicePool::GetIOService() {
        auto& service = _ioServices[_nextIOService++]; //取出service
        if (_nextIOService == _ioServices.size()) {
            _nextIOService = 0;
        }
        return service;
    }
    
    void AsioIOServicePool::Stop() {
        for (auto& work : _works) {
            work.reset();// 想要Service停掉,要先work.reset();
            // work.reset()是让unique指针置空并释放,那么work的析构函数就会被调用,work被析构,其管理的io_service在没有事件监听时就会被释放。
        }
        for (auto& t : _threads) {
            t.join();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    asio多线程IOThreadPool

    1. 一个IOServicePool开启n个线程和n个iocontext,每个线程内独立运行iocontext, 各个iocontext监听各自绑定的socket是否就绪,如果就绪就在各自线程里触发回调函数。
    2. IOThreadPool:初始化一个iocontext用来监听服务器的读写事件,包括新连接到来的监听也用这个iocontext。只是我们让iocontext.run在多个线程中调用,这样回调函数就会被不同的线程触发,从这个角度看回调函数被并发调用了。
    3. 线程池统一管理一个io_context,每个线程调用一个io_context,会话session都注册到一个,哪个线程调用了io_context.run,哪个线程去就绪队列取出回调函数。
    4. 回调函数对同一个session来说就是不安全的。
    class AsioThreadPool :public Singleton<AsioThreadPool>
    {
    public:
        friend class Singleton<AsioThreadPool>;
        ~AsioThreadPool() {}
        AsioThreadPool& operator=(const AsioThreadPool&) = delete;
        AsioThreadPool(const AsioThreadPool&) = delete;
        boost::asio::io_context& GetIOService();
        void Stop();
    private:
        AsioThreadPool(int threadNum = std::thread::hardware_concurrency());
        boost::asio::io_context _service;
        std::unique_ptr<boost::asio::io_context::work> _work; //没有客人来,我也不会让饭店关门
        std::vector<std::thread> _threads;
    };
    
    // 初始化列表进行初始化
    AsioThreadPool::AsioThreadPool(int threadNum) :_work(new boost::asio::io_context::work(_service)) {
        for (int i = 0; i < threadNum; ++i) {
            _threads.emplace_back([this]() {
                _service.run();
                });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 线程池里每个线程都会运行_service.run函数,这就是多线程调用一个io_context的逻辑。
    • 因为回调函数是在不同的线程里调用的,所以会存在不同的线程调用同一个socket的回调函数的情况。
    • _service.run 内部在Linux环境下调用的是epoll_wait返回所有就绪的描述符列表在windows上会循环调用GetQueuedCompletionStatus函数返回就绪的描述符,二者原理类似,进而通过描述符找到对应的注册的回调函数,然后调用回调函数。

    epoll 和 iocp的一些知识点

    IOCP的使用主要分为以下几步:
    1 创建完成端口(iocp)对象。
    2 创建一个或多个工作线程,在完成端口上执行并处理投递到完成端口上的I/O请求。
    3 Socket关联iocp对象,在Socket上投递网络事件。
    4 工作线程调用GetQueuedCompletionStatus函数获取完成通知封包,取得事件信息并进行处理。
    
    epoll_wait的工作方式:
    1 调用epoll_creat在内核中创建一张epoll表。
    2 开辟一片包含n个epoll_event大小的连续空间。
    3 将要监听的socket注册到epoll表里。
    4 调用epoll_wait,传入之前我们开辟的连续空间,epoll_wait返回就绪的epoll_event列表,
    epoll会将就绪的socket信息写入我们之前开辟的连续空间。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 使用这种方式,有可能会存在隐患,不同的线程有可能处理同一块Read回调处理函数,存在网络上的并行。
      改进方法:再添加一个strand管理的队列,asio的strand是一个安全队列,里面进行独立的单线程访问。
    • 回调处理放在_strand中进行执行。
    void CSession::Start(){
        ::memset(_data, 0, MAX_LENGTH);
        _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
            boost::asio::bind_executor(_strand, std::bind(&CSession::HandleRead, this,
                std::placeholders::_1, std::placeholders::_2, SharedSelf())));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • IOThreadPool相比于IOServicePool,速度慢一些。

    参考列表
    https://www.bilibili.com/video/BV1FV4y1U7oo/
    https://llfc.club/category?catid=225RaiVNI8pFDD5L4m807g7ZwmF#!aid/2Qld2hoFIu8ycYBJXQdxwyWEBfy

  • 相关阅读:
    Git 配置处理客户端无法正常访问到 github 原网站时,npm 下载依赖包失败的问题
    【软考】数据结构之队列和栈
    Reinforcement Learning in the Era of LLMs: What is Essential? What is needed?
    为了讲明白继承和super、this关键字,群主发了20块钱群红包
    leetCode 11. 盛最多水的容器 + 双指针
    C++学习 day--21 地震监测系统实现、内存泄漏检测工具
    内存逃逸分析
    发现一款非常好用的学术GPT,可形成知识库,并分析论文,根据观点生成文字
    CSDN21天学习挑战赛之冒泡排序
    Python虚拟环境指南2022版本
  • 原文地址:https://blog.csdn.net/cliu1_16/article/details/136688331