• 使用RabbitMq实现延时队列


    原理:rabbitmq的消息TTL和死信Exchange

    消息的TTL就是消息的存活时间。

    RabbitMQ可以对队列和消息分别设置TTL。

    对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。

    如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的

    TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x- message-ttl属性来设置时间,两者是一样的效果。

    一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列, 一个路由可以对应很多队列。

    一个消息被Consumer拒收了,并且reject方法的参数里requeuefalse。也就是说不会被再次放在队列里,被其他消费者使用basic.reject/ basic.nackrequeue=false。

    上面的消息的TTL到了,消息过期了。

    队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

    Dead Letter Exchange其实就是一种普通的exchange和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有 消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

    既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列。

    案例:

    1. import org.springframework.amqp.core.Binding;
    2. import org.springframework.amqp.core.Exchange;
    3. import org.springframework.amqp.core.Queue;
    4. import org.springframework.amqp.core.TopicExchange;
    5. import org.springframework.context.annotation.Bean;
    6. import org.springframework.context.annotation.Configuration;
    7. import java.util.HashMap;
    8. @Configuration
    9. public class MyMQConfig {
    10. //死信队列
    11. @Bean
    12. public Queue orderDelayQueue(){
    13. HashMap arguments = new HashMap<>();
    14. /**
    15. * x-dead-letter-exchange: order-event-exchange
    16. * x-dead-letter-routing-key: order.release.order
    17. * x-message-ttl: 60000
    18. */
    19. arguments.put("x-dead-letter-exchange", "order-event-exchange");
    20. arguments.put("x-dead-letter-routing-key", "order.release.order");
    21. arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
    22. /**
    23. * String name, 队列名字
    24. * boolean durable, 是否持久化
    25. * boolean exclusive, 是否排他
    26. * boolean autoDelete, 是否自动删除
    27. * Map arguments) 属性
    28. */
    29. //String name, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
    30. return new Queue("order.delay.queue", true, false, false,arguments);
    31. }
    32. //普通队列
    33. @Bean
    34. public Queue orderReleaseOrderQueue(){
    35. return new Queue("order.release.order.queue", true, false, false);
    36. }
    37. @Bean
    38. public Exchange orderEventExchange(){
    39. //String name, boolean durable, boolean autoDelete
    40. return new TopicExchange("order-event-exchange", true, false);
    41. }
    42. @Bean
    43. public Binding orderCreateOrderBinding(){
    44. //String destination, Binding.DestinationType destinationType,
    45. //String exchange, String routingKey, Map arguments
    46. return new Binding("order.delay.queue",
    47. Binding.DestinationType.QUEUE,
    48. "order-event-exchange",
    49. "order.create.order",
    50. null);
    51. }
    52. @Bean
    53. public Binding orderReleaseOrderBinding(){
    54. return new Binding("order.release.order.queue",
    55. Binding.DestinationType.QUEUE,
    56. "order-event-exchange",
    57. "order.release.order",
    58. null);
    59. }
    60. }

     使用注入bean的方式能自动生成没有的队列相关内容(注意,一旦生成需要手动删除才能重新生成,无法覆盖),启动项目后进入rabbitmq配置页可以看到:

     

     

     监听队列:

    1. @RabbitListener(queues = "order.release.order.queue")
    2. public void listener(OrderEntity orderEntity, Channel channel, Message message){
    3. System.out.println("收到过期的订单,准备释放订单"+orderEntity.getOrderSn());
    4. try {
    5. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    6. } catch (IOException e) {
    7. e.printStackTrace();
    8. }
    9. }

    模拟生成订单:

    1. @Autowired
    2. private RabbitTemplate rabbitTemplate;
    3. @ResponseBody
    4. @GetMapping(value = "/test/createOrder")
    5. public String createOrderTest() {
    6. //订单下单成功
    7. OrderEntity orderEntity = new OrderEntity();
    8. orderEntity.setOrderSn(UUID.randomUUID().toString());
    9. orderEntity.setModifyTime(new Date());
    10. //给MQ发送消息
    11. rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderEntity);
    12. return "ok";
    13. }

    再看队列:

     等待一分钟看控制台:

     再返回查看队列,发现消息已经不见了

     

  • 相关阅读:
    【Java牛客刷题】入门篇(01)
    C. Make Good
    阿里云天池大赛赛题(机器学习)——天猫用户重复购买预测(完整代码)
    使用.NET7和C#11打造最快的序列化程序-以MemoryPack为例
    延时 启动exe程序
    Android studio的Bumblebee版本Gradle7.2降到Gradle4.2.0详细步骤记录(项目已上传)
    实验一 将调试集成到vscode
    NSS [HNCTF 2022 WEEK2]ez_SSTI
    upload-labs/Pass-07 未知后缀名解析漏洞复现
    关于solidity解析abi方法,入参和结果字节码
  • 原文地址:https://blog.csdn.net/xushuai2333333/article/details/127412628