目录
目前业界有很多的 MQ 产品,如 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用 Redis 充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需 求及 MQ 产品特征,综合考虑。

MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进 行通信。



使用 MQ 使得应用间解耦,提升容错性和可维护性。

一个下单操作耗时:20 + 300 + 300 + 300 = 920ms 用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢了

用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。
提升用户体验和系统吞吐量(单位时间内处理请求的数目)。

MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰 就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直 到消费完积压的消息,这就叫做“填谷”。

系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?
MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何 保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理 失败。如何保证消息数据处理的一致性?
AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议 的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中 间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。

RabbitMQ 中的相关概念⚫ Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
⚫ Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多 个vhost,每个用户在自己的 vhost 创建 exchange/queue
⚫ Connection:publisher/consumer 和 broker 之间的 TCP 连接
⚫ Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线 程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
⚫ Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
⚫ Queue:消息最终被送到这里等待 consumer 取走
⚫ Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存 到 exchange 中的查询表中,用于 message 的分发依据
简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。 官网对应模式介绍:https://www.rabbitmq.com/getstarted.html

JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件的API
JMS 是 JavaEE 规范中的一种,API 规范接口,类比JDBC
很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ 官方没有提供 JMS 的实现包,但是开源社区有
- <dependencies>
- <!--rabbitmq java 客户端-->
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.6.0</version>
- </dependency>
- </dependencies>
-
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.8.0</version>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
基础配置信息
- //1.创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- //2. 设置参数
- factory.setHost("121.37.118.193");//ip 默认值 localhost
- factory.setPort(5672); //端口 默认值 5672
- factory.setVirtualHost("dobby");//虚拟机 默认值/
- factory.setUsername("dobby");//用户名 默认 guest
- factory.setPassword("123456");//密码 默认值 guest
- //3. 创建连接 Connection
- Connection connection = factory.newConnection();
- //4. 创建Channel
- Channel channel = connection.createChannel();

- //创建队列Queue
- /*
- queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map
arguments) - 参数:
- 1. queue:队列名称
- 2. durable:是否持久化,当mq重启之后,还在
- 3. exclusive:
- * 是否独占。只能有一个消费者监听这队列
- * 当Connection关闭时,是否删除队列
- *
- 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
- 5. arguments:参数。
- */
- //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
- channel.queueDeclare("hello_world",true,false,false,null);
- /*
- basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
- 参数:
- 1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
- 2. routingKey:路由名称
- 3. props:配置信息
- 4. body:发送消息数据
- */
-
- String body = "hello rabbitmq~~~";
-
- //发送消息
- channel.basicPublish("","hello_world",null,body.getBytes());

与简单模式供应端代码一致,只不过对应多个消费端,以下消费端代码示例
- String queue1Name = "test_fanout_queue1";
-
- // 接收消息
- Consumer consumer = new DefaultConsumer(channel){
- /*
- 回调方法,当收到消息后,会自动执行该方法
- 1. consumerTag:标识
- 2. envelope:获取一些信息,交换机,路由key...
- 3. properties:配置信息
- 4. body:数据
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- /* System.out.println("consumerTag:"+consumerTag);
- System.out.println("Exchange:"+envelope.getExchange());
- System.out.println("RoutingKey:"+envelope.getRoutingKey());
- System.out.println("properties:"+properties);*/
- System.out.println("body:"+new String(body));
- System.out.println("将日志信息保存数据库.....");
- }
- };
- /*
- basicConsume(String queue, boolean autoAck, Consumer callback)
- 参数:
- 1. queue:队列名称
- 2. autoAck:是否自动确认
- 3. callback:回调对象
- */
- channel.basicConsume(queue2Name,true,consumer);

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队 列绑 定到默认的交换机
- String exchangeName = "test_fanout";
- //创建交换机
- /*exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map
arguments) - 参数:
- 1. exchange:交换机名称
- 2. type:交换机类型
- DIRECT("direct"),:定向
- FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
- TOPIC("topic"),通配符的方式
- HEADERS("headers");参数匹配
- 3. durable:是否持久化
- 4. autoDelete:自动删除
- 5. internal:内部使用。 一般false
- 6. arguments:参数
- */
- channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
- //创建队列
- String queue1Name = "test_fanout_queue1";
- String queue2Name = "test_fanout_queue2";
- channel.queueDeclare(queue1Name,true,false,false,null);
- channel.queueDeclare(queue2Name,true,false,false,null);
- //绑定队列和交换机
- /*
- queueBind(String queue, String exchange, String routingKey)
- 参数:
- 1. queue:队列名称
- 2. exchange:交换机名称
- 3. routingKey:路由键,绑定规则
- 如果交换机的类型为fanout ,routingKey设置为""
- */
- channel.queueBind(queue1Name,exchangeName,"");
- channel.queueBind(queue2Name,exchangeName,"");
-
- String body = "日志信息:张三调用了findAll方法...日志级别:info...";
- //发送消息
- channel.basicPublish(exchangeName,"",null,body.getBytes());
-
- //释放资源
- channel.close();
- connection.close();

Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息
Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。
- //创建交换机
- channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
- //创建队列
- String queue1Name = "test_direct_queue1";
- String queue2Name = "test_direct_queue2";
-
- channel.queueDeclare(queue1Name,true,false,false,null);
- channel.queueDeclare(queue2Name,true,false,false,null);
- //绑定队列和交换机
- //队列1绑定 error
- channel.queueBind(queue1Name,exchangeName,"error");
- //队列2绑定 info error warning
- channel.queueBind(queue2Name,exchangeName,"info");
- channel.queueBind(queue2Name,exchangeName,"error");
- channel.queueBind(queue2Name,exchangeName,"warning");
-
- String body = "日志信息:张三调用了delete方法...出错误了。。。日志级别:error...";
- //8. 发送消息
- channel.basicPublish(exchangeName,"warning",null,body.getBytes());

- //创建交换机
- channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
- ...
- // routing key 系统的名称.日志的级别。
- //=需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
- channel.queueBind(queue1Name,exchangeName,"#.error");
- channel.queueBind(queue1Name,exchangeName,"order.*");
- channel.queueBind(queue2Name,exchangeName,"*.*");
-
- String body = "日志信息:张三调用了findAll方法...日志级别:info...";
- //8. 发送消息
- channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());
rabbitmq.properties
- rabbitmq.host=121.37.118.193
- rabbitmq.port=5672
- rabbitmq.username=dobby
- rabbitmq.password=123456
- rabbitmq.virtual-host=dobby
Spring-rabbitmq-prodicer.xml
- "1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:context="http://www.springframework.org/schema/context"
- xmlns:rabbit="http://www.springframework.org/schema/rabbit"
- xsi:schemaLocation="http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans.xsd
- http://www.springframework.org/schema/context
- https://www.springframework.org/schema/context/spring-context.xsd
- http://www.springframework.org/schema/rabbit
- http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
-
- <context:property-placeholder location="classpath:rabbitmq.properties"/>
-
-
- <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
- port="${rabbitmq.port}"
- username="${rabbitmq.username}"
- password="${rabbitmq.password}"
- virtual-host="${rabbitmq.virtual-host}"/>
-
- <rabbit:admin connection-factory="connectionFactory"/>
-
-
-
-
- <rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
-
-
-
- <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
-
-
- <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>
-
-
- <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
- <rabbit:bindings>
- <rabbit:binding queue="spring_fanout_queue_1" />
- <rabbit:binding queue="spring_fanout_queue_2"/>
- rabbit:bindings>
- rabbit:fanout-exchange>
-
-
-
-
-
- <rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>
-
- <rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
-
- <rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>
-
- <rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
- <rabbit:bindings>
- <rabbit:binding pattern="heima.*" queue="spring_topic_queue_star"/>
- <rabbit:binding pattern="heima.#" queue="spring_topic_queue_well"/>
- <rabbit:binding pattern="itcast.#" queue="spring_topic_queue_well2"/>
- rabbit:bindings>
- rabbit:topic-exchange>
-
-
- <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
- beans>
测试代码:
-
- @RunWith(SpringJUnit4ClassRunner.class)
- @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
- public class ProducerTest {
-
- //1.注入 RabbitTemplate
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
-
- @Test
- public void testHelloWorld(){
- //2.发送消息
-
- rabbitTemplate.convertAndSend("spring_queue","hello world spring....");
- }
-
-
- /**
- * 发送fanout消息
- */
- @Test
- public void testFanout(){
- //2.发送消息
-
- rabbitTemplate.convertAndSend("spring_fanout_exchange","","spring fanout....");
- }
-
-
- /**
- * 发送topic消息
- */
- @Test
- public void testTopics(){
- //2.发送消息
-
- rabbitTemplate.convertAndSend("spring_topic_exchange","heima.hehe.haha","spring topic....");
- }
- }
SpringQueueListener
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageListener;
-
- public class SpringQueueListener implements MessageListener {
- @Override
- public void onMessage(Message message) {
- //打印消息
- System.out.println(new String(message.getBody()));
- }
- }
Spring-rabbitmq-consumer.xml
- "1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:context="http://www.springframework.org/schema/context"
- xmlns:rabbit="http://www.springframework.org/schema/rabbit"
- xsi:schemaLocation="http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans.xsd
- http://www.springframework.org/schema/context
- https://www.springframework.org/schema/context/spring-context.xsd
- http://www.springframework.org/schema/rabbit
- http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
-
- <context:property-placeholder location="classpath:rabbitmq.properties"/>
-
-
- <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
- port="${rabbitmq.port}"
- username="${rabbitmq.username}"
- password="${rabbitmq.password}"
- virtual-host="${rabbitmq.virtual-host}"/>
-
- <bean id="springQueueListener" class="com.itheima.rabbitmq.listener.SpringQueueListener"/>
-
-
- <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
- <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
-
- rabbit:listener-container>
- beans>
-
- <parent>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-parentartifactId>
- <version>2.1.4.RELEASEversion>
- parent>
-
- <dependencies>
-
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-amqpartifactId>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-testartifactId>
- dependency>
- dependencies>
- @Configuration
- public class RabbitMQConfig {
-
- public static final String EXCHANGE_NAME = "boot_topic_exchange";
- public static final String QUEUE_NAME = "boot_queue";
-
- //1.交换机
- @Bean("bootExchange")
- public Exchange bootExchange(){
- return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
- }
-
-
- //2.Queue 队列
- @Bean("bootQueue")
- public Queue bootQueue(){
- return QueueBuilder.durable(QUEUE_NAME).build();
- }
-
- //3. 队列和交互机绑定关系 Binding
- /*
- 1. 知道哪个队列
- 2. 知道哪个交换机
- 3. routing key
- */
- @Bean
- public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
- return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
- }
-
-
- }
- # 配置RabbitMQ的基本信息 ip 端口 username password..
- spring:
- rabbitmq:
- host: 172.16.98.133 # ip
- port: 5672
- username: guest
- password: guest
- virtual-host: /
- public class ProducerTest {
-
- //1.注入RabbitTemplate
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- public void testSend(){
-
- rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","boot mq hello~~~");
- }
- }
消费者:
RabbimtMQListener
- @Component
- public class RabbimtMQListener {
-
- @RabbitListener(queues = "boot_queue")
- public void ListenerQueue(Message message){
- //System.out.println(message);
- System.out.println(new String(message.getBody()));
- }
-
- }