• WebServer——二:线程池


    • 创建线程池的目的是为了避免频繁的申请和释放线程。
    • 一开始申请一堆线程,然后等待请求队列中有任务,线程负责取出任务并执行。

    本项目的事件处理模式使用同步IO模拟proactor模式

    模拟Proactor模式

    • 使用同步IO也能模拟Proactor模式,原理在于主线程执行数据读写操作,当读写完成后,主线程向工作线程通知完成事件,所以从工作线程角度看,他直接获得了数据读写的结果,只需要对结果进行逻辑处理。
    • 使用同步IO(例如epoll_wait)模拟Proactor模式的工作流程如下:
      • 主线程往epoll内核事件表注册socket 读就绪事件
      • 主线程调用epoll_wait函数等待socket上有数据可读
        当socket上有数据可读时,epoll_wait通知主线程,主线程从socket循环读取数据,将读取到的数据封装成请求并加入请求队列。
      • 请求队列某个工作线程被唤醒,获得请求对象并处理客户请求,之后往epoll内核事件表注册socket写就绪事件、
        主线程调用epoll_wait等待socket可写
      • socket可写时,epoll_wait通知主线程,主线程往socket写入处理结果。
        在这里插入图片描述

    半同步半反应堆模式

    • 用来实现上图的工作线程和主线程部分。
    • 先用线程池生成一定量的工作线程,工作线程生成之后用while循环等待,直到信号量通知,也就是请求队列中有请求。
    while(!m_stop)
    {    
         //信号量等待
         m_queuestat.wait();
         ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 主线程为异步线程,负责监听socket事件
    • 采用模拟Proactor模式
      在这里插入图片描述
    • 缺点:
      • 主线程和工作线程共享请求队列,所以主线程加任务到请求队列,或者工作线程从主线程中取出任务,都要加锁解锁。
        工作线程只能处理一个客户请求,客户数量多而工作线程少, 客户端响应速度会降低。

    线程池

    • 设计模式:半同步半反应堆。反应堆为模拟Proactor事件处理模式。
    • 主线程,也就是main()负责监听和插入请求到请求队列,子线程,也就是工作线程,也就是在线程池初始化的时候就有的,负责从请求队列中取出任务并完成任务。这里的任务具体为构造http响应。

    构造思路:

    • 初始化给三个参数:数据库连接池,线程数,工作队列最大允许数量。
    • 在初始化的时候创建线程,由于pthread_cread传递的函数必须是静态的,所以可以在类中构建一个static函数work,给这个work传递this指针,并在work函数中调用类的run函数,这样绕一圈,既能在构造线程池的时候完成线程的构造,又能让工作线程执行threadPool对象的run函数。

    class threadPool -> static threadPool::work(this)->this->threadPool::run();

    • append() :用于往工作队列里添加请求
    • run():用while+sem::wait()阻塞等待工作队列的内容,一旦工作队列有内容出现,各个工作线程就争抢请求,拿到请求的线程执行。
    template<typename T>
    class threadPool{
    public:
        threadPool(connection_pool*, int num = 4, int maxListNums = 100);
        ~threadPool();
        bool append(std::shared_ptr<T>);
    private:
        static void* work(void*);
        void run();
    private:
        //工作队列
        std::queue<std::shared_ptr<T>> workList;
        //线程ID保存
        std::vector<pthread_t> m_threadList;
        //工作队列最大数目
        int MAX_LIST_NUMS;
        //创建线程数目
        int m_thread_nums;
        //锁
        locker m_mutex;
        //信号量
        sem m_sem;
        //线程池是否开启
        bool isOpen;
        //数据库池
        connection_pool *m_connPool;
    };
    template<typename T>
    void *threadPool<T>::work(void* t){
        threadPool *request = static_cast<threadPool*>(t);
        request->run();
        pthread_exit(nullptr);
    }
    
    
    //构造函数
    template<typename T>
    threadPool<T>::threadPool(connection_pool* conn_pool, int num, int maxListNums)
        :m_thread_nums(num), MAX_LIST_NUMS(maxListNums), m_threadList(num), isOpen(true), m_connPool(conn_pool)
    {
        for(int i = 0; i != m_thread_nums; ++i){
            int err = pthread_create(&m_threadList[i], nullptr, work, (void*)this);
            if(err != 0){
                std::cerr <<  "[" << __FILE__ << "]: " << __LINE__ << " "<<  __FUNCTION__ << ": pthread_create error " << err << std::endl;
                exit(0);
            }
        }
    }
    //析构函数
    template<typename T>
    threadPool<T>::~threadPool(){
        //关闭线程循环
        isOpen = false;
        //回收线程
        for(int i = 0; i != m_thread_nums; ++i){
            pthread_join(m_threadList[i], nullptr);
        }
    }
    //添加到队列
    template<typename T>
    bool threadPool<T>::append(std::shared_ptr<T> request){
        MutexLockGuard m_locker(m_mutex);
        if(!isOpen || workList.size() >= MAX_LIST_NUMS){
            return false;
        }
        workList.push(request);
        //信号量+1
        m_sem.post();
        return true;
    }
    //取出并运行
    template<typename T>
    void threadPool<T>::run(){
        std::shared_ptr<T> request;
        //线程关闭且请求队列空
        while(isOpen && !workList.empty())
        {
            m_sem.wait();
            {
                MutexLockGuard m_locker(m_mutex);
                //如果被其他线程抢先用了
                if(workList.empty()) continue;
                request = workList.front();
                workList.pop();
            }
            if(request == nullptr) continue;
            ConnectRAII mysql_raii(m_connPool);
            request->mysql = mysql_raii.getConn();
            request->process();
        }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92

    要点:

    1. 对于线程我们不适用分离线程

    • 主要是考虑到主动关闭服务器的情况,threadPool持有一个存储线程ID的数组,在关闭服务器,也就是threadPool进入析构的时候,会先把isOpen变量赋值false,当工作线程检测到这个变量的改变之后,会禁止append(),然后把请求队列清空,退出线程。析构函数负责接受。这样才能请求队列被执行完毕之后优雅退出。
    //析构函数
    template<typename T>
    threadPool<T>::~threadPool(){
        //关闭线程循环
        isOpen = false;
        //回收线程
        for(int i = 0; i != m_thread_nums; ++i){
            pthread_join(m_threadList[i], nullptr);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 由于采用了模板,要求模板T的类型持有一个mysql变量,用于存储数据库连接,还需要一个process()函数,负责执行业务。
  • 相关阅读:
    基于人工势场法的二维平面内无人机的路径规划的matlab仿真,并通过对势场法改进避免了无人机陷入极值的问题
    保护基团的作用(四类基团的保护方法有哪些)--渝偲医药
    VUE el-select Select 选择器 选项是对象,显示是一个值name ,但是绑定的对象值是id方案
    货物从内地到香港的选择
    opengl pyqt 显示文字
    笙默考试管理系统-MyExamTest----codemirror(34)
    LINUX基础培训三十之理论基础知识自测题(附答案)
    Perl基础入门指南:从零开始掌握Perl编程
    新闻稿写作结构有哪些类型?新闻稿写作要点是什么?
    Spring中有哪几种方法获取HttpSession对象
  • 原文地址:https://blog.csdn.net/qq_42370809/article/details/126137479