• PostgreSQL 逻辑复制模块(二)


    前言

    本文主要介绍PostgreSQL 逻辑复制社区插件原理与功能、逻辑订阅处理流程解析、适应场景,了解PostgreSQL 逻辑复制家族、核心技术、同步过程及同步原理等等,点击PostgreSQL 逻辑复制模块(一)

    4 社区插件的原理及功能

    4.1 test_decoding plugin

    test_decoding是Postgres现有的一个plugin,它的主要作用是将筛选过后的wal日志,转化为人们可以理解的形式。现在PG内部有两种方法可以使用plugin。如下是对这两种情况的简单说明。

    官方文档:https://www.postgresql.org/docs/12/logicaldecoding-example.htm

    4.1.1 内置SQL函数使用

    --创建复制槽
    test=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
      slot_name    |   lsn    
    -----------------+-----------
    regression_slot | 0/16569D8
    (1 row)
    
    -- 查看复制槽
    test=# select * from pg_replication_slots ;
    
    --插入一条数据到test01表中
    test=# insert into test01 values(1,'test_decoding use by SQL');
    INSERT 0 1
    
    --查看解析的sql
    test=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
      lsn     | xid  |                                             data                                              
    ------------+------+-----------------------------------------------------------------------------------------------
    0/340001F8 | 1673 | BEGIN 1673
    0/340001F8 | 1673 | table public.test01: INSERT: id[integer]:1 name[character varying]:'test_decoding use by SQL'
    0/34000628 | 1673 | COMMIT 1673
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    在这里插入图片描述
    test_decoding 原理图

    4.1.2 pg_recvlogical使用

    文档地址:http://www.postgres.cn/docs/12/app-pgrecvlogical.html

    # 在test库中创建默认test_decoding类型的复制槽 test2
    [postgres@bogon ~]$ pg_recvlogical -d test --slot=test2 --create-slot
    
    # 输出逻辑解码的信息到当前目录下的 ld.out 文件中
    [postgres@bogon ~]$ pg_recvlogical -d test --slot=test2 --start -f ld.out &
    [1] 19807
    
    #从test库中插入一条数据
    [postgres@bogon ~]$ psql -d test -c "insert into test01 values(2,'test_decoding use by pg_recvlogical');" 
    INSERT 0 1
    
    #查看ld.out 中的信息
    [postgres@bogon ~]$ cat ld.out 
    BEGIN 1674
    table public.test01: INSERT: id[integer]:2 name[character varying]:'test_decoding use by pg_recvlogical'
    COMMIT 1674
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    在这里插入图片描述
    pg_recvlogical 命令流程图

    4.2 wal2json plugin

    wal2json plugin 是 test_decoding 的一个升级版,它优化了输出结果的结构,使之更容易被应用,输出的是 json 格式的数据。

    # 在test库中创建wal2json类型的复制槽 test_slot_wal2json
    [postgres@bogon ~]$ pg_recvlogical -d test --slot test_slot_wal2json --create-slot -P wal2json
    
    # 输出逻辑解码的信息到当前目录下的 ld_wal2json.out 文件中
    [postgres@bogon ~]$ pg_recvlogical -d test --slot test_slot_wal2json --start -o pretty-print=1 -f ld_wal2json.out &
    
    #从test库中插入一条数据
    [postgres@bogon ~]$ psql -d test -c "insert into test01 values(2,'wal2json use by pg_recvlogical');" 
    INSERT 0 1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    查看 ld_wal2json.out 结果

    {
    "change": [
    {
    "kind": "insert",
    "schema": "public",
    "table": "test01",
    "columnnames": ["id", "name"],
    "columntypes": ["integer", "character varying"],
    "columnvalues": [2, "wal2json use by pg_recvlogical"]
    }
    ]
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    4.3 logical replication

    pger 可以使用test_decoding去认识逻辑复制,但是并不能真正的应用test_decoding来做什么,除非在test_decoding的基础上写代码去实现它的后续。logical replication真正的让用户可以在Postgres上体验另外一种不同的数据同步方式。换句话说,用户无法直接使用test_decoding功能用于生产环境,除非你使用第三方插件才能使用test_decoding完成数据同步的功能,而logical replication使得logical decoding技术更容易的用于生产环境。

    4.4 pglogical plugin

    pglogical 是 PostgreSQL 的拓展模块, 为 PostgreSQL 数据库提供了逻辑流复制发布和订阅的功能。pglogical 重用了 BDR 项目中的一部分相关技术。pglogical 是一个完全作为 PostgreSQL 扩展实现的逻辑复制系统。完全集成,它不需要触发器或外部程序。这种物理复制的替代方法是使用发布/订阅模型复制数据以进行选择性复制的一种高效方法。提供比 Slony、Bucardo 或 Londiste 更快的复制速度,以及跨版本升级。

    pglogical和logical replicate的原理是相同的,只不过pglogical有更加强大的冲突处理能力。

    pglogical 文档:
    
    https://www.2ndquadrant.com/en/resources/pglogical/pglogical-docs/
    
    pglogical下载地址:
    
    http://packages.2ndquadrant.com/pglogical/tarballs/
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    4.5 其他逻辑解码插件

    • pgoutput plugin

    标准的 pg12 默认的逻辑解码插件。

    • decoderbufs

    一个PostgreSQL逻辑解码器输出插件,用于将数据作为协议缓冲区传送。

    • decoder_raw

    重新构造应用更改的查询。

    • ali_decoding

    阿里巴巴研发的开源的解码插件。

    • BDR

    专门设计用于分布在不同地理位置的集群的双向复制,区别与使用触发器进行双向复制的如SymmetricDS 的双主数据库。如果一个特定的数据集只在一个节点上修改,BDR工作的最好。BDR支持地理上分布的集群,不受距离的限制,并包含了地理围栏的能力。它的设计目的是最小化节点间的延迟。

    5 Logical Replication 逻辑订阅处理流程解析

    5.1 表同步阶段处理流程概述

    订阅端执行CREATE SUBSCRIPTION后,在后台进行表数据同步。每个表的数据同步状态记录在pg_subscription_rel.srsubstate中,一共有4种状态码。

    ‘i’:初始化(SUBREL_STATE_INIT)

    ‘d’:正在copy数据(SUBREL_STATE_DATASYNC)

    ‘s’:已同步(SUBREL_STATE_SYNCDONE)

    ‘r’:准备好 (普通复制)(SUBREL_STATE_READY)

    subdb=# select * from pg_subscription_rel;
    srsubid | srrelid | srsubstate | srsublsn  
    ---------+---------+------------+------------
      29912 |   29904 | r          | 0/3502AA90
    (1 row)
    
    • 1
    • 2
    • 3
    • 4
    • 5

    从执行CREATE SUBSCRIPTION开始订阅端的相关处理流程概述如下:

    1. 设置每个表的为srsubstate中 ‘i’(SUBREL_STATE_INIT);

    2. logical replication launcher 进程启动一个 logical replication apply worker 进程;

    3. logical replication apply worker 进程连接到订阅端开始接受订阅消息,此时表尚未完成初始同步(状态为 i 或 d ),跳过所有 insert、update 和 delete 消息的处理;

    4. logical replication apply worker 进程为每个未同步的表启动 logical replication sync worker 进程(每个订阅最多同时启动max_sync_workers_per_subscription个 sync worker);

    5. logical replication sync worker 进程连接到订阅端并同步初始数据;

      创建临时复制槽,并记录快照位置

      设置表同步状态为 ‘d’(SUBREL_STATE_DATASYNC)

      copy 表数据

      设置表同步状态为SUBREL_STATE_SYNCWAIT(内部状态),并等待 apply worker 更新状态为SUBREL_STATE_CATCHUP(内部状态)

    6. logical replication apply worker 进程更新表同步状态为SUBREL_STATE_CATCHUP(内部状态),记录最新lsn,并等待 sync worker 更新状态为SUBREL_STATE_SYNCDONE;

    7. logical replication sync worker 进程完成初始数据同步;

      检查 apply worker 当前处理的订阅消息位置是否已经走到了快照位置前面,如果是从订阅端接受消息并处理直到追上 apply worker

      设置表同步状态为 ‘s’(SUBREL_STATE_SYNCDONE)

      进程退出

    8. logical replication apply worker 进程继续接受订阅消息并处理。

      接受到 insert、update 和 delete 消息,如果是同步点(进入’s’ 或 ‘r’ 状态时的 lsn 位置)之后的消息进行应用

      接受到 commit 消息

      暂时没有新的消息处理

    1.向发布端发送订阅位置反馈

    2.如果不在事务块里,同步表状态。将所有处于 ‘s’(SUBREL_STATE_SYNCDONE)同步状态的表更新为 ‘r’(SUBREL_STATE_READY)

    5.2 表同步后的持续逻辑复制

    订阅表进入同步状态(状态码是 ‘s’ 或 ‘r’ )后,发布端的变更都会通过消息通知订阅端;订阅端 apply worker 按照订阅消息的接受顺序(即发布端事务提交顺序)对每个表 apply 变更,并反馈 apply 位置,用于监视复制延迟。

    插入订阅表

    insert into test01 values(100,'insert 1 条');
    
    • 1

    发布端修改订阅表时,在事务提交时,发布端依次发送下面的消息到订阅端

    • B(BEGIN)

    • R(RELATION)

    • I(INSERT)

    • C(COMMIT)

      更新复制源状态表pg_replication_origin_status中的remote_lsn和local_lsn,该位点对应于每个订阅表最后一次事务提交的位置。

    • k(KEEPALIVE)

    • k(KEEPALIVE)

      2个 keepalive 消息,会更新统计表中的位置:

    发布端pg_stat_replication:write_lsn,flush_lsn,replay_lsn

    pubdb=# select usesysid,usename,application_name,sent_lsn,write_lsn,flush_lsn,replay_lsn,sync_state from pg_stat_replication where usename = 'rep';
    
    • 1

    发布端pg_get_replication_slots():confirmed_flush_lsn

    pubdb=# select slot_name,plugin,active_pid,restart_lsn,confirmed_flush_lsn from pg_get_replication_slots() where slot_name = 'sub1';
    
    • 1

    订阅端更新pg_stat_subscription:latest_end_lsn

    subdb=# select subid,subname,received_lsn,latest_end_lsn from pg_stat_subscription;
    
    • 1

    5.3 异常处理

    5.3.1 sync worker 进程

    1. SQL错误(如主键冲突):worker 进程异常退出,之后 apply worker 进程创建一个新的 sync worker 重试。错误解除前每5秒重试一次
    2. 表被锁:等待
    3. 更新或删除的记录不存在:正常执行,检测不到错误,也么没有日志输出(输出一条DEBUG1级别的日志)

    5.3.2 apply worker 进程

    1. SQL错误(如主键冲突):apply worker 进程异常退出,之后 logical replication launcher 进程创建一个新的apply worker 重试。错误解除前每5秒重试一次
    2. 表被锁:等待
    3. 更新或删除的记录不存在:正常执行,检测不到错误,也么没有日志输出(输出一条DEBUG1级别的日志)。

    6 逻辑复制适用场景

    • 可从多个上游服务器,做数据的聚集和合并

      发布者跟订阅者的关系:一个发布者可以被多个订阅者订阅。多个发布者可以被同一个订阅者订阅。

    • PostgreSQL大版本升级,数据直接同步到高版本

      pglogical 对 PostgreSQL 版本升级是一个很实用的工具。能实现以几乎为零的停机时间迁移和升级PostgreSQL。局限性在于pglogical支持的 PostgreSQL 版本。

    • 将多个数据库实例的数据,同步到一个目标数据库。例如多个数据库同步到一个大的数据仓库

    • 满足业务上需求,实现某些指定表的数据同步

    7 逻辑复制注意事项

    • 发布节点的WAL_LEVEL参数需要设置成LOGICAL
    • 发布节点上逻辑复制用户至少需要REPLICATION角色权限
    • 支持一次发布一个数据库中的所有表
    • 发布节点上需要发布的表如果需要将UPDATE/DELETE操作同步到订阅节点,需要给发布配置复制标识(复制标识默认为主键,如果没有主键也可以是唯一索引)
    • 发布表上的DDL操作不会自动同步到订阅节点,如果发布节点上发布的表执行了DDL操作,需手工给订阅节点的相应表执行DDL,之前如果有没有同步的数据会自动同步
    • 当发布者添加新表时,订阅者不能自动的获知,需要将表的SELECT权限赋给逻辑复制用户 (发布端执行 )
    • 订阅者列多于发布者(订阅者的列包含发布者的列)时,发布者插入的数据会同步到订阅者,多的字段内容为 NULL
    • 建立逻辑复制后,由于冲突或者表结构变更,导致逻辑复制关系挂起后,通过解决冲突和问题后,逻辑复制关系会恢复,并会同步此时发布者的数据
    • 发布者表结构变更后,插入数据(1、字段为之前数据类型,2、字段为变更后的数据类型),逻辑复制挂起,均不能进行数据同步,当解决冲突后,逻辑复制恢复,同步数据

    声明

    因小编个人水平有限,专栏中难免存在错漏之处,请勿直接复制文档中的参数、命令或方法应用于线上环境中操作。

    近期文章推荐

    PostgreSQL 逻辑复制模块(一)
    PostgreSQL体系结构(上)
    PostgreSQL体系结构(下)

  • 相关阅读:
    深度学习-序列模型
    第二章 第二十四节:文件操作:写
    基于Java+SpringBoot+Vue摄影分享网站的设计与实现 前后端分离【Java毕业设计·文档报告·代码讲解·安装调试】
    SRVCC关键场景及Log分析
    ​无需测试环境!如何利用测试脚手架隔离微服务,实现功能自动化
    火伞云Web应用防火墙的特点与优势
    2022.11.24
    C. Swap Game(简单博弈)
    JDY-31蓝牙模块(四针头)与USB转串口模块的连接和JDY-31蓝牙模块与手机蓝牙调试器频繁断开问题
    redis非关系型数据库
  • 原文地址:https://blog.csdn.net/weixin_45320660/article/details/126866294