暂时之后再进行更新
Producer:消息的发送者;举例:发信者
Consumer:消息接收者;举例:收信者
Broker:暂存和传输消息;举例:邮局
NameServer:管理Broker;举例:各个邮局的管理机构
Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息
类似于 rabbitmq 的 路由键 ( topic交换机 )
Message Queue:相当于是Topic的分区;用于并行发送和接收消息
主题包含多个标签,例如主题春节,标签可以是放鞭炮,团年饭
同一个消费组,给不同的消费者设置不同的tag时,后启动的消费者会覆盖先启动的消费者设置的tag。
生产消息
1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息 (这里可以使用发送异步消息 )
6.关闭生产者producer
消费消息
1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer
ps: 这里的 消费者和rabbitmq 有很大的不同的, rabbitmq 是从队列中取出消费消费 , 这里是从订阅主题消费消息 ,和redis 消费消息的方式很像
下面的 案例 分组可以不同 “g1”组的消费者(DefaultMQPushConsumer) 可以接收 “g2”组的生产者发送的 消息 , tag和topic 必须相同才能接收消息 指定唯一消费路径
//发送同步消息
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("g1");
// 设置NameServer的地址
producer.setNamesrvAddr("116.205.161.47:9876");
//增大超时时间,防止超时
producer.setSendMsgTimeout(1000000);
// 启动Producer实例
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.println("%s%n"+ sendResult);
System.out.println("发送状态:"+sendResult.getSendStatus());
System.out.println("发送消息的id:"+sendResult.getMsgId());
System.out.println("消息队列的id: "+sendResult.getMessageQueue().getQueueId());
Thread.sleep(1000L);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
这里要注意 ,需要防止主线程先关闭 导致Topic 被删除,消息找不到topic 发送失败咯~~
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("g2");
// 设置NameServer的地址
producer.setNamesrvAddr("116.205.161.47:9876");
//增大超时时间,防止超时
producer.setSendMsgTimeout(1000000);
// 启动Producer实例
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 10; i++) {
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
// 防止主线程先关闭!!!
Thread.sleep(60000);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
单向消息 是不需要关心结果的消息 ,例如日志收集
public class OnewayProducer {
public static void main(String[] args) throws Exception{
// 实例化生产者
DefaultMQProducer producer = new DefaultMQProducer("g3");
// 设置NameServer的地址
producer.setNamesrvAddr("116.205.161.47:9876");
//增大超时时间,防止超时
producer.setSendMsgTimeout(1000000);
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("topic", "hi~~ one way message".getBytes(StandardCharsets.UTF_8));
producer.sendOneway(message);
}
producer.shutdown();
}
}
这里的
-------- 消费者1
/*
* 1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer
* */
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g1");
// 设置NameServer的地址
consumer.setNamesrvAddr("116.205.161.47:9876");
//增大超时时间,防止超时
consumer.setConsumeTimeout(1000000);
// 消费 tag
consumer.subscribe("topic","tag");
// 设置回调函数 用来接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println(consumeConcurrentlyContext);
//返回消费 成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
------------ 消费者2
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g1");
// 设置NameServer的地址
consumer.setNamesrvAddr("116.205.161.47:9876");
//增大超时时间,防止超时
consumer.setConsumeTimeout(1000000);
// 消费 tag
consumer.subscribe("TopicTest","TagA");
// 设置回调函数 用来接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
log.info("我是消费者2 ,我消费了消息:{}",messageExt.getMsgId());
byte[] body = messageExt.getBody();
System.out.println(new String(body));
}
System.out.println(consumeConcurrentlyContext);
//返回消费 成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
log.info(" ");
}
topic 和tag 相同 这里默认是轮询的接收消息 消费者1 接收1 3 5 ,消费者2 就接收2 4 6 …
@Slf4j
public class Consumer02 {
/*
* 1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer
* */
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g1");
// 设置NameServer的地址
consumer.setNamesrvAddr("116.205.161.47:9876");
//增大超时时间,防止超时
consumer.setConsumeTimeout(1000000);
//设置广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
// 消费 tag
consumer.subscribe("TopicTest","TagA");
// 设置回调函数 用来接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
log.info("我是消费者2 ,我消费了消息:{}",messageExt.getMsgId());
byte[] body = messageExt.getBody();
System.out.println(new String(body));
}
System.out.println(consumeConcurrentlyContext);
//返回消费 成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
log.info(" ");
}
}
这里十条消息 有两个消费者 消费者1 (默认消息接收方式(默认轮询)). 消费者2(广播接收消息模式)
消费者1:接收到了 8 4 6 5 9 0 7
消费者2 :接收了 1–10 全部消息
继续实验
这里有3个消费者
消费者1 消费者3 是默认轮询模式
消费者2 是广播模式
消费者1 3 平分了消息
消费者2 接收了所有的消息
这里出现了个小bug,,,, 消费者1 和消费者2 加起来一共才消费了8 条消息 , 消费者3接收了10条消息
第二次实验 消息9 不见了…
实验3 ,关闭广播模式 的消费者 ,两个消费者不仅能正常监听到消息,还能把之前未监听到的消息 重新监听到…
总结:当 轮询消费者和 广播消费者 混一起监听同一个topic 下的 tag的时候 ,广播模式的消费者可以监听全部的消息 ,
所有 轮询模式消费者 加起来还是会有几条消息没有监听到 ,,,所以 rabbitmq 中的手动ack加消息回退 是多么的重要!!!(题外话)
原始 , 消息的发送者会把消息 发送到各个不同的队列 ,消费者也是 同时监听所有队列
局部消息顺序 ,将 具有顺序性的消息发送给一个队列 , 消费者专门开辟一条线程监听那个队列 ,这样就能实现消息的顺序性
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
// 消息的生产者
// 按照订单号 的不同 订单消息进入不同的 消息队列 (类比一下 哈希取余 )
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException,
InterruptedException {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("g2");
// 设置NameServer的地址
producer.setNamesrvAddr("116.205.161.47:9876");
//增大超时时间,防止超时
producer.setSendMsgTimeout(1000000);
// 启动Producrde
producer.start();
//发送消息
List<OrderStep> orderSteps = OrderStep.buildOrders();
//构建消息集合 根据订单id 有选择的去发
for (int i=0;i<orderSteps.size();i++) {
byte[] bytes = JSON.toJSONString(orderSteps.get(i)).getBytes(StandardCharsets.UTF_8);
Message message = new Message("orderTopic","order","i:"+i,bytes);
//new MessageQueueSelector() :消息队列的选择器 ,根据消息的业务标识 实现方法,这里只要订单id 一样,进入的队列就是一样的
// orderId: 业务标识
producer.send(message, new MessageQueueSelector() {
/*
* list:队列集合
* message:消息对象
* arg :业务标识参数 === orderSteps.get(i).getOrderId()
* */
// 选择队列的方法
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object arg) {
long orderId = (long) arg; // 订单id
long index = orderId % list.size();// 消息被发送的队列
return list.get((int) index);
}
},orderSteps.get(i).getOrderId());
}
producer.shutdown();
}
// 消费者
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g1");
// 设置NameServer的地址
consumer.setNamesrvAddr("116.205.161.47:9876");
//增大超时时间,防止超时
consumer.setConsumeTimeout(1000000);
// 消费 tag
consumer.subscribe("orderTopic","*");
//注册监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(" ThreadName:"+Thread.currentThread().getName()+" QueueID:"+msg.getQueueId()
+",获得的消息是:"+new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("消费者启动");
}
}
打印的结果 (我 区分了一下)
ThreadName:ConsumeMessageThread_1 QueueID:3,获得的消息是:{"desc":"创建","orderId":1039}
ThreadName:ConsumeMessageThread_1 QueueID:3,获得的消息是:{"desc":"付款","orderId":1039}
ThreadName:ConsumeMessageThread_1 QueueID:3,获得的消息是:{"desc":"推送","orderId":1039}
ThreadName:ConsumeMessageThread_1 QueueID:3,获得的消息是:{"desc":"完成","orderId":1039}
ThreadName:ConsumeMessageThread_1 QueueID:3,获得的消息是:{"desc":"创建","orderId":7235}
ThreadName:ConsumeMessageThread_1 QueueID:3,获得的消息是:{"desc":"付款","orderId":7235}
ThreadName:ConsumeMessageThread_1 QueueID:3,获得的消息是:{"desc":"完成","orderId":7235}
ThreadName:ConsumeMessageThread_2 QueueID:1,获得的消息是:{"desc":"创建","orderId":1065}
ThreadName:ConsumeMessageThread_2 QueueID:1,获得的消息是:{"desc":"付款","orderId":1065}
ThreadName:ConsumeMessageThread_2 QueueID:1,获得的消息是:{"desc":"完成","orderId":1065}
可以很清楚的看到 线程id 和消息队列id 是一一对应的… 不会出现 一个消息队列里的消息同时被两条线程消费 ,这样打印出来的消息就是顺序的了
message.setDelayTimeLevel(2); 消息延迟两秒
消息会在消息队列延迟2秒
这里可以做成定时任务 ,
实验了一下 ,,这里不能做消息队列
我 模拟了一百条消息 , 每条消息都设置了随机 的延迟时间,并没有发现 延迟时间短的先出来 ,接收消息都是随机的,乱序的
总之一句话 和rabbitmq 差别很大…
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
e.printStackTrace();
//处理error
}
如果消息的总长度可能大于4MB时,这时候最好把消息进行分割
解决方案
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加日志的开销20字节
if (tmpSize > SIZE_LIMIT) {
//单个消息超过了最大的限制
//忽略,否则会阻塞分裂的进程
if (nextIndex - currIndex == 0) {
//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
//处理error
}
// 消息发送者
public static void main(String[] args) throws Exception{
// 实例化生产者
DefaultMQProducer producer = new DefaultMQProducer("g3");
// 设置NameServer的地址
producer.setNamesrvAddr("116.205.161.47:9876");
//增大超时时间,防止超时
producer.setSendMsgTimeout(1000000);
producer.start();
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagB", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagC", "OrderID003", "Hello world 2".getBytes()));
producer.send(messages);
producer.shutdown();
}
// 消息接收者
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g1");
// 设置NameServer的地址
consumer.setNamesrvAddr("116.205.161.47:9876");
//增大超时时间,防止超时
consumer.setConsumeTimeout(1000000);
// 消费 tag TagA||TagB :只消费 这两种tag , * 消费BatchTest下全部的tag
consumer.subscribe("BatchTest","TagA||TagB");
// 设置回调函数 用来接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
log.info("我消费了消息:{},消息延迟了:{},消息队列是:{},消息TAG是:{}",messageExt.getMsgId(),
System.currentTimeMillis()-messageExt.getStoreTimestamp(),messageExt.getQueueId()
,messageExt.getTags().toString()
);
byte[] body = messageExt.getBody();
System.out.println(new String(body));
}
System.out.println(consumeConcurrentlyContext);
//返回消费 成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
log.info(" ");
}
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
常量支持类型为:
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:
Exception in thread “main” org.apache.rocketmq.client.exception.MQClientException: CODE: 1 DESC: The broker does not support consumer to filter message by SQL92
需要更改 enablePropertyFilter 属性
sql 过滤失效
这样就好了
消费者 和 生产者
public static void main(String[] args) throws Exception{
// 实例化生产者
DefaultMQProducer producer = new DefaultMQProducer("g1");
// 设置NameServer的地址
producer.setNamesrvAddr("116.205.161.47:9876");
//增大超时时间,防止超时
producer.setSendMsgTimeout(1000000);
producer.start();
for (int i = 0; i < 20; i++) {
Random random = new Random();
Message message = new Message("topic1", "TagA","hi~~ one way message".getBytes(StandardCharsets.UTF_8));
// putUserProperty : 便于 消费者使用sql 语句进行 过滤 ~~
message.putUserProperty("i", String.valueOf(i));
producer.send(message);
}
producer.shutdown();
}
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g1");
// 设置NameServer的地址
consumer.setNamesrvAddr("116.205.161.47:9876");
//增大超时时间,防止超时
consumer.setConsumeTimeout(1000000);
//
MessageSelector messageSelector = MessageSelector.bySql("i > 5");
consumer.subscribe("topic1",messageSelector);
// 设置回调函数 用来接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
log.info("我消费了消息:{},消息队列是:{},消息TAG是:{},消息的properties是:{}",messageExt.getMsgId(),
messageExt.getTags().toString(),messageExt.getProperties().toString()
);
byte[] body = messageExt.getBody();
System.out.println(new String(body));
}
System.out.println(consumeConcurrentlyContext);
//返回消费 成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
log.info(" ");
}
执行结果
6:44:52.956 [main] INFO com.example.rocketmq.Consumer -
16:50:06.561 [ConsumeMessageThread_1] INFO com.example.rocketmq.Consumer - 我消费了消息:C0A82BF2316818B4AAC26FC24DF50006,消息队列是:TagA,消息TAG是:{MIN_OFFSET=0, MAX_OFFSET=32, i=6, CONSUME_START_TIME=1661158204994, UNIQ_KEY=C0A82BF2316818B4AAC26FC24DF50006, WAIT=true, TAGS=TagA},消息的properties是:{}
16:50:06.561 [ConsumeMessageThread_11] INFO com.example.rocketmq.Consumer - 我消费了消息:C0A82BF2316818B4AAC26FC250560010,消息队列是:TagA,消息TAG是:{MIN_OFFSET=0, MAX_OFFSET=35, i=16, CONSUME_START_TIME=1661158205592, UNIQ_KEY=C0A82BF2316818B4AAC26FC250560010, WAIT=true, TAGS=TagA},消息的properties是:{}
16:50:06.561 [ConsumeMessageThread_9] INFO com.example.rocketmq.Consumer - 我消费了消息:C0A82BF2316818B4AAC26FC24FEC000E,消息队列是:TagA,消息TAG是:{MIN_OFFSET=0, MAX_OFFSET=34, i=14, CONSUME_START_TIME=1661158205468, UNIQ_KEY=C0A82BF2316818B4AAC26FC24FEC000E, WAIT=true, TAGS=TagA},消息的properties是:{}
16:50:06.562 [ConsumeMessageThread_8] INFO com.example.rocketmq.Consumer - 我消费了消息:C0A82BF2316818B4AAC26FC24FBB000D,消息队列是:TagA,消息TAG是:{MIN_OFFSET=0, MAX_OFFSET=34, i=13, CONSUME_START_TIME=1661158205421, UNIQ_KEY=C0A82BF2316818B4AAC26FC24FBB000D, WAIT=true, TAGS=TagA},消息的properties是:{}
hi~~ one way message
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext@17042905
hi~~ one way message
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
当消息发送之后如果没有被 提交 消息是不能 被 消费者消费的!!
executeLocalTransaction() 给消息设置是否能提交
checkLocalTransaction() 回查
* TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
* TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
* TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
生产者
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.StringUtils;
import java.nio.charset.StandardCharsets;
import java.util.Random;
public class OnewayProducer {
public static void main(String[] args) throws Exception{
// 实例化生产者
TransactionMQProducer producer = new TransactionMQProducer("g1");
// 设置NameServer的地址
producer.setNamesrvAddr("116.205.161.47:9876");
//设置监听者
producer.setTransactionListener(new TransactionListener() {
// 当消息被 mq接收到了 会触发该方法 ,对消息进行处理 COMMIT_MESSAGE(提交) ROLLBACK_MESSAGE(丢丢)UNKNOW(被checkLocalTransaction()处理)
//该方法中执行本地事务 ROLLBACK_MESSAGE:消息会被丢掉
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
if (message.getTags().equals("A")){
return LocalTransactionState.COMMIT_MESSAGE;
}else if (message.getTags().equals("B")){
return LocalTransactionState.ROLLBACK_MESSAGE;
}else if (message.getTags().equals("C")){
return LocalTransactionState.UNKNOW ;
}else {
return LocalTransactionState.UNKNOW ;
}
}
//该方法 对事务状态的回查
// unknow 的消息会被 checkLocalTransaction() 处理
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("消息Tag:"+messageExt.getTags());
return LocalTransactionState.COMMIT_MESSAGE;
}
});
//增大超时时间,防止超时
producer.setSendMsgTimeout(1000000);
producer.start();
String[] tag = {"A","B","C"} ;
for (int i = 0; i < 3; i++) {
Message message = new Message("topic1", tag[i],"hi~~ one way message".getBytes(StandardCharsets.UTF_8));
TransactionSendResult result = producer.sendMessageInTransaction(message, null);
System.out.println("发送结果是: "+result.getSendStatus());
}
// producer.shutdown();
}
}
transactionCheckMax
参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax
) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener
类来修改这个行为。transactionMsgTimeout
参数。 <dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.1</version>
</dependency>
@Test
void contextLoads() {
rocketMQTemplate.convertAndSend("springtest","hello Rocket");
}
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "my-group", topic = "springtest")
public class Mqlistener implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("接收到的消息是:{}",s);
}
}
实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式, 尤其是SYNC_FLUSH方式,由于频繁地触发磁盘写动作,会明显降低 性能。通常情况下,应该把Master和Save配置成ASYNC_FLUSH的刷盘 方式,主从之间配置成SYNC_MASTER的复制方式,这样即使有一台 机器出故障,仍然能保证数据不丢,是个不错的选择。
```![在这里插入图片描述](https://img-blog.csdnimg.cn/7b0e0978ab8346eeb0ba5d450b8fadbd.png)
# 消息重试
>对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
```c
# 异常重试
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
//处理消息
doConsumeMessage(message);
//方式1:返回 Action.ReconsumeLater,消息将重试
return Action.ReconsumeLater;
//方式2:返回 null,消息将重试
return null;
//方式3:直接抛出异常, 消息将重试
throw new RuntimeException("Consumer Message exceotion");
}
}
# 异常不重试
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
try {
doConsumeMessage(message);
} catch (Throwable e) {
//捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;
return Action.CommitMessage;
}
//消息处理正常,直接返回 Action.CommitMessage;
return Action.CommitMessage;
}
}
Properties properties = new Properties();
//配置对应 Group ID 的最大消息重试次数为 20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
Consumer consumer =ONSFactory.createConsumer(properties);
因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。 最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 进行设置, key存入 数据库 来搞幂等性:
Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);
consumer.subscribe("ons_test", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
String key = message.getKey()
// 根据业务唯一标识的 key 做幂等处理
}
});