• springboot实战(十)之全网最全RabbitMQ集成


    目录

    序言

    1.介绍

    2.特点

    3.安装

    4.集成

    5.思考


    序言

    首先我要抛出几个问题让大家思考一下:为什么你的项目中要用MQ呢?使用MQ为你解决了什么问题?当然解决问题的同时它又有哪些弊端值得注意?

    如果你不太清楚或者你根本没有考虑过,那么请往下看你会找到你想要的答案。

    1.介绍

    官网介绍:RabbitMQ is the most widely deployed open source message broker.(RabbitMQ是部署最广泛的开源信息代理。)
    RabbitMQ采用Erlang语言开发,是AMQP(Advanced Message Queuing Protocol)的标准实现。支持持久化,支持多种客户端,如 Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等。常用于分布式系统中的存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    2.特点

    • 分布式部署,支持集群模式、跨区域部署,以满足高可用、高吞吐量应用场景;
    • 异步消息传递,支持多种消息传递协议、消息队列、传递确认机制,灵活的路由消息到队列,多种交换类型;
    • 良好的开发者体验,可在许多操作系统及云环境中运行,并为大多数流行语言提供各种开发工具;
    • 可插拔身份认证授权,支持 TLS(Transport Layer Security)和 LDAP(Lightweight Directory Access Protocol),轻量且容易部署到内部、私有云或公有云中;
    • 有专门用于管理和监督的HTTP-API、命令行工具和 UI;
    • 支持连续集成,可以插件方式灵活地扩展 RabbitMQ 的功能。

    优点:

    • 由于 Erlang 语言的特性,RabbitMQ 性能较好、高并发;
    • 有消息确认机制和持久化机制,可靠性高;
    • 高度灵活可定制的路由;
    • 管理界面较丰富,在互联网公司也有较大规模的应用;
    • 健壮、稳定、易用、跨平台、支持多种语言客户端、文档齐全;
    • 社区活跃度高,更新快。

    缺点:

    1. 尽管结合 Erlang 语言本身的并发优势,性能较好,但是不利于做二次开发和维护;
    2. 实现了代理架构,意味着消息在发送到客户端之前可以在中央节点上排队。此特性使得 RabbitMQ 易于使用和部署,但使得其运行速度较慢,因为中央节点增加了延迟,消息封装后也比较大;
    3. 需要学习比较复杂的接口和协议,学习和维护成本较高。

    3.安装

    安装官网:RabbitMQ

    RabbitMQ支持window、unix、macos等平台安装,按自己的需要进行安装

    管理端页面:http://localhost:15672/ 

    账号密码:guest 

    4.集成

    因为springboot已经有mq相关starter,所以我们之间引用依赖

    1. <dependency>
    2. <groupId>org.springframework.boot</groupId>
    3. <artifactId>spring-boot-starter-amqp</artifactId>
    4. </dependency>

    yml文件配置信息:

    1. spring:
    2. #rabbitmq配置
    3. rabbitmq:
    4. host: 127.0.0.1
    5. port: 5672
    6. username: guest
    7. password: guest
    8. virtual-host: /

    配置文件:

    1. package com.iterge.iterge_pre.config;
    2. import org.springframework.amqp.core.*;
    3. import org.springframework.beans.factory.annotation.Qualifier;
    4. import org.springframework.context.annotation.Bean;
    5. import org.springframework.context.annotation.Configuration;
    6. /**
    7. * @author liuph
    8. * @date 2023/10/17 15:50:37
    9. */
    10. @Configuration
    11. public class RabbitmqConfig {
    12. public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    13. public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    14. public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    15. public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
    16. public static final String ROUTINGKEY_SMS="inform.#.sms.#";
    17. //声明交换机
    18. @Bean(EXCHANGE_TOPICS_INFORM)
    19. public Exchange EXCHANGE_TOPICS_INFORM(){
    20. //durable(true) 持久化,mq重启之后交换机还在
    21. return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
    22. }
    23. //声明QUEUE_INFORM_EMAIL队列
    24. @Bean(QUEUE_INFORM_EMAIL)
    25. public Queue QUEUE_INFORM_EMAIL(){
    26. return new Queue(QUEUE_INFORM_EMAIL);
    27. }
    28. //声明QUEUE_INFORM_SMS队列
    29. @Bean(QUEUE_INFORM_SMS)
    30. public Queue QUEUE_INFORM_SMS(){
    31. return new Queue(QUEUE_INFORM_SMS);
    32. }
    33. //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
    34. @Bean
    35. public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
    36. @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
    37. return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
    38. }
    39. //ROUTINGKEY_SMS队列绑定交换机,指定routingKey
    40. @Bean
    41. public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
    42. @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
    43. return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
    44. }
    45. }

    生产者配置:

    1. package com.iterge.iterge_pre.mq;
    2. import com.iterge.iterge_pre.config.RabbitmqConfig;
    3. import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
    4. import org.springframework.beans.factory.annotation.Autowired;
    5. import org.springframework.stereotype.Component;
    6. /**
    7. * @author liuph
    8. * @date 2023/10/17 15:52:48
    9. */
    10. @Component
    11. public class ProducerService {
    12. @Autowired
    13. private RabbitMessagingTemplate mqTemplate;
    14. public void sendMag(String msg){
    15. mqTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, "inform.email", msg);
    16. }
    17. }

    消费者配置:

    1. package com.iterge.iterge_pre.mq;
    2. import com.iterge.iterge_pre.config.RabbitmqConfig;
    3. import com.rabbitmq.client.Channel;
    4. import org.springframework.amqp.core.Message;
    5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    6. import org.springframework.stereotype.Component;
    7. /**
    8. * @author liuph
    9. * @date 2023/10/17 15:53:04
    10. */
    11. @Component
    12. public class ConsumerService {
    13. //监听email队列
    14. @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})
    15. public void receive_email(Message message, Channel channel){
    16. String body = new String(message.getBody());
    17. System.out.println("消费者:QUEUE_INFORM_EMAIL msg_"+body);
    18. }
    19. //监听sms队列
    20. @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS})
    21. public void receive_sms(Message message, Channel channel){
    22. String body = new String(message.getBody());
    23. System.out.println("消费者:QUEUE_INFORM_SMS msg_"+body);
    24. }
    25. }

    创建Controller

    1. package com.iterge.iterge_pre.controller;
    2. import com.iterge.iterge_pre.entity.Response;
    3. import com.iterge.iterge_pre.mq.ProducerService;
    4. import org.springframework.beans.factory.annotation.Autowired;
    5. import org.springframework.web.bind.annotation.GetMapping;
    6. import org.springframework.web.bind.annotation.PathVariable;
    7. import org.springframework.web.bind.annotation.RequestMapping;
    8. import org.springframework.web.bind.annotation.RestController;
    9. /**
    10. * @author liuph
    11. * @date 2023/10/17 15:58:32
    12. */
    13. @RestController
    14. @RequestMapping("mq")
    15. public class MQController {
    16. @Autowired
    17. private ProducerService producerService;
    18. @GetMapping("send/{msg}")
    19. public Response send(@PathVariable(value = "msg") String mag){
    20. producerService.sendMag(mag);
    21. return Response.ok();
    22. }
    23. }

    测试:

    控制台信息:

     如上,消费者成功消费到hi_rabbitmq

    5.思考

    所以文章读完了,开篇的问题思考的有结果了吗?

    没有结果也没关系,下面我们做下总结:

    1. 为什么你的项目中要用MQ呢?

    这个问题要结合自己的业务场景来判断,比如多服务调用时需要应用解偶,高并发场景下提高需要保证服务的高可用,高性能等。

    2.使用MQ为你解决了什么问题?

    那MQ为我们解决什么问题呢,一般可以用6个字来概括:解偶、异步、削峰

    解偶场景:比如当B、C系统需要用到A系统的数据时,A系统要分别调用B、C两端的接口就行数据传输,如果这是在来个D系统也需要A的数据,那么A系统还要改造代码,这时我们就可以把方案调整为通过MQ就行数据传输,A生产数据,别的系统需要数据之间消费就行了

    异步场景:比如A系统需要调用B系统的接口进行业务操作,而B接口的业务逻辑又要用到C系统的接口,假如a自身业务处理耗时1s,a调b耗时2s,b调c耗时2秒,那整个流程耗时5s;

    如果把方案调整为,a把数据放到MQ中,b系统消费后进行业务处理,假如a自身业务处理耗时1s,MQ耗时1s,共耗时2s,大大提高了业务处理能力

    削峰场景:假如现在有1000w个用户请求a系统,所有请求的数据要存到数据库中,如果这些请求数据一股脑儿的都往数据库怼,那可能导致的结果是数据库顶不住,连接出现异常或者宕机,这种情况下改用MQ的方式,当数据进来时先把数据存的mq,b系统再慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。

    3.当然解决问题的同时它又有哪些弊端值得注意?

    而在解决上面的问题的同时,MQ也有一些弊端需要我们注意,由于链路变长,就有存在一些数据一致性的问题,比如数据丢失、重复消费、顺序消费等问题就出现。

    本文章针对这些弊端问题暂时先不做讲述。

  • 相关阅读:
    计算机毕业设计Java自驾游网站系统(源码+系统+mysql数据库+lw文档)
    Go 代码测试
    lvresize与lvextend扩容逻辑卷的区别
    使用 Ruby 语言来解析开放文档格式 OOXML 文件
    AQS源码解析 7.共享模式_CyclicBarrier重复屏障
    MD5退出历史舞台你知道吗?
    青云1000------华为昇腾310
    画CMB天图使用Planck配色方案
    轻松入门自然语言处理系列 专题8 源码解读──基于HMM的结巴分词
    FPGA 学习笔记:Vivado 程序固化并烧写到 SPI Flash
  • 原文地址:https://blog.csdn.net/it_erge/article/details/133879897