• RabbitMq的最终一致性分布式事务


    使用rabbitmq的步骤

    在这里插入图片描述

    1.运行安装在服务器上的rabbit服务

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Rs8LPaRN-1664352105453)(C:\Users\han\AppData\Roaming\Typora\typora-user-images\image-20220901201727164.png)]

    或者在docker上运行

    # 使用数据卷
    docker volume rm rabbitmq-5672-data
    docker volume create --name rabbitmq-5672-data
    docker run -d --rm --name rabbitmq-5672 \
        -v /etc/localtime:/etc/localtime:ro \
        -v rabbitmq-5672-data:/var/lib/rabbitmq \
        -p 5672:5672 \
        -p 15672:15672 \
        rabbitmq:3.10-management
    
    # 这个例子挂载「数据存储目录」
    docker run -d --rm --name rabbitmq-5672 \
        -v /etc/localtime:/etc/localtime:ro \
        -v ~/docker/5672/data:/var/lib/rabbitmq \
        -p 5672:5672 \
        -p 15672:15672 \
        rabbitmq:3.10-management
    

    2.在项目中安装依赖

    <dependency>
                <groupId>org.springframework.amqpgroupId>
                <artifactId>spring-rabbit-testartifactId>
                <scope>testscope>
    dependency>
    

    3.编写对应的配置文件

    ## 连接rabbitmq服务器
    spring.rabbitmq.host=192.168.12.12
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=hl
    
    ## 手动确认消息
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    spring.rabbitmq.listener.direct.acknowledge-mode=manual
    
    ## 确认消息已发送到交换机( Exchange )
    spring.rabbitmq.publisher-confirm-type=CORRELATED
    
    # 确认消息已发送到队列(Queue)
    spring.rabbitmq.publisher-returns=true
    

    4.创建对应配置并加上启动注解

    @Configuration
    @EnableRabbit
    @Slf4j
    @Transactional
    public class RabbitConfig {
        @Resource
        private MessageDao messageDao;
    
        public static final String EMPLOYEE_LIST = "employee-list";
    
        public static final String DEPARTMENT_DELETE = "department-delete";
    
        @Bean
        public Queue DepartmentDelete(){
            return new Queue(DEPARTMENT_DELETE);
        }
    
        @Bean
        public Queue employeeList(){
            return new Queue(EMPLOYEE_LIST);
        }
    
        @Bean("rabbitTemplate")
        public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate();
            rabbitTemplate.setConnectionFactory(connectionFactory);
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String s) {
                    //每次发送队列信息将触发此方法,需要添加配置属性
                    System.out.println(correlationData.getId());
                    Message message = messageDao.getOne(Long.parseLong(Objects.requireNonNull(correlationData.getId())));
                    if (ack){
                        message.setStatus("B");
                    }
                    message.setRetryCount(message.getRetryCount()-1);
                    log.info("剩余消息数:"+message.getRetryCount());
                    messageDao.save(message);
                }
            });
    
    //        rabbitTemplate.setMandatory(true);
    //
    //        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    //            @Override
    //            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    //                log.info("ReturnCallback 消息:{}", message);
    //                log.info("ReturnCallback 回应码:{}", replyCode);
    //                log.info("ReturnCallback 回应信息:{}", replyText);
    //                log.info("ReturnCallback 交换机:{}", exchange);
    //                log.info("ReturnCallback 路由键:{}", routingKey);
    //            }
    //        });
            return rabbitTemplate;
        }
    
    }
    

    5.创建message表记录发送次数及信息

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WSNp3Mbr-1664352105454)(C:\Users\han\AppData\Roaming\Typora\typora-user-images\image-20220901202318282.png)]

    drop table if exists message;
    create table message
    (
        id          bigint auto_increment,
        exchange    varchar(64) ,
        routing_key varchar(64)  not null,
        content     varchar(128) not null,
        retry_count int          not null,
        status      varchar(32)  not null,
        primary key (id)
    );
    

    创建对应的DAO类和实体类

    6.发送请求时并创建message信息

    public void deleteById(Long id) {
            departmentDao.deleteById(id);
            Message message = new Message(null, null, RabbitConfig.DEPARTMENT_DELETE, id+"", 5, "A");
            messageDao.save(message);
    }
    

    7.创建spring Task定时器并定时输出rabbitmq信息

    @Component
    @Slf4j
    @Transactional
    @EnableScheduling
    public class RabbitTimer {
    
    
        @Resource
        private MessageMysqlDao messageDao;
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        @Scheduled(fixedDelay = 6000)
        private void process(){
            //获取状态不等于C和次数大于0的信息
            QueryWrapper<Message> wrapper = new QueryWrapper<>();
            wrapper.ne("status", "C");
            wrapper.gt("retry_count", 0);
            List<Message> messageList = messageDao.selectList(wrapper);
            if (messageList.size()==0){
                log.info("暂无消息发送,请等待...");
            }else {
                //进行信息发送
                for (Message message : messageList) {
                    String content = message.getId()+":"+message.getContent();
                    CorrelationData correlationData = new CorrelationData(message.getId()+"");
                    if (message.getExchange()==null) {
                        rabbitTemplate.convertAndSend(message.getRoutingKey(), (Object) content, correlationData);
                    }
                    else{
                        rabbitTemplate.convertAndSend(message.getExchange(), message.getRoutingKey(),  content, correlationData);
                    }
                    log.info("消息 {} 已发送",content);
                }
            }
        }
    }
    

    8.创建消息确定方法,确认接受方收到的了消息并进行了处理

    @RestController
    @RequestMapping("/message")
    @RequiredArgsConstructor
    public class MessageController implements IAMessageController{
    
        @Resource
        private MessageMysqlDao messageMysqlDao;
    
        @PostMapping("/update/{id}")
        public String messageUpdate(@PathVariable("id")Long id){
            QueryWrapper<Message> wrapper = new QueryWrapper<>();
            wrapper.eq("id", id);
            Message message = new Message();
            message.setStatus("C");
            messageMysqlDao.update(message,wrapper);
            return "success";
        }
    
    }
    

    9.消息接受者创建消息重复表进行消息去重

    在这里插入图片描述

    drop table if exists recived_message;
    create table recived_message
    (
        id  bigint auto_increment,
        recived_at datetime
    );
    

    10.接受方微服务创建监听器监听rabbitmq信息

    消息接受者处理消息发送者发送的消息,在消息处理无误后进行发送openfeign请求,给消息提供者发送确认信息

    @Configuration
    @RequiredArgsConstructor
    @Transactional
    public class HarvestResultLister {
        private final HarvestPlanMysqlDao harvestPlanMysqlDao;
        private final ReceivedMessageMysqlDao receivedMessageMysqlDao;
        private final HarvestCheckClient harvestCheckClient;
    
        @RabbitListener(queues = RabbitConfig.HARVEST_CHECK)
        public void harvestUpdateByCheck(String msg, Channel channel,
                                         @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
            try {
                System.out.println(msg);
                String[] split = msg.split(":");
                if (split.length != 3) {
                    throw new RabbitDataError("发送的数据异常");
                }
                String mesId = split[0];
                //获取发送内容id
                String contentId = split[1];
                //获取被修改的采收id
                String harvestId = split[2];
    
                ReceivedMessage receivedMessage = receivedMessageMysqlDao.selectById(Long.parseLong(contentId ));
                if (receivedMessage != null){
                    throw new RabbitDataError("发送重复数据");
                }
    
                //存入数据
                receivedMessageMysqlDao.insert( new ReceivedMessage(Long.parseLong(contentId ), new Date()));
    
                QueryWrapper<HarvestPlan> wrapper = new QueryWrapper<>();
                wrapper.eq("id", Long.parseLong(harvestId));
                HarvestPlan harvestPlan = new HarvestPlan();
                harvestPlan.setPurchaseStatusId(3L);
    
                String result = harvestCheckClient.messageUpdate(Long.parseLong(mesId));
                if (!"success".equals(result)){
                    throw new RabbitDataError("确认消息未正常传回");
                }
    
            } catch (Exception e) {
                e.printStackTrace();
                throw e;
            }finally {
                try {
                    channel.basicAck(tag, false);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
  • 相关阅读:
    Multitor:一款带有负载均衡功能的多Tor实例创建工具
    SpringBoot 集成RabbitMQ 实现钉钉日报定时发送功能
    中职计算机应用专业(大数据方向)建设实践
    (尚硅谷)2021 版 SpringMVC 教程笔记
    AAOS CarPowerManager
    Airtest如何自动连接重启后的设备并继续执行自动化脚本呢?
    JavaSE入门---数据类型与变量
    三个修饰符:abstract、static、final(JAVA基础五)
    TiDB简述及TiKV的数据结构与存储
    从内核角度看网络包发送流程
  • 原文地址:https://blog.csdn.net/qq_42652006/article/details/127091295