• rocketMQ的使用


    首先服务器配置好MQ

    其次添加maven依赖,增加mq配置,例如nameserver地址。

    创建生产者要素

    1.  NameServer.   //boker的ip+端口。
    2.  生产者组的name名字。
    3. 也可以有其他 键值对。

    创建Message要素。

    1. topic   //一个主题。主题会有消费者订阅。
    2. tag     //标签同一个标题下也会有不同的分支。
    3. 要传信息String的code[]。//可以通过json工具转成code【】。
    1. FxTaskCommonMessageDTO fxTaskCommonMessageDTO = new FxTaskCommonMessageDTO();
    2. fxTaskCommonMessageDTO.setType(tag);
    3. fxTaskCommonMessageDTO.setBody(body);
    4. fxTaskCommonMessageDTO.setUniqueId(UUID.randomUUID().toString());
    5. //直接将一个对象转成jsonString的code形式
    6. Message message = new Message(topic, tag,JSON.toJSONBytes(fxTaskCommonMessageDTO));

    创建好生产者后,statr()后,不shotdown()是不会停止的。可以将生成的生产者放入一个map中方便随时取用。

    用map缓存生产者。

    1. Properties properties = new Properties();
    2. properties.setProperty(PropertyKeyConst.GROUP_ID, producerGroup.getCode());
    3. properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
    4. properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
    5. properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameServer);
    6. Producer producer = ONSFactory.createProducer(properties);
    7. producer.start();
    8. producerGroupMap.put(producerGroup.getCode(), producer);

     取生产者。发送消息。

    1. Producer producer = producerGroupMap.get(producerGroup.getCode());
    2. SendResult sendResult = producer.send(message);

    创建消费者要素

    1. 消费组的name
    2. NameServer.  ///consumer.setNamesrvAddr("localhost:9876");
    3.  消费者订阅的topic和tag。
    4. 一个消息的处理方式例如:如下处理方式是注册一个监听。
      1. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
      2. consumer.setNamesrvAddr("localhost:9876");
      3. consumer.subscribe("Topic", "zhangkai");
      4. consumer.registerMessageListener(new MessageListenerConcurrently() {
      5. @Override
      6. public ConsumeConcurrentlyStatus consumeMessage(List msgs,
      7. ConsumeConcurrentlyContext context) {
      8. System.out.println(fromBytes(msgs.get(0).getBody()));
      9. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      10. }
      11. });
      12. consumer.start();

      又或者实现    implements MessageListener接口。然后作为消费者的参数bean。

      1. @RocketMQMessageListener(consumerGroup = MQConstant.GID_NJIA_COMMON, topic = MQConstant.USER_GROWTH_ORDER, selectorExpression = "pay||aftersale||settle")
      2. //自定义的注解。记录着这个类是一个监听类.他的消费组名,订阅的top,多个tag。
      3. public class UserOrderGrowthListener implements MessageListener {
      4. @Override
      5. public Action consume(Message message, ConsumeContext consumeContext) {
      6. }
      7. }
      8. private void createConsumer() {
      9. Class clazz = AopProxyUtils.ultimateTargetClass(bean);
      10. RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
      11. consumer.subscribe(annotation.topic(), annotation.selectorExpression(), (MessageListener) bean);
      12. consumer.start();
      13. }

      如此就创建好了接收消息的消费者。消费消息只需要对接收消息的body进行操作。

      1. String msgBody;
      2. String tag;
      3. try {
      4. msgBody = new String(message.getBody());
      5. tag = message.getTag();
      6. UserOrderGrowthDTO userOrderGrowthDTO = JSON.parseObject(msgBody, UserOrderGrowthDTO.class);
      7. //一下对数据进行操作。
      8. } catch (Exception e) {
      9. LoggerUtil.error(defaultLogger, e, "订单成长值消息消费失败", userId);
      10. return Action.ReconsumeLater;
      11. } finally {
      12. if (Objects.nonNull(key)) {
      13. redisCacheManager.releaseLock(key, requestId);
      14. }
      15. }

       

    完整生产者代码。

    1. @Component
    2. public class DefaultMQProducerTemplate implements InitializingBean {
    3. @Value("${rocketmq.consumer.accessKey}")
    4. private String accessKey;
    5. @Value("${rocketmq.consumer.secretKey}")
    6. private String secretKey;
    7. @Value("${rocketmq.consumer.onsAddr}")
    8. private String nameServer;
    9. private static final Logger logger = LoggerUtil.ROCKET_MQ;
    10. private static final Map producerGroupMap = Maps.newHashMap();
    11. private static final Map orderProducerGroupMap = Maps.newHashMap();
    12. @Override
    13. public void afterPropertiesSet() {
    14. for (ProducerGroup producerGroup : ProducerGroup.values()) {
    15. Properties properties = new Properties();
    16. properties.setProperty(PropertyKeyConst.GROUP_ID, producerGroup.getCode());
    17. properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
    18. properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
    19. properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameServer);
    20. if (ConsumeMode.CONCURRENTLY == producerGroup.getConsumeMode()) {
    21. // 创建普通生产者
    22. Producer producer = ONSFactory.createProducer(properties);
    23. producer.start();
    24. producerGroupMap.put(producerGroup.getCode(), producer);
    25. } else {
    26. // 创建顺序生产者
    27. OrderProducer producer = ONSFactory.createOrderProducer(properties);
    28. producer.start();
    29. orderProducerGroupMap.put(producerGroup.getCode(), producer);
    30. }
    31. }
    32. }
    33. private static void validate(final Message message, final Admin admin) {
    34. Assert.isTrue(Objects.nonNull(admin), "生产者不存在");
    35. Assert.isTrue(Objects.nonNull(message), "消息不能为空");
    36. }
    37. /**
    38. * 发送顺序消息
    39. *
    40. * @param message 要发送的消息
    41. * @param shardingKey 全局顺序消息,该字段可以设置为任意非空字符串。分区顺序消息中区分不同分区的关键字段,sharding key于普通消息的key是完全不同的概念。
    42. * @param producerGroup 生产者组
    43. */
    44. public SendResult sendOrder(final Message message, String shardingKey, final ProducerGroup producerGroup) {
    45. OrderProducer orderProducer = orderProducerGroupMap.get(producerGroup.getCode());
    46. validate(message, orderProducer);
    47. try {
    48. // 同步发送消息,只要不抛异常就是成功
    49. return orderProducer.send(message, shardingKey);
    50. } catch (ONSClientException e) {
    51. // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
    52. }
    53. return null;
    54. }
    55. public SendResult send(final Message message, final ProducerGroup producerGroup) {
    56. return send(message, producerGroup, true);
    57. }
    58. public SendResult sendDelayMessage(final Message message, final LocalDateTime startDeliverTime, final ProducerGroup producerGroup) {
    59. // 设置消息需要被投递的时间
    60. message.setStartDeliverTime(LocalDateTimeUtil.getMilliByTime(startDeliverTime));
    61. return send(message, producerGroup, true);
    62. }
    63. /**
    64. * 发送同步消息
    65. *
    66. * @param message 要发送的消息
    67. * @param producerGroup 生产者组
    68. * @param compensate 是否持久化
    69. */
    70. public SendResult send(final Message message, final ProducerGroup producerGroup, final boolean compensate) {
    71. Producer producer = producerGroupMap.get(producerGroup.getCode());
    72. validate(message, producer);
    73. // 需要重试
    74. int retryTimes = 1;
    75. int maxRetryTimes = 3;
    76. String traceId = UUID.randomUUID().toString();
    77. do {
    78. try {
    79. // 同步发送消息,只要不抛异常就是成功
    80. if (retryTimes == 1) {
    81. LoggerUtil.info(logger, "[RocketMQSend]", "start", traceId, producerGroup.getCode(), message.getTopic(), message);
    82. }
    83. SendResult sendResult = producer.send(message);
    84. LoggerUtil.info(logger, "[RocketMQSend]", "success", traceId, producerGroup.getCode(), sendResult.getTopic(), sendResult.getMessageId());
    85. return sendResult;
    86. } catch (ONSClientException e) {
    87. // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
    88. LoggerUtil.warn(logger, e, "[RocketMQSend]", "failed", traceId, producerGroup.getCode(), message.getTopic(), "retryTimes=" + retryTimes);
    89. if (retryTimes >= maxRetryTimes) {
    90. if (compensate) {
    91. messageCompensation(message, producerGroup);
    92. return new SendResult();
    93. } else {
    94. throw new ONSClientException("超出重试次数");
    95. }
    96. }
    97. retryTimes++;
    98. }
    99. } while (true);
    100. }
    101. /**
    102. * 发送消息,异步Callback形式
    103. *
    104. * @param message 要发送的消息
    105. * @param producerGroup 生产者组
    106. */
    107. public void sendAsync(final Message message, final ProducerGroup producerGroup) {
    108. String traceId = UUID.randomUUID().toString();
    109. LoggerUtil.info(logger, "[RocketMQSendAsync]", "start", traceId, producerGroup.getCode(), message.getTopic(), message);
    110. Producer producer = producerGroupMap.get(producerGroup.getCode());
    111. validate(message, producer);
    112. SendCallback sendCallback = new SendCallback() {
    113. @Override
    114. public void onSuccess(final SendResult sendResult) {
    115. // 消费发送成功
    116. LoggerUtil.info(logger, "[RocketMQSendAsync]", "success", traceId, sendResult.getTopic(), sendResult.getMessageId());
    117. }
    118. @Override
    119. public void onException(OnExceptionContext context) {
    120. // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
    121. LoggerUtil.warn(logger, context.getException(), "[RocketMQSendAsync]", "failed", traceId, producerGroup.getCode(), context.getTopic(), context.getMessageId());
    122. messageCompensation(message, producerGroup);
    123. }
    124. };
    125. producer.sendAsync(message, sendCallback);
    126. }
    127. /**
    128. * 发送消息,OneWay形式,服务器不应答,无法保证消息是否成功到达服务器
    129. *
    130. * @param message 要发送的消息
    131. * @param producerGroup 生产者组
    132. */
    133. public void sendOneWay(final Message message, final ProducerGroup producerGroup) {
    134. Producer producer = producerGroupMap.get(producerGroup.getCode());
    135. validate(message, producer);
    136. producer.sendOneway(message);
    137. }
    138. /**
    139. * 持久化MQ消息
    140. *
    141. * @param message
    142. * @param producerGroup
    143. */
    144. private void messageCompensation(Message message, ProducerGroup producerGroup) {
    145. String messageBody = new String(message.getBody());
    146. MQMessageCompensationDO compensationDO = new MQMessageCompensationDO(producerGroup.getCode(), message.getTopic(), message.getTag(), message.getKey(), messageBody);
    147. int success = mqMessageCompensationMapper.insertMessage(compensationDO);
    148. if (success <= 0) {
    149. LoggerUtil.error(logger, "[RocketMQSend]持久化失败");
    150. throw new ONSClientException("持久化失败");
    151. }
    152. }
    153. }

     

    消费者配置

    1. import com.aliyun.openservices.ons.api.Consumer;
    2. import com.aliyun.openservices.ons.api.MessageListener;
    3. import com.aliyun.openservices.ons.api.ONSFactory;
    4. import com.aliyun.openservices.ons.api.PropertyKeyConst;
    5. import com.aliyun.openservices.ons.api.order.MessageOrderListener;
    6. import com.aliyun.openservices.ons.api.order.OrderConsumer;
    7. import com.google.common.collect.Maps;
    8. import org.apache.logging.log4j.Logger;
    9. import org.springframework.aop.framework.AopProxyUtils;
    10. import org.springframework.beans.BeansException;
    11. import org.springframework.beans.factory.SmartInitializingSingleton;
    12. import org.springframework.beans.factory.annotation.Value;
    13. import org.springframework.context.ApplicationContext;
    14. import org.springframework.context.ApplicationContextAware;
    15. import org.springframework.context.ConfigurableApplicationContext;
    16. import org.springframework.context.annotation.Configuration;
    17. import java.util.Map;
    18. import java.util.Properties;
    19. @Configuration
    20. public class ConsumeConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
    21. private static final Logger logger = LoggerUtil.COMMON_DEFAULT;
    22. @Value("${rocketmq.consumer.accessKey}")
    23. private String accessKey;
    24. @Value("${rocketmq.consumer.secretKey}")
    25. private String secretKey;
    26. @Value("${rocketmq.consumer.onsAddr}")
    27. private String nameServer;
    28. private ConfigurableApplicationContext applicationContext;
    29. private Map/*consumerGroup*/, Map/*topic*/, Object>> consumerGroupMap = Maps.newHashMap();
    30. private Map/*consumerGroup*/, Map/*topic*/, Object>> orderConsumerGroupMap = Maps.newHashMap();
    31. @Override
    32. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    33. this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    34. }
    35. @Override
    36. public void afterSingletonsInstantiated() {
    37. Map beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
    38. beans.forEach(this::registerConsumer);
    39. createConsumer();
    40. }
    41. private void registerConsumer(String beanName, Object bean) {
    42. Class clazz = AopProxyUtils.ultimateTargetClass(bean);
    43. RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
    44. String consumerGroup = annotation.consumerGroup();
    45. String topic = annotation.topic();
    46. Map> groupMap;
    47. if (ConsumeMode.CONCURRENTLY == annotation.consumeMode()) {
    48. groupMap = consumerGroupMap;
    49. if (!MessageListener.class.isAssignableFrom(bean.getClass())) {
    50. throw new IllegalStateException(clazz + " is not instance of " + MessageListener.class.getName());
    51. }
    52. } else {
    53. groupMap = orderConsumerGroupMap;
    54. if (!MessageOrderListener.class.isAssignableFrom(bean.getClass())) {
    55. throw new IllegalStateException(clazz + " is not instance of " + MessageOrderListener.class.getName());
    56. }
    57. }
    58. if (!groupMap.containsKey(consumerGroup)) {
    59. groupMap.put(consumerGroup, Maps.newHashMap());
    60. }
    61. // 指针引用
    62. Map messageListenerMap = groupMap.get(consumerGroup);
    63. // 判断该consumerGroup是否有重复topic
    64. if (messageListenerMap.keySet().contains(topic)) {
    65. throw new IllegalStateException("[" + topic + "]" + " topic conflicts with existing," + bean.toString());
    66. }
    67. messageListenerMap.put(topic, bean);
    68. LoggerUtil.info(logger, "RegisterRocketMQ", beanName, annotation);
    69. }
    70. private void createConsumer() {
    71. // 普通消息订阅
    72. for (Map.Entry> entry : consumerGroupMap.entrySet()) {
    73. String consumerGroup = entry.getKey();
    74. Map messageListenerMap = entry.getValue();
    75. Properties consumerProperties = createProperties(consumerGroup);
    76. Consumer consumer = ONSFactory.createConsumer(consumerProperties);
    77. // 订阅多个topic
    78. for (Map.Entry topicEntry : messageListenerMap.entrySet()) {
    79. Object bean = topicEntry.getValue();
    80. Class clazz = AopProxyUtils.ultimateTargetClass(bean);
    81. RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
    82. consumer.subscribe(annotation.topic(), annotation.selectorExpression(), (MessageListener) bean);
    83. }
    84. consumer.start();
    85. LoggerUtil.info(logger, "Consumer start success.", consumerGroup);
    86. }
    87. // 顺序消息订阅
    88. for (Map.Entry> entry : orderConsumerGroupMap.entrySet()) {
    89. String consumerGroup = entry.getKey();
    90. Map messageListenerMap = entry.getValue();
    91. Properties consumerProperties = createProperties(consumerGroup);
    92. OrderConsumer consumer = ONSFactory.createOrderedConsumer(consumerProperties);
    93. // 订阅多个topic
    94. for (Map.Entry topicEntry : messageListenerMap.entrySet()) {
    95. Object bean = topicEntry.getValue();
    96. Class clazz = AopProxyUtils.ultimateTargetClass(bean);
    97. RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
    98. consumer.subscribe(annotation.topic(), annotation.selectorExpression(), (MessageOrderListener) bean);
    99. }
    100. consumer.start();
    101. LoggerUtil.info(logger, "Order Consumer start success.", consumerGroup);
    102. }
    103. }
    104. private Properties createProperties(String consumerGroup) {
    105. Properties consumerProperties = new Properties();
    106. consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, consumerGroup);
    107. consumerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey);
    108. consumerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey);
    109. consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameServer);
    110. return consumerProperties;
    111. }
    112. }

  • 相关阅读:
    【MATLAB教程案例4】直接序列扩频通信系统的MATLAB仿真
    $nextTick解决echarts宽度固定为100%的问题
    U盘里文件损坏无法打开怎么恢复?
    设计模式学习
    尿苷二磷酸修饰阿拉伯糖,阿拉伯糖偶联核苷酸,UDP-B-L-阿拉伯糖二钠盐,15839-78-8
    RAID和LVM配置指南:创建、扩容和管理RAID设备和逻辑卷的方法
    stm32之串口/蓝牙控制led灯
    GO远程构建并调试
    MySQL 锁分类和详细介绍
    redis(封装jedis)-----面试
  • 原文地址:https://blog.csdn.net/qq_55471073/article/details/125988378