• rocketmq Listener 消费消息的优雅方式(基于SPEL)


    DefaultMQPushConsumer 配置

    1. package repayment.config;
    2. import cn.itcast.wanxinp2p.repayment.message.diy.DefaultMessageListenerConcurrently;
    3. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    4. import org.apache.rocketmq.client.exception.MQClientException;
    5. import org.springframework.beans.BeansException;
    6. import org.springframework.context.ApplicationContext;
    7. import org.springframework.context.ApplicationContextAware;
    8. import org.springframework.context.annotation.Bean;
    9. import org.springframework.context.annotation.Configuration;
    10. @Configuration
    11. public class RocketMQConsumerConfig {
    12. // 如果定义了多个 DefaultMQPushConsumer, 请注意 形参 的名字
    13. @Bean(initMethod = "start", destroyMethod = "shutdown")
    14. public DefaultMQPushConsumer defaultMQPushConsumer(DefaultMessageListenerConcurrently messageListener) throws MQClientException {
    15. // 初始化consumer,并设置consumer group name
    16. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DEFAULT_CONSUMER_GROUP");
    17. // 设置NameServer地址
    18. consumer.setNamesrvAddr("localhost:9876");
    19. //订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
    20. consumer.subscribe("TEST_TOPIC", "*");
    21. //注册回调接口来处理从Broker中收到的消息
    22. consumer.registerMessageListener(messageListener);
    23. return consumer;
    24. }
    25. }

    自定义 MessageListener

    需要特别注意 MessageListener 使用的是 @Autowired 注入的是 MessageHandler 类型的接口

    并且执行了 MessageHandler  的getELFilter(),[通过SPEL计算得出]和 test()

    计算是该MessageExt否符合.

    对于符合的MessageHandler , 先对其 MessageExt 提取Body. 再 执行 具体处理消息的逻辑onMessage()

    1. package repayment.message;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.apache.commons.lang3.StringUtils;
    4. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    5. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    6. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    7. import org.apache.rocketmq.common.message.MessageExt;
    8. import org.springframework.beans.factory.annotation.Autowired;
    9. import org.springframework.expression.EvaluationContext;
    10. import org.springframework.expression.ExpressionParser;
    11. import org.springframework.expression.spel.standard.SpelExpressionParser;
    12. import org.springframework.expression.spel.support.SimpleEvaluationContext;
    13. import org.springframework.stereotype.Component;
    14. import java.util.List;
    15. import java.util.Optional;
    16. @Slf4j
    17. @Component
    18. public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
    19. @Autowired
    20. private List rocketMQListenerList;
    21. @SuppressWarnings("unchecked")
    22. @Override
    23. public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
    24. for (MessageExt messageExt : msgs) {
    25. log.debug("received msg: {}", messageExt);
    26. try {
    27. long now = System.currentTimeMillis();
    28. // rocketMQListener 选择
    29. ExpressionParser parser = new SpelExpressionParser();
    30. EvaluationContext cont = SimpleEvaluationContext.forReadWriteDataBinding().build();
    31. cont.setVariable("messageExt", messageExt);
    32. Optional first = rocketMQListenerList.stream()
    33. .filter(rocketMQListener -> {
    34. String elFilter = rocketMQListener.getELFilter();
    35. if (StringUtils.isBlank(elFilter))
    36. return true;
    37. return parser.parseExpression(elFilter).getValue(cont, Boolean.class);
    38. })
    39. .filter(rocketMQListener -> rocketMQListener.test(messageExt))
    40. .findFirst();
    41. // 注意,如果筛选完成没有获取到 rocketMQListener 则自此会抛出异常
    42. MessageHandler rocketMQListener = first.get();
    43. // 转换消息并执行
    44. rocketMQListener.onMessage(rocketMQListener.convertMessage(messageExt));
    45. long costTime = System.currentTimeMillis() - now;
    46. log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
    47. } catch (Exception e) {
    48. log.warn("consume message failed. messageExt:{}", messageExt, e);
    49. return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    50. }
    51. }
    52. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    53. }
    54. }

    MessageHandler 接口定义

    1. package repayment.message;
    2. import org.apache.rocketmq.common.message.MessageExt;
    3. /**
    4. * @param 消息 body 的数据类型,如果没有重写 convertMessage 方法, 则建议 为 String
    5. * 消费 RocketMQ 消息的帮助类
    6. */
    7. public interface MessageHandler {
    8. /**
    9. * 通过 SPEL 筛选 MessageHandler 的方式
    10. * SPEL 上下文设置的了 #messageExt
    11. *
    12. * @return 稍后用于计算的 SPEL 表达式(默认返回空字符串,代表不过滤)
    13. */
    14. default String getELFilter() {
    15. return "";
    16. }
    17. /**
    18. * 通过 messageExt 筛选 MessageHandler 的普通方式
    19. * 默认返回 空字符串,代表不过滤。
    20. *
    21. * @param messageExt MessageExt
    22. * @return true:保留,false:丢弃
    23. */
    24. default boolean test(MessageExt messageExt) {
    25. return true;
    26. }
    27. /**
    28. * @param messageExt MessageExt
    29. * @return 默认为字符串类型的数据
    30. */
    31. default T convertMessage(MessageExt messageExt) {
    32. return (T) new String(messageExt.getBody());
    33. }
    34. /**
    35. * 具体处理消息的逻辑
    36. *
    37. * @param message messageExt.body
    38. */
    39. void onMessage(T message);
    40. }

    自定义的 HelloMessageHandler

    用于解析 topic = TEST_TOPIC, Tags.contains("tag0") 的消息

    1. package repayment.message.handler;
    2. import cn.itcast.wanxinp2p.repayment.ann.MQSelect;
    3. import org.apache.rocketmq.common.message.MessageExt;
    4. // topic = TEST_TOPIC, Tags.contains("tag0")
    5. @Component
    6. public class HelloMessageHandler implements MessageHandler {
    7. @Override
    8. public String getELFilter() {
    9. return "#messageExt.topic == 'TEST_TOPIC'";
    10. }
    11. @Override
    12. public boolean test(MessageExt messageExt) {
    13. return messageExt.getTags().contains("tag0");
    14. }
    15. @Override
    16. public void onMessage(String message) {
    17. System.out.println(message);
    18. }
    19. }

    Debug 调试效果

  • 相关阅读:
    面试:抽象类和接口
    DAY52 300.最长递增子序列 + 674. 最长连续递增序列 + 718. 最长重复子数组
    Go语言入门(一)
    14:第二章:架构后端项目:10:封装“返回结果”;(也就是定义API统一返回对象)(同时,使用枚举类统一管理错误信息)
    MySQL中数据库、数据表的基本操作
    数据结构——树
    上手Python之set(集合)
    Selenium爬取内容并存储至MySQL数据库
    C语言 - 你一定能看懂的三子棋详解(万字长文,傻瓜式解析)
    pytest+allure生成测试报告
  • 原文地址:https://blog.csdn.net/qq_34922830/article/details/136482070