消息队列是一种应用程序之间通过异步通信进行数据交换的通信模式
消息队列的类型:
总结:ZeroMQ小而美,RabbitMQ大而稳,Kakfa和RocketMQ快而强劲
微服务通信、任务队列、实时通信等各种场景
一、安装
3. 常用命令进行设置
4. 查看服务状态
停止服务
/sbin/service rabitmq-server stop
开启web管理
查看防火墙状态,需要关闭防火墙(关闭防火墙,并指定下次开机不需要开启防火墙)
/sbin/service rebbitmq-server start 重新启动,然后通过浏览器进行访问
创建用户以及权限
重新使用admin的方式进行登录
通过用户界面添加新用户
docker方式安装
二、使用Rabbit
/**
消息生产者
*/
public class Producer{
//队列名称
public static final String Queue_NAME = "hello";
//发消息
public static void main(){
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.200.129");
factory.setUsername("admin");
factory.setPassword("123");
//创建连接
Connection conneciton = factory.newConnection();
//获取信道
Channel channel = connection.createChannel();
/**
生成一个队列
参数:队列名,是否持久化(默认是存储内存),是否提供一个消费者进行消费(true多个消费者共享)
是否自动删除(true是不自动删除),其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message = "hello world";//发送消息
//参数:发送到哪个交换机,路由key是哪个,其他参数,发送消息的消息体
channel.basicPublish("",QUEUE_NAME,null,message.getBytes())
System.out.println("消息发送完毕");
}
}
/**
消息消费者
*/
public class Consumer{
//队列名称(必须和生产者的队列名称一致)
public static final String QUEUE_NAME = "hello";
//接收消息
public static void main(String[] args){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.200.129");
factory.setUsername("admin");
factory.setPassword("123");
//创建连接
Connection conneciton = factory.newConnection();
//获取信道
Channel channel = connection.createChannel();
/**
消费者消费消息
参数:消费哪个队列,消费成功后是否应答(true标识自动应答,false手动应答)
消费者未成功消费的回调
*/
DeliverCallback deliverCallback = (consumerTag,messaage) -> {
System.out.println(new String(message.getBody()));
}
//取消消息时回调
CancelCallack cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
}
channel.basicConsume(QUEUE_NAME,trye,deliverCallback,cancelCallback);
}
}
三、工作模式的场景
work queues:一个消息只能被处理一次,不可以处理多次,消费者(工作线程)采用轮询的方式依次接收消息。
默认是轮询机制,如果两个消费者的处理时间差距大,必须采用处理消息快的能够多次消费到消息
//抽取工具类,将连接工厂、创建信道抽取为一个工具
public class RabbitMqUtils{
public static Channel getChannel() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("182.92.234.71");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
//消费者,工作线程
publci class Worker01{
//队列名称
public static final String QUEUE_NAME = "hello";
public static void main(String[] args){
Channel channel = RabbitMqUtils.getChannel();
//消息接收
DeliverCallback deliverCallback = (sonsumerTag,message) -> {
System.out.println("接收到的消息"+new String(message.getBody()));
};
//消息被取消执行
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
};
/**
参数:
消费哪个队列,
消费成功之后是否要自动应答true,手动应答false
消费者未成功消费的回调
消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
采用idea工具,模拟多线程消费
//生产者,发送大量消息
public class Task01{
//队列名称
public static final String QUEUE_NAME = "hello";
//发送大量消息
public static void main(String[] args){
Channel channel = RabbitMqUtils.getChannel();
/**
参数:
队列名称
队列里的消息是否持久化,默认是内存中
队列消息是否只供一个消费者进行消费,true多个消费者,false一个消费者
是否自动删除,最后一个消费者断开连接true自动删除,false不自动删除
其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//从控制台接收信息
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送消息完成:"+message);
}
}
}
不公平分发,消费者 接收消息设置参数channelasicQos(1),每个消费者都需要设置
预取值,提前指定消费者
设置方式和不公平分发一样
四、集群
镜像模式搭建RabbitMQ集群
五、RabbitMQ 如何保证全链路数据100%不丢失 ?
消息从生产端到消费端消费要经过3个步骤:
生产端可靠性投递:
丢失原因,消息在网络传输的过程中发生网络故障消息丢失;消息投递到RabbitMQ时RabbitMQ挂了,那消息也可能丢失
针对以上情况,RabbitMQ提供的机制:
①、事务消息机制:由于会严重降低性能,一般不采用,而采用另一种轻量级的解决方案:confirm消息确认机制
②、confirm消息确认机制:生产端投递的消息一旦投递到RabbitMQ后,RabbitMQ就会发送一个确认消息给生产端,让生产端知道我已经收到消息了,否则这条消息就可能已经丢失了,需要生产端重新发送消息了。
通过以下代码来开启确认模式
channel.confirmSelect();//开启发送方确认模式
然后异步监听确认和未确认的消息
channel.addConfirmListener(new ConfirmListener(){
//消息正确到达broker
@Override
public void handleAck(long deliveryTag,boolean multiple) throws IOException{
System.out.println("已收到消息");
//做一些其他处理
}
//RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息
@Override
public void handlerNack(long deliveryTag,boolean multiple) throws IOException{
System.out.println("未确认消息,标识:" + deliveryTag);
//做一些其他处理,比如消息重发等
}
});
消息持久化:
RabbitMQ收到消息后将这个消息暂时存在了内存中,那这就会有个问题,如果RabbitMQ挂了,那重启后数据就丢失了,所以相关的数据应该持久化到硬盘中,这样就算RabbitMQ重启后也可以到硬盘中取数据恢复。那如何持久化呢?
message消息到达RabbitMQ后先是到exchange交换机中,然后路由给queue队列,最后发送给消费端。
需要给exchange、queue和message都进行持久化:
①、exchange持久化
//第三个参数true标识这个exchange持久化
channel.exchangeDeclare(EXCHANGE_NAME,"direct",true);
②、queue持久化
//第二个参数true表示这个queue持久化
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
队列持久化:声明队列的时候把durable参数设置为持久化,当rabbitmq重启后队列不会被删除掉
如果之前声明的队列不是持久化,需要把原先队列删除,或者重新创建一个队列,否职责会报错
③、message持久化
//第三个参数MessageProperties.PERSISTENT_TEXT_PLAIN表示这条消息持久化
channel.basicPublish(EXCHENGE_NAME,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
消息持久化:消息生产者MessageProperties.PERSISTENT_TEXT_PLAIN
这样,如果RabbitMQ收到消息后挂了,重启后会自行恢复消息。
RabbitMQ收到消息还没来得及将消息持久化到硬盘时,RabbitMQ挂了,这样消息还是丢失了,或者RabbitMQ在发送确认消息给生产端的过程中,由于网络故障而导致生产端没有收到确认消息,这样生产端就不知道RabbitMQ到底有没有收到消息,就不好做接下来的处理。
所以除了RabbitMQ提供的一些机制外,我们自己也要做一些消息补偿机制,以应对一些极端情况。其中的一种解决方案——消息入库。
将要发送的消息保存到数据库中。
首先发送消息前先将消息保存到数据库中,有一个状态字段status=0,表示生产端将消息发送给了RabbitMQ但还没收到确认。在生产端收到确认后将status设为1,表示RabbitMQ已收到消息。
这里有可能会出现上面说的两种情况,所以生产端这边开一个定时器,定时检索消息表,将status=0并且超过固定时间后(可能消息刚发出去还没来得及确认这边定时器刚好检索到这条status=0的消息,所以给个时间)还没收到确认的消息取出重发(第二种情况下这里会造成消息重复,消费者端要做幂等性),可能重发还会失败,所以可以做一个最大重发次数,超过就做另外的处理。
这样消息就可以可靠性投递到RabbitMQ中了,而生产端也可以感知到了。
消费端可靠性接收:
一、在RabbitMQ将消息发出后,消费掉还没接收之前,发送网络故障,消费端与RabbitMQ断开连接,此时消息会丢失。
二、在RabbitMQ将消息发出后,消费端还没接收到消息,消费端挂了,消息会丢失
三、消费端接收到消息,但在处理消息的过程中发生异常,宕机。消息也会丢失。
上述3中情况导致消息丢失归根结底是因为RabbitMQ的自动ack机制,即默认RabbitMQ在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完,导致消费端消息丢失时RabbitMQ自己又没有这条消息了。
所以就需要将自动ack机制改为手动ack机制
当autoAck参数置为false,对于RabbitMQ服务端而言,队列中的消息分成了两个部分:
应答机制:
DeliverCallback deliverCallback = (consumerTag,delivery) -> {
try{
//接收到消息,做处理
//手动确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}catch(Exception e){
//出错处理,这可以让消息重回队列重新发送或直接丢弃消息
}
};
//第二个参数autoAck设为false表示自动关闭确认机制,需要手动确认
channel.basicConsume(QUEUE_NAME,false,deliverCallback,consumerTag -> {
});
当消息被消费者完全处理后,队列才能删除消息。否则会一旦消费者出现故障,消息未被完全处理就被队列删除,造成消息丢失。
消息自动重新入队:
当接收消息的C1突然宕机,队列会重新安排消息进入队列,让其他消费者进行处理
//消息生产者
public class Task02{
//队列名称
public static final String task_queue_name = "ack_queue";
publix static void main(String[] args){
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));
System.out.println("生产者发出消息:"+message)
}
}
}
//消息消费者一,消息在手动应答时不丢失,放回队列中重新消费
public class Work01{
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args){
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1等待接收消息处理时间较短");
DeliverCallback deliverCallback = (consumerTag,message) -> {
//模拟处理业务
Thread.sleep(1000);
System.out.println("接收到的消息:"+ new String(message.getBody(),"UTF-8"));
//手动应答(消息的标记,不批量处理应答)
channel.basicAck(message.getEnvelope(),getDeliveryTag(),false);
};
//采用手动应答
boolean autoAck = false;
channel.basicConsumer(TASK_QUEUE_NAME,autoAck,(consumerTag -> {
System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
}));
}
}
//消息消费者二,消息在手动应答时不丢失,放回队列中重新消费
public class Work02{
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args){
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1等待接收消息处理时间较短");
DeliverCallback deliverCallback = (consumerTag,message) -> {
//模拟处理业务较长
Thread.sleep(1000000);
System.out.println("接收到的消息:"+ new String(message.getBody(),"UTF-8"));
//手动应答(消息的标记,不批量处理应答)
channel.basicAck(message.getEnvelope(),getDeliveryTag(),false);
};
//采用手动应答
boolean autoAck = false;
channel.basicConsumer(TASK_QUEUE_NAME,autoAck,(consumerTag -> {
System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
}));
}
}
消息生产者发送消息后,C2处理时间较长,还未处理完就宕机,此时会看到C1接收到了
说明消息队列被重新入队了。
如果RabbitMQ一直没有收到消费端的确认信号,并且消费此消息的消费端已经断开连接或宕机(RabbitMQ会自己感知到),则RabbitMQ会安排该消息重新进入队列(放在队列头部),等待投递给下一个消费者,当然也有能还是原来的那个消费端,当然消费端也需要确保幂等性。
Spring AMQP+RabbitMQ死信交换机实现延迟发送邮件
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
【如何这个包含死信的队列配置了dead-letter-exchange属性,制定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称之为死信交换机(Dead Letter Exchange)】
①、如图,一个消息被消费者拒绝了,变成了死信:
因为zbbmeta.direct.queue1绑定了死信交换机dl.direct,因此死信会投递给dl.direct交换机:
如果这个死信交换机也绑定了一个队列,则消息最终会进入这个存放死信的队列:
队列将死信投递给死信交换机时,必须知道两个信息:
②、一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况:
死信交换机结合TTL实现延迟发送邮件
@Configuration
public class DeadLetterConfig{
//死信交换机deadL
@Bean
public DirectExchange dlExchange(){
return new DirectExchange("deadLetter.direct");
}
//声明死信队列
@Bean
public Queue dlQueue(){
return new Queue("deadLetter.queue");
}
//死信队列和交换机绑定
@Bean
public Binding dlBinding(){
return BindingBuilder.bind(dlExchange())
.to(dlQueue())
.with("dlmail");
}
//声明接收注册信息的队列
@Bean
public Queue registerQueue(){
return QueueBuilder.durable("register.queue")
.ttl(10*1000)//表示消息所在的队列超过10s,就算过期 可以自行练习模拟24小时
.deadLetterExchange("deadLetter.direct")//指定死信交换机
.deadLetterRoutingKey("dlmail")//指定死信交换机和队列之间的routingKey路由
.build();
}
//声明注册信息的交换机
@Bean
public DirectExchange registerDirectExchange(){
return new DirectExchange("register.direct");
}
//交换机和队列绑定
@Bean
public Binding registerBinding(){
return BindingBuilder.bind(registerQueue()).to(registerDirectExchange()).with("register");
}
}
消费端的SpringRabbitListener类中添加监听接收死信队列消息
@RabbitListener(bindings=@QueueBinding(
value=@Queue(name="deadLetter.queue",durable="true"),
exchange=@Exchange(name="deadLetter.direct"),
key="dlmail"
))
public void listenDlQueue(){
System.out.println("接收到 24小时候的确认邮件-----:"+msg+" ====================="+ LocalTime.now());
}
生产端创建PublisherController创建接口,模拟用户注册成功后的动作
@GetMapping("/register/{msg}")
public String registerUser(@PathVariable("msg") String msg){
rabbitTemplate.convertAndSend("register.direct","register",ms);
return "注册成功 时间为"+ LocalTime.now();
}
发送消息时,设定TTL
@GetMapping("/register/{msg}")
public String registerUser(@PathVariable("msg") String msg){
//创建消息
Message message = MessageBuilder.withBody("hello,ttl message".getBytes(StandardCharsets.UTF-8))
.setExpiration("5000")
.build();
//消息ID需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//发送消息
rabbitTemplate.convertAndSend("register.direct", "register", message, correlationData);
return "注册成功 时间为"+ LocalTime.now();
}
测试:
启动消息生产服务和消息消费服务
打开浏览器:http://localhost:8888/publisher/register/注册用户2,模拟注册成功,向后台发送信息
结果如下:在RabbitMQ的客户端发现register.queue中存在一条消息
等待是10s后,发现监听死信队列,消费了消息
因为队列的TTL值是10000ms,也就是10秒。可以看到消息发送与接收之间的时差刚好是10秒。
总结:
消息超时的两种方式是?
如何实现发送一个消息20秒后消费者才收到消息?
延迟发送消息的使用场景包括:
======================================================================
RocketMQ主要由NameServer、Broker、Producer以及Consumer四部分构成。
流程:
消息领域模型:分为Message、Topic、Queue、Offset以及Group这几部分。
RocketMQ 中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset来访问,Offset来访问,Offset为Java long类型64位。
Message Queue是一个长度无限的数组,Offset就是下标。
RocketMQ的关键特性:
①:消息的顺序,值得是消息消费时,能按照发送的顺序来消费。
例如:一个订单产生了 3 条消息,分别是订单创建、订单付款、订单完成。消费时,要按照这个顺序消费才有意义。但同时订单之间又是可以并行消费的。
RocketMQ是通过将“相同的ID的消息发送到同一个队列,而一个队列的消息只由一个消费者来处理”来实现顺序消息。
②:消息的重复,消息领域有一个对消息投递的Qos(服务质量)定义,分为:
③:消息去重
原则:使用业务端逻辑保持幂等性
就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用,数据库的结果都是唯一的,不可变的。
去重策略:保证每条消息都有唯一编号(唯一流水号),且保证消息处理成功与去重的日志同时出现。
建立一个消息表,拿到这个消息作数据库的insert操作,给这个消息做一个唯一主键或者唯一约束,那么就算出现重复消费的情况,就会导致主键冲突,那么就不再处理这条消息。
基于RocketMQ实现分布式事务
用户发起支付后,订单系统需要调用库存服务执行库存扣减逻辑。
跨服务调用,因此会产生分布式事务。使用RocketMQ的事务消息来实现分布式事务。
①、订单服务的应用服务层处理支付逻辑,并调用RocketMQ发送事务消息:
@Override
public String payment(String oderSn){
//集成支付宝
//支付流水号
String outOrderNo = IdUtils.get32UUID();
TradeOder tradeOrder = Optional.ofNullable(tradeOrderService.getByOrderSn(orderSn))
.orElseThrow(() -> new BusinessException("订单编号不存在"));
// 如果订单处于待支付状态
if (Objects.equals(tradeOrder.getStatus(), OrderStatusEnum.WAITING_PAYMENT.getStatus())) {
OrderPaidEvent orderPaidEvent = new OrderPaidEvent(orderSn, outOrderNo);
TransactionSendResult sendResult = enhanceTemplate.sendTransaction("TRADE-ORDER", "ORDER-PAID");
if (SendStatus.SEND_OK == sendResult.getSendStatus()
&& sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
return tradeOrder.getOrderSn();
} else {
throw new BusinessException("支付失败...");
}
} else {
throw new BusinessException("订单已支付,请勿重复提交...");
}
}
②、在订单服务的基础设施层,创建一个类实现 RocketMQLocalTransactionListener 接口:
该接口有两个方法:
@Component
@Slf4j
public class OrderPaidTransactionConsumer implements RocketMQLocalTransactionListener {
@Resource
private TransactionTemplate transactionTemplate;
@Resource
private TradeOrderService tradeOrderService;
/**
* 执行本地事务
* 将订单状态修改成已支付
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) message.getPayload(), OrderPaidEvent.class);
try {
// 放到同一个本地事务中
this.transactionTemplate.executeWithoutResult(status -> {
String orderSn = orderPaidEvent.getOrderSn();
// 修改成待发货
tradeOrderService.changeOrderStatus(orderSn, OrderStatusEnum.AWAITING_SHIPMENT);
});
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("修改订单状态失败", e);
// ROLLBACK 则回滚消息,rocketmq将废弃这条消息
return RocketMQLocalTransactionState.ROLLBACK;
// 如果是UNKNOWN, 则触发回查
}
}
/**
* 检查本地事务执行状态
* 消息回查时,对于正在进行中的事务不要返回Rollback或Commit结果,应继续保持Unknown的状态。
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) message.getPayload(), OrderPaidEvent.class);
String orderSn = orderPaidEvent.getOrderSn();
TradeOrder tradeOrder = tradeOrderService.getByOrderSn(orderSn);
// 如果已经修改成待发货说明本地事务执行成功,此时消费端可以直接消费
if (Objects.equals(tradeOrder.getStatus(), OrderStatusEnum.AWAITING_SHIPMENT.getStatus())) {
return RocketMQLocalTransactionState.COMMIT;
} else {
// 这里查不到的时候返回 UNKNOWN在于,有可能事务还没有提交,回查就开始了
return RocketMQLocalTransactionState.UNKNOWN;
}
}
}
③、在库存服务的基础设施层,监听消息以执行库存扣减逻辑:
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "dailymart_inventory_group", topic = "TRADE-ORDER", selectorExpression = "ORDER-PAID")
public class InventoryDeductionConsumer extends EnhanceMessageHandler<OrderPaidEvent> implements RocketMQListener<OrderPaidEvent> {
@Resource
private InventoryDomainService inventoryDomainService;
@Override
public void onMessage(OrderPaidEvent orderPaidEvent) {
super.dispatchMessage(orderPaidEvent);
}
@Override
protected void handleMessage(OrderPaidEvent orderPaidEvent) throws Exception {
// 执行库存扣减逻辑
String orderSn = orderPaidEvent.getOrderSn();
inventoryDomainService.deductionInventory(orderSn);
}
}
通过以上步骤,我们完成了RocketMQ事务消息的发送,利用事务消息的特性保证分布式事务的最终一致性。与普通消息相比,事务消息在处理时需要实现 RocketMQLocalTransactionListener 接口,这是事务消息的核心。
介绍完事务消息的使用,接下来我们再来聊聊事务消息的原理。
很容易想到的一个问题就是消息丢失。当保存订单后由于网络问题导致消息丢失,如下图所示:
在不使用RocketMQ的情况下,我们往往会通过 本地消息表 + 补偿重试 的机制来保证消息一定会发送出去。
那RocketMQ是如何解决这个问题的呢?
发送half消息,探测MQ是否正常
在基于RocketMQ的事务消息中,我们不是先执行自身的订单支付逻辑,而是先让订单系统发送一条 half消息 到MQ去。这个half消息本质上是一个订单支付成功的消息,只不过此时库存系统是看不见这个half消息的。然后,我们等待接收这个half消息写入成功的响应通知。
发送half消息的本质其实是为了探测MQ是否仍然正常运行。但问题来了,如上所述,消息会发生丢失,那么half消息丢失怎么办呢?
half消息发送失败
在发送half消息时,由于网络原因或者MQ直接挂了,就会导致half消息发送失败。这个时候订单系统需要执行一系列的回滚操作。在我们的场景中,应该执行退款操作,将钱退还给用户,并告知用户交易失败。
half消息成功,订单系统执行自己的业务逻辑
如果成功收到half消息的正常响应,此时订单系统应该执行自己的业务逻辑。在我们这个场景中,就是修改订单数据库状态,将其修改为待发货状态。这部分逻辑就对应上述代码中的executeLocalTransaction()方法。
订单本地事务执行失败
如果订单系统执行本地事务失败,则需要发送一个rollback请求给MQ,让其删除这条half消息。
订单本地事务执行成功
如果订单系统的本地事务执行正常,此时需要发送一个commit请求给MQ,要求MQ对之前的half消息进行commit操作,这样库存系统就可以消费这条消息了。
订单创建消息处于half状态时,库存系统是看不见它的。必须等到订单系统执行commit请求,消息被commit后,库存系统才能看到并获取这条消息进行后续处理。
然而,还有一个问题:如果订单系统发送half消息成功后却没有收到half消息的响应,该如何处理呢?
在这种情况下,订单系统可能会误以为是发送half消息到MQ失败了。订单系统就会执行回滚流程,退还支付金额,关闭订单。
然而,此时MQ系统中已经存在了一条half消息。这条half消息又该如何处理呢?
在RocketMQ中,有一套补偿流程。RocketMQ会定期扫描处于half状态的消息。如果一直没有对这个消息执行 commit/rollback 操作,超过了一定的时间,RocketMQ就会回调你的订单系统的一个接口,用以确认你本地事务的情况。
当订单系统收到MQ的回查请求时,就需要检索一下数据库,根据订单状态决定执行commit还是rollback。
这部分逻辑就对应上述代码中checkLocalTransaction()方法。
7. rollback 或者 commit 失败怎么办?
通过上述说明,可以看到,RocketMQ是根据rollback或commit操作来决定half消息的状态的。如果业务系统执行了commit操作,则将half消息设置为可见,库存系统可以消费;如果业务系统执行了rollback操作,MQ就会删除half消息。那么问题来了:如果订单系统在执行rollback或commit操作时失败又该如何处理呢?
这时候仍然依赖于前文提到的回查机制。
由于此时MQ中的消息一直处于half状态,超过一定的超时时间后,MQ会发现这个half消息有问题,然后回调你的订单系统的接口。此时订单系统需要根据订单状态来决定执行commit请求还是rollback请求。
以上,就是RocketMQ事务消息的原理。结合文章开头的代码,是不是已经很清晰了呢?
RocketMQ如何支持多事务消息?
订单模块中,用户支付后需要调用库存服务进行库存扣减,而在订单确认收货后需要调用用户服务实现积分赠送。
这两个业务逻辑都需要通过事务消息来保证分布式事务。
在订单模块中创建两个事务消息监听器,分别用于处理库存扣减和积分赠送的事务处理和事务回查。
/**
处理订单支付的事务逻辑
*/
@Component
@Slf4j
public class OrderPaidTransactionListener implements RocketMQLocalTransactionListener{
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o){
//处理订单支付逻辑
}
@Override
public RocketMQLocalTransacionState checkLocalTransaction(Message message){
//检查订单处理逻辑
}
}
/**
处理订单收货的事务监听器
*/
@Component
@Slf4j
public class OrderReceivedTransactionListener implements RocketMQLocalTransactionListener{
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
}
}
完成业务逻辑编写并启动服务时,可能会遇到如下错误:rocketMQTemplate already exists RocketMQLocalTransactionListener
在rocketmq-spring-boot-starter版本低于2.1.0的项目中,可以使用多个 @RocketMQTransactionListener 监听不同的 txProducerGroup 来发送不同类型的事务消息到topic。
从 RocketMQ-Spring 2.1.0 版本开始,注解 @RocketMQTransactionListener 不能设置 txProducerGroup、ak、sk,这些值均需与对应的 RocketMQTemplate 保持一致。
通过阅读源码 RocketMQTransactionConfiguration#registerTransactionListener() 方法,也可得知在RocketMQ如果已经存在了 RocketMQTransactionListener 则会出现上述错误。
解决方案:
为了在保证系统只有一个 RocketMQTransactionListener 的前提下实现多事务消息,可以将 RocketMQLocalTransactionListener 不处理具体业务逻辑,而是将其作为一个分发器使用。
①、定义事务消息处理接口
首先,定义公共的事务消息处理接口,所有事务消息都实现此接口而非 RocketMQ 默认的 RocketMQLocalTransactionListener
public interface TransactionMessageHandler{
//执行本地事务
RocketMQLocalTransactionState executeLocalTransaction(Object payload,Object arg);
//检查本地执行状态
RocketMQLocalTransactionState checkLocalTransaction(Object payload);
}
②、修改事务消息发送工具类,指定消息处理器
public <T extends RemoteDomainEvent> TransactionSendResult sendTransaction(String topic, String tag, T message,
Class<? extends TransactionMessageHandler> transactionMessageListener){
if(transactionMessageListener == null){
throw new IllegalArgumentException("transactionMessageListener must not null");
}
String destination = buildDestination(topic,tag);
Message<T> sendMessage = MessageBuilder.withPayload(message)
.setHeader(RocketMQHeaders.KEYS, message.getKey())
.setHeader(SOURCE_HEADER, message.getSource())
.setHeader(TRANSACTION_MESSAGE_HEADER, transactionMessageListener.getSimpleName())
.build();
TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, sendMessage, null);
log.info("[{}]事务消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message),JSONObject.toJSON(sendResult));
}
③、修改RocketMQ事务监听器
@Slf4j
@RocketQTransactionListener
public class DefalutlRocketMQTransactionListener implements RocketMQLocalTransactionaListener{
private final Map<String, TransactionMessageHandler> transactionMessageHandlerMap;
public DefaultRocketMQTransactionListener(Map<String, TransactionMessageHandler> transactionMessageHandlerMap) {
this.transactionMessageHandlerMap = transactionMessageHandlerMap;
}
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
log.info("消费者收到事务消息[{}]", JSONObject.toJSON(message));
String listenerName = (String) message.getHeaders().get(MessageHeaderConstant.TRANSACTION_MESSAGE_HEADER);
if (null == listenerName) {
throw new RuntimeException("not params transactionMessageListener");
}
RocketMQLocalTransactionState state;
Object payload = message.getPayload();
try {
TransactionMessageHandler messageHandler = transactionMessageHandlerMap.get(listenerName);
if (null == messageHandler) {
throw new RuntimeException("not match condition TransactionMessageHandler");
}
state = messageHandler.executeLocalTransaction(payload, arg);
} catch (Exception e) {
log.error("rocket transaction message executeLocal error:{}", e.getMessage());
return RocketMQLocalTransactionState.ROLLBACK;
}
return state;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
log.info("消费者收到事务回查消息[{}]", JsonUtils.obj2String(message.getHeaders()));
String listenerName = (String) message.getHeaders().get(MessageHeaderConstant.TRANSACTION_MESSAGE_HEADER);
if (null == listenerName) {
throw new RuntimeException("not params transactionMessageListener");
}
RocketMQLocalTransactionState state;
try {
TransactionMessageHandler messageHandler = transactionMessageHandlerMap.get(listenerName);
if (null == messageHandler) {
throw new RuntimeException("not match condition TransactionMessageHandler");
}
state = messageHandler.checkLocalTransaction(message.getPayload());
} catch (Exception e) {
log.error("rocket transaction message executeLocal error:{}", e.getMessage());
return RocketMQLocalTransactionState.ROLLBACK;
}
return state;
}
}
在上述代码中,根据消息头中的TRANSACTION_MESSAGE_HEADER参数选择对应的事务处理器来处理事务消息。
在 DailyMart 中有一个公共组件 dailymart-rocketmq-spring-boot-starter 专门用于 RocketMQ 消息发送监听的封装,因此我们也将事务消息的处理逻辑封装到了此组件中。
④、修改事务消息处理逻辑
所有的事务消息处理逻辑都实现 TransactionMessageHandler 接口,以订单支付的处理逻辑为例:
@Component
@Slf4j
public class OrderPaidTransactionConsumer implements TransactionMessageHandler {
@Resource
private TransactionTemplate transactionTemplate;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Object payload, Object arg) {
final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) payload, OrderPaidEvent.class);
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Object payload) {
final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) payload, OrderPaidEvent.class);
}
}
⑤、修改事务消息发送逻辑,指定事务处理器
TransactionSendResult sendResult = enhanceTemplate.sendTransaction("TRADE-ORDER", "ORDER-PAID", orderPaidEvent, OrderPaidTransactionConsumer.class);
体系结构:若干个Producer,若干个Broker,若干个Consumer,一个Zookeeper集群。
Zookeeper:用来负责集群元数据的管理、控制器的选举。
Producer:将消息发送到Broker
Broker:将受到的消息存储到磁盘中
Consumer:负责从Broker订阅并消费消息
JDK+Zookeeper+kafka 的安装与配置
# jdk安装包的下载,并解压
ll jdk-8u181-linux- x64.tar.gz
tar zxvf jdk-8u181-linux- x64.tar.gz
#解压之后当前/opt目录下生成一个名为jdk1.8.0_181的文件夹
cd jdk1.8.0_181/
pwd
/opt/jdk1.8.0_181
配置JDK环境变量,修改/etc/prifile文件并向其添加如下配置
export JAVA_HOME=/opt/jdk1.8.0_181
export JRE_HOME=$JAVA_HOME/jre
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=./://$JAVA_HOME/LIB:$JRE_HOME/lib