• RabbitMQ:路由模式



    📃个人主页:不断前进的皮卡丘
    🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记
    🔥个人专栏:消息中间件

    1.基本介绍

    image.png
    在路由工作模式中,我们需要配置一个类型为direct的交换机,并且需要指定不同的路由键(routing key),把对应的消息从交换机路由到不同的消息队列进行存储,由消费者进行消费。

    • P:生产者,向交换机发送消息的时候,会指定一个routing key
    • X:Exchange(交换机),接收生产者的消息,然后把消息传递给和routing key完全匹配的队列
    • C1:消费者,它所在队列指定了需要routing key为error的信息
    • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

    路由模式的特点
    队列和交换机的绑定是需要指定routing key的,不可以随意绑定
    消息的发送方向交换机发送消息的时候,也需要指定消息的routing key
    交换机不再把消息交给每一个绑定的队列,而是根据消息的routing key来进行判断,只有队列的routing key和消息的routing key完全一样才会接收到消息。


    2.生产者

    public class Producer {
        public static String DIRECT_EXCHANGE = " direct_exchange";
        public static String DIRECT_QUEUE_1 = "direct_queue_1";
        public static String DIRECT_QUEUE_2 = "direct_queue_2";
    
        public static void main(String[] args) {
            try {
                Channel channel = ConnectUtil.getChannel();
                //声明交换机(交换机名称,交换机类型)
                channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
                //声明队列
                channel.queueDeclare(DIRECT_QUEUE_1,true,false,false,null);
                channel.queueDeclare(DIRECT_QUEUE_2,true,false,false,null);
                //把交换机和队列1进行绑定
                channel.queueBind(DIRECT_QUEUE_1,DIRECT_EXCHANGE,"error");
                //把交换机和队列2进行绑定
                channel.queueBind(DIRECT_QUEUE_2,DIRECT_EXCHANGE,"info");
                channel.queueBind(DIRECT_QUEUE_2,DIRECT_EXCHANGE,"error");
                channel.queueBind(DIRECT_QUEUE_2,DIRECT_EXCHANGE,"warning");
                //发送消息
                String msg="日志信息:调用了xxx方法,日志级别是info";
                 channel.basicPublish(DIRECT_EXCHANGE,"info",null,msg.getBytes());
                System.out.println("消息发送成功");
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
    
    
        }
    
    
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    3.消费者

    消费者1

    public class Consumer1 {
        public static void main(String[] args) {
    
            try {
                //获取信道对象
                Channel channel = ConnectUtil.getChannel();
                //消费消息
                DefaultConsumer consumer=new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("消费者1接收到消息:"+new String(body,"UTF-8"));
                        System.out.println("消费者1把日志信息保存到数据库");
                    }
                };
                channel.basicConsume(Producer.DIRECT_QUEUE_1,true,consumer);
    
    
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    消费者2

    public class Consumer2 {
        public static void main(String[] args) {
    
            try {
                //获取信道对象
                Channel channel = ConnectUtil.getChannel();
                //消费消息
                DefaultConsumer consumer=new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("消费者2接收到消息:"+new String(body,"UTF-8"));
                        System.out.println("消费者2把日志信息输出到控制台");
                    }
                };
                channel.basicConsume(Producer.DIRECT_QUEUE_2,true,consumer);
    
    
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    4.测试

    image.png
    image.png
    第一次测试,发送日志级别为info的信息
    image.png
    image.png
    image.png
    第二次测试,发送日志级别为error的信息
    image.png
    image.png

  • 相关阅读:
    图神经网络 异常检测,神经网络显著性检测
    【数据聚类】第八章第一节:谱聚类算法概述及拉普拉斯矩阵
    k8s自动化运维八-如何清理docker存储的大文件
    事务提交之后再执行某些操作 → 你有哪些实现方式?
    如何利用Api接口获取手机当前的网络位置信息
    【javaweb】学习日记Day10 - tlias智能管理系统 - 部门、员工管理功能开发
    JS深入理解立即执行函数,js匿名函数()
    VScode 中 CRLF 和 LF 兼容问题,报错原因及解决方案
    8、Linux 网络编程
    【附源码】计算机毕业设计SSM社区便捷管理系统
  • 原文地址:https://blog.csdn.net/qq_52797170/article/details/127183693