• RabbitMQ延时队列


    延时队列场景

    比如未付款订单,超过一定时间后,系统自动取消订单并释放占有物品。

    常用解决方案

    spring的schedule定时任务轮询数据库

    缺点:消耗系统内存,增加了数据库的压力、存在较大时间误差

    解决:rabbitmq的消息TTL和死信Exchange结合

    消息的TTL(Time To Live)

    消息的TTL就是消息的存活时间

    RabbitMQ可以对队列和消息分别设置TTL

    延时队列实现-1 

    延时队列实现-2 

     使用延时队列-1,给整个消息队列设置过期时间 

    RabbitMQ延时队列实现-1

    1、声明一个队列,并且指定TTL

    2、要给队列设置超时时间,需要在声明队列时配置x-message-ttl属性

    3、声明交换机,将ttl与交换机绑定

    注意,这个队列设定了死信交换机为dl.ttl.direct 

    1. package com.hdb.pingmoweb.order.mq;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.core.*;
    4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    5. import org.springframework.beans.factory.annotation.Autowired;
    6. import org.springframework.context.annotation.Bean;
    7. import org.springframework.context.annotation.Configuration;
    8. import javax.annotation.PostConstruct;
    9. @Slf4j
    10. @Configuration
    11. public class MyRabbitConfig {
    12. @Autowired
    13. RabbitTemplate rabbitTemplate;
    14. @PostConstruct
    15. public void initRabbitTemplate(){
    16. rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
    17. // 投递失败,记录日志
    18. log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
    19. replyCode, replyText, exchange, routingKey, message.toString());
    20. // 如果有业务需要,可以重发消息
    21. });
    22. }
    23. @Bean
    24. public Queue ttlQueue(){
    25. return QueueBuilder.durable("ttl.queue")
    26. .withArgument("x-message-ttl",60000)
    27. .withArgument("x-dead-letter-exchange","dl.ttl.direct")
    28. // 指定队列名称,并持久化
    29. //.ttl(10000) // 设置队列的超时时间,10秒
    30. //.deadLetterExchange("dl.ttl.direct") // 指定死信交换机
    31. .build();
    32. }
    33. @Bean
    34. public DirectExchange ttlExchange(){
    35. return new DirectExchange("ttl.direct");
    36. }
    37. @Bean
    38. public Binding ttlBinding(){
    39. return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
    40. }
    41. }

    4、接收超时死信的死信交换机

    在consumer服务的SpringRabbitListener中,定义一个新的消费者,并且声明 死信交换机、死信队列

    1. package com.hdb.pingmoweb.order.mq;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.rabbit.annotation.Exchange;
    4. import org.springframework.amqp.rabbit.annotation.Queue;
    5. import org.springframework.amqp.rabbit.annotation.QueueBinding;
    6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    7. import org.springframework.stereotype.Component;
    8. @Slf4j
    9. @Component
    10. public class SpringRabbitListener {
    11. @RabbitListener(bindings = @QueueBinding(
    12. value = @Queue(name = "dl.ttl.queue", durable = "true"),
    13. exchange = @Exchange(name = "dl.ttl.direct"),
    14. key = "ttl"
    15. ))
    16. public void listenDlQueue(String msg) {
    17. log.info("接收到 dl.ttl.queue的延迟消息:{}", msg);
    18. }
    19. }

    5、发送消息,但是不要指定TTL

    1. @Autowired
    2. RabbitTemplate rabbitTemplate;
    3. @RequestMapping("/ttl/{msg}")
    4. public R testTTLMsg(@PathVariable String msg){
    5. // 消息ID,需要封装到CorrelationData中
    6. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    7. // 发送消息
    8. rabbitTemplate.convertAndSend("ttl.direct", "ttl", msg, correlationData);
    9. // 记录日志
    10. log.debug("发送消息成功");
    11. return R.ok();
    12. }

  • 相关阅读:
    i++ 和 ++i的真正区别
    Qt地铁智慧换乘系统浅学(四 )实现添加线路,添加站点,添加边 并且存储到本地txt文件
    前端与HTML
    js的流程控制
    一些流程图(自用)
    PIES源码,大型体检中心源码,医院智慧体检系统源码
    软件测试项目实战(包含电商、银行、app等)
    7-用户输入和while循环
    RNN基本原理及代码实战
    2022-11-07 Excel的函数使用
  • 原文地址:https://blog.csdn.net/qq_29385297/article/details/127587479