• MySQL与ES数据同步之异步调用


    简述

    上一篇是同步调用,我们在中间加上MQ就可以实现异步调用,这种方式性能高,不易出现数据丢失问题,多源写入之间相互隔离,便于扩展更多的数据源写入。
    同时也会带来一些问题,首先还是代码侵入强,其次系统复杂度会增加,因为引入了消息中间件
    可能出现延时问题:MQ是异步消费模型,可能会造成延时。
    这种方案也不是很推荐,简单了解学习一下就好。

    下面通过SpringBoot项目演示一下,首先本地要有MQ,我这里使用RabbitMQ。若本地没有,可移步:Windows版Docker安装RabbitMQ
    Linux的Docker也类似

    对RabbitMQ还不是很了解的,可以打开我的主页查看RabbitMQ系列教程

    这里只做最简单的MQ可靠性配置

    SpringBoot项目

    引入依赖

    全部依赖如下

            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-data-elasticsearchartifactId>
            dependency>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-jdbcartifactId>
            dependency>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-webartifactId>
            dependency>
    
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-devtoolsartifactId>
                <scope>runtimescope>
                <optional>trueoptional>
            dependency>
            <dependency>
                <groupId>com.baomidougroupId>
                <artifactId>mybatis-plus-boot-starterartifactId>
                <version>3.5.3.1version>
            dependency>
            <dependency>
                <groupId>mysqlgroupId>
                <artifactId>mysql-connector-javaartifactId>
                <scope>runtimescope>
            dependency>
            <dependency>
                <groupId>org.projectlombokgroupId>
                <artifactId>lombokartifactId>
                <optional>trueoptional>
            dependency>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-testartifactId>
                <scope>testscope>
            dependency>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-amqpartifactId>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    配置文件

    注意修改Mysql,ES,rabbitmq地址及信息

    # 端口号8080
    server:
      port: 8080
    
    # 数据库名:mysql,用户名root,密码123456
    spring:
      datasource:
        username: root
        password: 123456
        url: jdbc:mysql://mysql地址:3306/mysql?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
        driver-class-name: com.mysql.cj.jdbc.Driver
      elasticsearch:
        rest:
          uris: ES地址:9200
      rabbitmq:
        host: rabbitmq地址
        port: 5672
        username: admin
        password: admin
        #确认消息已发送到交换机
        publisher-confirm-type: correlated
        #确认消息已发送到队列(Queue)
        publisher-returns: true
    
    # mybatis-plus配置
    mybatis-plus:
      # xml文件位置
      mapper-locations: classpath:mapper/*.xml
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    项目结构

    在这里插入图片描述

    实体类

    /**
     * mysql(user)与ES(user-demo)实体类
     */
    @Data
    @TableName(value = "user_t")
    @Document(indexName = "user-demo")
    public class User {
        @Id
        private String id;
        private String userName;
        private String address;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    配置类

    RabbitMQ交换机队列声明,绑定配置类

    /**
     * RabbitMQ交换机队列声明,绑定配置类
     */
    @Configuration
    public class Config {
    
        //交换机名称
        public static final String X_EXCHANGE = "X";
    
        //队列名称
        public static final String QUEUE_INSERT = "A";
        public static final String QUEUE_DELETE = "B";
        public static final String QUEUE_UPDATE = "C";
    
    
        //声明交换机xExchange
        @Bean("xExchange")
        public DirectExchange xExchange() {
            return new DirectExchange(X_EXCHANGE);
        }
    
        //声明队列A
        @Bean("queueA")
        public Queue queueInsert() {
            return QueueBuilder.durable(QUEUE_INSERT).build();
        }
    
        //声明队列B
        @Bean("queueB")
        public Queue queueDelete() {
            return QueueBuilder.durable(QUEUE_DELETE).build();
        }
    
        //声明队列C
        @Bean("queueC")
        public Queue queueUpdate() {
            return QueueBuilder.durable(QUEUE_UPDATE).build();
        }
    
        //绑定交换机与队列
        //A与X通过XA线路绑定
        @Bean
        public Binding queueInsertBindingX(@Qualifier("queueA") Queue queueA,
                                      @Qualifier("xExchange") DirectExchange xExchange) {
            return BindingBuilder.bind(queueA).to(xExchange).with("XA");
        }
    
        //B与X通过XB线路绑定
        @Bean
        public Binding queueDeleteBindingX(@Qualifier("queueB") Queue queueB,
                                      @Qualifier("xExchange") DirectExchange xExchange) {
            return BindingBuilder.bind(queueB).to(xExchange).with("XB");
        }
    
        //C与X通过XC线路绑定
        @Bean
        public Binding queueUpdateBindingX(@Qualifier("queueC") Queue queueC,
                                           @Qualifier("xExchange") DirectExchange xExchange) {
            return BindingBuilder.bind(queueC).to(xExchange).with("XC");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    回调接口配置类

    /**
     * 回调接口
     */
    @Slf4j
    @Component
    public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        public void init() {
            //内部接口注入类中
            rabbitTemplate.setConfirmCallback(this);
            rabbitTemplate.setReturnCallback(this);
    
        }
    
        /**
         * 交换机确定回调方法
         * 1.发消息 交换机接收到消息 回调
         * 1.1 correlationData 保存回调消息的ID及相关信息
         * 1.2 交换机收到消息 ack=true
         * 1.3 cause null
         * 2.发消息 交换机接受失败 回调
         * 2.1 correlationData 保存回调消息的ID及相关信息
         * 2.2 交换机收到消息 ack=false
         * 2.3 cause 失败原因
         */
    
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            String id = correlationData != null ? correlationData.getId() : "";
            if (ack) {
                log.info("交换机回报消息:收到id为:{}的消息", id);
            } else {
                log.info("交换机回报消息:未经收到id为:{}的消息,原因为:{}", id, cause);
            }
    
        }
    
        /**
         * 队列失败回报
         * @param message 消息
         * @param i 返回码
         * @param s 返回信息
         * @param s1 交换机
         * @param s2 路由
         */
        @Override
        public void returnedMessage(Message message, int i, String s, String s1, String s2) {
            log.error("队列回报消息:消息被交换机:{}退回,路由key:{},退回原因:{}", s1,s2,s);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    Mapper接口

    UserMapper接口

    /**
     * mysql user实体Mapper接口
     */
    public interface UserMapper extends BaseMapper<User> {
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    UserEsMapper

    /**
     * ES user-demo实体Mapper接口
     */
    @Repository
    public interface UserEsMapper extends ElasticsearchRepository<User,String> {
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    Controller类

    此处Controller充当生产者,接到请求,先执行mysql操作,然后将消息按情况通过交换机转发到不同的队列,相应的消费者收到消息后对ES进行处理

    /**
     * 异步调用方式实现mysql与ES数据同步Controller/消息生产者
     */
    @Slf4j
    @RestController
    @RequestMapping(value = "/asyn")
    public class DataController {
    
        @Resource
        private IDataService dataService;
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 同步更新mysql和ES的user信息
         * @param user user实体
         */
        @GetMapping("/update")
        public void updateData(User user){
            dataService.updateMysqlData(user);
            String key = UUID.randomUUID().toString();
            CorrelationData correlationData = new CorrelationData(key);
            rabbitTemplate.convertAndSend("X","XA",user.getId(),correlationData);
            log.info("Producer消息:已发送消息:{}到队列A中等待ES更新处理,消息ID:{}",user.getId(),key);
        }
    
        /**
         * 查询user表信息
         * @return user信息集合
         */
        @GetMapping("/findData")
        public List<User> findAllData(){
            return dataService.findAllData();
        }
    
        /**
         * 同步根据id删除mysql和ES中user对应的数据信息
         * @param id 需要删除的信息id
         */
        @GetMapping("/delete")
        public void deleteDataById(String id){
            dataService.deleteDataById(id);
            String key = UUID.randomUUID().toString();
            CorrelationData correlationData = new CorrelationData(key);
            rabbitTemplate.convertAndSend("X","XB",id,correlationData);
            log.info("Producer消息:已发送消息:{}到队列B中等待ES删除处理,消息ID:{}",id,key);
        }
    
        /**
         * 同步新增mysql和ES的user数据
         * @param user user实体
         */
        @GetMapping("addData")
        public void addData(User user){
            dataService.addData(user);
            String key = UUID.randomUUID().toString();
            CorrelationData correlationData = new CorrelationData(key);
            rabbitTemplate.convertAndSend("X","XA",user.getId(),correlationData);
            log.info("Producer消息:已发送消息:{} 到队列A中等待ES新增处理,消息ID:{}",user.getId(),key);
        }
    
        /**
         * 同步删除mysql和ES中所有user信息
         */
        @GetMapping("deleteAll")
        public void deleteAllData(){
            dataService.deleteAllData();
            dataService.esDeleteAllData();
        }
    
        /**
         * 查询ES中所有user信息
         */
        @GetMapping("findEs")
        public Iterable<User> findEs(){
            return dataService.findEs();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78

    Service接口

    /**
     * 异步调用方式实现mysql与ES数据同步Service
     */
    public interface IDataService extends IService<User> {
    
        /**
         * 根据id更新mysql数据
         * @param user 需要更新数据的user对象
         */
        void updateMysqlData(User user);
    
        /**
         * 查询所有数据
         * @return user对象集合
         */
        List<User> findAllData();
    
        /**
         * mysql根据id删除信息
         * @param id 需要删除信息的id
         */
        void deleteDataById(String id);
    
        /**
         * mysql新增数据
         * @param user 需要新增数据的对象
         */
        void addData(User user);
    
        /**
         * ES根据ID删除数据
         * @param id 需要删除信息的id
         */
        void esDeleteDataById(String id);
    
        /**
         * ES新增/根据ID修改数据
         * @param user 需要新增/根据ID修改数据的对象
         */
        void esAddData (User user);
    
        /**
         * mysql删除user表所有数据
         */
        void deleteAllData();
    
        /**
         * es删除index=user-demo中所有数据
         */
        void esDeleteAllData();
    
        /**
         * 查询ES中所有数据信息
         */
        Iterable<User> findEs();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    Service实现类

    /**
     * 异步调用方式实现mysql与ES数据同步Service实现类
     */
    @Service
    public class DataServiceImpl extends ServiceImpl<UserMapper, User> implements IDataService {
    
        @Resource
        private UserMapper userMapper;
        @Resource
        private UserEsMapper userEsMapper;
    
        /**
         * 根据id更新mysql数据
         * @param user 需要更新数据的user对象
         */
        @Override
        public void updateMysqlData(User user) {
            userMapper.updateById(user);
        }
    
        /**
         * 查询所有数据
         * @return user对象集合
         */
        @Override
        public List<User> findAllData() {
            return userMapper.selectList(null);
        }
    
        /**
         * mysql根据id删除信息
         * @param id 需要删除信息的id
         */
        @Override
        public void deleteDataById(String id) {
            userMapper.deleteById(id);
        }
    
        /**
         * mysql新增数据
         * @param user 需要新增数据的对象
         */
        @Override
        public void addData(User user) {
            userMapper.insert(user);
        }
    
        /**
         * ES根据ID删除数据
         * @param id 需要删除信息的id
         */
        @Override
        public void esDeleteDataById(String id) {
            userEsMapper.deleteById(id);
        }
    
        /**
         * ES新增/根据ID修改数据
         * @param user 需要新增/根据ID修改数据的对象
         */
        @Override
        public void esAddData(User user) {
            userEsMapper.save(user);
        }
    
        /**
         * mysql删除user表所有数据
         */
        @Override
        public void deleteAllData() {
            userMapper.delete(null);
        }
    
        /**
         * es删除index=user-demo中所有数据
         */
        @Override
        public void esDeleteAllData() {
            userEsMapper.deleteAll();
        }
    
        /**
         * 查询ES中user所有信息
         * @return 查询user信息集合
         */
        @Override
        public Iterable<User> findEs() {
            return userEsMapper.findAll();
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91

    监听类/消费者

    /**
     * 异步调用方式实现mysql与ES数据同步消息消费者
     */
    @Slf4j
    @Component
    public class Consumer {
        @Resource
        private IDataService dataService;
        @Resource
        private UserMapper userMapper;
    
        //接收消息
        @RabbitListener(queues="A")
        public void addData(Message message){
            log.info("Consumer消息:当前时间:{},收到A队列的消息:{},进行ES新增操作",new Date().toString(),new String(message.getBody()));
            QueryWrapper<User> queryWrapper = new QueryWrapper<>();
            queryWrapper.eq("id",new String(message.getBody()));
            User user = userMapper.selectOne(queryWrapper);
            dataService.esAddData(user);
            log.info("ES新增/更新数据为:{}",user);
        }
    
        @RabbitListener(queues = "B")
        public void delete(Message message){
            log.info("Consumer消息:当前时间:{},收到B队列的消息:{},进行ES删除操作",new Date().toString(),new String(message.getBody()));
            dataService.esDeleteDataById(new String(message.getBody()));
        }
    
        @RabbitListener(queues="C")
        public void update(Message message){
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    操作完成

  • 相关阅读:
    Day 48 | 198.打家劫舍 & 213.打家劫舍II & 337.打家劫舍 III
    scala 连接 MySQL 数据库案例
    晶体管级数字电路设计专栏目录
    我用Axure制作了一款火影小游戏 | PM老猫
    AQS小总结
    Python编程 字符串组成方式
    Effective Modern C++[实践]->优选delete关键字删除函数,而非private未定义函数
    再获Gartner认可!持安科技获评ZTNA领域代表供应商
    CSDN竞赛第五期竞赛-习题解析
    【cocos2dx】记录问题,粒子不会通过setOpacity调整整体透明度
  • 原文地址:https://blog.csdn.net/m0_68681879/article/details/132836614