• postgres source code analysis 34: inter-process communication, part 2


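      This section walks through, at the source level, how postgres creates its shared memory and semaphores. The entry point is CreateSharedMemoryAndSemaphores. When the postmaster calls this function, it creates and initializes the shared memory and semaphores; when other processes call it, nothing is initialized again and they only set up their global variables, namely the shared memory pointers and semaphore pointers. For a recap of the key data structures, see postgres source code analysis 33: inter-process communication, part 1.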

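    Execution flow

    1) Compute the total size of shared memory
    2) Allocate a region of the appropriate size for each module
    3) Initialize the shared memory header pointers
    4) Initialize each module's memory region
    5) Start the dynamic shared memory facility

    This article covers the logic of the steps marked in blue.

    Main function

    1 Computing the total shared memory size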

    /*
     * CreateSharedMemoryAndSemaphores
     *		Creates and initializes shared memory and semaphores.
     *
     * This is called by the postmaster or by a standalone backend.
     * It is also called by a backend forked from the postmaster in the
     * EXEC_BACKEND case.  In the latter case, the shared memory segment
     * already exists and has been physically attached to, but we have to
     * initialize pointers in local memory that reference the shared structures,
     * because we didn't inherit the correct pointer values from the postmaster
     * as we do in the fork() scenario.  The easiest way to do that is to run
     * through the same code as before.  (Note that the called routines mostly
     * check IsUnderPostmaster, rather than EXEC_BACKEND, to detect this case.
     * This is a bit code-wasteful and could be cleaned up.)
     */
    void
    CreateSharedMemoryAndSemaphores(void)
    {
    	PGShmemHeader *shim = NULL;
    
    	if (!IsUnderPostmaster)
    	{
    		PGShmemHeader *seghdr;
    		Size		size;
    		int			numSemas;
    
    		/* Compute number of semaphores we'll need */
    		numSemas = ProcGlobalSemas();					// Proc semaphores: one per process
    		numSemas += SpinlockSemas();					// semaphores used to emulate spinlocks
    
    		/*
    		 * Size of the Postgres shared-memory block is estimated via
    		 * moderately-accurate estimates for the big hogs, plus 100K for the
    		 * stuff that's too small to bother with estimating.
    		 *
    		 * We take some care during this phase to ensure that the total size
    		 * request doesn't overflow size_t.  If this gets through, we don't
    		 * need to be so careful during the actual allocation phase.
    		 */
    		 // Sum each module's shared memory size, plus 100 K to cover items too small to estimate
    		size = 100000;
    		size = add_size(size, PGSemaphoreShmemSize(numSemas));			// semaphores
    		size = add_size(size, SpinlockSemaSize());						// spinlock-emulation semaphores
    		size = add_size(size, hash_estimate_size(SHMEM_INDEX_SIZE,		// shmem index hash table (used to locate each module's region quickly)
    												 sizeof(ShmemIndexEnt)));
    		size = add_size(size, dsm_estimate_size());						// dynamic shared memory
    		size = add_size(size, BufferShmemSize());						// buffer pool
    		size = add_size(size, LockShmemSize());							// lock manager
    		size = add_size(size, PredicateLockShmemSize());				// predicate locks
    		size = add_size(size, ProcGlobalShmemSize());					// global ProcGlobal structure
    		size = add_size(size, XLOGShmemSize());							// XLOG module
    		size = add_size(size, CLOGShmemSize());							// CLOG module
    		size = add_size(size, CommitTsShmemSize());						// CommitTs module
    		size = add_size(size, SUBTRANSShmemSize());						// subtransaction module
    		size = add_size(size, TwoPhaseShmemSize());						// two-phase commit module
    		size = add_size(size, BackgroundWorkerShmemSize());				// bgworker module
    		size = add_size(size, MultiXactShmemSize());					// MultiXact module
    		size = add_size(size, LWLockShmemSize());						// LWLock module
    		size = add_size(size, ProcArrayShmemSize());					// Proc array
    		size = add_size(size, BackendStatusShmemSize());				// backend status module
    		size = add_size(size, SInvalShmemSize());						// shared invalidation message module
    		size = add_size(size, PMSignalShmemSize());						// postmaster signal module
    		size = add_size(size, ProcSignalShmemSize());					// process signal module
    		size = add_size(size, CheckpointerShmemSize());					// checkpointer module
    		size = add_size(size, AutoVacuumShmemSize());					// autovacuum module
    		size = add_size(size, ReplicationSlotsShmemSize());				// replication slots module
    		size = add_size(size, ReplicationOriginShmemSize());			// replication origin (logical replication) module
    		size = add_size(size, WalSndShmemSize());						// walsender module
    		size = add_size(size, WalRcvShmemSize());						// walreceiver module
    		size = add_size(size, PgArchShmemSize());						// archiver module
    		size = add_size(size, ApplyLauncherShmemSize());				// apply launcher module
    		size = add_size(size, SnapMgrShmemSize());						// snapshot manager module
    		size = add_size(size, BTreeShmemSize());						// BTree module
    		size = add_size(size, SyncScanShmemSize());						// synchronized scans
    		size = add_size(size, AsyncShmemSize());						// asynchronous notification: LISTEN/NOTIFY/UNLISTEN
    #ifdef EXEC_BACKEND
    		size = add_size(size, ShmemBackendArraySize());
    #endif
    
    		/* freeze the addin request size and include it */
    		addin_request_allowed = false;
    		size = add_size(size, total_addin_request);
    
    		/* might as well round it off to a multiple of a typical page size */
    		size = add_size(size, 8192 - (size % 8192));			// round up to an 8K boundary
    
    		elog(DEBUG3, "invoking IpcMemoryCreate(size=%zu)", size);
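
The size computation above relies on two small ideas: every addition is checked for size_t overflow, and the final total is rounded up to a multiple of 8192. The following is a minimal sketch of both, not the PostgreSQL implementation. The names toy_add_size and toy_round_up_page are hypothetical, and the sketch reports overflow through a flag, whereas the real add_size raises an error.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define TOY_PAGE 8192			/* same constant as in the listing above */

/*
 * Hypothetical stand-in for add_size(): add two sizes and clear *ok on
 * overflow (the real function reports an error instead).
 */
static size_t
toy_add_size(size_t a, size_t b, int *ok)
{
	size_t		sum;

	assert(ok != NULL);
	sum = a + b;				/* unsigned addition wraps on overflow */
	if (sum < a)				/* wrapped around, so the sum overflowed */
		*ok = 0;
	return sum;
}

/*
 * Same rounding expression as the listing. Note that it always moves to
 * the NEXT multiple: an already aligned size grows by a full page.
 */
static size_t
toy_round_up_page(size_t size, int *ok)
{
	return toy_add_size(size, TOY_PAGE - (size % TOY_PAGE), ok);
}
```

With these helpers, toy_round_up_page(100000, &ok) yields 106496 (13 pages), and toy_round_up_page(8192, &ok) yields 16384 rather than 8192.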
    

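    2 PGSharedMemoryCreate
    1) First, run sanity checks: stat the data directory (DataDir) to obtain its inode and device numbers, and assert that the requested size is larger than the PGShmemHeader struct.
    2) Depending on shared_memory_type, call the matching allocation method. For SHMEM_TYPE_MMAP, CreateAnonymousSegment is called to create an anonymous memory segment.
    3) Next, search for a free IPC key and create a new System V segment for it, attach that segment to the current process, and register an on_shmem_exit callback that releases the storage.
    4) Initialize the standard header, PGShmemHeader, and return the start address of the allocated memory.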

    /*
     * PGSharedMemoryCreate
     *
     * Create a shared memory segment of the given size and initialize its
     * standard header.  Also, register an on_shmem_exit callback to release
     * the storage.
     *
     * Dead Postgres segments pertinent to this DataDir are recycled if found, but
     * we do not fail upon collision with foreign shmem segments.  The idea here
     * is to detect and re-use keys that may have been assigned by a crashed
     * postmaster or backend.
     */
    PGShmemHeader *
    PGSharedMemoryCreate(Size size,
    					 PGShmemHeader **shim)
    {
    	IpcMemoryKey NextShmemSegID;
    	void	   *memAddress;
    	PGShmemHeader *hdr;
    	struct stat statbuf;
    	Size		sysvsize;
    
    	/*
    	 * We use the data directory's ID info (inode and device numbers) to
    	 * positively identify shmem segments associated with this data dir, and
    	 * also as seeds for searching for a free shmem key.
    	 */
    	if (stat(DataDir, &statbuf) < 0)
    		ereport(FATAL,
    				(errcode_for_file_access(),
    				 errmsg("could not stat data directory \"%s\": %m",
    						DataDir)));
    
    	/* Complain if hugepages demanded but we can't possibly support them */
    #if !defined(MAP_HUGETLB)
    	if (huge_pages == HUGE_PAGES_ON)
    		ereport(ERROR,
    				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    				 errmsg("huge pages not supported on this platform")));
    #endif
    
    	/* Room for a header? */
    	Assert(size > MAXALIGN(sizeof(PGShmemHeader)));
    
    	if (shared_memory_type == SHMEM_TYPE_MMAP)
    	{
    		AnonymousShmem = CreateAnonymousSegment(&size);
    		AnonymousShmemSize = size;
    
    		/* Register on-exit routine to unmap the anonymous segment */
    		on_shmem_exit(AnonymousShmemDetach, (Datum) 0);
    
    		/* Now we need only allocate a minimal-sized SysV shmem block. */
    		sysvsize = sizeof(PGShmemHeader);
    	}
    	else
    		sysvsize = size;
    
    	/*
    	 * Loop till we find a free IPC key.  Trust CreateDataDirLockFile() to
    	 * ensure no more than one postmaster per data directory can enter this
    	 * loop simultaneously.  (CreateDataDirLockFile() does not entirely ensure
    	 * that, but prefer fixing it over coping here.)
    	 */
    	NextShmemSegID = statbuf.st_ino;
    
    	for (;;)
    	{
    		IpcMemoryId shmid;
    		PGShmemHeader *oldhdr;
    		IpcMemoryState state;
    
    		/* Try to create new segment */
    		memAddress = InternalIpcMemoryCreate(NextShmemSegID, sysvsize);
    		if (memAddress)
    			break;				/* successful create and attach */
    
    		/* Check shared memory and possibly remove and recreate */
    
    		/*
    		 * shmget() failure is typically EACCES, hence SHMSTATE_FOREIGN.
    		 * ENOENT, a narrow possibility, implies SHMSTATE_ENOENT, but one can
    		 * safely treat SHMSTATE_ENOENT like SHMSTATE_FOREIGN.
    		 */
    		shmid = shmget(NextShmemSegID, sizeof(PGShmemHeader), 0);
    		if (shmid < 0)
    		{
    			oldhdr = NULL;
    			state = SHMSTATE_FOREIGN;
    		}
    		else
    			state = PGSharedMemoryAttach(shmid, NULL, &oldhdr);
    
    		switch (state)
    		{
    			case SHMSTATE_ANALYSIS_FAILURE:
    			case SHMSTATE_ATTACHED:
    				ereport(FATAL,
    						(errcode(ERRCODE_LOCK_FILE_EXISTS),
    						 errmsg("pre-existing shared memory block (key %lu, ID %lu) is still in use",
    								(unsigned long) NextShmemSegID,
    								(unsigned long) shmid),
    						 errhint("Terminate any old server processes associated with data directory \"%s\".",
    								 DataDir)));
    				break;
    			case SHMSTATE_ENOENT:
    
    				/*
    				 * To our surprise, some other process deleted since our last
    				 * InternalIpcMemoryCreate().  Moments earlier, we would have
    				 * seen SHMSTATE_FOREIGN.  Try that same ID again.
    				 */
    				elog(LOG,
    					 "shared memory block (key %lu, ID %lu) deleted during startup",
    					 (unsigned long) NextShmemSegID,
    					 (unsigned long) shmid);
    				break;
    			case SHMSTATE_FOREIGN:
    				NextShmemSegID++;
    				break;
    			case SHMSTATE_UNATTACHED:
    
    				/*
    				 * The segment pertains to DataDir, and every process that had
    				 * used it has died or detached.  Zap it, if possible, and any
    				 * associated dynamic shared memory segments, as well.  This
    				 * shouldn't fail, but if it does, assume the segment belongs
    				 * to someone else after all, and try the next candidate.
    				 * Otherwise, try again to create the segment.  That may fail
    				 * if some other process creates the same shmem key before we
    				 * do, in which case we'll try the next key.
    				 */
    				if (oldhdr->dsm_control != 0)
    					dsm_cleanup_using_control_segment(oldhdr->dsm_control);
    				if (shmctl(shmid, IPC_RMID, NULL) < 0)
    					NextShmemSegID++;
    				break;
    		}
    
    		if (oldhdr && shmdt(oldhdr) < 0)
    			elog(LOG, "shmdt(%p) failed: %m", oldhdr);
    	}
    
    	/* Initialize new segment. */
    	hdr = (PGShmemHeader *) memAddress;
    	hdr->creatorPID = getpid();
    	hdr->magic = PGShmemMagic;
    	hdr->dsm_control = 0;
    
    	/* Fill in the data directory ID info, too */
    	hdr->device = statbuf.st_dev;
    	hdr->inode = statbuf.st_ino;
    
    	/*
    	 * Initialize space allocation status for segment.
    	 */
    	hdr->totalsize = size;
    	hdr->freeoffset = MAXALIGN(sizeof(PGShmemHeader));
    	*shim = hdr;
    
    	/* Save info for possible future use */
    	UsedShmemSegAddr = memAddress;
    	UsedShmemSegID = (unsigned long) NextShmemSegID;
    
    	/*
    	 * If AnonymousShmem is NULL here, then we're not using anonymous shared
    	 * memory, and should return a pointer to the System V shared memory
    	 * block. Otherwise, the System V shared memory block is only a shim, and
    	 * we must return a pointer to the real block.
    	 */
    	if (AnonymousShmem == NULL)
    		return hdr;
    	memcpy(AnonymousShmem, hdr, sizeof(PGShmemHeader));
    	return (PGShmemHeader *) AnonymousShmem;
    }
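
To make the SHMEM_TYPE_MMAP path more concrete, here is a minimal sketch of an anonymous shared mapping that starts with a small header and stays visible across fork(). It is an illustration under assumptions, not PostgreSQL code: ToyShmemHeader, TOY_MAGIC, and both functions are hypothetical, the header has fewer fields than PGShmemHeader, and the System V shim segment and key search are left out entirely.

```c
#define _DEFAULT_SOURCE			/* for MAP_ANONYMOUS under strict -std modes */
#include <assert.h>
#include <stddef.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define TOY_MAGIC 0x5ca1ab1e	/* hypothetical magic number */

/* Hypothetical, simplified header; the real PGShmemHeader has more fields. */
typedef struct ToyShmemHeader
{
	int			magic;
	pid_t		creator_pid;
	size_t		totalsize;
	size_t		freeoffset;
} ToyShmemHeader;

/* Map an anonymous shared region and initialize the header at its start. */
static ToyShmemHeader *
toy_create_anonymous_segment(size_t size)
{
	void	   *addr;
	ToyShmemHeader *hdr;

	assert(size > sizeof(ToyShmemHeader));	/* room for a header? */

	addr = mmap(NULL, size, PROT_READ | PROT_WRITE,
				MAP_SHARED | MAP_ANONYMOUS, -1, 0);
	if (addr == MAP_FAILED)
		return NULL;

	hdr = (ToyShmemHeader *) addr;
	hdr->magic = TOY_MAGIC;
	hdr->creator_pid = getpid();
	hdr->totalsize = size;
	hdr->freeoffset = sizeof(ToyShmemHeader);	/* real code uses MAXALIGN */
	return hdr;
}

/*
 * Fork a child that writes one int just past the header, then return the
 * value the parent reads back (or -1 if fork/waitpid fails).
 */
static int
toy_child_write_parent_read(ToyShmemHeader *hdr, int value)
{
	int		   *slot = (int *) ((char *) hdr + hdr->freeoffset);
	pid_t		pid;
	int			status;

	*slot = 0;
	pid = fork();
	if (pid < 0)
		return -1;
	if (pid == 0)
	{
		*slot = value;			/* child writes into the shared mapping */
		_exit(0);
	}
	if (waitpid(pid, &status, 0) < 0)
		return -1;
	return *slot;				/* parent sees the child's write */
}
```

Because the mapping is MAP_SHARED, the parent reads back the value the child stored, which is the property that lets processes forked from the postmaster share one region.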
    

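    3 InitShmemAccess
    This function initializes the global shared memory pointers (ShmemSegHdr, ShmemBase, and ShmemEnd) so that the later per-module initialization code can use them.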

    /*
     *	InitShmemAccess() --- set up basic pointers to shared memory.
     *
     * Note: the argument should be declared "PGShmemHeader *seghdr",
     * but we use void to avoid having to include ipc.h in shmem.h.
     */
    void
    InitShmemAccess(void *seghdr)
    {
    	PGShmemHeader *shmhdr = (PGShmemHeader *) seghdr;
    
    	ShmemSegHdr = shmhdr;
    	ShmemBase = (void *) shmhdr;
    	ShmemEnd = (char *) ShmemBase + shmhdr->totalsize;
    }
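
Once the base and end pointers are set, early allocations simply advance the header's freeoffset. The sketch below shows that idea with a hypothetical ToyHeader and toy_* functions. It is an assumption-laden simplification of what ShmemAllocUnlocked does: the segment is a plain malloc buffer rather than shared memory, and alignment is fixed at 8 bytes as a stand-in for MAXALIGN.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical header holding only the two fields the allocator needs. */
typedef struct ToyHeader
{
	size_t		totalsize;		/* total size of the segment */
	size_t		freeoffset;		/* offset of the first free byte */
} ToyHeader;

static ToyHeader *toy_seg_hdr;	/* plays the role of ShmemSegHdr */
static char *toy_base;			/* plays the role of ShmemBase */
static char *toy_end;			/* plays the role of ShmemEnd */

/* Stand-in for segment creation: a malloc buffer with an initialized header. */
static ToyHeader *
toy_create_segment(size_t total)
{
	ToyHeader  *hdr;

	assert(total > sizeof(ToyHeader));
	hdr = malloc(total);
	if (hdr == NULL)
		return NULL;
	hdr->totalsize = total;
	hdr->freeoffset = sizeof(ToyHeader);
	return hdr;
}

/* Mirrors the shape of InitShmemAccess: remember header, base, and end. */
static void
toy_init_access(void *seghdr)
{
	toy_seg_hdr = (ToyHeader *) seghdr;
	toy_base = (char *) seghdr;
	toy_end = toy_base + toy_seg_hdr->totalsize;
}

/* Bump allocation without any lock; returns NULL when the segment is full. */
static void *
toy_alloc_unlocked(size_t size)
{
	size_t		start;
	size_t		new_free;

	assert(toy_seg_hdr != NULL);
	size = (size + 7) & ~(size_t) 7;	/* round up to 8 bytes */
	start = toy_seg_hdr->freeoffset;
	new_free = start + size;
	if (new_free > toy_seg_hdr->totalsize)
		return NULL;
	toy_seg_hdr->freeoffset = new_free;
	return toy_base + start;
}
```

The first allocation lands immediately after the header, and each later one follows the previous allocation at the next 8-byte boundary.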
    

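    4 PGReserveSemaphores
    This function reserves the semaphore array in shared memory and sets up the bookkeeping needed to support up to maxSemas later PGSemaphoreCreate calls, which processes then use for inter-process communication.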

    /*
     * PGReserveSemaphores --- initialize semaphore support
     *
     * This is called during postmaster start or shared memory reinitialization.
     * It should do whatever is needed to be able to support up to maxSemas
     * subsequent PGSemaphoreCreate calls.  Also, if any system resources
     * are acquired here or in PGSemaphoreCreate, register an on_shmem_exit
     * callback to release them.
     *
     * In the SysV implementation, we acquire semaphore sets on-demand; the
     * maxSemas parameter is just used to size the arrays.  There is an array
     * of PGSemaphoreData structs in shared memory, and a postmaster-local array
     * with one entry per SysV semaphore set, which we use for releasing the
     * semaphore sets when done.  (This design ensures that postmaster shutdown
     * doesn't rely on the contents of shared memory, which a failed backend might
     * have clobbered.)
     */
    void
    PGReserveSemaphores(int maxSemas)
    {
    	struct stat statbuf;
    
    	/*
    	 * We use the data directory's inode number to seed the search for free
    	 * semaphore keys.  This minimizes the odds of collision with other
    	 * postmasters, while maximizing the odds that we will detect and clean up
    	 * semaphores left over from a crashed postmaster in our own directory.
    	 */
    	if (stat(DataDir, &statbuf) < 0)
    		ereport(FATAL,
    				(errcode_for_file_access(),
    				 errmsg("could not stat data directory \"%s\": %m",
    						DataDir)));
    
    	/*
    	 * We must use ShmemAllocUnlocked(), since the spinlock protecting
    	 * ShmemAlloc() won't be ready yet.  (This ordering is necessary when we
    	 * are emulating spinlocks with semaphores.)
    	 */
    	 // Allocate a semaphore area of the requested size in the shared memory segment and initialize the global semaphore bookkeeping fields
    	sharedSemas = (PGSemaphore)
    		ShmemAllocUnlocked(PGSemaphoreShmemSize(maxSemas));
    	numSharedSemas = 0;
    	maxSharedSemas = maxSemas;
    
    	maxSemaSets = (maxSemas + SEMAS_PER_SET - 1) / SEMAS_PER_SET;
    	mySemaSets = (IpcSemaphoreId *)
    		malloc(maxSemaSets * sizeof(IpcSemaphoreId));
    	if (mySemaSets == NULL)
    		elog(PANIC, "out of memory");
    	numSemaSets = 0;
    	nextSemaKey = statbuf.st_ino;
    	nextSemaNumber = SEMAS_PER_SET; /* force sema set alloc on 1st call */
    
    	// Register an on_shmem_exit callback that releases the semaphores
    	on_shmem_exit(ReleaseSemaphores, 0);
    }
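
Two details in this function are easy to miss: maxSemaSets is a ceiling division, and setting nextSemaNumber to SEMAS_PER_SET forces the first create call to allocate a new set. The sketch below models only that bookkeeping. The ToySemaState struct and toy_* functions are hypothetical, the set size of 16 is an assumption about the SysV implementation, and no real semget() call is made.

```c
#include <assert.h>

#define TOY_SEMAS_PER_SET 16	/* assumed set size, for illustration only */

/* Ceiling division: how many sets are needed to hold max_semas semaphores. */
static int
toy_max_sema_sets(int max_semas)
{
	assert(max_semas >= 0);
	return (max_semas + TOY_SEMAS_PER_SET - 1) / TOY_SEMAS_PER_SET;
}

/* Hypothetical bookkeeping, mirroring numSemaSets and nextSemaNumber. */
typedef struct ToySemaState
{
	int			num_sets;		/* sets allocated so far */
	int			next_number;	/* next free index within the current set */
} ToySemaState;

/* Start "full" so that the first create call must allocate a set. */
static void
toy_reserve(ToySemaState *st)
{
	st->num_sets = 0;
	st->next_number = TOY_SEMAS_PER_SET;
}

/*
 * Hand out the next semaphore slot. Returns the set index and stores the
 * index within that set in *index_in_set.
 */
static int
toy_create_sema(ToySemaState *st, int *index_in_set)
{
	if (st->next_number >= TOY_SEMAS_PER_SET)
	{
		st->num_sets++;			/* real code would create a SysV set here */
		st->next_number = 0;
	}
	*index_in_set = st->next_number++;
	return st->num_sets - 1;
}
```

For example, 17 semaphores need two sets, and the 17th create call is the one that triggers allocation of the second set.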
    

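    5 SpinlockSemaInit
    This function initializes the semaphores used to emulate spinlocks. The emulation is needed only on platforms that lack a native spinlock (test-and-set) implementation.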

    /*
     * Initialize spinlock emulation.
     *
     * This must be called after PGReserveSemaphores().
     */
    void
    SpinlockSemaInit(void)
    {
    	PGSemaphore *spinsemas;
    	int			nsemas = SpinlockSemas();
    	int			i;
    
    	/*
    	 * We must use ShmemAllocUnlocked(), since the spinlock protecting
    	 * ShmemAlloc() obviously can't be ready yet.
    	 */
    	spinsemas = (PGSemaphore *) ShmemAllocUnlocked(SpinlockSemaSize());
    	for (i = 0; i < nsemas; ++i)
    		spinsemas[i] = PGSemaphoreCreate();
    	SpinlockSemaArray = spinsemas;
    }
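
Because the semaphore array created here has a fixed size, emulated spinlocks have to share semaphores. One simple way to do that is to give each lock a 1-based slot in the pool, assigned round-robin. The sketch below shows that scheme only as an illustration: the pool size, toy_slock_t, and toy_init_lock are hypothetical, and the mapping is an assumption rather than a statement of what spin.c does.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_NUM_SPIN_SEMAS 4	/* hypothetical pool size, kept tiny on purpose */

/* In this sketch a "spinlock" is just a 1-based index into the pool. */
typedef int toy_slock_t;

static unsigned toy_counter = 0;	/* number of locks initialized so far */

/* Assign the next pool slot round-robin; slot 0 is never handed out. */
static void
toy_init_lock(toy_slock_t *lock)
{
	assert(lock != NULL);
	*lock = (int) ((++toy_counter) % TOY_NUM_SPIN_SEMAS) + 1;
}
```

With a pool of four, the fifth lock reuses the slot of the first, so two unrelated locks can contend on the same semaphore. That costs some concurrency but never correctness, since acquiring a lock still means holding its semaphore.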
    

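    6 InitShmemAllocation
    This function initializes the global spinlock ShmemLock, which must be held whenever a module later allocates shared memory through ShmemAlloc. It also allocates and zeroes ShmemVariableCache, the global structure the transaction manager uses for OID/XID bookkeeping.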

    /*
     *	InitShmemAllocation() --- set up shared-memory space allocation.
     *
     * This should be called only in the postmaster or a standalone backend.
     */
    void
    InitShmemAllocation(void)
    {
    	PGShmemHeader *shmhdr = ShmemSegHdr;
    	char	   *aligned;
    
    	Assert(shmhdr != NULL);
    
    	/*
    	 * Initialize the spinlock used by ShmemAlloc.  We must use
    	 * ShmemAllocUnlocked, since obviously ShmemAlloc can't be called yet.
    	 */
    	ShmemLock = (slock_t *) ShmemAllocUnlocked(sizeof(slock_t));
    
    	SpinLockInit(ShmemLock);
    
    	/*
    	 * Allocations after this point should go through ShmemAlloc, which
    	 * expects to allocate everything on cache line boundaries.  Make sure the
    	 * first allocation begins on a cache line boundary.
    	 */
    	aligned = (char *)
    		(CACHELINEALIGN((((char *) shmhdr) + shmhdr->freeoffset)));
    	shmhdr->freeoffset = aligned - (char *) shmhdr;
    
    	/* ShmemIndex can't be set up yet (need LWLocks first) */
    	shmhdr->index = NULL;
    	ShmemIndex = (HTAB *) NULL;
    
    	/*
    	 * Initialize ShmemVariableCache for transaction manager. (This doesn't
    	 * really belong here, but not worth moving.)
    	 */
    	ShmemVariableCache = (VariableCache)
    		ShmemAlloc(sizeof(*ShmemVariableCache));
    	memset(ShmemVariableCache, 0, sizeof(*ShmemVariableCache));
    }
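
The freeoffset adjustment in this function is ordinary power-of-two alignment applied to an absolute address. Here is a minimal sketch of that arithmetic. The helper names are hypothetical, and the 128-byte cache line is an assumed value used only for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define TOY_CACHE_LINE 128		/* assumed cache line size */

/* Round value up to the next multiple of alignment (a power of two). */
static uintptr_t
toy_align_up(uintptr_t value, uintptr_t alignment)
{
	assert(alignment != 0 && (alignment & (alignment - 1)) == 0);
	return (value + (alignment - 1)) & ~(alignment - 1);
}

/*
 * Recompute freeoffset so that base + freeoffset falls on a cache line
 * boundary. The address is aligned, not the offset, so the result
 * depends on where the segment itself starts.
 */
static size_t
toy_aligned_freeoffset(uintptr_t base, size_t freeoffset)
{
	return (size_t) (toy_align_up(base + freeoffset, TOY_CACHE_LINE) - base);
}
```

For a segment starting at address 4096 with freeoffset 40, the new freeoffset is 128. If the segment started 64 bytes later, the same call would give 64, which is why the code aligns the pointer and then converts back to an offset.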
    
  • Original article: https://blog.csdn.net/qq_52668274/article/details/127776509