• SpringBoot集成Apache RocketMQ详解



    在这里插入图片描述

    0. 前言

    上个章节我们学习了RocketMQ的学习环境安装,讲了两种安装方式 1. docker使用官方镜像安装,2.使用源码方式安装。安装教程如下
    如果已经安装了RocketMQ 学习环境可以略过此章节《【实践篇(一)】RocketMQ入门之学习环境搭建》
    本章节,我们学习Spring Boot 集成Apache RocketMQ。并验证 在SpringBoot应用中展示如何使用Apache RocketMQ的生产者(Producer)进行消息发送。
    这段代码实现了以下类型的消息发送:
    使用Apache RocketMQ 官方的依赖库 RocketMQTemplate,实现同步、异步等消息。

    1. 同步消息:使用syncSend方法,生产者会等待消息服务器回复确认后才会继续发送下一条消息。

    2. 异步消息:使用asyncSend方法,生产者发送消息后不等待服务器回复,直接发送下一条消息。

    3. 单向消息:使用sendOneWay方法,生产者只负责发送消息,不等待服务器回复,也不关注发送结果。

    4. 顺序消息:使用sendOrderly方法,按照消息的发送顺序进行消费(First-In-First-Out)。

    5. 延迟消息:使用sendDelayed方法,消息被发送后,不会立即被消费,等待特定的延迟时间后,才能被消费。

    6. 批量消息:使用sendBatch方法,一次发送多条消息,可以有效提高发送的吞吐量。

    关于RocketMQ消息的消息模型介绍和使用,我专门写了一篇博客,搭建可以了解
    《RocketMQ 消息传递模型》https://blog.csdn.net/wangshuai6707/article/details/132863088

    1. Spring Boot 集成Apache RocketMQ详细步骤

    1.1.添加依赖

    在SpringBoot项目的pom.xml文件中添加RocketMQ的依赖:

    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0modelVersion>
    	<parent>
    		<groupId>org.springframework.bootgroupId>
    		<artifactId>spring-boot-starter-parentartifactId>
    		<version>2.7.15version>
    		<relativePath/>
    	parent>
    	<groupId>com.icepip.projectgroupId>
    	<artifactId>springboot-icepip-rocketMQ-exampleartifactId>
    	<version>0.0.1-SNAPSHOTversion>
    	<name>springboot-icepip-rocketMQ-examplename>
    	<description>Spring boot 集成rocketMQ 示例description>
    	<properties>
    		<java.version>8java.version>
    	properties>
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.bootgroupId>
    			<artifactId>spring-boot-starter-webartifactId>
    		dependency>
    		<dependency>
    			<groupId>org.apache.rocketmqgroupId>
    			<artifactId>rocketmq-spring-boot-starterartifactId>
    			<version>2.0.4version>
    		dependency>
    
    		<dependency>
    			<groupId>org.projectlombokgroupId>
    			<artifactId>lombokartifactId>
    			<optional>trueoptional>
    		dependency>
    		<dependency>
    			<groupId>org.springframework.bootgroupId>
    			<artifactId>spring-boot-starter-testartifactId>
    			<scope>testscope>
    		dependency>
    		<dependency>
    			<groupId>org.springframework.bootgroupId>
    			<artifactId>spring-boot-starter-actuatorartifactId>
    		dependency>
    
    		<dependency>
    			<groupId>org.springframework.bootgroupId>
    			<artifactId>spring-boot-starter-thymeleafartifactId>
    		dependency>
    
    	dependencies>
    
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.bootgroupId>
    				<artifactId>spring-boot-maven-pluginartifactId>
    				<configuration>
    					<excludes>
    						<exclude>
    							<groupId>org.projectlombokgroupId>
    							<artifactId>lombokartifactId>
    						exclude>
    					excludes>
    				configuration>
    			plugin>
    		plugins>
    	build>
    
    project>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    1.2.配置RocketMQ

    application.properties文件中配置RocketMQ的相关信息:

    rocketmq.name-server=你的RocketMQ服务IP:9876
    rocketmq.producer.group=my-producer
    # 刚开始未配置 导致超时报错
    rocketmq.producer.sendMessageTimeout=10000
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    1.3.创建消息生产者(Producer)

    package com.icepip.project.mqtt.controller;
    
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.ArrayList;
    import java.util.List;
    /**
     *  SpringBoot集成Apache RocketMQ详解
     * @author 冰点
     * @version 1.0.0
     * @date 2023/9/9 17:02
     */
    
    @RestController
    @RequestMapping("/producer")
    public class ProducerController {
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
    
        /**
         * 同步发送消息到指定主题
         * @param message
         * @return
         */
        @GetMapping("/syncSend")
        public String syncSend(String message) {
            // 同步发送消息到指定主题
            rocketMQTemplate.syncSend("test-topic", message);
            return "Sync message: " + message + " sent";
        }
        /**
         * 异步发送消息到指定主题
         * @param message
         * @return
         */
        @GetMapping("/asyncSend")
        public String asyncSend(String message) {
            // 异步发送消息到指定主题
            rocketMQTemplate.asyncSend("test-topic", MessageBuilder.withPayload(message).build(), new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("Async message sent successfully, result: " + sendResult);
                }
    
                @Override
                public void onException(Throwable throwable) {
                    System.err.println("Failed to send async message: " + throwable.getMessage());
                }
            }, 3000, 3); // 3000 ms timeout, delay level 3
    
            return "Async message: " + message + " sent";
        }
    
        /**
         * 发送单向消息到指定主题,无需等待Broker的确认
         * @param message
         * @return
         */
        @GetMapping("/sendOneWay")
        public String sendOneWay(String message) {
            // 发送单向消息到指定主题,无需等待Broker的确认
            rocketMQTemplate.sendOneWay("test-topic", message);
            return "OneWay message: " + message + " sent";
        }
    
        // 发送顺序消息
        @GetMapping("/sendOrderly")
        public String sendOrderly(String message) {
            // 发送顺序消息到指定主题
            rocketMQTemplate.syncSendOrderly("test-topic", message, "order");
            return "Orderly message: " + message + " sent";
        }
    
        // 发送延迟消息
        @GetMapping("/sendDelayed")
        public String sendDelayed(String message) {
            // 发送延迟消息到指定主题,延迟级别为3
            rocketMQTemplate.syncSend("test-topic", MessageBuilder.withPayload(message).build(), 1000, 3);
            return "Delayed message: " + message + " sent";
        }
    
        // 发送批量消息
        @GetMapping("/sendBatch")
        public String sendBatch() {
            List<String> messages = new ArrayList<>();
            messages.add("message1");
            messages.add("message2");
            // 批量发送消息到指定主题
            rocketMQTemplate.syncSend("test-topic", messages);
            return "Batch messages sent";
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102

    1.4.创建消息消费者(Consumer)

    package com.icepip.project.mqtt.handler;
    
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Service;
    
    /**
     * 定义一个消费者,监听test-topic主题的消息
     * @author 冰点
     * @version 1.0.0
     * @date 2023/9/9 16:29
     */
    
    @Service
    @RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-consumer_test-topic")
    public class MyConsumer implements RocketMQListener<String>{
    
        // 当收到消息时,该方法将被调用
        @Override
        public void onMessage(String message) {
            System.out.println("Received message: "+ message);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    2. 测试验证

    在这里插入图片描述
    在这里插入图片描述

    3. 常见报错

    1. See http://rocketmq.apache.org/docs/faq/ for further details.; nested exception is org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [6386]ms, Topic: test-topic, BrokersSent: [698f11314447, 698f11314447, 698f11314447]
      See http://rocketmq.apache.org/docs/faq/ for further details.] with root cause

    org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.0.8:10911> failed
    解决办法,修改Broker的IP为宿主机IP
    进容器修改配置文件,修改完启动服务 。启动之前先kill 掉容器里原来的Broker。
    nohup sh mqbroker -c /home/rocketmq/rocketmq-4.9.2/broker.conf &
    在这里插入图片描述

    4. 参考文档

    1. 官方文档链接:https://rocketmq.apache.org/docs/

    2. GitHub链接:https://github.com/apache/rocketmq-spring

    5. 源码地址

    我的github https://github.com/wangshuai67/icepip-springboot-action-examples

  • 相关阅读:
    动手学习深度学习 09:循环神经网络
    第四章 数据库安全性
    日志、logback、logback.xml --java学习笔记
    Windows密码凭证获取学习
    函数栈帧的形成与释放
    Nginx多出口IP解决代理端口数量限制,CentOS安装Nginx并开启https2.0
    【Codeforces】 CF1097G Vladislav and a Great Legend
    cpu温度监测 Turbo Boost Switcher Pro for mac最新
    【概率论与数理统计】计算机保研复习
    Oracle 19c RAC OS的准备检查阶段
  • 原文地址:https://blog.csdn.net/wangshuai6707/article/details/132778960