- FxTaskCommonMessageDTO fxTaskCommonMessageDTO = new FxTaskCommonMessageDTO();
- fxTaskCommonMessageDTO.setType(tag);
- fxTaskCommonMessageDTO.setBody(body);
- fxTaskCommonMessageDTO.setUniqueId(UUID.randomUUID().toString());
- //直接将一个对象转成jsonString的code形式
- Message message = new Message(topic, tag,JSON.toJSONBytes(fxTaskCommonMessageDTO));
按生产者组创建并启动生产者,然后以生产者组的 code 为 key 把它缓存到 map 中。
- Properties properties = new Properties();
- properties.setProperty(PropertyKeyConst.GROUP_ID, producerGroup.getCode());
- properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
- properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
- properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameServer);
- Producer producer = ONSFactory.createProducer(properties);
- producer.start();
- producerGroupMap.put(producerGroup.getCode(), producer);
发送消息时,先根据生产者组的 code 从 map 中取出对应的生产者,再调用 send 发送消息。
- Producer producer = producerGroupMap.get(producerGroup.getCode());
- SendResult sendResult = producer.send(message);
// Minimal push-consumer example: connect to a local name server and subscribe to
// topic "Topic", filtered by the tag expression "zhangkai".
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("Topic", "zhangkai");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) {
        // Handle every message in the batch: returning CONSUME_SUCCESS acknowledges the
        // whole list, so looking only at the first element would silently drop the rest.
        for (MessageExt msg : msgs) {
            System.out.println(fromBytes(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
// The listener must be registered before the consumer is started.
consumer.start();
又或者让监听类实现 MessageListener 接口,然后在订阅时把这个 bean 作为监听器参数传给消费者。
- @RocketMQMessageListener(consumerGroup = MQConstant.GID_NJIA_COMMON, topic = MQConstant.USER_GROWTH_ORDER, selectorExpression = "pay||aftersale||settle")
- //自定义的注解。记录着这个类是一个监听类.他的消费组名,订阅的top,多个tag。
- public class UserOrderGrowthListener implements MessageListener {
- @Override
- public Action consume(Message message, ConsumeContext consumeContext) {
- }
- }
-
- private void createConsumer() {
- Class> clazz = AopProxyUtils.ultimateTargetClass(bean);
- RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
- consumer.subscribe(annotation.topic(), annotation.selectorExpression(), (MessageListener) bean);
- consumer.start();
- }
这样就创建好了接收消息的消费者。消费消息时,只需要在 consume 方法中对收到的消息的 body 进行处理。
- String msgBody;
- String tag;
- try {
- msgBody = new String(message.getBody());
- tag = message.getTag();
- UserOrderGrowthDTO userOrderGrowthDTO = JSON.parseObject(msgBody, UserOrderGrowthDTO.class);
- //一下对数据进行操作。
- } catch (Exception e) {
- LoggerUtil.error(defaultLogger, e, "订单成长值消息消费失败", userId);
- return Action.ReconsumeLater;
- } finally {
- if (Objects.nonNull(key)) {
- redisCacheManager.releaseLock(key, requestId);
- }
- }
- @Component
- public class DefaultMQProducerTemplate implements InitializingBean {
-
- @Value("${rocketmq.consumer.accessKey}")
- private String accessKey;
-
- @Value("${rocketmq.consumer.secretKey}")
- private String secretKey;
-
- @Value("${rocketmq.consumer.onsAddr}")
- private String nameServer;
-
- private static final Logger logger = LoggerUtil.ROCKET_MQ;
-
- private static final Map
producerGroupMap = Maps.newHashMap(); -
- private static final Map
orderProducerGroupMap = Maps.newHashMap(); -
- @Override
- public void afterPropertiesSet() {
- for (ProducerGroup producerGroup : ProducerGroup.values()) {
- Properties properties = new Properties();
- properties.setProperty(PropertyKeyConst.GROUP_ID, producerGroup.getCode());
- properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
- properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
- properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameServer);
- if (ConsumeMode.CONCURRENTLY == producerGroup.getConsumeMode()) {
- // 创建普通生产者
- Producer producer = ONSFactory.createProducer(properties);
- producer.start();
- producerGroupMap.put(producerGroup.getCode(), producer);
- } else {
- // 创建顺序生产者
- OrderProducer producer = ONSFactory.createOrderProducer(properties);
- producer.start();
- orderProducerGroupMap.put(producerGroup.getCode(), producer);
- }
- }
- }
-
- private static void validate(final Message message, final Admin admin) {
- Assert.isTrue(Objects.nonNull(admin), "生产者不存在");
- Assert.isTrue(Objects.nonNull(message), "消息不能为空");
- }
-
- /**
- * 发送顺序消息
- *
- * @param message 要发送的消息
- * @param shardingKey 全局顺序消息,该字段可以设置为任意非空字符串。分区顺序消息中区分不同分区的关键字段,sharding key于普通消息的key是完全不同的概念。
- * @param producerGroup 生产者组
- */
- public SendResult sendOrder(final Message message, String shardingKey, final ProducerGroup producerGroup) {
- OrderProducer orderProducer = orderProducerGroupMap.get(producerGroup.getCode());
- validate(message, orderProducer);
- try {
- // 同步发送消息,只要不抛异常就是成功
- return orderProducer.send(message, shardingKey);
- } catch (ONSClientException e) {
- // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
- }
- return null;
- }
-
- public SendResult send(final Message message, final ProducerGroup producerGroup) {
- return send(message, producerGroup, true);
- }
-
- public SendResult sendDelayMessage(final Message message, final LocalDateTime startDeliverTime, final ProducerGroup producerGroup) {
- // 设置消息需要被投递的时间
- message.setStartDeliverTime(LocalDateTimeUtil.getMilliByTime(startDeliverTime));
- return send(message, producerGroup, true);
- }
-
- /**
- * 发送同步消息
- *
- * @param message 要发送的消息
- * @param producerGroup 生产者组
- * @param compensate 是否持久化
- */
- public SendResult send(final Message message, final ProducerGroup producerGroup, final boolean compensate) {
- Producer producer = producerGroupMap.get(producerGroup.getCode());
- validate(message, producer);
- // 需要重试
- int retryTimes = 1;
- int maxRetryTimes = 3;
- String traceId = UUID.randomUUID().toString();
- do {
- try {
- // 同步发送消息,只要不抛异常就是成功
- if (retryTimes == 1) {
- LoggerUtil.info(logger, "[RocketMQSend]", "start", traceId, producerGroup.getCode(), message.getTopic(), message);
- }
- SendResult sendResult = producer.send(message);
- LoggerUtil.info(logger, "[RocketMQSend]", "success", traceId, producerGroup.getCode(), sendResult.getTopic(), sendResult.getMessageId());
- return sendResult;
- } catch (ONSClientException e) {
- // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
- LoggerUtil.warn(logger, e, "[RocketMQSend]", "failed", traceId, producerGroup.getCode(), message.getTopic(), "retryTimes=" + retryTimes);
- if (retryTimes >= maxRetryTimes) {
- if (compensate) {
- messageCompensation(message, producerGroup);
- return new SendResult();
- } else {
- throw new ONSClientException("超出重试次数");
- }
- }
- retryTimes++;
- }
- } while (true);
- }
-
- /**
- * 发送消息,异步Callback形式
- *
- * @param message 要发送的消息
- * @param producerGroup 生产者组
- */
- public void sendAsync(final Message message, final ProducerGroup producerGroup) {
- String traceId = UUID.randomUUID().toString();
- LoggerUtil.info(logger, "[RocketMQSendAsync]", "start", traceId, producerGroup.getCode(), message.getTopic(), message);
- Producer producer = producerGroupMap.get(producerGroup.getCode());
- validate(message, producer);
- SendCallback sendCallback = new SendCallback() {
- @Override
- public void onSuccess(final SendResult sendResult) {
- // 消费发送成功
- LoggerUtil.info(logger, "[RocketMQSendAsync]", "success", traceId, sendResult.getTopic(), sendResult.getMessageId());
- }
-
- @Override
- public void onException(OnExceptionContext context) {
- // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
- LoggerUtil.warn(logger, context.getException(), "[RocketMQSendAsync]", "failed", traceId, producerGroup.getCode(), context.getTopic(), context.getMessageId());
- messageCompensation(message, producerGroup);
- }
- };
- producer.sendAsync(message, sendCallback);
- }
-
-
- /**
- * 发送消息,OneWay形式,服务器不应答,无法保证消息是否成功到达服务器
- *
- * @param message 要发送的消息
- * @param producerGroup 生产者组
- */
- public void sendOneWay(final Message message, final ProducerGroup producerGroup) {
- Producer producer = producerGroupMap.get(producerGroup.getCode());
- validate(message, producer);
- producer.sendOneway(message);
- }
-
- /**
- * 持久化MQ消息
- *
- * @param message
- * @param producerGroup
- */
- private void messageCompensation(Message message, ProducerGroup producerGroup) {
- String messageBody = new String(message.getBody());
- MQMessageCompensationDO compensationDO = new MQMessageCompensationDO(producerGroup.getCode(), message.getTopic(), message.getTag(), message.getKey(), messageBody);
- int success = mqMessageCompensationMapper.insertMessage(compensationDO);
- if (success <= 0) {
- LoggerUtil.error(logger, "[RocketMQSend]持久化失败");
- throw new ONSClientException("持久化失败");
- }
- }
- }
- import com.aliyun.openservices.ons.api.Consumer;
- import com.aliyun.openservices.ons.api.MessageListener;
- import com.aliyun.openservices.ons.api.ONSFactory;
- import com.aliyun.openservices.ons.api.PropertyKeyConst;
- import com.aliyun.openservices.ons.api.order.MessageOrderListener;
- import com.aliyun.openservices.ons.api.order.OrderConsumer;
- import com.google.common.collect.Maps;
- import org.apache.logging.log4j.Logger;
- import org.springframework.aop.framework.AopProxyUtils;
- import org.springframework.beans.BeansException;
- import org.springframework.beans.factory.SmartInitializingSingleton;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.ApplicationContextAware;
- import org.springframework.context.ConfigurableApplicationContext;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.Map;
- import java.util.Properties;
-
- @Configuration
- public class ConsumeConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
-
- private static final Logger logger = LoggerUtil.COMMON_DEFAULT;
-
- @Value("${rocketmq.consumer.accessKey}")
- private String accessKey;
-
- @Value("${rocketmq.consumer.secretKey}")
- private String secretKey;
-
- @Value("${rocketmq.consumer.onsAddr}")
- private String nameServer;
-
- private ConfigurableApplicationContext applicationContext;
-
- private Map
/*consumerGroup*/, Map/*topic*/, Object>> consumerGroupMap = Maps.newHashMap(); -
- private Map
/*consumerGroup*/, Map/*topic*/, Object>> orderConsumerGroupMap = Maps.newHashMap(); -
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- this.applicationContext = (ConfigurableApplicationContext) applicationContext;
- }
-
- @Override
- public void afterSingletonsInstantiated() {
- Map
beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class); - beans.forEach(this::registerConsumer);
- createConsumer();
- }
-
- private void registerConsumer(String beanName, Object bean) {
- Class> clazz = AopProxyUtils.ultimateTargetClass(bean);
- RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
- String consumerGroup = annotation.consumerGroup();
- String topic = annotation.topic();
- Map
> groupMap; - if (ConsumeMode.CONCURRENTLY == annotation.consumeMode()) {
- groupMap = consumerGroupMap;
- if (!MessageListener.class.isAssignableFrom(bean.getClass())) {
- throw new IllegalStateException(clazz + " is not instance of " + MessageListener.class.getName());
- }
- } else {
- groupMap = orderConsumerGroupMap;
- if (!MessageOrderListener.class.isAssignableFrom(bean.getClass())) {
- throw new IllegalStateException(clazz + " is not instance of " + MessageOrderListener.class.getName());
- }
- }
- if (!groupMap.containsKey(consumerGroup)) {
- groupMap.put(consumerGroup, Maps.newHashMap());
- }
- // 指针引用
- Map
messageListenerMap = groupMap.get(consumerGroup); -
- // 判断该consumerGroup是否有重复topic
- if (messageListenerMap.keySet().contains(topic)) {
- throw new IllegalStateException("[" + topic + "]" + " topic conflicts with existing," + bean.toString());
- }
- messageListenerMap.put(topic, bean);
- LoggerUtil.info(logger, "RegisterRocketMQ", beanName, annotation);
- }
-
- private void createConsumer() {
- // 普通消息订阅
- for (Map.Entry
> entry : consumerGroupMap.entrySet()) { - String consumerGroup = entry.getKey();
- Map
messageListenerMap = entry.getValue(); - Properties consumerProperties = createProperties(consumerGroup);
- Consumer consumer = ONSFactory.createConsumer(consumerProperties);
- // 订阅多个topic
- for (Map.Entry
topicEntry : messageListenerMap.entrySet()) { - Object bean = topicEntry.getValue();
- Class> clazz = AopProxyUtils.ultimateTargetClass(bean);
- RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
- consumer.subscribe(annotation.topic(), annotation.selectorExpression(), (MessageListener) bean);
- }
- consumer.start();
- LoggerUtil.info(logger, "Consumer start success.", consumerGroup);
- }
-
- // 顺序消息订阅
- for (Map.Entry
> entry : orderConsumerGroupMap.entrySet()) { - String consumerGroup = entry.getKey();
- Map
messageListenerMap = entry.getValue(); - Properties consumerProperties = createProperties(consumerGroup);
- OrderConsumer consumer = ONSFactory.createOrderedConsumer(consumerProperties);
- // 订阅多个topic
- for (Map.Entry
topicEntry : messageListenerMap.entrySet()) { - Object bean = topicEntry.getValue();
- Class> clazz = AopProxyUtils.ultimateTargetClass(bean);
- RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
- consumer.subscribe(annotation.topic(), annotation.selectorExpression(), (MessageOrderListener) bean);
- }
- consumer.start();
- LoggerUtil.info(logger, "Order Consumer start success.", consumerGroup);
- }
- }
-
- private Properties createProperties(String consumerGroup) {
- Properties consumerProperties = new Properties();
- consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, consumerGroup);
- consumerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey);
- consumerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey);
- consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameServer);
- return consumerProperties;
- }
- }