• rabbitmq完整学习-springboot整合rabbitmq


    文章目录

    MQ学习

    rabbitMq软件上传到liunx服务器

    基础准备

    1. linux的服务器-创建RabbitMq文件夹-并进入文件夹
     mkdir rabbitmq
    
    • 1

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FAVDxMuM-1663250323428)(img\创建文件夹.png)]

    2. 上传需要的文件

    erlang-18.3-1.el7.centos.x86_64.rpm
    socat-1.7.3.2-5.el7.lux.x86_64.rpm
    rabbitmq-server-3.6.5-1.noarch.rpm

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fUg8vYcO-1663250323429)(img\上传文件到linux.png)]

    3. 上传成功

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ir39NTsq-1663250323429)(img\上传成功.png)]

    mq安装

    1.在线安装依赖环境:
    yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
    
    • 1

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dnarnf7j-1663250323429)(img\安装日志.png)]

    2.安装Erlang
    # 安装
    rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
    
    • 1
    • 2
    • 如果出现如下错误

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PBwr3Dzd-1663250323430)(img\错误.png)]

    • 说明gblic 版本太低。我们可以查看当前机器的gblic 版本
    strings /lib64/libc.so.6 | grep GLIBC
    
    • 1

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Tk1zSpmC-1663250323430)(img\最高版本.png)]

    需要升级glibc

    • 使用yum更新安装依赖
    sudo yum install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make -y
    
    • 1
    • 安装rpm包
    sudo rpm -Uvh *-2.17-55.el6.x86_64.rpm --force --nodeps
    
    • 1
    • 安装完毕后再查看glibc版本
    strings /lib64/libc.so.6 | grep GLIBC
    
    • 1
    3.安装RabbitMQ
    # 安装  -soca
    rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm
    
    # 安装  -mq
    rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    执行 出现如下错误:

    error: open of socat-1.7.3.2-1.1.el7.x86_64.rpm failed: No such file or directory
    
    • 1

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PxNpoIvF-1663250323430)(img\错误2.png)]

    • 执行如下命令

      yum -y install tcp_wrappers
      
      • 1

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-roKNBuZL-1663250323431)(img\执行.png)]

    再执行安装RabbitMQ -socat:

    rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm
    
    • 1

    再执行安装RabbitMQ :

    rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
    
    • 1

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MzlRAZZk-1663250323431)(img\执行安装.png)]

    4. 开启管理界面及配置
    # 开启管理界面
    rabbitmq-plugins enable rabbitmq_management
    # 修改默认配置信息
    vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app 
    # 比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    linux不保存退出的命令是q!。
    1、打开linux系统,在linux的桌面的空白处右击。2、在弹出的下拉选项里,点击打开终端。3、在终端窗口输入vi+文件名打开需要编辑的文件。4、通过vi更改文件后,按ESC键退出insert模式,输入冒号(:),在冒号后面输入q!命令,回车即可不保存退出。
    linux保存退出
    1、打开linux系统,在linux的桌面的空白处右击。2、在弹出的下拉选项里,点击打开终端。3、在终端窗口输入vi+文件名打开需要编辑的文件。4、通过vi更改文件后,按ESC键退出insert模式,输入冒号(:),在冒号后面输入wq命令,回车即可保存退出。
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WuCVepr9-1663250323431)(img\放开.png)]

    查看命令

     cat /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
    
    • 1

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4xaHQAvU-1663250323431)(img\修改成功.png)]

    5. 启动
    service rabbitmq-server start # 启动服务
    service rabbitmq-server stop # 停止服务
    service rabbitmq-server restart # 重启服务
    
    • 1
    • 2
    • 3

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YYk3xc4C-1663250323432)(img\启动成功.png)]

    如果这里访问不到: (http://ip:15672/ )关闭防火墙

    service iptables stop
    
    • 1

    防火墙关闭后还是无法访问: 重启mq

    service rabbitmq-server restart
    
    • 1

    访问 http://ip:15672/#/

    登录名: guest
    密码: guest
    
    • 1
    • 2

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CsicGaJs-1663250323432)(img\成功登录控制台.png)]

    6. 配置虚拟主机及用户

    RabbitMQ在安装好后,可以访问http://ip地址:15672 ;其自带了guest/guest的用户名和密码;

    我们创建自定义用户;也可以登录管理界面:

    新建用户

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2HQH5Nl5-1663250323432)(img\设置用户.png)]

    角色说明

    1、 超级管理员(administrator)

    可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

    2、 监控者(monitoring)

    可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

    3、 策略制定者(policymaker)

    可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

    4、 普通管理者(management)

    仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

    5、 其他

    无法登陆管理控制台,通常就是普通的生产者和消费者。

    Virtual Hosts配置

    像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。RabbitMQ也有类似的权限管理;在RabbitMQ中可以虚拟消息服务器Virtual Host,每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器(类似于mysql的库),每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。Virtual Name一般以/开头。

    • 创建Virtual Hosts

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4PZ9tJ52-1663250323432)(img\设置.png)]

    • 设置Virtual Hosts权限

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eHpNT4tW-1663250323433)(img\点击虚拟机.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-J8hwCcan-1663250323433)(img\打开.png)]

    点击set permission

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-D2ecQYUW-1663250323433)(img\set.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KC6bGRO3-1663250323433)(img\成功.png)]

    登录为abc用户

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yZP8tlID-1663250323434)(img\登录为abc用户.png)]

    7. Overview配置文件 (not found)

    /etc/rabbitmq/rabbitmq.config (not found)

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FP2tvwVF-1663250323434)(img\找不到配置.png)]

    • 设置配置文件步骤
    • 1.cd找到配置文件所在目录
    cd /usr/share/doc/rabbitmq-server-3.6.5/
    
    • 1

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MBXk5p1B-1663250323434)(img\找到了文件.png)]

    • 2.将配置文件copy到指定位置
    cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
    
    • 1
    • 3.重启生效

      service rabbitmq-server restart
      
      • 1

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YzRonbiy-1663250323434)(img\成功解决.png)]

    8.端口

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OMLR2yz7-1663250323435)(img\端口.png)]

    注意: 15672是控制台的端口. 25672是集群端口 5672是tcp连接的端口.

    RabbitMQ入门

    1.1. 搭建示例工程

    1.1.1. 创建工程

    • 创建一个空工程rabbitmq

    • 创建rabbitmq-producer 工程

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cTruYwyJ-1663250323435)(img/创建工程1.png)]

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jJUESYZF-1663250323435)(img/创建工程2.png)]

    • 创建rabbitmq-consumer 工程

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HFXkXCUa-1663250323435)(img/创建工程3.png)]

    1.1.2. 添加依赖

    往powernode-rabbitmq的rabbitmq-producer 工程和rabbitmq-consumer 工程的pom.xml文件中添加如下依赖:

    
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0modelVersion>
    
        <groupId>com.powernodegroupId>
        <artifactId>rabbitmq-consumerartifactId>
        <version>1.0-SNAPSHOTversion>
    
        <dependencies>
            
            <dependency>
                <groupId>com.rabbitmqgroupId>
                <artifactId>amqp-clientartifactId>
                <version>5.6.0version>
            dependency>
        dependencies>
    
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.pluginsgroupId>
                    <artifactId>maven-compiler-pluginartifactId>
                    <version>3.8.0version>
                    <configuration>
                        <source>1.8source>
                        <target>1.8target>
                    configuration>
                plugin>
            plugins>
        build>
    project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZBCcw0E2-1663250323435)(img/添加pom的依赖.png)]

    1.2.编写工具类

    (com.powernode.rabbitmq.util.ConnectionUtil)

    package com.powernode.rabbitmq.util;
    
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * @program: rabbitmq
     * @ClassName: ConnectionUtil
     * @version: 1.0
     * @description:
     * @author: bjpowernode
     **/
    public class ConnectionUtil {
        public static Connection getConnection() throws Exception {
            //创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //主机地址;默认为 localhost
            connectionFactory.setHost("123.57.41.174");
            //连接端口;默认为 5672
            connectionFactory.setPort(5672);
            //虚拟主机名称;默认为 /
            connectionFactory.setVirtualHost("powernode");
            //连接用户名;默认为guest
            connectionFactory.setUsername("powernode");
            //连接密码;默认为guest
            connectionFactory.setPassword("powernode");
    
            //创建连接
            return connectionFactory.newConnection();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    1.3编写生产者

    编写消息生产者com.powernode.rabbitmq.simple.Producer

    package com.powernode.rabbitmq.simple;
    
    import com.powernode.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    /**
     * @program: rabbitmq
     * @ClassName: Producer
     * @version: 1.0
     * @description:
     * @author: bjpowernode
     **/
    public class Producer {
        //队列名称 (simple_queue 简单队列)
        static final String QUEUE_NAME = "simple_queue";
    
        public static void main(String[] args) throws Exception {
            //工具类获取连接
            Connection connection = ConnectionUtil.getConnection();
            //4. 创建Channel
            Channel channel = connection.createChannel();
            //5. 创建队列Queue
            /*
            queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
            参数:
                1. queue:队列名称
                2. durable:是否持久化,当mq重启之后,还在
                3. exclusive:
                    * 是否独占本次连接。只能有一个消费者监听这队列
                    * 当Connection关闭时,是否删除队列
                    *
                4. autoDelete:是否在不使用的时候自动删除队列。当没有Consumer时,自动删除掉
                5. arguments:队列其它参数
    
             */
            //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
    
    
            String message = "hello rabbitmq~~~";
          /*
            basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            参数:
                1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
                2. routingKey:路由名称
                3. props:配置信息
                4. body:发送消息数据
             */
            //6. 发送消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    
    
            //7.释放资源
            channel.close();
            connection.close();
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    1.3. 编写消费者

    编写消息的消费者com.powernode.rabbitmq.simple.Consumer

    package com.powernode.rabbitmq.simple;
    
    import com.powernode.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    /**
     * @program: rabbitmq
     * @ClassName: Consumer
     * @version: 1.0
     * @description:
     * @author: bjpowernode
     **/
    public class Consumer {
        //队列名称 (simple_queue 简单队列)
        static final String QUEUE_NAME = "simple_queue";
    
    
        public static void main(String[] args) throws Exception {
            //工具类获取连接
            Connection connection = ConnectionUtil.getConnection();
    
            // 创建频道
            Channel channel = connection.createChannel();
    
            // 声明(创建)队列
            /**
             * 参数1:队列名称
             * 参数2:是否定义持久化队列
             * 参数3:是否独占本次连接
             * 参数4:是否在不使用的时候自动删除队列
             * 参数5:队列其它参数
             */
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
            //创建消费者;并设置消息处理
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                 *  回调方法,当收到消息后,会自动执行该方法
                 * consumerTag 消息者标签,在channel.basicConsume时候可以指定
                 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
                 * properties 属性信息
                 * body 消息
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //consumerTag
                    System.out.println("consumerTag:" + consumerTag);
                    //路由key
                    System.out.println("路由key为:" + envelope.getRoutingKey());
                    //交换机
                    System.out.println("交换机为:" + envelope.getExchange());
                    //消息id
                    System.out.println("消息id为:" + envelope.getDeliveryTag());
                    //收到的消息
                    System.out.println("接收到的消息为:" + new String(body));
                }
            };
            //监听消息
            /**
             * basicConsume(String queue, boolean autoAck, Consumer callback)
             * 参数1:queue:队列名称
             * 参数2: autoAck:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
             * 参数3:callback:消息接收到后回调对象
             */
            channel.basicConsume(QUEUE_NAME, true, consumer);
    
            //不关闭资源,应该一直监听消息
    //        channel.close();
    //        connection.close();
        }
    }
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76

    1.4. 小结

    • P:生产者,也就是要发送消息的程序
    • C:消费者:消息的接受者,会一直等待消息到来。
    • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

    RabbitMQ的工作模式

    rabbitmq的官网

    • rabbitmq的官网:https://www.rabbitmq.com/
    • 进入MQ的工作模式

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wfGDYXOJ-1663250323439)(img/点击1.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xFinOBtV-1663250323439)(img/点击2.png)]

    • MQ的七种工作模式

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-A9OSRSGB-1663250323439)(img/工作模式.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-V5uJS7de-1663250323440)(img/第七种.png)]

    工作模式概述:其实就是消息路由分发的一种方式

    七种工作模式:(

    ​ 简单模式(Hello World)

    ​ 工作队列模式(Work queues)

    ​ 订阅模式(Publish/Subscribe)

    ​ 路由模式(Routing)

    ​ 主题模式(Topics)

    ​ 远程过程调用(RPC)

    ​ 发布者确认(Publisher Confirms))

    刚才我们已经实现了第一种模式。[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zVU88XkE-1663250323440)(img/01.png)]

    (简单模式)“Hello World!”

    做最简单的事情,一个生产者对应一个消费者,RabbitMQ 相当于一个消息代理,负责将 A 的消息转发给 B 应用场景:将发送的电子邮件放到消息队列,然后邮件服务在队列中获取邮件并发送给收件人。
    
    • 1

    现在我们实现第二种

    mq第二种工作模式-(工作队列模式)Work queues

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YNw5GVkG-1663250323440)(img/02.png)]

    work_queues概念:

    Work Queues在多个消费者之间分配任务(竞争的消费者模式),一个生产者对应多个消费者,一般适用于执行资源密集型任务,单个消费者处理不过来,需要多个消费者进行处理 应用场景:一个订单的处理需要 10s,有多个订单可以同时放到消息队列,然后让多个消费者同时处理,这样就是并行了,而不是单个消费者的串行情况。
    
    注意:一条消息只能被一个消费者消费,不能被多个消费者重复消费;
    
    应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    rabbitmq-producer 模块
    1. 新建work包

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pHLlgBxR-1663250323440)(img/新建work包.png)]

    2. 复制简单的模式类-修改名字为Producer_WorkQueues

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iciMR6li-1663250323441)(img/复制简单改名work.png)]

    3. 复制简单的模式类-修改名字为Producer_WorkQueues
    • 修改队列名称-

    • 增加for循环,一次新建10条消息

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8Qco47zr-1663250323441)(img/修改队列名称.png)]

    • 完整代码
    package com.powernode.rabbitmq.work;
    
    import com.powernode.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.MessageProperties;
    
    /**
     * @program: rabbitmq
     * @ClassName: Producer
     * @version: 1.0
     * @description:
     * @author: bjpowernode
     **/
    public class Producer_WorkQueues {
        //队列名称 (work_queues 工作队列)
        static final String QUEUE_NAME = "work_queues";
    
        public static void main(String[] args) throws Exception {
            //工具类获取连接
            Connection connection = ConnectionUtil.getConnection();
            //4. 创建Channel
            Channel channel = connection.createChannel();
            //5. 创建队列Queue
            //如果没有一个名字叫work_queues的队列,则会创建该队列,如果有则不会创建
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
    
            for (int i = 0; i <= 10 ; i++) {
                String message = i+"你好,hello WorkQueues";
                //6. 发送消息
                channel.basicPublish("", QUEUE_NAME, null,message.getBytes("UTF-8"));
            }
    
            //7.释放资源
            channel.close();
            connection.close();
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    rabbitmq-consumer模块
    1. 新建work包

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-h9OA60nV-1663250323441)(img/新建work包2.png)]

    2. 复制简单的模式类-修改名字为Producer_WorkQueues

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Dcl6oPgE-1663250323441)(img/复制简单改名work.png)]

    3. 复制简单的模式类-修改名字为Consumer_WorkQueues1
    • 修改队列名称-

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-x9jqd28q-1663250323441)(img/修改队列名称1.png)]

    • 完整代码
    package com.powernode.rabbitmq.work;
    
    import com.powernode.rabbitmq.simple.Producer;
    import com.powernode.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    /**
     * @program: rabbitmq
     * @ClassName: Consumer
     * @version: 1.0
     * @description:
     * @author: bjpowernode
     **/
    public class Consumer_WorkQueues1 {
        //队列名称 (simple_queue 简单队列)
        static final String QUEUE_NAME = "work_queues";
    
        public static void main(String[] args) throws Exception {
            //工具类获取连接
            Connection connection = ConnectionUtil.getConnection();
    
            // 创建频道
            Channel channel = connection.createChannel();
    
            // 声明(创建)队列
            /**
             * 参数1:队列名称
             * 参数2:是否定义持久化队列
             * 参数3:是否独占本次连接
             * 参数4:是否在不使用的时候自动删除队列
             * 参数5:队列其它参数
             */
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
            //创建消费者;并设置消息处理
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                 *  回调方法,当收到消息后,会自动执行该方法
                 * consumerTag 消息者标签,在channel.basicConsume时候可以指定
                 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
                 * properties 属性信息
                 * body 消息
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //consumerTag
                /*    System.out.println("consumerTag:" + consumerTag);
                    //路由key
                    System.out.println("路由key为:" + envelope.getRoutingKey());
                    //交换机
                    System.out.println("交换机为:" + envelope.getExchange());
                    //消息id
                    System.out.println("消息id为:" + envelope.getDeliveryTag());*/
                    //收到的消息
                    System.out.println("接收到的消息为:" + new String(body,"UTF-8"));
                }
            };
            //监听消息
            /**
             * basicConsume(String queue, boolean autoAck, Consumer callback)
             * 参数1:queue:队列名称
             * 参数2: autoAck:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
             * 参数3:callback:消息接收到后回调对象
             */
            channel.basicConsume(QUEUE_NAME, true, consumer);
    
            //不关闭资源,应该一直监听消息
    //        channel.close();
    //        connection.close();
        }
    }
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    4. 复制Consumer_WorkQueues1 创建Consumer_WorkQueues2

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fJEgUKlm-1663250323442)(img/新建2.png)]

    首先启动两个客户端(消费者)-Consumer_WorkQueues2和Consumer_WorkQueues1

    然后启动服务端(生产者)-Producer_WorkQueues

    到IDEA的两个消费者对应的控制台查看是否竞争性的接收到消息,可以看到是顺序消费Consumer_WorkQueues1消费了消息0-2-4-6-8-10

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-251YZfhk-1663250323442)(img/消费成功.png)]

    Consumer_WorkQueues2消费了消息1-3-5-7-9

    小结

    在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。

    mq第三种工作模式-Publish/Subscribe

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NYlpVGFL-1663250323442)(img/03.png)]

    Publish/Subscribe(发布订阅模式)概念:

    一次向许多消费者发送消息,一个生产者发送的消息会被多个消费者获取,也就是将消息将广播到所有的消费者中。 应用场景:更新商品库存后需要通知多个缓存和多个数据库
    
    • 1

    前面2个案例中,只有3个角色:

    • P:生产者,也就是要发送消息的程序
    • C:消费者:消息的接受者,会一直等待消息到来。
    • queue:消息队列,图中红色部分

    而在订阅模型中,多了一个exchange角色,而且过程略有变化:

    • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
    • C:消费者,消息的接受者,会一直等待消息到来。
    • Queue:消息队列,接收消息、缓存消息。
    • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
      • Fanout:广播,将消息交给所有绑定到交换机的队列
      • Direct:定向,把消息交给符合指定routing key 的队列
      • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
      • headers: 参数匹配 , (使用的比较少,不做赘述)

    Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

    • 发布订阅模式:
      1、每个消费者监听自己的队列。
      2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收
      到消息
    生产者模块 Producer
    1. 新建ps包
    2. Producer_PublishSubscribe代码
    • 完整代码
    package com.powernode.rabbitmq.ps;
    
    import com.powernode.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    
    /**
     * 发布与订阅使用的交换机类型为:fanout
     */
    public class Producer_PublishSubscribe {
        //交换机名称
        static final String FANOUT_EXCHAGE = "fanout_exchange";
        //队列名称
        static final String FANOUT_QUEUE_1 = "fanout_queue_1";
        //队列名称
        static final String FANOUT_QUEUE_2 = "fanout_queue_2";
    
        public static void main(String[] args) throws Exception {
            //创建连接
            Connection connection = ConnectionUtil.getConnection();
            // 创建频道
            Channel channel = connection.createChannel();
    
            /**
             * 声明交换机
             * 参数1:交换机名称
             * 参数2:交换机类型,
             *                      fanout、扇形(广播),发送消息到每一个与之绑定的队列
             *                      topic、通配符方式 把消息交给符合routing pattern(路由模式) 的队列
             *                      direct、定向 把消息交给符合指定routing key 的队列
             *                      headers 参数匹配(使用的比较少,不做赘述)
             */
            //5. 创建交换机
            channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
            //6. 创建队列
            /**
             * 参数1:队列名称
             * 参数2:是否定义持久化队列
             * 参数3:是否独占本次连接
             * 参数4:是否在不使用的时候自动删除队列
             * 参数5:队列其它参数
             */
            channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
            channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);
            //7.队列绑定交换机
            /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                1. queue:队列名称
                2. exchange:交换机名称
                3. routingKey:路由键,绑定规则
                    如果交换机的类型为fanout ,routingKey设置为""
             */
            channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHAGE, "");
            channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHAGE, "");
    
            String message = "日志信息:A调用了findAll方法...日志级别:info...";
            //8. 发送消息
            channel.basicPublish(FANOUT_EXCHAGE, "", null, message.getBytes());
    
            //9. 释放资源
            channel.close();
            connection.close();
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69

    运行生产者–运行完成,队列多了两个

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0spEDPGM-1663250323442)(img/02运行完成,队列多了两个.png)]

    消费者模块consumer
    1. 新建ps包
    2. Consumer_PublishSubscribe1代码
    • 完整代码
    package com.powernode.rabbitmq.ps;
    
    import com.powernode.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    //消费者1
    public class Consumer_PublishSubscribe1 {
        //交换机名称
        static final String FANOUT_EXCHAGE = "fanout_exchange";
        //队列名称
        static final String FANOUT_QUEUE_1 = "fanout_queue_1";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            // 创建频道
            Channel channel = connection.createChannel();
            //声明交换机
            channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
            // 声明(创建)队列
            /**
             * 参数1:队列名称
             * 参数2:是否定义持久化队列
             * 参数3:是否独占本次连接
             * 参数4:是否在不使用的时候自动删除队列
             * 参数5:队列其它参数
             */
            channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
            //队列绑定交换机
            channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHAGE, "");
            //创建消费者;并设置消息处理
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                /**
                 * consumerTag 消息者标签,在channel.basicConsume时候可以指定
                 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
                 * properties 属性信息
                 * body 消息
                 */
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("body:"+new String(body));
                    System.out.println("将日志信息打印到控制台.....");
                }
            };
            //监听消息
            /**
             * 参数1:队列名称
             * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
             * 参数3:消息接收到后回调
             */
            channel.basicConsume(FANOUT_QUEUE_1, true, consumer);
            //不需要关闭连接
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    4. Consumer_PublishSubscribe2代码
    • 复制Consumer_PublishSubscribe1创建Consumer_PublishSubscribe2

    • 修改队列名称 为2

      package com.powernode.rabbitmq.ps;
      
      
      import com.powernode.rabbitmq.util.ConnectionUtil;
      import com.rabbitmq.client.*;
      import java.io.IOException;
      //消费者2
      public class Consumer_PublishSubscribe2 {
          //交换机名称
          static final String FANOUT_EXCHAGE = "fanout_exchange";
          //队列名称
          static final String FANOUT_QUEUE_2 = "fanout_queue_2";
          public static void main(String[] args) throws Exception {
              Connection connection = ConnectionUtil.getConnection();
              // 创建频道
              Channel channel = connection.createChannel();
              //声明交换机
              channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
              // 声明(创建)队列
              /**
               * 参数1:队列名称
               * 参数2:是否定义持久化队列
               * 参数3:是否独占本次连接
               * 参数4:是否在不使用的时候自动删除队列
               * 参数5:队列其它参数
               */
              channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);
              //队列绑定交换机
              channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHAGE, "");
              //创建消费者;并设置消息处理
              DefaultConsumer consumer = new DefaultConsumer(channel){
                  @Override
                  /**
                   * consumerTag 消息者标签,在channel.basicConsume时候可以指定
                   * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
                   * properties 属性信息
                   * body 消息
                   */
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      System.out.println("body:"+new String(body));
                      System.out.println("将日志信息打印到控制台.....");
                  }
              };
              //监听消息
              /**
               * 参数1:队列名称
               * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
               * 参数3:消息接收到后回调
               */
              channel.basicConsume(FANOUT_QUEUE_2, true, consumer);
              //不需要关闭连接
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53

    启动两个(消费者)-

    到IDEA的两个消费者对应的控制台查看

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9pv0hzW1-1663250323443)(img/02消费者01.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5JYa6os6-1663250323443)(img/02消费者02.png)]

    小结

    交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。

    mq第四种工作模式-Routing

    Routing(路由模式)概念:

    有选择地(Routing key)接收消息,发送消息到交换机并且要指定路由 key ,消费者将队列绑定到交换机时需要指定路由 key,仅消费指定路由 key 的消息 
    
    应用场景:如在商品库存中增加了 1 台 iphone12,iphone12 促销活动消费者指定 routing key 为 iphone12,只有此促销活动会接收到消息,其它促销活动不关心也不会消费此 routing key 的消息
    
    • 1
    • 2
    • 3
    生产者模块 Producer
    1. 新建routing包
    2. Producer_Routing代码

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eiud32nH-1663250323443)(img/指定模式为direct01.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MTKRt6L3-1663250323445)(img/指定模式为direct02.png)]

    • 完整代码
    package com.powernode.rabbitmq.routing;
    
    import com.powernode.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    
    /**
     * 发布与订阅使用的交换机类型为:fanout
     */
    public class Producer_Routing {
        //交换机名称
    
        static final String DIRCET_EXCHAGE = "dircet_exchange";
        //队列名称
        static final String DIRCET_QUEUE_1 = "dircet_queue_1";
        //队列名称
        static final String DIRCET_QUEUE_2 = "dircet_queue_2";
    
        public static void main(String[] args) throws Exception {
            //创建连接
            Connection connection = ConnectionUtil.getConnection();
            // 创建频道
            Channel channel = connection.createChannel();
    
            /**
             * 声明交换机
             * 参数1:交换机名称
             * 参数2:交换机类型,
             *                      fanout、扇形(广播),发送消息到每一个与之绑定的队列
             *                      topic、通配符方式 把消息交给符合routing pattern(路由模式) 的队列
             *                      direct、定向 把消息交给符合指定routing key 的队列
             *                      headers 参数匹配(使用的比较少,不做赘述)
             */
            //5. 创建交换机
            channel.exchangeDeclare(DIRCET_EXCHAGE, BuiltinExchangeType.DIRECT);
            //6. 创建队列
            /**
             * 参数1:队列名称
             * 参数2:是否定义持久化队列
             * 参数3:是否独占本次连接
             * 参数4:是否在不使用的时候自动删除队列
             * 参数5:队列其它参数
             */
            channel.queueDeclare(DIRCET_QUEUE_1, true, false, false, null);
            channel.queueDeclare(DIRCET_QUEUE_2, true, false, false, null);
            //7.队列绑定交换机
            /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                1. queue:队列名称
                2. exchange:交换机名称
                3. routingKey:路由键,绑定规则
                    如果交换机的类型为fanout ,routingKey设置为""
             */
            //队列一 绑定了error
            channel.queueBind(DIRCET_QUEUE_1, DIRCET_EXCHAGE, "error");
            //队列二 绑定了 error  info warning
            channel.queueBind(DIRCET_QUEUE_2, DIRCET_EXCHAGE, "error");
            channel.queueBind(DIRCET_QUEUE_2, DIRCET_EXCHAGE, "info");
            channel.queueBind(DIRCET_QUEUE_2, DIRCET_EXCHAGE, "warning");
    
            String message = "日志信息:A调用了findAll方法...日志级别:info...";
            //8. 发送消息-并且指定 routingKey参数为
            channel.basicPublish(DIRCET_EXCHAGE, "info", null, message.getBytes());
    
            //9. 释放资源
            channel.close();
            connection.close();
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    运行生产者–运行完成,队列多了两个

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BweBd865-1663250323446)(img/02运行完成,队列多了两个.png)]

    消费者模块consumer
    1. 新建routing包
    2. Consumer_Routing1代码

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9S6tGd2t-1663250323446)(img/03客户端01.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cGfolP42-1663250323446)(img/03客户端02.png)]

    • 完整代码
    package com.powernode.rabbitmq.routing;
    
    import com.powernode.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    //消费者1
    public class Consumer_Routing1 {
        //交换机名称
        static final String DIRECT_EXCHAGE = "direct_exchange";
        //队列名称
        static final String DIRECT_QUEUE_1 = "direct_queue_1";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            // 创建频道
            Channel channel = connection.createChannel();
            //声明交换机
            channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
            // 声明(创建)队列
            /**
             * 参数1:队列名称
             * 参数2:是否定义持久化队列
             * 参数3:是否独占本次连接
             * 参数4:是否在不使用的时候自动删除队列
             * 参数5:队列其它参数
             */
            channel.queueDeclare(DIRECT_QUEUE_1, true, false, false, null);
            //队列绑定交换机
            channel.queueBind(DIRECT_QUEUE_1, DIRECT_EXCHAGE, "");
            //创建消费者;并设置消息处理
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                /**
                 * consumerTag 消息者标签,在channel.basicConsume时候可以指定
                 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
                 * properties 属性信息
                 * body 消息
                 */
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("body:"+new String(body));
    
                    System.out.println("将日志信息存储到数据库.....");
                }
            };
            //监听消息
            /**
             * 参数1:队列名称
             * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
             * 参数3:消息接收到后回调
             */
            channel.basicConsume(DIRECT_QUEUE_1, true, consumer);
            //不需要关闭连接
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    4. Consumer_Routing2代码

    复制Consumer_Routing1创建Consumer_Routing2

    修改队列名称 为2

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OYp8rXEw-1663250323446)(img/03-改成2.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Osjaw6DY-1663250323446)(img/03-改成打印到控制台.png)]

    • 代码
    package com.powernode.rabbitmq.routing;
    
    import com.powernode.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    //消费者1
    public class Consumer_Routing2 {
        //交换机名称
        static final String DIRECT_EXCHAGE = "direct_exchange";
        //队列名称
        static final String DIRECT_QUEUE_2 = "direct_queue_2";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            // 创建频道
            Channel channel = connection.createChannel();
            //声明交换机
            channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
            // 声明(创建)队列
            /**
             * 参数1:队列名称
             * 参数2:是否定义持久化队列
             * 参数3:是否独占本次连接
             * 参数4:是否在不使用的时候自动删除队列
             * 参数5:队列其它参数
             */
            channel.queueDeclare(DIRECT_QUEUE_2, true, false, false, null);
            //队列绑定交换机
            channel.queueBind(DIRECT_QUEUE_2, DIRECT_EXCHAGE, "");
            //创建消费者;并设置消息处理
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                /**
                 * consumerTag 消息者标签,在channel.basicConsume时候可以指定
                 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
                 * properties 属性信息
                 * body 消息
                 */
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("body:"+new String(body));
                    System.out.println("将日志信息打印到控制台.....");
                }
            };
            //监听消息
            /**
             * 参数1:队列名称
             * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
             * 参数3:消息接收到后回调
             */
            channel.basicConsume(DIRECT_QUEUE_2, true, consumer);
            //不需要关闭连接
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    启动两个(消费者)-

    到IDEA的两个消费者对应的控制台查看

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-y4QkV2mM-1663250323447)(img/查看结果1.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aDH6fyrv-1663250323447)(img/查看结果2.png)]

    因为我们的2队列绑定了info级别的,所以在控制台打印了

    小结

    Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。

    mq第五种工作模式-Topics

    Topics(主题模式)概念:

    根据主题(Topics)来接收消息,将路由 key 和某模式进行匹配,此时队列需要绑定在一个模式上,
    #匹配一个词或多个词,*只匹配一个词。 
    应用场景:同上,iphone 促销活动可以接收主题为 iphone 的消息,如 iphone12、iphone13 等
    
    • 1
    • 2
    • 3
    生产者模块 Producer
    1. 新建topics包
    2. Producer_Topics代码

    从复制Producer_PublishSubscribe到Producer_Topics代码

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fejmlZbo-1663250323447)(img/复制到topics生产者.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ASlqUZa9-1663250323447)(img/修改生产者topic01.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AckQdWlK-1663250323448)(img/修改生产者topic02.png)]

    • 完整代码
    package com.powernode.rabbitmq.topics;
    
    import com.powernode.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Producer_Topics {
        //交换机名称
        static final String TOPIC_EXCHAGE = "topic_exchange";
        //队列名称
        static final String TOPIC_QUEUE_1 = "topic_queue_1";
        //队列名称
        static final String TOPIC_QUEUE_2 = "topic_queue_2";
    
        public static void main(String[] args) throws Exception {
            //创建连接
            Connection connection = ConnectionUtil.getConnection();
            // 创建频道
            Channel channel = connection.createChannel();
    
            /**
             * 声明交换机
             * 参数1:交换机名称
             * 参数2:交换机类型,
             *                      fanout、扇形(广播),发送消息到每一个与之绑定的队列
             *                      topic、通配符方式 把消息交给符合routing pattern(路由模式) 的队列
             *                      direct、定向 把消息交给符合指定routing key 的队列
             *                      headers 参数匹配(使用的比较少,不做赘述)
             */
            //5. 创建交换机
            channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
            //6. 创建队列
            /**
             * 参数1:队列名称
             * 参数2:是否定义持久化队列
             * 参数3:是否独占本次连接
             * 参数4:是否在不使用的时候自动删除队列
             * 参数5:队列其它参数
             */
            channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null);
            channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, null);
            //7.队列绑定交换机
            /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                1. queue:队列名称
                2. exchange:交换机名称
                3. routingKey:路由键,绑定规则
                    如果交换机的类型为fanout ,routingKey设置为""
             */
            // routing key  系统的名称.日志的级别。
            //=需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
            //绑定队列1
            channel.queueBind(TOPIC_QUEUE_1,TOPIC_EXCHAGE,"#.error");
            channel.queueBind(TOPIC_QUEUE_1,TOPIC_EXCHAGE,"order.*");
            //绑定队列2
            channel.queueBind(TOPIC_QUEUE_2,TOPIC_EXCHAGE,"*.*");
    
            String message = "日志信息:A调用了findAll方法...日志级别:info...";
            //8. 发送消息 指定routingKey 为
            channel.basicPublish(TOPIC_EXCHAGE, "order.info", null, message.getBytes());
            //9. 释放资源
            channel.close();
            connection.close();
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69

    运行生产者–运行完成,队列多了两个

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cocJTljz-1663250323448)(img/控制台拿到消息.png)]

    消费者模块consumer
    1. 新建topics包
    2. Consumer_Topic1代码

    从复制Consumer_Routing1到Consumer_Topic1代码

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2a6HGRc6-1663250323448)(img/复制到topics消费者01.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Wr9Gnw33-1663250323448)(img/复制到topics消费者02.png)]

    • 完整代码
    package com.powernode.rabbitmq.topics;
    
    import com.powernode.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    //消费者1
    public class Consumer_Topic1 {
        //交换机名称
        static final String TOPIC_EXCHAGE = "topic_exchange";
        //队列名称
        static final String TOPIC_QUEUE_1 = "topic_queue_1";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            // 创建频道
            Channel channel = connection.createChannel();
            //声明交换机
            channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.DIRECT);
            // 声明(创建)队列
            /**
             * 参数1:队列名称
             * 参数2:是否定义持久化队列
             * 参数3:是否独占本次连接
             * 参数4:是否在不使用的时候自动删除队列
             * 参数5:队列其它参数
             */
            channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null);
            //队列绑定交换机
            channel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHAGE, "");
            //创建消费者;并设置消息处理
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                /**
                 * consumerTag 消息者标签,在channel.basicConsume时候可以指定
                 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
                 * properties 属性信息
                 * body 消息
                 */
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("body:"+new String(body));
    
                    System.out.println("将日志信息存储到数据库.....");
                }
            };
            //监听消息
            /**
             * 参数1:队列名称
             * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
             * 参数3:消息接收到后回调
             */
            channel.basicConsume(TOPIC_QUEUE_1, true, consumer);
            //不需要关闭连接
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    4. Consumer_Topic2代码
    • 复制Consumer_Topic1创建Consumer_Topic2

    • 修改队列名称 为2

      package com.powernode.rabbitmq.topics;
      
      import com.powernode.rabbitmq.util.ConnectionUtil;
      import com.rabbitmq.client.*;
      
      import java.io.IOException;
      
      //消费者1
      public class Consumer_Topic2 {
          //交换机名称
          static final String TOPIC_EXCHAGE = "topic_exchange";
          //队列名称
          static final String TOPIC_QUEUE_2 = "topic_queue_2";
      
          public static void main(String[] args) throws Exception {
              Connection connection = ConnectionUtil.getConnection();
              // 创建频道
              Channel channel = connection.createChannel();
              //声明交换机
              channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.DIRECT);
              // 声明(创建)队列
              /**
               * 参数1:队列名称
               * 参数2:是否定义持久化队列
               * 参数3:是否独占本次连接
               * 参数4:是否在不使用的时候自动删除队列
               * 参数5:队列其它参数
               */
              channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, null);
              //队列绑定交换机
              channel.queueBind(TOPIC_QUEUE_2, TOPIC_EXCHAGE, "");
              //创建消费者;并设置消息处理
              DefaultConsumer consumer = new DefaultConsumer(channel) {
                  @Override
                  /**
                   * consumerTag 消息者标签,在channel.basicConsume时候可以指定
                   * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
                   * properties 属性信息
                   * body 消息
                   */
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      System.out.println("body:"+new String(body));
      
                      System.out.println("将日志信息打印到控制台.....");
                  }
              };
              //监听消息
              /**
               * 参数1:队列名称
               * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
               * 参数3:消息接收到后回调
               */
              channel.basicConsume(TOPIC_QUEUE_2, true, consumer);
              //不需要关闭连接
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57

    启动两个(消费者)-

    到IDEA的两个消费者对应的控制台查看

    因为满足两个路由条件,则两个控制台都收到消息

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WveUg37Z-1663250323448)(img/因为满足两个路由条件,则两个控制台都收到消息1.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jqrkRNrh-1663250323449)(img/因为满足两个路由条件,则两个控制台都收到消息2.png)]

    小结

    Topic主题模式可以实现 Publish/Subscribe发布与订阅模式 Routing路由模式 的功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。

    忘记改交换机的类型,这里

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-odkgolQS-1663250323449)(img/交换机类型错误.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yBGgDGtA-1663250323449)(img/交换机类型错误,01.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NXEvhng5-1663250323449)(img/交换机类型错误02.png)]

    模式总结

    RabbitMQ工作模式:
    1、简单模式 HelloWorld
    一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)一对一

    2、工作队列模式 Work Queue
    一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)一个队列的消息被多个人消费,每个人消费的消息是不够完整的,只能获取到队列的一半的消息–>例如 2-4-6-7-8

    3、发布订阅模式 Publish/subscribe
    需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列 产生两个队列的消息被两个人消费,每个人的消息是完整的

    4、路由模式 Routing
    需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列–只能指定一个参数,不能指定多个参数 ,例如,只能指定info

    5、通配符模式 Topic
    需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列可以指定多个参数

    6、远程过程调用(RPC)
    如果我们需要在远程计算机上运行功能并等待结果就可以使用 RPC,具体流程可以看图。

    应用场景:需要等待接口返回数据。

    7、发布者确认(Publisher Confirms)

    与发布者进行可靠的发布确认,发布者确认是 RabbitMQ 扩展,可以实现可靠的发布。在通道上启用发布者确认后,RabbitMQ 将异步确认发送者发布的消息,这意味着它们已在服务器端处理

    应用场景:对于消息可靠性要求较高,比如钱包扣款

    Springboot整合RabbitMQ

    Springboot整合RabbitMQ生产者

    1.创建生产者Springboot工程

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eDhBWa8m-1663250323450)(img/创建springboot的producer模块.jpg)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mBqqc5DT-1663250323450)(img/创建工程02.png)]

    创建springboot-producer 工程,Groupld为com.powernode

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MhiudEeo-1663250323450)(img/创建工程03.png)]

    2.引入依赖坐标
    
        
        <parent>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-parentartifactId>
            <version>2.1.4.RELEASEversion>
        parent>
    
        <dependencies>
            
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-amqpartifactId>
            dependency>
            
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-testartifactId>
            dependency>
        dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    3.编写yml配置文件

    application.yml

    spring:
      rabbitmq:
        host: ip #服务地址
        username: powernode #账户名
        password: powernode #密码
        virtual-host: powernode  # 虚拟机(默认/)
        port: 5672  #端口
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    4.编写启动类

    com.powernode.ProducerApplication

    • 代码
    package com.powernode;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    /**
     * @program: rabbitmq
     * @ClassName: ProducerApplication
     * @version: 1.0
     * @description: 主启动类
     * @author: bjpowernode
     **/
    @SpringBootApplication
    public class ProducerApplication {
        public static void main(String[] args) {
           SpringApplication.run(ProducerApplication.class);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    5.编写配置类

    创建config包

    创建RabbitMQConfig配置类 --(定义交换机,队列 和绑定关系)

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bcWUX1Xp-1663250323450)(img/创建config包.png)]

    • 代码
    package com.powernode.config;
    
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * @program: rabbitmq
     * @ClassName: RabbitMQConfig
     * @version: 1.0
     * @description: 配置类
     * @author: bjpowernode
     **/
    @Configuration
    public class RabbitMQConfig {
        //指定EXCHANGE_NAME交换机 的名称
        public static final String EXCHANGE_NAME = "boot_topic_exchange";
        //指定QUEUE_NAME队列 的名称
        public static final String QUEUE_NAME = "boot_queue";
    
        //1.交换机
        @Bean("bootExchange")
        public Exchange bootExchange(){
            /**
             * ExchangeBuilder构建交换机对象.
             *          topicExchange(String name) -->topicExchange-指定交换机的类型--> (String name)指定交换机的名称.
             *          directExchange(String name)
             *          fanoutExchange(String name)
             *          headersExchange(String name)
             *  .durable(true) 选择true指定为持久化
             *  .build()为构建.
             */
            return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
        }
    
    
        //2.Queue 队列
        @Bean("bootQueue")
        public Queue bootQueue(){
            /**
             * QueueBuilder构建队列的对象.
             *          durable()
             *          durable(String name) --> ()指定队列的名称.
             *          nonDurable()
             *          nonDurable(String name)
             *  .build()为构建.
             */
            return QueueBuilder.durable(QUEUE_NAME).build();
        }
    
        //3. 队列和交互机绑定关系 Binding
        /**
            1. 指定队列
            2. 指定交换机
            3. routing key
                @Qualifier-->如果配置类出现多个队列,通过名称绑定参数
         */
        @Bean
        public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
            /**
             * BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
             * .bind(queue)通过什么队列
             * .to(exchange)绑定什么交换机
             * .with("boot.#")指定routingKey
             * .noargs()不需要指定参数,如果需要指定参数,调用.and()
             */
            return BindingBuilder.bind(queue).to(exchange).with("springboot.#").noargs();
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    6.编写测试类

    com.powernode.ProducerTest

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4RWgzJRD-1663250323450)(img/编写测试类.png)]

    • 代码

    注入RabbitTemplate–调用方法,完成发送

    package com.powernode;
    
    import com.powernode.config.RabbitMQConfig;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    /**
     * @program: rabbitmq
     * @ClassName: ProducerTest
     * @version: 1.0
     * @description: 测试类
     * @author: bjpowernode
     **/
    @SpringBootTest
    @RunWith(SpringRunner.class)
    public class ProducerTest {
        //1.注入RabbitTemplate
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testSend(){
            /**
             * convertAndSend 发送消息
             *      1.参数1 exchange交换机的名称
             *      2.参数2 routingKey
             *      3.参数3 消息
             */
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"springboot.java","springboot mq hello~~~");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    启动测试类

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Kv0wkn3a-1663250323451)(img/测试启动成功.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aEScJaM6-1663250323451)(img/springboot测试开启.png)]

    7.查看mq控制台的消息

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1Smet39p-1663250323451)(img/点击01.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tJgnheOm-1663250323451)(img/点击02.png)]

    Springboot整合RabbitMQ消费者

    1.创建消费者Springboot工程

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NcJoOYqO-1663250323451)(img/创建springboot的producer模块.jpg)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-p9FIqxTS-1663250323452)(img/创建工程02.png)]

    创建springboot-producer 工程,Groupld为com.powernode

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2GSxPn2X-1663250323452)(img/创建springboot消费者模块.png)]

    2.引入依赖坐标
        
        <parent>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-parentartifactId>
            <version>2.1.4.RELEASEversion>
        parent>
    
        <dependencies>
            
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-amqpartifactId>
            dependency>
        dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    3.编写yml配置文件

    application.yml

    spring:
      rabbitmq:
        host: ip #服务地址
        username: powernode #账户名
        password: powernode #密码
        virtual-host: powernode  # 虚拟机(默认/)
        port: 5672  #端口
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    4.编写启动类

    com.powernode.ConsumerApplication

    • 代码
    package com.powernode;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    /**
     * @program: rabbitmq
     * @ClassName: ConsumerApplication
     * @version: 1.0
     * @description: 主启动类
     * @author: bjpowernode
     **/
    @SpringBootApplication
    public class ConsumerApplication {
        public static void main(String[] args) {
           SpringApplication.run(ProducerApplication.class);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    5.创建监听类
    com.powernode.listener.RabbimtMQListener
    
    • 1

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zS1XeBnN-1663250323452)(img/监听类.png)]

    • 代码

    使用 完成监听

    package com.powernode.listener;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * @program: rabbitmq
     * @ClassName: RabbimtMQListener
     * @version: 1.0
     * @description: 监听
     * @author: bjpowernode
     **/
    @Component
    public class RabbimtMQListener {
        @RabbitListener(queues = "boot_queue")
        public void ListenerQueue(Message message){
            System.out.println(new String(message.getBody()));
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    6.启动-启动类

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5Bk87gAR-1663250323452)(img/取到消息.png)]

    小结

    SpringBoot提供了快速整合RabbitMQ的方式

    基本信息yml配置 ,队列交互以及绑定关系在配置类中使用Bean注入的方式配置(配置类在)

    生产端直接注入RabbitTemplate完成消息发送

    消费端直接使用@RabbitListener完成消息的接收

    SpringBoot整合RabbitMQ (交换机与多个队列绑定)

    1.将上面的springboot整合直接复制过来

    2.生产端

    • RabbitMQConfig类新增-队列名称的定义[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-srO0VUD7-1663250323453)(img/多个队列02.png)]

    • RabbitMQConfig类新增-构建bootQueue2的队列[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YZRNINwh-1663250323453)(img/多个队列03.png)]

    • RabbitMQConfig类新增- 队列和交互机绑定关系 Binding[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iYAmaTbI-1663250323453)(img/多个队列04.png)]

    • ​ 类完整代码

      package com.powernode.config;
      
      
      import org.springframework.amqp.core.*;
      import org.springframework.beans.factory.annotation.Qualifier;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      
      /**
       * @program: rabbitmq
       * @ClassName: RabbitMQConfig
       * @version: 1.0
       * @description: 配置类
       * @author: bjpowernode
       **/
      @Configuration
      public class RabbitMQConfig {
          //指定EXCHANGE_NAME交换机 的名称
          public static final String EXCHANGE_NAME = "boot_topic_exchange";
          //指定QUEUE_NAME队列 的名称
          public static final String QUEUE_NAME = "boot_queue";
          //指定QUEUE_NAME队列 的名称
          public static final String QUEUE_NAME2 = "boot_queue2";
      
          //1.交换机
          @Bean("bootExchange")
          public Exchange bootExchange(){
              /**
               * ExchangeBuilder构建交换机对象.
               *          topicExchange(String name) -->topicExchange-指定交换机的类型--> (String name)指定交换机的名称.
               *          directExchange(String name)
               *          fanoutExchange(String name)
               *          headersExchange(String name)
               *  .durable(true) 选择true指定为持久化
               *  .build()为构建.
               */
              return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
          }
      
      
          //2.Queue 队列
          @Bean("bootQueue")
          public Queue bootQueue(){
              /**
               * QueueBuilder构建队列的对象.
               *          durable()
               *          durable(String name) --> ()指定队列的名称.
               *          nonDurable()
               *          nonDurable(String name)
               *  .build()为构建.
               */
              return QueueBuilder.durable(QUEUE_NAME).build();
          }
          //2.Queue 队列
          @Bean("bootQueue2")
          public Queue bootQueue2(){
              /**
               * QueueBuilder构建队列的对象.
               *          durable()
               *          durable(String name) --> ()指定队列的名称.
               *          nonDurable()
               *          nonDurable(String name)
               *  .build()为构建.
               */
              return QueueBuilder.durable(QUEUE_NAME2).build();
          }
      
          //3. 队列和交互机绑定关系 Binding
          /**
           1. 指定队列
           2. 指定交换机
           3. routing key
           @Qualifier-->如果配置类出现多个队列,通过名称绑定参数
           */
          @Bean
          public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
              /**
               * BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
               * .bind(queue)通过什么队列
               * .to(exchange)绑定什么交换机
               * .with("boot.#")指定routingKey
               * .noargs()不需要指定参数,如果需要指定参数,调用.and()
               */
      
          /*    amqpAdmin.declareQueue(new Queue(QUEUE_HI, true));
              amqpAdmin.declareQueue(new Queue(QUEUE_HELLO, true));*/
      
              return BindingBuilder.bind(queue).to(exchange).with("1.txt").noargs();
          }
          @Bean
          public Binding bindQueueExchange1(@Qualifier("bootQueue2") Queue queue, @Qualifier("bootExchange") Exchange exchange){
              /**
               * BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
               * .bind(queue)通过什么队列
               * .to(exchange)绑定什么交换机
               * .with("boot.#")指定routingKey
               * .noargs()不需要指定参数,如果需要指定参数,调用.and()
               */
              return BindingBuilder.bind(queue).to(exchange).with("1.#").noargs();
          }
      
          @Bean
          public Binding bindQueueExchange2(@Qualifier("bootQueue2") Queue queue, @Qualifier("bootExchange") Exchange exchange){
              /**
               * BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
               * .bind(queue)通过什么队列
               * .to(exchange)绑定什么交换机
               * .with("boot.#")指定routingKey
               * .noargs()不需要指定参数,如果需要指定参数,调用.and()
               */
              return BindingBuilder.bind(queue).to(exchange).with("555.zzz").noargs();
          }
      
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96
      • 97
      • 98
      • 99
      • 100
      • 101
      • 102
      • 103
      • 104
      • 105
      • 106
      • 107
      • 108
      • 109
      • 110
      • 111
      • 112
      • 113
      • 114
      • 115

    3.消费端

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8Z2z6Mui-1663250323453)(img/多个队列01.png)]

    • ​ 新增的代码
        //跟你消息生产者的队列名称一致
        @RabbitListener(queues = "boot_queue2")
        public void ListenerQueue2(Message message){
            System.out.println(new String(message.getBody())+ "222");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3.启动test测试类,进行测试

    (注意:先启动生产者,因为绑定关系在消费者未配置,则消费者不能先启动–>消费者先启动,会报找不到队列异常)

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-71g7HkdX-1663250323453)(img/多队列执行.png)]

    RabbitMQ-集群搭建

    概述

    当我们在项目开发中引入了mq中间件,那中间件的高可用就变得尤为重要,但是如果我们只有单台mq,则容易出现不可用的问题,导致我们整个项目宕机.这时候,我们就想到了要部署mq中间件集群.来提高我们项目中间件的高可用.

    在我们的项目当中,单机多实例的集群部署方式,被称为伪集群

    前期准备-创建多节点
    • 首先检查mq是否正常运行

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ejmpgIes-1663250323454)(img/检查mq是否正常运行.png)]

    检查命令: -确保RabbitMQ运行没有问题

    rabbitmqctl  status
    
    • 1

    停止rabbitmq服务

    service rabbitmq-server stop
    
    • 1

    **新建一个会话窗口–>**启动第一个节点:(前台启动方式)

    RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit1 rabbitmq-server start
    #web管理插件端口占用,所以还要指定其web插件占用的端口号。
    RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit1 rabbitmq-server start
    
    • 1
    • 2
    • 3

    使用:guest 账户登录(http://ip:15673/#/)控制台查看

    **新建一个会话窗口–>**启动第二个节点:(前台启动方式)

    #web管理插件端口占用,所以还要指定其web插件占用的端口号。
    RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server start
    
    • 1
    • 2

    使用:guest 账户登录(http://ip:15674/#/)控制台查看

    结束命令:–(书写,但是这里不用,用下面的停止命令)

    rabbitmqctl -n rabbit1 stop
    rabbitmqctl -n rabbit2 stop
    
    • 1
    • 2
    设置主节点操作:

    新建一个会话窗口–>

    rabbit1操作作为主节点:
    • 停止rabbit1 节点
    rabbitmqctl -n rabbit1 stop_app 
    
    • 1
    • 重置rabbit1 节点
    rabbitmqctl -n rabbit1 reset	 
    
    • 1
    • 重启rabbit1 节点
    rabbitmqctl -n rabbit1 start_app
    
    • 1
    rabbit2操作作为从节点:
    • 停止rabbit1 节点
    rabbitmqctl -n rabbit2 stop_app
    
    • 1
    • 重置rabbit1 节点
    rabbitmqctl -n rabbit2 reset 
    
    • 1
    • 将rabbit2加入到集群中

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1sFquWgs-1663250323454)(img/主机名位置.png)]

    rabbitmqctl -n rabbit2 join_cluster rabbit1@'iz2ze268ldc0zhjsfzajw4z'
    ###''内是主机名换成自己的 例如  iz2ze268ldc0zhjsfzajw4z 
    
    • 1
    • 2
    • 重启rabbit2 节点
    rabbitmqctl -n rabbit2 start_app
    
    • 1

    添加节点加入集群设置成功

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LVfRZySM-1663250323454)(img/节点添加成功01.png)]

    从两个控制台查看,–(http://123.57.41.174:15673/#/) (http://123.57.41.173:15673/#/)均添加成功

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QVV1HZmv-1663250323454)(img/控制台查看01.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FAq8jEDT-1663250323454)(img/控制台查看02.png)]

    • 新增队列

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nFVkDVXl-1663250323455)(img/新增队列.png)]

    RabbitMQ镜像集群配置

    上面已经完成RabbitMQ默认集群模式,但并不保证队列的高可用性,尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制。虽然该模式解决一个项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能等待重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,必须要创建镜像队列。

    镜像队列是基于普通的集群模式的,然后再添加一些策略,所以你还是得先配置普通集群,然后才能设置镜像队列,我们就以上面的集群接着做。

    设置的镜像队列-(设置集群队列信息同步)可以通过开启的网页的管理端Admin->Policies,也可以通过命令

    管理控制台设置

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5dSQpGOW-1663250323455)(img/设置集群队列信息同步 步骤.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mC4JQkEa-1663250323455)(img/设置集群队列信息同步01.png)]

    含义解释
    • Name:策略名称

    • Pattern:匹配的规则,如果是匹配所有的队列,是^.

    • Definition: 镜像定义,包括三个部分 ha-mode,ha-params,ha-sync-mode

      • ha-mode: 指明镜像队列的模式,有效值为 all/exactly/nodes
      • all表示在集群所有的节点上进行镜像 ,也就是同步所有匹配的队列
      • exactly表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
      • nodes表示在指定的节点上进行镜像,节点名称通过ha-params指定
      • ha-params: ha-mode模式需要用到的参数
      • ha-sync-mode: 镜像队列中消息的同步方式,有效值为automatic(自动),manually (手动)
    • 成功设置

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bN3Jxt0R-1663250323455)(img/成功设置01.png)]

    • +1 表示同步到一个队列。如果有多个队列,这里就是+几

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ERucpvfL-1663250323456)(img/到悬浮窗口,看到02.jpg)]

    负载均衡-HAProxy

    概述

    HAProxy是一个使用C语言编写的自由及开放源代码软件[1],其提供高可用性、负载均衡,以及基于TCP和HTTP的应用程序代理。
    HAProxy特别适用于那些负载特大的web站点,这些站点通常又需要会话保持或七层处理。HAProxy运行在当前的硬件上,完全可以支持数以万计的并发连接。并且它的运行模式使得它可以很简单安全的整合进您当前的架构中, 同时可以保护你的web服务器不被暴露到网络上。
    HAProxy实现了一种事件驱动, 单一进程模型,此模型支持非常大的并发连接数。多进程或多线程模型受内存限制 、系统调度器限制以及无处不在的锁限制,很少能处理数千并发连接。事件驱动模型因为在有更好的资源和时间管理的用户空间(User-Space) 实现所有这些任务,所以没有这些问题。此模型的弊端是,在多核系统上,这些程序通常扩展性较差。这就是为什么他们必须进行优化以 使每个CPU时间片(Cycle)做更多的工作。
    包括 GitHub、Bitbucket[3]、Stack Overflow[4]、Reddit、Tumblr、Twitter[5][6]和 Tuenti[7]在内的知名网站,及亚马逊网络服务系统都使用了HAProxy。 [1] 
    
    • 1
    • 2
    • 3
    • 4

    安装-HAProxy

    • 下载依赖包
    yum install gcc vim wget
    
    • 1
    • 上传haproxy源码包

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-i4sVAtAa-1663250323456)(img/点击上传01.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wBrdzYcA-1663250323456)(img/上传02.png)]

    • ls命令查看是否上传成功

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JvDsKHRl-1663250323456)(img/查看上传成功.png)]

    • 解压

    tar -zxvf haproxy-1.6.5.tar.gz -C /usr/local
    
    • 1
    • 进入目录
    cd /usr/local/haproxy-1.6.5
    
    • 1
    • 编译

      make TARGET=linux31 PREFIX=/usr/local/haproxy
      
      • 1
    • 安装

      make install PREFIX=/usr/local/haproxy
      
      • 1
    • 赋权

    #添加组
    groupadd -r -g 149 haproxy
    #添加用户
    useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy
    
    • 1
    • 2
    • 3
    • 4
    • 创建haproxy文件夹
    mkdir /etc/haproxy
    
    • 1
    • 进入haproxy配置文件
    vim /etc/haproxy/haproxy.cfg
    
    • 1

    配置-HAProxy

    配置文件路径:/etc/haproxy/haproxy.cfg

    #logging options
    global
    	log 127.0.0.1 local0 info
    	maxconn 5120
    	chroot /usr/local/haproxy
    	uid 99
    	gid 99
    	daemon
    	quiet
    	nbproc 20
    	pidfile /var/run/haproxy.pid
    
    defaults
    	log global
    	
    	mode tcp
    
    	option tcplog
    	option dontlognull
    	retries 3
    	option redispatch
    	maxconn 2000
    	contimeout 5s
       
         clitimeout 60s
    
         srvtimeout 15s	
    #front-end IP for consumers and producters
    #rabbitmq_cluster 是个名字,随意
    listen rabbitmq_cluster
    	#对外提供服务的端口号
    	bind 0.0.0.0:5672
    	mode tcp
    	#balance url_param userid
    	#balance url_param session_id check_post 64
    	#balance hdr(User-Agent)
    	#balance hdr(host)
    	#balance hdr(Host) use_domain_only
    	#balance rdp-cookie
    	#balance leastconn
    	#balance source //ip
    	
    	balance roundrobin
    		#两个rabbit节点和haproxy在同一个服务器可以使用127.0.0.1 当三台不在同一个服务器的时候,写具体的IP地址
            server node1 127.0.0.1:5673 check inter 5000 rise 2 fall 2
            server node2 127.0.0.1:5674 check inter 5000 rise 2 fall 2
    
    listen stats
    #Haproxy控制台的访问地址
    	bind ip:8100
    	mode http
    	option httplog
    	stats enable
    	stats uri /
    	stats refresh 5s
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 保存并退出
    #esc(键退出)->:(符号输入)->wq(保存编辑操作退出) 
    #:wq!保存编辑强制退出
    :wq 
    
    • 1
    • 2
    • 3
    • 启动HAproxy负载
    /usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
    
    • 1
    • 查看haproxy进程状态
    ps -ef | grep haproxy
    
    • 1

    访问如下地址对mq节点进行监控
    http://ip:8100/

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SYsJZ8Io-1663250323456)(img/启动成功01.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jmIOJmGR-1663250323457)(img/启动成功02.png)]

    注意:代码中访问mq集群地址,则变为访问haproxy地址:5672

    Haproxy启动失败

    haproxy配置本机IP或0.0.0.0以外的IP,启动时报错,

    错误信息
    [ALERT] 252/225311 (24204) : Starting proxy stats: cannot bind socket [43.109.197.238:8100]
    
    • 1

    错误的原因

    高可用虚IP配置后,无法启动。
    
    • 1
    解决方案

    绑定非本机的IP需要在sysctl.conf文件中配置

    vi /etc/sysctl.conf #修改内核参数
    net.ipv4.ip_nonlocal_bind = 1 #没有就新增此条记录
    sysctl -p #保存结果,使结果生效
    
    • 1
    • 2
    • 3

    创建新工程-测试

    colony-rabbitmq

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xM9oeML6-1663250323457)(img/创建新工程.png)]

    测试代码

    package com.powernode;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @program: rabbitmq
     * @ClassName: Demo
     * @version: 1.0
     * @description:
     * @author: bjpowernode
     **/
    public class Demo {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂11
            ConnectionFactory factory = new ConnectionFactory();
            // 2.设置参数
            factory.setHost("ip");
            //设置ip
            factory.setPort(5672);
            //创建连接connection
            Connection connection = factory.newConnection();
            //创建channel
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare("hello world",true,false,false,null);
            String  boby="hello rabbit~";
            //发送消息
            channel.basicPublish("","hello world",null,boby.getBytes());
            //释放资源
            channel.close();
            connection.close();
            System.out.println("send success....");
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
  • 相关阅读:
    工业设计:为什么要把消费放在如此高位?
    MyBatis-Plus 和swagger
    uniapp封装一个可移动的定位图标
    C语言(第三十七天)
    高内聚、低耦合、高并发、高可用、分布式这些名称到底什么意思?
    SHEIN推出自主运营+代运营多模式,助力卖家实现快速增长
    【AIGC专题】Stable Diffusion 从入门到企业级实战0402
    vue路由
    课题学习(八)----卡尔曼滤波动态求解倾角、方位角
    (十二) 共享模型之无锁【原子整数、原子引用、原子数组】
  • 原文地址:https://blog.csdn.net/qq_39505065/article/details/126880533