• linux锁与进程间通信


    这一部分介绍linux中常见的各种锁的机制和进程间通信机制。进程间通信只介绍消息队列和共享内存,其他的不复杂,如果有兴趣的话可以研究一下源码,会操作使用就可以了。

    内核可以不受限制地访问整个地址空间。在多处理器系统上,这会引起一些问题。如果几个处理器同时处于内核态,则理论上它们可以同时访问同一个数据结构。在第一个提供了SMP功能的内核版本中,该问题的解决方案非常简单,即每次只允许一个处理器处于核心态。因此,对数据未经协调的并行访问被自动排除了。令人遗憾的是,该方法因为效率不高,很快被废弃了。现在内核使用由锁组成的细粒度网络,来明确地保护各个数据结构。如果处理器A在操作数据结构S,则处理器B可以执行任何其他的内核操作,但不能操作S。
    内核为此提供了各种锁选项,分别优化不同的内核数据使用模式。
    先介绍两个概念。

    竞态条件

    一个数据包到达时,进程必须执行一些操作,才能正常保存数据:

    1. 从接口读取数据
    2. 用序号counter构造文件名,打开一个文件
    3. 将序号加1
    4. 将数据写入文件,然后关闭文件

    多个进程在执行以上操作的过程中,有可能出现竞态。进程在访问资源的时候彼此干扰的情况通常称为竞态条件(race condition)。
    由于导致竞态条件的情况非常罕见,因此需要提出一个问题:是否值得做一些(有时候是大量的)工作来保护代码避免竞态条件。在某些环境中(比如航空飞机的控制系统、重要机械的监控、危险装备),竞态条件是致命问题。

    临界区

    每个进程中访问临界资源的那段代码称为临界区(Critical Section)(临界资源是一次仅允许一个进程使用的共享资源)。
    举几个属于临界资源的例子,硬件如打印机,软件如消息队列、数组、缓冲区。进程间采取互斥方式,实现对这些资源的共享。
    多个进程中涉及到同一个临界区,叫相关临界区。进程进入临界区时其他进程需要等待,这就需要一些同步机制,在进程进入和离开临界区时进行实现。
    进程进入临界区的调度规则:

    1. 如果有若干进程要求进入空闲的临界区,一次仅允许一个进程进入
    2. 任何时候,处于临界区的进程不得超过一个
    3. 进入临界区的进程要在有限的时间内退出,以便其他进程能及时进入自己的临界区
    4. 如果进程不能进入自己的临界区,则应让出cpu,避免进程出现“忙等待”现象

    自旋锁

    自旋锁是最常用的锁选项。它们用于短期保护某段代码,以防止其他处理器的访问。在内核等待自旋锁释放时,会重复检查是否能获取锁,而不会进入睡眠状态(忙等待)。当然,如果等待时间较长,则效率显然不高。注意,在单核单cpu系统上使用自旋锁是没用的。
    自旋锁用于处理器之间的互斥,适合保护很短的临界区,并且不允许在临界区睡眠。申请自旋锁的时候,如果自旋锁被其他处理器占有,该处理器自旋等待(也称为忙等待)。进程、软中断和硬件中断都可以使用自旋锁。
    目前内核的自旋锁是排队自旋锁(queued spinlock,也称为"FIFO ticket spinlock"),核心算法类似银行柜台排队叫号,下面解释一下:

    1. 锁拥有排队号和服务号,服务号是当前占有锁的进程的排队号。
    2. 每个进程申请锁的时候,首先申请一个排队号,然后轮询服务号是否等于自己的排队号。如果相等,则表示自己占有自旋锁,可以进入临界区,否则继续轮询。
    3. 当进程释放自旋锁时,把服务号加1,下一个进程查询到服务号等于自己的排队号,退出自旋,进入临界区。

    看一看Linux内核自旋锁源码。

    //进程在临界区内可以被抢占和睡眠,但raw_spinlock还是自旋锁
    //如果打上实时内核补丁,那么spinlock使用实时互斥锁保护临界区
    //到目前为止,还没有合并实时内核补丁,补丁代码可以兼容实时内核,最好坚持3个原则:
    /* 1. 尽可能使用spinlock
     * 2. 绝对不允许被抢占和睡眠的地方使用raw_spinlock,否则使用spinlock
     * 3. 如果临界区足够小,使用绝对不允许被抢占和睡眠的raw_spinlock
    */
    typedef struct spinlock {
    	union {
    		struct raw_spinlock rlock;
    
    # ifdef CONFIG_DEBUG_LOCK_ALLOC
    # define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
    		struct {
    			u8 __padding[LOCK_PADSIZE];
    			struct lockdep_map dep_map;
    		};
    #endif
    	};
    } spinlock_t;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    跟一下,看下原生自旋锁

    typedef struct raw_spinlock {
    	arch_spinlock_t raw_lock;
    #ifdef CONFIG_GENERIC_LOCKBREAK
    	unsigned int break_lock;
    #endif
    #ifdef CONFIG_DEBUG_SPINLOCK
    	unsigned int magic, owner_cpu;
    	void *owner;
    #endif
    #ifdef CONFIG_DEBUG_LOCK_ALLOC
    	struct lockdep_map dep_map;
    #endif
    } raw_spinlock_t;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    处理器架构都要定义自己的数据类型arch_spinlock_t,举个arm64架构的例子:

    typedef struct {
    #ifdef __AARCH64EB__  //大端字节序(高位存放在低地址)
    	u16 next;  //排队号
    	u16 owner;  //服务号
    #else   //小端字节序(低位存放在低地址)
    	u16 owner;
    	u16 next;
    #endif
    } __aligned(4) arch_spinlock_t;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    常用函数

    //申请自旋锁,如果锁被其他处理器占有,当前处理器自旋等待
    static __always_inline void spin_lock(spinlock_t *lock)
    {
    	raw_spin_lock(&lock->rlock);
    }
    
    //申请自旋锁,并且禁止当前处理器的软中断
    static __always_inline void spin_lock_bh(spinlock_t *lock)
    {
    	raw_spin_lock_bh(&lock->rlock);
    }
    
    //申请自旋锁,如果申请成功返回1,如果锁被其他处理器占有,当前处理器不等待,直接返回0
    static __always_inline int spin_trylock(spinlock_t *lock)
    {
    	return raw_spin_trylock(&lock->rlock);
    }
    
    //对应的释放锁函数
    static __always_inline void spin_unlock(spinlock_t *lock)
    {
    	raw_spin_unlock(&lock->rlock);
    }
    //对应的释放锁函数
    static __always_inline void spin_unlock_bh(spinlock_t *lock)
    {
    	raw_spin_unlock_bh(&lock->rlock);
    }
    
    //释放自旋锁,恢复当前主机硬中断的状态
    static __always_inline void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags)
    {
    	raw_spin_unlock_irqrestore(&lock->rlock, flags);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    跟一下spin_lock代码,还是arm64架构

    static inline void arch_spin_lock(arch_spinlock_t *lock)
    {
    	unsigned int tmp;
    	arch_spinlock_t lockval, newval;
    
    	asm volatile(
    	/* Atomically increment the next ticket. */
    	ARM64_LSE_ATOMIC_INSN(
    	/* LL/SC */
    "	prfm	pstl1strm, %3\n"
    "1:	ldaxr	%w0, %3\n"
    "	add	%w1, %w0, %w5\n"
    "	stxr	%w2, %w1, %3\n"
    "	cbnz	%w2, 1b\n",
    	/* LSE atomics */
    "	mov	%w2, %w5\n"
    "	ldadda	%w2, %w0, %3\n"
    	__nops(3)
    	)
    
    	/* Did we get the lock? */
    "	eor	%w1, %w0, %w0, ror #16\n"
    "	cbz	%w1, 3f\n"
    	/*
    	 * No: spin on the owner. Send a local event to avoid missing an
    	 * unlock before the exclusive load.
    	 */
    "	sevl\n"
    "2:	wfe\n"
    "	ldaxrh	%w2, %4\n"
    "	eor	%w1, %w2, %w0, lsr #16\n"
    "	cbnz	%w1, 2b\n"
    	/* We got the lock. Critical section starts here. */
    "3:"
    	: "=&r" (lockval), "=&r" (newval), "=&r" (tmp), "+Q" (*lock)
    	: "Q" (lock->owner), "I" (1 << TICKET_SHIFT)
    	: "memory");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    核心是汇编代码,主要看注释。
    下面再跟一下spin_unlock函数,也是汇编。

    static inline void arch_spin_unlock(arch_spinlock_t *lock)
    {
    	unsigned long tmp;
    
    	asm volatile(ARM64_LSE_ATOMIC_INSN(  //原子相关操作
    	/* LL/SC */
    	"	ldrh	%w1, %0\n"
    	"	add	%w1, %w1, #1\n"
    	"	stlrh	%w1, %0",  
    	/* LSE atomics */
    //如果处理器支持大系统扩展,原子加法指令staddlh实现
    //前面提到的加载/存储指令必须在staddlh之前执行
    	"	mov	%w1, #1\n"
    	"	staddlh	%w1, %0\n"
    	__nops(1))
    	: "=Q" (lock->owner), "=&r" (tmp)
    	:
    	: "memory");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    读写自旋锁

    读写自旋锁(又称为读写锁)是对自旋锁的改进,区分读者和写者,允许多个读者同时进入临界区,读者和写者互斥,写者和写者互斥。如果读者占用读锁,写者申请写锁的时候自旋等待。如果写者占有写锁,读者申请读锁的时候自旋等待。
    看下代码

    typedef struct {
    	arch_rwlock_t raw_lock;
    #ifdef CONFIG_GENERIC_LOCKBREAK
    	unsigned int break_lock;
    #endif
    #ifdef CONFIG_DEBUG_SPINLOCK
    	unsigned int magic, owner_cpu;
    	void *owner;
    #endif
    #ifdef CONFIG_DEBUG_LOCK_ALLOC
    	struct lockdep_map dep_map;
    #endif
    } rwlock_t;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    继续跟arch_rwlock_t,不同处理器架构都要自己定义数据类型arch_rwlock_t,arm64架构定义如下

    typedef struct {
        volatile unsigned int slock;
    } arch_spinlock_t;
    
    • 1
    • 2
    • 3

    互斥锁

    互斥锁只允许一个进程进入临界区,适合保护比较长的临界区,因为竞争互斥锁时进程可能睡眠和再次唤醒,代价很高。尽管可以把二值信号当作互斥锁使用,但是内核单独实现互斥锁,内核源码的互斥锁定义如下

    struct mutex {
    	atomic_long_t		owner;
    	spinlock_t		wait_lock;
    #ifdef CONFIG_MUTEX_SPIN_ON_OWNER
    	struct optimistic_spin_queue osq; /* Spinner MCS lock */
    #endif
    	struct list_head	wait_list;
    #ifdef CONFIG_DEBUG_MUTEXES
    	void			*magic;
    #endif
    #ifdef CONFIG_DEBUG_LOCK_ALLOC
    	struct lockdep_map	dep_map;
    #endif
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    介绍几个申请互斥锁常用函数

    //申请互斥锁,如果锁被占有,进程深度睡眠
    #define mutex_lock(lock) mutex_lock_nested(lock, 0)
    //申请互斥锁,如果锁被占有,进程轻度睡眠
    #define mutex_lock_interruptible(lock) mutex_lock_interruptible_nested(lock, 0)
    //申请互斥锁,如果锁被占有,进程中度睡眠
    #define mutex_lock_killable(lock) mutex_lock_killable_nested(lock, 0)
    //申请互斥锁,如果申请成功返回1,如果锁被其他进程占有,该进程不等待,直接返回0
    extern int mutex_trylock(struct mutex *lock);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    所有的释放互斥锁函数只有一个

    extern void mutex_unlock(struct mutex *lock);
    
    • 1

    实时互斥锁

    实时互斥锁是针对互斥锁进行改进的,实现了优先级继承,解决了优先级反转的问题。
    什么是优先级反转呢?假设进程1的优先级低,进程2的优先级高,此时进程1占有实时互斥锁。此时,进程2申请实时互斥锁,因为进程1已经占有锁,所以进程2必须睡眠等待,导致优先级高的进程2等待优先级低的进程1。如果此时再来一个进程3,而且优先级介于进程1、2之间,就出现问题了。因为进程3比进程1优先级高,所以进程3可以抢占进程1,导致进程1持有实时互斥锁的时间延长,从而进程2的等待时间延长。这看起来就像是优先级低的进程3反而在优先级高的进程2之前执行,优先级发生了反转。
    解决措施是优先级继承。把进程1的优先级临时提高到进程2的优先级,防止进程3抢占进程1,使进程1尽快执行完临界区,减少进程2等待时间。

    原子操作

    原子操作是最简单的锁操作。它们保证简单的操作,通常用来保证整数的互斥访问,诸如计数器加1之类,可以不中断地原子执行。即使操作由几个汇编语句组成,也可以保证互斥性。

    进程间通信

    进程间通信方式,一般有pipe、fifo、signal、message、semaphore、shared memory、socket。这里只介绍下面两种。

    消息队列

    消息队列是消息的链表,包括 Posix 消息队列和 System V 消息队列。消息队列克服了信号承载信息量少、管道只能承载无格式字节流以及缓冲区大小受限等缺点,克服了早期 linux通信机制的一些缺点。消息队列将消息看作一个记录,具有特定的格式以及特定的优先级,对消息队列有写权限的进程可以向中按照一定的规则添加新消息。对消息队列有读权限的进程则可以从消息队列中读取消息,消息队列是随内核持续的。

    消息队列的实现

    /* one msg_msg structure for each message */
    struct msg_msg {
    	struct list_head m_list;
    	long m_type;
    	size_t m_ts;		/* message text size */
    	struct msg_msgseg *next;
    	void *security;
    	/* the actual message follows immediately */
    };
    
    /* one msq_queue structure for each present queue on the system */
    struct msg_queue {
    	struct kern_ipc_perm q_perm;
    	time_t q_stime;			/* last msgsnd time */
    	time_t q_rtime;			/* last msgrcv time */
    	time_t q_ctime;			/* last change time */
    	unsigned long q_cbytes;		/* current number of bytes on queue */
    	unsigned long q_qnum;		/* number of messages in queue */
    	unsigned long q_qbytes;		/* max number of bytes on queue */
    	pid_t q_lspid;			/* pid of last msgsnd */
    	pid_t q_lrpid;			/* last receive pid */
    
    	struct list_head q_messages;  //指向一个消息的链表(存放等待读取的消息)
    	struct list_head q_receivers;  //接收者链表(指向一个等待接收的进程(阻塞进程))
    	struct list_head q_senders;  //发送者链表(指向一个等待发送的进程(阻塞进程))
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    消息队列具体操作

    需要包含以下头文件

    #include 
    #include 
    #include  
    
    • 1
    • 2
    • 3

    主要介绍以下四个函数

    msgsnd发送消息

    在程序上层可以直接调用msgsnd(msqid,&msgs,sizeof(struct msgstru),IPC_NOWAIT) 这样的形式来发送消息,但是在底层是用以下的形式来调用

    /* 解释一下各参数:
     * msgid:消息队列标识符
     * msgp:发送给队列的消息
     * msgsz:要发送消息的大小
     * msgflg:一般为0,表示消息队列满时,该函数会阻塞,直到消息能写入该队列;如果为IPC_NOWAIT,则是以非阻塞方式发送消息
    */
    SYSCALL_DEFINE4(msgsnd, int, msqid, struct msgbuf __user *, msgp, size_t, msgsz,
    		int, msgflg)
    {
    	long mtype;
    	if (get_user(mtype, &msgp->mtype))
    		return -EFAULT;
    	return do_msgsnd(msqid, mtype, msgp->mtext, msgsz, msgflg);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    这里需要先解释一下SYSCALL_DEFINE4。对于 SYSCALL_DEFINE4,这相当于是系统调用,首个变量用于函数名,剩下的偶数对参数,依次代表参数类型与参数变量。SYSCALL_DEFINEx,随后的 x 就是对于不同的参数的个数。所以,实际上是调用do_msgsnd函数。

    long do_msgsnd(int msqid, long mtype, void __user *mtext,
    		size_t msgsz, int msgflg)
    {
    	struct msg_queue *msq;
    	struct msg_msg *msg;
    	int err;
    	struct ipc_namespace *ns;
    	DEFINE_WAKE_Q(wake_q);
    
    	ns = current->nsproxy->ipc_ns;
    
    	if (msgsz > ns->msg_ctlmax || (long) msgsz < 0 || msqid < 0)
    		return -EINVAL;
    	if (mtype < 1)
    		return -EINVAL;
    
    	msg = load_msg(mtext, msgsz);
    	if (IS_ERR(msg))
    		return PTR_ERR(msg);
    
    	msg->m_type = mtype;
    	msg->m_ts = msgsz;
    
    	rcu_read_lock();
    	msq = msq_obtain_object_check(ns, msqid);
    	if (IS_ERR(msq)) {
    		err = PTR_ERR(msq);
    		goto out_unlock1;
    	}
    
    	ipc_lock_object(&msq->q_perm);
    
    	for (;;) {
    		struct msg_sender s;
    
    		err = -EACCES;
    		if (ipcperms(ns, &msq->q_perm, S_IWUGO))
    			goto out_unlock0;
    
    		/* raced with RMID? */
    		if (!ipc_valid_object(&msq->q_perm)) {
    			err = -EIDRM;
    			goto out_unlock0;
    		}
    
    		err = security_msg_queue_msgsnd(msq, msg, msgflg);
    		if (err)
    			goto out_unlock0;
    
    		if (msg_fits_inqueue(msq, msgsz))
    			break;
    
    		/* queue full, wait: */
    		if (msgflg & IPC_NOWAIT) {
    			err = -EAGAIN;
    			goto out_unlock0;
    		}
    
    		/* enqueue the sender and prepare to block */
    		ss_add(msq, &s, msgsz);
    
    		if (!ipc_rcu_getref(msq)) {
    			err = -EIDRM;
    			goto out_unlock0;
    		}
    
    		ipc_unlock_object(&msq->q_perm);
    		rcu_read_unlock();
    		schedule();
    
    		rcu_read_lock();
    		ipc_lock_object(&msq->q_perm);
    
    		ipc_rcu_putref(msq, msg_rcu_free);
    		/* raced with RMID? */
    		if (!ipc_valid_object(&msq->q_perm)) {
    			err = -EIDRM;
    			goto out_unlock0;
    		}
    		ss_del(&s);
    
    		if (signal_pending(current)) {
    			err = -ERESTARTNOHAND;
    			goto out_unlock0;
    		}
    
    	}
    
    	msq->q_lspid = task_tgid_vnr(current);
    	msq->q_stime = get_seconds();
    
    	if (!pipelined_send(msq, msg, &wake_q)) {
    		/* no one is waiting for this message, enqueue it */
    		list_add_tail(&msg->m_list, &msq->q_messages);
    		msq->q_cbytes += msgsz;
    		msq->q_qnum++;
    		atomic_add(msgsz, &ns->msg_bytes);
    		atomic_inc(&ns->msg_hdrs);
    	}
    
    	err = 0;
    	msg = NULL;
    
    out_unlock0:
    	ipc_unlock_object(&msq->q_perm);
    	wake_up_q(&wake_q);
    out_unlock1:
    	rcu_read_unlock();
    	if (msg != NULL)
    		free_msg(msg);
    	return err;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112

    msgget函数

    得到消息队列标识符或创建一个消息队列对象并返回消息队列标识符。

    SYSCALL_DEFINE2(msgget, key_t, key, int, msgflg)
    {
    	struct ipc_namespace *ns;
    	static const struct ipc_ops msg_ops = {
    		.getnew = newque,
    		.associate = msg_security,
    	};
    	struct ipc_params msg_params;
    
    	ns = current->nsproxy->ipc_ns;
    
    	msg_params.key = key;
    	msg_params.flg = msgflg;
    
    	return ipcget(ns, &msg_ids(ns), &msg_ops, &msg_params);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    说明一下参数。
    key值:用于为消息队列生成(计算出)唯一的消息队列ID。我们可以指定三种形式的key值:

    1. 指定为IPC_PRIVATE宏,指定这个宏后,每次调用msgget时都会创建一个新的消息队列。如果你每次使用的必须是新消息队列的话,就可以指定这个,不过这个用的很少。因为一般来说,只要有一个消息队列可以用来通信就可以了,并不需要每次都创建一个全新的消息队列。
      2.可以自己指定一个整形数,但是容易重复指定。本来我想创建一个新的消息队列,结果我所指定的这个整形数,之前就已经被用于创建某个消息队列了,当我的指定重复时,msgget就不会创建新消息队列,而是使用的是别人之前就创建好的消息队列。很少使用这种方式
    2. 使用ftok函数来生成key。ftok通过指定路径名和一个整形数,就可以计算并返回一个唯一对应的key值,只要路径名和整形数不变,所对应的key值就唯一不变的。不过由于ftok只会使用整形数(proj_id)的低8位,因此我们往往会指定为一个ASCII码值,因为ASCII码值刚好是8位的整形数。

    msgflg值:指定创建时的原始权限,比如0664。创建一个新的消息队列时,除了原始权限,还需要指定IPC_CREAT选项。

    msgid = msgget(key, 0664|IPC_CREAT);
    
    • 1

    返回值

    • 成功:返回消息队列标识符(消息队列的ID)对于每一个创建好的消息队列来说,ID是固定的。
    • 失败:失败返回-1,并设置error。

    msgrcv函数

    接收消息,从消息队列中取出别人所放的某个编号的消息。

    SYSCALL_DEFINE5(msgrcv, int, msqid, struct msgbuf __user *, msgp, size_t, msgsz,
    		long, msgtyp, int, msgflg)
    {
    	return do_msgrcv(msqid, msgp, msgsz, msgtyp, msgflg, do_msg_fill);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    参数:

    • msqid:消息队列的标识符。
    • msgp:缓存地址,缓存用于存放所接收的消息。类型还是struct msgbuf,同上。注意结构体类型需与发送的消息类型一致,
    • msgsz:消息正文的大小
    • msgtyp:要接收消息的编号
    • msgflg:0:阻塞接收消息
      IPC_NOWAIT:非阻塞接收消息。也就是说没有消息时,该函数不阻塞,会因为读取失败而错误返回
      IPC_EXCEPT:与msgtyp配合使用,返回队列中第一个指定类的消息
      IPC_NOERROR:如果队列中满足条件的消息内容大于msgsz字节,则直接截断并返回

    返回值:成功返回消息正文的字节数;失败返回-1,errno被设置。

    msgctl函数

    根据cmd指定的要求,去控制消息队列。主要是获取消息队列的属性信息、修改消息队列的属性信息、删除消息队列等等。我们调用msgctl函数的最常见目的就是删除消息队列,事实上,删除消息队列只是各种消息队列控制中的一种。

    SYSCALL_DEFINE3(msgctl, int, msqid, int, cmd, struct msqid_ds __user *, buf)
    {
    	int version;
    	struct ipc_namespace *ns;
    
    	if (msqid < 0 || cmd < 0)
    		return -EINVAL;
    
    	version = ipc_parse_version(&cmd);
    	ns = current->nsproxy->ipc_ns;
    
    	switch (cmd) {
    	case IPC_INFO:
    	case MSG_INFO:
    	case MSG_STAT:	/* msqid is an index rather than a msg queue id */
    	case IPC_STAT:
    		return msgctl_nolock(ns, msqid, cmd, version, buf);
    	case IPC_SET:
    	case IPC_RMID:
    		return msgctl_down(ns, msqid, cmd, buf, version);
    	default:
    		return  -EINVAL;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    参数:

    • msqid:消息队列标识符
    • cmd:控制选项
    • buf:存放属性信息

    有的时候需要给第三个参数,有时不需要,取决于cmd的设置。

    共享内存

    共享内存就是允许两个或多个进程共享一定的存储区。就如同 malloc() 函数向不同进程返回了指向同一个物理内存区域的指针。当一个进程改变这块地址中内容的时候,其它进程都会察觉到这个更改。因为数据不需要在客户机和服务器端之间复制,数据直接写到内存,不用若干次数据拷贝,所以这是最快的一种IPC。备注:共享内存没有任何的同步与互斥机制,所以要使用信号量来实现对共享内存的存取的同步。共享内存是IPC通信中传输速度最快的通信方式没有之一。
    共享内存的生命周期与内核一致,即使所有访问共享内存区域对象的进程都已结束,共享内存区域对象仍然在内核中存在,除非显式删除共享内存区域对象。在内核重新引导之前,共享内存区域对象的任何改写将会保留,即共享内存区域对象的生命周期与系统内核的相同,且共享内存区域对象的作用域是整个系统内核的生命周期。
    共享内存原理结构图如下:
    在这里插入图片描述
    共享内存也有缺陷:共享内存并未提供同步机制,一个进程在对共享内存进行写操作结束之前,并没有机制可以阻止对另一个进程的读写操作。所以通常使用信号量来实现对共享内存同步访问的控制。

    共享内存常用函数

    //得到一个共享内存标识符或创建一个共享内存对象并返回共享内存标识符
    //成功:返回共享内存的标识符;出错:返回-1并设置errno
    int shmget(key_t key, // 0(IPC_PRIVATE):会建立新共享内存对象;大于0:视参数shmflg来确定操作。通常要求此值来源于ftok返回的IPC键值
        size_t size,  //大于0:新建的共享内存大小,以字节为单位;0:只获取共享内存时指定为0
        int shmflg);  //IPC_CREAT:如果内核中不存在键值与key相等的共享内存,则新建一个共享内存;如果存在这样的共享内存,返回此共享内存的标识符。IPC_CREAT|IPC_EXCL:如果内核中不存在键值与key相等的共享内存,则新建一个消息队列;如果存在这样的共享内存则报错
    
    • 1
    • 2
    • 3
    • 4
    • 5
    //连接共享内存标识符为shmid的共享内存,连接成功后把共享内存区对象映射到调用进程的地址空间,随后可像本地空间一样访问
    //成功:返回附加好的共享内存地址;出错返回-1并设置errno
    void *shmat(int shmid,   //共享内存标识符
        const void *shmaddr,  //指定共享内存出现在进程内存地址的什么位置,直接指定为NULL让内核自己决定一个合适的地址位置
        int shmflg);  //SHM_RDONLY:为只读模式,其他为读写模式
    
    • 1
    • 2
    • 3
    • 4
    • 5
    //是用来断开与共享内存附加点的地址,禁止本进程访问此片共享内存
    //成功:返回0;出错:返回-1并设置errno
    int shmdt(const void *shmaddr);  //连接的共享内存的起始地址
    
    • 1
    • 2
    • 3
    //完成对共享内存的控制
    //成功:返回0;出错:返回-1并设置errno
    int shmctl(int shmid,  //共享内存标识符
        int cmd,  //IPC_SET:改变共享内存的状态;IPC_RMID:删除这片共享内存
        struct shmid_ds *buf)  //共享内存管理结构体
    
    • 1
    • 2
    • 3
    • 4
    • 5

    使用方法

    简单举一个例子,说明一下使用方法。
    在c/s模型中
    服务器

    1. 创建共享内存区域
    2. 内存映射到当前内存
    3. 写入数据

    客户端

    1. 打开共享内存区域
    2. 内存映射到当前内存
    3. 读出数据
  • 相关阅读:
    IDEA下载后没有tomcat选项
    CubeMX RTOS Demo
    将时间序列转成图像——相对位置矩阵方法 Matlab实现
    Java通过Lambda表达式根据指定字段去除重复数据(集合去重)
    微服务框架 SpringCloud微服务架构 5 Nacos 5.4 NacosRule 负载均衡
    【MySql】数据库的聚合查询
    C++核心编程(三十四)容器(set/multiset)
    hologres基础知识一文全
    数据删掉了怎么恢复?数据删除后还能恢复吗
    C++ final用法简介
  • 原文地址:https://blog.csdn.net/m0_65931372/article/details/126109554