• springcloudalibaba架构(23):RocketMQ普通消息和顺序消息


    前言

    RocketMQ发送不同的普通消息和顺序消息。

    1. 发送消息的方式

    RocketMQ提供了三种方式发送普通消息:可靠同步发送、可靠异步发送和单向发送。

    • 可靠同步发送
      同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。
      此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
    • 可靠异步发送
      异步发送是指发送方发送数据后,不等接收方发送响应,接着发送下个数据包的通讯方式。发送方通过回调接口接收服务器响应,并对响应结果进行处理。
      异步发送一般用于链路耗时较长,对RT响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
    • 单向发送
      单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。适用于某些耗时非常短,但对可靠性要求不高的场景,例如日志收集。

    2. 配置和依赖

    1. 引入依赖
      接下来RocketMQ单元测试需要引用的依赖
    <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-testartifactId>
            dependency>
            
            <dependency>
                <groupId>org.apache.rocketmqgroupId>
                <artifactId>rocketmq-spring-boot-starterartifactId>
                <version>2.2.2version>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    1. 配置rocketmq
    rocketmq:
      name-server: localhost:9876
      producer:
        group: group_rocketmq #需要指定默认的组名
    
    • 1
    • 2
    • 3
    • 4

    3. 发送普通消息

    3.1 发送同步消息

    对于同步消息,我们可以获取到发送结果。

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class MessageTypeTest {
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        @Test
        public void testSyncSend(){
            SendResult result = rocketMQTemplate.syncSend("topic-1", "同步测试消息", 10000);
            System.out.println(result);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    运行测试用例,查看RocketMq-console控制台。
    在这里插入图片描述
    在这里插入图片描述

    添加tag标签

    SendResult result = rocketMQTemplate.syncSend("topic-1:mytag", "同步测试消息", 10000);
    
    • 1

    在这里插入图片描述
    在这里插入图片描述

    3.2 发送异步消息

    发送异步消息,通过回调方式获取发送结果。

    @Test
        public void testAsyncSend() throws Exception {
            rocketMQTemplate.asyncSend("topic-1:mytag", "异步测试消息", new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("成功回调消息");
                    System.out.println(sendResult);
                }
    
                @Override
                public void onException(Throwable throwable) {
                    System.out.println("回调发生异常了");
                    System.out.println(throwable);
                }
            });
            System.out.println("-----------------");
            TimeUnit.SECONDS.sleep(300);//避免主线程死掉无法获取回调结果
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    运行测试用例
    在这里插入图片描述
    在这里插入图片描述

    3.3 单向异步消息

    直接发送消息,不等待结果,不保证消息一定送达,不回调。

     @Test
        public void testSendOneWay(){
            //不回调,不等待
            rocketMQTemplate.sendOneWay("topic-1:mytag","异步单向消息");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    运行测试用例
    在这里插入图片描述

    在这里插入图片描述

    4. 顺序消息

    我们查看前面已经发送的普通消息(主题topic-1),可以看到它有4个队列,消费者在消费时就没办法保证消息的顺序性。
    在这里插入图片描述

    4.1 发送单向顺序消息

    通过sendOneWayOrderly方法,将消息发送到同一个队列,即可实现消息的顺序性。

     @Test
        public void testSendOneWayOrderly(){
            for (int i=0;i<10;i++){
                //第三个参数的作用是用来决定这些消息发送到哪个队列
                rocketMQTemplate.sendOneWayOrderly("topic-1","异步单向顺序消息","xx");
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    运行发送消息后,查看RocketMq-console控制台,可以看到消息进入了同一个队列。
    在这里插入图片描述

    4.2 同步顺序消息和异步顺序消息

    同理,同步和异步的消息发送也提供了顺序发送的API,使用方式完全一样。
    在这里插入图片描述

  • 相关阅读:
    【好文鉴赏】面试官说你回答的不够深入,怎么办?
    [深度学习]yolov10+bytetrack+pyqt5实现目标追踪
    vim的IDE进阶之路
    方法引用知识点
    dubbo环境搭建ZooKeeper注册中心
    【小沐学NLP】关联规则分析Apriori算法(Mlxtend库,Python)
    【机器学习】使用scikitLearn对数据进行降维处理:PCA法及增量训练
    在Github上封神的JDK源码,看完竟吊打了面试官,厉害了
    ubuntu小技巧30--23.10桌面版安装钉钉启动报错undefined symbol: FT_Get_Color_Glyph_Layer
    JavaWeb后端
  • 原文地址:https://blog.csdn.net/u011628753/article/details/126321129