• Linux——线程池


    目录

    线程池的概念

    线程池的优点

    线程池的实现

    【注意】

    线程池的线程安全

    日志文件的实现


    线程池的概念

            线程池也是一种池化技术,可以预先申请一批线程,当我们后续有任务的时候就可以直接用,这本质上是一种空间换时间的策略。

            如果有任务来的时候再创建线程,那成本又要提高,又要初始化,又要创建数据结构。

    线程池的优点

    • 线程池避免了短时间内创建与销毁线程的代价。
    • 线程池不仅能够保证内核充分利用,还能防止过分调度。

    线程池的实现

            我们这次要实现的线程池就是这样,让主线程派发任务,让线程池中的线程处理任务,这也是一个生产者消费者模型。

    1. // thread.hpp
    2. // 把线程封装一下
    3. #pragma once
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include
    9. #include
    10. using namespace std;
    11. typedef void*(*func_t)(void*);
    12. class ThreadData
    13. {
    14. public:
    15. string name_;
    16. void* args_;
    17. };
    18. class Thread
    19. {
    20. public:
    21. Thread(int num, func_t callback, void* args)
    22. :func_(callback)
    23. {
    24. char nameBuffer[64];
    25. snprintf(nameBuffer, sizeof(nameBuffer), "Thread-%d", num);
    26. name_ = nameBuffer;
    27. tdata_.args_ = args;
    28. tdata_.name_ = name_;
    29. }
    30. void start()
    31. {
    32. pthread_create(&tid_, nullptr, func_, (void*)&tdata_);
    33. }
    34. void join()
    35. {
    36. pthread_join(tid_, nullptr);
    37. }
    38. string name()
    39. {
    40. return name_;
    41. }
    42. ~Thread()
    43. {}
    44. private:
    45. string name_;
    46. pthread_t tid_;
    47. ThreadData tdata_;
    48. func_t func_;
    49. };
    1. // threadPool.hpp
    2. #pragma once
    3. #include "thread.hpp"
    4. #include "lockGuard.hpp"
    5. #include "log.hpp"
    6. const int g_default_num = 3;
    7. template <class T>
    8. class ThreadPool
    9. {
    10. public:
    11. // 通过接口获得成员变量
    12. pthread_mutex_t* getMutex()
    13. {
    14. return &lock_;
    15. }
    16. void waitCond()
    17. {
    18. pthread_cond_wait(&cond_, &lock_);
    19. }
    20. bool isEmpty()
    21. {
    22. return task_queue_.empty();
    23. }
    24. public:
    25. ThreadPool(int thread_num = g_default_num) // 初始化后,就已经有了对象,也有了this指针
    26. :num_(thread_num)
    27. {
    28. pthread_mutex_init(&lock_, nullptr);
    29. pthread_cond_init(&cond_, nullptr);
    30. for (int i = 0; i < num_; i++)
    31. {
    32. threads_.push_back(new Thread(i + 1, routine, this) ); // 通过传入this指针就可以拿到ThreadPool中的task_queue
    33. }
    34. }
    35. void run()
    36. {
    37. for (auto& iter : threads_)
    38. {
    39. iter->start();
    40. cout << iter->name() << "启动成功" << endl;
    41. }
    42. }
    43. // 去掉this指针
    44. // 消费的过程
    45. static void* routine(void* args)
    46. {
    47. ThreadData* td = (ThreadData*)args;
    48. ThreadPool* tq = (ThreadPool*)td->args_; // 去掉this指针就无法访问成员方法了,通过创建线程的时候传入this拿到线程池对象
    49. while (true)
    50. {
    51. T task;
    52. {
    53. lockGuard lockguard(tq->getMutex()); // 加锁
    54. while (tq->isEmpty()) tq->waitCond(); // 检测
    55. // 读取任务
    56. task = tq->getTask();
    57. }
    58. // 仿函数
    59. cout << td->name_ << ", 消费者:" << task._x << " + " << task._y << " = " << task() << endl;
    60. // sleep(1);
    61. }
    62. }
    63. void pushTask(const T& task)
    64. {
    65. lockGuard lockguard(&lock_);
    66. task_queue_.push(task);
    67. pthread_cond_signal(&cond_);
    68. }
    69. T getTask()
    70. {
    71. T t = task_queue_.front();
    72. task_queue_.pop();
    73. return t;
    74. }
    75. void joins()
    76. {
    77. for (auto& iter : threads_)
    78. {
    79. iter->join();
    80. }
    81. }
    82. ~ThreadPool()
    83. {
    84. for (auto& iter : threads_)
    85. {
    86. delete iter;
    87. }
    88. pthread_mutex_destroy(&lock_);
    89. pthread_cond_destroy(&cond_);
    90. }
    91. private:
    92. vector threads_;
    93. int num_;
    94. queue task_queue_; // 任务队列
    95. pthread_mutex_t lock_; // 互斥锁
    96. pthread_cond_t cond_; // 条件变量
    97. };
    1. // testMain.cc
    2. #include "threadPool.hpp"
    3. #include "Task.hpp"
    4. #include
    5. int Add(int x, int y)
    6. {
    7. return x + y;
    8. }
    9. int main()
    10. {
    11. srand((unsigned)time(nullptr));
    12. cout << "hello thread pool" << endl;
    13. ThreadPool *tp = new ThreadPool();
    14. tp->run();
    15. while (true)
    16. {
    17. int x = rand() % 10 + 1;
    18. usleep(rand() % 1000);
    19. int y = rand() % 10 + 1;
    20. Task t(x, y, Add);
    21. tp->pushTask(t);
    22. cout << "生产者:" << x << " + " << y << " = ? " << endl;
    23. //sleep(1);
    24. }
    25. tp->joins();
    26. return 0;
    27. }

    【注意】

    1. 线程池中的任务队列会被多个执行流访问,因此我们需要互斥锁对任务队列进行保护。
    2. 线程池中的线程要从任务队列中拿任务,所以任务队列中必须要先有任务,必须要加锁循环检测,如果任务队列为空,那么该线程应该进行等待,直到任务队列中有任务时再将其唤醒,这些操作都是通过加锁和条件变量完成的
    3. 主线程向任务队列中push一个任务后,此时可能有线程正处于等待状态,所以在新增任务后需要唤醒在条件变量下等待的线程
    4. 某线程从任务队列中拿到任务后,该任务就已经属于当前线程了,所以解锁之后再进行处理任务,让加锁的动作更细粒度,也因为处理任务的过程会耗费时间,所以不要将处理动作其放到临界区当中
    5. 要给执行线程函数用static修饰,这个函数的类型必须是void* (*callback)(void*);如果放到类中,该函数就会多一个this指针。但是让他变成静态函数又不能访问线程池中的任务队列,所以要在线程创建的时候把线程池的对象指针传过去,因为初始化列表后已经有了对象,所以一定有this指针。也因为这个函数没有this指针,所以一些类内的操作要提供接口

    线程池的线程安全

            我们也可以把线程池变成单例模式(懒汉模式)的,让整个进程只有一个线程池,但是如果以后有多个线程同时访问,同时判断这个单例对象存不存在,那就会有线程安全的问题。

    1. class ThreadPool
    2. {
    3. // ...
    4. private: // 私有构造,删除拷贝构造和赋值重载
    5. ThreadPool(int thread_num = g_default_num) // 初始化后,就已经有了对象,也有了this指针
    6. :num_(thread_num)
    7. {
    8. pthread_mutex_init(&lock_, nullptr);
    9. pthread_cond_init(&cond_, nullptr);
    10. for (int i = 0; i < num_; i++)
    11. {
    12. threads_.push_back(new Thread(i + 1, routine, this) ); // 通过传入this指针就可以拿到ThreadPool中的task_queue
    13. }
    14. }
    15. ThreadPool(const ThreadPool&) = delete;
    16. const ThreadPool operator=(const ThreadPool& ) = delete;
    17. public:
    18. static ThreadPool* getThreadPool(int num = g_default_num) // 通过getThreadPool获取线程池
    19. {
    20. // 只有第一次为空的时候才创建,如果不为空直接返回thread_ptr,这样指针就只有一个
    21. {
    22. lockGuard lockguard(&mutex);
    23. if (nullptr == thread_ptr)
    24. {
    25. thread_ptr = new ThreadPool(num);
    26. }
    27. }
    28. return thread_ptr;
    29. }
    30. // ...
    31. private:
    32. // 添加静态成员变量
    33. static ThreadPool* thread_ptr; // 单例模式
    34. static pthread_mutex_t mutex;
    35. };
    36. // 初始化
    37. template<class T>
    38. ThreadPool* ThreadPool::thread_ptr = nullptr;
    39. template<class T>
    40. pthread_mutex_t ThreadPool::mutex = PTHREAD_MUTEX_INITIALIZER;

            这样就可以保证,第一次获取ThreadPool对象的时候,多个线程访问就是安全的。但这就带来了另一个问题,如果每次想要获取ThreadPool对象的时候就会申请释放锁,这个行为也是在浪费资源,所以还要再调整一下。

    1. static ThreadPool* getThreadPool(int num = g_default_num)
    2. {
    3. // 只有第一次为空的时候才创建,如果不为空直接返回thread_ptr,这样就只new了一次
    4. if (nullptr == thread_ptr) // 多判断一次不就可以了吗,已经创建了就直接返回,没有就加锁创建
    5. {
    6. lockGuard lockguard(&mutex);
    7. if (nullptr == thread_ptr)
    8. {
    9. thread_ptr = new ThreadPool(num);
    10. }
    11. }
    12. return thread_ptr;
    13. }

    这样使用双重判定空指针就减少了大量已经创建好单例,其他线程还在请求锁的行为。

    日志文件的实现

    我们需要用到下面这些接口。

    1. // log.hpp
    2. #pragma once
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include
    9. #include
    10. #include
    11. // 日志级别
    12. #define DEBUG 0
    13. #define NORMAL 1
    14. #define WARNING 2
    15. #define ERROR 3
    16. #define FATAL 4
    17. const char* gLevelMap[] = {
    18. "DEBUG",
    19. "NORMAL",
    20. "WARNING",
    21. "ERROR",
    22. "FATAL"
    23. };
    24. // 完整的日志功能,至少有:日志等级 时间 日志内容 支持用户自定义
    25. void logMessage(int level, const char* format, ...) // 最后一个参数就是可变参数列表
    26. {
    27. char stdBuffer[1024]; // 日志的标准部分
    28. time_t timestamp = time(nullptr); // 时间戳
    29. snprintf(stdBuffer, sizeof(stdBuffer), "[%s][%ld]", gLevelMap[level], timestamp);
    30. char logBuffer[1024]; // 自定义部分
    31. va_list args; // 可变参数列表
    32. va_start(args, format);
    33. vsnprintf(logBuffer, sizeof (logBuffer), format, args); // 用起来和printf相差不多
    34. va_end(args);
    35. // printf("%s%s\n", stdBuffer, logBuffer); // 打印到显示器
    36. FILE* fp = fopen("log.txt", "a");
    37. fprintf(fp, "%s%s\n", stdBuffer, logBuffer); // 打印到文件
    38. fclose(fp);
    39. }

            所以以后如果要用到这些线程池、日志文件等,就直接用了。 

  • 相关阅读:
    全程免费的ssl证书申请——七步实现网站https
    微信视频号下载视频工具3.0,实测有效免费保存!
    Springboot自行车在线租赁系统毕业设计源码101157
    Smart Document Control——杜绝文件成堆和文件混乱,保证业务连续性,创建企业新阶段
    cfssl使用方法重新整理说明
    VsCode:配置TypeScript开发环境
    Django 如何使用 Celery 完成异步任务或定时任务
    条款33、避免遮掩继承而来的名称
    math_利用微分算近似值
    高项 沟通管理论文
  • 原文地址:https://blog.csdn.net/m0_64607843/article/details/136714708