This post makes a small modification to the open-source PostgreSQL source so that transaction IDs are acquired in batches, and compares performance before and after.
A weekend for-fun project; feel free to reuse the code.
!!! Caveat: the modification introduces concurrency problems that lead to data inconsistency. Much of the ProcArray and snapshot logic is designed around XIDs being assigned in strictly increasing order, and adapting it to the XID holes and jumps this change creates would take a great deal of further work.
(The performance numbers have little standalone value; they are only meaningful as a before/after comparison.)
On a small 16-core test machine under a 128-connection stress test, with all PostgreSQL durability settings switched to asynchronous writes, the bottleneck moves to transaction ID generation.
Baseline: write-only pgbench at 128 connections for 120 seconds, sampling the number of sessions waiting on XidGen once per second: the mean is around 60, at QPS = 80589.
-- settings
fsync = off
synchronous_commit = off
autovacuum = off
create table testbl1(c1 int, c2 int, c3 int, c4 text, c5 text);
-- in.sql
insert into testbl1 values (12,123,456,'avzdsqerqwadsf','asdfgerrerg');
pgbench -c 128 -j 128 -n -r -P 1 -T 120 -f ./in.sql
for i in {1..60};do psql -c "select count(*) from pg_stat_activity where wait_event='XidGen'" -A -t; sleep 1;done;
0
12
100
41
0
50
45
64
94
98
97
27
...
...
Because this is an experimental project, the change will crash logical replication and other code paths; that is out of scope here.
The change, per backend: acquire transaction IDs N at a time instead of one at a time; everything else stays the same.
Key modification point:
XID acquisition goes from every backend bumping the shared counter once per XID to one lock holder reserving a whole batch at a time, which it then consumes locally. A standalone sketch of the pattern follows; the actual patch appears further below.
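To show the pattern in isolation, here is a minimal self-contained sketch in plain C with pthreads; the names (get_id, shared_next, BATCH) are illustrative, not PostgreSQL code, but the control flow mirrors the patch below:

/*
 * Minimal sketch of batched ID allocation: a shared counter protected by a
 * mutex, from which each thread takes 1 + BATCH ids per lock acquisition
 * and then serves the cached ids lock-free.
 */
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

#define BATCH 5

static pthread_mutex_t idlock = PTHREAD_MUTEX_INITIALIZER;
static uint64_t shared_next = 1000;		/* like ShmemVariableCache->nextXid */

static __thread uint64_t local_next;	/* like localTransactionId */
static __thread int local_cnt;			/* like localTransactionIdCnt */

static uint64_t
get_id(void)
{
	uint64_t	id;

	if (local_cnt > 0)
	{
		/* fast path: consume one id from the thread-local window, no lock */
		id = local_next++;
		local_cnt--;
		return id;
	}

	/* slow path: take the lock once, use one id now, cache BATCH more */
	pthread_mutex_lock(&idlock);
	id = shared_next;
	local_next = id + 1;
	local_cnt = BATCH;
	shared_next = id + 1 + BATCH;
	pthread_mutex_unlock(&idlock);
	return id;
}

int
main(void)
{
	/* single-threaded demo: prints 1000..1007, refilling after 1005 */
	for (int i = 0; i < 8; i++)
		printf("%llu\n", (unsigned long long) get_id());
	return 0;
}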
Conclusion: QPS improves slightly (this is very environment-dependent; CPU sharing on this machine performs poorly).
QPS comparison
Batch of 5 XIDs vs. one XID at a time: XidGen lock-event comparison
XidGen waits drop noticeably; the bottleneck scatters across ProcArrayGroupUpdate, XactGroupUpdate, and similar wait events.
Batch of 64 XIDs vs. one XID at a time: XidGen lock-event comparison
XidGen waits are no longer observable; the bottleneck again scatters across ProcArrayGroupUpdate, XactGroupUpdate, and similar wait events.
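In the patched function below, the batch size is simply the hardcoded bound of the reservation loop (5 here; 64 for the second run). A compile-time knob would look like this; XID_BATCH_SIZE is a hypothetical name, not a real PostgreSQL setting:

/* Hypothetical compile-time knob, not a PostgreSQL GUC: how many extra XIDs
 * a backend caches per XidGenLock acquisition.  The reservation loop below
 * would use this in place of the literal 5 (or 64). */
#define XID_BATCH_SIZE 5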
/* Backend-local window of pre-allocated XIDs */
static FullTransactionId localTransactionId = {0};	/* next cached XID */
static int	localTransactionIdCnt = 0;	/* cached XIDs remaining */

FullTransactionId
GetNewTransactionId(bool isSubXact)
{
	FullTransactionId full_xid;
	TransactionId xid;
	bool		needlock = false;

	/*
	 * Workers synchronize transaction state at the beginning of each parallel
	 * operation, so we can't account for new XIDs after that point.
	 */
	if (IsInParallelMode())
		elog(ERROR, "cannot assign TransactionIds during a parallel operation");

	/*
	 * During bootstrap initialization, we return the special bootstrap
	 * transaction id.
	 */
	if (IsBootstrapProcessingMode())
	{
		Assert(!isSubXact);
		MyProc->xid = BootstrapTransactionId;
		ProcGlobal->xids[MyProc->pgxactoff] = BootstrapTransactionId;
		return FullTransactionIdFromEpochAndXid(0, BootstrapTransactionId);
	}
	/* safety check, we should never get this far in a HS standby */
	if (RecoveryInProgress())
		elog(ERROR, "cannot assign TransactionIds during recovery");

	if (localTransactionIdCnt > 0)
	{
		/* Fast path: hand out an XID from the local window, lock-free */
		Assert(localTransactionId.value > 0);
		full_xid = localTransactionId;
		xid = XidFromFullTransactionId(full_xid);
		FullTransactionIdAdvance(&localTransactionId);
		localTransactionIdCnt--;
		needlock = false;
	}
	else
	{
		FullTransactionId prevTransactionId = {0};
		TransactionId prevXid;

		LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
		needlock = true;

		/* [1] take the current shared value (say 1000) and use it now */
		localTransactionId = full_xid = ShmemVariableCache->nextXid;
		xid = XidFromFullTransactionId(full_xid);
		/* [2] the local window will start at 1001 */
		FullTransactionIdAdvance(&localTransactionId);
		/* [3] advance the shared counter to 1001 */
		FullTransactionIdAdvance(&ShmemVariableCache->nextXid);

		/* [4] reserve 5 more XIDs for the local window; the shared counter
		 * ends at 1006, which is what other backends will see */
		for (int i = 0; i < 5; i++)
		{
			prevTransactionId = ShmemVariableCache->nextXid;
			FullTransactionIdAdvance(&ShmemVariableCache->nextXid);
			/* [5] ends at 5 (local window: 1001 1002 1003 1004 1005) */
			localTransactionIdCnt++;
		}

		/* [6] extend the SLRUs once, for the last reserved XID (1005) */
		prevXid = XidFromFullTransactionId(prevTransactionId);
		ExtendCLOG(prevXid);
		ExtendCommitTs(prevXid);
		ExtendSUBTRANS(prevXid);
	}
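	/*
	 * Note: of the three Extend* calls above, only ExtendCLOG (modified
	 * below) zeroes any page it has not seen before.  ExtendCommitTs and
	 * ExtendSUBTRANS still act only on the exact first XID of a page, a
	 * value that batched allocation usually jumps over, so their pages can
	 * go uninitialized.  That is one of the crashes this experiment accepts.
	 */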
	Assert(localTransactionIdCnt >= 0);

	if (!isSubXact)
	{
		Assert(ProcGlobal->subxidStates[MyProc->pgxactoff].count == 0);
		Assert(!ProcGlobal->subxidStates[MyProc->pgxactoff].overflowed);
		Assert(MyProc->subxidStatus.count == 0);
		Assert(!MyProc->subxidStatus.overflowed);

		/*
		 * On the slow path, LWLockRelease below acts as a barrier; the
		 * lock-free fast path has no equivalent (another of this
		 * experiment's known races).
		 */
		MyProc->xid = xid;
		ProcGlobal->xids[MyProc->pgxactoff] = xid;
	}
	else
	{
		XidCacheStatus *substat = &ProcGlobal->subxidStates[MyProc->pgxactoff];
		int			nxids = MyProc->subxidStatus.count;

		Assert(substat->count == MyProc->subxidStatus.count);
		Assert(substat->overflowed == MyProc->subxidStatus.overflowed);

		if (nxids < PGPROC_MAX_CACHED_SUBXIDS)
		{
			MyProc->subxids.xids[nxids] = xid;
			pg_write_barrier();
			MyProc->subxidStatus.count = substat->count = nxids + 1;
		}
		else
			MyProc->subxidStatus.overflowed = substat->overflowed = true;
	}

	if (needlock)
		LWLockRelease(XidGenLock);

	// elog(WARNING, "[" UINT64_FORMAT "](%d)->[" UINT64_FORMAT "]",
	//	 localTransactionId.value, localTransactionIdCnt, full_xid.value);

	return full_xid;
}
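To make the caveat at the top concrete, here is how XID assignment interleaves between two backends after the change (hypothetical XID values, batch of 5):

backend A: GetNewTransactionId() -> 1000  (caches 1001..1005, shared nextXid = 1006)
backend B: GetNewTransactionId() -> 1006  (caches 1007..1011, shared nextXid = 1012)
backend A: GetNewTransactionId() -> 1001  (served lock-free from A's window)

Globally, XIDs are no longer handed out in increasing order; this is exactly the hole/jump behavior that breaks the ProcArray and snapshot assumptions.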
#define CLOG_MAX_PAGES (UINT_MAX / CLOG_XACTS_PER_PAGE)	/* 131071 */

/* Backend-local: remembers which CLOG pages this backend has zeroed */
static bool ClogPageMark[CLOG_MAX_PAGES] = {false};

void
ExtendCLOG(TransactionId newestXact)
{
	int			pageno = TransactionIdToPage(newestXact);

	/*
	 * The original code only did work at the first XID of a page, a value
	 * batched allocation usually skips, so instead zero each page the
	 * first time this backend touches it.
	 */
	// if (TransactionIdToPgIndex(newestXact) != 0 &&
	//     !TransactionIdEquals(newestXact, FirstNormalTransactionId))
	//     return;
	if (ClogPageMark[pageno])
		return;

	LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
	/* Zero the page and make an XLOG entry about it */
	ZeroCLOGPage(pageno, true);
	LWLockRelease(XactSLRULock);

	ClogPageMark[pageno] = true;
}
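Because ClogPageMark is backend-local, every backend zeroes a page the first time it touches it, which can wipe commit bits already written by others. One way out would be a single shared copy of the marks. A minimal sketch, assuming it gets wired into the server's shared-memory setup (sizing plus a call during CreateSharedMemoryAndSemaphores); the function name here is made up:

#include "postgres.h"
#include "storage/shmem.h"

static bool *SharedClogPageMark;

/* Hypothetical init function: allocate one cluster-wide mark array */
void
ClogPageMarkShmemInit(void)
{
	bool		found;

	SharedClogPageMark = (bool *)
		ShmemInitStruct("ClogPageMark", CLOG_MAX_PAGES * sizeof(bool), &found);
	if (!found)
		memset(SharedClogPageMark, 0, CLOG_MAX_PAGES * sizeof(bool));
}

ExtendCLOG would then test and set SharedClogPageMark while holding XactSLRULock, so the check-and-zero becomes atomic across backends.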