https://blog.csdn.net/weixin_44371237/article/details/127904514
1)Master 主库将改变记录,写到二进制日志(Binary Log)中;
2)Slave 从库向 MySQL Master 发送 dump 协议,将 Master 主库的 binary log events 拷贝到它的中继日志(relay log);
3)Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。
把自己伪装成 Slave,假装从 Master 复制数据
启动MySQL
service mysqld start
登录 mysql
mysql -uroot -proot123456
创建数据库canal,表 test
赋权限
mysql> set global validate_password_length=4;
mysql> set global validate_password_policy=0;
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ;
查看mysql数据库,可以看到用户已生效
下载并解压 Jar 包
https://github.com/alibaba/canal/releases
上传jar包到software,在/opt/module/创建canal文件夹,再解压
tar -zxvf /software/canal.deployer-1.1.2.tar.gz -C /opt/module/canal
修改 canal.properties 的配置(不用改)
vim /opt/module/canal/conf/canal.properties
进入
cd /opt/module/canal/conf/example
修改配置文件 vim instance.properties
## 只要跟/etc/my.cnf的server.id=1不一样就行
canal.instance.mysql.slaveId=20
canal.instance.master.address=hadoop100:3306
启动
/opt/module/canal/bin/startup.sh
jps查看有CanalLauncher进程
pom.xml
4.0.0
com.chen
canal
1.0-SNAPSHOT
8
8
com.alibaba.otter
canal.client
1.1.2
org.apache.kafka
kafka-clients
2.4.1
CanalClient
package com.chen;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.List;
public class CanalClient {
public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
//连接器
CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop100", 11111), "example", "", "");
while (true){
//连接
canalConnector.connect();
//订阅数据库
canalConnector.subscribe("canal.*");
//获取数据
Message message = canalConnector.get(100);
//获取Entry集合
List entries = message.getEntries();
//判断集合是否为空,为空则等待一会继续拉取
if (entries.size()<=0){
System.out.println("当次抓去没有数据,休息一会儿。。。");
Thread.sleep(1000);
}else {
//遍历entries,单条解析
for (CanalEntry.Entry entry : entries) {
//获取表名
String tableName = entry.getHeader().getTableName();
//获取类型
CanalEntry.EntryType entryType = entry.getEntryType();
//获取序列化后的数据
ByteString storeValue = entry.getStoreValue();
//判断entry类型是否为ROWDATA类型
if (CanalEntry.EntryType.ROWDATA.equals(entryType)){
//反序列化
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
//获取当前事件操作类型
CanalEntry.EventType eventType = rowChange.getEventType();
//获取数据集
List rowDatasList = rowChange.getRowDatasList();
//遍历
for (CanalEntry.RowData rowData : rowDatasList) {
//改变前数据
JSONObject jsonObjectBefore = new JSONObject();
List beforeColumnsList = rowData.getBeforeColumnsList();
for (CanalEntry.Column column : beforeColumnsList) {
jsonObjectBefore.put(column.getName(),column.getValue());
}
//改变后数据
JSONObject jsonObjectAfter = new JSONObject();
List afterColumnsList = rowData.getAfterColumnsList();
for (CanalEntry.Column column : afterColumnsList) {
jsonObjectAfter.put(column.getName(),column.getValue());
}
System.out.println("Table:"+tableName+",EventTpye:"+eventType+",Before:"+jsonObjectBefore+",After:"+jsonObjectAfter);
}
}else {
System.out.println("当前操作类型为:"+entryType);
}
}
}
}
}
}
运行程序
然后向canal数据库的test表插入数据
insert into canal.test values(1,'aaa');
控制台输出如下
修改 canal.properties 的配置
vim /opt/module/canal/conf/canal.properties
canal.serverMode = kafka
canal.mq.servers = hadoop100:9092,hadoop101:9092,hadoop102:9092
修改instance.properties
vim /opt/module/canal/conf/example/instance.properties
canal.mq.topic=canal
启动zookeeper和kafka
/home/zk.sh start
/home/kafka.sh start
启动canal
/opt/module/canal/bin/startup.sh
启动kafka消费者canal
bin/kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic canal
向canal数据库的test表插入数据
insert into canal.test values(8,'aaa');
可以看到kafka消费者接收到如下