在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置mandatory参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。
- @Slf4j
- @RestController
- public class MessageProduce implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback{
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @PostConstruct
- private void init()
- {
- rabbitTemplate.setConfirmCallback(this);
- /**
- * true:
- * 交换机无法将消息进行路由的时候,会将该消息返回给生产者
- * false:
- * 如果发现消息无法进行路由,则直接将消息扔掉
- */
- rabbitTemplate.setMandatory(true);
- //将回退消息交给谁处理
- rabbitTemplate.setReturnCallback(this);
- }
- @GetMapping("sendMessage/{message}")
- public void sendMessage(@PathVariable String message)
- {
- //让消息绑定一个id值
- CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());
- rabbitTemplate.convertAndSend("confirm.exchange","key1",message+"key1",correlationData1);
- rabbitTemplate.convertAndSend("confirm.exchange","key2",message+"key2",correlationData1);
- log.info("发送消息id位{}内容为{}",correlationData1.getId(),message+"key1");
- log.info("发送消息id位{}内容为{}",correlationData1.getId(),message+"key2");
-
- }
- @Override
- public void confirm(CorrelationData correlationData, boolean b, String s) {
- String id= correlationData!=null?correlationData.getId():"";
- if(b)
- {
- log.info("交换机收到消息确认成功;id{}",id);
- }
- else {
- log.error("消息id{}未成功投递到交换机,原因是:{}",id,s);
- }
- }
-
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText,
- String exchange, String routingKey) {
- log.info("消息:{}被服务器退回,退回的原因是{},交换机是{}",
- new String(message.getBody()),replyText,exchange,routingKey);
- }
- }
- @Component
- @Slf4j
- public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
- @Override
- public void confirm(CorrelationData correlationData, boolean ack ,String cause) {
- String id=correlationData!=null?correlationData.getId():"";
- if(ack)
- {
- log.info("交换机已经收到id为{}的消息",id);
- }
- else
- {
- log.info("交换机还未收到id未:{}的消息,原因是{}",cause);
- }
- }
-
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText,
- String exchange, String routingKey) {
- log.error("消息{},被交换机{}退回。退回的原因是:{},路由key为{}",new String(message.getBody()),
- exchange,replyText,routingKey);
- }
- }
- @Component
- @Slf4j
- public class ConfirmConsumer {
- public static final String CONFIRM_QUEUE_NAME="confirm.queue";
- @RabbitListener(queues = CONFIRM_QUEUE_NAME)
- public void receiveMsg(Message message)
- {
- String s = new String(message.getBody());
- log.info("接收到队列confirm.queue消息:{}",s);
- }
- }
http://localhost:8989/sendMessage/8888
