
日升时奋斗,日落时自省
目录
注:该博客有点长,重复内容不多,希望友友们慢慢观看
(1)应用解耦
服务与服务之前进行远程调用时不需要直接调度,降低了关联性,服务直接给中间件发送消息通过MQ消息给调度服务,做到降低服务与服务之间耦合性

(2)异步提速
如何做到异步提速呢???主要就是MQ作为消息中间人,进行转发告诉其他服务这边已经好了,你们可以开始了,为啥说这里的异步快,因为Rabbitmq请求速度是微妙级别的,速度相比远程调用的请求速度是要快的多的

(3)削峰填谷
先来理解一下削峰填谷:图有点抽象,麻烦友友们慢慢看一下,暂时也没有想到很突显的图

如何做到削峰填谷呢

MQ的产品

AMQP, 即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
RabbitMQ基础架构图
下面来解释以下怎么看这个图:producer就是提供者,通过channel(频道)连接到Exchange(交换机)交换机通过虚拟用户绑定一个Queue(队列);consumer就是消费者,通过channel(频道)直接去Queue队列里拿消息(前提是要有消息)
Virtual host: 出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace (进行隔离)概念。当多个不同的用户使用同一个 RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/ queue 等
Connection: publisher/ consumer 和 broker 之间的 TCP 连接
Channel: 如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。 Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯, AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。 Channel 作为轻量级的 Connection极大减少了操作系统建立 TCP connection 的开销
Exchange: message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有: direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue: 消息最终被送到这里等待 consumer 取走
Binding: exchange 和 queue 之间的虚拟连接, binding 中可以包含 routing key。 Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
去官网上看有以下六种模式:友友们可以直接来官网上看RabbitMQ Tutorials — RabbitMQ

RabbitMQ安装在linux上需要一定的依赖环境
这里直接给友友们 配上:(如果是Ubuntu的直接yum换成apt就行)
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
还需要一个前置环境Erlang(从下面进行去安装包,也可以去对应官网取)
链接: https://pan.baidu.com/s/1QVOM5FOHFezovA63vyi5Bg?pwd=1234 提取码: 1234
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
开启管理界面:
rabbitmq-plugins enable rabbitmq_management
修改默认配置信息
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
注:进来后找这个位置(只改这一行)改成下面的样子就可以了一会要靠这个登录呢

启动RabbitMQ
service rabbitmq-server start
访问地址:ip:15672 (如果界面打不开,可能是防火墙没有关,或者云服务器安全组没有开放,针对问题稍微百度一下即可)
注:15672是RabbitMQ的Web管理界面的默认监听端口,5672是AMQP协议的默认端口,25672是RabbitMQ集群间节点通信的默认端口(当前我们使用管理页面访问15672接口即可)

注:账号密码都是guest
mq.tar文件还是从https://pan.baidu.com/s/1QVOM5FOHFezovA63vyi5Bg?pwd=1234取
docker直接拉取也挺好用的,先把tar文件加载一下
docker load -i mq.tar
然后创建一个网络
docker network create rabbtimq-maxxub
剩下的跑起来就行 -e 是环境变量其实就是设置登录名和密码
到时候登录名:maxxub; 密码就是1223456
docker run \
-e RABBITMQ_DEFAULT_USER=maxxub \
-e RABBITMQ_DEFAULT_PASS=123456 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network rabbtimq-maxxub \
-d \
rabbitmq:3.8-management
(1)父工程
直接上springboot先创建父工程目起个名字(我这里叫mq-demo)以下是需要的依赖
-
- <module>publishermodule>
- <module>consumermodule>
-
-
pom -
-
-
org.springframework.boot -
spring-boot-starter-parent -
2.7.12 -
-
-
-
-
8 -
8 -
-
-
-
-
org.projectlombok -
lombok -
-
-
-
org.springframework.boot -
spring-boot-starter-amqp -
-
-
-
org.springframework.boot -
spring-boot-starter-test -
-
-
org.springframework.boot -
spring-boot-starter-logging -
-
(2)创建消费者模块和提供者模块(子模块不需要依赖)


application.yml 消费者和提供者都可以使用同一份,复制过去就行了
下面需要友友们知道有虚拟用户这个东西;

- logging:
- pattern:
- dateformat: MM-dd HH:mm:ss:SSS
- level:
- com.itheima: debug
- spring:
- rabbitmq:
- host: 这里写友友们自己的虚拟机ip或者云服务器ip
- port: 5672
- virtual-host: / #虚拟用户
- username: maxxub #登录rabbitmq的账号
- password: 123456 #登录rabbitmq的密码
- listener:
- simple:
- prefetch: 1
- acknowledge-mode: auto #模式:自动 进行一定的重试次数
- retry:
- enabled: true # 超时重试
- initial-interval: 1000ms
- multiplier: 2 #连接失败的重试
- max-attempts: 3 #最大重试次数的
- stateless: true #针对事务,默认为false ,true针对上下文有定义保存,是一个状态

P表示:producer 提供者
红色框:Queue 队列
C表示:consumer 消费者
简介叙述:直接通过队列进行,比较简单,没有啥条件限制
提供者:这里直接就在测试进行开始写了
消息发送依赖于RabbitTemplate类,这个类直接注入就行了

- @Slf4j
- @SpringBootTest
- public class SpringAmqpTest {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- void testSendMessage2Queue(){
- String queueName="simple.queue"; //我们这里写队列名称一会在管理页面创建
- String msg="hello,ampq!"; //消息
- rabbitTemplate.convertAndSend(queueName,msg); //参数队列 + 消息
- }
- }
消费者:
先去创建一个队列,直接到rabbitmq管理页面进行创建就行了,ip:15672 直接登录既可以

消费者的代码

- @SpringBootApplication
- public class ConsumerApplication { //启动类
- public static void main(String[] args) {
- SpringApplication.run(ConsumerApplication.class, args);
- }
- }
- @Slf4j
- @Component
- public class MqListener {
- @RabbitListener(queues = "simple.queue") //使用该注解监听队列消息
- public void listenSimpleQueue(String msg){
- System.out.println("消费者收到simple.queue的消息"+msg);
- }
- }
注:先启动消费者的启动类 ,再启动提供者的测试代码消费者会收到消息并且直接打印


P表示:producer 提供者
红色框:Queue 队列
C表示:consumer 消费者
注:这次有两个消费者,其实也可以是多个消费者
与简单模式没有很大的区别,使用同一个队列
消费者:(很简单没有任何区别,就是添加了一个方法) 该方法还是写在MqListener 类中
- @Slf4j
- @Component
- public class MqListener {
- @RabbitListener(queues = "simple.queue")
- public void listenSimpleQueue1(String msg) throws InterruptedException {
- System.out.println("消费者1收到simple.queue的消息"+msg);
- Thread.sleep(20);
- }
- @RabbitListener(queues = "simple.queue")
- public void listenSimpleQueue2(String msg) throws InterruptedException {
- System.err.println("消费者2收到simple.queue的消息"+msg);
- Thread.sleep(200);
- }
- }
注:这里加了睡眠时间是为了下面演示做准备
提供者:(测试一下)写在producer测试代码 其实也没用改啥,就是给这个队列多发点消息
- @Slf4j
- @SpringBootTest
- public class SpringAmqpTest {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- void testSendMessage2Queue() throws InterruptedException {
- String queueName="simple.queue";
- for(int i=1;i<50;i++){
- String msg="hello,ampq!";
- rabbitTemplate.convertAndSend(queueName,msg);
- Thread.sleep(20);
- }
- }
- }
注:先启动消费者启动类,在启动提供者测试类
- listener:
- simple:
- prefetch: 1 #每次每次只处理一个消息
- acknowledge-mode: auto
- retry:
- enabled: true # 超时重试
- initial-interval: 1000ms #设定重试时间
- multiplier: 2 #连接失败的重试
- max-attempts: 3 #最大重试次数
- stateless: true
这里给消费者2设置等待时间是200ms秒,消费者设置等待时间是20ms,处理是及时处理,也就是一部分是消费者1一部分是消费者2
这里展示的结果跟我们前面的配置参数有关
- listener:
- simple:
- prefetch: 1 #每次每次只处理一个消息
- acknowledge-mode: auto
- retry:
- enabled: true # 超时重试
- initial-interval: 1000ms #设定重试时间
- multiplier: 2 #连接失败的重试
- max-attempts: 3 #最大重试次数
- stateless: true

如果把prefetch: 1该参数去掉消费者2等待时间为200ms>>20ms所以会进行等待处理导致消息处理饥饿


P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
C:消费者,消息的接收者,会一直等待消息到来
Queue:消息队列,接收消息、缓存消息
Exchange:交换机(X)一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。 Exchange有常见以下3种类型:
简单说三种常见交换机类型:fanout、direct、topic
有交换机和队列了,我们简单教友友们了解一下如何绑定交换机和队列关系在管理页面
后面就开始给友友们代码直接创建交换机和队列并且进行绑定(这是比较好的操作)

下面这个位置创建一个类

直接上代码:(代码创建交换机和队列并且进行绑定,这里创建队列与管理页面创建效果都是一样的,就是更快,也不容易出问题,消费者跑起来以后,可以去管理页面看,交换机和队列都是存在的)
- @Configuration
- public class FanoutConfiguration {
-
- @Bean //创建FanoutExchange类型交换机 以下是两种方法 两个选择一个就行
- public FanoutExchange fanoutExchange(){
- // ExchangeBuilder.fanoutExchange("maxxub.fanout").build(); //使用Builder创建
- return new FanoutExchange("maxxub.fanout2"); //直接new一个交换机对象 参数:填写交换机名称
- }
-
- @Bean //创建Queue类型队列 以下是两种方法 两个选择一个就行
- public Queue fanoutQueue3(){
- // QueueBuilder.durable("fanout.queue3").build(); //使用Builder创建
- return new Queue("fanout.queue3"); //直接new一个队列对象 参数:填写队列名称
- }
-
- @Bean //进行队列和交换机绑定 使用绑定构建
- public Binding fanoutBinding3(Queue fanoutQueue3,FanoutExchange fanoutExchange){
-
- //直接看 BindingBuilder 就是 bind(绑定)fanoutQueue3队列 to(到) fanoutExchange交换机上
- return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
- }
-
-
- @Bean
- public Queue fanoutQueue4(){
- // QueueBuilder.durable("fanout.queue4").build();
- return new Queue("fanout.queue4");
- }
-
- @Bean
- public Binding fanoutBinding4(){
- //可以直接调用方法
- return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());
- }
- }
消费者:(MqListener类中写就行) 还是同样的监听
- @RabbitListener(queues = "fanout.queue3")
- public void listenFanoutQueue3(String msg) throws InterruptedException {
- System.out.println("消费者收到fanout.queue3的消息"+msg);
- Thread.sleep(200);
- }
- @RabbitListener(queues = "fanout.queue4")
- public void listenFanoutQueue4(String msg) throws InterruptedException {
- System.err.println("消费者收到fanout.queue4的消息"+msg);
- Thread.sleep(200);
- }
提供者:(写在producer的测试类中)
注:fanout刻画类型是没有路由的,所这里就是null,我们本身也没有设置路由
- @Test
- void testFanoutQueue(){
- //前面创建的是maxxub.fanout2交换机所以这里也绑定maxxub.fanout2
- String exchangeName="maxxub.fanout2"; //交换机名称
- String msg="hello , amqp"; //消息
- //参数:交换机、路由、消息
- rabbitTemplate.convertAndSend(exchangeName,null,msg);
- }
注:先启动消费者启动类 ,再启动提供者测试方法(效果如下)


就拿这个图来分析一下:这里的P(提供者)连接X(交换机)设置不同Routing(路由)给两个队列,相当于给两个队列加上了特殊的标签,black路由(标签)和green路由(标签)给了Q2队列,这个队列就负责接收black,green标签的消息其他的消息不归我管,orange路由(标签)给Q1队列,这个队列就负责接收orange标签的消息

创建Direct交换机和队列并设置路由,进行绑定 (不用的时候,把@Configuration注解注释掉)
- @Configuration
- public class DirectConfiguration {
-
- @Bean //创建DirectExchange交换机 以下是两种方法 进行创建
- public DirectExchange directExchange(){
- //参数就是 : 交换机的名称
- // ExchangeBuilder.fanoutExchange("maxxub.direct").build(); //使用Builder进行创建交换机
- return new DirectExchange("maxxub.direct");
- }
-
- @Bean //创建 队列
- public Queue directQueue3(){
- //填写参数就是 : 队列名称
- // QueueBuilder.durable("direct.queue1").build();
- return new Queue("direct.queue1");
- }
-
- @Bean //进行交换机和队列的绑定 同时添加上路由
- public Binding directBinding1(Queue directQueue3,DirectExchange directExchange){
- //解释 : BindingBuilder bind(绑定)directQueue3队列 to(到)directExchange with(带上路由)red
- return BindingBuilder.bind(directQueue3).to(directExchange).with("red");
- }
- @Bean
- public Binding directBinding2(Queue directQueue3,DirectExchange directExchange){
- //解释 : BindingBuilder bind(绑定)directQueue3队列 to(到)directExchange with(带上路由)blue
- return BindingBuilder.bind(directQueue3).to(directExchange).with("blue");
- }
-
-
- @Bean
- public Queue directQueue4(){
- // QueueBuilder.durable("direct.queue2").build();
- return new Queue("direct.queue2");
- }
-
- @Bean
- public Binding directBinding4(){
- //可以直接调用方法 于上面是雷同的,交换机一个就够了,这里再给交换机绑定队列和路由就行了
- return BindingBuilder.bind(directQueue4()).to(directExchange()).with("red");
- }
- @Bean
- public Binding directBinding3(){
- //可以直接调用方法
- return BindingBuilder.bind(directQueue4()).to(directExchange()).with("yellow");
- }
- }
注:这种方法比较麻烦,如果需要绑定多个路由或者队列还是很不方便,这里只作为演示,下面开始使用注解进行创建交换机和队列进行绑定并且带上路由
消费者:(MqListener类中写就行) 还是同样的监听
参数解释:
bindings就是绑定参数;
@QueueBinding注解中的参数value就是需要绑定的队列(name队列名称,durable就是消息持久化)
@QueueBinding注解中的参数exchange就是被绑定的交换机(name交换机名称,type交换机类型)
参数key就是路由:{可以直接写多个路由参数}
注:好处就是能节省很多代码,很简单,并且很很清晰
- @RabbitListener(bindings = @QueueBinding(
- value =@Queue(name = "direct.queue1",durable = "true"),
- exchange = @Exchange(name = "maxxub.direct",type = ExchangeTypes.DIRECT),
- key = {"red","blue"}))
- public void listenDirectQueue1(String msg) throws InterruptedException {
- System.out.println("消费者1收到direct.queue1的消息"+msg);
- Thread.sleep(200);
- }
- @RabbitListener(bindings = @QueueBinding(
- value =@Queue(name = "direct.queue2",durable = "true"),
- exchange = @Exchange(name = "maxxub.direct",type = ExchangeTypes.DIRECT),
- key = {"red","yellow"}))
- public void listenDirectQueue2(String msg) throws InterruptedException {
- System.out.println("消费者2收到direct.queue2的消息"+msg);
- Thread.sleep(200);
- }
提供者:(写在producer的测试类中)参数如果真的不知道怎么办了:ctrl+p就会有提示
- @Test
- void testDirectQueue(){
- String exchangeName="maxxub.direct";
- String msg="红色警报";
- //参数:交换机,路由,消息
- rabbitTemplate.convertAndSend(exchangeName,"red",msg);
- }
注:先启动消费者启动类,在启动提供者的测试方法(因为两个队列都有路由“red”,所以两个对哦都会收到消息)

路由使用通配符的形式进行
*:代表一个字符
#:代表0个或者多个字符

英文大概意思就是这里路由是通过带有通配符进行的,和direct类型不同的就是通配符能更灵活的设置路由(类似标签)
消费者:(MqListener类中写就行) 还是同样的监听 这里没有啥改变基本就是key改了一点点
- @RabbitListener(bindings = @QueueBinding(
- value =@Queue(name = "topic.queue1",durable = "true"),
- exchange = @Exchange(name = "maxxub.topic",type = ExchangeTypes.TOPIC),
- key = {"#.new"}))
- public void listenTopicQueue1(String msg) throws InterruptedException {
- System.out.println("消费者1收到topic.queue1的消息");
- Thread.sleep(200);
- }
- @RabbitListener(bindings = @QueueBinding(
- value =@Queue(name = "topic.queue2",durable = "true"),
- exchange = @Exchange(name = "maxxub.topic",type = ExchangeTypes.TOPIC),
- key = {"china.#"}))
- public void listenTopicQueue2(String msg) throws InterruptedException {
- System.out.println("消费者2收到topic.queue2的消息"+msg);
- Thread.sleep(200);
- }
提供者:(写在producer的测试类中)参数如果真的不知道怎么办了:ctrl+p就会有提示
- @Test
- void testTopicQueue(){
- String exchangeName="maxxub.topic";
- String msg="中国繁荣";
- rabbitTemplate.convertAndSend(exchangeName,"china.strong",msg);
- }
注:先启动消费者启动类,在启动提供者的测试方法(因为两个队列都有路由“red”,所以两个对哦都会收到消息)
解释:这里china.#的路由才能满足条件,所以只有topic.queue2队列接收到了消息

死信队列,英文缩写: DLX 。 Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX
主要就是为了处理消息丢失的现象,如果消息长时间处理不带,或者确认机制设置的问题导致消息丢失了怎么办,不能说丢了就丢了吧,死信队列就是一个比较好的办法去解决

就是给普通的消息队列设置一个消息过期时间,在一定时间内没有被处理掉的,那就把它放到私信队列中
思路:我们给simple.queue队列发消息设置过期时间10000ms,但是这个队列没有提供者,同时给他绑定一个死信交换机和死信队列,消息在10000ms以后会流入死信队列中
先做前置操作:

上面是一步一步的过程,下面是基本操作流程

消费者:MqListener类中写就行)这里只写死信队列的就可以了,simple队列是故意不让他拿到消息的,为了能够超过过期时间
- @RabbitListener(queues = "dlx.queue")
- public void listenDlxQueue2(String msg) throws InterruptedException {
- log.info("消费者收到dlx.queue的消息"+msg);
- Thread.sleep(200);
- }
提供者:(写在producer的测试类中)参数如果真的不知道怎么办了:ctrl+p就会有提示
参数没有很多的变化,消息队列、路由、消息、一个消息处理器的对象进行设置消息过期时间
- //延迟队列
- @Test
- void testSendTTLMessage(){
- rabbitTemplate.convertAndSend("simple.direct", "hi", "hello", new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- //设置消息拿到消息参数,进行设置过期时间
- message.getMessageProperties().setExpiration("10000");
- return message;
- }
- });
- //下面打印一下消息是执行成功的
- log.info("消息发送成功");
- }
注:先启动消费者启动类,在启动提供者的测试方法 (这里不好具体展示)

Rabbitmq在3.6.0版本开始就添加了Lazy Queue概念,惰性队列
惰性队列的特征如下:
接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认是2048条)
消费者要消费消息时才会从磁盘中读取并加载到内存
支持数百万条的消息存储
但是现在会直接进行默认Lazy Queue模式

官方的大体意思就是:3.12版本开始这个参数就可以进行忽略了,开始默认为Lazy Queue队列,但是仍然有用户是哟弄过CQv1队列,也就是经典队列,这里是可以通过参数配置进行调整为CQv2,配置在rabbitmq的配置文件中
classic_queue.default_version=2
我这里使用的是rabbitmq3.8版本,所以这里展示还是通过代码方式给友友们展示一下
在管理界面添加一个lazy.queue队列

按照以上步骤会有这么个提示:(将鼠标放上去)

提供者:(写在producer的测试类中)参数如果真的不知道怎么办了:ctrl+p就会有提示
- @Test
- void testPageOut(){
- Message msg = MessageBuilder
- .withBody("hello".getBytes(StandardCharsets.UTF_8)) //带有消息参数消息只能以byte类型发送所以这里自动转化一下
- .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build(); //设置一下队列的模式,这里NON_PERSISTENT表示是数据不是持久的,重启mq消息会丢失
- //尝试发送10万条消息
- for(int i=0;i<100000;i++){
- rabbitTemplate.convertAndSend("lazy.queue",msg);
- }
- }
会有以下这两个点需要友友们去看 ,消息处理时间很快,并且也大量数据情况下消息不会断

当然了,也不仅仅就是了看消息处理的有多快的,也是为了实用的,前面说去配置配置文件也是可以简经典队列调节为lazy队列的,如果想要仅仅只是个别创建队列能够是lazy队列Java代码也可以是可以实现的(以下就是Lazy队列的使用代码创建的方法,同时可以直接接收消息,效果是一样的)
- @RabbitListener(queuesToDeclare = @Queue(
- name = "lazy.queue", //设置队列名称
- durable = "true", //设置持久化
- arguments = @Argument(name = "x-queue.mode",value = "lazy") //设置队列模式
- ))
- public void LazyQueue(String msg){
- log.info("接收到 lazy.queue对的消息 :"+msg);
- }
RabbitMQ提供了Publisher Confirm和Publisher Return两种确认机制,开启确认机制会消耗一定资源也就会降低消息传输的速度,返回确认消息给生产者,返回的结果有这么几种
消息投递到了MQ,但是路由失败,此时会通过PublisherReturn返回路由异常原因,然后返回Ack
临时消息,入队成功就可以返回Ack
持久消息,完成持久化就返回Ack
其他情况也就是失败情况返回NACK,投递失败
上面的不要记住那么多,简单分为成功就是Ack不成功就是Nack
直接在yml配置文件中配置的即可:有以下三种模式
none:关闭confirm机制(默认机制)
simple:同步阻塞等待MQ的回执消息
correlated:MQ异步回调方式返回回执消息
下面直接上步骤:

每个RabbitTemplate只能配置一个ReturnCallback,在项目创建之前就需要配置好
这个配置是给提供者的,是提供者消费发送的成功与失败进行返回消息
提供者:这里只是配置一个消息回调
- @Slf4j
- @Configuration
- public class MqConfirmConfig implements ApplicationContextAware {
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
- //配置回调 这里配置confirm是无效的,idea会给你提示confirm的方法,但是效果不是,这里需要的是回调
- rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
- @Override
- public void returnedMessage(ReturnedMessage returnedMessage) {
- log.debug("收到消息的return callback",
- returnedMessage.getExchange(),returnedMessage.getRoutingKey(),
- returnedMessage.getMessage(),returnedMessage.getReplyCode(),
- returnedMessage.getReplyText());
- }
- });
- }
- }
提供者:下面开始发送消息进行确认

下面我们根据需要的参数进行写这个id代码;
- @Test
- void testConfirmCallback() throws InterruptedException {
- //创建一个CorrelationData 对象, 我们只取id 即可 调用一个带参的构造函数
- CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
- /*
- * 添加 CorrelationData 对象中提供了一个异步处理方法也就是多线程可以看见的方法,getFuture()
- * 这里给添加一个回调方法 参数是一个对象,里面内部方法需要重写里面的 失败和成功的方法 ,
- * 但是这里失败是spring操作的失败不是Rabbitmq的失败所以不会返回Nack
- * */
- cd.getFuture().addCallback(new ListenableFutureCallback
() { - @Override
- public void onFailure(Throwable ex) {
- log.debug("消息回调失败",ex);
- }
-
- @Override
- public void onSuccess(CorrelationData.Confirm result) {
- log.debug("收到confirm callback回执");
- /*
- * 这里才是排定rabbitmq的消息是否成功 判断一下结果是否是有ack
- * 如果是的话 返回一个我们自己打印的消息
- * */
- if(result.isAck()){
- log.debug("消息发送成功 收到ack");
- }else{
- log.error("消息发送失败 收到nack"+result.getReason());
- }
- }
- });
- // CorrelationData 对象创建好了 ,其实这里也可以说是id创建好了,添加进发送的消息中
- rabbitTemplate.convertAndSend("maxxub.direct","red2","hello",cd);
- Thread.sleep(2000);
- }
注:这样的情况一般使用于检查问题,因为需要一定的开销,会大大降低rabbitmq处理消息的速度
针对这部分参数:(前面已经给友友们提供了)
- listener:
- simple:
- prefetch: 1
- acknowledge-mode: auto #模式:自动 进行一定的重试次数
- retry:
- enabled: true # 超时重试
- initial-interval: 1000ms
- multiplier: 2 #连接失败的重试
- max-attempts: 3 #最大重试次数的
- stateless: true #针对事务,默认为false ,true针对上下文有定义保存,是一个状态
前面都是直接给友友们提供了配置参数,这里稍微具体说一下 MessageRecoverer就是消息恢复

继承该接口类有三个 :
(1)RejectAndDontRequeueRecoverer :重试耗尽后消息会被直接扔掉
(2)ImmediateRequeueMessageRecoverer:重试耗尽后,会重新进入队列(消耗太大了)
(3)RepublishMessageRecoverer:重试耗尽后,会去进入指定队列先存着
最优的当然是第三种了,对消息做到了保护作用,同时也不会无限进入队列4
下面就来实现一下
消费者:

- @Slf4j
- @Configuration
- //@ConditionalOnProperty(prefix = "spring",name = "rabbitmq.listener.simple.retry.enabled",havingValue = "true")
- public class ErrorConfiguration {
- /*创建一个交换机 队列 绑定一下并设置路由*/
- @Bean
- public DirectExchange directExchange(){
- return new DirectExchange("error.direct");
- }
- @Bean
- public Queue errorQueue(){
- return new Queue("error.queue");
- }
- @Bean
- public Binding errorBinding(DirectExchange directExchange,Queue errorQueue){
- return BindingBuilder.bind(errorQueue).to(directExchange).with("error");
- }
- //这里 使用 MessageRecoverer 进行接收
- @Bean
- public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
- log.debug("MessageRecoverer消息");
- //调用 RepublishMessageRecoverer 返回机制 前面说了只能绑定一个rabbitTemplate ,第一个参数填写即可
- //第二个参数 交换机 刚刚创建的指定交换机 和 指定交换机的路由
- return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
- }
- }
注:@ConditionalOnProperty注解指定配置参数没有弄好,但是这里是可以使用的,如果有佬们能够很好的使用,请评论下怎么弄,很感谢