• rabbit MQ的延迟队列处理模型示例(基于SpringBoot死信模式)


    在这里插入图片描述

    说明:
    生产者P 往交换机X(type=direct)会发送两种消息:一、routingKey=XA的消息(消息存活周期10s),被队列QA队列绑定入列;一、routingKey=XB的消息(消息存活周期40s),被队列Q B队列绑定入列。QA、QB两个队列消息在失活(变成死信消息)以routingKey=YD发送到交换机Y(type=direct)。队列QD用routingKey绑定交换机Y消息入列。消费者监听处理QD的消息。
    这个设计模型达到了消息从生产者到消费者延迟10s、40s不等的延迟队列处理。

    这里用SpringBoot maven:
            
                org.springframework.boot
                spring-boot-starter-amqp
           
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在封装工具类中 其中【交换机】【队列】【绑定器】 可直接使用工具类,这里对案例图所用到组件器声明注解出来。
    在这里插入图片描述

    框内的组件和关系 可以在SpringBoot配置类中做出如下的组件声明与关系绑定:

    package com.esint.configs;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * TTL延迟队列配置文件类
     *
     */
    @Configuration
    public class TtlQueueConfig {
        //
        //普通交换机的名称 X
        public static final String X_EXCHANGE = "X";
    
        //死信交换机名称 Y
        public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    
        //普通队列QA QB
        public static final String QUEUE_A = "QA";
        public static final String QUEUE_B = "QB";
        //死信队列名称QD
        public static final String DEAD_LETTER_QUEUE = "QD";
        //
        //声明X_EXCHANGE
        @Bean("xExchange")
        public DirectExchange xExchange(){
            return new DirectExchange(X_EXCHANGE);
        }
    
        //声明死信交换Y_DEAD_LETTER_EXCHANGE
        @Bean("yExchange")
        public DirectExchange yExchange(){
            return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
        }
        //声明队列 QA
        @Bean("queueA")
        public Queue queueA(){
            Map<String, Object> arguments = new HashMap<>(3);
            //设置死信交换机
            arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
            //设置死信RoutingKey (死信后充当了消费者的发送路由)
            arguments.put("x-dead-letter-routing-key","YD");
            //消息过期时间
            arguments.put("x-message-ttl",10000);
    
            return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    
        }
    
        //声明队列 QB
        @Bean("queueB")
        public Queue queueB(){
            Map<String, Object> arguments = new HashMap<>(3);
            //设置死信交换机
            arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
            //设置死信RoutingKey (死信后充当了消费者的发送路由)
            arguments.put("x-dead-letter-routing-key","YD");
            //消息过期时间
            arguments.put("x-message-ttl",40000);
    
            return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    
        }
        //声明死信队列QD
        @Bean("queueD")
        public Queue queueD(){
            return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
        }
        //捆绑
        //绑定队列QA与交换机X_EXCHANGE
        @Bean
        public Binding queueABingXExchange(@Qualifier("queueA") Queue queueA,
                                          @Qualifier("xExchange") DirectExchange xExchange){
            return BindingBuilder.bind(queueA).to(xExchange).with("XA");
        }
        //绑定队列QB与交换机X_EXCHANGE
        @Bean
        public Binding queueBBingXExchange(@Qualifier("queueB") Queue queueB,
                                          @Qualifier("xExchange") DirectExchange xExchange){
            return BindingBuilder.bind(queueB).to(xExchange).with("XB");
        }
    
        //绑定队列QD与交换机Y_Exchange
        @Bean
        public Binding queueDBingYExchange(@Qualifier("queueD") Queue queueD,
                                           @Qualifier("yExchange")DirectExchange yExchange){
            return BindingBuilder.bind(queueD).to(yExchange).with("YD");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    生产者与交换机X:这里方便测试 我们把生产者放在一个Controller逻辑里
    package com.esint.controller;
    
    //发送延迟消息
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.Date;
    
    @Slf4j
    @RestController
    @RequestMapping("/ttl")
    public class SendMesController {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("/senMsg/{message}")
        public void sendMes(@PathVariable String message){
            log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date().toString(),message);
    
            rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列:"+message);
            rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:"+message);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    消费者与死信队列创建一个监听者示例:
    package com.esint.consumer;
    
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    /**
     * 队列TTL消费者
     */
    
    @Slf4j
    @Component
    public class DeadLetterQueueConsumer {
        //接受消息
        @RabbitListener(queues = "QD")
        public void receiveD(Message message, Channel channel) throws Exception{
    
            String msg = new String(message.getBody());
            log.info("当前时间:{},收到私信队列的消息:{}",new Date().toString(),msg);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    rabbitmq的配置文件:

    spring:
      rabbitmq:
        host: *.*.*.*
        port: 5672
        username: guest
        password: guest
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    接下来可以启动SpringBoot: 启动后,配置方法类会把交换机/队列/绑定器初始化配置

    队列:
    在这里插入图片描述

    交换机:
    在这里插入图片描述
    点开详细后,也能考到他们之间的绑定关系:

    在这里插入图片描述

    在这里插入图片描述

    消息发布测试:

    生产者发送消息:

    浏览器:
    http://127.0.0.1:19092/ttl/senMsg/nice
    
    • 1
    • 2

    通过生产者发送:nice

    当前时间:Tue Nov 21 14:50:05 CST 2023,发送一条消息给两个TTL队列:nice
    
    • 1

    消费者在10s后和40秒分别收到了消息:
    在这里插入图片描述

  • 相关阅读:
    36、Flink 的 WindowAssigner之滑动窗口示例
    python经典百题之矩阵对角线之和
    应用层协议 ——— HTTP协议
    Worthington脱氧核糖核酸及相关研究工具
    【数据结构初阶】二叉树
    NPDP产品经理证书是什么行业的证书?
    大二Web课程设计——张家界旅游网站设计与实现(HTML+CSS+JavaScript)
    一文理解UDS安全访问服务(0x27)
    微信小程序
    输出所有最长公共子序列
  • 原文地址:https://blog.csdn.net/qq_17040587/article/details/134525771