• springboot中使用rabbitmq


    1.加入依赖

    
        org.springframework.boot
        spring-boot-starter-amqp
    
    

     2. 配置,配置中各个参数的含义 传送门

    spring:
        rabbitmq:
          host: xxxx
          username: xxx
          password: xxxx
          port: 5672
          virtual-host: /
          publisher-confirm-type: correlated
          publisher-returns: true
          template:
            mandatory: true
          listener:
            type: simple
            simple:
              acknowledge-mode: manual
              retry:
                enabled: true
              prefetch: 30
    

    3.使用,我这里是根据自己的业务场景的具体使用,可以看这个大神总结的使用方式传送门 

    3.1 配置一个topic类型的交换机,绑定队列,指定routingkey 

    1. @Configuration
    2. public class TopicRabbitMqConfig {
    3. public final static String exchange = "xxx";
    4. public final static String queue = "xxx";
    5. private final static String routing = "xxx";
    6. @Bean
    7. TopicExchange netdiskTopicExchange(){
    8. return new TopicExchange(exchange, true, false);
    9. }
    10. @Bean
    11. Queue netdiskQueue(){
    12. return new Queue(queue);
    13. }
    14. @Bean
    15. Binding netdiskBinding(){
    16. return BindingBuilder.bind(netdiskQueue()).to(netdiskTopicExchange()).with(routing);
    17. }
    18. }

    3.2 封装了一个工具类方便后续使用

    1. @Slf4j
    2. @Component
    3. public class MqUtil implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    4. @Resource
    5. private RabbitTemplate rt;
    6. public static RabbitTemplate rabbitTemplate;
    7. @PostConstruct
    8. private void init() {
    9. MqUtil.rabbitTemplate = this.rt;
    10. rabbitTemplate.setConfirmCallback(this::confirm);
    11. rabbitTemplate.setReturnsCallback(this::returnedMessage);
    12. }
    13. /**
    14. * 不论是否进入交换机,都会回调当前方法
    15. *
    16. * @param correlationData 消息投递封装对象
    17. * @param ack 是否投递成功
    18. * @param exception 如果错误,错误原因
    19. */
    20. @Override
    21. public void confirm(CorrelationData correlationData, boolean ack, String exception) {
    22. if (!ack) {
    23. if (correlationData instanceof CorrelationDataExt) {
    24. CorrelationDataExt correlationDataExt = (CorrelationDataExt) correlationData;
    25. Object message = correlationDataExt.getData();
    26. log.error("消息进入交换机失败:{}, 原因:{}", JSON.toJSONString(message), exception);
    27. }
    28. }
    29. }
    30. /**
    31. * 消息从交换机进入队列失败回调方法:只会在失败的情况下
    32. *
    33. * @param ReturnedMessage returnedMessage
    34. */
    35. @Override
    36. public void returnedMessage(ReturnedMessage returnedMessage) {
    37. Message message = returnedMessage.getMessage();
    38. int replyCode = returnedMessage.getReplyCode();
    39. String replyText = returnedMessage.getReplyText();
    40. String exchange = returnedMessage.getExchange();
    41. String routingKey = returnedMessage.getRoutingKey();
    42. String messageContent = new String(message.getBody(), StandardCharsets.UTF_8);
    43. Map map = new HashMap<>();
    44. map.put("replyCode", replyCode);
    45. map.put("replyText", replyText);
    46. map.put("exchange", exchange);
    47. map.put("routingKey", routingKey);
    48. map.put("message", messageContent);
    49. log.error("消息从交换机进入队列失败:{}", JSON.toJSONString(map));
    50. }
    51. public static void send(String type, MqMessageData data) throws AmqpException {
    52. String msgId = UUID.randomUUID().toString();
    53. CorrelationDataExt correlationData = new CorrelationDataExt();
    54. correlationData.setId(msgId);
    55. correlationData.setData("xxxx");
    56. rabbitTemplate.convertAndSend(TopicRabbitMqConfig.exchange, "xxx", "message", correlationData);
    57. }
    58. }

    3.2 CorrelationDataExt ,扩展CorrelationData ,方便把我们发送出的消息挂回到生产者确认的回调里,这部分的使用方式不确定是否一定对,因为我也是第一次用,但是发现CorrelationData 里只有id能拿到,却拿不到数据,后来看了一个帖子可以扩展CorrelationData ,能实现我想要的效果,还希望路过的大神能指导一下,生产者确认的回调里,如果失败了怎么处理消息?感谢!!

    1. /**
    2. * CorrelationData的自定义实现,用于拿到消息内容
    3. * @author coco
    4. * @date 2022/9/16
    5. */
    6. public class CorrelationDataExt extends CorrelationData {
    7. //数据
    8. private volatile Object data;
    9. public Object getData() {
    10. return data;
    11. }
    12. public void setData(Object data) {
    13. this.data = data;
    14. }
    15. }

     

     

  • 相关阅读:
    保存文件时电脑提示:你没有权限在此位置中保存文件。请与管理员联系以获得相应权限。
    动态规划专项---最长上升子序列模型
    Day32 Web自动化进阶
    linux 安装Docker
    【Python基础】Python文件操作介绍
    连接池及Druid(德鲁伊) 数据库连接池
    pod(八):pod的调度——将 Pod 指派给节点
    如何快速通过PMP考试?
    ros发布节点和接收节点的编写
    黑客(网络安全)技术速成自学
  • 原文地址:https://blog.csdn.net/Coco_chun/article/details/126926640