• RabbitMQ入门案例之Direct模式


    前言

    RabbitMQ的Direct模式是一种可以根据指定路由key,Exchang将消息发送到具有该路由key下的Queue下进行存储。也就类似于将数据写进指定数据库表中。这个路由Key可以类比为SQL语句中的:where routeKey = …

    官方文档地址:https://www.rabbitmq.com/getstarted.html

    什么是Direct模式

    RabbitMQ中的Direct模式是一种消息传输模式,通常使用Direct Exchange(直连交换机)实现。

    在Direct模式中,生产者将消息发送到交换机,并指定消息的Routing Key(路由键)。交换机会将Routing Key与队列绑定进行匹配,如果匹配成功,则将该消息路由到对应的队列中。如果没有匹配成功,该消息将被丢弃或返回给生产者。在Direct模式中,每个消息只能被一个消费者接收。

    Direct模式常用于一对一的场景,例如订单管理系统中将订单分配给特定的处理队列。

    通过使用Exchange和Routing Key来进行消息传输,Direct模式实现了消息的有选择性地路由,提高了消息传输的效率,减少了系统负载。
    在这里插入图片描述

    实操

    实操准备工作

    在开始使用代码进行操作前,我们先到管理界面构造一个Direct交换机,如下图:
    在这里插入图片描述
    为其绑定Queue,同时设置这个Queue的route key,如下图:
    在这里插入图片描述
    最终绑定结果:
    在这里插入图片描述
    既然交换机和队列已经准备好,接下来就是准备依赖与代码了

    
    <dependency>
    	<groupId>com.rabbitmqgroupId>
    	<artifactId>amqp-clientartifactId>
    	<version>5.10.0version>
    dependency>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    生产者代码

    public class Producer {
        public static void main(String[] args) {
            // 1: 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 2: 设置连接属性
            connectionFactory.setHost("ip地址");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            Connection connection = null;
            Channel channel = null;
            try {
                // 3: 从连接工厂中获取连接
                connection = connectionFactory.newConnection("生产者");
                // 4: 从连接中获取通道channel
                channel = connection.createChannel();
                // 6: 准备发送消息的内容
                String message = "宇宙无敌爱学习";
                String  exchangeName = "direct_exchange";
                String routingKey1 = "class";
                String routingKey2 = "student";
                // 7: 发送消息给中间件rabbitmq-server
                // @params1: 交换机exchange
                // @params2: 队列名称/routingkey
                // @params3: 属性配置
                // @params4: 发送消息的内容
                channel.basicPublish(exchangeName, routingKey1, null, message.getBytes());
                channel.basicPublish(exchangeName, routingKey2, null, message.getBytes());
                System.out.println("消息发送成功!");
            } catch (Exception ex) {
                ex.printStackTrace();
                System.out.println("发送消息出现异常...");
            } finally {
                // 7: 释放连接关闭通道
                if (channel != null && channel.isOpen()) {
                    try {
                        channel.close();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    消费者代码

    public class Consumer {
        private static Runnable runnable = () -> {
            // 1: 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 2: 设置连接属性
            connectionFactory.setHost("ip地址");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            //获取队列的名称
            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                // 3: 从连接工厂中获取连接
                connection = connectionFactory.newConnection("生产者");
                // 4: 从连接中获取通道channel
                channel = connection.createChannel();
                // 5: 申明队列queue存储消息
                /*
                 *  如果队列不存在,则会创建
                 *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                 *
                 *  @params1: queue 队列的名称
                 *  @params2: durable 队列是否持久化
                 *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                 *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
                 *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
                 * */
                // 这里如果queue已经被创建过一次了,可以不需要定义
                //channel.queueDeclare("queue1", false, false, false, null);
                // 6: 定义接受消息的回调
                Channel finalChannel = channel;
                finalChannel.basicConsume(queueName, true, new DeliverCallback() {
                    @Override
                    public void handle(String s, Delivery delivery) throws IOException {
                        System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                    }
                }, new CancelCallback() {
                    @Override
                    public void handle(String s) throws IOException {
                    }
                });
                System.out.println(queueName + ":开始接受消息");
                System.in.read();
            } catch (Exception ex) {
                ex.printStackTrace();
                System.out.println("发送消息出现异常...");
            } finally {
                // 7: 释放连接关闭通道
                if (channel != null && channel.isOpen()) {
                    try {
                        channel.close();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
                if (connection != null && connection.isOpen()) {
                    try {
                        connection.close();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            }
        };
        public static void main(String[] args) {
            // 启动三个线程去执行
            new Thread(runnable, "queue1").start();
            new Thread(runnable, "queue2").start();
            new Thread(runnable, "queue3").start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    在生产者代码中,我们定义了两个route key,如下图:
    在这里插入图片描述
    在这两个路由key的驱使下,生产者的消息便只会被放到我们刚刚在direct_exchange交换机中具有这两个路由key的Queue中,我们来执行代码验证一下。

    生产者执行结果
    在这里插入图片描述
    管理界面效果
    在这里插入图片描述
    可以看出,消息就只放进了queue2和queue3中,这是符合我们预期的。
    消费者执行结果,如下:在这里插入图片描述
    管理界面效果:
    在这里插入图片描述
    可以看出,消息也被成功取出去。

    以上便是Direct模式的全部内容,仅个人笔记使用
    感谢阅读
  • 相关阅读:
    tsdx 打包ts项目
    CompassArena 司南大模型测评--代码编写
    当中国走进全球化的“深水区”,亚马逊云科技解码云时代的中国式跃升
    让技术文档网站看起来更专业
    [计算机网络实验]头歌 实验二 以太网帧、IP报文分析(含部分分析)
    Yii 实现乐观锁和悲观锁
    WPF2022终结版系列课程笔记 1 WPF 基本布局
    redis探索之缓存击穿、缓存雪崩、缓存穿透
    MySQL中的视图
    python3 词频统计计数分析+可视化词云 jieba+wordcloud 数据分析
  • 原文地址:https://blog.csdn.net/weixin_59216829/article/details/131143044