• Docker简易部署RabbitMQ集群、分布式事务解决方案+案例(可靠生产、可靠消费)


    RabbitMQ–了解中间件、常用的中间件、分布式系统使用中间件、Docker安装rabbitmq及遇到的问题、RabbitMQ核心组成、消息模式
    Springboot整合RabbitMQ(Fanout、Direct、Topic模式)、设置队列信息TTL、死信队列、RabbitMQ磁盘监控,内存控制
    Springboot+Rabbitmq消费者注解详解、改序列化方式
    Docker简易部署RabbitMQ集群、分布式事务解决方案+案例(可靠生产、可靠消费)
    Springboot+RabbitMQ+ACK机制(生产方确认(全局、局部)、消费方确认)、知识盲区

    RabbitMQ集群

    使用Docker的方式创建并且配置集群,可以参考docker简易搭建RabbitMQ集群

    需要服务器上先安装好Docker,可以看看这篇Docker+Centos7安装+卸载

    先拉取镜像:

    这个镜像已经开启了页面管理插件,不需要我们再去使用开启插件了。

    docker pull rabbitmq:3.7-management
    
    • 1

    设置好挂载的宿主机目录

    mkdir /rabbitmqcluster
    cd /rabbitmqcluster/
    mkdir rabbitmq01 rabbitmq02 rabbitmq03
    
    • 1
    • 2
    • 3

    创建启动节点

    创建并且启动主节点 rabbitmq1

    docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' -v /rabbitmqcluster/rabbitmq01:/var/lib/rabbitmq --hostname myRabbit1 --name rabbitmq1 rabbitmq:3.7-management
    
    • 1

    rabbitmq1设置为主节点,并且通过rabbitmq1的管理页面来管理整个集群。设置账号密码为admin

    每个节点都要设置erlang的cookie,并且每个节点的erlang.cookie中的值是一样的才行。

    创建启动rabbitmq2,从节点。–link连接 rabbitmq1的host

    docker run -d -p 5673:5672 --link rabbitmq1:myRabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' -v /rabbitmqcluster/rabbitmq02:/var/lib/rabbitmq --hostname myRabbit2 --name rabbitmq2 rabbitmq:3.7-management
    
    • 1

    创建启动rabbitmq3,从节点。–link连接 rabbitmq1的host和 rabbitmq2的host

    docker run -d -p 5674:5672 --link rabbitmq1:myRabbit1 --link rabbitmq2:myRabbit2 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' -v /rabbitmqcluster/rabbitmq03:/var/lib/rabbitmq --hostname myRabbit3 --name rabbitmq3 rabbitmq:3.7-management
    
    • 1

    加入到rabbitmq集群

    rabbitmq1

    docker exec -it rabbitmq1 /bin/bash
    rabbitmqctl stop_app
    rabbitmqctl reset
    rabbitmqctl start_app
    exit
    
    • 1
    • 2
    • 3
    • 4
    • 5

    rabbitmq2

    -join_cluster: 加入到集群中,作为 hostname为myRabbit1的内存节点。

    docker exec -it rabbitmq2 /bin/bash
    rabbitmqctl stop_app
    rabbitmqctl reset
    rabbitmqctl join_cluster --ram rabbit@myRabbit1
    rabbitmqctl start_app
    exit
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    –ram : 参数“–ram”表示设置为内存节点,忽略次参数默认为磁盘节点。 rabbit是默认写死。myRabbit1是rabbitmq1的hostname值。

    rabbitmq3

    docker exec -it rabbitmq3 /bin/bash
    rabbitmqctl stop_app
    rabbitmqctl reset
    rabbitmqctl join_cluster --ram rabbit@myRabbit1
    rabbitmqctl start_app
    exit
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    打开管理页面

    我们只开启了 rabbitmq1 的管理页面,只开放了 15672 端口,访问如下:

    http://192.168.1.18:15672

    账号:admin

    密码:admin

    在这里插入图片描述

    可看到,集群中一共有3个节点。以及它们的内存占用空间,基本信息。

    分布式事务

    简述:

    分布式事务是指事务的操作位于不同的节点上,需要保证事务的ACID特性。例如购物系统,订单和库存位于不同的节点上,假如减库存操作成功了,但是订单创建失败了(网络或者某种原因下),那么我们的分布式事务就需要确保减库存的操作进行回滚,保证数据的最终一致性。

    总体来讲就是:独立的系统,独立的服务,独立的JVM,独立的数据库如何保证数据的一致性,就是指分布式事务。

    方式:

    • 两阶段提交(2PC):通过引入协调者(Coordinator)来协调参与者的行为,并最终决定这些参与者是否要真正执行事务。(同步阻塞,强一致性)
    • 补偿事务(TCC) : TCC采用的就是补偿机制,针对每个操作,都要注册一个与其对应的确认和补偿操作。(流程简单,数据一致性比2PC差一些)
    • 本地消息表(异步确保):本地消息表与业务数据表处于同一个数据库中,利用本地事务来保证在对这两个表的操作满足事务特性,并且使用消息队列保证最终一致性。(避免了分布式事务,实现了最终一致性)
    • MQ:第三方的MQ是支持事务消息的,它们实现的方式也是类似于采用二阶段提交(2PC)。如使用 RabbitMQ

    案例

    参考学相伴-飞哥RabbitMQ教学

    案例大致流程图

    在这里插入图片描述

    图知:订单服务和配送服务访问的不是同一个数据库,那么如何保证数据的一致性呢?

    先把案例需要的环境及配置准备好。

    数据库表

    订单服务表

    在这里插入图片描述

    派送服务表

    在这里插入图片描述

    创建两个springboot项目,mvn依赖如下:

    <dependencies>
        
        <dependency>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-webartifactId>
        dependency>
        
        <dependency>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-amqpartifactId>
        dependency>
        
        <dependency>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-configuration-processorartifactId>
        dependency>
        
        <dependency>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-testartifactId>
        dependency>
        
        <dependency>
            <groupId>org.apache.commonsgroupId>
            <artifactId>commons-lang3artifactId>
            <version>3.6version>
        dependency>
        
        <dependency>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-jdbcartifactId>
        dependency>
        
        <dependency>
            <groupId>mysqlgroupId>
            <artifactId>mysql-connector-javaartifactId>
        dependency>
        <dependency>
            <groupId>org.projectlombokgroupId>
            <artifactId>lombokartifactId>
        dependency>
        
        <dependency>
            <groupId>com.fasterxml.jackson.dataformatgroupId>
            <artifactId>jackson-dataformat-avroartifactId>
        dependency>
    dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    dispatcher(配送服务)项目

    application.yml

    server:
      port: 9000
    
    spring:
      datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/dispatcher?useUnicode=true&characterEncoding=utf-8
        username: root
        password: 134520
    
      rabbitmq:
        username: admin
        password: admin
        virtual-host: /
        # 集群配置,集群配置时使用 rabbitmq.addresses即可,不用配置rabbitmq.port rabbitmq.host了
        addresses: 192.168.1.18:5672,192.168.1.18:5673,192.168.1.18:5674
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    DispatcherService.java实现

    @Service
    @Transactional
    public class DispatcherService {
    
        @Autowired
        private JdbcTemplate jdbcTemplate;
    
        public void dispatch(String orderId) throws Exception {
            //定义sql
            String sql = "insert into dispatcher(order_id,dispatcher_id,status,order_content,user_id,create_time) values (?,?,?,?,?,?)";
            //添加记录
            int count = jdbcTemplate.update(sql, orderId, UUID.randomUUID().toString(), 0, "测试数据", 1, new Date());
    
            if (count != 1) {
                throw new Exception("订单修改失败");
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    DispatcherController.java实现,配送接口模拟超时场景。

    @RestController
    public class DispatcherController {
    
        @Autowired
        private DispatcherService dispatchService;
    
        @GetMapping("/dispatch")
        public String dispatcher(String orderId) throws Exception {
            dispatchService.dispatch(orderId);
            //模拟超时 导致调用方请求失败
            Thread.sleep(5000);
            return "success";
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    order(订单服务)项目

    application.yml

    server:
      port: 8089
    
    spring:
      datasource:
        url: jdbc:mysql://localhost:3306/dispatcher_order?useTimezone=true&serverTimezone=GMT%2B8&characterEncoding=utf-8
        username: root
        password: 134520
        driver-class-name: com.mysql.cj.jdbc.Driver
      rabbitmq:
        username: admin
        password: admin
        virtual-host: /
        # 集群连接方式
        addresses: 192.168.1.18:5672,192.168.1.18:5673,192.168.1.18:5674
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    OrderDataService.java

    @Transactional(rollbackFor = Exception.class)
    @Service
    public class OrderDataService {
    
        @Autowired
        private JdbcTemplate jdbcTemplate;
    
        public void saveOrder(Order order) throws Exception {
            String sql = "insert into `order`(order_id,user_id,order_content) values (?,?,?)";
            int count = jdbcTemplate.update(sql, order.getOrderId(), order.getUserId(), order.getOrderContent());
    
            if (count != 1) {
                throw new Exception("订单创建失败。");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    OrderService.java

    @Service
    public class OrderService {
    
        @Autowired
        private OrderDataService orderDataService;
    
        @Transactional(rollbackFor = Exception.class)
        public void createOrder(Order order) throws Exception {
            orderDataService.saveOrder(order);
    
            String result = dispatcherHttpApi(order.getOrderId());
            if (!result.equals("success")) {
                throw new Exception("远程调用失败。");
            }
        }
    
        private String dispatcherHttpApi(String orderId) {
            SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
            //连接超时>3
            factory.setConnectTimeout(3000);
            //处理超时>2
            factory.setReadTimeout(2000);
    
            String url = "http://localhost:9000/dispatch?orderId=" + orderId;
    		RestTemplate restTemplate = new RestTemplate(factory);
            return restTemplate.getForObject(url, String.class);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    测试类

    @SpringBootTest
    class SpringbootOrderRabbitmqOrderApplicationTests {
    
        @Autowired
        private OrderService orderService;
    
        @Test
        void createOrder() throws Exception {
            String orderId = "10001";
            Order order = new Order();
            order.setOrderId(orderId);
            order.setUserId(1);
            order.setOrderContent("汉堡");
    
            orderService.createOrder(order);
            System.out.println("创建成功!");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    经过测试后发现:

    在这里插入图片描述

    订单服务在创建完订单后调用了配送服务,但是调用配送服务时超时了并且报了异常,然后订单服务进行数据回滚了。配送服务是先把配送数据给新增成功了,但是响应时间过长了。就导致订单服务中没有数据,配送服务反而多了一条数据。当然这个例子可能不是特别合理,一般情况下,并发量过高的系统中我们会去使用消息中间件异步执行,而不是直接去调用服务。

    处理分布式事务

    那么如何能让两个数据库的数据回滚呢?

    如果让两边的数据同时都回滚,这是不可能的事,两个数据库实例都是独立的,无论是开启事务还是提交事务都只能影响到一个数据库。那么我们可以通过一些别的手段来实现想要的效果。

    基于MQ的分布式事务整体设计思路。为了处理分布式事务问题,我们可以引入消息队列来实现相关的功能。

    主要是分为两部分:

    • 可靠生产
    • 可靠消费

    可靠生产

    如果做到可靠生产呢?

    在这里插入图片描述

    思路

    思路:增加一个数据状态表,每次新增订单时同时也新增一条数据状态,然后利用中间件提供的publisher/confirm功能开启确认机制,每次消息发送到MQ中,MQ都会回调生产者的方法返回回执信息,然后再将数据状态表中的数据改为已发送状态。

    我们也可以同时创建定时任务,从数据状态表中查找状态异常的数据,然后重新发送到MQ中

    实现

    新增一个数据表,对数据进行冗余从而确保消息的可靠性。

    在这里插入图片描述

    方式:首先需要将订单存入数据表中,同时新增一条数据状态表,然后将消息推送到队列中,接收者接收到消息并且执行完相应的操作后将这个数据状态改为正常(或者别的能够理解的方式)。注意一定要打开消息队列的确认机制。

    rabbitmq:
    	publisher-confirm-type: correlated # 确认机制 必须要
    
    • 1
    • 2

    配置交换机和队列

    @Configuration
    public class RabbitMQConfiguration {
    
        // 配置一个死信交换机
        @Bean
        public FanoutExchange deadExchange() {
            return new FanoutExchange("dead_order_fanout_exchange", true, false);
        }
    
        // 死信队列
        @Bean
        public Queue deadOrderQueue() {
            return new Queue("dead.order.queue", true);
        }
    
        // 绑定死信交换机和队列
        @Bean
        public Binding bindDeadOrder() {
            return BindingBuilder.bind(deadOrderQueue()).to(deadExchange());
        }
    
        // 配置订单交换机
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange("order_fanout_exchange", true, false);
        }
    	// 订单队列,并且设置好死信交换机
        @Bean
        public Queue orderQueue() {
            Map<String, Object> args = new HashMap<>();
            args.put("x-dead-letter-exchange", "dead_order_fanout_exchange");
            return new Queue("order.queue", true, false, false, args);
        }
    
        // 绑定
        @Bean
        public Binding bindOrder() {
            return BindingBuilder.bind(orderQueue()).to(fanoutExchange());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    业务层代码

    @Service
    public class MQOrderService {
    
        @Autowired
        private OrderDataService orderDataService;
    
        @Autowired
        private OrderMQService orderMQService;
    
        public void createOrder(Order order) throws Exception {
    
            orderDataService.saveOrder(order);
    
            orderMQService.sendMessage(order);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    OrderDataService.java保存数据类

    @Transactional(rollbackFor = Exception.class)
    @Service
    public class OrderDataService {
    
        @Autowired
        private JdbcTemplate jdbcTemplate;
    
        public void saveOrder(Order order) throws Exception {
            String sql = "insert into `order`(order_id,user_id,order_content) values (?,?,?)";
            int count = jdbcTemplate.update(sql, order.getOrderId(), order.getUserId(), order.getOrderContent());
    
            if (count != 1) {
                throw new Exception("订单创建失败。");
            }
    
            saveLocalMessage(order);
    
        }
        private void saveLocalMessage(Order order) throws Exception{
            String sqlString = "insert into orders_message(order_id,order_content,status,unique_id) values(?,?,?,?)";
            int count = jdbcTemplate.update(sqlString, order.getOrderId(), order.getOrderContent(), 0, 1);
            if (count != 1) {
                throw new RuntimeException("保存消息异常");
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    发送消息,以及rabbitmq返回的ack应答机制(回调)。

    @Transactional(rollbackFor = Exception.class)
    @Service
    public class OrderMQService {
    
        @Autowired
        private JdbcTemplate jdbcTemplate;
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        //java 自带注解
        //被他注解的方法会在服务器加载servlet的时候运行,并且只执行一次,在init()方法前执行
        // 表示此类被实例化完成后调用此方法
        @PostConstruct
        public void regCallBack() {
            //消息发送成功后,给与生产者的消息回执,确保生产者的可靠性
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String s) {
                    System.out.println("cause:" + s);
                    String orderId = correlationData.getId();
                    
                    //如果ack为false,则代表消息没收到
                    if (!ack) {
                        // 这里可能要进行其他的方式进行存储
                        System.out.println("应答失败,orderid:" + orderId);
                        return;
                    }
    
                    try {
                        String updateSQL = "update orders_message set status = 1 where order_id = ?";
                        int count = jdbcTemplate.update(updateSQL, orderId);
                        if (count == 1) {
                            System.out.println("修改成功,进入消息队列。");
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
    
                }
            });
        }
    
        public void sendMessage(Order order) {
            //通过MQ发送消息
            rabbitTemplate.convertAndSend("order_fanout_exchange", "", JSON.toJSONString(order), new CorrelationData(order.getOrderId()));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    测试后,订单表中新增了一条数据,并且状态表新增了一条数据,并且状态数据的status字段值被改为了1,说明流程正常执行,确保了消息生产的可靠。

    可靠消费

    在这里插入图片描述

    思路

    思路:首先利用RabbitMQ的ACK机制,由消费者控制消息的重发、清除、丢弃,然后再考虑可能会出现的消息重复(多半由于网络动荡原因导致MQ没接收到ack,然后进行了重试发送消息)造成的非幂等性问题(可用mysql的主键、唯一索引之类、redis的分布式锁解决)

    实现

    在我们没有开启手动ack并且没有设置最大重试次数时,一旦我们的消费者在程序中抛出异常,那么消费者将会一直尝试重新消费。所以,我们一定要将手动ack机制打开,且将最大重试次数给设置上,否则将会一直死循环抛出异常!!!

    且我们在前面配置了死信队列,一旦此消息的重试次数达到了最大后,那么此消息会被放置到死信队列中去。

    实现消费可靠性,在不进行处理的情况下,如果消息错误,会造成死循环,可以通过两种方式来解决这种情况。

    • 设置手动ack及重试次数 + 死信队列
    • try/catch + 手动ack + 死信队列 + 人工
    设置手动ack及重试次数 + 死信队列
        # 这里是开启手动ack,让程序去控制MQ的消息重发
        listener:
          simple:
            acknowledge-mode: manual
            retry:
              enabled: true # 开启重试
              max-attempts: 10 # 最大重试次数
              initial-interval: 2000ms # 间隔时间
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    消费者完整的yml文件

    server:
      port: 9000
    
    spring:
      datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/dispatcher?useUnicode=true&characterEncoding=utf-8
        username: root
        password: 134520
    
      rabbitmq:
        username: admin
        password: admin
        virtual-host: /
        # 开启手动 ack,让程序控制MQ的消息重发和删除和转义
        listener:
          simple:
            acknowledge-mode: manual
            retry:
              enabled: true # 开启重试
              max-attempts: 10  # 最大重试次数
              initial-interval: 2000ms  # 重试间隔
        # 集群配置,集群配置时使用 rabbitmq.addresses即可,不用配置rabbitmq.port rabbitmq.host了
        addresses: 192.168.1.18:5672,192.168.1.18:5673,192.168.1.18:5673
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    消费者监听

    @Component
    public class MQConsumer {
    
        @Autowired
        private DispatcherService dispatcherService;
    
        @RabbitListener(queues = "order.queue")
        public void messageConsumer(String orderMsg) throws Exception {
            System.out.println("消息:" + orderMsg);
            JSONObject order = JSONObject.parseObject(orderMsg);
            String orderId = order.getString("orderId");
            // 派单处理
            dispatcherService.dispatch(orderId);
    
            System.out.println(1 / 0); // 出现异常
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    以上重试10次后,仍然出现异常,消息会投放到死信队列中去。

    try/catch + 手动ack + 死信队列 + 人工

    首先也需要打开手动ack模式,然后在监听队列的时候对异常进行相关的处理。如果出现异常,则手动控制消息是否重发,不建议重发,否则在try/catch中会出现死循环;由于我们设置了死信队列,所以该队列会将消息转交给死信队列。
    我们在监听死信队列的时候,如果还出现异常就进行相对应的处理,可以将其存储到其他数据库中或者短信提示让人工进行干预,然后将消息移除即可。

    消费者进行监听

    @Component
    public class MQConsumer {
        
        @Autowired
        private DispatcherService dispatcherService;
        
        @RabbitListener(queues = "order.queue")
        public void messageConsumer(String orderMsg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
            try {
                System.out.println("消息:" + orderMsg);
                JSONObject order = JSONObject.parseObject(orderMsg);
                String orderId = order.getString("orderId");
                // 派单处理
                dispatcherService.dispatch(orderId);
    
                System.out.println(1 / 0); // 出现异常
                // 手动确认
                channel.basicAck(tag, false);
            } catch (Exception e) {
                // 如果出现异常的情况下 根据实际情况重发
                // 重发一次后,丢失
                // 参数1:消息的tag
                // 参数2:多条处理
                // 参数3:重发
                    // false 不会重发,会把消息打入到死信队列
                    // true 重发,建议不使用try/catch 否则会死循环
                
                // 手动拒绝消息
                channel.basicNack(tag, false, false); // false会将数据给死信队
            }
        }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    @Header(AmqpHeaders.DELIVERY_TAG) long tag这个注解可以获取到接收到的消息Header中的一些数据,而在手动提交/决绝时必须要携带这个Header中的数据才能正常提交/拒绝。

    以上,使用了try-catch如果捕获到异常,那么会直接将数据丢给死信队列,重试次数的配置就已经没有任何意义了。如果在手动拒绝消息时设置为重发,那么配置的最大重试次数也会失效(无限重发)。(如果使用了try-catch捕获异常,手动拒绝消息,重试次数的配置就已经毫无意义了。)

    而且,当消费者中接收数据后处理出现异常,很多情况下是bug原因,是需要程序员修改的。

    如果要做好消息闭环,那么再配置一个死信接收者,处理后续的消息。如果在死信消费中也异常,那么直接将消息通过人工干预,或存入其他DB中方便查看,再手动拒绝掉。

    @Service
    public class DeadOrderMQConsumer {
    
        @Autowired
        private DispatchService dispatchService;
    
        @RabbitListener(queues = {"dead.order.queue"})
        public void messageConsumer(String orderMsg, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    
            try {
                System.out.println("消息:" + orderMsg);
                JSONObject order = JSONObject.parseObject(orderMsg);
                String orderId = order.getString("orderId");
                //派单处理
                dispatchService.dispatch(orderId);
                // 手动确认
                channel.basicAck(tag, false);
            } catch (Exception e) {
                System.out.println("人工干预");
                System.out.println("同时把消息转移到别的存储db");
    
                // 手动拒绝
                channel.basicNack(tag, false, false);
            }
    
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    总结

    基于MQ的分布式事务解决方案优点

    • 通用性强
    • 拓展方便
    • 耦合度低,方案比较成熟

    缺点

    • 基于消息中间件,只适合异步场景
    • 消息会延迟处理

    建议

    • 尽量去避免分布式事务
    • 尽量将非核心业务做成异步
  • 相关阅读:
    代码随想录二刷day31
    中国汉服行业发展深度调研与未来趋势预测报告
    凉鞋的 Godot 笔记 108. 第二个通识:增删改查
    #机器翻译
    说一说PCIe5.0的速率和带宽
    Linux_磁盘管理_df命令
    CS:GO开服架设服务器搭建游戏配置方法教程教学插件配置下载资源配置
    【Arduino27】DHT11温湿度传感器模拟值实验
    ISIS——LSP讲解
    【Pycharm配置】在Pycharm中配置Jupyter环境
  • 原文地址:https://blog.csdn.net/weixin_45248492/article/details/126178308