• 4.RabbitMQ 消息确认机制


    4.RabbitMQ 消息确认机制

    • RabbitMQ在传递消息的过程中充当了代理人(Broker)的角色,那生产者(Producer)怎样知道消息被正确投递到 Broker了呢?
    • RabbitMQ提供了监听器(Listener)来接收消息投递的状态。
      消息确认涉及两种状态:Confirm与Return。

    Confirm & Return

    • Confirm代表生产者将消息送到Broker时产生的状态,后续会出现两种情况:
      • ack 代表Broker已经将数据接收。
      • nack 代表Broker拒收消息。原因有多种,队列已满,限流,1O异常…
    • Return代表消息被Broker正常接收(ack)后,但Broker没有对应的队列进行投递时产生的状态,消息被退回给生产者。
    • 注意:上面两种状态只代表生产者与Broker之间消息投递的情况。与消费者是否接收/确认消息无关。

    代码如下:

    import com.rabbitmq.client.*;
    import rabbitmq.ConnectionUtil;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    
    public class Confirm {
        public static void main(String[] args)  throws Exception{
            String exchangeStr = "exchange_weather_routing";
            Map<String,String> area = new HashMap();
            area.put("china.beijing.20221128","北京20221128号天气晴朗!");
            area.put("china.zhengzhou.20221128","郑州20221128号天气小雪!");
            area.put("us.NewYork.20221129","纽约20221129号天气晴朗!");
            area.put("us.Washington.20221129","华盛顿20221129号天气小雪!");
    
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            //开启confirm 监听模式
            channel.confirmSelect();
            channel.addConfirmListener(new ConfirmListener() {
                public void handleAck(long l, boolean b) throws IOException {
                    //第二个参数代表接收的数据是否为批量接收,一般用不到
                    System.out.println("消息已经被接收,tag:"+l);
                }
    
                public void handleNack(long l, boolean b) throws IOException {
                    System.out.println("消息已经被Broker拒收,tag:"+l);
                }
            });
            // 开启Return 监听
            channel.addReturnListener(new ReturnCallback() {
                public void handle(Return aReturn) {
                    System.out.println("===================");
                    System.out.println("Return编码:"+aReturn.getReplyCode()+"==Return 描述:"+aReturn.getReplyText());
                    System.out.println("交换机:"+aReturn.getExchange()+"==路由Key:"+aReturn.getRoutingKey());
                    System.out.println("Return主题:"+new String(aReturn.getBody()));
                }
            });
            Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
            while (itr.hasNext()){
                Map.Entry<String, String> m = itr.next();
                //第一个参数:交换机名字,第二个参数:消息的Routing key
                channel.basicPublish(exchangeStr,m.getKey(),null,m.getValue().getBytes());
            }
            //不能关闭,关闭掉就监听不到了
    /*        channel.close();
            connection.close();*/
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    总结:消息确认的两种状态:Confirm与Return,是生产者的状态不是消费者的状态,消费者的状态为:basicAck方法。

  • 相关阅读:
    【面试题精讲】Object类的常见方法有哪些?
    假离婚变成真离婚,财产怎么办
    阿里云2023年双十一优惠活动整理汇总
    浅析接口测试
    13.4web自动化测试(Selenium3+Java)
    【前段工程化】node运行npm run 命令的时候报错JavaScript heap out of memory 内存超限
    ADM 架构开发方法概述以及各个阶段的目的和交付物
    【UE5 Cesium】19-Cesium for Unreal 建立飞行跟踪器(4)
    二叉树题目:二叉树剪枝
    static_cast与dynamic_cast到底是什么?
  • 原文地址:https://blog.csdn.net/Mao_yafeng/article/details/128106323