• MogDB逻辑解码与pg_recvlogical


    MogDB逻辑解码与pg_recvlogical

    概述

    谈到逻辑解码需要先从流复制开始说起。流复制的最重要的一个用途就是实现数据库的热备,数据库的主备同步方式是物理级别的数据同步。但实际应用场景中仅仅通过物理赋值是无法满足业务需求的,因此提供了逻辑复制的功能。

    逻辑复制主要解决了以物理赋值无法解决的一些问题,例如:

    • 指定库或部分表的复制需求

    • 将多个数据库实例的数据汇聚到同一个目标库

    • 将一个库的数据分发到多个不同的库

    • 不同的版本之间的复制

    • 不同库名之间的表同步

    逻辑复制的关键是将WAL日志的内容进行逻辑解码成特定的格式,如json,SQL等。pg_recvlogical 客户端工具就是逻辑解码的一种典型应用,它将WAL日志解码为json格式,保存在指定文件或标准输出stdout中。

    逻辑复制约束

    需要修改wal_level,在MogDB中wal_level有如下取值:

    • minimal
      • 优点:一些重要操作(包括创建表、创建索引、簇操作和表的复制)都能安全的跳过,这样就可以使操作变得更快。
      • 缺点:WAL仅提供从数据库服务器崩溃或者紧急关闭状态恢复时所需要的基本信息,无法用WAL归档日志恢复数据。
    • archive
      • 这个参数增加了WAL归档需要的日志信息,从而可以支持数据库的归档恢复。
    • hot_standby
      • 这个参数进一步增加了在备机上运行的SQL查询的信息,这个参数只能在数据库服务重新启动后生效。
      • 为了在备机上开启只读查询,wal_level必须在主机上设置成hot_standby ,并且备机必须打开hot_standby参数。
    • logical
      • 这个参数表示WAL日志支持逻辑复制。
        默认值: minimal

    须知

    • 如果需要启用WAL日志归档和主备机的数据流复制,必须将此参数设置为archive或者hot_standby。
    • 如果此参数设置为minimal,archive_mode必须设置为off,hot_standby必须设置为off,max_wal_senders参数设置为0,且需为单机环境,否则将导致数据库无法启动。
    • 如果此参数设置为archive,hot_standby必须设置为off,否则将导致数据库无法启动。但是,hot_standby在双机环境中不能设置为off,具体参见hot_standby参数说明。

    注意:

    • 与PostgreSQL的wal_level略有不同,PostgreSQL只包括3种级别:minimal, replica, or logical
    • PostgreSQL中wal_level的默认值是replica,它会写入足够的数据以支持WAL归档和复制,包括在后备服务器上运行只读查询。minimal会去掉除从崩溃或者立即关机中进行恢复所需的信息之外的所有记录。最后,logical会增加支持逻辑解码所需的信息。每个层次包括所有更低层次记录的信息。这个参数只能在服务器启动时设置。

    postgres.conf配置

    wal_level = logical                     # minimal, archive, hot_standby or logical
                                            # (change requires restart)
    max_wal_senders = 10            # max number of walsender processes
                                    # (change requires restart)
    wal_keep_segments = 16          # in logfile segments, 16MB each; 0 disables
    max_replication_slots = 8       # max number of replication slots.i
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    注:修改配置后需要重启MogDB

    创建replication用户

    MogDB=#CREATE ROLE pub_sub_user WITH SYSADMIN REPLICATION LOGIN PASSWORD 'pub_sub@123';
    NOTICE:  The encrypted password contains MD5 ciphertext, which is not secure.
    CREATE ROLE
    MogDB=#
    
    • 1
    • 2
    • 3
    • 4

    注:需要有SYSADMIN 和REPLICATION 权限

    pg_hba.conf配置

    # replication privilege.
    #local   replication     omm                                trust
    #host    replication     omm        127.0.0.1/32            trust
    #host    replication     omm        ::1/128                 trust
    
    host all all 0.0.0.0/0 md5
    host replication all 0.0.0.0/0 md5
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    场景限制

    • 目前不支持DDL解析,只能解析DML(INSERT、UPDATE、DELETE,TRUNCATE);
    • TEMPORARY表和UNLOGGED表不会被复制;
    • 表必须有主键或唯一约束,否则像update或delete操作无法被复制;
    • 序列不被复制;
    • 大对象不被复制;
    • 新增加的表,不会自动加入订阅,需要在订阅端进行刷新;

    pg_recvlogical工作原理

    pg_recvlogical可以作为观察wal日志具体变化的工具。

    整体架构

    image.png

    如上图:

    1. pg_recvlogical通过libpq与MogDB server建立链接
    2. 通过CREATE_REPLICATION_SLOT 建立逻辑复制槽
    3. 运行START_REPLICATION ,循环处理wal流,并进行解码
    4. 将解码数据写入文件或者在stdout屏幕数据
    5. pg_logical_slot_get_changes、pg_logical_slot_peek_changes可以查看修改情况
    6. DROP_REPLICATION_SLOT 销毁复制槽

    pg_recvlogical主流程

    image.png

    • 设置插件
      • pg_recvlogical默认使用 mppdb_decoding
    plugin = pg_strdup("mppdb_decoding");
    
    • 1
    • pg_recvlogical也可以通过命令行参数 --plugin 设置插件 如:–plugin = wal2json

    image.png

    通过ouput/decode插件处理pg_recvlogical可以将wal变更内容输出为特定格式(TXT、JISON)。

    • 命令行处理
    pg_recvlogical receives logical change stream.
    
    Usage:
      pg_recvlogical [OPTION]...
    
    Options:
      -f, --file=FILE        receive log into this file. - for stdout
      -n, --no-loop          do not loop on connection lost
      -v, --verbose          output verbose messages
      -V, --version          output version information, then exit
      -?, --help             show this help, then exit
    
    Connection options:
      -d, --dbname=DBNAME    database to connect to
      -h, --host=HOSTNAME    database server host or socket directory
      -p, --port=PORT        database server port number
      -U, --username=NAME    connect as specified database user
      -w, --no-password      never prompt for password
      -W, --password         force password prompt (should happen automatically)
    
    Replication options:
      -F  --fsync-interval=INTERVAL
                             frequency of syncs to the output file (in seconds, defaults to 10)
      -o, --option=NAME[=VALUE]
                             Specify option NAME with optional value VAL, to be passed
                             to the output plugin
      -P, --plugin=PLUGIN    use output plugin PLUGIN (defaults to mppdb_decoding)
      -s, --status-interval=INTERVAL
                             time between status packets sent to server (in seconds, defaults to 10)
      -S, --slot=SLOT        use existing replication slot SLOT instead of starting a new one
      -I, --startpos=PTR     Where in an existing slot should the streaming start
      -r, --raw              parallel decoding output raw results without converting to text format
    
    Action to be performed:
          --create           create a new replication slot (for the slotname see --slot)
          --start            start streaming in a replication slot (for the slotname see --slot)
          --drop             drop the replication slot (for the slotname see --slot)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • CREATE_REPLICATION_SLOT
    • DROP_REPLICATION_SLOT
    • START_REPLICATION

    以上3个命令通过词法和语法解析src/backend/replication/repl_scanner.lsrc/gausskernel/storage/replication/repl_gram.y

    image.png

    /* CREATE_REPLICATION_SLOT SLOT slot [%X/%X] */
     create_replication_slot:
    			/* CREATE_REPLICATION_SLOT SLOT slot PHYSICAL [init_slot_lsn] */
     			K_CREATE_REPLICATION_SLOT IDENT K_PHYSICAL RECPTR
     				{
     					CreateReplicationSlotCmd *cmd;
     					cmd = makeNode(CreateReplicationSlotCmd);
     					cmd->kind = REPLICATION_KIND_PHYSICAL;
     					cmd->slotname = $2;
     					cmd->init_slot_lsn = $4;
     					$$ = (Node *) cmd;
     				}
    			/* CREATE_REPLICATION_SLOT slot LOGICAL plugin */
    			| K_CREATE_REPLICATION_SLOT IDENT K_LOGICAL IDENT
     				{
    					CreateReplicationSlotCmd *cmd;
    					cmd = makeNode(CreateReplicationSlotCmd);
    					cmd->kind = REPLICATION_KIND_LOGICAL;
    					cmd->slotname = $2;
    					cmd->plugin = $4;
    					$$ = (Node *) cmd;
     				}
     			;
     
     /* DROP_REPLICATION_SLOT slot */
     drop_replication_slot:
     			K_DROP_REPLICATION_SLOT IDENT
     				{
     					DropReplicationSlotCmd *cmd;
     					cmd = makeNode(DropReplicationSlotCmd);
     					cmd->slotname = $2;
     					$$ = (Node *) cmd;
     				}
     			;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 加载插件

    output插件都要实现_PG_output_plugin_init 函数,和必要的回调函数。

    image.png

    回调函数集合:

    /*
     * Output plugin callbacks
     */
    typedef struct OutputPluginCallbacks {
        LogicalDecodeStartupCB startup_cb;
        LogicalDecodeBeginCB begin_cb;
        LogicalDecodeChangeCB change_cb;
        LogicalDecodeCommitCB commit_cb;
        LogicalDecodeAbortCB abort_cb;
        LogicalDecodePrepareCB prepare_cb;
        LogicalDecodeShutdownCB shutdown_cb;
        LogicalDecodeFilterByOriginCB filter_by_origin_cb;
    } OutputPluginCallbacks;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    主要的几个回调函数包括(mppdb_decoding为例):

    • pg_decode_begin_txn --开始事务
    • pg_decode_change --增、删、改、truncate
    • pg_decode_commit_txn --提交事务

    演示

    创建复制槽

     pg_recvlogical --create -S test_slot -d postgres
    
    • 1

    image.png

    启动recvlogical

    pg_recvlogical --start -S test_slot -d postgres -f -
    -- “-f -” 表示输出到stdout(屏幕)
    
    • 1
    • 2

    image.png

    注:pg_recvlogical会不停地每个5s检查复制槽是否有更新

    插入数据

    MogDB=#select * from pg_logical_slot_get_changes('test_slot', NULL, NULL);
     location | xid | data
    ----------+-----+------
    (0 rows)
    
    MogDB=#insert into pub_sub (name) values('test pg_recvlogical 01');
    INSERT 0 1
    MogDB=#select * from pg_logical_slot_get_changes('test_slot', NULL, NULL);
     location  |  xid  |                                                                                                                 data
    
    -----------+-------+-------------------------------------------------------------------------------------------------------------------------------------------------
    --------------------------------------------------------------------------------------
     0/48C60D8 | 63062 | BEGIN 63062
     0/48C60D8 | 63062 | {"table_name":"public.pub_sub","op_type":"INSERT","columns_name":["i","name"],"columns_type":["integer","character varying"],"columns_val":["23"
    ,"'test pg_recvlogical 01'"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]}
     0/48C62B8 | 63062 | COMMIT 63062 (at 2022-09-01 11:14:24.539429+00) CSN 47645
    (3 rows)
    
    MogDB=#select * from pg_logical_slot_peek_changes('test_slot', NULL, NULL);
     location | xid | data
    ----------+-----+------
    (0 rows)
    
    MogDB=#insert into pub_sub (name) values('test pg_recvlogical 02');
    INSERT 0 1
    MogDB=#select * from pg_logical_slot_peek_changes('test_slot', NULL, NULL);
    ERROR:  replication slot "test_slot" is already active
    MogDB=#select * from pg_logical_slot_peek_changes('test_slot', NULL, NULL);
     location  |  xid  |                                                                                                                 data
    
    -----------+-------+-------------------------------------------------------------------------------------------------------------------------------------------------
    --------------------------------------------------------------------------------------
     0/48C6548 | 63063 | BEGIN 63063
     0/48C6548 | 63063 | {"table_name":"public.pub_sub","op_type":"INSERT","columns_name":["i","name"],"columns_type":["integer","character varying"],"columns_val":["24"
    ,"'test pg_recvlogical 02'"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]}
     0/48C6728 | 63063 | COMMIT 63063 (at 2022-09-01 11:15:09.292569+00) CSN 47646
    (3 rows)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    image.png

    pg_recvlogical输出如下

    BEGIN 63063
    {"table_name":"public.pub_sub","op_type":"INSERT","columns_name":["i","name"],"columns_type":["integer","character varying"],"columns_val":["24","'test pg_recvlogical 02'"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]}
    COMMIT 63063 (at 2022-09-01 11:15:09.292569+00) CSN 47646
    
    
    • 1
    • 2
    • 3
    • 4

    因为pg_recvlogical默认使用mppdb_decoding插件,mppdb_decoding输出格式为json。

    格式化后的json:

    image.png

    销毁复制槽

    pg_recvlogical --drop -S test_slot -d postgres
    
    • 1

    image.png

    换插件

    • wal2json
    $ pg_recvlogical --create -S test_slot -d postgres --plugin=wal2json
    $ pg_recvlogical --start -S test_slot -d postgres -f -
    
    • 1
    • 2

    wal2json 没有显示BEGIN和COMMIT

    image.png

    警告:有概率会报一下错误,疑似BUG。

    image.png

    • pgoutput
    $ pg_recvlogical --create -S test_slot -d postgres --plugin=pgoutput
    $ pg_recvlogical --start -S test_slot -d postgres -f -
    
    • 1
    • 2

    更换为pgoutput 后回报版本错误:FATAL: client sent proto_version=0 but we only support protocol 1 or higher

    image.png

    SQL方式演示

    MogDB=#SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'mppdb_decoding');
        slotname     | xlog_position
    -----------------+---------------
     regression_slot | 0/48D3488
    (1 row)
    
    
    MogDB=#select * from pg_replication_slots;
        slot_name    |     plugin     | slot_type | datoid | database | active | xmin | catalog_xmin | restart_lsn | dummy_standby
    -----------------+----------------+-----------+--------+----------+--------+------+--------------+-------------+---------------
     user_sub        | pgoutput       | logical   |  15016 | postgres | t      |      |        63071 | 0/48D3438   | f
     wal2json        | wal2json       | logical   |  15016 | postgres | f      |      |        61212 | 0/40EBA88   | f
     test_slot       | mppdb_decoding | logical   |  15016 | postgres | f      |      |        61212 | 0/48C9A90   | f
     regression_slot | mppdb_decoding | logical   |  15016 | postgres | f      |      |        63071 | 0/48D3408   | f
    (4 rows)
    
    MogDB=#SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
     location | xid | data
    ----------+-----+------
    (0 rows)
    
    MogDB=#CREATE TABLE data(id serial primary key, data text);
    NOTICE:  CREATE TABLE will create implicit sequence "data_id_seq" for serial column "data.id"
    NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "data_pkey" for table "data"
    CREATE TABLE
    
    
    MogDB=#SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
     location  |  xid  |                           data
    -----------+-------+-----------------------------------------------------------
     0/48C9D30 | 63070 | BEGIN 63070
     0/48D32E8 | 63070 | COMMIT 63070 (at 2022-09-01 11:51:40.602993+00) CSN 47650
    (2 rows)
    
    MogDB=#BEGIN;
    BEGIN
    MogDB=#INSERT INTO data(data) VALUES('1');
    INSERT 0 1
    MogDB=#INSERT INTO data(data) VALUES('2');
    INSERT 0 1
    MogDB=#COMMIT
    #;
    COMMIT
    MogDB=#SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
     location  |  xid  |                                                                                               data
    
    -----------+-------+-------------------------------------------------------------------------------------------------------------------------------------------------
    -------------------------------------------------
     0/48E0CD0 | 63073 | BEGIN 63073
     0/48E0CD0 | 63073 | {"table_name":"public.data","op_type":"INSERT","columns_name":["id","data"],"columns_type":["integer","text"],"columns_val":["1","'1'"],"old_key
    s_name":[],"old_keys_type":[],"old_keys_val":[]}
     0/48E1090 | 63073 | {"table_name":"public.data","op_type":"INSERT","columns_name":["id","data"],"columns_type":["integer","text"],"columns_val":["2","'2'"],"old_key
    s_name":[],"old_keys_type":[],"old_keys_val":[]}
     0/48E11E0 | 63073 | COMMIT 63073 (at 2022-09-01 11:58:19.799037+00) CSN 47653
    (4 rows)
    
    
    MogDB=#SELECT pg_drop_replication_slot('regression_slot');
     pg_drop_replication_slot
    --------------------------
    
    (1 row)
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    总结

    逻辑复制/解码相对于物理复制更加灵活,也可以根据实际业务需要开发对应的逻辑解码插件,甚至可以当做ETL来使用。总的来说逻辑解码是对用户非常友好的接口。以上通过对MogDB逻辑复制、逻辑解码的原理和部分代码进行分析,利用pg_recvlogical和SQL演示逻辑解码的过程,希望对大家理解MogDB逻辑解码有所帮助。

  • 相关阅读:
    【微服务容器化】第五章-Dockerfile
    华宇商业解析/链动商业模式框架,如何合理规避电商风险?
    简易图像处理器的设计
    甘露糖-聚乙二醇-羧酸|mannose-PEG-COOH|羧酸-PEG-甘露糖
    金仓数据库KingbaseES接口协议解析工具使用指南(3. 解析工具的使用)
    Elasticsearch语法知多少之Template
    python核心编程|一本书,玩转python!让你从入门到大牛!
    java的反应式流
    C++学习——C++函数的编译、成员函数的调用、this指针详解
    golang入门笔记——pprof性能分析
  • 原文地址:https://blog.csdn.net/xk_xx/article/details/126665061