• Spring Boot使用rabbitMq(二)进阶篇


    根据上篇文件:spring boot使用rabbitMq(一)基础篇
    已经可以初步使用mq了,接下来就是交换机和消息队列绑定
    1.进入mq监控台
    进入exchanges (交换机)
    在这里插入图片描述
    在交换机中,一共可分为四类交换机 :Direct、Fanout、Topic、Headers。

    准备 创建MqQueueConfig

    创建三个消息队列

    package com.kofan.server.rabbitMq.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MqQueueConfig {
        @Bean
        public Queue userQueue() {
            /**
             * name:    队列名称
             * durable: 是否持久化
             * exclusive: 是否私有化。如果设置为true,则只有创建者可以使用此队列。
             * autoDelete: 是否自动删除。临时队列,当最后一个消费者断开连接后,会自动删除。
             * */
            return new Queue("userQueue", true, false, false);
        }
    
        @Bean
        public Queue queue() {
            return new Queue("queue");
        }
        @Bean
        public Queue queue1() {
            return new Queue("queue1");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    1. Direct交换机

    Direct为直连交换机 机制为1对1,消费者为多个且为同一个queue,则会顺序发送。先发送给A,当出现另一条信息则发送B,再出现发A
    具体实现:

    创建DirectConfig文件

    package com.kofan.server.rabbitMq.config.exchange;
    
    import com.kofan.server.rabbitMq.config.MqQueueConfig;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import javax.annotation.Resource;
    
    /**
     * @author king
     * 直连交换机 1对1
     */
    @Configuration
    public class DirectConfig {
    	// 引入消息队列
        @Resource
        MqQueueConfig mqQueueConfig;
        /**
         * 直连交换机 1对1,接收为多个时且为同一个queue,则会顺序发送。先发送给A,当出现另一条信息则发送B,再出现发A
         * @return
         */
        @Bean
        public DirectExchange directExchange() {
            //Direct交换机
            return new DirectExchange("userExchange", true, false);
        }
    
        /**
         * 绑定交换机 1-1
         * @return
         */
        @Bean
        public Binding bindDirect() {
            //链式写法,绑定交换机和队列,并设置匹配键
            return BindingBuilder
                    //绑定队列
                    .bind(mqQueueConfig.userQueue())
                    //到交换机
                    .to(directExchange())
                    // 设置匹配键指向Queue
                    .withQueueName();
            // .with("userRouting");设置匹配键
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    发送信息

    package com.kofan.server.rabbitMq.component;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MySender {
    	@Autowired
        private AmqpTemplate rabbitTemplate;
        /**
         * 发送信息
         */
        public void send() {
            String context = "hi, i'm Direct";
            // 参数分别为:交换机,指定的消息队列,消息
                this.rabbitTemplate.convertAndSend("userExchange","userQueue", context);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    接收信息
    这里我们使用queuesToDeclare =@Queue(“userQueue”),代表如果不存在该消息队列则创建该消息队列,发送方分离后,不会因为无该消息队列而无法启动

    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MyReceiver {
    	@RabbitHandler
        @RabbitListener(queuesToDeclare =@Queue("userQueue"))
        public void process1(Object user) {
            System.out.println("我接收信息了userQueue");
            System.out.println("Receiver  : " + user);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    2. Fanout交换机

    广播交换机, 发送到交换机的消息都会被转发到与该交换机绑定的所有消息队列上
    具体实现:

    package com.kofan.server.rabbitMq.config.exchange;
    
    import com.kofan.server.rabbitMq.config.MqQueueConfig;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.beans.factory.config.BeanPostProcessor;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import javax.annotation.Resource;
    
    @Configuration
    public class FanoutConfig implements BeanPostProcessor {
    	// 引入消息队列
        @Resource
        MqQueueConfig mqQueueConfig;
        /**
         * 广播交换机 一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
         * @return
         */
        @Bean
        public FanoutExchange fanoutExchange() {
            //fanout交换机
            return new FanoutExchange("fanoutExchange", true, false);
        }
    
        /**
         * 绑定交换机 多-1
         * @return
         */
        @Bean
        public Binding bindFanoutA() {
            //链式写法,绑定交换机和队列,并设置匹配键
            return BindingBuilder
                    //绑定队列
                    .bind(mqQueueConfig.queue())
                    //到交换机
                    .to(fanoutExchange());
        }
        /**
         * 绑定交换机 多-1
         * @return
         */
        @Bean
        public Binding bindFanoutB() {
            //链式写法,绑定交换机和队列,并设置匹配键
            return BindingBuilder
                    //绑定队列
                    .bind(mqQueueConfig.queue1())
                    //到交换机
                    .to(fanoutExchange());
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    发送信息

    package com.kofan.server.rabbitMq.component;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MySender {
    	@Autowired
        private AmqpTemplate rabbitTemplate;
        /**
         * 广播信息
         */
        public void sendFanout() {
            String context = "hi, tom " + new Date();
            /**
             * s : Exchange
             * s1 : routingKey 关键值
             * o: 消息
             */
            rabbitTemplate.convertAndSend("fanoutExchange", "", context);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    接收信息

    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MyReceiver {
    	@RabbitHandler
        @RabbitListener(queuesToDeclare =@Queue("queue"))
        public void process2(Object messages) {
            System.out.println("我接收信息了queue");
            System.out.println("Receiver  : " + messages);
        }
        @RabbitHandler
        @RabbitListener(queuesToDeclare =@Queue("queue1"))
        public void process2(Object messages) {
            System.out.println("我接收信息了queue1");
            System.out.println("Receiver  : " + messages);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    3. Topic交换机

    条件分发交换机, 。通配符有两种:“*” 、 “#”
    符号 *:有且只匹配一个词。比如 a.*可以匹配到"a.b"、“a.c”,但是匹配不了"a.b.c"。
    符号 #:匹配一个或多个词。比如"d.#“既可以匹配到"d.b”、“d.c”,也可以匹配到"d.b.c"。
    具体实现:

    package com.kofan.server.rabbitMq.config.exchange;
    
    import com.kofan.server.rabbitMq.config.MqQueueConfig;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.beans.factory.config.BeanPostProcessor;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import javax.annotation.Resource;
    
    /**
     * @author king
     *
     * 。通配符有两种:"*" 、 "#"
     * * 符号:有且只匹配一个词。比如 a.*可以匹配到"a.b"、"a.c",但是匹配不了"a.b.c"。
     *
     * # 符号:匹配一个或多个词。比如"d.#"既可以匹配到"d.b"、"d.c",也可以匹配到"d.b.c"。
     */
    @Configuration
    public class TopicConfig implements BeanPostProcessor {
        @Resource
        MqQueueConfig mqQueueConfig;
        /**
         * 条件分发交换机
         * @return
         */
        @Bean
        public TopicExchange topicExchange() {
            //配置TopicExchange交换机
            return new TopicExchange("topicExchange", true, false);
        }
    
        @Bean
        public Binding bindTopicA() {
            //队列A绑定到FanoutExchange交换机
            return BindingBuilder.bind(mqQueueConfig.queue1())
                    .to(topicExchange())
                    .with("a.*");
        }
    
        @Bean
        public Binding bindTopicB() {
            //队列A绑定到FanoutExchange交换机
            return BindingBuilder.bind(mqQueueConfig.queue())
                    .to(topicExchange())
                    .with("a.*");
        }
    
        @Bean
        public Binding bindTopicC() {
            //队列A绑定到FanoutExchange交换机
            return BindingBuilder.bind(mqQueueConfig.userQueue())
                    .to(topicExchange())
                    .with("d.#");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    发送信息

    package com.kofan.server.rabbitMq.component;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MySender {
    	@Autowired
        private AmqpTemplate rabbitTemplate;
        /**
         * 条件分发
         */
        public void sendTopic() {
            String context = "hi, tom this is  TopicConfig" + new Date();
            /**
             * s : Exchange
             * s1 : routingKey 关键值
             * o: 消息
             */
            rabbitTemplate.convertAndSend("topicExchange", "d.s.sas.sas", context);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    接收信息

    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MyReceiver {
    	@RabbitHandler
        @RabbitListener(queuesToDeclare =@Queue("queue"))
        public void process2(Object messages) {
            System.out.println("我接收信息了queue");
            System.out.println("Receiver  : " + messages);
        }
        @RabbitHandler
        @RabbitListener(queuesToDeclare =@Queue("queue1"))
        public void process2(Object messages) {
            System.out.println("我接收信息了queue1");
            System.out.println("Receiver  : " + messages);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    4. Headers交换机

    匹配请求头中所带的键值进行路由,分为全匹配和部分匹配
    具体实现:

    package com.kofan.server.rabbitMq.config.exchange;
    
    import com.kofan.server.rabbitMq.config.MqQueueConfig;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.HeadersExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import javax.annotation.Resource;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @author king
     * 匹配请求头中所带的键值进行路由
     */
    @Configuration
    public class HeadersConfig {
        @Resource
        MqQueueConfig mqQueueConfig;
    
        @Bean
        public HeadersExchange headersExchange() {
            return new HeadersExchange("headersExchange", true, false);
        }
    	// 可配置多个
        @Bean
        public Binding bindHeadersA() {
            Map<String, Object> map = new HashMap<>();
            map.put("key_one", "java");
            map.put("key_two", "rabbit");
            //全匹配
            return BindingBuilder.bind(mqQueueConfig.queue())
                    .to(headersExchange())
                    .whereAll(map)
                    .match();
        }
    	// 可配置多个
        @Bean
        public Binding bindHeadersB() {
            Map<String, Object> map = new HashMap<>();
            map.put("key_three", "coke");
            map.put("headers_fore", "sky");
            //部分匹配
            return BindingBuilder.bind(mqQueueConfig.queue1())
                    .to(headersExchange())
                    .whereAny(map)
                    .match();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    发送信息

    package com.kofan.server.rabbitMq.component;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageDeliveryMode;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    
    @Component
    public class MySender {
    	@Autowired
        private AmqpTemplate rabbitTemplate;
         public void sendHeaders() {
            String context = "hi, tom this is  TopicConfig" + new Date();
            Map<String, Object> map = new HashMap<>();
    //        map.put("key_one", "java");
    //        map.put("key_two", "rabbit");
            map.put("key_three", "coke");
    //        map.put("headers_fore", "sky");
            MessageProperties messageProperties = new MessageProperties();
            //消息持久化
            messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            messageProperties.setContentType("UTF-8");
            //添加消息
            messageProperties.getHeaders().putAll(map);
            Message message = new Message(context.getBytes(), messageProperties);
    
            /**
             * s : Exchange
             * s1 : routingKey 关键值
             * o: 消息
             */
            rabbitTemplate.convertAndSend("headersExchange", "", message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    接收信息

    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MyReceiver {
    	@RabbitHandler
        @RabbitListener(queuesToDeclare =@Queue("queue"))
        public void process2(Object messages) {
            System.out.println("我接收信息了queue");
            System.out.println("Receiver  : " + messages);
        }
        @RabbitHandler
        @RabbitListener(queuesToDeclare =@Queue("queue1"))
        public void process2(Object messages) {
            System.out.println("我接收信息了queue1");
            System.out.println("Receiver  : " + messages);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
  • 相关阅读:
    [激光原理与应用-39]:《光电检测技术-6》- 光干涉的原理与基础
    μC/OS-II---整理学习1
    第 3 章:GO 的接口和抽象 拓展篇 - CRUD 接口实现示例
    IDEA07:Mybatis和Springboot操作数据库
    C# 中的特性
    重新组织我的知识库
    SpringBoot整合Websocket,实现作为客户端接收消息的同时作为服务端向下游客户发送消息
    任务和特权级保护
    clickhouse中SummingMergeTree
    Java如何实现统计在线人数的功能?
  • 原文地址:https://blog.csdn.net/gd898989/article/details/125561168