• 缓存同步之 RabbitMQ 和 Canal


    缓存同步

    大多数情况下,浏览器查询到的都是缓存数据,如果缓存数据与数据库数据存在较大差异,可能会产生比较严重的后果。所以我们必须保证数据库数据、缓存数据的一致性,这就是缓存与数据库的同步。

    数据同步策略

    设置有效期:给缓存设置有效期,到期后自动删除。再次查询时更新

    • 优势:简单、方便
    • 缺点:时效性差,缓存过期之前可能不一致
    • 场景:更新频率较低,时效性要求低的业务

    同步双写:在修改数据库的同时,直接修改缓存

    • 优势:时效性强,缓存与数据库强一致
    • 缺点:有代码侵入,耦合度高;
    • 场景:对一致性、时效性要求较高的缓存数据

    异步通知:修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据

    • 优势:低耦合,可以同时通知多个缓存服务
    • 缺点:时效性一般,可能存在中间不一致状态
    • 场景:时效性要求一般,有多个服务需要同步

    大多情况下采用异步通知,而异步实现又可以基于MQ或者Canal来实现。

    基于MQ的异步通知

    RabbitMQ

    SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

    SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

    整体流程:

    • 服务完成对数据的修改后,只需要发送一条消息到MQ中。
    • 缓存服务监听MQ消息,然后完成对缓存的更新。
    • 依然有少量的代码侵入

    在这里插入图片描述

    案例代码

    引入依赖
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    编写配置
    spring:
      rabbitmq:
        host: 192.168.150.101 # 主机名
        port: 5672 # 端口
        virtual-host: / # 虚拟主机
        username: XXX # 用户名
        password: XXX # 密码
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    声明队列交换机名称
    /**
     * 声明队列交换机名称
     */
    public class MqConstants {
    
        /**
         * 交换机
         */
        public final static String ITEM_EXCHANGE = "item.topic";
    
        /**
         * 监听新增和修改的队列
         */
        public final static String ITEM_INSERT_AND_UPDATE_QUEUE = "item.insert.update.queue";
        /**
         * 监听删除的队列
         */
        public final static String ITEM_DELETE_QUEUE = "item.delete.queue";
    
        /**
         * 新增或修改的 RoutingKey
         */
        public final static String ITEM_INSERT_AND_UPDATE_KEY = "item.insert.update";
        /**
         * 删除的 RoutingKey
         */
        public final static String ITEM_DELETE_KEY = "item.delete";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    配置队列交换机
    @Configuration
    public class MqConfig {
        /**
         * 配置交换机
         * @return
         */
        @Bean
        public TopicExchange topicExchange(){
            return new TopicExchange(ITEM_EXCHANGE, true, false);
        }
        /**
         * 配置新增和修改的队列
         * @return
         */
        @Bean
        public Queue insertQueue(){
            return new Queue(ITEM_INSERT_AND_UPDATE_QUEUE, true);
        }
        /**
         * 配置删除的队列
         * @return
         */
        @Bean
        public Queue deleteQueue(){
            return new Queue(ITEM_DELETE_QUEUE, true);
        }
        /**
         * 绑定 新增和修改的队列和对应的 RoutingKey
         * @return
         */
        @Bean
        public Binding insertQueueBinding(){
            return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(ITEM_INSERT_AND_UPDATE_KEY);
        }
        /**
         * 绑定 删除的队列和对应的 RoutingKey
         * @return
         */
        @Bean
        public Binding deleteQueueBinding(){
            return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(ITEM_DELETE_KEY);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    发送MQ消息

    在service中的增、删、改业务中分别发送MQ消息:

    @RestController
    @RequestMapping("/item")
    public class ItemController {
    
        @Autowired
        private IItemService itemService;
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
    	//新增
        @PostMapping
        public void saveItem(@RequestBody Item item){
            itemService.save(item);
            rabbitTemplate.convertAndSend(ITEM_EXCHANGE,ITEM_INSERT_AND_UPDATE_KEY,item.getId());
        }
    	//修改
        @PutMapping
        public void updateItem(@RequestBody Item item) {
            itemService.updateById(item);
            rabbitTemplate.convertAndSend(ITEM_EXCHANGE,ITEM_INSERT_AND_UPDATE_KEY,item.getId());
        }
    
    	//删除
        @DeleteMapping("/{id}")
        public void deleteById(@PathVariable("id") Long id){
            itemService.removeById(id);
            rabbitTemplate.convertAndSend(ITEM_EXCHANGE,ITEM_DELETE_KEY,id);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    监听MQ消息
    @Component
    public class ItemListener {
        
        @Autowired
        private ItemServiceImpl itemService;
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        /**
         * 监听新增或修改的业务
         * @param id 
         */
        @RabbitListener(queues = ITEM_INSERT_AND_UPDATE_QUEUE)
        public void listenInsertOrUpdate(Long id){
            //查询数据库
            Item item = itemService.getById(id);
            //新增redis缓存,对于修改,直接新增覆盖(key一致)redis缓存即可
            redisTemplate
                    .opsForValue()
                    .set("item:id:"+id, JSONUtil.toJsonStr(item), Duration.ofMinutes(30));
        }
    
        /**
         * 监听删除的业务
         * @param id 
         */
        @RabbitListener(queues = ITEM_DELETE_QUEUE)
        public void listenDelete(Long id){
            //删除redis缓存
            redisTemplate.delete("item:id:"+id);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    基于Canal的通知

    认识Canal

    Canal [kə’næl],译意为水道/管道/沟渠,canal是阿里巴巴旗下的一款开源项目,基于Java开发,基于数据库增量日志解析,提供增量数据订阅&消费。

    GitHub官方的地址:https://github.com/alibaba/canal

    Canal是基于mysql的主从同步来实现的,Canal就是把自己伪装成MySQL的一个slave节点,从而监听masterbinary log变化。再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步。
    在这里插入图片描述

    整体流程:

    • 服务完成修改后,业务直接结束,没有任何代码侵入。
    • Canal监听MySQL变化,当发现变化后,立即通知缓存服务。
    • 缓存服务接收到canal通知,更新缓存。
    • 代码零侵入
      在这里插入图片描述

    实例代码

    引入依赖
    <dependency>
        <groupId>top.javatoolgroupId>
        <artifactId>canal-spring-boot-starterartifactId>
        <version>1.2.1-RELEASEversion>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    编写配置
    canal:
      destination: canal # canal的集群名字,要与安装canal时设置的名称一致
      server: 192.168.150.101:11111 # canal服务地址,更换ip
    
    • 1
    • 2
    • 3
    修改实体类

    通过@Id、@Column、@Transient等注解完成实体类与数据库表字段的映射:

    • @Id :主键ID
    • @Column:属性名与数据库表中字段不一致
    • @Transient:数据库表中不存在字段
    @Data
    @TableName("tb_item")
    public class Item {
        @TableId(type = IdType.AUTO)
        @Id
        private Long id;//商品id
        @Column(name = "name")
        private String name;//商品名称
        private String title;//商品标题
        private Long price;//价格(分)
        private String image;//商品图片
        private String category;//分类名称
        private String brand;//品牌名称
        private String spec;//规格
        private Integer status;//商品状态 1-正常,2-下架
        private Date createTime;//创建时间
        private Date updateTime;//更新时间
        @TableField(exist = false)
        @Transient
        private Integer sold;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    编写监听器

    通过实现EntryHandler接口编写监听器,监听Canal消息。注意两点:

    • 实现类通过@CanalTable("XXX")指定监听的表信息
    • EntryHandler的泛型是与表对应的实体类
    @CanalTable("tb_item")
    @Component
    public class ItemHandler implements EntryHandler<Item> {
    
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        /**
         * 监听新增方法
         * @param item
         */
        @Override
        public void insert(Item item) {
            //新增redis缓存
            redisTemplate
                    .opsForValue()
                    .set("item:id:"+item.getId(), JSONUtil.toJsonStr(item), Duration.ofMinutes(30));
        }
    
        /**
         * 监听修改方法
         * @param before
         * @param after
         */
        @Override
        public void update(Item before, Item after) {
            //对于修改,直接新增覆盖(key一致)redis缓存即可
            redisTemplate
                    .opsForValue()
                    .set("item:id:"+after.getId(), JSONUtil.toJsonStr(after), Duration.ofMinutes(30));
        }
    
        /**
         * 监听删除方法
         * @param item
         */
        @Override
        public void delete(Item item) {
            //删除redis缓存
            redisTemplate.delete("item:id:"+item.getId());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
  • 相关阅读:
    Kafka系列之:Kafka滚动升级流程
    淘宝/天猫获取卖出的商品订单列表 API
    不用Swagger,那我用啥?
    阿里云原生应用平台架构师田伟:应用架构的规划、治理与演进
    syslog Linux系统log打印原理
    python相关岗位面试题总结(五)(持续更新)
    【Linux】shell脚本+cron定时任务实现“当程序报错时,发送邮件”
    大数据培训之配置Hbase支持Phoenix创建二级索引
    DIY官网可视化工具打造UNIAPP-uviewUI可视化
    JVM常用的一些参数
  • 原文地址:https://blog.csdn.net/qq_54429571/article/details/128117441