• RibbitMQ学习笔记之交换机实战



    总目录如下:
    1. RibbitMQ学习笔记之MQ 的相关概念
    2. RibbitMQ学习笔记之MQ练习
    3. RibbitMQ学习笔记之MQ练习
    4. RibbitMQ学习笔记之MQ发布确认
    5. RibbitMQ学习笔记之交换机实战
    6. RibbitMQ学习笔记之死信队列
    7. RibbitMQ学习笔记延迟队列
    8. RibbitMQ学习笔记之发布确认高级
    9. RibbitMQ学习笔记之 RabbitMQ 其他知识点扩展
    10. RibbitMQ学习笔记之RabbitMQ 集群


    5. 交换机

    在这里插入图片描述
    在这里插入图片描述

    在上一节中,我们创建了一个工作队列。我们假设的是工作队列背后,每个任务都恰好交付给一个消费者(工作进程)。在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者。这种模式称为 ”发布/订阅”.

    为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成:第一个程序将发出日志消息,第二个程序是消费者。其中我们会启动两个消费者,其中一个消费者接收到消息后把日志存储在磁盘,另外一个消费者接收到消息后把消息打印在屏幕上,事实上第一个程序发出的日志消息将广播给所有消者;

    5.1. Exchanges

    5.1.1. Exchanges 概念

    RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列(之前写的时候 是发送了一个默认的交换机械的null)。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。

    相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

    在这里插入图片描述

    5.1.2. Exchanges 的类型

    总共有以下类型:

    直接(direct){路由类型}, 主题(topic) ,标题(headers) , 扇出(fanout){发布订阅}

    5.1.3. 无名 exchange 无名类型

    在本教程的前面部分我们对 exchange 一无所知,但仍然能够将消息发送到队列。之前能实现的原因是因为我们使用的是默认交换,我们通过空字符串(“”)进行标识。

    在这里插入图片描述

    第一个参数是交换机的名称。空字符串表示默认或无名称交换机**:消息能路由发送到队列中其实是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话**

    5.2. 临时队列

    之前的章节我们使用的是具有特定名称的队列(还记得 hello 和 ack_queue 吗?)。队列的名称我们来说至关重要-我们需要指定我们的消费者去消费哪个队列的消息。

    每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除。

    创建临时队列的方式如下: 创建临时队列的 名字
    String queueName = channel.queueDeclare().getQueue();

    创建出来之后长成这样:
    在这里插入图片描述

    AD ,excl 是表示临时的队列

    5.3. 绑定(bindings)

    什么是 bingding 呢,binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定
    在这里插入图片描述

    5.4. Fanout

    5.4.1. Fanout 介绍

    Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中。系统中默认有些 exchange 类型

    在这里插入图片描述

    5.4.2. Fanout 实战

    在这里插入图片描述

    Logs 和临时队列的绑定关系如下图
    在这里插入图片描述

    ReceiveLogs01 将接收到的消息打印在控制台

    **public class ReceiveLogs01 {

    private static final String EXCHANGE_NAME = “logs”; public static void main(String[] argv) throws Exception {

    Channel channel = RabbitUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, “fanout”);

    /**

    • 生成一个临时的队列 队列的名称是随机的

    • 当消费者断开和该队列的连接时 队列自动删除

    */
    String queueName = channel.queueDeclare().getQueue();

    //把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串 channel.queueBind(queueName, EXCHANGE_NAME, “”);

    System.out.println("等待接收消息,把接收到的消息打印在屏幕 … "); DeliverCallback deliverCallback = (consumerTag, delivery) ->

    { String message = new String(delivery.getBody(), “UTF-8”);

    System.out.println(“控制台打印接收到的消息”+message);
    };

    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
    }**

    ReceiveLogs02 将接收到的消息存储在磁盘

    public class ReceiveLogs02 {
     
    private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception {
     
    Channel channel = RabbitUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
     
    /**
    * 生成一个临时的队列 队列的名称是随机的
     
    * 当消费者断开和该队列的连接时 队列自动删除
     
    */
    String queueName = channel.queueDeclare().getQueue();
     
    //把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串 channel.queueBind(queueName, EXCHANGE_NAME, "");
     
    System.out.println("等待接收消息,把接收到的消息写到文件 ........... "); DeliverCallback deliverCallback = (consumerTag, delivery) ->
     
    { String message = new String(delivery.getBody(), "UTF-8"); File file = new File("C:\\work\\rabbitmq_info.txt"); FileUtils.writeStringToFile(file,message,"UTF-8");
     
    System.out.println("数据写入文件成功");
    };
     
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    EmitLog 发送消息给两个消费者接收

    
    public class EmitLog {
     
    private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception {
    try (Channel channel = RabbitUtils.getChannel()) {
     
    /**
    *  声明一个 exchange
    *  1.exchange 的名称
     
    *  2.exchange 的类型
     
    */
     
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); Scanner sc = new Scanner(System.in);
     
    System.out.println("请输入信息");
     
    while (sc.hasNext()) {
    String message = sc.nextLine();
     
    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println("生产者发出消息" + message);
     
    }
    }
    }
     
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    自己的代码

    package com.atguigu.rabbitmq.five;
    
    import com.atguigu.rabbitmq.utils.RabbitMqutils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConfirmCallback;
    import com.rabbitmq.client.DeliverCallback;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ReceiveLogs01 {
    
    
        //交换机的名字
        public static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqutils.getChannel();
            //声明一个交换机
            /**
             * 1.交换机的名字
             * 2.类型 fanout 主要练习的就是他
             */
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            /**
             * 声明一个队列  临时的(队列的名称随机的)
             * 连接的时候就在 不连接的时候就不在了
             * 断开队列的时候自动删除
             */
    
            String queueName = channel.queueDeclare().getQueue();
            /**
             * 绑定交换机与队列
             * 1.队列名称
             * 2.交换机的名称
             * 3.routingkey 空 可以空
             */
            channel.queueBind(queueName, EXCHANGE_NAME, "");
            System.out.println("等待接受消息把接受到的消息打印到屏幕上面....");
    
            //接受消息
            DeliverCallback deliverCallback = (var1, var2) ->
            {
    
                System.out.println("ReceiveLogs01控制台打印接口道的消息:-[]" + new String(var2.getBody(), "utf-8"));
            };
    
    
            //消费者取消消息时回调的接口
            channel.basicConsume(queueName, true, deliverCallback, var1 -> {
            }, null);
    
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
     package com.atguigu.rabbitmq.five;
    
    import com.atguigu.rabbitmq.utils.RabbitMqutils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ReceiveLogs02 {
    
    
        //交换机的名字
        public static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqutils.getChannel();
            //声明一个交换机
            /**
             * 1.交换机的名字
             * 2.类型 fanout 主要练习的就是他
             */
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            /**
             * 声明一个队列  临时的(队列的名称随机的)
             * 连接的时候就在 不连接的时候就不在了
             * 断开队列的时候自动删除
             */
    
            String queueName = channel.queueDeclare().getQueue();
            /**
             * 绑定交换机与队列
             * 1.队列名称
             * 2.交换机的名称
             * 3.routingkey 空 可以空
             */
            channel.queueBind(queueName, EXCHANGE_NAME, "");
            System.out.println("等待接受消息把接受到的消息打印到屏幕上面....");
    
            //接受消息
            DeliverCallback deliverCallback = (var1, var2) ->
            {
    
                System.out.println("ReceiveLogs02控制台打印接口道的消息:-[]" + new String(var2.getBody(), "utf-8"));
            };
    
    
            //消费者取消消息时回调的接口
            channel.basicConsume(queueName, true, deliverCallback, var1 -> {
            }, null);
    
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    package com.atguigu.rabbitmq.five;
    
    import com.atguigu.rabbitmq.utils.RabbitMqutils;
    import com.rabbitmq.client.Channel;
    
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 发消息给交换机
     */
    public class EmitLog {
        //交换机的名字
        public static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqutils.getChannel();
            //交互机
           // channel.exchangeDeclare(EXCHANGE_NAME,"fauout");
            Scanner scanner = new Scanner(System.in);
    
            while (scanner.hasNext()){
                String message = scanner.next();
                channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("utf-8"));
                System.out.println("生产者发出的消息是什么"+message);
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    5.5. Direct exchange 路由交换机

    5.5.1. 回顾

    在上一节中,我们构建了一个简单的日志记录系统。我们能够向许多接收者广播日志消息。在本节我们将向其中添加一些特别的功能-比方说我们只让某个消费者订阅发布的部分消息。例如我们只把严重错误消息定向存储到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

    我们再次来回顾一下什么是 bindings,绑定是交换机和队列之间的桥梁关系。也可以这么理解:

    队列只对它绑定的交换机的消息感兴趣。绑定用参数:routingKey 来表示也可称该参数为 binding key,创建绑定我们用代码:channel.queueBind(queueName, EXCHANGE_NAME, “routingKey”);绑定之后的
    意义由其交换类型决定。

    5.5.2. Direct exchange 介绍

    在这里插入图片描述

    上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志消息避免浪费磁盘空间。Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的 routingKey 队列中去。

    在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange,队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.

    在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列Q1。绑定键为 blackgreen 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。

    5.5.3. 多重绑定

    在这里插入图片描述

    当然如果 exchange 的绑定类型是direct,但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多,如上图所示。

    5.5.4. 实战

    在这里插入图片描述
    在这里插入图片描述

    public class ReceiveLogsDirect01 {
     
    private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception {
     
    Channel channel = RabbitUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = "disk";
     
    channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "error");
     
    System.out.println("等待接收消息 ........... ");
     
    DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8");
     
    message="接收绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message; File file = new File("C:\\work\\rabbitmq_info.txt"); FileUtils.writeStringToFile(file,message,"UTF-8");
     
    System.out.println("错误日志已经接收");
     
    };
     
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
     
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    public class ReceiveLogsDirect02 {
     
    private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception {
    Channel channel = RabbitUtils.getChannel();
     
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = "console";
     
    channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "info"); channel.queueBind(queueName, EXCHANGE_NAME, "warning");
     
    System.out.println("等待接收消息 ........... ");
     
    DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8");
     
    System.out.println(" 接收绑定键 :"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message);
     
    };
     
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
     
    }
     
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    public class EmitLogDirect {
     
    private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception {
     
    try (Channel channel = RabbitUtils.getChannel())
     
    { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
     
    //创建多个 bindingKey
     
    Map<String, String> bindingKeyMap = new HashMap<>(); bindingKeyMap.put("info","普通 info 信息"); bindingKeyMap.put("warning","警告 warning 信息"); bindingKeyMap.put("error","错误 error 信息");
     
    //debug 没有消费这接收这个消息 所有就丢失了
     
    bindingKeyMap.put("debug","调试 debug 信息");
     
    for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){ String bindingKey = bindingKeyEntry.getKey();
     
    String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME,bindingKey, null,
     
    message.getBytes("UTF-8"));
     
    System.out.println("生产者发出消息:" + message);
     
    }
    }
     
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    自己写的

    package com.atguigu.rabbitmq.six;
    
    import com.atguigu.rabbitmq.utils.RabbitMqutils;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ReceiveLogsDirect01 {
    
    
        public static final String EXCHANGE_NAME="direct_logs";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqutils.getChannel();
    
            //声明一个交换机
            /**
             * 1.交换机的名字
             * 2.类型 fanout 主要练习的就是他
             */
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            //声明一个队列
            channel.queueDeclare("console",false,false,false,null);
    
            channel.queueBind("console",EXCHANGE_NAME,"info");
            channel.queueBind("console",EXCHANGE_NAME,"warning");
    
    
            //接受消息
            DeliverCallback deliverCallback = (var1, var2) ->
            {
    
                System.out.println("ReceiveLogsDirect01控制台打印接口道的消息:-[]" + new String(var2.getBody(), "utf-8"));
            };
    
    
            //消费者取消消息时回调的接口
            channel.basicConsume("console", true, deliverCallback, var1 -> {
            }, null);
    
    
        }
    
    
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    package com.atguigu.rabbitmq.six;
    
    import com.atguigu.rabbitmq.utils.RabbitMqutils;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ReceiveLogsDirect02 {
    
    
        public static final String EXCHANGE_NAME="direct_logs";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqutils.getChannel();
    
            //声明一个交换机
            /**
             * 1.交换机的名字
             * 2.类型 fanout 主要练习的就是他
             */
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            //声明一个队列
            channel.queueDeclare("disk",false,false,false,null);
    
            channel.queueBind("disk",EXCHANGE_NAME,"error");
    
    
            //接受消息
            DeliverCallback deliverCallback = (var1, var2) ->
            {
    
                System.out.println("ReceiveLogsDirect02控制台打印接口道的消息:-[]" + new String(var2.getBody(), "utf-8"));
            };
    
    
            //消费者取消消息时回调的接口
            channel.basicConsume("disk", true, deliverCallback, var1 -> {
            }, null);
    
    
        }
    
    
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    package com.atguigu.rabbitmq.six;
    
    import com.atguigu.rabbitmq.utils.RabbitMqutils;
    import com.rabbitmq.client.Channel;
    
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.concurrent.TimeoutException;
    
    public class DirectLogs {
    
    
        //交换机的名字
        public static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqutils.getChannel();
            //交互机
            // channel.exchangeDeclare(EXCHANGE_NAME,"fauout");
            Scanner scanner = new Scanner(System.in);
    
            while (scanner.hasNext()) {
                String message = scanner.next();
               // channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes("utf-8"));
              //  channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes("utf-8"));
                channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes("utf-8"));
    
                System.out.println("生产者发出的消息是什么" + message);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    在这里插入图片描述

    在这里插入图片描述

    5.6. Topics 主题交换机

    5.6.1. 之前类型的问题

    在上一个小节中,我们改进了日志记录系统。我们没有使用只能进行随意广播的 fanout 交换机,而是使用了 direct 交换机,从而有能实现有选择性地接收日志。

    尽管使用direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候direct 就办不到了。这个时候就只能使用 topic 类型

    5.6.2. Topic 的要求

    发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单

    词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节。

    在这个规则列表中,其中有两个替换符是大家需要注意的

    *(星号)可以代替一个单词

    #(井号)可以替代零个或多个单词

    5.6.3. Topic 匹配案例

    下图绑定关系如下

    Q1–>绑定的是

    中间带 orange 带 3 个单词的字符串(.orange.)

    Q2–>绑定的是

    最后一个单词是 rabbit 的 3 个单词(..rabbit)

    第一个单词是 lazy 的多个单词(lazy.#)
    在这里插入图片描述

    上图是一个队列绑定关系图,我们来看看他们之间数据接收情况是怎么样的

    quick.orange.rabbit被队列 Q1Q2 接收到
    lazy.orange.elephant被队列 Q1Q2 接收到
    quick.orange.fox被队列 Q1 接收到
    lazy.brown.fox被队列 Q2 接收到
    lazy.pink.rabbit虽然满足两个绑定但只被队列 Q2 接收一次
    quick.brown.fox不匹配任何绑定不会被任何队列接收到会被丢弃
    quick.orange.male.rabbit是四个单词不匹配任何绑定会被丢弃
    lazy.orange.male.rabbit是四个单词但匹配 Q2

    当队列绑定关系是下列这种情况时需要引起注意

    当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了

    如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了

    5.6.4. 实战

    在这里插入图片描述

    public class EmitLogTopic {
     
    private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception
     
    { try (Channel channel = RabbitUtils.getChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "topic");
     
    /**
    *  Q1-->绑定的是
    *       中间带 orange 带 3 个单词的字符串(*.orange.*)
     
    *  Q2-->绑定的是
    *       最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
    *       第一个单词是 lazy 的多个单词(lazy.#)
     
    *
    */
     
    Map<String, String> bindingKeyMap = new HashMap<>(); 
    bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
     bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
      bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到"); 
      bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");
    bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
     bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
      bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
       bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");
     
    for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){ String bindingKey = bindingKeyEntry.getKey();
     
    String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME,bindingKey, null,
     
    message.getBytes("UTF-8"));
     
    System.out.println("生产者发出消息" + message);
    }
    }
     
    }
     
    }
    
     
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    public class ReceiveLogsTopic01 {
     
    private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception {
     
    Channel channel = RabbitUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic");
     
    //声明 Q1 队列与绑定关系
     
    String queueName="Q1";
     
    channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
     
    System.out.println("等待接收消息 ........... ");
    DeliverCallback deliverCallback = (consumerTag, delivery) ->
     
    { String message = new String(delivery.getBody(), "UTF-8");
     
    System.out.println("           接           收          队           列          :"+queueName+"                                                        绑           定
     
    键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message); };
     
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
     
    }
     
     
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    public class ReceiveLogsTopic02 {
     
    private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception {
     
    Channel channel = RabbitUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    //声明 Q2 队列与绑定关系
     
    String queueName="Q2";
     
    channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit"); channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
     
    System.out.println("等待接收消息 ........... ");
    DeliverCallback deliverCallback = (consumerTag, delivery) ->
     
    { String message = new String(delivery.getBody(), "UTF-8");
     
    System.out.println("           接           收          队           列          :"+queueName+"                                                        绑           定
     
    键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message); };
     
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
     
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    自己写的

    package com.atguigu.rabbitmq.server;
    
    import com.atguigu.rabbitmq.utils.RabbitMqutils;
    import com.rabbitmq.client.Channel;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 生产者
     */
    public class EmitLogTopic {
    
    
        //交换机的名称
        public static final String EXCHANGE_NAME = "topic_logs";
    
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqutils.getChannel();
    
            Map<String, String> bindingKeyMap = new HashMap<>();
            bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
            bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
            bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");
            bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");
            bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
            bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
            bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
            bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");
    
            for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
                String bindingKey = bindingKeyEntry.getKey();
    
                String message = bindingKeyEntry.getValue();
                channel.basicPublish(EXCHANGE_NAME, bindingKey, null,
    
                        message.getBytes("UTF-8"));
    
                System.out.println("生产者发出消息" + message);
    
    
            }
        }
        }
    
     package com.atguigu.rabbitmq.server;
    
    import com.atguigu.rabbitmq.utils.RabbitMqutils;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    import com.sun.corba.se.spi.legacy.interceptor.ORBInitInfoExt;
    
    import java.io.IOException;
    import java.net.BindException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 声明主题交换机相关的队列
     *
     * 消费者C1
     */
    public class ReceiveLogTopic01 {
    
        //交换机的名称
        public static final String EXCHANGE_NAME="topic_logs";
    
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqutils.getChannel();
    
         //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            //声明队列名字
            String queueName="Q1";
            channel.queueDeclare(queueName,false,false,false,null);
            /**
             *
             * 1.队列名字
             *  2.交换机的名字
             *  3.key的值
             */
            channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
    
            System.out.println("等待接受消息");
    
            DeliverCallback deliverCallback= (var1,var2)->{
                System.out.println(new String(var2.getBody(),"utf-8"));
                System.out.println("接受队列"+queueName+"绑定建"+var2.getEnvelope().getRoutingKey());
            };
    
            //接受消息
            channel.basicConsume(queueName,true,deliverCallback,var1->{});
    
    
        }
    
    }
    
    package com.atguigu.rabbitmq.server;
    
    import com.atguigu.rabbitmq.utils.RabbitMqutils;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 声明主题交换机相关的队列
     *
     * 消费者C2
     */
    public class ReceiveLogTopic02 {
    
        //交换机的名称
        public static final String EXCHANGE_NAME="topic_logs";
    
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqutils.getChannel();
    
         //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            //声明队列名字
            String queueName="Q2";
            channel.queueDeclare(queueName,false,false,false,null);
            /**
             *
             * 1.队列名字
             *  2.交换机的名字
             *  3.key的值
             */
            channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
            channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
            System.out.println("等待接受消息");
    
            DeliverCallback deliverCallback= (var1,var2)->{
                System.out.println(new String(var2.getBody(),"utf-8"));
                System.out.println("接受队列"+queueName+"绑定建"+var2.getEnvelope().getRoutingKey());
            };
    
            //接受消息
            channel.basicConsume(queueName,true,deliverCallback,var1->{});
    
    
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154

    在这里插入图片描述
    在这里插入图片描述在这里插入图片描述

  • 相关阅读:
    “对症下药”,高效控价——直击低价成因
    【LeetCode刷题-滑动窗口】--1658.将x减到0的最小操作数
    从0开始学人工智能测试节选:Spark -- 结构化数据领域中测试人员的万金油技术(四)
    2023年中国玉米淀粉糖市场现状及行业需求前景分析[图]
    OpenCV中常用的代码
    超硬核,拒绝内卷全靠阿里大能整理的这份 Java 核心手册,堪称强无敌,谁来不说一声牛 AC
    金融业信贷风控算法2-初等统计理论
    力扣刷题day41|198打家劫舍、213打家劫舍II、337打家劫舍III
    leetcode 268. 丢失的数字(异或!!)
    1.5 - 逻辑运算
  • 原文地址:https://blog.csdn.net/qq_42055933/article/details/127399520