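Any discussion of logical decoding has to start with streaming replication. One of the most important uses of streaming replication is hot standby, where the primary and standby databases are synchronized at the physical level. In practice, however, physical replication alone cannot meet every business requirement, which is why logical replication is also provided.
Logical replication addresses several problems that physical replication cannot solve, for example: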
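Replicating a specific database or a subset of tables
Consolidating data from multiple database instances into a single target database
Distributing data from one database to several different databases
Replicating between different versions
Synchronizing tables between databases with different names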
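The key to logical replication is logical decoding: translating the contents of the WAL into a specific format such as JSON or SQL. The pg_recvlogical client tool is a typical application of logical decoding: it decodes the WAL into JSON and writes the result to a specified file or to standard output (stdout).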
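Logical decoding requires changing wal_level. In MogDB, wal_level accepts the following values:
replica writes enough data to support WAL archiving and replication, including running read-only queries on a standby server. minimal removes all logging except the information needed to recover from a crash or an immediate shutdown. Finally, logical adds the information needed to support logical decoding. Each level includes the information logged at all lower levels. This parameter can only be set at server start. (The comment in the sample configuration lists the accepted values as minimal, archive, hot_standby, and logical.)
wal_level = logical # minimal, archive, hot_standby or logical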
# (change requires restart)
max_wal_senders = 10 # max number of walsender processes
# (change requires restart)
wal_keep_segments = 16 # in logfile segments, 16MB each; 0 disables
max_replication_slots = 8 # max number of replication slots
Note: MogDB must be restarted after changing the configuration.
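As a quick sanity check before restarting, the settings above can be verified with a short script. The following is a minimal sketch, assuming the configuration is available as text. `parse_conf` and `logical_decoding_problems` are hypothetical helper names, not part of MogDB; the "at least 1" thresholds are assumptions of the sketch, and the parser ignores quoting subtleties such as a `#` inside a quoted value.

```python
SAMPLE_CONF = """
wal_level = logical            # minimal, archive, hot_standby or logical
max_wal_senders = 10           # max number of walsender processes
wal_keep_segments = 16         # in logfile segments, 16MB each; 0 disables
max_replication_slots = 8      # max number of replication slots
"""


def parse_conf(text):
    """Parse 'name = value  # comment' lines into a dict of strings."""
    settings = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop trailing comments
        if "=" not in line:
            continue  # blank or comment-only line
        name, value = line.split("=", 1)
        settings[name.strip()] = value.strip().strip("'")
    return settings


def logical_decoding_problems(settings):
    """Return a list of problems; an empty list means the sketch's checks pass."""
    problems = []
    if settings.get("wal_level") != "logical":
        problems.append("wal_level must be logical")
    for name in ("max_wal_senders", "max_replication_slots"):
        if int(settings.get(name, "0")) < 1:
            problems.append(f"{name} must be at least 1")
    return problems


if __name__ == "__main__":
    print(logical_decoding_problems(parse_conf(SAMPLE_CONF)) or "settings look fine")
```

The check only reads text, so it can be run against a copy of the configuration file without touching the server.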
MogDB=#CREATE ROLE pub_sub_user WITH SYSADMIN REPLICATION LOGIN PASSWORD 'pub_sub@123';
NOTICE: The encrypted password contains MD5 ciphertext, which is not secure.
CREATE ROLE
MogDB=#
Note: the user needs the SYSADMIN and REPLICATION privileges.
# replication privilege.
#local replication omm trust
#host replication omm 127.0.0.1/32 trust
#host replication omm ::1/128 trust
host all all 0.0.0.0/0 md5
host replication all 0.0.0.0/0 md5
pg_recvlogical can be used as a tool for observing the concrete changes recorded in the WAL.

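As shown in the figure above:
CREATE_REPLICATION_SLOT creates the logical replication slot.
START_REPLICATION loops over the WAL stream and decodes it.
DROP_REPLICATION_SLOT destroys the replication slot.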
The default plugin is mppdb_decoding: plugin = pg_strdup("mppdb_decoding");
--plugin selects the output plugin, for example: --plugin=wal2json
With an output/decoding plugin, pg_recvlogical can emit WAL changes in a specific format (text, JSON).
pg_recvlogical receives logical change stream.
Usage:
pg_recvlogical [OPTION]...
Options:
-f, --file=FILE receive log into this file. - for stdout
-n, --no-loop do not loop on connection lost
-v, --verbose output verbose messages
-V, --version output version information, then exit
-?, --help show this help, then exit
Connection options:
-d, --dbname=DBNAME database to connect to
-h, --host=HOSTNAME database server host or socket directory
-p, --port=PORT database server port number
-U, --username=NAME connect as specified database user
-w, --no-password never prompt for password
-W, --password force password prompt (should happen automatically)
Replication options:
-F --fsync-interval=INTERVAL
frequency of syncs to the output file (in seconds, defaults to 10)
-o, --option=NAME[=VALUE]
Specify option NAME with optional value VAL, to be passed
to the output plugin
-P, --plugin=PLUGIN use output plugin PLUGIN (defaults to mppdb_decoding)
-s, --status-interval=INTERVAL
time between status packets sent to server (in seconds, defaults to 10)
-S, --slot=SLOT use existing replication slot SLOT instead of starting a new one
-I, --startpos=PTR Where in an existing slot should the streaming start
-r, --raw parallel decoding output raw results without converting to text format
Action to be performed:
--create create a new replication slot (for the slotname see --slot)
--start start streaming in a replication slot (for the slotname see --slot)
--drop drop the replication slot (for the slotname see --slot)
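The three actions in the help text map onto the create, stream, and drop steps of a slot's lifecycle. As a minimal sketch, the command lines can be assembled programmatically. `recvlogical_argv` is a hypothetical helper, and it uses only flags that appear in the help output above; it builds the argument list but does not run anything.

```python
import shlex


def recvlogical_argv(action, slot, dbname, plugin=None, outfile="-"):
    """Build the argument list for one pg_recvlogical action."""
    if action not in ("create", "start", "drop"):
        raise ValueError(f"unknown action: {action}")
    argv = ["pg_recvlogical", f"--{action}", "-S", slot, "-d", dbname]
    if action == "create" and plugin is not None:
        # Without --plugin the tool falls back to its default plugin.
        argv.append(f"--plugin={plugin}")
    if action == "start":
        argv += ["-f", outfile]  # "-" means stdout
    return argv


if __name__ == "__main__":
    for step in ("create", "start", "drop"):
        print(shlex.join(recvlogical_argv(step, "test_slot", "postgres")))
```

Returning a list rather than a string keeps the result safe to pass to `subprocess.run` without shell quoting concerns.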
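The three commands above (CREATE_REPLICATION_SLOT, START_REPLICATION, DROP_REPLICATION_SLOT) are parsed by the lexer and grammar in src/backend/replication/repl_scanner.l and src/gausskernel/storage/replication/repl_gram.y.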

/* CREATE_REPLICATION_SLOT SLOT slot [%X/%X] */
create_replication_slot:
    /* CREATE_REPLICATION_SLOT SLOT slot PHYSICAL [init_slot_lsn] */
    K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL RECPTR
        {
            CreateReplicationSlotCmd *cmd;
            cmd = makeNode(CreateReplicationSlotCmd);
            cmd->kind = REPLICATION_KIND_PHYSICAL;
            cmd->slotname = $2;
            cmd->init_slot_lsn = $4;
            $$ = (Node *) cmd;
        }
    /* CREATE_REPLICATION_SLOT slot LOGICAL plugin */
    | K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT
        {
            CreateReplicationSlotCmd *cmd;
            cmd = makeNode(CreateReplicationSlotCmd);
            cmd->kind = REPLICATION_KIND_LOGICAL;
            cmd->slotname = $2;
            cmd->plugin = $4;
            $$ = (Node *) cmd;
        }
    ;

/* DROP_REPLICATION_SLOT slot */
drop_replication_slot:
    K_DROP_REPLICATION_SLOT IDENT
        {
            DropReplicationSlotCmd *cmd;
            cmd = makeNode(DropReplicationSlotCmd);
            cmd->slotname = $2;
            $$ = (Node *) cmd;
        }
    ;
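To make the grammar rules above easier to follow, here is a toy Python parser that mirrors the three productions shown. It is only an illustration: the real parser is generated from repl_gram.y, and the whitespace tokenization, the exact-uppercase keywords, the hexadecimal pattern for RECPTR, and the dictionary layout are all assumptions of this sketch (quoted identifiers are not handled).

```python
import re

# Assumed shape of a RECPTR token: two hexadecimal numbers separated by "/".
RECPTR = re.compile(r"^[0-9A-Fa-f]+/[0-9A-Fa-f]+$")


def parse_replication_command(text):
    """Parse a CREATE_REPLICATION_SLOT or DROP_REPLICATION_SLOT command."""
    tokens = text.split()
    if len(tokens) == 4 and tokens[0] == "CREATE_REPLICATION_SLOT":
        _, slot, kind, arg = tokens
        if kind == "PHYSICAL" and RECPTR.match(arg):
            # Mirrors: K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL RECPTR
            return {"cmd": "create", "kind": "physical",
                    "slotname": slot, "init_slot_lsn": arg}
        if kind == "LOGICAL":
            # Mirrors: K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT
            return {"cmd": "create", "kind": "logical",
                    "slotname": slot, "plugin": arg}
    if len(tokens) == 2 and tokens[0] == "DROP_REPLICATION_SLOT":
        # Mirrors: K_DROP_REPLICATION_SLOT IDENT
        return {"cmd": "drop", "slotname": tokens[1]}
    raise ValueError(f"syntax error: {text!r}")


if __name__ == "__main__":
    print(parse_replication_command("CREATE_REPLICATION_SLOT test_slot LOGICAL mppdb_decoding"))
    print(parse_replication_command("DROP_REPLICATION_SLOT test_slot"))
```

Each branch fills the same fields the grammar actions assign (slotname, plugin, init_slot_lsn), which makes the correspondence between the command text and the resulting command node explicit.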
Every output plugin must implement the _PG_output_plugin_init function along with the required callback functions.

The set of callbacks:
/*
 * Output plugin callbacks
 */
typedef struct OutputPluginCallbacks {
    LogicalDecodeStartupCB startup_cb;
    LogicalDecodeBeginCB begin_cb;
    LogicalDecodeChangeCB change_cb;
    LogicalDecodeCommitCB commit_cb;
    LogicalDecodeAbortCB abort_cb;
    LogicalDecodePrepareCB prepare_cb;
    LogicalDecodeShutdownCB shutdown_cb;
    LogicalDecodeFilterByOriginCB filter_by_origin_cb;
} OutputPluginCallbacks;
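The order in which these callbacks fire for a committed transaction can be illustrated with a toy model. This is a sketch under stated assumptions, not the mppdb_decoding implementation: `ToyPlugin` and `replay` are hypothetical, the method names merely echo the fields of OutputPluginCallbacks, the argument lists are simplified, and abort_cb, prepare_cb, and filter_by_origin_cb are left out.

```python
import json


class ToyPlugin:
    """Hypothetical stand-in for an output plugin; collects text lines."""

    def startup_cb(self):
        self.lines = []  # per-session output buffer

    def begin_cb(self, xid):
        self.lines.append(f"BEGIN {xid}")

    def change_cb(self, table, op, row):
        # One JSON document per changed row (simplified field set).
        self.lines.append(json.dumps({
            "table_name": table,
            "op_type": op,
            "columns_name": list(row),
            "columns_val": [str(v) for v in row.values()],
        }))

    def commit_cb(self, xid):
        self.lines.append(f"COMMIT {xid}")

    def shutdown_cb(self):
        pass  # nothing to release in the toy model


def replay(plugin, transactions):
    """Drive the callbacks: startup, then begin/change/commit per transaction."""
    plugin.startup_cb()
    for xid, changes in transactions:
        plugin.begin_cb(xid)
        for table, op, row in changes:
            plugin.change_cb(table, op, row)
        plugin.commit_cb(xid)
    plugin.shutdown_cb()
    return plugin.lines


if __name__ == "__main__":
    txns = [(63063, [("public.pub_sub", "INSERT", {"i": 24, "name": "demo"})])]
    for line in replay(ToyPlugin(), txns):
        print(line)
```

The point of the model is the sequencing: one begin, one change call per modified row, and one commit, which matches the BEGIN / change / COMMIT shape of the decoded output shown later in this article.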
The main callbacks include the following (using mppdb_decoding as the example):
pg_recvlogical --create -S test_slot -d postgres

pg_recvlogical --start -S test_slot -d postgres -f -
-- "-f -" sends the output to stdout (the screen)

Note: pg_recvlogical checks the replication slot for updates continuously, every 5 seconds.
MogDB=#select * from pg_logical_slot_get_changes('test_slot', NULL, NULL);
location | xid | data
----------+-----+------
(0 rows)
MogDB=#insert into pub_sub (name) values('test pg_recvlogical 01');
INSERT 0 1
MogDB=#select * from pg_logical_slot_get_changes('test_slot', NULL, NULL);
location | xid | data
-----------+-------+-------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------
0/48C60D8 | 63062 | BEGIN 63062
0/48C60D8 | 63062 | {"table_name":"public.pub_sub","op_type":"INSERT","columns_name":["i","name"],"columns_type":["integer","character varying"],"columns_val":["23"
,"'test pg_recvlogical 01'"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]}
0/48C62B8 | 63062 | COMMIT 63062 (at 2022-09-01 11:14:24.539429+00) CSN 47645
(3 rows)
MogDB=#select * from pg_logical_slot_peek_changes('test_slot', NULL, NULL);
location | xid | data
----------+-----+------
(0 rows)
MogDB=#insert into pub_sub (name) values('test pg_recvlogical 02');
INSERT 0 1
MogDB=#select * from pg_logical_slot_peek_changes('test_slot', NULL, NULL);
ERROR: replication slot "test_slot" is already active
MogDB=#select * from pg_logical_slot_peek_changes('test_slot', NULL, NULL);
location | xid | data
-----------+-------+-------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------
0/48C6548 | 63063 | BEGIN 63063
0/48C6548 | 63063 | {"table_name":"public.pub_sub","op_type":"INSERT","columns_name":["i","name"],"columns_type":["integer","character varying"],"columns_val":["24"
,"'test pg_recvlogical 02'"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]}
0/48C6728 | 63063 | COMMIT 63063 (at 2022-09-01 11:15:09.292569+00) CSN 47646
(3 rows)
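The session above mixes pg_logical_slot_get_changes and pg_logical_slot_peek_changes. In PostgreSQL, where these functions originate, the get variant consumes the changes it returns while the peek variant leaves them in the slot; the sketch below assumes MogDB behaves the same way. It also assumes that the "already active" error appears while another client (here presumably the running pg_recvlogical --start) holds the slot. `ToySlot` is a hypothetical in-memory model, not a MogDB API.

```python
class ToySlot:
    """Hypothetical in-memory stand-in for a logical replication slot."""

    def __init__(self):
        self.pending = []    # decoded changes not yet consumed
        self.active = False  # True while a streaming client holds the slot

    def emit(self, change):
        self.pending.append(change)

    def _check_free(self):
        if self.active:
            raise RuntimeError("replication slot is already active")

    def peek_changes(self):
        """Return pending changes without consuming them."""
        self._check_free()
        return list(self.pending)

    def get_changes(self):
        """Return pending changes and consume them."""
        self._check_free()
        changes, self.pending = self.pending, []
        return changes


if __name__ == "__main__":
    slot = ToySlot()
    slot.emit("BEGIN 63063")
    print(slot.peek_changes())  # still pending afterwards
    print(slot.get_changes())   # consumed by this call
    print(slot.get_changes())   # nothing left
```

Under these assumptions, repeated peeks return the same rows, whereas a second get returns nothing until new changes arrive.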

pg_recvlogical prints the following:
BEGIN 63063
{"table_name":"public.pub_sub","op_type":"INSERT","columns_name":["i","name"],"columns_type":["integer","character varying"],"columns_val":["24","'test pg_recvlogical 02'"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]}
COMMIT 63063 (at 2022-09-01 11:15:09.292569+00) CSN 47646
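The output is JSON because pg_recvlogical uses the mppdb_decoding plugin by default, and mppdb_decoding emits JSON.
The formatted JSON:
{
    "table_name": "public.pub_sub",
    "op_type": "INSERT",
    "columns_name": ["i", "name"],
    "columns_type": ["integer", "character varying"],
    "columns_val": ["24", "'test pg_recvlogical 02'"],
    "old_keys_name": [],
    "old_keys_type": [],
    "old_keys_val": []
}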
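Because each change is a single JSON document, a consumer can turn it into a column-to-value mapping with a few lines of code. The following is a minimal sketch using the change line shown above as sample input; `change_to_row` is a hypothetical helper. Note that the values stay as the strings the plugin emitted, including the SQL-style quotes around text values.

```python
import json

# Change line copied from the pg_recvlogical output above.
LINE = '''{"table_name":"public.pub_sub","op_type":"INSERT","columns_name":["i","name"],"columns_type":["integer","character varying"],"columns_val":["24","'test pg_recvlogical 02'"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]}'''


def change_to_row(line):
    """Decode one change line into (table, operation, {column: value})."""
    change = json.loads(line)
    row = dict(zip(change["columns_name"], change["columns_val"]))
    return change["table_name"], change["op_type"], row


if __name__ == "__main__":
    table, op, row = change_to_row(LINE)
    print(table, op, row)
    # Pretty-print the original document with 4-space indentation.
    print(json.dumps(json.loads(LINE), indent=4))
```

BEGIN and COMMIT lines are not JSON, so a real consumer would need to filter them out before calling a helper like this.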

pg_recvlogical --drop -S test_slot -d postgres

$ pg_recvlogical --create -S test_slot -d postgres --plugin=wal2json
$ pg_recvlogical --start -S test_slot -d postgres -f -
wal2json does not display BEGIN and COMMIT.

Warning: the following error is reported intermittently; it appears to be a bug.

$ pg_recvlogical --create -S test_slot -d postgres --plugin=pgoutput
$ pg_recvlogical --start -S test_slot -d postgres -f -
After switching to pgoutput, a protocol version error is reported: FATAL: client sent proto_version=0 but we only support protocol 1 or higher

MogDB=#SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'mppdb_decoding');
slotname | xlog_position
-----------------+---------------
regression_slot | 0/48D3488
(1 row)
MogDB=#select * from pg_replication_slots;
slot_name | plugin | slot_type | datoid | database | active | xmin | catalog_xmin | restart_lsn | dummy_standby
-----------------+----------------+-----------+--------+----------+--------+------+--------------+-------------+---------------
user_sub | pgoutput | logical | 15016 | postgres | t | | 63071 | 0/48D3438 | f
wal2json | wal2json | logical | 15016 | postgres | f | | 61212 | 0/40EBA88 | f
test_slot | mppdb_decoding | logical | 15016 | postgres | f | | 61212 | 0/48C9A90 | f
regression_slot | mppdb_decoding | logical | 15016 | postgres | f | | 63071 | 0/48D3408 | f
(4 rows)
MogDB=#SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
location | xid | data
----------+-----+------
(0 rows)
MogDB=#CREATE TABLE data(id serial primary key, data text);
NOTICE: CREATE TABLE will create implicit sequence "data_id_seq" for serial column "data.id"
NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "data_pkey" for table "data"
CREATE TABLE
MogDB=#SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
location | xid | data
-----------+-------+-----------------------------------------------------------
0/48C9D30 | 63070 | BEGIN 63070
0/48D32E8 | 63070 | COMMIT 63070 (at 2022-09-01 11:51:40.602993+00) CSN 47650
(2 rows)
MogDB=#BEGIN;
BEGIN
MogDB=#INSERT INTO data(data) VALUES('1');
INSERT 0 1
MogDB=#INSERT INTO data(data) VALUES('2');
INSERT 0 1
MogDB=#COMMIT
#;
COMMIT
MogDB=#SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
location | xid | data
-----------+-------+-------------------------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------
0/48E0CD0 | 63073 | BEGIN 63073
0/48E0CD0 | 63073 | {"table_name":"public.data","op_type":"INSERT","columns_name":["id","data"],"columns_type":["integer","text"],"columns_val":["1","'1'"],"old_key
s_name":[],"old_keys_type":[],"old_keys_val":[]}
0/48E1090 | 63073 | {"table_name":"public.data","op_type":"INSERT","columns_name":["id","data"],"columns_type":["integer","text"],"columns_val":["2","'2'"],"old_key
s_name":[],"old_keys_type":[],"old_keys_val":[]}
0/48E11E0 | 63073 | COMMIT 63073 (at 2022-09-01 11:58:19.799037+00) CSN 47653
(4 rows)
MogDB=#SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
--------------------------
(1 row)
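The rows returned by pg_logical_slot_get_changes form a flat stream of BEGIN, change, and COMMIT records. As a minimal sketch, they can be regrouped into one record per transaction, and the location column can be converted to an integer for comparison. The sample rows are copied from the session above; `parse_lsn` and `group_transactions` are hypothetical helpers, and the grouping assumes a well-formed stream in which every change sits between a BEGIN and a COMMIT.

```python
import json

# Rows (location, xid, data) copied from the session above.
ROWS = [
    ("0/48E0CD0", 63073, "BEGIN 63073"),
    ("0/48E0CD0", 63073, '''{"table_name":"public.data","op_type":"INSERT","columns_name":["id","data"],"columns_type":["integer","text"],"columns_val":["1","'1'"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]}'''),
    ("0/48E1090", 63073, '''{"table_name":"public.data","op_type":"INSERT","columns_name":["id","data"],"columns_type":["integer","text"],"columns_val":["2","'2'"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]}'''),
    ("0/48E11E0", 63073, "COMMIT 63073 (at 2022-09-01 11:58:19.799037+00) CSN 47653"),
]


def parse_lsn(text):
    """Convert an 'X/Y' hexadecimal WAL position into a single integer."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def group_transactions(rows):
    """Group (location, xid, data) rows into one record per transaction."""
    transactions, current = [], None
    for location, xid, data in rows:
        if data.startswith("BEGIN"):
            current = {"xid": xid, "begin_lsn": location, "changes": []}
        elif data.startswith("COMMIT"):
            current["commit_lsn"] = location
            transactions.append(current)
            current = None
        else:
            current["changes"].append(json.loads(data))  # a row change
    return transactions


if __name__ == "__main__":
    for txn in group_transactions(ROWS):
        span = parse_lsn(txn["commit_lsn"]) - parse_lsn(txn["begin_lsn"])
        print(txn["xid"], len(txn["changes"]), "changes,", span, "bytes of WAL")
```

Grouping by transaction is what a downstream consumer such as an ETL job would typically need, since changes only become meaningful once their COMMIT has been seen.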
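Logical replication and decoding are more flexible than physical replication. You can also develop a logical decoding plugin tailored to your business needs, or even use logical decoding as an ETL tool. Overall, logical decoding is a very user-friendly interface. This article analyzed the principles and part of the code behind MogDB logical replication and logical decoding, and demonstrated the decoding process with pg_recvlogical and SQL; hopefully it helps you understand logical decoding in MogDB.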