• RabbitMQ 消息的可靠性


    RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100倍。

    RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。

     Confirm保证消息到达exchange。

    Return机制监听消息是否从exchange送到指定的queue中。

    消费者在消费消息时,如果消费者宕机了,可以使用手动ACK,但是会导致重复消费的问题

    在maven项目下,加入Confirm确认机制

    直接在生产者里面加入下面代码就可以

    普通Confirm方式

    1. //3.1 开启confirm
    2. channel.confirmSelect();
    3. //3.2 发送消息
    4. String msg = "Hello-World!";
    5. channel.basicPublish("","HelloWorld",null,msg.getBytes());
    6. //3.3 判断消息发送是否成功
    7. if(channel.waitForConfirms()){
    8. System.out.println("消息发送成功");
    9. }else{
    10. System.out.println("发送消息失败");
    11. }

    批量Confirm方式

    1. //3.1 开启confirm
    2. channel.confirmSelect();
    3. //3.2 批量发送消息
    4. for (int i = 0; i < 1000; i++) {
    5. String msg = "Hello-World!" + i;
    6. channel.basicPublish("","HelloWorld",null,msg.getBytes());
    7. }
    8. //3.3 确定批量操作是否成功
    9. channel.waitForConfirmsOrDie(); // 当你发送的全部消息,有一个失败的时候,就直接全部失败 抛出异常IOException

    异步Confirm方式。

    1. //3.1 开启confirm
    2. channel.confirmSelect();
    3. //3.2 批量发送消息
    4. for (int i = 0; i < 1000; i++) {
    5. String msg = "Hello-World!" + i;
    6. channel.basicPublish("","HelloWorld",null,msg.getBytes());
    7. }
    8. //3.3 开启异步回调
    9. channel.addConfirmListener(new ConfirmListener() {
    10. @Override
    11. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
    12. System.out.println("消息发送成功,标识:" + deliveryTag + ",是否是批量" + multiple);
    13. }
    14. @Override
    15. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
    16. System.out.println("消息发送失败,标识:" + deliveryTag + ",是否是批量" + multiple);
    17. }
    18. });

     

    Return机制

    Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。

    而且exchange是不能持久化消息的,queue是可以持久化消息。

    采用Return机制来监听消息是否从exchange送到了指定的queue中

     

     开启Return机制,并在发送消息时,指定mandatory为true  生产者里加

    1. // 开启return机制
    2. channel.addReturnListener(new ReturnListener() {
    3. @Override
    4. public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
    5. // 当消息没有送达到queue时,才会执行。
    6. System.out.println(new String(body,"UTF-8") + "没有送达到Queue中!!");
    7. }
    8. });
    9. // 在发送消息时,指定mandatory参数为true
    10. channel.basicPublish("","HelloWorld",true,null,msg.getBytes());

    SpringBoot实现

    编写配置文件

    1. spring:
    2. rabbitmq:
    3. publisher-confirm-type: simple
    4. publisher-returns: true

    开启Confirm和Return

    1. @Component
    2. public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback {
    3. @Autowired
    4. private RabbitTemplate rabbitTemplate;
    5. @PostConstruct // init-method
    6. public void initMethod(){
    7. rabbitTemplate.setConfirmCallback(this);
    8. rabbitTemplate.setReturnCallback(this);
    9. }
    10. @Override
    11. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    12. if(ack){
    13. System.out.println("消息已经送达到Exchange");
    14. }else{
    15. System.out.println("消息没有送达到Exchange");
    16. }
    17. }
    18. @Override
    19. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    20. System.out.println("消息没有送达到Queue");
    21. }
    22. }

     

    避免消息重复消费

    mave实现

    重复消费消息,会对非幂等行操作造成问题

    重复消费消息的原因是,消费者没有给RabbitMQ一个ack

     

     

    为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,

    id-0(正在执行业务)

    id-1(执行业务成功)

    如果ack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。

    极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。

    生产者,发送消息时,指定messageId

    1. AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
    2. .deliveryMode(1) //指定消息书否需要持久化 1 - 需要持久化 2 - 不需要持久化
    3. .messageId(UUID.randomUUID().toString())
    4. .build();
    5. String msg = "Hello-World!";
    6. channel.basicPublish("","HelloWorld",true,properties,msg.getBytes());

     消费者,在消费消息时,根据具体业务逻辑去操作redis

    1. efaultConsumer consume = new DefaultConsumer(channel){
    2. @Override
    3. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    4. Jedis jedis = new Jedis("192.168.199.109",6379);
    5. String messageId = properties.getMessageId();
    6. //1. setnx到Redis中,默认指定value-0
    7. String result = jedis.set(messageId, "0", "NX", "EX", 10);
    8. if(result != null && result.equalsIgnoreCase("OK")) {
    9. System.out.println("接收到消息:" + new String(body, "UTF-8"));
    10. //2. 消费成功,set messageId 1
    11. jedis.set(messageId,"1");
    12. channel.basicAck(envelope.getDeliveryTag(),false);
    13. }else {
    14. //3. 如果1中的setnx失败,获取key对应的value,如果是0,return,如果是1
    15. String s = jedis.get(messageId);
    16. if("1".equalsIgnoreCase(s)){
    17. channel.basicAck(envelope.getDeliveryTag(),false);
    18. }
    19. }
    20. }
    21. };

     

    SpringBoot如何实现

    导入依赖

    1. <dependency>
    2. <groupId>org.springframework.bootgroupId>
    3. <artifactId>spring-boot-starter-data-redisartifactId>
    4. dependency>

    配置文件

    1. spring:
    2. redis:
    3. host: 192.168.199.109
    4. port: 6379

    修改生产者

    1. @Test
    2. void contextLoads() throws IOException {
    3. CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());
    4. rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!",messageId);
    5. System.in.read();
    6. }

    修改消费者

    1. @Autowired
    2. private StringRedisTemplate redisTemplate;
    3. @RabbitListener(queues = "boot-queue")
    4. public void getMessage(String msg, Channel channel, Message message) throws IOException {
    5. //0. 获取MessageId
    6. String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
    7. //1. 设置key到Redis
    8. if(redisTemplate.opsForValue().setIfAbsent(messageId,"0",10, TimeUnit.SECONDS)) {
    9. //2. 消费消息
    10. System.out.println("接收到消息:" + msg);
    11. //3. 设置key的value为1
    12. redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
    13. //4. 手动ack
    14. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    15. }else {
    16. //5. 获取Redis中的value即可 如果是1,手动ack
    17. if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
    18. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    19. }
    20. }
    21. }

  • 相关阅读:
    voip|网络电话,软件实现电信座机
    计算机网络第2章-DNS(3)
    SpringBoot+Vue+Element-UI实现在线外卖系统
    免费文档翻译软件电脑版软件
    SpringBoot学习之SpringBoot3集成OpenApi(三十七)
    Dockerfile实现容器镜像的自定义及生成
    为dev c++配置图形开发环境easyx
    【软件与系统安全】栈溢出利用的分析
    拓端tecdat|R语言使用ARIMA模型预测股票收益时间序列
    Java 性能优化实战案例分析:并行计算让代码“飞”起来
  • 原文地址:https://blog.csdn.net/weixin_60934893/article/details/128104302