本文主要介绍PostgreSQL 逻辑复制社区插件原理与功能、逻辑订阅处理流程解析、适应场景,了解PostgreSQL 逻辑复制家族、核心技术、同步过程及同步原理等等,点击PostgreSQL 逻辑复制模块(一)。
test_decoding是Postgres现有的一个plugin,它的主要作用是将筛选过后的wal日志,转化为人们可以理解的形式。现在PG内部有两种方法可以使用plugin。如下是对这两种情况的简单说明。
官方文档:https://www.postgresql.org/docs/12/logicaldecoding-example.htm
--创建复制槽
test=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
slot_name | lsn
-----------------+-----------
regression_slot | 0/16569D8
(1 row)
-- 查看复制槽
test=# select * from pg_replication_slots ;
--插入一条数据到test01表中
test=# insert into test01 values(1,'test_decoding use by SQL');
INSERT 0 1
--查看解析的sql
test=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
------------+------+-----------------------------------------------------------------------------------------------
0/340001F8 | 1673 | BEGIN 1673
0/340001F8 | 1673 | table public.test01: INSERT: id[integer]:1 name[character varying]:'test_decoding use by SQL'
0/34000628 | 1673 | COMMIT 1673
test_decoding 原理图
文档地址:http://www.postgres.cn/docs/12/app-pgrecvlogical.html
# 在test库中创建默认test_decoding类型的复制槽 test2
[postgres@bogon ~]$ pg_recvlogical -d test --slot=test2 --create-slot
# 输出逻辑解码的信息到当前目录下的 ld.out 文件中
[postgres@bogon ~]$ pg_recvlogical -d test --slot=test2 --start -f ld.out &
[1] 19807
#从test库中插入一条数据
[postgres@bogon ~]$ psql -d test -c "insert into test01 values(2,'test_decoding use by pg_recvlogical');"
INSERT 0 1
#查看ld.out 中的信息
[postgres@bogon ~]$ cat ld.out
BEGIN 1674
table public.test01: INSERT: id[integer]:2 name[character varying]:'test_decoding use by pg_recvlogical'
COMMIT 1674
pg_recvlogical 命令流程图
wal2json plugin 是 test_decoding 的一个升级版,它优化了输出结果的结构,使之更容易被应用,输出的是 json 格式的数据。
# 在test库中创建wal2json类型的复制槽 test_slot_wal2json
[postgres@bogon ~]$ pg_recvlogical -d test --slot test_slot_wal2json --create-slot -P wal2json
# 输出逻辑解码的信息到当前目录下的 ld_wal2json.out 文件中
[postgres@bogon ~]$ pg_recvlogical -d test --slot test_slot_wal2json --start -o pretty-print=1 -f ld_wal2json.out &
#从test库中插入一条数据
[postgres@bogon ~]$ psql -d test -c "insert into test01 values(2,'wal2json use by pg_recvlogical');"
INSERT 0 1
查看 ld_wal2json.out 结果
{
"change": [
{
"kind": "insert",
"schema": "public",
"table": "test01",
"columnnames": ["id", "name"],
"columntypes": ["integer", "character varying"],
"columnvalues": [2, "wal2json use by pg_recvlogical"]
}
]
}
pger 可以使用test_decoding去认识逻辑复制,但是并不能真正的应用test_decoding来做什么,除非在test_decoding的基础上写代码去实现它的后续。logical replication真正的让用户可以在Postgres上体验另外一种不同的数据同步方式。换句话说,用户无法直接使用test_decoding功能用于生产环境,除非你使用第三方插件才能使用test_decoding完成数据同步的功能,而logical replication使得logical decoding技术更容易的用于生产环境。
pglogical 是 PostgreSQL 的拓展模块, 为 PostgreSQL 数据库提供了逻辑流复制发布和订阅的功能。pglogical 重用了 BDR 项目中的一部分相关技术。pglogical 是一个完全作为 PostgreSQL 扩展实现的逻辑复制系统。完全集成,它不需要触发器或外部程序。这种物理复制的替代方法是使用发布/订阅模型复制数据以进行选择性复制的一种高效方法。提供比 Slony、Bucardo 或 Londiste 更快的复制速度,以及跨版本升级。
pglogical和logical replicate的原理是相同的,只不过pglogical有更加强大的冲突处理能力。
pglogical 文档:
https://www.2ndquadrant.com/en/resources/pglogical/pglogical-docs/
pglogical下载地址:
http://packages.2ndquadrant.com/pglogical/tarballs/
4.5 其他逻辑解码插件
标准的 pg12 默认的逻辑解码插件。
一个PostgreSQL逻辑解码器输出插件,用于将数据作为协议缓冲区传送。
重新构造应用更改的查询。
阿里巴巴研发的开源的解码插件。
专门设计用于分布在不同地理位置的集群的双向复制,区别与使用触发器进行双向复制的如SymmetricDS 的双主数据库。如果一个特定的数据集只在一个节点上修改,BDR工作的最好。BDR支持地理上分布的集群,不受距离的限制,并包含了地理围栏的能力。它的设计目的是最小化节点间的延迟。
订阅端执行CREATE SUBSCRIPTION后,在后台进行表数据同步。每个表的数据同步状态记录在pg_subscription_rel.srsubstate中,一共有4种状态码。
‘i’:初始化(SUBREL_STATE_INIT)
‘d’:正在copy数据(SUBREL_STATE_DATASYNC)
‘s’:已同步(SUBREL_STATE_SYNCDONE)
‘r’:准备好 (普通复制)(SUBREL_STATE_READY)
subdb=# select * from pg_subscription_rel;
srsubid | srrelid | srsubstate | srsublsn
---------+---------+------------+------------
29912 | 29904 | r | 0/3502AA90
(1 row)
从执行CREATE SUBSCRIPTION开始订阅端的相关处理流程概述如下:
设置每个表的为srsubstate中 ‘i’(SUBREL_STATE_INIT);
logical replication launcher 进程启动一个 logical replication apply worker 进程;
logical replication apply worker 进程连接到订阅端开始接受订阅消息,此时表尚未完成初始同步(状态为 i 或 d ),跳过所有 insert、update 和 delete 消息的处理;
logical replication apply worker 进程为每个未同步的表启动 logical replication sync worker 进程(每个订阅最多同时启动max_sync_workers_per_subscription个 sync worker);
logical replication sync worker 进程连接到订阅端并同步初始数据;
创建临时复制槽,并记录快照位置
设置表同步状态为 ‘d’(SUBREL_STATE_DATASYNC)
copy 表数据
设置表同步状态为SUBREL_STATE_SYNCWAIT(内部状态),并等待 apply worker 更新状态为SUBREL_STATE_CATCHUP(内部状态)
logical replication apply worker 进程更新表同步状态为SUBREL_STATE_CATCHUP(内部状态),记录最新lsn,并等待 sync worker 更新状态为SUBREL_STATE_SYNCDONE;
logical replication sync worker 进程完成初始数据同步;
检查 apply worker 当前处理的订阅消息位置是否已经走到了快照位置前面,如果是从订阅端接受消息并处理直到追上 apply worker
设置表同步状态为 ‘s’(SUBREL_STATE_SYNCDONE)
进程退出
logical replication apply worker 进程继续接受订阅消息并处理。
接受到 insert、update 和 delete 消息,如果是同步点(进入’s’ 或 ‘r’ 状态时的 lsn 位置)之后的消息进行应用
接受到 commit 消息
暂时没有新的消息处理
1.向发布端发送订阅位置反馈
2.如果不在事务块里,同步表状态。将所有处于 ‘s’(SUBREL_STATE_SYNCDONE)同步状态的表更新为 ‘r’(SUBREL_STATE_READY)
订阅表进入同步状态(状态码是 ‘s’ 或 ‘r’ )后,发布端的变更都会通过消息通知订阅端;订阅端 apply worker 按照订阅消息的接受顺序(即发布端事务提交顺序)对每个表 apply 变更,并反馈 apply 位置,用于监视复制延迟。
插入订阅表
insert into test01 values(100,'insert 1 条');
发布端修改订阅表时,在事务提交时,发布端依次发送下面的消息到订阅端
B(BEGIN)
R(RELATION)
I(INSERT)
C(COMMIT)
更新复制源状态表pg_replication_origin_status中的remote_lsn和local_lsn,该位点对应于每个订阅表最后一次事务提交的位置。
k(KEEPALIVE)
k(KEEPALIVE)
2个 keepalive 消息,会更新统计表中的位置:
发布端pg_stat_replication:write_lsn,flush_lsn,replay_lsn
pubdb=# select usesysid,usename,application_name,sent_lsn,write_lsn,flush_lsn,replay_lsn,sync_state from pg_stat_replication where usename = 'rep';
发布端pg_get_replication_slots():confirmed_flush_lsn
pubdb=# select slot_name,plugin,active_pid,restart_lsn,confirmed_flush_lsn from pg_get_replication_slots() where slot_name = 'sub1';
订阅端更新pg_stat_subscription:latest_end_lsn
subdb=# select subid,subname,received_lsn,latest_end_lsn from pg_stat_subscription;
可从多个上游服务器,做数据的聚集和合并
发布者跟订阅者的关系:一个发布者可以被多个订阅者订阅。多个发布者可以被同一个订阅者订阅。
PostgreSQL大版本升级,数据直接同步到高版本
pglogical 对 PostgreSQL 版本升级是一个很实用的工具。能实现以几乎为零的停机时间迁移和升级PostgreSQL。局限性在于pglogical支持的 PostgreSQL 版本。
将多个数据库实例的数据,同步到一个目标数据库。例如多个数据库同步到一个大的数据仓库
满足业务上需求,实现某些指定表的数据同步
因小编个人水平有限,专栏中难免存在错漏之处,请勿直接复制文档中的参数、命令或方法应用于线上环境中操作。