目录
一,线程概念
在一程序内,一个执行路线称为线程thread,即线程是一个进程内部的控制序列;

线程优点
线程缺点
线程异常
线程用途
二,Linux进程与线程
进程的多个线程共享同一地址空间,因此Text Segment、Data Segment都是共享的;如定义一个函数,在各个线程中都可调用,如定义一个全局变量,在各线程都可访问到,除此之外,各线程还共享以下进程资源和环境:

三,Linux线程控制
POSIX线程库
线程创建
//创建新线程 int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void * (*start_routine)(void*), void *arg);
- thread,返回线程的ID;
- attr,设置线程的属性,如为NULL表示使用默认属性;
- start_routine,函数地址,线程启动后执行的函数;
- arg,传给线程启动函数的参数;
返回值
- 成功返回0,失败返回错误码;
- #include
- #include
- #include
- #include
- #include
-
- void* rout(void* arg){
- for( ; ; ){
- printf("I am thread1\n");
- sleep(1);
- }
- }
-
- int main(){
- pthread_t tid;
- int ret = pthread_create(&tid, NULL, rout, NULL);
- if(ret != 0){
- fprintf(stderr, "pthread_create: %s\n", strerror(ret));
- exit(EXIT_FAILURE);
- }
- for( ; ; ){
- printf("I am main thread\n");
- sleep(1);
- }
- }
- [wz@192 Desktop]$ gcc -o test test.c -lpthread
- [wz@192 Desktop]$ ldd test
- linux-vdso.so.1 => (0x00007ffce0bb2000)
- libpthread.so.0 => /lib64/libpthread.so.0 (0x00007f9ab6dd8000)
- libc.so.6 => /lib64/libc.so.6 (0x00007f9ab6a0a000)
- /lib64/ld-linux-x86-64.so.2 (0x00007f9ab6ff4000)
- [wz@192 Desktop]$ ./test
- I am main thread
- I am thread1
- I am main thread
- I am thread1
- I am main thread
- I am thread1
- //一个进程,两个线程(轻量级进程)
- [wz@192 ~]$ ps axj | head -1 && ps axj | grep test
- PPID PID PGID SID TTY TPGID STAT UID TIME COMMAND
- 2976 53194 53194 2976 pts/0 53194 Sl+ 1000 0:00 ./test
- 3351 53324 53323 3351 pts/1 53323 S+ 1000 0:00 grep --color=auto test
- [wz@192 ~]$ ps -aL | head -1 && ps -aL | grep test
- PID LWP TTY TIME CMD
- 53194 53194 pts/0 00:00:00 test
- 53194 53195 pts/0 00:00:00 test
线程ID及进程地址空间布局
pthread_t类型是什么,取决于实现;对于Linux目前实现的NPTL,pthread_t类型的线程ID,本质上是一个进程地址空间的一个地址;
- Linux没有真正意义上的线程,是用进程模拟的(轻量级进程);
- Linux本身不会直接提供类似线程创建、终止、等待、分离等相关system call接口,但会提供创建轻量级进程的接口vfork;
- 但用户需要所谓的线程创建、终止、等待、分离等相关接口,所以系统基于轻量级进程接口模拟封装了用户原生线程库pthread;
- 进程由PCB管理的,用户层也需进行用户级线程管理(由用户空间维护);
- 用户层线程ID,本质是一个地址(共享区,pthread库中某个起始位置);

线程终止
如需终止某个线程而不是整个进程,有三种方法:
void pthread_exit(void* value_ptr);
- vaule_ptr,不要指向一个局部变量;
- pthread_exit或return返回的指针指向的内存单元必须是全局或是用malloc分配的,不能在线程函数的栈上分配,因为当其他线程得到这个返回指针时线程函数已经退出;
int pthread_cancel(pthread_t thread);
- 成功返回0,失败返回错误码;
线程等待
为何需要线程等待:
int pthread_join(pthread_t thread, void** value_ptr);
- value_ptr,指向一个指针,然后在指向线程的返回值;
- 成功返回0,失败返回错误码;
调用该函数的线程将挂起等待,直到id为thread的线程终止;thread的线程以不同的方法终止,通过pthread_join得到的终止状态也是不同的:

- #include
- #include
- #include
- #include
-
- void* thread1(void* arg){
- printf("thread1 returning ...\n");
- int *p = (int*)malloc(sizeof(int));
- *p = 1;
- return (void*)p;
- }
-
- void* thread2(void* arg){
- printf("thread1 exiting ...\n");
- int *p = (int*)malloc(sizeof(int));
- *p = 2;
- pthread_exit((void*)p);
- }
-
- void* thread3(void* arg){
- while(1){
- printf("thread3 running ...\n");
- sleep(1);
- }
- return NULL;
- }
-
- int main(){
- pthread_t tid;
- void* ret;
- //线程1,return
- pthread_create(&tid, NULL, thread1, NULL);
- pthread_join(tid, &ret);
- printf("thread1 return, thread id %x, return code: %d\n", tid, *(int*)ret);
- free(ret);
- //线程2,exit
- pthread_create(&tid, NULL, thread2, NULL);
- pthread_join(tid, &ret);
- printf("thread2 return, thread id %x, return code: %d\n", tid, *(int*)ret);
- free(ret);
- //线程3,cancel by other
- pthread_create(&tid, NULL, thread3, NULL);
- sleep(3);
- pthread_cancel(tid);
- pthread_join(tid, &ret);
- if(ret == PTHREAD_CANCELED)
- printf("thread3 return, thread id %x, return code: PTHREAD_CANCELED\n", tid);
- else
- printf("thread3 return, thread id %x, return code: NULL\n", tid);
- }
- [wz@192 Desktop]$ gcc -o test test.c -lpthread
- [wz@192 Desktop]$ ./test
- thread1 returning ...
- thread1 return, thread id 2d3f6700, return code: 1
- thread1 exiting ...
- thread2 return, thread id 2d3f6700, return code: 2
- thread3 running ...
- thread3 running ...
- thread3 running ...
- thread3 return, thread id 2d3f6700, return code: PTHREAD_CANCELED
线程分离
- int pthread_detach(pthread_t thread);
- //可是线程组内其他线程对目标线程进行分离,也可是线程自己分离
- pthread_detach(pthread_self());
- #include
- #include
- #include
- #include
-
- void* thread_run(void* arg){
- pthread_detach(pthread_self());
- printf("%s\n", (char*)arg);
- return NULL;
- }
-
- int main(){
- pthread_t tid;
- if(pthread_create(&tid, NULL, thread_run, "thread run ...\n") != 0){
- printf("create thread error\n");
- return 1;
- }
-
- int ret = 0;
- sleep(1); //很重要,要让线程先分离,在等待
-
- if(pthread_join(tid, NULL) == 0){
- printf("pthread wait success\n");
- ret = 0;
- } else{
- printf("pthread wait failed\n");
- ret = 1;
- }
- return ret;
- }
- [wz@192 Desktop]$ gcc -o test test.c -lpthread
- [wz@192 Desktop]$ ./test
- thread run ...
-
- pthread wait failed
四,Linux线程互斥
互斥量mutex
- #include
- #include
- #include
- #include
- #include
-
- int ticket = 100;
-
- void* route(void* arg){
- char* id = (char*)arg;
- while(1){
- if(ticket > 0){
- usleep(1000);
- printf("%s sells ticket: %d\n", id, ticket);
- ticket--;
- }
- else break;
- }
- }
-
- int main(){
- pthread_t t1, t2, t3, t4;
-
- pthread_create(&t1, NULL, route, "thread1");
- pthread_create(&t2, NULL, route, "thread2");
- pthread_create(&t3, NULL, route, "thread3");
- pthread_create(&t4, NULL, route, "thread4");
-
- pthread_join(t1, NULL);
- pthread_join(t2, NULL);
- pthread_join(t3, NULL);
- pthread_join(t4, NULL);
- }
- [wz@192 Desktop]$ gcc -o test test.c -lpthread
- [wz@192 Desktop]$ ./test
- thread1 sells ticket: 100
- thread3 sells ticket: 100
- thread4 sells ticket: 98
- thread2 sells ticket: 100
- thread1 sells ticket: 96
- thread4 sells ticket: 95
- thread2 sells ticket: 94
- ...
- thread3 sells ticket: 4
- thread4 sells ticket: 2
- thread1 sells ticket: 2
- thread3 sells ticket: 0
- thread2 sells ticket: 0
- thread4 sells ticket: -2
ticket--语句通常有三个步骤,绝对不是原子的:
- ticket从内存到CPU相关寄存器;
- CPU对ticket--操作;
- 操作完将ticket值在写回内存;
- 代码必须要有互斥行为,当代码进入临界区执行时,不允许其他线程进入该临界区;
- 如多个线程同时要求执行临界区的代码,并且临界区没有线程执行,那么只能允许一个线程进入临界区;
- 如线程不再临界区中执行,那么该线程不能阻止其他线程进入临界区;

互斥量接口
初始互斥量
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
int pthread_mutex_init(pthread_mutex_t* restrict mutex, const pthread_mutexattr_t* restrict attr);
销毁互斥量
int pthread_mutex_destroy(pthread_mutex_t* mutex);
互斥量加锁/解锁
- int pthread_mutex_lock(pthread_mutex_t* mutex);
- int pthread_mutex_unlock(pthread_mutex_t* mutex);
调用pthread_lock时,可能会遇到以下问题:
- 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功;
- 发起函数调用时,其他线程已锁定互斥量,或存在线程同时申请互斥量,但没有竞争到互斥量,那么调用会阻塞(执行流被挂起),等待互斥量解锁;
- #include
- #include
- #include
- #include
- #include
-
- int ticket = 100;
- pthread_mutex_t mutex;
-
- void* route(void* arg){
- char* id = (char*)arg;
- while(1){
- pthread_mutex_lock(&mutex);
- if(ticket > 0){
- usleep(1000);
- printf("%s sells ticket: %d\n", id, ticket);
- ticket--;
- pthread_mutex_unlock(&mutex);
- }
- else{
- pthread_mutex_unlock(&mutex);
- break;
- }
- }
- }
-
- int main(){
- pthread_t t1, t2, t3, t4;
-
- pthread_mutex_init(&mutex, NULL);
- pthread_create(&t1, NULL, route, "thread1");
- pthread_create(&t2, NULL, route, "thread2");
- pthread_create(&t3, NULL, route, "thread3");
- pthread_create(&t4, NULL, route, "thread4");
-
- pthread_join(t1, NULL);
- pthread_join(t2, NULL);
- pthread_join(t3, NULL);
- pthread_join(t4, NULL);
- pthread_mutex_destroy(&mutex);
- }
- [wz@192 Desktop]$ gcc -o test test.c -lpthread
- [wz@192 Desktop]$ ./test
- thread1 sells ticket: 100
- thread1 sells ticket: 99
- thread1 sells ticket: 98
- thread1 sells ticket: 97
- thread1 sells ticket: 96
- ...
- thread1 sells ticket: 5
- thread1 sells ticket: 4
- thread1 sells ticket: 3
- thread1 sells ticket: 2
- thread1 sells ticket: 1
互斥量实现原理

可重入和线程安全
常见线程不安全情况
- 不保护共享变量的函数;
- 函数状态随着被调用,状态发生变化的函数;
- 返回指向静态变量指针的函数;
- 调用线程不安全函数的函数;
常见线程安全的情况
- 每个线程对全局变量或静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的;
- 类或接口对于线程来说都是原子操作;
- 多个线程之间的切换不会导致该接口的执行结果存在二义性;
常见不可重入情况
- 调用了malloc/free函数,因malloc函数是全局链表来管理堆的;
- 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构;
- 可重入函数体内使用了静态的数据结构;
常见可重入情况
- 不使用全局变量或静态变量;
- 不使用malloc/new开辟的空间;
- 不调用不可重入函数;
- 不返回静态或全局数据,所有数据都由函数的调用者提供;
- 使用本地数据,或通过制作全局数据的本地拷贝来保护全局数据;
可重入与线程安全联系
可重入与线程安全区别
死锁
死锁是指在一组进程中各个进程均占有不会释放的资源,但因相互申请被其他进程所占有不会释放的资源而处于一种永久等待的状态;
四个必要条件
避免死锁
避免死锁算法
五,Linux线程同步
条件变量,描述临界资源的状态,当一个线程互斥地访问每个变量时,可能发现在其他线程改变状态之前,其什么也做不了,如一个线程访问队列时,发现队列为空,只能等待,只到其他线程将一个节点添加到队列中,此时就需要用到条件变量;
同步,在保证数据安全的情况下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步;
竞态条件,因为时序问题,而导致程序异常,称为竞态条件;
条件变量函数
//初始化 int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
//销毁 int pthread_cond_destroy(pthread_cond_t *cond)
//等待条件满足 int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
//唤醒等待 int pthread_cond_broadcast(pthread_cond_t *cond); int pthread_cond_signal(pthread_cond_t *cond);
- #include
- #include
- #include
-
- pthread_cond_t cond;
- pthread_mutex_t mutex;
-
- void* r1(void* arg){
- while(1){
- pthread_cond_wait(&cond, &mutex);
- printf("active\n");
- }
- }
-
- void* r2(void* arg){
- while(1){
- pthread_cond_signal(&cond);
- sleep(1);
- }
- }
-
- int main(){
- pthread_t t1, t2;
- pthread_cond_init(&cond, NULL);
- pthread_mutex_init(&mutex, NULL);
- pthread_create(&t1, NULL, r1, NULL);
- pthread_create(&t2, NULL, r2, NULL);
- pthread_join(t1, NULL);
- pthread_join(t2, NULL);
- pthread_mutex_destroy(&mutex);
- pthread_cond_destroy(&cond);
- }
- [wz@192 Desktop]$ gcc -o test test.c -lpthread
- [wz@192 Desktop]$ ./test
- active
- active
- active
- ...
pthread_cond_wait需要互斥量的原因

- // 错误的设计
- pthread_mutex_lock(&mutex);
- while (condition_is_false) {
- pthread_mutex_unlock(&mutex);
- //解锁之后,等待之前,条件可能已经满足,信号已经发出,但是该信号可能被错过
- pthread_cond_wait(&cond);
- pthread_mutex_lock(&mutex);
- }
- pthread_mutex_unlock(&mutex);
条件变量使用规范
- //等待条件代码
- pthread_mutex_lock(&mutex);
- while(条件为假)
- pthread_cond_wait(cond, mutex);
- 修改条件
- pthread_mutex_unlock(&mutex);
- //给条件发送信号代码
- pthread_mutex_lock(&mutex);
- 设置条件为真
- pthread_cond_signal(cond);
- pthread_mutex_unlock(&mutex);
六,生产者消费者模型
此模型是通过一个容器来解决生产者和消费者的强耦合问题;生产者和消费者之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生成完数据后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡生产者和消费者的处理能力;此阻塞队列就是用来给生产者和消费者解耦的;
优点

基于BlockingQuene的生产者消费者模型
在多线程编程中阻塞队列是一种常用于实现生产者和消费者模型的数据结构;其与普通队列的区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出;以上操作均是基于不同的线程来说的,线程在对阻塞队列操作时会被阻塞;

- //单生产者、单消费者
- #include
- #include
- #include
- #include
- #include
- using namespace std;
-
- #define NUM 8
-
- class BlockQueue{
- private:
- queue<int> q;
- int cap;
- pthread_mutex_t lock;
- pthread_cond_t full;
- pthread_cond_t empty;
-
- private:
- void LockQueue(){
- pthread_mutex_lock(&lock);
- }
- void UnlockQueue(){
- pthread_mutex_unlock(&lock);
- }
- void Productwait(){
- pthread_cond_wait(&full, &lock);
- }
- void Consumewait(){
- pthread_cond_wait(&empty, &lock);
- }
- void NotifyProduct(){
- pthread_cond_signal(&full);
- }
- void NotifyConsume(){
- pthread_cond_signal(&empty);
- }
- bool IsEmpty(){
- return (q.size() == 0 ? true : false);
- }
- bool IsFull(){
- return (q.size() == cap ? true : false);
- }
-
- public:
- BlockQueue(int _cap = NUM)
- :cap(_cap)
- {
- pthread_mutex_init(&lock, NULL);
- pthread_cond_init(&full, NULL);
- pthread_cond_init(&empty, NULL);
- }
- void PushData(const int& data){
- LockQueue();
- while(IsFull()){
- NotifyConsume();
- cout<<"queue full, notify consume data, product stop;"<
- Productwait();
- }
- q.push(data);
- UnlockQueue();
- }
- void PopData(int& data){
- LockQueue();
- while(IsEmpty()){
- NotifyProduct();
- cout<<"queue empty, notify product data, consume stop;"<
- Consumewait();
- }
- data = q.front();
- q.pop();
- UnlockQueue();
- }
- ~BlockQueue(){
- pthread_mutex_destroy(&lock);
- pthread_cond_destroy(&full);
- pthread_cond_destroy(&empty);
- }
- };
-
- void* consumer(void* arg){
- BlockQueue* bqp = (BlockQueue*)arg;
- int data;
- for( ; ; ){
- bqp->PopData(data);
- cout<<"consume data done: "<
- }
- }
-
- void* producter(void* arg){
- BlockQueue* bqp = (BlockQueue*)arg;
- srand((unsigned long)time(NULL));
- for( ; ; ){
- int data = rand()%1024;
- bqp->PushData(data);
- cout<<"product data done: "<
- sleep(1);
- }
- }
-
- int main(){
- BlockQueue bq;
- pthread_t c, p;
- pthread_create(&c, NULL, consumer, (void*)&bq);
- pthread_create(&p, NULL, producter, (void*)&bq);
- pthread_join(c, NULL);
- pthread_join(p, NULL);
- return 0;
- }
- [wz@192 Desktop]$ g++ -o test test.c -lpthread
- [wz@192 Desktop]$ ./test
- queue empty, notify product data, consume stop;
- product data done: 763
- product data done: 391
- product data done: 116
- product data done: 519
- product data done: 611
- product data done: 106
- product data done: 825
- product data done: 58
- queue full, notify consume data, product stop;
- consume data done: 763
- consume data done: 391
- consume data done: 116
- consume data done: 519
- consume data done: 611
- consume data done: 106
- consume data done: 825
- consume data done: 58
- queue empty, notify product data, consume stop;
七,信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,以到达无冲突的访问共享资源目的;但POSIX可用于线程间同步;
信号量本质是一个计数器,用来描述临界资源数目的计数器;互斥锁能保护临界资源,条件变量能知道临界资源的状态;如临界资源可看作多份情况下,只要访问的区域不是同一个,即可让多个线程同时访问;任何线程如想访问临界资源中的某一个,一定必须先申请信号量,使用完毕,必须释放信号量;如需先申请信号量资源,前提是所有的线程,必须先看到信号量;信号量本身也是临界资源,其PV操作必须是原子的;
- //初始化信号量
- #include
- int sem_init(sem_t* sem, int pshared, unsigned int value);
- //pshared,0表示线程间共享,非零表示进程间共享;
- //value,信号量初始值;
- //销毁信号量
- int sem_destroy(sem_t* sem)
- //等待信号量,P()
- int sem_wait(sem_t* sem);
- //等待信号量会将信号量值减1
- //发布信号量,V()
- int sem_post(sem_t* sem);
- //发布信号量表示资源使用完毕,可以归还资源了,将信号量值加1;
基于环形队列的生产者消费者模型
- 环形队列采用数组模拟;
- 环形结构起始状态和结束状态都是一样的,不好判断空或满,所以可通过加计数器或标记位来判断空或满;另外也可预留一个空的位置,作为满的状态;

- //单生产单消费,没有直接加锁
- //多生产多消费,需加锁
- #include
- #include
- #include
- #include
- #include
- #include
- using namespace std;
-
- #define NUM 10
-
- class RingQueue{
- private:
- vector<int> q;
- int cap;
- int consume_step;
- int product_step;
- sem_t data_sem;
- sem_t space_sem;
-
- public:
- RingQueue(int _cap = NUM)
- :q(_cap), cap(_cap), consume_step(0), product_step(0)
- {
- sem_init(&data_sem, 0, 0);
- sem_init(&space_sem, 0, cap);
- }
- void PutData(const int& data){
- sem_wait(&space_sem);
- q[product_step] = data;
- sem_post(&data_sem);
- product_step++;
- product_step %= cap;
- }
- void GetData(int& data){
- sem_wait(&data_sem);
- data = q[consume_step];
- sem_post(&space_sem);
- consume_step++;
- consume_step %= cap;
- }
- ~RingQueue(){
- sem_destroy(&data_sem);
- sem_destroy(&space_sem);
- }
- };
-
- void* consumer(void* arg){
- RingQueue* rqp = (RingQueue*)arg;
- int data;
- for( ; ; ){
- rqp->GetData(data);
- cout<<"consume data done: "<
- sleep(1);
- }
- }
-
- void* producter(void* arg){
- RingQueue* rqp = (RingQueue*)arg;
- srand((unsigned long)time(NULL));
- for( ; ; ){
- int data = rand()%1024;
- rqp->PutData(data);
- cout<<"product data done: "<
- }
- }
-
- int main(){
- RingQueue* rq = new RingQueue(10);
- pthread_t c, p;
- pthread_create(&c, NULL, consumer, (void*)rq);
- pthread_create(&p, NULL, producter, (void*)rq);
- pthread_join(c, NULL);
- pthread_join(p, NULL);
- }
八,线程池
线程池一种线程使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能;而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务;这避免了在处理短时间任务时创建与销毁的代价;线程池不仅能够保证内核的充分利用,还能防止过分调度;可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络socket等的数量;
应用场景:
- 需要大量的线程来完成任务,且完成任务的时间较短;WEB服务器完成网页请求这样的任务,使用线程池技术就非常合适;因为单个任务小、任务数量巨大(可想象网站点击次数);但对于长时间的任务,如一个Telnet连接请求,线程池的优点就不明显了;因为Telnet会话时间比线程的创建时间大得多;
- 对性能要求苛刻的应用,如要求服务器迅速响应客户请求;
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用;突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可使内存达到极限,出现错误;
- //threadpool.hpp
- #ifndef __M_TP_H__
- #define __M_TP_H__
-
- #include
- #include
- #include
- #include
-
- #define MAX_THREAD 5
- typedef bool (* handler_t)(int);
-
- class ThreadTask{
- private:
- int _data;
- handler_t _handler;
- public:
- ThreadTask():_data(-1), _handler(NULL){}
- ThreadTask(int data, handler_t handler){
- _data = data;
- _handler = handler;
- }
- void SetTask(int data, handler_t handler){
- _data = data;
- _handler = handler;
- }
- void Run(){
- _handler(_data);
- }
- };
-
- class ThreadPool{
- private:
- int _thread_max;
- int _thread_cur;
- bool _tp_quit;
- std::queue
_task_queue; - pthread_mutex_t _lock;
- pthread_cond_t _cond;
- private:
- void LockQueue(){
- pthread_mutex_lock(&_lock);
- }
- void UnLockQueue(){
- pthread_mutex_unlock(&_lock);
- }
- void WakeUpOne(){
- pthread_cond_signal(&_cond);
- }
- void WakeUpAll(){
- pthread_cond_broadcast(&_cond);
- }
- void ThreadQuit(){
- _thread_cur--;
- UnLockQueue();
- pthread_exit(NULL);
- }
- void ThreadWait(){
- if(_tp_quit)
- ThreadQuit();
- pthread_cond_wait(&_cond, &_lock);
- }
- bool IsEmpty(){
- return _task_queue.empty();
- }
-
- static void* thr_start(void* arg){
- ThreadPool* tp = (ThreadPool*) arg;
- while(1){
- tp->LockQueue();
- while(tp->IsEmpty()){
- tp->ThreadWait();
- }
- ThreadTask* tt;
- tp->PopTask(&tt);
- tp->UnLockQueue();
- tt->Run();
- delete tt;
- }
- return NULL;
- }
-
- public:
- ThreadPool(int max = MAX_THREAD)
- :_thread_max(max), _thread_cur(max),_tp_quit(false){
- pthread_mutex_init(&_lock, NULL);
- pthread_cond_init(&_cond, NULL);
- }
- ~ThreadPool(){
- pthread_mutex_destroy(&_lock);
- pthread_cond_destroy(&_cond);
- }
- bool PoolInit(){
- pthread_t tid;
- for(int i=0; i<_thread_max; i++){
- int ret = pthread_create(&tid, NULL, thr_start, this);
- if(ret !=0 ){
- std::cout<<"create pool thread error\n";
- return false;
- }
- }
- return true;
- }
- bool PushTask(ThreadTask* tt){
- LockQueue();
- if(_tp_quit){
- UnLockQueue();
- return false;
- }
- _task_queue.push(tt);
- WakeUpOne();
- UnLockQueue();
- return true;
- }
- bool PopTask(ThreadTask** tt){
- *tt = _task_queue.front();
- _task_queue.pop();
- return true;
- }
- bool PoolQuit(){
- LockQueue();
- _tp_quit = true;
- UnLockQueue();
- while(_thread_cur > 0){
- WakeUpAll();
- usleep(1000);
- }
- return true;
- }
- };
- #endif
- //main.cpp
- #include "threadpool.hpp"
- bool handler(int data){
- srand(time(NULL));
- int n = rand()%5;
- printf("Thread: %p Run Tast: %d--sleep %d sec\n", pthread_self(), data, n);
- sleep(n);
- return true;
- }
-
- int main(){
- ThreadPool pool;
- pool.PoolInit();
- int i;
- for(i=0; i<10; i++){
- ThreadTask* tt = new ThreadTask(i, handler);
- pool.PushTask(tt);
- }
- pool.PoolQuit();
- return 0;
- }
- [wz@192 Desktop]$ g++ -std=c++0x main.cpp -o test -pthread -lrt
- [wz@192 Desktop]$ ./test
- Thread: 0x7f6e23c8f700 Run Tast: 0--sleep 4 sec
- Thread: 0x7f6e25c93700 Run Tast: 4--sleep 4 sec
- Thread: 0x7f6e24490700 Run Tast: 1--sleep 0 sec
- Thread: 0x7f6e24490700 Run Tast: 5--sleep 4 sec
- Thread: 0x7f6e24c91700 Run Tast: 3--sleep 4 sec
- Thread: 0x7f6e25492700 Run Tast: 2--sleep 4 sec
- Thread: 0x7f6e25c93700 Run Tast: 6--sleep 2 sec
- Thread: 0x7f6e24490700 Run Tast: 8--sleep 2 sec
- Thread: 0x7f6e23c8f700 Run Tast: 9--sleep 2 sec
- Thread: 0x7f6e24c91700 Run Tast: 7--sleep 2 sec
九,线程安全单例模式
单例模式是经典常用的设计模式,其特点是某些类只应该具有一个对象(实例)称为单例;
饿汉方式实现单例模式
- //只要通过Singleton这个包装类来使用T对象,则一个进程中只有一个T对象的实例;
- template <typename T>
- class Singleton{
- static T data;
- public:
- static T* GetInstance(){
- return &data;
- }
- };
懒汉方式实现单例模式,核心思想是“延时加载”,从而优化服务器启动速度;
- //存在一个严重的问题,线程不安全;
- //第一次调用GetInstance时,如两个线程同时调用,可能会创建两份T的对象的实例;但后续再次调用,没问题;
- template <typename T>
- class Singleton{
- static T* inst;
- public:
- static T* GetInstance(){
- if(inst == NULL)
- inst = new T();
- return inst;
- }
- };
懒汉方式实现单例模式(线程安全版本)
- template <typename T>
- class Singleton{
- volatile static T* inst; //设置volatile,否则可能会被编译器优化
- static std::mutex lock;
- public:
- static T* GetInstance(){
- //双重判定空指针,降低锁冲突的概率,提高性能
- //使用互斥锁,保证多线程只能调用异常new
- if(inst == NULL){
- lock.lock();
- if(inst == NULL)
- inst = new T();
- lock.unlock();
- }
- return inst;
- }
- };
十,STL、智能指针和线程安全
STL中的容器不是线程安全的,STL设计初衷是将性能挖掘到极致,而一旦涉及到加锁保证线程安全,会对性能造成巨大的影响;而且对于不同的容器,加锁方式的不同,性能可能也不同(如hash表的锁表和锁桶);因此STL默认不是线程安全的,如需在多线程下使用,往往要调用者自行保证线程安全;
智能指针,对于unique_ptr,由于只是在当前代码块范围内生效,因此不涉及线程安全问题;对于shared_ptr,多个对象需共用一个引用计数变量,所以会存在线程安全问题,但标准库实现时考虑到了这个问题,基于原子操作的方式保证shared_ptr能够高效、原子的操作引用计数;
- 悲观锁,在每次取数据时,总是担心数据会被其他线程修改,所以取数据前先加锁(读锁、写锁、行锁等),当其他线程想要范围数据时,被阻塞挂起;
- 乐观锁,每次取数据时,总是乐观的认为数据不会被其他线程修改,因此不上锁,但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改,只要采用两种方式:版本号机制和CAS操作;
- CAS操作,当需要更新数据时,判断当前内存值和之前取得的值是否相等,如相等则用新值更新,如不相等则失败,失败则重试,一般是一个自旋的过程,即不断重试;
- 自旋锁,公平锁、非公平锁;
十一,读者写者问题
读写锁,在编写多线程的时候,有一种情况是十分常见的,那就是有些公共数据修改的机会比较少,相比较改写,它们读的机会反而高的多;通常在读的过程中,往往伴随着查找的操作,中间耗时很长;给这种代码段加锁,会极大地降低程序的效率;读写锁就是专门处理这种多读少写的情况;

读写锁接口
- //设置读写优先
- int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t* attr, int pref);
- pref 有三种选择
- PTHREAD_RWLOCK_PREFER_READER_NP,默认设置,读者优先,可能会导致写者饥饿情况;
- PTHREAD_RWLOCK_PREFER_WRITER_NP,写者优先,目前有BUG,导致表现行为和上一种选择一致;
- PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP,写者优先,但写者不能递归加锁;
- //初始化
- int pthread_rwlock_init(pthread_rwlock_t* restrict rwlock, const pthread_rwlockattr_t* restrict attr);
- //销毁
- int pthread_rwlock_destroy(pthread_rwlock_t* rwlock);
- //加锁和解锁
- int pthread_rwlock_rdlock(pthread_rwlock_t* rwlock);
- int pthread_rwlock_wrlock(pthread_rwlock_t* rwlock);
- int pthread_rwlock_unlock(pthread_rwlock_t* rwlock);
- #include
- #include
- #include
- #include
- #include
- #include
- #include
-
- volatile int ticket = 1000;
- pthread_rwlock_t rwlock;
-
- void* reader(void* arg){
- char* id = (char*) arg;
- while(1){
- pthread_rwlock_rdlock(&rwlock);
- if(ticket <= 0){
- pthread_rwlock_unlock(&rwlock);
- break;
- }
- printf("%s: %d\n", id, ticket);
- pthread_rwlock_unlock(&rwlock);
- usleep(1);
- }
- return nullptr;
- }
-
- void* writer(void* arg){
- char* id = (char*)arg;
- while(1){
- pthread_rwlock_wrlock(&rwlock);
- if(ticket <= 0){
- pthread_rwlock_unlock(&rwlock);
- break;
- }
- printf("%s: %d\n", id, --ticket);
- pthread_rwlock_unlock(&rwlock);
- usleep(1);
- }
- return nullptr;
- }
-
- struct ThreadAttr{
- pthread_t tid;
- std::string id;
- };
-
- std::string create_reader_id(std::size_t i){
- std::ostringstream oss("thread reader ", std::ios_base::ate);
- oss<
- return oss.str();
- }
-
- std::string create_writer_id(std::size_t i){
- std::ostringstream oss("thread writer ", std::ios_base::ate);
- oss<
- return oss.str();
- }
-
- void init_readers(std::vector
& vec) { - for(std::size_t i=0; i
size(); i++){ - vec[i].id = create_reader_id(i);
- pthread_create(&vec[i].tid, nullptr, reader, (void*)vec[i].id.c_str());
- }
- }
-
- void init_writers(std::vector
& vec) { - for(std::size_t i=0; i
size(); i++){ - vec[i].id = create_writer_id(i);
- pthread_create(&vec[i].tid, nullptr, writer, (void*)vec[i].id.c_str());
- }
- }
-
- void join_threads(std::vector
const& vec) { - for(std::vector
::const_reverse_iterator it = vec.rbegin(); it!=vec.rend(); it++){ - pthread_t const& tid = it->tid;
- pthread_join(tid, nullptr);
- }
- }
-
- void init_rwlock(){
- #if 0
- pthread_rwlockattr_t attr;
- pthread_rwlockattr_init(&attr);
- pthread_rwlockattr_setkind_np(&attr, PHTREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
- pthread_rwlock_init(&rwlock, &attr);
- pthread_rwlockattr_destroy(&attr);
- #else
- pthread_rwlock_init(&rwlock, nullptr);
- #endif
- }
-
- int main(){
- const std::size_t reader_nr = 1000;
- const std::size_t writer_nr = 2;
-
- std::vector
readers(reader_nr) ; - std::vector
writers(writer_nr) ; -
- init_rwlock();
-
- init_readers(readers);
- init_writers(writers);
-
- join_threads(writers);
- join_threads(readers);
-
- pthread_rwlock_destroy(&rwlock);
- }
- [wz@192 Desktop]$ g++ -std=c++11 -Wall -Werror main.cpp -o test -lpthread
- [wz@192 Desktop]$ ./test
- thread reader 259: 1000
- thread reader 707: 1000
- thread reader 978: 1000
- thread reader 498: 1000
- thread reader 980: 1000
- thread reader 499: 1000
- ...
-
相关阅读:
Swift data范围截取问题
如何理解单例模式?
SpringBoot定时任务 - Spring自带的定时任务是如何实现的?有何注意点?
沙利文&头豹研报|腾讯安全威胁情报中心TIX位居领导者,创新指数排名第一!
IDEA Gradle Lombok错误:找不到符号 setter getter方法没有
“策略+模型“评级流程详析(含数据字典)
【navicat 密码查看】小技巧navicat 如何查看密码
第3周学习:ResNet+ResNeXt
stm32之1602+DHT11+继电器
react antd 一些问题和要注意的地方
-
原文地址:https://blog.csdn.net/NapoleonCoder/article/details/132866853