• 深入了解 RabbitMQ:高性能消息中间件


    一、什么是消息队列

            消息队列(Message Queue)是在消息的传输过程中保存消息的容器、  消息指的是两个应用间传递的数据。数据的类型有很多种形式

    二、应用场景 

    主要有三个作用
    异步处理

            场景说明: 用户注册后,需要发注册邮件和注册短信,传统的做法串行的

    应用解耦

            场景: 双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口

            
    流量削峰

    场景: 秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。

    消息队列优缺点

    关于消息队列的优点也就是上面列举的:解耦、异步、削峰

    缺点有以下几个:系统可用性降低、系统复杂度提高、一致性问题

    常用消息中间件


            AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。

            JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

    AMQP和JMS

            MQ是消息通信的模型,并发具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。

    两者间的区别和联系:

    • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
    • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
    • JMS规定了两种消息模型;而AMQP的消息模型更加丰富

    常见MQ产品  kafka、ActiveMQ、RocketMQ、RabbitMQ

    • ActiveMQ:基于JMS
    • RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
    • RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会
    • Kafka:分布式消息系统,高吞吐量

    下面我们来看一下,他们之间有什么区别,他们分别应该用于什么场景

    三、RabbitMQ工作原理

            RabbitMQ是一个由 erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。

    核心概念
    Publisher
            消息的生产者,也是一个向交换器发布消息的客户端应用程序。
    Message
            消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成, 这些属性包括routing-key(路由键)、 priority (相对于其他消息的优先权)。
    Exchange
            交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
    Exchange 4 种类型: direct( 默认 ) fanout, topic, headers ,不同类型的 Exchange 转发消息的策略有所区别
    Queue
            消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
    Binding
            绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交 换器理解成一个由绑定构成的路由表。
    Exchange Queue 的绑定可以是多对多的关系。
    Connection
            网络连接,比如一个TCP 连接。
    Channel
            信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP 连接内的虚拟连接, AMQP 命令都是通过信道 发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都 是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
    Consumer
            消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
    Virtual Host
            虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
    Broker
    表示消息队列服务器实体

    AMQP 中的消息路由

    AMQP 中消息的路由过程和 Java 开 发者熟悉的 JMS 存在一些差别, AMQP 中增加了 Exchange Binding 的角色。生产者把消息发布 到 Exchange 上,消息最终到达队列 并被消费者接收,而 Binding 决定交 换器的消息应该发送到那个队列。

    Exchange 分发消息时根据类型的不同分发策略有区别,目前共四种类型: direct、 fanout、topic(超链接详情)、 headers headers 匹配 AMQP 消息的 header 而不是路由键, headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:

     四、Docker安装RabbitMQ

    docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
    4369, 25672 (Erlang 发现 & 集群端口 )
    5672, 5671 (AMQP 端口 )
    15672 (web 管理后台端口 )
    61613, 61614 (STOMP 协议端口 )
    1883, 8883 (MQTT 协议端口 )
     端口介绍 可进入官网看详解:https://www.rabbitmq.com/networking.html

    五、SpringBoot整合RabbitMQ

    pom.xml里导入相关的依赖:

    1. <dependency>
    2. <groupId>org.springframework.bootgroupId>
    3. <artifactId>spring-boot-starter-amqpartifactId>
    4. dependency>

    application.properties配置文件

    1. spring.rabbitmq.host=192.168.152.155
    2. spring.rabbitmq.port=5672
    3. spring.rabbitmq.virtual-host=/

    使用 Direct exchange(直通交换机)Rabbitmq的发送和接受消息

    1. package com.beijing.gulimall.product.rabbitmq;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.core.Binding;
    4. import org.springframework.amqp.core.DirectExchange;
    5. import org.springframework.amqp.core.Queue;
    6. import org.springframework.context.annotation.Bean;
    7. import org.springframework.context.annotation.Configuration;
    8. @Slf4j
    9. @Configuration
    10. public class DirectRabbitConfig {
    11. //创建队列
    12. @Bean
    13. public Queue TestDirectQueue() {
    14. //public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)
    15. // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
    16. // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
    17. // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
    18. log.info("Queue[{}]创建成功", "TestDirectQueue");
    19. return new Queue("TestDirectQueue", true, false, false);
    20. }
    21. //创建交换机
    22. @Bean
    23. DirectExchange TestDirectExchange() {
    24. log.info("Exchange[{}]创建成功", "TestDirectExchange");
    25. return new DirectExchange("TestDirectExchange", true, false);
    26. }
    27. //创建绑定关系
    28. @Bean
    29. Binding TestBindingDirect() {
    30. // public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,
    31. // Map arguments) {
    32. log.info("Binding[{}]创建成功", "TestBindingDirect");
    33. return new Binding("TestDirectQueue", Binding.DestinationType.QUEUE, "TestDirectExchange", "direct.test", null);
    34. }
    35. }

    然后写个接口进行消息推送 SendMessageController.java

    1. package com.beijing.gulimall.product.rabbitmq;
    2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    3. import org.springframework.beans.factory.annotation.Autowired;
    4. import org.springframework.web.bind.annotation.RequestMapping;
    5. import org.springframework.web.bind.annotation.RestController;
    6. import java.time.LocalDateTime;
    7. import java.time.format.DateTimeFormatter;
    8. import java.util.HashMap;
    9. import java.util.Map;
    10. import java.util.UUID;
    11. @RestController
    12. public class SendMessageController {
    13. @Autowired
    14. RabbitTemplate rabbitTemplate;
    15. @RequestMapping("/hello")
    16. public void testRabbitMQ() {
    17. String messageId = String.valueOf(UUID.randomUUID());
    18. String messageData = "test message, hello!";
    19. String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    20. Map map = new HashMap<>();
    21. map.put("messageId", messageId);
    22. map.put("messageData", messageData);
    23. map.put("createTime", createTime);
    24. //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
    25. rabbitTemplate.convertAndSend("TestDirectExchange", "direct.test", map);
    26. }
    27. }

    配置一下Rabbit序列化对象的方式

    1. package com.beijing.gulimall.product.rabbitmq;
    2. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    3. import org.springframework.amqp.support.converter.MessageConverter;
    4. import org.springframework.context.annotation.Bean;
    5. import org.springframework.context.annotation.Configuration;
    6. @Configuration
    7. public class RabbitmqConfig {
    8. @Bean
    9. public MessageConverter messageConverter(){
    10. return new Jackson2JsonMessageConverter();
    11. }
    12. }

    然后写个接口进行消息接收消息 主启动类上必须加 @EnableRabbit

    1. package com.beijing.gulimall.product.rabbitmq;
    2. import com.rabbitmq.client.Channel;
    3. import org.springframework.amqp.core.Message;
    4. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    6. import org.springframework.stereotype.Service;
    7. import java.util.Map;
    8. @RabbitListener(queues = "TestDirectQueue")
    9. @Service
    10. public class ReceiveRabbitMQ {
    11. @RabbitHandler
    12. public void ReceiveRabbitMQ(Map entity, Channel channel, Message message){
    13. /**
    14. * MQ监听消息
    15. * @param
    16. * @return
    17. * queues:指定监听那个队列 可以监听多个 只要收到消息,队列删除消息,而且只能有一个收到此消息
    18. * 1、Message msg:原生的消息详细信息内容 包括头和体
    19. * 2、T<发送的消息类型>:内容,会根据发送的消息类型自动跳转不同的方法
    20. * 3、通道 channel 当前传输的数据的通道
    21. *
    22. * 场景:
    23. * 1、假设是集群部署当前项目 都有这段代码 同一个消息只能由一个服务消费
    24. * 2、只有一个消息处理完成之后(方法执行完)才能继续接受下一个消息
    25. *
    26. * RabbitListener (可以标记在方法(表示当前方法监听队列)和类上(配合 RabbitHandler,整个类中标记了改注解的都会监听指定的队列)):指定监听的队列,当发送的消息类型不同时,可以使用RabbitHandler标记不同的方法,接收的类型不同,进入的监听方法就不同
    27. * RabbitHandler 只能标记在方法上,用于监听接收同一个队列中的不同的消息类型的消息
    28. */
    29. System.out.println(entity);
    30. System.out.println(channel);
    31. System.out.println(message.getBody());
    32. System.out.println(message.getMessageProperties());
    33. }
    34. }

    六、RabbitMQ消息确认机制-可靠抵达

    保证消息不丢失,可靠抵达,可以使用事务消息,性能下降 250 倍,为此引入确认 机制
    publisher confirmCallback 确认模式
    publisher returnCallback 未投递到 queue 退回模式
    consumer ack 机制

     可靠抵达-ConfirmCallback

     spring.rabbitmq.publisher-confirms=true

    • 开启 confirmcallback
    • CorrelationData:用来表示当前消息唯一性。
    • 消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有broker接收到才会调用 confirmCallback
    • broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递
    • 到目标 queue 里。所以需要用到接下来的 returnCallback
    1. #开启发送端确认 生产者Publisher 到服务器Broker
    2. spring.rabbitmq.publisher-confirms=true
    可靠抵达-ReturnCallback
     
    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.template.mandatory=true
    • confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到returns退回模式。
    • 这样如果未能投递到目标queue里将调用returnCallback ,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据。
    1. #开启发送端消息抵达队列的确认
    2. spring.rabbitmq.publisher-returns=true
    3. #只要抵达队列,以异步发送优先回调我们这个returnConfirm
    4. spring.rabbitmq.template.mandatory=true

      实现代码: 

    1. package com.beijing.gulimall.product.rabbitmq;
    2. import org.springframework.amqp.core.Message;
    3. import org.springframework.amqp.rabbit.connection.CorrelationData;
    4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    5. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    6. import org.springframework.amqp.support.converter.MessageConverter;
    7. import org.springframework.beans.factory.annotation.Autowired;
    8. import org.springframework.context.annotation.Bean;
    9. import org.springframework.context.annotation.Configuration;
    10. import javax.annotation.PostConstruct;
    11. @Configuration
    12. public class RabbitmqConfig {
    13. @Autowired
    14. private RabbitTemplate rabbitTemplate;
    15. @Bean
    16. public MessageConverter messageConverter(){
    17. return new Jackson2JsonMessageConverter();
    18. }
    19. /**
    20. * 定制RabbitTemplate
    21. * 1、服务收到消息就回调
    22. * 1、spring.rabbitmq.publisher-confirms=true
    23. * 2、设置确认回调 ConfirmCallback
    24. * 2、消息正确抵达队列进行回调
    25. * 1、spring.rabbitmq.publisher-returns=true
    26. * spring.rabbitmq.template.mandatory=true
    27. * 2、设置确认回调ReturnCallback
    28. * 3、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息)
    29. * spring.rabbitmq.listener.simple.acknowledge-mode=manual #手动ack消息
    30. * 1、默认是自动确认的,只要消息接收到,客户端自动确认,服务端就会移除这个消息
    31. * 问题:
    32. * 我们收到很多消息,自动回复给服务器ack,只要一个消息处理成功,宕机了。发送消息丢失;
    33. * 手动确认。只要我们没有明确告诉MQ,货物被签收。没有ack,消息就一直是unacked状态。即使Consumer宕机。消息不会丢失,会重新变为Ready模式
    34. *
    35. * 2、如何签收:
    36. * channel. basicAck(deliveryTag, false);签收;业务成功完成就应该签收
    37. * channel。basicNack(deliveryTag, false, true);拒签;业务失败,拒签
    38. */
    39. //p->b
    40. @PostConstruct //MyRabbitConfig 对象创建完成以后,执行这个方法
    41. public void initRabbitTemplate() {
    42. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    43. /**
    44. *
    45. * @param correlationData 当前消息的唯一管理数据(这个是消息的唯一id)
    46. * @param ack 消息是否成功收到
    47. * @param cause 失败的原因
    48. */
    49. @Override
    50. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    51. /**
    52. * 1、做好消息确认机制(producer,consumer【手动ack】)
    53. * 2、每一个发送的消息都在数据库做好记录。定期将失败的消息再次发送一遍
    54. */
    55. //服务器收到了
    56. //修改消息状态
    57. System.out.println("confirm...correlationData[" + correlationData + "]" + "ack == [" + ack + "]" + "cause ==[" + cause + "]");
    58. }
    59. });
    60. //e->q
    61. //设置消息抵达队列的确认回调
    62. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){
    63. /**
    64. * 只要消息没有投递给指定的队列,就触发这个失败回调
    65. * @param message 投递失败的消息详细消息
    66. * @param replyCode 回复的状态码
    67. * @param replyText 回复的文本内容
    68. * @param exchange 当时这个消息发给那个交换机
    69. * @param routingKey 当时这个消息用那个路由键
    70. */
    71. @Override
    72. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    73. //报错误了。修改数据库当前消息的状态 -> 错误
    74. System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]==>replyText["+replyText+"]==>"+exchange+"==>[["+routingKey+"]]");
    75. }
    76. });
    77. }
    78. }

    测一下 故意把路由写成 p->b 成功 e->失败

    1. @RequestMapping("/hello")
    2. public void testRabbitMQ() {
    3. String messageId = String.valueOf(UUID.randomUUID());
    4. String messageData = "test message, hello!";
    5. String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    6. Map map = new HashMap<>();
    7. map.put("messageId", messageId);
    8. map.put("messageData", messageData);
    9. map.put("createTime", createTime);
    10. //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
    11. rabbitTemplate.convertAndSend("TestDirectExchange", "direct.te12312st", map);
    12. }

    结果 

    Fail Message[(Body:'{"createTime":"2023-10-13 16:38:17","messageId":"b916a451-b96b-4a41-9229-c8b12d5710a0","messageData":"test message, hello!"}' MessageProperties [headers={__ContentTypeId__=java.lang.Object, X-B3-SpanId=174db242dccf34df, __KeyTypeId__=java.lang.Object, X-B3-ParentSpanId=f0a4cf84a15d8d32, X-B3-Sampled=0, X-B3-TraceId=f0a4cf84a15d8d32, __TypeId__=java.util.HashMap}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])]==>replyCode[312]==>replyText[NO_ROUTE]==>TestDirectExchange==>[[direct.te12312st]]
    confirm...correlationData[null]ack == [true]cause ==[null]
    
    可靠抵达-Ack消息确认机制
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    消费者获取到消息,成功处理,可以回复 Ack Broker
    • basic.ack用于肯定确认;broker将移除此消息
    • basic.nack用于否定确认;可以指定broker是否丢弃此消息,可以批量
    • basic.reject用于否定确认;同上,但不能批量
    默认自动 ack ,消息被消费者收到,就会从 broker queue 中移除
    queue 无消费者,消息依然会被存储,直到消费者消费
    消费者收到消息,默认会自动 ack 。但是如果无法确定此消息是否被处理完成, 或者成功处理。我们可以开启手动ack 模式
    • 消息处理成功,ack(),接受下一个消息,此消息broker就会移除
    • 消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack
    • 消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户
    • 端断开,消息不会被broker移除,会投递给别人
    1. import com.rabbitmq.client.Channel;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.core.Message;
    4. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    6. import org.springframework.stereotype.Service;
    7. import java.io.IOException;
    8. import java.util.Map;
    9. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
    10. import com.baomidou.mybatisplus.core.metadata.IPage;
    11. import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
    12. import com.lihailin.common.utils.PageUtils;
    13. import com.lihailin.common.utils.Query;
    14. @RabbitListener(queues = {"hello-java-queue"})
    15. @Service("orderItemService")
    16. @Slf4j
    17. public class OrderItemServiceImpl extends ServiceImpl implements OrderItemService {
    18. @Override
    19. public PageUtils queryPage(Map params) {
    20. IPage page = this.page(
    21. new Query().getPage(params),
    22. new QueryWrapper()
    23. );
    24. return new PageUtils(page);
    25. }
    26. /**
    27. * queues :声明需要监听的所有队列
    28. * 参数可以写一下类型
    29. * 1、Message message: 原生信息详细信息。 头 + 体
    30. * 2、T<发送的消息的类型> OrderReturnReasonEntity content
    31. * 3、Channel channel:当前传输数据的通道
    32. *

    33. * Queue: 可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息
    34. * 场景:
    35. * 1)、订单服务启动多个;同一个消息,只能有一个客户端收到
    36. * 2)、只要一个消息完成处理完,方法运行结束,我们就可以接收收到下一个消息
    37. */
    38. @RabbitHandler
    39. public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) throws InterruptedException, IOException {
    40. //{"id":1,"name":"哈哈","sort":null,"status":null,"createTime":1678017004436}
    41. //byte[] body = message.getBody();
    42. //消息头属性信息
    43. log.info("OrderReturnReasonEntity 接收到消息。。。:{}", content);
    44. //System.out.println("OrderReturnReasonEntity 接收到消息。。。:"+content);
    45. //Thread.sleep(3000);
    46. //byte[] body = message.getBody();
    47. long deliveryTag = message.getMessageProperties().getDeliveryTag();
    48. log.info("deliveryTag =>{}:", deliveryTag);
    49. try {
    50. if (deliveryTag % 2 == 0){
    51. //验收货物,非配量模式
    52. channel.basicAck(deliveryTag, false);
    53. log.info(" 验收货物成功 deliveryTag =>{}:", deliveryTag);
    54. }else {
    55. //退货 requeue = false 丢弃 requeue= true 发回服务器,服务器重新入队
    56. channel.basicNack(deliveryTag,false,false);
    57. log.info("拒收了 货物");
    58. }
    59. } catch (Exception e) {
    60. //网络中断
    61. }
    62. }
    63. }

    测试结果:

    七、RabbitMQ延时队列(实现定时任务)

    场景:
    比如未付款订单,超过一定时间后,系统自动取消订单并释放占有物品。
    常用解决方案:
    spring schedule 定时任务轮询数据库
            缺点:
                    消耗系统内存、增加了数据库的压力、存在较大的时间误差
            解决:
                   rabbitmq的消息 TTL 和死信 Exchange 结合

    消息的TTLTime To Live

    • 消息的TTL就是消息的存活时间
    • RabbitMQ可以对队列消息分别设置TTL
      • 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信
      • 如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。

    Dead Letter ExchangesDLX

    • 一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。(什么是死信)
      • 一个消息被Consumer拒收了,并且reject方法的参数里requeuefalse。也就是说不会被再次放在队列里,被其他消费者使用。basic.reject/ basic.nackrequeue=false
      • 上面的消息的TTL到了,消息过期了。
      • 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上
    • Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
    • 我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列
    • 手动 ack& 异常消息统一放在一个队列处理建议的两种方式
      catch 异常后, 手动发送到指定队列 ,然后使用 channel rabbitmq 确认消息已消费
      Queue 绑定死信队列,使用 nack requque false )确认消息消费失败
    延时队列实现-1
    延时队列实现-2

     

    SpringBoot中使用延时队列 

    • 1QueueExchangeBinding可以@Bean进去
    • 2、监听消息的方法可以有三种参数(不分数量,顺序)
      •  Object content, Message message, Channel channel
    • 3channel可以用来拒绝消息,否则自动ack

     演示:创建queue、exchange、binding

    1. import org.springframework.amqp.core.*;
    2. import org.springframework.context.annotation.Bean;
    3. import org.springframework.context.annotation.Configuration;
    4. import java.util.HashMap;
    5. import java.util.Map;
    6. @Configuration
    7. public class MyMQConfig {
    8. //@Bean Binding,Queue,Exchange
    9. /**
    10. * 容器中的Binding,Queue,Exchange 都会自动创建(RabbitMQ 没有的情况)
    11. *

    12. * RabbitMQ 只要有。@Bean声明属性发生变化也不会覆盖
    13. */
    14. @Bean
    15. public Queue orderDelayQueue() {
    16. Map arguments = new HashMap<>();
    17. /**
    18. x-dead-letter-exchange: order-event-exchange
    19. x-dead-letter-routing-key: order.release.order
    20. x-message-ttl: 60000
    21. */
    22. arguments.put("x-dead-letter-exchange", "order-event-exchange");
    23. arguments.put("x-dead-letter-routing-key", "order.release.order");
    24. arguments.put("x-message-ttl", 60000);
    25. //(String name, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) {
    26. Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
    27. return queue;
    28. }
    29. @Bean
    30. public Queue orderReleaseOrderQueue() {
    31. Queue queue = new Queue("order.release.order.queue", true, false, false);
    32. return queue;
    33. }
    34. @Bean
    35. public Exchange orderEventExchange() {
    36. //String name, boolean durable, boolean autoDelete, Map arguments
    37. return new TopicExchange("order-event-exchange", true, false);
    38. }
    39. @Bean
    40. public Binding orderCreateOrderBinding() {
    41. //String destination, DestinationType destinationType, String exchange, String routingKey, Map arguments
    42. return new Binding("order.delay.queue",
    43. Binding.DestinationType.QUEUE,
    44. "order-event-exchange",
    45. "order.create.order",
    46. null);
    47. }
    48. @Bean
    49. public Binding orderReleaseOrderBinding() {
    50. return new Binding("order.release.order.queue",
    51. Binding.DestinationType.QUEUE,
    52. "order-event-exchange",
    53. "order.release.order",
    54. null);
    55. }
    56. }

    发送消息 

    1. @ResponseBody
    2. @GetMapping(value = "/test/createOrder")
    3. public String createOrderTest() {
    4. //订单下单成功
    5. OrderEntity orderEntity = new OrderEntity();
    6. orderEntity.setOrderSn(UUID.randomUUID().toString());
    7. orderEntity.setModifyTime(new Date());
    8. //给MQ发送消息
    9. rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderEntity);
    10. return "ok";
    11. }

    接收消息

    1. package com.lihailin.gulimall.order.listener;
    2. import com.lihailin.gulimall.order.entity.OrderEntity;
    3. import com.lihailin.gulimall.order.service.OrderService;
    4. import com.rabbitmq.client.Channel;
    5. import org.springframework.amqp.core.Message;
    6. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    8. import org.springframework.beans.factory.annotation.Autowired;
    9. import org.springframework.stereotype.Service;
    10. import java.io.IOException;
    11. @RabbitListener(queues = "order.release.order.queue")
    12. @Service
    13. public class OrderCloseListener {
    14. @Autowired
    15. OrderService orderService;
    16. @RabbitHandler
    17. public void listener(OrderEntity entity, Channel channel, Message message) throws IOException {
    18. System.out.println("收到过期的订单信息:准备关闭订单" + entity.getOrderSn());
    19. try {
    20. orderService.closeOrder(entity);
    21. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    22. }catch (Exception e){
    23. channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
    24. }
    25. }
    26. }

     

    如何保证消息可靠性-消息丢失

    • 1、消息丢失
      • 消息发送出去,由于网络问题没有抵达服务器
        • 做好容错方法(try-catch),发送消息可能会网络失败,失败后要有重试机制,可记录到数据库,采用定期扫描重发的方式
        • 做好日志记录,每个消息状态是否都被服务器收到都应该记录
        • 做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进行重发
      • 消息抵达BrokerBroker要将消息写入磁盘(持久化)才算成功。此时Broker尚未持久化完成,宕机。
        • publisher也必须加入确认回调机制,确认成功的消息,修改数据库消息状态。
      • 自动ACK的状态下。消费者收到消息,但没来得及消息然后宕机
        • 一定开启手动ACK,消费成功才移除,失败或者没来得及处理就noAck并重新入队
    • 2、消息重复
      • 消息消费成功,事务已经提交,ack时,机器宕机。导致没有ack成功,Broker的消息重新由unack变为ready,并发送给其他消费者
      • 消息消费失败,由于重试机制,自动又将消息发送出去
      • 成功消费,ack时宕机,消息由unack变为readyBroker又重新发送
        • 消费者的业务消费接口应该设计为幂等性的。比如扣库存有工作单的状态标志
        • 使用防重表redis/mysql),发送消息每一个都有业务的唯一标识,处理过就不用处理
        • rabbitMQ的每一个消息都有redelivered字段,可以获取是否是被重新投递过来的,而不是第一次投递过来的
    • 3、消息积压
      • 消费者宕机积压
      • 消费者消费能力不足积压
      • 发送者发送流量太大
        • 上线更多的消费者,进行正常消费
        • 上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理
  • 相关阅读:
    软件设计模式系列之八——适配器模式
    Mybatis的分页和动态sql
    Flutter 《入门到成仙 》第二章 环境安装
    “面向大厂编程”一线互联网公司面试究竟问什么?打入内部针对性学习!
    SQL入门教程
    【leetcode】有效的回文
    pytest
    对垃圾回收的简单了解
    SpringBoot入门
    【华为OD机试真题 JS】找车位
  • 原文地址:https://blog.csdn.net/weixin_42383680/article/details/133585692