• RabbitMq确认机制


    confirm机制主要包含的三个属性分别是,mandatory、publisher-confirms、publisher-return
    rabbitmq客户端发送消息首先发送的交换器exchange,然后通过路由键routingKey和bindingKey比较判定需要将消息发送到那个队列queue上;在这个过程有两个地方消息可能丢失,第一消息发送到交换器exchange的过程,第二消息从交换器exchange发送到队列queue的过程;

    publisher-confirms

    publiser-confirm模式可以确保生产者到交换器exchange消息有没有发送成功

    #设置此属性配置可以确保消息成功发送到交换器
    spring.rabbitmq.publisher-confirms=true
    
    
    • 1
    • 2
    • 3

    示例
    rabbitTemplate支持ConfirmCallback接口

       RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    if (ack){
                        System.out.println(correlationData.toString() + "发送成功");
                    }else {
                        System.out.println(correlationData.toString() + "发送失败, 原因: " + cause);
                    }
                }
            };
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.convertAndSend("direct_exchange", "ROUTING_KEY_01", map, correlationData);
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    当ack返回true时,代表消息已经被交换机正确接收,返回false代表消息没有被交换机正确接收
    回调有三个参数correlationData,带代表指定的消息id被正确接收或者失败了,可以根据消息id再做指定的操作
    因此,如果需要使用confirmCallback,correlationData发送时是需要传的。否则并无法确定那条消息得到确定

    默认的ConfirmCallback

    是AsyncRabbitTemplate中的实现

    @Override
    	public void confirm(@NonNull CorrelationData correlationData, boolean ack, @Nullable String cause) {
    		if (this.logger.isDebugEnabled()) {
    			this.logger.debug("Confirm: " + correlationData + ", ack=" + ack
    					+ (cause == null ? "" : (", cause: " + cause)));
    		}
    		String correlationId = correlationData.getId();
    		if (correlationId != null) {
    			RabbitFuture<?> future = this.pending.get(correlationId);
    			if (future != null) {
    				future.setNackCause(cause);
    				((SettableListenableFuture<Boolean>) future.getConfirm()).set(ack);
    			}
    			else {
    				if (this.logger.isDebugEnabled()) {
    					this.logger.debug("Confirm: " + correlationData + ", ack=" + ack
    							+ (cause == null ? "" : (", cause: " + cause))
    							+ " no pending future - either canceled or the reply is already received");
    				}
    			}
    		}
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    得到确认后,对future做返回值,那么可以这么使用

    rabbitTemplate.convertAndSend("direct_esxchange", "ROUTING_KEY_01", messageProperties,correlationData);
    correlationData.getFuture().get();
    
    • 1
    • 2

    可以通过future的结果判断是否发送成功。如果失败,例如可以采用重新发送的方式
    但是RabbitTemplate并没有实现这个类,也就说如果如果直接get,那么将会一直阻塞。可见如果我们开启了配置却不实现ConfirmCallback,是无法处理丢失掉的消息,也会丢消息

    publisher-confirms和mandatory

    这两个配置都是空值消息退回的
    你可能会有一个疑问,这两个配置都是指定未找到合适队列时将消息退回,究竟是如何分别起作用呢?
    spring.rabbitmq.template.mandatory属性的优先级高于spring.rabbitmq.publisher-returns的优先级
    spring.rabbitmq.template.mandatory属性可能会返回三种值null、false、true,
    spring.rabbitmq.template.mandatory结果为true、false时会忽略掉spring.rabbitmq.publisher-returns属性的值
    spring.rabbitmq.template.mandatory结果为null(即不配置)时结果由spring.rabbitmq.publisher-returns确定

    可以在消息没有被路由到指定的queue时将消息返回,而不是丢弃

    例如

        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
                @Override
                public void returnedMessage(ReturnedMessage returned) {
                    System.out.println(returned);
                }
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    ReturnsCallback中含一个returnMessage方法,入参是ReturnedMessage对象

    public class ReturnedMessage {
    
    	private final Message message;
    
    	private final int replyCode;
    
    	private final String replyText;
    
    	private final String exchange;
    
    	private final String routingKey;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    image.png
    消息退回可以做进一步处理

    实现重试机制

    防止消息发送失败导致消息丢失
    基于上面的分析,我们使用一种新的方式来做到数据的不丢失。
    在rabbitTemplate异步确认的基础上
    1 在本地缓存已发送的message
    2 通过confirmCallback或者被确认的ack,将被确认的message从本地删除
    3 定时扫描本地的message,如果大于一定时间未被确认,则重发。
    当然了,这种解决方式也有一定的问题
    想象这种场景,rabbitmq接收到了消息,在发送ack确认时,网络断了,造成客户端没有收到ack,重发消息。(相比于丢失消息,重发消息要好解决的多,我们可以在consumer端做到幂等)。

  • 相关阅读:
    K8S简单学习
    电力系统机组组合优化调度(IEEE14节点、IEEE30节点、IEEE118节点)(Matlab代码实现)
    配置与管理数据库服务器(MariaDB)
    【JavaEE】JavaScript webAPI的基本知识
    QT_字符串相关操作_QString
    Unity微信小游戏登录授权获取用户信息
    Eunomia: 让 ebpf 程序的分发和使用像网页和 web 服务一样自然
    【刷题记录⑨】Java工程师丨字节面试真题(三)
    day2 ARM基础
    Unity进阶第二章-消息框架(实现角色吃金币功能)
  • 原文地址:https://blog.csdn.net/qq_37436172/article/details/130913766