• springboot中使用rabbitmq


    1.加入依赖

    
        org.springframework.boot
        spring-boot-starter-amqp
    
    

     2. 配置,配置中各个参数的含义 传送门

    spring:
        rabbitmq:
          host: xxxx
          username: xxx
          password: xxxx
          port: 5672
          virtual-host: /
          publisher-confirm-type: correlated
          publisher-returns: true
          template:
            mandatory: true
          listener:
            type: simple
            simple:
              acknowledge-mode: manual
              retry:
                enabled: true
              prefetch: 30
    

    3.使用,我这里是根据自己的业务场景的具体使用,可以看这个大神总结的使用方式传送门 

    3.1 配置一个topic类型的交换机,绑定队列,指定routingkey 

    1. @Configuration
    2. public class TopicRabbitMqConfig {
    3. public final static String exchange = "xxx";
    4. public final static String queue = "xxx";
    5. private final static String routing = "xxx";
    6. @Bean
    7. TopicExchange netdiskTopicExchange(){
    8. return new TopicExchange(exchange, true, false);
    9. }
    10. @Bean
    11. Queue netdiskQueue(){
    12. return new Queue(queue);
    13. }
    14. @Bean
    15. Binding netdiskBinding(){
    16. return BindingBuilder.bind(netdiskQueue()).to(netdiskTopicExchange()).with(routing);
    17. }
    18. }

    3.2 封装了一个工具类方便后续使用

    1. @Slf4j
    2. @Component
    3. public class MqUtil implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    4. @Resource
    5. private RabbitTemplate rt;
    6. public static RabbitTemplate rabbitTemplate;
    7. @PostConstruct
    8. private void init() {
    9. MqUtil.rabbitTemplate = this.rt;
    10. rabbitTemplate.setConfirmCallback(this::confirm);
    11. rabbitTemplate.setReturnsCallback(this::returnedMessage);
    12. }
    13. /**
    14. * 不论是否进入交换机,都会回调当前方法
    15. *
    16. * @param correlationData 消息投递封装对象
    17. * @param ack 是否投递成功
    18. * @param exception 如果错误,错误原因
    19. */
    20. @Override
    21. public void confirm(CorrelationData correlationData, boolean ack, String exception) {
    22. if (!ack) {
    23. if (correlationData instanceof CorrelationDataExt) {
    24. CorrelationDataExt correlationDataExt = (CorrelationDataExt) correlationData;
    25. Object message = correlationDataExt.getData();
    26. log.error("消息进入交换机失败:{}, 原因:{}", JSON.toJSONString(message), exception);
    27. }
    28. }
    29. }
    30. /**
    31. * 消息从交换机进入队列失败回调方法:只会在失败的情况下
    32. *
    33. * @param ReturnedMessage returnedMessage
    34. */
    35. @Override
    36. public void returnedMessage(ReturnedMessage returnedMessage) {
    37. Message message = returnedMessage.getMessage();
    38. int replyCode = returnedMessage.getReplyCode();
    39. String replyText = returnedMessage.getReplyText();
    40. String exchange = returnedMessage.getExchange();
    41. String routingKey = returnedMessage.getRoutingKey();
    42. String messageContent = new String(message.getBody(), StandardCharsets.UTF_8);
    43. Map map = new HashMap<>();
    44. map.put("replyCode", replyCode);
    45. map.put("replyText", replyText);
    46. map.put("exchange", exchange);
    47. map.put("routingKey", routingKey);
    48. map.put("message", messageContent);
    49. log.error("消息从交换机进入队列失败:{}", JSON.toJSONString(map));
    50. }
    51. public static void send(String type, MqMessageData data) throws AmqpException {
    52. String msgId = UUID.randomUUID().toString();
    53. CorrelationDataExt correlationData = new CorrelationDataExt();
    54. correlationData.setId(msgId);
    55. correlationData.setData("xxxx");
    56. rabbitTemplate.convertAndSend(TopicRabbitMqConfig.exchange, "xxx", "message", correlationData);
    57. }
    58. }

    3.2 CorrelationDataExt ,扩展CorrelationData ,方便把我们发送出的消息挂回到生产者确认的回调里,这部分的使用方式不确定是否一定对,因为我也是第一次用,但是发现CorrelationData 里只有id能拿到,却拿不到数据,后来看了一个帖子可以扩展CorrelationData ,能实现我想要的效果,还希望路过的大神能指导一下,生产者确认的回调里,如果失败了怎么处理消息?感谢!!

    1. /**
    2. * CorrelationData的自定义实现,用于拿到消息内容
    3. * @author coco
    4. * @date 2022/9/16
    5. */
    6. public class CorrelationDataExt extends CorrelationData {
    7. //数据
    8. private volatile Object data;
    9. public Object getData() {
    10. return data;
    11. }
    12. public void setData(Object data) {
    13. this.data = data;
    14. }
    15. }

     

     

  • 相关阅读:
    Codeforces Round 889 (Div. 2)A~C1题解
    微服务架构之:Redis的分布式锁---搭建生产可用的Redis分布式锁
    ORACLE 19C pdb修改的参数保存在哪个数据字典中?
    【数学建模】层次分析
    EureKa详解:微服务发现与注册的利器
    带你Java入门(Java系列1)
    Spring 接口日志切片记录
    三星在又一个市场击败中国手机,继续称霸全球,中国市场没那么重要
    003:如何画出成交量的柱状图
    本地离线模型搭建指南-中文大语言模型底座选择依据
  • 原文地址:https://blog.csdn.net/Coco_chun/article/details/126926640