尽管通常我们认为一个进程只有单一的控制流,但是在现代系统中,一个进程实际上可以由多个称为 线程 的执行单元组成,每个线程都运行在进程的上下文中,并共享同样的代码和全局数据。由于网络服务器中对并行处理的需求,线程成为越来越重要的编程模型,因为多线程之间比多进程之间更容易共享数据,也因为线程一般来说都比进程更高效。当有多处理器可用的时候,多线程也是一种使得程序可以运行得更快的方法。
线程(thread) 就是运行在进程上下文中的逻辑流。线程由内核自动调度。每个线程都有它自己的 线程上下文(thread context),包括一个唯一的整数 线程 ID(Thread ID, TID)、栈、栈指针、程序计数器、通用目的寄存器和条件码。所有的运行在一个进程里的线程共享该进程的整个虚拟地址空间。
Linux 认为:进程和线程没有概念上的区分,只有一个概念——执行流。
在 Linux 中,线程是用进程模拟的,准确来说,是用进程的 PCB 模拟的。也就是说,Linux没有为线程设计新的 TCB,线程和进程都是用的同样的结构体 task_struct 描述的。
我们知道,fork 一个进程内核会为为你创建一个新的进程,包括新的虚拟地址空间,新的PCB、页表等。而创建一个新的线程,则只是创建一个新的 PCB,多个 PCB 共享同一个虚拟地址空间,一个进程的代码相当于被这多个 PCB 分割了。
之前我们说,进程 = 内核数据结构(PCB) + 进程对应的代码和数据,现在我们有了一个全新的理解:
从内核的视角:进程 = 承担分配系统资源的基本实体(进程的基座属性)。换句话说,不只是 PCB,还包括虚拟地址空间,页表,物理内存的数据和代码,这些全部合起来才叫进程。
线程是什么呢?线程 = 调度的基本单位,CPU 在调度的时候只能看到 task_struct,一个 task_struct 代表着进程中的一个执行流,也就是线程。
我们以前写的代码,都是一个进程只有一个执行流,这样的进程叫做 单执行流进程。对应的,内部有多个执行流的进程叫做 多执行流进程
总结:
线程共享进程数据,但也拥有自己的一部分数据:
Linux 下没有真正意义的线程,所以也就没有易于用户使用的线程相关的系统调用接口,但是有原生线程库,提供了封装好的线程接口供我们使用。Linux 默认都带有这个库。
创建线程:
NAME
pthread_create - create a new thread
SYNOPSIS
#include
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine) (void *), void *arg);
Compile and link with -pthread.
// 返回值:成功:返回0,失败:返回一个错误码
thread: 输出型参数,表示线程 id
attr: 线程属性,我们不管,可以直接设置为 NULL
start_routine: 函数指针,线程调用的入口
arg: 传入 start_routine 函数的参数
线程等待,一个线程被创建了,最后一定要 join,否则就会造成如进程那样的内存泄漏问题。
NAME
pthread_join - join with a terminated thread
SYNOPSIS
#include
int pthread_join(pthread_t thread, void **retval);
Compile and link with -pthread.
// 返回值:成功:返回0,失败:返回错误码
thread: 线程 id,表示你要等的线程
retval: 退出结果
例子:
创建两个线程,然后让三个线程各自循环打印对应的信息。
#include
#include
#include
#include
using namespace std;
void* callback1(void* args)
{
string name = (char*)args;
while (true)
{
cout << name << endl;
sleep(1);
}
}
void* callback2(void* args)
{
string name = (char*)args;
while (true)
{
cout << name << endl;
sleep(1);
}
}
int main()
{
pthread_t tid1;
pthread_t tid2;
pthread_create(&tid1, nullptr, callback1, (void*)"thread 1");
pthread_create(&tid2, nullptr, callback2, (void*)"thread 2");
while (true)
{
cout << "我是主线程..." << endl;
sleep(1);
}
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
return 0;
}
编译的时候一定要链接线程库
g++ -o mythread mythread.cpp -lpthread -std=c++11
运行结果:
[CegghnnoR@VM-4-13-centos 2022_11_11]$ ./mythread
我是主线程...
thread 2
thread 1
thread 2
我是主线程...
thread 1
thread 2
我是主线程...
thread 1
thread 2
我是主线程...
thread 1
thread 2
我是主线程...
在运行的同时,使用 ps -aL 查看线程
[CegghnnoR@VM-4-13-centos 2022_11_11]$ ps -aL
PID LWP TTY TIME CMD
6876 6876 pts/3 00:00:00 mythread
6876 6877 pts/3 00:00:00 mythread
6876 6878 pts/3 00:00:00 mythread
7045 7045 pts/4 00:00:00 ps
LWP 意为轻量级进程(Lightweight process),其实就是 Linux 下的线程,它的编号如果与 PID 相等,那么这个线程就是主线程,另外两个线程的 PID 也与主线程相等,因为它们都是在同一个进程里运行的。
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, callback1, (void*)"thread 1");
cout << "new thread id: " << tid << endl;
while (true)
{
cout << "main thread 正在运行..." << endl;
sleep(1);
}
return 0;
}
通过如上代码来查看线程 id
[CegghnnoR@VM-4-13-centos 2022_11_11]$ ./mythread
new thread id: 140508908504832
main thread 正在运行...
thread 1: 32051
可以看到线程 id 是一个非常大的值。为什么会这么大呢?这个问题我们稍后再谈。
pthread_self 查看线程 id
NAME
pthread_self - obtain ID of the calling thread
SYNOPSIS
#include
pthread_t pthread_self(void);
Compile and link with -pthread.
下面我们来看 pthread_join 的第二个参数 retval:
它是一个二级指针,是一个输出型参数,因为 start_routine 函数的返回值时 void*,作为一个输出型参数,要想拿到这个返回值,自然就是 void** 类型的。
例子:
void* startRoutine(void* args)
{
string name = (char*)args;
int cnt = 3;
while (cnt--)
{
printTid(name, pthread_self());
sleep(1);
}
cout << "线程退出了..." << endl;
// 1. 线程退出方式,return
return (void*)111;
}
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, startRoutine, (void*)"thread 1");
void* ret = nullptr;
pthread_join(tid, &ret);
cout << "main thread join success, *ret: " << (long long)ret << endl;
return 0;
}
[CegghnnoR@VM-4-13-centos 2022_11_11]$ ./mythread
thread 1 正在运行, thread id: 0xe5b0f700
thread 1 正在运行, thread id: 0xe5b0f700
thread 1 正在运行, thread id: 0xe5b0f700
线程退出了...
main thread join success, *ret: 111
线程退出方式2:使用 pthread_exit
NAME
pthread_exit - terminate calling thread
SYNOPSIS
#include
void pthread_exit(void *retval);
Compile and link with -pthread.
retval: 退出码
注意:pthread_exit 是专用来退出某个线程的。而 exit 是用来退出整个进程的,任何一个线程调用 exit 都会使整个进程退出。
线程退出方式3:使用 pthread_cancel
NAME
pthread_cancel - send a cancellation request to a thread
SYNOPSIS
#include
int pthread_cancel(pthread_t thread);
Compile and link with -pthread.
thread: 线程 id
例子:
新线程跑3秒后使用 pthread_cancel 终止
void* startRoutine(void* args)
{
string name = (char*)args;
while (true)
{
printTid(name, pthread_self());
sleep(1);
}
}
int main()
{
pthread_t tid;
pthread_create(&tid, nullptr, startRoutine, (void*)"thread 1");
sleep(3);
pthread_cancel(tid);
void* ret = nullptr;
pthread_join(tid, &ret);
cout << "main thread join success, *ret: " << (long long)ret << endl;
return 0;
}
[CegghnnoR@VM-4-13-centos 2022_11_11]$ ./mythread
thread 1 正在运行, thread id: 0x46298700
thread 1 正在运行, thread id: 0x46298700
thread 1 正在运行, thread id: 0x46298700
main thread join success, *ret: -1
使用 pthread_cancel 退出的进程退出码为 -1
上面我们提到,线程 id 是一个非常大的值,其实这个值是一个地址,该地址处存储了线程的相关信息。这里的线程 id 是用户级别的 id,属于NPTL线程库的范畴,线程库的后续操作,就是根据该线程ID来操作线程的
首先要明确几点
在全局变量前面加 __thread,多个线程对它取地址,得到的地址是不同的
像这样的变量并不是所有线程共享的,它由各线程局部存储。
例子:
__thread int global_value = 100;
void* startRoutine(void* args)
{
while (true)
{
cout << "thread: " << pthread_self() << " global_value: " << global_value << " &global_value: " << &global_value << endl;
sleep(1);
}
}
int main()
{
pthread_t tid1;
pthread_t tid2;
pthread_t tid3;
pthread_create(&tid1, nullptr, startRoutine, (void*)"thread 1");
pthread_create(&tid2, nullptr, startRoutine, (void*)"thread 2");
pthread_create(&tid3, nullptr, startRoutine, (void*)"thread 3");
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
return 0;
}
[CegghnnoR@VM-4-13-centos 2022_11_11]$ ./mythread
thread: 140564086306560 global_value: 100 &global_value: 0x7fd7a06eb6fc
thread: 140564077913856 global_value: 100 &global_value: 0x7fd79feea6fc
thread: 140564069521152 global_value: 100 &global_value: 0x7fd79f6e96fc
使用 pthread_detach 分离线程
NAME
pthread_detach - detach a thread
SYNOPSIS
#include
int pthread_detach(pthread_t thread);
Compile and link with -pthread.
// 返回值:成功:返回0,失败:返回错误码
thread: 要分离的线程 id
例子:
void* startRoutine(void* args)
{
pthread_detach(pthread_self());
cout << "线程已分离" << endl;
}
int main()
{
pthread_t tid1;
pthread_t tid2;
pthread_t tid3;
pthread_create(&tid1, nullptr, startRoutine, (void*)"thread 1");
pthread_create(&tid2, nullptr, startRoutine, (void*)"thread 2");
pthread_create(&tid3, nullptr, startRoutine, (void*)"thread 3");
sleep(1);//注意这里的sleep,如果没有这个sleep,主线程可能在其他线程分离之前就把它们给join了
int n = pthread_join(tid1, nullptr);
cout << n << ":" << strerror(n) << endl;
n = pthread_join(tid2, nullptr);
cout << n << ":" << strerror(n) << endl;
n = pthread_join(tid3, nullptr);
cout << n << ":" << strerror(n) << endl;
return 0;
}
[CegghnnoR@VM-4-13-centos 2022_11_11]$ ./mythread
线程已分离
线程已分离
线程已分离
22:Invalid argument
22:Invalid argument
22:Invalid argument
可以看到,线程成功分离,之后调用 pthread_join 函数都失败并返回错误码了。
因为执行流很混乱,其实我们更倾向于让主线程分离其他线程:
void* startRoutine(void* args)
{
while (true)
{
sleep(1);
}
}
int main()
{
pthread_t tid1;
pthread_t tid2;
pthread_t tid3;
pthread_create(&tid1, nullptr, startRoutine, (void*)"thread 1");
pthread_create(&tid2, nullptr, startRoutine, (void*)"thread 2");
pthread_create(&tid3, nullptr, startRoutine, (void*)"thread 3");
pthread_detach(tid1);
pthread_detach(tid2);
pthread_detach(tid3);
int n = pthread_join(tid1, nullptr);
cout << n << ":" << strerror(n) << endl;
n = pthread_join(tid2, nullptr);
cout << n << ":" << strerror(n) << endl;
n = pthread_join(tid3, nullptr);
cout << n << ":" << strerror(n) << endl;
return 0;
}
[CegghnnoR@VM-4-13-centos 2022_11_11]$ ./mythread
22:Invalid argument
22:Invalid argument
22:Invalid argument
注意:一般我们分离线程,对应的 main thread 不退出。主线程代表了整个进程,主线程退出相当于进程退出,其他线程都会跟着退出。
这里补充一点:临界资源的访问可能会出现数据不一致的问题
例如有如下抢票系统:
创建三个线程,让它们都对全局变量减减,表示抢票操作
int tickets = 1000;
void *getTickets(void *args)
{
string name = (char*)args;
while (true)
{
// 临界区
if (tickets > 0)
{
cout << name << "抢到了票,编号为:" << tickets << endl;
--tickets;
}
else
{
cout << name << "抢不到票,票已抢完" << endl;
break;
}
sleep(1);
}
return nullptr;
}
int main()
{
pthread_t tid1;
pthread_t tid2;
pthread_t tid3;
pthread_create(&tid1, nullptr, getTickets, (void*)"thread 1");
pthread_create(&tid2, nullptr, getTickets, (void*)"thread 2");
pthread_create(&tid3, nullptr, getTickets, (void*)"thread 3");
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
return 0;
}
这个程序有一个漏洞,问题在于 --tickets ,在语言层面上,我们觉得它是一条语句,但是在系统层面,它其实要分三步来完成:
如 线程A 完成了第1、2两步,取tickets的值1000在CPU中减减到999,然后正要做第3步,还没把999写回到内存呢,线程就被切换了,线程B 过来就又取到了 1000,然后在CPU中减减到999,写回内存,此时 tickets 在内存中的值是999,而两个线程各取一次,预期值应该是998,二者不符,程序错误。
怎么解决这个问题呢?
很简单,我们只要保证 --tickets 这个操作具有原子性,即要么不做,要么就一次做到底。
即要让程序在执行 --tickets 期间不会被打扰,具体的解决方案就是加锁
问题复现
当面的代码出错的频率不是很高,下面我们多加一个线程,并使用usleep系统调用来增加线程切换的次数。
int tickets = 1000;
void *getTickets(void *args)
{
string name = (char*)args;
while (true)
{
// 临界区
if (tickets > 0)
{
usleep(1000);
cout << name << "抢到了票,编号为:" << tickets << endl;
--tickets;
}
else
{
cout << name << "抢不到票,票已抢完" << endl;
break;
}
}
return nullptr;
}
int main()
{
pthread_t tid1;
pthread_t tid2;
pthread_t tid3;
pthread_t tid4;
pthread_create(&tid1, nullptr, getTickets, (void*)"thread 1");
pthread_create(&tid2, nullptr, getTickets, (void*)"thread 2");
pthread_create(&tid3, nullptr, getTickets, (void*)"thread 3");
pthread_create(&tid4, nullptr, getTickets, (void*)"thread 4");
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
pthread_join(tid4, nullptr);
return 0;
}
//...
thread 3抢到了票,编号为:5
thread 4抢到了票,编号为:4
thread 2抢到了票,编号为:3
thread 1抢到了票,编号为:2
thread 3抢到了票,编号为:1
thread 3抢不到票,票已抢完
thread 4抢到了票,编号为:0
thread 4抢不到票,票已抢完
thread 2抢到了票,编号为:-1
thread 2抢不到票,票已抢完
thread 1抢到了票,编号为:-1
thread 1抢不到票,票已抢完
可以看到,运行到最后甚至出现了负数。
锁的类型:pthread_mutex_t
锁的初始化:
NAME
pthread_mutex_destroy, pthread_mutex_init - destroy and initialize a mutex
SYNOPSIS
#include
int pthread_mutex_destroy(pthread_mutex_t *mutex); // 锁的销毁
int pthread_mutex_init(pthread_mutex_t *restrict mutex, // 局部锁
const pthread_mutexattr_t *restrict attr);
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 全局锁
NAME
pthread_mutex_lock, pthread_mutex_trylock, pthread_mutex_unlock - lock and unlock a mutex
SYNOPSIS
#include
int pthread_mutex_lock(pthread_mutex_t *mutex); // 加锁,阻塞式
int pthread_mutex_trylock(pthread_mutex_t *mutex); // 加锁,非阻塞式
int pthread_mutex_unlock(pthread_mutex_t *mutex); // 解锁
下面对上面的代码进行加锁:
int tickets = 1000;
// 创建锁
pthread_mutex_t mutex;
void *getTickets(void *args)
{
string name = (char*)args;
while (true)
{
// 临界区
// 加锁
pthread_mutex_lock(&mutex);
if (tickets > 0)
{
usleep(1000);
cout << name << "抢到了票,编号为:" << tickets << endl;
--tickets;
// 解锁,分支语句往往每个分支都要加,防止跳过导致漏解锁
pthread_mutex_unlock(&mutex);
}
else
{
cout << name << "抢不到票,票已抢完" << endl;
// 解锁
pthread_mutex_unlock(&mutex);
break;
}
}
return nullptr;
}
int main()
{
// 锁的初始化
pthread_mutex_init(&mutex, nullptr);
pthread_t tid1;
pthread_t tid2;
pthread_t tid3;
pthread_t tid4;
pthread_create(&tid1, nullptr, getTickets, (void*)"thread 1");
pthread_create(&tid2, nullptr, getTickets, (void*)"thread 2");
pthread_create(&tid3, nullptr, getTickets, (void*)"thread 3");
pthread_create(&tid4, nullptr, getTickets, (void*)"thread 4");
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
pthread_join(tid4, nullptr);
// 销毁锁
pthread_mutex_destroy(&mutex);
return 0;
}
注意要创建锁,然后别忘了写上初始化和销毁。
关于加锁,要注意以下几点:
在 C++ 中,我们可以像封装智能指针那样封装锁:
#pragma once
#include
#include
using namespace std;
class Mutex
{
public:
Mutex()
{
pthread_mutex_init(&lock_, nullptr);
}
void lock()
{
pthread_mutex_lock(&lock_);
}
void unlock()
{
pthread_mutex_unlock(&lock_);
}
~Mutex()
{
pthread_mutex_destroy(&lock_);
}
private:
pthread_mutex_t lock_;
};
class LockGuard
{
public:
LockGuard(Mutex* mutex)
: mutex_(mutex)
{
mutex_->lock();
}
~LockGuard()
{
mutex_->unlock();
}
private:
Mutex* mutex_;
};
例子:
#include
#include "Lock.hpp"
int tickets = 1000;
// 创建锁
Mutex mutex;
bool getTickets()
{
bool ret = false;
// 加锁
LockGuard lockGuard(&mutex);
if (tickets > 0)
{
usleep(1001);
cout << "thread: " << pthread_self() << " get a ticket" << tickets << endl;
--tickets;
ret = true;
}
return ret;
}
void* startRoutine(void* args)
{
const char* name = static_cast<const char*>(args);
while (true)
{
if (!getTickets()) break;
cout << name << " get tickets success" << endl;
usleep(100);
}
}
int main()
{
pthread_t t1, t2, t3, t4;
pthread_create(&t1, nullptr, startRoutine, (void*)"thread 1");
pthread_create(&t1, nullptr, startRoutine, (void*)"thread 2");
pthread_create(&t1, nullptr, startRoutine, (void*)"thread 3");
pthread_create(&t1, nullptr, startRoutine, (void*)"thread 4");
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
pthread_join(t3, nullptr);
pthread_join(t4, nullptr);
return 0;
}
C++11 的线程库就是像这样封装了系统接口。
简单来说:可重入函数可以多线程调用,不可重入函数不可以多线程调用,否则会出现线程安全问题。
常见的线程不安全的情况:
常见的线程安全的情况:
常见不可重入的情况:
常见可重入的情况:
注意:
死锁是指一组线程中的各个线程互相申请被其他线程所占用的不会释放的资源而处于一种永久等待的状态。
例子:
让两个线程分别申请A锁和B锁,等待1s后再让它们互相申请已被对方占用的锁。
pthread_mutex_t mutexA = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutexB = PTHREAD_MUTEX_INITIALIZER;
void* startRoutine1(void* args)
{
while (true)
{
pthread_mutex_lock(&mutexA);
sleep(1);
pthread_mutex_lock(&mutexB);
cout << "线程1, tid: " << pthread_self() << endl;
pthread_mutex_unlock(&mutexA);
pthread_mutex_unlock(&mutexB);
}
}
void* startRoutine2(void* args)
{
while (true)
{
pthread_mutex_lock(&mutexB);
sleep(1);
pthread_mutex_lock(&mutexA);
cout << "线程2, tid: " << pthread_self() << endl;
pthread_mutex_unlock(&mutexB);
pthread_mutex_unlock(&mutexA);
}
}
int main()
{
pthread_t t1, t2;
pthread_create(&t1, nullptr, startRoutine1, nullptr);
pthread_create(&t2, nullptr, startRoutine2, nullptr);
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
return 0;
}
运行后程序会一直处于等待状态:

死锁的必要条件:
避免死锁
条件变量是 Linux 中最常用的同步策略。
条件:对应的共享资源的状态。
条件变量:条件满足或不满足的时候,进行 wait 或 signal 的一种方式。
条件变量的类型:pthread_cond_t
条件变量函数:
初始化
// 局部初始化
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
// cond: 要初始化的条件变量
// attr: NULL
// 全局初始化
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
销毁
int pthread_cond_destroy(pthread_cond_t *cond);
等待
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict abstime);
// abstime: 可以设定等待时间
通过这个函数也可以知道,条件变量必须和互斥锁一起使用。
唤醒指定线程:
int pthread_cond_signal(pthread_cond_t *cond);
唤醒在 cond 条件变量下等的线程。
int pthread_cond_broadcast(pthread_cond_t *cond);
唤醒在 cond 条件变量下等的所有线程。
例子
创建一个条件变量和一个互斥锁,在下面的例子中,我们没有使用互斥锁,只是作为 pthread_cond_wait 的参数。接下来在 main 函数中创建线程,然后我们每输入一个 n 就唤醒一个线程执行打印。
// 定义一个条件变量和一个互斥锁
pthread_cond_t cond;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void* waitCommand(void* args)
{
while (true)
{
pthread_cond_wait(&cond, &mutex); // 让对应的线程等待被唤醒
cout << "thread id: " << pthread_self() << " run..." << endl;
}
}
int main()
{
pthread_cond_init(&cond, nullptr);
pthread_t t1, t2, t3;
pthread_create(&t1, nullptr, waitCommand, nullptr);
pthread_create(&t2, nullptr, waitCommand, nullptr);
pthread_create(&t3, nullptr, waitCommand, nullptr);
while (true)
{
char n = 'a';
cout << "请输入你的command(n/q): ";
cin >> n;
if (n == 'n') pthread_cond_signal(&cond); // 让在cond条件变量下等的线程被唤醒
else break;
sleep(1);
}
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
pthread_join(t3, nullptr);
pthread_cond_destroy(&cond);
return 0;
}
运行结果:

可以看到,线程是被轮流唤醒的。
为什么 pthread_cond_wait 需要互斥量?
这一模型中有三类对象,消费者,超市,工厂供应商(生产者),生产者提供货物上架超市,消费者从超市购买商品。超市在中间起到了一个缓冲区的效果。
总结:321原则
在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素。当队列为满时,我那个队列里放入元素的操作也会被阻塞,直到有元素被从队列中取出。这一特性类似于管道。
要判断条件是否成立,必然要访问共享资源,要访问共享资源,必然要先申请锁,当判断条件不成立时,就需要进行等待。但是我们不可能让持有着锁的线程等待,这样就造成死锁了。对于这一点,其实系统接口早就帮我们完善了。
pthread_cond_wait 进入等待时,该线程会自动释放该锁,以让其他进程竞争,防止死锁。例子:
#pragma once
#include
#include
#include
#include
#include
using namespace std;
const uint32_t gDefaultCap = 5;
template<class T>
class BlockQueue
{
public:
BlockQueue(uint32_t cap = gDefaultCap)
: cap_(cap)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&conCond_, nullptr);
pthread_cond_init(&proCond_, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&conCond_);
pthread_cond_destroy(&proCond_);
}
// 生产接口
void push(const T& in)
{
// 加锁
lockQueue();
// 这里必须使用while而不是if, 保证被唤醒时能够再次进行条件判断
while (isFull())
{
proBlockWait();
}
// 不满,可以生产
pushCore(in);
unlockQueue();
// 唤醒消费者
wakeupCon();
}
// 消费接口
T pop()
{
// 加锁
lockQueue();
// 判断是否为空
while (isEmpty())
{
conBlockWait();
}
// 为空,可以消费
T tmp = popCore();
unlockQueue();
// 唤醒生产者
wakeupPro();
return tmp;
}
private:
void lockQueue()
{
pthread_mutex_lock(&mutex_);
}
void unlockQueue()
{
pthread_mutex_unlock(&mutex_);
}
bool isEmpty()
{
return bq_.empty();
}
bool isFull()
{
return bq_.size() == cap_;
}
void proBlockWait()
{
// 在阻塞线程的时候,会自动释放这把锁
pthread_cond_wait(&proCond_, &mutex_);
}
void conBlockWait()
{
pthread_cond_wait(&conCond_, &mutex_);
}
void wakeupPro()
{
pthread_cond_signal(&proCond_);
}
void wakeupCon()
{
pthread_cond_signal(&conCond_);
}
void pushCore(const T& in)
{
bq_.push(in);
}
T popCore()
{
T tmp = bq_.front();
bq_.pop();
return tmp;
}
private:
queue<T> bq_; // 阻塞队列
uint32_t cap_; // 容量
pthread_mutex_t mutex_; // 保护阻塞队列的互斥锁
pthread_cond_t conCond_;// 让消费者等待的条件变量
pthread_cond_t proCond_; // 让生产者等待的条件变量
};
#include "BlockQueue.hpp"
void* consumer(void* args)
{
BlockQueue<int>* pbq = static_cast<BlockQueue<int>*>(args);
while (true)
{
int data = pbq->pop();
cout << "consumer 消费数据完成: " << data << endl;
}
}
void* productor(void* args)
{
BlockQueue<int>* pbq = static_cast<BlockQueue<int>*>(args);
while (true)
{
int data = rand() % 10;
pbq->push(data);
cout << "productor 生产数据完成: " << data << endl;
sleep(2);
}
}
int main()
{
srand((unsigned long)time(nullptr) ^ getpid());
BlockQueue<int> bq;
pthread_t c, p;
pthread_create(&c, nullptr, consumer, &bq);
pthread_create(&p, nullptr, productor, &bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
生产者消费者模型的优点:
信号量是一个描述临界资源数量的计数器。其主要有两种操作:
临界资源可以被看做一个整体,也可以被分块,多个线程并发访问临界资源的不同区域并不会出现线程安全问题。
如果我们把信号量初始值设为1,那么申请资源就进行P操作,变为0,归还资源对应V操作,变为1,像这样只有01的信号量称作二元信号量。
申请资源也对应了加锁,归还资源对应释放锁,所以二元信号量和互斥锁是等价的。
如果信号量初始值大于1,那么我们就应该保证临界资源被划分成了多个,不同的线程申请的是不同区域的资源。
初始化信号量
#include
int sem_init(sem_t *sem, int pshared, unsigned int value);
// pshared: 0表示线程间共享,非0表示进程间共享
// value: 信号量初始值
销毁信号量:
int sem_destroy(sem_t *sem);
等待信号量:
int sem_wait(sem_t *sem);
本质就是如果信号量不为0,那么将信号量的值减1。
发布信号量:
int sem_post(sem_t *sem);
表示资源使用完毕,可以归还资源了,信号量的值加1。
让环形队列作为临界资源,充当”超市“的角色。
当队列为空或满的时候,生产者和消费者指向的是同一个位置,此时需要互斥+同步。
其他时候,都指向的是不同的位置,可以并发访问。
例子:
#pragma once
#include
#include
#include
#include
using namespace std;
const int gCap = 5;
template<class T>
class RingQueue
{
public:
RingQueue(int cap = gCap)
: ringqueue_(cap)
, pIndex_(0)
, cIndex_(0)
{
sem_init(&roomSem_, 0, ringqueue_.size());
sem_init(&dataSem_, 0, 0);
pthread_mutex_init(&pmutex_, nullptr);
pthread_mutex_init(&cmutex_, nullptr);
}
// 生产
void push(const T& in)
{
sem_wait(&roomSem_);
pthread_mutex_lock(&pmutex_); // 加锁,保证生产者和生产者之间的互斥
ringqueue_[pIndex_] = in;
++pIndex_;
pIndex_ %= ringqueue_.size();
pthread_mutex_unlock(&pmutex_);
sem_post(&dataSem_);
}
// 消费
T pop()
{
sem_wait(&dataSem_);
pthread_mutex_lock(&cmutex_); // 加锁,保证消费者和消费者之间的互斥
T temp = ringqueue_[cIndex_];
++cIndex_;
cIndex_ %= ringqueue_.size();
pthread_mutex_unlock(&cmutex_);
sem_post(&roomSem_);
return temp;
}
~RingQueue()
{
sem_destroy(&roomSem_);
sem_destroy(&dataSem_);
pthread_mutex_destroy(&pmutex_);
pthread_mutex_destroy(&cmutex_);
}
private:
vector<T> ringqueue_; // 唤醒队列
sem_t roomSem_; // 空间计数器
sem_t dataSem_; // 数据计数器
uint32_t pIndex_; // 当前生产者写入位置
uint32_t cIndex_; // 当前消费者读取位置
pthread_mutex_t pmutex_;
pthread_mutex_t cmutex_;
};
#include "RingQueue.hpp"
#include
#include
#include
void* productor(void* args)
{
RingQueue<int>* rqp = static_cast<RingQueue<int>*>(args);
while (true)
{
int data = rand() % 10;
rqp->push(data);
cout << "pthread[" << pthread_self() << "]" << "生产了一个数据: " << data << endl;
sleep(1);
}
}
void* consumer(void* args)
{
RingQueue<int>* rqp = static_cast<RingQueue<int>*>(args);
while (true)
{
int data = rqp->pop();
cout << "pthread[" << pthread_self() << "]" << "消费了一个数据: " << data << endl;
}
}
int main()
{
srand((unsigned long)time(nullptr));
RingQueue<int> rq;
pthread_t c1, c2, c3, p1, p2, p3;
pthread_create(&p1, nullptr, productor, &rq);
pthread_create(&p2, nullptr, productor, &rq);
pthread_create(&p3, nullptr, productor, &rq);
pthread_create(&c1, nullptr, consumer, &rq);
pthread_create(&c2, nullptr, consumer, &rq);
pthread_create(&c3, nullptr, consumer, &rq);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
pthread_join(p3, nullptr);
return 0;
}
线程池是一种线程使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建于销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络 sockets 等的数量。
线程池的一个简易实现:
#pragma once
#include
#include
#include
#include
#include
#include
#include
using namespace std;
int gThreadNum = 5;
template<class T>
class ThreadPool
{
public:
ThreadPool(int threadNum = gThreadNum)
: isStart_(false)
, threadNum_(threadNum)
{
assert(threadNum_ > 0);
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
// 类内成员,设置为static以去掉隐含的this指针,this指针只能手动传入。
static void* threadRoutine(void* args)
{
pthread_detach(pthread_self());
ThreadPool<T>* tp = static_cast<ThreadPool<T>*>(args);
while (1)
{
tp->lockQueue();
while (!tp->haveTack())
{
tp->waitForTask();
}
T t = tp->pop();
tp->unlockQueue();
t.run(); // 规定:所有的任务都有一个run方法
}
}
// 运行线程池,创建线程
void start()
{
assert(!isStart_);
for (int i = 0; i < threadNum_; ++i)
{
pthread_t temp;
pthread_create(&temp, nullptr, threadRoutine, this);
}
isStart_ = true;
}
// 派发任务
void push(const T& in)
{
lockQueue();
taskQueue_.push(in);
choiceThreadForHandler();
unlockQueue();
}
private:
void lockQueue() { pthread_mutex_lock(&mutex_); }
void unlockQueue() { pthread_mutex_unlock(&mutex_); }
bool haveTack() { return !taskQueue_.empty(); }
void waitForTask() { pthread_cond_wait(&cond_, &mutex_); }
void choiceThreadForHeadler() { pthread_cond_signal(&cond_); }
T pop()
{
T temp = taskQueue_.front();
taskQueue_.pop();
return temp;
}
private:
bool isStart_;
int threadNum_;
queue<T> taskQueue_;
pthread_mutex_t mutex_;
pthread_cond_t cond_;
};