首先,对于任何一个进程来讲,即便我们没有主动地去创建线程,进程也是默认有一个主线程的。
线程是负责执行二进制指令的,而进程管的比线程多多了,除了执行指令外,还有内存管理,文件系统等。默认情况下,你可以建一个大的任务,就是完成某某功能,然后交给一个人让它从头做到尾,这就是主线程。但是有时候,你发现任务是可以拆解的,如果相关性没有非常大前后关联关系,就可以并行执行。
为什么不能直接使用进程来实现并行?
- 创建进程需要占用的资源太多;
- 进程之间的通信需要数据在不同的内存空间传来传去,无法共享。
在 Linux 中,有时候我们希望将前台的任务和后台的任务分开。因为有些任务是需要马上返回结果的,例如你输入了一个字符,不可能五分钟再显示出来;而有些任务是可以默默执行的,例如将本机的数据同步到服务器上去,这个就没刚才那么着急。因此这样两个任务就应该在不同的线程处理,以保证互不耽误。
假如说,现在我们有 N 个非常大的视频需要下载,一个个下载需要的时间太长了。按照并行的思路,我们可以拆分成 N 个任务,分给 N 个线程各自去下载。
如何指导进程干活呢?这需要一个函数,我们将要执行的子任务放在这个函数里面,比如上面的下载任务。
这个函数参数是 void 类型的指针,用于接收任何类型的参数。我们就可以将要下载的文件的文件名通过这个指针传给它。当然,这里我们不是真的下载这个文件,而仅仅打印日志,并生成一个一百以内的随机数,作为下载时间返回。这样,每个子任务干活的同时在喊:“我正在下载,终于下载完了,用了多少时间。”
#include
#include
#include
#define NUM_OF_TASKS 5
void *downloadfile(void *filename)
{
printf("I am downloading the file %s!\n", (char *)filename);
sleep(10);
long downloadtime = rand()%100;
printf("I finish downloading the file within %d minutes!\n", downloadtime);
// 线程运行结束,退出线程
pthread_exit((void *)downloadtime);
}
int main(int argc, char *argv[])
{
char files[NUM_OF_TASKS][20]={"file1.avi","file2.rmvb","file3.mp4","file4.wmv","file5.flv"};
// 1、线程对象
pthread_t threads[NUM_OF_TASKS];
int rc;
int t;
int downloadtime;
// 2、线程属性
pthread_attr_t thread_attr;
pthread_attr_init(&thread_attr);
// PTHREAD_CREATE_JOINABLE:主线程程等待这个线程的结束,并获取退出时的状态
pthread_attr_setdetachstate(&thread_attr,PTHREAD_CREATE_JOINABLE);
for(t=0;t<NUM_OF_TASKS;t++){
printf("creating thread %d, please help me to download %s\n", t, files[t]);
// 3、分配线程任务
rc = pthread_create(&threads[t], &thread_attr, downloadfile, (void *)files[t]);
if (rc){
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
// 4、运行结束后,销毁线程属性
pthread_attr_destroy(&thread_attr);
for(t=0;t<NUM_OF_TASKS;t++){
// 5、等待线程结束,获取线程退出的返回值
pthread_join(threads[t],(void**)&downloadtime);
printf("Thread %d downloads the file %s in %d minutes.\n",t,files[t],downloadtime);
}
// 6、主线程结束
pthread_exit(NULL);
}
一个运行中的线程可以调用 pthread_exit
退出线程。这个函数可以传入一个参数转换为 (void *)
类型,这是 线程退出的返回值。
接下来,我们来看主线程。在这里面,我列了五个文件名。接下来声明了一个数组,里面有五个 pthread_t
类型的线程对象。
接下来,声明一个线程属性 pthread_attr_t
。我们通过 pthread_attr_init
初始化这个属性,并且设置属性 PTHREAD_CREATE_JOINABLE
。这表示 将来主线程程等待这个线程的结束,并获取退出时的状态。
接下来是一个循环。对于每一个文件和每一个线程,可以调用 pthread_create
创建线程。一共有四个参数,第一个参数是线程对象,第二个参数是线程的属性,第三个参数是线程运行函数,第四个参数是线程运行函数的参数。主线程就是通过第四个参数,将自己的任务派给子线程。
任务分配完毕,每个线程下载一个文件,接下来主线程要做的事情就是等待这些子任务完成。当一个线程退出的时候,就会发送信号给其他所有同进程的线程。 有一个线程使用 pthread_join
获取这个线程退出的返回值。线程的返回值通过 pthread_join
传给主线程,这样子线程就将自己下载文件所耗费的时间,告诉给主线程。
好了,程序写完了,开始编译。多线程程序要依赖于 libpthread.so
。
gcc download.c -lpthread
编译好了,执行一下,就能得到下面的结果。
# ./a.out
creating thread 0, please help me to download file1.avi
creating thread 1, please help me to download file2.rmvb
I am downloading the file file1.avi!
creating thread 2, please help me to download file3.mp4
I am downloading the file file2.rmvb!
creating thread 3, please help me to download file4.wmv
I am downloading the file file3.mp4!
creating thread 4, please help me to download file5.flv
I am downloading the file file4.wmv!
I am downloading the file file5.flv!
I finish downloading the file within 83 minutes!
I finish downloading the file within 77 minutes!
I finish downloading the file within 86 minutes!
I finish downloading the file within 15 minutes!
I finish downloading the file within 93 minutes!
Thread 0 downloads the file file1.avi in 83 minutes.
Thread 1 downloads the file file2.rmvb in 86 minutes.
Thread 2 downloads the file file3.mp4 in 77 minutes.
Thread 3 downloads the file file4.wmv in 93 minutes.
Thread 4 downloads the file file5.flv in 15 minutes.
我们把线程访问的数据细分成三类。下面我们一一来看。
第一类是 线程栈上的本地数据,比如函数执行过程中的局部变量。函数的调用会使用栈的模型,这在线程里面是一样的。只不过 每个线程都有自己的栈空间。
栈的大小可以通过命令 ulimit -a
查看,默认情况下线程栈大小为 8192(8MB)。我们可以使用命令 ulimit -s
修改。对于线程栈,可以通过下面这个函数 pthread_attr_t
,修改线程栈的大小。
int pthread_attr_setstacksize(pthread_attr_t *attr, size_t stacksize);
主线程在内存中有一个栈空间,其他线程栈也拥有独立的栈空间。为了避免线程之间的栈空间踩踏,线程栈之间还会有小块区域,用来隔离保护各自的栈空间。一旦另一个线程踏入到这个隔离区,就会引发段错误。
第二类数据就是 在整个进程里共享的全局数据。例如全局变量,虽然在不同进程中是隔离的,但是在一个进程中是共享的。如果同一个全局变量,两个线程一起修改,那肯定会有问题,有可能把数据改的面目全非。这就需要有一种机制来保护他们,比如你先用我再用。
Mutex,全称 Mutual Exclusion,中文叫 互斥。顾名思义,有你没我,有我没你。它的模式就是在共享数据访问的时候,去申请加把锁,谁先拿到锁,谁就拿到了访问权限,其他人就只好在门外等着,等这个人访问结束,把锁打开,其他人再去争夺,还是遵循谁先拿到谁访问。
#include
#include
#include
#define NUM_OF_TASKS 5
// 初始化资源
int money_of_tom = 100;
int money_of_jerry = 100;
// 第一次运行去掉下面这行
// 声明锁
pthread_mutex_t g_money_lock;
void *transfer(void *notused)
{
pthread_t tid = pthread_self();
printf("Thread %u is transfering money!\n", (unsigned int)tid);
// 第一次运行去掉下面这行
// 抢锁
pthread_mutex_lock(&g_money_lock);
sleep(rand()%10);
money_of_tom+=10;
sleep(rand()%10);
money_of_jerry-=10;
// 第一次运行去掉下面这行
// 放锁
pthread_mutex_unlock(&g_money_lock);
printf("Thread %u finish transfering money!\n", (unsigned int)tid);
pthread_exit((void *)0);
}
int main(int argc, char *argv[])
{
pthread_t threads[NUM_OF_TASKS];
int rc;
int t;
// 第一次运行去掉下面这行
// 初始化锁
pthread_mutex_init(&g_money_lock, NULL);
for(t=0;t<NUM_OF_TASKS;t++){
rc = pthread_create(&threads[t], NULL, transfer, NULL);
if (rc){
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
for(t=0;t<100;t++){
// 第一次运行去掉下面这行
// 抢锁
pthread_mutex_lock(&g_money_lock);
printf("money_of_tom + money_of_jerry = %d\n", money_of_tom + money_of_jerry);
// 第一次运行去掉下面这行
// 放锁
pthread_mutex_unlock(&g_money_lock);
}
// 第一次运行去掉下面这行
// 销毁锁
pthread_mutex_destroy(&g_money_lock);
pthread_exit(NULL);
}
这里,有两个员工 Tom 和 Jerry,公司食堂的饭卡里面各自有 100 元,并行启动 5 个线程,都可以看到,中间有很多状态不正确,比如两个人的账户之和出现了超过 200 的情况,也就是 Tom 转入了,Jerry 还没转出。是 Jerry 转 10 元给 Tom,主线程不断打印 Tom 和 Jerry 的资金之和。按说,这样的话,总和应该永远是 200 元。
在上面的程序中,我们先去掉 mutex 相关的行,就像注释里面写的那样。在没有锁的保护下,在 Tom 的账户里面加上 10 元,在 Jerry 的账户里面减去 10 元,这不是一个原子操作。
root@deployer createthread]# ./a.out
Thread 508479232 is transfering money!
Thread 491693824 is transfering money!
Thread 500086528 is transfering money!
Thread 483301120 is transfering money!
Thread 516871936 is transfering money!
money_of_tom + money_of_jerry = 200
money_of_tom + money_of_jerry = 200
money_of_tom + money_of_jerry = 220
money_of_tom + money_of_jerry = 220
money_of_tom + money_of_jerry = 230
money_of_tom + money_of_jerry = 240
Thread 483301120 finish transfering money!
money_of_tom + money_of_jerry = 240
Thread 508479232 finish transfering money!
Thread 500086528 finish transfering money!
money_of_tom + money_of_jerry = 220
Thread 516871936 finish transfering money!
money_of_tom + money_of_jerry = 210
money_of_tom + money_of_jerry = 210
Thread 491693824 finish transfering money!
money_of_tom + money_of_jerry = 200
money_of_tom + money_of_jerry = 200
可以看到,中间有很多状态不正确,比如两个人的账户之和出现了超过 200 的情况,也就是 Tom 转入了,Jerry 还没转出。
接下来我们在上面的代码里面,加上 mutex,然后编译、运行,就得到了下面的结果。
[root@deployer createthread]# ./a.out
Thread 568162048 is transfering money!
Thread 576554752 is transfering money!
Thread 551376640 is transfering money!
Thread 542983936 is transfering money!
Thread 559769344 is transfering money!
Thread 568162048 finish transfering money!
Thread 576554752 finish transfering money!
money_of_tom + money_of_jerry = 200
money_of_tom + money_of_jerry = 200
money_of_tom + money_of_jerry = 200
Thread 542983936 finish transfering money!
Thread 559769344 finish transfering money!
money_of_tom + money_of_jerry = 200
money_of_tom + money_of_jerry = 200
Thread 551376640 finish transfering money!
money_of_tom + money_of_jerry = 200
money_of_tom + money_of_jerry = 200
money_of_tom + money_of_jerry = 200
money_of_tom + money_of_jerry = 200
这个结果就正常了。两个账号之和永远是 200。这下你看到锁的作用了吧?
使用 Mutex,首先要使用 pthread_mutex_init
函数初始化这个 mutex,初始化后,就可以用它来保护共享变量了。
pthread_mutex_lock()
就是去抢那把锁的函数,如果抢到了,就可以执行下一行程序,对共享变量进行访;如果没抢到,就被阻塞在那里等待。
如果不想被阻塞,可以使用 pthread_mutex_trylock
去抢那把锁,如果抢到了,就可以执行下一行程序,对共享变量进行访问;如果没抢到,不会被阻塞,而是返回一个错误码。
当共享数据访问结束了,别忘了使用 pthread_mutex_unlock
释放锁,让给其他人使用,最终调用 pthread_mutex_destroy
销毁掉这把锁。
你这个老板,招聘了三个员工,但是你不是有了活才去招聘员工,而是先把员工招来,没有活的时候员工需要在那里等着,一旦有了活,你要去通知他们,他们要去抢活干(为啥要抢活?因为有绩效呀!),干完了再等待,你再有活,再通知他们。
条件变量 其实就是一种 通知机制,当某个共享数据到达某个值的时候,通知等待该共享数据到达该值的线程来处理该共享数据。
条件变量一般是用于:
的访问临界资源(临界资源指的是同时只允许一个线程访问的共享资源)。条件变量的使用一般是需要结合互斥锁来进行(条件变量相关函数的输入参数就是互斥锁),从而使得各个线程能够互斥的访问该临界资源。
#include
#include
#include
#define NUM_OF_TASKS 3
#define MAX_TASK_QUEUE 11
char tasklist[MAX_TASK_QUEUE]="ABCDEFGHIJ";
int head = 0;
int tail = 0;
int quit = 0;
// 声明锁
pthread_mutex_t g_task_lock;
// 声明条件变量
pthread_cond_t g_task_cv;
void *coder(void *notused)
{
pthread_t tid = pthread_self();
while(!quit){
// 加锁
pthread_mutex_lock(&g_task_lock);
while(tail == head){
// 如果当前没有任务
if(quit){
// 解锁
pthread_mutex_unlock(&g_task_lock);
pthread_exit((void *)0);
}
printf("No task now! Thread %u is waiting!\n", (unsigned int)tid);
// 等待被叫醒
pthread_cond_wait(&g_task_cv, &g_task_lock);
printf("Have task now! Thread %u is grabing the task !\n", (unsigned int)tid);
}
char task = tasklist[head++];
// 解锁
pthread_mutex_unlock(&g_task_lock);
printf("Thread %u has a task %c now!\n", (unsigned int)tid, task);
sleep(5);
printf("Thread %u finish the task %c!\n", (unsigned int)tid, task);
}
pthread_exit((void *)0);
}
int main(int argc, char *argv[])
{
pthread_t threads[NUM_OF_TASKS];
int rc;
int t;
// 初始化锁
pthread_mutex_init(&g_task_lock, NULL);
// 初始化条件变量
pthread_cond_init(&g_task_cv, NULL);
for(t=0;t<NUM_OF_TASKS;t++){
// 创建线程
rc = pthread_create(&threads[t], NULL, coder, NULL);
if (rc){
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
sleep(5);
// 生产者
for(t=1;t<=4;t++){
// 加锁
pthread_mutex_lock(&g_task_lock);
// 分配任务,生产,操作共享数据
tail+=t;
printf("I am Boss, I assigned %d tasks, I notify all coders!\n", t);
// 通知所有线程
pthread_cond_broadcast(&g_task_cv);
// 解锁
pthread_mutex_unlock(&g_task_lock);
sleep(20);
}
pthread_mutex_lock(&g_task_lock);
quit = 1;
pthread_cond_broadcast(&g_task_cv);
pthread_mutex_unlock(&g_task_lock);
// 销毁锁
pthread_mutex_destroy(&g_task_lock);
// 销毁条件变量
pthread_cond_destroy(&g_task_cv);
// 退出主线程
pthread_exit(NULL);
}
首先,我们创建了 10 个任务,每个任务一个字符,放在一个数组里面,另外有两个变量 head 和 tail,表示当前分配的工作从哪里开始,到哪里结束。如果 head 等于 tail,则当前的工作分配完毕;如果 tail 加 N,就是新分配了 N 个工作。
接下来声明的 pthread_mutex_t g_task_lock
和 pthread_cond_t g_task_cv
,是用于通知和抢任务的,工作模式如下图所示:
第三类数据,线程私有数据(Thread Specific Data),可以通过以下函数创建:
int pthread_key_create(pthread_key_t *key, void (*destructor)(void*))
可以看到,创建一个 key,伴随着一个析构函数。
析构函数与构造函数相对应,构造函数是对象创建的时候自动调用的,而析构函数就是对象在销毁的时候自动调用的的
key 一旦被创建,所有线程都可以访问它,但各线程可根据自己的需要往 key 中填入不同的值,这就相当于提供了一个同名而不同值的全局变量。
我们可以通过下面的函数设置 key 对应的 value。
int pthread_setspecific(pthread_key_t key, const void *value)
我们还可以通过下面的函数获取 key 对应的 value。
void *pthread_getspecific(pthread_key_t key)
而等到线程退出的时候,就会调用析构函数释放 value。