• 【RocketMQ生产者发送消息的三种方式:发送同步消息、异步消息、单向消息&案例实战&详细学习流程】


    一.知识回顾:

    【0.RocketMQ专栏的内容在这里哟,帮你整理好了,更多内容持续更新中】
    【1.Docker安装部署RocketMQ消息中间件详细教程】


    知识补充:
    在使用RocketMQ发送消息之前需要我们先完成之前RocketMQ的安装,然后在Maven导入依赖并熟悉生产者发送消息的API。

    xml文件中导入依赖

    <dependency>
    <groupId>org.apache.rocketmqgroupId>
    <artifactId>rocketmq-clientartifactId>
    <version>4.8.0version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    生产者发送消息步骤

    1. 创建消息生产者producer,并指定生产者组名
    2. 指定Nameserver地址
    3. 启动producer
    4. 创建消息对象,指定Topic、Tag和消息体
    5. 发送消息
    6. 关闭生产者producer

    二.RocketMQ发送同步消息

    2.1 同步消息

    同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求。这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
    在这里插入图片描述

    2.2 生产者同步发送消息实现代码

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    public class SyncProducer {
        public static void main(String[] args) throws Exception{
            // 实例化消息生产者Producer
            DefaultMQProducer producer = new DefaultMQProducer("group_test");
    
            // 设置NameServer的地址
            producer.setNamesrvAddr("IP地址:9876");
            //producer.setSendLatencyFaultEnable(true);
    
            // 启动Producer实例
            producer.start();
    
    
            for (int i = 0; i < 10; i++) {
                // 创建消息,并指定Topic,Tag和消息体
                Message msg = new Message("TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // 发送消息到一个Broker
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
            //如果不再发送消息,关闭Producer实例。
            producer.shutdown();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    2.3 生产者同步发送消息运行结果

    在这里插入图片描述
    运行结果相关参数分析

    1. msgId
      消息的全局唯一标识(RocketMQ的ID生成是使用机器IP和消息偏移量的组成),由消息队列 MQ 系统自动生成,唯一标识某条消息。
    2. sendStatus
      发送的标识:成功,失败等
    3. queueId
      queueId是Topic的分区;Producer发送具体一条消息的时,对应选择的该Topic下的某一个Queue的标识ID。
    4. queueOffset
      Message queue是无限长的数组。一条消息进来下标就会涨1,而这个数组的下标就是queueOffset,queueOffset是从0开始递增。

    2.4 生产者同步发送消息控制台展示

    在这里插入图片描述

    在这里插入图片描述

    三.RocketMQ发送异步消息

    3.1 异步消息

    异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。
    在这里插入图片描述

    3.2 生产者异步发送消息实现代码

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    public class AsyncProducer {
        public static void main(String[] args) throws Exception{
            // 实例化消息生产者Producer
            DefaultMQProducer producer = new DefaultMQProducer("group_test");
            // 设置NameServer的地址
            producer.setNamesrvAddr("IP地址:9876");
            // 启动Producer实例
            producer.start();
            for (int i = 0; i < 10; i++) {
                final int index = i;
                // 创建消息,并指定Topic,Tag和消息体
                Message msg = new Message("TopicTest", "TagA", "OrderID888",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // SendCallback接收异步返回结果的回调
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.printf("%s%n", sendResult);
                    }
                    @Override
                    public void onException(Throwable e) {
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            }
            Thread.sleep(10000);
            // 如果不再发送消息,关闭Producer实例。
            producer.shutdown();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    3.3 生产者异步发送消息运行结果

    在这里插入图片描述
    运行结果相关参数分析

    1. msgId
      消息的全局唯一标识(RocketMQ的ID生成是使用机器IP和消息偏移量的组成),由消息队列 MQ 系统自动生成,唯一标识某条消息。
    2. sendStatus
      发送的标识:成功,失败等
    3. queueId
      queueId是Topic的分区;Producer发送具体一条消息的时,对应选择的该Topic下的某一个Queue的标识ID。
    4. queueOffset
      Message queue是无限长的数组。一条消息进来下标就会涨1,而这个数组的下标就是queueOffset,queueOffset是从0开始递增。

    3.4 生产者异步发送消息控制台展示

    在这里插入图片描述

    四.RocketMQ发送单向消息

    4.1 单向消息

    这种方式主要用在不特别关心发送结果的场景,例如日志发送。单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
    在这里插入图片描述

    4.2 生产者单向发送消息代码实现

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    public class OnewayProducer {
        public static void main(String[] args) throws Exception{
            // 实例化消息生产者Producer对象
            DefaultMQProducer producer = new DefaultMQProducer("group_test");
            // 设置NameServer的地址
            producer.setNamesrvAddr("IP地址:9876");
            // 启动Producer实例
            producer.start();
            for (int i = 0; i < 10; i++) {
                // 创建消息,并指定Topic,Tag和消息体
                Message msg = new Message("TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // 发送单向消息,没有任何返回结果
                producer.sendOneway(msg);
    
            }
            // 如果不再发送消息,关闭Producer实例。
            producer.shutdown();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    4.3 生产者单向发送消息结果展示

    在这里插入图片描述

    4.4 生产者单向发送消息控制台展示

    在这里插入图片描述

    五.同步消息 vs 异步消息 vs 单向消息

    在这里插入图片描述

    好了,到这里【RocketMQ生产者发送消息的三种方式:发送同步消息、异步消息、单向消息&案例实战&详细学习流程】就先学习到这里,更多内容持续创作学习中,敬请期待呦^ - ^.

  • 相关阅读:
    PHP代码审计敏感函数合集
    Java Metrics系统性能监控工具
    Selenium环境+元素定位大法
    零日漏洞预防
    【DB运营管理/开发解决方案】上海道宁为您提供提高工作便利性的集成开发工具——Orange
    公钥密码学中的公钥和私钥
    【Flink源码】Flink心跳机制
    全面升级 | 两行代码,轻松搞定登录系统
    strcmp和stricmp,C 标准库 string.h
    vscode配置调用visual studio的编译和调试环境
  • 原文地址:https://blog.csdn.net/Coder_ljw/article/details/127667338