• RabbitMQ进阶技术回顾


    前言

    在项目开发过程中,对于发短信、发邮件和数据同步功能,或许会使用消息队列完成此功能的开发。既可以实现业务解耦,还可以保证在面对突发流量时相关业务正常运行。但在实际使用过程中,只了解最基本的发送和消费过程是不够的,还应了解消息的可靠性、持久性、可用性和扩展性等问题。通过阅读《RabbitMQ实战指南》一书,对于RabbitMQ有了更全面的了解,通过理论和项目实践有了更深入的了解,先将此过程记录如下。

    正文

    安装

    在了解 RabbitMQ 相关特性之前,还需要了解如何安装,这对于后续的参数配置和维护很有用。推荐使用 Docker 完成部署

    # 下载带有管理页面的镜像
    docker pull rabbitmq:3.9-management
    
    # 启动MQ并设置应用端口为5672,管理页面端口为15672,用户名为root,密码为123456
    docker run -d --hostname my-rabbit -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123456 ${镜像ID}
    
    • 1
    • 2
    • 3
    • 4
    • 5

    角色

    从宏观角度看,RabbitMQ 所扮演的角色位于生产者和消费者之间,为两者提供了缓冲功能。在RabbitMQ内部通过交换(Exchange)和和队列(Queue)实现消息分发,最终由消费者完成消费。
    在这里插入图片描述

    • Product:消息生产者
    • Exchage:交换机,通过RoutingKey将消息转发到不同队列
    • Queue:存储消息
    • Consumer:消息消费者,通常包含业务逻辑
    Exchange

    在 RabbitMQ 中存储四种类型的交换机

    1. direct:将消息路由到 BindKey 和 RoutingKey 完全匹配的队列中
    2. fanout:把消息路由到所有与此交换机绑定的队列
    3. topic:通过通配符“.”、“*”和“#”分发到不同的队列
    4. headers:通过消息内容的headers属性进行匹配并进行分发

    组合关系

    在生产者发送消息前,需要配置交换机参数和队列参数,并完成交换机和队列的绑定。此过程既可以在管理页面进行设置,也可以通过代码配置
    在这里插入图片描述

    在这里插入图片描述

    发送消息

    在 SpringBoot 项目中使用 RabbitMQ 发送消息是非常方便的,这里对发送消息的API进行记录
    在这里插入图片描述

    至于消息的参数,可以通过官方文档了解,其中比较常见的配置如下:

    名称说明
    timestamp消息发送时间
    messageId消息id,一般使用 uuid
    deliveryMode消息持久化,取值有:NON_PERSISTENT、PERSISTENT
    contentEncoding内容编码,取值有:GZIP、UTF-8等
    contentType内容类型:application/json、application/xml等
    生产者确认

    当调用 send() 方法发送消息后,如何保证消息准确进入 RabbitMQ 和指定的队列哪?RabbitMQ 提供了事务和**发送方确认机制(publisher confirm)**解决此问题,这里对后者做如下记录:

    1. 配置回调
    @Slf4j
    @Component
    public class DefaultRabbitCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
        /**
         * 消息成功进入 RabbitMQ 时,回调此方法且 ack 为 true
         *
         * @param correlationData correlation data for the callback.
         * @param ack             true for ack, false for nack
         * @param cause           An optional cause, for nack, when available, otherwise null.
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            log.info("confirm");
        }
    
        /**
         * 当交换机无法根据自身的特性或路由键找到一个符合条件的队列,就会回调此方法
         *
         * @param returned the returned message and metadata.
         */
        @Override
        public void returnedMessage(ReturnedMessage returned) {
            log.info("returnedMessage");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    1. 指定回调
    @Slf4j
    @RestController
    public class MqController {
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        @Resource
        private DefaultRabbitCallback defaultRabbitCallback;
    
    
        @GetMapping(value = "/send2")
        public Long send2() {
            // 配置 confirm 回调
            rabbitTemplate.setConfirmCallback(defaultRabbitCallback);
            // 配置 return 回调
            rabbitTemplate.setReturnsCallback(defaultRabbitCallback);
            // 数据体
            Map<String, Object> map = new HashMap<>(2);
            map.put("name", "王大海");
            map.put("age", 25);
            // 生成唯一 id
            String msgId = IdUtil.fastSimpleUUID();
            // 将唯一id 写入其中,后续当 confirm 回调失败时,可以完成一些业务逻辑
            CorrelationData correlationData = new CorrelationData();
            correlationData.setId(msgId);
    
            // 配置消息参数并发送
            rabbitTemplate.convertAndSend("example.exchange", "example.key1", JSONUtil.toJsonStr(map), message -> {
                MessageProperties messageProperties = message.getMessageProperties();
                messageProperties.setTimestamp(new Date());
                messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                messageProperties.setMessageId(msgId);
                messageProperties.setContentEncoding(StandardCharsets.UTF_8.name());
                messageProperties.setContentType(MediaType.APPLICATION_JSON_VALUE);
                return message;
            }, correlationData);
            return System.currentTimeMillis();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    消费消息

    RabbitMQ 消费模式有两种:推(Push)模式和拉(Get)模式。
    拉模式

            // 队列名称和超时时间
            Message receive = rabbitTemplate.receive("example.queue", 3000);
            String str = new String(receive.getBody());
            MessageProperties messageProperties = receive.getMessageProperties();
            String messageId = messageProperties.getMessageId();
    
    • 1
    • 2
    • 3
    • 4
    • 5

    推模式
    当有多个消费者监听一个队列时,RabbitMQ 会以轮训的方式将消息分发给每个消费者

    @Slf4j
    @Component
    public class MqListener {
    
        /**
         *
         * @param channel channel
         * @param message 消息体
         * @throws IOException IOException
         */
        @RabbitListener(queues = "example.queue")
        public void listen(Channel channel, Message message) throws IOException {
            String body = new String(message.getBody());
            MessageProperties messageProperties = message.getMessageProperties();
            Date timestamp = messageProperties.getTimestamp();
            log.info("body:{}; messageProperties:{}; timestamp:{}", body, messageProperties, DateUtil.formatDateTime(timestamp));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    可靠性接收

    当消费者接收到新消息,如果此时服务被强制关闭,如何保证这条新被正确消费吶?RabbitMQ 使用 ACK 机制保证了消息消费的可靠性。当消费过程中出现异常导致无法执行完整业务逻辑,此消息会重新回到队列中

    @Slf4j
    @Component
    public class MqListener {
    
        /**
         * 手动ack
         *
         * @param channel channel
         * @param message 消息体
         * @throws IOException IOException
         */
        @RabbitListener(queues = "example.queue", ackMode = "MANUAL", concurrency = "2")
        public void listen(Channel channel, Message message) throws IOException {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                String body = new String(message.getBody());
                MessageProperties messageProperties = message.getMessageProperties();
                Date timestamp = messageProperties.getTimestamp();
                log.info("body:{}; messageProperties:{}; timestamp:{}", body, messageProperties, DateUtil.formatDateTime(timestamp));
                // 成功消费,RabbitMQ 可以删除此消息
                channel.basicAck(deliveryTag, false);
            } catch (Exception e) {
                log.warn("exception:{}", ExceptionUtil.stacktraceToString(e));
                // 拒绝此标签的消息,并重新入队列
                channel.basicNack(deliveryTag, false, true);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    死信队列

    当消费者在消费信息的过程中出现异常,使用basicNackbasicReject命令并设置requeue参数为false时,此消息成为“死信”,存储死信的队列叫“死信队列”,转发消息的交换机叫“死信交换机”,同时,当消息过期和达到队列最大长度时,也会产生死信。
    具体到实际的业务场景,死信队列也可以实现业务解耦和功能拆分。例如:30分钟未支付就删除订单信息和事件过期监听等场景。
    实践出真知,使用SpringBoot实现死信队列的搭建和消息监听。
    在这里插入图片描述

    以上是死信队列的数据流转过程,搭建代码如下所示:

    @Configuration
    public class MessageConfig {
    
    
        public static final String NORMAL_QUEUE_NAME = "normal.queue";
        public static final String NORMAL_EXCHANGE_NAME = "normal.exchange";
        public static final String NORMAL_ROUTING_KEY = "normal.routing.key";
        public static final String DLX_QUEUE_NAME = "dlx.queue";
        public static final String DLX_QUEUE_EXCHANGE = "dlx.exchange";
        public static final String DLX_QUEUE_ROUTING_KEY = "dlx.routing.key";
    
        /**
         * 什么普通交换机
         *
         * @return Exchange
         */
        @Bean
        public Exchange exchange() {
            return ExchangeBuilder.directExchange(NORMAL_EXCHANGE_NAME).durable(true).build();
        }
    
        /**
         * 申明普通队列,并指定死信交换机和routingKey
         *
         * @return Queue
         */
        @Bean
        public Queue queue() {
            return QueueBuilder.durable(NORMAL_QUEUE_NAME).deadLetterExchange(DLX_QUEUE_EXCHANGE).deadLetterRoutingKey(DLX_QUEUE_ROUTING_KEY).build();
        }
    
        /**
         * 申明死信交换机
         *
         * @return Exchange
         */
        @Bean
        public Exchange dlxExchange() {
    
            return ExchangeBuilder.directExchange(DLX_QUEUE_EXCHANGE).durable(true).build();
        }
    
        /**
         * 申明死信队列
         *
         * @return Queue
         */
        @Bean
        public Queue dlxQueue() {
            return QueueBuilder.durable(DLX_QUEUE_NAME).build();
        }
    
        /**
         * 建立普通交换机和普通队列的绑定关系
         *
         * @return Binding
         */
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(queue()).to(exchange()).with(NORMAL_ROUTING_KEY).noargs();
        }
    
        /**
         * 简历死信交换机和死信队列的绑定关系
         *
         * @return Binding
         */
        @Bean
        public Binding dlxBinding() {
            return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_QUEUE_ROUTING_KEY).noargs();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    向普通队列中推送偶数,并完成消费,若未奇数则转发给死信队列消费

    @RestController
    public class MqController {
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        @Resource
        private MqConfirmConfig mqConfirmConfig;
    
    
        @GetMapping(value = "/send")
        public Long send() {
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setMessageId(IdUtil.fastSimpleUUID());
            messageProperties.setContentEncoding(StandardCharsets.UTF_8.name());
            int i = RandomUtil.randomInt(1, 20);
            Message message = new Message(String.valueOf(i).getBytes(StandardCharsets.UTF_8), messageProperties);
    
            rabbitTemplate.setConfirmCallback(mqConfirmConfig);
            rabbitTemplate.send(MessageConfig.NORMAL_EXCHANGE_NAME, MessageConfig.NORMAL_ROUTING_KEY, message, new CorrelationData(IdUtil.fastSimpleUUID()));
            return System.currentTimeMillis();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    普通队列消费偶数

    @Slf4j
    @Component
    public class NormalMessageListener {
    
        @RabbitListener(queues = MessageConfig.NORMAL_QUEUE_NAME)
        public void exec(Message message, Channel channel) throws IOException {
            String s = new String(message.getBody());
            MessageProperties messageProperties = message.getMessageProperties();
            int num = Integer.parseInt(s);
            boolean fixed = num % 2 == 0;
            if (fixed) {
                log.info("偶数正常消费; num:{}", num);
                channel.basicAck(messageProperties.getDeliveryTag(), false);
                return;
            }
            log.info("奇数不消费; num:{}", num);
            channel.basicReject(messageProperties.getDeliveryTag(), false);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    死信队列消费奇数

    @Slf4j
    @Component
    public class DlxMessageListener {
    
        @RabbitListener(queues = MessageConfig.DLX_QUEUE_NAME)
        public void dlxListener(Message message, Channel channel) throws IOException {
            String s = new String(message.getBody());
            MessageProperties messageProperties = message.getMessageProperties();
            log.info("死信队列消费奇数; s:{}", s);
            channel.basicAck(messageProperties.getDeliveryTag(), false);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    延时队列

    延时队列是一种特殊的死信队列,通过给消息设置过期时间(TTL),当消息过期后转发至死信交换机再进行消费

    @RestController
    public class MqController {
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        @Resource
        private MqConfirmConfig mqConfirmConfig;
    
    
        @GetMapping(value = "/send")
        public Long send() {
            MessageProperties messageProperties = new MessageProperties();
            // 设置消息的有效时间为10秒
            messageProperties.setExpiration(String.valueOf(TimeUnit.SECONDS.toMillis(10)));
            messageProperties.setMessageId(IdUtil.fastSimpleUUID());
            messageProperties.setContentEncoding(StandardCharsets.UTF_8.name());
    
            Message message = new Message(DateUtil.now().getBytes(StandardCharsets.UTF_8), messageProperties);
    
            rabbitTemplate.setConfirmCallback(mqConfirmConfig);
            rabbitTemplate.send(MessageConfig.NORMAL_EXCHANGE_NAME, MessageConfig.NORMAL_ROUTING_KEY, message, new CorrelationData(IdUtil.fastSimpleUUID()));
            return System.currentTimeMillis();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    普通队列不消费,10秒钟后由死信交换机对应的消费者进行消费

    @Slf4j
    @Component
    public class DlxMessageListener {
    
        @RabbitListener(queues = MessageConfig.DLX_QUEUE_NAME)
        public void dlxListener(Message message, Channel channel) throws IOException {
            String s = new String(message.getBody());
            MessageProperties messageProperties = message.getMessageProperties();
            log.info("死信队列消费; s:{}", s);
            channel.basicAck(messageProperties.getDeliveryTag(), false);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    实际问题

    候后续补充

    参考资料

    RabbitMQ 可靠性、重复消费、顺序性、消息积压解决方案

  • 相关阅读:
    DevExpress中文教程 - 如何在macOS和Linux (CTP)上创建、修改报表(上)
    剑指 Offer II 093. 最长斐波那契数列
    PHP框架详解:Symfony框架的深度剖析
    java汽车租赁超时罚款系统springboot+vue-前后端分离-e36ht
    启动spark-shell时报错java.lang.NumberFormatException: For input string: “0x100“
    【小白专属03】SpringBoot实现增删改查
    栈的运行算法
    AWS SAA-C03 #108
    高校教务系统登录页面JS分析——广东海洋大学
    使用资源编排 ROS 轻松部署高可用架构网站——以 WordPress 为例
  • 原文地址:https://blog.csdn.net/lhc_makefunny/article/details/127643844