应用程序在使用RocketMQ进行消息消费时必须支持幂等消费,即同一个消息被消费多次和消费一次的结果一样,这一点在使用RoketMQ或者分析RocketMQ源代码之前再怎么强调也不为过。
“至少一次送达”的消息交付策略,和消息重复消费是一对共生的因果关系,要做到不丢消息就无法避免消息重复消费,原因很简单,试想一下这样的场景:客户端接收到消息并完成了消费,在消费确认过程中发生了通讯错误,从Broker的角度是无法得知客户端是在接收消息过程中出错还是在消费确认过程中出错,为了确保不丢消息,重发消息是唯一的选择。
有了消息幂等消费约定的基础,RocketMQ就能够有针对性地采取一些性能优化措施,例如:并行消费、消费进度同步机制等,这也是RocketMQ性能优异的原因之一。
从不同的维度划分,Consumer支持以下消费模式:
使用相同 Group ID 的订阅者属于同一个集群,同一个集群下的订阅者消费逻辑必须完全一致(包括 Tag 的使用),这些订阅者在逻辑上可以认为是一个消费节点
注意事项
广播消费指的是:一条消息被多个consumer消费,即使这些consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer都消费一次,广播消费中ConsumerGroup概念可以认为在消息划分方面无意义。
注意事项
如果业务需要使用广播模式,也可以创建多个 Group ID,用于订阅同一个 Topic。

注意事项
RocketMQ消息消费本质上是基于的拉(pull)模式,consumer主动向消息服务器broker拉取消息。
我们上面使用的消费者都是PUSH模式,也是最常用的消费模式
由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常。
实现方式,代码上使用 DefaultMQPushConsumer
consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送(push)过来的。主要用的也是这种方式。
RocketMQ的PUSH模式是由PULL模式来实现的
由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息;采用Pull方式,如何设置Pull消息的频率需要重点去考虑,举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(概括起来说就是“消息延迟与忙等待”)。如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能。
注意:RocketMQ 4.6.0版本后将弃用DefaultMQPullConsumer
DefaultMQPullConsumer方式需要手动管理偏移量,官方已经被废弃,将在2022年进行删除

DefaultLitePullConsumer
该类是官方推荐使用的手动拉取的实现类,偏移量提交由RocketMQ管理,不需要手动管理
consumer的每个实例是靠队列分配来决定如何消费消息的,那么消费进度具体是如何管理的,又是如何保证消息成功消费的?(RocketMQ有保证消息肯定消费成功的特性,失败则重试)
为了保证数据不被丢失,RocketMQ支持消息确认机制,即ack。发送者为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。
业务实现消费回调的时候,当且仅当此回调函数返回
ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息(默认是1条)是消费完成的。
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context) { - System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
- execute();//执行真正消费
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- })
如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回
ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批消息消费失败了。
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context) { - System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
- execute();//执行真正消费
- return ConsumeConcurrentlyStatus.RECONSUME_LATER
- }
- })
为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消息重发回Broker(topic不是原topic而是这个消费组的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup,而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列,应用可以监控死信队列来做人工干预。
对于顺序消息,当消费者消费消息失败后,消息队列RocketMQ版会自动不断地进行消息重试(每次间隔时间为1秒),这时,应用会出现消息消费被阻塞的情况,因此,建议您使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
消息队列RocketMQ版默认允许每条消息最多重试16次,每次重试的间隔时间如下。
| 第几次重试 | 与上次重试的间隔时间 | 第几次重试 | 与上次重试的间隔时间 |
|---|---|---|---|
| 1 | 10秒 | 9 | 7分钟 |
| 2 | 30秒 | 10 | 8分钟 |
| 3 | 1分钟 | 11 | 9分钟 |
| 4 | 2分钟 | 12 | 10分钟 |
| 5 | 3分钟 | 13 | 20分钟 |
| 6 | 4分钟 | 14 | 30分钟 |
| 7 | 5分钟 | 15 | 1小时 |
| 8 | 6分钟 | 16 | 2小时 |
如果消息重试16次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的4小时46分钟之内进行16次重试,超过这个时间范围消息将不再重试投递。
消费者和生产者的重试还是有区别的,主要有两点
1S,5S,10S,30S,1M,2M····2H进行重试。注意:Product在异步情况重试失效,而对于Consumer在广播情况下重试失效。
需要重试
消费失败后,重试配置方式,集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):
无需重试
集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回Action.CommitMessage,此后这条消息将不会再重试。
- //注册消息监听器
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- public ConsumeConcurrentlyStatus consumeMessage(List
list, ConsumeConcurrentlyContext context) { - //消息处理逻辑抛出异常,消息将重试。
- try {
- doConsumeMessage(list);
- }catch (Exception e){
- //捕获消费逻辑中的所有异常,并返回Action.CommitMessage;
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- //业务方正常消费
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
在正常情况下无法被消费(超过最大重试次数)的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列就称为死信队列(Dead-Letter Queue)
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次 数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。 在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
redis队列中存放车辆信息,调度系统从队列中获取车辆信息,打车完成后再将车辆信息放回队列中
从list左侧弹出一个车辆
- /**
- * 从Redis List列表中拿取一个车辆ID
- * 如果没有获取到延时10S
- *
- * @return
- */
- public String takeVehicle() {
- //从Redis List列表中拿取一个车辆ID
- return redisTemplate.opsForList().leftPop(DispatchConstant.VEHICLE_QUEUE, 1, TimeUnit.SECONDS);
- }
检查车辆状态,并从右侧压入车辆
- /**
- * 设置车辆状态为Ready
- *
- * @param vehicleId
- */
- public void readyDispatch(String vehicleId) {
- //检查车辆状态
- DispatchConstant.DispatchType vehicleDispatchType = taxiVehicleStatus(vehicleId);
- //如果车辆时运行状态
- if (vehicleDispatchType.isRunning() || vehicleDispatchType.isReady()) {
- redisTemplate.opsForValue().set(DispatchConstant.VEHICLE_STATUS_PREFIX + vehicleId, DispatchConstant.DispatchType.READY.toString());
- //从右侧压入车辆
- redisTemplate.opsForList().rightPush(DispatchConstant.VEHICLE_QUEUE, vehicleId);
- }
- }