
Push方式是broker接收到消息后,主动把消息推送到Consumer
broker的工作量,影响broker性能. 2) Consumer的处理能力各不相同,Consumer的状态不受broker控制,如果Consumer不能及时处理broker推送过来的消息,会造成各种问题,比如缓冲区溢出、内存溢出等Pull方式是Consumer循环地从broker拉取消息,拉取多少消息,什么时候拉取都是由Consumer决定,处理完毕再继续拉取,这样可以达到限流的目的,不会出现处理不过来的情况。
Consumer自己控制流量broker的消息不能及时处理长轮询:既有Pull的优点,又能达到实时性的目的。长轮询的局限性是在HOLD住Consumer请求的时候需要占用资源,它适合用在消息队列这种客户端连接数可控的场景中。
consumerGroup:用于把多个Consumer组织在一起,提高处理能力,consumerGroup需要和消息模式(MessageModel)配合使用
RocketMQ支持两种消息模式:Clustering和Broadcasting
Clustering(集群)模式下,同一个ConsumerGroup(消费组)(GroupName相同)里的Consumer负载均衡消费部分信息Broadcasting(广播)模式下,同一个ConsumerGroup(消费组)里的每个Consumer都能消费到所订阅Topic的全部信息,即广播模式TAG:通过指定消息的Tag进行消息过滤(在发送消息时设置Tag)
// 订阅testTopic下TagA的消息
consumer.subscribe(topic, "TagA");
// 订阅testTopic下TagA或TagC或TagD的消息
consumer.subscribe("testTopic", "TagA || TagC || TagD");
// 订阅testTopic下所有消息
consumer.subscribe("testTopic", "*");
DefaultMQPushConsumer:系统收到消息后自动调用监听器处理,自动保存offset,加入新的DefaultMQPushConsumer会自动做负载均衡。一个应用创建一个DefaultMQPushConsumer,由应用来维护此对象,可以设置为全局对象或者单例。
并行消费使用MessageListenerConcurrently适用于不考虑消息顺序。PushConsumer为了保证消息肯定消费成功,消费方需要明确表示消费成功,Broker才会认为消息消费成功。
ConsumeConcurrentlyStatus.CONSUME_SUCCESS,Broker才会认为这批消息(默认是1条)是消费完成的。ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ会把这批消息重发回Broker(topic不是原topic而是这个消费组的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。push模式并发消费案例
@Test
public void testPushConsumer() {
String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9877";
String consumerGroup = "ConsumerGroupName3";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
try {
//从上一次消费位置开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setInstanceName("Consumber");
//设置集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);
//批量接收20条
consumer.setConsumeMessageBatchMaxSize(20);
//默认64
consumer.setConsumeThreadMax(4);
//默认20
consumer.setConsumeThreadMin(1);
consumer.setVipChannelEnabled(false);
String topic = "testTopic";
String tag = "TagA";
consumer.subscribe(topic, tag);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
try {
String topic = messageExt.getTopic();
String msg = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
String tags = messageExt.getTags();
logger.info("threadName:{},topic:{},tag:{},msg:{}", Thread.currentThread().getName(), topic, tags, msg);
} catch (Exception e) {
logger.error(e.getMessage(), e);
// 重试消费,重发到Broker的RETRY TOPIC。 10s后Broker默认重新投递
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
//表示此批消息消费完成
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Consumer对象在使用之前必须要调用start初始化,初始化一次即可
consumer.start();
LockSupport.park();
} catch (MQClientException e) {
logger.error(e.getMessage(), e);
} finally {
consumer.shutdown();
}
}
push顺序消费案例:MessageListenerOrderly适用于顺序消费,同一队列的消息同一时刻只能一个线程消费,可保证消息在同一队列严格有序消费。所以顺序消息消费失败,不会再继续推进,直到失败的消息消费成功为止
@Test
public void testMsgOrder() {
String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9877";
String consumerGroup = "ConsumerGroupNameOrder";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
try {
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setInstanceName("ConsumberOrder");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setConsumeMessageBatchMaxSize(20);
consumer.setVipChannelEnabled(false);
consumer.setConsumeThreadMax(4);
consumer.setConsumeThreadMin(1);
consumer.subscribe("testTopic", "TagA");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
//设置自动提交,默认true
context.setAutoCommit(true);
for (MessageExt messageExt : list) {
String msgStr = null;
try {
msgStr = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
logger.info("threadName:{},topic:{},tag:{},msg:{}", Thread.currentThread().getName(), messageExt.getTopic(), messageExt.getTags(), msgStr);
} catch (Exception e) {
logger.error(e.getMessage(), e);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
LockSupport.park();
} catch (MQClientException e) {
logger.error(e.getMessage(), e);
} finally {
consumer.shutdown();
}
}
Pull方式:Consumer自己控制消费
@Test
public void testPullConsumer() {
String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9877";
final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("ConsumerGroupName11");
DefaultMQPullConsumer defaultMQPullConsumer = scheduleService.getDefaultMQPullConsumer();
try {
defaultMQPullConsumer.setNamesrvAddr(namesrvAddr);
defaultMQPullConsumer.setVipChannelEnabled(false);
scheduleService.setMessageModel(MessageModel.CLUSTERING);
String topic = "testTopic";
scheduleService.registerPullTaskCallback(topic, new PullTaskCallback() {
@Override
public void doPullTask(MessageQueue mq, PullTaskContext context) {
MQPullConsumer consumer = context.getPullConsumer();
logger.info("brokerName:{},topic:{},queueId:{}", mq.getBrokerName(), mq.getTopic(), mq.getQueueId());
try {
// 获取消费偏移量,其中fromStore为true表示从存储端(即Broker端)获取消费进度;
// 若fromStore为false表示从本地内存获取消费进度
long offset = consumer.fetchConsumeOffset(mq, true);
if (offset < 0)
offset = 0;
PullResult pullResult = consumer.pull(mq, "TagA", offset, 32);
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> list = pullResult.getMsgFoundList();
for (MessageExt messageExt : list) {
logger.info("msg:{}", new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
}
break;
case NO_MATCHED_MSG:
logger.info("没有匹配的消息");
break;
case NO_NEW_MSG:
logger.info("没有新消息");
break;
case OFFSET_ILLEGAL:
logger.info("偏移量非法");
break;
default:
break;
}
// 存储Offset,客户端每隔5s会定时刷新到Broker
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
// 设置再过1000ms后重新拉取
context.setPullNextDelayTimeMillis(1000);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
scheduleService.start();
LockSupport.park();
} catch (MQClientException e) {
logger.error(e.getMessage(), e);
} finally {
scheduleService.shutdown();
}
}