• SpringBoot基于AOP实现RocketMQ发送与消费


    一、业务要求

    将http请求参数和返回值部分进行封装,发送给MQ,由于该项目是一个社区项目,需要将用户点赞、评论等信息发送给消费者,使用接口较多,所以采用AOP注解方式进行生产者的消息发送。

    二、AOP几个概念:

    (1)Target:目标类,要被代理的类,例如,UserService;

    (2)JoinPoint(连接点):所谓的连接点,是指那些被拦截到的方法;

    (3)PointCut(切入点):被增强的连接点(所谓的增强其实就是添加的新功能);

    (4)Advice(通知、增强),增强代码;

    (5)Weaving(织入):是指把增强的advice应用到目标对象target来创建新的代理对象proxy的 过程。

    (6)proxy:代理类;

    (7)Aspect(切面):是切入点pointcut和通知advice的结合。

    (8)前置通知(@Before):在我们执行目标方法之前运行;

    (9)后置通知(@After):在我们执行目标方法结束之后,不管有没有异常;

    (10)返回通知(@AfterReturning):在我们的目标方法正常返回值后运行;

    (11)异常通知(AfterThrowing):在我们的目标方法出现异常后运行;

    (12)环绕通知(@Around):动态代理,需要手动执行jionPoint.process(),其实就是执行我们的目标方法执行之前,相当于前置通知,执行之后就相当于我们的后置通知。

    二、自定义注解@ActivityMqProducer

    任何带有该注解的方法都会将发送mq。

    1. package com.community.annotations;
    2. import java.lang.annotation.*;
    3. /**
    4. * @description: 带有该注解的方法将发送活动mq
    5. * @author kanlina
    6. * @date 2022/9/1 下午2:10
    7. * @version 1.0
    8. */
    9. @Target({ElementType.PARAMETER, ElementType.METHOD}) //注解作用域
    10. @Retention(RetentionPolicy.RUNTIME)//注解运行时期
    11. @Documented
    12. public @interface ActivityMqProducer {
    13. }

    三、AOP设置

    1. package com.community.aop;
    2. import com.alibaba.fastjson.JSONObject;
    3. import com.community.mq.producer.activityPush.ActivityPushLocal;
    4. import com.marketingCommunityApi.dto.response.CommonModel;
    5. import com.ruubypay.miss.common.response.CommonResponse;
    6. import lombok.extern.slf4j.Slf4j;
    7. import org.aspectj.lang.JoinPoint;
    8. import org.aspectj.lang.annotation.AfterReturning;
    9. import org.aspectj.lang.annotation.Aspect;
    10. import org.aspectj.lang.annotation.Pointcut;
    11. import org.springframework.stereotype.Component;
    12. import javax.annotation.Resource;
    13. /**
    14. * @author kanlina
    15. * @version 1.0
    16. * @description: 活动mq切入点
    17. * @date 2022/9/1 下午2:15
    18. */
    19. @Aspect
    20. @Component
    21. @Slf4j
    22. public class ActivityMqProducerAspect {
    23. @Resource
    24. ActivityPushLocal activityPushLocal;
    25. @Pointcut(value = "@annotation(com.community.annotations.ActivityMqProducer)")
    26. public void serviceAspect() {
    27. }
    28. /**
    29. * 正常返回通知,拦截service层记录用户正常的日志
    30. */
    31. @SuppressWarnings({"rawtypes", "unchecked"})
    32. @AfterReturning(returning = "returnValue", pointcut = "serviceAspect()")
    33. public void doAfter(JoinPoint joinPoint, Object returnValue) throws Exception {
    34. log.info("aop处理activityMq开始");
    35. String name = joinPoint.getSignature().getName();
    36. System.out.println(name);
    37. //发送mq
    38. //获取返回值
    39. CommonResponse response = (CommonResponse) returnValue;
    40. CommonModel commonModel = JSONObject.parseObject(JSONObject.parseObject(response.getResData().toString()).getString("CommonModel"), CommonModel.class);
    41. activityPushLocal.pushMessageLocal(commonModel);
    42. }
    43. }

    四、MQ配置

    1. 配置消费者和topic 阿里云 消息队列RocketMQ版 创建topic和Group
    2. 生产者配置文件
      1. "1.0" encoding="UTF-8"?>
      2. <beans xmlns="http://www.springframework.org/schema/beans"
      3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      4. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
      5. <bean id="activityPushProducerRef" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start" destroy-method="shutdown">
      6. <property name="properties" >
      7. <map>
      8. <entry key="ProducerId" value="#{messagePushGroup['activityPushProducerId']}"/>
      9. <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
      10. <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
      11. map>
      12. property>
      13. bean>
      14. <bean id="activityPushProducer" class="com.community.mq.producer.PushCenterProducer">
      15. <property name="topic" value="#{messagePushGroup['activityPushTopicId']}"/>
      16. <property name="producer" ref="activityPushProducerRef"/>
      17. bean>
      18. beans>
    3.  生产者配置
      1. package com.community.mq.producer;
      2. import com.aliyun.openservices.ons.api.Message;
      3. import com.aliyun.openservices.ons.api.Producer;
      4. import com.aliyun.openservices.ons.api.SendResult;
      5. import org.slf4j.Logger;
      6. import org.slf4j.LoggerFactory;
      7. import java.io.UnsupportedEncodingException;
      8. /**
      9. * @description: 消息推送生产者配置
      10. * @author kanlina
      11. * @date 2022/8/30 上午10:51
      12. * @version 1.0
      13. */
      14. public class PushCenterProducer {
      15. private Logger logger = LoggerFactory.getLogger(PushCenterProducer.class);
      16. /**
      17. * 消息通道:tpoic,生产者和消费者必须保持相同的topic切网络环境能连通才行
      18. */
      19. private String topic;
      20. private Producer producer;
      21. public void setTopic(String topic) {
      22. this.topic = topic;
      23. }
      24. public void setProducer(Producer producer) {
      25. this.producer = producer;
      26. }
      27. /**
      28. * 消息队列短信队列发送示例
      29. * @param message 消息内容
      30. * @param tag tag:给消息打的tag,消费者可以消费指定tag的消息,如果消费者配置的 tag接收策略为* ,表示该通道
      31. * 下的所有tab的消息都要被消费。tag一般用于消息过滤时使用。
      32. * @param key 给消息设置的一个key,尽量做到全局唯一。
      33. * @return
      34. */
      35. public void pushMessage(String message, String tag, String key) throws UnsupportedEncodingException {
      36. Message msg = new Message( //
      37. // Message所属的Topic
      38. topic,
      39. // Message Tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤
      40. tag,
      41. // Message Body 可以是任何二进制形式的数据, MQ不做任何干预
      42. // 需要Producer与Consumer协商好一致的序列化和反序列化方式.
      43. //短信中心消息通道接收json格式的字符串。(详情查看README.md)严格使用"UTF-8编码"
      44. message.getBytes("UTF-8"));
      45. // 设置代表消息的业务关键属性,请尽可能全局唯一
      46. // 1 -uuid 2 -uuid+时间戳 3 -推特的snowflake算法(保证唯一)
      47. // 以方便您在无法正常收到消息情况下,可通过MQ 控制台查询消息并补发
      48. // 注意:不设置也不会影响消息正常收发
      49. //此id可以作为业务消费方去重的依据。
      50. msg.setKey(key);
      51. // 发送消息,只要不抛异常就是成功 (默认的api就是同步发送消息,只要不抛异常,同步发送消息就是成功)
      52. try {
      53. SendResult sendResult = producer.send(msg);
      54. logger.info("消息管理 推送消息成功 返回参数:{}", sendResult);
      55. } catch (Exception e) {
      56. logger.error("消息管理 推送消息异常:", e);
      57. }
      58. }
      59. }
    4. 生产者信息发送

      1. package com.community.mq.producer.activityPush;
      2. import com.alibaba.fastjson.JSON;
      3. import com.community.mq.producer.PushCenterProducer;
      4. import com.marketingCommunityApi.dto.response.CommonModel;
      5. import com.ruubypay.miss.cmsg.constants.PushType;
      6. import com.ruubypay.miss.cmsg.dto.MqMsgDTO;
      7. import lombok.extern.slf4j.Slf4j;
      8. import org.springframework.scheduling.annotation.Async;
      9. import org.springframework.stereotype.Component;
      10. import javax.annotation.Resource;
      11. import java.util.Date;
      12. import java.util.UUID;
      13. /**
      14. * @author kanlina
      15. * @version 1.0
      16. * @description: 社区活动推送
      17. * @date 2022/8/30 上午10:18
      18. */
      19. @Slf4j
      20. @Component
      21. public class ActivityPushLocal {
      22. @Resource(name = "activityPushProducer")
      23. private PushCenterProducer activityPushProducer;
      24. /**
      25. * 发送mq
      26. *
      27. * @param smsString
      28. */
      29. @Async
      30. public void smsMessage(String smsString) {
      31. try {
      32. activityPushProducer.pushMessage(
      33. smsString,
      34. "activityPush",
      35. UUID.randomUUID().toString().replaceAll("-", ""));
      36. } catch (Exception e) {
      37. log.error("调用活动推送异常", e);
      38. }
      39. }
      40. /**
      41. * 消息中心推送消息,只展示在消息页
      42. */
      43. @Async
      44. public void pushMessageLocal(CommonModel model) {
      45. log.debug("社区消息页,推送消息,推送内容:{}", model);
      46. smsMessage(JSON.toJSONString(model));
      47. }
      48. }
    5. 消费者消费

      1. "1.0" encoding="UTF-8"?>
      2. <beans xmlns="http://www.springframework.org/schema/beans"
      3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      4. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
      5. <bean id="msgListener" class="com.ruubypay.marketing.mq.message.consumer.MqMessageListener">bean>
      6. <bean id="transRecordListener" class="com.ruubypay.marketing.mq.message.consumer.MqTransRecordListener"/>
      7. <bean id="matchedListener" class="com.ruubypay.marketing.mq.message.consumer.MqMatchedListener"/>
      8. <bean id="matched4CMBListener" class="com.ruubypay.marketing.mq.message.consumer.MqMatched4CMBListener"/>
      9. <bean id="shopOrderListener" class="com.ruubypay.marketing.mq.message.consumer.MqShopOrderListener"/>
      10. <bean id="digitalShopOrderListener" class="com.ruubypay.marketing.mq.message.consumer.MqDigitalShopOrderListener"/>
      11. <bean id="anniversaryListener" class="com.ruubypay.marketing.mq.message.consumer.MqAnniversaryListener"/>
      12. <bean id="anniversaryOrderListener" class="com.ruubypay.marketing.mq.message.consumer.MqAnniversaryOrderListener"/>
      13. <bean id="anniversaryBusListener" class="com.ruubypay.marketing.mq.message.consumer.MqAnniversaryBusListener"/>
      14. <bean id="cmbEquityListener" class="com.ruubypay.marketing.mq.message.consumer.MqCmbEquityListener"/>
      15. <bean id="groundPushListener" class="com.ruubypay.marketing.mq.message.consumer.MqGroundPushListener"/>
      16. <bean id="iCBCActivityListener" class="com.ruubypay.marketing.mq.message.consumer.MqICBCActivityListener"/>
      17. <bean id="couponUsingRecordListener" class="com.ruubypay.marketing.mq.message.consumer.MqCouponUsingRecordListener"/>
      18. <bean id="payFissionListener" class="com.ruubypay.marketing.mq.message.consumer.MqPayFissionListener"/>
      19. <bean id="payChannelOperateListener" class="com.ruubypay.marketing.mq.message.consumer.MqPayChannelListener"/>
      20. <bean id="communityActivityPushListener" class="com.ruubypay.marketing.mq.message.consumer.MqCommunityActivityPushListener"/>
      21. <bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
      22. <property name="properties" >
      23. <map>
      24. <entry key="GROUP_ID" value="#{messagePushGroup['mq_groupId']}"/>
      25. <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
      26. <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
      27. <entry key="ConsumeThreadNums" value="50"/>
      28. map>
      29. property>
      30. <property name="subscriptionTable">
      31. <map>
      32. <entry value-ref="msgListener">
      33. <key>
      34. <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
      35. <property name="topic" value="#{messagePushGroup['consumerTopic']}"/>
      36. <property name="expression" value="*"/>
      37. bean>
      38. key>
      39. entry>
      40. map>
      41. property>
      42. bean>
      43. <bean id="transRecordConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
      44. <property name="properties" >
      45. <map>
      46. <entry key="GROUP_ID" value="#{messagePushGroup['transRecordGroupId']}"/>
      47. <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
      48. <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
      49. map>
      50. property>
      51. <property name="subscriptionTable">
      52. <map>
      53. <entry value-ref="transRecordListener">
      54. <key>
      55. <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
      56. <property name="topic" value="#{messagePushGroup['transRecordTopic']}"/>
      57. <property name="expression" value="*"/>
      58. bean>
      59. key>
      60. entry>
      61. map>
      62. property>
      63. bean>
      64. <bean id="shopOrderConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
      65. <property name="properties" >
      66. <map>
      67. <entry key="GROUP_ID" value="#{messagePushGroup['shopOrderGroupId']}"/>
      68. <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
      69. <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
      70. map>
      71. property>
      72. <property name="subscriptionTable">
      73. <map>
      74. <entry value-ref="shopOrderListener">
      75. <key>
      76. <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
      77. <property name="topic" value="#{messagePushGroup['shopOrderTopic']}"/>
      78. <property name="expression" value="*"/>
      79. bean>
      80. key>
      81. entry>
      82. map>
      83. property>
      84. bean>
      85. <bean id="digitalShopOrderConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
      86. <property name="properties" >
      87. <map>
      88. <entry key="GROUP_ID" value="#{messagePushGroup['digitalShopOrderGroupId']}"/>
      89. <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
      90. <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
      91. map>
      92. property>
      93. <property name="subscriptionTable">
      94. <map>
      95. <entry value-ref="digitalShopOrderListener">
      96. <key>
      97. <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
      98. <property name="topic" value="#{messagePushGroup['shopOrderTopic']}"/>
      99. <property name="expression" value="*"/>
      100. bean>
      101. key>
      102. entry>
      103. map>
      104. property>
      105. bean>
      106. <bean id="matchedTripConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
      107. <property name="properties" >
      108. <map>
      109. <entry key="GROUP_ID" value="#{messagePushGroup['matchedTripGroupId']}"/>
      110. <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
      111. <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
      112. map>
      113. property>
      114. <property name="subscriptionTable">
      115. <map>
      116. <entry value-ref="matchedListener">
      117. <key>
      118. <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
      119. <property name="topic" value="#{messagePushGroup['matchedTripTopic']}"/>
      120. <property name="expression" value="*"/>
      121. bean>
      122. key>
      123. entry>
      124. map>
      125. property>
      126. bean>
      127. <bean id="matchedTrip4CMBConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
      128. <property name="properties" >
      129. <map>
      130. <entry key="GROUP_ID" value="#{messagePushGroup['matchedTrip4CMBGroupId']}"/>
      131. <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
      132. <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
      133. map>
      134. property>
      135. <property name="subscriptionTable">
      136. <map>
      137. <entry value-ref="matched4CMBListener">
      138. <key>
      139. <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
      140. <property name="topic" value="#{messagePushGroup['matchedTripTopic']}"/>
      141. <property name="expression" value="*"/>
      142. bean>
      143. key>
      144. entry>
      145. map>
      146. property>
      147. bean>
      148. <bean id="anniversaryConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
      149. <property name="properties" >
      150. <map>
      151. <entry key="GROUP_ID" value="#{messagePushGroup['anniversary2GroupId']}"/>
      152. <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
      153. <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
      154. map>
      155. property>
      156. <property name="subscriptionTable">
      157. <map>
      158. <entry value-ref="anniversaryListener">
      159. <key>
      160. <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
      161. <property name="topic" value="#{messagePushGroup['matchedTripTopic']}"/>
      162. <property name="expression" value="*"/>
      163. bean>
      164. key>
      165. entry>
      166. map>
      167. property>
      168. bean>
      169. <bean id="anniversaryOrderConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
      170. <property name="properties" >
      171. <map>
      172. <entry key="GROUP_ID" value="#{messagePushGroup['anniversary2OrderGroupId']}"/>
      173. <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
      174. <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
      175. map>
      176. property>
      177. <property name="subscriptionTable">
      178. <map>
      179. <entry value-ref="anniversaryOrderListener">
      180. <key>
      181. <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
      182. <property name="topic" value="#{messagePushGroup['shopOrderTopic']}"/>
      183. <property name="expression" value="*"/>
      184. bean>
      185. key>
      186. entry>
      187. map>
      188. property>
      189. bean>
      190. <bean id="anniversaryBusConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
      191. <property name="properties" >
      192. <map>
      193. <entry key="GROUP_ID" value="#{messagePushGroup['anniversary2BusGroupId']}"/>
      194. <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
      195. <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
      196. map>
      197. property>
      198. <property name="subscriptionTable">
      199. <map>
      200. <entry value-ref="anniversaryBusListener">
      201. <key>
      202. <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
      203. <property name="topic" value="#{messagePushGroup['transRecordTopic']}"/>
      204. <property name="expression" value="*"/>
      205. bean>
      206. key>
      207. entry>
      208. map>
      209. property>
      210. bean>
      211. <bean id="cmbEquityConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
      212. <property name="properties" >
      213. <map>
      214. <entry key="GROUP_ID" value="#{messagePushGroup['cmbEquityGroupId']}"/>
      215. <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
      216. <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
      217. map>
      218. property>
      219. <property name="subscriptionTable">
      220. <map>
      221. <entry value-ref="cmbEquityListener">
      222. <key>
      223. <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
      224. <property name="topic" value="#{messagePushGroup['cmbEquityTopic']}"/>
      225. <property name="expression" value="*"/>
      226. bean>
      227. key>
      228. entry>
      229. map>
      230. property>
      231. bean>
      232. <bean id="groundPushConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
      233. <property name="properties" >
      234. <map>
      235. <entry key="GROUP_ID" value="#{messagePushGroup['groundPushGroupId']}"/>
      236. <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
      237. <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
      238. map>
      239. property>
      240. <property name="subscriptionTable">
      241. <map>
      242. <entry value-ref="groundPushListener">
      243. <key>
      244. <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
      245. <property name="topic" value="#{messagePushGroup['groundPushTopic']}"/>
      246. <property name="expression" value="*"/>
      247. bean>
      248. key>
      249. entry>
      250. map>
      251. property>
      252. bean>
      253. <bean id="iCBCActivityConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
      254. <property name="properties" >
      255. <map>
      256. <entry key="GROUP_ID" value="#{messagePushGroup['iCBCActivityGroupId']}"/>
      257. <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
      258. <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
      259. map>
      260. property>
      261. <property name="subscriptionTable">
      262. <map>
      263. <entry value-ref="iCBCActivityListener">
      264. <key>
      265. <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
      266. <property name="topic" value="#{messagePushGroup['iCBCActivityTopic']}"/>
      267. <property name="expression" value="*"/>
      268. bean>
      269. key>
      270. entry>
      271. map>
      272. property>
      273. bean>
      274. <bean id="couponUsingRecordConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
      275. <property name="properties" >
      276. <map>
      277. <entry key="GROUP_ID" value="#{messagePushGroup['couponUsingRecordGroupId']}"/>
      278. <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
      279. <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
      280. map>
      281. property>
      282. <property name="subscriptionTable">
      283. <map>
      284. <entry value-ref="couponUsingRecordListener">
      285. <key>
      286. <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
      287. <property name="topic" value="#{messagePushGroup['couponUsingRecordTopic']}"/>
      288. <property name="expression" value="*"/>
      289. bean>
      290. key>
      291. entry>
      292. map>
      293. property>
      294. bean>
      295. <bean id="payFissionConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
      296. <property name="properties" >
      297. <map>
      298. <entry key="GROUP_ID" value="#{messagePushGroup['payFissionGroupId']}"/>
      299. <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
      300. <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
      301. map>
      302. property>
      303. <property name="subscriptionTable">
      304. <map>
      305. <entry value-ref="payFissionListener">
      306. <key>
      307. <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
      308. <property name="topic" value="#{messagePushGroup['groundPushTopic']}"/>
      309. <property name="expression" value="*"/>
      310. bean>
      311. key>
      312. entry>
      313. map>
      314. property>
      315. bean>
      316. <bean id="payChannelOperateConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
      317. <property name="properties" >
      318. <map>
      319. <entry key="GROUP_ID" value="#{messagePushGroup['payChannelOperateGroupId']}"/>
      320. <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
      321. <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
      322. map>
      323. property>
      324. <property name="subscriptionTable">
      325. <map>
      326. <entry value-ref="payChannelOperateListener">
      327. <key>
      328. <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
      329. <property name="topic" value="#{messagePushGroup['payChannelOperateTopic']}"/>
      330. <property name="expression" value="*"/>
      331. bean>
      332. key>
      333. entry>
      334. map>
      335. property>
      336. bean>
      337. <bean id="communityActivityPushConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
      338. <property name="properties" >
      339. <map>
      340. <entry key="GROUP_ID" value="#{messagePushGroup['communityActivityPushGroupId']}"/>
      341. <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
      342. <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
      343. map>
      344. property>
      345. <property name="subscriptionTable">
      346. <map>
      347. <entry value-ref="communityActivityPushListener">
      348. <key>
      349. <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
      350. <property name="topic" value="#{messagePushGroup['communityActivityPushTopicId']}"/>
      351. <property name="expression" value="*"/>
      352. bean>
      353. key>
      354. entry>
      355. map>
      356. property>
      357. bean>
      358. beans>
    6. 消费者业务监听

    1. package com.ruubypay.marketing.mq.message.consumer;
    2. import com.alibaba.fastjson.JSONObject;
    3. import com.aliyun.openservices.ons.api.Action;
    4. import com.aliyun.openservices.ons.api.ConsumeContext;
    5. import com.aliyun.openservices.ons.api.Message;
    6. import com.aliyun.openservices.ons.api.MessageListener;
    7. import com.marketingCommunityApi.dto.response.CommonModel;
    8. import com.ruubypay.log.annotation.LogMarker;
    9. import com.ruubypay.marketing.mq.model.PayChannelInfo;
    10. import com.ruubypay.marketing.mq.service.ActivityCommunityPushService;
    11. import com.ruubypay.marketing.mq.service.ActivityPayChannelService;
    12. import lombok.extern.slf4j.Slf4j;
    13. import org.springframework.beans.factory.annotation.Autowired;
    14. /**
    15. * @Author KanLina
    16. * @Description 社区操作通知
    17. * @Date 2022/9/01 下午2:50
    18. **/
    19. @Slf4j
    20. public class MqCommunityActivityPushListener implements MessageListener {
    21. @Autowired
    22. private ActivityCommunityPushService activityCommunityPushService;
    23. @LogMarker(businessDescription = "社区活动操作通知")
    24. @Override
    25. public Action consume(Message message, ConsumeContext context) {
    26. log.debug("收到消息,Topic:{},MsgID:{},开始执行[{}]", message.getTopic(), message.getMsgID(), message.getReconsumeTimes());
    27. try {
    28. CommonModel request = JSONObject.parseObject(message.getBody(), CommonModel.class);
    29. activityCommunityPushService.dealRequest(request);
    30. } catch (Exception e) {
    31. log.error("执行异常", e);
    32. return Action.ReconsumeLater;
    33. }
    34. log.debug("收到消息,Topic:{},MsgID:{},处理成功[{}]",message.getTopic(), message.getMsgID(), message.getReconsumeTimes());
    35. return Action.CommitMessage;
    36. }
    37. }

     

  • 相关阅读:
    Vector-常用CAN工具 - CANoe入门到精通_02
    PDF流前端如何接收:深度解析与实用策略
    MATLAB冒号符号
    SpringBoot-自定义Starter
    Redis的安装和使用
    阿里发布大模型发布图结构长文本处理智能体,超越GPT-4-128k
    练[MRCTF2020]套娃
    使用patch-package保存node_modules包修改
    深入了解汽车级功率MOSFET NVMFS2D3P04M8LT1G P沟道数据表
    【强化学习】Offline RL
  • 原文地址:https://blog.csdn.net/kk_lina/article/details/126642060