上一篇介绍了springboot整合使用了生产者,接下来看看消费者
消息通过生产者发送到某一个Topic,如果需要订阅该Topic并消费里面的消息的话,就要创建对应的消费者进行消费。在介绍消费者的使用方法之前,我们先介绍消费组、消费位点、推和拉等概念。
消息系统的重要作用之一是削峰填谷,但比如在电商大促的场景中,如果下游的消费者消费能力不足的话,大量的瞬时流量进入会后堆积在服务端。此时,消息的端到端延迟(从发送到被消费的时间)就会增加,对服务端而言,一直消费历史数据也会产生冷读。因此需要增加消费能力来解决这个问题,除了去优化消息消费的时间,最简单的方式就是扩容消费者。
但是否随意增加消费者就能提升消费能力? 首先需要了解消费组的概念。在消费者中消费组的有非常重要的作用,如果多个消费者设置了相同的Consumer Group,我们认为这些消费者在同一个消费组内。
在 Apache RocketMQ 有两种消费模式,分别是:
集群消费模式适用于每条消息只需要被处理一次的场景,也就是说整个消费组会Topic收到全量的消息,而消费组内的消费分担消费这些消息,因此可以通过扩缩消费者数量,来提升或降低消费能力,具体示例如下图所示,是最常见的消费方式。
广播消费模式适用于每条消息需要被消费组的每个消费者处理的场景,也就是说消费组内的每个消费者都会收到订阅Topic的全量消息,因此即使扩缩消费者数量也无法提升或降低消费能力,具体示例如下图所示。
集群模式下,同一个消费组内的消费者会分担收到的全量消息,这里的分配策略是怎样的?如果扩容消费者是否一定能提升消费能力?
Apache RocketMQ 提供了多种集群模式下的分配策略,包括平均分配策略、机房优先分配策略、一致性hash分配策略等,可以通过如下代码进行设置相应负载均衡策略
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
默认的分配策略是平均分配,这也是最常见的策略。平均分配策略下消费组内的消费者会按照类似分页的策略均摊消费。
在平均分配的算法下,可以通过增加消费者的数量来提高消费的并行度。比如下图中,通过增加消费者来提高消费能力。
但也不是一味地增加消费者就能提升消费能力的,比如下图中Topic的总队列数小于消费者的数量时,消费者将分配不到队列,即使消费者再多也无法提升消费能力。
如上图所示,在Apache RocketMQ中每个队列都会记录自己的最小位点、最大位点。针对于消费组,还有消费位点的概念,在集群模式下,消费位点是由客户端提给交服务端保存的,在广播模式下,消费位点是由客户端自己保存的。一般情况下消费位点正常更新,不会出现消息重复,但如果消费者发生崩溃或有新的消费者加入群组,就会触发重平衡,重平衡完成后,每个消费者可能会分配到新的队列,而不是之前处理的队列。为了能继续之前的工作,消费者需要读取每个队列最后一次的提交的消费位点,然后从消费位点处继续拉取消息。但在实际执行过程中,由于客户端提交给服务端的消费位点并不是实时的,所以重平衡就可能会导致消息少量重复。
MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull。
Apache RocketMQ既提供了Push模式也提供了Pull模式。
顺序消息消费代码演示:
import com.formiss.common.Constant;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
*/
public class ConsumerInOrder {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_seq_group");
consumer.setNamesrvAddr(Constant.ROCKETMQ_SERVER_HOST_PORT);
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagB || TagC || TagD || TagE");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + ", queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
public static void main(String[] args) throws InterruptedException, MQClientException {
// 初始化consumer,并设置consumer group name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group1");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
//订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
consumer.subscribe("TopicTest", "*");
//注册回调接口来处理从Broker中收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
System.out.println("----------------====================------------------");
for (MessageExt messageExt : msgs) {
System.err.println("消费消息: " + new String(messageExt.getBody()));//输出消息内容
}
// 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动Consumer
consumer.start();
System.out.printf("Consumer Started.%n");
}
相同,只是没用重写的:
@PostConstruct
public void consumer() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(namesrvAddr);
try {
consumer.subscribe("TopicTest", "TagC");
consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
try {
System.out.println("====" + list.size());
for (MessageExt messageExt : list) {
System.err.println("消费消息: " + new String(messageExt.getBody()));//输出消息内容
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
});
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
使用监听:
/**
* 消息消费方
* 1.如果两个消费者group和topic都一样,则二者轮循接收消息
* 2.如果两个消费者topic一样,而group不一样,则消息变成广播机制
* RocketMQListener<>泛型必须和接收的消息类型相同
*/
@Component
@RocketMQMessageListener(
topic = "convertAndSend-topic", // 消息的发送者使用同一个topic
consumerGroup = "shen-producer-group" // 不用和生产者group相同 ( 在RocketMQ中消费者和发送者组没有关系 )
// ,messageModel = MessageModel.CLUSTERING // 不用和生产者group相同 ( 在RocketMQ中消费者和发送者组没有关系 )
)
public class ConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println(message);
}
}
Push消费用的是DefaultMQPushConsumer
,Pull消费用的是DefaultMQPullConsumer
用法都基本一样,只是4.6.0版本的Pull消费推出了Lite Pull Consumer,使用DefaultLitePullConsumer
,相比于原始的Pull Consumer更加简单易用,它提供了Subscribe和Assign两种模式,Subscribe模式,Subscribe模式示例如下
public class LitePullConsumerSubscribe {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.setPullBatchSize(20);
litePullConsumer.start();
try {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s%n", messageExts);
}
} finally {
litePullConsumer.shutdown();
}
}
}
首先还是初始化DefaultLitePullConsumer并设置ConsumerGroupName,调用subscribe方法订阅topic并启动。与Push Consumer不同的是,LitePullConsumer拉取消息调用的是轮询poll接口,如果能拉取到消息则返回对应的消息列表,否则返回null。通过setPullBatchSize可以设置每一次拉取的最大消息数量,此外如果不额外设置,LitePullConsumer默认是自动提交位点。在subscribe模式下,同一个消费组下的多个LitePullConsumer会负载均衡消费,与PushConsumer一致。
Assign模式的示例
public class LitePullConsumerAssign {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
litePullConsumer.setAutoCommit(false);
litePullConsumer.start();
Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest");
List<MessageQueue> list = new ArrayList<>(mqSet);
List<MessageQueue> assignList = new ArrayList<>();
for (int i = 0; i < list.size() / 2; i++) {
assignList.add(list.get(i));
}
litePullConsumer.assign(assignList);
litePullConsumer.seek(assignList.get(0), 10);
try {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s %n", messageExts);
litePullConsumer.commitSync();
}
} finally {
litePullConsumer.shutdown();
}
}
}
Assign模式一开始仍然是初始化DefaultLitePullConsumer,这里我们采用手动提交位点的方式,因此设置AutoCommit为false,然后启动consumer。与Subscribe模式不同的是,Assign模式下没有自动的负载均衡机制,需要用户自行指定需要拉取的队列
,因此在例子中,先用fetchMessageQueues获取了Topic下的队列,再取前面的一半队列进行拉取,示例中还调用了seek方法,将第一个队列拉取的位点设置从10开始。紧接着进入循环不停地调用poll方法拉取消息,拉取到消息后调用commitSync方法手动提交位点。
1、可以使用tag
// subExpression 订阅表达式:可以是"tag1 || tag2 || tag3"
// 空或者* 表示订阅全部的标签
consumer.subscribe(topic, subExpression);
消费者将接收指定标签的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
常量支持类型为:
// 设置一些属性(生产者)
msg.putUserProperty("a", 5);
// (消费者)只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));
// MessageSelector.bySql("(num between 0 and 5 ) and (info = 'aaa')")
// MessageSelector.bySql("(a is not null and a between 0 and 3)")