• linux 中 mq_notify 创建线程监控消息队列实现原理


    前言

    UNPV2 学习:Posix Message Queues 这篇文章中,我描述了 mq_notify 的不同处理机制,写博客的同时我也使用原书的附录代码在我本地 linux 环境中进行了测试。在本文中我将从 strace 跟踪新建线程监控队列消息示例 demo 开始,完整描述 mq_notify 工作的原理。

    strace mqnotifythread 进程启动过程

    mq_open("test1", O_RDONLY|O_NONBLOCK)   = 3
    mq_getsetattr(3, NULL, {mq_flags=O_NONBLOCK, mq_maxmsg=10, mq_msgsize=8192, mq_curmsgs=0}) = 0
    socket(AF_NETLINK, SOCK_RAW|SOCK_CLOEXEC, NETLINK_ROUTE) = 4
    ...................................................................................
    clone(child_stack=0x7fcf0d366ff0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tid=[51323], tls=0x7fcf0d367740, child_tidptr=0x7fcf0d367a10) = 51323
    ...................................................................................
    [pid 51323] recvfrom(4,  <unfinished ...>
    [pid 51322] <... futex resumed>)        = 0
    [pid 51322] mq_notify(3, {sigev_value={sival_int=1063272832, sival_ptr=0x7fff3f604180}, sigev_signo=SIGILL, sigev_notify=SIGEV_THREAD, sigev_notify_function=0xf, sigev_notify_attributes=0x5653b4ce809f}) = 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    此进程首先创建了一个 Posix 消息队列,然后获取消息队列的属性得到消息大小 8192,此后的逻辑看上去与示例代码并无关系,它执行了如下过程:

    1. 创建了一个 NETLINK_ROUTE 类型的 netlink 套接字
    2. 创建了一个子线程 51323,子线程从第一步创建的 netlink 套接字上接收消息
    3. 主线程调用 mq_notify,但是 sigev_notify_attributes、sigev_notify_function 等多个属性的值并不是示例代码中设置的值

    分析示例代码不难猜到上述过程大概率是 mq_notify C 库函数中执行的逻辑,此函数创建了一个新的线程来执行用户函数,然而它阻塞在从 fd 4 接收消息上,猜测这是在等待内核通知,而 mq_notify 系统调用中对参数的修改也是与上述过程关联起来以提供给内核必要的信息,让内核能够在向空队列投递消息后正确通知到用户态线程

    strace 从消息队列中接收消息处理过程

    strace 新的进程,然后调用 mqsend 向队列中发送消息,得到如下信息:

    [pid 51323] <... recvfrom resumed>{{len=3033429792, type=0x5653 /* NLMSG_??? */, flags=0, seq=0, pid=0}, "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01"}, 32, MSG_WAITALL|MSG_NOSIGNAL, NULL, NULL) = 32
    ...................................................................................
    [pid 51323] clone(child_stack=0x7fcf0d143fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tid=[51399], tls=0x7fcf0d144700, child_tidptr=0x7fcf0d1449d0) = 51399
    ...................................................................................
    [pid 51323] recvfrom(4,  <unfinished ...>
    ...................................................................................
    [pid 51399] write(1, "notify_thread started\n", 22notify_thread started
    ) = 22
    [pid 51399] mq_notify(3, {sigev_value={sival_int=219430416, sival_ptr=0x7fcf0d143e10}, sigev_signo=SIGILL, sigev_notify=SIGEV_THREAD, sigev_notify_function=0x10, sigev_notify_attributes=NULL}) = 0
    [pid 51399] mq_timedreceive(3, "\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"..., 8192, NULL, NULL) = 1024
    [pid 51399] write(1, "read 1024 bytes\n", 16read 1024 bytes
    ) = 16
    [pid 51399] mq_timedreceive(3, 0x7fcf00000f70, 8192, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
    ...................................................................................
    [pid 51399] exit(0)                     = ?
    [pid 51399] +++ exited with 0 +++
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    recvfrom 系统调用成功接收到消息,从打印看消息是一个 netlink 消息,pid 为 0 表明消息来自内核。使用 nlmon 抓取 netlink 消息,获取到如下内容:

    Frame 48: 48 bytes on wire (384 bits), 48 bytes captured (384 bits)
    Linux netlink (cooked header)
        Link-layer address type: Netlink (824)
        Family: Route (0x0000)
    Linux rtnetlink (route netlink) protocol
        Netlink message header (type: 0x55ca)
            Length: 3033429792
            Message type: Unknown (21962)
            Flags: 0x0000
            Sequence: 0
            Port ID: 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    确认此消息是 type 为 0x55ca 的 netlink 路由消息,并设置了 MSG_WAITALL|MSG_NOSIGNAL 接收标志,但为什么会是路由消息?这里存在一些问题。

    成功接收到消息后创建一个新的线程,51323 线程继续执行 recvfrom 系统调用。新创建的 51399 线程调用示例程序中设置的回调函数,打印 notify_thread started 字符串,然后执行 mq_notify 重新注册监听队列消息事件,然后调用 mq_timedreceive 系统调用从消息队列中接收到 1024 字节长度的消息,当接收不到任何消息后执行 exit 销毁线程。

    mq_notify 创建线程执行函数的实现原理

    从 strace 跟踪的过程能够看出,mq_notify 会创建一个新的线程,内核通过发送 netlink 消息通知新线程事件到达,新线程接收到此消息后创建一个新的线程在新的线程中执行用户设置的回调函数处理然后销毁线程。

    那内核是如何知道该通知哪个目标线程呢?不难猜测内核通过获取到 netlink socket 的 fd 来关联到目标 socket,进一步关联到目标线程中,这部分细节隐藏在 libc 实现中,下面我从 libc 与内核代码这两个方面进行分析。

    libc 源码版本:glibc-2.31
    内核源码版本:5.10

    libc 代码实现

    mq_notify 中的代码:

    /* Construct the new request.  */
      struct sigevent se;
      se.sigev_notify = SIGEV_THREAD;
      se.sigev_signo = netlink_socket;
      se.sigev_value.sival_ptr = &data;
    
      /* Tell the kernel.  */
      int retval = INLINE_SYSCALL (mq_notify, 2, mqdes, &se);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    libc 的 mq_notify wrapper 函数中会重新填充一个 sigevent 结构作为 mq_notify 系统调用的参数,这是我们 strace 看到 mq_notify 系统调用参数变化的原因。

    注意这里 sigev_signo 设置为 netlink_socket 的值,在上文 strace 跟踪中,此值为 4, strace 显示 SIGILL 信号,此信号的值为 4

    netlink_socket 在 init_mq_netlink 函数中被创建,相关代码如下:

    /* Just a normal netlink socket, not bound.  */
    netlink_socket = __socket (AF_NETLINK, SOCK_RAW | SOCK_CLOEXEC, 0);
    
    • 1
    • 2

    能够看到它创建了一个普通的 NETLINK 套接字,上文中 nlmon 抓取到的 netlink 消息被解析为 route netlink 报文,从这里就可以看出应该是解析错误,这里使用的消息只是普通的消息
    netlink_socket 的信息就能够让内核关联到目标 socket,然后向这个 socket 发送 netlink 消息以通知消息到达,其它的参数似乎根本不需要,可显然不是这样。
    另外一个传递给内核的重要信息为 sigval_ptr,此信息指向一个 notify_data 结构,此结构的定义如下:

    /* Defined in the kernel headers: */
    #define NOTIFY_COOKIE_LEN       32      /* Length of the cookie used.  */
    ........................................................................
    /* Data structure for the queued notification requests.  */
    union notify_data
    {
      struct
      {
        void (*fct) (union sigval); /* The function to run.  */
        union sigval param;         /* The parameter to pass.  */
        pthread_attr_t *attr;       /* Attributes to create the thread with.  */
        /* NB: on 64-bit machines the struct as a size of 24 bytes.  Which means
           byte 31 can still be used for returning the status.  */
      };
      char raw[NOTIFY_COOKIE_LEN];
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    data 变量初始化代码如下:

    	memset (&data, '\0', sizeof (data));
      data.fct = notification->sigev_notify_function;
      data.param = notification->sigev_value;
    
    • 1
    • 2
    • 3

    data 结构的 fct 字段保存了用户传入的回调函数,对应示例程序中的 notify_thread 函数,param 中保存用户传入的 notification 结构的 sigev_value 值,同时当用户设置了 sigev_sigev_notify_attributes 的值时,此属性也会被拷贝到 data 结构的 attr 字段中保存。
    此后 init_mq_netlink 函数会创建一个新线程,此线程的入口函数为 helper_thread,其代码如下:

    static void *
    helper_thread (void *arg)
    {
      while (1)
        {
          union notify_data data;
    
          ssize_t n = __recv (netlink_socket, &data, sizeof (data),
                              MSG_NOSIGNAL | MSG_WAITALL);
          if (n < NOTIFY_COOKIE_LEN)
            continue;
    
          if (data.raw[NOTIFY_COOKIE_LEN - 1] == NOTIFY_WOKENUP)
            {
              /* Just create the thread as instructed.  There is no way to
                 report a problem with creating a thread.  */
              pthread_t th;
              if (__builtin_expect (pthread_create (&th, data.attr,
                                                    notification_function, &data)
                                    == 0, 0))
                /* Since we passed a pointer to DATA to the new thread we have
                   to wait until it is done with it.  */
                (void) __pthread_barrier_wait (&notify_barrier);
            }
    .........................................................................
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    注意这里会将接收到的 netlink 消息直接转化为一个 notify_data 结构,这进一步说明上文 nlmon 抓取到的报文为 route netlink 类型只是解析错误。

    helper_thread 函数判断 data.raw 里 NOTIFY_COOKIE_LEN 处的消息,解析到为唤醒事件时,以 notification_function 函数为入口函数创建一个新的线程并传入 notify_data 结构,然后通过 pthread_barrier 等待新创建的线程执行完成。
    notification_function 函数实现如下:

    /* The function used for the notification.  */
    static void *
    notification_function (void *arg)
    {
      /* Copy the function and parameter so that the parent thread can go
         on with its life.  */
      volatile union notify_data *data = (volatile union notify_data *) arg;
      void (*fct) (union sigval) = data->fct;
      union sigval param = data->param;
    
      /* Let the parent go.  */
      (void) __pthread_barrier_wait (&notify_barrier);
    
      /* Make the thread detached.  */
      (void) pthread_detach (pthread_self ());
    
    	.......................................................
    
      /* Now run the user code.  */
      fct (param);
      .......................................................
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    此函数首先调用 __pthread_barrier_wait 函数唤醒父线程,父线程被唤醒后继续执行 recvfrom 系统调用等待事件消息。此后此函数将当前线程 detach,最后以 notify_data 中的 param 为参数执行 notify_data 中保存的用户回调函数。

    实际此 param 的值为用户调用 mq_notify 时设置的 notification 结构的 sigev_value 值。

    内核代码

    mq_notify 系统调用可以分为如下几个关键处理流程:

    处理 SIGEV_THREAD 类型的 mq_notify 调用,创建 sk_buff 并使用用户态传入的 notification 结构中的 sigev_value 的 sigval_ptr 字段填充数据段,共计 NOTIFY_COOKIE_LEN 个大小(32 字节)。

    相关代码如下:

    			/* create the notify skb */
    			nc = alloc_skb(NOTIFY_COOKIE_LEN, GFP_KERNEL);
    			if (!nc)
    				return -ENOMEM;
    
    			if (copy_from_user(nc->data,
    					notification->sigev_value.sival_ptr,
    					NOTIFY_COOKIE_LEN)) {
    				ret = -EFAULT;
    				goto free_skb;
    			}
    
    			/* TODO: add a header? */
    			skb_put(nc, NOTIFY_COOKIE_LEN);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    此后获取 notification 结构中的 sigev_signo 字段保存的 netlink 套接字的 fd,获取对应的 file 结构得到 sock 结构,然后调用 netlink_attachskb 函数,检查接收者的接收 buffer 是否能够容纳要接收的 sk buffer,如果不能,则休眠等待内存可用,内存可用后将报文的关联到此 sock 结构上。

    相关代码如下:

    retry:
    			f = fdget(notification->sigev_signo);
    			...................................
    			sock = netlink_getsockbyfilp(f.file);
    			fdput(f);
    			...................................
    
    			timeo = MAX_SCHEDULE_TIMEOUT;
    			ret = netlink_attachskb(sock, nc, &timeo, NULL);
    			if (ret == 1) {
    				sock = NULL;
    				goto retry;
    			}
    			....................................
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    最后根据 sigev_notify 的类型将必要的数据保存到消息队列的 info 字段中,主要有两种不同类型的保存过程:

    1. SIGEV_THREAD

      需要保存用户态创建的 netlink 在内核中对应的 sock 结构地址与创建的报文地址以及 sigev_notify 类型

    2. SIGEV_SIGNAL

      需要保存 sigev_signo、sigev_value、elf_exec_id 以及 sigev_notify 类型

    最后保存 notify_owner 为当前进程的 pid,并保存 notify_user_ns 为当前进程的 namespace,并更新消息队列 inode 的 atime 与 ctime。

    相关代码如下:

    switch (notification->sigev_notify) {
    .....................................
    		case SIGEV_THREAD:
    			info->notify_sock = sock;
    			info->notify_cookie = nc;
    			sock = NULL;
    			nc = NULL;
    			info->notify.sigev_notify = SIGEV_THREAD;
    			break;
    		case SIGEV_SIGNAL:
    			info->notify.sigev_signo = notification->sigev_signo;
    			info->notify.sigev_value = notification->sigev_value;
    			info->notify.sigev_notify = SIGEV_SIGNAL;
    			info->notify_self_exec_id = current->self_exec_id;
    			break;
    		}
    
    		info->notify_owner = get_pid(task_tgid(current));
    		info->notify_user_ns = get_user_ns(current_user_ns());
    		inode->i_atime = inode->i_ctime = current_time(inode);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    当用户调用 mq_send 投递消息到队列中时,内核会在调用子函数 do_mq_timedsend 时判断必要的条件执行 __do_notify 函数发送事件,相关代码如下:

    		if (receiver) {
    			pipelined_send(&wake_q, info, msg_ptr, receiver);
    		} else {
    			/* adds message to the queue */
    			ret = msg_insert(msg_ptr, info);
    			if (ret)
    				goto out_unlock;
    			__do_notify(info);
    		}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    这里首先判断是否有及接受者,如果有则不会产生事件,符合 UNPV2 学习:Posix Message Queues 中提到的 mq_notify 使用的第四条规则。

    __do_notify 根据 mqueue_inode_info 信息分发到不同的逻辑上执行,此 info 结构在 mq_notify 中被填充。

    SIGEV_SIGNAL 方式内核代码如下:

    		case SIGEV_SIGNAL: {
    			struct kernel_siginfo sig_i;
    			struct task_struct *task;
    			...................................................
    			task = pid_task(info->notify_owner, PIDTYPE_TGID);
    			if (task && task->self_exec_id ==
    						info->notify_self_exec_id) {
    				do_send_sig_info(info->notify.sigev_signo,
    						&sig_i, task, PIDTYPE_TGID);
    			}
    			...................................................
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    核心逻辑为向 notify_owner 进程发送 sigev_signo 标识的信号。

    SIGEV_THREAD 方式内核处理代码如下:

    	static inline void set_cookie(struct sk_buff *skb, char code)
    	{
    		((char *)skb->data)[NOTIFY_COOKIE_LEN-1] = code;
    	}
    		
    	case SIGEV_THREAD:
    			set_cookie(info->notify_cookie, NOTIFY_WOKENUP);
    			netlink_sendskb(info->notify_sock, info->notify_cookie);
    			break;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    首先调用 set_cookie 将 skb buff data 的 NOTIFY_COOK_LEN 处的内存值设置为 NOTIFY_WOKENUP,然后调用 netlink_sendskb 将报文发向用户态。用户态接收到此消息后将此消息直接转化为一个 notify_data 结构,并以此结构为参数创建一个新的线程在新线程中执行用户注册的回调函数就完成了全部的过程化。
    最后内核重置 notify,reset 相关字段,相关代码如下:

    		put_pid(info->notify_owner);
    		put_user_ns(info->notify_user_ns);
    		info->notify_owner = NULL;
    		info->notify_user_ns = NULL;
    
    • 1
    • 2
    • 3
    • 4
  • 相关阅读:
    手记系列之二 ----- 关于IDEA的一些使用方法经验
    25. Java 锁机制之 Condition 接口
    用DIV+CSS技术制作一个简单的网页 我的家乡主题
    【SQL语法基础】什么是SQL的聚集函数,如何利用它们汇总表的数据?
    WPF控件(三)
    最近两周出去面试遇到的面试题(前端)
    MFC入门基础(十一)控件编程示例
    [异构图-论文阅读]Heterogeneous Graph Transformer
    【DL】使用神经网络进行序列到序列学习
    想要彻底搞的性能优化,得先从底层逻辑开始了解~
  • 原文地址:https://blog.csdn.net/Longyu_wlz/article/details/128169025