• 【RabbitMQ】——路由模式(Direct)


    一、简介

    1. 路由模式

    在前边的文章中已经介绍了rabbitmq的简单模式和工作模式和发布订阅模式,本文将介绍 路由模式。
    路由模式就是通过队列绑定不同routingKey ,进行消费不同的消息。
    与发布订阅模式采用相比,路由模式采用BuiltinExchangeType.DIRECT类型的交换机。

    生产者端:声明BuiltinExchangeType.DIRECT类型的交换机。然后在发送消息时,设定不同的routingKey。

     /**
     * 声明一个交换机
     */
     channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType. DIRECT);
    
    • 1
    • 2
    • 3
    • 4
       /**
        * 发送消息
        * param1 发送到哪个交换机
        * param2 routingKey
        * param3 其他参数信息
        * param4 发送的消息体
        */
       channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    消费者端:不同消费者绑定不同的routingkey

    /**
     * 绑定交换机和队列
     * param1 队列名称
     * param2 交换机名称
     * param3 routingkey
     */
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_INFO);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2. 多重绑定

    在这里插入图片描述
    如果exchange的绑定类型是direct,但是它绑定的多个队列的routingKey如果都相同,在这种情况下虽然绑定类型是direct但是它表现的就和fanout有点类似了,就跟广播差不多,如上图所示。

     /**
     * 绑定交换机和队列
     * param1 队列名称
     * param2 交换机名称
     * param3 routingkey
     */
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_INFO);
    /**
     * 多重绑定
     */
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_WARNING);
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    3. 实现

    通过控制台输入不同的routingKey 和message ,向不同的队列发送消息。从而实现路由模式.

    <1> 生产者LogProducerDirect
    package com.rabbitmqDemo.rabbitmq.six;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmqDemo.rabbitmq.utils.RabbitMqUtils;
    
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 将消息发送到交换机
     */
    public class LogProducerDirect {
    
        private static final String EXCHANGE_NAME = "logs_direct_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqUtils.getChannel();
    
            /**
             * 声明一个交换机
             */
             channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType. DIRECT);
    
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String inputString = scanner.next();
                String[] splitStr = inputString.split("-");
                String routingKey = splitStr[0];
                String message = splitStr[1];
                /**
                 * 发送消息
                 * param1 发送到哪个交换机
                 * param2 routingKey
                 * param3 其他参数信息
                 * param4 发送的消息体
                 */
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                System.out.println("message send end : " + message);
    
            }
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    <2> 消费者Logworker03
    package com.rabbitmqDemo.rabbitmq.six;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmqDemo.rabbitmq.utils.RabbitMqUtils;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 将接收到的消息打印到控制台
     */
    public class Logworker03 {
    
        private static final String EXCHANGE_NAME = "logs_direct_exchange";
        private static final String QUEUE_NAME = "logs_direct_console";
        private static final String ROUTING_KEY_INFO = "info";
        private static final String ROUTING_KEY_WARNING = "warning";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqUtils.getChannel();
    
            /**
             * 声明一个交换机
             */
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
            /**
             * 声明一个队列
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            /**
             * 绑定交换机和队列
             * param1 队列名称
             * param2 交换机名称
             * param3 routingkey
             */
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_INFO);
            /**
             * 多重绑定
             */
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_WARNING);
    
            System.out.println("wait receive message ,print message to console... ");
    
            //声明 消费者成功消费的回调
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("Logworker03-message:" + new String(message.getBody(), "UTF-8"));
            };
            //声明 取消消息时的回调
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println("Logworker03-消息消费被中断-" + consumerTag);
            };
    
            /**
             * 消费者消费消息
             * param1 队列名称
             * param2 消费成功之后是否自动应答,true 代表自动应答,false表示不自动应答
             * param3 消费者成功消费的回调
             * param4 消费者取消消费回调
             */
            System.out.println("Logworker03等待接收消息......");
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    <3> 消费者Logworker04
    package com.rabbitmqDemo.rabbitmq.six;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmqDemo.rabbitmq.utils.RabbitMqUtils;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 将接收到的消息打印到控制台
     */
    public class Logworker04 {
    
        private static final String EXCHANGE_NAME = "logs_direct_exchange";
        private static final String QUEUE_NAME = "logs_direct_disk";
        private static final String ROUTING_KEY_ERROR = "error";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqUtils.getChannel();
    
            /**
             * 声明一个交换机
             */
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
            /**
             * 声明一个队列
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            /**
             * 绑定交换机和队列
             * param1 队列名称
             * param2 交换机名称
             * param3 routingkey
             */
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_ERROR);
    
            System.out.println("wait receive message ,print message to disk... ");
    
            //声明 消费者成功消费的回调
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("Logworker04-message:" + new String(message.getBody(), "UTF-8"));
            };
            //声明 取消消息时的回调
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println("Logworker04-消息消费被中断-" + consumerTag);
            };
    
            /**
             * 消费者消费消息
             * param1 队列名称
             * param2 消费成功之后是否自动应答,true 代表自动应答,false表示不自动应答
             * param3 消费者成功消费的回调
             * param4 消费者取消消费回调
             */
            System.out.println("Logworker04等待接收消息......");
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
  • 相关阅读:
    WebDriverManager自动管理浏览器Driver包
    HUAWEI华为MateBook X Pro 2022 12代酷睿版(MRGF-16)笔记本电脑原装出厂Windows11系统工厂模式含F10还原
    【C++】STL——priority_queue的使用及模拟实现
    c++ 智能指针
    Transformer是如何进军点云学习领域的?
    Oracle/PLSQL: BFilename Function
    【2022杭电多校1】2022“杭电杯”中国大学生算法设计超级联赛(1)
    DocuWare文档管理系统保护客户数据,改进审计流程
    unicode/utf8/utf16/utf32笔记
    朝着“强国建设民族复兴”之路奋勇前行
  • 原文地址:https://blog.csdn.net/qq_42000631/article/details/126343068