• RabbitMQ


    1、MQ概述

    在这里插入图片描述
    在这里插入图片描述

    2、MQ优势

    (1)应用解耦
    在没有MQ时,订单系统直接与其他系统交互,当其他任意一个系统出错时,都会影响到订单系统。加入MQ后,订单系统将信息都放入MQ中,其它系统从MQ取,即使出错也不会影响到订单系统。并且当再添加一个系统时也不会跟订单系统有所影响,直接根据MQ中的信息添加。
    在这里插入图片描述

    (2)异步提速
    不加MQ,用户响应需要920s
    在这里插入图片描述
    加上MQ,仅需25s,就得到响应
    在这里插入图片描述

    (3)削峰填补
    不加MQ,当请求瞬间过多,系统容易崩溃。
    在这里插入图片描述
    加上MQ,直接解决此问题,所有请求装进MQ,系统慢慢取慢慢处理。
    在这里插入图片描述

    3、MQ劣势

    在这里插入图片描述

    4、使用条件

    在这里插入图片描述

    5、常见MQ产品

    在这里插入图片描述

    6、RabbitMQ简介

    RabbitMQ基于AMQP标准。

    在这里插入图片描述
    RabbitMQ基础架构图:

    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述

    7、RabbitMQ安装与配置

    1、下面3个包传到虚拟机
    在这里插入图片描述
    2、在线安装依赖环境:

    yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
    
    • 1

    3、安装Erlang

    rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
    
    • 1

    4、安装RabbitMQ

    # 安装
    rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
    
    # 安装
    rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
    
    • 1
    • 2
    • 3
    • 4
    • 5

    5、开启管理界面及配置

    # 开启管理界面
    rabbitmq-plugins enable rabbitmq_management
    # 修改默认配置信息
    vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app 
    # 比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest
    
    • 1
    • 2
    • 3
    • 4
    • 5

    6、启动

    
    service rabbitmq-server start # 启动服务
    service rabbitmq-server stop # 停止服务
    service rabbitmq-server restart # 重启服务
    
    • 1
    • 2
    • 3
    • 4

    7、设置配置文件

    cd /usr/share/doc/rabbitmq-server-3.6.5/
    
    cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
    
    • 1
    • 2
    • 3

    RabbitMQ在安装好后,可以访问http://ip地址:15672 ;其自带了guest/guest的用户名和密码。
    可以在上面创建用户并为其分配权限、创建Virtual hosts、并给用户设置权限。可以查看交换机、队列、连接信息。

    8、RabbitMQ-简单模式

    在这里插入图片描述

    producer端:

    • 创建连接工厂
    • 设置参数
    • 创建连接Connection
    • 创建channel
    • 创建队列(已存在的话则不会创建)
    • 发送消息
    • 释放资源,关闭连接
    package com.liu.producer;
    
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 发送消息
     */
    public class Producer_HelloWorld {
        public static void main(String[] args) throws IOException, TimeoutException {
    
    
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2. 设置参数
            factory.setHost("192.168.182.134");//ip  默认值 localhost
            factory.setPort(5672); //端口  默认值 5672
            factory.setVirtualHost("/menghao");//虚拟机 默认值/
            factory.setUsername("lmh");//用户名 默认 guest
            factory.setPassword("123456");//密码 默认值 guest
            //3. 创建连接 Connection
            Connection connection = factory.newConnection();
            //4. 创建Channel
            Channel channel = connection.createChannel();
            //5. 创建队列Queue
            /*
            queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
            参数:
                1. queue:队列名称
                2. durable:是否持久化,当mq重启之后,还在
                3. exclusive:
                    * 是否独占。只能有一个消费者监听这队列
                    * 当Connection关闭时,是否删除队列
                    *
                4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
                5. arguments:参数。
    
             */
            //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
            channel.queueDeclare("hellooo", true, false, false, null);
            /*
            basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            参数:
                1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
                2. routingKey:路由名称,这里简单模式跟队列名称相同
                3. props:配置信息
                4. body:发送消息数据
    
             */
            for (int i = 1 ;i<=5; i++) {
                String body = "hello lmhlmh~~~";
    
                //6. 发送消息
                channel.basicPublish("", "hellooo", null, body.getBytes());
            }
    
    
    
            //7.发送到MQ中后就可以释放资源了。消费者直接去MQ中取
            channel.close();
            connection.close();
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    consumer端:

    • 创建连接工厂
    • 设置参数
    • 创建连接Connection
    • 创建channel
    • 创建队列(已存在的话则不会创建)
    • 接收消息
    package com.liu.consumer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer_HelloWorld {
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2. 设置参数
            factory.setHost("192.168.182.134");//ip  默认值 localhost
            factory.setPort(5672); //端口  默认值 5672
            factory.setVirtualHost("/menghao");//虚拟机 默认值/
            factory.setUsername("lmh");//用户名 默认 guest
            factory.setPassword("123456");//密码 默认值 guest
            //3. 创建连接 Connection
            Connection connection = factory.newConnection();
            //4. 创建Channel
            Channel channel = connection.createChannel();
            //5. 创建队列Queue
            /*
            queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
            参数:
                1. queue:队列名称
                2. durable:是否持久化,当mq重启之后,还在
                3. exclusive:
                    * 是否独占。只能有一个消费者监听这队列
                    * 当Connection关闭时,是否删除队列
                    *
                4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
                5. arguments:参数。
    
             */
             //生产者刚才创建的队列,也可以不指定(但有个小bug是当producer来回多次启动后再取得话,就取不出来)
            channel.queueDeclare("hellooo",true,false,false,null);
    
            /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
    
             */
            // 接收消息
            Consumer consumer = new DefaultConsumer(channel){
                /*
                    回调方法,当收到消息后,会自动执行该方法
    
                    1. consumerTag:标识
                    2. envelope:获取一些信息,交换机,路由key...
                    3. properties:配置信息
                    4. body:数据
    
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("consumerTag:"+consumerTag);
                    System.out.println("Exchange:"+envelope.getExchange());
                    System.out.println("RoutingKey:"+envelope.getRoutingKey());
                    System.out.println("properties:"+properties);
                    System.out.println("body:"+new String(body));
                }
            };
            channel.basicConsume("hellooo",true,consumer);
            //关闭资源?不要
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    9、RabbitMQ-Work queues 工作队列模式

    在这里插入图片描述

    producer端:

    package com.liu.producer;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 发送消息
     */
    public class Producer_WorkQueues {
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2. 设置参数
            factory.setHost("master");//ip  默认值 localhost
            factory.setPort(5672); //端口  默认值 5672
            factory.setVirtualHost("/menghao");//虚拟机 默认值/
            factory.setUsername("lmh");//用户名 默认 guest
            factory.setPassword("123456");//密码 默认值 guest
            //3. 创建连接 Connection
            Connection connection = factory.newConnection();
            //4. 创建Channel
            Channel channel = connection.createChannel();
            //5. 创建队列Queue
            /*
            queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
            参数:
                1. queue:队列名称
                2. durable:是否持久化,当mq重启之后,还在
                3. exclusive:
                    * 是否独占。只能有一个消费者监听这队列
                    * 当Connection关闭时,是否删除队列
                    *
                4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
                5. arguments:参数。
    
             */
            //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
            channel.queueDeclare("work_queues",true,false,false,null);
            /*
            basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            参数:
                1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
                2. routingKey:路由名称
                3. props:配置信息
                4. body:发送消息数据
    
             */
            for (int i = 1; i <= 10; i++) {
                String body = i+"hello rabbitmq~~~";
    
                //6. 发送消息
                channel.basicPublish("","work_queues",null,body.getBytes());
            }
    
    
            //7.释放资源
          channel.close();
          connection.close();
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    consumer1端:

    package com.liu.consumer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer_WorkQueues1 {
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2. 设置参数
            factory.setHost("master");//ip  默认值 localhost
            factory.setPort(5672); //端口  默认值 5672
            factory.setVirtualHost("/menghao");//虚拟机 默认值/
            factory.setUsername("lmh");//用户名 默认 guest
            factory.setPassword("123456");//密码 默认值 guest
            //3. 创建连接 Connection
            Connection connection = factory.newConnection();
            //4. 创建Channel
            Channel channel = connection.createChannel();
            //5. 创建队列Queue
            /*
            queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
            参数:
                1. queue:队列名称
                2. durable:是否持久化,当mq重启之后,还在
                3. exclusive:
                    * 是否独占。只能有一个消费者监听这队列
                    * 当Connection关闭时,是否删除队列
                    *
                4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
                5. arguments:参数。
    
             */
            //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
            channel.queueDeclare("work_queues",true,false,false,null);
    
            /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
    
             */
            // 接收消息
            Consumer consumer = new DefaultConsumer(channel){
                /*
                    回调方法,当收到消息后,会自动执行该方法
    
                    1. consumerTag:标识
                    2. envelope:获取一些信息,交换机,路由key...
                    3. properties:配置信息
                    4. body:数据
    
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  /*  System.out.println("consumerTag:"+consumerTag);
                    System.out.println("Exchange:"+envelope.getExchange());
                    System.out.println("RoutingKey:"+envelope.getRoutingKey());
                    System.out.println("properties:"+properties);*/
                    System.out.println("body:"+new String(body));
                }
            };
            channel.basicConsume("work_queues",true,consumer);
    
    
            //关闭资源?不要
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    consumer2端

    package com.liu.consumer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer_WorkQueues2 {
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2. 设置参数
            factory.setHost("master");//ip  默认值 localhost
            factory.setPort(5672); //端口  默认值 5672
            factory.setVirtualHost("/menghao");//虚拟机 默认值/
            factory.setUsername("lmh");//用户名 默认 guest
            factory.setPassword("123456");//密码 默认值 guest
            //3. 创建连接 Connection
            Connection connection = factory.newConnection();
            //4. 创建Channel
            Channel channel = connection.createChannel();
            //5. 创建队列Queue
            /*
            queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
            参数:
                1. queue:队列名称
                2. durable:是否持久化,当mq重启之后,还在
                3. exclusive:
                    * 是否独占。只能有一个消费者监听这队列
                    * 当Connection关闭时,是否删除队列
                    *
                4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
                5. arguments:参数。
    
             */
            //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
            channel.queueDeclare("work_queues",true,false,false,null);
    
            /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
    
             */
            // 接收消息
            Consumer consumer = new DefaultConsumer(channel){
                /*
                    回调方法,当收到消息后,会自动执行该方法
    
                    1. consumerTag:标识
                    2. envelope:获取一些信息,交换机,路由key...
                    3. properties:配置信息
                    4. body:数据
    
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  /*  System.out.println("consumerTag:"+consumerTag);
                    System.out.println("Exchange:"+envelope.getExchange());
                    System.out.println("RoutingKey:"+envelope.getRoutingKey());
                    System.out.println("properties:"+properties);*/
                    System.out.println("body:"+new String(body));
                }
            };
            channel.basicConsume("work_queues",true,consumer);
    
    
            //关闭资源?不要
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    小结:

    在这里插入图片描述

    10、 RabbitMQ-Pub/Sub 订阅模式

    ![在这里插入图片描述](https://img-blog.csdnimg.cn/e25f0fa5ddf14706ae077a1daef53ae9.png

    在这里插入图片描述

    producer端:

    • 创建连接工厂
    • 设置参数
    • 创建连接Connection
    • 创建channel
    • 创建交换机
    • 创建队列(已存在的话则不会创建)
    • 绑定队列和交换机
    • 发送消息
    • 释放资源,关闭连接

    这里是广播模式:

    package com.liu.producer;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 发送消息
     */
    public class Producer_PubSub {
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2. 设置参数
            factory.setHost("master");//ip  默认值 localhost
            factory.setPort(5672); //端口  默认值 5672
            factory.setVirtualHost("/menghao");//虚拟机 默认值/
            factory.setUsername("lmh");//用户名 默认 guest
            factory.setPassword("123456");//密码 默认值 guest
            //3. 创建连接 Connection
            Connection connection = factory.newConnection();
            //4. 创建Channel
            Channel channel = connection.createChannel();
           /*
    
           exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments)
           参数:
            1. exchange:交换机名称
            2. type:交换机类型
                DIRECT("direct"),:定向
                FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
                TOPIC("topic"),通配符的方式
                HEADERS("headers");参数匹配
    
            3. durable:是否持久化
            4. autoDelete:自动删除
            5. internal:内部使用。 一般false
            6. arguments:参数
            */
    
            String exchangeName = "test_fanout";
            //5. 创建交换机
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
            //6. 创建队列
            String queue1Name = "test_fanout_queue1";
            String queue2Name = "test_fanout_queue2";
            channel.queueDeclare(queue1Name,true,false,false,null);
            channel.queueDeclare(queue2Name,true,false,false,null);
            //7. 绑定队列和交换机
            /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                1. queue:队列名称
                2. exchange:交换机名称
                3. routingKey:路由键,绑定规则
                    如果交换机的类型为fanout ,routingKey设置为""
             */
            channel.queueBind(queue1Name,exchangeName,"");
            channel.queueBind(queue2Name,exchangeName,"");
    
            String body = "日志信息:张三调用了findAll方法...日志级别:info...";
            //8. 发送消息
            channel.basicPublish(exchangeName,"",null,body.getBytes());
    
            //9. 释放资源
            channel.close();
            connection.close();
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75

    consumer1端:

    package com.liu.consumer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer_PubSub1 {
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2. 设置参数
            factory.setHost("master");//ip  默认值 localhost
            factory.setPort(5672); //端口  默认值 5672
            factory.setVirtualHost("/menghao");//虚拟机 默认值/
            factory.setUsername("lmh");//用户名 默认 guest
            factory.setPassword("123456");//密码 默认值 guest
            //3. 创建连接 Connection
            Connection connection = factory.newConnection();
            //4. 创建Channel
            Channel channel = connection.createChannel();
    
            // 队列名称
            String queue1Name = "test_fanout_queue1";
    
    
            /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
    
             */
            // 接收消息
            Consumer consumer = new DefaultConsumer(channel){
                /*
                    回调方法,当收到消息后,会自动执行该方法
    
                    1. consumerTag:标识
                    2. envelope:获取一些信息,交换机,路由key...
                    3. properties:配置信息
                    4. body:数据
    
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  /*  System.out.println("consumerTag:"+consumerTag);
                    System.out.println("Exchange:"+envelope.getExchange());
                    System.out.println("RoutingKey:"+envelope.getRoutingKey());
                    System.out.println("properties:"+properties);*/
                    System.out.println("body:"+new String(body));
                    System.out.println("将日志信息打印到控制台.....");
                }
            };
            channel.basicConsume(queue1Name,true,consumer);
    
    
            //关闭资源?不要
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    consumer2端:

    package com.liu.consumer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer_PubSub2 {
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2. 设置参数
            factory.setHost("master");//ip  默认值 localhost
            factory.setPort(5672); //端口  默认值 5672
            factory.setVirtualHost("/menghao");//虚拟机 默认值/
            factory.setUsername("lmh");//用户名 默认 guest
            factory.setPassword("123456");//密码 默认值 guest
            //3. 创建连接 Connection
            Connection connection = factory.newConnection();
            //4. 创建Channel
            Channel channel = connection.createChannel();
    
    
            String queue2Name = "test_fanout_queue2";
    
    
            /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
    
             */
            // 接收消息
            Consumer consumer = new DefaultConsumer(channel){
                /*
                    回调方法,当收到消息后,会自动执行该方法
    
                    1. consumerTag:标识
                    2. envelope:获取一些信息,交换机,路由key...
                    3. properties:配置信息
                    4. body:数据
    
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  /*  System.out.println("consumerTag:"+consumerTag);
                    System.out.println("Exchange:"+envelope.getExchange());
                    System.out.println("RoutingKey:"+envelope.getRoutingKey());
                    System.out.println("properties:"+properties);*/
                    System.out.println("body:"+new String(body));
                    System.out.println("将日志信息保存数据库.....");
                }
            };
            channel.basicConsume(queue2Name,true,consumer);
    
    
            //关闭资源?不要
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    小结:

    在这里插入图片描述

    11、RabbitMQ-direct定向模式

    在这里插入图片描述

    producer端:

    package com.liu.producer;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 发送消息
     */
    public class Producer_Routing {
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2. 设置参数
            factory.setHost("master");//ip  默认值 localhost
            factory.setPort(5672); //端口  默认值 5672
            factory.setVirtualHost("/menghao");//虚拟机 默认值/
            factory.setUsername("lmh");//用户名 默认 guest
            factory.setPassword("123456");//密码 默认值 guest
            //3. 创建连接 Connection
            Connection connection = factory.newConnection();
            //4. 创建Channel
            Channel channel = connection.createChannel();
           /*
    
           exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments)
           参数:
            1. exchange:交换机名称
            2. type:交换机类型
                DIRECT("direct"),:定向
                FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
                TOPIC("topic"),通配符的方式
                HEADERS("headers");参数匹配
    
            3. durable:是否持久化
            4. autoDelete:自动删除
            5. internal:内部使用。 一般false
            6. arguments:参数
            */
    
           String exchangeName = "test_direct";
            //5. 创建交换机
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
            //6. 创建队列
            String queue1Name = "test_direct_queue_first";
            String queue2Name = "test_direct_queue_second";
    
            channel.queueDeclare(queue1Name,true,false,false,null);
            channel.queueDeclare(queue2Name,true,false,false,null);
            //7. 绑定队列和交换机
            /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                1. queue:队列名称
                2. exchange:交换机名称
                3. routingKey:路由键,绑定规则
                    如果交换机的类型为fanout ,routingKey设置为""
                    这里是direct类型,routingKey设置为特定的值
             */
            //队列1绑定 error
            channel.queueBind(queue1Name,exchangeName,"error");
            //队列2绑定 info  error  warning
            channel.queueBind(queue2Name,exchangeName,"info");
            channel.queueBind(queue2Name,exchangeName,"error");
            channel.queueBind(queue2Name,exchangeName,"warning");
    
            String body = "日志信息:张三调用了delete方法...出错误了。。。日志级别:error...";
            //8. 发送消息
            channel.basicPublish(exchangeName,"warning",null,body.getBytes());
    
            //9. 释放资源
            channel.close();
            connection.close();
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82

    consumer1端:

    package com.liu.consumer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer_Routing1 {
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2. 设置参数
            factory.setHost("master");//ip  默认值 localhost
            factory.setPort(5672); //端口  默认值 5672
            factory.setVirtualHost("/menghao");//虚拟机 默认值/
            factory.setUsername("lmh");//用户名 默认 guest
            factory.setPassword("123456");//密码 默认值 guest
            //3. 创建连接 Connection
            Connection connection = factory.newConnection();
            //4. 创建Channel
            Channel channel = connection.createChannel();
    
            // 找到队列,取其中的消息
            String queue2Name = "test_direct_queue_second";
            //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
            channel.queueDeclare("test_direct_queue_second",true,false,false,null);
    
    
            /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
    
             */
            // 接收消息
            Consumer consumer = new DefaultConsumer(channel){
                /*
                    回调方法,当收到消息后,会自动执行该方法
    
                    1. consumerTag:标识
                    2. envelope:获取一些信息,交换机,路由key...
                    3. properties:配置信息
                    4. body:数据
    
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  /*  System.out.println("consumerTag:"+consumerTag);
                    System.out.println("Exchange:"+envelope.getExchange());
                    System.out.println("RoutingKey:"+envelope.getRoutingKey());
                    System.out.println("properties:"+properties);*/
                    System.out.println("body:"+new String(body));
                    System.out.println("将日志信息打印到控制台.....");
                }
            };
            channel.basicConsume(queue2Name,true,consumer);
    
    
            //关闭资源?不要
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    consumer2端:

    package com.liu.consumer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer_Routing2 {
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2. 设置参数
            factory.setHost("master");//ip  默认值 localhost
            factory.setPort(5672); //端口  默认值 5672
            factory.setVirtualHost("/menghao");//虚拟机 默认值/
            factory.setUsername("lmh");//用户名 默认 guest
            factory.setPassword("123456");//密码 默认值 guest
            //3. 创建连接 Connection
            Connection connection = factory.newConnection();
            //4. 创建Channel
            Channel channel = connection.createChannel();
    
            // 找到队列,取其中的消息
            String queue1Name = "test_direct_queue_first";
    
            //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建。 这里有一个用处是可以通过消费者创建先进行监听
            channel.queueDeclare("test_direct_queue_first",true,false,false,null);
            /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
    
             */
            // 接收消息
            Consumer consumer = new DefaultConsumer(channel){
                /*
                    回调方法,当收到消息后,会自动执行该方法
    
                    1. consumerTag:标识
                    2. envelope:获取一些信息,交换机,路由key...
                    3. properties:配置信息
                    4. body:数据
    
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  /*  System.out.println("consumerTag:"+consumerTag);
                    System.out.println("Exchange:"+envelope.getExchange());
                    System.out.println("RoutingKey:"+envelope.getRoutingKey());
                    System.out.println("properties:"+properties);*/
                    System.out.println("body:"+new String(body));
                    System.out.println("将日志信息存储到数据库.....");
                }
            };
            channel.basicConsume(queue1Name,true,consumer);
    
    
            //关闭资源?不要
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    小结:
    在这里插入图片描述

    12、RabbitMQ-topic通配符模式

    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述
    producer端:

    package com.liu.producer;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 发送消息
     */
    public class Producer_Topics {
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2. 设置参数
            factory.setHost("master");//ip  默认值 localhost
            factory.setPort(5672); //端口  默认值 5672
            factory.setVirtualHost("/menghao");//虚拟机 默认值/
            factory.setUsername("lmh");//用户名 默认 guest
            factory.setPassword("123456");//密码 默认值 guest
            //3. 创建连接 Connection
            Connection connection = factory.newConnection();
            //4. 创建Channel
            Channel channel = connection.createChannel();
           /*
    
           exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments)
           参数:
            1. exchange:交换机名称
            2. type:交换机类型
                DIRECT("direct"),:定向
                FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
                TOPIC("topic"),通配符的方式
                HEADERS("headers");参数匹配
    
            3. durable:是否持久化
            4. autoDelete:自动删除
            5. internal:内部使用。 一般false
            6. arguments:参数
            */
    
           String exchangeName = "test_topic";
            //5. 创建交换机
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
            //6. 创建队列
            String queue1Name = "test_topic_queue_first";
            String queue2Name = "test_topic_queue_second";
            channel.queueDeclare(queue1Name,true,false,false,null);
            channel.queueDeclare(queue2Name,true,false,false,null);
            //7. 绑定队列和交换机
            /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                1. queue:队列名称
                2. exchange:交换机名称
                3. routingKey:路由键,绑定规则
                    如果交换机的类型为fanout ,routingKey设置为""
             */
    
            // routing key  系统的名称.日志的级别。
            //=需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库; #代表任意个数据,*代表一个数据
            channel.queueBind(queue1Name,exchangeName,"#.error");
            channel.queueBind(queue1Name,exchangeName,"order.*");
            channel.queueBind(queue2Name,exchangeName,"*.*");
    
            String body = "日志信息:张三调用了delete方法...日志级别:error...";
            //8. 发送消息
            channel.basicPublish(exchangeName,"bb.aa",null,body.getBytes());
    
            //9. 释放资源
            channel.close();
            connection.close();
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80

    consumer1:

    package com.liu.consumer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer_Topic1 {
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2. 设置参数
            factory.setHost("master");//ip  默认值 localhost
            factory.setPort(5672); //端口  默认值 5672
            factory.setVirtualHost("/menghao");//虚拟机 默认值/
            factory.setUsername("lmh");//用户名 默认 guest
            factory.setPassword("123456");//密码 默认值 guest
            //3. 创建连接 Connection
            Connection connection = factory.newConnection();
            //4. 创建Channel
            Channel channel = connection.createChannel();
    
            // 找到队列,取消息
            String queue1Name = "test_topic_queue_first";
            String queue2Name = "test_topic_queue_second";
    
            //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建。 这里有一个用处是可以通过消费者创建先进行监听
            channel.queueDeclare(queue1Name,true,false,false,null);
    
    
            /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
    
             */
            // 接收消息
            Consumer consumer = new DefaultConsumer(channel){
                /*
                    回调方法,当收到消息后,会自动执行该方法
    
                    1. consumerTag:标识
                    2. envelope:获取一些信息,交换机,路由key...
                    3. properties:配置信息
                    4. body:数据
    
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  /*  System.out.println("consumerTag:"+consumerTag);
                    System.out.println("Exchange:"+envelope.getExchange());
                    System.out.println("RoutingKey:"+envelope.getRoutingKey());
                    System.out.println("properties:"+properties);*/
                    System.out.println("body:"+new String(body));
                    System.out.println("将日志信息存入数据库.......");
                }
            };
            channel.basicConsume(queue1Name,true,consumer);
    
    
            //关闭资源?不要
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    consumer2:

    package com.liu.consumer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer_Topic2 {
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2. 设置参数
            factory.setHost("master");//ip  默认值 localhost
            factory.setPort(5672); //端口  默认值 5672
            factory.setVirtualHost("/menghao");//虚拟机 默认值/
            factory.setUsername("lmh");//用户名 默认 guest
            factory.setPassword("123456");//密码 默认值 guest
            //3. 创建连接 Connection
            Connection connection = factory.newConnection();
            //4. 创建Channel
            Channel channel = connection.createChannel();
    
            // 找到队列,取消息
            String queue1Name = "test_topic_queue_first";
            String queue2Name = "test_topic_queue_second";
    
            //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建。 这里有一个用处是可以通过消费者创建先进行监听
            channel.queueDeclare(queue2Name,true,false,false,null);
    
    
            /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
    
             */
            // 接收消息
            Consumer consumer = new DefaultConsumer(channel){
                /*
                    回调方法,当收到消息后,会自动执行该方法
    
                    1. consumerTag:标识
                    2. envelope:获取一些信息,交换机,路由key...
                    3. properties:配置信息
                    4. body:数据
    
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  /*  System.out.println("consumerTag:"+consumerTag);
                    System.out.println("Exchange:"+envelope.getExchange());
                    System.out.println("RoutingKey:"+envelope.getRoutingKey());
                    System.out.println("properties:"+properties);*/
                    System.out.println("body:"+new String(body));
                    System.out.println("将日志信息打印控制台.......");
                }
            };
            channel.basicConsume(queue2Name,true,consumer);
    
    
            //关闭资源?不要
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    13、工作模式总结

    在这里插入图片描述

    14、RabbitMQ整合Springboot

    生产者和消费者 pom.xml文件添加依赖:

    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4

    生产者和消费者 application.yam 文件配置如下:

    # 配置RabbitMQ的基本信息  ip 端口 username  password..
    spring:
      rabbitmq:
        host: master # ip
        port: 5672
        username: lmh
        password: 123456
        virtual-host: /menghao
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    生产者主要是在配置类中定义队列、交换机、二者绑定、以及一些参数规则。

    package com.liu.rabbitmq.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    // 定义的 队列、交换机、绑定规则 等都放在这个配置类中
    @Configuration
    public class RabbitMQConfig {
    
        public static final String EXCHANGE_NAME = "boot_topic_exchange";
        public static final String QUEUE_NAME = "boot_queue";
    
        //1.交换机
        @Bean("bootExchange")  // 注入bean,为了后面绑定时的获取
        public Exchange bootExchange(){
            return ExchangeBuilder.topicExchange(EXCHANGE_NAME)
                    .durable(true)
                    .build();
        }
    
    
        //2.Queue 队列
        @Bean("bootQueue") // 注入bean,为了后面绑定时的获取
        public Queue bootQueue(){
            return QueueBuilder
                    .durable(QUEUE_NAME)
                    .build();
        }
    
        //3. 队列和交互机绑定关系 Binding
        /*
            1. 知道哪个队列
            2. 知道哪个交换机
            3. routing key
         */
        @Bean
        public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
            return BindingBuilder.bind(queue)
                    .to(exchange)
                    .with("boot.#")
                    .noargs();
        }
    
    
        public static final String EXCHANGE_NAME_CONFIRM = "boot_exchange_confirm";
    
        public static final String QUEUE_NAME_CONFIRM = "boot_queue_confirm";
    
        //1.交换机
        @Bean("confirmExchange")  // 注入bean,为了后面绑定时的获取
        public Exchange confirm_Exchange(){
            return ExchangeBuilder.topicExchange(EXCHANGE_NAME_CONFIRM)
                    .durable(true)
                    .build();
        }
    
    
        //2.Queue 队列
        @Bean("confirmQueue") // 注入bean,为了后面绑定时的获取
        public Queue confirm_Queue(){
            return QueueBuilder
                    .durable(QUEUE_NAME_CONFIRM)
                    .build();
        }
    
        @Bean
        public Binding bindQueueExchangeConfirm(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange){
            return BindingBuilder.bind(queue)
                    .to(exchange)
                    .with("confirm.#")
                    .noargs();
        }
    
        // 设置过期时间的交换机
        public static final String EXCHANGE_NAME_TTL = "boot_exchange_ttl";
    
        public static final String QUEUE_NAME_TTL = "boot_queue_ttl";
    
        //1.交换机
        @Bean("ttlExchange")  // 注入bean,为了后面绑定时的获取
        public Exchange ttl_Exchange(){
            return ExchangeBuilder.topicExchange(EXCHANGE_NAME_TTL)
                    .durable(true)
                    .build();
        }
    
    
        //2.Queue 队列
        @Bean("ttlQueue") // 注入bean,为了后面绑定时的获取
        public Queue tll_Queue(){
            return QueueBuilder
                    .durable(QUEUE_NAME_TTL)
                    .ttl(10000) // 10s内队列里的消息没被消费的话 就统一全过期
                    .deadLetterExchange(EXCHANGE_NAME_DLX) // 绑定死信交换机
                    .deadLetterRoutingKey("dlx.#") // 定义死信队列路由
                    .maxLength(10) // 设置队列最大长度
                    .build();
        }
    
    
    
        @Bean
        public Binding bindQueueExchangeTtl(@Qualifier("ttlQueue") Queue queue, @Qualifier("ttlExchange") Exchange exchange){
            return BindingBuilder.bind(queue)
                    .to(exchange)
                    .with("ttl.#")
                    .noargs();
        }
    
        // 死信队列,当某一个队列 消息过期/达到队列长度限制/消息被消费者拒收 这些消息可以被转为与之绑定的死信队列中
        public static final String EXCHANGE_NAME_DLX = "boot_exchange_dlx";
    
        public static final String QUEUE_NAME_DLX = "boot_queue_dlx";
    
        //1.交换机
        @Bean("dlxExchange")  // 注入bean,为了后面绑定时的获取
        public Exchange dlx_Exchange(){
            return ExchangeBuilder.topicExchange(EXCHANGE_NAME_DLX)
                    .durable(true)
                    .build();
        }
    
    
        //2.Queue 队列
        @Bean("dlxQueue") // 注入bean,为了后面绑定时的获取
        public Queue dlx_Queue(){
            return QueueBuilder
                    .durable(QUEUE_NAME_DLX)
                    .build();
        }
    
        @Bean
        public Binding bindQueueExchangeDlx(@Qualifier("dlxQueue") Queue queue, @Qualifier("dlxExchange") Exchange exchange){
            return BindingBuilder.bind(queue)
                    .to(exchange)
                    .with("dlx.#")
                    .noargs();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142

    这里定义了四对 交换机&队列。

    14.1 简单例子

    (1)第一对是一个简单的通配符模式的匹配规则。

    通过测试方法,来向队列发送消息:

    // 注入RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    // 1、普通发送
    @Test
    public void testSend(){
       // 三个参数:交换机名称、匹配的规则、发送的信息
       rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","boot mq hello~~~");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    然后在consumer将消息取出:

    package com.liu.listener;
    
    //注意这里引入的Channel 包
    import com.rabbitmq.client.Channel;
    // 注意这里引入的 Message 包
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    /**
     * @ClassName: RabbitMQListener
     * @Author: liumenghao
     * @Description:
     * @Date: 2022/11/18 13:12
     */
    @Component
    public class RabbitMQListener {
    
        @RabbitListener(queues = "boot_queue") // 项目启动后,就监听此队列,监听到消息就输出
        public void ListenerQueue(Message message){
            //System.out.println(message);
            System.out.println(new String(message.getBody())); // 输出监听到的消息
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    主要用到了 @RabbitListener 此注解,来找到队列!

    14.2 消息可靠投递confirm&return

    (2)第二对是开启确认模式
    在这里插入图片描述
    在生产者的yam文件中增添:

    publisher-confirm-type: correlated  # 开启确认模式
    
    • 1

    通过测试方法,像队列发送消息:

    // 2、 确认模式下发送
    /*
    * 确认模式:当消息发送给Exchange后,返回给发送者的结果(仅仅判断是否成功发送到Exchange,而不管Exchange是否成功路由到Queue)
    * 1、在 yam文件中设置    publisher-confirm-type: correlated
    * 2、在 rabbitTemplate 中定义 confirmCallBack
    * */
    
    @Test
    public void testConfirm(){
       // 在确认模式下,定义回调
       rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
           /*
           * correlationData: 相关配置信息
           * b: 交换机是否成功收到了消息。
           * s: 失败原因
           * */
           @Override
           public void confirm(CorrelationData correlationData, boolean b, String s) {
               System.out.println("confirm方法被执行了.....");
               if(b) {
                   System.out.println("接受成功消息" + s);
               } else {
                   System.out.println("接收失败消息" + s);
               }
           }
       });
       rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME_CONFIRM,"confirm.aaa.bb","boot mq confirm~~~");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    (2)第二对还开启回退模式

    在生产者的yam文件中添加:

    publisher-returns: true # 开启回退模式
    
    • 1

    通过测试方法,向队列发送消息:

    // 3、回退模式下发送
    /*
    * 回退模式:处理成功被Exchange接收,而路由到Queue失败的消息
    * 当消息发送给Exchange后,Exchange路由到Queue失败时,才会执行ReturnCallBack
    * 步骤:
    *  1、开启回退模式,在yam文件中     publisher-returns: true
    *  2、设置Exchange处理消息的模式
    *    1、如果消息没有路由到Queue,则丢弃消息(默认)
    *    2、如果消息没有路由到Queue,返回给消息发送方(在ReturnCallBack中写)
    * 3、如果处理消息模式为处理失败消息,定义 ReturnCallBack
    * */
    @Test
    public void testReturn(){
    
        // 设置交换机处理失败消息的模式
        rabbitTemplate.setMandatory(true);
    
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                System.out.println(returnedMessage.getMessage());
                System.out.println(returnedMessage.getReplyCode());
                System.out.println(returnedMessage.getReplyText());
                System.out.println(returnedMessage.getExchange());
                System.out.println(returnedMessage.getRoutingKey());
            }
        });
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME_CONFIRM,"confirm","boot mq confirm~~~");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    14.3 Consumer Ack

    在这里插入图片描述
    生产端用的还是第二对的交换机&队列
    消费端yam文件增加配置:

    listener:
      simple:
        acknowledge-mode: manual # consumer ack 手动确认
    
    • 1
    • 2
    • 3

    还是上述测试方法的方式发送消息。
    发送过之后,在consumer端取消息,这里在取消息的时候故意制造错误,会发现生产端不停重发

    package com.liu.listener;
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    
    /**
     * @ClassName: ConsumerAckListener
     * @Author: liumenghao
     * @Description:
     * @Date: 2022/11/18 15:58
     */
    @Component
    public class ConsumerAckListener {
        /*
         * consumer ACK确认机制
         * 1、默认情况下是自动签收,收到就返回,不管后续处理如何
         * 2、手动签收:yam文件 配置 listener.simple.acknowledge-mode: manual
         * 3、还是通过RabbitListener注解
         * 4、如果消息处理成功,调用channel的 basicAck() 签收
         * 5、如果消息处理失败,调用channel的 basicNack() 拒绝签收,第三方重新发送给consumer
         * */
        @RabbitListener(queues = "boot_queue_confirm")
        // @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag 将请求头AmqpHeaders中的DELIVERY_TAG参数 映射到 deliveryTag
        public void ListenerACKQueue(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws Exception{
    
    //        Thread.sleep(1000); // 延迟1s看的更清楚
            try {
                // 1、接受转换消息
                System.out.println("ack: " + new String(message.getBody()));
    
                // 2、处理业务逻辑
                System.out.println("处理业务逻辑...");
    
                // 设置一个错误,会看到第三方会无线发消息
    //            int i = 3/0 ;
                // 3、手动签收
                channel.basicAck(deliveryTag,true);
    
            }catch (Exception e){
    
                // 4、一旦出现异常就拒绝签收
                // 第三个参数含义:设为true,消息重新返回到queue, 第三方会重新发送该消息给consumer
                channel.basicNack(deliveryTag, true, true);
            }
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    14.4 消费端限流

    所有请求都先发送到MQ,消费端自己定义每次处理多少请求。避免了突然崩溃的情况。

    在这里插入图片描述

    通过测试方法向队列发送消息:

    // 4、发送多条消息,测试 consumer 限流
    @Test
    public void testSendMany() {
        for(int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME_CONFIRM,"confirm","boot mq confirm~~~");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    在消费者的yam文件中增加如下配置:

    prefetch: 1 # 限流量,consumer 每次从mq拉取的消息数,直到手动确认完毕后,才会继续拉取下一条消息
    
    • 1

    消费者获取消息代码:

    package com.liu.listener;
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    
    /**
     * @ClassName: QosListener
     * @Author: liumenghao
     * @Description:
     * @Date: 2022/11/18 16:01
     */
    
    /*
    * Consumer 限流机制
    * 1、确保consumer ack机制为手动确认
    * 2、在 yam 文件中配置 listener.simple.prefetch
    *   prefetch = 1,表示consumer 每次从mq拉取一条消息来消费,直到手动确认消费完毕后才会继续拉取下一条消息。
    * */
    @Component
    public class QosListener {
    
        @RabbitListener(queues = "boot_queue_confirm")
        public void qosListener(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws Exception{
            Thread.sleep(1000); // 每次处理后,休息1s,看的更清楚
            // 获取消息
            System.out.println("qos: " + new String(message.getBody()));
            // 处理业务逻辑
    
            // 签收. 如果把这行注释,则控制台只会打印一条消息,因为每次接收一个,没签收之前不会去接收下一个
            channel.basicAck(deliveryTag, true);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    消费端的确认模式一定要为手动确认!

    14.5 TTL过期

    第三对交换机&队列 设置了过期时间。在队列中的消息一段时间没被取出就过期。
    在这里插入图片描述
    对队列整体设置过期时间:
    在这里插入图片描述
    测试代码(可以为单个发送的消息设置过期时间)

    // 5、发送多条消息,测试 过期时间
     /*
     * TTL: 过期时间
     *   1、队列统一过期
     *
     *   2、消息单独过期
     *
     *  如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
     * */
     @Test
     public void testTtl() {
    
         // 单个消息过期时间的设置
         MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
             @Override
             public Message postProcessMessage(Message message) throws AmqpException {
                 // 1、设置message的信息
                 message.getMessageProperties().setExpiration("5000"); // 消息过期时间为5s
                 // 2、返回该消息
                 return message;
             }
         };
    
    
         for(int i = 0; i < 10; i++) {
             rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME_TTL,"ttl.hehe","boot mq ttl~~~", messagePostProcessor);
         }
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    14.6 死信队列

    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    测试代码向队列发送消息:
    在三种方式下可以看到,到达指定时间、或超出队列长度、或消息被拒收都会转移到死信队列中

    // 6、测试死信队列
    @Test
    public void testDlx() {
        // 1、超出过期时间从而转入死信队列测试
    //        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME_TTL,"ttl.hehe","boot mq dlx~~~");
        // 2、超出队列长度从而转入死信队列测试
    //        for(int i=1 ;i<=20 ;i++) {
    //            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME_TTL,"ttl.hehe","boot mq dlx~~~");
    //        }
        // 3、测试消费者拒收消息从而转入死信队列测试
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME_TTL,"ttl.hehe","boot mq dlx~~~");
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    小结:

    在这里插入图片描述

    14.7 延迟队列

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

  • 相关阅读:
    嵌入式分享合集25
    【Spring篇】AOP案例
    【notion enhancer安装】一个强大的笔记软件,可以实现侧边目录的notion
    Redis教程(二十二):Redis的过期删除和缓存淘汰策略
    三大排序(插入排序,选择排序,冒泡排序)
    感谢信 | 企企通赋能鲜丰水果搭建特色数字化供应链协同系统,领跑中国水果连锁品牌
    1.7 生成模型与判别模型
    Smartbi数据模型以自助为理念,带来“敏捷建模”的新思路
    windows 10中R的环境变量配置
    【学习笔记】倍增 + 二分
  • 原文地址:https://blog.csdn.net/henulmh/article/details/127927945