同步通信:耗时长,受网络波动影响,不能保证高成功率,耦合性高。
同步,异步
并发:一段时间(1S)多个请求数
并行:时间节点,多个指令同时被执行
串行:顺利执行
同步方式的问题:当一个用户提交订单到成功需要300ms+300ms+300ms+20ms = 920ms,这是不能容忍的。也就是说库存、支付、物流、最后保存数据库全部成功,订单的提交才算完成。
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理,提高了应用程序的响应时间。
消息队列:Redis 发布订阅(pub/sub)
异步方式:用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。也就是说,订单消息提交到MQ,MQ回馈一个消息成功,然后再把订单提交到数据库20ms,就完成了。至于MQ通知库存、支付、物流系统所花费的时间和订单系统成功没有关系了。 这样这个订单系统提升用户体验和系统吞吐量(单位时间内处理请求的数目)
服务与之间耦合度,比如订单服务与用户积分服务(需求:下单成功,增加积分)
如果不用消息队列,订单服务和积分服务就要通信,下单后调用积分服务的接口通知积分服务进行处理(或者定时扫描之类的),那么调用接口失败,或者延时等等…一系列的问题要考虑处理,非常繁琐
用了消息队列,用户A下单成功后下单服务通过redis发布(mq的生产者)一消息,就不用管了.用户积分服务redis订阅了(mq的消费者),就会受到这用户A下单的消息,进行处理.这就降低了多个服务之间的耦合,即使积分服务发生异常,也不会影响用户正常下单.处理起来就非常的丝滑,各干各的互不影响.
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
使用消息队列的方式:使用 MQ 使得应用间解耦,提升容错性和可维护性。库存和支付和物流直接去MQ取到订单的信息即可,即使库存系统报错,没关系,等到库存修复后再次从MQ中去取就可以了
订单系统,在下单的时候就会往数据库写数据。但是数据库只能支撑每秒1000左右的并发写入,并发量再高就容易宕机。低峰期的时候并发也就100多个,但是在高峰期时候,并发量会突然激增到5000以上,这个时候数据库肯定卡死了。但不一定宕机,只会很慢,一旦宕机就会有消息丢失。
消息被MQ保存起来了,5000条数据对于MQ,简直是小意思,然后系统就可以按照自己的消费能力来消费,比如每秒1000个数据,这样慢慢写入数据库,这样就不会卡死数据库了
Redis队列:Redis队列是一个Key-Value的NoSQL数据库,开发维护很活跃,虽然是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。
MQ队列 :在分布式系统中存储转发消息,在易用性、扩展性、高可用等方面表现不俗,主要是为了实现系统之间的双向解耦。
Redis没有相应的机制保证消息的消费,当消费者消费失败的时候,消费体丢失,需要手动处理。MQ:具有消息消费确认ACK,即使消费者消费失败,也会自动使消息体返回原队列,同时可全程持久化,保证消息体被正确消费;
Redis采用主从模式,读写分离,但是故障转移还没有非常完善的官方解决方案;MQ集群采用磁盘、内存节点,任意单点故障都不会影响整个队列的操作;
将整个Redis实例持久化到磁盘,MQ的队列、消息,都可以选择是否持久化;
Redis的特点是轻量级,高并发,延迟敏感,用于即使数据分析、秒杀计数器、缓存等;MQ的特点是重量级,高并发,用于异步、批量数据异步处理、并发任务串行化,高负载任务的负载均衡等。
1.可靠性
redis:没有机制保证消息的可靠性,发布一条消息没有对应的订阅者的话,这条消息将丢失,不会存在内存中。
mq:具有消息确认机制,发布一条消息,没有消费者消费该队列,这条消息一直存放在队列中,直到有消费者消费了该条消息,保证消息的可靠消费。
2.实时性
redis实时性高,redis是高效的缓存服务器,所有数据到存在内存中,所以具有更高的实时性。
3.消费者负载均衡
mq队列可以被多个消费者同时监控消费,但每一条消息只能消费一次,由于mq的消费确认机制,因此能够根据消费者的消费能力调整负载。
redis发布订阅模式,一个队列可被多个消费者同时订阅,消息到达时,会将消息一次发送给每个订阅者,是一种消息的广播形式,redis本身不做消费者的负载均衡,因此消费效率存在瓶颈。
4.持久性
redis:redis的持久化是针对整个redis缓存,可将整个redis实例持久化到磁盘来做备份,以防止异常情况下导致数据丢失。
mq:每条消息都可以选择持久化,持久化粒度更小,更灵活。
5.队列监控
mq实现了后台监控平台,可在平台上看到所有创建的队列的详细情况。redis没有监控平台。
6.性能
发布消息时,数据较小时,redis性能高于mq,数据大小超过10K时redis比较慢。读取消息时,无论数据大小,redis性能高于mq。
总结:
redis:轻量级,低延迟,高并发,低可靠性。
mq:重量级,高可靠,异步,不保证实时。
MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信,在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量
1、系统可用性降低:系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?
2、系统复杂度提高:MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
3、一致性问题:A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?
既然 MQ 有优势也有劣势,那么使用 MQ 需要满足什么条件呢?
生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。
容许短暂的不一致性。
确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。
目前业界有很多的 MQ 产品,例如 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用 Redis 充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及 MQ 产品特征,综合考虑。
** ** | RabbitMQ | ActiveMQ | RocketMQ | Kafka |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义 | 自定义协议,社区封装了http协议支持 |
客户端支持语言 | 官方支持Erlang,Java,Ruby等,社区产出多种API,几乎支持所有语言 | Java,C,C++,Python,PHP,Perl,.net等 | Java,C++(不成熟) | 官方支持Java,社区产出多种API,如PHP,Python等 |
单机吞吐量 | 万级(其次) | 万级(最差) | 十万级(最好) | 十万级(次之) |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
功能特性 | 并发能力强,性能极其好,延时低,社区活跃,管理界面丰富 | 老牌产品,成熟度高,文档较多 | MQ功能比较完备,扩展性佳 | 只支持主要的MQ功能,毕竟是为大数据领域准备的。 |
MQ是消息通信的模型;实现MQ的大致有两种主流方式:AMQP、JMS。
AMQP是一种协议,Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。
JMS即Java消息服务(JavaMessage Service)应用程序接口,好比java提供一套jdbc的接口API,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。 (Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛)
RabbitMQ官方地址:http://www.rabbitmq.com/
RabbitMQ 基础架构如下图:
上图说明:
1、Broker:接收和分发消息的应用,就是一个中介,RabbitMQ Server就是 Message Broker
2、Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
3、Connection:publisher/consumer 和 broker 之间的 TCP 连接
4、Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销5、Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
6、Queue:消息最终被送到这里等待 consumer 取走
7、Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
RabbitMQ提供了6种模式:简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式,RPC远程调用模式(远程调用,不太算MQ;暂不作介绍);官网对应模式介绍:https://www.rabbitmq.com/getstarted.html , 点击手册按钮 RabbitMQ Tutorials
RabbitMQ 官方地址:http://www.rabbitmq.com/ ,详见安装文档
http://erlang.org/download/
https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.11.4
国内源
rabbitmq:https://mirrors.huaweicloud.com/rabbitmq-server/
rabbitmq-plugins.bat enable rabbitmq_management
http://localhost:15672/
用户名密码:guest/guest
登录用户是中文解决方案:
1、创建用户为英文,再安装相关环境
2、修改相应的目录
用管理员执行CMD
rabbitmq-service.bat remove
set RABBITMQ_BASE=D:\rabbitmq_server\data
rabbitmq-service.bat install
rabbitmq-plugins enable rabbitmq_management
1.添加相关依赖
修改pom.xml文件内容为如下:
<dependencies>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
dependencies>
2.启动类
package com.woniu.rabbitmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitApplication.class);
}
}
3.配置RabbitMQ
创建application.yml,内容如下:
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /woniu
username: woniu
password: woniu
创建队列参数说明:
参数 | 说明 |
---|---|
name | 字符串值,queue的名称。 |
durable | 布尔值,表示该 queue 是否持久化。 持久化意味着当 RabbitMQ 重启后,该 queue 是否会恢复/仍存在。 另外,需要注意的是,queue 的持久化不等于其中的消息也会被持久化。 |
exclusive | 布尔值,表示该 queue 是否排它式使用。排它式使用意味着仅声明他的连接可见/可用,其它连接不可见/不可用。 |
autoDelete | 布尔值,表示当该 queue 没“人”(connection)用时,是否会被自动删除。 |
不指定 durable、exclusive 和 autoDelete 时,默认为 true 、 false 和 false 。表示持久化、非排它、不用自动删除。
创建交换机参数说明
参数 | 说明 |
---|---|
name | 字符串值,exchange 的名称。 |
durable | 布尔值,表示该 exchage 是否持久化。 持久化意味着当 RabbitMQ 重启后,该 exchange 是否会恢复/仍存在。 |
autoDelete | 布尔值,表示当该 exchange 没“人”(queue)用时,是否会被自动删除。 |
不指定 durable 和 autoDelete 时,默认为
true
和false
。表示持久化、不用自动删除
4.创建RabbitMQ队列与交换机绑定的配置类com…rabbitmq.config.RabbitMQConfig
package com.woniu.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
//队列名称
public static final String ITEM_QUEUE = "item_queue";
//声明队列
@Bean("itemQueue")
public Queue itemQueue(){
return QueueBuilder.durable(ITEM_QUEUE).build();
}
}
5.创建ProducerController类,发送消息到消息队列
package com.woniu.rabbitmq.controller;
import com.woniu.rabbitmq.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/senMq/{msg}")
public String senMq(@PathVariable String msg){
rabbitTemplate.convertAndSend(RabbitMQConfig.WONIU_QUEUE, msg);
return "OK";
}
}
6.创建ConsumerController类,接收消息到消息队列
package com.woniu.rabbitmq.controller;
import com.woniu.rabbitmq.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ConsumerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/getMq")
public Object getMq(){
return rabbitTemplate.receiveAndConvert(RabbitMQConfig.WONIU_QUEUE);
}
}
或者用消息监听处理类
编写消息监听器com.woniu.rabbitmq.listener.MyListener
package com..rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MyListener {
@RabbitListener(queues = RabbitMQConfig.WONIU_QUEUE)
public void myListener1(String message){
System.out.println("消费者接收到的消息为:" + message);
}
}
方式二
@Component
@RabbitListener(queues = RabbitMQConfig.WONIU_QUEUE)
public class HelloReceiver {
@RabbitHandler
public void process(String hello) {
System.out.println(hello);
}
}
上述的入门案例中中其实使用的是如下的简单模式:
在上图的模型中,有以下概念:
AMQP 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。AMQP是一个二进制协议,拥有一些现代化特点:多信道、协商式,异步,安全,扩平台,中立,高效。
RabbitMQ是AMQP协议的Erlang的实现。
概念 | 说明 |
---|---|
连接Connection | 一个网络连接,比如TCP/IP套接字连接。 |
会话Session | 端点之间的命名对话。在一个会话上下文中,保证“恰好传递一次”。 |
信道Channel | 多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。 |
客户端Client | AMQP连接或者会话的发起者。AMQP是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。 |
服务节点Broker | 消息中间件的服务节点;一般情况下可以将一个RabbitMQ Broker看作一台RabbitMQ 服务器。 |
端点 | AMQP对话的任意一方。一个AMQP连接包括两个端点(一个是客户端,一个是服务器)。 |
消费者Consumer | 一个从消息队列里请求消息的客户端程序。 |
生产者Producer | 一个向交换机发布消息的客户端应用程序。 |
在入门案例中:
客户端与代理服务器Broker建立连接。会调用newConnection() 方法,这个方法会进一步封装Protocol Header 0-9-1 的报文头发送给Broker ,以此通知Broker 本次交互采用的是AMQP0-9-1 协议,紧接着Broker 返回Connection.Start 来建立连接,在连接的过程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 这6 个命令的交互。
客户端调用connection.createChannel方法。此方法开启信道,其包装的channel.open命令发送给Broker;
channel.basicPublish方法对应的AMQP命令为Basic.Publish,这个命令包含了content Header 和content Body()。content Header 包含了消息体的属性,例如:投递模式,优先级等,content Body 包含了消息体本身。
客户端发送完消息需要关闭资源时,涉及到Channel.Close和Channel.Close-Ok 与Connetion.Close和Connection.Close-Ok的命令交互。
Work Queues
与入门程序的简单模式
相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。它们处于竞争者的关系,一条消息只会被一个消费者接收,rabbit采用轮询
的方式将消息是平均发送给消费者的;消费者在处理完某条消息后,才会收到下一条消息。应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。如生产者生产一千条消息,那么c1和c2各消费500条,队列消费消息是均衡分配
1、生产者
package com.woniu.rabbitmq.controller;
import com.woniu.rabbitmq.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/senMq/{msg}")
public String senMq(@PathVariable String msg){
rabbitTemplate.convertAndSend(RabbitMQConfig.WONIU_QUEUE, msg);
return "OK";
}
}
2、消费者
package com.woniu.rabbitmq.controller;
import com.woniu.rabbitmq.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ConsumerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/getMq")
public Object getMq(){
return rabbitTemplate.receiveAndConvert(RabbitMQConfig.WONIU_QUEUE);
}
}
启动两个消费端,进行测试
订阅模式示例图:
在订阅模型中,多了一个exchange角色,而且过程略有变化:
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
1.创建RabbitMQ队列与交换机绑定的配置类com…rabbitmq.config.RabbitMQConfig
package com.woniu.springconsumer.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.SerializerMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
// 广播模式
@Bean
public Queue gb(){
return new Queue("gb");
}
@Bean
public Queue gb01(){
return new Queue("gb01");
}
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("gbe");
}
// gb01队列绑定到广播交换机
@Bean
public Binding getGb01ed(){
return BindingBuilder.bind(gb()).to(fanoutExchange());
}
// gb02队列绑定到广播交换机
@Bean
public Binding getGb02ed(){
return BindingBuilder.bind(gb02()).to(fanoutExchange());
}
}
1、实现生产者
package com.woniu.rabbitmq.controller;
import com.woniu.rabbitmq.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sendFM/{msg}")
public String sendFM(@PathVariable String msg){
rabbitTemplate.convertAndSend("gbe", "", msg);
return "消息发送成功";
}
}
创建交换机参数说明:
参数 | 说明 |
---|---|
exchange | 字符串值,交换机名称 |
type | 交换机的类型,有三种类型:FANOUT、DIRECT、TOPIC |
durable | 交换机是否持久化,表示当rabbitmq重启时或者意外宕机,这个交换机还在不在 |
autoDelete | 是否自动删除,表示当该交换机没人发消息时,是否会被自动删除。 |
internal | 内部使用,一般为false |
arguments | 其它参数 |
发送消息参数说明
参数 | 说明 |
---|---|
exchange | 字符串值,交换机名称 |
routingKey | 如果交换机类型是fanout,则routingKey为"" |
props | 消息基本属性配置 |
body | 要发送的消息的内容 |
2、消费方实现
GbListener类
package com.woniu.rabbitmq.mq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "gb")
public class GbListener {
@RabbitHandler
public void getMsg(String msg){
System.out.println("广播消息:" + msg);
}
}
Gb01Listener 类
package com.woniu.rabbitmq.mq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "gb01")
public class Gb01Listener {
@RabbitHandler
public void getMsg(String msg){
System.out.println("广播01消息:" + msg);
}
}
发布订阅模式与工作队列模式的区别
1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机 。
路由模式特点:
RoutingKey
(路由key)RoutingKey
。Routing Key
进行判断,只有队列的Routingkey
与消息的 Routing key
完全一致,才会接收到消息在编码上与 Publish/Subscribe发布与订阅模式
的区别是交换机的类型为:Direct,还有队列绑定交换机的时候需要指定routing key。
1、创建RabbitMQ队列与交换机绑定的配置类com…rabbitmq.config.RabbitMQConfig
package com.woniu.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// Routing路由模式
@Bean
public Queue zl01(){
return new Queue("zl01");
}
@Bean
public Queue zl02(){
return new Queue("zl02");
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange("zle");
}
@Bean
public Binding zlBinding01(){
return BindingBuilder.bind(zl01()).to(directExchange()).with("01");
}
@Bean
public Binding zlBinding02(){
return BindingBuilder.bind(zl02()).to(directExchange()).with("02");
}
}
2、生产方实现
package com.woniu.rabbitmq.controller;
import com.woniu.rabbitmq.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sendD1M/{msg}")
public String sendD1M(@PathVariable String msg){
rabbitTemplate.convertAndSend("zle", "01", msg);
return "success";
}
@RequestMapping("/sendD2M/{msg}")
public String sendD2M(@PathVariable String msg){
rabbitTemplate.convertAndSend("zle", "02", msg);
return "success";
}
}
3.消费方实现
创建2个消费方并启动,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果
Zl01Listener类
package com.woniu.rabbitmq.mq;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@RabbitListener(queues = "zl01")
public class Zl01Listener {
@RabbitHandler
public void getMsg(String msg, Message message, Channel channel) throws IOException {
try {
System.out.println("zl01消息:" + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}
Zl02Listener类
package com.woniu.rabbitmq.mq;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@RabbitListener(queues = "zl02")
public class Zl02Listener {
@RabbitHandler
public void getMsg(String msg, Message message, Channel channel) throws IOException {
try {
System.out.println("zl02消息:" + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
}
}
}
Topic
类型与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符!Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#
:匹配0个或多个词
*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.insert.abc
或者 item.insert
或者item
item.*
:只能匹配item.insert
创建RabbitMQ队列与交换机绑定的配置类com…rabbitmq.config.RabbitMQConfig
package com.woniu.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// Topics通配符模式
@Bean
public Queue tt01(){
return new Queue("tt01");
}
@Bean
public Queue tt02(){
return new Queue("tt02");
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("tte");
}
@Bean
public Binding ttBinding01(){
return BindingBuilder.bind(tt01()).to(topicExchange()).with("#.error");
}
@Bean
public Binding ttBinding02(){
return BindingBuilder.bind(tt02()).to(topicExchange()).with("order.*");
}
}
1、生产方代码实现
使用topic类型的Exchange
package com.woniu.rabbitmq.controller;
import com.woniu.rabbitmq.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
@RequestMapping("/sendT1MT1/{msg}")
public String sendT1MT1(@PathVariable String msg){
rabbitTemplate.convertAndSend("tte", "11.error", msg);
return "success";
}
@RequestMapping("/sendT1MT2/{msg}")
public String sendT1MT2(@PathVariable String msg){
rabbitTemplate.convertAndSend("tte", "11.22.error", msg);
return "success";
}
@RequestMapping("/sendT1MF/{msg}")
public String sendT1MF(@PathVariable String msg){
rabbitTemplate.convertAndSend("tte", "11.error.22", msg);
return "success";
}
@RequestMapping("/sendT2MF/{msg}")
public String sendT2MF(@PathVariable String msg){
rabbitTemplate.convertAndSend("tte", "order.11.22", msg);
return "success";
}
@RequestMapping("/sendT2MT/{msg}")
public String sendT2MT(@PathVariable String msg){
rabbitTemplate.convertAndSend("tte", "order.1", msg);
return "success";
}
}
2、消费方实现
package com.woniu.rabbitmq.mq;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@RabbitListener(queues = "tt01")
public class Tt01Listener {
@RabbitHandler
public void getMsg(String msg, Message message, Channel channel) throws IOException {
try {
System.out.println("tt01消息:" + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
}
}
}
package com.woniu.rabbitmq.mq;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@RabbitListener(queues = "tt02")
public class Tt02Listener {
@RabbitHandler
public void getMsg(String msg, Message message, Channel channel) throws IOException {
try {
System.out.println("tt02消息:" + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
}
}
}
创建2个消费方并启动,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果;并且这些routing key可以使用通配符。
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
rabbitmq 整个消息投递的路径为:producer—>rabbitmq broker—>exchange—>queue—>consumer
我们将利用这两个 callback 控制消息的可靠性投递
1.引入依赖
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
2.在配置文件中 添加publisher-confirm-type: correlated配置
spring:
rabbitmq:
host: localhost
port: 5672
username: woniu
password: woniu
virtual-host: /woniu
publisher-confirm-type: correlated
publisher-returns: true
在springboot2.2.0.RELEASE版本之前(spring.rabbitmq.publisher-confirm发布确认属性配置)是amqp正式支持的属性,用来配置消息发送到交换器之后是否触发回调方法,在2.2.0及之后该属性过期使用spring.rabbitmq.publisher-confirm-type属性配置代替,用来配置更多的确认类型;
NONE值是禁用发布确认模式,是默认值;
CORRELATED值是发布消息成功到交换器后会触发回调方法;
SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker。
3.RabbitConfig增加代码
由于spring的Bean默认都是单例的,这个RabbitTemplate也不例外,既然每个RabbitTemplate对象只支持一个回调,那我们就在该Bean放入spring容器把该RabbitTemplate设置为原型的(也就是@Scope=“prototype”),RabbitTemplate 设置成多列模式,每次bean都是新的, 代码如下
@Bean
@Scope("prototype")
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMandatory(true);
template.setMessageConverter(new SerializerMessageConverter());
return template;
}
注意:如果在controller中调用,还需要再controller添加
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
- 1
4、编写ConsumerController类
package com.woniu.springconsumer.controller;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.GetMethod;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.context.annotation.Scope;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import java.io.IOException;
import java.util.List;
@RestController
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class ConsumerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sendD2M/{msg}")
public String sendD2M(@PathVariable String msg){
/** 步骤:
* 确认模式: 生产者把消息发送给交换机,交换机收到消息的确认
* 1、确认模式的开启:publisher-confirm-type: correlated
* 2、在rabbitTemplate定义confirmCallback回调函数
*/
//定义回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* @param correlationData 相关配置信息
* @param ack 表示交换机是否成功收到生产者发送的 消息,true成功,false 失败
* @param s 失败原因,如果成功则该参数值为“”
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
System.out.println("confirm方法被执行.....");
if(b){
System.out.println("接收成功消息" + s);
}else{
//接收失败
System.out.println("接收失败消息" + s);
//做一些处理,让消息再次发送。
}
}
});
//该交换机zle01不存在,故回调方法的ack为false
rabbitTemplate.convertAndSend("zle", "02", msg);
return "消息发送成功";
}
}
在上个例子的基础上,再添加一个测试方法testReturn
package com.woniu.springconsumer.controller;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.GetMethod;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.context.annotation.Scope;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import java.io.IOException;
import java.util.List;
@RestController
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class ConsumerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sendD2M/{msg}")
public String sendD2M(@PathVariable String msg){
/**
* 回退模式:当消息发送给Exchange,Exchange路由到Queue失败时,才执行ReturnCallback
* 步骤
* 1、开启回退模式:publisher-returns="true"
* 2、设置ReturnCallBack
* 3、设置Exchange处理消息的模式
* 1、如果消息没有路由到queue,则丢弃消息(这是默认规则)
* 2、如果消息没有路由到queue,返回给消息发送方ReturnCallBack
* */
//设置 ReturnCallBack
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
*
* @param message 消息对象
* @param replyCode 错误码
* @param replyText 错误信息
* @param exchange 交换机名称
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return 执行了....");
System.out.println(message);
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
//处理
}
});
rabbitTemplate.convertAndSend("zle", "022", msg);
return "消息发送成功";
}
}
由于路由键不正确 022,故交换机的消息无法发送到消息队列,setReturnCallback()方法,也就是Exchange路由到Queue失败时执行,这个前提是必须设置 rabbitTemplate.setMandatory(true);如果不加这句话,意味着交换机处理消息模式采用默认的模式,模式模式是直接丢掉该消息,不会执行setReturnCallback()方法。 当然如果交换机发送消息到队列,如果成功了也不会执行该方法,因为setReturnCallback是交换机发送消息到队列失败才执行的。
设置ConnectionFactory的publisher-confirms=“true” 开启 确认模式。
使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
设置ConnectionFactory的publisher-returns=“true” 开启 退回模式。使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。
在RabbitMQ中也提供了事务机制,但是性能较差,此处不做讲解。
使用channel下列方法,完成事务控制:
txSelect(), 用于将当前channel设置成transaction模式
txCommit(),用于提交事务
txRollback(),用于回滚事务
ack指Acknowledge(翻译为:应答),表示消费端收到消息后的确认方式。有三种确认方式:
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动确认,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
1.引入相关依赖
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
2.在配置文件中 添加手动确认的配置
spring:
rabbitmq:
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
3.编写Ack监听器
package com.woniu.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ChannelListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
/**
* Consumer ACK机制:
* 1、设置手动签收,在 listener-container容器中 添加 acknowledge = ”manual“
* 2. @RabbitListener(queues = "zl01")和 @RabbitHandler
* 3、如果消息成功处理,则调用channel的basicAck()签收
* 4、如果消息处理失败,则调用channel的basicNack()拒绝签收,broker会重新发送消息给consumer
*/
@Component
@RabbitListener(queues = "zl01")
public class Zl01Listener {
@RabbitHandler
public void getMsg(String msg, Message message, Channel channel) throws IOException {
try {
//1、接受转换的消息
//2、处理业务逻辑
System.out.println("zl01消息:" + msg);
int i = 1/ 0;
//3、手动签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
//basicReject:是接收端告诉服务器这个消息我拒绝接收,不处理,可以设置是否放回到队列中还是丢掉,而且只能一次拒绝一个消息, 不能批量拒绝消息,第二个参数为true则不丢掉,为false表示丢掉
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}
basicAck的批量应答问题说明:
channel.basicAck(8,true) 如果前面还有4,6,7的deliveryTag未被确认,则会一起确认,减少网络流量,当然当前deliveryTag=8这条消息也会确认,如果没有前面没有未被确认的消息,则只会确认当前消息,也就是说可以一次性确认某个队列小于等于delivery_tag值的所有消息
basicNack的参数说明:
第一个参数为deliveryTag,也就是每个消息标记index,消息标记值从1开始,依次递增
第二个参数为multiple,表示是否批量,如果为true,那么小于或者等于该消息标记的消息(如果还没有签收)都会拒绝签收
第三个参数为requeue,表示被拒绝的消息是否重回队列,如果设置为true,则消息重新回到queue,那么broker会重新推送该消息给消费端,如果设置为false,则消息在队列中被删除,即消息会被直接丢失(当然如果为false,还有一种情况就是放到死信队列)
4.测试
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {
@Test
public void test(){
while(true){}
}
}
启动之前的生产者发送消息给test_queue_confirm队列,如果抛出异常则该消息一直重发
持久化
生产方确认Confirm
消费方确认Ack
Broker高可用
消费端每次从队列中取一部分消息,然后消费者解决完业务处理,当业务处理完之后,消费者采用手动应答的方式,回应消息队列,然后继续取一部分消息处理,实现削峰填谷的效果
例如:多个生产者同时给MQ发送消息10000条,如果不做消费端限流,那么A系统请求瞬间增多 。限流就是让A系统每次从MQ取1000条,然后做业务处理,当处理完后,手动应答队列,然后队列在发1000条处理,反复10次即可处理完请求。
1.在消费端工程的配置文件配置如下
spring:
rabbitmq:
listener:
direct:
prefetch: 1
在rabbit:listener-container 中配置 prefetch属性设置消费端一次拉取多少消息,消费端的确认模式一定为手动确认。acknowledge=“manual”
2.编写监听器
package com.woniu.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
/**
* Consumer 限流机制
* 1、ack机制为手动确认
* 2、listener-container配置属性
* prefetch = 1 表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。
*/
@Component
public class QosListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000);
//1、获取消息
System.out.println(new String(message.getBody()));
//2、处理业务逻辑
//3、签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
3.启动消费端工程
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {
@Test
public void test(){
while(true){}
}
}
4.在生产者方,添加一个测试方法,给test_queue_confirm队列发送消息
//限流测试
@Test
public void testSendQos(){
for (int i = 0; i <10 ; i++) {
rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message confirm ...");
}
}
注意:每次从队列中取一条处理,然后手动应答,如果注释掉手动应答(签收),那么消费者在第一次取一条消息后,不会从队列取消息了,因为这个时候队列的状态是Unacked(表示有一条为签收),ready为9,表示队列还有9条消息
如果发送到消费者的消息一直不确认,或者很长时间才确认,只有等到A消费者与rabbitmq的连接中断,rabbitmq才会考虑将A消费者未确认的消息重新投递给另一个消费者,而且每次也不会拉取固定的n条消息。确认多少则拉取多少
TTL 全称 Time To Live(存活时间/过期时间)。当消息到达存活时间后,还没有被消费,会被自动清除。RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。当消息超过过期时间还没有被消费,则丢弃
1、添加交换机
2、添加队列,设置队列的过期时间
3、交换机和消息队列的绑定
交换机发送消息
代码实现:由于ttl表示消息在队列的存活时间,所以在生产者工程操作
1、创建RabbitMQ队列与交换机绑定的配置类com…rabbitmq.config.RabbitMQConfig
package com.woniu.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.SerializerMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
// 2. ttl队列
@Bean
public Queue ttlQueue(){
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 1000 * 10);
return QueueBuilder.durable("ttlQueue").withArguments(args).build();
}
}
2、生产方实现
package com.woniu.rabbitmq.controller;
import lombok.SneakyThrows;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
public class PrducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
// ttl队列
@RequestMapping("/sendTtl/{msg}")
public String sendTtl(@PathVariable String msg) {
rabbitTemplate.convertAndSend("ttlQueue", msg);
return "OK";
}
}
3.测试
情况1:发给test_queue_ttl 队列的消息统一设置过期时间,交换机发给 test_queue_ttl 队列后,10秒后,消息消失。
情况2:某条消息单独设置过期时间
package com.woniu.rabbitmq.controller;
import lombok.SneakyThrows;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
public class PrducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
// ttl队列
@RequestMapping("/sendTtl/{msg}")
public String sendTtl(@PathVariable String msg) {
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//刚才我们在配置文件设置的队列的消息是10秒,这里是5秒,注意:以时间短的为准
message.getMessageProperties().setExpiration("5000"); //消息的过期时间
return message;//消息一定要返回
}
};
rabbitTemplate.convertAndSend("ttlQueue",msg,messagePostProcessor);
return "OK";
}
}
情况3:发送给队列的n条信息中,单独给某个消息设置过期
package com.woniu.rabbitmq.controller;
import lombok.SneakyThrows;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
public class PrducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
// 死信DLX
@RequestMapping("/sendTtl/{msg}")
public String sendTtl(@PathVariable String msg) {
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//刚才我们在配置文件设置的队列的消息是10秒,这里是5秒,注意:以时间短的为准
message.getMessageProperties().setExpiration("5000"); //消息的过期时间
return message;//消息一定要返回
}
};
for (int i = 0; i < 10; i++) {
if(i==5){
rabbitTemplate.convertAndSend("ttlQueue",msg,messagePostProcessor);
}else{
rabbitTemplate.convertAndSend("ttlQueue",msg);
}
}
return "OK";
}
}
情况3:当i == 5 时,也就是给第五条消息设置过期时间是5秒,其它的还是10秒,发现失效,这里要注意一点,由于这条消息发送给队列的时候不是在队列的头部,故不会单独判断,而是和其它队列一样,10秒钟就消失,可以改成i==0,则第一条消息是5秒过期,或者i<3,即队列的头三条都是5秒的时间。
1、多条消息,要单独设置过期时间,则从第一条消息开始设置
2、配置文件和代码都设置了过期时间,以时间短的为准
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息在队列成为Dead message后,通过该队列把这条死信消息发给另一个交换机,这个交换机就是DLX。
消息成为死信的三种情况(面试常问):
队列绑定死信交换机:
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
1、创建RabbitMQ队列与交换机绑定的配置类com…rabbitmq.config.RabbitMQConfig
package com.woniu.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.SerializerMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
// 1. 创建DLX交换机
@Bean
public DirectExchange dlxDirectExchange(){
return new DirectExchange("dlx_exchange");
}
// 2. ttl队列
@Bean
public Queue ttlQueue(){
Map<String, Object> args = new HashMap<>();
// 1、正常队列绑定死信交换机-->
// 1.1 x-dead-letter-exchange 死信交换机的名称
args.put("x-dead-letter-exchange", "dlx_exchange");
// 1.2 x-dead-letter-routing-key 正常队列发送消息到死信 交换机的routingKey
args.put("x-dead-letter-routing-key", "dlx01");
// 2 消息成为死信的三种情况
// 2.1 设置队列的过期时间 ttl x-message-ttl
args.put("x-message-ttl", 1000 * 10);
// 2.2 设置队列的长度限制 x-max-length 10条消息,超过进死信
args.put("x-max-length", 10);
// 2.3 消费者拒接消费消息,并且不重回队列 这种情况后面在消费工程测试
return QueueBuilder.durable("ttlQueue").withArguments(args).build();
}
// 3. 死信队列
@Bean
public Queue dlxQ(){
return new Queue("dlxQ");
}
// 4.dlxQ绑定DXL交换机
@Bean
public Binding dlxBinding(){
return BindingBuilder.bind(dlxQ()).to(dlxDirectExchange()).with("dlx01");
}
}
2、生产者工程测试:
//死信队列测试
@Test
public void testDlx(){
//1、测试过期时间,死信消息
//rabbitTemplate.convertAndSend("ttlQueue","我是一条消息,我会死吗");
//2、测试队列长度限制,消息死信
for (int i = 0; i < 20 ; i++) {
rabbitTemplate.convertAndSend("ttlQueue","我是一条消息,我会死吗");
}
//前两步测试结果:死信队列会有21条记录 1(过期) + 10(限制)+10(正常队列过期后的10条)
}
3、消息成为死信的第三种情况实现
1).在配置文件中 添加手动确认的配置
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
2).添加正常队列的监听器
package com.woniu.rabbitmq.mq;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/*
* @Auther:吴兴龙
* @Date:2023/2/28
* @Description:
*/
@Component
@RabbitListener(queues = "ttlQueue")
public class TtlListener {
@SneakyThrows
@RabbitHandler
public void dlxQ(String msg, Message message, Channel channel){
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println(new String(message.getBody()));
System.out.println("处理业务逻辑");
int m = 1/0;
channel.basicAck(deliveryTag,true);
}catch(Exception ex){
/** basicNack(long deliveryTag, boolean multiple, boolean requeue)
* multiple是否批量. true:将一次性拒绝所有小于或者等于deliveryTag的消息。
* requeue:被拒绝的消息是否重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端,如果 * 为 requeue=false,不重回队列,则消息发送最终到死信队列
*/
channel.basicNack(deliveryTag,true,false);
}
}
}
3).在生产端的testDlx方法再次给正常交换机发送消息
//死信队列测试
@Test
public void testDlx(){
rabbitTemplate.convertAndSend("ttlQueue","我是一条消息,我会死吗?");
}
延迟队列,即消息进入队列后不会立即被消费者调用,只有到达指定时间后,才会被调用者调用消费。
如下需求:
当用户提交订单后,数据库保存订单信息,同时库存表相应的库存减少,然后消息队列保存订单的信息(如订单Id),此时库存系统监听队列,队列不会把消息立刻发送给库存,而是过30分钟再把信息发送给库存系统,库存系统去查询订单数据库,根据订单id查询,如果该订单还没有支付,则取消订单,回滚库存,如果支付过了,则库存表什么都不用做。也就是给用户30分钟的机会,一个订单在30分钟后还没有支付,则该订单的库存信息直接回滚。
实现方式:
1、创建RabbitMQ队列与交换机绑定的配置类com…rabbitmq.config.RabbitMQConfig
package com.woniu.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.SerializerMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
// 1. 创建DLX交换机
@Bean
public DirectExchange dlxDirectExchange(){
return new DirectExchange("dlx_exchange");
}
// 2. ttl队列
@Bean
public Queue ttlQueue(){
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 1000 * 10);
args.put("x-dead-letter-exchange", "dlx_exchange");
args.put("x-dead-letter-routing-key", "dlx01");
return QueueBuilder.durable("ttlQueue").withArguments(args).build();
}
// 3. 死信队列
@Bean
public Queue dlxQ(){
return new Queue("dlxQ");
}
// 4.dlxQ绑定DXL交换机
@Bean
public Binding dlxBinding(){
return BindingBuilder.bind(dlxQ()).to(dlxDirectExchange()).with("dlx01");
}
}
3、消费方监听类
package com.woniu.rabbitmq.mq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "dlxQ")
public class Dlxlistener {
@RabbitHandler
public void dlxQ(String msg){
System.out.println("dlx is msg = " + msg);
}
}
3、生产者测试类
package com.woniu.rabbitmq.controller;
import lombok.SneakyThrows;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
public class PrducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sendTtl/{msg}")
public String sendTtl(@PathVariable String msg) {
rabbitTemplate.convertAndSend("ttlQueue", msg);
return "OK";
}
}
4、启动消费者测试程序
RabbitMQ默认日志存放路径: /var/log/rabbitmq/rabbit@xxx.log
日志包含了RabbitMQ的版本号、Erlang的版本号、RabbitMQ服务节点名称、cookie的hash值、RabbitMQ配置文件地址、内存限制、磁盘限制、默认账户guest的创建以及权限配置等等。
1、查看队列
rabbitmqctl list_queues #查看所有虚拟主机里面的队列
rabbitmqctl list_queues -p /vhost #查看某个虚拟主机里面的队列
2、删除所有队列
rabbitmqctl stop_app #关闭应用
rabbitmqctl reset #清除队列中的消息
rabbitmqctl start_app # 再次启动此应用
注意:此方式,会同时删除一些配置信息,需要慎用
3、查看rabbitmq中的交换机
rabbitmqctl list_exchanges [-p vhost]
4、rabbitmq的用户操作命令
rabbitmqctl list_users
rabbitmqctl add_user 用户名 密码
rabbitmqctl delete_user 用户名
5、查看未被确认的队列
rabbitmqctl list_queues name messages_unacknowledged
6、查看队列环境变量
rabbitmqctl environment
7、查看队列消费者信息
rabbitmqctl list_consumers
8、查看队列连接
rabbitmqctl list_connections
9、查看准备就绪的队列
rabbitmqctl list_queues name messages_ready
10、查看单个队列的内存使用
rabbitmqctl list_queues name memory
11、列出所有虚拟主机
rabbitmqctl list_vhosts
rabbitmqctl status | grep rabbit ##查看rabbitmq的版本
在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。
对于RabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。
在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。
firehose的机制是将生产者投递给队列的消息,以及队列投递给消费者的消息按照指定的格式发送到默认的exchange上。这个默认的exchange的名称为amq.rabbitmq.trace,它是一个topic类型的exchange。发送到这个exchange上的消息的routing key为 publish.exchangename 和 deliver.queuename。其中exchangename和queuename为实际交换机和队列的名称,分别对应生产者投递到exchange的消息,和消费者从queue上获取的消息。
1、打开trace 功能
rabbitmqctl trace_on [-p vhost] ##开启Firehose命令
打开 trace 会影响消息写入功能,适当打开后请关闭,关闭Firehose命令:rabbitmqctl trace_off [-p vhost],打开后会多一个交换机,如下图
2、新建一个消息队列,并给该交换机绑定一个消息队列
3、打开任何一个其他的队列,并往队列发送一条消息,则这个test_trace队列也会有其他队列的消息
rabbitmq_tracing和Firehose在实现上如出一辙,只不过rabbitmq_tracing的方式比Firehose多了一层GUI的包装,更容易使用和管理。
1、启用插件:
[root@localhost ~]# rabbitmq-plugins list ###查询插件
[root@localhost ~]# rabbitmq-plugins enable rabbitmq_tracing
1、新建一个trace,将来所有的消息都被trace保存起来,文件的默认路径为/var/tmp/rabbitmq-tracing
不管在哪个队列发送消息,都会保存到日志文件mytrace.log中
如果是用其它的用户创建这个消息日志。则需要在/etc/rabbitmq/rabbit.config配置文件添加如下内容:创建的用户名和密码
{rabbitmq_tracing,
[
{directory, "/var/log/rabbitmq/rabbitmq_tracing"},
{username, "woniu"},
{password, "woniu"}
]
}
重启消息队列服务器即可
消息可靠性保障、消息幂等性处理 、微服务中用消息队列实现微服务的异步调用,而用openfeign采用的同步
需求:100%确保消息发送成功
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。
第一次生产者发送一条消息,但是消费方系统宕机,即不能立即消费,于是回调检查服务监听不到Q2的响应消息,也不会写入数据库MDB,当隔一段时间后,生产者又发送一条延迟消息到Q3队列,回调检查服务能监听到Q3队列消息,于是和MDB去比较是否有,由于消费方的失败,消息最终没有入库MDB,这个时候回调检查服务和MDB数据库比对失败,于是通知生产者,重新发送一条消息给消费者,那么这个时候Q1就有2条消息了,当消费方正常运行的时候,由于监听的Q1是两条2消息,怎么办呢?乐观锁
第一次执行:version=1
update account set money = money - 500 , version = version + 1 where id = 1 and version = 1
第二次执行:version=2
update account set money = money - 500 , version = version + 1 where id = 1 and version = 1
9.3 消息积压问题
实际场景可能有这样现象:大量消息在rabbitmq里积压了几个小时了还没消息,怎么办?
这种时候只好采用 “丢弃+批量重导” 的方式来解决了,临时写个程序,连接到mq里面消费数据,收到消息之后直接将其丢弃,快速消费掉积压的消息,降低MQ的压力。或者多启几个消费端。
ueue上获取的消息。
1、打开trace 功能
rabbitmqctl trace_on [-p vhost] ##开启Firehose命令
打开 trace 会影响消息写入功能,适当打开后请关闭,关闭Firehose命令:rabbitmqctl trace_off [-p vhost],打开后会多一个交换机,如下图
[外链图片转存中…(img-hwBeTNK8-1694491328528)]
2、新建一个消息队列,并给该交换机绑定一个消息队列
[外链图片转存中…(img-5b1oO5AI-1694491328528)]
[外链图片转存中…(img-2KW2gIO3-1694491328529)]
3、打开任何一个其他的队列,并往队列发送一条消息,则这个test_trace队列也会有其他队列的消息
rabbitmq_tracing和Firehose在实现上如出一辙,只不过rabbitmq_tracing的方式比Firehose多了一层GUI的包装,更容易使用和管理。
1、启用插件:
[root@localhost ~]# rabbitmq-plugins list ###查询插件
[root@localhost ~]# rabbitmq-plugins enable rabbitmq_tracing
[外链图片转存中…(img-a5kZLRTK-1694491328529)]
1、新建一个trace,将来所有的消息都被trace保存起来,文件的默认路径为/var/tmp/rabbitmq-tracing
[外链图片转存中…(img-KDWDRhqf-1694491328529)]
不管在哪个队列发送消息,都会保存到日志文件mytrace.log中
[外链图片转存中…(img-QhQ33iSY-1694491328529)]
如果是用其它的用户创建这个消息日志。则需要在/etc/rabbitmq/rabbit.config配置文件添加如下内容:创建的用户名和密码
{rabbitmq_tracing,
[
{directory, "/var/log/rabbitmq/rabbitmq_tracing"},
{username, "woniu"},
{password, "woniu"}
]
}
重启消息队列服务器即可
消息可靠性保障、消息幂等性处理 、微服务中用消息队列实现微服务的异步调用,而用openfeign采用的同步
需求:100%确保消息发送成功
[外链图片转存中…(img-gCVOCwsC-1694491328530)]
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。
[外链图片转存中…(img-SvgKUCKK-1694491328530)]
第一次生产者发送一条消息,但是消费方系统宕机,即不能立即消费,于是回调检查服务监听不到Q2的响应消息,也不会写入数据库MDB,当隔一段时间后,生产者又发送一条延迟消息到Q3队列,回调检查服务能监听到Q3队列消息,于是和MDB去比较是否有,由于消费方的失败,消息最终没有入库MDB,这个时候回调检查服务和MDB数据库比对失败,于是通知生产者,重新发送一条消息给消费者,那么这个时候Q1就有2条消息了,当消费方正常运行的时候,由于监听的Q1是两条2消息,怎么办呢?乐观锁
第一次执行:version=1
update account set money = money - 500 , version = version + 1 where id = 1 and version = 1
第二次执行:version=2
update account set money = money - 500 , version = version + 1 where id = 1 and version = 1
9.3 消息积压问题
实际场景可能有这样现象:大量消息在rabbitmq里积压了几个小时了还没消息,怎么办?
这种时候只好采用 “丢弃+批量重导” 的方式来解决了,临时写个程序,连接到mq里面消费数据,收到消息之后直接将其丢弃,快速消费掉积压的消息,降低MQ的压力。或者多启几个消费端。