大家好,在当下的分布式服务中,消息队列中间件是一个解决服务之间耦合的利器,今天我们来瞧一瞧开源的RocketMQ消息中间件,他的消费端是如何启动的,以及在使用他的过程中有哪些配置。

Apache RocketMQ在消费者服务中,为我们提供了Push模式也提供了Pull模式
那他们主要有什么区别呢?
Push是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。
BROADCASTING(广播模式):当使用广播消费模式时,RocketMQ 会将每条消息推送给消费组所有的消费者,保证消息至少被每个消费者消费一次。
CLUSTERING(集群模式):当使用集群消费模式时,RocketMQ 认为任意一条消息只需要被消费组内的任意一个消费者处理即可。
我们直接跑一个官方提供的Demo,大家也可以去官网上去下载源码
- public static void main(String[] args) throws InterruptedException, MQClientException {
-
- /*
- * Instantiate with specified consumer group name.
- * 消费者模式有两种 推和拉
- */
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
- consumer.setNamesrvAddr("127.0.0.1:9876");
- /*
- * Specify where to start in case the specific consumer group is a brand-new one.
- * 指定消费从哪里开始
- */
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- /*
- * Subscribe one more topic to consume.
- * 设置监听主题以及过滤条件
- */
- consumer.subscribe("TopicTest999", "*");
- /*
- * Register callback to execute on arrival of messages fetched from brokers.
- * 注册消息监听器
- */
- consumer.registerMessageListener(new MessageListenerConcurrently() {
-
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- //System.out.println("待消费条数:"+ msgs.size());
- LOGGER.info("Receive New Messages : {}", Thread.currentThread().getName());
- /*try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }*/
- LOGGER.info("success");
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
-
- /*
- * Launch the consumer instance.
- */
- consumer.start();
-
- System.out.printf("Consumer Started.%n");
- }
- 复制代码
- consumer.setNamesrvAddr("127.0.0.1:9876");
- 复制代码
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- 复制代码
- // AllocateMessageQueueByConfig
- AllocateMessageQueueByConfig allocateMessageQueueByConfig = new AllocateMessageQueueByConfig();
- MessageQueue messageQueue = new MessageQueue();
- messageQueue.setBrokerName("broker-a");
- messageQueue.setQueueId(2);
- messageQueue.setTopic("TopicTest");
- allocateMessageQueueByConfig.setMessageQueueList(Collections.singletonList(messageQueue));
- consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByConfig);
- 复制代码
- // AllocateMessageQueueByMachineRoom
- AllocateMachineRoomNearby.MachineRoomResolver machineRoomResolver = new AllocateMachineRoomNearby.MachineRoomResolver() {
- // Broker部署
- @Override
- public String brokerDeployIn(MessageQueue messageQueue) {
- System.out.println(messageQueue.getBrokerName().split("-")[0]);
- return messageQueue.getBrokerName().split("-")[0];
- }
- // 消费端部署
- @Override
- public String consumerDeployIn(String clientID) {
- System.out.println(clientID.split("-")[0]);
- return clientID.split("-")[0];
- }
- };
- consumer.setAllocateMessageQueueStrategy(new AllocateMachineRoomNearby(new AllocateMessageQueueAveragely(), machineRoomResolver));
- 复制代码
Tag过滤,用于对某个Topic下的消息进行分类,
消息发送到名称为TopicTest999的Topic中,被各个不同的系统所订阅,我们可以利用Tag来区分
- consumer.subscribe("TopicTest999", "order");
-
- consumer.subscribe("TopicTest999", "user");
- 复制代码
注册消息监听器的目的就是为了接收消息,RocketMQ本身为我们提供了两种模式
- consumer.registerMessageListener(new MessageListenerConcurrently() {
-
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- LOGGER.info("Receive New Messages : {}", Thread.currentThread().getName());
- /*try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }*/
- LOGGER.info("success");
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- 复制代码
- consumer.registerMessageListener(new MessageListenerOrderly() {
- AtomicLong consumeTimes = new AtomicLong(0);
- @Override
- public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
- System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
- this.consumeTimes.incrementAndGet();
- if ((this.consumeTimes.get() % 2) == 0) {
- return ConsumeOrderlyStatus.SUCCESS;
- } else if ((this.consumeTimes.get() % 5) == 0) {
- context.setSuspendCurrentQueueTimeMillis(3000);
- return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
- }
- return ConsumeOrderlyStatus.SUCCESS;
- }
- });
- 复制代码
他们主要的区别继承MessageListener接口的实现
除了这一些重要的参数以外,RocketMQ为我们提供了其他非常丰富的配置,我总结在了下图

需要注意的是,在配置后我们才能去调用启动方法
入口:org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#start
- @Override
- public void start() throws MQClientException {
- // step 1 设置消费者分组
- setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
- this.defaultMQPushConsumerImpl.start();
- if (null != traceDispatcher) {
- try {
- traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
- } catch (MQClientException e) {
- log.warn("trace dispatcher start failed ", e);
- }
- }
- }
- 复制代码
在这里主要做了五件事
- private void checkConfig() throws MQClientException {
- // 检查消费者组,是否满足条件
- Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());
-
- if (null == this.defaultMQPushConsumer.getConsumerGroup()) {
- ...
- }
-
- if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
- ...
- }
-
- if (null == this.defaultMQPushConsumer.getMessageModel()) {
- ...
- }
-
- if (null == this.defaultMQPushConsumer.getConsumeFromWhere()) {
- ...
- }
- // allocateMessageQueueStrategy
- // subscription
- // messageListener
- // consumeThreadMin
- // consumeThreadMax
- // consumeConcurrentlyMaxSpan
- // pullThresholdForQueue
- // pullThresholdForTopic
- // pullThresholdSizeForQueue
- // pullInterval
- // consumeMessageBatchMaxSize
- // pullBatchSize
- 复制代码
主要是进行了参数配置的校验,如果一些参数设置不合理的,在这里就会抛出异常,终止了消费者服务的启动,这里的配置对后面的使用会产生一定的影响,所以我们在配置的时候需要更加的谨慎
- private void copySubscription() throws MQClientException {
- try {
- Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
- if (sub != null) {
- for (final Map.Entry<String, String> entry : sub.entrySet()) {
- final String topic = entry.getKey();
- final String subString = entry.getValue();
- SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);
- // 更新内部订阅关系
- this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
- }
- }
-
- if (null == this.messageListenerInner) {
- this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
- }
- // 默认情况下我们是CLUSTERING模式
- switch (this.defaultMQPushConsumer.getMessageModel()) {
- case BROADCASTING:
- break;
- case CLUSTERING:
- // 创建重试主题
- final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
- SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
- // 将重试主题放入订阅关系容器中
- this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
- break;
- default:
- break;
- }
- } catch (Exception e) {
- throw new MQClientException("subscription exception", e);
- }
- }
- 复制代码
获取配置的订阅关系,因为setSubscription()方法已经被作废,subscription都是为空的,在下面他会去维护一个subscriptionInner
- protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
- new ConcurrentHashMap<String, SubscriptionData>();
- 复制代码
subscriptionInner保存了我们在前置配置的时候插入的订阅关系
- if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
- this.defaultMQPushConsumer.changeInstanceNameToPID();
- }
- public void changeInstanceNameToPID() {
- if (this.instanceName.equals("DEFAULT")) {
- this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
- }
- }
- 复制代码
将实例名称修改为 pid + 时间戳
- // format: "pid@hostname"
- final static String HOST_NAME = ManagementFactory.getRuntimeMXBean().getName();
-
- public static int getPid() {
- try {
- return Integer.parseInt(HOST_NAME.substring(0, HOST_NAME.indexOf('@')));
- } catch (Exception e) {
- return -1;
- }
- }
- 复制代码
- this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
-
- // 一个clientId对应一个实例
- if (null == instance) {
- instance =
- new MQClientInstance(clientConfig.cloneClientConfig(),
- this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
- // 一个clientId对应一个实例
- MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
- if (prev != null) {
- instance = prev;
- log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
- } else {
- log.info("Created new MQClientInstance for clientId:[{}]", clientId);
- }
- }
- 复制代码
在一个JVM中所有消费者、生产者持有同一个MQClientInstance实例,采用了双重检查的方式,来确定有且只有一个MQClientInstance实例。
- this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
- this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
- this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
- this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
- 复制代码
这里注册过滤消息的钩子函数有什么用呢,我们可以思考一下 ?
- private final ArrayList
filterMessageHookList = new ArrayList(); -
- this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
- // 注册过滤消息 钩子函数
- this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
- 复制代码
- switch (this.defaultMQPushConsumer.getMessageModel()) {
- // 广播模式
- case BROADCASTING:
- // 消息进度存储在本地文件
- this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
- break;
- // 集群模式
- case CLUSTERING:
- // 消息进度存储在Broker 服务器上
- this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
- break;
- default:
- break;
-
- this.offsetStore.load();
- }
- 复制代码
目的是初始化消息进度,以及确定消息存储的位置。
顺序消费模式与并发消费模式有一些的不同
根据不同的消息监听器初始化消费消息线程池、扫描过期消息清除线程池
顺序消息模式,不初始化扫描过期消息清除线程池,只初始化消费消息线程池
- if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
- this.consumeOrderly = true;
- this.consumeMessageService =
- new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
- }
- 复制代码
初始化消费消息线程池、扫描过期线程池
- if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
- this.consumeOrderly = false;
- this.consumeMessageService =
- new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
- }
- 复制代码
入口:org.apache.rocketmq.client.impl.consumer.ConsumeMessageService#start
- // 启动消费消息服务
- this.consumeMessageService.start();
- 复制代码
顺序消费模式,启动的是线程池名称为ConsumeMessageScheduledThread_开头的定时线程池,每秒扫描一次
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- ConsumeMessageOrderlyService.this.lockMQPeriodically();
- } catch (Throwable e) {
- log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
- }
- }
- }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
- 复制代码
lockMQPeriodically()这个方法就是给当前客户端所消费的所有队列去borker进行上锁。
- public synchronized void lockMQPeriodically() {
- if (!this.stopped) {
- this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
- }
- }
- 复制代码
启动的线程池是清除过期消息定时线程池,每15分钟扫描一次
- // 开启过期消息清除,定时器
- this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
-
- @Override
- public void run() {
- try {
- cleanExpireMsg();
- } catch (Throwable e) {
- log.error("scheduleAtFixedRate cleanExpireMsg exception", e);
- }
- }
-
- }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
- 复制代码
入口:org.apache.rocketmq.client.impl.factory.MQClientInstance#registerConsumer
- // 将消费者组注入消费者容器
- boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
- 复制代码
消费者容器
- private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
-
- MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
- if (prev != null) {
- log.warn("the consumer group[" + group + "] exist already.");
- return false;
- }
-
- 复制代码
使用consumerTable来保存消费者组与消费者关系
在后续更新主题路由信息,检查客户端与Broker的关系等
入口:org.apache.rocketmq.client.impl.factory.MQClientInstance#start
- mQClientFactory.start();
- 复制代码
这一个环节的内容过多,我们将在另外写一篇文章中,去了解RocketMQ是如何做消息监听的
更新主题发布详情,当发布者变动时
- this.updateTopicSubscribeInfoWhenSubscriptionChanged();
-
- Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
- if (subTable != null) {
- for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
- final String topic = entry.getKey();
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
- }
- }
- 复制代码
检查Broker状态
- this.mQClientFactory.checkClientInBroker();
- 复制代码
向Broker设置心跳检测
- this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
- 复制代码
Consumer跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,然后开始消费消息。
我们主要了解的RocketMQ消息消费的启动流程,还是看得懂的,抓住这一条主线,我们在接下去的过程中,我们可以给自己留下一些问题,比如他是如何做负载均衡的?如何监听消息?在这过程中他使用到了线程池,我们能够去监听线程池的情况等等问题。
我用思维导图整理了今天分享的内容,如果有不对的地方欢迎大家指出~
