1.本文总结自 B站《尚硅谷-canal》;
2.canal 介绍,可以参考 GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件
3. canal服务器配置包括 mysql配置,canal配置等;
4.mysql服务器,canal服务器,canal客户端架构如下:

1)binglog日志:mysql master节点可以开启biglog日志记录功能,开启后每次向mysql服务端发送写操作命令,会把命令记录在一种特殊的文件中,这个特殊的文件称为biglog日志。
2)biglog日志作用:若服务器异常退出,借用binlog可以恢复数据!
3)二进制日志包括两类文件:
4)查看日志文件及日志索引文件
- [root@centos201 ~]# vim /etc/my.cnf
-
- // 内容如下;
- datadir=/var/lib/mysql
- socket=/var/lib/mysql/mysql.sock
-
- log-error=/var/log/mysqld.log
- pid-file=/var/run/mysqld/mysqld.pid
- bind-address=192.168.163.201
- log-bin=mysql-bin
- binlog_format=row
- binlog-do-db=trcanal
显然日志文件目录为: /var/lib/mysql
4.1)打开 /var/lib/mysql

1)binlog日志有三种:
在配置文件中可以选择配置 binlog_format= statement | mixed | row
statement-语句级:binlog 会记录每次执行写操作的语句。相对 row 模式节省空间,但是可能产生不一致性,比如“update tt set create_date=now()”,如果用 binlog 日志进行恢复,由于执行时间不同可能产生的数据就不同。
行级, binlog 会记录每次操作后每行记录的变化。
statement 的升级版,一定程度上解决了,因为一些情况而造成的 statement模式不一致问题,默认还是 statement,在某些情况下譬如:当函数中包含 UUID() 时;包含AUTO_INCREMENT 字段的表被更新时;执行 INSERT DELAYED 语句时;用 UDF 时;会按照ROW 的方式进行处理:
小结:综合上面对比,Canal 想做监控分析,选择 row 格式比较合适。
1)进入 mysql服务器的配置文件 /etc/my.cnf
打开binlog日志开关,并设置日志类型为 row,如下:
- [mysqld]
- datadir=/var/lib/mysql
- socket=/var/lib/mysql/mysql.sock
-
- log-error=/var/log/mysqld.log
- pid-file=/var/run/mysqld/mysqld.pid
- bind-address=192.168.163.201 // 绑定的本机的ip地址(对外)
- log-bin=mysql-bin // 开启binlog日志
- binlog_format=row // 设置日志级别
- binlog-do-db=trcanal // 设置插入binlog日志的数据库,如果不设置,则所有数据库都要插入binlog日志
2)如何证明mysql开启 binlog是否成功?
- [root@centos201 mysql]# pwd
- /var/lib/mysql

3)创建用户:专门用于抽取日志的 用户canal,并赋权,如下(当然了,可以使用root):
- mysql> set global validate_password.length=4;
- mysql> set global validate_password.policy=0;
- mysql> CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
- mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
4)重启mysql服务生效
sudo systemctl restart mysqld
1)下载 canal
到 Releases · alibaba/canal · GitHub

2)解压 canal 压缩包,目录结构如下:
- [root@centos201 software]# ll canal/
- total 104648
- drwxr-xr-x. 2 root root 93 Sep 17 13:40 bin
- -rwxr-xr-x. 1 root root 107152758 Sep 17 02:42 canal.deployer-1.1.6.tar.gz
- drwxr-xr-x. 5 root root 123 Sep 17 13:06 conf
- drwxr-xr-x. 2 root root 4096 Sep 17 10:43 lib
- drwxrwxrwx. 4 root root 34 Sep 17 12:50 logs
- drwxrwxrwx. 2 root root 235 Aug 11 10:52 plugin
3)修改 canal/conf/canal.properties
canal服务器端口号默认为11111,服务器模式设置为tcp;(用于java客户端连接)

4)修改 example/instance.properties 文件
- [root@centos201 example]# pwd
- /usr/software/canal/conf/example
- [root@centos201 example]# vim instance.properties
canal服务器的example实例 属性修改如下:

5)启动canal服务器,模式为tcp ;接收socket客户端连接,如java的socket客户端;
[root@centos201 canal]# bin/startup.sh

【小结】
canal服务器配置修改完成,canal服务器启动成功;
1)创建maven项目,引入 canal 依赖;
- <dependencies>
- <dependency>
- <groupId>com.alibaba.ottergroupId>
- <artifactId>canal.clientartifactId>
- <version>1.1.2version>
- dependency>
- dependencies>
2)java客户端:
- /**
- * @Description canal客户端
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月17日
- */
- public class MyCanalClient {
-
- public static void main(String[] args) throws Exception {
- // 获取canal服务的连接
- CanalConnector canalConnector =
- CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.163.201", 11111), "example", "", "");
- // 尝试读取服务端是否有新数据
- while (true) {
- // 连接
- canalConnector.connect();
- // 订阅数据库,监控数据库 trcanal所有表
- canalConnector.subscribe("trcanal.*");
- // 获取数据,每次获取100条
- Message message = canalConnector.get(100);
- // 获取 Entry 集合
- List
entries = message.getEntries(); - // 判断集合是否为空,如果为空,则等待继续拉取
- if (entries == null || entries.isEmpty()) {
- System.out.println("没有数据,休息3s");
- Thread.sleep(5000);
- continue;
- }
- // 遍历 entries 单条解析
- for (CanalEntry.Entry entry : entries) {
- // 获取表名
- String tableName = entry.getHeader().getTableName();
- // 获取类型
- CanalEntry.EntryType entryType = entry.getEntryType();
- // 获取序列化后的数据
- ByteString storeValue = entry.getStoreValue();
- // 判断当前entryType类型是是否为 RowData 类型
- if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
- // 反序列化数据
- CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
- // 获取当前事件的操作类型
- CanalEntry.EventType eventType = rowChange.getEventType();
- // 获取数据集
- List
rowDatasList = rowChange.getRowDatasList(); - // 遍历并打印数据集
- for (CanalEntry.RowData rowData : rowDatasList) {
- // 获取修改前的数据
- JSONObject beforeData = new JSONObject();
- List
beforeColumnsList = rowData.getBeforeColumnsList(); - for (CanalEntry.Column column : beforeColumnsList) {
- beforeData.put(column.getName(), column.getValue());
- }
- // 获取修改后的数据
- JSONObject afterData = new JSONObject();
- List
afterColumnsList = rowData.getAfterColumnsList(); - for (CanalEntry.Column column : afterColumnsList) {
- afterData.put(column.getName(), column.getValue());
- }
- // 打印
- System.out.println("table = " + tableName + ", eventType=" + eventType + " before= " + beforeData + "after " + afterData);
- }
- }
- }
- }
- }
- }
3)运行结果:
3.1)插入数据,触发binlog日志;
- INSERT INTO trcanal.user_inf_tbl (id, name, sex) VALUES
- ('20220917_0026', 'zhangsan0026', 'male26')
- ;
3.2)canal客户端打印日志:
table = user_inf_tbl, eventType=INSERT before= {}after {"sex":"male26","name":"zhangsan0026","id":"20220917_0026"}
1)查看日志
以 example 实例为例,查看它的运行日志,如下(example.log):
- [root@centos201 example]# pwd
- /usr/software/canal/logs/example
- [root@centos201 example]# ll
- total 1796
- -rw-r--r--. 1 root root 1797729 Sep 17 17:13 example.log
- -rw-r--r--. 1 root root 1399 Sep 17 17:10 meta.log
2)查看 example.log
- Caused by: java.io.IOException: handshake exception:
- ErrorPacket [errorNumber=1129, fieldCount=-1, message=192.168.163.201' is blocked because of many connection errors; unblock with 'mysqladmin flush-hosts', sqlState=ost ', sqlStateMarker=H]
- at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:168) ~[canal.parse.driver-1.1.6.jar:na]
- at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:82) ~[canal.parse.driver-1.1.6.jar:na]
- ... 4 common frames omitted
message=192.168.163.201' is blocked because of many connection errors; unblock with 'mysqladmin flush-hosts'
【日志解释】

参考自: mysql - How to unblock with mysqladmin flush hosts - Stack Overflow