• Spring Boot对接RocketMQ示例


    部署服务

    参考RocketMq入门介绍

    示例

    引入maven依赖

    <dependency>
                <groupId>org.apache.rocketmqgroupId>
                <artifactId>rocketmq-spring-boot-starterartifactId>
                <version>2.2.2version>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    完整依赖如下:

    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0modelVersion>
        <parent>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-parentartifactId>
            <version>2.7.9version>
            <relativePath/> 
        parent>
        <groupId>com.examplegroupId>
        <artifactId>rocketMqDemoartifactId>
        <version>0.0.1-SNAPSHOTversion>
        <name>rocketMqDemoname>
        <description>rocketMqDemodescription>
        <properties>
            <java.version>8java.version>
        properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-webartifactId>
            dependency>
    
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-testartifactId>
                <scope>testscope>
            dependency>
    
            <dependency>
                <groupId>org.apache.rocketmqgroupId>
                <artifactId>rocketmq-spring-boot-starterartifactId>
                <version>2.2.2version>
            dependency>
        dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.bootgroupId>
                    <artifactId>spring-boot-maven-pluginartifactId>
                plugin>
            plugins>
        build>
    
    project>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    修改application.properties文件

    配置文件如下:

    rocketmq.name-server=127.0.0.1:9876
    rocketmq.producer.group=my-group
    
    rocketmq.consumer.topic=test-topic
    
    • 1
    • 2
    • 3
    • 4

    所有的配置参考RocketMQProperties源码中配置。
    rocketmq.name-server:服务地址
    rocketmq.producer.group:生产者的组名称
    rocketmq.consumer.topic:消费者的主题名称

    定义生产者

    生产者是 Apache RocketMQ 系统中用来构建并传输消息到服务端的运行实体。

    生产者通常被集成在业务系统中,将业务消息按照要求封装成 Apache RocketMQ 的消息(Message)并发送至服务端。

    生产者和主题的关系为多对多关系,即同一个生产者可以向多个主题发送消息,对于平台类场景如果需要发送消息到多个主题,并不需要创建多个生产者;同一个主题也可以接收多个生产者的消息,以此可以实现生产者性能的水平扩展和容灾。
    在这里插入图片描述
    代码示例如下:

    @Component
    public class RocketMqProducer {
    
        @Resource
        private RocketMQTemplate rocketMQTemplate;
    
        /**
         * 发送同步消息
         * @param msg
         */
        public void sendSyncMsg(String msg){
            rocketMQTemplate.convertAndSend("test-topic-1", msg);
        }
    
        /**
         * 发送Spring消息
         * @param msg
         */
        public void sendSpringMsg(String msg){
            rocketMQTemplate.send("test-topic-1"
                    , MessageBuilder.withPayload(msg).build());
        }
    
        /**
         * 发送异步消息
         * @param msg
         */
        public void sendAsyncMsg(String msg){
            rocketMQTemplate.asyncSend("test-topic-1", new MsgBean(msg), new SendCallback() {
                @Override
                public void onSuccess(SendResult var1) {
                    System.out.printf("async onSucess SendResult=%s %n", var1);
                }
    
                @Override
                public void onException(Throwable var1) {
                    System.out.printf("async onException Throwable=%s %n", var1);
                }
    
            });
        }
    
        /**
         * 发送有序消息
         * @param msg
         */
        public void sendOrderlyMsg(String msg){
            rocketMQTemplate.syncSendOrderly("test-topic-1",MessageBuilder.withPayload(msg).build(),"hashkey");
    
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    定义消费者

    消费者是 Apache RocketMQ 中用来接收并处理消息的运行实体。 消费者通常被集成在业务系统中,从 Apache RocketMQ 服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。

    在消息消费端,可以定义如下传输行为:

    • 消费者身份:消费者必须关联一个指定的消费者分组,以获取分组内统一定义的行为配置和消费状态。

    • 消费者类型:Apache RocketMQ 面向不同的开发场景提供了多样的消费者类型,包括PushConsumer类型、SimpleConsumer类型、PullConsumer类型(仅推荐流处理场景使用)等。具体信息,请参见消费者分类。

    • 消费者本地运行配置:消费者根据不同的消费者类型,控制消费者客户端本地的运行配置。例如消费者客户端的线程数,消费并发度等,实现不同的传输效果。
      在这里插入图片描述

    代码示例如下:

    @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-group")
    @Component
    public class RocketMqConsumer implements RocketMQListener<String> {
        public void onMessage(String message) {
            System.out.println("received message: "+ JSON.toJSONString(message));
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    定义Controller调用消费者

    代码示例如下:

    @Controller
    public class RestController {
        @Autowired
        RocketMqProducer producer;
        
        @RequestMapping(value = "/sendSyncMsg")
        @ResponseBody
        public String sendSyncMsg(){
            producer.sendSyncMsg("hello word");
            return "ok";    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
  • 相关阅读:
    [网鼎杯 2020 朱雀组]Nmap
    SSM+智慧养老服务平台 毕业设计-附源码211709
    视频号小店发展前景怎么样?
    Spark SQL自定义collect_list分组排序
    找工作小项目:day16-重构核心库、使用智能指针(1)
    Spring Security(三):直观体验OAuth2.0
    2022.6.28 Linux——线程安全
    数据库迁移-国产化-Oracle迁移至GBase8a(存储过程)
    DELTA热金属检测器维修V5G-JC-R1激光测量传感器/检测仪原理分析
    Matlab随机波动率SV、GARCH用MCMC马尔可夫链蒙特卡罗方法分析汇率时间序列
  • 原文地址:https://blog.csdn.net/Luck_gun/article/details/136285354