看视频的时候,到这一章节,弹幕里全是终于熬到这一章了,哈哈,对于大家来说,看理论知识确实很枯燥,不过很多东西必须要有一定的理论知识为我们做支撑,否则就是墙上芦苇、山间竹笋。
那么,我们就开始RocketMQ的应用,使用代码来实现各项功能吧。
Producer 发送一条消息,Broker 收到后会返回一个结果。我们此处会打印出结果信息,并在dashboard上查看是否有相关消息信息。
代码:
// 创建一个producer,参数pg为Producer Group名称
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定NameServer地址
producer.setNamesrvAddr("rocketmq:9876");
// 设置当发送失败时,重试发送的次数,默认为2次
producer.setRetryTimesWhenSendFailed(3);
// 设置发送超时时限,默认3s
producer.setSendMsgTimeout(5000);
// 开启生产者
producer.start();
// 生产并发送消息
for (int i = 0; i < 100; i++){
byte[] body = ("Hello," + i).getBytes();
Message message = new Message("firstTopic", "firstTag", body);
// 设置key
message.setKeys("key-"+i);
SendResult result = producer.send(message);
System.out.println(result);
}
// 关闭producer
producer.shutdown();
打印结果:
SendResult [sendStatus=SEND_OK, msgId=7F0000013E7418B4AAC25EEAF47D0000, offsetMsgId=C0A83D6500002A9F000000000008C8B6, messageQueue=MessageQueue [topic=firstTopic, brokerName=rocketMQ, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000013E7418B4AAC25EEAF7560001, offsetMsgId=C0A83D6500002A9F000000000008C970, messageQueue=MessageQueue [topic=firstTopic, brokerName=rocketMQ, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000013E7418B4AAC25EEAF79F0002, offsetMsgId=C0A83D6500002A9F000000000008CA2A, messageQueue=MessageQueue [topic=firstTopic, brokerName=rocketMQ, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000013E7418B4AAC25EEAF7E00003, offsetMsgId=C0A83D6500002A9F000000000008CAE4, messageQueue=MessageQueue [topic=firstTopic, brokerName=rocketMQ, queueId=1], queueOffset=0]
......
dashboard的Topic:
dashboard的Message:
异步消息发送,在Producer发送消息后,并不会等待ACK才继续,当收到ACK后会执行onSuccess回调,出错会执行onException回调。
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmq:9876");
// 设置当发送失败时,重试发送的次数,默认为2次
producer.setRetryTimesWhenSendAsyncFailed(3);
// 设置Queue的个数,默认为4
producer.setDefaultTopicQueueNums(3);
producer.start();
// 生产并发送消息
for (int i = 0; i < 100; i++){
byte[] body = ("Hello," + i).getBytes();
Message message = new Message("AsyncTopic", "AsyncTag", body);
message.setKeys("key-"+i);
// 异步发送
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
throwable.printStackTrace();
}
});
}
// 休眠3秒,因为是异步,不休眠就会导致还未发送producer就被关闭,导致程序报错
TimeUnit.SECONDS.sleep(3);
producer.shutdown();
SendResult [sendStatus=SEND_OK, msgId=7F0000012C4018B4AAC25F060205005C, offsetMsgId=C0A83D6500002A9F000000000009A797, messageQueue=MessageQueue [topic=AsyncTopic, brokerName=rocketMQ, queueId=0], queueOffset=31]
SendResult [sendStatus=SEND_OK, msgId=7F0000012C4018B4AAC25F0602050060, offsetMsgId=C0A83D6500002A9F000000000009A85E, messageQueue=MessageQueue [topic=AsyncTopic, brokerName=rocketMQ, queueId=0], queueOffset=32]
SendResult [sendStatus=SEND_OK, msgId=7F0000012C4018B4AAC25F0602040043, offsetMsgId=C0A83D6500002A9F000000000009A925, messageQueue=MessageQueue [topic=AsyncTopic, brokerName=rocketMQ, queueId=0], queueOffset=33]
......
Producer向Broker发送消息后,并不会有返回值。
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmq:9876");
producer.start();
// 生产并发送消息
for (int i = 0; i < 100; i++){
byte[] body = ("Hello," + i).getBytes();
Message message = new Message("OneWayTopic", "OneWayTag", body);
producer.sendOneway(message);
}
producer.shutdown();
// 指定一个pull消费者
// DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("cg");
// 指定一个push消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
consumer.setNamesrvAddr("rocketmq:9876");
// 指定消费的Topic和Tag
consumer.subscribe("firstTopic","*");
// 指定从第一条消息开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
// 一旦Broker中有了订阅的消息就会触发监听器
// 返回值为当前consumer消费状态
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 消费消息
for (int i = 0; i < list.size(); i++) {
System.out.println(list.get(i));
}
// 消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 开启消费
consumer.start();
System.out.println("starting...");
严格按照消息的发送顺序进行消费的消息。
根据有序范围的不同,RocketMQ可以严格的保证两种消息的有序性:全局有序和分区有序。
当发送和消费参与的Queue只有一个时所保证的有序是整个Topic中消息的顺序,成为全局有序。
全局有序比较简单,就是将queue设置为1就好了,就不代码演示。
如果有多个Queue参与,其仅可保证在该Queue分区队列上的消息顺序,成为分区有序。
Queue的选择:
在定义Producer时,我们可以指定消息队列选择器,它需要实现MessageQueueSelector接口。
定义选择器时要使用选择key,它可以是消息key,也可以是其他值。要求:唯一性。
一般的选择算法:让选择key(或其hash值)与该topic所包含的Queue的数量进行取模。
/**
* Producer,发送顺序消息
*/
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("rocketmq:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagC", "TagD"};
// 订单列表
List<OrderStep> orderList = new Producer().buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// 加个时间前缀
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg = new Message("TopicTestA", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; //根据订单id选择发送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());//订单id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}
/**
* 订单的步骤
*/
private static class OrderStep {
private long orderId;
private String desc;
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
}
/**
* 生成模拟订单数据
*/
private List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
}
/**
* 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("rocketmq:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTestA", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + ",queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(random.nextInt(5));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
当消息写入Broker后,在指定的时长后才可被消费处理的消息。
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码SendMessageProcessor.java
private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
下面是Producer的代码,如果取消message.setDelayTimeLevel(3);这一行代码,则跟普通发送一样了。
消费和普通的一样,就不列举了。
DefaultMQProducer producer = new DefaultMQProducer("ProducerG1");
producer.setNamesrvAddr("rocketmq:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("TopicA", "TagA", ("Hello-" + i).getBytes());
// 指定延迟等级为3,即延迟10s消费,有可能存在一定的误差
message.setDelayTimeLevel(3);
producer.send(message);
}
producer.shutdown();
RocketMQ提供了类似X/open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致,XA是一种分布式事务的解决方案,一种分布式事务处理模式。
生产者进行消息发送时可以一次性发送多条消息,这可以大大提高Producer的发送效率。
注意:
批量发送的大小
默认情况下,一批发送的消息总大小不能超过4MB字节,如需超过,有两种解决办法:
生产者发送的消息
send(message)是将message生成一个字符串进行发送,这个字符串由四个部门组成:Topic、消息body、消息日志、及用于描述消息的一堆key-value键值对。这些属性包括生产者地址、生产时间等信息,最终写入到Broker中消息单元的属性中。
为了防止消息超过4m,超过了就将消息分割成消息列表
public class MessageListSplitter implements Iterable<List<Message>>{
private final int SIZE_LIMIT = 4 * 1024 * 1024;
private final List<Message> MESSAGES;
// 要进行批量操作的小集合的起始索引
private int currentIndex;
public MessageListSplitter(List<Message> messages) {
MESSAGES = messages;
}
public boolean hashNext(){
return currentIndex < MESSAGES.size();
}
public List<Message> next(){
int nextIndex = currentIndex;
int totalSize = 0;
for(;nextIndex < MESSAGES.size(); nextIndex++){
Message message = MESSAGES.get(nextIndex);
// 计算当前消息的长度
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String,String> properties = message.getProperties();
for (Map.Entry<String,String> entry: properties.entrySet()) {
tmpSize = entry.getKey().length() + entry.getValue().length();
}
tmpSize += 20;
if (tmpSize > SIZE_LIMIT) {
if(nextIndex - currentIndex == 0){
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}// for - end
// subList -> [ )
List<Message> subList = MESSAGES.subList(currentIndex, nextIndex);
currentIndex = nextIndex;
return subList;
}
}
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmq:9876");
// 只写此处是不行的,还得修改Broker加载的配置文件MaxMessageSize属性
// producer.setMaxMessageSize(8 * 1024 * 1024);
producer.start();
List<Message> messageList = new ArrayList<Message>();
for (int i = 0; i < messageList.size(); i++) {
Message message = new Message("BatchTopic","BatchTag", "Batchkey" + i,("Hello" + i).getBytes());
messageList.add(message);
}
MessageListSplitter splitter = new MessageListSplitter(messageList);
while(splitter.hashNext()) {
try {
List<Message> item = splitter.next();
producer.send(item);
} catch (Exception e) {
e.printStackTrace();
}
} // while - end
producer.shutdown();
// 指定每次批处理多少条消息
consumer.setConsumeMessageBatchMaxSize(1);
// 指定可以从Broker拉取的消息数量
consumer.setPullBatchSize(32);
Tag过滤 和 SQL 过滤
Tag过滤 : 使用 || 分割,例如:TagA || TagC
SQL过滤可以看成是Tag过滤的升级,可以进行复杂过滤。
通过message.putUserProperty()事先埋入属性。如message.putUserProperty("age", i + "")
;
消费时consumer.subscribe("topicAA", MessageSelector.bySql("age > 2"));
通过bySQL进行过滤。
Broker默认情况下没有开启SQL过滤,如需使用,需要修改配置文件的
enablePropertyFilter = true;
Producer对发送失败的消息进行重新发送的机制称为消息发送重试机制,也称为消息重投机制。
特点:
当consumer消费消息失败后,为了保证消息的顺序性,其会自动不断的进行消息重试,直到成功消费,重试期间应用会出现消息消费阻塞的情况。因此,此时一定要保证应用能够及时监控并处理消费失败的消息,避免永久阻塞。
重试的时间间隔默认是1000ms,可以通过consumer.setSuspendCurrentQueueTimeMillis(ms);
来修改。ms的取值范围为10-30000ms。
顺序消息是没有发送重试,而有消费重试的。
当consumer消费消息失败后,可以通过设置返回状态达到消息重试的效果。
无序消息只对集群消费模式生效。
广播消费方式没有消费重试,失败后不再重试,继续消费后续消息。
在无序消息集群模式下,每条消息最多重试16次,但每次的时间间隔不一样。会逐渐变长,如下。
可以通过consumer.setMaxReconsumeTimes();
修改次数。若这个次数小于等于16次,则仍然按照时间间隔执行,若大于16次,则都是2小时。
对于consumer group,若仅修改了一个consumer的消费次数,则会应用到该group的其他consumer实例。
若修改了多个consumer实例,则采用覆盖策略,最后的修改会覆盖前面的修改。
对于需要重试消费的消息,,并不是consumer在等待相应时间后再去拉取原来的消息进行消费,而是将这些消息放入一个特殊的Topic的队列中,而后进行再次消费,这个队列就是重试队列。Broker为消费者组创建的队列名称为%RETRY%consumerGroup@consumerGroup。
无序消息的集群模式下,消息失败后希望进行重试,需要在消息监听器接口的实现中明确进行如下三种之一的配置。
无序消息的集群模式下,消息失败后不希望进行重试,则捕获异常后返回ConsumeConcurrentStatus.CONSUME_SUCCESS即可。
当重试次数达到最大次数时,仍然无法消费消息,表明消费者在正常情况下无法正确的消费消息,此时就会将该消息发送到一个特殊队列——死信队列。其中的消息称为死信消息。
特征:
处理:
当一条消息进入死信队列,就意味着系统中某些地方出现了问题,导致消费者无法正常处理消息。因此,对于私信消息,通常需要开发人员进行特殊处理,最关键的步骤是排查可疑因素,解决可能存在的bug,然后将原来的私信消息进行重新投递消费。
前前后后还是花了很多时间来学习RocketMQ,主要是根据尚硅谷的视频来的,因此,在这几章里,有的地方是直接截取的课件的内容。虽然说,从头到尾跟了一遍,但个人感觉还是不太够,理论知识比较多,实操还是太少了点,因此,我需要再去找点东西练练手,最后再将RocketMQ融入到Spring Boot 或者 Spring Cloud中去。需要多次的练手,融合贯通知识,才能学有所成,学有所用。一起卷起来吧,再会。
B站尚硅谷视频:https://www.bilibili.com/video/BV1cf4y157sz
GitHub:https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
GitHub上的比较的新且全,实例也比较好,可以多参考。