• Postgresql源码(75)notify与listen执行流程分析


    相关
    《Postgresql源码(60)事务系统总结》
    《Postgresql源码(75)notify与listen执行流程分析》

    顺着看事务提交时发现PG有异步消息队列的功能,这里试着分析总结。

    0 总结速查

    两句话总结:

    • notify将msg追加到slru消息队列,发信号通知。
    • listen注册监听人backend到监听队列,每个监听者消费,并自己记录消费位置。

    Listen监听:

    • CommitTransaction->PreCommit_Notifybackend事务提交时执行listen,把自己注册进入AsyncQueueControl->backend数组中的一个位置。
    • 在数组中表示自己已在监听队列中,且在监听队列的结构会记录自己当前消费到的位置。
    • 一个后端进程占用队列一个位置,多次执行Listen不会占用新的位置,同一个backend+db,只能使用一个位置。
    • 监听队列是SLRU结构,所以指向监听队列的指针为{page, offset}

    在这里插入图片描述

    notify通知:

    • DDL记录通知信息(不通知)。
    • CommitTransaction --> PreCommit_Notify事务提交时将记录的notify追加到消息队列。
    • CommitTransaction --> AtCommit_Notify事务提交时kill sigusr1通知其他进程。

    消息队列

    • 使用通用SLRU结构,参考之前写过的SLRU页面分析(CLOG、SUBTRANS等)。
    • 总控结构AsyncQueueControl->head端新增,AsyncQueueControl->tail端消费。
    • 消息队列虽然使用SLRU结构,但不保证持久化,只是在内存页面不够用的时候,用LRU换出到磁盘。和CLOG不同,CLOG在checkpoint时fsync刷盘,消息队列不在CheckPointGuts中。
      在这里插入图片描述

    1 背景

    Listen:

    1. 监听语句如果在事务内,listen执行后不能拿到通知信息,必须等待事务提交;注意事务提交后,会拿到所有listen语句后的通知。
    2. 监听必须在notify之前,如果notify时没有监听,消息收不到。
    3. 监听如果在psql执行,只在任何语句执行完时收到通知,如没有语句执行不会收到通知。
    4. 监听如果使用API,例如libpq的PQnotifies函数,可以立即收到通知进行处理。

    Notify:

    1. 通知语句的事务必须提交才会生效。
    2. 通知是异步的,记录在队列中,每次监听会收到队列中累加的所有消息,PG保证收到的顺序和发送顺序一致。

    2 使用案例

    2.1 PSQL

    -- session 1
    postgres=# listen ch1;
    LISTEN
    
    -- session 2
    postgres=# listen ch1;
    LISTEN
    
    -- session 3
    postgres=# notify ch1;
    NOTIFY
    postgres=# notify ch1;
    NOTIFY
    
    -- session 1
    postgres=# select 1;
     ?column? 
    ----------
            1
    (1 row)
    
    Asynchronous notification "ch1" received from server process with PID 1837.
    Asynchronous notification "ch1" received from server process with PID 1837.
    
    -- session 2
    postgres=# select 1;
     ?column? 
    ----------
            1
    (1 row)
    
    Asynchronous notification "ch1" received from server process with PID 1837.
    Asynchronous notification "ch1" received from server process with PID 1837.
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    2.2 LIBPQ使用案例

    https://www.postgresql.org/docs/14/libpq-example.html#LIBPQ-EXAMPLE-2

    3 内核代码分析

    3.1 listen监听

    先放总结

    • backend在事务提交时执行listen,把自己注册进入AsyncQueueControl->backend数组中的一个位置。

    • 在数组中表示自己已在监听队列中,且在监听队列的结构会记录自己当前消费到的位置。

    • 一个后端进程占用队列一个位置,多次执行Listen不会占用新的位置,同一个backend+db,只能使用一个位置。

    • 监听队列是SLRU结构,所以指向监听队列的指针为{page, offset}

    在这里插入图片描述

    Async_Listen进入位置

    exec_simple_query
      PortalRun
        PortalRunMulti
          PortalRunUtility
            ProcessUtility
              standard_ProcessUtility
                Async_Listen
                  queue_listen
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    listen属于DDL,也是跟着事务提交才会生效,所以函数调用嵌在事务系统中。

    listen调用Async_Listen登记Listen信息,只把action(三种类型:listen、unlisten、unlisten all)记录在pendingActions中。
    在这里插入图片描述

    在语句结尾的事务状态机流转函数中,如果是事务提交状态,会走入CommitTransaction进行事务提交的具体工作。
    在这里插入图片描述

    在事务提交时,调用PreCommit_Notify函数:

    void
    PreCommit_Notify(void)
    {
        ...
    	if (pendingActions != NULL)
    	{
    		foreach(p, pendingActions->actions)
    		{
    			ListenAction *actrec = (ListenAction *) lfirst(p);
    
    			switch (actrec->action)
    			{
    				case LISTEN_LISTEN:
    					Exec_ListenPreCommit();
    					break;
    				case LISTEN_UNLISTEN:
    					/* there is no Exec_UnlistenPreCommit() */
    					break;
    				case LISTEN_UNLISTEN_ALL:
    					/* there is no Exec_UnlistenAllPreCommit() */
    					break;
    			}
    		}
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    异步队列的数据结构

    typedef struct AsyncQueueControl
    {
    	QueuePosition head;			/* head points to the next free location */
    	QueuePosition tail;			/* tail must be <= the queue position of every
    								 * listening backend */
    	int			stopPage;		/* oldest unrecycled page; must be <=
    								 * tail.page */
    	BackendId	firstListener;	/* id of first listener, or InvalidBackendId */
    	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
    	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
    	/* backend[0] is not used; used entries are from [1] to [MaxBackends] */
    } AsyncQueueControl;
    
    typedef struct QueueBackendStatus
    {
    	int32		pid;			/* either a PID or InvalidPid */
    	Oid			dboid;			/* backend's database OID, or InvalidOid */
    	BackendId	nextListener;	/* id of next listener, or InvalidBackendId */
    	QueuePosition pos;			/* backend has read queue up to here */
    } QueueBackendStatus;
    
    typedef struct QueuePosition
    {
    	int			page;			/* SLRU page number */
    	int			offset;			/* byte offset within page */
    } QueuePosition;
    
    static AsyncQueueControl *asyncQueueControl;
    
    #define QUEUE_HEAD					(asyncQueueControl->head)
    #define QUEUE_TAIL					(asyncQueueControl->tail)
    #define QUEUE_STOP_PAGE				(asyncQueueControl->stopPage)
    #define QUEUE_FIRST_LISTENER		(asyncQueueControl->firstListener)
    #define QUEUE_BACKEND_PID(i)		(asyncQueueControl->backend[i].pid)
    #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
    #define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
    #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    完成监听过程

    注意拿到的消费起点位置是:max(控制结构记录的TAIL,其他所有进程消费到的最新位置)

    Exec_ListenPreCommit
      if (amRegisteredListener)
        return;
    
    	head = QUEUE_HEAD;
    	max = QUEUE_TAIL;
    	prevListener = InvalidBackendId;
    	for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
    	{
    		if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
    		    // 拿到消费位置,从全局信息取QUEUE_TAIL,或从每个backend消费到的最大位置取。
    			max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
    		/* Also find last listening backend before this one */
    		if (i < MyBackendId)
    			prevListener = i;
    	}
    	QUEUE_BACKEND_POS(MyBackendId) = max;
    	QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
    	QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
    	// 后插入监听队列
    	if (prevListener > 0)
    	{
    		QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_NEXT_LISTENER(prevListener);
    		QUEUE_NEXT_LISTENER(prevListener) = MyBackendId;
    	}
    	else
    	{
    		QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_FIRST_LISTENER;
    		QUEUE_FIRST_LISTENER = MyBackendId;
    	}
    	LWLockRelease(NotifyQueueLock);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    3.2 notify通知

    第一步:DDL记录通知信息(不通知)

    Async_Notify
      // 拼接 Notification n = {channel_len = 3, payload_len = 0, data = 0x2a0ab84 "ch1"}
      // 挂在 pendingNotifies->events后 = list_make1(n)
    
    • 1
    • 2
    • 3

    第二步:PreCommit_Notify事务提交时append to 消息队列

    CommitTransaction --> PreCommit_Notify --> asyncQueueAddEntries

    static ListCell *
    asyncQueueAddEntries(ListCell *nextNotify)
    {
    	AsyncQueueEntry qe;
    	QueuePosition queue_head;
    	int			pageno;
    	int			offset;
    	int			slotno;
    
    	/* We hold both NotifyQueueLock and NotifySLRULock during this operation */
    	LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    SLRU标准接口拿消息队列页面SimpleLruZeroPage

    	queue_head = QUEUE_HEAD;
    	pageno = QUEUE_POS_PAGE(queue_head);
    	if (QUEUE_POS_IS_ZERO(queue_head))
    		slotno = SimpleLruZeroPage(NotifyCtl, pageno);
    	else
    		slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
    								   InvalidTransactionId);
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    使用slru标准结构,会刷脏落盘。

    	NotifyCtl->shared->page_dirty[slotno] = true;
    
    	while (nextNotify != NULL)
    	{
    		Notification *n = (Notification *) lfirst(nextNotify);
    
    		/* Construct a valid queue entry in local variable qe */
    		asyncQueueNotificationToEntry(n, &qe);
    
    		offset = QUEUE_POS_OFFSET(queue_head);
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    当前页面能装得下,可以把nextNotify指向下一条了。

    		if (offset + qe.length <= QUEUE_PAGESIZE)
    		{
    			/* OK, so advance nextNotify past this item */
    			nextNotify = lnext(pendingNotifies->events, nextNotify);
    		}
    
    • 1
    • 2
    • 3
    • 4
    • 5

    当前页面装不下,length把剩下的装满,dboid=InvalidOid用于标记无效。

    		else
    		{
    			qe.length = QUEUE_PAGESIZE - offset;
    			qe.dboid = InvalidOid;
    			qe.data[0] = '\0';	/* empty channel */
    			qe.data[1] = '\0';	/* empty payload */
    		}
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    拷贝qe到消息队列中:

    typedef struct AsyncQueueEntry
    {
    	int			length;			/* total allocated length of entry */
    	Oid			dboid;			/* sender's database OID */
    	TransactionId xid;			/* sender's XID */
    	int32		srcPid;			/* sender's PID */
    	char		data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
    } AsyncQueueEntry;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    开始拷贝

    		memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
    			   &qe,
    			   qe.length);
    
    
    • 1
    • 2
    • 3
    • 4

    推进head指针

    		/* Advance queue_head appropriately, and detect if page is full */
    		if (asyncQueueAdvance(&(queue_head), qe.length))
    		{
    			...
    		}
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    控制结构记录head指针asyncQueueControl->head = queue_head

    	/* Success, so update the global QUEUE_HEAD */
    	QUEUE_HEAD = queue_head;
    
    	LWLockRelease(NotifySLRULock);
    
    	return nextNotify;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    第三步:AtCommit_Notify事务提交时通知其他进程

    AtCommit_Notify
      SignalBackends
        // 查询asyncQueueControl->backend监听数组,找到监听者
        // 例如两个监听者: 
        // count = 2
        // p pids[0] = 15446
        // p pids[1] = 23101
    
        // SendProcSignal(15446, PROCSIG_NOTIFY_INTERRUPT)
        // SendProcSignal(23101, PROCSIG_NOTIFY_INTERRUPT)  
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    第四步:(监听进程)被信号SIGUSR1中断,进入procsignal_sigusr1_handler

    procsignal_sigusr1_handler
      if (CheckProcSignal(PROCSIG_NOTIFY_INTERRUPT))
        HandleNotifyInterrupt()
    • 1
    • 2
    • 3

    HandleNotifyInterrupt函数配置标志位后就退出。notifyInterruptPending = true;

    等流程走到信号处理函数在做处理。

  • 相关阅读:
    Express-01
    OpenCV3-颜色模型与转换-通道分离与合并
    Java回顾-网络编程
    一文读懂简单查询代价估算
    手机拍照转机器人末端坐标(九点标定法)
    设计模式之迭代器模式
    软件测试进阶篇----自动化测试概述
    Tomcat的启动问题
    如何用IDEA创建SpringBoot项目
    vue3面试题
  • 原文地址:https://blog.csdn.net/jackgo73/article/details/126485122