- package repayment.config;
-
- import cn.itcast.wanxinp2p.repayment.message.diy.DefaultMessageListenerConcurrently;
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.springframework.beans.BeansException;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.ApplicationContextAware;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class RocketMQConsumerConfig {
-
- // 如果定义了多个 DefaultMQPushConsumer, 请注意 形参 的名字
- @Bean(initMethod = "start", destroyMethod = "shutdown")
- public DefaultMQPushConsumer defaultMQPushConsumer(DefaultMessageListenerConcurrently messageListener) throws MQClientException {
- // 初始化consumer,并设置consumer group name
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DEFAULT_CONSUMER_GROUP");
-
- // 设置NameServer地址
- consumer.setNamesrvAddr("localhost:9876");
- //订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
- consumer.subscribe("TEST_TOPIC", "*");
-
- //注册回调接口来处理从Broker中收到的消息
- consumer.registerMessageListener(messageListener);
- return consumer;
- }
-
-
- }
-
需要特别注意:MessageListener 通过 @Autowired 注入的是所有 MessageHandler 类型接口的实现,
并且会依次执行每个 MessageHandler 的 getELFilter()(返回的表达式通过 SpEL 计算得出结果)和 test(),
以判断该 MessageExt 是否符合条件。
对于符合条件的 MessageHandler,先从其 MessageExt 中提取 Body,再执行具体处理消息的逻辑 onMessage()。
- package repayment.message;
-
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang3.StringUtils;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import org.apache.rocketmq.common.message.MessageExt;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.expression.EvaluationContext;
- import org.springframework.expression.ExpressionParser;
- import org.springframework.expression.spel.standard.SpelExpressionParser;
- import org.springframework.expression.spel.support.SimpleEvaluationContext;
- import org.springframework.stereotype.Component;
-
- import java.util.List;
- import java.util.Optional;
-
- @Slf4j
- @Component
- public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
-
- @Autowired
- private List
rocketMQListenerList; -
- @SuppressWarnings("unchecked")
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context) { - for (MessageExt messageExt : msgs) {
- log.debug("received msg: {}", messageExt);
- try {
- long now = System.currentTimeMillis();
- // rocketMQListener 选择
- ExpressionParser parser = new SpelExpressionParser();
- EvaluationContext cont = SimpleEvaluationContext.forReadWriteDataBinding().build();
- cont.setVariable("messageExt", messageExt);
- Optional
first = rocketMQListenerList.stream() - .filter(rocketMQListener -> {
- String elFilter = rocketMQListener.getELFilter();
- if (StringUtils.isBlank(elFilter))
- return true;
- return parser.parseExpression(elFilter).getValue(cont, Boolean.class);
- })
- .filter(rocketMQListener -> rocketMQListener.test(messageExt))
- .findFirst();
- // 注意,如果筛选完成没有获取到 rocketMQListener 则自此会抛出异常
- MessageHandler rocketMQListener = first.get();
-
- // 转换消息并执行
- rocketMQListener.onMessage(rocketMQListener.convertMessage(messageExt));
-
- long costTime = System.currentTimeMillis() - now;
- log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
- } catch (Exception e) {
- log.warn("consume message failed. messageExt:{}", messageExt, e);
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- }
-
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- }
- package repayment.message;
-
- import org.apache.rocketmq.common.message.MessageExt;
-
- /**
- * @param
消息 body 的数据类型,如果没有重写 convertMessage 方法, 则建议 为 String - * 消费 RocketMQ 消息的帮助类
- */
- public interface MessageHandler
{ -
- /**
- * 通过 SPEL 筛选 MessageHandler 的方式
- * SPEL 上下文设置的了 #messageExt
- *
- * @return 稍后用于计算的 SPEL 表达式(默认返回空字符串,代表不过滤)
- */
- default String getELFilter() {
- return "";
- }
-
- /**
- * 通过 messageExt 筛选 MessageHandler 的普通方式
- * 默认返回 空字符串,代表不过滤。
- *
- * @param messageExt MessageExt
- * @return true:保留,false:丢弃
- */
- default boolean test(MessageExt messageExt) {
- return true;
- }
-
-
- /**
- * @param messageExt MessageExt
- * @return 默认为字符串类型的数据
- */
- default T convertMessage(MessageExt messageExt) {
- return (T) new String(messageExt.getBody());
- }
-
- /**
- * 具体处理消息的逻辑
- *
- * @param message messageExt.body
- */
- void onMessage(T message);
-
-
- }
用于解析 topic = TEST_TOPIC, Tags.contains("tag0") 的消息
- package repayment.message.handler;
-
- import cn.itcast.wanxinp2p.repayment.ann.MQSelect;
- import org.apache.rocketmq.common.message.MessageExt;
-
- // topic = TEST_TOPIC, Tags.contains("tag0")
- @Component
- public class HelloMessageHandler implements MessageHandler
{ -
- @Override
- public String getELFilter() {
- return "#messageExt.topic == 'TEST_TOPIC'";
- }
-
- @Override
- public boolean test(MessageExt messageExt) {
- return messageExt.getTags().contains("tag0");
- }
-
- @Override
- public void onMessage(String message) {
- System.out.println(message);
- }
-
- }