public class RocketMQConsumerDemo {
public static void main(String[] args) throws Exception{
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group-java");
//设置 broker 地址
consumer.setNamesrvAddr("192.168.66.66:9876");
consumer.subscribe("topicA", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
System.out.println("接受消息:队列 ID==>"+messageExt.getQueueId() + " 消息 Id:" + messageExt.getMsgId());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} });
consumer.start();
//System.in.read();
}
}
注意:消费者订阅的主题必须已经存在,否则无法订阅消息。
同一个消费者组中的不同消费者,是否可以同时消费多个主题?(试验)
答案:同一个消费者组中多个消费者不能同时消费多个主题topic,如果出现此现象,会导致部分消息无法消费,如上图。
是什么原因导致了这种现象呢?
答案:负载均衡算法问题导致了这一现象。(平均分配算法,环形平均分配算法…)
不同的消费者组,是否可以消费相同的 topic?
可以的
注意:默认消费模式就是 一个消费者组消费一个主题(使用负载均衡算法进行分配)
由demo中 consumer.start(); 方法进入:
/**
 * Starts this consumer by delegating to the wrapped {@code defaultMQPushConsumerImpl}.
 *
 * @throws MQClientException propagated from the implementation's {@code start()}
 */
public void start() throws MQClientException {
this.defaultMQPushConsumerImpl.start();
}
defaultMQPushConsumerImpl.start()方法实现:
/**
 * Starts the push consumer implementation.
 *
 * Only the {@code CREATE_JUST} state performs the start-up sequence; the state is set to
 * {@code START_FAILED} first and only becomes {@code RUNNING} once every step succeeded.
 * Calling this in {@code RUNNING}, {@code START_FAILED} or {@code SHUTDOWN_ALREADY} throws.
 *
 * NOTE(review): this excerpt ends right after the state switch; the remainder of the
 * method (including its closing brace) is not shown here.
 *
 * @throws MQClientException if the consumer group is already registered on the client
 *         instance, or if the service state does not allow starting
 */
public void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
// Pessimistically mark the start as failed; overwritten with RUNNING at the end.
this.serviceState = ServiceState.START_FAILED;
// Validate the configuration
this.checkConfig();
// Copy the subscription data (used later by rebalancing)
this.copySubscription();
// In clustering mode, change instanceName to the PID (a numeric string such as 10072)
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
// Obtain the MQClient instance; per the original note the clientId looks like ip@instanceName, e.g. 192.168.0.1@10072
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// Configure the rebalancer
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
// Clustering mode is the default: each message is consumed by one consumer of the group.
// Broadcasting mode can be configured instead: each message is consumed once by every consumer of the group.
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
// Default strategy is AllocateMessageQueueAveragely (even split)
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
// Wrapper around the pull API
this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
// Create the offset store: in clustering mode progress is kept on the broker because the
// consumers of a group share it; in broadcasting mode progress is kept on the consumer side.
// A user-supplied offset store takes precedence over both.
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
}
// In broadcasting mode this loads the local offset file
this.offsetStore.load();
// Create the consume service matching the listener type (orderly vs. concurrent)
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly)this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently)this.getMessageListenerInner());
}
this.consumeMessageService.start();
// Register this consumer on the MQClient instance under its group name
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
// Registration refused: roll back the state and stop the consume service before failing.
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
// Mark the service as running
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//
+ this.serviceState//
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
mQClientFactory.start()实现:
/**
 * Starts the client instance while holding this object's monitor.
 *
 * Only the {@code CREATE_JUST} state runs the start-up sequence; {@code RUNNING} and
 * {@code SHUTDOWN_ALREADY} are no-ops, and {@code START_FAILED} throws.
 *
 * @throws MQClientException if a previous start attempt left the instance in {@code START_FAILED}
 */
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
// Pessimistically mark the start as failed; overwritten with RUNNING at the end.
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
// If no address is configured, it can be fetched from elsewhere via an HTTP request
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start the scheduled tasks
this.startScheduledTask();
// Start pull service (pulls messages for consumption)
this.pullMessageService.start();
// Start Consumer rebalance service (load balancing)
this.rebalanceService.start();
// Start the internal default producer, used by consumers for SendMessageBack; per the
// original note, start(false) does not invoke MQClientInstance.start(), so this method is not re-entered
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
/**
 * Starts the thread held in the {@code thread} field.
 */
public void start() {
this.thread.start();
}
/**
 * Thread service that performs consumer rebalancing (load balancing).
 */
@SuppressWarnings("SpellCheckingInspection")
private final RebalanceService rebalanceService;
重点:每隔 20 秒重新进行一次负载均衡
// Excerpt from the rebalance service thread's loop: asks the client instance to rebalance.
this.mqClientFactory.doRebalance();
/**
 * Rebalance Service.
 * Service thread that repeatedly waits for a fixed interval and then asks the client
 * instance to rebalance its consumers, until the service is stopped.
 */
public class RebalanceService extends ServiceThread {
    /**
     * Wait time between two rebalance rounds, in milliseconds. Read from the system
     * property {@code rocketmq.client.rebalance.waitInterval}; defaults to 20000.
     */
    private static long waitInterval =
        Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));

    private final Logger log = ClientLogger.getLog();

    /**
     * Client instance whose consumers are rebalanced.
     */
    private final MQClientInstance mqClientFactory;

    public RebalanceService(MQClientInstance mqClientFactory) {
        this.mqClientFactory = mqClientFactory;
    }

    @Override
    public String getServiceName() {
        return RebalanceService.class.getSimpleName();
    }

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (true) {
            if (this.isStopped()) {
                break;
            }
            // Wait for the configured interval, then trigger one rebalance round.
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }

        log.info(this.getServiceName() + " service end");
    }
}
查看mqClientFactory.doRebalance()方法
/**
 * Triggers a rebalance on every consumer registered in {@code consumerTable}.
 * A failure in one consumer is logged and does not prevent the others from rebalancing.
 */
public void doRebalance() {
    for (MQConsumerInner consumer : this.consumerTable.values()) {
        if (consumer == null) {
            continue;
        }
        try {
            consumer.doRebalance();
        } catch (Throwable e) {
            log.error("doRebalance exception", e);
        }
    }
}
查看 impl.doRebalance() 方法实现(即消费者实现类中的 doRebalance):
/**
 * Delegates to the rebalance implementation, passing {@code false} for its
 * {@code isOrder} argument. Does nothing when no rebalance implementation is set.
 */
@Override
public void doRebalance() {
    if (this.rebalanceImpl == null) {
        return;
    }
    this.rebalanceImpl.doRebalance(false);
}
/**
 * Rebalances the message queues of every subscribed topic, then drops the queues
 * that belong to topics which are no longer subscribed.
 *
 * @param isOrder whether messages are consumed in order
 */
public void doRebalance(final boolean isOrder) {
    final Map<String, SubscriptionData> subscriptions = this.getSubscriptionInner();
    if (subscriptions != null) {
        // Rebalance topic by topic; one failing topic must not block the others.
        for (final String topic : subscriptions.keySet()) {
            try {
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                // Failures on retry topics are not logged.
                final boolean retryTopic = topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
                if (!retryTopic) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }

    // Remove the message queues of topics that are not subscribed.
    this.truncateMessageQueueNotMyTopic();
}
查看 this.rebalanceByTopic(topic, isOrder)实现:
/**
 * Rebalances the queues of a single topic for this consumer.
 *
 * In broadcasting mode every queue of the topic is taken; in clustering mode the
 * configured allocation strategy decides which queues belong to this client.
 *
 * @param topic   topic to rebalance
 * @param isOrder whether messages are consumed in order
 */
private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        case BROADCASTING: {
            // Broadcasting: every consumer of the group consumes every message.
            final Set<MessageQueue> allQueues = this.topicSubscribeInfoTable.get(topic);
            if (allQueues == null) {
                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                break;
            }
            if (this.updateProcessQueueTableInRebalance(topic, allQueues, isOrder)) {
                this.messageQueueChanged(topic, allQueues, allQueues);
                log.info("messageQueueChanged {} {} {} {}",
                    consumerGroup,
                    topic,
                    allQueues,
                    allQueues);
            }
            break;
        }
        case CLUSTERING: {
            // Clustering (the default): each message is consumed by one consumer of the group.
            // Look up the topic's queues and the ids of the group's consumers.
            final Set<MessageQueue> topicQueues = this.topicSubscribeInfoTable.get(topic);
            final List<String> consumerIds = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

            if (topicQueues == null && !topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
            }
            if (consumerIds == null) {
                log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
            }
            if (topicQueues == null || consumerIds == null) {
                break;
            }

            // Sort queues and consumer ids: allocation happens on each client, so all
            // clients must see the same ordering to reach a consistent result.
            final List<MessageQueue> sortedQueues = new ArrayList<>(topicQueues);
            Collections.sort(sortedQueues);
            Collections.sort(consumerIds);

            // Let the configured strategy choose this client's queues.
            final AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
            List<MessageQueue> allocated;
            try {
                allocated = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), sortedQueues, consumerIds);
            } catch (Throwable e) {
                log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), e);
                return;
            }

            final Set<MessageQueue> allocatedSet = new HashSet<>();
            if (allocated != null) {
                allocatedSet.addAll(allocated);
            }

            // Apply the allocation to the process-queue table.
            final boolean changed = this.updateProcessQueueTableInRebalance(topic, allocatedSet, isOrder);
            if (changed) {
                log.info(
                    "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, "
                        + "rebalanceResultSize={}, rebalanceResultSet={}",
                    strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), topicQueues.size(), consumerIds.size(),
                    allocatedSet.size(), allocatedSet);
                this.messageQueueChanged(topic, topicQueues, allocatedSet);
            }
            break;
        }
        default:
            break;
    }
}
// Excerpt from rebalanceByTopic (clustering branch): the strategy picks this client's queues.
allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
/**
 * Average Hashing queue algorithm.
 * Queue allocation strategy: even split.
 *
 * When the number of queues is not divisible by the number of consumers, the remainder
 * is handed out one queue at a time to the first consumers in order.
 * Example with 5 queues and 3 consumers:
 * - consumer 0: [0, 1] 2 queues
 * - consumer 1: [2, 3] 2 queues
 * - consumer 2: [4]    1 queue
 *
 * The {@code (mod > 0 && index < mod)} checks below handle that remainder case.
 */
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
    private final Logger log = ClientLogger.getLog();

    /**
     * Computes the queues assigned to {@code currentCID}.
     *
     * @param consumerGroup consumer group name (used for logging only)
     * @param currentCID    id of the consumer to allocate for
     * @param mqAll         all queues of the topic; callers are expected to pass them in a
     *                      consistent order on every client
     * @param cidAll        ids of all consumers in the group, in the same consistent order
     * @return the queues for {@code currentCID}; empty when {@code currentCID} is not in
     *         {@code cidAll} or when there are fewer queues than consumers and this
     *         consumer gets none
     * @throws IllegalArgumentException if {@code currentCID} is null/empty or either list
     *         is null/empty
     */
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
        // Validate the arguments
        if (currentCID == null || currentCID.length() < 1) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        List<MessageQueue> result = new ArrayList<>();
        if (!cidAll.contains(currentCID)) {
            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                consumerGroup,
                currentCID,
                cidAll);
            return result;
        }

        // Even split
        int index = cidAll.indexOf(currentCID); // position of this consumer
        int mod = mqAll.size() % cidAll.size(); // remainder: queues that cannot be split evenly
        // queues <= consumers                      -> 1 queue per consumer (while queues last)
        // remainder > 0 && index <  remainder      -> mqAll / cidAll + 1 queues
        // otherwise                                -> mqAll / cidAll queues
        int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
        // Consumers [0, mod) each take one extra queue; later consumers skip over those mod extras.
        int startIndex = (mod > 0 && index < mod) ? index * averageSize
            : index * averageSize + mod;
        // Math.min guards the queues <= consumers case, where some consumers get no queue
        // (range becomes <= 0 and the loop below does not run).
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

    /**
     * Returns the strategy name. The sibling strategies override this method and the
     * rebalance code logs {@code strategy.getName()}, so this class must provide it too.
     */
    @Override
    public String getName() {
        return "AVG";
    }
}
/**
 * Cycle average Hashing queue algorithm.
 * Queue allocation strategy: round-robin. The consumer at position {@code k} receives
 * the queues at positions {@code k, k + n, k + 2n, ...} where {@code n} is the number
 * of consumers.
 */
public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
    private final Logger log = ClientLogger.getLog();

    /**
     * Computes the queues assigned to {@code currentCID}.
     *
     * @return the allocated queues; empty when {@code currentCID} is not part of {@code cidAll}
     * @throws IllegalArgumentException if {@code currentCID} is null/empty or either list
     *         is null/empty
     */
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        // Validate the arguments
        if (currentCID == null || currentCID.isEmpty()) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        final List<MessageQueue> assigned = new ArrayList<MessageQueue>();
        final int position = cidAll.indexOf(currentCID);
        if (position < 0) {
            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                consumerGroup,
                currentCID,
                cidAll);
            return assigned;
        }

        // Round-robin: step through the queues with a stride equal to the consumer count.
        final int stride = cidAll.size();
        final int queueCount = mqAll.size();
        for (int i = position; i < queueCount; i += stride) {
            assigned.add(mqAll.get(i));
        }
        return assigned;
    }

    @Override
    public String getName() {
        return "AVG_BY_CIRCLE";
    }
}
/**
 * Computer room Hashing queue algorithm, such as Alipay logic room.
 * Allocation strategy: first keep only the queues whose broker belongs to one of the
 * configured machine rooms, then split the remaining queues evenly among the consumers.
 * TODO open question from the original notes: how Broker and Consumer are configured for this.
 */
public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
    /**
     * Machine-room identifiers this consumer may consume from. A queue is eligible when
     * its broker name has the form {@code <id>@<rest>} and {@code <id>} is in this set.
     * Must be set before {@link #allocate} is called.
     */
    private Set<String> consumeridcs;

    /**
     * Computes the queues assigned to {@code currentCID} among the eligible queues.
     *
     * @param consumerGroup consumer group name (unused here)
     * @param currentCID    id of the consumer to allocate for
     * @param mqAll         all queues of the topic
     * @param cidAll        ids of all consumers in the group
     * @return the allocated queues; empty when {@code currentCID} is not in {@code cidAll}
     */
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        // Argument check: a consumer that is not part of the group gets nothing.
        List<MessageQueue> result = new ArrayList<MessageQueue>();
        int currentIndex = cidAll.indexOf(currentCID);
        if (currentIndex < 0) {
            return result;
        }

        // Keep only the queues whose broker belongs to a configured machine room ('consumeridcs').
        List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
        for (MessageQueue mq : mqAll) {
            String[] temp = mq.getBrokerName().split("@");
            if (temp.length == 2 && consumeridcs.contains(temp[0])) {
                premqAll.add(mq);
            }
        }

        // Even split of the eligible queues.
        int mod = premqAll.size() / cidAll.size();
        int rem = premqAll.size() % cidAll.size();
        int startIndex = mod * currentIndex;
        int endIndex = startIndex + mod;
        for (int i = startIndex; i < endIndex; i++) {
            // Indices are computed against the filtered list, so read from premqAll.
            // Reading from mqAll here would hand out queues outside the configured rooms.
            result.add(premqAll.get(i));
        }
        // The first 'rem' consumers each take one of the leftover queues at the tail.
        if (rem > currentIndex) {
            result.add(premqAll.get(currentIndex + mod * cidAll.size()));
        }
        return result;
    }

    @Override
    public String getName() {
        return "MACHINE_ROOM";
    }

    public Set<String> getConsumeridcs() {
        return consumeridcs;
    }

    public void setConsumeridcs(Set<String> consumeridcs) {
        this.consumeridcs = consumeridcs;
    }
}
注意:消费者也必须和 NameServer 保持长连接,consumer 每隔 30s 从 NameServer 获取 topic 路由信息(队列与 broker 的对应关系),从而根据负载均衡策略分配队列进行消费。