• 2.canal服务器配置及java客户端


    【README】

    1.本文总结自 B站《尚硅谷-canal》;

    2.canal 介绍,可以参考 GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件

    3. canal服务器配置包括 mysql配置,canal配置等;

    4.mysql服务器,canal服务器,canal客户端架构如下


     【1】mysql binlog日志

    【1.1】定义

    1)binglog日志:mysql master节点可以开启biglog日志记录功能,开启后每次向mysql服务端发送写操作命令,会把命令记录在一种特殊的文件中这个特殊的文件称为biglog日志

    2)biglog日志作用:若服务器异常退出,借用binlog可以恢复数据!

    3)二进制日志包括两类文件:

    • 二进制日志索引文件(文件名后缀为.index)用于记录所有的二进制文件;
    • 二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除了数据查询语句)语句事件;

    4)查看日志文件及日志索引文件

    1. [root@centos201 ~]# vim /etc/my.cnf
    2. // 内容如下;
    3. datadir=/var/lib/mysql
    4. socket=/var/lib/mysql/mysql.sock
    5. log-error=/var/log/mysqld.log
    6. pid-file=/var/run/mysqld/mysqld.pid
    7. bind-address=192.168.163.201
    8. log-bin=mysql-bin
    9. binlog_format=row
    10. binlog-do-db=trcanal

    显然日志文件目录为: /var/lib/mysql

    4.1)打开 /var/lib/mysql


     【1.2】binlog日志分类

    1)binlog日志有三种:

    • statement:语句级;
    • mixed:混合;
    • row:行级;

    在配置文件中可以选择配置  binlog_format= statement | mixed | row


    【1.2.1】statement-语句级binlog日志

    statement-语句级:binlog 会记录每次执行写操作的语句。相对 row 模式节省空间,但是可能产生不一致性,比如“update tt set create_date=now()”,如果用 binlog 日志进行恢复,由于执行时间不同可能产生的数据就不同。

    • ① 优点:节省空间。
    • ② 缺点:有可能造成数据不一致

    【1.2.2】row-行级binlog日志 (推荐作为日志增量解析的binlog日志类型)

    行级, binlog 会记录每次操作后每行记录的变化。

    • ① 优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,他只记录执行后的效果。
    • ② 缺点:占用较大空间。

    【1.2.3】mixed-混合级binlog日志

    statement 的升级版,一定程度上解决了,因为一些情况而造成的 statement模式不一致问题,默认还是 statement,在某些情况下譬如:当函数中包含 UUID() 时;包含AUTO_INCREMENT 字段的表被更新时;执行 INSERT DELAYED 语句时;用 UDF 时;会按照ROW 的方式进行处理:

    • ① 优点:节省空间,同时兼顾了一定的一致性。
    • ② 缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要对binlog 的监控的情况都不方便。

    小结:综合上面对比,Canal 想做监控分析,选择 row 格式比较合适


    【2】canal服务器配置

    【2.1】mysql 开启binlog日志

    1)进入 mysql服务器的配置文件  /etc/my.cnf

    打开binlog日志开关,并设置日志类型为 row,如下:

    1. [mysqld]
    2. datadir=/var/lib/mysql
    3. socket=/var/lib/mysql/mysql.sock
    4. log-error=/var/log/mysqld.log
    5. pid-file=/var/run/mysqld/mysqld.pid
    6. bind-address=192.168.163.201 // 绑定的本机的ip地址(对外)
    7. log-bin=mysql-bin // 开启binlog日志
    8. binlog_format=row // 设置日志级别
    9. binlog-do-db=trcanal // 设置插入binlog日志的数据库,如果不设置,则所有数据库都要插入binlog日志

    2)如何证明mysql开启 binlog是否成功?

    1. [root@centos201 mysql]# pwd
    2. /var/lib/mysql

     3)创建用户:专门用于抽取日志的 用户canal,并赋权,如下(当然了,可以使用root):

    1. mysql> set global validate_password.length=4;
    2. mysql> set global validate_password.policy=0;
    3. mysql> CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
    4. mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

    4)重启mysql服务生效  

    sudo systemctl restart mysqld


    【2.2】canal 服务器配置

    1)下载 canal

    到  Releases · alibaba/canal · GitHub

    2)解压 canal 压缩包,目录结构如下:

    1. [root@centos201 software]# ll canal/
    2. total 104648
    3. drwxr-xr-x. 2 root root 93 Sep 17 13:40 bin
    4. -rwxr-xr-x. 1 root root 107152758 Sep 17 02:42 canal.deployer-1.1.6.tar.gz
    5. drwxr-xr-x. 5 root root 123 Sep 17 13:06 conf
    6. drwxr-xr-x. 2 root root 4096 Sep 17 10:43 lib
    7. drwxrwxrwx. 4 root root 34 Sep 17 12:50 logs
    8. drwxrwxrwx. 2 root root 235 Aug 11 10:52 plugin

    3)修改 canal/conf/canal.properties

    canal服务器端口号默认为11111,服务器模式设置为tcp;(用于java客户端连接)

     4)修改 example/instance.properties 文件

    • example是canal服务器的一个实例;
    • canal服务器可以有多个实例,如example2;在 conf下新建文件夹 example2 即可
    1. [root@centos201 example]# pwd
    2. /usr/software/canal/conf/example
    3. [root@centos201 example]# vim instance.properties

    canal服务器的example实例 属性修改如下:

    5)启动canal服务器,模式为tcp ;接收socket客户端连接,如java的socket客户端;

    • 服务器地址: 192.168.163.201;
    • 端口:  11111 ;
    [root@centos201 canal]# bin/startup.sh 

      【小结】

    canal服务器配置修改完成,canal服务器启动成功; 


    【3】java版的canal客户端

    1)创建maven项目,引入 canal 依赖;

    1. <dependencies>
    2. <dependency>
    3. <groupId>com.alibaba.ottergroupId>
    4. <artifactId>canal.clientartifactId>
    5. <version>1.1.2version>
    6. dependency>
    7. dependencies>

    2)java客户端:

    1. /**
    2. * @Description canal客户端
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月17日
    6. */
    7. public class MyCanalClient {
    8. public static void main(String[] args) throws Exception {
    9. // 获取canal服务的连接
    10. CanalConnector canalConnector =
    11. CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.163.201", 11111), "example", "", "");
    12. // 尝试读取服务端是否有新数据
    13. while (true) {
    14. // 连接
    15. canalConnector.connect();
    16. // 订阅数据库,监控数据库 trcanal所有表
    17. canalConnector.subscribe("trcanal.*");
    18. // 获取数据,每次获取100条
    19. Message message = canalConnector.get(100);
    20. // 获取 Entry 集合
    21. List entries = message.getEntries();
    22. // 判断集合是否为空,如果为空,则等待继续拉取
    23. if (entries == null || entries.isEmpty()) {
    24. System.out.println("没有数据,休息3s");
    25. Thread.sleep(5000);
    26. continue;
    27. }
    28. // 遍历 entries 单条解析
    29. for (CanalEntry.Entry entry : entries) {
    30. // 获取表名
    31. String tableName = entry.getHeader().getTableName();
    32. // 获取类型
    33. CanalEntry.EntryType entryType = entry.getEntryType();
    34. // 获取序列化后的数据
    35. ByteString storeValue = entry.getStoreValue();
    36. // 判断当前entryType类型是是否为 RowData 类型
    37. if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
    38. // 反序列化数据
    39. CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
    40. // 获取当前事件的操作类型
    41. CanalEntry.EventType eventType = rowChange.getEventType();
    42. // 获取数据集
    43. List rowDatasList = rowChange.getRowDatasList();
    44. // 遍历并打印数据集
    45. for (CanalEntry.RowData rowData : rowDatasList) {
    46. // 获取修改前的数据
    47. JSONObject beforeData = new JSONObject();
    48. List beforeColumnsList = rowData.getBeforeColumnsList();
    49. for (CanalEntry.Column column : beforeColumnsList) {
    50. beforeData.put(column.getName(), column.getValue());
    51. }
    52. // 获取修改后的数据
    53. JSONObject afterData = new JSONObject();
    54. List afterColumnsList = rowData.getAfterColumnsList();
    55. for (CanalEntry.Column column : afterColumnsList) {
    56. afterData.put(column.getName(), column.getValue());
    57. }
    58. // 打印
    59. System.out.println("table = " + tableName + ", eventType=" + eventType + " before= " + beforeData + "after " + afterData);
    60. }
    61. }
    62. }
    63. }
    64. }
    65. }

    3)运行结果:

    3.1)插入数据,触发binlog日志;

    1. INSERT INTO trcanal.user_inf_tbl (id, name, sex) VALUES
    2. ('20220917_0026', 'zhangsan0026', 'male26')
    3. ;

    3.2)canal客户端打印日志:

    table = user_inf_tbl, eventType=INSERT before= {}after {"sex":"male26","name":"zhangsan0026","id":"20220917_0026"}

    【4】 canal服务器运维(如canal抽数失败):

    1)查看日志

    以 example 实例为例,查看它的运行日志,如下(example.log):

    1. [root@centos201 example]# pwd
    2. /usr/software/canal/logs/example
    3. [root@centos201 example]# ll
    4. total 1796
    5. -rw-r--r--. 1 root root 1797729 Sep 17 17:13 example.log
    6. -rw-r--r--. 1 root root 1399 Sep 17 17:10 meta.log

    2)查看 example.log

    1. Caused by: java.io.IOException: handshake exception:
    2. ErrorPacket [errorNumber=1129, fieldCount=-1, message=192.168.163.201' is blocked because of many connection errors; unblock with 'mysqladmin flush-hosts', sqlState=ost ', sqlStateMarker=H]
    3. at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:168) ~[canal.parse.driver-1.1.6.jar:na]
    4. at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:82) ~[canal.parse.driver-1.1.6.jar:na]
    5. ... 4 common frames omitted

    message=192.168.163.201' is blocked because of many connection errors; unblock with 'mysqladmin flush-hosts' 

    【日志解释】

    • 大致说 192.168.163.201 被阻塞了,需要执行 mysqladmin flush-hosts 接触阻塞;
    • 在mysql上执行 FLUSH HOSTS; 即可。

     参考自: mysql - How to unblock with mysqladmin flush hosts - Stack Overflow


  • 相关阅读:
    生信初学者必知的镜像设置
    学生HTML个人网页作业作品 基于HTML+CSS+JavaScript明星个人主页(15页)
    FreeRTOS学习笔记-基于stm32(4)临界段代码保护与任务调度器的挂起和恢复
    APP中RN页面热更新流程-ReactNative源码分析
    【c ++ primer 笔记】第15章 面向对象程序设计
    zynq7000 从github拉取源码进行编译,运行. 快速进行外设验证
    瀑布流布局(CSS flex实现)
    (十二)Mybatis的缓存机制
    字节跳动数据库的过去、现状与未来
    随机数问题
  • 原文地址:https://blog.csdn.net/PacosonSWJTU/article/details/126903380