1.降低资源消耗,复用已创建的线程来降低创建和销毁线程的消耗。
2.提高响应速度,任务到达时,可以不需要等待线程的创建立即执行。
3.提高线程的可管理性,使用线程池能够统一的分配、调优和监控。
这里封装了Task,lockGuard,Thread,以及ThreadPool,具体请从ThreadPool看起,前面的封装只是为了方便ThreadPool使用.
简化流程:ThreadPool也就是线程池,把任务数据结构(这里是队列)和多个线程封装在一起,当任务被生产出来并被线程池检测到,就会唤醒一个线程去处理,处理方法是把任务拷贝一份到线程独立栈上,并删除该任务。这里使用到了单例模式,把构造函数设为私有,只能调用类的静态成员函数来构造唯一一个线程池对象。
- #pragma once
- #include
- #include
- #include
- #include
- #include
- #include
-
-
- #include"Task.hpp"
- #include"Thread.hpp"
- #include"lockGuard.hpp"
-
- // 单例模式
- const int N=6;
-
- template<class T>
- class ThreadPool
- {
- private:
- ThreadPool(int num=N)//构造函数私有化
- :_num(num)
-
- {
- pthread_mutex_init(&_mutex,nullptr);
- pthread_cond_init(&_cond,nullptr);
-
- }
- ThreadPool(const ThreadPool
&tp) = delete;//删除默认拷贝构造 - void operator=(const ThreadPool
&tp) = delete;//删除赋值函数 - pthread_mutex_t* getlock()
- {
- return &_mutex;
- }
-
-
- void threadwait()
- {
- //挂起一个线程
- pthread_cond_wait(&_cond,&_mutex);
- }
-
- void threadwakeup()
- {
- //唤醒一个线程
- pthread_cond_signal(&_cond);
- }
-
- public:
- //创建单例模式对象;
- static ThreadPool
* getinstance() - {
- if(_instance==nullptr) //当一个对象已经被创建以后,就不进入申请锁并且判断的环节了;
- {
- lockGuard lock(&_mutex_instance);
- if(_instance==nullptr)
- {
- _instance=new ThreadPool
(); - _instance->init();
- _instance->start();
- }
- }
- return _instance;
- }
-
- ~ThreadPool()
- {
- for(auto& e:_threads)
- {
- e.join();
- }
- pthread_mutex_destroy(&_mutex);
- pthread_cond_destroy(&_cond);
- }
-
- bool isEmpty()
- {
- return _tasks.empty();
- }
-
- void init()
- {
- //创建线程
- for(int i=1;i<=_num;++i)
- //pthread_create(&_threads[i],nullptr,ThreadRoutine,this);
- _threads.push_back(Thread(i,ThreadRoutine,this));
- }
-
-
- void start()
- {
- //线程启动
- for(auto& e:_threads)
- {
- e.run();
- }
- }
-
- void check()
- {
- for(auto& e:_threads)
- {
- std::cout<<"线程ID"<
threadid()<<" , "<threadname()<<"is running··· "< - }
-
- }
- //放入任务
- void pushtask(const T& task)
- {
- lockGuard lock(&_mutex);
- _tasks.push(task);
-
- threadwakeup();//有新任务进来,唤醒线程去处理
- }
-
-
- T poptask()
- {
- T t=_tasks.front();
- _tasks.pop();
- return t;
- }
- private:
- static void* ThreadRoutine(void* args)
- {
- pthread_detach(pthread_self());
- //指针强转成线程池对象类型;
- ThreadPool
* tp=static_cast*>(args); - while(true)
- {
-
- T t;
- {
-
- lockGuard lock(tp->getlock());
-
- //1.判断是否有任务
- //有->处理
- //无->等待
- //如果任务队列为空,则等待
- while(tp->isEmpty())
- {
- tp->threadwait();
- }
- t=tp->poptask();//从共有区域拿到线程独立栈上;
- }
-
- t();//调用task类里面的仿函数处理任务
- std::cout << "thread handler done, result: " << t.formatRes() << std::endl;
- }
- }
- private:
-
- std::vector
_threads; - int _num;//线程池里有几个线程;
-
- std::queue
_tasks; // 使用STL的自动扩容的特性 -
- pthread_mutex_t _mutex;
- pthread_cond_t _cond;
-
- static ThreadPool
*_instance;//懒汉方式实现单例模式 - static pthread_mutex_t _mutex_instance;//单例对象有自己的锁
- };
-
-
- //静态变量初始化
- template<class T>
- ThreadPool
* ThreadPool::_instance=nullptr; -
- template<class T>
- pthread_mutex_t ThreadPool
::_mutex_instance=PTHREAD_MUTEX_INITIALIZER;
Task.hpp
- #pragma once
- #include
- #include
-
-
- class Task
- {
- public:
- Task(){}
-
- Task(int x,int y,char op):_x(x),_y(y),_op(op),_result(0),_exitCode(0) {}
-
- int operator()()
- {
- switch(_op)
- {
- case '+':
- _result=_x+_y;
- break;
- case '-':
- _result=_x-_y;
- break;
- case '*':
- _result=_x*_y;
- break;
- case '/':
- {
- if(_y==0)
- _exitCode=-1;
- else
- _result=_x/_y;
- }
- break;
- case '%':
- {
- if(_y==0)
- _exitCode=-2;
- else
- _result=_x%_y;
- }
- break;
- default:
- break;
- }
- }
-
- //任务
- std::string formatArg()
- {
- return std::to_string(_x) + _op + std::to_string(_y) + "=?";
- }
-
- //任务处理结果
- std::string formatRes()
- {
- return std::to_string(_result) + "(" + std::to_string(_exitCode)+ ")";
- }
-
- ~Task() {}
-
-
-
- private:
- int _x;
- int _y;
- char _op;
- int _result;
- int _exitCode;
- };
lockGuard.hpp
- #pragma once
-
- #include
- #include
-
- class Mutex//成员:加锁函数和解锁函数
- {
- public:
- Mutex(pthread_mutex_t* pmutex):_pmutex(pmutex) {}
-
- void lock()
- {
- pthread_mutex_lock(_pmutex);
- }
-
- void unlock()
- {
- pthread_mutex_unlock(_pmutex);
- }
-
-
- ~Mutex(){}
-
- private:
- pthread_mutex_t* _pmutex;//需要传入一个互斥量(锁)的指针;
- };
-
-
- //对Mutex进行二次封装;
- //创建该对象时自动加锁,析构时自动解锁;
- class lockGuard
- {
- public:
- lockGuard(pthread_mutex_t* pmutex):_mutex(pmutex)//利用锁的指针构建Mutex对象
- {
- _mutex.lock();
- }
-
- ~lockGuard()
- {
- _mutex.unlock();
- }
-
- private:
- Mutex _mutex;//类内创建对象
- };
Thread.hpp
- #pragma once
-
- #include
- #include
- #include
-
- class Thread
- {
- public:
- typedef void* (*func_t) (void*);
- typedef enum
- {
- NEW=0,
- RUNNING,
- EXITED
- }ThreadStatus;
-
- public:
- //状态:new,running,exited
- int status()
- {
- return _status;
- }
-
- //线程名
- std::string threadname()
- {
- return _name;
- }
-
- //线程ID(共享库中的进程地址空间的虚拟地址)
- pthread_t threadid()
- {
- if(_status==RUNNING)//线程已经被创建,线程id已经输入到成员变量_tid中;
- return _tid;
- else
- {
- return 0;
- }
- }
-
- public:
-
- //构造函数;
- Thread(int num,func_t func,void* args)//num代表第几个线程
- :_tid(0),
- _status(NEW),
- _func(func),
- _args(args)
-
- {
- char name[128];
- snprintf(name,sizeof(name),"thread-%d",num);
- _name=name;
- }
-
- //析构函数
- ~Thread(){}
-
-
-
- //静态成员函数不能访问类内所有成员,因为没有this指针;
- static void* runHelper(void *args)
- {
- Thread* td=(Thread*)args;
- (*td)();//调用仿函数执行线程的回调函数;
- return nullptr;
- }
-
- void operator()()//仿函数
- {
- //如果函数指针不为空,则执行该函数指针指向的回调函数;
- if(_func!=nullptr) _func(_args);
- }
-
- //创建线程
- void run()
- {
- //因为runHelper函数必须只能有一个void*参数,所以runHelper函数在类内必须定义为static,这样才没有this指针;
- int n=pthread_create(&_tid,nullptr,runHelper,this);
- if(n!=0) return exit(1);//线程创建失败,那么直接退出进程;
- _status=RUNNING;
- }
-
- //等待线程结束
- void join()
- {
- int n=pthread_join(_tid,nullptr);
- if(n!=0)
- {
- std::cerr<<"main thread join thread "<<_name<<" error "<
- return;
- }
- _status=EXITED;//线程退出;
- }
-
- private:
- pthread_t _tid;//线程ID(原生线程库中为该线程所创建的TCB起始虚拟地址)
- std::string _name;//线程名
- func_t _func;//线程要执行的回调
- void* _args;//线程回调函数参数
- ThreadStatus _status;//枚举类型:状态
- };
-
main.cc
- #include
- #include
- #include"ThreadPool.hpp"
-
-
-
- using namespace std;
-
- int main()
- {
- // unique_ptr < ThreadPool
> tp(new ThreadPool(20)); - // tp->init();
- // tp->start();
- // tp->check();
-
- //实例化类,并且多次调用getinstance()函数;
- printf("0X%x\n", ThreadPool
::getinstance()); - printf("0X%x\n", ThreadPool
::getinstance()); - printf("0X%x\n", ThreadPool
::getinstance()); - printf("0X%x\n", ThreadPool
::getinstance()); - printf("0X%x\n", ThreadPool
::getinstance()); - printf("0X%x\n", ThreadPool
::getinstance()); - while(true)
- {
- // 充当生产者, 从网络中读取数据,构建成为任务,推送给线程池
- int x, y;
- char op;
- std::cout << "please Enter x> ";
- std::cin >> x;
- std::cout << "please Enter y> ";
- std::cin >> y;
- std::cout << "please Enter op(+-*/%)> ";
- std::cin >> op;
-
- Task t(x,y,op);
- ThreadPool
::getinstance()->pushtask(t); //单例对象也有可能在多线程场景中使用! - }
-
-
- return 0;
- }
makefile
- ThreadPool:main.cc
- g++ $^ -o $@ -std=c++11 -lpthread
- .PHONY:clean
- clean:
- rm -f ThreadPool