-- Demo: a row removed by DELETE is still returned by dirtyread().
-- IF EXISTS keeps the script re-runnable when foo has not been created yet.
DROP TABLE IF EXISTS foo;
CREATE TABLE foo (bar bigint, baz text);
-- Turn autovacuum off for the heap and its TOAST table so the deleted row
-- version is not cleaned up before the second read.
ALTER TABLE foo SET (autovacuum_enabled = false, toast.autovacuum_enabled = false);
INSERT INTO foo (bar, baz) VALUES (1, 'Hello world');
-- First read: the row is live.
SELECT * FROM dirtyread('foo') as t(bar bigint, baz text);
-- Intentionally unfiltered: the demo deletes every row in the table.
DELETE FROM foo;
-- Second read: the deleted row is expected to still be returned.
SELECT * FROM dirtyread('foo') as t(bar bigint, baz text);

PostgreSQL 为返回多行结果的扩展函数提供了 SRF(Set Returning Function,集合返回函数)框架:函数会被执行器反复调用,每次调用返回一行,直到函数执行 SRF_RETURN_DONE 为止。
首次调用时,SRF_FIRSTCALL_INIT() 会创建 FuncCallContext 并将其保存到 fcinfo->flinfo->fn_extra 中(红色部分):

切换到 SRF 专用的跨调用内存上下文(multi_call_memory_ctx),在其中分配的数据在多次调用之间保持有效。
拿到输出元组格式,例如当前用例:
SELECT * FROM dirtyread('foo') as t(bar bigint, baz text);
会得到tupdesc:
{natts = 2, tdtypeid = 2249, tdtypmod = 0, tdrefcount = -1, constr = 0x0, attrs = 0x1523858}
attrs[0] = {attrelid = 0, attname = {data = "bar", ...
attrs[1] = {attrelid = 0, attname = {data = "baz", ...
记录输出元组格式
(gdb) p *(FuncCallContext*)fcinfo->flinfo->fn_extra
$32 = {call_cntr = 0, max_calls = 0, user_fctx = 0x0, attinmeta = 0x0, multi_call_memory_ctx = 0x1524c00, tuple_desc = 0x1523840}
修改蓝色部分:

记录用户自定义数据结构,修改紫色部分:

每次调用都通过 SRF_PERCALL_SETUP() 拿到上图中的 FuncCallContext。
SRF_RETURN_NEXT 会递增调用计数器 call_cntr 并更新返回状态(粉色部分):

#include "postgres.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "access/xlog.h"
#include "catalog/pg_type.h"
#include "common/hashfn.h"
#include "fmgr.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
/* Forward declaration of the SQL-callable entry point defined later in this file. */
Datum dirtyread(PG_FUNCTION_ARGS);
/*
 * Per-query state of dirtyread(), shared between its repeated calls.
 * It is allocated on the first call and stored in FuncCallContext->user_fctx.
 */
typedef struct dirtyread_ctx_state
{
/* Relation being scanned; opened on the first call, closed when the scan ends. */
Relation rel;
/* Tuple descriptor of rel, used to turn each scanned tuple into a Datum. */
TupleDesc desc;
/* Scan descriptor that each call advances by one tuple. */
TableScanDesc scan;
} dirtyread_ctx_state;
/* Required once per loadable module so the server can check version compatibility at load time. */
PG_MODULE_MAGIC;
/* Registers dirtyread as a function using the version-1 calling convention. */
PG_FUNCTION_INFO_V1(dirtyread);
Datum dirtyread(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
dirtyread_ctx_state *inter_call_data = NULL;
HeapTuple tuple;
if (SRF_IS_FIRSTCALL())
{
MemoryContext oldcontext;
Oid relid;
TupleDesc tupdesc;
relid = PG_GETARG_OID(0);
funcctx = SRF_FIRSTCALL_INIT();
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
inter_call_data = (dirtyread_ctx_state *)palloc(sizeof(dirtyread_ctx_state));
inter_call_data->rel = table_open(relid, AccessShareLock);
inter_call_data->desc = RelationGetDescr(inter_call_data->rel);
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");
funcctx->tuple_desc = BlessTupleDesc(tupdesc);
inter_call_data->scan = heap_beginscan(inter_call_data->rel, SnapshotAny, 0, NULL, NULL, 0);
funcctx->user_fctx = (void *)inter_call_data;
MemoryContextSwitchTo(oldcontext);
}
funcctx = SRF_PERCALL_SETUP();
inter_call_data = (dirtyread_ctx_state *)funcctx->user_fctx;
if ((tuple = heap_getnext(inter_call_data->scan, ForwardScanDirection)) != NULL)
{
SRF_RETURN_NEXT(funcctx, heap_copy_tuple_as_datum(tuple, inter_call_data->desc));
}
else
{
heap_endscan(inter_call_data->scan);
table_close(inter_call_data->rel, AccessShareLock);
SRF_RETURN_DONE(funcctx);
}
}