延迟队列就是用来存放需要在指定时间被处理的元素的队列。
例如:订单在十分钟之内未支付则自动取消。
例如:用户注册成功后,如果三天内没有登录则进行短信提醒。
一个订单业务流程如下:
导入rabbitmq的相关配置:
<!--支持rabbitmq依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--rabbitmq的测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<!--支持web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--fastjson依赖-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.80</version>
</dependency>
<!--swagger对界面进行测试-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
swagger2的作用:
修改rabbitmq的相关配置文件:
spring.rabbitmq.host=39.103.163.156
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=0818
代码架构图:
springboot项目 和 一般项目 都可以单独声明一些交换机,队列:
配置文件类代码:
package com.itholmes.shopping.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* TTL队列 , 延迟队列
* 配置文件类代码
*/
@Configuration
public class TtlQueueConfig {
//普通交换机的名称
public static final String X_EXCHANGE = "X";
//死信交换机的名称
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
//普通队列的名称
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
//死信队列的名称
public static final String DEAD_LETTER_QUEUE_D = "QD";
//声明xExchange 普通直接交换机
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
//声明yExchange 死信直接交换机
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
//声明queueA普通队列 , 10秒
@Bean("queueA")
public Queue queueA(){
Map<String ,Object> arguments = new HashMap<>(3);
//设置死信交换机
arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
//设置死信RoutingKey
arguments.put("x-dead-letter-routing-key","YD");
//设置过期时间ttl,单位是ms毫秒。
arguments.put("x-message-ttl",10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
}
//声明queueB普通队列 , 40秒
@Bean("queueB")
public Queue queueB(){
Map<String ,Object> arguments = new HashMap<>(3);
//设置死信交换机
arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
//设置死信RoutingKey
arguments.put("x-dead-letter-routing-key","YD");
//设置过期时间ttl,单位是ms毫秒。
arguments.put("x-message-ttl",40000);
//通过使用构建工具类QueueBuilder来声明队列。
return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
}
//死信队列
@Bean("queueD")
public Queue queueD(){
return QueueBuilder.durable(DEAD_LETTER_QUEUE_D).build();
}
//绑定
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){
//构建工具类来创建Binding,将queueA绑定给xExchange ,RoutingKey为XA。
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange){
//构建工具类来创建Binding,将queueA绑定给xExchange ,RoutingKey为XA。
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange){
//构建工具类来创建Binding,将queueA绑定给xExchange ,RoutingKey为XA。
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
接口生产者(发送消息)代码:
package com.itholmes.shopping.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
/**
* 生产者:
* 发送延迟消息
* 通过controller层接口接受浏览器发送过来的信息,进而作为生产者操作。
*/
@Slf4j //slf4j日志
@RestController
@RequestMapping("/ttl1")
public class SendMsgController {
//使用spring公司提供的RabbitTemplate操作
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message){
//下面的大括号是占位符,对应后面两个占位符。
log.info("当前事件:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);
/**
* rabbitTemplate.convertAndSend()的参数:
* 参数exchange:交换机名
* 参数routingKey:对应的routingKey绑定
* 参数Object:对应发送的消息
*/
rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列:"+message);
rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:"+message);
}
}
消费者代码(处理ttl过时,死信队列里面的消息):
package com.itholmes.shopping.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Date;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
/**
* 消费者:
* 对于延迟队列已经过期的消息会发到死信队列中
* 之后,消费者负责处理死信队列里面的消息。
*/
@Slf4j
@Component
public class DeadLetterQueueConsumer {
//接受消息
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel){
//注意这里导入的要导入rabbitmq的相关包!!
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);
}
}
上面代码结构,一个过期时间对应一个队列,这样很冗余!
因此就有了下面的这种优化结构:
QC是不设置TTL过期时间的队列
。配置类添加:
//优化队列-普通队列的名称
public static final String QUEUE_C = "QC";
//优化队列配置: 死信配置
@Bean("queueC")
public Queue queueC(){
Map<String ,Object> arguments = new HashMap<>(3);
//设置死信交换机
arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
//设置死信routingkey
arguments.put("x-dead-letter-routing-key","YD");
return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
}
//优化队列配置:绑定配置
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
生产者代码:
//优化后的消息,从生产者发送TTL
@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
log.info("当前时间:{},发送一条时长{}毫秒TTL信息给队列QC:{}",new Date().toString(),ttlTime,message);
/**
* 确定权在生产者手中。
* 通过写MessagePostProcessor的lamba表达式来操作
*/
rabbitTemplate.convertAndSend("X","XC",message,msg->{
//设置发送消息的时候 延迟时长 还是要注意这里的单位是毫秒数。
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
});
}
基于死信做延迟队列的巨大缺点:
rabbitmq只会检查第一个消息是否过期
,如果第一个消息延迟时间很长,第二个消息的延迟时间很短,第二个消息并不会优先的得到执行。这个问题没办法解决,因此就有了基于插件的延迟队列。
去官方下载rabbitmq_delayed_message_exchange插件。
将下载的插件,放到rabbitmq目录下的plugins目录下:
在plugins目录下执行使插件生效的命令:
之后,重新启动rabbitmq的服务。
安装成功后,交换机就会多了一种类型:
基于死信的延迟队列:(是在队列设置TTL实现的)
基于插件的延迟队列:(是在交换机实现延迟效果的)
基于插件的延迟队列声明,配置类:
package com.itholmes.shopping.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
@Configuration
public class DelayedQueueConfig {
//队列
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
//交换机
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
//routingKey
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
//声明队列
@Bean
public Queue delayedQueue(){
return new Queue(DELAYED_QUEUE_NAME);
}
//声明交换机,因为这次用的是插件引入的新交换机,所以使用的是自定义交换机
@Bean
public CustomExchange delayedExchange(){
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type","direct");//延迟类型,直接
/**
* 1.交换机名称
* 2.交换机的类型
* 3.是否需要持久化
* 4.是否需要自动删除
* 5.其他的参数map
*/
return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);
}
//声明绑定
@Bean
public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,@Qualifier("delayedExchange") CustomExchange delayedExchange){
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
生产者代码:
//队列
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
//交换机
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
//routingKey
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
//基于插件的延迟队列生产者
@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendDelayMsg(@PathVariable String message,@PathVariable Integer delayTime){
log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列delayed.queue:{}",new Date().toString(),delayTime,message);
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME,DELAYED_ROUTING_KEY,message,msg->{
//这里就要设置 延迟时长 单位依然是毫秒。
msg.getMessageProperties().setDelay(delayTime);
return msg;
});
}
消费者代码:
package com.itholmes.shopping.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 基于插件的延迟消息
*/
@Slf4j
@Component
public class DelayQueueConsumer {
//队列
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
//监听消息
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayQueue(Message message){
String msg = new String(message.getBody());
log.info("当前事件:{},收到延迟队列的消息:{}",new Date().toString(),msg);
}
}
基于插件的延迟队列,就弥补了基于死信的延迟队里的巨大缺陷问题。因此,优先使用基于插件的延迟队列。
问题:RabbitMQ由于某些原因宕机或者重启,在这个期间生产者发送的消息投递失败,导致消息丢失,需要手动处理和恢复。
为了解决上面问题,就有了发布确认高级机制。
—
为了解决这一问题,rabbitmq出现问题,这种情况,我们必须要有一个回调接口!
因为涉及到回调接口,所以要进行配置:
发布确认高级 配置类:
package com.itholmes.shopping.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 发布确认高级 配置类
*/
@Configuration
public class ConfirmConfig {
//交换机
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
//队列
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
//RoutingKey
public static final String CONFIRM_ROUTING_KEY = "key1";
//声明交换机
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
//声明队列
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
//绑定
@Bean
public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,@Qualifier("confirmExchange") DirectExchange confirmExchange){
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
}
发布确认高级 生产者:
package com.itholmes.shopping.controller;
import com.itholmes.shopping.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
/**
* 发布确认高级:生产者 开始发消息 测试确认
*/
@Slf4j
@RestController
public class ProducerController {
@Autowired
RabbitTemplate rabbitTemplate;
//发消息
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable String message){
//对应回调接口的correlationData
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData);
log.info("发送消息为:{}",message+"key1");
}
}
发布确认高级 消费者:
package com.itholmes.shopping.consumer;
import com.itholmes.shopping.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 发布确认高级:消费者
*/
@Slf4j
@Component
public class Consumer {
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void receiveConfirmMessage(Message message){
String msg = new String(message.getBody());
log.info("接受消费为confirm.queue的消息:{}",msg);
}
}
发布确认高级 回调接口(重点):
package com.itholmes.shopping.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* 发布确认高级 回调接口
* 必须实现RabbitTemplate.ConfirmCallback(函数式接口,可以写lamda表达式)
*/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
/**
* 此外要将MyCallBack注入到RabbitTemplate.ConfirmCallback,这样才会起作用!!
*/
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 注解:@PostConstruct注解
* 这个注解是其他注解加载完成后,再执行!
* 这里我们要等到rabbitTemplate注入后,在执行。
* 可以使用@PostConstruct注解一个方法来完成初始化
* ,@PostConstruct注解的方法将会在依赖注入完成后被自动调用。
*/
@PostConstruct
public void init(){
//注入
rabbitTemplate.setConfirmCallback(this);
}
/**
* 该方法就是 交换机确认回调方法
*
* 发消息,交换机接收到了的回调:
* correlationData:保存回调消息的ID以及相关信息
* b(ack): 因为交换机收到了消息,返回true
* s(cause): 失败原因,成功为null。
*
* 发消息,交换机接受失败了的回调:
* correlationData:保存回调消息的ID以及相关信息
* b(ack): 交换机未收到消息,返回false
* s(cause): 失败原因
*
*
* correlationData这个参数是生产者参数发出的,生产者不传参,这里就拿不到。
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//三元运算,拿到ID,预防correlationData为null报错。
String id = correlationData != null ? correlationData.getId() : "";
if (ack){
log.info("交换机已经收到了,ID为{}",id);
}else {
log.info("交换机还未收到ID为:{}的消息,由于原因:{}",id,cause);
}
}
}
以上就仅仅解决了rabbitmq的交换机宕机或者出问题,丢失消息。接下来解决由于队列(routingkey)出现问题,引起的消息丢失。
交换机发送消息给队列,如果由于某些原因没有发送过去,交换机就应该回退消息给生产者,这样消息就不会丢失了。
配置springboot,开启回退消息:
以下整合发布确认和回退消息全部代码:
配置类:
package com.itholmes.shopping.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* 发布确认高级
* 回调接口
* 必须实现RabbitTemplate.ConfirmCallback(函数式接口,可以写lamda表达式)
* 回退接口
* 必须实现RabbitTemplate.ReturnsCallback
*/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{
/**
* 此外要将MyCallBack注入到RabbitTemplate.ConfirmCallback,这样才会起作用!!
*/
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 注解:@PostConstruct注解
* 这个注解是其他注解加载完成后,再执行!
* 这里我们要等到rabbitTemplate注入后,在执行。
* 可以使用@PostConstruct注解一个方法来完成初始化
* ,@PostConstruct注解的方法将会在依赖注入完成后被自动调用。
*/
@PostConstruct
public void init(){
//发布确认注入rabbitTemplate
rabbitTemplate.setConfirmCallback(this);
//回退消息注入rabbitTemplate
rabbitTemplate.setReturnsCallback(this);
//再次强调不注入是不行的!!
}
/**
* 该方法就是 交换机确认回调方法
*
* 发消息,交换机接收到了的回调:
* correlationData:保存回调消息的ID以及相关信息
* b(ack): 因为交换机收到了消息,返回true
* s(cause): 失败原因,成功为null。
*
* 发消息,交换机接受失败了的回调:
* correlationData:保存回调消息的ID以及相关信息
* b(ack): 交换机未收到消息,返回false
* s(cause): 失败原因
*
*
* correlationData这个参数是生产者参数发出的,生产者不传参,这里就拿不到。
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//三元运算,拿到ID,预防correlationData为null报错。
String id = correlationData != null ? correlationData.getId() : "";
if (ack){
log.info("交换机已经收到了,ID为{}",id);
}else {
log.info("交换机还未收到ID为:{}的消息,由于原因:{}",id,cause);
}
}
/**
* 可以在消息传递过程中不可达目的地时将消息返回给生产者 ,达到回退效果。
* returnedMessage对象对应属性:
* message:对应消息
* replyText:退回原因
* exchange:那个交换机
* routingKey:那个路由
* 也别忘记注入到rabbitTemplate里面!!
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("消息{},被交换机{}退出,退回原因:{},路由Key:{}"
,new String(returnedMessage.getMessage().getBody()),
returnedMessage.getExchange(),
returnedMessage.getReplyText(),
returnedMessage.getRoutingKey());
}
}
生产者:
package com.itholmes.shopping.controller;
import com.itholmes.shopping.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
/**
* 发布确认高级:生产者 开始发消息 测试确认
*/
@Slf4j
@RestController
public class ProducerController {
@Autowired
RabbitTemplate rabbitTemplate;
//发消息
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable String message){
//对应回调接口的correlationData
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData);
log.info("发送消息为:{}",message+"key1");
//模拟队列错误,专门绑定一个不存在的routingkey
CorrelationData correlationData2 = new CorrelationData("2");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY+"2",message,correlationData2);
log.info("发送消息为:{}",message+"key2");
}
}
消费者和环境配置类都和发布确认高级一样。
这样就解决了rabbitmq(交换机或者队列)出现问题,防止消息丢失了。