4.RabbitMQ 消息确认机制
Confirm & Return
代码如下:
import com.rabbitmq.client.*;
import rabbitmq.ConnectionUtil;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class Confirm {
public static void main(String[] args) throws Exception{
String exchangeStr = "exchange_weather_routing";
Map<String,String> area = new HashMap();
area.put("china.beijing.20221128","北京20221128号天气晴朗!");
area.put("china.zhengzhou.20221128","郑州20221128号天气小雪!");
area.put("us.NewYork.20221129","纽约20221129号天气晴朗!");
area.put("us.Washington.20221129","华盛顿20221129号天气小雪!");
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//开启confirm 监听模式
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long l, boolean b) throws IOException {
//第二个参数代表接收的数据是否为批量接收,一般用不到
System.out.println("消息已经被接收,tag:"+l);
}
public void handleNack(long l, boolean b) throws IOException {
System.out.println("消息已经被Broker拒收,tag:"+l);
}
});
// 开启Return 监听
channel.addReturnListener(new ReturnCallback() {
public void handle(Return aReturn) {
System.out.println("===================");
System.out.println("Return编码:"+aReturn.getReplyCode()+"==Return 描述:"+aReturn.getReplyText());
System.out.println("交换机:"+aReturn.getExchange()+"==路由Key:"+aReturn.getRoutingKey());
System.out.println("Return主题:"+new String(aReturn.getBody()));
}
});
Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
while (itr.hasNext()){
Map.Entry<String, String> m = itr.next();
//第一个参数:交换机名字,第二个参数:消息的Routing key
channel.basicPublish(exchangeStr,m.getKey(),null,m.getValue().getBytes());
}
//不能关闭,关闭掉就监听不到了
/* channel.close();
connection.close();*/
}
}
总结:消息确认的两种状态:Confirm与Return,是生产者的状态不是消费者的状态,消费者的状态为:basicAck方法。