MQ的两种消息模型
MQ的类型
注意:发消息是发给交换机,收消息是监听队列。
一个连接有很多个信道,每个信道用于收发一个队列。
虚拟主机可以用来环境隔离,如生产环境和开发环境隔离。
#启动容器(可以自动下载安装)
docker run -p 1883:1883 -p 4369:4369 -p 5671:5671 \
-p 5672:5672 -p 8883:8883 \
-p 15672:15672 -p 25672:25672 \
-d --name rabbitmq \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
rabbitmq:management
使用15672端口号访问MQ管理网页界面。
headers交换器和direct交换器完全一致但性能更差,所以几乎不用。常用交换机类型有:
(路由键通配符:#匹配0个或多个单词,*匹配一个单词)
(交换机绑定队列时需要设置一个路由键且如果是topic类型支持设置带有通配符的路由键。客户端发送消息时也需要携带一个消息的路由键给交换机)
引入maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
SpringBoot配置文件中配置
spring.rabbitmq.host=192.168.239.135
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
启动类上添加注解@EnableRabbit
发送消息
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(){
//发送一个对象消息,默认会把对象序列化后发送,对象类必须实现Serializable接口
Person person = new Person();
person.setName("张三");
person.setAge(20);
rabbitTemplate.convertAndSend("exchange.direct","ycy.news",person);
}
}
默认会把对象序列化后发送,但可配置为Json转换器:
@Configuration
public class RabbitMQConfig {
/**
* 注入一个把对象转成Json的消息转换器,无则使用默认序列化转换器
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
接收消息
@RabbitListener(queues = {"队列名"})
监听哪些队列
@RabbitHandler
重载区分不同的消息类型
@RabbitListener(queues = {"ycy.news"})
@Service
public class ConponServiceImpl implements ConponService {
//监听方法参数必须和发送时为同一类型,不同微服务之间收发消息注意类型统一,可以封装一个专门用来传输消息的DTO实体类放在commmon模块下
@RabbitHandler
public void receiveMessage(Object object){
System.out.println("receiveMessage1");
System.out.println("object:"+object.getClass());
}
@RabbitHandler
public void receiveMessage(Message message, Person person, Channel channel){
System.out.println("receiveMessage2");
System.out.println("message:"+message);
System.out.println("person:"+person);
System.out.println("channel:"+channel);
}
@RabbitHandler
public void receiveMessage(Animal animal){
System.out.println("receiveMessage3");
System.out.println("animal"+animal);
}
}
注:微服务如果做了集群,这时各个微服务节点会争抢消息,但一个消息只会被一个微服务节点获取到。
保证消息不丢失,可靠抵达,可以使用事务,但性能下降250倍,为此引入了确认机制。
配置文件中添加:
spring.rabbitmq.publisher-confirms=true
RabbitMQ配置类中编写回调方法:
@Configuration
public class RabbitMQConfig {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 注入一个把对象转成Json的消息转换器,无则使用默认序列化转换器
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/**
* 定制rabbitTemplate
* 注解@PostConstruct :RabbitMQConfig在创建完成后调用此方法
*/
@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* p->b
* 发送到broke时回调,确认消息是否到达
* @param correlationData 消息唯一关联数据
* @param ack 是否成功还是失败
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("correlationData==" + correlationData);
System.out.println("ack==" + ack);
System.out.println("cause==" + cause);
}
});
}
}
发送消息的时候指定每个消息的ID:
(后续可以记录到数据库中,定期扫描重发发送失败的消息)
rabbitTemplate.convertAndSend("exchange.direct","ycy.news",person,new CorrelationData("12345678"));
配置文件中添加:
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
RabbitMQ配置类中编写回调方法:
@Configuration
public class RabbitMQConfig {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 注入一个把对象转成Json的消息转换器,无则使用默认序列化转换器
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/**
* 定制rabbitTemplate
* 注解@PostConstruct :RabbitMQConfig在创建完成后调用此方法
*/
@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* p->b
* 发送到broke时回调,确认消息是否到达
* @param correlationData 消息唯一关联数据
* @param ack 是否成功还是失败
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("correlationData==" + correlationData);
System.out.println("ack==" + ack);
System.out.println("cause==" + cause);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* e->q
* 只有消息没有到达队列才回调
* @param message 失败消息
* @param replyCode 失败码
* @param replyText 回复内容
* @param exchange 发送的交换机
* @param routingKey 交换机key
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("fail message==" + message);
System.out.println("replyCode" + replyCode);
System.out.println("replyText" + replyText);
System.out.println("exchange==" + exchange);
System.out.println("routingKey==" + routingKey);
}
});
}
}
此阶段常见的错误有:发送的消息找不到匹配的路由键
默认监听队列会自动ack签收,但是如果无法确定此消息是否被处理完成, 或者成功处理,我们可以切换成手动ack签收模式。
确认签收成功消息才会从MQ队列中删除。
配置文件中添加:
#手动ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
在监听消息处理方法中手动签收:
@RabbitHandler
public void receiveMessage(Message message, Object object, Channel channel){
System.out.println("receiveMessage2");
System.out.println("message:"+message);
System.out.println("object:"+object);
System.out.println("channel:"+channel);
//手动签收,非批量模式
try {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag,false);
} catch (IOException e) {
//网络中断
e.printStackTrace();
}
}
TTL(Time To Live)
消息的TTL就是消息的存活时间。
• RabbitMQ可以对队列和消息分别设置TTL。
• 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
• 如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。
Dead Letter Exchanges(DLX)
死信路由。一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列, 一个路由可以对应很多队列。
• 被消费者拒收了
• TTL过期了
• 队列的长度限制满了
延时队列实现方案:
过期的消息会自动进入配置的死信路由,再由路由转到发真正的业务队列 即可实现延时队列的效果。
①队列过期的方式:
②消息过期的方式:
推荐使用队列过期的方式:
消息过期方式可能会有问题,如先进来一个5分钟过期的消息,又进来一个3秒过期的消息,那么第二个消息实际被仍出去是在5分零3秒的时候。而队列过期方式由于所有消息过期时间都相同就不会出现上述问题。
延时队列代码实现:
使用第一种方式的变种
引入RabbitMQ依赖
参照整合SpringBoot章节。
创建队列,交换机,绑定关系
@Configuration
public class MyMQConfig {
/**
* 创建过期队列
* @return
*/
@Bean
public Queue orderDelayQueue(){
//配置过期时间和死信交换机和死信路由键
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange","order-event-exchange");
arguments.put("x-dead-letter-routing-key","order.release.order");
arguments.put("x-message-ttl",60000);
Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
return queue;
}
/**
* 创建普通队列
* @return
*/
@Bean
public Queue orderReleaseOrderQueue(){
Queue queue = new Queue("order.release.order.queue", true, false, false);
return queue;
}
/**
* 创建交换机
* @return
*/
@Bean
public Exchange orderEventExchange(){
return new TopicExchange("order-event-exchange",true,false);
}
/**
* 交换机绑定过期队列
* @return
*/
@Bean
public Binding orderCreateOrderBinding(){
return new Binding("order.delay.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.create.order",
null);
}
/**
* 交换机绑定普通队列
* @return
*/
@Bean
public Binding orderReleaseOrderBinding(){
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",
null);
}
}
注意:只有先编写监听队列方法后才会触发创建队列交换机等操作
收发消息,测试延时队列
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(){
//发送一个对象消息,默认会把对象序列化后发送,对象类必须实现Serializable接口
Person person = new Person();
person.setName("张三");
person.setAge(20);
rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",person,new CorrelationData("12345678"));
}
@RabbitListener(queues = "order.release.order.queue")
public void receiveMessage(Person person){
System.out.println("延时触发:"+person);
}
}
RabbitMQ 中的节点类型:
集群中至少要有一个Disk节点。
RabbitMQ 集群的两种模式:
普通集群(默认)
该模式还是会造成单点故障,无法保证高可用。消息只会存在集群中的一个节点,且不会同步到其它队列。对于消费者,如果消息进入A节点,当从B节点拉取时,MQ会将消息从A中取出,并经过B发送给消费者。
该模式适用于消息无需持久化的场景,如日志队列。
镜像集群
集群中的节点会自动同步消息数据,有一套选举算法,1个master,n个slaver,生产者消费者的请求都会转至master。
缺点:若镜像节点过多且消息体量大,集群内部同步数据会消耗大量网络带宽
镜像集群也是基于普通集群,即只有先搭建普通集群然后才能设置镜像集群。
若消费过程中,master挂了,则选举新master,若没来得及确认,可能会重复消费