传统进程使用fork创建子进程,但fork调用存在问题:
由于线程
的速度快能很好解决上面的问题。同一进程内的所有线程共享相同的全局内存。这使得线程之间易于共享信息,然而伴随这种简易性而来的却是同步
问题。
#include
//创建线程
int pthread_create(pthread_t *tid,const pthread_attr_t *attr,void *(*func)(void *),void *arg);
//数据类型pthread_t,若新线程创建成功,ID通过tid指针返回。
//创建线程时通过初始化一个取代默认设置的pthread_attr_t变量指的这些属性,通常情况默认设置,attr指定为空指针。
//func:函数,其所指函数作为参数接受一个通用指针(void*),又作为返回值返回一个通用指针(void*)。
//函数唯一调用参数是指针arg。多个参数大包成结构
//等待一个线程终止,类似waitpid
int pthread_join(pthread_t *tid,void **status);
//必须指定要等待线程的tid,status指针为空,等待线程的返回值将存入由status指向的位置。
//返回线程的ID,线程可以获取自身ID,类似getpid
pthread_t pthread_self(void);
//设置脱离线程,这样就可以不用阻塞在pthread_join()函数中。
//常见的用法:在子线程中调用 pthread_detach(pthread_self());
//或者在父进程中:pthread_detach(thread_id tid);
int pthread_detach(pthread_t tid);
//线程终止
void pthread_exit(void *status);
//另外两个方法:
//启动线程函数(pthread_creat的第三个参数)可以返回,声明成功返回一个void指针。
//mian函数调用exit。
#include
#include
#include
#include
#include
#include
#include
//打印线程内的信息,包括进程ID和线程ID。
void printids(const char *s)
{
pid_t pid;
pthread_t tid;
pid = getpid();
tid = pthread_self();
printf("%s pid %lu tid %lu (0x%lx)\n", s, (unsigned long)pid, (unsigned long)tid, (unsigned long)tid);
}
//两种不同的退出方式,一种是返回 void*;一种是调用pthread_exit();
void *thr_fn_exit(void *arg)
{
printids("thread 1: ");
printf("thread 1 returning\n");
sleep(1);
return ((void*) 1);
}
void *thr_fn_exit1(void *arg)
{
printids("thread 2: ");
printf("thread 2 exiting\n");
sleep(1);
pthread_exit( (void*) 2);
}
int main()
{
int err;
pthread_t tid1, tid2;
void *tret;
printids("Main thread: ");
err = pthread_create(&tid1,NULL,thr_fn_exit, NULL);
if(err != 0)
printf("creat error:%s\n",strerror(err));
err = pthread_create(&tid2, NULL, thr_fn_exit1, NULL);
if (err != 0)
printf("creat error:%s\n",strerror(err));
err = pthread_join(tid1, &tret);
printf("thread 1 exit code %ld\n", (long)tret);
err = pthread_join(tid2, &tret);
if(err != 0)
printf("get end error:%s\n",strerror(err));
printf("thread 2 exit code %ld\n",(long)tret);
return 0;
}
在单线程程序中,我们经常要用到"全局变量"以实现多个函数间共享数据, 然而在多线程环境下,由于数据空间是共享的,因此全局变量也为所有线程所共有。
要提供线程私有的全局变量
,仅在某个线程中有效,但却可以跨多个函数访问。POSIX线程库通过维护一定的数据结构来解决这个问题,这个些数据称为线程特定数据( TSD)
。
线程特定数据基于每线程进行维护,TSD是定义和引用线程专用数据的唯一方法,线程特定数据看似很复杂,其实我们可以把它理解为就是一个索引(key)和指针。key结构
中存储的是索引,pthread结构
中存储的是指针,指向线程中的私有数据,通常是malloc函数返回的指针。
系统支持有效个数线程特定数据:
pthread_key_create
创建一个新的线程特定数据元素时,系统搜索其所在进程的key结构
数组,找出其中第一个未使用的元素,并通过keyptr
返回该元素的键,即就是我们前面说的索引。pthread_key_create
函数的第二个参数destructor
是一个函数指针,指向一个析构函数,用于线程结束以后的一些后期后期处理工作,析构函数的额参数就是线程特定数据的指针。
key结构
数组外,系统还在进程中维护关于每个线程的线程结构,把这个特定于线程的结构称为pthread结构,如下所示。它的128个指针和进程中的128个可能的键(索引)是逐一关联的。指针指向的内存就是线程特有数据。
下面看一个具体的过程,
启动一个进程并创建了若干线程,其中一个线程(比如线程1),要申请线程私有数据,系统调用pthread_key_creat()
在key结构数组中找到第一个未用的元素,并把它的的索引(0-127)
,返回给调用者,假设返回的索引是1,线程之后通过pthrea_getspecific(
)调用获得本线程的pkey[1]
值,返回的是一个空指针ptr = null
,这个指针就是我们可以通过索引1使用的线程数据的首地址了,但是他现在为空,因此根据实际情况用malloc
分配一快内存,在使用pthread_setspecific()
调用将特定数据的指针指向刚才分配到内存区域。整个过程结束后key结构和pthread结构如下所示。
#include
//key只初始化一次
int pthread_once(pthread_once_t *onceptr, void (*init)(void));
//分配用于标识进程中线程特定数据的键。键对进程中的所有线程来说是全局的。
int pthread_key_create(pthread_key_t *keyptr, void (*destructor)(void *value));
//设置线程特定数据
int pthread_setspecific(pthread_key_t key, const void *value);
pthread_once_t initflag = PTHREAD_ONCE_INIT;
//获取线程特定数据
void *pthread_getspecific(pthread_key_t key);
//删除线程特定数据键
int pthread_key_delete(pthread_key_t *key);
例:
/* 设置/获取线程特定数据
在两个线程中分别设置/获取线程特定数据, 查看两个线程中的数据是否是一样的(肯定是不一样的O(∩_∩)O~)
*/
pthread_key_t key;
typedef struct Tsd
{
pthread_t tid;
char *str;
} tsd_t;
//用来销毁每个线程所指向的实际数据
void destructor_function(void *value)
{
free(value);
cout << "destructor ..." << endl;
}
void *thread_routine(void *args)
{
//设置线程特定数据
tsd_t *value = (tsd_t *)malloc(sizeof(tsd_t));
value->tid = pthread_self();
value->str = (char *)args;
pthread_setspecific(key, value);
printf("%s setspecific, address: %p\n", (char *)args, value);
//获取线程特定数据
value = (tsd_t *)pthread_getspecific(key);
printf("tid: 0x%x, str = %s\n", (unsigned int)value->tid, value->str);
sleep(2);
//再次获取线程特定数据
value = (tsd_t *)pthread_getspecific(key);
printf("tid: 0x%x, str = %s\n", (unsigned int)value->tid, value->str);
pthread_exit(NULL);
}
int main()
{
//这样每个线程当中都会有一个key可用了,
//但是每个key所绑定的实际区域需要每个线程自己指定
pthread_key_create(&key, destructor_function);
pthread_t tid1, tid2;
pthread_create(&tid1, NULL, thread_routine, (void *)"thread1");
pthread_create(&tid2, NULL, thread_routine, (void *)"thread2");
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
pthread_key_delete(key);
return 0;
}
/* 运用pthread_once, 让key只初始化一次
注意: 将对key的初始化放入到init_routine中
*/
#include
#include
#include
#include
pthread_key_t key;
pthread_once_t once_control = PTHREAD_ONCE_INIT;
typedef struct Tsd
{
pthread_t tid;
char *str;
} tsd_t;
//线程特定数据销毁函数,
//用来销毁每个线程所指向的实际数据
void destructor_function(void *value)
{
free(value);
printf("destructor ...\n");
}
//初始化函数, 将对key的初始化放入该函数中,
//可以保证inti_routine函数只运行一次
void init_routine()
{
pthread_key_create(&key, destructor_function);
printf("init...\n");
}
void *thread_routine(void *args)
{
pthread_once(&once_control, init_routine);
//设置线程特定数据
tsd_t *value = (tsd_t *)malloc(sizeof(tsd_t));
value->tid = pthread_self();
value->str = (char *)args;
pthread_setspecific(key, value);
printf("%s setspecific, address: %p\n", (char *)args, value);
//获取线程特定数据
value = (tsd_t *)pthread_getspecific(key);
printf("tid: 0x%x, str = %s\n", (unsigned int)value->tid, value->str);
sleep(2);
//再次获取线程特定数据
value = (tsd_t *)pthread_getspecific(key);
printf("tid: 0x%x, str = %s\n", (unsigned int)value->tid, value->str);
pthread_exit(NULL);
}
int main()
{
pthread_t tid1, tid2;
pthread_create(&tid1, NULL, thread_routine, (void *)"thread1");
pthread_create(&tid2, NULL, thread_routine, (void *)"thread2");
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
pthread_key_delete(key);
return 0;
}
并发编程(线程编程)运行和访问相同变量,多个线程一起,但只有一个线程在一定时间访问,这时需要互斥锁通过加锁来保护共享的数据,具有锁才能访问。
#include
int pthread_mutex_lock(pthread_mutex_t *mptr);//上锁
int pthread_mutex_unlock(pthread_mutex_t *mptr);//解锁
//如果试图上锁已被另外某个线程锁住的互斥锁,本线程将被堵塞直到互斥锁被解锁。
//如果互斥锁变量是静态分配的,应该初始长值为PTHREAD_MUTEX_INITALIZER
//有的系统把PTHREAD_MUTEX_INITALIZER定义为0,忽略初始化
#include
#include
int counter;
void *doit(void*);
int main(int argc, char **argv)
{
pthread_t tidA, tidB;
pthread_create(&tidA, NULL, &doit, NULL);
pthread_create(&tidB, NULL, &doit, NULL);
pthread_join(tidA, NULL);
pthread_join(tidB, NULL);
return 0;
}
void *doit(void * arg)
{
int i, val;
for(i=0; i<10; i++)
{
val = counter;
printf("counter is %d
", val+1);
counter = val+1;
}
return NULL;
}
//线程的运行是并发运行的,counter值的修改的结果是不定的。
//修改后:
#include
#include
int counter;
pthread_mutex_t counter_mutex;
void *doit(void*);
int main(int argc, char **argv)
{
pthread_t tidA, tidB;
pthread_create(&tidA, NULL, &doit, NULL);
pthread_create(&tidB, NULL, &doit, NULL);
pthread_join(tidA, NULL);
pthread_join(tidB, NULL);
return 0;
}
void *doit(void * arg)
{
int i, val;
for(i=0; i<10; i++)
{
pthread_mutex_lock(&counter_mutex);
val = counter;
printf("counter is %d
", val+1);
counter = val+1;
pthread_mutex_unlock(&counter_mutex);
}
return NULL;
}
下列展示互斥锁用于上锁,而不能用于等待,所有生产者线程都启动后立即启动消费者线程。这样在生产者线程产生数据的同时,消费者线程就能处理它。
#define MAXNITEMS 1000000
#define MAXNTHREADS 100
int nitems; /* 生产者和消费者只读 */
struct {
pthread_mutex_t mutex;
int buff[MAXNITEMS];
int nput;
int nval;
} shared = { PTHREAD_MUTEX_INITIALIZER };
void *produce(void *), *consume(void *);
int main(int argc, char **argv)
{
int i, nthreads, count[MAXNTHREADS];
pthread_t tid_produce[MAXNTHREADS], tid_consume;
if (argc != 3)
err_quit("usage: prodcons3 <#items> <#threads>");
nitems = min(atoi(argv[1]), MAXNITEMS);
nthreads = min(atoi(argv[2]), MAXNTHREADS);
/* 创建所有生产者和一个消费者 */
set_concurrency(nthreads + 1);
for (i = 0; i < nthreads; i++) {
count[i] = 0;
pthread_create(&tid_produce[i], NULL, produce, &count[i]);
}
pthread_create(&tid_consume, NULL, consume, NULL);
/* 等待所有生产者和消费者 */
for (i = 0; i < nthreads; i++) {
pthread_join(tid_produce[i], NULL);
printf("count[%d] = %d\n", i, count[i]);
}
pthread_join(tid_consume, NULL);
exit(0);
}
//生产者
void * produce(void *arg)
{
for ( ; ; ) {
Pthread_mutex_lock(&shared.mutex);
if (shared.nput >= nitems) {
Pthread_mutex_unlock(&shared.mutex);
return(NULL); /* 数组已满,完成*/
}
shared.buff[shared.nput] = shared.nval;
shared.nput++;
shared.nval++;
Pthread_mutex_unlock(&shared.mutex);
*((int *) arg) += 1;
}
}
/*消费者*/
void consume_wait(int i)
{
//等待生产者
for ( ; ; ) {
Pthread_mutex_lock(&shared.mutex);
if (i < shared.nput) {
Pthread_mutex_unlock(&shared.mutex);
return; /*一个项目准备就绪 */
}
Pthread_mutex_unlock(&shared.mutex);
}
}
void * consume(void *arg)
{
int i;
for (i = 0; i < nitems; i++) {
//消息者必须等待
consume_wait(i);
if (shared.buff[i] != i)
printf("buff[%d] = %d\n", i, shared.buff[i]);
}
return(NULL);
}
互斥锁防止多个线程同时访问同一共享变量,只用互斥量很可能会引起死锁。条件变量允许一个线程就某个共享变量的状态变化通知其他线程,并让其他线程等待这一通知。
条件变量常和互斥锁一起使用,互斥量主要用来保证对临界区的互斥进入,而条件变量则用于线程的阻塞等待,互斥锁定进入临界区以后,若条件不满足,线程便转为等待状态,等待条件满足后被唤醒执行,否则继续执行,执行完后开锁。
#include
//使用条件变量前必须使用对其初始化
//对于经由静态分配的条件变量,将其赋值为 PTHREAD_COND_INITALIZER 即完成初始化操作。
pthread_count_t cond = PTHREAD_COND_INITALIZER
//对条件变量进行动态初始化。未采用默认属性由静态分配的条件变量进行初始化必须使用此函数。
int pthread_code_init(pthread_code_t conde,const pthread_condattr_t *atrr);
//当不再需要一个经由自动或动态分配的条件变量时,应调此函数予以销毁。
// PTHREAD_COND_INITIALIZER 不需调用
int pthread_cond_destroy(pthread_cond_t *cond);
//等待
int pthread_code_wait(pthread_code_t *cond,pthread_mutex_t *mutex);//阻塞当前线程,等待某个即将终止的线程发送cond通知
//cond:所等待的条件变量。
//mutex:所使用的互斥锁。
int pthread_code_signal(pthread_code_t *cond);//发送信号给一个或者多个正在处于阻塞等待状态的线程,某个共享变量已经改变。使其脱离阻塞状态,继续执行.如果没有线程处在阻塞等待状态,pthread_cond_signal也会成功返回。
//函数只保证唤醒至少一条遭到阻塞的线程
//相当pthread_code_broadcast于高效
//和pthread_code_signal差不多
int pthread_code_broadcast(pthread_cond_t *cond);//唤醒所有阻塞的线程
//和函数pthread_cond_wait()几近相同
//唯一区别在于由参数abstime来指定一个线程等待条件变量通知时休眠时间的上限
//参数 abstime 是一个timespec 类型的结构,
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);
//cond:所等待的条件变量。
//mutex:所使用的互斥锁。
//abstime:指定等待的时间范围。
例:
C++11中提供了condition_variable函数,实际上是对上面函数的封装。
条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待条件变量的条件成立而挂起;另一个线程使条件成立(给出条件成立信号)。为了防止竞争,条件变量的使用总是和一个互斥量结合在一起。
#include
C++11中提供了两种封装:condition_variable和condition_variable_any
std::mutex
一起使用。std::mutex
一起工作,而后者可以和任何满足最低标准的互斥量一起工作,从而加上了_any
的后缀。condition_variable_any
会产生额外的开销。一般只推荐使用condition_variable。除非对灵活性有硬性要求,才会考虑condition_variable_any。
条件变量的构造函数:
std::condition_variable::condition_variable
constructor:
condition_variable(); //默认构造函数无参
condition_variable(const condition_variable&) = delete; //删除拷贝构造函数
//条件变量的wait函数:
void wait( std::unique_lock<std::mutex>& lock );
//Predicate是lambda表达式。
template< class Predicate >
void wait( std::unique_lock<std::mutex>& lock, Predicate pred );
//以上二者都被notify_one())或notify_broadcast()唤醒,但是
//第二种方式是唤醒后也要满足Predicate的条件。
//如果不满足条件,继续解锁互斥量,然后让线程处于阻塞或等待状态。
//第二种等价于
while (!pred())
{
wait(lock);
}
//创建连个线程分别来处理value为奇数和偶数的情况 ,用互斥锁与条件变量来对共享数据value进行管理。
#include
#include
#include
#include
#include
#define MAX_SIZE 10
int value = 0;
pthread_mutex_t mutex; //定义一个全局的互斥锁
pthread_cond_t dan, shuang; //定义两个条件变量来表示单数信号和双数信号
void * pthread_fun1(void *arg)
{
pthread_mutex_lock(&mutex); //对共享数据value上锁
while(value < MAX_SIZE){
if(value % 2 == 1){ //value为奇数
printf("fun1 value = %d\n", value);
value++; //value值加1
pthread_cond_signal(&shuang); //发送为双数信号。通知双数的处理线程pthread_fun2().
}else{ //value不为奇数
pthread_cond_wait(&dan, &mutex); //阻塞程序,并解开互斥锁。等待接收到条件变量dan后再进行上锁并使函数继续运行。
}
}
pthread_mutex_unlock(&mutex); //对线程解锁。
}
void* pthread_fun2(void *arg)
{
pthread_mutex_lock(&mutex); //对该线程上锁,如果该互斥量已经被上锁则程序阻塞在该处,直到其他线程对其解锁,它的到资源为止。
while(value < MAX_SIZE){ //如果vallue在预定范围内
if(value % 2 == 0){ //value是偶数
printf("fun2 value = %d\n", value);
value ++;
pthread_cond_signal(&dan); //发送为双数信号。通知奇数的处理线程pthread_fun1().
}else{
pthread_cond_wait(&shuang, &mutex); //阻塞程序,并解开互斥锁。等待接收到条件变量 shuang 后再进行上锁并使函数继续运行
}
}
pthread_mutex_unlock(&mutex); //该线程运行结束前,解开互斥锁。
}
int main()
{
pthread_mutex_init(&mutex,NULL); //互斥锁初始化
pthread_t tid[2];
pthread_create(&tid[1], NULL, pthread_fun2, &tid[0]); //创建一个线程用来执行pthread_fun2()操作。
sleep(1);
pthread_create(&tid[0], NULL, pthread_fun1, NULL); //创建一个线程,用来执行pthread_fun1()操作。
pthread_join(tid[0], NULL); //等待线程1结束
pthread_join(tid[1], NULL); //等待线程2结束
pthread_mutex_destroy(&mutex); //销毁互斥锁。
return 0;
}