• SpringBoot整合RabbitMQ实现消息延迟队列


    环境依赖

    SpringBoot 3.1.0

    JDK 17

    前期准备

    安装MQ:  liunx+docker+rabbitmq安装延迟队列插件

    实例

    实现延迟队列的一种方式是在 RabbitMQ 中使用消息延迟插件,这个插件可以让你在消息发送时设置一个延迟时间,超过这个时间后消息才会被消费者接收到。下面是 SpringBoot 整合 RabbitMQ 实现延迟队列的简单步骤:

    1.添加 RabbitMQ 的 Maven 依赖

    1. <dependency>
    2. <groupId>org.springframework.boot</groupId>
    3. <artifactId>spring-boot-starter-amqp</artifactId>
    4. </dependency>

    2.配置 RabbitMQ

    在 application.properties 配置文件中添加 RabbitMQ 的连接信息:

    1. spring.rabbitmq.host=127.0.0.1
    2. spring.rabbitmq.port=5672
    3. spring.rabbitmq.username=test
    4. spring.rabbitmq.password=test
    5. spring.rabbitmq.virtual-host=/
    6. # 手动应答
    7. #spring.rabbitmq.listener.simple.acknowledge-mode=manual
    8. #每次从队列中取一个,轮询分发,默认是公平分发
    9. spring.rabbitmq.listener.simple.prefetch=1
    10. # 开启重试
    11. spring.rabbitmq.listener.simple.retry.enabled=true
    12. # 重试次数
    13. spring.rabbitmq.listener.simple.retry.max-attempts=5

    3.配置文件

    1. @Configuration
    2. public class RabbitMQOrderConfig {
    3. /**
    4. * 订单交换机
    5. */
    6. public static final String ORDER_EXCHANGE = "order_exchange";
    7. /**
    8. * 订单队列
    9. */
    10. public static final String ORDER_QUEUE = "order_queue";
    11. /**
    12. * 订单路由key
    13. */
    14. public static final String ORDER_QUEUE_ROUTING_KEY = "order.#";
    15. /**
    16. * 死信交换机
    17. */
    18. public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange";
    19. /**
    20. * 死信队列 routingKey
    21. */
    22. public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key";
    23. /**
    24. * 死信队列
    25. */
    26. public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue";
    27. /**
    28. * 延迟时间 (单位:ms(毫秒))
    29. */
    30. public static final Integer DELAY_TIME = 10000;
    31. /**
    32. * 创建死信交换机
    33. */
    34. @Bean("orderDeadLetterExchange")
    35. public Exchange orderDeadLetterExchange() {
    36. return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false);
    37. }
    38. /**
    39. * 创建死信队列
    40. */
    41. @Bean("orderDeadLetterQueue")
    42. public Queue orderDeadLetterQueue() {
    43. return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build();
    44. }
    45. /**
    46. * 绑定死信交换机和死信队列
    47. */
    48. @Bean("orderDeadLetterBinding")
    49. public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) {
    50. return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs();
    51. }
    52. /**
    53. * 创建订单交换机
    54. */
    55. @Bean("orderExchange")
    56. public Exchange orderExchange() {
    57. return new TopicExchange(ORDER_EXCHANGE, true, false);
    58. }
    59. /**
    60. * 创建订单队列
    61. */
    62. @Bean("orderQueue")
    63. public Queue orderQueue() {
    64. Map<String, Object> args = new HashMap<>(3);
    65. //消息过期后,进入到死信交换机
    66. args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE);
    67. //消息过期后,进入到死信交换机的路由key
    68. args.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY);
    69. //过期时间,单位毫秒
    70. args.put("x-message-ttl", DELAY_TIME);
    71. return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();
    72. }
    73. /**
    74. * 绑定订单交换机和队列
    75. */
    76. @Bean("orderBinding")
    77. public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) {
    78. return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs();
    79. }
    80. }

    4.定义消息实体类

    定义一个消息体类,用来存储需要发送的消息:

    1. @Slf4j
    2. @Data
    3. @Builder
    4. public class OrderMessage implements Serializable {
    5. /**
    6. * 商户订单号
    7. */
    8. private String orderId;
    9. /**
    10. * 支付宝订单号
    11. */
    12. private String tradeNo;
    13. }

    5.定义消息发送者

    定义一个 RabbitMQ 消息发送者类,用来发送消息到 RabbitMQ:

    1. @Slf4j
    2. @Component
    3. public class MessageSender {
    4. @Autowired
    5. private RabbitTemplate rabbitTemplate;
    6. public void sendOrderMessage(OrderMessage message) {
    7. //true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定
    8. rabbitTemplate.setMandatory(true);
    9. //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
    10. rabbitTemplate.setReturnsCallback(returned -> {
    11. int code = returned.getReplyCode();
    12. System.out.println("code=" + code);
    13. System.out.println("returned=" + returned);
    14. });
    15. rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", message);
    16. log.info("===============延时队列生产消息====================");
    17. log.info("发送时间:{},发送内容:{}, {}ms后执行", LocalDateTime.now(), message, RabbitMQConfig.DELAY_TIME);
    18. }
    19. }

    6.定义消息消费者

    定义一个 RabbitMQ 消息消费者类,用来接收并处理消息:

    1. @Component
    2. @Slf4j
    3. @RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)
    4. public class OrderMQListener {
    5. @RabbitHandler
    6. public void consumer(OrderMessage orderMessage, Message message, Channel channel) throws IOException {
    7. log.info("收到消息:{}",new Date());
    8. log.info("msgTag:{}", message.getMessageProperties().getDeliveryTag());
    9. log.info("message:{}", message);
    10. log.info("content:{}", orderMessage);
    11. }
    12. }

    这里使用了 @RabbitListener 注解来将一个方法标记为一个 RabbitMQ 消息监听器,通过设置 queues 属性来指定监听的队列名称。

    7.定义一个controller

    1. @Slf4j
    2. @Api(tags = "延迟消息接口")
    3. @RestController
    4. @RequestMapping("/rabbitmq_order_delay_message")
    5. public class RabbitMQDelayMessageController {
    6. @Autowired
    7. private MessageSender sender;
    8. /**
    9. * 发送消息
    10. * @return
    11. */
    12. @RequestMapping(value = "/sendMsg", method = RequestMethod.GET)
    13. @ResponseBody
    14. public void sendMsg() {
    15. OrderMessage orderMessage = OrderMessage.builder().orderId(UUID.randomUUID().toString()).tradeNo(UUID.randomUUID().toString()).build();
    16. sender.sendOrderMessage(orderMessage);
    17. }
    18. }

    启动项目,请求运行结果:

    总的xml:

    1. <dependency>
    2. <groupId>org.springframework.boot</groupId>
    3. <artifactId>spring-boot-starter</artifactId>
    4. </dependency>
    5. <dependency>
    6. <groupId>com.xiaoleilu</groupId>
    7. <artifactId>hutool-all</artifactId>
    8. <version>3.0.7</version>
    9. </dependency>
    10. <dependency>
    11. <groupId>io.swagger</groupId>
    12. <artifactId>swagger-annotations</artifactId>
    13. <version>${swagger-annotations.version}</version>
    14. </dependency>
    15. <dependency>
    16. <groupId>org.springframework.boot</groupId>
    17. <artifactId>spring-boot-starter-test</artifactId>
    18. <scope>test</scope>
    19. </dependency>
    20. <dependency>
    21. <groupId>org.springframework.boot</groupId>
    22. <artifactId>spring-boot-starter-amqp</artifactId>
    23. </dependency>
    24. <dependency>
    25. <groupId>org.projectlombok</groupId>
    26. <artifactId>lombok</artifactId>
    27. </dependency>
    28. <dependency>
    29. <groupId>org.springframework.boot</groupId>
    30. <artifactId>spring-boot-starter-web</artifactId>
    31. </dependency>
    32. <dependency>
    33. <groupId>com.alibaba</groupId>
    34. <artifactId>fastjson</artifactId>
    35. <version>1.2.73</version>
    36. <scope>compile</scope>
    37. </dependency>

    问题总结

    1.Invalid argument, ‘x-delayed-type’ must be an existing exchange type

    需要创建一个交换机

    2.Connection refused: no further information

    请检查配置 application.xml配置的rabbimq不生效,可以将配置放到application.properties

    3.Channel shutdown: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
    这种情况:

    1.消费者内部重复签收导致签收异常

    ​    解决方案:增加配置手动处理应答

            1.配置新增

    spring.rabbitmq.listener.simple.acknowledge-mode=manual #手动签收

            2.代码里: 增加channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

    1. public void consumer(String body, Message message, Channel channel) throws IOException {
    2. long msgTag = message.getMessageProperties().getDeliveryTag();
    3. try {
    4. System.out.println("收到消息:" + new Date());
    5. System.out.println("msgTag=" + msgTag);
    6. System.out.println("message=" + message);
    7. System.out.println("body=" + body);
    8. channel.basicAck(msgTag, false);
    9. }catch (Exception e) {
    10. log.error("【订单延迟关闭处理异常】 接收到消息为:" + msgTag + " ,消息异常消费 : ", e);
    11. } finally {
    12. // 处理完之后手动签收(这里再次签收)
    13. channel.basicAck(msgTag, false);
    14. }
    15. }

    2.已经是自动处理了,然后代码里还有手动处理channel.basicAck(msgTag, false)

    ​ 解决方案:去除channel.basicAck(msgTag, false)

    4.Failed to convert message

    消息发送和接收的方式不对 比如发送的是对象,则接收的也必须是对象,发送的是string ,接收的也必须是string

  • 相关阅读:
    无敌python复制粘贴——大某点评数据面试外包
    工作小计-GPU硬编以及依赖库 nvcuvid&nvidia-encode
    Puma560机器人运动学正逆解
    简单了解一下pinia的结构
    ARM Day2
    软件评测师之RISC/CISC
    HOOPS Visualize 2023 SP2 U1 Crack-HOOPS Visualize
    为什么OpenCV计算的帧率是错误的?
    java word转pdf,word模板
    苹果 M1带起ARM,英特尔 x86 霸主地位遭威胁
  • 原文地址:https://blog.csdn.net/2401_82767224/article/details/139489037