想要实现MySQL与Redis数据同步,常见的方式有以下三种:
这里介绍的是第三种,异步通知。
异步通知又可以使用MQ或者Canal来实现。
(1)基于MQ的异步通知
核心:
这样在更新数据的操作中,仍然会有少量代码的侵入
(2)基于Canal的通知
核心:
什么是Canal?
canal,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
GitHub的地址:https://github.com/alibaba/canal
Canal是基于MySQL的主从同步来实现的,MySQL主从同步的原理如下:
Canal的核心工作原理就是将自己伪装成Mysql salve,模拟MySQL salve的交互协议向MySQL master发送dump协议,MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,比如MySQL,Kafka,Elastic Search等等。
Canal是基于MySQL的主从同步功能,因此必须先开启MySQL的主从功能才可以。
(1)首先,先开启binlog
进入MySQL容器挂在日志文件,我这里在/tmp/mysql/conf
目录
修改文件:
vi /tmp/mysql/conf/my.cnf
添加内容:
log-bin=/var/lib/mysql/mysql-bin
--这里指定的是主库的数据库名
binlog-do-db=heima
配置解读:
log-bin=/var/lib/mysql/mysql-bin
:设置binary log文件的存放地址和文件名,叫做mysql-binbinlog-do-db=heima
:指定对哪个database记录binary log events,这里记录heima这个库接下来添加一个仅用于数据同步的账户,出于安全考虑,这里仅提供对heima这个库的操作权限。
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%' identified by 'canal';
FLUSH PRIVILEGES;
操作完成之后需要重启容器
docker restart mysql
使用该命令就可以查看
show master status;
首先需要创建一个网络,将MySQL、Canal、MQ放到同一个Docker网络中:
docker network create heima
让mysql加入这个网络:
docker network connect heima mysql
然后使用镜像运行cannal容器,没有镜像的话可以自行拉取
docker run -p 11111:11111 --name canal \
-e canal.destinations=heima \
-e canal.instance.master.address=mysql:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false \
-e canal.instance.filter.regex=heima\\..* \
--network heima \
-d canal/canal-server:v1.1.5
-p 11111:11111
:这是canal的默认监听端口-e canal.instance.master.address=mysql:3306
:数据库地址和端口,如果不知道mysql容器地址,可以通过docker inspect 容器id
来查看-e canal.instance.dbUsername=canal
:数据库用户名-e canal.instance.dbPassword=canal
:数据库密码-e canal.instance.filter.regex=
:要监听的表名称这样就安装完Canal了
在Java程序中,我们可以使用GitHub上的第三方开源的canal-starter客户端对Canal进行监控
地址:https://github.com/NormanGyllenhaal/canal-client
它与SpringBoot完美整合,自动装配,比官方客户端要简单好用很多。
引入依赖
<dependency>
<groupId>top.javatoolgroupId>
<artifactId>canal-spring-boot-starterartifactId>
<version>1.2.1-RELEASEversion>
dependency>
yml相关配置
canal:
destination: heima # canal的集群名字,要与安装canal时设置的名称一致
server: 192.168.150.101:11111 # canal服务地址
接下来需要修改实体类,使用注解完成数据库字段与实体类字段的映射
@Data
@TableName("tb_item")
public class Item {
@TableId(type = IdType.AUTO)
@Id
private Long id;//商品id
@Column(name = "name")
private String name;//商品名称
private String title;//商品标题
private Long price;//价格(分)
private String image;//商品图片
private String category;//分类名称
private String brand;//品牌名称
private String spec;//规格
private Integer status;//商品状态 1-正常,2-下架
private Date createTime;//创建时间
private Date updateTime;//更新时间
@TableField(exist = false)
//标记不属于这个表的字段
@Transient
private Integer stock;
@TableField(exist = false)
@Transient
private Integer sold;
}
编写监听器
通过实现EntryHandler
接口编写监听器,监听Canal消息。注意两点:
@CanalTable("tb_item")
指定监听的表信息@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {
@Autowired
private RedisHandler redisHandler;
@Autowired
private Cache<Long, Item> itemCache;
@Override
public void insert(Item item) {
// 写数据到JVM进程缓存
itemCache.put(item.getId(), item);
// 写数据到redis
redisHandler.saveItem(item);
}
@Override
public void update(Item before, Item after) {
// 写数据到JVM进程缓存
itemCache.put(after.getId(), after);
// 写数据到redis
redisHandler.saveItem(after);
}
@Override
public void delete(Item item) {
// 删除数据到JVM进程缓存
itemCache.invalidate(item.getId());
// 删除数据到redis
redisHandler.deleteItemById(item.getId());
}
}
这样当MySQL中的数据发生变化之后,就会自动修改缓存中的数据。
参考: