• RabbitMQ从入门到精通之安装、通讯方式详解



    RabbitMQ

    一、RabbitMQ介绍

    1.1 现存问题

      • 服务异步调用: 服务A如何保证异步请求一定能被服务B接收到并处理
        在这里插入图片描述
      • 削峰: 海量请求,如何实现削峰的效果,将请求全部放到一个队列中,慢慢的消费,这个队列怎么实现?

    在这里插入图片描述

      • 服务解耦: 如何尽量的降低服务之间的耦合问题,如果在订单与积分和商家服务解构,需要一个队列,而这个队列依然需要实现上述两个情况功能。

    在这里插入图片描述

    一、RabbitMQ介绍

    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

    AMQP协议:
    在这里插入图片描述

    Erlang:

    Erlang在1991年由爱立信公司向用户推出了第一个版本,经过不断的改进完善和发展,在1996年爱立信又为所有的Erlang用户提供了一个非常实用且稳定的OTP软件库并在1998年发布了第一个开源版本。Erlang同时支持的操作系统有linux,windows,unix等,可以说适用于主流的操作系统上,尤其是它支持多核的特性非常适合多核CPU,而分布式特性也可以很好融合各种分布式集群。

    二、RabbitMQ安装

    docker-compose.yml

    version: “3.1”
    services:
    rabbitmq:
    image: daocloud.io/library/rabbitmq:3.8.5
    container_name: rabbitmq
    restart: always
    volumes:
    - ./data/:/var/lib/rabbitmq/
    ports:
    - 5672:5672
    - 15672:15672

    在这里插入图片描述
    在这里插入图片描述

    docker-compose.yml文件内容:
    在这里插入图片描述

    镜像拉取完成后,直接在linux 内部执行: curl localhost:5672

    在这里插入图片描述
    执行后能够显示AMQP 字样的内容就说明执行成功了

    执行 docker exec -it rabbitmq bash 命令 进入容器内部
    cd opt/rabbitmq/ 目录下
    在这里插入图片描述
    执行cd plugins/sbin 命令进入目录下, 执行 ./rabbitmq-plugins enable rabbitmq_managent命令 启动rabbitmq 图形化界面
    在这里插入图片描述
    访问15672 端口,默认的账号密码是guest/guest
    在这里插入图片描述

    三、RabbitMQ架构

    在这里插入图片描述

    四、RabbitMQ通信方式

    4.1 RabbitMQ提供的通讯方式

    • Hello World :为了入门操作
    • Work queues : 一个队列被多个消费者消费
    • Publish/Subscribe:手动创建Exchange(FANOUT)
    • Routing: 手动创建Exchange(DIRECT)
    • Topics : 手动创建Exchange(TOPIC)
    • RPC: RPC方式
    • Publisher Confirms

    4.2 Helloworld 方式

    在这里插入图片描述

    //工具类
    public class RabbitMQConnectionUtil {
        public static final String RABBITMQ_HOST ="172.16.177.133";
        public static final int RABBITMQ_POST =5672;
        public static final String RABBITMQ_USERNAME ="guest";
        public static final String RABBITMQ_PASSWORD ="guest";
        public static final String RABBITMQ_VIRTUAL_HOST ="/";
    
        /**
         * 构建RabbitMQ的连接对象
         * @return
         */
        public static Connection getConnection() throws Exception{
            //1.创建connection    工厂对象
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(RABBITMQ_HOST);
            connectionFactory.setPort(RABBITMQ_POST);
            connectionFactory.setUsername(RABBITMQ_USERNAME);
            connectionFactory.setPassword(RABBITMQ_PASSWORD);
            connectionFactory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);
            //2.设置RabbitMQ的连接信息
            Connection connection = connectionFactory.newConnection();
            //3. 返回连接对象
    
            return connection;
        }
    //生产者
     @Test
        public void consume() throws Exception {
            //1.获取连接对象
            Connection connection = RabbitMQConnectionUtil.getConnection();
            //2.创建Channel 通道
            Channel channel = connection.createChannel();
            //3.构建队列
            channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);
            //4. 监听消息
            DefaultConsumer callback =new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消息者获取消息"+new String(body,"UTF-8"));
                }
            };
            channel.basicConsume(Publisher.QUEUE_NAME,true,callback);
    
            System.out.println("开始监听");
            System.in.read();
    
        }
    
    //消费者
        @Test
        public void publish() throws Exception {
            //1.获取连接对象
            Connection connection = RabbitMQConnectionUtil.getConnection();
            //2.创建Channel 通道
            Channel channel = connection.createChannel();
            //3.构建队列
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //4. 发布消息
            String message="hello word!";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("消息发送成功");
            System.in.read();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    4.2Work queues

    在这里插入图片描述
    一个队列中的消息,只会被一个消费者成功的消费,默认情况下,RabbitMQ的队列会将消息以轮询的方式交给不同的消费者消费,消费者拿到消息后,需要给RabbitMQ一个ack,RabbitMQ认为消费者已经拿到消息了

    //消费者
     @Test
        public void consume() throws Exception {
            //1.获取连接对象
            Connection connection = RabbitMQConnectionUtil.getConnection();
            //2.创建Channel 通道
            Channel channel = connection.createChannel();
            //3.构建队列
            channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);
            //3.1 设置消息的控制,一次拿几个消息
    //#2        channel.basicQos(1);
            //4. 监听消息
            DefaultConsumer callback =new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    try {
                    	//模拟业务执行时间
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("消息者获取消息"+new String(body,"UTF-8"));
    //#1				channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            channel.basicConsume(Publisher.QUEUE_NAME,true,callback);
    //#1		channel.basicConsume(Publisher.QUEUE_NAME,false,callback);
            System.out.println("开始监听");
            System.in.read();
     //消费者2
     @Test
        public void consume2() throws Exception {
            //1.获取连接对象
            Connection connection = RabbitMQConnectionUtil.getConnection();
            //2.创建Channel 通道
            Channel channel = connection.createChannel();
            //3.构建队列
            channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);
            //3.1 设置消息的控制,一次拿几个消息
    //#2        channel.basicQos(1);
            //4. 监听消息 
            DefaultConsumer callback =new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    try {
                    	//模拟业务执行时间
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("消息者获取消息"+new String(body,"UTF-8"));
                    //basicAck(标识,是否批量操作)
    //#1				channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            channel.basicConsume(Publisher.QUEUE_NAME,true,callback);
    //#1		channel.basicConsume(Publisher.QUEUE_NAME,false,callback);
    
            System.out.println("开始监听");
            System.in.read();
    
        }
    //生产者
     public static final String QUEUE_NAME="work";
        @Test
        public void publish() throws Exception {
            //1.获取连接对象
            Connection connection = RabbitMQConnectionUtil.getConnection();
            //2.创建Channel 通道
            Channel channel = connection.createChannel();
            //3.构建队列
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //4. 发布消息
            for (int i=0;i<10;i++){
                String message="hello word!"+i;
                channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
             }
            System.out.println("消息发送成功");
            System.in.read();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80

    当两台消费者的消费能力不相同的时候,为了提高效率,就不能以轮询的方式进行分发,而是以消费者消费完成后手动传递ack 的方式进行下一个消息的分发,将== #1 #2 的代码 ==打开即可
    操作步骤:

    • 操作#1 让消费者关闭自动ack,并且设置消息的流控,最终实现消费者可以尽可能去多消费消息
    • 操作#2 设置每次拿几个消息

    4.3 Publish/Subscribe

    自行创建路由器,并绑定队列
    在这里插入图片描述
    如何构建一个自定义交换机,并指定类型是FANOUT,让交换机和多个Queue绑定到一起

    //生产者 
    public static final String EXCHANGE_NAME="pubsub";
        public static final String QUEUE_NAME1="pubsub-one";
        public static final String QUEUE_NAME2="pubsub-two";
        @Test
        public void pulish() throws Exception {
            //1.获取连接对象
            Connection connection = RabbitMQConnectionUtil.getConnection();
            //2. 构建Channal
            Channel channel = connection.createChannel();
            //3.构建虚拟机, exchange 交换机名称,type 交换机类型【枚举】
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            //4. 构建队列
            channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
            channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
            //5. 绑定 交换机 和 队列,使用的是FANOUT类型的交换机,绑定方式是直接绑定 ,routingkey 参数随便写什么都可以,
            channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");
            channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");
            //6.发消息到交换机
            channel.basicPublish(EXCHANGE_NAME,"",null,"publish/subscribe!".getBytes());
            System.out.println("消息成功发送");
            
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    4.4 Routing

    DIRECT 类型的交换机,在绑定Exchange和Queue时,需要指定好routingKey同时在发送消息的时候,也指定routingkey,只有在routingkey 一致时,才会把指定的消息路由到指定的队列
    在这里插入图片描述

    public static final String EXCHANGE_NAME="routing";
        public static final String QUEUE_NAME1="routing-one";
        public static final String QUEUE_NAME2="routing-two";
        @Test
        public void pulish() throws Exception {
            //1.获取连接对象
            Connection connection = RabbitMQConnectionUtil.getConnection();
            //2. 构建Channal
            Channel channel = connection.createChannel();
            //3.构建虚拟机, exchange 交换机名称,type 交换机类型【枚举】 * 交换机类型 改成 BuiltinExchangeType.DIRECT
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            //4. 构建队列
            channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
            channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
            //5. 绑定 交换机 和 队列,使用的是FANOUT类型的交换机,绑定方式是直接绑定 ,routingkey 参数随便写什么都可以,
            channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"ORANGE");
            channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"BLACK");
            channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"GREEN");
            //6.发消息到交换机
            channel.basicPublish(EXCHANGE_NAME,"ORANGE",null,"大橙子!".getBytes());
            channel.basicPublish(EXCHANGE_NAME,"BLACK",null,"黑布林!".getBytes());
            channel.basicPublish(EXCHANGE_NAME,"WHITE",null,"小白兔!".getBytes());
    
    
            System.out.println("消息成功发送");
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    4.5 Topics

    topics 模式支持模糊匹配RoutingKey,就像是sql中的 like子句模糊查询,而路由模式等同于sql中的where子句等值查询

    在这里插入图片描述

    通过模糊路由到队列。该方式的Routing key必须具有固定格式:以 . 间隔的一串单词,比如:quick.orange.rabbit,Routing key 最多不能超过255byte。

    交换机和队列的Binding key用通配符来表示,有两种语法:

    • * 可以替代一个单词;
    • # 可以替代 0 或多个单词;

    例如 #.com.#
    #可以表示0级或多级。xx.com、com.xx、com、xx.com.xx.xx、xx.xx.com.xx都可以

    例如 *.com. *
    *表示一级,xx.com.xx 可以 ,com.xx 不可以,前面缺少一级,xx.com.xx.xx不可以,com后面只能有一级xx,最终格式必须是 xx.com.xx

     public static final String EXCHANGE_NAME="topics";
        public static final String QUEUE_NAME1="topics-one";
        public static final String QUEUE_NAME2="topics-two";
        @Test
        public void pulish() throws Exception {
            //1.获取连接对象
            Connection connection = RabbitMQConnectionUtil.getConnection();
            //2. 构建Channal
            Channel channel = connection.createChannel();
            //3.构建虚拟机, exchange 交换机名称,type 交换机类型【枚举】 * 交换机类型 改成 BuiltinExchangeType.TOPIC
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            //4. 构建队列
            channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
            channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
            //5. 绑定 交换机 和 队列,
            // Topic类型的交换机,在和队列绑定时,需要以aaa.bbb.ccc 方式编写routingKey
            // 其中有两个特殊字符: *(相当于占位符),# (相当通配符)
            channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"*.orange.*");
            channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"*.*.rabbit");
            channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"lazy.#");
            //6.发消息到交换机
            channel.basicPublish(EXCHANGE_NAME,"big.orange.rabbit",null,"大橙子兔子!".getBytes());//匹配1、2
            channel.basicPublish(EXCHANGE_NAME,"small.while.rabbit",null,"小兔子!".getBytes());//匹配1、2
            channel.basicPublish(EXCHANGE_NAME,"lazy.dog.dog.dog.dog",null,"懒狗狗狗狗!".getBytes());//匹配 3
    
    
            System.out.println("消息成功发送");
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    4.6 RPC (了解)

    因为两个服务在交互时,可以尽量做到Client和server的结偶,通过RabbitMQ进行结藕操作
    需要让client 发送消息时,携带两个属性,

    • replyto告知server将相应信息放到哪个队列
    • correlationId告知server 发送相应消息时,需要携带位置标识来告知client响应的消息

    在这里插入图片描述

    public class Publisher {
        public static final String QUEUE_PUBLISHER = "rpc_publisher";
        public static final String QUEUE_CONSUMER = "rpc_consumer";
    
        @Test
        public void publish() throws Exception {
            //1.获取连接对象
            Connection connection = RabbitMQConnectionUtil.getConnection();
            //2.创建Channel 通道
            Channel channel = connection.createChannel();
            //3.构建队列
            channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);
            channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);
            //4. 发布消息
            String message="hello rpc!";
            String uuid = UUID.randomUUID().toString();
            AMQP.BasicProperties prop =new AMQP.BasicProperties()
                .builder().replyTo(QUEUE_CONSUMER).correlationId(uuid).build();
            channel.basicPublish("",QUEUE_PUBLISHER,prop,message.getBytes());
    
            channel.basicConsume(QUEUE_CONSUMER,false,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String id = properties.getCorrelationId();
                    if(id!=null && id.equals(uuid)){
                        System.out.println("接收到服务端的响应:"+new String(body));
                    }
                channel.basicAck(envelope.getDeliveryTag(),false);
                }
            });
    
            System.out.println("消息发送成功");
            System.in.read();
        }
    }
    
    public class Consumer {
        public static final String QUEUE_PUBLISHER = "rpc_publisher";
        public static final String QUEUE_CONSUMER = "rpc_consumer";
        @Test
        public void consume() throws Exception {
            //1.获取连接对象
            Connection connection = RabbitMQConnectionUtil.getConnection();
            //2.创建Channel 通道
            Channel channel = connection.createChannel();
            //3.构建队列
            channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);
            channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);
            //4. 监听消息
            DefaultConsumer callback =new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消息者获取消息"+new String(body,"UTF-8"));
                    String resp = "获取到client发出的请求,这里是响应的信息";
                    String respQueueName = properties.getReplyTo();
                    String uuid = properties.getCorrelationId();
                    AMQP.BasicProperties prop =new AMQP.BasicProperties()
                    .builder().correlationId(uuid).build();
                    channel.basicPublish("",respQueueName,prop,resp.getBytes());
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            channel.basicConsume(QUEUE_PUBLISHER,false,callback);
    
            System.out.println("开始监听");
            System.in.read();
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    五、Springboot 操作RabbitMQ


    • 创建项目
    • 导入依赖
    	>
            >org.springframework.boot>
            >spring-boot-starter-amqp>
        >
    
    • 1
    • 2
    • 3
    • 4
    • 配置rabbitmq信息
    spring:
      rabbitmq:
        host: 172.16.177.133
        password: guest
        username: guest
        port: 5672
        virtual-host: /
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    配置类声明队列

    @Configuration
    public class RabbitMQConfig {
        public static final String EXCHANGE="boot-exchange";
        public static final String QUEUE="boot-queue";
        public static final String ROUTING_KEY="*.black.*";
    
        @Bean
        public Exchange bootExchange(){
            return ExchangeBuilder.topicExchange(EXCHANGE).build();
        }
    
        @Bean
        public Queue bootQueue(){
            return QueueBuilder.durable(QUEUE).build();
        }
    
        @Bean
        public Binding bootBinding(Exchange bootExchange,Queue bootQueue){
            return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    生产者配置

    @SpringBootTest
    class SpringbootRabbitmqApplicationTests {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @Test
        void contextLoads() {
        }
    
        @Test
        public void publisher(){
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");
            System.out.println("消息发送成功");
        }
    
        @Test
        public void publiWithProps(){
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "message", new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().setCorrelationId("123");
                    return message;
                }
            });
            System.out.println("消息发送成功2");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    消费者配置

    @Component
    public class ConsumeListener {
    
        /**
         *
         * @param msg
         * @param channel 前提是配置好spring.rabbitmq.listener.simple.acknowledge-mode: manual #开启手动ack
         * @param message
         * @throws IOException
         */
        @RabbitListener(queues = RabbitMQConfig.QUEUE)
        public void consume(String msg, Channel channel, Message message) throws IOException {
            System.out.println("队列消息为:"+msg);
            String correlationId = message.getMessageProperties().getCorrelationId();
            System.out.println("标识为:"+correlationId);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 声明交换机&队列

    六、RabbitMQ保证消息可靠性

    confirm机制
    可以通过confirm效果保证消息一定送达到Exchange,官方提供了三种,选择了对于效率影响最低的异步回调的效果

    6.1、 保证消息一定送达到Exchange

    使用confirm机制

     public static final String QUEUE_NAME="confirms ";
        @Test
        public void publish() throws Exception {
            //1.获取连接对象
            Connection connection = RabbitMQConnectionUtil.getConnection();
            //2.创建Channel 通道
            Channel channel = connection.createChannel();
            //3.构建队列
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //3.1 开启confirms 的异步回调
            channel.confirmSelect();
            String message="hello word!";
            //3.2 设置confirms的异步回调
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("消息成功发送到Exchange");
                }
    
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("消息没有发送到Exchange,尝试重试,或者保存到数据库做其他补偿操作");
                }
            });
            //3.3 设置return回调,确认消息是否路由到了队列
           channel.addReturnListener(new ReturnListener() {
               @Override
               public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                   System.out.println("消息没有到指定队列时,做其他的补偿措施!!");
               }
           });
            //3.4、设置消息持久化
            AMQP.BasicProperties prop =new AMQP.BasicProperties().builder()
                    .deliveryMode(2)//消息持久化
                   .build();
    
            //4. 发布消息
            channel.basicPublish("",QUEUE_NAME,true,prop,message.getBytes());
            System.out.println("消息发送成功");
            System.in.read();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    6.2、保证消息可以路由到Queue中

    使用return 机制
    为了保证Exchange上的消息一定可以送达到Queue

    //6.2设置return 回调,确认消息是否路由到了Queue
    channel.addReturnListener(new ReturnListener() {
               @Override
               public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                   System.out.println("消息没有到指定队列时,做其他的补偿措施!!");
               }
           });
    //7.在发送消息时,将basicPublish方法参数中的mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return	 回调
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    6.3、保证队列持久化消息

    DeliveryMode设置消息持久化

    //6.3、设置消息持久化
            AMQP.BasicProperties prop =new AMQP.BasicProperties().builder()
                    .deliveryMode(2)//消息持久化
                   .build();
     //4. 发布消息
            channel.basicPublish("",QUEUE_NAME,true,prop,message.getBytes());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    6.4、保证消息者可以正常消费消息

    详情看WorkQueue模式

    6.5 SpringBoot实现上述操作

    6.5.1 Confirm
    • 编写配置文件开启Confirm机制

    spring:
    rabbitmq:
    publisher-confirm-type: correlated # 新版本 开启confirm机制
    publisher-confirms: true # 老版本

    • 在发送消息时,配置RabbitTemplate
    @Test
        public void publishWithConfirms() throws IOException {
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean b, String s) {
                    if(b){
                        System.out.println("消息已经送达到交换机");
                    }else{
                        System.out.println("消息没有送到到Exchange,需要做一些补偿操作!");
                    }
                }
            });
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");
            System.out.println("消息发送成功");
    
            System.in.read();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    6.5.2 Return

    • 编写配置文件开启Return机制

    spring:
    rabbitmq:
    publisher-returns: true # 开启return机制

    • 在发送消息时,配置RabbitTemplate
    @Test
        public void publishWithReturn() throws IOException {
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int i, String s, String s1, String s2) {
                    String msg = new String(message.getBody());
                    System.out.println("消息失败:"+msg+"路由队列失败!!做补救操作");
                }
            });
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");
            System.out.println("消息发送成功");
    
            System.in.read();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    6.5.3 消息持久化

    //3.4、设置消息持久化
    AMQP.BasicProperties prop =new AMQP.BasicProperties().builder()
                    .deliveryMode(2)//消息持久化 #1
                   .build();
            //4. 发布消息,  将参数mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return回调
            channel.basicPublish("",QUEUE_NAME,true,prop,message.getBytes());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    七、RabbitMQ死信队列 & 延迟交换机

    ###7.1、 消息被消费者拒绝,requeue设置为false
    ###7.2.1、发送消息时设置消息的生存时间,如果生存时间到了,还没有被消费。
    ###7.2.2、 也可以指定某个队列中所有消息的生存时间,如果生存时间到了,还没有被消费
    ###7.3、队列已经达到消息的最长长度后,再路由过来的消息直接变成死信
    在这里插入图片描述

    7.4、准备Exchange&Queue

    @Configuration
    public class DeadLetterConfig {
        public static final String NORMAL_EXCHANGE="normal-exchange";
        public static final String NORMAL_QUEUE="normal-queue";
        public static final String NORMAL_ROUTING_KEY="normal.#";
    
    
        public static final String DEAD_EXCHANGE="dead-exchange";
        public static final String DEAD_QUEUE="dead-queue";
        public static final String DEAD_ROUTING_KEY="dead.#";
        /**普通交换机*/
        @Bean
        public Exchange normalExchange(){
            return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();
        }
        /**普通队列*/
        @Bean
        public Queue normalQueue(){
            return QueueBuilder.durable(NORMAL_QUEUE)
                    .deadLetterExchange(DEAD_EXCHANGE)//绑定死信队列
                    .build();
        }
        /**普通队列绑定路由*/
        @Bean
        public Binding normalBingding(Queue normalQueue,Exchange normalExchange){
            return BindingBuilder.bind(normalQueue).to(normalExchange).with(DEAD_ROUTING_KEY).noargs();
        }
    
        /**死信交换机*/
        @Bean
        public Exchange deadExchange(){
            return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();
        }
    
        /**死信队列*/
        @Bean
        public Queue deadQueue(){
            return QueueBuilder.durable(DEAD_QUEUE ).build();
        }
        /**绑定死信队列和交换机*/
        @Bean
        public Binding deadBinding(Queue deadQueue,Exchange deadExchange){
            return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    7.5、实现效果

    • 基于消费者进行reject 或者 nack 实现死信效果
    • 实现延迟消费的效果,比如:下订单时,有15分钟的付款时间
    @Component
    public class DeadListener {
        /**
         *
         * @param msg
         * @param channel 需要手动启动ACK 才能有效 spring.rabbitmq.listener.simple.acknowledge-mode: manual
         * @param message 需要手动启动ACK 才能有效 spring.rabbitmq.listener.simple.acknowledge-mode: manual
         */
        @RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE)
        public void comsume(String msg, Channel channel, Message message) throws IOException {
            System.out.println("接收到normal队列的消息:"+msg);
    //        channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 消息的生存时间

      • 给消息设置生存时间
       @Test
      public void publishExpire(){
          String msg ="dead letter expire";
          rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.abc", msg, new MessagePostProcessor() {
              @Override
              public Message postProcessMessage(Message message) throws AmqpException {
                  message.getMessageProperties().setExpiration("5000");//五秒
                  return message;
              }
          });
      
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 给队列设置生存时间
       /**普通队列*/
      @Bean
      public Queue normalQueue(){
          return QueueBuilder.durable(NORMAL_QUEUE)
                  .deadLetterExchange(DEAD_EXCHANGE)//绑定死信队列
                  .deadLetterRoutingKey("dead.abc") //从新修改routingkey 信息,不设置的话,普通队列的消息会消失,但是死信队列中却没有出现
                  .ttl(10000) //队列生存时间
                  .build();
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 设置Queue中的消息最大长度
          /**普通队列*/
          @Bean
          public Queue normalQueue(){
              return QueueBuilder.durable(NORMAL_QUEUE)
                      .deadLetterExchange(DEAD_EXCHANGE)//绑定死信队列
                      .deadLetterRoutingKey("dead.abc") //从新修改routingkey 信息,不设置的话,普通队列的消息会消失,但是死信队列中却没有出现
      //                .ttl(10000) //队列生存时间
                      .maxLength(1)   //队列最大长度
                      .build();
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10

      只要队列中有一个消息,如果再次发送一个消息,这个消息就会变成死信

    7.6、延迟交换机

    死信队列实现延迟消费时,如果延迟时间比较复杂,比较多,直接使用死信队列时,需要创建大量的队列还对应不同的时间,可以采用延迟交换机来解决这个问题。

    • 构建延迟交换机
    /**
     * @ClassName:DelayedConfig
     * @Description:延迟队列, 注意: 消息是在交换机中延迟,时间到了后才会放到队列中,
     *                              此时如果消息在交换机中延迟过程中,rabbitmq重启则会丢失消息
     * @Author:
     * @Date:9/7/23 10:05 上午
     * @Versiion:1.0
     */
    @Configuration
    public class DelayedConfig {
        public static final String DELAYED_EXCHANGE="delayed-exchange";
        public static final String DELAYED_QUEUE="delayed-queue";
        public static final String DELAYED_ROUTING_KEY="delayed.#";
    
        @Bean
        public Exchange delayedExchange(){
            Map<String,Object> arguments =new HashMap<String,Object>();
            arguments.put("x-delayed-type","topic");
            Exchange exchange =new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true
            ,false,arguments);
            return exchange;
        }
    
        @Bean
        public Queue delayedQueue(){
            return QueueBuilder.durable(DELAYED_QUEUE).build();
        }
        @Bean
        public Binding delayedBinding(Queue delayedQueue,Exchange delayedExchange){
            return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 发送消息
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @Test
        public void publish(){
            rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, "delayed.adb", "delayedxxx",new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().setDelay(3000);
                    return message;
                }
            });
            System.out.println("发送成功");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
  • 相关阅读:
    UVA 294 约数 Divisors
    从零开发一款ChatGPT VSCode插件
    如何实现超大场景三维模型数据坐标转换
    EIP-3664合约研究笔记06--text功能分析
    【备忘】ChromeDriver 官方下载地址 Selenium,pyppetter依赖
    Android Studio实现一个新闻APP
    idea整合Tomcat进行Javaweb工程
    Allegro如何制作routekeepin操作指导
    【计算机网络】集线器
    仙人指路,引而不发,Go lang1.18入门精炼教程,由白丁入鸿儒,Golang中New和Make函数的使用背景和区别EP16
  • 原文地址:https://blog.csdn.net/qq_39940489/article/details/132157578