• Rabbit消息的可靠性


    生产者重连

     消费者重试

     

     

    Confirm模式简介

    消息的confirm确认机制,是指生产者投递消息后,到达了消息服务器Broker里面的exchange交换机,则会给生产者一个应答,生产者接收到应答,用来确定这条消息是否正常的发送到Broker的exchange中,这也是消息可靠性投递的重要保障;

      1. 具体代码设置

    yml 

    1. server:
    2. port: 8080
    3. spring:
    4. application:
    5. name: confirm-learn1
    6. rabbitmq:
    7. host: 192.168.126.130
    8. port: 5672
    9. username: admin
    10. password: 123456
    11. virtual-host: powernode
    12. publisher-confirm-type: correlated # 开启生产者的确认模式,设置关联模式
    13. my:
    14. exchangeName: exchange.confirm.1
    15. queueName: queue.confirm.1

    配置类

    1. package com.powernode.config;
    2. import org.springframework.amqp.core.*;
    3. import org.springframework.beans.factory.annotation.Value;
    4. import org.springframework.context.annotation.Bean;
    5. import org.springframework.context.annotation.Configuration;
    6. @Configuration
    7. public class RabbitConfig {
    8. @Value("${my.exchangeName}")
    9. private String exchangeName;
    10. @Value("${my.queueName}")
    11. private String queueName;
    12. @Bean
    13. public DirectExchange directExchange(){
    14. return ExchangeBuilder.directExchange(exchangeName).build();
    15. }
    16. @Bean
    17. public Queue queue(){
    18. return QueueBuilder.durable(queueName).build();
    19. }
    20. @Bean
    21. public Binding binding(DirectExchange directExchange,Queue queue){
    22. return BindingBuilder.bind(queue).to(directExchange).with("info");
    23. }
    24. }

    写法一

     配置回调类

    1. package com.powernode.config;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.rabbit.connection.CorrelationData;
    4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    5. import org.springframework.stereotype.Component;
    6. @Component
    7. @Slf4j
    8. public class MyConfirmCallBack implements RabbitTemplate.ConfirmCallback {
    9. @Override
    10. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    11. log.info("关联id为:{}",correlationData.getId()+"");
    12. if (ack){
    13. log.info("消息正确的达到交换机");
    14. return;
    15. }
    16. //ack =false 没有到达交换机
    17. log.error("消息没有到达交换机,原因为:{}",cause);
    18. }
    19. }

     发送消息类

    1. package com.powernode.service;
    2. import com.powernode.config.MyConfirmCallBack;
    3. import lombok.extern.slf4j.Slf4j;
    4. import org.springframework.amqp.core.Message;
    5. import org.springframework.amqp.core.MessageBuilder;
    6. import org.springframework.amqp.rabbit.connection.CorrelationData;
    7. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    8. import org.springframework.stereotype.Service;
    9. import javax.annotation.PostConstruct;
    10. import javax.annotation.Resource;
    11. import java.util.Date;
    12. @Service
    13. @Slf4j
    14. public class MessageService {
    15. @Resource
    16. private RabbitTemplate rabbitTemplate;
    17. @Resource
    18. private MyConfirmCallBack confirmCallBack;
    19. @PostConstruct //构造方法后执行它,相当于初始化作用
    20. public void init(){
    21. rabbitTemplate.setConfirmCallback(confirmCallBack);
    22. }
    23. public void sendMsg(){
    24. Message message= MessageBuilder.withBody("hello world".getBytes()).build();
    25. CorrelationData correlationData=new CorrelationData(); //关联数据
    26. correlationData.setId("order_123456"); //发送订单信息
    27. rabbitTemplate.convertAndSend("exchange.confirm.1","info",message,correlationData);
    28. log.info("消息发送完毕,发送时间为:{}",new Date());
    29. }
    30. }

     启动类

    1. package com.powernode;
    2. import com.powernode.service.MessageService;
    3. import org.springframework.boot.ApplicationArguments;
    4. import org.springframework.boot.ApplicationRunner;
    5. import org.springframework.boot.SpringApplication;
    6. import org.springframework.boot.autoconfigure.SpringBootApplication;
    7. import javax.annotation.Resource;
    8. @SpringBootApplication
    9. public class Application implements ApplicationRunner {
    10. @Resource
    11. private MessageService messageService;
    12. public static void main(String[] args) {
    13. SpringApplication.run(Application.class, args);
    14. }
    15. @Override
    16. public void run(ApplicationArguments args) throws Exception {
    17. messageService.sendMsg();
    18. }
    19. }

     方法二

    利用lambda 可以省掉配置回调类

    1. package com.powernode.service;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.core.Message;
    4. import org.springframework.amqp.core.MessageBuilder;
    5. import org.springframework.amqp.rabbit.connection.CorrelationData;
    6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    7. import org.springframework.stereotype.Service;
    8. import javax.annotation.PostConstruct;
    9. import javax.annotation.Resource;
    10. import java.util.Date;
    11. @Service
    12. @Slf4j
    13. public class MessageService{
    14. @Resource
    15. private RabbitTemplate rabbitTemplate;
    16. @PostConstruct //构造方法后执行它,相当于初始化作用
    17. public void init(){
    18. rabbitTemplate.setConfirmCallback(
    19. //lambda 表达式
    20. (correlationData, ack, cause)->{
    21. log.info("关联id为:{}",correlationData.getId()+"");
    22. if (ack){
    23. log.info("消息正确的达到交换机");
    24. return;
    25. }
    26. //ack =false 没有到达交换机
    27. log.error("消息没有到达交换机,原因为:{}",cause);
    28. }
    29. );
    30. }
    31. public void sendMsg(){
    32. Message message= MessageBuilder.withBody("hello world".getBytes()).build();
    33. CorrelationData correlationData=new CorrelationData(); //关联数据
    34. correlationData.setId("order_123456"); //发送订单信息
    35. rabbitTemplate.convertAndSend("exchange.confirm.4dddd","info",message,correlationData);
    36. log.info("消息发送完毕,发送时间为:{}",new Date());
    37. }
    38. }

    1. RabbitMQ消息Return模式

    1. 消息可靠性投递

    rabbitmq 整个消息投递的路径为:

    producer —> exchange —> queue —> consumer

    >> 消息从 producer 到 exchange 则会返回一个 confirmCallback;

    >> 消息从 exchange –> queue 投递失败则会返回一个 returnCallback;

    我们可以利用这两个callback控制消息的可靠性投递;

    开启 确认模式;

    使用rabbitTemplate.setConfirmCallback设置回调函数,当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理;

    注意配置文件中,开启 退回模式;

    spring.rabbitmq.publisher-returns: true

    使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到

    queue失败后,则会将消息退回给producer,并执行回调函数returnedMessage;

    yml

    1. server:
    2. port: 8080
    3. spring:
    4. application:
    5. name: ttl-learn1
    6. rabbitmq:
    7. host: 192.168.126.130
    8. port: 5672
    9. username: admin
    10. password: 123456
    11. virtual-host: powernode
    12. publisher-returns: true #开启return模式
    13. my:
    14. exchangeName: exchange.return.1
    15. queueName: queue.return.1

    配置类

    1. package com.powernode.config;
    2. import org.springframework.amqp.core.*;
    3. import org.springframework.beans.factory.annotation.Value;
    4. import org.springframework.context.annotation.Bean;
    5. import org.springframework.context.annotation.Configuration;
    6. @Configuration
    7. public class RabbitConfig {
    8. @Value("${my.exchangeName}")
    9. private String exchangeName;
    10. @Value("${my.queueName}")
    11. private String queueName;
    12. @Bean
    13. public DirectExchange directExchange(){
    14. return ExchangeBuilder.directExchange(exchangeName).build();
    15. }
    16. @Bean
    17. public Queue queue(){
    18. return QueueBuilder.durable(queueName).build();
    19. }
    20. @Bean
    21. public Binding binding(DirectExchange directExchange,Queue queue){
    22. return BindingBuilder.bind(queue).to(directExchange).with("info");
    23. }
    24. }

    方式一 

    发送消息类

    1. package com.powernode.service;
    2. import com.powernode.config.MyReturnCallBack;
    3. import lombok.extern.slf4j.Slf4j;
    4. import org.springframework.amqp.core.Message;
    5. import org.springframework.amqp.core.MessageBuilder;
    6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    7. import org.springframework.stereotype.Service;
    8. import javax.annotation.PostConstruct;
    9. import javax.annotation.Resource;
    10. import java.util.Date;
    11. @Service
    12. @Slf4j
    13. public class MessageService {
    14. @Resource
    15. private RabbitTemplate rabbitTemplate;
    16. @Resource
    17. private MyReturnCallBack myReturnCallBack;
    18. @PostConstruct
    19. public void init(){
    20. rabbitTemplate.setReturnsCallback(myReturnCallBack); //设置回调
    21. }
    22. public void sendMsg(){
    23. Message message= MessageBuilder.withBody("hello world".getBytes())
    24. .build();
    25. rabbitTemplate.convertAndSend("exchange.return.1","info1111",message);
    26. log.info("消息发送完毕,发送时间为:{}",new Date());
    27. }
    28. }

          回调配置类

    1. package com.powernode.config;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.core.ReturnedMessage;
    4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    5. import org.springframework.stereotype.Component;
    6. /**
    7. * 当消息从交换机 没有正确地 到达队列,则会触发该方法
    8. * 如果消息从交换机 正确地 到达队列了,那么就不会触发该方法
    9. *
    10. * @param returned
    11. */
    12. @Component
    13. @Slf4j
    14. public class MyReturnCallBack implements RabbitTemplate.ReturnsCallback {
    15. @Override
    16. public void returnedMessage(ReturnedMessage returnedMessage) {
    17. log.error("消息从交换机没有正确的路由到(投递到)队列,原因为:{}",returnedMessage.getReplyText());
    18. }
    19. }

    方式二 lambda

    1. package com.powernode.service;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.core.Message;
    4. import org.springframework.amqp.core.MessageBuilder;
    5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    6. import org.springframework.stereotype.Service;
    7. import javax.annotation.PostConstruct;
    8. import javax.annotation.Resource;
    9. import java.util.Date;
    10. @Service
    11. @Slf4j
    12. public class MessageService{
    13. @Resource
    14. private RabbitTemplate rabbitTemplate;
    15. @PostConstruct
    16. public void init(){
    17. rabbitTemplate.setReturnsCallback(
    18. //使用lambda表达式
    19. message->{
    20. log.error("消息从交换机没有正确的路由到(投递到)队列,原因为:{}",message.getReplyText());
    21. }
    22. ); //设置回调
    23. }
    24. public void sendMsg(){
    25. Message message= MessageBuilder.withBody("hello world".getBytes())
    26. .build();
    27. rabbitTemplate.convertAndSend("exchange.return.4","info",message);
    28. log.info("消息发送完毕,发送时间为:{}",new Date());
    29. }
    30. }

    RabbitMQ交换机详细属性

    3.1具体参数

    1、Name:交换机名称;就是一个字符串

    2、Type:交换机类型,direct, topic, fanout, headers四种

    3、Durability:持久化,声明交换机是否持久化,代表交换机在服务器重启后是否还存在;

    4、Auto delete:是否自动删除,曾经有队列绑定到该交换机,后来解绑了,那就会自动删除该交换机;

    5、Internal:内部使用的,如果是yes,客户端无法直接发消息到此交换机,它只能用于交换机与交换机的绑定。

    6、Arguments:只有一个取值alternate-exchange,表示备用交换机;

    3.2代码演示

    结论1:没发消息之前不会创建交换机和对列

    结论2:发消息后,如果交换机不存在,才开始创建交换机,如果队列不存在,则创建新的对列

    结论3:创建交换机或者队列完成后再重新创建,如果修改交换机或队列参数则会报错

    406错误(inequivalent arg 'durable' for exchange 'exchange.durability' in vhost 'powernode': received 'false' but current is 'true', class-id=40, method-id=10))

    结论4:设置持久化为false ,重启rabbitmq-server,则交换机丢失,实验durable参数,先看下控制台,然后重启rabbitmq-server

    结论5:实验自动删除为 true ,从控制台上手动解绑,会发现自动删除

    3.3 备用交换机

    3.3.1 备用交换机使用场景

    当消息经过交换器准备路由给队列的时候,发现没有对应的队列可以投递信息,在rabbitmq中会默认丢弃消息,如果我们想要监测哪些消息被投递到没有对应的队列,我们可以用备用交换机来实现,可以接收备用交换机的消息,然后记录日志或发送报警信息。

    3.3.2 主要代码和注意事项

    备用交换机示例如下:

    注意:备用交换机一般使用fanout交换机

    测试时:指定一个错误路由

    重点:普通交换机设置参数绑定到备用交换机

    Map arguments = new HashMap<>();

    //指定当前正常的交换机的备用交换机是谁

    arguments.put("alternate-exchange", EXCHANGE_ALTERNATE);

    //DirectExchange(String name, boolean durable, boolean autoDelete, Map arguments)

    return new DirectExchange(EXCHANGE, true, false, arguments);

    //return ExchangeBuilder.directExchange(EXCHANGE).withArguments(args).build();

    3.3.3 参考配置代码

    yml

    1. server:
    2. port: 8080
    3. spring:
    4. application:
    5. name: ttl-learn1
    6. rabbitmq:
    7. host: 192.168.126.130
    8. port: 5672
    9. username: admin
    10. password: 123456
    11. virtual-host: powernode
    12. my:
    13. exchangeNormalName: exchange.normal.alternate #正常交换机
    14. exchangeAlternateName: exchange.alternate.1 #备用交换机
    15. queueNormalName: queue.normal.alternate #正常队列
    16. queueAlternateName: queue.alternate.1 # 备用队列

    配置类 

    1. package com.powernode.config;
    2. import org.springframework.amqp.core.*;
    3. import org.springframework.beans.factory.annotation.Value;
    4. import org.springframework.context.annotation.Bean;
    5. import org.springframework.context.annotation.Configuration;
    6. @Configuration
    7. public class RabbitConfig {
    8. @Value("${my.exchangeNormalName}")
    9. private String exchangeNormalName;
    10. @Value("${my.exchangeAlternateName}")
    11. private String exchangeAlternateName;
    12. @Value("${my.queueNormalName}")
    13. private String queueNormalName;
    14. @Value("${my.queueAlternateName}")
    15. private String queueAlternateName;
    16. @Bean
    17. public DirectExchange normalExchange(){
    18. return ExchangeBuilder // 默认为持久化的,默认不自动删除
    19. .directExchange(exchangeNormalName) // 交换机的名字
    20. .alternate(exchangeAlternateName) //设置备用交换机 alternate-exchange
    21. .build();
    22. }
    23. @Bean
    24. public Queue queueNormal(){
    25. return QueueBuilder.durable(queueNormalName).build();
    26. }
    27. @Bean
    28. public Binding binding(DirectExchange normalExchange,Queue queueNormal){
    29. return BindingBuilder.bind(queueNormal).to(normalExchange).with("info");
    30. }
    31. @Bean //备用交换机
    32. public FanoutExchange alternateExchange(){
    33. return ExchangeBuilder.fanoutExchange(exchangeAlternateName).build();
    34. }
    35. @Bean
    36. public Queue alternateQueue(){
    37. return QueueBuilder.durable(queueAlternateName).build();
    38. }
    39. @Bean
    40. public Binding bindingAlternate(FanoutExchange alternateExchange,Queue alternateQueue){
    41. return BindingBuilder.bind(alternateQueue).to(alternateExchange);
    42. }
    43. }

    3.3.4 参考发送消息代码

    @Service

    public class MessageService {

        @Resource

        private RabbitTemplate rabbitTemplate;

        /**

         * 发送消息

         */

    public void sendMessage() {

    //我们故意写错路由key,由于我们正常交换机设置了备用交换机,所以该消息就会进入备用交换机
    //从而进入备用对列,我们可以写一个程序接收备用对列的消息,接收到后通知相关人员进行处理
    //如果正常交换机没有设置备用交换机,则该消息会被抛弃。

            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, "info1223", "hello");

            System.out.println("消息发送完毕......");

        }

    }

    RabbitMQ队列详细属性

    4.1 具体参数

    Type:队列类型

    Name:队列名称,就是一个字符串,随便一个字符串就可以;

    Durability:声明队列是否持久化,代表队列在服务器重启后是否还存在;

    Auto delete: 是否自动删除,如果为true,当没有消费者连接到这个队列的时候,队列会自动删除;

    Exclusive:exclusive属性的队列只对首次声明它的连接可见,并且在连接断开时自动删除;

    基本上不设置它,设置成false

    Arguments:队列的其他属性,例如指定DLX(死信交换机等);

    1、x-expires:Number

    当Queue(队列)在指定的时间未被访问,则队列将被自动删除;

    2、x-message-ttl:Number

    发布的消息在队列中存在多长时间后被取消(单位毫秒);

    3、x-overflow:String

    设置队列溢出行为,当达到队列的最大长度时,消息会发生什么,有效值为Drop HeadReject Publish

    4、x-max-length:Number

    队列所能容下消息的最大长度,当超出长度后,新消息将会覆盖最前面的消息,类似于Redis的LRU算法;

    5、 x-single-active-consumer:默认为false

    激活单一的消费者,也就是该队列只能有一个消息者消费消息;

    6、x-max-length-bytes:Number

    限定队列的最大占用空间,当超出后也使用类似于Redis的LRU算法;

    7、x-dead-letter-exchange:String

    指定队列关联的死信交换机,有时候我们希望当队列的消息达到上限后溢出的消息不会被删除掉,而是走到另一个队列中保存起来;

    8.x-dead-letter-routing-key:String

    指定死信交换机的路由键,一般和6一起定义;

    9.x-max-priority:Number

    如果将一个队列加上优先级参数,那么该队列为优先级队列;

    (1)、给队列加上优先级参数使其成为优先级队列

    x-max-priority=10【0-255取值范围】

    (2)、给消息加上优先级属性

    通过优先级特性,将一个队列实现插队消费;

    MessageProperties messageProperties=new MessageProperties();
    messageProperties.setPriority(8);

    10、x-queue-mode:String(理解下即可)

    队列类型x-queue-mode=lazy懒队列,在磁盘上尽可能多地保留消息以减少RAM使用,如果未设置,则队列将保留内存缓存以尽可能快地传递消息;

    11、x-queue-master-locator:String(用的较少,不讲)

    在集群模式下设置队列分配到的主节点位置信息;

    每个queue都有一个master节点,所有对于queue的操作都是事先在master上完成,之后再slave上进行相同的操作;

    每个不同的queue可以坐落在不同的集群节点上,这些queue如果配置了镜像队列,那么会有1个master和多个slave。

    基本上所有的操作都落在master上,那么如果这些queues的master都落在个别的服务节点上,而其他的节点又很空闲,这样就无法做到负载均衡,那么势必会影响性能;

    关于master queue host 的分配有几种策略,可以在queue声明的时候使用x-queue-master-locator参数,或者在policy上设置queue-master-locator,或者直接在rabbitmq的配置文件中定义queue_master_locator,有三种可供选择的策略:

    (1)min-masters:选择master queue数最少的那个服务节点host;

    (2)client-local:选择与client相连接的那个服务节点host;

    (3)random:随机分配;

    4.2 参考代码

    @Configuration

    public class RabbitConfig {

        public static final String EXCHANGE = "exchange";

        public static final String QUEUE = "queue";

        public static final String KEY = "info";

        QueueBuilder builder;

        @Bean

        public DirectExchange directExchange() {

            return ExchangeBuilder.directExchange(EXCHANGE).build();

        }

        @Bean

        public Queue queue() {

            Map arguments = new HashMap<>();

            //arguments.put("x-expires", 5000);

            //arguments.put("x-max-length", 5);

            //arguments.put("x-overflow", "reject-publish");

            arguments.put("x-single-active-consumer", false); //TODO ???

            //arguments.put("x-max-length-bytes", 20); // 单位是字节

            //arguments.put("x-max-priority", 10); // 0-255 //表示把当前声明的这个队列设置成了优先级队列,那么该队列它允许消息插队

            //将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM内存的使用,如果未设置,队列将保留内存缓存以尽可能快地传递消息;

            //有时候我们把这种队列叫:惰性队列

            //arguments.put("x-queue-mode", "lazy");

            //设置队列版本。默认为版本1。

            //版本1有一个基于日志的索引,它嵌入了小消息。

            //版本2有一个不同的索引,可以在许多场景中提高内存使用率和性能,并为以前嵌入的消息提供了按队列存储。

            //arguments.put("x-queue-version", 2);

            // x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。

            //arguments.put("x-queue-master-locator", QueueBuilder.LeaderLocator.clientLocal.getValue());

            //-------------------------

            //arguments.put("x-expires", 10000); //自动过期,10秒

            //arguments.put("x-message-ttl", 10000); //自动过期,10秒,不会删除队列

            //QueueBuilder 类里面有定义,设置队列溢出行为,当达到队列的最大长度时消息会发生什么,有效值是drop-head、reject-publish

            //arguments.put("x-max-length", 5);

            //arguments.put("x-overflow", QueueBuilder.Overflow.dropHead.getValue());

            //表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)

            //arguments.put("x-single-active-consumer", true);

            // x-max-length-bytes,队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;

            //arguments.put("x-max-length-bytes", 10);

            //参数是1到255之间的正整数,表示队列应该支持的最大优先级,数字越大代表优先级越高,没有设置priority优先级字段,那么priority字段值默认为0;如果优先级队列priority属性被设置为比x-max-priority大,那么priority的值被设置为x-max-priority的值。

            //arguments.put("x-max-priority", 10);

            //将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;

            //arguments.put("x-queue-mode", "lazy");

            arguments.put("x-queue-version", 2);

            // x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。

            arguments.put("x-queue-master-locator", QueueBuilder.LeaderLocator.clientLocal.getValue());

            //---------------------------------------------

            // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map arguments)

            return new Queue(QUEUE, true, false, false, arguments);

        }

        @Bean

        public Binding binding(DirectExchange directExchange, Queue queue) {

            return BindingBuilder.bind(queue).to(directExchange).with(KEY);

        }

    }

    实验durable 参数 重启rabbitmq-server,队列丢失

    实验autodelete参数:加入接收者,发现停掉服务,那么久没有消费者了,对列就会自动删除

    消息可靠性投递

    消息的可靠性投递就是要保证消息投递过程中每一个环节都要成功,那么这肯定会牺牲一些性能,性能与可靠性是无法兼得的;

    如果业务实时一致性要求不是特别高的场景,可以牺牲一些可靠性来换取性能。

    确保消息在队列正确地存储

    可能因为系统宕机、重启、关闭等等情况导致存储在队列的消息丢失,即③出现问题;

    解决方案:

    1. 、队列持久化

    代码:

    QueueBuilder.durable(QUEUE).build();

    1. 、交换机持久化

    代码:

    ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();

    1. 、消息持久化

    代码:

    默认持久化

     MessageProperties messageProperties = new MessageProperties();

    //设置消息持久化,当然它默认就是持久化,所以可以不用设置,可以查看源码

     messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);

    1. 、集群,镜像队列,高可用

    1. 确保消息从队列正确地投递到消费者

    采用消息消费时的手动ack确认机制来保证;

    如果消费者收到消息后未来得及处理即发生异常,或者处理过程中发生异常,会导致④失败。

    为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(message acknowledgement);

    开启手动ack消息消费确认


    spring.rabbitmq.listener.simple.acknowledge-mode=manual

     yml

    1. server:
    2. port: 8080
    3. spring:
    4. application:
    5. name: rabbit-12-reliability
    6. rabbitmq:
    7. host: 192.168.126.130
    8. port: 5672
    9. username: admin
    10. password: 123456
    11. virtual-host: powernode
    12. publisher-confirm-type: correlated # 开启发布者的确认模式
    13. publisher-returns: true # 开启发布者的return模式
    14. listener:
    15. simple:
    16. acknowledge-mode: manual # 开始消费者的手动确认模式
    17. my:
    18. exchangeName: exchange.reliability
    19. queueName: queue.reliability

     配置类

    1. package com.powernode.config;
    2. import org.springframework.amqp.core.*;
    3. import org.springframework.beans.factory.annotation.Value;
    4. import org.springframework.context.annotation.Bean;
    5. import org.springframework.context.annotation.Configuration;
    6. @Configuration
    7. public class RabbitConfig {
    8. @Value("${my.exchangeName}")
    9. private String exchangeName;
    10. @Value("${my.queueName}")
    11. private String queueName;
    12. @Bean
    13. public DirectExchange directExchange(){
    14. //默认就是持久化的
    15. return ExchangeBuilder.directExchange(exchangeName).build();
    16. }
    17. @Bean
    18. public Queue queue(){
    19. //队列持久化
    20. return QueueBuilder.durable(queueName).build();
    21. }
    22. @Bean
    23. public Binding binding(DirectExchange directExchange,Queue queue){
    24. return BindingBuilder.bind(queue).to(directExchange).with("info");
    25. }
    26. }

    消息类

    1. package com.powernode.service;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.core.Message;
    4. import org.springframework.amqp.core.MessageBuilder;
    5. import org.springframework.amqp.core.MessageDeliveryMode;
    6. import org.springframework.amqp.core.MessageProperties;
    7. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    8. import org.springframework.stereotype.Service;
    9. import javax.annotation.PostConstruct;
    10. import javax.annotation.Resource;
    11. import java.util.Date;
    12. @Service
    13. @Slf4j
    14. public class MessageService {
    15. @Resource
    16. private RabbitTemplate rabbitTemplate;
    17. /**
    18. * 构造方法执行后自动执行
    19. */
    20. @PostConstruct
    21. public void init(){
    22. //开启生产者的确定模式
    23. rabbitTemplate.setConfirmCallback(
    24. (correlationData, ack, cause)->{
    25. if(!ack){
    26. log.error("消息没有到达交换机,原因为:{}",cause);
    27. //TODO 重发消息或者记录错误日志
    28. }
    29. }
    30. );
    31. rabbitTemplate.setReturnsCallback(
    32. returnedMessage->{
    33. log.error("消息没有从交换机正确的投递(路由)到队列,原因为:{}",returnedMessage.getReplyText());
    34. //TODO 记录错误日志,给程序员发短信或者或者邮件
    35. }
    36. );
    37. }
    38. public void sendMsg(){
    39. MessageProperties messageProperties=new MessageProperties();
    40. //设置单条消息的持久化,默认就是持久化
    41. messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
    42. Message message= MessageBuilder.withBody("hello world".getBytes())
    43. .andProperties(messageProperties).build();
    44. rabbitTemplate.convertAndSend("exchange.reliability","info",message);
    45. log.info("消息发送完毕,发送时间为:{}",new Date());
    46. }
    47. }

    发送消息

    1. package com.powernode;
    2. import com.powernode.service.MessageService;
    3. import org.springframework.boot.ApplicationArguments;
    4. import org.springframework.boot.ApplicationRunner;
    5. import org.springframework.boot.SpringApplication;
    6. import org.springframework.boot.autoconfigure.SpringBootApplication;
    7. import javax.annotation.Resource;
    8. @SpringBootApplication
    9. public class Application implements ApplicationRunner {
    10. @Resource
    11. private MessageService messageService;
    12. public static void main(String[] args) {
    13. SpringApplication.run(Application.class, args);
    14. }
    15. @Override
    16. public void run(ApplicationArguments args) throws Exception {
    17. messageService.sendMsg();
    18. }
    19. }

     消费者消费消息类(手动确认)

    1. package com.powernode.message;
    2. import com.rabbitmq.client.Channel;
    3. import lombok.extern.slf4j.Slf4j;
    4. import org.springframework.amqp.core.Message;
    5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    6. import org.springframework.stereotype.Component;
    7. import java.io.IOException;
    8. @Component
    9. @Slf4j
    10. public class ReceiveMessage {
    11. @RabbitListener(queues = {"queue.reliability"})
    12. public void receiveMsg(Message message, Channel channel){
    13. //获取消息的唯一标识
    14. long deliveryTag = message.getMessageProperties().getDeliveryTag();
    15. try {
    16. log.info("接收到的消息为:{}",new String(message.getBody()));
    17. // TODO 插入订单等
    18. // int a=10/0;
    19. //手动确认
    20. channel.basicAck(deliveryTag,false);
    21. } catch (Exception e) {
    22. log.error("消息处理出现问题");
    23. try {
    24. channel.basicNack(deliveryTag,false,true);
    25. } catch (IOException ex) {
    26. throw new RuntimeException(ex);
    27. }
    28. throw new RuntimeException(e);
    29. }
    30. }
    31. }

    消费者在订阅队列时,通过上面的配置,不自动确认,采用手动确认,RabbitMQ会等待消费者显式地回复确认信号后才从队列中删除消息;

    如果消息消费失败,也可以调用basicReject()或者basicNack()来拒绝当前消息而不是确认。如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者(当然,只有一个消费者的时候,这种方式可能会出现无限循环重复消费的情况,可以投递到新的队列中,或者只打印异常日志);

    1. 消息的幂等性

    消息消费时的幂等性(消息不被重复消费)

    同一个消息,第一次接收,正常处理业务,如果该消息第二次再接收,那就不能再处理业务,否则就处理重复了;

    幂等性是:对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相同的,不能因为重复的请求而对该资源重复造成影响;

    以接口幂等性举例:

    接口幂等性是指:一个接口用同样的参数反复调用,不会造成业务错误,那么这个接口就是具有幂等性的;

    注册接口;

    发送短信验证码接口;

    比如同一个订单我支付两次,但是只会扣款一次,第二次支付不会扣款,这说明这个支付接口是具有幂等性的;

    如何避免消息的重复消费问题?(消息消费时的幂等性)

    全局唯一ID + Redis

    生产者在发送消息时,为每条消息设置一个全局唯一的messageId,消费者拿到消息后,使用setnx命令,将messageId作为key放到redis中:setnx(messageId, 1),若返回1,说明之前没有消费过,正常消费;若返回0,说明这条消息之前已消费过,抛弃;

    具体代码参考以下代码;

    参考代码:

      //1、把消息的唯一ID写入redis

            boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("idempotent:" + orders.getId(), String.valueOf(orders.getId())); //如果redis中该key不存在,那么就设置,存在就不设置

            if (flag) { //key不存在返回true

                //相当于是第一次消费该消息

                //TODO 处理业务

                System.out.println("正常处理业务....." + orders.getId());

            }

  • 相关阅读:
    使用微PE工具箱制作winU盘启动盘~重装系统
    Alpine Linux 源使用帮助
    诞生在 KFC 的《开源之迷》:作者如何在嘈杂而开放的环境中进行创作
    【每日一题】执行 K 次操作后的最大分数
    ES6 入门教程 7 数值的扩展 7.9 BigInt 数据类型
    干货 | 一改测试步骤代码就全写?为什么不试试用 Yaml实现数据驱动?
    [Chipscope 16-213] The debug port ‘dbg_hub/clk‘ has 1 unconnected channels 解决
    Java类型转换和类型提升
    基于微信小程序的音乐播放器设计与实现(源码+lw+部署文档+讲解等)
    L1W4作业1 逐步构建你的深度神经网络
  • 原文地址:https://blog.csdn.net/yuzheh521/article/details/133211065