• 【Linux】线程池


    一、概念

    1.线程池是一种利用池化技术思想来实现的线程管理技术,主要是为了复用线程、便利地管理线程和任务、并将线程的创建和任务的执行解耦开来。我们可以创建线程池来复用已经创建的线程来降低频繁创建和销毁线程所带来的资源消耗。
    2.线程池是一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

    二、线程池的应用场景

    1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB 服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为 Telnet 会话时间比线程的创建时间大多了。
    2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
    3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。

    三、线程池的优点
     

    1.降低资源消耗,复用已创建的线程来降低创建和销毁线程的消耗。
    2.提高响应速度,任务到达时,可以不需要等待线程的创建立即执行。
    3.提高线程的可管理性,使用线程池能够统一的分配、调优和监控。

    四、代码

    这里封装了Task,lockGuard,Thread,以及ThreadPool,具体请从ThreadPool看起,前面的封装只是为了方便ThreadPool使用.

    简化流程:ThreadPool也就是线程池,把任务数据结构(这里是队列)和多个线程封装在一起,当任务被生产出来并被线程池检测到,就会唤醒一个线程去处理,处理方法是把任务拷贝一份到线程独立栈上,并删除该任务。这里使用到了单例模式,把构造函数设为私有,只能调用类的静态成员函数来构造唯一一个线程池对象。

    ThreadPool.hpp

    1. #pragma once
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include"Task.hpp"
    9. #include"Thread.hpp"
    10. #include"lockGuard.hpp"
    11. // 单例模式
    12. const int N=6;
    13. template<class T>
    14. class ThreadPool
    15. {
    16. private:
    17. ThreadPool(int num=N)//构造函数私有化
    18. :_num(num)
    19. {
    20. pthread_mutex_init(&_mutex,nullptr);
    21. pthread_cond_init(&_cond,nullptr);
    22. }
    23. ThreadPool(const ThreadPool &tp) = delete;//删除默认拷贝构造
    24. void operator=(const ThreadPool &tp) = delete;//删除赋值函数
    25. pthread_mutex_t* getlock()
    26. {
    27. return &_mutex;
    28. }
    29. void threadwait()
    30. {
    31. //挂起一个线程
    32. pthread_cond_wait(&_cond,&_mutex);
    33. }
    34. void threadwakeup()
    35. {
    36. //唤醒一个线程
    37. pthread_cond_signal(&_cond);
    38. }
    39. public:
    40. //创建单例模式对象;
    41. static ThreadPool* getinstance()
    42. {
    43. if(_instance==nullptr) //当一个对象已经被创建以后,就不进入申请锁并且判断的环节了;
    44. {
    45. lockGuard lock(&_mutex_instance);
    46. if(_instance==nullptr)
    47. {
    48. _instance=new ThreadPool();
    49. _instance->init();
    50. _instance->start();
    51. }
    52. }
    53. return _instance;
    54. }
    55. ~ThreadPool()
    56. {
    57. for(auto& e:_threads)
    58. {
    59. e.join();
    60. }
    61. pthread_mutex_destroy(&_mutex);
    62. pthread_cond_destroy(&_cond);
    63. }
    64. bool isEmpty()
    65. {
    66. return _tasks.empty();
    67. }
    68. void init()
    69. {
    70. //创建线程
    71. for(int i=1;i<=_num;++i)
    72. //pthread_create(&_threads[i],nullptr,ThreadRoutine,this);
    73. _threads.push_back(Thread(i,ThreadRoutine,this));
    74. }
    75. void start()
    76. {
    77. //线程启动
    78. for(auto& e:_threads)
    79. {
    80. e.run();
    81. }
    82. }
    83. void check()
    84. {
    85. for(auto& e:_threads)
    86. {
    87. std::cout<<"线程ID"<threadid()<<" , "<threadname()<<"is running··· "<
    88. }
    89. }
    90. //放入任务
    91. void pushtask(const T& task)
    92. {
    93. lockGuard lock(&_mutex);
    94. _tasks.push(task);
    95. threadwakeup();//有新任务进来,唤醒线程去处理
    96. }
    97. T poptask()
    98. {
    99. T t=_tasks.front();
    100. _tasks.pop();
    101. return t;
    102. }
    103. private:
    104. static void* ThreadRoutine(void* args)
    105. {
    106. pthread_detach(pthread_self());
    107. //指针强转成线程池对象类型;
    108. ThreadPool* tp=static_cast*>(args);
    109. while(true)
    110. {
    111. T t;
    112. {
    113. lockGuard lock(tp->getlock());
    114. //1.判断是否有任务
    115. //有->处理
    116. //无->等待
    117. //如果任务队列为空,则等待
    118. while(tp->isEmpty())
    119. {
    120. tp->threadwait();
    121. }
    122. t=tp->poptask();//从共有区域拿到线程独立栈上;
    123. }
    124. t();//调用task类里面的仿函数处理任务
    125. std::cout << "thread handler done, result: " << t.formatRes() << std::endl;
    126. }
    127. }
    128. private:
    129. std::vector _threads;
    130. int _num;//线程池里有几个线程;
    131. std::queue _tasks; // 使用STL的自动扩容的特性
    132. pthread_mutex_t _mutex;
    133. pthread_cond_t _cond;
    134. static ThreadPool *_instance;//懒汉方式实现单例模式
    135. static pthread_mutex_t _mutex_instance;//单例对象有自己的锁
    136. };
    137. //静态变量初始化
    138. template<class T>
    139. ThreadPool* ThreadPool::_instance=nullptr;
    140. template<class T>
    141. pthread_mutex_t ThreadPool::_mutex_instance=PTHREAD_MUTEX_INITIALIZER;

    Task.hpp

    1. #pragma once
    2. #include
    3. #include
    4. class Task
    5. {
    6. public:
    7. Task(){}
    8. Task(int x,int y,char op):_x(x),_y(y),_op(op),_result(0),_exitCode(0) {}
    9. int operator()()
    10. {
    11. switch(_op)
    12. {
    13. case '+':
    14. _result=_x+_y;
    15. break;
    16. case '-':
    17. _result=_x-_y;
    18. break;
    19. case '*':
    20. _result=_x*_y;
    21. break;
    22. case '/':
    23. {
    24. if(_y==0)
    25. _exitCode=-1;
    26. else
    27. _result=_x/_y;
    28. }
    29. break;
    30. case '%':
    31. {
    32. if(_y==0)
    33. _exitCode=-2;
    34. else
    35. _result=_x%_y;
    36. }
    37. break;
    38. default:
    39. break;
    40. }
    41. }
    42. //任务
    43. std::string formatArg()
    44. {
    45. return std::to_string(_x) + _op + std::to_string(_y) + "=?";
    46. }
    47. //任务处理结果
    48. std::string formatRes()
    49. {
    50. return std::to_string(_result) + "(" + std::to_string(_exitCode)+ ")";
    51. }
    52. ~Task() {}
    53. private:
    54. int _x;
    55. int _y;
    56. char _op;
    57. int _result;
    58. int _exitCode;
    59. };

    lockGuard.hpp

    1. #pragma once
    2. #include
    3. #include
    4. class Mutex//成员:加锁函数和解锁函数
    5. {
    6. public:
    7. Mutex(pthread_mutex_t* pmutex):_pmutex(pmutex) {}
    8. void lock()
    9. {
    10. pthread_mutex_lock(_pmutex);
    11. }
    12. void unlock()
    13. {
    14. pthread_mutex_unlock(_pmutex);
    15. }
    16. ~Mutex(){}
    17. private:
    18. pthread_mutex_t* _pmutex;//需要传入一个互斥量(锁)的指针;
    19. };
    20. //对Mutex进行二次封装;
    21. //创建该对象时自动加锁,析构时自动解锁;
    22. class lockGuard
    23. {
    24. public:
    25. lockGuard(pthread_mutex_t* pmutex):_mutex(pmutex)//利用锁的指针构建Mutex对象
    26. {
    27. _mutex.lock();
    28. }
    29. ~lockGuard()
    30. {
    31. _mutex.unlock();
    32. }
    33. private:
    34. Mutex _mutex;//类内创建对象
    35. };

    Thread.hpp

    1. #pragma once
    2. #include
    3. #include
    4. #include
    5. class Thread
    6. {
    7. public:
    8. typedef void* (*func_t) (void*);
    9. typedef enum
    10. {
    11. NEW=0,
    12. RUNNING,
    13. EXITED
    14. }ThreadStatus;
    15. public:
    16. //状态:new,running,exited
    17. int status()
    18. {
    19. return _status;
    20. }
    21. //线程名
    22. std::string threadname()
    23. {
    24. return _name;
    25. }
    26. //线程ID(共享库中的进程地址空间的虚拟地址)
    27. pthread_t threadid()
    28. {
    29. if(_status==RUNNING)//线程已经被创建,线程id已经输入到成员变量_tid中;
    30. return _tid;
    31. else
    32. {
    33. return 0;
    34. }
    35. }
    36. public:
    37. //构造函数;
    38. Thread(int num,func_t func,void* args)//num代表第几个线程
    39. :_tid(0),
    40. _status(NEW),
    41. _func(func),
    42. _args(args)
    43. {
    44. char name[128];
    45. snprintf(name,sizeof(name),"thread-%d",num);
    46. _name=name;
    47. }
    48. //析构函数
    49. ~Thread(){}
    50. //静态成员函数不能访问类内所有成员,因为没有this指针;
    51. static void* runHelper(void *args)
    52. {
    53. Thread* td=(Thread*)args;
    54. (*td)();//调用仿函数执行线程的回调函数;
    55. return nullptr;
    56. }
    57. void operator()()//仿函数
    58. {
    59. //如果函数指针不为空,则执行该函数指针指向的回调函数;
    60. if(_func!=nullptr) _func(_args);
    61. }
    62. //创建线程
    63. void run()
    64. {
    65. //因为runHelper函数必须只能有一个void*参数,所以runHelper函数在类内必须定义为static,这样才没有this指针;
    66. int n=pthread_create(&_tid,nullptr,runHelper,this);
    67. if(n!=0) return exit(1);//线程创建失败,那么直接退出进程;
    68. _status=RUNNING;
    69. }
    70. //等待线程结束
    71. void join()
    72. {
    73. int n=pthread_join(_tid,nullptr);
    74. if(n!=0)
    75. {
    76. std::cerr<<"main thread join thread "<<_name<<" error "<
    77. return;
    78. }
    79. _status=EXITED;//线程退出;
    80. }
    81. private:
    82. pthread_t _tid;//线程ID(原生线程库中为该线程所创建的TCB起始虚拟地址)
    83. std::string _name;//线程名
    84. func_t _func;//线程要执行的回调
    85. void* _args;//线程回调函数参数
    86. ThreadStatus _status;//枚举类型:状态
    87. };

    main.cc

    1. #include
    2. #include
    3. #include"ThreadPool.hpp"
    4. using namespace std;
    5. int main()
    6. {
    7. // unique_ptr < ThreadPool > tp(new ThreadPool(20));
    8. // tp->init();
    9. // tp->start();
    10. // tp->check();
    11. //实例化类,并且多次调用getinstance()函数;
    12. printf("0X%x\n", ThreadPool::getinstance());
    13. printf("0X%x\n", ThreadPool::getinstance());
    14. printf("0X%x\n", ThreadPool::getinstance());
    15. printf("0X%x\n", ThreadPool::getinstance());
    16. printf("0X%x\n", ThreadPool::getinstance());
    17. printf("0X%x\n", ThreadPool::getinstance());
    18. while(true)
    19. {
    20. // 充当生产者, 从网络中读取数据,构建成为任务,推送给线程池
    21. int x, y;
    22. char op;
    23. std::cout << "please Enter x> ";
    24. std::cin >> x;
    25. std::cout << "please Enter y> ";
    26. std::cin >> y;
    27. std::cout << "please Enter op(+-*/%)> ";
    28. std::cin >> op;
    29. Task t(x,y,op);
    30. ThreadPool::getinstance()->pushtask(t); //单例对象也有可能在多线程场景中使用!
    31. }
    32. return 0;
    33. }

    makefile

    1. ThreadPool:main.cc
    2. g++ $^ -o $@ -std=c++11 -lpthread
    3. .PHONY:clean
    4. clean:
    5. rm -f ThreadPool

  • 相关阅读:
    周鸿祎自称3次破解特斯拉云端系统:安全隐患巨大
    大厂标配 | 百亿级并发系统设计 | 学完薪资框框涨
    Hadoop3教程(二十):MapReduce的工作机制总结
    【Leetcode】70. 爬楼梯
    23.1 微服务理论基础
    Python中,如何读取和写入文件?
    Simulink模型加密共享
    java计算机毕业设计学生网上请假系统源码+系统+数据库+lw文档
    2696. Minimum String Length Afte
    构造与析构
  • 原文地址:https://blog.csdn.net/zzxz8/article/details/132638557