这一部分介绍linux中常见的各种锁的机制和进程间通信机制。进程间通信只介绍消息队列和共享内存,其他的不复杂,如果有兴趣的话可以研究一下源码,会操作使用就可以了。
内核可以不受限制地访问整个地址空间。在多处理器系统上,这会引起一些问题。如果几个处理器同时处于内核态,则理论上它们可以同时访问同一个数据结构。在第一个提供了SMP功能的内核版本中,该问题的解决方案非常简单,即每次只允许一个处理器处于核心态。因此,对数据未经协调的并行访问被自动排除了。令人遗憾的是,该方法因为效率不高,很快被废弃了。现在内核使用由锁组成的细粒度网络,来明确地保护各个数据结构。如果处理器A在操作数据结构S,则处理器B可以执行任何其他的内核操作,但不能操作S。
内核为此提供了各种锁选项,分别优化不同的内核数据使用模式。
先介绍两个概念。
一个数据包到达时,进程必须执行一些操作,才能正常保存数据:
多个进程在执行以上操作的过程中,有可能出现竞态。进程在访问资源的时候彼此干扰的情况通常称为竞态条件(race condition)。
由于导致竞态条件的情况非常罕见,因此需要提出一个问题:是否值得做一些(有时候是大量的)工作来保护代码避免竞态条件。在某些环境中(比如航空飞机的控制系统、重要机械的监控、危险装备),竞态条件是致命问题。
每个进程中访问临界资源的那段代码称为临界区(Critical Section)(临界资源是一次仅允许一个进程使用的共享资源)。
举几个属于临界资源的例子,硬件如打印机,软件如消息队列、数组、缓冲区。进程间采取互斥方式,实现对这些资源的共享。
多个进程中涉及到同一个临界区,叫相关临界区。进程进入临界区时其他进程需要等待,这就需要一些同步机制,在进程进入和离开临界区时进行实现。
进程进入临界区的调度规则:
自旋锁是最常用的锁选项。它们用于短期保护某段代码,以防止其他处理器的访问。在内核等待自旋锁释放时,会重复检查是否能获取锁,而不会进入睡眠状态(忙等待)。当然,如果等待时间较长,则效率显然不高。注意,在单核单cpu系统上使用自旋锁是没用的。
自旋锁用于处理器之间的互斥,适合保护很短的临界区,并且不允许在临界区睡眠。申请自旋锁的时候,如果自旋锁被其他处理器占有,该处理器自旋等待(也称为忙等待)。进程、软中断和硬件中断都可以使用自旋锁。
目前内核的自旋锁是排队自旋锁(queued spinlock,也称为"FIFO ticket spinlock"),核心算法类似银行柜台排队叫号,下面解释一下:
看一看Linux内核自旋锁源码。
//进程在临界区内可以被抢占和睡眠,但raw_spinlock还是自旋锁
//如果打上实时内核补丁,那么spinlock使用实时互斥锁保护临界区
//到目前为止,还没有合并实时内核补丁,补丁代码可以兼容实时内核,最好坚持3个原则:
/* 1. 尽可能使用spinlock
* 2. 绝对不允许被抢占和睡眠的地方使用raw_spinlock,否则使用spinlock
* 3. 如果临界区足够小,使用绝对不允许被抢占和睡眠的raw_spinlock
*/
typedef struct spinlock {
union {
struct raw_spinlock rlock;
# ifdef CONFIG_DEBUG_LOCK_ALLOC
# define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
struct {
u8 __padding[LOCK_PADSIZE];
struct lockdep_map dep_map;
};
#endif
};
} spinlock_t;
跟一下,看下原生自旋锁
typedef struct raw_spinlock {
arch_spinlock_t raw_lock;
#ifdef CONFIG_GENERIC_LOCKBREAK
unsigned int break_lock;
#endif
#ifdef CONFIG_DEBUG_SPINLOCK
unsigned int magic, owner_cpu;
void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
} raw_spinlock_t;
处理器架构都要定义自己的数据类型arch_spinlock_t,举个arm64架构的例子:
typedef struct {
#ifdef __AARCH64EB__ //大端字节序(高位存放在低地址)
u16 next; //排队号
u16 owner; //服务号
#else //小端字节序(低位存放在低地址)
u16 owner;
u16 next;
#endif
} __aligned(4) arch_spinlock_t;
//申请自旋锁,如果锁被其他处理器占有,当前处理器自旋等待
static __always_inline void spin_lock(spinlock_t *lock)
{
raw_spin_lock(&lock->rlock);
}
//申请自旋锁,并且禁止当前处理器的软中断
static __always_inline void spin_lock_bh(spinlock_t *lock)
{
raw_spin_lock_bh(&lock->rlock);
}
//申请自旋锁,如果申请成功返回1,如果锁被其他处理器占有,当前处理器不等待,直接返回0
static __always_inline int spin_trylock(spinlock_t *lock)
{
return raw_spin_trylock(&lock->rlock);
}
//对应的释放锁函数
static __always_inline void spin_unlock(spinlock_t *lock)
{
raw_spin_unlock(&lock->rlock);
}
//对应的释放锁函数
static __always_inline void spin_unlock_bh(spinlock_t *lock)
{
raw_spin_unlock_bh(&lock->rlock);
}
//释放自旋锁,恢复当前主机硬中断的状态
static __always_inline void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags)
{
raw_spin_unlock_irqrestore(&lock->rlock, flags);
}
跟一下spin_lock代码,还是arm64架构
static inline void arch_spin_lock(arch_spinlock_t *lock)
{
unsigned int tmp;
arch_spinlock_t lockval, newval;
asm volatile(
/* Atomically increment the next ticket. */
ARM64_LSE_ATOMIC_INSN(
/* LL/SC */
" prfm pstl1strm, %3\n"
"1: ldaxr %w0, %3\n"
" add %w1, %w0, %w5\n"
" stxr %w2, %w1, %3\n"
" cbnz %w2, 1b\n",
/* LSE atomics */
" mov %w2, %w5\n"
" ldadda %w2, %w0, %3\n"
__nops(3)
)
/* Did we get the lock? */
" eor %w1, %w0, %w0, ror #16\n"
" cbz %w1, 3f\n"
/*
* No: spin on the owner. Send a local event to avoid missing an
* unlock before the exclusive load.
*/
" sevl\n"
"2: wfe\n"
" ldaxrh %w2, %4\n"
" eor %w1, %w2, %w0, lsr #16\n"
" cbnz %w1, 2b\n"
/* We got the lock. Critical section starts here. */
"3:"
: "=&r" (lockval), "=&r" (newval), "=&r" (tmp), "+Q" (*lock)
: "Q" (lock->owner), "I" (1 << TICKET_SHIFT)
: "memory");
}
核心是汇编代码,主要看注释。
下面再跟一下spin_unlock函数,也是汇编。
static inline void arch_spin_unlock(arch_spinlock_t *lock)
{
unsigned long tmp;
asm volatile(ARM64_LSE_ATOMIC_INSN( //原子相关操作
/* LL/SC */
" ldrh %w1, %0\n"
" add %w1, %w1, #1\n"
" stlrh %w1, %0",
/* LSE atomics */
//如果处理器支持大系统扩展,原子加法指令staddlh实现
//前面提到的加载/存储指令必须在staddlh之前执行
" mov %w1, #1\n"
" staddlh %w1, %0\n"
__nops(1))
: "=Q" (lock->owner), "=&r" (tmp)
:
: "memory");
}
读写自旋锁(又称为读写锁)是对自旋锁的改进,区分读者和写者,允许多个读者同时进入临界区,读者和写者互斥,写者和写者互斥。如果读者占用读锁,写者申请写锁的时候自旋等待。如果写者占有写锁,读者申请读锁的时候自旋等待。
看下代码
typedef struct {
arch_rwlock_t raw_lock;
#ifdef CONFIG_GENERIC_LOCKBREAK
unsigned int break_lock;
#endif
#ifdef CONFIG_DEBUG_SPINLOCK
unsigned int magic, owner_cpu;
void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
} rwlock_t;
继续跟arch_rwlock_t,不同处理器架构都要自己定义数据类型arch_rwlock_t,arm64架构定义如下
typedef struct {
volatile unsigned int slock;
} arch_spinlock_t;
互斥锁只允许一个进程进入临界区,适合保护比较长的临界区,因为竞争互斥锁时进程可能睡眠和再次唤醒,代价很高。尽管可以把二值信号当作互斥锁使用,但是内核单独实现互斥锁,内核源码的互斥锁定义如下
struct mutex {
atomic_long_t owner;
spinlock_t wait_lock;
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
struct list_head wait_list;
#ifdef CONFIG_DEBUG_MUTEXES
void *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};
介绍几个申请互斥锁常用函数
//申请互斥锁,如果锁被占有,进程深度睡眠
#define mutex_lock(lock) mutex_lock_nested(lock, 0)
//申请互斥锁,如果锁被占有,进程轻度睡眠
#define mutex_lock_interruptible(lock) mutex_lock_interruptible_nested(lock, 0)
//申请互斥锁,如果锁被占有,进程中度睡眠
#define mutex_lock_killable(lock) mutex_lock_killable_nested(lock, 0)
//申请互斥锁,如果申请成功返回1,如果锁被其他进程占有,该进程不等待,直接返回0
extern int mutex_trylock(struct mutex *lock);
所有的释放互斥锁函数只有一个
extern void mutex_unlock(struct mutex *lock);
实时互斥锁是针对互斥锁进行改进的,实现了优先级继承,解决了优先级反转的问题。
什么是优先级反转呢?假设进程1的优先级低,进程2的优先级高,此时进程1占有实时互斥锁。此时,进程2申请实时互斥锁,因为进程1已经占有锁,所以进程2必须睡眠等待,导致优先级高的进程2等待优先级低的进程1。如果此时再来一个进程3,而且优先级介于进程1、2之间,就出现问题了。因为进程3比进程1优先级高,所以进程3可以抢占进程1,导致进程1持有实时互斥锁的时间延长,从而进程2的等待时间延长。这看起来就像是优先级低的进程3反而在优先级高的进程2之前执行,优先级发生了反转。
解决措施是优先级继承。把进程1的优先级临时提高到进程2的优先级,防止进程3抢占进程1,使进程1尽快执行完临界区,减少进程2等待时间。
原子操作是最简单的锁操作。它们保证简单的操作,通常用来保证整数的互斥访问,诸如计数器加1之类,可以不中断地原子执行。即使操作由几个汇编语句组成,也可以保证互斥性。
进程间通信方式,一般有pipe、fifo、signal、message、semaphore、shared memory、socket。这里只介绍下面两种。
消息队列是消息的链表,包括 Posix 消息队列和 System V 消息队列。消息队列克服了信号承载信息量少、管道只能承载无格式字节流以及缓冲区大小受限等缺点,克服了早期 linux通信机制的一些缺点。消息队列将消息看作一个记录,具有特定的格式以及特定的优先级,对消息队列有写权限的进程可以向中按照一定的规则添加新消息。对消息队列有读权限的进程则可以从消息队列中读取消息,消息队列是随内核持续的。
/* one msg_msg structure for each message */
struct msg_msg {
struct list_head m_list;
long m_type;
size_t m_ts; /* message text size */
struct msg_msgseg *next;
void *security;
/* the actual message follows immediately */
};
/* one msq_queue structure for each present queue on the system */
struct msg_queue {
struct kern_ipc_perm q_perm;
time_t q_stime; /* last msgsnd time */
time_t q_rtime; /* last msgrcv time */
time_t q_ctime; /* last change time */
unsigned long q_cbytes; /* current number of bytes on queue */
unsigned long q_qnum; /* number of messages in queue */
unsigned long q_qbytes; /* max number of bytes on queue */
pid_t q_lspid; /* pid of last msgsnd */
pid_t q_lrpid; /* last receive pid */
struct list_head q_messages; //指向一个消息的链表(存放等待读取的消息)
struct list_head q_receivers; //接收者链表(指向一个等待接收的进程(阻塞进程))
struct list_head q_senders; //发送者链表(指向一个等待发送的进程(阻塞进程))
};
需要包含以下头文件
#include
#include
#include
主要介绍以下四个函数
在程序上层可以直接调用msgsnd(msqid,&msgs,sizeof(struct msgstru),IPC_NOWAIT) 这样的形式来发送消息,但是在底层是用以下的形式来调用
/* 解释一下各参数:
* msgid:消息队列标识符
* msgp:发送给队列的消息
* msgsz:要发送消息的大小
* msgflg:一般为0,表示消息队列满时,该函数会阻塞,直到消息能写入该队列;如果为IPC_NOWAIT,则是以非阻塞方式发送消息
*/
SYSCALL_DEFINE4(msgsnd, int, msqid, struct msgbuf __user *, msgp, size_t, msgsz,
int, msgflg)
{
long mtype;
if (get_user(mtype, &msgp->mtype))
return -EFAULT;
return do_msgsnd(msqid, mtype, msgp->mtext, msgsz, msgflg);
}
这里需要先解释一下SYSCALL_DEFINE4。对于 SYSCALL_DEFINE4,这相当于是系统调用,首个变量用于函数名,剩下的偶数对参数,依次代表参数类型与参数变量。SYSCALL_DEFINEx,随后的 x 就是对于不同的参数的个数。所以,实际上是调用do_msgsnd函数。
long do_msgsnd(int msqid, long mtype, void __user *mtext,
size_t msgsz, int msgflg)
{
struct msg_queue *msq;
struct msg_msg *msg;
int err;
struct ipc_namespace *ns;
DEFINE_WAKE_Q(wake_q);
ns = current->nsproxy->ipc_ns;
if (msgsz > ns->msg_ctlmax || (long) msgsz < 0 || msqid < 0)
return -EINVAL;
if (mtype < 1)
return -EINVAL;
msg = load_msg(mtext, msgsz);
if (IS_ERR(msg))
return PTR_ERR(msg);
msg->m_type = mtype;
msg->m_ts = msgsz;
rcu_read_lock();
msq = msq_obtain_object_check(ns, msqid);
if (IS_ERR(msq)) {
err = PTR_ERR(msq);
goto out_unlock1;
}
ipc_lock_object(&msq->q_perm);
for (;;) {
struct msg_sender s;
err = -EACCES;
if (ipcperms(ns, &msq->q_perm, S_IWUGO))
goto out_unlock0;
/* raced with RMID? */
if (!ipc_valid_object(&msq->q_perm)) {
err = -EIDRM;
goto out_unlock0;
}
err = security_msg_queue_msgsnd(msq, msg, msgflg);
if (err)
goto out_unlock0;
if (msg_fits_inqueue(msq, msgsz))
break;
/* queue full, wait: */
if (msgflg & IPC_NOWAIT) {
err = -EAGAIN;
goto out_unlock0;
}
/* enqueue the sender and prepare to block */
ss_add(msq, &s, msgsz);
if (!ipc_rcu_getref(msq)) {
err = -EIDRM;
goto out_unlock0;
}
ipc_unlock_object(&msq->q_perm);
rcu_read_unlock();
schedule();
rcu_read_lock();
ipc_lock_object(&msq->q_perm);
ipc_rcu_putref(msq, msg_rcu_free);
/* raced with RMID? */
if (!ipc_valid_object(&msq->q_perm)) {
err = -EIDRM;
goto out_unlock0;
}
ss_del(&s);
if (signal_pending(current)) {
err = -ERESTARTNOHAND;
goto out_unlock0;
}
}
msq->q_lspid = task_tgid_vnr(current);
msq->q_stime = get_seconds();
if (!pipelined_send(msq, msg, &wake_q)) {
/* no one is waiting for this message, enqueue it */
list_add_tail(&msg->m_list, &msq->q_messages);
msq->q_cbytes += msgsz;
msq->q_qnum++;
atomic_add(msgsz, &ns->msg_bytes);
atomic_inc(&ns->msg_hdrs);
}
err = 0;
msg = NULL;
out_unlock0:
ipc_unlock_object(&msq->q_perm);
wake_up_q(&wake_q);
out_unlock1:
rcu_read_unlock();
if (msg != NULL)
free_msg(msg);
return err;
}
得到消息队列标识符或创建一个消息队列对象并返回消息队列标识符。
SYSCALL_DEFINE2(msgget, key_t, key, int, msgflg)
{
struct ipc_namespace *ns;
static const struct ipc_ops msg_ops = {
.getnew = newque,
.associate = msg_security,
};
struct ipc_params msg_params;
ns = current->nsproxy->ipc_ns;
msg_params.key = key;
msg_params.flg = msgflg;
return ipcget(ns, &msg_ids(ns), &msg_ops, &msg_params);
}
说明一下参数。
key值:用于为消息队列生成(计算出)唯一的消息队列ID。我们可以指定三种形式的key值:
msgflg值:指定创建时的原始权限,比如0664。创建一个新的消息队列时,除了原始权限,还需要指定IPC_CREAT选项。
msgid = msgget(key, 0664|IPC_CREAT);
返回值
接收消息,从消息队列中取出别人所放的某个编号的消息。
SYSCALL_DEFINE5(msgrcv, int, msqid, struct msgbuf __user *, msgp, size_t, msgsz,
long, msgtyp, int, msgflg)
{
return do_msgrcv(msqid, msgp, msgsz, msgtyp, msgflg, do_msg_fill);
}
参数:
返回值:成功返回消息正文的字节数;失败返回-1,errno被设置。
根据cmd指定的要求,去控制消息队列。主要是获取消息队列的属性信息、修改消息队列的属性信息、删除消息队列等等。我们调用msgctl函数的最常见目的就是删除消息队列,事实上,删除消息队列只是各种消息队列控制中的一种。
SYSCALL_DEFINE3(msgctl, int, msqid, int, cmd, struct msqid_ds __user *, buf)
{
int version;
struct ipc_namespace *ns;
if (msqid < 0 || cmd < 0)
return -EINVAL;
version = ipc_parse_version(&cmd);
ns = current->nsproxy->ipc_ns;
switch (cmd) {
case IPC_INFO:
case MSG_INFO:
case MSG_STAT: /* msqid is an index rather than a msg queue id */
case IPC_STAT:
return msgctl_nolock(ns, msqid, cmd, version, buf);
case IPC_SET:
case IPC_RMID:
return msgctl_down(ns, msqid, cmd, buf, version);
default:
return -EINVAL;
}
}
参数:
有的时候需要给第三个参数,有时不需要,取决于cmd的设置。
共享内存就是允许两个或多个进程共享一定的存储区。就如同 malloc() 函数向不同进程返回了指向同一个物理内存区域的指针。当一个进程改变这块地址中内容的时候,其它进程都会察觉到这个更改。因为数据不需要在客户机和服务器端之间复制,数据直接写到内存,不用若干次数据拷贝,所以这是最快的一种IPC。备注:共享内存没有任何的同步与互斥机制,所以要使用信号量来实现对共享内存的存取的同步。共享内存是IPC通信中传输速度最快的通信方式没有之一。
共享内存的生命周期与内核一致,即使所有访问共享内存区域对象的进程都已结束,共享内存区域对象仍然在内核中存在,除非显式删除共享内存区域对象。在内核重新引导之前,共享内存区域对象的任何改写将会保留,即共享内存区域对象的生命周期与系统内核的相同,且共享内存区域对象的作用域是整个系统内核的生命周期。
共享内存原理结构图如下:
共享内存也有缺陷:共享内存并未提供同步机制,一个进程在对共享内存进行写操作结束之前,并没有机制可以阻止对另一个进程的读写操作。所以通常使用信号量来实现对共享内存同步访问的控制。
//得到一个共享内存标识符或创建一个共享内存对象并返回共享内存标识符
//成功:返回共享内存的标识符;出错:返回-1并设置errno
int shmget(key_t key, // 0(IPC_PRIVATE):会建立新共享内存对象;大于0:视参数shmflg来确定操作。通常要求此值来源于ftok返回的IPC键值
size_t size, //大于0:新建的共享内存大小,以字节为单位;0:只获取共享内存时指定为0
int shmflg); //IPC_CREAT:如果内核中不存在键值与key相等的共享内存,则新建一个共享内存;如果存在这样的共享内存,返回此共享内存的标识符。IPC_CREAT|IPC_EXCL:如果内核中不存在键值与key相等的共享内存,则新建一个消息队列;如果存在这样的共享内存则报错
//连接共享内存标识符为shmid的共享内存,连接成功后把共享内存区对象映射到调用进程的地址空间,随后可像本地空间一样访问
//成功:返回附加好的共享内存地址;出错返回-1并设置errno
void *shmat(int shmid, //共享内存标识符
const void *shmaddr, //指定共享内存出现在进程内存地址的什么位置,直接指定为NULL让内核自己决定一个合适的地址位置
int shmflg); //SHM_RDONLY:为只读模式,其他为读写模式
//是用来断开与共享内存附加点的地址,禁止本进程访问此片共享内存
//成功:返回0;出错:返回-1并设置errno
int shmdt(const void *shmaddr); //连接的共享内存的起始地址
//完成对共享内存的控制
//成功:返回0;出错:返回-1并设置errno
int shmctl(int shmid, //共享内存标识符
int cmd, //IPC_SET:改变共享内存的状态;IPC_RMID:删除这片共享内存
struct shmid_ds *buf) //共享内存管理结构体
简单举一个例子,说明一下使用方法。
在c/s模型中
服务器
客户端