• RabbitMQ: return 与 confirm 机制整合 Spring Boot 实现


    1. package com.qf.mq2203.config;
    2. import org.springframework.amqp.core.*;
    3. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    4. import org.springframework.amqp.rabbit.connection.CorrelationData;
    5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    6. import org.springframework.beans.factory.annotation.Autowired;
    7. import org.springframework.context.annotation.Bean;
    8. import org.springframework.context.annotation.Configuration;
    9. import javax.annotation.PostConstruct;
    10. @Configuration
    11. public class RabbitmqConfig {
    12. @Autowired
    13. RabbitTemplate rabbitTemplate;
    14. @PostConstruct
    15. public void init(){
    16. // 设置 return callback
    17. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    18. @Override
    19. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    20. System.out.println(Thread.currentThread().getName() + "================");
    21. final byte[] body = message.getBody(); // 消息的内容
    22. System.out.println(message);
    23. System.out.println(replyCode);
    24. System.out.println(replyText);
    25. System.out.println(exchange);
    26. System.out.println(routingKey);
    27. System.out.println(Thread.currentThread().getName() + "================");
    28. }
    29. });
    30. // 设置 confirm回调方法
    31. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    32. @Override
    33. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    34. System.out.println(Thread.currentThread().getName() + "----------------");
    35. System.out.println(correlationData);
    36. System.out.println(ack);
    37. System.out.println(cause);
    38. System.out.println(Thread.currentThread().getName() + "----------------");
    39. }
    40. });
    41. }
    42. @Bean
    43. public TopicExchange topicExchange() {
    44. final TopicExchange topicExchange = new TopicExchange("boot-topic-exchange", true, false);
    45. return topicExchange;
    46. }
    47. @Bean
    48. public Queue queue() {
    49. final Queue queue = new Queue("boot-queue");
    50. return queue;
    51. }
    52. @Bean
    53. public Binding binding(Queue queue, TopicExchange topicExchange) {
    54. String routingkey = "*.red.*";
    55. final Binding binding = BindingBuilder.bind(queue).to(topicExchange).with(routingkey);
    56. return binding;
    57. }
    58. }

    yml

    # Spring Boot RabbitMQ connection, listener and publisher settings.
    spring:
      rabbitmq:
        host: 39.98.95.55
        port: 5672
        virtual-host: /test
        username: test
        password: test
        listener:
          simple:
            prefetch: 1
            acknowledge-mode: manual
        publisher-returns: true # enable the RabbitMQ return mechanism
        publisher-confirm-type: correlated

  • 相关阅读:
    重温 C# 字典Dictionary类
    基于bp神经网络的房价预测,房价预测 神经网络
    岩土工程监测中的振弦采集仪选择与布设策略
    105道Java面试题
    【程序员表白大师】html七夕脱单必看源码制作
    Spring动态代理源码分析
    go实现限流器
    搜索二维矩阵 II
    Redis集群|集群搭建|集群Jedis开发
    钉钉机器人消息推送composer拓展 laravel-dingbot
  • 原文地址:https://blog.csdn.net/qq_53374893/article/details/132768461