目录
首先我要抛出几个问题让大家思考一下:为什么你的项目中要用MQ呢?使用MQ为你解决了什么问题?当然解决问题的同时它又有哪些弊端值得注意?
如果你不太清楚或者你根本没有考虑过,那么请往下看你会找到你想要的答案。
官网介绍:RabbitMQ is the most widely deployed open source message broker.(RabbitMQ是部署最广泛的开源信息代理。)
RabbitMQ采用Erlang语言开发,是AMQP(Advanced Message Queuing Protocol)的标准实现。支持持久化,支持多种客户端,如 Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等。常用于分布式系统中的存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
- 分布式部署,支持集群模式、跨区域部署,以满足高可用、高吞吐量应用场景;
- 异步消息传递,支持多种消息传递协议、消息队列、传递确认机制,灵活的路由消息到队列,多种交换类型;
- 良好的开发者体验,可在许多操作系统及云环境中运行,并为大多数流行语言提供各种开发工具;
- 可插拔身份认证授权,支持 TLS(Transport Layer Security)和 LDAP(Lightweight Directory Access Protocol),轻量且容易部署到内部、私有云或公有云中;
- 有专门用于管理和监督的HTTP-API、命令行工具和 UI;
- 支持连续集成,可以插件方式灵活地扩展 RabbitMQ 的功能。
优点:
- 由于 Erlang 语言的特性,RabbitMQ 性能较好、高并发;
- 有消息确认机制和持久化机制,可靠性高;
- 高度灵活可定制的路由;
- 管理界面较丰富,在互联网公司也有较大规模的应用;
- 健壮、稳定、易用、跨平台、支持多种语言客户端、文档齐全;
- 社区活跃度高,更新快。
缺点:
- 尽管结合 Erlang 语言本身的并发优势,性能较好,但是不利于做二次开发和维护;
- 实现了代理架构,意味着消息在发送到客户端之前可以在中央节点上排队。此特性使得 RabbitMQ 易于使用和部署,但使得其运行速度较慢,因为中央节点增加了延迟,消息封装后也比较大;
- 需要学习比较复杂的接口和协议,学习和维护成本较高。
安装官网:RabbitMQ
RabbitMQ支持window、unix、macos等平台安装,按自己的需要进行安装
管理端页面:http://localhost:15672/
账号密码:guest
因为springboot已经有mq相关starter,所以我们之间引用依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
yml文件配置信息:
- spring:
- #rabbitmq配置
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- username: guest
- password: guest
- virtual-host: /
配置文件:
- package com.iterge.iterge_pre.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * @author liuph
- * @date 2023/10/17 15:50:37
- */
- @Configuration
- public class RabbitmqConfig {
- public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
- public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
- public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
- public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
- public static final String ROUTINGKEY_SMS="inform.#.sms.#";
-
- //声明交换机
- @Bean(EXCHANGE_TOPICS_INFORM)
- public Exchange EXCHANGE_TOPICS_INFORM(){
- //durable(true) 持久化,mq重启之后交换机还在
- return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
- }
-
- //声明QUEUE_INFORM_EMAIL队列
- @Bean(QUEUE_INFORM_EMAIL)
- public Queue QUEUE_INFORM_EMAIL(){
- return new Queue(QUEUE_INFORM_EMAIL);
- }
- //声明QUEUE_INFORM_SMS队列
- @Bean(QUEUE_INFORM_SMS)
- public Queue QUEUE_INFORM_SMS(){
- return new Queue(QUEUE_INFORM_SMS);
- }
-
- //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
- @Bean
- public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
- @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
- return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
- }
- //ROUTINGKEY_SMS队列绑定交换机,指定routingKey
- @Bean
- public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
- @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
- return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
- }
- }
生产者配置:
- package com.iterge.iterge_pre.mq;
-
- import com.iterge.iterge_pre.config.RabbitmqConfig;
- import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- /**
- * @author liuph
- * @date 2023/10/17 15:52:48
- */
- @Component
- public class ProducerService {
- @Autowired
- private RabbitMessagingTemplate mqTemplate;
-
- public void sendMag(String msg){
- mqTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, "inform.email", msg);
- }
- }
消费者配置:
- package com.iterge.iterge_pre.mq;
-
- import com.iterge.iterge_pre.config.RabbitmqConfig;
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- /**
- * @author liuph
- * @date 2023/10/17 15:53:04
- */
- @Component
- public class ConsumerService {
- //监听email队列
- @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})
- public void receive_email(Message message, Channel channel){
- String body = new String(message.getBody());
- System.out.println("消费者:QUEUE_INFORM_EMAIL msg_"+body);
- }
- //监听sms队列
- @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS})
- public void receive_sms(Message message, Channel channel){
- String body = new String(message.getBody());
- System.out.println("消费者:QUEUE_INFORM_SMS msg_"+body);
- }
-
- }
创建Controller
- package com.iterge.iterge_pre.controller;
-
- import com.iterge.iterge_pre.entity.Response;
- import com.iterge.iterge_pre.mq.ProducerService;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- /**
- * @author liuph
- * @date 2023/10/17 15:58:32
- */
- @RestController
- @RequestMapping("mq")
- public class MQController {
- @Autowired
- private ProducerService producerService;
-
- @GetMapping("send/{msg}")
- public Response
send(@PathVariable(value = "msg") String mag){ - producerService.sendMag(mag);
- return Response.ok();
- }
- }
测试:
控制台信息:
如上,消费者成功消费到hi_rabbitmq
所以文章读完了,开篇的问题思考的有结果了吗?
没有结果也没关系,下面我们做下总结:
1. 为什么你的项目中要用MQ呢?
这个问题要结合自己的业务场景来判断,比如多服务调用时需要应用解偶,高并发场景下提高需要保证服务的高可用,高性能等。
2.使用MQ为你解决了什么问题?
那MQ为我们解决什么问题呢,一般可以用6个字来概括:解偶、异步、削峰
解偶场景:比如当B、C系统需要用到A系统的数据时,A系统要分别调用B、C两端的接口就行数据传输,如果这是在来个D系统也需要A的数据,那么A系统还要改造代码,这时我们就可以把方案调整为通过MQ就行数据传输,A生产数据,别的系统需要数据之间消费就行了
异步场景:比如A系统需要调用B系统的接口进行业务操作,而B接口的业务逻辑又要用到C系统的接口,假如a自身业务处理耗时1s,a调b耗时2s,b调c耗时2秒,那整个流程耗时5s;
如果把方案调整为,a把数据放到MQ中,b系统消费后进行业务处理,假如a自身业务处理耗时1s,MQ耗时1s,共耗时2s,大大提高了业务处理能力
削峰场景:假如现在有1000w个用户请求a系统,所有请求的数据要存到数据库中,如果这些请求数据一股脑儿的都往数据库怼,那可能导致的结果是数据库顶不住,连接出现异常或者宕机,这种情况下改用MQ的方式,当数据进来时先把数据存的mq,b系统再慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。
3.当然解决问题的同时它又有哪些弊端值得注意?
而在解决上面的问题的同时,MQ也有一些弊端需要我们注意,由于链路变长,就有存在一些数据一致性的问题,比如数据丢失、重复消费、顺序消费等问题就出现。
本文章针对这些弊端问题暂时先不做讲述。