上一篇rocketMQ文章介绍相关概念与理论,接下来看下整合springboot使用
如果是集群就先创建Topic
> sh bin/mqadmin updateTopic -c DefaultCluster -t TopicTest -n 127.0.0.1:9876
create topic to 127.0.0.1:10911 success.
TopicConfig [topicName=TopicTest, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes=null]
<dependency>
<groupId>org.apache.rocketmqgroupId>
<artifactId>rocketmq-spring-boot-starterartifactId>
<version>2.2.0version>
dependency>
server:
port: 8086
#rocketmq配置信息
rocketmq:
#nameservice服务器地址(多个以英文逗号隔开)
name-server: 服务器ip:9876
#生产者配置 【这些值应放在.properties文件中!】
producer:
# #组名
group: shen-producer-group
# send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。
# compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
# max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B
# retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。
# retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。
# retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
# #目的地(topic:tag)
# #topic
# topic: shen-topic
# #sync tag(同步消息tag)
# sync-tag: shen-sync-tags
# #async tag(异步消息tag)
# async-tag: shen-async-tags
# #oneway tag(单向消息tag)
# oneway-tag: shen-oneway-tags
logging:
file:
path: E:\workspace\log\mq_product\
# path: /usr/log/mqproductservice/mqproductservice.log
level:
root: INFO
com.anran.projectmanage.mapper: DEBUG
# 不同类型的消息使用不同的topic
rocketmq.producer.topic: shen-topic
rocketmq.producer.group: shen-producer-group
# sync tag(同步消息tag)
rocketmq.producer.sync-tag: TagS
#async tag(异步消息tag)
rocketmq.producer.async-tag: TagA
#oneway tag(单向消息tag)
rocketmq.producer.oneway-tag: TagO
@RestController
@RequestMapping("/rocket")
public class RocketController {
@Value(value = "${rocketmq.producer.topic}:${rocketmq.producer.sync-tag}")
private String syncDestination;
@Value(value = "${rocketmq.producer.topic}")
private String syncTopic;
@Value(value = "${rocketmq.producer.sync-tag}")
private String syncTag;
@Autowired
private RocketMQTemplate rocketMQTemplate;
同步发送是最常用的方式,是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式,可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。
同步发送的整个代码流程如下:
/**
* 发送同步消息
* 消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式
* 可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。
*/
@PostMapping("/syncSend")
public String syncSend(@RequestBody UserContent user) {
// 发送消息前插入数据库
// 发送消息 tags在topic之后用:拼接 自动解析
SendResult sendResult = rocketMQTemplate.syncSend(syncTopic + ":" + syncTag, JSONObject.toJSONString(user));
System.out.printf("同步发送字符串%s,发送结果:%s", user.toString(), sendResult);
// 发送消息后插入数据库
// 解析发送结果
// if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
// response = sendResult.getMsgId() + " : " + sendResult.getSendStatus();
// }
return sendResult.getSendStatus().toString();
}
DefaultMQProducer 方式发送
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer("sync_group"); //(1)
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876"); //(2)
// 启动producer
producer.start();
for (int i = 0; i < 100; i++) {
// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
Message msg = new Message("sync_topic" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
); //(3)
// 利用producer进行发送,并同步等待发送结果
SendResult sendResult = producer.send(msg); //(4)
System.out.printf("%s%n", sendResult);
}
// 一旦producer不再使用,关闭producer
producer.shutdown();
}
}
同步发送方式请务必捕获发送异常,并做业务侧失败兜底逻辑,如果忽略异常则可能会导致消息未成功发送的情况。
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。
异步发送需要实现异步发送回调接口(SendCallback)。
消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息,发送方通过回调接口接收服务端响应,并处理响应结果。异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景。例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
RocketMQTemplate方式发送
/**
* 异步发送
* 发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息
*/
@PostMapping("asyncSend")
public void asyncSend(@RequestBody UserContent user) {
System.out.printf("async send begin %s%n", LocalDateTime.now());
rocketMQTemplate.asyncSend("shen-asyncSend:TagAsync", JSONObject.toJSONString(user), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 记录到数据库 成功
// MqLogSuccess mqLogSuccess = bulidMqLogSuccess(message, sendResult);
// mqLogSuccessMapper.insert(mqLogSuccess);
System.out.printf("异步发送成功%s%n", sendResult);
}
@Override
public void onException(Throwable e) {
// 记录到数据库 失败
// MqLogFail mqLogFail = bulidMqLogFail(message);
// mqLogFailMapper.insert(mqLogFail);
System.out.printf("异步发送失败%s%n", e.getMessage());
}
});
}
DefaultMQProducer 方式发送
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer("shen-asyncSend");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动producer
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
final int index = i;
// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
Message msg = new Message("async_topic",
"TagA",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 异步发送消息, 发送结果通过callback返回给客户端
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
// 一旦producer不再使用,关闭producer
producer.shutdown();
}
}
异步发送与同步发送代码唯一区别在于调用send接口的参数不同,异步发送不会等待发送返回,取而代之的是send方法需要传入 SendCallback 的实现,SendCallback 接口主要有onSuccess 和 onException 两个方法,表示消息发送成功和消息发送失败。
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
/**
* 单向模式发送
* 发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答
* 此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
*/
@PostMapping("/sendOneWay")
public void sendOneWay(@RequestBody UserContent user){
// 消息埋点
// MqLogAll mqLogAll = bulidMqLogAll(message);
// mqLogAllMapper.insert(mqLogAll);记录到数据库
// 发送消息 tags在topic之后用:拼接
rocketMQTemplate.sendOneWay("oneway_topic:TagOneWay", JSONObject.toJSONString(user));
System.out.println("sendOneWay发送成功");
}
DefaultMQProducer 方式发送
public class OnewayProducer {
public static void main(String[] args) throws Exception{
// 初始化一个producer并设置Producer group name
DefaultMQProducer producer = new DefaultMQProducer("shen-oneWay");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动producer
producer.start();
for (int i = 0; i < 100; i++) {
// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 由于在oneway方式发送消息时没有请求应答处理,如果出现消息发送失败,则会因为没有重试而导致数据丢失。若数据不可丢,建议选用可靠同步或可靠异步发送方式。
producer.sendOneway(msg);
}
// 一旦producer不再使用,关闭producer
producer.shutdown();
}
}
单向模式调用sendOneway,不会对返回结果有任何等待和处理。
顺序消息是一种对消息发送和消费顺序有严格要求的消息。
对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。在 Apache RocketMQ 中支持分区顺序消息,如下图所示。我们可以按照某一个标准对消息进行分区(比如图中的ShardingKey),同一个ShardingKey的消息会被分配到同一个队列中,并按照顺序被消费。
需要注意的是 RocketMQ 消息的顺序性分为两部分,生产顺序性和消费顺序性。只有同时满足了生产顺序性和消费顺序性才能达到上述的FIFO效果。
生产顺序性: RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。如需保证消息生产的顺序性,则必须满足以下条件:
满足以上条件的生产者,将顺序消息发送至服务端后,会保证设置了同一分区键的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:
顺序消息的应用场景也非常广泛,在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。
例如创建订单的场景,需要保证同一个订单的生成、付款和发货,这三个操作被顺序执行。如果是普通消息,订单A的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保持顺序,而顺序消息发送时将ShardingKey相同(同一订单号)的消息序路由到一个逻辑队列中。
顺序消息示例代码
class Producer {
public static void main(String[] args) throws UnsupportedEncodingException {
try {
DefaultMQProducer producer = new DefaultMQProducer("sequence_group");
producer.setNamesrvAddr(Constant.ROCKETMQ_SERVER_HOST_PORT);
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
Message msg =
new Message("sequence_topic", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ ".concat(i + "~~" + Utils.getFormatTime()).getBytes(RemotingHelper.DEFAULT_CHARSET)));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
}
如果一个Broker掉线,那么此时队列总数是否会发化?
如果发生变化,那么同一个 ShardingKey 的消息就会发送到不同的队列上,造成乱序。如果不发生变化,那消息将会发送到掉线Broker的队列上,必然是失败的。因此 Apache RocketMQ 提供了两种模式,如果要保证严格顺序而不是可用性,创建 Topic 是要指定 -o 参数(–order)为true,表示顺序消息:
> sh bin/mqadmin updateTopic -c DefaultCluster -t TopicTest -o true -n 127.0.0.1:9876
create topic to 127.0.0.1:10911 success.
TopicConfig [topicName=TopicTest, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=true, attributes=null]
其次要保证NameServer中的配置 orderMessageEnable
和 returnOrderTopicConfigToBroker
必须是 true。如果上述任意一个条件不满足,则是保证可用性而不是严格顺序。
延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。
在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的延时事件触发。使用 RocketMQ 的延时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。
Apache RocketMQ 一共支持18个等级的延迟投递,具体时间如下:
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// 设置NameServer的地址
producer.setNamesrvAddr(Constant.ROCKETMQ_SERVER_HOST_PORT);
// 启动生产者
producer.start();
int totalMessagesToSend = 5;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", (LocalDateTime.now()+",Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间[18个等级])
message.setDelayTimeLevel(3);
// 发送消息
producer.send(message);
}
// 关闭生产者
producer.shutdown();
}
}
DelayTimeLevel:消息延时级别,0 表示不延时,大于 0 会延时特定的时间才会被消费
延时消息的实现逻辑需要先经过定时存储等待触发,延时时间到达后才会被投递给消费者。因此,如果将大量延时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大,导致消息分发延迟,影响定时精度。
在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。
示例代码:
public class SimpleBatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
producer.start();
//If you just send messages of no more than 1MiB at a time, it is easy to use batch
//Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
producer.send(messages);
}
}
这里调用非常简单,将消息打包成 Collection msgs 传入方法中即可,需要注意的是批量消息的大小不能超过 1MiB(否则需要自行分割),其次同一批 batch 中 topic 必须相同。
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
/**
* 消息列表分割器:其只会处理每条消息的大小不超4M的情况【将一个消息集合中的消息分割为多个消息列表(不超4M)】
* 若存在某条消息,其本身大小大于4M,这个分割器无法处理,其直接将这条消息构成一个子列表返回。并没有再进行分割
* 本地SplitterProducer代码不支持单条超过大小限制,若只有一条并超过的限制大小会抛数组越界异常;若多条中其中有一条超过限制,则会丢失一条消息
*/
public class MessageListSplitter implements Iterator<List<Message>> {
// 指定极限值为4M
private final int SIZE_LIMIT = 4 * 1024 * 1024;
// private final int SIZE_LIMIT = 1024;
// 存放所有要发送的消息
private final List<Message> messages;
// 要进行批量发送消息的小集合起始索引
private int currIndex;
public MessageListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
// 判断当前开始遍历的消息索引要小于消息总数
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
// 记录当前要发送的这一小批次消息列表的大小
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
// 获取当前遍历的消息
Message message = messages.get(nextIndex);
// 统计当前遍历的message的大小
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
// Producer发送消息Message的结构:Topic + Body + Log(固定20字节) + Properties
tmpSize = tmpSize + 20;
// 判断当前消息本身是否大于4M
if (tmpSize > SIZE_LIMIT) {
if (nextIndex - currIndex == 0) {
nextIndex++;
}
break;
}
// 当前消息的大小 + 之前统计要发送这一小批次消息列表的大小 》极限值4M
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
// 统计要发送这一小批次消息列表的大小
totalSize += tmpSize;
}
} // end-for
// 获取当前messages列表的子集合[currIndex, nextIndex)
List<Message> subList = messages.subList(currIndex, nextIndex);
// 下次遍历的开始索引
currIndex = nextIndex;
return subList;
}
}
// 定义批量消息生产者
class BatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr(Constant.ROCKETMQ_SERVER_HOST_PORT);
// 指定要发送的消息的最大大小,默认是4M
// 不过,仅修改该属性是不行的,还需要同时修改broker加载的配置文件中的
// maxMessageSize属性
// producer.setMaxMessageSize(8 * 1024 * 1024);
producer.start();
// 定义要发送的消息集合
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 5; i++) {
if(i==3)
messages.add(new Message("TopicD", "TagA", "OrderID000", "Hello,RocketMQ 可以试着让该条消息超过限制大小~".getBytes()));
byte[] body = (LocalDateTime.now() + " Hi,RocketMQ " + i).getBytes();
Message msg = new Message("TopicD", "someTag", body);
messages.add(msg);
}
// 定义消息列表分割器,将消息列表分割为多个不超出4M大小的小列表
MessageListSplitter splitter = new MessageListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
在一些对数据一致性有强需求的场景,可以用 Apache RocketMQ 事务消息来解决,从而保证上下游数据的一致性。
以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:
整个事务消息的详细交互流程如下图所示:
import com.formiss.common.Constant;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.time.LocalDateTime;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.setNamesrvAddr(Constant.ROCKETMQ_SERVER_HOST_PORT);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
(LocalDateTime.now()+" Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
static class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
}
事务消息的发送不再使用 DefaultMQProducer,而是使用 TransactionMQProducer
进行发送,上述的例子中设置了事务回查的线程池,如果不设置也会默认生成一个,最重要的是需要实现 TransactionListener
接口,并传入 TransactionMQProducer
。
executeLocalTransaction
是半事务消息发送成功后,执行本地事务的方法,具体执行完本地事务后,可以在该方法中返回以下三种状态:
checkLocalTransaction是由于二次确认消息没有收到,Broker端回查事务状态的方法。回查规则:本地事务执行完成后,若Broker端收到的本地事务返回状态为LocalTransactionState.UNKNOW,或生产者应用退出导致本地事务未提交任何状态。则Broker端会向消息生产者发起事务回查,第一次回查后仍未获取到事务状态,则之后每隔一段时间会再次回查。
此外,需要注意的是事务消息的生产组名称 ProducerGroupName不能随意设置。事务消息有回查机制,回查时Broker端如果发现原始生产者已经崩溃崩溃,则会联系同一生产者组的其他生产者实例回查本地事务执行情况以Commit或Rollback半事务消息。
下一篇介绍消费者使用