• 202、RabbitMQ 之 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型---fanout类型就是广播类型


    ★ 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型

    就是声明一个 fanout 类型的 Exchange 来分发消息。消费者进行消费
    fanout 类型就是广播模式

    fanout 类型 的 Exchange 不会判断消息的路由key,直接将消息分发给绑定到该Exchange的所有队列。

    生产者发送一条消息到fanout类型的Exchange后,绑定到该Exchange的所有队列都会收到该消息的一条副本,
    而消费者也能分别从不同的队列中读取消息,互不干扰。

    ▲ fanout类型的Exchange可以很好地模拟JMS的Pub-Sub消息模型。

    在这里插入图片描述

    代码演示:

    都是在前面一篇的代码基础上修改的。
    需求:使用 fanout 类型的Exchange ,实行发布-订阅的功能,其实就是创建一个生产者和两个消费者,实现广播模式的消息分发。

    在这里插入图片描述

    生产者:producer

    在生产者中声明Exchange ,然后声明两个消息队列 Queue,
    然后给这个Exchange 绑定 这个两个Queue
    在这里插入图片描述

    在这里插入图片描述

    消费者:Consumer01

    两个消费者的代码没啥区别,
    消费方法的参数 autoAck 都是true, 都是自动确认消费。
    两个消费者各自消费自己指定的消息队列。

    在这里插入图片描述

    在这里插入图片描述

    消费者:Consumer02

    在这里插入图片描述
    在这里插入图片描述

    测试结果

    消费生产者发送10条消息,两个消费者都能各自消费到10条消息就是正确的。

    消息生产者使用fanout这个广播的类型发送消息。
    在这里插入图片描述
    两个消费者都能消费到10条消息,正确。
    在这里插入图片描述

    完整代码

    ConnectionUtil

    package cn.ljh.rabbitmq.util;
    
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    //连接工具
    public class ConnectionUtil
    {
        //获取连接的方法
        public static Connection getConnection() throws IOException, TimeoutException
        {
            //创建连接工厂----这个ConnectionFactory源码可以看出有构造器,所以直接new一个出来
            ConnectionFactory connectionFactory =  new ConnectionFactory();
            //设置连接信息
            connectionFactory.setHost("localhost");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("ljh");
            connectionFactory.setPassword("123456");
            connectionFactory.setVirtualHost("/"); //连接虚拟主机
            //从连接工厂获取连接
            Connection connection = connectionFactory.newConnection();
            //返回连接
            return connection;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    Publisher

    package cn.ljh.rabbitmq.producer;
    
    import cn.ljh.rabbitmq.consumer.Consumer01;
    import cn.ljh.rabbitmq.consumer.Consumer02;
    import cn.ljh.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;
    
    //消息生产者--使用fanout类型的exchange------就是广播模式
    public class Publisher
    {
        //常量:定义个Exchange的名字作为常量
        public static final String EXCHANGE_NAME = "myex01.fanout";
    
        public static void main(String[] args) throws IOException, TimeoutException
        {
            //1、创建连接
            Connection conn = ConnectionUtil.getConnection();
            //2、通过Connection获取Channel。
            Channel channel = conn.createChannel();
    
            //3、调用exchangeDeclare()方法声明Exchange、调用queueDeclare()方法声明队列,并完成队列与Exchange的绑定
            channel.exchangeDeclare(EXCHANGE_NAME,/* Exchange名字 */
                    BuiltinExchangeType.FANOUT,/* Exchange 类型 */
                    true,/* 是否持久化 */
                    false,/* 是否自动栅除 */
                    false,/* 是否为内部的 Exchange */
                    null /* 指定 Exchange 的额外属性 */
            );
    
            //声明多个消息队列------声明第1个消息队列
            channel.queueDeclare(Consumer01.QUEUE01, true, false, false, null);
    
            //把 Exchange 和 Queue 绑定起来,绑定第一个消息队列
            channel.queueBind(Consumer01.QUEUE01,EXCHANGE_NAME,
                    "" /* 因为Exchange 是fanout类型,所以无需 路由key */,
                    null /* 指定 Exchange 的额外属性 */);
    
            //声明第2个消息队列
            channel.queueDeclare(Consumer02.QUEUE02, true, false, false, null);
    
            //把 Exchange 和 Queue 绑定起来,绑定第2个消息队列
            channel.queueBind(Consumer02.QUEUE02,EXCHANGE_NAME,
                    "" /* 因为Exchange 是fanout类型,所以无需 路由key */,
                    null /* 指定 Exchange 的额外属性 */);
    
            //生产者发送10条消息
            for (int i = 1; i <= 10; i++)
            {
                String message = "生产者发送的第【 " + i + " 】条消息的内容";
    
                //4、调用Channel的basicPublish()方法发送消息
                channel.basicPublish(EXCHANGE_NAME /* 向这个 fanout类型的 Exchange 发送消息 */,
                        "" /* 因为 Exchange 是fanout 类型,所以有没有路由key都无所谓 */,
                        null /*指定额外的消息的属性*/,
                        message.getBytes(StandardCharsets.UTF_8)/*消息体必须是字节数组类型-->byte[]*/
                );
                System.out.println("生产者发送【 "+i+" 】条消息完成");
            }
            //5、关闭资源
            //关闭通道
            channel.close();
            //关闭连接
            conn.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    Consumer01

    package cn.ljh.rabbitmq.consumer;
    
    import cn.ljh.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    //消息消费者1
    public class Consumer01
    {
        // 使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下:
        //(1)创建ConnectionFactory连接工厂,设置连接信息,再通过ConnectionFactory获取Connection连接。
        //(2)通过Connection获取Channel。
        //(3)根据需要、调用Channel的queueDeclare()方法声明队列,  Declare:声明、宣布
        //    如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。
        //(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,该参数相当于JMS中的消息监听器。
    
        //常量
        public final static String QUEUE01 = "firstQueue";
    
        public static void main(String[] args) throws IOException, TimeoutException
        {
            //1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接
            Connection conn = ConnectionUtil.getConnection();
    
            //2、通过Connection获取Channel 消息通道
            Channel channel = conn.createChannel();
    
            //3、调用 Channel 的 queueDeclare() 方法声明队列,
            //   如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列
            channel.queueDeclare(QUEUE01, /* 声明的队列名 */
                    true,    /* 消息队列是否持久化 */
                    false,  /* 是否只允许该消息消费者消费该队列的消息,独占 */
                    false, /* 是否自动删除 */
                    null   /* 指定消息队列额外的属性 */);
    
    
            //4、调用Channel 的 basicConsume()方法开始处理消费消息
            channel.basicConsume(
                    QUEUE01 /*消费这个消费队列里面的消息*/,
                    true /*消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,
                    new DefaultConsumer(channel)
                    {
                        //处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:
                        @Override
                        public void handleDelivery(String consumerTag,
                                                   Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,
                                                   AMQP.BasicProperties properties /*消息的那些属性*/,
                                                   byte[] body /*body:消息的消息体*/) throws IOException
                        {
                            //把消息体中的消息拿出来
                            String message = new String(body, "UTF-8");
                            //printf:格式化输出函数   %s:输出字符串  %n:换行
                            System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",
                                    envelope.getExchange(),envelope.getRoutingKey(),message);
    
                        }
                    }
            );
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    Consumer02

    package cn.ljh.rabbitmq.consumer;
    
    import cn.ljh.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    //消息消费者2
    public class Consumer02
    {
        // 使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下:
        //(1)创建ConnectionFactory连接工厂,设置连接信息,再通过ConnectionFactory获取Connection连接。
        //(2)通过Connection获取Channel。
        //(3)根据需要、调用Channel的queueDeclare()方法声明队列,  Declare:声明、宣布
        //    如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。
        //(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,该参数相当于JMS中的消息监听器。
    
        //常量
        public final static String QUEUE02 = "secondQueue";
    
        public static void main(String[] args) throws IOException, TimeoutException
        {
            //1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接
            Connection conn = ConnectionUtil.getConnection();
    
            //2、通过Connection获取Channel 消息通道
            Channel channel = conn.createChannel();
    
            //3、调用 Channel 的 queueDeclare() 方法声明队列,
            //   如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列
            channel.queueDeclare(QUEUE02, /* 声明的队列名 */
                    true,    /* 消息队列是否持久化 */
                    false,  /* 是否只允许该消息消费者消费该队列的消息,独占 */
                    false, /* 是否自动删除 */
                    null   /* 指定消息队列额外的属性 */);
    
            //4、调用Channel 的 basicConsume()方法开始处理消费消息
            channel.basicConsume(
                    QUEUE02 /*消费这个名字的消费队列里面的消息*/,
                    true/*消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,
                    new DefaultConsumer(channel)
                    {
                        //处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:
                        @Override
                        public void handleDelivery(String consumerTag,
                                                   Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,
                                                   AMQP.BasicProperties properties /*消息的那些属性*/,
                                                   byte[] body /*body:消息的消息体*/) throws IOException
                        {
                            //把消息体中的消息拿出来
                            String message = new String(body, "UTF-8");
                            //printf:格式化输出函数   %s:输出字符串  %n:换行
                            System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",
                                    envelope.getExchange(),envelope.getRoutingKey(),message);
                        }
                    }
            );
        }
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>cn.ljh</groupId>
        <artifactId>rabbitmq_fanout</artifactId>
        <version>1.0.0</version>
        <name>rabbitmq_fanout</name>
    
        <!--  属性  -->
        <properties>
            <maven.compiler.source>11</maven.compiler.source>
            <maven.compiler.target>11</maven.compiler.target>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <java.version>11</java.version>
        </properties>
    
        <!--  依赖  -->
        <dependencies>
            <!-- RabbitMQ 的依赖库 -->
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.13.0</version>
            </dependency>
    
        </dependencies>
    
    
    </project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
  • 相关阅读:
    Docker镜像制作
    Symmetric crypto AES
    【力扣10天SQL入门】Day7+8 计算函数
    免费开源音乐聚合软件-洛雪音乐助手
    在M1Mac上为GIMP安装G‘MIC插件
    CSS3 对齐方式
    java基础篇(1)
    第6/100天 阅读笔记
    智慧公厕擦手纸洗手液余量实时在线统计
    直接插入排序
  • 原文地址:https://blog.csdn.net/weixin_44411039/article/details/133789237