• 【无标题】


    整合SpringBoot

    依赖引入及配置

    • pox.xml
            
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-amqpartifactId>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 配置文件
    # RabbitMQ config
    --- # RabbitMQ 配置
    spring:
      rabbitmq:
        host: 124.222.127.157
        port: 5672
        username: test
        password: guest
        virtual-host: /
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    MQ五种模式

    简单模式 一对一消费

    一对一消费,只有一个消费者能接收到

    定义消费者
    /**
     * 基础队列 -- 消费者 一对一消费,只有一个消费者能接收到
     * */
    @Component
    @Slf4j
    public class HolloWordListener {
        // @RabbitListener(queues = ("simple.queue")) // queues需手动先创建队列
        @RabbitListener(queuesToDeclare = @Queue("simple.queue"))  // queuesToDeclare 自动声明队列
        public void helloWordListener(String message) {
            log.info("监听到mq消息,消费者进行消费消息");
            log.info("message = " + message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    定义生产者
    
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
     
        /**
         * RabbitMQ 功能测试 简单模式
         *
         * 
         *     一个生产者,一个消费者
         * 
    */
    @Log(title = "RabbitMQ功能测试", businessType = BusinessType.OTHER) @PostMapping("/simple") @ResponseBody public void testSimpleQueue() { String queueName = "simple.queue"; // 队列名称 String message = "heel,simple.queue"; // 要发送的消息 JSONObject jsonObject = new JSONObject(); jsonObject.set("orderCode", 20220901401L); jsonObject.set("finishDate", new Date()); jsonObject.set("workId", 111111L); log.info("向MQ队列:{}中写入消息:{}", queueName, jsonObject.toString()); rabbitTemplate.convertAndSend(queueName, jsonObject.toString()); }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    测试示例
    • postman调用

    • 控制台输出
      在这里插入图片描述
    向MQ队列:simple.queue中写入消息:{"workId":111111,"orderCode":20220901401,"finishDate":1663122121292}
    监听到mq消息,消费者进行消费消息
    message = {"workId":111111,"orderCode":20220901401,"finishDate":1663122121292}
    
    • 1
    • 2
    • 3

    Work queues工作队列(一对多)

    多个消费者,你一个我一个分配消费消息,有预取机制,默认公平消费,可配置能者多劳模式,谁完成的快,谁多做一点

    定义消费者
    
    /**
     *
     * 2、Work queues工作队列
     * 多个消费者,你一个我一个分配消费消息,有预取机制,默认公平消费,可配置能者多劳模式,谁完成的快,谁多做一点
     *
     *
     * */
    @Component
    public class WoekWordListener {
    
        @RabbitListener(queuesToDeclare = @Queue("workQueue")) // queuesToDeclare 自动声明队列
        public void holloWordListener(String message) throws InterruptedException {
            Thread.sleep(200);
            System.out.println("message1 = " + message);
        }
    
        @RabbitListener(queuesToDeclare = @Queue("workQueue")) // queuesToDeclare 自动声明队列
        public void holloWordListener1(String message) throws InterruptedException {
            Thread.sleep(400);
            System.out.println("message2 = " + message);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    定义生产者
    
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
        
        /**
         * RabbitMQ 功能测试 WorkQueue模式
         *
         * 
         *     一个生产者,多个消费者
         * 
    */
    @Log(title = "RabbitMQ功能测试 WorkQueue模式", businessType = BusinessType.OTHER) @PostMapping("/work") @ResponseBody public void testWorkQueue() { String queueName = "workQueue"; String message = "hello,work.queue__"; for (int i = 0; i < 10; i++) { log.info("向MQ队列:{}中写入消息:{}", queueName, message + i); rabbitTemplate.convertAndSend(queueName, message + i); log.info("i = " + i); } }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    测试示例
    • postman调用

    • 控制台输出

    1.1生产者生产消息

     向MQ队列:workQueue中写入消息:hello,work.queue__0
     向MQ队列:workQueue中写入消息:hello,work.queue__1
     向MQ队列:workQueue中写入消息:hello,work.queue__2
     向MQ队列:workQueue中写入消息:hello,work.queue__3
     向MQ队列:workQueue中写入消息:hello,work.queue__4
     向MQ队列:workQueue中写入消息:hello,work.queue__5
     向MQ队列:workQueue中写入消息:hello,work.queue__6
     向MQ队列:workQueue中写入消息:hello,work.queue__7
     向MQ队列:workQueue中写入消息:hello,work.queue__8
     向MQ队列:workQueue中写入消息:hello,work.queue__9
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    1.2消费者消费消息

    监听到MQ消息,work1消费消息 message1 = hello,work.queue__0
    监听到MQ消息,work2消费消息 message2 = hello,work.queue__1
    监听到MQ消息,work1消费消息 message1 = hello,work.queue__2
    监听到MQ消息,work1消费消息 message1 = hello,work.queue__4
    监听到MQ消息,work2消费消息 message2 = hello,work.queue__3
    监听到MQ消息,work1消费消息 message1 = hello,work.queue__6
    监听到MQ消息,work1消费消息 message1 = hello,work.queue__8
    监听到MQ消息,work2消费消息 message2 = hello,work.queue__5
    监听到MQ消息,work2消费消息 message2 = hello,work.queue__7
    监听到MQ消息,work2消费消息 message2 = hello,work.queue__9
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    Publish/Subscribe发布订阅模型

    发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。

    实现方式是加入了exchange(交换机),
    注意:交换机是不缓存消息的

    定义消费者
    // 消费者直接绑定交换机,指定类型为fanout
    @Component
    @Slf4j
    public class FanoutExchangeListener {
        // 不指定队列,消息过了就没了
        //  @RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "fanoutTest",type = ExchangeTypes.FANOUT))})
    
        /** 指定队列,可以接收缓存到队列里的消息 */
        @RabbitListener(bindings = {@QueueBinding(value = @Queue(value ="test",durable = "true" ),exchange = @Exchange(value = "fanoutTest",type = ExchangeTypes.FANOUT))})
        public void reveivel(String message){
            log.info("message1 = " + message);
        }
    
        @RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "fanoutTest",type = ExchangeTypes.FANOUT))})
        public void reveivel2(String message){
            log.info("message2 = " + message);
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    定义生产者
        /**
         * RabbitMQ 功能测试 发布-订阅模式 Publish/发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者
         * 
         *  发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。
         *  实现方式是加入了exchange(交换机),注意:交换机是不缓存消息的
         *
         * 
    */
    @Log(title = "RabbitMQ功能测试 发布-订阅模式", businessType = BusinessType.OTHER) @PostMapping("/publish") @ResponseBody public void testPublishQueue() { String exchangeName = "fanoutTest"; String routingKey = ""; String message = "hello,work.queue__"; log.info("向MQ队列 交换机名称:{} 中写入消息:{}", exchangeName, message); // 参数1:交换机名称 , 参数2routingKey,(fanout类型可不写) , 参数3,消息内容 rabbitTemplate.convertAndSend(exchangeName, routingKey, message); }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    测试示例
    • postman调用
    http://127.0.0.1:9091/app/mq/publish
    
    • 1
    • 控制台输出

    Routing路由模型

    routing模型也是将消息发送到交换机

    使用的是Direct类型的交换机,会将接收到的消息根据规则路由到指定的Queue(队列),因此称为路由模式

    定义消费者
    
    /** 消费者直接绑定交换机,指定类型为direct,并指定key表示能消费的key */ 
    @Component
    public class RoutingExchangeListener {
    
        // 不指定队列,消息过了就没了
        //  @RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "direstTest",type = ExchangeTypes.DIRECT),key = {"info","error"})})
    
        // 指定队列,可以接收缓存到队列里的消息
        // key = {"info","error"} 表示我能接收到routingKey为 info和error的消息
        @RabbitListener(bindings = {@QueueBinding(value = @Queue(value ="test1",durable = "true" ),exchange = @Exchange(value = "direstTest",type = ExchangeTypes.DIRECT),key = {"info","error"})})
        public void receivel1(String message){
            System.out.println("message1 = " + message);
        }
        // key = {"error"} 表示我只能接收到routingKey为 error的消息
        @RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "direstTest",type = ExchangeTypes.DIRECT),key = {"error"})})
        public void receivel2(String message){
            System.out.println("message2 = " + message);
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    定义生产者
    
    	@Autowired
    	private RabbitTemplate rabbitTemplate;
    	
        /**
         * RabbitMQ 测试 路由模式 routing
         *
         * 
         *   routing模型也是将消息发送到交换机
         *   使用的是Direct类型的交换机,会将接收到的消息根据规则路由到指定的Queue(队列),因此称为路由模式
         * 
    */
    @Log(title = "RabbitMQ功能测试 路由模式", businessType = BusinessType.OTHER) @PostMapping("/routing") @ResponseBody public void testRoutingQueue() { String exchangeName = "direstTest"; String routingInfoKey = "info"; String routingErrorKey = "error"; String messageInfo = "发送info的key的路由消息"; String messageError = "发送error的key的路由消息"; log.info("向MQ队列 routing模式 交换机名称:{} 中写入消息:{}", exchangeName, messageInfo); rabbitTemplate.convertAndSend(exchangeName, routingInfoKey, messageInfo); log.info("向MQ队列 routing模式 交换机名称:{} 中写入消息:{}", exchangeName, messageError); rabbitTemplate.convertAndSend(exchangeName, routingErrorKey, messageError); }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    测试示例
    • postman调用
    http://127.0.0.1:9091/app/mq/routing
    
    • 1
    • 控制台输出

    Topic 模式

    topicExchange与directExchange类型,区别在于routingKey必须是多个单词的列表,并且以 . 分隔

    *(代表通配符,任意一个字段)
    #(号代表一个或多个字段)

    定义消费者
    
        /**
         * RabbitMQ 测试 Topic模式
         */
        @Log(title = "RabbitMQ功能测试 Topic模式", businessType = BusinessType.OTHER)
        @PostMapping("/topic")
        @ResponseBody
        public void testTopicQueue() {
            String exchangeName = "topicList";
            String message1 = "topic路由消息,use.save";
            String message2 = "topic路由消息,order.select.getone";
    
            String routingKey1 = "user.save";
            String routingKey2 = "order.select.getone";
    
            log.info("向MQ队列 Topic模式 交换机名称:{} 中写入消息:{}", exchangeName, message1);
            rabbitTemplate.convertAndSend(exchangeName, routingKey1, message1);
            log.info("向MQ队列 Topic模式 交换机名称:{} 中写入消息:{}", exchangeName, message2);
            rabbitTemplate.convertAndSend(exchangeName, routingKey2, message2);
        }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    定义生产者
    
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Component
    @Slf4j
    public class TopicsExchangeListener {
    
        // 不指定队列,消息过了就没了
        //  @RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(name = "topicList",type = ExchangeTypes.TOPIC),key = {"user.save","user.*"})})
    
        // 指定队列,可以接收缓存到队列里的消息
        // key = {"user.save","user.*"} 表示能消费 routingkey为  user.save 和 user.任意一个字符  的消息
        @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "test2", durable = "true"), exchange = @Exchange(name = "topicList", type = ExchangeTypes.TOPIC), key = {"user.save", "user.*"})})
        public void recevicel1(String message) {
            log.info("message1 = " + message);
        }
    
        // key = {"order.#","user.*"} 表示能消费 routingkey为  order.一个或多个字符   和  user.任意一个字符  的消息
        @RabbitListener(bindings = {@QueueBinding(value = @Queue, exchange = @Exchange(name = "topicList", type = ExchangeTypes.TOPIC), key = {"order.#", "user.*"})})
        public void recevicel2(String message) {
            log.info("message2 = " + message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    测试示例
    • postman调用
    http://127.0.0.1:9091/app/mq/topic
    
    • 1
    • 控制台输出
    • 生产者日志

      消费者消费日志

    参考博文

    rabbitmq详解
    rabbitmq笔记(看完即入门)
    消息队列作用(解耦、异步、削峰)图详解

  • 相关阅读:
    竞赛选题 深度学习人脸表情识别算法 - opencv python 机器视觉
    基于模型驱动的深度学习高光谱图像融合研究_孙杨霖
    Windows端使用命令启动Natapp
    【java学习—十五】创建多线程的两种方式(2)
    Linux 虚拟机内挂载 iso 文件
    智能文档图像处理技术:解决大数据时代文档图像处理难题
    sqoop的安装和使用
    ElasticSearch (ES)学习之路(二)Win10安装ES,可视化界面,Kibanna
    js——arguments的使用
    Recommended Azure Monitors
  • 原文地址:https://blog.csdn.net/m0_37903882/article/details/126846840