线程池的特点:
空间换时间,浪费服务器的硬件资源,换取运行效率.
池是一组资源的集合,这组资源在服务器启动之初就被完全创建好并初始化,这称为静态资源.
当服务器进入正式运行阶段,开始处理客户请求的时候,如果它需要相关的资源,可以直接从池中获取,无需动态分配.
当服务器处理完一个客户连接后,可以把相关的资源放回池中,无需执行系统调用释放资源.
工作流程:
采用Proactor并发模型,主线程负责监听文件描述符,接受socket连接,若当前监听的socket发生了读写事件,就把任务插入到请求队列中,工作线程从请求队列中取出任务,完成读写数据的处理。
线程池的定义如下:
- template <typename T>
- class threadpool
- {
- public:
- /*thread_number是线程池中线程的数量,max_requests是请求队列中最多允许的、等待处理的请求的数量*/
- threadpool(int actor_model, connection_pool *connPool, int thread_number = 8, int max_request = 10000);
- ~threadpool();
- bool append(T *request, int state);
- bool append_p(T *request);
-
- private:
- /*工作线程运行的函数,它不断从工作队列中取出任务并执行之*/
- static void *worker(void *arg);
- void run();
-
- private:
- int m_thread_number; //线程池中的线程数
- int m_max_requests; //请求队列中允许的最大请求数
- pthread_t *m_threads; //描述线程池的数组,其大小为m_thread_number
- std::list<T *> m_workqueue; //请求队列
- locker m_queuelocker; //保护请求队列的互斥锁
- sem m_queuestat; //是否有任务需要处理
- connection_pool *m_connPool; //数据库
- int m_actor_model; //模型切换
- };
- template <typename T>
- threadpool<T>::threadpool( int actor_model, connection_pool *connPool, int thread_number, int max_requests) : m_actor_model(actor_model),m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL),m_connPool(connPool)
- {
- if (thread_number <= 0 || max_requests <= 0)
- throw std::exception();
- m_threads = new pthread_t[m_thread_number];
- if (!m_threads)
- throw std::exception();
- for (int i = 0; i < thread_number; ++i)
- {
- if (pthread_create(m_threads + i, NULL, worker, this) != 0)
- {
- delete[] m_threads;
- throw std::exception();
- }
- if (pthread_detach(m_threads[i]))
- {
- delete[] m_threads;
- throw std::exception();
- }
- }
- }
- template <typename T>
- threadpool<T>::~threadpool()
- {
- delete[] m_threads;
- }
- template <typename T>
- bool threadpool<T>::append(T *request, int state)
- {
- m_queuelocker.lock();
- if (m_workqueue.size() >= m_max_requests)
- {
- m_queuelocker.unlock();
- return false;
- }
- request->m_state = state;
- m_workqueue.push_back(request);
- m_queuelocker.unlock();
- m_queuestat.post();
- return true;
- }
- template <typename T>
- bool threadpool<T>::append_p(T *request)
- {
- m_queuelocker.lock();
- if (m_workqueue.size() >= m_max_requests)
- {
- m_queuelocker.unlock();
- return false;
- }
- m_workqueue.push_back(request);
- m_queuelocker.unlock();
- m_queuestat.post();
- return true;
- }
- template <typename T>
- void *threadpool
::worker(void *arg) - {
- threadpool *pool = (threadpool *)arg;
- pool->run();
- return pool;
- }
- template <typename T>
- void threadpool<T>::run()
- {
- while (true)
- {
- m_queuestat.wait();
- m_queuelocker.lock();
- if (m_workqueue.empty())
- {
- m_queuelocker.unlock();
- continue;
- }
- T *request = m_workqueue.front();
- m_workqueue.pop_front();
- m_queuelocker.unlock();
- if (!request)
- continue;
- if (1 == m_actor_model)
- {
- if (0 == request->m_state)
- {
- if (request->read_once())
- {
- request->improv = 1;
- connectionRAII mysqlcon(&request->mysql, m_connPool);
- request->process();
- }
- else
- {
- request->improv = 1;
- request->timer_flag = 1;
- }
- }
- else
- {
- if (request->write())
- {
- request->improv = 1;
- }
- else
- {
- request->improv = 1;
- request->timer_flag = 1;
- }
- }
- }
- else
- {
- connectionRAII mysqlcon(&request->mysql, m_connPool);
- request->process();
- }
- }
- }
详细解释下work函数,这个函数不断的检查队列是否为空,如果不为空会唤醒线程,在处理任务前获取锁。从任务队列头部取出任务,释放锁,如果取出的任务对象是有效指针,如果是Proactor并发模型代表有数据要读,调用函数读取客户端数据,标记请求已被处理,获得数据库连接执行数据库操作。
如果读取失败,表示需要进行定时器的处理,如果m_stat=1,那么表示要写入数据,写入成功标记请求已经被处理,否则设置定时器。
如果不是Proactro模型,那么直接处理请求。