将http请求参数和返回值部分进行封装,发送给MQ,由于该项目是一个社区项目,需要将用户点赞、评论等信息发送给消费者,使用接口较多,所以采用AOP注解方式进行生产者的消息发送。
(1)Target:目标类,要被代理的类,例如,UserService;
(2)JoinPoint(连接点):所谓的连接点,是指那些被拦截到的方法;
(3)PointCut(切入点):被增强的连接点(所谓的增强其实就是添加的新功能);
(4)Advice(通知、增强),增强代码;
(5)Weaving(织入):是指把增强的advice应用到目标对象target来创建新的代理对象proxy的 过程。
(6)proxy:代理类;
(7)Aspect(切面):是切入点pointcut和通知advice的结合。
(8)前置通知(@Before):在我们执行目标方法之前运行;
(9)后置通知(@After):在我们的目标方法执行结束之后运行,不管有没有异常都会执行;
(10)返回通知(@AfterReturning):在我们的目标方法正常返回值后运行;
(11)异常通知(@AfterThrowing):在我们的目标方法出现异常后运行;
(12)环绕通知(@Around):动态代理,需要手动执行joinPoint.proceed()来调用目标方法;proceed()之前的代码相当于前置通知,proceed()之后的代码相当于后置通知。
任何带有该注解的方法,在正常返回后都会发送MQ消息。
- package com.community.annotations;
-
- import java.lang.annotation.*;
-
/**
 * Marks a method whose result should be published as an activity MQ message.
 *
 * <p>The annotation declares no attributes; it is a pure marker intended to be
 * matched by an {@code @annotation(com.community.annotations.ActivityMqProducer)}
 * pointcut.
 *
 * @author kanlina
 * @version 1.0
 * @date 2022/9/1 2:10 PM
 */
@Documented
@Retention(RetentionPolicy.RUNTIME) // retained at runtime so it can be read reflectively
@Target({ElementType.METHOD, ElementType.PARAMETER}) // permitted on methods and parameters
public @interface ActivityMqProducer {
}
- package com.community.aop;
-
- import com.alibaba.fastjson.JSONObject;
- import com.community.mq.producer.activityPush.ActivityPushLocal;
- import com.marketingCommunityApi.dto.response.CommonModel;
- import com.ruubypay.miss.common.response.CommonResponse;
- import lombok.extern.slf4j.Slf4j;
- import org.aspectj.lang.JoinPoint;
- import org.aspectj.lang.annotation.AfterReturning;
- import org.aspectj.lang.annotation.Aspect;
- import org.aspectj.lang.annotation.Pointcut;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
-
- /**
- * @author kanlina
- * @version 1.0
- * @description: 活动mq切入点
- * @date 2022/9/1 下午2:15
- */
- @Aspect
- @Component
- @Slf4j
- public class ActivityMqProducerAspect {
- @Resource
- ActivityPushLocal activityPushLocal;
- @Pointcut(value = "@annotation(com.community.annotations.ActivityMqProducer)")
- public void serviceAspect() {
- }
-
- /**
- * 正常返回通知,拦截service层记录用户正常的日志
- */
- @SuppressWarnings({"rawtypes", "unchecked"})
- @AfterReturning(returning = "returnValue", pointcut = "serviceAspect()")
- public void doAfter(JoinPoint joinPoint, Object returnValue) throws Exception {
- log.info("aop处理activityMq开始");
- String name = joinPoint.getSignature().getName();
- System.out.println(name);
- //发送mq
- //获取返回值
- CommonResponse response = (CommonResponse) returnValue;
- CommonModel commonModel = JSONObject.parseObject(JSONObject.parseObject(response.getResData().toString()).getString("CommonModel"), CommonModel.class);
- activityPushLocal.pushMessageLocal(commonModel);
- }
- }
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Underlying Aliyun ONS producer; started on init and shut down on destroy. -->
    <bean id="activityPushProducerRef" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start" destroy-method="shutdown">
        <property name="properties">
            <map>
                <entry key="ProducerId" value="#{messagePushGroup['activityPushProducerId']}"/>
                <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
                <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
            </map>
        </property>
    </bean>

    <!-- Application-level wrapper that binds the producer above to the activity push topic. -->
    <bean id="activityPushProducer" class="com.community.mq.producer.PushCenterProducer">
        <property name="topic" value="#{messagePushGroup['activityPushTopicId']}"/>
        <property name="producer" ref="activityPushProducerRef"/>
    </bean>

</beans>
- package com.community.mq.producer;
-
- import com.aliyun.openservices.ons.api.Message;
- import com.aliyun.openservices.ons.api.Producer;
- import com.aliyun.openservices.ons.api.SendResult;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import java.io.UnsupportedEncodingException;
-
- /**
- * @description: 消息推送生产者配置
- * @author kanlina
- * @date 2022/8/30 上午10:51
- * @version 1.0
- */
-
- public class PushCenterProducer {
-
- private Logger logger = LoggerFactory.getLogger(PushCenterProducer.class);
- /**
- * 消息通道:tpoic,生产者和消费者必须保持相同的topic切网络环境能连通才行
- */
- private String topic;
-
- private Producer producer;
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public void setProducer(Producer producer) {
- this.producer = producer;
- }
-
- /**
- * 消息队列短信队列发送示例
- * @param message 消息内容
- * @param tag tag:给消息打的tag,消费者可以消费指定tag的消息,如果消费者配置的 tag接收策略为* ,表示该通道
- * 下的所有tab的消息都要被消费。tag一般用于消息过滤时使用。
- * @param key 给消息设置的一个key,尽量做到全局唯一。
- * @return
- */
- public void pushMessage(String message, String tag, String key) throws UnsupportedEncodingException {
- Message msg = new Message( //
- // Message所属的Topic
- topic,
- // Message Tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤
- tag,
- // Message Body 可以是任何二进制形式的数据, MQ不做任何干预
- // 需要Producer与Consumer协商好一致的序列化和反序列化方式.
- //短信中心消息通道接收json格式的字符串。(详情查看README.md)严格使用"UTF-8编码"
- message.getBytes("UTF-8"));
- // 设置代表消息的业务关键属性,请尽可能全局唯一
- // 1 -uuid 2 -uuid+时间戳 3 -推特的snowflake算法(保证唯一)
- // 以方便您在无法正常收到消息情况下,可通过MQ 控制台查询消息并补发
- // 注意:不设置也不会影响消息正常收发
- //此id可以作为业务消费方去重的依据。
- msg.setKey(key);
- // 发送消息,只要不抛异常就是成功 (默认的api就是同步发送消息,只要不抛异常,同步发送消息就是成功)
- try {
- SendResult sendResult = producer.send(msg);
- logger.info("消息管理 推送消息成功 返回参数:{}", sendResult);
- } catch (Exception e) {
- logger.error("消息管理 推送消息异常:", e);
- }
- }
- }
生产者信息发送
- package com.community.mq.producer.activityPush;
-
- import com.alibaba.fastjson.JSON;
- import com.community.mq.producer.PushCenterProducer;
- import com.marketingCommunityApi.dto.response.CommonModel;
- import com.ruubypay.miss.cmsg.constants.PushType;
- import com.ruubypay.miss.cmsg.dto.MqMsgDTO;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
- import java.util.Date;
- import java.util.UUID;
-
- /**
- * @author kanlina
- * @version 1.0
- * @description: 社区活动推送
- * @date 2022/8/30 上午10:18
- */
- @Slf4j
- @Component
- public class ActivityPushLocal {
-
- @Resource(name = "activityPushProducer")
- private PushCenterProducer activityPushProducer;
-
- /**
- * 发送mq
- *
- * @param smsString
- */
- @Async
- public void smsMessage(String smsString) {
- try {
- activityPushProducer.pushMessage(
- smsString,
- "activityPush",
- UUID.randomUUID().toString().replaceAll("-", ""));
- } catch (Exception e) {
- log.error("调用活动推送异常", e);
- }
- }
-
- /**
- * 消息中心推送消息,只展示在消息页
- */
- @Async
- public void pushMessageLocal(CommonModel model) {
- log.debug("社区消息页,推送消息,推送内容:{}", model);
- smsMessage(JSON.toJSONString(model));
- }
- }
消费者消费
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Message listeners -->
    <bean id="msgListener" class="com.ruubypay.marketing.mq.message.consumer.MqMessageListener"/>

    <bean id="transRecordListener" class="com.ruubypay.marketing.mq.message.consumer.MqTransRecordListener"/>
    <bean id="matchedListener" class="com.ruubypay.marketing.mq.message.consumer.MqMatchedListener"/>
    <bean id="matched4CMBListener" class="com.ruubypay.marketing.mq.message.consumer.MqMatched4CMBListener"/>
    <bean id="shopOrderListener" class="com.ruubypay.marketing.mq.message.consumer.MqShopOrderListener"/>
    <bean id="digitalShopOrderListener" class="com.ruubypay.marketing.mq.message.consumer.MqDigitalShopOrderListener"/>
    <bean id="anniversaryListener" class="com.ruubypay.marketing.mq.message.consumer.MqAnniversaryListener"/>
    <bean id="anniversaryOrderListener" class="com.ruubypay.marketing.mq.message.consumer.MqAnniversaryOrderListener"/>
    <bean id="anniversaryBusListener" class="com.ruubypay.marketing.mq.message.consumer.MqAnniversaryBusListener"/>
    <bean id="cmbEquityListener" class="com.ruubypay.marketing.mq.message.consumer.MqCmbEquityListener"/>
    <bean id="groundPushListener" class="com.ruubypay.marketing.mq.message.consumer.MqGroundPushListener"/>

    <bean id="iCBCActivityListener" class="com.ruubypay.marketing.mq.message.consumer.MqICBCActivityListener"/>
    <bean id="couponUsingRecordListener" class="com.ruubypay.marketing.mq.message.consumer.MqCouponUsingRecordListener"/>
    <bean id="payFissionListener" class="com.ruubypay.marketing.mq.message.consumer.MqPayFissionListener"/>
    <bean id="payChannelOperateListener" class="com.ruubypay.marketing.mq.message.consumer.MqPayChannelListener"/>
    <bean id="communityActivityPushListener" class="com.ruubypay.marketing.mq.message.consumer.MqCommunityActivityPushListener"/>

    <!-- Consumers: each binds one consumer group to one listener and one topic -->
    <bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
        <property name="properties">
            <map>
                <entry key="GROUP_ID" value="#{messagePushGroup['mq_groupId']}"/>
                <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
                <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
                <entry key="ConsumeThreadNums" value="50"/>
            </map>
        </property>
        <property name="subscriptionTable">
            <map>
                <entry value-ref="msgListener">
                    <key>
                        <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                            <property name="topic" value="#{messagePushGroup['consumerTopic']}"/>
                            <property name="expression" value="*"/>
                        </bean>
                    </key>
                </entry>
            </map>
        </property>
    </bean>

    <bean id="transRecordConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
        <property name="properties">
            <map>
                <entry key="GROUP_ID" value="#{messagePushGroup['transRecordGroupId']}"/>
                <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
                <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
            </map>
        </property>
        <property name="subscriptionTable">
            <map>
                <entry value-ref="transRecordListener">
                    <key>
                        <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                            <property name="topic" value="#{messagePushGroup['transRecordTopic']}"/>
                            <property name="expression" value="*"/>
                        </bean>
                    </key>
                </entry>
            </map>
        </property>
    </bean>

    <bean id="shopOrderConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
        <property name="properties">
            <map>
                <entry key="GROUP_ID" value="#{messagePushGroup['shopOrderGroupId']}"/>
                <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
                <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
            </map>
        </property>
        <property name="subscriptionTable">
            <map>
                <entry value-ref="shopOrderListener">
                    <key>
                        <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                            <property name="topic" value="#{messagePushGroup['shopOrderTopic']}"/>
                            <property name="expression" value="*"/>
                        </bean>
                    </key>
                </entry>
            </map>
        </property>
    </bean>

    <bean id="digitalShopOrderConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
        <property name="properties">
            <map>
                <entry key="GROUP_ID" value="#{messagePushGroup['digitalShopOrderGroupId']}"/>
                <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
                <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
            </map>
        </property>
        <property name="subscriptionTable">
            <map>
                <entry value-ref="digitalShopOrderListener">
                    <key>
                        <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                            <property name="topic" value="#{messagePushGroup['shopOrderTopic']}"/>
                            <property name="expression" value="*"/>
                        </bean>
                    </key>
                </entry>
            </map>
        </property>
    </bean>

    <bean id="matchedTripConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
        <property name="properties">
            <map>
                <entry key="GROUP_ID" value="#{messagePushGroup['matchedTripGroupId']}"/>
                <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
                <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
            </map>
        </property>
        <property name="subscriptionTable">
            <map>
                <entry value-ref="matchedListener">
                    <key>
                        <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                            <property name="topic" value="#{messagePushGroup['matchedTripTopic']}"/>
                            <property name="expression" value="*"/>
                        </bean>
                    </key>
                </entry>
            </map>
        </property>
    </bean>

    <bean id="matchedTrip4CMBConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
        <property name="properties">
            <map>
                <entry key="GROUP_ID" value="#{messagePushGroup['matchedTrip4CMBGroupId']}"/>
                <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
                <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
            </map>
        </property>
        <property name="subscriptionTable">
            <map>
                <entry value-ref="matched4CMBListener">
                    <key>
                        <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                            <property name="topic" value="#{messagePushGroup['matchedTripTopic']}"/>
                            <property name="expression" value="*"/>
                        </bean>
                    </key>
                </entry>
            </map>
        </property>
    </bean>

    <bean id="anniversaryConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
        <property name="properties">
            <map>
                <entry key="GROUP_ID" value="#{messagePushGroup['anniversary2GroupId']}"/>
                <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
                <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
            </map>
        </property>
        <property name="subscriptionTable">
            <map>
                <entry value-ref="anniversaryListener">
                    <key>
                        <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                            <property name="topic" value="#{messagePushGroup['matchedTripTopic']}"/>
                            <property name="expression" value="*"/>
                        </bean>
                    </key>
                </entry>
            </map>
        </property>
    </bean>

    <bean id="anniversaryOrderConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
        <property name="properties">
            <map>
                <entry key="GROUP_ID" value="#{messagePushGroup['anniversary2OrderGroupId']}"/>
                <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
                <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
            </map>
        </property>
        <property name="subscriptionTable">
            <map>
                <entry value-ref="anniversaryOrderListener">
                    <key>
                        <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                            <property name="topic" value="#{messagePushGroup['shopOrderTopic']}"/>
                            <property name="expression" value="*"/>
                        </bean>
                    </key>
                </entry>
            </map>
        </property>
    </bean>

    <bean id="anniversaryBusConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
        <property name="properties">
            <map>
                <entry key="GROUP_ID" value="#{messagePushGroup['anniversary2BusGroupId']}"/>
                <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
                <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
            </map>
        </property>
        <property name="subscriptionTable">
            <map>
                <entry value-ref="anniversaryBusListener">
                    <key>
                        <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                            <property name="topic" value="#{messagePushGroup['transRecordTopic']}"/>
                            <property name="expression" value="*"/>
                        </bean>
                    </key>
                </entry>
            </map>
        </property>
    </bean>

    <bean id="cmbEquityConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
        <property name="properties">
            <map>
                <entry key="GROUP_ID" value="#{messagePushGroup['cmbEquityGroupId']}"/>
                <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
                <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
            </map>
        </property>
        <property name="subscriptionTable">
            <map>
                <entry value-ref="cmbEquityListener">
                    <key>
                        <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                            <property name="topic" value="#{messagePushGroup['cmbEquityTopic']}"/>
                            <property name="expression" value="*"/>
                        </bean>
                    </key>
                </entry>
            </map>
        </property>
    </bean>

    <bean id="groundPushConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
        <property name="properties">
            <map>
                <entry key="GROUP_ID" value="#{messagePushGroup['groundPushGroupId']}"/>
                <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
                <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
            </map>
        </property>
        <property name="subscriptionTable">
            <map>
                <entry value-ref="groundPushListener">
                    <key>
                        <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                            <property name="topic" value="#{messagePushGroup['groundPushTopic']}"/>
                            <property name="expression" value="*"/>
                        </bean>
                    </key>
                </entry>
            </map>
        </property>
    </bean>

    <bean id="iCBCActivityConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
        <property name="properties">
            <map>
                <entry key="GROUP_ID" value="#{messagePushGroup['iCBCActivityGroupId']}"/>
                <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
                <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
            </map>
        </property>
        <property name="subscriptionTable">
            <map>
                <entry value-ref="iCBCActivityListener">
                    <key>
                        <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                            <property name="topic" value="#{messagePushGroup['iCBCActivityTopic']}"/>
                            <property name="expression" value="*"/>
                        </bean>
                    </key>
                </entry>
            </map>
        </property>
    </bean>

    <bean id="couponUsingRecordConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
        <property name="properties">
            <map>
                <entry key="GROUP_ID" value="#{messagePushGroup['couponUsingRecordGroupId']}"/>
                <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
                <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
            </map>
        </property>
        <property name="subscriptionTable">
            <map>
                <entry value-ref="couponUsingRecordListener">
                    <key>
                        <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                            <property name="topic" value="#{messagePushGroup['couponUsingRecordTopic']}"/>
                            <property name="expression" value="*"/>
                        </bean>
                    </key>
                </entry>
            </map>
        </property>
    </bean>

    <bean id="payFissionConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
        <property name="properties">
            <map>
                <entry key="GROUP_ID" value="#{messagePushGroup['payFissionGroupId']}"/>
                <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
                <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
            </map>
        </property>
        <property name="subscriptionTable">
            <map>
                <entry value-ref="payFissionListener">
                    <key>
                        <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                            <property name="topic" value="#{messagePushGroup['groundPushTopic']}"/>
                            <property name="expression" value="*"/>
                        </bean>
                    </key>
                </entry>
            </map>
        </property>
    </bean>

    <bean id="payChannelOperateConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
        <property name="properties">
            <map>
                <entry key="GROUP_ID" value="#{messagePushGroup['payChannelOperateGroupId']}"/>
                <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
                <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
            </map>
        </property>
        <property name="subscriptionTable">
            <map>
                <entry value-ref="payChannelOperateListener">
                    <key>
                        <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                            <property name="topic" value="#{messagePushGroup['payChannelOperateTopic']}"/>
                            <property name="expression" value="*"/>
                        </bean>
                    </key>
                </entry>
            </map>
        </property>
    </bean>

    <bean id="communityActivityPushConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
        <property name="properties">
            <map>
                <entry key="GROUP_ID" value="#{messagePushGroup['communityActivityPushGroupId']}"/>
                <entry key="AccessKey" value="#{messagePushGroup['accessKey']}"/>
                <entry key="SecretKey" value="#{messagePushGroup['secretKey']}"/>
            </map>
        </property>
        <property name="subscriptionTable">
            <map>
                <entry value-ref="communityActivityPushListener">
                    <key>
                        <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                            <property name="topic" value="#{messagePushGroup['communityActivityPushTopicId']}"/>
                            <property name="expression" value="*"/>
                        </bean>
                    </key>
                </entry>
            </map>
        </property>
    </bean>

</beans>
消费者业务监听
- package com.ruubypay.marketing.mq.message.consumer;
-
- import com.alibaba.fastjson.JSONObject;
- import com.aliyun.openservices.ons.api.Action;
- import com.aliyun.openservices.ons.api.ConsumeContext;
- import com.aliyun.openservices.ons.api.Message;
- import com.aliyun.openservices.ons.api.MessageListener;
- import com.marketingCommunityApi.dto.response.CommonModel;
- import com.ruubypay.log.annotation.LogMarker;
- import com.ruubypay.marketing.mq.model.PayChannelInfo;
- import com.ruubypay.marketing.mq.service.ActivityCommunityPushService;
- import com.ruubypay.marketing.mq.service.ActivityPayChannelService;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
-
- /**
- * @Author KanLina
- * @Description 社区操作通知
- * @Date 2022/9/01 下午2:50
- **/
- @Slf4j
- public class MqCommunityActivityPushListener implements MessageListener {
-
- @Autowired
- private ActivityCommunityPushService activityCommunityPushService;
-
- @LogMarker(businessDescription = "社区活动操作通知")
- @Override
- public Action consume(Message message, ConsumeContext context) {
- log.debug("收到消息,Topic:{},MsgID:{},开始执行[{}]", message.getTopic(), message.getMsgID(), message.getReconsumeTimes());
- try {
- CommonModel request = JSONObject.parseObject(message.getBody(), CommonModel.class);
- activityCommunityPushService.dealRequest(request);
- } catch (Exception e) {
- log.error("执行异常", e);
- return Action.ReconsumeLater;
- }
- log.debug("收到消息,Topic:{},MsgID:{},处理成功[{}]",message.getTopic(), message.getMsgID(), message.getReconsumeTimes());
- return Action.CommitMessage;
- }
- }