• RabbitMQ:return 与 confirm 机制整合 Spring Boot 实现


    1. package com.qf.mq2203.config;
    2. import org.springframework.amqp.core.*;
    3. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    4. import org.springframework.amqp.rabbit.connection.CorrelationData;
    5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    6. import org.springframework.beans.factory.annotation.Autowired;
    7. import org.springframework.context.annotation.Bean;
    8. import org.springframework.context.annotation.Configuration;
    9. import javax.annotation.PostConstruct;
    10. @Configuration
    11. public class RabbitmqConfig {
    12. @Autowired
    13. RabbitTemplate rabbitTemplate;
    14. @PostConstruct
    15. public void init(){
    16. // 设置 return callback
    17. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    18. @Override
    19. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    20. System.out.println(Thread.currentThread().getName() + "================");
    21. final byte[] body = message.getBody(); // 消息的内容
    22. System.out.println(message);
    23. System.out.println(replyCode);
    24. System.out.println(replyText);
    25. System.out.println(exchange);
    26. System.out.println(routingKey);
    27. System.out.println(Thread.currentThread().getName() + "================");
    28. }
    29. });
    30. // 设置 confirm回调方法
    31. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    32. @Override
    33. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    34. System.out.println(Thread.currentThread().getName() + "----------------");
    35. System.out.println(correlationData);
    36. System.out.println(ack);
    37. System.out.println(cause);
    38. System.out.println(Thread.currentThread().getName() + "----------------");
    39. }
    40. });
    41. }
    42. @Bean
    43. public TopicExchange topicExchange() {
    44. final TopicExchange topicExchange = new TopicExchange("boot-topic-exchange", true, false);
    45. return topicExchange;
    46. }
    47. @Bean
    48. public Queue queue() {
    49. final Queue queue = new Queue("boot-queue");
    50. return queue;
    51. }
    52. @Bean
    53. public Binding binding(Queue queue, TopicExchange topicExchange) {
    54. String routingkey = "*.red.*";
    55. final Binding binding = BindingBuilder.bind(queue).to(topicExchange).with(routingkey);
    56. return binding;
    57. }
    58. }

    yml

    spring:
      rabbitmq:
        host: 39.98.95.55
        port: 5672
        virtual-host: /test
        username: test
        password: test
        listener:
          simple:
            # Number of unacknowledged messages that may be outstanding per consumer.
            prefetch: 1
            # Acknowledgements are sent by the listener code, not by the container.
            acknowledge-mode: manual
        publisher-returns: true # enable the RabbitMQ publisher return mechanism
        # Enable publisher confirms, correlated with the CorrelationData given to the confirm callback.
        publisher-confirm-type: correlated

  • 相关阅读:
    聚苯乙烯/Fe3O4纳米复合材料PS@Fe3O4|硫化锌-四氧化三铁(ZnS/Fe3O40纳米复合物(齐岳)
    Allegro如何调整丝印位号/参数值到器件中心
    【单词】【2016】
    记录一个出现多次的小BUG:用串口读的数据,只能有一次赋值
    容器数据卷
    网课答案公众号搭建过程详解
    jquery html(““)造成内存上涨
    提升20%!京东广告模型系统负载均衡揭秘
    数据结构(一)综述
    初学java懵了,这个异常是怎么产生的?
  • 原文地址:https://blog.csdn.net/qq_53374893/article/details/132768461