canal可以用来监听mysql数据库的变化,用来同步数据
先下载最新的部署版本,release地址: https://github.com/alibaba/canal/releases
包下载地址: https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
下载完后,在linux上新建一个canal文件夹,放入tar包解压: tar -zxvf canal.xxx.tar.gz
解压完后修改配置文件
查看conf/canal.properties,其中canal.port是客户端连接的端口,需要放开,canal.admin.user和canal.admin.passwd是客户端连接的账号
再打开conf/example/instance.properties,master.address填数据库地址,dbUsername和dbPassword是数据库的账号和密码,filter.regex可以用来过滤数据库,默认是监听所有数据库,如果只想监听db_开头的数据库可以这么写: db_.*\\..*,多个规则用逗号分隔
修改完成后,进入bin目录,执行./startup.sh是启动,./stop.sh是关闭
进入logs/example,执行tail -f -n 300 example.log,看到以下输出说明搭建成功了
引入依赖
- <dependencies>
- <dependency>
- <groupId>com.alibaba.otter</groupId>
- <artifactId>canal.client</artifactId>
- <version>1.1.7</version>
- </dependency>
- <dependency>
- <groupId>com.alibaba.otter</groupId>
- <artifactId>canal.protocol</artifactId>
- <version>1.1.7</version>
- </dependency>
- </dependencies>
代码实现:
- package cn.hollycloud.iplatform;
-
- import com.alibaba.otter.canal.client.CanalConnector;
- import com.alibaba.otter.canal.client.CanalConnectors;
- import com.alibaba.otter.canal.protocol.CanalEntry;
- import com.alibaba.otter.canal.protocol.Message;
- import com.google.protobuf.ByteString;
- import com.google.protobuf.InvalidProtocolBufferException;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang3.StringUtils;
- import org.junit.Test;
-
- import java.net.InetSocketAddress;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Objects;
-
- /**
- * Unit test for simple App.
- */
- @Slf4j
- public class CanalTest {
- private Map
errorMap = new HashMap<>(); -
- @Test
- public void testCanal() {
- initThread();
- }
-
- private void initThread() {
- new Thread(new Runnable() {
- @Override
- public void run() {
- while (true) {
- try {
- initConnect();
- } catch (Exception e) {
- String key = "canal_connection_error";
- if (!hasSameError(key, e.getMessage())) {
- log.error("canal连接出错: {}", e);
- }
- }
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- }
- }
- }
- }).start();
- }
-
- private void initConnect() {
- String canalIp = "localhost";
- int canalPort = 11111;
- String canalDestination = "example";
- String canalUsername = "admin";
- String canalPassword = "123456";
- CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalIp,
- canalPort), canalDestination, canalUsername, canalPassword);
- int batchSize = 200;
- try {
- connector.connect(); // 连接到canal server
- connector.subscribe("db_.*\\..*"); // 订阅指定的消息
- connector.rollback(); // 回滚到未进行ack 的地方
- log.info("canal连接成功");
- while (true) {
- Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
- long batchId = message.getId();
- int size = message.getEntries().size();
- if (batchId == -1 || size == 0) {
- try {
- //未获取到消息则睡眠
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- }
- } else {
- try {
- //处理消息
- log.info("从canal接收到: {} 条消息,消息批次: {},开始处理", size, message.getId());
- handleMessage(message.getEntries());
- } catch (Exception e) {
- connector.rollback(batchId); // 处理失败, 回滚数据
- String key = "canal_sync_data_error";
- String errMsg = e.getMessage();
- if (StringUtils.isEmpty(errMsg)) errMsg = e.toString();
- if (!hasSameError(key, errMsg)) {
- log.error("同步数据出错: {}", e);
- }
- //休眠一段时间继续获取数据
- try {
- Thread.sleep(10000);
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- }
- continue;
- }
- }
- connector.ack(batchId); // 提交确认
- }
- } finally {
- connector.disconnect();
- }
- }
-
- private boolean hasSameError(String key, String error) {
- String lastError = errorMap.get(key);
- if (Objects.equals(lastError, error)) {
- return true;
- }
- errorMap.put(key, error);
- return false;
- }
-
- private void handleMessage(List
entrys) throws InvalidProtocolBufferException { - for (CanalEntry.Entry entry : entrys) {
- if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
- continue;
- }
- //根据数据库名获取租户名
- String databaseName = entry.getHeader().getSchemaName();
- String tableName = entry.getHeader().getTableName();
- log.info("数据库: {}, 表名: {}", databaseName, tableName);
- // 获取类型
- CanalEntry.EntryType entryType = entry.getEntryType();
-
- // 获取序列化后的数据
- ByteString storeValue = entry.getStoreValue();
- if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
- // 反序列化数据
- CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
- // 获取当前事件的操作类型
- CanalEntry.EventType eventType = rowChange.getEventType();
- if (eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE
- || eventType == CanalEntry.EventType.DELETE) {
- // 获取数据集
- List
rowDataList = rowChange.getRowDatasList(); - // 遍历rowDataList,并打印数据集
- for (CanalEntry.RowData rowData : rowDataList) {
- List
afterColumnsList = rowData.getAfterColumnsList(); - List
beforeColumnsList = rowData.getBeforeColumnsList(); - // 变更前数据
- for (CanalEntry.Column column : beforeColumnsList) {
- log.info("变更前数据: name: {}, value: {}", column.getName(), column.getValue());
- }
- // 变更后数据
- for (CanalEntry.Column column : afterColumnsList) {
- log.info("变更后数据: name: {}, value: {}", column.getName(), column.getValue());
- }
- }
- }
- }
- }
- }
- }