• rabbitmq跟着b站尚硅谷老师学习


    百度云文档地址:

    链接:https://pan.baidu.com/s/1gwKHzMiwUB-hw9ZFIJ9SxQ
    提取码:y07h

    gitee基本代码地址:

    https://gitee.com/liushanshan126/rabbitmq-test

    gitee整合springboot代码地址:

    https://gitee.com/liushanshan126/rabbitmq-springboot-test

    一、安装,查看文档

    1.1、启动服务命令 /sbin/service rabbitmq-server start

    /sbin/service rabbitmq-server start

    1.2、开启web管理插件rabbitmq-plugins enable rabbitmq_management

    rabbitmq-plugins enable rabbitmq_management

    在这里插入图片描述

    1.3、添加新用户

    在这里插入图片描述

    1.4、重置命令(关闭,去除,重启rabbtimq命令)

    在这里插入图片描述

    二、rabbitmq的6种设置模式和名词介绍

    2.1、rabbitmq的6种设置模式

    在这里插入图片描述

    2.2、名词介绍

    在这里插入图片描述
    在这里插入图片描述

    三、代码实现

    3.1 、work queues 工作队列

    3.1.1、发送者
    package com.bear.简单模式;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * <简述>  简单模式测试
     * <详细描述>
     *
     * @author LiuShanshan
     * @version $Id$
     */
    public class HelloWorldTest {
        private final static String QUEUE_NAME = "chijiuhua";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //创建一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("120.48.77.231");
            factory.setUsername("admin");
            factory.setPassword("123");
            //channel实现了自动close接口自动关闭不需要显示关闭
            Connection connection = factory.newConnection();
            Channel channel =  connection.createChannel();
            /***生成一个队列
             * *1.队列名称*
             * 2.队列里面的消息是否 持久化 默认消息存储在内存中
             * *3.该队列是否只供一个消费者进行消费是否进行共享   false可以多个消费者消费
             * 注意:exclusive:有两个作用,一:当连接关闭时connection.close()该队列是否会自动删除;二:该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,
             * 没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常。
             * *4.是否自动删除最后一个消费者端开连接以后该队列是否自动删除  true自动删除
             * *5.其他参数*/
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            String message="helloworld测试";
            /***发送一个消息
             * *1.发送到那个交换机
             * *2.路由的key是哪个
             * *3.其他的参数信息
             * *4.发送消息的消息体*/
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("消息发送完毕");
            // 关闭连接
    //        connection.close();
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    3.1.2、消费者
    package com.bear.简单模式;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**   测试多个消费者消费一个队列
     * 测试结果:多个消费者可以消费同一个队列,但是队列里面的数据只能给一个消费者,多个消费者得进行争抢
     * <简述>
     * <详细描述>
     *
     * @author LiuShanshan
     * @version $Id$
     */
    public class Consumer2 {
        private final static String QUEUE_NAME="hello3";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //创建一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("120.48.77.231");
            factory.setUsername("admin");
            factory.setPassword("123");
            //channel实现了自动close接口自动关闭不需要显示关闭
            Connection connection = factory.newConnection();
            Channel channel =  connection.createChannel();
            System.out.println("等待接收消息.........");
            // 成功回调的函数式接口
            DeliverCallback deliverCallback = (consumerTag, delivery)->{
                String message = new String(delivery.getBody());
                System.out.println("获取的队列里面的参数为:" + message);
            } ;
            // 失败的函数式接口
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println("失败的参数consumerTag:" + consumerTag);
            };
            /***消费者消费消息
             * *1.消费哪个队列
             * *2.消费成功之后是否要自动应答true代表自动应答false手动应答
             * *3.消费者成功消费的回调
             * 4.消费者失败的回调*/
            channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    3.1.3、应答模式—针对消费者(设置消费者中autoAck为false,在成功回调函数中加入channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);)

    注意:channel.basicQos(2); 代表一次可以有多个消息进来等待被消费,叫做不公平分发,这里是2。2可以换成其他的数字

    在这里插入图片描述

    3.1.4、持久化
    3.1.4.1、概念(保证队列和消息是持久化的)

    在这里插入图片描述

    3.1.4.2、队列持久化操作(boolean durable = true)

    在这里插入图片描述
    在这里插入图片描述

    3.1.4.3、消息持久化操作MessageProperties.PERSISTENT_TEXT_PLAIN

    在这里插入图片描述

    3.1.5、不公平分发(channel.basicQos(数字):代表一次可以有多个消息进来等待被消费,括号里面可以填任意数字)

    在这里插入图片描述
    在这里插入图片描述

    3.2.6、预期值 (指定队列消费多少条消息,固定的)

    3.2、发布确认(发送者发送消息到队列上去,队列如果真的收到了会告诉发送者一声收到了;应答模式是队列和消费者之间的应答,发布确认是发送者和队列之间的应答)-

    3.2.1、开启方式

    在这里插入图片描述

    3.2.2、单个发布确认(速度慢,1秒/不超过数百个请求)

    在这里插入图片描述

    3.2.3、批量发布确认

    上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

    3.2.4、异步发送(性价比最高)–调用channel中的addConfirmListener方法

    在这里插入图片描述

    
            Channel channel = RabbitMqUtil.getConnection().createChannel();
            String queueName= UUID.randomUUID().toString();
            channel.queueDeclare(queueName,false,false,false,null);
            //开启发布确认
            channel.confirmSelect();
            // 开启异步方式
            channel.addConfirmListener((deliveryTag, multiple) -> {
                System.out.println("成功处理:" + deliveryTag);
            }, (deliveryTag, multiple) -> {
                System.out.println("未成功处理的数据:" + deliveryTag);
            });
            long begin=System.currentTimeMillis();
            for(int i = 0; i< 1000 ;i ++){
                String message = i + "";
                channel.basicPublish("",queueName,null,message.getBytes());
            }
            long end=System.currentTimeMillis();
            System.out.println("发布" + 10+ "个单独确认消息,耗时" + (end-begin) +"ms"); //耗时290ms
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    3.2.4.1、处理异步未处理信息(使用ConcurrentSkipListMap线程安全的map)

    在这里插入图片描述

    package com.bear.发布确认.异步发布确认;
    
    import com.bear.utils.RabbitMqUtil;
    import com.rabbitmq.client.Channel;
    import jdk.management.resource.internal.inst.SocketOutputStreamRMHooks;
    
    import java.util.UUID;
    import java.util.concurrent.ConcurrentNavigableMap;
    import java.util.concurrent.ConcurrentSkipListMap;
    
    /**  异步发布确认       最常用的发布确认方式
     * <简述>
     * <详细描述>
     *
     * @author LiuShanshan
     * @version $Id$
     */
    public class AsynConfirm {
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtil.getConnection().createChannel();
            String queueName= UUID.randomUUID().toString();
            channel.queueDeclare(queueName,false,false,false,null);
            //开启发布确认
            channel.confirmSelect();
            // 线程安全的哈希表保存信道中的标识+传输数据
            ConcurrentSkipListMap<Long,String> outstandingConfirms=new ConcurrentSkipListMap<>();
    
            // 开启异步方式
            channel.addConfirmListener((deliveryTag, multiple) -> {
                // 批量回调和单个回调
                if(!multiple){
                    System.out.println("单个数据回调:" + deliveryTag);
                    outstandingConfirms.remove(deliveryTag);
                }else {
                    System.out.println("多个数据回调:" + deliveryTag);
                    ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap =
                            outstandingConfirms.headMap(deliveryTag);
                    longStringConcurrentNavigableMap.clear();
                }
                System.out.println("成功处理:" + deliveryTag);
            }, (deliveryTag, multiple) -> {
                String message = outstandingConfirms.get(deliveryTag);
                System.out.println("未成功处理的发送的数据为:" + message);
                System.out.println("未成功处理的数据:" + deliveryTag);
            });
            long begin=System.currentTimeMillis();
            for(int i = 0; i< 1000 ;i ++){
                String message = i + "";
                // 将信道中的数据的标识 和 放入的数据放入线程安全的map中
                /***channel.getNextPublishSeqNo()获取下一个消息的序列号 *通过序列号与消息体进行一个关联 *全部都是未确认的消息体 */
                outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
                channel.basicPublish("",queueName,null,message.getBytes());
    
            }
            long end=System.currentTimeMillis();
            System.out.println("发布" + 10+ "个单独确认消息,耗时" + (end-begin) +"ms"); //耗时290ms
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    3.3、交换机 exchange(下面包含:fannout模式;direct模式;topic模式)

    3.3.1、发布订阅模式使用到了交换机

    在这里插入图片描述

    3.3.2、exchange的类型(direct、topic、headers、fanout)

    在这里插入图片描述

    3.3.3、临时队列(一旦我们断开了消费者的连接,队列将被自动删除。)

    在这里插入图片描述

    3.3.4、绑定(有交换机参与之后,生产者将消息发送给交换机,根据routingkey分配给对应的队列)
    3.3.5、fanout(广播模式)–消费者中定义临时队列,临时队列和交换机绑定;在生产者中设置交换机名称和类型,不定义队列。

    消费者代码:

    public static void main(String[] args) throws Exception {
            Connection connection = RabbitMqUtil.getConnection();
            Channel channel = connection.createChannel();
            /***生成一个临时的队列队列的名称是随机的 *当消费者断开和该队列的连接时队列自动删除 */
            String queueName=channel.queueDeclare().getQueue();
            //把该临时队列绑定我们的exchange其中routingkey(也称之为bindingkey)为空字符串
            channel.queueBind(queueName, "fanout-test", "fanout");
            System.out.println("等待接收消息,把接收到的消息打印在屏幕...........");
            // 成功回调的函数式接口
            DeliverCallback deliverCallback = (consumerTag, delivery)->{
                System.out.println("成功消费,消费的数据为:" + new String(delivery.getBody()));
            } ;
            // 获取队列中待消费的数据
            channel.basicConsume(queueName, true, deliverCallback, (consumerTag) -> {});
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    生产者代码:

    public static void main(String[] args) throws Exception {
            Connection connection = RabbitMqUtil.getConnection();
            Channel channel = connection.createChannel();
            /***声明一个exchange*1.exchange的名称 *2.exchange的类型 */
            channel.exchangeDeclare("fanout-test", "fanout");
            Scanner sc = new Scanner(System.in);
            System.out.println("请输入信息");
            while(sc.hasNext()){
                String message=sc.nextLine();
                channel.basicPublish("fanout-test", "",null, message.getBytes("UTF-8"));
                System.out.println("生产者发出消息" + message);}
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    3.3.6、Direct exchange(定向交换机模式)

    在这里插入图片描述

    3.3.6.1、消费者(定义队列,定义交换机类型,定义绑定的routingkey)

    代码:

    public static void main(String[] args) throws Exception {
            Connection connection = RabbitMqUtil.getConnection();
            Channel channel = connection.createChannel();
            //交换机定义
            channel.exchangeDeclare("direct-test", BuiltinExchangeType.DIRECT);
            // 队列名称
            String queueName="test1";
            // 队列定义
            channel.queueDeclare(queueName, false, false, false, null);
            // 队列绑定交换机,设置routingkey
            channel.queueBind(queueName, "direct-test","error");
            channel.queueBind(queueName, "direct-test","warning");
    
            System.out.println("等待接收消息,把接收到的消息打印在屏幕...........");
            // 成功回调的函数式接口
            DeliverCallback deliverCallback = (consumerTag, delivery)->{
                System.out.println("成功消费,消费的数据为:" + new String(delivery.getBody()));
            } ;
            // 获取队列中待消费的数据
            channel.basicConsume(queueName, true, deliverCallback, (consumerTag) -> {});
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    3.3.6.2、生产者(定义交换机类型,发送数据的时候,写入routingkey)
     public static void main(String[] args) throws Exception {
            Connection connection = RabbitMqUtil.getConnection();
            Channel channel = connection.createChannel();
            /***声明一个exchange*1.exchange的名称 *2.exchange的类型 */
            channel.exchangeDeclare("direct-test", BuiltinExchangeType.DIRECT);
            Scanner sc = new Scanner(System.in);
            System.out.println("请输入信息");
            while(sc.hasNext()){
                String message=sc.nextLine();
                channel.basicPublish("direct-test", "error",null, message.getBytes("UTF-8"));
                System.out.println("生产者发出消息" + message);}
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    3.3.7、topics(通配符模式)
    3.3.7.1、topic的要求

    在这里插入图片描述

    3.3.7.2、匹配案例

    在这里插入图片描述

    3.3.7.3、注意事项

    在这里插入图片描述

    3.3.7.4、消费者(定义队列,定义交换机类型为topic,定义绑定的routingkey,这里的routingkey可以使用*和#)--------跟之前的redict的区别是:更改了交换机的类型为:BuiltinExchangeType.TOPIC;修改了routinkey,里面可以有 * (一个单词)或者#(0或者无数单词)

    在这里插入图片描述

    3.3.7.5、生产者(定义交换机类型为topic,定义发送的routingkey,这里的routingkey不包含#和*)-------跟之前的redict的区别是:更改了交换机的类型为:BuiltinExchangeType.TOPIC

    在这里插入图片描述

    3.4、死信队列

    3.4.1、概念

    queue中的某些消息无法被消费,然后消息就会放入死信队列中

    3.5.2、哪些情况会用到死信队列

    在这里插入图片描述

    3.5.3、死信队列架构图

    在这里插入图片描述

    3.5.4、产生死信队列的3种情况
    3.5.5、消息有效期过期(消息TTL过期–消息存活时间过期)
    3.5.5.1、消费方(正常的交换机+队列;死信的交换机+队列;设置正常的队列的时候就绑定死信交换机信息)
    Connection connection = RabbitMqUtil.getConnection();
            Channel channel = connection.createChannel();
            //声明死信和普通交换机类型为direct
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare(DEAL_EXCHANGE, BuiltinExchangeType.DIRECT);
    
            //声明死信队列
            String deadQueue = "dead-queue1";
            // 队列定义
            channel.queueDeclare(deadQueue,false,false,false,null);
            //死信队列绑定死信交换机与routingkey
            channel.queueBind(deadQueue, DEAL_EXCHANGE,"lisi");
    
            // 正常队列
            //正常队列绑定死信队列信息
            Map<String,Object> params=new HashMap<>();
            //正常队列设置死信交换机参数key是固定值
            params.put("x-dead-letter-exchange",DEAL_EXCHANGE);
            //正常队列设置死信routing-key参数key是固定值
            params.put("x-dead-letter-routing-key","lisi");
    
    
            //声明正常队列
            String deadNormalQueue = "dead-normal-queue";
            // 队列定义  ---如果有死信,则设置arguments里面的值
            channel.queueDeclare(deadNormalQueue,false,false,false,params);
            //正常队列与交换机绑定routingkey
            channel.queueBind(deadNormalQueue, NORMAL_EXCHANGE,"zhangsan");
    
            System.out.println("等待接收消息,把接收到的消息打印在屏幕...........");
            // 成功回调的函数式接口
            DeliverCallback deliverCallback = (consumerTag, delivery)->{
                System.out.println("成功消费,消费的数据为:" + new String(delivery.getBody()));
            } ;
            // 获取队列中待消费的数据
            channel.basicConsume(deadNormalQueue, true, deliverCallback, (consumerTag) -> {});
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    重点信息红框标出来的图片:
    在这里插入图片描述

    3.5.5.2、生产方(跟之前的topic、redicet的代码没有区别)

    在这里插入图片描述

    3.5.5.3、死信队列消费方(跟普通的消费方没啥区别,注意的是交换机名称和队列名称都是死信的就行了)

    在这里插入图片描述

    3.5.6、队列达到最大长度(跟3.5.5消息TTL过期的区别在于,生产者里面的TTL去掉,在消费者里面的params里面设置队列的最大长度);
    3.5.6.1、消费者关键图片截图

    在这里插入图片描述

    3.5.6.2、生产者去掉ttl代码,其他的跟direct,topic的代码一样
    3.5.7、消息被拒(生产者跟之前的没有任何区别,消费者需要手动应答,并且在成功回调里面拒绝签收,并且不将数据重新入队,那么这条数据就会放入死信队列中)

    消费者重要截图:
    在这里插入图片描述

    3.5、延迟队列(是死信队列中的消息TTL过期)

    3.5.1、应用场景

    在这里插入图片描述

    3.5.2、rabbitmq中的TTL(存活时间)

    在这里插入图片描述

    3.5.3、整合springboot(队列、消息ttl和组件把交换机弄成ttl)
    3.5.3.0、pom和peroties

    pom:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>2.7.3</version>
    		<relativePath/> <!-- lookup parent from repository -->
    	</parent>
    	<groupId>com.bear</groupId>
    	<artifactId>springboot-rabbitmq</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<name>springboot-rabbitmq</name>
    	<description>Demo project for Spring Boot</description>
    	<properties>
    		<java.version>1.8</java.version>
    	</properties>
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter</artifactId>
    		</dependency>
    
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
    
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    
    		<!--RabbitMQ依赖-->
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-amqp</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.projectlombok</groupId>
    			<artifactId>lombok</artifactId>
    		</dependency>
    		<!--swagger-->
    		<dependency>
    			<groupId>io.springfox</groupId>
    			<artifactId>springfox-swagger2</artifactId>
    			<version>2.9.2</version>
    		</dependency>
    		<dependency>
    			<groupId>io.springfox</groupId>
    			<artifactId>springfox-swagger-ui</artifactId>
    			<version>2.9.2</version>
    		</dependency>
    		<!--RabbitMQ测试依赖-->
    		<dependency><groupId>org.springframework.amqp</groupId>
    			<artifactId>spring-rabbit-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    		<dependency>
    			<groupId>cn.hutool</groupId>
    			<artifactId>hutool-all</artifactId>
    			<version>5.7.14</version>
    		</dependency>
    
    	</dependencies>
    
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
    
    </project>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79

    properties:

    spring.rabbitmq.host=120.48.77.231
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123
    
    • 1
    • 2
    • 3
    • 4
    3.5.3.1、定义队列,定义交换机,定义绑定关系的的配置类(重要)-----可以设置队列的ttl过期时间
    package com.bear.configure;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * <简述>  定义队列,定义交换机,定义队列和交换机的关系的配置类
     * <详细描述>
     *
     * @author LiuShanshan
     * @version $Id$
     */
    @Configuration
    public class TtlQueueConfig {
        // 正常的1个交换机和2个队列
        public static final String X_EXCHANGE = "X";
        public static final String QUEUE_A = "QA";
        public static final String QUEUE_B="QB";
        // 没有ttl的队列
        public static final String QUEUE_C="QC";
    
        // 死信交换机和队列
        public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
        public static final String DEAD_LETTER_QUEUE="QD";
    
        // 声明direct交换机  X
        @Bean
        public DirectExchange xExchange(){
            // 广播交换机
    //        FanoutExchange fanoutExchange = new FanoutExchange(X_EXCHANGE);
            // topic交换机
    //        TopicExchange topicExchange = new TopicExchange(X_EXCHANGE);
    
            return new DirectExchange(X_EXCHANGE);
        }
    
        // 声明direct交换机   Y
        @Bean
        public DirectExchange yExchange(){
            return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
        }
    
        // 声明队列 QA ,ttl时间为10s
        @Bean
        public Queue queueA(){
            Map<String,Object> args=new HashMap<>(3);
            //声明当前队列绑定的死信交换机
            args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
            //声明当前队列的死信路由key
            args.put("x-dead-letter-routing-key","YD");
            //声明队列的TTL
            args.put("x-message-ttl",10000);
            return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
        }
        // 声明队列A绑定X交换机
        @Bean
        public Binding queueaBindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange")DirectExchange xExchange){
            return BindingBuilder.bind(queueA).to(xExchange).with("XA");
        }
    
        // 声明队列 QB , ttl时间为40s
        @Bean
        public Queue queueB(){
            Map<String,Object> args=new HashMap<>(3);
            //声明当前队列绑定的死信交换机
            args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
            //声明当前队列的死信路由key
            args.put("x-dead-letter-routing-key","YD");
            //声明队列的TTL
            args.put("x-message-ttl",40000);
            return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
        }
    
        // 声明队列B绑定X交换机
        @Bean
        public Binding queueaBindingX2(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange")DirectExchange xExchange){
            return BindingBuilder.bind(queueB).to(xExchange).with("XB");
        }
    
        // 声明队列 QC , 没有ttl
        @Bean
        public Queue queueC(){
            Map<String,Object> args=new HashMap<>();
            //声明当前队列绑定的死信交换机
            args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
            //声明当前队列的死信路由key
            args.put("x-dead-letter-routing-key","YD");
            //不声明队列的TTL
            return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
        }
    
        // 声明队列C绑定X交换机
        @Bean
        public Binding queueaBindingX3(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange")DirectExchange xExchange){
            return BindingBuilder.bind(queueC).to(xExchange).with("XC");
        }
    
    
        //声明死信队列QD
        @Bean("queueD")
        public Queue queueD(){
            return new Queue(DEAD_LETTER_QUEUE);
        }
    
    
        //声明死信队列QD绑定关系
        @Bean
        public Binding deadLetterBindingQAD(@Qualifier("queueD")Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){
            return BindingBuilder.bind(queueD).to(yExchange).with("YD");
        }
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    3.5.3.2、消费者监听类
    package com.bear.configure;
    
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.Date;
    
    /**
     * <简述> 死信队列接收消息的地方
     * <详细描述>
     *
     * @author LiuShanshan
     * @version $Id$
     */
    @Slf4j
    @Component
    public class DeadLetterQueueConsumer {
    
        @RabbitListener(queues="QD")
        public void receiveD(Message message, Channel channel) throws IOException {
            String msg=new String(message.getBody());
            log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    3.5.3.3、生产者发送数据的类(可以在这里定义消息的ttl时间,但是有个问题:因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。)
    package com.bear.controller;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.Date;
    
    /**
     * <简述> 生产者生产信息
     * <详细描述>
     *
     * @author LiuShanshan
     * @version $Id$
     */
    @Slf4j
    @RequestMapping("ttl")
    @RestController
    public class TtlController {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("sendMsg/{message}")
        public void sendMsg(@PathVariable String message){
            log.info("当前时间:{},发送一条信息给两个 TTL队列:{}", new Date(),message);
            rabbitTemplate.convertAndSend("X","XA","消息来自 ttl为 10S的队列: "+message);
            rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl为 40S的队列: "+message);
        }
    
        @GetMapping("sendMsgTtl/{message}/{ttlTime}")
        public void sendMsgTtl(@PathVariable("message") String message, @PathVariable("ttlTime") String ttlTime){
            log.info("当前时间:{},发送一条设置了ttl的信息给队列:{}", new Date(),message);
            rabbitTemplate.convertAndSend("X","XC","消息来自 ttl为" + ttlTime +"S的队列: "+message, correlationData ->{
                // 设置信息的过期时间
                correlationData.getMessageProperties().setExpiration(ttlTime);
                return  correlationData;});
            log.info("当前时间:{},发送一条时长{}毫秒 TTL信息给队列 C:{}", new Date(),ttlTime, message);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    3.5.3.4、生产者那里设置消息ttl时间的注意事项:

    因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
    在这里插入图片描述

    3.5.3.4、*** 组件将交换机弄成ttl,解决队列和消息ttl弊端的问题

    代码跟上面的没有啥不同,改的有,修改交换机的类型:
    在这里插入图片描述
    在发送消息的时候修改:
    在这里插入图片描述

    其他的都一样

    3.6、发布确认-高级(处理交换机或者队列挂掉,消息无法发送出去的情况)

    3.6.1、确认交换机是否收到消息回调:当消息发送给交换机之后,交换机不管有没有,收没收到消息都会调用回调方法
    3.6.1.1、定义交换机、队列、绑定交换机,队列代码(普通的交换机和队列)

    在这里插入图片描述

    3.6.1.2、定义回调类(有三步,需要按照步骤走,如下图:)

    在这里插入图片描述

    3.6.1.3、生产者(使用对象CorrelationData,放入发送的对象里面)

    在这里插入图片描述

    3.6.1.4、消费者(跟之前的消费者代码没啥区别)

    在这里插入图片描述

    3.6.2、确认队列是否收到消息回调(回退消息):
    3.6.2.1、回调类

    增加了:
    在这里插入图片描述
    在这里插入图片描述

    3.6.2.2、结果

    在这里插入图片描述

    3.6.2.1、总结

    其他的跟上面的代码一样,这样就可以处理交换机是否收到消息和队列是否收到消息,双重处理.

    3.6.3、备份交换机(原交换机无法被路由的消息,就会都进入这个备份交换机,备份交换机(广播模式)分发给队列)
    3.6.4、重要代码(配置类)

    声明正常队列时绑定备份交换机:
    在这里插入图片描述
    声明备份交换机为fanout模式,然后绑定2个交换机:
    在这里插入图片描述
    其他的跟上面的3.6.2没有区别

    3.6.3.2、结果分析

    在这里插入图片描述

    四、rabbitmq其他知识点

    4.1、幂等性(重复消费的问题)

    使用redis中的原子性来处理
    在这里插入图片描述

    4.2、优先级队列(需要队列是优先级的,然后设置发送的时候消息的优先级,越大优先级越大)

    在这里插入图片描述
    在这里插入图片描述

    4.3、惰性队列

    使用场景:
    在这里插入图片描述
    如何使用:
    在这里插入图片描述

    五、搭建集群(后期要用的时候搭建)

  • 相关阅读:
    for in 和 for of 区别
    OpenCascade插件化三维算法研究平台
    初识Rasp——Openrasp代码分析
    java版工程管理系统Spring Cloud+Spring Boot+Mybatis实现工程管理系统源码
    五,修改.hdr插件
    JS高级 之 深拷贝 && 浅拷贝
    Hello World!1分钟配置好你的Go环境
    【C++】vector的模拟实现
    新一代开源免费的轻量级 SSH 终端,不要太好用
    51 单片机 data idata xdata 使用
  • 原文地址:https://blog.csdn.net/M1275601161/article/details/126467658