MQ除了具有解除业务耦合的作用之外,还可以消峰填谷,可以将流量拉平,避免短期密集请求压垮系统。
需要安装RocketMQ,安装教程请见springboot基础(48): RocketMQ的安装
public static void startConsumer() throws MQClientException {
log.info("----开始批量消费消息-----");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumergroup");//设置group
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//从队列的头部消费
consumer.subscribe("msgid", "*");
consumer.setConsumeMessageBatchMaxSize(32);//每次指定消费的10条消息,默认1
consumer.setPullBatchSize(32);//每次从Broker中拉40条消息,默认32
consumer.setPullInterval(5000);//拉取时间间隔
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
log.info("收到的消息数量:" + list.size());
for(MessageExt messageExt:list){
String str = new String(messageExt.getBody(), Charset.forName("UTF-8"));
System.out.println(str);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//返回状态是消费成功还是消费失败
}
});
consumer.start();
log.info("consumer started ....");
}
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void sendMessage(String id) {
SendResult result = rocketMQTemplate.syncSend("msgid", "同步测试消息"+id, 1000);
System.out.println("msgid:"+id+",发送状态:"+result);
启动消费者后,我们快速的生产消息,供消费者进行消费,当快速发送请求时,RocketMQ消费消息。
假设系统的对某业务的处理能力是QPS为n,当QPS超过n时,会超过处理能力。
那么我们为了方便距离,直接让消费者最大数量为1,也就是只有一个consumer thread,这样所有的请求都需要等待这个consumer来处理。
这里为了方便模拟MQ消峰填谷效果,所以使用了consumer thread 为1,业务场景当然不会这么设置。
设置消费者线程最大最小数量为1
consumer.setConsumeThreadMax(1);
consumer.setConsumeThreadMin(1);
public static void startConsumer() throws MQClientException {
log.info("----开始批量消费消息-----");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumergroup");//设置group
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//从队列的头部消费
consumer.subscribe("msgid", "*");
consumer.setConsumeMessageBatchMaxSize(5);//每次指定消费的10条消息,默认1
consumer.setPullBatchSize(12);//每次从Broker中拉40条消息,默认32
consumer.setPullInterval(5000);//拉取时间间隔
consumer.setConsumeThreadMax(1);
consumer.setConsumeThreadMin(1);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@SneakyThrows
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
log.info("收到的消息数量:" + list.size());
for(MessageExt messageExt:list){
String str = new String(messageExt.getBody(), Charset.forName("UTF-8"));
log.info(str);
TimeUnit.SECONDS.sleep(1);//模拟业务处理消耗的时间
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//返回状态是消费成功还是消费失败
}
});
consumer.start();
log.info("consumer started ....");
}
此时候for循环,发送mq消费,模拟突增的业务请求。
启动服务器,批量发送生产多条消息。可以看到生产者突然向MQ发送了大量的消息,而观察consumer,则依然按照设定的业务处理能力进行业务处理。
消费方按照1条/秒的处理能力处理消息。
参数的可取值范围。
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.java