• 48. 从零开始学springboot: 接入RocketMQ


    前言

    微服务的架构越来越流行, 很多老旧项目面临着解耦重构, 复杂项目的解耦通常会引入一些中间件来帮助我们更好的完成工作, 本章, 我们就来通过实例了解下消息中间件的用法.

    市面上比较流行的消息中间件如下
    image.png

    因为鱼哥的项目上了阿里的云, 所以选择很简单, 就用RocketMQ即可, 看官们根据实际情况择优选择.

    RocketMQ

    注意, 本文使用了4.0sdk,截止到文章发表, ali已推出5.0SDK
    关于阿里的RocketMQ的介绍官方有详细的文档, 这里就不啰嗦了

    官方已SDK提供了简单的接入方式, 需要注意的是, 官方提供了两种协议的SDK

    image.png

    • HTTP协议
      采用RESTful风格,方便易用,快速接入,跨网络能力强. 支持Java、C++、.NET、Go、Python、Node.js和PHP七种语言客户端

    • TCP协议
      区别于HTTP简单的接入方式,提供更为专业、可靠、稳定的TCP协议的SDK接入服务. 支持的语言包括Java、C/C++ 以及.NET

    注意, 使用的话需要自费开通对应的服务, 通过后才能使用, 大部分管理功能都能在阿里云的控制台中找到.

    实例

    下面我们就springboot来实际使用下RocketMQ.
    为了较好的使用, 我们同时演示tcp何http两种方式

    pom引入依赖

    
    
        com.aliyun.openservices
        ons-client
        ${ali-mq-tcp.version}
    
    
    
        com.aliyun.mq
        mq-http-sdk
        ${ali-mq-http.version}
        jar-with-dependencies
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    接入tcp方式

    定义mq配置类

    @Getter
    @Setter
    @Component
    @ConfigurationProperties(prefix = "aliyun.rocketmq.tcp")
    public class MqTcpProperties {
    
        //AccessKey ID,身份验证,在RAM控制台创建
        private String accessKeyId;
    
        //AccessKey Secret,身份验证,在RAM控制台创建
        private String accessKeySecret;
    
        //实例TCP协议接入地址(内网)
        private String nameSrvAddr;
    
        //发送超时时间: 3s
        private String sendMsgTimeoutMillis = "3000";
    
        //线程数目:默认20
        private String consumeThreadNums = "20";
    
        //订阅方式: 集群
        private String messageModel = "CLUSTERING";
    
    
        public Properties getMqProperties() {
            Properties properties = new Properties();
            properties.setProperty(PropertyKeyConst.AccessKey, this.accessKeyId);
            properties.setProperty(PropertyKeyConst.SecretKey, this.accessKeySecret);
            properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
            // 设置发送超时时间,单位毫秒
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.sendMsgTimeoutMillis);
            // 设置消费者线程数为20个(默认20)
            properties.setProperty(PropertyKeyConst.ConsumeThreadNums, this.consumeThreadNums);
            // 订阅方式设置
            properties.put(PropertyKeyConst.MessageModel, this.messageModel);
            return properties;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    定义Topic配置类

    @Getter
    @Setter
    @Configuration
    @ConfigurationProperties(prefix = "aliyun.rocketmq.tcp")
    public class MqTcpTopicProperties {
        private String topic;
        private String groupId;
        private String tag;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    封装一个工具类

    @Slf4j
    public class MqTcpUtil {
    
        @Autowired
        private ProducerBean producer;
    
        @Autowired
        private MqTcpProperties mqTcpProperties;
    
        @Autowired
        private MqTcpTopicProperties mqTcpTopicProperties;
    
        private ConsumerBean consumerBean;
    
        /**
         * 同步发送消息 - 配置默认topic
         *
         * @param msgTag      标签,可用于消息小分类标注
         * @param messageBody 消息body内容,生产者自定义内容
         * @param msgKey      消息key值,建议设置全局唯一,可不传,不影响消息投递
         * @return success:SendResult or error:null
         */
        public SendResult sendMsg(String msgTag, String messageBody, String msgKey) {
            Message msg = new Message(mqTcpTopicProperties.getTopic(), msgTag, msgKey, messageBody.getBytes());
            return this.send(msg, Boolean.FALSE);
        }
    
        /**
         * 同步发送消息 - 配置默认topic - 重试次数
         *
         * @param msgTag      标签,可用于消息小分类标注
         * @param messageBody 消息body内容,生产者自定义内容
         * @param msgKey      消息key值,建议设置全局唯一,可不传,不影响消息投递
         * @param retryTimes  重试次数,注意实际请求次数为 retryTimes + 1
         * @return success:SendResult or error:null
         */
        public SendResult sendMsg(String msgTag, String messageBody, String msgKey, Integer retryTimes) {
            Message msg = new Message(mqTcpTopicProperties.getTopic(), msgTag, msgKey, messageBody.getBytes());
            SendResult result = this.send(msg, Boolean.FALSE);
            if (ObjectUtil.isNotEmpty(result) || retryTimes == 0) {
                return result;
            }
            return this.sendMsg(msgTag, messageBody, msgKey, --retryTimes);
        }
    
        /**
         * 同步发送消息
         *
         * @param topic       topic名
         * @param msgTag      标签,可用于消息小分类标注
         * @param messageBody 消息body内容,生产者自定义内容
         * @param msgKey      消息key值,建议设置全局唯一,可不传,不影响消息投递
         * @return success:SendResult or error:null
         */
        public SendResult sendMsg(String topic, String msgTag, String messageBody, String msgKey) {
            Message msg = new Message(topic, msgTag, msgKey, messageBody.getBytes());
            return this.send(msg, Boolean.FALSE);
        }
    
        /**
         * 同步发送单向消息
         *
         * @param topic       topic名
         * @param msgTag      标签,可用于消息小分类标注
         * @param messageBody 消息body内容,生产者自定义内容
         * @param msgKey      消息key值,建议设置全局唯一,可不传,不影响消息投递
         */
        public void sendOneWayMsg(String topic, String msgTag, String messageBody, String msgKey) {
            Message msg = new Message(topic, msgTag, msgKey, messageBody.getBytes());
            this.send(msg, Boolean.TRUE);
        }
    
    
        /**
         * 发送普通消息
         *
         * @param msg      消息
         * @param isOneWay 是否单向发送
         */
        private SendResult send(Message msg, Boolean isOneWay) {
            try {
                if (isOneWay) {
                    //由于在 oneway 方式发送消息时没有请求应答处理,一旦出现消息发送失败,则会因为没有重试而导致数据丢失。
                    //若数据不可丢,建议选用同步或异步发送方式。
                    producer.sendOneway(msg);
                    success(msg, "单向消息MsgId不返回");
                    return null;
                } else {
                    //可靠同步发送
                    SendResult sendResult = producer.send(msg);
                    //获取发送结果,不抛异常即发送成功
                    assert sendResult != null;
                    success(msg, sendResult.getMessageId());
                    return sendResult;
                }
            } catch (Exception e) {
                error(msg, e);
                return null;
            }
        }
    
        /**
         * 成功日志打印
         *
         * @param msg
         * @param messageId
         */
        private void success(Message msg, String messageId) {
            log.info("发送MQ消息成功 -- Topic:{} ,msgId:{} , Key:{}, tag:{}, body:{}"
                    , msg.getTopic(), messageId, msg.getKey(), msg.getTag(), new String(msg.getBody()));
        }
    
        /**
         * 异常日志打印
         *
         * @param msg
         * @param e
         */
        private void error(Message msg, Exception e) {
            log.error("发送MQ消息失败-- Topic:{}, Key:{}, tag:{}, body:{}"
                    , msg.getTopic(), msg.getKey(), msg.getTag(), new String(msg.getBody()));
            log.error("errorMsg", e);
        }
    
    
        @PostConstruct
        public void init() {
            log.info("[Init] tcp consumerBean init");
            consumerBean = new ConsumerBean();
        }
    
        public ConsumerBean getConsumer() {
            return consumerBean;
        }
    
        public ConsumerBean getDefaultConsumer(MessageListener messageListener) {
            //配置文件
            Properties properties = mqTcpProperties.getMqProperties();
            //消费者
            properties.setProperty(PropertyKeyConst.GROUP_ID, mqTcpTopicProperties.getGroupId());
            //设置消费者线程数为20个(默认20)
            properties.setProperty(PropertyKeyConst.ConsumeThreadNums, mqTcpProperties.getConsumeThreadNums());
            // 广播订阅方式设置
            properties.put(PropertyKeyConst.MessageModel, mqTcpProperties.getMessageModel());
            consumerBean.setProperties(properties);
    
            //订阅消息
            Map subscriptionTable = new HashMap<>();
            //订阅消息
            Subscription smsSubscription = new Subscription();
            smsSubscription.setTopic(mqTcpTopicProperties.getTopic());
            smsSubscription.setExpression(mqTcpTopicProperties.getTag());
            subscriptionTable.put(smsSubscription, messageListener);
            consumerBean.setSubscriptionTable(subscriptionTable);
            return consumerBean;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157

    自动注入类

    /**
     * rocketMq服务自动配置类
     * 业务系统要使用rocketMq服务服务,需要在配置文件中增加如下配置
     * 【
     * aliyun.rocketmq.tcp.enable=true
     * aliyun.rocketmq.tcp.accessKeyId=
     * aliyun.rocketmq.tcp.accessKeySecret=
     * aliyun.rocketmq.tcp.nameSrvAddr=
     * aliyun.rocketmq.tcp.default.topic=
     * aliyun.rocketmq.tcp.default.groupId=
     * aliyun.rocketmq.tcp.default.tag=
     */
    @Slf4j
    @Configuration
    @EnableConfigurationProperties({MqTcpProperties.class, MqTcpTopicProperties.class})
    @ConditionalOnClass({ProducerBean.class, ConsumerBean.class})
    @ConditionalOnProperty(prefix = "aliyun.rocketmq.tcp", value = "enable", havingValue = "true")
    public class MqTcpDefaultAutoConfiguration {
    
        @Autowired
        private MqTcpProperties mqTcpProperties;
    
        @PostConstruct
        public void init() {
            log.info("[Auto Config] MqTcpDefaultAutoConfiguration loading......");
        }
    
        @Bean(initMethod = "start", destroyMethod = "shutdown")
        @ConditionalOnMissingBean
        public ProducerBean buildProducer() {
            ProducerBean producer = new ProducerBean();
            producer.setProperties(mqTcpProperties.getMqProperties());
            return producer;
        }
    
        @Bean
        public MqTcpUtil mqTcpUtil() {
            return new MqTcpUtil();
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    核心功能基本完成, 我们可以通过MqTcpUtil执行消息的发送了
    新增个测试类

    @Test
    public void mqTcpTest() {
        // 自定义一条body内容
        JSONObject body = new JSONObject();
        UUID uuid = UUID.randomUUID();
        body.put("notice", "这是一条tcp通知类信息");
        //同步发送消息-不带返回值的(一般使用该方法)
        log.info(String.valueOf(mqTcpUtil.sendMsg("topic", "tag_dev", body.toJSONString(), String.valueOf(uuid))));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    最后我们加上Ali RocketMq的配置即可

    ## AK
    aliyun.rocketmq.tcp.enable=true
    aliyun.rocketmq.tcp.accessKeyId=
    aliyun.rocketmq.tcp.accessKeySecret=
    
    ## TCP
    aliyun.rocketmq.tcp.nameSrvAddr=
    aliyun.rocketmq.tcp.sendMsgTimeoutMillis=3000
    aliyun.rocketmq.tcp.consumeThreadNums=20
    aliyun.rocketmq.tcp.messageModel=CLUSTERING
    
    ## topic
    aliyun.rocketmq.tcp.topic=
    aliyun.rocketmq.tcp.groupId=
    aliyun.rocketmq.tcp.tag=
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    消费者

    以上完成了核心功能的接入与开发, 我们已经能正常发送MQ消息了, 通俗点讲就是生产者就绪了, 我们还需要消费者去消费这些消息.

    合理的拆分是生产者和消费者分服务部署, 这里为了演示, 鱼哥就直接自产自销了.

    定义消费者监听器MqMessageListener

    @Slf4j
    @Component
    public class MqMessageListener implements MessageListener {
    
        @Override
        public Action consume(Message message, ConsumeContext context) {
            log.info("接收到MQ详细信息:{}", message);
            log.info("解析MQ-Body自定义内容:{}", new String(message.getBody()));
            try {
                //do something..
                return Action.CommitMessage;
            } catch (Exception e) {
                log.error("消费MQ消息失败,msgId:" + message.getMsgID() + ",ExceptionMsg:" + e.getMessage());
                return Action.ReconsumeLater;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    申明消费者类, 这里我们定义两种消费者, 一种是默认配置的, 一种是主定义配置的,两者选一即可

    @Component
    public class MqConsumerClient {
    
    //    @Autowired
    //    private MqTcpProperties mqTcpProperties;
    
        @Autowired
        private MqMessageListener mqMessageListener;
    
    //    @Autowired
    //    private MqTcpTopicProperties mqTcpTopicProperties;
    
        @Autowired
        private MqTcpUtil mqTcpUtil;
    
    //    //自定义消费者
    //    @Bean(initMethod = "start", destroyMethod = "shutdown")
    //    public ConsumerBean messageBuildConsumer() {
    //        ConsumerBean consumerBean = mqTcpUtil.getConsumer();
    //        //配置文件
    //        Properties properties = mqTcpProperties.getMqProperties();
    //        //消费者
    //        properties.setProperty(PropertyKeyConst.GROUP_ID, mqTopicProperties.getGroupId());
    //        //设置消费者线程数为20个(默认20)
    //        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
    //        consumerBean.setProperties(properties);
    //        //订阅消息
    //        Map subscriptionTable = new HashMap<>();
    //        // 广播订阅方式设置
    //        properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
    //        //订阅消息
    //        Subscription smsSubscription = new Subscription();
    //        smsSubscription.setTopic(mqTopicProperties.getTopic());
    //        smsSubscription.setExpression(mqTopicProperties.getTag());
    //        subscriptionTable.put(smsSubscription, mqMessageListener);
    //        consumerBean.setSubscriptionTable(subscriptionTable);
    //        return consumerBean;
    //    }
    
        // 默认消费者
        @Bean(initMethod = "start", destroyMethod = "shutdown")
        public ConsumerBean defaultConsumer() {
            ConsumerBean consumerBean = mqTcpUtil.getDefaultConsumer(mqMessageListener);
            return consumerBean;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    最后启动服务, 即可看到消息消费的输出信息

    image.png
    image.png

    接入http方式

    定义mq配置类

    @Getter
    @Setter
    @Configuration
    @ConfigurationProperties(prefix = "aliyun.rocketmq.http")
    public class MqHttpProperties {
    
        //AccessKey ID,身份验证,在RAM控制台创建
        private String accessKeyId;
    
        //AccessKey Secret,身份验证,在RAM控制台创建
        private String accessKeySecret;
    
        //实例TCP协议接入地址(内网)
        private String nameSrvAddr;
    
        //实例TCP协议接入地址(内网)
        private String sendMsgTimeoutMillis = "3000";
    
        //线程数目:默认20
        private String consumeThreadNums = "20";
    
        //线程数目:默认20
        private String messageModel = "CLUSTERING";
    
        public Properties getMqProperties() {
            Properties properties = new Properties();
            properties.setProperty(PropertyKeyConst.AccessKey, this.accessKeyId);
            properties.setProperty(PropertyKeyConst.SecretKey, this.accessKeySecret);
            properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
            // 设置发送超时时间,单位毫秒
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.sendMsgTimeoutMillis);
            // 设置消费者线程数为20个(默认20)
            properties.setProperty(PropertyKeyConst.ConsumeThreadNums, this.consumeThreadNums);
            // 广播订阅方式设置
            properties.put(PropertyKeyConst.MessageModel, this.messageModel);
            return properties;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    定义Topic配置类

    @Getter
    @Setter
    @Configuration
    @ConfigurationProperties(prefix = "aliyun.rocketmq.http")
    public class MqHttpTopicProperties {
        private String topic;
        private String groupId;
        private String tag;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    定义操作工具类

    @Slf4j
    public class MqHttpUtil {
    
        @Autowired
        private MqHttpProperties mqHttpProperties;
    
        @Autowired
        private MqHttpTopicProperties mqHttpTopicProperties;
    
        private MQProducer producer;
    
        private MQConsumer consumer;
    
        private MQClient mqClient;
    
        @PostConstruct
        public void init() {
            log.info("[Init] http producer init");
            mqClient = new MQClient(
                    // 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
                    mqHttpProperties.getNameSrvAddr(),
                    // AccessKey ID,身份验证,在阿里云RAM控制台创建。
                    mqHttpProperties.getAccessKeyId(),
                    // AccessKey Secret,身份验证,在阿里云RAM控制台创建。
                    mqHttpProperties.getAccessKeySecret()
            );
        }
    
        /**
         * 同步发送消息
         *
         * @param topic       topic名
         * @param msgTag      标签,可用于消息小分类标注
         * @param messageBody 消息body内容,生产者自定义内容
         * @param msgKey      消息key值,建议设置全局唯一,可不传,不影响消息投递
         * @return success:SendResult or error:null
         */
        public TopicMessage sendMsg(String topic, String msgTag, String messageBody, String msgKey) {
            producer = mqClient.getProducer(topic);
            TopicMessage msg = new TopicMessage(
                    messageBody.getBytes(),
                    msgTag);
            msg.setMessageId(msgKey);
    
            try {
                //可靠同步发送
                TopicMessage sendResult = producer.publishMessage(msg);
                //获取发送结果,不抛异常即发送成功
                assert sendResult != null;
                success(topic, msg);
                return sendResult;
    
            } catch (Exception e) {
                error(topic, msg, e);
                return null;
            }
        }
    
        public MQClient getMqClient() {
            return mqClient;
        }
    
        public MQConsumer getDefaultConsumer() {
            consumer = mqClient.getConsumer(mqHttpTopicProperties.getTopic(), mqHttpTopicProperties.getGroupId(), mqHttpTopicProperties.getTag());
            return consumer;
        }
    
    
        /**
         * 成功日志打印
         *
         * @param msg
         */
        private void success(String topic, TopicMessage msg) {
            log.info("发送MQ消息成功 -- Topic:{} ,msgId:{} , tag:{}, body:{}"
                    , topic, msg.getMessageId(), msg.getMessageTag(), msg.getMessageBodyString());
        }
    
        /**
         * 异常日志打印
         *
         * @param msg
         * @param e
         */
        private void error(String topic, TopicMessage msg, Exception e) {
            log.error("发送MQ消息失败-- Topic:{}, msgId:{}, tag:{}, body:{}"
                    , topic, msg.getMessageId(), msg.getMessageTag(), msg.getMessageBodyString());
            log.error("errorMsg", e);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89

    自动注入

    /**
     * rocketMq服务自动配置类
     * 业务系统要使用rocketMq服务服务,需要在配置文件中增加如下配置
     * 【
     * aliyun.rocketmq.http.enable=true
     * aliyun.rocketmq.http.accessKeyId=
     * aliyun.rocketmq.http.accessKeySecret=
     * aliyun.rocketmq.http.nameSrvAddr=
     * aliyun.rocketmq.http.default.topic=
     * aliyun.rocketmq.http.default.groupId=
     * aliyun.rocketmq.http.default.tag=
     */
    @Slf4j
    @Configuration
    @ConditionalOnProperty(prefix = "aliyun.rocketmq.http", value = "enable", havingValue = "true")
    public class MqHttpDefaultAutoConfiguration {
    
        @PostConstruct
        public void init() {
            log.info("[Auto Config] MqHttpDefaultAutoConfiguration loading......");
        }
    
        @Bean
        public MqHttpUtil mqHttpUtil() {
            return new MqHttpUtil();
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    最后测试下

        @Test
        public void mqHttpTest() {
            // 自定义一条body内容
            JSONObject body = new JSONObject();
            UUID uuid = UUID.randomUUID();
            body.put("notice", "这是一条http通知类信息");
            //同步发送消息-不带返回值的(一般使用该方法)
            log.info(String.valueOf(mqHttpUtil.sendMsg("topic", "tag_dev", body.toJSONString(), String.valueOf(uuid))));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    最后增加配置

    #***********************MQ-http*********************************
    ## AK
    aliyun.rocketmq.http.enable=true
    aliyun.rocketmq.http.accessKeyId=
    aliyun.rocketmq.http.accessKeySecret=
    
    ## HTTP
    aliyun.rocketmq.http.nameSrvAddr=
    aliyun.rocketmq.http.sendMsgTimeoutMillis=3000
    aliyun.rocketmq.http.consumeThreadNums=20
    aliyun.rocketmq.http.messageModel=CLUSTERING
    
    ## topic
    aliyun.rocketmq.http.topic=topic
    aliyun.rocketmq.http.groupId=group_dev
    aliyun.rocketmq.http.tag=tag_dev
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    运行测试
    image.png
    image.png

    项目地址

    注意: 案例需要配置你自己申请的RocketMQ配置,否则无法启动.

    https://github.com/MrCoderStack/SpringBootDemo/tree/master/sb-alibaba-rocketmq

    请关注我的订阅号

    订阅号.png

  • 相关阅读:
    异常处理(try,catch,finally)
    【EMQX】2.1.4 EMQ X 有哪些产品
    UMA 2 - Unity Multipurpose Avatar☀️三.给UMA设置默认服饰Recipes
    【SA8295P 源码分析】98 - MARVELL 88Q5152 Switch 芯片介绍
    Unity减少发布打包文件的体积——获取精灵图片的信息限制它的大小
    Java 全新生态的框架,Solon v1.10.12 发布
    java接口返回图片或pdf如何设置在线预览还是下载
    【JMeter入门】—— JMeter介绍
    智能家居—ESP32开发环境搭建
    UE4贴图自适应屏幕大小
  • 原文地址:https://blog.csdn.net/MrCoderStack/article/details/126287453