【0.RocketMQ专栏的内容在这里哟,帮你整理好了,更多内容持续更新中】
【1.Docker安装部署RocketMQ消息中间件详细教程】
知识补充:
在使用RocketMQ发送消息之前需要我们先完成之前RocketMQ的安装,然后在Maven导入依赖并熟悉生产者发送消息的API。
xml文件中导入依赖
<dependency>
<groupId>org.apache.rocketmqgroupId>
<artifactId>rocketmq-clientartifactId>
<version>4.8.0version>
dependency>
生产者发送消息步骤
同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求。这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SyncProducer {
public static void main(String[] args) throws Exception{
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("group_test");
// 设置NameServer的地址
producer.setNamesrvAddr("IP地址:9876");
//producer.setSendLatencyFaultEnable(true);
// 启动Producer实例
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}

运行结果相关参数分析


异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class AsyncProducer {
public static void main(String[] args) throws Exception{
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("group_test");
// 设置NameServer的地址
producer.setNamesrvAddr("IP地址:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 10; i++) {
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest", "TagA", "OrderID888",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%s%n", sendResult);
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
Thread.sleep(10000);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}

运行结果相关参数分析

这种方式主要用在不特别关心发送结果的场景,例如日志发送。单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class OnewayProducer {
public static void main(String[] args) throws Exception{
// 实例化消息生产者Producer对象
DefaultMQProducer producer = new DefaultMQProducer("group_test");
// 设置NameServer的地址
producer.setNamesrvAddr("IP地址:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送单向消息,没有任何返回结果
producer.sendOneway(msg);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}



好了,到这里【RocketMQ生产者发送消息的三种方式:发送同步消息、异步消息、单向消息&案例实战&详细学习流程】就先学习到这里,更多内容持续创作学习中,敬请期待呦^ - ^.