更详细请看:Rocketmq并发消费失败重试机制
消费者在消费完成之后, 需要处理消费的结果, 是成功或失败
ConsumeMessageConcurrentlyService#processConsumeResult
/**
* 石臻臻的杂货铺
* vx: shiyanzu001
**/
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
){
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
// 这个意思是,就算你返回了消费成功,但是你还是可以通过设置ackIndex 来标记从哪个索引开始时消费失败了的;从而记录到 消费失败TPS的监控指标中;
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
List<MessageExt> msgBackFailed = new ArrayList<>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
// Maybe message is expired and cleaned, just ignore it.
if (!consumeRequest.getProcessQueue().containsMessage(msg)) {
log.info("Message is not found in its process queue; skip send-back-procedure, topic={}, "
+ "brokerName={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getBrokerName(),
msg.getQueueId(), msg.getQueueOffset());
continue;
}
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
//..... 部分代码省略....
}
上面省略了部分代码, 上面代码是主要的针对发送失败的消息 发送回Broker的情况;
光看代码理解的意思如下
context.setAckIndex()
来设置ACK的索引值的; 比如你本批次消息量10条, 你这里设置为4; 则表示前面5条成功,后面5条失败; 当然了,这里并不会给失败的做重试;看图,再讲几个重点
需要重试的消息, 会优先被发回重试队列中,发送成功之后它会被当做消费成功, 这样做的目的是为了不要让某个消息消费失败就阻碍了整个消费Offset的提交;
比如, 1、2、3、4 四条消息, 第1条消费失败,其他都成功, 那么就因为最小的Offset 1 失败了导致后面的都不能标标记为成功去提交。
所以让1也设置为成功,就不会成为阻塞点,当然要把它发送到重试队列中等待重试。
可提交的消费Offset的值永远是TreeMap中的最小值, 这个TreeMap存放的就是pullMessage获取到的所有待消费Msg。消费成功就删除。
比如, 1、2、3、4 四条消息。1、2 消费成功删除了,那么最小的就是3这个偏移量,那么它之前的都可以提交了;如果2、3、4都消费成功并且删除了,但是1还在,那么可提交的偏移量还是当前最小的值1 ;
用户可自己决定从哪条消息开始重试
上面其实已经说了, 用户可以通过入参ConsumeConcurrentlyContext
来设置ackIndex控制重试的起始索引;
/**
* 石臻臻的杂货铺
* vx: shiyanzu001
**/
consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
System.out.printf(" ----- %s 消费消息: %s 本批次大小: %s ------ ", Thread.currentThread().getName(), msg, msg.size());
for (int i = 0; i < msg.size(); i++) {
System.out.println("第 " + i + " 条消息, MSG: " + msg.get(i));
try{
// 消费逻辑
}catch(Exception e){
// 这条消息失败, 从这条消息以及其后的消息都需要重试
context.setAckIndex(i-1);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
PS: 目前所看为版本(5.1.3), 笔者始终觉得ackIndex这个设置有点问题;
关于这一点,我更倾向于这是一个Bug; 或者设计缺陷
优化建议:
如果该批次的消息消费失败, 则会尝试重试,
重试会尝试一条一条的把Message发回去
DefaultMQPushConsumerImpl#sendMessageBack
请求头 ConsumerSendMsgBackRequestHeader
属性 | 说明 |
---|---|
group | GroupName |
originTopic | Topic |
offset | 该消息的在Log中的偏移量 |
delayLevel | 延迟重试等级;也是重试策略级别;[-1:不重试,直接放到死信队列中、0:Broker控制重试频率、>0 : 客户端控制重试频率 ] ; 如果大于0的情况,重试的时候会延迟对应的延迟等级(延迟消息); 如果是0的情况, 延迟等级为已经重试的次数+3, 意思是每重试一次延迟增加一个等级; 这里说的延迟等级就是18个级别的延迟消息 |
originMsgId | 消息ID |
maxReconsumeTimes | 最大重试次数,并发模式下,默认16;在有序模式下,默认 Integer.MAX_VALUE。 |
bname | BrokerName |
目标地址
Message所在Broker的地址
msg.getStoreHost()
请求方式
同步请求
请求流程
/**
* 石臻臻的杂货铺
* wx: szzdzhp001
**/
private void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, final MessageQueue mq)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
boolean needRetry = true;
try {
// 部分代码忽略....
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Throwable t) {
log.error("Failed to send message back, consumerGroup={}, brokerName={}, mq={}, message={}",
this.defaultMQPushConsumer.getConsumerGroup(), brokerName, mq, msg, t);
if (needRetry) {
//以发送普通消息的形式发送重试消息
sendMessageBackAsNormalMessage(msg);
}
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
}
RequestCode.CONSUMER_SEND_MSG_BACK
3 + msg.getReconsumeTimes()
; 这里发送消息的Producer客户端是Consumer在构建实例的时候创建的内置的Producer客户端,客户端实例名是:CLIENT_INNER_PRODUCER; 这个发送也是同步发送;超时时间是 3000
本地客户端重试是一直重试还是有次数限制?
如果一直失败,并且都是客户端重试,没有次数限制,并且每次都是延迟5秒消费;它会成为消费Offset的阻塞点;后续的消息都有被重新消费的可能性(比如客户端重启)
AbstractSendMessageProcessor#consumerSendMsgBack
brokerPermission
权限不可写则返回无权限错误码retryQueueNums
<=0 返回无权限错误码offset
查找该Message; 如果没有查询到则返回系统异常错误码consumeMessageAfter
方法注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。所以就需要我们消费者端做好消费幂等操作。
顺序消费完毕执行处理结果的流程
ConsumeMessageOrderlyService#processConsumeResult
几个重要点
如果在消费的时候,你返回的是ConsumeConcurrentlyStatus#RECONSUME_LATER, 则表示本次消费失败,需要重试,则本次分配到的Msgs都会被重试;
本次分配的Msgs数量是由consumer.setConsumeMessageBatchMaxSize(1)
决定的;默认就是1;表示一次消费一条消息;
可以。
控制重试次数:
3.4.9 之前是使用subscriptionGroupConfig消费组配置retryMaxTimes
3.4.9 之后是客户端指定(requestHeader.getMaxReconsumeTimes())
这里可以通过Consumer#setMaxReconsumeTimes(最大次数)
来设置值
并发模式默认16次重试的间隔时间:
默认情况下,都是Broker端来控制的重试间隔时间,间隔时间是用延迟消息来实现的,比如Broker端的延迟级别为3+重试次数
; 默认情况下第一次重试对应的等级 3的时间间隔为:10s;
想要自定义重试的间隔时间的话,那么就需要自己在消费的时候来处理了,比如
/**
* 石臻臻的杂货铺
* vx: shiyanzu001
**/
consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
System.out.printf(" ----- %s 消费消息: %s 本批次大小: %s ------ ", Thread.currentThread().getName(), msg, msg.size());
for (int i = 0; i < msg.size(); i++) {
System.out.println("第 " + i + " 条消息, MSG: " + msg.get(i));
if(消费失败){
// 延迟等级5 = 延迟1分钟;
context.setDelayLevelWhenNextConsume(5);
// 或者你也可以根据重试的次数来递增延迟级别
context.setDelayLevelWhenNextConsume(3 + msg.get(i).getReconsumeTimes());
}
// 需要重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
可以
但是目前仅限于返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS的情况。如果是返回的ConsumeConcurrentlyStatus.RECONSUME_LATER,则整批的消息都会重试。
具体,请看下面的代码
consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
System.out.printf(" ----- %s 消费消息: %s 本批次大小: %s ------ ", Thread.currentThread().getName(), msg, msg.size());
for (int i = 0; i < msg.size(); i++) {
System.out.println("第 " + i + " 条消息, MSG: " + msg.get(i));
try{
// 消费逻辑
}catch(Exception e){
// 这条消息失败, 从这条消息以及其后的消息都需要重试
context.setAckIndex(i-1);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
笔者认为,这里应该同样支持 消费失败(RECONSUME_LATER)的情况,来允许用户控制从哪个消息开始才需要重试。
需要重试的消息,会将消息写入到 %RETRY%{consumerGroup} 重试队列中,等延迟时间一到,客户端会重新消费这些消息。
如果超出重试次数,则会放入到死信队列%DLQ%{consumerGroup}中。不会再重试
答: 有影响。
消费重试的机制是,先往Broker发回重试消息,如果你把写权限关闭了,那么这个流程就阻塞了,就会在本地客户端一直重试, 无限次数的延迟5s进行消费。
当然,如果一直本地重试的话,这个Msg就会成功消费的一个阻塞点,所有它后面的Offset就算被消费了,也提交不了。
所以关闭Broker写权限还是需要慎重。
更详细请看:Rocketmq并发消费失败重试机制