• RabbitMQ-死信队列、延迟队列(原生+springboot+插件实现)


    目录

    一、死信队列

    1.1 概念

    1.2 来源

    1.3 演示

    二、延迟队列

    2.1 TTL-消息最大存活时间

    2.2 在SpringBoot中演示延迟队列与死信队列

    2.2.1 基本演示

    2.2.2 优化-动态设置TTL

    2.2.3 使用插件实现延迟队列

    2.3 总结


    一、死信队列

    1.1 概念

           死信顾名思义就是无法被消费的消息,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列

            应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如用户在商城下单成功并点击去支付后在指定时间未支付自动失效

    1.2 来源

            1.消息 TTL 过期

            2.队列达到最大长度(队列满了,无法再添加数据到 mq 中)

            3.消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

    1.3 演示

    (1)消息TTL过期

            Producer

    1. class Producer implements Callable {
    2. private static final String NORMAL_EXCHANGE = "normal_exchange";
    3. @Override
    4. public Object call() throws Exception {
    5. Channel channel = rabbitMQUtils.getChannel();
    6. //声明交换机
    7. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
    8. //设置消息的 TTL 时间为10s
    9. AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
    10. //该信息是用作演示队列个数限制
    11. for (int i = 1; i <11 ; i++) {
    12. String message="info"+i;
    13. channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties,
    14. message.getBytes());
    15. System.out.println("生产者发送消息:"+message);
    16. }
    17. return null;
    18. }
    19. }

            C1:

    1. class C1 implements Callable {
    2. //普通交换机名称
    3. private static final String NORMAL_EXCHANGE = "normal_exchange";
    4. //死信交换机名称
    5. private static final String DEAD_EXCHANGE = "dead_exchange";
    6. @Override
    7. public Object call() throws Exception {
    8. Channel channel = rabbitMQUtils.getChannel();
    9. //声明死信和普通交换机 类型为 direct
    10. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
    11. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
    12. //声明死信队列
    13. channel.queueDeclare("dead-queue", false, false, false, null);
    14. //死信队列绑定死信交换机与 routingkey
    15. channel.queueBind("dead-queue", DEAD_EXCHANGE, "lisi");
    16. //正常队列绑定死信队列信息
    17. Map params = new HashMap<>();
    18. //正常队列设置死信交换机 参数 key 是固定值
    19. params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
    20. //正常队列设置死信 routing-key 参数 key 是固定值
    21. params.put("x-dead-letter-routing-key", "lisi");
    22. //正常队列绑定正常交换机
    23. channel.queueDeclare("normal-queue", false, false, false, params);
    24. channel.queueBind("normal-queue", NORMAL_EXCHANGE, "zhangsan");
    25. System.out.println("等待接收消息.....");
    26. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    27. String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
    28. System.out.println("Consumer01 接收到消息"+message);
    29. };
    30. Thread.sleep(1000000);
    31. channel.basicConsume("normal-queue", true, deliverCallback, consumerTag -> {
    32. });
    33. return null;
    34. }
    35. }

             C2:

    1. class C2 implements Callable{
    2. private static final String DEAD_EXCHANGE = "dead_exchange";
    3. @Override
    4. public Object call() throws Exception {
    5. Channel channel = rabbitMQUtils.getChannel();
    6. //声明死信交换机
    7. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
    8. //声明死信队列
    9. channel.queueDeclare("dead-queue", false, false, false, null);
    10. //绑定死信交换机
    11. channel.queueBind("dead-queue", DEAD_EXCHANGE, "lisi");
    12. System.out.println("等待接收死信队列消息.....");
    13. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    14. String message = new String(delivery.getBody(), "UTF-8");
    15. System.out.println("Consumer02 接收死信队列的消息" + message);
    16. };
    17. channel.basicConsume("dead-queue", true, deliverCallback, consumerTag -> {
    18. });
    19. return null;
    20. }
    21. }

            测试:

    1. public static void main(String[] args) throws InterruptedException {
    2. ExecutorService service= Executors.newFixedThreadPool(10);
    3. service.submit(new C1());
    4. service.submit(new C2() {
    5. });
    6. Thread.sleep(1000);
    7. service.submit(new Producer());
    8. }
    9. //结果:
    10. 等待接收死信队列消息.....
    11. 等待接收消息.....
    12. 生产者发送消息:info1
    13. 生产者发送消息:info2
    14. 生产者发送消息:info3
    15. 生产者发送消息:info4
    16. 生产者发送消息:info5
    17. 生产者发送消息:info6
    18. 生产者发送消息:info7
    19. 生产者发送消息:info8
    20. 生产者发送消息:info9
    21. 生产者发送消息:info10
    22. Consumer02 接收死信队列的消息info1
    23. Consumer02 接收死信队列的消息info2
    24. Consumer02 接收死信队列的消息info3
    25. Consumer02 接收死信队列的消息info4
    26. Consumer02 接收死信队列的消息info5
    27. Consumer02 接收死信队列的消息info6
    28. Consumer02 接收死信队列的消息info7
    29. Consumer02 接收死信队列的消息info8
    30. Consumer02 接收死信队列的消息info9
    31. Consumer02 接收死信队列的消息info10

            解读Producer给C1发送的消息生存时间是10s,而C1的代码中我让这个线程延时100s,相当于把TTL耗完了。这些过期的消息都经过死信交换机处理,最终送到了C2手中。 

    (2)队列达到最大长度

            需要修改两处:

                    1.Producer中删除TTL的配置,修改后如下

    1. class Producer implements Callable {
    2. private static final String NORMAL_EXCHANGE = "normal_exchange";
    3. @Override
    4. public Object call() throws Exception {
    5. Channel channel = rabbitMQUtils.getChannel();
    6. //声明交换机
    7. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
    8. //该信息是用作演示队列个数限制
    9. for (int i = 1; i <11 ; i++) {
    10. String message="info"+i;
    11. channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
    12. System.out.println("生产者发送消息:"+message);
    13. }
    14. return null;
    15. }
    16. }

            2.C1中添加新的参数

    1. //设置队列长度限制为6
    2. params.put("x-max-length",6);

             其他代码不变,删掉之前的Queue,再次测试:

    1. //结果:
    2. 等待接收死信队列消息.....
    3. 等待接收消息.....
    4. 生产者发送消息:info1
    5. 生产者发送消息:info2
    6. 生产者发送消息:info3
    7. 生产者发送消息:info4
    8. 生产者发送消息:info5
    9. 生产者发送消息:info6
    10. 生产者发送消息:info7
    11. 生产者发送消息:info8
    12. 生产者发送消息:info9
    13. 生产者发送消息:info10
    14. Consumer02 接收死信队列的消息info1
    15. Consumer02 接收死信队列的消息info2
    16. Consumer02 接收死信队列的消息info3
    17. Consumer02 接收死信队列的消息info4

            可见,C1队列满之后,多余的消息都被C2接收了 

    (3)消息被拒绝

            Producer代码不变,只修改C1的代码,将某条消息拒绝掉:

    1. class Consumer01 implements Callable {
    2. //普通交换机名称
    3. private static final String NORMAL_EXCHANGE = "normal_exchange";
    4. //死信交换机名称
    5. private static final String DEAD_EXCHANGE = "dead_exchange";
    6. @Override
    7. public Object call() throws Exception {
    8. Channel channel = rabbitMQUtils.getChannel();
    9. //声明死信和普通交换机 类型为 direct
    10. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
    11. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
    12. //声明死信队列
    13. channel.queueDeclare("dead-queue", false, false, false, null);
    14. //死信队列绑定死信交换机与 routingkey
    15. channel.queueBind("dead-queue", DEAD_EXCHANGE, "lisi");
    16. //正常队列绑定死信队列信息
    17. Map params = new HashMap<>();
    18. //正常队列设置死信交换机 参数 key 是固定值
    19. params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
    20. //正常队列设置死信 routing-key 参数 key 是固定值
    21. params.put("x-dead-letter-routing-key", "lisi");
    22. //设置队列长度限制为6
    23. params.put("x-max-length",6);
    24. //正常队列绑定正常交换机
    25. channel.queueDeclare("normal-queue", false, false, false, params);
    26. channel.queueBind("normal-queue", NORMAL_EXCHANGE, "zhangsan");
    27. System.out.println("等待接收消息.....");
    28. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    29. if(message.equals("info5")){
    30. System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
    31. //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
    32. channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
    33. }else {
    34. System.out.println("Consumer01 接收到消息"+message);
    35. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    36. }
    37. };
    38. Thread.sleep(1000000);
    39. channel.basicConsume("normal-queue", true, deliverCallback, consumerTag -> {
    40. });
    41. return null;
    42. }
    43. }

            :因为C1的线程我不知道怎么关闭,所以没办法完全演示C1挂掉的情况,流程其实就是info5被C1拒绝,且C1挂了,那么info5将被死信交换机处理,最终送到C2手中。

            结果:

    1. 等待接收死信队列消息.....
    2. 等待接收消息.....
    3. 生产者发送消息:info1
    4. 生产者发送消息:info2
    5. 生产者发送消息:info3
    6. 生产者发送消息:info4
    7. 生产者发送消息:info5
    8. 生产者发送消息:info6
    9. 生产者发送消息:info7
    10. 生产者发送消息:info8
    11. 生产者发送消息:info9
    12. 生产者发送消息:info10
    13. Consumer01 接收到消息info1
    14. Consumer01 接收到消息info2
    15. Consumer01 接收到消息info3
    16. Consumer01 接收到消息info4
    17. Consumer01 接收到消息info5并拒绝签收该消息
    18. Consumer01 接收到消息info6
    19. Consumer01 接收到消息info7
    20. Consumer01 接收到消息info8
    21. Consumer01 接收到消息info9
    22. Consumer01 接收到消息info10
    23. Consumer02 接收到死信队列的消息info5

    二、延迟队列

            延时队列的内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

            适用场景:

            1.订单在十分钟之内未支付则自动取消

            2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。

            3.用户注册成功后,如果三天内没有登陆则进行短信提醒。

            4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。

            5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

            对于这些场景,我们其实可以采用轮询的方式定时查询。但如果数据量、并发量巨大,对于数据库来说将有很大的压力,导致性能低下。 

    2.1 TTL-消息最大存活时间

            TTL 是 RabbitMQ 中一个消息或者队列属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这 条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的 TTL,那么较小的值将会被使用。

            有两种方式设置 TTL:

            1.消息设置

    1. ... ...
    2. channel.basicPublish(exchangeName,routingKey,mandatory,
    3. new AMQP.BasicProperties().builder().expiration("6000").build(),msg.getBytes());
    4. ... ...

            2.队列设置

    1. ... ...
    2. Map args = new HashMap();
    3. args.put("x-message-ttl",6000);//延时6秒
    4. channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
    5. ... ...

            两种方式的对比:

            对于队列设置,消息一旦过期会被立刻抹掉,因为队列中已过期的消息肯定在队列头部,RabbitMQ只要定期从队头开始扫描是否有过期消息即可。

            对于消息设置,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期时在即将投递到消费者之前判定的,每条消息的过期时间不同,如果要删除所有过期消息,势必要扫描整个队列,但RabbitMQ不会这样做。

    2.2 在SpringBoot中演示延迟队列与死信队列

    2.2.1 基本演示

            1.引入依赖

    1. <dependency>
    2. <groupId>org.springframework.bootgroupId>
    3. <artifactId>spring-boot-starter-amqpartifactId>
    4. dependency>
    5. <dependency>
    6. <groupId>org.springframework.bootgroupId>
    7. <artifactId>spring-boot-starter-webartifactId>
    8. dependency>
    9. <dependency>
    10. <groupId>org.springframework.bootgroupId>
    11. <artifactId>spring-boot-starter-testartifactId>
    12. <scope>testscope>
    13. dependency>
    14. <dependency>
    15. <groupId>org.projectlombokgroupId>
    16. <artifactId>lombokartifactId>
    17. dependency>
    18. <dependency>
    19. <groupId>io.springfoxgroupId>
    20. <artifactId>springfox-swagger2artifactId>
    21. <version>2.9.2version>
    22. dependency>
    23. <dependency>
    24. <groupId>io.springfoxgroupId>
    25. <artifactId>springfox-swagger-uiartifactId>
    26. <version>2.9.2version>
    27. dependency>
    28. <dependency>
    29. <groupId>org.springframework.amqpgroupId>
    30. <artifactId>spring-rabbit-testartifactId>
    31. <scope>testscope>
    32. dependency>

            2.配置application.yml

    1. spring:
    2. rabbitmq:
    3. host: 192.168.80.128
    4. port: 5672
    5. username: admin
    6. password: 123

            如图,创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交 换机 Y,它们的类型都是 direct,创建一个死信队列 QD 及绑定关系.

             3.配置类

    1. @Configuration
    2. public class TTLQueueConfig {
    3. public static final String QUEUE_A = "QA";
    4. public static final String QUEUE_B = "QB";
    5. public static final String DEAD_LETTER_QUEUE = "QD";
    6. public static final String X_EXCHANGE = "X";
    7. public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    8. //声明交换机X
    9. @Bean("xExchange")
    10. public DirectExchange xExchange(){
    11. return new DirectExchange(X_EXCHANGE);
    12. }
    13. //声明交换机Y
    14. @Bean("yExchange")
    15. public DirectExchange yExchange(){
    16. return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    17. }
    18. //声明队列A并绑定到交换机Y
    19. @Bean("QA")
    20. public Queue QA(){
    21. Map args=new HashMap<>();
    22. args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//声明A绑定的死信交换机
    23. args.put("x-dead-letter-routing-key","YD");//声明队列的死信路由key
    24. args.put("x-message-ttl",10000);//声明TTL-10s
    25. return QueueBuilder.durable(QUEUE_A).withArguments(args).build();//声明队列A
    26. }
    27. //A绑定到交换机X
    28. @Bean
    29. public Binding ABindX(@Qualifier("QA") Queue QA,@Qualifier("xExchange") DirectExchange xExchange){
    30. return BindingBuilder.bind(QA).to(xExchange).with("XA");
    31. }
    32. //声明队列B并绑定到交换机Y
    33. @Bean("QB")
    34. public Queue QB(){
    35. Map args=new HashMap<>();
    36. args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//声明B绑定的死信交换机
    37. args.put("x-dead-letter-routing-key","YD");//声明队列的死信路由key
    38. args.put("x-message-ttl",40000);//声明TTL-40s
    39. return QueueBuilder.durable(QUEUE_B).withArguments(args).build();//声明队列B
    40. }
    41. //B绑定到交换机X
    42. @Bean
    43. public Binding BBindX(@Qualifier("QB") Queue QB,@Qualifier("xExchange") DirectExchange xExchange){
    44. return BindingBuilder.bind(QB).to(xExchange).with("XB");
    45. }
    46. //声明死信队列D
    47. @Bean("QD")
    48. public Queue QD(){
    49. return new Queue(DEAD_LETTER_QUEUE);
    50. }
    51. //D绑定到死信交换机Y
    52. @Bean
    53. public Binding DBindY(@Qualifier("QD") Queue QD,
    54. @Qualifier("yExchange") DirectExchange yExchange){
    55. return BindingBuilder.bind(QD).to(yExchange).with("YD");
    56. }
    57. }

            4.生产者和消费者

    1. @RestController
    2. public class ProducerController {
    3. @Autowired
    4. private RabbitTemplate rabbitTemplate;
    5. @GetMapping("/ttl/{msg}")
    6. public String sendMsg(@PathVariable String msg){
    7. System.out.println(new Date()+":发送一条信息给两个 TTL 队列:"+ msg);
    8. rabbitTemplate.convertAndSend("X","XA","来自ttl为10s的队列:"+msg);
    9. rabbitTemplate.convertAndSend("X","XB","来自ttl为40s的队列:"+msg);
    10. return "发送成功!";
    11. }
    12. }
    1. @Component
    2. public class Consumer {
    3. @RabbitListener(queues = "QD")
    4. public void receive(Message message, Channel channel){
    5. System.out.println(new Date()+":收到死信队列的信息:"+ new String(message.getBody()));
    6. }
    7. }

            5.测试

    1. //发送请求:
    2. http://localhost:8080/ttl/我爱新世纪百货
    3. //控制台结果:
    4. Sun Aug 14 09:41:40 CST 2022:发送一条信息给两个 TTL 队列:我爱新世纪百货
    5. Sun Aug 14 09:41:50 CST 2022:收到死信队列的信息:来自ttl为10s的队列:我爱新世纪百货
    6. Sun Aug 14 09:42:21 CST 2022:收到死信队列的信息:来自ttl为40s的队列:我爱新世纪百货

            第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息, 然后被消费掉,这样一个延时队列就打造完成了。

    2.2.2 优化-动态设置TTL

            对于上面的案例,如果这样使用的话,每增加一个新的时间需求,就要新增一个队列。所以接下来对代码进行优化。接下来新增一个队列 QC,绑定关系如下,该队列不设置 TTL 时间

             优化后的配置类:

    1. @Configuration
    2. public class TTLQueueConfigOptimized {
    3. public static final String X_EXCHANGE = "X";
    4. public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    5. public static final String QUEUE_C = "QC";
    6. public static final String QUEUE_A = "QA";
    7. public static final String QUEUE_B = "QB";
    8. public static final String DEAD_LETTER_QUEUE = "QD";
    9. //声明交换机X
    10. @Bean("xExchange")
    11. public DirectExchange xExchange(){
    12. return new DirectExchange(X_EXCHANGE);
    13. }
    14. //声明交换机Y
    15. @Bean("yExchange")
    16. public DirectExchange yExchange(){
    17. return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    18. }
    19. //声明队列A并绑定到交换机Y
    20. @Bean("QA")
    21. public Queue QA(){
    22. Map args=new HashMap<>();
    23. args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//声明A绑定的死信交换机
    24. args.put("x-dead-letter-routing-key","YD");//声明队列的死信路由key
    25. args.put("x-message-ttl",10000);//声明TTL-10s
    26. return QueueBuilder.durable(QUEUE_A).withArguments(args).build();//声明队列A
    27. }
    28. //A绑定到交换机X
    29. @Bean
    30. public Binding ABindX(@Qualifier("QA") Queue QA,@Qualifier("xExchange") DirectExchange xExchange){
    31. return BindingBuilder.bind(QA).to(xExchange).with("XA");
    32. }
    33. //声明队列B并绑定到交换机Y
    34. @Bean("QB")
    35. public Queue QB(){
    36. Map args=new HashMap<>();
    37. args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//声明B绑定的死信交换机
    38. args.put("x-dead-letter-routing-key","YD");//声明队列的死信路由key
    39. args.put("x-message-ttl",40000);//声明TTL-40s
    40. return QueueBuilder.durable(QUEUE_B).withArguments(args).build();//声明队列B
    41. }
    42. //B绑定到交换机X
    43. @Bean
    44. public Binding BBindX(@Qualifier("QB") Queue QB,@Qualifier("xExchange") DirectExchange xExchange){
    45. return BindingBuilder.bind(QB).to(xExchange).with("XB");
    46. }
    47. //声明死信队列D
    48. @Bean("QD")
    49. public Queue QD(){
    50. return new Queue(DEAD_LETTER_QUEUE);
    51. }
    52. //D绑定到死信交换机Y
    53. @Bean
    54. public Binding DBindY(@Qualifier("QD") Queue QD,
    55. @Qualifier("yExchange") DirectExchange yExchange){
    56. return BindingBuilder.bind(QD).to(yExchange).with("YD");
    57. }
    58. //声明队列C并绑定死信交换机
    59. @Bean("QC")
    60. public Queue QC(){
    61. Map args = new HashMap<>(3);
    62. //声明当前队列绑定的死信交换机
    63. args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
    64. //声明当前队列的死信路由 key
    65. args.put("x-dead-letter-routing-key", "YD");
    66. //没有声明 TTL 属性
    67. return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
    68. }
    69. //声明队列C绑定X交换机
    70. @Bean
    71. public Binding CBindingX(@Qualifier("QC") Queue queueC,
    72. @Qualifier("xExchange") DirectExchange xExchange){
    73. return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    74. }
    75. }

            优化后的生产者类(消费者类不变):

    1. @RestController
    2. public class ProducerController {
    3. @Autowired
    4. private RabbitTemplate rabbitTemplate;
    5. @GetMapping("/ttl/{msg}/{TTL}")
    6. public String sendMsg(@PathVariable String msg,@PathVariable String TTL){
    7. System.out.println(new Date()+":发送3条信息给 TTL 队列:"+ msg);
    8. rabbitTemplate.convertAndSend("X","XC","来自ttl为"+TTL+"ms的队列:"+msg,correlationData->{
    9. correlationData.getMessageProperties().setExpiration(TTL);
    10. return correlationData;
    11. });
    12. rabbitTemplate.convertAndSend("X","XA","来自ttl为10000ms的队列:"+msg);
    13. rabbitTemplate.convertAndSend("X","XB","来自ttl为40000ms的队列:"+msg);
    14. return "发送成功!";
    15. }
    16. }

            测试:

    1. //在浏览器中输入地址:
    2. http://localhost:8080/ttl/我爱新世纪百货/2000
    3. //结果:
    4. Sun Aug 14 10:04:20 CST 2022:发送3条信息给 TTL 队列:我爱新世纪百货
    5. Sun Aug 14 10:04:22 CST 2022:收到死信队列的信息:来自ttl为2000ms的队列:我爱新世纪百货
    6. Sun Aug 14 10:04:30 CST 2022:收到死信队列的信息:来自ttl为10000ms的队列:我爱新世纪百货
    7. Sun Aug 14 10:05:00 CST 2022:收到死信队列的信息:来自ttl为40000ms的队列:我爱新世纪百货

    2.2.3 使用插件实现延迟队列

            前面提到,在消息设置中设置TTL时会存在一些问题。比如第一个消息延时时长很长,而第二个消息延时时长很短,但第二个消息不会优先执行。想要解决这样的问题要使用到RabbitMQ的插件。

            1.首先去下载 rabbitmq_delayed_message_exchange 插件,后缀名是 .ez 。注意版本号要与安装的rabbitMQ一致!

            2.复制到 /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins (插件目录),若权限不足,使用下面命令修改文件夹权限:

    1. chmod -R 777 xx/
    2. //xx表示当前所处文件夹下的子文件夹名字,该命令同时修改所有子文件夹的权限

            3.在插件目录下使用管理员权限安装插件

    1. su
    2. rabbitmq-plugins enable rabbitmq_delayed_message_exchange

            接下来对下图的结构作实现:

            1.配置类

    1. @Configuration
    2. public class DelayedQueueConfig {
    3. public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    4. public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    5. public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
    6. //声明延迟队列
    7. @Bean("Q")
    8. public Queue delayedQueue() {
    9. return new Queue(DELAYED_QUEUE_NAME);
    10. }
    11. //声明自定义交换机
    12. @Bean("E")
    13. public CustomExchange delayedExchange() {
    14. Map args = new HashMap<>();
    15. //自定义交换机的类型
    16. args.put("x-delayed-type", "direct");
    17. return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    18. }
    19. //绑定routingKey
    20. @Bean
    21. public Binding bindingDelayedQueue(@Qualifier("Q") Queue queue,
    22. @Qualifier("E") CustomExchange delayedExchange) {
    23. return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    24. }
    25. }

            2.生产者

    1. @RestController
    2. public class Producer {
    3. public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    4. public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
    5. @Autowired
    6. RabbitTemplate rabbitTemplate;
    7. @GetMapping("delayMsg/{message}/{delayTime}")
    8. public String sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
    9. rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,
    10. correlationData ->{
    11. correlationData.getMessageProperties().setDelay(delayTime);
    12. return correlationData;
    13. });
    14. System.out.println(new Date()+":发送信息给队列:"+ message);
    15. return "发送成功!";
    16. }
    17. }

            3.消费者

    1. @Component
    2. public class Consumer {
    3. public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    4. @RabbitListener(queues = DELAYED_QUEUE_NAME)
    5. public void receiveDelayedQueue(Message message){
    6. System.out.println(new Date()+":收到延迟队列的信息:"+ new String(message.getBody()));
    7. }
    8. }

            4.测试:

    1. //浏览器输入地址:
    2. http://localhost:8080/delayMsg/我爱新世纪百货/2000
    3. //结果:
    4. Sun Aug 14 11:02:48 CST 2022:发送信息给队列:我爱新世纪百货
    5. Sun Aug 14 11:02:50 CST 2022:收到延迟队列的信息:我爱新世纪百货

    2.3 总结

            延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送消息可靠投递死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

            当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景

  • 相关阅读:
    HOC的运用
    面试10分钟就完事了,问的实在是太...
    flutter在导航栏处实现对两个列表的点击事件
    G:\r\tcga_example-master\scripts 生存分析 tcgaexample jimmy 存活分析 单条线生存分析
    以深圳为例Python一键生成核酸检测日历
    不就是Java吗 之 接口
    网络基本结构及数据传输方式
    【黑马程序员】SpringCloud——Eureka
    PTA 6-1 删除字符串中所有*
    性能测试jmeter连接数据库jdbc(sql server举例)
  • 原文地址:https://blog.csdn.net/weixin_62427168/article/details/126324120