• Postgresql源码(110)分析dsm动态共享内存分配与共享内存mq实例


    相关
    《Postgresql源码(90)共享内存申请CreateSharedMemoryAndSemaphores》
    《Linux内存映射函数mmap与匿名内存块》
    《Linux共享内存与子进程继承》

    dsm/toc使用备忘

    用dsm框架的流程

    1. 评估共享内存大小:多次用shm_toc_estimate_chunk、shm_toc_estimate_keys向estimate中增加数据结构,最后用shm_toc_estimate得出刚才增加的总大小。
    2. 申请共享内存:dsm_create
    3. 共享内存头部放kv管理结构toc:shm_toc_create,现在共享内存中可以使用toc接口,以kv的形式存放数据结构了。
    4. 例如需要存放FixedParallelState结构
      1. 第一步:在刚才申请的共享内存段中,申请一个起始地址:shm_toc_allocate
      2. 第二步:拿到地址后,转为FixedParallelState开始赋值。
      3. 第三步:最后调用toc接口shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps),将刚才的数据结构地址关联一个PARALLEL_KEY_FIXED,使用的时候用PARALLEL_KEY_FIXED查找即可fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false)

    0 概念

    数据结构含义:

    1. dsm_segment(动态共享内存段):
      • 每个后端进程可以通过使用dsm_segment来访问共享内存。dsm_segment包含了内存分配和释放等操作所需的信息,并提供了一组函数来管理和操作共享内存。每个会话(session)都可以有一个或多个dsm_segment。
      • 提供共享内存,dsm_segment表示申请的一个共享内存段,对于dsm api来说这是代表共享内存的最小单位。
    2. shm_mq_handle(共享内存消息队列句柄)
      • shm_mq_handle则是用于在共享内存中进行消息传递的句柄。它提供了一组函数来发送和接收消息,并提供了同步和互斥机制,确保多个进程之间的顺序和一致性。
      • 使用dsm_segment提供共享内存段做进程通信。

    在实际使用中,通常会将它们组合在一起,以实现共享内存中的消息传递机制。

    1 shm_toc初始化一段共享内存,共享内存是从哪来的?

    在PG代码中可以看到shm_toc初始化一段内存,在头部放置shm_toc,这块内存叫做一个内存段,shm_toc_create函数接受已经申请好的内存地址,在头部初始化shm_toc(表示内存段头),后面可以存放用户自定义数据,比如message queue所需的结构体、数据等。

    初始化一段共享内存,在头上放shm_toc结构,明显是用来记录内存段管理的一些数据。

    struct shm_toc
    {
    	uint64		toc_magic;		/* Magic number identifying this TOC */
    	slock_t		toc_mutex;		/* Spinlock for mutual exclusion */
    	Size		toc_total_bytes;	/* Bytes managed by this TOC */
    	Size		toc_allocated_bytes;	/* Bytes allocated of those managed */
    	uint32		toc_nentry;		/* Number of entries in TOC */
    	shm_toc_entry toc_entry[FLEXIBLE_ARRAY_MEMBER];
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    初始化

    shm_toc *
    shm_toc_create(uint64 magic, void *address, Size nbytes)
    {
    	shm_toc    *toc = (shm_toc *) address;
    
    	Assert(nbytes > offsetof(shm_toc, toc_entry));
    	toc->toc_magic = magic;
    	SpinLockInit(&toc->toc_mutex);
    
    	/*
    	 * The alignment code in shm_toc_allocate() assumes that the starting
    	 * value is buffer-aligned.
    	 */
    	toc->toc_total_bytes = BUFFERALIGN_DOWN(nbytes);
    	toc->toc_allocated_bytes = 0;
    	toc->toc_nentry = 0;
    
    	return toc;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    那么shm_toc_create用的内存是从哪来的?

    2 动态mmap一段新的共享内存(dsm机制)

    1. Postgresql能看到很多dsm开头的函数,这类函数属于运行时动态申请共享内存模块( dynamic shared memory)。
    2. 《Postgresql源码(90)共享内存申请CreateSharedMemoryAndSemaphores》介绍过,PG的共享内存是在启动时,直接用mmap 一次性申请大匿名块,然后自己切分使用的。但如果运行时需要申请新的、不定大小的共享内存块,肯定无法再启动时预先申请,这就引入了dsm模块。

    3 dsm申请共享内存应用实例:mq

    在源码中有一个非常好的例子,可以用来分析dsm申请内存用作mq的方法:

    src/test/modules/test_shm_mq

    下面对代码流程做一些分析,主要分析动态申请共享内存的过程,不涉及初始化后的使用流程。

    3.1 内存初始化:test_shm_mq_setup

    void
    test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,
    				  shm_mq_handle **output, shm_mq_handle **input)
    {
    	dsm_segment *seg;
    	test_shm_mq_header *hdr;
    	shm_mq	   *outq = NULL;	/* placate compiler */
    	shm_mq	   *inq = NULL;		/* placate compiler */
    	worker_state *wstate;
    
    	/* Set up a dynamic shared memory segment. */
    	setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);
    	*segp = seg;
    
    	/* Register background workers. */
    	wstate = setup_background_workers(nworkers, seg);
    
    	/* Attach the queues. */
    	*output = shm_mq_attach(outq, seg, wstate->handle[0]);
    	*input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);
    
    	/* Wait for workers to become ready. */
    	wait_for_workers_to_become_ready(wstate, hdr);
    
    	/*
    	 * Once we reach this point, all workers are ready.  We no longer need to
    	 * kill them if we die; they'll die on their own as the message queues
    	 * shut down.
    	 */
    	cancel_on_dsm_detach(seg, cleanup_background_workers,
    						 PointerGetDatum(wstate));
    	pfree(wstate);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    第一步:申请共享内存setup_dynamic_shared_memory
    static void
    setup_dynamic_shared_memory(int64 queue_size, int nworkers,
    							dsm_segment **segp, test_shm_mq_header **hdrp,
    							shm_mq **outp, shm_mq **inp)
    {
    	shm_toc_estimator e;
    	int			i;
    	Size		segsize;
    	dsm_segment *seg;
    	shm_toc    *toc;
    	test_shm_mq_header *hdr;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    用estimator来评估需要多少共享内存(可以不用自己算了:)
    评估器的接口:

    • shm_toc_initialize_estimator:初始化记录(key的个数和chunk的总大小)。
    • shm_toc_estimate_chunk:需要多大的chunk?调用一次记录一个指定大小的chunk,对齐到32字节上。
    • shm_toc_estimate_keys:需要几个key?这里记录一下
    • shm_toc_estimate:把之前记录的结构换算成大小

    评估器的原理:记录key的个数和chunk的总大小即可。

    	shm_toc_initialize_estimator(&e);
    	shm_toc_estimate_chunk(&e, sizeof(test_shm_mq_header));
    	for (i = 0; i <= nworkers; ++i)
    		shm_toc_estimate_chunk(&e, (Size) queue_size);
    	shm_toc_estimate_keys(&e, 2 + nworkers);
    	segsize = shm_toc_estimate(&e);
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    评估完了开始用dsm_create申请新的共享内存了!

    dsm_impl_op→dsm_impl_op→dsm_impl_mmap来具体从os申请共享内存,不深入了。

    现在拿到seg就是dsm_segment动态共享内存段,代表一段共享内存(段内部有dsm_control一套管理机制,需要深入的话可以看看dsm_create代码),这里只需要指导这是一段按需要大小mmap出来的共享内存就好了。

    	/* Create the shared memory segment and establish a table of contents. */
    	seg = dsm_create(shm_toc_estimate(&e), 0);
    
    • 1
    • 2

    在共享内存其实的位置,放一个shm_toc结构,作为后面数据的表头和索引

    ---------------- 共享内存起始地址
    struct shm_toc
    {
    	uint64		toc_magic;				/* Magic number identifying this TOC */
    	slock_t		toc_mutex;				/* Spinlock for mutual exclusion */
    	Size		toc_total_bytes;		/* Bytes managed by this TOC */
    	Size		toc_allocated_bytes;	/* Bytes allocated of those managed */
    	uint32		toc_nentry;				/* Number of entries in TOC */
    	shm_toc_entry toc_entry[0];
    };
    ---------------- 申请空间是从最高地址开始的用的,索引数组toc_entry从低到高,数据从高到低,和page一样
    toc_entry[0]   -------------\
    ----------------             |
    toc_entry[1]                 |
    ----------------             |
    toc_entry[2]  ---------\     |
    ----------------       |     |
    data <-----------------/     |
    ...                          |
    data                         |
    ...                          |
    data <----------------------/
    ----------------
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    	toc = shm_toc_create(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg),
    						 segsize);
    
    
    • 1
    • 2
    • 3

    申请一段用户自定义数据空间,放一个test_shm_mq_header结构。

    	hdr = shm_toc_allocate(toc, sizeof(test_shm_mq_header));
    	SpinLockInit(&hdr->mutex);
    	hdr->workers_total = nworkers;
    	hdr->workers_attached = 0;
    	hdr->workers_ready = 0;
    
    • 1
    • 2
    • 3
    • 4
    • 5

    初始化好数据结构后,把数据插进去,位置前面shm_toc_allocate已经申请好了,这一步shm_toc_insert主要是配置toc_entry那个索引数组!具体就是toc_entry[0]的key和offset,下次用的时候用0就能找到offset了。

    	shm_toc_insert(toc, 0, hdr);
    
    	/* Set up one message queue per worker, plus one. */
    	for (i = 0; i <= nworkers; ++i)
    	{
    		shm_mq	   *mq;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    每个worker申请一个queue_size,按上面方法allocate+insert。

    		mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),
    						   (Size) queue_size);
    		shm_toc_insert(toc, i + 1, mq);
    
    		if (i == 0)
    		{
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    把mq用户需要的数据结构传进去进行需要的配置即可,shm_toc_insert完了也可以用,alloc出来的地址直接指向共享内存,可以在当前进程一直使用。

    			/* We send messages to the first queue. */
    			shm_mq_set_sender(mq, MyProc);
    			*outp = mq;
    		}
    		if (i == nworkers)
    		{
    			/* We receive messages from the last queue. */
    			shm_mq_set_receiver(mq, MyProc);
    			*inp = mq;
    		}
    	}
    
    	/* Return results to caller. */
    	*segp = seg;
    	*hdrp = hdr;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    第二步:配置bgworker:setup_background_workers
    static worker_state *
    setup_background_workers(int nworkers, dsm_segment *seg)
    {
    	MemoryContext oldcontext;
    	BackgroundWorker worker;
    	worker_state *wstate;
    	int			i;
    
    	oldcontext = MemoryContextSwitchTo(CurTransactionContext);
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    setup_background_workers函数目标是构造worker_state,下面是worker_state的内存结构:

    ------------------------------
    typedef struct
    {
    	int			nworkers;
    	BackgroundWorkerHandle *handle[];  //  flex数组,每个位置对应一个worker
    	{
    		int			slot;
    		uint64		generation;
    	}
    } worker_state;
    ------------------------------
    handle[0]    
    ------------------------------
    handle[1]
    ------------------------------
    handle[2]
    ------------------------------
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    	wstate = MemoryContextAlloc(TopTransactionContext,
    								offsetof(worker_state, handle) +
    								sizeof(BackgroundWorkerHandle *) * nworkers);
    	wstate->nworkers = 0;
    	
    	on_dsm_detach(seg, cleanup_background_workers,
    				  PointerGetDatum(wstate));
    
    	/* Configure a worker. */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    配通用的worker通用部分:

    	memset(&worker, 0, sizeof(worker));
    	worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
    	worker.bgw_start_time = BgWorkerStart_ConsistentState;
    	worker.bgw_restart_time = BGW_NEVER_RESTART;
    	sprintf(worker.bgw_library_name, "test_shm_mq");
    	sprintf(worker.bgw_function_name, "test_shm_mq_main");
    	snprintf(worker.bgw_type, BGW_MAXLEN, "test_shm_mq");
    	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
    	/* set bgw_notify_pid, so we can detect if the worker stops */
    	worker.bgw_notify_pid = MyProcPid;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    把worker通用的部分传进去,拼每个worker特有的部分

    	/* Register the workers. */
    	for (i = 0; i < nworkers; ++i)
    	{
    
    • 1
    • 2
    • 3

    BackgroundWorkerData->slot数组(bgworker管理的全局变量记录使用所有使用中未使用的bgworker,每个worker一个slot)里面拿到一个没使用的slot,记录slot的id到handle[i]中并返回即可。

    		if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i]))
    			ereport(ERROR,
    					(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
    					 errmsg("could not register background process"),
    					 errhint("You may need to increase max_worker_processes.")));
    		++wstate->nworkers;
    	}
    
    	/* All done. */
    	MemoryContextSwitchTo(oldcontext);
    	return wstate;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    第三步:剩余流程
    void
    test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,
    				  shm_mq_handle **output, shm_mq_handle **input)
    {
    	dsm_segment *seg;
    	test_shm_mq_header *hdr;
    	shm_mq	   *outq = NULL;	/* placate compiler */
    	shm_mq	   *inq = NULL;		/* placate compiler */
    	worker_state *wstate;
    
    	/* Set up a dynamic shared memory segment. */
    	setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);
    	*segp = seg;
    
    	/* Register background workers. */
    	wstate = setup_background_workers(nworkers, seg);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    第三步到这里了,shm_mq_handle的作用是构建一个shm_mq_handle,记录dsm seg、mq、bgworker的handle

    	/* Attach the queues. */
    	*output = shm_mq_attach(outq, seg, wstate->handle[0]);
    	*input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);
    
    
    • 1
    • 2
    • 3
    • 4

    用取一个PID的方式检查worker是不是ready了。

    	wait_for_workers_to_become_ready(wstate, hdr);
    
    	cancel_on_dsm_detach(seg, cleanup_background_workers,
    						 PointerGetDatum(wstate));
    	pfree(wstate);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    到这里内存初始化的工作做完了。后面可以直接调用mq的api发送信息,本篇不在关注。

  • 相关阅读:
    Nginx之带宽限制解读
    ssh远程安装teamviewer与正在连接...问题解决方法
    kube-apiserver鉴权源码简析
    Websocket升级版
    C#实现HTTP访问类HttpHelper
    C语言动态内存管理、柔性数组(超详细版)
    如何3分钟,快速开发一个新功能
    AI 时代的向量数据库、关系型数据库与 Serverless 技术丨TiDB Hackathon 2023 随想
    H264解码器实现-帧间预测
    Kubernetes (k8s 1.23) 安装与卸载
  • 原文地址:https://blog.csdn.net/jackgo73/article/details/132044836