Related
《PostgreSQL source code (90): shared memory allocation, CreateSharedMemoryAndSemaphores》
《The Linux memory-mapping function mmap and anonymous memory blocks》
《Linux shared memory and inheritance by child processes》
The workflow of the dsm framework

In the parallel-query code, a structure placed in shared memory is published under a key:
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
associates the address of the just-initialized structure with PARALLEL_KEY_FIXED, and anyone who wants it later simply looks it up by the same key:
    fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
In practice, these pieces are combined to implement a message-passing mechanism in shared memory.
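A minimal sketch of this key/lookup round trip (the magic value, key, and payload struct below are made up for illustration, and sizing via the estimator is omitted):

#include "postgres.h"
#include "storage/shm_toc.h"

#define MY_MAGIC 0x1234abcd       /* arbitrary magic for this sketch */
#define MY_KEY   0                /* hypothetical key */

typedef struct { int value; } my_payload;

/* producer: carve a chunk out of the segment and publish it under MY_KEY */
static void
producer(void *addr, Size nbytes)
{
    shm_toc    *toc = shm_toc_create(MY_MAGIC, addr, nbytes);
    my_payload *p = shm_toc_allocate(toc, sizeof(my_payload));

    p->value = 42;
    shm_toc_insert(toc, MY_KEY, p);
}

/* consumer: map the same memory, then find the chunk again by key */
static void
consumer(void *addr)
{
    shm_toc    *toc = shm_toc_attach(MY_MAGIC, addr);
    my_payload *p = shm_toc_lookup(toc, MY_KEY, false);

    Assert(p->value == 42);
}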
In the PG code, shm_toc manages one contiguous piece of memory, called a segment. shm_toc_create takes an already-allocated address and initializes a shm_toc at its head (the segment header); user-defined data, such as the structures and buffers a message queue needs, is placed after it. The shm_toc at the head clearly exists to record the segment's bookkeeping data:
struct shm_toc
{
    uint64      toc_magic;           /* Magic number identifying this TOC */
    slock_t     toc_mutex;           /* Spinlock for mutual exclusion */
    Size        toc_total_bytes;     /* Bytes managed by this TOC */
    Size        toc_allocated_bytes; /* Bytes allocated of those managed */
    uint32      toc_nentry;          /* Number of entries in TOC */
    shm_toc_entry toc_entry[FLEXIBLE_ARRAY_MEMBER];
};
Initialization
shm_toc *
shm_toc_create(uint64 magic, void *address, Size nbytes)
{
    shm_toc    *toc = (shm_toc *) address;

    Assert(nbytes > offsetof(shm_toc, toc_entry));
    toc->toc_magic = magic;
    SpinLockInit(&toc->toc_mutex);

    /*
     * The alignment code in shm_toc_allocate() assumes that the starting
     * value is buffer-aligned.
     */
    toc->toc_total_bytes = BUFFERALIGN_DOWN(nbytes);
    toc->toc_allocated_bytes = 0;
    toc->toc_nentry = 0;

    return toc;
}
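BUFFERALIGN_DOWN rounds the size down to a buffer-alignment boundary (ALIGNOF_BUFFER, 32 bytes on typical builds); roughly, from c.h (exact form may differ by version):

#define TYPEALIGN_DOWN(ALIGNVAL,LEN)  \
    (((uintptr_t) (LEN)) & ~((uintptr_t) ((ALIGNVAL) - 1)))
#define BUFFERALIGN_DOWN(LEN)  TYPEALIGN_DOWN(ALIGNOF_BUFFER, (LEN))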
So where does the memory passed to shm_toc_create come from?

mmap: at startup, one large anonymous block is mapped in a single shot and then carved up by the server itself (a minimal sketch of that style follows).
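This is plain POSIX rather than PostgreSQL code, with a placeholder size; it only illustrates the "one big inheritable anonymous block" idea:

#include <stdio.h>
#include <sys/mman.h>

int
main(void)
{
    size_t  size = 1UL << 30;   /* one big block, sized up front */
    void   *base;

    /*
     * MAP_SHARED | MAP_ANONYMOUS: not backed by a file, and the mapping is
     * inherited across fork(), so the parent and all children share it.
     */
    base = mmap(NULL, size, PROT_READ | PROT_WRITE,
                MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (base == MAP_FAILED)
    {
        perror("mmap");
        return 1;
    }

    /* ... carve the block up with a custom allocator, then fork() ... */

    munmap(base, size);
    return 0;
}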
But if new shared memory blocks of unpredictable size are needed at run time, they clearly cannot be preallocated at startup; this is what the dsm module is for. The source tree contains a very good example for analyzing how dsm allocates memory for use as a message queue:
src/test/modules/test_shm_mq
The following walks through the code, focusing on how shared memory is allocated dynamically; the usage flow after initialization is not covered.
void
test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,
                  shm_mq_handle **output, shm_mq_handle **input)
{
    dsm_segment *seg;
    test_shm_mq_header *hdr;
    shm_mq     *outq = NULL;    /* placate compiler */
    shm_mq     *inq = NULL;     /* placate compiler */
    worker_state *wstate;

    /* Set up a dynamic shared memory segment. */
    setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);
    *segp = seg;

    /* Register background workers. */
    wstate = setup_background_workers(nworkers, seg);

    /* Attach the queues. */
    *output = shm_mq_attach(outq, seg, wstate->handle[0]);
    *input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);

    /* Wait for workers to become ready. */
    wait_for_workers_to_become_ready(wstate, hdr);

    /*
     * Once we reach this point, all workers are ready.  We no longer need to
     * kill them if we die; they'll die on their own as the message queues
     * shut down.
     */
    cancel_on_dsm_detach(seg, cleanup_background_workers,
                         PointerGetDatum(wstate));
    pfree(wstate);
}
static void
setup_dynamic_shared_memory(int64 queue_size, int nworkers,
                            dsm_segment **segp, test_shm_mq_header **hdrp,
                            shm_mq **outp, shm_mq **inp)
{
    shm_toc_estimator e;
    int         i;
    Size        segsize;
    dsm_segment *seg;
    shm_toc    *toc;
    test_shm_mq_header *hdr;
Use the estimator to figure out how much shared memory is needed (so you don't have to compute it yourself :). The estimator's interface is used like this; the principle is simply to record the number of keys and the total size of the chunks (see the paraphrase after this chunk).
    shm_toc_initialize_estimator(&e);
    shm_toc_estimate_chunk(&e, sizeof(test_shm_mq_header));
    for (i = 0; i <= nworkers; ++i)
        shm_toc_estimate_chunk(&e, (Size) queue_size);
    shm_toc_estimate_keys(&e, 2 + nworkers);
    segsize = shm_toc_estimate(&e);
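Internally the estimator is just two counters, and shm_toc_estimate adds up the header, the entry array, and the chunk space; a paraphrase of the shm_toc.h/shm_toc.c logic (details may differ slightly across versions):

typedef struct
{
    Size    space_for_chunks;   /* BUFFERALIGN'd sum of the chunk sizes */
    Size    number_of_keys;     /* how many toc_entry slots will be needed */
} shm_toc_estimator;

/* total = segment header + entry array + chunk space, buffer-aligned */
Size
shm_toc_estimate(shm_toc_estimator *e)
{
    Size    sz;

    sz = offsetof(shm_toc, toc_entry);
    sz = add_size(sz, mul_size(e->number_of_keys, sizeof(shm_toc_entry)));
    sz = add_size(sz, e->space_for_chunks);
    return BUFFERALIGN(sz);
}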
With the estimate done, dsm_create allocates the new shared memory. The actual OS allocation happens via dsm_create → dsm_impl_op → dsm_impl_mmap (or dsm_impl_posix and friends, depending on dynamic_shared_memory_type); we won't go deeper here. The returned seg is a dsm_segment, representing one dynamic shared memory segment (internally there is a whole dsm_control management mechanism; read the dsm_create code if you want to dig in). For now it is enough to know that this is a block of shared memory mapped at exactly the requested size.
    /* Create the shared memory segment and establish a table of contents. */
    seg = dsm_create(shm_toc_estimate(&e), 0);
At the starting address of the shared memory, a shm_toc structure is placed as the header and index for the data that follows:
---------------- start address of the shared memory
struct shm_toc
{
    uint64      toc_magic;           /* Magic number identifying this TOC */
    slock_t     toc_mutex;           /* Spinlock for mutual exclusion */
    Size        toc_total_bytes;     /* Bytes managed by this TOC */
    Size        toc_allocated_bytes; /* Bytes allocated of those managed */
    uint32      toc_nentry;          /* Number of entries in TOC */
    shm_toc_entry toc_entry[FLEXIBLE_ARRAY_MEMBER];
};
---------------- chunks are allocated starting from the highest address: the
                 toc_entry index array grows from low to high, the data grows
                 from high to low, just like a heap page
toc_entry[0] ---------------\
----------------            |
toc_entry[1]                |
----------------            |
toc_entry[2] ---------\     |
----------------      |     |
data <----------------/     |
...                         |
data                        |
...                         |
data <----------------------/
----------------
    toc = shm_toc_create(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg),
                         segsize);
Allocate a chunk of user-defined data space and place a test_shm_mq_header structure in it.
    hdr = shm_toc_allocate(toc, sizeof(test_shm_mq_header));
    SpinLockInit(&hdr->mutex);
    hdr->workers_total = nworkers;
    hdr->workers_attached = 0;
    hdr->workers_ready = 0;
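shm_toc_allocate itself carves the chunk from the high end of the segment, which is what produces the layout drawn above; a simplified paraphrase (spinlock handling and error detail trimmed):

void *
shm_toc_allocate(shm_toc *toc, Size nbytes)
{
    Size    total_bytes = toc->toc_total_bytes;
    Size    toc_bytes;

    nbytes = BUFFERALIGN(nbytes);

    /* space already used: header plus entry array growing from the low
     * end, plus chunks growing down from the high end */
    toc_bytes = offsetof(shm_toc, toc_entry)
        + toc->toc_nentry * sizeof(shm_toc_entry)
        + toc->toc_allocated_bytes;
    if (toc_bytes + nbytes > total_bytes)
        elog(ERROR, "out of shared memory");

    toc->toc_allocated_bytes += nbytes;
    /* the new chunk sits just below everything allocated so far */
    return ((char *) toc) + (total_bytes - toc->toc_allocated_bytes);
}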
With the structure initialized, insert it. Its space was already carved out by shm_toc_allocate above, so this shm_toc_insert call mainly fills in the toc_entry index array: concretely, the key and offset of toc_entry[0]. Next time, key 0 alone is enough to find the offset (a simplified version follows the call).

    shm_toc_insert(toc, 0, hdr);
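Each index slot is just a (key, offset) pair, and insert only appends one such pair; simplified from shm_toc.c:

typedef struct shm_toc_entry
{
    uint64      key;            /* key identifying this entry */
    Size        offset;         /* location of the chunk, relative to toc */
} shm_toc_entry;

/* simplified: publish an already-allocated chunk under a key */
void
shm_toc_insert(shm_toc *toc, uint64 key, void *address)
{
    Size    offset = ((char *) address) - (char *) toc;

    SpinLockAcquire(&toc->toc_mutex);
    toc->toc_entry[toc->toc_nentry].key = key;
    toc->toc_entry[toc->toc_nentry].offset = offset;
    /* the real code issues a write barrier here, so readers never see
     * toc_nentry bumped before the entry contents are visible */
    toc->toc_nentry++;
    SpinLockRelease(&toc->toc_mutex);
}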
    /* Set up one message queue per worker, plus one. */
    for (i = 0; i <= nworkers; ++i)
    {
        shm_mq     *mq;

Each worker gets one queue of queue_size bytes, allocated and inserted with the same allocate + insert pattern as above.

        mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),
                           (Size) queue_size);
        shm_toc_insert(toc, i + 1, mq);
        if (i == 0)
        {

Just pass in whatever configuration the mq user needs; the structure is still usable after shm_toc_insert, because the address returned by allocate points directly into shared memory and stays valid for the current process.

            /* We send messages to the first queue. */
            shm_mq_set_sender(mq, MyProc);
            *outp = mq;
        }
        if (i == nworkers)
        {
            /* We receive messages from the last queue. */
            shm_mq_set_receiver(mq, MyProc);
            *inp = mq;
        }
    }
    /* Return results to caller. */
    *segp = seg;
    *hdrp = hdr;
}
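For completeness, the worker side of this handshake (condensed from test_shm_mq_main in worker.c): the dsm handle arrives through bgw_main_arg, and the worker maps the same segment and rebuilds its pointers through the toc.

/* condensed from src/test/modules/test_shm_mq/worker.c */
void
test_shm_mq_main(Datum main_arg)
{
    dsm_segment *seg;
    shm_toc    *toc;
    test_shm_mq_header *hdr;

    /* map the segment the leader created, using the handle it passed */
    seg = dsm_attach(DatumGetUInt32(main_arg));
    if (seg == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("unable to map dynamic shared memory segment")));

    /* verify the magic number and locate the index at the segment head */
    toc = shm_toc_attach(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg));

    /* key 0 finds the header that setup_dynamic_shared_memory inserted */
    hdr = shm_toc_lookup(toc, 0, false);

    /* ... attach to the queues under keys 1..nworkers+1, then loop ... */
}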
static worker_state *
setup_background_workers(int nworkers, dsm_segment *seg)
{
    MemoryContext oldcontext;
    BackgroundWorker worker;
    worker_state *wstate;
    int         i;

    oldcontext = MemoryContextSwitchTo(CurTransactionContext);
The goal of setup_background_workers is to build a worker_state; its memory layout is:
------------------------------
typedef struct
{
    int         nworkers;
    BackgroundWorkerHandle *handle[FLEXIBLE_ARRAY_MEMBER]; /* one per worker */
} worker_state;

/* each element points at a BackgroundWorkerHandle: */
struct BackgroundWorkerHandle
{
    int         slot;
    uint64      generation;
};
------------------------------
handle[0]
------------------------------
handle[1]
------------------------------
handle[2]
------------------------------
    wstate = MemoryContextAlloc(TopTransactionContext,
                                offsetof(worker_state, handle) +
                                sizeof(BackgroundWorkerHandle *) * nworkers);
    wstate->nworkers = 0;
    on_dsm_detach(seg, cleanup_background_workers,
                  PointerGetDatum(wstate));
Fill in the parts common to all the workers (the BackgroundWorker fields are sketched after this chunk):

    /* Configure a worker. */
    memset(&worker, 0, sizeof(worker));
    worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
    worker.bgw_start_time = BgWorkerStart_ConsistentState;
    worker.bgw_restart_time = BGW_NEVER_RESTART;
    sprintf(worker.bgw_library_name, "test_shm_mq");
    sprintf(worker.bgw_function_name, "test_shm_mq_main");
    snprintf(worker.bgw_type, BGW_MAXLEN, "test_shm_mq");
    worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
    /* set bgw_notify_pid, so we can detect if the worker stops */
    worker.bgw_notify_pid = MyProcPid;
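The fields being filled in belong to BackgroundWorker; roughly, from bgworker.h (field widths vary across versions):

typedef struct BackgroundWorker
{
    char        bgw_name[BGW_MAXLEN];
    char        bgw_type[BGW_MAXLEN];
    int         bgw_flags;           /* e.g. BGWORKER_SHMEM_ACCESS */
    BgWorkerStartTime bgw_start_time;
    int         bgw_restart_time;    /* in seconds, or BGW_NEVER_RESTART */
    char        bgw_library_name[BGW_MAXLEN];
    char        bgw_function_name[BGW_MAXLEN];
    Datum       bgw_main_arg;        /* here: the dsm segment handle */
    char        bgw_extra[BGW_EXTRALEN];
    pid_t       bgw_notify_pid;      /* SIGUSR1 this PID on start/stop */
} BackgroundWorker;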
With the common part in place, register each worker, filling in the per-worker pieces:
    /* Register the workers. */
    for (i = 0; i < nworkers; ++i)
    {

RegisterDynamicBackgroundWorker takes an unused slot from the BackgroundWorkerData->slot array (a bgworker-managed global recording which workers are in use, one slot per worker), records the slot's id into handle[i], and returns.

        if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i]))
            ereport(ERROR,
                    (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
                     errmsg("could not register background process"),
                     errhint("You may need to increase max_worker_processes.")));
        ++wstate->nworkers;
    }
    /* All done. */
    MemoryContextSwitchTo(oldcontext);
    return wstate;
}
Back in test_shm_mq_setup, the remaining steps:

void
test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,
                  shm_mq_handle **output, shm_mq_handle **input)
{
    dsm_segment *seg;
    test_shm_mq_header *hdr;
    shm_mq     *outq = NULL;    /* placate compiler */
    shm_mq     *inq = NULL;     /* placate compiler */
    worker_state *wstate;

    /* Set up a dynamic shared memory segment. */
    setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);
    *segp = seg;

    /* Register background workers. */
    wstate = setup_background_workers(nworkers, seg);
Now the third step: shm_mq_attach builds a shm_mq_handle, which records the dsm segment, the mq, and the bgworker handle.
    /* Attach the queues. */
    *output = shm_mq_attach(outq, seg, wstate->handle[0]);
    *input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);
Check whether the workers are ready, which is done by fetching each worker's PID (see the sketch after this function).
    wait_for_workers_to_become_ready(wstate, hdr);

    cancel_on_dsm_detach(seg, cleanup_background_workers,
                         PointerGetDatum(wstate));
    pfree(wstate);
}
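How readiness is detected (condensed from test.c): the leader polls hdr->workers_ready and, via GetBackgroundWorkerPid, notices any worker that died before attaching.

/* condensed from wait_for_workers_to_become_ready / check_worker_status */
static bool
check_worker_status(worker_state *wstate)
{
    int     n;

    /* if any worker is already gone, the handshake can never complete */
    for (n = 0; n < wstate->nworkers; ++n)
    {
        BgwHandleStatus status;
        pid_t       pid;

        status = GetBackgroundWorkerPid(wstate->handle[n], &pid);
        if (status == BGWH_STOPPED || status == BGWH_POSTMASTER_DIED)
            return false;
    }
    return true;
}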
At this point, the memory-initialization work is done. From here on, the mq API can be called directly to send messages; this article does not cover that further.