• Springboot整合阿里云ONS RocketMq(4.0 http)


    1. 引入依赖

    
    <dependency>
      <groupId>com.aliyun.openservicesgroupId>
      <artifactId>ons-clientartifactId>
      <version>1.8.4.Finalversion>
    dependency>
    

    2. 配置

    配置注意事项:

    1. nameSrvAddr我这里是用的4.0版本的支持http,5.0不支持http
      image.png
    2. 一个 Group ID 代表一个 Consumer 实例群组。同一个消费者 Group ID 下所有的 Consumer 实例必须保证订阅的 Topic 一致,并且也必须保证订阅 Topic 时设置的过滤规则(Tag)一致。否则您的消息可能会丢失。
    3. 订阅关系参考官方文档: 订阅关系一致
    4. 此处我配置了多个GroupId,Tag,Topic(order,market,vehicle)如果不需要配置一个即可,对应基本配置类需要增减对应属性
    aliyun:
      rocketmq:
        accessKey: LTAI5txxxxxxx
        secretKey: Afq06tBxrdBxxxxxxxx
        nameSrvAddr: http://MQ_INST_xxxxxxxxxx_BYkZuJCq.cn-beijing.mq.aliyuncs.com:80
        orderGroupId: GID_xxxxxx_test
        orderTag: 'order'
        orderTopic: vehicle-order-test
        marketGroupId: GID_xxxxxx2_test
        marketTag: 'market'
        marketTopic: vehicle-market-test
        vehicleGroupId: GID_xxxxxx3_test
        vehicleTag: 'vehicle'
        vehicleTopic: vehicle-order-test
    

    3. 配置类

    3.1 基本配置类

    package com.vehicle.manager.core.config;
    
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.Properties;
    
    /**
     * Rocket MQ 配置类
     * @author zr 2024/3/1
     */
    @Configuration
    @ConfigurationProperties(prefix = "aliyun.rocketmq")
    @Data
    public class RocketMqConfig {
    
        private String accessKey;
        private String secretKey;
        private String nameSrvAddr;
        private String marketGroupId;
        private String marketTopic;
        private String marketTag;
        private String orderTopic;
        private String orderGroupId;
        private String orderTag;
        private String vehicleTopic;
        private String vehicleGroupId;
        private String vehicleTag;
    
        public Properties getMqPropertie() {
            Properties properties = new Properties();
            properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
            properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
            properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
            return properties;
        }
    }
    
    

    3.2 生产者配置

    package com.vehicle.manager.core.config;
    
    import com.aliyun.openservices.ons.api.bean.ProducerBean;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * @author zr 2024/3/1
     */
    @Configuration
    public class ProducerConfig {
        @Autowired
        private RocketMqConfig mqConfig;
    
        @Bean(initMethod = "start", destroyMethod = "shutdown")
        public ProducerBean buildProducer() {
            ProducerBean producer = new ProducerBean();
            producer.setProperties(mqConfig.getMqPropertie());
            return producer;
        }
    }
    
    

    3.3 消费者配置

    package com.vehicle.manager.core.config;
    
    import com.aliyun.openservices.ons.api.MessageListener;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.aliyun.openservices.ons.api.bean.ConsumerBean;
    import com.aliyun.openservices.ons.api.bean.Subscription;
    import com.vehicle.manager.core.listener.VehicleListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    /**
     * RocketMq消费者
     * @author zr 2024/3/1
     */
    @Configuration
    public class VehicleConsumerConfig {
        @Autowired
        private RocketMqConfig mqConfig;
    
        @Autowired
        private VehicleListener vehicleListener;
    
    
        @Bean(initMethod = "start", destroyMethod = "shutdown")
        public ConsumerBean buildVehicleBuyerConsumer() {
            ConsumerBean consumerBean = new ConsumerBean();
            //配置文件
            Properties properties = mqConfig.getMqPropertie();
            properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getVehicleGroupId());
            //将消费者线程数固定为20个 20为默认值
            properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
            consumerBean.setProperties(properties);
            //订阅关系
            Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();
            Subscription subscription = new Subscription();
            subscription.setTopic(mqConfig.getVehicleTopic());
            subscription.setExpression(mqConfig.getVehicleTag());
            subscriptionTable.put(subscription, vehicleListener);
            //订阅多个topic如上面设置
    
            consumerBean.setSubscriptionTable(subscriptionTable);
            return consumerBean;
        }
    }
    
    

    4. 生产者工具类

    • MessageRecord为记录消息发送的对象,可以自行根据字段进行设计调整
    • 参数说明:
      • topic – 消息主题, 最长不超过255个字符; 由a-z, A-Z, 0-9, 以及中划线"-“和下划线”_"构成.
      • tag – 消息标签, 请使用合法标识符, 尽量简短且见名知意
      • key – 业务主键
      • body – 消息体, 消息体长度默认不超过4M, 具体请参阅集群部署文档描述.
    package com.vehicle.manager.core.util;
    
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.SendResult;
    import com.aliyun.openservices.ons.api.bean.ProducerBean;
    import com.aliyun.openservices.ons.api.exception.ONSClientException;
    import com.vehicle.manager.core.config.RocketMqConfig;
    import com.vehicle.manager.core.mapper.MessageRecordMapper;
    import com.vehicle.manager.core.model.entity.MessageRecord;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import java.time.LocalDateTime;
    
    /**
     * RocketMessageProducer rocketMQ消息生产者
     * @author zr 2024/3/1
     */
    @Component
    @Slf4j
    public class RocketMessageProducer {
        private static ProducerBean producer;
        private static RocketMqConfig mqConfig;
    
        private  static MessageRecordMapper messageRecordMapper;
    
        @Autowired
        private  MessageRecordMapper messageRecordMapperInstance;
    
    
        @PostConstruct
        public void init() {
            RocketMessageProducer.messageRecordMapper = messageRecordMapperInstance;
        }
    
        public RocketMessageProducer(ProducerBean producer, RocketMqConfig mqConfig) {
            this.producer = producer;
            this.mqConfig = mqConfig;
        }
    
        /**
         * 生产车辆服务普通消息
         * @param tag
         * @param key
         * @param body
         */
        public  static void producerVehicleMsg(String tag, String key, String body) {
            Message msg = new Message(mqConfig.getVehicleTopic(), tag, key, body.getBytes());
            long time = System.currentTimeMillis();
            try {
                SendResult sendResult = producer.send(msg);
                assert sendResult != null;
                log.info(time
                        + " Send mq message success.Topic is:" + msg.getTopic()
                        + " Tag is:" + msg.getTag() + " Key is:" + msg.getKey()+" body is:"+new String(msg.getBody())
                        + " msgId is:" + sendResult.getMessageId());
    
                MessageRecord messageRecord = new MessageRecord();
                messageRecord.setPlatformType("mq");
                messageRecord.setMessageType("order");
                messageRecord.setMqMessageTopic(msg.getTopic());
                messageRecord.setMqMessageTag(msg.getTag());
                messageRecord.setMqMessageKey(msg.getKey());
                messageRecord.setMqMessageId(sendResult.getMessageId());
                messageRecord.setCreatedTime(LocalDateTime.now());
                messageRecord.setMessageContent(new String(msg.getBody()));
                messageRecordMapper.insert(messageRecord);
            } catch (ONSClientException e) {
                e.printStackTrace();
                log.error(time + " Send mq message failed. Topic is:" + msg.getTopic());
            }
        }
    
        /**
         * 生产车辆服务延时普通消息
         * @param tag  order:订单服务   vehicle:主要用于本服务的超时回应
         * @param key
         * @param body
         * @param delay 延迟秒
         */
        public  static void producerVehicleDelayMsg(String tag, String key, String body,Integer delay) {
            Message msg = new Message(mqConfig.getVehicleTopic(), tag, key, body.getBytes());
            long time = System.currentTimeMillis();
            msg.setStartDeliverTime(time+ delay*1000);
            try {
                SendResult sendResult = producer.send(msg);
                assert sendResult != null;
                log.info(time
                        + " 发送消息成功.Topic is:" + msg.getTopic()
                        + " Tag 为:" + msg.getTag() + " Key 为:" + msg.getKey()+" body 为:"+new String(msg.getBody())
                        + " msgId 为:" + sendResult.getMessageId());
            } catch (ONSClientException e) {
                e.printStackTrace();
                log.error(time + " Send mq message failed. Topic is:" + msg.getTopic());
            }
        }
    }
    
    

    5. 消费者监听

    package com.vehicle.manager.core.listener;
    
    import com.alibaba.fastjson.JSON;
    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.MessageListener;
    import com.vehicle.manager.core.model.dto.req.VehicleMQMessageDTO;
    import com.vehicle.manager.core.service.HlCarService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    
    /**
     * @author zr 2024/3/1
     */
    @Component
    @Slf4j
    public class VehicleListener implements MessageListener {
        @Autowired
        private HlCarService hlCarService;
    
    
        @Override
        public Action consume(Message message, ConsumeContext context) {
    
            log.info("VehicleReceive 消息: " + message);
    
            try {
                byte[] body = message.getBody();
                String s = new String(body);
                log.info(s);
                // VehicleMQMessageDTO需要自行根据业务封装
                VehicleMQMessageDTO vehicleMQMessageDTO = JSON.parseObject(s, VehicleMQMessageDTO.class);
                log.info(vehicleMQMessageDTO.toString());
    
                // 以下做你的业务处理
                // .........
                
                return Action.CommitMessage;//进行消息的确认
            } catch (Exception e) {
                log.info(e.getMessage());
                //消费失败
                return Action.ReconsumeLater;
            }
        }
    }
    
    

    6. 测试

    6.1 发送消息

    package com.vehicle.manager.core;
    
    import com.alibaba.fastjson.JSON;
    import com.vehicle.manager.api.StartApplication;
    import com.vehicle.manager.core.util.RocketMessageProducer;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    
    /**
     * @author zr 2024/3/1
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = StartApplication.class)
    public class MqTest {
        
        @Test
        public void producerMsg() {
            RocketMessageProducer.producerVehicleMsg("vehicle","test", JSON.toJSONString(new String("testBody")));
        }
    }
    
    

    6.2 接收消息

    image.png

    7. 延时消息

    如果需要使用延时消息可以参考RocketMessageProducer中有一个延时消息的方法producerVehicleDelayMsg

  • 相关阅读:
    CCF-CSP真题《202209-3—防疫大数据》思路+python题解
    26 | 信任始于握手:TLS1.2连接过程解析
    计算机网络实验 第四次 11月1日
    清晰易懂IoC
    【设计模式】概述
    力扣 895. 最大频率栈
    【uniapp小程序】—— 配置首页搜索框
    uniapp实现移动端的视频图片轮播组件
    最新AI创作系统ChatGPT源码/支持国内AI模型/支持GPT4.0/支持AI绘画
    Jetson平台180度鱼眼相机畸变校正调试记录
  • 原文地址:https://blog.csdn.net/qq_31745863/article/details/139743731