binlog是一个二进制文件,它保存在磁盘中,是用来记录数据库表结构变更、表数据修改的二进制日志。其实除了数据复制外,它还可以实现数据恢复、增量备份等功能。
首先需要确保mysql服务已经启用
了binlog
show variables like 'log_bin';
如果为值为OFF,表示没有启用,那么需要首先启用binlog,修改mysql的配置文件:
log_bin= /var/log/mysql/mysql-bin.log #指定binlog路径
binlog-format=ROW
server-id=1
expire_logs_days = 10
max_binlog_size = 100M
对参数做一个简要说明:
注意:
在更改完配置文件后,重启mysql服务。再次查看是否启用binlog,返回为ON,表示已经开启成功。
这时候随意的对某一个数据库中的表做一下增删改,对应的日志就会记录在/var/log/mysql/这个文件夹下了。我们看一下这个文件夹里的东西:
这里的文件是没有办法正常查看的,需要使用mysql提供的命令来查看,命令是这个样子的:
1. 查看
mysqlbinlog mysql-bin.000002
2. 指定位置查看
mysqlbinlog --start-position="120" --stop-position="332" mysql-bin.000002
-- 查询binglog日志列表
show binary logs;
-- 查询第一个(最早)的binlog日志
show binlog events;
-- 指定查询 mysql-bin.000077 日志
show binlog events in 'mysql-bin.000077';
-- 指定查询 mysql-bin.000077 日志,并且从pos=1024开始查
show binlog events in 'mysql-bin.000077' from 1024;
-- 指定查询 mysql-bin.000077 日志,并且从pos=1024开始查起,查询10条
show binlog events in 'mysql-bin.000077' from 1024 limit 10;
-- 指定查询 mysql-bin.000077 日志,并且从pos=1024开始查起,偏移2行,查询10条
show binlog events in 'mysql-bin.000077' from 1024 limit 2,10;
因为我们现在的binlog_format指定的格式是ROW(就在上面写的,还记得吗?),所谓binlog文件的内容没有办法正常查看,因为他是这个样子的:
这时,我们需要对输出进行解码
mysqlbinlog --base64-output=decode-rows -v mysql-bin.000001
这时候,显示的结果就变成了:
虽然还不是正常的sql,但是好赖是有一定的格式了。
but自己来做解析的话还是很麻烦,so~放弃这种操作。
在这个过程中,我发现阿里巴巴有一款开源的软件可以用。就是标题上说道的:canal。看了一下网站上的介绍,简直美滋滋。
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
canal官网地址:https://github.com/alibaba/canal
基于日志增量订阅和消费的业务包括
数据库镜像
数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务 cache 刷新
带业务逻辑的增量数据处理
canal要做的就是基于binlog实现增量而不是全量的数据同步。
基于binlog实现数据同步的方案有两种:
mysql-binlog-connector | ali的canal |
---|---|
通过引入依赖jar包实现,需要自行实现解析,但是相对轻量。 | 是数据同步中间件,需要单独部署维护,功能强大,支持数据库及MQ的同步,维护成本高。 |
根据实际业务场景,按需索取,业务量小,业务简单,轻量可以通过mysql-binlog-connector,业务量大,逻辑复杂,有专门的运维团队,可以考虑canal,比较经过阿里高并发验证,相对稳定。
Canal监听mysql的binlog日志实现数据同步:https://blog.csdn.net/m0_37583655/article/details/119517336
Java监听mysql的binlog详解(mysql-binlog-connector):https://blog.csdn.net/m0_37583655/article/details/119148470
打开配置文件conf/example/instance.properties,配置数据库连接等信息如下:
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
# 在MySQL服务器授权的账号密码字符集
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex.*\\..*表示监听所有表 也可以写具体的表名,用,隔开
canal.instance.filter.regex=.*\\..*
# table black regex
# mysql 数据解析表的黑名单,多个表用,隔开
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
启动canal.deployer-1.1.5\bin路径下:startup.bat
@Bean
public CanalConnector canalConnector() {
CanalConnector connector = CanalConnectors.newSingleConnector(
//对应canal服务的链接
new InetSocketAddress(canalConf.getIp(), canalConf.getPort()),
//链接的目标,这里对应canal服务中的配置,需要查阅文档
canalConf.getDestination(),
//不知道是什么用户,使用“”
canalConf.getUser(),
//不知道是什么密码,使用“”
canalConf.getPassword()
);
return connector;
}
2)先开启一个线程,里面写一个死循环,用于从canal的服务中获取binlog中的消息。这个消息类是:com.alibaba.otter.canal.protocol.Message。
Message message = connector.getWithoutAck(100);
3)取出Message中的事件集合,就是binlog中的每一条数据。将类型为增删改的数据取出,之后每一条数据放在一个线程中,用线程池去执行它。
List<Entry> entries = message.getEntries();
message.getEntries():从链接中获取的数据集合,每一条代表1条binlog数据
4)在每一个线程中,取出Entry中的数据,根据其类型拼接各种sql,并执行。
Header header = entry.getHeader();
//获取发生变化的表名称,可能会没有
String tableName = header.getTableName();
//获取发生变化的数据库名称,可能会没有
String schemaName = header.getSchemaName();
//获取事件类型
EventType eventType = rowChange.getEventType();
/**
这里我们只是用其中的三种类型:
EventType.DELETE 删除
EventType.INSERT 插入
EventType.UPDATE 更新
*/
//获取发生变化的数据
RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
//遍历其中的数据
int rowDatasCount = rowChange.getRowDatasCount();
for (int i = 0; i < rowDatasCount; i++) {
//每一行中的数据
RowData rowData = rowChange.getRowDatas(i);
}
//获取修改前的数据
List<Column> before = rowData.getBeforeColumnsList();
//获取修改后的数据
List<Column> after = rowData.getAfterColumnsList();
Column中有一系列方法,比如是否发生修改,时候为key,是否是null等,就不在细说了。扩展:阿里Canal框架(数据同步中间件)初步实践
https://mp.weixin.qq.com/s?__biz=MzU2MTI4MjI0MQ==&mid=2247486253&idx=1&sn=28112ccd3f5b20e93a3a98836f70948b&scene=45#wechat_redirect
1)这里先写一个线程,用于不停的从canal服务中获取消息,然后创建新的线程并让其处理其中的数据。代码如下:
@Override
public void run() {
while (true) {
//主要用于在链接失败后用于再次尝试重新链接
try {
if (!run) {
//打开链接,并设置 run=true
startCanal();
}
} catch (Exception e) {
System.err.println("连接失败,尝试重新链接。。。");
threadSleep(3 * 1000);
}
System.err.println("链接成功。。。");
//不停的从CanalConnector中获取消息
try {
while (run) {
//获取一定数量的消息,这里为线程池数量×3
Message message = connector.getWithoutAck(batchSize * 3);
long id = message.getId();
//处理获取到的消息
process(message);
connector.ack(id);
}
} catch (Exception e) {
System.err.println(e.getMessage());
} finally {
//如果发生异常,最终关闭连接,并设置run=false
stopCanal();
}
}
}
void process(Message message) {
List<Entry> entries = message.getEntries();
if (entries.size() <= 0) {
return;
}
log.info("process message.entries.size:{}", entries.size());
for (Entry entry : entries) {
Header header = entry.getHeader();
String tableName = header.getTableName();
String schemaName = header.getSchemaName();
//这里判断是否可以取出数据库名称和表名称,如果不行,跳过循环
if (StringUtils.isAllBlank(tableName, schemaName)) {
continue;
}
//创建新的线程,并执行
jobList.stream()
.filter(job -> job.isMatches(tableName, schemaName))
.forEach(job -> executorService.execute(job.newTask(entry)));
}
}
这里的jobList是我自己定义List,代码如下:
package com.hebaibai.miner.job;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import static com.alibaba.otter.canal.protocol.CanalEntry.Entry;
@Slf4j
@Data
public abstract class Job {
/**
* 数据库链接
*/
protected JdbcTemplate jdbcTemplate;
/**
* 额外配置
*/
protected JSONObject prop;
/**
* 校验目标是否为合适的数据库和表
*
* @param table
* @param database
* @return
*/
abstract public boolean isMatches(String table, String database);
/**
* 实例化一个Runnable
*
* @param entry
* @return
*/
abstract public Runnable newTask(final Entry entry);
/**
* 获取RowChange
*
* @param entry
* @return
*/
protected CanalEntry.RowChange getRowChange(Entry entry) {
try {
return CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
return null;
}
}
jobList里面放的是Job的实现类。
写一个Job的实现类,并用于同步表,并转换字段名称。因为需求中要求两个同步的数据中可能字段名称不一致,所以我写了一个josn用来配置两个表的字段对应关系:
//省略其他配置
"prop": {
//来源数据库
"database": "pay",
//来源表
"table": "p_pay_msg",
//目标表(目标库在其他地方配置)
"target": "member",
//字段对应关系
//key :来源表的字段名
//value:目标表的字段名
"mapping": {
"id": "id",
"mch_code": "mCode",
"send_type": "mName",
"order_id": "phone",
"created_time": "create_time",
"creator": "remark"
}
}
//省略其他配置
下面是全部的代码,主要做的就是取出变动的数据,按照对应的字段名重新拼装sql,然后执行就好了。扩展:基于canal进行日志的订阅和转换
https://mp.weixin.qq.com/s?__biz=MzI4Njc5NjM1NQ==&mid=2247489997&idx=2&sn=c28567f0248601443c066ed63f4fcedb&chksm=ebd626e1dca1aff7ae57cfa1c5ebe7f121c5e85e463eae713621c12b6773d851ed94e4ec6bf2&scene=21#wechat_redirect
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import static com.alibaba.otter.canal.protocol.CanalEntry.*;
/**
* 单表同步,表的字段名称可以不同,类型需要一致
* 表中需要有id字段
*/
@SuppressWarnings("ALL")
@Slf4j
public class TableSyncJob extends Job {
/**
* 用于校验是否适用于当前的配置
*
* @param table
* @param database
* @return
*/
@Override
public boolean isMatches(String table, String database) {
return prop.getString("database").equals(database) &&
prop.getString("table").equals(table);
}
/**
* 返回一个新的Runnable
*
* @param entry
* @return
*/
@Override
public Runnable newTask(final Entry entry) {
return () -> {
RowChange rowChange = super.getRowChange(entry);
if (rowChange == null) {
return;
}
EventType eventType = rowChange.getEventType();
int rowDatasCount = rowChange.getRowDatasCount();
for (int i = 0; i < rowDatasCount; i++) {
RowData rowData = rowChange.getRowDatas(i);
if (eventType == EventType.DELETE) {
delete(rowData.getBeforeColumnsList());
}
if (eventType == EventType.INSERT) {
insert(rowData.getAfterColumnsList());
}
if (eventType == EventType.UPDATE) {
update(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList());
}
}
};
}
/**
* 修改后的数据
*
* @param after
*/
private void insert(List<Column> after) {
//找到改动的数据
List<Column> collect = after.stream().filter(column -> column.getUpdated() || column.getIsKey()).collect(Collectors.toList());
//根据表映射关系拼装更新sql
JSONObject mapping = prop.getJSONObject("mapping");
String target = prop.getString("target");
List<String> columnNames = new ArrayList<>();
List<String> columnValues = new ArrayList<>();
for (int i = 0; i < collect.size(); i++) {
Column column = collect.get(i);
if (!mapping.containsKey(column.getName())) {
continue;
}
String name = mapping.getString(column.getName());
columnNames.add(name);
if (column.getIsNull()) {
columnValues.add("null");
} else {
columnValues.add("'" + column.getValue() + "'");
}
}
StringBuilder sql = new StringBuilder();
sql.append("REPLACE INTO ").append(target).append("( ")
.append(StringUtils.join(columnNames, ", "))
.append(") VALUES ( ")
.append(StringUtils.join(columnValues, ", "))
.append(");");
String sqlStr = sql.toString();
log.debug(sqlStr);
jdbcTemplate.execute(sqlStr);
}
/**
* 更新数据
*
* @param before 原始数据
* @param after 更新后的数据
*/
private void update(List<Column> before, List<Column> after) {
//找到改动的数据
List<Column> updataCols = after.stream().filter(column -> column.getUpdated()).collect(Collectors.toList());
//找到之前的数据中的keys
List<Column> keyCols = before.stream().filter(column -> column.getIsKey()).collect(Collectors.toList());
//没有key,执行更新替换
if (keyCols.size() == 0) {
return;
}
//根据表映射关系拼装更新sql
JSONObject mapping = prop.getJSONObject("mapping");
String target = prop.getString("target");
//待更新数据
List<String> updatas = new ArrayList<>();
for (int i = 0; i < updataCols.size(); i++) {
Column updataCol = updataCols.get(i);
if (!mapping.containsKey(updataCol.getName())) {
continue;
}
String name = mapping.getString(updataCol.getName());
if (updataCol.getIsNull()) {
updatas.add("`" + name + "` = null");
} else {
updatas.add("`" + name + "` = '" + updataCol.getValue() + "'");
}
}
//如果没有要修改的数据,返回
if (updatas.size() == 0) {
return;
}
//keys
List<String> keys = new ArrayList<>();
for (Column keyCol : keyCols) {
String name = mapping.getString(keyCol.getName());
keys.add("`" + name + "` = '" + keyCol.getValue() + "'");
}
StringBuilder sql = new StringBuilder();
sql.append("UPDATE ").append(target).append(" SET ");
sql.append(StringUtils.join(updatas, ", "));
sql.append(" WHERE ");
sql.append(StringUtils.join(keys, "AND "));
String sqlStr = sql.toString();
log.debug(sqlStr);
jdbcTemplate.execute(sqlStr);
}
/**
* 删除数据
*
* @param before
*/
private void delete(List<Column> before) {
//找到改动的数据
List<Column> keyCols = before.stream().filter(column -> column.getIsKey()).collect(Collectors.toList());
if (keyCols.size() == 0) {
return;
}
//根据表映射关系拼装更新sql
JSONObject mapping = prop.getJSONObject("mapping");
String target = prop.getString("target");
StringBuilder sql = new StringBuilder();
sql.append("DELETE FROM `").append(target).append("` WHERE ");
List<String> where = new ArrayList<>();
for (Column column : keyCols) {
String name = mapping.getString(column.getName());
where.add(name + " = '" + column.getValue() + "' ");
}
sql.append(StringUtils.join(where, "and "));
String sqlStr = sql.toString();
log.debug(sqlStr);
jdbcTemplate.execute(sqlStr);
}
}
mysql-binlog-connector-java
https://github.com/shyiko/mysql-binlog-connector-java
mysql原理 ~ binlog系列之 table_id详谈https://www.cnblogs.com/danhuangpai/p/11484256.html
Canal监听mysql的binlog日志实现数据同步https://blog.csdn.net/m0_37583655/article/details/119517336
用 canal 监控 binlog 并实现mysql定制同步数据的功能https://blog.51cto.com/u_12302929/3294157