• canal 服务安装


    简介:Canal 是阿里巴巴开源的一个基于 MySQL 数据库增量日志解析的中间件,用于提供准实时的数据同步功能。

    准备工作

    1.修改配置文件 

    ,需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:

    1. [mysqld]
    2. log-bin=mysql-bin # 开启 binlog
    3. binlog-format=ROW # 选择 ROW 模式
    4. server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
     2.授权

    canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

    1. CREATE USER canal IDENTIFIED BY 'canal';
    2. GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    3. -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    4. FLUSH PRIVILEGES;

    1.下载与安装

    首先从 GitHub 上下载 Canal ,解压后根据官方文档配置 Canal。

    我这里下载的快照版,

    直接在windows上解压,

    在lib目录下添加如下jar

    2.配置 Canal

    修改 conf/example/instance.properties 文件,主要配置项包括 MySQL 的地址、端口、用户名、密码以及要监听的数据库和表等。

    这里是本地部署,可不做修改


    3.启动 Canal Server

    通过命令行启动 Canal Server,如 startup.bat example,这里的 example 是实例名称,对应配置文件夹下的配置文件名。

    执行 startup.bat example 命令后,logs/canal/canal.log 打印如下日志证明启动成功;

     4.测试

    1.导入依赖:
    1. <dependency>
    2. <groupId>com.alibaba.ottergroupId>
    3. <artifactId>canal.clientartifactId>
    4. <version>1.1.4version>
    5. dependency>
     2.复制示例代码启动
    1. public static void main(String args[]) {
    2. // 创建链接
    3. CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
    4. 11111), "example", "canal", "canal");
    5. int batchSize = 1000;
    6. int emptyCount = 0;
    7. try {
    8. connector.connect();
    9. connector.subscribe(".*\\..*");
    10. connector.rollback();
    11. int totalEmptyCount = 120;
    12. while (emptyCount < totalEmptyCount) {
    13. Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
    14. long batchId = message.getId();
    15. int size = message.getEntries().size();
    16. if (batchId == -1 || size == 0) {
    17. emptyCount++;
    18. System.out.println("empty count : " + emptyCount);
    19. try {
    20. Thread.sleep(1000);
    21. } catch (InterruptedException e) {
    22. }
    23. } else {
    24. emptyCount = 0;
    25. // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
    26. printEntry(message.getEntries());
    27. }
    28. connector.ack(batchId); // 提交确认
    29. // connector.rollback(batchId); // 处理失败, 回滚数据
    30. }
    31. System.out.println("empty too many times, exit");
    32. } finally {
    33. connector.disconnect();
    34. }
    35. }
    36. private static void printEntry(List entrys) {
    37. for (CanalEntry.Entry entry : entrys) {
    38. if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
    39. continue;
    40. }
    41. CanalEntry.RowChange rowChage = null;
    42. try {
    43. rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
    44. } catch (Exception e) {
    45. throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
    46. e);
    47. }
    48. CanalEntry.EventType eventType = rowChage.getEventType();
    49. System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
    50. entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
    51. entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
    52. eventType));
    53. for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
    54. if (eventType == CanalEntry.EventType.DELETE) {
    55. printColumn(rowData.getBeforeColumnsList());
    56. } else if (eventType == CanalEntry.EventType.INSERT) {
    57. printColumn(rowData.getAfterColumnsList());
    58. } else {
    59. System.out.println("------->修改前");
    60. printColumn(rowData.getBeforeColumnsList());
    61. System.out.println("-------> 修改后");
    62. printColumn(rowData.getAfterColumnsList());
    63. }
    64. }
    65. }
    66. }
    67. private static void printColumn(List columns) {
    68. for (CanalEntry.Column column : columns) {
    69. System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
    70. }
    71. }
    3.在源数据库的表中修改数据

    在控制台会出现如下效果:

  • 相关阅读:
    go-GC垃圾回收
    成都 | 转行软件测试,从零收入到月薪过万,人生迎来新转折...
    Scratch二次开发8:背景、角色、造型、声音后台管理
    CTF-PWN-[ZJCTF 2019]Login 栈位置的转换跟踪
    Java --- IO流
    mysql 问题解决 3
    为什么标准AR HUD的FOV必须在10°×3°以上|技术科普
    tegra nvidia agx xaiver 系统开机自动启动风扇配置方法
    [附源码]计算机毕业设计springboot校园生活服务平台
    中英文说明书丨CalBioreagents艾美捷UR-144单克隆抗体
  • 原文地址:https://blog.csdn.net/qq_21299835/article/details/128877800