线程虽然比进程轻量了很多,但是每创建一个线程时,需要向操作系统申请空间创建,如果需要开辟大量的线程,申请和销毁的开销也是很大的。所以如果能够提前申请一块空间,专门用来创建线程,那么就能提高一些效率。
线程池:一种线程使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过度。
线程池通过一个线程安全的阻塞任务队列加上一个或一个以上的线程实现,线程池中的线程可以从阻塞队列中获取任务进行任务处理,当线程都处于繁忙状态时可以将任务加入阻塞队列中,等到其它的线程空闲后进行处理。
线程池的作用:可以避免大量线程频繁创建或销毁所带来的时间成本,也可以避免在峰值压力下,系统资源耗尽的风险;并且可以统一对线程池中的线程进行管理,调度监控。
线程池的应用场景:
需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,可以想象一个热门网站的点击次数。
对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,导致出现错误。
Task.cpp 任务对象
#pragma once #include
#include #include class Task { public: Task() { } Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0) { } void operator()() { switch (_op) { case '+': _result = _x + _y; break; case '-': _result = _x - _y; break; case '*': _result = _x * _y; break; case '/': { if (_y == 0) _exitCode = -1; else _result = _x / _y; } break; case '%': { if (_y == 0) _exitCode = -2; else _result = _x % _y; } break; default: break; } usleep(100000); } std::string formatArg() { return std::to_string(_x) + _op + std::to_string(_y) + "= ?"; } std::string formatRes() { return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")"; } ~Task() { } private: int _x; int _y; char _op; int _result; int _exitCode; }; ```
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
ThreadPool_v1.hpp 自定义实现线程池
#pragma once #include
#include #include #include #include #include #include "Task.hpp" const static int N = 5; template <class T> class ThreadPool { public: ThreadPool(int num = N) : _num(num), _threads(num) { pthread_mutex_init(&_lock, nullptr); pthread_cond_init(&_cond, nullptr); } void lockQueue() { pthread_mutex_lock(&_lock); } void unlockQueue() { pthread_mutex_unlock(&_lock); } void threadWait() { pthread_cond_wait(&_cond, &_lock); } void threadWakeup() { pthread_cond_signal(&_cond); } bool isEmpty() { return _tasks.empty(); } T popTask() { T t = _tasks.front(); _tasks.pop(); return t; } static void *threadRoutine(void *args) { pthread_detach(pthread_self()); ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args); while (true) { // 1. 检测有没有任务 // 2. 有:处理 // 3. 无:等待 // 细节:必定加锁 tp->lockQueue(); while (tp->isEmpty()) { tp->threadWait(); } T t = tp->popTask(); // 从公共区域拿到私有区域 tp->unlockQueue(); // for test t(); std::cout << "thread handler done, result: " << t.formatRes() << std::endl; // t.run(); // 处理任务,不应该在临界区中处理 ,已经拿到任务,线程可以自己去处理 } } void init() { // TODO } void start() { for (int i = 0; i < _num; i++) { pthread_create(&_threads[i], nullptr, threadRoutine, (void *)this); // ? } } void pushTask(const T &t) { lockQueue(); _tasks.push(t); threadWakeup(); unlockQueue(); } ~ThreadPool() { pthread_mutex_destroy(&_lock); pthread_cond_destroy(&_cond); } private: std::vector<pthread_t> _threads; int _num; std::queue<T> _tasks; // 使用stl的自动扩容的特性 pthread_mutex_t _lock; pthread_cond_t _cond; }; ```
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
main.cpp
#include "ThreadPool_v1.hpp" #include "Task.hpp" #include
int main() { ThreadPool<Task> tp; tp.init(); tp.start(); while (true) { int x, y; char op; std::cout << "please Enter x> "; std::cin >> x; std::cout << "please Enter y> "; std::cin >> y; std::cout << "please Enter op(+-*/%)> "; std::cin >> op; Task t(x, y, op); //ThreadPool ::getinstance()->pushTask(t); tp.pushTask(t); } } ```
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
一个类,只应该实例化出一个对象,就称为单例。
定义对象的本质,是将对象加载到内存,只让该对象在内存中加载一次,就是单例。
对象被设计成单例的场景:①语义上只需要一个对象 ②该对象内部存在大量的空间,保存了大量的数据,如果允许该对象存在多份,或者允许发生拷贝,内存中会存在数据冗余。
单例模式的两种模式: 饿汉模式、懒汉模式。
懒汉模式:对于一个进程来说,我们只有在第一次使用的时候才去创建。优点是节省进程加载的时间。缺点是在第一次使用的时候才去创建。
饿汉模式:在代码转化为进程时,先于main函数之前就已经创建好了,不需要使用的时候再去创建。优点是可以直接使用,该资源早早创建出来,影响进程加载的速度。
template <class T>
class Singleton
{
static T data; // 静态成员,该成员属于类,不属于对象,一旦创建了类,该成员就被创建了
public:
static T* GetInstance()
{
return &data;
}
};
template <class T>
T Singleton<T>:: data = T();
只要通过 Singleton 这个包装类来使用 T 对象, 则一个进程中只有一个 T 对象的实例。
template <class T>
class Singleton
{
static T* inst; // 先创建静态成员的指针,指针指向空
public:
static T* GetInstance()
{
if (inst == nullptr)
{
inst = new T(); // 一旦调用了该函数(即需要用到该对象的时候),才创建该对象
}
return inst;
}
};
template <class T>
T* Singleton<T>:: inst = nullptr;
存在线程安全问题: 在没有实例化对象时, 如果两个线程同时调用GetInstance, 可能会创建出两份 T 对象的实例。所以需在创建单例时进行加锁。
其中的大概框架还是线程池,只是创建对象的方式改变了。
不会在main函数中一开就创建对象,而是在获取到任务之后才创建。并且实现了线程安全。
我们还将前面自己封装的线程类和自己封装的锁加入线程池的使用。
Task.hpp 对象类,模拟一个任务
#pragma once #include
#include #include class Task { public: Task() { } Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0) { } void operator()() { switch (_op) { case '+': _result = _x + _y; break; case '-': _result = _x - _y; break; case '*': _result = _x * _y; break; case '/': { if (_y == 0) _exitCode = -1; else _result = _x / _y; } break; case '%': { if (_y == 0) _exitCode = -2; else _result = _x % _y; } break; default: break; } usleep(100000); } std::string formatArg() { return std::to_string(_x) + _op + std::to_string(_y) + "= ?"; } std::string formatRes() { return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")"; } ~Task() { } private: int _x; int _y; char _op; int _result; int _exitCode; }; ```
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
lockGuard.hpp 自定义封装的锁
#pragma once #include
#include class Mutex // 自己不维护锁,有外部传入 { public: Mutex(pthread_mutex_t *mutex):_pmutex(mutex) {} void lock() { pthread_mutex_lock(_pmutex); } void unlock() { pthread_mutex_unlock(_pmutex); } ~Mutex() {} private: pthread_mutex_t *_pmutex; }; class LockGuard // 自己不维护锁,有外部传入 { public: LockGuard(pthread_mutex_t *mutex):_mutex(mutex) { _mutex.lock(); } ~LockGuard() { _mutex.unlock(); } private: Mutex _mutex; };
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
Thread.hpp 自定义封装的线程
#pragma once #include
#include #include #include #include class Thread { public: typedef enum { NEW = 0, RUNNING, EXITED } ThreadStatus; typedef void (*func_t)(void *); public: Thread(int num, func_t func, void *args) : _tid(0), _status(NEW), _func(func), _args(args) { char name[128]; snprintf(name, sizeof(name), "thread-%d", num); _name = name; } int status() { return _status; } std::string threadname() { return _name; } pthread_t threadid() { if (_status == RUNNING) return _tid; else { return 0; } } // 是不是类的成员函数,而类的成员函数,具有默认参数this,需要static // 但是会有新的问题:static成员函数,无法直接访问类属性和其他成员函数 static void *runHelper(void *args) { Thread *ts = (Thread*)args; //拿到当前对象 // _func(_args); (*ts)(); return nullptr; } void operator ()() //仿函数 { if(_func != nullptr) _func(_args); } void run() { int n = pthread_create(&_tid, nullptr, runHelper, this); if(n != 0) exit(1); _status = RUNNING; } void join() { int n = pthread_join(_tid, nullptr); if( n!=0) { std::cerr << "main thread join thread " << _name << " error" << std::endl; return; } _status = EXITED; } ~Thread() { } private: pthread_t _tid; std::string _name; func_t _func; // 线程未来要执行的回调 void *_args; ThreadStatus _status; }; ``
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
ThreadPool_v2.hpp 饿汉方式实现单例模式的线程池
#pragma once #include
#include #include #include #include #include "Thread.hpp" #include "Task.hpp" #include "lockGuard.hpp" const static int N = 5; template <class T> class ThreadPool { private: ThreadPool(int num = N) : _num(num) { pthread_mutex_init(&_lock, nullptr); pthread_cond_init(&_cond, nullptr); } ThreadPool(const ThreadPool<T> &tp) = delete; void operator=(const ThreadPool<T> &tp) = delete; public: static ThreadPool<T> *getinstance() { if(nullptr == instance) // 提高效率,减少加锁的次数 { LockGuard lockguard(&instance_lock); if (nullptr == instance) { instance = new ThreadPool<T>(); instance->init(); instance->start(); } } return instance; } pthread_mutex_t *getlock() { return &_lock; } void threadWait() { pthread_cond_wait(&_cond, &_lock); } void threadWakeup() { pthread_cond_signal(&_cond); } bool isEmpty() { return _tasks.empty(); } T popTask() { T t = _tasks.front(); _tasks.pop(); return t; } static void threadRoutine(void *args) { // pthread_detach(pthread_self()); ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args); while (true) { // 1. 检测有没有任务 // 2. 有:处理 // 3. 无:等待 // 细节:必定加锁 T t; { LockGuard lockguard(tp->getlock()); while (tp->isEmpty()) { tp->threadWait(); } t = tp->popTask(); // 从公共区域拿到私有区域 } t(); std::cout << "thread handler done, result: " << t.formatRes() << std::endl; // t.run(); // 处理对象在临界区外 } } void init() { for (int i = 0; i < _num; i++) { _threads.push_back(Thread(i, threadRoutine, this)); } } void start() { for (auto &t : _threads) { t.run(); } } void check() { for (auto &t : _threads) { std::cout << t.threadname() << " running..." << std::endl; } } void pushTask(const T &t) { LockGuard lockgrard(&_lock); _tasks.push(t); threadWakeup(); } ~ThreadPool() { for (auto &t : _threads) { t.join(); } pthread_mutex_destroy(&_lock); pthread_cond_destroy(&_cond); } private: std::vector<Thread> _threads; int _num; std::queue<T> _tasks; // 使用stl的自动扩容的特性 pthread_mutex_t _lock; pthread_cond_t _cond; static ThreadPool<T> *instance; static pthread_mutex_t instance_lock; }; template <class T> ThreadPool<T> *ThreadPool<T>::instance = nullptr; template <class T> pthread_mutex_t ThreadPool<T>::instance_lock = PTHREAD_MUTEX_INITIALIZER;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139