• RabbitMQ------交换机(fanout(扇出)、direct(直接)、topic(主题))(五)


    RabbitMQ------交换机(五)

    交换机是rabbitMq的一个重要组成部分,生产者先将消息发送到交换机,再由交换机路由到不同的队列当中。
    之前都没有指定交换机。
    传一个空串,默认会走AMQP default默认交换机。

    channel.basicPublish("",QUEUE_NAME,MessageProperties.PRESISTENT_TEXT_PLAN,message.getBytes("UTF-8"));
    
    • 1

    一个消息只能被消费一次,为了满足一个消息被不同消费者,各消费一次的需求,可以将同一个消息指定交换机,有交换机路由(绑定)到不同的队列。
    将消息传递给多个消费者(发布/订阅模式)
    在这里插入图片描述

    Exchanges

    概念

    RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。通常生产者并不知道消息传递到哪些队列中。
    相反,生产者只能将消息发送到交换机,交换机一方面接收来自生产者的消息,一方面将它们推入队列。
    应该将消息放到特定队列中,还是放到许多队列中,还是应该丢弃,由交换机的类型决定。

    类型

    有以下类型:
    直接(direct)、主题(topic)、标题(headers)、扇出(fanout)
    扇出就是发布订阅类型。
    标题已不常用。
    直接类型也叫路由类型。

    无名exchange

    默认交换机,传递空字符串就是无名类型交换机。即默认类型。
    消息能够路由发送到队列中,其实是有routingKey(bindingkey)绑定key指定的,如果它存在的话。

    临时队列

    队列的名称至关重要,需要指定消费者去消费哪个队列的消息。
    临时队列是指,没有持久化的队列,可以在控制台的Features中D标识,有D就是持久化。
    创建临时队列方式。

    String queue = channel.queueDeclare().getQueue();
    
    • 1

    临时队列再管理界面中的Features中,标记为AD Exd。

    绑定(Bindings)

    binding其实是exchange和queue之间的桥梁,它告诉exchange和那个队列进行绑定关系,通过绑定关系,交换机将消息路由给指定队列。

    Fanout交换机

    Fanout扇出,对应的是发布订阅模式。它是将接受到的所有消息广播到它知道的所有队列中(类似QQ群中,一个人发消息所有人都能够看到)。
    生产者:
    如果先启动消费者,就不需要再声明logs的交换机,声明交换机代码可以注释掉。
    此处生产者、和消费者指定的routingkey策略都是空串。

    public class ProducerLog {
        public static  final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtiles.getChannel();
            //声明交换机,可注释
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
    
            Scanner scanner = new Scanner(System.in);
    
            while (scanner.hasNext()){
                String next = scanner.next();
                channel.basicPublish(EXCHANGE_NAME,"",null,next.getBytes("UTF-8"));
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    消费者01
    消费者01和消费者02代码一致,因此只写一种即可。

    /**
     * 扇出 fanout 发布订阅
     * 消费者01
     */
    public class ConsumerLogs01 {
        public static  final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtiles.getChannel();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
            //声明一个队列 临时队列
            /**
             * 生成一个临时队列,队列的名称是随机的
             * 当消费者断开与队列的连接的时候,队列就自动删除
             */
            String queue = channel.queueDeclare().getQueue();
            /**
             * 交换机与堆绑定
             * 队列名
             * 交换机名
             * 绑定策略
             */
            channel.queueBind(queue,EXCHANGE_NAME,"");
            //消息消费的时候如何处理消息
            DeliverCallback deliverCallback = (consumerTag, message)->{
                //消息有消息头、消息体
                System.out.println("消费者01接收到的消息"+new String(message.getBody(),"UTF-8"));
            };
            //取消消息时的回调
            CancelCallback cancelCallback = consumerTag->{
                System.out.println("消息消费被终断");
            };
            channel.basicConsume(queue,true,deliverCallback,cancelCallback);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    结果:生产者发送消息,消费者1和消费者2都能够接收到。

    Direct交换机

    Direct直接交换机,直接交换机和扇出交换机不同的是,两个队列的绑定策略不同。如果交换机类型为direcet,但是绑定的多个队列的routingkey是一致的,那么效果和扇出fanout交换机就类似了。
    生产者:

    public class ProducerDirectLog {
        public static  final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtiles.getChannel();
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                String next = scanner.next();
                //指定routingkey,相当于指定发送到的交换机
                channel.basicPublish(EXCHANGE_NAME,"info",null,next.getBytes("UTF-8"));
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    消费者01

    public class ConsumerDirectLogs01 {
        public static  final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtiles.getChannel();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            //声明一个队列 
            channel.queueDeclare("console",false,false,false,null);
            /**
             * 交换机与堆绑定
             * 队列名
             * 交换机名
             * 绑定策略
             */
            channel.queueBind("console",EXCHANGE_NAME,"info");
            //多重绑定
            channel.queueBind("console",EXCHANGE_NAME,"warning");
            //消息消费的时候如何处理消息
            DeliverCallback deliverCallback = (consumerTag, message)->{
                //消息有消息头、消息体
                System.out.println("消费者01接收到的消息"+new String(message.getBody(),"UTF-8"));
            };
            //取消消息时的回调
            CancelCallback cancelCallback = consumerTag->{
                System.out.println("消息消费被终断");
            };
            channel.basicConsume("console",true,deliverCallback,cancelCallback);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    消费者02与消费者01类似,只有队列名和绑定策略进行了改变。

    channel.queueBind("disk",EXCHANGE_NAME,"info");
    
    • 1

    结论:这种情况下,只有与生产者发送时routingkey一致的队列,才能够接收到消息。

    Topic交换机

    扇出交换机和直接交换机有个弊端,当指定路由策略时,无法使多个rountingkey策略不一样的队列接收到同一个消息,扇出交换机的队列的rountingkey策略为空串,其实还是一样的策略。
    主题交换机的rountingkey不能随意写,有书写规范,它必须是一个单词(无论这个单词是否正确,是否有意义)列表,以点好分开。比如“aa.sda.klj”,或者“skd.d”。
    有两个需要注意:
    (星号)可以代替一个单词
    #(井号)可以代替零个或多个单词
    此时routingkey策略就可以进行兼容,比如生产者执行的rountingkey为:aa.sd。而消费者1队列的routingkey
    .sd,消费者2队列的routingkey为aa.#或者aa.星号,此时消费者1和消费者2,都能够接收到生产者发送的消息
    在这里插入图片描述
    如图所示,交换机名:topic。
    队列que01,的绑定模式:*.orange.*
    对列que02,的绑定模式有两种:*.*.rabbit、lazy.#
    各个绑定模式的意思

             /**
             * Q1--绑定的是  
             *           中间带orange,带三个单词的字符串(*.orange.*)
             * Q2--绑定的是
             *           最后一个单词是rabbit的三个单词(*.*.rabbit)
             *           第一个单词是lazy的多个单词
             */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    接收情况:

    routingKey              
    quick.orange.rabbit  被队列Q1、Q2接收到
    lazy.orange.elephant 被队列Q1、Q2接收到
    quick.orange.fox  被队列Q1接收到
    lazy.brown.fox 被队列Q2接收到
    lazy.pink.rabbit 虽然满足两个绑定,但只被队列Q2接收一次
    quick.brown.fox  不匹配任何绑定,不会被任何队列接收到,会被丢弃
    qucik.orange.male.rabbit 是四个单词不匹配任何绑定,会被丢弃
    lazy.orange.male.rabbit 是四个单词但匹配Q2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    示例代码:
    生产者,发送消息,并指定路由策略
    消费者,消费对应路由策略的消息。
    生产者:
    推送消息到指定交换机,并且指定路由策略以及推送的消息

    /**
     * 声明主题交换机,以及相关队列
     * 生产者
     */
    public class Producer {
    
       //交换机名称
        public static  final String EXCHANGE_NAME = "topic_logs";
    
        //接收消息
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtiles.getChannel();
            /**
             * Q1--绑定的是
             *           中间带orange,带三个单词的字符串(*.orange.*)
             * Q2--绑定的是
             *           最后一个单词是rabbit的三个单词("*.*.rabbit)
             *           第一个单词是lazy的多个单词(lazy.#)
             */
            Map bindMap = new HashMap<>();
            bindMap.put("quick.orange.rabbit","被队列Q1、Q2接收到");
            bindMap.put("lazy.orange.elephant","被队列Q1、Q2接收到");
            bindMap.put("quick.orange.fox","被队列Q1接收到");
            bindMap.put("lazy.brown.fox","被队列Q2接收到");
            bindMap.put("lazy.pink.rabbit","虽然满足两个绑定,但只被队列Q2接收一次");
            bindMap.put("quick.brown.fox","不匹配任何绑定,不会被任何队列接收到,会被丢弃");
            bindMap.put("qucik.orange.male.rabbit","是四个单词不匹配任何绑定,会被丢弃");
            bindMap.put("lazy.orange.male.rabbit","是四个单词但匹配Q2");
            Set> entries = bindMap.entrySet();
    
            for(Map.Entry entry :bindMap.entrySet()){
                String routingkey = entry.getKey();
                String message = entry.getValue();
                channel.basicPublish(EXCHANGE_NAME,routingkey,null,message.getBytes("UTF-8"));
                System.out.println("生产者发送消息:"+message);
            }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    消费者C1
    声明消费的交换机,以及交换机类型,声明消费的队列,以及此消费者,消费的交换机和队列、以及二者之间的绑定策略。

    /**
     * 声明主题交换机,以及相关队列
     * 消费者C1
     */
    public class Topic01 {
    
        //交换机名称
        public static  final String EXCHANGE_NAME = "topic_logs";
    
        //接收消息
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtiles.getChannel();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME,"topic");
            //声明队列
            String que01 = "Q1";
            channel.queueDeclare(que01,false,false,false,null);
            channel.queueBind(que01,EXCHANGE_NAME,"*.orange.*");
            System.out.println("等待接受消息.........");
            //消息消费的时候如何处理消息
            DeliverCallback deliverCallback = (consumerTag, message)->{
                //消息有消息头、消息体
                System.out.println("接收到的消息"+message.getBody());
                System.out.println("接收到的队列"+que01+"绑定键"+message.getEnvelope().getRoutingKey());
    
            };
            //取消消息时的回调
            CancelCallback cancelCallback = consumerTag->{
                System.out.println("消息消费被终断");
            };
            //接收消息
            channel.basicConsume(que01,true,deliverCallback,cancelCallback);
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    消费者C2

    /**
     * 声明主题交换机,以及相关队列
     * 消费者C2
     */
    public class Topic02 {
    
        //交换机名称
        public static  final String EXCHANGE_NAME = "topic_logs";
    
        //接收消息
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtiles.getChannel();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME,"topic");
            //声明队列
            String que02 = "Q2";
            channel.queueDeclare(que02,false,false,false,null);
            channel.queueBind(que02,EXCHANGE_NAME,"*.*.rabbit");
            channel.queueBind(que02,EXCHANGE_NAME,"lazy.#");
            System.out.println("等待接受消息.........");
            //消息消费的时候如何处理消息
            DeliverCallback deliverCallback = (consumerTag, message)->{
                //消息有消息头、消息体
                System.out.println("接收到的消息"+message.getBody());
                System.out.println("接收到的队列"+que02+"绑定键"+message.getEnvelope().getRoutingKey());
    
            };
            //取消消息时的回调
            CancelCallback cancelCallback = consumerTag->{
                System.out.println("消息消费被终断");
            };
            //接收消息
            channel.basicConsume(que02,true,deliverCallback,cancelCallback);
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    输出结果:消费者消费对应路由策略的消息。

  • 相关阅读:
    电脑小白快来!这有电脑常见故障解决方法
    RabbitMQ-死信队列
    python动态属性的应用场景
    Greetings(状压DP,枚举子集转移)
    【头歌实验】四、Python分支结构
    2023-09-16力扣每日一题-经典DP
    车载网关通信能力解析——SV900-5G车载网关推荐
    680. 验证回文串 II-先删后验
    独立站FP收款黑科技来啦!再也不用担心账户被封了~
    认识matlab
  • 原文地址:https://blog.csdn.net/cz_chen_zhuo/article/details/127720026