SpringBoot 3.1.0
JDK 17
安装MQ: liunx+docker+rabbitmq安装延迟队列插件
实现延迟队列的一种方式是在 RabbitMQ 中使用消息延迟插件,这个插件可以让你在消息发送时设置一个延迟时间,超过这个时间后消息才会被消费者接收到。下面是 SpringBoot 整合 RabbitMQ 实现延迟队列的简单步骤:
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
在 application.properties 配置文件中添加 RabbitMQ 的连接信息:
- spring.rabbitmq.host=127.0.0.1
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=test
- spring.rabbitmq.password=test
- spring.rabbitmq.virtual-host=/
- # 手动应答
- #spring.rabbitmq.listener.simple.acknowledge-mode=manual
- #每次从队列中取一个,轮询分发,默认是公平分发
- spring.rabbitmq.listener.simple.prefetch=1
- # 开启重试
- spring.rabbitmq.listener.simple.retry.enabled=true
- # 重试次数
- spring.rabbitmq.listener.simple.retry.max-attempts=5
- @Configuration
- public class RabbitMQOrderConfig {
-
- /**
- * 订单交换机
- */
- public static final String ORDER_EXCHANGE = "order_exchange";
- /**
- * 订单队列
- */
- public static final String ORDER_QUEUE = "order_queue";
- /**
- * 订单路由key
- */
- public static final String ORDER_QUEUE_ROUTING_KEY = "order.#";
-
- /**
- * 死信交换机
- */
- public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange";
- /**
- * 死信队列 routingKey
- */
- public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key";
-
- /**
- * 死信队列
- */
- public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue";
-
- /**
- * 延迟时间 (单位:ms(毫秒))
- */
- public static final Integer DELAY_TIME = 10000;
-
-
- /**
- * 创建死信交换机
- */
- @Bean("orderDeadLetterExchange")
- public Exchange orderDeadLetterExchange() {
- return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false);
- }
-
- /**
- * 创建死信队列
- */
- @Bean("orderDeadLetterQueue")
- public Queue orderDeadLetterQueue() {
- return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build();
- }
-
- /**
- * 绑定死信交换机和死信队列
- */
- @Bean("orderDeadLetterBinding")
- public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) {
- return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs();
- }
-
-
- /**
- * 创建订单交换机
- */
- @Bean("orderExchange")
- public Exchange orderExchange() {
- return new TopicExchange(ORDER_EXCHANGE, true, false);
- }
-
- /**
- * 创建订单队列
- */
- @Bean("orderQueue")
- public Queue orderQueue() {
- Map<String, Object> args = new HashMap<>(3);
- //消息过期后,进入到死信交换机
- args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE);
-
- //消息过期后,进入到死信交换机的路由key
- args.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY);
-
- //过期时间,单位毫秒
- args.put("x-message-ttl", DELAY_TIME);
-
- return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();
- }
-
- /**
- * 绑定订单交换机和队列
- */
- @Bean("orderBinding")
- public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) {
- return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs();
- }
- }
定义一个消息体类,用来存储需要发送的消息:
- @Slf4j
- @Data
- @Builder
- public class OrderMessage implements Serializable {
-
- /**
- * 商户订单号
- */
- private String orderId;
-
- /**
- * 支付宝订单号
- */
- private String tradeNo;
- }
定义一个 RabbitMQ 消息发送者类,用来发送消息到 RabbitMQ:
- @Slf4j
- @Component
- public class MessageSender {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- public void sendOrderMessage(OrderMessage message) {
- //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定
- rabbitTemplate.setMandatory(true);
- //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
- rabbitTemplate.setReturnsCallback(returned -> {
- int code = returned.getReplyCode();
- System.out.println("code=" + code);
- System.out.println("returned=" + returned);
- });
- rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", message);
-
- log.info("===============延时队列生产消息====================");
- log.info("发送时间:{},发送内容:{}, {}ms后执行", LocalDateTime.now(), message, RabbitMQConfig.DELAY_TIME);
- }
- }
定义一个 RabbitMQ 消息消费者类,用来接收并处理消息:
- @Component
- @Slf4j
- @RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)
- public class OrderMQListener {
-
- @RabbitHandler
- public void consumer(OrderMessage orderMessage, Message message, Channel channel) throws IOException {
- log.info("收到消息:{}",new Date());
- log.info("msgTag:{}", message.getMessageProperties().getDeliveryTag());
- log.info("message:{}", message);
- log.info("content:{}", orderMessage);
- }
- }
这里使用了 @RabbitListener 注解来将一个方法标记为一个 RabbitMQ 消息监听器,通过设置 queues 属性来指定监听的队列名称。
- @Slf4j
- @Api(tags = "延迟消息接口")
- @RestController
- @RequestMapping("/rabbitmq_order_delay_message")
- public class RabbitMQDelayMessageController {
-
- @Autowired
- private MessageSender sender;
-
- /**
- * 发送消息
- * @return
- */
- @RequestMapping(value = "/sendMsg", method = RequestMethod.GET)
- @ResponseBody
- public void sendMsg() {
- OrderMessage orderMessage = OrderMessage.builder().orderId(UUID.randomUUID().toString()).tradeNo(UUID.randomUUID().toString()).build();
- sender.sendOrderMessage(orderMessage);
- }
- }
启动项目,请求运行结果:

总的xml:
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.xiaoleilu</groupId>
- <artifactId>hutool-all</artifactId>
- <version>3.0.7</version>
- </dependency>
-
- <dependency>
- <groupId>io.swagger</groupId>
- <artifactId>swagger-annotations</artifactId>
- <version>${swagger-annotations.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.73</version>
- <scope>compile</scope>
- </dependency>
需要创建一个交换机

请检查配置 application.xml配置的rabbimq不生效,可以将配置放到application.properties
3.Channel shutdown: channel error; protocol method: #method
这种情况:
1.消费者内部重复签收导致签收异常
解决方案:增加配置手动处理应答
1.配置新增
spring.rabbitmq.listener.simple.acknowledge-mode=manual #手动签收
2.代码里: 增加channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- public void consumer(String body, Message message, Channel channel) throws IOException {
- long msgTag = message.getMessageProperties().getDeliveryTag();
- try {
- System.out.println("收到消息:" + new Date());
- System.out.println("msgTag=" + msgTag);
- System.out.println("message=" + message);
- System.out.println("body=" + body);
- channel.basicAck(msgTag, false);
- }catch (Exception e) {
- log.error("【订单延迟关闭处理异常】 接收到消息为:" + msgTag + " ,消息异常消费 : ", e);
- } finally {
- // 处理完之后手动签收(这里再次签收)
- channel.basicAck(msgTag, false);
- }
- }
2.已经是自动处理了,然后代码里还有手动处理channel.basicAck(msgTag, false)
解决方案:去除channel.basicAck(msgTag, false)
消息发送和接收的方式不对 比如发送的是对象,则接收的也必须是对象,发送的是string ,接收的也必须是string

